1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 // $Id: tuplehavingstep.cpp 9709 2013-07-20 06:08:46Z xlou $
19
20
21 //#define NDEBUG
22 #include <cassert>
23 #include <sstream>
24 #include <iomanip>
25 using namespace std;
26
27 #include <boost/shared_ptr.hpp>
28 #include <boost/shared_array.hpp>
29 #include <boost/uuid/uuid_io.hpp>
30 using namespace boost;
31
32 #include "messagequeue.h"
33 using namespace messageqcpp;
34
35 #include "loggingid.h"
36 #include "errorcodes.h"
37 using namespace logging;
38
39 #include "calpontsystemcatalog.h"
40 #include "aggregatecolumn.h"
41 #include "constantcolumn.h"
42 #include "simplecolumn.h"
43 using namespace execplan;
44
45 #include "jobstep.h"
46 #include "rowgroup.h"
47 using namespace rowgroup;
48
49 #include "querytele.h"
50 using namespace querytele;
51
52 #include "funcexp.h"
53
54 #include "jlf_common.h"
55 #include "tuplehavingstep.h"
56
57
58 namespace joblist
59 {
60
TupleHavingStep(const JobInfo & jobInfo)61 TupleHavingStep::TupleHavingStep(const JobInfo& jobInfo) :
62 ExpressionStep(jobInfo),
63 fInputDL(NULL),
64 fOutputDL(NULL),
65 fInputIterator(0),
66 fRunner(0),
67 fRowsReturned(0),
68 fEndOfResult(false),
69 fFeInstance(funcexp::FuncExp::instance())
70 {
71 fExtendedInfo = "HVS: ";
72 fQtc.stepParms().stepType = StepTeleStats::T_HVS;
73 }
74
75
~TupleHavingStep()76 TupleHavingStep::~TupleHavingStep()
77 {
78 }
79
80
setOutputRowGroup(const rowgroup::RowGroup & rg)81 void TupleHavingStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
82 {
83 throw runtime_error("Disabled, use initialize() to set output RowGroup.");
84 }
85
86
initialize(const RowGroup & rgIn,const JobInfo & jobInfo)87 void TupleHavingStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
88 {
89 fRowGroupIn = rgIn;
90 fRowGroupIn.initRow(&fRowIn);
91
92 map<uint32_t, uint32_t> keyToIndexMap;
93
94 for (uint64_t i = 0; i < fRowGroupIn.getKeys().size(); ++i)
95 if (keyToIndexMap.find(fRowGroupIn.getKeys()[i]) == keyToIndexMap.end())
96 keyToIndexMap.insert(make_pair(fRowGroupIn.getKeys()[i], i));
97
98 updateInputIndex(keyToIndexMap, jobInfo);
99
100 vector<uint32_t> oids, oidsIn = fRowGroupIn.getOIDs();
101 vector<uint32_t> keys, keysIn = fRowGroupIn.getKeys();
102 vector<uint32_t> scale, scaleIn = fRowGroupIn.getScale();
103 vector<uint32_t> precision, precisionIn = fRowGroupIn.getPrecision();
104 vector<CalpontSystemCatalog::ColDataType> types, typesIn = fRowGroupIn.getColTypes();
105 vector<uint32_t> csNums, csNumsIn = fRowGroupIn.getCharsetNumbers();
106 vector<uint32_t> pos, posIn = fRowGroupIn.getOffsets();
107
108 size_t n = 0;
109 RetColsVector::const_iterator i = jobInfo.deliveredCols.begin();
110
111 while (i != jobInfo.deliveredCols.end())
112 if (NULL == dynamic_cast<const ConstantColumn*>(i++->get()))
113 n++;
114
115 oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + n);
116 keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + n);
117 scale.insert(scale.end(), scaleIn.begin(), scaleIn.begin() + n);
118 precision.insert(precision.end(), precisionIn.begin(), precisionIn.begin() + n);
119 types.insert(types.end(), typesIn.begin(), typesIn.begin() + n);
120 csNums.insert(csNums.end(), csNumsIn.begin(), csNumsIn.begin() + n);
121 pos.insert(pos.end(), posIn.begin(), posIn.begin() + n + 1);
122
123 fRowGroupOut = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
124 fRowGroupOut.initRow(&fRowOut);
125 }
126
127
expressionFilter(const ParseTree * filter,JobInfo & jobInfo)128 void TupleHavingStep::expressionFilter(const ParseTree* filter, JobInfo& jobInfo)
129 {
130 // let base class handle the simple columns
131 ExpressionStep::expressionFilter(filter, jobInfo);
132
133 // extract simple columns from parse tree
134 vector<AggregateColumn*> acv;
135 fExpressionFilter->walk(getAggCols, &acv);
136 fColumns.insert(fColumns.end(), acv.begin(), acv.end());
137 }
138
139
run()140 void TupleHavingStep::run()
141 {
142 if (fInputJobStepAssociation.outSize() == 0)
143 throw logic_error("No input data list for having step.");
144
145 fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
146
147 if (fInputDL == NULL)
148 throw logic_error("Input is not a RowGroup data list.");
149
150 fInputIterator = fInputDL->getIterator();
151
152 if (fDelivery == false)
153 {
154 if (fOutputJobStepAssociation.outSize() == 0)
155 throw logic_error("No output data list for non-delivery having step.");
156
157 fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
158
159 if (fOutputDL == NULL)
160 throw logic_error("Output is not a RowGroup data list.");
161
162 fRunner = jobstepThreadPool.invoke(Runner(this));
163 }
164 }
165
166
join()167 void TupleHavingStep::join()
168 {
169 if (fRunner)
170 jobstepThreadPool.join(fRunner);
171 }
172
173
nextBand(messageqcpp::ByteStream & bs)174 uint32_t TupleHavingStep::nextBand(messageqcpp::ByteStream& bs)
175 {
176 RGData rgDataIn;
177 RGData rgDataOut;
178 bool more = false;
179 uint32_t rowCount = 0;
180
181 try
182 {
183 bs.restart();
184
185 more = fInputDL->next(fInputIterator, &rgDataIn);
186
187 if (dlTimes.FirstReadTime().tv_sec == 0)
188 dlTimes.setFirstReadTime();
189
190 if (!more || cancelled())
191 {
192 fEndOfResult = true;
193 }
194
195 bool emptyRowGroup = true;
196
197 while (more && !fEndOfResult && emptyRowGroup)
198 {
199 if (cancelled())
200 {
201 while (more)
202 more = fInputDL->next(fInputIterator, &rgDataIn);
203
204 break;
205 }
206
207 fRowGroupIn.setData(&rgDataIn);
208 rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
209 fRowGroupOut.setData(&rgDataOut);
210
211 doHavingFilters();
212
213 if (fRowGroupOut.getRowCount() > 0)
214 {
215 emptyRowGroup = false;
216 fRowGroupOut.serializeRGData(bs);
217 rowCount = fRowGroupOut.getRowCount();
218 }
219 else
220 {
221 more = fInputDL->next(fInputIterator, &rgDataIn);
222 }
223 }
224
225 if (!more)
226 {
227 fEndOfResult = true;
228 }
229 }
230 catch (...)
231 {
232 handleException(std::current_exception(),
233 logging::tupleHavingStepErr,
234 logging::ERR_ALWAYS_CRITICAL,
235 "TupleHavingStep::nextBand()");
236 while (more)
237 more = fInputDL->next(fInputIterator, &rgDataIn);
238
239 fEndOfResult = true;
240 }
241
242 if (fEndOfResult)
243 {
244 // send an empty / error band
245 rgDataOut.reinit(fRowGroupOut, 0);
246 fRowGroupOut.setData(&rgDataOut);
247 fRowGroupOut.resetRowGroup(0);
248 fRowGroupOut.setStatus(status());
249 fRowGroupOut.serializeRGData(bs);
250
251 dlTimes.setLastReadTime();
252 dlTimes.setEndOfInputTime();
253
254 if (traceOn())
255 printCalTrace();
256 }
257
258 return rowCount;
259 }
260
261
execute()262 void TupleHavingStep::execute()
263 {
264 RGData rgDataIn;
265 RGData rgDataOut;
266 bool more = false;
267 StepTeleStats sts;
268 sts.query_uuid = fQueryUuid;
269 sts.step_uuid = fStepUuid;
270
271 try
272 {
273 more = fInputDL->next(fInputIterator, &rgDataIn);
274 dlTimes.setFirstReadTime();
275
276 sts.msg_type = StepTeleStats::ST_START;
277 sts.total_units_of_work = 1;
278 postStepStartTele(sts);
279
280 if (!more && cancelled())
281 {
282 fEndOfResult = true;
283 }
284
285 while (more && !fEndOfResult)
286 {
287 fRowGroupIn.setData(&rgDataIn);
288 rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
289 fRowGroupOut.setData(&rgDataOut);
290
291 doHavingFilters();
292
293 more = fInputDL->next(fInputIterator, &rgDataIn);
294
295 if (cancelled())
296 {
297 fEndOfResult = true;
298 }
299 else
300 {
301 fOutputDL->insert(rgDataOut);
302 }
303 }
304 }
305 catch (...)
306 {
307 handleException(std::current_exception(),
308 logging::tupleHavingStepErr,
309 logging::ERR_ALWAYS_CRITICAL,
310 "TupleHavingStep::nextBand()");
311 }
312
313 while (more)
314 more = fInputDL->next(fInputIterator, &rgDataIn);
315
316 fEndOfResult = true;
317 fOutputDL->endOfInput();
318
319 sts.msg_type = StepTeleStats::ST_SUMMARY;
320 sts.total_units_of_work = sts.units_of_work_completed = 1;
321 sts.rows = fRowsReturned;
322 postStepSummaryTele(sts);
323
324 dlTimes.setLastReadTime();
325 dlTimes.setEndOfInputTime();
326
327 if (traceOn())
328 printCalTrace();
329 }
330
331
doHavingFilters()332 void TupleHavingStep::doHavingFilters()
333 {
334 fRowGroupIn.getRow(0, &fRowIn);
335 fRowGroupOut.getRow(0, &fRowOut);
336 fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
337
338 for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
339 {
340 if (fFeInstance->evaluate(fRowIn, fExpressionFilter))
341 {
342 copyRow(fRowIn, &fRowOut);
343 fRowGroupOut.incRowCount();
344 fRowOut.nextRow();
345 }
346
347 fRowIn.nextRow();
348 }
349
350 fRowsReturned += fRowGroupOut.getRowCount();
351 }
352
353
getOutputRowGroup() const354 const RowGroup& TupleHavingStep::getOutputRowGroup() const
355 {
356 return fRowGroupOut;
357 }
358
359
getDeliveredRowGroup() const360 const RowGroup& TupleHavingStep::getDeliveredRowGroup() const
361 {
362 return fRowGroupOut;
363 }
364
365
deliverStringTableRowGroup(bool b)366 void TupleHavingStep::deliverStringTableRowGroup(bool b)
367 {
368 fRowGroupOut.setUseStringTable(b);
369 }
370
371
deliverStringTableRowGroup() const372 bool TupleHavingStep::deliverStringTableRowGroup() const
373 {
374 return fRowGroupOut.usesStringTable();
375 }
376
377
toString() const378 const string TupleHavingStep::toString() const
379 {
380 ostringstream oss;
381 oss << "HavingStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
382
383 oss << " in:";
384
385 for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
386 oss << fInputJobStepAssociation.outAt(i);
387
388 oss << " out:";
389
390 for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
391 oss << fOutputJobStepAssociation.outAt(i);
392
393 oss << endl;
394
395 return oss.str();
396 }
397
398
printCalTrace()399 void TupleHavingStep::printCalTrace()
400 {
401 time_t t = time (0);
402 char timeString[50];
403 ctime_r (&t, timeString);
404 timeString[strlen (timeString ) - 1] = '\0';
405 ostringstream logStr;
406 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
407 << "; total rows returned-" << fRowsReturned << endl
408 << "\t1st read " << dlTimes.FirstReadTimeString()
409 << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
410 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
411 << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
412 << "\tJob completion status " << status() << endl;
413 logEnd(logStr.str().c_str());
414 fExtendedInfo += logStr.str();
415 formatMiniStats();
416 }
417
418
formatMiniStats()419 void TupleHavingStep::formatMiniStats()
420 {
421 fMiniInfo += "THS ";
422 fMiniInfo += "UM ";
423 fMiniInfo += "- ";
424 fMiniInfo += "- ";
425 fMiniInfo += "- ";
426 fMiniInfo += "- ";
427 fMiniInfo += "- ";
428 fMiniInfo += "- ";
429 fMiniInfo += JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) + " ";
430 fMiniInfo += "- ";
431 }
432
433
434 } //namespace
435 // vim:ts=4 sw=4:
436
437