1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: tupleannexstep.cpp 9661 2013-07-01 20:33:05Z pleblanc $
20
21
22 //#define NDEBUG
23 #include <cassert>
24 #include <sstream>
25 #include <iomanip>
26 #ifdef _MSC_VER
27 #include <unordered_set>
28 #else
29 #include <tr1/unordered_set>
30 #endif
31 using namespace std;
32
33 #include <boost/shared_ptr.hpp>
34 #include <boost/shared_array.hpp>
35 #include <boost/uuid/uuid_io.hpp>
36 using namespace boost;
37
38 #include "messagequeue.h"
39 using namespace messageqcpp;
40
41 #include "loggingid.h"
42 #include "errorcodes.h"
43 using namespace logging;
44
45 #include "calpontsystemcatalog.h"
46 #include "constantcolumn.h"
47 #include "simplecolumn.h"
48 using namespace execplan;
49
50 #include "rowgroup.h"
51 using namespace rowgroup;
52
53 #include "hasher.h"
54 #include "stlpoolallocator.h"
55 #include "threadnaming.h"
56 using namespace utils;
57
58 #include "querytele.h"
59 using namespace querytele;
60
61 #include "funcexp.h"
62 #include "jobstep.h"
63 #include "jlf_common.h"
64 #include "tupleconstantstep.h"
65 #include "limitedorderby.h"
66
67 #include "tupleannexstep.h"
68
69 #define QUEUE_RESERVE_SIZE 100000
70
71 namespace
72 {
73 struct TAHasher
74 {
75 joblist::TupleAnnexStep* ts;
76 utils::Hasher_r h;
TAHasher__anon44a169c20111::TAHasher77 TAHasher(joblist::TupleAnnexStep* t) : ts(t) { }
78 uint64_t operator()(const rowgroup::Row::Pointer&) const;
79 };
80 struct TAEq
81 {
82 joblist::TupleAnnexStep* ts;
TAEq__anon44a169c20111::TAEq83 TAEq(joblist::TupleAnnexStep* t) : ts(t) { }
84 bool operator()(const rowgroup::Row::Pointer&, const rowgroup::Row::Pointer&) const;
85 };
86 //TODO: Generalize these and put them back in utils/common/hasher.h
87 typedef tr1::unordered_set<rowgroup::Row::Pointer, TAHasher, TAEq,
88 STLPoolAllocator<rowgroup::Row::Pointer> > DistinctMap_t;
89 };
90
operator ()(const Row::Pointer & p) const91 inline uint64_t TAHasher::operator()(const Row::Pointer& p) const
92 {
93 Row& row = ts->row1;
94 row.setPointer(p);
95 return row.hash();
96 }
97
operator ()(const Row::Pointer & d1,const Row::Pointer & d2) const98 inline bool TAEq::operator()(const Row::Pointer& d1, const Row::Pointer& d2) const
99 {
100 Row& r1 = ts->row1, &r2 = ts->row2;
101 r1.setPointer(d1);
102 r2.setPointer(d2);
103 return r1.equals(r2);
104 }
105
106 namespace joblist
107 {
108
TupleAnnexStep(const JobInfo & jobInfo)109 TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) :
110 JobStep(jobInfo),
111 fInputDL(NULL),
112 fOutputDL(NULL),
113 fInputIterator(0),
114 fOutputIterator(0),
115 fRunner(0),
116 fRowsProcessed(0),
117 fRowsReturned(0),
118 fLimitStart(0),
119 fLimitCount(-1),
120 fLimitHit(false),
121 fEndOfResult(false),
122 fDistinct(false),
123 fParallelOp(false),
124 fOrderBy(NULL),
125 fConstant(NULL),
126 fFeInstance(funcexp::FuncExp::instance()),
127 fJobList(jobInfo.jobListPtr),
128 fFinishedThreads(0)
129 {
130 fExtendedInfo = "TNS: ";
131 fQtc.stepParms().stepType = StepTeleStats::T_TNS;
132 }
133
134
~TupleAnnexStep()135 TupleAnnexStep::~TupleAnnexStep()
136 {
137 if(fParallelOp)
138 {
139 if(fOrderByList.size() > 0)
140 {
141 for(uint64_t id = 0; id < fOrderByList.size(); id++)
142 {
143 delete fOrderByList[id];
144 }
145
146 fOrderByList.clear();
147 }
148
149 fInputIteratorsList.clear();
150 fRunnersList.clear();
151 }
152
153 if (fOrderBy)
154 delete fOrderBy;
155
156 fOrderBy = NULL;
157
158 if (fConstant)
159 delete fConstant;
160
161 fConstant = NULL;
162 }
163
164
setOutputRowGroup(const rowgroup::RowGroup & rg)165 void TupleAnnexStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
166 {
167 throw runtime_error("Disabled, use initialize() to set output RowGroup.");
168 }
169
170
initialize(const RowGroup & rgIn,const JobInfo & jobInfo)171 void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
172 {
173 // Initialize structures used by separate workers
174 uint64_t id = 1;
175 fRowGroupIn = rgIn;
176 fRowGroupIn.initRow(&fRowIn);
177 if(fParallelOp && fOrderBy)
178 {
179 fOrderByList.resize(fMaxThreads+1);
180 for(id = 0; id <= fMaxThreads; id++)
181 {
182 // *DRRTUY use SP here?
183 fOrderByList[id] = new LimitedOrderBy();
184 fOrderByList[id]->distinct(fDistinct);
185 fOrderByList[id]->initialize(rgIn, jobInfo, false, true);
186 }
187 }
188 else
189 {
190 if (fOrderBy)
191 {
192 fOrderBy->distinct(fDistinct);
193 fOrderBy->initialize(rgIn, jobInfo);
194 }
195 }
196
197 if (fConstant == NULL)
198 {
199 vector<uint32_t> oids, oidsIn = rgIn.getOIDs();
200 vector<uint32_t> keys, keysIn = rgIn.getKeys();
201 vector<uint32_t> scale, scaleIn = rgIn.getScale();
202 vector<uint32_t> precision, precisionIn = rgIn.getPrecision();
203 vector<CalpontSystemCatalog::ColDataType> types, typesIn = rgIn.getColTypes();
204 vector<uint32_t> csNums, csNumsIn = rgIn.getCharsetNumbers();
205 vector<uint32_t> pos, posIn = rgIn.getOffsets();
206 size_t n = jobInfo.nonConstDelCols.size();
207
208 // Add all columns into output RG as keys. Can we put only keys?
209 oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + n);
210 keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + n);
211 scale.insert(scale.end(), scaleIn.begin(), scaleIn.begin() + n);
212 precision.insert(precision.end(), precisionIn.begin(), precisionIn.begin() + n);
213 types.insert(types.end(), typesIn.begin(), typesIn.begin() + n);
214 csNums.insert(csNums.end(), csNumsIn.begin(), csNumsIn.begin() + n);
215 pos.insert(pos.end(), posIn.begin(), posIn.begin() + n + 1);
216
217 fRowGroupOut = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
218 }
219 else
220 {
221 fConstant->initialize(jobInfo, &rgIn);
222 fRowGroupOut = fConstant->getOutputRowGroup();
223 }
224
225 fRowGroupOut.initRow(&fRowOut);
226 fRowGroupDeliver = fRowGroupOut;
227 }
228
229
run()230 void TupleAnnexStep::run()
231 {
232 if (fInputJobStepAssociation.outSize() == 0)
233 throw logic_error("No input data list for annex step.");
234
235 fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
236
237 if (fInputDL == NULL)
238 throw logic_error("Input is not a RowGroup data list.");
239
240 if (fOutputJobStepAssociation.outSize() == 0)
241 throw logic_error("No output data list for annex step.");
242
243 fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
244
245 if (fOutputDL == NULL)
246 throw logic_error("Output is not a RowGroup data list.");
247
248 if (fDelivery == true)
249 {
250 fOutputIterator = fOutputDL->getIterator();
251 }
252
253 if(fParallelOp)
254 {
255 // Indexing begins with 1
256 fRunnersList.resize(fMaxThreads);
257 fInputIteratorsList.resize(fMaxThreads+1);
258
259 // Activate stats collecting before CS spawns threads.
260 if (traceOn()) dlTimes.setFirstReadTime();
261
262 // *DRRTUY Make this block conditional
263 StepTeleStats sts;
264 sts.query_uuid = fQueryUuid;
265 sts.step_uuid = fStepUuid;
266 sts.msg_type = StepTeleStats::ST_START;
267 sts.total_units_of_work = 1;
268 postStepStartTele(sts);
269
270 for(uint32_t id = 1; id <= fMaxThreads; id++)
271 {
272 fInputIteratorsList[id] = fInputDL->getIterator();
273 fRunnersList[id-1] = jobstepThreadPool.invoke(Runner(this, id));
274 }
275 }
276 else
277 {
278 fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
279
280 if (fInputDL == NULL)
281 throw logic_error("Input is not a RowGroup data list.");
282
283 fInputIterator = fInputDL->getIterator();
284 fRunner = jobstepThreadPool.invoke(Runner(this));
285 }
286
287 }
288
join()289 void TupleAnnexStep::join()
290 {
291 if(fParallelOp)
292 {
293 jobstepThreadPool.join(fRunnersList);
294 }
295 else
296 {
297 if (fRunner)
298 {
299 jobstepThreadPool.join(fRunner);
300 }
301 }
302
303 }
304
nextBand(messageqcpp::ByteStream & bs)305 uint32_t TupleAnnexStep::nextBand(messageqcpp::ByteStream& bs)
306 {
307 RGData rgDataOut;
308 bool more = false;
309 uint32_t rowCount = 0;
310
311 try
312 {
313 bs.restart();
314
315 more = fOutputDL->next(fOutputIterator, &rgDataOut);
316
317 if (more && !cancelled())
318 {
319 fRowGroupDeliver.setData(&rgDataOut);
320 fRowGroupDeliver.serializeRGData(bs);
321 rowCount = fRowGroupDeliver.getRowCount();
322 }
323 else
324 {
325 while (more)
326 more = fOutputDL->next(fOutputIterator, &rgDataOut);
327
328 fEndOfResult = true;
329 }
330 }
331 catch (...)
332 {
333 handleException(std::current_exception(),
334 logging::ERR_IN_DELIVERY,
335 logging::ERR_ALWAYS_CRITICAL,
336 "TupleAnnexStep::nextBand()");
337 while (more)
338 more = fOutputDL->next(fOutputIterator, &rgDataOut);
339
340 fEndOfResult = true;
341 }
342
343 if (fEndOfResult)
344 {
345 // send an empty / error band
346 rgDataOut.reinit(fRowGroupDeliver, 0);
347 fRowGroupDeliver.setData(&rgDataOut);
348 fRowGroupDeliver.resetRowGroup(0);
349 fRowGroupDeliver.setStatus(status());
350 fRowGroupDeliver.serializeRGData(bs);
351 }
352
353 return rowCount;
354 }
355
356
execute()357 void TupleAnnexStep::execute()
358 {
359 if (fOrderBy)
360 executeWithOrderBy();
361 else if (fDistinct)
362 executeNoOrderByWithDistinct();
363 else
364 executeNoOrderBy();
365
366 StepTeleStats sts;
367 sts.query_uuid = fQueryUuid;
368 sts.step_uuid = fStepUuid;
369 sts.msg_type = StepTeleStats::ST_SUMMARY;
370 sts.total_units_of_work = sts.units_of_work_completed = 1;
371 sts.rows = fRowsReturned;
372 postStepSummaryTele(sts);
373
374 if (traceOn())
375 {
376 if (dlTimes.FirstReadTime().tv_sec == 0)
377 dlTimes.setFirstReadTime();
378
379 dlTimes.setLastReadTime();
380 dlTimes.setEndOfInputTime();
381 printCalTrace();
382 }
383 }
384
execute(uint32_t id)385 void TupleAnnexStep::execute(uint32_t id)
386 {
387 if(fOrderByList[id])
388 executeParallelOrderBy(id);
389
390 }
391
executeNoOrderBy()392 void TupleAnnexStep::executeNoOrderBy()
393 {
394 utils::setThreadName("TASwoOrd");
395 RGData rgDataIn;
396 RGData rgDataOut;
397 bool more = false;
398
399 try
400 {
401 more = fInputDL->next(fInputIterator, &rgDataIn);
402
403 if (traceOn()) dlTimes.setFirstReadTime();
404
405 StepTeleStats sts;
406 sts.query_uuid = fQueryUuid;
407 sts.step_uuid = fStepUuid;
408 sts.msg_type = StepTeleStats::ST_START;
409 sts.total_units_of_work = 1;
410 postStepStartTele(sts);
411
412 while (more && !cancelled() && !fLimitHit)
413 {
414 fRowGroupIn.setData(&rgDataIn);
415 fRowGroupIn.getRow(0, &fRowIn);
416 // Get a new output rowgroup for each input rowgroup to preserve the rids
417 rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
418 fRowGroupOut.setData(&rgDataOut);
419 fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
420 fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
421 fRowGroupOut.getRow(0, &fRowOut);
422
423 for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i)
424 {
425 // skip first limit-start rows
426 if (fRowsProcessed++ < fLimitStart)
427 {
428 fRowIn.nextRow();
429 continue;
430 }
431
432 if (UNLIKELY(fRowsReturned >= fLimitCount))
433 {
434 fLimitHit = true;
435 fJobList->abortOnLimit((JobStep*) this);
436 continue;
437 }
438
439 if (fConstant)
440 fConstant->fillInConstants(fRowIn, fRowOut);
441 else
442 copyRow(fRowIn, &fRowOut);
443
444 fRowGroupOut.incRowCount();
445
446 if (++fRowsReturned < fLimitCount)
447 {
448 fRowOut.nextRow();
449 fRowIn.nextRow();
450 }
451 }
452
453 if (fRowGroupOut.getRowCount() > 0)
454 {
455 fOutputDL->insert(rgDataOut);
456 }
457
458 more = fInputDL->next(fInputIterator, &rgDataIn);
459 }
460 }
461 catch (...)
462 {
463 handleException(std::current_exception(),
464 logging::ERR_IN_PROCESS,
465 logging::ERR_ALWAYS_CRITICAL,
466 "TupleAnnexStep::executeNoOrderBy()");
467 }
468
469 while (more)
470 more = fInputDL->next(fInputIterator, &rgDataIn);
471
472 // Bug 3136, let mini stats to be formatted if traceOn.
473 fOutputDL->endOfInput();
474 }
475
476
executeNoOrderByWithDistinct()477 void TupleAnnexStep::executeNoOrderByWithDistinct()
478 {
479 utils::setThreadName("TASwoOrdDist");
480 scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
481 vector<RGData> dataVec;
482 vector<RGData> dataVecSkip;
483 RGData rgDataIn;
484 RGData rgDataOut;
485 RGData rgDataSkip;
486 RowGroup rowGroupSkip;
487 Row rowSkip;
488 bool more = false;
489
490 rgDataOut.reinit(fRowGroupOut);
491 fRowGroupOut.setData(&rgDataOut);
492 fRowGroupOut.resetRowGroup(0);
493 fRowGroupOut.getRow(0, &fRowOut);
494
495 fRowGroupOut.initRow(&row1);
496 fRowGroupOut.initRow(&row2);
497
498 rowGroupSkip = fRowGroupOut;
499 rgDataSkip.reinit(rowGroupSkip);
500 rowGroupSkip.setData(&rgDataSkip);
501 rowGroupSkip.resetRowGroup(0);
502 rowGroupSkip.initRow(&rowSkip);
503 rowGroupSkip.getRow(0, &rowSkip);
504
505 try
506 {
507 more = fInputDL->next(fInputIterator, &rgDataIn);
508
509 if (traceOn()) dlTimes.setFirstReadTime();
510
511 StepTeleStats sts;
512 sts.query_uuid = fQueryUuid;
513 sts.step_uuid = fStepUuid;
514 sts.msg_type = StepTeleStats::ST_START;
515 sts.total_units_of_work = 1;
516 postStepStartTele(sts);
517
518 while (more && !cancelled() && !fLimitHit)
519 {
520 fRowGroupIn.setData(&rgDataIn);
521 fRowGroupIn.getRow(0, &fRowIn);
522
523 for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i)
524 {
525 pair<DistinctMap_t::iterator, bool> inserted;
526 Row* rowPtr;
527
528 if (distinctMap->size() < fLimitStart)
529 rowPtr = &rowSkip;
530 else
531 rowPtr = &fRowOut;
532
533 if (fConstant)
534 fConstant->fillInConstants(fRowIn, *rowPtr);
535 else
536 copyRow(fRowIn, rowPtr);
537
538 fRowIn.nextRow();
539 inserted = distinctMap->insert(rowPtr->getPointer());
540 ++fRowsProcessed;
541
542 if (inserted.second)
543 {
544 if (UNLIKELY(fRowsReturned >= fLimitCount))
545 {
546 fLimitHit = true;
547 fJobList->abortOnLimit((JobStep*) this);
548 break;
549 }
550
551 // skip first limit-start rows
552 if (distinctMap->size() <= fLimitStart)
553 {
554 rowGroupSkip.incRowCount();
555 rowSkip.nextRow();
556 if (UNLIKELY(rowGroupSkip.getRowCount() >= rowgroup::rgCommonSize))
557 {
558 // allocate new RGData for skipped rows below the fLimitStart
559 // offset (do not take it into account in RM assuming there
560 // are few skipped rows)
561 dataVecSkip.push_back(rgDataSkip);
562 rgDataSkip.reinit(rowGroupSkip);
563 rowGroupSkip.setData(&rgDataSkip);
564 rowGroupSkip.resetRowGroup(0);
565 rowGroupSkip.getRow(0, &rowSkip);
566 }
567 continue;
568 }
569
570 ++fRowsReturned;
571
572 fRowGroupOut.incRowCount();
573 fRowOut.nextRow();
574
575 if (UNLIKELY(fRowGroupOut.getRowCount() >= rowgroup::rgCommonSize))
576 {
577 dataVec.push_back(rgDataOut);
578 rgDataOut.reinit(fRowGroupOut);
579 fRowGroupOut.setData(&rgDataOut);
580 fRowGroupOut.resetRowGroup(0);
581 fRowGroupOut.getRow(0, &fRowOut);
582 }
583 }
584 }
585
586 more = fInputDL->next(fInputIterator, &rgDataIn);
587 }
588
589 if (fRowGroupOut.getRowCount() > 0)
590 dataVec.push_back(rgDataOut);
591
592 for (vector<RGData>::iterator i = dataVec.begin(); i != dataVec.end(); i++)
593 {
594 rgDataOut = *i;
595 fRowGroupOut.setData(&rgDataOut);
596 fOutputDL->insert(rgDataOut);
597 }
598 }
599 catch (...)
600 {
601 handleException(std::current_exception(),
602 logging::ERR_IN_PROCESS,
603 logging::ERR_ALWAYS_CRITICAL,
604 "TupleAnnexStep::executeNoOrderByWithDistinct()");
605 }
606
607 while (more)
608 more = fInputDL->next(fInputIterator, &rgDataIn);
609
610 // Bug 3136, let mini stats to be formatted if traceOn.
611 fOutputDL->endOfInput();
612 }
613
614
executeWithOrderBy()615 void TupleAnnexStep::executeWithOrderBy()
616 {
617 utils::setThreadName("TASwOrd");
618 RGData rgDataIn;
619 RGData rgDataOut;
620 bool more = false;
621
622 try
623 {
624 more = fInputDL->next(fInputIterator, &rgDataIn);
625
626 if (traceOn()) dlTimes.setFirstReadTime();
627
628 StepTeleStats sts;
629 sts.query_uuid = fQueryUuid;
630 sts.step_uuid = fStepUuid;
631 sts.msg_type = StepTeleStats::ST_START;
632 sts.total_units_of_work = 1;
633 postStepStartTele(sts);
634
635 while (more && !cancelled())
636 {
637 fRowGroupIn.setData(&rgDataIn);
638 fRowGroupIn.getRow(0, &fRowIn);
639
640 for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled(); ++i)
641 {
642 fOrderBy->processRow(fRowIn);
643 fRowIn.nextRow();
644 }
645
646 more = fInputDL->next(fInputIterator, &rgDataIn);
647 }
648
649 fOrderBy->finalize();
650
651 if (!cancelled())
652 {
653 while (fOrderBy->getData(rgDataIn))
654 {
655 if (fConstant == NULL &&
656 fRowGroupOut.getColumnCount() == fRowGroupIn.getColumnCount())
657 {
658 rgDataOut = rgDataIn;
659 fRowGroupOut.setData(&rgDataOut);
660 }
661 else
662 {
663 fRowGroupIn.setData(&rgDataIn);
664 fRowGroupIn.getRow(0, &fRowIn);
665
666 rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
667 fRowGroupOut.setData(&rgDataOut);
668 fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
669 fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
670 fRowGroupOut.getRow(0, &fRowOut);
671
672 for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
673 {
674 if (fConstant)
675 fConstant->fillInConstants(fRowIn, fRowOut);
676 else
677 copyRow(fRowIn, &fRowOut);
678
679 fRowGroupOut.incRowCount();
680 fRowOut.nextRow();
681 fRowIn.nextRow();
682 }
683 }
684
685 if (fRowGroupOut.getRowCount() > 0)
686 {
687 fRowsReturned += fRowGroupOut.getRowCount();
688 fOutputDL->insert(rgDataOut);
689 }
690 }
691 }
692 }
693 catch (...)
694 {
695 handleException(std::current_exception(),
696 logging::ERR_IN_PROCESS,
697 logging::ERR_ALWAYS_CRITICAL,
698 "TupleAnnexStep::executeWithOrderBy()");
699 }
700
701 while (more)
702 more = fInputDL->next(fInputIterator, &rgDataIn);
703
704 // Bug 3136, let mini stats to be formatted if traceOn.
705 fOutputDL->endOfInput();
706 }
707
708 /*
709 The m() iterates over thread's LimitedOrderBy instances,
710 reverts the rules and then populates the final collection
711 used for final sorting. The method uses OrderByRow that
712 combination of Row::data and comparison rules.
713 When m() finishes with thread's LOBs it iterates over
714 final sorting collection, populates rgDataOut, then
715 sends it into outputDL.
716 Changing this method don't forget to make changes in
717 finalizeParallelOrderBy() that is a clone.
718 !!!The method doesn't set Row::baseRid
719 */
finalizeParallelOrderByDistinct()720 void TupleAnnexStep::finalizeParallelOrderByDistinct()
721 {
722 utils::setThreadName("TASwParOrdDistM");
723 uint64_t count = 0;
724 uint64_t offset = 0;
725 uint32_t rowSize = 0;
726
727 rowgroup::RGData rgDataOut;
728 rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
729 fRowGroupOut.setData(&rgDataOut);
730 fRowGroupOut.resetRowGroup(0);
731 // Calculate offset here
732 fRowGroupOut.getRow(0, &fRowOut);
733 ordering::SortingPQ finalPQ;
734 scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
735 fRowGroupIn.initRow(&row1);
736 fRowGroupIn.initRow(&row2);
737
738 try
739 {
740 for(uint64_t id = 1; id <= fMaxThreads; id++)
741 {
742 if (cancelled())
743 {
744 break;
745 }
746 // Revert the ordering rules before we
747 // add rows into the final PQ.
748 fOrderByList[id]->getRule().revertRules();
749 ordering::SortingPQ ¤tPQ = fOrderByList[id]->getQueue();
750 finalPQ.reserve(finalPQ.size()+currentPQ.size());
751 pair<DistinctMap_t::iterator, bool> inserted;
752 while (currentPQ.size())
753 {
754 ordering::OrderByRow &topOBRow =
755 const_cast<ordering::OrderByRow&>(currentPQ.top());
756 inserted = distinctMap->insert(topOBRow.fData);
757 if (inserted.second)
758 {
759 finalPQ.push(topOBRow);
760 }
761 currentPQ.pop();
762 }
763 }
764 }
765 catch (...)
766 {
767 handleException(std::current_exception(),
768 logging::ERR_IN_PROCESS,
769 logging::ERR_ALWAYS_CRITICAL,
770 "TupleAnnexStep::finalizeParallelOrderByDistinct()");
771 }
772
773 // OFFSET processing
774 while (finalPQ.size() && offset < fLimitStart)
775 {
776 offset++;
777 finalPQ.pop();
778 }
779
780 // Calculate rowSize only once
781 if (finalPQ.size())
782 {
783 ordering::OrderByRow& topOBRow =
784 const_cast<ordering::OrderByRow&>(finalPQ.top());
785 fRowIn.setData(topOBRow.fData);
786 if (!fConstant)
787 {
788 copyRow(fRowIn, &fRowOut);
789 }
790 else
791 {
792 fConstant->fillInConstants(fRowIn, fRowOut);
793 }
794 rowSize = fRowOut.getSize();
795 fRowGroupOut.incRowCount();
796 fRowOut.nextRow(rowSize);
797 finalPQ.pop();
798 count++;
799 }
800
801 if (!fConstant)
802 {
803 while(finalPQ.size())
804 {
805 if (cancelled())
806 {
807 break;
808 }
809
810 while (count < fLimitCount && finalPQ.size()
811 && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
812 {
813 ordering::OrderByRow &topOBRow =
814 const_cast<ordering::OrderByRow&>(finalPQ.top());
815
816 fRowIn.setData(topOBRow.fData);
817 copyRow(fRowIn, &fRowOut);
818 fRowGroupOut.incRowCount();
819 fRowOut.nextRow(rowSize);
820
821 finalPQ.pop();
822 count++;
823 if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
824 {
825 break;
826 }
827 }
828
829 if (fRowGroupOut.getRowCount() > 0)
830 {
831 fRowsReturned += fRowGroupOut.getRowCount();
832 fOutputDL->insert(rgDataOut);
833 rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
834 fRowGroupOut.setData(&rgDataOut);
835 fRowGroupOut.resetRowGroup(0);
836 fRowGroupOut.getRow(0, &fRowOut);
837 }
838 else
839 {
840 break;
841 }
842 } // end of limit bound while loop
843 }
844 else // Add ConstantColumns striped earlier
845 {
846 while(finalPQ.size())
847 {
848 if (cancelled())
849 {
850 break;
851 }
852
853 while (count < fLimitCount && finalPQ.size()
854 && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
855 {
856 ordering::OrderByRow &topOBRow =
857 const_cast<ordering::OrderByRow&>(finalPQ.top());
858
859 fRowIn.setData(topOBRow.fData);
860 fConstant->fillInConstants(fRowIn, fRowOut);
861 fRowGroupOut.incRowCount();
862 fRowOut.nextRow(rowSize);
863
864 finalPQ.pop();
865 count++;
866 if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
867 {
868 break;
869 }
870 }
871
872 if (fRowGroupOut.getRowCount() > 0)
873 {
874 fRowsReturned += fRowGroupOut.getRowCount();
875 fOutputDL->insert(rgDataOut);
876 rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
877 fRowGroupOut.setData(&rgDataOut);
878 fRowGroupOut.resetRowGroup(0);
879 fRowGroupOut.getRow(0, &fRowOut);
880 }
881 else
882 {
883 break;
884 }
885 } // end of limit bound while loop
886 } // end of if-else
887
888 if (fRowGroupOut.getRowCount() > 0)
889 {
890 fRowsReturned += fRowGroupOut.getRowCount();
891 fOutputDL->insert(rgDataOut);
892 }
893
894 fOutputDL->endOfInput();
895
896 StepTeleStats sts;
897 sts.query_uuid = fQueryUuid;
898 sts.step_uuid = fStepUuid;
899 sts.msg_type = StepTeleStats::ST_SUMMARY;
900 sts.total_units_of_work = sts.units_of_work_completed = 1;
901 sts.rows = fRowsReturned;
902 postStepSummaryTele(sts);
903
904 if (traceOn())
905 {
906 if (dlTimes.FirstReadTime().tv_sec == 0)
907 dlTimes.setFirstReadTime();
908
909 dlTimes.setLastReadTime();
910 dlTimes.setEndOfInputTime();
911 printCalTrace();
912 }
913 }
914
915 /*
916 The m() iterates over thread's LimitedOrderBy instances,
917 reverts the rules and then populates the final collection
918 used for final sorting. The method uses OrderByRow that
919 combination of Row::data and comparison rules.
920 When m() finishes with thread's LOBs it iterates over
921 final sorting collection, populates rgDataOut, then
922 sends it into outputDL.
923 Changing this method don't forget to make changes in
924 finalizeParallelOrderByDistinct() that is a clone.
925 !!!The method doesn't set Row::baseRid
926 */
finalizeParallelOrderBy()927 void TupleAnnexStep::finalizeParallelOrderBy()
928 {
929 utils::setThreadName("TASwParOrdMerge");
930 uint64_t count = 0;
931 uint64_t offset = 0;
932 uint32_t rowSize = 0;
933
934 rowgroup::RGData rgDataOut;
935 ordering::SortingPQ finalPQ;
936 rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
937 fRowGroupOut.setData(&rgDataOut);
938 fRowGroupOut.resetRowGroup(0);
939 // Calculate offset here
940 fRowGroupOut.getRow(0, &fRowOut);
941
942 try
943 {
944 for(uint64_t id = 1; id <= fMaxThreads; id++)
945 {
946 if (cancelled())
947 {
948 break;
949 }
950 // Revert the ordering rules before we
951 // add rows into the final PQ.
952 fOrderByList[id]->getRule().revertRules();
953 ordering::SortingPQ ¤tPQ = fOrderByList[id]->getQueue();
954 finalPQ.reserve(currentPQ.size());
955 while (currentPQ.size())
956 {
957 ordering::OrderByRow &topOBRow =
958 const_cast<ordering::OrderByRow&>(currentPQ.top());
959 finalPQ.push(topOBRow);
960 currentPQ.pop();
961 }
962 }
963 }
964 catch (...)
965 {
966 handleException(std::current_exception(),
967 logging::ERR_IN_PROCESS,
968 logging::ERR_ALWAYS_CRITICAL,
969 "TupleAnnexStep::finalizeParallelOrderBy()");
970 }
971
972 // OFFSET processing
973 while (finalPQ.size() && offset < fLimitStart)
974 {
975 offset++;
976 finalPQ.pop();
977 }
978
979 // Calculate rowSize only once
980 if (finalPQ.size())
981 {
982 ordering::OrderByRow& topOBRow =
983 const_cast<ordering::OrderByRow&>(finalPQ.top());
984 fRowIn.setData(topOBRow.fData);
985 if (!fConstant)
986 {
987 copyRow(fRowIn, &fRowOut);
988 }
989 else
990 {
991 fConstant->fillInConstants(fRowIn, fRowOut);
992 }
993 rowSize = fRowOut.getSize();
994 fRowGroupOut.incRowCount();
995 fRowOut.nextRow(rowSize);
996 finalPQ.pop();
997 count++;
998 }
999
1000 if (!fConstant)
1001 {
1002 while(finalPQ.size())
1003 {
1004 if (cancelled())
1005 {
1006 break;
1007 }
1008
1009 while (count < fLimitCount && finalPQ.size()
1010 && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
1011 {
1012 ordering::OrderByRow &topOBRow =
1013 const_cast<ordering::OrderByRow&>(finalPQ.top());
1014
1015 fRowIn.setData(topOBRow.fData);
1016 copyRow(fRowIn, &fRowOut);
1017 fRowGroupOut.incRowCount();
1018 fRowOut.nextRow(rowSize);
1019
1020 finalPQ.pop();
1021 count++;
1022 }
1023
1024 if (fRowGroupOut.getRowCount() > 0)
1025 {
1026 fRowsReturned += fRowGroupOut.getRowCount();
1027 fOutputDL->insert(rgDataOut);
1028 rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
1029 fRowGroupOut.setData(&rgDataOut);
1030 fRowGroupOut.resetRowGroup(0);
1031 fRowGroupOut.getRow(0, &fRowOut);
1032 }
1033 else
1034 {
1035 break;
1036 }
1037 } // end of limit bound while loop
1038 }
1039 else // Add ConstantColumns striped earlier
1040 {
1041 while(finalPQ.size())
1042 {
1043 if (cancelled())
1044 {
1045 break;
1046 }
1047
1048 while (count < fLimitCount && finalPQ.size()
1049 && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
1050 {
1051 ordering::OrderByRow &topOBRow =
1052 const_cast<ordering::OrderByRow&>(finalPQ.top());
1053
1054 fRowIn.setData(topOBRow.fData);
1055 fConstant->fillInConstants(fRowIn, fRowOut);
1056 fRowGroupOut.incRowCount();
1057 fRowOut.nextRow(rowSize);
1058
1059 finalPQ.pop();
1060 count++;
1061 if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
1062 {
1063 break;
1064 }
1065 }
1066
1067 if (fRowGroupOut.getRowCount() > 0)
1068 {
1069 fRowsReturned += fRowGroupOut.getRowCount();
1070 fOutputDL->insert(rgDataOut);
1071 rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
1072 fRowGroupOut.setData(&rgDataOut);
1073 fRowGroupOut.resetRowGroup(0);
1074 fRowGroupOut.getRow(0, &fRowOut);
1075 }
1076 else
1077 {
1078 break;
1079 }
1080 } // end of limit bound while loop
1081 } // end of if-else
1082
1083 if (fRowGroupOut.getRowCount() > 0)
1084 {
1085 fRowsReturned += fRowGroupOut.getRowCount();
1086 fOutputDL->insert(rgDataOut);
1087 }
1088
1089 fOutputDL->endOfInput();
1090
1091 StepTeleStats sts;
1092 sts.query_uuid = fQueryUuid;
1093 sts.step_uuid = fStepUuid;
1094 sts.msg_type = StepTeleStats::ST_SUMMARY;
1095 sts.total_units_of_work = sts.units_of_work_completed = 1;
1096 sts.rows = fRowsReturned;
1097 postStepSummaryTele(sts);
1098
1099 if (traceOn())
1100 {
1101 if (dlTimes.FirstReadTime().tv_sec == 0)
1102 dlTimes.setFirstReadTime();
1103
1104 dlTimes.setLastReadTime();
1105 dlTimes.setEndOfInputTime();
1106 printCalTrace();
1107 }
1108 }
1109
executeParallelOrderBy(uint64_t id)1110 void TupleAnnexStep::executeParallelOrderBy(uint64_t id)
1111 {
1112 utils::setThreadName("TASwParOrd");
1113 RGData rgDataIn;
1114 RGData rgDataOut;
1115 bool more = false;
1116 uint64_t dlOffset = 0;
1117 uint32_t rowSize = 0;
1118
1119 uint64_t rowCount = 0;
1120 uint64_t doubleRGSize = 2*rowgroup::rgCommonSize;
1121 rowgroup::Row r = fRowIn;
1122 rowgroup::RowGroup rg = fRowGroupIn;
1123 rg.initRow(&r);
1124 LimitedOrderBy *limOrderBy = fOrderByList[id];
1125 ordering::SortingPQ ¤tPQ = limOrderBy->getQueue();
1126 if (limOrderBy->getLimitCount() < QUEUE_RESERVE_SIZE)
1127 {
1128 currentPQ.reserve(limOrderBy->getLimitCount());
1129 }
1130 else
1131 {
1132 currentPQ.reserve(QUEUE_RESERVE_SIZE);
1133 }
1134
1135 try
1136 {
1137 more = fInputDL->next(fInputIteratorsList[id], &rgDataIn);
1138 if (more) dlOffset++;
1139
1140 while (more && !cancelled())
1141 {
1142 if (dlOffset%fMaxThreads == id-1)
1143 {
1144 if (cancelled())
1145 break;
1146
1147 if (currentPQ.capacity()-currentPQ.size() < doubleRGSize)
1148 {
1149 currentPQ.reserve(QUEUE_RESERVE_SIZE);
1150 }
1151
1152 rg.setData(&rgDataIn);
1153 rg.getRow(0, &r);
1154 if (!rowSize)
1155 {
1156 rowSize = r.getSize();
1157 }
1158 rowCount = rg.getRowCount();
1159
1160 for (uint64_t i = 0; i < rowCount; ++i)
1161 {
1162 limOrderBy->processRow(r);
1163 r.nextRow(rowSize);
1164 }
1165 }
1166
1167 // *DRRTUY Implement a method to skip elements in FIFO
1168 more = fInputDL->next(fInputIteratorsList[id], &rgDataIn);
1169 if(more) dlOffset++;
1170 }
1171 }
1172 catch (...)
1173 {
1174 handleException(std::current_exception(),
1175 logging::ERR_IN_PROCESS,
1176 logging::ERR_ALWAYS_CRITICAL,
1177 "TupleAnnexStep::executeParallelOrderBy()");
1178 }
1179
1180 // read out the input DL
1181 while (more)
1182 more = fInputDL->next(fInputIteratorsList[id], &rgDataIn);
1183
1184 // Count finished sorting threads under mutex and run final
1185 // sort step when the last thread converges
1186 fParallelFinalizeMutex.lock();
1187 fFinishedThreads++;
1188 if (fFinishedThreads == fMaxThreads)
1189 {
1190 fParallelFinalizeMutex.unlock();
1191 if(fDistinct)
1192 {
1193 finalizeParallelOrderByDistinct();
1194 }
1195 else
1196 {
1197 finalizeParallelOrderBy();
1198 }
1199 }
1200 else
1201 {
1202 fParallelFinalizeMutex.unlock();
1203 }
1204 }
1205
getOutputRowGroup() const1206 const RowGroup& TupleAnnexStep::getOutputRowGroup() const
1207 {
1208 return fRowGroupOut;
1209 }
1210
1211
getDeliveredRowGroup() const1212 const RowGroup& TupleAnnexStep::getDeliveredRowGroup() const
1213 {
1214 return fRowGroupDeliver;
1215 }
1216
1217
deliverStringTableRowGroup(bool b)1218 void TupleAnnexStep::deliverStringTableRowGroup(bool b)
1219 {
1220 fRowGroupOut.setUseStringTable(b);
1221 fRowGroupDeliver.setUseStringTable(b);
1222 }
1223
1224
deliverStringTableRowGroup() const1225 bool TupleAnnexStep::deliverStringTableRowGroup() const
1226 {
1227 idbassert(fRowGroupOut.usesStringTable() == fRowGroupDeliver.usesStringTable());
1228 return fRowGroupDeliver.usesStringTable();
1229 }
1230
1231
toString() const1232 const string TupleAnnexStep::toString() const
1233 {
1234 ostringstream oss;
1235 oss << "AnnexStep ";
1236 oss << " ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
1237
1238 oss << " in:";
1239
1240 for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
1241 oss << fInputJobStepAssociation.outAt(i);
1242
1243 oss << " out:";
1244
1245 for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
1246 oss << fOutputJobStepAssociation.outAt(i);
1247
1248 if (fOrderBy)
1249 oss << " " << fOrderBy->toString();
1250
1251 if (fConstant)
1252 oss << " " << fConstant->toString();
1253
1254 oss << endl;
1255
1256 return oss.str();
1257 }
1258
1259
printCalTrace()1260 void TupleAnnexStep::printCalTrace()
1261 {
1262 time_t t = time (0);
1263 char timeString[50];
1264 ctime_r (&t, timeString);
1265 timeString[strlen (timeString ) - 1] = '\0';
1266 ostringstream logStr;
1267 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
1268 << "; total rows returned-" << fRowsReturned << endl
1269 << "\t1st read " << dlTimes.FirstReadTimeString()
1270 << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
1271 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
1272 << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
1273 << "\tJob completion status " << status() << endl;
1274 logEnd(logStr.str().c_str());
1275 fExtendedInfo += logStr.str();
1276 formatMiniStats();
1277 }
1278
1279
formatMiniStats()1280 void TupleAnnexStep::formatMiniStats()
1281 {
1282 ostringstream oss;
1283 oss << "TNS ";
1284 oss << "UM "
1285 << "- "
1286 << "- "
1287 << "- "
1288 << "- "
1289 << "- "
1290 << "- "
1291 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
1292 << fRowsReturned << " ";
1293 fMiniInfo += oss.str();
1294 }
1295
1296
1297 } //namespace
1298 // vim:ts=4 sw=4:
1299
1300