1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (c) 2016-2020 MariaDB
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: windowfunctionstep.cpp 9681 2013-07-11 22:58:05Z xlou $
20 
21 
22 //#define NDEBUG
23 #include <cassert>
24 #include <sstream>
25 #include <iomanip>
26 using namespace std;
27 
28 #include <boost/algorithm/string.hpp>  //  to_upper_copy
29 #include <boost/shared_ptr.hpp>
30 #include <boost/shared_array.hpp>
31 #include <boost/thread.hpp>
32 #include <boost/uuid/uuid_io.hpp>
33 using namespace boost;
34 
35 #include "atomicops.h"
36 using namespace atomicops;
37 
38 #include "loggingid.h"
39 #include "errorcodes.h"
40 #include "idberrorinfo.h"
41 using namespace logging;
42 
43 #include "configcpp.h"
44 using namespace config;
45 
46 #include "calpontselectexecutionplan.h"
47 #include "calpontsystemcatalog.h"
48 #include "aggregatecolumn.h"
49 #include "arithmeticcolumn.h"
50 #include "constantcolumn.h"
51 #include "functioncolumn.h"
52 #include "pseudocolumn.h"
53 #include "simplefilter.h"
54 #include "windowfunctioncolumn.h"
55 using namespace execplan;
56 
57 #include "../../utils/windowfunction/windowfunction.h"
58 #include "../../utils/windowfunction/windowfunctiontype.h"
59 #include "../../utils/windowfunction/framebound.h"
60 #include "../../utils/windowfunction/frameboundrange.h"
61 #include "../../utils/windowfunction/frameboundrow.h"
62 #include "../../utils/windowfunction/windowframe.h"
63 using namespace windowfunction;
64 
65 #include "rowgroup.h"
66 using namespace rowgroup;
67 
68 using namespace ordering;
69 
70 #include "funcexp.h"
71 using namespace funcexp;
72 
73 #include "querytele.h"
74 using namespace querytele;
75 
76 #include "jlf_common.h"
77 #include "jobstep.h"
78 #include "windowfunctionstep.h"
79 using namespace joblist;
80 
81 #include "checks.h"
82 
83 
84 namespace
85 {
86 
87 
getColumnIndex(const SRCP & c,const map<uint64_t,uint64_t> & m,JobInfo & jobInfo)88 uint64_t getColumnIndex(const SRCP& c, const map<uint64_t, uint64_t>& m, JobInfo& jobInfo)
89 {
90     uint64_t key = getTupleKey(jobInfo, c, true);
91     const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(c.get());
92 
93     if (sc != NULL && !sc->schemaName().empty())
94     {
95         // special handling for dictionary
96         CalpontSystemCatalog::ColType ct = sc->colType();
97 
98 //XXX use this before connector sets colType in sc correctly.
99 //    type of pseudo column is set by connector
100         if (!(dynamic_cast<const PseudoColumn*>(sc)))
101         {
102             ct = jobInfo.csc->colType(sc->oid());
103             ct.charsetNumber =sc->colType().charsetNumber;
104         }
105 
106 //X
107         CalpontSystemCatalog::OID dictOid = isDictCol(ct);
108         string alias(extractTableAlias(sc));
109 
110         if (dictOid > 0)
111         {
112             TupleInfo ti =
113                 setTupleInfo(ct, dictOid, jobInfo, tableOid(sc, jobInfo.csc), sc, alias);
114             key = ti.key;
115         }
116     }
117 
118     map<uint64_t, uint64_t>::const_iterator j = m.find(key);
119 
120     if (j == m.end())
121     {
122         string name = jobInfo.keyInfo->tupleKeyToName[key];
123         cerr << name << " is not in tuple, key=" << key << endl;
124         throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
125                         ERR_WF_COLUMN_MISSING);
126     }
127 
128     return j->second;
129 }
130 
131 
132 }
133 
134 
135 namespace joblist
136 {
137 
138 
WindowFunctionStep(const JobInfo & jobInfo)139 WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) :
140     JobStep(jobInfo),
141     fRunner(0),
142     fCatalog(jobInfo.csc),
143     fRowsReturned(0),
144     fEndOfResult(false),
145     fIsSelect(true),
146     fUseSSMutex(false),
147     fUseUFMutex(false),
148     fInputDL(NULL),
149     fOutputDL(NULL),
150     fInputIterator(-1),
151     fOutputIterator(-1),
152     fFunctionCount(0),
153     fTotalThreads(1),
154     fNextIndex(0),
155     fMemUsage(0),
156     fRm(jobInfo.rm),
157     fSessionMemLimit(jobInfo.umMemLimit)
158 {
159     fTotalThreads = fRm->windowFunctionThreads();
160     fExtendedInfo = "WFS: ";
161     fQtc.stepParms().stepType = StepTeleStats::T_WFS;
162 }
163 
164 
~WindowFunctionStep()165 WindowFunctionStep::~WindowFunctionStep()
166 {
167     if (fMemUsage > 0)
168         fRm->returnMemory(fMemUsage, fSessionMemLimit);
169 }
170 
171 
run()172 void WindowFunctionStep::run()
173 {
174     if (fInputJobStepAssociation.outSize() == 0)
175         throw logic_error("No input data list for window function step.");
176 
177     fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
178 
179     if (fInputDL == NULL)
180         throw logic_error("Input is not a RowGroup data list in window function step.");
181 
182     fInputIterator = fInputDL->getIterator();
183 
184     if (fOutputJobStepAssociation.outSize() == 0)
185         throw logic_error("No output data list for window function step.");
186 
187     fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
188 
189     if (fOutputDL == NULL)
190         throw logic_error("Output of window function step is not a RowGroup data list.");
191 
192     if (fDelivery == true)
193     {
194         fOutputIterator = fOutputDL->getIterator();
195     }
196 
197     fRunner = jobstepThreadPool.invoke(Runner(this));
198 }
199 
200 
join()201 void WindowFunctionStep::join()
202 {
203     if (fRunner)
204         jobstepThreadPool.join(fRunner);
205 }
206 
207 
nextBand(messageqcpp::ByteStream & bs)208 uint32_t WindowFunctionStep::nextBand(messageqcpp::ByteStream& bs)
209 {
210     RGData rgDataOut;
211     bool more = false;
212     uint32_t rowCount = 0;
213 
214     try
215     {
216         bs.restart();
217 
218         more = fOutputDL->next(fOutputIterator, &rgDataOut);
219 
220         if (more && !cancelled())
221         {
222             fRowGroupDelivered.setData(&rgDataOut);
223             fRowGroupDelivered.serializeRGData(bs);
224             rowCount = fRowGroupDelivered.getRowCount();
225         }
226         else
227         {
228             while (more)
229                 more = fOutputDL->next(fOutputIterator, &rgDataOut);
230 
231             fEndOfResult = true;
232         }
233     }
234     catch (...)
235     {
236         handleException(std::current_exception(),
237                         logging::ERR_IN_DELIVERY,
238                         logging::ERR_WF_DATA_SET_TOO_BIG,
239                         "WindowFunctionStep::nextBand()");
240         while (more)
241             more = fOutputDL->next(fOutputIterator, &rgDataOut);
242         fEndOfResult = true;
243     }
244 
245     if (fEndOfResult)
246     {
247         // send an empty / error band
248         rgDataOut.reinit(fRowGroupDelivered, 0);
249         fRowGroupDelivered.setData(&rgDataOut);
250         fRowGroupDelivered.resetRowGroup(0);
251         fRowGroupDelivered.setStatus(status());
252         fRowGroupDelivered.serializeRGData(bs);
253     }
254 
255     return rowCount;
256 }
257 
258 
setOutputRowGroup(const RowGroup & rg)259 void WindowFunctionStep::setOutputRowGroup(const RowGroup& rg)
260 {
261     idbassert(0);
262 }
263 
264 
getOutputRowGroup() const265 const RowGroup& WindowFunctionStep::getOutputRowGroup() const
266 {
267     return fRowGroupOut;
268 }
269 
270 
getDeliveredRowGroup() const271 const RowGroup& WindowFunctionStep::getDeliveredRowGroup() const
272 {
273     return fRowGroupDelivered;
274 }
275 
276 
deliverStringTableRowGroup(bool b)277 void WindowFunctionStep::deliverStringTableRowGroup(bool b)
278 {
279     fRowGroupOut.setUseStringTable(b);
280     fRowGroupDelivered.setUseStringTable(b);
281 }
282 
283 
deliverStringTableRowGroup() const284 bool WindowFunctionStep::deliverStringTableRowGroup() const
285 {
286     idbassert(fRowGroupOut.usesStringTable() == fRowGroupDelivered.usesStringTable());
287     return fRowGroupDelivered.usesStringTable();
288 }
289 
290 
toString() const291 const string WindowFunctionStep::toString() const
292 {
293     ostringstream oss;
294     oss << "WindowFunctionStep   ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
295 
296     oss << " in:";
297 
298     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
299         oss << fInputJobStepAssociation.outAt(i);
300 
301     if (fOutputJobStepAssociation.outSize() > 0)
302     {
303         oss << " out:";
304 
305         for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
306             oss << fOutputJobStepAssociation.outAt(i);
307     }
308 
309     return oss.str();
310 }
311 
AddSimplColumn(const vector<SimpleColumn * > & scs,JobInfo & jobInfo)312 void WindowFunctionStep::AddSimplColumn(const vector<SimpleColumn*>& scs,
313                                      JobInfo& jobInfo)
314 {
315     // append the simple columns if not already projected
316     set<UniqId> scProjected;
317 
318     for (RetColsVector::iterator i  = jobInfo.projectionCols.begin();
319             i != jobInfo.projectionCols.end();
320             i++)
321     {
322         SimpleColumn* sc = dynamic_cast<SimpleColumn*>(i->get());
323 
324         if (sc != NULL)
325         {
326             if (sc->schemaName().empty())
327                 sc->oid(joblist::tableOid(sc, jobInfo.csc) + 1 + sc->colPosition());
328 
329             scProjected.insert(UniqId(sc));
330         }
331     }
332 
333     for (vector<SimpleColumn*>::const_iterator i = scs.begin(); i != scs.end(); i++)
334     {
335         if (scProjected.find(UniqId(*i)) == scProjected.end())
336         {
337             jobInfo.windowDels.push_back(SRCP((*i)->clone()));
338 // MCOL-3343 Enable this if we decide to allow Window Functions to run with
339 // aggregates with no group by. MariaDB allows this. Nobody else in the world does.
340 // There will be more work to get it to function if we try this.
341 //            jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
342             scProjected.insert(UniqId(*i));
343         }
344     }
345 }
346 
checkWindowFunction(CalpontSelectExecutionPlan * csep,JobInfo & jobInfo)347 void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
348 {
349     // window functions in select clause, selected or in expression
350     jobInfo.windowDels = jobInfo.deliveredCols;
351 
352     for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
353     {
354         const vector<WindowFunctionColumn*>& wcl = (*i)->windowfunctionColumnList();
355         RetColsVector wcList;
356 
357         for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
358             wcList.push_back(SRCP((*j)->clone()));
359 
360         if (!wcList.empty())
361         {
362             jobInfo.windowExps.push_back(*i);
363             jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
364         }
365 
366         if (dynamic_cast<WindowFunctionColumn*>(i->get()) != NULL)
367         {
368             jobInfo.windowCols.push_back(*i);
369             jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
370         }
371         else if (!wcList.empty())
372         {
373             jobInfo.windowCols.insert(jobInfo.windowCols.end(), wcList.begin(), wcList.end());
374 
375             for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
376             {
377                 jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
378             }
379         }
380     }
381 
382     // window functions in order by clause
383     const CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols();
384     RetColsVector wcInOrderby;
385 
386     for (uint64_t i = 0; i < orderByCols.size(); i++)
387     {
388         if (orderByCols[i]->orderPos() == (uint64_t)(-1))
389         {
390             WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get());
391             const vector<WindowFunctionColumn*>& wcl = orderByCols[i]->windowfunctionColumnList();
392             RetColsVector wcList;
393 
394             for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
395                 wcList.push_back(SRCP((*j)->clone()));
396 
397             if (wc == NULL && wcList.empty())
398                 continue;
399 
400             // an window function or expression of window functions
401             wcInOrderby.push_back(orderByCols[i]);
402 
403             if (!wcList.empty())
404             {
405                 jobInfo.windowExps.push_back(orderByCols[i]);
406                 jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
407             }
408 
409             if (dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get()) != NULL)
410             {
411                 jobInfo.windowCols.push_back(orderByCols[i]);
412                 jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
413             }
414             else if (!wcList.empty())
415             {
416                 jobInfo.windowCols.insert(
417                     jobInfo.windowCols.end(), wcList.begin(), wcList.end());
418 
419                 for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
420                 {
421                     jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
422                 }
423             }
424         }
425     }
426 
427     // no window function involved in the query
428     if (jobInfo.windowCols.empty())
429         return;
430 
431     // Add in the non-window side of arithmetic columns and functions
432     for (uint64_t i = 0; i < jobInfo.windowExps.size(); i++)
433     {
434         const ArithmeticColumn* ac =
435             dynamic_cast<const ArithmeticColumn*>(jobInfo.windowExps[i].get());
436         const FunctionColumn* fc =
437             dynamic_cast<const FunctionColumn*>(jobInfo.windowExps[i].get());
438 
439         if (ac != NULL && ac->windowfunctionColumnList().size() > 0)
440         {
441             AddSimplColumn(ac->simpleColumnList(), jobInfo);
442         }
443         else if (fc != NULL && fc->windowfunctionColumnList().size() > 0)
444         {
445             AddSimplColumn(fc->simpleColumnList(), jobInfo);
446         }
447     }
448     // reconstruct the delivered column list with auxiliary columns
449     set<uint64_t> colSet;
450     jobInfo.deliveredCols.resize(0);
451 
452     for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
453     {
454         jobInfo.deliveredCols.push_back(*i);
455         uint64_t key = getTupleKey(jobInfo, *i, true);
456 
457         // TODO: remove duplicates in select clause
458         colSet.insert(key);
459     }
460 
461     // add window columns in orderby
462     for (RetColsVector::iterator i = wcInOrderby.begin(); i < wcInOrderby.end(); i++)
463     {
464         jobInfo.deliveredCols.push_back(*i);
465         uint64_t key = getTupleKey(jobInfo, *i, true);
466         colSet.insert(key);
467     }
468 
469     // MCOL-3435 We haven't yet checked for aggregate, but we need to know
470     bool hasAggregation = false;
471     for (uint64_t i = 0; i < jobInfo.deliveredCols.size(); i++)
472     {
473         if (dynamic_cast<AggregateColumn*>(jobInfo.deliveredCols[i].get()) != NULL)
474         {
475             hasAggregation = true;
476             break;
477         }
478     }
479 
480     // add non-duplicate auxiliary columns
481     RetColsVector colsInAf;
482 
483     for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
484     {
485         uint64_t key = getTupleKey(jobInfo, *i, true);
486 
487         if (colSet.find(key) == colSet.end())
488             jobInfo.deliveredCols.push_back(*i);
489 
490         RetColsVector columns = dynamic_cast<WindowFunctionColumn*>(i->get())->getColumnList();
491 
492         for (RetColsVector::iterator j = columns.begin(); j < columns.end(); j++)
493         {
494             if (dynamic_cast<ConstantColumn*>(j->get()) != NULL)
495                 continue;
496 
497             key = getTupleKey(jobInfo, *j, true);
498 
499             if (colSet.find(key) == colSet.end())
500             {
501                 jobInfo.deliveredCols.push_back(*j);
502                 // MCOL-3435 Allow Window Functions to run with aggregates with
503                 // no group by by inserting a group by for window parameters.
504                 if (hasAggregation)
505                 {
506                     // If an argument is an AggregateColumn, don't group by it.
507                     if (dynamic_cast<AggregateColumn*>(j->get()) == NULL)
508                     {
509                         bool bFound = false;
510                         for (std::vector<SRCP>::iterator igpc = csep->groupByCols().begin();
511                                                          igpc < csep->groupByCols().end();
512                                                          ++igpc)
513                         {
514                             if ((*igpc)->alias() == (*j)->alias())
515                             {
516                                 bFound = true;
517                                 break;
518                             }
519                         }
520                         if (!bFound)
521                         {
522                             csep->groupByCols().push_back(*j);
523                         }
524                     }
525                 }
526             }
527 
528             colSet.insert(key);
529         }
530     }
531 
532     // for handling order by and limit in outer query
533     jobInfo.wfqLimitStart = csep->limitStart();
534     jobInfo.wfqLimitCount = csep->limitNum();
535     csep->limitStart(0);
536     csep->limitNum(-1);
537 
538     if (csep->orderByCols().size() > 0)
539     {
540         jobInfo.wfqOrderby = csep->orderByCols();
541         csep->orderByCols().clear();
542 
543         // add order by columns
544         for (RetColsVector::iterator i = jobInfo.wfqOrderby.begin(); i < jobInfo.wfqOrderby.end(); i++)
545         {
546             if (dynamic_cast<ConstantColumn*>(i->get()) != NULL)
547                 continue;
548 
549             uint64_t key = getTupleKey(jobInfo, *i, true);
550 
551             if (colSet.find(key) == colSet.end())
552                 jobInfo.deliveredCols.push_back(*i);
553 
554             colSet.insert(key);
555         }
556     }
557 }
558 
559 
makeWindowFunctionStep(SJSTEP & step,JobInfo & jobInfo)560 SJSTEP WindowFunctionStep::makeWindowFunctionStep(SJSTEP& step, JobInfo& jobInfo)
561 {
562     // create a window function step
563     WindowFunctionStep* ws = new WindowFunctionStep(jobInfo);
564 
565     // connect to the feeding step
566     JobStepAssociation jsa;
567     AnyDataListSPtr spdl(new AnyDataList());
568     RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
569     dl->OID(execplan::CNX_VTABLE_ID);
570     spdl->rowGroupDL(dl);
571     jsa.outAdd(spdl);
572     ws->inputAssociation(jsa);
573     ws->stepId(step->stepId() + 1);
574     step->outputAssociation(jsa);
575 
576     AnyDataListSPtr spdlOut(new AnyDataList());
577     RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
578     dlOut->OID(CNX_VTABLE_ID);
579     spdlOut->rowGroupDL(dlOut);
580     JobStepAssociation jsaOut;
581     jsaOut.outAdd(spdlOut);
582     ws->outputAssociation(jsaOut);
583 
584     // configure the rowgroups and index mapping
585     TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(step.get());
586     idbassert(ds != NULL);
587     ws->initialize(ds->getDeliveredRowGroup(), jobInfo);
588 
589     // restore the original delivery coloumns
590     jobInfo.deliveredCols = jobInfo.windowDels;
591     jobInfo.nonConstDelCols.clear();
592 
593     for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
594     {
595         if (NULL == dynamic_cast<const ConstantColumn*>(i->get()))
596             jobInfo.nonConstDelCols.push_back(*i);
597     }
598 
599     return SJSTEP(ws);
600 }
601 
602 
initialize(const RowGroup & rg,JobInfo & jobInfo)603 void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
604 {
605     if (jobInfo.trace) cout << "Input to WindowFunctionStep: " << rg.toString() << endl;
606 
607     // query type decides the output by dbroot or partition
608     // @bug 5631. Insert select should be treated as select
609     fIsSelect = (jobInfo.queryType == "SELECT" ||
610                  jobInfo.queryType == "INSERT_SELECT");
611 
612     // input row meta data
613     fRowGroupIn = rg;
614     fRowGroupIn.initRow(&fRowIn);
615 
616     // make an input map(id, index)
617     map<uint64_t, uint64_t> colIndexMap;
618     uint64_t colCntIn = rg.getColumnCount();
619     const vector<uint32_t>& pos = rg.getOffsets();
620     const vector<uint32_t>& oids = rg.getOIDs();
621     const vector<uint32_t>& keys = rg.getKeys();
622     const vector<CalpontSystemCatalog::ColDataType>& types = rg.getColTypes();
623     const vector<uint32_t>& csNums = rg.getCharsetNumbers();
624     const vector<uint32_t>& scales = rg.getScale();
625     const vector<uint32_t>& precisions = rg.getPrecision();
626 
627     for (uint64_t i = 0; i < colCntIn; i++)
628         colIndexMap.insert(make_pair(keys[i], i));
629 
630     // @bug6065, window functions that will update string table
631     int64_t wfsUpdateStringTable = 0;
632     int64_t wfsUserFunctionCount = 0;
633 
634     for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
635     {
636         bool isUDAF = false;
637         // window function type
638         WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(i->get());
639         uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo);    // result index
640         // @bug6065, window functions that will update string table
641         {
642             CalpontSystemCatalog::ColType rt = wc->resultType();
643 
644             if ((types[ridx] == CalpontSystemCatalog::CHAR ||
645                     types[ridx] == CalpontSystemCatalog::VARCHAR ||
646                     types[ridx] == CalpontSystemCatalog::TEXT ||
647                     types[ridx] == CalpontSystemCatalog::VARBINARY ||
648                     types[ridx] == CalpontSystemCatalog::BLOB) &&
649                     rg.getColumnWidth(ridx) >= jobInfo.stringTableThreshold)
650             {
651                 ++wfsUpdateStringTable;
652             }
653         }
654 
655 //		if (boost::iequals(wc->functionName(),"UDAF_FUNC")
656         if (wc->functionName() == "UDAF_FUNC")
657         {
658             isUDAF = true;
659             ++wfsUserFunctionCount;
660         }
661 
662         vector<int64_t> fields;
663         fields.push_back(ridx);  // result
664         const RetColsVector& parms = wc->functionParms();
665 
666         for (uint64_t i = 0; i < parms.size(); i++)                  // arguments
667         {
668             // skip constant column
669             if (dynamic_cast<const ConstantColumn*>(parms[i].get()) == NULL)
670                 fields.push_back(getColumnIndex(parms[i], colIndexMap, jobInfo));
671             else
672                 fields.push_back(-1);
673         }
674 
675         // partition & order by
676         const RetColsVector& partitions = wc->partitions();
677         vector<uint64_t> eqIdx;
678         vector<uint64_t> peerIdx;
679         vector<IdbSortSpec> sorts;
680 
681         for (uint64_t i = 0; i < partitions.size(); i++)
682         {
683             // skip constant column
684             if (dynamic_cast<const ConstantColumn*>(partitions[i].get()) != NULL)
685                 continue;
686 
687             // get column index
688             uint64_t idx = getColumnIndex(partitions[i], colIndexMap, jobInfo);
689             eqIdx.push_back(idx);
690             sorts.push_back(IdbSortSpec(idx, partitions[i]->asc(), partitions[i]->nullsFirst()));
691         }
692 
693         const RetColsVector& orders = wc->orderBy().fOrders;
694 
695         for (uint64_t i = 0; i < orders.size(); i++)
696         {
697             // skip constant column
698             if (dynamic_cast<const ConstantColumn*>(orders[i].get()) != NULL)
699                 continue;
700 
701             // get column index
702             uint64_t idx = getColumnIndex(orders[i], colIndexMap, jobInfo);
703             peerIdx.push_back(idx);
704             sorts.push_back(IdbSortSpec(idx, orders[i]->asc(), orders[i]->nullsFirst()));
705         }
706 
707         // functors for sorting
708         boost::shared_ptr<EqualCompData> parts(new EqualCompData(eqIdx, rg));
709         boost::shared_ptr<OrderByData> orderbys(new OrderByData(sorts, rg));
710         boost::shared_ptr<EqualCompData> peers(new EqualCompData(peerIdx, rg));
711 
712         // column type for functor templates
713         int ct = 0;
714 
715         if (isUDAF)
716         {
717             ct = wc->getUDAFContext().getResultType();
718         }
719         // make sure index is in range
720         else if (fields.size() > 1 && fields[1] >= 0 && static_cast<uint64_t>(fields[1]) < types.size())
721             ct = types[fields[1]];
722 
723         // workaround for functions using "within group (order by)" syntax
724         string fn = boost::to_upper_copy(wc->functionName());
725 
726         if ( (fn == "MEDIAN" || fn == "PERCENTILE_CONT" || fn == "PERCENTILE_DISC") &&
727                 utils::is_nonnegative(peerIdx[0]) && peerIdx[0] < types.size() )
728             ct = types[peerIdx[0]];
729 
730         // create the functor based on function name
731         boost::shared_ptr<WindowFunctionType> func =
732             WindowFunctionType::makeWindowFunction(fn, ct, wc);
733 
734         // parse parms after peer and fields are set
735         // functions may need to set order column index
736         func->peer(peers);
737         func->fieldIndex(fields);
738         func->parseParms(parms);
739 
740         // window frame
741         const WF_Frame& frame = wc->orderBy().fFrame;
742         int frameUnit = (frame.fIsRange) ? WF__FRAME_RANGE : WF__FRAME_ROWS;
743 
744         if (frame.fStart.fFrame == WF_UNBOUNDED_PRECEDING &&
745                 frame.fEnd.fFrame == WF_UNBOUNDED_FOLLOWING)
746             frameUnit = WF__FRAME_ROWS;
747 
748         boost::shared_ptr<FrameBound> upper = parseFrameBound(
749                 frame.fStart, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, true);
750         boost::shared_ptr<FrameBound> lower = parseFrameBound(
751                 frame.fEnd, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, false);
752         boost::shared_ptr<WindowFrame> windows(new WindowFrame(frameUnit, upper, lower));
753         func->frameUnit(frameUnit);
754 
755         // add to the function list
756         fFunctions.push_back(boost::shared_ptr<WindowFunction>(
757                                  new WindowFunction(func, parts, orderbys, windows, rg, fRowIn)));
758         fFunctionCount++;
759     }
760 
761     // initialize window function expresssions
762     fExpression = jobInfo.windowExps;
763 
764     for (RetColsVector::iterator i = fExpression.begin(); i < fExpression.end(); i++)
765     {
766         // output index
767         (*i)->outputIndex(getColumnIndex(*i, colIndexMap, jobInfo));
768 
769         // map the input indices
770         const vector<SimpleColumn*>& scols = (*i)->simpleColumnList();
771 
772         for (vector<SimpleColumn*>::const_iterator j = scols.begin(); j != scols.end(); j++)
773         {
774             uint64_t key = getTupleKey(jobInfo, *j);
775             CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*j)->colType());
776 
777             if (dictOid > 0)
778             {
779                 key = jobInfo.keyInfo->dictKeyMap[key];
780             }
781 
782             map<uint64_t, uint64_t>::iterator k = colIndexMap.find(key);
783 
784             if (k != colIndexMap.end())
785             {
786                 (*j)->inputIndex(k->second);
787             }
788             else
789             {
790                 string name = jobInfo.keyInfo->tupleKeyToName[key];
791                 cerr << name << " is not in tuple, key=" << key << endl;
792                 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
793                                 ERR_WF_COLUMN_MISSING);
794             }
795         }
796 
797         ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>((*i).get());
798         FunctionColumn*   fc = dynamic_cast<FunctionColumn*>((*i).get());
799 
800         if (ac != NULL)
801         {
802             updateWindowCols(ac->expression(), colIndexMap, jobInfo);
803         }
804         else if (fc != NULL)
805         {
806 //			RetColsVector wcList = fc->windowfunctionColumnList();
807 //			for (RetColsVector::iterator j = wcList.begin(); j != wcList.end(); j++)
808 //				(*j)->inputIndex(getColumnIndex(*j, colIndexMap, jobInfo));
809             funcexp::FunctionParm parms = fc->functionParms();
810 
811             for (vector<execplan::SPTP>::iterator j = parms.begin(); j < parms.end(); j++)
812                 updateWindowCols(j->get(), colIndexMap, jobInfo);
813         }
814     }
815 
816     // order by part
817     if (jobInfo.wfqOrderby.size() > 0)
818     {
819         // query order by
820         vector<uint64_t> eqIdx;
821         vector<IdbSortSpec> sorts;
822         const RetColsVector& orderby = jobInfo.wfqOrderby;
823 
824         for (uint64_t i = 0; i < orderby.size(); i++)
825         {
826             // skip constant column
827             if (dynamic_cast<const ConstantColumn*>(orderby[i].get()) != NULL)
828                 continue;
829 
830             // get column index
831             uint64_t idx = getColumnIndex(orderby[i], colIndexMap, jobInfo);
832             sorts.push_back(IdbSortSpec(idx, orderby[i]->asc(), orderby[i]->nullsFirst()));
833         }
834 
835         fQueryOrderBy.reset(new OrderByData(sorts, rg));
836     }
837 
838     // limit part
839     fQueryLimitStart = jobInfo.wfqLimitStart;
840     fQueryLimitCount = jobInfo.wfqLimitCount;
841 
842     // fix the delivered rowgroup data
843     vector<uint64_t> delColIdx;
844 
845     for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
846     {
847         // find the none constantant columns in the deliver
848         // leave constants to annexstep for now.
849         if (dynamic_cast<const ConstantColumn*>((*i).get()) != NULL)
850             continue;
851 
852         delColIdx.push_back(getColumnIndex(*i, colIndexMap, jobInfo));
853     }
854 
855     size_t retColCount = delColIdx.size();
856     vector<uint32_t> pos1;
857     vector<uint32_t> oids1;
858     vector<uint32_t> keys1;
859     vector<uint32_t> scales1;
860     vector<uint32_t> precisions1;
861     vector<CalpontSystemCatalog::ColDataType> types1;
862     vector<uint32_t> csNums1;
863     pos1.push_back(2);
864 
865     for (size_t i = 0; i < retColCount; i++)
866     {
867         size_t j = delColIdx[i];
868         pos1.push_back(pos1[i] + (pos[j + 1] - pos[j]));
869         oids1.push_back(oids[j]);
870         keys1.push_back(keys[j]);
871         scales1.push_back(scales[j]);
872         precisions1.push_back(precisions[j]);
873         types1.push_back(types[j]);
874         csNums1.push_back(csNums[j]);
875     }
876 
877     fRowGroupDelivered = RowGroup(
878                              retColCount, pos1, oids1, keys1, types1, csNums1, scales1, precisions1, jobInfo.stringTableThreshold);
879 
880     if (jobInfo.trace)
881         cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl;
882 
883     if (wfsUpdateStringTable > 1)
884         fUseSSMutex = true;
885 
886     if (wfsUserFunctionCount > 1)
887         fUseUFMutex = true;
888 
889     fRowGroupOut = fRowGroupDelivered;
890 }
891 
892 
execute()893 void WindowFunctionStep::execute()
894 {
895     RGData rgData;
896     Row row;
897     fRowGroupIn.initRow(&row);
898     bool more = fInputDL->next(fInputIterator, &rgData);
899     uint64_t i = 0; // for RowGroup index in the fInRowGroupData
900 
901     if (traceOn()) dlTimes.setFirstReadTime();
902 
903     StepTeleStats sts;
904     sts.query_uuid = fQueryUuid;
905     sts.step_uuid = fStepUuid;
906     sts.msg_type = StepTeleStats::ST_START;
907     sts.total_units_of_work = 1;
908     postStepStartTele(sts);
909 
910     try
911     {
912         while (more && !cancelled())
913         {
914             fRowGroupIn.setData(&rgData);
915             fRowGroupIn.getRow(0, &row);
916             uint64_t rowCnt = fRowGroupIn.getRowCount();
917 
918             if (rowCnt > 0)
919             {
920                 fInRowGroupData.push_back(rgData);
921                 uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition);
922                 fMemUsage += memAdd;
923 
924                 if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
925                     throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
926 
927                 for (uint64_t j = 0; j < rowCnt; ++j)
928                 {
929                     if (i > 0x0000FFFFFFFFFFFFULL || j > 0x000000000000FFFFULL)
930                         throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
931 
932                     fRows.push_back(RowPosition(i, j));
933                     row.nextRow();
934                 }
935 
936                 //@bug6065, make StringStore::storeString() thread safe, default to false.
937                 rgData.useStoreStringMutex(fUseSSMutex);
938                 // For the User Data of UDAnF
939                 rgData.useUserDataMutex(fUseUFMutex);
940 
941                 // window function does not change row count
942                 fRowsReturned += rowCnt;
943 
944                 i++;
945             }
946 
947             more = fInputDL->next(fInputIterator, &rgData);
948         }
949     } // try
950     catch (...)
951     {
952         handleException(std::current_exception(),
953                         logging::ERR_READ_INPUT_DATALIST,
954                         logging::ERR_WF_DATA_SET_TOO_BIG,
955                         "WindowFunctionStep::execute()");
956     }
957 
958     if (traceOn())
959         dlTimes.setLastReadTime();
960 
961     // no need for the window function if aborted or result set is empty.
962     if (cancelled() || fRows.size() == 0)
963     {
964         while (more)
965             more = fInputDL->next(fInputIterator, &rgData);
966 
967         fOutputDL->endOfInput();
968 
969         sts.msg_type = StepTeleStats::ST_SUMMARY;
970         sts.total_units_of_work = sts.units_of_work_completed = 1;
971         sts.rows = fRowsReturned;
972         postStepSummaryTele(sts);
973 
974         if (traceOn())
975         {
976             dlTimes.setEndOfInputTime();
977             printCalTrace();
978         }
979 
980         return;
981     }
982 
983     // got something to work on
984     try
985     {
986         if (fFunctionCount == 1)
987         {
988             doFunction();
989         }
990         else
991         {
992             if (fTotalThreads > fFunctionCount)
993                 fTotalThreads = fFunctionCount;
994 
995             fFunctionThreads.clear();
996             fFunctionThreads.reserve(fTotalThreads);
997 
998             for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++)
999                 fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this)));
1000 
1001             // If cancelled, not all threads are started.
1002             jobstepThreadPool.join(fFunctionThreads);
1003         }
1004 
1005         if (!(cancelled()))
1006         {
1007             if (fIsSelect)
1008                 doPostProcessForSelect();
1009             else
1010                 doPostProcessForDml();
1011         }
1012 
1013     }
1014     catch (...)
1015     {
1016         handleException(std::current_exception(),
1017                         logging::ERR_EXECUTE_WINDOW_FUNCTION,
1018                         logging::ERR_WF_DATA_SET_TOO_BIG,
1019                         "WindowFunctionStep::execute()");
1020     }
1021 
1022     fOutputDL->endOfInput();
1023 
1024     sts.msg_type = StepTeleStats::ST_SUMMARY;
1025     sts.total_units_of_work = sts.units_of_work_completed = 1;
1026     sts.rows = fRowsReturned;
1027     postStepSummaryTele(sts);
1028 
1029     if (traceOn())
1030     {
1031         dlTimes.setEndOfInputTime();
1032         printCalTrace();
1033     }
1034 
1035     return;
1036 }
1037 
1038 
nextFunctionIndex()1039 uint64_t WindowFunctionStep::nextFunctionIndex()
1040 {
1041     uint64_t idx = atomicInc(&fNextIndex);
1042 
1043     // return index in the function array
1044     return --idx;
1045 }
1046 
1047 
doFunction()1048 void WindowFunctionStep::doFunction()
1049 {
1050     uint64_t i = 0;
1051 
1052     try
1053     {
1054         while (((i = nextFunctionIndex()) < fFunctionCount) && !cancelled())
1055         {
1056             uint64_t memAdd = fRows.size() * sizeof(RowPosition);
1057             fMemUsage += memAdd;
1058 
1059             if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
1060                 throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
1061 
1062             fFunctions[i]->setCallback(this, i);
1063             (*fFunctions[i].get())();
1064         }
1065     }
1066     catch (...)
1067     {
1068         handleException(std::current_exception(),
1069                         logging::ERR_EXECUTE_WINDOW_FUNCTION,
1070                         logging::ERR_WF_DATA_SET_TOO_BIG,
1071                         "WindowFunctionStep::doFunction()");
1072     }
1073 }
1074 
doPostProcessForSelect()1075 void WindowFunctionStep::doPostProcessForSelect()
1076 {
1077     FuncExp* fe = funcexp::FuncExp::instance();
1078     boost::shared_array<int> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
1079     Row rowIn, rowOut;
1080     fRowGroupIn.initRow(&rowIn);
1081     fRowGroupOut.initRow(&rowOut);
1082     RGData rgData;
1083     vector<RowPosition>& rowData = *(fFunctions.back()->fRowData.get());
1084     int64_t rowsLeft = rowData.size();
1085     int64_t rowsInRg = 0;
1086     int64_t rgCapacity = 0;
1087 
1088     int64_t begin = fQueryLimitStart;
1089     int64_t count = (fQueryLimitCount == (uint64_t) - 1) ? rowsLeft : fQueryLimitCount;
1090     int64_t end = begin + count;
1091     end = (end < rowsLeft) ? end : rowsLeft;
1092     rowsLeft = (end > begin) ? (end - begin) : 0;
1093 
1094     if (fQueryOrderBy.get() != NULL)
1095         sort(rowData.begin(), rowData.size());
1096 
1097     for (int64_t i = begin; i < end; i++)
1098     {
1099         if (rgData.rowData.get() == NULL)
1100         {
1101             rgCapacity = ((rowsLeft > 8192) ? 8192 : rowsLeft);
1102             rowsLeft -= rgCapacity;
1103             rgData.reinit(fRowGroupOut, rgCapacity);
1104 
1105             fRowGroupOut.setData(&rgData);
1106             fRowGroupOut.resetRowGroup(0);
1107             fRowGroupOut.setDBRoot(0);           // not valid dbroot
1108             fRowGroupOut.getRow(0, &rowOut);
1109             rowsInRg = 0;
1110         }
1111 
1112         rowIn.setData(getPointer(rowData[i], fRowGroupIn, rowIn));
1113 
1114         // evaluate the window function expressions before apply mapping
1115         if (fExpression.size() > 0)
1116             fe->evaluate(rowIn, fExpression);
1117 
1118         applyMapping(mapping, rowIn, &rowOut);
1119         rowOut.nextRow();
1120         rowsInRg++;
1121 
1122         if (rowsInRg == rgCapacity)
1123         {
1124             fRowGroupOut.setRowCount(rowsInRg);
1125             fOutputDL->insert(rgData);
1126             rgData.clear();
1127         }
1128     }
1129 }
1130 
1131 
doPostProcessForDml()1132 void WindowFunctionStep::doPostProcessForDml()
1133 {
1134     FuncExp* fe = funcexp::FuncExp::instance();
1135     boost::shared_array<int> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
1136     Row rowIn, rowOut;
1137     fRowGroupIn.initRow(&rowIn);
1138     fRowGroupOut.initRow(&rowOut);
1139 
1140     for (vector<RGData>::iterator i = fInRowGroupData.begin();
1141             i < fInRowGroupData.end(); i++)
1142     {
1143         fRowGroupIn.setData(&(*i));
1144         RGData rgData = RGData(fRowGroupIn, fRowGroupIn.getRowCount());
1145         fRowGroupOut.setData(&rgData);
1146         // @bug 5631. reset rowgroup before the data is populated.
1147         fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
1148         fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
1149         fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
1150 
1151         fRowGroupIn.getRow(0, &rowIn);
1152         fRowGroupOut.getRow(0, &rowOut);
1153 
1154         for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
1155         {
1156             // evaluate the window function expressions before apply mapping
1157             if (fExpression.size() > 0)
1158                 fe->evaluate(rowIn, fExpression);
1159 
1160             applyMapping(mapping, rowIn, &rowOut);
1161             rowIn.nextRow();
1162             rowOut.nextRow();
1163         }
1164 
1165         //fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
1166         //fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
1167 
1168         fOutputDL->insert(rgData);
1169     }
1170 }
1171 
1172 
parseFrameBoundRows(const execplan::WF_Boundary & b,const map<uint64_t,uint64_t> & m,JobInfo & jobInfo)1173 boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRows(
1174     const execplan::WF_Boundary& b,
1175     const map<uint64_t, uint64_t>& m,
1176     JobInfo& jobInfo)
1177 {
1178     boost::shared_ptr<FrameBound> fb;
1179 
1180     if (b.fFrame == WF_CURRENT_ROW)
1181     {
1182         fb.reset(new FrameBoundRow(WF__CURRENT_ROW));
1183         return fb;
1184     }
1185 
1186     ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());
1187 
1188     if (cc != NULL)
1189     {
1190         Row dummy;
1191         bool isNull = false;
1192         int val = cc->getIntVal(dummy, isNull);
1193 
1194         if (val >= 0 && isNull == false)
1195         {
1196             int type = (b.fFrame == WF_PRECEDING) ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING;
1197             fb.reset(new FrameBoundConstantRow(type, val));
1198         }
1199         else
1200         {
1201             string str("NULL");
1202 
1203             if (!isNull)
1204             {
1205                 ostringstream oss;
1206                 oss << val;
1207                 str = oss.str();
1208             }
1209 
1210             throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
1211                             ERR_WF_BOUND_OUT_OF_RANGE);
1212         }
1213     }
1214     else
1215     {
1216         int type = (b.fFrame == WF_PRECEDING) ? WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
1217         uint64_t id = getTupleKey(jobInfo, b.fVal);
1218         uint64_t idx = getColumnIndex(b.fVal, m, jobInfo);
1219         TupleInfo ti = getTupleInfo(id, jobInfo);
1220 
1221         switch (ti.dtype)
1222         {
1223             case execplan::CalpontSystemCatalog::TINYINT:
1224             case execplan::CalpontSystemCatalog::SMALLINT:
1225             case execplan::CalpontSystemCatalog::MEDINT:
1226             case execplan::CalpontSystemCatalog::INT:
1227             case execplan::CalpontSystemCatalog::BIGINT:
1228             case execplan::CalpontSystemCatalog::DECIMAL:
1229             {
1230                 fb.reset(new FrameBoundExpressionRow<int64_t>(type, id, idx));
1231                 break;
1232             }
1233 
1234             case execplan::CalpontSystemCatalog::DOUBLE:
1235             case execplan::CalpontSystemCatalog::UDOUBLE:
1236             {
1237                 fb.reset(new FrameBoundExpressionRow<double>(type, id, idx));
1238                 break;
1239             }
1240 
1241             case execplan::CalpontSystemCatalog::FLOAT:
1242             case execplan::CalpontSystemCatalog::UFLOAT:
1243             {
1244                 fb.reset(new FrameBoundExpressionRow<float>(type, id, idx));
1245                 break;
1246             }
1247 
1248             case execplan::CalpontSystemCatalog::UTINYINT:
1249             case execplan::CalpontSystemCatalog::USMALLINT:
1250             case execplan::CalpontSystemCatalog::UMEDINT:
1251             case execplan::CalpontSystemCatalog::UINT:
1252             case execplan::CalpontSystemCatalog::UBIGINT:
1253             case execplan::CalpontSystemCatalog::UDECIMAL:
1254             case execplan::CalpontSystemCatalog::DATE:
1255             case execplan::CalpontSystemCatalog::DATETIME:
1256             case execplan::CalpontSystemCatalog::TIME:
1257             case execplan::CalpontSystemCatalog::TIMESTAMP:
1258             {
1259                 fb.reset(new FrameBoundExpressionRow<uint64_t>(type, id, idx));
1260                 break;
1261             }
1262 
1263             default:
1264             {
1265                 string str = windowfunction::colType2String[ti.dtype];
1266                 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
1267                                 ERR_WF_INVALID_BOUND_TYPE);
1268                 break;
1269             }
1270         }
1271     }
1272 
1273     return fb;
1274 }
1275 
1276 
parseFrameBoundRange(const execplan::WF_Boundary & b,const map<uint64_t,uint64_t> & m,const vector<SRCP> & o,JobInfo & jobInfo)1277 boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRange(const execplan::WF_Boundary& b,
1278         const map<uint64_t, uint64_t>& m,
1279         const vector<SRCP>& o,
1280         JobInfo& jobInfo)
1281 {
1282     boost::shared_ptr<FrameBound> fb;
1283 
1284     if (b.fFrame == WF_CURRENT_ROW)
1285     {
1286         fb.reset(new FrameBoundRange(WF__CURRENT_ROW));
1287         return fb;
1288     }
1289 
1290     bool isConstant = false;
1291     bool isNull = false;
1292     Row  dummy;
1293     ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());
1294 
1295     if (cc != NULL)
1296     {
1297         isConstant = true;
1298         double val = cc->getDoubleVal(dummy, isNull);
1299 
1300         if (val < 0 || isNull)
1301         {
1302             string str("NULL");
1303 
1304             if (!isNull)
1305             {
1306                 ostringstream oss;
1307                 oss << val;
1308                 str = oss.str();
1309             }
1310 
1311             throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
1312                             ERR_WF_BOUND_OUT_OF_RANGE);
1313         }
1314     }
1315 
1316     int type = 0;
1317     vector<uint64_t> ids;
1318     vector<int> index;
1319     ids.push_back(getTupleKey(jobInfo, o[0]));
1320     index.push_back(getColumnIndex(o[0], m, jobInfo));
1321 
1322     if (isConstant)
1323     {
1324         type = (b.fFrame == WF_PRECEDING) ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING;
1325         ids.push_back(-1);   // dummy, n/a for constant
1326         index.push_back(-1); // dummy, n/a for constant
1327     }
1328     else
1329     {
1330         type = (b.fFrame == WF_PRECEDING) ? WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
1331         ids.push_back(getTupleKey(jobInfo, b.fVal));
1332         index.push_back(getColumnIndex(b.fVal, m, jobInfo));
1333     }
1334 
1335     ids.push_back(getTupleKey(jobInfo, b.fBound));
1336     index.push_back(getColumnIndex(b.fBound, m, jobInfo));
1337 
1338     FrameBoundRange* fbr = NULL;
1339     TupleInfo ti = getTupleInfo(ids[0], jobInfo);
1340     bool asc = o[0]->asc();
1341     bool nlf = o[0]->nullsFirst();
1342 
1343     switch (ti.dtype)
1344     {
1345         case execplan::CalpontSystemCatalog::TINYINT:
1346         case execplan::CalpontSystemCatalog::SMALLINT:
1347         case execplan::CalpontSystemCatalog::MEDINT:
1348         case execplan::CalpontSystemCatalog::INT:
1349         case execplan::CalpontSystemCatalog::BIGINT:
1350         case execplan::CalpontSystemCatalog::DECIMAL:
1351         {
1352             if (isConstant)
1353             {
1354                 int64_t v = cc->getIntVal(dummy, isNull);
1355                 fbr = new FrameBoundConstantRange<int64_t>(type, asc, nlf, &v);
1356                 fbr->isZero((v == 0));
1357             }
1358             else
1359             {
1360                 fbr = new FrameBoundExpressionRange<int64_t>(type, asc, nlf);
1361             }
1362 
1363             break;
1364         }
1365 
1366         case execplan::CalpontSystemCatalog::DOUBLE:
1367         case execplan::CalpontSystemCatalog::UDOUBLE:
1368         {
1369             if (isConstant)
1370             {
1371                 double v = cc->getDoubleVal(dummy, isNull);
1372                 fbr = new FrameBoundConstantRange<double>(type, asc, nlf, &v);
1373                 fbr->isZero((v == 0.0));
1374             }
1375             else
1376             {
1377                 fbr = new FrameBoundExpressionRange<double>(type, asc, nlf);
1378             }
1379 
1380             break;
1381         }
1382 
1383         case execplan::CalpontSystemCatalog::FLOAT:
1384         case execplan::CalpontSystemCatalog::UFLOAT:
1385         {
1386             if (isConstant)
1387             {
1388                 float v = cc->getFloatVal(dummy, isNull);
1389                 fbr = new FrameBoundConstantRange<float>(type, asc, nlf, &v);
1390                 fbr->isZero((v == 0.0));
1391             }
1392             else
1393             {
1394                 fbr = new FrameBoundExpressionRange<float>(type, asc, nlf);
1395             }
1396 
1397             break;
1398         }
1399 
1400         case execplan::CalpontSystemCatalog::UTINYINT:
1401         case execplan::CalpontSystemCatalog::USMALLINT:
1402         case execplan::CalpontSystemCatalog::UMEDINT:
1403         case execplan::CalpontSystemCatalog::UINT:
1404         case execplan::CalpontSystemCatalog::UBIGINT:
1405         case execplan::CalpontSystemCatalog::UDECIMAL:
1406         case execplan::CalpontSystemCatalog::DATE:
1407         case execplan::CalpontSystemCatalog::DATETIME:
1408         case execplan::CalpontSystemCatalog::TIME:
1409         case execplan::CalpontSystemCatalog::TIMESTAMP:
1410         {
1411             if (isConstant)
1412             {
1413                 uint64_t v = cc->getUintVal(dummy, isNull);
1414                 fbr = new FrameBoundConstantRange<uint64_t>(type, asc, nlf, &v);
1415                 fbr->isZero((v == 0));
1416             }
1417             else
1418             {
1419                 fbr = new FrameBoundExpressionRange<uint64_t>(type, asc, nlf);
1420             }
1421 
1422             break;
1423         }
1424 
1425         default:
1426         {
1427             string str = windowfunction::colType2String[ti.dtype];
1428             throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
1429                             ERR_WF_INVALID_BOUND_TYPE);
1430             break;
1431         }
1432     }
1433 
1434     fbr->setTupleId(ids);
1435     fbr->setIndex(index);
1436     fb.reset(fbr);
1437 
1438     return fb;
1439 }
1440 
1441 
parseFrameBound(const execplan::WF_Boundary & b,const map<uint64_t,uint64_t> & m,const vector<SRCP> & o,const boost::shared_ptr<EqualCompData> & p,JobInfo & j,bool rows,bool s)1442 boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBound(const execplan::WF_Boundary& b,
1443         const map<uint64_t, uint64_t>& m,
1444         const vector<SRCP>& o,
1445         const boost::shared_ptr<EqualCompData>& p,
1446         JobInfo& j,
1447         bool rows,
1448         bool s)
1449 {
1450     boost::shared_ptr<FrameBound> fb;
1451 
1452     switch (b.fFrame)
1453     {
1454         case WF_UNBOUNDED_PRECEDING:
1455         {
1456             fb.reset(new FrameBound(WF__UNBOUNDED_PRECEDING));
1457             break;
1458         }
1459 
1460         case WF_UNBOUNDED_FOLLOWING:
1461         {
1462             fb.reset(new FrameBound(WF__UNBOUNDED_FOLLOWING));
1463             break;
1464         }
1465 
1466         case WF_CURRENT_ROW:
1467         case WF_PRECEDING:
1468         case WF_FOLLOWING:
1469         {
1470             if (rows)
1471             {
1472                 fb = parseFrameBoundRows(b, m, j);
1473             }
1474             else
1475             {
1476                 fb = parseFrameBoundRange(b, m, o, j);
1477             }
1478 
1479             break;
1480         }
1481 
1482         default:  //  unknown
1483         {
1484             throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_UNKNOWN_BOUND, b.fFrame),
1485                             ERR_WF_UNKNOWN_BOUND);
1486             break;
1487         }
1488     }
1489 
1490     fb->peer(p);
1491     fb->start(s);
1492 
1493     return fb;
1494 }
1495 
1496 
updateWindowCols(ReturnedColumn * rc,const map<uint64_t,uint64_t> & m,JobInfo & jobInfo)1497 void WindowFunctionStep::updateWindowCols(ReturnedColumn* rc,
1498         const map<uint64_t, uint64_t>& m,
1499         JobInfo& jobInfo)
1500 {
1501     if (rc == NULL)
1502         return;
1503 
1504     ArithmeticColumn*     ac = dynamic_cast<ArithmeticColumn*>(rc);
1505     FunctionColumn*       fc = dynamic_cast<FunctionColumn*>(rc);
1506     SimpleFilter*         sf = dynamic_cast<SimpleFilter*>(rc);
1507     WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(rc);
1508 
1509     if (wc)
1510     {
1511         uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
1512         map<uint64_t, uint64_t>::const_iterator j = m.find(key);
1513 
1514         if (j == m.end())
1515         {
1516             string name = jobInfo.keyInfo->tupleKeyToName[key];
1517             cerr << name << " is not in tuple, key=" << key << endl;
1518             throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
1519                             ERR_WF_COLUMN_MISSING);
1520         }
1521 
1522         wc->inputIndex(j->second);
1523     }
1524     else if (ac)
1525     {
1526         updateWindowCols(ac->expression(), m, jobInfo);
1527     }
1528     else if (fc)
1529     {
1530         funcexp::FunctionParm parms = fc->functionParms();
1531 
1532         for (vector<execplan::SPTP>::iterator i = parms.begin(); i < parms.end(); i++)
1533             updateWindowCols(i->get(), m, jobInfo);
1534     }
1535     else if (sf)
1536     {
1537         updateWindowCols(sf->lhs(), m, jobInfo);
1538         updateWindowCols(sf->rhs(), m, jobInfo);
1539     }
1540 }
1541 
1542 
updateWindowCols(ParseTree * pt,const map<uint64_t,uint64_t> & m,JobInfo & jobInfo)1543 void WindowFunctionStep::updateWindowCols(ParseTree* pt,
1544         const map<uint64_t, uint64_t>& m,
1545         JobInfo& jobInfo)
1546 {
1547     if (pt == NULL)
1548         return;
1549 
1550     updateWindowCols(pt->left(), m, jobInfo);
1551     updateWindowCols(pt->right(), m, jobInfo);
1552 
1553     TreeNode* tn = pt->data();
1554     ArithmeticColumn*     ac = dynamic_cast<ArithmeticColumn*>(tn);
1555     FunctionColumn*       fc = dynamic_cast<FunctionColumn*>(tn);
1556     SimpleFilter*         sf = dynamic_cast<SimpleFilter*>(tn);
1557     WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(tn);
1558 
1559     if (wc)
1560     {
1561         uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
1562         map<uint64_t, uint64_t>::const_iterator j = m.find(key);
1563 
1564         if (j == m.end())
1565         {
1566             string name = jobInfo.keyInfo->tupleKeyToName[key];
1567             cerr << name << " is not in tuple, key=" << key << endl;
1568             throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
1569                             ERR_WF_COLUMN_MISSING);
1570         }
1571 
1572         wc->inputIndex(j->second);
1573     }
1574     else if (ac)
1575     {
1576         updateWindowCols(ac->expression(), m, jobInfo);
1577     }
1578     else if (fc)
1579     {
1580         funcexp::FunctionParm parms = fc->functionParms();
1581 
1582         for (vector<execplan::SPTP>::iterator i = parms.begin(); i < parms.end(); i++)
1583             updateWindowCols(i->get(), m, jobInfo);
1584     }
1585     else if (sf)
1586     {
1587         updateWindowCols(sf->lhs(), m, jobInfo);
1588         updateWindowCols(sf->rhs(), m, jobInfo);
1589     }
1590 }
1591 
1592 
sort(std::vector<RowPosition>::iterator v,uint64_t n)1593 void WindowFunctionStep::sort(std::vector<RowPosition>::iterator v, uint64_t n)
1594 {
1595     // recursive function termination condition.
1596     if (n < 2 || cancelled())
1597         return;
1598 
1599     RowPosition                   p = *(v + n / 2); // pivot value
1600     vector<RowPosition>::iterator l = v;            // low   address
1601     vector<RowPosition>::iterator h = v + (n - 1);  // high  address
1602 
1603     while (l <= h && !cancelled())
1604     {
1605         // Can use while here, but need check boundary and cancel status.
1606         if (fQueryOrderBy->operator()(getPointer(*l), getPointer(p)))
1607         {
1608             l++;
1609         }
1610         else if (fQueryOrderBy->operator()(getPointer(p), getPointer(*h)))
1611         {
1612             h--;
1613         }
1614         else
1615         {
1616             RowPosition t = *l;    // temp value for swap
1617             *l++ = *h;
1618             *h-- = t;
1619         }
1620     }
1621 
1622     sort(v, std::distance(v, h) + 1);
1623     sort(l, std::distance(l, v) + n);
1624 }
1625 
1626 
printCalTrace()1627 void WindowFunctionStep::printCalTrace()
1628 {
1629     time_t t = time (0);
1630     char timeString[50];
1631     ctime_r (&t, timeString);
1632     timeString[strlen (timeString ) - 1] = '\0';
1633     ostringstream logStr;
1634     logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
1635             << "; total rows returned-" << fRowsReturned << endl
1636             << "\t1st read " << dlTimes.FirstReadTimeString()
1637             << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
1638             << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
1639             << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
1640             << "\tJob completion status " << status() << endl;
1641     logEnd(logStr.str().c_str());
1642     fExtendedInfo += logStr.str();
1643     formatMiniStats();
1644 }
1645 
1646 
formatMiniStats()1647 void WindowFunctionStep::formatMiniStats()
1648 {
1649     ostringstream oss;
1650     oss << "WFS "
1651         << "UM "
1652         << "- "
1653         << "- "
1654         << "- "
1655         << "- "
1656         << "- "
1657         << "- "
1658         << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
1659         << fRowsReturned << " ";
1660     fMiniInfo += oss.str();
1661 }
1662 
1663 
1664 }   //namespace
1665 // vim:ts=4 sw=4:
1666 
1667