1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: subquerytransformer.cpp 6406 2010-03-26 19:18:37Z xlou $
20
21
22 #include <iostream>
23 //#define NDEBUG
24 #include <cassert>
25 using namespace std;
26
27 #include <boost/scoped_ptr.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/algorithm/string/case_conv.hpp>
30 using namespace boost;
31
32 #include "aggregatecolumn.h"
33 #include "windowfunctioncolumn.h"
34 using namespace execplan;
35
36 #include "rowgroup.h"
37 using namespace rowgroup;
38
39 #include "errorids.h"
40 #include "exceptclasses.h"
41 using namespace logging;
42
43 #include "jobstep.h"
44 #include "jlf_common.h"
45 #include "jlf_tuplejoblist.h"
46 #include "distributedenginecomm.h"
47 #include "expressionstep.h"
48 #include "tuplehashjoin.h"
49 #include "subquerystep.h"
50 #include "subquerytransformer.h"
51
52
53 namespace joblist
54 {
55
SubQueryTransformer(JobInfo * jobInfo,SErrorInfo & err)56 SubQueryTransformer::SubQueryTransformer(JobInfo* jobInfo, SErrorInfo& err) :
57 fOutJobInfo(jobInfo), fSubJobInfo(NULL), fErrorInfo(err)
58 {
59 }
60
61
SubQueryTransformer(JobInfo * jobInfo,SErrorInfo & err,const string & view)62 SubQueryTransformer::SubQueryTransformer(JobInfo* jobInfo, SErrorInfo& err,
63 const string& view) :
64 fOutJobInfo(jobInfo), fSubJobInfo(NULL), fErrorInfo(err)
65 {
66 fVtable.view(view);
67 }
68
69
SubQueryTransformer(JobInfo * jobInfo,SErrorInfo & err,const string & alias,const string & view)70 SubQueryTransformer::SubQueryTransformer(JobInfo* jobInfo, SErrorInfo& err,
71 const string& alias,
72 const string& view) :
73 fOutJobInfo(jobInfo), fSubJobInfo(NULL), fErrorInfo(err)
74 {
75 fVtable.alias(alias);
76 fVtable.view(view);
77 }
78
79
~SubQueryTransformer()80 SubQueryTransformer::~SubQueryTransformer()
81 {
82 // OK to delete NULL ptr
83 delete fSubJobInfo;
84 fSubJobInfo = NULL;
85 }
86
87
makeSubQueryStep(execplan::CalpontSelectExecutionPlan * csep,bool subInFromClause)88 SJSTEP& SubQueryTransformer::makeSubQueryStep(execplan::CalpontSelectExecutionPlan* csep,
89 bool subInFromClause)
90 {
91 if (fOutJobInfo->trace)
92 cout << (*csep) << endl;
93
94 // Setup job info, job list and error status relation.
95 fSubJobInfo = new JobInfo(fOutJobInfo->rm);
96 fSubJobInfo->sessionId = fOutJobInfo->sessionId;
97 fSubJobInfo->txnId = fOutJobInfo->txnId;
98 fSubJobInfo->verId = fOutJobInfo->verId;
99 fSubJobInfo->statementId = fOutJobInfo->statementId;
100 fSubJobInfo->queryType = fOutJobInfo->queryType;
101 fSubJobInfo->csc = fOutJobInfo->csc;
102 fSubJobInfo->trace = fOutJobInfo->trace;
103 fSubJobInfo->traceFlags = fOutJobInfo->traceFlags;
104 fSubJobInfo->isExeMgr = fOutJobInfo->isExeMgr;
105 fSubJobInfo->subLevel = fOutJobInfo->subLevel + 1;
106 fSubJobInfo->keyInfo = fOutJobInfo->keyInfo;
107 fSubJobInfo->stringScanThreshold = fOutJobInfo->stringScanThreshold;
108 fSubJobInfo->tryTuples = true;
109 fSubJobInfo->errorInfo = fErrorInfo;
110 fOutJobInfo->subNum++;
111 fSubJobInfo->subCount = fOutJobInfo->subCount;
112 fSubJobInfo->subId = ++(*(fSubJobInfo->subCount));
113 fSubJobInfo->pJobInfo = fOutJobInfo;
114 fSubJobList.reset(new TupleJobList(true));
115 fSubJobList->priority(csep->priority());
116 fSubJobInfo->projectingTableOID = fSubJobList->projectingTableOIDPtr();
117 fSubJobInfo->jobListPtr = fSubJobList.get();
118 fSubJobInfo->stringTableThreshold = fOutJobInfo->stringTableThreshold;
119 fSubJobInfo->localQuery = fOutJobInfo->localQuery;
120 fSubJobInfo->uuid = fOutJobInfo->uuid;
121 fOutJobInfo->jobListPtr->addSubqueryJobList(fSubJobList);
122
123 fSubJobInfo->smallSideLimit = fOutJobInfo->smallSideLimit;
124 fSubJobInfo->largeSideLimit = fOutJobInfo->largeSideLimit;
125 fSubJobInfo->smallSideUsage = fOutJobInfo->smallSideUsage;
126 fSubJobInfo->partitionSize = fOutJobInfo->partitionSize;
127 fSubJobInfo->umMemLimit = fOutJobInfo->umMemLimit;
128 fSubJobInfo->isDML = fOutJobInfo->isDML;
129
130 // Update v-table's alias.
131 fVtable.name("$sub");
132
133 if (fVtable.alias().empty())
134 {
135 ostringstream oss;
136 oss << "$sub_"
137 << fSubJobInfo->subId
138 << "_" << fSubJobInfo->subLevel
139 << "_" << fOutJobInfo->subNum;
140 fVtable.alias(oss.str());
141 }
142
143 fSubJobInfo->subAlias = fVtable.alias(); //@bug5844, unique alias for sub
144
145 // Make the jobsteps out of the execution plan.
146 JobStepVector querySteps;
147 JobStepVector projectSteps;
148 DeliveredTableMap deliverySteps;
149
150 if (csep->unionVec().size() == 0)
151 makeJobSteps(csep, *fSubJobInfo, querySteps, projectSteps, deliverySteps);
152 else if (subInFromClause)
153 makeUnionJobSteps(csep, *fSubJobInfo, querySteps, projectSteps, deliverySteps);
154 else
155 throw IDBExcept(ERR_UNION_IN_SUBQUERY);
156
157 if (fSubJobInfo->trace)
158 {
159 ostringstream oss;
160 oss << boldStart
161 << "\nsubquery " << fSubJobInfo->subLevel << "." << fOutJobInfo->subNum << " steps:"
162 << boldStop << endl;
163 ostream_iterator<JobStepVector::value_type> oIter(oss, "\n");
164 copy(querySteps.begin(), querySteps.end(), oIter);
165 cout << oss.str();
166 }
167
168 // Add steps to the joblist.
169 fSubJobList->addQuery(querySteps);
170 fSubJobList->addDelivery(deliverySteps);
171 fSubJobList->putEngineComm(DistributedEngineComm::instance(fOutJobInfo->rm));
172 csep->setDynamicParseTreeVec(fSubJobInfo->dynamicParseTreeVec);
173
174 // Get the correlated steps
175 fCorrelatedSteps = fSubJobInfo->correlateSteps;
176 fSubReturnedCols = fSubJobInfo->deliveredCols;
177
178 // Convert subquery to step.
179 SubQueryStep* sqs =
180 new SubQueryStep(*fSubJobInfo);
181 sqs->tableOid(fVtable.tableOid());
182 sqs->alias(fVtable.alias());
183 sqs->subJoblist(fSubJobList);
184 sqs->setOutputRowGroup(fSubJobList->getOutputRowGroup());
185 AnyDataListSPtr spdl(new AnyDataList());
186 RowGroupDL* dl = new RowGroupDL(1, fSubJobInfo->fifoSize);
187 spdl->rowGroupDL(dl);
188 dl->OID(fVtable.tableOid());
189 JobStepAssociation jsa;
190 jsa.outAdd(spdl);
191 (querySteps.back())->outputAssociation(jsa);
192 sqs->outputAssociation(jsa);
193 fSubQueryStep.reset(sqs);
194
195 // Update the v-table columns and rowgroup
196 vector<uint32_t> pos;
197 vector<uint32_t> oids;
198 vector<uint32_t> keys;
199 vector<uint32_t> scale;
200 vector<uint32_t> precision;
201 vector<CalpontSystemCatalog::ColDataType> types;
202 vector<uint32_t> csNums;
203 pos.push_back(2);
204
205 CalpontSystemCatalog::OID tblOid = fVtable.tableOid();
206 string tableName = fVtable.name();
207 string alias = fVtable.alias();
208 const RowGroup& rg = fSubJobList->getOutputRowGroup();
209 Row row;
210 rg.initRow(&row);
211 uint64_t outputCols = rg.getColumnCount() < fSubReturnedCols.size() ?
212 rg.getColumnCount() : fSubReturnedCols.size() ;
213
214 for (uint64_t i = 0; i < outputCols; i++)
215 {
216 fVtable.addColumn(fSubReturnedCols[i]);
217
218 // make sure the column type is the same as rowgroup
219 CalpontSystemCatalog::ColType ct = fVtable.columnType(i);
220 CalpontSystemCatalog::ColDataType colDataTypeInRg = row.getColTypes()[i];
221
222 if (dynamic_cast<AggregateColumn*>(fSubReturnedCols[i].get()) != NULL ||
223 dynamic_cast<WindowFunctionColumn*>(fSubReturnedCols[i].get()) != NULL)
224 {
225 // skip char/varchar/varbinary column because the colWidth in row is fudged.
226 if (colDataTypeInRg != CalpontSystemCatalog::VARCHAR &&
227 colDataTypeInRg != CalpontSystemCatalog::CHAR &&
228 colDataTypeInRg != CalpontSystemCatalog::VARBINARY &&
229 colDataTypeInRg != CalpontSystemCatalog::TEXT &&
230 colDataTypeInRg != CalpontSystemCatalog::BLOB)
231 {
232 ct.colWidth = row.getColumnWidth(i);
233 ct.colDataType = row.getColTypes()[i];
234 ct.charsetNumber = row.getCharsetNumber(i);
235 ct.scale = row.getScale(i);
236
237 if (colDataTypeInRg != CalpontSystemCatalog::FLOAT &&
238 colDataTypeInRg != CalpontSystemCatalog::UFLOAT &&
239 colDataTypeInRg != CalpontSystemCatalog::DOUBLE &&
240 colDataTypeInRg != CalpontSystemCatalog::UDOUBLE &&
241 colDataTypeInRg != CalpontSystemCatalog::LONGDOUBLE)
242 {
243 if (ct.scale != 0 && ct.precision != -1)
244 ct.colDataType = CalpontSystemCatalog::DECIMAL;
245 }
246
247 ct.precision = row.getPrecision(i);
248 fVtable.columnType(ct, i);
249 }
250 }
251 // MySQL timestamp/time/date/datetime type is different from IDB type
252 else if (colDataTypeInRg == CalpontSystemCatalog::DATE ||
253 colDataTypeInRg == CalpontSystemCatalog::DATETIME ||
254 colDataTypeInRg == CalpontSystemCatalog::TIMESTAMP ||
255 colDataTypeInRg == CalpontSystemCatalog::TIME)
256 {
257 ct.colWidth = row.getColumnWidth(i);
258 ct.colDataType = row.getColTypes()[i];
259 ct.scale = row.getScale(i);
260 ct.precision = row.getPrecision(i);
261 fVtable.columnType(ct, i);
262 }
263
264 // build tuple info to export to outer query
265 TupleInfo ti(setTupleInfo(fVtable.columnType(i), fVtable.columnOid(i), *fOutJobInfo,
266 tblOid, fVtable.columns()[i].get(), alias));
267
268 if (i < rg.getColumnCount())
269 {
270 pos.push_back(pos.back() + ti.width);
271 oids.push_back(ti.oid);
272 keys.push_back(ti.key);
273 types.push_back(ti.dtype);
274 csNums.push_back(ti.csNum);
275 scale.push_back(ti.scale);
276 precision.push_back(ti.precision);
277 }
278
279 fOutJobInfo->vtableColTypes[UniqId(fVtable.columnOid(i), fVtable.alias(), "", "")] =
280 fVtable.columnType(i);
281 }
282
283 RowGroup rg1(oids.size(), pos, oids, keys, types, csNums, scale, precision, csep->stringTableThreshold());
284 rg1.setUseStringTable(rg.usesStringTable());
285
286 dynamic_cast<SubQueryStep*>(fSubQueryStep.get())->setOutputRowGroup(rg1);
287
288 return fSubQueryStep;
289 }
290
291
checkCorrelateInfo(TupleHashJoinStep * thjs,const JobInfo & jobInfo)292 void SubQueryTransformer::checkCorrelateInfo(TupleHashJoinStep* thjs, const JobInfo& jobInfo)
293 {
294 int pos = (thjs->correlatedSide() == 1) ? thjs->sequence2() : thjs->sequence1();
295
296 if (pos == -1 || (size_t) pos >= fVtable.columns().size())
297 {
298 uint64_t id = (thjs->correlatedSide() == 1) ? thjs->tupleId2() : thjs->tupleId1();
299 string alias = jobInfo.keyInfo->tupleKeyVec[id].fTable;
300 string name = jobInfo.keyInfo->keyName[id];
301
302 if (!name.empty() && alias.length() > 0)
303 name = alias + "." + name;
304
305 Message::Args args;
306 args.add(name);
307 string errMsg(IDBErrorInfo::instance()->errorMsg(ERR_CORRELATE_COL_MISSING, args));
308 cerr << errMsg << ": " << pos << endl;
309 throw IDBExcept(errMsg, ERR_CORRELATE_COL_MISSING);
310 }
311 }
312
313
updateCorrelateInfo()314 void SubQueryTransformer::updateCorrelateInfo()
315 {
316 // put vtable into the table list to resolve correlated filters
317 // Temp fix for @bug3932 until outer join has no dependency on table order.
318 // Insert at [1], not to mess with OUTER join and hint(INFINIDB_ORDERED -- bug2317).
319 fOutJobInfo->tableList.insert(fOutJobInfo->tableList.begin() + 1, makeTableKey(
320 *fOutJobInfo, fVtable.tableOid(), fVtable.name(), fVtable.alias(), "", fVtable.view()));
321
322 // tables in outer level
323 set<uint32_t> outTables;
324
325 for (uint64_t i = 0; i < fOutJobInfo->tableList.size(); i++)
326 outTables.insert(fOutJobInfo->tableList[i]);
327
328 // tables in subquery level
329 set<uint32_t> subTables;
330
331 for (uint64_t i = 0; i < fSubJobInfo->tableList.size(); i++)
332 subTables.insert(fSubJobInfo->tableList[i]);
333
334 // any function joins
335 fOutJobInfo->functionJoins.insert(fOutJobInfo->functionJoins.end(),
336 fSubJobInfo->functionJoins.begin(), fSubJobInfo->functionJoins.end());
337
338 // Update correlated steps
339 const map<UniqId, uint32_t>& subMap = fVtable.columnMap();
340
341 for (JobStepVector::iterator i = fCorrelatedSteps.begin(); i != fCorrelatedSteps.end(); i++)
342 {
343 TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(i->get());
344 ExpressionStep* es = dynamic_cast<ExpressionStep*>(i->get());
345
346 if (thjs)
347 {
348 if (thjs->getJoinType() & CORRELATED)
349 {
350 checkCorrelateInfo(thjs, *fSubJobInfo);
351 thjs->setJoinType(thjs->getJoinType() ^ CORRELATED);
352
353 if (thjs->correlatedSide() == 1)
354 {
355 int pos = thjs->sequence2();
356 thjs->tableOid2(fVtable.tableOid());
357 thjs->oid2(fVtable.columnOid(pos));
358 thjs->alias2(fVtable.alias());
359 thjs->view2(fVtable.columns()[pos]->viewName());
360 thjs->schema2(fVtable.columns()[pos]->schemaName());
361 thjs->dictOid2(0);
362 thjs->sequence2(-1);
363 thjs->joinId(fOutJobInfo->joinNum++);
364
365 CalpontSystemCatalog::ColType ct2 = fVtable.columnType(pos);
366 TupleInfo ti(setTupleInfo(ct2, thjs->oid2(), *fOutJobInfo, thjs->tableOid2(),
367 fVtable.columns()[pos].get(), thjs->alias2()));
368 thjs->tupleId2(ti.key);
369 }
370 else
371 {
372 int pos = thjs->sequence1();
373 thjs->tableOid1(fVtable.tableOid());
374 thjs->oid1(fVtable.columnOid(pos));
375 thjs->alias1(fVtable.alias());
376 thjs->view1(fVtable.columns()[pos]->viewName());
377 thjs->schema1(fVtable.columns()[pos]->schemaName());
378 thjs->dictOid1(0);
379 thjs->sequence1(-1);
380 thjs->joinId(fOutJobInfo->joinNum++);
381
382 CalpontSystemCatalog::ColType ct1 = fVtable.columnType(pos);
383 TupleInfo ti(setTupleInfo(ct1, thjs->oid1(), *fOutJobInfo, thjs->tableOid1(),
384 fVtable.columns()[pos].get(), thjs->alias1()));
385 thjs->tupleId1(ti.key);
386 }
387 }
388
389 /*
390 else // oid1 == 0, dictionary column
391 {
392 CalpontSystemCatalog::ColType dct;
393 dct.colDataType = CalpontSystemCatalog::BIGINT;
394 dct.colWidth = 8;
395 dct.scale = 0;
396 dct.precision = 0;
397 }
398 */
399 }
400 else if (es)
401 {
402 // already handled by function join
403 if (es->functionJoin())
404 continue;
405
406 vector<ReturnedColumn*>& scList = es->columns();
407 vector<CalpontSystemCatalog::OID>& tableOids = es->tableOids();
408 vector<string>& aliases = es->aliases();
409 vector<string>& views = es->views();
410 vector<string>& schemas = es->schemas();
411 vector<uint32_t>& tableKeys = es->tableKeys();
412 vector<uint32_t>& columnKeys = es->columnKeys();
413
414 // update simple columns
415 for (uint64_t j = 0; j < scList.size(); j++)
416 {
417 SimpleColumn* sc = dynamic_cast<SimpleColumn*>(scList[j]);
418
419 if (sc != NULL)
420 {
421 if (subTables.find(tableKeys[j]) != subTables.end())
422 {
423 const map<UniqId, uint32_t>::const_iterator k =
424 subMap.find(UniqId(sc->oid(), aliases[j], schemas[j], views[j]));
425
426 if (k == subMap.end())
427 throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
428
429 sc->schemaName("");
430 sc->tableName(fVtable.name());
431 sc->tableAlias(fVtable.alias());
432 sc->viewName(fVtable.view());
433 sc->oid(fVtable.columnOid(k->second));
434 sc->columnName(fVtable.columns()[k->second]->columnName());
435 const CalpontSystemCatalog::ColType& ct = fVtable.columnType(k->second);
436 TupleInfo ti = setTupleInfo(
437 ct, sc->oid(), *fOutJobInfo, fVtable.tableOid(), sc, fVtable.alias());
438
439 tableOids[j] = execplan::CNX_VTABLE_ID;
440 aliases[j] = sc->tableAlias();
441 views[j] = sc->viewName();
442 schemas[j] = sc->schemaName();
443 columnKeys[j] = ti.key;
444 tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
445 }
446 else
447 {
448 const CalpontSystemCatalog::ColType&
449 ct = fOutJobInfo->keyInfo->colType[columnKeys[j]];
450 sc->joinInfo(0);
451 TupleInfo ti = setTupleInfo(
452 ct, sc->oid(), *fOutJobInfo, tableOids[j], sc, aliases[j]);
453
454 columnKeys[j] = ti.key;
455 tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
456 }
457 }
458 else if (dynamic_cast<WindowFunctionColumn*>(scList[j]) != NULL)
459 {
460 // workaround for window function IN/EXISTS subquery
461 const map<UniqId, uint32_t>::const_iterator k =
462 subMap.find(UniqId(scList[j]->expressionId(), "", "", ""));
463
464 if (k == subMap.end())
465 throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
466
467 sc = fVtable.columns()[k->second].get();
468 es->substitute(j, fVtable.columns()[k->second]);
469 CalpontSystemCatalog::ColType ct = sc->colType();
470 string alias(extractTableAlias(sc));
471 CalpontSystemCatalog::OID tblOid = fVtable.tableOid();
472 TupleInfo ti(setTupleInfo(ct, sc->oid(), *fOutJobInfo, tblOid, sc, alias));
473
474 tableOids[j] = execplan::CNX_VTABLE_ID;
475 columnKeys[j] = ti.key;
476 tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
477 }
478 }
479
480 es->updateColumnOidAlias(*fOutJobInfo);
481
482 // update function join info
483 boost::shared_ptr<FunctionJoinInfo>& fji = es->functionJoinInfo();
484
485 if (fji && fji->fCorrelatedSide)
486 {
487 // replace sub side with a virtual column
488 int64_t subSide = (fji->fCorrelatedSide == 1) ? 1 : 0;
489 ReturnedColumn* rc = fji->fExpression[subSide];
490 SimpleColumn* sc = dynamic_cast<SimpleColumn*>(rc);
491
492 if (sc == NULL)
493 {
494 UniqId colId = UniqId(rc->expressionId(), "", "", "");
495 const map<UniqId, uint32_t>::const_iterator k = subMap.find(colId);
496
497 if (k == subMap.end())
498 throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
499
500 SSC vc = fVtable.columns()[k->second];
501 sc = vc.get();
502 }
503
504 // virtual table info in outer query
505 TupleInfo ti(setTupleInfo(sc->colType(), sc->oid(), *fOutJobInfo,
506 fVtable.tableOid(), sc, fVtable.alias()));
507
508 set<uint32_t> cids;
509 cids.insert(ti.key);
510 fji->fJoinKey[subSide] = ti.key;
511 fji->fTableKey[subSide] = ti.tkey;
512 fji->fColumnKeys[subSide] = cids;
513 fji->fTableOid[subSide] = fVtable.tableOid();
514 fji->fAlias[subSide] = fVtable.alias();
515 fji->fView[subSide] = fVtable.view();
516 fji->fSchema[subSide] = "";
517
518 // turn off correlated flag
519 fji->fExpression[fji->fCorrelatedSide - 1]->joinInfo(0);
520 fji->fJoinType ^= CORRELATED;
521 fji->fJoinId = 0;
522 }
523 }
524 else
525 {
526 JobStep* j = i->get();
527 uint32_t tid = -1;
528 uint64_t cid = j->tupleId();
529
530 if (cid != (uint64_t) - 1)
531 tid = getTableKey(*fOutJobInfo, cid);
532 else
533 tid = getTableKey(*fOutJobInfo, j);
534
535 if (outTables.find(tid) == outTables.end())
536 {
537 if (subMap.find(
538 UniqId(j->oid(), j->alias(), j->schema(), j->view(), 0)) != subMap.end())
539 //throw CorrelateFailExcept();
540 throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
541 }
542 }
543 }
544 }
545
546
run()547 void SubQueryTransformer::run()
548 {
549 // not to be called for base class
550 }
551
552
553 // ------ SimpleScalarTransformer ------
SimpleScalarTransformer(JobInfo * jobInfo,SErrorInfo & status,bool e)554 SimpleScalarTransformer::SimpleScalarTransformer(JobInfo* jobInfo, SErrorInfo& status, bool e) :
555 SubQueryTransformer(jobInfo, status),
556 fInputDl(NULL),
557 fDlIterator(-1),
558 fEmptyResultSet(true),
559 fExistFilter(e)
560 {
561 }
562
563
SimpleScalarTransformer(const SubQueryTransformer & rhs)564 SimpleScalarTransformer::SimpleScalarTransformer(const SubQueryTransformer& rhs) :
565 SubQueryTransformer(rhs.outJobInfo(), rhs.errorInfo()),
566 fInputDl(NULL),
567 fDlIterator(-1),
568 fEmptyResultSet(true),
569 fExistFilter(false)
570 {
571 fSubJobList = rhs.subJobList();
572 fSubQueryStep = rhs.subQueryStep();
573 }
574
575
~SimpleScalarTransformer()576 SimpleScalarTransformer::~SimpleScalarTransformer()
577 {
578 }
579
580
run()581 void SimpleScalarTransformer::run()
582 {
583 // set up receiver
584 fRowGroup = dynamic_cast<SubQueryStep*>(fSubQueryStep.get())->getOutputRowGroup();
585 fRowGroup.initRow(&fRow, true);
586 fInputDl = fSubQueryStep->outputAssociation().outAt(0)->rowGroupDL();
587 fDlIterator = fInputDl->getIterator();
588
589 // run the subquery
590 fSubJobList->doQuery();
591
592 // retrieve the scalar result
593 getScalarResult();
594
595 // check result count
596 if (fErrorInfo->errCode == ERR_MORE_THAN_1_ROW)
597 throw MoreThan1RowExcept();
598 }
599
600
getScalarResult()601 void SimpleScalarTransformer::getScalarResult()
602 {
603 RGData rgData;
604 bool more;
605
606 more = fInputDl->next(fDlIterator, &rgData);
607
608 while (more)
609 {
610 fRowGroup.setData(&rgData);
611
612 // Only need one row for scalar filter
613 if (fEmptyResultSet && fRowGroup.getRowCount() == 1)
614 {
615 fEmptyResultSet = false;
616 Row row;
617 fRowGroup.initRow(&row);
618 fRowGroup.getRow(0, &row);
619 fRowData.reset(new uint8_t[fRow.getSize()]);
620 fRow.setData(fRowData.get());
621 copyRow(row, &fRow);
622
623 // For exist filter, stop the query after one or more rows retrieved.
624 if (fExistFilter)
625 {
626 fErrorInfo->errMsg = IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW);
627 fErrorInfo->errCode = ERR_MORE_THAN_1_ROW;
628 }
629 }
630
631 // more than 1 row case:
632 // not empty set, and get new rows
633 // empty set, but get more than 1 rows
634 else if (fRowGroup.getRowCount() > 0)
635 {
636 // Stop the query after some rows retrieved.
637 fEmptyResultSet = false;
638 fErrorInfo->errMsg = IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW);
639 fErrorInfo->errCode = ERR_MORE_THAN_1_ROW;
640 }
641
642 // For scalar filter, have to check all blocks to ensure only one row.
643 if (fErrorInfo->errCode != 0)
644 while (more) more = fInputDl->next(fDlIterator, &rgData);
645 else
646 more = fInputDl->next(fDlIterator, &rgData);
647 }
648 }
649
650
651 }
652 // vim:ts=4 sw=4:
653
654