1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /***********************************************************************
19 *   $Id: pcolstep.cpp 9655 2013-06-25 23:08:13Z xlou $
20 *
21 *
22 ***********************************************************************/
#include <sstream>
#include <iomanip>
#include <algorithm>
#include <cstring>
//#define NDEBUG
#include <cassert>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
using namespace std;
31 
32 #include "distributedenginecomm.h"
33 #include "elementtype.h"
34 #include "unique32generator.h"
35 
36 #include "messagequeue.h"
37 using namespace messageqcpp;
38 #include "configcpp.h"
39 using namespace config;
40 
41 #include "messagelog.h"
42 #include "messageobj.h"
43 #include "loggingid.h"
44 using namespace logging;
45 
46 #include "calpontsystemcatalog.h"
47 #include "logicoperator.h"
48 using namespace execplan;
49 
50 #include "brm.h"
51 using namespace BRM;
52 
53 #include "idbcompress.h"
54 #include "jlf_common.h"
55 #include "primitivestep.h"
56 
57 // #define DEBUG 1
58 
59 namespace joblist
60 {
// Compiled-out thread functors (kept for reference only).
//   pColStepPrimitive  - calls pColStep::sendPrimitiveMessages() and logs any
//                        std::exception it throws to cerr.
//   pColStepAggregator - calls pColStep::receivePrimitiveMessages() and logs
//                        any std::exception it throws to cerr.
// The code that created threads from them (startPrimitiveThread /
// startAggregationThread) is commented out, so nothing references these.
#if 0
//const uint32_t defaultProjectBlockReqLimit     = 32768;
//const uint32_t defaultProjectBlockReqThreshold =  16384;
struct pColStepPrimitive
{
    pColStepPrimitive(pColStep* pColStep) : fPColStep(pColStep)
    {}
    pColStep* fPColStep;
    void operator()()
    {
        try
        {
            fPColStep->sendPrimitiveMessages();
        }
        catch (exception& re)
        {
            cerr << "pColStep: send thread threw an exception: " << re.what() <<
                 "\t" << this << endl;
        }
    }
};

struct pColStepAggregator
{
    pColStepAggregator(pColStep* pColStep) : fPColStepCol(pColStep)
    {}
    pColStep* fPColStepCol;
    void operator()()
    {
        try
        {
            fPColStepCol->receivePrimitiveMessages();
        }
        catch (exception& re)
        {
            cerr << fPColStepCol->toString() << ": recv thread threw an exception: " << re.what() << endl;
        }
    }
};
#endif
101 
pColStep(CalpontSystemCatalog::OID o,CalpontSystemCatalog::OID t,const CalpontSystemCatalog::ColType & ct,const JobInfo & jobInfo)102 pColStep::pColStep(
103     CalpontSystemCatalog::OID o,
104     CalpontSystemCatalog::OID t,
105     const CalpontSystemCatalog::ColType& ct,
106     const JobInfo& jobInfo) :
107     JobStep(jobInfo),
108     fRm(jobInfo.rm),
109     sysCat(jobInfo.csc),
110     fOid(o),
111     fTableOid(t),
112     fColType(ct),
113     fFilterCount(0),
114     fBOP(BOP_NONE),
115     ridList(0),
116     msgsSent(0),
117     msgsRecvd(0),
118     finishedSending(false),
119     recvWaiting(false),
120     fIsDict(false),
121     isEM(jobInfo.isExeMgr),
122     ridCount(0),
123     fFlushInterval(jobInfo.flushInterval),
124     fSwallowRows(false),
125     fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()),
126     fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()),
127     fStopSending(false),
128     isFilterFeeder(false),
129     fPhysicalIO(0),
130     fCacheIO(0),
131     fNumBlksSkipped(0),
132     fMsgBytesIn(0),
133     fMsgBytesOut(0)
134 {
135     if (fTableOid == 0) // cross engine support
136         return;
137 
138     int err, i;
139     uint32_t mask;
140 
141     if (fFlushInterval == 0 || !isEM)
142         fOutputType = OT_BOTH;
143     else
144         fOutputType = OT_TOKEN;
145 
146     if (fOid < 1000)
147         throw runtime_error("pColStep: invalid column");
148 
149     compress::IDBCompressInterface cmpif;
150 
151     if (!cmpif.isCompressionAvail(fColType.compressionType))
152     {
153         ostringstream oss;
154         oss << "Unsupported compression type " << fColType.compressionType;
155         oss << " for " << sysCat->colName(fOid);
156 #ifdef SKIP_IDB_COMPRESSION
157         oss << ". It looks you're running Community binaries on an Enterprise database.";
158 #endif
159         throw runtime_error(oss.str());
160     }
161 
162     realWidth = fColType.colWidth;
163 
164     if ( fColType.colDataType == CalpontSystemCatalog::VARCHAR )
165     {
166         if (8 > fColType.colWidth && 4 <= fColType.colWidth )
167             fColType.colDataType = CalpontSystemCatalog::CHAR;
168 
169         fColType.colWidth++;
170     }
171 
172     //If this is a dictionary column, fudge the numbers...
173     if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY)
174             || (fColType.colDataType == CalpontSystemCatalog::BLOB)
175             || (fColType.colDataType == CalpontSystemCatalog::TEXT))
176     {
177         fColType.colWidth = 8;
178         fIsDict = true;
179     }
180     else if (fColType.colWidth > 8 )
181     {
182         fColType.colWidth = 8;
183         fIsDict = true;
184         //TODO: is this right?
185         fColType.colDataType = CalpontSystemCatalog::VARCHAR;
186     }
187 
188     //Round colWidth up
189     if (fColType.colWidth == 3)
190         fColType.colWidth = 4;
191     else if (fColType.colWidth == 5 || fColType.colWidth == 6 || fColType.colWidth == 7)
192         fColType.colWidth = 8;
193 
194     idbassert(fColType.colWidth > 0);
195     ridsPerBlock = BLOCK_SIZE / fColType.colWidth;
196 
197     /* calculate some shortcuts for extent and block based arithmetic */
198     extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
199 
200     for (i = 1, mask = 1, modMask = 0; i <= 32; i++)
201     {
202         mask <<= 1;
203         modMask = (modMask << 1) | 1;
204 
205         if (extentSize & mask)
206         {
207             divShift = i;
208             break;
209         }
210     }
211 
212     for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
213         if (extentSize & mask)
214             throw runtime_error("pColStep: Extent size must be a power of 2 in blocks");
215 
216     /* calculate shortcuts for rid-based arithmetic */
217     for (i = 1, mask = 1, rpbMask = 0; i <= 32; i++)
218     {
219         mask <<= 1;
220         rpbMask = (rpbMask << 1) | 1;
221 
222         if (ridsPerBlock & mask)
223         {
224             rpbShift = i;
225             break;
226         }
227     }
228 
229     for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
230         if (ridsPerBlock & mask)
231             throw runtime_error("pColStep: Block size and column width must be a power of 2");
232 
233     for (i = 0, mask = 1, blockSizeShift = 0; i < 32; i++)
234     {
235         if (mask == BLOCK_SIZE)
236         {
237             blockSizeShift = i;
238             break;
239         }
240 
241         mask <<= 1;
242     }
243 
244     if (i == 32)
245         throw runtime_error("pColStep: Block size must be a power of 2");
246 
247     err = dbrm.getExtents(o, extents);
248 
249     if (err)
250     {
251         ostringstream os;
252         os << "pColStep: BRM lookup error. Could not get extents for OID " << o;
253         throw runtime_error(os.str());
254     }
255 
256     if (fOid > 3000)
257     {
258         lbidList.reset(new LBIDList(fOid, 0));
259     }
260 
261     sort(extents.begin(), extents.end(), ExtentSorter());
262     numExtents = extents.size();
263 //	uniqueID = UniqueNumberGenerator::instance()->getUnique32();
264 //	if (fDec)
265 //		fDec->addQueue(uniqueID);
266 // 	initializeConfigParms ( );
267 }
268 
pColStep(const pColScanStep & rhs)269 pColStep::pColStep(const pColScanStep& rhs) :
270     JobStep(rhs),
271     fRm(rhs.resourceManager()),
272     fOid(rhs.oid()),
273     fTableOid(rhs.tableOid()),
274     fColType(rhs.colType()),
275     fFilterCount(rhs.filterCount()),
276     fBOP(rhs.BOP()),
277     ridList(0),
278     fFilterString(rhs.filterString()),
279     msgsSent(0),
280     msgsRecvd(0),
281     finishedSending(false),
282     recvWaiting(false),
283     fIsDict(rhs.isDictCol()),
284     ridCount(0),
285     // Per Cindy, it's save to put fFlushInterval to be 0
286     fFlushInterval(0),
287     fSwallowRows(false),
288     fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()),
289     fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()),
290     fStopSending(false),
291     fPhysicalIO(0),
292     fCacheIO(0),
293     fNumBlksSkipped(0),
294     fMsgBytesIn(0),
295     fMsgBytesOut(0),
296     fFilters(rhs.getFilters())
297 {
298     int err, i;
299     uint32_t mask;
300 
301     if (fTableOid == 0)  // cross engine support
302         return;
303 
304     if (fOid < 1000)
305         throw runtime_error("pColStep: invalid column");
306 
307     ridsPerBlock = rhs.getRidsPerBlock();
308     /* calculate some shortcuts for extent and block based arithmetic */
309     extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
310 
311     for (i = 1, mask = 1, modMask = 0; i <= 32; i++)
312     {
313         mask <<= 1;
314         modMask = (modMask << 1) | 1;
315 
316         if (extentSize & mask)
317         {
318             divShift = i;
319             break;
320         }
321     }
322 
323     for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
324         if (extentSize & mask)
325             throw runtime_error("pColStep: Extent size must be a power of 2 in blocks");
326 
327     /* calculate shortcuts for rid-based arithmetic */
328     for (i = 1, mask = 1, rpbMask = 0; i <= 32; i++)
329     {
330         mask <<= 1;
331         rpbMask = (rpbMask << 1) | 1;
332 
333         if (ridsPerBlock & mask)
334         {
335             rpbShift = i;
336             break;
337         }
338     }
339 
340     for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
341         if (ridsPerBlock & mask)
342             throw runtime_error("pColStep: Block size and column width must be a power of 2");
343 
344     for (i = 0, mask = 1, blockSizeShift = 0; i < 32; i++)
345     {
346         if (mask == BLOCK_SIZE)
347         {
348             blockSizeShift = i;
349             break;
350         }
351 
352         mask <<= 1;
353     }
354 
355     if (i == 32)
356         throw runtime_error("pColStep: Block size must be a power of 2");
357 
358     err = dbrm.getExtents(fOid, extents);
359 
360     if (err)
361     {
362         ostringstream os;
363         os << "pColStep: BRM lookup error. Could not get extents for OID " << fOid;
364         throw runtime_error(os.str());
365     }
366 
367     lbidList = rhs.getlbidList();
368 
369     sort(extents.begin(), extents.end(), ExtentSorter());
370     numExtents = extents.size();
371 
372     fOnClauseFilter = rhs.onClauseFilter();
373 
374 //	uniqueID = UniqueNumberGenerator::instance()->getUnique32();
375 //	if (fDec)
376 //		fDec->addQueue(uniqueID);
377 // 	initializeConfigParms ( );
378 }
379 
pColStep(const PassThruStep & rhs)380 pColStep::pColStep(const PassThruStep& rhs) :
381     JobStep(rhs),
382     fRm(rhs.resourceManager()),
383     fOid(rhs.oid()),
384     fTableOid(rhs.tableOid()),
385     fColType(rhs.colType()),
386     fFilterCount(0),
387     fBOP(BOP_NONE),
388     ridList(0),
389     msgsSent(0),
390     msgsRecvd(0),
391     finishedSending(false),
392     recvWaiting(false),
393     fIsDict(rhs.isDictCol()),
394     ridCount(0),
395     // Per Cindy, it's save to put fFlushInterval to be 0
396     fFlushInterval(0),
397     fSwallowRows(false),
398     fProjectBlockReqLimit(fRm->getJlProjectBlockReqLimit()),
399     fProjectBlockReqThreshold(fRm->getJlProjectBlockReqThreshold()),
400     fStopSending(false),
401     fPhysicalIO(0),
402     fCacheIO(0),
403     fNumBlksSkipped(0),
404     fMsgBytesIn(0),
405     fMsgBytesOut(0)
406 {
407     int err, i;
408     uint32_t mask;
409 
410     if (fTableOid == 0)  // cross engine support
411         return;
412 
413     if (fOid < 1000)
414         throw runtime_error("pColStep: invalid column");
415 
416     ridsPerBlock = BLOCK_SIZE / fColType.colWidth;
417     /* calculate some shortcuts for extent and block based arithmetic */
418     extentSize = (fRm->getExtentRows() * fColType.colWidth) / BLOCK_SIZE;
419 
420     for (i = 1, mask = 1, modMask = 0; i <= 32; i++)
421     {
422         mask <<= 1;
423         modMask = (modMask << 1) | 1;
424 
425         if (extentSize & mask)
426         {
427             divShift = i;
428             break;
429         }
430     }
431 
432     for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
433         if (extentSize & mask)
434             throw runtime_error("pColStep: Extent size must be a power of 2 in blocks");
435 
436     /* calculate shortcuts for rid-based arithmetic */
437     for (i = 1, mask = 1, rpbMask = 0; i <= 32; i++)
438     {
439         mask <<= 1;
440         rpbMask = (rpbMask << 1) | 1;
441 
442         if (ridsPerBlock & mask)
443         {
444             rpbShift = i;
445             break;
446         }
447     }
448 
449     for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
450         if (ridsPerBlock & mask)
451             throw runtime_error("pColStep: Block size and column width must be a power of 2");
452 
453     for (i = 0, mask = 1, blockSizeShift = 0; i < 32; i++)
454     {
455         if (mask == BLOCK_SIZE)
456         {
457             blockSizeShift = i;
458             break;
459         }
460 
461         mask <<= 1;
462     }
463 
464     if (i == 32)
465         throw runtime_error("pColStep: Block size must be a power of 2");
466 
467     err = dbrm.getExtents(fOid, extents);
468 
469     if (err)
470     {
471         ostringstream os;
472         os << "pColStep: BRM lookup error. Could not get extents for OID " << fOid;
473         throw runtime_error(os.str());
474     }
475 
476     sort(extents.begin(), extents.end(), ExtentSorter());
477     numExtents = extents.size();
478 //	uniqueID = UniqueNumberGenerator::instance()->getUnique32();
479 //	if (fDec)
480 //		fDec->addQueue(uniqueID);
481 // 	initializeConfigParms ( );
482 }
483 
~pColStep()484 pColStep::~pColStep()
485 {
486     // join?
487     //delete lbidList;
488 //	if (fDec)
489 //		fDec->removeQueue(uniqueID);
490 }
491 
492 //------------------------------------------------------------------------------
493 // Initialize configurable parameters
494 //------------------------------------------------------------------------------
initializeConfigParms()495 void pColStep::initializeConfigParms()
496 {
497 
498 // 	const string section           ( "JobList" );
499 // 	const string sendLimitName     ( "ProjectBlockReqLimit" );
500 // 	const string sendThresholdName ( "ProjectBlockReqThreshold" );
501 // 	Config* cf = Config::makeConfig();
502 //
503 // 	string        strVal;
504 // 	uint64_t numVal;
505 
506     //...Get the tuning parameters that throttle msgs sent to primproc
507     //...fFilterRowReqLimit puts a cap on how many rids we will request from
508     //...    primproc, before pausing to let the consumer thread catch up.
509     //...    Without this limit, there is a chance that PrimProc could flood
510     //...    ExeMgr with thousands of messages that will consume massive
511     //...    amounts of memory for a 100 gigabyte database.
512     //...fFilterRowReqThreshhold is the level at which the number of outstanding
513     //...    rids must fall below, before the producer can send more rids.
514 
515 // 	strVal = cf->getConfig(section, sendLimitName);
516 // 	if (strVal.size() > 0)
517 // 	{
518 // 		errno  = 0;
519 // 		numVal = Config::uFromText(strVal);
520 // 		if ( errno == 0 )
521 // 			fProjectBlockReqLimit     = (uint32_t)numVal;
522 // 	}
523 //
524 // 	strVal = cf->getConfig(section, sendThresholdName);
525 // 	if (strVal.size() > 0)
526 // 	{
527 // 		errno  = 0;
528 // 		numVal = Config::uFromText(strVal);
529 // 		if ( errno == 0 )
530 // 			fProjectBlockReqThreshold = (uint32_t)numVal;
531 // 	}
532 }
533 
startPrimitiveThread()534 void pColStep::startPrimitiveThread()
535 {
536 //	pThread.reset(new boost::thread(pColStepPrimitive(this)));
537 }
538 
startAggregationThread()539 void pColStep::startAggregationThread()
540 {
541 //	cThread.reset(new boost::thread(pColStepAggregator(this)));
542 }
543 
run()544 void pColStep::run()
545 {
546 //	if (traceOn())
547 //	{
548 //		syslogStartStep(16,           // exemgr subsystem
549 //			std::string("pColStep")); // step name
550 //	}
551 //
552 //	size_t sz = fInputJobStepAssociation.outSize();
553 //	idbassert(sz > 0);
554 //	const AnyDataListSPtr& dl = fInputJobStepAssociation.outAt(0);
555 //	DataList_t* dlp = dl->dataList();
556 //	DataList<StringElementType>* strDlp = dl->stringDataList();
557 //	if ( dlp )
558 //		setRidList(dlp);
559 //	else
560 //	{
561 //		setStrRidList( strDlp );
562 //	}
563 //	//Sort can be set through the jobstep or the input JSA if fFlushinterval is 0
564 //	fToSort = (fFlushInterval) ? 0 : (!fToSort) ? fInputJobStepAssociation.toSort() : fToSort;
565 //	fToSort = 0;
566 //	//pthread_mutex_init(&mutex, NULL);
567 //	//pthread_cond_init(&condvar, NULL);
568 //	//pthread_cond_init(&flushed, NULL);
569 //	startPrimitiveThread();
570 //	startAggregationThread();
571 }
572 
join()573 void pColStep::join()
574 {
575 //	pThread->join();
576 //	cThread->join();
577 //	//pthread_mutex_destroy(&mutex);
578 //	//pthread_cond_destroy(&condvar);
579 //	//pthread_cond_destroy(&flushed);
580 }
581 
addFilter(int8_t COP,float value)582 void pColStep::addFilter(int8_t COP, float value)
583 {
584     fFilterString << (uint8_t) COP;
585     fFilterString << (uint8_t) 0;
586     fFilterString << *((uint32_t*) &value);
587     fFilterCount++;
588 }
589 
addFilter(int8_t COP,int64_t value,uint8_t roundFlag)590 void pColStep::addFilter(int8_t COP, int64_t value, uint8_t roundFlag)
591 {
592     int8_t tmp8;
593     int16_t tmp16;
594     int32_t tmp32;
595 
596     fFilterString << (uint8_t) COP;
597     fFilterString << roundFlag;
598 
599     // converts to a type of the appropriate width, then bitwise
600     // copies into the filter ByteStream
601     switch (fColType.colWidth)
602     {
603         case 1:
604             tmp8 = value;
605             fFilterString << *((uint8_t*) &tmp8);
606             break;
607 
608         case 2:
609             tmp16 = value;
610             fFilterString << *((uint16_t*) &tmp16);
611             break;
612 
613         case 4:
614             tmp32 = value;
615             fFilterString << *((uint32_t*) &tmp32);
616             break;
617 
618         case 8:
619             fFilterString << *((uint64_t*) &value);
620             break;
621 
622         default:
623             ostringstream o;
624 
625             o << "pColStep: CalpontSystemCatalog says OID " << fOid <<
626               " has a width of " << fColType.colWidth;
627             throw runtime_error(o.str());
628     }
629 
630     fFilterCount++;
631 }
632 
setRidList(DataList<ElementType> * dl)633 void pColStep::setRidList(DataList<ElementType>* dl)
634 {
635     ridList = dl;
636 }
637 
setStrRidList(DataList<StringElementType> * strDl)638 void pColStep::setStrRidList(DataList<StringElementType>* strDl)
639 {
640     strRidList = strDl;
641 }
642 
setBOP(int8_t b)643 void pColStep::setBOP(int8_t b)
644 {
645     fBOP = b;
646 }
647 
setOutputType(int8_t OutputType)648 void pColStep::setOutputType(int8_t OutputType)
649 {
650     fOutputType = OutputType;
651 }
652 
setSwallowRows(const bool swallowRows)653 void pColStep::setSwallowRows(const bool swallowRows)
654 {
655     fSwallowRows = swallowRows;
656 }
657 
sendPrimitiveMessages()658 void pColStep::sendPrimitiveMessages()
659 {
660 //	int it = -1;
661 //	int msgRidCount = 0;
662 //	int ridListIdx = 0;
663 //	bool more = false;
664 //	uint64_t absoluteRID = 0;
665 //	int64_t msgLBID = -1;
666 //	int64_t nextLBID = -1;
667 //	int64_t msgLargeBlock = -1;
668 //	int64_t nextLargeBlock = -1;
669 //	uint16_t blockRelativeRID;
670 //	uint32_t msgCount = 0;
671 //	uint32_t sentBlockCount = 0;
672 //	int msgsSkip=0;
673 //	bool scan=false;
674 //	bool scanThisBlock=false;
675 //	ElementType e;
676 //	UintRowGroup rw;
677 //	StringElementType strE;
678 //	StringRowGroup strRw;
679 //
680 //	ByteStream msgRidList;
681 //	ByteStream primMsg(MAX_BUFFER_SIZE);   //the MAX_BUFFER_SIZE as of 8/20
682 //
683 //	NewColRequestHeader hdr;
684 //
685 //	AnyDataListSPtr dl;
686 //	FifoDataList *fifo = NULL;
687 //	StringFifoDataList* strFifo = NULL;
688 //
689 //    const bool ignoreCP = ((fTraceFlags & CalpontSelectExecutionPlan::IGNORE_CP) != 0);
690 //
691 //	//The presence of more than 1 input DL means we (probably) have a pDictionaryScan step feeding this step
692 //	// a list of tokens to get the rids for. Convert the input tokens to a filter string. We also have a rid
693 //	// list as the second input dl
694 //	if (fInputJobStepAssociation.outSize() > 1)
695 //	{
696 //		addFilters();
697 //		if (fTableOid >= 3000)
698 //			cout << toString() << endl;
699 //		//If we got no input rids (as opposed to no input DL at all) then there were no matching rows from
700 //		//  the previous step, so this step should not return any rows either. This would be the case, for
701 //		//  instance, if P_NAME LIKE '%xxxx%' produced no signature matches.
702 //		if (fFilterCount == 0)
703 //		{
704 //			goto done;
705 //		}
706 //	}
707 //
708 //	// determine which ranges/extents to eliminate from this step
709 //
710 //#ifdef DEBUG
711 //	if (fOid>=3000)
712 //		cout << "oid " << fOid << endl;
713 //#endif
714 //
715 //	scanFlags.resize(numExtents);
716 //
717 //	for (uint32_t idx=0; idx <numExtents; idx++)
718 //	{
719 //		if (extents[idx].partition.cprange.isValid != BRM::CP_VALID) {
720 //			scanFlags[idx]=1;
721 //		}
722 //		else
723 //		{
724 //
725 //		bool flag = lbidList->CasualPartitionPredicate(
726 //											extents[idx].partition.cprange.lo_val,
727 //											extents[idx].partition.cprange.hi_val,
728 //											&fFilterString,
729 //                                            fFilterCount,
730 //                                            fColType,
731 //                                            fBOP) || ignoreCP;
732 //		scanFlags[idx]=flag;
733 //#ifdef DEBUG
734 //		if (fOid >= 3000 && flushInterval == 0)
735 //			cout << (flag ? "  will scan " : "  will not scan ")
736 //				<< "extent with range " << extents[idx].partition.cprange.lo_val
737 //				<< "-" << extents[idx].partition.cprange.hi_val << endl;
738 //#endif
739 //
740 //		}
741 //
742 ////		if (fOid>=3000)
743 ////		cout << " " << scanFlags[idx];
744 //	}
745 ////	if (scanFlags.size()>0)
746 ////		cout << endl;
747 //
748 //	// If there was more than 1 input DL, the first is a list of filters and the second is a list of rids,
749 //	// otherwise the first is the list of rids.
750 //	if (fInputJobStepAssociation.outSize() > 1)
751 //		ridListIdx = 1;
752 //	else
753 //		ridListIdx = 0;
754 //
755 //	dl = fInputJobStepAssociation.outAt(ridListIdx);
756 //	ridList = dl->dataList();
757 //	if ( ridList )
758 //	{
759 //		fifo = dl->fifoDL();
760 //
761 //		if (fifo)
762 //			it = fifo->getIterator();
763 //		else
764 //			it = ridList->getIterator();
765 //	}
766 //	else
767 //	{
768 //		strRidList = dl->stringDataList();
769 //		strFifo = dl->stringDL();
770 //
771 //		if (strFifo)
772 //			it = strFifo->getIterator();
773 //		else
774 //			it = strRidList->getIterator();
775 //	}
776 //
777 //	if (ridList)
778 //	{
779 //		if (fifo)
780 //		{
781 //			more = fifo->next(it, &rw);
782 //			if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
783 //		    	dlTimes.setFirstReadTime();
784 //	   	 	}
785 //			absoluteRID = rw.et[0].first;
786 //		}
787 //		else
788 //		{
789 //			more = ridList->next(it, &e);
790 //			if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
791 //		    	dlTimes.setFirstReadTime();
792 //	   		}
793 //			absoluteRID = e.first;
794 //			rw.count = 1;
795 //		}
796 //	}
797 //	else
798 //	{
799 //		if (strFifo)
800 //		{
801 //			more = strFifo->next(it, &strRw);
802 //			if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
803 //		    	dlTimes.setFirstReadTime();
804 //	   	 	}
805 //			absoluteRID = strRw.et[0].first;
806 //		}
807 //		else
808 //		{
809 //			more = strRidList->next(it, &strE);
810 //			if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
811 //		    	dlTimes.setFirstReadTime();
812 //	   		}
813 //			absoluteRID = strE.first;
814 //			strRw.count = 1;
815 //		}
816 //	}
817 //
818 //	if (more)
819 //		msgLBID = getLBID(absoluteRID, scan);
820 //	scanThisBlock = scan;
821 //	msgLargeBlock = absoluteRID >> blockSizeShift;
822 //
823 //	while (more || msgRidCount > 0) {
824 //		uint64_t rwCount;
825 //		if ( ridList)
826 //			rwCount = rw.count;
827 //		else
828 //			rwCount = strRw.count;
829 //
830 //		for (uint64_t i = 0; ((i < rwCount) || (!more && msgRidCount > 0)); )
831 //		{
832 //			if ( ridList)
833 //			{
834 //				if (fifo)
835 //					absoluteRID = rw.et[i].first;
836 //				else
837 //					absoluteRID = e.first;
838 //			}
839 //			else
840 //			{
841 //				if (strFifo)
842 //					absoluteRID = strRw.et[i].first;
843 //				else
844 //					absoluteRID = strE.first;
845 //			}
846 //
847 //			if (more) {
848 //			    nextLBID = getLBID(absoluteRID, scan);
849 //			    nextLargeBlock = absoluteRID >> blockSizeShift;
850 //			}
851 //
852 //			//XXXPAT: need to prove N & S here
853 //			if (nextLBID == msgLBID && more) {
854 //// 				blockRelativeRID = absoluteRID % ridsPerBlock;
855 //				blockRelativeRID = absoluteRID & rpbMask;
856 //				msgRidList << blockRelativeRID;
857 //				msgRidCount++;
858 //				++i;
859 //			}
860 //			else {
861 //				//Bug 831: move building msg after the check of scanThisBlock
862 //				if (scanThisBlock==true)
863 //				{
864 //					hdr.ism.Interleave=0;
865 //					hdr.ism.Flags=planFlagsToPrimFlags(fTraceFlags);
866 //					hdr.ism.Command=COL_BY_SCAN;
867 //					hdr.ism.Size=sizeof(NewColRequestHeader) + fFilterString.length() +
868 //					msgRidList.length();
869 //					hdr.ism.Type=2;
870 //
871 //					hdr.hdr.SessionID = fSessionId;
872 //					//hdr.hdr.StatementID = 0;
873 //					hdr.hdr.TransactionID = fTxnId;
874 //					hdr.hdr.VerID = fVerId;
875 //					hdr.hdr.StepID = fStepId;
876 //					hdr.hdr.UniqueID = uniqueID;
877 //
878 //					hdr.LBID = msgLBID;
879 //// 					idbassert(hdr.LBID >= 0);
880 //					hdr.DataSize = fColType.colWidth;
881 //					hdr.DataType = fColType.colDataType;
882 //					hdr.CompType = fColType.compressionType;
883 //					hdr.OutputType = fOutputType;
884 //					hdr.BOP = fBOP;
885 //					hdr.NOPS = fFilterCount;
886 //					hdr.NVALS = msgRidCount;
887 //					hdr.sort = fToSort;
888 //
889 //					primMsg.append((const uint8_t *) &hdr, sizeof(NewColRequestHeader));
890 //					primMsg += fFilterString;
891 //					primMsg += msgRidList;
892 //					ridCount += msgRidCount;
893 //					++sentBlockCount;
894 //
895 //#ifdef DEBUG
896 //					if (flushInterval == 0 && fOid >= 3000)
897 //						cout << "sending a prim msg for LBID " << msgLBID << endl;
898 //#endif
899 //					++msgCount;
900 ////  				cout << "made a primitive\n";
901 //					if (msgLargeBlock != nextLargeBlock || !more) {
902 ////  					cout << "writing " << msgCount << " primitives\n";
903 //						fMsgBytesOut += primMsg.lengthWithHdrOverhead();
904 //						fDec->write(primMsg);
905 //						msgsSent += msgCount;
906 //						msgCount = 0;
907 //						primMsg.restart();
908 //						msgLargeBlock = nextLargeBlock;
909 //
910 //						// @bug 769 - Added "&& !fSwallowRows" condition below to fix problem with
911 //						// caltraceon(16) not working for tpch01 and some other queries. If a query
912 //                      // ever held off requesting more blocks, it would lock and never finish.
913 //						//Bug 815
914 //						if (( sentBlockCount >= fProjectBlockReqLimit) && !fSwallowRows &&
915 //						   (( msgsSent - msgsRecvd) >  fProjectBlockReqThreshold))
916 //						{
917 //							mutex.lock(); //pthread_mutex_lock(&mutex);
918 //							fStopSending = true;
919 //
920 //							// @bug 836.  Wake up the receiver if he's sleeping.
921 //							if (recvWaiting)
922 //								condvar.notify_one(); //pthread_cond_signal(&condvar);
923 //							flushed.wait(mutex); //pthread_cond_wait(&flushed, &mutex);
924 //							fStopSending = false;
925 //							mutex.unlock(); //pthread_mutex_unlock(&mutex);
926 //							sentBlockCount = 0;
927 //						}
928 //					}
929 //				}
930 //				else
931 //				{
932 //					msgsSkip++;
933 //				}
934 //				msgLBID = nextLBID;
935 //				msgRidList.restart();
936 //				msgRidCount = 0;
937 //
938 //				mutex.lock(); //pthread_mutex_lock(&mutex);
939 //
940 //				if (scanThisBlock) {
941 //					if (recvWaiting)
942 //						condvar.notify_one(); //pthread_cond_signal(&condvar);
943 //					#ifdef DEBUG
944 //// 					cout << "msgsSent++ = " << msgsSent << endl;
945 //					#endif
946 //				}
947 //				scanThisBlock = scan;
948 //				mutex.unlock(); //pthread_mutex_unlock(&mutex);
949 //
950 //				// break the for loop
951 //				if (!more)
952 //				break;
953 //			}
954 //		} // for rw.count
955 //
956 //		if (more)
957 //		{
958 //			if ( ridList )
959 //			{
960 //				if (fifo)
961 //				{
962 //					rw.count = 0;
963 //					more = fifo->next(it, &rw);
964 //				}
965 //				else
966 //				{
967 //					rw.count = 1;
968 //					more = ridList->next(it, &e);
969 //				}
970 //			}
971 //			else
972 //			{
973 //				if (strFifo)
974 //				{
975 //					strRw.count = 0;
976 //					more = strFifo->next(it, &strRw);
977 //				}
978 //				else
979 //				{
980 //					strRw.count = 1;
981 //					more = strRidList->next(it, &strE);
982 //				}
983 //			}
984 //		}
985 //	}
986 //
987 //	if (fOid>=3000) dlTimes.setLastReadTime();
988 //
989 //done:
990 //	mutex.lock(); //pthread_mutex_lock(&mutex);
991 //	finishedSending = true;
992 //	if (recvWaiting)
993 //		condvar.notify_one(); //pthread_cond_signal(&condvar);
994 //	mutex.unlock(); //pthread_mutex_unlock(&mutex);
995 //
996 //#ifdef DEBUG
997 //	if (fOid >=3000)
998 //		cout << "pColStep msgSent "
999 //			<< msgsSent << "/" << msgsSkip
1000 //			<< " rids " << ridCount
1001 //			<< " oid " << fOid << " " << msgLBID << endl;
1002 //#endif
1003 //	//...Track the number of LBIDs we skip due to Casual Partioning.
1004 //	fNumBlksSkipped += msgsSkip;
1005 }
1006 
receivePrimitiveMessages()1007 void pColStep::receivePrimitiveMessages()
1008 {
1009 //	int64_t ridResults = 0;
1010 //	AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
1011 //	DataList_t* dlp = dl->dataList();
1012 //	uint64_t fbo;
1013 //	FifoDataList *fifo = dl->fifoDL();
1014 //	UintRowGroup rw;
1015 //	uint64_t ridBase;
1016 //	boost::shared_ptr<ByteStream> bs;
1017 //	uint32_t i = 0, length;
1018 //
1019 //	while (1) {
1020 //		// sync with the send side
1021 //		mutex.lock(); //pthread_mutex_lock(&mutex);
1022 //		while (!finishedSending && msgsSent == msgsRecvd) {
1023 //			recvWaiting = true;
1024 // 			#ifdef DEBUG
1025 // 			cout << "c sleeping" << endl;
1026 // 			#endif
1027 //			// @bug 836.  Wake up the sender if he's sleeping.
1028 //			if (fStopSending)
1029 //				flushed.notify_one(); //pthread_cond_signal(&flushed);
1030 //			condvar.wait(mutex); //pthread_cond_wait(&condvar, &mutex);
1031 // 			#ifdef DEBUG
1032 //			cout << "c waking" << endl;
1033 // 			#endif
1034 //			recvWaiting = false;
1035 //		}
1036 //		if (msgsSent == msgsRecvd) {
1037 //			mutex.unlock(); //pthread_mutex_unlock(&mutex);
1038 //			break;
1039 //		}
1040 //		mutex.unlock(); //pthread_mutex_unlock(&mutex);
1041 //
1042 //		// do the recv
1043 //		fDec->read(uniqueID, bs);
1044 //		fMsgBytesIn += bs->lengthWithHdrOverhead();
1045 //
1046 //		// no more messages, and buffered messages should be already processed by now.
1047 //		if (bs->length() == 0) break;
1048 //
1049 //		#ifdef DEBUG
1050 //		cout << "msgsRecvd++ = " << msgsRecvd << ".  RidResults = " << ridResults << endl;
1051 //		cout << "Got a ColResultHeader!: " << bs.length() << " bytes" << endl;
1052 //		#endif
1053 //
1054 //		const ByteStream::byte* bsp = bs->buf();
1055 //
1056 //		// get the ISMPacketHeader out of the bytestream
1057 // 		//const ISMPacketHeader* ism = reinterpret_cast<const ISMPacketHeader*>(bsp);
1058 //
1059 //		// get the ColumnResultHeader out of the bytestream
1060 //		const ColResultHeader* crh = reinterpret_cast<const ColResultHeader*>
1061 //			(&bsp[sizeof(ISMPacketHeader)]);
1062 //
1063 //		bool firstRead = true;
1064 //		length = bs->length();
1065 //
1066 //		i = 0;
1067 //		uint32_t msgCount = 0;
1068 //		while (i < length) {
1069 //			++msgCount;
1070 //
1071 //			i += sizeof(ISMPacketHeader);
1072 //			crh = reinterpret_cast<const ColResultHeader*>(&bsp[i]);
1073 //			// double check the sequence number is increased by one each time
1074 //			i += sizeof(ColResultHeader);
1075 //
1076 //			fCacheIO    += crh->CacheIO;
1077 //			fPhysicalIO += crh->PhysicalIO;
1078 //
1079 //			// From this point on the rest of the bytestream is the data that comes back from the primitive server
1080 //			// This needs to be fed to a datalist that is retrieved from the outputassociation object.
1081 //
1082 //			fbo = getFBO(crh->LBID);
1083 //			ridBase = fbo << rpbShift;
1084 //
1085 //			#ifdef DEBUG
1086 ////	 		cout << "  NVALS = " << crh->NVALS << "  fbo = " << fbo << "  lbid = " << crh->LBID << endl;
1087 //			#endif
1088 //
1089 //			//Check output type
1090 //			if ( fOutputType == OT_RID )
1091 //			{
1092 //				ridResults += crh->NVALS;
1093 //			}
1094 //
1095 //			/* XXXPAT: This clause is executed when ExeMgr calls the new nextBand(BS) fcn.
1096 //
1097 //			   TODO: both classes have to agree
1098 //			   on which nextBand() variant will be called.  pColStep
1099 //			   currently has to infer that from flushInterval and the
1100 //			   Table OID.  It would be better to have a more explicit form
1101 //			   of agreement.
1102 //
1103 //			   The goal of the nextBand(BS) fcn is to avoid iterating over
1104 //			   every row except at unserialization.  This clause copies
1105 //			   the raw results from the PrimProc response directly into
1106 //			   the memory used for the ElementType array.  DeliveryStep
1107 //			   will also treat the ElementType array as raw memory and
1108 //			   serialize that.  TableColumn now parses the packed data
1109 //			   instead of whole ElementTypes.
1110 //			*/
1111 //			else if (fOutputType == OT_TOKEN && fFlushInterval > 0 && !fIsDict) {
1112 //
1113 //				if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0)
1114 //					dlTimes.setFirstInsertTime();
1115 //				ridResults += crh->NVALS;
1116 //
1117 //				/* memcpy the bytestream into the output set */
1118 //				uint32_t toCopy, bsPos = 0;
1119 //				uint8_t *pos;
1120 //				while (bsPos < crh->NVALS) {
1121 //					toCopy = (crh->NVALS - bsPos > rw.ElementsPerGroup - rw.count ?
1122 //						rw.ElementsPerGroup - rw.count : crh->NVALS - bsPos);
1123 //					pos = ((uint8_t *) &rw.et[0]) + (rw.count * fColType.colWidth);
1124 //					memcpy(pos, &bsp[i], toCopy * fColType.colWidth);
1125 //					bsPos += toCopy;
1126 //					i += toCopy * fColType.colWidth;
1127 //					rw.count += toCopy;
1128 //					if (rw.count == rw.ElementsPerGroup) {
1129 //						if (!fSwallowRows)
1130 //							fifo->insert(rw);
1131 //						rw.count = 0;
1132 //					}
1133 //				}
1134 //			}
1135 //			else if ( fOutputType == OT_TOKEN)
1136 //			{
1137 //				uint64_t dv;
1138 //				uint64_t rid;
1139 //
1140 //				if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0)
1141 //					dlTimes.setFirstInsertTime();
1142 //				ridResults += crh->NVALS;
1143 //				for(int j = 0; j < crh->NVALS; ++j)
1144 //  				{
1145 //					// XXXPAT: Only use this when the RID doesn't matter or when
1146 //					// the response contains every row.
1147 //
1148 //					rid = j + ridBase;
1149 //					switch (fColType.colWidth) {
1150 //						case 8: dv = *((const uint64_t *) &bsp[i]); i += 8; break;
1151 //						case 4: dv = *((const uint32_t *) &bsp[i]); i += 4; break;
1152 //						case 2: dv = *((const uint16_t *) &bsp[i]); i += 2; break;
1153 //						case 1: dv = *((const uint8_t *) &bsp[i]); ++i; break;
1154 //						default:
1155 //							throw runtime_error("pColStep: invalid column width!");
1156 //					}
1157 //
1158 //						// @bug 663 - Don't output any rows if fSwallowRows (caltraceon(16)) is on.
1159 //						// 	      This options swallows rows in the project steps.
1160 //	   				if (!fSwallowRows)
1161 //						{
1162 //							if (fifo)
1163 //							{
1164 //								rw.et[rw.count].first = rid;
1165 //								rw.et[rw.count++].second = dv;
1166 //								if (rw.count == rw.ElementsPerGroup)
1167 //								{
1168 //									fifo->insert(rw);
1169 //									rw.count = 0;
1170 //								}
1171 //							}
1172 //							else
1173 //							{
1174 //								dlp->insert(ElementType(rid, dv));
1175 //							}
1176 //				#ifdef DEBUG
1177 //					//cout << "  -- inserting <" << rid << ", " << dv << "> " << *prid << endl;
1178 //				#endif
1179 //						}
1180 //					}
1181 //				}
1182 //				else if ( fOutputType == OT_BOTH )
1183 //				{
1184 //					ridResults += crh->NVALS;
1185 //	  				for(int j = 0; j < crh->NVALS; ++j)
1186 //  					{
1187 //						uint64_t dv;
1188 //						uint64_t rid;
1189 //
1190 //						rid = *((const uint16_t *) &bsp[i]) + ridBase;
1191 //						i += sizeof(uint16_t);
1192 //						switch (fColType.colWidth) {
1193 //							case 8: dv = *((const uint64_t *) &bsp[i]); i += 8; break;
1194 //							case 4: dv = *((const uint32_t *) &bsp[i]); i += 4; break;
1195 //							case 2: dv = *((const uint16_t *) &bsp[i]); i += 2; break;
1196 //							case 1: dv = *((const uint8_t *) &bsp[i]); ++i; break;
1197 //							default:
1198 //								throw runtime_error("pColStep: invalid column width!");
1199 //						}
1200 //
1201 //						// @bug 663 - Don't output any rows if fSwallowRows (caltraceon(16)) is on.
1202 //						// 	      This options swallows rows in the project steps.
1203 //   					if (!fSwallowRows) {
1204 //						if (fOid>=3000 && dlTimes.FirstInsertTime().tv_sec==0)
1205 //							dlTimes.setFirstInsertTime();
1206 //						if(fifo)
1207 //						{
1208 //// 							rw.et[rw.count++] = ElementType(rid, dv);
1209 //							rw.et[rw.count].first = rid;
1210 //							rw.et[rw.count++].second = dv;
1211 //							if (rw.count == rw.ElementsPerGroup)
1212 //							{
1213 //								fifo->insert(rw);
1214 //								rw.count = 0;
1215 //							}
1216 //						}
1217 //						else
1218 //						{
1219 //							dlp->insert(ElementType(rid, dv));
1220 //						}
1221 //				#ifdef DEBUG
1222 //					//cout << "  -- inserting <" << rid << ", " << dv << "> " << *prid << endl;
1223 //				#endif
1224 //						}
1225 //					}
1226 //				}
1227 //			}	// unpacking the BS
1228 //
1229 //			//Bug 815: Check whether we have enough to process
1230 //			//++lockCount;
1231 //			mutex.lock(); //pthread_mutex_lock(&mutex);
1232 //			if  ( fStopSending && ((msgsSent - msgsRecvd ) <=  fProjectBlockReqThreshold) )
1233 //			{
1234 //					flushed.notify_one(); //pthread_cond_signal(&flushed);
1235 //			}
1236 //			mutex.unlock(); //pthread_mutex_unlock(&mutex);
1237 //
1238 //			firstRead = false;
1239 //			msgsRecvd += msgCount;
1240 //	}	// read loop
1241 //	// done reading
1242 //
1243 //	if (fifo && rw.count > 0)
1244 //		fifo->insert(rw);
1245 //
1246 //	//...Casual partitioning could cause us to do no processing.  In that
1247 //	//...case these time stamps did not get set.  So we set them here.
1248 //	if (fOid>=3000 && dlTimes.FirstReadTime().tv_sec==0) {
1249 //		dlTimes.setFirstReadTime();
1250 //		dlTimes.setLastReadTime();
1251 //		dlTimes.setFirstInsertTime();
1252 //	}
1253 //	if (fOid>=3000) dlTimes.setEndOfInputTime();
1254 //
1255 //	//@bug 699: Reset StepMsgQueue
1256 //	fDec->removeQueue(uniqueID);
1257 //
1258 //	if (fifo)
1259 //		fifo->endOfInput();
1260 //	else
1261 //		dlp->endOfInput();
1262 //
1263 //	if (fTableOid >= 3000)
1264 //	{
1265 //		//...Construct timestamp using ctime_r() instead of ctime() not
1266 //		//...necessarily due to re-entrancy, but because we want to strip
1267 //		//...the newline ('\n') off the end of the formatted string.
1268 //		time_t t = time(0);
1269 //		char timeString[50];
1270 //		ctime_r(&t, timeString);
1271 //		timeString[strlen(timeString)-1 ] = '\0';
1272 //
1273 //		FifoDataList* pFifo    = 0;
1274 //		uint64_t totalBlockedReadCount  = 0;
1275 //		uint64_t totalBlockedWriteCount = 0;
1276 //
1277 //		//...Sum up the blocked FIFO reads for all input associations
1278 //		size_t inDlCnt  = fInputJobStepAssociation.outSize();
1279 //		for (size_t iDataList=0; iDataList<inDlCnt; iDataList++)
1280 //		{
1281 //			pFifo = fInputJobStepAssociation.outAt(iDataList)->fifoDL();
1282 //			if (pFifo)
1283 //			{
1284 //				totalBlockedReadCount += pFifo->blockedReadCount();
1285 //			}
1286 //		}
1287 //
1288 //		//...Sum up the blocked FIFO writes for all output associations
1289 //		size_t outDlCnt = fOutputJobStepAssociation.outSize();
1290 //		for (size_t iDataList=0; iDataList<outDlCnt; iDataList++)
1291 //		{
1292 //			pFifo = fOutputJobStepAssociation.outAt(iDataList)->fifoDL();
1293 //			if (pFifo)
1294 //			{
1295 //				totalBlockedWriteCount += pFifo->blockedWriteCount();
1296 //			}
1297 //		}
1298 //
1299 //		//...Roundoff msg byte counts to nearest KB for display
1300 //		uint64_t msgBytesInKB  = fMsgBytesIn  >> 10;
1301 //		uint64_t msgBytesOutKB = fMsgBytesOut >> 10;
1302 //		if (fMsgBytesIn  & 512)
1303 //			msgBytesInKB++;
1304 //		if (fMsgBytesOut & 512)
1305 //			msgBytesOutKB++;
1306 //
1307 //        // @bug 828
1308 //        if (fifo)
1309 //            fifo->totalSize(ridResults);
1310 //
1311 //		if (traceOn())
1312 //		{
1313 //			//...Print job step completion information
1314 //			ostringstream logStr;
1315 //			logStr << "ses:" << fSessionId <<
1316 //				" st: " << fStepId << " finished at " <<
1317 //				timeString << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-"  <<
1318 //				fCacheIO << "; MsgsRvcd-" << msgsRecvd <<
1319 //				"; BlockedFifoIn/Out-" << totalBlockedReadCount <<
1320 //				"/" << totalBlockedWriteCount <<
1321 //				"; output size-" << ridResults << endl <<
1322 //				"\tPartitionBlocksEliminated-" << fNumBlksSkipped <<
1323 //				"; MsgBytesIn-"  << msgBytesInKB  << "KB" <<
1324 //				"; MsgBytesOut-" << msgBytesOutKB << "KB" << endl  <<
1325 //				"\t1st read " << dlTimes.FirstReadTimeString() <<
1326 //				"; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" <<
1327 //				JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(),dlTimes.FirstReadTime()) <<
1328 //				"s" << endl;
1329 //
1330 //			logEnd(logStr.str().c_str());
1331 //
1332 //			syslogReadBlockCounts(16,    // exemgr sybsystem
1333 //				fPhysicalIO,             // # blocks read from disk
1334 //				fCacheIO,                // # blocks read from cache
1335 //				fNumBlksSkipped);        // # casual partition block hits
1336 //			syslogProcessingTimes(16,    // exemgr subsystem
1337 //				dlTimes.FirstReadTime(),   // first datalist read
1338 //				dlTimes.LastReadTime(),    // last  datalist read
1339 //				dlTimes.FirstInsertTime(), // first datalist write
1340 //				dlTimes.EndOfInputTime()); // last (endOfInput) datalist write
1341 //			syslogEndStep(16,            // exemgr subsystem
1342 //				totalBlockedReadCount,   // blocked datalist input
1343 //				totalBlockedWriteCount,  // blocked datalist output
1344 //				fMsgBytesIn,             // incoming msg byte count
1345 //				fMsgBytesOut);           // outgoing msg byte count
1346 //		}
1347 //	}
1348 }
1349 
toString() const1350 const string pColStep::toString() const
1351 {
1352     ostringstream oss;
1353     oss << "pColStep        ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId << " st:" << fStepId <<
1354         " tb/col:" << fTableOid << "/" << fOid;
1355 
1356     if (alias().length()) oss << " alias:" << alias();
1357 
1358     if (view().length()) oss << " view:" << view();
1359 
1360     if (fOutputJobStepAssociation.outSize() > 0)
1361         oss << " " << omitOidInDL
1362             << fOutputJobStepAssociation.outAt(0) << showOidInDL;
1363     else
1364         oss << " (no output yet)";
1365 
1366     oss << " nf:" << fFilterCount;
1367     oss << " in:";
1368 
1369     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
1370     {
1371         oss << fInputJobStepAssociation.outAt(i) << ", ";
1372     }
1373 
1374     if (fSwallowRows)
1375         oss << " (sink)";
1376 
1377     return oss.str();
1378 }
1379 
addFilters()1380 void pColStep::addFilters()
1381 {
1382     AnyDataListSPtr dl = fInputJobStepAssociation.outAt(0);
1383     DataList_t* bdl = dl->dataList();
1384     FifoDataList* fifo = fInputJobStepAssociation.outAt(0)->fifoDL();
1385 
1386     idbassert(bdl);
1387     int it = -1;
1388     bool more;
1389     ElementType e;
1390     int64_t token;
1391 
1392     if (fifo != NULL)
1393     {
1394         try
1395         {
1396             it = fifo->getIterator();
1397         }
1398         catch (exception& ex)
1399         {
1400             cerr << "pColStep::addFilters: caught exception: " << ex.what() << " stepno: " <<
1401                  fStepId << endl;
1402         }
1403         catch (...)
1404         {
1405             cerr << "pColStep::addFilters: caught exception" << endl;
1406         }
1407 
1408         fBOP = BOP_OR;
1409         UintRowGroup rw;
1410 
1411         more = fifo->next(it, &rw);
1412 
1413         while (more)
1414         {
1415             for (uint64_t i = 0; i < rw.count; ++i)
1416                 addFilter(COMPARE_EQ, (int64_t) rw.et[i].second);
1417 
1418             more = fifo->next(it, &rw);
1419         }
1420     }
1421     else
1422     {
1423         try
1424         {
1425             it = bdl->getIterator();
1426         }
1427         catch (exception& ex)
1428         {
1429             cerr << "pColStep::addFilters: caught exception: " << ex.what() << " stepno: " <<
1430                  fStepId << endl;
1431         }
1432         catch (...)
1433         {
1434             cerr << "pColStep::addFilters: caught exception" << endl;
1435         }
1436 
1437         fBOP = BOP_OR;
1438 
1439         more = bdl->next(it, &e);
1440 
1441         while (more)
1442         {
1443             token = e.second;
1444             addFilter(COMPARE_EQ, token);
1445 
1446             more = bdl->next(it, &e);
1447         }
1448     }
1449 
1450     return;
1451 }
1452 
1453 /* This exists to avoid a DBRM lookup for every rid. */
getLBID(uint64_t rid,bool & scan)1454 inline uint64_t pColStep::getLBID(uint64_t rid, bool& scan)
1455 {
1456     uint32_t extentIndex, extentOffset;
1457     uint64_t fbo;
1458     fbo = rid >> rpbShift;
1459     extentIndex = fbo >> divShift;
1460     extentOffset = fbo & modMask;
1461     scan = (scanFlags[extentIndex] != 0);
1462     return extents[extentIndex].range.start + extentOffset;
1463 }
1464 
getFBO(uint64_t lbid)1465 inline uint64_t pColStep::getFBO(uint64_t lbid)
1466 {
1467     uint32_t i;
1468     uint64_t lastLBID;
1469 
1470     for (i = 0; i < numExtents; i++)
1471     {
1472         lastLBID = extents[i].range.start + (extents[i].range.size << 10) - 1;
1473 
1474         if (lbid >= (uint64_t) extents[i].range.start && lbid <= lastLBID)
1475             return (lbid - extents[i].range.start) + (i << divShift);
1476     }
1477 
1478     cerr << "pColStep: didn't find the FBO?\n";
1479     throw logic_error("pColStep: didn't find the FBO?");
1480 }
1481 
1482 
appendFilter(const messageqcpp::ByteStream & filter,unsigned count)1483 void pColStep::appendFilter(const messageqcpp::ByteStream& filter, unsigned count)
1484 {
1485     fFilterString += filter;
1486     fFilterCount += count;
1487 }
1488 
1489 
addFilter(const Filter * f)1490 void pColStep::addFilter(const Filter* f)
1491 {
1492     if (NULL != f)
1493         fFilters.push_back(f);
1494 }
1495 
1496 
appendFilter(const std::vector<const execplan::Filter * > & fs)1497 void pColStep::appendFilter(const std::vector<const execplan::Filter*>& fs)
1498 {
1499     fFilters.insert(fFilters.end(), fs.begin(), fs.end());
1500 }
1501 
1502 }   //namespace
1503 // vim:ts=4 sw=4:
1504 
1505