1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: largehashjoin.cpp 9655 2013-06-25 23:08:13Z xlou $
20 #include <string>
21 #include <sstream>
22 #include <cassert>
23 #include <stdexcept>
24 #include <ctime>
25 #include <sys/time.h>
26 #include <iomanip>
27 using namespace std;
28
29 #include <boost/thread/mutex.hpp>
30
31 #include "calpontsystemcatalog.h"
32 using namespace execplan;
33
34 #include "jobstep.h"
35 #include "largehashjoin.h"
36 #include "elementtype.h"
37 using namespace joblist;
38
39 #include "mcsconfig.h"
40
// Serializes appends to the UM disk-I/O trace log (umdiskio.log) across the
// multiple job steps that may be destructing/logging concurrently.
boost::mutex fileLock_g;
42
43 namespace
44 {
logDiskIoInfo(uint64_t stepId,const AnyDataListSPtr & spdl)45 void logDiskIoInfo(uint64_t stepId, const AnyDataListSPtr& spdl)
46 {
47 boost::mutex::scoped_lock lk(fileLock_g);
48 string umDiskioLog = string(MCSLOGDIR) + "/trace/umdiskio.log";
49 string umDiskioBak = string(MCSLOGDIR) + "/trace/umdiskio.bak";
50
51 ofstream umDiskIoFile(umDiskioLog.c_str(), ios_base::app);
52
53 CalpontSystemCatalog::OID oid;
54 uint64_t maxBuckets = 0;
55 list<DiskIoInfo>* infoList = NULL;
56 string bkt("bkt");
57 BucketDL<ElementType>* bdl = spdl->bucketDL();
58 BucketDL<StringElementType>* sbdl = spdl->stringBucketDL();
59 ZDL<ElementType>* zdl = spdl->zonedDL();
60 ZDL<StringElementType>* szdl = spdl->stringZonedDL();
61
62 if (bdl != NULL)
63 {
64 maxBuckets = bdl->bucketCount();
65 oid = bdl->OID();
66 }
67 else if (zdl != NULL)
68 {
69 maxBuckets = zdl->bucketCount();
70 oid = zdl->OID();
71 bkt = "zdl";
72 }
73 else if (sbdl != NULL)
74 {
75 maxBuckets = sbdl->bucketCount();
76 oid = sbdl->OID();
77 }
78 else if (szdl != NULL)
79 {
80 maxBuckets = szdl->bucketCount();
81 oid = szdl->OID();
82 bkt = "zdl";
83 }
84 else
85 {
86 // not logged for now.
87 return;
88 }
89
90 for (uint64_t i = 0; i < maxBuckets; i++)
91 {
92 if (bdl) infoList = &(bdl->diskIoInfoList(i));
93 else if (zdl) infoList = &(zdl->diskIoInfoList(i));
94 else if (sbdl) infoList = &(sbdl->diskIoInfoList(i));
95 else if (szdl) infoList = &(szdl->diskIoInfoList(i));
96
97 for (list<DiskIoInfo>::iterator j = infoList->begin(); j != infoList->end(); j++)
98 {
99 boost::posix_time::time_duration td = j->fEnd - j->fStart;
100 umDiskIoFile << setfill('0')
101 << "st" << setw(2) << stepId << "oid" << oid << bkt << setw(3) << i
102 << (j->fWrite ? " writes " : " reads ") << setw(7) << setfill(' ')
103 << j->fBytes << " bytes, at " << j->fStart << " duration "
104 << td.total_microseconds() << " mcs @ "
105 << (j->fBytes / td.total_microseconds()) << "MB/s" << endl;
106 }
107 }
108
109 streampos curPos = umDiskIoFile.tellp( );
110 umDiskIoFile.close();
111
112 // move the current file to bak when size above .5 G, so total log is 1 G
113 if (curPos > 0x20000000)
114 {
115 string cmd = "/bin/mv " + umDiskioLog + " " + umDiskioBak;
116 (void)system(cmd.c_str());
117 }
118 }
119
120 }
121
122
123 namespace joblist
124 {
// Batch size for inserting results into ZDL datalists: rows are buffered in a
// vector of this size and inserted in one call to amortize per-insert overhead.
const uint64_t ZDL_VEC_SIZE = 4096;
//@bug 686. Make hashjoin do its work in a separate thread and return to main
// immediately, so the other job steps can start.
128 struct HJRunner
129 {
HJRunnerjoblist::HJRunner130 HJRunner(LargeHashJoin* p) : joiner(p)
131 {}
132 LargeHashJoin* joiner;
operator ()joblist::HJRunner133 void operator()()
134 {
135 try
136 {
137 joiner->doHashJoin();
138 }
139 catch (std::exception& e)
140 {
141 ostringstream errMsg;
142 errMsg << "HJRunner caught: " << e.what();
143 joiner->errorLogging(errMsg.str());
144 joiner->unblockDatalists(logging::largeHashJoinErr);
145 }
146 catch (...)
147 {
148 string msg("HJRunner caught something not an exception!");
149 joiner->errorLogging(msg);
150 joiner->unblockDatalists(logging::largeHashJoinErr);
151 }
152 }
153 };
154
155 struct StringHJRunner
156 {
StringHJRunnerjoblist::StringHJRunner157 StringHJRunner(StringHashJoinStep* p) : joiner(p)
158 {}
159 StringHashJoinStep* joiner;
operator ()joblist::StringHJRunner160 void operator()()
161 {
162 try
163 {
164 joiner->doStringHashJoin();
165 }
166 catch (std::exception& e)
167 {
168 ostringstream errMsg;
169 errMsg << "StringHJRunner caught: " << e.what();
170 joiner->errorLogging(errMsg.str());
171 joiner->unblockDatalists(logging::stringHashJoinStepErr);
172 }
173 catch (...)
174 {
175 string msg("StringHJRunner caught something not an exception!");
176 joiner->errorLogging(msg);
177 joiner->unblockDatalists(logging::stringHashJoinStepErr);
178 }
179 }
180 };
181
// Thread function used by HashJoin. Each worker processes the contiguous
// bucket range [params->startBucket, params->startBucket + params->numBuckets):
// per bucket it hashes the smaller input side, probes it with the larger side,
// and appends matches (plus outer-join non-matches) to the result datalists.
// Always returns NULL; errors are reported via hjPtr->status()/catchHandler().
template<typename e_t>
void* HashJoinByBucket_thr(void* arg)
{
    // Per-thread parameter bundle built by HashJoin when spawning workers.
    typename HashJoin<e_t>::thrParams_t* params = (typename HashJoin<e_t>::thrParams_t*)arg;
    HashJoin<e_t>* hjPtr = params->hjptr;
    const uint32_t thrIdx = params->thrIdx;
    long set1Size = 0;
    long set2Size = 0;
    bool sendAllHashSet = false;   // keep every hash-side row (outer join)
    bool sendAllSearchSet = false; // keep every search-side row (outer join)

    try
    {
        for (uint32_t idx = 0, bucketIdx = params->startBucket;
             idx < params->numBuckets;
             idx++, bucketIdx++)
        {

#ifdef DEBUG
            cout << "\tJoinByBucket() thr " << dec << thrIdx
                 << " bkt " << bucketIdx
                 << "/" << hjPtr->Set1()->bucketCount()
                 << "/" << params->numBuckets << endl;
#endif

            JoinType joinType = hjPtr->getJoinType();

            set1Size = hjPtr->Set1()->size(bucketIdx);
            set2Size = hjPtr->Set2()->size(bucketIdx);

            // Nothing to join when this bucket is empty on both sides.
            if ( set1Size <= 0 && set2Size <= 0 )
            {
                continue;
            }
            else
            {
                // Hash the smaller side, probe with the larger. The outer-join
                // flags record which side must be emitted unconditionally.
                if (set1Size > set2Size)
                {
                    hjPtr->setSearchSet(hjPtr->Set1()->getBDL(), thrIdx);
                    hjPtr->setHashSet(hjPtr->Set2()->getBDL(), thrIdx);
                    hjPtr->setSearchResult(hjPtr->Result1(), thrIdx);
                    hjPtr->setHashResult(hjPtr->Result2(), thrIdx);
                    sendAllHashSet = (joinType == RIGHTOUTER);
                    sendAllSearchSet = (joinType == LEFTOUTER);
                }
                else
                {
                    hjPtr->setHashSet(hjPtr->Set1()->getBDL(), thrIdx);
                    hjPtr->setSearchSet(hjPtr->Set2()->getBDL(), thrIdx);
                    hjPtr->setHashResult(hjPtr->Result1(), thrIdx);
                    hjPtr->setSearchResult(hjPtr->Result2(), thrIdx);
                    sendAllHashSet = (joinType == LEFTOUTER);
                    sendAllSearchSet = (joinType == RIGHTOUTER);
                } //if set1Size > set2Size ...

            } // if set1Size <=0 . . .

            // Build phase: populate this thread's hash table from the hash
            // side. When sendAllHashSet is true, createHash also forwards
            // every hash-side row to the hash result (outer-join semantics).
            params->timeset.setTimer(createHashStr);
            hjPtr->createHash(hjPtr->HashSet(thrIdx),
                              hjPtr->HashTable(thrIdx),
                              bucketIdx,
                              sendAllHashSet,
                              hjPtr->HashResult(thrIdx),
                              params->dlTimes, params->die);
            params->timeset.holdTimer(createHashStr);

#ifdef DEBUG
            long hashSetTotal = 0;
            long searchSetTotal = 0;

            // NOTE(review): this debug-only tally adds bucketCount() (not a
            // per-bucket size) each iteration, and the second loop bounds the
            // SearchSet scan by the HashSet's bucketCount — both look wrong;
            // confirm before relying on these numbers.
            for (uint32_t j = 0; j < hjPtr->HashSet(thrIdx)->bucketCount(); j++)
                hashSetTotal += hjPtr->HashSet(thrIdx)->bucketCount(); // are bucketDL

            for (uint32_t j = 0; j < hjPtr->HashSet(thrIdx)->bucketCount(); j++)
                searchSetTotal += hjPtr->SearchSet(thrIdx)->size(j); // can be any datalist

            cout << "\t\tJoinByBucket() thr " << dec << thrIdx
                 << " bkt " << bucketIdx
                 << " hashSize " << hashSetTotal
                 << " searchSize " << searchSetTotal << endl;
#endif

            bool more;
            e_t e;
            e_t e2;
            // Sentinel stored into a hash entry once it has been emitted, so
            // the same hash-side row is never output twice.
            const uint64_t InvalidRID = static_cast<uint64_t>(-1);
            int iter = hjPtr->SearchSet(thrIdx)->getIterator(bucketIdx);

            // When the result datalists are ZDLs, batch inserts through vec1/
            // vec2 in chunks of ZDL_VEC_SIZE to amortize insert overhead.
            ZonedDL* zdl1 = dynamic_cast<ZonedDL*>(hjPtr->SearchResult(thrIdx));
            ZonedDL* zdl2 = dynamic_cast<ZonedDL*>(hjPtr->HashResult(thrIdx));
            vector <e_t> vec1;
            vector <e_t> vec2;

            std::pair<typename HashJoin<e_t>::hashIter_t, typename HashJoin<e_t>::hashIter_t> hashItPair;
            typename HashJoin<e_t>::hashIter_t hashIt;
            typename HashJoin<e_t>::hash_t* ht = hjPtr->HashTable(thrIdx);
            params->timeset.setTimer(hashJoinStr);

            // Probe phase: walk the search side of this bucket; bail out
            // early if the job has been cancelled (*params->die).
            for (more = hjPtr->SearchSet(thrIdx)->next(bucketIdx, iter, &e);
                 more && !(*params->die);
                 more = hjPtr->SearchSet(thrIdx)->next(bucketIdx, iter, &e) )
            {

                // If sendAllSearchSet=true, keep all of the search set. If this is
                // a right outer, we are dealing with a join such as
                //   col1 = col2 (+)
                // where col1 is the SearchSet and col2 is the HashSet. We want to include
                // all of col1 in this case regardless of whether there is a matching col2.
                if (sendAllSearchSet)
                {
                    if (zdl1)
                    {
                        vec1.push_back(e);

                        if (vec1.size() >= ZDL_VEC_SIZE)
                        {
                            params->timeset.setTimer(insertResultsStr);
                            hjPtr->SearchResult(thrIdx)->insert(vec1);
                            vec1.clear();
                            params->timeset.holdTimer(insertResultsStr);
                        }
                    }
                    else
                        hjPtr->SearchResult(thrIdx)->insert(e);
                }

                // Probe the hash table with this row's join value.
                hashIt = ht->find(e.second);

                if (hashIt != ht->end())
                {
#ifdef DEBUG

                    if (hjPtr->SearchResult(thrIdx)->OID() >= 3000)
                        cout << "JoinByBucket() SearchResult add " << bucketIdx
                             << " [" << e.first << "][" << e.second << "]" << endl;

                    uint32_t a = 0;
                    e_t b = e_t();
#endif

                    // If sendAllSearchSet=true, this row was already emitted
                    // above; otherwise emit the matching search-side row here.
                    if (!sendAllSearchSet)
                    {
                        if (zdl1)
                        {
                            vec1.push_back(e);

                            if (vec1.size() >= ZDL_VEC_SIZE)
                            {
                                params->timeset.setTimer(insertResultsStr);
                                hjPtr->SearchResult(thrIdx)->insert(vec1);
                                vec1.clear();
                                params->timeset.holdTimer(insertResultsStr);
                            }
                        }
                        else
                            hjPtr->SearchResult(thrIdx)->insert(e);
                    }

                    // If sendAllHashSet=false, add the hash results to the output datalist.
                    // If it is a left outer join then we already added all of the right side rows
                    // in the bucket in the createHash call earlier in this function.
                    if (!sendAllHashSet)
                    {

                        // If the matching pair has its RID set to invalid, it's already been
                        // encountered, so no reason to add it to the output datalist or keep
                        // searching for more matching values.
                        if (hashIt->second != InvalidRID)
                        {

                            // Emit every hash-side row that shares this join value.
                            hashItPair = ht->equal_range(e.second);

                            for (hashIt = hashItPair.first; hashIt != hashItPair.second; hashIt++)
                            {
                                e2.first = hashIt->second;
                                e2.second = e.second;

                                if (zdl2)
                                {
                                    vec2.push_back(e2);

                                    if (vec2.size() >= ZDL_VEC_SIZE)
                                    {
                                        params->timeset.setTimer(insertResultsStr);
                                        hjPtr->HashResult(thrIdx)->insert(vec2);
                                        vec2.clear();
                                        params->timeset.holdTimer(insertResultsStr);
                                    }
                                }
                                else
                                    hjPtr->HashResult(thrIdx)->insert(e2);

#ifdef DEBUG
                                a++;
                                // NOTE(review): 'v' is not declared in this scope, so this
                                // debug-only line cannot compile with -DDEBUG; it likely
                                // meant hashIt->second. Confirm before enabling DEBUG.
                                b = v.second;
#endif

                                // Set the RID to invalid rid now that it's been matched and added
                                // to the output datalist. This will keep us from duplicating it
                                // if it is matched again.
                                hashIt->second = InvalidRID;

                            }

#ifdef DEBUG
                            cout << "\t\tadded " << b << " " << a << " times" << endl << endl;
#endif
                        }

                    }

                } // if hashIt != hashIt->end()

            } // for ( hjPtr...

            params->timeset.holdTimer(hashJoinStr);

            params->timeset.setTimer(insertLastResultsStr);

            // Flush any partially filled ZDL batch vectors for this bucket.
            if (vec1.size() != 0)
            {
                hjPtr->SearchResult(thrIdx)->insert(vec1);
                vec1.clear();
            }

            if (vec2.size() != 0)
            {
                hjPtr->HashResult(thrIdx)->insert(vec2);
                vec2.clear();
            }

            params->timeset.holdTimer(insertLastResultsStr);

            // typename HashJoin<e_t>::hash_t* ht = hjPtr->HashTable(thrIdx);
            // Reuse the same table for the next bucket.
            ht->clear();

        } // for (bucketIdx...
    } // try
    // We don't have to call JSA.endOfInput() for this exception, because
    // the parent thread takes care of that in performThreadedJoin().
    catch (const logging::LargeDataListExcept& ex)
    {
        ostringstream errMsg;

        if (typeid(e_t) == typeid(StringElementType))
        {
            errMsg << "HashJoinByBucket_thr<String>: caught LDL error: " <<
                   ex.what();
            hjPtr->status(logging::stringHashJoinStepLargeDataListFileErr);
        }
        else
        {
            errMsg << "HashJoinByBucket_thr: caught LDL error: " << ex.what();
            hjPtr->status(logging::largeHashJoinLargeDataListFileErr);
        }

        cerr << errMsg.str() << endl;
        catchHandler(errMsg.str(), hjPtr->sessionId());
    }
    catch (const exception& ex)
    {
        ostringstream errMsg;

        if (typeid(e_t) == typeid(StringElementType))
        {
            errMsg << "HashJoinByBucket_thr<String>: caught: " << ex.what();
            hjPtr->status(logging::stringHashJoinStepErr);
        }
        else
        {
            errMsg << "HashJoinByBucket_thr: caught: " << ex.what();
            hjPtr->status(logging::largeHashJoinErr);
        }

        cerr << errMsg.str() << endl;
        catchHandler(errMsg.str(), hjPtr->sessionId());
    }
    catch (...)
    {
        ostringstream errMsg;

        if (typeid(e_t) == typeid(StringElementType))
        {
            errMsg << "HashJoinByBucket_thr<String>: caught unknown exception: ";
            hjPtr->status(logging::stringHashJoinStepErr);
        }
        else
        {
            errMsg << "HashJoinByBucket_thr: caught unknown exception";
            hjPtr->status(logging::largeHashJoinErr);
        }

        cerr << errMsg.str() << endl;
        catchHandler(errMsg.str(), hjPtr->sessionId());
    }

    return NULL;
} // HashJoinByBucket_thr
483
// Constructs the step with its session/transaction/statement context and the
// join type; table OIDs and step id start at 0 and are set later by the
// job-step builder.
LargeHashJoin::LargeHashJoin(JoinType joinType,
                             uint32_t sessionId,
                             uint32_t txnId,
                             uint32_t statementId,
                             ResourceManager* rm ):
    fSessionId(sessionId), fTxnId(txnId),
    fStepId(0), fStatementId(statementId), fTableOID1(0), fTableOID2(0),
    fJoinType(joinType), fRm(rm), fAlias1(), fAlias2()
{
    // fConfig = config::Config::makeConfig();
    // fJoinType = joinType;
}
496
~LargeHashJoin()497 LargeHashJoin::~LargeHashJoin()
498 {
499 if (traceOn())
500 {
501 for (uint64_t i = 0; i < fInputJobStepAssociation.outSize(); i++)
502 logDiskIoInfo(fStepId, fInputJobStepAssociation.outAt(i));
503
504 for (uint64_t i = 0; i < fOutputJobStepAssociation.outSize(); i++)
505 logDiskIoInfo(fStepId, fOutputJobStepAssociation.outAt(i));
506 }
507 }
508
// Blocks until the worker thread started by run() has finished.
void LargeHashJoin::join()
{
    runner->join();
}
513
// Starts doHashJoin() on a separate thread (@bug 686) so this call returns
// immediately and downstream job steps can begin; join() waits for completion.
void LargeHashJoin::run()
{
    if (traceOn())
    {
        syslogStartStep(16, // exemgr subsystem
                        std::string("LargeHashJoin")); // step name
    }

    runner.reset(new boost::thread(HJRunner(this)));
}
524
// Records an error status on the output association and signals end-of-input
// on both output datalists so any consumers blocked on them wake up instead
// of waiting forever.
void LargeHashJoin::unblockDatalists(uint16_t status)
{
    fOutputJobStepAssociation.status(status);
    fOutputJobStepAssociation.outAt(0)->dataList()->endOfInput();
    fOutputJobStepAssociation.outAt(1)->dataList()->endOfInput();
}
531
errorLogging(const string & msg) const532 void LargeHashJoin::errorLogging(const string& msg) const
533 {
534 ostringstream errMsg;
535 errMsg << "Step " << stepId() << "; " << msg;
536 cerr << errMsg.str() << endl;
537 catchHandler( errMsg.str(), sessionId() );
538 }
539
doHashJoin()540 void LargeHashJoin::doHashJoin()
541 {
542 string val;
543
544 idbassert(fInputJobStepAssociation.outSize() >= 2);
545 idbassert(fOutputJobStepAssociation.outSize() >= 2);
546 BucketDataList* Ap = 0;
547 BucketDataList* Bp = 0;
548 BucketDataList* tAp = 0;
549 BucketDataList* tBp = 0;
550 DataList_t* inDL1 = 0;
551 DataList_t* inDL2 = 0;
552 inDL1 = fInputJobStepAssociation.outAt(0)->dataList();
553 inDL2 = fInputJobStepAssociation.outAt(1)->dataList();
554 idbassert(inDL1);
555 idbassert(inDL2);
556
557 HashJoin<ElementType>* hj = 0;
558 double createWorkTime = 0;
559 double hashWorkTime = 0;
560 double insertWorkTime = 0;
561 DataList_t* resultA = fOutputJobStepAssociation.outAt(0)->dataList();
562 DataList_t* resultB = fOutputJobStepAssociation.outAt(1)->dataList();
563
564 if (0 < fInputJobStepAssociation.status())
565 {
566 unblockDatalists(fInputJobStepAssociation.status());
567 }
568 else
569 {
570
571 string currentAction("preparing join");
572
573 try
574 {
575 //If we're given BucketDL's, use them
576 if (typeid(*inDL1) == typeid(BucketDataList))
577 {
578
579 if (typeid(*inDL2) != typeid(BucketDataList))
580 {
581 throw logic_error("LargeHashJoin::run: expected either 0 or 2 BucketDL's!");
582 }
583
584 Ap = dynamic_cast<BucketDataList*>(inDL1);
585 Bp = dynamic_cast<BucketDataList*>(inDL2);
586 }
587 else
588 {
589 throw logic_error("HashJoin will take only BucketDLs as inputs");
590 int maxBuckets = fRm.getHjMaxBuckets();
591 joblist::ridtype_t maxElems = fRm.getHjMaxElems();
592
593 BucketDataList* tAp = new BucketDataList(maxBuckets, 1, maxElems, fRm);
594 BucketDataList* tBp = new BucketDataList(maxBuckets, 1, maxElems, fRm);
595 tAp->setHashMode(1);
596 tBp->setHashMode(1);
597
598 ElementType element;
599 int id;
600
601 id = inDL1->getIterator();
602
603 while (inDL1->next(id, &element))
604 {
605 tAp->insert(element);
606 }
607
608 tAp->endOfInput();
609
610 id = inDL2->getIterator();
611
612 while (inDL2->next(id, &element))
613 {
614 tBp->insert(element);
615 }
616
617 tBp->endOfInput();
618
619 Ap = tAp;
620 Bp = tBp;
621 }
622
623 unsigned numThreads = fRm.getHjNumThreads();
624
625 BDLWrapper< ElementType > setA(Ap);
626 BDLWrapper< ElementType > setB(Bp);
627
628 hj = new HashJoin<ElementType>(setA, setB, resultA, resultB, fJoinType, &dlTimes, fOutputJobStepAssociation.statusPtr(), sessionId(), &die);
629
630 if (fTableOID2 >= 3000)
631 {
632 ostringstream logStr2;
633 logStr2 << "LargeHashJoin::run: ses:" << fSessionId <<
634 " st:" << fStepId <<
635 " input sizes: " << setA.size() << "/" << setB.size() << endl;
636 cout << logStr2.str();
637 }
638
639 currentAction = "performing join";
640
641 if (fTableOID2 >= 3000)
642 {
643 dlTimes.setFirstReadTime();
644 dlTimes.setEndOfInputTime( dlTimes.FirstReadTime() );
645 }
646
647 hj->performJoin(numThreads);
648
649 } // try
650 catch (const logging::LargeDataListExcept& ex)
651 {
652 ostringstream errMsg;
653 errMsg << __FILE__ << " doHashJoin: " <<
654 currentAction << ", caught LDL error: " << ex.what();
655 errorLogging(errMsg.str());
656 unblockDatalists(logging::largeHashJoinLargeDataListFileErr);
657 }
658 catch (const exception& ex)
659 {
660 ostringstream errMsg;
661 errMsg << __FILE__ << " doHashJoin: " <<
662 currentAction << ", caught: " << ex.what();
663 errorLogging(errMsg.str());
664 unblockDatalists(logging::largeHashJoinErr);
665 }
666 catch (...)
667 {
668 ostringstream errMsg;
669 errMsg << __FILE__ << " doHashJoin: " <<
670 currentAction << ", caught unknown exception";
671 errorLogging(errMsg.str());
672 unblockDatalists(logging::largeHashJoinErr);
673 }
674
675 if (hj)
676 {
677 //..hashWorkTime is the time to perform the hashjoin excluding the
678 // the output insertion time. insertWorkTime is the sum or total
679 // of both insert times. The end result is that createWorkTime +
680 // hashWorkTime + insertWorkTime roughly equates to the total work
681 // time.
682 createWorkTime = hj->getTimeSet()->totalTime(createHashStr);
683 hashWorkTime = hj->getTimeSet()->totalTime(hashJoinStr) -
684 hj->getTimeSet()->totalTime(insertResultsStr);
685 insertWorkTime = hj->getTimeSet()->totalTime(insertResultsStr) +
686 hj->getTimeSet()->totalTime(insertLastResultsStr);
687 }
688
689 } // (fInputJobStepAssociation.status() == 0)
690
691 if (fTableOID2 >= 3000 && traceOn())
692 {
693 time_t finTime = time(0);
694 char finTimeString[50];
695 ctime_r(&finTime, finTimeString);
696 finTimeString[strlen(finTimeString) - 1 ] = '\0';
697
698 ostringstream logStr;
699 logStr << "ses:" << fSessionId << " st: " << fStepId <<
700 " finished at " << finTimeString <<
701 "; 1st read " << dlTimes.FirstReadTimeString() <<
702 "; EOI " << dlTimes.EndOfInputTimeString() << endl
703 << "\tLargeHashJoin::run: output sizes: "
704 << resultA->totalSize() << "/" << resultB->totalSize()
705 << " run time: " <<
706 JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
707 << fixed << setprecision(2)
708 << "s\n\tTotal work times: create hash: " << createWorkTime
709 << "s, hash join: " << hashWorkTime
710 << "s, insert results: " << insertWorkTime << "s\n"
711 << "\tJob completion status " << fInputJobStepAssociation.status() << endl;
712 logEnd(logStr.str().c_str());
713
714 syslogProcessingTimes(16, // exemgr subsystem
715 dlTimes.FirstReadTime(), // use join start time for first read time
716 dlTimes.EndOfInputTime(), // use join end time for last read time
717 dlTimes.FirstReadTime(), // use join start time for first write time
718 dlTimes.EndOfInputTime()); // use join end time for last write time
719 syslogEndStep(16, // exemgr subsystem
720 0, // no blocked datalist input to report
721 0); // no blocked datalist output to report
722 }
723
724 delete hj;
725 delete tAp;
726 delete tBp;
727 }
728
toString() const729 const string LargeHashJoin::toString() const
730 {
731 ostringstream oss;
732 CalpontSystemCatalog::OID oid1 = 0;
733 CalpontSystemCatalog::OID oid2 = 0;
734 DataList_t* dl1;
735 DataList_t* dl2;
736 size_t idlsz;
737
738 idlsz = fInputJobStepAssociation.outSize();
739 idbassert(idlsz == 2);
740 dl1 = fInputJobStepAssociation.outAt(0)->dataList();
741
742 if (dl1) oid1 = dl1->OID();
743
744 dl2 = fInputJobStepAssociation.outAt(1)->dataList();
745
746 if (dl2) oid2 = dl2->OID();
747
748 oss << "LargeHashJoin ses:" << fSessionId << " st:" << fStepId;
749 oss << omitOidInDL;
750 oss << " in tb/col1:" << fTableOID1 << "/" << oid1;
751 oss << " " << fInputJobStepAssociation.outAt(0);
752 oss << " in tb/col2:" << fTableOID2 << "/" << oid2;
753 oss << " " << fInputJobStepAssociation.outAt(1);
754
755 idlsz = fOutputJobStepAssociation.outSize();
756 idbassert(idlsz == 2);
757 dl1 = fOutputJobStepAssociation.outAt(0)->dataList();
758
759 if (dl1) oid1 = dl1->OID();
760
761 dl2 = fOutputJobStepAssociation.outAt(1)->dataList();
762
763 if (dl2) oid2 = dl2->OID();
764
765 oss << endl << " ";
766 oss << " out tb/col1:" << fTableOID1 << "/" << oid1;
767 oss << " " << fOutputJobStepAssociation.outAt(0);
768 oss << " out tb/col2:" << fTableOID2 << "/" << oid2;
769 oss << " " << fOutputJobStepAssociation.outAt(1) << endl;
770
771 return oss.str();
772 }
773
// Thin constructor: all state lives in the LargeHashJoin base; this subclass
// only specializes the join to string (dictionary) element types.
StringHashJoinStep::StringHashJoinStep(JoinType joinType,
                                       uint32_t sessionId,
                                       uint32_t txnId,
                                       uint32_t statementId,
                                       ResourceManager* rm):
    LargeHashJoin(joinType, sessionId, txnId, statementId, rm)
{
}
782
// Nothing to release beyond what ~LargeHashJoin() already handles.
StringHashJoinStep::~StringHashJoinStep()
{
}
786
// Starts doStringHashJoin() on a separate thread (@bug 686) so this call
// returns immediately; LargeHashJoin::join() waits for completion.
void StringHashJoinStep::run()
{
    if (traceOn())
    {
        syslogStartStep(16, // exemgr subsystem
                        std::string("StringHashJoinStep")); // step name
    }

    runner.reset(new boost::thread(StringHJRunner(this)));
}
797
doStringHashJoin()798 void StringHashJoinStep::doStringHashJoin()
799 {
800 string val;
801
802 idbassert(fInputJobStepAssociation.outSize() >= 2);
803 idbassert(fOutputJobStepAssociation.outSize() >= 2);
804 DataList<StringElementType>* inDL1 = fInputJobStepAssociation.outAt(0)->stringDataList();
805 DataList<StringElementType>* inDL2 = fInputJobStepAssociation.outAt(1)->stringDataList();
806 idbassert(inDL1);
807 idbassert(inDL2);
808
809 BucketDL<StringElementType>* Ap = 0;
810 BucketDL<StringElementType>* Bp = 0;
811 BucketDL<StringElementType>* tAp = 0;
812 BucketDL<StringElementType>* tBp = 0;
813
814 HashJoin<StringElementType>* hj = 0;
815 double createWorkTime = 0;
816 double hashWorkTime = 0;
817 double insertWorkTime = 0;
818 DataList_t* resultA = fOutputJobStepAssociation.outAt(0)->dataList();
819 DataList_t* resultB = fOutputJobStepAssociation.outAt(1)->dataList();
820 struct timeval start_time;
821 gettimeofday(&start_time, 0);
822 struct timeval end_time = start_time;
823 ZonedDL* bdl1 = 0;
824 ZonedDL* bdl2 = 0;
825
826 // result from hashjoinstep is expected to be BandedDataList
827 // but the HashJoin<StringElementType> returns StringDataList
828 // also, the null is reported as "_CpNuLl_" by pDictionStep
829 // create two StringDataList for the intermediate result BDL
830 // @bug 721. use zdl.
831 StringZonedDL* dlA = new StringZonedDL(1, fRm);
832 dlA->setMultipleProducers(true);
833 StringZonedDL* dlB = new StringZonedDL(1, fRm);
834 dlB->setMultipleProducers(true);
835
836 if (0 < fInputJobStepAssociation.status() )
837 {
838 unblockDatalists(fInputJobStepAssociation.status());
839 }
840 else
841 {
842
843 string currentAction("preparing join");
844
845 try
846 {
847 //If we're given BucketDL's, use them
848 if (typeid(*inDL1) == typeid(BucketDL<StringElementType>))
849 {
850 if (typeid(*inDL2) != typeid(BucketDL<StringElementType>))
851 {
852 throw logic_error("StringHashJoinStep::run: expected either 0 or 2 BucketDL's!");
853 }
854
855 Ap = dynamic_cast<BucketDL<StringElementType>*>(inDL1);
856 Bp = dynamic_cast<BucketDL<StringElementType>*>(inDL2);
857 }
858 else
859 {
860 int maxBuckets = fRm.getHjMaxBuckets();
861 joblist::ridtype_t maxElems = fRm.getHjMaxElems();
862
863 // int maxBuckets=4;
864 // joblist::ridtype_t maxElems=1024*8;
865 // val = fConfig->getConfig("HashJoin", "MaxBuckets"); // same as HashJoin
866 // if (val.size() > 0)
867 // maxBuckets = static_cast<int>(config::Config::fromText(val));
868 // if (maxBuckets <=0)
869 // maxBuckets=4;
870 // val = fConfig->getConfig("HashJoin", "MaxElems"); // same as HashJoin
871 // if (val.size() >0)
872 // maxElems = config::Config::uFromText(val);
873 // if (maxElems<=0)
874 // maxElems=1024*8;
875
876 tAp = new BucketDL<StringElementType>(maxBuckets, 1, maxElems, fRm);
877 tBp = new BucketDL<StringElementType>(maxBuckets, 1, maxElems, fRm);
878 tAp->setHashMode(1);
879 tBp->setHashMode(1);
880
881 StringElementType element;
882 int id = inDL1->getIterator();
883
884 while (inDL1->next(id, &element))
885 {
886 tAp->insert(element);
887 }
888
889 tAp->endOfInput();
890
891 id = inDL2->getIterator();
892
893 while (inDL2->next(id, &element))
894 {
895 tBp->insert(element);
896 }
897
898 tBp->endOfInput();
899
900 Ap = tAp;
901 Bp = tBp;
902 }
903
904 unsigned numThreads = fRm.getHjNumThreads();
905 // unsigned numThreads = 0;
906 // val = fConfig->getConfig("HashJoin", "NumThreads");
907 // if (val.size() > 0)
908 // numThreads = static_cast<unsigned>(config::Config::uFromText(val));
909 // if (numThreads <= 0)
910 // numThreads = 4;
911
912 BDLWrapper< StringElementType > setA(Ap);
913 BDLWrapper< StringElementType > setB(Bp);
914
915 HashJoin<StringElementType>* hj =
916 new HashJoin<StringElementType>(setA, setB, dlA, dlB, fJoinType, &dlTimes, fOutputJobStepAssociation.statusPtr(), sessionId(), &die);
917
918 if ((dlA == NULL) || (dlB == NULL) || (hj == NULL))
919 {
920 ostringstream oss;
921 oss << "StringHashJoinStep::run() null pointer from new -- ";
922 oss << "StringDataList A(0x" << hex << (ptrdiff_t)dlA << "), B(0x"
923 << (ptrdiff_t)dlB << "), HashJoin hj(0x" << (ptrdiff_t)hj << ")";
924 throw (runtime_error(oss.str().c_str()));
925 }
926
927 // leave this in
928 if (fTableOID2 >= 3000)
929 {
930 ostringstream logStr2;
931 logStr2 << "StringHashJoinStep::run: ses:" << fSessionId <<
932 " st:" << fStepId <<
933 " input sizes: " << setA.size() << "/" << setB.size() << endl;
934 cout << logStr2.str();
935 }
936
937 currentAction = "performing join";
938
939 if (fTableOID2 >= 3000)
940 {
941 dlTimes.setFirstReadTime();
942 dlTimes.setEndOfInputTime( dlTimes.FirstReadTime() );
943 }
944
945 hj->performJoin(numThreads);
946
947 currentAction = "after join";
948
949 // convert from StringElementType to ElementType by grabbing the rid
950 // take _CpNuLl_ out of the result
951 StringElementType se;
952 ElementType e;
953 int id = dlA->getIterator();
954
955 bdl1 = dynamic_cast<ZonedDL*>(resultA);
956 bdl2 = dynamic_cast<ZonedDL*>(resultB);
957 vector <ElementType> v;
958 v.reserve(ZDL_VEC_SIZE);
959
960 if (bdl1)
961 {
962 while (dlA->next(id, &se))
963 {
964 if (se.second != CPNULLSTRMARK)
965 {
966 e.first = se.first;
967 v.push_back(e);
968
969 if (v.size() >= ZDL_VEC_SIZE)
970 {
971 resultA->insert(v);
972 v.clear();
973 }
974 }
975 }
976
977 if (v.size() > 0)
978 resultA->insert(v);
979
980 resultA->endOfInput();
981 }
982
983 else
984 {
985 while (dlA->next(id, &se))
986 {
987 if (se.second != CPNULLSTRMARK)
988 {
989 e.first = se.first;
990 resultA->insert(e);
991 }
992 }
993
994 resultA->endOfInput();
995 }
996
997 id = dlB->getIterator();
998
999 if (bdl2)
1000 {
1001 v.clear();
1002
1003 while (dlB->next(id, &se))
1004 {
1005 if (se.second != CPNULLSTRMARK)
1006 {
1007 e.first = se.first;
1008 v.push_back(e);
1009
1010 if (v.size() >= ZDL_VEC_SIZE)
1011 {
1012 resultB->insert(v);
1013 v.clear();
1014 }
1015 }
1016 }
1017
1018 if (v.size() > 0)
1019 resultB->insert(v);
1020
1021 resultB->endOfInput();
1022 }
1023 else
1024 {
1025 while (dlB->next(id, &se))
1026 {
1027 if (se.second != CPNULLSTRMARK)
1028 {
1029 e.first = se.first;
1030 resultB->insert(e);
1031 }
1032 }
1033
1034 resultB->endOfInput();
1035 }
1036 } // try
1037 catch (const logging::LargeDataListExcept& ex)
1038 {
1039 ostringstream errMsg;
1040 errMsg << __FILE__ << " doStringHashJoin: " <<
1041 currentAction << ", caught LDL error: " << ex.what();
1042 errorLogging(errMsg.str());
1043 unblockDatalists(logging::stringHashJoinStepLargeDataListFileErr);
1044 dlA->endOfInput();
1045 dlB->endOfInput();
1046 }
1047 catch (const exception& ex)
1048 {
1049 ostringstream errMsg;
1050 errMsg << __FILE__ << " doStringHashJoin: " <<
1051 currentAction << ", caught: " << ex.what();
1052 errorLogging(errMsg.str());
1053 unblockDatalists(logging::stringHashJoinStepErr);
1054 dlA->endOfInput();
1055 dlB->endOfInput();
1056 }
1057 catch (...)
1058 {
1059 ostringstream errMsg;
1060 errMsg << __FILE__ << " doStringHashJoin: " <<
1061 currentAction << ", caught unknown exception";
1062 errorLogging(errMsg.str());
1063 unblockDatalists(logging::stringHashJoinStepErr);
1064 dlA->endOfInput();
1065 dlB->endOfInput();
1066 }
1067
1068 gettimeofday(&end_time, 0);
1069
1070 if (fTableOID2 >= 3000) dlTimes.setEndOfInputTime();
1071
1072 if (hj)
1073 {
1074 //..hashWorkTime is the time to perform the hashjoin excluding the
1075 // the output insertion time. insertWorkTime is the sum or total
1076 // of both insert times. The end result is that createWorkTime +
1077 // hashWorkTime + insertWorkTime roughly equates to the total work
1078 // time.
1079 createWorkTime = hj->getTimeSet()->totalTime(createHashStr);
1080 hashWorkTime = hj->getTimeSet()->totalTime(hashJoinStr) -
1081 hj->getTimeSet()->totalTime(insertResultsStr);
1082 insertWorkTime = hj->getTimeSet()->totalTime(insertResultsStr) +
1083 hj->getTimeSet()->totalTime(insertLastResultsStr);
1084 }
1085
1086 } // (fInputJobStepAssociation.status() == 0)
1087
1088 if (fTableOID2 >= 3000 && traceOn())
1089 {
1090 time_t finTime = time(0);
1091 char finTimeString[50];
1092 ctime_r(&finTime, finTimeString);
1093 finTimeString[strlen(finTimeString) - 1 ] = '\0';
1094
1095 ostringstream logStr;
1096 logStr << "ses:" << fSessionId << " st: " << fStepId <<
1097 " finished at " << finTimeString <<
1098 "; 1st read " << dlTimes.FirstReadTimeString() <<
1099 "; EOI " << dlTimes.EndOfInputTimeString() << endl
1100 << "\tStringHashJoinStep::run: output sizes: "
1101 << dlA->totalSize() << "/" << dlB->totalSize() << " [";
1102
1103 if (bdl1 && bdl2)
1104 logStr << bdl1->totalSize() << "/" << bdl2->totalSize();
1105
1106 logStr << "] run time: " <<
1107 JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
1108 << fixed << setprecision(2)
1109 << "s\n\tTotal work times: create hash: " << createWorkTime
1110 << "s, hash join: " << hashWorkTime
1111 << "s, insert results: " << insertWorkTime << "s\n"
1112 << "\tJob completion status " << fInputJobStepAssociation.status() << endl;
1113 logEnd(logStr.str().c_str());
1114
1115 syslogProcessingTimes(16, // exemgr subsystem
1116 start_time, // use join start time for first read time
1117 end_time, // use join end time for last read time
1118 start_time, // use join start time for first write time
1119 end_time); // use join end time for last write time
1120 syslogEndStep(16, // exemgr subsystem
1121 0, // no blocked datalist input to report
1122 0); // no blocked datalist output to report
1123 }
1124
1125 delete hj;
1126 delete tAp;
1127 delete tBp;
1128 delete dlA;
1129 delete dlB;
1130 }
1131
toString() const1132 const string StringHashJoinStep::toString() const
1133 {
1134 ostringstream oss;
1135 CalpontSystemCatalog::OID oid1 = 0;
1136 CalpontSystemCatalog::OID oid2 = 0;
1137
1138 size_t idlsz = fInputJobStepAssociation.outSize();
1139 idbassert(idlsz == 2);
1140 DataList<StringElementType>* dl1 = fInputJobStepAssociation.outAt(0)->stringDataList();
1141
1142 if (dl1) oid1 = dl1->OID();
1143
1144 DataList<StringElementType>* dl2 = fInputJobStepAssociation.outAt(1)->stringDataList();
1145
1146 if (dl2) oid2 = dl2->OID();
1147
1148 oss << "StringHashJoinStep ses:" << fSessionId << " st:" << fStepId;
1149 oss << omitOidInDL;
1150 oss << " in tb/col1:" << fTableOID1 << "/" << oid1;
1151 oss << " " << fInputJobStepAssociation.outAt(0);
1152 oss << " in tb/col2:" << fTableOID2 << "/" << oid2;
1153 oss << " " << fInputJobStepAssociation.outAt(1);
1154
1155 idlsz = fOutputJobStepAssociation.outSize();
1156 idbassert(idlsz == 2);
1157 DataList_t* dl3 = fOutputJobStepAssociation.outAt(0)->dataList();
1158
1159 if (dl3) oid1 = dl3->OID();
1160
1161 DataList_t* dl4 = fOutputJobStepAssociation.outAt(1)->dataList();
1162
1163 if (dl4) oid2 = dl4->OID();
1164
1165 oss << endl << " ";
1166 oss << " out tb/col1:" << fTableOID1 << "/" << oid1;
1167 oss << " " << fOutputJobStepAssociation.outAt(0);
1168 oss << " out tb/col2:" << fTableOID2 << "/" << oid2;
1169 oss << " " << fOutputJobStepAssociation.outAt(1);
1170
1171 return oss.str();
1172 }
1173
1174 }
1175
1176