1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: jlf_tuplejoblist.cpp 9728 2013-07-26 22:08:20Z xlou $
20
21 // Cross engine needs to be at the top due to MySQL includes
22 #include "crossenginestep.h"
23 #include <iostream>
24 #include <stack>
25 #include <iterator>
26 #include <algorithm>
27 //#define NDEBUG
28 //#include <cassert>
29 #include <vector>
30 #include <set>
31 #include <map>
32 #include <limits>
33 using namespace std;
34
35 #include <boost/scoped_ptr.hpp>
36 #include <boost/shared_ptr.hpp>
37 using namespace boost;
38
39 #include "calpontsystemcatalog.h"
40 #include "logicoperator.h"
41 using namespace execplan;
42
43 #include "rowgroup.h"
44 #include "rowaggregation.h"
45 using namespace rowgroup;
46
47 #include "idberrorinfo.h"
48 #include "errorids.h"
49 #include "exceptclasses.h"
50 using namespace logging;
51
52 #include "dataconvert.h"
53 using namespace dataconvert;
54
55 #include "elementtype.h"
56 #include "jlf_common.h"
57 #include "limitedorderby.h"
58 #include "jobstep.h"
59 #include "primitivestep.h"
60 #include "expressionstep.h"
61 #include "subquerystep.h"
62 #include "tupleaggregatestep.h"
63 #include "tupleannexstep.h"
64 #include "tupleconstantstep.h"
65 #include "tuplehashjoin.h"
66 #include "tuplehavingstep.h"
67 #include "tupleunion.h"
68 #include "windowfunctionstep.h"
69 #include "configcpp.h"
70 #include "jlf_tuplejoblist.h"
71 using namespace joblist;
72
73
74 namespace
75 {
76
77
78 // construct a pcolstep from column key
// Construct the projection step(s) for one tuple key and append them to jsv.
// A pColStep (or PseudoColStep for pseudo columns) is always appended; when
// the column is dictionary-backed and more than the token is needed, a
// pDictionaryStep is appended right behind it.
// @param key     tuple key of the column to project (may be a dictionary key)
// @param jsv     output vector the constructed step(s) are pushed onto
// @param jobInfo query-wide key/catalog bookkeeping (tokenOnly and
//                dictOidToColOid maps are updated as a side effect)
void tupleKeyToProjectStep(uint32_t key, JobStepVector& jsv, JobInfo& jobInfo)
{
    // this JSA is for pcolstep construct, is not taking input/output
    // because the pcolstep is to be added into TBPS
    CalpontSystemCatalog::OID oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
    DictOidToColOidMap::iterator mit = jobInfo.keyInfo->dictOidToColOid.find(oid);

    // if the key is for a dictionary, start with its token key
    if (mit != jobInfo.keyInfo->dictOidToColOid.end())
    {
        oid = mit->second;

        // reverse lookup in dictKeyMap: find the token key whose mapped
        // dictionary key equals `key`
        for (map<uint32_t, uint32_t>::iterator i = jobInfo.keyInfo->dictKeyMap.begin();
                i != jobInfo.keyInfo->dictKeyMap.end();
                i++)
        {
            if (key == i->second)
            {
                key = i->first;
                break;
            }
        }

        // the dictionary value itself is projected, not just the token
        jobInfo.tokenOnly[key] = false;
    }

    CalpontSystemCatalog::OID tableOid = jobInfo.keyInfo->tupleKeyToTableOid[key];
    // JobStepAssociation dummyJsa;
    // AnyDataListSPtr adl(new AnyDataList());
    // RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
    // dl->OID(oid);
    // adl->rowGroupDL(dl);
    // dummyJsa.outAdd(adl);

    CalpontSystemCatalog::ColType ct = jobInfo.keyInfo->colType[key];

    // a token column carries the column type of its dictionary
    if (jobInfo.keyInfo->token2DictTypeMap.find(key) != jobInfo.keyInfo->token2DictTypeMap.end())
        ct = jobInfo.keyInfo->token2DictTypeMap[key];

    uint32_t pt = jobInfo.keyInfo->pseudoType[key];

    SJSTEP sjs;

    // pt == 0 means a regular column; otherwise construct a pseudo column step
    if (pt == 0)
        sjs.reset(new pColStep(oid, tableOid, ct, jobInfo));
    else
        sjs.reset(new PseudoColStep(oid, tableOid, pt, ct, jobInfo));

    sjs->alias(jobInfo.keyInfo->tupleKeyVec[key].fTable);
    sjs->view(jobInfo.keyInfo->tupleKeyVec[key].fView);
    sjs->schema(jobInfo.keyInfo->tupleKeyVec[key].fSchema);
    sjs->name(jobInfo.keyInfo->keyName[key]);
    sjs->tupleId(key);

    jsv.push_back(sjs);

    bool tokenOnly = false;
    map<uint32_t, bool>::iterator toIt = jobInfo.tokenOnly.find(key);

    if (toIt != jobInfo.tokenOnly.end())
        tokenOnly = toIt->second;

    // dictionary column where the value (not just the token) is required:
    // follow the token step with a dictionary lookup step
    if (sjs.get()->isDictCol() && !tokenOnly)
    {
        // Need a dictionary step
        uint32_t dictKey = jobInfo.keyInfo->dictKeyMap[key];
        CalpontSystemCatalog::OID dictOid = jobInfo.keyInfo->tupleKeyVec[dictKey].fId;
        sjs.reset(new pDictionaryStep(dictOid, tableOid, ct, jobInfo));
        sjs->alias(jobInfo.keyInfo->tupleKeyVec[dictKey].fTable);
        sjs->view(jobInfo.keyInfo->tupleKeyVec[dictKey].fView);
        sjs->schema(jobInfo.keyInfo->tupleKeyVec[dictKey].fSchema);
        sjs->name(jobInfo.keyInfo->keyName[dictKey]);
        sjs->tupleId(dictKey);

        // remember the dictionary-oid -> column-oid mapping for later lookups
        jobInfo.keyInfo->dictOidToColOid[dictOid] = oid;

        jsv.push_back(sjs);
    }
}
158
159
addColumnToRG(uint32_t cid,vector<uint32_t> & pos,vector<uint32_t> & oids,vector<uint32_t> & keys,vector<uint32_t> & scale,vector<uint32_t> & precision,vector<CalpontSystemCatalog::ColDataType> & types,vector<uint32_t> & csNums,JobInfo & jobInfo)160 inline void addColumnToRG(uint32_t cid, vector<uint32_t>& pos, vector<uint32_t>& oids,
161 vector<uint32_t>& keys, vector<uint32_t>& scale, vector<uint32_t>& precision,
162 vector<CalpontSystemCatalog::ColDataType>& types, vector<uint32_t>& csNums, JobInfo& jobInfo)
163 {
164 TupleInfo ti(getTupleInfo(cid, jobInfo));
165 pos.push_back(pos.back() + ti.width);
166 oids.push_back(ti.oid);
167 keys.push_back(ti.key);
168 types.push_back(ti.dtype);
169 csNums.push_back(ti.csNum);
170 scale.push_back(ti.scale);
171 precision.push_back(ti.precision);
172 }
173
174
addColumnInExpToRG(uint32_t cid,vector<uint32_t> & pos,vector<uint32_t> & oids,vector<uint32_t> & keys,vector<uint32_t> & scale,vector<uint32_t> & precision,vector<CalpontSystemCatalog::ColDataType> & types,vector<uint32_t> & csNums,JobInfo & jobInfo)175 inline void addColumnInExpToRG(uint32_t cid, vector<uint32_t>& pos, vector<uint32_t>& oids,
176 vector<uint32_t>& keys, vector<uint32_t>& scale, vector<uint32_t>& precision,
177 vector<CalpontSystemCatalog::ColDataType>& types, vector<uint32_t>& csNums, JobInfo& jobInfo)
178 {
179 if (jobInfo.keyInfo->dictKeyMap.find(cid) != jobInfo.keyInfo->dictKeyMap.end())
180 cid = jobInfo.keyInfo->dictKeyMap[cid];
181
182 if (find(keys.begin(), keys.end(), cid) == keys.end())
183 addColumnToRG(cid, pos, oids, keys, scale, precision, types, csNums, jobInfo);
184 }
185
186
addColumnsToRG(uint32_t tid,vector<uint32_t> & pos,vector<uint32_t> & oids,vector<uint32_t> & keys,vector<uint32_t> & scale,vector<uint32_t> & precision,vector<CalpontSystemCatalog::ColDataType> & types,vector<uint32_t> & csNums,TableInfoMap & tableInfoMap,JobInfo & jobInfo)187 inline void addColumnsToRG(uint32_t tid, vector<uint32_t>& pos, vector<uint32_t>& oids,
188 vector<uint32_t>& keys, vector<uint32_t>& scale, vector<uint32_t>& precision,
189 vector<CalpontSystemCatalog::ColDataType>& types,
190 vector<uint32_t>& csNums,
191 TableInfoMap& tableInfoMap, JobInfo& jobInfo)
192 {
193 // -- the selected columns
194 vector<uint32_t>& pjCol = tableInfoMap[tid].fProjectCols;
195
196 for (unsigned i = 0; i < pjCol.size(); i++)
197 {
198 addColumnToRG(pjCol[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
199 }
200
201 // -- any columns will be used in cross-table exps
202 vector<uint32_t>& exp2 = tableInfoMap[tid].fColsInExp2;
203
204 for (unsigned i = 0; i < exp2.size(); i++)
205 {
206 addColumnInExpToRG(exp2[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
207 }
208
209 // -- any columns will be used in returned exps
210 vector<uint32_t>& expr = tableInfoMap[tid].fColsInRetExp;
211
212 for (unsigned i = 0; i < expr.size(); i++)
213 {
214 addColumnInExpToRG(expr[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
215 }
216
217 // -- any columns will be used in final outer join expression
218 vector<uint32_t>& expo = tableInfoMap[tid].fColsInOuter;
219
220 for (unsigned i = 0; i < expo.size(); i++)
221 {
222 addColumnInExpToRG(expo[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
223 }
224 }
225
226
constructJoinedRowGroup(RowGroup & rg,uint32_t large,uint32_t prev,bool root,set<uint32_t> & tableSet,TableInfoMap & tableInfoMap,JobInfo & jobInfo)227 void constructJoinedRowGroup(RowGroup& rg, uint32_t large, uint32_t prev, bool root,
228 set<uint32_t>& tableSet, TableInfoMap& tableInfoMap, JobInfo& jobInfo)
229 {
230 // Construct the output rowgroup for the join.
231 vector<uint32_t> pos;
232 vector<uint32_t> oids;
233 vector<uint32_t> keys;
234 vector<uint32_t> scale;
235 vector<uint32_t> precision;
236 vector<CalpontSystemCatalog::ColDataType> types;
237 vector<uint32_t> csNums;
238 pos.push_back(2);
239
240 // -- start with the join keys
241 // lead by joinkeys -- to have more controls on joins
242 // [loop throuh the key list to support compound join]
243 if (root == false) // not root
244 {
245 vector<uint32_t>& joinKeys = jobInfo.tableJoinMap[make_pair(large, prev)].fLeftKeys;
246
247 for (vector<uint32_t>::iterator i = joinKeys.begin(); i != joinKeys.end(); i++)
248 addColumnToRG(*i, pos, oids, keys, scale, precision, types, csNums, jobInfo);
249 }
250
251 // -- followed by the columns in select or expression
252 for (set<uint32_t>::iterator i = tableSet.begin(); i != tableSet.end(); i++)
253 addColumnsToRG(*i, pos, oids, keys, scale, precision, types, csNums, tableInfoMap, jobInfo);
254
255 RowGroup tmpRg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
256 rg = tmpRg;
257 }
258
259
constructJoinedRowGroup(RowGroup & rg,set<uint32_t> & tableSet,TableInfoMap & tableInfoMap,JobInfo & jobInfo)260 void constructJoinedRowGroup(RowGroup& rg, set<uint32_t>& tableSet, TableInfoMap& tableInfoMap,
261 JobInfo& jobInfo)
262 {
263 // Construct the output rowgroup for the join.
264 vector<uint32_t> pos;
265 vector<uint32_t> oids;
266 vector<uint32_t> keys;
267 vector<uint32_t> scale;
268 vector<uint32_t> precision;
269 vector<CalpontSystemCatalog::ColDataType> types;
270 vector<uint32_t> csNums;
271 pos.push_back(2);
272
273 for (set<uint32_t>::iterator i = tableSet.begin(); i != tableSet.end(); i++)
274 {
275 // columns in select or expression
276 addColumnsToRG(*i, pos, oids, keys, scale, precision, types, csNums, tableInfoMap, jobInfo);
277
278 // keys to be joined if not already in the rowgroup
279 vector<uint32_t>& adjList = tableInfoMap[*i].fAdjacentList;
280
281 for (vector<uint32_t>::iterator j = adjList.begin(); j != adjList.end(); j++)
282 {
283 if (find(tableSet.begin(), tableSet.end(), *j) == tableSet.end())
284 {
285 // not joined
286 vector<uint32_t>& joinKeys = jobInfo.tableJoinMap[make_pair(*i, *j)].fLeftKeys;
287
288 for (vector<uint32_t>::iterator k = joinKeys.begin(); k != joinKeys.end(); k++)
289 {
290 if (find(keys.begin(), keys.end(), *k) == keys.end())
291 addColumnToRG(*k, pos, oids, keys, scale, precision, types, csNums, jobInfo);
292 }
293 }
294 }
295 }
296
297 RowGroup tmpRg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
298 rg = tmpRg;
299 }
300
301
updateExp2Cols(JobStepVector & expSteps,TableInfoMap & tableInfoMap,JobInfo & jobInfo)302 void updateExp2Cols(JobStepVector& expSteps, TableInfoMap& tableInfoMap, JobInfo& jobInfo)
303 {
304 for (JobStepVector::iterator it = expSteps.begin(); it != expSteps.end(); it++)
305 {
306 ExpressionStep* exps = dynamic_cast<ExpressionStep*>(it->get());
307 const vector<uint32_t>& tables = exps->tableKeys();
308 const vector<uint32_t>& columns = exps->columnKeys();
309
310 for (uint64_t i = 0; i < tables.size(); ++i)
311 {
312 vector<uint32_t>& exp2 = tableInfoMap[tables[i]].fColsInExp2;
313 vector<uint32_t>::iterator cit = find(exp2.begin(), exp2.end(), columns[i]);
314
315 if (cit != exp2.end())
316 exp2.erase(cit);
317 }
318 }
319 }
320
321
// Finalize the tail of the query plan. Reshapes the last step's output
// rowgroup to exactly the select list, wires in returned/group-by expression
// evaluation, then chains on (in this order, each when applicable):
// aggregation, HAVING, window functions, the annex step (order by / limit /
// distinct / constant columns), and a constant-false short circuit.
// deliverySteps[CNX_VTABLE_ID] is updated to whichever step ends up
// delivering rows.
// @param querySteps    full step list; new steps are appended
// @param deliverySteps delivery map, keyed by CNX_VTABLE_ID here
// @param jobInfo       query-wide flags and bookkeeping
// @throws runtime_error if the last step is not a supported type
// @throws logic_error   if the aggregation delivery step cannot be built
void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, JobInfo& jobInfo)
{
    SJSTEP spjs = querySteps.back();
    // the last step must be a batch primitive, a hash join, or a sub-query
    // adapter; anything else is a malformed plan
    BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(spjs.get());
    TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
    SubAdapterStep* sas = dynamic_cast<SubAdapterStep*>(spjs.get());

    if (!bps && !thjs && !sas)
        throw runtime_error("Bad last step");

    // original output rowgroup of the step
    TupleJobStep* tjs = dynamic_cast<TupleJobStep*>(spjs.get());
    const RowGroup* rg0 = &(tjs->getOutputRowGroup());

    if (jobInfo.trace) cout << "Output RowGroup 0: " << rg0->toString() << endl;

    // Construct a rowgroup that matches the select columns
    TupleInfoVector v = jobInfo.pjColList;
    vector<uint32_t> pos;
    vector<uint32_t> oids;
    vector<uint32_t> keys;
    vector<uint32_t> scale;
    vector<uint32_t> precision;
    vector<CalpontSystemCatalog::ColDataType> types;
    vector<uint32_t> csNums;
    pos.push_back(2);

    for (unsigned i = 0; i < v.size(); i++)
    {
        pos.push_back(pos.back() + v[i].width);
        oids.push_back(v[i].oid);
        keys.push_back(v[i].key);
        types.push_back(v[i].dtype);
        csNums.push_back(v[i].csNum);
        scale.push_back(v[i].scale);
        precision.push_back(v[i].precision);
    }

    // rg1: exactly the projected (select-list) columns
    RowGroup rg1(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);

    // evaluate the returned/groupby expressions if any
    JobStepVector& expSteps = jobInfo.returnedExpressions;

    if (expSteps.size() > 0)
    {
        // create a RG has the keys not in rg0 (i.e. the columns produced by
        // the expressions, reusing the same parallel vectors)
        pos.clear();
        oids.clear();
        keys.clear();
        scale.clear();
        precision.clear();
        types.clear();
        csNums.clear();
        pos.push_back(2);

        const vector<uint32_t>& keys0 = rg0->getKeys();

        for (unsigned i = 0; i < v.size(); i++)
        {
            if (find(keys0.begin(), keys0.end(), v[i].key) == keys0.end())
            {
                pos.push_back(pos.back() + v[i].width);
                oids.push_back(v[i].oid);
                keys.push_back(v[i].key);
                types.push_back(v[i].dtype);
                csNums.push_back(v[i].csNum);
                scale.push_back(v[i].scale);
                precision.push_back(v[i].precision);
            }
        }

        // for v0.9.3.0, the output and input to the expression are in the same row
        // add the returned column into the rg0 as rg01
        RowGroup rg01 = *rg0 + RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);

        if (jobInfo.trace) cout << "Output RowGroup 01: " << rg01.toString() << endl;

        map<uint32_t, uint32_t> keyToIndexMap0; // maps key to the index in the input RG

        for (uint64_t i = 0; i < rg01.getKeys().size(); ++i)
            keyToIndexMap0.insert(make_pair(rg01.getKeys()[i], i));

        vector<SRCP> exps; // columns to be evaluated

        for (JobStepVector::iterator eit = expSteps.begin(); eit != expSteps.end(); ++eit)
        {
            ExpressionStep* es = dynamic_cast<ExpressionStep*>(eit->get());
            es->updateInputIndex(keyToIndexMap0, jobInfo);
            es->updateOutputIndex(keyToIndexMap0, jobInfo); // same row as input
            exps.push_back(es->expression());
        }

        // last step can be tbps (no join) or thjs, either one can have a group 3 expression
        if (bps || thjs)
        {
            tjs->setOutputRowGroup(rg01);
            tjs->setFcnExpGroup3(exps);
            tjs->setFE23Output(rg1);
        }
        else if (sas)
        {
            sas->setFeRowGroup(rg01);
            sas->addExpression(exps);
            sas->setOutputRowGroup(rg1);
        }
    }
    else
    {
        // no returned expressions: just shape the output to the select list
        if (thjs && thjs->hasFcnExpGroup2())
            thjs->setFE23Output(rg1);
        else
            tjs->setOutputRowGroup(rg1);
    }

    if (jobInfo.trace) cout << "Output RowGroup 1: " << rg1.toString() << endl;

    if (jobInfo.hasAggregation == false)
    {
        if (thjs != NULL) //setup a few things for the final thjs step...
            thjs->outputAssociation(JobStepAssociation());

        deliverySteps[CNX_VTABLE_ID] = spjs;
    }
    else
    {
        // aggregation present: wrap the current last step in an aggregate step
        TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(spjs.get());
        idbassert(tds != NULL);
        SJSTEP ads = TupleAggregateStep::prepAggregate(spjs, jobInfo);
        querySteps.push_back(ads);

        if (ads.get() != NULL)
            deliverySteps[CNX_VTABLE_ID] = ads;
        else
            throw std::logic_error("Failed to prepare Aggregation Delivery Step.");
    }

    if (jobInfo.havingStep)
    {
        // insert the HAVING step after the current delivery step, connecting
        // the two with a new rowgroup datalist
        TupleDeliveryStep* ds =
            dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());

        AnyDataListSPtr spdlIn(new AnyDataList());
        RowGroupDL* dlIn = new RowGroupDL(1, jobInfo.fifoSize);
        dlIn->OID(CNX_VTABLE_ID);
        spdlIn->rowGroupDL(dlIn);
        JobStepAssociation jsaIn;
        jsaIn.outAdd(spdlIn);
        dynamic_cast<JobStep*>(ds)->outputAssociation(jsaIn);
        jobInfo.havingStep->inputAssociation(jsaIn);

        AnyDataListSPtr spdlOut(new AnyDataList());
        RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
        dlOut->OID(CNX_VTABLE_ID);
        spdlOut->rowGroupDL(dlOut);
        JobStepAssociation jsaOut;
        jsaOut.outAdd(spdlOut);
        jobInfo.havingStep->outputAssociation(jsaOut);

        querySteps.push_back(jobInfo.havingStep);
        dynamic_cast<TupleHavingStep*>(jobInfo.havingStep.get())
        ->initialize(ds->getDeliveredRowGroup(), jobInfo);
        deliverySteps[CNX_VTABLE_ID] = jobInfo.havingStep;
    }

    if (jobInfo.windowCols.size() > 0)
    {
        // window functions go after aggregation/having
        spjs = querySteps.back();
        SJSTEP ws = WindowFunctionStep::makeWindowFunctionStep(spjs, jobInfo);
        idbassert(ws.get());
        querySteps.push_back(ws);
        deliverySteps[CNX_VTABLE_ID] = ws;
    }

    // TODO MCOL-894 we don't need to run sorting|distinct
    // every time
    // if ((jobInfo.limitCount != (uint64_t) - 1) ||
    //        (jobInfo.constantCol == CONST_COL_EXIST) ||
    //        (jobInfo.hasDistinct))
    // {
    // annex step handles order by, limit, distinct and constant columns
    if (jobInfo.annexStep.get() == NULL)
        jobInfo.annexStep.reset(new TupleAnnexStep(jobInfo));

    TupleAnnexStep* tas = dynamic_cast<TupleAnnexStep*>(jobInfo.annexStep.get());
    tas->setLimit(jobInfo.limitStart, jobInfo.limitCount);

    if (jobInfo.orderByColVec.size() > 0)
    {
        tas->addOrderBy(new LimitedOrderBy());

        // parallel order-by when configured with more than one thread
        if (jobInfo.orderByThreads > 1)
            tas->setParallelOp();

        tas->setMaxThreads(jobInfo.orderByThreads);
    }

    if (jobInfo.constantCol == CONST_COL_EXIST)
        tas->addConstant(new TupleConstantStep(jobInfo));

    if (jobInfo.hasDistinct)
        tas->setDistinct();

    // }

    if (jobInfo.annexStep)
    {
        // wire the annex step behind the current delivery step
        TupleDeliveryStep* ds =
            dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
        RowGroup rg2 = ds->getDeliveredRowGroup();

        if (jobInfo.trace) cout << "Output RowGroup 2: " << rg2.toString() << endl;

        AnyDataListSPtr spdlIn(new AnyDataList());
        RowGroupDL* dlIn;

        // parallel order-by consumes from a multi-producer datalist
        if (jobInfo.orderByColVec.size() > 0)
            dlIn = new RowGroupDL(jobInfo.orderByThreads, jobInfo.fifoSize);
        else
            dlIn = new RowGroupDL(1, jobInfo.fifoSize);

        dlIn->OID(CNX_VTABLE_ID);
        spdlIn->rowGroupDL(dlIn);
        JobStepAssociation jsaIn;
        jsaIn.outAdd(spdlIn);
        dynamic_cast<JobStep*>(ds)->outputAssociation(jsaIn);
        jobInfo.annexStep->inputAssociation(jsaIn);

        AnyDataListSPtr spdlOut(new AnyDataList());
        RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
        dlOut->OID(CNX_VTABLE_ID);
        spdlOut->rowGroupDL(dlOut);
        JobStepAssociation jsaOut;
        jsaOut.outAdd(spdlOut);
        jobInfo.annexStep->outputAssociation(jsaOut);

        querySteps.push_back(jobInfo.annexStep);
        dynamic_cast<TupleAnnexStep*>(jobInfo.annexStep.get())->initialize(rg2, jobInfo);
        deliverySteps[CNX_VTABLE_ID] = jobInfo.annexStep;
    }

    // Check if constant false
    if (jobInfo.constantFalse)
    {
        // replace everything before the first aggregate/annex step with a
        // boolean step that delivers an empty (always-false) result
        TupleConstantBooleanStep* tcs = new TupleConstantBooleanStep(jobInfo, false);
        tcs->outputAssociation(querySteps.back().get()->outputAssociation());
        TupleDeliveryStep* tds =
            dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
        tcs->initialize(tds->getDeliveredRowGroup(), jobInfo);

        JobStepVector::iterator it = querySteps.begin();

        // find the first aggregate or annex step; steps before it are dropped
        while (it != querySteps.end())
        {
            if ((dynamic_cast<TupleAggregateStep*>(it->get()) != NULL) ||
                    (dynamic_cast<TupleAnnexStep*>(it->get()) != NULL))
                break;

            it++;
        }

        SJSTEP bs(tcs);

        if (it != querySteps.end())
            tcs->outputAssociation((*it)->inputAssociation());
        else
            deliverySteps[CNX_VTABLE_ID] = bs;

        querySteps.erase(querySteps.begin(), it);
        querySteps.insert(querySteps.begin(), bs);
    }

    if (jobInfo.trace)
    {
        TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());

        if (ds) cout << "Delivered RowGroup: " << ds->getDeliveredRowGroup().toString() << endl;
    }
}
595
596
597 // add the project steps into the query TBPS and construct the output rowgroup
// Add the projection steps of one table into its batch primitive (TBPS or
// CrossEngineStep) and construct/install the step's output rowgroup: join
// keys first, then selected columns, then columns needed by expressions.
// Dictionary columns become two-step (token + dictionary) projections, and
// the last filter column may be converted to a PassThruStep.
// @param mit     iterator to the table's entry in the TableInfoMap
// @param bps     the batch primitive receiving the project steps
// @param jobInfo query-wide bookkeeping
// @throws runtime_error if bps is NULL
void addProjectStepsToBps(TableInfoMap::iterator& mit, BatchPrimitive* bps, JobInfo& jobInfo)
{
    // make sure we have a good tuple bps
    if (bps == NULL)
        throw runtime_error("BPS is null");

    // construct a pcolstep for each joinkey to be projected
    vector<uint32_t>& joinKeys = mit->second.fJoinKeys;
    JobStepVector keySteps;
    vector<uint32_t> fjKeys; // function-join keys (virtual, no pcolstep)

    for (vector<uint32_t>::iterator kit = joinKeys.begin(); kit != joinKeys.end(); kit++)
    {
        // keys on the expression pseudo-table are function-join results,
        // added to the rowgroup later instead of being projected
        if (jobInfo.keyInfo.get()->tupleKeyToTableOid[*kit] != CNX_EXP_TABLE_ID)
            tupleKeyToProjectStep(*kit, keySteps, jobInfo);
        else
            fjKeys.push_back(*kit);
    }

    // construct pcolstep for columns in expressions
    JobStepVector expSteps;
    vector<uint32_t>& exp1 = mit->second.fColsInExp1;

    for (vector<uint32_t>::iterator kit = exp1.begin(); kit != exp1.end(); kit++)
        tupleKeyToProjectStep(*kit, expSteps, jobInfo);

    vector<uint32_t>& exp2 = mit->second.fColsInExp2;

    for (vector<uint32_t>::iterator kit = exp2.begin(); kit != exp2.end(); kit++)
        tupleKeyToProjectStep(*kit, expSteps, jobInfo);

    vector<uint32_t>& expRet = mit->second.fColsInRetExp;

    for (vector<uint32_t>::iterator kit = expRet.begin(); kit != expRet.end(); kit++)
        tupleKeyToProjectStep(*kit, expSteps, jobInfo);

    vector<uint32_t>& expOut = mit->second.fColsInOuter;

    for (vector<uint32_t>::iterator kit = expOut.begin(); kit != expOut.end(); kit++)
        tupleKeyToProjectStep(*kit, expSteps, jobInfo);

    vector<uint32_t>& expFj = mit->second.fColsInFuncJoin;

    for (vector<uint32_t>::iterator kit = expFj.begin(); kit != expFj.end(); kit++)
        tupleKeyToProjectStep(*kit, expSteps, jobInfo);

    // for output rowgroup
    vector<uint32_t> pos;
    vector<uint32_t> oids;
    vector<uint32_t> keys;
    vector<uint32_t> scale;
    vector<uint32_t> precision;
    vector<CalpontSystemCatalog::ColDataType> types;
    vector<uint32_t> csNums;
    pos.push_back(2);

    // this psv is a copy of the project steps, the original vector in mit is not changed
    JobStepVector psv = mit->second.fProjectSteps;              // columns being selected
    psv.insert(psv.begin(), keySteps.begin(), keySteps.end()); // add joinkeys to project
    psv.insert(psv.end(), expSteps.begin(), expSteps.end());   // add expressions to project
    set<uint32_t> seenCols; // columns already processed

    // for passthru conversion
    // passthru is disabled (default lastTupleId to -1) unless the TupleBPS::bop is BOP_AND.
    uint64_t lastTupleId = -1;
    TupleBPS* tbps = dynamic_cast<TupleBPS*>(bps);

    if (tbps != NULL && tbps->getBOP() == BOP_AND && exp1.size() == 0)
        lastTupleId = tbps->getLastTupleId();

    for (JobStepVector::iterator it = psv.begin(); it != psv.end(); it++)
    {
        JobStep* js = it->get();
        uint32_t tupleKey = js->tupleId();

        // skip duplicates (a column can appear in several source lists)
        if (seenCols.find(tupleKey) != seenCols.end())
            continue;

        // update processed column set
        seenCols.insert(tupleKey);

        // if the projected column is the last accessed predicate, replace the
        // pColStep with a cheaper PassThruStep that reuses the filter's output
        pColStep* pcol = dynamic_cast<pColStep*>(js);

        if (pcol != NULL && js->tupleId() == lastTupleId)
        {
            PassThruStep* pts = new PassThruStep(*pcol);

            if (dynamic_cast<PseudoColStep*>(pcol))
                pts->pseudoType(dynamic_cast<PseudoColStep*>(pcol)->pseudoColumnId());

            pts->alias(pcol->alias());
            pts->view(pcol->view());
            pts->name(pcol->name());
            pts->tupleId(pcol->tupleId());
            it->reset(pts);
        }

        // add projected column to TBPS
        bool tokenOnly = false;
        map<uint32_t, bool>::iterator toIt = jobInfo.tokenOnly.find(js->tupleId());

        if (toIt != jobInfo.tokenOnly.end())
            tokenOnly = toIt->second;

        if (it->get()->isDictCol() && !tokenOnly)
        {
            // dictionary column: project as token step + dictionary step pair
            // (tupleKeyToProjectStep pushed the dictionary step right behind)
            // if (jobInfo.trace && bps->tableOid() >= 3000)
            //     cout << "1 setting project BPP for " << tbps->toString() << " with " <<
            //          it->get()->toString() << " and " << (it+1)->get()->toString() << endl;
            bps->setProjectBPP(it->get(), (it + 1)->get());

            // this is a two-step project step, remove the token step from id vector
            vector<uint32_t>& pjv = mit->second.fProjectCols;
            uint32_t tokenKey = js->tupleId();

            for (vector<uint32_t>::iterator i = pjv.begin(); i != pjv.end(); ++i)
            {
                if (*i == tokenKey)
                {
                    pjv.erase(i);
                    break;
                }
            }

            // move to the dictionary step (note: advances the loop iterator)
            js = (++it)->get();
            tupleKey = js->tupleId();
            seenCols.insert(tupleKey);
        }
        else
        {
            // if (jobInfo.trace && bps->tableOid() >= 3000)
            //     cout << "2 setting project BPP for " << tbps->toString() << " with " <<
            //          it->get()->toString() << " and " << "NULL" << endl;
            bps->setProjectBPP(it->get(), NULL);
        }

        // add the tuple info of the column into the RowGroup
        TupleInfo ti(getTupleInfo(tupleKey, jobInfo));
        pos.push_back(pos.back() + ti.width);
        oids.push_back(ti.oid);
        keys.push_back(ti.key);
        types.push_back(ti.dtype);
        csNums.push_back(ti.csNum);
        scale.push_back(ti.scale);
        precision.push_back(ti.precision);
    }

    // add function join columns (computed, not projected from storage)
    for (vector<uint32_t>::iterator i = fjKeys.begin(); i != fjKeys.end(); i++)
    {
        TupleInfo ti(getTupleInfo(*i, jobInfo));
        pos.push_back(pos.back() + ti.width);
        oids.push_back(ti.oid);
        keys.push_back(ti.key);
        types.push_back(ti.dtype);
        csNums.push_back(ti.csNum);
        scale.push_back(ti.scale);
        precision.push_back(ti.precision);
    }

    // construct RowGroup
    RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);

    // fix the output association
    AnyDataListSPtr spdl(new AnyDataList());
    RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
    spdl->rowGroupDL(dl);
    dl->OID(mit->first);
    JobStepAssociation jsa;
    jsa.outAdd(spdl);
    bps->outputAssociation(jsa);
    bps->setOutputRowGroup(rg);
}
773
774
775 // add one-table expression steps into the query TBPS
// Add one-table expression (FE1) and function-join expression steps for one
// table into its batch primitive, creating the batch primitive (TupleBPS for
// a real table, CrossEngineStep otherwise) if sjsp is not one yet, and
// install the FE1 input rowgroup.
// NOTE(review): assumes fOneTableExpSteps or fFuncJoinExps is non-empty --
// exp0 is taken from fjs[0] when exps is empty; confirm callers guarantee it.
// @param mit     iterator to the table's entry in the TableInfoMap
// @param sjsp    [in/out] shared pointer holding (or receiving) the batch primitive
// @param jobInfo query-wide bookkeeping
void addExpresssionStepsToBps(TableInfoMap::iterator& mit, SJSTEP& sjsp, JobInfo& jobInfo)
{
    BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(sjsp.get());
    CalpontSystemCatalog::OID tableOid = mit->second.fTableOid;
    JobStepVector& exps = mit->second.fOneTableExpSteps;
    JobStepVector& fjs = mit->second.fFuncJoinExps;
    ExpressionStep* exp0 = NULL; // first expression, used to seed the scan step

    if (exps.size() > 0)
        exp0 = dynamic_cast<ExpressionStep*>(exps[0].get());
    else
        exp0 = dynamic_cast<ExpressionStep*>(fjs[0].get());

    if (bps == NULL)
    {
        // no batch primitive yet -- create one
        if (tableOid > 0)
        {
            // real ColumnStore table: seed a TupleBPS with a column scan on
            // the first expression's column
            uint32_t key0 = exp0->columnKey();
            CalpontSystemCatalog::ColType ct = jobInfo.keyInfo->colType[key0];
            map<uint32_t, CalpontSystemCatalog::ColType>::iterator dkMit;

            // a token column carries the column type of its dictionary
            if (jobInfo.keyInfo->token2DictTypeMap.find(key0) !=
                    jobInfo.keyInfo->token2DictTypeMap.end())
                ct = jobInfo.keyInfo->token2DictTypeMap[key0];

            scoped_ptr<pColScanStep> pcss(
                new pColScanStep(exp0->oid(), tableOid, ct, jobInfo));

            sjsp.reset(new TupleBPS(*pcss, jobInfo));
            TupleBPS* tbps = dynamic_cast<TupleBPS*>(sjsp.get());
            tbps->setJobInfo(&jobInfo);
            tbps->setFirstStepType(SCAN);

            // add the first column to BPP's filterSteps
            tbps->setBPP(pcss.get());

            bps = tbps;
        }
        else
        {
            // foreign (non-ColumnStore) table: use a cross-engine step
            sjsp.reset(new CrossEngineStep(mit->second.fSchema,
                                           mit->second.fName,
                                           mit->second.fAlias,
                                           jobInfo));

            bps = dynamic_cast<CrossEngineStep*>(sjsp.get());
        }
    }

    // rowgroup for evaluating the one table expression
    vector<uint32_t> pos;
    vector<uint32_t> oids;
    vector<uint32_t> keys;
    vector<uint32_t> scale;
    vector<uint32_t> precision;
    vector<CalpontSystemCatalog::ColDataType> types;
    vector<uint32_t> csNums;
    pos.push_back(2);

    // collect the keys feeding the rowgroup: function-join result columns
    // first, then the columns referenced by FE1 and function-join expressions
    vector<uint32_t> cols;
    JobStepVector& fjExp = mit->second.fFuncJoinExps;

    for (JobStepVector::iterator it = fjExp.begin(); it != fjExp.end(); it++)
    {
        ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
        cols.push_back(getExpTupleKey(jobInfo, e->expressionId()));
    }

    cols.insert(cols.end(), mit->second.fColsInExp1.begin(), mit->second.fColsInExp1.end());
    cols.insert(cols.end(), mit->second.fColsInFuncJoin.begin(), mit->second.fColsInFuncJoin.end());
    uint32_t index = 0;                    // index in the rowgroup
    map<uint32_t, uint32_t> keyToIndexMap; // maps key to the index in the RG

    for (vector<uint32_t>::iterator kit = cols.begin(); kit != cols.end(); kit++)
    {
        uint32_t key = *kit;

        // a token key is replaced by its dictionary key
        if (jobInfo.keyInfo->dictKeyMap.find(key) != jobInfo.keyInfo->dictKeyMap.end())
            key = jobInfo.keyInfo->dictKeyMap[key];

        // check if this key is already in
        if (keyToIndexMap.find(key) != keyToIndexMap.end())
            continue;

        // update processed column set
        keyToIndexMap.insert(make_pair(key, index++));

        // add the tuple info of the column into the RowGroup
        TupleInfo ti(getTupleInfo(key, jobInfo));
        pos.push_back(pos.back() + ti.width);
        oids.push_back(ti.oid);
        keys.push_back(ti.key);
        types.push_back(ti.dtype);
        csNums.push_back(ti.csNum);
        scale.push_back(ti.scale);
        precision.push_back(ti.precision);
    }

    // construct RowGroup and add to TBPS
    RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
    bps->setFE1Input(rg);

    if (jobInfo.trace) cout << "FE1 input RowGroup: " << rg.toString() << endl << endl;

    // add the expression steps into TBPS, the input-indices are set in SCs.
    for (JobStepVector::iterator it = exps.begin(); it != exps.end(); it++)
    {
        ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());

        // expressions converted to function joins are handled below
        if (e->functionJoin())
            continue;

        e->updateInputIndex(keyToIndexMap, jobInfo);
        boost::shared_ptr<ParseTree> sppt(new ParseTree);
        sppt->copyTree(*(e->expressionFilter()));
        bps->addFcnExpGroup1(sppt);
    }

    // add the function join expression steps into TBPS, too
    if (fjs.size() > 0)
    {
        vector<SRCP> fjCols;

        for (JobStepVector::iterator it = fjs.begin(); it != fjs.end(); it++)
        {
            ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());

            // virtual steps have no expression of their own to evaluate
            if (e->virtualStep())
                continue;

            e->updateInputIndex(keyToIndexMap, jobInfo);
            e->updateOutputIndex(keyToIndexMap, jobInfo);
            fjCols.push_back(e->expression());
        }

        bps->addFcnJoinExp(fjCols);
    }
}
914
915
// Collapse the per-table job steps in mit->second.fQuerySteps into a minimal
// combined step vector.
//
// For a physical table (fTableOid != CNX_VTABLE_ID) the primitive steps
// (pColScanStep, pColStep, pDictionaryStep, FilterStep, ...) are folded into a
// single BatchPrimitive -- a TupleBPS when tableOid > 0, or a CrossEngineStep
// for a foreign-engine table -- plus any pDictionaryScan steps and a trailing
// token-join TupleHashJoinStep needed for string access predicates.
// For a sub-query (virtual) table the existing SubQueryStep / SubAdapterStep is
// kept, and one-table expressions and function-join expressions are attached to
// the adapter.
//
// On return tableInfo.fDl and tableInfo.fRowGroup describe this table's output,
// and tableInfo.fQuerySteps is swapped to the combined step vector.
// Returns false when a step pattern is not recognized and cannot be combined
// into the TBPS; returns true on success.  Throws on internal inconsistencies
// (no query step can be fabricated, or a step needs a BPS before one exists).
bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
{
    TableInfo& tableInfo = mit->second;
    JobStepVector& qsv = tableInfo.fQuerySteps;
    JobStepVector newSteps;             // store combined steps
    RowGroup rgOut;                     // rowgroup of combined steps
    CalpontSystemCatalog::OID tableOid = tableInfo.fTableOid;

    if (tableOid != CNX_VTABLE_ID)
    {
        // real table
        if (qsv.size() == 0)
        {
            // No filter/scan step exists for this table; find any referenced
            // column -- in FE1, FE2, FE3, outer-join expressions, or the
            // column map -- so a scan step can be fabricated to seed the TBPS.
            uint32_t key = -1;

            if (tableInfo.fColsInExp1.size() > 0)
                key = tableInfo.fColsInExp1[0];
            else if (tableInfo.fColsInExp2.size() > 0)
                key = tableInfo.fColsInExp2[0];
            else if (tableInfo.fColsInRetExp.size() > 0)
                key = tableInfo.fColsInRetExp[0];
            else if (tableInfo.fColsInOuter.size() > 0)
                key = tableInfo.fColsInOuter[0];
            else if (tableInfo.fColsInColMap.size() > 0)
                key = tableInfo.fColsInColMap[0];
            else
                throw runtime_error("No query step");

            // construct a pcolscanstep to initialize the tbps
            CalpontSystemCatalog::OID oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
            CalpontSystemCatalog::ColType ct = jobInfo.keyInfo->colType[key];
            map<uint32_t, CalpontSystemCatalog::ColType>::iterator dkMit;

            // dictionary token column: scan with the dictionary column type
            if (jobInfo.keyInfo->token2DictTypeMap.find(key) !=
                    jobInfo.keyInfo->token2DictTypeMap.end())
                ct = jobInfo.keyInfo->token2DictTypeMap[key];

            SJSTEP sjs(new pColScanStep(oid, tableOid, ct, jobInfo));
            sjs->alias(jobInfo.keyInfo->tupleKeyVec[key].fTable);
            sjs->view(jobInfo.keyInfo->tupleKeyVec[key].fView);
            sjs->schema(jobInfo.keyInfo->tupleKeyVec[key].fSchema);
            sjs->name(jobInfo.keyInfo->keyName[key]);
            sjs->tupleId(key);
            qsv.push_back(sjs);
        }

        SJSTEP sjsp;                       // shared_ptr for the new BatchPrimitive
        BatchPrimitive* bps = NULL;        // pscan/pcol/filter/etc combined to
        vector<DictionaryScanInfo> pdsVec; // pds for string filters
        JobStepVector::iterator begin = qsv.begin();
        JobStepVector::iterator end = qsv.end();
        JobStepVector::iterator it = begin;

        // make sure there is a pcolscan if there is a pcolstep
        // (promote the first pColStep found to a pColScanStep)
        while (it != end)
        {
            if (typeid(*(it->get())) == typeid(pColScanStep))
                break;

            if (typeid(*(it->get())) == typeid(pColStep))
            {
                pColStep* pcs = dynamic_cast<pColStep*>(it->get());
                (*it).reset(new pColScanStep(*pcs));
                break;
            }

            it++;
        }

        // ---- predicates ----
        // setup TBPS and dictionaryscan
        it = begin;

        while (it != end)
        {
            if (typeid(*(it->get())) == typeid(pColScanStep))
            {
                if (bps == NULL)
                {
                    // First scan step seeds the BatchPrimitive.
                    if (tableOid > 0)
                    {
                        sjsp.reset(new TupleBPS(*(dynamic_cast<pColScanStep*>(it->get())), jobInfo));
                        TupleBPS* tbps = dynamic_cast<TupleBPS*>(sjsp.get());
                        tbps->setJobInfo(&jobInfo);
                        tbps->setFirstStepType(SCAN);
                        bps = tbps;
                    }
                    else
                    {
                        // Non-positive OID: table lives in another engine.
                        sjsp.reset(new CrossEngineStep(mit->second.fSchema,
                                                       mit->second.fName,
                                                       mit->second.fAlias,
                                                       jobInfo));
                        bps = dynamic_cast<CrossEngineStep*>(sjsp.get());
                    }
                }
                else
                {
                    // Only one scan per BPS; demote later scans to pColStep.
                    pColScanStep* pcss = dynamic_cast<pColScanStep*>(it->get());
                    (*it).reset(new pColStep(*pcss));
                }
            }

            unsigned itInc = 1;              // iterator increase number
            unsigned numOfStepsAddToBps = 0; // # steps to be added into TBPS

            // Recognize the step pattern starting at 'it' and decide how many
            // steps go into the BPS (numOfStepsAddToBps) and how far to
            // advance (itInc).
            if ((std::distance(it, end) > 2 &&
                    dynamic_cast<pDictionaryScan*>(it->get()) != NULL &&
                    (dynamic_cast<pColScanStep*>((it + 1)->get()) != NULL ||
                     dynamic_cast<pColStep*>((it + 1)->get()) != NULL) &&
                    dynamic_cast<TupleHashJoinStep*>((it + 2)->get()) != NULL) ||
                    (std::distance(it, end) > 1 &&
                     dynamic_cast<pDictionaryScan*>(it->get()) != NULL &&
                     dynamic_cast<TupleHashJoinStep*>((it + 1)->get()) != NULL))
            {
                // string access predicate
                // setup pDictionaryScan
                pDictionaryScan* pds = dynamic_cast<pDictionaryScan*>(it->get());
                vector<uint32_t> pos;
                vector<uint32_t> oids;
                vector<uint32_t> keys;
                vector<uint32_t> scale;
                vector<uint32_t> precision;
                vector<CalpontSystemCatalog::ColDataType> types;
                vector<uint32_t> csNums;
                pos.push_back(2);

                // single BIGINT token column: offset 2, width 8
                pos.push_back(2 + 8);
                CalpontSystemCatalog::OID coid = jobInfo.keyInfo->dictOidToColOid[pds->oid()];
                oids.push_back(coid);
                uint32_t keyId = pds->tupleId();
                keys.push_back(keyId);
                types.push_back(CalpontSystemCatalog::BIGINT);
                csNums.push_back(pds->colType().charsetNumber);
                scale.push_back(0);
                precision.push_back(0);

                RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);

                if (jobInfo.trace) cout << "RowGroup pds(and): " << rg.toString() << endl;

                pds->setOutputRowGroup(rg);
                newSteps.push_back(*it);

                DictionaryScanInfo pdsInfo;
                pdsInfo.fTokenId = keyId;
                pdsInfo.fDl = pds->outputAssociation().outAt(0);
                pdsInfo.fRowGroup = rg;
                pdsVec.push_back(pdsInfo);

                // save the token join to the last
                itInc = 1;
                numOfStepsAddToBps = 0;
            }
            else if (std::distance(begin, it) > 1 &&
                     (dynamic_cast<pDictionaryScan*>((it - 1)->get()) != NULL ||
                      dynamic_cast<pDictionaryScan*>((it - 2)->get()) != NULL) &&
                     dynamic_cast<TupleHashJoinStep*>(it->get()) != NULL)
            {
                // save the token join to the last, by pdsInfo
                itInc = 1;
                numOfStepsAddToBps = 0;
            }
            else if (std::distance(it, end) > 2 &&
                     dynamic_cast<pColStep*>((it + 1)->get()) != NULL &&
                     dynamic_cast<FilterStep*>((it + 2)->get()) != NULL)
            {
                // <col> <col> <filter> pattern
                itInc = 3;
                numOfStepsAddToBps = 3;
            }
            else if (std::distance(it, end) > 3 &&
                     dynamic_cast<pColStep*>((it + 1)->get()) != NULL &&
                     dynamic_cast<pDictionaryStep*>((it + 2)->get()) != NULL &&
                     dynamic_cast<FilterStep*>((it + 3)->get()) != NULL)
            {
                // <col> <col> <dict> <filter> pattern
                itInc = 4;
                numOfStepsAddToBps = 4;
            }
            else if (std::distance(it, end) > 3 &&
                     dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL &&
                     dynamic_cast<pColStep*>((it + 2)->get()) != NULL &&
                     dynamic_cast<FilterStep*>((it + 3)->get()) != NULL)
            {
                // <col> <dict> <col> <filter> pattern
                itInc = 4;
                numOfStepsAddToBps = 4;
            }
            else if (std::distance(it, end) > 4 &&
                     dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL &&
                     dynamic_cast<pColStep*>((it + 2)->get()) != NULL &&
                     dynamic_cast<pDictionaryStep*>((it + 3)->get()) != NULL &&
                     dynamic_cast<FilterStep*>((it + 4)->get()) != NULL)
            {
                // <col> <dict> <col> <dict> <filter> pattern
                itInc = 5;
                numOfStepsAddToBps = 5;
            }
            else if (std::distance(it, end) > 1 &&
                     (dynamic_cast<pColStep*>(it->get()) != NULL ||
                      dynamic_cast<pColScanStep*>(it->get()) != NULL) &&
                     dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL)
            {
                // <col|scan> <dict> pattern
                itInc = 2;
                numOfStepsAddToBps = 2;
            }
            else if (dynamic_cast<pColStep*>(it->get()) != NULL)
            {
                pColStep* pcol = dynamic_cast<pColStep*>(it->get());

                if (pcol->getFilters().size() == 0)
                {
                    // not an access predicate, pcol for token will be added later if necessary
                    numOfStepsAddToBps = 0;
                }
                else
                {
                    numOfStepsAddToBps = 1;
                }

                itInc = 1;
            }
            else if (dynamic_cast<pColScanStep*>(it->get()) != NULL)
            {
                numOfStepsAddToBps = 1;
                itInc = 1;
            }
            else
            {
                // Not a combinable step, or step pattern not recognized.
                cerr << boldStart << "Try to combine " << typeid(*(it->get())).name() << ": "
                     << it->get()->oid() << " into TBPS" << boldStop << endl;
                return false;
            }

            // now add the steps into the TBPS
            if (numOfStepsAddToBps > 0 && bps == NULL)
                throw runtime_error("BPS not created 1");

            for (unsigned i = 0; i < numOfStepsAddToBps; i++)
            {
                bps->setBPP((it + i)->get());
                bps->setStepCount();
                bps->setLastTupleId((it + i)->get()->tupleId());
            }

            it += itInc;
        }

        // add one-table expression steps to TBPS
        if (tableInfo.fOneTableExpSteps.size() > 0 || tableInfo.fFuncJoinExps.size() > 0)
            addExpresssionStepsToBps(mit, sjsp, jobInfo);

        // add TBPS to the step vector
        newSteps.push_back(sjsp);

        // ---- projects ----
        // now, add the joinkeys to the project step vector
        addProjectStepsToBps(mit, bps, jobInfo);

        // rowgroup has the joinkeys and selected columns
        // this is the expected output of this table
        rgOut = bps->getOutputRowGroup();

        // add token joins
        if (pdsVec.size() > 0)
        {
            // ---- token joins ----
            // construct a TupleHashJoinStep: each pDictionaryScan result
            // (qualifying tokens) is a small side joined back to the TBPS.
            TupleBPS* tbps = dynamic_cast<TupleBPS*>(bps);
            TupleHashJoinStep* thjs = new TupleHashJoinStep(jobInfo);
            thjs->tableOid1(0);
            thjs->tableOid2(tableInfo.fTableOid);
            thjs->alias1(tableInfo.fAlias);
            thjs->alias2(tableInfo.fAlias);
            thjs->view1(tableInfo.fView);
            thjs->view2(tableInfo.fView);
            thjs->schema1(tableInfo.fSchema);
            thjs->schema2(tableInfo.fSchema);
            thjs->setLargeSideBPS(tbps);
            thjs->joinId(-1); // token join is a filter force it done before other joins
            thjs->setJoinType(INNER);
            thjs->tokenJoin(mit->first);
            tbps->incWaitToRunStepCnt();
            SJSTEP spthjs(thjs);

            // rowgroup of the TBPS side
            // start with the expected output of the table, tokens will be appended
            RowGroup rgTbps = rgOut;

            // input jobstepassociation
            // 1. small sides -- pdictionaryscan steps
            vector<RowGroup> rgPdsVec;
            map<uint32_t, uint32_t> addedCol;
            vector<JoinType> jointypes;
            vector<bool> typeless;
            vector<vector<uint32_t> > smallKeyIndices;
            vector<vector<uint32_t> > largeKeyIndices;
            vector<string> tableNames;
            JobStepAssociation inJsa;

            for (vector<DictionaryScanInfo>::iterator i = pdsVec.begin(); i != pdsVec.end(); i++)
            {
                // add the token steps to the TBPS
                uint32_t tupleKey = i->fTokenId;
                map<uint32_t, uint32_t>::iterator k = addedCol.find(tupleKey);
                unsigned largeSideIndex = rgTbps.getColumnCount();

                if (k == addedCol.end())
                {
                    // token column not projected yet -- project it so the
                    // hash join has a large-side key to match against
                    SJSTEP sjs(new pColStep(jobInfo.keyInfo->tupleKeyVec[tupleKey].fId,
                                            tableInfo.fTableOid,
                                            jobInfo.keyInfo->token2DictTypeMap[tupleKey],
                                            jobInfo));
                    sjs->alias(tableInfo.fAlias);
                    sjs->view(tableInfo.fView);
                    sjs->schema(tableInfo.fSchema);
                    sjs->name(jobInfo.keyInfo->keyName[tupleKey]);
                    sjs->tupleId(tupleKey);
                    bps->setProjectBPP(sjs.get(), NULL);

                    // Update info, which will be used to config the hashjoin later.
                    rgTbps += i->fRowGroup;
                    addedCol[tupleKey] = largeSideIndex;
                }
                else
                {
                    largeSideIndex = k->second;
                }

                inJsa.outAdd(i->fDl);
                tableNames.push_back(jobInfo.keyInfo->tupleKeyVec[tupleKey].fTable);
                rgPdsVec.push_back(i->fRowGroup);
                jointypes.push_back(INNER);
                typeless.push_back(false);
                smallKeyIndices.push_back(vector<uint32_t>(1, 0));
                largeKeyIndices.push_back(vector<uint32_t>(1, largeSideIndex));
            }

            // 2. large side
            if (jobInfo.trace) cout << "RowGroup bps(and): " << rgTbps.toString() << endl;

            bps->setOutputRowGroup(rgTbps);
            inJsa.outAdd(bps->outputAssociation().outAt(0));

            // set input jobstepassociation
            thjs->inputAssociation(inJsa);
            thjs->setLargeSideDLIndex(inJsa.outSize() - 1);

            // output jobstepassociation
            AnyDataListSPtr spdl(new AnyDataList());
            RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
            spdl->rowGroupDL(dl);
            dl->OID(mit->first);
            JobStepAssociation jsaOut;
            jsaOut.outAdd(spdl);
            thjs->outputAssociation(jsaOut);

            // config the tuplehashjoin
            thjs->configSmallSideRG(rgPdsVec, tableNames);
            thjs->configLargeSideRG(rgTbps);
            thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
            thjs->setOutputRowGroup(rgOut);
            newSteps.push_back(spthjs);
        }
    }
    else
    {
        // table derived from subquery
        SubQueryStep* subStep = NULL;
        SubAdapterStep* adaStep = NULL;

        for (JobStepVector::iterator it = qsv.begin(); it != qsv.end(); it++)
        {
            if (((subStep = dynamic_cast<SubQueryStep*>(it->get())) != NULL) ||
                    ((adaStep = dynamic_cast<SubAdapterStep*>(it->get())) != NULL))
                newSteps.push_back(*it);
        }

        if (subStep == NULL && adaStep == NULL)
            throw runtime_error("No step for subquery.");

        if (subStep)
        {
            rgOut = subStep->getOutputRowGroup();
        }
        else
        {
            // add one-table expression steps to the adapter
            if (tableInfo.fOneTableExpSteps.size() > 0)
                adaStep->addExpression(tableInfo.fOneTableExpSteps, jobInfo);

            // add function join steps
            if (tableInfo.fFuncJoinExps.size() > 0)
            {
                // fe rowgroup info
                RowGroup feRg = adaStep->getFeRowGroup();

                if (feRg.getColumnCount() == 0)
                    feRg = adaStep->getOutputRowGroup();

                const vector<uint32_t>& feKeys = feRg.getKeys();
                map<uint32_t, uint32_t> keyToIndexMapFe;

                for (uint64_t i = 0; i < feKeys.size(); ++i)
                    keyToIndexMapFe.insert(make_pair(feKeys[i], i));

                // output rowgroup info
                const RowGroup& outRg = adaStep->getOutputRowGroup();
                const vector<uint32_t>& outKeys = outRg.getKeys();
                map<uint32_t, uint32_t> keyToIndexMapOut;

                for (uint64_t i = 0; i < outKeys.size(); ++i)
                    keyToIndexMapOut.insert(make_pair(outKeys[i], i));

                // make sure the function join columns are present in the rgs
                vector<uint32_t> fjKeys;
                vector<SRCP> fjCols;
                TupleInfoVector tis;
                uint64_t lastFeIdx = feKeys.size();
                JobStepVector& fjs = tableInfo.fFuncJoinExps;

                for (JobStepVector::iterator it = fjs.begin(); it != fjs.end(); it++)
                {
                    ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
                    TupleInfo ti = setExpTupleInfo(e->expression().get(), jobInfo);

                    // append missing function-join columns after the fe keys
                    if (find(feKeys.begin(), feKeys.end(), ti.key) == feKeys.end())
                    {
                        tis.push_back(ti);
                        keyToIndexMapFe.insert(make_pair(ti.key, lastFeIdx++));
                    }

                    e->updateInputIndex(keyToIndexMapFe, jobInfo);
                    e->updateOutputIndex(keyToIndexMapFe, jobInfo);
                    fjCols.push_back(e->expression());
                }

                // additional fields in the rowgroup
                vector<uint32_t> pos;
                vector<uint32_t> oids;
                vector<uint32_t> keys;
                vector<uint32_t> scale;
                vector<uint32_t> precision;
                vector<CalpontSystemCatalog::ColDataType> types;
                vector<uint32_t> csNums;
                pos.push_back(2);

                for (unsigned i = 0; i < tis.size(); i++)
                {
                    pos.push_back(pos.back() + tis[i].width);
                    oids.push_back(tis[i].oid);
                    keys.push_back(tis[i].key);
                    types.push_back(tis[i].dtype);
                    csNums.push_back(tis[i].csNum);
                    scale.push_back(tis[i].scale);
                    precision.push_back(tis[i].precision);
                }

                RowGroup addRg(oids.size(), pos, oids, keys, types, csNums, scale, precision,
                               jobInfo.stringTableThreshold);

                RowGroup feRg1 = feRg;
                RowGroup outRg1 = outRg;

                if (addRg.getColumnCount() > 0)
                {
                    feRg1 += addRg;
                    outRg1 += addRg;
                }

                adaStep->addFcnJoinExp(fjCols);
                adaStep->setFeRowGroup(feRg1);
                adaStep->setOutputRowGroup(outRg1);
            }

            rgOut = adaStep->getOutputRowGroup();
        }
    }

    // publish this table's output datalist and rowgroup
    tableInfo.fDl = newSteps.back()->outputAssociation().outAt(0);
    tableInfo.fRowGroup = rgOut;

    if (jobInfo.trace)
        cout << "RowGroup for " << mit->first << " : " << mit->second.fRowGroup.toString() << endl;

    qsv.swap(newSteps);

    return true;
}
1404
1405
// @bug3683, adding function joins for not joined tables, one pair at a time.
// Scan jobInfo.functionJoins for a convertible expression that connects one
// already-joined table (in nodeSet) to one not-yet-joined table, and turn all
// matching expressions for that table pair into a TupleHashJoinStep-based
// function join.  Updates joinedTables / nodeSet / pathSet, the per-table
// info in tableInfoMap, and jobInfo.tableJoinMap.
// Returns true when a new table was connected (caller loops until false or
// all tables are connected).
// design review comment:
//   all convertable expressions between a pair of tables should be done all,
//   or none.
bool addFunctionJoin(vector<uint32_t>& joinedTables, JobStepVector& joinSteps,
                     set<uint32_t>& nodeSet, set<pair<uint32_t, uint32_t> >& pathSet,
                     TableInfoMap& tableInfoMap, JobInfo& jobInfo)
{
    vector<JobStep*>::iterator i = jobInfo.functionJoins.begin(); // candidates
    set<pair<uint32_t, uint32_t> > functionJoinPairs;             // pairs
    bool added = false;                                           // new node added

    // for function join tables' scope checking, not to try semi join inside subquery.
    set<uint32_t> tables; // tables to join
    tables.insert(jobInfo.tableList.begin(), jobInfo.tableList.end());

    // table pairs to be joined by function joins
    TableJoinMap::iterator m1 = jobInfo.tableJoinMap.end();
    TableJoinMap::iterator m2 = jobInfo.tableJoinMap.end();

    for (; (i != jobInfo.functionJoins.end()); i++)
    {
        ExpressionStep* es = dynamic_cast<ExpressionStep*>((*i));
        idbassert(es);

        if (es->functionJoin())
            continue; // already converted to a join

        boost::shared_ptr<FunctionJoinInfo> fji = es->functionJoinInfo();
        uint32_t key1 = fji->fJoinKey[0];
        uint32_t key2 = fji->fJoinKey[1];
        uint32_t tid1 = fji->fTableKey[0];
        uint32_t tid2 = fji->fTableKey[1];

        if (nodeSet.find(tid1) != nodeSet.end() && nodeSet.find(tid2) != nodeSet.end())
            continue; // both connected, will be a cycle if added.

        if (nodeSet.find(tid1) == nodeSet.end() && nodeSet.find(tid2) == nodeSet.end())
            continue; // both isolated, wait until one is connected.

        if (tables.find(tid1) == tables.end() || tables.find(tid2) == tables.end())
            continue; // sub-query case

        // one & only one is already connected
        pair<uint32_t, uint32_t> p(tid1, tid2);

        if (functionJoinPairs.empty())
        {
            // First convertible expression: commit to this table pair.
            // Record the pair in both directions and connect the graph.
            functionJoinPairs.insert(make_pair(tid1, tid2));
            functionJoinPairs.insert(make_pair(tid2, tid1));
            tableInfoMap[tid1].fAdjacentList.push_back(tid2);
            tableInfoMap[tid2].fAdjacentList.push_back(tid1);

            // Exactly one of tid1/tid2 is new; add the new one to the tree.
            if (find(joinedTables.begin(), joinedTables.end(), tid1) == joinedTables.end())
            {
                joinedTables.push_back(tid1);
                nodeSet.insert(tid1);
                pathSet.insert(make_pair(tid2, tid1));
            }
            else
            {
                joinedTables.push_back(tid2);
                nodeSet.insert(tid2);
                pathSet.insert(make_pair(tid1, tid2));
            }

            added = true;

            // create both (tid1,tid2) and (tid2,tid1) entries in the join map
            m1 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid1, tid2), JoinData()));
            m2 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid2, tid1), JoinData()));

            if (m1 == jobInfo.tableJoinMap.end() || m2 == jobInfo.tableJoinMap.end())
                throw runtime_error("Bad table map.");

            TupleInfo ti1 = getTupleInfo(key1, jobInfo);
            TupleInfo ti2 = getTupleInfo(key2, jobInfo);

            // string keys require a typeless (binary) join
            if (ti1.dtype == CalpontSystemCatalog::CHAR
                    || ti1.dtype == CalpontSystemCatalog::VARCHAR
                    || ti1.dtype == CalpontSystemCatalog::TEXT)
                // || ti1.dtype == CalpontSystemCatalog::LONGDOUBLE)
                m1->second.fTypeless = m2->second.fTypeless = true; // ti2 is compatible
            else
                m1->second.fTypeless = m2->second.fTypeless = false;
        }
        else if (functionJoinPairs.find(p) == functionJoinPairs.end())
        {
            continue; // different path
        }
        else
        {
            // path already added, do compound join
            // (m1/m2 were set when the first expression for this pair hit)
            m1->second.fTypeless = m2->second.fTypeless = true;
        }

        // simple or compound function join
        es->functionJoin(true);
        updateTableKey(key1, tid1, jobInfo);
        updateTableKey(key2, tid2, jobInfo);

        tableInfoMap[tid1].fJoinKeys.push_back(key1);
        tableInfoMap[tid2].fJoinKeys.push_back(key2);

        // attach the expression that computes each side's join key, if any
        if (fji->fStep[0])
            tableInfoMap[tid1].fFuncJoinExps.push_back(fji->fStep[0]);

        if (fji->fStep[1])
            tableInfoMap[tid2].fFuncJoinExps.push_back(fji->fStep[1]);

        vector<uint32_t>& cols1 = tableInfoMap[tid1].fColsInFuncJoin;
        cols1.insert(cols1.end(), fji->fColumnKeys[0].begin(), fji->fColumnKeys[0].end());
        vector<uint32_t>& cols2 = tableInfoMap[tid2].fColsInFuncJoin;
        cols2.insert(cols2.end(), fji->fColumnKeys[1].begin(), fji->fColumnKeys[1].end());

        // construct a join step
        TupleHashJoinStep* thjs = new TupleHashJoinStep(jobInfo);
        thjs->tableOid1(fji->fTableOid[0]);
        thjs->tableOid2(fji->fTableOid[1]);
        thjs->oid1(fji->fOid[0]);
        thjs->oid2(fji->fOid[1]);
        thjs->alias1(fji->fAlias[0]);
        thjs->alias2(fji->fAlias[1]);
        thjs->view1(fji->fView[0]);
        thjs->view2(fji->fView[1]);
        thjs->schema1(fji->fSchema[0]);
        thjs->schema2(fji->fSchema[1]);
        thjs->column1(fji->fExpression[0]);
        thjs->column2(fji->fExpression[1]);
        thjs->sequence1(fji->fSequence[0]);
        thjs->sequence2(fji->fSequence[1]);
        thjs->joinId(fji->fJoinId);
        thjs->setJoinType(fji->fJoinType);
        thjs->funcJoinInfo(fji);
        thjs->tupleId1(key1);
        thjs->tupleId2(key2);
        SJSTEP spjs(thjs);

        // check correlated info
        JoinType joinType = fji->fJoinType;

        if (!(joinType & CORRELATED))
        {
            joinSteps.push_back(spjs);

            // Keep a map of the join (table, key) pairs
            m1->second.fLeftKeys.push_back(key1);
            m1->second.fRightKeys.push_back(key2);

            m2->second.fLeftKeys.push_back(key2);
            m2->second.fRightKeys.push_back(key1);

            // Keep a map of the join type between the keys.
            // OUTER join and SEMI/ANTI join are mutually exclusive.
            if (joinType == LEFTOUTER)
            {
                m1->second.fTypes.push_back(SMALLOUTER);
                m2->second.fTypes.push_back(LARGEOUTER);
                jobInfo.outerOnTable.insert(tid2);
            }
            else if (joinType == RIGHTOUTER)
            {
                m1->second.fTypes.push_back(LARGEOUTER);
                m2->second.fTypes.push_back(SMALLOUTER);
                jobInfo.outerOnTable.insert(tid1);
            }
            else if ((joinType & SEMI) &&
                     ((joinType & LEFTOUTER) == LEFTOUTER || (joinType & RIGHTOUTER) == RIGHTOUTER))
            {
                // @bug3998, DML UPDATE borrows "SEMI" flag,
                // allowing SEMI and LARGEOUTER combination to support update with outer join.
                if ((joinType & LEFTOUTER) == LEFTOUTER)
                {
                    joinType ^= LEFTOUTER;
                    m1->second.fTypes.push_back(joinType);
                    m2->second.fTypes.push_back(joinType | LARGEOUTER);
                    jobInfo.outerOnTable.insert(tid2);
                }
                else
                {
                    joinType ^= RIGHTOUTER;
                    m1->second.fTypes.push_back(joinType | LARGEOUTER);
                    m2->second.fTypes.push_back(joinType);
                    jobInfo.outerOnTable.insert(tid1);
                }
            }
            else
            {
                m1->second.fTypes.push_back(joinType);
                m2->second.fTypes.push_back(joinType);
            }

            // need id to keep the join order
            m1->second.fJoinId = m2->second.fJoinId = thjs->joinId();
        }
        else
        {
            // one of the tables is in outer query
            jobInfo.correlateSteps.push_back(spjs);
        }
    }

    return added;
}
1608
1609
spanningTreeCheck(TableInfoMap & tableInfoMap,JobStepVector joinSteps,JobInfo & jobInfo)1610 void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector joinSteps, JobInfo& jobInfo)
1611 {
1612 bool spanningTree = true;
1613 unsigned errcode = 0;
1614 Message::Args args;
1615
1616 if (jobInfo.trace)
1617 {
1618 cout << "Table Connection:" << endl;
1619
1620 for (TableInfoMap::iterator i = tableInfoMap.begin(); i != tableInfoMap.end(); i++)
1621 {
1622 cout << i->first << " :";
1623 vector<uint32_t>::iterator j = i->second.fAdjacentList.begin();
1624
1625 while (j != i->second.fAdjacentList.end())
1626 cout << " " << *j++;
1627
1628 cout << endl;
1629 }
1630
1631 cout << endl;
1632 }
1633
1634 if (tableInfoMap.size() < 1)
1635 {
1636 spanningTree = false;
1637 cerr << boldStart << "No table information." << boldStop << endl;
1638 throw logic_error("No table information.");
1639 }
1640 else if (tableInfoMap.size() > 1)
1641 {
1642 // 1a. make sure all tables are joined if not a single table query.
1643 set<uint32_t> nodeSet;
1644 set<pair<uint32_t, uint32_t> > pathSet;
1645 vector<uint32_t> joinedTables;
1646 joinedTables.push_back((tableInfoMap.begin())->first);
1647
1648 for (size_t i = 0; i < joinedTables.size(); i++)
1649 {
1650 vector<uint32_t>& v = tableInfoMap[joinedTables[i]].fAdjacentList;
1651 nodeSet.insert(joinedTables[i]);
1652
1653 for (vector<uint32_t>::iterator j = v.begin(); j != v.end(); j++)
1654 {
1655 if (nodeSet.find(*j) == nodeSet.end())
1656 joinedTables.push_back(*j);
1657
1658 nodeSet.insert(*j);
1659 pathSet.insert(make_pair(joinedTables[i], *j));
1660 }
1661 }
1662
1663 // 1b. convert expressions to function joins if not connected with simple column joins.
1664 bool fjAdded = false;
1665
1666 while (joinedTables.size() < tableInfoMap.size() &&
1667 addFunctionJoin(joinedTables, joinSteps, nodeSet, pathSet, tableInfoMap, jobInfo))
1668 {
1669 fjAdded = true;
1670
1671 for (size_t i = joinedTables.size() - 1; i < joinedTables.size(); i++)
1672 {
1673 vector<uint32_t>& v = tableInfoMap[joinedTables[i]].fAdjacentList;
1674
1675 for (vector<uint32_t>::iterator j = v.begin(); j != v.end(); j++)
1676 {
1677 if (find(joinedTables.begin(), joinedTables.end(), *j) == joinedTables.end())
1678 joinedTables.push_back(*j);
1679
1680 nodeSet.insert(*j);
1681 pathSet.insert(make_pair(joinedTables[i], *j));
1682 }
1683 }
1684 }
1685
1686
1687 if (jobInfo.trace && fjAdded)
1688 {
1689 cout << "Table Connection after adding function join:" << endl;
1690
1691 for (TableInfoMap::iterator i = tableInfoMap.begin(); i != tableInfoMap.end(); i++)
1692 {
1693 cout << i->first << " :";
1694 vector<uint32_t>::iterator j = i->second.fAdjacentList.begin();
1695
1696 while (j != i->second.fAdjacentList.end())
1697 cout << " " << *j++;
1698
1699 cout << endl;
1700 }
1701
1702 cout << endl;
1703 }
1704
1705 // Check that join is compatible
1706 set<string> views1;
1707 set<string> tables1;
1708 string errStr;
1709
1710 vector<uint32_t>::iterator k = joinedTables.begin();
1711
1712 k = joinedTables.begin();
1713
1714 for (; k != joinedTables.end(); k++)
1715 {
1716 if (jobInfo.keyInfo->tupleKeyVec[*k].fView.empty())
1717 tables1.insert(jobInfo.keyInfo->tupleKeyToName[*k]);
1718 else
1719 views1.insert(jobInfo.keyInfo->tupleKeyVec[*k].fView);
1720
1721 if (jobInfo.incompatibleJoinMap.find(*k) != jobInfo.incompatibleJoinMap.end())
1722 {
1723 errcode = ERR_INCOMPATIBLE_JOIN;
1724
1725 uint32_t key2 = jobInfo.incompatibleJoinMap[*k];
1726
1727 if (jobInfo.keyInfo->tupleKeyVec[*k].fView.length() > 0)
1728 {
1729 string view2 = jobInfo.keyInfo->tupleKeyVec[key2].fView;
1730
1731 if (jobInfo.keyInfo->tupleKeyVec[*k].fView == view2)
1732 {
1733 // same view
1734 errStr += "Tables in '" + view2 + "' have";
1735 }
1736 else if (view2.empty())
1737 {
1738 // view and real table
1739 errStr += "'" + jobInfo.keyInfo->tupleKeyVec[*k].fView + "' and '" +
1740 jobInfo.keyInfo->tupleKeyToName[key2] + "' have";
1741 }
1742 else
1743 {
1744 // two views
1745 errStr += "'" + jobInfo.keyInfo->tupleKeyVec[*k].fView + "' and '" +
1746 view2 + "' have";
1747 }
1748 }
1749 else
1750 {
1751 string view2 = jobInfo.keyInfo->tupleKeyVec[key2].fView;
1752
1753 if (view2.empty())
1754 {
1755 // two real tables
1756 errStr += "'" + jobInfo.keyInfo->tupleKeyToName[*k] + "' and '" +
1757 jobInfo.keyInfo->tupleKeyToName[key2] + "' have";
1758 }
1759 else
1760 {
1761 // real table and view
1762 errStr += "'" + jobInfo.keyInfo->tupleKeyToName[*k] + "' and '" +
1763 view2 + "' have";
1764 }
1765 }
1766
1767 args.add(errStr);
1768 spanningTree = false;
1769 break;
1770 }
1771
1772 }
1773
1774 // 1c. check again if all tables are joined after pulling in function joins.
1775 if (joinedTables.size() < tableInfoMap.size())
1776 {
1777 vector<uint32_t> notJoinedTables;
1778
1779 for (TableInfoMap::iterator i = tableInfoMap.begin(); i != tableInfoMap.end(); i++)
1780 {
1781 if (find(joinedTables.begin(), joinedTables.end(), i->first) == joinedTables.end())
1782 notJoinedTables.push_back(i->first);
1783 }
1784
1785 set<string> views2;
1786 set<string> tables2;
1787 k = notJoinedTables.begin();
1788
1789 for (; k != notJoinedTables.end(); k++)
1790 {
1791 if (jobInfo.keyInfo->tupleKeyVec[*k].fView.empty())
1792 tables2.insert(jobInfo.keyInfo->tupleKeyToName[*k]);
1793 else
1794 views2.insert(jobInfo.keyInfo->tupleKeyVec[*k].fView);
1795 }
1796
1797 if (errStr.empty())
1798 {
1799 errcode = ERR_MISS_JOIN;
1800
1801 // 1. check if all tables in a view are joined
1802 for (set<string>::iterator s = views1.begin(); s != views1.end(); s++)
1803 {
1804 if (views2.find(*s) != views2.end())
1805 {
1806 errStr = "Tables in '" + (*s) + "' are";
1807 }
1808 }
1809
1810 // 2. tables and views are joined
1811 if (errStr.empty())
1812 {
1813 string set1;
1814
1815 for (set<string>::iterator s = views1.begin(); s != views1.end(); s++)
1816 {
1817 if (set1.empty())
1818 set1 = "'";
1819 else
1820 set1 += ", ";
1821
1822 set1 += (*s);
1823 }
1824
1825 for (set<string>::iterator s = tables1.begin(); s != tables1.end(); s++)
1826 {
1827 if (set1.empty())
1828 set1 = "'";
1829 else
1830 set1 += ", ";
1831
1832 set1 += (*s);
1833 }
1834
1835 string set2;
1836
1837 for (set<string>::iterator s = views2.begin(); s != views2.end(); s++)
1838 {
1839 if (set2.empty())
1840 set2 = "'";
1841 else
1842 set2 += ", ";
1843
1844 set2 += (*s);
1845 }
1846
1847 for (set<string>::iterator s = tables2.begin(); s != tables2.end(); s++)
1848 {
1849 if (set2.empty())
1850 set2 = "'";
1851 else
1852 set2 += ", ";
1853
1854 set2 += (*s);
1855 }
1856
1857 errStr = set1 + "' and " + set2 + "' are";
1858 args.add(errStr);
1859 spanningTree = false;
1860 }
1861 }
1862
1863 }
1864
1865 // 2. no cycles
1866 if (spanningTree && (nodeSet.size() - pathSet.size() / 2 != 1))
1867 {
1868 errcode = ERR_CIRCULAR_JOIN;
1869 spanningTree = false;
1870 }
1871 }
1872
1873 if (!spanningTree)
1874 {
1875 cerr << boldStart << IDBErrorInfo::instance()->errorMsg(errcode, args) << boldStop << endl;
1876 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(errcode, args), errcode);
1877 }
1878 }
1879
1880
// @brief Move WHERE-clause filters of outer-join inner tables to post-join evaluation.
//
// Filters applied while scanning the NULL-supplying (inner) side of an outer
// join would eliminate rows before the join and change the outer-join result.
// For each table in jobInfo.outerOnTable, this clones the single-column
// primitive filters (pColStep/pColScanStep/pDictionaryScan/pDictionaryStep)
// into ExpressionSteps collected in jobInfo.outerJoinExpressions, to be
// re-applied after the join.  ON-clause filters are left on the table.
// One-table expression steps are moved to outerJoinExpressions as well.
void outjoinPredicateAdjust(TableInfoMap& tableInfoMap, JobInfo& jobInfo)
{
    set<uint32_t>::iterator i = jobInfo.outerOnTable.begin();

    for (; i != jobInfo.outerOnTable.end(); i++)
    {
        // resetTableFilters(tableInfoMap[*i], jobInfo)
        TableInfo& tblInfo = tableInfoMap[*i];

        if (tblInfo.fTableOid != CNX_VTABLE_ID)
        {
            JobStepVector::iterator k = tblInfo.fQuerySteps.begin();
            JobStepVector onClauseFilterSteps; //@bug5887, 5311

            for (; k != tblInfo.fQuerySteps.end(); k++)
            {
                // ON-clause filters are part of the join itself, not the
                // WHERE clause; keep them with the table.
                if ((*k)->onClauseFilter())
                {
                    onClauseFilterSteps.push_back(*k);
                    continue;
                }

                uint32_t colKey = -1;  // tuple key of the filtered column, (uint32_t)-1 if none
                pColStep* pcs = dynamic_cast<pColStep*>(k->get());
                pColScanStep* pcss = dynamic_cast<pColScanStep*>(k->get());
                pDictionaryScan* pdss = dynamic_cast<pDictionaryScan*>(k->get());
                pDictionaryStep* pdsp = dynamic_cast<pDictionaryStep*>(k->get());
                vector<const execplan::Filter*>* filters = NULL;
                int8_t bop = -1;  // boolean operator combining the step's filters

                if (pcs != NULL)
                {
                    filters = &(pcs->getFilters());
                    bop = pcs->BOP();
                    colKey = pcs->tupleId();
                }
                else if (pcss != NULL)
                {
                    filters = &(pcss->getFilters());
                    bop = pcss->BOP();
                    colKey = pcss->tupleId();
                }
                else if (pdss != NULL)
                {
                    filters = &(pdss->getFilters());
                    bop = pdss->BOP();
                    colKey = pdss->tupleId();
                }
                else if (pdsp != NULL)
                {
                    filters = &(pdsp->getFilters());
                    bop = pdsp->BOP();
                    colKey = pdsp->tupleId();
                }

                if (filters != NULL && filters->size() > 0)
                {
                    // Rebuild the step's filter list as a single parse tree,
                    // combining the clones with the step's own AND/OR operator.
                    ParseTree* pt = new ParseTree((*filters)[0]->clone());

                    // NOTE: this loop index shadows the outer iterator i.
                    for (size_t i = 1; i < filters->size(); i++)
                    {
                        ParseTree* left = pt;
                        ParseTree* right =
                            new ParseTree((*filters)[i]->clone());
                        ParseTree* op = (BOP_OR == bop) ?
                                        new ParseTree(new LogicOperator("or")) :
                                        new ParseTree(new LogicOperator("and"));
                        op->left(left);
                        op->right(right);

                        pt = op;
                    }

                    ExpressionStep* es = new ExpressionStep(jobInfo);

                    if (es == NULL)
                        throw runtime_error ("Failed to new ExpressionStep 2");

                    // expressionFilter() takes its own copy of the tree, so
                    // pt can be deleted afterwards.
                    es->expressionFilter(pt, jobInfo);
                    SJSTEP sjstep(es);
                    jobInfo.outerJoinExpressions.push_back(sjstep);
                    tblInfo.fColsInOuter.push_back(colKey);

                    delete pt;
                }
            }

            // Do not apply the primitive filters if there is an "IS NULL" in where clause.
            if (jobInfo.tableHasIsNull.find(*i) != jobInfo.tableHasIsNull.end())
                tblInfo.fQuerySteps = onClauseFilterSteps;
        }

        // One-table expressions are also delayed until after the outer join.
        jobInfo.outerJoinExpressions.insert(jobInfo.outerJoinExpressions.end(),
                                            tblInfo.fOneTableExpSteps.begin(), tblInfo.fOneTableExpSteps.end());
        tblInfo.fOneTableExpSteps.clear();

        tblInfo.fColsInOuter.insert(tblInfo.fColsInOuter.end(),
                                    tblInfo.fColsInExp1.begin(), tblInfo.fColsInExp1.end());
    }
}
1981
1982
getLargestTable(JobInfo & jobInfo,TableInfoMap & tableInfoMap,bool overrideLargeSideEstimate)1983 uint32_t getLargestTable(JobInfo& jobInfo, TableInfoMap& tableInfoMap, bool overrideLargeSideEstimate)
1984 {
1985 // Subquery in FROM clause assumptions:
1986 // hint will be ignored, if the 1st table in FROM clause is a derived table.
1987 if (jobInfo.keyInfo->tupleKeyVec[jobInfo.tableList[0]].fId < 3000)
1988 overrideLargeSideEstimate = false;
1989
1990 // Bug 2123. Added logic to dynamically determine the large side table unless the SQL statement
1991 // contained a hint saying to skip the estimation and use the FIRST table in the from clause.
1992 // Prior to this, we were using the LAST table in the from clause. We switched it as there
1993 // were some outer join sqls that couldn't be written with the large table last.
1994 // Default to the first table in the from clause if:
1995 // the user set the hint; or
1996 // there is only one table in the query.
1997 uint32_t ret = jobInfo.tableList.front();
1998
1999 if (jobInfo.tableList.size() <= 1)
2000 {
2001 return ret;
2002 }
2003
2004 // Algorithm to dynamically determine the largest table.
2005 uint64_t largestCardinality = 0;
2006 uint64_t estimatedRowCount = 0;
2007
2008 // Loop through the tables and find the one with the largest estimated cardinality.
2009 for (uint32_t i = 0; i < jobInfo.tableList.size(); i++)
2010 {
2011 jobInfo.tableSize[jobInfo.tableList[i]] = 0;
2012 TableInfoMap::iterator it = tableInfoMap.find(jobInfo.tableList[i]);
2013
2014 if (it != tableInfoMap.end())
2015 {
2016 // @Bug 3771. Loop through the query steps until the tupleBPS is found instead of
2017 // just looking at the first one. Tables in the query that included a filter on a
2018 // dictionary column were not getting their row count estimated.
2019 for (JobStepVector::iterator jsIt = it->second.fQuerySteps.begin();
2020 jsIt != it->second.fQuerySteps.end(); jsIt++)
2021 {
2022 TupleBPS* tupleBPS = dynamic_cast<TupleBPS*>((*jsIt).get());
2023
2024 if (tupleBPS != NULL)
2025 {
2026 estimatedRowCount = tupleBPS->getEstimatedRowCount();
2027 jobInfo.tableSize[jobInfo.tableList[i]] = estimatedRowCount;
2028
2029 if (estimatedRowCount > largestCardinality)
2030 {
2031 ret = jobInfo.tableList[i];
2032 largestCardinality = estimatedRowCount;
2033 }
2034
2035 break;
2036 }
2037 }
2038 }
2039 }
2040
2041 // select /*! INFINIDB_ORDERED */
2042 if (overrideLargeSideEstimate)
2043 {
2044 ret = jobInfo.tableList.front();
2045 jobInfo.tableSize[ret] = numeric_limits<uint64_t>::max();
2046 }
2047
2048 return ret;
2049 }
2050
2051
getPrevLarge(uint32_t n,TableInfoMap & tableInfoMap)2052 uint32_t getPrevLarge(uint32_t n, TableInfoMap& tableInfoMap)
2053 {
2054 // root node : no previous node;
2055 // other node: only one immediate previous node;
2056 int prev = -1;
2057 vector<uint32_t>& adjList = tableInfoMap[n].fAdjacentList;
2058
2059 for (vector<uint32_t>::iterator i = adjList.begin(); i != adjList.end() && prev < 0; i++)
2060 {
2061 if (tableInfoMap[*i].fVisited == true)
2062 prev = *i;
2063 }
2064
2065 return prev;
2066 }
2067
2068
getKeyIndex(uint32_t key,const RowGroup & rg)2069 uint32_t getKeyIndex(uint32_t key, const RowGroup& rg)
2070 {
2071 vector<uint32_t>::const_iterator i = rg.getKeys().begin();
2072
2073 for (; i != rg.getKeys().end(); ++i)
2074 if (key == *i)
2075 break;
2076
2077 if (i == rg.getKeys().end())
2078 throw runtime_error("No key found.");
2079
2080 return std::distance(rg.getKeys().begin(), i);
2081 }
2082
2083
joinInfoCompare(const SP_JoinInfo & a,const SP_JoinInfo & b)2084 bool joinInfoCompare(const SP_JoinInfo& a, const SP_JoinInfo& b)
2085 {
2086 return (a->fJoinData.fJoinId < b->fJoinData.fJoinId);
2087 }
2088
2089
joinTypeToString(const JoinType & joinType)2090 string joinTypeToString(const JoinType& joinType)
2091 {
2092 string ret;
2093
2094 if (joinType & INNER)
2095 ret = "inner";
2096 else if (joinType & LARGEOUTER)
2097 ret = "largeOuter";
2098 else if (joinType & SMALLOUTER)
2099 ret = "smallOuter";
2100
2101 if (joinType & SEMI)
2102 ret += "+semi";
2103
2104 if (joinType & ANTI)
2105 ret += "+ant";
2106
2107 if (joinType & SCALAR)
2108 ret += "+scalar";
2109
2110 if (joinType & MATCHNULLS)
2111 ret += "+matchnulls";
2112
2113 if (joinType & WITHFCNEXP)
2114 ret += "+exp";
2115
2116 if (joinType & CORRELATED)
2117 ret += "+correlated";
2118
2119 return ret;
2120 }
2121
2122
// @brief Depth-first construction of the join steps for the spanning tree of
//        tables rooted at <large>.
//
// Every unvisited adjacent table is joined recursively and becomes a small
// side feeding a TupleHashJoinStep whose large side is <large>.  Handles:
//   - swapping small/large sides when a subquery adapter can move the BPS to
//     the large side (@bug6158);
//   - evaluation of cross-table expressions that become computable once all
//     their tables are joined;
//   - additional join filters for semi/anti/scalar joins;
//   - delayed outer-join filters at the root of the tree.
//
// Appends <large> to joinOrder (post-order) and returns the JoinInfo that
// this subtree presents to its parent as a small side.
//
// Fix: removed a stray ", get()" declarator after fQuerySteps.back() that
// declared an unused block-scope function and had no effect.
SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap,
                             JobInfo& jobInfo, vector<uint32_t>& joinOrder)
{
    vector<SP_JoinInfo> smallSides;
    tableInfoMap[large].fVisited = true;
    tableInfoMap[large].fJoinedTables.insert(large);
    set<uint32_t>& tableSet = tableInfoMap[large].fJoinedTables;
    vector<uint32_t>& adjList = tableInfoMap[large].fAdjacentList;
    uint32_t prevLarge = (uint32_t) getPrevLarge(large, tableInfoMap);
    bool root = (prevLarge == (uint32_t) - 1) ? true : false;
    uint32_t link = large;
    uint32_t cId = -1;

    // Get small sides ready.
    for (vector<uint32_t>::iterator i = adjList.begin(); i != adjList.end(); i++)
    {
        if (tableInfoMap[*i].fVisited == false)
        {
            cId = *i;
            smallSides.push_back(joinToLargeTable(*i, tableInfoMap, jobInfo, joinOrder));

            tableSet.insert(tableInfoMap[*i].fJoinedTables.begin(),
                            tableInfoMap[*i].fJoinedTables.end());
        }
    }

    // Join with its small sides, if not a leaf node.
    if (smallSides.size() > 0)
    {
        // non-leaf node, need a join
        SJSTEP spjs = tableInfoMap[large].fQuerySteps.back();
        BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(spjs.get());
        SubAdapterStep* tsas = dynamic_cast<SubAdapterStep*>(spjs.get());
        TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());

        // @bug6158, try to put BPS on large side if possible
        if (tsas && smallSides.size() == 1)
        {
            SJSTEP sspjs = tableInfoMap[cId].fQuerySteps.back();
            BatchPrimitive* sbps = dynamic_cast<BatchPrimitive*>(sspjs.get());
            TupleHashJoinStep* sthjs = dynamic_cast<TupleHashJoinStep*>(sspjs.get());

            if (sbps || (sthjs && sthjs->tokenJoin() == cId))
            {
                // The single small side can stream; swap it with the large
                // side so the BPS (or token join) drives the hash join.
                SP_JoinInfo largeJoinInfo(new JoinInfo);
                largeJoinInfo->fTableOid = tableInfoMap[large].fTableOid;
                largeJoinInfo->fAlias = tableInfoMap[large].fAlias;
                largeJoinInfo->fView = tableInfoMap[large].fView;
                largeJoinInfo->fSchema = tableInfoMap[large].fSchema;

                largeJoinInfo->fDl = tableInfoMap[large].fDl;
                largeJoinInfo->fRowGroup = tableInfoMap[large].fRowGroup;

                TableJoinMap::iterator mit = jobInfo.tableJoinMap.find(make_pair(large, cId));

                if (mit == jobInfo.tableJoinMap.end())
                    throw runtime_error("Join step not found.");

                largeJoinInfo->fJoinData = mit->second;

                // switch large and small sides
                joinOrder.back() = large;
                large = cId;
                smallSides[0] = largeJoinInfo;
                tableInfoMap[large].fJoinedTables = tableSet;

                bps = sbps;
                thjs = sthjs;
                tsas = NULL;
            }
        }

        if (!bps && !thjs && !tsas)
        {
            if (dynamic_cast<SubQueryStep*>(spjs.get()))
                throw IDBExcept(ERR_NON_SUPPORT_SUB_QUERY_TYPE);

            throw runtime_error("Not supported join.");
        }

        size_t dcf = 0; // for dictionary column filters, 0 if thjs is null.
        RowGroup largeSideRG = tableInfoMap[large].fRowGroup;

        if (thjs && thjs->tokenJoin() == large)
        {
            dcf = thjs->getLargeKeys().size();
            largeSideRG = thjs->getLargeRowGroup();
        }

        // info for debug trace
        vector<string> tableNames;
        vector<string> traces;

        // sort the smallsides base on the joinId
        sort(smallSides.begin(), smallSides.end(), joinInfoCompare);
        int64_t lastJoinId = smallSides.back()->fJoinData.fJoinId;

        // get info to config the TupleHashjoin
        DataListVec smallSideDLs;
        vector<RowGroup> smallSideRGs;
        vector<JoinType> jointypes;
        vector<bool> typeless;
        vector<vector<uint32_t> > smallKeyIndices;
        vector<vector<uint32_t> > largeKeyIndices;

        for (vector<SP_JoinInfo>::iterator i = smallSides.begin(); i != smallSides.end(); i++)
        {
            JoinInfo* info = i->get();
            smallSideDLs.push_back(info->fDl);
            smallSideRGs.push_back(info->fRowGroup);
            jointypes.push_back(info->fJoinData.fTypes[0]);
            typeless.push_back(info->fJoinData.fTypeless);

            // Map each join key to its column index in the small/large row groups.
            vector<uint32_t> smallIndices;
            vector<uint32_t> largeIndices;
            const vector<uint32_t>& keys1 = info->fJoinData.fLeftKeys;
            const vector<uint32_t>& keys2 = info->fJoinData.fRightKeys;
            vector<uint32_t>::const_iterator k1 = keys1.begin();
            vector<uint32_t>::const_iterator k2 = keys2.begin();
            uint32_t stid = getTableKey(jobInfo, *k1);
            tableNames.push_back(jobInfo.keyInfo->tupleKeyVec[stid].fTable);

            for (; k1 != keys1.end(); ++k1, ++k2)
            {
                smallIndices.push_back(getKeyIndex(*k1, info->fRowGroup));
                largeIndices.push_back(getKeyIndex(*k2, largeSideRG));
            }

            smallKeyIndices.push_back(smallIndices);
            largeKeyIndices.push_back(largeIndices);

            if (jobInfo.trace)
            {
                // small side column
                ostringstream smallKey, smallIndex;
                string alias1 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys1.front())];
                smallKey << alias1 << "-";

                for (k1 = keys1.begin(); k1 != keys1.end(); ++k1)
                {
                    CalpontSystemCatalog::OID oid1 = jobInfo.keyInfo->tupleKeyVec[*k1].fId;
                    CalpontSystemCatalog::TableColName tcn1 = jobInfo.csc->colName(oid1);
                    smallKey << "(" << tcn1.column << ":" << oid1 << ":" << *k1 << ")";
                    smallIndex << " " << getKeyIndex(*k1, info->fRowGroup);
                }

                // large side column
                ostringstream largeKey, largeIndex;
                string alias2 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys2.front())];
                largeKey << alias2 << "-";

                for (k2 = keys2.begin(); k2 != keys2.end(); ++k2)
                {
                    CalpontSystemCatalog::OID oid2 = jobInfo.keyInfo->tupleKeyVec[*k2].fId;
                    CalpontSystemCatalog::TableColName tcn2 = jobInfo.csc->colName(oid2);
                    largeKey << "(" << tcn2.column << ":" << oid2 << ":" << *k2 << ")";
                    largeIndex << " " << getKeyIndex(*k2, largeSideRG);
                }

                ostringstream oss;
                oss << smallKey.str() << " join on " << largeKey.str()
                    << " joinType: " << info->fJoinData.fTypes.front()
                    << "(" << joinTypeToString(info->fJoinData.fTypes.front()) << ")"
                    << (info->fJoinData.fTypeless ? " " : " !") << "typeless" << endl;
                oss << "smallSideIndex-largeSideIndex :" << smallIndex.str() << " --"
                    << largeIndex.str() << endl;
                oss << "small side RG" << endl << info->fRowGroup.toString() << endl;
                traces.push_back(oss.str());
            }
        }

        if (jobInfo.trace)
        {
            ostringstream oss;
            oss << "large side RG" << endl << largeSideRG.toString() << endl;
            traces.push_back(oss.str());
        }

        if (bps || tsas)
        {
            // Large side is a primitive/subquery step: create a new hash join
            // on top of it.
            thjs = new TupleHashJoinStep(jobInfo);
            thjs->tableOid1(smallSides[0]->fTableOid);
            thjs->tableOid2(tableInfoMap[large].fTableOid);
            thjs->alias1(smallSides[0]->fAlias);
            thjs->alias2(tableInfoMap[large].fAlias);
            thjs->view1(smallSides[0]->fView);
            thjs->view2(tableInfoMap[large].fView);
            thjs->schema1(smallSides[0]->fSchema);
            thjs->schema2(tableInfoMap[large].fSchema);
            thjs->setLargeSideBPS(bps);
            thjs->joinId(lastJoinId);

            if (dynamic_cast<TupleBPS*>(bps) != NULL)
                bps->incWaitToRunStepCnt();

            SJSTEP spjs(thjs);

            JobStepAssociation inJsa;
            inJsa.outAdd(smallSideDLs, 0);
            inJsa.outAdd(tableInfoMap[large].fDl);
            thjs->inputAssociation(inJsa);
            thjs->setLargeSideDLIndex(inJsa.outSize() - 1);

            JobStepAssociation outJsa;
            AnyDataListSPtr spdl(new AnyDataList());
            RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
            spdl->rowGroupDL(dl);
            dl->OID(large);
            outJsa.outAdd(spdl);
            thjs->outputAssociation(outJsa);

            thjs->configSmallSideRG(smallSideRGs, tableNames);
            thjs->configLargeSideRG(tableInfoMap[large].fRowGroup);
            thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);

            tableInfoMap[large].fQuerySteps.push_back(spjs);
            tableInfoMap[large].fDl = spdl;
        }
        else
        {
            // Large side is already a hash join: append the new small sides
            // to the existing step.
            JobStepAssociation inJsa = thjs->inputAssociation();

            if (inJsa.outSize() < 2)
                throw runtime_error("Not enough input to a hashjoin.");

            size_t last = inJsa.outSize() - 1;
            inJsa.outAdd(smallSideDLs, last);
            thjs->inputAssociation(inJsa);
            thjs->setLargeSideDLIndex(inJsa.outSize() - 1);

            thjs->addSmallSideRG(smallSideRGs, tableNames);
            thjs->addJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
        }

        RowGroup rg;
        constructJoinedRowGroup(rg, link, prevLarge, root, tableSet, tableInfoMap, jobInfo);
        thjs->setOutputRowGroup(rg);
        tableInfoMap[large].fRowGroup = rg;

        if (jobInfo.trace)
        {
            cout << boldStart << "\n====== join info ======\n" << boldStop;

            for (vector<string>::iterator t = traces.begin(); t != traces.end(); ++t)
                cout << *t;

            cout << "RowGroup join result: " << endl << rg.toString() << endl << endl;
        }

        // check if any cross-table expressions can be evaluated after the join
        JobStepVector readyExpSteps;
        JobStepVector& expSteps = jobInfo.crossTableExpressions;
        JobStepVector::iterator eit = expSteps.begin();

        while (eit != expSteps.end())
        {
            ExpressionStep* exp = dynamic_cast<ExpressionStep*>(eit->get());

            if (exp == NULL)
                throw runtime_error("Not an expression.");

            if (exp->functionJoin())
            {
                eit++;
                continue; // done as join
            }

            const vector<uint32_t>& tables = exp->tableKeys();
            uint64_t i = 0;

            for (; i < tables.size(); i++)
            {
                if (tableSet.find(tables[i]) == tableSet.end())
                    break;
            }

            // all tables for this expression are joined
            if (tables.size() == i)
            {
                readyExpSteps.push_back(*eit);
                eit = expSteps.erase(eit);
            }
            else
            {
                eit++;
            }
        }

        // if root, handle the delayed outer join filters
        if (root && jobInfo.outerJoinExpressions.size() > 0)
            readyExpSteps.insert(readyExpSteps.end(),
                                 jobInfo.outerJoinExpressions.begin(),
                                 jobInfo.outerJoinExpressions.end());

        // check additional compares for semi-join
        if (readyExpSteps.size() > 0)
        {
            map<uint32_t, uint32_t> keyToIndexMap; // map keys to the indices in the RG

            for (uint64_t i = 0; i < rg.getKeys().size(); ++i)
                keyToIndexMap.insert(make_pair(rg.getKeys()[i], i));

            // tables have additional comparisons
            map<uint32_t, int> correlateTables;        // index in thjs
            map<uint32_t, ParseTree*> correlateCompare; // expression

            for (size_t i = 0; i != smallSides.size(); i++)
            {
                if ((jointypes[i] & SEMI) || (jointypes[i] & ANTI) || (jointypes[i] & SCALAR))
                {
                    uint32_t tid = getTableKey(jobInfo,
                                               smallSides[i]->fTableOid,
                                               smallSides[i]->fAlias,
                                               smallSides[i]->fSchema,
                                               smallSides[i]->fView);
                    correlateTables[tid] = i;
                    correlateCompare[tid] = NULL;
                }
            }

            if (correlateTables.size() > 0)
            {
                // separate additional compare for each table pair
                JobStepVector::iterator eit = readyExpSteps.begin();

                while (eit != readyExpSteps.end())
                {
                    ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());

                    if (e->selectFilter())
                    {
                        // @bug3780, leave select filter to normal expression
                        eit++;
                        continue;
                    }

                    const vector<uint32_t>& tables = e->tableKeys();
                    map<uint32_t, int>::iterator j = correlateTables.end();

                    for (size_t i = 0; i < tables.size(); i++)
                    {
                        j = correlateTables.find(tables[i]);

                        if (j != correlateTables.end())
                            break;
                    }

                    if (j == correlateTables.end())
                    {
                        eit++;
                        continue;
                    }

                    // map the input column index
                    e->updateInputIndex(keyToIndexMap, jobInfo);
                    ParseTree* pt = correlateCompare[j->first];

                    if (pt == NULL)
                    {
                        // first expression
                        pt = new ParseTree;
                        pt->copyTree(*(e->expressionFilter()));
                    }
                    else
                    {
                        // combine the expressions
                        ParseTree* left = pt;
                        ParseTree* right = new ParseTree;
                        right->copyTree(*(e->expressionFilter()));
                        pt = new ParseTree(new LogicOperator("and"));
                        pt->left(left);
                        pt->right(right);
                    }

                    correlateCompare[j->first] = pt;
                    eit = readyExpSteps.erase(eit);
                }

                map<uint32_t, int>::iterator k = correlateTables.begin();

                while (k != correlateTables.end())
                {
                    ParseTree* pt = correlateCompare[k->first];

                    if (pt != NULL)
                    {
                        // ownership of pt transfers to the shared_ptr
                        boost::shared_ptr<ParseTree> sppt(pt);
                        thjs->addJoinFilter(sppt, dcf + k->second);
                    }

                    k++;
                }

                thjs->setJoinFilterInputRG(rg);
            }

            // normal expression if any
            if (readyExpSteps.size() > 0)
            {
                // add the expression steps in where clause can be solved by this join to bps
                ParseTree* pt = NULL;
                JobStepVector::iterator eit = readyExpSteps.begin();

                for (; eit != readyExpSteps.end(); eit++)
                {
                    // map the input column index
                    ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());
                    e->updateInputIndex(keyToIndexMap, jobInfo);

                    if (pt == NULL)
                    {
                        // first expression
                        pt = new ParseTree;
                        pt->copyTree(*(e->expressionFilter()));
                    }
                    else
                    {
                        // combine the expressions
                        ParseTree* left = pt;
                        ParseTree* right = new ParseTree;
                        right->copyTree(*(e->expressionFilter()));
                        pt = new ParseTree(new LogicOperator("and"));
                        pt->left(left);
                        pt->right(right);
                    }
                }

                boost::shared_ptr<ParseTree> sppt(pt);
                thjs->addFcnExpGroup2(sppt);
            }

            // update the fColsInExp2 and construct the output RG
            updateExp2Cols(readyExpSteps, tableInfoMap, jobInfo);
            constructJoinedRowGroup(rg, link, prevLarge, root, tableSet, tableInfoMap, jobInfo);

            if (thjs->hasFcnExpGroup2())
                thjs->setFE23Output(rg);
            else
                thjs->setOutputRowGroup(rg);

            tableInfoMap[large].fRowGroup = rg;

            if (jobInfo.trace)
            {
                cout << "RowGroup of " << tableInfoMap[large].fAlias << " after EXP G2: " << endl
                     << rg.toString() << endl << endl;
            }
        }
    }


    // Prepare the current table info to join with its large side.
    SP_JoinInfo joinInfo(new JoinInfo);
    joinInfo->fTableOid = tableInfoMap[large].fTableOid;
    joinInfo->fAlias = tableInfoMap[large].fAlias;
    joinInfo->fView = tableInfoMap[large].fView;
    joinInfo->fSchema = tableInfoMap[large].fSchema;

    joinInfo->fDl = tableInfoMap[large].fDl;
    joinInfo->fRowGroup = tableInfoMap[large].fRowGroup;

    if (root == false) // not root
    {
        TableJoinMap::iterator mit = jobInfo.tableJoinMap.find(make_pair(link, prevLarge));

        if (mit == jobInfo.tableJoinMap.end())
            throw runtime_error("Join step not found.");

        joinInfo->fJoinData = mit->second;
    }

    joinOrder.push_back(large);

    return joinInfo;
}
2598
2599
joinStepCompare(const SJSTEP & a,const SJSTEP & b)2600 bool joinStepCompare(const SJSTEP& a, const SJSTEP& b)
2601 {
2602 return (dynamic_cast<TupleHashJoinStep*>(a.get())->joinId() <
2603 dynamic_cast<TupleHashJoinStep*>(b.get())->joinId());
2604 }
2605
2606
// A join edge between a pair of tables, identified by their table keys.
// fJoinId holds the most recent join ID assigned to this table pair.
struct JoinOrderData
{
    uint32_t fTid1;   // table key of one side of the join
    uint32_t fTid2;   // table key of the other side
    uint32_t fJoinId; // latest join ID for this pair
};
2613
2614
getJoinOrder(vector<JoinOrderData> & joins,JobStepVector & joinSteps,JobInfo & jobInfo)2615 void getJoinOrder(vector<JoinOrderData>& joins, JobStepVector& joinSteps, JobInfo& jobInfo)
2616 {
2617 sort(joinSteps.begin(), joinSteps.end(), joinStepCompare);
2618
2619 for (JobStepVector::iterator i = joinSteps.begin(); i < joinSteps.end(); i++)
2620 {
2621 TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(i->get());
2622 JoinOrderData jo;
2623 jo.fTid1 = getTableKey(jobInfo, thjs->tupleId1());
2624 jo.fTid2 = getTableKey(jobInfo, thjs->tupleId2());
2625 jo.fJoinId = thjs->joinId();
2626
2627 // not fastest, but good for a small list
2628 vector<JoinOrderData>::iterator j;
2629
2630 for (j = joins.begin(); j < joins.end(); j++)
2631 {
2632 if ((j->fTid1 == jo.fTid1 && j->fTid2 == jo.fTid2) ||
2633 (j->fTid1 == jo.fTid2 && j->fTid2 == jo.fTid1))
2634 {
2635 j->fJoinId = jo.fJoinId;
2636 break;
2637 }
2638 }
2639
2640 // insert unique join pair
2641 if (j == joins.end())
2642 joins.push_back(jo);
2643 }
2644 }
2645
updateJoinSides(uint32_t small,uint32_t large,map<uint32_t,SP_JoinInfo> & joinInfoMap,vector<SP_JoinInfo> & smallSides,TableInfoMap & tableInfoMap,JobInfo & jobInfo)2646 inline void updateJoinSides(uint32_t small, uint32_t large, map<uint32_t, SP_JoinInfo>& joinInfoMap,
2647 vector<SP_JoinInfo>& smallSides, TableInfoMap& tableInfoMap, JobInfo& jobInfo)
2648 {
2649 TableJoinMap::iterator mit = jobInfo.tableJoinMap.find(make_pair(small, large));
2650
2651 if (mit == jobInfo.tableJoinMap.end())
2652 throw runtime_error("Join step not found.");
2653
2654 joinInfoMap[small]->fJoinData = mit->second;
2655 tableInfoMap[small].fJoinedTables.insert(small);
2656 smallSides.push_back(joinInfoMap[small]);
2657
2658 tableInfoMap[large].fJoinedTables.insert(
2659 tableInfoMap[small].fJoinedTables.begin(), tableInfoMap[small].fJoinedTables.end());
2660 tableInfoMap[large].fJoinedTables.insert(large);
2661 }
2662
2663
2664 // For OUTER JOIN bug @2422/2633/3437/3759, join table based on join order.
2665 // The largest table will be always the streaming table, other tables are always on small side.
joinTablesInOrder(uint32_t largest,JobStepVector & joinSteps,TableInfoMap & tableInfoMap,JobInfo & jobInfo,vector<uint32_t> & joinOrder)2666 void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& tableInfoMap,
2667 JobInfo& jobInfo, vector<uint32_t>& joinOrder)
2668 {
2669 // populate the tableInfo for join
2670 map<uint32_t, SP_JoinInfo> joinInfoMap; // <table, JoinInfo>
2671
2672 // <table, <last step involved, large priority> >
2673 // large priority:
2674 // -1 - must be on small side, like derived tables for semi join;
2675 // 0 - prefer to be on small side, like FROM subquery;
2676 // 1 - can be on either large or small side;
2677 // 2 - must be on large side.
2678 map<uint32_t, pair<SJSTEP, int> > joinStepMap;
2679
2680 BatchPrimitive* bps = NULL;
2681 SubAdapterStep* tsas = NULL;
2682 TupleHashJoinStep* thjs = NULL;
2683
2684 for (vector<uint32_t>::iterator i = jobInfo.tableList.begin(); i < jobInfo.tableList.end(); i++)
2685 {
2686 SP_JoinInfo joinInfo(new JoinInfo);
2687 joinInfo->fTableOid = tableInfoMap[*i].fTableOid;
2688 joinInfo->fAlias = tableInfoMap[*i].fAlias;
2689 joinInfo->fView = tableInfoMap[*i].fView;
2690 joinInfo->fSchema = tableInfoMap[*i].fSchema;
2691
2692 joinInfo->fDl = tableInfoMap[*i].fDl;
2693 joinInfo->fRowGroup = tableInfoMap[*i].fRowGroup;
2694
2695 joinInfoMap[*i] = joinInfo;
2696
2697 SJSTEP spjs = tableInfoMap[*i].fQuerySteps.back();
2698 bps = dynamic_cast<BatchPrimitive*>(spjs.get());
2699 tsas = dynamic_cast<SubAdapterStep*>(spjs.get());
2700 thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
2701 TupleBPS* tbps = dynamic_cast<TupleBPS*>(spjs.get());
2702
2703 if (*i == largest)
2704 joinStepMap[*i] = make_pair(spjs, 2);
2705 else if (tbps || thjs)
2706 joinStepMap[*i] = make_pair(spjs, 1);
2707 else if (tsas)
2708 joinStepMap[*i] = make_pair(spjs, 0);
2709 else
2710 joinStepMap[*i] = make_pair(spjs, -1);
2711 }
2712
2713 // sort the join steps based on join ID.
2714 vector<JoinOrderData> joins;
2715 getJoinOrder(joins, joinSteps, jobInfo);
2716
2717 // join the steps
2718 int64_t lastJoinId = -1;
2719 uint32_t large = (uint32_t) - 1;
2720 uint32_t small = (uint32_t) - 1;
2721 uint32_t prevLarge = (uint32_t) - 1;
2722 bool umstream = false;
2723 vector<uint32_t> joinedTable;
2724 uint32_t lastJoin = joins.size() - 1;
2725 bool isSemijoin = true;
2726
2727 for (uint64_t js = 0; js < joins.size(); js++)
2728 {
2729 set<uint32_t> smallSideTid;
2730
2731 if (joins[js].fJoinId != 0)
2732 isSemijoin = false;
2733
2734 vector<SP_JoinInfo> smallSides;
2735 uint32_t tid1 = joins[js].fTid1;
2736 uint32_t tid2 = joins[js].fTid2;
2737 lastJoinId = joins[js].fJoinId;
2738
2739 // largest has already joined, and this join cannot be merged.
2740 if (prevLarge == largest && tid1 != largest && tid2 != largest)
2741 umstream = true;
2742
2743 if (joinStepMap[tid1].second > joinStepMap[tid2].second) // high priority
2744 {
2745 large = tid1;
2746 small = tid2;
2747 }
2748 else if (joinStepMap[tid1].second == joinStepMap[tid2].second &&
2749 jobInfo.tableSize[tid1] >= jobInfo.tableSize[tid2]) // favor t1 for hint
2750 {
2751 large = tid1;
2752 small = tid2;
2753 }
2754 else
2755 {
2756 large = tid2;
2757 small = tid1;
2758 }
2759
2760 updateJoinSides(small, large, joinInfoMap, smallSides, tableInfoMap, jobInfo);
2761
2762 if (find(joinedTable.begin(), joinedTable.end(), small) == joinedTable.end())
2763 joinedTable.push_back(small);
2764
2765 smallSideTid.insert(small);
2766
2767 // merge in the next step if the large side is the same
2768 for (uint64_t ns = js + 1; ns < joins.size(); js++, ns++)
2769 {
2770 uint32_t tid1 = joins[ns].fTid1;
2771 uint32_t tid2 = joins[ns].fTid2;
2772 uint32_t small = (uint32_t) - 1;
2773
2774 if ((tid1 == large) &&
2775 ((joinStepMap[tid1].second > joinStepMap[tid2].second) ||
2776 (joinStepMap[tid1].second == joinStepMap[tid2].second &&
2777 jobInfo.tableSize[tid1] >= jobInfo.tableSize[tid2])))
2778 {
2779 small = tid2;
2780 }
2781 else if ((tid2 == large) &&
2782 ((joinStepMap[tid2].second > joinStepMap[tid1].second) ||
2783 (joinStepMap[tid2].second == joinStepMap[tid1].second &&
2784 jobInfo.tableSize[tid2] >= jobInfo.tableSize[tid1])))
2785 {
2786 small = tid1;
2787 }
2788 else
2789 {
2790 break;
2791 }
2792
2793 // check if FE needs table in previous smallsides
2794 if (jobInfo.joinFeTableMap[joins[ns].fJoinId].size() > 0)
2795 {
2796 set<uint32_t>& tids = jobInfo.joinFeTableMap[joins[ns].fJoinId];
2797
2798 for (set<uint32_t>::iterator si = smallSideTid.begin(); si != smallSideTid.end(); si++)
2799 {
2800 if (tids.find(*si) != tids.end())
2801 throw runtime_error("On clause filter involving a table not directly involved in the outer join is currently not supported.");
2802 }
2803 }
2804
2805 updateJoinSides(small, large, joinInfoMap, smallSides, tableInfoMap, jobInfo);
2806 lastJoinId = joins[ns].fJoinId;
2807
2808 if (find(joinedTable.begin(), joinedTable.end(), small) == joinedTable.end())
2809 joinedTable.push_back(small);
2810
2811 smallSideTid.insert(small);
2812 }
2813
2814 joinedTable.push_back(large);
2815
2816 SJSTEP spjs = joinStepMap[large].first;
2817 bps = dynamic_cast<BatchPrimitive*>(spjs.get());
2818 tsas = dynamic_cast<SubAdapterStep*>(spjs.get());
2819 thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
2820
2821 if (!bps && !thjs && !tsas)
2822 {
2823 if (dynamic_cast<SubQueryStep*>(spjs.get()))
2824 throw IDBExcept(ERR_NON_SUPPORT_SUB_QUERY_TYPE);
2825
2826 throw runtime_error("Not supported join.");
2827 }
2828
2829 size_t startPos = 0; // start point to add new smallsides
2830 RowGroup largeSideRG = joinInfoMap[large]->fRowGroup;
2831
2832 if (thjs && thjs->tokenJoin() == large)
2833 largeSideRG = thjs->getLargeRowGroup();
2834
2835 // get info to config the TupleHashjoin
2836 vector<string> traces;
2837 vector<string> tableNames;
2838 DataListVec smallSideDLs;
2839 vector<RowGroup> smallSideRGs;
2840 vector<JoinType> jointypes;
2841 vector<bool> typeless;
2842 vector<vector<uint32_t> > smallKeyIndices;
2843 vector<vector<uint32_t> > largeKeyIndices;
2844
2845 // bug5764, make sure semi joins acting as filter is after outer join.
2846 {
2847 // the inner table filters have to be performed after outer join
2848 vector<uint64_t> semijoins;
2849 vector<uint64_t> smallouts;
2850
2851 for (size_t i = 0; i < smallSides.size(); i++)
2852 {
2853 // find the the small-outer and semi-join joins
2854 JoinType jt = smallSides[i]->fJoinData.fTypes[0];
2855
2856 if (jt & SMALLOUTER)
2857 smallouts.push_back(i);
2858 else if (jt & (SEMI | ANTI | SCALAR | CORRELATED))
2859 semijoins.push_back(i);
2860 }
2861
2862 // check the join order, re-arrange if necessary
2863 if (smallouts.size() > 0 && semijoins.size() > 0)
2864 {
2865 uint64_t lastSmallOut = smallouts.back();
2866 uint64_t lastSemijoin = semijoins.back();
2867
2868 if (lastSmallOut > lastSemijoin)
2869 {
2870 vector<SP_JoinInfo> temp1;
2871 vector<SP_JoinInfo> temp2;
2872 size_t j = 0;
2873
2874 for (size_t i = 0; i < smallSides.size(); i++)
2875 {
2876 if (j < semijoins.size() && i == semijoins[j])
2877 {
2878 temp1.push_back(smallSides[i]);
2879 j++;
2880 }
2881 else
2882 {
2883 temp2.push_back(smallSides[i]);
2884 }
2885
2886 if (i == lastSmallOut)
2887 temp2.insert(temp2.end(), temp1.begin(), temp1.end());
2888 }
2889
2890 smallSides = temp2;
2891 }
2892 }
2893 }
2894
2895 for (vector<SP_JoinInfo>::iterator i = smallSides.begin(); i != smallSides.end(); i++)
2896 {
2897 JoinInfo* info = i->get();
2898 smallSideDLs.push_back(info->fDl);
2899 smallSideRGs.push_back(info->fRowGroup);
2900 jointypes.push_back(info->fJoinData.fTypes[0]);
2901 typeless.push_back(info->fJoinData.fTypeless);
2902
2903 vector<uint32_t> smallIndices;
2904 vector<uint32_t> largeIndices;
2905 const vector<uint32_t>& keys1 = info->fJoinData.fLeftKeys;
2906 const vector<uint32_t>& keys2 = info->fJoinData.fRightKeys;
2907 vector<uint32_t>::const_iterator k1 = keys1.begin();
2908 vector<uint32_t>::const_iterator k2 = keys2.begin();
2909 uint32_t stid = getTableKey(jobInfo, *k1);
2910 tableNames.push_back(jobInfo.keyInfo->tupleKeyVec[stid].fTable);
2911
2912 for (; k1 != keys1.end(); ++k1, ++k2)
2913 {
2914 smallIndices.push_back(getKeyIndex(*k1, info->fRowGroup));
2915 largeIndices.push_back(getKeyIndex(*k2, largeSideRG));
2916 }
2917
2918 smallKeyIndices.push_back(smallIndices);
2919 largeKeyIndices.push_back(largeIndices);
2920
2921 if (jobInfo.trace)
2922 {
2923 // small side column
2924 ostringstream smallKey, smallIndex;
2925 string alias1 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys1.front())];
2926 smallKey << alias1 << "-";
2927
2928 for (k1 = keys1.begin(); k1 != keys1.end(); ++k1)
2929 {
2930 CalpontSystemCatalog::OID oid1 = jobInfo.keyInfo->tupleKeyVec[*k1].fId;
2931 CalpontSystemCatalog::TableColName tcn1 = jobInfo.csc->colName(oid1);
2932 smallKey << "(" << tcn1.column << ":" << oid1 << ":" << *k1 << ")";
2933 smallIndex << " " << getKeyIndex(*k1, info->fRowGroup);
2934 }
2935
2936 // large side column
2937 ostringstream largeKey, largeIndex;
2938 string alias2 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys2.front())];
2939 largeKey << alias2 << "-";
2940
2941 for (k2 = keys2.begin(); k2 != keys2.end(); ++k2)
2942 {
2943 CalpontSystemCatalog::OID oid2 = jobInfo.keyInfo->tupleKeyVec[*k2].fId;
2944 CalpontSystemCatalog::TableColName tcn2 = jobInfo.csc->colName(oid2);
2945 largeKey << "(" << tcn2.column << ":" << oid2 << ":" << *k2 << ")";
2946 largeIndex << " " << getKeyIndex(*k2, largeSideRG);
2947 }
2948
2949 ostringstream oss;
2950 oss << smallKey.str() << " join on " << largeKey.str()
2951 << " joinType: " << info->fJoinData.fTypes.front()
2952 << "(" << joinTypeToString(info->fJoinData.fTypes.front()) << ")"
2953 << (info->fJoinData.fTypeless ? " " : " !") << "typeless" << endl;
2954 oss << "smallSideIndex-largeSideIndex :" << smallIndex.str() << " --"
2955 << largeIndex.str() << endl;
2956 oss << "small side RG" << endl << info->fRowGroup.toString() << endl;
2957 traces.push_back(oss.str());
2958 }
2959 }
2960
2961 if (jobInfo.trace)
2962 {
2963 ostringstream oss;
2964 oss << "large side RG" << endl << largeSideRG.toString() << endl;
2965 traces.push_back(oss.str());
2966 }
2967
2968 if (bps || tsas || umstream || (thjs && joinStepMap[large].second < 1))
2969 {
2970 thjs = new TupleHashJoinStep(jobInfo);
2971 thjs->tableOid1(smallSides[0]->fTableOid);
2972 thjs->tableOid2(tableInfoMap[large].fTableOid);
2973 thjs->alias1(smallSides[0]->fAlias);
2974 thjs->alias2(tableInfoMap[large].fAlias);
2975 thjs->view1(smallSides[0]->fView);
2976 thjs->view2(tableInfoMap[large].fView);
2977 thjs->schema1(smallSides[0]->fSchema);
2978 thjs->schema2(tableInfoMap[large].fSchema);
2979 thjs->setLargeSideBPS(bps);
2980 thjs->joinId(lastJoinId);
2981
2982 if (dynamic_cast<TupleBPS*>(bps) != NULL)
2983 bps->incWaitToRunStepCnt();
2984
2985 spjs.reset(thjs);
2986
2987 JobStepAssociation inJsa;
2988 inJsa.outAdd(smallSideDLs, 0);
2989 inJsa.outAdd(joinInfoMap[large]->fDl);
2990 thjs->inputAssociation(inJsa);
2991 thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
2992
2993 JobStepAssociation outJsa;
2994 AnyDataListSPtr spdl(new AnyDataList());
2995 RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
2996 spdl->rowGroupDL(dl);
2997 dl->OID(large);
2998 outJsa.outAdd(spdl);
2999 thjs->outputAssociation(outJsa);
3000
3001 thjs->configSmallSideRG(smallSideRGs, tableNames);
3002 thjs->configLargeSideRG(joinInfoMap[large]->fRowGroup);
3003 thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
3004
3005 tableInfoMap[large].fQuerySteps.push_back(spjs);
3006 tableInfoMap[large].fDl = spdl;
3007 }
3008 else // thjs && joinStepMap[large].second >= 1
3009 {
3010 JobStepAssociation inJsa = thjs->inputAssociation();
3011
3012 if (inJsa.outSize() < 2)
3013 throw runtime_error("Not enough input to a hashjoin.");
3014
3015 startPos = inJsa.outSize() - 1;
3016 inJsa.outAdd(smallSideDLs, startPos);
3017 thjs->inputAssociation(inJsa);
3018 thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
3019
3020 thjs->addSmallSideRG(smallSideRGs, tableNames);
3021 thjs->addJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
3022 }
3023
3024 RowGroup rg;
3025 set<uint32_t>& tableSet = tableInfoMap[large].fJoinedTables;
3026 constructJoinedRowGroup(rg, tableSet, tableInfoMap, jobInfo);
3027 thjs->setOutputRowGroup(rg);
3028 tableInfoMap[large].fRowGroup = rg;
3029 tableSet.insert(large);
3030
3031 if (jobInfo.trace)
3032 {
3033 cout << boldStart << "\n====== join info ======\n" << boldStop;
3034
3035 for (vector<string>::iterator t = traces.begin(); t != traces.end(); ++t)
3036 cout << *t;
3037
3038 cout << "RowGroup join result: " << endl << rg.toString() << endl << endl;
3039 }
3040
3041 // on clause filter association
3042 map<uint64_t, size_t> joinIdIndexMap;
3043
3044 for (size_t i = 0; i < smallSides.size(); i++)
3045 {
3046 if (smallSides[i]->fJoinData.fJoinId > 0)
3047 joinIdIndexMap[smallSides[i]->fJoinData.fJoinId] = i;
3048 }
3049
3050 // check if any cross-table expressions can be evaluated after the join
3051 JobStepVector readyExpSteps;
3052 JobStepVector& expSteps = jobInfo.crossTableExpressions;
3053 JobStepVector::iterator eit = expSteps.begin();
3054
3055 while (eit != expSteps.end())
3056 {
3057 ExpressionStep* exp = dynamic_cast<ExpressionStep*>(eit->get());
3058
3059 if (exp == NULL)
3060 throw runtime_error("Not an expression.");
3061
3062 if (exp->functionJoin())
3063 {
3064 eit++;
3065 continue; // done as join
3066 }
3067
3068 const vector<uint32_t>& tables = exp->tableKeys();
3069 uint64_t i = 0;
3070
3071 for (; i < tables.size(); i++)
3072 {
3073 if (tableInfoMap[large].fJoinedTables.find(tables[i]) ==
3074 tableInfoMap[large].fJoinedTables.end())
3075 break;
3076 }
3077
3078 // all tables for this expression are joined
3079 bool ready = (tables.size() == i);
3080
3081 // for on clause condition, need check join ID
3082 if (ready && exp->associatedJoinId() != 0)
3083 {
3084 map<uint64_t, size_t>::iterator x = joinIdIndexMap.find(exp->associatedJoinId());
3085 ready = (x != joinIdIndexMap.end());
3086 }
3087
3088 if (ready)
3089 {
3090 readyExpSteps.push_back(*eit);
3091 eit = expSteps.erase(eit);
3092 }
3093 else
3094 {
3095 eit++;
3096 }
3097 }
3098
3099 // if last join step, handle the delayed outer join filters
3100 if (js == lastJoin && jobInfo.outerJoinExpressions.size() > 0)
3101 readyExpSteps.insert(readyExpSteps.end(),
3102 jobInfo.outerJoinExpressions.begin(),
3103 jobInfo.outerJoinExpressions.end());
3104
3105 // check additional compares for semi-join
3106 if (readyExpSteps.size() > 0)
3107 {
3108 map<uint32_t, uint32_t> keyToIndexMap; // map keys to the indices in the RG
3109
3110 for (uint64_t i = 0; i < rg.getKeys().size(); ++i)
3111 keyToIndexMap.insert(make_pair(rg.getKeys()[i], i));
3112
3113 // tables have additional comparisons
3114 map<uint32_t, int> correlateTables; // index in thjs
3115 map<uint32_t, ParseTree*> correlateCompare; // expression
3116
3117 for (size_t i = 0; i != smallSides.size(); i++)
3118 {
3119 if ((jointypes[i] & SEMI) || (jointypes[i] & ANTI) || (jointypes[i] & SCALAR))
3120 {
3121 uint32_t tid = getTableKey(jobInfo,
3122 smallSides[i]->fTableOid,
3123 smallSides[i]->fAlias,
3124 smallSides[i]->fSchema,
3125 smallSides[i]->fView);
3126 correlateTables[tid] = i;
3127 correlateCompare[tid] = NULL;
3128 }
3129 }
3130
3131 if (correlateTables.size() > 0)
3132 {
3133 // separate additional compare for each table pair
3134 JobStepVector::iterator eit = readyExpSteps.begin();
3135
3136 while (eit != readyExpSteps.end())
3137 {
3138 ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());
3139
3140 if (e->selectFilter())
3141 {
3142 // @bug3780, leave select filter to normal expression
3143 eit++;
3144 continue;
3145 }
3146
3147 const vector<uint32_t>& tables = e->tableKeys();
3148 map<uint32_t, int>::iterator j = correlateTables.end();
3149
3150 for (size_t i = 0; i < tables.size(); i++)
3151 {
3152 j = correlateTables.find(tables[i]);
3153
3154 if (j != correlateTables.end())
3155 break;
3156 }
3157
3158 if (j == correlateTables.end())
3159 {
3160 eit++;
3161 continue;
3162 }
3163
3164 // map the input column index
3165 e->updateInputIndex(keyToIndexMap, jobInfo);
3166 ParseTree* pt = correlateCompare[j->first];
3167
3168 if (pt == NULL)
3169 {
3170 // first expression
3171 pt = new ParseTree;
3172 pt->copyTree(*(e->expressionFilter()));
3173 }
3174 else
3175 {
3176 // combine the expressions
3177 ParseTree* left = pt;
3178 ParseTree* right = new ParseTree;
3179 right->copyTree(*(e->expressionFilter()));
3180 pt = new ParseTree(new LogicOperator("and"));
3181 pt->left(left);
3182 pt->right(right);
3183 }
3184
3185 correlateCompare[j->first] = pt;
3186 eit = readyExpSteps.erase(eit);
3187 }
3188
3189 map<uint32_t, int>::iterator k = correlateTables.begin();
3190
3191 while (k != correlateTables.end())
3192 {
3193 ParseTree* pt = correlateCompare[k->first];
3194
3195 if (pt != NULL)
3196 {
3197 boost::shared_ptr<ParseTree> sppt(pt);
3198 thjs->addJoinFilter(sppt, startPos + k->second);
3199 }
3200
3201 k++;
3202 }
3203
3204 thjs->setJoinFilterInputRG(rg);
3205 }
3206
3207 // normal expression if any
3208 if (readyExpSteps.size() > 0)
3209 {
3210 // add the expression steps in where clause can be solved by this join to bps
3211 ParseTree* pt = NULL;
3212 JobStepVector::iterator eit = readyExpSteps.begin();
3213
3214 for (; eit != readyExpSteps.end(); eit++)
3215 {
3216 // map the input column index
3217 ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());
3218 e->updateInputIndex(keyToIndexMap, jobInfo);
3219
3220 // short circuit on clause expressions
3221 map<uint64_t, size_t>::iterator x = joinIdIndexMap.find(e->associatedJoinId());
3222
3223 if (x != joinIdIndexMap.end())
3224 {
3225 ParseTree* joinFilter = new ParseTree;
3226 joinFilter->copyTree(*(e->expressionFilter()));
3227 boost::shared_ptr<ParseTree> sppt(joinFilter);
3228 thjs->addJoinFilter(sppt, startPos + x->second);
3229 thjs->setJoinFilterInputRG(rg);
3230 continue;
3231 }
3232
3233 if (pt == NULL)
3234 {
3235 // first expression
3236 pt = new ParseTree;
3237 pt->copyTree(*(e->expressionFilter()));
3238 }
3239 else
3240 {
3241 // combine the expressions
3242 ParseTree* left = pt;
3243 ParseTree* right = new ParseTree;
3244 right->copyTree(*(e->expressionFilter()));
3245 pt = new ParseTree(new LogicOperator("and"));
3246 pt->left(left);
3247 pt->right(right);
3248 }
3249 }
3250
3251 if (pt != NULL)
3252 {
3253 boost::shared_ptr<ParseTree> sppt(pt);
3254 thjs->addFcnExpGroup2(sppt);
3255 }
3256 }
3257
3258 // update the fColsInExp2 and construct the output RG
3259 updateExp2Cols(readyExpSteps, tableInfoMap, jobInfo);
3260 constructJoinedRowGroup(rg, tableSet, tableInfoMap, jobInfo);
3261
3262 if (thjs->hasFcnExpGroup2())
3263 thjs->setFE23Output(rg);
3264 else
3265 thjs->setOutputRowGroup(rg);
3266
3267 tableInfoMap[large].fRowGroup = rg;
3268
3269 if (jobInfo.trace)
3270 {
3271 cout << "RowGroup of " << tableInfoMap[large].fAlias << " after EXP G2: " << endl
3272 << rg.toString() << endl << endl;
3273 }
3274 }
3275
3276 // update the info maps
3277 int l = (joinStepMap[large].second == 2) ? 2 : 0;
3278
3279 if (isSemijoin)
3280 joinStepMap[large] = make_pair(spjs, joinStepMap[large].second);
3281 else
3282 joinStepMap[large] = make_pair(spjs, l);
3283
3284 for (set<uint32_t>::iterator i = tableSet.begin(); i != tableSet.end(); i++)
3285 {
3286 joinInfoMap[*i]->fDl = tableInfoMap[large].fDl;
3287 joinInfoMap[*i]->fRowGroup = tableInfoMap[large].fRowGroup;
3288
3289 if (*i != large)
3290 {
3291 //@bug6117, token should be done for small side tables.
3292 SJSTEP smallJs = joinStepMap[*i].first;
3293 TupleHashJoinStep* smallThjs = dynamic_cast<TupleHashJoinStep*>(smallJs.get());
3294
3295 if (smallThjs && smallThjs->tokenJoin())
3296 smallThjs->tokenJoin(-1);
3297
3298 // Set join priority for smallsides.
3299 joinStepMap[*i] = make_pair(spjs, l);
3300
3301 // Mark joined tables, smalls and large, as a group.
3302 tableInfoMap[*i].fJoinedTables = tableInfoMap[large].fJoinedTables;
3303 }
3304 }
3305
3306 prevLarge = large;
3307 }
3308
3309 // Keep join order by the table last used for picking the right delivery step.
3310 {
3311 for (vector<uint32_t>::reverse_iterator i = joinedTable.rbegin(); i < joinedTable.rend(); i++)
3312 {
3313 if (find(joinOrder.begin(), joinOrder.end(), *i) == joinOrder.end())
3314 joinOrder.push_back(*i);
3315 }
3316
3317 const uint64_t n = joinOrder.size();
3318 const uint64_t h = n / 2;
3319 const uint64_t e = n - 1;
3320
3321 for (uint64_t i = 0; i < h; i++)
3322 std::swap(joinOrder[i], joinOrder[e - i]);
3323 }
3324 }
3325
3326
joinTables(JobStepVector & joinSteps,TableInfoMap & tableInfoMap,JobInfo & jobInfo,vector<uint32_t> & joinOrder,const bool overrideLargeSideEstimate)3327 inline void joinTables(JobStepVector& joinSteps, TableInfoMap& tableInfoMap, JobInfo& jobInfo,
3328 vector<uint32_t>& joinOrder, const bool overrideLargeSideEstimate)
3329 {
3330 uint32_t largestTable = getLargestTable(jobInfo, tableInfoMap, overrideLargeSideEstimate);
3331
3332 if (jobInfo.outerOnTable.size() == 0)
3333 joinToLargeTable(largestTable, tableInfoMap, jobInfo, joinOrder);
3334 else
3335 joinTablesInOrder(largestTable, joinSteps, tableInfoMap, jobInfo, joinOrder);
3336 }
3337
3338
makeNoTableJobStep(JobStepVector & querySteps,JobStepVector & projectSteps,DeliveredTableMap & deliverySteps,JobInfo & jobInfo)3339 void makeNoTableJobStep(JobStepVector& querySteps, JobStepVector& projectSteps,
3340 DeliveredTableMap& deliverySteps, JobInfo& jobInfo)
3341 {
3342 querySteps.clear();
3343 projectSteps.clear();
3344 deliverySteps.clear();
3345 querySteps.push_back(TupleConstantStep::addConstantStep(jobInfo));
3346 deliverySteps[CNX_VTABLE_ID] = querySteps.back();
3347 }
3348
3349
3350 }
3351
3352
3353
3354 namespace joblist
3355 {
// Main entry that reorganizes the flat step vectors produced by plan translation
// into a tuple-oriented job list:
//   1. (optional) trace dump of incoming steps and tuple-key metadata,
//   2. bucket query/project steps per table into a TableInfoMap, separating
//      join steps, cross-table expressions, and correlated steps,
//   3. combine steps per table, join the tables, and reassemble querySteps
//      in join order, finishing with adjustLastStep() for delivery.
// @param querySteps    in/out - predicate/join steps in; reassembled steps out
// @param projectSteps  in/out - projection steps in; cleared on output
// @param deliverySteps in/out - rebuilt by adjustLastStep (or the no-table path)
// @param jobInfo       per-query shared state (key maps, trace flag, subquery info)
// @param overrideLargeSideEstimate  forwarded to large-side table selection
void associateTupleJobSteps(JobStepVector& querySteps, JobStepVector& projectSteps,
                            DeliveredTableMap& deliverySteps, JobInfo& jobInfo,
                            const bool overrideLargeSideEstimate)
{
    // Diagnostic dump of the incoming steps and key/table dictionaries.
    if (jobInfo.trace)
    {
        const boost::shared_ptr<TupleKeyInfo>& keyInfo = jobInfo.keyInfo;
        cout << "query steps:" << endl;

        for (JobStepVector::iterator i = querySteps.begin(); i != querySteps.end(); ++i)
        {
            TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(i->get());

            if (thjs == NULL)
            {
                // -1 marks "no tuple id assigned" for non-join steps.
                int64_t id = ((*i)->tupleId() != (uint64_t) - 1) ? (*i)->tupleId() : -1;
                cout << typeid(*(i->get())).name() << ": " << (*i)->oid() << " " << id << " "
                     << (int)((id != -1) ? getTableKey(jobInfo, id) : -1) << endl;
            }
            else
            {
                // Join steps carry two tuple ids, one per side.
                int64_t id1 = (thjs->tupleId1() != (uint64_t) - 1) ? thjs->tupleId1() : -1;
                int64_t id2 = (thjs->tupleId2() != (uint64_t) - 1) ? thjs->tupleId2() : -1;
                cout << typeid(*thjs).name() << ": " << thjs->oid1() << " " << id1 << " "
                     << (int)((id1 != -1) ? getTableKey(jobInfo, id1) : -1) << " - "
                     << thjs->getJoinType() << " - " << thjs->oid2() << " " << id2 << " "
                     << (int)((id2 != -1) ? getTableKey(jobInfo, id2) : -1) << endl;
            }
        }

        cout << "project steps:" << endl;

        for (JobStepVector::iterator i = projectSteps.begin(); i != projectSteps.end(); ++i)
        {
            cout << typeid(*(i->get())).name() << ": " << (*i)->oid() << " "
                 << (*i)->tupleId() << " " << getTableKey(jobInfo, (*i)->tupleId()) << endl;
        }

        cout << "delivery steps:" << endl;

        for (DeliveredTableMap::iterator i = deliverySteps.begin(); i != deliverySteps.end(); ++i)
            cout << typeid(*(i->second.get())).name() << endl;

        cout << "\nTable Info: (key oid name alias view sub)" << endl;

        // A tuple key whose table key maps back to itself denotes a table entry.
        for (uint32_t i = 0; i < keyInfo->tupleKeyVec.size(); ++i)
        {
            int64_t tid = jobInfo.keyInfo->colKeyToTblKey[i];

            if (tid == i)
            {
                CalpontSystemCatalog::OID oid = keyInfo->tupleKeyVec[i].fId;
                string alias = keyInfo->tupleKeyVec[i].fTable;

                if (alias.length() < 1) alias = "N/A";

                string name = keyInfo->keyName[i];

                if (name.empty()) name = "unknown";

                string view = keyInfo->tupleKeyVec[i].fView;

                if (view.length() < 1) view = "N/A";

                int sid = keyInfo->tupleKeyVec[i].fSubId;
                cout << i << "\t" << oid << "\t" << name << "\t" << alias
                     << "\t" << view << "\t" << hex << sid << dec << endl;
            }
        }

        cout << "\nTupleKey vector: (tupleKey oid tableKey name alias view sub)" << endl;

        for (uint32_t i = 0; i < keyInfo->tupleKeyVec.size(); ++i)
        {
            CalpontSystemCatalog::OID oid = keyInfo->tupleKeyVec[i].fId;
            int64_t tid = jobInfo.keyInfo->colKeyToTblKey[i];
            string alias = keyInfo->tupleKeyVec[i].fTable;

            if (alias.length() < 1) alias = "N/A";

            // Expression IDs are borrowed from systemcatalog IDs, which are not used in tuple.
            string name = keyInfo->keyName[i];

            if (keyInfo->dictOidToColOid.find(oid) != keyInfo->dictOidToColOid.end())
            {
                name += "[d]"; // indicate this is a dictionary column
            }

            if (jobInfo.keyInfo->pseudoType[i] > 0)
            {
                name += "[p]"; // indicate this is a pseudo column
            }

            if (name.empty())
            {
                name = "unknown";
            }

            string view = keyInfo->tupleKeyVec[i].fView;

            if (view.length() < 1) view = "N/A";

            int sid = keyInfo->tupleKeyVec[i].fSubId;
            cout << i << "\t" << oid << "\t" << tid << "\t" << name << "\t" << alias
                 << "\t" << view << "\t" << hex << sid << dec << endl;
        }

        cout << endl;
    }


    // @bug 2771, handle no table select query
    if (jobInfo.tableList.size() < 1)
    {
        makeNoTableJobStep(querySteps, projectSteps, deliverySteps, jobInfo);
        return;
    }

    // Create a step vector for each table in the from clause.
    TableInfoMap tableInfoMap;

    for (uint64_t i = 0; i < jobInfo.tableList.size(); i++)
    {
        uint32_t tableUid = jobInfo.tableList[i];
        tableInfoMap[tableUid] = TableInfo();
        tableInfoMap[tableUid].fTableOid = jobInfo.keyInfo->tupleKeyVec[tableUid].fId;
        tableInfoMap[tableUid].fName = jobInfo.keyInfo->keyName[tableUid];
        tableInfoMap[tableUid].fAlias = jobInfo.keyInfo->tupleKeyVec[tableUid].fTable;
        tableInfoMap[tableUid].fView = jobInfo.keyInfo->tupleKeyVec[tableUid].fView;
        tableInfoMap[tableUid].fSchema = jobInfo.keyInfo->tupleKeyVec[tableUid].fSchema;
        tableInfoMap[tableUid].fSubId = jobInfo.keyInfo->tupleKeyVec[tableUid].fSubId;
        tableInfoMap[tableUid].fColsInColMap = jobInfo.columnMap[tableUid];
    }

    // Set of the columns being projected.
    for (TupleInfoVector::iterator i = jobInfo.pjColList.begin(); i != jobInfo.pjColList.end(); i++)
        jobInfo.returnColSet.insert(i->key);

    // Strip constantbooleanquerySteps
    // Constant boolean/expression filters are evaluated here once; a constant
    // false result flags the whole query as returning no rows.
    for (uint64_t i = 0; i < querySteps.size(); )
    {
        TupleConstantBooleanStep* bs = dynamic_cast<TupleConstantBooleanStep*>(querySteps[i].get());
        ExpressionStep* es = dynamic_cast<ExpressionStep*>(querySteps[i].get());

        if (bs != NULL)
        {
            // cosntant step
            if (bs->boolValue() == false)
                jobInfo.constantFalse = true;

            querySteps.erase(querySteps.begin() + i);
        }
        else if (es != NULL && es->tableKeys().size() == 0)
        {
            // constant expression
            ParseTree* p = es->expressionFilter(); // filter

            // NOTE(review): if p is NULL here, neither the erase nor the i++
            // branch runs, so i does not advance — potential infinite loop.
            // Verify expressionFilter() can never return NULL for a table-less
            // expression step.
            if (p != NULL)
            {
                Row r; // dummy row

                if (funcexp::FuncExp::instance()->evaluate(r, p) == false)
                    jobInfo.constantFalse = true;

                querySteps.erase(querySteps.begin() + i);
            }
        }
        else
        {
            i++;
        }
    }

    // double check if the function join canditates are still there.
    JobStepVector steps = querySteps;

    // Drop function-join candidates whose originating step was removed above.
    for (int64_t i = jobInfo.functionJoins.size() - 1; i >= 0; i--)
    {
        bool exist = false;

        for (JobStepVector::iterator j = steps.begin(); j != steps.end() && !exist; ++j)
        {
            if (jobInfo.functionJoins[i] == j->get())
                exist = true;
        }

        if (!exist)
            jobInfo.functionJoins.erase(jobInfo.functionJoins.begin() + i);
    }

    // Concatenate query and project steps
    steps.insert(steps.end(), projectSteps.begin(), projectSteps.end());

    // Make sure each query step has an output DL
    // This is necessary for toString() method on most steps
    for (JobStepVector::iterator it = steps.begin(); it != steps.end(); ++it)
    {
        //if (dynamic_cast<OrDelimiter*>(it->get()))
        //    continue;

        if (it->get()->outputAssociation().outSize() == 0)
        {
            JobStepAssociation jsa;
            AnyDataListSPtr adl(new AnyDataList());
            RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
            dl->OID(it->get()->oid());
            adl->rowGroupDL(dl);
            jsa.outAdd(adl);
            it->get()->outputAssociation(jsa);
        }
    }

    // Populate the TableInfo map with the job steps keyed by table ID.
    JobStepVector joinSteps;
    JobStepVector& expSteps = jobInfo.crossTableExpressions;
    JobStepVector::iterator it = querySteps.begin();
    JobStepVector::iterator end = querySteps.end();

    // Classify every query step: join step, correlated step, expression
    // (single-table vs. cross-table), or plain per-table step.
    while (it != end)
    {
        // Separate table joins from other predicates.
        TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(it->get());
        ExpressionStep* exps = dynamic_cast<ExpressionStep*>(it->get());
        SubAdapterStep* subs = dynamic_cast<SubAdapterStep*>(it->get());

        if (thjs != NULL && thjs->tupleId1() != thjs->tupleId2())
        {
            // simple column and constant column semi join
            if (thjs->tableOid2() == 0 && thjs->schema2().empty())
            {
                jobInfo.correlateSteps.push_back(*it++);
                continue;
            }

            // check correlated join step
            JoinType joinType = thjs->getJoinType();

            if (joinType & CORRELATED)
            {
                // one of the tables is in outer query
                jobInfo.correlateSteps.push_back(*it++);
                continue;
            }

            // Save the join topology.
            uint32_t key1 = thjs->tupleId1();
            uint32_t key2 = thjs->tupleId2();
            uint32_t tid1 = getTableKey(jobInfo, key1);
            uint32_t tid2 = getTableKey(jobInfo, key2);

            // Dictionary columns join on their token keys.
            if (thjs->dictOid1() > 0)
                key1 = jobInfo.keyInfo->dictKeyMap[key1];

            if (thjs->dictOid2() > 0)
                key2 = jobInfo.keyInfo->dictKeyMap[key2];

            // not correlated
            joinSteps.push_back(*it);
            tableInfoMap[tid1].fJoinKeys.push_back(key1);
            tableInfoMap[tid2].fJoinKeys.push_back(key2);

            // save the function join expressions
            boost::shared_ptr<FunctionJoinInfo> fji = thjs->funcJoinInfo();

            if (fji)
            {
                if (fji->fStep[0])
                {
                    tableInfoMap[tid1].fFuncJoinExps.push_back(fji->fStep[0]);
                    vector<uint32_t>& cols = tableInfoMap[tid1].fColsInFuncJoin;
                    cols.insert(cols.end(), fji->fColumnKeys[0].begin(), fji->fColumnKeys[0].end());
                }

                if (fji->fStep[1])
                {
                    tableInfoMap[tid2].fFuncJoinExps.push_back(fji->fStep[1]);
                    vector<uint32_t>& cols = tableInfoMap[tid2].fColsInFuncJoin;
                    cols.insert(cols.end(), fji->fColumnKeys[1].begin(), fji->fColumnKeys[1].end());
                }
            }

            // keep a join map
            // The map holds both (tid1,tid2) and (tid2,tid1) entries so either
            // side can be looked up as the "left" table later.
            pair<uint32_t, uint32_t> tablePair(tid1, tid2);
            TableJoinMap::iterator m1 = jobInfo.tableJoinMap.find(tablePair);
            TableJoinMap::iterator m2 = jobInfo.tableJoinMap.end();

            if (m1 == jobInfo.tableJoinMap.end())
            {
                // First join between this table pair: record adjacency.
                tableInfoMap[tid1].fAdjacentList.push_back(tid2);
                tableInfoMap[tid2].fAdjacentList.push_back(tid1);

                m1 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid1, tid2), JoinData()));
                m2 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid2, tid1), JoinData()));

                TupleInfo ti1(getTupleInfo(key1, jobInfo));
                TupleInfo ti2(getTupleInfo(key2, jobInfo));

                // Wide (>8 byte) or character keys need the typeless join path;
                // long double is the exception among wide types.
                if (ti1.width > 8 || ti2.width > 8)
                {
                    if (ti1.dtype == execplan::CalpontSystemCatalog::LONGDOUBLE
                            || ti2.dtype == execplan::CalpontSystemCatalog::LONGDOUBLE)
                    {
                        m1->second.fTypeless = m2->second.fTypeless = false;
                    }
                    else
                    {
                        m1->second.fTypeless = m2->second.fTypeless = true;
                    }
                }
                else
                {
                    m1->second.fTypeless = m2->second.fTypeless = isCharType(ti1.dtype) ||
                                           isCharType(ti2.dtype);
                }
            }
            else
            {
                // Additional key pair on an existing join: composite keys
                // always use the typeless path.
                m2 = jobInfo.tableJoinMap.find(make_pair(tid2, tid1));
                m1->second.fTypeless = m2->second.fTypeless = true;
            }

            if (m1 == jobInfo.tableJoinMap.end() || m2 == jobInfo.tableJoinMap.end())
                throw runtime_error("Bad table map.");

            // Keep a map of the join (table, key) pairs
            m1->second.fLeftKeys.push_back(key1);
            m1->second.fRightKeys.push_back(key2);

            m2->second.fLeftKeys.push_back(key2);
            m2->second.fRightKeys.push_back(key1);

            // Keep a map of the join type between the keys.
            // OUTER join and SEMI/ANTI join are mutually exclusive.
            if (joinType == LEFTOUTER)
            {
                m1->second.fTypes.push_back(SMALLOUTER);
                m2->second.fTypes.push_back(LARGEOUTER);
                jobInfo.outerOnTable.insert(tid2);
            }
            else if (joinType == RIGHTOUTER)
            {
                m1->second.fTypes.push_back(LARGEOUTER);
                m2->second.fTypes.push_back(SMALLOUTER);
                jobInfo.outerOnTable.insert(tid1);
            }
            else if ((joinType & SEMI) &&
                     ((joinType & LEFTOUTER) == LEFTOUTER || (joinType & RIGHTOUTER) == RIGHTOUTER))
            {
                // @bug3998, DML UPDATE borrows "SEMI" flag,
                // allowing SEMI and LARGEOUTER combination to support update with outer join.
                if ((joinType & LEFTOUTER) == LEFTOUTER)
                {
                    joinType ^= LEFTOUTER;
                    m1->second.fTypes.push_back(joinType);
                    m2->second.fTypes.push_back(joinType | LARGEOUTER);
                    jobInfo.outerOnTable.insert(tid2);
                }
                else
                {
                    joinType ^= RIGHTOUTER;
                    m1->second.fTypes.push_back(joinType | LARGEOUTER);
                    m2->second.fTypes.push_back(joinType);
                    jobInfo.outerOnTable.insert(tid1);
                }
            }
            else
            {
                m1->second.fTypes.push_back(joinType);
                m2->second.fTypes.push_back(joinType);
            }

            // need id to keep the join order
            m1->second.fJoinId = m2->second.fJoinId = thjs->joinId();
        }
        // Separate the expressions
        else if (exps != NULL && subs == NULL)
        {
            const vector<uint32_t>& tables = exps->tableKeys();
            const vector<uint32_t>& columns = exps->columnKeys();
            bool tableInOuterQuery = false;
            set<uint32_t> tableSet; // involved unique tables

            for (uint64_t i = 0; i < tables.size(); ++i)
            {
                if (find(jobInfo.tableList.begin(), jobInfo.tableList.end(), tables[i]) !=
                        jobInfo.tableList.end())
                    tableSet.insert(tables[i]);
                else
                    tableInOuterQuery = true;
            }

            if (tableInOuterQuery)
            {
                // all columns in subquery scope to be projected
                for (uint64_t i = 0; i < tables.size(); ++i)
                {
                    // outer-query columns
                    if (tableSet.find(tables[i]) == tableSet.end())
                        continue;

                    // subquery columns
                    uint32_t c = columns[i];

                    if (jobInfo.returnColSet.find(c) == jobInfo.returnColSet.end())
                    {
                        tableInfoMap[tables[i]].fProjectCols.push_back(c);
                        jobInfo.pjColList.push_back(getTupleInfo(c, jobInfo));
                        jobInfo.returnColSet.insert(c);
                        const SimpleColumn* sc =
                            dynamic_cast<const SimpleColumn*>(exps->columns()[i]);

                        if (sc != NULL)
                            jobInfo.deliveredCols.push_back(SRCP(sc->clone()));
                    }
                }

                jobInfo.correlateSteps.push_back(*it++);
                continue;
            }

            // is the expression cross tables?
            if (tableSet.size() == 1 && exps->associatedJoinId() == 0)
            {
                // single table and not in join on clause
                uint32_t tid = tables[0];

                for (uint64_t i = 0; i < columns.size(); ++i)
                    tableInfoMap[tid].fColsInExp1.push_back(columns[i]);

                tableInfoMap[tid].fOneTableExpSteps.push_back(*it);
            }
            else
            {
                // WORKAROUND for limitation on join with filter
                if (exps->associatedJoinId() != 0)
                {
                    for (uint64_t i = 0; i < exps->columns().size(); ++i)
                    {
                        jobInfo.joinFeTableMap[exps->associatedJoinId()].insert(tables[i]);
                    }
                }

                // resolve after join: cross table or on clause conditions
                for (uint64_t i = 0; i < columns.size(); ++i)
                {
                    uint32_t cid = columns[i];
                    uint32_t tid = getTableKey(jobInfo, cid);
                    tableInfoMap[tid].fColsInExp2.push_back(cid);
                }

                expSteps.push_back(*it);
            }
        }
        // Separate the other steps by unique ID.
        else
        {
            uint32_t tid = -1;
            uint64_t cid = (*it)->tupleId();

            if (cid != (uint64_t) - 1)
                tid = getTableKey(jobInfo, (*it)->tupleId());
            else
                tid = getTableKey(jobInfo, it->get());

            if (find(jobInfo.tableList.begin(), jobInfo.tableList.end(), tid) !=
                    jobInfo.tableList.end())
            {
                tableInfoMap[tid].fQuerySteps.push_back(*it);
            }
            else
            {
                jobInfo.correlateSteps.push_back(*it);
            }
        }

        it++;
    }

    // @bug2634, delay isNull filter on outerjoin key
    // @bug5374, delay predicates for outerjoin
    outjoinPredicateAdjust(tableInfoMap, jobInfo);

    // @bug4021, make sure there is real column to scan
    // Tables whose steps are all pseudo-column steps get a pColScanStep on a
    // real column inserted in front, so there is something to drive the scan.
    for (TableInfoMap::iterator it = tableInfoMap.begin(); it != tableInfoMap.end(); it++)
    {
        uint32_t tableUid = it->first;

        if (jobInfo.pseudoColTable.find(tableUid) == jobInfo.pseudoColTable.end())
            continue;

        JobStepVector& steps = tableInfoMap[tableUid].fQuerySteps;
        JobStepVector::iterator s = steps.begin();
        JobStepVector::iterator p = steps.end();

        for (; s != steps.end(); s++)
        {
            if (typeid(*(s->get())) == typeid(pColScanStep) ||
                    typeid(*(s->get())) == typeid(pColStep))
                break;

            // @bug5893, iterator to the first pseudocolumn
            if (typeid(*(s->get())) == typeid(PseudoColStep) && p == steps.end())
                p = s;
        }

        if (s == steps.end())
        {
            // No real column step found: build one from the table's column map.
            map<uint64_t, SRCP>::iterator t = jobInfo.tableColMap.find(tableUid);

            if (t == jobInfo.tableColMap.end())
            {
                string msg = jobInfo.keyInfo->tupleKeyToName[tableUid];
                msg += " has no column in column map.";
                throw runtime_error(msg);
            }

            SimpleColumn* sc = dynamic_cast<SimpleColumn*>(t->second.get());
            CalpontSystemCatalog::OID oid = sc->oid();
            CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
            CalpontSystemCatalog::ColType ct = sc->colType();
            string alias(extractTableAlias(sc));
            SJSTEP sjs(new pColScanStep(oid, tblOid, ct, jobInfo));
            sjs->alias(alias);
            sjs->view(sc->viewName());
            sjs->schema(sc->schemaName());
            sjs->name(sc->columnName());
            TupleInfo ti(setTupleInfo(ct, oid, jobInfo, tblOid, sc, alias));
            sjs->tupleId(ti.key);
            steps.insert(steps.begin(), sjs);

            if (isDictCol(ct) && jobInfo.tokenOnly.find(ti.key) == jobInfo.tokenOnly.end())
                jobInfo.tokenOnly[ti.key] = true;
        }
        else if (s > p)
        {
            // @bug5893, make sure a pcol is in front of any pseudo step.
            SJSTEP t = *s;
            *s = *p;
            *p = t;
        }
    }

    // @bug3767, error out scalar subquery with aggregation and correlated additional comparison.
    if (jobInfo.hasAggregation && (jobInfo.correlateSteps.size() > 0))
    {
        // expression filter
        ExpressionStep* exp = NULL;

        for (it = jobInfo.correlateSteps.begin(); it != jobInfo.correlateSteps.end(); it++)
        {
            if (((exp = dynamic_cast<ExpressionStep*>(it->get())) != NULL) && (!exp->functionJoin()))
                break;

            exp = NULL;
        }

        // correlated join step
        TupleHashJoinStep* thjs = NULL;

        for (it = jobInfo.correlateSteps.begin(); it != jobInfo.correlateSteps.end(); it++)
        {
            if ((thjs = dynamic_cast<TupleHashJoinStep*>(it->get())) != NULL)
                break;
        }

        // @bug5202, error out not equal correlation and aggregation in subquery.
        if ((exp != NULL) && (thjs != NULL) && (thjs->getJoinType() & CORRELATED))
            throw IDBExcept(
                IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_NEQ_AGG_SUB),
                ERR_NON_SUPPORT_NEQ_AGG_SUB);
    }

    // Bucket the projection steps and their columns per table.
    it = projectSteps.begin();
    end = projectSteps.end();

    while (it != end)
    {
        uint32_t tid = getTableKey(jobInfo, (*it)->tupleId());
        tableInfoMap[tid].fProjectSteps.push_back(*it);
        tableInfoMap[tid].fProjectCols.push_back((*it)->tupleId());
        it++;
    }

    // Ensure every delivered column (except expression pseudo-table columns)
    // appears in its table's project list.
    for (TupleInfoVector::iterator j = jobInfo.pjColList.begin(); j != jobInfo.pjColList.end(); j++)
    {
        if (jobInfo.keyInfo->tupleKeyVec[j->tkey].fId == CNX_EXP_TABLE_ID)
            continue;

        vector<uint32_t>& projectCols = tableInfoMap[j->tkey].fProjectCols;

        if (find(projectCols.begin(), projectCols.end(), j->key) == projectCols.end())
            projectCols.push_back(j->key);
    }

    JobStepVector& retExp = jobInfo.returnedExpressions;

    // Record the columns each returned expression needs, per table.
    for (it = retExp.begin(); it != retExp.end(); ++it)
    {
        ExpressionStep* exp = dynamic_cast<ExpressionStep*>(it->get());

        if (exp == NULL)
            throw runtime_error("Not an expression.");

        for (uint64_t i = 0; i < exp->columnKeys().size(); ++i)
        {
            tableInfoMap[exp->tableKeys()[i]].fColsInRetExp.push_back(exp->columnKeys()[i]);
        }
    }

    // reset all step vector
    querySteps.clear();
    projectSteps.clear();
    deliverySteps.clear();

    // Check if the tables and joins can be used to construct a spanning tree.
    spanningTreeCheck(tableInfoMap, joinSteps, jobInfo);

    // 1. combine job steps for each table
    TableInfoMap::iterator mit;

    for (mit = tableInfoMap.begin(); mit != tableInfoMap.end(); mit++)
        if (combineJobStepsByTable(mit, jobInfo) == false)
            throw runtime_error("combineJobStepsByTable failed.");

    // 2. join the combined steps together to form the spanning tree
    vector<uint32_t> joinOrder;
    joinTables(joinSteps, tableInfoMap, jobInfo, joinOrder, overrideLargeSideEstimate);

    // 3. put the steps together
    for (vector<uint32_t>::iterator i = joinOrder.begin(); i != joinOrder.end(); ++i)
        querySteps.insert(querySteps.end(),
                          tableInfoMap[*i].fQuerySteps.begin(),
                          tableInfoMap[*i].fQuerySteps.end());

    adjustLastStep(querySteps, deliverySteps, jobInfo); // to match the select clause
}
3992
3993
/**
 * @brief Combine several completed sub-query job step chains into one
 *        UNION / UNION ALL delivery step.
 *
 * Each entry in @p queries must be a deliverable step (castable to
 * TupleDeliveryStep).  The function:
 *   1. collects each sub-query's delivered RowGroup and per-column types;
 *   2. computes a unioned column type per output column via
 *      DataConvert::convertUnionColType;
 *   3. wires every sub-query's output datalist into a new TupleUnion step
 *      and gives that step its own output datalist;
 *   4. patches jobInfo.deliveredCols result types to the unioned types
 *      (legacy workaround, see comment below).
 *
 * @param queries          one delivery step per SELECT in the union
 * @param distinctUnionNum number of leading queries subject to DISTINCT
 *                         union semantics; 0 means all unions are UNION ALL
 * @param jobInfo          query-wide state (delivered columns, fifo size,
 *                         trace flag, string table threshold)
 * @return the TupleUnion step wrapped in a shared pointer (SJSTEP)
 * @throws runtime_error if any input step is not a TupleDeliveryStep
 */
SJSTEP unionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo& jobInfo)
{
    vector<RowGroup> inputRGs;
    vector<bool> distinct;
    // Number of columns actually delivered to the client; the per-query
    // RowGroups may carry more columns, only the first colCount are unioned.
    uint64_t colCount = jobInfo.deliveredCols.size();

    // Per-output-column metadata for the union's output RowGroup.
    vector<uint32_t> oids;
    vector<uint32_t> keys;
    vector<uint32_t> scale;
    vector<uint32_t> precision;
    vector<uint32_t> width;
    vector<CalpontSystemCatalog::ColDataType> types;
    vector<uint32_t> csNums;
    // Collects each sub-query's output datalist; becomes the union's input.
    JobStepAssociation jsaToUnion;

    // bug4388, share code with connector for column type conversion
    // queryColTypes[j][i] = type of column j as produced by query i.
    vector<vector<CalpontSystemCatalog::ColType> > queryColTypes;

    for (uint64_t j = 0; j < colCount; ++j)
        queryColTypes.push_back(vector<CalpontSystemCatalog::ColType>(queries.size()));

    for (uint64_t i = 0; i < queries.size(); i++)
    {
        SJSTEP& spjs = queries[i];
        TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(spjs.get());

        if (tds == NULL)
        {
            throw runtime_error("Not a deliverable step.");
        }

        const RowGroup& rg = tds->getDeliveredRowGroup();
        inputRGs.push_back(rg);

        // Transpose this query's column metadata into queryColTypes.
        const vector<uint32_t>& scaleIn = rg.getScale();
        const vector<uint32_t>& precisionIn = rg.getPrecision();
        const vector<CalpontSystemCatalog::ColDataType>& typesIn = rg.getColTypes();
        const vector<uint32_t>& csNumsIn = rg.getCharsetNumbers();

        for (uint64_t j = 0; j < colCount; ++j)
        {
            queryColTypes[j][i].colDataType = typesIn[j];
            queryColTypes[j][i].charsetNumber = csNumsIn[j];
            queryColTypes[j][i].scale = scaleIn[j];
            queryColTypes[j][i].precision = precisionIn[j];
            queryColTypes[j][i].colWidth = rg.getColumnWidth(j);
        }

        // OIDs and keys for the union's output are taken from the first
        // query only (the connector reports columns of the first SELECT).
        if (i == 0)
        {
            const vector<uint32_t>& oidsIn = rg.getOIDs();
            const vector<uint32_t>& keysIn = rg.getKeys();
            oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + colCount);
            keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + colCount);
        }

        // if all union types are UNION_ALL, distinctUnionNum is 0.
        distinct.push_back(distinctUnionNum > i);

        // Give this sub-query a fresh output datalist and register the same
        // datalist as one of the union step's inputs.
        AnyDataListSPtr spdl(new AnyDataList());
        RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
        spdl->rowGroupDL(dl);
        dl->OID(CNX_VTABLE_ID);
        JobStepAssociation jsa;
        jsa.outAdd(spdl);
        spjs->outputAssociation(jsa);
        jsaToUnion.outAdd(spdl);
    }

    // Output datalist for the union step itself.
    AnyDataListSPtr spdl(new AnyDataList());
    RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
    spdl->rowGroupDL(dl);
    dl->OID(CNX_VTABLE_ID);
    JobStepAssociation jsa;
    jsa.outAdd(spdl);
    TupleUnion* unionStep = new TupleUnion(CNX_VTABLE_ID, jobInfo);
    unionStep->inputAssociation(jsaToUnion);
    unionStep->outputAssociation(jsa);

    // get unioned column types
    for (uint64_t j = 0; j < colCount; ++j)
    {
        CalpontSystemCatalog::ColType colType = DataConvert::convertUnionColType(queryColTypes[j]);
        types.push_back(colType.colDataType);
        csNums.push_back(colType.charsetNumber);
        scale.push_back(colType.scale);
        precision.push_back(colType.precision);
        width.push_back(colType.colWidth);
    }

    // Build column offsets for the output RowGroup; offsets start at 2
    // (RowGroup row header), each next offset = previous + column width.
    vector<uint32_t> pos;
    pos.push_back(2);

    for (uint64_t i = 0; i < oids.size(); ++i)
        pos.push_back(pos[i] + width[i]);

    unionStep->setInputRowGroups(inputRGs);
    unionStep->setDistinctFlags(distinct);
    unionStep->setOutputRowGroup(RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold));

    // Fix for bug 4388 adjusts the result type at connector side, this workaround is obsolete.
    // bug 3067, update the returned column types.
    // This is a workaround as the connector always uses the first query's returned columns.
    // ct.colDataType = types[i];
    // ct.scale = scale[i];
    // ct.colWidth = width[i];

    for (size_t i = 0; i < jobInfo.deliveredCols.size(); i++)
    {
        CalpontSystemCatalog::ColType ct = jobInfo.deliveredCols[i]->resultType();
        //XXX remove after connector change
        ct.colDataType = types[i];
        ct.scale = scale[i];
        ct.colWidth = width[i];

        // varchar/varbinary column width has been fudged, see fudgeWidth in jlf_common.cpp.
        if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
            ct.colWidth--;
        else if (ct.colDataType == CalpontSystemCatalog::VARBINARY)
            ct.colWidth -= 2;

        jobInfo.deliveredCols[i]->resultType(ct);
    }

    // Optional diagnostics: dump input and output RowGroup layouts.
    if (jobInfo.trace)
    {
        cout << boldStart << "\ninput RGs: (distinct=" << distinctUnionNum << ")\n" << boldStop;

        for (vector<RowGroup>::iterator i = inputRGs.begin(); i != inputRGs.end(); i++)
            cout << i->toString() << endl << endl;

        cout << boldStart << "output RG:\n" << boldStop
             << unionStep->getDeliveredRowGroup().toString() << endl;
    }

    return SJSTEP(unionStep);
}
4131
4132 }
4133 // vim:ts=4 sw=4:
4134
4135