1 /* Copyright (C) 2014 InfiniDB, Inc. 2 Copyright (C) 2019 MariaDB Corporation 3 4 This program is free software; you can redistribute it and/or 5 modify it under the terms of the GNU General Public License 6 as published by the Free Software Foundation; version 2 of 7 the License. 8 9 This program is distributed in the hope that it will be useful, 10 but WITHOUT ANY WARRANTY; without even the implied warranty of 11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 GNU General Public License for more details. 13 14 You should have received a copy of the GNU General Public License 15 along with this program; if not, write to the Free Software 16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 17 MA 02110-1301, USA. */ 18 19 // $Id: primitivestep.h 9688 2013-07-15 19:27:22Z pleblanc $ 20 21 22 /** @file */ 23 24 #ifndef JOBLIST_PRIMITIVESTEP_H 25 #define JOBLIST_PRIMITIVESTEP_H 26 27 #include <iostream> 28 #include <sstream> 29 #include <vector> 30 #include <string> 31 #include <utility> 32 #include <cassert> 33 #include <sys/time.h> 34 #include <set> 35 #include <map> 36 #include <stdexcept> 37 #include <sstream> 38 #ifndef _MSC_VER 39 #include <tr1/memory> 40 #else 41 #include <memory> 42 #endif 43 44 #include <boost/shared_ptr.hpp> 45 #include <boost/shared_array.hpp> 46 #include <boost/thread.hpp> 47 #include <boost/thread/condition.hpp> 48 49 #include "calpontsystemcatalog.h" 50 #include "calpontselectexecutionplan.h" 51 #include "brm.h" 52 #include "parsetree.h" 53 #include "simplefilter.h" 54 55 #include "jobstep.h" 56 #include "primitivemsg.h" 57 #include "elementtype.h" 58 #include "distributedenginecomm.h" 59 #include "lbidlist.h" 60 #include "joblisttypes.h" 61 #include "timestamp.h" 62 #include "timeset.h" 63 #include "resourcemanager.h" 64 #include "joiner.h" 65 #include "tuplejoiner.h" 66 #include "rowgroup.h" 67 #include "rowaggregation.h" 68 #include "funcexpwrapper.h" 69 70 namespace joblist 71 { 72 73 /* Forward decl's to support the batch primitive classes */ 74 struct JobInfo; 75 class CommandJL; 76 class ColumnCommandJL; 77 class DictStepJL; 78 class BatchPrimitiveProcessorJL; 79 class pColStep; 80 class pColScanStep; 81 class PassThruStep; 82 class PseudoColStep; 83 84 85 typedef boost::shared_ptr<LBIDList> SP_LBIDList; 86 typedef std::vector<execplan::CalpontSystemCatalog::OID> OIDVector; 87 typedef std::vector<std::pair<execplan::CalpontSystemCatalog::OID, int> > OIDIntVector; 88 89 90 enum PrimitiveStepType 91 { 92 SCAN, 93 COLSTEP, 94 DICTIONARYSCAN, 95 DICTIONARY, 96 PASSTHRU, 97 AGGRFILTERSTEP 98 }; 99 100 101 /** @brief class PrimitiveMsg 102 * 103 */ 104 class PrimitiveMsg 105 { 106 public: 107 /** @brief virtual void Send method 108 */ 109 virtual void send(); 110 /** @brief virtual void Receive method 111 */ 112 virtual void receive(); 113 /** @brief virtual void BuildPrimitiveMessage method 114 */ 115 virtual void buildPrimitiveMessage(ISMPACKETCOMMAND cmd, void* filterValues, void* ridArray); 116 virtual void sendPrimitiveMessages(); 117 virtual void receivePrimitiveMessages(); 118 PrimitiveMsg()119 PrimitiveMsg() { } 120 ~PrimitiveMsg()121 virtual ~PrimitiveMsg() { } 122 123 uint16_t planFlagsToPrimFlags(uint32_t planFlags); 124 125 private: 126 }; 127 128 129 class pColScanStep; 130 class pColStep : public JobStep, public PrimitiveMsg 131 { 132 133 typedef std::pair<int64_t, int64_t> element_t; 134 135 public: 136 /** @brief pColStep constructor 137 * @param flushInterval The interval in msgs at which the sending side should 138 * wait for the receiveing side to catch up. 0 (default) means never. 139 */ 140 pColStep( 141 execplan::CalpontSystemCatalog::OID oid, 142 execplan::CalpontSystemCatalog::OID tableOid, 143 const execplan::CalpontSystemCatalog::ColType& ct, 144 const JobInfo& jobInfo); 145 146 pColStep(const pColScanStep& rhs); 147 148 pColStep(const PassThruStep& rhs); 149 150 virtual ~pColStep(); 151 152 /** @brief Starts processing. Set at least the RID list before calling. 153 * 154 * Starts processing. Set at least the RID list before calling this. 155 */ 156 virtual void run(); 157 /** @brief Sync's the caller with the end of execution. 158 * 159 * Does nothing. Returns when this instance is finished. 160 */ 161 virtual void join(); 162 163 virtual const std::string toString() const; 164 isDictCol()165 virtual bool isDictCol() const 166 { 167 return fIsDict; 168 }; isExeMgr()169 bool isExeMgr() const 170 { 171 return isEM; 172 } 173 174 /** @brief Set config parameters for this JobStep. 175 * 176 * Set the config parameters this JobStep. 177 */ 178 void initializeConfigParms(); 179 180 /** @brief The main loop for the send-side thread 181 * 182 * The main loop for the primitive-issuing thread. Don't call it directly. 183 */ 184 void sendPrimitiveMessages(); 185 186 /** @brief The main loop for the recv-side thread 187 * 188 * The main loop for the receive-side thread. Don't call it directly. 189 */ 190 void receivePrimitiveMessages(); 191 192 /** @brief Add a filter. Use this interface when the column stores anything but 4-byte floats. 193 * 194 * Add a filter. Use this interface when the column stores anything but 4-byte floats. 195 */ 196 void addFilter(int8_t COP, int64_t value, uint8_t roundFlag = 0); 197 void addFilter(int8_t COP, float value); 198 199 /** @brief Sets the DataList to get RID values from. 200 * 201 * Sets the DataList to get RID values from. Filtering by RID distinguishes 202 * this class from pColScan. Use pColScan if the every RID should be considered; it's 203 * faster at that. 204 */ 205 void setRidList(DataList<ElementType>* rids); 206 207 /** @brief Sets the String DataList to get RID values from. 208 * 209 * Sets the string DataList to get RID values from. Filtering by RID distinguishes 210 * this class from pColScan. Use pColScan if the every RID should be considered; it's 211 * faster at that. 212 */ 213 void setStrRidList(DataList<StringElementType>* strDl); 214 215 /** @brief Set the binary operator for the filter predicate (BOP_AND or BOP_OR). 216 * 217 * Set the binary operator for the filter predicate (BOP_AND or BOP_OR). 218 */ 219 void setBOP(int8_t BOP); 220 221 /** @brief Set the output type. 222 * 223 * Set the output type (1 = RID, 2 = Token, 3 = Both). 224 */ 225 void setOutputType(int8_t OutputType); 226 227 /** @brief Set the swallowRows flag. 228 * 229 * 230 * If true, no rows will be inserted to the output datalists. 231 */ 232 void setSwallowRows(const bool swallowRows); 233 234 /** @brief Get the swallowRows flag. 235 * 236 * 237 * If true, no rows will be inserted to the output datalists. 238 */ getSwallowRows()239 bool getSwallowRows() const 240 { 241 return fSwallowRows; 242 } 243 oid()244 virtual execplan::CalpontSystemCatalog::OID oid() const 245 { 246 return fOid; 247 } 248 tableOid()249 virtual execplan::CalpontSystemCatalog::OID tableOid() const 250 { 251 return fTableOid; 252 } 253 filterCount()254 uint32_t filterCount() const 255 { 256 return fFilterCount; 257 } filterString()258 const messageqcpp::ByteStream& filterString() const 259 { 260 return fFilterString; 261 } BOP()262 int8_t BOP() const 263 { 264 return fBOP; 265 } colType()266 const execplan::CalpontSystemCatalog::ColType& colType() const 267 { 268 return fColType; 269 } 270 void appendFilter(const messageqcpp::ByteStream& filter, unsigned count); flushInterval()271 uint32_t flushInterval() const 272 { 273 return fFlushInterval; 274 } getFeederFlag()275 bool getFeederFlag() const 276 { 277 return isFilterFeeder; 278 } 279 setFeederFlag(bool filterFeeder)280 void setFeederFlag (bool filterFeeder) 281 { 282 isFilterFeeder = filterFeeder; 283 } phyIOCount()284 virtual uint64_t phyIOCount () const 285 { 286 return fPhysicalIO; 287 } cacheIOCount()288 virtual uint64_t cacheIOCount () const 289 { 290 return fCacheIO; 291 } msgsRcvdCount()292 virtual uint64_t msgsRcvdCount () const 293 { 294 return msgsRecvd; 295 } msgBytesIn()296 virtual uint64_t msgBytesIn () const 297 { 298 return fMsgBytesIn; 299 } msgBytesOut()300 virtual uint64_t msgBytesOut () const 301 { 302 return fMsgBytesOut; 303 } 304 305 //...Currently only supported by pColStep and pColScanStep, so didn't bother 306 //...to define abstract method in base class, but if start adding to other 307 //...classes, then should consider adding pure virtual method to JobStep. blksSkipped()308 uint64_t blksSkipped () const 309 { 310 return fNumBlksSkipped; 311 } resourceManager()312 ResourceManager* resourceManager() const 313 { 314 return fRm; 315 } 316 getlbidList()317 SP_LBIDList getlbidList() const 318 { 319 return lbidList; 320 } 321 322 void addFilter(const execplan::Filter* f); 323 void appendFilter(const std::vector<const execplan::Filter*>& fs); getFilters()324 std::vector<const execplan::Filter*>& getFilters() 325 { 326 return fFilters; 327 } 328 329 protected: 330 void addFilters(); 331 332 private: 333 334 /** @brief constructor for completeness 335 */ 336 explicit pColStep(); 337 338 /** @brief StartPrimitiveThread 339 * Utility function to start worker thread that sends primitive messages 340 */ 341 void startPrimitiveThread(); 342 /** @brief StartAggregationThread 343 * Utility function to start worker thread that receives result aggregation from primitive servers 344 */ 345 void startAggregationThread(); 346 uint64_t getLBID(uint64_t rid, bool& scan); 347 uint64_t getFBO(uint64_t lbid); 348 349 ResourceManager* fRm; 350 boost::shared_ptr<execplan::CalpontSystemCatalog> sysCat; 351 execplan::CalpontSystemCatalog::OID fOid; 352 execplan::CalpontSystemCatalog::OID fTableOid; 353 execplan::CalpontSystemCatalog::ColType fColType; 354 uint32_t fFilterCount; 355 int8_t fBOP; 356 int8_t fOutputType; 357 uint16_t realWidth; 358 DataList_t* ridList; 359 StrDataList* strRidList; 360 messageqcpp::ByteStream fFilterString; 361 std::vector<struct BRM::EMEntry> extents; 362 uint32_t extentSize, divShift, modMask, ridsPerBlock, rpbShift, blockSizeShift, numExtents; 363 uint64_t rpbMask; 364 uint64_t msgsSent, msgsRecvd; 365 bool finishedSending, recvWaiting, fIsDict; 366 bool isEM; 367 int64_t ridCount; 368 uint32_t fFlushInterval; 369 370 // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4. 371 // Running with this one will swallow rows at projection. 372 bool fSwallowRows; 373 uint32_t fProjectBlockReqLimit; // max number of rids to send in a scan 374 // request to primproc 375 uint32_t fProjectBlockReqThreshold; // min level of rids backlog before 376 // consumer will tell producer to send 377 // more rids scan requests to primproc 378 379 volatile bool fStopSending; 380 bool isFilterFeeder; 381 uint64_t fPhysicalIO; // total physical I/O count 382 uint64_t fCacheIO; // total cache I/O count 383 uint64_t fNumBlksSkipped;//total number of block scans skipped due to CP 384 uint64_t fMsgBytesIn; // total byte count for incoming messages 385 uint64_t fMsgBytesOut; // total byte count for outcoming messages 386 387 BRM::DBRM dbrm; 388 389 // boost::shared_ptr<boost::thread> cThread; //consumer thread 390 // boost::shared_ptr<boost::thread> pThread; //producer thread 391 boost::mutex mutex; 392 boost::condition condvar; 393 boost::condition flushed; 394 SP_LBIDList lbidList; 395 std::vector<bool> scanFlags; // use to keep track of which extents to eliminate from this step 396 uint32_t uniqueID; 397 398 //@bug 2634 399 //@bug 3128 change ParseTree* to vector<Filter*> 400 std::vector<const execplan::Filter*> fFilters; 401 402 friend class pColScanStep; 403 friend class PassThruStep; 404 friend class ColumnCommandJL; 405 friend class RTSCommandJL; 406 friend class BatchPrimitiveStep; 407 friend class TupleBPS; 408 }; 409 410 /** @brief the pColScan Step 411 * 412 * The most common step which requires no input RID list, but may have value filters applied 413 * 414 * The input association will always be null here so that we can go as soon as the Run function is called 415 * 416 * The StartPrimitiveThread will spawn a new worker thread that will 417 * a) take any input filters and apply them to a primitive message to be sent 418 * b) walk the block resolution manager via an LBID list for the oid 419 * c) send messages to the primitive server as quickly as possible 420 */ 421 422 class pColScanStep : public JobStep, public PrimitiveMsg 423 { 424 public: 425 /** @brief pColScanStep constructor 426 */ 427 pColScanStep( 428 execplan::CalpontSystemCatalog::OID oid, 429 execplan::CalpontSystemCatalog::OID tableOid, 430 const execplan::CalpontSystemCatalog::ColType& ct, 431 const JobInfo& jobInfo); 432 433 pColScanStep(const pColStep& rhs); 434 ~pColScanStep(); 435 436 /** @brief Starts processing. 437 * 438 * Starts processing. 439 */ 440 virtual void run(); 441 442 /** @brief Sync's the caller with the end of execution. 443 * 444 * Does nothing. Returns when this instance is finished. 445 */ 446 virtual void join(); 447 isDictCol()448 virtual bool isDictCol() const 449 { 450 return fIsDict; 451 }; 452 453 /** @brief The main loop for the send-side thread 454 * 455 * The main loop for the primitive-issuing thread. Don't call it directly. 456 */ 457 void sendPrimitiveMessages(); 458 459 /** @brief The main loop for the recv-side thread 460 * 461 * The main loop for the receive-side thread. Don't call it directly. 462 */ 463 using PrimitiveMsg::receivePrimitiveMessages; 464 void receivePrimitiveMessages(uint64_t i = 0); 465 466 /** @brief Add a filter when the column is a 4-byte float type 467 * 468 * Add a filter when the column is a 4-byte float type 469 */ 470 void addFilter(int8_t COP, float value); 471 472 /** @brief Add a filter when the column is anything but a 4-byte float type. 473 * 474 * Add a filter when the column is anything but a 4-byte float type, including 475 * 8-byte doubles. 476 */ 477 void addFilter(int8_t COP, int64_t value, uint8_t roundFlag = 0); 478 479 /** @brief Set the binary operator for the filter predicates 480 * 481 * Set the binary operator for the filter predicates (BOP_AND or BOP_OR). 482 * It is initialized to OR. 483 */ 484 void setBOP(int8_t BOP); // AND or OR BOP()485 int8_t BOP() const 486 { 487 return fBOP; 488 } 489 getFeederFlag()490 bool getFeederFlag() const 491 { 492 return isFilterFeeder; 493 } 494 setFeederFlag(bool filterFeeder)495 void setFeederFlag (bool filterFeeder) 496 { 497 isFilterFeeder = filterFeeder; 498 } 499 /** @brief Get the string of the filter predicates 500 * 501 * Get the filter string constructed from the predicates 502 */ filterString()503 messageqcpp::ByteStream filterString() const 504 { 505 return fFilterString; 506 } 507 508 void setSingleThread(bool b); getSingleThread()509 bool getSingleThread() 510 { 511 return fSingleThread; 512 } 513 514 /** @brief Set the output type. 515 * 516 * Set the output type (1 = RID, 2 = Token, 3 = Both).pColScan 517 */ 518 void setOutputType(int8_t OutputType); filterCount()519 uint32_t filterCount() const 520 { 521 return fFilterCount; 522 } 523 524 virtual const std::string toString() const; 525 oid()526 virtual execplan::CalpontSystemCatalog::OID oid() const 527 { 528 return fOid; 529 } 530 tableOid()531 virtual execplan::CalpontSystemCatalog::OID tableOid() const 532 { 533 return fTableOid; 534 } colType()535 const execplan::CalpontSystemCatalog::ColType& colType() const 536 { 537 return fColType; 538 } resourceManager()539 ResourceManager* resourceManager() const 540 { 541 return fRm; 542 } 543 phyIOCount()544 virtual uint64_t phyIOCount () const 545 { 546 return fPhysicalIO; 547 } cacheIOCount()548 virtual uint64_t cacheIOCount () const 549 { 550 return fCacheIO; 551 } msgsRcvdCount()552 virtual uint64_t msgsRcvdCount () const 553 { 554 return recvCount; 555 } msgBytesIn()556 virtual uint64_t msgBytesIn () const 557 { 558 return fMsgBytesIn; 559 } msgBytesOut()560 virtual uint64_t msgBytesOut () const 561 { 562 return fMsgBytesOut; 563 } getRidsPerBlock()564 uint32_t getRidsPerBlock() const 565 { 566 return ridsPerBlock; 567 } 568 569 //...Currently only supported by pColStep and pColScanStep, so didn't bother 570 //...to define abstract method in base class, but if start adding to other 571 //...classes, then should consider adding pure virtual method to JobStep. blksSkipped()572 uint64_t blksSkipped () const 573 { 574 return fNumBlksSkipped; 575 } 576 udfName()577 std::string udfName() const 578 { 579 return fUdfName; 580 }; udfName(const std::string & name)581 void udfName(const std::string& name) 582 { 583 fUdfName = name; 584 } 585 getlbidList()586 SP_LBIDList getlbidList() const 587 { 588 return lbidList; 589 } 590 591 void addFilter(const execplan::Filter* f); 592 void appendFilter(const std::vector<const execplan::Filter*>& fs); getFilters()593 std::vector<const execplan::Filter*>& getFilters() 594 { 595 return fFilters; 596 } getFilters()597 const std::vector<const execplan::Filter*>& getFilters() const 598 { 599 return fFilters; 600 } 601 602 protected: 603 void addFilters(); 604 605 private: 606 //defaults okay? 607 //pColScanStep(const pColScanStep& rhs); 608 //pColScanStep& operator=(const pColScanStep& rhs); 609 610 typedef boost::shared_ptr<boost::thread> SPTHD; 611 void startPrimitiveThread(); 612 void startAggregationThread(); 613 void initializeConfigParms(); 614 void sendAPrimitiveMessage ( 615 ISMPacketHeader& ism, 616 BRM::LBID_t msgLbidStart, 617 uint32_t msgLbidCount); 618 uint64_t getFBO(uint64_t lbid); 619 bool isEmptyVal(const uint8_t* val8) const; 620 621 ResourceManager* fRm; 622 ColByScanRangeRequestHeader fMsgHeader; 623 SPTHD fConsumerThread; 624 /// number of threads on the receive side 625 uint32_t fNumThreads; 626 627 SPTHD* fProducerThread; 628 messageqcpp::ByteStream fFilterString; 629 uint32_t fFilterCount; 630 execplan::CalpontSystemCatalog::OID fOid; 631 execplan::CalpontSystemCatalog::OID fTableOid; 632 execplan::CalpontSystemCatalog::ColType fColType; 633 int8_t fBOP; 634 int8_t fOutputType; 635 uint32_t sentCount; 636 uint32_t recvCount; 637 BRM::LBIDRange_v lbidRanges; 638 BRM::DBRM dbrm; 639 SP_LBIDList lbidList; 640 641 boost::mutex mutex; 642 boost::mutex dlMutex; 643 boost::mutex cpMutex; 644 boost::condition condvar; 645 boost::condition condvarWakeupProducer; 646 bool finishedSending, sendWaiting, rDoNothing, fIsDict; 647 uint32_t recvWaiting, recvExited; 648 649 std::vector<struct BRM::EMEntry> extents; 650 uint32_t extentSize, divShift, ridsPerBlock, rpbShift, numExtents; 651 // config::Config *fConfig; 652 653 uint32_t fScanLbidReqLimit; // max number of LBIDs to send in a scan 654 // request to primproc 655 uint32_t fScanLbidReqThreshold; // min level of scan LBID backlog before 656 // consumer will tell producer to send 657 // more LBID scan requests to primproc 658 659 bool fStopSending; 660 bool fSingleThread; 661 bool isFilterFeeder; 662 uint64_t fPhysicalIO; // total physical I/O count 663 uint64_t fCacheIO; // total cache I/O count 664 uint64_t fNumBlksSkipped;//total number of block scans skipped due to CP 665 uint64_t fMsgBytesIn; // total byte count for incoming messages 666 uint64_t fMsgBytesOut; // total byte count for outcoming messages 667 uint32_t fMsgsToPm; // total number of messages sent to PMs 668 uint32_t uniqueID; 669 std::string fUdfName; 670 671 //@bug 2634 672 //@bug 3128 change ParseTree* to vector<Filter*> 673 std::vector<const execplan::Filter*> fFilters; 674 675 friend class ColumnCommandJL; 676 friend class BatchPrimitiveProcessorJL; 677 friend class BucketReuseStep; 678 friend class BatchPrimitiveStep; 679 friend class TupleBPS; 680 }; 681 682 683 #if 0 684 class pIdxStep : public JobStep 685 { 686 public: 687 /** @brief pIdxStep constructor 688 * @param in the inputAssociation pointer 689 * @param out the outputAssociation pointer 690 * @param ec the DistributedEngineComm pointer 691 */ 692 pIdxStep(JobStepAssociation* in, JobStepAssociation* out, DistributedEngineComm* ec); 693 /** @brief virtual void Run method 694 */ 695 virtual void run(); 696 private: 697 pIdxStep(); 698 void startPrimitveThread(); 699 void startAggregationThread(); 700 701 protected: 702 DistributedEngineComm* fDec; 703 JobStepAssociation* fInputJobStepAssociation; 704 JobStepAssociation* fOutputJobStepAssociation; 705 }; 706 #endif 707 708 /** @brief class pDictionaryStep 709 * 710 */ 711 class pDictionaryStep : public JobStep, public PrimitiveMsg 712 { 713 714 public: 715 /** @brief pDictionaryStep constructor 716 */ 717 718 pDictionaryStep( 719 execplan::CalpontSystemCatalog::OID oid, 720 execplan::CalpontSystemCatalog::OID tabelOid, 721 const execplan::CalpontSystemCatalog::ColType& ct, 722 const JobInfo& jobInfo); 723 724 virtual ~pDictionaryStep(); 725 726 /** @brief virtual void Run method 727 */ 728 virtual void run(); 729 virtual void join(); 730 //void setOutList(StringDataList* rids); 731 void setInputList(DataList_t* rids); 732 void setBOP(int8_t b); 733 void sendPrimitiveMessages(); 734 void receivePrimitiveMessages(); 735 736 virtual const std::string toString() const; 737 colType()738 execplan::CalpontSystemCatalog::ColType& colType() 739 { 740 return fColType; 741 } colType()742 execplan::CalpontSystemCatalog::ColType colType() const 743 { 744 return fColType; 745 } 746 oid()747 virtual execplan::CalpontSystemCatalog::OID oid() const 748 { 749 return fOid; 750 } tableOid()751 virtual execplan::CalpontSystemCatalog::OID tableOid() const 752 { 753 return fTableOid; 754 } phyIOCount()755 virtual uint64_t phyIOCount () const 756 { 757 return fPhysicalIO; 758 } cacheIOCount()759 virtual uint64_t cacheIOCount () const 760 { 761 return fCacheIO; 762 } msgsRcvdCount()763 virtual uint64_t msgsRcvdCount () const 764 { 765 return msgsRecvd; 766 } msgBytesIn()767 virtual uint64_t msgBytesIn () const 768 { 769 return fMsgBytesIn; 770 } msgBytesOut()771 virtual uint64_t msgBytesOut () const 772 { 773 return fMsgBytesOut; 774 } 775 void addFilter(int8_t COP, const std::string& value); filterCount()776 uint32_t filterCount() const 777 { 778 return fFilterCount; 779 } filterString()780 messageqcpp::ByteStream filterString() const 781 { 782 return fFilterString; 783 } 784 785 // @bug3321, add filters into pDictionary 786 void appendFilter(const messageqcpp::ByteStream& filter, unsigned count); 787 void addFilter(const execplan::Filter* f); 788 void appendFilter(const std::vector<const execplan::Filter*>& fs); getFilters()789 std::vector<const execplan::Filter*>& getFilters() 790 { 791 return fFilters; 792 } BOP()793 int8_t BOP() const 794 { 795 return fBOP; 796 } 797 798 private: 799 pDictionaryStep(); 800 void startPrimitiveThread(); 801 void startAggregationThread(); 802 803 boost::shared_ptr<execplan::CalpontSystemCatalog> sysCat; 804 execplan::CalpontSystemCatalog::OID fOid; 805 execplan::CalpontSystemCatalog::OID fTableOid; 806 uint32_t fBOP; 807 uint32_t msgsSent; 808 uint32_t msgsRecvd; 809 uint32_t finishedSending; 810 uint32_t recvWaiting; 811 int64_t ridCount; 812 execplan::CalpontSystemCatalog::ColType fColType; 813 uint64_t pThread; //producer thread 814 uint64_t cThread; //producer thread 815 816 messageqcpp::ByteStream fFilterString; 817 uint32_t fFilterCount; 818 819 DataList_t* requestList; 820 //StringDataList* stringList; 821 boost::mutex mutex; 822 boost::condition condvar; 823 uint32_t fInterval; 824 uint64_t fPhysicalIO; // total physical I/O count 825 uint64_t fCacheIO; // total cache I/O count 826 uint64_t fMsgBytesIn; // total byte count for incoming messages 827 uint64_t fMsgBytesOut; // total byte count for outcoming messages 828 uint32_t uniqueID; 829 ResourceManager* fRm; 830 831 //@bug 3128 change ParseTree* to vector<Filter*> 832 std::vector<const execplan::Filter*> fFilters; 833 834 bool hasEqualityFilter; 835 int8_t tmpCOP; 836 std::vector<std::string> eqFilter; 837 838 friend class DictStepJL; 839 friend class RTSCommandJL; 840 friend class BucketReuseStep; 841 friend class BatchPrimitiveStep; 842 friend class TupleBPS; 843 }; 844 845 /** @brief class pDictionaryScan 846 * 847 */ 848 class pDictionaryScan : public JobStep, public PrimitiveMsg 849 { 850 public: 851 852 /** @brief pDictionaryScan constructor 853 */ 854 855 pDictionaryScan( 856 execplan::CalpontSystemCatalog::OID oid, 857 execplan::CalpontSystemCatalog::OID tableOid, 858 const execplan::CalpontSystemCatalog::ColType& ct, 859 const JobInfo& jobInfo); 860 861 ~pDictionaryScan(); 862 863 /** @brief virtual void Run method 864 */ 865 virtual void run(); 866 virtual void join(); 867 void setInputList(DataList_t* rids); 868 void setBOP(int8_t b); 869 void sendPrimitiveMessages(); 870 void receivePrimitiveMessages(); 871 void setSingleThread(); 872 virtual const std::string toString() const; 873 874 void setRidList(DataList<ElementType>* rids); 875 876 /** @brief Add a filter. Use this interface when the column stores anything but 4-byte floats. 877 * 878 * Add a filter. Use this interface when the column stores anything but 4-byte floats. 879 */ 880 void addFilter(int8_t COP, const std::string& value); // all but FLOATS can use this interface 881 882 /** @brief Set the DistributedEngineComm object this instance should use 883 * 884 * Set the DistributedEngineComm object this instance should use 885 */ dec(DistributedEngineComm * dec)886 void dec(DistributedEngineComm* dec) 887 { 888 if (fDec) fDec->removeQueue(uniqueID); 889 890 fDec = dec; 891 892 if (fDec) fDec->addQueue(uniqueID); 893 } 894 oid()895 virtual execplan::CalpontSystemCatalog::OID oid() const 896 { 897 return fOid; 898 } tableOid()899 virtual execplan::CalpontSystemCatalog::OID tableOid() const 900 { 901 return fTableOid; 902 } 903 phyIOCount()904 uint64_t phyIOCount () const 905 { 906 return fPhysicalIO; 907 } cacheIOCount()908 uint64_t cacheIOCount () const 909 { 910 return fCacheIO; 911 } msgsRcvdCount()912 uint64_t msgsRcvdCount () const 913 { 914 return msgsRecvd; 915 } msgBytesIn()916 uint64_t msgBytesIn () const 917 { 918 return fMsgBytesIn; 919 } msgBytesOut()920 uint64_t msgBytesOut () const 921 { 922 return fMsgBytesOut; 923 } 924 getOutputType()925 BPSOutputType getOutputType() const 926 { 927 return fOutType; 928 } setOutputType(BPSOutputType ot)929 void setOutputType(BPSOutputType ot) 930 { 931 fOutType = ot; 932 } setOutputRowGroup(const rowgroup::RowGroup & rg)933 void setOutputRowGroup(const rowgroup::RowGroup& rg) 934 { 935 fOutputRowGroup = rg; 936 } getOutputRowGroup()937 const rowgroup::RowGroup& getOutputRowGroup() const 938 { 939 return fOutputRowGroup; 940 } 941 942 // @bug3321, add interface for combining filters. BOP()943 int8_t BOP() const 944 { 945 return fBOP; 946 } 947 void addFilter(const execplan::Filter* f); 948 void appendFilter(const std::vector<const execplan::Filter*>& fs); getFilters()949 std::vector<const execplan::Filter*>& getFilters() 950 { 951 return fFilters; 952 } filterString()953 messageqcpp::ByteStream filterString() const 954 { 955 return fFilterString; 956 } filterCount()957 uint32_t filterCount() const 958 { 959 return fFilterCount; 960 } 961 void appendFilter(const messageqcpp::ByteStream& filter, unsigned count); 962 963 virtual void abort(); 964 colType()965 const execplan::CalpontSystemCatalog::ColType& colType() const 966 { 967 return fColType; 968 } 969 970 protected: 971 void sendError(uint16_t error); 972 973 private: 974 pDictionaryScan(); 975 void startPrimitiveThread(); 976 void startAggregationThread(); 977 void initializeConfigParms(); 978 void sendAPrimitiveMessage( 979 messageqcpp::ByteStream& primMsg, 980 BRM::LBID_t msgLbidStart, 981 uint32_t msgLbidCount, uint16_t dbroot); 982 void formatMiniStats(); 983 984 DistributedEngineComm* fDec; 985 boost::shared_ptr<execplan::CalpontSystemCatalog> sysCat; 986 execplan::CalpontSystemCatalog::OID fOid; 987 execplan::CalpontSystemCatalog::OID fTableOid; 988 uint32_t fFilterCount; 989 uint32_t fBOP; 990 uint32_t fCOP1; 991 uint32_t fCOP2; 992 uint32_t msgsSent; 993 uint32_t msgsRecvd; 994 uint32_t finishedSending; 995 uint32_t recvWaiting; 996 uint32_t sendWaiting; 997 int64_t ridCount; 998 uint32_t fLogicalBlocksPerScan; 999 DataList<ElementType>* ridList; 1000 messageqcpp::ByteStream fFilterString; 1001 execplan::CalpontSystemCatalog::ColType fColType; 1002 uint64_t pThread; //producer thread. thread pool handle 1003 uint64_t cThread; //consumer thread. thread pool handle 1004 DataList_t* requestList; 1005 //StringDataList* stringList; 1006 boost::mutex mutex; 1007 boost::condition condvar; 1008 boost::condition condvarWakeupProducer; 1009 BRM::LBIDRange_v fDictlbids; 1010 std::vector<struct BRM::EMEntry> extents; 1011 uint64_t extentSize; 1012 uint64_t divShift; 1013 uint64_t numExtents; 1014 uint32_t fScanLbidReqLimit; // max number of LBIDs to send in a scan 1015 // request to primproc 1016 uint32_t fScanLbidReqThreshold; // min level of scan LBID backlog before 1017 // consumer will tell producer to send 1018 bool fStopSending; 1019 bool fSingleThread; 1020 uint64_t fPhysicalIO; // total physical I/O count 1021 uint64_t fCacheIO; // total cache I/O count 1022 uint64_t fMsgBytesIn; // total byte count for incoming messages 1023 uint64_t fMsgBytesOut; // total byte count for outcoming messages 1024 uint32_t fMsgsToPm; // total number of messages sent to PMs 1025 uint32_t fMsgsExpect; // total blocks to scan 1026 uint32_t uniqueID; 1027 ResourceManager* fRm; 1028 BPSOutputType fOutType; 1029 rowgroup::RowGroup fOutputRowGroup; 1030 uint64_t fRidResults; 1031 1032 //@bug 2634 1033 //@bug 3128 change ParseTree* to vector<Filter*> 1034 std::vector<const execplan::Filter*> fFilters; 1035 1036 bool isEquality; 1037 std::vector<std::string> equalityFilter; 1038 void serializeEqualityFilter(); 1039 void destroyEqualityFilter(); 1040 }; 1041 1042 1043 class BatchPrimitive : public JobStep, public PrimitiveMsg, public DECEventListener 1044 { 1045 public: 1046 BatchPrimitive(const JobInfo & jobInfo)1047 BatchPrimitive(const JobInfo& jobInfo) : JobStep(jobInfo) {} 1048 virtual bool getFeederFlag() const = 0; 1049 virtual uint64_t getLastTupleId() const = 0; 1050 virtual uint32_t getStepCount () const = 0; 1051 virtual void setBPP(JobStep* jobStep) = 0; 1052 virtual void setFirstStepType(PrimitiveStepType firstStepType) = 0; 1053 virtual void setIsProjectionOnly() = 0; 1054 virtual void setLastTupleId(uint64_t) = 0; 1055 virtual void setOutputType(BPSOutputType outputType) = 0; 1056 virtual void setProjectBPP(JobStep* jobStep1, JobStep* jobStep2) = 0; 1057 virtual void setStepCount() = 0; 1058 virtual void setSwallowRows(const bool swallowRows) = 0; 1059 virtual void setBppStep() = 0; 1060 virtual void dec(DistributedEngineComm* dec) = 0; 1061 virtual const OIDVector& getProjectOids() const = 0; 1062 virtual uint64_t blksSkipped() const = 0; 1063 virtual bool wasStepRun() const = 0; 1064 virtual BPSOutputType getOutputType() const = 0; 1065 virtual uint64_t getRows() const = 0; 1066 virtual void setJobInfo(const JobInfo* jobInfo) = 0; 1067 virtual void setOutputRowGroup(const rowgroup::RowGroup& rg) = 0; 1068 virtual const rowgroup::RowGroup& getOutputRowGroup() const = 0; 1069 virtual void addFcnJoinExp(const std::vector<execplan::SRCP>& fe) = 0; 1070 virtual void addFcnExpGroup1(const boost::shared_ptr<execplan::ParseTree>& fe) = 0; 1071 virtual void setFE1Input(const rowgroup::RowGroup& feInput) = 0; 1072 virtual void setFcnExpGroup3(const std::vector<execplan::SRCP>& fe) = 0; 1073 virtual void setFE23Output(const rowgroup::RowGroup& rg) = 0; 1074 }; 1075 1076 1077 /** @brief class TupleBPS 1078 * 1079 */ 1080 class TupleBPS : public BatchPrimitive, public TupleDeliveryStep 1081 { 1082 public: 1083 TupleBPS(const pColStep& rhs, const JobInfo& jobInfo); 1084 TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo); 1085 TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo); 1086 TupleBPS(const pDictionaryScan& rhs, const JobInfo& jobInfo); 1087 TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo); 1088 virtual ~TupleBPS(); 1089 1090 /** @brief Starts processing. 1091 * 1092 * Starts processing. 1093 */ 1094 virtual void run(); 1095 /** @brief Sync's the caller with the end of execution. 1096 * 1097 * Does nothing. Returns when this instance is finished. 1098 */ 1099 virtual void join(); 1100 1101 void abort(); 1102 void abort_nolock(); 1103 1104 /** @brief The main loop for the send-side thread 1105 * 1106 * The main loop for the primitive-issuing thread. Don't call it directly. 1107 */ 1108 void sendPrimitiveMessages(); 1109 1110 /** @brief The main loop for the recv-side thread 1111 * 1112 * The main loop for the receive-side thread. Don't call it directly. 1113 */ 1114 void receiveMultiPrimitiveMessages(uint32_t threadID); 1115 1116 /** @brief Add a filter when the column is anything but a 4-byte float type. 1117 * 1118 * Add a filter when the column is anything but a 4-byte float type, including 1119 * 8-byte doubles. 1120 */ 1121 void setBPP(JobStep* jobStep); 1122 void setProjectBPP(JobStep* jobStep1, JobStep* jobStep2); 1123 bool scanit(uint64_t rid); 1124 void storeCasualPartitionInfo(const bool estimateRowCounts); getFeederFlag()1125 bool getFeederFlag() const 1126 { 1127 return isFilterFeeder; 1128 } setFeederFlag(bool filterFeeder)1129 void setFeederFlag (bool filterFeeder) 1130 { 1131 isFilterFeeder = filterFeeder; 1132 } setSwallowRows(const bool swallowRows)1133 void setSwallowRows(const bool swallowRows) 1134 { 1135 fSwallowRows = swallowRows; 1136 } getSwallowRows()1137 bool getSwallowRows() const 1138 { 1139 return fSwallowRows; 1140 } 1141 1142 /* Base class interface fcn that can go away */ setOutputType(BPSOutputType)1143 void setOutputType(BPSOutputType) { } //Can't change the ot of a TupleBPS getOutputType()1144 BPSOutputType getOutputType() const 1145 { 1146 return ROW_GROUP; 1147 } setBppStep()1148 void setBppStep() { } setIsProjectionOnly()1149 void setIsProjectionOnly() { } 1150 getRows()1151 uint64_t getRows() const 1152 { 1153 return ridsReturned; 1154 } setFirstStepType(PrimitiveStepType firstStepType)1155 void setFirstStepType(PrimitiveStepType firstStepType) 1156 { 1157 ffirstStepType = firstStepType; 1158 } getPrimitiveStepType()1159 PrimitiveStepType getPrimitiveStepType () 1160 { 1161 return ffirstStepType; 1162 } setStepCount()1163 void setStepCount() 1164 { 1165 fStepCount++; 1166 } getStepCount()1167 uint32_t getStepCount () const 1168 { 1169 return fStepCount; 1170 } setLastTupleId(uint64_t id)1171 void setLastTupleId(uint64_t id) 1172 { 1173 fLastTupleId = id; 1174 } getLastTupleId()1175 uint64_t getLastTupleId() const 1176 { 1177 return fLastTupleId; 1178 } 1179 1180 /** @brief Set the DistributedEngineComm object this instance should use 1181 * 1182 * Set the DistributedEngineComm object this instance should use 1183 */ 1184 void dec(DistributedEngineComm* dec); 1185 1186 virtual void stepId(uint16_t stepId); stepId()1187 virtual uint16_t stepId() const 1188 { 1189 return fStepId; 1190 } 1191 virtual const std::string toString() const; 1192 oid()1193 virtual execplan::CalpontSystemCatalog::OID oid() const 1194 { 1195 return fOid; 1196 } tableOid()1197 virtual execplan::CalpontSystemCatalog::OID tableOid() const 1198 { 1199 return fTableOid; 1200 } colType()1201 const execplan::CalpontSystemCatalog::ColType& colType() const 1202 { 1203 return fColType; 1204 } getProjectOids()1205 const OIDVector& getProjectOids() const 1206 { 1207 return projectOids; 1208 } phyIOCount()1209 virtual uint64_t phyIOCount () const 1210 { 1211 return fPhysicalIO; 1212 } cacheIOCount()1213 virtual uint64_t cacheIOCount () const 1214 { 1215 return fCacheIO; 1216 } msgsRcvdCount()1217 virtual uint64_t msgsRcvdCount () const 1218 { 1219 return msgsRecvd; 1220 } msgBytesIn()1221 virtual uint64_t msgBytesIn () const 1222 { 1223 return fMsgBytesIn; 1224 } msgBytesOut()1225 virtual uint64_t msgBytesOut () const 1226 { 1227 return fMsgBytesOut; 1228 } blockTouched()1229 virtual uint64_t blockTouched () const 1230 { 1231 return fBlockTouched; 1232 } 1233 uint32_t nextBand(messageqcpp::ByteStream& bs); 1234 1235 //...Currently only supported by pColStep and pColScanStep, so didn't bother 1236 //...to define abstract method in base class, but if start adding to other 1237 //...classes, then should consider adding pure virtual method to JobStep. blksSkipped()1238 uint64_t blksSkipped () const 1239 { 1240 return fNumBlksSkipped; 1241 } 1242 getUniqueID()1243 uint32_t getUniqueID() 1244 { 1245 return uniqueID; 1246 } 1247 void useJoiner(boost::shared_ptr<joiner::TupleJoiner>); 1248 void useJoiners(const std::vector<boost::shared_ptr<joiner::TupleJoiner> >&); wasStepRun()1249 bool wasStepRun() const 1250 { 1251 return fRunExecuted; 1252 } 1253 1254 // DEC event listener interface 1255 void newPMOnline(uint32_t connectionNumber); 1256 1257 void setInputRowGroup(const rowgroup::RowGroup& rg); 1258 void setOutputRowGroup(const rowgroup::RowGroup& rg); 1259 const rowgroup::RowGroup& getOutputRowGroup() const; 1260 1261 void setAggregateStep(const rowgroup::SP_ROWAGG_PM_t& agg, const rowgroup::RowGroup& rg); 1262 1263 /* This is called by TupleHashJoin only */ 1264 void setJoinedResultRG(const rowgroup::RowGroup& rg); 1265 1266 /* OR hacks */ 1267 void setBOP(uint8_t op); // BOP_AND or BOP_OR getBOP()1268 uint8_t getBOP() 1269 { 1270 return bop; 1271 } 1272 1273 void setJobInfo(const JobInfo* jobInfo); 1274 1275 // @bug 2123. Added getEstimatedRowCount function. 1276 /* @brief estimates the number of rows that will be returned for use in determining the 1277 * large side table for hashjoins. 1278 */ 1279 uint64_t getEstimatedRowCount(); 1280 1281 /* Functions & Expressions support. 1282 Group 1 is for single-table filters only at the moment. Group 1 objects 1283 are registered by JLF on the TBPS object directly because there is no join 1284 involved. 1285 1286 Group 2 is for cross-table filters only and should be registered on the 1287 join instance by the JLF. When the query starts running, the join object 1288 decides whether the Group 2 instance should run on the PM and UM, then 1289 registers it with the TBPS. 1290 1291 Group 3 is for selected columns whether or not its calculation is single- 1292 table or cross-table. If it's single-table and there's no join instance, 1293 JLF should register that object with the TBPS for that table. If it's 1294 cross-table, then JLF should register it with the join step. 1295 */ 1296 void addFcnJoinExp(const std::vector<execplan::SRCP>& fe); 1297 void addFcnExpGroup1(const boost::shared_ptr<execplan::ParseTree>& fe); 1298 void setFE1Input(const rowgroup::RowGroup& feInput); 1299 1300 /* for use by the THJS only... */ 1301 void setFcnExpGroup2(const boost::shared_ptr<funcexp::FuncExpWrapper>& fe2, 1302 const rowgroup::RowGroup& output, bool runFE2onPM); 1303 1304 /* Functions & Expressions in select and groupby clause. 1305 JLF should use these only if there isn't a join. If there is, call the 1306 equivalent fcn on THJS instead */ 1307 void setFcnExpGroup3(const std::vector<execplan::SRCP>& fe); 1308 void setFE23Output(const rowgroup::RowGroup& rg); hasFcnExpGroup3()1309 bool hasFcnExpGroup3() 1310 { 1311 return (fe2 != NULL); 1312 } 1313 1314 // rowgroup to connector 1315 const rowgroup::RowGroup& getDeliveredRowGroup() const; 1316 void deliverStringTableRowGroup(bool b); 1317 bool deliverStringTableRowGroup() const; 1318 1319 /* Interface for adding add'l predicates for casual partitioning. 1320 * This fcn checks for any intersection between the values in vals 1321 * and the range of a given extent. If there is no intersection, that extent 1322 * won't be processed. For every extent in OID, it effectively calculates 1323 * ((vals[0] >= min && vals[0] <= max) || (vals[1] >= min && vals[1] <= max)) ... 1324 * && (previous calculation for that extent). 1325 * Note that it is an adder not a setter. For an extent to be scanned, all calls 1326 * must have a non-empty intersection. 1327 */ 1328 void addCPPredicates(uint32_t OID, const std::vector<int64_t>& vals, bool isRange); 1329 1330 /* semijoin adds */ 1331 void setJoinFERG(const rowgroup::RowGroup& rg); 1332 1333 /* To cover over the race between creating extents in each column. Mitigates 1334 * bug 3607.*/ 1335 bool goodExtentCount(); 1336 void reloadExtentLists(); 1337 void initExtentMarkers(); // need a better name for this 1338 stringTableFriendly()1339 virtual bool stringTableFriendly() 1340 { 1341 return true; 1342 } 1343 1344 protected: 1345 void sendError(uint16_t status); 1346 1347 private: 1348 void formatMiniStats(); 1349 1350 void startPrimitiveThread(); 1351 void startAggregationThread(); 1352 void initializeConfigParms(); 1353 uint64_t getFBO(uint64_t lbid); 1354 void checkDupOutputColumns(const rowgroup::RowGroup& rg); 1355 void dupOutputColumns(rowgroup::RowGroup&); 1356 void dupOutputColumns(rowgroup::RGData&, rowgroup::RowGroup&); 1357 void rgDataToDl(rowgroup::RGData&, rowgroup::RowGroup&, RowGroupDL*); 1358 void rgDataVecToDl(std::vector<rowgroup::RGData>&, rowgroup::RowGroup&, RowGroupDL*); 1359 1360 DistributedEngineComm* fDec; 1361 boost::shared_ptr<BatchPrimitiveProcessorJL> fBPP; 1362 uint16_t fNumSteps; 1363 int fColWidth; 1364 uint32_t fStepCount; 1365 bool fCPEvaluated; // @bug 2123 1366 uint64_t fEstimatedRows; // @bug 2123 1367 /// number of threads on the receive side 1368 uint32_t fMaxNumThreads; 1369 uint32_t fNumThreads; 1370 PrimitiveStepType ffirstStepType; 1371 bool isFilterFeeder; 1372 std::vector<uint64_t> fProducerThreads; // thread pool handles 1373 messageqcpp::ByteStream fFilterString; 1374 uint32_t fFilterCount; 1375 execplan::CalpontSystemCatalog::ColType fColType; 1376 execplan::CalpontSystemCatalog::OID fOid; 1377 execplan::CalpontSystemCatalog::OID fTableOid; 1378 uint64_t fLastTupleId; 1379 BRM::LBIDRange_v lbidRanges; 1380 std::vector<int32_t> lastExtent; 1381 std::vector<BRM::LBID_t> lastScannedLBID; 1382 BRM::DBRM dbrm; 1383 SP_LBIDList lbidList; 1384 uint64_t ridsRequested; 1385 uint64_t totalMsgs; 1386 volatile uint64_t msgsSent; 1387 volatile uint64_t msgsRecvd; 1388 volatile bool finishedSending; 1389 bool firstRead; 1390 bool sendWaiting; 1391 uint32_t recvWaiting; 1392 uint32_t recvExited; 1393 uint64_t ridsReturned; 1394 std::map<execplan::CalpontSystemCatalog::OID, std::tr1::unordered_map<int64_t, struct BRM::EMEntry> > extentsMap; 1395 std::vector<BRM::EMEntry> scannedExtents; 1396 OIDVector projectOids; 1397 uint32_t extentSize, divShift, rpbShift, numExtents, modMask; 1398 uint32_t fRequestSize; // the number of logical extents per batch of requests sent to PrimProc. 1399 uint32_t fProcessorThreadsPerScan; // The number of messages sent per logical extent. 1400 bool fSwallowRows; 1401 uint32_t fMaxOutstandingRequests; // The number of logical extents have not processed by PrimProc 1402 uint64_t fPhysicalIO; // total physical I/O count 1403 uint64_t fCacheIO; // total cache I/O count 1404 uint64_t fNumBlksSkipped;//total number of block scans skipped due to CP 1405 uint64_t fMsgBytesIn; // total byte count for incoming messages 1406 uint64_t fMsgBytesOut; // total byte count for outcoming messages 1407 uint64_t fBlockTouched; // total blocks touched 1408 uint32_t fExtentsPerSegFile;//config num of Extents Per Segment File 1409 // uint64_t cThread; //consumer thread. thread handle from thread pool 1410 uint64_t pThread; //producer thread. thread handle from thread pool 1411 boost::mutex tplMutex; 1412 boost::mutex dlMutex; 1413 boost::mutex cpMutex; 1414 boost::mutex serializeJoinerMutex; 1415 boost::condition condvarWakeupProducer, condvar; 1416 1417 std::vector<bool> scanFlags; // use to keep track of which extents to eliminate from this step 1418 bool BPPIsAllocated; 1419 uint32_t uniqueID; 1420 ResourceManager* fRm; 1421 1422 /* HashJoin support */ 1423 1424 void serializeJoiner(); 1425 void serializeJoiner(uint32_t connectionNumber); 1426 1427 void generateJoinResultSet(const std::vector<std::vector<rowgroup::Row::Pointer> >& joinerOutput, 1428 rowgroup::Row& baseRow, const std::vector<boost::shared_array<int> >& mappings, 1429 const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, 1430 std::vector<rowgroup::RGData>* outputData, 1431 const boost::scoped_array<rowgroup::Row>& smallRows, rowgroup::Row& joinedRow); 1432 1433 std::vector<boost::shared_ptr<joiner::TupleJoiner> > tjoiners; 1434 bool doJoin, hasPMJoin, hasUMJoin; 1435 std::vector<rowgroup::RowGroup> joinerMatchesRGs; // parses the small-side matches from joiner 1436 1437 uint32_t smallSideCount; 1438 int smallOuterJoiner; 1439 1440 bool fRunExecuted; // was the run method executed for this step 1441 rowgroup::RowGroup inputRowGroup; // for parsing the data read from the datalist 1442 rowgroup::RowGroup primRowGroup; // for parsing the data received from the PM 1443 rowgroup::RowGroup outputRowGroup; // if there's a join, these are the joined 1444 // result, otherwise it's = to primRowGroup 1445 // aggregation support 1446 rowgroup::SP_ROWAGG_PM_t fAggregatorPm; 1447 rowgroup::RowGroup fAggRowGroupPm; 1448 1449 // OR hacks 1450 uint8_t bop; // BOP_AND or BOP_OR 1451 1452 // temporary hack to make sure JobList only calls run and join once 1453 boost::mutex jlLock; 1454 bool runRan; 1455 bool joinRan; 1456 1457 // bug 1965, trace duplicat columns in delivery list <dest, src> 1458 std::vector<std::pair<uint32_t, uint32_t> > dupColumns; 1459 1460 /* Functions & Expressions vars */ 1461 boost::shared_ptr<funcexp::FuncExpWrapper> fe1, fe2; 1462 rowgroup::RowGroup fe1Input, fe2Output; 1463 boost::shared_array<int> fe2Mapping; 1464 bool runFEonPM; 1465 1466 /* for UM F & E 2 processing */ 1467 rowgroup::RGData fe2Data; 1468 rowgroup::Row fe2InRow, fe2OutRow; 1469 1470 void processFE2(rowgroup::RowGroup& input, rowgroup::RowGroup& output, 1471 rowgroup::Row& inRow, rowgroup::Row& outRow, 1472 std::vector<rowgroup::RGData>* rgData, 1473 funcexp::FuncExpWrapper* localFE2); 1474 void processFE2_oneRG(rowgroup::RowGroup& input, rowgroup::RowGroup& output, 1475 rowgroup::Row& inRow, rowgroup::Row& outRow, 1476 funcexp::FuncExpWrapper* localFE2); 1477 1478 /* Runtime Casual Partitioning adjustments. The CP code is needlessly complicated; 1479 * to avoid making it worse, decided to designate 'scanFlags' as the static 1480 * component and this new array as the runtime component. The final CP decision 1481 * is scanFlags & runtimeCP. 1482 */ 1483 std::vector<bool> runtimeCPFlags; 1484 1485 /* semijoin vars */ 1486 rowgroup::RowGroup joinFERG; 1487 1488 boost::shared_ptr<RowGroupDL> deliveryDL; 1489 uint32_t deliveryIt; 1490 1491 /* shared nothing support */ 1492 struct Job 1493 { JobJob1494 Job(uint32_t d, uint32_t n, uint32_t b, boost::shared_ptr<messageqcpp::ByteStream>& bs) : 1495 dbroot(d), connectionNum(n), expectedResponses(b), msg(bs) { } 1496 uint32_t dbroot; 1497 uint32_t connectionNum; 1498 uint32_t expectedResponses; 1499 boost::shared_ptr<messageqcpp::ByteStream> msg; 1500 }; 1501 1502 void prepCasualPartitioning(); 1503 void makeJobs(std::vector<Job>* jobs); 1504 void interleaveJobs(std::vector<Job>* jobs) const; 1505 void sendJobs(const std::vector<Job>& jobs); 1506 uint32_t numDBRoots; 1507 1508 /* Pseudo column filter processing. Think about refactoring into a separate class. */ 1509 bool processPseudoColFilters(uint32_t extentIndex, boost::shared_ptr<std::map<int, int> > dbRootPMMap) const; 1510 bool processOneFilterType(int8_t colWidth, int64_t value, uint32_t type) const; 1511 bool processSingleFilterString(int8_t BOP, int8_t colWidth, int64_t val, const uint8_t* filterString, 1512 uint32_t filterCount) const; 1513 bool processSingleFilterString_ranged(int8_t BOP, int8_t colWidth, int64_t min, int64_t max, 1514 const uint8_t* filterString, uint32_t filterCount) const; 1515 bool processLBIDFilter(const BRM::EMEntry& emEntry) const; 1516 bool compareSingleValue(uint8_t COP, int64_t val1, int64_t val2) const; 1517 bool compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const; 1518 bool hasPCFilter, hasPMFilter, hasRIDFilter, hasSegmentFilter, hasDBRootFilter, hasSegmentDirFilter, 1519 hasPartitionFilter, hasMaxFilter, hasMinFilter, hasLBIDFilter, hasExtentIDFilter; 1520 1521 }; 1522 1523 /** @brief class FilterStep 1524 * 1525 */ 1526 class FilterStep : public JobStep 1527 { 1528 public: 1529 1530 FilterStep(const execplan::CalpontSystemCatalog::ColType& colType, const JobInfo& jobInfo); 1531 ~FilterStep(); 1532 1533 /** @brief virtual void Run method 1534 */ 1535 void run(); 1536 void join(); 1537 1538 const std::string toString() const; 1539 tableOid()1540 execplan::CalpontSystemCatalog::OID tableOid() const 1541 { 1542 return fTableOID; 1543 } tableOid(execplan::CalpontSystemCatalog::OID tableOid)1544 void tableOid(execplan::CalpontSystemCatalog::OID tableOid) 1545 { 1546 fTableOID = tableOid; 1547 } colType()1548 const execplan::CalpontSystemCatalog::ColType& colType() const 1549 { 1550 return fColType; 1551 } 1552 void setBOP(int8_t b); BOP()1553 int8_t BOP() const 1554 { 1555 return fBOP; 1556 } 1557 friend struct FSRunner; 1558 1559 void addFilter(const execplan::Filter* f); getFilters()1560 std::vector<const execplan::Filter*>& getFilters() 1561 { 1562 return fFilters; 1563 } 1564 1565 protected: 1566 // void unblockDataLists(FifoDataList* fifo, StringFifoDataList* strFifo, StrDataList* strResult, DataList_t* result); 1567 1568 private: 1569 1570 //This i/f is not meaningful in this step oid()1571 execplan::CalpontSystemCatalog::OID oid() const 1572 { 1573 return 0; 1574 } 1575 void doFilter(); // @bug 686 1576 1577 // config::Config *fConfig; 1578 1579 execplan::CalpontSystemCatalog::OID fTableOID; 1580 execplan::CalpontSystemCatalog::ColType fColType; 1581 int8_t fBOP; 1582 // int64_t runner; // thread handle from thread pool 1583 1584 // @bug 687 Add data and friend declarations for concurrent filter steps. 1585 std::vector<ElementType> fSortedA; // used to internally sort input data 1586 std::vector<ElementType> fSortedB; 1587 // FifoDataList* fFAp; // Used to internally pass data to 1588 // FifoDataList* fFBp; // FilterOperation thread. 1589 uint64_t resultCount; 1590 1591 std::vector<const execplan::Filter*> fFilters; 1592 }; 1593 1594 /** @brief class PassThruStep 1595 * 1596 */ 1597 class PassThruStep : public JobStep, public PrimitiveMsg 1598 { 1599 1600 typedef std::pair<int64_t, int64_t> element_t; 1601 1602 public: 1603 /** @brief PassThruStep constructor 1604 */ 1605 PassThruStep( 1606 execplan::CalpontSystemCatalog::OID oid, 1607 execplan::CalpontSystemCatalog::OID tableOid, 1608 const execplan::CalpontSystemCatalog::ColType& colType, 1609 const JobInfo& jobInfo); 1610 1611 PassThruStep(const pColStep& rhs); 1612 PassThruStep(const PseudoColStep& rhs); 1613 1614 virtual ~PassThruStep(); 1615 1616 /** @brief Starts processing. Set at least the RID list before calling. 1617 * 1618 * Starts processing. Set at least the RID list before calling this. 1619 */ 1620 virtual void run(); 1621 1622 /** @brief Sync's the caller with the end of execution. 1623 * 1624 * Does nothing. Returns when this instance is finished. 1625 */ 1626 virtual void join(); 1627 1628 virtual const std::string toString() const; 1629 oid()1630 virtual execplan::CalpontSystemCatalog::OID oid() const 1631 { 1632 return fOid; 1633 } 1634 tableOid()1635 virtual execplan::CalpontSystemCatalog::OID tableOid() const 1636 { 1637 return fTableOid; 1638 } 1639 getColWidth()1640 uint8_t getColWidth() const 1641 { 1642 return colWidth; 1643 } isDictCol()1644 bool isDictCol() const 1645 { 1646 return isDictColumn; 1647 } isExeMgr()1648 bool isExeMgr() const 1649 { 1650 return isEM; 1651 } colType()1652 const execplan::CalpontSystemCatalog::ColType& colType() const 1653 { 1654 return fColType; 1655 } resourceManager()1656 ResourceManager* resourceManager() const 1657 { 1658 return fRm; 1659 } 1660 pseudoType(uint32_t p)1661 void pseudoType(uint32_t p) 1662 { 1663 fPseudoType = p; 1664 } pseudoType()1665 uint32_t pseudoType() const 1666 { 1667 return fPseudoType; 1668 } 1669 1670 protected: 1671 1672 private: 1673 1674 /** @brief constructor for completeness 1675 */ 1676 explicit PassThruStep(); 1677 1678 uint64_t getLBID(uint64_t rid, bool& scan); 1679 uint64_t getFBO(uint64_t lbid); 1680 1681 boost::shared_ptr<execplan::CalpontSystemCatalog> catalog; 1682 execplan::CalpontSystemCatalog::OID fOid; 1683 execplan::CalpontSystemCatalog::OID fTableOid; 1684 uint8_t colWidth; 1685 uint16_t realWidth; 1686 uint32_t fPseudoType; 1687 execplan::CalpontSystemCatalog::ColType fColType; 1688 bool isDictColumn; 1689 bool isEM; 1690 1691 // boost::thread* fPTThd; 1692 1693 // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4. 1694 // Running with this one will swallow rows at projection. 1695 bool fSwallowRows; 1696 ResourceManager* fRm; 1697 friend class PassThruCommandJL; 1698 friend class RTSCommandJL; 1699 friend class BatchPrimitiveStep; 1700 friend class TupleBPS; 1701 }; 1702 1703 class PseudoColStep : public pColStep 1704 { 1705 public: 1706 /** @brief PseudoColStep constructor 1707 */ PseudoColStep(execplan::CalpontSystemCatalog::OID oid,execplan::CalpontSystemCatalog::OID tableOid,uint32_t pId,const execplan::CalpontSystemCatalog::ColType & ct,const JobInfo & jobInfo)1708 PseudoColStep( 1709 execplan::CalpontSystemCatalog::OID oid, 1710 execplan::CalpontSystemCatalog::OID tableOid, 1711 uint32_t pId, 1712 const execplan::CalpontSystemCatalog::ColType& ct, 1713 const JobInfo& jobInfo) : 1714 pColStep(oid, tableOid, ct, jobInfo), 1715 fPseudoColumnId(pId) 1716 {} 1717 PseudoColStep(const PassThruStep & rhs)1718 PseudoColStep(const PassThruStep& rhs) : 1719 pColStep(rhs), 1720 fPseudoColumnId(rhs.pseudoType()) 1721 {} 1722 ~PseudoColStep()1723 virtual ~PseudoColStep() {} 1724 pseudoColumnId()1725 uint32_t pseudoColumnId() const 1726 { 1727 return fPseudoColumnId; 1728 } pseudoColumnId(uint32_t pId)1729 void pseudoColumnId(uint32_t pId) 1730 { 1731 fPseudoColumnId = pId; 1732 } 1733 1734 protected: 1735 uint32_t fPseudoColumnId; 1736 1737 private: 1738 /** @brief disabled constuctor 1739 */ 1740 PseudoColStep(const pColScanStep&); 1741 PseudoColStep(const pColStep&); 1742 }; 1743 1744 1745 } 1746 1747 #endif // JOBLIST_PRIMITIVESTEP_H 1748 // vim:ts=4 sw=4: 1749 1750 1751