1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 //  $Id: jobstep.h 9636 2013-06-20 14:23:36Z rdempsey $
19 
20 
21 /** @file */
22 
23 #ifndef JOBLIST_JOBSTEP_H_
24 #define JOBLIST_JOBSTEP_H_
25 
26 #include <iostream>
27 #include <vector>
28 #include <string>
29 #include <cassert>
30 #include <sys/time.h>
31 #include <stdexcept>
32 
33 #include <boost/shared_ptr.hpp>
34 #include <boost/shared_array.hpp>
35 #include <boost/uuid/uuid.hpp>
36 
37 #include "calpontsystemcatalog.h"
38 #include "calpontselectexecutionplan.h"
39 #include "elementtype.h"
40 #include "errorinfo.h"
41 #include "jl_logger.h"
42 #include "timestamp.h"
43 #include "rowgroup.h"
44 #include "querytele.h"
45 #include "threadpool.h"
46 #include "atomicops.h"
47 
48 #include "branchpred.h"
49 
// GNU-style __attribute__((...)) annotations are used on declarations in this
// header.  For a compiler that does not define __GNUC__, and where nothing
// has already defined __attribute__ as a macro, make the annotation expand
// to nothing.
#if !defined(__GNUC__) && !defined(__attribute__)
#  define __attribute__(x)
#endif
55 
56 namespace joblist
57 {
58 
59 /** @brief class JobStepAssociation mediator class to connect/control JobSteps and DataLists
60  *
61  * Class JobStepAssociation connects/controls JobSteps and DalaLists
62  */
63 class JobStepAssociation
64 {
65 public:
JobStepAssociation()66     JobStepAssociation() { }
~JobStepAssociation()67     virtual ~JobStepAssociation() {}
68 
inAdd(const AnyDataListSPtr & spdl)69     void inAdd(const AnyDataListSPtr& spdl) __attribute__((deprecated))
70     {
71         fInDataList.push_back(spdl);
72     }
outAdd(const AnyDataListSPtr & spdl)73     void outAdd(const AnyDataListSPtr& spdl)
74     {
75         fOutDataList.push_back(spdl);
76     }
outAdd(const AnyDataListSPtr & spdl,size_t pos)77     void outAdd(const AnyDataListSPtr& spdl, size_t pos)
78     {
79         if (pos > fOutDataList.size()) throw std::logic_error("Insert position is beyond end.");
80 
81         fOutDataList.insert(fOutDataList.begin() + pos, spdl);
82     }
outAdd(const DataListVec & spdlVec,size_t pos)83     void outAdd(const DataListVec& spdlVec, size_t pos)
84     {
85         if (pos > fOutDataList.size()) throw std::logic_error("Insert position is beyond end.");
86 
87         fOutDataList.insert(fOutDataList.begin() + pos, spdlVec.begin(), spdlVec.end());
88     }
inSize()89     size_t inSize() const __attribute__((deprecated))
90     {
91         return fInDataList.size();
92     }
outSize()93     size_t outSize() const
94     {
95         return fOutDataList.size();
96     }
inAt(size_t i)97     const AnyDataListSPtr& inAt(size_t i) const __attribute__((deprecated))
98     {
99         return fInDataList.at(i);
100     }
outAt(size_t i)101     const AnyDataListSPtr& outAt(size_t i) const
102     {
103         return fOutDataList.at(i);
104     }
outAt(size_t i)105     AnyDataListSPtr& outAt(size_t i)
106     {
107         return fOutDataList.at(i);
108     }
109 
110 private:
111     DataListVec fInDataList;
112     DataListVec fOutDataList;
113 };
114 
115 
116 // forward reference
117 struct JobInfo;
118 
119 
120 /** @brief class JobStep abstract class describing a query execution step
121  *
122  * Class JobStep is an abstract class that describes a query execution step
123  */
124 class JobStep
125 {
126 public:
127 
128     /** constructor
129      */
JobStep()130     JobStep() { }
131     JobStep(const JobInfo&);
132     /** destructor
133      */
~JobStep()134     virtual ~JobStep() { /*pthread_mutex_destroy(&mutex);*/ }
135     /** @brief virtual void Run method
136      */
137     virtual void run() = 0;
abort()138     virtual void abort()
139     {
140         fDie = true;
141     }
142     /** @brief virtual void join method
143      */
144     virtual void join() = 0;
145     /** @brief virtual string toString method
146      */
147     virtual const std::string toString() const = 0;
148 
149     /** @brief virtual JobStepAssociation * inputAssociation method
150      */
inputAssociation(const JobStepAssociation & inputAssociation)151     virtual void  inputAssociation(const JobStepAssociation& inputAssociation)
152     {
153         fInputJobStepAssociation = inputAssociation;
154     }
inputAssociation()155     virtual const JobStepAssociation& inputAssociation() const
156     {
157         return fInputJobStepAssociation;
158     }
159     /** @brief virtual JobStepAssociation * outputAssociation method
160      */
outputAssociation(const JobStepAssociation & outputAssociation)161     virtual void  outputAssociation(const JobStepAssociation& outputAssociation)
162     {
163         fOutputJobStepAssociation = outputAssociation;
164     }
outputAssociation()165     virtual const JobStepAssociation& outputAssociation() const
166     {
167         return fOutputJobStepAssociation;
168     }
169 
stepId(uint16_t stepId)170     virtual void stepId(uint16_t stepId)
171     {
172         fStepId = stepId;
173     }
stepId()174     virtual uint16_t stepId()      const
175     {
176         return fStepId;
177     }
sessionId()178     virtual uint32_t sessionId()   const
179     {
180         return fSessionId;
181     }
txnId()182     virtual uint32_t txnId()       const
183     {
184         return fTxnId;
185     }
statementId()186     virtual uint32_t statementId() const
187     {
188         return fStatementId;
189     }
logger(const SPJL & logger)190     virtual void logger(const SPJL& logger)
191     {
192         fLogger = logger;
193     }
194 
isDictCol()195     virtual bool isDictCol() const
196     {
197         return 0;
198     }
oid()199     virtual execplan::CalpontSystemCatalog::OID oid() const
200     {
201         return 0;
202     }
tableOid()203     virtual execplan::CalpontSystemCatalog::OID tableOid() const
204     {
205         return 0;
206     }
207     // @bug 598 Added alias for self-join
alias()208     virtual std::string alias() const
209     {
210         return fAlias;
211     }
alias(const std::string & alias)212     virtual void  alias(const std::string& alias)
213     {
214         fAlias = alias;
215     }
216     // @bug 3401 & 3402, view support
view()217     virtual std::string view() const
218     {
219         return fView;
220     }
view(const std::string & vw)221     virtual void  view(const std::string& vw)
222     {
223         fView = vw;
224     }
225     // @bug 3438, stats with column name
name()226     virtual std::string name() const
227     {
228         return fName;
229     }
name(const std::string & nm)230     virtual void  name(const std::string& nm)
231     {
232         fName = nm;
233     }
schema()234     virtual std::string schema() const
235     {
236         return fSchema;
237     }
schema(const std::string & s)238     virtual void  schema(const std::string& s)
239     {
240         fSchema = s;
241     }
242     // @bug 3398, add columns' unique tuple ID to job step
tupleId()243     virtual uint64_t tupleId() const
244     {
245         return fTupleId;
246     }
tupleId(uint64_t id)247     virtual void tupleId(uint64_t id)
248     {
249         fTupleId = id;
250     }
251 
252     //...Final I/O blk count, msg rcv count, etc for this job step. These
253     //...methods do not use a mutex lock to acquire values, because it is
254     //...assumed they are called after all processing is complete.
phyIOCount()255     virtual uint64_t phyIOCount    ( ) const
256     {
257         return 0;
258     }
cacheIOCount()259     virtual uint64_t cacheIOCount  ( ) const
260     {
261         return 0;
262     }
msgsRcvdCount()263     virtual uint64_t msgsRcvdCount ( ) const
264     {
265         return 0;
266     }
msgBytesIn()267     virtual uint64_t msgBytesIn    ( ) const
268     {
269         return 0;
270     }
msgBytesOut()271     virtual uint64_t msgBytesOut   ( ) const
272     {
273         return 0;
274     }
blockTouched()275     virtual uint64_t blockTouched  ( ) const
276     {
277         return 0;
278     }
cardinality()279     virtual uint64_t cardinality   ( ) const
280     {
281         return fCardinality;
282     }
cardinality(const uint64_t cardinality)283     virtual void cardinality ( const uint64_t cardinality )
284     {
285         fCardinality = cardinality;
286     }
287 
288     // functions to delay/control jobstep execution; decWaitToRunStepCnt() per-
289     // forms atomic decrement op because it is accessed by multiple threads.
delayedRun()290     bool     delayedRun() const
291     {
292         return fDelayedRunFlag;
293     }
waitToRunStepCnt()294     int      waitToRunStepCnt()
295     {
296         return fWaitToRunStepCnt;
297     }
incWaitToRunStepCnt()298     void     incWaitToRunStepCnt()
299     {
300         fDelayedRunFlag = true;
301         ++fWaitToRunStepCnt;
302     }
decWaitToRunStepCnt()303     int      decWaitToRunStepCnt()
304     {
305         return atomicops::atomicDec(&fWaitToRunStepCnt);
306     }
resetDelayedRun()307     void resetDelayedRun()
308     {
309         fDelayedRunFlag = false;
310         fWaitToRunStepCnt = 0;
311     }
312 
logEnd(const char * s)313     void logEnd(const char* s)
314     {
315         fLogMutex.lock(); //pthread_mutex_lock(&mutex);
316         std::cout << s << std::endl;
317         fLogMutex.unlock(); //pthread_mutex_unlock(&mutex);
318     }
319     void syslogStartStep(uint32_t subSystem,
320                          const std::string& stepName) const;
321     void syslogEndStep  (uint32_t subSystem,
322                          uint64_t blockedDLInput,
323                          uint64_t blockedDLOutput,
324                          uint64_t msgBytesInput = 0,
325                          uint64_t msgBytesOutput = 0 )   const;
326     void syslogReadBlockCounts (uint32_t subSystem,
327                                 uint64_t physicalReadCount,
328                                 uint64_t cacheReadCount,
329                                 uint64_t casualPartBlocks )   const;
330     void syslogProcessingTimes (uint32_t subSystem,
331                                 const struct timeval&   firstReadTime,
332                                 const struct timeval&   lastReadTime,
333                                 const struct timeval&   firstWriteTime,
334                                 const struct timeval&   lastWriteTime) const;
335     void setTrace(bool trace) __attribute__((deprecated));
336     bool traceOn() const;
setTraceFlags(uint32_t flags)337     void setTraceFlags(uint32_t flags)
338     {
339         fTraceFlags = flags;
340     }
341     JSTimeStamp dlTimes;
342 
extendedInfo()343     const std::string& extendedInfo() const
344     {
345         return fExtendedInfo;
346     }
miniInfo()347     const std::string& miniInfo() const
348     {
349         return fMiniInfo;
350     }
351 
priority()352     uint32_t priority()
353     {
354         return fPriority;
355     }
priority(uint32_t p)356     void priority(uint32_t p)
357     {
358         fPriority = p;
359     }
360 
status()361     uint32_t status() const
362     {
363         return fErrorInfo->errCode;
364     }
status(uint32_t s)365     void  status(uint32_t s)
366     {
367         fErrorInfo->errCode = s;
368     }
errorMessage()369     std::string errorMessage()
370     {
371         return fErrorInfo->errMsg;
372     }
errorMessage(const std::string & s)373     void errorMessage(const std::string& s)
374     {
375         fErrorInfo->errMsg = s;
376     }
errorInfo()377     const SErrorInfo& errorInfo() const
378     {
379         return fErrorInfo;
380     }
errorInfo()381     SErrorInfo& errorInfo()
382     {
383         return fErrorInfo;
384     }
errorInfo(SErrorInfo & sp)385     void errorInfo(SErrorInfo& sp)
386     {
387         fErrorInfo = sp;
388     }
389 
cancelled()390     bool cancelled()
391     {
392         return (fErrorInfo->errCode > 0 || fDie);
393     }
394 
stringTableFriendly()395     virtual bool stringTableFriendly()
396     {
397         return false;
398     }
399 
delivery()400     bool delivery() const
401     {
402         return fDelivery;
403     }
delivery(bool b)404     void delivery(bool b)
405     {
406         fDelivery = b;
407     }
408 
queryUuid()409     const boost::uuids::uuid& queryUuid() const
410     {
411         return fQueryUuid;
412     }
413 
414     //@bug5887, distinguish on clause filter and where clause filter
onClauseFilter()415     bool onClauseFilter() const
416     {
417         return fOnClauseFilter;
418     }
onClauseFilter(bool b)419     void onClauseFilter(bool b)
420     {
421         fOnClauseFilter = b;
422     }
423 
timeZone(const std::string & timezone)424     void timeZone(const std::string& timezone)
425     {
426         fTimeZone = timezone;
427     }
timeZone()428     const std::string timeZone() const
429     {
430         return fTimeZone;
431     }
432 
433     void handleException(std::exception_ptr e,
434                          const int errorCode,
435                          const unsigned infoErrorCode,
436                          const std::string& methodName);
437 
438     static threadpool::ThreadPool jobstepThreadPool;
439 protected:
440 
441     //@bug6088, for telemetry posting
442     static const int64_t STEP_TELE_INTERVAL = 5000;  // now, this is the browser refresh rate
postStepStartTele(querytele::StepTeleStats & sts)443     void postStepStartTele(querytele::StepTeleStats& sts)
444     {
445         fStartTime = fLastStepTeleTime = sts.start_time = querytele::QueryTeleClient::timeNowms();
446         fQtc.postStepTele(sts);
447     }
postStepProgressTele(querytele::StepTeleStats & sts)448     void postStepProgressTele(querytele::StepTeleStats& sts)
449     {
450         int64_t crntTime = querytele::QueryTeleClient::timeNowms();
451 
452         if ((crntTime - fLastStepTeleTime) >= STEP_TELE_INTERVAL)
453         {
454             // interval between step telemetry
455             sts.start_time = fStartTime;
456             fQtc.postStepTele(sts);
457             fLastStepTeleTime = crntTime;
458         }
459     }
postStepSummaryTele(querytele::StepTeleStats & sts)460     void postStepSummaryTele(querytele::StepTeleStats& sts)
461     {
462         sts.start_time = fStartTime;
463         sts.end_time = fLastStepTeleTime = querytele::QueryTeleClient::timeNowms();
464         fQtc.postStepTele(sts);
465     }
466 
467     JobStepAssociation fInputJobStepAssociation;
468     JobStepAssociation fOutputJobStepAssociation;
469 
470     uint32_t fSessionId;
471     uint32_t fTxnId;
472     BRM::QueryContext fVerId;
473     uint32_t fStatementId;
474 
475     uint32_t fStepId;
476     uint64_t fTupleId;
477 
478     std::string fAlias;
479     std::string fView;
480     std::string fName;
481     std::string fSchema;
482     uint32_t    fTraceFlags;
483     uint64_t    fCardinality;
484     bool        fDelayedRunFlag;
485     bool        fDelivery;
486     bool        fOnClauseFilter;
487     volatile bool fDie;
488     volatile uint32_t fWaitToRunStepCnt;
489     std::string fExtendedInfo;
490     std::string fMiniInfo;
491 
492     uint32_t fPriority;
493 
494     SErrorInfo fErrorInfo;
495     SPJL fLogger;
496 
497     uint32_t fLocalQuery;
498 
499     boost::uuids::uuid fQueryUuid;
500     boost::uuids::uuid fStepUuid;
501     querytele::QueryTeleClient fQtc;
502     uint64_t fProgress;
503     int64_t  fStartTime;
504     int64_t  fLastStepTeleTime;
505     std::string fTimeZone;
506 
507 private:
508     static boost::mutex fLogMutex;
509 
510     friend class CommandJL;
511 };
512 
513 
514 class TupleJobStep
515 {
516 public:
TupleJobStep()517     TupleJobStep() { }
~TupleJobStep()518     virtual ~TupleJobStep() { }
519     virtual void  setOutputRowGroup(const rowgroup::RowGroup&) = 0;
setFcnExpGroup3(const std::vector<execplan::SRCP> &)520     virtual void  setFcnExpGroup3(const std::vector<execplan::SRCP>&) {}
setFE23Output(const rowgroup::RowGroup &)521     virtual void  setFE23Output(const rowgroup::RowGroup&) {}
522     virtual const rowgroup::RowGroup& getOutputRowGroup() const = 0;
523 };
524 
525 
526 class TupleDeliveryStep : public TupleJobStep
527 {
528 public:
~TupleDeliveryStep()529     virtual ~TupleDeliveryStep() { }
530     virtual uint32_t  nextBand(messageqcpp::ByteStream& bs) = 0;
531     virtual const rowgroup::RowGroup& getDeliveredRowGroup() const = 0;
532     virtual void  deliverStringTableRowGroup(bool b) = 0;
533     virtual bool  deliverStringTableRowGroup() const = 0;
534 };
535 
536 class NullStep : public JobStep
537 {
538 public:
539     /** @brief virtual void Run method
540      */
run()541     virtual void run() {}
542     /** @brief virtual void join method
543      */
join()544     virtual void join() {}
545     /** @brief virtual string toString method
546      */
toString()547     virtual const std::string toString() const
548     {
549         return "NullStep";
550     }
551 };
552 
553 // calls rhs->toString()
554 std::ostream& operator<<(std::ostream& os, const JobStep* rhs);
555 
556 typedef boost::shared_ptr<JobStepAssociation> SJSA;
557 typedef boost::shared_ptr<JobStepAssociation> JobStepAssociationSPtr;
558 
559 typedef boost::shared_ptr<JobStep> SJSTEP;
560 
561 }
562 
563 #endif  // JOBLIST_JOBSTEP_H_
564 // vim:ts=4 sw=4:
565 
566