1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019-2020 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: tuple-bps.cpp 9705 2013-07-17 20:06:07Z pleblanc $
20
21
22 #include <unistd.h>
23 //#define NDEBUG
24 #include <cassert>
25 #include <sstream>
26 #include <iomanip>
27 #include <algorithm>
28 #include <ctime>
29 #include <sys/time.h>
30 #include <deque>
31 using namespace std;
32
33 #include <boost/thread.hpp>
34 #include <boost/thread/condition.hpp>
35 #include <boost/uuid/uuid_io.hpp>
36 using namespace boost;
37
38 #include "bpp-jl.h"
39 #include "distributedenginecomm.h"
40 #include "elementtype.h"
41 #include "jlf_common.h"
42 #include "primitivestep.h"
43 #include "unique32generator.h"
44 #include "rowestimator.h"
45 using namespace joblist;
46
47 #include "messagequeue.h"
48 using namespace messageqcpp;
49
50 #include "configcpp.h"
51 using namespace config;
52
53 #include "messagelog.h"
54 #include "messageobj.h"
55 #include "loggingid.h"
56 #include "errorcodes.h"
57 #include "errorids.h"
58 #include "exceptclasses.h"
59 using namespace logging;
60
61 #include "liboamcpp.h"
62
63 #include "calpontsystemcatalog.h"
64 using namespace execplan;
65
66 #include "brm.h"
67 using namespace BRM;
68
69 #include "oamcache.h"
70
71 #include "rowgroup.h"
72 using namespace rowgroup;
73
74 #include "threadnaming.h"
75
76 #include "querytele.h"
77 using namespace querytele;
78
79 #include "pseudocolumn.h"
80 //#define DEBUG 1
81
82 extern boost::mutex fileLock_g;
83
namespace
{
// LBID count is shifted right by (13 + 10): 13 converts LBIDs to logical
// blocks, 10 converts logical blocks to groups of 1024 (one logical extent).
const uint32_t LOGICAL_EXTENT_CONVERTER = 10;   // 10 + 13. 13 to convert to logical blocks,
// 10 to convert to groups of 1024 logical blocks

// Fallback when the "ExtentMap/ExtentsPerSegmentFile" config value is absent
// (see initializeConfigParms()).
const uint32_t DEFAULT_EXTENTS_PER_SEG_FILE = 2;

}

/** Debug macro: when THROTTLE_DEBUG is 0 the stream expression compiles
 *  but is dead code, so call sites need no #ifdef guards. */
#define THROTTLE_DEBUG 0
#if THROTTLE_DEBUG
#define THROTTLEDEBUG std::cout
#else
#define THROTTLEDEBUG if (false) std::cout
#endif
99 namespace joblist
100 {
101
102 struct TupleBPSPrimitive
103 {
TupleBPSPrimitivejoblist::TupleBPSPrimitive104 TupleBPSPrimitive(TupleBPS* batchPrimitiveStep) :
105 fBatchPrimitiveStep(batchPrimitiveStep)
106 {}
107 TupleBPS* fBatchPrimitiveStep;
operator ()joblist::TupleBPSPrimitive108 void operator()()
109 {
110 try
111 {
112 utils::setThreadName("BPSPrimitive");
113 fBatchPrimitiveStep->sendPrimitiveMessages();
114 }
115 catch (std::exception& re)
116 {
117 cerr << "TupleBPS: send thread threw an exception: " << re.what() <<
118 "\t" << this << endl;
119 catchHandler(re.what(), ERR_TUPLE_BPS, fBatchPrimitiveStep->errorInfo());
120 }
121 catch (...)
122 {
123 string msg("TupleBPS: send thread threw an unknown exception ");
124 catchHandler(msg, ERR_TUPLE_BPS, fBatchPrimitiveStep->errorInfo());
125 cerr << msg << this << endl;
126 }
127 }
128 };
129
130 struct TupleBPSAggregators
131 {
TupleBPSAggregatorsjoblist::TupleBPSAggregators132 TupleBPSAggregators(TupleBPS* batchPrimitiveStep, uint64_t index) :
133 fBatchPrimitiveStepCols(batchPrimitiveStep), fThreadId(index)
134 {}
135 TupleBPS* fBatchPrimitiveStepCols;
136 uint64_t fThreadId;
137
operator ()joblist::TupleBPSAggregators138 void operator()()
139 {
140 try
141 {
142 utils::setThreadName("BPSAggregator");
143 fBatchPrimitiveStepCols->receiveMultiPrimitiveMessages(fThreadId);
144 }
145 catch (std::exception& re)
146 {
147 cerr << fBatchPrimitiveStepCols->toString() << ": receive thread threw an exception: " << re.what() << endl;
148 catchHandler(re.what(), ERR_TUPLE_BPS, fBatchPrimitiveStepCols->errorInfo());
149 }
150 catch (...)
151 {
152 string msg("TupleBPS: recv thread threw an unknown exception ");
153 cerr << fBatchPrimitiveStepCols->toString() << msg << endl;
154 catchHandler(msg, ERR_TUPLE_BPS, fBatchPrimitiveStepCols->errorInfo());
155 }
156 }
157 };
158
159 //------------------------------------------------------------------------------
160 // Initialize configurable parameters
161 //------------------------------------------------------------------------------
// Read the tunables that govern request sizing and thread counts from the
// ResourceManager / Columnstore.xml. Called from every TupleBPS constructor.
void TupleBPS::initializeConfigParms()
{
    string strVal;


    //...Get the tuning parameters that throttle msgs sent to primproc
    //...fFilterRowReqLimit puts a cap on how many rids we will request from
    //... primproc, before pausing to let the consumer thread catch up.
    //... Without this limit, there is a chance that PrimProc could flood
    //... ExeMgr with thousands of messages that will consume massive
    //... amounts of memory for a 100 gigabyte database.
    //...fFilterRowReqThreshold is the level at which the number of outstanding
    //... rids must fall below, before the producer can send more rids.

    //These could go in constructor
    fRequestSize = fRm->getJlRequestSize();
    fMaxOutstandingRequests = fRm->getJlMaxOutstandingRequests();
    fProcessorThreadsPerScan = fRm->getJlProcessorThreadsPerScan();
    fNumThreads = 0;

    config::Config* cf = config::Config::makeConfig();
    string epsf = cf->getConfig("ExtentMap", "ExtentsPerSegmentFile");

    // Keep the DEFAULT_EXTENTS_PER_SEG_FILE fallback when the key is unset.
    if ( epsf.length() != 0 )
        fExtentsPerSegFile = cf->uFromText(epsf);

    // A per-request batch >= the outstanding-request cap would defeat the
    // throttle, so drop back to the minimum batch size.
    if (fRequestSize >= fMaxOutstandingRequests)
        fRequestSize = 1;

    // NOTE(review): sessions with the high bit set get a single receive
    // thread; presumably these are internal/system sessions — confirm.
    if ((fSessionId & 0x80000000) == 0)
        fMaxNumThreads = fRm->getJlNumScanReceiveThreads();
    else
        fMaxNumThreads = 1;

    // Reserve the max number of thread space. A bit of an optimization.
    fProducerThreads.clear();
    fProducerThreads.reserve(fMaxNumThreads);
}
200
// Construct a TupleBPS from a pColStep: copies the step's filter state,
// extent list, and column metadata, zeroes all runtime counters, and builds
// the BatchPrimitiveProcessorJL proxy that will drive PrimProc.
TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) :
    BatchPrimitive(jobInfo), pThread(0), fRm(jobInfo.rm)
{
    // --- carry over the source step's identity and filter state ---
    fInputJobStepAssociation = rhs.inputAssociation();
    fOutputJobStepAssociation = rhs.outputAssociation();
    fDec = 0;
    fSessionId = rhs.sessionId();
    fFilterCount = rhs.filterCount();
    fFilterString = rhs.filterString();
    isFilterFeeder = rhs.getFeederFlag();
    fOid = rhs.oid();
    fTableOid = rhs.tableOid();
    extentSize = rhs.extentSize;

    // Index this column's extents by their starting LBID for fast lookup.
    scannedExtents = rhs.extents;
    extentsMap[fOid] = tr1::unordered_map<int64_t, EMEntry>();
    tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[fOid];

    for (uint32_t z = 0; z < rhs.extents.size(); z++)
        ref[rhs.extents[z].range.start] = rhs.extents[z];

    lbidList = rhs.lbidList;
    rpbShift = rhs.rpbShift;
    divShift = rhs.divShift;
    modMask = rhs.modMask;
    numExtents = rhs.numExtents;

    // --- zero all runtime statistics / counters ---
    ridsRequested = 0;
    ridsReturned = 0;
    recvExited = 0;
    totalMsgs = 0;
    msgsSent = 0;
    msgsRecvd = 0;
    fMsgBytesIn = 0;
    fMsgBytesOut = 0;
    fBlockTouched = 0;
    fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
    recvWaiting = 0;
    fStepCount = 1;
    fCPEvaluated = false;
    fEstimatedRows = 0;
    fColType = rhs.colType();
    alias(rhs.alias());
    view(rhs.view());
    name(rhs.name());
    fColWidth = fColType.colWidth;

    // --- build and configure the PrimProc-side BPP proxy ---
    fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
    initializeConfigParms();
    fBPP->setSessionID(fSessionId);
    fBPP->setStepID(fStepId);
    fBPP->setQueryContext(fVerId);
    fBPP->setTxnID(fTxnId);
    fTraceFlags = rhs.fTraceFlags;
    fBPP->setTraceFlags(fTraceFlags);
    fBPP->setOutputType(ROW_GROUP);
    finishedSending = sendWaiting = false;
    fNumBlksSkipped = 0;
    fPhysicalIO = 0;
    fCacheIO = 0;
    BPPIsAllocated = false;
    // unique ID ties this step's DEC traffic to its remote BPP instance
    uniqueID = UniqueNumberGenerator::instance()->getUnique32();
    fBPP->setUniqueID(uniqueID);
    fBPP->setUuid(fStepUuid);
    fCardinality = rhs.cardinality();
    doJoin = false;
    hasPMJoin = false;
    hasUMJoin = false;
    fRunExecuted = false;
    fSwallowRows = false;
    smallOuterJoiner = -1;

    // @1098 initialize scanFlags to be true
    scanFlags.assign(numExtents, true);
    runtimeCPFlags.assign(numExtents, true);
    bop = BOP_AND;

    runRan = joinRan = false;
    fDelivery = false;
    fExtendedInfo = "TBPS: ";
    fQtc.stepParms().stepType = StepTeleStats::T_BPS;


    // pseudocolumn-filter bookkeeping (set later in setBPP())
    hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
        hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
}
285
// Construct a TupleBPS from a pColScanStep (full-column scan). Unlike the
// pColStep variant this also copies the scan's LBID ranges and finishes by
// calling initExtentMarkers() to compute per-DBRoot scan boundaries.
TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) :
    BatchPrimitive(jobInfo), fRm(jobInfo.rm)
{
    // --- carry over the source step's identity and filter state ---
    fInputJobStepAssociation = rhs.inputAssociation();
    fOutputJobStepAssociation = rhs.outputAssociation();
    fDec = 0;
    fFilterCount = rhs.filterCount();
    fFilterString = rhs.filterString();
    isFilterFeeder = rhs.getFeederFlag();
    fOid = rhs.oid();
    fTableOid = rhs.tableOid();
    extentSize = rhs.extentSize;
    lbidRanges = rhs.lbidRanges;

    /* These lines are obsoleted by initExtentMarkers. Need to remove & retest. */
    scannedExtents = rhs.extents;
    extentsMap[fOid] = tr1::unordered_map<int64_t, EMEntry>();
    tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[fOid];

    for (uint32_t z = 0; z < rhs.extents.size(); z++)
        ref[rhs.extents[z].range.start] = rhs.extents[z];

    divShift = rhs.divShift;

    // --- zero all runtime statistics / counters ---
    totalMsgs = 0;
    msgsSent = 0;
    msgsRecvd = 0;
    ridsReturned = 0;
    ridsRequested = 0;
    fNumBlksSkipped = 0;
    fMsgBytesIn = 0;
    fMsgBytesOut = 0;
    fBlockTouched = 0;
    fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
    recvWaiting = 0;
    fSwallowRows = false;
    fStepCount = 1;
    fCPEvaluated = false;
    fEstimatedRows = 0;
    fColType = rhs.colType();
    alias(rhs.alias());
    view(rhs.view());
    name(rhs.name());

    fColWidth = fColType.colWidth;
    lbidList = rhs.lbidList;

    finishedSending = sendWaiting = false;
    firstRead = true;
    fSwallowRows = false;
    recvExited = 0;

    // --- build and configure the PrimProc-side BPP proxy ---
    fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
    initializeConfigParms();
    fBPP->setSessionID(fSessionId);
    fBPP->setQueryContext(fVerId);
    fBPP->setTxnID(fTxnId);
    fTraceFlags = rhs.fTraceFlags;
    fBPP->setTraceFlags(fTraceFlags);
    fBPP->setStepID(fStepId);
    fBPP->setOutputType(ROW_GROUP);
    fPhysicalIO = 0;
    fCacheIO = 0;
    BPPIsAllocated = false;
    // unique ID ties this step's DEC traffic to its remote BPP instance
    uniqueID = UniqueNumberGenerator::instance()->getUnique32();
    fBPP->setUniqueID(uniqueID);
    fBPP->setUuid(fStepUuid);
    fCardinality = rhs.cardinality();
    doJoin = false;
    hasPMJoin = false;
    hasUMJoin = false;
    fRunExecuted = false;
    smallOuterJoiner = -1;
    bop = BOP_AND;

    runRan = joinRan = false;
    fDelivery = false;
    fExtendedInfo = "TBPS: ";

    // Derive scanFlags/runtimeCPFlags, lastExtent and lastScannedLBID from
    // the extent map built above.
    initExtentMarkers();
    fQtc.stepParms().stepType = StepTeleStats::T_BPS;

    // pseudocolumn-filter bookkeeping (set later in setBPP())
    hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
        hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
}
369
// Construct a TupleBPS from a PassThruStep. A pass-through carries no
// filters of its own (fFilterCount = 0) and no extent list is copied here.
TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) :
    BatchPrimitive(jobInfo), fRm(jobInfo.rm)
{
    // --- carry over the source step's identity ---
    fInputJobStepAssociation = rhs.inputAssociation();
    fOutputJobStepAssociation = rhs.outputAssociation();
    fDec = 0;
    fFilterCount = 0;
    fOid = rhs.oid();
    fTableOid = rhs.tableOid();

    // --- zero all runtime statistics / counters ---
    ridsReturned = 0;
    ridsRequested = 0;
    fMsgBytesIn = 0;
    fMsgBytesOut = 0;
    fBlockTouched = 0;
    fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
    recvExited = 0;
    totalMsgs = 0;
    msgsSent = 0;
    msgsRecvd = 0;
    recvWaiting = 0;
    fStepCount = 1;
    fCPEvaluated = false;
    fEstimatedRows = 0;
    fColType = rhs.colType();
    alias(rhs.alias());
    view(rhs.view());
    name(rhs.name());
    fColWidth = fColType.colWidth;

    // --- build and configure the PrimProc-side BPP proxy ---
    fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
    initializeConfigParms();
    fBPP->setSessionID(fSessionId);
    fBPP->setStepID(fStepId);
    fBPP->setQueryContext(fVerId);
    fBPP->setTxnID(fTxnId);
    fTraceFlags = rhs.fTraceFlags;
    fBPP->setTraceFlags(fTraceFlags);
    fBPP->setOutputType(ROW_GROUP);
    finishedSending = sendWaiting = false;
    fSwallowRows = false;
    fNumBlksSkipped = 0;
    fPhysicalIO = 0;
    fCacheIO = 0;
    BPPIsAllocated = false;
    // unique ID ties this step's DEC traffic to its remote BPP instance
    uniqueID = UniqueNumberGenerator::instance()->getUnique32();
    fBPP->setUniqueID(uniqueID);
    fBPP->setUuid(fStepUuid);
    doJoin = false;
    hasPMJoin = false;
    hasUMJoin = false;
    fRunExecuted = false;
    isFilterFeeder = false;
    smallOuterJoiner = -1;

    // @1098 initialize scanFlags to be true
    // NOTE(review): numExtents is not set in this constructor path — it is
    // presumably zero/inherited here; confirm before relying on scanFlags.
    scanFlags.assign(numExtents, true);
    runtimeCPFlags.assign(numExtents, true);
    bop = BOP_AND;

    runRan = joinRan = false;
    fDelivery = false;
    fExtendedInfo = "TBPS: ";
    fQtc.stepParms().stepType = StepTeleStats::T_BPS;

    // pseudocolumn-filter bookkeeping (set later in setBPP())
    hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
        hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;

}
437
// Construct a TupleBPS from a pDictionaryStep (dictionary/token column).
// No extent map or column width is copied from the source step here.
TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) :
    BatchPrimitive(jobInfo), fRm(jobInfo.rm)
{
    // --- carry over the source step's identity ---
    fInputJobStepAssociation = rhs.inputAssociation();
    fOutputJobStepAssociation = rhs.outputAssociation();
    fDec = 0;
    fOid = rhs.oid();
    fTableOid = rhs.tableOid();

    // --- zero all runtime statistics / counters ---
    totalMsgs = 0;
    msgsSent = 0;
    msgsRecvd = 0;
    ridsReturned = 0;
    ridsRequested = 0;
    fNumBlksSkipped = 0;
    fBlockTouched = 0;
    fMsgBytesIn = 0;
    fMsgBytesOut = 0;
    fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
    recvWaiting = 0;
    fSwallowRows = false;
    fStepCount = 1;
    fCPEvaluated = false;
    fEstimatedRows = 0;
    alias(rhs.alias());
    view(rhs.view());
    name(rhs.name());
    finishedSending = sendWaiting = false;
    recvExited = 0;

    // --- build and configure the PrimProc-side BPP proxy ---
    fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
    initializeConfigParms();
    fBPP->setSessionID(fSessionId);
    fBPP->setStepID(fStepId);
    fBPP->setQueryContext(fVerId);
    fBPP->setTxnID(fTxnId);
    fTraceFlags = rhs.fTraceFlags;
    fBPP->setTraceFlags(fTraceFlags);
    fBPP->setOutputType(ROW_GROUP);
    fPhysicalIO = 0;
    fCacheIO = 0;
    BPPIsAllocated = false;
    // unique ID ties this step's DEC traffic to its remote BPP instance
    uniqueID = UniqueNumberGenerator::instance()->getUnique32();
    fBPP->setUniqueID(uniqueID);
    fBPP->setUuid(fStepUuid);
    fCardinality = rhs.cardinality();
    doJoin = false;
    hasPMJoin = false;
    hasUMJoin = false;
    fRunExecuted = false;
    isFilterFeeder = false;
    smallOuterJoiner = -1;
    // @1098 initialize scanFlags to be true
    // NOTE(review): numExtents is not set in this constructor path — confirm
    // its value before relying on scanFlags sizing.
    scanFlags.assign(numExtents, true);
    runtimeCPFlags.assign(numExtents, true);
    bop = BOP_AND;

    runRan = joinRan = false;
    fDelivery = false;
    fExtendedInfo = "TBPS: ";
    fQtc.stepParms().stepType = StepTeleStats::T_BPS;

    // pseudocolumn-filter bookkeeping (set later in setBPP())
    hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
        hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;

}
502
// Destructor: if this step ever attached to the DEC, detach the event
// listener, tell PrimProc to destroy the remote BPP instance (when one was
// created), and remove the step's message queue. All exceptions from the
// DEC write are swallowed and logged — destructors must not throw.
TupleBPS::~TupleBPS()
{
    if (fDec)
    {
        fDec->removeDECEventListener(this);

        if (BPPIsAllocated)
        {
            // Serialize a destroy-BPP command and send it to PrimProc.
            ByteStream bs;
            fBPP->destroyBPP(bs);

            try
            {
                fDec->write(uniqueID, bs);
            }
            catch (const std::exception& e)
            {
                // log the exception
                cerr << "~TupleBPS caught: " << e.what() << endl;
                catchHandler(e.what(), ERR_TUPLE_BPS, fErrorInfo, fSessionId);
            }
            catch (...)
            {
                cerr << "~TupleBPS caught unknown exception" << endl;
                catchHandler("~TupleBPS caught unknown exception",
                             ERR_TUPLE_BPS, fErrorInfo, fSessionId);
            }
        }

        fDec->removeQueue(uniqueID);
    }

}
536
// Add a filter step to the BPP, dispatching on the step's concrete type
// (PseudoColStep / pColStep / pColScanStep / pDictionaryStep / FilterStep).
// Also records which pseudocolumn filters are present, merges the step's
// extents into extentsMap, and widens fColWidth if this column is wider.
void TupleBPS::setBPP(JobStep* jobStep)
{
    fCardinality = jobStep->cardinality();

    pColStep* pcsp = dynamic_cast<pColStep*>(jobStep);

    int colWidth = 0;

    if (pcsp != 0)
    {
        // PseudoColStep derives from pColStep, so test for it first.
        PseudoColStep* pseudo = dynamic_cast<PseudoColStep*>(jobStep);

        if (pseudo)
        {
            fBPP->addFilterStep(*pseudo);

            // Remember which pseudocolumn is filtered on; these flags are
            // read elsewhere to decide extent-elimination strategy.
            if (pseudo->filterCount() > 0)
            {
                hasPCFilter = true;

                switch (pseudo->pseudoColumnId())
                {
                    case PSEUDO_EXTENTRELATIVERID:
                        hasRIDFilter = true;
                        break;

                    case PSEUDO_DBROOT:
                        hasDBRootFilter = true;
                        break;

                    case PSEUDO_PM:
                        hasPMFilter = true;
                        break;

                    case PSEUDO_SEGMENT:
                        hasSegmentFilter = true;
                        break;

                    case PSEUDO_SEGMENTDIR:
                        hasSegmentDirFilter = true;
                        break;

                    case PSEUDO_EXTENTMIN:
                        hasMinFilter = true;
                        break;

                    case PSEUDO_EXTENTMAX:
                        hasMaxFilter = true;
                        break;

                    case PSEUDO_BLOCKID:
                        hasLBIDFilter = true;
                        break;

                    case PSEUDO_EXTENTID:
                        hasExtentIDFilter = true;
                        break;

                    case PSEUDO_PARTITION:
                        hasPartitionFilter = true;
                        break;
                }
            }
        }
        else
            fBPP->addFilterStep(*pcsp);

        // Merge this column's extents, keyed by starting LBID.
        extentsMap[pcsp->fOid] = tr1::unordered_map<int64_t, EMEntry>();
        tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcsp->fOid];

        for (uint32_t z = 0; z < pcsp->extents.size(); z++)
            ref[pcsp->extents[z].range.start] = pcsp->extents[z];

        colWidth = (pcsp->colType()).colWidth;
        isFilterFeeder = pcsp->getFeederFlag();

        // it17 does not allow combined AND/OR, this pcolstep is for hashjoin optimization.
        if (bop == BOP_OR && isFilterFeeder == false)
            fBPP->setForHJ(true);
    }
    else
    {
        pColScanStep* pcss = dynamic_cast<pColScanStep*>(jobStep);

        if (pcss != 0)
        {
            // Scans also get the per-DBRoot last-scanned LBID boundaries.
            fBPP->addFilterStep(*pcss, lastScannedLBID);

            extentsMap[pcss->fOid] = tr1::unordered_map<int64_t, EMEntry>();
            tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcss->fOid];

            for (uint32_t z = 0; z < pcss->extents.size(); z++)
                ref[pcss->extents[z].range.start] = pcss->extents[z];

            colWidth = (pcss->colType()).colWidth;
            isFilterFeeder = pcss->getFeederFlag();
        }
        else
        {
            pDictionaryStep* pdsp = dynamic_cast<pDictionaryStep*>(jobStep);

            if (pdsp != 0)
            {
                fBPP->addFilterStep(*pdsp);
                colWidth = (pdsp->colType()).colWidth;
            }
            else
            {
                FilterStep* pfsp = dynamic_cast<FilterStep*>(jobStep);

                if (pfsp)
                {
                    fBPP->addFilterStep(*pfsp);
                }
                // else: unrecognized step type is silently ignored
            }

        }
    }

    // Track the widest filter column seen so far.
    if (colWidth > fColWidth)
    {
        fColWidth = colWidth;
    }
}
661
// Add a projection step to the BPP. When jobStep2 is non-NULL the pair
// (jobStep1, jobStep2) is a token column + its dictionary lookup; otherwise
// jobStep1 alone is projected (pColStep / PseudoColStep / PassThruStep).
// Also merges extents, records projected OIDs, and widens fColWidth.
void TupleBPS::setProjectBPP(JobStep* jobStep1, JobStep* jobStep2)
{
    int colWidth = 0;

    if (jobStep2 != NULL)
    {
        // Two-step projection: column step feeding a dictionary step.
        pDictionaryStep* pdsp = 0;
        pColStep* pcsp = dynamic_cast<pColStep*>(jobStep1);

        if (pcsp != 0)
        {
            pdsp = dynamic_cast<pDictionaryStep*>(jobStep2);
            fBPP->addProjectStep(*pcsp, *pdsp);

            //@Bug 961
            if (!pcsp->isExeMgr())
                fBPP->setNeedRidsAtDelivery(true);

            colWidth = (pcsp->colType()).colWidth;
            projectOids.push_back(jobStep1->oid());
        }
        else
        {
            PassThruStep* psth = dynamic_cast<PassThruStep*>(jobStep1);

            if (psth != 0)
            {
                pdsp = dynamic_cast<pDictionaryStep*>(jobStep2);
                fBPP->addProjectStep(*psth, *pdsp);

                //@Bug 961
                if (!psth->isExeMgr())
                    fBPP->setNeedRidsAtDelivery(true);

                projectOids.push_back(jobStep1->oid());
                colWidth = (psth->colType()).colWidth;
            }
        }
    }
    else
    {
        // Single-step projection.
        pColStep* pcsp = dynamic_cast<pColStep*>(jobStep1);

        if (pcsp != 0)
        {
            // PseudoColStep derives from pColStep; test for it first.
            PseudoColStep* pseudo = dynamic_cast<PseudoColStep*>(jobStep1);

            if (pseudo)
            {
                fBPP->addProjectStep(*pseudo);
            }
            else
                fBPP->addProjectStep(*pcsp);

            // Merge this column's extents, keyed by starting LBID.
            extentsMap[pcsp->fOid] = tr1::unordered_map<int64_t, EMEntry>();
            tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcsp->fOid];

            for (uint32_t z = 0; z < pcsp->extents.size(); z++)
                ref[pcsp->extents[z].range.start] = pcsp->extents[z];

            //@Bug 961
            if (!pcsp->isExeMgr())
                fBPP->setNeedRidsAtDelivery(true);

            colWidth = (pcsp->colType()).colWidth;
            projectOids.push_back(jobStep1->oid());
        }
        else
        {
            PassThruStep* passthru = dynamic_cast<PassThruStep*>(jobStep1);

            if (passthru != 0)
            {
                idbassert(!fBPP->getFilterSteps().empty());

                // A pass-through is only valid if it follows a filter on the
                // same column; otherwise materialize it as a real column
                // (or pseudocolumn) read.
                if (static_cast<CalpontSystemCatalog::OID>(fBPP->getFilterSteps().back()->getOID()) != passthru->oid())
                {
                    SJSTEP pts;

                    if (passthru->pseudoType() == 0)
                    {
                        pts.reset(new pColStep(*passthru));
                        pcsp = dynamic_cast<pColStep*>(pts.get());
                    }
                    else
                    {
                        pts.reset(new PseudoColStep(*passthru));
                        pcsp = dynamic_cast<PseudoColStep*>(pts.get());
                    }

                    fBPP->addProjectStep(*pcsp);

                    if (!passthru->isExeMgr())
                        fBPP->setNeedRidsAtDelivery(true);

                    colWidth = passthru->colType().colWidth;
                    projectOids.push_back(pts->oid());
                }
                else
                {
                    fBPP->addProjectStep(*passthru);

                    //@Bug 961
                    if (!passthru->isExeMgr())
                        fBPP->setNeedRidsAtDelivery(true);

                    colWidth = (passthru->colType()).colWidth;
                    projectOids.push_back(jobStep1->oid());
                }
            }
        }
    }

    // Track the widest projected column seen so far.
    if (colWidth > fColWidth)
    {
        fColWidth = colWidth;
    }
}
780
// Evaluate casual-partitioning (CP) min/max metadata for every filter column
// and clear scanFlags[idx] for extents that no filter can match, so those
// extents are never scanned. Optionally also computes fEstimatedRows for the
// hash-join large-side estimate. Idempotent: guarded by fCPEvaluated.
void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts)
{
    const vector<SCommand>& colCmdVec = fBPP->getFilterSteps();
    vector<ColumnCommandJL*> cpColVec;
    vector<SP_LBIDList> lbidListVec;
    ColumnCommandJL* colCmd = 0;

    // @bug 2123. We call this earlier in the process for the hash join estimation process now. Return if we've already done the work.
    if (fCPEvaluated)
    {
        return;
    }

    fCPEvaluated = true;

    if (colCmdVec.size() == 0)
        return;

    // Collect the filter columns whose data type supports CP elimination;
    // pseudocolumn commands are excluded.
    for (uint32_t i = 0; i < colCmdVec.size(); i++)
    {
        colCmd = dynamic_cast<ColumnCommandJL*>(colCmdVec[i].get());

        if (!colCmd || dynamic_cast<PseudoCCJL*>(colCmdVec[i].get()))
            continue;

        SP_LBIDList tmplbidList(new LBIDList(0));

        if (tmplbidList->CasualPartitionDataType(colCmd->getColType().colDataType, colCmd->getColType().colWidth))
        {
            lbidListVec.push_back(tmplbidList);
            cpColVec.push_back(colCmd);
        }

        // @Bug 3503. Use the total table size as the estimate for non CP columns.
        else if (fEstimatedRows == 0 && estimateRowCounts)
        {
            RowEstimator rowEstimator;
            fEstimatedRows = rowEstimator.estimateRowsForNonCPColumn(*colCmd);
        }
    }


    if (cpColVec.size() == 0)
        return;

    // Tracing can force all extents to be scanned regardless of CP data.
    const bool ignoreCP = ((fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP) != 0);

    for (uint32_t idx = 0; idx < numExtents; idx++)
    {
        scanFlags[idx] = true;

        for (uint32_t i = 0; i < cpColVec.size(); i++)
        {
            colCmd = cpColVec[i];
            const EMEntry& extent = colCmd->getExtents()[idx];

            /* If any column filter eliminates an extent, it doesn't get scanned */
            scanFlags[idx] = scanFlags[idx] &&
                             (ignoreCP || extent.partition.cprange.isValid != BRM::CP_VALID ||
                              lbidListVec[i]->CasualPartitionPredicate(
                                  extent.partition.cprange.lo_val,
                                  extent.partition.cprange.hi_val,
                                  &(colCmd->getFilterString()),
                                  colCmd->getFilterCount(),
                                  colCmd->getColType(),
                                  colCmd->getBOP())
                             );

            if (!scanFlags[idx])
            {
                break;
            }
        }
    }

    // @bug 2123. Use the casual partitioning information to estimate the number of rows that will be returned for use in estimating
    // the large side table for hashjoins.
    if (estimateRowCounts)
    {
        RowEstimator rowEstimator;
        fEstimatedRows = rowEstimator.estimateRows(cpColVec, scanFlags, dbrm, fOid);
    }
}
864
// Launch the single producer thread that sends primitive messages to
// PrimProc; the pool handle is kept so the step can later join it.
void TupleBPS::startPrimitiveThread()
{
    pThread = jobstepThreadPool.invoke(TupleBPSPrimitive(this));
}
869
// Start one more receive/aggregation thread, up to fMaxNumThreads. Threads
// are started lazily, one per call, rather than all up front.
void TupleBPS::startAggregationThread()
{
    // This block of code starts all threads up front
    // fMaxNumThreads = 1;
    // fNumThreads = fMaxNumThreads;
    // for (uint32_t i = 0; i < fMaxNumThreads; i++)
    //     fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, i)));

    // This block of code starts one thread at a time
    if (fNumThreads >= fMaxNumThreads)
        return;

    // NOTE(review): fNumThreads is read-modify-written without a visible
    // lock here; presumably callers serialize this — confirm.
    fNumThreads++;
    fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads - 1)));
}
885
serializeJoiner()886 void TupleBPS::serializeJoiner()
887 {
888 ByteStream bs;
889 bool more = true;
890
891 /* false from nextJoinerMsg means it's the last msg,
892 it's not exactly the exit condition*/
893 while (more)
894 {
895 {
896 // code block to release the lock immediatly
897 boost::mutex::scoped_lock lk(serializeJoinerMutex);
898 more = fBPP->nextTupleJoinerMsg(bs);
899 }
900 #ifdef JLF_DEBUG
901 cout << "serializing joiner into " << bs.length() << " bytes" << endl;
902 #endif
903 fDec->write(uniqueID, bs);
904 bs.restart();
905 }
906
907 }
908
serializeJoiner(uint32_t conn)909 void TupleBPS::serializeJoiner(uint32_t conn)
910 {
911 // We need this lock for TupleBPS::serializeJoiner()
912 boost::mutex::scoped_lock lk(serializeJoinerMutex);
913
914 ByteStream bs;
915 bool more = true;
916
917 /* false from nextJoinerMsg means it's the last msg,
918 it's not exactly the exit condition*/
919 while (more)
920 {
921 more = fBPP->nextTupleJoinerMsg(bs);
922 fDec->write(bs, conn);
923 bs.restart();
924 }
925 }
926
// Combine compile-time CP elimination (scanFlags) with runtime CP flags,
// and refresh min/max for extents that will still be scanned. OIDs < 3000
// are system-catalog objects and are always scanned.
void TupleBPS::prepCasualPartitioning()
{
    uint32_t i;
    int64_t min, max, seq;
    boost::mutex::scoped_lock lk(cpMutex);

    for (i = 0; i < scannedExtents.size(); i++)
    {
        if (fOid >= 3000)
        {
            // An extent is scanned only if both the static CP check and the
            // runtime CP flag agree.
            scanFlags[i] = scanFlags[i] && runtimeCPFlags[i];

            if (scanFlags[i] && lbidList->CasualPartitionDataType(fColType.colDataType,
                    fColType.colWidth))
                lbidList->GetMinMax(min, max, seq, (int64_t) scannedExtents[i].range.start,
                                    &scannedExtents, fColType.colDataType);
        }
        else
            scanFlags[i] = true;
    }
}
948
goodExtentCount()949 bool TupleBPS::goodExtentCount()
950 {
951 uint32_t eCount = extentsMap.begin()->second.size();
952 map<CalpontSystemCatalog::OID, tr1::unordered_map<int64_t, EMEntry> >
953 ::iterator it;
954
955 for (it = extentsMap.begin(); it != extentsMap.end(); ++it)
956 if (it->second.size() != eCount)
957 return false;
958
959 return true;
960 }
961
// Rebuild scannedExtents from extentsMap[fOid] and derive the per-DBRoot
// scan boundaries: lastExtent[dbroot] (index of the last available extent)
// and lastScannedLBID[dbroot] (LBID of the HWM block in that extent). Also
// resets scanFlags/runtimeCPFlags to all-true for the new extent count.
void TupleBPS::initExtentMarkers()
{
    numDBRoots = fRm->getDBRootCount();
    lastExtent.resize(numDBRoots);
    lastScannedLBID.resize(numDBRoots);

    tr1::unordered_map<int64_t, struct BRM::EMEntry>& ref = extentsMap[fOid];
    tr1::unordered_map<int64_t, struct BRM::EMEntry>::iterator it;

    // Map part# and seg# to an extent count per segment file.
    // Part# is 32 hi order bits of key, seg# is 32 lo order bits
    std::tr1::unordered_map<uint64_t, int> extentCountPerDbFile;

    scannedExtents.clear();

    for (it = ref.begin(); it != ref.end(); ++it)
    {
        scannedExtents.push_back(it->second);

        //@bug 5322: 0 HWM may not mean full extent if 1 extent in file.
        // Track how many extents are in each segment file
        if (fExtentsPerSegFile > 1)
        {
            EMEntry& e = it->second;
            uint64_t key = ((uint64_t)e.partitionNum << 32) + e.segmentNum;
            ++extentCountPerDbFile[key];
        }
    }

    // The unordered_map iteration order is arbitrary; restore scan order.
    sort(scannedExtents.begin(), scannedExtents.end(), ExtentSorter());

    numExtents = scannedExtents.size();
    // @1098 initialize scanFlags to be true
    scanFlags.assign(numExtents, true);
    runtimeCPFlags.assign(numExtents, true);

    for (uint32_t i = 0; i < numDBRoots; i++)
        lastExtent[i] = -1;   // -1 = no available extent on this dbroot

    for (uint32_t i = 0; i < scannedExtents.size(); i++)
    {
        uint32_t dbRoot = scannedExtents[i].dbRoot - 1;   // dbRoot is 1-based

        /* Kludge to account for gaps in the dbroot mapping. */
        if (scannedExtents[i].dbRoot > numDBRoots)
        {
            lastExtent.resize(scannedExtents[i].dbRoot);
            lastScannedLBID.resize(scannedExtents[i].dbRoot);

            for (uint32_t z = numDBRoots; z < scannedExtents[i].dbRoot; z++)
                lastExtent[z] = -1;

            numDBRoots = scannedExtents[i].dbRoot;
        }

        // Remember the highest-indexed available extent per dbroot.
        if ((scannedExtents[i].status == EXTENTAVAILABLE) && (lastExtent[dbRoot] < (int) i))
            lastExtent[dbRoot] = i;

        //@bug 5322: 0 HWM may not mean full extent if 1 extent in file.
        // Use special status (EXTENTSTATUSMAX+1) to denote a single extent
        // file with HWM 0; retrieve 1 block and not full extent.
        if ((fExtentsPerSegFile > 1) && (scannedExtents[i].HWM == 0))
        {
            uint64_t key = ((uint64_t)scannedExtents[i].partitionNum << 32) +
                           scannedExtents[i].segmentNum;

            if (extentCountPerDbFile[key] == 1)
                scannedExtents[i].status = EXTENTSTATUSMAX + 1;
        }
    }

    // if only 1 block is written in the last extent, HWM is 0 and didn't get counted.
    for (uint32_t i = 0; i < numDBRoots; i++)
    {
        if (lastExtent[i] != -1)
            lastScannedLBID[i] = scannedExtents[lastExtent[i]].range.start +
                                 (scannedExtents[lastExtent[i]].HWM - scannedExtents[lastExtent[i]].blockOffset);
        else
            lastScannedLBID[i] = -1;
    }
}
1043
// Refresh all extent information after the extent map changed underneath us
// (detected via goodExtentCount() in run()).
void TupleBPS::reloadExtentLists()
{
    /*
     * Iterate over each ColumnCommand instance
     *
     * 1) reload its extent array
     * 2) update TupleBPS's extent array
     * 3) update vars dependent on the extent layout (lastExtent, scanFlags, etc)
     */

    uint32_t i, j;
    ColumnCommandJL* cc;
    vector<SCommand>& filters = fBPP->getFilterSteps();
    vector<SCommand>& projections = fBPP->getProjectSteps();
    uint32_t oid;

    /* To reduce the race, make all CC's get new extents as close together
     * as possible, then rebuild the local copies.
     */

    // Phase 1: every ColumnCommand refetches its extents from the BRM.
    for (i = 0; i < filters.size(); i++)
    {
        cc = dynamic_cast<ColumnCommandJL*>(filters[i].get());

        if (cc != NULL)
            cc->reloadExtents();
    }

    for (i = 0; i < projections.size(); i++)
    {
        cc = dynamic_cast<ColumnCommandJL*>(projections[i].get());

        if (cc != NULL)
            cc->reloadExtents();
    }

    // Phase 2: rebuild extentsMap (start-LBID -> EMEntry per OID) from the
    // freshly loaded extent arrays.
    extentsMap.clear();

    for (i = 0; i < filters.size(); i++)
    {
        cc = dynamic_cast<ColumnCommandJL*>(filters[i].get());

        if (cc == NULL)
            continue;

        const vector<EMEntry>& extents = cc->getExtents();
        oid = cc->getOID();

        extentsMap[oid] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
        tr1::unordered_map<int64_t, struct BRM::EMEntry>& mref = extentsMap[oid];

        for (j = 0; j < extents.size(); j++)
            mref[extents[j].range.start] = extents[j];
    }

    for (i = 0; i < projections.size(); i++)
    {
        cc = dynamic_cast<ColumnCommandJL*>(projections[i].get());

        if (cc == NULL)
            continue;

        const vector<EMEntry>& extents = cc->getExtents();
        oid = cc->getOID();

        extentsMap[oid] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
        tr1::unordered_map<int64_t, struct BRM::EMEntry>& mref = extentsMap[oid];

        for (j = 0; j < extents.size(); j++)
            mref[extents[j].range.start] = extents[j];
    }

    // Phase 3: recompute everything derived from the extent layout.
    initExtentMarkers();
}
1118
run()1119 void TupleBPS::run()
1120 {
1121 uint32_t i;
1122 boost::mutex::scoped_lock lk(jlLock);
1123 uint32_t retryCounter = 0;
1124 const uint32_t retryMax = 1000; // 50s max; we've seen a 15s window so 50s should be 'safe'
1125 const uint32_t waitInterval = 50000; // in us
1126
1127 if (fRunExecuted)
1128 return;
1129
1130 fRunExecuted = true;
1131
1132 // make sure each numeric column has the same # of extents! See bugs 4564 and 3607
1133 try
1134 {
1135 while (!goodExtentCount() && retryCounter++ < retryMax)
1136 {
1137 usleep(waitInterval);
1138 reloadExtentLists();
1139 }
1140 }
1141 catch (std::exception& e)
1142 {
1143 ostringstream os;
1144 os << "TupleBPS: Could not get a consistent extent count for each column. Got '"
1145 << e.what() << "'\n";
1146 catchHandler(os.str(), ERR_TUPLE_BPS, fErrorInfo, fSessionId);
1147 fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
1148 return;
1149 }
1150
1151 if (retryCounter == retryMax)
1152 {
1153 catchHandler("TupleBPS: Could not get a consistent extent count for each column.",
1154 ERR_TUPLE_BPS, fErrorInfo, fSessionId);
1155 fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
1156 return;
1157 }
1158
1159 if (traceOn())
1160 {
1161 syslogStartStep(16, // exemgr subsystem
1162 std::string("TupleBPS")); // step name
1163 }
1164
1165 ByteStream bs;
1166
1167 if (fDelivery)
1168 {
1169 deliveryDL.reset(new RowGroupDL(1, 5));
1170 deliveryIt = deliveryDL->getIterator();
1171 }
1172
1173 fBPP->setThreadCount(fMaxNumThreads);
1174
1175 if (doJoin)
1176 for (i = 0; i < smallSideCount; i++)
1177 tjoiners[i]->setThreadCount(fMaxNumThreads);
1178
1179 if (fe1)
1180 fBPP->setFEGroup1(fe1, fe1Input);
1181
1182 if (fe2 && runFEonPM)
1183 fBPP->setFEGroup2(fe2, fe2Output);
1184
1185 if (fe2)
1186 {
1187 primRowGroup.initRow(&fe2InRow);
1188 fe2Output.initRow(&fe2OutRow);
1189 }
1190
1191 try
1192 {
1193 fDec->addDECEventListener(this);
1194 fBPP->priority(priority());
1195 fBPP->createBPP(bs);
1196 fDec->write(uniqueID, bs);
1197 BPPIsAllocated = true;
1198
1199 if (doJoin && tjoiners[0]->inPM())
1200 serializeJoiner();
1201
1202 prepCasualPartitioning();
1203 startPrimitiveThread();
1204 fProducerThreads.clear();
1205 fProducerThreads.reserve(fMaxNumThreads);
1206 startAggregationThread();
1207 }
1208 catch (...)
1209 {
1210 handleException(std::current_exception(),
1211 logging::ERR_TUPLE_BPS,
1212 logging::ERR_ALWAYS_CRITICAL,
1213 "TupleBPS::run()");
1214 fOutputJobStepAssociation.outAt(0)->rowGroupDL()->endOfInput();
1215 }
1216 }
1217
/** @brief Wait for this step's worker threads to finish and tear down the BPP.
 *
 * Idempotent (guarded by joinRan under jlLock).  Only does work if run()
 * actually executed.  Joins the primitive-send thread and all aggregation
 * threads, then destroys the BPP on the PMs and deregisters from the DEC.
 */
void TupleBPS::join()
{
    boost::mutex::scoped_lock lk(jlLock);

    if (joinRan)
        return;

    joinRan = true;

    if (fRunExecuted)
    {
        if (msgsRecvd < msgsSent)
        {
            // wake up the sending thread, it should drain the input dl and exit
            boost::unique_lock<boost::mutex> tplLock(tplMutex);
            condvarWakeupProducer.notify_all();
            tplLock.unlock();
        }

        if (pThread)
            jobstepThreadPool.join(pThread);

        jobstepThreadPool.join(fProducerThreads);

        if (BPPIsAllocated)
        {
            // Tell the PMs to release the BPP; listener must be removed first
            // so we stop reacting to DEC events during teardown.
            ByteStream bs;
            fDec->removeDECEventListener(this);
            fBPP->destroyBPP(bs);

            try
            {
                fDec->write(uniqueID, bs);
            }
            catch (...)
            {
                // Teardown failure is logged but must not prevent local cleanup.
                handleException(std::current_exception(),
                                logging::ERR_TUPLE_BPS,
                                logging::ERR_ALWAYS_CRITICAL,
                                "TupleBPS::join()");
            }

            BPPIsAllocated = false;
            fDec->removeQueue(uniqueID);
            tjoiners.clear();
        }
    }
}
1266
sendError(uint16_t status)1267 void TupleBPS::sendError(uint16_t status)
1268 {
1269 ByteStream msgBpp;
1270 fBPP->setCount(1);
1271 fBPP->setStatus(status);
1272 fBPP->runErrorBPP(msgBpp);
1273
1274 try
1275 {
1276 fDec->write(uniqueID, msgBpp);
1277 }
1278 catch (...)
1279 {
1280 // this fcn is only called in exception handlers
1281 // let the first error take precedence
1282 }
1283
1284 fBPP->reset();
1285 finishedSending = true;
1286 condvar.notify_all();
1287 condvarWakeupProducer.notify_all();
1288 }
1289
nextBand(ByteStream & bs)1290 uint32_t TupleBPS::nextBand(ByteStream& bs)
1291 {
1292 bool more = true;
1293 RowGroup& realOutputRG = (fe2 ? fe2Output : primRowGroup);
1294 RGData rgData;
1295 uint32_t rowCount = 0;
1296
1297 bs.restart();
1298
1299 while (rowCount == 0 && more)
1300 {
1301 more = deliveryDL->next(deliveryIt, &rgData);
1302
1303 if (!more)
1304 rgData = fBPP->getErrorRowGroupData(status());
1305
1306 realOutputRG.setData(&rgData);
1307 rowCount = realOutputRG.getRowCount();
1308
1309 if ((more && rowCount > 0) || !more)
1310 realOutputRG.serializeRGData(bs);
1311 }
1312
1313 return rowCount;
1314 }
1315
1316 /* The current interleaving rotates over PMs, clustering jobs for a single PM
1317 * by dbroot to keep block accesses adjacent & make best use of prefetching &
1318 * cache.
1319 */
interleaveJobs(vector<Job> * jobs) const1320 void TupleBPS::interleaveJobs(vector<Job>* jobs) const
1321 {
1322 vector<Job> newJobs;
1323 uint32_t i;
1324 uint32_t pmCount = 0;
1325 scoped_array<deque<Job> > bins;
1326
1327 // the input is grouped by dbroot
1328
1329 if (pmCount == 1)
1330 return;
1331
1332 /* Need to get the 'real' PM count */
1333 for (i = 0; i < jobs->size(); i++)
1334 if (pmCount < (*jobs)[i].connectionNum + 1)
1335 pmCount = (*jobs)[i].connectionNum + 1;
1336
1337 bins.reset(new deque<Job>[pmCount]);
1338
1339 // group by connection number
1340 for (i = 0; i < jobs->size(); i++)
1341 bins[(*jobs)[i].connectionNum].push_back((*jobs)[i]);
1342
1343 // interleave by connection num
1344 bool noWorkDone;
1345
1346 while (newJobs.size() < jobs->size())
1347 {
1348 noWorkDone = true;
1349
1350 for (i = 0; i < pmCount; i++)
1351 {
1352 if (!bins[i].empty())
1353 {
1354 newJobs.push_back(bins[i].front());
1355 bins[i].pop_front();
1356 noWorkDone = false;
1357 }
1358 }
1359
1360 idbassert(!noWorkDone);
1361 }
1362
1363 #if 0
1364 /* Work in progress */
1365 // up to this point, only the first connection to a PM is used.
1366 // the last step is to round-robin on the connections of each PM.
1367 // ex: on a 3 PM system where connections per PM is 2,
1368 // connections 0 and 3 are for PM 1, 1 and 4 are PM2, 2 and 5 are PM3.
1369 uint32_t* jobCounters = (uint32_t*) alloca(pmCount * sizeof(uint32_t));
1370 memset(jobCounters, 0, pmCount * sizeof(uint32_t));
1371
1372 for (i = 0; i < newJobs.size(); i++)
1373 {
1374 uint32_t& conn = newJobs[i].connectionNum; // for readability's sake
1375 conn = conn + (pmCount * jobCounters[conn]);
1376 jobCounters[conn]++;
1377 }
1378
1379 #endif
1380
1381 jobs->swap(newJobs);
1382 // cout << "-------------\n";
1383 // for (i = 0; i < jobs->size(); i++)
1384 // cout << "job " << i+1 << ": dbroot " << (*jobs)[i].dbroot << ", PM "
1385 // << (*jobs)[i].connectionNum + 1 << endl;
1386 }
1387
/** @brief Ship each prepared job message to the PMs with flow control.
 *
 * For every job: write its message through the DEC, bump msgsSent by the
 * expected response count, wake any receiver waiting for messages, then
 * block while the number of outstanding responses exceeds the throttle
 * window (fMaxOutstandingRequests scaled to logical-block units).  The
 * receive side wakes us via condvarWakeupProducer as it drains responses.
 * Stops early if the step is cancelled or fDie is set.
 */
void TupleBPS::sendJobs(const vector<Job>& jobs)
{
    uint32_t i;
    boost::unique_lock<boost::mutex> tplLock(tplMutex, boost::defer_lock);

    for (i = 0; i < jobs.size() && !cancelled(); i++)
    {
        // The DEC write happens outside the lock; only the shared counters
        // and condvars are protected by tplMutex.
        fDec->write(uniqueID, *(jobs[i].msg));
        tplLock.lock();
        msgsSent += jobs[i].expectedResponses;

        if (recvWaiting)
            condvar.notify_all();

        // Throttle: wait while too many responses are outstanding.
        while ((msgsSent - msgsRecvd > fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER)
                && !fDie)
        {
            sendWaiting = true;
            condvarWakeupProducer.wait(tplLock);
            sendWaiting = false;
        }

        tplLock.unlock();
    }
}
1413
compareSingleValue(uint8_t COP,int64_t val1,int64_t val2) const1414 bool TupleBPS::compareSingleValue(uint8_t COP, int64_t val1, int64_t val2) const
1415 {
1416 switch (COP)
1417 {
1418 case COMPARE_LT:
1419 case COMPARE_NGE:
1420 return (val1 < val2);
1421
1422 case COMPARE_LE:
1423 case COMPARE_NGT:
1424 return (val1 <= val2);
1425
1426 case COMPARE_GT:
1427 case COMPARE_NLE:
1428 return (val1 > val2);
1429
1430 case COMPARE_GE:
1431 case COMPARE_NLT:
1432 return (val1 >= val2);
1433
1434 case COMPARE_EQ:
1435 return (val1 == val2);
1436
1437 case COMPARE_NE:
1438 return (val1 != val2);
1439 }
1440
1441 return false;
1442 }
1443
1444 /* (range COP val) comparisons */
compareRange(uint8_t COP,int64_t min,int64_t max,int64_t val) const1445 bool TupleBPS::compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const
1446 {
1447 switch (COP)
1448 {
1449 case COMPARE_LT:
1450 case COMPARE_NGE:
1451 return (min < val);
1452
1453 case COMPARE_LE:
1454 case COMPARE_NGT:
1455 return (min <= val);
1456
1457 case COMPARE_GT:
1458 case COMPARE_NLE:
1459 return (max > val);
1460
1461 case COMPARE_GE:
1462 case COMPARE_NLT:
1463 return (max >= val);
1464
1465 case COMPARE_EQ: // an 'in' comparison
1466 return (val >= min && val <= max);
1467
1468 case COMPARE_NE: // 'not in'
1469 return (val < min || val > max);
1470 }
1471
1472 return false;
1473 }
1474
processSingleFilterString_ranged(int8_t BOP,int8_t colWidth,int64_t min,int64_t max,const uint8_t * filterString,uint32_t filterCount) const1475 bool TupleBPS::processSingleFilterString_ranged(int8_t BOP, int8_t colWidth, int64_t min, int64_t max, const uint8_t* filterString,
1476 uint32_t filterCount) const
1477 {
1478 uint j;
1479 bool ret = true;
1480
1481 for (j = 0; j < filterCount; j++)
1482 {
1483 int8_t COP;
1484 int64_t val2;
1485 bool thisPredicate;
1486 COP = *filterString++;
1487 filterString++; // skip the round var, don't think that applies here
1488
1489 switch (colWidth)
1490 {
1491 case 1:
1492 val2 = *((int8_t*) filterString);
1493 filterString++;
1494 break;
1495
1496 case 2:
1497 val2 = *((int16_t*) filterString);
1498 filterString += 2;
1499 break;
1500
1501 case 4:
1502 val2 = *((int32_t*) filterString);
1503 filterString += 4;
1504 break;
1505
1506 case 8:
1507 val2 = *((int64_t*) filterString);
1508 filterString += 8;
1509 break;
1510
1511 default:
1512 throw logic_error("invalid column width");
1513 }
1514
1515 thisPredicate = compareRange(COP, min, max, val2);
1516
1517 if (j == 0)
1518 ret = thisPredicate;
1519
1520 if (BOP == BOP_OR && thisPredicate)
1521 return true;
1522 else if (BOP == BOP_AND && !thisPredicate)
1523 return false;
1524 }
1525
1526 return ret;
1527 }
1528
processSingleFilterString(int8_t BOP,int8_t colWidth,int64_t val,const uint8_t * filterString,uint32_t filterCount) const1529 bool TupleBPS::processSingleFilterString(int8_t BOP, int8_t colWidth, int64_t val, const uint8_t* filterString,
1530 uint32_t filterCount) const
1531 {
1532 uint j;
1533 bool ret = true;
1534
1535 for (j = 0; j < filterCount; j++)
1536 {
1537 int8_t COP;
1538 int64_t val2;
1539 bool thisPredicate;
1540 COP = *filterString++;
1541 filterString++; // skip the round var, don't think that applies here
1542
1543 switch (colWidth)
1544 {
1545 case 1:
1546 val2 = *((int8_t*) filterString);
1547 filterString++;
1548 break;
1549
1550 case 2:
1551 val2 = *((int16_t*) filterString);
1552 filterString += 2;
1553 break;
1554
1555 case 4:
1556 val2 = *((int32_t*) filterString);
1557 filterString += 4;
1558 break;
1559
1560 case 8:
1561 val2 = *((int64_t*) filterString);
1562 filterString += 8;
1563 break;
1564
1565 default:
1566 throw logic_error("invalid column width");
1567 }
1568
1569 thisPredicate = compareSingleValue(COP, val, val2);
1570
1571 if (j == 0)
1572 ret = thisPredicate;
1573
1574 if (BOP == BOP_OR && thisPredicate)
1575 return true;
1576 else if (BOP == BOP_AND && !thisPredicate)
1577 return false;
1578 }
1579
1580 return ret;
1581 }
1582
/** @brief Evaluate all pseudocolumn filters of one function @a type against @a value.
 *
 * Scans the filter steps for PseudoCCJL commands whose function matches
 * @a type, evaluates each one's packed filter string against @a value, and
 * combines the per-command results.  Note the two combination levels: WITHIN
 * one command the command's own BOP is used, but ACROSS commands the
 * TupleBPS member 'bop' is used — see the short-circuit tests below.
 * Returns true if no matching filter is found (i.e. nothing rules it out).
 */
bool TupleBPS::processOneFilterType(int8_t colWidth, int64_t value, uint32_t type) const
{
    const vector<SCommand>& filters = fBPP->getFilterSteps();
    uint i;
    bool ret = true;
    bool firstPseudo = true;

    for (i = 0; i < filters.size(); i++)
    {
        PseudoCCJL* pseudo = dynamic_cast<PseudoCCJL*>(filters[i].get());

        if (!pseudo || pseudo->getFunction() != type)
            continue;

        int8_t BOP = pseudo->getBOP();   // I think this is the same as TupleBPS's bop var...?

        /* 1-byte COP, 1-byte 'round', colWidth-bytes value */
        const uint8_t* filterString = pseudo->getFilterString().buf();
        uint32_t filterCount = pseudo->getFilterCount();
        bool thisPredicate = processSingleFilterString(BOP, colWidth, value, filterString, filterCount);

        // Seed the result from the first matching command in case no
        // short-circuit below fires.
        if (firstPseudo)
        {
            firstPseudo = false;
            ret = thisPredicate;
        }

        // NOTE(review): cross-command combination uses the member 'bop',
        // not the command-local BOP above — presumed intentional; confirm.
        if (bop == BOP_OR && thisPredicate)
            return true;
        else if (bop == BOP_AND && !thisPredicate)
            return false;
    }

    return ret;
}
1618
/** @brief Evaluate PSEUDO_BLOCKID filters against an extent's LBID range.
 *
 * Computes the extent's [firstLBID, lastLBID] span (range.size appears to be
 * in units of 1024 blocks, hence the * 1024) and tests it against every
 * PSEUDO_BLOCKID filter using the ranged comparator.  Combination across
 * commands uses the member 'bop', mirroring processOneFilterType().
 * Returns true if no blockid filter is present.
 */
bool TupleBPS::processLBIDFilter(const EMEntry& emEntry) const
{
    const vector<SCommand>& filters = fBPP->getFilterSteps();
    uint i;
    bool ret = true;
    bool firstPseudo = true;
    LBID_t firstLBID = emEntry.range.start;
    LBID_t lastLBID = firstLBID + (emEntry.range.size * 1024) - 1;

    for (i = 0; i < filters.size(); i++)
    {
        PseudoCCJL* pseudo = dynamic_cast<PseudoCCJL*>(filters[i].get());

        if (!pseudo || pseudo->getFunction() != PSEUDO_BLOCKID)
            continue;

        int8_t BOP = pseudo->getBOP();   // I think this is the same as TupleBPS's bop var...?

        /* 1-byte COP, 1-byte 'round', colWidth-bytes value */
        const uint8_t* filterString = pseudo->getFilterString().buf();
        uint32_t filterCount = pseudo->getFilterCount();
        // LBIDs are compared as 8-byte values against the extent's range.
        bool thisPredicate = processSingleFilterString_ranged(BOP, 8,
                             firstLBID, lastLBID, filterString, filterCount);

        // Seed the result from the first matching command in case no
        // short-circuit below fires.
        if (firstPseudo)
        {
            firstPseudo = false;
            ret = thisPredicate;
        }

        if (bop == BOP_OR && thisPredicate)
            return true;
        else if (bop == BOP_AND && !thisPredicate)
            return false;
    }

    return ret;
}
1657
/** @brief Decide whether an extent survives all pseudocolumn filters.
 *
 * Returns true if the extent at @a extentIndex should be scanned.  With
 * bop == BOP_AND every present filter type must pass (absent filters pass
 * vacuously); otherwise (OR) any one passing filter is sufficient.  The
 * extent-min/max filters are only applied when the extent's casual
 * partitioning range is marked CP_VALID — AND treats an invalid range as a
 * pass, OR treats it as a fail, preserving the respective identities.
 */
bool TupleBPS::processPseudoColFilters(uint32_t extentIndex, boost::shared_ptr<map<int, int> > dbRootPMMap) const
{
    // Fast path: no pseudocolumn filter at all means nothing can rule it out.
    if (!hasPCFilter)
        return true;

    const EMEntry& emEntry = scannedExtents[extentIndex];

    if (bop == BOP_AND)
    {
        /* All Pseudocolumns have been promoted to 8-bytes except the casual partitioning filters */
        return (!hasPMFilter || processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM))
               && (!hasSegmentFilter || processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT))
               && (!hasDBRootFilter || processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT))
               && (!hasSegmentDirFilter || processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR))
               && (!hasExtentIDFilter || processOneFilterType(8, emEntry.range.start, PSEUDO_EXTENTID))
               && (!hasMaxFilter || (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
                                     processOneFilterType(emEntry.range.size, emEntry.partition.cprange.hi_val, PSEUDO_EXTENTMAX) : true))
               && (!hasMinFilter || (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
                                     processOneFilterType(emEntry.range.size, emEntry.partition.cprange.lo_val, PSEUDO_EXTENTMIN) : true))
               && (!hasLBIDFilter || processLBIDFilter(emEntry))
               ;
    }
    else
    {
        return (hasPMFilter && processOneFilterType(8, (*dbRootPMMap)[emEntry.dbRoot], PSEUDO_PM))
               || (hasSegmentFilter && processOneFilterType(8, emEntry.segmentNum, PSEUDO_SEGMENT))
               || (hasDBRootFilter && processOneFilterType(8, emEntry.dbRoot, PSEUDO_DBROOT))
               || (hasSegmentDirFilter && processOneFilterType(8, emEntry.partitionNum, PSEUDO_SEGMENTDIR))
               || (hasExtentIDFilter && processOneFilterType(8, emEntry.range.start, PSEUDO_EXTENTID))
               || (hasMaxFilter && (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
                                    processOneFilterType(emEntry.range.size, emEntry.partition.cprange.hi_val, PSEUDO_EXTENTMAX) : false))
               || (hasMinFilter && (emEntry.partition.cprange.isValid == BRM::CP_VALID ?
                                    processOneFilterType(emEntry.range.size, emEntry.partition.cprange.lo_val, PSEUDO_EXTENTMIN) : false))
               || (hasLBIDFilter && processLBIDFilter(emEntry))
               ;
    }
}
1695
1696
/** @brief Build the list of scan jobs (one per block chunk) for every extent.
 *
 * For each scanned extent: compute how many LBIDs/blocks have data, skip
 * extents eliminated by casual partitioning, dbroot bounds, pseudocolumn
 * filters, or local-query routing, then slice the extent into jobs of up to
 * blocksPerJob logical blocks, each serialized as a runBPP message targeted
 * at the connection owning the extent's dbroot.  Also accumulates totalMsgs
 * and fNumBlksSkipped.
 * @throws IDBExcept on local-query misconfiguration or an offline dbroot.
 */
void TupleBPS::makeJobs(vector<Job>* jobs)
{
    boost::shared_ptr<ByteStream> bs;
    uint32_t i;
    uint32_t lbidsToScan;
    uint32_t blocksToScan;
    uint32_t blocksPerJob;
    LBID_t startingLBID;
    oam::OamCache* oamCache = oam::OamCache::makeOamCache();
    boost::shared_ptr<map<int, int> > dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
    boost::shared_ptr<map<int, int> > dbRootPMMap = oamCache->getDBRootToPMMap();
    int localPMId = oamCache->getLocalPMId();

    idbassert(ffirstStepType == SCAN);

    // Casual partitioning info only helps when filters are AND-combined.
    if (fOid >= 3000 && bop == BOP_AND)
        storeCasualPartitionInfo(false);

    totalMsgs = 0;

    for (i = 0; i < scannedExtents.size(); i++)
    {

        // the # of LBIDs to scan in this extent, if it will be scanned.
        //@bug 5322: status EXTENTSTATUSMAX+1 means single block extent.
        if ((scannedExtents[i].HWM == 0) &&
                ((int) i < lastExtent[scannedExtents[i].dbRoot - 1]) &&
                (scannedExtents[i].status <= EXTENTSTATUSMAX))
            lbidsToScan = scannedExtents[i].range.size * 1024;
        else
            lbidsToScan = scannedExtents[i].HWM - scannedExtents[i].blockOffset + 1;

        // skip this extent if CP data rules it out or the scan has already passed
        // the last extent for that DBRoot (import may be adding extents that shouldn't
        // be read yet).  Also skip if there's a pseudocolumn with a filter that would
        // eliminate this extent

        bool inBounds = ((int)i <= lastExtent[scannedExtents[i].dbRoot - 1]);

        if (!inBounds)
        {
            continue;
        }

        if (!scanFlags[i])
        {
            // Eliminated by casual partitioning; count the skipped blocks.
            fNumBlksSkipped += lbidsToScan;
            continue;
        }

        if (!processPseudoColFilters(i, dbRootPMMap))
        {
            // Eliminated by a pseudocolumn filter; count the skipped blocks.
            fNumBlksSkipped += lbidsToScan;
            continue;
        }

        //if (!scanFlags[i] || !inBounds)
        //	continue;

        /* Figure out many blocks have data in this extent
         * Calc how many jobs to issue,
         * Get the dbroot,
         * construct the job msgs
         */

        // Bug5741 If we are local only and this doesn't belongs to us, skip it
        if (fLocalQuery == execplan::CalpontSelectExecutionPlan::LOCAL_QUERY)
        {
            if (localPMId == 0)
            {
                throw IDBExcept(ERR_LOCAL_QUERY_UM);
            }

            if (dbRootPMMap->find(scannedExtents[i].dbRoot)->second != localPMId)
                continue;
        }

        // a necessary DB root is offline
        if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end())
        {
            // MCOL-259 force a reload of the xml. This usualy fixes it.
            Logger log;
            log.logMessage(logging::LOG_TYPE_WARNING, "forcing reload of columnstore.xml for dbRootConnectionMap");
            oamCache->forceReload();
            dbRootConnectionMap = oamCache->getDBRootToConnectionMap();

            if (dbRootConnectionMap->find(scannedExtents[i].dbRoot) == dbRootConnectionMap->end())
            {
                log.logMessage(logging::LOG_TYPE_WARNING, "dbroot still not in dbRootConnectionMap");
                throw IDBExcept(ERR_DATA_OFFLINE);
            }
        }


        // the # of logical blocks in this extent (round up on a partial block)
        if (lbidsToScan % fColType.colWidth)
            blocksToScan = lbidsToScan / fColType.colWidth + 1;
        else
            blocksToScan = lbidsToScan / fColType.colWidth;

        totalMsgs += blocksToScan;

        // how many logical blocks to process with a single job (& single thread on the PM)
#if defined(_MSC_VER) && BOOST_VERSION < 105200
        blocksPerJob = max(blocksToScan / fProcessorThreadsPerScan, 16UL);
#else
        blocksPerJob = max(blocksToScan / fProcessorThreadsPerScan, 16U);
#endif

        startingLBID = scannedExtents[i].range.start;

        // Slice the extent into jobs; fBPP is reused as a message builder,
        // hence the reset() after each serialized job.
        while (blocksToScan > 0)
        {
            uint32_t blocksThisJob = min(blocksToScan, blocksPerJob);

            fBPP->setLBID(startingLBID, scannedExtents[i]);
            fBPP->setCount(blocksThisJob);
            bs.reset(new ByteStream());
            fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot]);
            jobs->push_back(Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot],
                                blocksThisJob, bs));
            blocksToScan -= blocksThisJob;
            startingLBID += fColType.colWidth * blocksThisJob;
            fBPP->reset();
        }
    }

}
1825
sendPrimitiveMessages()1826 void TupleBPS::sendPrimitiveMessages()
1827 {
1828 vector<Job> jobs;
1829
1830 idbassert(ffirstStepType == SCAN);
1831
1832 if (cancelled())
1833 goto abort;
1834
1835 try
1836 {
1837 makeJobs(&jobs);
1838 interleaveJobs(&jobs);
1839 sendJobs(jobs);
1840 }
1841 catch (...)
1842 {
1843 sendError(logging::ERR_TUPLE_BPS);
1844 handleException(std::current_exception(),
1845 logging::ERR_TUPLE_BPS,
1846 logging::ERR_ALWAYS_CRITICAL,
1847 "st: " + std::to_string(fStepId) +
1848 " TupleBPS::sendPrimitiveMessages()");
1849 abort_nolock();
1850 }
1851
1852 abort:
1853 boost::unique_lock<boost::mutex> tplLock(tplMutex);
1854 finishedSending = true;
1855 condvar.notify_all();
1856 tplLock.unlock();
1857 }
1858
/* Per-extent casual-partitioning record gathered from PM responses in
 * receiveMultiPrimitiveMessages(): the observed min/max value range for the
 * extent starting at LBID, plus whether that range is valid/trustworthy. */
struct _CPInfo
{
    _CPInfo(int64_t MIN, int64_t MAX, uint64_t l, bool val) : min(MIN), max(MAX), LBID(l), valid(val) { };
    int64_t min;     // smallest value seen in the extent
    int64_t max;     // largest value seen in the extent
    uint64_t LBID;   // first LBID of the extent this range describes
    bool valid;      // false => min/max cannot be relied upon
};
1867
receiveMultiPrimitiveMessages(uint32_t threadID)1868 void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
1869 {
1870 AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
1871 RowGroupDL* dlp = (fDelivery ? deliveryDL.get() : dl->rowGroupDL());
1872
1873 uint32_t size = 0;
1874 vector<boost::shared_ptr<ByteStream> > bsv;
1875
1876 RGData rgData;
1877 vector<RGData> rgDatav;
1878 vector<RGData> fromPrimProc;
1879
1880 bool validCPData;
1881 int64_t min;
1882 int64_t max;
1883 uint64_t lbid;
1884 vector<_CPInfo> cpv;
1885 uint32_t cachedIO;
1886 uint32_t physIO;
1887 uint32_t touchedBlocks;
1888 uint32_t cachedIO_Thread = 0;
1889 uint32_t physIO_Thread = 0;
1890 uint32_t touchedBlocks_Thread = 0;
1891 int64_t ridsReturned_Thread = 0;
1892 bool lastThread = false;
1893 uint32_t i, j, k;
1894 RowGroup local_primRG = primRowGroup;
1895 RowGroup local_outputRG = outputRowGroup;
1896 bool didEOF = false;
1897 bool unused;
1898
1899 /* Join vars */
1900 vector<vector<Row::Pointer> > joinerOutput; // clean usage
1901 Row largeSideRow, joinedBaseRow, largeNull, joinFERow; // LSR clean
1902 scoped_array<Row> smallSideRows, smallNulls;
1903 scoped_array<uint8_t> joinedBaseRowData;
1904 scoped_array<uint8_t> joinFERowData;
1905 shared_array<int> largeMapping;
1906 vector<shared_array<int> > smallMappings;
1907 vector<shared_array<int> > fergMappings;
1908 RGData joinedData;
1909 scoped_array<uint8_t> largeNullMemory;
1910 scoped_array<shared_array<uint8_t> > smallNullMemory;
1911 uint32_t matchCount;
1912
1913 /* Thread-scoped F&E 2 var */
1914 Row postJoinRow; // postJoinRow is also used for joins
1915 RowGroup local_fe2Output;
1916 RGData local_fe2Data;
1917 Row local_fe2OutRow;
1918 funcexp::FuncExpWrapper local_fe2;
1919 StepTeleStats sts;
1920 sts.query_uuid = fQueryUuid;
1921 sts.step_uuid = fStepUuid;
1922 boost::unique_lock<boost::mutex> tplLock(tplMutex, boost::defer_lock);
1923
1924 try
1925 {
1926 if (doJoin || fe2)
1927 {
1928 local_outputRG.initRow(&postJoinRow);
1929 }
1930
1931 if (fe2)
1932 {
1933 local_fe2Output = fe2Output;
1934 local_fe2Output.initRow(&local_fe2OutRow);
1935 local_fe2Data.reinit(fe2Output);
1936 local_fe2Output.setData(&local_fe2Data);
1937 // local_fe2OutRow = fe2OutRow;
1938 local_fe2 = *fe2;
1939 }
1940
1941 if (doJoin)
1942 {
1943 joinerOutput.resize(smallSideCount);
1944 smallSideRows.reset(new Row[smallSideCount]);
1945 smallNulls.reset(new Row[smallSideCount]);
1946 smallMappings.resize(smallSideCount);
1947 fergMappings.resize(smallSideCount + 1);
1948 smallNullMemory.reset(new shared_array<uint8_t>[smallSideCount]);
1949 local_primRG.initRow(&largeSideRow);
1950 local_outputRG.initRow(&joinedBaseRow, true);
1951 joinedBaseRowData.reset(new uint8_t[joinedBaseRow.getSize()]);
1952 joinedBaseRow.setData(joinedBaseRowData.get());
1953 joinedBaseRow.initToNull();
1954 largeMapping = makeMapping(local_primRG, local_outputRG);
1955
1956 bool hasJoinFE = false;
1957
1958 for (i = 0; i < smallSideCount; i++)
1959 {
1960 joinerMatchesRGs[i].initRow(&smallSideRows[i]);
1961 smallMappings[i] = makeMapping(joinerMatchesRGs[i], local_outputRG);
1962
1963 // if (tjoiners[i]->semiJoin() || tjoiners[i]->antiJoin()) {
1964 if (tjoiners[i]->hasFEFilter())
1965 {
1966 fergMappings[i] = makeMapping(joinerMatchesRGs[i], joinFERG);
1967 hasJoinFE = true;
1968 }
1969
1970 // }
1971 }
1972
1973 if (hasJoinFE)
1974 {
1975 joinFERG.initRow(&joinFERow, true);
1976 joinFERowData.reset(new uint8_t[joinFERow.getSize()]);
1977 memset(joinFERowData.get(), 0, joinFERow.getSize());
1978 joinFERow.setData(joinFERowData.get());
1979 fergMappings[smallSideCount] = makeMapping(local_primRG, joinFERG);
1980 }
1981
1982 for (i = 0; i < smallSideCount; i++)
1983 {
1984 joinerMatchesRGs[i].initRow(&smallNulls[i], true);
1985 smallNullMemory[i].reset(new uint8_t[smallNulls[i].getSize()]);
1986 smallNulls[i].setData(smallNullMemory[i].get());
1987 smallNulls[i].initToNull();
1988 }
1989
1990 local_primRG.initRow(&largeNull, true);
1991 largeNullMemory.reset(new uint8_t[largeNull.getSize()]);
1992 largeNull.setData(largeNullMemory.get());
1993 largeNull.initToNull();
1994
1995 #if 0
1996
1997 if (threadID == 0)
1998 {
1999 /* Some rowgroup debugging stuff. */
2000 uint8_t* tmp8;
2001 tmp8 = local_primRG.getData();
2002 local_primRG.setData(NULL);
2003 cout << "large-side RG: " << local_primRG.toString() << endl;
2004 local_primRG.setData(tmp8);
2005
2006 for (i = 0; i < smallSideCount; i++)
2007 {
2008 tmp8 = joinerMatchesRGs[i].getData();
2009 joinerMatchesRGs[i].setData(NULL);
2010 cout << "small-side[" << i << "] RG: " << joinerMatchesRGs[i].toString() << endl;
2011 }
2012
2013 tmp8 = local_outputRG.getData();
2014 local_outputRG.setData(NULL);
2015 cout << "output RG: " << local_outputRG.toString() << endl;
2016 local_outputRG.setData(tmp8);
2017
2018 cout << "large mapping:\n";
2019
2020 for (i = 0; i < local_primRG.getColumnCount(); i++)
2021 cout << largeMapping[i] << " ";
2022
2023 cout << endl;
2024
2025 for (uint32_t z = 0; z < smallSideCount; z++)
2026 {
2027 cout << "small mapping[" << z << "] :\n";
2028
2029 for (i = 0; i < joinerMatchesRGs[z].getColumnCount(); i++)
2030 cout << smallMappings[z][i] << " ";
2031
2032 cout << endl;
2033 }
2034 }
2035
2036 #endif
2037 }
2038
2039 tplLock.lock();
2040
2041 while (1)
2042 {
2043 // sync with the send side
2044 while (!finishedSending && msgsSent == msgsRecvd)
2045 {
2046 recvWaiting++;
2047 condvar.wait(tplLock);
2048 recvWaiting--;
2049 }
2050
2051 if (msgsSent == msgsRecvd && finishedSending)
2052 break;
2053
2054 bool flowControlOn;
2055 fDec->read_some(uniqueID, fNumThreads, bsv, &flowControlOn);
2056 size = bsv.size();
2057
2058 // @bug 4562
2059 if (traceOn() && fOid >= 3000 && dlTimes.FirstReadTime().tv_sec == 0)
2060 dlTimes.setFirstReadTime();
2061
2062 if (fOid >= 3000 && threadID == 0 && sts.msg_type == StepTeleStats::ST_INVALID && size > 0)
2063 {
2064 sts.msg_type = StepTeleStats::ST_START;
2065 sts.total_units_of_work = totalMsgs;
2066 postStepStartTele(sts);
2067 }
2068
2069 /* This is a simple ramp-up of the TBPS msg processing threads.
2070 One thread is created by run(), and add'l threads are created
2071 as needed. Right now, "as needed" means that flow control
2072 is on, which means that the UM is not keeping up by definition,
2073 or size > 5. We found that using flow control alone was not aggressive
2074 enough when the messages were small. The 'size' parameter checks
2075 the number of msgs waiting in the DEC buffers. Since each
2076 message can be processed independently of the others, they can all
2077 be processed in different threads. In benchmarking we found that
2078 there was no end-to-end performance difference between using 1
2079 and 20 msgs as the threshold. Erring on the side of aggressiveness,
2080 we chose '5'.
2081 '5' still preserves the original goal of not starting MAX threads
2082 for small queries or when the UM can keep up with the PMs with
2083 fewer threads. Tweak as necessary. */
2084
2085 if ((size > 5 || flowControlOn) && fNumThreads < fMaxNumThreads)
2086 startAggregationThread();
2087
2088 for (uint32_t z = 0; z < size; z++)
2089 {
2090 if (bsv[z]->length() > 0 && fBPP->countThisMsg(*(bsv[z])))
2091 ++msgsRecvd;
2092 }
2093
2094 //@Bug 1424,1298
2095
2096 if (sendWaiting && ((msgsSent - msgsRecvd) <=
2097 (fMaxOutstandingRequests << LOGICAL_EXTENT_CONVERTER)))
2098 {
2099 condvarWakeupProducer.notify_one();
2100 THROTTLEDEBUG << "receiveMultiPrimitiveMessages wakes up sending side .. " << " msgsSent: " << msgsSent << " msgsRecvd = " << msgsRecvd << endl;
2101 }
2102
2103 /* If there's an error and the joblist is being aborted, don't
2104 sit around forever waiting for responses. */
2105 if (cancelled())
2106 {
2107 if (sendWaiting)
2108 condvarWakeupProducer.notify_one();
2109
2110 break;
2111 }
2112
2113 if (size == 0)
2114 {
2115 tplLock.unlock();
2116 usleep(2000 * fNumThreads);
2117 tplLock.lock();
2118 continue;
2119 }
2120
2121 tplLock.unlock();
2122
2123 for (i = 0; i < size && !cancelled(); i++)
2124 {
2125 ByteStream* bs = bsv[i].get();
2126
2127 // @bug 488. when PrimProc node is down. error out
2128 //An error condition. We are not going to do anymore.
2129 ISMPacketHeader* hdr = (ISMPacketHeader*)(bs->buf());
2130
2131 if (bs->length() == 0 || hdr->Status > 0)
2132 {
2133 /* PM errors mean this should abort right away instead of draining the PM backlog */
2134 tplLock.lock();
2135
2136 if (bs->length() == 0)
2137 {
2138 errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_PRIMPROC_DOWN));
2139 status(ERR_PRIMPROC_DOWN);
2140 }
2141 else
2142 {
2143 string errMsg;
2144
2145 bs->advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
2146 *bs >> errMsg;
2147 status(hdr->Status);
2148 errorMessage(errMsg);
2149 }
2150
2151 abort_nolock();
2152 goto out;
2153 }
2154
2155 fromPrimProc.clear();
2156 fBPP->getRowGroupData(*bs, &fromPrimProc, &validCPData, &lbid, &min, &max,
2157 &cachedIO, &physIO, &touchedBlocks, &unused, threadID);
2158
2159 /* Another layer of messiness. Need to refactor this fcn. */
2160 while (!fromPrimProc.empty() && !cancelled())
2161 {
2162 rgData = fromPrimProc.back();
2163 fromPrimProc.pop_back();
2164
2165 local_primRG.setData(&rgData);
2166 ridsReturned_Thread += local_primRG.getRowCount(); // TODO need the pre-join count even on PM joins... later
2167
2168 /* TupleHashJoinStep::joinOneRG() is a port of the main join loop here. Any
2169 * changes made here should also be made there and vice versa. */
2170 if (hasUMJoin || !fBPP->pmSendsFinalResult())
2171 {
2172 joinedData = RGData(local_outputRG);
2173 local_outputRG.setData(&joinedData);
2174 local_outputRG.resetRowGroup(local_primRG.getBaseRid());
2175 local_outputRG.setDBRoot(local_primRG.getDBRoot());
2176 local_primRG.getRow(0, &largeSideRow);
2177
2178 for (k = 0; k < local_primRG.getRowCount() && !cancelled(); k++, largeSideRow.nextRow())
2179 {
2180 matchCount = 0;
2181
2182 for (j = 0; j < smallSideCount; j++)
2183 {
2184 tjoiners[j]->match(largeSideRow, k, threadID, &joinerOutput[j]);
2185 #ifdef JLF_DEBUG
2186 // Debugging code to print the matches
2187 Row r;
2188 joinerMatchesRGs[j].initRow(&r);
2189 cout << joinerOutput[j].size() << " matches: \n";
2190 for (uint32_t z = 0; z < joinerOutput[j].size(); z++) {
2191 r.setPointer(joinerOutput[j][z]);
2192 cout << " " << r.toString() << endl;
2193 }
2194 #endif
2195 matchCount = joinerOutput[j].size();
2196
2197 if (tjoiners[j]->inUM())
2198 {
2199 /* Count the # of rows that pass the join filter */
2200 if (tjoiners[j]->hasFEFilter() && matchCount > 0)
2201 {
2202 vector<Row::Pointer> newJoinerOutput;
2203 applyMapping(fergMappings[smallSideCount], largeSideRow, &joinFERow);
2204
2205 for (uint32_t z = 0; z < joinerOutput[j].size(); z++)
2206 {
2207 smallSideRows[j].setPointer(joinerOutput[j][z]);
2208 applyMapping(fergMappings[j], smallSideRows[j], &joinFERow);
2209
2210 if (!tjoiners[j]->evaluateFilter(joinFERow, threadID))
2211 matchCount--;
2212 else
2213 {
2214 /* The first match includes it in a SEMI join result and excludes it from an ANTI join
2215 * result. If it's SEMI & SCALAR however, it needs to continue.
2216 */
2217 newJoinerOutput.push_back(joinerOutput[j][z]);
2218
2219 if (tjoiners[j]->antiJoin() || (tjoiners[j]->semiJoin() && !tjoiners[j]->scalar()))
2220 break;
2221 }
2222 }
2223
2224 // the filter eliminated all matches, need to join with the NULL row
2225 if (matchCount == 0 && tjoiners[j]->largeOuterJoin())
2226 {
2227 newJoinerOutput.push_back(Row::Pointer(smallNullMemory[j].get()));
2228 matchCount = 1;
2229 }
2230
2231 joinerOutput[j].swap(newJoinerOutput);
2232 }
2233
2234 // XXXPAT: This has gone through enough revisions it would benefit
2235 // from refactoring
2236
2237 /* If anti-join, reverse the result */
2238 if (tjoiners[j]->antiJoin())
2239 {
2240 matchCount = (matchCount ? 0 : 1);
2241 }
2242
2243 if (matchCount == 0)
2244 {
2245 joinerOutput[j].clear();
2246 break;
2247 }
2248 else if (!tjoiners[j]->scalar() &&
2249 (tjoiners[j]->antiJoin() || tjoiners[j]->semiJoin()))
2250 {
2251 joinerOutput[j].clear();
2252 joinerOutput[j].push_back(Row::Pointer(smallNullMemory[j].get()));
2253 matchCount = 1;
2254 }
2255 }
2256
2257 if (matchCount == 0 && tjoiners[j]->innerJoin())
2258 break;
2259
2260 /* Scalar check */
2261 if (tjoiners[j]->scalar() && matchCount > 1)
2262 {
2263 errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW));
2264 status(ERR_MORE_THAN_1_ROW);
2265 abort();
2266 }
2267
2268 if (tjoiners[j]->smallOuterJoin())
2269 tjoiners[j]->markMatches(threadID, joinerOutput[j]);
2270
2271 }
2272
2273 if (matchCount > 0)
2274 {
2275 applyMapping(largeMapping, largeSideRow, &joinedBaseRow);
2276 joinedBaseRow.setRid(largeSideRow.getRelRid());
2277 generateJoinResultSet(joinerOutput, joinedBaseRow, smallMappings,
2278 0, local_outputRG, joinedData, &rgDatav, smallSideRows, postJoinRow);
2279
2280 /* Bug 3510: Don't let the join results buffer get out of control. Need
2281 to refactor this. All post-join processing needs to go here AND below
2282 for now. */
2283 if (rgDatav.size() * local_outputRG.getMaxDataSize() > 50000000)
2284 {
2285 RowGroup out(local_outputRG);
2286
2287 if (fe2 && !runFEonPM)
2288 {
2289 processFE2(out, local_fe2Output, postJoinRow,
2290 local_fe2OutRow, &rgDatav, &local_fe2);
2291 rgDataVecToDl(rgDatav, local_fe2Output, dlp);
2292 }
2293 else
2294 rgDataVecToDl(rgDatav, out, dlp);
2295 }
2296 }
2297 } // end of the for-loop in the join code
2298
2299 if (local_outputRG.getRowCount() > 0)
2300 {
2301 rgDatav.push_back(joinedData);
2302 }
2303 }
2304 else
2305 {
2306 rgDatav.push_back(rgData);
2307 }
2308
2309 /* Execute UM F & E group 2 on rgDatav */
2310 if (fe2 && !runFEonPM && rgDatav.size() > 0 && !cancelled())
2311 {
2312 processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2);
2313 rgDataVecToDl(rgDatav, local_fe2Output, dlp);
2314 }
2315
2316 cachedIO_Thread += cachedIO;
2317 physIO_Thread += physIO;
2318 touchedBlocks_Thread += touchedBlocks;
2319
2320 if (fOid >= 3000 && ffirstStepType == SCAN && bop == BOP_AND)
2321 cpv.push_back(_CPInfo(min, max, lbid, validCPData));
2322 } // end of the per-rowgroup processing loop
2323
2324 // insert the resulting rowgroup data from a single bytestream into dlp
2325 if (rgDatav.size() > 0)
2326 {
2327 if (fe2 && runFEonPM)
2328 rgDataVecToDl(rgDatav, local_fe2Output, dlp);
2329 else
2330 rgDataVecToDl(rgDatav, local_outputRG, dlp);
2331 }
2332 } // end of the per-bytestream loop
2333
2334 // @bug 4562
2335 if (traceOn() && fOid >= 3000)
2336 dlTimes.setFirstInsertTime();
2337
2338 //update casual partition
2339 size = cpv.size();
2340
2341 if (size > 0 && !cancelled())
2342 {
2343 cpMutex.lock();
2344
2345 for (i = 0; i < size; i++)
2346 {
2347 lbidList->UpdateMinMax(cpv[i].min, cpv[i].max, cpv[i].LBID, fColType,
2348 cpv[i].valid);
2349 }
2350
2351 cpMutex.unlock();
2352 }
2353
2354 cpv.clear();
2355
2356 tplLock.lock();
2357
2358 if (fOid >= 3000)
2359 {
2360 uint64_t progress = msgsRecvd * 100 / totalMsgs;
2361 bool postProgress = (progress > fProgress);
2362
2363 if (postProgress)
2364 {
2365 fProgress = progress;
2366
2367 sts.msg_type = StepTeleStats::ST_PROGRESS;
2368 sts.total_units_of_work = totalMsgs;
2369 sts.units_of_work_completed = msgsRecvd;
2370 postStepProgressTele(sts);
2371 }
2372 }
2373
2374 } // done reading
2375
2376 }//try
2377 catch (...)
2378 {
2379 handleException(std::current_exception(),
2380 logging::ERR_TUPLE_BPS,
2381 logging::ERR_ALWAYS_CRITICAL,
2382 "st: " + std::to_string(fStepId) +
2383 " TupleBPS::receiveMultiPrimitiveMessages()");
2384 abort_nolock();
2385 }
2386
2387 out:
2388
2389 if (++recvExited == fNumThreads)
2390 {
2391
2392 if (doJoin && smallOuterJoiner != -1 && !cancelled())
2393 {
2394 tplLock.unlock();
2395 /* If this was a left outer join, this needs to put the unmatched
2396 rows from the joiner into the output
2397 XXXPAT: This might be a problem if later steps depend
2398 on sensible rids and/or sensible ordering */
2399 vector<Row::Pointer> unmatched;
2400 #ifdef JLF_DEBUG
2401 cout << "finishing small-outer join output\n";
2402 #endif
2403 i = smallOuterJoiner;
2404 tjoiners[i]->getUnmarkedRows(&unmatched);
2405 joinedData = RGData(local_outputRG);
2406 local_outputRG.setData(&joinedData);
2407 local_outputRG.resetRowGroup(-1);
2408 local_outputRG.getRow(0, &joinedBaseRow);
2409
2410 for (j = 0; j < unmatched.size(); j++)
2411 {
2412 smallSideRows[i].setPointer(unmatched[j]);
2413
2414 for (k = 0; k < smallSideCount; k++)
2415 {
2416 if (i == k)
2417 applyMapping(smallMappings[i], smallSideRows[i], &joinedBaseRow);
2418 else
2419 applyMapping(smallMappings[k], smallNulls[k], &joinedBaseRow);
2420 }
2421
2422 applyMapping(largeMapping, largeNull, &joinedBaseRow);
2423 joinedBaseRow.setRid(0);
2424 joinedBaseRow.nextRow();
2425 local_outputRG.incRowCount();
2426
2427 if (local_outputRG.getRowCount() == 8192)
2428 {
2429 if (fe2)
2430 {
2431 rgDatav.push_back(joinedData);
2432 processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2);
2433
2434 if (rgDatav.size() > 0)
2435 rgDataToDl(rgDatav[0], local_fe2Output, dlp);
2436
2437 rgDatav.clear();
2438 }
2439 else
2440 rgDataToDl(joinedData, local_outputRG, dlp);
2441
2442 joinedData = RGData(local_outputRG);
2443 local_outputRG.setData(&joinedData);
2444 local_outputRG.resetRowGroup(-1);
2445 local_outputRG.getRow(0, &joinedBaseRow);
2446 }
2447 }
2448
2449 if (local_outputRG.getRowCount() > 0)
2450 {
2451 if (fe2)
2452 {
2453 rgDatav.push_back(joinedData);
2454 processFE2(local_outputRG, local_fe2Output, postJoinRow, local_fe2OutRow, &rgDatav, &local_fe2);
2455
2456 if (rgDatav.size() > 0)
2457 rgDataToDl(rgDatav[0], local_fe2Output, dlp);
2458
2459 rgDatav.clear();
2460 }
2461 else
2462 rgDataToDl(joinedData, local_outputRG, dlp);
2463 }
2464
2465 tplLock.lock();
2466 }
2467
2468 if (traceOn() && fOid >= 3000)
2469 {
2470 //...Casual partitioning could cause us to do no processing. In that
2471 //...case these time stamps did not get set. So we set them here.
2472 if (dlTimes.FirstReadTime().tv_sec == 0)
2473 {
2474 dlTimes.setFirstReadTime();
2475 dlTimes.setLastReadTime();
2476 dlTimes.setFirstInsertTime();
2477 }
2478
2479 dlTimes.setEndOfInputTime();
2480 }
2481
2482 ByteStream bs;
2483
2484 try
2485 {
2486 if (BPPIsAllocated)
2487 {
2488 fDec->removeDECEventListener(this);
2489 fBPP->destroyBPP(bs);
2490 fDec->write(uniqueID, bs);
2491 BPPIsAllocated = false;
2492 }
2493 }
2494 // catch and do nothing. Let it continue with the clean up and profiling
2495 catch (const std::exception& e)
2496 {
2497 cerr << "tuple-bps caught: " << e.what() << endl;
2498 }
2499 catch (...)
2500 {
2501 cerr << "tuple-bps caught unknown exception" << endl;
2502 }
2503
2504 Stats stats = fDec->getNetworkStats(uniqueID);
2505 fMsgBytesIn = stats.dataRecvd();
2506 fMsgBytesOut = stats.dataSent();
2507 fDec->removeQueue(uniqueID);
2508 tjoiners.clear();
2509
2510 lastThread = true;
2511 }
2512
2513 //@Bug 1099
2514 ridsReturned += ridsReturned_Thread;
2515 fPhysicalIO += physIO_Thread;
2516 fCacheIO += cachedIO_Thread;
2517 fBlockTouched += touchedBlocks_Thread;
2518 tplLock.unlock();
2519
2520 if (fTableOid >= 3000 && lastThread)
2521 {
2522 struct timeval tvbuf;
2523 gettimeofday(&tvbuf, 0);
2524 FIFO<boost::shared_array<uint8_t> >* pFifo = 0;
2525 uint64_t totalBlockedReadCount = 0;
2526 uint64_t totalBlockedWriteCount = 0;
2527
2528 //...Sum up the blocked FIFO reads for all input associations
2529 size_t inDlCnt = fInputJobStepAssociation.outSize();
2530
2531 for (size_t iDataList = 0; iDataList < inDlCnt; iDataList++)
2532 {
2533 pFifo = dynamic_cast<FIFO<boost::shared_array<uint8_t> > *>(
2534 fInputJobStepAssociation.outAt(iDataList)->rowGroupDL());
2535
2536 if (pFifo)
2537 {
2538 totalBlockedReadCount += pFifo->blockedReadCount();
2539 }
2540 }
2541
2542 //...Sum up the blocked FIFO writes for all output associations
2543 size_t outDlCnt = fOutputJobStepAssociation.outSize();
2544
2545 for (size_t iDataList = 0; iDataList < outDlCnt; iDataList++)
2546 {
2547 pFifo = dynamic_cast<FIFO<boost::shared_array<uint8_t> > *>(dlp);
2548
2549 if (pFifo)
2550 {
2551 totalBlockedWriteCount += pFifo->blockedWriteCount();
2552 }
2553 }
2554
2555 //...Roundoff msg byte counts to nearest KB for display
2556 uint64_t msgBytesInKB = fMsgBytesIn >> 10;
2557 uint64_t msgBytesOutKB = fMsgBytesOut >> 10;
2558
2559 if (fMsgBytesIn & 512)
2560 msgBytesInKB++;
2561
2562 if (fMsgBytesOut & 512)
2563 msgBytesOutKB++;
2564
2565 if (traceOn())
2566 {
2567 // @bug 828
2568 ostringstream logStr;
2569 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " <<
2570 JSTimeStamp::format(tvbuf) << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" <<
2571 fCacheIO << "; MsgsSent-" << msgsSent << "; MsgsRvcd-" << msgsRecvd <<
2572 "; BlocksTouched-" << fBlockTouched <<
2573 "; BlockedFifoIn/Out-" << totalBlockedReadCount <<
2574 "/" << totalBlockedWriteCount <<
2575 "; output size-" << ridsReturned << endl <<
2576 "\tPartitionBlocksEliminated-" << fNumBlksSkipped <<
2577 "; MsgBytesIn-" << msgBytesInKB << "KB" <<
2578 "; MsgBytesOut-" << msgBytesOutKB << "KB" <<
2579 "; TotalMsgs-" << totalMsgs << endl <<
2580 "\t1st read " << dlTimes.FirstReadTimeString() <<
2581 "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" <<
2582 JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) <<
2583 "s\n\tUUID " << uuids::to_string(fStepUuid) <<
2584 "\n\tQuery UUID " << uuids::to_string(queryUuid()) <<
2585 "\n\tJob completion status " << status() << endl;
2586 logEnd(logStr.str().c_str());
2587
2588 syslogReadBlockCounts(16, // exemgr sybsystem
2589 fPhysicalIO, // # blocks read from disk
2590 fCacheIO, // # blocks read from cache
2591 fNumBlksSkipped); // # casual partition block hits
2592 syslogProcessingTimes(16, // exemgr subsystem
2593 dlTimes.FirstReadTime(), // first datalist read
2594 dlTimes.LastReadTime(), // last datalist read
2595 dlTimes.FirstInsertTime(), // first datalist write
2596 dlTimes.EndOfInputTime()); // last (endOfInput) datalist write
2597 syslogEndStep(16, // exemgr subsystem
2598 totalBlockedReadCount, // blocked datalist input
2599 totalBlockedWriteCount, // blocked datalist output
2600 fMsgBytesIn, // incoming msg byte count
2601 fMsgBytesOut); // outgoing msg byte count
2602 fExtendedInfo += toString() + logStr.str();
2603 formatMiniStats();
2604 }
2605
2606 if (lastThread && fOid >= 3000)
2607 {
2608 sts.msg_type = StepTeleStats::ST_SUMMARY;
2609 sts.phy_io = fPhysicalIO;
2610 sts.cache_io = fCacheIO;
2611 sts.msg_rcv_cnt = sts.total_units_of_work = sts.units_of_work_completed = msgsRecvd;
2612 sts.cp_blocks_skipped = fNumBlksSkipped;
2613 sts.msg_bytes_in = fMsgBytesIn;
2614 sts.msg_bytes_out = fMsgBytesOut;
2615 sts.rows = ridsReturned;
2616 postStepSummaryTele(sts);
2617 }
2618
2619 if (ffirstStepType == SCAN && bop == BOP_AND && !cancelled())
2620 {
2621 cpMutex.lock();
2622 lbidList->UpdateAllPartitionInfo();
2623 cpMutex.unlock();
2624 }
2625 }
2626
2627 // Bug 3136, let mini stats to be formatted if traceOn.
2628 if (lastThread && !didEOF)
2629 dlp->endOfInput();
2630 }
2631
/* Human-readable dump of this step's identity, delivery mode, output/input
 * associations and the BPP state; used for logging and tracing. */
const string TupleBPS::toString() const
{
    ostringstream oss;
    oss << "TupleBPS ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId << " st:" << fStepId <<
        " tb/col:" << fTableOid << "/" << fOid;

    if (alias().length()) oss << " alias:" << alias();

    if (view().length()) oss << " view:" << view();

#if 0

    // @bug 1282, don't have output datalist for delivery
    if (!fDelivery)
        oss << " " << omitOidInDL << fOutputJobStepAssociation.outAt(0) << showOidInDL;

#else

    // Delivery flag, boolean operator and abort progress are part of the dump.
    if (fDelivery)
        oss << " is del ";
    else
        oss << " not del ";

    if (bop == BOP_OR)
        oss << " BOP_OR ";

    if (fDie)
        oss << " aborting " << msgsSent << "/" << msgsRecvd << " " << uniqueID << " ";

    if (fOutputJobStepAssociation.outSize() > 0)
    {
        oss << fOutputJobStepAssociation.outAt(0);

        if (fOutputJobStepAssociation.outSize() > 1)
            oss << " (too many outputs?)";
    }
    else
    {
        oss << " (no outputs?)";
    }

#endif
    // nf = number of filters; then every input association.
    oss << " nf:" << fFilterCount;
    oss << " in:";

    for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
    {
        oss << fInputJobStepAssociation.outAt(i);
    }

    oss << endl << " UUID: " << uuids::to_string(fStepUuid) << endl;
    oss << " Query UUID: " << uuids::to_string(queryUuid()) << endl;
    oss << " " << fBPP->toString() << endl;
    return oss.str();
}
2687
2688 /* This exists to avoid a DBRM lookup for every rid. */
scanit(uint64_t rid)2689 inline bool TupleBPS::scanit(uint64_t rid)
2690 {
2691 uint64_t fbo;
2692 uint32_t extentIndex;
2693
2694 if (fOid < 3000)
2695 return true;
2696
2697 fbo = rid >> rpbShift;
2698 extentIndex = fbo >> divShift;
2699 return scanFlags[extentIndex] && runtimeCPFlags[extentIndex];
2700 }
2701
getFBO(uint64_t lbid)2702 uint64_t TupleBPS::getFBO(uint64_t lbid)
2703 {
2704 uint32_t i;
2705 uint64_t lastLBID;
2706
2707 for (i = 0; i < numExtents; i++)
2708 {
2709 lastLBID = scannedExtents[i].range.start + (scannedExtents[i].range.size << 10) - 1;
2710
2711 if (lbid >= (uint64_t) scannedExtents[i].range.start && lbid <= lastLBID)
2712 return (lbid - scannedExtents[i].range.start) + (i << divShift);
2713 }
2714
2715 throw logic_error("TupleBPS: didn't find the FBO?");
2716 }
2717
useJoiner(boost::shared_ptr<joiner::TupleJoiner> tj)2718 void TupleBPS::useJoiner(boost::shared_ptr<joiner::TupleJoiner> tj)
2719 {
2720 vector<boost::shared_ptr<joiner::TupleJoiner> > v;
2721 v.push_back(tj);
2722 useJoiners(v);
2723 }
2724
/* Installs the small-side joiners for this step and derives the join
 * bookkeeping: per-joiner small-side RowGroups, whether any joiner runs on
 * the PM and/or the UM, and which joiner (if any) is a small-outer join.
 * PM-side joiners are forwarded to the BPP. */
void TupleBPS::useJoiners(const vector<boost::shared_ptr<joiner::TupleJoiner> >& joiners)
{
    uint32_t i;

    tjoiners = joiners;
    doJoin = (joiners.size() != 0);

    joinerMatchesRGs.clear();
    smallSideCount = tjoiners.size();
    hasPMJoin = false;
    hasUMJoin = false;

    for (i = 0; i < smallSideCount; i++)
    {
        joinerMatchesRGs.push_back(tjoiners[i]->getSmallRG());

        // A joiner executes either on the PM or on the UM.
        if (tjoiners[i]->inPM())
            hasPMJoin = true;
        else
            hasUMJoin = true;

        // Remember the (last) small-outer joiner; its unmatched rows are
        // emitted at end-of-input by the receive thread.
        if (tjoiners[i]->getJoinType() & SMALLOUTER)
            smallOuterJoiner = i;
    }

    if (hasPMJoin)
        fBPP->useJoiners(tjoiners);
}
2753
/* DECEventListener callback: a PM connection came (back) online.  Re-creates
 * our BPP on that connection and re-sends any PM-side joiners so the new PM
 * can participate in this step. */
void TupleBPS::newPMOnline(uint32_t connectionNumber)
{
    ByteStream bs;

    fBPP->createBPP(bs);

    try
    {
        fDec->write(bs, connectionNumber);

        if (hasPMJoin)
            serializeJoiner(connectionNumber);
    }
    catch (IDBExcept& e)
    {
        // Abort first so other threads stop promptly, then record the error.
        abort();
        catchHandler(e.what(), e.errorCode(), fErrorInfo, fSessionId);
    }
}
2773
setInputRowGroup(const rowgroup::RowGroup & rg)2774 void TupleBPS::setInputRowGroup(const rowgroup::RowGroup& rg)
2775 {
2776 inputRowGroup = rg;
2777 fBPP->setInputRowGroup(rg);
2778 }
2779
/* Sets the layout of rows coming back from the PMs (projection output).
 * Also refreshes the duplicate-column bookkeeping and, when an F&E group-2
 * expression exists, the mapping from projection output into its input. */
void TupleBPS::setOutputRowGroup(const rowgroup::RowGroup& rg)
{
    outputRowGroup = rg;
    primRowGroup = rg;
    fBPP->setProjectionRowGroup(rg);
    checkDupOutputColumns(rg);

    if (fe2)
        fe2Mapping = makeMapping(outputRowGroup, fe2Output);
}
2790
setJoinedResultRG(const rowgroup::RowGroup & rg)2791 void TupleBPS::setJoinedResultRG(const rowgroup::RowGroup& rg)
2792 {
2793 outputRowGroup = rg;
2794 checkDupOutputColumns(rg);
2795 fBPP->setJoinedRowGroup(rg);
2796
2797 if (fe2)
2798 fe2Mapping = makeMapping(outputRowGroup, fe2Output);
2799 }
2800
2801 /* probably worthwhile to make some of these class vars */
/* Recursively emits the cross-product of the per-small-side match lists in
 * joinerOutput.  At each interior depth the matched small-side row is mapped
 * into baseRow before recursing; at the last depth every match completes one
 * joined output row.  When outputRG fills (8192 rows) the current RGData is
 * pushed onto *outputData and a fresh one started with the same identity. */
void TupleBPS::generateJoinResultSet(const vector<vector<Row::Pointer> >& joinerOutput,
                                     Row& baseRow, const vector<shared_array<int> >& mappings, const uint32_t depth,
                                     RowGroup& outputRG, RGData& rgData, vector<RGData>* outputData, const scoped_array<Row>& smallRows,
                                     Row& joinedRow)
{
    uint32_t i;
    Row& smallRow = smallRows[depth];

    if (depth < smallSideCount - 1)
    {
        // Interior depth: project this side's match into baseRow, recurse for
        // every match at this depth.
        for (i = 0; i < joinerOutput[depth].size(); i++)
        {
            smallRow.setPointer(joinerOutput[depth][i]);
            applyMapping(mappings[depth], smallRow, &baseRow);
            generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1,
                                  outputRG, rgData, outputData, smallRows, joinedRow);
        }
    }
    else
    {
        // Last depth: append one output row per match.
        outputRG.getRow(outputRG.getRowCount(), &joinedRow);

        for (i = 0; i < joinerOutput[depth].size(); i++, joinedRow.nextRow(),
                outputRG.incRowCount())
        {
            smallRow.setPointer(joinerOutput[depth][i]);

            if (UNLIKELY(outputRG.getRowCount() == 8192))
            {
                // Row group full: hand it off and start a new one that keeps
                // the same base rid / DBRoot.
                uint32_t dbRoot = outputRG.getDBRoot();
                uint64_t baseRid = outputRG.getBaseRid();
                outputData->push_back(rgData);
                rgData = RGData(outputRG);
                outputRG.setData(&rgData);
                outputRG.resetRowGroup(baseRid);
                outputRG.setDBRoot(dbRoot);
                outputRG.getRow(0, &joinedRow);
            }

            applyMapping(mappings[depth], smallRow, &baseRow);
            copyRow(baseRow, &joinedRow);
        }
    }
}
2846
/* Accessor: the RowGroup layout this step produces (before F&E group 2). */
const rowgroup::RowGroup& TupleBPS::getOutputRowGroup() const
{
    return outputRowGroup;
}
2851
setAggregateStep(const rowgroup::SP_ROWAGG_PM_t & agg,const rowgroup::RowGroup & rg)2852 void TupleBPS::setAggregateStep(const rowgroup::SP_ROWAGG_PM_t& agg, const rowgroup::RowGroup& rg)
2853 {
2854 if (rg.getColumnCount() > 0)
2855 {
2856 fAggRowGroupPm = rg;
2857 fAggregatorPm = agg;
2858
2859 fBPP->addAggregateStep(agg, rg);
2860 fBPP->setNeedRidsAtDelivery(false);
2861 }
2862 }
2863
/* Sets the boolean operator (e.g. BOP_AND / BOP_OR) combining the filter
 * steps, and forwards it to the BPP. */
void TupleBPS::setBOP(uint8_t op)
{
    bop = op;
    fBPP->setBOP(bop);
}
2869
/* Forwards the job info to the BPP. */
void TupleBPS::setJobInfo(const JobInfo* jobInfo)
{
    fBPP->jobInfo(jobInfo);
}
2874
/* Returns the row-count estimate for this step.  The estimate (fEstimatedRows)
 * is computed as a side effect of storeCasualPartitionInfo(). */
uint64_t TupleBPS::getEstimatedRowCount()
{
    // Call function that populates the scanFlags array based on the extents that qualify based on casual partitioning.
    storeCasualPartitionInfo(true);
    // TODO: Strip out the cout below after a few days of testing.
#ifdef JLF_DEBUG
    cout << "OID-" << fOid << " EstimatedRowCount-" << fEstimatedRows << endl;
#endif
    return fEstimatedRows;
}
2885
checkDupOutputColumns(const rowgroup::RowGroup & rg)2886 void TupleBPS::checkDupOutputColumns(const rowgroup::RowGroup& rg)
2887 {
2888 // bug 1965, find if any duplicate columns selected
2889 map<uint32_t, uint32_t> keymap; // map<unique col key, col index in the row group>
2890 dupColumns.clear();
2891 const vector<uint32_t>& keys = rg.getKeys();
2892
2893 for (uint32_t i = 0; i < keys.size(); i++)
2894 {
2895 map<uint32_t, uint32_t>::iterator j = keymap.find(keys[i]);
2896
2897 if (j == keymap.end())
2898 keymap.insert(make_pair(keys[i], i)); // map key to col index
2899 else
2900 dupColumns.push_back(make_pair(i, j->second)); // dest/src index pair
2901 }
2902 }
2903
/* bug 1965: bind the given row data to rg, then duplicate column values. */
void TupleBPS::dupOutputColumns(RGData& data, RowGroup& rg)
{
    rg.setData(&data);
    dupOutputColumns(rg);
}
2909
dupOutputColumns(RowGroup & rg)2910 void TupleBPS::dupOutputColumns(RowGroup& rg)
2911 {
2912 Row workingRow;
2913 rg.initRow(&workingRow);
2914 rg.getRow(0, &workingRow);
2915
2916 for (uint64_t i = 0; i < rg.getRowCount(); i++)
2917 {
2918 for (uint64_t j = 0; j < dupColumns.size(); j++)
2919 workingRow.copyField(dupColumns[j].first, dupColumns[j].second);
2920
2921 workingRow.nextRow();
2922 }
2923 }
2924
/* Assigns this step's id and propagates it to the BPP. */
void TupleBPS::stepId(uint16_t stepId)
{
    fStepId = stepId;
    fBPP->setStepID(stepId);
}
2930
addFcnJoinExp(const vector<execplan::SRCP> & fe)2931 void TupleBPS::addFcnJoinExp(const vector<execplan::SRCP>& fe)
2932 {
2933 if (!fe1)
2934 fe1.reset(new funcexp::FuncExpWrapper());
2935
2936 for (uint32_t i = 0; i < fe.size(); i++)
2937 fe1->addReturnedColumn(fe[i]);
2938 }
2939
/* Adds a filter expression to F&E group 1, creating the wrapper lazily on
 * first use. */
void TupleBPS::addFcnExpGroup1(const boost::shared_ptr<execplan::ParseTree>& fe)
{
    if (!fe1)
        fe1.reset(new funcexp::FuncExpWrapper());

    fe1->addFilter(fe);
}
2947
/* Records the RowGroup layout fed into the F&E group-1 expressions. */
void TupleBPS::setFE1Input(const RowGroup& feInput)
{
    fe1Input = feInput;
}
2952
/* Installs the F&E group-2 (post-join) expression and its output layout.
 * runFE2onPM selects PM-side evaluation; only then is the expression shipped
 * to the BPP, otherwise the UM receive threads evaluate it. */
void TupleBPS::setFcnExpGroup2(const boost::shared_ptr<funcexp::FuncExpWrapper>& fe,
                               const rowgroup::RowGroup& rg, bool runFE2onPM)
{
    fe2 = fe;
    fe2Output = rg;
    checkDupOutputColumns(rg);
    fe2Mapping = makeMapping(outputRowGroup, fe2Output);
    runFEonPM = runFE2onPM;

    if (runFEonPM)
        fBPP->setFEGroup2(fe2, fe2Output);
}
2965
/* Adds returned-column expressions to the group-2/3 wrapper (created lazily)
 * and forces PM-side evaluation — valid because this path has no join. */
void TupleBPS::setFcnExpGroup3(const vector<execplan::SRCP>& fe)
{
    if (!fe2)
        fe2.reset(new funcexp::FuncExpWrapper());

    for (uint32_t i = 0; i < fe.size(); i++)
        fe2->addReturnedColumn(fe[i]);

    // if this is called, there's no join, so it can always run on the PM
    runFEonPM = true;
    fBPP->setFEGroup2(fe2, fe2Output);
}
2978
/* Sets the output layout of the F&E group-2/3 expressions, refreshes the
 * projection->F&E mapping, and re-ships the expression to the BPP if it runs
 * PM-side. */
void TupleBPS::setFE23Output(const rowgroup::RowGroup& feOutput)
{
    fe2Output = feOutput;
    checkDupOutputColumns(feOutput);
    fe2Mapping = makeMapping(outputRowGroup, fe2Output);

    if (fe2 && runFEonPM)
        fBPP->setFEGroup2(fe2, fe2Output);
}
2988
/* Evaluates the F&E group-2 expression over the single RowGroup currently
 * bound to 'input'.  Rows passing the filter are projected into 'output'
 * (which inherits input's base rid / DBRoot); failing rows are dropped. */
void TupleBPS::processFE2_oneRG(RowGroup& input, RowGroup& output, Row& inRow,
                                Row& outRow, funcexp::FuncExpWrapper* local_fe)
{
    bool ret;
    uint32_t i;

    output.resetRowGroup(input.getBaseRid());
    output.setDBRoot(input.getDBRoot());
    output.getRow(0, &outRow);
    input.getRow(0, &inRow);

    for (i = 0; i < input.getRowCount(); i++, inRow.nextRow())
    {
        ret = local_fe->evaluate(&inRow);

        if (ret)
        {
            // Row passed the filter: project it into the output layout,
            // preserving its relative rid.
            applyMapping(fe2Mapping, inRow, &outRow);
            outRow.setRid(inRow.getRelRid());
            output.incRowCount();
            outRow.nextRow();
        }
    }
}
3013
/* Evaluates the F&E group-2 expression over every RGData in *rgData and
 * replaces the vector's contents with the filtered/projected results.
 * Output groups are cut at 8192 rows or whenever the source identity
 * (base rid / DBRoot) changes, so each result RGData stays homogeneous. */
void TupleBPS::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& outRow,
                          vector<RGData>* rgData, funcexp::FuncExpWrapper* local_fe)
{
    vector<RGData> results;
    RGData result;
    uint32_t i, j;
    bool ret;

    result = RGData(output);
    output.setData(&result);
    output.resetRowGroup(-1);
    output.getRow(0, &outRow);

    for (i = 0; i < rgData->size(); i++)
    {
        input.setData(&(*rgData)[i]);

        // An empty output group adopts the current input's identity.
        if (output.getRowCount() == 0)
        {
            output.resetRowGroup(input.getBaseRid());
            output.setDBRoot(input.getDBRoot());
        }

        input.getRow(0, &inRow);

        for (j = 0; j < input.getRowCount(); j++, inRow.nextRow())
        {
            ret = local_fe->evaluate(&inRow);

            if (ret)
            {
                applyMapping(fe2Mapping, inRow, &outRow);
                outRow.setRid(inRow.getRelRid());
                output.incRowCount();
                outRow.nextRow();

                // Cut the output group when full or when the input's
                // identity no longer matches.
                if (output.getRowCount() == 8192 ||
                        output.getDBRoot() != input.getDBRoot() ||
                        output.getBaseRid() != input.getBaseRid()
                   )
                {
                    results.push_back(result);
                    result = RGData(output);
                    output.setData(&result);
                    output.resetRowGroup(input.getBaseRid());
                    output.setDBRoot(input.getDBRoot());
                    output.getRow(0, &outRow);
                }
            }
        }
    }

    // Flush the final partial group.
    if (output.getRowCount() > 0)
    {
        results.push_back(result);
    }

    rgData->swap(results);
}
3073
getDeliveredRowGroup() const3074 const rowgroup::RowGroup& TupleBPS::getDeliveredRowGroup() const
3075 {
3076 if (fe2)
3077 return fe2Output;
3078
3079 return outputRowGroup;
3080 }
3081
/* Enables/disables string-table storage on whichever RowGroup is delivered
 * (F&E-2 output, join output, or the raw projection groups) and propagates
 * the setting to the BPP. */
void TupleBPS::deliverStringTableRowGroup(bool b)
{
    if (fe2)
        fe2Output.setUseStringTable(b);
    else if (doJoin)
        outputRowGroup.setUseStringTable(b);
    else
    {
        // No F&E-2 and no join: delivery comes straight from the primitive
        // row groups, so both layouts must agree.
        outputRowGroup.setUseStringTable(b);
        primRowGroup.setUseStringTable(b);
    }

    fBPP->deliverStringTableRowGroup(b);
}
3096
deliverStringTableRowGroup() const3097 bool TupleBPS::deliverStringTableRowGroup() const
3098 {
3099 if (fe2)
3100 return fe2Output.usesStringTable();
3101
3102 return outputRowGroup.usesStringTable();
3103 }
3104
/* Appends this step's one-line summary (I/O counts, skipped blocks, runtime,
 * row count) to fMiniInfo for the query's mini-stats trace output. */
void TupleBPS::formatMiniStats()
{

    ostringstream oss;
    oss << "BPS "
        << "PM "
        << alias() << " "
        << fTableOid << " "
        << fBPP->toMiniString() << " "
        << fPhysicalIO << " "
        << fCacheIO << " "
        << fNumBlksSkipped << " "
        << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
        << ridsReturned << " ";

    fMiniInfo += oss.str();
}
3122
rgDataToDl(RGData & rgData,RowGroup & rg,RowGroupDL * dlp)3123 void TupleBPS::rgDataToDl(RGData& rgData, RowGroup& rg, RowGroupDL* dlp)
3124 {
3125 // bug 1965, populate duplicate columns if any.
3126 if (dupColumns.size() > 0)
3127 dupOutputColumns(rgData, rg);
3128
3129 dlp->insert(rgData);
3130 }
3131
3132
rgDataVecToDl(vector<RGData> & rgDatav,RowGroup & rg,RowGroupDL * dlp)3133 void TupleBPS::rgDataVecToDl(vector<RGData>& rgDatav, RowGroup& rg, RowGroupDL* dlp)
3134 {
3135 uint64_t size = rgDatav.size();
3136
3137 if (size > 0 && !cancelled())
3138 {
3139 dlMutex.lock();
3140
3141 for (uint64_t i = 0; i < size; i++)
3142 {
3143 rgDataToDl(rgDatav[i], rg, dlp);
3144 }
3145
3146 dlMutex.unlock();
3147 }
3148
3149 rgDatav.clear();
3150 }
3151
/* Records the RowGroup layout used by join F&E (join filter) expressions and
 * forwards it to the BPP. */
void TupleBPS::setJoinFERG(const RowGroup& rg)
{
    joinFERG = rg;
    fBPP->setJoinFERG(rg);
}
3157
/* Narrows the runtime casual-partitioning flags using predicates derived at
 * runtime (e.g. from joins): for column OID, extents whose [min,max] cannot
 * intersect 'vals' — a [lo,hi] range when isRange, otherwise a discrete value
 * list — have their runtimeCPFlags entry cleared so they are skipped. */
void TupleBPS::addCPPredicates(uint32_t OID, const vector<int64_t>& vals, bool isRange)
{

    // CP disabled by trace flag, or system-catalog column: nothing to do.
    if (fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP || fOid < 3000)
        return;

    uint32_t i, j, k;
    int64_t min, max, seq;
    bool isValid, intersection;
    vector<SCommand> colCmdVec = fBPP->getFilterSteps();
    ColumnCommandJL* cmd;

    // Consider both filter and projection column commands.
    for (i = 0; i < fBPP->getProjectSteps().size(); i++)
        colCmdVec.push_back(fBPP->getProjectSteps()[i]);

    LBIDList ll(OID, 0);

    /* Find the columncommand with that OID.
     * Check that the column type is one handled by CP.
     * For each extent in that OID,
     *    grab the min & max,
     *    OR together all of the intersection tests,
     *    AND it with the current CP flag.
     */

    for (i = 0; i < colCmdVec.size(); i++)
    {
        cmd = dynamic_cast<ColumnCommandJL*>(colCmdVec[i].get());

        if (cmd != NULL && cmd->getOID() == OID)
        {
            // CP only supports certain fixed-width types; dictionary columns
            // are excluded entirely.
            if (!ll.CasualPartitionDataType(cmd->getColType().colDataType, cmd->getColType().colWidth)
                    || cmd->isDict())
                return;

            // @bug 2989, use correct extents
            tr1::unordered_map<int64_t, struct BRM::EMEntry>* extentsPtr = NULL;
            vector<struct BRM::EMEntry> extents; // in case the extents of OID is not in Map

            // TODO: store the sorted vectors from the pcolscans/steps as a minor optimization
            dbrm.getExtents(OID, extents);
            sort(extents.begin(), extents.end(), ExtentSorter());

            if (extentsMap.find(OID) != extentsMap.end())
            {
                extentsPtr = &extentsMap[OID];
            }
            else if (dbrm.getExtents(OID, extents) == 0)
            {
                // NOTE(review): getExtents() is invoked a second time here
                // even though the call above already filled 'extents'.  Also,
                // if this second call fails, extentsPtr remains NULL while
                // 'extents' is non-empty, and the loop below would then
                // dereference NULL — confirm whether getExtents() can fail
                // after succeeding once.
                extentsMap[OID] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
                tr1::unordered_map<int64_t, struct BRM::EMEntry>& mref = extentsMap[OID];

                for (uint32_t z = 0; z < extents.size(); z++)
                    mref[extents[z].range.start] = extents[z];

                extentsPtr = &mref;
            }

            for (j = 0; j < extents.size(); j++)
            {
                isValid = ll.GetMinMax(&min, &max, &seq, extents[j].range.start, *extentsPtr,
                                       cmd->getColType().colDataType);

                if (isValid)
                {
                    if (isRange)
                        runtimeCPFlags[j] = ll.checkRangeOverlap(min, max, vals[0], vals[1],
                                            cmd->getColType()) && runtimeCPFlags[j];
                    else
                    {
                        // Discrete values: the extent stays scannable if any
                        // value could fall within [min,max].
                        intersection = false;

                        for (k = 0; k < vals.size(); k++)
                            intersection = intersection ||
                                           ll.checkSingleValue(min, max, vals[k], cmd->getColType());

                        runtimeCPFlags[j] = intersection && runtimeCPFlags[j];
                    }
                }
            }

            break;  // only one column command matches this OID
        }
    }
}
3243
/* (Re)binds this step to a DistributedEngineComm instance, moving the
 * per-step message queue from the old DEC (if any) to the new one. */
void TupleBPS::dec(DistributedEngineComm* dec)
{
    if (fDec)
        fDec->removeQueue(uniqueID);

    fDec = dec;

    if (fDec)
        fDec->addQueue(uniqueID, true); // second arg: queue option flag — see DEC::addQueue for its meaning
}
3254
/* Aborts the step without acquiring the step mutex (for callers that already
 * hold it or are on an error path).  Marks the job dead, tells the PMs to
 * stop working on our BPP, and wakes any threads blocked on the condition
 * variables so they can observe the abort. */
void TupleBPS::abort_nolock()
{
    // Already aborting — idempotent.
    if (fDie)
        return;

    JobStep::abort();

    if (fDec && BPPIsAllocated)
    {
        ByteStream bs;
        fBPP->abortProcessing(&bs);

        try
        {
            fDec->write(uniqueID, bs);
        }
        catch (...)
        {
            // this throws only if there are no PMs left. If there are none,
            // that is the cause of the abort and that will be reported to the
            // front-end already. Nothing to do here.
        }

        BPPIsAllocated = false;
        fDec->shutdownQueue(uniqueID);
    }

    // Wake producer/consumer threads blocked on the condition variables.
    condvarWakeupProducer.notify_all();
    condvar.notify_all();
}
3285
abort()3286 void TupleBPS::abort()
3287 {
3288 boost::mutex::scoped_lock scoped(boost::mutex);
3289 abort_nolock();
3290 }
3291
3292 } //namespace
3293 // vim:ts=4 sw=4:
3294