1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 //  $Id: crossenginestep.cpp 9709 2013-07-20 06:08:46Z xlou $
19 
20 #include "crossenginestep.h"
#include <unistd.h>
//#define NDEBUG
#include <cassert>
#include <cstring>
#include <sstream>
#include <iomanip>
using namespace std;
27 
28 #include <boost/shared_ptr.hpp>
29 #include <boost/shared_array.hpp>
30 #include <boost/uuid/uuid_io.hpp>
31 using namespace boost;
32 
33 #include "messagequeue.h"
34 using namespace messageqcpp;
35 
36 #include "loggingid.h"
37 #include "errorcodes.h"
38 #include "idberrorinfo.h"
39 #include "errorids.h"
40 using namespace logging;
41 
42 #include "calpontsystemcatalog.h"
43 #include "constantcolumn.h"
44 using namespace execplan;
45 
46 #include "rowgroup.h"
47 using namespace rowgroup;
48 
49 #include "dataconvert.h"
50 using namespace dataconvert;
51 
52 #include "querytele.h"
53 using namespace querytele;
54 
55 #include "funcexp.h"
56 
57 #include "jobstep.h"
58 #include "jlf_common.h"
59 
60 #include "libmysql_client.h"
61 
62 namespace joblist
63 {
64 
CrossEngineStep(const std::string & schema,const std::string & table,const std::string & alias,const JobInfo & jobInfo)65 CrossEngineStep::CrossEngineStep(
66     const std::string& schema,
67     const std::string& table,
68     const std::string& alias,
69     const JobInfo& jobInfo) :
70     BatchPrimitive(jobInfo),
71     fRowsRetrieved(0),
72     fRowsReturned(0),
73     fRowsPerGroup(256),
74     fOutputDL(NULL),
75     fOutputIterator(0),
76     fRunner(0),
77     fEndOfResult(false),
78     fSchema(schema),
79     fTable(table),
80     fAlias(alias),
81     fColumnCount(0),
82     fFeInstance(funcexp::FuncExp::instance())
83 {
84     fExtendedInfo = "CES: ";
85     getMysqldInfo(jobInfo);
86     fQtc.stepParms().stepType = StepTeleStats::T_CES;
87     mysql = new utils::LibMySQL();
88 }
89 
90 
~CrossEngineStep()91 CrossEngineStep::~CrossEngineStep()
92 {
93     delete mysql;
94 }
95 
96 
setOutputRowGroup(const rowgroup::RowGroup & rg)97 void CrossEngineStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
98 {
99     fRowGroupOut = fRowGroupDelivered = fRowGroupAdded = rg;
100 }
101 
102 
deliverStringTableRowGroup(bool b)103 void CrossEngineStep::deliverStringTableRowGroup(bool b)
104 {
105     // results are either using setField, or mapping to delivered row group
106     fRowGroupDelivered.setUseStringTable(b);
107 }
108 
109 
deliverStringTableRowGroup() const110 bool CrossEngineStep::deliverStringTableRowGroup() const
111 {
112     return fRowGroupDelivered.usesStringTable();
113 }
114 
115 
addFcnJoinExp(const std::vector<execplan::SRCP> & fe)116 void CrossEngineStep::addFcnJoinExp(const std::vector<execplan::SRCP>& fe)
117 {
118     fFeFcnJoin = fe;
119 }
120 
121 
addFcnExpGroup1(const boost::shared_ptr<ParseTree> & fe)122 void CrossEngineStep::addFcnExpGroup1(const boost::shared_ptr<ParseTree>& fe)
123 {
124     fFeFilters.push_back(fe);
125 }
126 
127 
setFE1Input(const rowgroup::RowGroup & rg)128 void CrossEngineStep::setFE1Input(const rowgroup::RowGroup& rg)
129 {
130     fRowGroupFe1 = rg;
131 }
132 
133 
setFcnExpGroup3(const std::vector<execplan::SRCP> & fe)134 void CrossEngineStep::setFcnExpGroup3(const std::vector<execplan::SRCP>& fe)
135 {
136     fFeSelects = fe;
137 }
138 
139 
setFE23Output(const rowgroup::RowGroup & rg)140 void CrossEngineStep::setFE23Output(const rowgroup::RowGroup& rg)
141 {
142     fRowGroupFe3 = fRowGroupDelivered = fRowGroupAdded = rg;
143 }
144 
145 
makeMappings()146 void CrossEngineStep::makeMappings()
147 {
148     fFe1Column.reset(new int[fColumnCount]);
149 
150     for (uint64_t i = 0; i < fColumnCount; ++i)
151         fFe1Column[i] = -1;
152 
153     if (fFeFilters.size() > 0 || fFeFcnJoin.size() > 0)
154     {
155         const std::vector<uint32_t>& colInFe1 = fRowGroupFe1.getKeys();
156 
157         for (uint64_t i = 0; i < colInFe1.size(); i++)
158         {
159             map<uint32_t, uint32_t>::iterator it = fColumnMap.find(colInFe1[i]);
160 
161             if (it != fColumnMap.end())
162                 fFe1Column[it->second] = i;
163         }
164 
165         fFeMapping1 = makeMapping(fRowGroupFe1, fRowGroupOut);
166     }
167 
168     if (!fFeSelects.empty())
169         fFeMapping3 = makeMapping(fRowGroupOut, fRowGroupFe3);
170 }
171 
172 
setField(int i,const char * value,unsigned long length,MYSQL_FIELD * field,Row & row)173 void CrossEngineStep::setField(int i, const char* value, unsigned long length, MYSQL_FIELD* field, Row& row)
174 {
175     CalpontSystemCatalog::ColDataType colType = row.getColType(i);
176 
177     if ((colType == CalpontSystemCatalog::CHAR || colType == CalpontSystemCatalog::VARCHAR) &&
178             row.getColumnWidth(i) > 8)
179     {
180         if (value != NULL)
181             row.setStringField(value, i);
182         else
183             row.setStringField("", i);
184     }
185     else if ((colType == CalpontSystemCatalog::BLOB) ||
186              (colType == CalpontSystemCatalog::TEXT) ||
187              (colType == CalpontSystemCatalog::VARBINARY))
188     {
189         if (value != NULL)
190             row.setVarBinaryField((const uint8_t*)value, length, i);
191         else
192             row.setVarBinaryField(NULL, 0, i);
193     }
194     else
195     {
196         CalpontSystemCatalog::ColType ct;
197         ct.colDataType = colType;
198         ct.colWidth = row.getColumnWidth(i);
199 
200         if (colType == CalpontSystemCatalog::DECIMAL)
201         {
202             ct.scale = field->decimals;
203             ct.precision = field->length;
204         }
205         else
206         {
207             ct.scale = row.getScale(i);
208             ct.precision = row.getPrecision(i);
209         }
210 
211         row.setIntField(convertValueNum(value, ct, row.getSignedNullValue(i)), i);
212     }
213 }
214 
215 
addRow(RGData & data)216 inline void CrossEngineStep::addRow(RGData& data)
217 {
218     fRowDelivered.setRid(fRowsReturned % fRowsPerGroup);
219     fRowDelivered.nextRow();
220     fRowGroupAdded.incRowCount();
221 
222     if (++fRowsReturned % fRowsPerGroup == 0)
223     {
224         fOutputDL->insert(data);
225         data.reinit(fRowGroupAdded, fRowsPerGroup);
226         fRowGroupAdded.setData(&data);
227         fRowGroupAdded.resetRowGroup(fRowsReturned);
228         fRowGroupAdded.getRow(0, &fRowDelivered);
229     }
230 }
231 
232 
233 // simplified version of convertValueNum() in jlf_execplantojoblist.cpp.
convertValueNum(const char * str,const CalpontSystemCatalog::ColType & ct,int64_t nullValue)234 int64_t CrossEngineStep::convertValueNum(
235     const char* str, const CalpontSystemCatalog::ColType& ct, int64_t nullValue)
236 {
237     // return value
238     int64_t rv = nullValue;
239 
240     // null value
241     if (str == NULL)
242         return rv;
243 
244     // convertColumnData(execplan::CalpontSystemCatalog::ColType colType,
245     //                   const std::string& dataOrig,
246     //                   bool& pushWarning,
247     //                   bool nulFlag,
248     //                   bool noRoundup )
249     bool pushWarning = false;
250     boost::any anyVal = DataConvert::convertColumnData(ct, str, pushWarning, fTimeZone, false, true, false);
251 
252     // Out of range values are treated as NULL as discussed during design review.
253     if (pushWarning)
254         return rv;
255 
256     // non-null value
257     switch (ct.colDataType)
258     {
259         case CalpontSystemCatalog::BIT:
260             rv = boost::any_cast<bool>(anyVal);
261             break;
262 
263         case CalpontSystemCatalog::TINYINT:
264             rv = boost::any_cast<char>(anyVal);
265             break;
266 
267         case CalpontSystemCatalog::UTINYINT:
268             rv = boost::any_cast<uint8_t>(anyVal);
269             break;
270 
271         case CalpontSystemCatalog::SMALLINT:
272             rv = boost::any_cast<int16_t>(anyVal);
273             break;
274 
275         case CalpontSystemCatalog::USMALLINT:
276             rv = boost::any_cast<uint16_t>(anyVal);
277             break;
278 
279         case CalpontSystemCatalog::MEDINT:
280         case CalpontSystemCatalog::INT:
281 #ifdef _MSC_VER
282             rv = boost::any_cast<int>(anyVal);
283 #else
284             rv = boost::any_cast<int32_t>(anyVal);
285 #endif
286             break;
287 
288         case CalpontSystemCatalog::UMEDINT:
289         case CalpontSystemCatalog::UINT:
290             rv = boost::any_cast<uint32_t>(anyVal);
291             break;
292 
293         case CalpontSystemCatalog::BIGINT:
294             rv = boost::any_cast<long long>(anyVal);
295             break;
296 
297         case CalpontSystemCatalog::UBIGINT:
298             rv = boost::any_cast<uint64_t>(anyVal);
299             break;
300 
301         case CalpontSystemCatalog::FLOAT:
302         case CalpontSystemCatalog::UFLOAT:
303         {
304             float f = boost::any_cast<float>(anyVal);
305 
306             //N.B. There is a bug in boost::any or in gcc where, if you store a nan,
307             //     you will get back a nan, but not necessarily the same bits that you put in.
308             //     This only seems to be for float (double seems to work).
309             if (isnan(f))
310             {
311                 uint32_t ti = joblist::FLOATNULL;
312                 float* tfp = (float*)&ti;
313                 f = *tfp;
314             }
315 
316             float* fp = &f;
317             int32_t* ip = reinterpret_cast<int32_t*>(fp);
318             rv = *ip;
319         }
320         break;
321 
322         case CalpontSystemCatalog::DOUBLE:
323         case CalpontSystemCatalog::UDOUBLE:
324         {
325             double d = boost::any_cast<double>(anyVal);
326             double* dp = &d;
327             int64_t* ip = reinterpret_cast<int64_t*>(dp);
328             rv = *ip;
329         }
330         break;
331 
332         case CalpontSystemCatalog::CHAR:
333         case CalpontSystemCatalog::VARCHAR:
334         case CalpontSystemCatalog::VARBINARY:
335         case CalpontSystemCatalog::BLOB:
336         case CalpontSystemCatalog::TEXT:
337         case CalpontSystemCatalog::CLOB:
338         {
339             std::string i = boost::any_cast<std::string>(anyVal);
340             // bug 1932, pad nulls up to the size of v
341             i.resize(sizeof(rv), 0);
342             rv = *((uint64_t*) i.data());
343         }
344         break;
345 
346         case CalpontSystemCatalog::DATE:
347             rv = boost::any_cast<uint32_t>(anyVal);
348             break;
349 
350         case CalpontSystemCatalog::DATETIME:
351             rv = boost::any_cast<uint64_t>(anyVal);
352             break;
353 
354         case CalpontSystemCatalog::TIMESTAMP:
355             rv = boost::any_cast<uint64_t>(anyVal);
356             break;
357 
358         case CalpontSystemCatalog::TIME:
359             rv = boost::any_cast<int64_t>(anyVal);
360             break;
361 
362         case CalpontSystemCatalog::DECIMAL:
363         case CalpontSystemCatalog::UDECIMAL:
364             if (ct.colWidth == CalpontSystemCatalog::ONE_BYTE)
365                 rv = boost::any_cast<char>(anyVal);
366             else if (ct.colWidth == CalpontSystemCatalog::TWO_BYTE)
367                 rv = boost::any_cast<int16_t>(anyVal);
368             else if (ct.colWidth == CalpontSystemCatalog::FOUR_BYTE)
369 #ifdef _MSC_VER
370                 rv = boost::any_cast<int>(anyVal);
371 
372 #else
373                 rv = boost::any_cast<int32_t>(anyVal);
374 #endif
375             else
376                 rv = boost::any_cast<long long>(anyVal);
377 
378             break;
379 
380         default:
381             break;
382     }
383 
384     return rv;
385 }
386 
387 
getMysqldInfo(const JobInfo & jobInfo)388 void CrossEngineStep::getMysqldInfo(const JobInfo& jobInfo)
389 {
390     if (jobInfo.rm->getMysqldInfo(fHost, fUser, fPasswd, fPort) == false)
391         throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_CROSS_ENGINE_CONFIG),
392                         ERR_CROSS_ENGINE_CONFIG);
393 }
394 
395 
run()396 void CrossEngineStep::run()
397 {
398 //	idbassert(!fDelivery);
399 
400     if (fOutputJobStepAssociation.outSize() == 0)
401         throw logic_error("No output data list for cross engine step.");
402 
403     fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
404 
405     if (fOutputDL == NULL)
406         throw logic_error("Output is not a RowGroup data list.");
407 
408     if (fDelivery == true)
409     {
410         fOutputIterator = fOutputDL->getIterator();
411     }
412 
413     fRunner = jobstepThreadPool.invoke(Runner(this));
414 }
415 
416 
join()417 void CrossEngineStep::join()
418 {
419     if (fRunner)
420         jobstepThreadPool.join(fRunner);
421 }
422 
423 
execute()424 void CrossEngineStep::execute()
425 {
426     int ret = 0;
427     StepTeleStats sts;
428     sts.query_uuid = fQueryUuid;
429     sts.step_uuid = fStepUuid;
430 
431     try
432     {
433         sts.msg_type = StepTeleStats::ST_START;
434         sts.total_units_of_work = 1;
435         postStepStartTele(sts);
436 
437         ret = mysql->init(fHost.c_str(), fPort, fUser.c_str(), fPasswd.c_str(), fSchema.c_str());
438 
439         if (ret != 0)
440             mysql->handleMySqlError(mysql->getError().c_str(), ret);
441 
442         std::string query(makeQuery());
443         fLogger->logMessage(logging::LOG_TYPE_INFO, "QUERY to foreign engine: " + query);
444 
445         if (traceOn())
446             cout << "QUERY: " << query << endl;
447 
448         ret = mysql->run(query.c_str());
449 
450         if (ret != 0)
451             mysql->handleMySqlError(mysql->getError().c_str(), ret);
452 
453         int num_fields = mysql->getFieldCount();
454 
455         char** rowIn;                            // input
456         //shared_array<uint8_t> rgDataDelivered;      // output
457         RGData rgDataDelivered;
458         fRowGroupAdded.initRow(&fRowDelivered);
459         // use getDataSize() i/o getMaxDataSize() to make sure there are 8192 rows.
460         rgDataDelivered.reinit(fRowGroupAdded, fRowsPerGroup);
461         fRowGroupAdded.setData(&rgDataDelivered);
462         fRowGroupAdded.resetRowGroup(0);
463         fRowGroupAdded.getRow(0, &fRowDelivered);
464 
465         if (traceOn())
466             dlTimes.setFirstReadTime();
467 
468         // Any functions to evaluate
469         makeMappings();
470         bool doFE1 = ((fFeFcnJoin.size() > 0) || (fFeFilters.size() > 0));
471         bool doFE3 =  (fFeSelects.size() > 0);
472 
473         if (!doFE1 && !doFE3)
474         {
475             while ((rowIn = mysql->nextRow()) && !cancelled())
476             {
477                 for (int i = 0; i < num_fields; i++)
478                     setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), fRowDelivered);
479 
480                 addRow(rgDataDelivered);
481             }
482         }
483 
484         else if (doFE1 && !doFE3)  // FE in WHERE clause only
485         {
486             shared_array<uint8_t> rgDataFe1;  // functions in where clause
487             Row rowFe1;                       // row for fe evaluation
488             fRowGroupFe1.initRow(&rowFe1, true);
489             rgDataFe1.reset(new uint8_t[rowFe1.getSize()]);
490             rowFe1.setData(rgDataFe1.get());
491 
492             while ((rowIn = mysql->nextRow()) && !cancelled())
493             {
494                 // Parse the columns used in FE1 first, the other column may not need be parsed.
495                 for (int i = 0; i < num_fields; i++)
496                 {
497                     if (fFe1Column[i] != -1)
498                         setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe1);
499                 }
500 
501                 if (fFeFilters.size() > 0)
502                 {
503                     bool feBreak = false;
504 
505                     for (std::vector<boost::shared_ptr<execplan::ParseTree> >::iterator it = fFeFilters.begin(); it != fFeFilters.end(); it++)
506                     {
507                         if (fFeInstance->evaluate(rowFe1, (*it).get()) == false)
508                         {
509                             feBreak = true;
510                             break;
511                         }
512                     }
513 
514                     if (feBreak)
515                         continue;
516                 }
517 
518                 // evaluate the FE join column
519                 fFeInstance->evaluate(rowFe1, fFeFcnJoin);
520 
521                 // Pass throug the parsed columns, and parse the remaining columns.
522                 applyMapping(fFeMapping1, rowFe1, &fRowDelivered);
523 
524                 for (int i = 0; i < num_fields; i++)
525                 {
526                     if (fFe1Column[i] == -1)
527                         setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), fRowDelivered);
528                 }
529 
530                 addRow(rgDataDelivered);
531             }
532         }
533 
534         else if (!doFE1 && doFE3)  // FE in SELECT clause only
535         {
536             shared_array<uint8_t> rgDataFe3;  // functions in select clause
537             Row rowFe3;                       // row for fe evaluation
538             fRowGroupOut.initRow(&rowFe3, true);
539             rgDataFe3.reset(new uint8_t[rowFe3.getSize()]);
540             rowFe3.setData(rgDataFe3.get());
541 
542             while ((rowIn = mysql->nextRow()) && !cancelled())
543             {
544                 for (int i = 0; i < num_fields; i++)
545                     setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe3);
546 
547                 fFeInstance->evaluate(rowFe3, fFeSelects);
548 
549                 applyMapping(fFeMapping3, rowFe3, &fRowDelivered);
550 
551                 addRow(rgDataDelivered);
552             }
553         }
554 
555         else  // FE in SELECT clause, FE join and WHERE clause
556         {
557             shared_array<uint8_t> rgDataFe1;  // functions in where clause
558             Row rowFe1;                       // row for fe1 evaluation
559             fRowGroupFe1.initRow(&rowFe1, true);
560             rgDataFe1.reset(new uint8_t[rowFe1.getSize()]);
561             rowFe1.setData(rgDataFe1.get());
562 
563             shared_array<uint8_t> rgDataFe3;  // functions in select clause
564             Row rowFe3;                       // row for fe3 evaluation
565             fRowGroupOut.initRow(&rowFe3, true);
566             rgDataFe3.reset(new uint8_t[rowFe3.getSize()]);
567             rowFe3.setData(rgDataFe3.get());
568 
569             while ((rowIn = mysql->nextRow()) && !cancelled())
570             {
571                 // Parse the columns used in FE1 first, the other column may not need be parsed.
572                 for (int i = 0; i < num_fields; i++)
573                 {
574                     if (fFe1Column[i] != -1)
575                         setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe1);
576                 }
577 
578                 if (fFeFilters.size() > 0)
579                 {
580                     bool feBreak = false;
581 
582                     for (std::vector<boost::shared_ptr<execplan::ParseTree> >::iterator it = fFeFilters.begin(); it != fFeFilters.end(); it++)
583                     {
584                         if (fFeInstance->evaluate(rowFe1, (*it).get()) == false)
585                         {
586                             feBreak = true;
587                             break;
588                         }
589                     }
590 
591                     if (feBreak)
592                         continue;
593                 }
594 
595                 // evaluate the FE join column
596                 fFeInstance->evaluate(rowFe1, fFeFcnJoin);
597 
598                 // Pass throug the parsed columns, and parse the remaining columns.
599                 applyMapping(fFeMapping1, rowFe1, &rowFe3);
600 
601                 for (int i = 0; i < num_fields; i++)
602                 {
603                     if (fFe1Column[i] == -1)
604                         setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe3);
605                 }
606 
607                 fFeInstance->evaluate(rowFe3, fFeSelects);
608                 applyMapping(fFeMapping3, rowFe3, &fRowDelivered);
609 
610                 addRow(rgDataDelivered);
611             }
612         }
613 
614         //INSERT_ADAPTER(fOutputDL, rgDataDelivered);
615         fOutputDL->insert(rgDataDelivered);
616         fRowsRetrieved = mysql->getRowCount();
617     }
618     catch (...)
619     {
620         handleException(std::current_exception(),
621                         logging::ERR_CROSS_ENGINE_CONNECT,
622                         logging::ERR_ALWAYS_CRITICAL,
623                         "CrossEngineStep::execute()");
624     }
625 
626     sts.msg_type = StepTeleStats::ST_SUMMARY;
627     sts.total_units_of_work = sts.units_of_work_completed = 1;
628     sts.rows = fRowsReturned;
629     postStepSummaryTele(sts);
630 
631     fEndOfResult = true;
632     fOutputDL->endOfInput();
633 
634     // Bug 3136, let mini stats to be formatted if traceOn.
635     if (traceOn())
636     {
637         dlTimes.setLastReadTime();
638         dlTimes.setEndOfInputTime();
639         printCalTrace();
640     }
641 }
642 
643 
setBPP(JobStep * jobStep)644 void CrossEngineStep::setBPP(JobStep* jobStep)
645 {
646     pColStep* pcs = dynamic_cast<pColStep*>(jobStep);
647     pColScanStep* pcss = NULL;
648     pDictionaryStep* pds = NULL;
649     pDictionaryScan* pdss = NULL;
650     FilterStep* fs = NULL;
651     std::string bop = " AND ";
652 
653     if (pcs != 0)
654     {
655         if (dynamic_cast<PseudoColStep*>(pcs) != NULL)
656             throw logic_error("No Psedo Column for foreign engine.");
657 
658         if (pcs->BOP() == BOP_OR)
659             bop = " OR ";
660 
661         addFilterStr(pcs->getFilters(), bop);
662     }
663     else if ((pcss = dynamic_cast<pColScanStep*>(jobStep)) != NULL)
664     {
665         if (pcss->BOP() == BOP_OR)
666             bop = " OR ";
667 
668         addFilterStr(pcss->getFilters(), bop);
669     }
670     else if ((pds = dynamic_cast<pDictionaryStep*>(jobStep)) != NULL)
671     {
672         if (pds->BOP() == BOP_OR)
673             bop = " OR ";
674 
675         addFilterStr(pds->getFilters(), bop);
676     }
677     else if ((pdss = dynamic_cast<pDictionaryScan*>(jobStep)) != NULL)
678     {
679         if (pds->BOP() == BOP_OR)
680             bop = " OR ";
681 
682         addFilterStr(pdss->getFilters(), bop);
683     }
684     else if ((fs = dynamic_cast<FilterStep*>(jobStep)) != NULL)
685     {
686         addFilterStr(fs->getFilters(), bop);
687     }
688 }
689 
addFilterStr(const std::vector<const Filter * > & f,const std::string & bop)690 void CrossEngineStep::addFilterStr(const std::vector<const Filter*>& f, const std::string& bop)
691 {
692     if (f.size() == 0)
693         return;
694 
695     std::string filterStr;
696 
697     for (uint64_t i = 0; i < f.size(); i++)
698     {
699         if (f[i]->data().empty())
700             continue;
701 
702         if (!filterStr.empty())
703             filterStr += bop;
704 
705         filterStr += f[i]->data();
706     }
707 
708     if (!filterStr.empty())
709     {
710         if (!fWhereClause.empty())
711             fWhereClause += " AND (" + filterStr + ")";
712         else
713             fWhereClause += " WHERE (" + filterStr + ")";
714     }
715 }
716 
717 
setProjectBPP(JobStep * jobStep1,JobStep *)718 void CrossEngineStep::setProjectBPP(JobStep* jobStep1, JobStep*)
719 {
720     fColumnMap[jobStep1->tupleId()] = fColumnCount++;
721 
722     if (!fSelectClause.empty())
723         fSelectClause += ", ";
724     else
725         fSelectClause += "SELECT ";
726 
727     fSelectClause += "`" + jobStep1->name() + "`";
728 }
729 
730 
makeQuery()731 std::string CrossEngineStep::makeQuery()
732 {
733     ostringstream oss;
734     oss << fSelectClause << " FROM `" << fTable << "`";
735 
736     if (fTable.compare(fAlias) != 0)
737         oss << " `" << fAlias << "`";
738 
739     if (!fWhereClause.empty())
740         oss << fWhereClause;
741 
742     // the std::string must consist of a single SQL statement without a terminating semicolon ; or \g.
743     // oss << ";";
744     return oss.str();
745 }
746 
getOutputRowGroup() const747 const RowGroup& CrossEngineStep::getOutputRowGroup() const
748 {
749     return fRowGroupOut;
750 }
751 
752 
getDeliveredRowGroup() const753 const RowGroup& CrossEngineStep::getDeliveredRowGroup() const
754 {
755     return fRowGroupDelivered;
756 }
757 
758 
nextBand(messageqcpp::ByteStream & bs)759 uint32_t CrossEngineStep::nextBand(messageqcpp::ByteStream& bs)
760 {
761     //shared_array<uint8_t> rgDataOut;
762     RGData rgDataOut;
763     bool more = false;
764     uint32_t rowCount = 0;
765 
766     try
767     {
768         bs.restart();
769 
770         more = fOutputDL->next(fOutputIterator, &rgDataOut);
771 
772         if (traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
773             dlTimes.setFirstReadTime();
774 
775         if (more && !cancelled())
776         {
777             fRowGroupDelivered.setData(&rgDataOut);
778             fRowGroupDelivered.serializeRGData(bs);
779             rowCount = fRowGroupDelivered.getRowCount();
780         }
781         else
782         {
783             while (more)
784                 more = fOutputDL->next(fOutputIterator, &rgDataOut);
785 
786             fEndOfResult = true;
787         }
788     }
789     catch (...)
790     {
791         handleException(std::current_exception(),
792                         logging::ERR_IN_DELIVERY,
793                         logging::ERR_ALWAYS_CRITICAL,
794                         "CrossEngineStep::nextBand()");
795         while (more)
796             more = fOutputDL->next(fOutputIterator, &rgDataOut);
797 
798         fEndOfResult = true;
799     }
800 
801     if (fEndOfResult)
802     {
803         // send an empty / error band
804         rgDataOut.reinit(fRowGroupDelivered, 0);
805         fRowGroupDelivered.setData(&rgDataOut);
806         fRowGroupDelivered.resetRowGroup(0);
807         fRowGroupDelivered.setStatus(status());
808         fRowGroupDelivered.serializeRGData(bs);
809 
810         if (traceOn())
811         {
812             dlTimes.setLastReadTime();
813             dlTimes.setEndOfInputTime();
814         }
815 
816         if (traceOn())
817             printCalTrace();
818     }
819 
820     return rowCount;
821 }
822 
823 
toString() const824 const std::string CrossEngineStep::toString() const
825 {
826     ostringstream oss;
827     oss << "CrossEngineStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
828 
829     oss << " in:";
830 
831     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
832         oss << fInputJobStepAssociation.outAt(i);
833 
834     oss << " out:";
835 
836     for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
837         oss << fOutputJobStepAssociation.outAt(i);
838 
839     oss << endl;
840 
841     return oss.str();
842 }
843 
844 
printCalTrace()845 void CrossEngineStep::printCalTrace()
846 {
847     time_t t = time (0);
848     char timeString[50];
849     ctime_r (&t, timeString);
850     timeString[strlen (timeString ) - 1] = '\0';
851     ostringstream logStr;
852     logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
853             << "; rows retrieved-" << fRowsRetrieved
854             << "; total rows returned-" << fRowsReturned << endl
855             << "\t1st read " << dlTimes.FirstReadTimeString()
856             << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
857             << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
858             << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
859             << "\tJob completion status " << status() << endl;
860     logEnd(logStr.str().c_str());
861     fExtendedInfo += logStr.str();
862     formatMiniStats();
863 }
864 
865 
formatMiniStats()866 void CrossEngineStep::formatMiniStats()
867 {
868     ostringstream oss;
869     oss << "CES "
870         << "UM "
871         << "- "
872         << "- "
873         << "- "
874         << "- "
875         << "- "
876         << "- "
877         << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
878         << fRowsReturned << " ";
879     fMiniInfo += oss.str();
880 }
881 
882 }
883 // vim:ts=4 sw=4:
884