6#ifndef _STRING_JOIN_PARALLEL_H_
7#define _STRING_JOIN_PARALLEL_H_
17#include <unordered_map>
18#include <unordered_set>
30 using InvListPrefix = std::unordered_map<uint64_t, std::vector<std::pair<int, int>>>;
36 uint64_t globalTimerCount{0};
95#if MAINTAIN_VALUE_EDIT == 1
106 printf(
"Drop empty strings for string(lev) join\n");
119 for(
int i = 0; i <
workN; i++) {
135 printf(
"Number of tokens: %ld\n",
strCount.size());
140 printf(
"Trigger approximate join on sharing prefix: %d\n",
sharePrefix);
144#if MAINTAIN_VALUE_EDIT == 1
150 StringJoinParallel(
const std::vector<std::string>& work,
const std::vector<std::string>& query,
int threshold,
152 :
workN(work.size()),
queryN(query.size()),
D(threshold),
PN(threshold + 1),
155 printf(
"Drop empty strings for string(lev) join\n");
181 for(
int i = 0; i <
workN; i++) {
188 for(
int i = 0; i <
queryN; i++) {
206 printf(
"Number of tokens: %ld\n",
strCount.size());
211 printf(
"Trigger approximate join on sharing prefix: %d\n",
sharePrefix);
215#if MAINTAIN_VALUE_EDIT == 1
222 for (
int lp = 0; lp <
PN; lp++) {
250 std::cout <<
"destructor" << std::endl << std::flush;
266 auto &currM =
matrix[tid];
268 for (
int i = sharing + 1; i <= xlen; i++) {
271 if (i <=
left[tid]) {
276 int val1 = i -
left[tid];
277 int val2 = i +
right[tid];
278 int lowerBound = std::max(val1, 1);
279 int upperBound = std::min(val2, ylen);
281 for (
int j = lowerBound; j <= upperBound; j++) {
284 if (workString[i - 1] == queryString[j - 1])
285 currM[i][val] = currM[i - 1][val];
288 j - 1 >= val1 ? currM[i][val - 1] :
D,
289 j + 1 <= val2 ? currM[i - 1][val + 1] :
D) + 1;
291 if (abs(xlen - ylen - i + j) + currM[i][val] <= Tau)
299 return currM[xlen][ylen - xlen +
D] <= Tau;
302 bool verifyRightPartSelf(
int xid,
int yid,
int xlen,
int ylen,
int xpos,
int ypos,
int Tau,
int tid,
int sharing=0) {
307 for (
int i = sharing + 1; i <= xlen; i++) {
310 if (i <=
_left[tid]) {
315 int val1 = i -
_left[tid];
316 int val2 = i +
_right[tid];
317 int lowerBound = std::max(val1, 1);
318 int upperBound = std::min(val2, ylen);
320 for (
int j = lowerBound; j <= upperBound; j++) {
323 if (workString[xpos + i - 1] == queryString[ypos + j - 1])
324 currM[i][val] = currM[i - 1][val];
327 j - 1 >= val1 ? currM[i][val - 1] :
D,
328 j + 1 <= val2 ? currM[i - 1][val + 1] :
D) + 1;
330 if (abs(xlen - ylen - i + j) + currM[i][val] <= Tau)
338 return currM[xlen][ylen - xlen +
D] <= Tau;
341 bool verifyLeftPartRS(
int xid,
int yid,
int xlen,
int ylen,
int Tau,
int tid,
int sharing=0) {
344 gettimeofday(&begin, NULL);
348 auto &currM =
matrix[tid];
350 for (
int i = sharing + 1; i <= xlen; i++) {
353 if (i <=
left[tid]) {
358 int val1 = i -
left[tid];
359 int val2 = i +
right[tid];
360 int lowerBound = std::max(val1, 1);
361 int upperBound = std::min(val2, ylen);
363 for (
int j = lowerBound; j <= upperBound; j++) {
366 if (workString[i - 1] == queryString[j - 1])
367 currM[i][val] = currM[i - 1][val];
370 j - 1 >= val1 ? currM[i][val - 1] :
D,
371 j + 1 <= val2 ? currM[i - 1][val + 1] :
D) + 1;
373 if (abs(xlen - ylen - i + j) + currM[i][val] <= Tau)
382 gettimeofday(&end, NULL);
383 double elapsedTime = end.tv_sec - begin.tv_sec + (end.tv_usec - begin.tv_usec) / 1e6;
384 verifyingCost[tid] += elapsedTime;
387 return currM[xlen][ylen - xlen +
D] <= Tau;
390 bool verifyRightPartRS(
int xid,
int yid,
int xlen,
int ylen,
int xpos,
int ypos,
int Tau,
int tid,
int sharing=0) {
393 gettimeofday(&begin, NULL);
399 for (
int i = sharing + 1; i <= xlen; i++) {
402 if (i <=
_left[tid]) {
407 int val1 = i -
_left[tid];
408 int val2 = i +
_right[tid];
409 int lowerBound = std::max(val1, 1);
410 int upperBound = std::min(val2, ylen);
412 for (
int j = lowerBound; j <= upperBound; j++) {
415 if (workString[xpos + i - 1] == queryString[ypos + j - 1])
416 currM[i][val] = currM[i - 1][val];
419 j - 1 >= val1 ? currM[i][val - 1] :
D,
420 j + 1 <= val2 ? currM[i - 1][val + 1] :
D) + 1;
422 if (abs(xlen - ylen - i + j) + currM[i][val] <= Tau)
431 gettimeofday(&end, NULL);
432 double elapsedTime = end.tv_sec - begin.tv_sec + (end.tv_usec - begin.tv_usec) / 1e6;
433 verifyingCost[tid] += elapsedTime;
436 return currM[xlen][ylen - xlen +
D] <= Tau;
452 int addLen = xidx - stPos;
453 int addId = partId - 1;
454 int delta = std::abs(ylen - addLen);
455 int selectStart = std::max(addPos - addId, addPos + delta - (taul - addId));
456 int selectEnd = std::min(addPos + addId, addPos + delta + (taul - addId));
459 for (
int i = selectStart; stPos <= selectEnd; i++) {
461 for(
int l = 0; l < addLen; l++) {
469 if(i == selectStart && addId >= 1)
487 void selfJoin(std::vector<std::pair<int, int>> &finalPairs);
488 void RSJoin(std::vector<std::pair<int, int>> &finalPairs);
504 static void exactJoinRS(
const std::vector<std::string> &colA,
const std::vector<std::string> &colB,
505 std::vector<std::pair<int, int>> &pairs,
ui _maxHeapSize = 0) {
506 ui sizeA = colA.size();
507 ui sizeB = colB.size();
509 std::unordered_map<ui, std::vector<ui>> indexA;
510 std::unordered_map<ui, std::vector<ui>> indexB;
512 std::vector<std::pair<int, int>> tempPairs[
MAXTHREADNUM];
516 for (
ui j = 0; j < sizeA; j++) {
517 ui length = colA[j].length();
523 indexA[length].emplace_back(j);
525 for (
ui j = 0; j < sizeB; j++) {
526 ui length = colB[j].length();
532 indexB[length].emplace_back(j);
535 for (
auto &itA : indexA) {
536 ui bucketSizeA = itA.second.size();
537 ui bucketSizeB = indexB[itA.first].size();
538 const auto &bucketB = indexB[itA.first];
539 #pragma omp parallel for
540 for (
ui ii = 0; ii < bucketSizeA; ii++) {
541 int tid = omp_get_thread_num();
542 if(eralyTerminated[tid] == 1)
545 for(
ui jj = 0; jj < bucketSizeB; jj++)
546 if(colA[itA.second[ii]] == colB[bucketB[jj]])
547 tempPairs[tid].emplace_back(ii, jj);
549 if(tempPairs[tid].size() >= maxHeapSize)
550 eralyTerminated[tid] = 1;
554 for (
const auto &tp : tempPairs)
555 pairs.insert(pairs.end(), tp.begin(), tp.end());
558 static void exactJoinSelf(
const std::vector<std::string> &col, std::vector<std::pair<int, int>> &pairs,
559 ui _maxHeapSize = 0) {
560 ui size = col.size();
562 std::unordered_map<ui, std::vector<ui>> index;
563 std::vector<std::pair<int, int>> tempPairs[
MAXTHREADNUM];
567 for (
ui j = 0; j < size; j++) {
568 ui length = col[j].length();
574 index[length].emplace_back(j);
577 for(
const auto &it : index) {
578 ui bucketSize = it.second.size();
579 #pragma omp parallel for
580 for(
ui ii = 0; ii < bucketSize; ii++) {
581 int tid = omp_get_thread_num();
582 if(eralyTerminated[tid] == 1)
585 for(
ui jj = ii + 1; jj < bucketSize; jj++)
586 if(col[it.second[ii]] == col[it.second[jj]])
587 tempPairs[tid].emplace_back(ii, jj);
589 if(tempPairs[tid].size() >= maxHeapSize)
590 eralyTerminated[tid] = 1;
594 for (
const auto &tp : tempPairs)
595 pairs.insert(pairs.end(), tp.begin(), tp.end());
Definition stringjoin_parallel.h:496
ExactJoinParallel(ExactJoinParallel &&other)=delete
ExactJoinParallel()=default
static void exactJoinRS(const std::vector< std::string > &colA, const std::vector< std::string > &colB, std::vector< std::pair< int, int > > &pairs, ui _maxHeapSize=0)
Definition stringjoin_parallel.h:504
static void exactJoinSelf(const std::vector< std::string > &col, std::vector< std::pair< int, int > > &pairs, ui _maxHeapSize=0)
Definition stringjoin_parallel.h:558
ExactJoinParallel(const ExactJoinParallel &other)=delete
~ExactJoinParallel()=default
Definition stringjoin_parallel.h:27
std::vector< PIndex > ** partIndex
Definition stringjoin_parallel.h:87
std::vector< uint64_t > queryPrefixHash
Definition stringjoin_parallel.h:42
std::unordered_map< uint64_t, std::vector< std::pair< int, int > > > InvListPrefix
Definition stringjoin_parallel.h:30
int workN
Definition stringjoin_parallel.h:53
int PN
Definition stringjoin_parallel.h:57
StringJoinParallel(StringJoinParallel &&other)=delete
std::vector< int > queryLengthArray
Definition stringjoin_parallel.h:81
int _left[MAXTHREADNUM]
Definition stringjoin_parallel.h:71
std::unordered_map< uint64_t, std::vector< int > > InvListsParallel
Definition stringjoin_parallel.h:29
std::vector< uint64_t > workPrefixHash
Definition stringjoin_parallel.h:41
void init()
Definition stringjoin_parallel.cc:9
std::vector< int > workLengthArray
Definition stringjoin_parallel.h:80
InvListsParallel ** invLists
Definition stringjoin_parallel.h:85
int N
Definition stringjoin_parallel.h:55
std::vector< std::string > query_dataset
Definition stringjoin_parallel.h:61
void RSJoin(std::vector< std::pair< int, int > > &finalPairs)
Definition stringjoin_parallel.cc:455
uint64_t veriNum
Definition stringjoin_parallel.h:65
int queryN
Definition stringjoin_parallel.h:54
int minDictLen
Definition stringjoin_parallel.h:51
std::vector< std::vector< int > > querylengthMap
Definition stringjoin_parallel.h:83
bool valid[MAXTHREADNUM]
Definition stringjoin_parallel.h:68
bool verifyLeftPartRS(int xid, int yid, int xlen, int ylen, int Tau, int tid, int sharing=0)
Definition stringjoin_parallel.h:341
uint64_t realNum
Definition stringjoin_parallel.h:67
void printDebugInfo(int currLen) const
Definition stringjoin_parallel.cc:753
int earlyTerminated[MAXTHREADNUM]
Definition stringjoin_parallel.h:43
int * workInvSC
Definition stringjoin_parallel.h:91
int queryMinDictLen
Definition stringjoin_parallel.h:48
hashValue * hv[MAXTHREADNUM]
Definition stringjoin_parallel.h:88
int modNumber
Definition stringjoin_parallel.h:59
bool iterativeVerifyLeftPartRS(int xid, int yid, int stPos, int xlen, int ylen, int wlen, int taul, int partId)
Definition stringjoin_parallel.h:440
bool * quickRef[MAXTHREADNUM]
Definition stringjoin_parallel.h:76
int workMinDictLen
Definition stringjoin_parallel.h:46
int D
Definition stringjoin_parallel.h:56
int left[MAXTHREADNUM]
Definition stringjoin_parallel.h:69
int ** _matrix[MAXTHREADNUM]
Definition stringjoin_parallel.h:75
bool verifyLeftPartSelf(int xid, int yid, int xlen, int ylen, int Tau, int tid, int sharing=0)
Definition stringjoin_parallel.h:263
bool iterativeVerifyRightPartRS(int xid, int yid, int xpos, int ypos, int xlen, int ylen, int partId, int tid)
Definition stringjoin_parallel.h:482
uint64_t candNum
Definition stringjoin_parallel.h:64
int * dist
Definition stringjoin_parallel.h:79
int sharePrefix
Definition stringjoin_parallel.h:40
std::vector< std::vector< int > > worklengthMap
Definition stringjoin_parallel.h:82
~StringJoinParallel()
Definition stringjoin_parallel.h:221
int queryMaxDictLen
Definition stringjoin_parallel.h:49
void selfJoin(std::vector< std::pair< int, int > > &finalPairs)
Definition stringjoin_parallel.cc:206
int workMaxDictLen
Definition stringjoin_parallel.h:47
std::vector< std::string > work_dataset
Definition stringjoin_parallel.h:60
StringJoinParallel()=default
int right[MAXTHREADNUM]
Definition stringjoin_parallel.h:70
std::unordered_set< std::string > strCount
Definition stringjoin_parallel.h:90
int ** partLen
Definition stringjoin_parallel.h:77
ui maxHeapSize
Definition stringjoin_parallel.h:94
void checkSelfResults() const
Definition stringjoin_parallel.cc:732
uint64_t avgDictLen
Definition stringjoin_parallel.h:52
int _right[MAXTHREADNUM]
Definition stringjoin_parallel.h:72
StringJoinParallel(const std::vector< std::string > &work, const std::vector< std::string > &query, int threshold, ui _maxHeapSize=0)
Definition stringjoin_parallel.h:150
std::vector< std::pair< int, int > > pairs[MAXTHREADNUM]
Definition stringjoin_parallel.h:62
int * queryInvSC
Definition stringjoin_parallel.h:92
StringJoinParallel(const std::vector< std::string > &data, int threshold, ui _maxHeapSize=0)
Definition stringjoin_parallel.h:103
StringJoinParallel(const StringJoinParallel &other)=delete
uint64_t * power
Definition stringjoin_parallel.h:84
void prepareSelf()
Definition stringjoin_parallel.cc:77
bool verifyRightPartSelf(int xid, int yid, int xlen, int ylen, int xpos, int ypos, int Tau, int tid, int sharing=0)
Definition stringjoin_parallel.h:302
int ** matrix[MAXTHREADNUM]
Definition stringjoin_parallel.h:74
bool verifyRightPartRS(int xid, int yid, int xlen, int ylen, int xpos, int ypos, int Tau, int tid, int sharing=0)
Definition stringjoin_parallel.h:390
InvListPrefix ** invListsPre
Definition stringjoin_parallel.h:86
int ** partPos
Definition stringjoin_parallel.h:78
uint64_t listNum
Definition stringjoin_parallel.h:66
int hashNumber
Definition stringjoin_parallel.h:58
int maxDictLen
Definition stringjoin_parallel.h:50
void prepareRS()
Definition stringjoin_parallel.cc:128
static int min(int a, int b, int c)
Definition joinutil.h:86
static bool strLessT(const std::string &s1, const std::string &s2)
Definition joinutil.cc:243
#define MAX_PAIR_SIZE
Definition config.h:44
#define MAXTHREADNUM
Definition config.h:38
unsigned int ui
Definition type.h:8