1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: tupleconstantstep.cpp 9649 2013-06-25 16:08:05Z xlou $
20 
21 
22 //#define NDEBUG
23 #include <cassert>
24 #include <sstream>
25 #include <iomanip>
26 using namespace std;
27 
28 #include <boost/shared_ptr.hpp>
29 #include <boost/shared_array.hpp>
30 #include <boost/uuid/uuid_io.hpp>
31 using namespace boost;
32 
33 #include "messagequeue.h"
34 using namespace messageqcpp;
35 
36 #include "loggingid.h"
37 #include "errorcodes.h"
38 #include "idberrorinfo.h"
39 #include "errorids.h"
40 using namespace logging;
41 
42 #include "calpontsystemcatalog.h"
43 #include "constantcolumn.h"
44 using namespace execplan;
45 
46 #include "jobstep.h"
47 #include "rowgroup.h"
48 using namespace rowgroup;
49 
50 #include "querytele.h"
51 using namespace querytele;
52 
53 #include "jlf_common.h"
54 #include "tupleconstantstep.h"
55 
56 
57 namespace joblist
58 {
59 // static utility method
addConstantStep(const JobInfo & jobInfo,const rowgroup::RowGroup * rg)60 SJSTEP TupleConstantStep::addConstantStep(const JobInfo& jobInfo, const rowgroup::RowGroup* rg)
61 {
62     TupleConstantStep* tcs = NULL;
63 
64     if (jobInfo.constantCol != CONST_COL_ONLY)
65     {
66         tcs = new TupleConstantStep(jobInfo);
67     }
68     else
69     {
70         tcs = new TupleConstantOnlyStep(jobInfo);
71     }
72 
73     tcs->initialize(jobInfo, rg);
74     SJSTEP spcs(tcs);
75     return spcs;
76 }
77 
78 
TupleConstantStep(const JobInfo & jobInfo)79 TupleConstantStep::TupleConstantStep(const JobInfo& jobInfo) :
80     JobStep(jobInfo),
81     fRowsReturned(0),
82     fInputDL(NULL),
83     fOutputDL(NULL),
84     fInputIterator(0),
85     fRunner(0),
86     fEndOfResult(false)
87 {
88     fExtendedInfo = "TCS: ";
89     fQtc.stepParms().stepType = StepTeleStats::T_TCS;
90 }
91 
92 
~TupleConstantStep()93 TupleConstantStep::~TupleConstantStep()
94 {
95 }
96 
97 
setOutputRowGroup(const rowgroup::RowGroup & rg)98 void TupleConstantStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
99 {
100     throw runtime_error("Disabled, use initialize() to set output RowGroup.");
101 }
102 
103 
initialize(const JobInfo & jobInfo,const RowGroup * rgIn)104 void TupleConstantStep::initialize(const JobInfo& jobInfo, const RowGroup* rgIn)
105 {
106     vector<uint32_t> oids, oidsIn = fRowGroupIn.getOIDs();
107     vector<uint32_t> keys, keysIn = fRowGroupIn.getKeys();
108     vector<uint32_t> scale, scaleIn = fRowGroupIn.getScale();
109     vector<uint32_t> precision, precisionIn = fRowGroupIn.getPrecision();
110     vector<CalpontSystemCatalog::ColDataType> types, typesIn = fRowGroupIn.getColTypes();
111     vector<uint32_t> csNums, csNumsIn = fRowGroupIn.getCharsetNumbers();
112     vector<uint32_t> pos;
113     pos.push_back(2);
114 
115     if (rgIn)
116     {
117         fRowGroupIn = *rgIn;
118         fRowGroupIn.initRow(&fRowIn);
119         oidsIn = fRowGroupIn.getOIDs();
120         keysIn = fRowGroupIn.getKeys();
121         scaleIn = fRowGroupIn.getScale();
122         precisionIn = fRowGroupIn.getPrecision();
123         typesIn = fRowGroupIn.getColTypes();
124         csNumsIn = fRowGroupIn.getCharsetNumbers();
125     }
126 
127     for (uint64_t i = 0, j = 0; i < jobInfo.deliveredCols.size(); i++)
128     {
129         const ConstantColumn* cc =
130             dynamic_cast<const ConstantColumn*>(jobInfo.deliveredCols[i].get());
131 
132         if (cc != NULL)
133         {
134             CalpontSystemCatalog::ColType ct = cc->resultType();
135 
136             if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
137                 ct.colWidth++;
138 
139             //Round colWidth up
140             if (ct.colWidth == 3)
141                 ct.colWidth = 4;
142             else if (ct.colWidth == 5 || ct.colWidth == 6 || ct.colWidth == 7)
143                 ct.colWidth = 8;
144 
145             oids.push_back(-1);
146             keys.push_back(-1);
147             scale.push_back(ct.scale);
148             precision.push_back(ct.precision);
149             types.push_back(ct.colDataType);
150             csNums.push_back(ct.charsetNumber);
151             pos.push_back(pos.back() + ct.colWidth);
152 
153             fIndexConst.push_back(i);
154         }
155         else
156         {
157             // select (select a) from region;
158             if (j >= oidsIn.size() && jobInfo.tableList.empty())
159             {
160                 throw IDBExcept(ERR_NO_FROM);
161             }
162 
163             idbassert(j < oidsIn.size());
164 
165             oids.push_back(oidsIn[j]);
166             keys.push_back(keysIn[j]);
167             scale.push_back(scaleIn[j]);
168             precision.push_back(precisionIn[j]);
169             types.push_back(typesIn[j]);
170             csNums.push_back(csNumsIn[j]);
171             pos.push_back(pos.back() + fRowGroupIn.getColumnWidth(j));
172             j++;
173 
174             fIndexMapping.push_back(i);
175         }
176     }
177 
178     fRowGroupOut = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision,
179                             jobInfo.stringTableThreshold);
180     fRowGroupOut.initRow(&fRowOut);
181     fRowGroupOut.initRow(&fRowConst, true);
182 
183     constructContanstRow(jobInfo);
184 }
185 
186 
constructContanstRow(const JobInfo & jobInfo)187 void TupleConstantStep::constructContanstRow(const JobInfo& jobInfo)
188 {
189     // construct a row with only the constant values
190     fConstRowData.reset(new uint8_t[fRowConst.getSize()]);
191     fRowConst.setData(fConstRowData.get());
192     fRowConst.initToNull(); // make sure every col is init'd to something, because later we copy the whole row
193     const vector<CalpontSystemCatalog::ColDataType>& types = fRowGroupOut.getColTypes();
194 
195     for (vector<uint64_t>::iterator i = fIndexConst.begin(); i != fIndexConst.end(); i++)
196     {
197         const ConstantColumn* cc =
198             dynamic_cast<const ConstantColumn*>(jobInfo.deliveredCols[*i].get());
199         const execplan::Result c = cc->result();
200 
201         if (cc->type() == ConstantColumn::NULLDATA)
202         {
203             if (types[*i] == CalpontSystemCatalog::CHAR ||
204                     types[*i] == CalpontSystemCatalog::VARCHAR ||
205                     types[*i] == CalpontSystemCatalog::TEXT)
206             {
207                 fRowConst.setStringField("", *i);
208             }
209             else if (isUnsigned(types[*i]))
210             {
211                 fRowConst.setUintField(fRowConst.getNullValue(*i), *i);
212             }
213             else
214             {
215                 fRowConst.setIntField(fRowConst.getSignedNullValue(*i), *i);
216             }
217 
218             continue;
219         }
220 
221 
222         switch (types[*i])
223         {
224             case CalpontSystemCatalog::TINYINT:
225             case CalpontSystemCatalog::SMALLINT:
226             case CalpontSystemCatalog::MEDINT:
227             case CalpontSystemCatalog::INT:
228             case CalpontSystemCatalog::BIGINT:
229             case CalpontSystemCatalog::DATE:
230             case CalpontSystemCatalog::DATETIME:
231             case CalpontSystemCatalog::TIME:
232             case CalpontSystemCatalog::TIMESTAMP:
233             {
234                 fRowConst.setIntField(c.intVal, *i);
235                 break;
236             }
237 
238             case CalpontSystemCatalog::DECIMAL:
239             case CalpontSystemCatalog::UDECIMAL:
240             {
241                 fRowConst.setIntField(c.decimalVal.value, *i);
242                 break;
243             }
244 
245             case CalpontSystemCatalog::FLOAT:
246             case CalpontSystemCatalog::UFLOAT:
247             {
248                 fRowConst.setFloatField(c.floatVal, *i);
249                 break;
250             }
251 
252             case CalpontSystemCatalog::DOUBLE:
253             case CalpontSystemCatalog::UDOUBLE:
254             {
255                 fRowConst.setDoubleField(c.doubleVal, *i);
256                 break;
257             }
258 
259             case CalpontSystemCatalog::LONGDOUBLE:
260             {
261                 fRowConst.setLongDoubleField(c.longDoubleVal, *i);
262                 break;
263             }
264 
265             case CalpontSystemCatalog::CHAR:
266             case CalpontSystemCatalog::VARCHAR:
267             case CalpontSystemCatalog::TEXT:
268             {
269                 fRowConst.setStringField(c.strVal, *i);
270                 break;
271             }
272 
273             case CalpontSystemCatalog::UTINYINT:
274             case CalpontSystemCatalog::USMALLINT:
275             case CalpontSystemCatalog::UMEDINT:
276             case CalpontSystemCatalog::UINT:
277             case CalpontSystemCatalog::UBIGINT:
278             {
279                 fRowConst.setUintField(c.uintVal, *i);
280                 break;
281             }
282 
283             default:
284             {
285                 throw runtime_error("un-supported constant column type.");
286                 break;
287             }
288         } // switch
289     } // for constant columns
290 }
291 
292 
run()293 void TupleConstantStep::run()
294 {
295     if (fInputJobStepAssociation.outSize() == 0)
296         throw logic_error("No input data list for constant step.");
297 
298     fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
299 
300     if (fInputDL == NULL)
301         throw logic_error("Input is not a RowGroup data list.");
302 
303     fInputIterator = fInputDL->getIterator();
304 
305     if (fDelivery == false)
306     {
307         if (fOutputJobStepAssociation.outSize() == 0)
308             throw logic_error("No output data list for non-delivery constant step.");
309 
310         fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
311 
312         if (fOutputDL == NULL)
313             throw logic_error("Output is not a RowGroup data list.");
314 
315         fRunner = jobstepThreadPool.invoke(Runner(this));
316     }
317 }
318 
319 
join()320 void TupleConstantStep::join()
321 {
322     if (fRunner)
323         jobstepThreadPool.join(fRunner);
324 }
325 
326 
nextBand(messageqcpp::ByteStream & bs)327 uint32_t TupleConstantStep::nextBand(messageqcpp::ByteStream& bs)
328 {
329     RGData rgDataIn;
330     RGData rgDataOut;
331     bool more = false;
332     uint32_t rowCount = 0;
333 
334     try
335     {
336         bs.restart();
337 
338         more = fInputDL->next(fInputIterator, &rgDataIn);
339 
340         if (traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
341             dlTimes.setFirstReadTime();
342 
343         if (!more && cancelled())
344         {
345             fEndOfResult = true;
346         }
347 
348         if (more && !fEndOfResult)
349         {
350             fRowGroupIn.setData(&rgDataIn);
351             rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
352             fRowGroupOut.setData(&rgDataOut);
353 
354             fillInConstants();
355             fRowGroupOut.serializeRGData(bs);
356             rowCount = fRowGroupOut.getRowCount();
357         }
358         else
359         {
360             fEndOfResult = true;
361         }
362     }
363     catch (...)
364     {
365         handleException(std::current_exception(),
366                         logging::tupleConstantStepErr,
367                         logging::ERR_ALWAYS_CRITICAL,
368                         "TupleConstantStep::nextBand()");
369         while (more)
370             more = fInputDL->next(fInputIterator, &rgDataIn);
371 
372         fEndOfResult = true;
373     }
374 
375     if (fEndOfResult)
376     {
377         // send an empty / error band
378         RGData rgData(fRowGroupOut, 0);
379         fRowGroupOut.setData(&rgData);
380         fRowGroupOut.resetRowGroup(0);
381         fRowGroupOut.setStatus(status());
382         fRowGroupOut.serializeRGData(bs);
383 
384         if (traceOn())
385         {
386             dlTimes.setLastReadTime();
387             dlTimes.setEndOfInputTime();
388             printCalTrace();
389         }
390     }
391 
392     return rowCount;
393 }
394 
395 
execute()396 void TupleConstantStep::execute()
397 {
398     RGData rgDataIn;
399     RGData rgDataOut;
400     bool more = false;
401     StepTeleStats sts;
402     sts.query_uuid = fQueryUuid;
403     sts.step_uuid = fStepUuid;
404 
405     try
406     {
407         more = fInputDL->next(fInputIterator, &rgDataIn);
408 
409         if (traceOn()) dlTimes.setFirstReadTime();
410 
411         sts.msg_type = StepTeleStats::ST_START;
412         sts.total_units_of_work = 1;
413         postStepStartTele(sts);
414 
415         if (!more && cancelled())
416         {
417             fEndOfResult = true;
418         }
419 
420         while (more && !fEndOfResult)
421         {
422             fRowGroupIn.setData(&rgDataIn);
423             rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
424             fRowGroupOut.setData(&rgDataOut);
425 
426             fillInConstants();
427 
428             more = fInputDL->next(fInputIterator, &rgDataIn);
429 
430             if (cancelled())
431             {
432                 fEndOfResult = true;
433             }
434             else
435             {
436                 fOutputDL->insert(rgDataOut);
437             }
438         }
439     }
440     catch (...)
441     {
442         handleException(std::current_exception(),
443                         logging::tupleConstantStepErr,
444                         logging::ERR_ALWAYS_CRITICAL,
445                         "TupleConstantStep::execute()");
446     }
447 
448     while (more)
449         more = fInputDL->next(fInputIterator, &rgDataIn);
450 
451     sts.msg_type = StepTeleStats::ST_SUMMARY;
452     sts.total_units_of_work = sts.units_of_work_completed = 1;
453     sts.rows = fRowsReturned;
454     postStepSummaryTele(sts);
455 
456     // Bug 3136, let mini stats to be formatted if traceOn.
457     if (traceOn())
458     {
459         dlTimes.setLastReadTime();
460         dlTimes.setEndOfInputTime();
461         printCalTrace();
462     }
463 
464     fEndOfResult = true;
465     fOutputDL->endOfInput();
466 }
467 
468 // *DRRTUY Copy row at once not one field at a time
fillInConstants()469 void TupleConstantStep::fillInConstants()
470 {
471     fRowGroupIn.getRow(0, &fRowIn);
472     fRowGroupOut.getRow(0, &fRowOut);
473 
474     if (fIndexConst.size() > 1 || fIndexConst[0] != 0)
475     {
476         for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
477         {
478             copyRow(fRowConst, &fRowOut);
479 
480             fRowOut.setRid(fRowIn.getRelRid());
481 
482             for (uint64_t j = 0; j < fIndexMapping.size(); ++j)
483                 fRowIn.copyField(fRowOut, fIndexMapping[j], j);
484 
485             fRowIn.nextRow();
486             fRowOut.nextRow();
487         }
488     }
489     else // only first column is constant
490     {
491         for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
492         {
493             fRowOut.setRid(fRowIn.getRelRid());
494             fRowConst.copyField(fRowOut, 0, 0);
495 
496             for (uint32_t i = 1; i < fRowOut.getColumnCount(); i++)
497                 fRowIn.copyField(fRowOut, i, i - 1);
498 
499             fRowIn.nextRow();
500             fRowOut.nextRow();
501         }
502     }
503 
504     fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
505     fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
506     fRowsReturned += fRowGroupOut.getRowCount();
507 }
508 
509 
fillInConstants(const rowgroup::Row & rowIn,rowgroup::Row & rowOut)510 void TupleConstantStep::fillInConstants(const rowgroup::Row& rowIn, rowgroup::Row& rowOut)
511 {
512     if (fIndexConst.size() > 1 || fIndexConst[0] != 0)
513     {
514         copyRow(fRowConst, &rowOut);
515         //memcpy(rowOut.getData(), fRowConst.getData(), fRowConst.getSize());
516         rowOut.setRid(rowIn.getRelRid());
517 
518         for (uint64_t j = 0; j < fIndexMapping.size(); ++j)
519             rowIn.copyField(rowOut, fIndexMapping[j], j);
520 
521         //rowIn.copyField(rowOut.getData() + rowOut.getOffset(fIndexMapping[j]), j);
522     }
523     else // only first column is constant
524     {
525         //size_t n = rowOut.getOffset(rowOut.getColumnCount()) - rowOut.getOffset(1);
526         rowOut.setRid(rowIn.getRelRid());
527         fRowConst.copyField(rowOut, 0, 0);
528 
529         //fRowConst.copyField(rowOut.getData()+2, 0); // hardcoded 2 for rid length
530         for (uint32_t i = 1; i < rowOut.getColumnCount(); i++)
531             rowIn.copyField(rowOut, i, i - 1);
532 
533         //memcpy(rowOut.getData()+rowOut.getOffset(1), rowIn.getData()+2, n);
534     }
535 }
536 
537 
getOutputRowGroup() const538 const RowGroup& TupleConstantStep::getOutputRowGroup() const
539 {
540     return fRowGroupOut;
541 }
542 
543 
getDeliveredRowGroup() const544 const RowGroup& TupleConstantStep::getDeliveredRowGroup() const
545 {
546     return fRowGroupOut;
547 }
548 
549 
deliverStringTableRowGroup(bool b)550 void TupleConstantStep::deliverStringTableRowGroup(bool b)
551 {
552     fRowGroupOut.setUseStringTable(b);
553 }
554 
555 
deliverStringTableRowGroup() const556 bool TupleConstantStep::deliverStringTableRowGroup() const
557 {
558     return fRowGroupOut.usesStringTable();
559 }
560 
561 
toString() const562 const string TupleConstantStep::toString() const
563 {
564     ostringstream oss;
565     oss << "ConstantStep   ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
566 
567     oss << " in:";
568 
569     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
570         oss << fInputJobStepAssociation.outAt(i);
571 
572     oss << " out:";
573 
574     for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
575         oss << fOutputJobStepAssociation.outAt(i);
576 
577     oss << endl;
578 
579     return oss.str();
580 }
581 
582 
printCalTrace()583 void TupleConstantStep::printCalTrace()
584 {
585     time_t t = time (0);
586     char timeString[50];
587     ctime_r (&t, timeString);
588     timeString[strlen (timeString ) - 1] = '\0';
589     ostringstream logStr;
590     logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
591             << "; total rows returned-" << fRowsReturned << endl
592             << "\t1st read " << dlTimes.FirstReadTimeString()
593             << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
594             << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
595             << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
596             << "\tJob completion status " << status() << endl;
597     logEnd(logStr.str().c_str());
598     fExtendedInfo += logStr.str();
599     formatMiniStats();
600 }
601 
602 
formatMiniStats()603 void TupleConstantStep::formatMiniStats()
604 {
605     ostringstream oss;
606     oss << "TCS "
607         << "UM "
608         << "- "
609         << "- "
610         << "- "
611         << "- "
612         << "- "
613         << "- "
614         << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
615         << fRowsReturned << " ";
616     fMiniInfo += oss.str();
617 }
618 
619 
620 // class TupleConstantOnlyStep
TupleConstantOnlyStep(const JobInfo & jobInfo)621 TupleConstantOnlyStep::TupleConstantOnlyStep(const JobInfo& jobInfo) : TupleConstantStep(jobInfo)
622 {
623 //	fExtendedInfo = "TCOS: ";
624 }
625 
626 
~TupleConstantOnlyStep()627 TupleConstantOnlyStep::~TupleConstantOnlyStep()
628 {
629 }
630 
631 
632 //void TupleConstantOnlyStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
initialize(const JobInfo & jobInfo,const rowgroup::RowGroup * rgIn)633 void TupleConstantOnlyStep::initialize(const JobInfo& jobInfo, const rowgroup::RowGroup* rgIn)
634 {
635     vector<uint32_t> oids;
636     vector<uint32_t> keys;
637     vector<uint32_t> scale;
638     vector<uint32_t> precision;
639     vector<CalpontSystemCatalog::ColDataType> types;
640     vector<uint32_t> csNums;
641     vector<uint32_t> pos;
642     pos.push_back(2);
643 
644     deliverStringTableRowGroup(false);
645 
646     for (uint64_t i = 0; i < jobInfo.deliveredCols.size(); i++)
647     {
648         const ConstantColumn* cc =
649             dynamic_cast<const ConstantColumn*>(jobInfo.deliveredCols[i].get());
650 
651         if (cc == NULL)
652             throw runtime_error("none constant column found.");
653 
654         CalpontSystemCatalog::ColType ct = cc->resultType();
655 
656         if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
657             ct.colWidth++;
658 
659         //Round colWidth up
660         if (ct.colWidth == 3)
661             ct.colWidth = 4;
662         else if (ct.colWidth == 5 || ct.colWidth == 6 || ct.colWidth == 7)
663             ct.colWidth = 8;
664 
665         oids.push_back(-1);
666         keys.push_back(-1);
667         scale.push_back(ct.scale);
668         precision.push_back(ct.precision);
669         types.push_back(ct.colDataType);
670         csNums.push_back(ct.charsetNumber);
671         pos.push_back(pos.back() + ct.colWidth);
672 
673         fIndexConst.push_back(i);
674     }
675 
676     fRowGroupOut = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold, false);
677     fRowGroupOut.initRow(&fRowOut);
678     fRowGroupOut.initRow(&fRowConst, true);
679 
680     constructContanstRow(jobInfo);
681 }
682 
683 
run()684 void TupleConstantOnlyStep::run()
685 {
686     if (fDelivery == false)
687     {
688         if (fOutputJobStepAssociation.outSize() == 0)
689             throw logic_error("No output data list for non-delivery constant step.");
690 
691         fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
692 
693         if (fOutputDL == NULL)
694             throw logic_error("Output is not a RowGroup data list.");
695 
696         try
697         {
698             RGData rgDataOut(fRowGroupOut, 1);
699             fRowGroupOut.setData(&rgDataOut);
700 
701             if (traceOn()) dlTimes.setFirstReadTime();
702 
703             fillInConstants();
704 
705             fOutputDL->insert(rgDataOut);
706         }
707         catch (...)
708         {
709             handleException(std::current_exception(),
710                             logging::tupleConstantStepErr,
711                             logging::ERR_ALWAYS_CRITICAL,
712                             "TupleConstantOnlyStep::run()");
713         }
714 
715         if (traceOn())
716         {
717             dlTimes.setLastReadTime();
718             dlTimes.setEndOfInputTime();
719             printCalTrace();
720         }
721 
722         // Bug 3136, let mini stats to be formatted if traceOn.
723         fEndOfResult = true;
724         fOutputDL->endOfInput();
725     }
726 }
727 
728 
nextBand(messageqcpp::ByteStream & bs)729 uint32_t TupleConstantOnlyStep::nextBand(messageqcpp::ByteStream& bs)
730 {
731     RGData rgDataOut;
732     uint32_t rowCount = 0;
733 
734     if (!fEndOfResult)
735     {
736         try
737         {
738             bs.restart();
739 
740             if (traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
741                 dlTimes.setFirstReadTime();
742 
743             rgDataOut.reinit(fRowGroupOut, 1);
744             fRowGroupOut.setData(&rgDataOut);
745 
746             fillInConstants();
747             fRowGroupOut.serializeRGData(bs);
748             rowCount = fRowGroupOut.getRowCount();
749         }
750         catch (...)
751         {
752             handleException(std::current_exception(),
753                             logging::tupleConstantStepErr,
754                             logging::ERR_ALWAYS_CRITICAL,
755                             "TupleConstantOnlyStep::nextBand()");
756         }
757 
758         fEndOfResult = true;
759     }
760     else
761     {
762         // send an empty / error band
763         RGData rgData(fRowGroupOut, 0);
764         fRowGroupOut.setData(&rgData);
765         fRowGroupOut.resetRowGroup(0);
766         fRowGroupOut.setStatus(status());
767         fRowGroupOut.serializeRGData(bs);
768 
769         if (traceOn())
770         {
771             dlTimes.setLastReadTime();
772             dlTimes.setEndOfInputTime();
773             printCalTrace();
774         }
775     }
776 
777     return rowCount;
778 }
779 
780 
fillInConstants()781 void TupleConstantOnlyStep::fillInConstants()
782 {
783     fRowGroupOut.getRow(0, &fRowOut);
784     idbassert(fRowConst.getColumnCount() == fRowOut.getColumnCount());
785     fRowOut.usesStringTable(fRowConst.usesStringTable());
786     copyRow(fRowConst, &fRowOut);
787     fRowGroupOut.resetRowGroup(0);
788     fRowGroupOut.setRowCount(1);
789     fRowsReturned = 1;
790 }
791 
792 
toString() const793 const string TupleConstantOnlyStep::toString() const
794 {
795     ostringstream oss;
796     oss << "ConstantOnlyStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
797 
798     oss << " out:";
799 
800     for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
801         oss << fOutputJobStepAssociation.outAt(i);
802 
803     oss << endl;
804 
805     return oss.str();
806 }
807 
808 
809 // class TupleConstantBooleanStep
TupleConstantBooleanStep(const JobInfo & jobInfo,bool value)810 TupleConstantBooleanStep::TupleConstantBooleanStep(const JobInfo& jobInfo, bool value) :
811     TupleConstantStep(jobInfo),
812     fValue(value)
813 {
814 //	fExtendedInfo = "TCBS: ";
815 }
816 
817 
~TupleConstantBooleanStep()818 TupleConstantBooleanStep::~TupleConstantBooleanStep()
819 {
820 }
821 
822 
initialize(const RowGroup & rgIn,const JobInfo &)823 void TupleConstantBooleanStep::initialize(const RowGroup& rgIn, const JobInfo&)
824 {
825     fRowGroupOut = rgIn;
826     fRowGroupOut.initRow(&fRowOut);
827     fRowGroupOut.initRow(&fRowConst, true);
828 }
829 
830 
run()831 void TupleConstantBooleanStep::run()
832 {
833     if (fDelivery == false)
834     {
835         if (fOutputJobStepAssociation.outSize() == 0)
836             throw logic_error("No output data list for non-delivery constant step.");
837 
838         fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
839 
840         if (fOutputDL == NULL)
841             throw logic_error("Output is not a RowGroup data list.");
842 
843         if (traceOn())
844         {
845             dlTimes.setFirstReadTime();
846             dlTimes.setLastReadTime();
847             dlTimes.setEndOfInputTime();
848             printCalTrace();
849         }
850 
851         // Bug 3136, let mini stats to be formatted if traceOn.
852         fOutputDL->endOfInput();
853     }
854 }
855 
856 
nextBand(messageqcpp::ByteStream & bs)857 uint32_t TupleConstantBooleanStep::nextBand(messageqcpp::ByteStream& bs)
858 {
859     // send an empty band
860     RGData rgData(fRowGroupOut, 0);
861     fRowGroupOut.setData(&rgData);
862     fRowGroupOut.resetRowGroup(0);
863     fRowGroupOut.setStatus(status());
864     fRowGroupOut.serializeRGData(bs);
865 
866     if (traceOn())
867     {
868         dlTimes.setFirstReadTime();
869         dlTimes.setLastReadTime();
870         dlTimes.setEndOfInputTime();
871         printCalTrace();
872     }
873 
874     return 0;
875 }
876 
877 
toString() const878 const string TupleConstantBooleanStep::toString() const
879 {
880     ostringstream oss;
881     oss << "ConstantBooleanStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
882 
883     oss << " out:";
884 
885     for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
886         oss << fOutputJobStepAssociation.outAt(i);
887 
888     oss << endl;
889 
890     return oss.str();
891 }
892 
893 
894 }   //namespace
895 // vim:ts=4 sw=4:
896 
897