1 /* Copyright (C) 2014 InfiniDB, Inc. 2 3 This program is free software; you can redistribute it and/or 4 modify it under the terms of the GNU General Public License 5 as published by the Free Software Foundation; version 2 of 6 the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 16 MA 02110-1301, USA. */ 17 18 // $Id: jobstep.h 9636 2013-06-20 14:23:36Z rdempsey $ 19 20 21 /** @file */ 22 23 #ifndef JOBLIST_JOBSTEP_H_ 24 #define JOBLIST_JOBSTEP_H_ 25 26 #include <iostream> 27 #include <vector> 28 #include <string> 29 #include <cassert> 30 #include <sys/time.h> 31 #include <stdexcept> 32 33 #include <boost/shared_ptr.hpp> 34 #include <boost/shared_array.hpp> 35 #include <boost/uuid/uuid.hpp> 36 37 #include "calpontsystemcatalog.h" 38 #include "calpontselectexecutionplan.h" 39 #include "elementtype.h" 40 #include "errorinfo.h" 41 #include "jl_logger.h" 42 #include "timestamp.h" 43 #include "rowgroup.h" 44 #include "querytele.h" 45 #include "threadpool.h" 46 #include "atomicops.h" 47 48 #include "branchpred.h" 49 50 #ifndef __GNUC__ 51 # ifndef __attribute__ 52 # define __attribute__(x) 53 # endif 54 #endif 55 56 namespace joblist 57 { 58 59 /** @brief class JobStepAssociation mediator class to connect/control JobSteps and DataLists 60 * 61 * Class JobStepAssociation connects/controls JobSteps and DalaLists 62 */ 63 class JobStepAssociation 64 { 65 public: JobStepAssociation()66 JobStepAssociation() { } ~JobStepAssociation()67 virtual ~JobStepAssociation() {} 68 inAdd(const AnyDataListSPtr & spdl)69 void inAdd(const AnyDataListSPtr& spdl) __attribute__((deprecated)) 70 { 71 fInDataList.push_back(spdl); 72 } outAdd(const AnyDataListSPtr & spdl)73 void outAdd(const AnyDataListSPtr& spdl) 74 { 75 fOutDataList.push_back(spdl); 76 } outAdd(const AnyDataListSPtr & spdl,size_t pos)77 void outAdd(const AnyDataListSPtr& spdl, size_t pos) 78 { 79 if (pos > fOutDataList.size()) throw std::logic_error("Insert position is beyond end."); 80 81 fOutDataList.insert(fOutDataList.begin() + pos, spdl); 82 } outAdd(const DataListVec & spdlVec,size_t pos)83 void outAdd(const DataListVec& spdlVec, size_t pos) 84 { 85 if (pos > fOutDataList.size()) throw std::logic_error("Insert position is beyond end."); 86 87 fOutDataList.insert(fOutDataList.begin() + pos, spdlVec.begin(), spdlVec.end()); 88 } inSize()89 size_t inSize() const __attribute__((deprecated)) 90 { 91 return fInDataList.size(); 92 } outSize()93 size_t outSize() const 94 { 95 return fOutDataList.size(); 96 } inAt(size_t i)97 const AnyDataListSPtr& inAt(size_t i) const __attribute__((deprecated)) 98 { 99 return fInDataList.at(i); 100 } outAt(size_t i)101 const AnyDataListSPtr& outAt(size_t i) const 102 { 103 return fOutDataList.at(i); 104 } outAt(size_t i)105 AnyDataListSPtr& outAt(size_t i) 106 { 107 return fOutDataList.at(i); 108 } 109 110 private: 111 DataListVec fInDataList; 112 DataListVec fOutDataList; 113 }; 114 115 116 // forward reference 117 struct JobInfo; 118 119 120 /** @brief class JobStep abstract class describing a query execution step 121 * 122 * Class JobStep is an abstract class that describes a query execution step 123 */ 124 class JobStep 125 { 126 public: 127 128 /** constructor 129 */ JobStep()130 JobStep() { } 131 JobStep(const JobInfo&); 132 /** destructor 133 */ ~JobStep()134 virtual ~JobStep() { /*pthread_mutex_destroy(&mutex);*/ } 135 /** @brief virtual void Run method 136 */ 137 virtual void run() = 0; abort()138 virtual void abort() 139 { 140 fDie = true; 141 } 142 /** @brief virtual void join method 143 */ 144 virtual void join() = 0; 145 /** @brief virtual string toString method 146 */ 147 virtual const std::string toString() const = 0; 148 149 /** @brief virtual JobStepAssociation * inputAssociation method 150 */ inputAssociation(const JobStepAssociation & inputAssociation)151 virtual void inputAssociation(const JobStepAssociation& inputAssociation) 152 { 153 fInputJobStepAssociation = inputAssociation; 154 } inputAssociation()155 virtual const JobStepAssociation& inputAssociation() const 156 { 157 return fInputJobStepAssociation; 158 } 159 /** @brief virtual JobStepAssociation * outputAssociation method 160 */ outputAssociation(const JobStepAssociation & outputAssociation)161 virtual void outputAssociation(const JobStepAssociation& outputAssociation) 162 { 163 fOutputJobStepAssociation = outputAssociation; 164 } outputAssociation()165 virtual const JobStepAssociation& outputAssociation() const 166 { 167 return fOutputJobStepAssociation; 168 } 169 stepId(uint16_t stepId)170 virtual void stepId(uint16_t stepId) 171 { 172 fStepId = stepId; 173 } stepId()174 virtual uint16_t stepId() const 175 { 176 return fStepId; 177 } sessionId()178 virtual uint32_t sessionId() const 179 { 180 return fSessionId; 181 } txnId()182 virtual uint32_t txnId() const 183 { 184 return fTxnId; 185 } statementId()186 virtual uint32_t statementId() const 187 { 188 return fStatementId; 189 } logger(const SPJL & logger)190 virtual void logger(const SPJL& logger) 191 { 192 fLogger = logger; 193 } 194 isDictCol()195 virtual bool isDictCol() const 196 { 197 return 0; 198 } oid()199 virtual execplan::CalpontSystemCatalog::OID oid() const 200 { 201 return 0; 202 } tableOid()203 virtual execplan::CalpontSystemCatalog::OID tableOid() const 204 { 205 return 0; 206 } 207 // @bug 598 Added alias for self-join alias()208 virtual std::string alias() const 209 { 210 return fAlias; 211 } alias(const std::string & alias)212 virtual void alias(const std::string& alias) 213 { 214 fAlias = alias; 215 } 216 // @bug 3401 & 3402, view support view()217 virtual std::string view() const 218 { 219 return fView; 220 } view(const std::string & vw)221 virtual void view(const std::string& vw) 222 { 223 fView = vw; 224 } 225 // @bug 3438, stats with column name name()226 virtual std::string name() const 227 { 228 return fName; 229 } name(const std::string & nm)230 virtual void name(const std::string& nm) 231 { 232 fName = nm; 233 } schema()234 virtual std::string schema() const 235 { 236 return fSchema; 237 } schema(const std::string & s)238 virtual void schema(const std::string& s) 239 { 240 fSchema = s; 241 } 242 // @bug 3398, add columns' unique tuple ID to job step tupleId()243 virtual uint64_t tupleId() const 244 { 245 return fTupleId; 246 } tupleId(uint64_t id)247 virtual void tupleId(uint64_t id) 248 { 249 fTupleId = id; 250 } 251 252 //...Final I/O blk count, msg rcv count, etc for this job step. These 253 //...methods do not use a mutex lock to acquire values, because it is 254 //...assumed they are called after all processing is complete. phyIOCount()255 virtual uint64_t phyIOCount ( ) const 256 { 257 return 0; 258 } cacheIOCount()259 virtual uint64_t cacheIOCount ( ) const 260 { 261 return 0; 262 } msgsRcvdCount()263 virtual uint64_t msgsRcvdCount ( ) const 264 { 265 return 0; 266 } msgBytesIn()267 virtual uint64_t msgBytesIn ( ) const 268 { 269 return 0; 270 } msgBytesOut()271 virtual uint64_t msgBytesOut ( ) const 272 { 273 return 0; 274 } blockTouched()275 virtual uint64_t blockTouched ( ) const 276 { 277 return 0; 278 } cardinality()279 virtual uint64_t cardinality ( ) const 280 { 281 return fCardinality; 282 } cardinality(const uint64_t cardinality)283 virtual void cardinality ( const uint64_t cardinality ) 284 { 285 fCardinality = cardinality; 286 } 287 288 // functions to delay/control jobstep execution; decWaitToRunStepCnt() per- 289 // forms atomic decrement op because it is accessed by multiple threads. delayedRun()290 bool delayedRun() const 291 { 292 return fDelayedRunFlag; 293 } waitToRunStepCnt()294 int waitToRunStepCnt() 295 { 296 return fWaitToRunStepCnt; 297 } incWaitToRunStepCnt()298 void incWaitToRunStepCnt() 299 { 300 fDelayedRunFlag = true; 301 ++fWaitToRunStepCnt; 302 } decWaitToRunStepCnt()303 int decWaitToRunStepCnt() 304 { 305 return atomicops::atomicDec(&fWaitToRunStepCnt); 306 } resetDelayedRun()307 void resetDelayedRun() 308 { 309 fDelayedRunFlag = false; 310 fWaitToRunStepCnt = 0; 311 } 312 logEnd(const char * s)313 void logEnd(const char* s) 314 { 315 fLogMutex.lock(); //pthread_mutex_lock(&mutex); 316 std::cout << s << std::endl; 317 fLogMutex.unlock(); //pthread_mutex_unlock(&mutex); 318 } 319 void syslogStartStep(uint32_t subSystem, 320 const std::string& stepName) const; 321 void syslogEndStep (uint32_t subSystem, 322 uint64_t blockedDLInput, 323 uint64_t blockedDLOutput, 324 uint64_t msgBytesInput = 0, 325 uint64_t msgBytesOutput = 0 ) const; 326 void syslogReadBlockCounts (uint32_t subSystem, 327 uint64_t physicalReadCount, 328 uint64_t cacheReadCount, 329 uint64_t casualPartBlocks ) const; 330 void syslogProcessingTimes (uint32_t subSystem, 331 const struct timeval& firstReadTime, 332 const struct timeval& lastReadTime, 333 const struct timeval& firstWriteTime, 334 const struct timeval& lastWriteTime) const; 335 void setTrace(bool trace) __attribute__((deprecated)); 336 bool traceOn() const; setTraceFlags(uint32_t flags)337 void setTraceFlags(uint32_t flags) 338 { 339 fTraceFlags = flags; 340 } 341 JSTimeStamp dlTimes; 342 extendedInfo()343 const std::string& extendedInfo() const 344 { 345 return fExtendedInfo; 346 } miniInfo()347 const std::string& miniInfo() const 348 { 349 return fMiniInfo; 350 } 351 priority()352 uint32_t priority() 353 { 354 return fPriority; 355 } priority(uint32_t p)356 void priority(uint32_t p) 357 { 358 fPriority = p; 359 } 360 status()361 uint32_t status() const 362 { 363 return fErrorInfo->errCode; 364 } status(uint32_t s)365 void status(uint32_t s) 366 { 367 fErrorInfo->errCode = s; 368 } errorMessage()369 std::string errorMessage() 370 { 371 return fErrorInfo->errMsg; 372 } errorMessage(const std::string & s)373 void errorMessage(const std::string& s) 374 { 375 fErrorInfo->errMsg = s; 376 } errorInfo()377 const SErrorInfo& errorInfo() const 378 { 379 return fErrorInfo; 380 } errorInfo()381 SErrorInfo& errorInfo() 382 { 383 return fErrorInfo; 384 } errorInfo(SErrorInfo & sp)385 void errorInfo(SErrorInfo& sp) 386 { 387 fErrorInfo = sp; 388 } 389 cancelled()390 bool cancelled() 391 { 392 return (fErrorInfo->errCode > 0 || fDie); 393 } 394 stringTableFriendly()395 virtual bool stringTableFriendly() 396 { 397 return false; 398 } 399 delivery()400 bool delivery() const 401 { 402 return fDelivery; 403 } delivery(bool b)404 void delivery(bool b) 405 { 406 fDelivery = b; 407 } 408 queryUuid()409 const boost::uuids::uuid& queryUuid() const 410 { 411 return fQueryUuid; 412 } 413 414 //@bug5887, distinguish on clause filter and where clause filter onClauseFilter()415 bool onClauseFilter() const 416 { 417 return fOnClauseFilter; 418 } onClauseFilter(bool b)419 void onClauseFilter(bool b) 420 { 421 fOnClauseFilter = b; 422 } 423 timeZone(const std::string & timezone)424 void timeZone(const std::string& timezone) 425 { 426 fTimeZone = timezone; 427 } timeZone()428 const std::string timeZone() const 429 { 430 return fTimeZone; 431 } 432 433 void handleException(std::exception_ptr e, 434 const int errorCode, 435 const unsigned infoErrorCode, 436 const std::string& methodName); 437 438 static threadpool::ThreadPool jobstepThreadPool; 439 protected: 440 441 //@bug6088, for telemetry posting 442 static const int64_t STEP_TELE_INTERVAL = 5000; // now, this is the browser refresh rate postStepStartTele(querytele::StepTeleStats & sts)443 void postStepStartTele(querytele::StepTeleStats& sts) 444 { 445 fStartTime = fLastStepTeleTime = sts.start_time = querytele::QueryTeleClient::timeNowms(); 446 fQtc.postStepTele(sts); 447 } postStepProgressTele(querytele::StepTeleStats & sts)448 void postStepProgressTele(querytele::StepTeleStats& sts) 449 { 450 int64_t crntTime = querytele::QueryTeleClient::timeNowms(); 451 452 if ((crntTime - fLastStepTeleTime) >= STEP_TELE_INTERVAL) 453 { 454 // interval between step telemetry 455 sts.start_time = fStartTime; 456 fQtc.postStepTele(sts); 457 fLastStepTeleTime = crntTime; 458 } 459 } postStepSummaryTele(querytele::StepTeleStats & sts)460 void postStepSummaryTele(querytele::StepTeleStats& sts) 461 { 462 sts.start_time = fStartTime; 463 sts.end_time = fLastStepTeleTime = querytele::QueryTeleClient::timeNowms(); 464 fQtc.postStepTele(sts); 465 } 466 467 JobStepAssociation fInputJobStepAssociation; 468 JobStepAssociation fOutputJobStepAssociation; 469 470 uint32_t fSessionId; 471 uint32_t fTxnId; 472 BRM::QueryContext fVerId; 473 uint32_t fStatementId; 474 475 uint32_t fStepId; 476 uint64_t fTupleId; 477 478 std::string fAlias; 479 std::string fView; 480 std::string fName; 481 std::string fSchema; 482 uint32_t fTraceFlags; 483 uint64_t fCardinality; 484 bool fDelayedRunFlag; 485 bool fDelivery; 486 bool fOnClauseFilter; 487 volatile bool fDie; 488 volatile uint32_t fWaitToRunStepCnt; 489 std::string fExtendedInfo; 490 std::string fMiniInfo; 491 492 uint32_t fPriority; 493 494 SErrorInfo fErrorInfo; 495 SPJL fLogger; 496 497 uint32_t fLocalQuery; 498 499 boost::uuids::uuid fQueryUuid; 500 boost::uuids::uuid fStepUuid; 501 querytele::QueryTeleClient fQtc; 502 uint64_t fProgress; 503 int64_t fStartTime; 504 int64_t fLastStepTeleTime; 505 std::string fTimeZone; 506 507 private: 508 static boost::mutex fLogMutex; 509 510 friend class CommandJL; 511 }; 512 513 514 class TupleJobStep 515 { 516 public: TupleJobStep()517 TupleJobStep() { } ~TupleJobStep()518 virtual ~TupleJobStep() { } 519 virtual void setOutputRowGroup(const rowgroup::RowGroup&) = 0; setFcnExpGroup3(const std::vector<execplan::SRCP> &)520 virtual void setFcnExpGroup3(const std::vector<execplan::SRCP>&) {} setFE23Output(const rowgroup::RowGroup &)521 virtual void setFE23Output(const rowgroup::RowGroup&) {} 522 virtual const rowgroup::RowGroup& getOutputRowGroup() const = 0; 523 }; 524 525 526 class TupleDeliveryStep : public TupleJobStep 527 { 528 public: ~TupleDeliveryStep()529 virtual ~TupleDeliveryStep() { } 530 virtual uint32_t nextBand(messageqcpp::ByteStream& bs) = 0; 531 virtual const rowgroup::RowGroup& getDeliveredRowGroup() const = 0; 532 virtual void deliverStringTableRowGroup(bool b) = 0; 533 virtual bool deliverStringTableRowGroup() const = 0; 534 }; 535 536 class NullStep : public JobStep 537 { 538 public: 539 /** @brief virtual void Run method 540 */ run()541 virtual void run() {} 542 /** @brief virtual void join method 543 */ join()544 virtual void join() {} 545 /** @brief virtual string toString method 546 */ toString()547 virtual const std::string toString() const 548 { 549 return "NullStep"; 550 } 551 }; 552 553 // calls rhs->toString() 554 std::ostream& operator<<(std::ostream& os, const JobStep* rhs); 555 556 typedef boost::shared_ptr<JobStepAssociation> SJSA; 557 typedef boost::shared_ptr<JobStepAssociation> JobStepAssociationSPtr; 558 559 typedef boost::shared_ptr<JobStep> SJSTEP; 560 561 } 562 563 #endif // JOBLIST_JOBSTEP_H_ 564 // vim:ts=4 sw=4: 565 566