7#ifndef _SETJOINPARELLED_H_
8#define _SETJOINPARELLED_H_
18#include <unordered_map>
19#include <unordered_set>
42 std::string
typeMap[3] = {
"Jaccard",
"Cosine",
"Dice"};
78#if MAINTAIN_VALUE == 1
102 std::vector<std::pair<ui, ui>> range;
103 std::vector<ui> range_st;
104 std::vector<int> range_id, rangeIdQuery, rangeQueryAdd;
107 std::vector<std::vector<ui>> parts_keys, partsKeysQuery;
108 std::vector<std::vector<ui>> onedelete_keys, oneDeleteKeysQuery;
109 std::vector<std::vector<ui>> odkeys_st, oDKeysStQuery;
115 std::vector<std::pair<int, ui>> valuesArr[
MAXTHREADNUM];
132 SetJoinParallel(
const std::vector<std::vector<ui>> &sorted_records,
const std::vector<double> &recwt,
133 const std::vector<double> &_wordwt,
double _det,
ui _maxHeapSize = 0,
134 bool _isWeightedComp =
false)
138 if(_det <= 0.4 && work_maxSize >= 55) {
143 printf(
"Work size: %u\n",
work_n);
147#if MAINTAIN_VALUE == 1
155 SetJoinParallel(
const std::vector<std::vector<ui>> &work_records,
const std::vector<std::vector<ui>> &query_records,
156 const std::vector<double> &workwt,
const std::vector<double> &querywt,
157 const std::vector<double> &_wordwt,
double _det,
ui _maxHeapSize = 0,
158 bool _isWeightedComp =
false)
165 if(_det <= 0.4 && work_maxSize >= 55) {
169 if(_det <= 0.4 && query_maxSize >= 55) {
179#if MAINTAIN_VALUE == 1
198 printf(
"type: %s det: %.4lf coe: %.8lf ALPHA: %.8lf maxIndexPartNum: %u \n", st.c_str(),
det,
coePart,
ALPHA,
205 double avergaeSize = 0.0;
206 for(
const auto &rec : dataset) {
207 if(rec.empty())
continue;
208 avergaeSize += (double)rec.size() * 1.0;
211 avergaeSize = avergaeSize / (unempty * 1.0);
212 long double sd = 0.0;
213 for(
const auto &rec : dataset) {
214 if(rec.empty())
continue;
215 sd += (rec.size() * 1.0 - avergaeSize) * (rec.size() * 1.0 - avergaeSize);
217 sd = sd / (unempty * 1.0);
218 printf(
"Resize dataset on 1sd, mean: %.1lf\tsd: %.1Lf\n", avergaeSize, sd);
219 int bound = ceil(avergaeSize + 1.0 * sd - 1e-5);
220 for(
auto &rec : dataset)
221 if((
int)rec.size() > bound)
226 double total_hash_cost = 0;
227 double total_memeory_cost = 0;
228 double total_find_cost = 0;
229 double total_alloc_cost = 0;
230 double total_verif_cost = 0;
239 sum = total_hash_cost + total_memeory_cost + total_find_cost + total_alloc_cost + total_verif_cost;
240 total_hash_cost = total_hash_cost / sum *
search_cost;
241 total_memeory_cost = total_memeory_cost / sum *
search_cost;
242 total_find_cost = total_find_cost / sum *
search_cost;
243 total_alloc_cost = total_alloc_cost / sum *
search_cost;
244 total_verif_cost = total_verif_cost / sum *
search_cost;
246 printf(
"|index_cost| total_hash_cost| total_memeory_cost| find_cost| alloc_cost| verif_cost|\n");
247 printf(
"|%f|%f|%f|%f|%f|%f|\n",
index_cost, total_hash_cost, total_memeory_cost, total_find_cost, total_alloc_cost, total_verif_cost);
251 std::vector<unsigned long long> range_size(range.size());
252 for (
ui i = 0; i < range_id.size(); i++) {
256 double total_size = 0;
258 for (
auto &size : range_size) {
260 max_size = std::max(max_size, (
double)size);
263 printf(
"Average Range size: %.3f Maximum Range size ratio %.3f \n", total_size / range_size.size(), max_size / total_size);
268 unsigned long long pairs_amount = 0;
278#pragma omp parallel for
280 int tid = omp_get_thread_num();
291#pragma omp parallel for
293 int tid = omp_get_thread_num();
305 std::cout <<
"Start merging" << std::endl << std::flush;
306#if MAINTAIN_VALUE == 0
311#elif MAINTAIN_VALUE == 1
316 finalPairs.emplace_back(wp.id1, wp.id2);
322 sort(finalPairs.begin(), finalPairs.end());
323 auto it = unique(finalPairs.begin(), finalPairs.end());
324 if(it != finalPairs.end()) {
325 std::cerr <<
"Duplicate results: " << distance(it, finalPairs.end()) << std::endl;
329 std::cout <<
workEmpty.size() << std::endl << std::flush;
336 int require_overlap = 0;
355 if ((
int)
work_dataset[x].size() - posx + current_overlap < require_overlap || (
int)
work_dataset[y].size() - posy + current_overlap < require_overlap)
369 return current_overlap >= require_overlap;
375 int grpIdX =
grpIdA[revIdx];
376 int grpIdY =
grpIdA[revIdy];
378 if(grpIdX == -1 && grpIdY == -1)
379 return overlapSelf(x, y, posx, posy, current_overlap);
380 else if(grpIdX != -1 && grpIdY == -1) {
381 for(
const auto &icid :
groupA[grpIdX]) {
387 else if(grpIdX == -1 && grpIdY != -1) {
388 for(
const auto &icid:
groupA[grpIdY]) {
404 bool overlapRS(
ui x,
ui y,
int posx = 0,
int posy = 0,
int current_overlap = 0) {
406 int require_overlap = 0;
425 if ((
int)
query_dataset[x].size() - posx + current_overlap < require_overlap || (
int)
work_dataset[y].size() - posy + current_overlap < require_overlap)
438 return current_overlap >= require_overlap;
444 int grpIdX =
grpIdA[revIdx];
445 int grpIdY =
grpIdB[revIdy];
447 if(grpIdX == -1 && grpIdY == -1)
448 return overlapRS(x, y, posx, posy, current_overlap);
449 else if(grpIdX != -1 && grpIdY == -1) {
450 for(
const auto &icid :
groupA[grpIdX]) {
456 else if(grpIdX == -1 && grpIdY != -1) {
457 for(
const auto &icid:
groupB[grpIdY]) {
479 set_intersection(records1.begin(), records1.end(),
480 records2.begin(), records2.end(),
481 std::back_inserter(res));
484 for(
const auto &e : res)
495 assert(std::abs(rw1) > 1e-7 && std::abs(rw2) > 1e-7);
497 return ovlp / (rw1 + rw2 - ovlp);
504 set_intersection(records1.begin(), records1.end(),
505 records2.begin(), records2.end(),
506 std::back_inserter(res));
507 int ovlp = (int)res.size();
509 return ovlp * 1.0 / (records1.size() + records2.size() - ovlp) * 1.0;
517 assert(std::abs(rw1) > 1e-7 && std::abs(rw2) > 1e-7);
519 return ovlp / sqrt(rw1 * rw2);
526 set_intersection(records1.begin(), records1.end(),
527 records2.begin(), records2.end(),
528 std::back_inserter(res));
529 int ovlp = (int)res.size();
531 return ovlp * 1.0 / sqrt(records1.size() * records2.size()) * 1.0;
539 assert(std::abs(rw1) > 1e-7 && std::abs(rw2) > 1e-7);
541 return 2.0 * ovlp / (rw1 + rw2);
548 set_intersection(records1.begin(), records1.end(),
549 records2.begin(), records2.end(),
550 std::back_inserter(res));
551 int ovlp = (int)res.size();
553 return ovlp * 2.0 / (records1.size() + records2.size()) * 1.0;
559 void index(
double threshold);
563 ui record_length,
const std::vector<ui> &p_keys,
564 const std::vector<ui> &od_keys,
const std::vector<ui> &odk_st);
Definition setjoin_parallel.h:34
bool flagIC
Definition setjoin_parallel.h:121
std::vector< std::vector< ui > > work_dataset
Definition setjoin_parallel.h:54
bool overlapRSIC(ui x, ui y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin_parallel.h:441
std::vector< WeightPair > result_pairs_[MAXTHREADNUM]
Definition setjoin_parallel.h:80
ui maxHeapSize
Definition setjoin_parallel.h:73
std::vector< int > grpIdA
Definition setjoin_parallel.h:122
std::vector< std::pair< int, int > > result_pairs[MAXTHREADNUM]
Definition setjoin_parallel.h:76
void findSimPairsSelf()
Definition setjoin_parallel.cc:791
std::vector< double > wordwt
Definition setjoin_parallel.h:58
std::vector< ui > revIdMapB
Definition setjoin_parallel.h:124
void index(double threshold)
Definition setjoin_parallel.cc:11
double weightedJaccard(ui x, ui y)
Definition setjoin_parallel.h:490
double verif_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:91
double ALPHA
Definition setjoin_parallel.h:68
double mem_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:88
ui work_n
Definition setjoin_parallel.h:69
double(SetJoinParallel::* normalFunc)(ui, ui)
Definition setjoin_parallel.h:44
std::vector< ui > idMapA
Definition setjoin_parallel.h:125
bool ifRS
Definition setjoin_parallel.h:36
void reportTimeCost()
Definition setjoin_parallel.h:225
double weightedDice(ui x, ui y)
Definition setjoin_parallel.h:534
double find_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:89
std::vector< std::pair< int, int > > emptyPairs[MAXTHREADNUM]
Definition setjoin_parallel.h:77
ui work_minSize
Definition setjoin_parallel.h:71
SetJoinParallel(const std::vector< std::vector< ui > > &sorted_records, const std::vector< double > &recwt, const std::vector< double > &_wordwt, double _det, ui _maxHeapSize=0, bool _isWeightedComp=false)
Definition setjoin_parallel.h:132
double ** featureValueCache
Definition setjoin_parallel.h:127
double dice(ui x, ui y)
Definition setjoin_parallel.h:543
std::vector< std::vector< int > > groupB
Definition setjoin_parallel.h:123
std::vector< std::vector< ui > > query_dataset
Definition setjoin_parallel.h:55
std::vector< int > grpIdB
Definition setjoin_parallel.h:122
std::vector< ui > workLength
Definition setjoin_parallel.h:63
ui query_maxSize
Definition setjoin_parallel.h:72
double search_cost
Definition setjoin_parallel.h:86
~SetJoinParallel()=default
int earlyTerminated[MAXTHREADNUM]
Definition setjoin_parallel.h:37
std::vector< ui > queryEmpty
Definition setjoin_parallel.h:61
double jaccard(ui x, ui y)
Definition setjoin_parallel.h:499
double alloc_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:90
void mergeResults(std::vector< std::pair< int, int > > &finalPairs)
Definition setjoin_parallel.h:275
void GreedyFindCandidateAndSimPairs(const int &tid, const int indexLenGrp, const ui rid, ui record_length, const std::vector< ui > &p_keys, const std::vector< ui > &od_keys, const std::vector< ui > &odk_st)
Definition setjoin_parallel.cc:427
bool overlapSelfIC(ui x, ui y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin_parallel.h:372
std::vector< ui > idMapB
Definition setjoin_parallel.h:125
std::string typeMap[3]
Definition setjoin_parallel.h:42
double coePart
Definition setjoin_parallel.h:67
double cosine(ui x, ui y)
Definition setjoin_parallel.h:521
std::vector< double > work_weights
Definition setjoin_parallel.h:56
bool overlapSelf(ui x, ui y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin_parallel.h:334
SetJoinParallel(const std::vector< std::vector< ui > > &work_records, const std::vector< std::vector< ui > > &query_records, const std::vector< double > &workwt, const std::vector< double > &querywt, const std::vector< double > &_wordwt, double _det, ui _maxHeapSize=0, bool _isWeightedComp=false)
Definition setjoin_parallel.h:155
void resizeData(std::vector< std::vector< ui > > &dataset)
Definition setjoin_parallel.h:202
double coe
Definition setjoin_parallel.h:66
double(SetJoinParallel::* weightedFunc)(ui, ui)
Definition setjoin_parallel.h:43
std::vector< std::vector< int > > groupA
Definition setjoin_parallel.h:123
void reportLargestGroup()
Definition setjoin_parallel.h:250
std::vector< double > query_weights
Definition setjoin_parallel.h:57
uint64_t candidateNum
Definition setjoin_parallel.h:49
bool(SetJoinParallel::* overlapFunc)(ui, ui, int, int, int)
Definition setjoin_parallel.h:45
ui maxIndexPartNum
Definition setjoin_parallel.h:51
int * discreteCacheIdx
Definition setjoin_parallel.h:128
std::vector< ui > workEmpty
Definition setjoin_parallel.h:60
SimFuncType simFType
Definition setjoin_parallel.h:41
unsigned long long getResultPairsAmount()
Definition setjoin_parallel.h:267
int earlyTerminatedEmpty[MAXTHREADNUM]
Definition setjoin_parallel.h:38
double weightedOverlap(ui x, ui y)
Definition setjoin_parallel.h:474
ui work_maxSize
Definition setjoin_parallel.h:71
ui query_n
Definition setjoin_parallel.h:70
double index_cost
Definition setjoin_parallel.h:85
uint64_t resultNum
Definition setjoin_parallel.h:48
bool isWeightedComp
Definition setjoin_parallel.h:79
double weightedCosine(ui x, ui y)
Definition setjoin_parallel.h:512
int isHeap[MAXTHREADNUM]
Definition setjoin_parallel.h:81
void findSimPairsRS()
Definition setjoin_parallel.cc:984
std::vector< ui > revIdMapA
Definition setjoin_parallel.h:124
double hashInFind_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:87
void showPara() const
Definition setjoin_parallel.h:191
uint64_t listlens
Definition setjoin_parallel.h:50
bool overlapRS(ui x, ui y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin_parallel.h:404
double det
Definition setjoin_parallel.h:47
ui query_minSize
Definition setjoin_parallel.h:72
#define MAX_PAIR_SIZE
Definition config.h:44
#define MAXTHREADNUM
Definition config.h:38
#define EPS
Definition config.h:78
#define MAX_EMPTY_SIZE
Definition config.h:86
SimFuncType
Definition type.h:48
unsigned int ui
Definition type.h:8