Entity Matching by Similarity Join
 
Loading...
Searching...
No Matches
setjoin.h
Go to the documentation of this file.
1/*
2 * author: Dong Deng
3 * modified: Zhencan Peng in rutgers-db/RedPajama_Analysis
4 * modified: Yunqi Li
5 * contact: liyunqixa@gmail.com
6 */
7#ifndef _SETJOIN_H_
8#define _SETJOIN_H_
9
10#include "config.h"
11#include "type.h"
12#include "joinutil.h"
13#include <iostream>
14#include <fstream>
15#include <vector>
16#include <unordered_map>
17#include <unordered_set>
18#include <string>
19#include <algorithm>
20#include <queue>
21#include <cmath>
22#include <cstdio>
23#include <string.h>
24#include <inttypes.h>
25#include <assert.h>
26
27
28// SetJoin, single thread
29/*
30 * The adaptive grouping mechanism is only suitable for thresholds of at least 0.5,
31 * since \alpha lies within [0.5, 1].
32 * That is the reason why we get a seg fault when the threshold is smaller than 0.5,
33 * thus we just adopt the length filter.
34 */
35class SetJoin
36{
37public:
38 double overlap_cost = 0;
39 double allocation_cost = 0;
40 double index_cost = 0;
41
42public:
43 bool ifRS = false;
44 // join func
46 double (SetJoin::*weightedFunc)(ui, ui) = nullptr;
47 double (SetJoin::*normalFunc)(ui, ui) = nullptr;
48 bool (SetJoin::*overlapFunc)(int, int, int, int, int) = nullptr;
49 // index
50 std::vector<std::pair<int, int>> cacheVec;
51 std::vector<std::vector<std::pair<int, int>>> indexVecs;
52
53public:
54 double det; // delta in vldb2016-setjoin
55 uint64_t resultNum = 0;
56 uint64_t candidateNum = 0;
57 uint64_t lengthSum = 0;
58 uint64_t listlens = 0;
59
61 std::vector<std::vector<ui>> dataset_all;
62 std::vector<std::vector<ui>> work_dataset;
63 std::vector<std::vector<ui>> query_dataset;
64 std::vector<double> work_weights;
65 std::vector<double> query_weights;
66 std::vector<double> wordwt;
67 // buckets for empty tokenized sets
68 std::vector<ui> workEmpty, queryEmpty;
69 std::vector<std::pair<int, int>> result_pairs;
70 // io
71 std::string simP_file_path;
72 // heap
74#if MAINTAIN_VALUE == 1
75 bool isWeightedComp{false}; // use the weighted version sim funcs
76 std::vector<WeightPair> result_pairs_;
77 int isHeap = 0;
78#endif
79
80 struct invertedList {
81 int vec_no, cnt;
82 std::pair<int, int> cache[CACHE_SIZE];
83
84 std::vector<std::pair<int, int>>& getVector(SetJoin *joiner) const {
85 if (cnt <= CACHE_SIZE) {
86 joiner->cacheVec.assign(cache, cache + cnt);
87 return joiner->cacheVec;
88 } else
89 return joiner->indexVecs[vec_no];
90 }
91
92 void add(std::pair<int, int> data, SetJoin *joiner) {
93 if (cnt < CACHE_SIZE)
94 cache[cnt++] = data;
95 else {
96 if (CACHE_SIZE == cnt) {
97 joiner->indexVecs.push_back(std::vector<std::pair<int, int>>());
98 vec_no = joiner->indexVecs.size() - 1;
99 joiner->indexVecs[vec_no].assign(cache, cache + CACHE_SIZE);
100 }
101 ++cnt;
102 joiner->indexVecs[vec_no].push_back(data);
103 }
104 }
105 };
106
108 unsigned long long list_no;
110
112 };
113
114 std::vector<invertedList> indexLists;
115
116public:
117 SetJoin() = default;
118 SetJoin(const std::vector<std::vector<ui>> &sorted_records, const std::vector<double> &recwt,
119 const std::vector<double> &_wordwt, std::string _sim_pairs_filepath, double _det,
120 ui _maxHeapSize = 0, bool _isWeightedComp = false)
121 : ifRS(false), work_dataset(sorted_records), work_weights(recwt), wordwt(_wordwt), simP_file_path(_sim_pairs_filepath) {
122 indexVecs.clear();
123 cacheVec.clear();
124 cacheVec.resize(CACHE_SIZE);
125
126 ui min_records_size = sorted_records.front().size();
127 ui max_records_size = sorted_records.back().size();
128
129#if RESIZE_DATA == 1
130 if(_det <= 0.4 && max_records_size >= 55) {
132 max_records_size = work_dataset.back().size();
133 }
134#endif
135
136 printf("Work size: %zu\n", work_dataset.size());
137 printf("Min record's size: %u\tMax record's size: %u\n", min_records_size, max_records_size);
138
139 maxHeapSize = _maxHeapSize == 0 ? MAX_PAIR_SIZE_SERIAL : _maxHeapSize;
140#if MAINTAIN_VALUE == 1
141 isWeightedComp = _isWeightedComp;
142 result_pairs_.reserve(maxHeapSize);
143#endif
144 }
145 // RS-join
146 SetJoin(const std::vector<std::vector<ui>> &work_records, const std::vector<std::vector<ui>> &query_records,
147 const std::vector<double> &workwt, const std::vector<double> &querywt, const std::vector<double> &_wordwt,
148 const std::string &_sim_pairs_filepath, double _det, ui _maxHeapSize = 0, bool _isWeightedComp = false)
149 : ifRS(true), work_dataset(work_records), query_dataset(query_records), work_weights(workwt), query_weights(querywt),
150 wordwt(_wordwt), simP_file_path(_sim_pairs_filepath) {
151 indexVecs.clear();
152 cacheVec.clear();
153 cacheVec.resize(CACHE_SIZE);
154
155 ui min_work_size = work_dataset.front().size();
156 ui min_query_size = query_dataset.front().size();
157 ui max_work_size = work_dataset.back().size();
158 ui max_query_size = query_dataset.back().size();
159
160#if RESIZE_DATA == 1
161 if(_det <= 0.4 && max_work_size >= 55) {
163 max_work_size = work_dataset.back().size();
164 }
165 if(_det <= 0.4 && max_query_size >= 55) {
167 max_query_size = query_dataset.back().size();
168 }
169#endif
170
171 printf("Work size: %zu\tQuery size: %zu\n", work_dataset.size(), query_dataset.size());
172 printf("Min work record's size: %u\tMax work record's size: %u\n", min_work_size, max_work_size);
173 printf("Min query record's size: %u\tMax query record's size: %u\n", min_query_size, max_query_size);
174
175 maxHeapSize = _maxHeapSize == 0 ? MAX_PAIR_SIZE_SERIAL : _maxHeapSize;
176#if MAINTAIN_VALUE == 1
177 isWeightedComp = _isWeightedComp;
178 result_pairs_.reserve(maxHeapSize);
179#endif
180 }
181
183 cacheVec.clear();
184 indexVecs.clear();
185 }
186
187public:
188 void loadDataset(const std::vector<std::vector<ui>> &records, std::string file) {
189 simP_file_path = file;
190 dataset_all = records;
191 work_dataset.resize(dataset_all.size());
192 indexVecs.clear();
193 cacheVec.clear();
194 cacheVec.resize(CACHE_SIZE);
195 }
196 void prepare(const std::vector<std::vector<ui>> &offsets, ui column) {
197 ui size = dataset_all.size();
198 for(ui i = 0; i < size; i++)
199 work_dataset[i].clear();
200 for(ui i = 0; i < size; i++) {
201 ui prefix = offsets[i][column];
202 ui suffix = offsets[i][column + 1];
203 // for(ui j = prefix; j < suffix; j++)
204 // work_dataset[i].emplace_back(dataset_all[i][j]);
205 work_dataset[i].assign(dataset_all[i].begin() + prefix,
206 dataset_all[i].begin() + suffix);
207 }
208 }
209
210 void resizeData(std::vector<std::vector<ui>> &dataset) {
211 // [\sigma - 2 * sd, \sigma + 2 * sd]
212 int unempty = 0;
213 double avergaeSize = 0.0;
214 for(const auto &rec : dataset) {
215 if(rec.empty()) continue;
216 avergaeSize += (double)rec.size() * 1.0;
217 ++ unempty;
218 }
219 avergaeSize = avergaeSize / (unempty * 1.0);
220 long double sd = 0.0;
221 for(const auto &rec : dataset) {
222 if(rec.empty()) continue;
223 sd += (rec.size() * 1.0 - avergaeSize) * (rec.size() * 1.0 - avergaeSize);
224 }
225 sd = sd / (unempty * 1.0);
226 printf("Resize dataset on 1sd, mean: %.1lf\tsd: %.1Lf\n", avergaeSize, sd);
227 int bound = ceil(avergaeSize + 1.0 * sd - 1e-5);
228 for(auto &rec : dataset)
229 if((int)rec.size() > bound)
230 rec.resize(bound);
231 }
232
233 // weighted sim funcs
234 double weightedOverlap(ui x, ui y) {
235 const auto &records1 = ifRS ? query_dataset[x] : work_dataset[x];
236 const auto &records2 = work_dataset[y];
237
238 std::vector<ui> res;
239 set_intersection(records1.begin(), records1.end(),
240 records2.begin(), records2.end(),
241 std::back_inserter(res));
242
243 double ovlp = 0.0;
244 for(const auto &e : res)
245 ovlp += wordwt[e];
246
247 return ovlp;
248 }
249
250 double weightedJaccard(ui x, ui y) {
251 double ovlp = weightedOverlap(x, y);
252 double rw1 = ifRS ? query_weights[x] : work_weights[x];
253 double rw2 = work_weights[y];
254
255 assert(std::abs(rw1) > 1e-7 && std::abs(rw2) > 1e-7);
256
257 return ovlp / (rw1 + rw2 - ovlp);
258 }
259 double jaccard(ui x, ui y) {
260 const auto &records1 = ifRS ? query_dataset[x] : work_dataset[x];
261 const auto &records2 = work_dataset[y];
262
263 std::vector<ui> res;
264 set_intersection(records1.begin(), records1.end(),
265 records2.begin(), records2.end(),
266 std::back_inserter(res));
267 int ovlp = (int)res.size();
268
269 return ovlp * 1.0 / (records1.size() + records2.size() - ovlp) * 1.0;
270 }
271
272 double weightedCosine(ui x, ui y) {
273 double ovlp = weightedOverlap(x, y);
274 double rw1 = ifRS ? query_weights[x] : work_weights[x];
275 double rw2 = work_weights[y];
276
277 assert(std::abs(rw1) > 1e-7 && std::abs(rw2) > 1e-7);
278
279 return ovlp / sqrt(rw1 * rw2);
280 }
281 double cosine(ui x, ui y) {
282 const auto &records1 = ifRS ? query_dataset[x] : work_dataset[x];
283 const auto &records2 = work_dataset[y];
284
285 std::vector<ui> res;
286 set_intersection(records1.begin(), records1.end(),
287 records2.begin(), records2.end(),
288 std::back_inserter(res));
289 int ovlp = (int)res.size();
290
291 return ovlp * 1.0 / sqrt(records1.size() * records2.size()) * 1.0;
292 }
293
294 double weightedDice(ui x, ui y) {
295 double ovlp = weightedOverlap(x, y);
296 double rw1 = ifRS ? query_weights[x] : work_weights[x];
297 double rw2 = work_weights[y];
298
299 assert(std::abs(rw1) > 1e-7 && std::abs(rw2) > 1e-7);
300
301 return 2.0 * ovlp / (rw1 + rw2);
302 }
303 double dice(ui x, ui y) {
304 const auto &records1 = ifRS ? query_dataset[x] : work_dataset[x];
305 const auto &records2 = work_dataset[y];
306
307 std::vector<ui> res;
308 set_intersection(records1.begin(), records1.end(),
309 records2.begin(), records2.end(),
310 std::back_inserter(res));
311 int ovlp = (int)res.size();
312
313 return ovlp * 2.0 / (records1.size() + records2.size()) * 1.0;
314 }
315
316 // check overlap between two sets
317 bool overlap(int x, int y, int posx = 0, int posy = 0, int current_overlap = 0);
318 bool overlapRS(int x, int y, int posx = 0, int posy = 0, int current_overlap = 0);
319 // Self-join
320 void setSelfJoin(double threshold, std::vector<std::pair<int, int>>& sim_pairs);
321 // RS-join
322 void setRSJoin(double threshold, std::vector<std::pair<int, int>>& sim_pairs);
323};
324
325
326
327#endif
Definition setjoin.h:36
void setRSJoin(double threshold, std::vector< std::pair< int, int > > &sim_pairs)
Definition setjoin.cc:619
void setSelfJoin(double threshold, std::vector< std::pair< int, int > > &sim_pairs)
Definition setjoin.cc:87
bool overlapRS(int x, int y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin.cc:48
SetJoin(const std::vector< std::vector< ui > > &work_records, const std::vector< std::vector< ui > > &query_records, const std::vector< double > &workwt, const std::vector< double > &querywt, const std::vector< double > &_wordwt, const std::string &_sim_pairs_filepath, double _det, ui _maxHeapSize=0, bool _isWeightedComp=false)
Definition setjoin.h:146
std::vector< ui > workEmpty
Definition setjoin.h:68
uint64_t candidateNum
Definition setjoin.h:56
std::vector< std::vector< ui > > query_dataset
Definition setjoin.h:63
std::vector< double > work_weights
Definition setjoin.h:64
std::vector< invertedList > indexLists
Definition setjoin.h:114
ui maxHeapSize
Definition setjoin.h:73
std::vector< std::vector< ui > > work_dataset
Definition setjoin.h:62
double weightedOverlap(ui x, ui y)
Definition setjoin.h:234
void loadDataset(const std::vector< std::vector< ui > > &records, std::string file)
Definition setjoin.h:188
int prime_exp[MAX_LINE_LENGTH]
Definition setjoin.h:60
double(SetJoin::* weightedFunc)(ui, ui)
Definition setjoin.h:46
bool(SetJoin::* overlapFunc)(int, int, int, int, int)
Definition setjoin.h:48
double weightedDice(ui x, ui y)
Definition setjoin.h:294
uint64_t listlens
Definition setjoin.h:58
bool ifRS
Definition setjoin.h:43
SetJoin(const std::vector< std::vector< ui > > &sorted_records, const std::vector< double > &recwt, const std::vector< double > &_wordwt, std::string _sim_pairs_filepath, double _det, ui _maxHeapSize=0, bool _isWeightedComp=false)
Definition setjoin.h:118
SimFuncType simFType
Definition setjoin.h:45
int isHeap
Definition setjoin.h:77
uint64_t lengthSum
Definition setjoin.h:57
uint64_t resultNum
Definition setjoin.h:55
std::vector< double > query_weights
Definition setjoin.h:65
double(SetJoin::* normalFunc)(ui, ui)
Definition setjoin.h:47
bool isWeightedComp
Definition setjoin.h:75
double weightedJaccard(ui x, ui y)
Definition setjoin.h:250
void resizeData(std::vector< std::vector< ui > > &dataset)
Definition setjoin.h:210
double jaccard(ui x, ui y)
Definition setjoin.h:259
std::vector< double > wordwt
Definition setjoin.h:66
std::vector< std::vector< std::pair< int, int > > > indexVecs
Definition setjoin.h:51
double det
Definition setjoin.h:54
double index_cost
Definition setjoin.h:40
double cosine(ui x, ui y)
Definition setjoin.h:281
std::string simP_file_path
Definition setjoin.h:71
~SetJoin()
Definition setjoin.h:182
std::vector< std::vector< ui > > dataset_all
Definition setjoin.h:61
std::vector< WeightPair > result_pairs_
Definition setjoin.h:76
double allocation_cost
Definition setjoin.h:39
std::vector< std::pair< int, int > > result_pairs
Definition setjoin.h:69
std::vector< std::pair< int, int > > cacheVec
Definition setjoin.h:50
SetJoin()=default
bool overlap(int x, int y, int posx=0, int posy=0, int current_overlap=0)
Definition setjoin.cc:12
double dice(ui x, ui y)
Definition setjoin.h:303
std::vector< ui > queryEmpty
Definition setjoin.h:68
double overlap_cost
Definition setjoin.h:38
double weightedCosine(ui x, ui y)
Definition setjoin.h:272
void prepare(const std::vector< std::vector< ui > > &offsets, ui column)
Definition setjoin.h:196
#define MAX_PAIR_SIZE_SERIAL
Definition config.h:51
#define CACHE_SIZE
Definition config.h:82
#define MAX_LINE_LENGTH
Definition config.h:81
Definition setjoin.h:107
invIndexStruct()
Definition setjoin.h:111
unsigned long long list_no
Definition setjoin.h:108
int * oneList
Definition setjoin.h:109
Definition setjoin.h:80
int vec_no
Definition setjoin.h:81
std::vector< std::pair< int, int > > & getVector(SetJoin *joiner) const
Definition setjoin.h:84
std::pair< int, int > cache[CACHE_SIZE]
Definition setjoin.h:82
void add(std::pair< int, int > data, SetJoin *joiner)
Definition setjoin.h:92
int cnt
Definition setjoin.h:81
SimFuncType
Definition type.h:48
unsigned int ui
Definition type.h:8