1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (c) 2016-2020 MariaDB
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: windowfunctionstep.cpp 9681 2013-07-11 22:58:05Z xlou $
20
21
22 //#define NDEBUG
23 #include <cassert>
24 #include <sstream>
25 #include <iomanip>
26 using namespace std;
27
28 #include <boost/algorithm/string.hpp> // to_upper_copy
29 #include <boost/shared_ptr.hpp>
30 #include <boost/shared_array.hpp>
31 #include <boost/thread.hpp>
32 #include <boost/uuid/uuid_io.hpp>
33 using namespace boost;
34
35 #include "atomicops.h"
36 using namespace atomicops;
37
38 #include "loggingid.h"
39 #include "errorcodes.h"
40 #include "idberrorinfo.h"
41 using namespace logging;
42
43 #include "configcpp.h"
44 using namespace config;
45
46 #include "calpontselectexecutionplan.h"
47 #include "calpontsystemcatalog.h"
48 #include "aggregatecolumn.h"
49 #include "arithmeticcolumn.h"
50 #include "constantcolumn.h"
51 #include "functioncolumn.h"
52 #include "pseudocolumn.h"
53 #include "simplefilter.h"
54 #include "windowfunctioncolumn.h"
55 using namespace execplan;
56
57 #include "../../utils/windowfunction/windowfunction.h"
58 #include "../../utils/windowfunction/windowfunctiontype.h"
59 #include "../../utils/windowfunction/framebound.h"
60 #include "../../utils/windowfunction/frameboundrange.h"
61 #include "../../utils/windowfunction/frameboundrow.h"
62 #include "../../utils/windowfunction/windowframe.h"
63 using namespace windowfunction;
64
65 #include "rowgroup.h"
66 using namespace rowgroup;
67
68 using namespace ordering;
69
70 #include "funcexp.h"
71 using namespace funcexp;
72
73 #include "querytele.h"
74 using namespace querytele;
75
76 #include "jlf_common.h"
77 #include "jobstep.h"
78 #include "windowfunctionstep.h"
79 using namespace joblist;
80
81 #include "checks.h"
82
83
84 namespace
85 {
86
87
getColumnIndex(const SRCP & c,const map<uint64_t,uint64_t> & m,JobInfo & jobInfo)88 uint64_t getColumnIndex(const SRCP& c, const map<uint64_t, uint64_t>& m, JobInfo& jobInfo)
89 {
90 uint64_t key = getTupleKey(jobInfo, c, true);
91 const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(c.get());
92
93 if (sc != NULL && !sc->schemaName().empty())
94 {
95 // special handling for dictionary
96 CalpontSystemCatalog::ColType ct = sc->colType();
97
98 //XXX use this before connector sets colType in sc correctly.
99 // type of pseudo column is set by connector
100 if (!(dynamic_cast<const PseudoColumn*>(sc)))
101 {
102 ct = jobInfo.csc->colType(sc->oid());
103 ct.charsetNumber =sc->colType().charsetNumber;
104 }
105
106 //X
107 CalpontSystemCatalog::OID dictOid = isDictCol(ct);
108 string alias(extractTableAlias(sc));
109
110 if (dictOid > 0)
111 {
112 TupleInfo ti =
113 setTupleInfo(ct, dictOid, jobInfo, tableOid(sc, jobInfo.csc), sc, alias);
114 key = ti.key;
115 }
116 }
117
118 map<uint64_t, uint64_t>::const_iterator j = m.find(key);
119
120 if (j == m.end())
121 {
122 string name = jobInfo.keyInfo->tupleKeyToName[key];
123 cerr << name << " is not in tuple, key=" << key << endl;
124 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
125 ERR_WF_COLUMN_MISSING);
126 }
127
128 return j->second;
129 }
130
131
132 }
133
134
135 namespace joblist
136 {
137
138
WindowFunctionStep(const JobInfo & jobInfo)139 WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) :
140 JobStep(jobInfo),
141 fRunner(0),
142 fCatalog(jobInfo.csc),
143 fRowsReturned(0),
144 fEndOfResult(false),
145 fIsSelect(true),
146 fUseSSMutex(false),
147 fUseUFMutex(false),
148 fInputDL(NULL),
149 fOutputDL(NULL),
150 fInputIterator(-1),
151 fOutputIterator(-1),
152 fFunctionCount(0),
153 fTotalThreads(1),
154 fNextIndex(0),
155 fMemUsage(0),
156 fRm(jobInfo.rm),
157 fSessionMemLimit(jobInfo.umMemLimit)
158 {
159 fTotalThreads = fRm->windowFunctionThreads();
160 fExtendedInfo = "WFS: ";
161 fQtc.stepParms().stepType = StepTeleStats::T_WFS;
162 }
163
164
~WindowFunctionStep()165 WindowFunctionStep::~WindowFunctionStep()
166 {
167 if (fMemUsage > 0)
168 fRm->returnMemory(fMemUsage, fSessionMemLimit);
169 }
170
171
run()172 void WindowFunctionStep::run()
173 {
174 if (fInputJobStepAssociation.outSize() == 0)
175 throw logic_error("No input data list for window function step.");
176
177 fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
178
179 if (fInputDL == NULL)
180 throw logic_error("Input is not a RowGroup data list in window function step.");
181
182 fInputIterator = fInputDL->getIterator();
183
184 if (fOutputJobStepAssociation.outSize() == 0)
185 throw logic_error("No output data list for window function step.");
186
187 fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
188
189 if (fOutputDL == NULL)
190 throw logic_error("Output of window function step is not a RowGroup data list.");
191
192 if (fDelivery == true)
193 {
194 fOutputIterator = fOutputDL->getIterator();
195 }
196
197 fRunner = jobstepThreadPool.invoke(Runner(this));
198 }
199
200
join()201 void WindowFunctionStep::join()
202 {
203 if (fRunner)
204 jobstepThreadPool.join(fRunner);
205 }
206
207
nextBand(messageqcpp::ByteStream & bs)208 uint32_t WindowFunctionStep::nextBand(messageqcpp::ByteStream& bs)
209 {
210 RGData rgDataOut;
211 bool more = false;
212 uint32_t rowCount = 0;
213
214 try
215 {
216 bs.restart();
217
218 more = fOutputDL->next(fOutputIterator, &rgDataOut);
219
220 if (more && !cancelled())
221 {
222 fRowGroupDelivered.setData(&rgDataOut);
223 fRowGroupDelivered.serializeRGData(bs);
224 rowCount = fRowGroupDelivered.getRowCount();
225 }
226 else
227 {
228 while (more)
229 more = fOutputDL->next(fOutputIterator, &rgDataOut);
230
231 fEndOfResult = true;
232 }
233 }
234 catch (...)
235 {
236 handleException(std::current_exception(),
237 logging::ERR_IN_DELIVERY,
238 logging::ERR_WF_DATA_SET_TOO_BIG,
239 "WindowFunctionStep::nextBand()");
240 while (more)
241 more = fOutputDL->next(fOutputIterator, &rgDataOut);
242 fEndOfResult = true;
243 }
244
245 if (fEndOfResult)
246 {
247 // send an empty / error band
248 rgDataOut.reinit(fRowGroupDelivered, 0);
249 fRowGroupDelivered.setData(&rgDataOut);
250 fRowGroupDelivered.resetRowGroup(0);
251 fRowGroupDelivered.setStatus(status());
252 fRowGroupDelivered.serializeRGData(bs);
253 }
254
255 return rowCount;
256 }
257
258
setOutputRowGroup(const RowGroup & rg)259 void WindowFunctionStep::setOutputRowGroup(const RowGroup& rg)
260 {
261 idbassert(0);
262 }
263
264
getOutputRowGroup() const265 const RowGroup& WindowFunctionStep::getOutputRowGroup() const
266 {
267 return fRowGroupOut;
268 }
269
270
getDeliveredRowGroup() const271 const RowGroup& WindowFunctionStep::getDeliveredRowGroup() const
272 {
273 return fRowGroupDelivered;
274 }
275
276
deliverStringTableRowGroup(bool b)277 void WindowFunctionStep::deliverStringTableRowGroup(bool b)
278 {
279 fRowGroupOut.setUseStringTable(b);
280 fRowGroupDelivered.setUseStringTable(b);
281 }
282
283
deliverStringTableRowGroup() const284 bool WindowFunctionStep::deliverStringTableRowGroup() const
285 {
286 idbassert(fRowGroupOut.usesStringTable() == fRowGroupDelivered.usesStringTable());
287 return fRowGroupDelivered.usesStringTable();
288 }
289
290
toString() const291 const string WindowFunctionStep::toString() const
292 {
293 ostringstream oss;
294 oss << "WindowFunctionStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
295
296 oss << " in:";
297
298 for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
299 oss << fInputJobStepAssociation.outAt(i);
300
301 if (fOutputJobStepAssociation.outSize() > 0)
302 {
303 oss << " out:";
304
305 for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
306 oss << fOutputJobStepAssociation.outAt(i);
307 }
308
309 return oss.str();
310 }
311
AddSimplColumn(const vector<SimpleColumn * > & scs,JobInfo & jobInfo)312 void WindowFunctionStep::AddSimplColumn(const vector<SimpleColumn*>& scs,
313 JobInfo& jobInfo)
314 {
315 // append the simple columns if not already projected
316 set<UniqId> scProjected;
317
318 for (RetColsVector::iterator i = jobInfo.projectionCols.begin();
319 i != jobInfo.projectionCols.end();
320 i++)
321 {
322 SimpleColumn* sc = dynamic_cast<SimpleColumn*>(i->get());
323
324 if (sc != NULL)
325 {
326 if (sc->schemaName().empty())
327 sc->oid(joblist::tableOid(sc, jobInfo.csc) + 1 + sc->colPosition());
328
329 scProjected.insert(UniqId(sc));
330 }
331 }
332
333 for (vector<SimpleColumn*>::const_iterator i = scs.begin(); i != scs.end(); i++)
334 {
335 if (scProjected.find(UniqId(*i)) == scProjected.end())
336 {
337 jobInfo.windowDels.push_back(SRCP((*i)->clone()));
338 // MCOL-3343 Enable this if we decide to allow Window Functions to run with
339 // aggregates with no group by. MariaDB allows this. Nobody else in the world does.
340 // There will be more work to get it to function if we try this.
341 // jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
342 scProjected.insert(UniqId(*i));
343 }
344 }
345 }
346
checkWindowFunction(CalpontSelectExecutionPlan * csep,JobInfo & jobInfo)347 void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
348 {
349 // window functions in select clause, selected or in expression
350 jobInfo.windowDels = jobInfo.deliveredCols;
351
352 for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
353 {
354 const vector<WindowFunctionColumn*>& wcl = (*i)->windowfunctionColumnList();
355 RetColsVector wcList;
356
357 for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
358 wcList.push_back(SRCP((*j)->clone()));
359
360 if (!wcList.empty())
361 {
362 jobInfo.windowExps.push_back(*i);
363 jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
364 }
365
366 if (dynamic_cast<WindowFunctionColumn*>(i->get()) != NULL)
367 {
368 jobInfo.windowCols.push_back(*i);
369 jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
370 }
371 else if (!wcList.empty())
372 {
373 jobInfo.windowCols.insert(jobInfo.windowCols.end(), wcList.begin(), wcList.end());
374
375 for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
376 {
377 jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
378 }
379 }
380 }
381
382 // window functions in order by clause
383 const CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols();
384 RetColsVector wcInOrderby;
385
386 for (uint64_t i = 0; i < orderByCols.size(); i++)
387 {
388 if (orderByCols[i]->orderPos() == (uint64_t)(-1))
389 {
390 WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get());
391 const vector<WindowFunctionColumn*>& wcl = orderByCols[i]->windowfunctionColumnList();
392 RetColsVector wcList;
393
394 for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
395 wcList.push_back(SRCP((*j)->clone()));
396
397 if (wc == NULL && wcList.empty())
398 continue;
399
400 // an window function or expression of window functions
401 wcInOrderby.push_back(orderByCols[i]);
402
403 if (!wcList.empty())
404 {
405 jobInfo.windowExps.push_back(orderByCols[i]);
406 jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
407 }
408
409 if (dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get()) != NULL)
410 {
411 jobInfo.windowCols.push_back(orderByCols[i]);
412 jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
413 }
414 else if (!wcList.empty())
415 {
416 jobInfo.windowCols.insert(
417 jobInfo.windowCols.end(), wcList.begin(), wcList.end());
418
419 for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
420 {
421 jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
422 }
423 }
424 }
425 }
426
427 // no window function involved in the query
428 if (jobInfo.windowCols.empty())
429 return;
430
431 // Add in the non-window side of arithmetic columns and functions
432 for (uint64_t i = 0; i < jobInfo.windowExps.size(); i++)
433 {
434 const ArithmeticColumn* ac =
435 dynamic_cast<const ArithmeticColumn*>(jobInfo.windowExps[i].get());
436 const FunctionColumn* fc =
437 dynamic_cast<const FunctionColumn*>(jobInfo.windowExps[i].get());
438
439 if (ac != NULL && ac->windowfunctionColumnList().size() > 0)
440 {
441 AddSimplColumn(ac->simpleColumnList(), jobInfo);
442 }
443 else if (fc != NULL && fc->windowfunctionColumnList().size() > 0)
444 {
445 AddSimplColumn(fc->simpleColumnList(), jobInfo);
446 }
447 }
448 // reconstruct the delivered column list with auxiliary columns
449 set<uint64_t> colSet;
450 jobInfo.deliveredCols.resize(0);
451
452 for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
453 {
454 jobInfo.deliveredCols.push_back(*i);
455 uint64_t key = getTupleKey(jobInfo, *i, true);
456
457 // TODO: remove duplicates in select clause
458 colSet.insert(key);
459 }
460
461 // add window columns in orderby
462 for (RetColsVector::iterator i = wcInOrderby.begin(); i < wcInOrderby.end(); i++)
463 {
464 jobInfo.deliveredCols.push_back(*i);
465 uint64_t key = getTupleKey(jobInfo, *i, true);
466 colSet.insert(key);
467 }
468
469 // MCOL-3435 We haven't yet checked for aggregate, but we need to know
470 bool hasAggregation = false;
471 for (uint64_t i = 0; i < jobInfo.deliveredCols.size(); i++)
472 {
473 if (dynamic_cast<AggregateColumn*>(jobInfo.deliveredCols[i].get()) != NULL)
474 {
475 hasAggregation = true;
476 break;
477 }
478 }
479
480 // add non-duplicate auxiliary columns
481 RetColsVector colsInAf;
482
483 for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
484 {
485 uint64_t key = getTupleKey(jobInfo, *i, true);
486
487 if (colSet.find(key) == colSet.end())
488 jobInfo.deliveredCols.push_back(*i);
489
490 RetColsVector columns = dynamic_cast<WindowFunctionColumn*>(i->get())->getColumnList();
491
492 for (RetColsVector::iterator j = columns.begin(); j < columns.end(); j++)
493 {
494 if (dynamic_cast<ConstantColumn*>(j->get()) != NULL)
495 continue;
496
497 key = getTupleKey(jobInfo, *j, true);
498
499 if (colSet.find(key) == colSet.end())
500 {
501 jobInfo.deliveredCols.push_back(*j);
502 // MCOL-3435 Allow Window Functions to run with aggregates with
503 // no group by by inserting a group by for window parameters.
504 if (hasAggregation)
505 {
506 // If an argument is an AggregateColumn, don't group by it.
507 if (dynamic_cast<AggregateColumn*>(j->get()) == NULL)
508 {
509 bool bFound = false;
510 for (std::vector<SRCP>::iterator igpc = csep->groupByCols().begin();
511 igpc < csep->groupByCols().end();
512 ++igpc)
513 {
514 if ((*igpc)->alias() == (*j)->alias())
515 {
516 bFound = true;
517 break;
518 }
519 }
520 if (!bFound)
521 {
522 csep->groupByCols().push_back(*j);
523 }
524 }
525 }
526 }
527
528 colSet.insert(key);
529 }
530 }
531
532 // for handling order by and limit in outer query
533 jobInfo.wfqLimitStart = csep->limitStart();
534 jobInfo.wfqLimitCount = csep->limitNum();
535 csep->limitStart(0);
536 csep->limitNum(-1);
537
538 if (csep->orderByCols().size() > 0)
539 {
540 jobInfo.wfqOrderby = csep->orderByCols();
541 csep->orderByCols().clear();
542
543 // add order by columns
544 for (RetColsVector::iterator i = jobInfo.wfqOrderby.begin(); i < jobInfo.wfqOrderby.end(); i++)
545 {
546 if (dynamic_cast<ConstantColumn*>(i->get()) != NULL)
547 continue;
548
549 uint64_t key = getTupleKey(jobInfo, *i, true);
550
551 if (colSet.find(key) == colSet.end())
552 jobInfo.deliveredCols.push_back(*i);
553
554 colSet.insert(key);
555 }
556 }
557 }
558
559
makeWindowFunctionStep(SJSTEP & step,JobInfo & jobInfo)560 SJSTEP WindowFunctionStep::makeWindowFunctionStep(SJSTEP& step, JobInfo& jobInfo)
561 {
562 // create a window function step
563 WindowFunctionStep* ws = new WindowFunctionStep(jobInfo);
564
565 // connect to the feeding step
566 JobStepAssociation jsa;
567 AnyDataListSPtr spdl(new AnyDataList());
568 RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
569 dl->OID(execplan::CNX_VTABLE_ID);
570 spdl->rowGroupDL(dl);
571 jsa.outAdd(spdl);
572 ws->inputAssociation(jsa);
573 ws->stepId(step->stepId() + 1);
574 step->outputAssociation(jsa);
575
576 AnyDataListSPtr spdlOut(new AnyDataList());
577 RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
578 dlOut->OID(CNX_VTABLE_ID);
579 spdlOut->rowGroupDL(dlOut);
580 JobStepAssociation jsaOut;
581 jsaOut.outAdd(spdlOut);
582 ws->outputAssociation(jsaOut);
583
584 // configure the rowgroups and index mapping
585 TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(step.get());
586 idbassert(ds != NULL);
587 ws->initialize(ds->getDeliveredRowGroup(), jobInfo);
588
589 // restore the original delivery coloumns
590 jobInfo.deliveredCols = jobInfo.windowDels;
591 jobInfo.nonConstDelCols.clear();
592
593 for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
594 {
595 if (NULL == dynamic_cast<const ConstantColumn*>(i->get()))
596 jobInfo.nonConstDelCols.push_back(*i);
597 }
598
599 return SJSTEP(ws);
600 }
601
602
initialize(const RowGroup & rg,JobInfo & jobInfo)603 void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
604 {
605 if (jobInfo.trace) cout << "Input to WindowFunctionStep: " << rg.toString() << endl;
606
607 // query type decides the output by dbroot or partition
608 // @bug 5631. Insert select should be treated as select
609 fIsSelect = (jobInfo.queryType == "SELECT" ||
610 jobInfo.queryType == "INSERT_SELECT");
611
612 // input row meta data
613 fRowGroupIn = rg;
614 fRowGroupIn.initRow(&fRowIn);
615
616 // make an input map(id, index)
617 map<uint64_t, uint64_t> colIndexMap;
618 uint64_t colCntIn = rg.getColumnCount();
619 const vector<uint32_t>& pos = rg.getOffsets();
620 const vector<uint32_t>& oids = rg.getOIDs();
621 const vector<uint32_t>& keys = rg.getKeys();
622 const vector<CalpontSystemCatalog::ColDataType>& types = rg.getColTypes();
623 const vector<uint32_t>& csNums = rg.getCharsetNumbers();
624 const vector<uint32_t>& scales = rg.getScale();
625 const vector<uint32_t>& precisions = rg.getPrecision();
626
627 for (uint64_t i = 0; i < colCntIn; i++)
628 colIndexMap.insert(make_pair(keys[i], i));
629
630 // @bug6065, window functions that will update string table
631 int64_t wfsUpdateStringTable = 0;
632 int64_t wfsUserFunctionCount = 0;
633
634 for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
635 {
636 bool isUDAF = false;
637 // window function type
638 WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(i->get());
639 uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo); // result index
640 // @bug6065, window functions that will update string table
641 {
642 CalpontSystemCatalog::ColType rt = wc->resultType();
643
644 if ((types[ridx] == CalpontSystemCatalog::CHAR ||
645 types[ridx] == CalpontSystemCatalog::VARCHAR ||
646 types[ridx] == CalpontSystemCatalog::TEXT ||
647 types[ridx] == CalpontSystemCatalog::VARBINARY ||
648 types[ridx] == CalpontSystemCatalog::BLOB) &&
649 rg.getColumnWidth(ridx) >= jobInfo.stringTableThreshold)
650 {
651 ++wfsUpdateStringTable;
652 }
653 }
654
655 // if (boost::iequals(wc->functionName(),"UDAF_FUNC")
656 if (wc->functionName() == "UDAF_FUNC")
657 {
658 isUDAF = true;
659 ++wfsUserFunctionCount;
660 }
661
662 vector<int64_t> fields;
663 fields.push_back(ridx); // result
664 const RetColsVector& parms = wc->functionParms();
665
666 for (uint64_t i = 0; i < parms.size(); i++) // arguments
667 {
668 // skip constant column
669 if (dynamic_cast<const ConstantColumn*>(parms[i].get()) == NULL)
670 fields.push_back(getColumnIndex(parms[i], colIndexMap, jobInfo));
671 else
672 fields.push_back(-1);
673 }
674
675 // partition & order by
676 const RetColsVector& partitions = wc->partitions();
677 vector<uint64_t> eqIdx;
678 vector<uint64_t> peerIdx;
679 vector<IdbSortSpec> sorts;
680
681 for (uint64_t i = 0; i < partitions.size(); i++)
682 {
683 // skip constant column
684 if (dynamic_cast<const ConstantColumn*>(partitions[i].get()) != NULL)
685 continue;
686
687 // get column index
688 uint64_t idx = getColumnIndex(partitions[i], colIndexMap, jobInfo);
689 eqIdx.push_back(idx);
690 sorts.push_back(IdbSortSpec(idx, partitions[i]->asc(), partitions[i]->nullsFirst()));
691 }
692
693 const RetColsVector& orders = wc->orderBy().fOrders;
694
695 for (uint64_t i = 0; i < orders.size(); i++)
696 {
697 // skip constant column
698 if (dynamic_cast<const ConstantColumn*>(orders[i].get()) != NULL)
699 continue;
700
701 // get column index
702 uint64_t idx = getColumnIndex(orders[i], colIndexMap, jobInfo);
703 peerIdx.push_back(idx);
704 sorts.push_back(IdbSortSpec(idx, orders[i]->asc(), orders[i]->nullsFirst()));
705 }
706
707 // functors for sorting
708 boost::shared_ptr<EqualCompData> parts(new EqualCompData(eqIdx, rg));
709 boost::shared_ptr<OrderByData> orderbys(new OrderByData(sorts, rg));
710 boost::shared_ptr<EqualCompData> peers(new EqualCompData(peerIdx, rg));
711
712 // column type for functor templates
713 int ct = 0;
714
715 if (isUDAF)
716 {
717 ct = wc->getUDAFContext().getResultType();
718 }
719 // make sure index is in range
720 else if (fields.size() > 1 && fields[1] >= 0 && static_cast<uint64_t>(fields[1]) < types.size())
721 ct = types[fields[1]];
722
723 // workaround for functions using "within group (order by)" syntax
724 string fn = boost::to_upper_copy(wc->functionName());
725
726 if ( (fn == "MEDIAN" || fn == "PERCENTILE_CONT" || fn == "PERCENTILE_DISC") &&
727 utils::is_nonnegative(peerIdx[0]) && peerIdx[0] < types.size() )
728 ct = types[peerIdx[0]];
729
730 // create the functor based on function name
731 boost::shared_ptr<WindowFunctionType> func =
732 WindowFunctionType::makeWindowFunction(fn, ct, wc);
733
734 // parse parms after peer and fields are set
735 // functions may need to set order column index
736 func->peer(peers);
737 func->fieldIndex(fields);
738 func->parseParms(parms);
739
740 // window frame
741 const WF_Frame& frame = wc->orderBy().fFrame;
742 int frameUnit = (frame.fIsRange) ? WF__FRAME_RANGE : WF__FRAME_ROWS;
743
744 if (frame.fStart.fFrame == WF_UNBOUNDED_PRECEDING &&
745 frame.fEnd.fFrame == WF_UNBOUNDED_FOLLOWING)
746 frameUnit = WF__FRAME_ROWS;
747
748 boost::shared_ptr<FrameBound> upper = parseFrameBound(
749 frame.fStart, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, true);
750 boost::shared_ptr<FrameBound> lower = parseFrameBound(
751 frame.fEnd, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, false);
752 boost::shared_ptr<WindowFrame> windows(new WindowFrame(frameUnit, upper, lower));
753 func->frameUnit(frameUnit);
754
755 // add to the function list
756 fFunctions.push_back(boost::shared_ptr<WindowFunction>(
757 new WindowFunction(func, parts, orderbys, windows, rg, fRowIn)));
758 fFunctionCount++;
759 }
760
761 // initialize window function expresssions
762 fExpression = jobInfo.windowExps;
763
764 for (RetColsVector::iterator i = fExpression.begin(); i < fExpression.end(); i++)
765 {
766 // output index
767 (*i)->outputIndex(getColumnIndex(*i, colIndexMap, jobInfo));
768
769 // map the input indices
770 const vector<SimpleColumn*>& scols = (*i)->simpleColumnList();
771
772 for (vector<SimpleColumn*>::const_iterator j = scols.begin(); j != scols.end(); j++)
773 {
774 uint64_t key = getTupleKey(jobInfo, *j);
775 CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*j)->colType());
776
777 if (dictOid > 0)
778 {
779 key = jobInfo.keyInfo->dictKeyMap[key];
780 }
781
782 map<uint64_t, uint64_t>::iterator k = colIndexMap.find(key);
783
784 if (k != colIndexMap.end())
785 {
786 (*j)->inputIndex(k->second);
787 }
788 else
789 {
790 string name = jobInfo.keyInfo->tupleKeyToName[key];
791 cerr << name << " is not in tuple, key=" << key << endl;
792 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
793 ERR_WF_COLUMN_MISSING);
794 }
795 }
796
797 ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>((*i).get());
798 FunctionColumn* fc = dynamic_cast<FunctionColumn*>((*i).get());
799
800 if (ac != NULL)
801 {
802 updateWindowCols(ac->expression(), colIndexMap, jobInfo);
803 }
804 else if (fc != NULL)
805 {
806 // RetColsVector wcList = fc->windowfunctionColumnList();
807 // for (RetColsVector::iterator j = wcList.begin(); j != wcList.end(); j++)
808 // (*j)->inputIndex(getColumnIndex(*j, colIndexMap, jobInfo));
809 funcexp::FunctionParm parms = fc->functionParms();
810
811 for (vector<execplan::SPTP>::iterator j = parms.begin(); j < parms.end(); j++)
812 updateWindowCols(j->get(), colIndexMap, jobInfo);
813 }
814 }
815
816 // order by part
817 if (jobInfo.wfqOrderby.size() > 0)
818 {
819 // query order by
820 vector<uint64_t> eqIdx;
821 vector<IdbSortSpec> sorts;
822 const RetColsVector& orderby = jobInfo.wfqOrderby;
823
824 for (uint64_t i = 0; i < orderby.size(); i++)
825 {
826 // skip constant column
827 if (dynamic_cast<const ConstantColumn*>(orderby[i].get()) != NULL)
828 continue;
829
830 // get column index
831 uint64_t idx = getColumnIndex(orderby[i], colIndexMap, jobInfo);
832 sorts.push_back(IdbSortSpec(idx, orderby[i]->asc(), orderby[i]->nullsFirst()));
833 }
834
835 fQueryOrderBy.reset(new OrderByData(sorts, rg));
836 }
837
838 // limit part
839 fQueryLimitStart = jobInfo.wfqLimitStart;
840 fQueryLimitCount = jobInfo.wfqLimitCount;
841
842 // fix the delivered rowgroup data
843 vector<uint64_t> delColIdx;
844
845 for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
846 {
847 // find the none constantant columns in the deliver
848 // leave constants to annexstep for now.
849 if (dynamic_cast<const ConstantColumn*>((*i).get()) != NULL)
850 continue;
851
852 delColIdx.push_back(getColumnIndex(*i, colIndexMap, jobInfo));
853 }
854
855 size_t retColCount = delColIdx.size();
856 vector<uint32_t> pos1;
857 vector<uint32_t> oids1;
858 vector<uint32_t> keys1;
859 vector<uint32_t> scales1;
860 vector<uint32_t> precisions1;
861 vector<CalpontSystemCatalog::ColDataType> types1;
862 vector<uint32_t> csNums1;
863 pos1.push_back(2);
864
865 for (size_t i = 0; i < retColCount; i++)
866 {
867 size_t j = delColIdx[i];
868 pos1.push_back(pos1[i] + (pos[j + 1] - pos[j]));
869 oids1.push_back(oids[j]);
870 keys1.push_back(keys[j]);
871 scales1.push_back(scales[j]);
872 precisions1.push_back(precisions[j]);
873 types1.push_back(types[j]);
874 csNums1.push_back(csNums[j]);
875 }
876
877 fRowGroupDelivered = RowGroup(
878 retColCount, pos1, oids1, keys1, types1, csNums1, scales1, precisions1, jobInfo.stringTableThreshold);
879
880 if (jobInfo.trace)
881 cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl;
882
883 if (wfsUpdateStringTable > 1)
884 fUseSSMutex = true;
885
886 if (wfsUserFunctionCount > 1)
887 fUseUFMutex = true;
888
889 fRowGroupOut = fRowGroupDelivered;
890 }
891
892
execute()893 void WindowFunctionStep::execute()
894 {
895 RGData rgData;
896 Row row;
897 fRowGroupIn.initRow(&row);
898 bool more = fInputDL->next(fInputIterator, &rgData);
899 uint64_t i = 0; // for RowGroup index in the fInRowGroupData
900
901 if (traceOn()) dlTimes.setFirstReadTime();
902
903 StepTeleStats sts;
904 sts.query_uuid = fQueryUuid;
905 sts.step_uuid = fStepUuid;
906 sts.msg_type = StepTeleStats::ST_START;
907 sts.total_units_of_work = 1;
908 postStepStartTele(sts);
909
910 try
911 {
912 while (more && !cancelled())
913 {
914 fRowGroupIn.setData(&rgData);
915 fRowGroupIn.getRow(0, &row);
916 uint64_t rowCnt = fRowGroupIn.getRowCount();
917
918 if (rowCnt > 0)
919 {
920 fInRowGroupData.push_back(rgData);
921 uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition);
922 fMemUsage += memAdd;
923
924 if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
925 throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
926
927 for (uint64_t j = 0; j < rowCnt; ++j)
928 {
929 if (i > 0x0000FFFFFFFFFFFFULL || j > 0x000000000000FFFFULL)
930 throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
931
932 fRows.push_back(RowPosition(i, j));
933 row.nextRow();
934 }
935
936 //@bug6065, make StringStore::storeString() thread safe, default to false.
937 rgData.useStoreStringMutex(fUseSSMutex);
938 // For the User Data of UDAnF
939 rgData.useUserDataMutex(fUseUFMutex);
940
941 // window function does not change row count
942 fRowsReturned += rowCnt;
943
944 i++;
945 }
946
947 more = fInputDL->next(fInputIterator, &rgData);
948 }
949 } // try
950 catch (...)
951 {
952 handleException(std::current_exception(),
953 logging::ERR_READ_INPUT_DATALIST,
954 logging::ERR_WF_DATA_SET_TOO_BIG,
955 "WindowFunctionStep::execute()");
956 }
957
958 if (traceOn())
959 dlTimes.setLastReadTime();
960
961 // no need for the window function if aborted or result set is empty.
962 if (cancelled() || fRows.size() == 0)
963 {
964 while (more)
965 more = fInputDL->next(fInputIterator, &rgData);
966
967 fOutputDL->endOfInput();
968
969 sts.msg_type = StepTeleStats::ST_SUMMARY;
970 sts.total_units_of_work = sts.units_of_work_completed = 1;
971 sts.rows = fRowsReturned;
972 postStepSummaryTele(sts);
973
974 if (traceOn())
975 {
976 dlTimes.setEndOfInputTime();
977 printCalTrace();
978 }
979
980 return;
981 }
982
983 // got something to work on
984 try
985 {
986 if (fFunctionCount == 1)
987 {
988 doFunction();
989 }
990 else
991 {
992 if (fTotalThreads > fFunctionCount)
993 fTotalThreads = fFunctionCount;
994
995 fFunctionThreads.clear();
996 fFunctionThreads.reserve(fTotalThreads);
997
998 for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++)
999 fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this)));
1000
1001 // If cancelled, not all threads are started.
1002 jobstepThreadPool.join(fFunctionThreads);
1003 }
1004
1005 if (!(cancelled()))
1006 {
1007 if (fIsSelect)
1008 doPostProcessForSelect();
1009 else
1010 doPostProcessForDml();
1011 }
1012
1013 }
1014 catch (...)
1015 {
1016 handleException(std::current_exception(),
1017 logging::ERR_EXECUTE_WINDOW_FUNCTION,
1018 logging::ERR_WF_DATA_SET_TOO_BIG,
1019 "WindowFunctionStep::execute()");
1020 }
1021
1022 fOutputDL->endOfInput();
1023
1024 sts.msg_type = StepTeleStats::ST_SUMMARY;
1025 sts.total_units_of_work = sts.units_of_work_completed = 1;
1026 sts.rows = fRowsReturned;
1027 postStepSummaryTele(sts);
1028
1029 if (traceOn())
1030 {
1031 dlTimes.setEndOfInputTime();
1032 printCalTrace();
1033 }
1034
1035 return;
1036 }
1037
1038
nextFunctionIndex()1039 uint64_t WindowFunctionStep::nextFunctionIndex()
1040 {
1041 uint64_t idx = atomicInc(&fNextIndex);
1042
1043 // return index in the function array
1044 return --idx;
1045 }
1046
1047
doFunction()1048 void WindowFunctionStep::doFunction()
1049 {
1050 uint64_t i = 0;
1051
1052 try
1053 {
1054 while (((i = nextFunctionIndex()) < fFunctionCount) && !cancelled())
1055 {
1056 uint64_t memAdd = fRows.size() * sizeof(RowPosition);
1057 fMemUsage += memAdd;
1058
1059 if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
1060 throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
1061
1062 fFunctions[i]->setCallback(this, i);
1063 (*fFunctions[i].get())();
1064 }
1065 }
1066 catch (...)
1067 {
1068 handleException(std::current_exception(),
1069 logging::ERR_EXECUTE_WINDOW_FUNCTION,
1070 logging::ERR_WF_DATA_SET_TOO_BIG,
1071 "WindowFunctionStep::doFunction()");
1072 }
1073 }
1074
doPostProcessForSelect()1075 void WindowFunctionStep::doPostProcessForSelect()
1076 {
1077 FuncExp* fe = funcexp::FuncExp::instance();
1078 boost::shared_array<int> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
1079 Row rowIn, rowOut;
1080 fRowGroupIn.initRow(&rowIn);
1081 fRowGroupOut.initRow(&rowOut);
1082 RGData rgData;
1083 vector<RowPosition>& rowData = *(fFunctions.back()->fRowData.get());
1084 int64_t rowsLeft = rowData.size();
1085 int64_t rowsInRg = 0;
1086 int64_t rgCapacity = 0;
1087
1088 int64_t begin = fQueryLimitStart;
1089 int64_t count = (fQueryLimitCount == (uint64_t) - 1) ? rowsLeft : fQueryLimitCount;
1090 int64_t end = begin + count;
1091 end = (end < rowsLeft) ? end : rowsLeft;
1092 rowsLeft = (end > begin) ? (end - begin) : 0;
1093
1094 if (fQueryOrderBy.get() != NULL)
1095 sort(rowData.begin(), rowData.size());
1096
1097 for (int64_t i = begin; i < end; i++)
1098 {
1099 if (rgData.rowData.get() == NULL)
1100 {
1101 rgCapacity = ((rowsLeft > 8192) ? 8192 : rowsLeft);
1102 rowsLeft -= rgCapacity;
1103 rgData.reinit(fRowGroupOut, rgCapacity);
1104
1105 fRowGroupOut.setData(&rgData);
1106 fRowGroupOut.resetRowGroup(0);
1107 fRowGroupOut.setDBRoot(0); // not valid dbroot
1108 fRowGroupOut.getRow(0, &rowOut);
1109 rowsInRg = 0;
1110 }
1111
1112 rowIn.setData(getPointer(rowData[i], fRowGroupIn, rowIn));
1113
1114 // evaluate the window function expressions before apply mapping
1115 if (fExpression.size() > 0)
1116 fe->evaluate(rowIn, fExpression);
1117
1118 applyMapping(mapping, rowIn, &rowOut);
1119 rowOut.nextRow();
1120 rowsInRg++;
1121
1122 if (rowsInRg == rgCapacity)
1123 {
1124 fRowGroupOut.setRowCount(rowsInRg);
1125 fOutputDL->insert(rgData);
1126 rgData.clear();
1127 }
1128 }
1129 }
1130
1131
doPostProcessForDml()1132 void WindowFunctionStep::doPostProcessForDml()
1133 {
1134 FuncExp* fe = funcexp::FuncExp::instance();
1135 boost::shared_array<int> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
1136 Row rowIn, rowOut;
1137 fRowGroupIn.initRow(&rowIn);
1138 fRowGroupOut.initRow(&rowOut);
1139
1140 for (vector<RGData>::iterator i = fInRowGroupData.begin();
1141 i < fInRowGroupData.end(); i++)
1142 {
1143 fRowGroupIn.setData(&(*i));
1144 RGData rgData = RGData(fRowGroupIn, fRowGroupIn.getRowCount());
1145 fRowGroupOut.setData(&rgData);
1146 // @bug 5631. reset rowgroup before the data is populated.
1147 fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
1148 fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
1149 fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
1150
1151 fRowGroupIn.getRow(0, &rowIn);
1152 fRowGroupOut.getRow(0, &rowOut);
1153
1154 for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
1155 {
1156 // evaluate the window function expressions before apply mapping
1157 if (fExpression.size() > 0)
1158 fe->evaluate(rowIn, fExpression);
1159
1160 applyMapping(mapping, rowIn, &rowOut);
1161 rowIn.nextRow();
1162 rowOut.nextRow();
1163 }
1164
1165 //fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
1166 //fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
1167
1168 fOutputDL->insert(rgData);
1169 }
1170 }
1171
1172
parseFrameBoundRows(const execplan::WF_Boundary & b,const map<uint64_t,uint64_t> & m,JobInfo & jobInfo)1173 boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRows(
1174 const execplan::WF_Boundary& b,
1175 const map<uint64_t, uint64_t>& m,
1176 JobInfo& jobInfo)
1177 {
1178 boost::shared_ptr<FrameBound> fb;
1179
1180 if (b.fFrame == WF_CURRENT_ROW)
1181 {
1182 fb.reset(new FrameBoundRow(WF__CURRENT_ROW));
1183 return fb;
1184 }
1185
1186 ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());
1187
1188 if (cc != NULL)
1189 {
1190 Row dummy;
1191 bool isNull = false;
1192 int val = cc->getIntVal(dummy, isNull);
1193
1194 if (val >= 0 && isNull == false)
1195 {
1196 int type = (b.fFrame == WF_PRECEDING) ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING;
1197 fb.reset(new FrameBoundConstantRow(type, val));
1198 }
1199 else
1200 {
1201 string str("NULL");
1202
1203 if (!isNull)
1204 {
1205 ostringstream oss;
1206 oss << val;
1207 str = oss.str();
1208 }
1209
1210 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
1211 ERR_WF_BOUND_OUT_OF_RANGE);
1212 }
1213 }
1214 else
1215 {
1216 int type = (b.fFrame == WF_PRECEDING) ? WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
1217 uint64_t id = getTupleKey(jobInfo, b.fVal);
1218 uint64_t idx = getColumnIndex(b.fVal, m, jobInfo);
1219 TupleInfo ti = getTupleInfo(id, jobInfo);
1220
1221 switch (ti.dtype)
1222 {
1223 case execplan::CalpontSystemCatalog::TINYINT:
1224 case execplan::CalpontSystemCatalog::SMALLINT:
1225 case execplan::CalpontSystemCatalog::MEDINT:
1226 case execplan::CalpontSystemCatalog::INT:
1227 case execplan::CalpontSystemCatalog::BIGINT:
1228 case execplan::CalpontSystemCatalog::DECIMAL:
1229 {
1230 fb.reset(new FrameBoundExpressionRow<int64_t>(type, id, idx));
1231 break;
1232 }
1233
1234 case execplan::CalpontSystemCatalog::DOUBLE:
1235 case execplan::CalpontSystemCatalog::UDOUBLE:
1236 {
1237 fb.reset(new FrameBoundExpressionRow<double>(type, id, idx));
1238 break;
1239 }
1240
1241 case execplan::CalpontSystemCatalog::FLOAT:
1242 case execplan::CalpontSystemCatalog::UFLOAT:
1243 {
1244 fb.reset(new FrameBoundExpressionRow<float>(type, id, idx));
1245 break;
1246 }
1247
1248 case execplan::CalpontSystemCatalog::UTINYINT:
1249 case execplan::CalpontSystemCatalog::USMALLINT:
1250 case execplan::CalpontSystemCatalog::UMEDINT:
1251 case execplan::CalpontSystemCatalog::UINT:
1252 case execplan::CalpontSystemCatalog::UBIGINT:
1253 case execplan::CalpontSystemCatalog::UDECIMAL:
1254 case execplan::CalpontSystemCatalog::DATE:
1255 case execplan::CalpontSystemCatalog::DATETIME:
1256 case execplan::CalpontSystemCatalog::TIME:
1257 case execplan::CalpontSystemCatalog::TIMESTAMP:
1258 {
1259 fb.reset(new FrameBoundExpressionRow<uint64_t>(type, id, idx));
1260 break;
1261 }
1262
1263 default:
1264 {
1265 string str = windowfunction::colType2String[ti.dtype];
1266 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
1267 ERR_WF_INVALID_BOUND_TYPE);
1268 break;
1269 }
1270 }
1271 }
1272
1273 return fb;
1274 }
1275
1276
parseFrameBoundRange(const execplan::WF_Boundary & b,const map<uint64_t,uint64_t> & m,const vector<SRCP> & o,JobInfo & jobInfo)1277 boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRange(const execplan::WF_Boundary& b,
1278 const map<uint64_t, uint64_t>& m,
1279 const vector<SRCP>& o,
1280 JobInfo& jobInfo)
1281 {
1282 boost::shared_ptr<FrameBound> fb;
1283
1284 if (b.fFrame == WF_CURRENT_ROW)
1285 {
1286 fb.reset(new FrameBoundRange(WF__CURRENT_ROW));
1287 return fb;
1288 }
1289
1290 bool isConstant = false;
1291 bool isNull = false;
1292 Row dummy;
1293 ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());
1294
1295 if (cc != NULL)
1296 {
1297 isConstant = true;
1298 double val = cc->getDoubleVal(dummy, isNull);
1299
1300 if (val < 0 || isNull)
1301 {
1302 string str("NULL");
1303
1304 if (!isNull)
1305 {
1306 ostringstream oss;
1307 oss << val;
1308 str = oss.str();
1309 }
1310
1311 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
1312 ERR_WF_BOUND_OUT_OF_RANGE);
1313 }
1314 }
1315
1316 int type = 0;
1317 vector<uint64_t> ids;
1318 vector<int> index;
1319 ids.push_back(getTupleKey(jobInfo, o[0]));
1320 index.push_back(getColumnIndex(o[0], m, jobInfo));
1321
1322 if (isConstant)
1323 {
1324 type = (b.fFrame == WF_PRECEDING) ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING;
1325 ids.push_back(-1); // dummy, n/a for constant
1326 index.push_back(-1); // dummy, n/a for constant
1327 }
1328 else
1329 {
1330 type = (b.fFrame == WF_PRECEDING) ? WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
1331 ids.push_back(getTupleKey(jobInfo, b.fVal));
1332 index.push_back(getColumnIndex(b.fVal, m, jobInfo));
1333 }
1334
1335 ids.push_back(getTupleKey(jobInfo, b.fBound));
1336 index.push_back(getColumnIndex(b.fBound, m, jobInfo));
1337
1338 FrameBoundRange* fbr = NULL;
1339 TupleInfo ti = getTupleInfo(ids[0], jobInfo);
1340 bool asc = o[0]->asc();
1341 bool nlf = o[0]->nullsFirst();
1342
1343 switch (ti.dtype)
1344 {
1345 case execplan::CalpontSystemCatalog::TINYINT:
1346 case execplan::CalpontSystemCatalog::SMALLINT:
1347 case execplan::CalpontSystemCatalog::MEDINT:
1348 case execplan::CalpontSystemCatalog::INT:
1349 case execplan::CalpontSystemCatalog::BIGINT:
1350 case execplan::CalpontSystemCatalog::DECIMAL:
1351 {
1352 if (isConstant)
1353 {
1354 int64_t v = cc->getIntVal(dummy, isNull);
1355 fbr = new FrameBoundConstantRange<int64_t>(type, asc, nlf, &v);
1356 fbr->isZero((v == 0));
1357 }
1358 else
1359 {
1360 fbr = new FrameBoundExpressionRange<int64_t>(type, asc, nlf);
1361 }
1362
1363 break;
1364 }
1365
1366 case execplan::CalpontSystemCatalog::DOUBLE:
1367 case execplan::CalpontSystemCatalog::UDOUBLE:
1368 {
1369 if (isConstant)
1370 {
1371 double v = cc->getDoubleVal(dummy, isNull);
1372 fbr = new FrameBoundConstantRange<double>(type, asc, nlf, &v);
1373 fbr->isZero((v == 0.0));
1374 }
1375 else
1376 {
1377 fbr = new FrameBoundExpressionRange<double>(type, asc, nlf);
1378 }
1379
1380 break;
1381 }
1382
1383 case execplan::CalpontSystemCatalog::FLOAT:
1384 case execplan::CalpontSystemCatalog::UFLOAT:
1385 {
1386 if (isConstant)
1387 {
1388 float v = cc->getFloatVal(dummy, isNull);
1389 fbr = new FrameBoundConstantRange<float>(type, asc, nlf, &v);
1390 fbr->isZero((v == 0.0));
1391 }
1392 else
1393 {
1394 fbr = new FrameBoundExpressionRange<float>(type, asc, nlf);
1395 }
1396
1397 break;
1398 }
1399
1400 case execplan::CalpontSystemCatalog::UTINYINT:
1401 case execplan::CalpontSystemCatalog::USMALLINT:
1402 case execplan::CalpontSystemCatalog::UMEDINT:
1403 case execplan::CalpontSystemCatalog::UINT:
1404 case execplan::CalpontSystemCatalog::UBIGINT:
1405 case execplan::CalpontSystemCatalog::UDECIMAL:
1406 case execplan::CalpontSystemCatalog::DATE:
1407 case execplan::CalpontSystemCatalog::DATETIME:
1408 case execplan::CalpontSystemCatalog::TIME:
1409 case execplan::CalpontSystemCatalog::TIMESTAMP:
1410 {
1411 if (isConstant)
1412 {
1413 uint64_t v = cc->getUintVal(dummy, isNull);
1414 fbr = new FrameBoundConstantRange<uint64_t>(type, asc, nlf, &v);
1415 fbr->isZero((v == 0));
1416 }
1417 else
1418 {
1419 fbr = new FrameBoundExpressionRange<uint64_t>(type, asc, nlf);
1420 }
1421
1422 break;
1423 }
1424
1425 default:
1426 {
1427 string str = windowfunction::colType2String[ti.dtype];
1428 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
1429 ERR_WF_INVALID_BOUND_TYPE);
1430 break;
1431 }
1432 }
1433
1434 fbr->setTupleId(ids);
1435 fbr->setIndex(index);
1436 fb.reset(fbr);
1437
1438 return fb;
1439 }
1440
1441
parseFrameBound(const execplan::WF_Boundary & b,const map<uint64_t,uint64_t> & m,const vector<SRCP> & o,const boost::shared_ptr<EqualCompData> & p,JobInfo & j,bool rows,bool s)1442 boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBound(const execplan::WF_Boundary& b,
1443 const map<uint64_t, uint64_t>& m,
1444 const vector<SRCP>& o,
1445 const boost::shared_ptr<EqualCompData>& p,
1446 JobInfo& j,
1447 bool rows,
1448 bool s)
1449 {
1450 boost::shared_ptr<FrameBound> fb;
1451
1452 switch (b.fFrame)
1453 {
1454 case WF_UNBOUNDED_PRECEDING:
1455 {
1456 fb.reset(new FrameBound(WF__UNBOUNDED_PRECEDING));
1457 break;
1458 }
1459
1460 case WF_UNBOUNDED_FOLLOWING:
1461 {
1462 fb.reset(new FrameBound(WF__UNBOUNDED_FOLLOWING));
1463 break;
1464 }
1465
1466 case WF_CURRENT_ROW:
1467 case WF_PRECEDING:
1468 case WF_FOLLOWING:
1469 {
1470 if (rows)
1471 {
1472 fb = parseFrameBoundRows(b, m, j);
1473 }
1474 else
1475 {
1476 fb = parseFrameBoundRange(b, m, o, j);
1477 }
1478
1479 break;
1480 }
1481
1482 default: // unknown
1483 {
1484 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_UNKNOWN_BOUND, b.fFrame),
1485 ERR_WF_UNKNOWN_BOUND);
1486 break;
1487 }
1488 }
1489
1490 fb->peer(p);
1491 fb->start(s);
1492
1493 return fb;
1494 }
1495
1496
updateWindowCols(ReturnedColumn * rc,const map<uint64_t,uint64_t> & m,JobInfo & jobInfo)1497 void WindowFunctionStep::updateWindowCols(ReturnedColumn* rc,
1498 const map<uint64_t, uint64_t>& m,
1499 JobInfo& jobInfo)
1500 {
1501 if (rc == NULL)
1502 return;
1503
1504 ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(rc);
1505 FunctionColumn* fc = dynamic_cast<FunctionColumn*>(rc);
1506 SimpleFilter* sf = dynamic_cast<SimpleFilter*>(rc);
1507 WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(rc);
1508
1509 if (wc)
1510 {
1511 uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
1512 map<uint64_t, uint64_t>::const_iterator j = m.find(key);
1513
1514 if (j == m.end())
1515 {
1516 string name = jobInfo.keyInfo->tupleKeyToName[key];
1517 cerr << name << " is not in tuple, key=" << key << endl;
1518 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
1519 ERR_WF_COLUMN_MISSING);
1520 }
1521
1522 wc->inputIndex(j->second);
1523 }
1524 else if (ac)
1525 {
1526 updateWindowCols(ac->expression(), m, jobInfo);
1527 }
1528 else if (fc)
1529 {
1530 funcexp::FunctionParm parms = fc->functionParms();
1531
1532 for (vector<execplan::SPTP>::iterator i = parms.begin(); i < parms.end(); i++)
1533 updateWindowCols(i->get(), m, jobInfo);
1534 }
1535 else if (sf)
1536 {
1537 updateWindowCols(sf->lhs(), m, jobInfo);
1538 updateWindowCols(sf->rhs(), m, jobInfo);
1539 }
1540 }
1541
1542
updateWindowCols(ParseTree * pt,const map<uint64_t,uint64_t> & m,JobInfo & jobInfo)1543 void WindowFunctionStep::updateWindowCols(ParseTree* pt,
1544 const map<uint64_t, uint64_t>& m,
1545 JobInfo& jobInfo)
1546 {
1547 if (pt == NULL)
1548 return;
1549
1550 updateWindowCols(pt->left(), m, jobInfo);
1551 updateWindowCols(pt->right(), m, jobInfo);
1552
1553 TreeNode* tn = pt->data();
1554 ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(tn);
1555 FunctionColumn* fc = dynamic_cast<FunctionColumn*>(tn);
1556 SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);
1557 WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(tn);
1558
1559 if (wc)
1560 {
1561 uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
1562 map<uint64_t, uint64_t>::const_iterator j = m.find(key);
1563
1564 if (j == m.end())
1565 {
1566 string name = jobInfo.keyInfo->tupleKeyToName[key];
1567 cerr << name << " is not in tuple, key=" << key << endl;
1568 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
1569 ERR_WF_COLUMN_MISSING);
1570 }
1571
1572 wc->inputIndex(j->second);
1573 }
1574 else if (ac)
1575 {
1576 updateWindowCols(ac->expression(), m, jobInfo);
1577 }
1578 else if (fc)
1579 {
1580 funcexp::FunctionParm parms = fc->functionParms();
1581
1582 for (vector<execplan::SPTP>::iterator i = parms.begin(); i < parms.end(); i++)
1583 updateWindowCols(i->get(), m, jobInfo);
1584 }
1585 else if (sf)
1586 {
1587 updateWindowCols(sf->lhs(), m, jobInfo);
1588 updateWindowCols(sf->rhs(), m, jobInfo);
1589 }
1590 }
1591
1592
sort(std::vector<RowPosition>::iterator v,uint64_t n)1593 void WindowFunctionStep::sort(std::vector<RowPosition>::iterator v, uint64_t n)
1594 {
1595 // recursive function termination condition.
1596 if (n < 2 || cancelled())
1597 return;
1598
1599 RowPosition p = *(v + n / 2); // pivot value
1600 vector<RowPosition>::iterator l = v; // low address
1601 vector<RowPosition>::iterator h = v + (n - 1); // high address
1602
1603 while (l <= h && !cancelled())
1604 {
1605 // Can use while here, but need check boundary and cancel status.
1606 if (fQueryOrderBy->operator()(getPointer(*l), getPointer(p)))
1607 {
1608 l++;
1609 }
1610 else if (fQueryOrderBy->operator()(getPointer(p), getPointer(*h)))
1611 {
1612 h--;
1613 }
1614 else
1615 {
1616 RowPosition t = *l; // temp value for swap
1617 *l++ = *h;
1618 *h-- = t;
1619 }
1620 }
1621
1622 sort(v, std::distance(v, h) + 1);
1623 sort(l, std::distance(l, v) + n);
1624 }
1625
1626
printCalTrace()1627 void WindowFunctionStep::printCalTrace()
1628 {
1629 time_t t = time (0);
1630 char timeString[50];
1631 ctime_r (&t, timeString);
1632 timeString[strlen (timeString ) - 1] = '\0';
1633 ostringstream logStr;
1634 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
1635 << "; total rows returned-" << fRowsReturned << endl
1636 << "\t1st read " << dlTimes.FirstReadTimeString()
1637 << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
1638 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
1639 << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
1640 << "\tJob completion status " << status() << endl;
1641 logEnd(logStr.str().c_str());
1642 fExtendedInfo += logStr.str();
1643 formatMiniStats();
1644 }
1645
1646
formatMiniStats()1647 void WindowFunctionStep::formatMiniStats()
1648 {
1649 ostringstream oss;
1650 oss << "WFS "
1651 << "UM "
1652 << "- "
1653 << "- "
1654 << "- "
1655 << "- "
1656 << "- "
1657 << "- "
1658 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
1659 << fRowsReturned << " ";
1660 fMiniInfo += oss.str();
1661 }
1662
1663
1664 } //namespace
1665 // vim:ts=4 sw=4:
1666
1667