1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: tupleconstantstep.cpp 9649 2013-06-25 16:08:05Z xlou $
20
21
22 //#define NDEBUG
23 #include <cassert>
24 #include <sstream>
25 #include <iomanip>
26 using namespace std;
27
28 #include <boost/shared_ptr.hpp>
29 #include <boost/shared_array.hpp>
30 #include <boost/uuid/uuid_io.hpp>
31 using namespace boost;
32
33 #include "messagequeue.h"
34 using namespace messageqcpp;
35
36 #include "loggingid.h"
37 #include "errorcodes.h"
38 #include "idberrorinfo.h"
39 #include "errorids.h"
40 using namespace logging;
41
42 #include "calpontsystemcatalog.h"
43 #include "constantcolumn.h"
44 using namespace execplan;
45
46 #include "jobstep.h"
47 #include "rowgroup.h"
48 using namespace rowgroup;
49
50 #include "querytele.h"
51 using namespace querytele;
52
53 #include "jlf_common.h"
54 #include "tupleconstantstep.h"
55
56
57 namespace joblist
58 {
59 // static utility method
addConstantStep(const JobInfo & jobInfo,const rowgroup::RowGroup * rg)60 SJSTEP TupleConstantStep::addConstantStep(const JobInfo& jobInfo, const rowgroup::RowGroup* rg)
61 {
62 TupleConstantStep* tcs = NULL;
63
64 if (jobInfo.constantCol != CONST_COL_ONLY)
65 {
66 tcs = new TupleConstantStep(jobInfo);
67 }
68 else
69 {
70 tcs = new TupleConstantOnlyStep(jobInfo);
71 }
72
73 tcs->initialize(jobInfo, rg);
74 SJSTEP spcs(tcs);
75 return spcs;
76 }
77
78
TupleConstantStep(const JobInfo & jobInfo)79 TupleConstantStep::TupleConstantStep(const JobInfo& jobInfo) :
80 JobStep(jobInfo),
81 fRowsReturned(0),
82 fInputDL(NULL),
83 fOutputDL(NULL),
84 fInputIterator(0),
85 fRunner(0),
86 fEndOfResult(false)
87 {
88 fExtendedInfo = "TCS: ";
89 fQtc.stepParms().stepType = StepTeleStats::T_TCS;
90 }
91
92
~TupleConstantStep()93 TupleConstantStep::~TupleConstantStep()
94 {
95 }
96
97
setOutputRowGroup(const rowgroup::RowGroup & rg)98 void TupleConstantStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
99 {
100 throw runtime_error("Disabled, use initialize() to set output RowGroup.");
101 }
102
103
initialize(const JobInfo & jobInfo,const RowGroup * rgIn)104 void TupleConstantStep::initialize(const JobInfo& jobInfo, const RowGroup* rgIn)
105 {
106 vector<uint32_t> oids, oidsIn = fRowGroupIn.getOIDs();
107 vector<uint32_t> keys, keysIn = fRowGroupIn.getKeys();
108 vector<uint32_t> scale, scaleIn = fRowGroupIn.getScale();
109 vector<uint32_t> precision, precisionIn = fRowGroupIn.getPrecision();
110 vector<CalpontSystemCatalog::ColDataType> types, typesIn = fRowGroupIn.getColTypes();
111 vector<uint32_t> csNums, csNumsIn = fRowGroupIn.getCharsetNumbers();
112 vector<uint32_t> pos;
113 pos.push_back(2);
114
115 if (rgIn)
116 {
117 fRowGroupIn = *rgIn;
118 fRowGroupIn.initRow(&fRowIn);
119 oidsIn = fRowGroupIn.getOIDs();
120 keysIn = fRowGroupIn.getKeys();
121 scaleIn = fRowGroupIn.getScale();
122 precisionIn = fRowGroupIn.getPrecision();
123 typesIn = fRowGroupIn.getColTypes();
124 csNumsIn = fRowGroupIn.getCharsetNumbers();
125 }
126
127 for (uint64_t i = 0, j = 0; i < jobInfo.deliveredCols.size(); i++)
128 {
129 const ConstantColumn* cc =
130 dynamic_cast<const ConstantColumn*>(jobInfo.deliveredCols[i].get());
131
132 if (cc != NULL)
133 {
134 CalpontSystemCatalog::ColType ct = cc->resultType();
135
136 if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
137 ct.colWidth++;
138
139 //Round colWidth up
140 if (ct.colWidth == 3)
141 ct.colWidth = 4;
142 else if (ct.colWidth == 5 || ct.colWidth == 6 || ct.colWidth == 7)
143 ct.colWidth = 8;
144
145 oids.push_back(-1);
146 keys.push_back(-1);
147 scale.push_back(ct.scale);
148 precision.push_back(ct.precision);
149 types.push_back(ct.colDataType);
150 csNums.push_back(ct.charsetNumber);
151 pos.push_back(pos.back() + ct.colWidth);
152
153 fIndexConst.push_back(i);
154 }
155 else
156 {
157 // select (select a) from region;
158 if (j >= oidsIn.size() && jobInfo.tableList.empty())
159 {
160 throw IDBExcept(ERR_NO_FROM);
161 }
162
163 idbassert(j < oidsIn.size());
164
165 oids.push_back(oidsIn[j]);
166 keys.push_back(keysIn[j]);
167 scale.push_back(scaleIn[j]);
168 precision.push_back(precisionIn[j]);
169 types.push_back(typesIn[j]);
170 csNums.push_back(csNumsIn[j]);
171 pos.push_back(pos.back() + fRowGroupIn.getColumnWidth(j));
172 j++;
173
174 fIndexMapping.push_back(i);
175 }
176 }
177
178 fRowGroupOut = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision,
179 jobInfo.stringTableThreshold);
180 fRowGroupOut.initRow(&fRowOut);
181 fRowGroupOut.initRow(&fRowConst, true);
182
183 constructContanstRow(jobInfo);
184 }
185
186
constructContanstRow(const JobInfo & jobInfo)187 void TupleConstantStep::constructContanstRow(const JobInfo& jobInfo)
188 {
189 // construct a row with only the constant values
190 fConstRowData.reset(new uint8_t[fRowConst.getSize()]);
191 fRowConst.setData(fConstRowData.get());
192 fRowConst.initToNull(); // make sure every col is init'd to something, because later we copy the whole row
193 const vector<CalpontSystemCatalog::ColDataType>& types = fRowGroupOut.getColTypes();
194
195 for (vector<uint64_t>::iterator i = fIndexConst.begin(); i != fIndexConst.end(); i++)
196 {
197 const ConstantColumn* cc =
198 dynamic_cast<const ConstantColumn*>(jobInfo.deliveredCols[*i].get());
199 const execplan::Result c = cc->result();
200
201 if (cc->type() == ConstantColumn::NULLDATA)
202 {
203 if (types[*i] == CalpontSystemCatalog::CHAR ||
204 types[*i] == CalpontSystemCatalog::VARCHAR ||
205 types[*i] == CalpontSystemCatalog::TEXT)
206 {
207 fRowConst.setStringField("", *i);
208 }
209 else if (isUnsigned(types[*i]))
210 {
211 fRowConst.setUintField(fRowConst.getNullValue(*i), *i);
212 }
213 else
214 {
215 fRowConst.setIntField(fRowConst.getSignedNullValue(*i), *i);
216 }
217
218 continue;
219 }
220
221
222 switch (types[*i])
223 {
224 case CalpontSystemCatalog::TINYINT:
225 case CalpontSystemCatalog::SMALLINT:
226 case CalpontSystemCatalog::MEDINT:
227 case CalpontSystemCatalog::INT:
228 case CalpontSystemCatalog::BIGINT:
229 case CalpontSystemCatalog::DATE:
230 case CalpontSystemCatalog::DATETIME:
231 case CalpontSystemCatalog::TIME:
232 case CalpontSystemCatalog::TIMESTAMP:
233 {
234 fRowConst.setIntField(c.intVal, *i);
235 break;
236 }
237
238 case CalpontSystemCatalog::DECIMAL:
239 case CalpontSystemCatalog::UDECIMAL:
240 {
241 fRowConst.setIntField(c.decimalVal.value, *i);
242 break;
243 }
244
245 case CalpontSystemCatalog::FLOAT:
246 case CalpontSystemCatalog::UFLOAT:
247 {
248 fRowConst.setFloatField(c.floatVal, *i);
249 break;
250 }
251
252 case CalpontSystemCatalog::DOUBLE:
253 case CalpontSystemCatalog::UDOUBLE:
254 {
255 fRowConst.setDoubleField(c.doubleVal, *i);
256 break;
257 }
258
259 case CalpontSystemCatalog::LONGDOUBLE:
260 {
261 fRowConst.setLongDoubleField(c.longDoubleVal, *i);
262 break;
263 }
264
265 case CalpontSystemCatalog::CHAR:
266 case CalpontSystemCatalog::VARCHAR:
267 case CalpontSystemCatalog::TEXT:
268 {
269 fRowConst.setStringField(c.strVal, *i);
270 break;
271 }
272
273 case CalpontSystemCatalog::UTINYINT:
274 case CalpontSystemCatalog::USMALLINT:
275 case CalpontSystemCatalog::UMEDINT:
276 case CalpontSystemCatalog::UINT:
277 case CalpontSystemCatalog::UBIGINT:
278 {
279 fRowConst.setUintField(c.uintVal, *i);
280 break;
281 }
282
283 default:
284 {
285 throw runtime_error("un-supported constant column type.");
286 break;
287 }
288 } // switch
289 } // for constant columns
290 }
291
292
run()293 void TupleConstantStep::run()
294 {
295 if (fInputJobStepAssociation.outSize() == 0)
296 throw logic_error("No input data list for constant step.");
297
298 fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
299
300 if (fInputDL == NULL)
301 throw logic_error("Input is not a RowGroup data list.");
302
303 fInputIterator = fInputDL->getIterator();
304
305 if (fDelivery == false)
306 {
307 if (fOutputJobStepAssociation.outSize() == 0)
308 throw logic_error("No output data list for non-delivery constant step.");
309
310 fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
311
312 if (fOutputDL == NULL)
313 throw logic_error("Output is not a RowGroup data list.");
314
315 fRunner = jobstepThreadPool.invoke(Runner(this));
316 }
317 }
318
319
join()320 void TupleConstantStep::join()
321 {
322 if (fRunner)
323 jobstepThreadPool.join(fRunner);
324 }
325
326
nextBand(messageqcpp::ByteStream & bs)327 uint32_t TupleConstantStep::nextBand(messageqcpp::ByteStream& bs)
328 {
329 RGData rgDataIn;
330 RGData rgDataOut;
331 bool more = false;
332 uint32_t rowCount = 0;
333
334 try
335 {
336 bs.restart();
337
338 more = fInputDL->next(fInputIterator, &rgDataIn);
339
340 if (traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
341 dlTimes.setFirstReadTime();
342
343 if (!more && cancelled())
344 {
345 fEndOfResult = true;
346 }
347
348 if (more && !fEndOfResult)
349 {
350 fRowGroupIn.setData(&rgDataIn);
351 rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
352 fRowGroupOut.setData(&rgDataOut);
353
354 fillInConstants();
355 fRowGroupOut.serializeRGData(bs);
356 rowCount = fRowGroupOut.getRowCount();
357 }
358 else
359 {
360 fEndOfResult = true;
361 }
362 }
363 catch (...)
364 {
365 handleException(std::current_exception(),
366 logging::tupleConstantStepErr,
367 logging::ERR_ALWAYS_CRITICAL,
368 "TupleConstantStep::nextBand()");
369 while (more)
370 more = fInputDL->next(fInputIterator, &rgDataIn);
371
372 fEndOfResult = true;
373 }
374
375 if (fEndOfResult)
376 {
377 // send an empty / error band
378 RGData rgData(fRowGroupOut, 0);
379 fRowGroupOut.setData(&rgData);
380 fRowGroupOut.resetRowGroup(0);
381 fRowGroupOut.setStatus(status());
382 fRowGroupOut.serializeRGData(bs);
383
384 if (traceOn())
385 {
386 dlTimes.setLastReadTime();
387 dlTimes.setEndOfInputTime();
388 printCalTrace();
389 }
390 }
391
392 return rowCount;
393 }
394
395
execute()396 void TupleConstantStep::execute()
397 {
398 RGData rgDataIn;
399 RGData rgDataOut;
400 bool more = false;
401 StepTeleStats sts;
402 sts.query_uuid = fQueryUuid;
403 sts.step_uuid = fStepUuid;
404
405 try
406 {
407 more = fInputDL->next(fInputIterator, &rgDataIn);
408
409 if (traceOn()) dlTimes.setFirstReadTime();
410
411 sts.msg_type = StepTeleStats::ST_START;
412 sts.total_units_of_work = 1;
413 postStepStartTele(sts);
414
415 if (!more && cancelled())
416 {
417 fEndOfResult = true;
418 }
419
420 while (more && !fEndOfResult)
421 {
422 fRowGroupIn.setData(&rgDataIn);
423 rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
424 fRowGroupOut.setData(&rgDataOut);
425
426 fillInConstants();
427
428 more = fInputDL->next(fInputIterator, &rgDataIn);
429
430 if (cancelled())
431 {
432 fEndOfResult = true;
433 }
434 else
435 {
436 fOutputDL->insert(rgDataOut);
437 }
438 }
439 }
440 catch (...)
441 {
442 handleException(std::current_exception(),
443 logging::tupleConstantStepErr,
444 logging::ERR_ALWAYS_CRITICAL,
445 "TupleConstantStep::execute()");
446 }
447
448 while (more)
449 more = fInputDL->next(fInputIterator, &rgDataIn);
450
451 sts.msg_type = StepTeleStats::ST_SUMMARY;
452 sts.total_units_of_work = sts.units_of_work_completed = 1;
453 sts.rows = fRowsReturned;
454 postStepSummaryTele(sts);
455
456 // Bug 3136, let mini stats to be formatted if traceOn.
457 if (traceOn())
458 {
459 dlTimes.setLastReadTime();
460 dlTimes.setEndOfInputTime();
461 printCalTrace();
462 }
463
464 fEndOfResult = true;
465 fOutputDL->endOfInput();
466 }
467
468 // *DRRTUY Copy row at once not one field at a time
fillInConstants()469 void TupleConstantStep::fillInConstants()
470 {
471 fRowGroupIn.getRow(0, &fRowIn);
472 fRowGroupOut.getRow(0, &fRowOut);
473
474 if (fIndexConst.size() > 1 || fIndexConst[0] != 0)
475 {
476 for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
477 {
478 copyRow(fRowConst, &fRowOut);
479
480 fRowOut.setRid(fRowIn.getRelRid());
481
482 for (uint64_t j = 0; j < fIndexMapping.size(); ++j)
483 fRowIn.copyField(fRowOut, fIndexMapping[j], j);
484
485 fRowIn.nextRow();
486 fRowOut.nextRow();
487 }
488 }
489 else // only first column is constant
490 {
491 for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
492 {
493 fRowOut.setRid(fRowIn.getRelRid());
494 fRowConst.copyField(fRowOut, 0, 0);
495
496 for (uint32_t i = 1; i < fRowOut.getColumnCount(); i++)
497 fRowIn.copyField(fRowOut, i, i - 1);
498
499 fRowIn.nextRow();
500 fRowOut.nextRow();
501 }
502 }
503
504 fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
505 fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
506 fRowsReturned += fRowGroupOut.getRowCount();
507 }
508
509
fillInConstants(const rowgroup::Row & rowIn,rowgroup::Row & rowOut)510 void TupleConstantStep::fillInConstants(const rowgroup::Row& rowIn, rowgroup::Row& rowOut)
511 {
512 if (fIndexConst.size() > 1 || fIndexConst[0] != 0)
513 {
514 copyRow(fRowConst, &rowOut);
515 //memcpy(rowOut.getData(), fRowConst.getData(), fRowConst.getSize());
516 rowOut.setRid(rowIn.getRelRid());
517
518 for (uint64_t j = 0; j < fIndexMapping.size(); ++j)
519 rowIn.copyField(rowOut, fIndexMapping[j], j);
520
521 //rowIn.copyField(rowOut.getData() + rowOut.getOffset(fIndexMapping[j]), j);
522 }
523 else // only first column is constant
524 {
525 //size_t n = rowOut.getOffset(rowOut.getColumnCount()) - rowOut.getOffset(1);
526 rowOut.setRid(rowIn.getRelRid());
527 fRowConst.copyField(rowOut, 0, 0);
528
529 //fRowConst.copyField(rowOut.getData()+2, 0); // hardcoded 2 for rid length
530 for (uint32_t i = 1; i < rowOut.getColumnCount(); i++)
531 rowIn.copyField(rowOut, i, i - 1);
532
533 //memcpy(rowOut.getData()+rowOut.getOffset(1), rowIn.getData()+2, n);
534 }
535 }
536
537
getOutputRowGroup() const538 const RowGroup& TupleConstantStep::getOutputRowGroup() const
539 {
540 return fRowGroupOut;
541 }
542
543
getDeliveredRowGroup() const544 const RowGroup& TupleConstantStep::getDeliveredRowGroup() const
545 {
546 return fRowGroupOut;
547 }
548
549
deliverStringTableRowGroup(bool b)550 void TupleConstantStep::deliverStringTableRowGroup(bool b)
551 {
552 fRowGroupOut.setUseStringTable(b);
553 }
554
555
deliverStringTableRowGroup() const556 bool TupleConstantStep::deliverStringTableRowGroup() const
557 {
558 return fRowGroupOut.usesStringTable();
559 }
560
561
toString() const562 const string TupleConstantStep::toString() const
563 {
564 ostringstream oss;
565 oss << "ConstantStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
566
567 oss << " in:";
568
569 for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
570 oss << fInputJobStepAssociation.outAt(i);
571
572 oss << " out:";
573
574 for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
575 oss << fOutputJobStepAssociation.outAt(i);
576
577 oss << endl;
578
579 return oss.str();
580 }
581
582
printCalTrace()583 void TupleConstantStep::printCalTrace()
584 {
585 time_t t = time (0);
586 char timeString[50];
587 ctime_r (&t, timeString);
588 timeString[strlen (timeString ) - 1] = '\0';
589 ostringstream logStr;
590 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
591 << "; total rows returned-" << fRowsReturned << endl
592 << "\t1st read " << dlTimes.FirstReadTimeString()
593 << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
594 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
595 << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
596 << "\tJob completion status " << status() << endl;
597 logEnd(logStr.str().c_str());
598 fExtendedInfo += logStr.str();
599 formatMiniStats();
600 }
601
602
formatMiniStats()603 void TupleConstantStep::formatMiniStats()
604 {
605 ostringstream oss;
606 oss << "TCS "
607 << "UM "
608 << "- "
609 << "- "
610 << "- "
611 << "- "
612 << "- "
613 << "- "
614 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
615 << fRowsReturned << " ";
616 fMiniInfo += oss.str();
617 }
618
619
620 // class TupleConstantOnlyStep
TupleConstantOnlyStep(const JobInfo & jobInfo)621 TupleConstantOnlyStep::TupleConstantOnlyStep(const JobInfo& jobInfo) : TupleConstantStep(jobInfo)
622 {
623 // fExtendedInfo = "TCOS: ";
624 }
625
626
~TupleConstantOnlyStep()627 TupleConstantOnlyStep::~TupleConstantOnlyStep()
628 {
629 }
630
631
632 //void TupleConstantOnlyStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
initialize(const JobInfo & jobInfo,const rowgroup::RowGroup * rgIn)633 void TupleConstantOnlyStep::initialize(const JobInfo& jobInfo, const rowgroup::RowGroup* rgIn)
634 {
635 vector<uint32_t> oids;
636 vector<uint32_t> keys;
637 vector<uint32_t> scale;
638 vector<uint32_t> precision;
639 vector<CalpontSystemCatalog::ColDataType> types;
640 vector<uint32_t> csNums;
641 vector<uint32_t> pos;
642 pos.push_back(2);
643
644 deliverStringTableRowGroup(false);
645
646 for (uint64_t i = 0; i < jobInfo.deliveredCols.size(); i++)
647 {
648 const ConstantColumn* cc =
649 dynamic_cast<const ConstantColumn*>(jobInfo.deliveredCols[i].get());
650
651 if (cc == NULL)
652 throw runtime_error("none constant column found.");
653
654 CalpontSystemCatalog::ColType ct = cc->resultType();
655
656 if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
657 ct.colWidth++;
658
659 //Round colWidth up
660 if (ct.colWidth == 3)
661 ct.colWidth = 4;
662 else if (ct.colWidth == 5 || ct.colWidth == 6 || ct.colWidth == 7)
663 ct.colWidth = 8;
664
665 oids.push_back(-1);
666 keys.push_back(-1);
667 scale.push_back(ct.scale);
668 precision.push_back(ct.precision);
669 types.push_back(ct.colDataType);
670 csNums.push_back(ct.charsetNumber);
671 pos.push_back(pos.back() + ct.colWidth);
672
673 fIndexConst.push_back(i);
674 }
675
676 fRowGroupOut = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold, false);
677 fRowGroupOut.initRow(&fRowOut);
678 fRowGroupOut.initRow(&fRowConst, true);
679
680 constructContanstRow(jobInfo);
681 }
682
683
run()684 void TupleConstantOnlyStep::run()
685 {
686 if (fDelivery == false)
687 {
688 if (fOutputJobStepAssociation.outSize() == 0)
689 throw logic_error("No output data list for non-delivery constant step.");
690
691 fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
692
693 if (fOutputDL == NULL)
694 throw logic_error("Output is not a RowGroup data list.");
695
696 try
697 {
698 RGData rgDataOut(fRowGroupOut, 1);
699 fRowGroupOut.setData(&rgDataOut);
700
701 if (traceOn()) dlTimes.setFirstReadTime();
702
703 fillInConstants();
704
705 fOutputDL->insert(rgDataOut);
706 }
707 catch (...)
708 {
709 handleException(std::current_exception(),
710 logging::tupleConstantStepErr,
711 logging::ERR_ALWAYS_CRITICAL,
712 "TupleConstantOnlyStep::run()");
713 }
714
715 if (traceOn())
716 {
717 dlTimes.setLastReadTime();
718 dlTimes.setEndOfInputTime();
719 printCalTrace();
720 }
721
722 // Bug 3136, let mini stats to be formatted if traceOn.
723 fEndOfResult = true;
724 fOutputDL->endOfInput();
725 }
726 }
727
728
nextBand(messageqcpp::ByteStream & bs)729 uint32_t TupleConstantOnlyStep::nextBand(messageqcpp::ByteStream& bs)
730 {
731 RGData rgDataOut;
732 uint32_t rowCount = 0;
733
734 if (!fEndOfResult)
735 {
736 try
737 {
738 bs.restart();
739
740 if (traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
741 dlTimes.setFirstReadTime();
742
743 rgDataOut.reinit(fRowGroupOut, 1);
744 fRowGroupOut.setData(&rgDataOut);
745
746 fillInConstants();
747 fRowGroupOut.serializeRGData(bs);
748 rowCount = fRowGroupOut.getRowCount();
749 }
750 catch (...)
751 {
752 handleException(std::current_exception(),
753 logging::tupleConstantStepErr,
754 logging::ERR_ALWAYS_CRITICAL,
755 "TupleConstantOnlyStep::nextBand()");
756 }
757
758 fEndOfResult = true;
759 }
760 else
761 {
762 // send an empty / error band
763 RGData rgData(fRowGroupOut, 0);
764 fRowGroupOut.setData(&rgData);
765 fRowGroupOut.resetRowGroup(0);
766 fRowGroupOut.setStatus(status());
767 fRowGroupOut.serializeRGData(bs);
768
769 if (traceOn())
770 {
771 dlTimes.setLastReadTime();
772 dlTimes.setEndOfInputTime();
773 printCalTrace();
774 }
775 }
776
777 return rowCount;
778 }
779
780
fillInConstants()781 void TupleConstantOnlyStep::fillInConstants()
782 {
783 fRowGroupOut.getRow(0, &fRowOut);
784 idbassert(fRowConst.getColumnCount() == fRowOut.getColumnCount());
785 fRowOut.usesStringTable(fRowConst.usesStringTable());
786 copyRow(fRowConst, &fRowOut);
787 fRowGroupOut.resetRowGroup(0);
788 fRowGroupOut.setRowCount(1);
789 fRowsReturned = 1;
790 }
791
792
toString() const793 const string TupleConstantOnlyStep::toString() const
794 {
795 ostringstream oss;
796 oss << "ConstantOnlyStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
797
798 oss << " out:";
799
800 for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
801 oss << fOutputJobStepAssociation.outAt(i);
802
803 oss << endl;
804
805 return oss.str();
806 }
807
808
809 // class TupleConstantBooleanStep
TupleConstantBooleanStep(const JobInfo & jobInfo,bool value)810 TupleConstantBooleanStep::TupleConstantBooleanStep(const JobInfo& jobInfo, bool value) :
811 TupleConstantStep(jobInfo),
812 fValue(value)
813 {
814 // fExtendedInfo = "TCBS: ";
815 }
816
817
~TupleConstantBooleanStep()818 TupleConstantBooleanStep::~TupleConstantBooleanStep()
819 {
820 }
821
822
initialize(const RowGroup & rgIn,const JobInfo &)823 void TupleConstantBooleanStep::initialize(const RowGroup& rgIn, const JobInfo&)
824 {
825 fRowGroupOut = rgIn;
826 fRowGroupOut.initRow(&fRowOut);
827 fRowGroupOut.initRow(&fRowConst, true);
828 }
829
830
run()831 void TupleConstantBooleanStep::run()
832 {
833 if (fDelivery == false)
834 {
835 if (fOutputJobStepAssociation.outSize() == 0)
836 throw logic_error("No output data list for non-delivery constant step.");
837
838 fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
839
840 if (fOutputDL == NULL)
841 throw logic_error("Output is not a RowGroup data list.");
842
843 if (traceOn())
844 {
845 dlTimes.setFirstReadTime();
846 dlTimes.setLastReadTime();
847 dlTimes.setEndOfInputTime();
848 printCalTrace();
849 }
850
851 // Bug 3136, let mini stats to be formatted if traceOn.
852 fOutputDL->endOfInput();
853 }
854 }
855
856
nextBand(messageqcpp::ByteStream & bs)857 uint32_t TupleConstantBooleanStep::nextBand(messageqcpp::ByteStream& bs)
858 {
859 // send an empty band
860 RGData rgData(fRowGroupOut, 0);
861 fRowGroupOut.setData(&rgData);
862 fRowGroupOut.resetRowGroup(0);
863 fRowGroupOut.setStatus(status());
864 fRowGroupOut.serializeRGData(bs);
865
866 if (traceOn())
867 {
868 dlTimes.setFirstReadTime();
869 dlTimes.setLastReadTime();
870 dlTimes.setEndOfInputTime();
871 printCalTrace();
872 }
873
874 return 0;
875 }
876
877
toString() const878 const string TupleConstantBooleanStep::toString() const
879 {
880 ostringstream oss;
881 oss << "ConstantBooleanStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
882
883 oss << " out:";
884
885 for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
886 oss << fOutputJobStepAssociation.outAt(i);
887
888 oss << endl;
889
890 return oss.str();
891 }
892
893
894 } //namespace
895 // vim:ts=4 sw=4:
896
897