1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (c) 2016-2020 MariaDB
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: subquerystep.cpp 6370 2010-03-18 02:58:09Z xlou $
20
21
22 #include <iostream>
23 //#define NDEBUG
24 #include <cassert>
25 using namespace std;
26
27 #include <boost/scoped_array.hpp>
28 #include <boost/shared_array.hpp>
29 #include <boost/scoped_ptr.hpp>
30 #include <boost/shared_ptr.hpp>
31 #include <boost/thread.hpp>
32 #include <boost/uuid/uuid_io.hpp>
33 using namespace boost;
34
35 #include "parsetree.h"
36 #include "logicoperator.h"
37 using namespace execplan;
38
39 #include "rowgroup.h"
40 using namespace rowgroup;
41
42 #include "errorids.h"
43 #include "exceptclasses.h"
44 using namespace logging;
45
46 #include "querytele.h"
47 using namespace querytele;
48
49 #include "funcexp.h"
50
51 #include "jobstep.h"
52 #include "jlf_common.h"
53 #include "jlf_tuplejoblist.h"
54 #include "expressionstep.h"
55 #include "subquerystep.h"
56 using namespace joblist;
57
58
59 namespace joblist
60 {
61
SubQueryStep(const JobInfo & jobInfo)62 SubQueryStep::SubQueryStep(const JobInfo& jobInfo)
63 : JobStep(jobInfo)
64 , fRowsReturned(0)
65 {
66 fExtendedInfo = "SQS: ";
67 fQtc.stepParms().stepType = StepTeleStats::T_SQS;
68 }
69
~SubQueryStep()70 SubQueryStep::~SubQueryStep()
71 {
72 }
73
run()74 void SubQueryStep::run()
75 {
76 fSubJobList->doQuery();
77 }
78
join()79 void SubQueryStep::join()
80 {
81 if (fRunner)
82 fRunner->join();
83 }
84
abort()85 void SubQueryStep::abort()
86 {
87 JobStep::abort();
88 fSubJobList->abort();
89 }
90
toString() const91 const string SubQueryStep::toString() const
92 {
93 ostringstream oss;
94 oss << "SubQueryStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
95
96 if (fOutputJobStepAssociation.outSize() > 0)
97 {
98 oss << " out:";
99
100 for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
101 oss << fOutputJobStepAssociation.outAt(i);
102 }
103
104 return oss.str();
105 }
106
107
108 /*
109 void SubQueryStep::printCalTrace()
110 {
111 time_t t = time (0);
112 char timeString[50];
113 ctime_r (&t, timeString);
114 timeString[strlen (timeString )-1] = '\0';
115 ostringstream logStr;
116 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at "<< timeString
117 << "; total rows returned-" << fRowsReturned << endl
118 << "\t1st read " << dlTimes.FirstReadTimeString()
119 << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
120 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
121 << "s;\n\tJob completion status " << status() << endl;
122 logEnd(logStr.str().c_str());
123 fExtendedInfo += logStr.str();
124 formatMiniStats();
125 }
126
127
128 void SubQueryStep::formatMiniStats()
129 {
130 ostringstream oss;
131 oss << "SQS "
132 << "UM "
133 << "- "
134 << "- "
135 << "- "
136 << "- "
137 << "- "
138 << "- "
139 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
140 << fRowsReturned << " ";
141 fMiniInfo += oss.str();
142 }
143 */
144
145
SubAdapterStep(SJSTEP & s,const JobInfo & jobInfo)146 SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo)
147 : JobStep(jobInfo)
148 , fTableOid(s->tableOid())
149 , fSubStep(s)
150 , fRowsInput(0)
151 , fRowsReturned(0)
152 , fEndOfResult(false)
153 , fInputIterator(0)
154 , fOutputIterator(0)
155 , fRunner(0)
156 {
157 fAlias = s->alias();
158 fView = s->view();
159 fInputJobStepAssociation = s->outputAssociation();
160 fRowGroupIn = dynamic_cast<SubQueryStep*>(s.get())->getOutputRowGroup();
161 setOutputRowGroup(fRowGroupIn);
162 }
163
164
~SubAdapterStep()165 SubAdapterStep::~SubAdapterStep()
166 {
167 }
168
abort()169 void SubAdapterStep::abort()
170 {
171 JobStep::abort();
172
173 if (fSubStep)
174 fSubStep->abort();
175 }
176
run()177 void SubAdapterStep::run()
178 {
179 if (fInputJobStepAssociation.outSize() == 0)
180 throw logic_error("No input data list for subquery adapter step.");
181
182 fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
183
184 if (fInputDL == NULL)
185 throw logic_error("Input is not a RowGroup data list.");
186
187 fInputIterator = fInputDL->getIterator();
188
189 if (fOutputJobStepAssociation.outSize() == 0)
190 throw logic_error("No output data list for non-delivery subquery adapter step.");
191
192 fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
193
194 if (fOutputDL == NULL)
195 throw logic_error("Output is not a RowGroup data list.");
196
197 if (fDelivery)
198 fOutputIterator = fOutputDL->getIterator();
199
200 fRunner = jobstepThreadPool.invoke(Runner(this));
201 }
202
203
join()204 void SubAdapterStep::join()
205 {
206 if (fRunner)
207 jobstepThreadPool.join(fRunner);
208 }
209
210
nextBand(messageqcpp::ByteStream & bs)211 uint32_t SubAdapterStep::nextBand(messageqcpp::ByteStream& bs)
212 {
213 RGData rgDataOut;
214 bool more = false;
215 uint32_t rowCount = 0;
216
217 try
218 {
219 bs.restart();
220
221 more = fOutputDL->next(fOutputIterator, &rgDataOut);
222
223 if (!more || cancelled())
224 {
225 //@bug4459.
226 while (more)
227 more = fOutputDL->next(fOutputIterator, &rgDataOut);
228
229 fEndOfResult = true;
230 }
231
232 if (more && !fEndOfResult)
233 {
234 fRowGroupDeliver.setData(&rgDataOut);
235 fRowGroupDeliver.serializeRGData(bs);
236 rowCount = fRowGroupDeliver.getRowCount();
237 }
238 }
239 catch (...)
240 {
241 handleException(std::current_exception(),
242 logging::ERR_IN_DELIVERY,
243 logging::ERR_ALWAYS_CRITICAL,
244 "SubAdapterStep::nextBand()");
245 while (more)
246 more = fOutputDL->next(fOutputIterator, &rgDataOut);
247 fEndOfResult = true;
248 }
249
250 if (fEndOfResult)
251 {
252 // send an empty / error band
253 RGData rgData(fRowGroupDeliver, 0);
254 fRowGroupDeliver.setData(&rgData);
255 fRowGroupDeliver.resetRowGroup(0);
256 fRowGroupDeliver.setStatus(status());
257 fRowGroupDeliver.serializeRGData(bs);
258 }
259
260 return rowCount;
261 }
262
263
setFeRowGroup(const rowgroup::RowGroup & rg)264 void SubAdapterStep::setFeRowGroup(const rowgroup::RowGroup& rg)
265 {
266 fRowGroupFe = rg;
267 }
268
269
setOutputRowGroup(const rowgroup::RowGroup & rg)270 void SubAdapterStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
271 {
272 fRowGroupOut = fRowGroupDeliver = rg;
273
274 if (fRowGroupFe.getColumnCount() == 0)
275 fIndexMap = makeMapping(fRowGroupIn, fRowGroupOut);
276 else
277 fIndexMap = makeMapping(fRowGroupFe, fRowGroupOut);
278
279 checkDupOutputColumns();
280 }
281
282
checkDupOutputColumns()283 void SubAdapterStep::checkDupOutputColumns()
284 {
285 map<uint32_t, uint32_t> keymap; // map<unique col key, col index in the row group>
286 fDupColumns.clear();
287 const vector<uint32_t>& keys = fRowGroupDeliver.getKeys();
288
289 for (uint32_t i = 0; i < keys.size(); i++)
290 {
291 map<uint32_t, uint32_t>::iterator j = keymap.find(keys[i]);
292
293 if (j == keymap.end())
294 keymap.insert(make_pair(keys[i], i)); // map key to col index
295 else
296 fDupColumns.push_back(make_pair(i, j->second)); // dest/src index pair
297 }
298 }
299
300
dupOutputColumns(Row & row)301 void SubAdapterStep::dupOutputColumns(Row& row)
302 {
303 for (uint64_t i = 0; i < fDupColumns.size(); i++)
304 row.copyField(fDupColumns[i].first, fDupColumns[i].second);
305 }
306
307
outputRow(Row & rowIn,Row & rowOut)308 void SubAdapterStep::outputRow(Row& rowIn, Row& rowOut)
309 {
310 applyMapping(fIndexMap, rowIn, &rowOut);
311
312 if (fDupColumns.size() > 0)
313 dupOutputColumns(rowOut);
314
315 fRowGroupOut.incRowCount();
316 rowOut.nextRow();
317 }
318
319
deliverStringTableRowGroup(bool b)320 void SubAdapterStep::deliverStringTableRowGroup(bool b)
321 {
322 fRowGroupOut.setUseStringTable(b);
323 fRowGroupDeliver.setUseStringTable(b);
324 }
325
326
deliverStringTableRowGroup() const327 bool SubAdapterStep::deliverStringTableRowGroup() const
328 {
329 idbassert(fRowGroupOut.usesStringTable() == fRowGroupDeliver.usesStringTable());
330 return fRowGroupDeliver.usesStringTable();
331 }
332
333
toString() const334 const string SubAdapterStep::toString() const
335 {
336 ostringstream oss;
337 oss << "SubAdapterStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
338
339 if (fInputJobStepAssociation.outSize() > 0)
340 oss << fInputJobStepAssociation.outAt(0);
341
342 if (fOutputJobStepAssociation.outSize() > 0)
343 oss << fOutputJobStepAssociation.outAt(0);
344
345 return oss.str();
346 }
347
348
execute()349 void SubAdapterStep::execute()
350 {
351 RGData rgDataIn;
352 RGData rgDataOut;
353 Row rowIn;
354 Row rowFe;
355 Row rowOut;
356 fRowGroupIn.initRow(&rowIn);
357 fRowGroupOut.initRow(&rowOut);
358
359 RGData rowFeData;
360 StepTeleStats sts;
361 sts.query_uuid = fQueryUuid;
362 sts.step_uuid = fStepUuid;
363 bool usesFE = false;
364
365 if (fRowGroupFe.getColumnCount() > 0)
366 {
367 usesFE = true;
368 fRowGroupFe.initRow(&rowFe, true);
369 rowFeData = RGData(fRowGroupFe, 1);
370 fRowGroupFe.setData(&rowFeData);
371 fRowGroupFe.getRow(0, &rowFe);
372 }
373
374 bool more = false;
375
376 try
377 {
378 sts.msg_type = StepTeleStats::ST_START;
379 sts.total_units_of_work = 1;
380 postStepStartTele(sts);
381
382 fSubStep->run();
383
384 more = fInputDL->next(fInputIterator, &rgDataIn);
385
386 if (traceOn()) dlTimes.setFirstReadTime();
387
388 while (more && !cancelled())
389 {
390 fRowGroupIn.setData(&rgDataIn);
391 rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
392 fRowGroupOut.setData(&rgDataOut);
393 fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
394
395 fRowGroupIn.getRow(0, &rowIn);
396 fRowGroupOut.getRow(0, &rowOut);
397
398 fRowsInput += fRowGroupIn.getRowCount();
399
400 for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
401 {
402 if (fExpression.get() == NULL)
403 {
404 outputRow(rowIn, rowOut);
405 }
406 else if (!usesFE)
407 {
408 if (fExpression->evaluate(&rowIn))
409 {
410 outputRow(rowIn, rowOut);
411 }
412 }
413 else
414 {
415 copyRow(rowIn, &rowFe, rowIn.getColumnCount());
416
417 //memcpy(rowFe.getData(), rowIn.getData(), rowIn.getSize());
418 if (fExpression->evaluate(&rowFe))
419 {
420 outputRow(rowFe, rowOut);
421 }
422 }
423
424 rowIn.nextRow();
425 }
426
427 if (fRowGroupOut.getRowCount() > 0)
428 {
429 fRowsReturned += fRowGroupOut.getRowCount();
430 fOutputDL->insert(rgDataOut);
431 }
432
433 more = fInputDL->next(fInputIterator, &rgDataIn);
434 }
435 }
436 catch (...)
437 {
438 handleException(std::current_exception(),
439 logging::ERR_EXEMGR_MALFUNCTION,
440 logging::ERR_ALWAYS_CRITICAL,
441 "SubAdapterStep::execute()");
442 }
443
444 if (cancelled())
445 while (more)
446 more = fInputDL->next(fInputIterator, &rgDataIn);
447
448 if (traceOn())
449 {
450 dlTimes.setLastReadTime();
451 dlTimes.setEndOfInputTime();
452 printCalTrace();
453 }
454
455 sts.msg_type = StepTeleStats::ST_SUMMARY;
456 sts.total_units_of_work = sts.units_of_work_completed = 1;
457 sts.rows = fRowsReturned;
458 postStepSummaryTele(sts);
459
460 // Bug 3136, let mini stats to be formatted if traceOn.
461 fOutputDL->endOfInput();
462 }
463
464
addExpression(const JobStepVector & exps,JobInfo & jobInfo)465 void SubAdapterStep::addExpression(const JobStepVector& exps, JobInfo& jobInfo)
466 {
467 // maps key to the index in the RG
468 map<uint32_t, uint32_t> keyToIndexMap;
469 const vector<uint32_t>& keys = fRowGroupIn.getKeys();
470
471 for (size_t i = 0; i < keys.size(); i++)
472 keyToIndexMap[keys[i]] = i;
473
474 // combine the expression to one parse tree
475 ParseTree* filter = NULL;
476
477 for (JobStepVector::const_iterator it = exps.begin(); it != exps.end(); it++)
478 {
479 ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
480 idbassert(e);
481
482 e->updateInputIndex(keyToIndexMap, jobInfo);
483
484 if (filter != NULL)
485 {
486 ParseTree* left = filter;
487 ParseTree* right = new ParseTree();
488 right->copyTree(*(e->expressionFilter()));
489 filter = new ParseTree(new LogicOperator("and"));
490 filter->left(left);
491 filter->right(right);
492 }
493 else
494 {
495 filter = new ParseTree();
496 filter->copyTree(*(e->expressionFilter()));
497 }
498 }
499
500 // add to the expression wrapper
501 if (fExpression.get() == NULL)
502 fExpression.reset(new funcexp::FuncExpWrapper());
503
504 fExpression->addFilter(boost::shared_ptr<execplan::ParseTree>(filter));
505 }
506
507
addExpression(const vector<SRCP> & exps)508 void SubAdapterStep::addExpression(const vector<SRCP>& exps)
509 {
510 // add to the function wrapper
511 if (fExpression.get() == NULL)
512 fExpression.reset(new funcexp::FuncExpWrapper());
513
514 for (vector<SRCP>::const_iterator i = exps.begin(); i != exps.end(); i++)
515 fExpression->addReturnedColumn(*i);
516 }
517
518
addFcnJoinExp(const vector<SRCP> & exps)519 void SubAdapterStep::addFcnJoinExp(const vector<SRCP>& exps)
520 {
521 addExpression(exps);
522 }
523
524
printCalTrace()525 void SubAdapterStep::printCalTrace()
526 {
527 time_t t = time (0);
528 char timeString[50];
529 ctime_r (&t, timeString);
530 timeString[strlen (timeString ) - 1] = '\0';
531 ostringstream logStr;
532 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
533 << "; total rows input-" << fRowsInput
534 << "; total rows returned-" << fRowsReturned << endl
535 << "\t1st read " << dlTimes.FirstReadTimeString()
536 << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
537 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
538 << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
539 << "\tJob completion status " << status() << endl;
540 logEnd(logStr.str().c_str());
541 fExtendedInfo += logStr.str();
542 formatMiniStats();
543 }
544
545
formatMiniStats()546 void SubAdapterStep::formatMiniStats()
547 {
548 /*
549 ostringstream oss;
550 oss << "SAS "
551 << "UM "
552 << "- "
553 << "- "
554 << "- "
555 << "- "
556 << "- "
557 << "- "
558 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
559 << fRowsReturned << " ";
560 fMiniInfo += oss.str();
561 */
562 }
563
564
565 }
566 // vim:ts=4 sw=4:
567
568