1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: tupleannexstep.cpp 9661 2013-07-01 20:33:05Z pleblanc $
20 
21 
22 //#define NDEBUG
23 #include <cassert>
24 #include <sstream>
25 #include <iomanip>
26 #ifdef _MSC_VER
27 #include <unordered_set>
28 #else
29 #include <tr1/unordered_set>
30 #endif
31 using namespace std;
32 
33 #include <boost/shared_ptr.hpp>
34 #include <boost/shared_array.hpp>
35 #include <boost/uuid/uuid_io.hpp>
36 using namespace boost;
37 
38 #include "messagequeue.h"
39 using namespace messageqcpp;
40 
41 #include "loggingid.h"
42 #include "errorcodes.h"
43 using namespace logging;
44 
45 #include "calpontsystemcatalog.h"
46 #include "constantcolumn.h"
47 #include "simplecolumn.h"
48 using namespace execplan;
49 
50 #include "rowgroup.h"
51 using namespace rowgroup;
52 
53 #include "hasher.h"
54 #include "stlpoolallocator.h"
55 #include "threadnaming.h"
56 using namespace utils;
57 
58 #include "querytele.h"
59 using namespace querytele;
60 
61 #include "funcexp.h"
62 #include "jobstep.h"
63 #include "jlf_common.h"
64 #include "tupleconstantstep.h"
65 #include "limitedorderby.h"
66 
67 #include "tupleannexstep.h"
68 
69 #define QUEUE_RESERVE_SIZE 100000
70 
71 namespace
72 {
73 struct TAHasher
74 {
75     joblist::TupleAnnexStep* ts;
76     utils::Hasher_r h;
TAHasher__anon44a169c20111::TAHasher77     TAHasher(joblist::TupleAnnexStep* t) : ts(t) { }
78     uint64_t operator()(const rowgroup::Row::Pointer&) const;
79 };
80 struct TAEq
81 {
82     joblist::TupleAnnexStep* ts;
TAEq__anon44a169c20111::TAEq83     TAEq(joblist::TupleAnnexStep* t) : ts(t) { }
84     bool operator()(const rowgroup::Row::Pointer&, const rowgroup::Row::Pointer&) const;
85 };
86 //TODO:  Generalize these and put them back in utils/common/hasher.h
87 typedef tr1::unordered_set<rowgroup::Row::Pointer, TAHasher, TAEq,
88         STLPoolAllocator<rowgroup::Row::Pointer> > DistinctMap_t;
89 };
90 
operator ()(const Row::Pointer & p) const91 inline uint64_t TAHasher::operator()(const Row::Pointer& p) const
92 {
93     Row& row = ts->row1;
94     row.setPointer(p);
95     return row.hash();
96 }
97 
operator ()(const Row::Pointer & d1,const Row::Pointer & d2) const98 inline bool TAEq::operator()(const Row::Pointer& d1, const Row::Pointer& d2) const
99 {
100     Row& r1 = ts->row1, &r2 = ts->row2;
101     r1.setPointer(d1);
102     r2.setPointer(d2);
103     return r1.equals(r2);
104 }
105 
106 namespace joblist
107 {
108 
TupleAnnexStep(const JobInfo & jobInfo)109 TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) :
110     JobStep(jobInfo),
111     fInputDL(NULL),
112     fOutputDL(NULL),
113     fInputIterator(0),
114     fOutputIterator(0),
115     fRunner(0),
116     fRowsProcessed(0),
117     fRowsReturned(0),
118     fLimitStart(0),
119     fLimitCount(-1),
120     fLimitHit(false),
121     fEndOfResult(false),
122     fDistinct(false),
123     fParallelOp(false),
124     fOrderBy(NULL),
125     fConstant(NULL),
126     fFeInstance(funcexp::FuncExp::instance()),
127     fJobList(jobInfo.jobListPtr),
128     fFinishedThreads(0)
129 {
130     fExtendedInfo = "TNS: ";
131     fQtc.stepParms().stepType = StepTeleStats::T_TNS;
132 }
133 
134 
~TupleAnnexStep()135 TupleAnnexStep::~TupleAnnexStep()
136 {
137     if(fParallelOp)
138     {
139         if(fOrderByList.size() > 0)
140         {
141             for(uint64_t id = 0; id < fOrderByList.size(); id++)
142             {
143                 delete fOrderByList[id];
144             }
145 
146             fOrderByList.clear();
147         }
148 
149         fInputIteratorsList.clear();
150         fRunnersList.clear();
151     }
152 
153     if (fOrderBy)
154         delete fOrderBy;
155 
156     fOrderBy = NULL;
157 
158     if (fConstant)
159         delete fConstant;
160 
161     fConstant = NULL;
162 }
163 
164 
setOutputRowGroup(const rowgroup::RowGroup & rg)165 void TupleAnnexStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
166 {
167     throw runtime_error("Disabled, use initialize() to set output RowGroup.");
168 }
169 
170 
initialize(const RowGroup & rgIn,const JobInfo & jobInfo)171 void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
172 {
173     // Initialize structures used by separate workers
174     uint64_t id = 1;
175     fRowGroupIn = rgIn;
176     fRowGroupIn.initRow(&fRowIn);
177     if(fParallelOp && fOrderBy)
178     {
179         fOrderByList.resize(fMaxThreads+1);
180         for(id = 0; id <= fMaxThreads; id++)
181         {
182             // *DRRTUY use SP here?
183             fOrderByList[id] = new LimitedOrderBy();
184             fOrderByList[id]->distinct(fDistinct);
185             fOrderByList[id]->initialize(rgIn, jobInfo, false, true);
186         }
187     }
188     else
189     {
190         if (fOrderBy)
191         {
192             fOrderBy->distinct(fDistinct);
193             fOrderBy->initialize(rgIn, jobInfo);
194         }
195     }
196 
197     if (fConstant == NULL)
198     {
199         vector<uint32_t> oids, oidsIn = rgIn.getOIDs();
200         vector<uint32_t> keys, keysIn = rgIn.getKeys();
201         vector<uint32_t> scale, scaleIn = rgIn.getScale();
202         vector<uint32_t> precision, precisionIn = rgIn.getPrecision();
203         vector<CalpontSystemCatalog::ColDataType> types, typesIn = rgIn.getColTypes();
204         vector<uint32_t> csNums, csNumsIn = rgIn.getCharsetNumbers();
205         vector<uint32_t> pos, posIn = rgIn.getOffsets();
206         size_t n = jobInfo.nonConstDelCols.size();
207 
208         // Add all columns into output RG as keys. Can we put only keys?
209         oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + n);
210         keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + n);
211         scale.insert(scale.end(), scaleIn.begin(), scaleIn.begin() + n);
212         precision.insert(precision.end(), precisionIn.begin(), precisionIn.begin() + n);
213         types.insert(types.end(), typesIn.begin(), typesIn.begin() + n);
214         csNums.insert(csNums.end(), csNumsIn.begin(), csNumsIn.begin() + n);
215         pos.insert(pos.end(), posIn.begin(), posIn.begin() + n + 1);
216 
217         fRowGroupOut = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
218     }
219     else
220     {
221         fConstant->initialize(jobInfo, &rgIn);
222         fRowGroupOut = fConstant->getOutputRowGroup();
223     }
224 
225     fRowGroupOut.initRow(&fRowOut);
226     fRowGroupDeliver = fRowGroupOut;
227 }
228 
229 
run()230 void TupleAnnexStep::run()
231 {
232     if (fInputJobStepAssociation.outSize() == 0)
233         throw logic_error("No input data list for annex step.");
234 
235     fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
236 
237     if (fInputDL == NULL)
238         throw logic_error("Input is not a RowGroup data list.");
239 
240     if (fOutputJobStepAssociation.outSize() == 0)
241         throw logic_error("No output data list for annex step.");
242 
243     fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
244 
245     if (fOutputDL == NULL)
246         throw logic_error("Output is not a RowGroup data list.");
247 
248     if (fDelivery == true)
249     {
250         fOutputIterator = fOutputDL->getIterator();
251     }
252 
253     if(fParallelOp)
254     {
255         // Indexing begins with 1
256         fRunnersList.resize(fMaxThreads);
257         fInputIteratorsList.resize(fMaxThreads+1);
258 
259         // Activate stats collecting before CS spawns threads.
260         if (traceOn()) dlTimes.setFirstReadTime();
261 
262         // *DRRTUY Make this block conditional
263         StepTeleStats sts;
264         sts.query_uuid = fQueryUuid;
265         sts.step_uuid = fStepUuid;
266         sts.msg_type = StepTeleStats::ST_START;
267         sts.total_units_of_work = 1;
268         postStepStartTele(sts);
269 
270         for(uint32_t id = 1; id <= fMaxThreads; id++)
271         {
272             fInputIteratorsList[id] = fInputDL->getIterator();
273             fRunnersList[id-1] = jobstepThreadPool.invoke(Runner(this, id));
274         }
275     }
276     else
277     {
278         fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
279 
280         if (fInputDL == NULL)
281             throw logic_error("Input is not a RowGroup data list.");
282 
283         fInputIterator = fInputDL->getIterator();
284         fRunner = jobstepThreadPool.invoke(Runner(this));
285     }
286 
287 }
288 
join()289 void TupleAnnexStep::join()
290 {
291     if(fParallelOp)
292     {
293         jobstepThreadPool.join(fRunnersList);
294     }
295     else
296     {
297         if (fRunner)
298         {
299             jobstepThreadPool.join(fRunner);
300         }
301     }
302 
303 }
304 
nextBand(messageqcpp::ByteStream & bs)305 uint32_t TupleAnnexStep::nextBand(messageqcpp::ByteStream& bs)
306 {
307     RGData rgDataOut;
308     bool more = false;
309     uint32_t rowCount = 0;
310 
311     try
312     {
313         bs.restart();
314 
315         more = fOutputDL->next(fOutputIterator, &rgDataOut);
316 
317         if (more && !cancelled())
318         {
319             fRowGroupDeliver.setData(&rgDataOut);
320             fRowGroupDeliver.serializeRGData(bs);
321             rowCount = fRowGroupDeliver.getRowCount();
322         }
323         else
324         {
325             while (more)
326                 more = fOutputDL->next(fOutputIterator, &rgDataOut);
327 
328             fEndOfResult = true;
329         }
330     }
331     catch (...)
332     {
333         handleException(std::current_exception(),
334                         logging::ERR_IN_DELIVERY,
335                         logging::ERR_ALWAYS_CRITICAL,
336                         "TupleAnnexStep::nextBand()");
337         while (more)
338             more = fOutputDL->next(fOutputIterator, &rgDataOut);
339 
340         fEndOfResult = true;
341     }
342 
343     if (fEndOfResult)
344     {
345         // send an empty / error band
346         rgDataOut.reinit(fRowGroupDeliver, 0);
347         fRowGroupDeliver.setData(&rgDataOut);
348         fRowGroupDeliver.resetRowGroup(0);
349         fRowGroupDeliver.setStatus(status());
350         fRowGroupDeliver.serializeRGData(bs);
351     }
352 
353     return rowCount;
354 }
355 
356 
execute()357 void TupleAnnexStep::execute()
358 {
359     if (fOrderBy)
360         executeWithOrderBy();
361     else if (fDistinct)
362         executeNoOrderByWithDistinct();
363     else
364         executeNoOrderBy();
365 
366     StepTeleStats sts;
367     sts.query_uuid = fQueryUuid;
368     sts.step_uuid = fStepUuid;
369     sts.msg_type = StepTeleStats::ST_SUMMARY;
370     sts.total_units_of_work = sts.units_of_work_completed = 1;
371     sts.rows = fRowsReturned;
372     postStepSummaryTele(sts);
373 
374     if (traceOn())
375     {
376         if (dlTimes.FirstReadTime().tv_sec == 0)
377             dlTimes.setFirstReadTime();
378 
379         dlTimes.setLastReadTime();
380         dlTimes.setEndOfInputTime();
381         printCalTrace();
382     }
383 }
384 
execute(uint32_t id)385 void TupleAnnexStep::execute(uint32_t id)
386 {
387     if(fOrderByList[id])
388         executeParallelOrderBy(id);
389 
390 }
391 
executeNoOrderBy()392 void TupleAnnexStep::executeNoOrderBy()
393 {
394     utils::setThreadName("TASwoOrd");
395     RGData rgDataIn;
396     RGData rgDataOut;
397     bool more = false;
398 
399     try
400     {
401         more = fInputDL->next(fInputIterator, &rgDataIn);
402 
403         if (traceOn()) dlTimes.setFirstReadTime();
404 
405         StepTeleStats sts;
406         sts.query_uuid = fQueryUuid;
407         sts.step_uuid = fStepUuid;
408         sts.msg_type = StepTeleStats::ST_START;
409         sts.total_units_of_work = 1;
410         postStepStartTele(sts);
411 
412         while (more && !cancelled() && !fLimitHit)
413         {
414             fRowGroupIn.setData(&rgDataIn);
415             fRowGroupIn.getRow(0, &fRowIn);
416             // Get a new output rowgroup for each input rowgroup to preserve the rids
417             rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
418             fRowGroupOut.setData(&rgDataOut);
419             fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
420             fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
421             fRowGroupOut.getRow(0, &fRowOut);
422 
423             for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i)
424             {
425                 // skip first limit-start rows
426                 if (fRowsProcessed++ < fLimitStart)
427                 {
428                     fRowIn.nextRow();
429                     continue;
430                 }
431 
432                 if (UNLIKELY(fRowsReturned >= fLimitCount))
433                 {
434                     fLimitHit = true;
435                     fJobList->abortOnLimit((JobStep*) this);
436                     continue;
437                 }
438 
439                 if (fConstant)
440                     fConstant->fillInConstants(fRowIn, fRowOut);
441                 else
442                     copyRow(fRowIn, &fRowOut);
443 
444                 fRowGroupOut.incRowCount();
445 
446                 if (++fRowsReturned < fLimitCount)
447                 {
448                     fRowOut.nextRow();
449                     fRowIn.nextRow();
450                 }
451             }
452 
453             if (fRowGroupOut.getRowCount() > 0)
454             {
455                 fOutputDL->insert(rgDataOut);
456             }
457 
458             more = fInputDL->next(fInputIterator, &rgDataIn);
459         }
460     }
461     catch (...)
462     {
463         handleException(std::current_exception(),
464                         logging::ERR_IN_PROCESS,
465                         logging::ERR_ALWAYS_CRITICAL,
466                         "TupleAnnexStep::executeNoOrderBy()");
467     }
468 
469     while (more)
470         more = fInputDL->next(fInputIterator, &rgDataIn);
471 
472     // Bug 3136, let mini stats to be formatted if traceOn.
473     fOutputDL->endOfInput();
474 }
475 
476 
executeNoOrderByWithDistinct()477 void TupleAnnexStep::executeNoOrderByWithDistinct()
478 {
479     utils::setThreadName("TASwoOrdDist");
480     scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
481     vector<RGData> dataVec;
482     vector<RGData> dataVecSkip;
483     RGData rgDataIn;
484     RGData rgDataOut;
485     RGData rgDataSkip;
486     RowGroup rowGroupSkip;
487     Row rowSkip;
488     bool more = false;
489 
490     rgDataOut.reinit(fRowGroupOut);
491     fRowGroupOut.setData(&rgDataOut);
492     fRowGroupOut.resetRowGroup(0);
493     fRowGroupOut.getRow(0, &fRowOut);
494 
495     fRowGroupOut.initRow(&row1);
496     fRowGroupOut.initRow(&row2);
497 
498     rowGroupSkip = fRowGroupOut;
499     rgDataSkip.reinit(rowGroupSkip);
500     rowGroupSkip.setData(&rgDataSkip);
501     rowGroupSkip.resetRowGroup(0);
502     rowGroupSkip.initRow(&rowSkip);
503     rowGroupSkip.getRow(0, &rowSkip);
504 
505     try
506     {
507         more = fInputDL->next(fInputIterator, &rgDataIn);
508 
509         if (traceOn()) dlTimes.setFirstReadTime();
510 
511         StepTeleStats sts;
512         sts.query_uuid = fQueryUuid;
513         sts.step_uuid = fStepUuid;
514         sts.msg_type = StepTeleStats::ST_START;
515         sts.total_units_of_work = 1;
516         postStepStartTele(sts);
517 
518         while (more && !cancelled() && !fLimitHit)
519         {
520             fRowGroupIn.setData(&rgDataIn);
521             fRowGroupIn.getRow(0, &fRowIn);
522 
523             for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i)
524             {
525                 pair<DistinctMap_t::iterator, bool> inserted;
526                 Row* rowPtr;
527 
528                 if (distinctMap->size() < fLimitStart)
529                     rowPtr = &rowSkip;
530                 else
531                     rowPtr = &fRowOut;
532 
533                 if (fConstant)
534                     fConstant->fillInConstants(fRowIn, *rowPtr);
535                 else
536                     copyRow(fRowIn, rowPtr);
537 
538                 fRowIn.nextRow();
539                 inserted = distinctMap->insert(rowPtr->getPointer());
540                 ++fRowsProcessed;
541 
542                 if (inserted.second)
543                 {
544                     if (UNLIKELY(fRowsReturned >= fLimitCount))
545                     {
546                         fLimitHit = true;
547                         fJobList->abortOnLimit((JobStep*) this);
548                         break;
549                     }
550 
551                     // skip first limit-start rows
552                     if (distinctMap->size() <= fLimitStart)
553                     {
554                         rowGroupSkip.incRowCount();
555                         rowSkip.nextRow();
556                         if (UNLIKELY(rowGroupSkip.getRowCount() >= rowgroup::rgCommonSize))
557                         {
558                             // allocate new RGData for skipped rows below the fLimitStart
559                             // offset (do not take it into account in RM assuming there
560                             // are few skipped rows)
561                             dataVecSkip.push_back(rgDataSkip);
562                             rgDataSkip.reinit(rowGroupSkip);
563                             rowGroupSkip.setData(&rgDataSkip);
564                             rowGroupSkip.resetRowGroup(0);
565                             rowGroupSkip.getRow(0, &rowSkip);
566                         }
567                         continue;
568                     }
569 
570                     ++fRowsReturned;
571 
572                     fRowGroupOut.incRowCount();
573                     fRowOut.nextRow();
574 
575                     if (UNLIKELY(fRowGroupOut.getRowCount() >= rowgroup::rgCommonSize))
576                     {
577                         dataVec.push_back(rgDataOut);
578                         rgDataOut.reinit(fRowGroupOut);
579                         fRowGroupOut.setData(&rgDataOut);
580                         fRowGroupOut.resetRowGroup(0);
581                         fRowGroupOut.getRow(0, &fRowOut);
582                     }
583                 }
584             }
585 
586             more = fInputDL->next(fInputIterator, &rgDataIn);
587         }
588 
589         if (fRowGroupOut.getRowCount() > 0)
590             dataVec.push_back(rgDataOut);
591 
592         for (vector<RGData>::iterator i = dataVec.begin(); i != dataVec.end(); i++)
593         {
594             rgDataOut = *i;
595             fRowGroupOut.setData(&rgDataOut);
596             fOutputDL->insert(rgDataOut);
597         }
598     }
599     catch (...)
600     {
601         handleException(std::current_exception(),
602                         logging::ERR_IN_PROCESS,
603                         logging::ERR_ALWAYS_CRITICAL,
604                         "TupleAnnexStep::executeNoOrderByWithDistinct()");
605     }
606 
607     while (more)
608         more = fInputDL->next(fInputIterator, &rgDataIn);
609 
610     // Bug 3136, let mini stats to be formatted if traceOn.
611     fOutputDL->endOfInput();
612 }
613 
614 
executeWithOrderBy()615 void TupleAnnexStep::executeWithOrderBy()
616 {
617     utils::setThreadName("TASwOrd");
618     RGData rgDataIn;
619     RGData rgDataOut;
620     bool more = false;
621 
622     try
623     {
624         more = fInputDL->next(fInputIterator, &rgDataIn);
625 
626         if (traceOn()) dlTimes.setFirstReadTime();
627 
628         StepTeleStats sts;
629         sts.query_uuid = fQueryUuid;
630         sts.step_uuid = fStepUuid;
631         sts.msg_type = StepTeleStats::ST_START;
632         sts.total_units_of_work = 1;
633         postStepStartTele(sts);
634 
635         while (more && !cancelled())
636         {
637             fRowGroupIn.setData(&rgDataIn);
638             fRowGroupIn.getRow(0, &fRowIn);
639 
640             for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled(); ++i)
641             {
642                 fOrderBy->processRow(fRowIn);
643                 fRowIn.nextRow();
644             }
645 
646             more = fInputDL->next(fInputIterator, &rgDataIn);
647         }
648 
649         fOrderBy->finalize();
650 
651         if (!cancelled())
652         {
653             while (fOrderBy->getData(rgDataIn))
654             {
655                 if (fConstant == NULL &&
656                         fRowGroupOut.getColumnCount() == fRowGroupIn.getColumnCount())
657                 {
658                     rgDataOut = rgDataIn;
659                     fRowGroupOut.setData(&rgDataOut);
660                 }
661                 else
662                 {
663                     fRowGroupIn.setData(&rgDataIn);
664                     fRowGroupIn.getRow(0, &fRowIn);
665 
666                     rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
667                     fRowGroupOut.setData(&rgDataOut);
668                     fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
669                     fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
670                     fRowGroupOut.getRow(0, &fRowOut);
671 
672                     for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
673                     {
674                         if (fConstant)
675                             fConstant->fillInConstants(fRowIn, fRowOut);
676                         else
677                             copyRow(fRowIn, &fRowOut);
678 
679                         fRowGroupOut.incRowCount();
680                         fRowOut.nextRow();
681                         fRowIn.nextRow();
682                     }
683                 }
684 
685                 if (fRowGroupOut.getRowCount() > 0)
686                 {
687                     fRowsReturned += fRowGroupOut.getRowCount();
688                     fOutputDL->insert(rgDataOut);
689                 }
690             }
691         }
692     }
693     catch (...)
694     {
695         handleException(std::current_exception(),
696                         logging::ERR_IN_PROCESS,
697                         logging::ERR_ALWAYS_CRITICAL,
698                         "TupleAnnexStep::executeWithOrderBy()");
699     }
700 
701     while (more)
702         more = fInputDL->next(fInputIterator, &rgDataIn);
703 
704     // Bug 3136, let mini stats to be formatted if traceOn.
705     fOutputDL->endOfInput();
706 }
707 
/*
    finalizeParallelOrderByDistinct() iterates over the per-thread
    LimitedOrderBy (LOB) instances, reverts their ordering rules and
    then populates the final collection used for the final sort,
    dropping duplicate rows on the way. It works with OrderByRow,
    which is a combination of Row::data and the comparison rules.
    Once it has finished with the per-thread LOBs, it iterates over
    the final sorting collection, populates rgDataOut and sends it
    to the output DL.
    When changing this method, don't forget to make the same changes
    in finalizeParallelOrderBy(), which is a clone.
    !!! The method doesn't set Row::baseRid.
*/
finalizeParallelOrderByDistinct()720 void TupleAnnexStep::finalizeParallelOrderByDistinct()
721 {
722     utils::setThreadName("TASwParOrdDistM");
723     uint64_t count = 0;
724     uint64_t offset = 0;
725     uint32_t rowSize = 0;
726 
727     rowgroup::RGData rgDataOut;
728     rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
729     fRowGroupOut.setData(&rgDataOut);
730     fRowGroupOut.resetRowGroup(0);
731     // Calculate offset here
732     fRowGroupOut.getRow(0, &fRowOut);
733     ordering::SortingPQ finalPQ;
734     scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
735     fRowGroupIn.initRow(&row1);
736     fRowGroupIn.initRow(&row2);
737 
738     try
739     {
740         for(uint64_t id = 1; id <= fMaxThreads; id++)
741         {
742             if (cancelled())
743             {
744                 break;
745             }
746             // Revert the ordering rules before we
747             // add rows into the final PQ.
748             fOrderByList[id]->getRule().revertRules();
749             ordering::SortingPQ &currentPQ = fOrderByList[id]->getQueue();
750             finalPQ.reserve(finalPQ.size()+currentPQ.size());
751             pair<DistinctMap_t::iterator, bool> inserted;
752             while (currentPQ.size())
753             {
754                 ordering::OrderByRow &topOBRow =
755                     const_cast<ordering::OrderByRow&>(currentPQ.top());
756                 inserted = distinctMap->insert(topOBRow.fData);
757                 if (inserted.second)
758                 {
759                     finalPQ.push(topOBRow);
760                 }
761                 currentPQ.pop();
762             }
763         }
764     }
765     catch (...)
766     {
767         handleException(std::current_exception(),
768                         logging::ERR_IN_PROCESS,
769                         logging::ERR_ALWAYS_CRITICAL,
770                         "TupleAnnexStep::finalizeParallelOrderByDistinct()");
771     }
772 
773     // OFFSET processing
774     while (finalPQ.size() && offset < fLimitStart)
775     {
776         offset++;
777         finalPQ.pop();
778     }
779 
780     // Calculate rowSize only once
781     if (finalPQ.size())
782     {
783         ordering::OrderByRow& topOBRow =
784             const_cast<ordering::OrderByRow&>(finalPQ.top());
785         fRowIn.setData(topOBRow.fData);
786         if (!fConstant)
787         {
788             copyRow(fRowIn, &fRowOut);
789         }
790         else
791         {
792             fConstant->fillInConstants(fRowIn, fRowOut);
793         }
794         rowSize = fRowOut.getSize();
795         fRowGroupOut.incRowCount();
796         fRowOut.nextRow(rowSize);
797         finalPQ.pop();
798         count++;
799     }
800 
801     if (!fConstant)
802     {
803         while(finalPQ.size())
804         {
805             if (cancelled())
806             {
807                 break;
808             }
809 
810             while (count < fLimitCount && finalPQ.size()
811                     && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
812             {
813                 ordering::OrderByRow &topOBRow =
814                     const_cast<ordering::OrderByRow&>(finalPQ.top());
815 
816                 fRowIn.setData(topOBRow.fData);
817                 copyRow(fRowIn, &fRowOut);
818                 fRowGroupOut.incRowCount();
819                 fRowOut.nextRow(rowSize);
820 
821                 finalPQ.pop();
822                 count++;
823                 if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
824                 {
825                     break;
826                 }
827             }
828 
829             if (fRowGroupOut.getRowCount() > 0)
830             {
831                 fRowsReturned += fRowGroupOut.getRowCount();
832                 fOutputDL->insert(rgDataOut);
833                 rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
834                 fRowGroupOut.setData(&rgDataOut);
835                 fRowGroupOut.resetRowGroup(0);
836                 fRowGroupOut.getRow(0, &fRowOut);
837             }
838             else
839             {
840                 break;
841             }
842         } // end of limit bound while loop
843     }
844     else // Add ConstantColumns striped earlier
845     {
846         while(finalPQ.size())
847         {
848             if (cancelled())
849             {
850                 break;
851             }
852 
853             while (count < fLimitCount && finalPQ.size()
854                     && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
855             {
856                 ordering::OrderByRow &topOBRow =
857                     const_cast<ordering::OrderByRow&>(finalPQ.top());
858 
859                 fRowIn.setData(topOBRow.fData);
860                 fConstant->fillInConstants(fRowIn, fRowOut);
861                 fRowGroupOut.incRowCount();
862                 fRowOut.nextRow(rowSize);
863 
864                 finalPQ.pop();
865                 count++;
866                 if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
867                 {
868                     break;
869                 }
870             }
871 
872             if (fRowGroupOut.getRowCount() > 0)
873             {
874                 fRowsReturned += fRowGroupOut.getRowCount();
875                 fOutputDL->insert(rgDataOut);
876                 rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
877                 fRowGroupOut.setData(&rgDataOut);
878                 fRowGroupOut.resetRowGroup(0);
879                 fRowGroupOut.getRow(0, &fRowOut);
880             }
881             else
882             {
883                     break;
884             }
885         } // end of limit bound while loop
886     } // end of if-else
887 
888     if (fRowGroupOut.getRowCount() > 0)
889     {
890         fRowsReturned += fRowGroupOut.getRowCount();
891         fOutputDL->insert(rgDataOut);
892     }
893 
894     fOutputDL->endOfInput();
895 
896     StepTeleStats sts;
897     sts.query_uuid = fQueryUuid;
898     sts.step_uuid = fStepUuid;
899     sts.msg_type = StepTeleStats::ST_SUMMARY;
900     sts.total_units_of_work = sts.units_of_work_completed = 1;
901     sts.rows = fRowsReturned;
902     postStepSummaryTele(sts);
903 
904     if (traceOn())
905     {
906         if (dlTimes.FirstReadTime().tv_sec == 0)
907             dlTimes.setFirstReadTime();
908 
909         dlTimes.setLastReadTime();
910         dlTimes.setEndOfInputTime();
911         printCalTrace();
912     }
913 }
914 
/*
    finalizeParallelOrderBy() iterates over the per-thread
    LimitedOrderBy (LOB) instances, reverts their ordering rules and
    then populates the final collection used for the final sort.
    It works with OrderByRow, which is a combination of Row::data
    and the comparison rules.
    Once it has finished with the per-thread LOBs, it iterates over
    the final sorting collection, populates rgDataOut and sends it
    to the output DL.
    When changing this method, don't forget to make the same changes
    in finalizeParallelOrderByDistinct(), which is a clone.
    !!! The method doesn't set Row::baseRid.
*/
finalizeParallelOrderBy()927 void TupleAnnexStep::finalizeParallelOrderBy()
928 {
929     utils::setThreadName("TASwParOrdMerge");
930     uint64_t count = 0;
931     uint64_t offset = 0;
932     uint32_t rowSize = 0;
933 
934     rowgroup::RGData rgDataOut;
935     ordering::SortingPQ finalPQ;
936     rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
937     fRowGroupOut.setData(&rgDataOut);
938     fRowGroupOut.resetRowGroup(0);
939     // Calculate offset here
940     fRowGroupOut.getRow(0, &fRowOut);
941 
942     try
943     {
944         for(uint64_t id = 1; id <= fMaxThreads; id++)
945         {
946             if (cancelled())
947             {
948                 break;
949             }
950             // Revert the ordering rules before we
951             // add rows into the final PQ.
952             fOrderByList[id]->getRule().revertRules();
953             ordering::SortingPQ &currentPQ = fOrderByList[id]->getQueue();
954             finalPQ.reserve(currentPQ.size());
955             while (currentPQ.size())
956             {
957                 ordering::OrderByRow &topOBRow =
958                     const_cast<ordering::OrderByRow&>(currentPQ.top());
959                 finalPQ.push(topOBRow);
960                 currentPQ.pop();
961             }
962         }
963     }
964     catch (...)
965     {
966         handleException(std::current_exception(),
967                         logging::ERR_IN_PROCESS,
968                         logging::ERR_ALWAYS_CRITICAL,
969                         "TupleAnnexStep::finalizeParallelOrderBy()");
970     }
971 
972     // OFFSET processing
973     while (finalPQ.size() && offset < fLimitStart)
974     {
975         offset++;
976         finalPQ.pop();
977     }
978 
979     // Calculate rowSize only once
980     if (finalPQ.size())
981     {
982         ordering::OrderByRow& topOBRow =
983             const_cast<ordering::OrderByRow&>(finalPQ.top());
984         fRowIn.setData(topOBRow.fData);
985         if (!fConstant)
986         {
987             copyRow(fRowIn, &fRowOut);
988         }
989         else
990         {
991             fConstant->fillInConstants(fRowIn, fRowOut);
992         }
993         rowSize = fRowOut.getSize();
994         fRowGroupOut.incRowCount();
995         fRowOut.nextRow(rowSize);
996         finalPQ.pop();
997         count++;
998     }
999 
1000     if (!fConstant)
1001     {
1002         while(finalPQ.size())
1003         {
1004             if (cancelled())
1005             {
1006                 break;
1007             }
1008 
1009             while (count < fLimitCount && finalPQ.size()
1010                     && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
1011             {
1012                 ordering::OrderByRow &topOBRow =
1013                     const_cast<ordering::OrderByRow&>(finalPQ.top());
1014 
1015                 fRowIn.setData(topOBRow.fData);
1016                 copyRow(fRowIn, &fRowOut);
1017                 fRowGroupOut.incRowCount();
1018                 fRowOut.nextRow(rowSize);
1019 
1020                 finalPQ.pop();
1021                 count++;
1022             }
1023 
1024             if (fRowGroupOut.getRowCount() > 0)
1025             {
1026                 fRowsReturned += fRowGroupOut.getRowCount();
1027                 fOutputDL->insert(rgDataOut);
1028                 rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
1029                 fRowGroupOut.setData(&rgDataOut);
1030                 fRowGroupOut.resetRowGroup(0);
1031                 fRowGroupOut.getRow(0, &fRowOut);
1032             }
1033             else
1034             {
1035                 break;
1036             }
1037         } // end of limit bound while loop
1038     }
1039     else // Add ConstantColumns striped earlier
1040     {
1041         while(finalPQ.size())
1042         {
1043             if (cancelled())
1044             {
1045                 break;
1046             }
1047 
1048             while (count < fLimitCount && finalPQ.size()
1049                     && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
1050             {
1051                 ordering::OrderByRow &topOBRow =
1052                     const_cast<ordering::OrderByRow&>(finalPQ.top());
1053 
1054                 fRowIn.setData(topOBRow.fData);
1055                 fConstant->fillInConstants(fRowIn, fRowOut);
1056                 fRowGroupOut.incRowCount();
1057                 fRowOut.nextRow(rowSize);
1058 
1059                 finalPQ.pop();
1060                 count++;
1061                 if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
1062                 {
1063                     break;
1064                 }
1065             }
1066 
1067             if (fRowGroupOut.getRowCount() > 0)
1068             {
1069                 fRowsReturned += fRowGroupOut.getRowCount();
1070                 fOutputDL->insert(rgDataOut);
1071                 rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
1072                 fRowGroupOut.setData(&rgDataOut);
1073                 fRowGroupOut.resetRowGroup(0);
1074                 fRowGroupOut.getRow(0, &fRowOut);
1075             }
1076             else
1077             {
1078                 break;
1079             }
1080         } // end of limit bound while loop
1081     } // end of if-else
1082 
1083     if (fRowGroupOut.getRowCount() > 0)
1084     {
1085         fRowsReturned += fRowGroupOut.getRowCount();
1086         fOutputDL->insert(rgDataOut);
1087     }
1088 
1089     fOutputDL->endOfInput();
1090 
1091     StepTeleStats sts;
1092     sts.query_uuid = fQueryUuid;
1093     sts.step_uuid = fStepUuid;
1094     sts.msg_type = StepTeleStats::ST_SUMMARY;
1095     sts.total_units_of_work = sts.units_of_work_completed = 1;
1096     sts.rows = fRowsReturned;
1097     postStepSummaryTele(sts);
1098 
1099     if (traceOn())
1100     {
1101         if (dlTimes.FirstReadTime().tv_sec == 0)
1102             dlTimes.setFirstReadTime();
1103 
1104         dlTimes.setLastReadTime();
1105         dlTimes.setEndOfInputTime();
1106         printCalTrace();
1107     }
1108 }
1109 
executeParallelOrderBy(uint64_t id)1110 void TupleAnnexStep::executeParallelOrderBy(uint64_t id)
1111 {
1112     utils::setThreadName("TASwParOrd");
1113     RGData rgDataIn;
1114     RGData rgDataOut;
1115     bool more = false;
1116     uint64_t dlOffset = 0;
1117     uint32_t rowSize = 0;
1118 
1119     uint64_t rowCount = 0;
1120     uint64_t doubleRGSize = 2*rowgroup::rgCommonSize;
1121     rowgroup::Row r = fRowIn;
1122     rowgroup::RowGroup rg = fRowGroupIn;
1123     rg.initRow(&r);
1124     LimitedOrderBy *limOrderBy = fOrderByList[id];
1125     ordering::SortingPQ &currentPQ = limOrderBy->getQueue();
1126     if (limOrderBy->getLimitCount() < QUEUE_RESERVE_SIZE)
1127     {
1128         currentPQ.reserve(limOrderBy->getLimitCount());
1129     }
1130     else
1131     {
1132         currentPQ.reserve(QUEUE_RESERVE_SIZE);
1133     }
1134 
1135     try
1136     {
1137         more = fInputDL->next(fInputIteratorsList[id], &rgDataIn);
1138         if (more) dlOffset++;
1139 
1140         while (more && !cancelled())
1141         {
1142             if (dlOffset%fMaxThreads == id-1)
1143             {
1144                 if (cancelled())
1145                     break;
1146 
1147                 if (currentPQ.capacity()-currentPQ.size() < doubleRGSize)
1148                 {
1149                     currentPQ.reserve(QUEUE_RESERVE_SIZE);
1150                 }
1151 
1152                 rg.setData(&rgDataIn);
1153                 rg.getRow(0, &r);
1154                 if (!rowSize)
1155                 {
1156                     rowSize = r.getSize();
1157                 }
1158                 rowCount = rg.getRowCount();
1159 
1160                 for (uint64_t i = 0; i < rowCount; ++i)
1161                 {
1162                     limOrderBy->processRow(r);
1163                     r.nextRow(rowSize);
1164                }
1165             }
1166 
1167             // *DRRTUY Implement a method to skip elements in FIFO
1168             more = fInputDL->next(fInputIteratorsList[id], &rgDataIn);
1169             if(more) dlOffset++;
1170         }
1171     }
1172     catch (...)
1173     {
1174         handleException(std::current_exception(),
1175                         logging::ERR_IN_PROCESS,
1176                         logging::ERR_ALWAYS_CRITICAL,
1177                         "TupleAnnexStep::executeParallelOrderBy()");
1178     }
1179 
1180     // read out the input DL
1181     while (more)
1182         more = fInputDL->next(fInputIteratorsList[id], &rgDataIn);
1183 
1184     // Count finished sorting threads under mutex and run final
1185     // sort step when the last thread converges
1186     fParallelFinalizeMutex.lock();
1187     fFinishedThreads++;
1188     if (fFinishedThreads == fMaxThreads)
1189     {
1190         fParallelFinalizeMutex.unlock();
1191         if(fDistinct)
1192         {
1193             finalizeParallelOrderByDistinct();
1194         }
1195         else
1196         {
1197             finalizeParallelOrderBy();
1198         }
1199     }
1200     else
1201     {
1202         fParallelFinalizeMutex.unlock();
1203     }
1204 }
1205 
getOutputRowGroup() const1206 const RowGroup& TupleAnnexStep::getOutputRowGroup() const
1207 {
1208     return fRowGroupOut;
1209 }
1210 
1211 
getDeliveredRowGroup() const1212 const RowGroup& TupleAnnexStep::getDeliveredRowGroup() const
1213 {
1214     return fRowGroupDeliver;
1215 }
1216 
1217 
deliverStringTableRowGroup(bool b)1218 void TupleAnnexStep::deliverStringTableRowGroup(bool b)
1219 {
1220     fRowGroupOut.setUseStringTable(b);
1221     fRowGroupDeliver.setUseStringTable(b);
1222 }
1223 
1224 
deliverStringTableRowGroup() const1225 bool TupleAnnexStep::deliverStringTableRowGroup() const
1226 {
1227     idbassert(fRowGroupOut.usesStringTable() == fRowGroupDeliver.usesStringTable());
1228     return fRowGroupDeliver.usesStringTable();
1229 }
1230 
1231 
toString() const1232 const string TupleAnnexStep::toString() const
1233 {
1234     ostringstream oss;
1235     oss << "AnnexStep ";
1236     oss << "  ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
1237 
1238     oss << " in:";
1239 
1240     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
1241         oss << fInputJobStepAssociation.outAt(i);
1242 
1243     oss << " out:";
1244 
1245     for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
1246         oss << fOutputJobStepAssociation.outAt(i);
1247 
1248     if (fOrderBy)
1249         oss << "    " << fOrderBy->toString();
1250 
1251     if (fConstant)
1252         oss << "    " << fConstant->toString();
1253 
1254     oss << endl;
1255 
1256     return oss.str();
1257 }
1258 
1259 
printCalTrace()1260 void TupleAnnexStep::printCalTrace()
1261 {
1262     time_t t = time (0);
1263     char timeString[50];
1264     ctime_r (&t, timeString);
1265     timeString[strlen (timeString ) - 1] = '\0';
1266     ostringstream logStr;
1267     logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
1268             << "; total rows returned-" << fRowsReturned << endl
1269             << "\t1st read " << dlTimes.FirstReadTimeString()
1270             << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
1271             << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
1272             << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
1273             << "\tJob completion status " << status() << endl;
1274     logEnd(logStr.str().c_str());
1275     fExtendedInfo += logStr.str();
1276     formatMiniStats();
1277 }
1278 
1279 
formatMiniStats()1280 void TupleAnnexStep::formatMiniStats()
1281 {
1282     ostringstream oss;
1283     oss << "TNS ";
1284     oss << "UM "
1285         << "- "
1286         << "- "
1287         << "- "
1288         << "- "
1289         << "- "
1290         << "- "
1291         << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
1292         << fRowsReturned << " ";
1293     fMiniInfo += oss.str();
1294 }
1295 
1296 
1297 }   //namespace
1298 // vim:ts=4 sw=4:
1299 
1300