1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: jlf_tuplejoblist.cpp 9728 2013-07-26 22:08:20Z xlou $
20 
21 // Cross engine needs to be at the top due to MySQL includes
22 #include "crossenginestep.h"
23 #include <iostream>
24 #include <stack>
25 #include <iterator>
26 #include <algorithm>
27 //#define NDEBUG
28 //#include <cassert>
29 #include <vector>
30 #include <set>
31 #include <map>
32 #include <limits>
33 using namespace std;
34 
35 #include <boost/scoped_ptr.hpp>
36 #include <boost/shared_ptr.hpp>
37 using namespace boost;
38 
39 #include "calpontsystemcatalog.h"
40 #include "logicoperator.h"
41 using namespace execplan;
42 
43 #include "rowgroup.h"
44 #include "rowaggregation.h"
45 using namespace rowgroup;
46 
47 #include "idberrorinfo.h"
48 #include "errorids.h"
49 #include "exceptclasses.h"
50 using namespace logging;
51 
52 #include "dataconvert.h"
53 using namespace dataconvert;
54 
55 #include "elementtype.h"
56 #include "jlf_common.h"
57 #include "limitedorderby.h"
58 #include "jobstep.h"
59 #include "primitivestep.h"
60 #include "expressionstep.h"
61 #include "subquerystep.h"
62 #include "tupleaggregatestep.h"
63 #include "tupleannexstep.h"
64 #include "tupleconstantstep.h"
65 #include "tuplehashjoin.h"
66 #include "tuplehavingstep.h"
67 #include "tupleunion.h"
68 #include "windowfunctionstep.h"
69 #include "configcpp.h"
70 #include "jlf_tuplejoblist.h"
71 using namespace joblist;
72 
73 
74 namespace
75 {
76 
77 
78 // construct a pcolstep from column key
tupleKeyToProjectStep(uint32_t key,JobStepVector & jsv,JobInfo & jobInfo)79 void tupleKeyToProjectStep(uint32_t key, JobStepVector& jsv, JobInfo& jobInfo)
80 {
81     // this JSA is for pcolstep construct, is not taking input/output
82     // because the pcolstep is to be added into TBPS
83     CalpontSystemCatalog::OID oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
84     DictOidToColOidMap::iterator mit = jobInfo.keyInfo->dictOidToColOid.find(oid);
85 
86     // if the key is for a dictionary, start with its token key
87     if (mit != jobInfo.keyInfo->dictOidToColOid.end())
88     {
89         oid = mit->second;
90 
91         for (map<uint32_t, uint32_t>::iterator i = jobInfo.keyInfo->dictKeyMap.begin();
92                 i != jobInfo.keyInfo->dictKeyMap.end();
93                 i++)
94         {
95             if (key == i->second)
96             {
97                 key = i->first;
98                 break;
99             }
100         }
101 
102         jobInfo.tokenOnly[key] = false;
103     }
104 
105     CalpontSystemCatalog::OID tableOid = jobInfo.keyInfo->tupleKeyToTableOid[key];
106 //	JobStepAssociation dummyJsa;
107 //	AnyDataListSPtr adl(new AnyDataList());
108 //	RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
109 //	dl->OID(oid);
110 //	adl->rowGroupDL(dl);
111 //	dummyJsa.outAdd(adl);
112 
113     CalpontSystemCatalog::ColType ct = jobInfo.keyInfo->colType[key];
114 
115     if (jobInfo.keyInfo->token2DictTypeMap.find(key) != jobInfo.keyInfo->token2DictTypeMap.end())
116         ct = jobInfo.keyInfo->token2DictTypeMap[key];
117 
118     uint32_t pt = jobInfo.keyInfo->pseudoType[key];
119 
120     SJSTEP sjs;
121 
122     if (pt == 0)
123         sjs.reset(new pColStep(oid, tableOid, ct, jobInfo));
124     else
125         sjs.reset(new PseudoColStep(oid, tableOid, pt, ct, jobInfo));
126 
127     sjs->alias(jobInfo.keyInfo->tupleKeyVec[key].fTable);
128     sjs->view(jobInfo.keyInfo->tupleKeyVec[key].fView);
129     sjs->schema(jobInfo.keyInfo->tupleKeyVec[key].fSchema);
130     sjs->name(jobInfo.keyInfo->keyName[key]);
131     sjs->tupleId(key);
132 
133     jsv.push_back(sjs);
134 
135     bool tokenOnly = false;
136     map<uint32_t, bool>::iterator toIt = jobInfo.tokenOnly.find(key);
137 
138     if (toIt != jobInfo.tokenOnly.end())
139         tokenOnly = toIt->second;
140 
141     if (sjs.get()->isDictCol() && !tokenOnly)
142     {
143         // Need a dictionary step
144         uint32_t dictKey = jobInfo.keyInfo->dictKeyMap[key];
145         CalpontSystemCatalog::OID dictOid = jobInfo.keyInfo->tupleKeyVec[dictKey].fId;
146         sjs.reset(new pDictionaryStep(dictOid, tableOid, ct, jobInfo));
147         sjs->alias(jobInfo.keyInfo->tupleKeyVec[dictKey].fTable);
148         sjs->view(jobInfo.keyInfo->tupleKeyVec[dictKey].fView);
149         sjs->schema(jobInfo.keyInfo->tupleKeyVec[dictKey].fSchema);
150         sjs->name(jobInfo.keyInfo->keyName[dictKey]);
151         sjs->tupleId(dictKey);
152 
153         jobInfo.keyInfo->dictOidToColOid[dictOid] = oid;
154 
155         jsv.push_back(sjs);
156     }
157 }
158 
159 
addColumnToRG(uint32_t cid,vector<uint32_t> & pos,vector<uint32_t> & oids,vector<uint32_t> & keys,vector<uint32_t> & scale,vector<uint32_t> & precision,vector<CalpontSystemCatalog::ColDataType> & types,vector<uint32_t> & csNums,JobInfo & jobInfo)160 inline void addColumnToRG(uint32_t cid, vector<uint32_t>& pos, vector<uint32_t>& oids,
161                           vector<uint32_t>& keys, vector<uint32_t>& scale, vector<uint32_t>& precision,
162                           vector<CalpontSystemCatalog::ColDataType>& types, vector<uint32_t>& csNums, JobInfo& jobInfo)
163 {
164     TupleInfo ti(getTupleInfo(cid, jobInfo));
165     pos.push_back(pos.back() + ti.width);
166     oids.push_back(ti.oid);
167     keys.push_back(ti.key);
168     types.push_back(ti.dtype);
169     csNums.push_back(ti.csNum);
170     scale.push_back(ti.scale);
171     precision.push_back(ti.precision);
172 }
173 
174 
addColumnInExpToRG(uint32_t cid,vector<uint32_t> & pos,vector<uint32_t> & oids,vector<uint32_t> & keys,vector<uint32_t> & scale,vector<uint32_t> & precision,vector<CalpontSystemCatalog::ColDataType> & types,vector<uint32_t> & csNums,JobInfo & jobInfo)175 inline void addColumnInExpToRG(uint32_t cid, vector<uint32_t>& pos, vector<uint32_t>& oids,
176                                vector<uint32_t>& keys, vector<uint32_t>& scale, vector<uint32_t>& precision,
177                                vector<CalpontSystemCatalog::ColDataType>& types, vector<uint32_t>& csNums, JobInfo& jobInfo)
178 {
179     if (jobInfo.keyInfo->dictKeyMap.find(cid) != jobInfo.keyInfo->dictKeyMap.end())
180         cid = jobInfo.keyInfo->dictKeyMap[cid];
181 
182     if (find(keys.begin(), keys.end(), cid) == keys.end())
183         addColumnToRG(cid, pos, oids, keys, scale, precision, types, csNums, jobInfo);
184 }
185 
186 
addColumnsToRG(uint32_t tid,vector<uint32_t> & pos,vector<uint32_t> & oids,vector<uint32_t> & keys,vector<uint32_t> & scale,vector<uint32_t> & precision,vector<CalpontSystemCatalog::ColDataType> & types,vector<uint32_t> & csNums,TableInfoMap & tableInfoMap,JobInfo & jobInfo)187 inline void addColumnsToRG(uint32_t tid, vector<uint32_t>& pos, vector<uint32_t>& oids,
188                            vector<uint32_t>& keys, vector<uint32_t>& scale, vector<uint32_t>& precision,
189                            vector<CalpontSystemCatalog::ColDataType>& types,
190                            vector<uint32_t>& csNums,
191                            TableInfoMap& tableInfoMap, JobInfo& jobInfo)
192 {
193     // -- the selected columns
194     vector<uint32_t>& pjCol = tableInfoMap[tid].fProjectCols;
195 
196     for (unsigned i = 0; i < pjCol.size(); i++)
197     {
198         addColumnToRG(pjCol[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
199     }
200 
201     // -- any columns will be used in cross-table exps
202     vector<uint32_t>& exp2 = tableInfoMap[tid].fColsInExp2;
203 
204     for (unsigned i = 0; i < exp2.size(); i++)
205     {
206         addColumnInExpToRG(exp2[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
207     }
208 
209     // -- any columns will be used in returned exps
210     vector<uint32_t>& expr = tableInfoMap[tid].fColsInRetExp;
211 
212     for (unsigned i = 0; i < expr.size(); i++)
213     {
214         addColumnInExpToRG(expr[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
215     }
216 
217     // -- any columns will be used in final outer join expression
218     vector<uint32_t>& expo = tableInfoMap[tid].fColsInOuter;
219 
220     for (unsigned i = 0; i < expo.size(); i++)
221     {
222         addColumnInExpToRG(expo[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
223     }
224 }
225 
226 
constructJoinedRowGroup(RowGroup & rg,uint32_t large,uint32_t prev,bool root,set<uint32_t> & tableSet,TableInfoMap & tableInfoMap,JobInfo & jobInfo)227 void constructJoinedRowGroup(RowGroup& rg, uint32_t large, uint32_t prev, bool root,
228                              set<uint32_t>& tableSet, TableInfoMap& tableInfoMap, JobInfo& jobInfo)
229 {
230     // Construct the output rowgroup for the join.
231     vector<uint32_t> pos;
232     vector<uint32_t> oids;
233     vector<uint32_t> keys;
234     vector<uint32_t> scale;
235     vector<uint32_t> precision;
236     vector<CalpontSystemCatalog::ColDataType> types;
237     vector<uint32_t> csNums;
238     pos.push_back(2);
239 
240     // -- start with the join keys
241     // lead by joinkeys -- to have more controls on joins
242     //    [loop throuh the key list to support compound join]
243     if (root == false)  // not root
244     {
245         vector<uint32_t>& joinKeys = jobInfo.tableJoinMap[make_pair(large, prev)].fLeftKeys;
246 
247         for (vector<uint32_t>::iterator i = joinKeys.begin(); i != joinKeys.end(); i++)
248             addColumnToRG(*i, pos, oids, keys, scale, precision, types, csNums, jobInfo);
249     }
250 
251     // -- followed by the columns in select or expression
252     for (set<uint32_t>::iterator i = tableSet.begin(); i != tableSet.end(); i++)
253         addColumnsToRG(*i, pos, oids, keys, scale, precision, types, csNums, tableInfoMap, jobInfo);
254 
255     RowGroup tmpRg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
256     rg = tmpRg;
257 }
258 
259 
constructJoinedRowGroup(RowGroup & rg,set<uint32_t> & tableSet,TableInfoMap & tableInfoMap,JobInfo & jobInfo)260 void constructJoinedRowGroup(RowGroup& rg, set<uint32_t>& tableSet, TableInfoMap& tableInfoMap,
261                              JobInfo& jobInfo)
262 {
263     // Construct the output rowgroup for the join.
264     vector<uint32_t> pos;
265     vector<uint32_t> oids;
266     vector<uint32_t> keys;
267     vector<uint32_t> scale;
268     vector<uint32_t> precision;
269     vector<CalpontSystemCatalog::ColDataType> types;
270     vector<uint32_t> csNums;
271     pos.push_back(2);
272 
273     for (set<uint32_t>::iterator i = tableSet.begin(); i != tableSet.end(); i++)
274     {
275         // columns in select or expression
276         addColumnsToRG(*i, pos, oids, keys, scale, precision, types, csNums, tableInfoMap, jobInfo);
277 
278         // keys to be joined if not already in the rowgroup
279         vector<uint32_t>& adjList = tableInfoMap[*i].fAdjacentList;
280 
281         for (vector<uint32_t>::iterator j = adjList.begin(); j != adjList.end(); j++)
282         {
283             if (find(tableSet.begin(), tableSet.end(), *j) == tableSet.end())
284             {
285                 // not joined
286                 vector<uint32_t>& joinKeys = jobInfo.tableJoinMap[make_pair(*i, *j)].fLeftKeys;
287 
288                 for (vector<uint32_t>::iterator k = joinKeys.begin(); k != joinKeys.end(); k++)
289                 {
290                     if (find(keys.begin(), keys.end(), *k) == keys.end())
291                         addColumnToRG(*k, pos, oids, keys, scale, precision, types, csNums, jobInfo);
292                 }
293             }
294         }
295     }
296 
297     RowGroup tmpRg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
298     rg = tmpRg;
299 }
300 
301 
updateExp2Cols(JobStepVector & expSteps,TableInfoMap & tableInfoMap,JobInfo & jobInfo)302 void updateExp2Cols(JobStepVector& expSteps, TableInfoMap& tableInfoMap, JobInfo& jobInfo)
303 {
304     for (JobStepVector::iterator it = expSteps.begin(); it != expSteps.end(); it++)
305     {
306         ExpressionStep* exps = dynamic_cast<ExpressionStep*>(it->get());
307         const vector<uint32_t>& tables = exps->tableKeys();
308         const vector<uint32_t>& columns = exps->columnKeys();
309 
310         for (uint64_t i = 0; i < tables.size(); ++i)
311         {
312             vector<uint32_t>& exp2 = tableInfoMap[tables[i]].fColsInExp2;
313             vector<uint32_t>::iterator cit = find(exp2.begin(), exp2.end(), columns[i]);
314 
315             if (cit != exp2.end())
316                 exp2.erase(cit);
317         }
318     }
319 }
320 
321 
adjustLastStep(JobStepVector & querySteps,DeliveredTableMap & deliverySteps,JobInfo & jobInfo)322 void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, JobInfo& jobInfo)
323 {
324     SJSTEP spjs = querySteps.back();
325     BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(spjs.get());
326     TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
327     SubAdapterStep* sas = dynamic_cast<SubAdapterStep*>(spjs.get());
328 
329     if (!bps && !thjs && !sas)
330         throw runtime_error("Bad last step");
331 
332     // original output rowgroup of the step
333     TupleJobStep* tjs = dynamic_cast<TupleJobStep*>(spjs.get());
334     const RowGroup* rg0 = &(tjs->getOutputRowGroup());
335 
336     if (jobInfo.trace) cout << "Output RowGroup 0: " << rg0->toString() << endl;
337 
338     // Construct a rowgroup that matches the select columns
339     TupleInfoVector v = jobInfo.pjColList;
340     vector<uint32_t> pos;
341     vector<uint32_t> oids;
342     vector<uint32_t> keys;
343     vector<uint32_t> scale;
344     vector<uint32_t> precision;
345     vector<CalpontSystemCatalog::ColDataType> types;
346     vector<uint32_t> csNums;
347     pos.push_back(2);
348 
349     for (unsigned i = 0; i < v.size(); i++)
350     {
351         pos.push_back(pos.back() + v[i].width);
352         oids.push_back(v[i].oid);
353         keys.push_back(v[i].key);
354         types.push_back(v[i].dtype);
355         csNums.push_back(v[i].csNum);
356         scale.push_back(v[i].scale);
357         precision.push_back(v[i].precision);
358     }
359 
360     RowGroup rg1(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
361 
362     // evaluate the returned/groupby expressions if any
363     JobStepVector& expSteps = jobInfo.returnedExpressions;
364 
365     if (expSteps.size() > 0)
366     {
367         // create a RG has the keys not in rg0
368         pos.clear();
369         oids.clear();
370         keys.clear();
371         scale.clear();
372         precision.clear();
373         types.clear();
374         csNums.clear();
375         pos.push_back(2);
376 
377         const vector<uint32_t>& keys0 = rg0->getKeys();
378 
379         for (unsigned i = 0; i < v.size(); i++)
380         {
381             if (find(keys0.begin(), keys0.end(), v[i].key) == keys0.end())
382             {
383                 pos.push_back(pos.back() + v[i].width);
384                 oids.push_back(v[i].oid);
385                 keys.push_back(v[i].key);
386                 types.push_back(v[i].dtype);
387                 csNums.push_back(v[i].csNum);
388                 scale.push_back(v[i].scale);
389                 precision.push_back(v[i].precision);
390             }
391         }
392 
393         // for v0.9.3.0, the output and input to the expression are in the same row
394         // add the returned column into the rg0 as rg01
395         RowGroup rg01 = *rg0 + RowGroup(oids.size(), pos, oids, keys, types, csNums,  scale, precision, jobInfo.stringTableThreshold);
396 
397         if (jobInfo.trace) cout << "Output RowGroup 01: " << rg01.toString() << endl;
398 
399         map<uint32_t, uint32_t> keyToIndexMap0;  // maps key to the index in the input RG
400 
401         for (uint64_t i = 0; i < rg01.getKeys().size(); ++i)
402             keyToIndexMap0.insert(make_pair(rg01.getKeys()[i], i));
403 
404         vector<SRCP> exps; // columns to be evaluated
405 
406         for (JobStepVector::iterator eit = expSteps.begin(); eit != expSteps.end(); ++eit)
407         {
408             ExpressionStep* es = dynamic_cast<ExpressionStep*>(eit->get());
409             es->updateInputIndex(keyToIndexMap0, jobInfo);
410             es->updateOutputIndex(keyToIndexMap0, jobInfo); // same row as input
411             exps.push_back(es->expression());
412         }
413 
414         // last step can be tbps (no join) or thjs, either one can have a group 3 expression
415         if (bps || thjs)
416         {
417             tjs->setOutputRowGroup(rg01);
418             tjs->setFcnExpGroup3(exps);
419             tjs->setFE23Output(rg1);
420         }
421         else if (sas)
422         {
423             sas->setFeRowGroup(rg01);
424             sas->addExpression(exps);
425             sas->setOutputRowGroup(rg1);
426         }
427     }
428     else
429     {
430         if (thjs && thjs->hasFcnExpGroup2())
431             thjs->setFE23Output(rg1);
432         else
433             tjs->setOutputRowGroup(rg1);
434     }
435 
436     if (jobInfo.trace) cout << "Output RowGroup 1: " << rg1.toString() << endl;
437 
438     if (jobInfo.hasAggregation == false)
439     {
440         if (thjs != NULL) //setup a few things for the final thjs step...
441             thjs->outputAssociation(JobStepAssociation());
442 
443         deliverySteps[CNX_VTABLE_ID] = spjs;
444     }
445     else
446     {
447         TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(spjs.get());
448         idbassert(tds != NULL);
449         SJSTEP ads = TupleAggregateStep::prepAggregate(spjs, jobInfo);
450         querySteps.push_back(ads);
451 
452         if (ads.get() != NULL)
453             deliverySteps[CNX_VTABLE_ID] = ads;
454         else
455             throw std::logic_error("Failed to prepare Aggregation Delivery Step.");
456     }
457 
458     if (jobInfo.havingStep)
459     {
460         TupleDeliveryStep* ds =
461             dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
462 
463         AnyDataListSPtr spdlIn(new AnyDataList());
464         RowGroupDL* dlIn = new RowGroupDL(1, jobInfo.fifoSize);
465         dlIn->OID(CNX_VTABLE_ID);
466         spdlIn->rowGroupDL(dlIn);
467         JobStepAssociation jsaIn;
468         jsaIn.outAdd(spdlIn);
469         dynamic_cast<JobStep*>(ds)->outputAssociation(jsaIn);
470         jobInfo.havingStep->inputAssociation(jsaIn);
471 
472         AnyDataListSPtr spdlOut(new AnyDataList());
473         RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
474         dlOut->OID(CNX_VTABLE_ID);
475         spdlOut->rowGroupDL(dlOut);
476         JobStepAssociation jsaOut;
477         jsaOut.outAdd(spdlOut);
478         jobInfo.havingStep->outputAssociation(jsaOut);
479 
480         querySteps.push_back(jobInfo.havingStep);
481         dynamic_cast<TupleHavingStep*>(jobInfo.havingStep.get())
482         ->initialize(ds->getDeliveredRowGroup(), jobInfo);
483         deliverySteps[CNX_VTABLE_ID] = jobInfo.havingStep;
484     }
485 
486     if (jobInfo.windowCols.size() > 0)
487     {
488         spjs = querySteps.back();
489         SJSTEP ws = WindowFunctionStep::makeWindowFunctionStep(spjs, jobInfo);
490         idbassert(ws.get());
491         querySteps.push_back(ws);
492         deliverySteps[CNX_VTABLE_ID] = ws;
493     }
494 
495     // TODO MCOL-894 we don't need to run sorting|distinct
496     // every time
497 //    if ((jobInfo.limitCount != (uint64_t) - 1) ||
498 //            (jobInfo.constantCol == CONST_COL_EXIST) ||
499 //            (jobInfo.hasDistinct))
500 //    {
501     if (jobInfo.annexStep.get() == NULL)
502         jobInfo.annexStep.reset(new TupleAnnexStep(jobInfo));
503 
504     TupleAnnexStep* tas = dynamic_cast<TupleAnnexStep*>(jobInfo.annexStep.get());
505     tas->setLimit(jobInfo.limitStart, jobInfo.limitCount);
506 
507     if (jobInfo.orderByColVec.size() > 0)
508     {
509         tas->addOrderBy(new LimitedOrderBy());
510         if (jobInfo.orderByThreads > 1)
511             tas->setParallelOp();
512         tas->setMaxThreads(jobInfo.orderByThreads);
513     }
514 
515     if (jobInfo.constantCol == CONST_COL_EXIST)
516         tas->addConstant(new TupleConstantStep(jobInfo));
517 
518     if (jobInfo.hasDistinct)
519         tas->setDistinct();
520 
521 //    }
522 
523     if (jobInfo.annexStep)
524     {
525         TupleDeliveryStep* ds =
526             dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
527         RowGroup rg2 = ds->getDeliveredRowGroup();
528 
529         if (jobInfo.trace) cout << "Output RowGroup 2: " << rg2.toString() << endl;
530 
531         AnyDataListSPtr spdlIn(new AnyDataList());
532         RowGroupDL* dlIn;
533         if (jobInfo.orderByColVec.size() > 0)
534             dlIn = new RowGroupDL(jobInfo.orderByThreads, jobInfo.fifoSize);
535         else
536             dlIn = new RowGroupDL(1, jobInfo.fifoSize);
537         dlIn->OID(CNX_VTABLE_ID);
538         spdlIn->rowGroupDL(dlIn);
539         JobStepAssociation jsaIn;
540         jsaIn.outAdd(spdlIn);
541         dynamic_cast<JobStep*>(ds)->outputAssociation(jsaIn);
542         jobInfo.annexStep->inputAssociation(jsaIn);
543 
544         AnyDataListSPtr spdlOut(new AnyDataList());
545         RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
546         dlOut->OID(CNX_VTABLE_ID);
547         spdlOut->rowGroupDL(dlOut);
548         JobStepAssociation jsaOut;
549         jsaOut.outAdd(spdlOut);
550         jobInfo.annexStep->outputAssociation(jsaOut);
551 
552         querySteps.push_back(jobInfo.annexStep);
553         dynamic_cast<TupleAnnexStep*>(jobInfo.annexStep.get())->initialize(rg2, jobInfo);
554         deliverySteps[CNX_VTABLE_ID] = jobInfo.annexStep;
555     }
556 
557     // Check if constant false
558     if (jobInfo.constantFalse)
559     {
560         TupleConstantBooleanStep* tcs = new TupleConstantBooleanStep(jobInfo, false);
561         tcs->outputAssociation(querySteps.back().get()->outputAssociation());
562         TupleDeliveryStep* tds =
563             dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
564         tcs->initialize(tds->getDeliveredRowGroup(), jobInfo);
565 
566         JobStepVector::iterator it = querySteps.begin();
567 
568         while (it != querySteps.end())
569         {
570             if ((dynamic_cast<TupleAggregateStep*>(it->get()) != NULL) ||
571                     (dynamic_cast<TupleAnnexStep*>(it->get()) != NULL))
572                 break;
573 
574             it++;
575         }
576 
577         SJSTEP bs(tcs);
578 
579         if (it != querySteps.end())
580             tcs->outputAssociation((*it)->inputAssociation());
581         else
582             deliverySteps[CNX_VTABLE_ID] = bs;
583 
584         querySteps.erase(querySteps.begin(), it);
585         querySteps.insert(querySteps.begin(), bs);
586     }
587 
588     if (jobInfo.trace)
589     {
590         TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
591 
592         if (ds) cout << "Delivered RowGroup: " << ds->getDeliveredRowGroup().toString() << endl;
593     }
594 }
595 
596 
597 // add the project steps into the query TBPS and construct the output rowgroup
addProjectStepsToBps(TableInfoMap::iterator & mit,BatchPrimitive * bps,JobInfo & jobInfo)598 void addProjectStepsToBps(TableInfoMap::iterator& mit, BatchPrimitive* bps, JobInfo& jobInfo)
599 {
600     // make sure we have a good tuple bps
601     if (bps == NULL)
602         throw runtime_error("BPS is null");
603 
604     // construct a pcolstep for each joinkey to be projected
605     vector<uint32_t>& joinKeys = mit->second.fJoinKeys;
606     JobStepVector keySteps;
607     vector<uint32_t> fjKeys;
608 
609     for (vector<uint32_t>::iterator kit = joinKeys.begin(); kit != joinKeys.end(); kit++)
610     {
611         if (jobInfo.keyInfo.get()->tupleKeyToTableOid[*kit] != CNX_EXP_TABLE_ID)
612             tupleKeyToProjectStep(*kit, keySteps, jobInfo);
613         else
614             fjKeys.push_back(*kit);
615     }
616 
617     // construct pcolstep for columns in expresssions
618     JobStepVector expSteps;
619     vector<uint32_t>& exp1 = mit->second.fColsInExp1;
620 
621     for (vector<uint32_t>::iterator kit = exp1.begin(); kit != exp1.end(); kit++)
622         tupleKeyToProjectStep(*kit, expSteps, jobInfo);
623 
624     vector<uint32_t>& exp2 = mit->second.fColsInExp2;
625 
626     for (vector<uint32_t>::iterator kit = exp2.begin(); kit != exp2.end(); kit++)
627         tupleKeyToProjectStep(*kit, expSteps, jobInfo);
628 
629     vector<uint32_t>& expRet = mit->second.fColsInRetExp;
630 
631     for (vector<uint32_t>::iterator kit = expRet.begin(); kit != expRet.end(); kit++)
632         tupleKeyToProjectStep(*kit, expSteps, jobInfo);
633 
634     vector<uint32_t>& expOut = mit->second.fColsInOuter;
635 
636     for (vector<uint32_t>::iterator kit = expOut.begin(); kit != expOut.end(); kit++)
637         tupleKeyToProjectStep(*kit, expSteps, jobInfo);
638 
639     vector<uint32_t>& expFj = mit->second.fColsInFuncJoin;
640 
641     for (vector<uint32_t>::iterator kit = expFj.begin(); kit != expFj.end(); kit++)
642         tupleKeyToProjectStep(*kit, expSteps, jobInfo);
643 
644     // for output rowgroup
645     vector<uint32_t> pos;
646     vector<uint32_t> oids;
647     vector<uint32_t> keys;
648     vector<uint32_t> scale;
649     vector<uint32_t> precision;
650     vector<CalpontSystemCatalog::ColDataType> types;
651     vector<uint32_t> csNums;
652     pos.push_back(2);
653 
654     // this psv is a copy of the project steps, the original vector in mit is not changed
655     JobStepVector psv = mit->second.fProjectSteps;             // columns being selected
656     psv.insert(psv.begin(), keySteps.begin(), keySteps.end()); // add joinkeys to project
657     psv.insert(psv.end(), expSteps.begin(), expSteps.end());   // add expressions to project
658     set<uint32_t> seenCols;                                    // columns already processed
659 
660     // for passthru conversion
661     // passthru is disabled (default lastTupleId to -1) unless the TupleBPS::bop is BOP_AND.
662     uint64_t lastTupleId = -1;
663     TupleBPS* tbps = dynamic_cast<TupleBPS*>(bps);
664 
665     if (tbps != NULL && tbps->getBOP() == BOP_AND && exp1.size() == 0)
666         lastTupleId = tbps->getLastTupleId();
667 
668     for (JobStepVector::iterator it = psv.begin(); it != psv.end(); it++)
669     {
670         JobStep* js = it->get();
671         uint32_t tupleKey = js->tupleId();
672 
673         if (seenCols.find(tupleKey) != seenCols.end())
674             continue;
675 
676         // update processed column set
677         seenCols.insert(tupleKey);
678 
679         // if the projected column is the last accessed predicate
680         pColStep* pcol = dynamic_cast<pColStep*>(js);
681 
682         if (pcol != NULL && js->tupleId() == lastTupleId)
683         {
684             PassThruStep* pts = new PassThruStep(*pcol);
685 
686             if (dynamic_cast<PseudoColStep*>(pcol))
687                 pts->pseudoType(dynamic_cast<PseudoColStep*>(pcol)->pseudoColumnId());
688 
689             pts->alias(pcol->alias());
690             pts->view(pcol->view());
691             pts->name(pcol->name());
692             pts->tupleId(pcol->tupleId());
693             it->reset(pts);
694         }
695 
696         // add projected column to TBPS
697         bool tokenOnly = false;
698         map<uint32_t, bool>::iterator toIt = jobInfo.tokenOnly.find(js->tupleId());
699 
700         if (toIt != jobInfo.tokenOnly.end())
701             tokenOnly = toIt->second;
702 
703         if (it->get()->isDictCol() && !tokenOnly)
704         {
705 //			if (jobInfo.trace && bps->tableOid() >= 3000)
706 //				cout << "1 setting project BPP for " << tbps->toString() << " with " <<
707 //					it->get()->toString() << " and " << (it+1)->get()->toString() << endl;
708             bps->setProjectBPP(it->get(), (it + 1)->get());
709 
710             // this is a two-step project step, remove the token step from id vector
711             vector<uint32_t>& pjv = mit->second.fProjectCols;
712             uint32_t tokenKey = js->tupleId();
713 
714             for (vector<uint32_t>::iterator i = pjv.begin(); i != pjv.end(); ++i)
715             {
716                 if (*i == tokenKey)
717                 {
718                     pjv.erase(i);
719                     break;
720                 }
721             }
722 
723             // move to the dictionary step
724             js = (++it)->get();
725             tupleKey = js->tupleId();
726             seenCols.insert(tupleKey);
727         }
728         else
729         {
730 //			if (jobInfo.trace && bps->tableOid() >= 3000)
731 //				cout << "2 setting project BPP for " << tbps->toString() << " with " <<
732 //					it->get()->toString() << " and " << "NULL" << endl;
733             bps->setProjectBPP(it->get(), NULL);
734         }
735 
736         // add the tuple info of the column into the RowGroup
737         TupleInfo ti(getTupleInfo(tupleKey, jobInfo));
738         pos.push_back(pos.back() + ti.width);
739         oids.push_back(ti.oid);
740         keys.push_back(ti.key);
741         types.push_back(ti.dtype);
742         csNums.push_back(ti.csNum);
743         scale.push_back(ti.scale);
744         precision.push_back(ti.precision);
745     }
746 
747     // add function join columns
748     for (vector<uint32_t>::iterator i = fjKeys.begin(); i != fjKeys.end(); i++)
749     {
750         TupleInfo ti(getTupleInfo(*i, jobInfo));
751         pos.push_back(pos.back() + ti.width);
752         oids.push_back(ti.oid);
753         keys.push_back(ti.key);
754         types.push_back(ti.dtype);
755         csNums.push_back(ti.csNum);
756         scale.push_back(ti.scale);
757         precision.push_back(ti.precision);
758     }
759 
760     // construct RowGroup
761     RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
762 
763     // fix the output association
764     AnyDataListSPtr spdl(new AnyDataList());
765     RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
766     spdl->rowGroupDL(dl);
767     dl->OID(mit->first);
768     JobStepAssociation jsa;
769     jsa.outAdd(spdl);
770     bps->outputAssociation(jsa);
771     bps->setOutputRowGroup(rg);
772 }
773 
774 
775 // add one-table expression steps into the query TBPS
addExpresssionStepsToBps(TableInfoMap::iterator & mit,SJSTEP & sjsp,JobInfo & jobInfo)776 void addExpresssionStepsToBps(TableInfoMap::iterator& mit, SJSTEP& sjsp, JobInfo& jobInfo)
777 {
778     BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(sjsp.get());
779     CalpontSystemCatalog::OID tableOid = mit->second.fTableOid;
780     JobStepVector& exps = mit->second.fOneTableExpSteps;
781     JobStepVector& fjs =  mit->second.fFuncJoinExps;
782     ExpressionStep* exp0 = NULL;
783 
784     if (exps.size() > 0)
785         exp0 = dynamic_cast<ExpressionStep*>(exps[0].get());
786     else
787         exp0 = dynamic_cast<ExpressionStep*>(fjs[0].get());
788 
789     if (bps == NULL)
790     {
791         if (tableOid > 0)
792         {
793             uint32_t key0 = exp0->columnKey();
794             CalpontSystemCatalog::ColType ct = jobInfo.keyInfo->colType[key0];
795             map<uint32_t, CalpontSystemCatalog::ColType>::iterator dkMit;
796 
797             if (jobInfo.keyInfo->token2DictTypeMap.find(key0) !=
798                     jobInfo.keyInfo->token2DictTypeMap.end())
799                 ct = jobInfo.keyInfo->token2DictTypeMap[key0];
800 
801             scoped_ptr<pColScanStep> pcss(
802                 new pColScanStep(exp0->oid(), tableOid, ct, jobInfo));
803 
804             sjsp.reset(new TupleBPS(*pcss, jobInfo));
805             TupleBPS* tbps = dynamic_cast<TupleBPS*>(sjsp.get());
806             tbps->setJobInfo(&jobInfo);
807             tbps->setFirstStepType(SCAN);
808 
809             // add the first column to BPP's filterSteps
810             tbps->setBPP(pcss.get());
811 
812             bps = tbps;
813         }
814         else
815         {
816             sjsp.reset(new  CrossEngineStep(mit->second.fSchema,
817                                             mit->second.fName,
818                                             mit->second.fAlias,
819                                             jobInfo));
820 
821             bps = dynamic_cast<CrossEngineStep*>(sjsp.get());
822         }
823     }
824 
825     // rowgroup for evaluating the one table expression
826     vector<uint32_t> pos;
827     vector<uint32_t> oids;
828     vector<uint32_t> keys;
829     vector<uint32_t> scale;
830     vector<uint32_t> precision;
831     vector<CalpontSystemCatalog::ColDataType> types;
832     vector<uint32_t> csNums;
833     pos.push_back(2);
834 
835     vector<uint32_t> cols;
836     JobStepVector& fjExp = mit->second.fFuncJoinExps;
837 
838     for (JobStepVector::iterator it = fjExp.begin(); it != fjExp.end(); it++)
839     {
840         ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
841         cols.push_back(getExpTupleKey(jobInfo, e->expressionId()));
842     }
843 
844     cols.insert(cols.end(), mit->second.fColsInExp1.begin(), mit->second.fColsInExp1.end());
845     cols.insert(cols.end(), mit->second.fColsInFuncJoin.begin(), mit->second.fColsInFuncJoin.end());
846     uint32_t index = 0;                 // index in the rowgroup
847     map<uint32_t, uint32_t> keyToIndexMap;  // maps key to the index in the RG
848 
849     for (vector<uint32_t>::iterator kit = cols.begin(); kit != cols.end(); kit++)
850     {
851         uint32_t key = *kit;
852 
853         if (jobInfo.keyInfo->dictKeyMap.find(key) != jobInfo.keyInfo->dictKeyMap.end())
854             key = jobInfo.keyInfo->dictKeyMap[key];
855 
856         // check if this key is already in
857         if (keyToIndexMap.find(key) != keyToIndexMap.end())
858             continue;
859 
860         // update processed column set
861         keyToIndexMap.insert(make_pair(key, index++));
862 
863         // add the tuple info of the column into the RowGroup
864         TupleInfo ti(getTupleInfo(key, jobInfo));
865         pos.push_back(pos.back() + ti.width);
866         oids.push_back(ti.oid);
867         keys.push_back(ti.key);
868         types.push_back(ti.dtype);
869         csNums.push_back(ti.csNum);
870         scale.push_back(ti.scale);
871         precision.push_back(ti.precision);
872     }
873 
874     // construct RowGroup and add to TBPS
875     RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
876     bps->setFE1Input(rg);
877 
878     if (jobInfo.trace) cout << "FE1 input RowGroup: " << rg.toString() << endl << endl;
879 
880     // add the expression steps into TBPS, the input-indices are set in SCs.
881     for (JobStepVector::iterator it = exps.begin(); it != exps.end(); it++)
882     {
883         ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
884 
885         if (e->functionJoin())
886             continue;
887 
888         e->updateInputIndex(keyToIndexMap, jobInfo);
889         boost::shared_ptr<ParseTree> sppt(new ParseTree);
890         sppt->copyTree(*(e->expressionFilter()));
891         bps->addFcnExpGroup1(sppt);
892     }
893 
894     // add the function join expression steps into TBPS, too
895     if (fjs.size() > 0)
896     {
897         vector<SRCP> fjCols;
898 
899         for (JobStepVector::iterator it = fjs.begin(); it != fjs.end(); it++)
900         {
901             ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
902 
903             if (e->virtualStep())
904                 continue;
905 
906             e->updateInputIndex(keyToIndexMap, jobInfo);
907             e->updateOutputIndex(keyToIndexMap, jobInfo);
908             fjCols.push_back(e->expression());
909         }
910 
911         bps->addFcnJoinExp(fjCols);
912     }
913 }
914 
915 
combineJobStepsByTable(TableInfoMap::iterator & mit,JobInfo & jobInfo)916 bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
917 {
918     TableInfo& tableInfo = mit->second;
919     JobStepVector& qsv = tableInfo.fQuerySteps;
920     JobStepVector newSteps;             // store combined steps
921     RowGroup rgOut;                     // rowgroup of combined steps
922     CalpontSystemCatalog::OID tableOid = tableInfo.fTableOid;
923 
924     if (tableOid != CNX_VTABLE_ID)
925     {
926         // real table
927         if (qsv.size() == 0)
928         {
929             // find a column in FE1, FE2, or FE3
930             uint32_t key = -1;
931 
932             if (tableInfo.fColsInExp1.size() > 0)
933                 key = tableInfo.fColsInExp1[0];
934             else if (tableInfo.fColsInExp2.size() > 0)
935                 key = tableInfo.fColsInExp2[0];
936             else if (tableInfo.fColsInRetExp.size() > 0)
937                 key = tableInfo.fColsInRetExp[0];
938             else if (tableInfo.fColsInOuter.size() > 0)
939                 key = tableInfo.fColsInOuter[0];
940             else if (tableInfo.fColsInColMap.size() > 0)
941                 key = tableInfo.fColsInColMap[0];
942             else
943                 throw runtime_error("No query step");
944 
945             // construct a pcolscanstep to initialize the tbps
946             CalpontSystemCatalog::OID oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
947             CalpontSystemCatalog::ColType ct = jobInfo.keyInfo->colType[key];
948             map<uint32_t, CalpontSystemCatalog::ColType>::iterator dkMit;
949 
950             if (jobInfo.keyInfo->token2DictTypeMap.find(key) !=
951                     jobInfo.keyInfo->token2DictTypeMap.end())
952                 ct = jobInfo.keyInfo->token2DictTypeMap[key];
953 
954             SJSTEP sjs(new pColScanStep(oid, tableOid, ct, jobInfo));
955             sjs->alias(jobInfo.keyInfo->tupleKeyVec[key].fTable);
956             sjs->view(jobInfo.keyInfo->tupleKeyVec[key].fView);
957             sjs->schema(jobInfo.keyInfo->tupleKeyVec[key].fSchema);
958             sjs->name(jobInfo.keyInfo->keyName[key]);
959             sjs->tupleId(key);
960             qsv.push_back(sjs);
961         }
962 
963         SJSTEP sjsp;                        // shared_ptr for the new BatchPrimitive
964         BatchPrimitive* bps = NULL;         // pscan/pcol/filter/etc combined to
965         vector<DictionaryScanInfo> pdsVec;  // pds for string filters
966         JobStepVector::iterator begin = qsv.begin();
967         JobStepVector::iterator end = qsv.end();
968         JobStepVector::iterator it = begin;
969 
970         // make sure there is a pcolscan if there is a pcolstep
971         while (it != end)
972         {
973             if (typeid(*(it->get())) == typeid(pColScanStep))
974                 break;
975 
976             if (typeid(*(it->get())) == typeid(pColStep))
977             {
978                 pColStep* pcs = dynamic_cast<pColStep*>(it->get());
979                 (*it).reset(new pColScanStep(*pcs));
980                 break;
981             }
982 
983             it++;
984         }
985 
986         // ---- predicates ----
987         // setup TBPS and dictionaryscan
988         it = begin;
989 
990         while (it != end)
991         {
992             if (typeid(*(it->get())) == typeid(pColScanStep))
993             {
994                 if (bps == NULL)
995                 {
996                     if (tableOid > 0)
997                     {
998                         sjsp.reset(new TupleBPS(*(dynamic_cast<pColScanStep*>(it->get())), jobInfo));
999                         TupleBPS* tbps = dynamic_cast<TupleBPS*>(sjsp.get());
1000                         tbps->setJobInfo(&jobInfo);
1001                         tbps->setFirstStepType(SCAN);
1002                         bps = tbps;
1003                     }
1004                     else
1005                     {
1006                         sjsp.reset(new  CrossEngineStep(mit->second.fSchema,
1007                                                         mit->second.fName,
1008                                                         mit->second.fAlias,
1009                                                         jobInfo));
1010                         bps = dynamic_cast<CrossEngineStep*>(sjsp.get());
1011                     }
1012                 }
1013                 else
1014                 {
1015                     pColScanStep* pcss = dynamic_cast<pColScanStep*>(it->get());
1016                     (*it).reset(new pColStep(*pcss));
1017                 }
1018             }
1019 
1020             unsigned itInc = 1;               // iterator increase number
1021             unsigned numOfStepsAddToBps = 0;  // # steps to be added into TBPS
1022 
1023             if ((std::distance(it, end) > 2 &&
1024                     dynamic_cast<pDictionaryScan*>(it->get()) != NULL &&
1025                     (dynamic_cast<pColScanStep*>((it + 1)->get()) != NULL ||
1026                      dynamic_cast<pColStep*>((it + 1)->get()) != NULL) &&
1027                     dynamic_cast<TupleHashJoinStep*>((it + 2)->get()) != NULL) ||
1028                     (std::distance(it, end) > 1 &&
1029                      dynamic_cast<pDictionaryScan*>(it->get()) != NULL &&
1030                      dynamic_cast<TupleHashJoinStep*>((it + 1)->get()) != NULL))
1031             {
1032                 // string access predicate
1033                 // setup pDictionaryScan
1034                 pDictionaryScan* pds = dynamic_cast<pDictionaryScan*>(it->get());
1035                 vector<uint32_t> pos;
1036                 vector<uint32_t> oids;
1037                 vector<uint32_t> keys;
1038                 vector<uint32_t> scale;
1039                 vector<uint32_t> precision;
1040                 vector<CalpontSystemCatalog::ColDataType> types;
1041                 vector<uint32_t> csNums;
1042                 pos.push_back(2);
1043 
1044                 pos.push_back(2 + 8);
1045                 CalpontSystemCatalog::OID coid = jobInfo.keyInfo->dictOidToColOid[pds->oid()];
1046                 oids.push_back(coid);
1047                 uint32_t keyId = pds->tupleId();
1048                 keys.push_back(keyId);
1049                 types.push_back(CalpontSystemCatalog::BIGINT);
1050                 csNums.push_back(pds->colType().charsetNumber);
1051                 scale.push_back(0);
1052                 precision.push_back(0);
1053 
1054                 RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
1055 
1056                 if (jobInfo.trace) cout << "RowGroup pds(and): " << rg.toString() << endl;
1057 
1058                 pds->setOutputRowGroup(rg);
1059                 newSteps.push_back(*it);
1060 
1061                 DictionaryScanInfo pdsInfo;
1062                 pdsInfo.fTokenId = keyId;
1063                 pdsInfo.fDl = pds->outputAssociation().outAt(0);
1064                 pdsInfo.fRowGroup = rg;
1065                 pdsVec.push_back(pdsInfo);
1066 
1067                 // save the token join to the last
1068                 itInc = 1;
1069                 numOfStepsAddToBps = 0;
1070             }
1071             else if (std::distance(begin, it) > 1 &&
1072                      (dynamic_cast<pDictionaryScan*>((it - 1)->get()) != NULL ||
1073                       dynamic_cast<pDictionaryScan*>((it - 2)->get()) != NULL) &&
1074                      dynamic_cast<TupleHashJoinStep*>(it->get()) != NULL)
1075             {
1076                 // save the token join to the last, by pdsInfo
1077                 itInc = 1;
1078                 numOfStepsAddToBps = 0;
1079             }
1080             else if (std::distance(it, end) > 2 &&
1081                      dynamic_cast<pColStep*>((it + 1)->get()) != NULL &&
1082                      dynamic_cast<FilterStep*>((it + 2)->get()) != NULL)
1083             {
1084                 itInc = 3;
1085                 numOfStepsAddToBps = 3;
1086             }
1087             else if (std::distance(it, end) > 3 &&
1088                      dynamic_cast<pColStep*>((it + 1)->get()) != NULL &&
1089                      dynamic_cast<pDictionaryStep*>((it + 2)->get()) != NULL &&
1090                      dynamic_cast<FilterStep*>((it + 3)->get()) != NULL)
1091             {
1092                 itInc = 4;
1093                 numOfStepsAddToBps = 4;
1094             }
1095             else if (std::distance(it, end) > 3 &&
1096                      dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL &&
1097                      dynamic_cast<pColStep*>((it + 2)->get()) != NULL &&
1098                      dynamic_cast<FilterStep*>((it + 3)->get()) != NULL)
1099             {
1100                 itInc = 4;
1101                 numOfStepsAddToBps = 4;
1102             }
1103             else if (std::distance(it, end) > 4 &&
1104                      dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL &&
1105                      dynamic_cast<pColStep*>((it + 2)->get()) != NULL &&
1106                      dynamic_cast<pDictionaryStep*>((it + 3)->get()) != NULL &&
1107                      dynamic_cast<FilterStep*>((it + 4)->get()) != NULL)
1108             {
1109                 itInc = 5;
1110                 numOfStepsAddToBps = 5;
1111             }
1112             else if (std::distance(it, end) > 1 &&
1113                      (dynamic_cast<pColStep*>(it->get()) != NULL ||
1114                       dynamic_cast<pColScanStep*>(it->get()) != NULL) &&
1115                      dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL)
1116             {
1117                 itInc = 2;
1118                 numOfStepsAddToBps = 2;
1119             }
1120             else if (dynamic_cast<pColStep*>(it->get()) != NULL)
1121             {
1122                 pColStep* pcol = dynamic_cast<pColStep*>(it->get());
1123 
1124                 if (pcol->getFilters().size() == 0)
1125                 {
1126                     // not an access predicate, pcol for token will be added later if necessary
1127                     numOfStepsAddToBps = 0;
1128                 }
1129                 else
1130                 {
1131                     numOfStepsAddToBps = 1;
1132                 }
1133 
1134                 itInc = 1;
1135             }
1136             else if (dynamic_cast<pColScanStep*>(it->get()) != NULL)
1137             {
1138                 numOfStepsAddToBps = 1;
1139                 itInc = 1;
1140             }
1141             else
1142             {
1143                 // Not a combinable step, or step pattern not recognized.
1144                 cerr << boldStart << "Try to combine " << typeid(*(it->get())).name() << ": "
1145                      << it->get()->oid() << " into TBPS" << boldStop << endl;
1146                 return false;
1147             }
1148 
1149             // now add the steps into the TBPS
1150             if (numOfStepsAddToBps > 0 && bps == NULL)
1151                 throw runtime_error("BPS not created 1");
1152 
1153             for (unsigned i = 0; i < numOfStepsAddToBps; i++)
1154             {
1155                 bps->setBPP((it + i)->get());
1156                 bps->setStepCount();
1157                 bps->setLastTupleId((it + i)->get()->tupleId());
1158             }
1159 
1160             it += itInc;
1161         }
1162 
1163         // add one-table expression steps to TBPS
1164         if (tableInfo.fOneTableExpSteps.size() > 0 || tableInfo.fFuncJoinExps.size() > 0)
1165             addExpresssionStepsToBps(mit, sjsp, jobInfo);
1166 
1167         // add TBPS to the step vector
1168         newSteps.push_back(sjsp);
1169 
1170         // ---- projects ----
1171         // now, add the joinkeys to the project step vector
1172         addProjectStepsToBps(mit, bps, jobInfo);
1173 
1174         // rowgroup has the joinkeys and selected columns
1175         // this is the expected output of this table
1176         rgOut = bps->getOutputRowGroup();
1177 
1178         // add token joins
1179         if (pdsVec.size() > 0)
1180         {
1181             // ---- token joins ----
1182             // construct a TupleHashJoinStep
1183             TupleBPS* tbps = dynamic_cast<TupleBPS*>(bps);
1184             TupleHashJoinStep* thjs = new TupleHashJoinStep(jobInfo);
1185             thjs->tableOid1(0);
1186             thjs->tableOid2(tableInfo.fTableOid);
1187             thjs->alias1(tableInfo.fAlias);
1188             thjs->alias2(tableInfo.fAlias);
1189             thjs->view1(tableInfo.fView);
1190             thjs->view2(tableInfo.fView);
1191             thjs->schema1(tableInfo.fSchema);
1192             thjs->schema2(tableInfo.fSchema);
1193             thjs->setLargeSideBPS(tbps);
1194             thjs->joinId(-1); // token join is a filter force it done before other joins
1195             thjs->setJoinType(INNER);
1196             thjs->tokenJoin(mit->first);
1197             tbps->incWaitToRunStepCnt();
1198             SJSTEP spthjs(thjs);
1199 
1200             // rowgroup of the TBPS side
1201             // start with the expected output of the table, tokens will be appended
1202             RowGroup rgTbps = rgOut;
1203 
1204             // input jobstepassociation
1205             // 1.  small sides -- pdictionaryscan steps
1206             vector<RowGroup> rgPdsVec;
1207             map<uint32_t, uint32_t> addedCol;
1208             vector<JoinType> jointypes;
1209             vector<bool> typeless;
1210             vector<vector<uint32_t> > smallKeyIndices;
1211             vector<vector<uint32_t> > largeKeyIndices;
1212             vector<string> tableNames;
1213             JobStepAssociation inJsa;
1214 
1215             for (vector<DictionaryScanInfo>::iterator i = pdsVec.begin(); i != pdsVec.end(); i++)
1216             {
1217                 // add the token steps to the TBPS
1218                 uint32_t tupleKey = i->fTokenId;
1219                 map<uint32_t, uint32_t>::iterator k = addedCol.find(tupleKey);
1220                 unsigned largeSideIndex = rgTbps.getColumnCount();
1221 
1222                 if (k == addedCol.end())
1223                 {
1224                     SJSTEP sjs(new pColStep(jobInfo.keyInfo->tupleKeyVec[tupleKey].fId,
1225                                             tableInfo.fTableOid,
1226                                             jobInfo.keyInfo->token2DictTypeMap[tupleKey],
1227                                             jobInfo));
1228                     sjs->alias(tableInfo.fAlias);
1229                     sjs->view(tableInfo.fView);
1230                     sjs->schema(tableInfo.fSchema);
1231                     sjs->name(jobInfo.keyInfo->keyName[tupleKey]);
1232                     sjs->tupleId(tupleKey);
1233                     bps->setProjectBPP(sjs.get(), NULL);
1234 
1235                     // Update info, which will be used to config the hashjoin later.
1236                     rgTbps += i->fRowGroup;
1237                     addedCol[tupleKey] = largeSideIndex;
1238                 }
1239                 else
1240                 {
1241                     largeSideIndex = k->second;
1242                 }
1243 
1244                 inJsa.outAdd(i->fDl);
1245                 tableNames.push_back(jobInfo.keyInfo->tupleKeyVec[tupleKey].fTable);
1246                 rgPdsVec.push_back(i->fRowGroup);
1247                 jointypes.push_back(INNER);
1248                 typeless.push_back(false);
1249                 smallKeyIndices.push_back(vector<uint32_t>(1, 0));
1250                 largeKeyIndices.push_back(vector<uint32_t>(1, largeSideIndex));
1251             }
1252 
1253             // 2. large side
1254             if (jobInfo.trace) cout << "RowGroup bps(and): " << rgTbps.toString() << endl;
1255 
1256             bps->setOutputRowGroup(rgTbps);
1257             inJsa.outAdd(bps->outputAssociation().outAt(0));
1258 
1259             // set input jobstepassociation
1260             thjs->inputAssociation(inJsa);
1261             thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
1262 
1263             // output jobstepassociation
1264             AnyDataListSPtr spdl(new AnyDataList());
1265             RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
1266             spdl->rowGroupDL(dl);
1267             dl->OID(mit->first);
1268             JobStepAssociation jsaOut;
1269             jsaOut.outAdd(spdl);
1270             thjs->outputAssociation(jsaOut);
1271 
1272             // config the tuplehashjoin
1273             thjs->configSmallSideRG(rgPdsVec, tableNames);
1274             thjs->configLargeSideRG(rgTbps);
1275             thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
1276             thjs->setOutputRowGroup(rgOut);
1277             newSteps.push_back(spthjs);
1278         }
1279     }
1280     else
1281     {
1282         // table derived from subquery
1283         SubQueryStep* subStep = NULL;
1284         SubAdapterStep* adaStep = NULL;
1285 
1286         for (JobStepVector::iterator it = qsv.begin(); it != qsv.end(); it++)
1287         {
1288             if (((subStep = dynamic_cast<SubQueryStep*>(it->get())) != NULL) ||
1289                     ((adaStep = dynamic_cast<SubAdapterStep*>(it->get())) != NULL))
1290                 newSteps.push_back(*it);
1291         }
1292 
1293         if (subStep == NULL && adaStep == NULL)
1294             throw runtime_error("No step for subquery.");
1295 
1296         if (subStep)
1297         {
1298             rgOut = subStep->getOutputRowGroup();
1299         }
1300         else
1301         {
1302             // add one-table expression steps to the adapter
1303             if (tableInfo.fOneTableExpSteps.size() > 0)
1304                 adaStep->addExpression(tableInfo.fOneTableExpSteps, jobInfo);
1305 
1306             // add function join steps
1307             if (tableInfo.fFuncJoinExps.size() > 0)
1308             {
1309                 // fe rowgroup info
1310                 RowGroup feRg = adaStep->getFeRowGroup();
1311 
1312                 if (feRg.getColumnCount() == 0)
1313                     feRg = adaStep->getOutputRowGroup();
1314 
1315                 const vector<uint32_t>& feKeys = feRg.getKeys();
1316                 map<uint32_t, uint32_t> keyToIndexMapFe;
1317 
1318                 for (uint64_t i = 0; i < feKeys.size(); ++i)
1319                     keyToIndexMapFe.insert(make_pair(feKeys[i], i));
1320 
1321                 // output rowgroup info
1322                 const RowGroup& outRg = adaStep->getOutputRowGroup();
1323                 const vector<uint32_t>& outKeys = outRg.getKeys();
1324                 map<uint32_t, uint32_t> keyToIndexMapOut;
1325 
1326                 for (uint64_t i = 0; i < outKeys.size(); ++i)
1327                     keyToIndexMapOut.insert(make_pair(outKeys[i], i));
1328 
1329                 // make sure the function join columns are present in the rgs
1330                 vector<uint32_t> fjKeys;
1331                 vector<SRCP> fjCols;
1332                 TupleInfoVector tis;
1333                 uint64_t lastFeIdx  = feKeys.size();
1334                 JobStepVector& fjs =  tableInfo.fFuncJoinExps;
1335 
1336                 for (JobStepVector::iterator it = fjs.begin(); it != fjs.end(); it++)
1337                 {
1338                     ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
1339                     TupleInfo ti = setExpTupleInfo(e->expression().get(), jobInfo);
1340 
1341                     if (find(feKeys.begin(), feKeys.end(), ti.key) == feKeys.end())
1342                     {
1343                         tis.push_back(ti);
1344                         keyToIndexMapFe.insert(make_pair(ti.key, lastFeIdx++));
1345                     }
1346 
1347                     e->updateInputIndex(keyToIndexMapFe, jobInfo);
1348                     e->updateOutputIndex(keyToIndexMapFe, jobInfo);
1349                     fjCols.push_back(e->expression());
1350                 }
1351 
1352                 // additional fields in the rowgroup
1353                 vector<uint32_t> pos;
1354                 vector<uint32_t> oids;
1355                 vector<uint32_t> keys;
1356                 vector<uint32_t> scale;
1357                 vector<uint32_t> precision;
1358                 vector<CalpontSystemCatalog::ColDataType> types;
1359                 vector<uint32_t> csNums;
1360                 pos.push_back(2);
1361 
1362                 for (unsigned i = 0; i < tis.size(); i++)
1363                 {
1364                     pos.push_back(pos.back() + tis[i].width);
1365                     oids.push_back(tis[i].oid);
1366                     keys.push_back(tis[i].key);
1367                     types.push_back(tis[i].dtype);
1368                     csNums.push_back(tis[i].csNum);
1369                     scale.push_back(tis[i].scale);
1370                     precision.push_back(tis[i].precision);
1371                 }
1372 
1373                 RowGroup addRg(oids.size(), pos, oids, keys, types, csNums, scale, precision,
1374                                jobInfo.stringTableThreshold);
1375 
1376                 RowGroup feRg1 = feRg;
1377                 RowGroup outRg1 = outRg;
1378 
1379                 if (addRg.getColumnCount() > 0)
1380                 {
1381                     feRg1 += addRg;
1382                     outRg1 += addRg;
1383                 }
1384 
1385                 adaStep->addFcnJoinExp(fjCols);
1386                 adaStep->setFeRowGroup(feRg1);
1387                 adaStep->setOutputRowGroup(outRg1);
1388             }
1389 
1390             rgOut = adaStep->getOutputRowGroup();
1391         }
1392     }
1393 
1394     tableInfo.fDl = newSteps.back()->outputAssociation().outAt(0);
1395     tableInfo.fRowGroup = rgOut;
1396 
1397     if (jobInfo.trace)
1398         cout << "RowGroup for " << mit->first << " : " << mit->second.fRowGroup.toString() << endl;
1399 
1400     qsv.swap(newSteps);
1401 
1402     return true;
1403 }
1404 
1405 
addFunctionJoin(vector<uint32_t> & joinedTables,JobStepVector & joinSteps,set<uint32_t> & nodeSet,set<pair<uint32_t,uint32_t>> & pathSet,TableInfoMap & tableInfoMap,JobInfo & jobInfo)1406 bool addFunctionJoin(vector<uint32_t>& joinedTables, JobStepVector& joinSteps,
1407                      set<uint32_t>& nodeSet, set<pair<uint32_t, uint32_t> >& pathSet,
1408                      TableInfoMap& tableInfoMap, JobInfo& jobInfo)
1409 {
1410     // @bug3683, adding function joins for not joined tables, one pair at a time.
1411     // design review comment:
1412     //     all convertable expressions between a pair of tables should be done all, or none.
1413     vector<JobStep*>::iterator i = jobInfo.functionJoins.begin();   // candidates
1414     set<pair<uint32_t, uint32_t> > functionJoinPairs;               // pairs
1415     bool added = false;                                             // new node added
1416 
1417     // for function join tables' scope checking, not to try semi join inside subquery.
1418     set<uint32_t> tables;                                           // tables to join
1419     tables.insert(jobInfo.tableList.begin(), jobInfo.tableList.end());
1420 
1421     // table pairs to be joined by function joins
1422     TableJoinMap::iterator m1 = jobInfo.tableJoinMap.end();
1423     TableJoinMap::iterator m2 = jobInfo.tableJoinMap.end();
1424 
1425     for (; (i != jobInfo.functionJoins.end()); i++)
1426     {
1427         ExpressionStep* es = dynamic_cast<ExpressionStep*>((*i));
1428         idbassert(es);
1429 
1430         if (es->functionJoin())
1431             continue;  // already converted to a join
1432 
1433         boost::shared_ptr<FunctionJoinInfo> fji = es->functionJoinInfo();
1434         uint32_t key1 = fji->fJoinKey[0];
1435         uint32_t key2 = fji->fJoinKey[1];
1436         uint32_t tid1 = fji->fTableKey[0];
1437         uint32_t tid2 = fji->fTableKey[1];
1438 
1439         if (nodeSet.find(tid1) != nodeSet.end() && nodeSet.find(tid2) != nodeSet.end())
1440             continue;  // both connected, will be a cycle if added.
1441 
1442         if (nodeSet.find(tid1) == nodeSet.end() && nodeSet.find(tid2) == nodeSet.end())
1443             continue;  // both isolated, wait until one is connected.
1444 
1445         if (tables.find(tid1) == tables.end() || tables.find(tid2) == tables.end())
1446             continue;  // sub-query case
1447 
1448         // one & only one is already connected
1449         pair<uint32_t, uint32_t> p(tid1, tid2);
1450 
1451         if (functionJoinPairs.empty())
1452         {
1453             functionJoinPairs.insert(make_pair(tid1, tid2));
1454             functionJoinPairs.insert(make_pair(tid2, tid1));
1455             tableInfoMap[tid1].fAdjacentList.push_back(tid2);
1456             tableInfoMap[tid2].fAdjacentList.push_back(tid1);
1457 
1458             if (find(joinedTables.begin(), joinedTables.end(), tid1) == joinedTables.end())
1459             {
1460                 joinedTables.push_back(tid1);
1461                 nodeSet.insert(tid1);
1462                 pathSet.insert(make_pair(tid2, tid1));
1463             }
1464             else
1465             {
1466                 joinedTables.push_back(tid2);
1467                 nodeSet.insert(tid2);
1468                 pathSet.insert(make_pair(tid1, tid2));
1469             }
1470 
1471             added = true;
1472 
1473             m1 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid1, tid2), JoinData()));
1474             m2 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid2, tid1), JoinData()));
1475 
1476             if (m1 == jobInfo.tableJoinMap.end() || m2 == jobInfo.tableJoinMap.end())
1477                 throw runtime_error("Bad table map.");
1478 
1479             TupleInfo ti1 = getTupleInfo(key1, jobInfo);
1480             TupleInfo ti2 = getTupleInfo(key2, jobInfo);
1481 
1482             if (ti1.dtype == CalpontSystemCatalog::CHAR
1483              || ti1.dtype == CalpontSystemCatalog::VARCHAR
1484              || ti1.dtype == CalpontSystemCatalog::TEXT)
1485 //             || ti1.dtype == CalpontSystemCatalog::LONGDOUBLE)
1486                 m1->second.fTypeless = m2->second.fTypeless = true;  // ti2 is compatible
1487             else
1488                 m1->second.fTypeless = m2->second.fTypeless = false;
1489         }
1490         else if (functionJoinPairs.find(p) == functionJoinPairs.end())
1491         {
1492             continue;  // different path
1493         }
1494         else
1495         {
1496             // path already added, do compound join
1497             m1->second.fTypeless = m2->second.fTypeless = true;
1498         }
1499 
1500         // simple or compound function join
1501         es->functionJoin(true);
1502         updateTableKey(key1, tid1, jobInfo);
1503         updateTableKey(key2, tid2, jobInfo);
1504 
1505         tableInfoMap[tid1].fJoinKeys.push_back(key1);
1506         tableInfoMap[tid2].fJoinKeys.push_back(key2);
1507 
1508         if (fji->fStep[0])
1509             tableInfoMap[tid1].fFuncJoinExps.push_back(fji->fStep[0]);
1510 
1511         if (fji->fStep[1])
1512             tableInfoMap[tid2].fFuncJoinExps.push_back(fji->fStep[1]);
1513 
1514         vector<uint32_t>& cols1 = tableInfoMap[tid1].fColsInFuncJoin;
1515         cols1.insert(cols1.end(), fji->fColumnKeys[0].begin(), fji->fColumnKeys[0].end());
1516         vector<uint32_t>& cols2 = tableInfoMap[tid2].fColsInFuncJoin;
1517         cols2.insert(cols2.end(), fji->fColumnKeys[1].begin(), fji->fColumnKeys[1].end());
1518 
1519         // construct a join step
1520         TupleHashJoinStep* thjs = new TupleHashJoinStep(jobInfo);
1521         thjs->tableOid1(fji->fTableOid[0]);
1522         thjs->tableOid2(fji->fTableOid[1]);
1523         thjs->oid1(fji->fOid[0]);
1524         thjs->oid2(fji->fOid[1]);
1525         thjs->alias1(fji->fAlias[0]);
1526         thjs->alias2(fji->fAlias[1]);
1527         thjs->view1(fji->fView[0]);
1528         thjs->view2(fji->fView[1]);
1529         thjs->schema1(fji->fSchema[0]);
1530         thjs->schema2(fji->fSchema[1]);
1531         thjs->column1(fji->fExpression[0]);
1532         thjs->column2(fji->fExpression[1]);
1533         thjs->sequence1(fji->fSequence[0]);
1534         thjs->sequence2(fji->fSequence[1]);
1535         thjs->joinId(fji->fJoinId);
1536         thjs->setJoinType(fji->fJoinType);
1537         thjs->funcJoinInfo(fji);
1538         thjs->tupleId1(key1);
1539         thjs->tupleId2(key2);
1540         SJSTEP spjs(thjs);
1541 
1542         // check correlated info
1543         JoinType joinType = fji->fJoinType;
1544 
1545         if (!(joinType & CORRELATED))
1546         {
1547             joinSteps.push_back(spjs);
1548 
1549             // Keep a map of the join (table, key) pairs
1550             m1->second.fLeftKeys.push_back(key1);
1551             m1->second.fRightKeys.push_back(key2);
1552 
1553             m2->second.fLeftKeys.push_back(key2);
1554             m2->second.fRightKeys.push_back(key1);
1555 
1556             // Keep a map of the join type between the keys.
1557             // OUTER join and SEMI/ANTI join are mutually exclusive.
1558             if (joinType == LEFTOUTER)
1559             {
1560                 m1->second.fTypes.push_back(SMALLOUTER);
1561                 m2->second.fTypes.push_back(LARGEOUTER);
1562                 jobInfo.outerOnTable.insert(tid2);
1563             }
1564             else if (joinType == RIGHTOUTER)
1565             {
1566                 m1->second.fTypes.push_back(LARGEOUTER);
1567                 m2->second.fTypes.push_back(SMALLOUTER);
1568                 jobInfo.outerOnTable.insert(tid1);
1569             }
1570             else if ((joinType & SEMI) &&
1571                      ((joinType & LEFTOUTER) == LEFTOUTER || (joinType & RIGHTOUTER) == RIGHTOUTER))
1572             {
1573                 // @bug3998, DML UPDATE borrows "SEMI" flag,
1574                 // allowing SEMI and LARGEOUTER combination to support update with outer join.
1575                 if ((joinType & LEFTOUTER) == LEFTOUTER)
1576                 {
1577                     joinType ^= LEFTOUTER;
1578                     m1->second.fTypes.push_back(joinType);
1579                     m2->second.fTypes.push_back(joinType | LARGEOUTER);
1580                     jobInfo.outerOnTable.insert(tid2);
1581                 }
1582                 else
1583                 {
1584                     joinType ^= RIGHTOUTER;
1585                     m1->second.fTypes.push_back(joinType | LARGEOUTER);
1586                     m2->second.fTypes.push_back(joinType);
1587                     jobInfo.outerOnTable.insert(tid1);
1588                 }
1589             }
1590             else
1591             {
1592                 m1->second.fTypes.push_back(joinType);
1593                 m2->second.fTypes.push_back(joinType);
1594             }
1595 
1596             // need id to keep the join order
1597             m1->second.fJoinId = m2->second.fJoinId = thjs->joinId();
1598         }
1599         else
1600         {
1601             // one of the tables is in outer query
1602             jobInfo.correlateSteps.push_back(spjs);
1603         }
1604     }
1605 
1606     return added;
1607 }
1608 
1609 
spanningTreeCheck(TableInfoMap & tableInfoMap,JobStepVector joinSteps,JobInfo & jobInfo)1610 void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector joinSteps, JobInfo& jobInfo)
1611 {
1612     bool spanningTree = true;
1613     unsigned errcode = 0;
1614     Message::Args args;
1615 
1616     if (jobInfo.trace)
1617     {
1618         cout << "Table Connection:" << endl;
1619 
1620         for (TableInfoMap::iterator i = tableInfoMap.begin(); i != tableInfoMap.end(); i++)
1621         {
1622             cout << i->first << " :";
1623             vector<uint32_t>::iterator j = i->second.fAdjacentList.begin();
1624 
1625             while (j != i->second.fAdjacentList.end())
1626                 cout << " " << *j++;
1627 
1628             cout << endl;
1629         }
1630 
1631         cout << endl;
1632     }
1633 
1634     if (tableInfoMap.size() < 1)
1635     {
1636         spanningTree = false;
1637         cerr << boldStart << "No table information." << boldStop << endl;
1638         throw logic_error("No table information.");
1639     }
1640     else if (tableInfoMap.size() > 1)
1641     {
1642         // 1a. make sure all tables are joined if not a single table query.
1643         set<uint32_t> nodeSet;
1644         set<pair<uint32_t, uint32_t> > pathSet;
1645         vector<uint32_t> joinedTables;
1646         joinedTables.push_back((tableInfoMap.begin())->first);
1647 
1648         for (size_t i = 0; i < joinedTables.size(); i++)
1649         {
1650             vector<uint32_t>& v = tableInfoMap[joinedTables[i]].fAdjacentList;
1651             nodeSet.insert(joinedTables[i]);
1652 
1653             for (vector<uint32_t>::iterator j = v.begin(); j != v.end(); j++)
1654             {
1655                 if (nodeSet.find(*j) == nodeSet.end())
1656                     joinedTables.push_back(*j);
1657 
1658                 nodeSet.insert(*j);
1659                 pathSet.insert(make_pair(joinedTables[i], *j));
1660             }
1661         }
1662 
1663         // 1b. convert expressions to function joins if not connected with simple column joins.
1664         bool fjAdded = false;
1665 
1666         while (joinedTables.size() < tableInfoMap.size() &&
1667                 addFunctionJoin(joinedTables, joinSteps, nodeSet, pathSet, tableInfoMap, jobInfo))
1668         {
1669             fjAdded = true;
1670 
1671             for (size_t i = joinedTables.size() - 1; i < joinedTables.size(); i++)
1672             {
1673                 vector<uint32_t>& v = tableInfoMap[joinedTables[i]].fAdjacentList;
1674 
1675                 for (vector<uint32_t>::iterator j = v.begin(); j != v.end(); j++)
1676                 {
1677                     if (find(joinedTables.begin(), joinedTables.end(), *j) == joinedTables.end())
1678                         joinedTables.push_back(*j);
1679 
1680                     nodeSet.insert(*j);
1681                     pathSet.insert(make_pair(joinedTables[i], *j));
1682                 }
1683             }
1684         }
1685 
1686 
1687         if (jobInfo.trace && fjAdded)
1688         {
1689             cout << "Table Connection after adding function join:" << endl;
1690 
1691             for (TableInfoMap::iterator i = tableInfoMap.begin(); i != tableInfoMap.end(); i++)
1692             {
1693                 cout << i->first << " :";
1694                 vector<uint32_t>::iterator j = i->second.fAdjacentList.begin();
1695 
1696                 while (j != i->second.fAdjacentList.end())
1697                     cout << " " << *j++;
1698 
1699                 cout << endl;
1700             }
1701 
1702             cout << endl;
1703         }
1704 
1705         // Check that join is compatible
1706         set<string> views1;
1707         set<string> tables1;
1708         string errStr;
1709 
1710         vector<uint32_t>::iterator k = joinedTables.begin();
1711 
1712         k = joinedTables.begin();
1713 
1714         for (; k != joinedTables.end(); k++)
1715         {
1716             if (jobInfo.keyInfo->tupleKeyVec[*k].fView.empty())
1717                 tables1.insert(jobInfo.keyInfo->tupleKeyToName[*k]);
1718             else
1719                 views1.insert(jobInfo.keyInfo->tupleKeyVec[*k].fView);
1720 
1721             if (jobInfo.incompatibleJoinMap.find(*k) != jobInfo.incompatibleJoinMap.end())
1722             {
1723                 errcode = ERR_INCOMPATIBLE_JOIN;
1724 
1725                 uint32_t key2 = jobInfo.incompatibleJoinMap[*k];
1726 
1727                 if (jobInfo.keyInfo->tupleKeyVec[*k].fView.length() > 0)
1728                 {
1729                     string view2 = jobInfo.keyInfo->tupleKeyVec[key2].fView;
1730 
1731                     if (jobInfo.keyInfo->tupleKeyVec[*k].fView == view2)
1732                     {
1733                         //  same view
1734                         errStr += "Tables in '" + view2 + "' have";
1735                     }
1736                     else if (view2.empty())
1737                     {
1738                         // view and real table
1739                         errStr += "'" + jobInfo.keyInfo->tupleKeyVec[*k].fView + "' and '" +
1740                                   jobInfo.keyInfo->tupleKeyToName[key2] + "' have";
1741                     }
1742                     else
1743                     {
1744                         // two views
1745                         errStr += "'" + jobInfo.keyInfo->tupleKeyVec[*k].fView + "' and '" +
1746                                   view2 + "' have";
1747                     }
1748                 }
1749                 else
1750                 {
1751                     string view2 = jobInfo.keyInfo->tupleKeyVec[key2].fView;
1752 
1753                     if (view2.empty())
1754                     {
1755                         // two real tables
1756                         errStr += "'" + jobInfo.keyInfo->tupleKeyToName[*k] + "' and '" +
1757                                   jobInfo.keyInfo->tupleKeyToName[key2] + "' have";
1758                     }
1759                     else
1760                     {
1761                         // real table and view
1762                         errStr += "'" + jobInfo.keyInfo->tupleKeyToName[*k] + "' and '" +
1763                                   view2 + "' have";
1764                     }
1765                 }
1766 
1767                 args.add(errStr);
1768                 spanningTree = false;
1769                 break;
1770             }
1771 
1772         }
1773 
1774         // 1c. check again if all tables are joined after pulling in function joins.
1775         if (joinedTables.size() < tableInfoMap.size())
1776         {
1777             vector<uint32_t> notJoinedTables;
1778 
1779             for (TableInfoMap::iterator i = tableInfoMap.begin(); i != tableInfoMap.end(); i++)
1780             {
1781                 if (find(joinedTables.begin(), joinedTables.end(), i->first) == joinedTables.end())
1782                     notJoinedTables.push_back(i->first);
1783             }
1784 
1785             set<string> views2;
1786             set<string> tables2;
1787             k = notJoinedTables.begin();
1788 
1789             for (; k != notJoinedTables.end(); k++)
1790             {
1791                 if (jobInfo.keyInfo->tupleKeyVec[*k].fView.empty())
1792                     tables2.insert(jobInfo.keyInfo->tupleKeyToName[*k]);
1793                 else
1794                     views2.insert(jobInfo.keyInfo->tupleKeyVec[*k].fView);
1795             }
1796 
1797             if (errStr.empty())
1798             {
1799                 errcode = ERR_MISS_JOIN;
1800 
1801                 // 1. check if all tables in a view are joined
1802                 for (set<string>::iterator s = views1.begin(); s != views1.end(); s++)
1803                 {
1804                     if (views2.find(*s) != views2.end())
1805                     {
1806                         errStr = "Tables in '" + (*s) + "' are";
1807                     }
1808                 }
1809 
1810                 // 2. tables and views are joined
1811                 if (errStr.empty())
1812                 {
1813                     string set1;
1814 
1815                     for (set<string>::iterator s = views1.begin(); s != views1.end(); s++)
1816                     {
1817                         if (set1.empty())
1818                             set1 = "'";
1819                         else
1820                             set1 += ", ";
1821 
1822                         set1 += (*s);
1823                     }
1824 
1825                     for (set<string>::iterator s = tables1.begin(); s != tables1.end(); s++)
1826                     {
1827                         if (set1.empty())
1828                             set1 = "'";
1829                         else
1830                             set1 += ", ";
1831 
1832                         set1 += (*s);
1833                     }
1834 
1835                     string set2;
1836 
1837                     for (set<string>::iterator s = views2.begin(); s != views2.end(); s++)
1838                     {
1839                         if (set2.empty())
1840                             set2 = "'";
1841                         else
1842                             set2 += ", ";
1843 
1844                         set2 += (*s);
1845                     }
1846 
1847                     for (set<string>::iterator s = tables2.begin(); s != tables2.end(); s++)
1848                     {
1849                         if (set2.empty())
1850                             set2 = "'";
1851                         else
1852                             set2 += ", ";
1853 
1854                         set2 += (*s);
1855                     }
1856 
1857                     errStr = set1 + "' and " + set2 + "' are";
1858                     args.add(errStr);
1859                     spanningTree = false;
1860                 }
1861             }
1862 
1863         }
1864 
1865         // 2. no cycles
1866         if (spanningTree && (nodeSet.size() - pathSet.size() / 2 != 1))
1867         {
1868             errcode = ERR_CIRCULAR_JOIN;
1869             spanningTree = false;
1870         }
1871     }
1872 
1873     if (!spanningTree)
1874     {
1875         cerr << boldStart << IDBErrorInfo::instance()->errorMsg(errcode, args) << boldStop << endl;
1876         throw IDBExcept(IDBErrorInfo::instance()->errorMsg(errcode, args), errcode);
1877     }
1878 }
1879 
1880 
outjoinPredicateAdjust(TableInfoMap & tableInfoMap,JobInfo & jobInfo)1881 void outjoinPredicateAdjust(TableInfoMap& tableInfoMap, JobInfo& jobInfo)
1882 {
1883     set<uint32_t>::iterator i = jobInfo.outerOnTable.begin();
1884 
1885     for (; i != jobInfo.outerOnTable.end(); i++)
1886     {
1887         // resetTableFilters(tableInfoMap[*i], jobInfo)
1888         TableInfo& tblInfo = tableInfoMap[*i];
1889 
1890         if (tblInfo.fTableOid != CNX_VTABLE_ID)
1891         {
1892             JobStepVector::iterator k = tblInfo.fQuerySteps.begin();
1893             JobStepVector onClauseFilterSteps;  //@bug5887, 5311
1894 
1895             for (; k != tblInfo.fQuerySteps.end(); k++)
1896             {
1897                 if ((*k)->onClauseFilter())
1898                 {
1899                     onClauseFilterSteps.push_back(*k);
1900                     continue;
1901                 }
1902 
1903                 uint32_t colKey = -1;
1904                 pColStep* pcs = dynamic_cast<pColStep*>(k->get());
1905                 pColScanStep* pcss = dynamic_cast<pColScanStep*>(k->get());
1906                 pDictionaryScan* pdss = dynamic_cast<pDictionaryScan*>(k->get());
1907                 pDictionaryStep* pdsp = dynamic_cast<pDictionaryStep*>(k->get());
1908                 vector<const execplan::Filter*>* filters = NULL;
1909                 int8_t bop = -1;
1910 
1911                 if (pcs != NULL)
1912                 {
1913                     filters = &(pcs->getFilters());
1914                     bop = pcs->BOP();
1915                     colKey = pcs->tupleId();
1916                 }
1917                 else if (pcss != NULL)
1918                 {
1919                     filters = &(pcss->getFilters());
1920                     bop = pcss->BOP();
1921                     colKey = pcss->tupleId();
1922                 }
1923                 else if (pdss != NULL)
1924                 {
1925                     filters = &(pdss->getFilters());
1926                     bop = pdss->BOP();
1927                     colKey = pdss->tupleId();
1928                 }
1929                 else if (pdsp != NULL)
1930                 {
1931                     filters = &(pdsp->getFilters());
1932                     bop = pdsp->BOP();
1933                     colKey = pdsp->tupleId();
1934                 }
1935 
1936                 if (filters != NULL && filters->size() > 0)
1937                 {
1938                     ParseTree* pt = new ParseTree((*filters)[0]->clone());
1939 
1940                     for (size_t i = 1; i < filters->size(); i++)
1941                     {
1942                         ParseTree* left = pt;
1943                         ParseTree* right =
1944                             new ParseTree((*filters)[i]->clone());
1945                         ParseTree* op = (BOP_OR == bop) ?
1946                                         new ParseTree(new LogicOperator("or")) :
1947                                         new ParseTree(new LogicOperator("and"));
1948                         op->left(left);
1949                         op->right(right);
1950 
1951                         pt = op;
1952                     }
1953 
1954                     ExpressionStep* es = new ExpressionStep(jobInfo);
1955 
1956                     if (es == NULL)
1957                         throw runtime_error ("Failed to new ExpressionStep 2");
1958 
1959                     es->expressionFilter(pt, jobInfo);
1960                     SJSTEP sjstep(es);
1961                     jobInfo.outerJoinExpressions.push_back(sjstep);
1962                     tblInfo.fColsInOuter.push_back(colKey);
1963 
1964                     delete pt;
1965                 }
1966             }
1967 
1968             // Do not apply the primitive filters if there is an "IS NULL" in where clause.
1969             if (jobInfo.tableHasIsNull.find(*i) != jobInfo.tableHasIsNull.end())
1970                 tblInfo.fQuerySteps = onClauseFilterSteps;
1971         }
1972 
1973         jobInfo.outerJoinExpressions.insert(jobInfo.outerJoinExpressions.end(),
1974                                             tblInfo.fOneTableExpSteps.begin(), tblInfo.fOneTableExpSteps.end());
1975         tblInfo.fOneTableExpSteps.clear();
1976 
1977         tblInfo.fColsInOuter.insert(tblInfo.fColsInOuter.end(),
1978                                     tblInfo.fColsInExp1.begin(), tblInfo.fColsInExp1.end());
1979     }
1980 }
1981 
1982 
getLargestTable(JobInfo & jobInfo,TableInfoMap & tableInfoMap,bool overrideLargeSideEstimate)1983 uint32_t getLargestTable(JobInfo& jobInfo, TableInfoMap& tableInfoMap, bool overrideLargeSideEstimate)
1984 {
1985     // Subquery in FROM clause assumptions:
1986     //   hint will be ignored, if the 1st table in FROM clause is a derived table.
1987     if (jobInfo.keyInfo->tupleKeyVec[jobInfo.tableList[0]].fId < 3000)
1988         overrideLargeSideEstimate = false;
1989 
1990     // Bug 2123. Added logic to dynamically determine the large side table unless the SQL statement
1991     // contained a hint saying to skip the estimation and use the FIRST table in the from clause.
1992     // Prior to this, we were using the LAST table in the from clause.  We switched it as there
1993     // were some outer join sqls that couldn't be written with the large table last.
1994     // Default to the first table in the from clause if:
1995     //   the user set the hint; or
1996     //   there is only one table in the query.
1997     uint32_t ret = jobInfo.tableList.front();
1998 
1999     if (jobInfo.tableList.size() <= 1)
2000     {
2001         return ret;
2002     }
2003 
2004     // Algorithm to dynamically determine the largest table.
2005     uint64_t largestCardinality = 0;
2006     uint64_t estimatedRowCount = 0;
2007 
2008     // Loop through the tables and find the one with the largest estimated cardinality.
2009     for (uint32_t i = 0; i < jobInfo.tableList.size(); i++)
2010     {
2011         jobInfo.tableSize[jobInfo.tableList[i]] = 0;
2012         TableInfoMap::iterator it = tableInfoMap.find(jobInfo.tableList[i]);
2013 
2014         if (it != tableInfoMap.end())
2015         {
2016             // @Bug 3771.  Loop through the query steps until the tupleBPS is found instead of
2017             // just looking at the first one.  Tables in the query that included a filter on a
2018             // dictionary column were not getting their row count estimated.
2019             for (JobStepVector::iterator jsIt = it->second.fQuerySteps.begin();
2020                     jsIt != it->second.fQuerySteps.end(); jsIt++)
2021             {
2022                 TupleBPS* tupleBPS = dynamic_cast<TupleBPS*>((*jsIt).get());
2023 
2024                 if (tupleBPS != NULL)
2025                 {
2026                     estimatedRowCount = tupleBPS->getEstimatedRowCount();
2027                     jobInfo.tableSize[jobInfo.tableList[i]] = estimatedRowCount;
2028 
2029                     if (estimatedRowCount > largestCardinality)
2030                     {
2031                         ret = jobInfo.tableList[i];
2032                         largestCardinality = estimatedRowCount;
2033                     }
2034 
2035                     break;
2036                 }
2037             }
2038         }
2039     }
2040 
2041     // select /*! INFINIDB_ORDERED */
2042     if (overrideLargeSideEstimate)
2043     {
2044         ret = jobInfo.tableList.front();
2045         jobInfo.tableSize[ret] = numeric_limits<uint64_t>::max();
2046     }
2047 
2048     return ret;
2049 }
2050 
2051 
getPrevLarge(uint32_t n,TableInfoMap & tableInfoMap)2052 uint32_t getPrevLarge(uint32_t n, TableInfoMap& tableInfoMap)
2053 {
2054     // root node : no previous node;
2055     // other node: only one immediate previous node;
2056     int prev = -1;
2057     vector<uint32_t>& adjList = tableInfoMap[n].fAdjacentList;
2058 
2059     for (vector<uint32_t>::iterator i = adjList.begin(); i != adjList.end() && prev < 0; i++)
2060     {
2061         if (tableInfoMap[*i].fVisited == true)
2062             prev = *i;
2063     }
2064 
2065     return prev;
2066 }
2067 
2068 
getKeyIndex(uint32_t key,const RowGroup & rg)2069 uint32_t getKeyIndex(uint32_t key, const RowGroup& rg)
2070 {
2071     vector<uint32_t>::const_iterator i = rg.getKeys().begin();
2072 
2073     for (; i != rg.getKeys().end(); ++i)
2074         if (key == *i)
2075             break;
2076 
2077     if (i == rg.getKeys().end())
2078         throw runtime_error("No key found.");
2079 
2080     return std::distance(rg.getKeys().begin(), i);
2081 }
2082 
2083 
joinInfoCompare(const SP_JoinInfo & a,const SP_JoinInfo & b)2084 bool joinInfoCompare(const SP_JoinInfo& a, const SP_JoinInfo& b)
2085 {
2086     return (a->fJoinData.fJoinId < b->fJoinData.fJoinId);
2087 }
2088 
2089 
joinTypeToString(const JoinType & joinType)2090 string joinTypeToString(const JoinType& joinType)
2091 {
2092     string ret;
2093 
2094     if (joinType & INNER)
2095         ret = "inner";
2096     else if (joinType & LARGEOUTER)
2097         ret = "largeOuter";
2098     else if (joinType & SMALLOUTER)
2099         ret = "smallOuter";
2100 
2101     if (joinType & SEMI)
2102         ret += "+semi";
2103 
2104     if (joinType & ANTI)
2105         ret += "+ant";
2106 
2107     if (joinType & SCALAR)
2108         ret += "+scalar";
2109 
2110     if (joinType & MATCHNULLS)
2111         ret += "+matchnulls";
2112 
2113     if (joinType & WITHFCNEXP)
2114         ret += "+exp";
2115 
2116     if (joinType & CORRELATED)
2117         ret += "+correlated";
2118 
2119     return ret;
2120 }
2121 
2122 
joinToLargeTable(uint32_t large,TableInfoMap & tableInfoMap,JobInfo & jobInfo,vector<uint32_t> & joinOrder)2123 SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap,
2124                              JobInfo& jobInfo, vector<uint32_t>& joinOrder)
2125 {
2126     vector<SP_JoinInfo> smallSides;
2127     tableInfoMap[large].fVisited = true;
2128     tableInfoMap[large].fJoinedTables.insert(large);
2129     set<uint32_t>& tableSet = tableInfoMap[large].fJoinedTables;
2130     vector<uint32_t>& adjList = tableInfoMap[large].fAdjacentList;
2131     uint32_t prevLarge = (uint32_t) getPrevLarge(large, tableInfoMap);
2132     bool root = (prevLarge == (uint32_t) - 1) ? true : false;
2133     uint32_t link = large;
2134     uint32_t cId = -1;
2135 
2136     // Get small sides ready.
2137     for (vector<uint32_t>::iterator i = adjList.begin(); i != adjList.end(); i++)
2138     {
2139         if (tableInfoMap[*i].fVisited == false)
2140         {
2141             cId = *i;
2142             smallSides.push_back(joinToLargeTable(*i, tableInfoMap, jobInfo, joinOrder));
2143 
2144             tableSet.insert(tableInfoMap[*i].fJoinedTables.begin(),
2145                             tableInfoMap[*i].fJoinedTables.end());
2146         }
2147     }
2148 
2149     // Join with its small sides, if not a leaf node.
2150     if (smallSides.size() > 0)
2151     {
2152         // non-leaf node, need a join
2153         SJSTEP spjs = tableInfoMap[large].fQuerySteps.back();
2154         BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(spjs.get());
2155         SubAdapterStep* tsas = dynamic_cast<SubAdapterStep*>(spjs.get());
2156         TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
2157 
2158         // @bug6158, try to put BPS on large side if possible
2159         if (tsas && smallSides.size() == 1)
2160         {
2161             SJSTEP sspjs = tableInfoMap[cId].fQuerySteps.back(), get();
2162             BatchPrimitive* sbps = dynamic_cast<BatchPrimitive*>(sspjs.get());
2163             TupleHashJoinStep* sthjs = dynamic_cast<TupleHashJoinStep*>(sspjs.get());
2164 
2165             if (sbps || (sthjs && sthjs->tokenJoin() == cId))
2166             {
2167                 SP_JoinInfo largeJoinInfo(new JoinInfo);
2168                 largeJoinInfo->fTableOid = tableInfoMap[large].fTableOid;
2169                 largeJoinInfo->fAlias = tableInfoMap[large].fAlias;
2170                 largeJoinInfo->fView = tableInfoMap[large].fView;
2171                 largeJoinInfo->fSchema = tableInfoMap[large].fSchema;
2172 
2173                 largeJoinInfo->fDl = tableInfoMap[large].fDl;
2174                 largeJoinInfo->fRowGroup = tableInfoMap[large].fRowGroup;
2175 
2176                 TableJoinMap::iterator mit = jobInfo.tableJoinMap.find(make_pair(large, cId));
2177 
2178                 if (mit == jobInfo.tableJoinMap.end())
2179                     throw runtime_error("Join step not found.");
2180 
2181                 largeJoinInfo->fJoinData = mit->second;
2182 
2183                 // switch large and small sides
2184                 joinOrder.back() = large;
2185                 large = cId;
2186                 smallSides[0] = largeJoinInfo;
2187                 tableInfoMap[large].fJoinedTables = tableSet;
2188 
2189                 bps = sbps;
2190                 thjs = sthjs;
2191                 tsas = NULL;
2192             }
2193         }
2194 
2195         if (!bps && !thjs && !tsas)
2196         {
2197             if (dynamic_cast<SubQueryStep*>(spjs.get()))
2198                 throw IDBExcept(ERR_NON_SUPPORT_SUB_QUERY_TYPE);
2199 
2200             throw runtime_error("Not supported join.");
2201         }
2202 
2203         size_t dcf = 0; // for dictionary column filters, 0 if thjs is null.
2204         RowGroup largeSideRG = tableInfoMap[large].fRowGroup;
2205 
2206         if (thjs && thjs->tokenJoin() == large)
2207         {
2208             dcf = thjs->getLargeKeys().size();
2209             largeSideRG = thjs->getLargeRowGroup();
2210         }
2211 
2212         // info for debug trace
2213         vector<string> tableNames;
2214         vector<string> traces;
2215 
2216         // sort the smallsides base on the joinId
2217         sort(smallSides.begin(), smallSides.end(), joinInfoCompare);
2218         int64_t lastJoinId = smallSides.back()->fJoinData.fJoinId;
2219 
2220         // get info to config the TupleHashjoin
2221         DataListVec smallSideDLs;
2222         vector<RowGroup> smallSideRGs;
2223         vector<JoinType> jointypes;
2224         vector<bool> typeless;
2225         vector<vector<uint32_t> > smallKeyIndices;
2226         vector<vector<uint32_t> > largeKeyIndices;
2227 
2228         for (vector<SP_JoinInfo>::iterator i = smallSides.begin(); i != smallSides.end(); i++)
2229         {
2230             JoinInfo* info = i->get();
2231             smallSideDLs.push_back(info->fDl);
2232             smallSideRGs.push_back(info->fRowGroup);
2233             jointypes.push_back(info->fJoinData.fTypes[0]);
2234             typeless.push_back(info->fJoinData.fTypeless);
2235 
2236             vector<uint32_t> smallIndices;
2237             vector<uint32_t> largeIndices;
2238             const vector<uint32_t>& keys1 = info->fJoinData.fLeftKeys;
2239             const vector<uint32_t>& keys2 = info->fJoinData.fRightKeys;
2240             vector<uint32_t>::const_iterator k1 = keys1.begin();
2241             vector<uint32_t>::const_iterator k2 = keys2.begin();
2242             uint32_t stid = getTableKey(jobInfo, *k1);
2243             tableNames.push_back(jobInfo.keyInfo->tupleKeyVec[stid].fTable);
2244 
2245             for (; k1 != keys1.end(); ++k1, ++k2)
2246             {
2247                 smallIndices.push_back(getKeyIndex(*k1, info->fRowGroup));
2248                 largeIndices.push_back(getKeyIndex(*k2, largeSideRG));
2249             }
2250 
2251             smallKeyIndices.push_back(smallIndices);
2252             largeKeyIndices.push_back(largeIndices);
2253 
2254             if (jobInfo.trace)
2255             {
2256                 // small side column
2257                 ostringstream smallKey, smallIndex;
2258                 string alias1 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys1.front())];
2259                 smallKey << alias1 << "-";
2260 
2261                 for (k1 = keys1.begin(); k1 != keys1.end(); ++k1)
2262                 {
2263                     CalpontSystemCatalog::OID oid1 = jobInfo.keyInfo->tupleKeyVec[*k1].fId;
2264                     CalpontSystemCatalog::TableColName tcn1 = jobInfo.csc->colName(oid1);
2265                     smallKey << "(" << tcn1.column << ":" << oid1 << ":" << *k1 << ")";
2266                     smallIndex << " " << getKeyIndex(*k1,  info->fRowGroup);
2267                 }
2268 
2269                 // large side column
2270                 ostringstream largeKey, largeIndex;
2271                 string alias2 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys2.front())];
2272                 largeKey << alias2 << "-";
2273 
2274                 for (k2 = keys2.begin(); k2 != keys2.end(); ++k2)
2275                 {
2276                     CalpontSystemCatalog::OID oid2 = jobInfo.keyInfo->tupleKeyVec[*k2].fId;
2277                     CalpontSystemCatalog::TableColName tcn2 = jobInfo.csc->colName(oid2);
2278                     largeKey << "(" << tcn2.column << ":" << oid2 << ":" << *k2 << ")";
2279                     largeIndex << " " << getKeyIndex(*k2, largeSideRG);
2280                 }
2281 
2282                 ostringstream oss;
2283                 oss << smallKey.str() << " join on " << largeKey.str()
2284                     << " joinType: " << info->fJoinData.fTypes.front()
2285                     << "(" << joinTypeToString(info->fJoinData.fTypes.front()) << ")"
2286                     << (info->fJoinData.fTypeless ? " " : " !") << "typeless" << endl;
2287                 oss << "smallSideIndex-largeSideIndex :" << smallIndex.str() << " --"
2288                     << largeIndex.str() << endl;
2289                 oss << "small side RG" << endl << info->fRowGroup.toString() << endl;
2290                 traces.push_back(oss.str());
2291             }
2292         }
2293 
2294         if (jobInfo.trace)
2295         {
2296             ostringstream oss;
2297             oss << "large side RG" << endl << largeSideRG.toString() << endl;
2298             traces.push_back(oss.str());
2299         }
2300 
2301         if (bps || tsas)
2302         {
2303             thjs = new TupleHashJoinStep(jobInfo);
2304             thjs->tableOid1(smallSides[0]->fTableOid);
2305             thjs->tableOid2(tableInfoMap[large].fTableOid);
2306             thjs->alias1(smallSides[0]->fAlias);
2307             thjs->alias2(tableInfoMap[large].fAlias);
2308             thjs->view1(smallSides[0]->fView);
2309             thjs->view2(tableInfoMap[large].fView);
2310             thjs->schema1(smallSides[0]->fSchema);
2311             thjs->schema2(tableInfoMap[large].fSchema);
2312             thjs->setLargeSideBPS(bps);
2313             thjs->joinId(lastJoinId);
2314 
2315             if (dynamic_cast<TupleBPS*>(bps) != NULL)
2316                 bps->incWaitToRunStepCnt();
2317 
2318             SJSTEP spjs(thjs);
2319 
2320             JobStepAssociation inJsa;
2321             inJsa.outAdd(smallSideDLs, 0);
2322             inJsa.outAdd(tableInfoMap[large].fDl);
2323             thjs->inputAssociation(inJsa);
2324             thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
2325 
2326             JobStepAssociation outJsa;
2327             AnyDataListSPtr spdl(new AnyDataList());
2328             RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
2329             spdl->rowGroupDL(dl);
2330             dl->OID(large);
2331             outJsa.outAdd(spdl);
2332             thjs->outputAssociation(outJsa);
2333 
2334             thjs->configSmallSideRG(smallSideRGs, tableNames);
2335             thjs->configLargeSideRG(tableInfoMap[large].fRowGroup);
2336             thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
2337 
2338             tableInfoMap[large].fQuerySteps.push_back(spjs);
2339             tableInfoMap[large].fDl = spdl;
2340         }
2341         else
2342         {
2343             JobStepAssociation inJsa = thjs->inputAssociation();
2344 
2345             if (inJsa.outSize() < 2)
2346                 throw runtime_error("Not enough input to a hashjoin.");
2347 
2348             size_t last = inJsa.outSize() - 1;
2349             inJsa.outAdd(smallSideDLs, last);
2350             thjs->inputAssociation(inJsa);
2351             thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
2352 
2353             thjs->addSmallSideRG(smallSideRGs, tableNames);
2354             thjs->addJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
2355         }
2356 
2357         RowGroup rg;
2358         constructJoinedRowGroup(rg, link, prevLarge, root, tableSet, tableInfoMap, jobInfo);
2359         thjs->setOutputRowGroup(rg);
2360         tableInfoMap[large].fRowGroup = rg;
2361 
2362         if (jobInfo.trace)
2363         {
2364             cout << boldStart  << "\n====== join info ======\n" << boldStop;
2365 
2366             for (vector<string>::iterator t = traces.begin(); t != traces.end(); ++t)
2367                 cout << *t;
2368 
2369             cout << "RowGroup join result: " << endl << rg.toString() << endl << endl;
2370         }
2371 
2372         // check if any cross-table expressions can be evaluated after the join
2373         JobStepVector readyExpSteps;
2374         JobStepVector& expSteps = jobInfo.crossTableExpressions;
2375         JobStepVector::iterator eit = expSteps.begin();
2376 
2377         while (eit != expSteps.end())
2378         {
2379             ExpressionStep* exp = dynamic_cast<ExpressionStep*>(eit->get());
2380 
2381             if (exp == NULL)
2382                 throw runtime_error("Not an expression.");
2383 
2384             if (exp->functionJoin())
2385             {
2386                 eit++;
2387                 continue;  // done as join
2388             }
2389 
2390             const vector<uint32_t>& tables = exp->tableKeys();
2391             uint64_t i = 0;
2392 
2393             for (; i < tables.size(); i++)
2394             {
2395                 if (tableSet.find(tables[i]) == tableSet.end())
2396                     break;
2397             }
2398 
2399             // all tables for this expression are joined
2400             if (tables.size() ==  i)
2401             {
2402                 readyExpSteps.push_back(*eit);
2403                 eit = expSteps.erase(eit);
2404             }
2405             else
2406             {
2407                 eit++;
2408             }
2409         }
2410 
2411         // if root, handle the delayed outer join filters
2412         if (root && jobInfo.outerJoinExpressions.size() > 0)
2413             readyExpSteps.insert(readyExpSteps.end(),
2414                                  jobInfo.outerJoinExpressions.begin(),
2415                                  jobInfo.outerJoinExpressions.end());
2416 
2417         // check additional compares for semi-join
2418         if (readyExpSteps.size() > 0)
2419         {
2420             map<uint32_t, uint32_t> keyToIndexMap; // map keys to the indices in the RG
2421 
2422             for (uint64_t i = 0; i < rg.getKeys().size(); ++i)
2423                 keyToIndexMap.insert(make_pair(rg.getKeys()[i], i));
2424 
2425             // tables have additional comparisons
2426             map<uint32_t, int> correlateTables;          // index in thjs
2427             map<uint32_t, ParseTree*> correlateCompare;  // expression
2428 
2429             for (size_t i = 0; i != smallSides.size(); i++)
2430             {
2431                 if ((jointypes[i] & SEMI) || (jointypes[i] & ANTI) || (jointypes[i] & SCALAR))
2432                 {
2433                     uint32_t  tid = getTableKey(jobInfo,
2434                                                 smallSides[i]->fTableOid,
2435                                                 smallSides[i]->fAlias,
2436                                                 smallSides[i]->fSchema,
2437                                                 smallSides[i]->fView);
2438                     correlateTables[tid] = i;
2439                     correlateCompare[tid] = NULL;
2440                 }
2441             }
2442 
2443             if (correlateTables.size() > 0)
2444             {
2445                 // separate additional compare for each table pair
2446                 JobStepVector::iterator eit = readyExpSteps.begin();
2447 
2448                 while (eit != readyExpSteps.end())
2449                 {
2450                     ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());
2451 
2452                     if (e->selectFilter())
2453                     {
2454                         // @bug3780, leave select filter to normal expression
2455                         eit++;
2456                         continue;
2457                     }
2458 
2459                     const vector<uint32_t>& tables = e->tableKeys();
2460                     map<uint32_t, int>::iterator j = correlateTables.end();
2461 
2462                     for (size_t i = 0; i < tables.size(); i++)
2463                     {
2464                         j = correlateTables.find(tables[i]);
2465 
2466                         if (j != correlateTables.end())
2467                             break;
2468                     }
2469 
2470                     if (j == correlateTables.end())
2471                     {
2472                         eit++;
2473                         continue;
2474                     }
2475 
2476                     // map the input column index
2477                     e->updateInputIndex(keyToIndexMap, jobInfo);
2478                     ParseTree* pt = correlateCompare[j->first];
2479 
2480                     if (pt == NULL)
2481                     {
2482                         // first expression
2483                         pt = new ParseTree;
2484                         pt->copyTree(*(e->expressionFilter()));
2485                     }
2486                     else
2487                     {
2488                         // combine the expressions
2489                         ParseTree* left = pt;
2490                         ParseTree* right = new ParseTree;
2491                         right->copyTree(*(e->expressionFilter()));
2492                         pt = new ParseTree(new LogicOperator("and"));
2493                         pt->left(left);
2494                         pt->right(right);
2495                     }
2496 
2497                     correlateCompare[j->first] = pt;
2498                     eit = readyExpSteps.erase(eit);
2499                 }
2500 
2501                 map<uint32_t, int>::iterator k = correlateTables.begin();
2502 
2503                 while (k != correlateTables.end())
2504                 {
2505                     ParseTree* pt = correlateCompare[k->first];
2506 
2507                     if (pt != NULL)
2508                     {
2509                         boost::shared_ptr<ParseTree> sppt(pt);
2510                         thjs->addJoinFilter(sppt, dcf + k->second);
2511                     }
2512 
2513                     k++;
2514                 }
2515 
2516                 thjs->setJoinFilterInputRG(rg);
2517             }
2518 
2519             // normal expression if any
2520             if (readyExpSteps.size() > 0)
2521             {
2522                 // add the expression steps in where clause can be solved by this join to bps
2523                 ParseTree* pt = NULL;
2524                 JobStepVector::iterator eit = readyExpSteps.begin();
2525 
2526                 for (; eit != readyExpSteps.end(); eit++)
2527                 {
2528                     // map the input column index
2529                     ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());
2530                     e->updateInputIndex(keyToIndexMap, jobInfo);
2531 
2532                     if (pt == NULL)
2533                     {
2534                         // first expression
2535                         pt = new ParseTree;
2536                         pt->copyTree(*(e->expressionFilter()));
2537                     }
2538                     else
2539                     {
2540                         // combine the expressions
2541                         ParseTree* left = pt;
2542                         ParseTree* right = new ParseTree;
2543                         right->copyTree(*(e->expressionFilter()));
2544                         pt = new ParseTree(new LogicOperator("and"));
2545                         pt->left(left);
2546                         pt->right(right);
2547                     }
2548                 }
2549 
2550                 boost::shared_ptr<ParseTree> sppt(pt);
2551                 thjs->addFcnExpGroup2(sppt);
2552             }
2553 
2554             // update the fColsInExp2 and construct the output RG
2555             updateExp2Cols(readyExpSteps, tableInfoMap, jobInfo);
2556             constructJoinedRowGroup(rg, link, prevLarge, root, tableSet, tableInfoMap, jobInfo);
2557 
2558             if (thjs->hasFcnExpGroup2())
2559                 thjs->setFE23Output(rg);
2560             else
2561                 thjs->setOutputRowGroup(rg);
2562 
2563             tableInfoMap[large].fRowGroup = rg;
2564 
2565             if (jobInfo.trace)
2566             {
2567                 cout << "RowGroup of " << tableInfoMap[large].fAlias << " after EXP G2: " << endl
2568                      << rg.toString() << endl << endl;
2569             }
2570         }
2571     }
2572 
2573 
2574     // Prepare the current table info to join with its large side.
2575     SP_JoinInfo joinInfo(new JoinInfo);
2576     joinInfo->fTableOid = tableInfoMap[large].fTableOid;
2577     joinInfo->fAlias = tableInfoMap[large].fAlias;
2578     joinInfo->fView = tableInfoMap[large].fView;
2579     joinInfo->fSchema = tableInfoMap[large].fSchema;
2580 
2581     joinInfo->fDl = tableInfoMap[large].fDl;
2582     joinInfo->fRowGroup = tableInfoMap[large].fRowGroup;
2583 
2584     if (root == false)  // not root
2585     {
2586         TableJoinMap::iterator mit = jobInfo.tableJoinMap.find(make_pair(link, prevLarge));
2587 
2588         if (mit == jobInfo.tableJoinMap.end())
2589             throw runtime_error("Join step not found.");
2590 
2591         joinInfo->fJoinData = mit->second;
2592     }
2593 
2594     joinOrder.push_back(large);
2595 
2596     return joinInfo;
2597 }
2598 
2599 
joinStepCompare(const SJSTEP & a,const SJSTEP & b)2600 bool joinStepCompare(const SJSTEP& a, const SJSTEP& b)
2601 {
2602     return (dynamic_cast<TupleHashJoinStep*>(a.get())->joinId() <
2603             dynamic_cast<TupleHashJoinStep*>(b.get())->joinId());
2604 }
2605 
2606 
/* One distinct pair of joined tables, as collected by getJoinOrder(). */
struct JoinOrderData
{
    uint32_t fTid1;    // table key of the first table in the join pair
    uint32_t fTid2;    // table key of the second table in the join pair
    uint32_t fJoinId;  // join id of the latest step seen for this pair
};
2613 
2614 
getJoinOrder(vector<JoinOrderData> & joins,JobStepVector & joinSteps,JobInfo & jobInfo)2615 void getJoinOrder(vector<JoinOrderData>& joins, JobStepVector& joinSteps, JobInfo& jobInfo)
2616 {
2617     sort(joinSteps.begin(), joinSteps.end(), joinStepCompare);
2618 
2619     for (JobStepVector::iterator i = joinSteps.begin(); i < joinSteps.end(); i++)
2620     {
2621         TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(i->get());
2622         JoinOrderData jo;
2623         jo.fTid1 = getTableKey(jobInfo, thjs->tupleId1());
2624         jo.fTid2 = getTableKey(jobInfo, thjs->tupleId2());
2625         jo.fJoinId = thjs->joinId();
2626 
2627         // not fastest, but good for a small list
2628         vector<JoinOrderData>::iterator j;
2629 
2630         for (j = joins.begin(); j < joins.end(); j++)
2631         {
2632             if ((j->fTid1 == jo.fTid1 && j->fTid2 == jo.fTid2) ||
2633                     (j->fTid1 == jo.fTid2 && j->fTid2 == jo.fTid1))
2634             {
2635                 j->fJoinId = jo.fJoinId;
2636                 break;
2637             }
2638         }
2639 
2640         // insert unique join pair
2641         if (j == joins.end())
2642             joins.push_back(jo);
2643     }
2644 }
2645 
updateJoinSides(uint32_t small,uint32_t large,map<uint32_t,SP_JoinInfo> & joinInfoMap,vector<SP_JoinInfo> & smallSides,TableInfoMap & tableInfoMap,JobInfo & jobInfo)2646 inline void updateJoinSides(uint32_t small, uint32_t large, map<uint32_t, SP_JoinInfo>& joinInfoMap,
2647                             vector<SP_JoinInfo>& smallSides, TableInfoMap& tableInfoMap, JobInfo& jobInfo)
2648 {
2649     TableJoinMap::iterator mit = jobInfo.tableJoinMap.find(make_pair(small, large));
2650 
2651     if (mit == jobInfo.tableJoinMap.end())
2652         throw runtime_error("Join step not found.");
2653 
2654     joinInfoMap[small]->fJoinData = mit->second;
2655     tableInfoMap[small].fJoinedTables.insert(small);
2656     smallSides.push_back(joinInfoMap[small]);
2657 
2658     tableInfoMap[large].fJoinedTables.insert(
2659         tableInfoMap[small].fJoinedTables.begin(), tableInfoMap[small].fJoinedTables.end());
2660     tableInfoMap[large].fJoinedTables.insert(large);
2661 }
2662 
2663 
2664 // For OUTER JOIN bug @2422/2633/3437/3759, join table based on join order.
2665 // The largest table will be always the streaming table, other tables are always on small side.
joinTablesInOrder(uint32_t largest,JobStepVector & joinSteps,TableInfoMap & tableInfoMap,JobInfo & jobInfo,vector<uint32_t> & joinOrder)2666 void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& tableInfoMap,
2667                        JobInfo& jobInfo, vector<uint32_t>& joinOrder)
2668 {
2669     // populate the tableInfo for join
2670     map<uint32_t, SP_JoinInfo> joinInfoMap;          // <table, JoinInfo>
2671 
2672     // <table,  <last step involved, large priority> >
2673     // large priority:
2674     //     -1 - must be on small side, like derived tables for semi join;
2675     //      0 - prefer to be on small side, like FROM subquery;
2676     //      1 - can be on either large or small side;
2677     //      2 - must be on large side.
2678     map<uint32_t, pair<SJSTEP, int> > joinStepMap;
2679 
2680     BatchPrimitive* bps = NULL;
2681     SubAdapterStep* tsas = NULL;
2682     TupleHashJoinStep* thjs = NULL;
2683 
2684     for (vector<uint32_t>::iterator i = jobInfo.tableList.begin(); i < jobInfo.tableList.end(); i++)
2685     {
2686         SP_JoinInfo joinInfo(new JoinInfo);
2687         joinInfo->fTableOid = tableInfoMap[*i].fTableOid;
2688         joinInfo->fAlias = tableInfoMap[*i].fAlias;
2689         joinInfo->fView = tableInfoMap[*i].fView;
2690         joinInfo->fSchema = tableInfoMap[*i].fSchema;
2691 
2692         joinInfo->fDl = tableInfoMap[*i].fDl;
2693         joinInfo->fRowGroup = tableInfoMap[*i].fRowGroup;
2694 
2695         joinInfoMap[*i] = joinInfo;
2696 
2697         SJSTEP spjs = tableInfoMap[*i].fQuerySteps.back();
2698         bps = dynamic_cast<BatchPrimitive*>(spjs.get());
2699         tsas = dynamic_cast<SubAdapterStep*>(spjs.get());
2700         thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
2701         TupleBPS* tbps = dynamic_cast<TupleBPS*>(spjs.get());
2702 
2703         if (*i == largest)
2704             joinStepMap[*i] = make_pair(spjs, 2);
2705         else if (tbps || thjs)
2706             joinStepMap[*i] = make_pair(spjs, 1);
2707         else if (tsas)
2708             joinStepMap[*i] = make_pair(spjs, 0);
2709         else
2710             joinStepMap[*i] = make_pair(spjs, -1);
2711     }
2712 
2713     // sort the join steps based on join ID.
2714     vector<JoinOrderData> joins;
2715     getJoinOrder(joins, joinSteps, jobInfo);
2716 
2717     // join the steps
2718     int64_t lastJoinId = -1;
2719     uint32_t large = (uint32_t) - 1;
2720     uint32_t small = (uint32_t) - 1;
2721     uint32_t prevLarge = (uint32_t) - 1;
2722     bool umstream = false;
2723     vector<uint32_t> joinedTable;
2724     uint32_t lastJoin = joins.size() - 1;
2725     bool isSemijoin = true;
2726 
2727     for (uint64_t js = 0; js < joins.size(); js++)
2728     {
2729         set<uint32_t> smallSideTid;
2730 
2731         if (joins[js].fJoinId != 0)
2732             isSemijoin = false;
2733 
2734         vector<SP_JoinInfo> smallSides;
2735         uint32_t tid1 = joins[js].fTid1;
2736         uint32_t tid2 = joins[js].fTid2;
2737         lastJoinId = joins[js].fJoinId;
2738 
2739         // largest has already joined, and this join cannot be merged.
2740         if (prevLarge == largest && tid1 != largest && tid2 != largest)
2741             umstream = true;
2742 
2743         if (joinStepMap[tid1].second > joinStepMap[tid2].second) // high priority
2744         {
2745             large = tid1;
2746             small = tid2;
2747         }
2748         else if (joinStepMap[tid1].second == joinStepMap[tid2].second &&
2749                  jobInfo.tableSize[tid1] >= jobInfo.tableSize[tid2]) // favor t1 for hint
2750         {
2751             large = tid1;
2752             small = tid2;
2753         }
2754         else
2755         {
2756             large = tid2;
2757             small = tid1;
2758         }
2759 
2760         updateJoinSides(small, large, joinInfoMap, smallSides, tableInfoMap, jobInfo);
2761 
2762         if (find(joinedTable.begin(), joinedTable.end(), small) == joinedTable.end())
2763             joinedTable.push_back(small);
2764 
2765         smallSideTid.insert(small);
2766 
2767         // merge in the next step if the large side is the same
2768         for (uint64_t ns  = js + 1; ns < joins.size(); js++, ns++)
2769         {
2770             uint32_t tid1 = joins[ns].fTid1;
2771             uint32_t tid2 = joins[ns].fTid2;
2772             uint32_t small = (uint32_t) - 1;
2773 
2774             if ((tid1 == large) &&
2775                     ((joinStepMap[tid1].second > joinStepMap[tid2].second) ||
2776                      (joinStepMap[tid1].second == joinStepMap[tid2].second &&
2777                       jobInfo.tableSize[tid1] >= jobInfo.tableSize[tid2])))
2778             {
2779                 small = tid2;
2780             }
2781             else if ((tid2 == large) &&
2782                      ((joinStepMap[tid2].second > joinStepMap[tid1].second) ||
2783                       (joinStepMap[tid2].second == joinStepMap[tid1].second &&
2784                        jobInfo.tableSize[tid2] >= jobInfo.tableSize[tid1])))
2785             {
2786                 small = tid1;
2787             }
2788             else
2789             {
2790                 break;
2791             }
2792 
2793             // check if FE needs table in previous smallsides
2794             if (jobInfo.joinFeTableMap[joins[ns].fJoinId].size() > 0)
2795             {
2796                 set<uint32_t>& tids = jobInfo.joinFeTableMap[joins[ns].fJoinId];
2797 
2798                 for (set<uint32_t>::iterator si = smallSideTid.begin(); si != smallSideTid.end(); si++)
2799                 {
2800                     if (tids.find(*si) != tids.end())
2801                         throw runtime_error("On clause filter involving a table not directly involved in the outer join is currently not supported.");
2802                 }
2803             }
2804 
2805             updateJoinSides(small, large, joinInfoMap, smallSides, tableInfoMap, jobInfo);
2806             lastJoinId = joins[ns].fJoinId;
2807 
2808             if (find(joinedTable.begin(), joinedTable.end(), small) == joinedTable.end())
2809                 joinedTable.push_back(small);
2810 
2811             smallSideTid.insert(small);
2812         }
2813 
2814         joinedTable.push_back(large);
2815 
2816         SJSTEP spjs = joinStepMap[large].first;
2817         bps = dynamic_cast<BatchPrimitive*>(spjs.get());
2818         tsas = dynamic_cast<SubAdapterStep*>(spjs.get());
2819         thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
2820 
2821         if (!bps && !thjs && !tsas)
2822         {
2823             if (dynamic_cast<SubQueryStep*>(spjs.get()))
2824                 throw IDBExcept(ERR_NON_SUPPORT_SUB_QUERY_TYPE);
2825 
2826             throw runtime_error("Not supported join.");
2827         }
2828 
2829         size_t startPos = 0; // start point to add new smallsides
2830         RowGroup largeSideRG = joinInfoMap[large]->fRowGroup;
2831 
2832         if (thjs && thjs->tokenJoin() == large)
2833             largeSideRG = thjs->getLargeRowGroup();
2834 
2835         // get info to config the TupleHashjoin
2836         vector<string> traces;
2837         vector<string> tableNames;
2838         DataListVec smallSideDLs;
2839         vector<RowGroup> smallSideRGs;
2840         vector<JoinType> jointypes;
2841         vector<bool> typeless;
2842         vector<vector<uint32_t> > smallKeyIndices;
2843         vector<vector<uint32_t> > largeKeyIndices;
2844 
2845         // bug5764, make sure semi joins acting as filter is after outer join.
2846         {
2847             // the inner table filters have to be performed after outer join
2848             vector<uint64_t> semijoins;
2849             vector<uint64_t> smallouts;
2850 
2851             for (size_t i = 0; i < smallSides.size(); i++)
2852             {
2853                 // find the the small-outer and semi-join joins
2854                 JoinType jt = smallSides[i]->fJoinData.fTypes[0];
2855 
2856                 if (jt & SMALLOUTER)
2857                     smallouts.push_back(i);
2858                 else if (jt & (SEMI | ANTI | SCALAR | CORRELATED))
2859                     semijoins.push_back(i);
2860             }
2861 
2862             // check the join order, re-arrange if necessary
2863             if (smallouts.size() > 0 && semijoins.size() > 0)
2864             {
2865                 uint64_t lastSmallOut = smallouts.back();
2866                 uint64_t lastSemijoin = semijoins.back();
2867 
2868                 if (lastSmallOut > lastSemijoin)
2869                 {
2870                     vector<SP_JoinInfo> temp1;
2871                     vector<SP_JoinInfo> temp2;
2872                     size_t j = 0;
2873 
2874                     for (size_t i = 0; i < smallSides.size(); i++)
2875                     {
2876                         if (j < semijoins.size() && i == semijoins[j])
2877                         {
2878                             temp1.push_back(smallSides[i]);
2879                             j++;
2880                         }
2881                         else
2882                         {
2883                             temp2.push_back(smallSides[i]);
2884                         }
2885 
2886                         if (i == lastSmallOut)
2887                             temp2.insert(temp2.end(), temp1.begin(), temp1.end());
2888                     }
2889 
2890                     smallSides = temp2;
2891                 }
2892             }
2893         }
2894 
2895         for (vector<SP_JoinInfo>::iterator i = smallSides.begin(); i != smallSides.end(); i++)
2896         {
2897             JoinInfo* info = i->get();
2898             smallSideDLs.push_back(info->fDl);
2899             smallSideRGs.push_back(info->fRowGroup);
2900             jointypes.push_back(info->fJoinData.fTypes[0]);
2901             typeless.push_back(info->fJoinData.fTypeless);
2902 
2903             vector<uint32_t> smallIndices;
2904             vector<uint32_t> largeIndices;
2905             const vector<uint32_t>& keys1 = info->fJoinData.fLeftKeys;
2906             const vector<uint32_t>& keys2 = info->fJoinData.fRightKeys;
2907             vector<uint32_t>::const_iterator k1 = keys1.begin();
2908             vector<uint32_t>::const_iterator k2 = keys2.begin();
2909             uint32_t stid = getTableKey(jobInfo, *k1);
2910             tableNames.push_back(jobInfo.keyInfo->tupleKeyVec[stid].fTable);
2911 
2912             for (; k1 != keys1.end(); ++k1, ++k2)
2913             {
2914                 smallIndices.push_back(getKeyIndex(*k1, info->fRowGroup));
2915                 largeIndices.push_back(getKeyIndex(*k2, largeSideRG));
2916             }
2917 
2918             smallKeyIndices.push_back(smallIndices);
2919             largeKeyIndices.push_back(largeIndices);
2920 
2921             if (jobInfo.trace)
2922             {
2923                 // small side column
2924                 ostringstream smallKey, smallIndex;
2925                 string alias1 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys1.front())];
2926                 smallKey << alias1 << "-";
2927 
2928                 for (k1 = keys1.begin(); k1 != keys1.end(); ++k1)
2929                 {
2930                     CalpontSystemCatalog::OID oid1 = jobInfo.keyInfo->tupleKeyVec[*k1].fId;
2931                     CalpontSystemCatalog::TableColName tcn1 = jobInfo.csc->colName(oid1);
2932                     smallKey << "(" << tcn1.column << ":" << oid1 << ":" << *k1 << ")";
2933                     smallIndex << " " << getKeyIndex(*k1,  info->fRowGroup);
2934                 }
2935 
2936                 // large side column
2937                 ostringstream largeKey, largeIndex;
2938                 string alias2 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys2.front())];
2939                 largeKey << alias2 << "-";
2940 
2941                 for (k2 = keys2.begin(); k2 != keys2.end(); ++k2)
2942                 {
2943                     CalpontSystemCatalog::OID oid2 = jobInfo.keyInfo->tupleKeyVec[*k2].fId;
2944                     CalpontSystemCatalog::TableColName tcn2 = jobInfo.csc->colName(oid2);
2945                     largeKey << "(" << tcn2.column << ":" << oid2 << ":" << *k2 << ")";
2946                     largeIndex << " " << getKeyIndex(*k2, largeSideRG);
2947                 }
2948 
2949                 ostringstream oss;
2950                 oss << smallKey.str() << " join on " << largeKey.str()
2951                     << " joinType: " << info->fJoinData.fTypes.front()
2952                     << "(" << joinTypeToString(info->fJoinData.fTypes.front()) << ")"
2953                     << (info->fJoinData.fTypeless ? " " : " !") << "typeless" << endl;
2954                 oss << "smallSideIndex-largeSideIndex :" << smallIndex.str() << " --"
2955                     << largeIndex.str() << endl;
2956                 oss << "small side RG" << endl << info->fRowGroup.toString() << endl;
2957                 traces.push_back(oss.str());
2958             }
2959         }
2960 
2961         if (jobInfo.trace)
2962         {
2963             ostringstream oss;
2964             oss << "large side RG" << endl << largeSideRG.toString() << endl;
2965             traces.push_back(oss.str());
2966         }
2967 
2968         if (bps || tsas || umstream || (thjs && joinStepMap[large].second < 1))
2969         {
2970             thjs = new TupleHashJoinStep(jobInfo);
2971             thjs->tableOid1(smallSides[0]->fTableOid);
2972             thjs->tableOid2(tableInfoMap[large].fTableOid);
2973             thjs->alias1(smallSides[0]->fAlias);
2974             thjs->alias2(tableInfoMap[large].fAlias);
2975             thjs->view1(smallSides[0]->fView);
2976             thjs->view2(tableInfoMap[large].fView);
2977             thjs->schema1(smallSides[0]->fSchema);
2978             thjs->schema2(tableInfoMap[large].fSchema);
2979             thjs->setLargeSideBPS(bps);
2980             thjs->joinId(lastJoinId);
2981 
2982             if (dynamic_cast<TupleBPS*>(bps) != NULL)
2983                 bps->incWaitToRunStepCnt();
2984 
2985             spjs.reset(thjs);
2986 
2987             JobStepAssociation inJsa;
2988             inJsa.outAdd(smallSideDLs, 0);
2989             inJsa.outAdd(joinInfoMap[large]->fDl);
2990             thjs->inputAssociation(inJsa);
2991             thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
2992 
2993             JobStepAssociation outJsa;
2994             AnyDataListSPtr spdl(new AnyDataList());
2995             RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
2996             spdl->rowGroupDL(dl);
2997             dl->OID(large);
2998             outJsa.outAdd(spdl);
2999             thjs->outputAssociation(outJsa);
3000 
3001             thjs->configSmallSideRG(smallSideRGs, tableNames);
3002             thjs->configLargeSideRG(joinInfoMap[large]->fRowGroup);
3003             thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
3004 
3005             tableInfoMap[large].fQuerySteps.push_back(spjs);
3006             tableInfoMap[large].fDl = spdl;
3007         }
3008         else // thjs && joinStepMap[large].second >= 1
3009         {
3010             JobStepAssociation inJsa = thjs->inputAssociation();
3011 
3012             if (inJsa.outSize() < 2)
3013                 throw runtime_error("Not enough input to a hashjoin.");
3014 
3015             startPos = inJsa.outSize() - 1;
3016             inJsa.outAdd(smallSideDLs, startPos);
3017             thjs->inputAssociation(inJsa);
3018             thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
3019 
3020             thjs->addSmallSideRG(smallSideRGs, tableNames);
3021             thjs->addJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
3022         }
3023 
3024         RowGroup rg;
3025         set<uint32_t>& tableSet = tableInfoMap[large].fJoinedTables;
3026         constructJoinedRowGroup(rg, tableSet, tableInfoMap, jobInfo);
3027         thjs->setOutputRowGroup(rg);
3028         tableInfoMap[large].fRowGroup = rg;
3029         tableSet.insert(large);
3030 
3031         if (jobInfo.trace)
3032         {
3033             cout << boldStart  << "\n====== join info ======\n" << boldStop;
3034 
3035             for (vector<string>::iterator t = traces.begin(); t != traces.end(); ++t)
3036                 cout << *t;
3037 
3038             cout << "RowGroup join result: " << endl << rg.toString() << endl << endl;
3039         }
3040 
3041         // on clause filter association
3042         map<uint64_t, size_t> joinIdIndexMap;
3043 
3044         for (size_t i = 0; i < smallSides.size(); i++)
3045         {
3046             if (smallSides[i]->fJoinData.fJoinId > 0)
3047                 joinIdIndexMap[smallSides[i]->fJoinData.fJoinId] = i;
3048         }
3049 
3050         // check if any cross-table expressions can be evaluated after the join
3051         JobStepVector readyExpSteps;
3052         JobStepVector& expSteps = jobInfo.crossTableExpressions;
3053         JobStepVector::iterator eit = expSteps.begin();
3054 
3055         while (eit != expSteps.end())
3056         {
3057             ExpressionStep* exp = dynamic_cast<ExpressionStep*>(eit->get());
3058 
3059             if (exp == NULL)
3060                 throw runtime_error("Not an expression.");
3061 
3062             if (exp->functionJoin())
3063             {
3064                 eit++;
3065                 continue;  // done as join
3066             }
3067 
3068             const vector<uint32_t>& tables = exp->tableKeys();
3069             uint64_t i = 0;
3070 
3071             for (; i < tables.size(); i++)
3072             {
3073                 if (tableInfoMap[large].fJoinedTables.find(tables[i]) ==
3074                         tableInfoMap[large].fJoinedTables.end())
3075                     break;
3076             }
3077 
3078             // all tables for this expression are joined
3079             bool ready = (tables.size() == i);
3080 
3081             // for on clause condition, need check join ID
3082             if (ready && exp->associatedJoinId() != 0)
3083             {
3084                 map<uint64_t, size_t>::iterator x = joinIdIndexMap.find(exp->associatedJoinId());
3085                 ready = (x != joinIdIndexMap.end());
3086             }
3087 
3088             if (ready)
3089             {
3090                 readyExpSteps.push_back(*eit);
3091                 eit = expSteps.erase(eit);
3092             }
3093             else
3094             {
3095                 eit++;
3096             }
3097         }
3098 
3099         // if last join step, handle the delayed outer join filters
3100         if (js == lastJoin && jobInfo.outerJoinExpressions.size() > 0)
3101             readyExpSteps.insert(readyExpSteps.end(),
3102                                  jobInfo.outerJoinExpressions.begin(),
3103                                  jobInfo.outerJoinExpressions.end());
3104 
3105         // check additional compares for semi-join
3106         if (readyExpSteps.size() > 0)
3107         {
3108             map<uint32_t, uint32_t> keyToIndexMap; // map keys to the indices in the RG
3109 
3110             for (uint64_t i = 0; i < rg.getKeys().size(); ++i)
3111                 keyToIndexMap.insert(make_pair(rg.getKeys()[i], i));
3112 
3113             // tables have additional comparisons
3114             map<uint32_t, int> correlateTables;          // index in thjs
3115             map<uint32_t, ParseTree*> correlateCompare;  // expression
3116 
3117             for (size_t i = 0; i != smallSides.size(); i++)
3118             {
3119                 if ((jointypes[i] & SEMI) || (jointypes[i] & ANTI) || (jointypes[i] & SCALAR))
3120                 {
3121                     uint32_t  tid = getTableKey(jobInfo,
3122                                                 smallSides[i]->fTableOid,
3123                                                 smallSides[i]->fAlias,
3124                                                 smallSides[i]->fSchema,
3125                                                 smallSides[i]->fView);
3126                     correlateTables[tid] = i;
3127                     correlateCompare[tid] = NULL;
3128                 }
3129             }
3130 
3131             if (correlateTables.size() > 0)
3132             {
3133                 // separate additional compare for each table pair
3134                 JobStepVector::iterator eit = readyExpSteps.begin();
3135 
3136                 while (eit != readyExpSteps.end())
3137                 {
3138                     ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());
3139 
3140                     if (e->selectFilter())
3141                     {
3142                         // @bug3780, leave select filter to normal expression
3143                         eit++;
3144                         continue;
3145                     }
3146 
3147                     const vector<uint32_t>& tables = e->tableKeys();
3148                     map<uint32_t, int>::iterator j = correlateTables.end();
3149 
3150                     for (size_t i = 0; i < tables.size(); i++)
3151                     {
3152                         j = correlateTables.find(tables[i]);
3153 
3154                         if (j != correlateTables.end())
3155                             break;
3156                     }
3157 
3158                     if (j == correlateTables.end())
3159                     {
3160                         eit++;
3161                         continue;
3162                     }
3163 
3164                     // map the input column index
3165                     e->updateInputIndex(keyToIndexMap, jobInfo);
3166                     ParseTree* pt = correlateCompare[j->first];
3167 
3168                     if (pt == NULL)
3169                     {
3170                         // first expression
3171                         pt = new ParseTree;
3172                         pt->copyTree(*(e->expressionFilter()));
3173                     }
3174                     else
3175                     {
3176                         // combine the expressions
3177                         ParseTree* left = pt;
3178                         ParseTree* right = new ParseTree;
3179                         right->copyTree(*(e->expressionFilter()));
3180                         pt = new ParseTree(new LogicOperator("and"));
3181                         pt->left(left);
3182                         pt->right(right);
3183                     }
3184 
3185                     correlateCompare[j->first] = pt;
3186                     eit = readyExpSteps.erase(eit);
3187                 }
3188 
3189                 map<uint32_t, int>::iterator k = correlateTables.begin();
3190 
3191                 while (k != correlateTables.end())
3192                 {
3193                     ParseTree* pt = correlateCompare[k->first];
3194 
3195                     if (pt != NULL)
3196                     {
3197                         boost::shared_ptr<ParseTree> sppt(pt);
3198                         thjs->addJoinFilter(sppt, startPos + k->second);
3199                     }
3200 
3201                     k++;
3202                 }
3203 
3204                 thjs->setJoinFilterInputRG(rg);
3205             }
3206 
3207             // normal expression if any
3208             if (readyExpSteps.size() > 0)
3209             {
3210                 // add the expression steps in where clause can be solved by this join to bps
3211                 ParseTree* pt = NULL;
3212                 JobStepVector::iterator eit = readyExpSteps.begin();
3213 
3214                 for (; eit != readyExpSteps.end(); eit++)
3215                 {
3216                     // map the input column index
3217                     ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());
3218                     e->updateInputIndex(keyToIndexMap, jobInfo);
3219 
3220                     // short circuit on clause expressions
3221                     map<uint64_t, size_t>::iterator x = joinIdIndexMap.find(e->associatedJoinId());
3222 
3223                     if (x != joinIdIndexMap.end())
3224                     {
3225                         ParseTree* joinFilter = new ParseTree;
3226                         joinFilter->copyTree(*(e->expressionFilter()));
3227                         boost::shared_ptr<ParseTree> sppt(joinFilter);
3228                         thjs->addJoinFilter(sppt, startPos + x->second);
3229                         thjs->setJoinFilterInputRG(rg);
3230                         continue;
3231                     }
3232 
3233                     if (pt == NULL)
3234                     {
3235                         // first expression
3236                         pt = new ParseTree;
3237                         pt->copyTree(*(e->expressionFilter()));
3238                     }
3239                     else
3240                     {
3241                         // combine the expressions
3242                         ParseTree* left = pt;
3243                         ParseTree* right = new ParseTree;
3244                         right->copyTree(*(e->expressionFilter()));
3245                         pt = new ParseTree(new LogicOperator("and"));
3246                         pt->left(left);
3247                         pt->right(right);
3248                     }
3249                 }
3250 
3251                 if (pt != NULL)
3252                 {
3253                     boost::shared_ptr<ParseTree> sppt(pt);
3254                     thjs->addFcnExpGroup2(sppt);
3255                 }
3256             }
3257 
3258             // update the fColsInExp2 and construct the output RG
3259             updateExp2Cols(readyExpSteps, tableInfoMap, jobInfo);
3260             constructJoinedRowGroup(rg, tableSet, tableInfoMap, jobInfo);
3261 
3262             if (thjs->hasFcnExpGroup2())
3263                 thjs->setFE23Output(rg);
3264             else
3265                 thjs->setOutputRowGroup(rg);
3266 
3267             tableInfoMap[large].fRowGroup = rg;
3268 
3269             if (jobInfo.trace)
3270             {
3271                 cout << "RowGroup of " << tableInfoMap[large].fAlias << " after EXP G2: " << endl
3272                      << rg.toString() << endl << endl;
3273             }
3274         }
3275 
3276         // update the info maps
3277         int l = (joinStepMap[large].second == 2) ? 2 : 0;
3278 
3279         if (isSemijoin)
3280             joinStepMap[large] = make_pair(spjs, joinStepMap[large].second);
3281         else
3282             joinStepMap[large] = make_pair(spjs, l);
3283 
3284         for (set<uint32_t>::iterator i = tableSet.begin(); i != tableSet.end(); i++)
3285         {
3286             joinInfoMap[*i]->fDl = tableInfoMap[large].fDl;
3287             joinInfoMap[*i]->fRowGroup = tableInfoMap[large].fRowGroup;
3288 
3289             if (*i != large)
3290             {
3291                 //@bug6117, token should be done for small side tables.
3292                 SJSTEP smallJs = joinStepMap[*i].first;
3293                 TupleHashJoinStep* smallThjs = dynamic_cast<TupleHashJoinStep*>(smallJs.get());
3294 
3295                 if (smallThjs && smallThjs->tokenJoin())
3296                     smallThjs->tokenJoin(-1);
3297 
3298                 // Set join priority for smallsides.
3299                 joinStepMap[*i] = make_pair(spjs, l);
3300 
3301                 // Mark joined tables, smalls and large, as a group.
3302                 tableInfoMap[*i].fJoinedTables = tableInfoMap[large].fJoinedTables;
3303             }
3304         }
3305 
3306         prevLarge = large;
3307     }
3308 
3309     // Keep join order by the table last used for picking the right delivery step.
3310     {
3311         for (vector<uint32_t>::reverse_iterator i = joinedTable.rbegin(); i < joinedTable.rend(); i++)
3312         {
3313             if (find(joinOrder.begin(), joinOrder.end(), *i) == joinOrder.end())
3314                 joinOrder.push_back(*i);
3315         }
3316 
3317         const uint64_t n = joinOrder.size();
3318         const uint64_t h = n / 2;
3319         const uint64_t e = n - 1;
3320 
3321         for (uint64_t i = 0; i < h; i++)
3322             std::swap(joinOrder[i], joinOrder[e - i]);
3323     }
3324 }
3325 
3326 
joinTables(JobStepVector & joinSteps,TableInfoMap & tableInfoMap,JobInfo & jobInfo,vector<uint32_t> & joinOrder,const bool overrideLargeSideEstimate)3327 inline void joinTables(JobStepVector& joinSteps, TableInfoMap& tableInfoMap, JobInfo& jobInfo,
3328                        vector<uint32_t>& joinOrder, const bool overrideLargeSideEstimate)
3329 {
3330     uint32_t largestTable = getLargestTable(jobInfo, tableInfoMap, overrideLargeSideEstimate);
3331 
3332     if (jobInfo.outerOnTable.size() == 0)
3333         joinToLargeTable(largestTable, tableInfoMap, jobInfo, joinOrder);
3334     else
3335         joinTablesInOrder(largestTable, joinSteps, tableInfoMap, jobInfo, joinOrder);
3336 }
3337 
3338 
makeNoTableJobStep(JobStepVector & querySteps,JobStepVector & projectSteps,DeliveredTableMap & deliverySteps,JobInfo & jobInfo)3339 void makeNoTableJobStep(JobStepVector& querySteps, JobStepVector& projectSteps,
3340                         DeliveredTableMap& deliverySteps, JobInfo& jobInfo)
3341 {
3342     querySteps.clear();
3343     projectSteps.clear();
3344     deliverySteps.clear();
3345     querySteps.push_back(TupleConstantStep::addConstantStep(jobInfo));
3346     deliverySteps[CNX_VTABLE_ID] = querySteps.back();
3347 }
3348 
3349 
3350 }
3351 
3352 
3353 
3354 namespace joblist
3355 {
associateTupleJobSteps(JobStepVector & querySteps,JobStepVector & projectSteps,DeliveredTableMap & deliverySteps,JobInfo & jobInfo,const bool overrideLargeSideEstimate)3356 void associateTupleJobSteps(JobStepVector& querySteps, JobStepVector& projectSteps,
3357                             DeliveredTableMap& deliverySteps, JobInfo& jobInfo,
3358                             const bool overrideLargeSideEstimate)
3359 {
3360     if (jobInfo.trace)
3361     {
3362         const boost::shared_ptr<TupleKeyInfo>& keyInfo = jobInfo.keyInfo;
3363         cout << "query steps:" << endl;
3364 
3365         for (JobStepVector::iterator i = querySteps.begin(); i != querySteps.end(); ++i)
3366         {
3367             TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(i->get());
3368 
3369             if (thjs == NULL)
3370             {
3371                 int64_t id = ((*i)->tupleId() != (uint64_t) - 1) ? (*i)->tupleId() : -1;
3372                 cout << typeid(*(i->get())).name() << ": " << (*i)->oid() << " " << id << " "
3373                      << (int)((id != -1) ? getTableKey(jobInfo, id) : -1) << endl;
3374             }
3375             else
3376             {
3377                 int64_t id1 = (thjs->tupleId1() != (uint64_t) - 1) ? thjs->tupleId1() : -1;
3378                 int64_t id2 = (thjs->tupleId2() != (uint64_t) - 1) ? thjs->tupleId2() : -1;
3379                 cout << typeid(*thjs).name() << ": " << thjs->oid1() << " " << id1 << " "
3380                      << (int)((id1 != -1) ? getTableKey(jobInfo, id1) : -1) << " - "
3381                      << thjs->getJoinType() << " - " << thjs->oid2() << " " << id2 << " "
3382                      << (int)((id2 != -1) ? getTableKey(jobInfo, id2) : -1) << endl;
3383             }
3384         }
3385 
3386         cout << "project steps:" << endl;
3387 
3388         for (JobStepVector::iterator i = projectSteps.begin(); i != projectSteps.end(); ++i)
3389         {
3390             cout << typeid(*(i->get())).name() << ": " << (*i)->oid() << " "
3391                  << (*i)->tupleId() << " " << getTableKey(jobInfo, (*i)->tupleId()) << endl;
3392         }
3393 
3394         cout << "delivery steps:" << endl;
3395 
3396         for (DeliveredTableMap::iterator i = deliverySteps.begin(); i != deliverySteps.end(); ++i)
3397             cout << typeid(*(i->second.get())).name() << endl;
3398 
3399         cout << "\nTable Info:  (key  oid  name alias view sub)" << endl;
3400 
3401         for (uint32_t i = 0; i < keyInfo->tupleKeyVec.size(); ++i)
3402         {
3403             int64_t tid = jobInfo.keyInfo->colKeyToTblKey[i];
3404 
3405             if (tid == i)
3406             {
3407                 CalpontSystemCatalog::OID oid = keyInfo->tupleKeyVec[i].fId;
3408                 string alias = keyInfo->tupleKeyVec[i].fTable;
3409 
3410                 if (alias.length() < 1) alias = "N/A";
3411 
3412                 string name = keyInfo->keyName[i];
3413 
3414                 if (name.empty()) name = "unknown";
3415 
3416                 string view = keyInfo->tupleKeyVec[i].fView;
3417 
3418                 if (view.length() < 1) view = "N/A";
3419 
3420                 int sid = keyInfo->tupleKeyVec[i].fSubId;
3421                 cout << i << "\t" << oid << "\t" << name << "\t" << alias
3422                      << "\t" << view << "\t" << hex << sid << dec << endl;
3423             }
3424         }
3425 
3426         cout << "\nTupleKey vector:  (tupleKey  oid  tableKey  name  alias view sub)" << endl;
3427 
3428         for (uint32_t i = 0; i < keyInfo->tupleKeyVec.size(); ++i)
3429         {
3430             CalpontSystemCatalog::OID oid = keyInfo->tupleKeyVec[i].fId;
3431             int64_t tid = jobInfo.keyInfo->colKeyToTblKey[i];
3432             string alias = keyInfo->tupleKeyVec[i].fTable;
3433 
3434             if (alias.length() < 1) alias = "N/A";
3435 
3436             // Expression IDs are borrowed from systemcatalog IDs, which are not used in tuple.
3437             string name = keyInfo->keyName[i];
3438 
3439             if (keyInfo->dictOidToColOid.find(oid) != keyInfo->dictOidToColOid.end())
3440             {
3441                 name += "[d]";  // indicate this is a dictionary column
3442             }
3443 
3444             if (jobInfo.keyInfo->pseudoType[i] > 0)
3445             {
3446                 name += "[p]";  // indicate this is a pseudo column
3447             }
3448 
3449             if (name.empty())
3450             {
3451                 name = "unknown";
3452             }
3453 
3454             string view = keyInfo->tupleKeyVec[i].fView;
3455 
3456             if (view.length() < 1) view = "N/A";
3457 
3458             int sid = keyInfo->tupleKeyVec[i].fSubId;
3459             cout << i << "\t" << oid << "\t" << tid << "\t" << name << "\t" << alias
3460                  << "\t" << view << "\t" << hex << sid << dec << endl;
3461         }
3462 
3463         cout << endl;
3464     }
3465 
3466 
3467     // @bug 2771, handle no table select query
3468     if (jobInfo.tableList.size() < 1)
3469     {
3470         makeNoTableJobStep(querySteps, projectSteps, deliverySteps, jobInfo);
3471         return;
3472     }
3473 
3474     // Create a step vector for each table in the from clause.
3475     TableInfoMap tableInfoMap;
3476 
3477     for (uint64_t i = 0; i < jobInfo.tableList.size(); i++)
3478     {
3479         uint32_t tableUid = jobInfo.tableList[i];
3480         tableInfoMap[tableUid] = TableInfo();
3481         tableInfoMap[tableUid].fTableOid = jobInfo.keyInfo->tupleKeyVec[tableUid].fId;
3482         tableInfoMap[tableUid].fName = jobInfo.keyInfo->keyName[tableUid];
3483         tableInfoMap[tableUid].fAlias = jobInfo.keyInfo->tupleKeyVec[tableUid].fTable;
3484         tableInfoMap[tableUid].fView = jobInfo.keyInfo->tupleKeyVec[tableUid].fView;
3485         tableInfoMap[tableUid].fSchema = jobInfo.keyInfo->tupleKeyVec[tableUid].fSchema;
3486         tableInfoMap[tableUid].fSubId = jobInfo.keyInfo->tupleKeyVec[tableUid].fSubId;
3487         tableInfoMap[tableUid].fColsInColMap = jobInfo.columnMap[tableUid];
3488     }
3489 
3490     // Set of the columns being projected.
3491     for (TupleInfoVector::iterator i = jobInfo.pjColList.begin(); i != jobInfo.pjColList.end(); i++)
3492         jobInfo.returnColSet.insert(i->key);
3493 
3494     // Strip constantbooleanquerySteps
3495     for (uint64_t i = 0; i < querySteps.size(); )
3496     {
3497         TupleConstantBooleanStep* bs = dynamic_cast<TupleConstantBooleanStep*>(querySteps[i].get());
3498         ExpressionStep* es = dynamic_cast<ExpressionStep*>(querySteps[i].get());
3499 
3500         if (bs != NULL)
3501         {
3502             // cosntant step
3503             if (bs->boolValue() == false)
3504                 jobInfo.constantFalse = true;
3505 
3506             querySteps.erase(querySteps.begin() + i);
3507         }
3508         else if (es != NULL && es->tableKeys().size() == 0)
3509         {
3510             // constant expression
3511             ParseTree* p = es->expressionFilter();  // filter
3512 
3513             if (p != NULL)
3514             {
3515                 Row r; // dummy row
3516 
3517                 if (funcexp::FuncExp::instance()->evaluate(r, p) == false)
3518                     jobInfo.constantFalse = true;
3519 
3520                 querySteps.erase(querySteps.begin() + i);
3521             }
3522         }
3523         else
3524         {
3525             i++;
3526         }
3527     }
3528 
3529     // double check if the function join canditates are still there.
3530     JobStepVector steps = querySteps;
3531 
3532     for (int64_t i = jobInfo.functionJoins.size() - 1; i >= 0; i--)
3533     {
3534         bool exist = false;
3535 
3536         for (JobStepVector::iterator j = steps.begin(); j != steps.end() && !exist; ++j)
3537         {
3538             if (jobInfo.functionJoins[i] == j->get())
3539                 exist = true;
3540         }
3541 
3542         if (!exist)
3543             jobInfo.functionJoins.erase(jobInfo.functionJoins.begin() + i);
3544     }
3545 
3546     // Concatenate query and project steps
3547     steps.insert(steps.end(), projectSteps.begin(), projectSteps.end());
3548 
3549     // Make sure each query step has an output DL
3550     // This is necessary for toString() method on most steps
3551     for (JobStepVector::iterator it = steps.begin(); it != steps.end(); ++it)
3552     {
3553         //if (dynamic_cast<OrDelimiter*>(it->get()))
3554         //	continue;
3555 
3556         if (it->get()->outputAssociation().outSize() == 0)
3557         {
3558             JobStepAssociation jsa;
3559             AnyDataListSPtr adl(new AnyDataList());
3560             RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
3561             dl->OID(it->get()->oid());
3562             adl->rowGroupDL(dl);
3563             jsa.outAdd(adl);
3564             it->get()->outputAssociation(jsa);
3565         }
3566     }
3567 
3568     // Populate the TableInfo map with the job steps keyed by table ID.
3569     JobStepVector joinSteps;
3570     JobStepVector& expSteps = jobInfo.crossTableExpressions;
3571     JobStepVector::iterator it = querySteps.begin();
3572     JobStepVector::iterator end = querySteps.end();
3573 
3574     while (it != end)
3575     {
3576         // Separate table joins from other predicates.
3577         TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(it->get());
3578         ExpressionStep* exps = dynamic_cast<ExpressionStep*>(it->get());
3579         SubAdapterStep* subs = dynamic_cast<SubAdapterStep*>(it->get());
3580 
3581         if (thjs != NULL && thjs->tupleId1() != thjs->tupleId2())
3582         {
3583             // simple column and constant column semi join
3584             if (thjs->tableOid2() == 0 && thjs->schema2().empty())
3585             {
3586                 jobInfo.correlateSteps.push_back(*it++);
3587                 continue;
3588             }
3589 
3590             // check correlated join step
3591             JoinType joinType = thjs->getJoinType();
3592 
3593             if (joinType & CORRELATED)
3594             {
3595                 // one of the tables is in outer query
3596                 jobInfo.correlateSteps.push_back(*it++);
3597                 continue;
3598             }
3599 
3600             // Save the join topology.
3601             uint32_t key1 = thjs->tupleId1();
3602             uint32_t key2 = thjs->tupleId2();
3603             uint32_t tid1 = getTableKey(jobInfo, key1);
3604             uint32_t tid2 = getTableKey(jobInfo, key2);
3605 
3606             if (thjs->dictOid1() > 0)
3607                 key1 = jobInfo.keyInfo->dictKeyMap[key1];
3608 
3609             if (thjs->dictOid2() > 0)
3610                 key2 = jobInfo.keyInfo->dictKeyMap[key2];
3611 
3612             // not correlated
3613             joinSteps.push_back(*it);
3614             tableInfoMap[tid1].fJoinKeys.push_back(key1);
3615             tableInfoMap[tid2].fJoinKeys.push_back(key2);
3616 
3617             // save the function join expressions
3618             boost::shared_ptr<FunctionJoinInfo> fji = thjs->funcJoinInfo();
3619 
3620             if (fji)
3621             {
3622                 if (fji->fStep[0])
3623                 {
3624                     tableInfoMap[tid1].fFuncJoinExps.push_back(fji->fStep[0]);
3625                     vector<uint32_t>& cols = tableInfoMap[tid1].fColsInFuncJoin;
3626                     cols.insert(cols.end(), fji->fColumnKeys[0].begin(), fji->fColumnKeys[0].end());
3627                 }
3628 
3629                 if (fji->fStep[1])
3630                 {
3631                     tableInfoMap[tid2].fFuncJoinExps.push_back(fji->fStep[1]);
3632                     vector<uint32_t>& cols = tableInfoMap[tid2].fColsInFuncJoin;
3633                     cols.insert(cols.end(), fji->fColumnKeys[1].begin(), fji->fColumnKeys[1].end());
3634                 }
3635             }
3636 
3637             // keep a join map
3638             pair<uint32_t, uint32_t> tablePair(tid1, tid2);
3639             TableJoinMap::iterator m1 = jobInfo.tableJoinMap.find(tablePair);
3640             TableJoinMap::iterator m2 = jobInfo.tableJoinMap.end();
3641 
3642             if (m1 == jobInfo.tableJoinMap.end())
3643             {
3644                 tableInfoMap[tid1].fAdjacentList.push_back(tid2);
3645                 tableInfoMap[tid2].fAdjacentList.push_back(tid1);
3646 
3647                 m1 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid1, tid2), JoinData()));
3648                 m2 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid2, tid1), JoinData()));
3649 
3650                 TupleInfo ti1(getTupleInfo(key1, jobInfo));
3651                 TupleInfo ti2(getTupleInfo(key2, jobInfo));
3652 
3653                 if (ti1.width > 8 || ti2.width > 8)
3654                 {
3655                     if (ti1.dtype ==  execplan::CalpontSystemCatalog::LONGDOUBLE
3656                      || ti2.dtype ==  execplan::CalpontSystemCatalog::LONGDOUBLE)
3657                     {
3658                         m1->second.fTypeless = m2->second.fTypeless = false;
3659                     }
3660                     else
3661                     {
3662                         m1->second.fTypeless = m2->second.fTypeless = true;
3663                     }
3664                 }
3665                 else
3666                 {
3667                     m1->second.fTypeless = m2->second.fTypeless = isCharType(ti1.dtype) ||
3668                                                                   isCharType(ti2.dtype);
3669                 }
3670             }
3671             else
3672             {
3673                 m2 = jobInfo.tableJoinMap.find(make_pair(tid2, tid1));
3674                 m1->second.fTypeless = m2->second.fTypeless = true;
3675             }
3676 
3677             if (m1 == jobInfo.tableJoinMap.end() || m2 == jobInfo.tableJoinMap.end())
3678                 throw runtime_error("Bad table map.");
3679 
3680             // Keep a map of the join (table, key) pairs
3681             m1->second.fLeftKeys.push_back(key1);
3682             m1->second.fRightKeys.push_back(key2);
3683 
3684             m2->second.fLeftKeys.push_back(key2);
3685             m2->second.fRightKeys.push_back(key1);
3686 
3687             // Keep a map of the join type between the keys.
3688             // OUTER join and SEMI/ANTI join are mutually exclusive.
3689             if (joinType == LEFTOUTER)
3690             {
3691                 m1->second.fTypes.push_back(SMALLOUTER);
3692                 m2->second.fTypes.push_back(LARGEOUTER);
3693                 jobInfo.outerOnTable.insert(tid2);
3694             }
3695             else if (joinType == RIGHTOUTER)
3696             {
3697                 m1->second.fTypes.push_back(LARGEOUTER);
3698                 m2->second.fTypes.push_back(SMALLOUTER);
3699                 jobInfo.outerOnTable.insert(tid1);
3700             }
3701             else if ((joinType & SEMI) &&
3702                      ((joinType & LEFTOUTER) == LEFTOUTER || (joinType & RIGHTOUTER) == RIGHTOUTER))
3703             {
3704                 // @bug3998, DML UPDATE borrows "SEMI" flag,
3705                 // allowing SEMI and LARGEOUTER combination to support update with outer join.
3706                 if ((joinType & LEFTOUTER) == LEFTOUTER)
3707                 {
3708                     joinType ^= LEFTOUTER;
3709                     m1->second.fTypes.push_back(joinType);
3710                     m2->second.fTypes.push_back(joinType | LARGEOUTER);
3711                     jobInfo.outerOnTable.insert(tid2);
3712                 }
3713                 else
3714                 {
3715                     joinType ^= RIGHTOUTER;
3716                     m1->second.fTypes.push_back(joinType | LARGEOUTER);
3717                     m2->second.fTypes.push_back(joinType);
3718                     jobInfo.outerOnTable.insert(tid1);
3719                 }
3720             }
3721             else
3722             {
3723                 m1->second.fTypes.push_back(joinType);
3724                 m2->second.fTypes.push_back(joinType);
3725             }
3726 
3727             // need id to keep the join order
3728             m1->second.fJoinId = m2->second.fJoinId = thjs->joinId();
3729         }
3730         // Separate the expressions
3731         else if (exps != NULL && subs == NULL)
3732         {
3733             const vector<uint32_t>& tables = exps->tableKeys();
3734             const vector<uint32_t>& columns = exps->columnKeys();
3735             bool  tableInOuterQuery = false;
3736             set<uint32_t> tableSet;	           // involved unique tables
3737 
3738             for (uint64_t i = 0; i < tables.size(); ++i)
3739             {
3740                 if (find(jobInfo.tableList.begin(), jobInfo.tableList.end(), tables[i]) !=
3741                         jobInfo.tableList.end())
3742                     tableSet.insert(tables[i]);
3743                 else
3744                     tableInOuterQuery = true;
3745             }
3746 
3747             if (tableInOuterQuery)
3748             {
3749                 // all columns in subquery scope to be projected
3750                 for (uint64_t i = 0; i < tables.size(); ++i)
3751                 {
3752                     // outer-query columns
3753                     if (tableSet.find(tables[i]) == tableSet.end())
3754                         continue;
3755 
3756                     // subquery columns
3757                     uint32_t c = columns[i];
3758 
3759                     if (jobInfo.returnColSet.find(c) == jobInfo.returnColSet.end())
3760                     {
3761                         tableInfoMap[tables[i]].fProjectCols.push_back(c);
3762                         jobInfo.pjColList.push_back(getTupleInfo(c, jobInfo));
3763                         jobInfo.returnColSet.insert(c);
3764                         const SimpleColumn* sc =
3765                             dynamic_cast<const SimpleColumn*>(exps->columns()[i]);
3766 
3767                         if (sc != NULL)
3768                             jobInfo.deliveredCols.push_back(SRCP(sc->clone()));
3769                     }
3770                 }
3771 
3772                 jobInfo.correlateSteps.push_back(*it++);
3773                 continue;
3774             }
3775 
3776             // is the expression cross tables?
3777             if (tableSet.size() == 1 && exps->associatedJoinId() == 0)
3778             {
3779                 // single table and not in join on clause
3780                 uint32_t tid = tables[0];
3781 
3782                 for (uint64_t i = 0; i < columns.size(); ++i)
3783                     tableInfoMap[tid].fColsInExp1.push_back(columns[i]);
3784 
3785                 tableInfoMap[tid].fOneTableExpSteps.push_back(*it);
3786             }
3787             else
3788             {
3789                 // WORKAROUND for limitation on join with filter
3790                 if (exps->associatedJoinId() != 0)
3791                 {
3792                     for (uint64_t i = 0; i < exps->columns().size(); ++i)
3793                     {
3794                         jobInfo.joinFeTableMap[exps->associatedJoinId()].insert(tables[i]);
3795                     }
3796                 }
3797 
3798                 // resolve after join: cross table or on clause conditions
3799                 for (uint64_t i = 0; i < columns.size(); ++i)
3800                 {
3801                     uint32_t cid = columns[i];
3802                     uint32_t tid = getTableKey(jobInfo, cid);
3803                     tableInfoMap[tid].fColsInExp2.push_back(cid);
3804                 }
3805 
3806                 expSteps.push_back(*it);
3807             }
3808         }
3809         // Separate the other steps by unique ID.
3810         else
3811         {
3812             uint32_t tid = -1;
3813             uint64_t cid = (*it)->tupleId();
3814 
3815             if (cid != (uint64_t) - 1)
3816                 tid = getTableKey(jobInfo, (*it)->tupleId());
3817             else
3818                 tid = getTableKey(jobInfo, it->get());
3819 
3820             if (find(jobInfo.tableList.begin(), jobInfo.tableList.end(), tid) !=
3821                     jobInfo.tableList.end())
3822             {
3823                 tableInfoMap[tid].fQuerySteps.push_back(*it);
3824             }
3825             else
3826             {
3827                 jobInfo.correlateSteps.push_back(*it);
3828             }
3829         }
3830 
3831         it++;
3832     }
3833 
3834     // @bug2634, delay isNull filter on outerjoin key
3835     // @bug5374, delay predicates for outerjoin
3836     outjoinPredicateAdjust(tableInfoMap, jobInfo);
3837 
3838     // @bug4021, make sure there is real column to scan
3839     for (TableInfoMap::iterator it = tableInfoMap.begin(); it != tableInfoMap.end(); it++)
3840     {
3841         uint32_t tableUid = it->first;
3842 
3843         if (jobInfo.pseudoColTable.find(tableUid) == jobInfo.pseudoColTable.end())
3844             continue;
3845 
3846         JobStepVector& steps = tableInfoMap[tableUid].fQuerySteps;
3847         JobStepVector::iterator s = steps.begin();
3848         JobStepVector::iterator p = steps.end();
3849 
3850         for (; s != steps.end(); s++)
3851         {
3852             if (typeid(*(s->get())) == typeid(pColScanStep) ||
3853                     typeid(*(s->get())) == typeid(pColStep))
3854                 break;
3855 
3856             // @bug5893, iterator to the first pseudocolumn
3857             if (typeid(*(s->get())) == typeid(PseudoColStep) && p == steps.end())
3858                 p = s;
3859         }
3860 
3861         if (s == steps.end())
3862         {
3863             map<uint64_t, SRCP>::iterator t = jobInfo.tableColMap.find(tableUid);
3864 
3865             if (t == jobInfo.tableColMap.end())
3866             {
3867                 string msg = jobInfo.keyInfo->tupleKeyToName[tableUid];
3868                 msg += " has no column in column map.";
3869                 throw runtime_error(msg);
3870             }
3871 
3872             SimpleColumn* sc = dynamic_cast<SimpleColumn*>(t->second.get());
3873             CalpontSystemCatalog::OID oid = sc->oid();
3874             CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
3875             CalpontSystemCatalog::ColType ct = sc->colType();
3876             string alias(extractTableAlias(sc));
3877             SJSTEP sjs(new pColScanStep(oid, tblOid, ct, jobInfo));
3878             sjs->alias(alias);
3879             sjs->view(sc->viewName());
3880             sjs->schema(sc->schemaName());
3881             sjs->name(sc->columnName());
3882             TupleInfo ti(setTupleInfo(ct, oid, jobInfo, tblOid, sc, alias));
3883             sjs->tupleId(ti.key);
3884             steps.insert(steps.begin(), sjs);
3885 
3886             if (isDictCol(ct) && jobInfo.tokenOnly.find(ti.key) == jobInfo.tokenOnly.end())
3887                 jobInfo.tokenOnly[ti.key] = true;
3888         }
3889         else if (s > p)
3890         {
3891             // @bug5893, make sure a pcol is in front of any pseudo step.
3892             SJSTEP t = *s;
3893             *s = *p;
3894             *p = t;
3895         }
3896     }
3897 
3898     // @bug3767, error out scalar subquery with aggregation and correlated additional comparison.
3899     if (jobInfo.hasAggregation && (jobInfo.correlateSteps.size() > 0))
3900     {
3901         // expression filter
3902         ExpressionStep* exp = NULL;
3903 
3904         for (it = jobInfo.correlateSteps.begin(); it != jobInfo.correlateSteps.end(); it++)
3905         {
3906             if (((exp = dynamic_cast<ExpressionStep*>(it->get())) != NULL) && (!exp->functionJoin()))
3907                 break;
3908 
3909             exp = NULL;
3910         }
3911 
3912         // correlated join step
3913         TupleHashJoinStep* thjs = NULL;
3914 
3915         for (it = jobInfo.correlateSteps.begin(); it != jobInfo.correlateSteps.end(); it++)
3916         {
3917             if ((thjs = dynamic_cast<TupleHashJoinStep*>(it->get())) != NULL)
3918                 break;
3919         }
3920 
3921         // @bug5202, error out not equal correlation and aggregation in subquery.
3922         if ((exp != NULL) && (thjs != NULL) && (thjs->getJoinType() & CORRELATED))
3923             throw IDBExcept(
3924                 IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_NEQ_AGG_SUB),
3925                 ERR_NON_SUPPORT_NEQ_AGG_SUB);
3926     }
3927 
3928     it = projectSteps.begin();
3929     end = projectSteps.end();
3930 
3931     while (it != end)
3932     {
3933         uint32_t tid = getTableKey(jobInfo, (*it)->tupleId());
3934         tableInfoMap[tid].fProjectSteps.push_back(*it);
3935         tableInfoMap[tid].fProjectCols.push_back((*it)->tupleId());
3936         it++;
3937     }
3938 
3939     for (TupleInfoVector::iterator j = jobInfo.pjColList.begin(); j != jobInfo.pjColList.end(); j++)
3940     {
3941         if (jobInfo.keyInfo->tupleKeyVec[j->tkey].fId == CNX_EXP_TABLE_ID)
3942             continue;
3943 
3944         vector<uint32_t>& projectCols = tableInfoMap[j->tkey].fProjectCols;
3945 
3946         if (find(projectCols.begin(), projectCols.end(), j->key) == projectCols.end())
3947             projectCols.push_back(j->key);
3948     }
3949 
3950     JobStepVector& retExp = jobInfo.returnedExpressions;
3951 
3952     for (it = retExp.begin(); it != retExp.end(); ++it)
3953     {
3954         ExpressionStep* exp = dynamic_cast<ExpressionStep*>(it->get());
3955 
3956         if (exp == NULL)
3957             throw runtime_error("Not an expression.");
3958 
3959         for (uint64_t i = 0; i < exp->columnKeys().size(); ++i)
3960         {
3961             tableInfoMap[exp->tableKeys()[i]].fColsInRetExp.push_back(exp->columnKeys()[i]);
3962         }
3963     }
3964 
3965     // reset all step vector
3966     querySteps.clear();
3967     projectSteps.clear();
3968     deliverySteps.clear();
3969 
3970     // Check if the tables and joins can be used to construct a spanning tree.
3971     spanningTreeCheck(tableInfoMap, joinSteps, jobInfo);
3972 
3973     // 1. combine job steps for each table
3974     TableInfoMap::iterator mit;
3975 
3976     for (mit = tableInfoMap.begin(); mit != tableInfoMap.end(); mit++)
3977         if (combineJobStepsByTable(mit, jobInfo) == false)
3978             throw runtime_error("combineJobStepsByTable failed.");
3979 
3980     // 2. join the combined steps together to form the spanning tree
3981     vector<uint32_t> joinOrder;
3982     joinTables(joinSteps, tableInfoMap, jobInfo, joinOrder, overrideLargeSideEstimate);
3983 
3984     // 3. put the steps together
3985     for (vector<uint32_t>::iterator i = joinOrder.begin(); i != joinOrder.end(); ++i)
3986         querySteps.insert(querySteps.end(),
3987                           tableInfoMap[*i].fQuerySteps.begin(),
3988                           tableInfoMap[*i].fQuerySteps.end());
3989 
3990     adjustLastStep(querySteps, deliverySteps, jobInfo);  // to match the select clause
3991 }
3992 
3993 
unionQueries(JobStepVector & queries,uint64_t distinctUnionNum,JobInfo & jobInfo)3994 SJSTEP unionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo& jobInfo)
3995 {
3996     vector<RowGroup> inputRGs;
3997     vector<bool> distinct;
3998     uint64_t colCount = jobInfo.deliveredCols.size();
3999 
4000     vector<uint32_t> oids;
4001     vector<uint32_t> keys;
4002     vector<uint32_t> scale;
4003     vector<uint32_t> precision;
4004     vector<uint32_t> width;
4005     vector<CalpontSystemCatalog::ColDataType> types;
4006     vector<uint32_t> csNums;
4007     JobStepAssociation jsaToUnion;
4008 
4009     // bug4388, share code with connector for column type coversion
4010     vector<vector<CalpontSystemCatalog::ColType> > queryColTypes;
4011 
4012     for (uint64_t j = 0; j < colCount; ++j)
4013         queryColTypes.push_back(vector<CalpontSystemCatalog::ColType>(queries.size()));
4014 
4015     for (uint64_t i = 0; i < queries.size(); i++)
4016     {
4017         SJSTEP& spjs = queries[i];
4018         TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(spjs.get());
4019 
4020         if (tds == NULL)
4021         {
4022             throw runtime_error("Not a deliverable step.");
4023         }
4024 
4025         const RowGroup& rg = tds->getDeliveredRowGroup();
4026         inputRGs.push_back(rg);
4027 
4028         const vector<uint32_t>& scaleIn = rg.getScale();
4029         const vector<uint32_t>& precisionIn = rg.getPrecision();
4030         const vector<CalpontSystemCatalog::ColDataType>& typesIn = rg.getColTypes();
4031         const vector<uint32_t>& csNumsIn = rg.getCharsetNumbers();
4032 
4033         for (uint64_t j = 0; j < colCount; ++j)
4034         {
4035             queryColTypes[j][i].colDataType = typesIn[j];
4036             queryColTypes[j][i].charsetNumber = csNumsIn[j];
4037             queryColTypes[j][i].scale = scaleIn[j];
4038             queryColTypes[j][i].precision = precisionIn[j];
4039             queryColTypes[j][i].colWidth = rg.getColumnWidth(j);
4040         }
4041 
4042         if (i == 0)
4043         {
4044             const vector<uint32_t>& oidsIn = rg.getOIDs();
4045             const vector<uint32_t>& keysIn = rg.getKeys();
4046             oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + colCount);
4047             keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + colCount);
4048         }
4049 
4050         // if all union types are UNION_ALL, distinctUnionNum is 0.
4051         distinct.push_back(distinctUnionNum > i);
4052 
4053         AnyDataListSPtr spdl(new AnyDataList());
4054         RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
4055         spdl->rowGroupDL(dl);
4056         dl->OID(CNX_VTABLE_ID);
4057         JobStepAssociation jsa;
4058         jsa.outAdd(spdl);
4059         spjs->outputAssociation(jsa);
4060         jsaToUnion.outAdd(spdl);
4061     }
4062 
4063     AnyDataListSPtr spdl(new AnyDataList());
4064     RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
4065     spdl->rowGroupDL(dl);
4066     dl->OID(CNX_VTABLE_ID);
4067     JobStepAssociation jsa;
4068     jsa.outAdd(spdl);
4069     TupleUnion* unionStep = new  TupleUnion(CNX_VTABLE_ID, jobInfo);
4070     unionStep->inputAssociation(jsaToUnion);
4071     unionStep->outputAssociation(jsa);
4072 
4073     // get unioned column types
4074     for (uint64_t j = 0; j < colCount; ++j)
4075     {
4076         CalpontSystemCatalog::ColType colType = DataConvert::convertUnionColType(queryColTypes[j]);
4077         types.push_back(colType.colDataType);
4078         csNums.push_back(colType.charsetNumber);
4079         scale.push_back(colType.scale);
4080         precision.push_back(colType.precision);
4081         width.push_back(colType.colWidth);
4082     }
4083 
4084     vector<uint32_t> pos;
4085     pos.push_back(2);
4086 
4087     for (uint64_t i = 0; i < oids.size(); ++i)
4088         pos.push_back(pos[i] + width[i]);
4089 
4090     unionStep->setInputRowGroups(inputRGs);
4091     unionStep->setDistinctFlags(distinct);
4092     unionStep->setOutputRowGroup(RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold));
4093 
4094     // Fix for bug 4388 adjusts the result type at connector side, this workaround is obsolete.
4095     // bug 3067, update the returned column types.
4096     // This is a workaround as the connector always uses the first query' returned columns.
4097     // ct.colDataType = types[i];
4098     // ct.scale = scale[i];
4099     // ct.colWidth = width[i];
4100 
4101     for (size_t i = 0; i < jobInfo.deliveredCols.size(); i++)
4102     {
4103         CalpontSystemCatalog::ColType ct = jobInfo.deliveredCols[i]->resultType();
4104 //XXX remove after connector change
4105         ct.colDataType = types[i];
4106         ct.scale = scale[i];
4107         ct.colWidth = width[i];
4108 
4109         // varchar/varbinary column width has been fudged, see fudgeWidth in jlf_common.cpp.
4110         if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
4111             ct.colWidth--;
4112         else if (ct.colDataType == CalpontSystemCatalog::VARBINARY)
4113             ct.colWidth -= 2;
4114 
4115         jobInfo.deliveredCols[i]->resultType(ct);
4116     }
4117 
4118     if (jobInfo.trace)
4119     {
4120         cout << boldStart << "\ninput RGs: (distinct=" << distinctUnionNum << ")\n" << boldStop;
4121 
4122         for (vector<RowGroup>::iterator i = inputRGs.begin(); i != inputRGs.end(); i++)
4123             cout << i->toString() << endl << endl;
4124 
4125         cout << boldStart << "output RG:\n" << boldStop
4126              << unionStep->getDeliveredRowGroup().toString() << endl;
4127     }
4128 
4129     return SJSTEP(unionStep);
4130 }
4131 
4132 }
4133 // vim:ts=4 sw=4:
4134 
4135