1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 // $Id: largehashjoin.cpp 9655 2013-06-25 23:08:13Z xlou $
#include <cassert>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <fstream>
#include <iomanip>
#include <sstream>
#include <stdexcept>
#include <string>
#include <typeinfo>
#include <sys/time.h>
using namespace std;
28 
29 #include <boost/thread/mutex.hpp>
30 
31 #include "calpontsystemcatalog.h"
32 using namespace execplan;
33 
34 #include "jobstep.h"
35 #include "largehashjoin.h"
36 #include "elementtype.h"
37 using namespace joblist;
38 
39 #include "mcsconfig.h"
40 
41 boost::mutex fileLock_g;
42 
43 namespace
44 {
logDiskIoInfo(uint64_t stepId,const AnyDataListSPtr & spdl)45 void logDiskIoInfo(uint64_t stepId, const AnyDataListSPtr& spdl)
46 {
47     boost::mutex::scoped_lock lk(fileLock_g);
48     string umDiskioLog = string(MCSLOGDIR) + "/trace/umdiskio.log";
49     string umDiskioBak = string(MCSLOGDIR) + "/trace/umdiskio.bak";
50 
51     ofstream umDiskIoFile(umDiskioLog.c_str(), ios_base::app);
52 
53     CalpontSystemCatalog::OID oid;
54     uint64_t maxBuckets = 0;
55     list<DiskIoInfo>* infoList = NULL;
56     string bkt("bkt");
57     BucketDL<ElementType>*       bdl  = spdl->bucketDL();
58     BucketDL<StringElementType>* sbdl = spdl->stringBucketDL();
59     ZDL<ElementType>*            zdl  = spdl->zonedDL();
60     ZDL<StringElementType>*      szdl = spdl->stringZonedDL();
61 
62     if (bdl != NULL)
63     {
64         maxBuckets = bdl->bucketCount();
65         oid = bdl->OID();
66     }
67     else if (zdl != NULL)
68     {
69         maxBuckets = zdl->bucketCount();
70         oid = zdl->OID();
71         bkt = "zdl";
72     }
73     else if (sbdl != NULL)
74     {
75         maxBuckets = sbdl->bucketCount();
76         oid = sbdl->OID();
77     }
78     else if (szdl != NULL)
79     {
80         maxBuckets = szdl->bucketCount();
81         oid = szdl->OID();
82         bkt = "zdl";
83     }
84     else
85     {
86         // not logged for now.
87         return;
88     }
89 
90     for (uint64_t i = 0; i < maxBuckets; i++)
91     {
92         if (bdl) infoList = &(bdl->diskIoInfoList(i));
93         else if (zdl) infoList = &(zdl->diskIoInfoList(i));
94         else if (sbdl) infoList = &(sbdl->diskIoInfoList(i));
95         else if (szdl) infoList = &(szdl->diskIoInfoList(i));
96 
97         for (list<DiskIoInfo>::iterator j = infoList->begin(); j != infoList->end(); j++)
98         {
99             boost::posix_time::time_duration td = j->fEnd - j->fStart;
100             umDiskIoFile << setfill('0')
101                          << "st" << setw(2) << stepId << "oid" << oid << bkt << setw(3) << i
102                          << (j->fWrite ? " writes " : " reads  ") << setw(7) << setfill(' ')
103                          << j->fBytes << " bytes, at " << j->fStart << " duration "
104                          << td.total_microseconds() << " mcs @ "
105                          << (j->fBytes / td.total_microseconds()) << "MB/s" << endl;
106         }
107     }
108 
109     streampos curPos = umDiskIoFile.tellp( );
110     umDiskIoFile.close();
111 
112     // move the current file to bak when size above .5 G, so total log is 1 G
113     if (curPos > 0x20000000)
114     {
115         string cmd = "/bin/mv " + umDiskioLog + " " + umDiskioBak;
116         (void)system(cmd.c_str());
117     }
118 }
119 
120 }
121 
122 
123 namespace joblist
124 {
// Batch size used by HashJoinByBucket_thr: results destined for a ZonedDL are
// collected in a local vector and inserted once it holds this many elements.
const uint64_t ZDL_VEC_SIZE = 4096;
126 //@bug 686. Make hashjoin doing jobs in seperate thread and return to main immediately.
127 // So the other job steps can start.
128 struct HJRunner
129 {
HJRunnerjoblist::HJRunner130     HJRunner(LargeHashJoin* p) : joiner(p)
131     {}
132     LargeHashJoin* joiner;
operator ()joblist::HJRunner133     void operator()()
134     {
135         try
136         {
137             joiner->doHashJoin();
138         }
139         catch (std::exception& e)
140         {
141             ostringstream errMsg;
142             errMsg << "HJRunner caught: " << e.what();
143             joiner->errorLogging(errMsg.str());
144             joiner->unblockDatalists(logging::largeHashJoinErr);
145         }
146         catch (...)
147         {
148             string msg("HJRunner caught something not an exception!");
149             joiner->errorLogging(msg);
150             joiner->unblockDatalists(logging::largeHashJoinErr);
151         }
152     }
153 };
154 
155 struct StringHJRunner
156 {
StringHJRunnerjoblist::StringHJRunner157     StringHJRunner(StringHashJoinStep* p) : joiner(p)
158     {}
159     StringHashJoinStep* joiner;
operator ()joblist::StringHJRunner160     void operator()()
161     {
162         try
163         {
164             joiner->doStringHashJoin();
165         }
166         catch (std::exception& e)
167         {
168             ostringstream errMsg;
169             errMsg << "StringHJRunner caught: " << e.what();
170             joiner->errorLogging(errMsg.str());
171             joiner->unblockDatalists(logging::stringHashJoinStepErr);
172         }
173         catch (...)
174         {
175             string msg("StringHJRunner caught something not an exception!");
176             joiner->errorLogging(msg);
177             joiner->unblockDatalists(logging::stringHashJoinStepErr);
178         }
179     }
180 };
181 
182 // Thread function used by HashJoin
183 //
184 template<typename e_t>
HashJoinByBucket_thr(void * arg)185 void* HashJoinByBucket_thr(void* arg)
186 {
187     typename HashJoin<e_t>::thrParams_t* params = (typename HashJoin<e_t>::thrParams_t*)arg;
188     HashJoin<e_t>* hjPtr = params->hjptr;
189     const uint32_t thrIdx = params->thrIdx;
190     long set1Size = 0;
191     long set2Size = 0;
192     bool sendAllHashSet = false;
193     bool sendAllSearchSet = false;
194 
195     try
196     {
197         for (uint32_t idx = 0, bucketIdx = params->startBucket;
198                 idx < params->numBuckets;
199                 idx++, bucketIdx++)
200         {
201 
202 #ifdef DEBUG
203             cout << "\tJoinByBucket() thr " << dec << thrIdx
204                  << " bkt " << bucketIdx
205                  <<  "/" << hjPtr->Set1()->bucketCount()
206                  << "/" << params->numBuckets << endl;
207 #endif
208 
209             JoinType joinType = hjPtr->getJoinType();
210 
211             set1Size = hjPtr->Set1()->size(bucketIdx);
212             set2Size = hjPtr->Set2()->size(bucketIdx);
213 
214             if ( set1Size <= 0 && set2Size <= 0 )
215             {
216                 continue;
217             }
218             else
219             {
220                 if (set1Size > set2Size)
221                 {
222                     hjPtr->setSearchSet(hjPtr->Set1()->getBDL(), thrIdx);
223                     hjPtr->setHashSet(hjPtr->Set2()->getBDL(), thrIdx);
224                     hjPtr->setSearchResult(hjPtr->Result1(), thrIdx);
225                     hjPtr->setHashResult(hjPtr->Result2(), thrIdx);
226                     sendAllHashSet = (joinType == RIGHTOUTER);
227                     sendAllSearchSet = (joinType == LEFTOUTER);
228                 }
229                 else
230                 {
231                     hjPtr->setHashSet(hjPtr->Set1()->getBDL(), thrIdx);
232                     hjPtr->setSearchSet(hjPtr->Set2()->getBDL(), thrIdx);
233                     hjPtr->setHashResult(hjPtr->Result1(), thrIdx);
234                     hjPtr->setSearchResult(hjPtr->Result2(), thrIdx);
235                     sendAllHashSet = (joinType == LEFTOUTER);
236                     sendAllSearchSet = (joinType == RIGHTOUTER);
237                 } //if set1Size > set2Size ...
238 
239             } // if set1Size <=0 . . .
240 
241             params->timeset.setTimer(createHashStr);
242             hjPtr->createHash(hjPtr->HashSet(thrIdx),
243                               hjPtr->HashTable(thrIdx),
244                               bucketIdx,
245                               sendAllHashSet,
246                               hjPtr->HashResult(thrIdx),
247                               params->dlTimes, params->die);
248             params->timeset.holdTimer(createHashStr);
249 
250 #ifdef DEBUG
251             long hashSetTotal = 0;
252             long searchSetTotal = 0;
253 
254             for (uint32_t j = 0; j < hjPtr->HashSet(thrIdx)->bucketCount(); j++)
255                 hashSetTotal += hjPtr->HashSet(thrIdx)->bucketCount(); // are bucketDL
256 
257             for (uint32_t j = 0; j < hjPtr->HashSet(thrIdx)->bucketCount(); j++)
258                 searchSetTotal += hjPtr->SearchSet(thrIdx)->size(j); // can be any datalist
259 
260             cout << "\t\tJoinByBucket() thr " << dec << thrIdx
261                  << " bkt " << bucketIdx
262                  << " hashSize " << hashSetTotal
263                  << " searchSize " << searchSetTotal << endl;
264 #endif
265 
266             bool more;
267             e_t e;
268             e_t e2;
269             const uint64_t InvalidRID = static_cast<uint64_t>(-1);
270             int iter = hjPtr->SearchSet(thrIdx)->getIterator(bucketIdx);
271 
272             ZonedDL* zdl1 = dynamic_cast<ZonedDL*>(hjPtr->SearchResult(thrIdx));
273             ZonedDL* zdl2 = dynamic_cast<ZonedDL*>(hjPtr->HashResult(thrIdx));
274             vector <e_t> vec1;
275             vector <e_t> vec2;
276 
277             std::pair<typename HashJoin<e_t>::hashIter_t, typename HashJoin<e_t>::hashIter_t> hashItPair;
278             typename HashJoin<e_t>::hashIter_t hashIt;
279             typename HashJoin<e_t>::hash_t* ht = hjPtr->HashTable(thrIdx);
280             params->timeset.setTimer(hashJoinStr);
281 
282             for (more = hjPtr->SearchSet(thrIdx)->next(bucketIdx, iter, &e);
283                     more && !(*params->die);
284                     more = hjPtr->SearchSet(thrIdx)->next(bucketIdx, iter, &e) )
285             {
286 
287                 // If sendAllSearchSet=true, keep all of the search set.  If this is
288                 // a right outer, we are dealing with a join such as
289                 // col1 = col2 (+)
290                 // where col1 is the SearchSet and col2 is the HashSet.  We want to include
291                 // all of col1 in this case regardless of whether there is a matching col2.
292                 if (sendAllSearchSet)
293                 {
294                     if (zdl1)
295                     {
296                         vec1.push_back(e);
297 
298                         if (vec1.size() >= ZDL_VEC_SIZE)
299                         {
300                             params->timeset.setTimer(insertResultsStr);
301                             hjPtr->SearchResult(thrIdx)->insert(vec1);
302                             vec1.clear();
303                             params->timeset.holdTimer(insertResultsStr);
304                         }
305                     }
306                     else
307                         hjPtr->SearchResult(thrIdx)->insert(e);
308                 }
309 
310                 hashIt = ht->find(e.second);
311 
312                 if (hashIt != ht->end())
313                 {
314 #ifdef DEBUG
315 
316                     if (hjPtr->SearchResult(thrIdx)->OID() >= 3000)
317                         cout << "JoinByBucket() SearchResult add " << bucketIdx
318                              << " [" << e.first << "][" << e.second << "]" << endl;
319 
320                     uint32_t a = 0;
321                     e_t b = e_t();
322 #endif
323 
324                     // If sendAllSearchSet=false, we already added the search result
325                     // before the if condition above.
326                     if (!sendAllSearchSet)
327                     {
328                         if (zdl1)
329                         {
330                             vec1.push_back(e);
331 
332                             if (vec1.size() >= ZDL_VEC_SIZE)
333                             {
334                                 params->timeset.setTimer(insertResultsStr);
335                                 hjPtr->SearchResult(thrIdx)->insert(vec1);
336                                 vec1.clear();
337                                 params->timeset.holdTimer(insertResultsStr);
338                             }
339                         }
340                         else
341                             hjPtr->SearchResult(thrIdx)->insert(e);
342                     }
343 
344                     // If sendAllHashSet=false, add the hash results to the output datalist.
345                     // If it is a left outer join then we already added all of the right side rows
346                     // in the bucket in the createHash call earlier in this function.
347                     if (!sendAllHashSet)
348                     {
349 
350                         // If the matching pair has it's RID set to invalid, it's already been encountered,
351                         // so no reason to add it to the output datalist or keep searching for more matching values.
352                         if (hashIt->second != InvalidRID)
353                         {
354 
355                             // If the matching pair has it's RID set to invalid, it's already been encountered,
356                             hashItPair = ht->equal_range(e.second);
357 
358                             for (hashIt = hashItPair.first; hashIt != hashItPair.second; hashIt++)
359                             {
360                                 e2.first = hashIt->second;
361                                 e2.second = e.second;
362 
363                                 if (zdl2)
364                                 {
365                                     vec2.push_back(e2);
366 
367                                     if (vec2.size() >= ZDL_VEC_SIZE)
368                                     {
369                                         params->timeset.setTimer(insertResultsStr);
370                                         hjPtr->HashResult(thrIdx)->insert(vec2);
371                                         vec2.clear();
372                                         params->timeset.holdTimer(insertResultsStr);
373                                     }
374                                 }
375                                 else
376                                     hjPtr->HashResult(thrIdx)->insert(e2);
377 
378 #ifdef DEBUG
379                                 a++;
380                                 b = v.second;
381 #endif
382 
383                                 // Set the RID to invalid rid now that it's been matched and added to the output datalist.
384                                 // This will keep us from duplicating it if it is matched again.
385                                 hashIt->second = InvalidRID;
386 
387                             }
388 
389 #ifdef DEBUG
390                             cout << "\t\tadded " << b << " " << a << " times" << endl << endl;
391 #endif
392                         }
393 
394                     }
395 
396                 } //  if hashIt != hashIt->end()
397 
398             } // for ( hjPtr...
399 
400             params->timeset.holdTimer(hashJoinStr);
401 
402             params->timeset.setTimer(insertLastResultsStr);
403 
404             if (vec1.size() != 0)
405             {
406                 hjPtr->SearchResult(thrIdx)->insert(vec1);
407                 vec1.clear();
408             }
409 
410             if (vec2.size() != 0)
411             {
412                 hjPtr->HashResult(thrIdx)->insert(vec2);
413                 vec2.clear();
414             }
415 
416             params->timeset.holdTimer(insertLastResultsStr);
417 
418             // typename HashJoin<e_t>::hash_t* ht = hjPtr->HashTable(thrIdx);
419             ht->clear();
420 
421         } // for (bucketIdx...
422     }  // try
423 // We don't have to call JSA.endOfInput() for this exception, because
424 // the parent thread takes care of that in performThreadedJoin().
425     catch (const logging::LargeDataListExcept& ex)
426     {
427         ostringstream errMsg;
428 
429         if (typeid(e_t) == typeid(StringElementType))
430         {
431             errMsg << "HashJoinByBucket_thr<String>: caught LDL error: " <<
432                    ex.what();
433             hjPtr->status(logging::stringHashJoinStepLargeDataListFileErr);
434         }
435         else
436         {
437             errMsg << "HashJoinByBucket_thr: caught LDL error: " << ex.what();
438             hjPtr->status(logging::largeHashJoinLargeDataListFileErr);
439         }
440 
441         cerr << errMsg.str() << endl;
442         catchHandler(errMsg.str(), hjPtr->sessionId());
443     }
444     catch (const exception& ex)
445     {
446         ostringstream errMsg;
447 
448         if (typeid(e_t) == typeid(StringElementType))
449         {
450             errMsg << "HashJoinByBucket_thr<String>: caught: " << ex.what();
451             hjPtr->status(logging::stringHashJoinStepErr);
452         }
453         else
454         {
455             errMsg << "HashJoinByBucket_thr: caught: " << ex.what();
456             hjPtr->status(logging::largeHashJoinErr);
457         }
458 
459         cerr << errMsg.str() << endl;
460         catchHandler(errMsg.str(), hjPtr->sessionId());
461     }
462     catch (...)
463     {
464         ostringstream errMsg;
465 
466         if (typeid(e_t) == typeid(StringElementType))
467         {
468             errMsg << "HashJoinByBucket_thr<String>: caught unknown exception: ";
469             hjPtr->status(logging::stringHashJoinStepErr);
470         }
471         else
472         {
473             errMsg << "HashJoinByBucket_thr: caught unknown exception";
474             hjPtr->status(logging::largeHashJoinErr);
475         }
476 
477         cerr << errMsg.str() << endl;
478         catchHandler(errMsg.str(), hjPtr->sessionId());
479     }
480 
481     return NULL;
482 } // HashJoinByBucket_thr
483 
LargeHashJoin(JoinType joinType,uint32_t sessionId,uint32_t txnId,uint32_t statementId,ResourceManager * rm)484 LargeHashJoin::LargeHashJoin(JoinType joinType,
485                              uint32_t sessionId,
486                              uint32_t txnId,
487                              uint32_t statementId,
488                              ResourceManager* rm ):
489     fSessionId(sessionId), fTxnId(txnId),
490     fStepId(0), fStatementId(statementId), fTableOID1(0), fTableOID2(0),
491     fJoinType(joinType), fRm(rm), fAlias1(), fAlias2()
492 {
493 // 	fConfig = config::Config::makeConfig();
494 // 	fJoinType = joinType;
495 }
496 
~LargeHashJoin()497 LargeHashJoin::~LargeHashJoin()
498 {
499     if (traceOn())
500     {
501         for (uint64_t i = 0; i < fInputJobStepAssociation.outSize(); i++)
502             logDiskIoInfo(fStepId, fInputJobStepAssociation.outAt(i));
503 
504         for (uint64_t i = 0; i < fOutputJobStepAssociation.outSize(); i++)
505             logDiskIoInfo(fStepId, fOutputJobStepAssociation.outAt(i));
506     }
507 }
508 
join()509 void LargeHashJoin::join()
510 {
511     runner->join();
512 }
513 
run()514 void LargeHashJoin::run()
515 {
516     if (traceOn())
517     {
518         syslogStartStep(16,               // exemgr subsystem
519                         std::string("LargeHashJoin")); // step name
520     }
521 
522     runner.reset(new boost::thread(HJRunner(this)));
523 }
524 
unblockDatalists(uint16_t status)525 void LargeHashJoin::unblockDatalists(uint16_t status)
526 {
527     fOutputJobStepAssociation.status(status);
528     fOutputJobStepAssociation.outAt(0)->dataList()->endOfInput();
529     fOutputJobStepAssociation.outAt(1)->dataList()->endOfInput();
530 }
531 
errorLogging(const string & msg) const532 void LargeHashJoin::errorLogging(const string& msg) const
533 {
534     ostringstream errMsg;
535     errMsg << "Step " << stepId() << "; " << msg;
536     cerr   << errMsg.str() << endl;
537     catchHandler( errMsg.str(), sessionId() );
538 }
539 
doHashJoin()540 void LargeHashJoin::doHashJoin()
541 {
542     string val;
543 
544     idbassert(fInputJobStepAssociation.outSize() >= 2);
545     idbassert(fOutputJobStepAssociation.outSize() >= 2);
546     BucketDataList* Ap = 0;
547     BucketDataList* Bp = 0;
548     BucketDataList* tAp = 0;
549     BucketDataList* tBp = 0;
550     DataList_t* inDL1 = 0;
551     DataList_t* inDL2 = 0;
552     inDL1 = fInputJobStepAssociation.outAt(0)->dataList();
553     inDL2 = fInputJobStepAssociation.outAt(1)->dataList();
554     idbassert(inDL1);
555     idbassert(inDL2);
556 
557     HashJoin<ElementType>* hj = 0;
558     double createWorkTime     = 0;
559     double hashWorkTime       = 0;
560     double insertWorkTime     = 0;
561     DataList_t* resultA       = fOutputJobStepAssociation.outAt(0)->dataList();
562     DataList_t* resultB       = fOutputJobStepAssociation.outAt(1)->dataList();
563 
564     if (0 < fInputJobStepAssociation.status())
565     {
566         unblockDatalists(fInputJobStepAssociation.status());
567     }
568     else
569     {
570 
571         string currentAction("preparing join");
572 
573         try
574         {
575             //If we're given BucketDL's, use them
576             if (typeid(*inDL1) == typeid(BucketDataList))
577             {
578 
579                 if (typeid(*inDL2) != typeid(BucketDataList))
580                 {
581                     throw logic_error("LargeHashJoin::run: expected either 0 or 2 BucketDL's!");
582                 }
583 
584                 Ap = dynamic_cast<BucketDataList*>(inDL1);
585                 Bp = dynamic_cast<BucketDataList*>(inDL2);
586             }
587             else
588             {
589                 throw logic_error("HashJoin will take only BucketDLs as inputs");
590                 int maxBuckets = fRm.getHjMaxBuckets();
591                 joblist::ridtype_t maxElems = fRm.getHjMaxElems();
592 
593                 BucketDataList* tAp = new BucketDataList(maxBuckets, 1, maxElems, fRm);
594                 BucketDataList* tBp = new BucketDataList(maxBuckets, 1, maxElems, fRm);
595                 tAp->setHashMode(1);
596                 tBp->setHashMode(1);
597 
598                 ElementType element;
599                 int id;
600 
601                 id = inDL1->getIterator();
602 
603                 while (inDL1->next(id, &element))
604                 {
605                     tAp->insert(element);
606                 }
607 
608                 tAp->endOfInput();
609 
610                 id = inDL2->getIterator();
611 
612                 while (inDL2->next(id, &element))
613                 {
614                     tBp->insert(element);
615                 }
616 
617                 tBp->endOfInput();
618 
619                 Ap = tAp;
620                 Bp = tBp;
621             }
622 
623             unsigned numThreads = fRm.getHjNumThreads();
624 
625             BDLWrapper< ElementType > setA(Ap);
626             BDLWrapper< ElementType > setB(Bp);
627 
628             hj = new HashJoin<ElementType>(setA, setB, resultA, resultB, fJoinType, &dlTimes, fOutputJobStepAssociation.statusPtr(), sessionId(), &die);
629 
630             if (fTableOID2 >= 3000)
631             {
632                 ostringstream logStr2;
633                 logStr2 << "LargeHashJoin::run: ses:" << fSessionId <<
634                         " st:" << fStepId <<
635                         " input sizes: " << setA.size() << "/" << setB.size() << endl;
636                 cout << logStr2.str();
637             }
638 
639             currentAction = "performing join";
640 
641             if (fTableOID2 >= 3000)
642             {
643                 dlTimes.setFirstReadTime();
644                 dlTimes.setEndOfInputTime( dlTimes.FirstReadTime() );
645             }
646 
647             hj->performJoin(numThreads);
648 
649         } // try
650         catch (const logging::LargeDataListExcept& ex)
651         {
652             ostringstream errMsg;
653             errMsg << __FILE__ << " doHashJoin: " <<
654                    currentAction << ", caught LDL error: " << ex.what();
655             errorLogging(errMsg.str());
656             unblockDatalists(logging::largeHashJoinLargeDataListFileErr);
657         }
658         catch (const exception& ex)
659         {
660             ostringstream errMsg;
661             errMsg << __FILE__ << " doHashJoin: " <<
662                    currentAction << ", caught: " << ex.what();
663             errorLogging(errMsg.str());
664             unblockDatalists(logging::largeHashJoinErr);
665         }
666         catch (...)
667         {
668             ostringstream errMsg;
669             errMsg << __FILE__ << " doHashJoin: " <<
670                    currentAction << ", caught unknown exception";
671             errorLogging(errMsg.str());
672             unblockDatalists(logging::largeHashJoinErr);
673         }
674 
675         if (hj)
676         {
677             //..hashWorkTime is the time to perform the hashjoin excluding the
678             //  the output insertion time.  insertWorkTime is the sum or total
679             //  of both insert times.  The end result is that createWorkTime +
680             //  hashWorkTime + insertWorkTime roughly equates to the total work
681             //  time.
682             createWorkTime  = hj->getTimeSet()->totalTime(createHashStr);
683             hashWorkTime    = hj->getTimeSet()->totalTime(hashJoinStr) -
684                               hj->getTimeSet()->totalTime(insertResultsStr);
685             insertWorkTime  = hj->getTimeSet()->totalTime(insertResultsStr) +
686                               hj->getTimeSet()->totalTime(insertLastResultsStr);
687         }
688 
689     } // (fInputJobStepAssociation.status() == 0)
690 
691     if (fTableOID2 >= 3000 && traceOn())
692     {
693         time_t finTime = time(0);
694         char finTimeString[50];
695         ctime_r(&finTime, finTimeString);
696         finTimeString[strlen(finTimeString) - 1 ] = '\0';
697 
698         ostringstream logStr;
699         logStr << "ses:" << fSessionId << " st: " << fStepId <<
700                " finished at " << finTimeString <<
701                "; 1st read " << dlTimes.FirstReadTimeString() <<
702                "; EOI " << dlTimes.EndOfInputTimeString() << endl
703                << "\tLargeHashJoin::run: output sizes: "
704                << resultA->totalSize() << "/" << resultB->totalSize()
705                << " run time: " <<
706                JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
707                << fixed << setprecision(2)
708                << "s\n\tTotal work times: create hash: " << createWorkTime
709                << "s, hash join: " << hashWorkTime
710                << "s, insert results: " << insertWorkTime << "s\n"
711                << "\tJob completion status " << fInputJobStepAssociation.status() << endl;
712         logEnd(logStr.str().c_str());
713 
714         syslogProcessingTimes(16, // exemgr subsystem
715                               dlTimes.FirstReadTime(),      // use join start time for first read time
716                               dlTimes.EndOfInputTime(),        // use join end   time for last  read time
717                               dlTimes.FirstReadTime(),      // use join start time for first write time
718                               dlTimes.EndOfInputTime());       // use join end   time for last  write time
719         syslogEndStep(16, // exemgr subsystem
720                       0,            // no blocked datalist input  to report
721                       0);           // no blocked datalist output to report
722     }
723 
724     delete hj;
725     delete tAp;
726     delete tBp;
727 }
728 
toString() const729 const string LargeHashJoin::toString() const
730 {
731     ostringstream oss;
732     CalpontSystemCatalog::OID oid1 = 0;
733     CalpontSystemCatalog::OID oid2 = 0;
734     DataList_t* dl1;
735     DataList_t* dl2;
736     size_t idlsz;
737 
738     idlsz = fInputJobStepAssociation.outSize();
739     idbassert(idlsz == 2);
740     dl1 = fInputJobStepAssociation.outAt(0)->dataList();
741 
742     if (dl1) oid1 = dl1->OID();
743 
744     dl2 = fInputJobStepAssociation.outAt(1)->dataList();
745 
746     if (dl2) oid2 = dl2->OID();
747 
748     oss << "LargeHashJoin    ses:" << fSessionId << " st:" << fStepId;
749     oss << omitOidInDL;
750     oss << " in  tb/col1:" << fTableOID1 << "/" << oid1;
751     oss << " " << fInputJobStepAssociation.outAt(0);
752     oss << " in  tb/col2:" << fTableOID2 << "/" << oid2;
753     oss << " " << fInputJobStepAssociation.outAt(1);
754 
755     idlsz = fOutputJobStepAssociation.outSize();
756     idbassert(idlsz == 2);
757     dl1 = fOutputJobStepAssociation.outAt(0)->dataList();
758 
759     if (dl1) oid1 = dl1->OID();
760 
761     dl2 = fOutputJobStepAssociation.outAt(1)->dataList();
762 
763     if (dl2) oid2 = dl2->OID();
764 
765     oss << endl << "                    ";
766     oss << " out tb/col1:" << fTableOID1 << "/" << oid1;
767     oss << " " << fOutputJobStepAssociation.outAt(0);
768     oss << " out tb/col2:" << fTableOID2 << "/" << oid2;
769     oss << " " << fOutputJobStepAssociation.outAt(1) << endl;
770 
771     return oss.str();
772 }
773 
StringHashJoinStep(JoinType joinType,uint32_t sessionId,uint32_t txnId,uint32_t statementId,ResourceManager * rm)774 StringHashJoinStep::StringHashJoinStep(JoinType joinType,
775                                        uint32_t sessionId,
776                                        uint32_t txnId,
777                                        uint32_t statementId,
778                                        ResourceManager* rm):
779     LargeHashJoin(joinType, sessionId, txnId, statementId, rm)
780 {
781 }
782 
~StringHashJoinStep()783 StringHashJoinStep::~StringHashJoinStep()
784 {
785 }
786 
run()787 void StringHashJoinStep::run()
788 {
789     if (traceOn())
790     {
791         syslogStartStep(16,                     // exemgr subsystem
792                         std::string("StringHashJoinStep")); // step name
793     }
794 
795     runner.reset(new boost::thread(StringHJRunner(this)));
796 }
797 
doStringHashJoin()798 void StringHashJoinStep::doStringHashJoin()
799 {
800     string val;
801 
802     idbassert(fInputJobStepAssociation.outSize() >= 2);
803     idbassert(fOutputJobStepAssociation.outSize() >= 2);
804     DataList<StringElementType>* inDL1 = fInputJobStepAssociation.outAt(0)->stringDataList();
805     DataList<StringElementType>* inDL2 = fInputJobStepAssociation.outAt(1)->stringDataList();
806     idbassert(inDL1);
807     idbassert(inDL2);
808 
809     BucketDL<StringElementType>* Ap = 0;
810     BucketDL<StringElementType>* Bp = 0;
811     BucketDL<StringElementType>* tAp = 0;
812     BucketDL<StringElementType>* tBp = 0;
813 
814     HashJoin<StringElementType>* hj = 0;
815     double createWorkTime     = 0;
816     double hashWorkTime       = 0;
817     double insertWorkTime     = 0;
818     DataList_t* resultA       = fOutputJobStepAssociation.outAt(0)->dataList();
819     DataList_t* resultB       = fOutputJobStepAssociation.outAt(1)->dataList();
820     struct timeval start_time;
821     gettimeofday(&start_time, 0);
822     struct timeval end_time   = start_time;
823     ZonedDL* bdl1             = 0;
824     ZonedDL* bdl2             = 0;
825 
826     // result from hashjoinstep is expected to be BandedDataList
827     // but the HashJoin<StringElementType> returns StringDataList
828     // also, the null is reported as "_CpNuLl_" by pDictionStep
829     // create two StringDataList for the intermediate result BDL
830     // @bug 721. use zdl.
831     StringZonedDL* dlA = new StringZonedDL(1, fRm);
832     dlA->setMultipleProducers(true);
833     StringZonedDL* dlB = new StringZonedDL(1, fRm);
834     dlB->setMultipleProducers(true);
835 
836     if (0 < fInputJobStepAssociation.status() )
837     {
838         unblockDatalists(fInputJobStepAssociation.status());
839     }
840     else
841     {
842 
843         string currentAction("preparing join");
844 
845         try
846         {
847             //If we're given BucketDL's, use them
848             if (typeid(*inDL1) == typeid(BucketDL<StringElementType>))
849             {
850                 if (typeid(*inDL2) != typeid(BucketDL<StringElementType>))
851                 {
852                     throw logic_error("StringHashJoinStep::run: expected either 0 or 2 BucketDL's!");
853                 }
854 
855                 Ap = dynamic_cast<BucketDL<StringElementType>*>(inDL1);
856                 Bp = dynamic_cast<BucketDL<StringElementType>*>(inDL2);
857             }
858             else
859             {
860                 int maxBuckets = fRm.getHjMaxBuckets();
861                 joblist::ridtype_t maxElems = fRm.getHjMaxElems();
862 
863 // 		int maxBuckets=4;
864 // 		joblist::ridtype_t maxElems=1024*8;
865 // 		val = fConfig->getConfig("HashJoin", "MaxBuckets");  // same as HashJoin
866 //    		if (val.size() > 0)
867 // 			maxBuckets = static_cast<int>(config::Config::fromText(val));
868 // 		if (maxBuckets <=0)
869 // 			maxBuckets=4;
870 // 		val = fConfig->getConfig("HashJoin", "MaxElems");    // same as HashJoin
871 // 		if (val.size() >0)
872 // 			maxElems = config::Config::uFromText(val);
873 // 		if (maxElems<=0)
874 // 			maxElems=1024*8;
875 
876                 tAp = new BucketDL<StringElementType>(maxBuckets, 1, maxElems, fRm);
877                 tBp = new BucketDL<StringElementType>(maxBuckets, 1, maxElems, fRm);
878                 tAp->setHashMode(1);
879                 tBp->setHashMode(1);
880 
881                 StringElementType element;
882                 int id = inDL1->getIterator();
883 
884                 while (inDL1->next(id, &element))
885                 {
886                     tAp->insert(element);
887                 }
888 
889                 tAp->endOfInput();
890 
891                 id = inDL2->getIterator();
892 
893                 while (inDL2->next(id, &element))
894                 {
895                     tBp->insert(element);
896                 }
897 
898                 tBp->endOfInput();
899 
900                 Ap = tAp;
901                 Bp = tBp;
902             }
903 
904             unsigned numThreads = fRm.getHjNumThreads();
905 // 	unsigned numThreads = 0;
906 // 	val = fConfig->getConfig("HashJoin", "NumThreads");
907 // 	if (val.size() > 0)
908 // 		numThreads = static_cast<unsigned>(config::Config::uFromText(val));
909 // 	if (numThreads <= 0)
910 // 		numThreads = 4;
911 
912             BDLWrapper< StringElementType > setA(Ap);
913             BDLWrapper< StringElementType > setB(Bp);
914 
915             HashJoin<StringElementType>* hj =
916                 new HashJoin<StringElementType>(setA, setB, dlA, dlB, fJoinType, &dlTimes, fOutputJobStepAssociation.statusPtr(), sessionId(), &die);
917 
918             if ((dlA == NULL) || (dlB == NULL) || (hj == NULL))
919             {
920                 ostringstream oss;
921                 oss << "StringHashJoinStep::run() null pointer from new -- ";
922                 oss << "StringDataList A(0x" << hex << (ptrdiff_t)dlA << "), B(0x"
923                     << (ptrdiff_t)dlB << "), HashJoin hj(0x" << (ptrdiff_t)hj << ")";
924                 throw (runtime_error(oss.str().c_str()));
925             }
926 
927             // leave this in
928             if (fTableOID2 >= 3000)
929             {
930                 ostringstream logStr2;
931                 logStr2 << "StringHashJoinStep::run: ses:" << fSessionId <<
932                         " st:" << fStepId <<
933                         " input sizes: " << setA.size() << "/" << setB.size() << endl;
934                 cout << logStr2.str();
935             }
936 
937             currentAction = "performing join";
938 
939             if (fTableOID2 >= 3000)
940             {
941                 dlTimes.setFirstReadTime();
942                 dlTimes.setEndOfInputTime( dlTimes.FirstReadTime() );
943             }
944 
945             hj->performJoin(numThreads);
946 
947             currentAction = "after join";
948 
949             // convert from StringElementType to ElementType by grabbing the rid
950             // take _CpNuLl_ out of the result
951             StringElementType se;
952             ElementType       e;
953             int id = dlA->getIterator();
954 
955             bdl1 = dynamic_cast<ZonedDL*>(resultA);
956             bdl2 = dynamic_cast<ZonedDL*>(resultB);
957             vector <ElementType> v;
958             v.reserve(ZDL_VEC_SIZE);
959 
960             if (bdl1)
961             {
962                 while (dlA->next(id, &se))
963                 {
964                     if (se.second != CPNULLSTRMARK)
965                     {
966                         e.first = se.first;
967                         v.push_back(e);
968 
969                         if (v.size() >= ZDL_VEC_SIZE)
970                         {
971                             resultA->insert(v);
972                             v.clear();
973                         }
974                     }
975                 }
976 
977                 if (v.size() > 0)
978                     resultA->insert(v);
979 
980                 resultA->endOfInput();
981             }
982 
983             else
984             {
985                 while (dlA->next(id, &se))
986                 {
987                     if (se.second != CPNULLSTRMARK)
988                     {
989                         e.first = se.first;
990                         resultA->insert(e);
991                     }
992                 }
993 
994                 resultA->endOfInput();
995             }
996 
997             id = dlB->getIterator();
998 
999             if (bdl2)
1000             {
1001                 v.clear();
1002 
1003                 while (dlB->next(id, &se))
1004                 {
1005                     if (se.second != CPNULLSTRMARK)
1006                     {
1007                         e.first = se.first;
1008                         v.push_back(e);
1009 
1010                         if (v.size() >= ZDL_VEC_SIZE)
1011                         {
1012                             resultB->insert(v);
1013                             v.clear();
1014                         }
1015                     }
1016                 }
1017 
1018                 if (v.size() > 0)
1019                     resultB->insert(v);
1020 
1021                 resultB->endOfInput();
1022             }
1023             else
1024             {
1025                 while (dlB->next(id, &se))
1026                 {
1027                     if (se.second != CPNULLSTRMARK)
1028                     {
1029                         e.first = se.first;
1030                         resultB->insert(e);
1031                     }
1032                 }
1033 
1034                 resultB->endOfInput();
1035             }
1036         } // try
1037         catch (const logging::LargeDataListExcept& ex)
1038         {
1039             ostringstream errMsg;
1040             errMsg << __FILE__ << " doStringHashJoin: " <<
1041                    currentAction << ", caught LDL error: " << ex.what();
1042             errorLogging(errMsg.str());
1043             unblockDatalists(logging::stringHashJoinStepLargeDataListFileErr);
1044             dlA->endOfInput();
1045             dlB->endOfInput();
1046         }
1047         catch (const exception& ex)
1048         {
1049             ostringstream errMsg;
1050             errMsg << __FILE__ << " doStringHashJoin: " <<
1051                    currentAction << ", caught: " << ex.what();
1052             errorLogging(errMsg.str());
1053             unblockDatalists(logging::stringHashJoinStepErr);
1054             dlA->endOfInput();
1055             dlB->endOfInput();
1056         }
1057         catch (...)
1058         {
1059             ostringstream errMsg;
1060             errMsg << __FILE__ << " doStringHashJoin: " <<
1061                    currentAction << ", caught unknown exception";
1062             errorLogging(errMsg.str());
1063             unblockDatalists(logging::stringHashJoinStepErr);
1064             dlA->endOfInput();
1065             dlB->endOfInput();
1066         }
1067 
1068         gettimeofday(&end_time, 0);
1069 
1070         if (fTableOID2 >= 3000) dlTimes.setEndOfInputTime();
1071 
1072         if (hj)
1073         {
1074             //..hashWorkTime is the time to perform the hashjoin excluding the
1075             //  the output insertion time.  insertWorkTime is the sum or total
1076             //  of both insert times.  The end result is that createWorkTime +
1077             //  hashWorkTime + insertWorkTime roughly equates to the total work
1078             //  time.
1079             createWorkTime  = hj->getTimeSet()->totalTime(createHashStr);
1080             hashWorkTime    = hj->getTimeSet()->totalTime(hashJoinStr) -
1081                               hj->getTimeSet()->totalTime(insertResultsStr);
1082             insertWorkTime  = hj->getTimeSet()->totalTime(insertResultsStr) +
1083                               hj->getTimeSet()->totalTime(insertLastResultsStr);
1084         }
1085 
1086     } // (fInputJobStepAssociation.status() == 0)
1087 
1088     if (fTableOID2 >= 3000 && traceOn())
1089     {
1090         time_t finTime = time(0);
1091         char finTimeString[50];
1092         ctime_r(&finTime, finTimeString);
1093         finTimeString[strlen(finTimeString) - 1 ] = '\0';
1094 
1095         ostringstream logStr;
1096         logStr << "ses:" << fSessionId << " st: " << fStepId <<
1097                " finished at " << finTimeString <<
1098                "; 1st read " << dlTimes.FirstReadTimeString() <<
1099                "; EOI " << dlTimes.EndOfInputTimeString() << endl
1100                << "\tStringHashJoinStep::run: output sizes: "
1101                << dlA->totalSize() << "/" << dlB->totalSize() << " [";
1102 
1103         if (bdl1 && bdl2)
1104             logStr << bdl1->totalSize() << "/" << bdl2->totalSize();
1105 
1106         logStr << "] run time: " <<
1107                JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
1108                << fixed << setprecision(2)
1109                << "s\n\tTotal work times: create hash: " << createWorkTime
1110                << "s, hash join: " << hashWorkTime
1111                << "s, insert results: " << insertWorkTime << "s\n"
1112                << "\tJob completion status " << fInputJobStepAssociation.status() << endl;
1113         logEnd(logStr.str().c_str());
1114 
1115         syslogProcessingTimes(16, // exemgr subsystem
1116                               start_time,   // use join start time for first read time
1117                               end_time,     // use join end   time for last  read time
1118                               start_time,   // use join start time for first write time
1119                               end_time);    // use join end   time for last  write time
1120         syslogEndStep(16, // exemgr subsystem
1121                       0,            // no blocked datalist input  to report
1122                       0);           // no blocked datalist output to report
1123     }
1124 
1125     delete hj;
1126     delete tAp;
1127     delete tBp;
1128     delete dlA;
1129     delete dlB;
1130 }
1131 
toString() const1132 const string StringHashJoinStep::toString() const
1133 {
1134     ostringstream oss;
1135     CalpontSystemCatalog::OID oid1 = 0;
1136     CalpontSystemCatalog::OID oid2 = 0;
1137 
1138     size_t idlsz = fInputJobStepAssociation.outSize();
1139     idbassert(idlsz == 2);
1140     DataList<StringElementType>* dl1 = fInputJobStepAssociation.outAt(0)->stringDataList();
1141 
1142     if (dl1) oid1 = dl1->OID();
1143 
1144     DataList<StringElementType>* dl2 = fInputJobStepAssociation.outAt(1)->stringDataList();
1145 
1146     if (dl2) oid2 = dl2->OID();
1147 
1148     oss << "StringHashJoinStep    ses:" << fSessionId << " st:" << fStepId;
1149     oss << omitOidInDL;
1150     oss << " in  tb/col1:" << fTableOID1 << "/" << oid1;
1151     oss << " " << fInputJobStepAssociation.outAt(0);
1152     oss << " in  tb/col2:" << fTableOID2 << "/" << oid2;
1153     oss << " " << fInputJobStepAssociation.outAt(1);
1154 
1155     idlsz = fOutputJobStepAssociation.outSize();
1156     idbassert(idlsz == 2);
1157     DataList_t* dl3 = fOutputJobStepAssociation.outAt(0)->dataList();
1158 
1159     if (dl3) oid1 = dl3->OID();
1160 
1161     DataList_t* dl4 = fOutputJobStepAssociation.outAt(1)->dataList();
1162 
1163     if (dl4) oid2 = dl4->OID();
1164 
1165     oss << endl << "                          ";
1166     oss << " out tb/col1:" << fTableOID1 << "/" << oid1;
1167     oss << " " << fOutputJobStepAssociation.outAt(0);
1168     oss << " out tb/col2:" << fTableOID2 << "/" << oid2;
1169     oss << " " << fOutputJobStepAssociation.outAt(1);
1170 
1171     return oss.str();
1172 }
1173 
1174 }
1175 
1176