1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: subquerytransformer.cpp 6406 2010-03-26 19:18:37Z xlou $
20 
21 
22 #include <iostream>
23 //#define NDEBUG
24 #include <cassert>
25 using namespace std;
26 
27 #include <boost/scoped_ptr.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/algorithm/string/case_conv.hpp>
30 using namespace boost;
31 
32 #include "aggregatecolumn.h"
33 #include "windowfunctioncolumn.h"
34 using namespace execplan;
35 
36 #include "rowgroup.h"
37 using namespace rowgroup;
38 
39 #include "errorids.h"
40 #include "exceptclasses.h"
41 using namespace logging;
42 
43 #include "jobstep.h"
44 #include "jlf_common.h"
45 #include "jlf_tuplejoblist.h"
46 #include "distributedenginecomm.h"
47 #include "expressionstep.h"
48 #include "tuplehashjoin.h"
49 #include "subquerystep.h"
50 #include "subquerytransformer.h"
51 
52 
53 namespace joblist
54 {
55 
SubQueryTransformer(JobInfo * jobInfo,SErrorInfo & err)56 SubQueryTransformer::SubQueryTransformer(JobInfo* jobInfo, SErrorInfo& err) :
57     fOutJobInfo(jobInfo), fSubJobInfo(NULL), fErrorInfo(err)
58 {
59 }
60 
61 
SubQueryTransformer(JobInfo * jobInfo,SErrorInfo & err,const string & view)62 SubQueryTransformer::SubQueryTransformer(JobInfo* jobInfo, SErrorInfo& err,
63         const string& view) :
64     fOutJobInfo(jobInfo), fSubJobInfo(NULL), fErrorInfo(err)
65 {
66     fVtable.view(view);
67 }
68 
69 
SubQueryTransformer(JobInfo * jobInfo,SErrorInfo & err,const string & alias,const string & view)70 SubQueryTransformer::SubQueryTransformer(JobInfo* jobInfo, SErrorInfo& err,
71         const string& alias,
72         const string& view) :
73     fOutJobInfo(jobInfo), fSubJobInfo(NULL), fErrorInfo(err)
74 {
75     fVtable.alias(alias);
76     fVtable.view(view);
77 }
78 
79 
~SubQueryTransformer()80 SubQueryTransformer::~SubQueryTransformer()
81 {
82     // OK to delete NULL ptr
83     delete fSubJobInfo;
84     fSubJobInfo = NULL;
85 }
86 
87 
makeSubQueryStep(execplan::CalpontSelectExecutionPlan * csep,bool subInFromClause)88 SJSTEP& SubQueryTransformer::makeSubQueryStep(execplan::CalpontSelectExecutionPlan* csep,
89         bool subInFromClause)
90 {
91     if (fOutJobInfo->trace)
92         cout << (*csep) << endl;
93 
94     // Setup job info, job list and error status relation.
95     fSubJobInfo = new JobInfo(fOutJobInfo->rm);
96     fSubJobInfo->sessionId = fOutJobInfo->sessionId;
97     fSubJobInfo->txnId = fOutJobInfo->txnId;
98     fSubJobInfo->verId = fOutJobInfo->verId;
99     fSubJobInfo->statementId = fOutJobInfo->statementId;
100     fSubJobInfo->queryType = fOutJobInfo->queryType;
101     fSubJobInfo->csc = fOutJobInfo->csc;
102     fSubJobInfo->trace = fOutJobInfo->trace;
103     fSubJobInfo->traceFlags = fOutJobInfo->traceFlags;
104     fSubJobInfo->isExeMgr = fOutJobInfo->isExeMgr;
105     fSubJobInfo->subLevel = fOutJobInfo->subLevel + 1;
106     fSubJobInfo->keyInfo = fOutJobInfo->keyInfo;
107     fSubJobInfo->stringScanThreshold = fOutJobInfo->stringScanThreshold;
108     fSubJobInfo->tryTuples = true;
109     fSubJobInfo->errorInfo = fErrorInfo;
110     fOutJobInfo->subNum++;
111     fSubJobInfo->subCount = fOutJobInfo->subCount;
112     fSubJobInfo->subId = ++(*(fSubJobInfo->subCount));
113     fSubJobInfo->pJobInfo = fOutJobInfo;
114     fSubJobList.reset(new TupleJobList(true));
115     fSubJobList->priority(csep->priority());
116     fSubJobInfo->projectingTableOID = fSubJobList->projectingTableOIDPtr();
117     fSubJobInfo->jobListPtr = fSubJobList.get();
118     fSubJobInfo->stringTableThreshold = fOutJobInfo->stringTableThreshold;
119     fSubJobInfo->localQuery = fOutJobInfo->localQuery;
120     fSubJobInfo->uuid = fOutJobInfo->uuid;
121     fOutJobInfo->jobListPtr->addSubqueryJobList(fSubJobList);
122 
123     fSubJobInfo->smallSideLimit = fOutJobInfo->smallSideLimit;
124     fSubJobInfo->largeSideLimit = fOutJobInfo->largeSideLimit;
125     fSubJobInfo->smallSideUsage = fOutJobInfo->smallSideUsage;
126     fSubJobInfo->partitionSize = fOutJobInfo->partitionSize;
127     fSubJobInfo->umMemLimit = fOutJobInfo->umMemLimit;
128     fSubJobInfo->isDML = fOutJobInfo->isDML;
129 
130     // Update v-table's alias.
131     fVtable.name("$sub");
132 
133     if (fVtable.alias().empty())
134     {
135         ostringstream oss;
136         oss << "$sub_"
137             << fSubJobInfo->subId
138             << "_" << fSubJobInfo->subLevel
139             << "_" << fOutJobInfo->subNum;
140         fVtable.alias(oss.str());
141     }
142 
143     fSubJobInfo->subAlias = fVtable.alias(); //@bug5844, unique alias for sub
144 
145     // Make the jobsteps out of the execution plan.
146     JobStepVector querySteps;
147     JobStepVector projectSteps;
148     DeliveredTableMap deliverySteps;
149 
150     if (csep->unionVec().size() == 0)
151         makeJobSteps(csep, *fSubJobInfo, querySteps, projectSteps, deliverySteps);
152     else if (subInFromClause)
153         makeUnionJobSteps(csep, *fSubJobInfo, querySteps, projectSteps, deliverySteps);
154     else
155         throw IDBExcept(ERR_UNION_IN_SUBQUERY);
156 
157     if (fSubJobInfo->trace)
158     {
159         ostringstream oss;
160         oss << boldStart
161             << "\nsubquery " << fSubJobInfo->subLevel << "." << fOutJobInfo->subNum << " steps:"
162             << boldStop << endl;
163         ostream_iterator<JobStepVector::value_type> oIter(oss, "\n");
164         copy(querySteps.begin(), querySteps.end(), oIter);
165         cout << oss.str();
166     }
167 
168     // Add steps to the joblist.
169     fSubJobList->addQuery(querySteps);
170     fSubJobList->addDelivery(deliverySteps);
171     fSubJobList->putEngineComm(DistributedEngineComm::instance(fOutJobInfo->rm));
172     csep->setDynamicParseTreeVec(fSubJobInfo->dynamicParseTreeVec);
173 
174     // Get the correlated steps
175     fCorrelatedSteps = fSubJobInfo->correlateSteps;
176     fSubReturnedCols = fSubJobInfo->deliveredCols;
177 
178     // Convert subquery to step.
179     SubQueryStep* sqs =
180         new SubQueryStep(*fSubJobInfo);
181     sqs->tableOid(fVtable.tableOid());
182     sqs->alias(fVtable.alias());
183     sqs->subJoblist(fSubJobList);
184     sqs->setOutputRowGroup(fSubJobList->getOutputRowGroup());
185     AnyDataListSPtr spdl(new AnyDataList());
186     RowGroupDL* dl = new RowGroupDL(1, fSubJobInfo->fifoSize);
187     spdl->rowGroupDL(dl);
188     dl->OID(fVtable.tableOid());
189     JobStepAssociation jsa;
190     jsa.outAdd(spdl);
191     (querySteps.back())->outputAssociation(jsa);
192     sqs->outputAssociation(jsa);
193     fSubQueryStep.reset(sqs);
194 
195     // Update the v-table columns and rowgroup
196     vector<uint32_t> pos;
197     vector<uint32_t> oids;
198     vector<uint32_t> keys;
199     vector<uint32_t> scale;
200     vector<uint32_t> precision;
201     vector<CalpontSystemCatalog::ColDataType> types;
202     vector<uint32_t> csNums;
203     pos.push_back(2);
204 
205     CalpontSystemCatalog::OID tblOid = fVtable.tableOid();
206     string tableName = fVtable.name();
207     string alias = fVtable.alias();
208     const RowGroup& rg = fSubJobList->getOutputRowGroup();
209     Row row;
210     rg.initRow(&row);
211     uint64_t outputCols = rg.getColumnCount() < fSubReturnedCols.size() ?
212                           rg.getColumnCount() : fSubReturnedCols.size() ;
213 
214     for (uint64_t i = 0; i < outputCols; i++)
215     {
216         fVtable.addColumn(fSubReturnedCols[i]);
217 
218         // make sure the column type is the same as rowgroup
219         CalpontSystemCatalog::ColType ct = fVtable.columnType(i);
220         CalpontSystemCatalog::ColDataType colDataTypeInRg = row.getColTypes()[i];
221 
222         if (dynamic_cast<AggregateColumn*>(fSubReturnedCols[i].get()) != NULL ||
223                 dynamic_cast<WindowFunctionColumn*>(fSubReturnedCols[i].get()) != NULL)
224         {
225             // skip char/varchar/varbinary column because the colWidth in row is fudged.
226             if (colDataTypeInRg != CalpontSystemCatalog::VARCHAR &&
227                     colDataTypeInRg != CalpontSystemCatalog::CHAR &&
228                     colDataTypeInRg != CalpontSystemCatalog::VARBINARY &&
229                     colDataTypeInRg != CalpontSystemCatalog::TEXT &&
230                     colDataTypeInRg != CalpontSystemCatalog::BLOB)
231             {
232                 ct.colWidth = row.getColumnWidth(i);
233                 ct.colDataType = row.getColTypes()[i];
234                 ct.charsetNumber = row.getCharsetNumber(i);
235                 ct.scale = row.getScale(i);
236 
237                 if (colDataTypeInRg != CalpontSystemCatalog::FLOAT &&
238                         colDataTypeInRg != CalpontSystemCatalog::UFLOAT &&
239                         colDataTypeInRg != CalpontSystemCatalog::DOUBLE &&
240                         colDataTypeInRg != CalpontSystemCatalog::UDOUBLE &&
241                         colDataTypeInRg != CalpontSystemCatalog::LONGDOUBLE)
242                 {
243                     if (ct.scale != 0 && ct.precision != -1)
244                         ct.colDataType = CalpontSystemCatalog::DECIMAL;
245                 }
246 
247                 ct.precision = row.getPrecision(i);
248                 fVtable.columnType(ct, i);
249             }
250         }
251         // MySQL timestamp/time/date/datetime type is different from IDB type
252         else if (colDataTypeInRg == CalpontSystemCatalog::DATE ||
253                  colDataTypeInRg == CalpontSystemCatalog::DATETIME ||
254                  colDataTypeInRg == CalpontSystemCatalog::TIMESTAMP ||
255                  colDataTypeInRg == CalpontSystemCatalog::TIME)
256         {
257             ct.colWidth = row.getColumnWidth(i);
258             ct.colDataType = row.getColTypes()[i];
259             ct.scale = row.getScale(i);
260             ct.precision = row.getPrecision(i);
261             fVtable.columnType(ct, i);
262         }
263 
264         // build tuple info to export to outer query
265         TupleInfo ti(setTupleInfo(fVtable.columnType(i), fVtable.columnOid(i), *fOutJobInfo,
266                                   tblOid, fVtable.columns()[i].get(), alias));
267 
268         if (i < rg.getColumnCount())
269         {
270             pos.push_back(pos.back() + ti.width);
271             oids.push_back(ti.oid);
272             keys.push_back(ti.key);
273             types.push_back(ti.dtype);
274             csNums.push_back(ti.csNum);
275             scale.push_back(ti.scale);
276             precision.push_back(ti.precision);
277         }
278 
279         fOutJobInfo->vtableColTypes[UniqId(fVtable.columnOid(i), fVtable.alias(), "", "")] =
280             fVtable.columnType(i);
281     }
282 
283     RowGroup rg1(oids.size(), pos, oids, keys, types, csNums, scale, precision, csep->stringTableThreshold());
284     rg1.setUseStringTable(rg.usesStringTable());
285 
286     dynamic_cast<SubQueryStep*>(fSubQueryStep.get())->setOutputRowGroup(rg1);
287 
288     return fSubQueryStep;
289 }
290 
291 
checkCorrelateInfo(TupleHashJoinStep * thjs,const JobInfo & jobInfo)292 void SubQueryTransformer::checkCorrelateInfo(TupleHashJoinStep* thjs, const JobInfo& jobInfo)
293 {
294     int pos = (thjs->correlatedSide() == 1) ? thjs->sequence2() : thjs->sequence1();
295 
296     if (pos == -1 || (size_t) pos >= fVtable.columns().size())
297     {
298         uint64_t id = (thjs->correlatedSide() == 1) ? thjs->tupleId2() : thjs->tupleId1();
299         string alias = jobInfo.keyInfo->tupleKeyVec[id].fTable;
300         string name = jobInfo.keyInfo->keyName[id];
301 
302         if (!name.empty() && alias.length() > 0)
303             name = alias + "." + name;
304 
305         Message::Args args;
306         args.add(name);
307         string errMsg(IDBErrorInfo::instance()->errorMsg(ERR_CORRELATE_COL_MISSING, args));
308         cerr << errMsg << ": " << pos << endl;
309         throw IDBExcept(errMsg, ERR_CORRELATE_COL_MISSING);
310     }
311 }
312 
313 
updateCorrelateInfo()314 void SubQueryTransformer::updateCorrelateInfo()
315 {
316     // put vtable into the table list to resolve correlated filters
317     // Temp fix for @bug3932 until outer join has no dependency on table order.
318     // Insert at [1], not to mess with OUTER join and hint(INFINIDB_ORDERED -- bug2317).
319     fOutJobInfo->tableList.insert(fOutJobInfo->tableList.begin() + 1, makeTableKey(
320                                       *fOutJobInfo, fVtable.tableOid(), fVtable.name(), fVtable.alias(), "", fVtable.view()));
321 
322     // tables in outer level
323     set<uint32_t> outTables;
324 
325     for (uint64_t i = 0; i < fOutJobInfo->tableList.size(); i++)
326         outTables.insert(fOutJobInfo->tableList[i]);
327 
328     // tables in subquery level
329     set<uint32_t> subTables;
330 
331     for (uint64_t i = 0; i < fSubJobInfo->tableList.size(); i++)
332         subTables.insert(fSubJobInfo->tableList[i]);
333 
334     // any function joins
335     fOutJobInfo->functionJoins.insert(fOutJobInfo->functionJoins.end(),
336                                       fSubJobInfo->functionJoins.begin(), fSubJobInfo->functionJoins.end());
337 
338     // Update correlated steps
339     const map<UniqId, uint32_t>& subMap = fVtable.columnMap();
340 
341     for (JobStepVector::iterator i = fCorrelatedSteps.begin(); i != fCorrelatedSteps.end(); i++)
342     {
343         TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(i->get());
344         ExpressionStep* es = dynamic_cast<ExpressionStep*>(i->get());
345 
346         if (thjs)
347         {
348             if (thjs->getJoinType() & CORRELATED)
349             {
350                 checkCorrelateInfo(thjs, *fSubJobInfo);
351                 thjs->setJoinType(thjs->getJoinType() ^ CORRELATED);
352 
353                 if (thjs->correlatedSide() == 1)
354                 {
355                     int pos = thjs->sequence2();
356                     thjs->tableOid2(fVtable.tableOid());
357                     thjs->oid2(fVtable.columnOid(pos));
358                     thjs->alias2(fVtable.alias());
359                     thjs->view2(fVtable.columns()[pos]->viewName());
360                     thjs->schema2(fVtable.columns()[pos]->schemaName());
361                     thjs->dictOid2(0);
362                     thjs->sequence2(-1);
363                     thjs->joinId(fOutJobInfo->joinNum++);
364 
365                     CalpontSystemCatalog::ColType ct2 = fVtable.columnType(pos);
366                     TupleInfo ti(setTupleInfo(ct2, thjs->oid2(), *fOutJobInfo, thjs->tableOid2(),
367                                               fVtable.columns()[pos].get(), thjs->alias2()));
368                     thjs->tupleId2(ti.key);
369                 }
370                 else
371                 {
372                     int pos = thjs->sequence1();
373                     thjs->tableOid1(fVtable.tableOid());
374                     thjs->oid1(fVtable.columnOid(pos));
375                     thjs->alias1(fVtable.alias());
376                     thjs->view1(fVtable.columns()[pos]->viewName());
377                     thjs->schema1(fVtable.columns()[pos]->schemaName());
378                     thjs->dictOid1(0);
379                     thjs->sequence1(-1);
380                     thjs->joinId(fOutJobInfo->joinNum++);
381 
382                     CalpontSystemCatalog::ColType ct1 = fVtable.columnType(pos);
383                     TupleInfo ti(setTupleInfo(ct1, thjs->oid1(), *fOutJobInfo, thjs->tableOid1(),
384                                               fVtable.columns()[pos].get(), thjs->alias1()));
385                     thjs->tupleId1(ti.key);
386                 }
387             }
388 
389             /*
390             			else // oid1 == 0, dictionary column
391             			{
392             				CalpontSystemCatalog::ColType dct;
393             				dct.colDataType = CalpontSystemCatalog::BIGINT;
394             				dct.colWidth = 8;
395             				dct.scale = 0;
396             				dct.precision = 0;
397             			}
398             */
399         }
400         else if (es)
401         {
402             // already handled by function join
403             if (es->functionJoin())
404                 continue;
405 
406             vector<ReturnedColumn*>& scList = es->columns();
407             vector<CalpontSystemCatalog::OID>& tableOids = es->tableOids();
408             vector<string>& aliases = es->aliases();
409             vector<string>& views = es->views();
410             vector<string>& schemas = es->schemas();
411             vector<uint32_t>& tableKeys = es->tableKeys();
412             vector<uint32_t>& columnKeys = es->columnKeys();
413 
414             // update simple columns
415             for (uint64_t j = 0; j < scList.size(); j++)
416             {
417                 SimpleColumn* sc = dynamic_cast<SimpleColumn*>(scList[j]);
418 
419                 if (sc != NULL)
420                 {
421                     if (subTables.find(tableKeys[j]) != subTables.end())
422                     {
423                         const map<UniqId, uint32_t>::const_iterator k =
424                             subMap.find(UniqId(sc->oid(), aliases[j], schemas[j], views[j]));
425 
426                         if (k == subMap.end())
427                             throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
428 
429                         sc->schemaName("");
430                         sc->tableName(fVtable.name());
431                         sc->tableAlias(fVtable.alias());
432                         sc->viewName(fVtable.view());
433                         sc->oid(fVtable.columnOid(k->second));
434                         sc->columnName(fVtable.columns()[k->second]->columnName());
435                         const CalpontSystemCatalog::ColType& ct = fVtable.columnType(k->second);
436                         TupleInfo ti = setTupleInfo(
437                                            ct, sc->oid(), *fOutJobInfo, fVtable.tableOid(), sc, fVtable.alias());
438 
439                         tableOids[j] = execplan::CNX_VTABLE_ID;
440                         aliases[j] = sc->tableAlias();
441                         views[j] = sc->viewName();
442                         schemas[j] = sc->schemaName();
443                         columnKeys[j] = ti.key;
444                         tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
445                     }
446                     else
447                     {
448                         const CalpontSystemCatalog::ColType&
449                         ct = fOutJobInfo->keyInfo->colType[columnKeys[j]];
450                         sc->joinInfo(0);
451                         TupleInfo ti = setTupleInfo(
452                                            ct, sc->oid(), *fOutJobInfo, tableOids[j], sc, aliases[j]);
453 
454                         columnKeys[j] = ti.key;
455                         tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
456                     }
457                 }
458                 else if (dynamic_cast<WindowFunctionColumn*>(scList[j]) != NULL)
459                 {
460                     // workaround for window function IN/EXISTS subquery
461                     const map<UniqId, uint32_t>::const_iterator k =
462                         subMap.find(UniqId(scList[j]->expressionId(), "", "", ""));
463 
464                     if (k == subMap.end())
465                         throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
466 
467                     sc = fVtable.columns()[k->second].get();
468                     es->substitute(j, fVtable.columns()[k->second]);
469                     CalpontSystemCatalog::ColType ct = sc->colType();
470                     string alias(extractTableAlias(sc));
471                     CalpontSystemCatalog::OID tblOid = fVtable.tableOid();
472                     TupleInfo ti(setTupleInfo(ct, sc->oid(), *fOutJobInfo, tblOid, sc, alias));
473 
474                     tableOids[j] = execplan::CNX_VTABLE_ID;
475                     columnKeys[j] = ti.key;
476                     tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
477                 }
478             }
479 
480             es->updateColumnOidAlias(*fOutJobInfo);
481 
482             // update function join info
483             boost::shared_ptr<FunctionJoinInfo>& fji = es->functionJoinInfo();
484 
485             if (fji && fji->fCorrelatedSide)
486             {
487                 // replace sub side with a virtual column
488                 int64_t subSide = (fji->fCorrelatedSide == 1) ? 1 : 0;
489                 ReturnedColumn* rc = fji->fExpression[subSide];
490                 SimpleColumn*   sc = dynamic_cast<SimpleColumn*>(rc);
491 
492                 if (sc == NULL)
493                 {
494                     UniqId colId = UniqId(rc->expressionId(), "", "", "");
495                     const map<UniqId, uint32_t>::const_iterator k = subMap.find(colId);
496 
497                     if (k == subMap.end())
498                         throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
499 
500                     SSC vc = fVtable.columns()[k->second];
501                     sc = vc.get();
502                 }
503 
504                 // virtual table info in outer query
505                 TupleInfo ti(setTupleInfo(sc->colType(), sc->oid(), *fOutJobInfo,
506                                           fVtable.tableOid(), sc, fVtable.alias()));
507 
508                 set<uint32_t> cids;
509                 cids.insert(ti.key);
510                 fji->fJoinKey[subSide]  = ti.key;
511                 fji->fTableKey[subSide]  = ti.tkey;
512                 fji->fColumnKeys[subSide] = cids;
513                 fji->fTableOid[subSide]  = fVtable.tableOid();
514                 fji->fAlias[subSide]    = fVtable.alias();
515                 fji->fView[subSide]      = fVtable.view();
516                 fji->fSchema[subSide]    = "";
517 
518                 // turn off correlated flag
519                 fji->fExpression[fji->fCorrelatedSide - 1]->joinInfo(0);
520                 fji->fJoinType ^= CORRELATED;
521                 fji->fJoinId = 0;
522             }
523         }
524         else
525         {
526             JobStep* j = i->get();
527             uint32_t tid = -1;
528             uint64_t cid = j->tupleId();
529 
530             if (cid != (uint64_t) - 1)
531                 tid = getTableKey(*fOutJobInfo, cid);
532             else
533                 tid = getTableKey(*fOutJobInfo, j);
534 
535             if (outTables.find(tid) == outTables.end())
536             {
537                 if (subMap.find(
538                             UniqId(j->oid(), j->alias(), j->schema(), j->view(), 0)) != subMap.end())
539                     //throw CorrelateFailExcept();
540                     throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
541             }
542         }
543     }
544 }
545 
546 
run()547 void SubQueryTransformer::run()
548 {
549     // not to be called for base class
550 }
551 
552 
553 // ------ SimpleScalarTransformer ------
SimpleScalarTransformer(JobInfo * jobInfo,SErrorInfo & status,bool e)554 SimpleScalarTransformer::SimpleScalarTransformer(JobInfo* jobInfo, SErrorInfo& status, bool e) :
555     SubQueryTransformer(jobInfo, status),
556     fInputDl(NULL),
557     fDlIterator(-1),
558     fEmptyResultSet(true),
559     fExistFilter(e)
560 {
561 }
562 
563 
SimpleScalarTransformer(const SubQueryTransformer & rhs)564 SimpleScalarTransformer::SimpleScalarTransformer(const SubQueryTransformer& rhs) :
565     SubQueryTransformer(rhs.outJobInfo(), rhs.errorInfo()),
566     fInputDl(NULL),
567     fDlIterator(-1),
568     fEmptyResultSet(true),
569     fExistFilter(false)
570 {
571     fSubJobList = rhs.subJobList();
572     fSubQueryStep = rhs.subQueryStep();
573 }
574 
575 
~SimpleScalarTransformer()576 SimpleScalarTransformer::~SimpleScalarTransformer()
577 {
578 }
579 
580 
run()581 void SimpleScalarTransformer::run()
582 {
583     // set up receiver
584     fRowGroup = dynamic_cast<SubQueryStep*>(fSubQueryStep.get())->getOutputRowGroup();
585     fRowGroup.initRow(&fRow, true);
586     fInputDl = fSubQueryStep->outputAssociation().outAt(0)->rowGroupDL();
587     fDlIterator = fInputDl->getIterator();
588 
589     // run the subquery
590     fSubJobList->doQuery();
591 
592     // retrieve the scalar result
593     getScalarResult();
594 
595     // check result count
596     if (fErrorInfo->errCode == ERR_MORE_THAN_1_ROW)
597         throw MoreThan1RowExcept();
598 }
599 
600 
getScalarResult()601 void SimpleScalarTransformer::getScalarResult()
602 {
603     RGData rgData;
604     bool more;
605 
606     more = fInputDl->next(fDlIterator, &rgData);
607 
608     while (more)
609     {
610         fRowGroup.setData(&rgData);
611 
612         // Only need one row for scalar filter
613         if (fEmptyResultSet && fRowGroup.getRowCount() == 1)
614         {
615             fEmptyResultSet = false;
616             Row row;
617             fRowGroup.initRow(&row);
618             fRowGroup.getRow(0, &row);
619             fRowData.reset(new uint8_t[fRow.getSize()]);
620             fRow.setData(fRowData.get());
621             copyRow(row, &fRow);
622 
623             // For exist filter, stop the query after one or more rows retrieved.
624             if (fExistFilter)
625             {
626                 fErrorInfo->errMsg = IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW);
627                 fErrorInfo->errCode = ERR_MORE_THAN_1_ROW;
628             }
629         }
630 
631         // more than 1 row case:
632         //   not empty set, and get new rows
633         //   empty set, but get more than 1 rows
634         else if (fRowGroup.getRowCount() > 0)
635         {
636             // Stop the query after some rows retrieved.
637             fEmptyResultSet = false;
638             fErrorInfo->errMsg = IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW);
639             fErrorInfo->errCode = ERR_MORE_THAN_1_ROW;
640         }
641 
642         // For scalar filter, have to check all blocks to ensure only one row.
643         if (fErrorInfo->errCode != 0)
644             while (more) more = fInputDl->next(fDlIterator, &rgData);
645         else
646             more = fInputDl->next(fDlIterator, &rgData);
647     }
648 }
649 
650 
651 }
652 // vim:ts=4 sw=4:
653 
654