1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: primitivestep.h 9688 2013-07-15 19:27:22Z pleblanc $
20 
21 
22 /** @file */
23 
24 #ifndef JOBLIST_PRIMITIVESTEP_H
25 #define JOBLIST_PRIMITIVESTEP_H
26 
27 #include <iostream>
28 #include <sstream>
29 #include <vector>
30 #include <string>
31 #include <utility>
32 #include <cassert>
33 #include <sys/time.h>
34 #include <set>
35 #include <map>
36 #include <stdexcept>
37 #include <sstream>
38 #ifndef _MSC_VER
39 #include <tr1/memory>
40 #else
41 #include <memory>
42 #endif
43 
44 #include <boost/shared_ptr.hpp>
45 #include <boost/shared_array.hpp>
46 #include <boost/thread.hpp>
47 #include <boost/thread/condition.hpp>
48 
49 #include "calpontsystemcatalog.h"
50 #include "calpontselectexecutionplan.h"
51 #include "brm.h"
52 #include "parsetree.h"
53 #include "simplefilter.h"
54 
55 #include "jobstep.h"
56 #include "primitivemsg.h"
57 #include "elementtype.h"
58 #include "distributedenginecomm.h"
59 #include "lbidlist.h"
60 #include "joblisttypes.h"
61 #include "timestamp.h"
62 #include "timeset.h"
63 #include "resourcemanager.h"
64 #include "joiner.h"
65 #include "tuplejoiner.h"
66 #include "rowgroup.h"
67 #include "rowaggregation.h"
68 #include "funcexpwrapper.h"
69 
70 namespace joblist
71 {
72 
73 /* Forward decl's to support the batch primitive classes */
74 struct JobInfo;
75 class CommandJL;
76 class ColumnCommandJL;
77 class DictStepJL;
78 class BatchPrimitiveProcessorJL;
79 class pColStep;
80 class pColScanStep;
81 class PassThruStep;
82 class PseudoColStep;
83 
84 
85 typedef boost::shared_ptr<LBIDList> SP_LBIDList;
86 typedef std::vector<execplan::CalpontSystemCatalog::OID> OIDVector;
87 typedef std::vector<std::pair<execplan::CalpontSystemCatalog::OID, int> > OIDIntVector;
88 
89 
/** @brief Kinds of primitive step.
 *
 * Passed, for example, to BatchPrimitive::setFirstStepType().  The numeric
 * values are spelled out explicitly; they are the same values the compiler
 * assigns implicitly, so do not renumber.
 */
enum PrimitiveStepType
{
    SCAN           = 0,
    COLSTEP        = 1,
    DICTIONARYSCAN = 2,
    DICTIONARY     = 3,
    PASSTHRU       = 4,
    AGGRFILTERSTEP = 5
};
99 
100 
101 /** @brief class PrimitiveMsg
102  *
103  */
104 class PrimitiveMsg
105 {
106 public:
107     /** @brief virtual void Send method
108      */
109     virtual void send();
110     /** @brief virtual void Receive method
111      */
112     virtual void receive();
113     /** @brief virtual void BuildPrimitiveMessage method
114      */
115     virtual void buildPrimitiveMessage(ISMPACKETCOMMAND cmd, void* filterValues, void* ridArray);
116     virtual void sendPrimitiveMessages();
117     virtual void receivePrimitiveMessages();
118 
PrimitiveMsg()119     PrimitiveMsg() { }
120 
~PrimitiveMsg()121     virtual ~PrimitiveMsg() { }
122 
123     uint16_t planFlagsToPrimFlags(uint32_t planFlags);
124 
125 private:
126 };
127 
128 
129 class pColScanStep;
130 class pColStep : public JobStep, public PrimitiveMsg
131 {
132 
133     typedef std::pair<int64_t, int64_t> element_t;
134 
135 public:
136     /** @brief pColStep constructor
137      * @param flushInterval The interval in msgs at which the sending side should
138      * wait for the receiveing side to catch up.  0 (default) means never.
139      */
140     pColStep(
141         execplan::CalpontSystemCatalog::OID oid,
142         execplan::CalpontSystemCatalog::OID tableOid,
143         const execplan::CalpontSystemCatalog::ColType& ct,
144         const JobInfo& jobInfo);
145 
146     pColStep(const pColScanStep& rhs);
147 
148     pColStep(const PassThruStep& rhs);
149 
150     virtual ~pColStep();
151 
152     /** @brief Starts processing.  Set at least the RID list before calling.
153      *
154      * Starts processing.  Set at least the RID list before calling this.
155      */
156     virtual void run();
157     /** @brief Sync's the caller with the end of execution.
158      *
159      * Does nothing.  Returns when this instance is finished.
160      */
161     virtual void join();
162 
163     virtual const std::string toString() const;
164 
isDictCol()165     virtual bool isDictCol() const
166     {
167         return fIsDict;
168     };
isExeMgr()169     bool isExeMgr() const
170     {
171         return isEM;
172     }
173 
174     /** @brief Set config parameters for this JobStep.
175      *
176      * Set the config parameters this JobStep.
177      */
178     void initializeConfigParms();
179 
180     /** @brief The main loop for the send-side thread
181      *
182      * The main loop for the primitive-issuing thread.  Don't call it directly.
183      */
184     void sendPrimitiveMessages();
185 
186     /** @brief The main loop for the recv-side thread
187      *
188      * The main loop for the receive-side thread.  Don't call it directly.
189      */
190     void receivePrimitiveMessages();
191 
192     /** @brief Add a filter.  Use this interface when the column stores anything but 4-byte floats.
193      *
194      * Add a filter.  Use this interface when the column stores anything but 4-byte floats.
195      */
196     void addFilter(int8_t COP, int64_t value, uint8_t roundFlag = 0);
197     void addFilter(int8_t COP, float value);
198 
199     /** @brief Sets the DataList to get RID values from.
200      *
201      * Sets the DataList to get RID values from.  Filtering by RID distinguishes
202      * this class from pColScan.  Use pColScan if the every RID should be considered; it's
203      * faster at that.
204      */
205     void setRidList(DataList<ElementType>* rids);
206 
207     /** @brief Sets the String DataList to get RID values from.
208      *
209      * Sets the string DataList to get RID values from.  Filtering by RID distinguishes
210      * this class from pColScan.  Use pColScan if the every RID should be considered; it's
211      * faster at that.
212      */
213     void setStrRidList(DataList<StringElementType>* strDl);
214 
215     /** @brief Set the binary operator for the filter predicate (BOP_AND or BOP_OR).
216      *
217      * Set the binary operator for the filter predicate (BOP_AND or BOP_OR).
218      */
219     void setBOP(int8_t BOP);
220 
221     /** @brief Set the output type.
222      *
223      * Set the output type (1 = RID, 2 = Token, 3 = Both).
224      */
225     void setOutputType(int8_t OutputType);
226 
227     /** @brief Set the swallowRows flag.
228      *
229      *
230      * If true, no rows will be inserted to the output datalists.
231      */
232     void setSwallowRows(const bool swallowRows);
233 
234     /** @brief Get the swallowRows flag.
235      *
236      *
237      * If true, no rows will be inserted to the output datalists.
238      */
getSwallowRows()239     bool getSwallowRows() const
240     {
241         return fSwallowRows;
242     }
243 
oid()244     virtual execplan::CalpontSystemCatalog::OID oid() const
245     {
246         return fOid;
247     }
248 
tableOid()249     virtual execplan::CalpontSystemCatalog::OID tableOid() const
250     {
251         return fTableOid;
252     }
253 
filterCount()254     uint32_t filterCount() const
255     {
256         return fFilterCount;
257     }
filterString()258     const messageqcpp::ByteStream& filterString() const
259     {
260         return fFilterString;
261     }
BOP()262     int8_t BOP() const
263     {
264         return fBOP;
265     }
colType()266     const execplan::CalpontSystemCatalog::ColType& colType() const
267     {
268         return fColType;
269     }
270     void appendFilter(const messageqcpp::ByteStream& filter, unsigned count);
flushInterval()271     uint32_t flushInterval() const
272     {
273         return fFlushInterval;
274     }
getFeederFlag()275     bool getFeederFlag() const
276     {
277         return isFilterFeeder;
278     }
279 
setFeederFlag(bool filterFeeder)280     void setFeederFlag (bool filterFeeder)
281     {
282         isFilterFeeder = filterFeeder;
283     }
phyIOCount()284     virtual uint64_t phyIOCount    () const
285     {
286         return fPhysicalIO;
287     }
cacheIOCount()288     virtual uint64_t cacheIOCount  () const
289     {
290         return fCacheIO;
291     }
msgsRcvdCount()292     virtual uint64_t msgsRcvdCount () const
293     {
294         return msgsRecvd;
295     }
msgBytesIn()296     virtual uint64_t msgBytesIn    () const
297     {
298         return fMsgBytesIn;
299     }
msgBytesOut()300     virtual uint64_t msgBytesOut   () const
301     {
302         return fMsgBytesOut;
303     }
304 
305     //...Currently only supported by pColStep and pColScanStep, so didn't bother
306     //...to define abstract method in base class, but if start adding to other
307     //...classes, then should consider adding pure virtual method to JobStep.
blksSkipped()308     uint64_t blksSkipped           () const
309     {
310         return fNumBlksSkipped;
311     }
resourceManager()312     ResourceManager* resourceManager() const
313     {
314         return fRm;
315     }
316 
getlbidList()317     SP_LBIDList getlbidList() const
318     {
319         return lbidList;
320     }
321 
322     void addFilter(const execplan::Filter* f);
323     void appendFilter(const std::vector<const execplan::Filter*>& fs);
getFilters()324     std::vector<const execplan::Filter*>& getFilters()
325     {
326         return fFilters;
327     }
328 
329 protected:
330     void addFilters();
331 
332 private:
333 
334     /** @brief constructor for completeness
335      */
336     explicit pColStep();
337 
338     /** @brief StartPrimitiveThread
339      *  Utility function to start worker thread that sends primitive messages
340      */
341     void startPrimitiveThread();
342     /** @brief StartAggregationThread
343      *  Utility function to start worker thread that receives result aggregation from primitive servers
344      */
345     void startAggregationThread();
346     uint64_t getLBID(uint64_t rid, bool& scan);
347     uint64_t getFBO(uint64_t lbid);
348 
349     ResourceManager* fRm;
350     boost::shared_ptr<execplan::CalpontSystemCatalog> sysCat;
351     execplan::CalpontSystemCatalog::OID fOid;
352     execplan::CalpontSystemCatalog::OID fTableOid;
353     execplan::CalpontSystemCatalog::ColType fColType;
354     uint32_t fFilterCount;
355     int8_t fBOP;
356     int8_t fOutputType;
357     uint16_t realWidth;
358     DataList_t* ridList;
359     StrDataList* strRidList;
360     messageqcpp::ByteStream fFilterString;
361     std::vector<struct BRM::EMEntry> extents;
362     uint32_t extentSize, divShift, modMask, ridsPerBlock, rpbShift, blockSizeShift, numExtents;
363     uint64_t rpbMask;
364     uint64_t msgsSent, msgsRecvd;
365     bool finishedSending, recvWaiting, fIsDict;
366     bool isEM;
367     int64_t ridCount;
368     uint32_t fFlushInterval;
369 
370     // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4.
371     // 	      Running with this one will swallow rows at projection.
372     bool fSwallowRows;
373     uint32_t fProjectBlockReqLimit;     // max number of rids to send in a scan
374     // request to primproc
375     uint32_t fProjectBlockReqThreshold; // min level of rids backlog before
376     // consumer will tell producer to send
377     // more rids scan requests to primproc
378 
379     volatile bool fStopSending;
380     bool isFilterFeeder;
381     uint64_t fPhysicalIO;	// total physical I/O count
382     uint64_t fCacheIO;		// total cache I/O count
383     uint64_t fNumBlksSkipped;//total number of block scans skipped due to CP
384     uint64_t fMsgBytesIn;   // total byte count for incoming messages
385     uint64_t fMsgBytesOut;  // total byte count for outcoming messages
386 
387     BRM::DBRM dbrm;
388 
389     // boost::shared_ptr<boost::thread> cThread;  //consumer thread
390     // boost::shared_ptr<boost::thread> pThread;  //producer thread
391     boost::mutex mutex;
392     boost::condition condvar;
393     boost::condition flushed;
394     SP_LBIDList lbidList;
395     std::vector<bool> scanFlags; // use to keep track of which extents to eliminate from this step
396     uint32_t uniqueID;
397 
398     //@bug 2634
399     //@bug 3128 change ParseTree* to vector<Filter*>
400     std::vector<const execplan::Filter*> fFilters;
401 
402     friend class pColScanStep;
403     friend class PassThruStep;
404     friend class ColumnCommandJL;
405     friend class RTSCommandJL;
406     friend class BatchPrimitiveStep;
407     friend class TupleBPS;
408 };
409 
410 /** @brief the pColScan Step
411  *
412  *  The most common step which requires no input RID list, but may have value filters applied
413  *
414  *  The input association will always be null here so that we can go as soon as the Run function is called
415  *
416  *  The StartPrimitiveThread will spawn a new worker thread that will
417  *  a) take any input filters and apply them to a primitive message to be sent
418  *  b) walk the block resolution manager via an LBID list for the oid
419  *  c) send messages to the primitive server as quickly as possible
420  */
421 
422 class pColScanStep : public JobStep, public PrimitiveMsg
423 {
424 public:
425     /** @brief pColScanStep constructor
426      */
427     pColScanStep(
428         execplan::CalpontSystemCatalog::OID oid,
429         execplan::CalpontSystemCatalog::OID tableOid,
430         const execplan::CalpontSystemCatalog::ColType& ct,
431         const JobInfo& jobInfo);
432 
433     pColScanStep(const pColStep& rhs);
434     ~pColScanStep();
435 
436     /** @brief Starts processing.
437      *
438      * Starts processing.
439      */
440     virtual void run();
441 
442     /** @brief Sync's the caller with the end of execution.
443      *
444      * Does nothing.  Returns when this instance is finished.
445      */
446     virtual void join();
447 
isDictCol()448     virtual bool isDictCol() const
449     {
450         return fIsDict;
451     };
452 
453     /** @brief The main loop for the send-side thread
454      *
455      * The main loop for the primitive-issuing thread.  Don't call it directly.
456      */
457     void sendPrimitiveMessages();
458 
459     /** @brief The main loop for the recv-side thread
460      *
461      * The main loop for the receive-side thread.  Don't call it directly.
462      */
463     using PrimitiveMsg::receivePrimitiveMessages;
464     void receivePrimitiveMessages(uint64_t i = 0);
465 
466     /** @brief Add a filter when the column is a 4-byte float type
467      *
468      * Add a filter when the column is a 4-byte float type
469      */
470     void addFilter(int8_t COP, float value);
471 
472     /** @brief Add a filter when the column is anything but a 4-byte float type.
473      *
474      * Add a filter when the column is anything but a 4-byte float type, including
475      * 8-byte doubles.
476      */
477     void addFilter(int8_t COP, int64_t value, uint8_t roundFlag = 0);
478 
479     /** @brief Set the binary operator for the filter predicates
480      *
481      * Set the binary operator for the filter predicates (BOP_AND or BOP_OR).
482      * It is initialized to OR.
483      */
484     void setBOP(int8_t BOP);	// AND or OR
BOP()485     int8_t BOP() const
486     {
487         return fBOP;
488     }
489 
getFeederFlag()490     bool getFeederFlag() const
491     {
492         return isFilterFeeder;
493     }
494 
setFeederFlag(bool filterFeeder)495     void setFeederFlag (bool filterFeeder)
496     {
497         isFilterFeeder = filterFeeder;
498     }
499     /** @brief Get the string of the filter predicates
500      *
501      * Get the filter string constructed from the predicates
502      */
filterString()503     messageqcpp::ByteStream filterString() const
504     {
505         return fFilterString;
506     }
507 
508     void setSingleThread(bool b);
getSingleThread()509     bool getSingleThread()
510     {
511         return fSingleThread;
512     }
513 
514     /** @brief Set the output type.
515      *
516      * Set the output type (1 = RID, 2 = Token, 3 = Both).pColScan
517      */
518     void setOutputType(int8_t OutputType);
filterCount()519     uint32_t filterCount() const
520     {
521         return fFilterCount;
522     }
523 
524     virtual const std::string toString() const;
525 
oid()526     virtual execplan::CalpontSystemCatalog::OID oid() const
527     {
528         return fOid;
529     }
530 
tableOid()531     virtual execplan::CalpontSystemCatalog::OID tableOid() const
532     {
533         return fTableOid;
534     }
colType()535     const execplan::CalpontSystemCatalog::ColType& colType() const
536     {
537         return fColType;
538     }
resourceManager()539     ResourceManager* resourceManager() const
540     {
541         return fRm;
542     }
543 
phyIOCount()544     virtual uint64_t phyIOCount    () const
545     {
546         return fPhysicalIO;
547     }
cacheIOCount()548     virtual uint64_t cacheIOCount  () const
549     {
550         return fCacheIO;
551     }
msgsRcvdCount()552     virtual uint64_t msgsRcvdCount () const
553     {
554         return recvCount;
555     }
msgBytesIn()556     virtual uint64_t msgBytesIn    () const
557     {
558         return fMsgBytesIn;
559     }
msgBytesOut()560     virtual uint64_t msgBytesOut   () const
561     {
562         return fMsgBytesOut;
563     }
getRidsPerBlock()564     uint32_t getRidsPerBlock() const
565     {
566         return ridsPerBlock;
567     }
568 
569     //...Currently only supported by pColStep and pColScanStep, so didn't bother
570     //...to define abstract method in base class, but if start adding to other
571     //...classes, then should consider adding pure virtual method to JobStep.
blksSkipped()572     uint64_t blksSkipped           () const
573     {
574         return fNumBlksSkipped;
575     }
576 
udfName()577     std::string udfName() const
578     {
579         return fUdfName;
580     };
udfName(const std::string & name)581     void udfName(const std::string& name)
582     {
583         fUdfName = name;
584     }
585 
getlbidList()586     SP_LBIDList getlbidList() const
587     {
588         return lbidList;
589     }
590 
591     void addFilter(const execplan::Filter* f);
592     void appendFilter(const std::vector<const execplan::Filter*>& fs);
getFilters()593     std::vector<const execplan::Filter*>& getFilters()
594     {
595         return fFilters;
596     }
getFilters()597     const std::vector<const execplan::Filter*>& getFilters() const
598     {
599         return fFilters;
600     }
601 
602 protected:
603     void addFilters();
604 
605 private:
606     //defaults okay?
607     //pColScanStep(const pColScanStep& rhs);
608     //pColScanStep& operator=(const pColScanStep& rhs);
609 
610     typedef boost::shared_ptr<boost::thread> SPTHD;
611     void startPrimitiveThread();
612     void startAggregationThread();
613     void initializeConfigParms();
614     void sendAPrimitiveMessage (
615         ISMPacketHeader& ism,
616         BRM::LBID_t msgLbidStart,
617         uint32_t msgLbidCount);
618     uint64_t getFBO(uint64_t lbid);
619     bool isEmptyVal(const uint8_t* val8) const;
620 
621     ResourceManager* fRm;
622     ColByScanRangeRequestHeader fMsgHeader;
623     SPTHD fConsumerThread;
624     /// number of threads on the receive side
625     uint32_t fNumThreads;
626 
627     SPTHD* fProducerThread;
628     messageqcpp::ByteStream fFilterString;
629     uint32_t fFilterCount;
630     execplan::CalpontSystemCatalog::OID fOid;
631     execplan::CalpontSystemCatalog::OID fTableOid;
632     execplan::CalpontSystemCatalog::ColType fColType;
633     int8_t fBOP;
634     int8_t fOutputType;
635     uint32_t sentCount;
636     uint32_t recvCount;
637     BRM::LBIDRange_v lbidRanges;
638     BRM::DBRM dbrm;
639     SP_LBIDList lbidList;
640 
641     boost::mutex mutex;
642     boost::mutex dlMutex;
643     boost::mutex cpMutex;
644     boost::condition condvar;
645     boost::condition condvarWakeupProducer;
646     bool finishedSending, sendWaiting, rDoNothing, fIsDict;
647     uint32_t recvWaiting, recvExited;
648 
649     std::vector<struct BRM::EMEntry> extents;
650     uint32_t extentSize, divShift, ridsPerBlock, rpbShift, numExtents;
651 // 	config::Config *fConfig;
652 
653     uint32_t fScanLbidReqLimit;     // max number of LBIDs to send in a scan
654     // request to primproc
655     uint32_t fScanLbidReqThreshold; // min level of scan LBID backlog before
656     // consumer will tell producer to send
657     // more LBID scan requests to primproc
658 
659     bool fStopSending;
660     bool fSingleThread;
661     bool isFilterFeeder;
662     uint64_t fPhysicalIO;	// total physical I/O count
663     uint64_t fCacheIO;		// total cache I/O count
664     uint64_t fNumBlksSkipped;//total number of block scans skipped due to CP
665     uint64_t fMsgBytesIn;   // total byte count for incoming messages
666     uint64_t fMsgBytesOut;  // total byte count for outcoming messages
667     uint32_t fMsgsToPm;     // total number of messages sent to PMs
668     uint32_t uniqueID;
669     std::string fUdfName;
670 
671     //@bug 2634
672     //@bug 3128 change ParseTree* to vector<Filter*>
673     std::vector<const execplan::Filter*> fFilters;
674 
675     friend class ColumnCommandJL;
676     friend class BatchPrimitiveProcessorJL;
677     friend class BucketReuseStep;
678     friend class BatchPrimitiveStep;
679     friend class TupleBPS;
680 };
681 
682 
#if 0
/* Disabled: this class is compiled out by the surrounding #if 0. */
class pIdxStep : public JobStep
{
public:
    /** @brief pIdxStep constructor
     *
     * @param in  the input JobStepAssociation pointer
     * @param out the output JobStepAssociation pointer
     * @param ec  the DistributedEngineComm pointer
     */
    pIdxStep(JobStepAssociation* in,
             JobStepAssociation* out,
             DistributedEngineComm* ec);

    /** @brief Run this step. */
    virtual void run();

private:
    pIdxStep();
    void startPrimitveThread();
    void startAggregationThread();

protected:
    DistributedEngineComm* fDec;
    JobStepAssociation* fInputJobStepAssociation;
    JobStepAssociation* fOutputJobStepAssociation;
};
#endif
707 
708 /** @brief class pDictionaryStep
709  *
710  */
711 class pDictionaryStep : public JobStep, public PrimitiveMsg
712 {
713 
714 public:
715     /** @brief pDictionaryStep constructor
716      */
717 
718     pDictionaryStep(
719         execplan::CalpontSystemCatalog::OID oid,
720         execplan::CalpontSystemCatalog::OID tabelOid,
721         const execplan::CalpontSystemCatalog::ColType& ct,
722         const JobInfo& jobInfo);
723 
724     virtual ~pDictionaryStep();
725 
726     /** @brief virtual void Run method
727      */
728     virtual void run();
729     virtual void join();
730     //void setOutList(StringDataList* rids);
731     void setInputList(DataList_t* rids);
732     void setBOP(int8_t b);
733     void sendPrimitiveMessages();
734     void receivePrimitiveMessages();
735 
736     virtual const std::string toString() const;
737 
colType()738     execplan::CalpontSystemCatalog::ColType& colType()
739     {
740         return fColType;
741     }
colType()742     execplan::CalpontSystemCatalog::ColType colType() const
743     {
744         return fColType;
745     }
746 
oid()747     virtual execplan::CalpontSystemCatalog::OID oid() const
748     {
749         return fOid;
750     }
tableOid()751     virtual execplan::CalpontSystemCatalog::OID tableOid() const
752     {
753         return fTableOid;
754     }
phyIOCount()755     virtual uint64_t phyIOCount    () const
756     {
757         return fPhysicalIO;
758     }
cacheIOCount()759     virtual uint64_t cacheIOCount  () const
760     {
761         return fCacheIO;
762     }
msgsRcvdCount()763     virtual uint64_t msgsRcvdCount () const
764     {
765         return msgsRecvd;
766     }
msgBytesIn()767     virtual uint64_t msgBytesIn    () const
768     {
769         return fMsgBytesIn;
770     }
msgBytesOut()771     virtual uint64_t msgBytesOut   () const
772     {
773         return fMsgBytesOut;
774     }
775     void addFilter(int8_t COP, const std::string& value);
filterCount()776     uint32_t filterCount() const
777     {
778         return fFilterCount;
779     }
filterString()780     messageqcpp::ByteStream filterString() const
781     {
782         return fFilterString;
783     }
784 
785     // @bug3321, add filters into pDictionary
786     void appendFilter(const messageqcpp::ByteStream& filter, unsigned count);
787     void addFilter(const execplan::Filter* f);
788     void appendFilter(const std::vector<const execplan::Filter*>& fs);
getFilters()789     std::vector<const execplan::Filter*>& getFilters()
790     {
791         return fFilters;
792     }
BOP()793     int8_t BOP() const
794     {
795         return fBOP;
796     }
797 
798 private:
799     pDictionaryStep();
800     void startPrimitiveThread();
801     void startAggregationThread();
802 
803     boost::shared_ptr<execplan::CalpontSystemCatalog> sysCat;
804     execplan::CalpontSystemCatalog::OID fOid;
805     execplan::CalpontSystemCatalog::OID fTableOid;
806     uint32_t fBOP;
807     uint32_t msgsSent;
808     uint32_t msgsRecvd;
809     uint32_t finishedSending;
810     uint32_t recvWaiting;
811     int64_t ridCount;
812     execplan::CalpontSystemCatalog::ColType fColType;
813     uint64_t pThread;  //producer thread
814     uint64_t cThread;  //producer thread
815 
816     messageqcpp::ByteStream fFilterString;
817     uint32_t fFilterCount;
818 
819     DataList_t* requestList;
820     //StringDataList* stringList;
821     boost::mutex mutex;
822     boost::condition condvar;
823     uint32_t fInterval;
824     uint64_t fPhysicalIO;	// total physical I/O count
825     uint64_t fCacheIO;		// total cache I/O count
826     uint64_t fMsgBytesIn;   // total byte count for incoming messages
827     uint64_t fMsgBytesOut;  // total byte count for outcoming messages
828     uint32_t uniqueID;
829     ResourceManager* fRm;
830 
831     //@bug 3128 change ParseTree* to vector<Filter*>
832     std::vector<const execplan::Filter*> fFilters;
833 
834     bool hasEqualityFilter;
835     int8_t tmpCOP;
836     std::vector<std::string> eqFilter;
837 
838     friend class DictStepJL;
839     friend class RTSCommandJL;
840     friend class BucketReuseStep;
841     friend class BatchPrimitiveStep;
842     friend class TupleBPS;
843 };
844 
845 /** @brief class pDictionaryScan
846  *
847  */
848 class pDictionaryScan : public JobStep, public PrimitiveMsg
849 {
850 public:
851 
852     /** @brief pDictionaryScan constructor
853      */
854 
855     pDictionaryScan(
856         execplan::CalpontSystemCatalog::OID oid,
857         execplan::CalpontSystemCatalog::OID tableOid,
858         const execplan::CalpontSystemCatalog::ColType& ct,
859         const JobInfo& jobInfo);
860 
861     ~pDictionaryScan();
862 
863     /** @brief virtual void Run method
864      */
865     virtual void run();
866     virtual void join();
867     void setInputList(DataList_t* rids);
868     void setBOP(int8_t b);
869     void sendPrimitiveMessages();
870     void receivePrimitiveMessages();
871     void setSingleThread();
872     virtual const std::string toString() const;
873 
874     void setRidList(DataList<ElementType>* rids);
875 
876     /** @brief Add a filter.  Use this interface when the column stores anything but 4-byte floats.
877      *
878      * Add a filter.  Use this interface when the column stores anything but 4-byte floats.
879      */
880     void addFilter(int8_t COP, const std::string& value);  // all but FLOATS can use this interface
881 
882     /** @brief Set the DistributedEngineComm object this instance should use
883      *
884      * Set the DistributedEngineComm object this instance should use
885      */
dec(DistributedEngineComm * dec)886     void dec(DistributedEngineComm* dec)
887     {
888         if (fDec) fDec->removeQueue(uniqueID);
889 
890         fDec = dec;
891 
892         if (fDec) fDec->addQueue(uniqueID);
893     }
894 
oid()895     virtual execplan::CalpontSystemCatalog::OID oid() const
896     {
897         return fOid;
898     }
tableOid()899     virtual execplan::CalpontSystemCatalog::OID tableOid() const
900     {
901         return fTableOid;
902     }
903 
phyIOCount()904     uint64_t phyIOCount    () const
905     {
906         return fPhysicalIO;
907     }
cacheIOCount()908     uint64_t cacheIOCount  () const
909     {
910         return fCacheIO;
911     }
msgsRcvdCount()912     uint64_t msgsRcvdCount () const
913     {
914         return msgsRecvd;
915     }
msgBytesIn()916     uint64_t msgBytesIn    () const
917     {
918         return fMsgBytesIn;
919     }
msgBytesOut()920     uint64_t msgBytesOut   () const
921     {
922         return fMsgBytesOut;
923     }
924 
getOutputType()925     BPSOutputType getOutputType() const
926     {
927         return fOutType;
928     }
setOutputType(BPSOutputType ot)929     void setOutputType(BPSOutputType ot)
930     {
931         fOutType = ot;
932     }
setOutputRowGroup(const rowgroup::RowGroup & rg)933     void setOutputRowGroup(const rowgroup::RowGroup& rg)
934     {
935         fOutputRowGroup = rg;
936     }
getOutputRowGroup()937     const rowgroup::RowGroup& getOutputRowGroup() const
938     {
939         return fOutputRowGroup;
940     }
941 
942     // @bug3321, add interface for combining filters.
BOP()943     int8_t BOP() const
944     {
945         return fBOP;
946     }
947     void addFilter(const execplan::Filter* f);
948     void appendFilter(const std::vector<const execplan::Filter*>& fs);
getFilters()949     std::vector<const execplan::Filter*>& getFilters()
950     {
951         return fFilters;
952     }
filterString()953     messageqcpp::ByteStream filterString() const
954     {
955         return fFilterString;
956     }
filterCount()957     uint32_t filterCount() const
958     {
959         return fFilterCount;
960     }
961     void appendFilter(const messageqcpp::ByteStream& filter, unsigned count);
962 
963     virtual void abort();
964 
colType()965     const execplan::CalpontSystemCatalog::ColType& colType() const
966     {
967         return fColType;
968     }
969 
protected:
    // Error-reporting hook taking a 16-bit error code; defined out of line.
    void sendError(uint16_t error);

private:
    // Declared private, so it is not callable from outside the class.
    pDictionaryScan();
    // Internal helpers; all are defined out of line.
    void startPrimitiveThread();
    void startAggregationThread();
    void initializeConfigParms();
    void sendAPrimitiveMessage(
        messageqcpp::ByteStream& primMsg,
        BRM::LBID_t msgLbidStart,
        uint32_t msgLbidCount, uint16_t dbroot);
    void formatMiniStats();

    DistributedEngineComm* fDec;
    boost::shared_ptr<execplan::CalpontSystemCatalog> sysCat;
    execplan::CalpontSystemCatalog::OID fOid;
    execplan::CalpontSystemCatalog::OID fTableOid;
    uint32_t fFilterCount;  // returned by filterCount()
    uint32_t fBOP;
    uint32_t fCOP1;
    uint32_t fCOP2;
    uint32_t msgsSent;
    uint32_t msgsRecvd;
    uint32_t finishedSending;
    uint32_t recvWaiting;
    uint32_t sendWaiting;
    int64_t  ridCount;
    uint32_t fLogicalBlocksPerScan;
    DataList<ElementType>* ridList;
    messageqcpp::ByteStream fFilterString;  // returned (by value) by filterString()
    execplan::CalpontSystemCatalog::ColType fColType;  // returned by colType()
    uint64_t pThread;  // producer thread. thread pool handle
    uint64_t cThread;  // consumer thread. thread pool handle
    DataList_t* requestList;
    //StringDataList* stringList;
    boost::mutex mutex;
    boost::condition condvar;
    boost::condition condvarWakeupProducer;
    BRM::LBIDRange_v fDictlbids;
    std::vector<struct BRM::EMEntry> extents;
    uint64_t extentSize;
    uint64_t divShift;
    uint64_t numExtents;
    uint32_t fScanLbidReqLimit;     // max number of LBIDs to send in a scan
                                    // request to primproc
    uint32_t fScanLbidReqThreshold; // min level of scan LBID backlog before
                                    // consumer will tell producer to send
    bool fStopSending;
    bool fSingleThread;
    uint64_t fPhysicalIO;	// total physical I/O count
    uint64_t fCacheIO;		// total cache I/O count
    uint64_t fMsgBytesIn;   // total byte count for incoming messages
    uint64_t fMsgBytesOut;  // total byte count for outgoing messages
    uint32_t fMsgsToPm;     // total number of messages sent to PMs
    uint32_t fMsgsExpect;   // total blocks to scan
    uint32_t uniqueID;
    ResourceManager* fRm;
    BPSOutputType fOutType;
    rowgroup::RowGroup fOutputRowGroup;
    uint64_t fRidResults;

    //@bug 2634
    //@bug 3128 change ParseTree* to vector<Filter*>
    std::vector<const execplan::Filter*> fFilters;  // exposed by getFilters()

    // Equality-filter state and its helpers (helpers defined out of line).
    bool isEquality;
    std::vector<std::string> equalityFilter;
    void serializeEqualityFilter();
    void destroyEqualityFilter();
1040 };
1041 
1042 
/** @brief class BatchPrimitive
 *
 * Abstract interface layered on JobStep, PrimitiveMsg and DECEventListener.
 * Apart from the constructor, which only forwards jobInfo to JobStep, every
 * member declared here is pure virtual, so a concrete step (TupleBPS in this
 * file) has to implement all of them.
 */
class BatchPrimitive : public JobStep, public PrimitiveMsg, public DECEventListener
{
public:

    /** @brief Forwards jobInfo to the JobStep base; adds no state of its own. */
    BatchPrimitive(const JobInfo& jobInfo) : JobStep(jobInfo) {}

    // Step configuration and bookkeeping.
    virtual bool getFeederFlag() const = 0;
    virtual uint64_t getLastTupleId() const = 0;
    virtual uint32_t getStepCount () const = 0;
    virtual void setBPP(JobStep* jobStep) = 0;
    virtual void setFirstStepType(PrimitiveStepType firstStepType) = 0;
    virtual void setIsProjectionOnly() = 0;
    virtual void setLastTupleId(uint64_t) = 0;
    virtual void setOutputType(BPSOutputType outputType) = 0;
    virtual void setProjectBPP(JobStep* jobStep1, JobStep* jobStep2) = 0;
    virtual void setStepCount() = 0;
    virtual void setSwallowRows(const bool swallowRows) = 0;
    virtual void setBppStep() = 0;
    virtual void dec(DistributedEngineComm* dec) = 0;

    // Read-only queries.
    virtual const OIDVector& getProjectOids() const = 0;
    virtual uint64_t blksSkipped() const = 0;
    virtual bool wasStepRun() const = 0;
    virtual BPSOutputType getOutputType() const = 0;
    virtual uint64_t getRows() const = 0;

    // Job info and row group wiring.
    virtual void setJobInfo(const JobInfo* jobInfo) = 0;
    virtual void setOutputRowGroup(const rowgroup::RowGroup& rg) = 0;
    virtual const rowgroup::RowGroup& getOutputRowGroup() const = 0;

    // Functions & Expressions registration.
    virtual void addFcnJoinExp(const std::vector<execplan::SRCP>& fe) = 0;
    virtual void addFcnExpGroup1(const boost::shared_ptr<execplan::ParseTree>& fe) = 0;
    virtual void setFE1Input(const rowgroup::RowGroup& feInput) = 0;
    virtual void setFcnExpGroup3(const std::vector<execplan::SRCP>& fe) = 0;
    virtual void setFE23Output(const rowgroup::RowGroup& rg) = 0;
};
1075 
1076 
/** @brief class TupleBPS
 *
 * Concrete BatchPrimitive that is also a TupleDeliveryStep.  It can be
 * constructed from a pColStep, pColScanStep, pDictionaryStep,
 * pDictionaryScan or PassThruStep together with a JobInfo.  Its output type
 * is fixed: getOutputType() always reports ROW_GROUP.
 */
class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
{
public:
    TupleBPS(const pColStep& rhs, const JobInfo& jobInfo);
    TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo);
    TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo);
    TupleBPS(const pDictionaryScan& rhs, const JobInfo& jobInfo);
    TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo);
    virtual ~TupleBPS();

    /** @brief Starts processing.
     *
     * Starts processing.
     */
    virtual void run();
    /** @brief Sync's the caller with the end of execution.
     *
     * Does nothing.  Returns when this instance is finished.
     */
    virtual void join();

    // NOTE(review): the _nolock suffix suggests a variant for callers that
    // already hold the relevant lock -- confirm in the implementation file.
    void abort();
    void abort_nolock();

    /** @brief The main loop for the send-side thread
     *
     * The main loop for the primitive-issuing thread.  Don't call it directly.
     */
    void sendPrimitiveMessages();

    /** @brief The main loop for the recv-side thread
     *
     * The main loop for the receive-side thread.  Don't call it directly.
     */
    void receiveMultiPrimitiveMessages(uint32_t threadID);

    /** @brief Add a filter when the column is anything but a 4-byte float type.
     *
     * Add a filter when the column is anything but a 4-byte float type, including
     * 8-byte doubles.
     *
     * NOTE(review): this text talks about adding a filter, which does not
     * obviously describe setBPP(JobStep*); it looks like a stale comment
     * inherited from another declaration -- confirm and rewrite.
     */
    void setBPP(JobStep* jobStep);
    void setProjectBPP(JobStep* jobStep1, JobStep* jobStep2);
    bool scanit(uint64_t rid);
    void storeCasualPartitionInfo(const bool estimateRowCounts);

    /** @brief Returns the isFilterFeeder flag. */
    bool getFeederFlag() const
    {
        return isFilterFeeder;
    }
    /** @brief Stores filterFeeder in isFilterFeeder. */
    void setFeederFlag (bool filterFeeder)
    {
        isFilterFeeder = filterFeeder;
    }
    /** @brief Stores swallowRows in fSwallowRows. */
    void setSwallowRows(const bool swallowRows)
    {
        fSwallowRows = swallowRows;
    }
    /** @brief Returns the fSwallowRows flag. */
    bool getSwallowRows() const
    {
        return fSwallowRows;
    }

    /* Base class interface fcn that can go away */
    // The argument is ignored: the body is intentionally empty.
    void setOutputType(BPSOutputType) { } //Can't change the ot of a TupleBPS
    /** @brief Always ROW_GROUP for this class. */
    BPSOutputType getOutputType() const
    {
        return ROW_GROUP;
    }
    // Required by the BatchPrimitive interface; deliberately empty here.
    void setBppStep() { }
    void setIsProjectionOnly() { }

    /** @brief Returns the ridsReturned counter. */
    uint64_t getRows() const
    {
        return ridsReturned;
    }
    /** @brief Stores firstStepType in ffirstStepType. */
    void setFirstStepType(PrimitiveStepType firstStepType)
    {
        ffirstStepType = firstStepType;
    }
    /** @brief Returns ffirstStepType, the value stored by setFirstStepType(). */
    PrimitiveStepType getPrimitiveStepType ()
    {
        return ffirstStepType;
    }
    /** @brief Despite the "set" name, this increments fStepCount by one. */
    void setStepCount()
    {
        fStepCount++;
    }
    /** @brief Returns the fStepCount counter. */
    uint32_t getStepCount () const
    {
        return fStepCount;
    }
    /** @brief Stores id in fLastTupleId. */
    void setLastTupleId(uint64_t id)
    {
        fLastTupleId = id;
    }
    /** @brief Returns fLastTupleId. */
    uint64_t getLastTupleId() const
    {
        return fLastTupleId;
    }

    /** @brief Set the DistributedEngineComm object this instance should use
     *
     * Set the DistributedEngineComm object this instance should use
     */
    void dec(DistributedEngineComm* dec);

    // Step id setter (defined out of line) and getter (returns fStepId).
    virtual void stepId(uint16_t stepId);
    virtual uint16_t stepId() const
    {
        return fStepId;
    }
    virtual const std::string toString() const;

    /** @brief Returns fOid. */
    virtual execplan::CalpontSystemCatalog::OID oid() const
    {
        return fOid;
    }
    /** @brief Returns fTableOid. */
    virtual execplan::CalpontSystemCatalog::OID tableOid() const
    {
        return fTableOid;
    }
    /** @brief Read-only reference to fColType. */
    const execplan::CalpontSystemCatalog::ColType& colType() const
    {
        return fColType;
    }
    /** @brief Read-only reference to the projectOids vector. */
    const OIDVector& getProjectOids() const
    {
        return projectOids;
    }

    // Statistics accessors; each one returns the member named in its body.
    virtual uint64_t phyIOCount    () const
    {
        return fPhysicalIO;
    }
    virtual uint64_t cacheIOCount  () const
    {
        return fCacheIO;
    }
    virtual uint64_t msgsRcvdCount () const
    {
        return msgsRecvd;
    }
    virtual uint64_t msgBytesIn    () const
    {
        return fMsgBytesIn;
    }
    virtual uint64_t msgBytesOut   () const
    {
        return fMsgBytesOut;
    }
    virtual uint64_t blockTouched  () const
    {
        return fBlockTouched;
    }
    uint32_t nextBand(messageqcpp::ByteStream& bs);

    //...Currently only supported by pColStep and pColScanStep, so didn't bother
    //...to define abstract method in base class, but if start adding to other
    //...classes, then should consider adding pure virtual method to JobStep.
    uint64_t blksSkipped           () const
    {
        return fNumBlksSkipped;
    }

    /** @brief Returns the uniqueID member. */
    uint32_t getUniqueID()
    {
        return uniqueID;
    }
    void useJoiner(boost::shared_ptr<joiner::TupleJoiner>);
    void useJoiners(const std::vector<boost::shared_ptr<joiner::TupleJoiner> >&);
    /** @brief Returns fRunExecuted (was the run method executed for this step). */
    bool wasStepRun() const
    {
        return fRunExecuted;
    }

    // DEC event listener interface
    void newPMOnline(uint32_t connectionNumber);

    void setInputRowGroup(const rowgroup::RowGroup& rg);
    void setOutputRowGroup(const rowgroup::RowGroup& rg);
    const rowgroup::RowGroup& getOutputRowGroup() const;

    void setAggregateStep(const rowgroup::SP_ROWAGG_PM_t& agg, const rowgroup::RowGroup& rg);

    /* This is called by TupleHashJoin only */
    void setJoinedResultRG(const rowgroup::RowGroup& rg);

    /* OR hacks */
    void setBOP(uint8_t op);  // BOP_AND or BOP_OR
    /** @brief Returns the bop member (BOP_AND or BOP_OR). */
    uint8_t getBOP()
    {
        return bop;
    }

    void setJobInfo(const JobInfo* jobInfo);

    // @bug 2123.  Added getEstimatedRowCount function.
    /* @brief estimates the number of rows that will be returned for use in determining the
    *  large side table for hashjoins.
    */
    uint64_t getEstimatedRowCount();

    /* Functions & Expressions support.
    	Group 1 is for single-table filters only at the moment.  Group 1 objects
    	are registered by JLF on the TBPS object directly because there is no join
    	involved.

    	Group 2 is for cross-table filters only and should be registered on the
    	join instance by the JLF.  When the query starts running, the join object
    	decides whether the Group 2 instance should run on the PM and UM, then
    	registers it with the TBPS.

    	Group 3 is for selected columns whether or not its calculation is single-
    	table or cross-table.  If it's single-table and there's no join instance,
    	JLF should register that object with the TBPS for that table.  If it's
    	cross-table, then JLF should register it with the join step.
    */
    void addFcnJoinExp(const std::vector<execplan::SRCP>& fe);
    void addFcnExpGroup1(const boost::shared_ptr<execplan::ParseTree>& fe);
    void setFE1Input(const rowgroup::RowGroup& feInput);

    /* for use by the THJS only... */
    void setFcnExpGroup2(const boost::shared_ptr<funcexp::FuncExpWrapper>& fe2,
                         const rowgroup::RowGroup& output, bool runFE2onPM);

    /* Functions & Expressions in select and groupby clause.
    JLF should use these only if there isn't a join.  If there is, call the
    equivalent fcn on THJS instead */
    void setFcnExpGroup3(const std::vector<execplan::SRCP>& fe);
    void setFE23Output(const rowgroup::RowGroup& rg);
    /** @brief True when the fe2 shared pointer is non-null. */
    bool hasFcnExpGroup3()
    {
        return (fe2 != NULL);
    }

    // rowgroup to connector
    const rowgroup::RowGroup& getDeliveredRowGroup() const;
    void  deliverStringTableRowGroup(bool b);
    bool  deliverStringTableRowGroup() const;

    /* Interface for adding add'l predicates for casual partitioning.
     * This fcn checks for any intersection between the values in vals
     * and the range of a given extent.  If there is no intersection, that extent
     * won't be processed.  For every extent in OID, it effectively calculates
     * ((vals[0] >= min && vals[0] <= max) || (vals[1] >= min && vals[1] <= max)) ...
     * && (previous calculation for that extent).
     * Note that it is an adder not a setter.  For an extent to be scanned, all calls
     * must have a non-empty intersection.
     */
    void addCPPredicates(uint32_t OID, const std::vector<int64_t>& vals, bool isRange);

    /* semijoin adds */
    void setJoinFERG(const rowgroup::RowGroup& rg);

    /* To cover over the race between creating extents in each column.  Mitigates
     * bug 3607.*/
    bool goodExtentCount();
    void reloadExtentLists();
    void initExtentMarkers();   // need a better name for this

    /** @brief Always true for this class. */
    virtual bool stringTableFriendly()
    {
        return true;
    }

protected:
    void sendError(uint16_t status);

private:
    void formatMiniStats();

    void startPrimitiveThread();
    void startAggregationThread();
    void initializeConfigParms();
    uint64_t getFBO(uint64_t lbid);
    void checkDupOutputColumns(const rowgroup::RowGroup& rg);
    void dupOutputColumns(rowgroup::RowGroup&);
    void dupOutputColumns(rowgroup::RGData&, rowgroup::RowGroup&);
    void rgDataToDl(rowgroup::RGData&, rowgroup::RowGroup&, RowGroupDL*);
    void rgDataVecToDl(std::vector<rowgroup::RGData>&, rowgroup::RowGroup&, RowGroupDL*);

    DistributedEngineComm* fDec;
    boost::shared_ptr<BatchPrimitiveProcessorJL> fBPP;
    uint16_t fNumSteps;
    int fColWidth;
    uint32_t fStepCount;    // incremented by setStepCount()
    bool fCPEvaluated;  // @bug 2123
    uint64_t fEstimatedRows; // @bug 2123
    /// number of threads on the receive side
    uint32_t fMaxNumThreads;
    uint32_t fNumThreads;
    PrimitiveStepType ffirstStepType;
    bool isFilterFeeder;
    std::vector<uint64_t> fProducerThreads; // thread pool handles
    messageqcpp::ByteStream fFilterString;
    uint32_t fFilterCount;
    execplan::CalpontSystemCatalog::ColType fColType;
    execplan::CalpontSystemCatalog::OID fOid;
    execplan::CalpontSystemCatalog::OID fTableOid;
    uint64_t fLastTupleId;
    BRM::LBIDRange_v lbidRanges;
    std::vector<int32_t> lastExtent;
    std::vector<BRM::LBID_t> lastScannedLBID;
    BRM::DBRM dbrm;
    SP_LBIDList lbidList;
    uint64_t ridsRequested;
    uint64_t totalMsgs;
    // NOTE(review): volatile by itself gives no atomicity or ordering between
    // threads in C++; presumably every access to these three members happens
    // under one of the mutexes below -- confirm in the implementation file.
    volatile uint64_t msgsSent;
    volatile uint64_t msgsRecvd;
    volatile bool finishedSending;
    bool firstRead;
    bool sendWaiting;
    uint32_t recvWaiting;
    uint32_t recvExited;
    uint64_t ridsReturned;  // returned by getRows()
    std::map<execplan::CalpontSystemCatalog::OID, std::tr1::unordered_map<int64_t, struct BRM::EMEntry> > extentsMap;
    std::vector<BRM::EMEntry> scannedExtents;
    OIDVector projectOids;
    uint32_t extentSize, divShift, rpbShift, numExtents, modMask;
    uint32_t fRequestSize; // the number of logical extents per batch of requests sent to PrimProc.
    uint32_t fProcessorThreadsPerScan; // The number of messages sent per logical extent.
    bool fSwallowRows;
    uint32_t fMaxOutstandingRequests; // The number of logical extents not yet processed by PrimProc
    uint64_t fPhysicalIO;	// total physical I/O count
    uint64_t fCacheIO;		// total cache I/O count
    uint64_t fNumBlksSkipped;//total number of block scans skipped due to CP
    uint64_t fMsgBytesIn;   // total byte count for incoming messages
    uint64_t fMsgBytesOut;  // total byte count for outgoing messages
    uint64_t fBlockTouched; // total blocks touched
    uint32_t fExtentsPerSegFile;//config num of Extents Per Segment File
    // uint64_t cThread;  //consumer thread. thread handle from thread pool
    uint64_t pThread;  //producer thread. thread handle from thread pool
    boost::mutex tplMutex;
    boost::mutex dlMutex;
    boost::mutex cpMutex;
    boost::mutex serializeJoinerMutex;
    boost::condition condvarWakeupProducer, condvar;

    std::vector<bool> scanFlags; // use to keep track of which extents to eliminate from this step
    bool BPPIsAllocated;
    uint32_t uniqueID;
    ResourceManager* fRm;

    /* HashJoin support */

    void serializeJoiner();
    void serializeJoiner(uint32_t connectionNumber);

    void generateJoinResultSet(const std::vector<std::vector<rowgroup::Row::Pointer> >& joinerOutput,
                               rowgroup::Row& baseRow, const std::vector<boost::shared_array<int> >& mappings,
                               const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData,
                               std::vector<rowgroup::RGData>* outputData,
                               const boost::scoped_array<rowgroup::Row>& smallRows, rowgroup::Row& joinedRow);

    std::vector<boost::shared_ptr<joiner::TupleJoiner> > tjoiners;
    bool doJoin, hasPMJoin, hasUMJoin;
    std::vector<rowgroup::RowGroup> joinerMatchesRGs;   // parses the small-side matches from joiner

    uint32_t smallSideCount;
    int  smallOuterJoiner;

    bool fRunExecuted; // was the run method executed for this step
    rowgroup::RowGroup inputRowGroup;   // for parsing the data read from the datalist
    rowgroup::RowGroup primRowGroup;    // for parsing the data received from the PM
    rowgroup::RowGroup outputRowGroup;  // if there's a join, these are the joined
                                        // result, otherwise it's = to primRowGroup
    // aggregation support
    rowgroup::SP_ROWAGG_PM_t fAggregatorPm;
    rowgroup::RowGroup       fAggRowGroupPm;

    // OR hacks
    uint8_t bop; 		// BOP_AND or BOP_OR

    // temporary hack to make sure JobList only calls run and join once
    boost::mutex jlLock;
    bool runRan;
    bool joinRan;

    // bug 1965, trace duplicate columns in delivery list <dest, src>
    std::vector<std::pair<uint32_t, uint32_t> > dupColumns;

    /* Functions & Expressions vars */
    boost::shared_ptr<funcexp::FuncExpWrapper> fe1, fe2;
    rowgroup::RowGroup fe1Input, fe2Output;
    boost::shared_array<int> fe2Mapping;
    bool runFEonPM;

    /* for UM F & E 2 processing */
    rowgroup::RGData fe2Data;
    rowgroup::Row fe2InRow, fe2OutRow;

    void processFE2(rowgroup::RowGroup& input, rowgroup::RowGroup& output,
                    rowgroup::Row& inRow, rowgroup::Row& outRow,
                    std::vector<rowgroup::RGData>* rgData,
                    funcexp::FuncExpWrapper* localFE2);
    void processFE2_oneRG(rowgroup::RowGroup& input, rowgroup::RowGroup& output,
                          rowgroup::Row& inRow, rowgroup::Row& outRow,
                          funcexp::FuncExpWrapper* localFE2);

    /* Runtime Casual Partitioning adjustments.  The CP code is needlessly complicated;
     * to avoid making it worse, decided to designate 'scanFlags' as the static
     * component and this new array as the runtime component.  The final CP decision
     * is scanFlags & runtimeCP.
     */
    std::vector<bool> runtimeCPFlags;

    /* semijoin vars */
    rowgroup::RowGroup joinFERG;

    boost::shared_ptr<RowGroupDL> deliveryDL;
    uint32_t deliveryIt;

    /* shared nothing support */
    // Plain value bundle: a dbroot, a connection number, an expected response
    // count and a shared pointer to the message ByteStream.
    struct Job
    {
        // Copies each argument into the member of the matching role; the
        // shared_ptr is copied, not moved, so the caller keeps its reference.
        Job(uint32_t d, uint32_t n, uint32_t b, boost::shared_ptr<messageqcpp::ByteStream>& bs) :
            dbroot(d), connectionNum(n), expectedResponses(b), msg(bs) { }
        uint32_t dbroot;
        uint32_t connectionNum;
        uint32_t expectedResponses;
        boost::shared_ptr<messageqcpp::ByteStream> msg;
    };

    void prepCasualPartitioning();
    void makeJobs(std::vector<Job>* jobs);
    void interleaveJobs(std::vector<Job>* jobs) const;
    void sendJobs(const std::vector<Job>& jobs);
    uint32_t numDBRoots;

    /* Pseudo column filter processing.  Think about refactoring into a separate class. */
    bool processPseudoColFilters(uint32_t extentIndex, boost::shared_ptr<std::map<int, int> > dbRootPMMap) const;
    bool processOneFilterType(int8_t colWidth, int64_t value, uint32_t type) const;
    bool processSingleFilterString(int8_t BOP, int8_t colWidth, int64_t val, const uint8_t* filterString,
                                   uint32_t filterCount) const;
    bool processSingleFilterString_ranged(int8_t BOP, int8_t colWidth, int64_t min, int64_t max,
                                          const uint8_t* filterString, uint32_t filterCount) const;
    bool processLBIDFilter(const BRM::EMEntry& emEntry) const;
    bool compareSingleValue(uint8_t COP, int64_t val1, int64_t val2) const;
    bool compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const;
    // One flag per pseudo column filter kind.
    bool hasPCFilter, hasPMFilter, hasRIDFilter, hasSegmentFilter, hasDBRootFilter, hasSegmentDirFilter,
         hasPartitionFilter, hasMaxFilter, hasMinFilter, hasLBIDFilter, hasExtentIDFilter;

};
1522 
1523 /** @brief class FilterStep
1524  *
1525  */
1526 class FilterStep : public JobStep
1527 {
1528 public:
1529 
1530     FilterStep(const execplan::CalpontSystemCatalog::ColType& colType, const JobInfo& jobInfo);
1531     ~FilterStep();
1532 
1533     /** @brief virtual void Run method
1534      */
1535     void run();
1536     void join();
1537 
1538     const std::string toString() const;
1539 
tableOid()1540     execplan::CalpontSystemCatalog::OID tableOid() const
1541     {
1542         return fTableOID;
1543     }
tableOid(execplan::CalpontSystemCatalog::OID tableOid)1544     void tableOid(execplan::CalpontSystemCatalog::OID tableOid)
1545     {
1546         fTableOID = tableOid;
1547     }
colType()1548     const execplan::CalpontSystemCatalog::ColType& colType() const
1549     {
1550         return fColType;
1551     }
1552     void setBOP(int8_t b);
BOP()1553     int8_t BOP() const
1554     {
1555         return fBOP;
1556     }
1557     friend struct FSRunner;
1558 
1559     void addFilter(const execplan::Filter* f);
getFilters()1560     std::vector<const execplan::Filter*>& getFilters()
1561     {
1562         return fFilters;
1563     }
1564 
1565 protected:
1566 //	void unblockDataLists(FifoDataList* fifo, StringFifoDataList* strFifo, StrDataList* strResult, DataList_t* result);
1567 
1568 private:
1569 
1570     //This i/f is not meaningful in this step
oid()1571     execplan::CalpontSystemCatalog::OID oid() const
1572     {
1573         return 0;
1574     }
1575     void doFilter();    // @bug 686
1576 
1577 // 	config::Config *fConfig;
1578 
1579     execplan::CalpontSystemCatalog::OID fTableOID;
1580     execplan::CalpontSystemCatalog::ColType fColType;
1581     int8_t fBOP;
1582     // int64_t runner;    // thread handle from thread pool
1583 
1584     // @bug 687 Add data and friend declarations for concurrent filter steps.
1585     std::vector<ElementType> fSortedA; // used to internally sort input data
1586     std::vector<ElementType> fSortedB;
1587 //	FifoDataList* fFAp;                // Used to internally pass data to
1588 //	FifoDataList* fFBp;                // FilterOperation thread.
1589     uint64_t  resultCount;
1590 
1591     std::vector<const execplan::Filter*> fFilters;
1592 };
1593 
1594 /** @brief class PassThruStep
1595  *
1596  */
1597 class PassThruStep : public JobStep, public PrimitiveMsg
1598 {
1599 
1600     typedef std::pair<int64_t, int64_t> element_t;
1601 
1602 public:
1603     /** @brief PassThruStep constructor
1604      */
1605     PassThruStep(
1606         execplan::CalpontSystemCatalog::OID oid,
1607         execplan::CalpontSystemCatalog::OID tableOid,
1608         const execplan::CalpontSystemCatalog::ColType& colType,
1609         const JobInfo& jobInfo);
1610 
1611     PassThruStep(const pColStep& rhs);
1612     PassThruStep(const PseudoColStep& rhs);
1613 
1614     virtual ~PassThruStep();
1615 
1616     /** @brief Starts processing.  Set at least the RID list before calling.
1617      *
1618      * Starts processing.  Set at least the RID list before calling this.
1619      */
1620     virtual void run();
1621 
1622     /** @brief Sync's the caller with the end of execution.
1623      *
1624      * Does nothing.  Returns when this instance is finished.
1625      */
1626     virtual void join();
1627 
1628     virtual const std::string toString() const;
1629 
oid()1630     virtual execplan::CalpontSystemCatalog::OID oid() const
1631     {
1632         return fOid;
1633     }
1634 
tableOid()1635     virtual execplan::CalpontSystemCatalog::OID tableOid() const
1636     {
1637         return fTableOid;
1638     }
1639 
getColWidth()1640     uint8_t getColWidth() const
1641     {
1642         return colWidth;
1643     }
isDictCol()1644     bool isDictCol() const
1645     {
1646         return isDictColumn;
1647     }
isExeMgr()1648     bool isExeMgr() const
1649     {
1650         return isEM;
1651     }
colType()1652     const execplan::CalpontSystemCatalog::ColType& colType() const
1653     {
1654         return fColType;
1655     }
resourceManager()1656     ResourceManager* resourceManager() const
1657     {
1658         return fRm;
1659     }
1660 
pseudoType(uint32_t p)1661     void pseudoType(uint32_t p)
1662     {
1663         fPseudoType = p;
1664     }
pseudoType()1665     uint32_t pseudoType() const
1666     {
1667         return fPseudoType;
1668     }
1669 
1670 protected:
1671 
1672 private:
1673 
1674     /** @brief constructor for completeness
1675      */
1676     explicit PassThruStep();
1677 
1678     uint64_t getLBID(uint64_t rid, bool& scan);
1679     uint64_t getFBO(uint64_t lbid);
1680 
1681     boost::shared_ptr<execplan::CalpontSystemCatalog> catalog;
1682     execplan::CalpontSystemCatalog::OID fOid;
1683     execplan::CalpontSystemCatalog::OID fTableOid;
1684     uint8_t colWidth;
1685     uint16_t realWidth;
1686     uint32_t fPseudoType;
1687     execplan::CalpontSystemCatalog::ColType fColType;
1688     bool isDictColumn;
1689     bool isEM;
1690 
1691 //	boost::thread* fPTThd;
1692 
1693     // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4.
1694     // 	      Running with this one will swallow rows at projection.
1695     bool fSwallowRows;
1696     ResourceManager* fRm;
1697     friend class PassThruCommandJL;
1698     friend class RTSCommandJL;
1699     friend class BatchPrimitiveStep;
1700     friend class TupleBPS;
1701 };
1702 
1703 class PseudoColStep : public pColStep
1704 {
1705 public:
1706     /** @brief PseudoColStep constructor
1707      */
PseudoColStep(execplan::CalpontSystemCatalog::OID oid,execplan::CalpontSystemCatalog::OID tableOid,uint32_t pId,const execplan::CalpontSystemCatalog::ColType & ct,const JobInfo & jobInfo)1708     PseudoColStep(
1709         execplan::CalpontSystemCatalog::OID oid,
1710         execplan::CalpontSystemCatalog::OID tableOid,
1711         uint32_t pId,
1712         const execplan::CalpontSystemCatalog::ColType& ct,
1713         const JobInfo& jobInfo) :
1714         pColStep(oid, tableOid, ct, jobInfo),
1715         fPseudoColumnId(pId)
1716     {}
1717 
PseudoColStep(const PassThruStep & rhs)1718     PseudoColStep(const PassThruStep& rhs) :
1719         pColStep(rhs),
1720         fPseudoColumnId(rhs.pseudoType())
1721     {}
1722 
~PseudoColStep()1723     virtual ~PseudoColStep() {}
1724 
pseudoColumnId()1725     uint32_t pseudoColumnId() const
1726     {
1727         return fPseudoColumnId;
1728     }
pseudoColumnId(uint32_t pId)1729     void pseudoColumnId(uint32_t pId)
1730     {
1731         fPseudoColumnId = pId;
1732     }
1733 
1734 protected:
1735     uint32_t fPseudoColumnId;
1736 
1737 private:
1738     /** @brief disabled constuctor
1739      */
1740     PseudoColStep(const pColScanStep&);
1741     PseudoColStep(const pColStep&);
1742 };
1743 
1744 
1745 }
1746 
1747 #endif  // JOBLIST_PRIMITIVESTEP_H
1748 // vim:ts=4 sw=4:
1749 
1750 
1751