1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 // $Id: crossenginestep.cpp 9709 2013-07-20 06:08:46Z xlou $
19
#include "crossenginestep.h"
#include <unistd.h>
//#define NDEBUG
#include <cassert>
#include <cstring>
#include <sstream>
#include <iomanip>
using namespace std;
27
28 #include <boost/shared_ptr.hpp>
29 #include <boost/shared_array.hpp>
30 #include <boost/uuid/uuid_io.hpp>
31 using namespace boost;
32
33 #include "messagequeue.h"
34 using namespace messageqcpp;
35
36 #include "loggingid.h"
37 #include "errorcodes.h"
38 #include "idberrorinfo.h"
39 #include "errorids.h"
40 using namespace logging;
41
42 #include "calpontsystemcatalog.h"
43 #include "constantcolumn.h"
44 using namespace execplan;
45
46 #include "rowgroup.h"
47 using namespace rowgroup;
48
49 #include "dataconvert.h"
50 using namespace dataconvert;
51
52 #include "querytele.h"
53 using namespace querytele;
54
55 #include "funcexp.h"
56
57 #include "jobstep.h"
58 #include "jlf_common.h"
59
60 #include "libmysql_client.h"
61
62 namespace joblist
63 {
64
CrossEngineStep(const std::string & schema,const std::string & table,const std::string & alias,const JobInfo & jobInfo)65 CrossEngineStep::CrossEngineStep(
66 const std::string& schema,
67 const std::string& table,
68 const std::string& alias,
69 const JobInfo& jobInfo) :
70 BatchPrimitive(jobInfo),
71 fRowsRetrieved(0),
72 fRowsReturned(0),
73 fRowsPerGroup(256),
74 fOutputDL(NULL),
75 fOutputIterator(0),
76 fRunner(0),
77 fEndOfResult(false),
78 fSchema(schema),
79 fTable(table),
80 fAlias(alias),
81 fColumnCount(0),
82 fFeInstance(funcexp::FuncExp::instance())
83 {
84 fExtendedInfo = "CES: ";
85 getMysqldInfo(jobInfo);
86 fQtc.stepParms().stepType = StepTeleStats::T_CES;
87 mysql = new utils::LibMySQL();
88 }
89
90
~CrossEngineStep()91 CrossEngineStep::~CrossEngineStep()
92 {
93 delete mysql;
94 }
95
96
setOutputRowGroup(const rowgroup::RowGroup & rg)97 void CrossEngineStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
98 {
99 fRowGroupOut = fRowGroupDelivered = fRowGroupAdded = rg;
100 }
101
102
deliverStringTableRowGroup(bool b)103 void CrossEngineStep::deliverStringTableRowGroup(bool b)
104 {
105 // results are either using setField, or mapping to delivered row group
106 fRowGroupDelivered.setUseStringTable(b);
107 }
108
109
deliverStringTableRowGroup() const110 bool CrossEngineStep::deliverStringTableRowGroup() const
111 {
112 return fRowGroupDelivered.usesStringTable();
113 }
114
115
addFcnJoinExp(const std::vector<execplan::SRCP> & fe)116 void CrossEngineStep::addFcnJoinExp(const std::vector<execplan::SRCP>& fe)
117 {
118 fFeFcnJoin = fe;
119 }
120
121
addFcnExpGroup1(const boost::shared_ptr<ParseTree> & fe)122 void CrossEngineStep::addFcnExpGroup1(const boost::shared_ptr<ParseTree>& fe)
123 {
124 fFeFilters.push_back(fe);
125 }
126
127
setFE1Input(const rowgroup::RowGroup & rg)128 void CrossEngineStep::setFE1Input(const rowgroup::RowGroup& rg)
129 {
130 fRowGroupFe1 = rg;
131 }
132
133
setFcnExpGroup3(const std::vector<execplan::SRCP> & fe)134 void CrossEngineStep::setFcnExpGroup3(const std::vector<execplan::SRCP>& fe)
135 {
136 fFeSelects = fe;
137 }
138
139
setFE23Output(const rowgroup::RowGroup & rg)140 void CrossEngineStep::setFE23Output(const rowgroup::RowGroup& rg)
141 {
142 fRowGroupFe3 = fRowGroupDelivered = fRowGroupAdded = rg;
143 }
144
145
makeMappings()146 void CrossEngineStep::makeMappings()
147 {
148 fFe1Column.reset(new int[fColumnCount]);
149
150 for (uint64_t i = 0; i < fColumnCount; ++i)
151 fFe1Column[i] = -1;
152
153 if (fFeFilters.size() > 0 || fFeFcnJoin.size() > 0)
154 {
155 const std::vector<uint32_t>& colInFe1 = fRowGroupFe1.getKeys();
156
157 for (uint64_t i = 0; i < colInFe1.size(); i++)
158 {
159 map<uint32_t, uint32_t>::iterator it = fColumnMap.find(colInFe1[i]);
160
161 if (it != fColumnMap.end())
162 fFe1Column[it->second] = i;
163 }
164
165 fFeMapping1 = makeMapping(fRowGroupFe1, fRowGroupOut);
166 }
167
168 if (!fFeSelects.empty())
169 fFeMapping3 = makeMapping(fRowGroupOut, fRowGroupFe3);
170 }
171
172
setField(int i,const char * value,unsigned long length,MYSQL_FIELD * field,Row & row)173 void CrossEngineStep::setField(int i, const char* value, unsigned long length, MYSQL_FIELD* field, Row& row)
174 {
175 CalpontSystemCatalog::ColDataType colType = row.getColType(i);
176
177 if ((colType == CalpontSystemCatalog::CHAR || colType == CalpontSystemCatalog::VARCHAR) &&
178 row.getColumnWidth(i) > 8)
179 {
180 if (value != NULL)
181 row.setStringField(value, i);
182 else
183 row.setStringField("", i);
184 }
185 else if ((colType == CalpontSystemCatalog::BLOB) ||
186 (colType == CalpontSystemCatalog::TEXT) ||
187 (colType == CalpontSystemCatalog::VARBINARY))
188 {
189 if (value != NULL)
190 row.setVarBinaryField((const uint8_t*)value, length, i);
191 else
192 row.setVarBinaryField(NULL, 0, i);
193 }
194 else
195 {
196 CalpontSystemCatalog::ColType ct;
197 ct.colDataType = colType;
198 ct.colWidth = row.getColumnWidth(i);
199
200 if (colType == CalpontSystemCatalog::DECIMAL)
201 {
202 ct.scale = field->decimals;
203 ct.precision = field->length;
204 }
205 else
206 {
207 ct.scale = row.getScale(i);
208 ct.precision = row.getPrecision(i);
209 }
210
211 row.setIntField(convertValueNum(value, ct, row.getSignedNullValue(i)), i);
212 }
213 }
214
215
addRow(RGData & data)216 inline void CrossEngineStep::addRow(RGData& data)
217 {
218 fRowDelivered.setRid(fRowsReturned % fRowsPerGroup);
219 fRowDelivered.nextRow();
220 fRowGroupAdded.incRowCount();
221
222 if (++fRowsReturned % fRowsPerGroup == 0)
223 {
224 fOutputDL->insert(data);
225 data.reinit(fRowGroupAdded, fRowsPerGroup);
226 fRowGroupAdded.setData(&data);
227 fRowGroupAdded.resetRowGroup(fRowsReturned);
228 fRowGroupAdded.getRow(0, &fRowDelivered);
229 }
230 }
231
232
233 // simplified version of convertValueNum() in jlf_execplantojoblist.cpp.
convertValueNum(const char * str,const CalpontSystemCatalog::ColType & ct,int64_t nullValue)234 int64_t CrossEngineStep::convertValueNum(
235 const char* str, const CalpontSystemCatalog::ColType& ct, int64_t nullValue)
236 {
237 // return value
238 int64_t rv = nullValue;
239
240 // null value
241 if (str == NULL)
242 return rv;
243
244 // convertColumnData(execplan::CalpontSystemCatalog::ColType colType,
245 // const std::string& dataOrig,
246 // bool& pushWarning,
247 // bool nulFlag,
248 // bool noRoundup )
249 bool pushWarning = false;
250 boost::any anyVal = DataConvert::convertColumnData(ct, str, pushWarning, fTimeZone, false, true, false);
251
252 // Out of range values are treated as NULL as discussed during design review.
253 if (pushWarning)
254 return rv;
255
256 // non-null value
257 switch (ct.colDataType)
258 {
259 case CalpontSystemCatalog::BIT:
260 rv = boost::any_cast<bool>(anyVal);
261 break;
262
263 case CalpontSystemCatalog::TINYINT:
264 rv = boost::any_cast<char>(anyVal);
265 break;
266
267 case CalpontSystemCatalog::UTINYINT:
268 rv = boost::any_cast<uint8_t>(anyVal);
269 break;
270
271 case CalpontSystemCatalog::SMALLINT:
272 rv = boost::any_cast<int16_t>(anyVal);
273 break;
274
275 case CalpontSystemCatalog::USMALLINT:
276 rv = boost::any_cast<uint16_t>(anyVal);
277 break;
278
279 case CalpontSystemCatalog::MEDINT:
280 case CalpontSystemCatalog::INT:
281 #ifdef _MSC_VER
282 rv = boost::any_cast<int>(anyVal);
283 #else
284 rv = boost::any_cast<int32_t>(anyVal);
285 #endif
286 break;
287
288 case CalpontSystemCatalog::UMEDINT:
289 case CalpontSystemCatalog::UINT:
290 rv = boost::any_cast<uint32_t>(anyVal);
291 break;
292
293 case CalpontSystemCatalog::BIGINT:
294 rv = boost::any_cast<long long>(anyVal);
295 break;
296
297 case CalpontSystemCatalog::UBIGINT:
298 rv = boost::any_cast<uint64_t>(anyVal);
299 break;
300
301 case CalpontSystemCatalog::FLOAT:
302 case CalpontSystemCatalog::UFLOAT:
303 {
304 float f = boost::any_cast<float>(anyVal);
305
306 //N.B. There is a bug in boost::any or in gcc where, if you store a nan,
307 // you will get back a nan, but not necessarily the same bits that you put in.
308 // This only seems to be for float (double seems to work).
309 if (isnan(f))
310 {
311 uint32_t ti = joblist::FLOATNULL;
312 float* tfp = (float*)&ti;
313 f = *tfp;
314 }
315
316 float* fp = &f;
317 int32_t* ip = reinterpret_cast<int32_t*>(fp);
318 rv = *ip;
319 }
320 break;
321
322 case CalpontSystemCatalog::DOUBLE:
323 case CalpontSystemCatalog::UDOUBLE:
324 {
325 double d = boost::any_cast<double>(anyVal);
326 double* dp = &d;
327 int64_t* ip = reinterpret_cast<int64_t*>(dp);
328 rv = *ip;
329 }
330 break;
331
332 case CalpontSystemCatalog::CHAR:
333 case CalpontSystemCatalog::VARCHAR:
334 case CalpontSystemCatalog::VARBINARY:
335 case CalpontSystemCatalog::BLOB:
336 case CalpontSystemCatalog::TEXT:
337 case CalpontSystemCatalog::CLOB:
338 {
339 std::string i = boost::any_cast<std::string>(anyVal);
340 // bug 1932, pad nulls up to the size of v
341 i.resize(sizeof(rv), 0);
342 rv = *((uint64_t*) i.data());
343 }
344 break;
345
346 case CalpontSystemCatalog::DATE:
347 rv = boost::any_cast<uint32_t>(anyVal);
348 break;
349
350 case CalpontSystemCatalog::DATETIME:
351 rv = boost::any_cast<uint64_t>(anyVal);
352 break;
353
354 case CalpontSystemCatalog::TIMESTAMP:
355 rv = boost::any_cast<uint64_t>(anyVal);
356 break;
357
358 case CalpontSystemCatalog::TIME:
359 rv = boost::any_cast<int64_t>(anyVal);
360 break;
361
362 case CalpontSystemCatalog::DECIMAL:
363 case CalpontSystemCatalog::UDECIMAL:
364 if (ct.colWidth == CalpontSystemCatalog::ONE_BYTE)
365 rv = boost::any_cast<char>(anyVal);
366 else if (ct.colWidth == CalpontSystemCatalog::TWO_BYTE)
367 rv = boost::any_cast<int16_t>(anyVal);
368 else if (ct.colWidth == CalpontSystemCatalog::FOUR_BYTE)
369 #ifdef _MSC_VER
370 rv = boost::any_cast<int>(anyVal);
371
372 #else
373 rv = boost::any_cast<int32_t>(anyVal);
374 #endif
375 else
376 rv = boost::any_cast<long long>(anyVal);
377
378 break;
379
380 default:
381 break;
382 }
383
384 return rv;
385 }
386
387
getMysqldInfo(const JobInfo & jobInfo)388 void CrossEngineStep::getMysqldInfo(const JobInfo& jobInfo)
389 {
390 if (jobInfo.rm->getMysqldInfo(fHost, fUser, fPasswd, fPort) == false)
391 throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_CROSS_ENGINE_CONFIG),
392 ERR_CROSS_ENGINE_CONFIG);
393 }
394
395
run()396 void CrossEngineStep::run()
397 {
398 // idbassert(!fDelivery);
399
400 if (fOutputJobStepAssociation.outSize() == 0)
401 throw logic_error("No output data list for cross engine step.");
402
403 fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
404
405 if (fOutputDL == NULL)
406 throw logic_error("Output is not a RowGroup data list.");
407
408 if (fDelivery == true)
409 {
410 fOutputIterator = fOutputDL->getIterator();
411 }
412
413 fRunner = jobstepThreadPool.invoke(Runner(this));
414 }
415
416
join()417 void CrossEngineStep::join()
418 {
419 if (fRunner)
420 jobstepThreadPool.join(fRunner);
421 }
422
423
execute()424 void CrossEngineStep::execute()
425 {
426 int ret = 0;
427 StepTeleStats sts;
428 sts.query_uuid = fQueryUuid;
429 sts.step_uuid = fStepUuid;
430
431 try
432 {
433 sts.msg_type = StepTeleStats::ST_START;
434 sts.total_units_of_work = 1;
435 postStepStartTele(sts);
436
437 ret = mysql->init(fHost.c_str(), fPort, fUser.c_str(), fPasswd.c_str(), fSchema.c_str());
438
439 if (ret != 0)
440 mysql->handleMySqlError(mysql->getError().c_str(), ret);
441
442 std::string query(makeQuery());
443 fLogger->logMessage(logging::LOG_TYPE_INFO, "QUERY to foreign engine: " + query);
444
445 if (traceOn())
446 cout << "QUERY: " << query << endl;
447
448 ret = mysql->run(query.c_str());
449
450 if (ret != 0)
451 mysql->handleMySqlError(mysql->getError().c_str(), ret);
452
453 int num_fields = mysql->getFieldCount();
454
455 char** rowIn; // input
456 //shared_array<uint8_t> rgDataDelivered; // output
457 RGData rgDataDelivered;
458 fRowGroupAdded.initRow(&fRowDelivered);
459 // use getDataSize() i/o getMaxDataSize() to make sure there are 8192 rows.
460 rgDataDelivered.reinit(fRowGroupAdded, fRowsPerGroup);
461 fRowGroupAdded.setData(&rgDataDelivered);
462 fRowGroupAdded.resetRowGroup(0);
463 fRowGroupAdded.getRow(0, &fRowDelivered);
464
465 if (traceOn())
466 dlTimes.setFirstReadTime();
467
468 // Any functions to evaluate
469 makeMappings();
470 bool doFE1 = ((fFeFcnJoin.size() > 0) || (fFeFilters.size() > 0));
471 bool doFE3 = (fFeSelects.size() > 0);
472
473 if (!doFE1 && !doFE3)
474 {
475 while ((rowIn = mysql->nextRow()) && !cancelled())
476 {
477 for (int i = 0; i < num_fields; i++)
478 setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), fRowDelivered);
479
480 addRow(rgDataDelivered);
481 }
482 }
483
484 else if (doFE1 && !doFE3) // FE in WHERE clause only
485 {
486 shared_array<uint8_t> rgDataFe1; // functions in where clause
487 Row rowFe1; // row for fe evaluation
488 fRowGroupFe1.initRow(&rowFe1, true);
489 rgDataFe1.reset(new uint8_t[rowFe1.getSize()]);
490 rowFe1.setData(rgDataFe1.get());
491
492 while ((rowIn = mysql->nextRow()) && !cancelled())
493 {
494 // Parse the columns used in FE1 first, the other column may not need be parsed.
495 for (int i = 0; i < num_fields; i++)
496 {
497 if (fFe1Column[i] != -1)
498 setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe1);
499 }
500
501 if (fFeFilters.size() > 0)
502 {
503 bool feBreak = false;
504
505 for (std::vector<boost::shared_ptr<execplan::ParseTree> >::iterator it = fFeFilters.begin(); it != fFeFilters.end(); it++)
506 {
507 if (fFeInstance->evaluate(rowFe1, (*it).get()) == false)
508 {
509 feBreak = true;
510 break;
511 }
512 }
513
514 if (feBreak)
515 continue;
516 }
517
518 // evaluate the FE join column
519 fFeInstance->evaluate(rowFe1, fFeFcnJoin);
520
521 // Pass throug the parsed columns, and parse the remaining columns.
522 applyMapping(fFeMapping1, rowFe1, &fRowDelivered);
523
524 for (int i = 0; i < num_fields; i++)
525 {
526 if (fFe1Column[i] == -1)
527 setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), fRowDelivered);
528 }
529
530 addRow(rgDataDelivered);
531 }
532 }
533
534 else if (!doFE1 && doFE3) // FE in SELECT clause only
535 {
536 shared_array<uint8_t> rgDataFe3; // functions in select clause
537 Row rowFe3; // row for fe evaluation
538 fRowGroupOut.initRow(&rowFe3, true);
539 rgDataFe3.reset(new uint8_t[rowFe3.getSize()]);
540 rowFe3.setData(rgDataFe3.get());
541
542 while ((rowIn = mysql->nextRow()) && !cancelled())
543 {
544 for (int i = 0; i < num_fields; i++)
545 setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe3);
546
547 fFeInstance->evaluate(rowFe3, fFeSelects);
548
549 applyMapping(fFeMapping3, rowFe3, &fRowDelivered);
550
551 addRow(rgDataDelivered);
552 }
553 }
554
555 else // FE in SELECT clause, FE join and WHERE clause
556 {
557 shared_array<uint8_t> rgDataFe1; // functions in where clause
558 Row rowFe1; // row for fe1 evaluation
559 fRowGroupFe1.initRow(&rowFe1, true);
560 rgDataFe1.reset(new uint8_t[rowFe1.getSize()]);
561 rowFe1.setData(rgDataFe1.get());
562
563 shared_array<uint8_t> rgDataFe3; // functions in select clause
564 Row rowFe3; // row for fe3 evaluation
565 fRowGroupOut.initRow(&rowFe3, true);
566 rgDataFe3.reset(new uint8_t[rowFe3.getSize()]);
567 rowFe3.setData(rgDataFe3.get());
568
569 while ((rowIn = mysql->nextRow()) && !cancelled())
570 {
571 // Parse the columns used in FE1 first, the other column may not need be parsed.
572 for (int i = 0; i < num_fields; i++)
573 {
574 if (fFe1Column[i] != -1)
575 setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe1);
576 }
577
578 if (fFeFilters.size() > 0)
579 {
580 bool feBreak = false;
581
582 for (std::vector<boost::shared_ptr<execplan::ParseTree> >::iterator it = fFeFilters.begin(); it != fFeFilters.end(); it++)
583 {
584 if (fFeInstance->evaluate(rowFe1, (*it).get()) == false)
585 {
586 feBreak = true;
587 break;
588 }
589 }
590
591 if (feBreak)
592 continue;
593 }
594
595 // evaluate the FE join column
596 fFeInstance->evaluate(rowFe1, fFeFcnJoin);
597
598 // Pass throug the parsed columns, and parse the remaining columns.
599 applyMapping(fFeMapping1, rowFe1, &rowFe3);
600
601 for (int i = 0; i < num_fields; i++)
602 {
603 if (fFe1Column[i] == -1)
604 setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe3);
605 }
606
607 fFeInstance->evaluate(rowFe3, fFeSelects);
608 applyMapping(fFeMapping3, rowFe3, &fRowDelivered);
609
610 addRow(rgDataDelivered);
611 }
612 }
613
614 //INSERT_ADAPTER(fOutputDL, rgDataDelivered);
615 fOutputDL->insert(rgDataDelivered);
616 fRowsRetrieved = mysql->getRowCount();
617 }
618 catch (...)
619 {
620 handleException(std::current_exception(),
621 logging::ERR_CROSS_ENGINE_CONNECT,
622 logging::ERR_ALWAYS_CRITICAL,
623 "CrossEngineStep::execute()");
624 }
625
626 sts.msg_type = StepTeleStats::ST_SUMMARY;
627 sts.total_units_of_work = sts.units_of_work_completed = 1;
628 sts.rows = fRowsReturned;
629 postStepSummaryTele(sts);
630
631 fEndOfResult = true;
632 fOutputDL->endOfInput();
633
634 // Bug 3136, let mini stats to be formatted if traceOn.
635 if (traceOn())
636 {
637 dlTimes.setLastReadTime();
638 dlTimes.setEndOfInputTime();
639 printCalTrace();
640 }
641 }
642
643
setBPP(JobStep * jobStep)644 void CrossEngineStep::setBPP(JobStep* jobStep)
645 {
646 pColStep* pcs = dynamic_cast<pColStep*>(jobStep);
647 pColScanStep* pcss = NULL;
648 pDictionaryStep* pds = NULL;
649 pDictionaryScan* pdss = NULL;
650 FilterStep* fs = NULL;
651 std::string bop = " AND ";
652
653 if (pcs != 0)
654 {
655 if (dynamic_cast<PseudoColStep*>(pcs) != NULL)
656 throw logic_error("No Psedo Column for foreign engine.");
657
658 if (pcs->BOP() == BOP_OR)
659 bop = " OR ";
660
661 addFilterStr(pcs->getFilters(), bop);
662 }
663 else if ((pcss = dynamic_cast<pColScanStep*>(jobStep)) != NULL)
664 {
665 if (pcss->BOP() == BOP_OR)
666 bop = " OR ";
667
668 addFilterStr(pcss->getFilters(), bop);
669 }
670 else if ((pds = dynamic_cast<pDictionaryStep*>(jobStep)) != NULL)
671 {
672 if (pds->BOP() == BOP_OR)
673 bop = " OR ";
674
675 addFilterStr(pds->getFilters(), bop);
676 }
677 else if ((pdss = dynamic_cast<pDictionaryScan*>(jobStep)) != NULL)
678 {
679 if (pds->BOP() == BOP_OR)
680 bop = " OR ";
681
682 addFilterStr(pdss->getFilters(), bop);
683 }
684 else if ((fs = dynamic_cast<FilterStep*>(jobStep)) != NULL)
685 {
686 addFilterStr(fs->getFilters(), bop);
687 }
688 }
689
addFilterStr(const std::vector<const Filter * > & f,const std::string & bop)690 void CrossEngineStep::addFilterStr(const std::vector<const Filter*>& f, const std::string& bop)
691 {
692 if (f.size() == 0)
693 return;
694
695 std::string filterStr;
696
697 for (uint64_t i = 0; i < f.size(); i++)
698 {
699 if (f[i]->data().empty())
700 continue;
701
702 if (!filterStr.empty())
703 filterStr += bop;
704
705 filterStr += f[i]->data();
706 }
707
708 if (!filterStr.empty())
709 {
710 if (!fWhereClause.empty())
711 fWhereClause += " AND (" + filterStr + ")";
712 else
713 fWhereClause += " WHERE (" + filterStr + ")";
714 }
715 }
716
717
setProjectBPP(JobStep * jobStep1,JobStep *)718 void CrossEngineStep::setProjectBPP(JobStep* jobStep1, JobStep*)
719 {
720 fColumnMap[jobStep1->tupleId()] = fColumnCount++;
721
722 if (!fSelectClause.empty())
723 fSelectClause += ", ";
724 else
725 fSelectClause += "SELECT ";
726
727 fSelectClause += "`" + jobStep1->name() + "`";
728 }
729
730
makeQuery()731 std::string CrossEngineStep::makeQuery()
732 {
733 ostringstream oss;
734 oss << fSelectClause << " FROM `" << fTable << "`";
735
736 if (fTable.compare(fAlias) != 0)
737 oss << " `" << fAlias << "`";
738
739 if (!fWhereClause.empty())
740 oss << fWhereClause;
741
742 // the std::string must consist of a single SQL statement without a terminating semicolon ; or \g.
743 // oss << ";";
744 return oss.str();
745 }
746
getOutputRowGroup() const747 const RowGroup& CrossEngineStep::getOutputRowGroup() const
748 {
749 return fRowGroupOut;
750 }
751
752
getDeliveredRowGroup() const753 const RowGroup& CrossEngineStep::getDeliveredRowGroup() const
754 {
755 return fRowGroupDelivered;
756 }
757
758
nextBand(messageqcpp::ByteStream & bs)759 uint32_t CrossEngineStep::nextBand(messageqcpp::ByteStream& bs)
760 {
761 //shared_array<uint8_t> rgDataOut;
762 RGData rgDataOut;
763 bool more = false;
764 uint32_t rowCount = 0;
765
766 try
767 {
768 bs.restart();
769
770 more = fOutputDL->next(fOutputIterator, &rgDataOut);
771
772 if (traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
773 dlTimes.setFirstReadTime();
774
775 if (more && !cancelled())
776 {
777 fRowGroupDelivered.setData(&rgDataOut);
778 fRowGroupDelivered.serializeRGData(bs);
779 rowCount = fRowGroupDelivered.getRowCount();
780 }
781 else
782 {
783 while (more)
784 more = fOutputDL->next(fOutputIterator, &rgDataOut);
785
786 fEndOfResult = true;
787 }
788 }
789 catch (...)
790 {
791 handleException(std::current_exception(),
792 logging::ERR_IN_DELIVERY,
793 logging::ERR_ALWAYS_CRITICAL,
794 "CrossEngineStep::nextBand()");
795 while (more)
796 more = fOutputDL->next(fOutputIterator, &rgDataOut);
797
798 fEndOfResult = true;
799 }
800
801 if (fEndOfResult)
802 {
803 // send an empty / error band
804 rgDataOut.reinit(fRowGroupDelivered, 0);
805 fRowGroupDelivered.setData(&rgDataOut);
806 fRowGroupDelivered.resetRowGroup(0);
807 fRowGroupDelivered.setStatus(status());
808 fRowGroupDelivered.serializeRGData(bs);
809
810 if (traceOn())
811 {
812 dlTimes.setLastReadTime();
813 dlTimes.setEndOfInputTime();
814 }
815
816 if (traceOn())
817 printCalTrace();
818 }
819
820 return rowCount;
821 }
822
823
toString() const824 const std::string CrossEngineStep::toString() const
825 {
826 ostringstream oss;
827 oss << "CrossEngineStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
828
829 oss << " in:";
830
831 for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
832 oss << fInputJobStepAssociation.outAt(i);
833
834 oss << " out:";
835
836 for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
837 oss << fOutputJobStepAssociation.outAt(i);
838
839 oss << endl;
840
841 return oss.str();
842 }
843
844
printCalTrace()845 void CrossEngineStep::printCalTrace()
846 {
847 time_t t = time (0);
848 char timeString[50];
849 ctime_r (&t, timeString);
850 timeString[strlen (timeString ) - 1] = '\0';
851 ostringstream logStr;
852 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
853 << "; rows retrieved-" << fRowsRetrieved
854 << "; total rows returned-" << fRowsReturned << endl
855 << "\t1st read " << dlTimes.FirstReadTimeString()
856 << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
857 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
858 << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
859 << "\tJob completion status " << status() << endl;
860 logEnd(logStr.str().c_str());
861 fExtendedInfo += logStr.str();
862 formatMiniStats();
863 }
864
865
formatMiniStats()866 void CrossEngineStep::formatMiniStats()
867 {
868 ostringstream oss;
869 oss << "CES "
870 << "UM "
871 << "- "
872 << "- "
873 << "- "
874 << "- "
875 << "- "
876 << "- "
877 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
878 << fRowsReturned << " ";
879 fMiniInfo += oss.str();
880 }
881
882 }
883 // vim:ts=4 sw=4:
884