1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 //  $Id: tuplehavingstep.cpp 9709 2013-07-20 06:08:46Z xlou $
19 
20 
21 //#define NDEBUG
22 #include <cassert>
23 #include <sstream>
24 #include <iomanip>
25 using namespace std;
26 
27 #include <boost/shared_ptr.hpp>
28 #include <boost/shared_array.hpp>
29 #include <boost/uuid/uuid_io.hpp>
30 using namespace boost;
31 
32 #include "messagequeue.h"
33 using namespace messageqcpp;
34 
35 #include "loggingid.h"
36 #include "errorcodes.h"
37 using namespace logging;
38 
39 #include "calpontsystemcatalog.h"
40 #include "aggregatecolumn.h"
41 #include "constantcolumn.h"
42 #include "simplecolumn.h"
43 using namespace execplan;
44 
45 #include "jobstep.h"
46 #include "rowgroup.h"
47 using namespace rowgroup;
48 
49 #include "querytele.h"
50 using namespace querytele;
51 
52 #include "funcexp.h"
53 
54 #include "jlf_common.h"
55 #include "tuplehavingstep.h"
56 
57 
58 namespace joblist
59 {
60 
TupleHavingStep(const JobInfo & jobInfo)61 TupleHavingStep::TupleHavingStep(const JobInfo& jobInfo) :
62     ExpressionStep(jobInfo),
63     fInputDL(NULL),
64     fOutputDL(NULL),
65     fInputIterator(0),
66     fRunner(0),
67     fRowsReturned(0),
68     fEndOfResult(false),
69     fFeInstance(funcexp::FuncExp::instance())
70 {
71     fExtendedInfo = "HVS: ";
72     fQtc.stepParms().stepType = StepTeleStats::T_HVS;
73 }
74 
75 
~TupleHavingStep()76 TupleHavingStep::~TupleHavingStep()
77 {
78 }
79 
80 
setOutputRowGroup(const rowgroup::RowGroup & rg)81 void TupleHavingStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
82 {
83     throw runtime_error("Disabled, use initialize() to set output RowGroup.");
84 }
85 
86 
initialize(const RowGroup & rgIn,const JobInfo & jobInfo)87 void TupleHavingStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
88 {
89     fRowGroupIn = rgIn;
90     fRowGroupIn.initRow(&fRowIn);
91 
92     map<uint32_t, uint32_t> keyToIndexMap;
93 
94     for (uint64_t i = 0; i < fRowGroupIn.getKeys().size(); ++i)
95         if (keyToIndexMap.find(fRowGroupIn.getKeys()[i]) == keyToIndexMap.end())
96             keyToIndexMap.insert(make_pair(fRowGroupIn.getKeys()[i], i));
97 
98     updateInputIndex(keyToIndexMap, jobInfo);
99 
100     vector<uint32_t> oids, oidsIn = fRowGroupIn.getOIDs();
101     vector<uint32_t> keys, keysIn = fRowGroupIn.getKeys();
102     vector<uint32_t> scale, scaleIn = fRowGroupIn.getScale();
103     vector<uint32_t> precision, precisionIn = fRowGroupIn.getPrecision();
104     vector<CalpontSystemCatalog::ColDataType> types, typesIn = fRowGroupIn.getColTypes();
105     vector<uint32_t> csNums, csNumsIn = fRowGroupIn.getCharsetNumbers();
106     vector<uint32_t> pos, posIn = fRowGroupIn.getOffsets();
107 
108     size_t n = 0;
109     RetColsVector::const_iterator i = jobInfo.deliveredCols.begin();
110 
111     while (i != jobInfo.deliveredCols.end())
112         if (NULL == dynamic_cast<const ConstantColumn*>(i++->get()))
113             n++;
114 
115     oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + n);
116     keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + n);
117     scale.insert(scale.end(), scaleIn.begin(), scaleIn.begin() + n);
118     precision.insert(precision.end(), precisionIn.begin(), precisionIn.begin() + n);
119     types.insert(types.end(), typesIn.begin(), typesIn.begin() + n);
120     csNums.insert(csNums.end(), csNumsIn.begin(), csNumsIn.begin() + n);
121     pos.insert(pos.end(), posIn.begin(), posIn.begin() + n + 1);
122 
123     fRowGroupOut = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
124     fRowGroupOut.initRow(&fRowOut);
125 }
126 
127 
expressionFilter(const ParseTree * filter,JobInfo & jobInfo)128 void TupleHavingStep::expressionFilter(const ParseTree* filter, JobInfo& jobInfo)
129 {
130     // let base class handle the simple columns
131     ExpressionStep::expressionFilter(filter, jobInfo);
132 
133     // extract simple columns from parse tree
134     vector<AggregateColumn*> acv;
135     fExpressionFilter->walk(getAggCols, &acv);
136     fColumns.insert(fColumns.end(), acv.begin(), acv.end());
137 }
138 
139 
run()140 void TupleHavingStep::run()
141 {
142     if (fInputJobStepAssociation.outSize() == 0)
143         throw logic_error("No input data list for having step.");
144 
145     fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
146 
147     if (fInputDL == NULL)
148         throw logic_error("Input is not a RowGroup data list.");
149 
150     fInputIterator = fInputDL->getIterator();
151 
152     if (fDelivery == false)
153     {
154         if (fOutputJobStepAssociation.outSize() == 0)
155             throw logic_error("No output data list for non-delivery having step.");
156 
157         fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
158 
159         if (fOutputDL == NULL)
160             throw logic_error("Output is not a RowGroup data list.");
161 
162         fRunner = jobstepThreadPool.invoke(Runner(this));
163     }
164 }
165 
166 
join()167 void TupleHavingStep::join()
168 {
169     if (fRunner)
170         jobstepThreadPool.join(fRunner);
171 }
172 
173 
nextBand(messageqcpp::ByteStream & bs)174 uint32_t TupleHavingStep::nextBand(messageqcpp::ByteStream& bs)
175 {
176     RGData rgDataIn;
177     RGData rgDataOut;
178     bool more = false;
179     uint32_t rowCount = 0;
180 
181     try
182     {
183         bs.restart();
184 
185         more = fInputDL->next(fInputIterator, &rgDataIn);
186 
187         if (dlTimes.FirstReadTime().tv_sec == 0)
188             dlTimes.setFirstReadTime();
189 
190         if (!more || cancelled())
191         {
192             fEndOfResult = true;
193         }
194 
195         bool emptyRowGroup = true;
196 
197         while (more && !fEndOfResult && emptyRowGroup)
198         {
199             if (cancelled())
200             {
201                 while (more)
202                     more = fInputDL->next(fInputIterator, &rgDataIn);
203 
204                 break;
205             }
206 
207             fRowGroupIn.setData(&rgDataIn);
208             rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
209             fRowGroupOut.setData(&rgDataOut);
210 
211             doHavingFilters();
212 
213             if (fRowGroupOut.getRowCount() > 0)
214             {
215                 emptyRowGroup = false;
216                 fRowGroupOut.serializeRGData(bs);
217                 rowCount = fRowGroupOut.getRowCount();
218             }
219             else
220             {
221                 more = fInputDL->next(fInputIterator, &rgDataIn);
222             }
223         }
224 
225         if (!more)
226         {
227             fEndOfResult = true;
228         }
229     }
230     catch (...)
231     {
232         handleException(std::current_exception(),
233                         logging::tupleHavingStepErr,
234                         logging::ERR_ALWAYS_CRITICAL,
235                         "TupleHavingStep::nextBand()");
236          while (more)
237             more = fInputDL->next(fInputIterator, &rgDataIn);
238 
239         fEndOfResult = true;
240     }
241 
242     if (fEndOfResult)
243     {
244         // send an empty / error band
245         rgDataOut.reinit(fRowGroupOut, 0);
246         fRowGroupOut.setData(&rgDataOut);
247         fRowGroupOut.resetRowGroup(0);
248         fRowGroupOut.setStatus(status());
249         fRowGroupOut.serializeRGData(bs);
250 
251         dlTimes.setLastReadTime();
252         dlTimes.setEndOfInputTime();
253 
254         if (traceOn())
255             printCalTrace();
256     }
257 
258     return rowCount;
259 }
260 
261 
execute()262 void TupleHavingStep::execute()
263 {
264     RGData rgDataIn;
265     RGData rgDataOut;
266     bool more = false;
267     StepTeleStats sts;
268     sts.query_uuid = fQueryUuid;
269     sts.step_uuid = fStepUuid;
270 
271     try
272     {
273         more = fInputDL->next(fInputIterator, &rgDataIn);
274         dlTimes.setFirstReadTime();
275 
276         sts.msg_type = StepTeleStats::ST_START;
277         sts.total_units_of_work = 1;
278         postStepStartTele(sts);
279 
280         if (!more && cancelled())
281         {
282             fEndOfResult = true;
283         }
284 
285         while (more && !fEndOfResult)
286         {
287             fRowGroupIn.setData(&rgDataIn);
288             rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
289             fRowGroupOut.setData(&rgDataOut);
290 
291             doHavingFilters();
292 
293             more = fInputDL->next(fInputIterator, &rgDataIn);
294 
295             if (cancelled())
296             {
297                 fEndOfResult = true;
298             }
299             else
300             {
301                 fOutputDL->insert(rgDataOut);
302             }
303         }
304     }
305     catch (...)
306     {
307         handleException(std::current_exception(),
308                         logging::tupleHavingStepErr,
309                         logging::ERR_ALWAYS_CRITICAL,
310                         "TupleHavingStep::nextBand()");
311     }
312 
313     while (more)
314         more = fInputDL->next(fInputIterator, &rgDataIn);
315 
316     fEndOfResult = true;
317     fOutputDL->endOfInput();
318 
319     sts.msg_type = StepTeleStats::ST_SUMMARY;
320     sts.total_units_of_work = sts.units_of_work_completed = 1;
321     sts.rows = fRowsReturned;
322     postStepSummaryTele(sts);
323 
324     dlTimes.setLastReadTime();
325     dlTimes.setEndOfInputTime();
326 
327     if (traceOn())
328         printCalTrace();
329 }
330 
331 
doHavingFilters()332 void TupleHavingStep::doHavingFilters()
333 {
334     fRowGroupIn.getRow(0, &fRowIn);
335     fRowGroupOut.getRow(0, &fRowOut);
336     fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
337 
338     for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
339     {
340         if (fFeInstance->evaluate(fRowIn, fExpressionFilter))
341         {
342             copyRow(fRowIn, &fRowOut);
343             fRowGroupOut.incRowCount();
344             fRowOut.nextRow();
345         }
346 
347         fRowIn.nextRow();
348     }
349 
350     fRowsReturned += fRowGroupOut.getRowCount();
351 }
352 
353 
getOutputRowGroup() const354 const RowGroup& TupleHavingStep::getOutputRowGroup() const
355 {
356     return fRowGroupOut;
357 }
358 
359 
getDeliveredRowGroup() const360 const RowGroup& TupleHavingStep::getDeliveredRowGroup() const
361 {
362     return fRowGroupOut;
363 }
364 
365 
deliverStringTableRowGroup(bool b)366 void TupleHavingStep::deliverStringTableRowGroup(bool b)
367 {
368     fRowGroupOut.setUseStringTable(b);
369 }
370 
371 
deliverStringTableRowGroup() const372 bool TupleHavingStep::deliverStringTableRowGroup() const
373 {
374     return fRowGroupOut.usesStringTable();
375 }
376 
377 
toString() const378 const string TupleHavingStep::toString() const
379 {
380     ostringstream oss;
381     oss << "HavingStep   ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
382 
383     oss << " in:";
384 
385     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
386         oss << fInputJobStepAssociation.outAt(i);
387 
388     oss << " out:";
389 
390     for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
391         oss << fOutputJobStepAssociation.outAt(i);
392 
393     oss << endl;
394 
395     return oss.str();
396 }
397 
398 
printCalTrace()399 void TupleHavingStep::printCalTrace()
400 {
401     time_t t = time (0);
402     char timeString[50];
403     ctime_r (&t, timeString);
404     timeString[strlen (timeString ) - 1] = '\0';
405     ostringstream logStr;
406     logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
407             << "; total rows returned-" << fRowsReturned << endl
408             << "\t1st read " << dlTimes.FirstReadTimeString()
409             << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
410             << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
411             << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
412             << "\tJob completion status " << status() << endl;
413     logEnd(logStr.str().c_str());
414     fExtendedInfo += logStr.str();
415     formatMiniStats();
416 }
417 
418 
formatMiniStats()419 void TupleHavingStep::formatMiniStats()
420 {
421     fMiniInfo += "THS ";
422     fMiniInfo += "UM ";
423     fMiniInfo += "- ";
424     fMiniInfo += "- ";
425     fMiniInfo += "- ";
426     fMiniInfo += "- ";
427     fMiniInfo += "- ";
428     fMiniInfo += "- ";
429     fMiniInfo += JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) + " ";
430     fMiniInfo += "- ";
431 }
432 
433 
434 }   //namespace
435 // vim:ts=4 sw=4:
436 
437