1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (c) 2016-2020 MariaDB
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: tuplehashjoin.cpp 9709 2013-07-20 06:08:46Z xlou $
20
21
22 #include <climits>
23 #include <cstdio>
24 #include <ctime>
25 #include <sys/time.h>
26 #include <sys/types.h>
27 #include <sys/stat.h>
28 #include <fcntl.h>
29 #include <iomanip>
30 #include <sstream>
31 #include <stdexcept>
32 #include <unistd.h>
33 //#define NDEBUG
34 #include <cassert>
35 #include <algorithm>
36 using namespace std;
37
38 #include "jlf_common.h"
39 #include "primitivestep.h"
40 #include "tuplehashjoin.h"
41 #include "calpontsystemcatalog.h"
42 #include "elementcompression.h"
43 #include "resourcemanager.h"
44 #include "tupleaggregatestep.h"
45 #include "errorids.h"
46 #include "diskjoinstep.h"
47 #include "vlarray.h"
48
49 using namespace execplan;
50 using namespace joiner;
51 using namespace rowgroup;
52 using namespace boost;
53 using namespace funcexp;
54
55 #include "querytele.h"
56 using namespace querytele;
57
58 #include "atomicops.h"
59 #include "spinlock.h"
60
61 namespace joblist
62 {
63
TupleHashJoinStep(const JobInfo & jobInfo)64 TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) :
65 JobStep(jobInfo),
66 joinType(INIT),
67 fTableOID1(0),
68 fTableOID2(0),
69 fOid1(0),
70 fOid2(0),
71 fDictOid1(0),
72 fDictOid2(0),
73 fSequence1(-1),
74 fSequence2(-1),
75 fTupleId1(-1),
76 fTupleId2(-1),
77 fCorrelatedSide(0),
78 resourceManager(jobInfo.rm),
79 runRan(false),
80 joinRan(false),
81 largeSideIndex(1),
82 joinIsTooBig(false),
83 isExeMgr(jobInfo.isExeMgr),
84 lastSmallOuterJoiner(-1),
85 fTokenJoin(-1),
86 fStatsMutexPtr(new boost::mutex()),
87 fFunctionJoinKeys(jobInfo.keyInfo->functionJoinKeys),
88 sessionMemLimit(jobInfo.umMemLimit),
89 rgdLock(false)
90 {
91 /* Need to figure out how much memory these use...
92 Overhead storing 16 byte elements is about 32 bytes. That
93 should stay the same for other element sizes.
94 */
95
96 pmMemLimit = resourceManager->getHjPmMaxMemorySmallSide(fSessionId);
97 uniqueLimit = resourceManager->getHjCPUniqueLimit();
98
99 fExtendedInfo = "THJS: ";
100 joinType = INIT;
101 joinThreadCount = resourceManager->getJlNumScanReceiveThreads();
102 largeBPS = NULL;
103 moreInput = true;
104 fQtc.stepParms().stepType = StepTeleStats::T_HJS;
105 outputDL = NULL;
106 ownsOutputDL = false;
107 djsSmallUsage = jobInfo.smallSideUsage;
108 djsSmallLimit = jobInfo.smallSideLimit;
109 djsLargeLimit = jobInfo.largeSideLimit;
110 djsPartitionSize = jobInfo.partitionSize;
111 isDML = jobInfo.isDML;
112
113 config::Config* config = config::Config::makeConfig();
114 string str = config->getConfig("HashJoin", "AllowDiskBasedJoin");
115
116 if (str.empty() || str == "y" || str == "Y")
117 allowDJS = true;
118 else
119 allowDJS = false;
120
121 numCores = resourceManager->numCores();
122 if (numCores <= 0)
123 numCores = 8;
124 /* Debugging, rand() is used to simulate failures
125 time_t t = time(NULL);
126 srand(t);
127 */
128 }
129
~TupleHashJoinStep()130 TupleHashJoinStep::~TupleHashJoinStep()
131 {
132 delete fStatsMutexPtr;
133
134 if (ownsOutputDL)
135 delete outputDL;
136
137 if (memUsedByEachJoin)
138 {
139 for (uint i = 0 ; i < smallDLs.size(); i++)
140 resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
141 }
142
143
144 //cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl;
145 }
146
run()147 void TupleHashJoinStep::run()
148 {
149 uint32_t i;
150
151 boost::mutex::scoped_lock lk(jlLock);
152
153 if (runRan)
154 return;
155
156 runRan = true;
157
158 deliverMutex.lock();
159
160 // cout << "TupleHashJoinStep::run(): fOutputJobStepAssociation.outSize = " << fOutputJobStepAssociation.outSize() << ", fDelivery = " << boolalpha << fDelivery << endl;
161 idbassert((fOutputJobStepAssociation.outSize() == 1 && !fDelivery) ||
162 (fOutputJobStepAssociation.outSize() == 0 && fDelivery));
163 idbassert(fInputJobStepAssociation.outSize() >= 2);
164
165 largeDL = fInputJobStepAssociation.outAt(largeSideIndex)->rowGroupDL();
166 largeIt = largeDL->getIterator();
167
168 for (i = 0; i < fInputJobStepAssociation.outSize(); i++)
169 {
170 if (i != largeSideIndex)
171 {
172 smallDLs.push_back(fInputJobStepAssociation.outAt(i)->rowGroupDL());
173 smallIts.push_back(smallDLs.back()->getIterator());
174 }
175 }
176
177 if (!fDelivery)
178 outputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
179 else if (!largeBPS)
180 {
181 ownsOutputDL = true;
182 outputDL = new RowGroupDL(1, 5);
183 outputIt = outputDL->getIterator();
184 }
185
186 joiners.resize(smallDLs.size());
187 mainRunner = jobstepThreadPool.invoke(HJRunner(this));
188 }
189
join()190 void TupleHashJoinStep::join()
191 {
192 boost::mutex::scoped_lock lk(jlLock);
193
194 if (joinRan)
195 return;
196
197 joinRan = true;
198 jobstepThreadPool.join(mainRunner);
199
200 if (djs)
201 {
202 for (int i = 0; i < (int) djsJoiners.size(); i++)
203 djs[i].join();
204
205 jobstepThreadPool.join(djsReader);
206 jobstepThreadPool.join(djsRelay);
207 //cout << "THJS: joined all DJS threads, shared usage = " << *djsSmallUsage << endl;
208 }
209 }
210
// Simple solution: poll the Joiner's memory usage once per second and request
// the memory increase after the fact.  A failure to get the memory will be
// detected and handled by the threads inserting into the Joiner.
trackMem(uint index)214 void TupleHashJoinStep::trackMem(uint index)
215 {
216 boost::shared_ptr<TupleJoiner> joiner = joiners[index];
217 ssize_t memBefore = 0, memAfter = 0;
218 bool gotMem;
219
220 boost::unique_lock<boost::mutex> scoped(memTrackMutex);
221 while (!stopMemTracking)
222 {
223 memAfter = joiner->getMemUsage();
224 if (memAfter != memBefore)
225 {
226 gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false);
227 atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
228 memBefore = memAfter;
229 if (!gotMem)
230 return;
231 }
232 memTrackDone.timed_wait(scoped, boost::posix_time::seconds(1));
233 }
234
235 // one more iteration to capture mem usage since last poll, for this one
236 // raise an error if mem went over the limit
237 memAfter = joiner->getMemUsage();
238 if (memAfter == memBefore)
239 return;
240 gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false);
241 atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
242 if (!gotMem)
243 {
244 if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) ||
245 (tableOid() < 3000 && tableOid() >= 1000)))
246 {
247 joinIsTooBig = true;
248 fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
249 errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
250 status(logging::ERR_JOIN_TOO_BIG);
251 cout << "Join is too big, raise the UM join limit for now (monitor thread)" << endl;
252 abort();
253 }
254 }
255 }
256
startSmallRunners(uint index)257 void TupleHashJoinStep::startSmallRunners(uint index)
258 {
259 utils::setThreadName("HJSStartSmall");
260 string extendedInfo;
261 JoinType jt;
262 boost::shared_ptr<TupleJoiner> joiner;
263
264 jt = joinTypes[index];
265 extendedInfo += toString();
266
267 if (typelessJoin[index])
268 {
269 joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index],
270 largeSideKeys[index], jt, &jobstepThreadPool));
271 }
272 else
273 {
274 joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0],
275 largeSideKeys[index][0], jt, &jobstepThreadPool));
276 }
277
278 joiner->setUniqueLimit(uniqueLimit);
279 joiner->setTableName(smallTableNames[index]);
280 joiners[index] = joiner;
281
282 /* check for join types unsupported on the PM. */
283 if (!largeBPS || !isExeMgr)
284 joiner->setInUM(rgData[index]);
285
286 /*
287 start the small runners
288 join them
289 check status
290 handle abort, out of memory, etc
291 */
292
293 /* To measure wall-time spent constructing the small-side tables...
294 boost::posix_time::ptime end_time, start_time =
295 boost::posix_time::microsec_clock::universal_time();
296 */
297
298 stopMemTracking = false;
299 utils::VLArray<uint64_t> jobs(numCores);
300 uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); });
301 // starting 1 thread when in PM mode, since it's only inserting into a
302 // vector of rows. The rest will be started when converted to UM mode.
303 if (joiner->inUM())
304 for (int i = 0; i < numCores; i++)
305 jobs[i] = jobstepThreadPool.invoke([this, i, index, &jobs] { this->smallRunnerFcn(index, i, jobs); });
306 else
307 jobs[0] = jobstepThreadPool.invoke([this, index, &jobs] { this->smallRunnerFcn(index, 0, jobs); });
308
309 // wait for the first thread to join, then decide whether the others exist and need joining
310 jobstepThreadPool.join(jobs[0]);
311 if (joiner->inUM())
312 for (int i = 1; i < numCores; i++)
313 jobstepThreadPool.join(jobs[i]);
314
315 // stop the monitor thread
316 memTrackMutex.lock();
317 stopMemTracking = true;
318 memTrackDone.notify_one();
319 memTrackMutex.unlock();
320 jobstepThreadPool.join(memMonitor);
321
322 /* If there was an error or an abort, drain the input DL,
323 do endOfInput on the output */
324 if (cancelled())
325 {
326 // cout << "HJ stopping... status is " << status() << endl;
327 if (largeBPS)
328 largeBPS->abort();
329
330 bool more = true;
331 RGData oneRG;
332 while (more)
333 more = smallDLs[index]->next(smallIts[index], &oneRG);
334 }
335
336 /* To measure wall-time spent constructing the small-side tables...
337 end_time = boost::posix_time::microsec_clock::universal_time();
338 if (!(fSessionId & 0x80000000))
339 cout << "hash table construction time = " << end_time - start_time <<
340 " size = " << joiner->size() << endl;
341 */
342
343 extendedInfo += "\n";
344
345 ostringstream oss;
346 if (!joiner->onDisk())
347 {
348 // add extended info, and if not aborted then tell joiner
349 // we're done reading the small side.
350 if (joiner->inPM())
351 {
352 oss << "PM join (" << index << ")" << endl;
353 #ifdef JLF_DEBUG
354 cout << oss.str();
355 #endif
356 extendedInfo += oss.str();
357 }
358 else if (joiner->inUM())
359 {
360 oss << "UM join (" << index << ")" << endl;
361 #ifdef JLF_DEBUG
362 cout << oss.str();
363 #endif
364 extendedInfo += oss.str();
365 }
366 if (!cancelled())
367 joiner->doneInserting();
368 }
369
370 boost::mutex::scoped_lock lk(*fStatsMutexPtr);
371 fExtendedInfo += extendedInfo;
372 formatMiniStats(index);
373 }
374
375 /* Index is which small input to read. */
smallRunnerFcn(uint32_t index,uint threadID,uint64_t * jobs)376 void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t *jobs)
377 {
378 utils::setThreadName("HJSmallRunner");
379 bool more = true;
380 RGData oneRG;
381 Row r;
382 RowGroupDL* smallDL;
383 uint32_t smallIt;
384 RowGroup smallRG;
385 boost::shared_ptr<TupleJoiner> joiner = joiners[index];
386
387 smallDL = smallDLs[index];
388 smallIt = smallIts[index];
389 smallRG = smallRGs[index];
390
391 smallRG.initRow(&r);
392
393 try
394 {
395 ssize_t rgSize;
396 bool gotMem;
397 goto next;
398 while (more && !cancelled())
399 {
400 smallRG.setData(&oneRG);
401 if (smallRG.getRowCount() == 0)
402 goto next;
403
404 // TupleHJ owns the row memory
405 utils::getSpinlock(rgdLock);
406 rgData[index].push_back(oneRG);
407 utils::releaseSpinlock(rgdLock);
408
409 rgSize = smallRG.getSizeWithStrings();
410 atomicops::atomicAdd(&memUsedByEachJoin[index], rgSize);
411 gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, false);
412 if (!gotMem)
413 {
414 /* Mem went over the limit.
415 If DML or a syscat query, abort.
416 if disk join is enabled, use it.
417 else abort.
418 */
419 boost::unique_lock<boost::mutex> sl(saneErrMsg);
420 if (cancelled())
421 return;
422 if (!allowDJS || isDML || (fSessionId & 0x80000000) ||
423 (tableOid() < 3000 && tableOid() >= 1000))
424 {
425 joinIsTooBig = true;
426 fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
427 errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
428 status(logging::ERR_JOIN_TOO_BIG);
429 cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
430 abort();
431 }
432 else if (allowDJS)
433 joiner->setConvertToDiskJoin();
434 return;
435 }
436
437 joiner->insertRGData(smallRG, threadID);
438
439 if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit))
440 {
441 joiner->setInUM(rgData[index]);
442 for (int i = 1; i < numCores; i++)
443 jobs[i] = jobstepThreadPool.invoke([this, i, index, jobs]
444 { this->smallRunnerFcn(index, i, jobs); });
445 }
446 next:
447 dlMutex.lock();
448 more = smallDL->next(smallIt, &oneRG);
449 dlMutex.unlock();
450 }
451 }
452 catch (...)
453 {
454 handleException(std::current_exception(),
455 logging::ERR_EXEMGR_MALFUNCTION,
456 logging::ERR_JOIN_TOO_BIG,
457 "TupleHashJoinStep::smallRunnerFcn()");
458 status(logging::ERR_EXEMGR_MALFUNCTION);
459 }
460
461 if (!joiner->inUM())
462 joiner->setInPM();
463 }
464
forwardCPData()465 void TupleHashJoinStep::forwardCPData()
466 {
467 uint32_t i, col;
468
469 if (largeBPS == NULL)
470 return;
471
472 for (i = 0; i < joiners.size(); i++)
473 {
474 if (joiners[i]->antiJoin() || joiners[i]->largeOuterJoin())
475 continue;
476
477 for (col = 0; col < joiners[i]->getSmallKeyColumns().size(); col++)
478 {
479 if (smallRGs[i].isLongString(joiners[i]->getSmallKeyColumns()[col]))
480 continue;
481
482 // @bug3683, not to add CP predicates if large side is not simple column
483 if (fFunctionJoinKeys.find(largeRG.getKeys()[joiners[i]->getLargeKeyColumns()[col]]) !=
484 fFunctionJoinKeys.end())
485 continue;
486
487 largeBPS->addCPPredicates(largeRG.getOIDs()[joiners[i]->getLargeKeyColumns()[col]],
488 joiners[i]->getCPData()[col], !joiners[i]->discreteCPValues()[col]);
489 }
490 }
491 }
492
djsRelayFcn()493 void TupleHashJoinStep::djsRelayFcn()
494 {
495 /*
496 read from largeDL
497 map to largeRG + outputRG format
498 insert into fifos[0]
499 */
500
501 RowGroup djsInputRG = largeRG + outputRG;
502 RowGroup l_largeRG = (tbpsJoiners.empty() ? largeRG : largeRG + outputRG);
503 boost::shared_array<int> relayMapping = makeMapping(l_largeRG, djsInputRG);
504 bool more;
505 RGData inData, outData;
506 Row l_largeRow, djsInputRow;
507 int i;
508
509 l_largeRG.initRow(&l_largeRow);
510 djsInputRG.initRow(&djsInputRow);
511
512 //cout << "Relay started" << endl;
513
514 more = largeDL->next(largeIt, &inData);
515
516 while (more && !cancelled())
517 {
518 l_largeRG.setData(&inData);
519
520 //if (fSessionId < 0x80000000)
521 // cout << "got largeside data = " << l_largeRG.toString() << endl;
522 if (l_largeRG.getRowCount() == 0)
523 goto next;
524
525 outData.reinit(djsInputRG, l_largeRG.getRowCount());
526 djsInputRG.setData(&outData);
527 djsInputRG.resetRowGroup(0);
528 l_largeRG.getRow(0, &l_largeRow);
529 djsInputRG.getRow(0, &djsInputRow);
530
531 for (i = 0; i < (int) l_largeRG.getRowCount(); i++, l_largeRow.nextRow(), djsInputRow.nextRow())
532 {
533 applyMapping(relayMapping, l_largeRow, &djsInputRow);
534 djsInputRG.incRowCount();
535 }
536
537 fifos[0]->insert(outData);
538 next:
539 more = largeDL->next(largeIt, &inData);
540 }
541
542 while (more)
543 more = largeDL->next(largeIt, &inData);
544
545 fifos[0]->endOfInput();
546 }
547
djsReaderFcn(int index)548 void TupleHashJoinStep::djsReaderFcn(int index)
549 {
550 /*
551 read from fifos[index]
552 - incoming rgdata's have outputRG format
553 do FE2 processing
554 put into outputDL, to be picked up by the next JS or nextBand()
555 */
556
557 int it = fifos[index]->getIterator();
558 bool more = true;
559 RowGroup l_outputRG = outputRG;
560 RGData rgData;
561 vector<RGData> v_rgData;
562
563 RowGroup l_fe2RG;
564 Row fe2InRow, fe2OutRow;
565 FuncExpWrapper l_fe;
566
567 if (fe2)
568 {
569 l_fe2RG = fe2Output;
570 l_outputRG.initRow(&fe2InRow);
571 l_fe2RG.initRow(&fe2OutRow);
572 l_fe = *fe2;
573 }
574
575 makeDupList(fe2 ? l_fe2RG : l_outputRG);
576
577 while (!cancelled())
578 {
579 more = fifos[index]->next(it, &rgData);
580
581 if (!more)
582 break;
583
584 l_outputRG.setData(&rgData);
585
586 if (l_outputRG.getRowCount() == 0)
587 continue;
588
589 v_rgData.clear();
590 v_rgData.push_back(rgData);
591
592 if (fe2)
593 processFE2(l_outputRG, l_fe2RG, fe2InRow, fe2OutRow, &v_rgData, &l_fe);
594
595 processDupList(0, (fe2 ? l_fe2RG : l_outputRG), &v_rgData);
596 sendResult(v_rgData);
597 }
598
599 while (more)
600 more = fifos[index]->next(it, &rgData);
601
602 for (int i = 0; i < (int) djsJoiners.size(); i++)
603 {
604 fExtendedInfo += djs[i].extendedInfo();
605 fMiniInfo += djs[i].miniInfo();
606 }
607
608 outputDL->endOfInput();
609 }
610
hjRunner()611 void TupleHashJoinStep::hjRunner()
612 {
613 uint32_t i;
614 std::vector<uint64_t> smallRunners; // thread handles from thread pool
615
616 if (cancelled())
617 {
618 if (fOutputJobStepAssociation.outSize() > 0)
619 fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
620
621 startAdjoiningSteps();
622 deliverMutex.unlock();
623 return;
624 }
625
626 StepTeleStats sts;
627
628 if (fTableOID1 >= 3000)
629 {
630 sts.query_uuid = fQueryUuid;
631 sts.step_uuid = fStepUuid;
632 sts.msg_type = StepTeleStats::ST_START;
633 sts.start_time = QueryTeleClient::timeNowms();
634 sts.total_units_of_work = 1;
635 postStepStartTele(sts);
636 }
637
638 idbassert(joinTypes.size() == smallDLs.size());
639 idbassert(joinTypes.size() == joiners.size());
640
641 /* Start the small-side runners */
642 rgData.reset(new vector<RGData>[smallDLs.size()]);
643 memUsedByEachJoin.reset(new ssize_t[smallDLs.size()]);
644
645 for (i = 0; i < smallDLs.size(); i++)
646 memUsedByEachJoin[i] = 0;
647
648 try
649 {
650 /* Note: the only join that can have a useful small outer table is the last small outer,
651 * the others get clobbered by the join after it. Turn off small outer for 'the others'.
652 * The last small outer can be:
653 * the last small side; or followed by large outer small sides */
654 bool turnOffSmallouter = false;
655
656 for (int j = smallDLs.size() - 1; j >= 0; j--)
657 {
658 if (joinTypes[j] & SMALLOUTER)
659 {
660 if (turnOffSmallouter)
661 {
662 joinTypes[j] &= ~SMALLOUTER;
663 }
664 else // turnOffSmallouter == false, keep this one, but turn off any one in front
665 {
666 lastSmallOuterJoiner = j;
667 turnOffSmallouter = true;
668 }
669 }
670 else if (joinTypes[j] & INNER && turnOffSmallouter == false)
671 {
672 turnOffSmallouter = true;
673 }
674 }
675
676 smallRunners.clear();
677 smallRunners.reserve(smallDLs.size());
678
679 for (i = 0; i < smallDLs.size(); i++)
680 smallRunners.push_back(jobstepThreadPool.invoke(SmallRunner(this, i)));
681 }
682 catch (thread_resource_error&)
683 {
684 string emsg = "TupleHashJoin caught a thread resource error, aborting...\n";
685 errorMessage("too many threads");
686 status(logging::threadResourceErr);
687 errorLogging(emsg, logging::threadResourceErr);
688 fDie = true;
689 deliverMutex.unlock();
690 }
691
692 jobstepThreadPool.join(smallRunners);
693 smallRunners.clear();
694
695 for (i = 0; i < feIndexes.size() && joiners.size() > 0; i++)
696 joiners[feIndexes[i]]->setFcnExpFilter(fe[i]);
697
698 /* segregate the Joiners into ones for TBPS and ones for DJS */
699 segregateJoiners();
700
701 /* Need to clean this stuff up. If the query was cancelled before this, and this would have had
702 a disk join, it's still necessary to construct the DJS objects to finish the abort.
703 Update: Is this more complicated than scanning joiners for either ondisk() or (not isFinished())
704 and draining the corresponding inputs & telling downstream EOF? todo, think about it */
705 if (!djsJoiners.empty())
706 {
707 joinIsTooBig = false;
708
709 if (!cancelled())
710 fLogger->logMessage(logging::LOG_TYPE_INFO, logging::INFO_SWITCHING_TO_DJS);
711
712 uint32_t smallSideCount = djsJoiners.size();
713
714 if (!outputDL)
715 {
716 ownsOutputDL = true;
717 outputDL = new RowGroupDL(1, 5);
718 outputIt = outputDL->getIterator();
719 }
720
721 djs.reset(new DiskJoinStep[smallSideCount]);
722 fifos.reset(new boost::shared_ptr<RowGroupDL>[smallSideCount + 1]);
723
724 for (i = 0; i <= smallSideCount; i++)
725 fifos[i].reset(new RowGroupDL(1, 5));
726
727 boost::mutex::scoped_lock sl(djsLock);
728
729 for (i = 0; i < smallSideCount; i++)
730 {
731 // these link themselves fifos[0]->DSJ[0]->fifos[1]->DSJ[1] ... ->fifos[smallSideCount],
732 // THJS puts data into fifos[0], reads it from fifos[smallSideCount]
733 djs[i] = DiskJoinStep(this, i, djsJoinerMap[i], (i == smallSideCount - 1));
734 }
735
736 sl.unlock();
737
738 try
739 {
740 for (i = 0; !cancelled() && i < smallSideCount; i++)
741 {
742 vector<RGData> empty;
743 resourceManager->returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit);
744 memUsedByEachJoin[djsJoinerMap[i]] = 0;
745 djs[i].loadExistingData(rgData[djsJoinerMap[i]]);
746 rgData[djsJoinerMap[i]].swap(empty);
747 }
748 }
749 catch (...)
750 {
751 handleException(std::current_exception(),
752 logging::ERR_EXEMGR_MALFUNCTION,
753 logging::ERR_JOIN_TOO_BIG,
754 "TupleHashJoinStep::hjRunner()");
755 status(logging::ERR_EXEMGR_MALFUNCTION);
756 abort();
757 }
758
759 if (fe2)
760 fe2Mapping = makeMapping(outputRG, fe2Output);
761
762 bool relay = false, reader = false;
763
764 /* If an error happened loading the existing data, these threads are necessary
765 to finish the abort */
766 try
767 {
768 djsRelay = jobstepThreadPool.invoke(DJSRelay(this));
769 relay = true;
770 djsReader = jobstepThreadPool.invoke(DJSReader(this, smallSideCount));
771 reader = true;
772
773 for (i = 0; i < smallSideCount; i++)
774 djs[i].run();
775 }
776 catch (thread_resource_error&)
777 {
778 /* This means there is a gap somewhere in the chain, need to identify
779 where the gap is, drain the input, and close the output. */
780
781 string emsg = "TupleHashJoin caught a thread resource error, aborting...\n";
782 errorMessage("too many threads");
783 status(logging::threadResourceErr);
784 errorLogging(emsg, logging::threadResourceErr);
785 abort();
786
787 if (reader && relay) // must have been thrown from the djs::run() loop
788 {
789 // fill the gap in the chain: drain input of the failed DJS (i), close the last fifo
790 if (largeBPS)
791 largeDL->endOfInput();
792
793 int it = fifos[i]->getIterator();
794 RGData rg;
795
796 while (fifos[i]->next(it, &rg));
797
798 fifos[smallSideCount]->endOfInput();
799 }
800 else // no DJS's have been started
801 {
802 if (relay)
803 {
804 // drain Relay's output
805 if (largeBPS)
806 largeDL->endOfInput();
807
808 int it = fifos[0]->getIterator();
809 RGData rg;
810
811 while (fifos[0]->next(it, &rg));
812 }
813
814 if (reader)
815 // close Reader's input
816 fifos[smallSideCount]->endOfInput();
817 else // close the next JobStep's input
818 outputDL->endOfInput();
819 }
820 }
821 }
822
823 /* Final THJS configuration is settled here at the moment */
824 deliverMutex.unlock();
825
826 if (cancelled())
827 {
828 if (joinIsTooBig && !status())
829 {
830 fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
831 errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
832 status(logging::ERR_JOIN_TOO_BIG);
833 cout << "Join is too big, raise the UM join limit for now" << endl;
834
835 /* Drop memory */
836 if (!fDelivery)
837 {
838 joiners.clear();
839 tbpsJoiners.clear();
840 rgData.reset();
841 for (uint i = 0; i < smallDLs.size(); i++)
842 {
843 resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
844 memUsedByEachJoin[i] = 0;
845 }
846 }
847 }
848 }
849
850 // todo: forwardCPData needs to grab data from djs
851 if (!djs)
852 forwardCPData(); // this fcn has its own exclusion list
853
854 // decide if perform aggregation on PM
855 if (dynamic_cast<TupleAggregateStep*>(fDeliveryStep.get()) != NULL && largeBPS)
856 {
857 bool pmAggregation = !(dynamic_cast<TupleAggregateStep*>(fDeliveryStep.get())->umOnly());
858
859 for (i = 0; i < joiners.size() && pmAggregation; ++i)
860 pmAggregation = pmAggregation && (joiners[i]->inPM() && !joiners[i]->smallOuterJoin());
861
862 if (pmAggregation)
863 dynamic_cast<TupleAggregateStep*>(fDeliveryStep.get())->setPmHJAggregation(largeBPS);
864 }
865
866 // can we sort the joiners? Currently they all have to be inner joins.
867 // Note, any vars that used to parallel the joiners list will be invalidated
868 // (ie. smallTableNames)
869
870 for (i = 0; i < tbpsJoiners.size(); i++)
871 if (!tbpsJoiners[i]->innerJoin())
872 break;
873
874 if (i == tbpsJoiners.size())
875 sort(tbpsJoiners.begin(), tbpsJoiners.end(), JoinerSorter());
876
877 /* Each thread independently decides whether a given join can execute on the PM.
878 * A PM join can't follow a UM join, so we fix that here.
879 */
880 bool doUM;
881
882 for (i = 0, doUM = false; i < tbpsJoiners.size(); i++)
883 {
884 if (tbpsJoiners[i]->inUM())
885 doUM = true;
886
887 if (tbpsJoiners[i]->inPM() && doUM)
888 {
889 #ifdef JLF_DEBUG
890 cout << "moving join " << i << " to UM (PM join can't follow a UM join)\n";
891 #endif
892 tbpsJoiners[i]->setInUM(rgData[i]);
893 }
894 }
895
896 // there is an in-mem UM or PM join
897 if (largeBPS && !tbpsJoiners.empty())
898 {
899 largeBPS->useJoiners(tbpsJoiners);
900
901 if (djs)
902 largeBPS->setJoinedResultRG(largeRG + outputRG);
903 else
904 largeBPS->setJoinedResultRG(outputRG);
905
906 if (!feIndexes.empty())
907 largeBPS->setJoinFERG(joinFilterRG);
908
909 // cout << "join UM memory available is " << totalUMMemoryUsage << endl;
910
911 /* Figure out whether fe2 can run with the tables joined on the PM. If so,
912 fe2 -> PM, otherwise fe2 -> UM.
913 For now, the alg is "assume if any joins are done on the UM, fe2 has to go on
914 the UM." The structs and logic aren't in place yet to track all of the tables
915 through a joblist. */
916 if (fe2 && !djs)
917 {
918 /* Can't do a small outer join when the PM sends back joined rows */
919 runFE2onPM = true;
920
921 if (joinTypes[joiners.size() - 1] == SMALLOUTER)
922 runFE2onPM = false;
923
924 for (i = 0; i < joiners.size(); i++)
925 if (joiners[i]->inUM())
926 {
927 runFE2onPM = false;
928 break;
929 }
930
931 #ifdef JLF_DEBUG
932 if (runFE2onPM)
933 cout << "PM runs FE2\n";
934 else
935 cout << "UM runs FE2\n";
936 #endif
937 largeBPS->setFcnExpGroup2(fe2, fe2Output, runFE2onPM);
938 }
939 else if (fe2)
940 runFE2onPM = false;
941
942 if (!fDelivery && !djs)
943 {
944 /* connect the largeBPS directly to the next step */
945 JobStepAssociation newJsa;
946 newJsa.outAdd(fOutputJobStepAssociation.outAt(0));
947
948 for (unsigned i = 1; i < largeBPS->outputAssociation().outSize(); i++)
949 newJsa.outAdd(largeBPS->outputAssociation().outAt(i));
950
951 largeBPS->outputAssociation(newJsa);
952 }
953
954 startAdjoiningSteps();
955 }
956 else if (largeBPS)
957 {
958 // there are no in-mem UM or PM joins, only disk-joins
959 startAdjoiningSteps();
960 }
961 else if (!djs)
962 // if there's no largeBPS, all joins are either done by DJS or join threads,
963 // this clause starts the THJS join threads.
964 startJoinThreads();
965
966 if (fTableOID1 >= 3000)
967 {
968 sts.msg_type = StepTeleStats::ST_SUMMARY;
969 sts.end_time = QueryTeleClient::timeNowms();
970 sts.total_units_of_work = sts.units_of_work_completed = 1;
971 postStepSummaryTele(sts);
972 }
973 }
974
nextBand(messageqcpp::ByteStream & bs)975 uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs)
976 {
977 RGData oneRG;
978 bool more;
979 uint32_t ret = 0;
980 RowGroupDL* dl;
981 uint64_t it;
982
983 idbassert(fDelivery);
984
985 boost::mutex::scoped_lock lk(deliverMutex);
986
987 RowGroup* deliveredRG;
988
989 if (fe2)
990 deliveredRG = &fe2Output;
991 else
992 deliveredRG = &outputRG;
993
994 if (largeBPS && !djs)
995 {
996 dl = largeDL;
997 it = largeIt;
998 }
999 else
1000 {
1001 dl = outputDL;
1002 it = outputIt;
1003 }
1004
1005 while (ret == 0)
1006 {
1007 if (cancelled())
1008 {
1009 oneRG.reinit(*deliveredRG, 0);
1010 deliveredRG->setData(&oneRG);
1011 deliveredRG->resetRowGroup(0);
1012 deliveredRG->setStatus(status());
1013 deliveredRG->serializeRGData(bs);
1014 more = true;
1015
1016 while (more)
1017 more = dl->next(it, &oneRG);
1018
1019 joiners.clear();
1020 rgData.reset();
1021 for (uint i = 0; i < smallDLs.size(); i++)
1022 {
1023 resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
1024 memUsedByEachJoin[i] = 0;
1025 }
1026 return 0;
1027 }
1028
1029 more = dl->next(it, &oneRG);
1030
1031 if (!more)
1032 {
1033 joiners.clear();
1034 tbpsJoiners.clear();
1035 rgData.reset();
1036 oneRG.reinit(*deliveredRG, 0);
1037 deliveredRG->setData(&oneRG);
1038 deliveredRG->resetRowGroup(0);
1039 deliveredRG->setStatus(status());
1040
1041 if (status() != 0)
1042 cout << " -- returning error status " << deliveredRG->getStatus() << endl;
1043
1044 deliveredRG->serializeRGData(bs);
1045 for (uint i = 0; i < smallDLs.size(); i++)
1046 {
1047 resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
1048 memUsedByEachJoin[i] = 0;
1049 }
1050 return 0;
1051 }
1052
1053 deliveredRG->setData(&oneRG);
1054 ret = deliveredRG->getRowCount();
1055 }
1056
1057 deliveredRG->serializeRGData(bs);
1058 return ret;
1059 }
1060
setLargeSideBPS(BatchPrimitive * b)1061 void TupleHashJoinStep::setLargeSideBPS(BatchPrimitive* b)
1062 {
1063 largeBPS = dynamic_cast<TupleBPS*>(b);
1064 }
1065
startAdjoiningSteps()1066 void TupleHashJoinStep::startAdjoiningSteps()
1067 {
1068 if (largeBPS)
1069 largeBPS->run();
1070 }
1071
1072 /* TODO: update toString() with the multiple table join info */
toString() const1073 const string TupleHashJoinStep::toString() const
1074 {
1075 ostringstream oss;
1076 size_t idlsz = fInputJobStepAssociation.outSize();
1077 idbassert(idlsz > 1);
1078 oss << "TupleHashJoinStep ses:" << fSessionId << " st:" << fStepId;
1079 oss << omitOidInDL;
1080
1081 for (size_t i = 0; i < idlsz; ++i)
1082 {
1083 RowGroupDL* idl = fInputJobStepAssociation.outAt(i)->rowGroupDL();
1084 CalpontSystemCatalog::OID oidi = 0;
1085
1086 if (idl) oidi = idl->OID();
1087
1088 oss << " in ";
1089
1090 if (largeSideIndex == i)
1091 oss << "*";
1092
1093 oss << "tb/col:" << fTableOID1 << "/" << oidi;
1094 oss << " " << fInputJobStepAssociation.outAt(i);
1095 }
1096
1097 idlsz = fOutputJobStepAssociation.outSize();
1098
1099 if (idlsz > 0)
1100 {
1101 oss << endl << " ";
1102 RowGroupDL* dlo = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
1103 CalpontSystemCatalog::OID oido = 0;
1104
1105 if (dlo) oido = dlo->OID();
1106
1107 oss << " out tb/col:" << fTableOID1 << "/" << oido;
1108 oss << " " << fOutputJobStepAssociation.outAt(0);
1109 }
1110
1111 oss << endl;
1112
1113 return oss.str();
1114 }
1115
1116 //------------------------------------------------------------------------------
1117 // Log specified error to stderr and the critical log
1118 //------------------------------------------------------------------------------
errorLogging(const string & msg,int err) const1119 void TupleHashJoinStep::errorLogging(const string& msg, int err) const
1120 {
1121 ostringstream errMsg;
1122 errMsg << "Step " << stepId() << "; " << msg;
1123 cerr << errMsg.str() << endl;
1124 SErrorInfo errorInfo(new ErrorInfo); // dummy, error info already set by caller.
1125 catchHandler(msg, err, errorInfo, fSessionId);
1126 }
1127
addSmallSideRG(const vector<rowgroup::RowGroup> & rgs,const vector<string> & tnames)1128 void TupleHashJoinStep::addSmallSideRG(const vector<rowgroup::RowGroup>& rgs,
1129 const vector<string>& tnames)
1130 {
1131 smallTableNames.insert(smallTableNames.end(), tnames.begin(), tnames.end());
1132 smallRGs.insert(smallRGs.end(), rgs.begin(), rgs.end());
1133 }
1134
addJoinKeyIndex(const vector<JoinType> & jt,const vector<bool> & typeless,const vector<vector<uint32_t>> & smallkey,const vector<vector<uint32_t>> & largekey)1135 void TupleHashJoinStep::addJoinKeyIndex(const vector<JoinType>& jt,
1136 const vector<bool>& typeless,
1137 const vector<vector<uint32_t> >& smallkey,
1138 const vector<vector<uint32_t> >& largekey)
1139 {
1140 joinTypes.insert(joinTypes.end(), jt.begin(), jt.end());
1141 typelessJoin.insert(typelessJoin.end(), typeless.begin(), typeless.end());
1142 smallSideKeys.insert(smallSideKeys.end(), smallkey.begin(), smallkey.end());
1143 largeSideKeys.insert(largeSideKeys.end(), largekey.begin(), largekey.end());
1144 #ifdef JLF_DEBUG
1145
1146 for (uint32_t i = 0; i < joinTypes.size(); i++)
1147 cout << "jointype[" << i << "] = 0x" << hex << joinTypes[i] << dec << endl;
1148
1149 #endif
1150 }
1151
configSmallSideRG(const vector<RowGroup> & rgs,const vector<string> & tnames)1152 void TupleHashJoinStep::configSmallSideRG(const vector<RowGroup>& rgs, const vector<string>& tnames)
1153 {
1154 smallTableNames.insert(smallTableNames.begin(), tnames.begin(), tnames.end());
1155 smallRGs.insert(smallRGs.begin(), rgs.begin(), rgs.end());
1156 }
1157
configLargeSideRG(const RowGroup & rg)1158 void TupleHashJoinStep::configLargeSideRG(const RowGroup& rg)
1159 {
1160 largeRG = rg;
1161 }
1162
configJoinKeyIndex(const vector<JoinType> & jt,const vector<bool> & typeless,const vector<vector<uint32_t>> & smallkey,const vector<vector<uint32_t>> & largekey)1163 void TupleHashJoinStep::configJoinKeyIndex(const vector<JoinType>& jt,
1164 const vector<bool>& typeless,
1165 const vector<vector<uint32_t> >& smallkey,
1166 const vector<vector<uint32_t> >& largekey)
1167 {
1168 joinTypes.insert(joinTypes.begin(), jt.begin(), jt.end());
1169 typelessJoin.insert(typelessJoin.begin(), typeless.begin(), typeless.end());
1170 smallSideKeys.insert(smallSideKeys.begin(), smallkey.begin(), smallkey.end());
1171 largeSideKeys.insert(largeSideKeys.begin(), largekey.begin(), largekey.end());
1172 #ifdef JLF_DEBUG
1173
1174 for (uint32_t i = 0; i < joinTypes.size(); i++)
1175 cout << "jointype[" << i << "] = 0x" << hex << joinTypes[i] << dec << endl;
1176
1177 #endif
1178 }
1179
setOutputRowGroup(const RowGroup & rg)1180 void TupleHashJoinStep::setOutputRowGroup(const RowGroup& rg)
1181 {
1182 outputRG = rg;
1183 }
1184
smallSideKeyOID(uint32_t s,uint32_t k) const1185 execplan::CalpontSystemCatalog::OID TupleHashJoinStep::smallSideKeyOID(uint32_t s, uint32_t k) const
1186 {
1187 return smallRGs[s].getOIDs()[smallSideKeys[s][k]];
1188 }
1189
largeSideKeyOID(uint32_t s,uint32_t k) const1190 execplan::CalpontSystemCatalog::OID TupleHashJoinStep::largeSideKeyOID(uint32_t s, uint32_t k) const
1191 {
1192 return largeRG.getOIDs()[largeSideKeys[s][k]];
1193 }
1194
addFcnExpGroup2(const boost::shared_ptr<execplan::ParseTree> & fe)1195 void TupleHashJoinStep::addFcnExpGroup2(const boost::shared_ptr<execplan::ParseTree>& fe)
1196 {
1197 if (!fe2)
1198 fe2.reset(new funcexp::FuncExpWrapper());
1199
1200 fe2->addFilter(fe);
1201 }
1202
setFcnExpGroup3(const vector<boost::shared_ptr<execplan::ReturnedColumn>> & v)1203 void TupleHashJoinStep::setFcnExpGroup3(const vector<boost::shared_ptr<execplan::ReturnedColumn> >& v)
1204 {
1205 if (!fe2)
1206 fe2.reset(new funcexp::FuncExpWrapper());
1207
1208 for (uint32_t i = 0; i < v.size(); i++)
1209 fe2->addReturnedColumn(v[i]);
1210 }
1211
setFE23Output(const rowgroup::RowGroup & rg)1212 void TupleHashJoinStep::setFE23Output(const rowgroup::RowGroup& rg)
1213 {
1214 fe2Output = rg;
1215 }
1216
getDeliveredRowGroup() const1217 const rowgroup::RowGroup& TupleHashJoinStep::getDeliveredRowGroup() const
1218 {
1219 if (fe2)
1220 return fe2Output;
1221
1222 return outputRG;
1223 }
1224
deliverStringTableRowGroup(bool b)1225 void TupleHashJoinStep::deliverStringTableRowGroup(bool b)
1226 {
1227 if (fe2)
1228 fe2Output.setUseStringTable(b);
1229
1230 outputRG.setUseStringTable(b);
1231 }
1232
deliverStringTableRowGroup() const1233 bool TupleHashJoinStep::deliverStringTableRowGroup() const
1234 {
1235 if (fe2)
1236 return fe2Output.usesStringTable();
1237
1238 return outputRG.usesStringTable();
1239 }
1240
1241 //Must hold the stats lock when calling this!
formatMiniStats(uint32_t index)1242 void TupleHashJoinStep::formatMiniStats(uint32_t index)
1243 {
1244 ostringstream oss;
1245 oss << "HJS ";
1246
1247 if (joiners[index]->inUM())
1248 oss << "UM ";
1249 else
1250 oss << "PM ";
1251
1252 oss << alias() << "-" << joiners[index]->getTableName() << " ";
1253
1254 if (fTableOID2 >= 3000)
1255 oss << fTableOID2;
1256 else
1257 oss << "- ";
1258
1259 oss << " "
1260 << "- "
1261 << "- "
1262 << "- "
1263 << "- "
1264 // << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
1265 // dlTimes are not timed in this step, using '--------' instead.
1266 << "-------- "
1267 << "-\n";
1268 fMiniInfo += oss.str();
1269 }
1270
addJoinFilter(boost::shared_ptr<execplan::ParseTree> pt,uint32_t index)1271 void TupleHashJoinStep::addJoinFilter(boost::shared_ptr<execplan::ParseTree> pt, uint32_t index)
1272 {
1273 boost::shared_ptr<funcexp::FuncExpWrapper> newfe(new funcexp::FuncExpWrapper());
1274
1275 newfe->addFilter(pt);
1276 fe.push_back(newfe);
1277 feIndexes.push_back(index);
1278 }
1279
hasJoinFilter(uint32_t index) const1280 bool TupleHashJoinStep::hasJoinFilter(uint32_t index) const
1281 {
1282 for (uint32_t i = 0; i < feIndexes.size(); i++)
1283 if (feIndexes[i] == static_cast<int>(index))
1284 return true;
1285
1286 return false;
1287 }
1288
getJoinFilter(uint32_t index) const1289 boost::shared_ptr<funcexp::FuncExpWrapper> TupleHashJoinStep::getJoinFilter(uint32_t index) const
1290 {
1291 for (uint32_t i = 0; i < feIndexes.size(); i++)
1292 if (feIndexes[i] == static_cast<int>(index))
1293 return fe[i];
1294
1295 return boost::shared_ptr<funcexp::FuncExpWrapper>();
1296 }
1297
setJoinFilterInputRG(const rowgroup::RowGroup & rg)1298 void TupleHashJoinStep::setJoinFilterInputRG(const rowgroup::RowGroup& rg)
1299 {
1300 joinFilterRG = rg;
1301 }
1302
startJoinThreads()1303 void TupleHashJoinStep::startJoinThreads()
1304 {
1305 uint32_t i;
1306 uint32_t smallSideCount = smallDLs.size();
1307 bool more = true;
1308 RGData oneRG;
1309
1310 if (joinRunners.size() > 0)
1311 return;
1312
1313 //@bug4836, in error case, stop process, and unblock the next step.
1314 if (cancelled())
1315 {
1316 outputDL->endOfInput();
1317
1318 //@bug5785, memory leak on canceling complex queries
1319 while (more)
1320 more = largeDL->next(largeIt, &oneRG);
1321
1322 return;
1323 }
1324
1325 /* Init class-scope vars.
1326 *
1327 * Get a list of small RGs consistent with the joiners.
1328 * Generate small & large mappings for joinFERG and outputRG.
1329 * If fDelivery, create outputDL.
1330 */
1331 for (i = 0; i < smallSideCount; i++)
1332 smallRGs[i] = joiners[i]->getSmallRG();
1333
1334 columnMappings.reset(new shared_array<int>[smallSideCount + 1]);
1335
1336 for (i = 0; i < smallSideCount; i++)
1337 columnMappings[i] = makeMapping(smallRGs[i], outputRG);
1338
1339 columnMappings[smallSideCount] = makeMapping(largeRG, outputRG);
1340
1341 if (!feIndexes.empty())
1342 {
1343 fergMappings.reset(new shared_array<int>[smallSideCount + 1]);
1344
1345 for (i = 0; i < smallSideCount; i++)
1346 fergMappings[i] = makeMapping(smallRGs[i], joinFilterRG);
1347
1348 fergMappings[smallSideCount] = makeMapping(largeRG, joinFilterRG);
1349 }
1350
1351 if (fe2)
1352 fe2Mapping = makeMapping(outputRG, fe2Output);
1353
1354 smallNullMemory.reset(new scoped_array<uint8_t>[smallSideCount]);
1355
1356 for (i = 0; i < smallSideCount; i++)
1357 {
1358 Row smallRow;
1359 smallRGs[i].initRow(&smallRow, true);
1360 smallNullMemory[i].reset(new uint8_t[smallRow.getSize()]);
1361 smallRow.setData(smallNullMemory[i].get());
1362 smallRow.initToNull();
1363 }
1364
1365 for (i = 0; i < smallSideCount; i++)
1366 joiners[i]->setThreadCount(joinThreadCount);
1367
1368 makeDupList(fe2 ? fe2Output : outputRG);
1369
1370 /* Start join runners */
1371 joinRunners.reserve(joinThreadCount);
1372
1373 for (i = 0; i < joinThreadCount; i++)
1374 joinRunners.push_back(jobstepThreadPool.invoke(JoinRunner(this, i)));
1375
1376 /* Join them and call endOfInput */
1377 jobstepThreadPool.join(joinRunners);
1378
1379 if (lastSmallOuterJoiner != (uint32_t) - 1)
1380 finishSmallOuterJoin();
1381
1382 outputDL->endOfInput();
1383 }
1384
finishSmallOuterJoin()1385 void TupleHashJoinStep::finishSmallOuterJoin()
1386 {
1387 vector<Row::Pointer> unmatched;
1388 uint32_t smallSideCount = smallDLs.size();
1389 uint32_t i, j, k;
1390 shared_array<uint8_t> largeNullMemory;
1391 RGData joinedData;
1392 Row joinedBaseRow, fe2InRow, fe2OutRow;
1393 shared_array<Row> smallRowTemplates;
1394 shared_array<Row> smallNullRows;
1395 Row largeNullRow;
1396 RowGroup l_outputRG = outputRG;
1397 RowGroup l_fe2Output = fe2Output;
1398
1399 joiners[lastSmallOuterJoiner]->getUnmarkedRows(&unmatched);
1400
1401 if (unmatched.empty())
1402 return;
1403
1404 smallRowTemplates.reset(new Row[smallSideCount]);
1405 smallNullRows.reset(new Row[smallSideCount]);
1406
1407 for (i = 0; i < smallSideCount; i++)
1408 {
1409 smallRGs[i].initRow(&smallRowTemplates[i]);
1410 smallRGs[i].initRow(&smallNullRows[i], true);
1411 smallNullRows[i].setData(smallNullMemory[i].get());
1412 }
1413
1414 largeRG.initRow(&largeNullRow, true);
1415 largeNullMemory.reset(new uint8_t[largeNullRow.getSize()]);
1416 largeNullRow.setData(largeNullMemory.get());
1417 largeNullRow.initToNull();
1418
1419 joinedData.reinit(l_outputRG);
1420 l_outputRG.setData(&joinedData);
1421 l_outputRG.resetRowGroup(0);
1422 l_outputRG.initRow(&joinedBaseRow);
1423 l_outputRG.getRow(0, &joinedBaseRow);
1424
1425 if (fe2)
1426 {
1427 l_outputRG.initRow(&fe2InRow);
1428 fe2Output.initRow(&fe2OutRow);
1429 }
1430
1431 for (j = 0; j < unmatched.size(); j++)
1432 {
1433 smallRowTemplates[lastSmallOuterJoiner].setPointer(unmatched[j]);
1434
1435 for (k = 0; k < smallSideCount; k++)
1436 {
1437 if (k == lastSmallOuterJoiner)
1438 applyMapping(columnMappings[lastSmallOuterJoiner], smallRowTemplates[lastSmallOuterJoiner], &joinedBaseRow);
1439 else
1440 applyMapping(columnMappings[k], smallNullRows[k], &joinedBaseRow);
1441 }
1442
1443 applyMapping(columnMappings[smallSideCount], largeNullRow, &joinedBaseRow);
1444 joinedBaseRow.setRid(0);
1445 joinedBaseRow.nextRow();
1446 l_outputRG.incRowCount();
1447
1448 if (l_outputRG.getRowCount() == 8192)
1449 {
1450 if (fe2)
1451 {
1452 vector<RGData> rgDatav;
1453 rgDatav.push_back(joinedData);
1454 processFE2(l_outputRG, l_fe2Output, fe2InRow, fe2OutRow, &rgDatav, fe2.get());
1455 outputDL->insert(rgDatav[0]);
1456 }
1457 else
1458 {
1459 outputDL->insert(joinedData);
1460 }
1461
1462 joinedData.reinit(l_outputRG);
1463 l_outputRG.setData(&joinedData);
1464 l_outputRG.resetRowGroup(0);
1465 l_outputRG.getRow(0, &joinedBaseRow);
1466 }
1467 }
1468
1469 if (l_outputRG.getRowCount() > 0)
1470 {
1471 if (fe2)
1472 {
1473 vector<RGData> rgDatav;
1474 rgDatav.push_back(joinedData);
1475 processFE2(l_outputRG, l_fe2Output, fe2InRow, fe2OutRow, &rgDatav, fe2.get());
1476 outputDL->insert(rgDatav[0]);
1477 }
1478 else
1479 {
1480 outputDL->insert(joinedData);
1481 }
1482 }
1483 }
1484
joinRunnerFcn(uint32_t threadID)1485 void TupleHashJoinStep::joinRunnerFcn(uint32_t threadID)
1486 {
1487 RowGroup local_inputRG, local_outputRG, local_joinFERG;
1488 uint32_t smallSideCount = smallDLs.size();
1489 vector<RGData> inputData, joinedRowData;
1490 bool hasJoinFE = !fe.empty();
1491 uint32_t i;
1492
1493 /* thread-local scratch space for join processing */
1494 shared_array<uint8_t> joinFERowData;
1495 Row largeRow, joinFERow, joinedRow, baseRow;
1496 shared_array<uint8_t> baseRowData;
1497 vector<vector<Row::Pointer> > joinMatches;
1498 shared_array<Row> smallRowTemplates;
1499
1500 /* F & E vars */
1501 FuncExpWrapper local_fe;
1502 RowGroup local_fe2RG;
1503 Row fe2InRow, fe2OutRow;
1504
1505 joinMatches.resize(smallSideCount);
1506 local_inputRG = largeRG;
1507 local_outputRG = outputRG;
1508 local_inputRG.initRow(&largeRow);
1509 local_outputRG.initRow(&joinedRow);
1510 local_outputRG.initRow(&baseRow, true);
1511 baseRowData.reset(new uint8_t[baseRow.getSize()]);
1512 baseRow.setData(baseRowData.get());
1513
1514 if (hasJoinFE)
1515 {
1516 local_joinFERG = joinFilterRG;
1517 local_joinFERG.initRow(&joinFERow, true);
1518 joinFERowData.reset(new uint8_t[joinFERow.getSize()]);
1519 joinFERow.setData(joinFERowData.get());
1520 }
1521
1522 if (fe2)
1523 {
1524 local_fe2RG = fe2Output;
1525 local_outputRG.initRow(&fe2InRow);
1526 local_fe2RG.initRow(&fe2OutRow);
1527 local_fe = *fe2;
1528 }
1529
1530 smallRowTemplates.reset(new Row[smallSideCount]);
1531
1532 for (i = 0; i < smallSideCount; i++)
1533 smallRGs[i].initRow(&smallRowTemplates[i]);
1534
1535 grabSomeWork(&inputData);
1536
1537 while (!inputData.empty() && !cancelled())
1538 {
1539 for (i = 0; i < inputData.size() && !cancelled(); i++)
1540 {
1541 local_inputRG.setData(&inputData[i]);
1542
1543 if (local_inputRG.getRowCount() == 0)
1544 continue;
1545
1546 joinOneRG(threadID, &joinedRowData, local_inputRG, local_outputRG, largeRow,
1547 joinFERow, joinedRow, baseRow, joinMatches, smallRowTemplates);
1548 }
1549
1550 if (fe2)
1551 processFE2(local_outputRG, local_fe2RG, fe2InRow, fe2OutRow, &joinedRowData, &local_fe);
1552
1553 processDupList(threadID, (fe2 ? local_fe2RG : local_outputRG), &joinedRowData);
1554 sendResult(joinedRowData);
1555 joinedRowData.clear();
1556 grabSomeWork(&inputData);
1557 }
1558
1559 while (!inputData.empty())
1560 grabSomeWork(&inputData);
1561 }
1562
makeDupList(const RowGroup & rg)1563 void TupleHashJoinStep::makeDupList(const RowGroup& rg)
1564 {
1565 uint32_t i, j, cols = rg.getColumnCount();
1566
1567 for (i = 0; i < cols; i++)
1568 for (j = i + 1; j < cols; j++)
1569 if (rg.getKeys()[i] == rg.getKeys()[j])
1570 dupList.push_back(make_pair(j, i));
1571
1572 dupRows.reset(new Row[joinThreadCount]);
1573
1574 for (i = 0; i < joinThreadCount; i++)
1575 rg.initRow(&dupRows[i]);
1576 }
1577
processDupList(uint32_t threadID,RowGroup & rg,vector<RGData> * rowData)1578 void TupleHashJoinStep::processDupList(uint32_t threadID, RowGroup& rg,
1579 vector<RGData>* rowData)
1580 {
1581 uint32_t i, j, k;
1582
1583 if (dupList.empty())
1584 return;
1585
1586 for (i = 0; i < rowData->size(); i++)
1587 {
1588 rg.setData(&(*rowData)[i]);
1589 rg.getRow(0, &dupRows[threadID]);
1590
1591 for (j = 0; j < rg.getRowCount(); j++, dupRows[threadID].nextRow())
1592 for (k = 0; k < dupList.size(); k++)
1593 dupRows[threadID].copyField(dupList[k].first, dupList[k].second);
1594 }
1595 }
1596
processFE2(RowGroup & input,RowGroup & output,Row & inRow,Row & outRow,vector<RGData> * rgData,funcexp::FuncExpWrapper * local_fe)1597 void TupleHashJoinStep::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& outRow,
1598 vector<RGData>* rgData, funcexp::FuncExpWrapper* local_fe)
1599 {
1600 vector<RGData> results;
1601 RGData result;
1602 uint32_t i, j;
1603 bool ret;
1604
1605 result.reinit(output);
1606 output.setData(&result);
1607 output.resetRowGroup(0);
1608 output.getRow(0, &outRow);
1609
1610 for (i = 0; i < rgData->size(); i++)
1611 {
1612 input.setData(&(*rgData)[i]);
1613
1614 if (output.getRowCount() == 0)
1615 {
1616 output.resetRowGroup(input.getBaseRid());
1617 output.setDBRoot(input.getDBRoot());
1618 }
1619
1620 input.getRow(0, &inRow);
1621
1622 for (j = 0; j < input.getRowCount(); j++, inRow.nextRow())
1623 {
1624 ret = local_fe->evaluate(&inRow);
1625
1626 if (ret)
1627 {
1628 applyMapping(fe2Mapping, inRow, &outRow);
1629 output.incRowCount();
1630 outRow.nextRow();
1631
1632 if (output.getRowCount() == 8192)
1633 {
1634 results.push_back(result);
1635 result.reinit(output);
1636 output.setData(&result);
1637 output.resetRowGroup(input.getBaseRid());
1638 output.setDBRoot(input.getDBRoot());
1639 output.getRow(0, &outRow);
1640 }
1641 }
1642 }
1643 }
1644
1645 if (output.getRowCount() > 0)
1646 {
1647 results.push_back(result);
1648 }
1649
1650 rgData->swap(results);
1651 }
1652
sendResult(const vector<RGData> & res)1653 void TupleHashJoinStep::sendResult(const vector<RGData>& res)
1654 {
1655 boost::mutex::scoped_lock lock(outputDLLock);
1656
1657 for (uint32_t i = 0; i < res.size(); i++)
1658 //INSERT_ADAPTER(outputDL, res[i]);
1659 outputDL->insert(res[i]);
1660 }
1661
grabSomeWork(vector<RGData> * work)1662 void TupleHashJoinStep::grabSomeWork(vector<RGData>* work)
1663 {
1664 boost::mutex::scoped_lock lock(inputDLLock);
1665 work->clear();
1666
1667 if (!moreInput)
1668 return;
1669
1670 RGData e;
1671 moreInput = largeDL->next(largeIt, &e);
1672
1673 /* Tunable number here, but it probably won't change things much */
1674 for (uint32_t i = 0; i < 10 && moreInput; i++)
1675 {
1676 work->push_back(e);
1677 moreInput = largeDL->next(largeIt, &e);
1678 }
1679
1680 if (moreInput)
1681 work->push_back(e);
1682 }
1683
1684 /* This function is a port of the main join loop in TupleBPS::receiveMultiPrimitiveMessages(). Any
1685 * changes made here should also be made there and vice versa. */
/* Join one large-side row group against every small side.
 *
 * threadID          - caller's thread index, passed on to the joiners
 * out               - receives the joined row group buffers
 * inputRG           - large-side row group to join (data already set)
 * joinOutput        - row group used to build the joined rows
 * largeSideRow      - scratch row bound to inputRG
 * joinFERow         - scratch row the join filters are evaluated on
 * joinedRow         - scratch row bound to joinOutput
 * baseRow           - scratch row the joined columns are assembled in
 * joinMatches       - scratch space, one match list per small side
 * smallRowTemplates - one scratch row per small side
 * tjoiners, rgMappings, feMappings, smallNullMem
 *                   - optional overrides; when NULL the step's own joiners,
 *                     columnMappings, fergMappings and smallNullMemory are used
 */
void TupleHashJoinStep::joinOneRG(uint32_t threadID, vector<RGData>* out,
                                  RowGroup& inputRG, RowGroup& joinOutput, Row& largeSideRow, Row& joinFERow,
                                  Row& joinedRow, Row& baseRow, vector<vector<Row::Pointer> >& joinMatches,
                                  shared_array<Row>& smallRowTemplates,
                                  // disk-join support vars.  This param list is insane; refactor attempt would be nice at some point.
                                  vector<boost::shared_ptr<joiner::TupleJoiner> >* tjoiners,
                                  boost::shared_array<boost::shared_array<int> >* rgMappings,
                                  boost::shared_array<boost::shared_array<int> >* feMappings,
                                  boost::scoped_array<boost::scoped_array<uint8_t> >* smallNullMem
                                 )
{

    /* Disk-join support.
       These dissociate the fcn from THJS's members & allow this fcn to be called from DiskJoinStep
    */
    if (!tjoiners)
        tjoiners = &joiners;

    if (!rgMappings)
        rgMappings = &columnMappings;

    if (!feMappings)
        feMappings = &fergMappings;

    if (!smallNullMem)
        smallNullMem = &smallNullMemory;

    RGData joinedData;
    uint32_t matchCount, smallSideCount = tjoiners->size();
    uint32_t j, k;

    // Start a fresh output buffer that carries the input's base rid and dbroot.
    joinedData.reinit(joinOutput);
    joinOutput.setData(&joinedData);
    joinOutput.resetRowGroup(inputRG.getBaseRid());
    joinOutput.setDBRoot(inputRG.getDBRoot());
    inputRG.getRow(0, &largeSideRow);

    //cout << "jointype = " << (*tjoiners)[0]->getJoinType() << endl;
    for (k = 0; k < inputRG.getRowCount() && !cancelled(); k++, largeSideRow.nextRow())
    {
        //cout << "THJS: Large side row: " << largeSideRow.toString() << endl;
        matchCount = 0;

        // Probe each small side in order; the first one that ends up with no
        // match stops the probing for this large-side row.
        for (j = 0; j < smallSideCount; j++)
        {
            (*tjoiners)[j]->match(largeSideRow, k, threadID, &joinMatches[j]);
            /* Debugging code to print the matches
            	Row r;
            	smallRGs[j].initRow(&r);
            	cout << joinMatches[j].size() << " matches: \n";
            	for (uint32_t z = 0; z < joinMatches[j].size(); z++) {
            		r.setData(joinMatches[j][z]);
            		cout << "  " << r.toString() << endl;
            	}
            */
            matchCount = joinMatches[j].size();

            if ((*tjoiners)[j]->hasFEFilter() && matchCount > 0)
            {
                //cout << "doing FE filter" << endl;
                vector<Row::Pointer> newJoinMatches;
                // The large-side columns of the filter row are filled once per large row.
                applyMapping((*feMappings)[smallSideCount], largeSideRow, &joinFERow);

                for (uint32_t z = 0; z < joinMatches[j].size(); z++)
                {
                    smallRowTemplates[j].setPointer(joinMatches[j][z]);
                    applyMapping((*feMappings)[j], smallRowTemplates[j], &joinFERow);

                    if (!(*tjoiners)[j]->evaluateFilter(joinFERow, threadID))
                        matchCount--;
                    else
                    {
                        /* The first match includes it in a SEMI join result and excludes it from an ANTI join
                         * result.  If it's SEMI & SCALAR however, it needs to continue.
                         */
                        newJoinMatches.push_back(joinMatches[j][z]);

                        if ((*tjoiners)[j]->antiJoin() || ((*tjoiners)[j]->semiJoin() && !(*tjoiners)[j]->scalar()))
                            break;
                    }
                }

                // the filter eliminated all matches, need to join with the NULL row
                if (matchCount == 0 && (*tjoiners)[j]->largeOuterJoin())
                {
                    newJoinMatches.clear();
                    newJoinMatches.push_back(Row::Pointer((*smallNullMem)[j].get()));
                    matchCount = 1;
                }

                // From here on joinMatches[j] holds only the filtered matches.
                joinMatches[j].swap(newJoinMatches);
            }

            /* If anti-join, reverse the result */
            if ((*tjoiners)[j]->antiJoin())
            {
                matchCount = (matchCount ? 0 : 1);
            }

            if (matchCount == 0)
            {
                joinMatches[j].clear();
                break;
            }
            else if (!(*tjoiners)[j]->scalar() && ((*tjoiners)[j]->semiJoin() || (*tjoiners)[j]->antiJoin()))
            {
                // Non-scalar SEMI/ANTI joins get the small side's NULL row in place of the matches.
                joinMatches[j].clear();
                joinMatches[j].push_back(Row::Pointer((*smallNullMem)[j].get()));
                matchCount = 1;
            }

            // NOTE(review): matchCount == 0 already breaks out of the loop a few
            // lines above, so this check looks unreachable -- confirm before removing.
            if (matchCount == 0 && (*tjoiners)[j]->innerJoin())
                break;

            // A scalar join must not produce more than one match: flag the error and abort.
            if ((*tjoiners)[j]->scalar() && matchCount > 1)
            {
                errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_MORE_THAN_1_ROW));
                status(logging::ERR_MORE_THAN_1_ROW);
                abort();
            }

            if ((*tjoiners)[j]->smallOuterJoin())
                (*tjoiners)[j]->markMatches(threadID, joinMatches[j]);

        }

        // matchCount is nonzero only if the probing loop was not cut short.
        if (matchCount > 0)
        {
            /* TODO!!!  See TupleBPS for the fix for bug 3510! */
            applyMapping((*rgMappings)[smallSideCount], largeSideRow, &baseRow);
            baseRow.setRid(largeSideRow.getRelRid());
            generateJoinResultSet(joinMatches, baseRow, *rgMappings,
                                  0, joinOutput, joinedData, out, smallRowTemplates, joinedRow);
        }
    }

    // Hand over the last, partially filled output buffer.
    if (joinOutput.getRowCount() > 0)
        out->push_back(joinedData);
}
1825
generateJoinResultSet(const vector<vector<Row::Pointer>> & joinerOutput,Row & baseRow,const shared_array<shared_array<int>> & mappings,const uint32_t depth,RowGroup & l_outputRG,RGData & rgData,vector<RGData> * outputData,const shared_array<Row> & smallRows,Row & joinedRow)1826 void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer> >& joinerOutput,
1827 Row& baseRow, const shared_array<shared_array<int> >& mappings,
1828 const uint32_t depth, RowGroup& l_outputRG, RGData& rgData,
1829 vector<RGData>* outputData, const shared_array<Row>& smallRows,
1830 Row& joinedRow)
1831 {
1832 uint32_t i;
1833 Row& smallRow = smallRows[depth];
1834 uint32_t smallSideCount = joinerOutput.size();
1835
1836 if (depth < smallSideCount - 1)
1837 {
1838 for (i = 0; i < joinerOutput[depth].size(); i++)
1839 {
1840 smallRow.setPointer(joinerOutput[depth][i]);
1841 applyMapping(mappings[depth], smallRow, &baseRow);
1842 // cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " << i << ": " << smallRow.toString() << endl;
1843 generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1,
1844 l_outputRG, rgData, outputData, smallRows, joinedRow);
1845 }
1846 }
1847 else
1848 {
1849 l_outputRG.getRow(l_outputRG.getRowCount(), &joinedRow);
1850
1851 for (i = 0; i < joinerOutput[depth].size(); i++, joinedRow.nextRow(),
1852 l_outputRG.incRowCount())
1853 {
1854 smallRow.setPointer(joinerOutput[depth][i]);
1855
1856 if (UNLIKELY(l_outputRG.getRowCount() == 8192))
1857 {
1858 uint32_t dbRoot = l_outputRG.getDBRoot();
1859 uint64_t baseRid = l_outputRG.getBaseRid();
1860 outputData->push_back(rgData);
1861 rgData.reinit(l_outputRG);
1862 l_outputRG.setData(&rgData);
1863 l_outputRG.resetRowGroup(baseRid);
1864 l_outputRG.setDBRoot(dbRoot);
1865 l_outputRG.getRow(0, &joinedRow);
1866 }
1867
1868 // cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " << i << ": " << smallRow.toString() << endl;
1869 applyMapping(mappings[depth], smallRow, &baseRow);
1870 copyRow(baseRow, &joinedRow);
1871 //memcpy(joinedRow.getData(), baseRow.getData(), joinedRow.getSize());
1872 //cout << "(step " << stepID << ") fully joined row is: " << joinedRow.toString() << endl;
1873 }
1874 }
1875 }
1876
segregateJoiners()1877 void TupleHashJoinStep::segregateJoiners()
1878 {
1879 uint32_t i;
1880 bool allInnerJoins = true;
1881 bool anyTooLarge = false;
1882 uint32_t smallSideCount = smallDLs.size();
1883
1884 for (i = 0; i < smallSideCount; i++)
1885 {
1886 allInnerJoins &= (joinTypes[i] == INNER);
1887 anyTooLarge |= !joiners[i]->isFinished();
1888 }
1889
1890 /* When DDL updates syscat, the syscat checks here are necessary */
1891 if (isDML || !allowDJS || (fSessionId & 0x80000000) ||
1892 (tableOid() < 3000 && tableOid() >= 1000))
1893 {
1894 if (anyTooLarge)
1895 {
1896 joinIsTooBig = true;
1897 abort();
1898 }
1899
1900 tbpsJoiners = joiners;
1901 return;
1902 }
1903
1904 #if 0
1905 // Debugging code, this makes all eligible joins disk-based.
1906 else {
1907 cout << "making all joins disk-based" << endl;
1908 joinIsTooBig = true;
1909 for (i = 0; i < smallSideCount; i++) {
1910 joiner[i]->setConvertToDiskJoin();
1911 djsJoiners.push_back(joiners[i]);
1912 djsJoinerMap.push_back(i);
1913 }
1914 return;
1915 }
1916 #endif
1917
1918 boost::mutex::scoped_lock sl(djsLock);
1919 /* For now if there is no largeBPS all joins need to either be DJS or not, not mixed */
1920 if (!largeBPS)
1921 {
1922 if (anyTooLarge)
1923 {
1924 joinIsTooBig = true;
1925
1926 for (i = 0; i < smallSideCount; i++)
1927 {
1928 joiners[i]->setConvertToDiskJoin();
1929 djsJoiners.push_back(joiners[i]);
1930 djsJoinerMap.push_back(i);
1931 }
1932 }
1933 else
1934 tbpsJoiners = joiners;
1935
1936 return;
1937 }
1938
1939 /* If they are all inner joins they can be segregated w/o respect to
1940 ordering; if they're not, the ordering has to stay consistent therefore
1941 the first joiner that isn't finished and everything after has to be
1942 done by DJS. */
1943
1944 if (allInnerJoins)
1945 {
1946 for (i = 0; i < smallSideCount; i++)
1947 {
1948 //if (joiners[i]->isFinished() && (rand() % 2)) { // for debugging
1949 if (joiners[i]->isFinished())
1950 {
1951 //cout << "1joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> TBPS" << endl;
1952 tbpsJoiners.push_back(joiners[i]);
1953 }
1954 else
1955 {
1956 joinIsTooBig = true;
1957 joiners[i]->setConvertToDiskJoin();
1958 //cout << "1joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
1959 djsJoiners.push_back(joiners[i]);
1960 djsJoinerMap.push_back(i);
1961 }
1962 }
1963 }
1964 else
1965 {
1966 //uint limit = rand() % smallSideCount;
1967 for (i = 0; i < smallSideCount; i++)
1968 {
1969 //if (joiners[i]->isFinished() && i < limit) { // debugging
1970 if (joiners[i]->isFinished())
1971 {
1972 //cout << "2joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> TBPS" << endl;
1973 tbpsJoiners.push_back(joiners[i]);
1974 }
1975 else
1976 break;
1977 }
1978
1979 for (; i < smallSideCount; i++)
1980 {
1981 joinIsTooBig = true;
1982 joiners[i]->setConvertToDiskJoin();
1983 //cout << "2joiner " << i << " " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
1984 djsJoiners.push_back(joiners[i]);
1985 djsJoinerMap.push_back(i);
1986 }
1987 }
1988 }
1989
abort()1990 void TupleHashJoinStep::abort()
1991 {
1992 JobStep::abort();
1993 boost::mutex::scoped_lock sl(djsLock);
1994
1995 if (djs)
1996 {
1997 for (uint32_t i = 0; i < djsJoiners.size(); i++)
1998 djs[i].abort();
1999 }
2000 }
2001
2002 }
2003 // vim:ts=4 sw=4:
2004