1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (c) 2016-2020 MariaDB
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: subquerystep.cpp 6370 2010-03-18 02:58:09Z xlou $
20 
21 
#include <iostream>
#include <memory>
//#define NDEBUG
#include <cassert>
using namespace std;
26 
27 #include <boost/scoped_array.hpp>
28 #include <boost/shared_array.hpp>
29 #include <boost/scoped_ptr.hpp>
30 #include <boost/shared_ptr.hpp>
31 #include <boost/thread.hpp>
32 #include <boost/uuid/uuid_io.hpp>
33 using namespace boost;
34 
35 #include "parsetree.h"
36 #include "logicoperator.h"
37 using namespace execplan;
38 
39 #include "rowgroup.h"
40 using namespace rowgroup;
41 
42 #include "errorids.h"
43 #include "exceptclasses.h"
44 using namespace logging;
45 
46 #include "querytele.h"
47 using namespace querytele;
48 
49 #include "funcexp.h"
50 
51 #include "jobstep.h"
52 #include "jlf_common.h"
53 #include "jlf_tuplejoblist.h"
54 #include "expressionstep.h"
55 #include "subquerystep.h"
56 using namespace joblist;
57 
58 
59 namespace joblist
60 {
61 
SubQueryStep(const JobInfo & jobInfo)62 SubQueryStep::SubQueryStep(const JobInfo& jobInfo)
63     : JobStep(jobInfo)
64     , fRowsReturned(0)
65 {
66     fExtendedInfo = "SQS: ";
67     fQtc.stepParms().stepType = StepTeleStats::T_SQS;
68 }
69 
~SubQueryStep()70 SubQueryStep::~SubQueryStep()
71 {
72 }
73 
run()74 void SubQueryStep::run()
75 {
76     fSubJobList->doQuery();
77 }
78 
join()79 void SubQueryStep::join()
80 {
81     if (fRunner)
82         fRunner->join();
83 }
84 
abort()85 void SubQueryStep::abort()
86 {
87     JobStep::abort();
88     fSubJobList->abort();
89 }
90 
toString() const91 const string SubQueryStep::toString() const
92 {
93     ostringstream oss;
94     oss << "SubQueryStep    ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
95 
96     if (fOutputJobStepAssociation.outSize() > 0)
97     {
98         oss << " out:";
99 
100         for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
101             oss << fOutputJobStepAssociation.outAt(i);
102     }
103 
104     return oss.str();
105 }
106 
107 
108 /*
109 void SubQueryStep::printCalTrace()
110 {
111 	time_t t = time (0);
112 	char timeString[50];
113 	ctime_r (&t, timeString);
114 	timeString[strlen (timeString )-1] = '\0';
115 	ostringstream logStr;
116 	logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at "<< timeString
117 			<< "; total rows returned-" << fRowsReturned << endl
118 			<< "\t1st read " << dlTimes.FirstReadTimeString()
119 			<< "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
120 			<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
121 			<< "s;\n\tJob completion status " << status() << endl;
122 	logEnd(logStr.str().c_str());
123 	fExtendedInfo += logStr.str();
124 	formatMiniStats();
125 }
126 
127 
128 void SubQueryStep::formatMiniStats()
129 {
130 	ostringstream oss;
131 	oss << "SQS "
132 		<< "UM "
133 		<< "- "
134 		<< "- "
135 		<< "- "
136 		<< "- "
137 		<< "- "
138 		<< "- "
139 		<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
140 		<< fRowsReturned << " ";
141 	fMiniInfo += oss.str();
142 }
143 */
144 
145 
SubAdapterStep(SJSTEP & s,const JobInfo & jobInfo)146 SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo)
147     : JobStep(jobInfo)
148     , fTableOid(s->tableOid())
149     , fSubStep(s)
150     , fRowsInput(0)
151     , fRowsReturned(0)
152     , fEndOfResult(false)
153     , fInputIterator(0)
154     , fOutputIterator(0)
155     , fRunner(0)
156 {
157     fAlias = s->alias();
158     fView = s->view();
159     fInputJobStepAssociation = s->outputAssociation();
160     fRowGroupIn = dynamic_cast<SubQueryStep*>(s.get())->getOutputRowGroup();
161     setOutputRowGroup(fRowGroupIn);
162 }
163 
164 
~SubAdapterStep()165 SubAdapterStep::~SubAdapterStep()
166 {
167 }
168 
abort()169 void SubAdapterStep::abort()
170 {
171     JobStep::abort();
172 
173     if (fSubStep)
174         fSubStep->abort();
175 }
176 
run()177 void SubAdapterStep::run()
178 {
179     if (fInputJobStepAssociation.outSize() == 0)
180         throw logic_error("No input data list for subquery adapter step.");
181 
182     fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
183 
184     if (fInputDL == NULL)
185         throw logic_error("Input is not a RowGroup data list.");
186 
187     fInputIterator = fInputDL->getIterator();
188 
189     if (fOutputJobStepAssociation.outSize() == 0)
190         throw logic_error("No output data list for non-delivery subquery adapter step.");
191 
192     fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
193 
194     if (fOutputDL == NULL)
195         throw logic_error("Output is not a RowGroup data list.");
196 
197     if (fDelivery)
198         fOutputIterator = fOutputDL->getIterator();
199 
200     fRunner = jobstepThreadPool.invoke(Runner(this));
201 }
202 
203 
join()204 void SubAdapterStep::join()
205 {
206     if (fRunner)
207         jobstepThreadPool.join(fRunner);
208 }
209 
210 
nextBand(messageqcpp::ByteStream & bs)211 uint32_t SubAdapterStep::nextBand(messageqcpp::ByteStream& bs)
212 {
213     RGData rgDataOut;
214     bool more = false;
215     uint32_t rowCount = 0;
216 
217     try
218     {
219         bs.restart();
220 
221         more = fOutputDL->next(fOutputIterator, &rgDataOut);
222 
223         if (!more || cancelled())
224         {
225             //@bug4459.
226             while (more)
227                 more = fOutputDL->next(fOutputIterator, &rgDataOut);
228 
229             fEndOfResult = true;
230         }
231 
232         if (more && !fEndOfResult)
233         {
234             fRowGroupDeliver.setData(&rgDataOut);
235             fRowGroupDeliver.serializeRGData(bs);
236             rowCount = fRowGroupDeliver.getRowCount();
237         }
238     }
239     catch (...)
240     {
241         handleException(std::current_exception(),
242                         logging::ERR_IN_DELIVERY,
243                         logging::ERR_ALWAYS_CRITICAL,
244                         "SubAdapterStep::nextBand()");
245         while (more)
246             more = fOutputDL->next(fOutputIterator, &rgDataOut);
247         fEndOfResult = true;
248     }
249 
250     if (fEndOfResult)
251     {
252         // send an empty / error band
253         RGData rgData(fRowGroupDeliver, 0);
254         fRowGroupDeliver.setData(&rgData);
255         fRowGroupDeliver.resetRowGroup(0);
256         fRowGroupDeliver.setStatus(status());
257         fRowGroupDeliver.serializeRGData(bs);
258     }
259 
260     return rowCount;
261 }
262 
263 
setFeRowGroup(const rowgroup::RowGroup & rg)264 void SubAdapterStep::setFeRowGroup(const rowgroup::RowGroup& rg)
265 {
266     fRowGroupFe = rg;
267 }
268 
269 
setOutputRowGroup(const rowgroup::RowGroup & rg)270 void SubAdapterStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
271 {
272     fRowGroupOut = fRowGroupDeliver = rg;
273 
274     if (fRowGroupFe.getColumnCount() == 0)
275         fIndexMap = makeMapping(fRowGroupIn, fRowGroupOut);
276     else
277         fIndexMap = makeMapping(fRowGroupFe, fRowGroupOut);
278 
279     checkDupOutputColumns();
280 }
281 
282 
checkDupOutputColumns()283 void SubAdapterStep::checkDupOutputColumns()
284 {
285     map<uint32_t, uint32_t> keymap; // map<unique col key, col index in the row group>
286     fDupColumns.clear();
287     const vector<uint32_t>& keys = fRowGroupDeliver.getKeys();
288 
289     for (uint32_t i = 0; i < keys.size(); i++)
290     {
291         map<uint32_t, uint32_t>::iterator j = keymap.find(keys[i]);
292 
293         if (j == keymap.end())
294             keymap.insert(make_pair(keys[i], i));           // map key to col index
295         else
296             fDupColumns.push_back(make_pair(i, j->second)); // dest/src index pair
297     }
298 }
299 
300 
dupOutputColumns(Row & row)301 void SubAdapterStep::dupOutputColumns(Row& row)
302 {
303     for (uint64_t i = 0; i < fDupColumns.size(); i++)
304         row.copyField(fDupColumns[i].first, fDupColumns[i].second);
305 }
306 
307 
outputRow(Row & rowIn,Row & rowOut)308 void SubAdapterStep::outputRow(Row& rowIn, Row& rowOut)
309 {
310     applyMapping(fIndexMap, rowIn, &rowOut);
311 
312     if (fDupColumns.size() > 0)
313         dupOutputColumns(rowOut);
314 
315     fRowGroupOut.incRowCount();
316     rowOut.nextRow();
317 }
318 
319 
deliverStringTableRowGroup(bool b)320 void SubAdapterStep::deliverStringTableRowGroup(bool b)
321 {
322     fRowGroupOut.setUseStringTable(b);
323     fRowGroupDeliver.setUseStringTable(b);
324 }
325 
326 
deliverStringTableRowGroup() const327 bool SubAdapterStep::deliverStringTableRowGroup() const
328 {
329     idbassert(fRowGroupOut.usesStringTable() == fRowGroupDeliver.usesStringTable());
330     return fRowGroupDeliver.usesStringTable();
331 }
332 
333 
toString() const334 const string SubAdapterStep::toString() const
335 {
336     ostringstream oss;
337     oss << "SubAdapterStep  ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
338 
339     if (fInputJobStepAssociation.outSize() > 0)
340         oss << fInputJobStepAssociation.outAt(0);
341 
342     if (fOutputJobStepAssociation.outSize() > 0)
343         oss << fOutputJobStepAssociation.outAt(0);
344 
345     return oss.str();
346 }
347 
348 
execute()349 void SubAdapterStep::execute()
350 {
351     RGData rgDataIn;
352     RGData rgDataOut;
353     Row rowIn;
354     Row rowFe;
355     Row rowOut;
356     fRowGroupIn.initRow(&rowIn);
357     fRowGroupOut.initRow(&rowOut);
358 
359     RGData rowFeData;
360     StepTeleStats sts;
361     sts.query_uuid = fQueryUuid;
362     sts.step_uuid = fStepUuid;
363     bool usesFE = false;
364 
365     if (fRowGroupFe.getColumnCount() > 0)
366     {
367         usesFE = true;
368         fRowGroupFe.initRow(&rowFe, true);
369         rowFeData = RGData(fRowGroupFe, 1);
370         fRowGroupFe.setData(&rowFeData);
371         fRowGroupFe.getRow(0, &rowFe);
372     }
373 
374     bool more = false;
375 
376     try
377     {
378         sts.msg_type = StepTeleStats::ST_START;
379         sts.total_units_of_work = 1;
380         postStepStartTele(sts);
381 
382         fSubStep->run();
383 
384         more = fInputDL->next(fInputIterator, &rgDataIn);
385 
386         if (traceOn()) dlTimes.setFirstReadTime();
387 
388         while (more && !cancelled())
389         {
390             fRowGroupIn.setData(&rgDataIn);
391             rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
392             fRowGroupOut.setData(&rgDataOut);
393             fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
394 
395             fRowGroupIn.getRow(0, &rowIn);
396             fRowGroupOut.getRow(0, &rowOut);
397 
398             fRowsInput += fRowGroupIn.getRowCount();
399 
400             for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
401             {
402                 if (fExpression.get() == NULL)
403                 {
404                     outputRow(rowIn, rowOut);
405                 }
406                 else if (!usesFE)
407                 {
408                     if (fExpression->evaluate(&rowIn))
409                     {
410                         outputRow(rowIn, rowOut);
411                     }
412                 }
413                 else
414                 {
415                     copyRow(rowIn, &rowFe, rowIn.getColumnCount());
416 
417                     //memcpy(rowFe.getData(), rowIn.getData(), rowIn.getSize());
418                     if (fExpression->evaluate(&rowFe))
419                     {
420                         outputRow(rowFe, rowOut);
421                     }
422                 }
423 
424                 rowIn.nextRow();
425             }
426 
427             if (fRowGroupOut.getRowCount() > 0)
428             {
429                 fRowsReturned += fRowGroupOut.getRowCount();
430                 fOutputDL->insert(rgDataOut);
431             }
432 
433             more = fInputDL->next(fInputIterator, &rgDataIn);
434         }
435     }
436     catch (...)
437     {
438         handleException(std::current_exception(),
439                         logging::ERR_EXEMGR_MALFUNCTION,
440                         logging::ERR_ALWAYS_CRITICAL,
441                         "SubAdapterStep::execute()");
442     }
443 
444     if (cancelled())
445         while (more)
446             more = fInputDL->next(fInputIterator, &rgDataIn);
447 
448     if (traceOn())
449     {
450         dlTimes.setLastReadTime();
451         dlTimes.setEndOfInputTime();
452         printCalTrace();
453     }
454 
455     sts.msg_type = StepTeleStats::ST_SUMMARY;
456     sts.total_units_of_work = sts.units_of_work_completed = 1;
457     sts.rows = fRowsReturned;
458     postStepSummaryTele(sts);
459 
460     // Bug 3136, let mini stats to be formatted if traceOn.
461     fOutputDL->endOfInput();
462 }
463 
464 
addExpression(const JobStepVector & exps,JobInfo & jobInfo)465 void SubAdapterStep::addExpression(const JobStepVector& exps, JobInfo& jobInfo)
466 {
467     // maps key to the index in the RG
468     map<uint32_t, uint32_t> keyToIndexMap;
469     const vector<uint32_t>& keys = fRowGroupIn.getKeys();
470 
471     for (size_t i = 0; i < keys.size(); i++)
472         keyToIndexMap[keys[i]] = i;
473 
474     // combine the expression to one parse tree
475     ParseTree* filter = NULL;
476 
477     for (JobStepVector::const_iterator it = exps.begin(); it != exps.end(); it++)
478     {
479         ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
480         idbassert(e);
481 
482         e->updateInputIndex(keyToIndexMap, jobInfo);
483 
484         if (filter != NULL)
485         {
486             ParseTree* left = filter;
487             ParseTree* right = new ParseTree();
488             right->copyTree(*(e->expressionFilter()));
489             filter = new ParseTree(new LogicOperator("and"));
490             filter->left(left);
491             filter->right(right);
492         }
493         else
494         {
495             filter = new ParseTree();
496             filter->copyTree(*(e->expressionFilter()));
497         }
498     }
499 
500     // add to the expression wrapper
501     if (fExpression.get() == NULL)
502         fExpression.reset(new funcexp::FuncExpWrapper());
503 
504     fExpression->addFilter(boost::shared_ptr<execplan::ParseTree>(filter));
505 }
506 
507 
addExpression(const vector<SRCP> & exps)508 void SubAdapterStep::addExpression(const vector<SRCP>& exps)
509 {
510     // add to the function wrapper
511     if (fExpression.get() == NULL)
512         fExpression.reset(new funcexp::FuncExpWrapper());
513 
514     for (vector<SRCP>::const_iterator i = exps.begin(); i != exps.end(); i++)
515         fExpression->addReturnedColumn(*i);
516 }
517 
518 
addFcnJoinExp(const vector<SRCP> & exps)519 void SubAdapterStep::addFcnJoinExp(const vector<SRCP>& exps)
520 {
521     addExpression(exps);
522 }
523 
524 
printCalTrace()525 void SubAdapterStep::printCalTrace()
526 {
527     time_t t = time (0);
528     char timeString[50];
529     ctime_r (&t, timeString);
530     timeString[strlen (timeString ) - 1] = '\0';
531     ostringstream logStr;
532     logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
533             << "; total rows input-" << fRowsInput
534             << "; total rows returned-" << fRowsReturned << endl
535             << "\t1st read " << dlTimes.FirstReadTimeString()
536             << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
537             << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
538             << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
539             << "\tJob completion status " << status() << endl;
540     logEnd(logStr.str().c_str());
541     fExtendedInfo += logStr.str();
542     formatMiniStats();
543 }
544 
545 
formatMiniStats()546 void SubAdapterStep::formatMiniStats()
547 {
548     /*
549     	ostringstream oss;
550     	oss << "SAS "
551     		<< "UM "
552     		<< "- "
553     		<< "- "
554     		<< "- "
555     		<< "- "
556     		<< "- "
557     		<< "- "
558     		<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
559     		<< fRowsReturned << " ";
560     	fMiniInfo += oss.str();
561     */
562 }
563 
564 
565 }
566 // vim:ts=4 sw=4:
567 
568