1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 /*****************************************************************************
20  * $Id: tupleunion.cpp 9665 2013-07-02 21:47:39Z pleblanc $
21  *
22  ****************************************************************************/
23 
24 #include <string>
25 #include <boost/thread.hpp>
26 #include <boost/thread/mutex.hpp>
27 #include <boost/uuid/uuid_io.hpp>
28 
29 #include "querytele.h"
30 using namespace querytele;
31 
32 #include "dataconvert.h"
33 #include "hasher.h"
34 #include "jlf_common.h"
35 #include "resourcemanager.h"
36 #include "tupleunion.h"
37 
38 using namespace std;
39 using namespace std::tr1;
40 using namespace boost;
41 using namespace execplan;
42 using namespace rowgroup;
43 using namespace dataconvert;
44 
#ifndef __linux__
#ifndef M_LN10
#define M_LN10 2.30258509299404568402	/* log_e 10 */
#endif
namespace
{
/** @brief Fallback for exp10(), which this file only defines when not
 *         building for Linux.
 *
 * Computed with pow() rather than exp(x * M_LN10): multiplying by a rounded
 * ln(10) and exponentiating loses precision (for x == 2 the old form
 * produced a value slightly above 100), and the rest of this file already
 * builds powers of ten with pow().
 *
 * @param x the exponent
 * @return 10 raised to the power x
 */
inline double exp10(double x)
{
    return pow(10.0, x);
}
}
#endif
58 
59 namespace joblist
60 {
operator ()(const RowPosition & p) const61 inline uint64_t TupleUnion::Hasher::operator()(const RowPosition& p) const
62 {
63     Row& row = ts->row;
64 
65     if (p.group & RowPosition::normalizedFlag)
66         ts->normalizedData[p.group & ~RowPosition::normalizedFlag].getRow(p.row, &row);
67     else
68         ts->rowMemory[p.group].getRow(p.row, &row);
69 
70     return row.hash();
71 }
72 
operator ()(const RowPosition & d1,const RowPosition & d2) const73 inline bool TupleUnion::Eq::operator()(const RowPosition& d1, const RowPosition& d2) const
74 {
75     Row& r1 = ts->row, &r2 = ts->row2;
76 
77     if (d1.group & RowPosition::normalizedFlag)
78         ts->normalizedData[d1.group & ~RowPosition::normalizedFlag].getRow(d1.row, &r1);
79     else
80         ts->rowMemory[d1.group].getRow(d1.row, &r1);
81 
82     if (d2.group & RowPosition::normalizedFlag)
83         ts->normalizedData[d2.group & ~RowPosition::normalizedFlag].getRow(d2.row, &r2);
84     else
85         ts->rowMemory[d2.group].getRow(d2.row, &r2);
86 
87     return r1.equals(r2);
88 }
89 
TupleUnion(CalpontSystemCatalog::OID tableOID,const JobInfo & jobInfo)90 TupleUnion::TupleUnion(CalpontSystemCatalog::OID tableOID, const JobInfo& jobInfo) :
91     JobStep(jobInfo),
92     fTableOID(tableOID),
93     output(NULL),
94     outputIt(-1),
95     memUsage(0),
96     rm(jobInfo.rm),
97     runnersDone(0),
98     distinctCount(0),
99     distinctDone(0),
100     fRowsReturned(0),
101     runRan(false),
102     joinRan(false),
103     sessionMemLimit(jobInfo.umMemLimit),
104     fTimeZone(jobInfo.timeZone)
105 {
106     uniquer.reset(new Uniquer_t(10, Hasher(this), Eq(this), allocator));
107     fExtendedInfo = "TUN: ";
108     fQtc.stepParms().stepType = StepTeleStats::T_TUN;
109 }
110 
~TupleUnion()111 TupleUnion::~TupleUnion()
112 {
113     rm->returnMemory(memUsage, sessionMemLimit);
114 
115     if (!runRan && output)
116         output->endOfInput();
117 }
118 
tableOid() const119 CalpontSystemCatalog::OID TupleUnion::tableOid() const
120 {
121     return fTableOID;
122 }
123 
setInputRowGroups(const vector<rowgroup::RowGroup> & in)124 void TupleUnion::setInputRowGroups(const vector<rowgroup::RowGroup>& in)
125 {
126     inputRGs = in;
127 }
128 
setOutputRowGroup(const rowgroup::RowGroup & out)129 void TupleUnion::setOutputRowGroup(const rowgroup::RowGroup& out)
130 {
131     outputRG = out;
132     rowLength = outputRG.getRowSizeWithStrings();
133 }
134 
setDistinctFlags(const vector<bool> & v)135 void TupleUnion::setDistinctFlags(const vector<bool>& v)
136 {
137     distinctFlags = v;
138 }
139 
readInput(uint32_t which)140 void TupleUnion::readInput(uint32_t which)
141 {
142     /* The handling of the output got a little kludgey with the string table enhancement.
143      * When there is no distinct check, the outputs are all generated independently of
144      * each other locally in this fcn.  When there is a distinct check, threads
145      * share the output, which is built in the 'rowMemory' vector rather than in
146      * thread-local memory.  Building the result in a common space allows us to
147      * store 8-byte offsets in rowMemory rather than 16-bytes for absolute pointers.
148      */
149 
150     RowGroupDL* dl = NULL;
151     bool more = true;
152     RGData inRGData, outRGData, *tmpRGData;
153     uint32_t it = numeric_limits<uint32_t>::max();
154     RowGroup l_inputRG, l_outputRG, l_tmpRG;
155     Row inRow, outRow, tmpRow;
156     bool distinct;
157     uint64_t memUsageBefore, memUsageAfter, memDiff;
158     StepTeleStats sts;
159     sts.query_uuid = fQueryUuid;
160     sts.step_uuid = fStepUuid;
161 
162     l_outputRG = outputRG;
163     dl = inputs[which];
164     l_inputRG = inputRGs[which];
165     l_inputRG.initRow(&inRow);
166     l_outputRG.initRow(&outRow);
167     distinct = distinctFlags[which];
168 
169     if (distinct)
170     {
171         l_tmpRG = outputRG;
172         tmpRGData = &normalizedData[which];
173         l_tmpRG.initRow(&tmpRow);
174         l_tmpRG.setData(tmpRGData);
175         l_tmpRG.resetRowGroup(0);
176         l_tmpRG.getRow(0, &tmpRow);
177     }
178     else
179     {
180         outRGData = RGData(l_outputRG);
181         l_outputRG.setData(&outRGData);
182         l_outputRG.resetRowGroup(0);
183         l_outputRG.getRow(0, &outRow);
184     }
185 
186     try
187     {
188 
189         it = dl->getIterator();
190         more = dl->next(it, &inRGData);
191 
192         if (dlTimes.FirstReadTime().tv_sec == 0)
193             dlTimes.setFirstReadTime();
194 
195         if (fStartTime == -1)
196         {
197             sts.msg_type = StepTeleStats::ST_START;
198             sts.total_units_of_work = 1;
199             postStepStartTele(sts);
200         }
201 
202         while (more && !cancelled())
203         {
204             /*
205             	normalize each row
206             	  if distinct flag is set
207             		copy the row into the output and test for uniqueness
208             		  if unique, increment the row count
209             	  else
210             	    copy the row into the output & inc row count
211             */
212             l_inputRG.setData(&inRGData);
213             l_inputRG.getRow(0, &inRow);
214 
215             if (distinct)
216             {
217                 memDiff = 0;
218                 l_tmpRG.resetRowGroup(0);
219                 l_tmpRG.getRow(0, &tmpRow);
220                 l_tmpRG.setRowCount(l_inputRG.getRowCount());
221 
222                 for (uint32_t i = 0; i < l_inputRG.getRowCount(); i++, inRow.nextRow(),
223                         tmpRow.nextRow())
224                     normalize(inRow, &tmpRow);
225 
226                 l_tmpRG.getRow(0, &tmpRow);
227                 {
228                     boost::mutex::scoped_lock lk(uniquerMutex);
229                     getOutput(&l_outputRG, &outRow, &outRGData);
230                     memUsageBefore = allocator.getMemUsage();
231 
232                     for (uint32_t i = 0; i < l_tmpRG.getRowCount(); i++, tmpRow.nextRow())
233                     {
234                         pair<Uniquer_t::iterator, bool> inserted;
235                         inserted = uniquer->insert(RowPosition(which | RowPosition::normalizedFlag, i));
236 
237                         if (inserted.second)
238                         {
239                             copyRow(tmpRow, &outRow);
240                             const_cast<RowPosition&>(*(inserted.first)) = RowPosition(rowMemory.size() - 1, l_outputRG.getRowCount());
241                             memDiff += outRow.getRealSize();
242                             addToOutput(&outRow, &l_outputRG, true, outRGData);
243                         }
244                     }
245 
246                     memUsageAfter = allocator.getMemUsage();
247                     memDiff += (memUsageAfter - memUsageBefore);
248                     memUsage += memDiff;
249                 }
250 
251                 if (!rm->getMemory(memDiff, sessionMemLimit))
252                 {
253                     fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_UNION_TOO_BIG);
254 
255                     if (status() == 0) // preserve existing error code
256                     {
257                         errorMessage(logging::IDBErrorInfo::instance()->errorMsg(
258                                          logging::ERR_UNION_TOO_BIG));
259                         status(logging::ERR_UNION_TOO_BIG);
260                     }
261 
262                     abort();
263                 }
264             }
265             else
266             {
267                 for (uint32_t i = 0; i < l_inputRG.getRowCount(); i++, inRow.nextRow())
268                 {
269                     normalize(inRow, &outRow);
270                     addToOutput(&outRow, &l_outputRG, false, outRGData);
271                 }
272             }
273 
274             more = dl->next(it, &inRGData);
275         }
276     }
277     catch (...)
278     {
279         handleException(std::current_exception(),
280                         logging::unionStepErr,
281                         logging::ERR_UNION_TOO_BIG,
282                         "TupleUnion::readInput()");
283         status(logging::unionStepErr);
284         abort();
285     }
286 
287     /* make sure that the input was drained before exiting.  This can happen if the
288     query was aborted */
289     if (dl && it != numeric_limits<uint32_t>::max())
290         while (more)
291             more = dl->next(it, &inRGData);
292 
293     {
294         boost::mutex::scoped_lock lock1(uniquerMutex);
295         boost::mutex::scoped_lock lock2(sMutex);
296 
297         if (!distinct && l_outputRG.getRowCount() > 0)
298             output->insert(outRGData);
299 
300         if (distinct)
301         {
302             getOutput(&l_outputRG, &outRow, &outRGData);
303 
304             if (++distinctDone == distinctCount && l_outputRG.getRowCount() > 0)
305                 output->insert(outRGData);
306         }
307 
308         if (++runnersDone == fInputJobStepAssociation.outSize())
309         {
310             output->endOfInput();
311 
312             sts.msg_type = StepTeleStats::ST_SUMMARY;
313             sts.total_units_of_work = sts.units_of_work_completed = 1;
314             sts.rows = fRowsReturned;
315             postStepSummaryTele(sts);
316 
317             if (traceOn())
318             {
319                 dlTimes.setLastReadTime();
320                 dlTimes.setEndOfInputTime();
321 
322                 time_t t = time (0);
323                 char timeString[50];
324                 ctime_r (&t, timeString);
325                 timeString[strlen (timeString ) - 1] = '\0';
326                 ostringstream logStr;
327                 logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at "
328                         << timeString << "; total rows returned-" << fRowsReturned << endl
329                         << "\t1st read " << dlTimes.FirstReadTimeString()
330                         << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
331                         << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
332                         << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
333                         << "\tJob completion status " << status() << endl;
334                 logEnd(logStr.str().c_str());
335                 fExtendedInfo += logStr.str();
336                 formatMiniStats();
337             }
338         }
339     }
340 }
341 
nextBand(messageqcpp::ByteStream & bs)342 uint32_t TupleUnion::nextBand(messageqcpp::ByteStream& bs)
343 {
344     RGData mem;
345     bool more;
346     uint32_t ret = 0;
347 
348     bs.restart();
349     more = output->next(outputIt, &mem);
350 
351     if (more)
352         outputRG.setData(&mem);
353     else
354     {
355         mem = RGData(outputRG, 0);
356         outputRG.setData(&mem);
357         outputRG.resetRowGroup(0);
358         outputRG.setStatus(status());
359     }
360 
361     outputRG.serializeRGData(bs);
362     ret = outputRG.getRowCount();
363 
364     return ret;
365 }
366 
getOutput(RowGroup * rg,Row * row,RGData * data)367 void TupleUnion::getOutput(RowGroup* rg, Row* row, RGData* data)
368 {
369     if (UNLIKELY(rowMemory.empty()))
370     {
371         *data = RGData(*rg);
372         rg->setData(data);
373         rg->resetRowGroup(0);
374         rowMemory.push_back(*data);
375     }
376     else
377     {
378         *data = rowMemory.back();
379         rg->setData(data);
380     }
381 
382     rg->getRow(rg->getRowCount(), row);
383 }
384 
addToOutput(Row * r,RowGroup * rg,bool keepit,RGData & data)385 void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit,
386                              RGData& data)
387 {
388     r->nextRow();
389     rg->incRowCount();
390     fRowsReturned++;
391 
392     if (rg->getRowCount() == 8192)
393     {
394         {
395             boost::mutex::scoped_lock lock(sMutex);
396             output->insert(data);
397         }
398         data = RGData(*rg);
399         rg->setData(&data);
400         rg->resetRowGroup(0);
401         rg->getRow(0, r);
402 
403         if (keepit)
404             rowMemory.push_back(data);
405     }
406 }
407 
normalize(const Row & in,Row * out)408 void TupleUnion::normalize(const Row& in, Row* out)
409 {
410     uint32_t i;
411 
412     out->setRid(0);
413 
414     for (i = 0; i < out->getColumnCount(); i++)
415     {
416         if (in.isNullValue(i))
417         {
418             writeNull(out, i);
419             continue;
420         }
421 
422         switch (in.getColTypes()[i])
423         {
424             case CalpontSystemCatalog::TINYINT:
425             case CalpontSystemCatalog::SMALLINT:
426             case CalpontSystemCatalog::MEDINT:
427             case CalpontSystemCatalog::INT:
428             case CalpontSystemCatalog::BIGINT:
429                 switch (out->getColTypes()[i])
430                 {
431                     case CalpontSystemCatalog::TINYINT:
432                     case CalpontSystemCatalog::SMALLINT:
433                     case CalpontSystemCatalog::MEDINT:
434                     case CalpontSystemCatalog::INT:
435                     case CalpontSystemCatalog::BIGINT:
436                         if (out->getScale(i) || in.getScale(i))
437                             goto dec1;
438 
439                         out->setIntField(in.getIntField(i), i);
440                         break;
441 
442                     case CalpontSystemCatalog::UTINYINT:
443                     case CalpontSystemCatalog::USMALLINT:
444                     case CalpontSystemCatalog::UMEDINT:
445                     case CalpontSystemCatalog::UINT:
446                     case CalpontSystemCatalog::UBIGINT:
447                         if (in.getScale(i))
448                             goto dec1;
449 
450                         out->setUintField(in.getUintField(i), i);
451                         break;
452 
453                     case CalpontSystemCatalog::CHAR:
454                     case CalpontSystemCatalog::TEXT:
455                     case CalpontSystemCatalog::VARCHAR:
456                     {
457                         ostringstream os;
458 
459                         if (in.getScale(i))
460                         {
461                             double d = in.getIntField(i);
462                             d /= exp10(in.getScale(i));
463                             os.precision(15);
464                             os << d;
465                         }
466                         else
467                             os << in.getIntField(i);
468 
469                         out->setStringField(os.str(), i);
470                         break;
471                     }
472 
473                     case CalpontSystemCatalog::DATE:
474                     case CalpontSystemCatalog::DATETIME:
475                     case CalpontSystemCatalog::TIME:
476                     case CalpontSystemCatalog::TIMESTAMP:
477                         throw logic_error("TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime");
478 
479                     case CalpontSystemCatalog::FLOAT:
480                     case CalpontSystemCatalog::UFLOAT:
481                     {
482                         int scale = in.getScale(i);
483 
484                         if (scale != 0)
485                         {
486                             float f = in.getIntField(i);
487                             f /= (uint64_t) pow(10.0, scale);
488                             out->setFloatField(f, i);
489                         }
490                         else
491                             out->setFloatField(in.getIntField(i), i);
492 
493                         break;
494                     }
495 
496                     case CalpontSystemCatalog::DOUBLE:
497                     case CalpontSystemCatalog::UDOUBLE:
498                     {
499                         int scale = in.getScale(i);
500 
501                         if (scale != 0)
502                         {
503                             double d = in.getIntField(i);
504                             d /= (uint64_t) pow(10.0, scale);
505                             out->setDoubleField(d, i);
506                         }
507                         else
508                             out->setDoubleField(in.getIntField(i), i);
509 
510                         break;
511                     }
512 
513                     case CalpontSystemCatalog::LONGDOUBLE:
514                     {
515                         int scale = in.getScale(i);
516                         long double d = in.getIntField(i);
517                         if (scale != 0)
518                         {
519                             d /= (uint64_t) pow(10.0, scale);
520                         }
521                         out->setLongDoubleField(d, i);
522                         break;
523                     }
524 
525                     case CalpontSystemCatalog::DECIMAL:
526                     case CalpontSystemCatalog::UDECIMAL:
527                     {
528 dec1:
529                         uint64_t val = in.getIntField(i);
530                         int diff = out->getScale(i) - in.getScale(i);
531 
532                         if (diff < 0)
533                             val /= (uint64_t) pow((double) 10, (double) - diff);
534                         else
535                             val *= (uint64_t) pow((double) 10, (double) diff);
536 
537                         out->setIntField(val, i);
538                         break;
539                     }
540 
541                     default:
542                         ostringstream os;
543                         os << "TupleUnion::normalize(): tried an illegal conversion: integer to "
544                            << out->getColTypes()[i];
545                         throw logic_error(os.str());
546                 }
547 
548                 break;
549 
550             case CalpontSystemCatalog::UTINYINT:
551             case CalpontSystemCatalog::USMALLINT:
552             case CalpontSystemCatalog::UMEDINT:
553             case CalpontSystemCatalog::UINT:
554             case CalpontSystemCatalog::UBIGINT:
555                 switch (out->getColTypes()[i])
556                 {
557                     case CalpontSystemCatalog::TINYINT:
558                     case CalpontSystemCatalog::SMALLINT:
559                     case CalpontSystemCatalog::MEDINT:
560                     case CalpontSystemCatalog::INT:
561                     case CalpontSystemCatalog::BIGINT:
562                         if (out->getScale(i))
563                             goto dec2;
564 
565                         out->setIntField(in.getUintField(i), i);
566                         break;
567 
568                     case CalpontSystemCatalog::UTINYINT:
569                     case CalpontSystemCatalog::USMALLINT:
570                     case CalpontSystemCatalog::UMEDINT:
571                     case CalpontSystemCatalog::UINT:
572                     case CalpontSystemCatalog::UBIGINT:
573                         out->setUintField(in.getUintField(i), i);
574                         break;
575 
576                     case CalpontSystemCatalog::CHAR:
577                     case CalpontSystemCatalog::TEXT:
578                     case CalpontSystemCatalog::VARCHAR:
579                     {
580                         ostringstream os;
581 
582                         if (in.getScale(i))
583                         {
584                             double d = in.getUintField(i);
585                             d /= exp10(in.getScale(i));
586                             os.precision(15);
587                             os << d;
588                         }
589                         else
590                             os << in.getUintField(i);
591 
592                         out->setStringField(os.str(), i);
593                         break;
594                     }
595 
596                     case CalpontSystemCatalog::DATE:
597                     case CalpontSystemCatalog::DATETIME:
598                     case CalpontSystemCatalog::TIME:
599                     case CalpontSystemCatalog::TIMESTAMP:
600                         throw logic_error("TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime");
601 
602                     case CalpontSystemCatalog::FLOAT:
603                     case CalpontSystemCatalog::UFLOAT:
604                     {
605                         int scale = in.getScale(i);
606 
607                         if (scale != 0)
608                         {
609                             float f = in.getUintField(i);
610                             f /= (uint64_t) pow(10.0, scale);
611                             out->setFloatField(f, i);
612                         }
613                         else
614                             out->setFloatField(in.getUintField(i), i);
615 
616                         break;
617                     }
618 
619                     case CalpontSystemCatalog::DOUBLE:
620                     case CalpontSystemCatalog::UDOUBLE:
621                     {
622                         int scale = in.getScale(i);
623 
624                         if (scale != 0)
625                         {
626                             double d = in.getUintField(i);
627                             d /= (uint64_t) pow(10.0, scale);
628                             out->setDoubleField(d, i);
629                         }
630                         else
631                             out->setDoubleField(in.getUintField(i), i);
632 
633                         break;
634                     }
635 
636                     case CalpontSystemCatalog::LONGDOUBLE:
637                     {
638                         int scale = in.getScale(i);
639 
640                         if (scale != 0)
641                         {
642                             long double d = in.getUintField(i);
643                             d /= (uint64_t) pow(10.0, scale);
644                             out->setLongDoubleField(d, i);
645                         }
646                         else
647                             out->setLongDoubleField(in.getUintField(i), i);
648 
649                         break;
650                     }
651 
652                     case CalpontSystemCatalog::DECIMAL:
653                     case CalpontSystemCatalog::UDECIMAL:
654                     {
655 dec2:
656                         uint64_t val = in.getIntField(i);
657                         int diff = out->getScale(i) - in.getScale(i);
658 
659                         if (diff < 0)
660                             val /= (uint64_t) pow((double) 10, (double) - diff);
661                         else
662                             val *= (uint64_t) pow((double) 10, (double) diff);
663 
664                         out->setIntField(val, i);
665                         break;
666                     }
667 
668                     default:
669                         ostringstream os;
670                         os << "TupleUnion::normalize(): tried an illegal conversion: integer to "
671                            << out->getColTypes()[i];
672                         throw logic_error(os.str());
673                 }
674 
675                 break;
676 
677             case CalpontSystemCatalog::CHAR:
678             case CalpontSystemCatalog::TEXT:
679             case CalpontSystemCatalog::VARCHAR:
680                 switch (out->getColTypes()[i])
681                 {
682                     case CalpontSystemCatalog::CHAR:
683                     case CalpontSystemCatalog::TEXT:
684                     case CalpontSystemCatalog::VARCHAR:
685                         out->setStringField(in.getStringField(i), i);
686                         break;
687 
688                     default:
689                     {
690                         ostringstream os;
691                         os << "TupleUnion::normalize(): tried an illegal conversion: string to "
692                            << out->getColTypes()[i];
693                         throw logic_error(os.str());
694                     }
695                 }
696 
697                 break;
698 
699             case CalpontSystemCatalog::DATE:
700                 switch (out->getColTypes()[i])
701                 {
702                     case CalpontSystemCatalog::DATE:
703                         out->setIntField(in.getIntField(i), i);
704                         break;
705 
706                     case CalpontSystemCatalog::DATETIME:
707                     {
708                         uint64_t date = in.getUintField(i);
709                         date &= ~0x3f;  // zero the 'spare' field
710                         date <<= 32;
711                         out->setUintField(date, i);
712                         break;
713                     }
714 
715                     case CalpontSystemCatalog::TIMESTAMP:
716                     {
717                         dataconvert::Date date(in.getUintField(i));
718                         dataconvert::MySQLTime m_time;
719                         m_time.year = date.year;
720                         m_time.month = date.month;
721                         m_time.day = date.day;
722                         m_time.hour = 0;
723                         m_time.minute = 0;
724                         m_time.second = 0;
725                         m_time.second_part = 0;
726 
727                         dataconvert::TimeStamp timeStamp;
728                         bool isValid = true;
729                         int64_t seconds = dataconvert::mySQLTimeToGmtSec(m_time, fTimeZone, isValid);
730 
731                         if (!isValid)
732                         {
733                             timeStamp.reset();
734                         }
735                         else
736                         {
737                             timeStamp.second = seconds;
738                             timeStamp.msecond = m_time.second_part;
739                         }
740 
741                         uint64_t outValue = (uint64_t) *(reinterpret_cast<uint64_t*>(&timeStamp));
742                         out->setUintField(outValue, i);
743                         break;
744                     }
745 
746                     case CalpontSystemCatalog::CHAR:
747                     case CalpontSystemCatalog::TEXT:
748                     case CalpontSystemCatalog::VARCHAR:
749                     {
750                         string d = DataConvert::dateToString(in.getUintField(i));
751                         out->setStringField(d, i);
752                         break;
753                     }
754 
755                     default:
756                     {
757                         ostringstream os;
758                         os << "TupleUnion::normalize(): tried an illegal conversion: date to "
759                            << out->getColTypes()[i];
760                         throw logic_error(os.str());
761                     }
762                 }
763 
764                 break;
765 
766             case CalpontSystemCatalog::DATETIME:
767                 switch (out->getColTypes()[i])
768                 {
769                     case CalpontSystemCatalog::DATETIME:
770                         out->setIntField(in.getIntField(i), i);
771                         break;
772 
773                     case CalpontSystemCatalog::DATE:
774                     {
775                         uint64_t val = in.getUintField(i);
776                         val >>= 32;
777                         out->setUintField(val, i);
778                         break;
779                     }
780 
781                     case CalpontSystemCatalog::TIMESTAMP:
782                     {
783                         uint64_t val = in.getUintField(i);
784                         dataconvert::DateTime dtime(val);
785                         dataconvert::MySQLTime m_time;
786                         dataconvert::TimeStamp timeStamp;
787 
788                         m_time.year = dtime.year;
789                         m_time.month = dtime.month;
790                         m_time.day = dtime.day;
791                         m_time.hour = dtime.hour;
792                         m_time.minute = dtime.minute;
793                         m_time.second = dtime.second;
794                         m_time.second_part = dtime.msecond;
795 
796                         bool isValid = true;
797                         int64_t seconds = mySQLTimeToGmtSec(m_time, fTimeZone, isValid);
798 
799                         if (!isValid)
800                         {
801                             timeStamp.reset();
802                         }
803                         else
804                         {
805                             timeStamp.second = seconds;
806                             timeStamp.msecond = m_time.second_part;
807                         }
808 
809                         uint64_t outValue = (uint64_t) *(reinterpret_cast<uint64_t*>(&timeStamp));
810                         out->setUintField(outValue, i);
811                         break;
812                     }
813 
814                     case CalpontSystemCatalog::CHAR:
815                     case CalpontSystemCatalog::TEXT:
816                     case CalpontSystemCatalog::VARCHAR:
817                     {
818                         string d = DataConvert::datetimeToString(in.getUintField(i));
819                         out->setStringField(d, i);
820                         break;
821                     }
822 
823                     default:
824                     {
825                         ostringstream os;
826                         os << "TupleUnion::normalize(): tried an illegal conversion: datetime to "
827                            << out->getColTypes()[i];
828                         throw logic_error(os.str());
829                     }
830                 }
831 
832                 break;
833 
834             case CalpontSystemCatalog::TIMESTAMP:
835                 switch (out->getColTypes()[i])
836                 {
837                     case CalpontSystemCatalog::TIMESTAMP:
838                         out->setIntField(in.getIntField(i), i);
839                         break;
840 
841                     case CalpontSystemCatalog::DATE:
842                     case CalpontSystemCatalog::DATETIME:
843                     {
844                         uint64_t val = in.getUintField(i);
845                         dataconvert::TimeStamp timestamp(val);
846                         int64_t seconds = timestamp.second;
847                         uint64_t outValue;
848 
849                         dataconvert::MySQLTime time;
850                         dataconvert::gmtSecToMySQLTime(seconds, time, fTimeZone);
851 
852                         if (out->getColTypes()[i] == CalpontSystemCatalog::DATE)
853                         {
854                             dataconvert::Date date;
855                             date.year = time.year;
856                             date.month = time.month;
857                             date.day = time.day;
858                             date.spare = 0;
859                             outValue = (uint32_t) *(reinterpret_cast<uint32_t*>(&date));
860                         }
861                         else
862                         {
863                             dataconvert::DateTime datetime;
864                             datetime.year = time.year;
865                             datetime.month = time.month;
866                             datetime.day = time.day;
867                             datetime.hour = time.hour;
868                             datetime.minute = time.minute;
869                             datetime.second = time.second;
870                             datetime.msecond = timestamp.msecond;
871                             outValue = (uint64_t) *(reinterpret_cast<uint64_t*>(&datetime));
872                         }
873 
874                         out->setUintField(outValue, i);
875                         break;
876                     }
877 
878                     case CalpontSystemCatalog::CHAR:
879                     case CalpontSystemCatalog::TEXT:
880                     case CalpontSystemCatalog::VARCHAR:
881                     {
882                         string d = DataConvert::timestampToString(in.getUintField(i), fTimeZone);
883                         out->setStringField(d, i);
884                         break;
885                     }
886 
887                     default:
888                     {
889                         ostringstream os;
890                         os << "TupleUnion::normalize(): tried an illegal conversion: timestamp to "
891                            << out->getColTypes()[i];
892                         throw logic_error(os.str());
893                     }
894                 }
895 
896                 break;
897 
898             case CalpontSystemCatalog::TIME:
899                 switch (out->getColTypes()[i])
900                 {
901                     case CalpontSystemCatalog::TIME:
902                         out->setIntField(in.getIntField(i), i);
903                         break;
904 
905                     case CalpontSystemCatalog::CHAR:
906                     case CalpontSystemCatalog::TEXT:
907                     case CalpontSystemCatalog::VARCHAR:
908                     {
909                         string d = DataConvert::timeToString(in.getIntField(i));
910                         out->setStringField(d, i);
911                         break;
912                     }
913 
914                     default:
915                     {
916                         ostringstream os;
917                         os << "TupleUnion::normalize(): tried an illegal conversion: time to "
918                            << out->getColTypes()[i];
919                         throw logic_error(os.str());
920                     }
921                 }
922 
923                 break;
924 
925             case CalpontSystemCatalog::FLOAT:
926             case CalpontSystemCatalog::UFLOAT:
927             case CalpontSystemCatalog::DOUBLE:
928             case CalpontSystemCatalog::UDOUBLE:
929             {
930                 double val = (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT ?
931                               in.getFloatField(i) : in.getDoubleField(i));
932 
933                 switch (out->getColTypes()[i])
934                 {
935                     case CalpontSystemCatalog::TINYINT:
936                     case CalpontSystemCatalog::SMALLINT:
937                     case CalpontSystemCatalog::MEDINT:
938                     case CalpontSystemCatalog::INT:
939                     case CalpontSystemCatalog::BIGINT:
940                         if (out->getScale(i))
941                             goto dec3;
942 
943                         out->setIntField((int64_t) val, i);
944                         break;
945 
946                     case CalpontSystemCatalog::UTINYINT:
947                     case CalpontSystemCatalog::USMALLINT:
948                     case CalpontSystemCatalog::UMEDINT:
949                     case CalpontSystemCatalog::UINT:
950                     case CalpontSystemCatalog::UBIGINT:
951                         out->setUintField((uint64_t) val, i);
952                         break;
953 
954                     case CalpontSystemCatalog::FLOAT:
955                     case CalpontSystemCatalog::UFLOAT:
956                         out->setFloatField(val, i);
957                         break;
958 
959                     case CalpontSystemCatalog::DOUBLE:
960                     case CalpontSystemCatalog::UDOUBLE:
961                         out->setDoubleField(val, i);
962                         break;
963 
964                     case CalpontSystemCatalog::LONGDOUBLE:
965                         out->setLongDoubleField(val, i);
966                         break;
967 
968                     case CalpontSystemCatalog::CHAR:
969                     case CalpontSystemCatalog::TEXT:
970                     case CalpontSystemCatalog::VARCHAR:
971                     {
972                         ostringstream os;
973                         os.precision(15);  // to match mysql's output
974                         os << val;
975                         out->setStringField(os.str(), i);
976                         break;
977                     }
978 
979                     case CalpontSystemCatalog::DECIMAL:
980                     case CalpontSystemCatalog::UDECIMAL:
981                     {
982 dec3:					/* have to pick a scale to use for the double. using 5... */
983                         uint32_t scale = 5;
984                         uint64_t ival = (uint64_t) (double) (val * pow((double) 10, (double) scale));
985                         int diff = out->getScale(i) - scale;
986 
987                         if (diff < 0)
988                             ival /= (uint64_t) pow((double) 10, (double) - diff);
989                         else
990                             ival *= (uint64_t) pow((double) 10, (double) diff);
991 
992                         out->setIntField((int64_t) val, i);
993                         break;
994                     }
995 
996                     default:
997                         ostringstream os;
998                         os << "TupleUnion::normalize(): tried an illegal conversion: floating point to "
999                            << out->getColTypes()[i];
1000                         throw logic_error(os.str());
1001                 }
1002 
1003                 break;
1004             }
1005 
1006             case CalpontSystemCatalog::LONGDOUBLE:
1007             {
1008                 long double val = in.getLongDoubleField(i);
1009 
1010                 switch (out->getColTypes()[i])
1011                 {
1012                     case CalpontSystemCatalog::TINYINT:
1013                     case CalpontSystemCatalog::SMALLINT:
1014                     case CalpontSystemCatalog::MEDINT:
1015                     case CalpontSystemCatalog::INT:
1016                     case CalpontSystemCatalog::BIGINT:
1017                         if (out->getScale(i))
1018                             goto dec4;
1019 
1020                         out->setIntField((int64_t) val, i);
1021                         break;
1022 
1023                     case CalpontSystemCatalog::UTINYINT:
1024                     case CalpontSystemCatalog::USMALLINT:
1025                     case CalpontSystemCatalog::UMEDINT:
1026                     case CalpontSystemCatalog::UINT:
1027                     case CalpontSystemCatalog::UBIGINT:
1028                         out->setUintField((uint64_t) val, i);
1029                         break;
1030 
1031                     case CalpontSystemCatalog::FLOAT:
1032                     case CalpontSystemCatalog::UFLOAT:
1033                         out->setFloatField(val, i);
1034                         break;
1035 
1036                     case CalpontSystemCatalog::DOUBLE:
1037                     case CalpontSystemCatalog::UDOUBLE:
1038                         out->setDoubleField(val, i);
1039                         break;
1040 
1041                     case CalpontSystemCatalog::LONGDOUBLE:
1042                         out->setLongDoubleField(val, i);
1043                         break;
1044 
1045                     case CalpontSystemCatalog::CHAR:
1046                     case CalpontSystemCatalog::TEXT:
1047                     case CalpontSystemCatalog::VARCHAR:
1048                     {
1049                         ostringstream os;
1050                         os.precision(15);  // to match mysql's output
1051                         os << val;
1052                         out->setStringField(os.str(), i);
1053                         break;
1054                     }
1055 
1056                     case CalpontSystemCatalog::DECIMAL:
1057                     case CalpontSystemCatalog::UDECIMAL:
1058                     {
1059 dec4:					/* have to pick a scale to use for the double. using 5... */
1060                         uint32_t scale = 5;
1061                         uint64_t ival = (uint64_t) (double) (val * pow((double) 10, (double) scale));
1062                         int diff = out->getScale(i) - scale;
1063 
1064                         if (diff < 0)
1065                             ival /= (uint64_t) pow((double) 10, (double) - diff);
1066                         else
1067                             ival *= (uint64_t) pow((double) 10, (double) diff);
1068 
1069                         out->setIntField((int64_t) val, i);
1070                         break;
1071                     }
1072 
1073                     default:
1074                         ostringstream os;
1075                         os << "TupleUnion::normalize(): tried an illegal conversion: floating point to "
1076                            << out->getColTypes()[i];
1077                         throw logic_error(os.str());
1078                 }
1079 
1080                 break;
1081             }
1082 
1083             case CalpontSystemCatalog::DECIMAL:
1084             case CalpontSystemCatalog::UDECIMAL:
1085             {
1086                 int64_t val = in.getIntField(i);
1087                 uint32_t    scale = in.getScale(i);
1088 
1089                 switch (out->getColTypes()[i])
1090                 {
1091                     case CalpontSystemCatalog::TINYINT:
1092                     case CalpontSystemCatalog::SMALLINT:
1093                     case CalpontSystemCatalog::MEDINT:
1094                     case CalpontSystemCatalog::INT:
1095                     case CalpontSystemCatalog::BIGINT:
1096                     case CalpontSystemCatalog::UTINYINT:
1097                     case CalpontSystemCatalog::USMALLINT:
1098                     case CalpontSystemCatalog::UMEDINT:
1099                     case CalpontSystemCatalog::UINT:
1100                     case CalpontSystemCatalog::UBIGINT:
1101                     case CalpontSystemCatalog::DECIMAL:
1102                     case CalpontSystemCatalog::UDECIMAL:
1103                     {
1104                         if (out->getScale(i) == scale)
1105                             out->setIntField(val, i);
1106                         else if (out->getScale(i) > scale)
1107                             out->setIntField(IDB_pow[out->getScale(i) - scale] * val, i);
1108                         else // should not happen, the output's scale is the largest
1109                             throw logic_error("TupleUnion::normalize(): incorrect scale setting");
1110 
1111                         break;
1112                     }
1113 
1114                     case CalpontSystemCatalog::FLOAT:
1115                     case CalpontSystemCatalog::UFLOAT:
1116                     {
1117                         float fval = ((float) val) / IDB_pow[scale];
1118                         out->setFloatField(fval, i);
1119                         break;
1120                     }
1121 
1122                     case CalpontSystemCatalog::DOUBLE:
1123                     case CalpontSystemCatalog::UDOUBLE:
1124                     {
1125                         double dval = ((double) val) / IDB_pow[scale];
1126                         out->setDoubleField(dval, i);
1127                         break;
1128                     }
1129 
1130                     case CalpontSystemCatalog::LONGDOUBLE:
1131                     {
1132                         long double dval = ((long double) val) / IDB_pow[scale];
1133                         out->setLongDoubleField(dval, i);
1134                         break;
1135                     }
1136 
1137                     case CalpontSystemCatalog::CHAR:
1138                     case CalpontSystemCatalog::TEXT:
1139                     case CalpontSystemCatalog::VARCHAR:
1140                     default:
1141                     {
1142                         char buf[50];
1143                         dataconvert::DataConvert::decimalToString(val, scale, buf, 50, out->getColTypes()[i]);
1144                         /*	ostringstream oss;
1145                         	if (scale == 0)
1146                         		oss << val;
1147                         	else
1148                         		oss << (val / IDB_pow[scale]) << "."
1149                         			<< setw(scale) << setfill('0') << (val % IDB_pow[scale]); */
1150                         out->setStringField(string(buf), i);
1151                         break;
1152                     }
1153                 }
1154 
1155                 break;
1156             }
1157 
1158             case CalpontSystemCatalog::BLOB:
1159             case CalpontSystemCatalog::VARBINARY:
1160             {
1161                 // out->setVarBinaryField(in.getVarBinaryStringField(i), i);  // not efficient
1162                 out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), i);
1163                 break;
1164             }
1165 
1166             default:
1167             {
1168                 ostringstream os;
1169                 os << "TupleUnion::normalize(): unknown input type (" << in.getColTypes()[i]
1170                    << ")";
1171                 cout << os.str() << endl;
1172                 throw logic_error(os.str());
1173             }
1174         }
1175     }
1176 }
1177 
run()1178 void TupleUnion::run()
1179 {
1180     uint32_t i;
1181 
1182     boost::mutex::scoped_lock lk(jlLock);
1183 
1184     if (runRan)
1185         return;
1186 
1187     runRan = true;
1188     lk.unlock();
1189 
1190     for (i = 0; i < fInputJobStepAssociation.outSize(); i++)
1191         inputs.push_back(fInputJobStepAssociation.outAt(i)->rowGroupDL());
1192 
1193     output = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
1194 
1195     if (fDelivery)
1196     {
1197         outputIt = output->getIterator();
1198     }
1199 
1200     outputRG.initRow(&row);
1201     outputRG.initRow(&row2);
1202 
1203     distinctCount = 0;
1204     normalizedData.reset(new RGData[inputs.size()]);
1205 
1206     for (i = 0; i < inputs.size(); i++)
1207     {
1208         if (distinctFlags[i])
1209         {
1210             distinctCount++;
1211             normalizedData[i].reinit(outputRG);
1212         }
1213     }
1214 
1215     runners.reserve(inputs.size());
1216 
1217     for (i = 0; i < inputs.size(); i++)
1218     {
1219         runners.push_back(jobstepThreadPool.invoke(Runner(this, i)));
1220     }
1221 }
1222 
join()1223 void TupleUnion::join()
1224 {
1225     boost::mutex::scoped_lock lk(jlLock);
1226 
1227     if (joinRan)
1228         return;
1229 
1230     joinRan = true;
1231     lk.unlock();
1232 
1233     jobstepThreadPool.join(runners);
1234     runners.clear();
1235     uniquer->clear();
1236     rowMemory.clear();
1237     rm->returnMemory(memUsage, sessionMemLimit);
1238     memUsage = 0;
1239 }
1240 
toString() const1241 const string TupleUnion::toString() const
1242 {
1243     ostringstream oss;
1244     oss << "TupleUnion       ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId;
1245     oss << " st:" << fStepId;
1246     oss << " in:";
1247 
1248     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
1249         oss << ((i == 0) ? " " : ", ") << fInputJobStepAssociation.outAt(i);
1250 
1251     oss << " out:";
1252 
1253     for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
1254         oss << ((i == 0) ? " " : ", ") << fOutputJobStepAssociation.outAt(i);
1255 
1256     oss << endl;
1257 
1258     return oss.str();
1259 
1260 }
1261 
writeNull(Row * out,uint32_t col)1262 void TupleUnion::writeNull(Row* out, uint32_t col)
1263 {
1264     switch (out->getColTypes()[col])
1265     {
1266         case CalpontSystemCatalog::TINYINT:
1267             out->setUintField<1>(joblist::TINYINTNULL, col);
1268             break;
1269 
1270         case CalpontSystemCatalog::SMALLINT:
1271             out->setUintField<1>(joblist::SMALLINTNULL, col);
1272             break;
1273 
1274         case CalpontSystemCatalog::UTINYINT:
1275             out->setUintField<1>(joblist::UTINYINTNULL, col);
1276             break;
1277 
1278         case CalpontSystemCatalog::USMALLINT:
1279             out->setUintField<1>(joblist::USMALLINTNULL, col);
1280             break;
1281 
1282         case CalpontSystemCatalog::DECIMAL:
1283         case CalpontSystemCatalog::UDECIMAL:
1284         {
1285             uint32_t len = out->getColumnWidth(col);
1286 
1287             switch (len)
1288             {
1289                 case 1:
1290                     out->setUintField<1>(joblist::TINYINTNULL, col);
1291                     break;
1292 
1293                 case 2:
1294                     out->setUintField<1>(joblist::SMALLINTNULL, col);
1295                     break;
1296 
1297                 case 4:
1298                     out->setUintField<4>(joblist::INTNULL, col);
1299                     break;
1300 
1301                 case 8:
1302                     out->setUintField<8>(joblist::BIGINTNULL, col);
1303                     break;
1304 
1305                 default:
1306                 {}
1307             }
1308 
1309             break;
1310         }
1311 
1312         case CalpontSystemCatalog::MEDINT:
1313         case CalpontSystemCatalog::INT:
1314             out->setUintField<4>(joblist::INTNULL, col);
1315             break;
1316 
1317         case CalpontSystemCatalog::UMEDINT:
1318         case CalpontSystemCatalog::UINT:
1319             out->setUintField<4>(joblist::UINTNULL, col);
1320             break;
1321 
1322         case CalpontSystemCatalog::FLOAT:
1323         case CalpontSystemCatalog::UFLOAT:
1324             out->setUintField<4>(joblist::FLOATNULL, col);
1325             break;
1326 
1327         case CalpontSystemCatalog::DATE:
1328             out->setUintField<4>(joblist::DATENULL, col);
1329             break;
1330 
1331         case CalpontSystemCatalog::BIGINT:
1332             out->setUintField<8>(joblist::BIGINTNULL, col);
1333             break;
1334 
1335         case CalpontSystemCatalog::UBIGINT:
1336             out->setUintField<8>(joblist::UBIGINTNULL, col);
1337             break;
1338 
1339         case CalpontSystemCatalog::DOUBLE:
1340         case CalpontSystemCatalog::UDOUBLE:
1341             out->setUintField<8>(joblist::DOUBLENULL, col);
1342             break;
1343 
1344         case CalpontSystemCatalog::DATETIME:
1345             out->setUintField<8>(joblist::DATETIMENULL, col);
1346             break;
1347 
1348         case CalpontSystemCatalog::TIMESTAMP:
1349             out->setUintField<8>(joblist::TIMESTAMPNULL, col);
1350             break;
1351 
1352         case CalpontSystemCatalog::TIME:
1353             out->setUintField<8>(joblist::TIMENULL, col);
1354             break;
1355 
1356         case CalpontSystemCatalog::CHAR:
1357         case CalpontSystemCatalog::TEXT:
1358         case CalpontSystemCatalog::VARCHAR:
1359         {
1360             uint32_t len = out->getColumnWidth(col);
1361 
1362             switch (len)
1363             {
1364                 case 1:
1365                     out->setUintField<1>(joblist::CHAR1NULL, col);
1366                     break;
1367 
1368                 case 2:
1369                     out->setUintField<2>(joblist::CHAR2NULL, col);
1370                     break;
1371 
1372                 case 3:
1373                 case 4:
1374                     out->setUintField<4>(joblist::CHAR4NULL, col);
1375                     break;
1376 
1377                 case 5:
1378                 case 6:
1379                 case 7:
1380                 case 8:
1381                     out->setUintField<8>(joblist::CHAR8NULL, col);
1382                     break;
1383 
1384                 default:
1385                     out->setStringField(joblist::CPNULLSTRMARK, col);
1386                     break;
1387             }
1388 
1389             break;
1390         }
1391 
1392         case CalpontSystemCatalog::BLOB:
1393         case CalpontSystemCatalog::VARBINARY:
1394             // could use below if zero length and NULL are treated the same
1395             // out->setVarBinaryField("", col); break;
1396             out->setVarBinaryField(joblist::CPNULLSTRMARK, col);
1397             break;
1398 
1399         default:
1400         { }
1401     }
1402 }
1403 
formatMiniStats()1404 void TupleUnion::formatMiniStats()
1405 {
1406     ostringstream oss;
1407     oss << "TUS "
1408         << "UM "
1409         << "- "
1410         << "- "
1411         << "- "
1412         << "- "
1413         << "- "
1414         << "- "
1415         << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
1416         << fRowsReturned << " ";
1417     fMiniInfo += oss.str();
1418 }
1419 
1420 }
1421