Entity Matching by Similarity Join
 
Loading...
Searching...
No Matches
setjoin_parallel.h
Go to the documentation of this file.
1/*
2 * author: Dong Deng
3 * modified: Zhencan Peng in rutgers-db/RedPajama_Analysis
4 * modified: Yunqi Li
5 * contact: liyunqixa@gmail.com
6 */
7#ifndef _SETJOINPARELLED_H_
8#define _SETJOINPARELLED_H_
9
10#include "config.h"
11#include "type.h"
12#include "joinutil.h"
13#include "index.h"
14#include <iostream>
15#include <fstream>
16#include <vector>
17#include <map>
18#include <unordered_map>
19#include <unordered_set>
20#include <string>
21#include <algorithm>
22#include <queue>
23#include <numeric>
24#include <cmath>
25#include <cstdio>
26#include <string.h>
27#include <inttypes.h>
28#include <sys/time.h>
29#include <limits.h>
30#include <assert.h>
31
32
34{
35public:
36 bool ifRS = false;
39
40 // join func
42 std::string typeMap[3] = {"Jaccard", "Cosine", "Dice"};
43 double (SetJoinParallel::*weightedFunc)(ui, ui) = nullptr;
44 double (SetJoinParallel::*normalFunc)(ui, ui) = nullptr;
45 bool (SetJoinParallel::*overlapFunc)(ui, ui, int, int, int) = nullptr;
46
47 double det;
48 uint64_t resultNum = 0; // Number of result pairs found
49 uint64_t candidateNum = 0; // Number of candidate pairs considered
50 uint64_t listlens = 0;
51 ui maxIndexPartNum{0}; // Maximum index partition number
52
53 // Dataset containing records to be joined
54 std::vector<std::vector<ui>> work_dataset;
55 std::vector<std::vector<ui>> query_dataset;
56 std::vector<double> work_weights;
57 std::vector<double> query_weights;
58 std::vector<double> wordwt;
59 // Bucket for empty
60 std::vector<ui> workEmpty;
61 std::vector<ui> queryEmpty;
62 // length
63 std::vector<ui> workLength;
64
65 // Parameters related to calculation and dataset
66 double coe{0.0};
67 double coePart{0.0};
68 double ALPHA{0.0};
69 ui work_n{0}; // Number of records in the dataset
71 ui work_maxSize{0}, work_minSize{0}; // Maximum size of the records
74
75 // Array to store result pairs for each thread
76 std::vector<std::pair<int, int>> result_pairs[MAXTHREADNUM];
77 std::vector<std::pair<int, int>> emptyPairs[MAXTHREADNUM];
78#if MAINTAIN_VALUE == 1
79 bool isWeightedComp{false}; // use the weighted version sim funcs
80 std::vector<WeightPair> result_pairs_[MAXTHREADNUM];
81 int isHeap[MAXTHREADNUM] = { 0 };
82#endif
83
84 // Time cost recorded for the different parts of the join
85 double index_cost;
92
93private:
94 // Index
95 SetJoinParelledIndex invertedIndex;
96
97 int *prime_exp; // Array for storing prime numbers, presumably for hashing
98 bool **quickRef2D; // 2D quick reference array
99 bool **negRef2D; // 2D negative reference array
100
101 // Vectors storing range information (the groups are based on the size of the documents)
102 std::vector<std::pair<ui, ui>> range;
103 std::vector<ui> range_st;
104 std::vector<int> range_id, rangeIdQuery, rangeQueryAdd;
105
106 // The precalculated hash-value keys for the partitions and the one-deletions
107 std::vector<std::vector<ui>> parts_keys, partsKeysQuery;
108 std::vector<std::vector<ui>> onedelete_keys, oneDeleteKeysQuery;
109 std::vector<std::vector<ui>> odkeys_st, oDKeysStQuery; // Stores position of one deletion information
110
111 // Vectors needed when allocating with the greedy heap method
112 std::vector<ui> invPtrArr[MAXTHREADNUM];
113 std::vector<ui> intPtrArr[MAXTHREADNUM];
114 std::vector<std::vector<ui>> onePtrArr[MAXTHREADNUM];
115 std::vector<std::pair<int, ui>> valuesArr[MAXTHREADNUM]; // <value, loc>
116 std::vector<ui> scoresArr[MAXTHREADNUM];
117
118public:
119 // interchangeable value
120 // A must be query and B must be work
121 bool flagIC{false}; // indicate whether considering interchangeable value
122 std::vector<int> grpIdA, grpIdB; // default to -1
123 std::vector<std::vector<int>> groupA, groupB;
124 std::vector<ui> revIdMapA, revIdMapB;
125 std::vector<ui> idMapA, idMapB;
126 // copy from "fast_group"
127 double **featureValueCache{nullptr};
128 int *discreteCacheIdx{nullptr};
129
130public:
131 // Self-join
132 SetJoinParallel(const std::vector<std::vector<ui>> &sorted_records, const std::vector<double> &recwt,
133 const std::vector<double> &_wordwt, double _det, ui _maxHeapSize = 0,
134 bool _isWeightedComp = false)
135 : work_dataset(sorted_records), work_weights(recwt), wordwt(_wordwt), work_n(work_dataset.size()),
136 work_maxSize(work_dataset.back().size()), work_minSize(work_dataset.front().size()) {
137#if RESIZE_DATA == 1
138 if(_det <= 0.4 && work_maxSize >= 55) {
140 work_maxSize = work_dataset.back().size();
141 }
142#endif
143 printf("Work size: %u\n", work_n);
144 printf("Min record's size: %u\tMax record's size: %u\n", work_minSize, work_maxSize);
145
146 maxHeapSize = _maxHeapSize == 0 ? MAX_PAIR_SIZE : _maxHeapSize;
147#if MAINTAIN_VALUE == 1
148 isWeightedComp = _isWeightedComp;
149 for(int tid = 0; tid < MAXTHREADNUM; tid++)
150 result_pairs_[tid].reserve(maxHeapSize);
151#endif
152 }
153
154 // RS-join
155 SetJoinParallel(const std::vector<std::vector<ui>> &work_records, const std::vector<std::vector<ui>> &query_records,
156 const std::vector<double> &workwt, const std::vector<double> &querywt,
157 const std::vector<double> &_wordwt, double _det, ui _maxHeapSize = 0,
158 bool _isWeightedComp = false)
159 : ifRS(true), work_dataset(work_records), query_dataset(query_records),
160 work_weights(workwt), query_weights(querywt), wordwt(_wordwt),
161 work_n(work_dataset.size()), query_n(query_dataset.size()),
162 work_maxSize(work_dataset.back().size()), work_minSize(work_dataset.front().size()),
163 query_maxSize(query_dataset.back().size()), query_minSize(query_dataset.front().size()) {
164#if RESIZE_DATA == 1
165 if(_det <= 0.4 && work_maxSize >= 55) {
167 work_maxSize = work_dataset.back().size();
168 }
169 if(_det <= 0.4 && query_maxSize >= 55) {
171 query_maxSize = query_dataset.back().size();
172 }
173#endif
174 printf("Work size: %u\tQuery size: %u\n", work_n, query_n);
175 printf("Min work record's size: %u\tMax work record's size: %u\n", work_minSize, work_maxSize);
176 printf("Min query record's size: %u\tMax query record's size: %u\n", query_minSize, query_maxSize);
177
178 maxHeapSize = _maxHeapSize == 0 ? MAX_PAIR_SIZE : _maxHeapSize;
179#if MAINTAIN_VALUE == 1
180 isWeightedComp = _isWeightedComp;
181 for(int tid = 0; tid < MAXTHREADNUM; tid++)
182 result_pairs_[tid].reserve(maxHeapSize);
183#endif
184 }
185
186 // index's memory released in functions
187 ~SetJoinParallel() = default;
188
189public:
190 // Output the Parameters
191 void showPara() const {
192 std::string st = "";
193 switch(simFType) {
194 case SimFuncType::JACCARD : st = "jaccard"; break;
195 case SimFuncType::COSINE : st = "cosine"; break;
196 case SimFuncType::DICE : st = "dice"; break;
197 }
198 printf("type: %s det: %.4lf coe: %.8lf ALPHA: %.8lf maxIndexPartNum: %u \n", st.c_str(), det, coePart, ALPHA,
200 }
201
202 void resizeData(std::vector<std::vector<ui>> &dataset) {
203 // [\sigma - 2 * sd, \sigma + 2 * sd]
204 int unempty = 0;
205 double avergaeSize = 0.0;
206 for(const auto &rec : dataset) {
207 if(rec.empty()) continue;
208 avergaeSize += (double)rec.size() * 1.0;
209 ++ unempty;
210 }
211 avergaeSize = avergaeSize / (unempty * 1.0);
212 long double sd = 0.0;
213 for(const auto &rec : dataset) {
214 if(rec.empty()) continue;
215 sd += (rec.size() * 1.0 - avergaeSize) * (rec.size() * 1.0 - avergaeSize);
216 }
217 sd = sd / (unempty * 1.0);
218 printf("Resize dataset on 1sd, mean: %.1lf\tsd: %.1Lf\n", avergaeSize, sd);
219 int bound = ceil(avergaeSize + 1.0 * sd - 1e-5);
220 for(auto &rec : dataset)
221 if((int)rec.size() > bound)
222 rec.resize(bound);
223 }
224
226 double total_hash_cost = 0;
227 double total_memeory_cost = 0;
228 double total_find_cost = 0;
229 double total_alloc_cost = 0;
230 double total_verif_cost = 0;
231 double sum;
232 for (int i = 0; i < MAXTHREADNUM; i++) {
233 total_hash_cost += hashInFind_cost[i];
234 total_memeory_cost += mem_cost[i];
235 total_find_cost += find_cost[i];
236 total_alloc_cost += alloc_cost[i];
237 total_verif_cost += verif_cost[i];
238 }
239 sum = total_hash_cost + total_memeory_cost + total_find_cost + total_alloc_cost + total_verif_cost;
240 total_hash_cost = total_hash_cost / sum * search_cost;
241 total_memeory_cost = total_memeory_cost / sum * search_cost;
242 total_find_cost = total_find_cost / sum * search_cost;
243 total_alloc_cost = total_alloc_cost / sum * search_cost;
244 total_verif_cost = total_verif_cost / sum * search_cost;
245
246 printf("|index_cost| total_hash_cost| total_memeory_cost| find_cost| alloc_cost| verif_cost|\n");
247 printf("|%f|%f|%f|%f|%f|%f|\n", index_cost, total_hash_cost, total_memeory_cost, total_find_cost, total_alloc_cost, total_verif_cost);
248 }
249
251 std::vector<unsigned long long> range_size(range.size());
252 for (ui i = 0; i < range_id.size(); i++) {
253 range_size[range_id[i]] += work_dataset[i].size();
254 }
255
256 double total_size = 0;
257 double max_size = 0;
258 for (auto &size : range_size) {
259 total_size += size;
260 max_size = std::max(max_size, (double)size);
261 }
262
263 printf("Average Range size: %.3f Maximum Range size ratio %.3f \n", total_size / range_size.size(), max_size / total_size);
264 }
265
266 // Function to get the total number of result pairs found by all threads
267 unsigned long long getResultPairsAmount() {
268 unsigned long long pairs_amount = 0;
269 for (int i = 0; i < MAXTHREADNUM; i++) {
270 pairs_amount += result_pairs[i].size();
271 }
272 return pairs_amount;
273 }
274
// Collect the per-thread buffers into `finalPairs`.
//
// Steps, each controlled by a compile-time switch:
//  1. APPEND_EMPTY == 1: fill the per-thread `emptyPairs` buffers with pairs built
//     from the empty buckets (see the two branches below).
//  2. MAINTAIN_VALUE == 0: append `emptyPairs[t]` then `result_pairs[t]` for every
//     thread slot. MAINTAIN_VALUE == 1: append `emptyPairs[t]`, then the
//     (id1, id2) of every entry in `result_pairs_[t]`.
//  3. DEDUPLICATE == 1: sort `finalPairs` and abort the process (exit(1)) if any
//     duplicate pair is found.
// Finally the size of `workEmpty` is printed to stdout.
//
// `finalPairs` is appended to; it is not cleared first.
275 void mergeResults(std::vector<std::pair<int, int>> &finalPairs) {
276#if APPEND_EMPTY == 1
// Self-join: every unordered pair (i, j), i < j, of positions in `workEmpty`.
// NOTE(review): the pairs store the loop positions i / j, not the values held in
// `workEmpty` — confirm that callers expect bucket positions here.
277 if(!ifRS) {
278#pragma omp parallel for
279 for(ui i = 0; i < workEmpty.size(); i++) {
280 int tid = omp_get_thread_num();
// A thread that already overflowed its buffer skips its remaining iterations.
281 if(earlyTerminatedEmpty[tid] == 1)
282 continue;
283
284 for(ui j = i + 1; j < workEmpty.size(); j++)
285 emptyPairs[tid].emplace_back(i, j);
// The limit is checked only after the inner loop, so the buffer can exceed
// MAX_EMPTY_SIZE by up to one full inner loop.
286 if(emptyPairs[tid].size() > MAX_EMPTY_SIZE)
287 earlyTerminatedEmpty[tid] = 1;
288 }
289 }
// RS-join: the full cross product of positions, emitted as
// (position in `queryEmpty`, position in `workEmpty`).
290 else {
291#pragma omp parallel for
292 for(ui j = 0; j < workEmpty.size(); j++) {
293 int tid = omp_get_thread_num();
294 if(earlyTerminatedEmpty[tid] == 1)
295 continue;
296
297 for(ui i = 0; i < queryEmpty.size(); i++)
298 emptyPairs[tid].emplace_back(i, j);
299 if(emptyPairs[tid].size() > MAX_EMPTY_SIZE)
300 earlyTerminatedEmpty[tid] = 1;
301 }
302 }
303#endif
304
305 std::cout << "Start merging" << std::endl << std::flush;
// Concatenate thread slot by thread slot: empty-bucket pairs first, then join results.
306#if MAINTAIN_VALUE == 0
307 for(int i = 0; i < MAXTHREADNUM; i++) {
308 finalPairs.insert(finalPairs.end(), emptyPairs[i].begin(), emptyPairs[i].end());
309 finalPairs.insert(finalPairs.end(), result_pairs[i].begin(), result_pairs[i].end());
310 }
311#elif MAINTAIN_VALUE == 1
312 for(int i = 0; i < MAXTHREADNUM; i++) {
313 finalPairs.insert(finalPairs.end(), emptyPairs[i].begin(), emptyPairs[i].end());
314 // finalPairs.insert(finalPairs.end(), result_pairs_[i].begin(), result_pairs_[i].end());
// Only the two ids of each WeightPair are kept in the merged output.
315 for(const auto &wp : result_pairs_[i])
316 finalPairs.emplace_back(wp.id1, wp.id2);
317 }
318#endif
319
320
// Sanity check: duplicates are treated as a fatal error, they are not removed.
321#if DEDUPLICATE == 1
322 sort(finalPairs.begin(), finalPairs.end());
323 auto it = unique(finalPairs.begin(), finalPairs.end());
324 if(it != finalPairs.end()) {
325 std::cerr << "Duplicate results: " << distance(it, finalPairs.end()) << std::endl;
326 exit(1);
327 }
328#endif
329 std::cout << workEmpty.size() << std::endl << std::flush;
330 }
331
332public:
333 // sim funcs
334 bool overlapSelf(ui x, ui y, int posx = 0, int posy = 0, int current_overlap = 0) {
335 // Calculate required overlap based on a formula
336 int require_overlap = 0;
337 switch(simFType) {
338 case SimFuncType::JACCARD : {
339 require_overlap = ceil(det / (1 + det) * (int)(work_dataset[x].size() + work_dataset[y].size()) - EPS);
340 break;
341 }
342 case SimFuncType::COSINE : {
343 require_overlap = ceil(1.0 * det * sqrt(work_dataset[x].size() * work_dataset[y].size()) - EPS);
344 break;
345 }
346 case SimFuncType::DICE : {
347 require_overlap = ceil(0.5 * det * (int)(work_dataset[x].size() + work_dataset[y].size()) - EPS);
348 break;
349 }
350 }
351
352 // Loop through both sets to find overlap
353 while (posx < (int)work_dataset[x].size() && posy < (int)work_dataset[y].size()) {
354 // Check if remaining elements are sufficient for required overlap
355 if ((int)work_dataset[x].size() - posx + current_overlap < require_overlap || (int)work_dataset[y].size() - posy + current_overlap < require_overlap)
356 return false;
357
358 if (work_dataset[x][posx] == work_dataset[y][posy]) {
359 current_overlap++;
360 posx++;
361 posy++;
362 } else if (work_dataset[x][posx] < work_dataset[y][posy]) {
363 posx++;
364 } else {
365 posy++;
366 }
367 }
368
369 return current_overlap >= require_overlap;
370 }
371
372 bool overlapSelfIC(ui x, ui y, int posx = 0, int posy = 0, int current_overlap = 0) {
373 ui revIdx = idMapA[x];
374 ui revIdy = idMapA[y];
375 int grpIdX = grpIdA[revIdx];
376 int grpIdY = grpIdA[revIdy];
377
378 if(grpIdX == -1 && grpIdY == -1)
379 return overlapSelf(x, y, posx, posy, current_overlap);
380 else if(grpIdX != -1 && grpIdY == -1) {
381 for(const auto &icid : groupA[grpIdX]) {
382 bool success = overlapSelf(revIdMapA[icid], y, posx, posy, current_overlap);
383 if(success)
384 return true;
385 }
386 }
387 else if(grpIdX == -1 && grpIdY != -1) {
388 for(const auto &icid: groupA[grpIdY]) {
389 bool success = overlapSelf(x, revIdMapA[icid], posx, posy, current_overlap);
390 if(success)
391 return true;
392 }
393 }
394 else {
395 int dcIdxX = discreteCacheIdx[grpIdX];
396 int dcIdxY = discreteCacheIdx[grpIdY];
397 double val = featureValueCache[dcIdxX][dcIdxY];
398 return val >= det;
399 }
400
401 return false;
402 }
403
404 bool overlapRS(ui x, ui y, int posx = 0, int posy = 0, int current_overlap = 0) {
405 // Calculate required overlap based on a formula
406 int require_overlap = 0;
407 switch(simFType) {
408 case SimFuncType::JACCARD : {
409 require_overlap = ceil(det / (1 + det) * (int)(query_dataset[x].size() + work_dataset[y].size()) - EPS);
410 break;
411 }
412 case SimFuncType::COSINE : {
413 require_overlap = ceil(1.0 * det * sqrt(query_dataset[x].size() * work_dataset[y].size()) - EPS);
414 break;
415 }
416 case SimFuncType::DICE : {
417 require_overlap = ceil(0.5 * det * (int)(query_dataset[x].size() + work_dataset[y].size()) - EPS);
418 break;
419 }
420 }
421
422 // Loop through both sets to find overlap
423 while (posx < (int)query_dataset[x].size() && posy < (int)work_dataset[y].size()) {
424 // Check if remaining elements are sufficient for required overlap
425 if ((int)query_dataset[x].size() - posx + current_overlap < require_overlap || (int)work_dataset[y].size() - posy + current_overlap < require_overlap)
426 return false;
427
428 if (query_dataset[x][posx] == work_dataset[y][posy]) {
429 current_overlap++;
430 posx++;
431 posy++;
432 } else if (query_dataset[x][posx] < work_dataset[y][posy]) {
433 posx++;
434 } else {
435 posy++;
436 }
437 }
438 return current_overlap >= require_overlap;
439 }
440
441 bool overlapRSIC(ui x, ui y, int posx = 0, int posy = 0, int current_overlap = 0) {
442 ui revIdx = idMapA[x];
443 ui revIdy = idMapB[y];
444 int grpIdX = grpIdA[revIdx];
445 int grpIdY = grpIdB[revIdy];
446
447 if(grpIdX == -1 && grpIdY == -1)
448 return overlapRS(x, y, posx, posy, current_overlap);
449 else if(grpIdX != -1 && grpIdY == -1) {
450 for(const auto &icid : groupA[grpIdX]) {
451 bool success = overlapRS(revIdMapA[icid], y, posx, posy, current_overlap);
452 if(success)
453 return true;
454 }
455 }
456 else if(grpIdX == -1 && grpIdY != -1) {
457 for(const auto &icid: groupB[grpIdY]) {
458 bool success = overlapRS(x, revIdMapB[icid], posx, posy, current_overlap);
459 if(success)
460 return true;
461 }
462 }
463 else {
464 int dcIdxX = discreteCacheIdx[grpIdX];
465 int dcIdxY = discreteCacheIdx[grpIdY];
466 double val = featureValueCache[dcIdxX][dcIdxY];
467 return val >= det;
468 }
469
470 return false;
471 }
472
473 // weighted sim funcs
474 double weightedOverlap(ui x, ui y) {
475 const auto &records1 = ifRS ? query_dataset[x] : work_dataset[x];
476 const auto &records2 = work_dataset[y];
477
478 std::vector<ui> res;
479 set_intersection(records1.begin(), records1.end(),
480 records2.begin(), records2.end(),
481 std::back_inserter(res));
482
483 double ovlp = 0.0;
484 for(const auto &e : res)
485 ovlp += wordwt[e];
486
487 return ovlp;
488 }
489
490 double weightedJaccard(ui x, ui y) {
491 double ovlp = weightedOverlap(x, y);
492 double rw1 = ifRS ? query_weights[x] : work_weights[x];
493 double rw2 = work_weights[y];
494
495 assert(std::abs(rw1) > 1e-7 && std::abs(rw2) > 1e-7);
496
497 return ovlp / (rw1 + rw2 - ovlp);
498 }
499 double jaccard(ui x, ui y) {
500 const auto &records1 = ifRS ? query_dataset[x] : work_dataset[x];
501 const auto &records2 = work_dataset[y];
502
503 std::vector<ui> res;
504 set_intersection(records1.begin(), records1.end(),
505 records2.begin(), records2.end(),
506 std::back_inserter(res));
507 int ovlp = (int)res.size();
508
509 return ovlp * 1.0 / (records1.size() + records2.size() - ovlp) * 1.0;
510 }
511
512 double weightedCosine(ui x, ui y) {
513 double ovlp = weightedOverlap(x, y);
514 double rw1 = ifRS ? query_weights[x] : work_weights[x];
515 double rw2 = work_weights[y];
516
517 assert(std::abs(rw1) > 1e-7 && std::abs(rw2) > 1e-7);
518
519 return ovlp / sqrt(rw1 * rw2);
520 }
521 double cosine(ui x, ui y) {
522 const auto &records1 = ifRS ? query_dataset[x] : work_dataset[x];
523 const auto &records2 = work_dataset[y];
524
525 std::vector<ui> res;
526 set_intersection(records1.begin(), records1.end(),
527 records2.begin(), records2.end(),
528 std::back_inserter(res));
529 int ovlp = (int)res.size();
530
531 return ovlp * 1.0 / sqrt(records1.size() * records2.size()) * 1.0;
532 }
533
534 double weightedDice(ui x, ui y) {
535 double ovlp = weightedOverlap(x, y);
536 double rw1 = ifRS ? query_weights[x] : work_weights[x];
537 double rw2 = work_weights[y];
538
539 assert(std::abs(rw1) > 1e-7 && std::abs(rw2) > 1e-7);
540
541 return 2.0 * ovlp / (rw1 + rw2);
542 }
543 double dice(ui x, ui y) {
544 const auto &records1 = ifRS ? query_dataset[x] : work_dataset[x];
545 const auto &records2 = work_dataset[y];
546
547 std::vector<ui> res;
548 set_intersection(records1.begin(), records1.end(),
549 records2.begin(), records2.end(),
550 std::back_inserter(res));
551 int ovlp = (int)res.size();
552
553 return ovlp * 2.0 / (records1.size() + records2.size()) * 1.0;
554 }
555
556public:
557 // join steps
558 // Function to build index
559 void index(double threshold);
560
561 // Function to find candidate and similar pairs using a greedy approach
562 void GreedyFindCandidateAndSimPairs(const int &tid, const int indexLenGrp, const ui rid,
563 ui record_length, const std::vector<ui> &p_keys,
564 const std::vector<ui> &od_keys, const std::vector<ui> &odk_st);
565
566 // Function to find similar pairs
567 void findSimPairsSelf();
568 void findSimPairsRS();
569};
570
571
572#endif
Definition setjoin_parallel.h:34
bool flagIC
Definition setjoin_parallel.h:121
std::vector< std::vector< ui > > work_dataset
Definition setjoin_parallel.h:54
bool overlapRSIC(ui x, ui y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin_parallel.h:441
std::vector< WeightPair > result_pairs_[MAXTHREADNUM]
Definition setjoin_parallel.h:80
ui maxHeapSize
Definition setjoin_parallel.h:73
std::vector< int > grpIdA
Definition setjoin_parallel.h:122
std::vector< std::pair< int, int > > result_pairs[MAXTHREADNUM]
Definition setjoin_parallel.h:76
void findSimPairsSelf()
Definition setjoin_parallel.cc:791
std::vector< double > wordwt
Definition setjoin_parallel.h:58
std::vector< ui > revIdMapB
Definition setjoin_parallel.h:124
void index(double threshold)
Definition setjoin_parallel.cc:11
double weightedJaccard(ui x, ui y)
Definition setjoin_parallel.h:490
double verif_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:91
double ALPHA
Definition setjoin_parallel.h:68
double mem_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:88
ui work_n
Definition setjoin_parallel.h:69
double(SetJoinParallel::* normalFunc)(ui, ui)
Definition setjoin_parallel.h:44
std::vector< ui > idMapA
Definition setjoin_parallel.h:125
bool ifRS
Definition setjoin_parallel.h:36
void reportTimeCost()
Definition setjoin_parallel.h:225
double weightedDice(ui x, ui y)
Definition setjoin_parallel.h:534
double find_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:89
std::vector< std::pair< int, int > > emptyPairs[MAXTHREADNUM]
Definition setjoin_parallel.h:77
ui work_minSize
Definition setjoin_parallel.h:71
SetJoinParallel(const std::vector< std::vector< ui > > &sorted_records, const std::vector< double > &recwt, const std::vector< double > &_wordwt, double _det, ui _maxHeapSize=0, bool _isWeightedComp=false)
Definition setjoin_parallel.h:132
double ** featureValueCache
Definition setjoin_parallel.h:127
double dice(ui x, ui y)
Definition setjoin_parallel.h:543
std::vector< std::vector< int > > groupB
Definition setjoin_parallel.h:123
std::vector< std::vector< ui > > query_dataset
Definition setjoin_parallel.h:55
std::vector< int > grpIdB
Definition setjoin_parallel.h:122
std::vector< ui > workLength
Definition setjoin_parallel.h:63
ui query_maxSize
Definition setjoin_parallel.h:72
double search_cost
Definition setjoin_parallel.h:86
~SetJoinParallel()=default
int earlyTerminated[MAXTHREADNUM]
Definition setjoin_parallel.h:37
std::vector< ui > queryEmpty
Definition setjoin_parallel.h:61
double jaccard(ui x, ui y)
Definition setjoin_parallel.h:499
double alloc_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:90
void mergeResults(std::vector< std::pair< int, int > > &finalPairs)
Definition setjoin_parallel.h:275
void GreedyFindCandidateAndSimPairs(const int &tid, const int indexLenGrp, const ui rid, ui record_length, const std::vector< ui > &p_keys, const std::vector< ui > &od_keys, const std::vector< ui > &odk_st)
Definition setjoin_parallel.cc:427
bool overlapSelfIC(ui x, ui y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin_parallel.h:372
std::vector< ui > idMapB
Definition setjoin_parallel.h:125
std::string typeMap[3]
Definition setjoin_parallel.h:42
double coePart
Definition setjoin_parallel.h:67
double cosine(ui x, ui y)
Definition setjoin_parallel.h:521
std::vector< double > work_weights
Definition setjoin_parallel.h:56
bool overlapSelf(ui x, ui y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin_parallel.h:334
SetJoinParallel(const std::vector< std::vector< ui > > &work_records, const std::vector< std::vector< ui > > &query_records, const std::vector< double > &workwt, const std::vector< double > &querywt, const std::vector< double > &_wordwt, double _det, ui _maxHeapSize=0, bool _isWeightedComp=false)
Definition setjoin_parallel.h:155
void resizeData(std::vector< std::vector< ui > > &dataset)
Definition setjoin_parallel.h:202
double coe
Definition setjoin_parallel.h:66
double(SetJoinParallel::* weightedFunc)(ui, ui)
Definition setjoin_parallel.h:43
std::vector< std::vector< int > > groupA
Definition setjoin_parallel.h:123
void reportLargestGroup()
Definition setjoin_parallel.h:250
std::vector< double > query_weights
Definition setjoin_parallel.h:57
uint64_t candidateNum
Definition setjoin_parallel.h:49
bool(SetJoinParallel::* overlapFunc)(ui, ui, int, int, int)
Definition setjoin_parallel.h:45
ui maxIndexPartNum
Definition setjoin_parallel.h:51
int * discreteCacheIdx
Definition setjoin_parallel.h:128
std::vector< ui > workEmpty
Definition setjoin_parallel.h:60
SimFuncType simFType
Definition setjoin_parallel.h:41
unsigned long long getResultPairsAmount()
Definition setjoin_parallel.h:267
int earlyTerminatedEmpty[MAXTHREADNUM]
Definition setjoin_parallel.h:38
double weightedOverlap(ui x, ui y)
Definition setjoin_parallel.h:474
ui work_maxSize
Definition setjoin_parallel.h:71
ui query_n
Definition setjoin_parallel.h:70
double index_cost
Definition setjoin_parallel.h:85
uint64_t resultNum
Definition setjoin_parallel.h:48
bool isWeightedComp
Definition setjoin_parallel.h:79
double weightedCosine(ui x, ui y)
Definition setjoin_parallel.h:512
int isHeap[MAXTHREADNUM]
Definition setjoin_parallel.h:81
void findSimPairsRS()
Definition setjoin_parallel.cc:984
std::vector< ui > revIdMapA
Definition setjoin_parallel.h:124
double hashInFind_cost[MAXTHREADNUM]
Definition setjoin_parallel.h:87
void showPara() const
Definition setjoin_parallel.h:191
uint64_t listlens
Definition setjoin_parallel.h:50
bool overlapRS(ui x, ui y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin_parallel.h:404
double det
Definition setjoin_parallel.h:47
ui query_minSize
Definition setjoin_parallel.h:72
#define MAX_PAIR_SIZE
Definition config.h:44
#define MAXTHREADNUM
Definition config.h:38
#define EPS
Definition config.h:78
#define MAX_EMPTY_SIZE
Definition config.h:86
Definition index.h:61
SimFuncType
Definition type.h:48
unsigned int ui
Definition type.h:8