1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /***********************************************************************
19 * $Id: pcolstep.cpp 9655 2013-06-25 23:08:13Z xlou $
20 *
21 *
22 ***********************************************************************/
#include <algorithm>
#include <cstring>
#include <iomanip>
#include <sstream>
//#define NDEBUG
#include <cassert>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
using namespace std;
31
32 #include "distributedenginecomm.h"
33 #include "elementtype.h"
34 #include "unique32generator.h"
35
36 #include "messagequeue.h"
37 using namespace messageqcpp;
38 #include "configcpp.h"
39 using namespace config;
40
41 #include "messagelog.h"
42 #include "messageobj.h"
43 #include "loggingid.h"
44 using namespace logging;
45
46 #include "calpontsystemcatalog.h"
47 #include "logicoperator.h"
48 using namespace execplan;
49
50 #include "brm.h"
51 using namespace BRM;
52
53 #include "idbcompress.h"
54 #include "jlf_common.h"
55 #include "primitivestep.h"
56
57 // #define DEBUG 1
58
59 namespace joblist
60 {
#if 0
// Retired thread entry points for the old send/receive pipeline. Everything
// here is compiled out by the surrounding #if 0 and is kept for reference only.
//
// Former throttling defaults:
//const uint32_t defaultProjectBlockReqLimit = 32768;
//const uint32_t defaultProjectBlockReqThreshold = 16384;

// Sender thread body: runs pColStep::sendPrimitiveMessages() and reports any
// std::exception on cerr rather than letting it escape the thread.
struct pColStepPrimitive
{
    pColStepPrimitive(pColStep* step) : fPColStep(step)
    {
    }

    void operator()()
    {
        try
        {
            fPColStep->sendPrimitiveMessages();
        }
        catch (exception& ex)
        {
            cerr << "pColStep: send thread threw an exception: " << ex.what()
                 << "\t" << this << endl;
        }
    }

    pColStep* fPColStep;
};

// Receiver thread body: runs pColStep::receivePrimitiveMessages() and reports
// any std::exception on cerr, prefixed with the step's toString().
struct pColStepAggregator
{
    pColStepAggregator(pColStep* step) : fPColStepCol(step)
    {
    }

    void operator()()
    {
        try
        {
            fPColStepCol->receivePrimitiveMessages();
        }
        catch (exception& ex)
        {
            cerr << fPColStepCol->toString() << ": recv thread threw an exception: "
                 << ex.what() << endl;
        }
    }

    pColStep* fPColStepCol;
};
#endif
101
pColStep(CalpontSystemCatalog::OID o,CalpontSystemCatalog::OID t,const CalpontSystemCatalog::ColType & ct,const JobInfo & jobInfo)102 pColStep::pColStep(
103 CalpontSystemCatalog::OID o,
104 CalpontSystemCatalog::OID t,
105 const CalpontSystemCatalog::ColType& ct,
106 const JobInfo& jobInfo) :
107 JobStep(jobInfo),
108 fRm(jobInfo.rm),
109 sysCat(jobInfo.csc),
110 fOid(o),
111 fTableOid(t),
112 fColType(ct),
113 fFilterCount(0),
114 fBOP(BOP_NONE),
115 ridList(0),
116 msgsSent(0),
117 msgsRecvd(0),
118 finishedSending(false),
119 recvWaiting(false),
120 fIsDict(false),
121 isEM(jobInfo.isExeMgr),
122 ridCount(0),
123 fFlushInterval(jobInfo.flushInterval),
124 fSwallowRows(false),
125 fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()),
126 fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()),
127 fStopSending(false),
128 isFilterFeeder(false),
129 fPhysicalIO(0),
130 fCacheIO(0),
131 fNumBlksSkipped(0),
132 fMsgBytesIn(0),
133 fMsgBytesOut(0)
134 {
135 if (fTableOid == 0) // cross engine support
136 return;
137
138 int err, i;
139 uint32_t mask;
140
141 if (fFlushInterval == 0 || !isEM)
142 fOutputType = OT_BOTH;
143 else
144 fOutputType = OT_TOKEN;
145
146 if (fOid < 1000)
147 throw runtime_error("pColStep: invalid column");
148
149 compress::IDBCompressInterface cmpif;
150
151 if (!cmpif.isCompressionAvail(fColType.compressionType))
152 {
153 ostringstream oss;
154 oss << "Unsupported compression type " << fColType.compressionType;
155 oss << " for " << sysCat->colName(fOid);
156 #ifdef SKIP_IDB_COMPRESSION
157 oss << ". It looks you're running Community binaries on an Enterprise database.";
158 #endif
159 throw runtime_error(oss.str());
160 }
161
162 realWidth = fColType.colWidth;
163
164 if ( fColType.colDataType == CalpontSystemCatalog::VARCHAR )
165 {
166 if (8 > fColType.colWidth && 4 <= fColType.colWidth )
167 fColType.colDataType = CalpontSystemCatalog::CHAR;
168
169 fColType.colWidth++;
170 }
171
172 //If this is a dictionary column, fudge the numbers...
173 if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY)
174 || (fColType.colDataType == CalpontSystemCatalog::BLOB)
175 || (fColType.colDataType == CalpontSystemCatalog::TEXT))
176 {
177 fColType.colWidth = 8;
178 fIsDict = true;
179 }
180 else if (fColType.colWidth > 8 )
181 {
182 fColType.colWidth = 8;
183 fIsDict = true;
184 //TODO: is this right?
185 fColType.colDataType = CalpontSystemCatalog::VARCHAR;
186 }
187
188 //Round colWidth up
189 if (fColType.colWidth == 3)
190 fColType.colWidth = 4;
191 else if (fColType.colWidth == 5 || fColType.colWidth == 6 || fColType.colWidth == 7)
192 fColType.colWidth = 8;
193
194 idbassert(fColType.colWidth > 0);
195 ridsPerBlock = BLOCK_SIZE / fColType.colWidth;
196
197 /* calculate some shortcuts for extent and block based arithmetic */
198 extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
199
200 for (i = 1, mask = 1, modMask = 0; i <= 32; i++)
201 {
202 mask <<= 1;
203 modMask = (modMask << 1) | 1;
204
205 if (extentSize & mask)
206 {
207 divShift = i;
208 break;
209 }
210 }
211
212 for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
213 if (extentSize & mask)
214 throw runtime_error("pColStep: Extent size must be a power of 2 in blocks");
215
216 /* calculate shortcuts for rid-based arithmetic */
217 for (i = 1, mask = 1, rpbMask = 0; i <= 32; i++)
218 {
219 mask <<= 1;
220 rpbMask = (rpbMask << 1) | 1;
221
222 if (ridsPerBlock & mask)
223 {
224 rpbShift = i;
225 break;
226 }
227 }
228
229 for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
230 if (ridsPerBlock & mask)
231 throw runtime_error("pColStep: Block size and column width must be a power of 2");
232
233 for (i = 0, mask = 1, blockSizeShift = 0; i < 32; i++)
234 {
235 if (mask == BLOCK_SIZE)
236 {
237 blockSizeShift = i;
238 break;
239 }
240
241 mask <<= 1;
242 }
243
244 if (i == 32)
245 throw runtime_error("pColStep: Block size must be a power of 2");
246
247 err = dbrm.getExtents(o, extents);
248
249 if (err)
250 {
251 ostringstream os;
252 os << "pColStep: BRM lookup error. Could not get extents for OID " << o;
253 throw runtime_error(os.str());
254 }
255
256 if (fOid > 3000)
257 {
258 lbidList.reset(new LBIDList(fOid, 0));
259 }
260
261 sort(extents.begin(), extents.end(), ExtentSorter());
262 numExtents = extents.size();
263 // uniqueID = UniqueNumberGenerator::instance()->getUnique32();
264 // if (fDec)
265 // fDec->addQueue(uniqueID);
266 // initializeConfigParms ( );
267 }
268
pColStep(const pColScanStep & rhs)269 pColStep::pColStep(const pColScanStep& rhs) :
270 JobStep(rhs),
271 fRm(rhs.resourceManager()),
272 fOid(rhs.oid()),
273 fTableOid(rhs.tableOid()),
274 fColType(rhs.colType()),
275 fFilterCount(rhs.filterCount()),
276 fBOP(rhs.BOP()),
277 ridList(0),
278 fFilterString(rhs.filterString()),
279 msgsSent(0),
280 msgsRecvd(0),
281 finishedSending(false),
282 recvWaiting(false),
283 fIsDict(rhs.isDictCol()),
284 ridCount(0),
285 // Per Cindy, it's save to put fFlushInterval to be 0
286 fFlushInterval(0),
287 fSwallowRows(false),
288 fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()),
289 fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()),
290 fStopSending(false),
291 fPhysicalIO(0),
292 fCacheIO(0),
293 fNumBlksSkipped(0),
294 fMsgBytesIn(0),
295 fMsgBytesOut(0),
296 fFilters(rhs.getFilters())
297 {
298 int err, i;
299 uint32_t mask;
300
301 if (fTableOid == 0) // cross engine support
302 return;
303
304 if (fOid < 1000)
305 throw runtime_error("pColStep: invalid column");
306
307 ridsPerBlock = rhs.getRidsPerBlock();
308 /* calculate some shortcuts for extent and block based arithmetic */
309 extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
310
311 for (i = 1, mask = 1, modMask = 0; i <= 32; i++)
312 {
313 mask <<= 1;
314 modMask = (modMask << 1) | 1;
315
316 if (extentSize & mask)
317 {
318 divShift = i;
319 break;
320 }
321 }
322
323 for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
324 if (extentSize & mask)
325 throw runtime_error("pColStep: Extent size must be a power of 2 in blocks");
326
327 /* calculate shortcuts for rid-based arithmetic */
328 for (i = 1, mask = 1, rpbMask = 0; i <= 32; i++)
329 {
330 mask <<= 1;
331 rpbMask = (rpbMask << 1) | 1;
332
333 if (ridsPerBlock & mask)
334 {
335 rpbShift = i;
336 break;
337 }
338 }
339
340 for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
341 if (ridsPerBlock & mask)
342 throw runtime_error("pColStep: Block size and column width must be a power of 2");
343
344 for (i = 0, mask = 1, blockSizeShift = 0; i < 32; i++)
345 {
346 if (mask == BLOCK_SIZE)
347 {
348 blockSizeShift = i;
349 break;
350 }
351
352 mask <<= 1;
353 }
354
355 if (i == 32)
356 throw runtime_error("pColStep: Block size must be a power of 2");
357
358 err = dbrm.getExtents(fOid, extents);
359
360 if (err)
361 {
362 ostringstream os;
363 os << "pColStep: BRM lookup error. Could not get extents for OID " << fOid;
364 throw runtime_error(os.str());
365 }
366
367 lbidList = rhs.getlbidList();
368
369 sort(extents.begin(), extents.end(), ExtentSorter());
370 numExtents = extents.size();
371
372 fOnClauseFilter = rhs.onClauseFilter();
373
374 // uniqueID = UniqueNumberGenerator::instance()->getUnique32();
375 // if (fDec)
376 // fDec->addQueue(uniqueID);
377 // initializeConfigParms ( );
378 }
379
pColStep(const PassThruStep & rhs)380 pColStep::pColStep(const PassThruStep& rhs) :
381 JobStep(rhs),
382 fRm(rhs.resourceManager()),
383 fOid(rhs.oid()),
384 fTableOid(rhs.tableOid()),
385 fColType(rhs.colType()),
386 fFilterCount(0),
387 fBOP(BOP_NONE),
388 ridList(0),
389 msgsSent(0),
390 msgsRecvd(0),
391 finishedSending(false),
392 recvWaiting(false),
393 fIsDict(rhs.isDictCol()),
394 ridCount(0),
395 // Per Cindy, it's save to put fFlushInterval to be 0
396 fFlushInterval(0),
397 fSwallowRows(false),
398 fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()),
399 fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()),
400 fStopSending(false),
401 fPhysicalIO(0),
402 fCacheIO(0),
403 fNumBlksSkipped(0),
404 fMsgBytesIn(0),
405 fMsgBytesOut(0)
406 {
407 int err, i;
408 uint32_t mask;
409
410 if (fTableOid == 0) // cross engine support
411 return;
412
413 if (fOid < 1000)
414 throw runtime_error("pColStep: invalid column");
415
416 ridsPerBlock = BLOCK_SIZE / fColType.colWidth;
417 /* calculate some shortcuts for extent and block based arithmetic */
418 extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
419
420 for (i = 1, mask = 1, modMask = 0; i <= 32; i++)
421 {
422 mask <<= 1;
423 modMask = (modMask << 1) | 1;
424
425 if (extentSize & mask)
426 {
427 divShift = i;
428 break;
429 }
430 }
431
432 for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
433 if (extentSize & mask)
434 throw runtime_error("pColStep: Extent size must be a power of 2 in blocks");
435
436 /* calculate shortcuts for rid-based arithmetic */
437 for (i = 1, mask = 1, rpbMask = 0; i <= 32; i++)
438 {
439 mask <<= 1;
440 rpbMask = (rpbMask << 1) | 1;
441
442 if (ridsPerBlock & mask)
443 {
444 rpbShift = i;
445 break;
446 }
447 }
448
449 for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
450 if (ridsPerBlock & mask)
451 throw runtime_error("pColStep: Block size and column width must be a power of 2");
452
453 for (i = 0, mask = 1, blockSizeShift = 0; i < 32; i++)
454 {
455 if (mask == BLOCK_SIZE)
456 {
457 blockSizeShift = i;
458 break;
459 }
460
461 mask <<= 1;
462 }
463
464 if (i == 32)
465 throw runtime_error("pColStep: Block size must be a power of 2");
466
467 err = dbrm.getExtents(fOid, extents);
468
469 if (err)
470 {
471 ostringstream os;
472 os << "pColStep: BRM lookup error. Could not get extents for OID " << fOid;
473 throw runtime_error(os.str());
474 }
475
476 sort(extents.begin(), extents.end(), ExtentSorter());
477 numExtents = extents.size();
478 // uniqueID = UniqueNumberGenerator::instance()->getUnique32();
479 // if (fDec)
480 // fDec->addQueue(uniqueID);
481 // initializeConfigParms ( );
482 }
483
~pColStep()484 pColStep::~pColStep()
485 {
486 // join?
487 //delete lbidList;
488 // if (fDec)
489 // fDec->removeQueue(uniqueID);
490 }
491
492 //------------------------------------------------------------------------------
493 // Initialize configurable parameters
494 //------------------------------------------------------------------------------
initializeConfigParms()495 void pColStep::initializeConfigParms()
496 {
497
498 // const string section ( "JobList" );
499 // const string sendLimitName ( "ProjectBlockReqLimit" );
500 // const string sendThresholdName ( "ProjectBlockReqThreshold" );
501 // Config* cf = Config::makeConfig();
502 //
503 // string strVal;
504 // uint64_t numVal;
505
506 //...Get the tuning parameters that throttle msgs sent to primproc
507 //...fFilterRowReqLimit puts a cap on how many rids we will request from
508 //... primproc, before pausing to let the consumer thread catch up.
509 //... Without this limit, there is a chance that PrimProc could flood
510 //... ExeMgr with thousands of messages that will consume massive
511 //... amounts of memory for a 100 gigabyte database.
512 //...fFilterRowReqThreshhold is the level at which the number of outstanding
513 //... rids must fall below, before the producer can send more rids.
514
515 // strVal = cf->getConfig(section, sendLimitName);
516 // if (strVal.size() > 0)
517 // {
518 // errno = 0;
519 // numVal = Config::uFromText(strVal);
520 // if ( errno == 0 )
521 // fProjectBlockReqLimit = (uint32_t)numVal;
522 // }
523 //
524 // strVal = cf->getConfig(section, sendThresholdName);
525 // if (strVal.size() > 0)
526 // {
527 // errno = 0;
528 // numVal = Config::uFromText(strVal);
529 // if ( errno == 0 )
530 // fProjectBlockReqThreshold = (uint32_t)numVal;
531 // }
532 }
533
startPrimitiveThread()534 void pColStep::startPrimitiveThread()
535 {
536 // pThread.reset(new boost::thread(pColStepPrimitive(this)));
537 }
538
startAggregationThread()539 void pColStep::startAggregationThread()
540 {
541 // cThread.reset(new boost::thread(pColStepAggregator(this)));
542 }
543
run()544 void pColStep::run()
545 {
546 // if (traceOn())
547 // {
548 // syslogStartStep(16, // exemgr subsystem
549 // std::string("pColStep")); // step name
550 // }
551 //
552 // size_t sz = fInputJobStepAssociation.outSize();
553 // idbassert(sz > 0);
554 // const AnyDataListSPtr& dl = fInputJobStepAssociation.outAt(0);
555 // DataList_t* dlp = dl->dataList();
556 // DataList<StringElementType>* strDlp = dl->stringDataList();
557 // if ( dlp )
558 // setRidList(dlp);
559 // else
560 // {
561 // setStrRidList( strDlp );
562 // }
563 // //Sort can be set through the jobstep or the input JSA if fFlushinterval is 0
564 // fToSort = (fFlushInterval) ? 0 : (!fToSort) ? fInputJobStepAssociation.toSort() : fToSort;
565 // fToSort = 0;
566 // //pthread_mutex_init(&mutex, NULL);
567 // //pthread_cond_init(&condvar, NULL);
568 // //pthread_cond_init(&flushed, NULL);
569 // startPrimitiveThread();
570 // startAggregationThread();
571 }
572
join()573 void pColStep::join()
574 {
575 // pThread->join();
576 // cThread->join();
577 // //pthread_mutex_destroy(&mutex);
578 // //pthread_cond_destroy(&condvar);
579 // //pthread_cond_destroy(&flushed);
580 }
581
addFilter(int8_t COP,float value)582 void pColStep::addFilter(int8_t COP, float value)
583 {
584 fFilterString << (uint8_t) COP;
585 fFilterString << (uint8_t) 0;
586 fFilterString << *((uint32_t*) &value);
587 fFilterCount++;
588 }
589
addFilter(int8_t COP,int64_t value,uint8_t roundFlag)590 void pColStep::addFilter(int8_t COP, int64_t value, uint8_t roundFlag)
591 {
592 int8_t tmp8;
593 int16_t tmp16;
594 int32_t tmp32;
595
596 fFilterString << (uint8_t) COP;
597 fFilterString << roundFlag;
598
599 // converts to a type of the appropriate width, then bitwise
600 // copies into the filter ByteStream
601 switch (fColType.colWidth)
602 {
603 case 1:
604 tmp8 = value;
605 fFilterString << *((uint8_t*) &tmp8);
606 break;
607
608 case 2:
609 tmp16 = value;
610 fFilterString << *((uint16_t*) &tmp16);
611 break;
612
613 case 4:
614 tmp32 = value;
615 fFilterString << *((uint32_t*) &tmp32);
616 break;
617
618 case 8:
619 fFilterString << *((uint64_t*) &value);
620 break;
621
622 default:
623 ostringstream o;
624
625 o << "pColStep: CalpontSystemCatalog says OID " << fOid <<
626 " has a width of " << fColType.colWidth;
627 throw runtime_error(o.str());
628 }
629
630 fFilterCount++;
631 }
632
setRidList(DataList<ElementType> * dl)633 void pColStep::setRidList(DataList<ElementType>* dl)
634 {
635 ridList = dl;
636 }
637
setStrRidList(DataList<StringElementType> * strDl)638 void pColStep::setStrRidList(DataList<StringElementType>* strDl)
639 {
640 strRidList = strDl;
641 }
642
setBOP(int8_t b)643 void pColStep::setBOP(int8_t b)
644 {
645 fBOP = b;
646 }
647
setOutputType(int8_t OutputType)648 void pColStep::setOutputType(int8_t OutputType)
649 {
650 fOutputType = OutputType;
651 }
652
setSwallowRows(const bool swallowRows)653 void pColStep::setSwallowRows(const bool swallowRows)
654 {
655 fSwallowRows = swallowRows;
656 }
657
sendPrimitiveMessages()658 void pColStep::sendPrimitiveMessages()
659 {
660 // int it = -1;
661 // int msgRidCount = 0;
662 // int ridListIdx = 0;
663 // bool more = false;
664 // uint64_t absoluteRID = 0;
665 // int64_t msgLBID = -1;
666 // int64_t nextLBID = -1;
667 // int64_t msgLargeBlock = -1;
668 // int64_t nextLargeBlock = -1;
669 // uint16_t blockRelativeRID;
670 // uint32_t msgCount = 0;
671 // uint32_t sentBlockCount = 0;
672 // int msgsSkip=0;
673 // bool scan=false;
674 // bool scanThisBlock=false;
675 // ElementType e;
676 // UintRowGroup rw;
677 // StringElementType strE;
678 // StringRowGroup strRw;
679 //
680 // ByteStream msgRidList;
681 // ByteStream primMsg(MAX_BUFFER_SIZE); //the MAX_BUFFER_SIZE as of 8/20
682 //
683 // NewColRequestHeader hdr;
684 //
685 // AnyDataListSPtr dl;
686 // FifoDataList *fifo = NULL;
687 // StringFifoDataList* strFifo = NULL;
688 //
689 // const bool ignoreCP = ((fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP) != 0);
690 //
691 // //The presence of more than 1 input DL means we (probably) have a pDictionaryScan step feeding this step
692 // // a list of tokens to get the rids for. Convert the input tokens to a filter string. We also have a rid
693 // // list as the second input dl
694 // if (fInputJobStepAssociation.outSize() > 1)
695 // {
696 // addFilters();
697 // if (fTableOid >= 3000)
698 // cout << toString() << endl;
699 // //If we got no input rids (as opposed to no input DL at all) then there were no matching rows from
700 // // the previous step, so this step should not return any rows either. This would be the case, for
701 // // instance, if P_NAME LIKE '%xxxx%' produced no signature matches.
702 // if (fFilterCount == 0)
703 // {
704 // goto done;
705 // }
706 // }
707 //
708 // // determine which ranges/extents to eliminate from this step
709 //
710 //#ifdef DEBUG
711 // if (fOid>=3000)
712 // cout << "oid " << fOid << endl;
713 //#endif
714 //
715 // scanFlags.resize(numExtents);
716 //
717 // for (uint32_t idx=0; idx <numExtents; idx++)
718 // {
719 // if (extents[idx].partition.cprange.isValid != BRM::CP_VALID) {
720 // scanFlags[idx]=1;
721 // }
722 // else
723 // {
724 //
725 // bool flag = lbidList->CasualPartitionPredicate(
726 // extents[idx].partition.cprange.lo_val,
727 // extents[idx].partition.cprange.hi_val,
728 // &fFilterString,
729 // fFilterCount,
730 // fColType,
731 // fBOP) || ignoreCP;
732 // scanFlags[idx]=flag;
733 //#ifdef DEBUG
734 // if (fOid >= 3000 && flushInterval == 0)
735 // cout << (flag ? " will scan " : " will not scan ")
736 // << "extent with range " << extents[idx].partition.cprange.lo_val
737 // << "-" << extents[idx].partition.cprange.hi_val << endl;
738 //#endif
739 //
740 // }
741 //
742 //// if (fOid>=3000)
743 //// cout << " " << scanFlags[idx];
744 // }
745 //// if (scanFlags.size()>0)
746 //// cout << endl;
747 //
748 // // If there was more than 1 input DL, the first is a list of filters and the second is a list of rids,
749 // // otherwise the first is the list of rids.
750 // if (fInputJobStepAssociation.outSize() > 1)
751 // ridListIdx = 1;
752 // else
753 // ridListIdx = 0;
754 //
755 // dl = fInputJobStepAssociation.outAt(ridListIdx);
756 // ridList = dl->dataList();
757 // if ( ridList )
758 // {
759 // fifo = dl->fifoDL();
760 //
761 // if (fifo)
762 // it = fifo->getIterator();
763 // else
764 // it = ridList->getIterator();
765 // }
766 // else
767 // {
768 // strRidList = dl->stringDataList();
769 // strFifo = dl->stringDL();
770 //
771 // if (strFifo)
772 // it = strFifo->getIterator();
773 // else
774 // it = strRidList->getIterator();
775 // }
776 //
777 // if (ridList)
778 // {
779 // if (fifo)
780 // {
781 // more = fifo->next(it, &rw);
782 // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
783 // dlTimes.setFirstReadTime();
784 // }
785 // absoluteRID = rw.et[0].first;
786 // }
787 // else
788 // {
789 // more = ridList->next(it, &e);
790 // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
791 // dlTimes.setFirstReadTime();
792 // }
793 // absoluteRID = e.first;
794 // rw.count = 1;
795 // }
796 // }
797 // else
798 // {
799 // if (strFifo)
800 // {
801 // more = strFifo->next(it, &strRw);
802 // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
803 // dlTimes.setFirstReadTime();
804 // }
805 // absoluteRID = strRw.et[0].first;
806 // }
807 // else
808 // {
809 // more = strRidList->next(it, &strE);
810 // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
811 // dlTimes.setFirstReadTime();
812 // }
813 // absoluteRID = strE.first;
814 // strRw.count = 1;
815 // }
816 // }
817 //
818 // if (more)
819 // msgLBID = getLBID(absoluteRID, scan);
820 // scanThisBlock = scan;
821 // msgLargeBlock = absoluteRID >> blockSizeShift;
822 //
823 // while (more || msgRidCount > 0) {
824 // uint64_t rwCount;
825 // if ( ridList)
826 // rwCount = rw.count;
827 // else
828 // rwCount = strRw.count;
829 //
830 // for (uint64_t i = 0; ((i < rwCount) || (!more && msgRidCount > 0)); )
831 // {
832 // if ( ridList)
833 // {
834 // if (fifo)
835 // absoluteRID = rw.et[i].first;
836 // else
837 // absoluteRID = e.first;
838 // }
839 // else
840 // {
841 // if (strFifo)
842 // absoluteRID = strRw.et[i].first;
843 // else
844 // absoluteRID = strE.first;
845 // }
846 //
847 // if (more) {
848 // nextLBID = getLBID(absoluteRID, scan);
849 // nextLargeBlock = absoluteRID >> blockSizeShift;
850 // }
851 //
852 // //XXXPAT: need to prove N & S here
853 // if (nextLBID == msgLBID && more) {
854 //// blockRelativeRID = absoluteRID % ridsPerBlock;
855 // blockRelativeRID = absoluteRID & rpbMask;
856 // msgRidList << blockRelativeRID;
857 // msgRidCount++;
858 // ++i;
859 // }
860 // else {
861 // //Bug 831: move building msg after the check of scanThisBlock
862 // if (scanThisBlock==true)
863 // {
864 // hdr.ism.Interleave=0;
865 // hdr.ism.Flags=planFlagsToPrimFlags(fTraceFlags);
866 // hdr.ism.Command=COL_BY_SCAN;
867 // hdr.ism.Size=sizeof(NewColRequestHeader) + fFilterString.length() +
868 // msgRidList.length();
869 // hdr.ism.Type=2;
870 //
871 // hdr.hdr.SessionID = fSessionId;
872 // //hdr.hdr.StatementID = 0;
873 // hdr.hdr.TransactionID = fTxnId;
874 // hdr.hdr.VerID = fVerId;
875 // hdr.hdr.StepID = fStepId;
876 // hdr.hdr.UniqueID = uniqueID;
877 //
878 // hdr.LBID = msgLBID;
879 //// idbassert(hdr.LBID >= 0);
880 // hdr.DataSize = fColType.colWidth;
881 // hdr.DataType = fColType.colDataType;
882 // hdr.CompType = fColType.compressionType;
883 // hdr.OutputType = fOutputType;
884 // hdr.BOP = fBOP;
885 // hdr.NOPS = fFilterCount;
886 // hdr.NVALS = msgRidCount;
887 // hdr.sort = fToSort;
888 //
889 // primMsg.append((const uint8_t *) &hdr, sizeof(NewColRequestHeader));
890 // primMsg += fFilterString;
891 // primMsg += msgRidList;
892 // ridCount += msgRidCount;
893 // ++sentBlockCount;
894 //
895 //#ifdef DEBUG
896 // if (flushInterval == 0 && fOid >= 3000)
897 // cout << "sending a prim msg for LBID " << msgLBID << endl;
898 //#endif
899 // ++msgCount;
900 //// cout << "made a primitive\n";
901 // if (msgLargeBlock != nextLargeBlock || !more) {
902 //// cout << "writing " << msgCount << " primitives\n";
903 // fMsgBytesOut += primMsg.lengthWithHdrOverhead();
904 // fDec->write(primMsg);
905 // msgsSent += msgCount;
906 // msgCount = 0;
907 // primMsg.restart();
908 // msgLargeBlock = nextLargeBlock;
909 //
910 // // @bug 769 - Added "&& !fSwallowRows" condition below to fix problem with
911 // // caltraceon(16) not working for tpch01 and some other queries. If a query
912 // // ever held off requesting more blocks, it would lock and never finish.
913 // //Bug 815
914 // if (( sentBlockCount >= fProjectBlockReqLimit) && !fSwallowRows &&
915 // (( msgsSent - msgsRecvd) > fProjectBlockReqThreshold))
916 // {
917 // mutex.lock(); //pthread_mutex_lock(&mutex);
918 // fStopSending = true;
919 //
920 // // @bug 836. Wake up the receiver if he's sleeping.
921 // if (recvWaiting)
922 // condvar.notify_one(); //pthread_cond_signal(&condvar);
923 // flushed.wait(mutex); //pthread_cond_wait(&flushed, &mutex);
924 // fStopSending = false;
925 // mutex.unlock(); //pthread_mutex_unlock(&mutex);
926 // sentBlockCount = 0;
927 // }
928 // }
929 // }
930 // else
931 // {
932 // msgsSkip++;
933 // }
934 // msgLBID = nextLBID;
935 // msgRidList.restart();
936 // msgRidCount = 0;
937 //
938 // mutex.lock(); //pthread_mutex_lock(&mutex);
939 //
940 // if (scanThisBlock) {
941 // if (recvWaiting)
942 // condvar.notify_one(); //pthread_cond_signal(&condvar);
943 // #ifdef DEBUG
944 //// cout << "msgsSent++ = " << msgsSent << endl;
945 // #endif
946 // }
947 // scanThisBlock = scan;
948 // mutex.unlock(); //pthread_mutex_unlock(&mutex);
949 //
950 // // break the for loop
951 // if (!more)
952 // break;
953 // }
954 // } // for rw.count
955 //
956 // if (more)
957 // {
958 // if ( ridList )
959 // {
960 // if (fifo)
961 // {
962 // rw.count = 0;
963 // more = fifo->next(it, &rw);
964 // }
965 // else
966 // {
967 // rw.count = 1;
968 // more = ridList->next(it, &e);
969 // }
970 // }
971 // else
972 // {
973 // if (strFifo)
974 // {
975 // strRw.count = 0;
976 // more = strFifo->next(it, &strRw);
977 // }
978 // else
979 // {
980 // strRw.count = 1;
981 // more = strRidList->next(it, &strE);
982 // }
983 // }
984 // }
985 // }
986 //
987 // if (fOid>=3000) dlTimes.setLastReadTime();
988 //
989 //done:
990 // mutex.lock(); //pthread_mutex_lock(&mutex);
991 // finishedSending = true;
992 // if (recvWaiting)
993 // condvar.notify_one(); //pthread_cond_signal(&condvar);
994 // mutex.unlock(); //pthread_mutex_unlock(&mutex);
995 //
996 //#ifdef DEBUG
997 // if (fOid >=3000)
998 // cout << "pColStep msgSent "
999 // << msgsSent << "/" << msgsSkip
1000 // << " rids " << ridCount
1001 // << " oid " << fOid << " " << msgLBID << endl;
1002 //#endif
1003 // //...Track the number of LBIDs we skip due to Casual Partioning.
1004 // fNumBlksSkipped += msgsSkip;
1005 }
1006
receivePrimitiveMessages()1007 void pColStep::receivePrimitiveMessages()
1008 {
1009 // int64_t ridResults = 0;
1010 // AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
1011 // DataList_t* dlp = dl->dataList();
1012 // uint64_t fbo;
1013 // FifoDataList *fifo = dl->fifoDL();
1014 // UintRowGroup rw;
1015 // uint64_t ridBase;
1016 // boost::shared_ptr<ByteStream> bs;
1017 // uint32_t i = 0, length;
1018 //
1019 // while (1) {
1020 // // sync with the send side
1021 // mutex.lock(); //pthread_mutex_lock(&mutex);
1022 // while (!finishedSending && msgsSent == msgsRecvd) {
1023 // recvWaiting = true;
1024 // #ifdef DEBUG
1025 // cout << "c sleeping" << endl;
1026 // #endif
1027 // // @bug 836. Wake up the sender if he's sleeping.
1028 // if (fStopSending)
1029 // flushed.notify_one(); //pthread_cond_signal(&flushed);
1030 // condvar.wait(mutex); //pthread_cond_wait(&condvar, &mutex);
1031 // #ifdef DEBUG
1032 // cout << "c waking" << endl;
1033 // #endif
1034 // recvWaiting = false;
1035 // }
1036 // if (msgsSent == msgsRecvd) {
1037 // mutex.unlock(); //pthread_mutex_unlock(&mutex);
1038 // break;
1039 // }
1040 // mutex.unlock(); //pthread_mutex_unlock(&mutex);
1041 //
1042 // // do the recv
1043 // fDec->read(uniqueID, bs);
1044 // fMsgBytesIn += bs->lengthWithHdrOverhead();
1045 //
1046 // // no more messages, and buffered messages should be already processed by now.
1047 // if (bs->length() == 0) break;
1048 //
1049 // #ifdef DEBUG
1050 // cout << "msgsRecvd++ = " << msgsRecvd << ". RidResults = " << ridResults << endl;
1051 // cout << "Got a ColResultHeader!: " << bs.length() << " bytes" << endl;
1052 // #endif
1053 //
1054 // const ByteStream::byte* bsp = bs->buf();
1055 //
1056 // // get the ISMPacketHeader out of the bytestream
1057 // //const ISMPacketHeader* ism = reinterpret_cast<const ISMPacketHeader*>(bsp);
1058 //
1059 // // get the ColumnResultHeader out of the bytestream
1060 // const ColResultHeader* crh = reinterpret_cast<const ColResultHeader*>
1061 // (&bsp[sizeof(ISMPacketHeader)]);
1062 //
1063 // bool firstRead = true;
1064 // length = bs->length();
1065 //
1066 // i = 0;
1067 // uint32_t msgCount = 0;
1068 // while (i < length) {
1069 // ++msgCount;
1070 //
1071 // i += sizeof(ISMPacketHeader);
1072 // crh = reinterpret_cast<const ColResultHeader*>(&bsp[i]);
1073 // // double check the sequence number is increased by one each time
1074 // i += sizeof(ColResultHeader);
1075 //
1076 // fCacheIO += crh->CacheIO;
1077 // fPhysicalIO += crh->PhysicalIO;
1078 //
1079 // // From this point on the rest of the bytestream is the data that comes back from the primitive server
1080 // // This needs to be fed to a datalist that is retrieved from the outputassociation object.
1081 //
1082 // fbo = getFBO(crh->LBID);
1083 // ridBase = fbo << rpbShift;
1084 //
1085 // #ifdef DEBUG
1086 //// cout << " NVALS = " << crh->NVALS << " fbo = " << fbo << " lbid = " << crh->LBID << endl;
1087 // #endif
1088 //
1089 // //Check output type
1090 // if ( fOutputType == OT_RID )
1091 // {
1092 // ridResults += crh->NVALS;
1093 // }
1094 //
1095 // /* XXXPAT: This clause is executed when ExeMgr calls the new nextBand(BS) fcn.
1096 //
1097 // TODO: both classes have to agree
1098 // on which nextBand() variant will be called. pColStep
1099 // currently has to infer that from flushInterval and the
1100 // Table OID. It would be better to have a more explicit form
1101 // of agreement.
1102 //
1103 // The goal of the nextBand(BS) fcn is to avoid iterating over
1104 // every row except at unserialization. This clause copies
1105 // the raw results from the PrimProc response directly into
1106 // the memory used for the ElementType array. DeliveryStep
1107 // will also treat the ElementType array as raw memory and
1108 // serialize that. TableColumn now parses the packed data
1109 // instead of whole ElementTypes.
1110 // */
1111 // else if (fOutputType == OT_TOKEN && fFlushInterval > 0 && !fIsDict) {
1112 //
1113 // if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0)
1114 // dlTimes.setFirstInsertTime();
1115 // ridResults += crh->NVALS;
1116 //
1117 // /* memcpy the bytestream into the output set */
1118 // uint32_t toCopy, bsPos = 0;
1119 // uint8_t *pos;
1120 // while (bsPos < crh->NVALS) {
1121 // toCopy = (crh->NVALS - bsPos > rw.ElementsPerGroup - rw.count ?
1122 // rw.ElementsPerGroup - rw.count : crh->NVALS - bsPos);
1123 // pos = ((uint8_t *) &rw.et[0]) + (rw.count * fColType.colWidth);
1124 // memcpy(pos, &bsp[i], toCopy * fColType.colWidth);
1125 // bsPos += toCopy;
1126 // i += toCopy * fColType.colWidth;
1127 // rw.count += toCopy;
1128 // if (rw.count == rw.ElementsPerGroup) {
1129 // if (!fSwallowRows)
1130 // fifo->insert(rw);
1131 // rw.count = 0;
1132 // }
1133 // }
1134 // }
1135 // else if ( fOutputType == OT_TOKEN)
1136 // {
1137 // uint64_t dv;
1138 // uint64_t rid;
1139 //
1140 // if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0)
1141 // dlTimes.setFirstInsertTime();
1142 // ridResults += crh->NVALS;
1143 // for(int j = 0; j < crh->NVALS; ++j)
1144 // {
1145 // // XXXPAT: Only use this when the RID doesn't matter or when
1146 // // the response contains every row.
1147 //
1148 // rid = j + ridBase;
1149 // switch (fColType.colWidth) {
1150 // case 8: dv = *((const uint64_t *) &bsp[i]); i += 8; break;
1151 // case 4: dv = *((const uint32_t *) &bsp[i]); i += 4; break;
1152 // case 2: dv = *((const uint16_t *) &bsp[i]); i += 2; break;
1153 // case 1: dv = *((const uint8_t *) &bsp[i]); ++i; break;
1154 // default:
1155 // throw runtime_error("pColStep: invalid column width!");
1156 // }
1157 //
1158 // // @bug 663 - Don't output any rows if fSwallowRows (caltraceon(16)) is on.
1159 // // This options swallows rows in the project steps.
1160 // if (!fSwallowRows)
1161 // {
1162 // if (fifo)
1163 // {
1164 // rw.et[rw.count].first = rid;
1165 // rw.et[rw.count++].second = dv;
1166 // if (rw.count == rw.ElementsPerGroup)
1167 // {
1168 // fifo->insert(rw);
1169 // rw.count = 0;
1170 // }
1171 // }
1172 // else
1173 // {
1174 // dlp->insert(ElementType(rid, dv));
1175 // }
1176 // #ifdef DEBUG
1177 // //cout << " -- inserting <" << rid << ", " << dv << "> " << *prid << endl;
1178 // #endif
1179 // }
1180 // }
1181 // }
1182 // else if ( fOutputType == OT_BOTH )
1183 // {
1184 // ridResults += crh->NVALS;
1185 // for(int j = 0; j < crh->NVALS; ++j)
1186 // {
1187 // uint64_t dv;
1188 // uint64_t rid;
1189 //
1190 // rid = *((const uint16_t *) &bsp[i]) + ridBase;
1191 // i += sizeof(uint16_t);
1192 // switch (fColType.colWidth) {
1193 // case 8: dv = *((const uint64_t *) &bsp[i]); i += 8; break;
1194 // case 4: dv = *((const uint32_t *) &bsp[i]); i += 4; break;
1195 // case 2: dv = *((const uint16_t *) &bsp[i]); i += 2; break;
1196 // case 1: dv = *((const uint8_t *) &bsp[i]); ++i; break;
1197 // default:
1198 // throw runtime_error("pColStep: invalid column width!");
1199 // }
1200 //
1201 // // @bug 663 - Don't output any rows if fSwallowRows (caltraceon(16)) is on.
1202 // // This options swallows rows in the project steps.
1203 // if (!fSwallowRows) {
1204 // if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0)
1205 // dlTimes.setFirstInsertTime();
1206 // if(fifo)
1207 // {
1208 //// rw.et[rw.count++] = ElementType(rid, dv);
1209 // rw.et[rw.count].first = rid;
1210 // rw.et[rw.count++].second = dv;
1211 // if (rw.count == rw.ElementsPerGroup)
1212 // {
1213 // fifo->insert(rw);
1214 // rw.count = 0;
1215 // }
1216 // }
1217 // else
1218 // {
1219 // dlp->insert(ElementType(rid, dv));
1220 // }
1221 // #ifdef DEBUG
1222 // //cout << " -- inserting <" << rid << ", " << dv << "> " << *prid << endl;
1223 // #endif
1224 // }
1225 // }
1226 // }
1227 // } // unpacking the BS
1228 //
1229 // //Bug 815: Check whether we have enough to process
1230 // //++lockCount;
1231 // mutex.lock(); //pthread_mutex_lock(&mutex);
1232 // if ( fStopSending && ((msgsSent - msgsRecvd ) <= fProjectBlockReqThreshold) )
1233 // {
1234 // flushed.notify_one(); //pthread_cond_signal(&flushed);
1235 // }
1236 // mutex.unlock(); //pthread_mutex_unlock(&mutex);
1237 //
1238 // firstRead = false;
1239 // msgsRecvd += msgCount;
1240 // } // read loop
1241 // // done reading
1242 //
1243 // if (fifo && rw.count > 0)
1244 // fifo->insert(rw);
1245 //
1246 // //...Casual partitioning could cause us to do no processing. In that
1247 // //...case these time stamps did not get set. So we set them here.
1248 // if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
1249 // dlTimes.setFirstReadTime();
1250 // dlTimes.setLastReadTime();
1251 // dlTimes.setFirstInsertTime();
1252 // }
1253 // if (fOid>=3000) dlTimes.setEndOfInputTime();
1254 //
1255 // //@bug 699: Reset StepMsgQueue
1256 // fDec->removeQueue(uniqueID);
1257 //
1258 // if (fifo)
1259 // fifo->endOfInput();
1260 // else
1261 // dlp->endOfInput();
1262 //
1263 // if (fTableOid >= 3000)
1264 // {
1265 // //...Construct timestamp using ctime_r() instead of ctime() not
1266 // //...necessarily due to re-entrancy, but because we want to strip
1267 // //...the newline ('\n') off the end of the formatted string.
1268 // time_t t = time(0);
1269 // char timeString[50];
1270 // ctime_r(&t, timeString);
1271 // timeString[strlen(timeString)-1 ] = '\0';
1272 //
1273 // FifoDataList* pFifo = 0;
1274 // uint64_t totalBlockedReadCount = 0;
1275 // uint64_t totalBlockedWriteCount = 0;
1276 //
1277 // //...Sum up the blocked FIFO reads for all input associations
1278 // size_t inDlCnt = fInputJobStepAssociation.outSize();
1279 // for (size_t iDataList=0; iDataList<inDlCnt; iDataList++)
1280 // {
1281 // pFifo = fInputJobStepAssociation.outAt(iDataList)->fifoDL();
1282 // if (pFifo)
1283 // {
1284 // totalBlockedReadCount += pFifo->blockedReadCount();
1285 // }
1286 // }
1287 //
1288 // //...Sum up the blocked FIFO writes for all output associations
1289 // size_t outDlCnt = fOutputJobStepAssociation.outSize();
1290 // for (size_t iDataList=0; iDataList<outDlCnt; iDataList++)
1291 // {
1292 // pFifo = fOutputJobStepAssociation.outAt(iDataList)->fifoDL();
1293 // if (pFifo)
1294 // {
1295 // totalBlockedWriteCount += pFifo->blockedWriteCount();
1296 // }
1297 // }
1298 //
1299 // //...Roundoff msg byte counts to nearest KB for display
1300 // uint64_t msgBytesInKB = fMsgBytesIn >> 10;
1301 // uint64_t msgBytesOutKB = fMsgBytesOut >> 10;
1302 // if (fMsgBytesIn & 512)
1303 // msgBytesInKB++;
1304 // if (fMsgBytesOut & 512)
1305 // msgBytesOutKB++;
1306 //
1307 // // @bug 828
1308 // if (fifo)
1309 // fifo->totalSize(ridResults);
1310 //
1311 // if (traceOn())
1312 // {
1313 // //...Print job step completion information
1314 // ostringstream logStr;
1315 // logStr << "ses:" << fSessionId <<
1316 // " st: " << fStepId << " finished at " <<
1317 // timeString << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" <<
1318 // fCacheIO << "; MsgsRvcd-" << msgsRecvd <<
1319 // "; BlockedFifoIn/Out-" << totalBlockedReadCount <<
1320 // "/" << totalBlockedWriteCount <<
1321 // "; output size-" << ridResults << endl <<
1322 // "\tPartitionBlocksEliminated-" << fNumBlksSkipped <<
1323 // "; MsgBytesIn-" << msgBytesInKB << "KB" <<
1324 // "; MsgBytesOut-" << msgBytesOutKB << "KB" << endl <<
1325 // "\t1st read " << dlTimes.FirstReadTimeString() <<
1326 // "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" <<
1327 // JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(),dlTimes.FirstReadTime()) <<
1328 // "s" << endl;
1329 //
1330 // logEnd(logStr.str().c_str());
1331 //
1332 // syslogReadBlockCounts(16, // exemgr sybsystem
1333 // fPhysicalIO, // # blocks read from disk
1334 // fCacheIO, // # blocks read from cache
1335 // fNumBlksSkipped); // # casual partition block hits
1336 // syslogProcessingTimes(16, // exemgr subsystem
1337 // dlTimes.FirstReadTime(), // first datalist read
1338 // dlTimes.LastReadTime(), // last datalist read
1339 // dlTimes.FirstInsertTime(), // first datalist write
1340 // dlTimes.EndOfInputTime()); // last (endOfInput) datalist write
1341 // syslogEndStep(16, // exemgr subsystem
1342 // totalBlockedReadCount, // blocked datalist input
1343 // totalBlockedWriteCount, // blocked datalist output
1344 // fMsgBytesIn, // incoming msg byte count
1345 // fMsgBytesOut); // outgoing msg byte count
1346 // }
1347 // }
1348 }
1349
toString() const1350 const string pColStep::toString() const
1351 {
1352 ostringstream oss;
1353 oss << "pColStep ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId << " st:" << fStepId <<
1354 " tb/col:" << fTableOid << "/" << fOid;
1355
1356 if (alias().length()) oss << " alias:" << alias();
1357
1358 if (view().length()) oss << " view:" << view();
1359
1360 if (fOutputJobStepAssociation.outSize() > 0)
1361 oss << " " << omitOidInDL
1362 << fOutputJobStepAssociation.outAt(0) << showOidInDL;
1363 else
1364 oss << " (no output yet)";
1365
1366 oss << " nf:" << fFilterCount;
1367 oss << " in:";
1368
1369 for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
1370 {
1371 oss << fInputJobStepAssociation.outAt(i) << ", ";
1372 }
1373
1374 if (fSwallowRows)
1375 oss << " (sink)";
1376
1377 return oss.str();
1378 }
1379
addFilters()1380 void pColStep::addFilters()
1381 {
1382 AnyDataListSPtr dl = fInputJobStepAssociation.outAt(0);
1383 DataList_t* bdl = dl->dataList();
1384 FifoDataList* fifo = fInputJobStepAssociation.outAt(0)->fifoDL();
1385
1386 idbassert(bdl);
1387 int it = -1;
1388 bool more;
1389 ElementType e;
1390 int64_t token;
1391
1392 if (fifo != NULL)
1393 {
1394 try
1395 {
1396 it = fifo->getIterator();
1397 }
1398 catch (exception& ex)
1399 {
1400 cerr << "pColStep::addFilters: caught exception: " << ex.what() << " stepno: " <<
1401 fStepId << endl;
1402 }
1403 catch (...)
1404 {
1405 cerr << "pColStep::addFilters: caught exception" << endl;
1406 }
1407
1408 fBOP = BOP_OR;
1409 UintRowGroup rw;
1410
1411 more = fifo->next(it, &rw);
1412
1413 while (more)
1414 {
1415 for (uint64_t i = 0; i < rw.count; ++i)
1416 addFilter(COMPARE_EQ, (int64_t) rw.et[i].second);
1417
1418 more = fifo->next(it, &rw);
1419 }
1420 }
1421 else
1422 {
1423 try
1424 {
1425 it = bdl->getIterator();
1426 }
1427 catch (exception& ex)
1428 {
1429 cerr << "pColStep::addFilters: caught exception: " << ex.what() << " stepno: " <<
1430 fStepId << endl;
1431 }
1432 catch (...)
1433 {
1434 cerr << "pColStep::addFilters: caught exception" << endl;
1435 }
1436
1437 fBOP = BOP_OR;
1438
1439 more = bdl->next(it, &e);
1440
1441 while (more)
1442 {
1443 token = e.second;
1444 addFilter(COMPARE_EQ, token);
1445
1446 more = bdl->next(it, &e);
1447 }
1448 }
1449
1450 return;
1451 }
1452
1453 /* This exists to avoid a DBRM lookup for every rid. */
getLBID(uint64_t rid,bool & scan)1454 inline uint64_t pColStep::getLBID(uint64_t rid, bool& scan)
1455 {
1456 uint32_t extentIndex, extentOffset;
1457 uint64_t fbo;
1458 fbo = rid >> rpbShift;
1459 extentIndex = fbo >> divShift;
1460 extentOffset = fbo & modMask;
1461 scan = (scanFlags[extentIndex] != 0);
1462 return extents[extentIndex].range.start + extentOffset;
1463 }
1464
getFBO(uint64_t lbid)1465 inline uint64_t pColStep::getFBO(uint64_t lbid)
1466 {
1467 uint32_t i;
1468 uint64_t lastLBID;
1469
1470 for (i = 0; i < numExtents; i++)
1471 {
1472 lastLBID = extents[i].range.start + (extents[i].range.size << 10) - 1;
1473
1474 if (lbid >= (uint64_t) extents[i].range.start && lbid <= lastLBID)
1475 return (lbid - extents[i].range.start) + (i << divShift);
1476 }
1477
1478 cerr << "pColStep: didn't find the FBO?\n";
1479 throw logic_error("pColStep: didn't find the FBO?");
1480 }
1481
1482
appendFilter(const messageqcpp::ByteStream & filter,unsigned count)1483 void pColStep::appendFilter(const messageqcpp::ByteStream& filter, unsigned count)
1484 {
1485 fFilterString += filter;
1486 fFilterCount += count;
1487 }
1488
1489
addFilter(const Filter * f)1490 void pColStep::addFilter(const Filter* f)
1491 {
1492 if (NULL != f)
1493 fFilters.push_back(f);
1494 }
1495
1496
appendFilter(const std::vector<const execplan::Filter * > & fs)1497 void pColStep::appendFilter(const std::vector<const execplan::Filter*>& fs)
1498 {
1499 fFilters.insert(fFilters.end(), fs.begin(), fs.end());
1500 }
1501
1502 } //namespace
1503 // vim:ts=4 sw=4:
1504
1505