1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019-2020 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: tupleaggregatestep.cpp 9732 2013-08-02 15:56:15Z pleblanc $
20 
21 
22 //#define NDEBUG
23 // Cross engine needs to be at top due to MySQL includes
24 #include "crossenginestep.h"
25 
26 #include <cassert>
27 #include <sstream>
28 #include <iomanip>
29 #include <algorithm>
30 using namespace std;
31 
32 #include <boost/shared_ptr.hpp>
33 #include <boost/shared_array.hpp>
34 #include <boost/scoped_array.hpp>
35 #include <boost/uuid/uuid_io.hpp>
36 using namespace boost;
37 
38 #include "messagequeue.h"
39 using namespace messageqcpp;
40 
41 #include "loggingid.h"
42 #include "errorcodes.h"
43 #include "idberrorinfo.h"
44 using namespace logging;
45 
46 #include "configcpp.h"
47 using namespace config;
48 
49 #include "calpontsystemcatalog.h"
50 #include "aggregatecolumn.h"
51 #include "udafcolumn.h"
52 #include "arithmeticcolumn.h"
53 #include "functioncolumn.h"
54 #include "constantcolumn.h"
55 using namespace execplan;
56 
57 #include "rowgroup.h"
58 #include "rowaggregation.h"
59 using namespace rowgroup;
60 
61 #include "querytele.h"
62 using namespace querytele;
63 
64 #include "jlf_common.h"
65 #include "jobstep.h"
66 #include "primitivestep.h"
67 #include "subquerystep.h"
68 #include "tuplehashjoin.h"
69 #include "tupleaggregatestep.h"
70 
71 //#include "stopwatch.cpp"
72 
73 //Stopwatch timer;
74 
75 namespace
76 {
77 
78 struct cmpTuple
79 {
operator ()__anon337c136f0111::cmpTuple80     bool operator()(boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>* > a,
81                     boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>* > b)
82     {
83         uint32_t keya = boost::get<0>(a);
84         uint32_t keyb = boost::get<0>(b);
85         int opa;
86         int opb;
87         mcsv1sdk::mcsv1_UDAF* pUDAFa;
88         mcsv1sdk::mcsv1_UDAF* pUDAFb;
89 
90         // If key is less than
91         if (keya < keyb)
92             return true;
93         if (keya == keyb)
94         {
95             // test Op
96             opa = boost::get<1>(a);
97             opb = boost::get<1>(b);
98             if (opa < opb)
99                 return true;
100             if (opa == opb)
101             {
102                 // look at the UDAF object
103                 pUDAFa = boost::get<2>(a);
104                 pUDAFb = boost::get<2>(b);
105                 if (pUDAFa < pUDAFb)
106                     return true;
107                 if (pUDAFa == pUDAFb)
108                 {
109                     if (pUDAFa == NULL)
110                         return false;
111                     std::vector<uint32_t>* paramKeysa = boost::get<3>(a);
112                     std::vector<uint32_t>* paramKeysb = boost::get<3>(b);
113 
114                     if (paramKeysa->size() < paramKeysb->size())
115                         return true;
116                     if (paramKeysa->size() == paramKeysb->size())
117                     {
118                         if (paramKeysa == NULL)
119                             return false;
120                         for (uint64_t i = 0; i < paramKeysa->size(); ++i)
121                         {
122                             if ((*paramKeysa)[i] < (*paramKeysb)[i])
123                                 return true;
124                         }
125                     }
126                 }
127             }
128         }
129         return false;
130     }
131 };
132 
// A RowBucket collects the rows routed to one bucket: each entry pairs a row
// pointer with the hash value that was computed for that row.
typedef vector<std::pair<Row::Pointer, uint64_t>> RowBucket;
// A RowBucketVec holds one RowBucket per (sub-)aggregator feeding a bucket.
typedef vector<RowBucket> RowBucketVec;

// The AGG_MAP type is used to maintain a list of aggregate functions in order to
// detect duplicates. Since all UDAF have the same op type (ROWAGG_UDAF), the key
// also carries the UDAF object pointer and the UDAF's parameter-key vector in
// order to ensure uniqueness. Keys are ordered by cmpTuple.
typedef map<boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>* >, uint64_t, cmpTuple> AGG_MAP;
140 
functionIdMap(int planFuncId)141 inline RowAggFunctionType functionIdMap(int planFuncId)
142 {
143     switch (planFuncId)
144     {
145         case AggregateColumn::COUNT_ASTERISK:
146             return ROWAGG_COUNT_ASTERISK;
147 
148         case AggregateColumn::COUNT:
149             return ROWAGG_COUNT_COL_NAME;
150 
151         case AggregateColumn::SUM:
152             return ROWAGG_SUM;
153 
154         case AggregateColumn::AVG:
155             return ROWAGG_AVG;
156 
157         case AggregateColumn::MIN:
158             return ROWAGG_MIN;
159 
160         case AggregateColumn::MAX:
161             return ROWAGG_MAX;
162 
163         case AggregateColumn::DISTINCT_COUNT:
164             return ROWAGG_COUNT_DISTINCT_COL_NAME;
165 
166         case AggregateColumn::DISTINCT_SUM:
167             return ROWAGG_DISTINCT_SUM;
168 
169         case AggregateColumn::DISTINCT_AVG:
170             return ROWAGG_DISTINCT_AVG;
171 
172         case AggregateColumn::STDDEV_POP:
173             return ROWAGG_STATS;
174 
175         case AggregateColumn::STDDEV_SAMP:
176             return ROWAGG_STATS;
177 
178         case AggregateColumn::VAR_POP:
179             return ROWAGG_STATS;
180 
181         case AggregateColumn::VAR_SAMP:
182             return ROWAGG_STATS;
183 
184         case AggregateColumn::BIT_AND:
185             return ROWAGG_BIT_AND;
186 
187         case AggregateColumn::BIT_OR:
188             return ROWAGG_BIT_OR;
189 
190         case AggregateColumn::BIT_XOR:
191             return ROWAGG_BIT_XOR;
192 
193         case AggregateColumn::GROUP_CONCAT:
194             return ROWAGG_GROUP_CONCAT;
195 
196         case AggregateColumn::CONSTANT:
197             return ROWAGG_CONSTANT;
198 
199         case AggregateColumn::UDAF:
200             return ROWAGG_UDAF;
201 
202         case AggregateColumn::MULTI_PARM:
203             return ROWAGG_MULTI_PARM;
204 
205         default:
206             return ROWAGG_FUNCT_UNDEFINE;
207     }
208 }
209 
210 
statsFuncIdMap(int planFuncId)211 inline RowAggFunctionType statsFuncIdMap(int planFuncId)
212 {
213     switch (planFuncId)
214     {
215         case AggregateColumn::STDDEV_POP:
216             return ROWAGG_STDDEV_POP;
217 
218         case AggregateColumn::STDDEV_SAMP:
219             return ROWAGG_STDDEV_SAMP;
220 
221         case AggregateColumn::VAR_POP:
222             return ROWAGG_VAR_POP;
223 
224         case AggregateColumn::VAR_SAMP:
225             return ROWAGG_VAR_SAMP;
226 
227         default:
228             return ROWAGG_FUNCT_UNDEFINE;
229     }
230 }
231 
232 
colTypeIdString(CalpontSystemCatalog::ColDataType type)233 inline string colTypeIdString(CalpontSystemCatalog::ColDataType type)
234 {
235     switch (type)
236     {
237         case CalpontSystemCatalog::BIT:
238             return string("BIT");
239 
240         case CalpontSystemCatalog::TINYINT:
241             return string("TINYINT");
242 
243         case CalpontSystemCatalog::CHAR:
244             return string("CHAR");
245 
246         case CalpontSystemCatalog::SMALLINT:
247             return string("SMALLINT");
248 
249         case CalpontSystemCatalog::DECIMAL:
250             return string("DECIMAL");
251 
252         case CalpontSystemCatalog::MEDINT:
253             return string("MEDINT");
254 
255         case CalpontSystemCatalog::INT:
256             return string("INT");
257 
258         case CalpontSystemCatalog::FLOAT:
259             return string("FLOAT");
260 
261         case CalpontSystemCatalog::DATE:
262             return string("DATE");
263 
264         case CalpontSystemCatalog::BIGINT:
265             return string("BIGINT");
266 
267         case CalpontSystemCatalog::DOUBLE:
268             return string("DOUBLE");
269 
270         case CalpontSystemCatalog::LONGDOUBLE:
271             return string("LONGDOUBLE");
272 
273         case CalpontSystemCatalog::DATETIME:
274             return string("DATETIME");
275 
276         case CalpontSystemCatalog::TIMESTAMP:
277             return string("TIMESTAMP");
278 
279         case CalpontSystemCatalog::TIME:
280             return string("TIME");
281 
282         case CalpontSystemCatalog::VARCHAR:
283             return string("VARCHAR");
284 
285         case CalpontSystemCatalog::CLOB:
286             return string("CLOB");
287 
288         case CalpontSystemCatalog::BLOB:
289             return string("BLOB");
290 
291         case CalpontSystemCatalog::TEXT:
292             return string("TEXT");
293 
294         case CalpontSystemCatalog::UTINYINT:
295             return string("UTINYINT");
296 
297         case CalpontSystemCatalog::USMALLINT:
298             return string("USMALLINT");
299 
300         case CalpontSystemCatalog::UDECIMAL:
301             return string("UDECIMAL");
302 
303         case CalpontSystemCatalog::UMEDINT:
304             return string("UMEDINT");
305 
306         case CalpontSystemCatalog::UINT:
307             return string("UINT");
308 
309         case CalpontSystemCatalog::UFLOAT:
310             return string("UFLOAT");
311 
312         case CalpontSystemCatalog::UBIGINT:
313             return string("UBIGINT");
314 
315         case CalpontSystemCatalog::UDOUBLE:
316             return string("UDOUBLE");
317 
318         default:
319             return string("UNKNOWN");
320     }
321 }
322 
323 
keyName(uint64_t i,uint32_t key,const joblist::JobInfo & jobInfo)324 string keyName(uint64_t i, uint32_t key, const joblist::JobInfo& jobInfo)
325 {
326     string name = jobInfo.projectionCols[i]->alias();
327 
328     if (name.empty())
329     {
330         name = jobInfo.keyInfo->tupleKeyToName[key];
331 
332         if (jobInfo.keyInfo->tupleKeyVec[key].fId < 100)
333             name = "Expression/Function";
334     }
335 
336     return name = "'" + name + "'";
337 }
338 
339 
340 }
341 
342 
343 namespace joblist
344 {
345 
346 
TupleAggregateStep(const SP_ROWAGG_UM_t & agg,const RowGroup & rgOut,const RowGroup & rgIn,const JobInfo & jobInfo)347 TupleAggregateStep::TupleAggregateStep(
348     const SP_ROWAGG_UM_t& agg,
349     const RowGroup& rgOut,
350     const RowGroup& rgIn,
351     const JobInfo& jobInfo) :
352     JobStep(jobInfo),
353     fCatalog(jobInfo.csc),
354     fRowsReturned(0),
355     fDoneAggregate(false),
356     fEndOfResult(false),
357     fAggregator(agg),
358     fRowGroupOut(rgOut),
359     fRowGroupIn(rgIn),
360     fRunner(0),
361     fUmOnly(false),
362     fRm(jobInfo.rm),
363     fBucketNum(0),
364     fInputIter(-1),
365     fSessionMemLimit(jobInfo.umMemLimit)
366 {
367     fRowGroupData.reinit(fRowGroupOut);
368     fRowGroupOut.setData(&fRowGroupData);
369     fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
370 
371     // decide if this needs to be multi-threaded
372     RowAggregationDistinct* multiAgg = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
373     fIsMultiThread = (multiAgg || fAggregator->aggMapKeyLength() > 0);
374 
375     // initialize multi-thread variables
376     fNumOfThreads = fRm->aggNumThreads();
377     fNumOfBuckets = fRm->aggNumBuckets();
378     fNumOfRowGroups = fRm->aggNumRowGroups();
379 
380     auto memLimit = std::min(fRm->availableMemory(), *fSessionMemLimit);
381     fNumOfBuckets = calcNumberOfBuckets(memLimit,
382                                         fNumOfThreads,
383                                         fNumOfBuckets,
384                                         fNumOfRowGroups,
385                                         fRowGroupIn.getRowSize(),
386                                         fRowGroupOut.getRowSize(),
387                                         fRm->getAllowDiskAggregation());
388     fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets);
389 
390     fMemUsage.reset(new uint64_t[fNumOfThreads]);
391     memset(fMemUsage.get(), 0, fNumOfThreads * sizeof(uint64_t));
392 
393     fExtendedInfo = "TAS: ";
394     fQtc.stepParms().stepType = StepTeleStats::T_TAS;
395 }
396 
397 
~TupleAggregateStep()398 TupleAggregateStep::~TupleAggregateStep()
399 {
400     for (uint32_t i = 0; i < fNumOfThreads; i++)
401         fRm->returnMemory(fMemUsage[i], fSessionMemLimit);
402 
403     for (uint32_t i = 0; i < fAgg_mutex.size(); i++)
404         delete fAgg_mutex[i];
405 }
406 
407 
initializeMultiThread()408 void TupleAggregateStep::initializeMultiThread()
409 {
410     RowGroupDL* dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();
411     uint32_t i;
412 
413     if (dlIn == NULL)
414         throw logic_error("Input is not RowGroup data list in delivery step.");
415 
416     if (fInputIter < 0)
417         fInputIter = dlIn->getIterator();
418 
419     fRowGroupIns.resize(fNumOfThreads);
420     fRowGroupOuts.resize(fNumOfBuckets);
421     fRowGroupDatas.resize(fNumOfBuckets);
422 
423     rowgroup::SP_ROWAGG_UM_t agg;
424     RGData rgData;
425 
426     for (i = 0; i < fNumOfBuckets; i++)
427     {
428         boost::mutex* lock = new boost::mutex();
429         fAgg_mutex.push_back(lock);
430         fRowGroupOuts[i] = fRowGroupOut;
431         rgData.reinit(fRowGroupOut);
432         fRowGroupDatas[i] = rgData;
433         fRowGroupOuts[i].setData(&fRowGroupDatas[i]);
434         fRowGroupOuts[i].resetRowGroup(0);
435     }
436 }
437 
438 
run()439 void TupleAggregateStep::run()
440 {
441     if (fDelivery == false)
442     {
443         fRunner = jobstepThreadPool.invoke(Aggregator(this));
444     }
445 }
446 
447 
join()448 void TupleAggregateStep::join()
449 {
450     if (fRunner)
451         jobstepThreadPool.join(fRunner);
452 }
453 
454 
// Second phase of threaded distinct aggregation, run once per worker thread.
//
// The thread first drains the output of the first-phase aggregator(s) held by
// fAggregators[threadID], hashing every row into one of fNumOfBuckets local
// buckets. It then walks the buckets and, for each one whose mutex it can
// take, feeds that bucket's rows to fAggregators[bucket]; buckets whose mutex
// is busy are retried until all are done or the step is cancelled.
//
// Threads with threadID >= fNumOfBuckets have nothing to do and return
// immediately. Any exception is passed to handleException() and ends the
// result stream (fEndOfResult).
void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
{
    if (threadID >= fNumOfBuckets)
        return;

    // rowBucketVecs[b][s]: rows destined for bucket b coming from sub-aggregator s.
    scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]);
    scoped_array<bool> bucketDone(new bool[fNumOfBuckets]);
    uint32_t hashlen = fAggregator->aggMapKeyLength();

    try
    {
        // NOTE(review): aggDist is dereferenced below without a null check;
        // this assumes fAggregators[threadID] is always a RowAggregationDistinct
        // here -- confirm against the callers.
        RowAggregationDistinct* aggDist = dynamic_cast<RowAggregationDistinct*>(fAggregators[threadID].get());
        RowAggregationMultiDistinct* multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[threadID].get());
        Row rowIn;
        RowGroup* rowGroupIn = nullptr;
        rowGroupIn = (aggDist->aggregator()->getOutputRowGroup());
        uint32_t bucketID;
        // Holds the RGData objects moved out of the sub-aggregators; presumably
        // this keeps the row pointers stored in the buckets valid -- TODO confirm.
        std::vector<std::unique_ptr<RGData>> rgDataVec;

        // One row list per sub-aggregator (multi-distinct) or a single one.
        if (multiDist)
        {
            for (uint32_t i = 0; i < fNumOfBuckets; i++)
                rowBucketVecs[i].resize(multiDist->subAggregators().size());
        }
        else
        {
            for (uint32_t i = 0; i < fNumOfBuckets; i++)
                rowBucketVecs[i].resize(1);
        }

        // dispatch rows to bucket
        if (multiDist)
        {
            for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
            {
                rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
                rowGroupIn->initRow(&rowIn);
                auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());

                while (subDistAgg->nextRowGroup())
                {
                    rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
                    rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
                    rowGroupIn->getRow(0, &rowIn);

                    for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
                    {
                        // The key is the groupby columns, which are the leading columns.
                        //uint8_t* hashMapKey = rowIn.getData() + 2;
                        //bucketID = hash.operator()(hashMapKey) & fBucketMask;
                        // NOTE(review): hashlen - 1 wraps around if hashlen is 0 --
                        // confirm aggMapKeyLength() is always > 0 on this path.
                        uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
                        bucketID = hash % fNumOfBuckets;
                        rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
                        rowIn.nextRow();
                    }
                }
            }
        }
        else
        {
            rowGroupIn->initRow(&rowIn);
            auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());

            while (subAgg->nextRowGroup())
            {
                rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
                rgDataVec.emplace_back(subAgg->moveCurrentRGData());
                rowGroupIn->getRow(0, &rowIn);

                for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
                {
                    // The key is the groupby columns, which are the leading columns.
                    //uint8_t* hashMapKey = rowIn.getData() + 2;
                    //bucketID = hash.operator()(hashMapKey) & fBucketMask;
                    uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
                    bucketID = hash % fNumOfBuckets;
                    rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
                    rowIn.nextRow();
                }
            }
        }

        bool done = false;

        // reset bucketDone[] to be false
        //memset(bucketDone, 0, sizeof(bucketDone));
        fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false);

        // Keep sweeping over the buckets until every one has been processed by
        // this thread (or the step is cancelled). try_lock() avoids blocking on
        // a bucket another thread is currently aggregating into.
        while (!done && !cancelled())
        {
            done = true;

            for (uint32_t c = 0; c < fNumOfBuckets && !cancelled(); c++)
            {
                if (!bucketDone[c] && fAgg_mutex[c]->try_lock())
                {
                    try
                    {
                        if (multiDist)
                            dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())->doDistinctAggregation_rowVec(rowBucketVecs[c]);
                        else
                            dynamic_cast<RowAggregationDistinct*>(fAggregators[c].get())->doDistinctAggregation_rowVec(rowBucketVecs[c][0]);
                    }
                    catch (...)
                    {
                        // release the bucket before propagating to the outer handler
                        fAgg_mutex[c]->unlock();
                        throw;
                    }

                    fAgg_mutex[c]->unlock();
                    bucketDone[c] = true;
                    // NOTE(review): only the first sub-aggregator's row list is
                    // cleared here; in the multi-distinct case the other lists
                    // stay populated until this function returns -- confirm
                    // whether that is intended.
                    rowBucketVecs[c][0].clear();
                }
                else if (!bucketDone[c])
                {
                    // bucket is busy; come back to it on the next sweep
                    done = false;
                }
            }
        }

        if (cancelled())
        {
            fEndOfResult = true;
        }

    } // try
    catch (...)
    {
        handleException(std::current_exception(),
                        logging::tupleAggregateStepErr,
                        logging::ERR_AGGREGATION_TOO_BIG,
                        "TupleAggregateStep::doThreadedSecondPhaseAggregate()");
        fEndOfResult = true;
    }


    fDoneAggregate = true;

    if (traceOn())
    {
        dlTimes.setLastReadTime();
        dlTimes.setEndOfInputTime();
    }
}
599 
600 
nextBand_singleThread(messageqcpp::ByteStream & bs)601 uint32_t TupleAggregateStep::nextBand_singleThread(messageqcpp::ByteStream& bs)
602 {
603     uint32_t rowCount = 0;
604 
605     try
606     {
607         if (!fDoneAggregate)
608             aggregateRowGroups();
609 
610         if (fEndOfResult == false)
611         {
612             bs.restart();
613 
614             // do the final aggregtion and deliver the results
615             // at least one RowGroup for aggregate results
616             if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) != NULL)
617             {
618                 dynamic_cast<RowAggregationDistinct*>(fAggregator.get())->doDistinctAggregation();
619             }
620 
621             if (fAggregator->nextRowGroup())
622             {
623                 fAggregator->finalize();
624                 rowCount = fRowGroupOut.getRowCount();
625                 fRowsReturned += rowCount;
626                 fRowGroupDelivered.setData(fRowGroupOut.getRGData());
627 
628                 if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
629                     pruneAuxColumns();
630 
631                 fRowGroupDelivered.serializeRGData(bs);
632             }
633             else
634             {
635                 fEndOfResult = true;
636             }
637         }
638     } // try
639     catch (...)
640     {
641         handleException(std::current_exception(),
642                         logging::tupleAggregateStepErr,
643                         logging::ERR_AGGREGATION_TOO_BIG,
644                         "TupleAggregateStep::doThreadedSecondPhaseAggregate()");
645         fEndOfResult = true;
646     }
647 
648     if (fEndOfResult)
649     {
650         StepTeleStats sts;
651         sts.query_uuid = fQueryUuid;
652         sts.step_uuid = fStepUuid;
653         sts.msg_type = StepTeleStats::ST_SUMMARY;
654         sts.total_units_of_work = sts.units_of_work_completed = 1;
655         sts.rows = fRowsReturned;
656         postStepSummaryTele(sts);
657 
658         // send an empty / error band
659         RGData rgData(fRowGroupOut, 0);
660         fRowGroupOut.setData(&rgData);
661         fRowGroupOut.resetRowGroup(0);
662         fRowGroupOut.setStatus(status());
663         fRowGroupOut.serializeRGData(bs);
664         rowCount = 0;
665 
666         if (traceOn())
667             printCalTrace();
668     }
669 
670     return rowCount;
671 }
672 
673 
nextDeliveredRowGroup()674 bool TupleAggregateStep::nextDeliveredRowGroup()
675 {
676     for (; fBucketNum < fNumOfBuckets; fBucketNum++)
677     {
678         while (fAggregators[fBucketNum]->nextRowGroup())
679         {
680             fAggregators[fBucketNum]->finalize();
681             fRowGroupDelivered.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
682             fRowGroupOut.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
683             return true;
684         }
685     }
686 
687     fBucketNum = 0;
688     return false;
689 }
690 
691 
nextBand(messageqcpp::ByteStream & bs)692 uint32_t TupleAggregateStep::nextBand(messageqcpp::ByteStream& bs)
693 {
694     // use the orignal single thread model when no group by and distnct.
695     // @bug4314. DO NOT access fAggregtor before the first read of input,
696     // because hashjoin may not have finalized fAggregator.
697     if (!fIsMultiThread)
698         return nextBand_singleThread(bs);
699 
700     return doThreadedAggregate(bs, 0);
701 }
702 
703 
setPmHJAggregation(JobStep * step)704 bool TupleAggregateStep::setPmHJAggregation(JobStep* step)
705 {
706     TupleBPS* bps = dynamic_cast<TupleBPS*>(step);
707 
708     if (bps != NULL)
709     {
710         fAggregatorUM->expression(fAggregator->expression());
711         fAggregatorUM->constantAggregate(fAggregator->constantAggregate());
712         fAggregator = fAggregatorUM;
713         fRowGroupIn = fRowGroupPMHJ;
714         fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
715         bps->setAggregateStep(fAggregatorPM, fRowGroupPMHJ);
716     }
717 
718     return (bps != NULL);
719 }
720 
721 
configDeliveredRowGroup(const JobInfo & jobInfo)722 void TupleAggregateStep::configDeliveredRowGroup(const JobInfo& jobInfo)
723 {
724     // configure the oids and keys
725     vector<uint32_t> oids = fRowGroupOut.getOIDs();
726     vector<uint32_t> keys = fRowGroupOut.getKeys();
727     vector<pair<int, int> >::const_iterator begin = jobInfo.aggEidIndexList.begin();
728     vector<pair<int, int> >::const_iterator end   = jobInfo.aggEidIndexList.end();
729 
730     for (vector<pair<int, int> >::const_iterator i = begin; i != end; i++)
731     {
732         oids[i->second] = i->first;
733         keys[i->second] = getExpTupleKey(jobInfo, i->first);
734     }
735 
736     // correct the scale
737     vector<uint32_t> scale = fRowGroupOut.getScale();
738 
739 //    for (uint64_t i = 0; i < scale.size(); i++)
740 //    {
741         // to support CNX_DECIMAL_SCALE the avg column's scale is coded with two scales:
742         // fe's avg column scale << 8 + original column scale
743         //if ((scale[i] & 0x0000FF00) > 0)
744 //        scale[i] = scale[i] &  0x000000FF;
745 //    }
746 
747     size_t retColCount = jobInfo.nonConstDelCols.size();
748 
749     if (jobInfo.havingStep)
750         retColCount = jobInfo.returnedColVec.size();
751 
752     vector<uint32_t>::const_iterator offsets0 = fRowGroupOut.getOffsets().begin();
753     vector<CalpontSystemCatalog::ColDataType>::const_iterator types0 =
754         fRowGroupOut.getColTypes().begin();
755     vector<uint32_t> csNums = fRowGroupOut.getCharsetNumbers();
756     vector<uint32_t>::const_iterator precision0 = fRowGroupOut.getPrecision().begin();
757     fRowGroupDelivered = RowGroup(retColCount,
758                                   vector<uint32_t>(offsets0, offsets0 + retColCount + 1),
759                                   vector<uint32_t>(oids.begin(), oids.begin() + retColCount),
760                                   vector<uint32_t>(keys.begin(), keys.begin() + retColCount),
761                                   vector<CalpontSystemCatalog::ColDataType>(types0, types0 + retColCount),
762                                   vector<uint32_t>(csNums.begin(), csNums.begin() + retColCount),
763                                   vector<uint32_t>(scale.begin(), scale.begin() + retColCount),
764                                   vector<uint32_t>(precision0, precision0 + retColCount),
765                                   jobInfo.stringTableThreshold);
766 
767     if (jobInfo.trace)
768         cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl;
769 
770 }
771 
772 
// Replace the output row group with `rg`, re-initialize the step-owned output
// buffer (fRowGroupData) for it, and re-bind the aggregator to the current
// input row group and the new output row group.
void TupleAggregateStep::setOutputRowGroup(const RowGroup& rg)
{
    fRowGroupOut = rg;
    fRowGroupData.reinit(fRowGroupOut);
    fRowGroupOut.setData(&fRowGroupData);
    fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
}
780 
781 
// Return the row group the aggregator writes its output into.
const RowGroup& TupleAggregateStep::getOutputRowGroup() const
{
    return fRowGroupOut;
}
786 
787 
// Return the row group that is serialized to the consumer (configured by
// configDeliveredRowGroup()).
const RowGroup& TupleAggregateStep::getDeliveredRowGroup() const
{
    return fRowGroupDelivered;
}
792 
793 
// Remember the UM aggregator, PM aggregator and row group that
// setPmHJAggregation() installs later.
//   um - stored as a RowAggregationUM; the stored pointer is null when `um`
//        does not actually point to a RowAggregationUM
//   pm - stored as is
//   rg - copied into fRowGroupPMHJ
void TupleAggregateStep::savePmHJData(SP_ROWAGG_t& um, SP_ROWAGG_t& pm, RowGroup& rg)
{
    fAggregatorUM = dynamic_pointer_cast<RowAggregationUM>(um);
    fAggregatorPM = pm;
    fRowGroupPMHJ = rg;
}
800 
801 
// Set whether the delivered row group uses a string table.
void TupleAggregateStep::deliverStringTableRowGroup(bool b)
{
    fRowGroupDelivered.setUseStringTable(b);
}
806 
807 
// Report whether the delivered row group uses a string table.
bool TupleAggregateStep::deliverStringTableRowGroup() const
{
    return fRowGroupDelivered.usesStringTable();
}
812 
813 
toString() const814 const string TupleAggregateStep::toString() const
815 {
816     ostringstream oss;
817     oss << "AggregateStep   ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
818 
819     oss << " in:";
820 
821     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
822         oss << fInputJobStepAssociation.outAt(i);
823 
824     if (fOutputJobStepAssociation.outSize() > 0)
825     {
826         oss << " out:";
827 
828         for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
829             oss << fOutputJobStepAssociation.outAt(i);
830     }
831 
832     return oss.str();
833 }
834 
prepAggregate(SJSTEP & step,JobInfo & jobInfo)835 SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
836 {
837     SJSTEP spjs;
838     TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(step.get());
839     TupleBPS* tbps = dynamic_cast<TupleBPS*>(step.get());
840     TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(step.get());
841     SubAdapterStep* sas = dynamic_cast<SubAdapterStep*>(step.get());
842     CrossEngineStep* ces = dynamic_cast<CrossEngineStep*>(step.get());
843     vector<RowGroup> rgs;      // 0-ProjRG, 1-UMRG, [2-PMRG -- if 2 phases]
844     vector<SP_ROWAGG_t> aggs;
845     SP_ROWAGG_UM_t aggUM;
846     bool distinctAgg = false;
847     int64_t constKey = -1;
848     vector<ConstantAggData> constAggDataVec;
849 
850     vector<std::pair<uint32_t, int> > returnedColVecOrig = jobInfo.returnedColVec;
851 
852     for (uint32_t idx = 0; idx < jobInfo.returnedColVec.size(); idx++)
853     {
854         if (jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_COUNT ||
855                 jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_AVG ||
856                 jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_SUM
857            )
858         {
859             distinctAgg = true;
860         }
861 
862         // Change COUNT_ASTERISK to CONSTANT if necessary.
863         // In joblistfactory, all aggregate(constant) are set to count(*) for easy process.
864         map<uint64_t, SRCP>::iterator it = jobInfo.constAggregate.find(idx);
865 
866         if (it != jobInfo.constAggregate.end())
867         {
868             AggregateColumn* ac = dynamic_cast<AggregateColumn*>(it->second.get());
869 
870             if (ac->aggOp() == AggregateColumn::COUNT_ASTERISK)
871             {
872                 if (jobInfo.cntStarPos == -1)
873                     jobInfo.cntStarPos = idx;
874             }
875             else
876             {
877                 constKey = jobInfo.returnedColVec[idx].first;
878                 CalpontSystemCatalog::ColType ct = ac->resultType();
879                 TupleInfo ti =
880                     setExpTupleInfo(ct, ac->expressionId(), ac->alias(), jobInfo);
881                 jobInfo.returnedColVec[idx].first = ti.key;
882                 jobInfo.returnedColVec[idx].second = AggregateColumn::CONSTANT;
883 
884                 ConstantColumn* cc = dynamic_cast<ConstantColumn*>(ac->constCol().get());
885                 idbassert(cc != NULL);   // @bug5261
886                 bool isNull = (ConstantColumn::NULLDATA == cc->type());
887 
888                 if (ac->aggOp() == AggregateColumn::UDAF)
889                 {
890                     UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
891                     if (udafc)
892                     {
893                         constAggDataVec.push_back(
894                             ConstantAggData(cc->constval(), udafc->getContext().getName(),
895                                             functionIdMap(ac->aggOp()), isNull));
896                     }
897                 }
898                 else
899                 {
900                     constAggDataVec.push_back(
901                         ConstantAggData(cc->constval(), functionIdMap(ac->aggOp()), isNull));
902                 }
903             }
904         }
905     }
906 
907     // If there are aggregate(constant) columns, but no count(*), add a count(*).
908     if (constAggDataVec.size() > 0 && jobInfo.cntStarPos < 0)
909     {
910         jobInfo.cntStarPos = jobInfo.returnedColVec.size();
911         jobInfo.returnedColVec.push_back(make_pair(constKey, AggregateColumn::COUNT_ASTERISK));
912     }
913 
914     // preprocess the columns used by group_concat
915     jobInfo.groupConcatInfo.prepGroupConcat(jobInfo);
916     bool doUMOnly = jobInfo.groupConcatInfo.columns().size() > 0
917 //                 || jobInfo.windowSet.size() > 0
918                  || sas
919                  || ces;
920 
921     rgs.push_back(tds->getDeliveredRowGroup());
922 
923     // get rowgroup and aggregator
924     // For TupleHashJoin, we prepare for both PM and UM only aggregation
925     if (doUMOnly || thjs)
926     {
927         if (distinctAgg == true)
928             prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
929         else
930             prep1PhaseAggregate(jobInfo, rgs, aggs);
931 
932         // TODO: fix this
933         if (doUMOnly)
934             rgs.push_back(rgs[0]);
935     }
936 
937     if (!doUMOnly)
938     {
939         if (distinctAgg == true)
940             prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
941         else
942             prep2PhasesAggregate(jobInfo, rgs, aggs);
943     }
944 
945     if (tbps != NULL)
946     {
947         // create delivery step
948         aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
949         spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[2], jobInfo));
950 
951         if (doUMOnly)
952             dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
953         else
954             tbps->setAggregateStep(aggs[1], rgs[2]);
955     }
956     else if (thjs != NULL)
957     {
958         // create delivery step
959         aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
960         spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
961 
962         if (doUMOnly)
963             dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
964         else
965             dynamic_cast<TupleAggregateStep*>(spjs.get())->savePmHJData(aggs[1], aggs[2], rgs[3]);
966 
967         // set input side
968         thjs->deliveryStep(spjs);
969     }
970     else
971     {
972         aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
973         spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
974     }
975 
976     // Setup the input JobstepAssoctiation -- the mechanism
977     // whereby the previous step feeds data to this step.
978     // Otherwise, we need to create one and hook to the
979     // previous step as well as this aggregate step.
980     spjs->stepId(step->stepId() + 1);
981 
982     JobStepAssociation jsa;
983     AnyDataListSPtr spdl(new AnyDataList());
984     RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
985     dl->OID(execplan::CNX_VTABLE_ID);
986     spdl->rowGroupDL(dl);
987     jsa.outAdd(spdl);
988 
989     spjs->inputAssociation(jsa); // Aggregate input
990 
991     //Previous step output
992     step->outputAssociation(jsa);
993 
994     // add the aggregate on constants
995     if (constAggDataVec.size() > 0)
996     {
997         dynamic_cast<TupleAggregateStep*>(spjs.get())->addConstangAggregate(constAggDataVec);
998         jobInfo.returnedColVec.swap(returnedColVecOrig); // restore the original return columns
999     }
1000 
1001     // fix the delivered rowgroup data
1002     dynamic_cast<TupleAggregateStep*>(spjs.get())->configDeliveredRowGroup(jobInfo);
1003 
1004     if (jobInfo.expressionVec.size() > 0)
1005         dynamic_cast<TupleAggregateStep*>(spjs.get())->prepExpressionOnAggregate(aggUM, jobInfo);
1006 
1007     return spjs;
1008 }
1009 
1010 
prep1PhaseAggregate(JobInfo & jobInfo,vector<RowGroup> & rowgroups,vector<SP_ROWAGG_t> & aggregators)1011 void TupleAggregateStep::prep1PhaseAggregate(
1012     JobInfo& jobInfo, vector<RowGroup>& rowgroups, vector<SP_ROWAGG_t>& aggregators)
1013 {
1014     // check if there are any aggregate columns
1015     vector<pair<uint32_t, int> > aggColVec;
1016     vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
1017 
1018     for (uint64_t i = 0; i < returnedColVec.size(); i++)
1019     {
1020         if (returnedColVec[i].second != 0)
1021             aggColVec.push_back(returnedColVec[i]);
1022     }
1023 
1024     // populate the aggregate rowgroup: projectedRG   -> aggregateRG
1025     //
1026     // Aggregate preparation by joblist factory:
1027     // 1. get projected rowgroup (done by doAggProject) -- passed in
1028     // 2. construct aggregate rowgroup  -- output of UM
1029     const RowGroup projRG = rowgroups[0];
1030     const vector<uint32_t>& oidsProj = projRG.getOIDs();
1031     const vector<uint32_t>& keysProj = projRG.getKeys();
1032     const vector<uint32_t>& scaleProj = projRG.getScale();
1033     const vector<uint32_t>& precisionProj = projRG.getPrecision();
1034     const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
1035     const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();
1036 
1037     vector<uint32_t> posAgg;
1038     vector<uint32_t> oidsAgg;
1039     vector<uint32_t> keysAgg;
1040     vector<uint32_t> scaleAgg;
1041     vector<uint32_t> precisionAgg;
1042     vector<CalpontSystemCatalog::ColDataType> typeAgg;
1043     vector<uint32_t> csNumAgg;
1044     vector<uint32_t> widthAgg;
1045     vector<SP_ROWAGG_GRPBY_t> groupBy;
1046     vector<SP_ROWAGG_FUNC_t> functionVec;
1047     uint32_t bigIntWidth = sizeof(int64_t);
1048     uint32_t bigUintWidth = sizeof(uint64_t);
1049     // For UDAF
1050     uint32_t projColsUDAFIdx = 0;
1051     UDAFColumn* udafc = NULL;
1052     mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
1053     // for count column of average function
1054     map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
1055 
1056     // collect the projected column info, prepare for aggregation
1057     vector<uint32_t> width;
1058     map<uint32_t, int> projColPosMap;
1059 
1060     for (uint64_t i = 0; i < keysProj.size(); i++)
1061     {
1062         projColPosMap.insert(make_pair(keysProj[i], i));
1063         width.push_back(projRG.getColumnWidth(i));
1064     }
1065 
1066     // for groupby column
1067     map<uint32_t, int> groupbyMap;
1068 
1069     for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
1070     {
1071         int64_t colProj = projColPosMap[jobInfo.groupByColVec[i]];
1072         SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, -1));
1073         groupBy.push_back(groupby);
1074         groupbyMap.insert(make_pair(jobInfo.groupByColVec[i], i));
1075     }
1076 
1077     // for distinct column
1078     for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
1079     {
1080         //@bug6126, continue if already in group by
1081         if (groupbyMap.find(jobInfo.distinctColVec[i]) != groupbyMap.end())
1082             continue;
1083 
1084         int64_t colProj = projColPosMap[jobInfo.distinctColVec[i]];
1085         SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, -1));
1086         groupBy.push_back(groupby);
1087         groupbyMap.insert(make_pair(jobInfo.distinctColVec[i], i));
1088     }
1089 
1090     // populate the aggregate rowgroup
1091     AGG_MAP aggFuncMap;
1092     uint64_t outIdx = 0;
1093 
1094     for (uint64_t i = 0; i < returnedColVec.size(); i++)
1095     {
1096         RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
1097         RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
1098         uint32_t key = returnedColVec[i].first;
1099 
1100         if (aggOp == ROWAGG_CONSTANT)
1101         {
1102             TupleInfo ti = getTupleInfo(key, jobInfo);
1103             oidsAgg.push_back(ti.oid);
1104             keysAgg.push_back(key);
1105             scaleAgg.push_back(ti.scale);
1106             precisionAgg.push_back(ti.precision);
1107             typeAgg.push_back(ti.dtype);
1108             csNumAgg.push_back(ti.csNum);
1109             widthAgg.push_back(ti.width);
1110             SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(
1111                                        aggOp, stats, 0, outIdx, jobInfo.cntStarPos));
1112             functionVec.push_back(funct);
1113             ++outIdx;
1114             continue;
1115         }
1116 
1117         if (aggOp == ROWAGG_GROUP_CONCAT)
1118         {
1119             TupleInfo ti = getTupleInfo(key, jobInfo);
1120             uint32_t ptrSize = sizeof(GroupConcatAg*);
1121             uint32_t width = (ti.width >= ptrSize) ? ti.width : ptrSize;
1122             oidsAgg.push_back(ti.oid);
1123             keysAgg.push_back(key);
1124             scaleAgg.push_back(ti.scale);
1125             precisionAgg.push_back(ti.precision);
1126             typeAgg.push_back(ti.dtype);
1127             csNumAgg.push_back(ti.csNum);
1128             widthAgg.push_back(width);
1129             SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(
1130                                        aggOp, stats, 0, outIdx, -1));
1131             functionVec.push_back(funct);
1132 
1133             ++outIdx;
1134             continue;
1135         }
1136 
1137         if (projColPosMap.find(key) == projColPosMap.end())
1138         {
1139             ostringstream emsg;
1140             emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
1141             cerr << "prep1PhaseAggregate: " << emsg.str()
1142                  << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[key].fId
1143                  << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
1144 
1145             if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
1146                 cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
1147 
1148             cerr << endl;
1149             throw logic_error(emsg.str());
1150         }
1151 
1152         // make sure the colProj is correct
1153         int64_t colProj = projColPosMap[key];
1154 
1155         if (keysProj[colProj] != key)
1156         {
1157             ostringstream emsg;
1158             emsg << "projection column map is out of sync.";
1159             cerr << "prep1PhaseAggregate: " << emsg.str() << endl;
1160             throw logic_error(emsg.str());
1161         }
1162 
1163         if (aggOp == ROWAGG_FUNCT_UNDEFINE)
1164         {
1165             // must be a groupby column or function on aggregation
1166             // or used by group_concat
1167             map<uint32_t, int>::iterator it = groupbyMap.find(key);
1168 
1169             if (it != groupbyMap.end())
1170             {
1171                 oidsAgg.push_back(oidsProj[colProj]);
1172                 keysAgg.push_back(key);
1173                 scaleAgg.push_back(scaleProj[colProj]);
1174                 precisionAgg.push_back(precisionProj[colProj]);
1175                 typeAgg.push_back(typeProj[colProj]);
1176                 csNumAgg.push_back(csNumProj[colProj]);
1177                 widthAgg.push_back(width[colProj]);
1178 
1179                 if (groupBy[it->second]->fOutputColumnIndex == (uint32_t) - 1)
1180                     groupBy[it->second]->fOutputColumnIndex = outIdx;
1181                 else
1182                     functionVec.push_back(SP_ROWAGG_FUNC_t(
1183                                               new RowAggFunctionCol(
1184                                                   ROWAGG_DUP_FUNCT,
1185                                                   ROWAGG_FUNCT_UNDEFINE,
1186                                                   -1,
1187                                                   outIdx,
1188                                                   groupBy[it->second]->fOutputColumnIndex)));
1189 
1190                 ++outIdx;
1191                 continue;
1192             }
1193             else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key) !=
1194                      jobInfo.expressionVec.end())
1195             {
1196                 TupleInfo ti = getTupleInfo(key, jobInfo);
1197                 oidsAgg.push_back(ti.oid);
1198                 keysAgg.push_back(key);
1199                 scaleAgg.push_back(ti.scale);
1200                 precisionAgg.push_back(ti.precision);
1201                 typeAgg.push_back(ti.dtype);
1202                 csNumAgg.push_back(ti.csNum);
1203                 widthAgg.push_back(ti.width);
1204                 ++outIdx;
1205                 continue;
1206             }
1207             else if (jobInfo.groupConcatInfo.columns().find(key) !=
1208                      jobInfo.groupConcatInfo.columns().end())
1209             {
1210                 // TODO: columns only for group_concat do not needed in result set.
1211                 oidsAgg.push_back(oidsProj[colProj]);
1212                 keysAgg.push_back(key);
1213                 scaleAgg.push_back(scaleProj[colProj]);
1214                 precisionAgg.push_back(precisionProj[colProj]);
1215                 typeAgg.push_back(typeProj[colProj]);
1216                 csNumAgg.push_back(csNumProj[colProj]);
1217                 widthAgg.push_back(width[colProj]);
1218                 ++outIdx;
1219                 continue;
1220             }
1221             else if (jobInfo.windowSet.find(key) != jobInfo.windowSet.end())
1222             {
1223                 // skip window columns/expression, which are computed later
1224                 oidsAgg.push_back(oidsProj[colProj]);
1225                 keysAgg.push_back(key);
1226                 scaleAgg.push_back(scaleProj[colProj]);
1227                 precisionAgg.push_back(precisionProj[colProj]);
1228                 typeAgg.push_back(typeProj[colProj]);
1229                 csNumAgg.push_back(csNumProj[colProj]);
1230                 widthAgg.push_back(width[colProj]);
1231                 ++outIdx;
1232                 continue;
1233             }
1234             else
1235             {
1236                 Message::Args args;
1237                 args.add(keyName(i, key, jobInfo));
1238                 string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
1239                 cerr << "prep1PhaseAggregate: " << emsg << " oid="
1240                      << (int) jobInfo.keyInfo->tupleKeyVec[key].fId << ", alias="
1241                      << jobInfo.keyInfo->tupleKeyVec[key].fTable << ", view="
1242                      << jobInfo.keyInfo->tupleKeyVec[key].fView << ", function="
1243                      << (int) aggOp << endl;
1244                 throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
1245             }
1246         }
1247 
1248         SP_ROWAGG_FUNC_t funct;
1249 
1250         if (aggOp == ROWAGG_UDAF)
1251         {
1252             std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
1253             for (; it != jobInfo.projectionCols.end(); it++)
1254             {
1255                 udafc = dynamic_cast<UDAFColumn*>((*it).get());
1256                 projColsUDAFIdx++;
1257                 if (udafc)
1258                 {
1259                     pUDAFFunc =  udafc->getContext().getFunction();
1260                     // Save the multi-parm keys for dup-detection.
1261                     if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
1262                     {
1263                         for (uint64_t k = i+1;
1264                              k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
1265                              ++k)
1266                         {
1267                             udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
1268                         }
1269                     }
1270                     // Create a RowAggFunctionCol (UDAF subtype) with the context.
1271                     funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, outIdx));
1272                     break;
1273                 }
1274             }
1275             if (it == jobInfo.projectionCols.end())
1276             {
1277                 throw logic_error("(1)prep1PhaseAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
1278             }
1279         }
1280         else
1281         {
1282             funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, outIdx));
1283         }
1284 
1285         functionVec.push_back(funct);
1286 
1287         switch (aggOp)
1288         {
1289             case ROWAGG_MIN:
1290             case ROWAGG_MAX:
1291             {
1292                 oidsAgg.push_back(oidsProj[colProj]);
1293                 keysAgg.push_back(key);
1294                 scaleAgg.push_back(scaleProj[colProj]);
1295                 precisionAgg.push_back(precisionProj[colProj]);
1296                 typeAgg.push_back(typeProj[colProj]);
1297                 csNumAgg.push_back(csNumProj[colProj]);
1298                  widthAgg.push_back(width[colProj]);
1299             }
1300             break;
1301 
1302             case ROWAGG_AVG:
1303                 avgFuncMap.insert(make_pair(key, funct));
1304                 /* fall through */
1305             case ROWAGG_SUM:
1306             {
1307                 if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
1308                         typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
1309                         typeProj[colProj] == CalpontSystemCatalog::BLOB ||
1310                         typeProj[colProj] == CalpontSystemCatalog::TEXT ||
1311                         typeProj[colProj] == CalpontSystemCatalog::DATE ||
1312                         typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
1313                         typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
1314                         typeProj[colProj] == CalpontSystemCatalog::TIME)
1315                 {
1316                     Message::Args args;
1317                     args.add("sum/average");
1318                     args.add(colTypeIdString(typeProj[colProj]));
1319                     string emsg =
1320                         IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
1321                     cerr << "prep1PhaseAggregate: " << emsg << endl;
1322                     throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
1323                 }
1324 
1325                 oidsAgg.push_back(oidsProj[colProj]);
1326                 keysAgg.push_back(key);
1327                 typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
1328                 csNumAgg.push_back(csNumProj[colProj]);
1329                 precisionAgg.push_back(-1);
1330                 widthAgg.push_back(sizeof(long double));
1331                 scaleAgg.push_back(0);
1332             }
1333             break;
1334 
1335             case ROWAGG_COUNT_COL_NAME:
1336             case ROWAGG_COUNT_ASTERISK:
1337             {
1338                 oidsAgg.push_back(oidsProj[colProj]);
1339                 keysAgg.push_back(key);
1340                 scaleAgg.push_back(0);
1341                 // work around count() in select subquery
1342                 precisionAgg.push_back(9999);
1343                 typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
1344                 csNumAgg.push_back(csNumProj[colProj]);
1345                 widthAgg.push_back(bigIntWidth);
1346             }
1347             break;
1348 
1349             case ROWAGG_STATS:
1350             {
1351                 if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
1352                         typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
1353                         typeProj[colProj] == CalpontSystemCatalog::TEXT ||
1354                         typeProj[colProj] == CalpontSystemCatalog::BLOB ||
1355                         typeProj[colProj] == CalpontSystemCatalog::DATE ||
1356                         typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
1357                         typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
1358                         typeProj[colProj] == CalpontSystemCatalog::TIME)
1359                 {
1360                     Message::Args args;
1361                     args.add("variance/standard deviation");
1362                     args.add(colTypeIdString(typeProj[colProj]));
1363                     string emsg =
1364                         IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
1365                     cerr << "prep1PhaseAggregate: " << emsg << endl;
1366                     throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
1367                 }
1368 
1369                 oidsAgg.push_back(oidsProj[colProj]);
1370                 keysAgg.push_back(key);
1371                 scaleAgg.push_back(scaleProj[colProj]);
1372                 precisionAgg.push_back(0);
1373                 typeAgg.push_back(CalpontSystemCatalog::DOUBLE);
1374                 csNumAgg.push_back(csNumProj[colProj]);
1375                 widthAgg.push_back(sizeof(double));
1376             }
1377             break;
1378 
1379             case ROWAGG_BIT_AND:
1380             case ROWAGG_BIT_OR:
1381             case ROWAGG_BIT_XOR:
1382             {
1383                 oidsAgg.push_back(oidsProj[colProj]);
1384                 keysAgg.push_back(key);
1385                 scaleAgg.push_back(0);
1386                 precisionAgg.push_back(-16);  // for connector to skip null check
1387 
1388                 if (isUnsigned(typeProj[colProj]))
1389                 {
1390                     typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
1391                 }
1392                 else
1393                 {
1394                     typeAgg.push_back(CalpontSystemCatalog::BIGINT);
1395                 }
1396 
1397                 csNumAgg.push_back(csNumProj[colProj]);
1398                 widthAgg.push_back(bigIntWidth);
1399             }
1400             break;
1401 
1402             case ROWAGG_UDAF:
1403             {
1404                 RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
1405 
1406                 if (!udafFuncCol)
1407                 {
1408                     throw logic_error("(2)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
1409                 }
1410 
1411                 // Return column
1412                 oidsAgg.push_back(oidsProj[colProj]);
1413                 keysAgg.push_back(key);
1414                 scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale());
1415                 precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
1416                 typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
1417                 csNumAgg.push_back(csNumProj[colProj]);
1418                 widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
1419                 break;
1420             }
1421 
1422             case ROWAGG_MULTI_PARM:
1423             {
1424             }
1425             break;
1426 
1427             default:
1428             {
1429                 ostringstream emsg;
1430                 emsg << "aggregate function (" << (uint64_t) aggOp << ") isn't supported";
1431                 cerr << "prep1PhaseAggregate: " << emsg.str() << endl;
1432                 throw QueryDataExcept(emsg.str(), aggregateFuncErr);
1433             }
1434         }
1435 
1436         // find if this func is a duplicate
1437         AGG_MAP::iterator iter = aggFuncMap.find(boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
1438 
1439         if (iter != aggFuncMap.end())
1440         {
1441             if (funct->fAggFunction == ROWAGG_AVG)
1442                 funct->fAggFunction = ROWAGG_DUP_AVG;
1443             else if (funct->fAggFunction == ROWAGG_STATS)
1444                 funct->fAggFunction = ROWAGG_DUP_STATS;
1445             else if (funct->fAggFunction == ROWAGG_UDAF)
1446                 funct->fAggFunction = ROWAGG_DUP_UDAF;
1447             else
1448                 funct->fAggFunction = ROWAGG_DUP_FUNCT;
1449 
1450             funct->fAuxColumnIndex = iter->second;
1451         }
1452         else
1453         {
1454             aggFuncMap.insert(make_pair(boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), funct->fOutputColumnIndex));
1455         }
1456 
1457         if (aggOp != ROWAGG_MULTI_PARM)
1458         {
1459             ++outIdx;
1460         }
1461     }
1462 
1463     // now fix the AVG function, locate the count(column) position
1464     for (uint64_t i = 0; i < functionVec.size(); i++)
1465     {
1466         if (functionVec[i]->fAggFunction != ROWAGG_COUNT_COL_NAME)
1467             continue;
1468 
1469         // if the count(k) can be associated with an avg(k)
1470         map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
1471             avgFuncMap.find(keysAgg[functionVec[i]->fOutputColumnIndex]);
1472 
1473         if (k != avgFuncMap.end())
1474         {
1475             k->second->fAuxColumnIndex = functionVec[i]->fOutputColumnIndex;
1476             functionVec[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
1477         }
1478     }
1479 
1480     uint64_t lastCol = outIdx;
1481 
1482     // there is avg(k), but no count(k) in the select list
1483     for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
1484     {
1485         if (k->second->fAuxColumnIndex == (uint32_t) - 1)
1486         {
1487             k->second->fAuxColumnIndex = lastCol++;
1488             oidsAgg.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
1489             keysAgg.push_back(k->first);
1490             scaleAgg.push_back(0);
1491             precisionAgg.push_back(19);
1492             typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
1493             widthAgg.push_back(bigIntWidth);
1494         }
1495     }
1496 
1497     // add auxiliary fields for UDAF and statistics functions
1498     for (uint64_t i = 0; i < functionVec.size(); i++)
1499     {
1500         uint64_t j = functionVec[i]->fInputColumnIndex;
1501 
1502         if (functionVec[i]->fAggFunction == ROWAGG_UDAF)
1503         {
1504             // Column for index of UDAF UserData struct
1505             RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec[i].get());
1506 
1507             if (!udafFuncCol)
1508             {
1509                 throw logic_error("(3)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
1510             }
1511 
1512             functionVec[i]->fAuxColumnIndex = lastCol++;
1513             oidsAgg.push_back(oidsProj[j]);
1514             keysAgg.push_back(keysProj[j]);
1515             scaleAgg.push_back(0);
1516             precisionAgg.push_back(0);
1517             precisionAgg.push_back(0);
1518             typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
1519             csNumAgg.push_back(8);
1520             widthAgg.push_back(bigUintWidth);
1521             continue;
1522         }
1523 
1524         if (functionVec[i]->fAggFunction != ROWAGG_STATS)
1525             continue;
1526 
1527         functionVec[i]->fAuxColumnIndex = lastCol;
1528 
1529         // sum(x)
1530         oidsAgg.push_back(oidsProj[j]);
1531         keysAgg.push_back(keysProj[j]);
1532         scaleAgg.push_back(0);
1533         precisionAgg.push_back(-1);
1534         typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
1535         csNumAgg.push_back(8);
1536         widthAgg.push_back(sizeof(long double));
1537         ++lastCol;
1538 
1539         // sum(x**2)
1540         oidsAgg.push_back(oidsProj[j]);
1541         keysAgg.push_back(keysProj[j]);
1542         scaleAgg.push_back(0);
1543         precisionAgg.push_back(-1);
1544         typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
1545         csNumAgg.push_back(8);
1546         widthAgg.push_back(sizeof(long double));
1547         ++lastCol;
1548     }
1549 
1550     // calculate the offset and create the rowaggregation, rowgroup
1551     posAgg.push_back(2);
1552 
1553     for (uint64_t i = 0; i < oidsAgg.size(); i++)
1554         posAgg.push_back(posAgg[i] + widthAgg[i]);
1555 
1556     RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
1557                    jobInfo.stringTableThreshold);
1558     SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit));
1559     rowAgg->timeZone(jobInfo.timeZone);
1560     rowgroups.push_back(aggRG);
1561     aggregators.push_back(rowAgg);
1562 
1563     // mapping the group_concat columns, if any.
1564     if (jobInfo.groupConcatInfo.groupConcat().size() > 0)
1565     {
1566         jobInfo.groupConcatInfo.mapColumns(projRG);
1567         rowAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
1568     }
1569 
1570     if (jobInfo.trace)
1571         cout << "\n====== Aggregation RowGroups ======" << endl
1572              << "projected  RG: " << projRG.toString() << endl
1573              << "aggregated RG: " << aggRG.toString() << endl;
1574 }
1575 
1576 
prep1PhaseDistinctAggregate(JobInfo & jobInfo,vector<RowGroup> & rowgroups,vector<SP_ROWAGG_t> & aggregators)1577 void TupleAggregateStep::prep1PhaseDistinctAggregate(
1578     JobInfo& jobInfo,
1579     vector<RowGroup>& rowgroups,
1580     vector<SP_ROWAGG_t>& aggregators)
1581 {
1582     // check if there are any aggregate columns
1583     vector<pair<uint32_t, int> > aggColVec;
1584     vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
1585 
1586     for (uint64_t i = 0; i < returnedColVec.size(); i++)
1587     {
1588         if (returnedColVec[i].second != 0)
1589             aggColVec.push_back(returnedColVec[i]);
1590     }
1591 
1592     // populate the aggregate rowgroup: projectedRG   -> aggregateRG
1593     //
1594     // Aggregate preparation by joblist factory:
1595     // 1. get projected rowgroup (done by doAggProject) -- passed in
1596     // 2. construct aggregate rowgroup  -- output of UM
1597     const RowGroup projRG = rowgroups[0];
1598     const vector<uint32_t>& oidsProj = projRG.getOIDs();
1599     const vector<uint32_t>& keysProj = projRG.getKeys();
1600     const vector<uint32_t>& scaleProj = projRG.getScale();
1601     const vector<uint32_t>& precisionProj = projRG.getPrecision();
1602     const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
1603     const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();
1604 
1605     vector<uint32_t> posAgg, posAggDist;
1606     vector<uint32_t> oidsAgg, oidsAggDist;
1607     vector<uint32_t> keysAgg, keysAggDist;
1608     vector<uint32_t> scaleAgg, scaleAggDist;
1609     vector<uint32_t> precisionAgg, precisionAggDist;
1610     vector<CalpontSystemCatalog::ColDataType> typeAgg, typeAggDist;
1611     vector<uint32_t> csNumAgg, csNumAggDist;
1612     vector<uint32_t> widthProj, widthAgg, widthAggDist;
1613     vector<SP_ROWAGG_GRPBY_t> groupBy, groupByNoDist;
1614     vector<SP_ROWAGG_FUNC_t> functionVec1, functionVec2, functionNoDistVec;
1615     uint32_t bigIntWidth = sizeof(int64_t);
1616     // map key = column key, operation (enum), and UDAF pointer if UDAF.
1617     AGG_MAP aggFuncMap;
1618 //    set<uint32_t> avgSet;
1619     list<uint32_t> multiParmIndexes;
1620 
1621     // fOR udaf
1622     UDAFColumn* udafc = NULL;
1623     mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
1624     uint32_t projColsUDAFIdx = 0;
1625     uint32_t udafcParamIdx = 0;
1626 
1627     // for count column of average function
1628     map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
1629 
1630     // associate the columns between projected RG and aggregate RG on UM
1631     // populated the aggregate columns
1632     //     the groupby columns are put in front, even not a returned column
1633     //     sum and count(column name) are omitted, if avg present
1634     {
1635         // project only unique oids, but they may be repeated in aggregation
1636         // collect the projected column info, prepare for aggregation
1637         map<uint32_t, int> projColPosMap;
1638 
1639         for (uint64_t i = 0; i < keysProj.size(); i++)
1640         {
1641             projColPosMap.insert(make_pair(keysProj[i], i));
1642             widthProj.push_back(projRG.getColumnWidth(i));
1643         }
1644 
1645         // column index for aggregate rowgroup
1646         uint64_t colAgg = 0;
1647 
1648         // for groupby column
1649         for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
1650         {
1651             uint32_t key = jobInfo.groupByColVec[i];
1652 
1653             if (projColPosMap.find(key) == projColPosMap.end())
1654             {
1655                 ostringstream emsg;
1656                 emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
1657                 cerr << "prep1PhaseDistinctAggregate: groupby " << emsg.str()
1658                      << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[key].fId
1659                      << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
1660 
1661                 if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
1662                     cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
1663 
1664                 cerr << endl;
1665                 throw logic_error(emsg.str());
1666             }
1667 
1668             uint64_t colProj = projColPosMap[key];
1669 
1670             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAgg));
1671             groupBy.push_back(groupby);
1672 
1673             // copy down to aggregation rowgroup
1674             oidsAgg.push_back(oidsProj[colProj]);
1675             keysAgg.push_back(key);
1676             scaleAgg.push_back(scaleProj[colProj]);
1677             precisionAgg.push_back(precisionProj[colProj]);
1678             typeAgg.push_back(typeProj[colProj]);
1679             csNumAgg.push_back(csNumProj[colProj]);
1680             widthAgg.push_back(widthProj[colProj]);
1681 
1682             aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg));
1683             colAgg++;
1684         }
1685 
1686         // for distinct column
1687         for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
1688         {
1689             uint32_t key = jobInfo.distinctColVec[i];
1690 
1691             if (projColPosMap.find(key) == projColPosMap.end())
1692             {
1693                 ostringstream emsg;
1694                 emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
1695                 cerr << "prep1PhaseDistinctAggregate: distinct " << emsg.str()
1696                      << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[key].fId
1697                      << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
1698 
1699                 if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
1700                     cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
1701 
1702                 cerr << endl;
1703                 throw logic_error(emsg.str());
1704             }
1705 
1706             // check for dup distinct column -- @bug6126
1707             if (find(keysAgg.begin(), keysAgg.end(), key) != keysAgg.end())
1708                 continue;
1709 
1710             uint64_t colProj = projColPosMap[key];
1711 
1712             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAgg));
1713             groupBy.push_back(groupby);
1714 
1715             // copy down to aggregation rowgroup
1716             oidsAgg.push_back(oidsProj[colProj]);
1717             keysAgg.push_back(key);
1718             scaleAgg.push_back(scaleProj[colProj]);
1719             precisionAgg.push_back(precisionProj[colProj]);
1720             typeAgg.push_back(typeProj[colProj]);
1721             csNumAgg.push_back(csNumProj[colProj]);
1722             widthAgg.push_back(widthProj[colProj]);
1723 
1724             aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg));
1725             colAgg++;
1726         }
1727 
1728         // vectors for aggregate functions
1729         for (uint64_t i = 0; i < aggColVec.size(); i++)
1730         {
1731             pUDAFFunc = NULL;
1732             uint32_t aggKey = aggColVec[i].first;
1733             RowAggFunctionType aggOp = functionIdMap(aggColVec[i].second);
1734             RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second);
1735 
1736             // skip if this is a constant
1737             if (aggOp == ROWAGG_CONSTANT)
1738                 continue;
1739 
            // group_concat: add a pointer-width column and its function here,
            // then skip the rest of this iteration
1741             if (aggOp == ROWAGG_GROUP_CONCAT)
1742             {
1743                 TupleInfo ti = getTupleInfo(aggKey, jobInfo);
1744                 uint32_t width = sizeof(GroupConcatAg*);
1745                 oidsAgg.push_back(ti.oid);
1746                 keysAgg.push_back(aggKey);
1747                 scaleAgg.push_back(ti.scale);
1748                 precisionAgg.push_back(ti.precision);
1749                 typeAgg.push_back(ti.dtype);
1750                 csNumAgg.push_back(ti.csNum);
1751                 widthAgg.push_back(width);
1752                 SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(
1753                                            aggOp, stats, colAgg, colAgg, -1));
1754                 functionVec1.push_back(funct);
1755                 aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg));
1756                 colAgg++;
1757 
1758                 continue;
1759             }
1760 
1761             if (projColPosMap.find(aggKey) == projColPosMap.end())
1762             {
1763                 ostringstream emsg;
1764                 emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple.";
1765                 cerr << "prep1PhaseDistinctAggregate: aggregate " << emsg.str()
1766                      << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[aggKey].fId
1767                      << ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable;
1768 
1769                 if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0)
1770                     cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView;
1771 
1772                 cerr << endl;
1773                 throw logic_error(emsg.str());
1774             }
1775 
1776             // skip sum / count(column) if avg is also selected
1777 //            if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) &&
1778 //                    (avgSet.find(aggKey) != avgSet.end()))
1779 //                continue;
1780 
1781             if (aggOp == ROWAGG_DISTINCT_SUM ||
1782                     aggOp == ROWAGG_DISTINCT_AVG ||
1783                     aggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
1784                 continue;
1785 
1786             uint64_t colProj = projColPosMap[aggKey];
1787 
1788             SP_ROWAGG_FUNC_t funct;
1789 
1790             if (aggOp == ROWAGG_UDAF)
1791             {
1792                 std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
1793 
1794                 for (; it != jobInfo.projectionCols.end(); it++)
1795                 {
1796                     udafc = dynamic_cast<UDAFColumn*>((*it).get());
1797                     projColsUDAFIdx++;
1798 
1799                     if (udafc)
1800                     {
1801                         pUDAFFunc =  udafc->getContext().getFunction();
1802                         // Save the multi-parm keys for dup-detection.
1803                         if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
1804                         {
1805                             for (uint64_t k = i+1;
1806                                  k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
1807                                  ++k)
1808                             {
1809                                 udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
1810                             }
1811                         }
1812                         // Create a RowAggFunctionCol (UDAF subtype) with the context.
1813                         funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
1814                         break;
1815                     }
1816                 }
1817                 if (it == jobInfo.projectionCols.end())
1818                 {
1819                     throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
1820                 }
1821             }
1822             else
1823             {
1824                 funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg));
1825             }
1826 
1827             // skip if this is a duplicate
1828             if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end())
1829                 continue;
1830 
1831             functionVec1.push_back(funct);
1832             aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg));
1833 
1834             switch (aggOp)
1835             {
1836                 case ROWAGG_MIN:
1837                 case ROWAGG_MAX:
1838                 {
1839                     oidsAgg.push_back(oidsProj[colProj]);
1840                     keysAgg.push_back(aggKey);
1841                     scaleAgg.push_back(scaleProj[colProj]);
1842                     precisionAgg.push_back(precisionProj[colProj]);
1843                     typeAgg.push_back(typeProj[colProj]);
1844                     csNumAgg.push_back(csNumProj[colProj]);
1845                     widthAgg.push_back(widthProj[colProj]);
1846                     colAgg++;
1847                 }
1848                 break;
1849 
1850                 case ROWAGG_SUM:
1851                 case ROWAGG_AVG:
1852                 {
1853                     if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
1854                             typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
1855                             typeProj[colProj] == CalpontSystemCatalog::BLOB ||
1856                             typeProj[colProj] == CalpontSystemCatalog::TEXT ||
1857                             typeProj[colProj] == CalpontSystemCatalog::DATE ||
1858                             typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
1859                             typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
1860                             typeProj[colProj] == CalpontSystemCatalog::TIME)
1861                     {
1862                         Message::Args args;
1863                         args.add("sum/average");
1864                         args.add(colTypeIdString(typeProj[colProj]));
1865                         string emsg = IDBErrorInfo::instance()->
1866                                       errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
1867                         cerr << "prep1PhaseDistinctAggregate: " << emsg << endl;
1868                         throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
1869                     }
1870 
1871                     oidsAgg.push_back(oidsProj[colProj]);
1872                     keysAgg.push_back(aggKey);
1873                     typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
1874                     csNumAgg.push_back(8);
1875                     precisionAgg.push_back(-1);
1876                     widthAgg.push_back(sizeof(long double));
1877                     scaleAgg.push_back(0);
1878                     colAgg++;
1879 
1880                 // has distinct step, put the count column for avg next to the sum
1881                 // let fall through to add a count column for average function
1882                 if (aggOp == ROWAGG_AVG)
1883                     funct->fAuxColumnIndex = colAgg;
1884                 else
1885                     break;
1886                 }
1887                 /* fall through */
1888 
1889                 case ROWAGG_COUNT_ASTERISK:
1890                 case ROWAGG_COUNT_COL_NAME:
1891                 {
1892                     oidsAgg.push_back(oidsProj[colProj]);
1893                     keysAgg.push_back(aggKey);
1894                     scaleAgg.push_back(0);
1895                     // work around count() in select subquery
1896                     precisionAgg.push_back(9999);
1897 
1898                     if (isUnsigned(typeProj[colProj]))
1899                     {
1900                         typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
1901                     }
1902                     else
1903                     {
1904                         typeAgg.push_back(CalpontSystemCatalog::BIGINT);
1905                     }
1906 
1907                     csNumAgg.push_back(8);
1908                     widthAgg.push_back(bigIntWidth);
1909                     colAgg++;
1910                 }
1911                 break;
1912 
1913                 case ROWAGG_STATS:
1914                 {
1915                     if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
1916                             typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
1917                             typeProj[colProj] == CalpontSystemCatalog::BLOB ||
1918                             typeProj[colProj] == CalpontSystemCatalog::TEXT ||
1919                             typeProj[colProj] == CalpontSystemCatalog::DATE ||
1920                             typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
1921                             typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
1922                             typeProj[colProj] == CalpontSystemCatalog::TIME)
1923                     {
1924                         Message::Args args;
1925                         args.add("variance/standard deviation");
1926                         args.add(colTypeIdString(typeProj[colProj]));
1927                         string emsg = IDBErrorInfo::instance()->
1928                                       errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
1929                         cerr << "prep1PhaseDistinctAggregate:: " << emsg << endl;
1930                         throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
1931                     }
1932 
1933                     // count(x)
1934                     oidsAgg.push_back(oidsProj[colProj]);
1935                     keysAgg.push_back(aggKey);
1936                     scaleAgg.push_back(scaleProj[colProj]);
1937                     precisionAgg.push_back(0);
1938                     typeAgg.push_back(CalpontSystemCatalog::DOUBLE);
1939                     csNumAgg.push_back(8);
1940                     widthAgg.push_back(sizeof(double));
1941                     funct->fAuxColumnIndex = ++colAgg;
1942 
1943                     // sum(x)
1944                     oidsAgg.push_back(oidsProj[colProj]);
1945                     keysAgg.push_back(aggKey);
1946                     scaleAgg.push_back(0);
1947                     precisionAgg.push_back(-1);
1948                     typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
1949                     csNumAgg.push_back(8);
1950                     widthAgg.push_back(sizeof(long double));
1951                     ++colAgg;
1952 
1953                     // sum(x**2)
1954                     oidsAgg.push_back(oidsProj[colProj]);
1955                     keysAgg.push_back(aggKey);
1956                     scaleAgg.push_back(0);
1957                     precisionAgg.push_back(-1);
1958                     typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
1959                     csNumAgg.push_back(8);
1960                     widthAgg.push_back(sizeof(long double));
1961                     ++colAgg;
1962                 }
1963                 break;
1964 
1965                 case ROWAGG_BIT_AND:
1966                 case ROWAGG_BIT_OR:
1967                 case ROWAGG_BIT_XOR:
1968                 {
1969                     oidsAgg.push_back(oidsProj[colProj]);
1970                     keysAgg.push_back(aggKey);
1971                     scaleAgg.push_back(0);
1972                     precisionAgg.push_back(-16);  // for connector to skip null check
1973 
1974                     if (isUnsigned(typeProj[colProj]))
1975                     {
1976                         typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
1977                     }
1978                     else
1979                     {
1980                         typeAgg.push_back(CalpontSystemCatalog::BIGINT);
1981                     }
1982 
1983                     csNumAgg.push_back(8);
1984                     widthAgg.push_back(bigIntWidth);
1985                     colAgg++;
1986                 }
1987                 break;
1988 
1989                 case ROWAGG_UDAF:
1990                 {
1991                     RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
1992 
1993                     if (!udafFuncCol)
1994                     {
1995                         throw logic_error("(2)prep1PhaseDistinctAggregate A UDAF function is called but there's no RowUDAFFunctionCol");
1996                     }
1997 
1998                     // Return column
1999                     oidsAgg.push_back(oidsProj[colProj]);
2000                     keysAgg.push_back(aggKey);
2001                     scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale());
2002                     precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
2003                     typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
2004                     csNumAgg.push_back(udafFuncCol->fUDAFContext.getCharsetNumber());
2005                     widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
2006                     ++colAgg;
2007                     // Column for index of UDAF UserData struct
2008                     oidsAgg.push_back(oidsProj[colProj]);
2009                     keysAgg.push_back(aggKey);
2010                     scaleAgg.push_back(0);
2011                     precisionAgg.push_back(0);
2012                     typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
2013                     csNumAgg.push_back(8);
2014                     widthAgg.push_back(sizeof(uint64_t));
2015                     funct->fAuxColumnIndex = colAgg++;
2016                     // If the first param is const
2017                     udafcParamIdx = 0;
2018                     ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
2019                     if (cc)
2020                     {
2021                         funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
2022                     }
2023                     ++udafcParamIdx;
2024                     break;
2025                 }
2026 
2027                 case ROWAGG_MULTI_PARM:
2028                 {
2029                     oidsAgg.push_back(oidsProj[colProj]);
2030                     keysAgg.push_back(aggKey);
2031                     scaleAgg.push_back(scaleProj[colProj]);
2032                     precisionAgg.push_back(precisionProj[colProj]);
2033                     typeAgg.push_back(typeProj[colProj]);
2034                     csNumAgg.push_back(csNumProj[colProj]);
2035                     widthAgg.push_back(widthProj[colProj]);
2036                     multiParmIndexes.push_back(colAgg);
2037                     ++colAgg;
2038                     // If the param is const
2039                     if (udafc)
2040                     {
2041                         if (udafcParamIdx > udafc->aggParms().size() - 1)
2042                         {
2043                             throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with too many parms", aggregateFuncErr);
2044                         }
2045                         ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
2046                         if (cc)
2047                         {
2048                             funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
2049                         }
2050                     }
2051                     else
2052                     {
2053                         throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with no parms", aggregateFuncErr);
2054                     }
2055                     ++udafcParamIdx;
2056                 }
2057                 break;
2058 
2059                 default:
2060                 {
2061                     ostringstream emsg;
2062                     emsg << "aggregate function (" << (uint64_t) aggOp << ") isn't supported";
2063                     cerr << "prep1PhaseDistinctAggregate: " << emsg.str() << endl;
2064                     throw QueryDataExcept(emsg.str(), aggregateFuncErr);
2065                 }
2066             }
2067         }
2068     }
2069 
    // populate functionNoDistVec
2071     {
2072 //		for (uint32_t idx = 0; idx < functionVec1.size(); idx++)
2073 //		{
2074 //			SP_ROWAGG_FUNC_t func1 = functionVec1[idx];
2075 //			SP_ROWAGG_FUNC_t funct(
2076 //					new RowAggFunctionCol(func1->fAggFunction,
2077 //					func1->fStatsFunction,
2078 //					func1->fOutputColumnIndex,
2079 //					func1->fOutputColumnIndex,
2080 //					func1->fAuxColumnIndex));
2081 //			functionNoDistVec.push_back(funct);
2082 //		}
2083         functionNoDistVec = functionVec1;
2084     }
2085 
2086     // associate the columns between the non-distinct aggregator and distinct aggregator
2087     // populated the returned columns
2088     //     remove not returned groupby column
2089     //     add back sum or count(column name) if omitted due to avg column
2090     //     put count(column name) column to the end, if it is for avg only
2091     {
        // check if the count column for AVG is also a returned column;
        // if so, replace the "-1" with its actual position in the returned vec.
2094         AGG_MAP aggDupFuncMap;
2095         projColsUDAFIdx = 0;
2096         int64_t multiParms = 0;
2097 
2098         // copy over the groupby vector
2099         // update the outputColumnIndex if returned
2100         for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
2101         {
2102             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1));
2103             groupByNoDist.push_back(groupby);
2104             aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), i));
2105         }
2106 
2107         // locate the return column position in aggregated rowgroup
2108         uint64_t outIdx = 0;
2109         for (uint64_t i = 0; i < returnedColVec.size(); i++)
2110         {
2111             udafc = NULL;
2112             pUDAFFunc = NULL;
2113             uint32_t retKey = returnedColVec[i].first;
2114             RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
2115             RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
2116             int colAgg = -1;
2117 
2118             if (aggOp == ROWAGG_MULTI_PARM)
2119             {
2120                 // Skip on final agg.: Extra parms for an aggregate have no work there.
2121                 ++multiParms;
2122                 continue;
2123             }
2124 
2125             if  (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
2126                     jobInfo.distinctColVec.end() )
2127             {
2128                 AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
2129 
2130                 if (it != aggFuncMap.end())
2131                 {
2132                     colAgg = it->second;
2133                 }
2134                 else
2135                 {
2136                     ostringstream emsg;
2137                     emsg << "'" << jobInfo.keyInfo->tupleKeyToName[retKey] << "' isn't in tuple.";
2138                     cerr << "prep1PhaseDistinctAggregate: distinct " << emsg.str()
2139                          << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId
2140                          << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable;
2141 
2142                     if (jobInfo.keyInfo->tupleKeyVec[retKey].fView.length() > 0)
2143                         cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView;
2144 
2145                     cerr << endl;
2146                     throw QueryDataExcept(emsg.str(), aggregateFuncErr);
2147                 }
2148             }
2149 
2150             if (aggOp == ROWAGG_UDAF)
2151             {
2152                 std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
2153                 for (; it != jobInfo.projectionCols.end(); it++)
2154                 {
2155                     udafc = dynamic_cast<UDAFColumn*>((*it).get());
2156                     projColsUDAFIdx++;
2157                     if (udafc)
2158                     {
2159                         pUDAFFunc =  udafc->getContext().getFunction();
2160                         // Save the multi-parm keys for dup-detection.
2161                         if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
2162                         {
2163                             for (uint64_t k = i+1;
2164                                  k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
2165                                  ++k)
2166                             {
2167                                 udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
2168                             }
2169                         }
2170                         break;
2171                     }
2172                 }
2173                 if (it == jobInfo.projectionCols.end())
2174                 {
2175                     throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
2176                 }
2177             }
2178 
2179             switch (aggOp)
2180             {
2181                 case ROWAGG_DISTINCT_AVG:
2182                 case ROWAGG_DISTINCT_SUM:
2183                 {
2184                     if (typeAgg[colAgg] == CalpontSystemCatalog::CHAR ||
2185                             typeAgg[colAgg] == CalpontSystemCatalog::VARCHAR ||
2186                             typeAgg[colAgg] == CalpontSystemCatalog::BLOB ||
2187                             typeAgg[colAgg] == CalpontSystemCatalog::TEXT ||
2188                             typeAgg[colAgg] == CalpontSystemCatalog::DATE ||
2189                             typeAgg[colAgg] == CalpontSystemCatalog::DATETIME ||
2190                             typeAgg[colAgg] == CalpontSystemCatalog::TIMESTAMP ||
2191                             typeAgg[colAgg] == CalpontSystemCatalog::TIME)
2192                     {
2193                         Message::Args args;
2194                         args.add("sum/average");
2195                         args.add(colTypeIdString(typeAgg[colAgg]));
2196                         string emsg = IDBErrorInfo::instance()->
2197                                       errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
2198                         cerr << "prep1PhaseDistinctAggregate: " << emsg << endl;
2199                         throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
2200                     }
2201 
2202                     oidsAggDist.push_back(oidsAgg[colAgg]);
2203                     keysAggDist.push_back(retKey);
2204                     typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
2205                     csNumAggDist.push_back(8);
2206                     precisionAggDist.push_back(-1);
2207                     widthAggDist.push_back(sizeof(long double));
2208                     scaleAggDist.push_back(0);
2209                 }
2210                 break;
2211 
2212                 case ROWAGG_COUNT_DISTINCT_COL_NAME:
2213                 {
2214                     oidsAggDist.push_back(oidsAgg[colAgg]);
2215                     keysAggDist.push_back(retKey);
2216                     scaleAggDist.push_back(0);
2217                     // work around count() in select subquery
2218                     precisionAggDist.push_back(9999);
2219                     typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
2220                     csNumAggDist.push_back(8);
2221                     widthAggDist.push_back(bigIntWidth);
2222                 }
2223                 break;
2224 
2225                 case ROWAGG_MIN:
2226                 case ROWAGG_MAX:
2227                 case ROWAGG_SUM:
2228                 case ROWAGG_AVG:
2229                 case ROWAGG_COUNT_ASTERISK:
2230                 case ROWAGG_COUNT_COL_NAME:
2231                 case ROWAGG_STATS:
2232                 case ROWAGG_BIT_AND:
2233                 case ROWAGG_BIT_OR:
2234                 case ROWAGG_BIT_XOR:
2235                 default:
2236                 {
2237                     AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
2238 
2239                     if (it != aggFuncMap.end())
2240                     {
2241                         colAgg = it->second;
2242                         oidsAggDist.push_back(oidsAgg[colAgg]);
2243                         keysAggDist.push_back(keysAgg[colAgg]);
2244                         scaleAggDist.push_back(scaleAgg[colAgg]);
2245                         precisionAggDist.push_back(precisionAgg[colAgg]);
2246                         typeAggDist.push_back(typeAgg[colAgg]);
2247                         csNumAggDist.push_back(csNumAgg[colAgg]);
2248                         uint32_t width = widthAgg[colAgg];
2249 
2250                         if (aggOp == ROWAGG_GROUP_CONCAT)
2251                         {
2252                             TupleInfo ti = getTupleInfo(retKey, jobInfo);
2253 
2254                             if (ti.width > width)
2255                                 width = ti.width;
2256                         }
2257 
2258                         widthAggDist.push_back(width);
2259                     }
2260 
2261                     // not a direct hit -- a returned column is not already in the RG from PMs
2262                     else
2263                     {
2264                         bool returnColMissing = true;
2265 
2266                         // check if a SUM or COUNT covered by AVG
2267                         if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
2268                         {
2269                             it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
2270 
2271                             if (it != aggFuncMap.end())
2272                             {
2273                                 // false alarm
2274                                 returnColMissing = false;
2275 
2276                                 colAgg = it->second;
2277 
2278                                 if (aggOp == ROWAGG_SUM)
2279                                 {
2280                                     oidsAggDist.push_back(oidsAgg[colAgg]);
2281                                     keysAggDist.push_back(retKey);
2282                                     scaleAggDist.push_back(0);
2283                                     typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
2284                                     csNumAggDist.push_back(8);
2285                                     precisionAggDist.push_back(-1);
2286                                     widthAggDist.push_back(sizeof(long double));
2287                                 }
2288                                 else
2289                                 {
2290                                     // leave the count() to avg
2291                                     aggOp = ROWAGG_COUNT_NO_OP;
2292 
2293                                     oidsAggDist.push_back(oidsAgg[colAgg]);
2294                                     keysAggDist.push_back(retKey);
2295                                     scaleAggDist.push_back(0);
2296 
2297                                     if (isUnsigned(typeAgg[colAgg]))
2298                                     {
2299                                         typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
2300                                         precisionAggDist.push_back(20);
2301                                     }
2302                                     else
2303                                     {
2304                                         typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
2305                                         precisionAggDist.push_back(19);
2306                                     }
2307                                     csNumAggDist.push_back(8);
2308                                     widthAggDist.push_back(bigIntWidth);
2309                                 }
2310                             }
2311                         }
2312                         else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(),
2313                                       retKey) != jobInfo.expressionVec.end())
2314                         {
2315                             // a function on aggregation
2316                             TupleInfo ti = getTupleInfo(retKey, jobInfo);
2317                             oidsAggDist.push_back(ti.oid);
2318                             keysAggDist.push_back(retKey);
2319                             scaleAggDist.push_back(ti.scale);
2320                             precisionAggDist.push_back(ti.precision);
2321                             typeAggDist.push_back(ti.dtype);
2322                             csNumAggDist.push_back(ti.csNum);
2323                             widthAggDist.push_back(ti.width);
2324 
2325                             returnColMissing = false;
2326                         }
2327                         else if (aggOp == ROWAGG_CONSTANT)
2328                         {
2329                             TupleInfo ti = getTupleInfo(retKey, jobInfo);
2330                             oidsAggDist.push_back(ti.oid);
2331                             keysAggDist.push_back(retKey);
2332                             scaleAggDist.push_back(ti.scale);
2333                             precisionAggDist.push_back(ti.precision);
2334                             typeAggDist.push_back(ti.dtype);
2335                             csNumAggDist.push_back(ti.csNum);
2336                             widthAggDist.push_back(ti.width);
2337 
2338                             returnColMissing = false;
2339                         }
2340 
2341 #if 0
2342                         else if (aggOp == ROWAGG_GROUP_CONCAT)
2343                         {
2344                             TupleInfo ti = getTupleInfo(retKey, jobInfo);
2345                             oidsAggDist.push_back(ti.oid);
2346                             keysAggDist.push_back(retKey);
2347                             scaleAggDist.push_back(ti.scale);
2348                             precisionAggDist.push_back(ti.precision);
2349                             typeAggDist.push_back(ti.dtype);
2350                             csNumAggDist.push_back(ti.csNum);
2351                             widthAggDist.push_back(ti.width);
2352 
2353                             returnColMissing = false;
2354                         }
2355 
2356 #endif
2357                         else if (jobInfo.groupConcatInfo.columns().find(retKey) !=
2358                                  jobInfo.groupConcatInfo.columns().end())
2359                         {
2360                             // TODO: columns used only by group_concat are not needed in the result set.
2361                             for (uint64_t k = 0; k < keysProj.size(); k++)
2362                             {
2363                                 if (retKey == keysProj[k])
2364                                 {
2365                                     oidsAggDist.push_back(oidsProj[k]);
2366                                     keysAggDist.push_back(retKey);
2367                                     scaleAggDist.push_back(scaleProj[k] >> 8);
2368                                     precisionAggDist.push_back(precisionProj[k]);
2369                                     typeAggDist.push_back(typeProj[k]);
2370                                     csNumAggDist.push_back(csNumProj[k]);
2371                                     widthAggDist.push_back(widthProj[k]);
2372 
2373                                     returnColMissing = false;
2374                                     break;
2375                                 }
2376                             }
2377                         }
2378 
2379                         else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
2380                         {
2381                             // skip window columns/expression, which are computed later
2382                             for (uint64_t k = 0; k < keysProj.size(); k++)
2383                             {
2384                                 if (retKey == keysProj[k])
2385                                 {
2386                                     oidsAggDist.push_back(oidsProj[k]);
2387                                     keysAggDist.push_back(retKey);
2388                                     scaleAggDist.push_back(scaleProj[k] >> 8);
2389                                     precisionAggDist.push_back(precisionProj[k]);
2390                                     typeAggDist.push_back(typeProj[k]);
2391                                     csNumAggDist.push_back(csNumProj[k]);
2392                                     widthAggDist.push_back(widthProj[k]);
2393 
2394                                     returnColMissing = false;
2395                                     break;
2396                                 }
2397                             }
2398                         }
2399 
2400                         if (returnColMissing)
2401                         {
2402                             Message::Args args;
2403                             args.add(keyName(outIdx, retKey, jobInfo));
2404                             string emsg = IDBErrorInfo::instance()->
2405                                           errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
2406                             cerr << "prep1PhaseDistinctAggregate: " << emsg << " oid="
2407                                  << (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias="
2408                                  << jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view="
2409                                  << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function="
2410                                  << (int) aggOp << endl;
2411                             throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
2412                         }
2413                     } //else
2414                 } // switch
2415             }
2416 
2417             // update groupby vector if the groupby column is a returned column
2418             if (returnedColVec[i].second == 0)
2419             {
2420                 int dupGroupbyIndex = -1;
2421 
2422                 for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++)
2423                 {
2424                     if (jobInfo.groupByColVec[j] == retKey)
2425                     {
2426                         if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t) - 1)
2427                             groupByNoDist[j]->fOutputColumnIndex = outIdx;
2428                         else
2429                             dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex;
2430                     }
2431                 }
2432 
2433                 // a duplicate group by column
2434                 if (dupGroupbyIndex != -1)
2435                     functionVec2.push_back(SP_ROWAGG_FUNC_t(
2436                                                new RowAggFunctionCol(
2437                                                    ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
2438             }
2439             else
2440             {
2441                 // update the aggregate function vector
2442                 SP_ROWAGG_FUNC_t funct;
2443                 if (aggOp == ROWAGG_UDAF)
2444                 {
2445                     funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, outIdx));
2446                 }
2447                 else
2448                 {
2449                     funct.reset(new RowAggFunctionCol(aggOp, stats, colAgg, outIdx));
2450                 }
2451 
2452                 if (aggOp == ROWAGG_COUNT_NO_OP)
2453                     funct->fAuxColumnIndex = colAgg;
2454                 else if (aggOp == ROWAGG_CONSTANT)
2455                     funct->fAuxColumnIndex = jobInfo.cntStarPos;
2456 
2457                 functionVec2.push_back(funct);
2458 
2459                 // find if this func is a duplicate
2460                 AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
2461 
2462                 if (iter != aggDupFuncMap.end())
2463                 {
2464                     if (funct->fAggFunction == ROWAGG_AVG)
2465                         funct->fAggFunction = ROWAGG_DUP_AVG;
2466                     else if (funct->fAggFunction == ROWAGG_STATS)
2467                         funct->fAggFunction = ROWAGG_DUP_STATS;
2468                     else if (funct->fAggFunction == ROWAGG_UDAF)
2469                         funct->fAggFunction = ROWAGG_DUP_UDAF;
2470                     else
2471                         funct->fAggFunction = ROWAGG_DUP_FUNCT;
2472 
2473                     funct->fAuxColumnIndex = iter->second;
2474                 }
2475                 else
2476                 {
2477                     aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
2478                                                    funct->fOutputColumnIndex));
2479                 }
2480 
2481                 if (returnedColVec[i].second == AggregateColumn::AVG)
2482                     avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
2483                 else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
2484                     avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct));
2485             }
2486             ++outIdx;
2487         } // for (i
2488 
2489         // now fix the AVG function, locate the count(column) position
2490         for (uint64_t i = 0; i < functionVec2.size(); i++)
2491         {
2492             if (functionVec2[i]->fAggFunction == ROWAGG_COUNT_NO_OP)
2493             {
2494                 // if the count(k) can be associated with an avg(k)
2495                 map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
2496                     avgFuncMap.find(keysAggDist[functionVec2[i]->fOutputColumnIndex]);
2497 
2498                 if (k != avgFuncMap.end())
2499                     k->second->fAuxColumnIndex = functionVec2[i]->fOutputColumnIndex;
2500             }
2501         }
2502 
2503         // there is avg(k), but no count(k) in the select list
2504         uint64_t lastCol = outIdx;
2505 
2506         for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
2507         {
2508             if (k->second->fAuxColumnIndex == (uint32_t) - 1)
2509             {
2510                 k->second->fAuxColumnIndex = lastCol++;
2511                 oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
2512                 keysAggDist.push_back(k->first);
2513                 scaleAggDist.push_back(0);
2514                 precisionAggDist.push_back(19);
2515                 typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
2516                 csNumAggDist.push_back(8);
2517                 widthAggDist.push_back(bigIntWidth);
2518             }
2519         }
2520 
2521         // now fix the AVG distinct function, locate the count(distinct column) position
2522         for (uint64_t i = 0; i < functionVec2.size(); i++)
2523         {
2524             if (functionVec2[i]->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME)
2525             {
2526                 // if the count(distinct k) can be associated with an avg(distinct k)
2527                 map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
2528                     avgDistFuncMap.find(keysAggDist[functionVec2[i]->fOutputColumnIndex]);
2529 
2530                 if (k != avgDistFuncMap.end())
2531                 {
2532                     k->second->fAuxColumnIndex = functionVec2[i]->fOutputColumnIndex;
2533                     functionVec2[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
2534                 }
2535             }
2536         }
2537 
2538         // there is avg(distinct k), but no count(distinct k) in the select list
2539         for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.begin(); k != avgDistFuncMap.end(); k++)
2540         {
2541             if (k->second->fAuxColumnIndex == (uint32_t) - 1)
2542             {
2543                 k->second->fAuxColumnIndex = lastCol++;
2544                 oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
2545                 keysAggDist.push_back(k->first);
2546                 scaleAggDist.push_back(0);
2547                 precisionAggDist.push_back(19);
2548                 typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
2549                 csNumAggDist.push_back(8);
2550                 widthAggDist.push_back(bigIntWidth);
2551             }
2552         }
2553 
2554         // add auxiliary fields for UDAF and statistics functions
2555         for (uint64_t i = 0; i < functionVec2.size(); i++)
2556         {
2557             uint64_t j = functionVec2[i]->fInputColumnIndex;
2558 
2559             if (functionVec2[i]->fAggFunction == ROWAGG_UDAF)
2560             {
2561                 // Column for index of UDAF UserData struct
2562                 RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec2[i].get());
2563 
2564                 if (!udafFuncCol)
2565                 {
2566                     throw logic_error("(4)prep1PhaseDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
2567                 }
2568 
2569                 functionVec2[i]->fAuxColumnIndex = lastCol++;
2570                 oidsAggDist.push_back(oidsAgg[j]); // Dummy?
2571                 keysAggDist.push_back(keysAgg[j]); // Dummy?
2572                 scaleAggDist.push_back(0);
2573                 precisionAggDist.push_back(0);
2574                 typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
2575                 csNumAggDist.push_back(8);
2576                 widthAggDist.push_back(sizeof(uint64_t));
2577                 continue;
2578             }
2579 
2580             if (functionVec2[i]->fAggFunction != ROWAGG_STATS)
2581                 continue;
2582 
2583             functionVec2[i]->fAuxColumnIndex = lastCol;
2584 
2585             // sum(x)
2586             oidsAggDist.push_back(oidsAgg[j]);
2587             keysAggDist.push_back(keysAgg[j]);
2588             scaleAggDist.push_back(0);
2589             precisionAggDist.push_back(0);
2590             typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
2591             csNumAggDist.push_back(8);
2592             widthAggDist.push_back(sizeof(long double));
2593             ++lastCol;
2594 
2595             // sum(x**2)
2596             oidsAggDist.push_back(oidsAgg[j]);
2597             keysAggDist.push_back(keysAgg[j]);
2598             scaleAggDist.push_back(0);
2599             precisionAggDist.push_back(-1);
2600             typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
2601             csNumAggDist.push_back(8);
2602             widthAggDist.push_back(sizeof(long double));
2603             ++lastCol;
2604         }
2605     }
2606 
2607     // calculate the offset and create the rowaggregation, rowgroup
2608     posAgg.push_back(2);
2609 
2610     for (uint64_t i = 0; i < oidsAgg.size(); i++)
2611         posAgg.push_back(posAgg[i] + widthAgg[i]);
2612 
2613     RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
2614                    jobInfo.stringTableThreshold);
2615     SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, jobInfo.rm, jobInfo.umMemLimit));
2616     rowAgg->timeZone(jobInfo.timeZone);
2617 
2618     posAggDist.push_back(2);   // rid
2619 
2620     for (uint64_t i = 0; i < oidsAggDist.size(); i++)
2621         posAggDist.push_back(posAggDist[i] + widthAggDist[i]);
2622 
2623     RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist,
2624                        csNumAggDist, scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold);
2625     SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit));
2626     rowAggDist->timeZone(jobInfo.timeZone);
2627 
2628     // mapping the group_concat columns, if any.
2629     if (jobInfo.groupConcatInfo.groupConcat().size() > 0)
2630     {
2631         jobInfo.groupConcatInfo.mapColumns(projRG);
2632         rowAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
2633         rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat());
2634     }
2635 
2636     // if distinct key word applied to more than one aggregate column, reset rowAggDist
2637     vector<RowGroup> subRgVec;
2638 
2639     if (jobInfo.distinctColVec.size() > 1)
2640     {
2641         RowAggregationMultiDistinct* multiDistinctAggregator =
2642             new RowAggregationMultiDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit);
2643         multiDistinctAggregator->timeZone(jobInfo.timeZone);
2644         rowAggDist.reset(multiDistinctAggregator);
2645         rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat());
2646 
2647         // construct and add sub-aggregators to rowAggDist
2648         vector<uint32_t> posAggGb, posAggSub;
2649         vector<uint32_t> oidsAggGb, oidsAggSub;
2650         vector<uint32_t> keysAggGb, keysAggSub;
2651         vector<uint32_t> scaleAggGb, scaleAggSub;
2652         vector<uint32_t> precisionAggGb, precisionAggSub;
2653         vector<CalpontSystemCatalog::ColDataType> typeAggGb, typeAggSub;
2654         vector<uint32_t> csNumAggGb, csNumAggSub;
2655         vector<uint32_t> widthAggGb, widthAggSub;
2656 
2657         // populate groupby column info
2658         for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
2659         {
2660             oidsAggGb.push_back(oidsProj[i]);
2661             keysAggGb.push_back(keysProj[i]);
2662             scaleAggGb.push_back(scaleProj[i]);
2663             precisionAggGb.push_back(precisionProj[i]);
2664             typeAggGb.push_back(typeProj[i]);
2665             csNumAggGb.push_back(csNumProj[i]);
2666             widthAggGb.push_back(widthProj[i]);
2667         }
2668 
2669         // for distinct, each column requires a separate rowgroup
2670         vector<SP_ROWAGG_DIST> rowAggSubDistVec;
2671 
2672         for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
2673         {
2674             uint32_t distinctColKey = jobInfo.distinctColVec[i];
2675             uint64_t j = -1;
2676 
2677             // locate the distinct key in the row group
2678             for (uint64_t k = 0; k < keysAgg.size(); k++)
2679             {
2680                 if (keysProj[k] == distinctColKey)
2681                 {
2682                     j = k;
2683                     break;
2684                 }
2685             }
2686 
2687             idbassert(j != (uint64_t) - 1);
2688 
2689             oidsAggSub = oidsAggGb;
2690             keysAggSub = keysAggGb;
2691             scaleAggSub = scaleAggGb;
2692             precisionAggSub = precisionAggGb;
2693             typeAggSub = typeAggGb;
2694             csNumAggSub = csNumAggGb;
2695             widthAggSub = widthAggGb;
2696 
2697             oidsAggSub.push_back(oidsProj[j]);
2698             keysAggSub.push_back(keysProj[j]);
2699             scaleAggSub.push_back(scaleProj[j]);
2700             precisionAggSub.push_back(precisionProj[j]);
2701             typeAggSub.push_back(typeProj[j]);
2702             csNumAggSub.push_back(csNumProj[j]);
2703             widthAggSub.push_back(widthProj[j]);
2704 
2705             // construct sub-rowgroup
2706             posAggSub.clear();
2707             posAggSub.push_back(2);   // rid
2708 
2709             for (uint64_t k = 0; k < oidsAggSub.size(); k++)
2710                 posAggSub.push_back(posAggSub[k] + widthAggSub[k]);
2711 
2712             RowGroup subRg(oidsAggSub.size(), posAggSub, oidsAggSub, keysAggSub, typeAggSub,
2713                            csNumAggSub, scaleAggSub, precisionAggSub, jobInfo.stringTableThreshold);
2714             subRgVec.push_back(subRg);
2715 
2716             // construct groupby vector
2717             vector<SP_ROWAGG_GRPBY_t> groupBySub;
2718             uint64_t k = 0;
2719 
2720             while (k < jobInfo.groupByColVec.size())
2721             {
2722                 SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(k, k));
2723                 groupBySub.push_back(groupby);
2724                 k++;
2725             }
2726 
2727             // add the distinct column as groupby
2728             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
2729             groupBySub.push_back(groupby);
2730 
2731             // Keep a count of the parms after the first for any aggregate.
2732             // These will be skipped and the count needs to be subtracted
2733             // from where the aux column will be.
2734             int64_t multiParms = 0;
2735 
2736             // tricky part : 2 function vectors
2737             //   -- dummy function vector for sub-aggregator, which does distinct only
2738             //   -- aggregate function on this distinct column for rowAggDist
2739             vector<SP_ROWAGG_FUNC_t> functionSub1, functionSub2;
2740 
2741             for (uint64_t k = 0; k < returnedColVec.size(); k++)
2742             {
2743                 if (functionIdMap(returnedColVec[i].second) == ROWAGG_MULTI_PARM)
2744                 {
2745                     ++multiParms;
2746                     continue;
2747                 }
2748                 if (returnedColVec[k].first != distinctColKey)
2749                     continue;
2750 
2751                 // search the function in functionVec
2752                 vector<SP_ROWAGG_FUNC_t>::iterator it = functionVec2.begin();
2753 
2754                 while (it != functionVec2.end())
2755                 {
2756                     SP_ROWAGG_FUNC_t f = *it++;
2757 
2758                     if ((f->fOutputColumnIndex == k) &&
2759                             (f->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME ||
2760                              f->fAggFunction == ROWAGG_DISTINCT_SUM ||
2761                              f->fAggFunction == ROWAGG_DISTINCT_AVG))
2762                     {
2763                         SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(
2764                                                    f->fAggFunction,
2765                                                    f->fStatsFunction,
2766                                                    groupBySub.size() - 1,
2767                                                    f->fOutputColumnIndex,
2768                                                    f->fAuxColumnIndex-multiParms));
2769                         functionSub2.push_back(funct);
2770                     }
2771                 }
2772             }
2773 
2774             // construct sub-aggregator
2775             SP_ROWAGG_UM_t subAgg(
2776                 new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
2777             subAgg->timeZone(jobInfo.timeZone);
2778             subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
2779 
2780             // add to rowAggDist
2781             multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2);
2782         }
2783 
2784         // cover any non-distinct column functions
2785         {
2786             vector<SP_ROWAGG_FUNC_t> functionSub1 = functionNoDistVec;
2787             vector<SP_ROWAGG_FUNC_t> functionSub2;
2788             int64_t multiParms = 0;
2789 
2790             for (uint64_t k = 0; k < returnedColVec.size(); k++)
2791             {
2792                 if (functionIdMap(returnedColVec[k].second) == ROWAGG_MULTI_PARM)
2793                 {
2794                     ++multiParms;
2795                     continue;
2796                 }
2797                 // search non-distinct functions in functionVec
2798                 vector<SP_ROWAGG_FUNC_t>::iterator it = functionVec2.begin();
2799 
2800                 while (it != functionVec2.end())
2801                 {
2802                     SP_ROWAGG_FUNC_t funct;
2803                     SP_ROWAGG_FUNC_t f = *it++;
2804 
2805                     if (f->fAggFunction == ROWAGG_UDAF)
2806                     {
2807                         RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(f.get());
2808                         funct.reset(new RowUDAFFunctionCol(
2809                                         udafFuncCol->fUDAFContext,
2810                                         udafFuncCol->fInputColumnIndex,
2811                                         udafFuncCol->fOutputColumnIndex,
2812                                         udafFuncCol->fAuxColumnIndex-multiParms));
2813                         functionSub2.push_back(funct);
2814                     }
2815                     else if ((f->fOutputColumnIndex == k) &&
2816                              (f->fAggFunction == ROWAGG_COUNT_ASTERISK ||
2817                               f->fAggFunction == ROWAGG_COUNT_COL_NAME ||
2818                               f->fAggFunction == ROWAGG_SUM ||
2819                               f->fAggFunction == ROWAGG_AVG ||
2820                               f->fAggFunction == ROWAGG_MIN ||
2821                               f->fAggFunction == ROWAGG_MAX ||
2822                               f->fAggFunction == ROWAGG_STATS   ||
2823                               f->fAggFunction == ROWAGG_BIT_AND ||
2824                               f->fAggFunction == ROWAGG_BIT_OR  ||
2825                               f->fAggFunction == ROWAGG_BIT_XOR ||
2826                               f->fAggFunction == ROWAGG_CONSTANT ||
2827                               f->fAggFunction == ROWAGG_GROUP_CONCAT))
2828                     {
2829                         funct.reset(new RowAggFunctionCol(
2830                                         f->fAggFunction,
2831                                         f->fStatsFunction,
2832                                         f->fInputColumnIndex,
2833                                         f->fOutputColumnIndex,
2834                                         f->fAuxColumnIndex-multiParms));
2835                         functionSub2.push_back(funct);
2836                     }
2837                 }
2838             }
2839 
2840 
2841             if (functionSub1.size() > 0)
2842             {
2843                 // make sure the group by columns are available for next aggregate phase.
2844                 vector<SP_ROWAGG_GRPBY_t> groupBySubNoDist;
2845 
2846                 for (uint64_t i = 0; i < groupByNoDist.size(); i++)
2847                     groupBySubNoDist.push_back(SP_ROWAGG_GRPBY_t(
2848                                                    new RowAggGroupByCol(groupByNoDist[i]->fInputColumnIndex, i)));
2849 
2850                 // construct sub-aggregator
2851                 SP_ROWAGG_UM_t subAgg(
2852                     new RowAggregationUM(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
2853                 subAgg->timeZone(jobInfo.timeZone);
2854                 subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
2855 
2856                 // add to rowAggDist
2857                 multiDistinctAggregator->addSubAggregator(subAgg, aggRG, functionSub2);
2858                 subRgVec.push_back(aggRG);
2859             }
2860         }
2861     }
2862 
2863     rowAggDist->addAggregator(rowAgg, aggRG);
2864     rowgroups.push_back(aggRgDist);
2865     aggregators.push_back(rowAggDist);
2866 
2867     if (jobInfo.trace)
2868     {
2869         cout << "projected  RG: " << projRG.toString() << endl
2870              << "aggregated RG: " << aggRG.toString() << endl;
2871 
2872         for (uint64_t i = 0; i < subRgVec.size(); i++)
2873             cout << "aggregatedSub RG: " << i << " " << subRgVec[i].toString() << endl;
2874 
2875         cout << "aggregatedDist RG: " << aggRgDist.toString() << endl;
2876     }
2877 }
2878 
2879 
prep2PhasesAggregate(JobInfo & jobInfo,vector<RowGroup> & rowgroups,vector<SP_ROWAGG_t> & aggregators)2880 void TupleAggregateStep::prep2PhasesAggregate(
2881     JobInfo& jobInfo, vector<RowGroup>& rowgroups, vector<SP_ROWAGG_t>& aggregators)
2882 {
2883     // check if there are any aggregate columns
2884     // a vector that has the aggregate function to be done by PM
2885     vector<pair<uint32_t, int> > aggColVec;
2886     set<uint32_t> avgSet;
2887     vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
2888     // For UDAF
2889     uint32_t projColsUDAFIdx = 0;
2890     uint32_t udafcParamIdx = 0;
2891     UDAFColumn* udafc = NULL;
2892     mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
2893 
2894     for (uint64_t i = 0; i < returnedColVec.size(); i++)
2895     {
2896         // skip if not an aggregation column
2897         if (returnedColVec[i].second == 0)
2898             continue;
2899 
2900         aggColVec.push_back(returnedColVec[i]);
2901 
2902         // remember if a column has an average function,
2903         // with avg function, no need for separate sum or count_column_name
2904         if (returnedColVec[i].second == AggregateColumn::AVG)
2905             avgSet.insert(returnedColVec[i].first);
2906     }
2907 
2908     // populate the aggregate rowgroup on PM and UM
2909     // PM: projectedRG   -> aggregateRGPM
2910     // UM: aggregateRGPM -> aggregateRGUM
2911     //
2912     // Aggregate preparation by joblist factory:
2913     // 1. get projected rowgroup (done by doAggProject) -- input to PM AGG
2914     // 2. construct aggregate rowgroup  -- output of PM, input of UM
2915     // 3. construct aggregate rowgroup  -- output of UM
2916     const RowGroup projRG = rowgroups[0];
2917     const vector<uint32_t>& oidsProj = projRG.getOIDs();
2918     const vector<uint32_t>& keysProj = projRG.getKeys();
2919     const vector<uint32_t>& scaleProj = projRG.getScale();
2920     const vector<uint32_t>& precisionProj = projRG.getPrecision();
2921     const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
2922     const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();
2923 
2924     vector<uint32_t> posAggPm, posAggUm;
2925     vector<uint32_t> oidsAggPm, oidsAggUm;
2926     vector<uint32_t> keysAggPm, keysAggUm;
2927     vector<uint32_t> scaleAggPm, scaleAggUm;
2928     vector<uint32_t> precisionAggPm, precisionAggUm;
2929     vector<CalpontSystemCatalog::ColDataType> typeAggPm, typeAggUm;
2930     vector<uint32_t> csNumAggPm, csNumAggUm;
2931     vector<uint32_t> widthAggPm, widthAggUm;
2932     vector<SP_ROWAGG_GRPBY_t> groupByPm, groupByUm;
2933     vector<SP_ROWAGG_FUNC_t> functionVecPm, functionVecUm;
2934     uint32_t bigIntWidth = sizeof(int64_t);
2935     uint32_t bigUintWidth = sizeof(uint64_t);
2936     AGG_MAP aggFuncMap;
2937 
2938     // associate the columns between projected RG and aggregate RG on PM
2939     // populate the aggregate columns
2940     //     the groupby columns are put in front, even if not a returned column
2941     //     sum and count(column name) are omitted, if avg present
2942     {
2943         // project only unique oids, but they may be repeated in aggregation
2944         // collect the projected column info, prepare for aggregation
2945         vector<uint32_t> width;
2946         map<uint32_t, int> projColPosMap;
2947 
2948         for (uint64_t i = 0; i < keysProj.size(); i++)
2949         {
2950             projColPosMap.insert(make_pair(keysProj[i], i));
2951             width.push_back(projRG.getColumnWidth(i));
2952         }
2953 
2954         // column index for PM aggregate rowgroup
2955         uint64_t colAggPm = 0;
2956 
2957         // for groupby column
2958         for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
2959         {
2960             uint32_t key = jobInfo.groupByColVec[i];
2961 
2962             if (projColPosMap.find(key) == projColPosMap.end())
2963             {
2964                 ostringstream emsg;
2965                 emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
2966                 cerr << "prep2PhasesAggregate: groupby " << emsg.str()
2967                      << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[key].fId
2968                      << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
2969 
2970                 if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
2971                     cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
2972 
2973                 cerr << endl;
2974                 throw logic_error(emsg.str());
2975             }
2976 
2977             uint64_t colProj = projColPosMap[key];
2978 
2979             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
2980             groupByPm.push_back(groupby);
2981 
2982             // PM: just copy down to aggregation rowgroup
2983             oidsAggPm.push_back(oidsProj[colProj]);
2984             keysAggPm.push_back(key);
2985             scaleAggPm.push_back(scaleProj[colProj]);
2986             precisionAggPm.push_back(precisionProj[colProj]);
2987             typeAggPm.push_back(typeProj[colProj]);
2988             csNumAggPm.push_back(csNumProj[colProj]);
2989             widthAggPm.push_back(width[colProj]);
2990 
2991             aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
2992             colAggPm++;
2993         }
2994 
2995         // for distinct column
2996         for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
2997         {
2998             uint32_t key = jobInfo.distinctColVec[i];
2999 
3000             if (projColPosMap.find(key) == projColPosMap.end())
3001             {
3002                 ostringstream emsg;
3003                 emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
3004                 cerr << "prep2PhasesAggregate: distinct " << emsg.str()
3005                      << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[key].fId
3006                      << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
3007 
3008                 if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
3009                     cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
3010 
3011                 cerr << endl;
3012                 throw logic_error(emsg.str());
3013             }
3014 
3015             uint64_t colProj = projColPosMap[key];
3016 
3017             // check for dup distinct column -- @bug6126
3018             if (find(keysAggPm.begin(), keysAggPm.end(), key) != keysAggPm.end())
3019                 continue;
3020 
3021             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
3022             groupByPm.push_back(groupby);
3023 
3024             // PM: just copy down to aggregation rowgroup
3025             oidsAggPm.push_back(oidsProj[colProj]);
3026             keysAggPm.push_back(key);
3027             scaleAggPm.push_back(scaleProj[colProj]);
3028             typeAggPm.push_back(typeProj[colProj]);
3029             csNumAggPm.push_back(csNumProj[colProj]);
3030             widthAggPm.push_back(width[colProj]);
3031             precisionAggPm.push_back(precisionProj[colProj]);
3032 
3033             aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
3034             colAggPm++;
3035         }
3036 
3037         // vectors for aggregate functions
3038         for (uint64_t i = 0; i < aggColVec.size(); i++)
3039         {
3040             pUDAFFunc = NULL;
3041             uint32_t aggKey = aggColVec[i].first;
3042             RowAggFunctionType aggOp = functionIdMap(aggColVec[i].second);
3043             RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second);
3044 
3045             // skip on PM if this is a constant
3046             if (aggOp == ROWAGG_CONSTANT)
3047                 continue;
3048 
3049             if (projColPosMap.find(aggKey) == projColPosMap.end())
3050             {
3051                 ostringstream emsg;
3052                 emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple.";
3053                 cerr << "prep2PhasesAggregate: aggregate " << emsg.str()
3054                      << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[aggKey].fId
3055                      << ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable;
3056 
3057                 if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0)
3058                     cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView;
3059 
3060                 cerr << endl;
3061                 throw logic_error(emsg.str());
3062             }
3063 
3064             if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) &&
3065                     (avgSet.find(aggKey) != avgSet.end()))
3066                 // skip sum / count(column) if avg is also selected
3067                 continue;
3068 
3069             uint64_t colProj = projColPosMap[aggKey];
3070             SP_ROWAGG_FUNC_t funct;
3071 
3072             if (aggOp == ROWAGG_UDAF)
3073             {
3074                 std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
3075                 for (; it != jobInfo.projectionCols.end(); it++)
3076                 {
3077                     udafc = dynamic_cast<UDAFColumn*>((*it).get());
3078                     projColsUDAFIdx++;
3079                     if (udafc)
3080                     {
3081                         pUDAFFunc =  udafc->getContext().getFunction();
3082                         // Save the multi-parm keys for dup-detection.
3083                         if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
3084                         {
3085                             for (uint64_t k = i+1;
3086                                  k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
3087                                  ++k)
3088                             {
3089                                 udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
3090                             }
3091                         }
3092                         // Create a RowAggFunctionCol (UDAF subtype) with the context.
3093                         funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
3094                         break;
3095                     }
3096                 }
3097                 if (it == jobInfo.projectionCols.end())
3098                 {
3099                     throw logic_error("(1)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
3100                 }
3101             }
3102             else
3103             {
3104                 funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
3105             }
3106 
3107             // skip if this is a duplicate
3108             if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end())
3109                 continue;
3110 
3111             functionVecPm.push_back(funct);
3112             aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
3113 
3114             switch (aggOp)
3115             {
3116                 case ROWAGG_MIN:
3117                 case ROWAGG_MAX:
3118                 {
3119                     oidsAggPm.push_back(oidsProj[colProj]);
3120                     keysAggPm.push_back(aggKey);
3121                     scaleAggPm.push_back(scaleProj[colProj]);
3122                     precisionAggPm.push_back(precisionProj[colProj]);
3123                     typeAggPm.push_back(typeProj[colProj]);
3124                     csNumAggPm.push_back(csNumProj[colProj]);
3125                     widthAggPm.push_back(width[colProj]);
3126                     colAggPm++;
3127                 }
3128                 break;
3129 
3130                 case ROWAGG_SUM:
3131                 case ROWAGG_AVG:
3132                 {
3133                     if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
3134                             typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
3135                             typeProj[colProj] == CalpontSystemCatalog::BLOB ||
3136                             typeProj[colProj] == CalpontSystemCatalog::TEXT ||
3137                             typeProj[colProj] == CalpontSystemCatalog::DATE ||
3138                             typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
3139                             typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
3140                             typeProj[colProj] == CalpontSystemCatalog::TIME)
3141                     {
3142                         Message::Args args;
3143                         args.add("sum/average");
3144                         args.add(colTypeIdString(typeProj[colProj]));
3145                         string emsg = IDBErrorInfo::instance()->
3146                                       errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
3147                         cerr << "prep2PhasesAggregate: " << emsg << endl;
3148                         throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
3149                     }
3150 
3151                     oidsAggPm.push_back(oidsProj[colProj]);
3152                     keysAggPm.push_back(aggKey);
3153                     typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
3154                     csNumAggPm.push_back(8);
3155                     scaleAggPm.push_back(0);
3156                     precisionAggPm.push_back(-1);
3157                     widthAggPm.push_back(sizeof(long double));
3158                     colAggPm++;
3159                 }
3160 
3161                 // PM: put the count column for avg next to the sum
3162                 // let fall through to add a count column for average function
3163                 if (aggOp != ROWAGG_AVG)
3164                     break;
3165                 /* fall through */
3166 
3167                 case ROWAGG_COUNT_ASTERISK:
3168                 case ROWAGG_COUNT_COL_NAME:
3169                 {
3170                     oidsAggPm.push_back(oidsProj[colProj]);
3171                     keysAggPm.push_back(aggKey);
3172                     scaleAggPm.push_back(0);
3173                     // work around count() in select subquery
3174                     precisionAggPm.push_back(9999);
3175                     typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
3176                     csNumAggPm.push_back(8);
3177                     widthAggPm.push_back(bigIntWidth);
3178                     colAggPm++;
3179                 }
3180                 break;
3181 
3182                 case ROWAGG_STATS:
3183                 {
3184                     if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
3185                             typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
3186                             typeProj[colProj] == CalpontSystemCatalog::BLOB ||
3187                             typeProj[colProj] == CalpontSystemCatalog::TEXT ||
3188                             typeProj[colProj] == CalpontSystemCatalog::DATE ||
3189                             typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
3190                             typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
3191                             typeProj[colProj] == CalpontSystemCatalog::TIME)
3192                     {
3193                         Message::Args args;
3194                         args.add("variance/standard deviation");
3195                         args.add(colTypeIdString(typeProj[colProj]));
3196                         string emsg = IDBErrorInfo::instance()->
3197                                       errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
3198                         cerr << "prep2PhaseAggregate:: " << emsg << endl;
3199                         throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
3200                     }
3201 
3202                     // counts(x)
3203                     oidsAggPm.push_back(oidsProj[colProj]);
3204                     keysAggPm.push_back(aggKey);
3205                     scaleAggPm.push_back(scaleProj[colProj]);
3206                     precisionAggPm.push_back(0);
3207                     typeAggPm.push_back(CalpontSystemCatalog::DOUBLE);
3208                     csNumAggPm.push_back(8);
3209                     widthAggPm.push_back(sizeof(double));
3210                     funct->fAuxColumnIndex = ++colAggPm;
3211 
3212                     // sum(x)
3213                     oidsAggPm.push_back(oidsProj[colProj]);
3214                     keysAggPm.push_back(aggKey);
3215                     scaleAggPm.push_back(0);
3216                     precisionAggPm.push_back(-1);
3217                     typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
3218                     csNumAggPm.push_back(8);
3219                     widthAggPm.push_back(sizeof(long double));
3220                     ++colAggPm;
3221 
3222                     // sum(x**2)
3223                     oidsAggPm.push_back(oidsProj[colProj]);
3224                     keysAggPm.push_back(aggKey);
3225                     scaleAggPm.push_back(0);
3226                     precisionAggPm.push_back(-1);
3227                     typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
3228                     csNumAggPm.push_back(8);
3229                     widthAggPm.push_back(sizeof(long double));
3230                     ++colAggPm;
3231                 }
3232                 break;
3233 
3234                 case ROWAGG_BIT_AND:
3235                 case ROWAGG_BIT_OR:
3236                 case ROWAGG_BIT_XOR:
3237                 {
3238                     oidsAggPm.push_back(oidsProj[colProj]);
3239                     keysAggPm.push_back(aggKey);
3240                     scaleAggPm.push_back(0);
3241                     precisionAggPm.push_back(-16);  // for connector to skip null check
3242 
3243                     if (isUnsigned(typeProj[colProj]))
3244                     {
3245                         typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
3246                     }
3247                     else
3248                     {
3249                         typeAggPm.push_back(CalpontSystemCatalog::BIGINT);
3250                     }
3251 
3252                     csNumAggPm.push_back(8);
3253                     widthAggPm.push_back(bigIntWidth);
3254                     colAggPm++;
3255                 }
3256                 break;
3257 
3258                 case ROWAGG_UDAF:
3259                 {
3260                     // Return column
3261                     RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
3262 
3263                     if (!udafFuncCol)
3264                     {
3265                         throw logic_error("(2)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
3266                     }
3267 
3268                     oidsAggPm.push_back(oidsProj[colProj]);
3269                     keysAggPm.push_back(aggKey);
3270                     scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale());
3271                     precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision());
3272                     typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
3273                     csNumAggPm.push_back(udafFuncCol->fUDAFContext.getCharsetNumber());
3274                     widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
3275                     ++colAggPm;
3276                     // Column for index of UDAF UserData struct
3277                     oidsAggPm.push_back(oidsProj[colProj]);
3278                     keysAggPm.push_back(aggKey);
3279                     scaleAggPm.push_back(0);
3280                     precisionAggPm.push_back(0);
3281                     typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
3282                     csNumAggPm.push_back(8);
3283                     widthAggPm.push_back(bigUintWidth);
3284                     funct->fAuxColumnIndex = colAggPm++;
3285                     // If the first param is const
3286                     udafcParamIdx = 0;
3287                     ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
3288                     if (cc)
3289                     {
3290                         funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
3291                     }
3292                     ++udafcParamIdx;
3293                     break;
3294                 }
3295 
3296                 case ROWAGG_MULTI_PARM:
3297                 {
3298                     oidsAggPm.push_back(oidsProj[colProj]);
3299                     keysAggPm.push_back(aggKey);
3300                     scaleAggPm.push_back(scaleProj[colProj]);
3301                     precisionAggPm.push_back(precisionProj[colProj]);
3302                     typeAggPm.push_back(typeProj[colProj]);
3303                     csNumAggPm.push_back(csNumProj[colProj]);
3304                     widthAggPm.push_back(width[colProj]);
3305                     colAggPm++;
3306                     // If the param is const
3307                     if (udafc)
3308                     {
3309                         if (udafcParamIdx > udafc->aggParms().size() - 1)
3310                         {
3311                             throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with too many parms", aggregateFuncErr);
3312                         }
3313                         ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
3314                         if (cc)
3315                         {
3316                             funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
3317                         }
3318                     }
3319                     else
3320                     {
3321                         throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with no parms", aggregateFuncErr);
3322                     }
3323                     ++udafcParamIdx;
3324                 }
3325                 break;
3326 
3327                 default:
3328                 {
3329                     ostringstream emsg;
3330                     emsg << "aggregate function (" << (uint64_t) aggOp << ") isn't supported";
3331                     cerr << "prep2PhasesAggregate: " << emsg.str() << endl;
3332                     throw QueryDataExcept(emsg.str(), aggregateFuncErr);
3333                 }
3334             }
3335         }
3336     }
3337 
3338     // associate the columns between the aggregate RGs on PM and UM
3339     // populate the returned columns
3340     //     remove groupby columns that are not returned
3341     //     add back sum or count(column name) if omitted due to avg column
3342     //     put count(column name) column to the end, if it is for avg only
3343     {
3344         // check if the count column for AVG is also a returned column,
3345         // if so, replace the "-1" with the actual position in the returned vec.
3346         map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
3347         AGG_MAP aggDupFuncMap;
3348 
3349         projColsUDAFIdx = 0;
3350         // copy over the groupby vector
3351         // update the outputColumnIndex if returned
3352         for (uint64_t i = 0; i < groupByPm.size(); i++)
3353         {
3354             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(groupByPm[i]->fOutputColumnIndex, -1));
3355             groupByUm.push_back(groupby);
3356         }
3357 
3358         // locate the return column position in aggregated rowgroup from PM
3359         // outIdx is i without the multi-columns,
3360         uint64_t outIdx = 0;
3361         for (uint64_t i = 0; i < returnedColVec.size(); i++)
3362         {
3363             uint32_t retKey = returnedColVec[i].first;
3364             RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
3365             RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
3366             int colPm = -1;
3367 
3368             if (aggOp == ROWAGG_MULTI_PARM)
3369             {
3370                 // Skip on UM: Extra parms for an aggregate have no work on the UM
3371                 continue;
3372             }
3373 
3374             // Is this a UDAF? use the function as part of the key.
3375             pUDAFFunc = NULL;
3376             udafc = NULL;
3377             if (aggOp == ROWAGG_UDAF)
3378             {
3379                 std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
3380 
3381                 for (; it != jobInfo.projectionCols.end(); it++)
3382                 {
3383                     udafc = dynamic_cast<UDAFColumn*>((*it).get());
3384                     projColsUDAFIdx++;
3385                     if (udafc)
3386                     {
3387                         pUDAFFunc =  udafc->getContext().getFunction();
3388                         // Save the multi-parm keys for dup-detection.
3389                         if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
3390                         {
3391                             for (uint64_t k = i+1;
3392                                  k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
3393                                  ++k)
3394                             {
3395                                 udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
3396                             }
3397                         }
3398                         break;
3399                     }
3400                 }
3401                 if (it == jobInfo.projectionCols.end())
3402                 {
3403                     throw logic_error("(3)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
3404                 }
3405             }
3406 
3407             AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
3408 
3409             if (it != aggFuncMap.end())
3410             {
3411                 colPm = it->second;
3412                 oidsAggUm.push_back(oidsAggPm[colPm]);
3413                 keysAggUm.push_back(retKey);
3414                 scaleAggUm.push_back(scaleAggPm[colPm]);
3415                 precisionAggUm.push_back(precisionAggPm[colPm]);
3416                 typeAggUm.push_back(typeAggPm[colPm]);
3417                 csNumAggUm.push_back(csNumAggPm[colPm]);
3418                 widthAggUm.push_back(widthAggPm[colPm]);
3419             }
3420 
3421             // not a direct hit -- a returned column is not already in the RG from PMs
3422             else
3423             {
3424                 bool returnColMissing = true;
3425 
3426                 // check if a SUM or COUNT covered by AVG
3427                 if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
3428                 {
3429                     it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
3430 
3431                     if (it != aggFuncMap.end())
3432                     {
3433                         // false alarm
3434                         returnColMissing = false;
3435 
3436                         colPm = it->second;
3437 
3438                         if (aggOp == ROWAGG_SUM)
3439                         {
3440                             oidsAggUm.push_back(oidsAggPm[colPm]);
3441                             keysAggUm.push_back(retKey);
3442                             scaleAggUm.push_back(0);
3443                             typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE);
3444                             csNumAggUm.push_back(8);
3445                             precisionAggUm.push_back(-1);
3446                             widthAggUm.push_back(sizeof(long double));
3447                         }
3448                         else
3449                         {
3450                             // leave the count() to avg
3451                             aggOp = ROWAGG_COUNT_NO_OP;
3452 
3453                             colPm++;
3454                             oidsAggUm.push_back(oidsAggPm[colPm]);
3455                             keysAggUm.push_back(retKey);
3456                             scaleAggUm.push_back(0);
3457                             precisionAggUm.push_back(19);
3458                             typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
3459                             csNumAggUm.push_back(8);
3460                             widthAggUm.push_back(bigIntWidth);
3461                         }
3462                     }
3463                 }
3464                 else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(),
3465                               retKey) != jobInfo.expressionVec.end())
3466                 {
3467                     // a function on aggregation
3468                     TupleInfo ti = getTupleInfo(retKey, jobInfo);
3469                     oidsAggUm.push_back(ti.oid);
3470                     keysAggUm.push_back(retKey);
3471                     scaleAggUm.push_back(ti.scale);
3472                     precisionAggUm.push_back(ti.precision);
3473                     typeAggUm.push_back(ti.dtype);
3474                     csNumAggUm.push_back(ti.csNum);
3475                     widthAggUm.push_back(ti.width);
3476 
3477                     returnColMissing = false;
3478                 }
3479                 else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
3480                 {
3481                     // a window function
3482                     TupleInfo ti = getTupleInfo(retKey, jobInfo);
3483                     oidsAggUm.push_back(ti.oid);
3484                     keysAggUm.push_back(retKey);
3485                     scaleAggUm.push_back(ti.scale);
3486                     precisionAggUm.push_back(ti.precision);
3487                     typeAggUm.push_back(ti.dtype);
3488                     csNumAggUm.push_back(ti.csNum);
3489                     widthAggUm.push_back(ti.width);
3490 
3491                     returnColMissing = false;
3492                 }
3493                 else if (aggOp == ROWAGG_CONSTANT)
3494                 {
3495                     TupleInfo ti = getTupleInfo(retKey, jobInfo);
3496                     oidsAggUm.push_back(ti.oid);
3497                     keysAggUm.push_back(retKey);
3498                     scaleAggUm.push_back(ti.scale);
3499                     precisionAggUm.push_back(ti.precision);
3500                     typeAggUm.push_back(ti.dtype);
3501                     csNumAggUm.push_back(ti.csNum);
3502                     widthAggUm.push_back(ti.width);
3503 
3504                     returnColMissing = false;
3505                 }
3506 
3507 
3508                 if (returnColMissing)
3509                 {
3510                     Message::Args args;
3511                     args.add(keyName(outIdx, retKey, jobInfo));
3512                     string emsg = IDBErrorInfo::instance()->
3513                                   errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
3514                     cerr << "prep2PhasesAggregate: " << emsg << " oid="
3515                          << (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias="
3516                          << jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view="
3517                          << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function="
3518                          << (int) aggOp << endl;
3519                     throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
3520                 }
3521             }
3522 
3523             // update groupby vector if the groupby column is a returned column
3524             if (returnedColVec[i].second == 0)
3525             {
3526                 int dupGroupbyIndex = -1;
3527 
3528                 for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++)
3529                 {
3530                     if (jobInfo.groupByColVec[j] == retKey)
3531                     {
3532                         if (groupByUm[j]->fOutputColumnIndex == (uint32_t) - 1)
3533                             groupByUm[j]->fOutputColumnIndex = outIdx;
3534                         else
3535                             dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex;
3536                     }
3537                 }
3538 
3539                 for (uint64_t j = 0; j < jobInfo.distinctColVec.size(); j++)
3540                 {
3541                     if (jobInfo.distinctColVec[j] == retKey)
3542                     {
3543                         if (groupByUm[j]->fOutputColumnIndex == (uint32_t) - 1)
3544                             groupByUm[j]->fOutputColumnIndex = outIdx;
3545                         else
3546                             dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex;
3547                     }
3548                 }
3549 
3550                 // a duplicate group by column
3551                 if (dupGroupbyIndex != -1)
3552                     functionVecUm.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol(
3553                                                 ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
3554             }
3555             else
3556             {
3557                 // update the aggregate function vector
3558                 SP_ROWAGG_FUNC_t funct;
3559                 if (aggOp == ROWAGG_UDAF)
3560                 {
3561                     funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, outIdx));
3562                 }
3563                 else
3564                 {
3565                     funct.reset(new RowAggFunctionCol(aggOp, stats, colPm, outIdx));
3566                 }
3567 
3568                 if (aggOp == ROWAGG_COUNT_NO_OP)
3569                     funct->fAuxColumnIndex = colPm;
3570                 else if (aggOp == ROWAGG_CONSTANT)
3571                     funct->fAuxColumnIndex = jobInfo.cntStarPos;
3572 
3573                 functionVecUm.push_back(funct);
3574 
3575                 // find if this func is a duplicate
3576                 AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
3577 
3578                 if (iter != aggDupFuncMap.end())
3579                 {
3580                     if (funct->fAggFunction == ROWAGG_AVG)
3581                         funct->fAggFunction = ROWAGG_DUP_AVG;
3582                     else if (funct->fAggFunction == ROWAGG_STATS)
3583                         funct->fAggFunction = ROWAGG_DUP_STATS;
3584                     else if (funct->fAggFunction == ROWAGG_UDAF)
3585                         funct->fAggFunction = ROWAGG_DUP_UDAF;
3586                     else
3587                         funct->fAggFunction = ROWAGG_DUP_FUNCT;
3588 
3589                     funct->fAuxColumnIndex = iter->second;
3590                 }
3591                 else
3592                 {
3593                     aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
3594                                                    funct->fOutputColumnIndex));
3595                 }
3596 
3597                 if (returnedColVec[i].second == AggregateColumn::AVG)
3598                     avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
3599             }
3600             ++outIdx;
3601         }
3602 
3603         // now fix the AVG function, locate the count(column) position
3604         for (uint64_t i = 0; i < functionVecUm.size(); i++)
3605         {
3606             if (functionVecUm[i]->fAggFunction != ROWAGG_COUNT_NO_OP)
3607                 continue;
3608 
3609             // if the count(k) can be associated with an avg(k)
3610             map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
3611                 avgFuncMap.find(keysAggUm[functionVecUm[i]->fOutputColumnIndex]);
3612 
3613             if (k != avgFuncMap.end())
3614                 k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex;
3615         }
3616 
3617         // there is avg(k), but no count(k) in the select list
3618         uint64_t lastCol = outIdx;
3619 
3620         for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
3621         {
3622             if (k->second->fAuxColumnIndex == (uint32_t) - 1)
3623             {
3624                 k->second->fAuxColumnIndex = lastCol++;
3625                 oidsAggUm.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
3626                 keysAggUm.push_back(k->first);
3627                 scaleAggUm.push_back(0);
3628                 precisionAggUm.push_back(19);
3629                 typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
3630                 csNumAggUm.push_back(8);
3631                 widthAggUm.push_back(bigIntWidth);
3632             }
3633         }
3634 
3635         // add auxiliary fields for UDAF and statistics functions
3636         for (uint64_t i = 0; i < functionVecUm.size(); i++)
3637         {
3638             uint64_t j = functionVecUm[i]->fInputColumnIndex;
3639 
3640             if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
3641             {
3642                 // Column for index of UDAF UserData struct
3643                 RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
3644 
3645                 if (!udafFuncCol)
3646                 {
3647                     throw logic_error("(4)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
3648                 }
3649 
3650                 functionVecUm[i]->fAuxColumnIndex = lastCol++;
3651                 oidsAggUm.push_back(oidsAggPm[j]); // Dummy?
3652                 keysAggUm.push_back(keysAggPm[j]); // Dummy?
3653                 scaleAggUm.push_back(0);
3654                 precisionAggUm.push_back(0);
3655                 typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
3656                 csNumAggUm.push_back(8);
3657                 widthAggUm.push_back(bigUintWidth);
3658                 continue;
3659             }
3660 
3661             if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
3662                 continue;
3663 
3664             functionVecUm[i]->fAuxColumnIndex = lastCol;
3665 
3666             // sum(x)
3667             oidsAggUm.push_back(oidsAggPm[j]);
3668             keysAggUm.push_back(keysAggPm[j]);
3669             scaleAggUm.push_back(0);
3670             precisionAggUm.push_back(-1);
3671             typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE);
3672             csNumAggUm.push_back(8);
3673             widthAggUm.push_back(sizeof(long double));
3674             ++lastCol;
3675 
3676             // sum(x**2)
3677             oidsAggUm.push_back(oidsAggPm[j]);
3678             keysAggUm.push_back(keysAggPm[j]);
3679             scaleAggUm.push_back(0);
3680             precisionAggUm.push_back(-1);
3681             typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE);
3682             csNumAggUm.push_back(8);
3683             widthAggUm.push_back(sizeof(long double));
3684             ++lastCol;
3685         }
3686     }
3687 
3688     // calculate the offset and create the rowaggregations, rowgroups
3689     posAggUm.push_back(2);   // rid
3690 
3691     for (uint64_t i = 0; i < oidsAggUm.size(); i++)
3692         posAggUm.push_back(posAggUm[i] + widthAggUm[i]);
3693 
3694     RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm,
3695                      csNumAggUm, scaleAggUm, precisionAggUm, jobInfo.stringTableThreshold);
3696     SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit));
3697     rowAggUm->timeZone(jobInfo.timeZone);
3698     rowgroups.push_back(aggRgUm);
3699     aggregators.push_back(rowAggUm);
3700 
3701     posAggPm.push_back(2);   // rid
3702 
3703     for (uint64_t i = 0; i < oidsAggPm.size(); i++)
3704         posAggPm.push_back(posAggPm[i] + widthAggPm[i]);
3705 
3706     RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm,
3707                      csNumAggPm, scaleAggPm, precisionAggPm, jobInfo.stringTableThreshold);
3708     SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm));
3709     rowAggPm->timeZone(jobInfo.timeZone);
3710     rowgroups.push_back(aggRgPm);
3711     aggregators.push_back(rowAggPm);
3712 
3713     if (jobInfo.trace)
3714         cout << "\n====== Aggregation RowGroups ======" << endl
3715              << "projected   RG: " << projRG.toString() << endl
3716              << "aggregated1 RG: " << aggRgPm.toString() << endl
3717              << "aggregated2 RG: " << aggRgUm.toString() << endl;
3718 }
3719 
3720 
prep2PhasesDistinctAggregate(JobInfo & jobInfo,vector<RowGroup> & rowgroups,vector<SP_ROWAGG_t> & aggregators)3721 void TupleAggregateStep::prep2PhasesDistinctAggregate(
3722     JobInfo& jobInfo,
3723     vector<RowGroup>& rowgroups,
3724     vector<SP_ROWAGG_t>& aggregators)
3725 {
3726     // check if there are any aggregate columns
3727     // a vector that has the aggregate function to be done by PM
3728     vector<pair<uint32_t, int> > aggColVec, aggNoDistColVec;
3729     set<uint32_t> avgSet, avgDistSet;
3730     vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
3731     // For UDAF
3732     uint32_t projColsUDAFIdx = 0;
3733     uint32_t udafcParamIdx = 0;
3734     UDAFColumn* udafc = NULL;
3735     mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
3736 
3737     for (uint64_t i = 0; i < returnedColVec.size(); i++)
3738     {
3739         // col should be an aggregate or groupBy or window function
3740         uint32_t rtcKey = returnedColVec[i].first;
3741         uint32_t rtcOp = returnedColVec[i].second;
3742 
3743         if (rtcOp == 0 &&
3744                 find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), rtcKey) !=
3745                 jobInfo.distinctColVec.end() &&
3746                 find(jobInfo.groupByColVec.begin(), jobInfo.groupByColVec.end(), rtcKey ) ==
3747                 jobInfo.groupByColVec.end() &&
3748                 jobInfo.windowSet.find(rtcKey) != jobInfo.windowSet.end())
3749         {
3750             Message::Args args;
3751             args.add(keyName(i, rtcKey, jobInfo));
3752             string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
3753             cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid="
3754                  << (int) jobInfo.keyInfo->tupleKeyVec[rtcKey].fId << ", alias="
3755                  << jobInfo.keyInfo->tupleKeyVec[rtcKey].fTable << ", view="
3756                  << jobInfo.keyInfo->tupleKeyVec[rtcKey].fView << endl;
3757             throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
3758         }
3759 
3760         // skip if not an aggregation column
3761         if (returnedColVec[i].second == 0)
3762             continue;
3763 
3764         aggColVec.push_back(returnedColVec[i]);
3765 
3766         // remember if a column has an average function,
3767         // with avg function, no need for separate sum or count_column_name
3768         if (returnedColVec[i].second == AggregateColumn::AVG)
3769             avgSet.insert(returnedColVec[i].first);
3770 
3771         if ( returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
3772             avgDistSet.insert(returnedColVec[i].first);
3773     }
3774 
3775     // populate the aggregate rowgroup on PM and UM
3776     // PM: projectedRG   -> aggregateRGPM
3777     // UM: aggregateRGPM -> aggregateRGUM
3778     //
3779     // Aggregate preparation by joblist factory:
3780     // 1. get projected rowgroup (done by doAggProject) -- input to PM AGG
3781     // 2. construct aggregate rowgroup  -- output of PM, input of UM
3782     // 3. construct aggregate rowgroup  -- output of UM
3783     // 4. construct aggregate rowgroup  -- output of distinct aggregates
3784 
3785     const RowGroup projRG = rowgroups[0];
3786     const vector<uint32_t>& oidsProj = projRG.getOIDs();
3787     const vector<uint32_t>& keysProj = projRG.getKeys();
3788     const vector<uint32_t>& scaleProj = projRG.getScale();
3789     const vector<uint32_t>& precisionProj = projRG.getPrecision();
3790     const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
3791     const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();
3792 
3793     vector<uint32_t> posAggPm, posAggUm, posAggDist;
3794     vector<uint32_t> oidsAggPm, oidsAggUm, oidsAggDist;
3795     vector<uint32_t> keysAggPm, keysAggUm, keysAggDist;
3796     vector<uint32_t> scaleAggPm, scaleAggUm, scaleAggDist;
3797     vector<uint32_t> precisionAggPm, precisionAggUm, precisionAggDist;
3798     vector<CalpontSystemCatalog::ColDataType> typeAggPm, typeAggUm, typeAggDist;
3799     vector<uint32_t> csNumAggPm, csNumAggUm, csNumAggDist;
3800     vector<uint32_t> widthAggPm, widthAggUm, widthAggDist;
3801 
3802     vector<SP_ROWAGG_GRPBY_t> groupByPm, groupByUm, groupByNoDist;
3803     vector<SP_ROWAGG_FUNC_t> functionVecPm, functionNoDistVec, functionVecUm;
3804     list<uint32_t> multiParmIndexes;
3805 
3806     uint32_t bigIntWidth = sizeof(int64_t);
3807     map<pair<uint32_t, int>, uint64_t> avgFuncDistMap;
3808     AGG_MAP aggFuncMap;
3809 
3810     // associate the columns between projected RG and aggregate RG on PM
3811     // populated the aggregate columns
3812     //     the groupby columns are put in front, even not a returned column
3813     //     sum and count(column name) are omitted, if avg present
3814     {
3815         // project only unique oids, but they may be repeated in aggregation
3816         // collect the projected column info, prepare for aggregation
3817         vector<uint32_t> width;
3818         map<uint32_t, int> projColPosMap;
3819 
3820         for (uint64_t i = 0; i < keysProj.size(); i++)
3821         {
3822             projColPosMap.insert(make_pair(keysProj[i], i));
3823             width.push_back(projRG.getColumnWidth(i));
3824         }
3825 
3826         // column index for PM aggregate rowgroup
3827         uint64_t colAggPm = 0;
3828         uint64_t multiParm = 0;
3829 
3830         // for groupby column
3831         for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
3832         {
3833             uint32_t key = jobInfo.groupByColVec[i];
3834 
3835             if (projColPosMap.find(key) == projColPosMap.end())
3836             {
3837                 ostringstream emsg;
3838                 emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
3839                 cerr << "prep2PhasesDistinctAggregate: group " << emsg.str()
3840                      << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[key].fId
3841                      << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
3842 
3843                 if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
3844                     cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
3845 
3846                 cerr << endl;
3847                 throw logic_error(emsg.str());
3848             }
3849 
3850             uint64_t colProj = projColPosMap[key];
3851 
3852             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
3853             groupByPm.push_back(groupby);
3854 
3855             // PM: just copy down to aggregation rowgroup
3856             oidsAggPm.push_back(oidsProj[colProj]);
3857             keysAggPm.push_back(key);
3858             scaleAggPm.push_back(scaleProj[colProj]);
3859             precisionAggPm.push_back(precisionProj[colProj]);
3860             typeAggPm.push_back(typeProj[colProj]);
3861             csNumAggPm.push_back(csNumProj[colProj]);
3862             widthAggPm.push_back(width[colProj]);
3863 
3864             aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
3865             colAggPm++;
3866         }
3867 
3868         // for distinct column
3869         for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
3870         {
3871             uint32_t key = jobInfo.distinctColVec[i];
3872 
3873             if (projColPosMap.find(key) == projColPosMap.end())
3874             {
3875                 ostringstream emsg;
3876                 emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
3877                 cerr << "prep2PhasesDistinctAggregate: distinct " << emsg.str()
3878                      << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[key].fId
3879                      << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
3880 
3881                 if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
3882                     cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
3883 
3884                 cerr << endl;
3885                 throw logic_error(emsg.str());
3886             }
3887 
3888             // check for dup distinct column -- @bug6126
3889             if (find(keysAggPm.begin(), keysAggPm.end(), key) != keysAggPm.end())
3890                 continue;
3891 
3892             uint64_t colProj = projColPosMap[key];
3893 
3894             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
3895             groupByPm.push_back(groupby);
3896 
3897             // PM: just copy down to aggregation rowgroup
3898             oidsAggPm.push_back(oidsProj[colProj]);
3899             keysAggPm.push_back(key);
3900             scaleAggPm.push_back(scaleProj[colProj]);
3901             precisionAggPm.push_back(precisionProj[colProj]);
3902             typeAggPm.push_back(typeProj[colProj]);
3903             csNumAggPm.push_back(csNumProj[colProj]);
3904             widthAggPm.push_back(width[colProj]);
3905             precisionAggPm.push_back(precisionProj[colProj]);
3906 
3907             aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
3908             colAggPm++;
3909         }
3910 
3911         // vectors for aggregate functions
3912         for (uint64_t i = 0; i < aggColVec.size(); i++)
3913         {
3914             // skip on PM if this is a constant
3915             RowAggFunctionType aggOp = functionIdMap(aggColVec[i].second);
3916 
3917             if (aggOp == ROWAGG_CONSTANT)
3918                 continue;
3919 
3920             pUDAFFunc = NULL;
3921             uint32_t aggKey = aggColVec[i].first;
3922 
3923             if (projColPosMap.find(aggKey) == projColPosMap.end())
3924             {
3925                 ostringstream emsg;
3926                 emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple.";
3927                 cerr << "prep2PhasesDistinctAggregate: aggregate " << emsg.str()
3928                      << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[aggKey].fId
3929                      << ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable;
3930 
3931                 if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0)
3932                     cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView;
3933 
3934                 cerr << endl;
3935                 throw logic_error(emsg.str());
3936             }
3937 
3938             RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second);
3939 
3940             // skip sum / count(column) if avg is also selected
3941             if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) &&
3942                     (avgSet.find(aggKey) != avgSet.end()))
3943                 continue;
3944 
3945             if (aggOp == ROWAGG_DISTINCT_SUM ||
3946                     aggOp == ROWAGG_DISTINCT_AVG ||
3947                     aggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
3948                 continue;
3949 
3950             uint64_t colProj = projColPosMap[aggKey];
3951             SP_ROWAGG_FUNC_t funct;
3952 
3953             if (aggOp == ROWAGG_UDAF)
3954             {
3955                 std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
3956                 for (; it != jobInfo.projectionCols.end(); it++)
3957                 {
3958                     udafc = dynamic_cast<UDAFColumn*>((*it).get());
3959                     projColsUDAFIdx++;
3960                     if (udafc)
3961                     {
3962                         pUDAFFunc =  udafc->getContext().getFunction();
3963                         // Save the multi-parm keys for dup-detection.
3964                         if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
3965                         {
3966                             for (uint64_t k = i+1;
3967                                  k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
3968                                  ++k)
3969                             {
3970                                 udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
3971                             }
3972                         }
3973                         // Create a RowAggFunctionCol (UDAF subtype) with the context.
3974                         funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
3975                         break;
3976                     }
3977                 }
3978                 if (it == jobInfo.projectionCols.end())
3979                 {
3980                     throw logic_error("(1)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
3981                 }
3982             }
3983             else
3984             {
3985                 funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
3986             }
3987 
3988             // skip if this is a duplicate
3989             if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end())
3990                 continue;
3991 
3992             functionVecPm.push_back(funct);
3993             aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm-multiParm));
3994 
3995             switch (aggOp)
3996             {
3997                 case ROWAGG_MIN:
3998                 case ROWAGG_MAX:
3999                 {
4000                     oidsAggPm.push_back(oidsProj[colProj]);
4001                     keysAggPm.push_back(aggKey);
4002                     scaleAggPm.push_back(scaleProj[colProj]);
4003                     precisionAggPm.push_back(precisionProj[colProj]);
4004                     typeAggPm.push_back(typeProj[colProj]);
4005                     csNumAggPm.push_back(csNumProj[colProj]);
4006                     widthAggPm.push_back(width[colProj]);
4007                     colAggPm++;
4008                 }
4009                 break;
4010 
4011                 case ROWAGG_SUM:
4012                 case ROWAGG_AVG:
4013                 {
4014                     if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
4015                             typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
4016                             typeProj[colProj] == CalpontSystemCatalog::BLOB ||
4017                             typeProj[colProj] == CalpontSystemCatalog::TEXT ||
4018                             typeProj[colProj] == CalpontSystemCatalog::DATE ||
4019                             typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
4020                             typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
4021                             typeProj[colProj] == CalpontSystemCatalog::TIME)
4022                     {
4023                         Message::Args args;
4024                         args.add("sum/average");
4025                         args.add(colTypeIdString(typeProj[colProj]));
4026                         string emsg = IDBErrorInfo::instance()->
4027                                       errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
4028                         cerr << "prep2PhasesDistinctAggregate: " << emsg << endl;
4029                         throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
4030                     }
4031 
4032                     oidsAggPm.push_back(oidsProj[colProj]);
4033                     keysAggPm.push_back(aggKey);
4034                     typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
4035                     csNumAggPm.push_back(8);
4036                     precisionAggPm.push_back(-1);
4037                     widthAggPm.push_back(sizeof(long double));
4038                     scaleAggPm.push_back(0);
4039                     colAggPm++;
4040                 }
4041 
4042                 // PM: put the count column for avg next to the sum
4043                 // let fall through to add a count column for average function
4044                 if (aggOp == ROWAGG_AVG)
4045                     funct->fAuxColumnIndex = colAggPm;
4046                 else
4047                     break;
4048                 /* fall through */
4049 
4050                 case ROWAGG_COUNT_ASTERISK:
4051                 case ROWAGG_COUNT_COL_NAME:
4052                 {
4053                     oidsAggPm.push_back(oidsProj[colProj]);
4054                     keysAggPm.push_back(aggKey);
4055                     scaleAggPm.push_back(0);
4056                     // work around count() in select subquery
4057                     precisionAggPm.push_back(9999);
4058 
4059                     if (isUnsigned(typeProj[colProj]))
4060                     {
4061                         typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
4062                     }
4063                     else
4064                     {
4065                         typeAggPm.push_back(CalpontSystemCatalog::BIGINT);
4066                     }
4067 
4068                     csNumAggPm.push_back(8);
4069                     widthAggPm.push_back(bigIntWidth);
4070                     colAggPm++;
4071                 }
4072                 break;
4073 
4074                 case ROWAGG_STATS:
4075                 {
4076                     if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
4077                             typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
4078                             typeProj[colProj] == CalpontSystemCatalog::BLOB ||
4079                             typeProj[colProj] == CalpontSystemCatalog::TEXT ||
4080                             typeProj[colProj] == CalpontSystemCatalog::DATE ||
4081                             typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
4082                             typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
4083                             typeProj[colProj] == CalpontSystemCatalog::TIME)
4084                     {
4085                         Message::Args args;
4086                         args.add("variance/standard deviation");
4087                         args.add(colTypeIdString(typeProj[colProj]));
4088                         string emsg = IDBErrorInfo::instance()->
4089                                       errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
4090                         cerr << "prep2PhasesDistinctAggregate:: " << emsg << endl;
4091                         throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
4092                     }
4093 
4094                     // count(x)
4095                     oidsAggPm.push_back(oidsProj[colProj]);
4096                     keysAggPm.push_back(aggKey);
4097                     scaleAggPm.push_back(scaleProj[colProj]);
4098                     precisionAggPm.push_back(0);
4099                     typeAggPm.push_back(CalpontSystemCatalog::DOUBLE);
4100                     csNumAggPm.push_back(8);
4101                     widthAggPm.push_back(sizeof(double));
4102                     funct->fAuxColumnIndex = ++colAggPm;
4103 
4104                     // sum(x)
4105                     oidsAggPm.push_back(oidsProj[colProj]);
4106                     keysAggPm.push_back(aggKey);
4107                     scaleAggPm.push_back(0);
4108                     precisionAggPm.push_back(-1);
4109                     typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
4110                     csNumAggPm.push_back(8);
4111                     widthAggPm.push_back(sizeof(long double));
4112                     ++colAggPm;
4113 
4114                     // sum(x**2)
4115                     oidsAggPm.push_back(oidsProj[colProj]);
4116                     keysAggPm.push_back(aggKey);
4117                     scaleAggPm.push_back(0);
4118                     precisionAggPm.push_back(-1);
4119                     typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
4120                     csNumAggPm.push_back(8);
4121                     widthAggPm.push_back(sizeof(long double));
4122                     ++colAggPm;
4123                 }
4124                 break;
4125 
4126                 case ROWAGG_BIT_AND:
4127                 case ROWAGG_BIT_OR:
4128                 case ROWAGG_BIT_XOR:
4129                 {
4130                     oidsAggPm.push_back(oidsProj[colProj]);
4131                     keysAggPm.push_back(aggKey);
4132                     scaleAggPm.push_back(0);
4133                     precisionAggPm.push_back(-16);  // for connector to skip null check
4134 
4135                     if (isUnsigned(typeProj[colProj]))
4136                     {
4137                         typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
4138                     }
4139                     else
4140                     {
4141                         typeAggPm.push_back(CalpontSystemCatalog::BIGINT);
4142                     }
4143 
4144                     csNumAggPm.push_back(8);
4145                     widthAggPm.push_back(bigIntWidth);
4146                     ++colAggPm;
4147                 }
4148                 break;
4149 
4150                 case ROWAGG_UDAF:
4151                 {
4152                     RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
4153 
4154                     if (!udafFuncCol)
4155                     {
4156                         throw logic_error("(2)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
4157                     }
4158 
4159                     // Return column
4160                     oidsAggPm.push_back(oidsProj[colProj]);
4161                     keysAggPm.push_back(aggKey);
4162                     scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale());
4163                     precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision());
4164                     typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
4165                     csNumAggPm.push_back(udafFuncCol->fUDAFContext.getCharsetNumber());
4166                     widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
4167                     ++colAggPm;
4168                     // Column for index of UDAF UserData struct
4169                     oidsAggPm.push_back(oidsProj[colProj]);
4170                     keysAggPm.push_back(aggKey);
4171                     scaleAggPm.push_back(0);
4172                     precisionAggPm.push_back(0);
4173                     typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
4174                     csNumAggPm.push_back(8);
4175                     widthAggPm.push_back(sizeof(uint64_t));
4176                     funct->fAuxColumnIndex = colAggPm++;
4177                     // If the first param is const
4178                     udafcParamIdx = 0;
4179                     ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
4180                     if (cc)
4181                     {
4182                         funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
4183                     }
4184                     ++udafcParamIdx;
4185                     break;
4186                 }
4187 
4188                 case ROWAGG_MULTI_PARM:
4189                 {
4190                     oidsAggPm.push_back(oidsProj[colProj]);
4191                     keysAggPm.push_back(aggKey);
4192                     scaleAggPm.push_back(scaleProj[colProj]);
4193                     precisionAggPm.push_back(precisionProj[colProj]);
4194                     typeAggPm.push_back(typeProj[colProj]);
4195                     csNumAggPm.push_back(csNumProj[colProj]);
4196                     widthAggPm.push_back(width[colProj]);
4197                     multiParmIndexes.push_back(colAggPm);
4198                     ++colAggPm;
4199                     ++multiParm;
4200                     // If the param is const
4201                     if (udafc)
4202                     {
4203                         if (udafcParamIdx > udafc->aggParms().size() - 1)
4204                         {
4205                             throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with too many parms", aggregateFuncErr);
4206                         }
4207                         ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
4208                         if (cc)
4209                         {
4210                             funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
4211                         }
4212                     }
4213                     else
4214                     {
4215                         throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with no parms", aggregateFuncErr);
4216                     }
4217                     ++udafcParamIdx;
4218                 }
4219                 break;
4220 
4221                 default:
4222                 {
4223                     ostringstream emsg;
4224                     emsg << "aggregate function (" << (uint64_t) aggOp << ") isn't supported";
4225                     cerr << "prep2PhasesDistinctAggregate: " << emsg.str() << endl;
4226                     throw QueryDataExcept(emsg.str(), aggregateFuncErr);
4227                 }
4228             }
4229         }
4230     }
4231 
4232 
    // associate the columns between the aggregate RGs on PM and UM without the distinct aggregator
    // populate the returned columns
4235     {
4236         int64_t multiParms = 0;
4237 
4238         for (uint32_t idx = 0; idx < groupByPm.size(); idx++)
4239         {
4240             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(idx, idx));
4241             groupByUm.push_back(groupby);
4242         }
4243 
4244         for (uint32_t idx = 0; idx < functionVecPm.size(); idx++)
4245         {
4246             SP_ROWAGG_FUNC_t funct;
4247             SP_ROWAGG_FUNC_t funcPm = functionVecPm[idx];
4248 
4249             if (funcPm->fAggFunction == ROWAGG_MULTI_PARM)
4250             {
4251                 // Skip on UM: Extra parms for an aggregate have no work on the UM
4252                 ++multiParms;
4253                 continue;
4254             }
4255 
4256             if (funcPm->fAggFunction == ROWAGG_UDAF)
4257             {
4258                 RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funcPm.get());
4259                 if (!udafFuncCol)
4260                 {
4261                    throw logic_error("(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
4262                 }
4263                 funct.reset(new RowUDAFFunctionCol(
4264                                 udafFuncCol->fUDAFContext,
4265                                 udafFuncCol->fOutputColumnIndex,
4266                                 udafFuncCol->fOutputColumnIndex-multiParms,
4267                                 udafFuncCol->fAuxColumnIndex-multiParms));
4268                 functionNoDistVec.push_back(funct);
4269                 pUDAFFunc =  udafFuncCol->fUDAFContext.getFunction();
4270             }
4271             else
4272             {
4273                 funct.reset(new RowAggFunctionCol(
4274                                 funcPm->fAggFunction,
4275                                 funcPm->fStatsFunction,
4276                                 funcPm->fOutputColumnIndex,
4277                                 funcPm->fOutputColumnIndex-multiParms,
4278                                 funcPm->fAuxColumnIndex-multiParms));
4279                 functionNoDistVec.push_back(funct);
4280                 pUDAFFunc = NULL;
4281             }
4282         }
4283 
4284         // Copy over the PM arrays to the UM. Skip any that are a multi-parm entry.
4285         for (uint32_t idx = 0; idx < oidsAggPm.size(); ++idx)
4286         {
4287             if (find (multiParmIndexes.begin(), multiParmIndexes.end(), idx ) != multiParmIndexes.end())
4288             {
4289                 continue;
4290             }
4291             oidsAggUm.push_back(oidsAggPm[idx]);
4292             keysAggUm.push_back(keysAggPm[idx]);
4293             scaleAggUm.push_back(scaleAggPm[idx]);
4294             precisionAggUm.push_back(precisionAggPm[idx]);
4295             widthAggUm.push_back(widthAggPm[idx]);
4296             typeAggUm.push_back(typeAggPm[idx]);
4297             csNumAggUm.push_back(csNumAggPm[idx]);
4298         }
4299     }
4300 
4301 
    // associate the columns between the aggregate RGs on UM and Distinct
    // populate the returned columns
    //     remove group-by columns that are not returned
    //     add back sum or count(column name) if omitted due to an avg column
    //     put the count(column name) column at the end, if it is for avg only
4307     {
4308         // Keep a count of the parms after the first for any aggregate.
4309         // These will be skipped and the count needs to be subtracted
4310         // from where the aux column will be.
4311         int64_t multiParms = 0;
4312         projColsUDAFIdx = 0;
4313         // check if the count column for AVG is also a returned column,
4314         // if so, replace the "-1" to actual position in returned vec.
4315         map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
4316         AGG_MAP aggDupFuncMap;
4317 
4318         // copy over the groupby vector
4319         for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
4320         {
4321             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1));
4322             groupByNoDist.push_back(groupby);
4323         }
4324 
        // locate the return column position in the aggregated rowgroup from PM
        // outIdx is i minus the number of multi-parm columns skipped so far
4327         uint64_t outIdx = 0;
4328         for (uint64_t i = 0; i < returnedColVec.size(); i++)
4329         {
4330             pUDAFFunc = NULL;
4331             udafc = NULL;
4332             uint32_t retKey = returnedColVec[i].first;
4333 
4334             RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
4335             RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
4336             int colUm = -1;
4337 
4338             if (aggOp == ROWAGG_MULTI_PARM)
4339             {
4340                 // Skip on UM: Extra parms for an aggregate have no work on the UM
4341                 ++multiParms;
4342                 continue;
4343             }
4344 
4345             if (aggOp == ROWAGG_UDAF)
4346             {
4347                 std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
4348                 for (; it != jobInfo.projectionCols.end(); it++)
4349                 {
4350                     udafc = dynamic_cast<UDAFColumn*>((*it).get());
4351                     projColsUDAFIdx++;
4352                     if (udafc)
4353                     {
4354                         pUDAFFunc =  udafc->getContext().getFunction();
4355                         // Save the multi-parm keys for dup-detection.
4356                         if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
4357                         {
4358                             for (uint64_t k = i+1;
4359                                  k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
4360                                  ++k)
4361                             {
4362                                 udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
4363                             }
4364                         }
4365                         break;
4366                     }
4367                 }
4368                 if (it == jobInfo.projectionCols.end())
4369                 {
4370                     throw logic_error("(4)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
4371                 }
4372             }
4373 
4374             if  (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
4375                     jobInfo.distinctColVec.end() )
4376             {
4377                 AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
4378 
4379                 if (it != aggFuncMap.end())
4380                 {
4381                     colUm = it->second;
4382                 }
4383             }
4384 
4385             if (colUm > -1) // Means we found a DISTINCT and have a column number
4386             {
4387                 switch (aggOp)
4388                 {
4389                     case ROWAGG_DISTINCT_AVG:
4390 
4391                     //avgFuncMap.insert(make_pair(key, funct));
4392                     case ROWAGG_DISTINCT_SUM:
4393                     {
4394                         if (typeAggUm[colUm] == CalpontSystemCatalog::CHAR ||
4395                                 typeAggUm[colUm] == CalpontSystemCatalog::VARCHAR ||
4396                                 typeAggUm[colUm] == CalpontSystemCatalog::BLOB ||
4397                                 typeAggUm[colUm] == CalpontSystemCatalog::TEXT ||
4398                                 typeAggUm[colUm] == CalpontSystemCatalog::DATE ||
4399                                 typeAggUm[colUm] == CalpontSystemCatalog::DATETIME ||
4400                                 typeAggUm[colUm] == CalpontSystemCatalog::TIMESTAMP ||
4401                                 typeAggUm[colUm] == CalpontSystemCatalog::TIME)
4402                         {
4403                             Message::Args args;
4404                             args.add("sum/average");
4405                             args.add(colTypeIdString(typeAggUm[colUm]));
4406                             string emsg = IDBErrorInfo::instance()->
4407                                           errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
4408                             cerr << "prep2PhasesDistinctAggregate: " << emsg << endl;
4409                             throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
4410                         }
4411 
4412                         oidsAggDist.push_back(oidsAggUm[colUm]);
4413                         keysAggDist.push_back(retKey);
4414                         typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
4415                         csNumAggDist.push_back(8);
4416                         precisionAggDist.push_back(-1);
4417                         widthAggDist.push_back(sizeof(long double));
4418                         scaleAggDist.push_back(0);
4419                     }
4420                     // PM: put the count column for avg next to the sum
4421                     // let fall through to add a count column for average function
4422                     //if (aggOp != ROWAGG_DISTINCT_AVG)
4423                     break;
4424 
4425                     case ROWAGG_COUNT_DISTINCT_COL_NAME:
4426                     {
4427                         oidsAggDist.push_back(oidsAggUm[colUm]);
4428                         keysAggDist.push_back(retKey);
4429                         scaleAggDist.push_back(0);
4430                         // work around count() in select subquery
4431                         precisionAggDist.push_back(9999);
4432                         typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
4433                         csNumAggDist.push_back(8);
4434                         widthAggDist.push_back(bigIntWidth);
4435                     }
4436                     break;
4437 
4438                     default:
4439                         // could happen if agg and agg distinct use same column.
4440                         colUm = -1;
4441                         break;
4442                 } // switch
4443             }
4444             // For non distinct aggregates
4445             if (colUm == -1)
4446             {
4447                 AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
4448 
4449                 if (it != aggFuncMap.end())
4450                 {
4451                     colUm = it->second;
4452                     oidsAggDist.push_back(oidsAggUm[colUm]);
4453                     keysAggDist.push_back(keysAggUm[colUm]);
4454                     scaleAggDist.push_back(scaleAggUm[colUm]);
4455                     precisionAggDist.push_back(precisionAggUm[colUm]);
4456                     typeAggDist.push_back(typeAggUm[colUm]);
4457                     csNumAggDist.push_back(csNumAggUm[colUm]);
4458                     widthAggDist.push_back(widthAggUm[colUm]);
4459                 }
4460 
4461                 // not a direct hit -- a returned column is not already in the RG from PMs
4462                 else
4463                 {
4464                     bool returnColMissing = true;
4465 
4466                     // check if a SUM or COUNT covered by AVG
4467                     if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
4468                     {
4469                         it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
4470 
4471                         if (it != aggFuncMap.end())
4472                         {
4473                             // false alarm
4474                             returnColMissing = false;
4475 
4476                             colUm = it->second;
4477 
4478                             if (aggOp == ROWAGG_SUM)
4479                             {
4480                                 oidsAggDist.push_back(oidsAggUm[colUm]);
4481                                 keysAggDist.push_back(retKey);
4482                                 scaleAggDist.push_back(0);
4483                                 typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
4484                                 csNumAggDist.push_back(8);
4485                                 precisionAggDist.push_back(-1);
4486                                 widthAggDist.push_back(sizeof(long double));
4487                             }
4488                             else
4489                             {
4490                                 // leave the count() to avg
4491                                 aggOp = ROWAGG_COUNT_NO_OP;
4492 
4493                                 oidsAggDist.push_back(oidsAggUm[colUm]);
4494                                 keysAggDist.push_back(retKey);
4495                                 scaleAggDist.push_back(0);
4496                                 if (isUnsigned(typeAggUm[colUm]))
4497                                 {
4498                                     precisionAggDist.push_back(20);
4499                                     typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
4500                                 }
4501                                 else
4502                                 {
4503                                     precisionAggDist.push_back(19);
4504                                     typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
4505                                 }
4506                                 csNumAggDist.push_back(8);
4507                                 widthAggDist.push_back(bigIntWidth);
4508                             }
4509                         }
4510                     }
4511                     else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(),
4512                                   retKey) != jobInfo.expressionVec.end())
4513                     {
4514                         // a function on aggregation
4515                         TupleInfo ti = getTupleInfo(retKey, jobInfo);
4516                         oidsAggDist.push_back(ti.oid);
4517                         keysAggDist.push_back(retKey);
4518                         scaleAggDist.push_back(ti.scale);
4519                         precisionAggDist.push_back(ti.precision);
4520                         typeAggDist.push_back(ti.dtype);
4521                         csNumAggDist.push_back(ti.csNum);
4522                         widthAggDist.push_back(ti.width);
4523 
4524                         returnColMissing = false;
4525                     }
4526                     else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
4527                     {
4528                         // a window function
4529                         TupleInfo ti = getTupleInfo(retKey, jobInfo);
4530                         oidsAggDist.push_back(ti.oid);
4531                         keysAggDist.push_back(retKey);
4532                         scaleAggDist.push_back(ti.scale);
4533                         precisionAggDist.push_back(ti.precision);
4534                         typeAggDist.push_back(ti.dtype);
4535                         csNumAggDist.push_back(ti.csNum);
4536                         widthAggDist.push_back(ti.width);
4537 
4538                         returnColMissing = false;
4539                     }
4540                     else if (aggOp == ROWAGG_CONSTANT)
4541                     {
4542                         TupleInfo ti = getTupleInfo(retKey, jobInfo);
4543                         oidsAggDist.push_back(ti.oid);
4544                         keysAggDist.push_back(retKey);
4545                         scaleAggDist.push_back(ti.scale);
4546                         precisionAggDist.push_back(ti.precision);
4547                         typeAggDist.push_back(ti.dtype);
4548                         csNumAggDist.push_back(ti.csNum);
4549                         widthAggDist.push_back(ti.width);
4550 
4551                         returnColMissing = false;
4552                     }
4553 
4554                     if (returnColMissing)
4555                     {
4556                         Message::Args args;
4557                         args.add(keyName(outIdx, retKey, jobInfo));
4558                         string emsg = IDBErrorInfo::instance()->
4559                                       errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
4560                         cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid="
4561                              << (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias="
4562                              << jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view="
4563                              << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function="
4564                              << (int) aggOp << endl;
4565                         throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
4566                     }
4567                 } //else not a direct hit
4568             } // else not a DISTINCT
4569 
4570             // update groupby vector if the groupby column is a returned column
4571             if (returnedColVec[i].second == 0)
4572             {
4573                 int dupGroupbyIndex = -1;
4574 
4575                 for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++)
4576                 {
4577                     if (jobInfo.groupByColVec[j] == retKey)
4578                     {
4579                         if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t) - 1)
4580                             groupByNoDist[j]->fOutputColumnIndex = outIdx;
4581                         else
4582                             dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex;
4583                     }
4584                 }
4585 
4586                 // a duplicate group by column
4587                 if (dupGroupbyIndex != -1)
4588                     functionVecUm.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol(
4589                                                 ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
4590             }
4591             else
4592             {
4593                 // update the aggregate function vector
4594                 SP_ROWAGG_FUNC_t funct;
4595                 if (aggOp == ROWAGG_UDAF)
4596                 {
4597                     funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, outIdx));
4598                 }
4599                 else
4600                 {
4601                     funct.reset(new RowAggFunctionCol(aggOp, stats, colUm, outIdx));
4602                 }
4603 
4604                 if (aggOp == ROWAGG_COUNT_NO_OP)
4605                     funct->fAuxColumnIndex = colUm;
4606                 else if (aggOp == ROWAGG_CONSTANT)
4607                     funct->fAuxColumnIndex = jobInfo.cntStarPos;
4608 
4609                 functionVecUm.push_back(funct);
4610 
4611                 // find if this func is a duplicate
4612                 AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
4613 
4614                 if (iter != aggDupFuncMap.end())
4615                 {
4616                     if (funct->fAggFunction == ROWAGG_AVG)
4617                         funct->fAggFunction = ROWAGG_DUP_AVG;
4618                     else if (funct->fAggFunction == ROWAGG_STATS)
4619                         funct->fAggFunction = ROWAGG_DUP_STATS;
4620                     else if (funct->fAggFunction == ROWAGG_UDAF)
4621                         funct->fAggFunction = ROWAGG_DUP_UDAF;
4622                     else
4623                         funct->fAggFunction = ROWAGG_DUP_FUNCT;
4624 
4625                     funct->fAuxColumnIndex = iter->second;
4626                 }
4627                 else
4628                 {
4629                     aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
4630                                                    funct->fOutputColumnIndex));
4631                 }
4632 
4633                 if (returnedColVec[i].second == AggregateColumn::AVG)
4634                     avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
4635                 else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
4636                     avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct));
4637             }
4638             ++outIdx;
4639         } // for (i
4640 
4641         // now fix the AVG function, locate the count(column) position
4642         for (uint64_t i = 0; i < functionVecUm.size(); i++)
4643         {
4644             // if the count(k) can be associated with an avg(k)
4645             if (functionVecUm[i]->fAggFunction == ROWAGG_COUNT_NO_OP)
4646             {
4647                 map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
4648                     avgFuncMap.find(keysAggDist[functionVecUm[i]->fOutputColumnIndex]);
4649 
4650                 if (k != avgFuncMap.end())
4651                     k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex;
4652             }
4653         }
4654 
4655         // there is avg(k), but no count(k) in the select list
4656         uint64_t lastCol = outIdx;
4657 
4658         for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
4659         {
4660             if (k->second->fAuxColumnIndex == (uint32_t) - 1)
4661             {
4662                 k->second->fAuxColumnIndex = lastCol++;
4663                 oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
4664                 keysAggDist.push_back(k->first);
4665                 scaleAggDist.push_back(0);
4666                 precisionAggDist.push_back(19);
4667                 typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
4668                 csNumAggDist.push_back(8);
4669                 widthAggDist.push_back(bigIntWidth);
4670             }
4671         }
4672 
4673         //distinct avg
4674         for (uint64_t i = 0; i < functionVecUm.size(); i++)
4675         {
4676             if (functionVecUm[i]->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME)
4677             {
4678                 map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
4679                     avgDistFuncMap.find(keysAggDist[functionVecUm[i]->fOutputColumnIndex]);
4680 
4681                 if (k != avgDistFuncMap.end())
4682                 {
4683                     k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex;
4684                     functionVecUm[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
4685                 }
4686             }
4687         }
4688 
4689         // there is avg(distinct k), but no count(distinct k) in the select list
4690         for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.begin(); k != avgDistFuncMap.end(); k++)
4691         {
4692             // find count(distinct k) or add it
4693             if (k->second->fAuxColumnIndex == (uint32_t) - 1)
4694             {
4695                 k->second->fAuxColumnIndex = lastCol++;
4696                 oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
4697                 keysAggDist.push_back(k->first);
4698                 scaleAggDist.push_back(0);
4699                 precisionAggDist.push_back(19);
4700                 typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
4701                 csNumAggDist.push_back(8);
4702                 widthAggDist.push_back(bigIntWidth);
4703             }
4704         }
4705 
4706         // add auxiliary fields for UDAF and statistics functions
4707         for (uint64_t i = 0; i < functionVecUm.size(); i++)
4708         {
4709             uint64_t j = functionVecUm[i]->fInputColumnIndex;
4710 
4711             if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
4712             {
4713                 // Column for index of UDAF UserData struct
4714                 RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
4715 
4716                 if (!udafFuncCol)
4717                 {
4718                     throw logic_error("(5)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
4719                 }
4720 
4721                 functionVecUm[i]->fAuxColumnIndex = lastCol++;
4722                 oidsAggDist.push_back(oidsAggPm[j]); // Dummy?
4723                 keysAggDist.push_back(keysAggPm[j]); // Dummy?
4724                 scaleAggDist.push_back(0);
4725                 precisionAggDist.push_back(0);
4726                 typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
4727                 csNumAggDist.push_back(8);
4728                 widthAggDist.push_back(sizeof(uint64_t));
4729                 continue;
4730             }
4731 
4732             if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
4733                 continue;
4734 
4735             functionVecUm[i]->fAuxColumnIndex = lastCol;
4736 
4737             // sum(x)
4738             oidsAggDist.push_back(oidsAggPm[j]);
4739             keysAggDist.push_back(keysAggPm[j]);
4740             scaleAggDist.push_back(0);
4741             precisionAggDist.push_back(-1);
4742             typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
4743             csNumAggDist.push_back(8);
4744             widthAggDist.push_back(sizeof(long double));
4745             ++lastCol;
4746 
4747             // sum(x**2)
4748             oidsAggDist.push_back(oidsAggPm[j]);
4749             keysAggDist.push_back(keysAggPm[j]);
4750             scaleAggDist.push_back(0);
4751             precisionAggDist.push_back(-1);
4752             typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
4753             csNumAggDist.push_back(8);
4754             widthAggDist.push_back(sizeof(long double));
4755             ++lastCol;
4756         }
4757     }
4758 
4759 
4760     // calculate the offset and create the rowaggregations, rowgroups
4761     posAggUm.push_back(2);   // rid
4762 
4763     for (uint64_t i = 0; i < oidsAggUm.size(); i++)
4764         posAggUm.push_back(posAggUm[i] + widthAggUm[i]);
4765 
4766     RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm,
4767                      csNumAggUm, scaleAggUm, precisionAggUm, jobInfo.stringTableThreshold);
4768     SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionNoDistVec, jobInfo.rm, jobInfo.umMemLimit));
4769     rowAggUm->timeZone(jobInfo.timeZone);
4770 
4771     posAggDist.push_back(2);   // rid
4772 
4773     for (uint64_t i = 0; i < oidsAggDist.size(); i++)
4774         posAggDist.push_back(posAggDist[i] + widthAggDist[i]);
4775 
4776     RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist,
4777                        typeAggDist, csNumAggDist, scaleAggDist,
4778                        precisionAggDist, jobInfo.stringTableThreshold);
4779     SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit));
4780     rowAggDist->timeZone(jobInfo.timeZone);
4781 
    // if the distinct keyword is applied to more than one aggregate column, reset rowAggDist
4783     vector<RowGroup> subRgVec;
4784 
4785     if (jobInfo.distinctColVec.size() > 1)
4786     {
4787         RowAggregationMultiDistinct* multiDistinctAggregator =
4788             new RowAggregationMultiDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit);
4789         multiDistinctAggregator->timeZone(jobInfo.timeZone);
4790         rowAggDist.reset(multiDistinctAggregator);
4791 
4792         // construct and add sub-aggregators to rowAggDist
4793         vector<uint32_t> posAggGb, posAggSub;
4794         vector<uint32_t> oidsAggGb, oidsAggSub;
4795         vector<uint32_t> keysAggGb, keysAggSub;
4796         vector<uint32_t> scaleAggGb, scaleAggSub;
4797         vector<uint32_t> precisionAggGb, precisionAggSub;
4798         vector<CalpontSystemCatalog::ColDataType> typeAggGb, typeAggSub;
4799         vector<uint32_t> csNumAggGb, csNumAggSub;
4800         vector<uint32_t> widthAggGb, widthAggSub;
4801 
4802         // populate groupby column info
4803         for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
4804         {
4805             oidsAggGb.push_back(oidsAggUm[i]);
4806             keysAggGb.push_back(keysAggUm[i]);
4807             scaleAggGb.push_back(scaleAggUm[i]);
4808             precisionAggGb.push_back(precisionAggUm[i]);
4809             typeAggGb.push_back(typeAggUm[i]);
4810             csNumAggGb.push_back(csNumAggUm[i]);
4811             widthAggGb.push_back(widthAggUm[i]);
4812         }
4813 
        // for distinct, each column requires a separate rowgroup
4815         vector<SP_ROWAGG_DIST> rowAggSubDistVec;
4816 
4817         for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
4818         {
4819             uint32_t distinctColKey = jobInfo.distinctColVec[i];
4820             uint64_t j = -1;
4821 
4822             // locate the distinct key in the row group
4823             for (uint64_t k = 0; k < keysAggUm.size(); k++)
4824             {
4825                 if (keysAggUm[k] == distinctColKey)
4826                 {
4827                     j = k;
4828                     break;
4829                 }
4830             }
4831 
4832             idbassert(j != (uint64_t) - 1);
4833 
4834             oidsAggSub = oidsAggGb;
4835             keysAggSub = keysAggGb;
4836             scaleAggSub = scaleAggGb;
4837             precisionAggSub = precisionAggGb;
4838             typeAggSub = typeAggGb;
4839             csNumAggSub = csNumAggGb;
4840             widthAggSub = widthAggGb;
4841 
4842             oidsAggSub.push_back(oidsAggUm[j]);
4843             keysAggSub.push_back(keysAggUm[j]);
4844             scaleAggSub.push_back(scaleAggUm[j]);
4845             precisionAggSub.push_back(precisionAggUm[j]);
4846             typeAggSub.push_back(typeAggUm[j]);
4847             csNumAggSub.push_back(csNumAggUm[i]);
4848             widthAggSub.push_back(widthAggUm[j]);
4849 
4850             // construct sub-rowgroup
4851             posAggSub.clear();
4852             posAggSub.push_back(2);   // rid
4853 
4854             for (uint64_t k = 0; k < oidsAggSub.size(); k++)
4855                 posAggSub.push_back(posAggSub[k] + widthAggSub[k]);
4856 
4857             RowGroup subRg(oidsAggSub.size(), posAggSub, oidsAggSub, keysAggSub, typeAggSub,
4858                            csNumAggSub, scaleAggSub, precisionAggSub, jobInfo.stringTableThreshold);
4859             subRgVec.push_back(subRg);
4860 
4861             // construct groupby vector
4862             vector<SP_ROWAGG_GRPBY_t> groupBySub;
4863             uint64_t k = 0;
4864 
4865             while (k < jobInfo.groupByColVec.size())
4866             {
4867                 SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(k, k));
4868                 groupBySub.push_back(groupby);
4869                 k++;
4870             }
4871 
4872             // add the distinct column as groupby
4873             SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
4874             groupBySub.push_back(groupby);
4875 
4876             // Keep a count of the parms after the first for any aggregate.
4877             // These will be skipped and the count needs to be subtracted
4878             // from where the aux column will be.
4879             int64_t multiParms = 0;
4880 
4881             // tricky part : 2 function vectors
4882             //   -- dummy function vector for sub-aggregator, which does distinct only
4883             //   -- aggregate function on this distinct column for rowAggDist
4884             vector<SP_ROWAGG_FUNC_t> functionSub1, functionSub2;
4885 
4886             for (uint64_t k = 0; k < returnedColVec.size(); k++)
4887             {
4888                 if (functionIdMap(returnedColVec[i].second) == ROWAGG_MULTI_PARM)
4889                 {
4890                     ++multiParms;
4891                     continue;
4892                 }
4893                 if (returnedColVec[k].first != distinctColKey)
4894                     continue;
4895 
4896                 // search the function in functionVec
4897                 vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin();
4898 
4899                 while (it != functionVecUm.end())
4900                 {
4901                     SP_ROWAGG_FUNC_t f = *it++;
4902 
4903                     if ((f->fOutputColumnIndex == k) &&
4904                             (f->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME ||
4905                              f->fAggFunction == ROWAGG_DISTINCT_SUM ||
4906                              f->fAggFunction == ROWAGG_DISTINCT_AVG))
4907                     {
4908                         SP_ROWAGG_FUNC_t funct(
4909                             new RowAggFunctionCol(
4910                                 f->fAggFunction,
4911                                 f->fStatsFunction,
4912                                 groupBySub.size() - 1,
4913                                 f->fOutputColumnIndex,
4914                                 f->fAuxColumnIndex-multiParms));
4915                         functionSub2.push_back(funct);
4916                     }
4917                 }
4918             }
4919 
4920             // construct sub-aggregator
4921             SP_ROWAGG_UM_t subAgg(new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
4922             subAgg->timeZone(jobInfo.timeZone);
4923 
4924             // add to rowAggDist
4925             multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2);
4926         }
4927 
4928         // cover any non-distinct column functions
4929         {
4930             vector<SP_ROWAGG_FUNC_t> functionSub1 = functionNoDistVec;
4931             vector<SP_ROWAGG_FUNC_t> functionSub2;
4932             int64_t multiParms = 0;
4933 
4934             for (uint64_t k = 0; k < returnedColVec.size(); k++)
4935             {
4936                 if (functionIdMap(returnedColVec[k].second) == ROWAGG_MULTI_PARM)
4937                 {
4938                     ++multiParms;
4939                     continue;
4940                 }
4941                 // search non-distinct functions in functionVec
4942                 vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin();
4943 
4944                 while (it != functionVecUm.end())
4945                 {
4946                     SP_ROWAGG_FUNC_t funct;
4947                     SP_ROWAGG_FUNC_t f = *it++;
4948 
4949                     if (f->fOutputColumnIndex == k)
4950                     {
4951                         if (f->fAggFunction == ROWAGG_UDAF)
4952                         {
4953                             RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(f.get());
4954                             funct.reset(new RowUDAFFunctionCol(
4955                                             udafFuncCol->fUDAFContext,
4956                                             udafFuncCol->fInputColumnIndex,
4957                                             udafFuncCol->fOutputColumnIndex,
4958                                             udafFuncCol->fAuxColumnIndex-multiParms));
4959                             functionSub2.push_back(funct);
4960                         }
4961                         else if (f->fAggFunction == ROWAGG_COUNT_ASTERISK ||
4962                                  f->fAggFunction == ROWAGG_COUNT_COL_NAME ||
4963                                  f->fAggFunction == ROWAGG_SUM ||
4964                                  f->fAggFunction == ROWAGG_AVG ||
4965                                  f->fAggFunction == ROWAGG_MIN ||
4966                                  f->fAggFunction == ROWAGG_MAX ||
4967                                  f->fAggFunction == ROWAGG_STATS   ||
4968                                  f->fAggFunction == ROWAGG_BIT_AND ||
4969                                  f->fAggFunction == ROWAGG_BIT_OR  ||
4970                                  f->fAggFunction == ROWAGG_BIT_XOR ||
4971                                  f->fAggFunction == ROWAGG_CONSTANT)
4972                         {
4973                             funct.reset(
4974                                 new RowAggFunctionCol(
4975                                     f->fAggFunction,
4976                                     f->fStatsFunction,
4977                                     f->fInputColumnIndex,
4978                                     f->fOutputColumnIndex,
4979 	                            f->fAuxColumnIndex-multiParms));
4980                             functionSub2.push_back(funct);
4981                         }
4982                     }
4983                 }
4984             }
4985 
4986             if (functionSub1.size() > 0)
4987             {
4988                 // make sure the group by columns are available for next aggregate phase.
4989                 vector<SP_ROWAGG_GRPBY_t> groupBySubNoDist;
4990 
4991                 for (uint64_t i = 0; i < groupByNoDist.size(); i++)
4992                     groupBySubNoDist.push_back(SP_ROWAGG_GRPBY_t(
4993                                                    new RowAggGroupByCol(groupByNoDist[i]->fInputColumnIndex, i)));
4994 
4995                 // construct sub-aggregator
4996                 SP_ROWAGG_UM_t subAgg(
4997                     new RowAggregationUMP2(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
4998                 subAgg->timeZone(jobInfo.timeZone);
4999 
5000                 // add to rowAggDist
5001                 multiDistinctAggregator->addSubAggregator(subAgg, aggRgUm, functionSub2);
5002                 subRgVec.push_back(aggRgUm);
5003             }
5004         }
5005     }
5006 
5007     rowAggDist->addAggregator(rowAggUm, aggRgUm);
5008     rowgroups.push_back(aggRgDist);
5009     aggregators.push_back(rowAggDist);
5010 
5011     posAggPm.push_back(2);   // rid
5012 
5013     for (uint64_t i = 0; i < oidsAggPm.size(); i++)
5014         posAggPm.push_back(posAggPm[i] + widthAggPm[i]);
5015 
5016     RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm,
5017                      csNumAggPm, scaleAggPm, precisionAggPm, jobInfo.stringTableThreshold);
5018     SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm));
5019     rowAggPm->timeZone(jobInfo.timeZone);
5020     rowgroups.push_back(aggRgPm);
5021     aggregators.push_back(rowAggPm);
5022 
5023     if (jobInfo.trace)
5024     {
5025         cout << "projected   RG: " << projRG.toString() << endl
5026              << "aggregated1 RG: " << aggRgPm.toString() << endl
5027              << "aggregated2 RG: " << aggRgUm.toString() << endl;
5028 
5029         for (uint64_t i = 0; i < subRgVec.size(); i++)
5030             cout << "aggregatedSub RG: " << i << " " << subRgVec[i].toString() << endl;
5031 
5032         cout << "aggregatedDist RG: " << aggRgDist.toString() << endl;
5033     }
5034 }
5035 
5036 
prepExpressionOnAggregate(SP_ROWAGG_UM_t & aggUM,JobInfo & jobInfo)5037 void TupleAggregateStep::prepExpressionOnAggregate(SP_ROWAGG_UM_t& aggUM, JobInfo& jobInfo)
5038 {
5039     map<uint32_t, uint32_t> keyToIndexMap;
5040 
5041     for (uint64_t i = 0; i < fRowGroupOut.getKeys().size(); ++i)
5042     {
5043         if (keyToIndexMap.find(fRowGroupOut.getKeys()[i]) == keyToIndexMap.end())
5044             keyToIndexMap.insert(make_pair(fRowGroupOut.getKeys()[i], i));
5045     }
5046 
5047     RetColsVector expressionVec;
5048     ArithmeticColumn* ac = NULL;
5049     FunctionColumn* fc = NULL;
5050     RetColsVector& cols = jobInfo.nonConstCols;
5051     vector<SimpleColumn*> simpleColumns;
5052 
5053     for (RetColsVector::iterator it = cols.begin(); it != cols.end(); ++it)
5054     {
5055         uint64_t eid = -1;
5056 
5057         if (((ac = dynamic_cast<ArithmeticColumn*>(it->get())) != NULL) &&
5058              (ac->aggColumnList().size() > 0) &&
5059              (ac->windowfunctionColumnList().size() == 0))
5060         {
5061             const vector<SimpleColumn*>& scols = ac->simpleColumnList();
5062             simpleColumns.insert(simpleColumns.end(), scols.begin(), scols.end());
5063 
5064             eid = ac->expressionId();
5065             expressionVec.push_back(*it);
5066         }
5067         else if (((fc = dynamic_cast<FunctionColumn*>(it->get())) != NULL) &&
5068                   (fc->aggColumnList().size() > 0) &&
5069                   (fc->windowfunctionColumnList().size() == 0))
5070         {
5071             const vector<SimpleColumn*>& sCols = fc->simpleColumnList();
5072             simpleColumns.insert(simpleColumns.end(), sCols.begin(), sCols.end());
5073 
5074             eid = fc->expressionId();
5075             expressionVec.push_back(*it);
5076         }
5077 
5078         // update the output index
5079         if (eid != (uint64_t) - 1)
5080         {
5081             map<uint32_t, uint32_t>::iterator mit = keyToIndexMap.find(getExpTupleKey(jobInfo, eid));
5082 
5083             if (mit != keyToIndexMap.end())
5084             {
5085                 it->get()->outputIndex(mit->second);
5086             }
5087             else
5088             {
5089                 ostringstream emsg;
5090                 emsg << "expression " << eid << " cannot be found in tuple.";
5091                 cerr << "prepExpressionOnAggregate: " << emsg.str() << endl;
5092                 throw QueryDataExcept(emsg.str(), aggregateFuncErr);
5093             }
5094         }
5095     }
5096 
5097     // map the input indices
5098     for (vector<SimpleColumn*>::iterator i = simpleColumns.begin(); i != simpleColumns.end(); i++)
5099     {
5100         CalpontSystemCatalog::OID oid = (*i)->oid();
5101         uint32_t key = getTupleKey(jobInfo, *i);
5102         CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*i)->colType());
5103 
5104         if (dictOid > 0)
5105         {
5106             oid = dictOid;
5107             key = jobInfo.keyInfo->dictKeyMap[key];
5108         }
5109 
5110         map<uint32_t, uint32_t>::iterator mit = keyToIndexMap.find(key);
5111 
5112         if (mit != keyToIndexMap.end())
5113         {
5114             (*i)->inputIndex(mit->second);
5115         }
5116         else
5117         {
5118             ostringstream emsg;
5119             emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' cannot be found in tuple.";
5120             cerr << "prepExpressionOnAggregate: " << emsg.str() << "  simple column: oid("
5121                  << oid << "), alias(" << extractTableAlias(*i) << ")." << endl;
5122             throw QueryDataExcept(emsg.str(), aggregateFuncErr);
5123         }
5124     }
5125 
5126     // add expression to UM aggregator
5127     aggUM->expression(expressionVec);
5128 }
5129 
5130 
// Forwards the constant-aggregate descriptors in constAggDataVec to fAggregator
// through constantAggregate().
// NOTE(review): "Constang" looks like a typo for "Constant"; the name is part of
// the class interface, so it is left unchanged here.
void TupleAggregateStep::addConstangAggregate(vector<ConstantAggData>& constAggDataVec)
{
    fAggregator->constantAggregate(constAggDataVec);
}
5135 
5136 
aggregateRowGroups()5137 void TupleAggregateStep::aggregateRowGroups()
5138 {
5139     RGData rgData;
5140     bool more = true;
5141     RowGroupDL* dlIn = NULL;
5142 
5143     if (!fDoneAggregate)
5144     {
5145         if (fInputJobStepAssociation.outSize() == 0)
5146             throw logic_error("No input data list for TupleAggregate step.");
5147 
5148         dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();
5149 
5150         if (dlIn == NULL)
5151             throw logic_error("Input is not RowGroup data list in TupleAggregate step.");
5152 
5153         if (fInputIter < 0)
5154             fInputIter = dlIn->getIterator();
5155 
5156         more = dlIn->next(fInputIter, &rgData);
5157 
5158         if (traceOn()) dlTimes.setFirstReadTime();
5159 
5160         StepTeleStats sts;
5161         sts.query_uuid = fQueryUuid;
5162         sts.step_uuid = fStepUuid;
5163         sts.msg_type = StepTeleStats::ST_START;
5164         sts.total_units_of_work = 1;
5165         postStepStartTele(sts);
5166 
5167         try
5168         {
5169             // this check covers the no row case
5170             if (!more && cancelled())
5171             {
5172                 fDoneAggregate = true;
5173                 fEndOfResult = true;
5174             }
5175 
5176             while (more && !fEndOfResult)
5177             {
5178                 fRowGroupIn.setData(&rgData);
5179                 fAggregator->addRowGroup(&fRowGroupIn);
5180                 more = dlIn->next(fInputIter, &rgData);
5181 
5182                 // error checking
5183                 if (cancelled())
5184                 {
5185                     fEndOfResult = true;
5186 
5187                     while (more)
5188                         more = dlIn->next(fInputIter, &rgData);
5189                 }
5190             }
5191         } // try
5192         catch (...)
5193         {
5194             handleException(std::current_exception(),
5195                             logging::tupleAggregateStepErr,
5196                             logging::ERR_AGGREGATION_TOO_BIG,
5197                             "TupleAggregateStep::aggregateRowGroups()");
5198             fEndOfResult = true;
5199         }
5200     }
5201 
5202     fDoneAggregate = true;
5203 
5204     while (more)
5205         more = dlIn->next(fInputIter, &rgData);
5206 
5207     if (traceOn())
5208     {
5209         dlTimes.setLastReadTime();
5210         dlTimes.setEndOfInputTime();
5211     }
5212 }
5213 
threadedAggregateFinalize(uint32_t threadID)5214 void TupleAggregateStep::threadedAggregateFinalize(uint32_t threadID)
5215 {
5216   for (uint32_t i = 0; i < fNumOfBuckets; ++i)
5217   {
5218     if (fAgg_mutex[i]->try_lock())
5219     {
5220       try
5221       {
5222         if (fAggregators[i])
5223           fAggregators[i]->finalAggregation();
5224       }
5225       catch (...)
5226       {
5227         fAgg_mutex[i]->unlock();
5228         throw;
5229       }
5230       fAgg_mutex[i]->unlock();
5231     }
5232   }
5233 }
5234 
threadedAggregateRowGroups(uint32_t threadID)5235 void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
5236 {
5237     RGData rgData;
5238     scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]);
5239     scoped_array<Row> distRow;
5240     scoped_array<shared_array<uint8_t> > distRowData;
5241     uint32_t bucketID;
5242     scoped_array<bool> bucketDone(new bool[fNumOfBuckets]);
5243     vector<uint32_t> hashLens;
5244     bool locked = false;
5245     bool more = true;
5246     RowGroupDL* dlIn = nullptr;
5247     uint32_t rgVecShift = float(fNumOfBuckets) / fNumOfThreads * threadID;
5248 
5249     RowAggregationMultiDistinct* multiDist = nullptr;
5250 
5251     if (!fDoneAggregate)
5252     {
5253         if (fInputJobStepAssociation.outSize() == 0)
5254             throw logic_error("No input data list for delivery.");
5255 
5256         dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();
5257 
5258         if (dlIn == nullptr)
5259             throw logic_error("Input is not RowGroup data list in delivery step.");
5260 
5261         vector<RGData> rgDatas;
5262 
5263         try
5264         {
5265             // this check covers the no row case
5266             if (!more && cancelled())
5267             {
5268                 fDoneAggregate = true;
5269                 fEndOfResult = true;
5270             }
5271 
5272             bool firstRead = true;
5273             Row rowIn;
5274 
5275             while (more && !fEndOfResult)
5276             {
5277                 fMutex.lock();
5278                 locked = true;
5279 
5280                 for (uint32_t c = 0; c < fNumOfRowGroups && !cancelled(); c++)
5281                 {
5282                     more = dlIn->next(fInputIter, &rgData);
5283 
5284                     if (firstRead)
5285                     {
5286                         if (threadID == 0)
5287                         {
5288                             if (traceOn())
5289                                 dlTimes.setFirstReadTime();
5290 
5291                             StepTeleStats sts;
5292                             sts.query_uuid = fQueryUuid;
5293                             sts.step_uuid = fStepUuid;
5294                             sts.msg_type = StepTeleStats::ST_START;
5295                             sts.total_units_of_work = 1;
5296                             postStepStartTele(sts);
5297                         }
5298 
5299                         multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregator.get());
5300 
5301                         if (multiDist)
5302                         {
5303                             for (uint32_t i = 0; i < fNumOfBuckets; i++)
5304                                 rowBucketVecs[i].resize(multiDist->subAggregators().size());
5305 
5306                             distRow.reset(new Row[multiDist->subAggregators().size()]);
5307                             distRowData.reset(new shared_array<uint8_t>[
5308                                                   multiDist->subAggregators().size()]);
5309 
5310                             for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
5311                             {
5312                                 multiDist->subAggregators()[j]->getOutputRowGroup()->initRow(
5313                                     &distRow[j], true);
5314                                 distRowData[j].reset(new uint8_t[distRow[j].getSize()]);
5315                                 distRow[j].setData(distRowData[j].get());
5316                                 hashLens.push_back(multiDist->subAggregators()[j]->aggMapKeyLength());
5317                             }
5318                         }
5319                         else
5320                         {
5321                             for (uint32_t i = 0; i < fNumOfBuckets; i++)
5322                                 rowBucketVecs[i].resize(1);
5323 
5324                             if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()))
5325                                 hashLens.push_back(dynamic_cast<RowAggregationDistinct*>(fAggregator.get())->aggregator()->aggMapKeyLength());
5326                             else
5327                                 hashLens.push_back(fAggregator->aggMapKeyLength());
5328                         }
5329 
5330                         fRowGroupIns[threadID] = fRowGroupIn;
5331                         fRowGroupIns[threadID].initRow(&rowIn);
5332                         firstRead = false;
5333                     }
5334 
5335                     if (more)
5336                     {
5337                       fRowGroupIns[threadID].setData(&rgData);
5338                       fMemUsage[threadID] +=
5339                           fRowGroupIns[threadID].getSizeWithStrings();
5340 
5341                       bool diskAggAllowed = fRm->getAllowDiskAggregation();
5342                       if (!fRm->getMemory(
5343                               fRowGroupIns[threadID].getSizeWithStrings(),
5344                               fSessionMemLimit, !diskAggAllowed))
5345                       {
5346                           if (!diskAggAllowed)
5347                           {
5348                               rgDatas.clear();    // to short-cut the rest of processing
5349                               more = false;
5350                               fEndOfResult = true;
5351 
5352                               if (status() == 0)
5353                               {
5354                                   errorMessage(IDBErrorInfo::instance()->errorMsg(
5355                                       ERR_AGGREGATION_TOO_BIG));
5356                                   status(ERR_AGGREGATION_TOO_BIG);
5357                               }
5358                           }
5359                           else
5360                           {
5361                               rgDatas.push_back(rgData);
5362                           }
5363                           break;
5364                       }
5365                       rgDatas.push_back(rgData);
5366                     }
5367                     else
5368                     {
5369                         break;
5370                     }
5371                 }
5372 
5373                 // input rowgroup and aggregator is finalized only right before hashjoin starts
5374                 // if there is.
5375                 if (fAggregators.empty())
5376                 {
5377                     fAggregators.resize(fNumOfBuckets);
5378 
5379                     for (uint32_t i = 0; i < fNumOfBuckets; i++)
5380                     {
5381                         fAggregators[i].reset(fAggregator->clone());
5382                         fAggregators[i]->setInputOutput(fRowGroupIn, &fRowGroupOuts[i]);
5383                     }
5384                 }
5385 
5386                 fMutex.unlock();
5387                 locked = false;
5388 
5389                 multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregator.get());
5390 
5391                 // dispatch rows to row buckets
5392                 if (multiDist)
5393                 {
5394                     for (uint32_t c = 0; c < rgDatas.size(); c++)
5395                     {
5396                         fRowGroupIns[threadID].setData(&rgDatas[c]);
5397 
5398                         for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
5399                         {
5400                             fRowGroupIns[threadID].getRow(0, &rowIn);
5401                             rowIn.setUserDataStore(rgDatas[c].getUserDataStore());
5402 
5403                             for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i)
5404                             {
5405                                 for (uint64_t k = 0;
5406                                         k < multiDist->subAggregators()[j]->getGroupByCols().size();
5407                                         ++k)
5408                                 {
5409                                     rowIn.copyField(distRow[j], k, multiDist->subAggregators()[j]->getGroupByCols()[k].get()->fInputColumnIndex);
5410                                 }
5411 
5412                                 // TBD This approach could potentiall
5413                                 // put all values in on bucket.
5414                                 uint64_t hash = rowgroup::hashRow(distRow[j], hashLens[j] - 1);
5415                                 bucketID = hash % fNumOfBuckets;
5416                                 rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
5417                                 rowIn.nextRow();
5418                             }
5419                         }
5420                     }
5421                 }
5422                 else
5423                 {
5424                     for (uint32_t c = 0; c < rgDatas.size(); c++)
5425                     {
5426                         fRowGroupIns[threadID].setData(&rgDatas[c]);
5427                         fRowGroupIns[threadID].getRow(0, &rowIn);
5428                         rowIn.setUserDataStore(rgDatas[c].getUserDataStore());
5429 
5430                         for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i)
5431                         {
5432                             // The key is the groupby columns, which are the leading columns.
5433                             // TBD This approach could potential
5434                             // put all values in on bucket.
5435                             uint64_t hash = rowgroup::hashRow(rowIn, hashLens[0] - 1);
5436                             int bucketID = hash% fNumOfBuckets;
5437                             rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
5438                             rowIn.nextRow();
5439                         }
5440                     }
5441                 }
5442 
5443                 // insert to the hashmaps owned by each aggregator
5444                 bool done = false;
5445                 fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false);
5446 
5447                 while (!fEndOfResult && !done && !cancelled())
5448                 {
5449                     bool didWork = false;
5450                     done = true;
5451 
5452                     // each thread starts from its own bucket for better distribution
5453                     uint32_t shift = (rgVecShift++) % fNumOfBuckets;
5454                     for (uint32_t ci = 0; ci < fNumOfBuckets && !cancelled(); ci++)
5455                     {
5456                         uint32_t c = (ci + shift) % fNumOfBuckets;
5457                         if (!fEndOfResult && !bucketDone[c] && fAgg_mutex[c]->try_lock())
5458                         {
5459                             try
5460                             {
5461                                 didWork = true;
5462 
5463                                 if (multiDist)
5464                                     dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c]);
5465                                 else
5466                                     fAggregators[c]->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c][0]);
5467                             }
5468                             catch (...)
5469                             {
5470                                 fAgg_mutex[c]->unlock();
5471                                 throw;
5472                             }
5473 
5474                             rowBucketVecs[c][0].clear();
5475                             bucketDone[c] = true;
5476                             fAgg_mutex[c]->unlock();
5477                         }
5478                         else if (!bucketDone[c])
5479                         {
5480                             done = false;
5481                         }
5482                     }
5483 
5484                     if (!didWork)
5485                         usleep(1000);   // avoid using all CPU during busy wait
5486                 }
5487 
5488                 rgDatas.clear();
5489                 fRm->returnMemory(fMemUsage[threadID], fSessionMemLimit);
5490                 fMemUsage[threadID] = 0;
5491 
5492                 if (cancelled())
5493                 {
5494                     fEndOfResult = true;
5495                     fMutex.lock();
5496 
5497                     while (more)
5498                         more = dlIn->next(fInputIter, &rgData);
5499 
5500                     fMutex.unlock();
5501                 }
5502             }
5503         } // try
5504         catch (...)
5505         {
5506             handleException(std::current_exception(),
5507                             logging::tupleAggregateStepErr,
5508                             logging::ERR_AGGREGATION_TOO_BIG,
5509                             "TupleAggregateStep::threadedAggregateRowGroups()[" + std::to_string(threadID) + "]");
5510             fEndOfResult = true;
5511             fDoneAggregate = true;
5512         }
5513     }
5514 
5515     if (!locked) fMutex.lock();
5516 
5517     while (more)
5518         more = dlIn->next(fInputIter, &rgData);
5519 
5520     fMutex.unlock();
5521     locked = false;
5522 
5523     if (traceOn())
5524     {
5525         dlTimes.setLastReadTime();
5526         dlTimes.setEndOfInputTime();
5527     }
5528 }
5529 
5530 
doAggregate_singleThread()5531 void TupleAggregateStep::doAggregate_singleThread()
5532 {
5533     AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
5534     RowGroupDL* dlp = dl->rowGroupDL();
5535     RGData rgData;
5536 
5537     try
5538     {
5539         if (!fDoneAggregate)
5540             aggregateRowGroups();
5541 
5542         if (fEndOfResult == false)
5543         {
5544             // do the final aggregtion and deliver the results
5545             // at least one RowGroup for aggregate results
5546             if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) != NULL)
5547             {
5548                 dynamic_cast<RowAggregationDistinct*>(fAggregator.get())->doDistinctAggregation();
5549             }
5550 
5551             while (fAggregator->nextRowGroup())
5552             {
5553                 fAggregator->finalize();
5554                 fRowsReturned += fRowGroupOut.getRowCount();
5555                 rgData = fRowGroupOut.duplicate();
5556                 fRowGroupDelivered.setData(&rgData);
5557 
5558                 if (fRowGroupOut.getColumnCount() > fRowGroupDelivered.getColumnCount())
5559                     pruneAuxColumns();
5560 
5561                 dlp->insert(rgData);
5562             }
5563         }
5564     } // try
5565     catch (...)
5566     {
5567         handleException(std::current_exception(),
5568                         logging::tupleAggregateStepErr,
5569                         logging::ERR_AGGREGATION_TOO_BIG,
5570                         "TupleAggregateStep::doAggregate_singleThread()");
5571     }
5572 
5573     if (traceOn())
5574         printCalTrace();
5575 
5576     StepTeleStats sts;
5577     sts.query_uuid = fQueryUuid;
5578     sts.step_uuid = fStepUuid;
5579     sts.msg_type = StepTeleStats::ST_SUMMARY;
5580     sts.total_units_of_work = sts.units_of_work_completed = 1;
5581     sts.rows = fRowsReturned;
5582     postStepSummaryTele(sts);
5583 
5584     // Bug 3136, let mini stats to be formatted if traceOn.
5585     fEndOfResult = true;
5586     dlp->endOfInput();
5587 }
5588 
5589 
doAggregate()5590 void TupleAggregateStep::doAggregate()
5591 {
5592     // @bug4314. DO NOT access fAggregtor before the first read of input,
5593     // because hashjoin may not have finalized fAggregator.
5594     if (!fIsMultiThread)
5595         return doAggregate_singleThread();
5596 
5597     AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
5598     RowGroupDL* dlp = dl->rowGroupDL();
5599     ByteStream bs;
5600     doThreadedAggregate(bs, dlp);
5601     return;
5602 }
5603 
5604 
// Multi-threaded aggregation driver.
//
// Runs the aggregation phases (guarded by fDoneAggregate so they are executed
// once) and then delivers the aggregated row groups:
//   - dlp != NULL: every non-empty row group is duplicated and inserted into
//     dlp; dlp->endOfInput() is called once the result is exhausted.
//   - dlp == NULL: at most one non-empty row group is serialized into bs and
//     the delivery loop is left via break; when the result is exhausted (or an
//     exception was handled) an empty row group carrying status() is
//     serialized into bs instead.
//
// @param bs   output byte stream, only written when dlp is NULL
// @param dlp  output datalist, may be NULL (see above)
// @return     row count of the last row group read from fRowGroupOut in this
//             call; reset to 0 when the empty end-of-result band is written
//             to bs.
uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp)
{
    uint32_t i;
    RGData rgData;
    uint64_t rowCount = 0;

    try
    {
        // First phase: run fNumOfThreads aggregator jobs and wait for them.
        if (!fDoneAggregate)
        {
            initializeMultiThread();

            vector<uint64_t> runners; // thread pool handles
            runners.reserve(fNumOfThreads); // to prevent a resize during use

            // Start the aggregator threads
            for (i = 0; i < fNumOfThreads; i++)
            {
                runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
            }

            // Now wait for all those threads
            jobstepThreadPool.join(runners);
        }

        // Finalizer jobs.
        // NOTE(review): unlike the phase above this is not guarded by
        // fDoneAggregate, so it runs on every call that is not cancelled --
        // confirm ThreadedAggregateFinalizer is safe to run repeatedly.
        if (!cancelled())
        {
            vector<uint64_t> runners;
            // use half of the threads because finalizing requires twice as
            // much memory on average
            uint32_t threads = std::max(1U, fNumOfThreads / 2);
            runners.reserve(threads);
            for (i = 0; i < threads; ++i)
            {
                runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, i)));
            }
            jobstepThreadPool.join(runners);
        }

        // Distinct aggregator with a non-empty aggregation map key: the second
        // phase is itself run on the thread pool and the results are fetched
        // through nextDeliveredRowGroup().
        if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0)
        {
            // 2nd phase multi-threaded aggregate
            if (!fEndOfResult)
            {
                if (!fDoneAggregate)
                {
                    vector<uint64_t> runners; // thread pool handles
                    fRowGroupsDeliveredData.resize(fNumOfBuckets);

                    // One extra job is started when the buckets do not divide
                    // evenly among the threads.
                    // NOTE(review): the extra job is still handed only
                    // bucketsPerThread buckets; if the remainder can exceed
                    // bucketsPerThread (or fNumOfBuckets < fNumOfThreads) some
                    // buckets would not be assigned -- confirm how
                    // fNumOfBuckets relates to fNumOfThreads.
                    uint32_t bucketsPerThread = fNumOfBuckets / fNumOfThreads;
                    uint32_t numThreads = ((fNumOfBuckets % fNumOfThreads) == 0 ?
                                           fNumOfThreads : fNumOfThreads + 1);
                    //uint32_t bucketsPerThread = 1;
                    //uint32_t numThreads = fNumOfBuckets;

                    runners.reserve(numThreads);

                    // presumably the arguments are (step, first bucket, bucket
                    // count) -- verify against ThreadedSecondPhaseAggregator
                    for (i = 0; i < numThreads; i++)
                    {
                        runners.push_back(jobstepThreadPool.invoke(ThreadedSecondPhaseAggregator(this, i * bucketsPerThread, bucketsPerThread)));
                    }

                    jobstepThreadPool.join(runners);
                }

                fDoneAggregate = true;

                // 'done' stays false only when the loop below is left through
                // the break, i.e. one band was serialized into bs and further
                // row groups may still be pending.
                bool done = true;

                while (nextDeliveredRowGroup())
                {
                    done = false;
                    rowCount = fRowGroupOut.getRowCount();

                    if ( rowCount != 0 )
                    {
                        // drop the auxiliary columns that are not delivered
                        if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
                            pruneAuxColumns();

                        if (dlp)
                        {
                            rgData = fRowGroupDelivered.duplicate();
                            dlp->insert(rgData);
                        }
                        else
                        {
                            // byte stream delivery: one row group per call
                            bs.restart();
                            fRowGroupDelivered.serializeRGData(bs);
                            break;
                        }
                    }

                    done = true;
                }

                if (done)
                    fEndOfResult = true;
            }
        }
        else
        {
            // agg is non-null when fAggregator is a distinct aggregator (whose
            // aggMapKeyLength() is then 0, otherwise the branch above is taken).
            auto* agg = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());

            if (!fEndOfResult)
            {
                if (!fDoneAggregate)
                {
                    // Fold the per-bucket aggregators into fAggregator.
                    for (i = 0; i < fNumOfBuckets; i++)
                    {
                        // (always true here, fEndOfResult was tested above and
                        // is not modified inside this loop)
                        if (fEndOfResult == false)
                        {
                            // do the final aggregation and deliver the results
                            // at least one RowGroup for aggregate results
                            // for "distinct without group by" case
                            if (agg != nullptr)
                            {
                                auto* aggMultiDist =
                                    dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[i].get());
                                // NOTE(review): aggDist is dereferenced without
                                // a null check -- assumes every fAggregators[i]
                                // is a RowAggregationDistinct when fAggregator
                                // is one; confirm.
                                auto* aggDist =
                                    dynamic_cast<RowAggregationDistinct*>(fAggregators[i].get());
                                agg->aggregator(aggDist->aggregator());

                                if (aggMultiDist)
                                {
                                    // NOTE(review): likewise assumes agg is a
                                    // RowAggregationMultiDistinct whenever the
                                    // bucket aggregator is one.
                                    (dynamic_cast<RowAggregationMultiDistinct*>(agg))
                                        ->subAggregators(aggMultiDist->subAggregators());
                                }

                                agg->doDistinctAggregation();
                            }
                            // for "group by without distinct" case
                            else
                            {
                                fAggregator->append(fAggregators[i].get());
                            }
                        }
                    }
                }

                fDoneAggregate = true;
            }

            // Same convention as in the branch above: 'done' stays false only
            // when the loop is left through the break.
            bool done = true;

            //@bug4459
            while (fAggregator->nextRowGroup() && !cancelled())
            {
                done = false;
                fAggregator->finalize();
                rowCount = fRowGroupOut.getRowCount();
                fRowsReturned += rowCount;
                // the delivered row group is set to fRowGroupOut's RGData
                fRowGroupDelivered.setData(fRowGroupOut.getRGData());

                if (rowCount != 0)
                {
                    // drop the auxiliary columns that are not delivered
                    if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
                        pruneAuxColumns();

                    if (dlp)
                    {
                        rgData = fRowGroupDelivered.duplicate();
                        dlp->insert(rgData);
                    }
                    else
                    {
                        // byte stream delivery: one row group per call
                        bs.restart();
                        fRowGroupDelivered.serializeRGData(bs);
                        break;
                    }
                }

                done = true;
            }

            if (done)
            {
                fEndOfResult = true;
            }
        }
    } //try
    catch (...)
    {
        handleException(std::current_exception(),
                        logging::tupleAggregateStepErr,
                        logging::ERR_AGGREGATION_TOO_BIG,
                        "TupleAggregateStep::doThreadedAggregate()");
        // force the end-of-result handling below to run after a failure
        fEndOfResult = true;
    }

    if (fEndOfResult)
    {
        // post the summary telemetry for this step
        StepTeleStats sts;
        sts.query_uuid = fQueryUuid;
        sts.step_uuid = fStepUuid;
        sts.msg_type = StepTeleStats::ST_SUMMARY;
        sts.total_units_of_work = sts.units_of_work_completed = 1;
        sts.rows = fRowsReturned;
        postStepSummaryTele(sts);

        if (dlp)
        {
            dlp->endOfInput();
        }
        else
        {
            // send an empty / error band
            // (this rgData shadows the function-level rgData)
            // NOTE(review): fRowGroupOut is given the address of this
            // block-local RGData -- confirm fRowGroupOut's data is not read
            // again after this block ends.
            RGData rgData(fRowGroupOut, 0);
            fRowGroupOut.setData(&rgData);
            fRowGroupOut.resetRowGroup(0);
            fRowGroupOut.setStatus(status());
            fRowGroupOut.serializeRGData(bs);
            rowCount = 0;
        }

        if (traceOn())
            printCalTrace();
    }

    return rowCount;
}
5824 
5825 
pruneAuxColumns()5826 void TupleAggregateStep::pruneAuxColumns()
5827 {
5828     uint64_t rowCount = fRowGroupOut.getRowCount();
5829     Row row1, row2;
5830     fRowGroupOut.initRow(&row1);
5831     fRowGroupOut.getRow(0, &row1);
5832     fRowGroupDelivered.initRow(&row2);
5833     fRowGroupDelivered.getRow(0, &row2);
5834 
5835     for (uint64_t i = 1; i < rowCount; i++)
5836     {
5837         // skip the first row
5838         row1.nextRow();
5839         row2.nextRow();
5840 
5841         // bug4463, memmove for src, dest overlap
5842         memmove(row2.getData(), row1.getData(), row2.getSize());
5843     }
5844 }
5845 
5846 
printCalTrace()5847 void TupleAggregateStep::printCalTrace()
5848 {
5849     time_t t = time (0);
5850     char timeString[50];
5851     ctime_r (&t, timeString);
5852     timeString[strlen (timeString ) - 1] = '\0';
5853     ostringstream logStr;
5854     logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
5855             << "; total rows returned-" << fRowsReturned << endl
5856             << "\t1st read " << dlTimes.FirstReadTimeString()
5857             << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
5858             << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
5859             << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
5860             << "\tJob completion status " << status() << endl;
5861     logEnd(logStr.str().c_str());
5862     fExtendedInfo += logStr.str();
5863     formatMiniStats();
5864 }
5865 
5866 
formatMiniStats()5867 void TupleAggregateStep::formatMiniStats()
5868 {
5869     ostringstream oss;
5870     oss << "TAS "
5871         << "UM "
5872         << "- "
5873         << "- "
5874         << "- "
5875         << "- "
5876         << "- "
5877         << "- "
5878         << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
5879         << fRowsReturned << " ";
5880     fMiniInfo += oss.str();
5881 }
5882 
5883 
5884 }   //namespace
5885 // vim:ts=4 sw=4:
5886 
5887