1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019-2020 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: tuple-bps.cpp 9705 2013-07-17 20:06:07Z pleblanc $
20 
21 
22 #include <unistd.h>
23 //#define NDEBUG
24 #include <cassert>
25 #include <sstream>
26 #include <iomanip>
27 #include <algorithm>
28 #include <ctime>
29 #include <sys/time.h>
30 #include <deque>
31 using namespace std;
32 
33 #include <boost/thread.hpp>
34 #include <boost/thread/condition.hpp>
35 #include <boost/uuid/uuid_io.hpp>
36 using namespace boost;
37 
38 #include "bpp-jl.h"
39 #include "distributedenginecomm.h"
40 #include "elementtype.h"
41 #include "jlf_common.h"
42 #include "primitivestep.h"
43 #include "unique32generator.h"
44 #include "rowestimator.h"
45 using namespace joblist;
46 
47 #include "messagequeue.h"
48 using namespace messageqcpp;
49 
50 #include "configcpp.h"
51 using namespace config;
52 
53 #include "messagelog.h"
54 #include "messageobj.h"
55 #include "loggingid.h"
56 #include "errorcodes.h"
57 #include "errorids.h"
58 #include "exceptclasses.h"
59 using namespace logging;
60 
61 #include "liboamcpp.h"
62 
63 #include "calpontsystemcatalog.h"
64 using namespace execplan;
65 
66 #include "brm.h"
67 using namespace BRM;
68 
69 #include "oamcache.h"
70 
71 #include "rowgroup.h"
72 using namespace rowgroup;
73 
74 #include "threadnaming.h"
75 
76 #include "querytele.h"
77 using namespace querytele;
78 
79 #include "pseudocolumn.h"
80 //#define DEBUG 1
81 
82 extern boost::mutex fileLock_g;
83 
84 namespace
85 {
const uint32_t LOGICAL_EXTENT_CONVERTER = 10;   // 10 + 13: 13 to convert to logical blocks,
                                                // 10 to convert to groups of 1024 logical blocks
88 const uint32_t DEFAULT_EXTENTS_PER_SEG_FILE = 2;
89 
90 }
91 
92 /** Debug macro */
93 #define THROTTLE_DEBUG 0
94 #if THROTTLE_DEBUG
95 #define THROTTLEDEBUG std::cout
96 #else
97 #define THROTTLEDEBUG if (false) std::cout
98 #endif
99 namespace joblist
100 {
101 
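// Thread-pool functors for TupleBPS.  TupleBPSPrimitive wraps the single
// producer loop (sendPrimitiveMessages); TupleBPSAggregators wraps one of the
// consumer loops (receiveMultiPrimitiveMessages).  Any exception escaping a
// loop is routed through catchHandler() into the step's errorInfo rather than
// killing the thread pool.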
102 struct TupleBPSPrimitive
103 {
    TupleBPSPrimitive(TupleBPS* batchPrimitiveStep) :
105         fBatchPrimitiveStep(batchPrimitiveStep)
106     {}
107     TupleBPS* fBatchPrimitiveStep;
    void operator()()
109     {
110         try
111         {
112             utils::setThreadName("BPSPrimitive");
113             fBatchPrimitiveStep->sendPrimitiveMessages();
114         }
115         catch (std::exception& re)
116         {
117             cerr << "TupleBPS: send thread threw an exception: " << re.what() <<
118                  "\t" << this << endl;
119             catchHandler(re.what(), ERR_TUPLE_BPS, fBatchPrimitiveStep->errorInfo());
120         }
121         catch (...)
122         {
123             string msg("TupleBPS: send thread threw an unknown exception ");
124             catchHandler(msg, ERR_TUPLE_BPS, fBatchPrimitiveStep->errorInfo());
125             cerr << msg << this << endl;
126         }
127     }
128 };
129 
130 struct TupleBPSAggregators
131 {
    TupleBPSAggregators(TupleBPS* batchPrimitiveStep, uint64_t index) :
133         fBatchPrimitiveStepCols(batchPrimitiveStep), fThreadId(index)
134     {}
135     TupleBPS* fBatchPrimitiveStepCols;
136     uint64_t fThreadId;
137 
    void operator()()
139     {
140         try
141         {
142             utils::setThreadName("BPSAggregator");
143             fBatchPrimitiveStepCols->receiveMultiPrimitiveMessages(fThreadId);
144         }
145         catch (std::exception& re)
146         {
147             cerr << fBatchPrimitiveStepCols->toString() << ": receive thread threw an exception: " << re.what() << endl;
148             catchHandler(re.what(), ERR_TUPLE_BPS, fBatchPrimitiveStepCols->errorInfo());
149         }
150         catch (...)
151         {
152             string msg("TupleBPS: recv thread threw an unknown exception ");
153             cerr << fBatchPrimitiveStepCols->toString() << msg << endl;
154             catchHandler(msg, ERR_TUPLE_BPS, fBatchPrimitiveStepCols->errorInfo());
155         }
156     }
157 };
158 
159 //------------------------------------------------------------------------------
160 // Initialize configurable parameters
161 //------------------------------------------------------------------------------
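// Tuning knobs for throttling messages sent to PrimProc.  The send side
// (see sendJobs() below) stops issuing work once
//     msgsSent - msgsRecvd > fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER
// and resumes when the receive side catches up; fMaxNumThreads bounds the
// number of receive threads started by startAggregationThread().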
void TupleBPS::initializeConfigParms()
163 {
164     string        strVal;
165 
166 
167     //...Get the tuning parameters that throttle msgs sent to primproc
168     //...fFilterRowReqLimit puts a cap on how many rids we will request from
169     //...    primproc, before pausing to let the consumer thread catch up.
170     //...    Without this limit, there is a chance that PrimProc could flood
171     //...    ExeMgr with thousands of messages that will consume massive
172     //...    amounts of memory for a 100 gigabyte database.
    //...fFilterRowReqThreshold is the level below which the number of outstanding
    //...    rids must fall before the producer can send more rids.
175 
176     //These could go in constructor
177     fRequestSize = fRm->getJlRequestSize();
178     fMaxOutstandingRequests = fRm->getJlMaxOutstandingRequests();
179     fProcessorThreadsPerScan = fRm->getJlProcessorThreadsPerScan();
180     fNumThreads = 0;
181 
182     config::Config* cf = config::Config::makeConfig();
183     string epsf = cf->getConfig("ExtentMap", "ExtentsPerSegmentFile");
184 
185     if ( epsf.length() != 0 )
186         fExtentsPerSegFile = cf->uFromText(epsf);
187 
188     if (fRequestSize >= fMaxOutstandingRequests)
189         fRequestSize = 1;
190 
191     if ((fSessionId & 0x80000000) == 0)
192         fMaxNumThreads = fRm->getJlNumScanReceiveThreads();
193     else
194         fMaxNumThreads = 1;
195 
    // Reserve space for the max number of threads. A bit of an optimization.
197     fProducerThreads.clear();
198     fProducerThreads.reserve(fMaxNumThreads);
199 }
200 
TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) :
202     BatchPrimitive(jobInfo), pThread(0), fRm(jobInfo.rm)
203 {
204     fInputJobStepAssociation = rhs.inputAssociation();
205     fOutputJobStepAssociation = rhs.outputAssociation();
206     fDec = 0;
207     fSessionId = rhs.sessionId();
208     fFilterCount = rhs.filterCount();
209     fFilterString = rhs.filterString();
210     isFilterFeeder = rhs.getFeederFlag();
211     fOid = rhs.oid();
212     fTableOid = rhs.tableOid();
213     extentSize = rhs.extentSize;
214 
215     scannedExtents = rhs.extents;
216     extentsMap[fOid] = tr1::unordered_map<int64_t, EMEntry>();
217     tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[fOid];
218 
219     for (uint32_t z = 0; z < rhs.extents.size(); z++)
220         ref[rhs.extents[z].range.start] = rhs.extents[z];
221 
222     lbidList = rhs.lbidList;
223     rpbShift = rhs.rpbShift;
224     divShift = rhs.divShift;
225     modMask = rhs.modMask;
226     numExtents = rhs.numExtents;
227     ridsRequested = 0;
228     ridsReturned = 0;
229     recvExited = 0;
230     totalMsgs = 0;
231     msgsSent = 0;
232     msgsRecvd = 0;
233     fMsgBytesIn = 0;
234     fMsgBytesOut = 0;
235     fBlockTouched = 0;
236     fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
237     recvWaiting = 0;
238     fStepCount = 1;
239     fCPEvaluated = false;
240     fEstimatedRows = 0;
241     fColType = rhs.colType();
242     alias(rhs.alias());
243     view(rhs.view());
244     name(rhs.name());
245     fColWidth = fColType.colWidth;
246     fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
247     initializeConfigParms();
248     fBPP->setSessionID(fSessionId);
249     fBPP->setStepID(fStepId);
250     fBPP->setQueryContext(fVerId);
251     fBPP->setTxnID(fTxnId);
252     fTraceFlags = rhs.fTraceFlags;
253     fBPP->setTraceFlags(fTraceFlags);
254     fBPP->setOutputType(ROW_GROUP);
255     finishedSending = sendWaiting = false;
256     fNumBlksSkipped = 0;
257     fPhysicalIO = 0;
258     fCacheIO = 0;
259     BPPIsAllocated = false;
260     uniqueID = UniqueNumberGenerator::instance()->getUnique32();
261     fBPP->setUniqueID(uniqueID);
262     fBPP->setUuid(fStepUuid);
263     fCardinality = rhs.cardinality();
264     doJoin = false;
265     hasPMJoin = false;
266     hasUMJoin = false;
267     fRunExecuted = false;
268     fSwallowRows = false;
269     smallOuterJoiner = -1;
270 
271     // @1098 initialize scanFlags to be true
272     scanFlags.assign(numExtents, true);
273     runtimeCPFlags.assign(numExtents, true);
274     bop = BOP_AND;
275 
276     runRan = joinRan = false;
277     fDelivery = false;
278     fExtendedInfo = "TBPS: ";
279     fQtc.stepParms().stepType = StepTeleStats::T_BPS;
280 
281 
282     hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
283                                     hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
284 }
285 
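// Scan-driving variant: unlike the pColStep-based constructor above, this one
// also copies the LBID ranges to scan and calls initExtentMarkers() to set up
// the per-DBRoot last-extent / last-scanned-LBID bookkeeping.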
TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) :
287     BatchPrimitive(jobInfo), fRm(jobInfo.rm)
288 {
289     fInputJobStepAssociation = rhs.inputAssociation();
290     fOutputJobStepAssociation = rhs.outputAssociation();
291     fDec = 0;
292     fFilterCount = rhs.filterCount();
293     fFilterString = rhs.filterString();
294     isFilterFeeder = rhs.getFeederFlag();
295     fOid = rhs.oid();
296     fTableOid = rhs.tableOid();
297     extentSize = rhs.extentSize;
298     lbidRanges = rhs.lbidRanges;
299 
300     /* These lines are obsoleted by initExtentMarkers.  Need to remove & retest. */
301     scannedExtents = rhs.extents;
302     extentsMap[fOid] = tr1::unordered_map<int64_t, EMEntry>();
303     tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[fOid];
304 
305     for (uint32_t z = 0; z < rhs.extents.size(); z++)
306         ref[rhs.extents[z].range.start] = rhs.extents[z];
307 
308     divShift = rhs.divShift;
309     totalMsgs = 0;
310     msgsSent = 0;
311     msgsRecvd = 0;
312     ridsReturned = 0;
313     ridsRequested = 0;
314     fNumBlksSkipped = 0;
315     fMsgBytesIn = 0;
316     fMsgBytesOut = 0;
317     fBlockTouched = 0;
318     fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
319     recvWaiting = 0;
320     fSwallowRows = false;
321     fStepCount = 1;
322     fCPEvaluated = false;
323     fEstimatedRows = 0;
324     fColType = rhs.colType();
325     alias(rhs.alias());
326     view(rhs.view());
327     name(rhs.name());
328 
329     fColWidth = fColType.colWidth;
330     lbidList = rhs.lbidList;
331 
332     finishedSending = sendWaiting = false;
333     firstRead = true;
334     fSwallowRows = false;
335     recvExited = 0;
336     fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
337     initializeConfigParms();
338     fBPP->setSessionID(fSessionId);
339     fBPP->setQueryContext(fVerId);
340     fBPP->setTxnID(fTxnId);
341     fTraceFlags = rhs.fTraceFlags;
342     fBPP->setTraceFlags(fTraceFlags);
343     fBPP->setStepID(fStepId);
344     fBPP->setOutputType(ROW_GROUP);
345     fPhysicalIO = 0;
346     fCacheIO = 0;
347     BPPIsAllocated = false;
348     uniqueID = UniqueNumberGenerator::instance()->getUnique32();
349     fBPP->setUniqueID(uniqueID);
350     fBPP->setUuid(fStepUuid);
351     fCardinality = rhs.cardinality();
352     doJoin = false;
353     hasPMJoin = false;
354     hasUMJoin = false;
355     fRunExecuted = false;
356     smallOuterJoiner = -1;
357     bop = BOP_AND;
358 
359     runRan = joinRan = false;
360     fDelivery = false;
361     fExtendedInfo = "TBPS: ";
362 
363     initExtentMarkers();
364     fQtc.stepParms().stepType = StepTeleStats::T_BPS;
365 
366     hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
367                                     hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
368 }
369 
TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) :
371     BatchPrimitive(jobInfo), fRm(jobInfo.rm)
372 {
373     fInputJobStepAssociation = rhs.inputAssociation();
374     fOutputJobStepAssociation = rhs.outputAssociation();
375     fDec = 0;
376     fFilterCount = 0;
377     fOid = rhs.oid();
378     fTableOid = rhs.tableOid();
379     ridsReturned = 0;
380     ridsRequested = 0;
381     fMsgBytesIn = 0;
382     fMsgBytesOut = 0;
383     fBlockTouched = 0;
384     fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
385     recvExited = 0;
386     totalMsgs = 0;
387     msgsSent = 0;
388     msgsRecvd = 0;
389     recvWaiting = 0;
390     fStepCount = 1;
391     fCPEvaluated = false;
392     fEstimatedRows = 0;
393     fColType = rhs.colType();
394     alias(rhs.alias());
395     view(rhs.view());
396     name(rhs.name());
397     fColWidth = fColType.colWidth;
398     fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
399     initializeConfigParms();
400     fBPP->setSessionID(fSessionId);
401     fBPP->setStepID(fStepId);
402     fBPP->setQueryContext(fVerId);
403     fBPP->setTxnID(fTxnId);
404     fTraceFlags = rhs.fTraceFlags;
405     fBPP->setTraceFlags(fTraceFlags);
406     fBPP->setOutputType(ROW_GROUP);
407     finishedSending = sendWaiting = false;
408     fSwallowRows = false;
409     fNumBlksSkipped = 0;
410     fPhysicalIO = 0;
411     fCacheIO = 0;
412     BPPIsAllocated = false;
413     uniqueID = UniqueNumberGenerator::instance()->getUnique32();
414     fBPP->setUniqueID(uniqueID);
415     fBPP->setUuid(fStepUuid);
416     doJoin = false;
417     hasPMJoin = false;
418     hasUMJoin = false;
419     fRunExecuted = false;
420     isFilterFeeder = false;
421     smallOuterJoiner = -1;
422 
423     // @1098 initialize scanFlags to be true
424     scanFlags.assign(numExtents, true);
425     runtimeCPFlags.assign(numExtents, true);
426     bop = BOP_AND;
427 
428     runRan = joinRan = false;
429     fDelivery = false;
430     fExtendedInfo = "TBPS: ";
431     fQtc.stepParms().stepType = StepTeleStats::T_BPS;
432 
433     hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
434                                     hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
435 
436 }
437 
TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) :
439     BatchPrimitive(jobInfo), fRm(jobInfo.rm)
440 {
441     fInputJobStepAssociation = rhs.inputAssociation();
442     fOutputJobStepAssociation = rhs.outputAssociation();
443     fDec = 0;
444     fOid = rhs.oid();
445     fTableOid = rhs.tableOid();
446     totalMsgs = 0;
447     msgsSent = 0;
448     msgsRecvd = 0;
449     ridsReturned = 0;
450     ridsRequested = 0;
451     fNumBlksSkipped = 0;
452     fBlockTouched = 0;
453     fMsgBytesIn = 0;
454     fMsgBytesOut = 0;
455     fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
456     recvWaiting = 0;
457     fSwallowRows = false;
458     fStepCount = 1;
459     fCPEvaluated = false;
460     fEstimatedRows = 0;
461     alias(rhs.alias());
462     view(rhs.view());
463     name(rhs.name());
464     finishedSending = sendWaiting = false;
465     recvExited = 0;
466     fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
467     initializeConfigParms();
468     fBPP->setSessionID(fSessionId);
469     fBPP->setStepID(fStepId);
470     fBPP->setQueryContext(fVerId);
471     fBPP->setTxnID(fTxnId);
472     fTraceFlags = rhs.fTraceFlags;
473     fBPP->setTraceFlags(fTraceFlags);
474     fBPP->setOutputType(ROW_GROUP);
475     fPhysicalIO = 0;
476     fCacheIO = 0;
477     BPPIsAllocated = false;
478     uniqueID = UniqueNumberGenerator::instance()->getUnique32();
479     fBPP->setUniqueID(uniqueID);
480     fBPP->setUuid(fStepUuid);
481     fCardinality = rhs.cardinality();
482     doJoin = false;
483     hasPMJoin = false;
484     hasUMJoin = false;
485     fRunExecuted = false;
486     isFilterFeeder = false;
487     smallOuterJoiner = -1;
488     // @1098 initialize scanFlags to be true
489     scanFlags.assign(numExtents, true);
490     runtimeCPFlags.assign(numExtents, true);
491     bop = BOP_AND;
492 
493     runRan = joinRan = false;
494     fDelivery = false;
495     fExtendedInfo = "TBPS: ";
496     fQtc.stepParms().stepType = StepTeleStats::T_BPS;
497 
498     hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
499                                     hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
500 
501 }
502 
TupleBPS::~TupleBPS()
504 {
505     if (fDec)
506     {
507         fDec->removeDECEventListener(this);
508 
509         if (BPPIsAllocated)
510         {
511             ByteStream bs;
512             fBPP->destroyBPP(bs);
513 
514             try
515             {
516                 fDec->write(uniqueID, bs);
517             }
518             catch (const std::exception& e)
519             {
520                 // log the exception
521                 cerr << "~TupleBPS caught: " << e.what() << endl;
522                 catchHandler(e.what(), ERR_TUPLE_BPS, fErrorInfo, fSessionId);
523             }
524             catch (...)
525             {
526                 cerr << "~TupleBPS caught unknown exception" << endl;
527                 catchHandler("~TupleBPS caught unknown exception",
528                              ERR_TUPLE_BPS, fErrorInfo, fSessionId);
529             }
530         }
531 
532         fDec->removeQueue(uniqueID);
533     }
534 
535 }
536 
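// Attach a filter step to the BatchPrimitiveProcessorJL.  Depending on the
// concrete type this adds a pColStep / PseudoColStep / pColScanStep /
// pDictionaryStep / FilterStep, records which pseudocolumn filters are present
// (used later by processPseudoColFilters()), merges the column step's extents
// into extentsMap, and keeps fColWidth at the widest column seen so far.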
void TupleBPS::setBPP(JobStep* jobStep)
538 {
539     fCardinality = jobStep->cardinality();
540 
541     pColStep* pcsp = dynamic_cast<pColStep*>(jobStep);
542 
543     int colWidth = 0;
544 
545     if (pcsp != 0)
546     {
547         PseudoColStep* pseudo = dynamic_cast<PseudoColStep*>(jobStep);
548 
549         if (pseudo)
550         {
551             fBPP->addFilterStep(*pseudo);
552 
553             if (pseudo->filterCount() > 0)
554             {
555                 hasPCFilter = true;
556 
557                 switch (pseudo->pseudoColumnId())
558                 {
559                     case PSEUDO_EXTENTRELATIVERID:
560                         hasRIDFilter = true;
561                         break;
562 
563                     case PSEUDO_DBROOT:
564                         hasDBRootFilter = true;
565                         break;
566 
567                     case PSEUDO_PM:
568                         hasPMFilter = true;
569                         break;
570 
571                     case PSEUDO_SEGMENT:
572                         hasSegmentFilter = true;
573                         break;
574 
575                     case PSEUDO_SEGMENTDIR:
576                         hasSegmentDirFilter = true;
577                         break;
578 
579                     case PSEUDO_EXTENTMIN:
580                         hasMinFilter = true;
581                         break;
582 
583                     case PSEUDO_EXTENTMAX:
584                         hasMaxFilter = true;
585                         break;
586 
587                     case PSEUDO_BLOCKID:
588                         hasLBIDFilter = true;
589                         break;
590 
591                     case PSEUDO_EXTENTID:
592                         hasExtentIDFilter = true;
593                         break;
594 
595                     case PSEUDO_PARTITION:
596                         hasPartitionFilter = true;
597                         break;
598                 }
599             }
600         }
601         else
602             fBPP->addFilterStep(*pcsp);
603 
604         extentsMap[pcsp->fOid] = tr1::unordered_map<int64_t, EMEntry>();
605         tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcsp->fOid];
606 
607         for (uint32_t z = 0; z < pcsp->extents.size(); z++)
608             ref[pcsp->extents[z].range.start] = pcsp->extents[z];
609 
610         colWidth = (pcsp->colType()).colWidth;
611         isFilterFeeder = pcsp->getFeederFlag();
612 
        // it17 does not allow combined AND/OR; this pColStep is for hash join optimization.
614         if (bop == BOP_OR && isFilterFeeder == false)
615             fBPP->setForHJ(true);
616     }
617     else
618     {
619         pColScanStep* pcss = dynamic_cast<pColScanStep*>(jobStep);
620 
621         if (pcss != 0)
622         {
623             fBPP->addFilterStep(*pcss, lastScannedLBID);
624 
625             extentsMap[pcss->fOid] = tr1::unordered_map<int64_t, EMEntry>();
626             tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcss->fOid];
627 
628             for (uint32_t z = 0; z < pcss->extents.size(); z++)
629                 ref[pcss->extents[z].range.start] = pcss->extents[z];
630 
631             colWidth = (pcss->colType()).colWidth;
632             isFilterFeeder = pcss->getFeederFlag();
633         }
634         else
635         {
636             pDictionaryStep* pdsp = dynamic_cast<pDictionaryStep*>(jobStep);
637 
638             if (pdsp != 0)
639             {
640                 fBPP->addFilterStep(*pdsp);
641                 colWidth = (pdsp->colType()).colWidth;
642             }
643             else
644             {
645                 FilterStep* pfsp = dynamic_cast<FilterStep*>(jobStep);
646 
647                 if (pfsp)
648                 {
649                     fBPP->addFilterStep(*pfsp);
650                 }
651             }
652 
653         }
654     }
655 
656     if (colWidth > fColWidth)
657     {
658         fColWidth = colWidth;
659     }
660 }
661 
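// Attach a projection step.  When jobStep2 is non-NULL it is the
// pDictionaryStep paired with a token column (pColStep or PassThruStep);
// otherwise a single column is projected.  A PassThruStep whose OID differs
// from the last filter step is converted back into a pColStep/PseudoColStep so
// it can be projected independently of that filter.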
void TupleBPS::setProjectBPP(JobStep* jobStep1, JobStep* jobStep2)
663 {
664     int colWidth = 0;
665 
666     if (jobStep2 != NULL)
667     {
668         pDictionaryStep* pdsp = 0;
669         pColStep* pcsp = dynamic_cast<pColStep*>(jobStep1);
670 
671         if (pcsp != 0)
672         {
673             pdsp = dynamic_cast<pDictionaryStep*>(jobStep2);
674             fBPP->addProjectStep(*pcsp, *pdsp);
675 
676             //@Bug 961
677             if (!pcsp->isExeMgr())
678                 fBPP->setNeedRidsAtDelivery(true);
679 
680             colWidth = (pcsp->colType()).colWidth;
681             projectOids.push_back(jobStep1->oid());
682         }
683         else
684         {
685             PassThruStep* psth = dynamic_cast<PassThruStep*>(jobStep1);
686 
687             if (psth != 0)
688             {
689                 pdsp = dynamic_cast<pDictionaryStep*>(jobStep2);
690                 fBPP->addProjectStep(*psth, *pdsp);
691 
692                 //@Bug 961
693                 if (!psth->isExeMgr())
694                     fBPP->setNeedRidsAtDelivery(true);
695 
696                 projectOids.push_back(jobStep1->oid());
697                 colWidth = (psth->colType()).colWidth;
698             }
699         }
700     }
701     else
702     {
703         pColStep* pcsp = dynamic_cast<pColStep*>(jobStep1);
704 
705         if (pcsp != 0)
706         {
707             PseudoColStep* pseudo = dynamic_cast<PseudoColStep*>(jobStep1);
708 
709             if (pseudo)
710             {
711                 fBPP->addProjectStep(*pseudo);
712             }
713             else
714                 fBPP->addProjectStep(*pcsp);
715 
716             extentsMap[pcsp->fOid] = tr1::unordered_map<int64_t, EMEntry>();
717             tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcsp->fOid];
718 
719             for (uint32_t z = 0; z < pcsp->extents.size(); z++)
720                 ref[pcsp->extents[z].range.start] = pcsp->extents[z];
721 
722             //@Bug 961
723             if (!pcsp->isExeMgr())
724                 fBPP->setNeedRidsAtDelivery(true);
725 
726             colWidth = (pcsp->colType()).colWidth;
727             projectOids.push_back(jobStep1->oid());
728         }
729         else
730         {
731             PassThruStep* passthru = dynamic_cast<PassThruStep*>(jobStep1);
732 
733             if (passthru != 0)
734             {
735                 idbassert(!fBPP->getFilterSteps().empty());
736 
737                 if (static_cast<CalpontSystemCatalog::OID>(fBPP->getFilterSteps().back()->getOID()) != passthru->oid())
738                 {
739                     SJSTEP pts;
740 
741                     if (passthru->pseudoType() == 0)
742                     {
743                         pts.reset(new pColStep(*passthru));
744                         pcsp = dynamic_cast<pColStep*>(pts.get());
745                     }
746                     else
747                     {
748                         pts.reset(new PseudoColStep(*passthru));
749                         pcsp = dynamic_cast<PseudoColStep*>(pts.get());
750                     }
751 
752                     fBPP->addProjectStep(*pcsp);
753 
754                     if (!passthru->isExeMgr())
755                         fBPP->setNeedRidsAtDelivery(true);
756 
757                     colWidth = passthru->colType().colWidth;
758                     projectOids.push_back(pts->oid());
759                 }
760                 else
761                 {
762                     fBPP->addProjectStep(*passthru);
763 
764                     //@Bug 961
765                     if (!passthru->isExeMgr())
766                         fBPP->setNeedRidsAtDelivery(true);
767 
768                     colWidth = (passthru->colType()).colWidth;
769                     projectOids.push_back(jobStep1->oid());
770                 }
771             }
772         }
773     }
774 
775     if (colWidth > fColWidth)
776     {
777         fColWidth = colWidth;
778     }
779 }
780 
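// Casual-partitioning (extent min/max) elimination.  For every eligible
// ColumnCommandJL filter, an extent stays scannable only if CP is ignored, its
// CP range is not valid, or the filter predicate can match [lo_val, hi_val];
// if any column's filter rules the extent out, scanFlags[idx] is cleared and
// the extent is skipped.  Illustrative only: a filter such as "col < 10" on an
// extent whose CP range is [100, 200] clears the flag.  Optionally also feeds
// RowEstimator for hash-join sizing.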
void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts)
782 {
783     const vector<SCommand>& colCmdVec = fBPP->getFilterSteps();
784     vector<ColumnCommandJL*> cpColVec;
785     vector<SP_LBIDList> lbidListVec;
786     ColumnCommandJL* colCmd = 0;
787 
    // @bug 2123.  This is now also called earlier in the process, for hash join estimation.  Return if we've already done the work.
789     if (fCPEvaluated)
790     {
791         return;
792     }
793 
794     fCPEvaluated = true;
795 
796     if (colCmdVec.size() == 0)
797         return;
798 
799     for (uint32_t i = 0; i < colCmdVec.size(); i++)
800     {
801         colCmd = dynamic_cast<ColumnCommandJL*>(colCmdVec[i].get());
802 
803         if (!colCmd || dynamic_cast<PseudoCCJL*>(colCmdVec[i].get()))
804             continue;
805 
806         SP_LBIDList tmplbidList(new LBIDList(0));
807 
808         if (tmplbidList->CasualPartitionDataType(colCmd->getColType().colDataType, colCmd->getColType().colWidth))
809         {
810             lbidListVec.push_back(tmplbidList);
811             cpColVec.push_back(colCmd);
812         }
813 
814         // @Bug 3503. Use the total table size as the estimate for non CP columns.
815         else if (fEstimatedRows == 0 && estimateRowCounts)
816         {
817             RowEstimator rowEstimator;
818             fEstimatedRows = rowEstimator.estimateRowsForNonCPColumn(*colCmd);
819         }
820     }
821 
822 
823     if (cpColVec.size() == 0)
824         return;
825 
826     const bool ignoreCP = ((fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP) != 0);
827 
828     for (uint32_t idx = 0; idx < numExtents; idx++)
829     {
830         scanFlags[idx] = true;
831 
832         for (uint32_t i = 0; i < cpColVec.size(); i++)
833         {
834             colCmd = cpColVec[i];
835             const EMEntry& extent = colCmd->getExtents()[idx];
836 
837             /* If any column filter eliminates an extent, it doesn't get scanned */
838             scanFlags[idx] = scanFlags[idx] &&
839                              (ignoreCP || extent.partition.cprange.isValid != BRM::CP_VALID ||
840                               lbidListVec[i]->CasualPartitionPredicate(
841                                   extent.partition.cprange.lo_val,
842                                   extent.partition.cprange.hi_val,
843                                   &(colCmd->getFilterString()),
844                                   colCmd->getFilterCount(),
845                                   colCmd->getColType(),
846                                   colCmd->getBOP())
847                              );
848 
849             if (!scanFlags[idx])
850             {
851                 break;
852             }
853         }
854     }
855 
856     // @bug 2123.  Use the casual partitioning information to estimate the number of rows that will be returned for use in estimating
857     // the large side table for hashjoins.
858     if (estimateRowCounts)
859     {
860         RowEstimator rowEstimator;
861         fEstimatedRows = rowEstimator.estimateRows(cpColVec, scanFlags, dbrm, fOid);
862     }
863 }
864 
void TupleBPS::startPrimitiveThread()
866 {
867     pThread = jobstepThreadPool.invoke(TupleBPSPrimitive(this));
868 }
869 
void TupleBPS::startAggregationThread()
871 {
872 //  This block of code starts all threads up front
873 //     fMaxNumThreads = 1;
874 //     fNumThreads = fMaxNumThreads;
875 //     for (uint32_t i = 0; i < fMaxNumThreads; i++)
876 //             fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, i)));
877 
878 //  This block of code starts one thread at a time
879     if (fNumThreads >= fMaxNumThreads)
880         return;
881 
882     fNumThreads++;
883     fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads - 1)));
884 }
885 
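// Stream the small-side tuple joiner to the PMs.  nextTupleJoinerMsg() packs
// the joiner into successive ByteStreams that are written to the DEC; the
// (uint32_t conn) overload below resends it over a single connection,
// presumably when one PM needs the joiner delivered separately.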
void TupleBPS::serializeJoiner()
887 {
888     ByteStream bs;
889     bool more = true;
890 
    /* A false return from nextTupleJoinerMsg() means bs holds the last msg;
       that last msg still has to be written, so it is not the exit condition by itself. */
893     while (more)
894     {
895         {
            // code block to release the lock immediately
897             boost::mutex::scoped_lock lk(serializeJoinerMutex);
898             more = fBPP->nextTupleJoinerMsg(bs);
899         }
900 #ifdef JLF_DEBUG
901         cout << "serializing joiner into " << bs.length() << " bytes" << endl;
902 #endif
903         fDec->write(uniqueID, bs);
904         bs.restart();
905     }
906 
907 }
908 
void TupleBPS::serializeJoiner(uint32_t conn)
910 {
911     // We need this lock for TupleBPS::serializeJoiner()
912     boost::mutex::scoped_lock lk(serializeJoinerMutex);
913 
914     ByteStream bs;
915     bool more = true;
916 
    /* A false return from nextTupleJoinerMsg() means bs holds the last msg;
       that last msg still has to be written, so it is not the exit condition by itself. */
919     while (more)
920     {
921         more = fBPP->nextTupleJoinerMsg(bs);
922         fDec->write(bs, conn);
923         bs.restart();
924     }
925 }
926 
void TupleBPS::prepCasualPartitioning()
928 {
929     uint32_t i;
930     int64_t min, max, seq;
931 	boost::mutex::scoped_lock lk(cpMutex);
932 
933     for (i = 0; i < scannedExtents.size(); i++)
934     {
935         if (fOid >= 3000)
936         {
937             scanFlags[i] = scanFlags[i] && runtimeCPFlags[i];
938 
939             if (scanFlags[i] && lbidList->CasualPartitionDataType(fColType.colDataType,
940                     fColType.colWidth))
941                 lbidList->GetMinMax(min, max, seq, (int64_t) scannedExtents[i].range.start,
942                                     &scannedExtents, fColType.colDataType);
943         }
944         else
945             scanFlags[i] = true;
946     }
947 }
948 
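// Sanity check: every column involved in the query should expose the same
// number of extents.  A mismatch typically indicates the extent lists were
// captured while extents were being added; run() retries reloadExtentLists()
// until the counts agree (see bugs 4564 and 3607).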
bool TupleBPS::goodExtentCount()
950 {
951     uint32_t eCount = extentsMap.begin()->second.size();
952     map<CalpontSystemCatalog::OID, tr1::unordered_map<int64_t, EMEntry> >
953     ::iterator it;
954 
955     for (it = extentsMap.begin(); it != extentsMap.end(); ++it)
956         if (it->second.size() != eCount)
957             return false;
958 
959     return true;
960 }
961 
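// Rebuild scannedExtents from extentsMap[fOid], sort it, and record for each
// DBRoot the last scannable extent and the last LBID to scan (derived from the
// HWM).  Single-extent segment files with HWM 0 are tagged with
// EXTENTSTATUSMAX + 1 so that only one block, not a full extent, is fetched.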
void TupleBPS::initExtentMarkers()
963 {
964     numDBRoots = fRm->getDBRootCount();
965     lastExtent.resize(numDBRoots);
966     lastScannedLBID.resize(numDBRoots);
967 
968     tr1::unordered_map<int64_t, struct BRM::EMEntry>& ref = extentsMap[fOid];
969     tr1::unordered_map<int64_t, struct BRM::EMEntry>::iterator it;
970 
971     // Map part# and seg# to an extent count per segment file.
972     // Part# is 32 hi order bits of key, seg# is 32 lo order bits
973     std::tr1::unordered_map<uint64_t, int> extentCountPerDbFile;
974 
975     scannedExtents.clear();
976 
977     for (it = ref.begin(); it != ref.end(); ++it)
978     {
979         scannedExtents.push_back(it->second);
980 
981         //@bug 5322: 0 HWM may not mean full extent if 1 extent in file.
982         // Track how many extents are in each segment file
983         if (fExtentsPerSegFile > 1)
984         {
985             EMEntry& e = it->second;
986             uint64_t key = ((uint64_t)e.partitionNum << 32) + e.segmentNum;
987             ++extentCountPerDbFile[key];
988         }
989     }
990 
991     sort(scannedExtents.begin(), scannedExtents.end(), ExtentSorter());
992 
993     numExtents = scannedExtents.size();
994     // @1098 initialize scanFlags to be true
995     scanFlags.assign(numExtents, true);
996     runtimeCPFlags.assign(numExtents, true);
997 
998     for (uint32_t i = 0; i < numDBRoots; i++)
999         lastExtent[i] = -1;
1000 
1001     for (uint32_t i = 0; i < scannedExtents.size(); i++)
1002     {
1003         uint32_t dbRoot = scannedExtents[i].dbRoot - 1;
1004 
1005         /* Kludge to account for gaps in the dbroot mapping. */
1006         if (scannedExtents[i].dbRoot > numDBRoots)
1007         {
1008             lastExtent.resize(scannedExtents[i].dbRoot);
1009             lastScannedLBID.resize(scannedExtents[i].dbRoot);
1010 
1011             for (uint32_t z = numDBRoots; z < scannedExtents[i].dbRoot; z++)
1012                 lastExtent[z] = -1;
1013 
1014             numDBRoots = scannedExtents[i].dbRoot;
1015         }
1016 
1017         if ((scannedExtents[i].status == EXTENTAVAILABLE) && (lastExtent[dbRoot] < (int) i))
1018             lastExtent[dbRoot] = i;
1019 
1020         //@bug 5322: 0 HWM may not mean full extent if 1 extent in file.
1021         // Use special status (EXTENTSTATUSMAX+1) to denote a single extent
1022         // file with HWM 0; retrieve 1 block and not full extent.
1023         if ((fExtentsPerSegFile > 1) && (scannedExtents[i].HWM == 0))
1024         {
1025             uint64_t key = ((uint64_t)scannedExtents[i].partitionNum << 32) +
1026                            scannedExtents[i].segmentNum;
1027 
1028             if (extentCountPerDbFile[key] == 1)
1029                 scannedExtents[i].status = EXTENTSTATUSMAX + 1;
1030         }
1031     }
1032 
1033     // if only 1 block is written in the last extent, HWM is 0 and didn't get counted.
1034     for (uint32_t i = 0; i < numDBRoots; i++)
1035     {
1036         if (lastExtent[i] != -1)
1037             lastScannedLBID[i] = scannedExtents[lastExtent[i]].range.start +
1038                                  (scannedExtents[lastExtent[i]].HWM - scannedExtents[lastExtent[i]].blockOffset);
1039         else
1040             lastScannedLBID[i] = -1;
1041     }
1042 }
1043 
void TupleBPS::reloadExtentLists()
1045 {
1046     /*
1047      * Iterate over each ColumnCommand instance
1048      *
1049      * 1) reload its extent array
1050      * 2) update TupleBPS's extent array
1051      * 3) update vars dependent on the extent layout (lastExtent, scanFlags, etc)
1052      */
1053 
1054     uint32_t i, j;
1055     ColumnCommandJL* cc;
1056     vector<SCommand>& filters = fBPP->getFilterSteps();
1057     vector<SCommand>& projections = fBPP->getProjectSteps();
1058     uint32_t oid;
1059 
1060     /* To reduce the race, make all CC's get new extents as close together
1061      * as possible, then rebuild the local copies.
1062      */
1063 
1064     for (i = 0; i < filters.size(); i++)
1065     {
1066         cc = dynamic_cast<ColumnCommandJL*>(filters[i].get());
1067 
1068         if (cc != NULL)
1069             cc->reloadExtents();
1070     }
1071 
1072     for (i = 0; i < projections.size(); i++)
1073     {
1074         cc = dynamic_cast<ColumnCommandJL*>(projections[i].get());
1075 
1076         if (cc != NULL)
1077             cc->reloadExtents();
1078     }
1079 
1080     extentsMap.clear();
1081 
1082     for (i = 0; i < filters.size(); i++)
1083     {
1084         cc = dynamic_cast<ColumnCommandJL*>(filters[i].get());
1085 
1086         if (cc == NULL)
1087             continue;
1088 
1089         const vector<EMEntry>& extents = cc->getExtents();
1090         oid = cc->getOID();
1091 
1092         extentsMap[oid] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
1093         tr1::unordered_map<int64_t, struct BRM::EMEntry>& mref = extentsMap[oid];
1094 
1095         for (j = 0; j < extents.size(); j++)
1096             mref[extents[j].range.start] = extents[j];
1097     }
1098 
1099     for (i = 0; i < projections.size(); i++)
1100     {
1101         cc = dynamic_cast<ColumnCommandJL*>(projections[i].get());
1102 
1103         if (cc == NULL)
1104             continue;
1105 
1106         const vector<EMEntry>& extents = cc->getExtents();
1107         oid = cc->getOID();
1108 
1109         extentsMap[oid] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
1110         tr1::unordered_map<int64_t, struct BRM::EMEntry>& mref = extentsMap[oid];
1111 
1112         for (j = 0; j < extents.size(); j++)
1113             mref[extents[j].range.start] = extents[j];
1114     }
1115 
1116     initExtentMarkers();
1117 }
1118 
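// Kick off the step: verify that all columns report a consistent extent count
// (retrying for up to ~50s), create the BPP on the PMs (createBPP + DEC write),
// push any PM-side join data, apply the casual-partitioning flags, then start
// the producer thread (sendPrimitiveMessages) and the first consumer thread.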
void TupleBPS::run()
1120 {
1121     uint32_t i;
1122     boost::mutex::scoped_lock lk(jlLock);
1123     uint32_t retryCounter = 0;
1124     const uint32_t retryMax = 1000;   // 50s max; we've seen a 15s window so 50s should be 'safe'
1125     const uint32_t waitInterval = 50000;  // in us
1126 
1127     if (fRunExecuted)
1128         return;
1129 
1130     fRunExecuted = true;
1131 
1132     // make sure each numeric column has the same # of extents! See bugs 4564 and 3607
1133     try
1134     {
1135         while (!goodExtentCount() && retryCounter++ < retryMax)
1136         {
1137             usleep(waitInterval);
1138             reloadExtentLists();
1139         }
1140     }
1141     catch (std::exception& e)
1142     {
1143         ostringstream os;
1144         os << "TupleBPS: Could not get a consistent extent count for each column.  Got '"
1145            << e.what() << "'\n";
1146         catchHandler(os.str(), ERR_TUPLE_BPS, fErrorInfo, fSessionId);
1147         fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
1148         return;
1149     }
1150 
1151     if (retryCounter == retryMax)
1152     {
1153         catchHandler("TupleBPS: Could not get a consistent extent count for each column.",
1154                      ERR_TUPLE_BPS, fErrorInfo, fSessionId);
1155         fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
1156         return;
1157     }
1158 
1159     if (traceOn())
1160     {
1161         syslogStartStep(16,           // exemgr subsystem
1162                         std::string("TupleBPS")); // step name
1163     }
1164 
1165     ByteStream bs;
1166 
1167     if (fDelivery)
1168     {
1169         deliveryDL.reset(new RowGroupDL(1, 5));
1170         deliveryIt = deliveryDL->getIterator();
1171     }
1172 
1173     fBPP->setThreadCount(fMaxNumThreads);
1174 
1175     if (doJoin)
1176         for (i = 0; i < smallSideCount; i++)
1177             tjoiners[i]->setThreadCount(fMaxNumThreads);
1178 
1179     if (fe1)
1180         fBPP->setFEGroup1(fe1, fe1Input);
1181 
1182     if (fe2 && runFEonPM)
1183         fBPP->setFEGroup2(fe2, fe2Output);
1184 
1185     if (fe2)
1186     {
1187         primRowGroup.initRow(&fe2InRow);
1188         fe2Output.initRow(&fe2OutRow);
1189     }
1190 
1191     try
1192     {
1193         fDec->addDECEventListener(this);
1194         fBPP->priority(priority());
1195         fBPP->createBPP(bs);
1196         fDec->write(uniqueID, bs);
1197         BPPIsAllocated = true;
1198 
1199         if (doJoin && tjoiners[0]->inPM())
1200             serializeJoiner();
1201 
1202         prepCasualPartitioning();
1203         startPrimitiveThread();
1204         fProducerThreads.clear();
1205         fProducerThreads.reserve(fMaxNumThreads);
1206         startAggregationThread();
1207     }
1208     catch (...)
1209     {
1210         handleException(std::current_exception(),
1211                         logging::ERR_TUPLE_BPS,
1212                         logging::ERR_ALWAYS_CRITICAL,
1213                         "TupleBPS::run()");
1214         fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
1215     }
1216 }
1217 
void TupleBPS::join()
1219 {
1220     boost::mutex::scoped_lock lk(jlLock);
1221 
1222     if (joinRan)
1223         return;
1224 
1225     joinRan = true;
1226 
1227     if (fRunExecuted)
1228     {
1229         if (msgsRecvd < msgsSent)
1230         {
1231             // wake up the sending thread, it should drain the input dl and exit
1232             boost::unique_lock<boost::mutex> tplLock(tplMutex);
1233             condvarWakeupProducer.notify_all();
1234             tplLock.unlock();
1235         }
1236 
1237         if (pThread)
1238             jobstepThreadPool.join(pThread);
1239 
1240         jobstepThreadPool.join(fProducerThreads);
1241 
1242         if (BPPIsAllocated)
1243         {
1244             ByteStream bs;
1245             fDec->removeDECEventListener(this);
1246             fBPP->destroyBPP(bs);
1247 
1248             try
1249             {
1250                 fDec->write(uniqueID, bs);
1251             }
1252             catch (...)
1253             {
1254                 handleException(std::current_exception(),
1255                                 logging::ERR_TUPLE_BPS,
1256                                 logging::ERR_ALWAYS_CRITICAL,
1257                                 "TupleBPS::join()");
1258             }
1259 
1260             BPPIsAllocated = false;
1261             fDec->removeQueue(uniqueID);
1262             tjoiners.clear();
1263         }
1264     }
1265 }
1266 
void TupleBPS::sendError(uint16_t status)
1268 {
1269     ByteStream msgBpp;
1270     fBPP->setCount(1);
1271     fBPP->setStatus(status);
1272     fBPP->runErrorBPP(msgBpp);
1273 
1274     try
1275     {
1276         fDec->write(uniqueID, msgBpp);
1277     }
1278     catch (...)
1279     {
1280         // this fcn is only called in exception handlers
1281         // let the first error take precedence
1282     }
1283 
1284     fBPP->reset();
1285     finishedSending = true;
1286     condvar.notify_all();
1287     condvarWakeupProducer.notify_all();
1288 }
1289 
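// Delivery-mode entry point.  Pops RGData from deliveryDL until a non-empty
// row group (or end of stream) is found and serializes it into bs; at end of
// stream an error/status row group is serialized instead so the caller sees
// the final state.  Returns the number of rows in the band (0 == no more data).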
uint32_t TupleBPS::nextBand(ByteStream& bs)
1291 {
1292     bool more = true;
1293     RowGroup& realOutputRG = (fe2 ? fe2Output : primRowGroup);
1294     RGData rgData;
1295     uint32_t rowCount = 0;
1296 
1297     bs.restart();
1298 
1299     while (rowCount == 0 && more)
1300     {
1301         more = deliveryDL->next(deliveryIt, &rgData);
1302 
1303         if (!more)
1304             rgData = fBPP->getErrorRowGroupData(status());
1305 
1306         realOutputRG.setData(&rgData);
1307         rowCount = realOutputRG.getRowCount();
1308 
1309         if ((more && rowCount > 0) || !more)
1310             realOutputRG.serializeRGData(bs);
1311     }
1312 
1313     return rowCount;
1314 }
1315 
1316 /* The current interleaving rotates over PMs, clustering jobs for a single PM
1317  * by dbroot to keep block accesses adjacent & make best use of prefetching &
1318  * cache.
1319  */
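/* Illustrative only: with 2 PMs and input jobs ordered
 *   [PM1, PM1, PM1, PM2, PM2, PM2]   (already grouped by dbroot within each PM)
 * the result is
 *   [PM1, PM2, PM1, PM2, PM1, PM2]
 * so consecutive requests land on different PMs while each PM still sees its
 * work in dbroot order.
 */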
void TupleBPS::interleaveJobs(vector<Job>* jobs) const
1321 {
1322     vector<Job> newJobs;
1323     uint32_t i;
1324     uint32_t pmCount = 0;
1325     scoped_array<deque<Job> > bins;
1326 
    // the input is grouped by dbroot

    /* Need to get the 'real' PM count */
    for (i = 0; i < jobs->size(); i++)
        if (pmCount < (*jobs)[i].connectionNum + 1)
            pmCount = (*jobs)[i].connectionNum + 1;

    // nothing to interleave when all the jobs target a single PM
    if (pmCount <= 1)
        return;

    bins.reset(new deque<Job>[pmCount]);
1338 
1339     // group by connection number
1340     for (i = 0; i < jobs->size(); i++)
1341         bins[(*jobs)[i].connectionNum].push_back((*jobs)[i]);
1342 
1343     // interleave by connection num
1344     bool noWorkDone;
1345 
1346     while (newJobs.size() < jobs->size())
1347     {
1348         noWorkDone = true;
1349 
1350         for (i = 0; i < pmCount; i++)
1351         {
1352             if (!bins[i].empty())
1353             {
1354                 newJobs.push_back(bins[i].front());
1355                 bins[i].pop_front();
1356                 noWorkDone = false;
1357             }
1358         }
1359 
1360         idbassert(!noWorkDone);
1361     }
1362 
1363 #if 0
1364     /* Work in progress */
1365     // up to this point, only the first connection to a PM is used.
1366     // the last step is to round-robin on the connections of each PM.
1367     // ex: on a 3 PM system where connections per PM is 2,
1368     // connections 0 and 3 are for PM 1, 1 and 4 are PM2, 2 and 5 are PM3.
1369     uint32_t* jobCounters = (uint32_t*) alloca(pmCount * sizeof(uint32_t));
1370     memset(jobCounters, 0, pmCount * sizeof(uint32_t));
1371 
1372     for (i = 0; i < newJobs.size(); i++)
1373     {
1374         uint32_t& conn = newJobs[i].connectionNum;  // for readability's sake
1375         conn = conn + (pmCount * jobCounters[conn]);
1376         jobCounters[conn]++;
1377     }
1378 
1379 #endif
1380 
1381     jobs->swap(newJobs);
1382 //	cout << "-------------\n";
1383 //	for (i = 0; i < jobs->size(); i++)
1384 //		cout << "job " << i+1 << ": dbroot " << (*jobs)[i].dbroot << ", PM "
1385 //				<< (*jobs)[i].connectionNum + 1 << endl;
1386 }
1387 
void TupleBPS::sendJobs(const vector<Job>& jobs)
1389 {
1390     uint32_t i;
1391     boost::unique_lock<boost::mutex> tplLock(tplMutex, boost::defer_lock);
1392 
1393     for (i = 0; i < jobs.size() && !cancelled(); i++)
1394     {
1395         fDec->write(uniqueID, *(jobs[i].msg));
1396         tplLock.lock();
1397         msgsSent += jobs[i].expectedResponses;
1398 
1399         if (recvWaiting)
1400             condvar.notify_all();
1401 
1402         while ((msgsSent - msgsRecvd > fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER)
1403                 && !fDie)
1404         {
1405             sendWaiting = true;
1406             condvarWakeupProducer.wait(tplLock);
1407             sendWaiting = false;
1408         }
1409 
1410         tplLock.unlock();
1411     }
1412 }
1413 
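// Helpers for pre-evaluating pseudocolumn filters against extent metadata.
// compareSingleValue() applies a single COP to two scalars;
// compareRange() answers "could any value in [min, max] satisfy (value COP val)",
// e.g. COMPARE_EQ becomes an inclusion test and COMPARE_NE an exclusion test.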
bool TupleBPS::compareSingleValue(uint8_t COP, int64_t val1, int64_t val2) const
1415 {
1416     switch (COP)
1417     {
1418         case COMPARE_LT:
1419         case COMPARE_NGE:
1420             return (val1 < val2);
1421 
1422         case COMPARE_LE:
1423         case COMPARE_NGT:
1424             return (val1 <= val2);
1425 
1426         case COMPARE_GT:
1427         case COMPARE_NLE:
1428             return (val1 > val2);
1429 
1430         case COMPARE_GE:
1431         case COMPARE_NLT:
1432             return (val1 >= val2);
1433 
1434         case COMPARE_EQ:
1435             return (val1 == val2);
1436 
1437         case COMPARE_NE:
1438             return (val1 != val2);
1439     }
1440 
1441     return false;
1442 }
1443 
1444 /* (range COP val) comparisons */
bool TupleBPS::compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const
1446 {
1447     switch (COP)
1448     {
1449         case COMPARE_LT:
1450         case COMPARE_NGE:
1451             return (min < val);
1452 
1453         case COMPARE_LE:
1454         case COMPARE_NGT:
1455             return (min <= val);
1456 
1457         case COMPARE_GT:
1458         case COMPARE_NLE:
1459             return (max > val);
1460 
1461         case COMPARE_GE:
1462         case COMPARE_NLT:
1463             return (max >= val);
1464 
1465         case COMPARE_EQ:    // an 'in' comparison
1466             return (val >= min && val <= max);
1467 
1468         case COMPARE_NE:   //  'not in'
1469             return (val < min || val > max);
1470     }
1471 
1472     return false;
1473 }
1474 
bool TupleBPS::processSingleFilterString_ranged(int8_t BOP, int8_t colWidth, int64_t min, int64_t max,
        const uint8_t* filterString, uint32_t filterCount) const
1477 {
1478     uint j;
1479     bool ret = true;
1480 
1481     for (j = 0; j < filterCount; j++)
1482     {
1483         int8_t COP;
1484         int64_t val2;
1485         bool thisPredicate;
1486         COP = *filterString++;
1487         filterString++;   // skip the round var, don't think that applies here
1488 
1489         switch (colWidth)
1490         {
1491             case 1:
1492                 val2 = *((int8_t*) filterString);
1493                 filterString++;
1494                 break;
1495 
1496             case 2:
1497                 val2 = *((int16_t*) filterString);
1498                 filterString += 2;
1499                 break;
1500 
1501             case 4:
1502                 val2 = *((int32_t*) filterString);
1503                 filterString += 4;
1504                 break;
1505 
1506             case 8:
1507                 val2 = *((int64_t*) filterString);
1508                 filterString += 8;
1509                 break;
1510 
1511             default:
1512                 throw logic_error("invalid column width");
1513         }
1514 
1515         thisPredicate = compareRange(COP, min, max, val2);
1516 
1517         if (j == 0)
1518             ret = thisPredicate;
1519 
1520         if (BOP == BOP_OR && thisPredicate)
1521             return true;
1522         else if (BOP == BOP_AND && !thisPredicate)
1523             return false;
1524     }
1525 
1526     return ret;
1527 }
1528 
bool TupleBPS::processSingleFilterString(int8_t BOP, int8_t colWidth, int64_t val,
        const uint8_t* filterString, uint32_t filterCount) const
1531 {
1532     uint j;
1533     bool ret = true;
1534 
1535     for (j = 0; j < filterCount; j++)
1536     {
1537         int8_t COP;
1538         int64_t val2;
1539         bool thisPredicate;
1540         COP = *filterString++;
1541         filterString++;   // skip the round var, don't think that applies here
1542 
1543         switch (colWidth)
1544         {
1545             case 1:
1546                 val2 = *((int8_t*) filterString);
1547                 filterString++;
1548                 break;
1549 
1550             case 2:
1551                 val2 = *((int16_t*) filterString);
1552                 filterString += 2;
1553                 break;
1554 
1555             case 4:
1556                 val2 = *((int32_t*) filterString);
1557                 filterString += 4;
1558                 break;
1559 
1560             case 8:
1561                 val2 = *((int64_t*) filterString);
1562                 filterString += 8;
1563                 break;
1564 
1565             default:
1566                 throw logic_error("invalid column width");
1567         }
1568 
1569         thisPredicate = compareSingleValue(COP, val, val2);
1570 
1571         if (j == 0)
1572             ret = thisPredicate;
1573 
1574         if (BOP == BOP_OR && thisPredicate)
1575             return true;
1576         else if (BOP == BOP_AND && !thisPredicate)
1577             return false;
1578     }
1579 
1580     return ret;
1581 }
1582 
bool TupleBPS::processOneFilterType(int8_t colWidth, int64_t value, uint32_t type) const
1584 {
1585     const vector<SCommand>& filters = fBPP->getFilterSteps();
1586     uint i;
1587     bool ret = true;
1588     bool firstPseudo = true;
1589 
1590     for (i = 0; i < filters.size(); i++)
1591     {
1592         PseudoCCJL* pseudo = dynamic_cast<PseudoCCJL*>(filters[i].get());
1593 
1594         if (!pseudo || pseudo->getFunction() != type)
1595             continue;
1596 
1597         int8_t BOP = pseudo->getBOP();  // I think this is the same as TupleBPS's bop var...?
1598 
1599         /* 1-byte COP, 1-byte 'round', colWidth-bytes value */
1600         const uint8_t* filterString = pseudo->getFilterString().buf();
1601         uint32_t filterCount = pseudo->getFilterCount();
1602         bool thisPredicate = processSingleFilterString(BOP, colWidth, value, filterString, filterCount);
1603 
1604         if (firstPseudo)
1605         {
1606             firstPseudo = false;
1607             ret = thisPredicate;
1608         }
1609 
1610         if (bop == BOP_OR && thisPredicate)
1611             return true;
1612         else if (bop == BOP_AND && !thisPredicate)
1613             return false;
1614     }
1615 
1616     return ret;
1617 }
1618 
bool TupleBPS::processLBIDFilter(const EMEntry& emEntry) const
1620 {
1621     const vector<SCommand>& filters = fBPP->getFilterSteps();
1622     uint i;
1623     bool ret = true;
1624     bool firstPseudo = true;
1625     LBID_t firstLBID = emEntry.range.start;
1626     LBID_t lastLBID = firstLBID + (emEntry.range.size * 1024) - 1;
1627 
1628     for (i = 0; i < filters.size(); i++)
1629     {
1630         PseudoCCJL* pseudo = dynamic_cast<PseudoCCJL*>(filters[i].get());
1631 
1632         if (!pseudo || pseudo->getFunction() != PSEUDO_BLOCKID)
1633             continue;
1634 
1635         int8_t BOP = pseudo->getBOP();  // I think this is the same as TupleBPS's bop var...?
1636 
1637         /* 1-byte COP, 1-byte 'round', colWidth-bytes value */
1638         const uint8_t* filterString = pseudo->getFilterString().buf();
1639         uint32_t filterCount = pseudo->getFilterCount();
1640         bool thisPredicate = processSingleFilterString_ranged(BOP, 8,
1641                              firstLBID, lastLBID, filterString, filterCount);
1642 
1643         if (firstPseudo)
1644         {
1645             firstPseudo = false;
1646             ret = thisPredicate;
1647         }
1648 
1649         if (bop == BOP_OR && thisPredicate)
1650             return true;
1651         else if (bop == BOP_AND && !thisPredicate)
1652             return false;
1653     }
1654 
1655     return ret;
1656 }
1657 
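// Combine all present pseudocolumn filter types for one extent.  With BOP_AND
// the extent survives only if every applicable filter accepts it; with BOP_OR
// it survives if any filter accepts it.  Extent min/max filters are consulted
// only when the extent's CP range is flagged valid.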
bool TupleBPS::processPseudoColFilters(uint32_t extentIndex, boost::shared_ptr<map<int, int> > dbRootPMMap) const
1659 {
1660     if (!hasPCFilter)
1661         return true;
1662 
1663     const EMEntry& emEntry = scannedExtents[extentIndex];
1664 
1665     if (bop == BOP_AND)
1666     {
1667         /* All Pseudocolumns have been promoted to 8-bytes except the casual partitioning filters */
1668         return (!hasPMFilter || processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM))
1669                && (!hasSegmentFilter || processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT))
1670                && (!hasDBRootFilter || processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT))
1671                && (!hasSegmentDirFilter || processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR))
1672                && (!hasExtentIDFilter || processOneFilterType(8, emEntry.range.start, PSEUDO_EXTENTID))
1673                && (!hasMaxFilter || (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
1674                                      processOneFilterType(emEntry.range.size, emEntry.partition.cprange.hi_val, PSEUDO_EXTENTMAX) : true))
1675                && (!hasMinFilter || (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
1676                                      processOneFilterType(emEntry.range.size, emEntry.partition.cprange.lo_val, PSEUDO_EXTENTMIN) : true))
1677                && (!hasLBIDFilter || processLBIDFilter(emEntry))
1678                ;
1679     }
1680     else
1681     {
1682         return (hasPMFilter && processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM))
1683                || (hasSegmentFilter && processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT))
1684                || (hasDBRootFilter && processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT))
1685                || (hasSegmentDirFilter && processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR))
1686                || (hasExtentIDFilter && processOneFilterType(8, emEntry.range.start, PSEUDO_EXTENTID))
1687                || (hasMaxFilter && (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
1688                                     processOneFilterType(emEntry.range.size, emEntry.partition.cprange.hi_val, PSEUDO_EXTENTMAX) : false))
1689                || (hasMinFilter && (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
1690                                     processOneFilterType(emEntry.range.size, emEntry.partition.cprange.lo_val, PSEUDO_EXTENTMIN) : false))
1691                || (hasLBIDFilter && processLBIDFilter(emEntry))
1692                ;
1693     }
1694 }
1695 
1696 
1697 void TupleBPS::makeJobs(vector<Job>* jobs)
1698 {
1699     boost::shared_ptr<ByteStream> bs;
1700     uint32_t i;
1701     uint32_t lbidsToScan;
1702     uint32_t blocksToScan;
1703     uint32_t blocksPerJob;
1704     LBID_t startingLBID;
1705     oam::OamCache* oamCache = oam::OamCache::makeOamCache();
1706     boost::shared_ptr<map<int, int> > dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
1707     boost::shared_ptr<map<int, int> > dbRootPMMap = oamCache->getDBRootToPMMap();
1708     int localPMId = oamCache->getLocalPMId();
1709 
1710     idbassert(ffirstStepType == SCAN);
1711 
1712     if (fOid >= 3000 && bop == BOP_AND)
1713         storeCasualPartitionInfo(false);
1714 
1715     totalMsgs = 0;
1716 
1717     for (i = 0; i < scannedExtents.size(); i++)
1718     {
1719 
1720         // the # of LBIDs to scan in this extent, if it will be scanned.
1721         //@bug 5322: status EXTENTSTATUSMAX+1 means single block extent.
1722         if ((scannedExtents[i].HWM == 0) &&
1723                 ((int) i < lastExtent[scannedExtents[i].dbRoot - 1]) &&
1724                 (scannedExtents[i].status <= EXTENTSTATUSMAX))
1725             lbidsToScan = scannedExtents[i].range.size * 1024;
1726         else
1727             lbidsToScan = scannedExtents[i].HWM - scannedExtents[i].blockOffset + 1;
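        // e.g. HWM == 255 with blockOffset == 0 gives 256 LBIDs to scan here; the
        // HWM == 0 branch above instead counts the whole extent (range.size * 1024
        // LBIDs), per the @bug 5322 note.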
1728 
1729         // skip this extent if CP data rules it out or the scan has already passed
1730         // the last extent for that DBRoot (import may be adding extents that shouldn't
1731         // be read yet).  Also skip if there's a pseudocolumn with a filter that would
1732         // eliminate this extent
1733 
1734         bool inBounds = ((int)i <= lastExtent[scannedExtents[i].dbRoot - 1]);
1735 
1736         if (!inBounds)
1737         {
1738             continue;
1739         }
1740 
1741         if (!scanFlags[i])
1742         {
1743             fNumBlksSkipped += lbidsToScan;
1744             continue;
1745         }
1746 
1747         if (!processPseudoColFilters(i, dbRootPMMap))
1748         {
1749             fNumBlksSkipped += lbidsToScan;
1750             continue;
1751         }
1752 
1753         //if (!scanFlags[i] || !inBounds)
1754         //	continue;
1755 
1756         /* Figure out how many blocks have data in this extent
1757          * Calc how many jobs to issue,
1758          * Get the dbroot,
1759          * construct the job msgs
1760          */
1761 
1762         // Bug 5741: if we are local-query only and this extent doesn't belong to us, skip it
1763         if (fLocalQuery == execplan::CalpontSelectExecutionPlan::LOCAL_QUERY)
1764         {
1765             if (localPMId == 0)
1766             {
1767                 throw IDBExcept(ERR_LOCAL_QUERY_UM);
1768             }
1769 
1770             if (dbRootPMMap->find(scannedExtents[i].dbRoot)->second != localPMId)
1771                 continue;
1772         }
1773 
1774         // a necessary DB root is offline
1775         if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end())
1776         {
1777             // MCOL-259: force a reload of the XML. This usually fixes it.
1778             Logger log;
1779             log.logMessage(logging::LOG_TYPE_WARNING, "forcing reload of columnstore.xml for dbRootConnectionMap");
1780             oamCache->forceReload();
1781             dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
1782 
1783             if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end())
1784             {
1785                 log.logMessage(logging::LOG_TYPE_WARNING, "dbroot still not in dbRootConnectionMap");
1786                 throw IDBExcept(ERR_DATA_OFFLINE);
1787             }
1788         }
1789 
1790 
1791         // the # of logical blocks in this extent
1792         if (lbidsToScan % fColType.colWidth)
1793             blocksToScan = lbidsToScan / fColType.colWidth + 1;
1794         else
1795             blocksToScan = lbidsToScan / fColType.colWidth;
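        // i.e. a ceiling division: blocksToScan = ceil(lbidsToScan / colWidth), so e.g.
        // lbidsToScan == 1025 with an 8-byte column yields 129 logical blocks.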
1796 
1797         totalMsgs += blocksToScan;
1798 
1799         // how many logical blocks to process with a single job (& single thread on the PM)
1800 #if defined(_MSC_VER) && BOOST_VERSION < 105200
1801         blocksPerJob = max(blocksToScan / fProcessorThreadsPerScan, 16UL);
1802 #else
1803         blocksPerJob = max(blocksToScan / fProcessorThreadsPerScan, 16U);
1804 #endif
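        // e.g. blocksToScan == 1024 split across fProcessorThreadsPerScan == 16 gives
        // 64 blocks per job; the max() floors small extents at 16 blocks so jobs never
        // get trivially small.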
1805 
1806         startingLBID = scannedExtents[i].range.start;
1807 
1808         while (blocksToScan > 0)
1809         {
1810             uint32_t blocksThisJob = min(blocksToScan, blocksPerJob);
1811 
1812             fBPP->setLBID(startingLBID, scannedExtents[i]);
1813             fBPP->setCount(blocksThisJob);
1814             bs.reset(new ByteStream());
1815             fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot]);
1816             jobs->push_back(Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot],
1817                                 blocksThisJob, bs));
1818             blocksToScan -= blocksThisJob;
1819             startingLBID += fColType.colWidth * blocksThisJob;
1820             fBPP->reset();
1821         }
1822     }
1823 
1824 }
1825 
1826 void TupleBPS::sendPrimitiveMessages()
1827 {
1828     vector<Job> jobs;
1829 
1830     idbassert(ffirstStepType == SCAN);
1831 
1832     if (cancelled())
1833         goto abort;
1834 
1835     try
1836     {
1837         makeJobs(&jobs);
1838         interleaveJobs(&jobs);
1839         sendJobs(jobs);
1840     }
1841     catch (...)
1842     {
1843         sendError(logging::ERR_TUPLE_BPS);
1844         handleException(std::current_exception(),
1845                         logging::ERR_TUPLE_BPS,
1846                         logging::ERR_ALWAYS_CRITICAL,
1847                         "st: " + std::to_string(fStepId) +
1848                             " TupleBPS::sendPrimitiveMessages()");
1849         abort_nolock();
1850     }
1851 
1852 abort:
1853     boost::unique_lock<boost::mutex> tplLock(tplMutex);
1854     finishedSending = true;
1855     condvar.notify_all();
1856     tplLock.unlock();
1857 }
1858 
1859 struct _CPInfo
1860 {
1861     _CPInfo(int64_t MIN, int64_t MAX, uint64_t l, bool val) : min(MIN), max(MAX), LBID(l), valid(val) { };
1862     int64_t min;
1863     int64_t max;
1864     uint64_t LBID;
1865     bool valid;
1866 };
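// Scratch record for casual-partitioning feedback: each receive thread collects one
// _CPInfo per rowgroup into a local vector (cpv below) and periodically flushes it to
// lbidList->UpdateMinMax() under cpMutex.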
1867 
1868 void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
1869 {
1870     AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
1871     RowGroupDL* dlp = (fDelivery ? deliveryDL.get() : dl->rowGroupDL());
1872 
1873     uint32_t size = 0;
1874     vector<boost::shared_ptr<ByteStream> > bsv;
1875 
1876     RGData rgData;
1877     vector<RGData> rgDatav;
1878     vector<RGData> fromPrimProc;
1879 
1880     bool validCPData;
1881     int64_t min;
1882     int64_t max;
1883     uint64_t lbid;
1884     vector<_CPInfo> cpv;
1885     uint32_t cachedIO;
1886     uint32_t physIO;
1887     uint32_t touchedBlocks;
1888     uint32_t cachedIO_Thread = 0;
1889     uint32_t physIO_Thread = 0;
1890     uint32_t touchedBlocks_Thread = 0;
1891     int64_t ridsReturned_Thread = 0;
1892     bool lastThread = false;
1893     uint32_t i, j, k;
1894     RowGroup local_primRG = primRowGroup;
1895     RowGroup local_outputRG = outputRowGroup;
1896     bool didEOF = false;
1897     bool unused;
1898 
1899     /* Join vars */
1900     vector<vector<Row::Pointer> > joinerOutput;   // clean usage
1901     Row largeSideRow, joinedBaseRow, largeNull, joinFERow;  // LSR clean
1902     scoped_array<Row> smallSideRows, smallNulls;
1903     scoped_array<uint8_t> joinedBaseRowData;
1904     scoped_array<uint8_t> joinFERowData;
1905     shared_array<int> largeMapping;
1906     vector<shared_array<int> > smallMappings;
1907     vector<shared_array<int> > fergMappings;
1908     RGData joinedData;
1909     scoped_array<uint8_t> largeNullMemory;
1910     scoped_array<shared_array<uint8_t> > smallNullMemory;
1911     uint32_t matchCount;
1912 
1913     /* Thread-scoped F&E 2 var */
1914     Row postJoinRow;    // postJoinRow is also used for joins
1915     RowGroup local_fe2Output;
1916     RGData local_fe2Data;
1917     Row local_fe2OutRow;
1918     funcexp::FuncExpWrapper local_fe2;
1919     StepTeleStats sts;
1920     sts.query_uuid = fQueryUuid;
1921     sts.step_uuid = fStepUuid;
1922     boost::unique_lock<boost::mutex> tplLock(tplMutex, boost::defer_lock);
1923 
1924     try
1925     {
1926         if (doJoin || fe2)
1927         {
1928             local_outputRG.initRow(&postJoinRow);
1929         }
1930 
1931         if (fe2)
1932         {
1933             local_fe2Output = fe2Output;
1934             local_fe2Output.initRow(&local_fe2OutRow);
1935             local_fe2Data.reinit(fe2Output);
1936             local_fe2Output.setData(&local_fe2Data);
1937             // local_fe2OutRow = fe2OutRow;
1938             local_fe2 = *fe2;
1939         }
1940 
1941         if (doJoin)
1942         {
1943             joinerOutput.resize(smallSideCount);
1944             smallSideRows.reset(new Row[smallSideCount]);
1945             smallNulls.reset(new Row[smallSideCount]);
1946             smallMappings.resize(smallSideCount);
1947             fergMappings.resize(smallSideCount + 1);
1948             smallNullMemory.reset(new shared_array<uint8_t>[smallSideCount]);
1949             local_primRG.initRow(&largeSideRow);
1950             local_outputRG.initRow(&joinedBaseRow, true);
1951             joinedBaseRowData.reset(new uint8_t[joinedBaseRow.getSize()]);
1952             joinedBaseRow.setData(joinedBaseRowData.get());
1953             joinedBaseRow.initToNull();
1954             largeMapping = makeMapping(local_primRG, local_outputRG);
1955 
1956             bool hasJoinFE = false;
1957 
1958             for (i = 0; i < smallSideCount; i++)
1959             {
1960                 joinerMatchesRGs[i].initRow(&smallSideRows[i]);
1961                 smallMappings[i] = makeMapping(joinerMatchesRGs[i], local_outputRG);
1962 
1963 //			if (tjoiners[i]->semiJoin() || tjoiners[i]->antiJoin()) {
1964                 if (tjoiners[i]->hasFEFilter())
1965                 {
1966                     fergMappings[i] = makeMapping(joinerMatchesRGs[i], joinFERG);
1967                     hasJoinFE = true;
1968                 }
1969 
1970 //			}
1971             }
1972 
1973             if (hasJoinFE)
1974             {
1975                 joinFERG.initRow(&joinFERow, true);
1976                 joinFERowData.reset(new uint8_t[joinFERow.getSize()]);
1977                 memset(joinFERowData.get(), 0, joinFERow.getSize());
1978                 joinFERow.setData(joinFERowData.get());
1979                 fergMappings[smallSideCount] = makeMapping(local_primRG, joinFERG);
1980             }
1981 
1982             for (i = 0; i < smallSideCount; i++)
1983             {
1984                 joinerMatchesRGs[i].initRow(&smallNulls[i], true);
1985                 smallNullMemory[i].reset(new uint8_t[smallNulls[i].getSize()]);
1986                 smallNulls[i].setData(smallNullMemory[i].get());
1987                 smallNulls[i].initToNull();
1988             }
1989 
1990             local_primRG.initRow(&largeNull, true);
1991             largeNullMemory.reset(new uint8_t[largeNull.getSize()]);
1992             largeNull.setData(largeNullMemory.get());
1993             largeNull.initToNull();
1994 
1995 #if 0
1996 
1997             if (threadID == 0)
1998             {
1999                 /* Some rowgroup debugging stuff. */
2000                 uint8_t* tmp8;
2001                 tmp8 = local_primRG.getData();
2002                 local_primRG.setData(NULL);
2003                 cout << "large-side RG: " << local_primRG.toString() << endl;
2004                 local_primRG.setData(tmp8);
2005 
2006                 for (i = 0; i < smallSideCount; i++)
2007                 {
2008                     tmp8 = joinerMatchesRGs[i].getData();
2009                     joinerMatchesRGs[i].setData(NULL);
2010                     cout << "small-side[" << i << "] RG: " << joinerMatchesRGs[i].toString() << endl;
2011                 }
2012 
2013                 tmp8 = local_outputRG.getData();
2014                 local_outputRG.setData(NULL);
2015                 cout << "output RG: " << local_outputRG.toString() << endl;
2016                 local_outputRG.setData(tmp8);
2017 
2018                 cout << "large mapping:\n";
2019 
2020                 for (i = 0; i < local_primRG.getColumnCount(); i++)
2021                     cout << largeMapping[i] << " ";
2022 
2023                 cout << endl;
2024 
2025                 for (uint32_t z = 0; z < smallSideCount; z++)
2026                 {
2027                     cout << "small mapping[" << z << "] :\n";
2028 
2029                     for (i = 0; i < joinerMatchesRGs[z].getColumnCount(); i++)
2030                         cout << smallMappings[z][i] << " ";
2031 
2032                     cout << endl;
2033                 }
2034             }
2035 
2036 #endif
2037         }
2038 
2039         tplLock.lock();
2040 
2041         while (1)
2042         {
2043             // sync with the send side
2044             while (!finishedSending && msgsSent == msgsRecvd)
2045             {
2046                 recvWaiting++;
2047                 condvar.wait(tplLock);
2048                 recvWaiting--;
2049             }
2050 
2051             if (msgsSent == msgsRecvd && finishedSending)
2052                 break;
2053 
2054             bool flowControlOn;
2055             fDec->read_some(uniqueID, fNumThreads, bsv, &flowControlOn);
2056             size = bsv.size();
2057 
2058             // @bug 4562
2059             if (traceOn() && fOid >= 3000 && dlTimes.FirstReadTime().tv_sec == 0)
2060                 dlTimes.setFirstReadTime();
2061 
2062             if (fOid >= 3000 && threadID == 0 && sts.msg_type == StepTeleStats::ST_INVALID && size > 0)
2063             {
2064                 sts.msg_type = StepTeleStats::ST_START;
2065                 sts.total_units_of_work = totalMsgs;
2066                 postStepStartTele(sts);
2067             }
2068 
2069             /* This is a simple ramp-up of the TBPS msg processing threads.
2070             One thread is created by run(), and add'l threads are created
2071             as needed.  Right now, "as needed" means that flow control
2072             is on, which means that the UM is not keeping up by definition,
2073             or size > 5.  We found that using flow control alone was not aggressive
2074             enough when the messages were small.  The 'size' parameter checks
2075             the number of msgs waiting in the DEC buffers.  Since each
2076             message can be processed independently of the others, they can all
2077             be processed in different threads.  In benchmarking we found that
2078             there was no end-to-end performance difference between using 1
2079             and 20 msgs as the threshold.  Erring on the side of aggressiveness,
2080             we chose '5'.
2081             '5' still preserves the original goal of not starting MAX threads
2082             for small queries or when the UM can keep up with the PMs with
2083             fewer threads.  Tweak as necessary. */
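            // Roughly: with, say, fMaxNumThreads == 8, a sustained backlog (more than 5
            // queued bytestreams, or DEC flow control engaged) adds one aggregation
            // thread per pass through this loop until all 8 are running or the backlog
            // drains.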
2084 
2085             if ((size > 5 || flowControlOn) && fNumThreads < fMaxNumThreads)
2086                 startAggregationThread();
2087 
2088             for (uint32_t z = 0; z < size; z++)
2089             {
2090                 if (bsv[z]->length() > 0 && fBPP->countThisMsg(*(bsv[z])))
2091                     ++msgsRecvd;
2092             }
2093 
2094             //@Bug 1424,1298
2095 
2096             if (sendWaiting && ((msgsSent - msgsRecvd) <=
2097                                 (fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER)))
2098             {
2099                 condvarWakeupProducer.notify_one();
2100                 THROTTLEDEBUG << "receiveMultiPrimitiveMessages wakes up sending side .. " << "  msgsSent: " << msgsSent << "  msgsRecvd = " << msgsRecvd << endl;
2101             }
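            // fMaxOutstandingRequests appears to be expressed in extents; the shift by
            // LOGICAL_EXTENT_CONVERTER scales it to block-level messages.  For example,
            // assuming a shift of 10 (x1024) and a setting of 4, the producer is woken
            // once msgsSent - msgsRecvd drops to 4096 or fewer.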
2102 
2103             /* If there's an error and the joblist is being aborted, don't
2104             	sit around forever waiting for responses.  */
2105             if (cancelled())
2106             {
2107                 if (sendWaiting)
2108                     condvarWakeupProducer.notify_one();
2109 
2110                 break;
2111             }
2112 
2113             if (size == 0)
2114             {
2115                 tplLock.unlock();
2116                 usleep(2000 * fNumThreads);
2117                 tplLock.lock();
2118                 continue;
2119             }
2120 
2121             tplLock.unlock();
2122 
2123             for (i = 0; i < size && !cancelled(); i++)
2124             {
2125                 ByteStream* bs = bsv[i].get();
2126 
2127                 // @bug 488: when a PrimProc node is down, error out.
2128                 // An error condition; we are not going to do any more work.
2129                 ISMPacketHeader* hdr = (ISMPacketHeader*)(bs->buf());
2130 
2131                 if (bs->length() == 0 || hdr->Status > 0)
2132                 {
2133                     /* PM errors mean this should abort right away instead of draining the PM backlog */
2134                     tplLock.lock();
2135 
2136                     if (bs->length() == 0)
2137                     {
2138                         errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_PRIMPROC_DOWN));
2139                         status(ERR_PRIMPROC_DOWN);
2140                     }
2141                     else
2142                     {
2143                         string errMsg;
2144 
2145                         bs->advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
2146                         *bs >> errMsg;
2147                         status(hdr->Status);
2148                         errorMessage(errMsg);
2149                     }
2150 
2151                     abort_nolock();
2152                     goto out;
2153                 }
2154 
2155                 fromPrimProc.clear();
2156                 fBPP->getRowGroupData(*bs, &fromPrimProc, &validCPData, &lbid, &min, &max,
2157                                       &cachedIO, &physIO, &touchedBlocks, &unused, threadID);
2158 
2159                 /* Another layer of messiness.  Need to refactor this fcn. */
2160                 while (!fromPrimProc.empty() && !cancelled())
2161                 {
2162                     rgData = fromPrimProc.back();
2163                     fromPrimProc.pop_back();
2164 
2165                     local_primRG.setData(&rgData);
2166                     ridsReturned_Thread += local_primRG.getRowCount();   // TODO need the pre-join count even on PM joins... later
2167 
2168                     /* TupleHashJoinStep::joinOneRG() is a port of the main join loop here.  Any
2169                     * changes made here should also be made there and vice versa. */
2170                     if (hasUMJoin || !fBPP->pmSendsFinalResult())
2171                     {
2172                         joinedData = RGData(local_outputRG);
2173                         local_outputRG.setData(&joinedData);
2174                         local_outputRG.resetRowGroup(local_primRG.getBaseRid());
2175                         local_outputRG.setDBRoot(local_primRG.getDBRoot());
2176                         local_primRG.getRow(0, &largeSideRow);
2177 
2178                         for (k = 0; k < local_primRG.getRowCount() && !cancelled(); k++, largeSideRow.nextRow())
2179                         {
2180                             matchCount = 0;
2181 
2182                             for (j = 0; j < smallSideCount; j++)
2183                             {
2184                                 tjoiners[j]->match(largeSideRow, k, threadID, &joinerOutput[j]);
2185 #ifdef JLF_DEBUG
2186                                     // Debugging code to print the matches
2187                                 	Row r;
2188                                 	joinerMatchesRGs[j].initRow(&r);
2189                                 	cout << joinerOutput[j].size() << " matches: \n";
2190                                 	for (uint32_t z = 0; z < joinerOutput[j].size(); z++) {
2191                                 		r.setPointer(joinerOutput[j][z]);
2192                                 		cout << "  " << r.toString() << endl;
2193                                 	}
2194 #endif
2195                                 matchCount = joinerOutput[j].size();
2196 
2197                                 if (tjoiners[j]->inUM())
2198                                 {
2199                                     /* Count the # of rows that pass the join filter */
2200                                     if (tjoiners[j]->hasFEFilter() && matchCount > 0)
2201                                     {
2202                                         vector<Row::Pointer> newJoinerOutput;
2203                                         applyMapping(fergMappings[smallSideCount], largeSideRow, &joinFERow);
2204 
2205                                         for (uint32_t z = 0; z < joinerOutput[j].size(); z++)
2206                                         {
2207                                             smallSideRows[j].setPointer(joinerOutput[j][z]);
2208                                             applyMapping(fergMappings[j], smallSideRows[j], &joinFERow);
2209 
2210                                             if (!tjoiners[j]->evaluateFilter(joinFERow, threadID))
2211                                                 matchCount--;
2212                                             else
2213                                             {
2214                                                 /* The first match includes it in a SEMI join result and excludes it from an ANTI join
2215                                                 * result.  If it's SEMI & SCALAR however, it needs to continue.
2216                                                 */
2217                                                 newJoinerOutput.push_back(joinerOutput[j][z]);
2218 
2219                                                 if (tjoiners[j]->antiJoin() || (tjoiners[j]->semiJoin() && !tjoiners[j]->scalar()))
2220                                                     break;
2221                                             }
2222                                         }
2223 
2224                                         // the filter eliminated all matches, need to join with the NULL row
2225                                         if (matchCount == 0 && tjoiners[j]->largeOuterJoin())
2226                                         {
2227                                             newJoinerOutput.push_back(Row::Pointer(smallNullMemory[j].get()));
2228                                             matchCount = 1;
2229                                         }
2230 
2231                                         joinerOutput[j].swap(newJoinerOutput);
2232                                     }
2233 
2234                                     // XXXPAT: This has gone through enough revisions it would benefit
2235                                     // from refactoring
2236 
2237                                     /* If anti-join, reverse the result */
2238                                     if (tjoiners[j]->antiJoin())
2239                                     {
2240                                         matchCount = (matchCount ? 0 : 1);
2241                                     }
2242 
2243                                     if (matchCount == 0)
2244                                     {
2245                                         joinerOutput[j].clear();
2246                                         break;
2247                                     }
2248                                     else if (!tjoiners[j]->scalar() &&
2249                                              (tjoiners[j]->antiJoin() || tjoiners[j]->semiJoin()))
2250                                     {
2251                                         joinerOutput[j].clear();
2252                                         joinerOutput[j].push_back(Row::Pointer(smallNullMemory[j].get()));
2253                                         matchCount = 1;
2254                                     }
2255                                 }
2256 
2257                                 if (matchCount == 0 && tjoiners[j]->innerJoin())
2258                                     break;
2259 
2260                                 /* Scalar check */
2261                                 if (tjoiners[j]->scalar() && matchCount > 1)
2262                                 {
2263                                     errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW));
2264                                     status(ERR_MORE_THAN_1_ROW);
2265                                     abort();
2266                                 }
2267 
2268                                 if (tjoiners[j]->smallOuterJoin())
2269                                     tjoiners[j]->markMatches(threadID, joinerOutput[j]);
2270 
2271                             }
2272 
2273                             if (matchCount > 0)
2274                             {
2275                                 applyMapping(largeMapping, largeSideRow, &joinedBaseRow);
2276                                 joinedBaseRow.setRid(largeSideRow.getRelRid());
2277                                 generateJoinResultSet(joinerOutput, joinedBaseRow, smallMappings,
2278                                                       0, local_outputRG, joinedData, &rgDatav, smallSideRows, postJoinRow);
2279 
2280                                 /* Bug 3510: Don't let the join results buffer get out of control.  Need
2281                                 to refactor this.  All post-join processing needs to go here AND below
2282                                 for now. */
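                                /* Rough sizing, treating getMaxDataSize() as the
                                   per-rowgroup worst case: with rowgroups of, say,
                                   ~8 MB this flushes after roughly 6 buffered RGDatas,
                                   keeping the join output near the 50 MB cap. */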
2283                                 if (rgDatav.size() * local_outputRG.getMaxDataSize() > 50000000)
2284                                 {
2285                                     RowGroup out(local_outputRG);
2286 
2287                                     if (fe2 && !runFEonPM)
2288                                     {
2289                                         processFE2(out, local_fe2Output, postJoinRow,
2290                                                    local_fe2OutRow, &rgDatav, &local_fe2);
2291                                         rgDataVecToDl(rgDatav, local_fe2Output, dlp);
2292                                     }
2293                                     else
2294                                         rgDataVecToDl(rgDatav, out, dlp);
2295                                 }
2296                             }
2297                         }  // end of the for-loop in the join code
2298 
2299                         if (local_outputRG.getRowCount() > 0)
2300                         {
2301                             rgDatav.push_back(joinedData);
2302                         }
2303                     }
2304                     else
2305                     {
2306                         rgDatav.push_back(rgData);
2307                     }
2308 
2309                     /* Execute UM F & E group 2 on rgDatav */
2310                     if (fe2 && !runFEonPM && rgDatav.size() > 0 && !cancelled())
2311                     {
2312                         processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2);
2313                         rgDataVecToDl(rgDatav, local_fe2Output, dlp);
2314                     }
2315 
2316                     cachedIO_Thread += cachedIO;
2317                     physIO_Thread += physIO;
2318                     touchedBlocks_Thread += touchedBlocks;
2319 
2320                     if (fOid >= 3000 && ffirstStepType == SCAN && bop == BOP_AND)
2321                         cpv.push_back(_CPInfo(min, max, lbid, validCPData));
2322                 }  // end of the per-rowgroup processing loop
2323 
2324                 // insert the resulting rowgroup data from a single bytestream into dlp
2325                 if (rgDatav.size() > 0)
2326                 {
2327                     if (fe2 && runFEonPM)
2328                         rgDataVecToDl(rgDatav, local_fe2Output, dlp);
2329                     else
2330                         rgDataVecToDl(rgDatav, local_outputRG, dlp);
2331                 }
2332             }  // end of the per-bytestream loop
2333 
2334             // @bug 4562
2335             if (traceOn() && fOid >= 3000)
2336                 dlTimes.setFirstInsertTime();
2337 
2338             // update casual partitioning (CP) min/max info
2339             size = cpv.size();
2340 
2341             if (size > 0 && !cancelled())
2342             {
2343                 cpMutex.lock();
2344 
2345                 for (i = 0; i < size; i++)
2346                 {
2347                     lbidList->UpdateMinMax(cpv[i].min, cpv[i].max, cpv[i].LBID, fColType,
2348                                            cpv[i].valid);
2349                 }
2350 
2351                 cpMutex.unlock();
2352             }
2353 
2354             cpv.clear();
2355 
2356             tplLock.lock();
2357 
2358             if (fOid >= 3000)
2359             {
2360                 uint64_t progress = msgsRecvd * 100 / totalMsgs;
2361                 bool postProgress = (progress > fProgress);
2362 
2363                 if (postProgress)
2364                 {
2365                     fProgress = progress;
2366 
2367                     sts.msg_type = StepTeleStats::ST_PROGRESS;
2368                     sts.total_units_of_work = totalMsgs;
2369                     sts.units_of_work_completed = msgsRecvd;
2370                     postStepProgressTele(sts);
2371                 }
2372             }
2373 
2374         } // done reading
2375 
2376     }//try
2377     catch (...)
2378     {
2379         handleException(std::current_exception(),
2380                         logging::ERR_TUPLE_BPS,
2381                         logging::ERR_ALWAYS_CRITICAL,
2382                         "st: " + std::to_string(fStepId) +
2383                             " TupleBPS::receiveMultiPrimitiveMessages()");
2384         abort_nolock();
2385     }
2386 
2387 out:
2388 
2389     if (++recvExited == fNumThreads)
2390     {
2391 
2392         if (doJoin && smallOuterJoiner != -1 && !cancelled())
2393         {
2394             tplLock.unlock();
2395             /* If this was a left outer join, this needs to put the unmatched
2396                rows from the joiner into the output
2397                XXXPAT: This might be a problem if later steps depend
2398                on sensible rids and/or sensible ordering */
2399             vector<Row::Pointer> unmatched;
2400 #ifdef JLF_DEBUG
2401             cout << "finishing small-outer join output\n";
2402 #endif
2403             i = smallOuterJoiner;
2404             tjoiners[i]->getUnmarkedRows(&unmatched);
2405             joinedData = RGData(local_outputRG);
2406             local_outputRG.setData(&joinedData);
2407             local_outputRG.resetRowGroup(-1);
2408             local_outputRG.getRow(0, &joinedBaseRow);
2409 
2410             for (j = 0; j < unmatched.size(); j++)
2411             {
2412                 smallSideRows[i].setPointer(unmatched[j]);
2413 
2414                 for (k = 0; k < smallSideCount; k++)
2415                 {
2416                     if (i == k)
2417                         applyMapping(smallMappings[i], smallSideRows[i], &joinedBaseRow);
2418                     else
2419                         applyMapping(smallMappings[k], smallNulls[k], &joinedBaseRow);
2420                 }
2421 
2422                 applyMapping(largeMapping, largeNull, &joinedBaseRow);
2423                 joinedBaseRow.setRid(0);
2424                 joinedBaseRow.nextRow();
2425                 local_outputRG.incRowCount();
2426 
2427                 if (local_outputRG.getRowCount() == 8192)
2428                 {
2429                     if (fe2)
2430                     {
2431                         rgDatav.push_back(joinedData);
2432                         processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2);
2433 
2434                         if (rgDatav.size() > 0)
2435                             rgDataToDl(rgDatav[0], local_fe2Output, dlp);
2436 
2437                         rgDatav.clear();
2438                     }
2439                     else
2440                         rgDataToDl(joinedData, local_outputRG, dlp);
2441 
2442                     joinedData = RGData(local_outputRG);
2443                     local_outputRG.setData(&joinedData);
2444                     local_outputRG.resetRowGroup(-1);
2445                     local_outputRG.getRow(0, &joinedBaseRow);
2446                 }
2447             }
2448 
2449             if (local_outputRG.getRowCount() > 0)
2450             {
2451                 if (fe2)
2452                 {
2453                     rgDatav.push_back(joinedData);
2454                     processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2);
2455 
2456                     if (rgDatav.size() > 0)
2457                         rgDataToDl(rgDatav[0], local_fe2Output, dlp);
2458 
2459                     rgDatav.clear();
2460                 }
2461                 else
2462                     rgDataToDl(joinedData, local_outputRG, dlp);
2463             }
2464 
2465             tplLock.lock();
2466         }
2467 
2468         if (traceOn() && fOid >= 3000)
2469         {
2470             //...Casual partitioning could cause us to do no processing.  In that
2471             //...case these time stamps did not get set.  So we set them here.
2472             if (dlTimes.FirstReadTime().tv_sec == 0)
2473             {
2474                 dlTimes.setFirstReadTime();
2475                 dlTimes.setLastReadTime();
2476                 dlTimes.setFirstInsertTime();
2477             }
2478 
2479             dlTimes.setEndOfInputTime();
2480         }
2481 
2482         ByteStream bs;
2483 
2484         try
2485         {
2486             if (BPPIsAllocated)
2487             {
2488                 fDec->removeDECEventListener(this);
2489                 fBPP->destroyBPP(bs);
2490                 fDec->write(uniqueID, bs);
2491                 BPPIsAllocated = false;
2492             }
2493         }
2494         // catch and do nothing. Let it continue with the clean up and profiling
2495         catch (const std::exception& e)
2496         {
2497             cerr << "tuple-bps caught: " << e.what() << endl;
2498         }
2499         catch (...)
2500         {
2501             cerr << "tuple-bps caught unknown exception" << endl;
2502         }
2503 
2504         Stats stats = fDec->getNetworkStats(uniqueID);
2505         fMsgBytesIn = stats.dataRecvd();
2506         fMsgBytesOut = stats.dataSent();
2507         fDec->removeQueue(uniqueID);
2508         tjoiners.clear();
2509 
2510         lastThread = true;
2511     }
2512 
2513     //@Bug 1099
2514     ridsReturned += ridsReturned_Thread;
2515     fPhysicalIO += physIO_Thread;
2516     fCacheIO += cachedIO_Thread;
2517     fBlockTouched += touchedBlocks_Thread;
2518     tplLock.unlock();
2519 
2520     if (fTableOid >= 3000 && lastThread)
2521     {
2522         struct timeval tvbuf;
2523         gettimeofday(&tvbuf, 0);
2524         FIFO<boost::shared_array<uint8_t> >* pFifo = 0;
2525         uint64_t totalBlockedReadCount  = 0;
2526         uint64_t totalBlockedWriteCount = 0;
2527 
2528         //...Sum up the blocked FIFO reads for all input associations
2529         size_t inDlCnt  = fInputJobStepAssociation.outSize();
2530 
2531         for (size_t iDataList = 0; iDataList < inDlCnt; iDataList++)
2532         {
2533             pFifo = dynamic_cast<FIFO<boost::shared_array<uint8_t> > *>(
2534                         fInputJobStepAssociation.outAt(iDataList)->rowGroupDL());
2535 
2536             if (pFifo)
2537             {
2538                 totalBlockedReadCount += pFifo->blockedReadCount();
2539             }
2540         }
2541 
2542         //...Sum up the blocked FIFO writes for all output associations
2543         size_t outDlCnt = fOutputJobStepAssociation.outSize();
2544 
2545         for (size_t iDataList = 0; iDataList < outDlCnt; iDataList++)
2546         {
2547             pFifo = dynamic_cast<FIFO<boost::shared_array<uint8_t> > *>(dlp);
2548 
2549             if (pFifo)
2550             {
2551                 totalBlockedWriteCount += pFifo->blockedWriteCount();
2552             }
2553         }
2554 
2555         //...Round off msg byte counts to the nearest KB for display
2556         uint64_t msgBytesInKB  = fMsgBytesIn >> 10;
2557         uint64_t msgBytesOutKB = fMsgBytesOut >> 10;
2558 
2559         if (fMsgBytesIn & 512)
2560             msgBytesInKB++;
2561 
2562         if (fMsgBytesOut & 512)
2563             msgBytesOutKB++;
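        // e.g. fMsgBytesIn == 1536 bytes: 1536 >> 10 == 1 KB, and the 512 bit is set,
        // so the displayed value rounds up to 2 KB.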
2564 
2565         if (traceOn())
2566         {
2567             // @bug 828
2568             ostringstream logStr;
2569             logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " <<
2570                    JSTimeStamp::format(tvbuf) << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-"  <<
2571                    fCacheIO << "; MsgsSent-" << msgsSent << "; MsgsRvcd-" << msgsRecvd <<
2572                    "; BlocksTouched-" << fBlockTouched <<
2573                    "; BlockedFifoIn/Out-" << totalBlockedReadCount <<
2574                    "/" << totalBlockedWriteCount <<
2575                    "; output size-" << ridsReturned << endl <<
2576                    "\tPartitionBlocksEliminated-" << fNumBlksSkipped <<
2577                    "; MsgBytesIn-"  << msgBytesInKB  << "KB" <<
2578                    "; MsgBytesOut-" << msgBytesOutKB << "KB" <<
2579                    "; TotalMsgs-" << totalMsgs << endl  <<
2580                    "\t1st read " << dlTimes.FirstReadTimeString() <<
2581                    "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" <<
2582                    JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) <<
2583                    "s\n\tUUID " << uuids::to_string(fStepUuid) <<
2584                    "\n\tQuery UUID " << uuids::to_string(queryUuid()) <<
2585                    "\n\tJob completion status " << status() << endl;
2586             logEnd(logStr.str().c_str());
2587 
2588             syslogReadBlockCounts(16,      // exemgr subsystem
2589                                   fPhysicalIO,               // # blocks read from disk
2590                                   fCacheIO,                  // # blocks read from cache
2591                                   fNumBlksSkipped);          // # casual partition block hits
2592             syslogProcessingTimes(16,      // exemgr subsystem
2593                                   dlTimes.FirstReadTime(),   // first datalist read
2594                                   dlTimes.LastReadTime(),    // last  datalist read
2595                                   dlTimes.FirstInsertTime(), // first datalist write
2596                                   dlTimes.EndOfInputTime()); // last (endOfInput) datalist write
2597             syslogEndStep(16,              // exemgr subsystem
2598                           totalBlockedReadCount,     // blocked datalist input
2599                           totalBlockedWriteCount,    // blocked datalist output
2600                           fMsgBytesIn,               // incoming msg byte count
2601                           fMsgBytesOut);             // outgoing msg byte count
2602             fExtendedInfo += toString() + logStr.str();
2603             formatMiniStats();
2604         }
2605 
2606         if (lastThread && fOid >= 3000)
2607         {
2608             sts.msg_type = StepTeleStats::ST_SUMMARY;
2609             sts.phy_io = fPhysicalIO;
2610             sts.cache_io = fCacheIO;
2611             sts.msg_rcv_cnt = sts.total_units_of_work = sts.units_of_work_completed = msgsRecvd;
2612             sts.cp_blocks_skipped = fNumBlksSkipped;
2613             sts.msg_bytes_in = fMsgBytesIn;
2614             sts.msg_bytes_out = fMsgBytesOut;
2615             sts.rows = ridsReturned;
2616             postStepSummaryTele(sts);
2617         }
2618 
2619         if (ffirstStepType == SCAN && bop == BOP_AND && !cancelled())
2620         {
2621             cpMutex.lock();
2622             lbidList->UpdateAllPartitionInfo();
2623             cpMutex.unlock();
2624         }
2625     }
2626 
2627     // Bug 3136, let mini stats be formatted if traceOn.
2628     if (lastThread && !didEOF)
2629         dlp->endOfInput();
2630 }
2631 
2632 const string TupleBPS::toString() const
2633 {
2634     ostringstream oss;
2635     oss << "TupleBPS        ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId << " st:" << fStepId <<
2636         " tb/col:" << fTableOid << "/" << fOid;
2637 
2638     if (alias().length()) oss << " alias:" << alias();
2639 
2640     if (view().length()) oss << " view:" << view();
2641 
2642 #if 0
2643 
2644     // @bug 1282, don't have output datalist for delivery
2645     if (!fDelivery)
2646         oss << " " << omitOidInDL << fOutputJobStepAssociation.outAt(0) << showOidInDL;
2647 
2648 #else
2649 
2650     if (fDelivery)
2651         oss << " is del ";
2652     else
2653         oss << " not del ";
2654 
2655     if (bop == BOP_OR)
2656         oss << " BOP_OR ";
2657 
2658     if (fDie)
2659         oss << " aborting " << msgsSent << "/" << msgsRecvd << " " << uniqueID << " ";
2660 
2661     if (fOutputJobStepAssociation.outSize() > 0)
2662     {
2663         oss << fOutputJobStepAssociation.outAt(0);
2664 
2665         if (fOutputJobStepAssociation.outSize() > 1)
2666             oss << " (too many outputs?)";
2667     }
2668     else
2669     {
2670         oss << " (no outputs?)";
2671     }
2672 
2673 #endif
2674     oss << " nf:" << fFilterCount;
2675     oss << " in:";
2676 
2677     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
2678     {
2679         oss << fInputJobStepAssociation.outAt(i);
2680     }
2681 
2682     oss << endl << "  UUID: " << uuids::to_string(fStepUuid) << endl;
2683     oss << "  Query UUID: " << uuids::to_string(queryUuid()) << endl;
2684     oss << "  " << fBPP->toString() << endl;
2685     return oss.str();
2686 }
2687 
2688 /* This exists to avoid a DBRM lookup for every rid. */
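/* Example, assuming a column with 8192 rows per block (rpbShift == 13) and 1024 blocks
   per extent (divShift == 10): rid 20,000,000 >> 13 == 2441 (the file block), and
   2441 >> 10 == 2 (the extent index), so the rid is scanned only if extent 2 survived
   both the stored CP elimination (scanFlags) and the runtime CP check (runtimeCPFlags). */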
2689 inline bool TupleBPS::scanit(uint64_t rid)
2690 {
2691     uint64_t fbo;
2692     uint32_t extentIndex;
2693 
2694     if (fOid < 3000)
2695         return true;
2696 
2697     fbo = rid >> rpbShift;
2698     extentIndex = fbo >> divShift;
2699     return scanFlags[extentIndex] && runtimeCPFlags[extentIndex];
2700 }
2701 
2702 uint64_t TupleBPS::getFBO(uint64_t lbid)
2703 {
2704     uint32_t i;
2705     uint64_t lastLBID;
2706 
2707     for (i = 0; i < numExtents; i++)
2708     {
2709         lastLBID = scannedExtents[i].range.start + (scannedExtents[i].range.size << 10) - 1;
2710 
2711         if (lbid >= (uint64_t) scannedExtents[i].range.start && lbid <= lastLBID)
2712             return (lbid - scannedExtents[i].range.start) + (i << divShift);
2713     }
2714 
2715     throw logic_error("TupleBPS: didn't find the FBO?");
2716 }
2717 
2718 void TupleBPS::useJoiner(boost::shared_ptr<joiner::TupleJoiner> tj)
2719 {
2720     vector<boost::shared_ptr<joiner::TupleJoiner> > v;
2721     v.push_back(tj);
2722     useJoiners(v);
2723 }
2724 
2725 void TupleBPS::useJoiners(const vector<boost::shared_ptr<joiner::TupleJoiner> >& joiners)
2726 {
2727     uint32_t i;
2728 
2729     tjoiners = joiners;
2730     doJoin = (joiners.size() != 0);
2731 
2732     joinerMatchesRGs.clear();
2733     smallSideCount = tjoiners.size();
2734     hasPMJoin = false;
2735     hasUMJoin = false;
2736 
2737     for (i = 0; i < smallSideCount; i++)
2738     {
2739         joinerMatchesRGs.push_back(tjoiners[i]->getSmallRG());
2740 
2741         if (tjoiners[i]->inPM())
2742             hasPMJoin = true;
2743         else
2744             hasUMJoin = true;
2745 
2746         if (tjoiners[i]->getJoinType() & SMALLOUTER)
2747             smallOuterJoiner = i;
2748     }
2749 
2750     if (hasPMJoin)
2751         fBPP->useJoiners(tjoiners);
2752 }
2753 
2754 void TupleBPS::newPMOnline(uint32_t connectionNumber)
2755 {
2756     ByteStream bs;
2757 
2758     fBPP->createBPP(bs);
2759 
2760     try
2761     {
2762         fDec->write(bs, connectionNumber);
2763 
2764         if (hasPMJoin)
2765             serializeJoiner(connectionNumber);
2766     }
2767     catch (IDBExcept& e)
2768     {
2769         abort();
2770         catchHandler(e.what(), e.errorCode(), fErrorInfo, fSessionId);
2771     }
2772 }
2773 
2774 void TupleBPS::setInputRowGroup(const rowgroup::RowGroup& rg)
2775 {
2776     inputRowGroup = rg;
2777     fBPP->setInputRowGroup(rg);
2778 }
2779 
2780 void TupleBPS::setOutputRowGroup(const rowgroup::RowGroup& rg)
2781 {
2782     outputRowGroup = rg;
2783     primRowGroup = rg;
2784     fBPP->setProjectionRowGroup(rg);
2785     checkDupOutputColumns(rg);
2786 
2787     if (fe2)
2788         fe2Mapping = makeMapping(outputRowGroup, fe2Output);
2789 }
2790 
2791 void TupleBPS::setJoinedResultRG(const rowgroup::RowGroup& rg)
2792 {
2793     outputRowGroup = rg;
2794     checkDupOutputColumns(rg);
2795     fBPP->setJoinedRowGroup(rg);
2796 
2797     if (fe2)
2798         fe2Mapping = makeMapping(outputRowGroup, fe2Output);
2799 }
2800 
2801 /* probably worthwhile to make some of these class vars */
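/* generateJoinResultSet() expands one large-side base row against the matches from every
   small side: it recurses depth-first through joinerOutput[0 .. smallSideCount-1],
   applying each small side's mapping onto baseRow, and at the deepest level copies the
   fully joined row into outputRG, starting a fresh RGData whenever a rowgroup fills up
   at 8192 rows. */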
2802 void TupleBPS::generateJoinResultSet(const vector<vector<Row::Pointer> >& joinerOutput,
2803                                      Row& baseRow, const vector<shared_array<int> >& mappings, const uint32_t depth,
2804                                      RowGroup& outputRG, RGData& rgData, vector<RGData>* outputData, const scoped_array<Row>& smallRows,
2805                                      Row& joinedRow)
2806 {
2807     uint32_t i;
2808     Row& smallRow = smallRows[depth];
2809 
2810     if (depth < smallSideCount - 1)
2811     {
2812         for (i = 0; i < joinerOutput[depth].size(); i++)
2813         {
2814             smallRow.setPointer(joinerOutput[depth][i]);
2815             applyMapping(mappings[depth], smallRow, &baseRow);
2816             generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1,
2817                                   outputRG, rgData, outputData, smallRows, joinedRow);
2818         }
2819     }
2820     else
2821     {
2822         outputRG.getRow(outputRG.getRowCount(), &joinedRow);
2823 
2824         for (i = 0; i < joinerOutput[depth].size(); i++, joinedRow.nextRow(),
2825                 outputRG.incRowCount())
2826         {
2827             smallRow.setPointer(joinerOutput[depth][i]);
2828 
2829             if (UNLIKELY(outputRG.getRowCount() == 8192))
2830             {
2831                 uint32_t dbRoot = outputRG.getDBRoot();
2832                 uint64_t baseRid = outputRG.getBaseRid();
2833                 outputData->push_back(rgData);
2834                 rgData = RGData(outputRG);
2835                 outputRG.setData(&rgData);
2836                 outputRG.resetRowGroup(baseRid);
2837                 outputRG.setDBRoot(dbRoot);
2838                 outputRG.getRow(0, &joinedRow);
2839             }
2840 
2841             applyMapping(mappings[depth], smallRow, &baseRow);
2842             copyRow(baseRow, &joinedRow);
2843         }
2844     }
2845 }
2846 
2847 const rowgroup::RowGroup& TupleBPS::getOutputRowGroup() const
2848 {
2849     return outputRowGroup;
2850 }
2851 
2852 void TupleBPS::setAggregateStep(const rowgroup::SP_ROWAGG_PM_t& agg, const rowgroup::RowGroup& rg)
2853 {
2854     if (rg.getColumnCount() > 0)
2855     {
2856         fAggRowGroupPm = rg;
2857         fAggregatorPm = agg;
2858 
2859         fBPP->addAggregateStep(agg, rg);
2860         fBPP->setNeedRidsAtDelivery(false);
2861     }
2862 }
2863 
2864 void TupleBPS::setBOP(uint8_t op)
2865 {
2866     bop = op;
2867     fBPP->setBOP(bop);
2868 }
2869 
2870 void TupleBPS::setJobInfo(const JobInfo* jobInfo)
2871 {
2872     fBPP->jobInfo(jobInfo);
2873 }
2874 
2875 uint64_t TupleBPS::getEstimatedRowCount()
2876 {
2877     // Populate the scanFlags array with the extents that qualify under casual partitioning.
2878     storeCasualPartitionInfo(true);
2879     // TODO:  Strip out the cout below after a few days of testing.
2880 #ifdef JLF_DEBUG
2881     cout << "OID-" << fOid << " EstimatedRowCount-" << fEstimatedRows << endl;
2882 #endif
2883     return fEstimatedRows;
2884 }
2885 
2886 void TupleBPS::checkDupOutputColumns(const rowgroup::RowGroup& rg)
2887 {
2888     // bug 1965, find if any duplicate columns selected
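    // e.g. keys == {10, 20, 10}: column 2 shares key 10 with column 0, so dupColumns
    // records the (dest, src) pair (2, 0) and dupOutputColumns() later copies field 0
    // into field 2 for every row.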
2889     map<uint32_t, uint32_t> keymap; // map<unique col key, col index in the row group>
2890     dupColumns.clear();
2891     const vector<uint32_t>& keys = rg.getKeys();
2892 
2893     for (uint32_t i = 0; i < keys.size(); i++)
2894     {
2895         map<uint32_t, uint32_t>::iterator j = keymap.find(keys[i]);
2896 
2897         if (j == keymap.end())
2898             keymap.insert(make_pair(keys[i], i));          // map key to col index
2899         else
2900             dupColumns.push_back(make_pair(i, j->second)); // dest/src index pair
2901     }
2902 }
2903 
2904 void TupleBPS::dupOutputColumns(RGData& data, RowGroup& rg)
2905 {
2906     rg.setData(&data);
2907     dupOutputColumns(rg);
2908 }
2909 
2910 void TupleBPS::dupOutputColumns(RowGroup& rg)
2911 {
2912     Row workingRow;
2913     rg.initRow(&workingRow);
2914     rg.getRow(0, &workingRow);
2915 
2916     for (uint64_t i = 0; i < rg.getRowCount(); i++)
2917     {
2918         for (uint64_t j = 0; j < dupColumns.size(); j++)
2919             workingRow.copyField(dupColumns[j].first, dupColumns[j].second);
2920 
2921         workingRow.nextRow();
2922     }
2923 }
2924 
2925 void TupleBPS::stepId(uint16_t stepId)
2926 {
2927     fStepId = stepId;
2928     fBPP->setStepID(stepId);
2929 }
2930 
2931 void TupleBPS::addFcnJoinExp(const vector<execplan::SRCP>& fe)
2932 {
2933     if (!fe1)
2934         fe1.reset(new funcexp::FuncExpWrapper());
2935 
2936     for (uint32_t i = 0; i < fe.size(); i++)
2937         fe1->addReturnedColumn(fe[i]);
2938 }
2939 
2940 void TupleBPS::addFcnExpGroup1(const boost::shared_ptr<execplan::ParseTree>& fe)
2941 {
2942     if (!fe1)
2943         fe1.reset(new funcexp::FuncExpWrapper());
2944 
2945     fe1->addFilter(fe);
2946 }
2947 
2948 void TupleBPS::setFE1Input(const RowGroup& feInput)
2949 {
2950     fe1Input = feInput;
2951 }
2952 
2953 void TupleBPS::setFcnExpGroup2(const boost::shared_ptr<funcexp::FuncExpWrapper>& fe,
2954                                const rowgroup::RowGroup& rg, bool runFE2onPM)
2955 {
2956     fe2 = fe;
2957     fe2Output = rg;
2958     checkDupOutputColumns(rg);
2959     fe2Mapping = makeMapping(outputRowGroup, fe2Output);
2960     runFEonPM = runFE2onPM;
2961 
2962     if (runFEonPM)
2963         fBPP->setFEGroup2(fe2, fe2Output);
2964 }
2965 
2966 void TupleBPS::setFcnExpGroup3(const vector<execplan::SRCP>& fe)
2967 {
2968     if (!fe2)
2969         fe2.reset(new funcexp::FuncExpWrapper());
2970 
2971     for (uint32_t i = 0; i < fe.size(); i++)
2972         fe2->addReturnedColumn(fe[i]);
2973 
2974     // if this is called, there's no join, so it can always run on the PM
2975     runFEonPM = true;
2976     fBPP->setFEGroup2(fe2, fe2Output);
2977 }
2978 
2979 void TupleBPS::setFE23Output(const rowgroup::RowGroup& feOutput)
2980 {
2981     fe2Output = feOutput;
2982     checkDupOutputColumns(feOutput);
2983     fe2Mapping = makeMapping(outputRowGroup, fe2Output);
2984 
2985     if (fe2 && runFEonPM)
2986         fBPP->setFEGroup2(fe2, fe2Output);
2987 }
2988 
2989 void TupleBPS::processFE2_oneRG(RowGroup& input, RowGroup& output, Row& inRow,
2990                                 Row& outRow, funcexp::FuncExpWrapper* local_fe)
2991 {
2992     bool ret;
2993     uint32_t i;
2994 
2995     output.resetRowGroup(input.getBaseRid());
2996     output.setDBRoot(input.getDBRoot());
2997     output.getRow(0, &outRow);
2998     input.getRow(0, &inRow);
2999 
3000     for (i = 0; i < input.getRowCount(); i++, inRow.nextRow())
3001     {
3002         ret = local_fe->evaluate(&inRow);
3003 
3004         if (ret)
3005         {
3006             applyMapping(fe2Mapping, inRow, &outRow);
3007             outRow.setRid(inRow.getRelRid());
3008             output.incRowCount();
3009             outRow.nextRow();
3010         }
3011     }
3012 }
3013 
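/* processFE2() runs the UM-side F&E group-2 expression over every RGData in *rgData:
   rows that pass local_fe->evaluate() are mapped into the output rowgroup, and a new
   RGData is started whenever the current one reaches 8192 rows or the input's DBRoot or
   base RID changes, keeping each output rowgroup homogeneous.  The filtered result
   replaces the caller's vector via swap(). */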
3014 void TupleBPS::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& outRow,
3015                           vector<RGData>* rgData, funcexp::FuncExpWrapper* local_fe)
3016 {
3017     vector<RGData> results;
3018     RGData result;
3019     uint32_t i, j;
3020     bool ret;
3021 
3022     result = RGData(output);
3023     output.setData(&result);
3024     output.resetRowGroup(-1);
3025     output.getRow(0, &outRow);
3026 
3027     for (i = 0; i < rgData->size(); i++)
3028     {
3029         input.setData(&(*rgData)[i]);
3030 
3031         if (output.getRowCount() == 0)
3032         {
3033             output.resetRowGroup(input.getBaseRid());
3034             output.setDBRoot(input.getDBRoot());
3035         }
3036 
3037         input.getRow(0, &inRow);
3038 
3039         for (j = 0; j < input.getRowCount(); j++, inRow.nextRow())
3040         {
3041             ret = local_fe->evaluate(&inRow);
3042 
3043             if (ret)
3044             {
3045                 applyMapping(fe2Mapping, inRow, &outRow);
3046                 outRow.setRid(inRow.getRelRid());
3047                 output.incRowCount();
3048                 outRow.nextRow();
3049 
3050                 if (output.getRowCount() == 8192 ||
3051                         output.getDBRoot() != input.getDBRoot() ||
3052                         output.getBaseRid() != input.getBaseRid()
3053                    )
3054                 {
3055                     results.push_back(result);
3056                     result = RGData(output);
3057                     output.setData(&result);
3058                     output.resetRowGroup(input.getBaseRid());
3059                     output.setDBRoot(input.getDBRoot());
3060                     output.getRow(0, &outRow);
3061                 }
3062             }
3063         }
3064     }
3065 
3066     if (output.getRowCount() > 0)
3067     {
3068         results.push_back(result);
3069     }
3070 
3071     rgData->swap(results);
3072 }
3073 
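// Returns the RowGroup layout delivered downstream: the FE2 output when FE2 is
// configured, otherwise the step's output RowGroup.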
const rowgroup::RowGroup& TupleBPS::getDeliveredRowGroup() const
{
    if (fe2)
        return fe2Output;

    return outputRowGroup;
}

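// Propagates the string-table setting to whichever RowGroups this step actually
// delivers and to the batch primitive processor.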
void TupleBPS::deliverStringTableRowGroup(bool b)
{
    if (fe2)
        fe2Output.setUseStringTable(b);
    else if (doJoin)
        outputRowGroup.setUseStringTable(b);
    else
    {
        outputRowGroup.setUseStringTable(b);
        primRowGroup.setUseStringTable(b);
    }

    fBPP->deliverStringTableRowGroup(b);
}

bool TupleBPS::deliverStringTableRowGroup() const
{
    if (fe2)
        return fe2Output.usesStringTable();

    return outputRowGroup.usesStringTable();
}

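// Appends this step's one-line summary (alias, table OID, BPP mini-string, I/O
// counters, timing, rows returned) to fMiniInfo.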
void TupleBPS::formatMiniStats()
{
    ostringstream oss;
    oss << "BPS "
        << "PM "
        << alias() << " "
        << fTableOid << " "
        << fBPP->toMiniString() << " "
        << fPhysicalIO << " "
        << fCacheIO << " "
        << fNumBlksSkipped << " "
        << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
        << ridsReturned << " ";

    fMiniInfo += oss.str();
}

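// Inserts a single RGData into the output datalist, first populating any
// duplicate output columns (bug 1965).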
void TupleBPS::rgDataToDl(RGData& rgData, RowGroup& rg, RowGroupDL* dlp)
{
    // bug 1965, populate duplicate columns if any.
    if (dupColumns.size() > 0)
        dupOutputColumns(rgData, rg);

    dlp->insert(rgData);
}

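// Inserts a batch of RGData into the output datalist under dlMutex so that
// concurrent producer threads do not interleave, then clears the input vector.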
void TupleBPS::rgDataVecToDl(vector<RGData>& rgDatav, RowGroup& rg, RowGroupDL* dlp)
{
    uint64_t size = rgDatav.size();

    if (size > 0 && !cancelled())
    {
        dlMutex.lock();

        for (uint64_t i = 0; i < size; i++)
        {
            rgDataToDl(rgDatav[i], rg, dlp);
        }

        dlMutex.unlock();
    }

    rgDatav.clear();
}

void TupleBPS::setJoinFERG(const RowGroup& rg)
{
    joinFERG = rg;
    fBPP->setJoinFERG(rg);
}

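// Refines runtimeCPFlags using casual-partitioning (CP) min/max data for the
// extents of the given column OID: a range predicate must overlap
// [vals[0], vals[1]]; an IN-list must contain at least one matching value.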
void TupleBPS::addCPPredicates(uint32_t OID, const vector<int64_t>& vals, bool isRange)
{
    if (fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP || fOid < 3000)
        return;

    uint32_t i, j, k;
    int64_t min, max, seq;
    bool isValid, intersection;
    vector<SCommand> colCmdVec = fBPP->getFilterSteps();
    ColumnCommandJL* cmd;

    for (i = 0; i < fBPP->getProjectSteps().size(); i++)
        colCmdVec.push_back(fBPP->getProjectSteps()[i]);

    LBIDList ll(OID, 0);

    /* Find the columncommand with that OID.
     * Check that the column type is one handled by CP.
     * For each extent in that OID,
     *    grab the min & max,
     *    OR together all of the intersection tests,
     *    AND it with the current CP flag.
     */

    for (i = 0; i < colCmdVec.size(); i++)
    {
        cmd = dynamic_cast<ColumnCommandJL*>(colCmdVec[i].get());

        if (cmd != NULL && cmd->getOID() == OID)
        {
            if (!ll.CasualPartitionDataType(cmd->getColType().colDataType, cmd->getColType().colWidth)
                    || cmd->isDict())
                return;

            // @bug 2989, use correct extents
            tr1::unordered_map<int64_t, struct BRM::EMEntry>* extentsPtr = NULL;
            vector<struct BRM::EMEntry> extents;  // in case the extents of OID is not in Map

            // TODO: store the sorted vectors from the pcolscans/steps as a minor optimization
            dbrm.getExtents(OID, extents);
            sort(extents.begin(), extents.end(), ExtentSorter());

            if (extentsMap.find(OID) != extentsMap.end())
            {
                extentsPtr = &extentsMap[OID];
            }
            else if (dbrm.getExtents(OID, extents) == 0)
            {
                extentsMap[OID] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
                tr1::unordered_map<int64_t, struct BRM::EMEntry>& mref = extentsMap[OID];

                for (uint32_t z = 0; z < extents.size(); z++)
                    mref[extents[z].range.start] = extents[z];

                extentsPtr = &mref;
            }

            for (j = 0; j < extents.size(); j++)
            {
                isValid = ll.GetMinMax(&min, &max, &seq, extents[j].range.start, *extentsPtr,
                                       cmd->getColType().colDataType);

                if (isValid)
                {
                    if (isRange)
                        runtimeCPFlags[j] = ll.checkRangeOverlap(min, max, vals[0], vals[1],
                                            cmd->getColType()) && runtimeCPFlags[j];
                    else
                    {
                        intersection = false;

                        for (k = 0; k < vals.size(); k++)
                            intersection = intersection ||
                                           ll.checkSingleValue(min, max, vals[k], cmd->getColType());

                        runtimeCPFlags[j] = intersection && runtimeCPFlags[j];
                    }
                }
            }

            break;
        }
    }
}

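// Re-points this step at a DistributedEngineComm instance, removing the step's
// queue from the old DEC and registering it with the new one.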
void TupleBPS::dec(DistributedEngineComm* dec)
{
    if (fDec)
        fDec->removeQueue(uniqueID);

    fDec = dec;

    if (fDec)
        fDec->addQueue(uniqueID, true);
}

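// Aborts the step without taking the step mutex: marks the JobStep as aborted,
// tells the PMs to stop processing this uniqueID, shuts down the DEC queue, and
// wakes any threads blocked on the condition variables.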
void TupleBPS::abort_nolock()
{
    if (fDie)
        return;

    JobStep::abort();

    if (fDec && BPPIsAllocated)
    {
        ByteStream bs;
        fBPP->abortProcessing(&bs);

        try
        {
            fDec->write(uniqueID, bs);
        }
        catch (...)
        {
            // this throws only if there are no PMs left.  If there are none,
            // that is the cause of the abort and that will be reported to the
            // front-end already.  Nothing to do here.
        }

        BPPIsAllocated = false;
        fDec->shutdownQueue(uniqueID);
    }

    condvarWakeupProducer.notify_all();
    condvar.notify_all();
}

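// Locking wrapper around abort_nolock().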
void TupleBPS::abort()
{
    // NOTE: assumes the step's own boost::mutex member is named 'mutex' (the one
    // guarding the condition variables signalled in abort_nolock()); locking a
    // type name, as the previous code did, would not compile.
    boost::mutex::scoped_lock scoped(mutex);
    abort_nolock();
}

}   //namespace
// vim:ts=4 sw=4: