1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (c) 2016-2020 MariaDB
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: tuplehashjoin.cpp 9709 2013-07-20 06:08:46Z xlou $
20 
21 
22 #include <climits>
23 #include <cstdio>
24 #include <ctime>
25 #include <sys/time.h>
26 #include <sys/types.h>
27 #include <sys/stat.h>
28 #include <fcntl.h>
29 #include <iomanip>
30 #include <sstream>
31 #include <stdexcept>
32 #include <unistd.h>
33 //#define NDEBUG
34 #include <cassert>
35 #include <algorithm>
36 using namespace std;
37 
38 #include "jlf_common.h"
39 #include "primitivestep.h"
40 #include "tuplehashjoin.h"
41 #include "calpontsystemcatalog.h"
42 #include "elementcompression.h"
43 #include "resourcemanager.h"
44 #include "tupleaggregatestep.h"
45 #include "errorids.h"
46 #include "diskjoinstep.h"
47 #include "vlarray.h"
48 
49 using namespace execplan;
50 using namespace joiner;
51 using namespace rowgroup;
52 using namespace boost;
53 using namespace funcexp;
54 
55 #include "querytele.h"
56 using namespace querytele;
57 
58 #include "atomicops.h"
59 #include "spinlock.h"
60 
61 namespace joblist
62 {
63 
TupleHashJoinStep(const JobInfo & jobInfo)64 TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) :
65     JobStep(jobInfo),
66     joinType(INIT),
67     fTableOID1(0),
68     fTableOID2(0),
69     fOid1(0),
70     fOid2(0),
71     fDictOid1(0),
72     fDictOid2(0),
73     fSequence1(-1),
74     fSequence2(-1),
75     fTupleId1(-1),
76     fTupleId2(-1),
77     fCorrelatedSide(0),
78     resourceManager(jobInfo.rm),
79     runRan(false),
80     joinRan(false),
81     largeSideIndex(1),
82     joinIsTooBig(false),
83     isExeMgr(jobInfo.isExeMgr),
84     lastSmallOuterJoiner(-1),
85     fTokenJoin(-1),
86     fStatsMutexPtr(new boost::mutex()),
87     fFunctionJoinKeys(jobInfo.keyInfo->functionJoinKeys),
88     sessionMemLimit(jobInfo.umMemLimit),
89     rgdLock(false)
90 {
91     /* Need to figure out how much memory these use...
92     	Overhead storing 16 byte elements is about 32 bytes.  That
93     	should stay the same for other element sizes.
94     */
95 
96     pmMemLimit = resourceManager->getHjPmMaxMemorySmallSide(fSessionId);
97     uniqueLimit = resourceManager->getHjCPUniqueLimit();
98 
99     fExtendedInfo = "THJS: ";
100     joinType = INIT;
101     joinThreadCount = resourceManager->getJlNumScanReceiveThreads();
102     largeBPS = NULL;
103     moreInput = true;
104     fQtc.stepParms().stepType = StepTeleStats::T_HJS;
105     outputDL = NULL;
106     ownsOutputDL = false;
107     djsSmallUsage = jobInfo.smallSideUsage;
108     djsSmallLimit = jobInfo.smallSideLimit;
109     djsLargeLimit = jobInfo.largeSideLimit;
110     djsPartitionSize = jobInfo.partitionSize;
111     isDML = jobInfo.isDML;
112 
113     config::Config* config = config::Config::makeConfig();
114     string str = config->getConfig("HashJoin", "AllowDiskBasedJoin");
115 
116     if (str.empty() || str == "y" || str == "Y")
117         allowDJS = true;
118     else
119         allowDJS = false;
120 
121     numCores = resourceManager->numCores();
122     if (numCores <= 0)
123         numCores = 8;
124     /* Debugging, rand() is used to simulate failures
125     time_t t = time(NULL);
126     srand(t);
127     */
128 }
129 
~TupleHashJoinStep()130 TupleHashJoinStep::~TupleHashJoinStep()
131 {
132     delete fStatsMutexPtr;
133 
134     if (ownsOutputDL)
135         delete outputDL;
136 
137     if (memUsedByEachJoin)
138     {
139         for (uint i = 0 ; i < smallDLs.size(); i++)
140             resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
141     }
142 
143 
144     //cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl;
145 }
146 
run()147 void TupleHashJoinStep::run()
148 {
149     uint32_t i;
150 
151     boost::mutex::scoped_lock lk(jlLock);
152 
153     if (runRan)
154         return;
155 
156     runRan = true;
157 
158     deliverMutex.lock();
159 
160 // 	cout << "TupleHashJoinStep::run(): fOutputJobStepAssociation.outSize = " << fOutputJobStepAssociation.outSize() << ", fDelivery = " << boolalpha << fDelivery << endl;
161     idbassert((fOutputJobStepAssociation.outSize() == 1 && !fDelivery) ||
162               (fOutputJobStepAssociation.outSize() == 0 && fDelivery));
163     idbassert(fInputJobStepAssociation.outSize() >= 2);
164 
165     largeDL = fInputJobStepAssociation.outAt(largeSideIndex)->rowGroupDL();
166     largeIt = largeDL->getIterator();
167 
168     for (i = 0; i < fInputJobStepAssociation.outSize(); i++)
169     {
170         if (i != largeSideIndex)
171         {
172             smallDLs.push_back(fInputJobStepAssociation.outAt(i)->rowGroupDL());
173             smallIts.push_back(smallDLs.back()->getIterator());
174         }
175     }
176 
177     if (!fDelivery)
178         outputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
179     else if (!largeBPS)
180     {
181         ownsOutputDL = true;
182         outputDL = new RowGroupDL(1, 5);
183         outputIt = outputDL->getIterator();
184     }
185 
186     joiners.resize(smallDLs.size());
187     mainRunner = jobstepThreadPool.invoke(HJRunner(this));
188 }
189 
join()190 void TupleHashJoinStep::join()
191 {
192     boost::mutex::scoped_lock lk(jlLock);
193 
194     if (joinRan)
195         return;
196 
197     joinRan = true;
198     jobstepThreadPool.join(mainRunner);
199 
200     if (djs)
201     {
202         for (int i = 0; i < (int) djsJoiners.size(); i++)
203             djs[i].join();
204 
205         jobstepThreadPool.join(djsReader);
206         jobstepThreadPool.join(djsRelay);
207         //cout << "THJS: joined all DJS threads, shared usage = " << *djsSmallUsage << endl;
208     }
209 }
210 
211 // simple sol'n.  Poll mem usage of Joiner once per second.  Request mem
212 // increase after the fact.  Failure to get mem will be detected and handled by
213 // the threads inserting into Joiner.
trackMem(uint index)214 void TupleHashJoinStep::trackMem(uint index)
215 {
216     boost::shared_ptr<TupleJoiner> joiner = joiners[index];
217     ssize_t memBefore = 0, memAfter = 0;
218     bool gotMem;
219 
220     boost::unique_lock<boost::mutex> scoped(memTrackMutex);
221     while (!stopMemTracking)
222     {
223         memAfter = joiner->getMemUsage();
224         if (memAfter != memBefore)
225         {
226             gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false);
227             atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
228             memBefore = memAfter;
229             if (!gotMem)
230                 return;
231         }
232         memTrackDone.timed_wait(scoped, boost::posix_time::seconds(1));
233     }
234 
235     // one more iteration to capture mem usage since last poll, for this one
236     // raise an error if mem went over the limit
237     memAfter = joiner->getMemUsage();
238     if (memAfter == memBefore)
239         return;
240     gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false);
241     atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
242     if (!gotMem)
243     {
244         if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) ||
245             (tableOid() < 3000 && tableOid() >= 1000)))
246         {
247             joinIsTooBig = true;
248             fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
249             errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
250             status(logging::ERR_JOIN_TOO_BIG);
251             cout << "Join is too big, raise the UM join limit for now (monitor thread)" << endl;
252             abort();
253         }
254     }
255 }
256 
/*
 * Builds the TupleJoiner for small side `index` and fills it from that side's
 * datalist.  The steps are:
 *  - start a trackMem() monitor plus the smallRunnerFcn() workers (numCores of
 *    them when the joiner is in UM mode, a single one otherwise);
 *  - wait for the workers and stop the monitor;
 *  - abort the large-side BPS and drain the small input if the step was
 *    cancelled;
 *  - record extended/mini stats for this join under *fStatsMutexPtr.
 */
void TupleHashJoinStep::startSmallRunners(uint index)
{
    utils::setThreadName("HJSStartSmall");
    string extendedInfo;
    JoinType jt;
    boost::shared_ptr<TupleJoiner> joiner;

    jt = joinTypes[index];
    extendedInfo += toString();

    // Typeless joins use the whole key-column lists; otherwise only the first
    // key column of each side is passed to the joiner.
    if (typelessJoin[index])
    {
        joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index],
                                     largeSideKeys[index], jt, &jobstepThreadPool));
    }
    else
    {
        joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0],
                                     largeSideKeys[index][0], jt, &jobstepThreadPool));
    }

    joiner->setUniqueLimit(uniqueLimit);
    joiner->setTableName(smallTableNames[index]);
    joiners[index] = joiner;

    /* Force UM mode when there is no large-side BPS or when not running in
       ExeMgr (original note: "check for join types unsupported on the PM"). */
    if (!largeBPS || !isExeMgr)
        joiner->setInUM(rgData[index]);

    /*
        start the small runners
        join them
        check status
        handle abort, out of memory, etc
    */

    /* To measure wall-time spent constructing the small-side tables...
    boost::posix_time::ptime end_time, start_time =
        boost::posix_time::microsec_clock::universal_time();
    */

    // NOTE(review): stopMemTracking, memTrackMutex and memTrackDone are not
    // indexed per small side.  If several startSmallRunners() calls run at
    // once (presumably one per small side, from hjRunner()), then resetting or
    // setting the flag here also affects the other sides' monitors, and
    // notify_one may wake a different monitor.  Confirm whether that is intended.
    stopMemTracking = false;
    // Pool handles for the worker threads; smallRunnerFcn() fills jobs[1..]
    // itself if it promotes the joiner from PM to UM mode.
    utils::VLArray<uint64_t> jobs(numCores);
    uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); });
    // starting 1 thread when in PM mode, since it's only inserting into a
    // vector of rows.  The rest will be started when converted to UM mode.
    if (joiner->inUM())
        for (int i = 0; i < numCores; i++)
            jobs[i] = jobstepThreadPool.invoke([this, i, index, &jobs] { this->smallRunnerFcn(index, i, jobs); });
    else
        jobs[0] = jobstepThreadPool.invoke([this, index, &jobs] { this->smallRunnerFcn(index, 0, jobs); });

    // wait for the first thread to join, then decide whether the others exist and need joining
    jobstepThreadPool.join(jobs[0]);
    if (joiner->inUM())
        for (int i = 1; i < numCores; i++)
            jobstepThreadPool.join(jobs[i]);

    // stop the monitor thread
    memTrackMutex.lock();
    stopMemTracking = true;
    memTrackDone.notify_one();
    memTrackMutex.unlock();
    jobstepThreadPool.join(memMonitor);

    /* If there was an error or an abort: abort the large-side BPS (if any) and
        drain this small side's input DL (presumably so its producer is not
        left blocked). */
    if (cancelled())
    {
//		cout << "HJ stopping... status is " << status() << endl;
        if (largeBPS)
            largeBPS->abort();

        bool more = true;
        RGData oneRG;
        while (more)
            more = smallDLs[index]->next(smallIts[index], &oneRG);
    }

    /* To measure wall-time spent constructing the small-side tables...
    end_time = boost::posix_time::microsec_clock::universal_time();
    if (!(fSessionId & 0x80000000))
        cout << "hash table construction time = " << end_time - start_time <<
        " size = " << joiner->size() << endl;
    */

    extendedInfo += "\n";

    ostringstream oss;
    if (!joiner->onDisk())
    {
        // add extended info, and if not aborted then tell joiner
        // we're done reading the small side.
        if (joiner->inPM())
        {
            oss << "PM join (" << index << ")" << endl;
            #ifdef JLF_DEBUG
            cout << oss.str();
            #endif
            extendedInfo += oss.str();
        }
        else if (joiner->inUM())
        {
            oss << "UM join (" << index << ")" << endl;
            #ifdef JLF_DEBUG
            cout << oss.str();
            #endif
            extendedInfo += oss.str();
        }
        if (!cancelled())
            joiner->doneInserting();
    }

    // Several small sides may finish concurrently; fExtendedInfo and the mini
    // stats are shared, so update them under the stats mutex.
    boost::mutex::scoped_lock lk(*fStatsMutexPtr);
    fExtendedInfo += extendedInfo;
    formatMiniStats(index);
}
374 
375 /* Index is which small input to read. */
smallRunnerFcn(uint32_t index,uint threadID,uint64_t * jobs)376 void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t *jobs)
377 {
378     utils::setThreadName("HJSmallRunner");
379     bool more = true;
380     RGData oneRG;
381     Row r;
382     RowGroupDL* smallDL;
383     uint32_t smallIt;
384     RowGroup smallRG;
385     boost::shared_ptr<TupleJoiner> joiner = joiners[index];
386 
387     smallDL = smallDLs[index];
388     smallIt = smallIts[index];
389     smallRG = smallRGs[index];
390 
391     smallRG.initRow(&r);
392 
393     try
394     {
395         ssize_t rgSize;
396         bool gotMem;
397         goto next;
398         while (more && !cancelled())
399         {
400             smallRG.setData(&oneRG);
401             if (smallRG.getRowCount() == 0)
402                 goto next;
403 
404             // TupleHJ owns the row memory
405             utils::getSpinlock(rgdLock);
406             rgData[index].push_back(oneRG);
407             utils::releaseSpinlock(rgdLock);
408 
409             rgSize = smallRG.getSizeWithStrings();
410             atomicops::atomicAdd(&memUsedByEachJoin[index], rgSize);
411             gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, false);
412             if (!gotMem)
413             {
414                 /*  Mem went over the limit.
415                     If DML or a syscat query, abort.
416                     if disk join is enabled, use it.
417                     else abort.
418                 */
419                 boost::unique_lock<boost::mutex> sl(saneErrMsg);
420                 if (cancelled())
421                     return;
422                 if (!allowDJS || isDML || (fSessionId & 0x80000000) ||
423                     (tableOid() < 3000 && tableOid() >= 1000))
424                 {
425                     joinIsTooBig = true;
426                     fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
427                     errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
428                     status(logging::ERR_JOIN_TOO_BIG);
429                     cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
430                     abort();
431                 }
432                 else if (allowDJS)
433                     joiner->setConvertToDiskJoin();
434                 return;
435             }
436 
437             joiner->insertRGData(smallRG, threadID);
438 
439             if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit))
440             {
441                 joiner->setInUM(rgData[index]);
442                 for (int i = 1; i < numCores; i++)
443                     jobs[i] = jobstepThreadPool.invoke([this, i, index, jobs]
444                         { this->smallRunnerFcn(index, i, jobs); });
445             }
446 next:
447             dlMutex.lock();
448             more = smallDL->next(smallIt, &oneRG);
449             dlMutex.unlock();
450         }
451     }
452     catch (...)
453     {
454         handleException(std::current_exception(),
455                         logging::ERR_EXEMGR_MALFUNCTION,
456                         logging::ERR_JOIN_TOO_BIG,
457                         "TupleHashJoinStep::smallRunnerFcn()");
458         status(logging::ERR_EXEMGR_MALFUNCTION);
459     }
460 
461     if (!joiner->inUM())
462         joiner->setInPM();
463 }
464 
forwardCPData()465 void TupleHashJoinStep::forwardCPData()
466 {
467     uint32_t i, col;
468 
469     if (largeBPS == NULL)
470         return;
471 
472     for (i = 0; i < joiners.size(); i++)
473     {
474         if (joiners[i]->antiJoin() || joiners[i]->largeOuterJoin())
475             continue;
476 
477         for (col = 0; col < joiners[i]->getSmallKeyColumns().size(); col++)
478         {
479             if (smallRGs[i].isLongString(joiners[i]->getSmallKeyColumns()[col]))
480                 continue;
481 
482             // @bug3683, not to add CP predicates if large side is not simple column
483             if (fFunctionJoinKeys.find(largeRG.getKeys()[joiners[i]->getLargeKeyColumns()[col]]) !=
484                     fFunctionJoinKeys.end())
485                 continue;
486 
487             largeBPS->addCPPredicates(largeRG.getOIDs()[joiners[i]->getLargeKeyColumns()[col]],
488                                       joiners[i]->getCPData()[col], !joiners[i]->discreteCPValues()[col]);
489         }
490     }
491 }
492 
djsRelayFcn()493 void TupleHashJoinStep::djsRelayFcn()
494 {
495     /*
496     	read from largeDL
497     	map to largeRG + outputRG format
498     	insert into fifos[0]
499     */
500 
501     RowGroup djsInputRG = largeRG + outputRG;
502     RowGroup l_largeRG = (tbpsJoiners.empty() ? largeRG : largeRG + outputRG);
503     boost::shared_array<int> relayMapping = makeMapping(l_largeRG, djsInputRG);
504     bool more;
505     RGData inData, outData;
506     Row l_largeRow, djsInputRow;
507     int i;
508 
509     l_largeRG.initRow(&l_largeRow);
510     djsInputRG.initRow(&djsInputRow);
511 
512     //cout << "Relay started" << endl;
513 
514     more = largeDL->next(largeIt, &inData);
515 
516     while (more && !cancelled())
517     {
518         l_largeRG.setData(&inData);
519 
520         //if (fSessionId < 0x80000000)
521         //	cout << "got largeside data = " << l_largeRG.toString() << endl;
522         if (l_largeRG.getRowCount() == 0)
523             goto next;
524 
525         outData.reinit(djsInputRG, l_largeRG.getRowCount());
526         djsInputRG.setData(&outData);
527         djsInputRG.resetRowGroup(0);
528         l_largeRG.getRow(0, &l_largeRow);
529         djsInputRG.getRow(0, &djsInputRow);
530 
531         for (i = 0; i < (int) l_largeRG.getRowCount(); i++, l_largeRow.nextRow(), djsInputRow.nextRow())
532         {
533             applyMapping(relayMapping, l_largeRow, &djsInputRow);
534             djsInputRG.incRowCount();
535         }
536 
537         fifos[0]->insert(outData);
538 next:
539         more = largeDL->next(largeIt, &inData);
540     }
541 
542     while (more)
543         more = largeDL->next(largeIt, &inData);
544 
545     fifos[0]->endOfInput();
546 }
547 
djsReaderFcn(int index)548 void TupleHashJoinStep::djsReaderFcn(int index)
549 {
550     /*
551     	read from fifos[index]
552     	   - incoming rgdata's have outputRG format
553     	do FE2 processing
554     	put into outputDL, to be picked up by the next JS or nextBand()
555     */
556 
557     int it = fifos[index]->getIterator();
558     bool more = true;
559     RowGroup l_outputRG = outputRG;
560     RGData rgData;
561     vector<RGData> v_rgData;
562 
563     RowGroup l_fe2RG;
564     Row fe2InRow, fe2OutRow;
565     FuncExpWrapper l_fe;
566 
567     if (fe2)
568     {
569         l_fe2RG = fe2Output;
570         l_outputRG.initRow(&fe2InRow);
571         l_fe2RG.initRow(&fe2OutRow);
572         l_fe = *fe2;
573     }
574 
575     makeDupList(fe2 ? l_fe2RG : l_outputRG);
576 
577     while (!cancelled())
578     {
579         more = fifos[index]->next(it, &rgData);
580 
581         if (!more)
582             break;
583 
584         l_outputRG.setData(&rgData);
585 
586         if (l_outputRG.getRowCount() == 0)
587             continue;
588 
589         v_rgData.clear();
590         v_rgData.push_back(rgData);
591 
592         if (fe2)
593             processFE2(l_outputRG, l_fe2RG, fe2InRow, fe2OutRow, &v_rgData, &l_fe);
594 
595         processDupList(0, (fe2 ? l_fe2RG : l_outputRG), &v_rgData);
596         sendResult(v_rgData);
597     }
598 
599     while (more)
600         more = fifos[index]->next(it, &rgData);
601 
602     for (int i = 0; i < (int) djsJoiners.size(); i++)
603     {
604         fExtendedInfo += djs[i].extendedInfo();
605         fMiniInfo += djs[i].miniInfo();
606     }
607 
608     outputDL->endOfInput();
609 }
610 
hjRunner()611 void TupleHashJoinStep::hjRunner()
612 {
613     uint32_t i;
614     std::vector<uint64_t> smallRunners; // thread handles from thread pool
615 
616     if (cancelled())
617     {
618         if (fOutputJobStepAssociation.outSize() > 0)
619             fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
620 
621         startAdjoiningSteps();
622         deliverMutex.unlock();
623         return;
624     }
625 
626     StepTeleStats sts;
627 
628     if (fTableOID1 >= 3000)
629     {
630         sts.query_uuid = fQueryUuid;
631         sts.step_uuid = fStepUuid;
632         sts.msg_type = StepTeleStats::ST_START;
633         sts.start_time = QueryTeleClient::timeNowms();
634         sts.total_units_of_work = 1;
635         postStepStartTele(sts);
636     }
637 
638     idbassert(joinTypes.size() == smallDLs.size());
639     idbassert(joinTypes.size() == joiners.size());
640 
641     /* Start the small-side runners */
642     rgData.reset(new vector<RGData>[smallDLs.size()]);
643     memUsedByEachJoin.reset(new ssize_t[smallDLs.size()]);
644 
645     for (i = 0; i < smallDLs.size(); i++)
646         memUsedByEachJoin[i] = 0;
647 
648     try
649     {
650         /* Note: the only join that can have a useful small outer table is the last small outer,
651          * the others get clobbered by the join after it. Turn off small outer for 'the others'.
652          * The last small outer can be:
653          *     the last small side; or followed by large outer small sides */
654         bool turnOffSmallouter = false;
655 
656         for (int j = smallDLs.size() - 1; j >= 0; j--)
657         {
658             if (joinTypes[j] & SMALLOUTER)
659             {
660                 if (turnOffSmallouter)
661                 {
662                     joinTypes[j] &= ~SMALLOUTER;
663                 }
664                 else   // turnOffSmallouter == false, keep this one, but turn off any one in front
665                 {
666                     lastSmallOuterJoiner = j;
667                     turnOffSmallouter = true;
668                 }
669             }
670             else if (joinTypes[j] & INNER && turnOffSmallouter == false)
671             {
672                 turnOffSmallouter = true;
673             }
674         }
675 
676         smallRunners.clear();
677         smallRunners.reserve(smallDLs.size());
678 
679         for (i = 0; i < smallDLs.size(); i++)
680             smallRunners.push_back(jobstepThreadPool.invoke(SmallRunner(this, i)));
681     }
682     catch (thread_resource_error&)
683     {
684         string emsg = "TupleHashJoin caught a thread resource error, aborting...\n";
685         errorMessage("too many threads");
686         status(logging::threadResourceErr);
687         errorLogging(emsg, logging::threadResourceErr);
688         fDie = true;
689         deliverMutex.unlock();
690     }
691 
692     jobstepThreadPool.join(smallRunners);
693     smallRunners.clear();
694 
695     for (i = 0; i < feIndexes.size() && joiners.size() > 0; i++)
696         joiners[feIndexes[i]]->setFcnExpFilter(fe[i]);
697 
698     /* segregate the Joiners into ones for TBPS and ones for DJS */
699     segregateJoiners();
700 
701     /* Need to clean this stuff up.  If the query was cancelled before this, and this would have had
702        a disk join, it's still necessary to construct the DJS objects to finish the abort.
703        Update: Is this more complicated than scanning joiners for either ondisk() or (not isFinished())
704        and draining the corresponding inputs & telling downstream EOF?  todo, think about it */
705     if (!djsJoiners.empty())
706     {
707         joinIsTooBig = false;
708 
709         if (!cancelled())
710             fLogger->logMessage(logging::LOG_TYPE_INFO, logging::INFO_SWITCHING_TO_DJS);
711 
712         uint32_t smallSideCount = djsJoiners.size();
713 
714         if (!outputDL)
715         {
716             ownsOutputDL = true;
717             outputDL = new RowGroupDL(1, 5);
718             outputIt = outputDL->getIterator();
719         }
720 
721         djs.reset(new DiskJoinStep[smallSideCount]);
722         fifos.reset(new boost::shared_ptr<RowGroupDL>[smallSideCount + 1]);
723 
724         for (i = 0; i <= smallSideCount; i++)
725             fifos[i].reset(new RowGroupDL(1, 5));
726 
727         boost::mutex::scoped_lock sl(djsLock);
728 
729         for (i = 0; i < smallSideCount; i++)
730         {
731             // these link themselves fifos[0]->DSJ[0]->fifos[1]->DSJ[1] ... ->fifos[smallSideCount],
732             // THJS puts data into fifos[0], reads it from fifos[smallSideCount]
733             djs[i] = DiskJoinStep(this, i, djsJoinerMap[i], (i == smallSideCount - 1));
734         }
735 
736         sl.unlock();
737 
738         try
739         {
740             for (i = 0; !cancelled() && i < smallSideCount; i++)
741             {
742                 vector<RGData> empty;
743                 resourceManager->returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit);
744                 memUsedByEachJoin[djsJoinerMap[i]] = 0;
745                 djs[i].loadExistingData(rgData[djsJoinerMap[i]]);
746                 rgData[djsJoinerMap[i]].swap(empty);
747             }
748         }
749         catch (...)
750         {
751             handleException(std::current_exception(),
752                             logging::ERR_EXEMGR_MALFUNCTION,
753                             logging::ERR_JOIN_TOO_BIG,
754                             "TupleHashJoinStep::hjRunner()");
755             status(logging::ERR_EXEMGR_MALFUNCTION);
756             abort();
757         }
758 
759         if (fe2)
760             fe2Mapping = makeMapping(outputRG, fe2Output);
761 
762         bool relay = false, reader = false;
763 
764         /* If an error happened loading the existing data, these threads are necessary
765         to finish the abort */
766         try
767         {
768             djsRelay = jobstepThreadPool.invoke(DJSRelay(this));
769             relay = true;
770             djsReader = jobstepThreadPool.invoke(DJSReader(this, smallSideCount));
771             reader = true;
772 
773             for (i = 0; i < smallSideCount; i++)
774                 djs[i].run();
775         }
776         catch (thread_resource_error&)
777         {
778             /* This means there is a gap somewhere in the chain, need to identify
779                where the gap is, drain the input, and close the output. */
780 
781             string emsg = "TupleHashJoin caught a thread resource error, aborting...\n";
782             errorMessage("too many threads");
783             status(logging::threadResourceErr);
784             errorLogging(emsg, logging::threadResourceErr);
785             abort();
786 
787             if (reader && relay)     // must have been thrown from the djs::run() loop
788             {
789                 // fill the gap in the chain: drain input of the failed DJS (i), close the last fifo
790                 if (largeBPS)
791                     largeDL->endOfInput();
792 
793                 int it = fifos[i]->getIterator();
794                 RGData rg;
795 
796                 while (fifos[i]->next(it, &rg));
797 
798                 fifos[smallSideCount]->endOfInput();
799             }
800             else     // no DJS's have been started
801             {
802                 if (relay)
803                 {
804                     // drain Relay's output
805                     if (largeBPS)
806                         largeDL->endOfInput();
807 
808                     int it = fifos[0]->getIterator();
809                     RGData rg;
810 
811                     while (fifos[0]->next(it, &rg));
812                 }
813 
814                 if (reader)
815                     // close Reader's input
816                     fifos[smallSideCount]->endOfInput();
817                 else  // close the next JobStep's input
818                     outputDL->endOfInput();
819             }
820         }
821     }
822 
823     /* Final THJS configuration is settled here at the moment */
824     deliverMutex.unlock();
825 
826     if (cancelled())
827     {
828         if (joinIsTooBig && !status())
829         {
830             fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
831             errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
832             status(logging::ERR_JOIN_TOO_BIG);
833             cout << "Join is too big, raise the UM join limit for now" << endl;
834 
835             /* Drop memory */
836             if (!fDelivery)
837             {
838                 joiners.clear();
839                 tbpsJoiners.clear();
840                 rgData.reset();
841                 for (uint i = 0; i < smallDLs.size(); i++)
842                 {
843                     resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
844                     memUsedByEachJoin[i] = 0;
845                 }
846             }
847         }
848     }
849 
850     // todo: forwardCPData needs to grab data from djs
851     if (!djs)
852         forwardCPData(); 	// this fcn has its own exclusion list
853 
854     // decide if perform aggregation on PM
855     if (dynamic_cast<TupleAggregateStep*>(fDeliveryStep.get()) != NULL && largeBPS)
856     {
857         bool pmAggregation = !(dynamic_cast<TupleAggregateStep*>(fDeliveryStep.get())->umOnly());
858 
859         for (i = 0; i < joiners.size() && pmAggregation; ++i)
860             pmAggregation = pmAggregation && (joiners[i]->inPM() && !joiners[i]->smallOuterJoin());
861 
862         if (pmAggregation)
863             dynamic_cast<TupleAggregateStep*>(fDeliveryStep.get())->setPmHJAggregation(largeBPS);
864     }
865 
866     // can we sort the joiners?  Currently they all have to be inner joins.
867     // Note, any vars that used to parallel the joiners list will be invalidated
868     // (ie. smallTableNames)
869 
870     for (i = 0; i < tbpsJoiners.size(); i++)
871         if (!tbpsJoiners[i]->innerJoin())
872             break;
873 
874     if (i == tbpsJoiners.size())
875         sort(tbpsJoiners.begin(), tbpsJoiners.end(), JoinerSorter());
876 
877     /* Each thread independently decides whether a given join can execute on the PM.
878      * A PM join can't follow a UM join, so we fix that here.
879      */
880     bool doUM;
881 
882     for (i = 0, doUM = false; i < tbpsJoiners.size(); i++)
883     {
884         if (tbpsJoiners[i]->inUM())
885             doUM = true;
886 
887         if (tbpsJoiners[i]->inPM() && doUM)
888         {
889 #ifdef JLF_DEBUG
890             cout << "moving join " << i << " to UM (PM join can't follow a UM join)\n";
891 #endif
892             tbpsJoiners[i]->setInUM(rgData[i]);
893         }
894     }
895 
896     // there is an in-mem UM or PM join
897     if (largeBPS && !tbpsJoiners.empty())
898     {
899         largeBPS->useJoiners(tbpsJoiners);
900 
901         if (djs)
902             largeBPS->setJoinedResultRG(largeRG + outputRG);
903         else
904             largeBPS->setJoinedResultRG(outputRG);
905 
906         if (!feIndexes.empty())
907             largeBPS->setJoinFERG(joinFilterRG);
908 
909 // 		cout << "join UM memory available is " << totalUMMemoryUsage << endl;
910 
911         /* Figure out whether fe2 can run with the tables joined on the PM.  If so,
912         fe2 -> PM, otherwise fe2 -> UM.
913         For now, the alg is "assume if any joins are done on the UM, fe2 has to go on
914         the UM."  The structs and logic aren't in place yet to track all of the tables
915         through a joblist. */
916         if (fe2 && !djs)
917         {
918             /* Can't do a small outer join when the PM sends back joined rows */
919             runFE2onPM = true;
920 
921             if (joinTypes[joiners.size() - 1] == SMALLOUTER)
922                 runFE2onPM = false;
923 
924             for (i = 0; i < joiners.size(); i++)
925                 if (joiners[i]->inUM())
926                 {
927                     runFE2onPM = false;
928                     break;
929                 }
930 
931 #ifdef JLF_DEBUG
932             if (runFE2onPM)
933                 cout << "PM runs FE2\n";
934             else
935                 cout << "UM runs FE2\n";
936 #endif
937             largeBPS->setFcnExpGroup2(fe2, fe2Output, runFE2onPM);
938         }
939         else if (fe2)
940             runFE2onPM = false;
941 
942         if (!fDelivery && !djs)
943         {
944             /* connect the largeBPS directly to the next step */
945             JobStepAssociation newJsa;
946             newJsa.outAdd(fOutputJobStepAssociation.outAt(0));
947 
948             for (unsigned i = 1; i < largeBPS->outputAssociation().outSize(); i++)
949                 newJsa.outAdd(largeBPS->outputAssociation().outAt(i));
950 
951             largeBPS->outputAssociation(newJsa);
952         }
953 
954         startAdjoiningSteps();
955     }
956     else if (largeBPS)
957     {
958         // there are no in-mem UM or PM joins, only disk-joins
959         startAdjoiningSteps();
960     }
961     else if (!djs)
962         // if there's no largeBPS, all joins are either done by DJS or join threads,
963         // this clause starts the THJS join threads.
964         startJoinThreads();
965 
966     if (fTableOID1 >= 3000)
967     {
968         sts.msg_type = StepTeleStats::ST_SUMMARY;
969         sts.end_time = QueryTeleClient::timeNowms();
970         sts.total_units_of_work = sts.units_of_work_completed = 1;
971         postStepSummaryTele(sts);
972     }
973 }
974 
nextBand(messageqcpp::ByteStream & bs)975 uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs)
976 {
977     RGData oneRG;
978     bool more;
979     uint32_t ret = 0;
980     RowGroupDL* dl;
981     uint64_t it;
982 
983     idbassert(fDelivery);
984 
985     boost::mutex::scoped_lock lk(deliverMutex);
986 
987     RowGroup* deliveredRG;
988 
989     if (fe2)
990         deliveredRG = &fe2Output;
991     else
992         deliveredRG = &outputRG;
993 
994     if (largeBPS && !djs)
995     {
996         dl = largeDL;
997         it = largeIt;
998     }
999     else
1000     {
1001         dl = outputDL;
1002         it = outputIt;
1003     }
1004 
1005     while (ret == 0)
1006     {
1007         if (cancelled())
1008         {
1009             oneRG.reinit(*deliveredRG, 0);
1010             deliveredRG->setData(&oneRG);
1011             deliveredRG->resetRowGroup(0);
1012             deliveredRG->setStatus(status());
1013             deliveredRG->serializeRGData(bs);
1014             more = true;
1015 
1016             while (more)
1017                 more = dl->next(it, &oneRG);
1018 
1019             joiners.clear();
1020             rgData.reset();
1021             for (uint i = 0; i < smallDLs.size(); i++)
1022             {
1023                 resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
1024                 memUsedByEachJoin[i] = 0;
1025             }
1026             return 0;
1027         }
1028 
1029         more = dl->next(it, &oneRG);
1030 
1031         if (!more)
1032         {
1033             joiners.clear();
1034             tbpsJoiners.clear();
1035             rgData.reset();
1036             oneRG.reinit(*deliveredRG, 0);
1037             deliveredRG->setData(&oneRG);
1038             deliveredRG->resetRowGroup(0);
1039             deliveredRG->setStatus(status());
1040 
1041             if (status() != 0)
1042                 cout << " -- returning error status " << deliveredRG->getStatus() << endl;
1043 
1044             deliveredRG->serializeRGData(bs);
1045             for (uint i = 0; i < smallDLs.size(); i++)
1046             {
1047                 resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
1048                 memUsedByEachJoin[i] = 0;
1049             }
1050             return 0;
1051         }
1052 
1053         deliveredRG->setData(&oneRG);
1054         ret = deliveredRG->getRowCount();
1055     }
1056 
1057     deliveredRG->serializeRGData(bs);
1058     return ret;
1059 }
1060 
setLargeSideBPS(BatchPrimitive * b)1061 void TupleHashJoinStep::setLargeSideBPS(BatchPrimitive* b)
1062 {
1063     largeBPS = dynamic_cast<TupleBPS*>(b);
1064 }
1065 
startAdjoiningSteps()1066 void TupleHashJoinStep::startAdjoiningSteps()
1067 {
1068     if (largeBPS)
1069         largeBPS->run();
1070 }
1071 
1072 /* TODO: update toString() with the multiple table join info */
toString() const1073 const string TupleHashJoinStep::toString() const
1074 {
1075     ostringstream oss;
1076     size_t idlsz = fInputJobStepAssociation.outSize();
1077     idbassert(idlsz > 1);
1078     oss << "TupleHashJoinStep    ses:" << fSessionId << " st:" << fStepId;
1079     oss << omitOidInDL;
1080 
1081     for (size_t i = 0; i < idlsz; ++i)
1082     {
1083         RowGroupDL* idl = fInputJobStepAssociation.outAt(i)->rowGroupDL();
1084         CalpontSystemCatalog::OID oidi = 0;
1085 
1086         if (idl) oidi = idl->OID();
1087 
1088         oss << " in ";
1089 
1090         if (largeSideIndex == i)
1091             oss << "*";
1092 
1093         oss << "tb/col:" << fTableOID1 << "/" << oidi;
1094         oss << " " << fInputJobStepAssociation.outAt(i);
1095     }
1096 
1097     idlsz = fOutputJobStepAssociation.outSize();
1098 
1099     if (idlsz > 0)
1100     {
1101         oss << endl << "					";
1102         RowGroupDL* dlo = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
1103         CalpontSystemCatalog::OID oido = 0;
1104 
1105         if (dlo) oido = dlo->OID();
1106 
1107         oss << " out tb/col:" << fTableOID1 << "/" << oido;
1108         oss << " " << fOutputJobStepAssociation.outAt(0);
1109     }
1110 
1111     oss << endl;
1112 
1113     return oss.str();
1114 }
1115 
1116 //------------------------------------------------------------------------------
1117 // Log specified error to stderr and the critical log
1118 //------------------------------------------------------------------------------
errorLogging(const string & msg,int err) const1119 void TupleHashJoinStep::errorLogging(const string& msg, int err) const
1120 {
1121     ostringstream errMsg;
1122     errMsg << "Step " << stepId() << "; " << msg;
1123     cerr   << errMsg.str() << endl;
1124     SErrorInfo errorInfo(new ErrorInfo); // dummy, error info already set by caller.
1125     catchHandler(msg, err, errorInfo, fSessionId);
1126 }
1127 
addSmallSideRG(const vector<rowgroup::RowGroup> & rgs,const vector<string> & tnames)1128 void TupleHashJoinStep::addSmallSideRG(const vector<rowgroup::RowGroup>& rgs,
1129                                        const vector<string>& tnames)
1130 {
1131     smallTableNames.insert(smallTableNames.end(), tnames.begin(), tnames.end());
1132     smallRGs.insert(smallRGs.end(), rgs.begin(), rgs.end());
1133 }
1134 
addJoinKeyIndex(const vector<JoinType> & jt,const vector<bool> & typeless,const vector<vector<uint32_t>> & smallkey,const vector<vector<uint32_t>> & largekey)1135 void TupleHashJoinStep::addJoinKeyIndex(const vector<JoinType>& jt,
1136                                         const vector<bool>& typeless,
1137                                         const vector<vector<uint32_t> >& smallkey,
1138                                         const vector<vector<uint32_t> >& largekey)
1139 {
1140     joinTypes.insert(joinTypes.end(), jt.begin(), jt.end());
1141     typelessJoin.insert(typelessJoin.end(), typeless.begin(), typeless.end());
1142     smallSideKeys.insert(smallSideKeys.end(), smallkey.begin(), smallkey.end());
1143     largeSideKeys.insert(largeSideKeys.end(), largekey.begin(), largekey.end());
1144 #ifdef JLF_DEBUG
1145 
1146     for (uint32_t i = 0; i < joinTypes.size(); i++)
1147         cout << "jointype[" << i << "] = 0x" << hex << joinTypes[i] << dec << endl;
1148 
1149 #endif
1150 }
1151 
configSmallSideRG(const vector<RowGroup> & rgs,const vector<string> & tnames)1152 void TupleHashJoinStep::configSmallSideRG(const vector<RowGroup>& rgs, const vector<string>& tnames)
1153 {
1154     smallTableNames.insert(smallTableNames.begin(), tnames.begin(), tnames.end());
1155     smallRGs.insert(smallRGs.begin(), rgs.begin(), rgs.end());
1156 }
1157 
configLargeSideRG(const RowGroup & rg)1158 void TupleHashJoinStep::configLargeSideRG(const RowGroup& rg)
1159 {
1160     largeRG = rg;
1161 }
1162 
configJoinKeyIndex(const vector<JoinType> & jt,const vector<bool> & typeless,const vector<vector<uint32_t>> & smallkey,const vector<vector<uint32_t>> & largekey)1163 void TupleHashJoinStep::configJoinKeyIndex(const vector<JoinType>& jt,
1164         const vector<bool>& typeless,
1165         const vector<vector<uint32_t> >& smallkey,
1166         const vector<vector<uint32_t> >& largekey)
1167 {
1168     joinTypes.insert(joinTypes.begin(), jt.begin(), jt.end());
1169     typelessJoin.insert(typelessJoin.begin(), typeless.begin(), typeless.end());
1170     smallSideKeys.insert(smallSideKeys.begin(), smallkey.begin(), smallkey.end());
1171     largeSideKeys.insert(largeSideKeys.begin(), largekey.begin(), largekey.end());
1172 #ifdef JLF_DEBUG
1173 
1174     for (uint32_t i = 0; i < joinTypes.size(); i++)
1175         cout << "jointype[" << i << "] = 0x" << hex << joinTypes[i] << dec << endl;
1176 
1177 #endif
1178 }
1179 
setOutputRowGroup(const RowGroup & rg)1180 void TupleHashJoinStep::setOutputRowGroup(const RowGroup& rg)
1181 {
1182     outputRG = rg;
1183 }
1184 
smallSideKeyOID(uint32_t s,uint32_t k) const1185 execplan::CalpontSystemCatalog::OID TupleHashJoinStep::smallSideKeyOID(uint32_t s, uint32_t k) const
1186 {
1187     return smallRGs[s].getOIDs()[smallSideKeys[s][k]];
1188 }
1189 
largeSideKeyOID(uint32_t s,uint32_t k) const1190 execplan::CalpontSystemCatalog::OID TupleHashJoinStep::largeSideKeyOID(uint32_t s, uint32_t k) const
1191 {
1192     return largeRG.getOIDs()[largeSideKeys[s][k]];
1193 }
1194 
addFcnExpGroup2(const boost::shared_ptr<execplan::ParseTree> & fe)1195 void TupleHashJoinStep::addFcnExpGroup2(const boost::shared_ptr<execplan::ParseTree>& fe)
1196 {
1197     if (!fe2)
1198         fe2.reset(new funcexp::FuncExpWrapper());
1199 
1200     fe2->addFilter(fe);
1201 }
1202 
setFcnExpGroup3(const vector<boost::shared_ptr<execplan::ReturnedColumn>> & v)1203 void TupleHashJoinStep::setFcnExpGroup3(const vector<boost::shared_ptr<execplan::ReturnedColumn> >& v)
1204 {
1205     if (!fe2)
1206         fe2.reset(new funcexp::FuncExpWrapper());
1207 
1208     for (uint32_t i = 0; i < v.size(); i++)
1209         fe2->addReturnedColumn(v[i]);
1210 }
1211 
setFE23Output(const rowgroup::RowGroup & rg)1212 void TupleHashJoinStep::setFE23Output(const rowgroup::RowGroup& rg)
1213 {
1214     fe2Output = rg;
1215 }
1216 
getDeliveredRowGroup() const1217 const rowgroup::RowGroup& TupleHashJoinStep::getDeliveredRowGroup() const
1218 {
1219     if (fe2)
1220         return fe2Output;
1221 
1222     return outputRG;
1223 }
1224 
deliverStringTableRowGroup(bool b)1225 void TupleHashJoinStep::deliverStringTableRowGroup(bool b)
1226 {
1227     if (fe2)
1228         fe2Output.setUseStringTable(b);
1229 
1230     outputRG.setUseStringTable(b);
1231 }
1232 
deliverStringTableRowGroup() const1233 bool TupleHashJoinStep::deliverStringTableRowGroup() const
1234 {
1235     if (fe2)
1236         return fe2Output.usesStringTable();
1237 
1238     return outputRG.usesStringTable();
1239 }
1240 
1241 //Must hold the stats lock when calling this!
formatMiniStats(uint32_t index)1242 void TupleHashJoinStep::formatMiniStats(uint32_t index)
1243 {
1244     ostringstream oss;
1245     oss << "HJS ";
1246 
1247     if (joiners[index]->inUM())
1248         oss << "UM ";
1249     else
1250         oss << "PM ";
1251 
1252     oss << alias() << "-" << joiners[index]->getTableName() << " ";
1253 
1254     if (fTableOID2 >= 3000)
1255         oss << fTableOID2;
1256     else
1257         oss << "- ";
1258 
1259     oss << " "
1260         << "- "
1261         << "- "
1262         << "- "
1263         << "- "
1264 //		<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
1265 //		dlTimes are not timed in this step, using '--------' instead.
1266         << "-------- "
1267         << "-\n";
1268     fMiniInfo += oss.str();
1269 }
1270 
addJoinFilter(boost::shared_ptr<execplan::ParseTree> pt,uint32_t index)1271 void TupleHashJoinStep::addJoinFilter(boost::shared_ptr<execplan::ParseTree> pt, uint32_t index)
1272 {
1273     boost::shared_ptr<funcexp::FuncExpWrapper> newfe(new funcexp::FuncExpWrapper());
1274 
1275     newfe->addFilter(pt);
1276     fe.push_back(newfe);
1277     feIndexes.push_back(index);
1278 }
1279 
hasJoinFilter(uint32_t index) const1280 bool TupleHashJoinStep::hasJoinFilter(uint32_t index) const
1281 {
1282     for (uint32_t i = 0; i < feIndexes.size(); i++)
1283         if (feIndexes[i] == static_cast<int>(index))
1284             return true;
1285 
1286     return false;
1287 }
1288 
getJoinFilter(uint32_t index) const1289 boost::shared_ptr<funcexp::FuncExpWrapper> TupleHashJoinStep::getJoinFilter(uint32_t index) const
1290 {
1291     for (uint32_t i = 0; i < feIndexes.size(); i++)
1292         if (feIndexes[i] == static_cast<int>(index))
1293             return fe[i];
1294 
1295     return boost::shared_ptr<funcexp::FuncExpWrapper>();
1296 }
1297 
setJoinFilterInputRG(const rowgroup::RowGroup & rg)1298 void TupleHashJoinStep::setJoinFilterInputRG(const rowgroup::RowGroup& rg)
1299 {
1300     joinFilterRG = rg;
1301 }
1302 
/* Run the joins on this step's own worker threads and block until they finish.
 *
 * - Returns immediately if the join runners were already started.
 * - If the query was cancelled, signals end-of-input on outputDL and drains
 *   the large-side DL without joining.
 * - Otherwise builds the per-side column mappings (small sides first, the
 *   large side at index smallSideCount), one all-NULL row per small side, and
 *   the duplicate-column list.  It then launches joinThreadCount JoinRunner
 *   tasks and waits for them.  If there is a small-outer join it emits the
 *   unmatched small-side rows, and finally it signals end-of-input on outputDL.
 */
void TupleHashJoinStep::startJoinThreads()
{
    uint32_t i;
    uint32_t smallSideCount = smallDLs.size();
    bool more = true;
    RGData oneRG;

    // Already started; nothing to do.
    if (joinRunners.size() > 0)
        return;

    //@bug4836, in error case, stop process, and unblock the next step.
    if (cancelled())
    {
        outputDL->endOfInput();

        //@bug5785, memory leak on canceling complex queries
        // Consume and discard the remaining large-side input.
        while (more)
            more = largeDL->next(largeIt, &oneRG);

        return;
    }

    /* Init class-scope vars.
     *
     * Get a list of small RGs consistent with the joiners.
     * Generate small & large mappings for joinFERG and outputRG.
     * If fDelivery, create outputDL.
     * NOTE(review): nothing in this function creates outputDL; the line above
     * looks stale.
     */
    for (i = 0; i < smallSideCount; i++)
        smallRGs[i] = joiners[i]->getSmallRG();

    // Entry smallSideCount holds the large-side mapping.
    columnMappings.reset(new shared_array<int>[smallSideCount + 1]);

    for (i = 0; i < smallSideCount; i++)
        columnMappings[i] = makeMapping(smallRGs[i], outputRG);

    columnMappings[smallSideCount] = makeMapping(largeRG, outputRG);

    // Join-filter mappings, laid out the same way, only when filters exist.
    if (!feIndexes.empty())
    {
        fergMappings.reset(new shared_array<int>[smallSideCount + 1]);

        for (i = 0; i < smallSideCount; i++)
            fergMappings[i] = makeMapping(smallRGs[i], joinFilterRG);

        fergMappings[smallSideCount] = makeMapping(largeRG, joinFilterRG);
    }

    if (fe2)
        fe2Mapping = makeMapping(outputRG, fe2Output);

    // One all-NULL row per small side; used when a join needs a NULL small row.
    smallNullMemory.reset(new scoped_array<uint8_t>[smallSideCount]);

    for (i = 0; i < smallSideCount; i++)
    {
        Row smallRow;
        smallRGs[i].initRow(&smallRow, true);
        smallNullMemory[i].reset(new uint8_t[smallRow.getSize()]);
        smallRow.setData(smallNullMemory[i].get());
        smallRow.initToNull();
    }

    for (i = 0; i < smallSideCount; i++)
        joiners[i]->setThreadCount(joinThreadCount);

    // Duplicate columns are computed on whichever row group is delivered.
    makeDupList(fe2 ? fe2Output : outputRG);

    /* Start join runners */
    joinRunners.reserve(joinThreadCount);

    for (i = 0; i < joinThreadCount; i++)
        joinRunners.push_back(jobstepThreadPool.invoke(JoinRunner(this, i)));

    /* Join them and call endOfInput */
    jobstepThreadPool.join(joinRunners);

    // (uint32_t)-1 means there is no small-outer join.
    if (lastSmallOuterJoiner != (uint32_t) - 1)
        finishSmallOuterJoin();

    outputDL->endOfInput();
}
1384 
finishSmallOuterJoin()1385 void TupleHashJoinStep::finishSmallOuterJoin()
1386 {
1387     vector<Row::Pointer> unmatched;
1388     uint32_t smallSideCount = smallDLs.size();
1389     uint32_t i, j, k;
1390     shared_array<uint8_t> largeNullMemory;
1391     RGData joinedData;
1392     Row joinedBaseRow, fe2InRow, fe2OutRow;
1393     shared_array<Row> smallRowTemplates;
1394     shared_array<Row> smallNullRows;
1395     Row largeNullRow;
1396     RowGroup l_outputRG = outputRG;
1397     RowGroup l_fe2Output = fe2Output;
1398 
1399     joiners[lastSmallOuterJoiner]->getUnmarkedRows(&unmatched);
1400 
1401     if (unmatched.empty())
1402         return;
1403 
1404     smallRowTemplates.reset(new Row[smallSideCount]);
1405     smallNullRows.reset(new Row[smallSideCount]);
1406 
1407     for (i = 0; i < smallSideCount; i++)
1408     {
1409         smallRGs[i].initRow(&smallRowTemplates[i]);
1410         smallRGs[i].initRow(&smallNullRows[i], true);
1411         smallNullRows[i].setData(smallNullMemory[i].get());
1412     }
1413 
1414     largeRG.initRow(&largeNullRow, true);
1415     largeNullMemory.reset(new uint8_t[largeNullRow.getSize()]);
1416     largeNullRow.setData(largeNullMemory.get());
1417     largeNullRow.initToNull();
1418 
1419     joinedData.reinit(l_outputRG);
1420     l_outputRG.setData(&joinedData);
1421     l_outputRG.resetRowGroup(0);
1422     l_outputRG.initRow(&joinedBaseRow);
1423     l_outputRG.getRow(0, &joinedBaseRow);
1424 
1425     if (fe2)
1426     {
1427         l_outputRG.initRow(&fe2InRow);
1428         fe2Output.initRow(&fe2OutRow);
1429     }
1430 
1431     for (j = 0; j < unmatched.size(); j++)
1432     {
1433         smallRowTemplates[lastSmallOuterJoiner].setPointer(unmatched[j]);
1434 
1435         for (k = 0; k < smallSideCount; k++)
1436         {
1437             if (k == lastSmallOuterJoiner)
1438                 applyMapping(columnMappings[lastSmallOuterJoiner], smallRowTemplates[lastSmallOuterJoiner], &joinedBaseRow);
1439             else
1440                 applyMapping(columnMappings[k], smallNullRows[k], &joinedBaseRow);
1441         }
1442 
1443         applyMapping(columnMappings[smallSideCount], largeNullRow, &joinedBaseRow);
1444         joinedBaseRow.setRid(0);
1445         joinedBaseRow.nextRow();
1446         l_outputRG.incRowCount();
1447 
1448         if (l_outputRG.getRowCount() == 8192)
1449         {
1450             if (fe2)
1451             {
1452                 vector<RGData> rgDatav;
1453                 rgDatav.push_back(joinedData);
1454                 processFE2(l_outputRG, l_fe2Output, fe2InRow, fe2OutRow, &rgDatav, fe2.get());
1455                 outputDL->insert(rgDatav[0]);
1456             }
1457             else
1458             {
1459                 outputDL->insert(joinedData);
1460             }
1461 
1462             joinedData.reinit(l_outputRG);
1463             l_outputRG.setData(&joinedData);
1464             l_outputRG.resetRowGroup(0);
1465             l_outputRG.getRow(0, &joinedBaseRow);
1466         }
1467     }
1468 
1469     if (l_outputRG.getRowCount() > 0)
1470     {
1471         if (fe2)
1472         {
1473             vector<RGData> rgDatav;
1474             rgDatav.push_back(joinedData);
1475             processFE2(l_outputRG, l_fe2Output, fe2InRow, fe2OutRow, &rgDatav, fe2.get());
1476             outputDL->insert(rgDatav[0]);
1477         }
1478         else
1479         {
1480             outputDL->insert(joinedData);
1481         }
1482     }
1483 }
1484 
/* Worker loop for one of the join threads launched by startJoinThreads().
 * It is presumably entered through JoinRunner(this, threadID); confirm in the
 * header.
 *
 * Each thread first builds its own copies of the large, output, join-filter
 * and FE2 row groups and scratch rows.  It then repeats the following:
 *  - pull a batch of large-side row groups with grabSomeWork();
 *  - join each non-empty one with joinOneRG();
 *  - apply FE2 if present and fix up duplicate output columns;
 *  - send the result to outputDL.
 * If the query is cancelled, the thread stops joining but keeps calling
 * grabSomeWork() until the large-side input is exhausted.
 *
 * @param threadID  index of this worker.  It is passed to joinOneRG() and
 *                  processDupList(), which selects dupRows[threadID].
 */
void TupleHashJoinStep::joinRunnerFcn(uint32_t threadID)
{
    RowGroup local_inputRG, local_outputRG, local_joinFERG;
    uint32_t smallSideCount = smallDLs.size();
    vector<RGData> inputData, joinedRowData;
    bool hasJoinFE = !fe.empty();
    uint32_t i;

    /* thread-local scratch space for join processing */
    shared_array<uint8_t> joinFERowData;
    Row largeRow, joinFERow, joinedRow, baseRow;
    shared_array<uint8_t> baseRowData;
    vector<vector<Row::Pointer> > joinMatches;
    shared_array<Row> smallRowTemplates;

    /* F & E vars */
    FuncExpWrapper local_fe;
    RowGroup local_fe2RG;
    Row fe2InRow, fe2OutRow;

    // One match list per small side.
    joinMatches.resize(smallSideCount);
    local_inputRG = largeRG;
    local_outputRG = outputRG;
    local_inputRG.initRow(&largeRow);
    local_outputRG.initRow(&joinedRow);
    // baseRow gets its own buffer (baseRowData).
    local_outputRG.initRow(&baseRow, true);
    baseRowData.reset(new uint8_t[baseRow.getSize()]);
    baseRow.setData(baseRowData.get());

    if (hasJoinFE)
    {
        local_joinFERG = joinFilterRG;
        local_joinFERG.initRow(&joinFERow, true);
        joinFERowData.reset(new uint8_t[joinFERow.getSize()]);
        joinFERow.setData(joinFERowData.get());
    }

    if (fe2)
    {
        local_fe2RG = fe2Output;
        local_outputRG.initRow(&fe2InRow);
        local_fe2RG.initRow(&fe2OutRow);
        // Each thread evaluates FE2 with its own copy of the wrapper.
        local_fe = *fe2;
    }

    smallRowTemplates.reset(new Row[smallSideCount]);

    for (i = 0; i < smallSideCount; i++)
        smallRGs[i].initRow(&smallRowTemplates[i]);

    grabSomeWork(&inputData);

    while (!inputData.empty() && !cancelled())
    {
        for (i = 0; i < inputData.size() && !cancelled(); i++)
        {
            local_inputRG.setData(&inputData[i]);

            if (local_inputRG.getRowCount() == 0)
                continue;

            joinOneRG(threadID, &joinedRowData, local_inputRG, local_outputRG, largeRow,
                      joinFERow, joinedRow, baseRow, joinMatches, smallRowTemplates);
        }

        if (fe2)
            processFE2(local_outputRG, local_fe2RG, fe2InRow, fe2OutRow, &joinedRowData, &local_fe);

        processDupList(threadID, (fe2 ? local_fe2RG : local_outputRG), &joinedRowData);
        sendResult(joinedRowData);
        joinedRowData.clear();
        grabSomeWork(&inputData);
    }

    // Only has work left when the loop above stopped on cancellation: discard
    // the rest of the large-side input.
    while (!inputData.empty())
        grabSomeWork(&inputData);
}
1562 
makeDupList(const RowGroup & rg)1563 void TupleHashJoinStep::makeDupList(const RowGroup& rg)
1564 {
1565     uint32_t i, j, cols = rg.getColumnCount();
1566 
1567     for (i = 0; i < cols; i++)
1568         for (j = i + 1; j < cols; j++)
1569             if (rg.getKeys()[i] == rg.getKeys()[j])
1570                 dupList.push_back(make_pair(j, i));
1571 
1572     dupRows.reset(new Row[joinThreadCount]);
1573 
1574     for (i = 0; i < joinThreadCount; i++)
1575         rg.initRow(&dupRows[i]);
1576 }
1577 
processDupList(uint32_t threadID,RowGroup & rg,vector<RGData> * rowData)1578 void TupleHashJoinStep::processDupList(uint32_t threadID, RowGroup& rg,
1579                                        vector<RGData>* rowData)
1580 {
1581     uint32_t i, j, k;
1582 
1583     if (dupList.empty())
1584         return;
1585 
1586     for (i = 0; i < rowData->size(); i++)
1587     {
1588         rg.setData(&(*rowData)[i]);
1589         rg.getRow(0, &dupRows[threadID]);
1590 
1591         for (j = 0; j < rg.getRowCount(); j++, dupRows[threadID].nextRow())
1592             for (k = 0; k < dupList.size(); k++)
1593                 dupRows[threadID].copyField(dupList[k].first, dupList[k].second);
1594     }
1595 }
1596 
processFE2(RowGroup & input,RowGroup & output,Row & inRow,Row & outRow,vector<RGData> * rgData,funcexp::FuncExpWrapper * local_fe)1597 void TupleHashJoinStep::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& outRow,
1598                                    vector<RGData>* rgData, funcexp::FuncExpWrapper* local_fe)
1599 {
1600     vector<RGData> results;
1601     RGData result;
1602     uint32_t i, j;
1603     bool ret;
1604 
1605     result.reinit(output);
1606     output.setData(&result);
1607     output.resetRowGroup(0);
1608     output.getRow(0, &outRow);
1609 
1610     for (i = 0; i < rgData->size(); i++)
1611     {
1612         input.setData(&(*rgData)[i]);
1613 
1614         if (output.getRowCount() == 0)
1615         {
1616             output.resetRowGroup(input.getBaseRid());
1617             output.setDBRoot(input.getDBRoot());
1618         }
1619 
1620         input.getRow(0, &inRow);
1621 
1622         for (j = 0; j < input.getRowCount(); j++, inRow.nextRow())
1623         {
1624             ret = local_fe->evaluate(&inRow);
1625 
1626             if (ret)
1627             {
1628                 applyMapping(fe2Mapping, inRow, &outRow);
1629                 output.incRowCount();
1630                 outRow.nextRow();
1631 
1632                 if (output.getRowCount() == 8192)
1633                 {
1634                     results.push_back(result);
1635                     result.reinit(output);
1636                     output.setData(&result);
1637                     output.resetRowGroup(input.getBaseRid());
1638                     output.setDBRoot(input.getDBRoot());
1639                     output.getRow(0, &outRow);
1640                 }
1641             }
1642         }
1643     }
1644 
1645     if (output.getRowCount() > 0)
1646     {
1647         results.push_back(result);
1648     }
1649 
1650     rgData->swap(results);
1651 }
1652 
sendResult(const vector<RGData> & res)1653 void TupleHashJoinStep::sendResult(const vector<RGData>& res)
1654 {
1655     boost::mutex::scoped_lock lock(outputDLLock);
1656 
1657     for (uint32_t i = 0; i < res.size(); i++)
1658         //INSERT_ADAPTER(outputDL, res[i]);
1659         outputDL->insert(res[i]);
1660 }
1661 
grabSomeWork(vector<RGData> * work)1662 void TupleHashJoinStep::grabSomeWork(vector<RGData>* work)
1663 {
1664     boost::mutex::scoped_lock lock(inputDLLock);
1665     work->clear();
1666 
1667     if (!moreInput)
1668         return;
1669 
1670     RGData e;
1671     moreInput = largeDL->next(largeIt, &e);
1672 
1673     /* Tunable number here, but it probably won't change things much */
1674     for (uint32_t i = 0; i < 10 && moreInput; i++)
1675     {
1676         work->push_back(e);
1677         moreInput = largeDL->next(largeIt, &e);
1678     }
1679 
1680     if (moreInput)
1681         work->push_back(e);
1682 }
1683 
1684 /* This function is a port of the main join loop in TupleBPS::receiveMultiPrimitiveMessages().  Any
1685  * changes made here should also be made there and vice versa. */
joinOneRG(uint32_t threadID,vector<RGData> * out,RowGroup & inputRG,RowGroup & joinOutput,Row & largeSideRow,Row & joinFERow,Row & joinedRow,Row & baseRow,vector<vector<Row::Pointer>> & joinMatches,shared_array<Row> & smallRowTemplates,vector<boost::shared_ptr<joiner::TupleJoiner>> * tjoiners,boost::shared_array<boost::shared_array<int>> * rgMappings,boost::shared_array<boost::shared_array<int>> * feMappings,boost::scoped_array<boost::scoped_array<uint8_t>> * smallNullMem)1686 void TupleHashJoinStep::joinOneRG(uint32_t threadID, vector<RGData>* out,
1687                                   RowGroup& inputRG, RowGroup& joinOutput, Row& largeSideRow, Row& joinFERow,
1688                                   Row& joinedRow, Row& baseRow, vector<vector<Row::Pointer> >& joinMatches,
1689                                   shared_array<Row>& smallRowTemplates,
1690                                   // disk-join support vars.  This param list is insane; refactor attempt would be nice at some point.
1691                                   vector<boost::shared_ptr<joiner::TupleJoiner> >* tjoiners,
1692                                   boost::shared_array<boost::shared_array<int> >* rgMappings,
1693                                   boost::shared_array<boost::shared_array<int> >* feMappings,
1694                                   boost::scoped_array<boost::scoped_array<uint8_t> >* smallNullMem
1695                                  )
1696 {
1697 
1698     /* Disk-join support.
1699        These dissociate the fcn from THJS's members & allow this fcn to be called from DiskJoinStep
1700     */
1701     if (!tjoiners)
1702         tjoiners = &joiners;
1703 
1704     if (!rgMappings)
1705         rgMappings = &columnMappings;
1706 
1707     if (!feMappings)
1708         feMappings = &fergMappings;
1709 
1710     if (!smallNullMem)
1711         smallNullMem = &smallNullMemory;
1712 
1713     RGData joinedData;
1714     uint32_t matchCount, smallSideCount = tjoiners->size();
1715     uint32_t j, k;
1716 
1717     joinedData.reinit(joinOutput);
1718     joinOutput.setData(&joinedData);
1719     joinOutput.resetRowGroup(inputRG.getBaseRid());
1720     joinOutput.setDBRoot(inputRG.getDBRoot());
1721     inputRG.getRow(0, &largeSideRow);
1722 
1723     //cout << "jointype = " << (*tjoiners)[0]->getJoinType() << endl;
1724     for (k = 0; k < inputRG.getRowCount() && !cancelled(); k++, largeSideRow.nextRow())
1725     {
1726         //cout << "THJS: Large side row: " << largeSideRow.toString() << endl;
1727         matchCount = 0;
1728 
1729         for (j = 0; j < smallSideCount; j++)
1730         {
1731             (*tjoiners)[j]->match(largeSideRow, k, threadID, &joinMatches[j]);
1732             /* Debugging code to print the matches
1733             	Row r;
1734             	smallRGs[j].initRow(&r);
1735             	cout << joinMatches[j].size() << " matches: \n";
1736             	for (uint32_t z = 0; z < joinMatches[j].size(); z++) {
1737             		r.setData(joinMatches[j][z]);
1738             		cout << "  " << r.toString() << endl;
1739             	}
1740             */
1741             matchCount = joinMatches[j].size();
1742 
1743             if ((*tjoiners)[j]->hasFEFilter() && matchCount > 0)
1744             {
1745                 //cout << "doing FE filter" << endl;
1746                 vector<Row::Pointer> newJoinMatches;
1747                 applyMapping((*feMappings)[smallSideCount], largeSideRow, &joinFERow);
1748 
1749                 for (uint32_t z = 0; z < joinMatches[j].size(); z++)
1750                 {
1751                     smallRowTemplates[j].setPointer(joinMatches[j][z]);
1752                     applyMapping((*feMappings)[j], smallRowTemplates[j], &joinFERow);
1753 
1754                     if (!(*tjoiners)[j]->evaluateFilter(joinFERow, threadID))
1755                         matchCount--;
1756                     else
1757                     {
1758                         /* The first match includes it in a SEMI join result and excludes it from an ANTI join
1759                          * result.  If it's SEMI & SCALAR however, it needs to continue.
1760                          */
1761                         newJoinMatches.push_back(joinMatches[j][z]);
1762 
1763                         if ((*tjoiners)[j]->antiJoin() || ((*tjoiners)[j]->semiJoin() && !(*tjoiners)[j]->scalar()))
1764                             break;
1765                     }
1766                 }
1767 
1768                 // the filter eliminated all matches, need to join with the NULL row
1769                 if (matchCount == 0 && (*tjoiners)[j]->largeOuterJoin())
1770                 {
1771                     newJoinMatches.clear();
1772                     newJoinMatches.push_back(Row::Pointer((*smallNullMem)[j].get()));
1773                     matchCount = 1;
1774                 }
1775 
1776                 joinMatches[j].swap(newJoinMatches);
1777             }
1778 
1779             /* If anti-join, reverse the result */
1780             if ((*tjoiners)[j]->antiJoin())
1781             {
1782                 matchCount = (matchCount ? 0 : 1);
1783             }
1784 
1785             if (matchCount == 0)
1786             {
1787                 joinMatches[j].clear();
1788                 break;
1789             }
1790             else if (!(*tjoiners)[j]->scalar() && ((*tjoiners)[j]->semiJoin() || (*tjoiners)[j]->antiJoin()))
1791             {
1792                 joinMatches[j].clear();
1793                 joinMatches[j].push_back(Row::Pointer((*smallNullMem)[j].get()));
1794                 matchCount = 1;
1795             }
1796 
1797             if (matchCount == 0 && (*tjoiners)[j]->innerJoin())
1798                 break;
1799 
1800             if ((*tjoiners)[j]->scalar() && matchCount > 1)
1801             {
1802                 errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_MORE_THAN_1_ROW));
1803                 status(logging::ERR_MORE_THAN_1_ROW);
1804                 abort();
1805             }
1806 
1807             if ((*tjoiners)[j]->smallOuterJoin())
1808                 (*tjoiners)[j]->markMatches(threadID, joinMatches[j]);
1809 
1810         }
1811 
1812         if (matchCount > 0)
1813         {
1814             /* TODO!!!  See TupleBPS for the fix for bug 3510! */
1815             applyMapping((*rgMappings)[smallSideCount], largeSideRow, &baseRow);
1816             baseRow.setRid(largeSideRow.getRelRid());
1817             generateJoinResultSet(joinMatches, baseRow, *rgMappings,
1818                                   0, joinOutput, joinedData, out, smallRowTemplates, joinedRow);
1819         }
1820     }
1821 
1822     if (joinOutput.getRowCount() > 0)
1823         out->push_back(joinedData);
1824 }
1825 
generateJoinResultSet(const vector<vector<Row::Pointer>> & joinerOutput,Row & baseRow,const shared_array<shared_array<int>> & mappings,const uint32_t depth,RowGroup & l_outputRG,RGData & rgData,vector<RGData> * outputData,const shared_array<Row> & smallRows,Row & joinedRow)1826 void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer> >& joinerOutput,
1827         Row& baseRow, const shared_array<shared_array<int> >& mappings,
1828         const uint32_t depth, RowGroup& l_outputRG, RGData& rgData,
1829         vector<RGData>* outputData,	const shared_array<Row>& smallRows,
1830         Row& joinedRow)
1831 {
1832     uint32_t i;
1833     Row& smallRow = smallRows[depth];
1834     uint32_t smallSideCount = joinerOutput.size();
1835 
1836     if (depth < smallSideCount - 1)
1837     {
1838         for (i = 0; i < joinerOutput[depth].size(); i++)
1839         {
1840             smallRow.setPointer(joinerOutput[depth][i]);
1841             applyMapping(mappings[depth], smallRow, &baseRow);
1842 // 			cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " << i << ": " << smallRow.toString() << endl;
1843             generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1,
1844                                   l_outputRG, rgData, outputData, smallRows, joinedRow);
1845         }
1846     }
1847     else
1848     {
1849         l_outputRG.getRow(l_outputRG.getRowCount(), &joinedRow);
1850 
1851         for (i = 0; i < joinerOutput[depth].size(); i++, joinedRow.nextRow(),
1852                 l_outputRG.incRowCount())
1853         {
1854             smallRow.setPointer(joinerOutput[depth][i]);
1855 
1856             if (UNLIKELY(l_outputRG.getRowCount() == 8192))
1857             {
1858                 uint32_t dbRoot = l_outputRG.getDBRoot();
1859                 uint64_t baseRid = l_outputRG.getBaseRid();
1860                 outputData->push_back(rgData);
1861                 rgData.reinit(l_outputRG);
1862                 l_outputRG.setData(&rgData);
1863                 l_outputRG.resetRowGroup(baseRid);
1864                 l_outputRG.setDBRoot(dbRoot);
1865                 l_outputRG.getRow(0, &joinedRow);
1866             }
1867 
1868 // 			cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " << i << ": " << smallRow.toString() << endl;
1869             applyMapping(mappings[depth], smallRow, &baseRow);
1870             copyRow(baseRow, &joinedRow);
1871             //memcpy(joinedRow.getData(), baseRow.getData(), joinedRow.getSize());
1872             //cout << "(step " << stepID << ") fully joined row is: " << joinedRow.toString() << endl;
1873         }
1874     }
1875 }
1876 
segregateJoiners()1877 void TupleHashJoinStep::segregateJoiners()
1878 {
1879     uint32_t i;
1880     bool allInnerJoins = true;
1881     bool anyTooLarge = false;
1882     uint32_t smallSideCount = smallDLs.size();
1883 
1884     for (i = 0; i < smallSideCount; i++)
1885     {
1886         allInnerJoins &= (joinTypes[i] == INNER);
1887         anyTooLarge |= !joiners[i]->isFinished();
1888     }
1889 
1890     /* When DDL updates syscat, the syscat checks here are necessary */
1891     if (isDML || !allowDJS || (fSessionId & 0x80000000) ||
1892             (tableOid() < 3000 && tableOid() >= 1000))
1893     {
1894         if (anyTooLarge)
1895         {
1896             joinIsTooBig = true;
1897             abort();
1898         }
1899 
1900         tbpsJoiners = joiners;
1901         return;
1902     }
1903 
1904     #if 0
1905     // Debugging code, this makes all eligible joins disk-based.
1906     else {
1907     	cout << "making all joins disk-based" << endl;
1908 		joinIsTooBig = true;
1909     	for (i = 0; i < smallSideCount; i++) {
1910             joiner[i]->setConvertToDiskJoin();
1911     		djsJoiners.push_back(joiners[i]);
1912     		djsJoinerMap.push_back(i);
1913     	}
1914     	return;
1915     }
1916     #endif
1917 
1918     boost::mutex::scoped_lock sl(djsLock);
1919     /* For now if there is no largeBPS all joins need to either be DJS or not, not mixed */
1920     if (!largeBPS)
1921     {
1922         if (anyTooLarge)
1923         {
1924             joinIsTooBig = true;
1925 
1926             for (i = 0; i < smallSideCount; i++)
1927             {
1928                 joiners[i]->setConvertToDiskJoin();
1929                 djsJoiners.push_back(joiners[i]);
1930                 djsJoinerMap.push_back(i);
1931             }
1932         }
1933         else
1934             tbpsJoiners = joiners;
1935 
1936         return;
1937     }
1938 
1939     /* If they are all inner joins they can be segregated w/o respect to
1940     ordering; if they're not, the ordering has to stay consistent therefore
1941     the first joiner that isn't finished and everything after has to be
1942     done by DJS. */
1943 
1944     if (allInnerJoins)
1945     {
1946         for (i = 0; i < smallSideCount; i++)
1947         {
1948             //if (joiners[i]->isFinished() && (rand() % 2)) {    // for debugging
1949             if (joiners[i]->isFinished())
1950             {
1951                 //cout << "1joiner " << i << "  " << hex << (uint64_t) joiners[i].get() << dec << " -> TBPS" << endl;
1952                 tbpsJoiners.push_back(joiners[i]);
1953             }
1954             else
1955             {
1956                 joinIsTooBig = true;
1957                 joiners[i]->setConvertToDiskJoin();
1958                 //cout << "1joiner " << i << "  " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
1959                 djsJoiners.push_back(joiners[i]);
1960                 djsJoinerMap.push_back(i);
1961             }
1962         }
1963     }
1964     else
1965     {
1966         //uint limit = rand() % smallSideCount;
1967         for (i = 0; i < smallSideCount; i++)
1968         {
1969             //if (joiners[i]->isFinished() && i < limit) {  // debugging
1970             if (joiners[i]->isFinished())
1971             {
1972                 //cout << "2joiner " << i << "  " << hex << (uint64_t) joiners[i].get() << dec << " -> TBPS" << endl;
1973                 tbpsJoiners.push_back(joiners[i]);
1974             }
1975             else
1976                 break;
1977         }
1978 
1979         for (; i < smallSideCount; i++)
1980         {
1981             joinIsTooBig = true;
1982             joiners[i]->setConvertToDiskJoin();
1983             //cout << "2joiner " << i << "  " << hex << (uint64_t) joiners[i].get() << dec << " -> DJS" << endl;
1984             djsJoiners.push_back(joiners[i]);
1985             djsJoinerMap.push_back(i);
1986         }
1987     }
1988 }
1989 
abort()1990 void TupleHashJoinStep::abort()
1991 {
1992     JobStep::abort();
1993     boost::mutex::scoped_lock sl(djsLock);
1994 
1995     if (djs)
1996     {
1997         for (uint32_t i = 0; i < djsJoiners.size(); i++)
1998             djs[i].abort();
1999     }
2000 }
2001 
2002 }
2003 // vim:ts=4 sw=4:
2004