Entity Matching by Similarity Join
 
Loading...
Searching...
No Matches
stringjoin_parallel.h
Go to the documentation of this file.
1/*
2 * author: Dong Deng
3 * modified: Yunqi Li
4 * contact: liyunqixa@gmail.com
5 */
6#ifndef _STRING_JOIN_PARALLEL_H_
7#define _STRING_JOIN_PARALLEL_H_
8
9#include "config.h"
10#include "joinutil.h"
11#include "index.h"
12#include "simfunc.h"
13#ifdef EDLIB_INSTALLED
14#include "edlib.h"
15#endif
16#include <iostream>
17#include <unordered_map>
18#include <unordered_set>
19#include <algorithm>
20#include <cmath>
21#include <omp.h>
22#include <immintrin.h>
23#include <assert.h>
24
25
27{
28public:
29 using InvListsParallel = std::unordered_map<uint64_t, std::vector<int>>;
30 using InvListPrefix = std::unordered_map<uint64_t, std::vector<std::pair<int, int>>>;
31
32#if TIMER_ON == 1
33public:
34 double indexProbingCost[MAXTHREADNUM]{0};
35 double verifyingCost[MAXTHREADNUM]{0};
36 uint64_t globalTimerCount{0};
37#endif
38
39public:
40 int sharePrefix{0}; // approximate
41 std::vector<uint64_t> workPrefixHash;
42 std::vector<uint64_t> queryPrefixHash;
44
45public:
46 int workMinDictLen{19260817};
48 int queryMinDictLen{19260817};
50 int maxDictLen{0};
51 int minDictLen{0};
52 uint64_t avgDictLen{0};
53 int workN{0};
54 int queryN{0};
55 int N{0};
56 int D{0}; // threshold
57 int PN{0}; // part num
58 int hashNumber{31}; // prime number for hashing
59 int modNumber{1000000007}; // prime mod number for hashing
60 std::vector<std::string> work_dataset; // work dataset
61 std::vector<std::string> query_dataset; // query dataset, null if self-join
62 std::vector<std::pair<int, int>> pairs[MAXTHREADNUM];
63
64 uint64_t candNum{0}; // number of candidate pairs
65 uint64_t veriNum{0}; // number of pairs passed left verification
66 uint64_t listNum{0}; // number of inverted list matched
67 uint64_t realNum{0}; // number of results
68 bool valid[MAXTHREADNUM]; // used for early termination
70 int right[MAXTHREADNUM]; // verify range of left part: [D - left, D + right]
72 int _right[MAXTHREADNUM]; // verify range of right part: [D - _left, D + _right]
73
74 int **matrix[MAXTHREADNUM]; // matrix used to verify left parts
75 int **_matrix[MAXTHREADNUM]; // matrix used to verify right parts
76 bool *quickRef[MAXTHREADNUM]; // record results to avoid duplicate verification
77 int **partLen{nullptr}; // record length of all partitions
78 int **partPos{nullptr}; // record start position of all partitions
79 int *dist{nullptr}; // id of string with different length to its former one
80 std::vector<int> workLengthArray; // lengths of work records
81 std::vector<int> queryLengthArray;
82 std::vector<std::vector<int>> worklengthMap; // inverted index -> lenght: {ids}
83 std::vector<std::vector<int>> querylengthMap;
84 uint64_t *power{nullptr}; // the power of hash number, used to hash segment and substring
85 InvListsParallel **invLists{nullptr}; // inverted index of different string length and different segments
87 std::vector<PIndex> **partIndex{nullptr}; // the index record with pair of substring and segment should be checked
89 // added index
90 std::unordered_set<std::string> strCount; // count total different strings
91 int *workInvSC{nullptr}; // inverted index for strCount
92 int *queryInvSC{nullptr};
93
95#if MAINTAIN_VALUE_EDIT == 1
96 std::vector<WeightPairEdit> result_pairs_[MAXTHREADNUM];
97 int isHeap[MAXTHREADNUM] = { 0 };
98#endif
99
100public:
102
103 StringJoinParallel(const std::vector<std::string>& data, int threshold, ui _maxHeapSize = 0)
104 : workN(data.size()), D(threshold), PN(threshold + 1), work_dataset(data) {
105#if DROP_EMPTY == 1
106 printf("Drop empty strings for string(lev) join\n");
107 auto wit = work_dataset.begin();
108 for( ; wit != work_dataset.end(); ) {
109 if((*wit).empty())
110 wit = work_dataset.erase(wit);
111 else
112 ++ wit;
113 }
114 workN = work_dataset.size();
115#endif
116 for(const auto &wit : work_dataset)
117 if(strCount.find(wit) == strCount.end())
118 strCount.insert(wit);
119 for(int i = 0; i < workN; i++) {
120 workMaxDictLen = (int)work_dataset[i].size() > workMaxDictLen ? (int)work_dataset[i].size()
122 workMinDictLen = (int)work_dataset[i].size() < workMinDictLen ? (int)work_dataset[i].size()
124 avgDictLen += work_dataset[i].size();
125 }
126 std::sort(work_dataset.begin(), work_dataset.end(), StringJoinUtil::strLessT);
127
130 N = workN;
131 avgDictLen /= workN;
132
133 printf("Work size: %zu\tQuery size: %zu\tAvg length: %lu\n", work_dataset.size(), query_dataset.size(), avgDictLen);
134 printf("Min work record's size: %u\tMax work record's size: %u\n", workMinDictLen, workMaxDictLen);
135 printf("Number of tokens: %ld\n", strCount.size());
136
137#if APPROXIMATE == 1
138 sharePrefix = avgDictLen / 10 + 1;
139 if(sharePrefix == 0) ++ sharePrefix;
140 printf("Trigger approximate join on sharing prefix: %d\n", sharePrefix);
141#endif
142
143 maxHeapSize = _maxHeapSize == 0 ? MAX_PAIR_SIZE : _maxHeapSize;
144#if MAINTAIN_VALUE_EDIT == 1
145 for(int tid = 0; tid < MAXTHREADNUM; tid++)
146 result_pairs_[tid].reserve(maxHeapSize);
147#endif
148 }
149
150 StringJoinParallel(const std::vector<std::string>& work, const std::vector<std::string>& query, int threshold,
151 ui _maxHeapSize = 0)
152 : workN(work.size()), queryN(query.size()), D(threshold), PN(threshold + 1),
153 work_dataset(work), query_dataset(query) {
154#if DROP_EMPTY == 1
155 printf("Drop empty strings for string(lev) join\n");
156 // work
157 auto wit = work_dataset.begin();
158 for( ; wit != work_dataset.end(); ) {
159 if((*wit).empty())
160 wit = work_dataset.erase(wit);
161 else
162 ++ wit;
163 }
164 workN = work_dataset.size();
165 // query
166 auto qit = query_dataset.begin();
167 for( ; qit != query_dataset.end(); ) {
168 if((*qit).empty())
169 qit = query_dataset.erase(qit);
170 else
171 ++ qit;
172 }
173 queryN = query_dataset.size();
174#endif
175 for(const auto &wit : work_dataset)
176 if(strCount.find(wit) == strCount.end())
177 strCount.insert(wit);
178 for(const auto &qit : query_dataset)
179 if(strCount.find(qit) == strCount.end())
180 strCount.insert(qit);
181 for(int i = 0; i < workN; i++) {
182 workMaxDictLen = (int)work_dataset[i].size() > workMaxDictLen ? (int)work_dataset[i].size()
184 workMinDictLen = (int)work_dataset[i].size() < workMinDictLen ? (int)work_dataset[i].size()
186 avgDictLen += work_dataset[i].size();
187 }
188 for(int i = 0; i < queryN; i++) {
189 queryMaxDictLen = (int)query_dataset[i].size() > queryMaxDictLen ? (int)query_dataset[i].size()
191 queryMinDictLen = (int)query_dataset[i].size() < queryMinDictLen ? (int)query_dataset[i].size()
193 avgDictLen += query_dataset[i].size();
194 }
195 std::sort(work_dataset.begin(), work_dataset.end(), StringJoinUtil::strLessT);
196 std::sort(query_dataset.begin(), query_dataset.end(), StringJoinUtil::strLessT);
197
200 N = std::max(workN, queryN);
201 avgDictLen /= (workN + queryN);
202
203 printf("Work size: %zu\tQuery size: %zu\tAvg length: %lu\n", work_dataset.size(), query_dataset.size(), avgDictLen);
204 printf("Min work record's size: %u\tMax work record's size: %u\n", workMinDictLen, workMaxDictLen);
205 printf("Min query record's size: %u\tMax query record's size: %u\n", queryMinDictLen, queryMaxDictLen);
206 printf("Number of tokens: %ld\n", strCount.size());
207
208#if APPROXIMATE == 1
209 sharePrefix = avgDictLen - 2 * D;
210 if(sharePrefix == 0) ++ sharePrefix;
211 printf("Trigger approximate join on sharing prefix: %d\n", sharePrefix);
212#endif
213
214 maxHeapSize = _maxHeapSize == 0 ? MAX_PAIR_SIZE : _maxHeapSize;
215#if MAINTAIN_VALUE_EDIT == 1
216 for(int tid = 0; tid < MAXTHREADNUM; tid++)
217 result_pairs_[tid].reserve(maxHeapSize);
218#endif
219 }
220
222 for (int lp = 0; lp < PN; lp++) {
223 delete[] partPos[lp];
224 delete[] partLen[lp];
225 delete[] invLists[lp];
226 delete[] partIndex[lp];
227 }
228 delete[] partPos[PN];
229
230 for(ui i = 0; i < MAXTHREADNUM; i++) {
231 for (int lp = maxDictLen; lp >= 0; lp--) {
232 delete[] matrix[i][lp];
233 delete[] _matrix[i][lp];
234 }
235 delete[] matrix[i];
236 delete[] _matrix[i];
237 delete[] quickRef[i];
238 delete[] hv[i];
239 }
240
241 delete[] dist;
242 delete[] power;
243 delete[] partPos;
244 delete[] partLen;
245 delete[] invLists;
246 delete[] partIndex;
247 delete[] workInvSC;
248 delete[] queryInvSC;
249
250 std::cout << "destructor" << std::endl << std::flush;
251 }
252
255
256public:
257 // pre-process
258 void init();
259 void prepareSelf();
260 void prepareRS();
261
262 // verify
263 bool verifyLeftPartSelf(int xid, int yid, int xlen, int ylen, int Tau, int tid, int sharing=0) {
264 const auto &workString = work_dataset[xid];
265 const auto &queryString = work_dataset[yid];
266 auto &currM = matrix[tid];
267
268 for (int i = sharing + 1; i <= xlen; i++) {
269 valid[tid] = 0;
270
271 if (i <= left[tid]) {
272 currM[i][D - i] = i;
273 valid[tid] = 1;
274 }
275
276 int val1 = i - left[tid];
277 int val2 = i + right[tid];
278 int lowerBound = std::max(val1, 1);
279 int upperBound = std::min(val2, ylen);
280
281 for (int j = lowerBound; j <= upperBound; j++) {
282 int val = j - i + D;
283
284 if (workString[i - 1] == queryString[j - 1])
285 currM[i][val] = currM[i - 1][val];
286 else
287 currM[i][val] = StringJoinUtil::min(currM[i - 1][val],
288 j - 1 >= val1 ? currM[i][val - 1] : D,
289 j + 1 <= val2 ? currM[i - 1][val + 1] : D) + 1;
290
291 if (abs(xlen - ylen - i + j) + currM[i][val] <= Tau)
292 valid[tid] = 1;
293 }
294
295 if (!valid[tid])
296 return false;
297 }
298
299 return currM[xlen][ylen - xlen + D] <= Tau;
300 }
301
302 bool verifyRightPartSelf(int xid, int yid, int xlen, int ylen, int xpos, int ypos, int Tau, int tid, int sharing=0) {
303 const auto &workString = work_dataset[xid];
304 const auto &queryString = work_dataset[yid];
305 auto &currM = _matrix[tid];
306
307 for (int i = sharing + 1; i <= xlen; i++) {
308 valid[tid] = 0;
309
310 if (i <= _left[tid]) {
311 currM[i][D - i] = i;
312 valid[tid] = 1;
313 }
314
315 int val1 = i - _left[tid];
316 int val2 = i + _right[tid];
317 int lowerBound = std::max(val1, 1);
318 int upperBound = std::min(val2, ylen);
319
320 for (int j = lowerBound; j <= upperBound; j++) {
321 int val = j - i + D;
322
323 if (workString[xpos + i - 1] == queryString[ypos + j - 1])
324 currM[i][val] = currM[i - 1][val];
325 else
326 currM[i][val] = StringJoinUtil::min(currM[i - 1][val],
327 j - 1 >= val1 ? currM[i][val - 1] : D,
328 j + 1 <= val2 ? currM[i - 1][val + 1] : D) + 1;
329
330 if (abs(xlen - ylen - i + j) + currM[i][val] <= Tau)
331 valid[tid] = 1;
332 }
333
334 if (!valid[tid])
335 return false;
336 }
337
338 return currM[xlen][ylen - xlen + D] <= Tau;
339 }
340
341 bool verifyLeftPartRS(int xid, int yid, int xlen, int ylen, int Tau, int tid, int sharing=0) {
342#if TIMER_ON == 1
343 timeval begin, end;
344 gettimeofday(&begin, NULL);
345#endif
346 const auto &workString = work_dataset[xid];
347 const auto &queryString = query_dataset[yid];
348 auto &currM = matrix[tid];
349
350 for (int i = sharing + 1; i <= xlen; i++) {
351 valid[tid] = 0;
352
353 if (i <= left[tid]) {
354 currM[i][D - i] = i;
355 valid[tid] = 1;
356 }
357
358 int val1 = i - left[tid];
359 int val2 = i + right[tid];
360 int lowerBound = std::max(val1, 1);
361 int upperBound = std::min(val2, ylen);
362
363 for (int j = lowerBound; j <= upperBound; j++) {
364 int val = j - i + D;
365
366 if (workString[i - 1] == queryString[j - 1])
367 currM[i][val] = currM[i - 1][val];
368 else
369 currM[i][val] = StringJoinUtil::min(currM[i - 1][val],
370 j - 1 >= val1 ? currM[i][val - 1] : D,
371 j + 1 <= val2 ? currM[i - 1][val + 1] : D) + 1;
372
373 if (abs(xlen - ylen - i + j) + currM[i][val] <= Tau)
374 valid[tid] = 1;
375 }
376
377 if (!valid[tid])
378 return false;
379 }
380
381#if TIMER_ON == 1
382 gettimeofday(&end, NULL);
383 double elapsedTime = end.tv_sec - begin.tv_sec + (end.tv_usec - begin.tv_usec) / 1e6;
384 verifyingCost[tid] += elapsedTime;
385#endif
386
387 return currM[xlen][ylen - xlen + D] <= Tau;
388 }
389
390 bool verifyRightPartRS(int xid, int yid, int xlen, int ylen, int xpos, int ypos, int Tau, int tid, int sharing=0) {
391#if TIMER_ON == 1
392 timeval begin, end;
393 gettimeofday(&begin, NULL);
394#endif
395 const auto &workString = work_dataset[xid];
396 const auto &queryString = query_dataset[yid];
397 auto &currM = _matrix[tid];
398
399 for (int i = sharing + 1; i <= xlen; i++) {
400 valid[tid] = 0;
401
402 if (i <= _left[tid]) {
403 currM[i][D - i] = i;
404 valid[tid] = 1;
405 }
406
407 int val1 = i - _left[tid];
408 int val2 = i + _right[tid];
409 int lowerBound = std::max(val1, 1);
410 int upperBound = std::min(val2, ylen);
411
412 for (int j = lowerBound; j <= upperBound; j++) {
413 int val = j - i + D;
414
415 if (workString[xpos + i - 1] == queryString[ypos + j - 1])
416 currM[i][val] = currM[i - 1][val];
417 else
418 currM[i][val] = StringJoinUtil::min(currM[i - 1][val],
419 j - 1 >= val1 ? currM[i][val - 1] : D,
420 j + 1 <= val2 ? currM[i - 1][val + 1] : D) + 1;
421
422 if (abs(xlen - ylen - i + j) + currM[i][val] <= Tau)
423 valid[tid] = 1;
424 }
425
426 if (!valid[tid])
427 return false;
428 }
429
430#if TIMER_ON == 1
431 gettimeofday(&end, NULL);
432 double elapsedTime = end.tv_sec - begin.tv_sec + (end.tv_usec - begin.tv_usec) / 1e6;
433 verifyingCost[tid] += elapsedTime;
434#endif
435
436 return currM[xlen][ylen - xlen + D] <= Tau;
437 }
438
439 // proposed in extended version in TODS'13
440 bool iterativeVerifyLeftPartRS(int xid, int yid, int stPos, int xlen, int ylen, int wlen, int taul, int partId) {
441 int xidx = xlen - 1;
442 int yidx = ylen - 1;
443 while(work_dataset[xid][xidx] == query_dataset[yid][yidx] && yidx >= 0 && xidx >= stPos) {
444 -- xidx;
445 -- yidx;
446 }
447
448 // the second from last segment must have length >= 1
449 if(xidx <= stPos)
450 return true;
451 int addPos = stPos;
452 int addLen = xidx - stPos;
453 int addId = partId - 1;
454 int delta = std::abs(ylen - addLen);
455 int selectStart = std::max(addPos - addId, addPos + delta - (taul - addId));
456 int selectEnd = std::min(addPos + addId, addPos + delta + (taul - addId));
457 bool found = false;
458
459 for (int i = selectStart; stPos <= selectEnd; i++) {
460 bool equal = true;
461 for(int l = 0; l < addLen; l++) {
462 if(work_dataset[xid][i + l] != query_dataset[yid][addPos + l]) {
463 equal = false;
464 break;
465 }
466 }
467 if(equal) {
468 found = true;
469 if(i == selectStart && addId >= 1)
470 return iterativeVerifyLeftPartRS(xid, yid, partPos[addId - 1][wlen], addPos, selectStart, wlen, taul - 1, addId);
471 else
472 return true;
473 }
474 }
475
476 if(found == false)
477 return false;
478
479 return true;
480 }
481
/**
 * Right-part counterpart of the iterative verification.
 * Currently a placeholder: every argument is ignored and every candidate is
 * accepted.
 *
 * @return always true.
 */
bool iterativeVerifyRightPartRS(int xid, int yid, int xpos, int ypos, int xlen, int ylen, int partId, int tid) {
    // Mark the arguments as intentionally unused.
    (void)xid;
    (void)yid;
    (void)xpos;
    (void)ypos;
    (void)xlen;
    (void)ylen;
    (void)partId;
    (void)tid;
    const bool accepted = true;
    return accepted;
}
485
486 // join
487 void selfJoin(std::vector<std::pair<int, int>> &finalPairs);
488 void RSJoin(std::vector<std::pair<int, int>> &finalPairs);
489 // check
490 void checkSelfResults() const;
491 void printDebugInfo(int currLen) const;
492};
493
494
496{
497public:
498 ExactJoinParallel() = default;
500 ExactJoinParallel(const ExactJoinParallel &other) = delete;
502
503public:
504 static void exactJoinRS(const std::vector<std::string> &colA, const std::vector<std::string> &colB,
505 std::vector<std::pair<int, int>> &pairs, ui _maxHeapSize = 0) {
506 ui sizeA = colA.size();
507 ui sizeB = colB.size();
508
509 std::unordered_map<ui, std::vector<ui>> indexA;
510 std::unordered_map<ui, std::vector<ui>> indexB;
511
512 std::vector<std::pair<int, int>> tempPairs[MAXTHREADNUM];
513 int eralyTerminated[MAXTHREADNUM] = { 0 };
514 ui maxHeapSize = _maxHeapSize == 0 ? MAX_PAIR_SIZE : _maxHeapSize;
515
516 for (ui j = 0; j < sizeA; j++) {
517 ui length = colA[j].length();
518 // skip empty
519#if DROP_EMPTY == 1
520 if (length == 0)
521 continue;
522#endif
523 indexA[length].emplace_back(j);
524 }
525 for (ui j = 0; j < sizeB; j++) {
526 ui length = colB[j].length();
527 // skip empty
528#if DROP_EMPTY == 1
529 if (length == 0)
530 continue;
531#endif
532 indexB[length].emplace_back(j);
533 }
534
535 for (auto &itA : indexA) {
536 ui bucketSizeA = itA.second.size();
537 ui bucketSizeB = indexB[itA.first].size();
538 const auto &bucketB = indexB[itA.first];
539 #pragma omp parallel for
540 for (ui ii = 0; ii < bucketSizeA; ii++) {
541 int tid = omp_get_thread_num();
542 if(eralyTerminated[tid] == 1)
543 continue;
544
545 for(ui jj = 0; jj < bucketSizeB; jj++)
546 if(colA[itA.second[ii]] == colB[bucketB[jj]])
547 tempPairs[tid].emplace_back(ii, jj);
548
549 if(tempPairs[tid].size() >= maxHeapSize)
550 eralyTerminated[tid] = 1;
551 }
552 }
553
554 for (const auto &tp : tempPairs)
555 pairs.insert(pairs.end(), tp.begin(), tp.end());
556 }
557
558 static void exactJoinSelf(const std::vector<std::string> &col, std::vector<std::pair<int, int>> &pairs,
559 ui _maxHeapSize = 0) {
560 ui size = col.size();
561
562 std::unordered_map<ui, std::vector<ui>> index;
563 std::vector<std::pair<int, int>> tempPairs[MAXTHREADNUM];
564 int eralyTerminated[MAXTHREADNUM] = { 0 };
565 ui maxHeapSize = _maxHeapSize == 0 ? MAX_PAIR_SIZE : _maxHeapSize;
566
567 for (ui j = 0; j < size; j++) {
568 ui length = col[j].length();
569 // skip empty
570#if DROP_EMPTY == 1
571 if (length == 0)
572 continue;
573#endif
574 index[length].emplace_back(j);
575 }
576
577 for(const auto &it : index) {
578 ui bucketSize = it.second.size();
579 #pragma omp parallel for
580 for(ui ii = 0; ii < bucketSize; ii++) {
581 int tid = omp_get_thread_num();
582 if(eralyTerminated[tid] == 1)
583 continue;
584
585 for(ui jj = ii + 1; jj < bucketSize; jj++)
586 if(col[it.second[ii]] == col[it.second[jj]])
587 tempPairs[tid].emplace_back(ii, jj);
588
589 if(tempPairs[tid].size() >= maxHeapSize)
590 eralyTerminated[tid] = 1;
591 }
592 }
593
594 for (const auto &tp : tempPairs)
595 pairs.insert(pairs.end(), tp.begin(), tp.end());
596 }
597};
598
599
600#endif // _STRING_JOIN_PARALLEL_H_
Definition stringjoin_parallel.h:496
ExactJoinParallel(ExactJoinParallel &&other)=delete
ExactJoinParallel()=default
static void exactJoinRS(const std::vector< std::string > &colA, const std::vector< std::string > &colB, std::vector< std::pair< int, int > > &pairs, ui _maxHeapSize=0)
Definition stringjoin_parallel.h:504
static void exactJoinSelf(const std::vector< std::string > &col, std::vector< std::pair< int, int > > &pairs, ui _maxHeapSize=0)
Definition stringjoin_parallel.h:558
ExactJoinParallel(const ExactJoinParallel &other)=delete
~ExactJoinParallel()=default
Definition stringjoin_parallel.h:27
std::vector< PIndex > ** partIndex
Definition stringjoin_parallel.h:87
std::vector< uint64_t > queryPrefixHash
Definition stringjoin_parallel.h:42
std::unordered_map< uint64_t, std::vector< std::pair< int, int > > > InvListPrefix
Definition stringjoin_parallel.h:30
int workN
Definition stringjoin_parallel.h:53
int PN
Definition stringjoin_parallel.h:57
StringJoinParallel(StringJoinParallel &&other)=delete
std::vector< int > queryLengthArray
Definition stringjoin_parallel.h:81
int _left[MAXTHREADNUM]
Definition stringjoin_parallel.h:71
std::unordered_map< uint64_t, std::vector< int > > InvListsParallel
Definition stringjoin_parallel.h:29
std::vector< uint64_t > workPrefixHash
Definition stringjoin_parallel.h:41
void init()
Definition stringjoin_parallel.cc:9
std::vector< int > workLengthArray
Definition stringjoin_parallel.h:80
InvListsParallel ** invLists
Definition stringjoin_parallel.h:85
int N
Definition stringjoin_parallel.h:55
std::vector< std::string > query_dataset
Definition stringjoin_parallel.h:61
void RSJoin(std::vector< std::pair< int, int > > &finalPairs)
Definition stringjoin_parallel.cc:455
uint64_t veriNum
Definition stringjoin_parallel.h:65
int queryN
Definition stringjoin_parallel.h:54
int minDictLen
Definition stringjoin_parallel.h:51
std::vector< std::vector< int > > querylengthMap
Definition stringjoin_parallel.h:83
bool valid[MAXTHREADNUM]
Definition stringjoin_parallel.h:68
bool verifyLeftPartRS(int xid, int yid, int xlen, int ylen, int Tau, int tid, int sharing=0)
Definition stringjoin_parallel.h:341
uint64_t realNum
Definition stringjoin_parallel.h:67
void printDebugInfo(int currLen) const
Definition stringjoin_parallel.cc:753
int earlyTerminated[MAXTHREADNUM]
Definition stringjoin_parallel.h:43
int * workInvSC
Definition stringjoin_parallel.h:91
int queryMinDictLen
Definition stringjoin_parallel.h:48
hashValue * hv[MAXTHREADNUM]
Definition stringjoin_parallel.h:88
int modNumber
Definition stringjoin_parallel.h:59
bool iterativeVerifyLeftPartRS(int xid, int yid, int stPos, int xlen, int ylen, int wlen, int taul, int partId)
Definition stringjoin_parallel.h:440
bool * quickRef[MAXTHREADNUM]
Definition stringjoin_parallel.h:76
int workMinDictLen
Definition stringjoin_parallel.h:46
int D
Definition stringjoin_parallel.h:56
int left[MAXTHREADNUM]
Definition stringjoin_parallel.h:69
int ** _matrix[MAXTHREADNUM]
Definition stringjoin_parallel.h:75
bool verifyLeftPartSelf(int xid, int yid, int xlen, int ylen, int Tau, int tid, int sharing=0)
Definition stringjoin_parallel.h:263
bool iterativeVerifyRightPartRS(int xid, int yid, int xpos, int ypos, int xlen, int ylen, int partId, int tid)
Definition stringjoin_parallel.h:482
uint64_t candNum
Definition stringjoin_parallel.h:64
int * dist
Definition stringjoin_parallel.h:79
int sharePrefix
Definition stringjoin_parallel.h:40
std::vector< std::vector< int > > worklengthMap
Definition stringjoin_parallel.h:82
~StringJoinParallel()
Definition stringjoin_parallel.h:221
int queryMaxDictLen
Definition stringjoin_parallel.h:49
void selfJoin(std::vector< std::pair< int, int > > &finalPairs)
Definition stringjoin_parallel.cc:206
int workMaxDictLen
Definition stringjoin_parallel.h:47
std::vector< std::string > work_dataset
Definition stringjoin_parallel.h:60
StringJoinParallel()=default
int right[MAXTHREADNUM]
Definition stringjoin_parallel.h:70
std::unordered_set< std::string > strCount
Definition stringjoin_parallel.h:90
int ** partLen
Definition stringjoin_parallel.h:77
ui maxHeapSize
Definition stringjoin_parallel.h:94
void checkSelfResults() const
Definition stringjoin_parallel.cc:732
uint64_t avgDictLen
Definition stringjoin_parallel.h:52
int _right[MAXTHREADNUM]
Definition stringjoin_parallel.h:72
StringJoinParallel(const std::vector< std::string > &work, const std::vector< std::string > &query, int threshold, ui _maxHeapSize=0)
Definition stringjoin_parallel.h:150
std::vector< std::pair< int, int > > pairs[MAXTHREADNUM]
Definition stringjoin_parallel.h:62
int * queryInvSC
Definition stringjoin_parallel.h:92
StringJoinParallel(const std::vector< std::string > &data, int threshold, ui _maxHeapSize=0)
Definition stringjoin_parallel.h:103
StringJoinParallel(const StringJoinParallel &other)=delete
uint64_t * power
Definition stringjoin_parallel.h:84
void prepareSelf()
Definition stringjoin_parallel.cc:77
bool verifyRightPartSelf(int xid, int yid, int xlen, int ylen, int xpos, int ypos, int Tau, int tid, int sharing=0)
Definition stringjoin_parallel.h:302
int ** matrix[MAXTHREADNUM]
Definition stringjoin_parallel.h:74
bool verifyRightPartRS(int xid, int yid, int xlen, int ylen, int xpos, int ypos, int Tau, int tid, int sharing=0)
Definition stringjoin_parallel.h:390
InvListPrefix ** invListsPre
Definition stringjoin_parallel.h:86
int ** partPos
Definition stringjoin_parallel.h:78
uint64_t listNum
Definition stringjoin_parallel.h:66
int hashNumber
Definition stringjoin_parallel.h:58
int maxDictLen
Definition stringjoin_parallel.h:50
void prepareRS()
Definition stringjoin_parallel.cc:128
static int min(int a, int b, int c)
Definition joinutil.h:86
static bool strLessT(const std::string &s1, const std::string &s2)
Definition joinutil.cc:243
#define MAX_PAIR_SIZE
Definition config.h:44
#define MAXTHREADNUM
Definition config.h:38
Definition index.h:105
unsigned int ui
Definition type.h:8