1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 /*****************************************************************************
20 * $Id: tupleunion.cpp 9665 2013-07-02 21:47:39Z pleblanc $
21 *
22 ****************************************************************************/
23
24 #include <string>
25 #include <boost/thread.hpp>
26 #include <boost/thread/mutex.hpp>
27 #include <boost/uuid/uuid_io.hpp>
28
29 #include "querytele.h"
30 using namespace querytele;
31
32 #include "dataconvert.h"
33 #include "hasher.h"
34 #include "jlf_common.h"
35 #include "resourcemanager.h"
36 #include "tupleunion.h"
37
38 using namespace std;
39 using namespace std::tr1;
40 using namespace boost;
41 using namespace execplan;
42 using namespace rowgroup;
43 using namespace dataconvert;
44
#if !defined(__linux__)
#if !defined(M_LN10)
#define M_LN10 2.30258509299404568402 /* log_e 10 */
#endif
namespace
{
// Fallback for builds that are not on Linux: returns 10 raised to the
// power x, computed as e^(x * ln 10).
inline double exp10(double x)
{
    const double naturalExponent = x * M_LN10;
    return exp(naturalExponent);
}
}  // anonymous namespace
#endif  // !__linux__
58
59 namespace joblist
60 {
operator ()(const RowPosition & p) const61 inline uint64_t TupleUnion::Hasher::operator()(const RowPosition& p) const
62 {
63 Row& row = ts->row;
64
65 if (p.group & RowPosition::normalizedFlag)
66 ts->normalizedData[p.group & ~RowPosition::normalizedFlag].getRow(p.row, &row);
67 else
68 ts->rowMemory[p.group].getRow(p.row, &row);
69
70 return row.hash();
71 }
72
operator ()(const RowPosition & d1,const RowPosition & d2) const73 inline bool TupleUnion::Eq::operator()(const RowPosition& d1, const RowPosition& d2) const
74 {
75 Row& r1 = ts->row, &r2 = ts->row2;
76
77 if (d1.group & RowPosition::normalizedFlag)
78 ts->normalizedData[d1.group & ~RowPosition::normalizedFlag].getRow(d1.row, &r1);
79 else
80 ts->rowMemory[d1.group].getRow(d1.row, &r1);
81
82 if (d2.group & RowPosition::normalizedFlag)
83 ts->normalizedData[d2.group & ~RowPosition::normalizedFlag].getRow(d2.row, &r2);
84 else
85 ts->rowMemory[d2.group].getRow(d2.row, &r2);
86
87 return r1.equals(r2);
88 }
89
TupleUnion(CalpontSystemCatalog::OID tableOID,const JobInfo & jobInfo)90 TupleUnion::TupleUnion(CalpontSystemCatalog::OID tableOID, const JobInfo& jobInfo) :
91 JobStep(jobInfo),
92 fTableOID(tableOID),
93 output(NULL),
94 outputIt(-1),
95 memUsage(0),
96 rm(jobInfo.rm),
97 runnersDone(0),
98 distinctCount(0),
99 distinctDone(0),
100 fRowsReturned(0),
101 runRan(false),
102 joinRan(false),
103 sessionMemLimit(jobInfo.umMemLimit),
104 fTimeZone(jobInfo.timeZone)
105 {
106 uniquer.reset(new Uniquer_t(10, Hasher(this), Eq(this), allocator));
107 fExtendedInfo = "TUN: ";
108 fQtc.stepParms().stepType = StepTeleStats::T_TUN;
109 }
110
~TupleUnion()111 TupleUnion::~TupleUnion()
112 {
113 rm->returnMemory(memUsage, sessionMemLimit);
114
115 if (!runRan && output)
116 output->endOfInput();
117 }
118
tableOid() const119 CalpontSystemCatalog::OID TupleUnion::tableOid() const
120 {
121 return fTableOID;
122 }
123
setInputRowGroups(const vector<rowgroup::RowGroup> & in)124 void TupleUnion::setInputRowGroups(const vector<rowgroup::RowGroup>& in)
125 {
126 inputRGs = in;
127 }
128
setOutputRowGroup(const rowgroup::RowGroup & out)129 void TupleUnion::setOutputRowGroup(const rowgroup::RowGroup& out)
130 {
131 outputRG = out;
132 rowLength = outputRG.getRowSizeWithStrings();
133 }
134
setDistinctFlags(const vector<bool> & v)135 void TupleUnion::setDistinctFlags(const vector<bool>& v)
136 {
137 distinctFlags = v;
138 }
139
readInput(uint32_t which)140 void TupleUnion::readInput(uint32_t which)
141 {
142 /* The handling of the output got a little kludgey with the string table enhancement.
143 * When there is no distinct check, the outputs are all generated independently of
144 * each other locally in this fcn. When there is a distinct check, threads
145 * share the output, which is built in the 'rowMemory' vector rather than in
146 * thread-local memory. Building the result in a common space allows us to
147 * store 8-byte offsets in rowMemory rather than 16-bytes for absolute pointers.
148 */
149
150 RowGroupDL* dl = NULL;
151 bool more = true;
152 RGData inRGData, outRGData, *tmpRGData;
153 uint32_t it = numeric_limits<uint32_t>::max();
154 RowGroup l_inputRG, l_outputRG, l_tmpRG;
155 Row inRow, outRow, tmpRow;
156 bool distinct;
157 uint64_t memUsageBefore, memUsageAfter, memDiff;
158 StepTeleStats sts;
159 sts.query_uuid = fQueryUuid;
160 sts.step_uuid = fStepUuid;
161
162 l_outputRG = outputRG;
163 dl = inputs[which];
164 l_inputRG = inputRGs[which];
165 l_inputRG.initRow(&inRow);
166 l_outputRG.initRow(&outRow);
167 distinct = distinctFlags[which];
168
169 if (distinct)
170 {
171 l_tmpRG = outputRG;
172 tmpRGData = &normalizedData[which];
173 l_tmpRG.initRow(&tmpRow);
174 l_tmpRG.setData(tmpRGData);
175 l_tmpRG.resetRowGroup(0);
176 l_tmpRG.getRow(0, &tmpRow);
177 }
178 else
179 {
180 outRGData = RGData(l_outputRG);
181 l_outputRG.setData(&outRGData);
182 l_outputRG.resetRowGroup(0);
183 l_outputRG.getRow(0, &outRow);
184 }
185
186 try
187 {
188
189 it = dl->getIterator();
190 more = dl->next(it, &inRGData);
191
192 if (dlTimes.FirstReadTime().tv_sec == 0)
193 dlTimes.setFirstReadTime();
194
195 if (fStartTime == -1)
196 {
197 sts.msg_type = StepTeleStats::ST_START;
198 sts.total_units_of_work = 1;
199 postStepStartTele(sts);
200 }
201
202 while (more && !cancelled())
203 {
204 /*
205 normalize each row
206 if distinct flag is set
207 copy the row into the output and test for uniqueness
208 if unique, increment the row count
209 else
210 copy the row into the output & inc row count
211 */
212 l_inputRG.setData(&inRGData);
213 l_inputRG.getRow(0, &inRow);
214
215 if (distinct)
216 {
217 memDiff = 0;
218 l_tmpRG.resetRowGroup(0);
219 l_tmpRG.getRow(0, &tmpRow);
220 l_tmpRG.setRowCount(l_inputRG.getRowCount());
221
222 for (uint32_t i = 0; i < l_inputRG.getRowCount(); i++, inRow.nextRow(),
223 tmpRow.nextRow())
224 normalize(inRow, &tmpRow);
225
226 l_tmpRG.getRow(0, &tmpRow);
227 {
228 boost::mutex::scoped_lock lk(uniquerMutex);
229 getOutput(&l_outputRG, &outRow, &outRGData);
230 memUsageBefore = allocator.getMemUsage();
231
232 for (uint32_t i = 0; i < l_tmpRG.getRowCount(); i++, tmpRow.nextRow())
233 {
234 pair<Uniquer_t::iterator, bool> inserted;
235 inserted = uniquer->insert(RowPosition(which | RowPosition::normalizedFlag, i));
236
237 if (inserted.second)
238 {
239 copyRow(tmpRow, &outRow);
240 const_cast<RowPosition&>(*(inserted.first)) = RowPosition(rowMemory.size() - 1, l_outputRG.getRowCount());
241 memDiff += outRow.getRealSize();
242 addToOutput(&outRow, &l_outputRG, true, outRGData);
243 }
244 }
245
246 memUsageAfter = allocator.getMemUsage();
247 memDiff += (memUsageAfter - memUsageBefore);
248 memUsage += memDiff;
249 }
250
251 if (!rm->getMemory(memDiff, sessionMemLimit))
252 {
253 fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_UNION_TOO_BIG);
254
255 if (status() == 0) // preserve existing error code
256 {
257 errorMessage(logging::IDBErrorInfo::instance()->errorMsg(
258 logging::ERR_UNION_TOO_BIG));
259 status(logging::ERR_UNION_TOO_BIG);
260 }
261
262 abort();
263 }
264 }
265 else
266 {
267 for (uint32_t i = 0; i < l_inputRG.getRowCount(); i++, inRow.nextRow())
268 {
269 normalize(inRow, &outRow);
270 addToOutput(&outRow, &l_outputRG, false, outRGData);
271 }
272 }
273
274 more = dl->next(it, &inRGData);
275 }
276 }
277 catch (...)
278 {
279 handleException(std::current_exception(),
280 logging::unionStepErr,
281 logging::ERR_UNION_TOO_BIG,
282 "TupleUnion::readInput()");
283 status(logging::unionStepErr);
284 abort();
285 }
286
287 /* make sure that the input was drained before exiting. This can happen if the
288 query was aborted */
289 if (dl && it != numeric_limits<uint32_t>::max())
290 while (more)
291 more = dl->next(it, &inRGData);
292
293 {
294 boost::mutex::scoped_lock lock1(uniquerMutex);
295 boost::mutex::scoped_lock lock2(sMutex);
296
297 if (!distinct && l_outputRG.getRowCount() > 0)
298 output->insert(outRGData);
299
300 if (distinct)
301 {
302 getOutput(&l_outputRG, &outRow, &outRGData);
303
304 if (++distinctDone == distinctCount && l_outputRG.getRowCount() > 0)
305 output->insert(outRGData);
306 }
307
308 if (++runnersDone == fInputJobStepAssociation.outSize())
309 {
310 output->endOfInput();
311
312 sts.msg_type = StepTeleStats::ST_SUMMARY;
313 sts.total_units_of_work = sts.units_of_work_completed = 1;
314 sts.rows = fRowsReturned;
315 postStepSummaryTele(sts);
316
317 if (traceOn())
318 {
319 dlTimes.setLastReadTime();
320 dlTimes.setEndOfInputTime();
321
322 time_t t = time (0);
323 char timeString[50];
324 ctime_r (&t, timeString);
325 timeString[strlen (timeString ) - 1] = '\0';
326 ostringstream logStr;
327 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at "
328 << timeString << "; total rows returned-" << fRowsReturned << endl
329 << "\t1st read " << dlTimes.FirstReadTimeString()
330 << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
331 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
332 << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
333 << "\tJob completion status " << status() << endl;
334 logEnd(logStr.str().c_str());
335 fExtendedInfo += logStr.str();
336 formatMiniStats();
337 }
338 }
339 }
340 }
341
nextBand(messageqcpp::ByteStream & bs)342 uint32_t TupleUnion::nextBand(messageqcpp::ByteStream& bs)
343 {
344 RGData mem;
345 bool more;
346 uint32_t ret = 0;
347
348 bs.restart();
349 more = output->next(outputIt, &mem);
350
351 if (more)
352 outputRG.setData(&mem);
353 else
354 {
355 mem = RGData(outputRG, 0);
356 outputRG.setData(&mem);
357 outputRG.resetRowGroup(0);
358 outputRG.setStatus(status());
359 }
360
361 outputRG.serializeRGData(bs);
362 ret = outputRG.getRowCount();
363
364 return ret;
365 }
366
getOutput(RowGroup * rg,Row * row,RGData * data)367 void TupleUnion::getOutput(RowGroup* rg, Row* row, RGData* data)
368 {
369 if (UNLIKELY(rowMemory.empty()))
370 {
371 *data = RGData(*rg);
372 rg->setData(data);
373 rg->resetRowGroup(0);
374 rowMemory.push_back(*data);
375 }
376 else
377 {
378 *data = rowMemory.back();
379 rg->setData(data);
380 }
381
382 rg->getRow(rg->getRowCount(), row);
383 }
384
addToOutput(Row * r,RowGroup * rg,bool keepit,RGData & data)385 void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit,
386 RGData& data)
387 {
388 r->nextRow();
389 rg->incRowCount();
390 fRowsReturned++;
391
392 if (rg->getRowCount() == 8192)
393 {
394 {
395 boost::mutex::scoped_lock lock(sMutex);
396 output->insert(data);
397 }
398 data = RGData(*rg);
399 rg->setData(&data);
400 rg->resetRowGroup(0);
401 rg->getRow(0, r);
402
403 if (keepit)
404 rowMemory.push_back(data);
405 }
406 }
407
normalize(const Row & in,Row * out)408 void TupleUnion::normalize(const Row& in, Row* out)
409 {
410 uint32_t i;
411
412 out->setRid(0);
413
414 for (i = 0; i < out->getColumnCount(); i++)
415 {
416 if (in.isNullValue(i))
417 {
418 writeNull(out, i);
419 continue;
420 }
421
422 switch (in.getColTypes()[i])
423 {
424 case CalpontSystemCatalog::TINYINT:
425 case CalpontSystemCatalog::SMALLINT:
426 case CalpontSystemCatalog::MEDINT:
427 case CalpontSystemCatalog::INT:
428 case CalpontSystemCatalog::BIGINT:
429 switch (out->getColTypes()[i])
430 {
431 case CalpontSystemCatalog::TINYINT:
432 case CalpontSystemCatalog::SMALLINT:
433 case CalpontSystemCatalog::MEDINT:
434 case CalpontSystemCatalog::INT:
435 case CalpontSystemCatalog::BIGINT:
436 if (out->getScale(i) || in.getScale(i))
437 goto dec1;
438
439 out->setIntField(in.getIntField(i), i);
440 break;
441
442 case CalpontSystemCatalog::UTINYINT:
443 case CalpontSystemCatalog::USMALLINT:
444 case CalpontSystemCatalog::UMEDINT:
445 case CalpontSystemCatalog::UINT:
446 case CalpontSystemCatalog::UBIGINT:
447 if (in.getScale(i))
448 goto dec1;
449
450 out->setUintField(in.getUintField(i), i);
451 break;
452
453 case CalpontSystemCatalog::CHAR:
454 case CalpontSystemCatalog::TEXT:
455 case CalpontSystemCatalog::VARCHAR:
456 {
457 ostringstream os;
458
459 if (in.getScale(i))
460 {
461 double d = in.getIntField(i);
462 d /= exp10(in.getScale(i));
463 os.precision(15);
464 os << d;
465 }
466 else
467 os << in.getIntField(i);
468
469 out->setStringField(os.str(), i);
470 break;
471 }
472
473 case CalpontSystemCatalog::DATE:
474 case CalpontSystemCatalog::DATETIME:
475 case CalpontSystemCatalog::TIME:
476 case CalpontSystemCatalog::TIMESTAMP:
477 throw logic_error("TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime");
478
479 case CalpontSystemCatalog::FLOAT:
480 case CalpontSystemCatalog::UFLOAT:
481 {
482 int scale = in.getScale(i);
483
484 if (scale != 0)
485 {
486 float f = in.getIntField(i);
487 f /= (uint64_t) pow(10.0, scale);
488 out->setFloatField(f, i);
489 }
490 else
491 out->setFloatField(in.getIntField(i), i);
492
493 break;
494 }
495
496 case CalpontSystemCatalog::DOUBLE:
497 case CalpontSystemCatalog::UDOUBLE:
498 {
499 int scale = in.getScale(i);
500
501 if (scale != 0)
502 {
503 double d = in.getIntField(i);
504 d /= (uint64_t) pow(10.0, scale);
505 out->setDoubleField(d, i);
506 }
507 else
508 out->setDoubleField(in.getIntField(i), i);
509
510 break;
511 }
512
513 case CalpontSystemCatalog::LONGDOUBLE:
514 {
515 int scale = in.getScale(i);
516 long double d = in.getIntField(i);
517 if (scale != 0)
518 {
519 d /= (uint64_t) pow(10.0, scale);
520 }
521 out->setLongDoubleField(d, i);
522 break;
523 }
524
525 case CalpontSystemCatalog::DECIMAL:
526 case CalpontSystemCatalog::UDECIMAL:
527 {
528 dec1:
529 uint64_t val = in.getIntField(i);
530 int diff = out->getScale(i) - in.getScale(i);
531
532 if (diff < 0)
533 val /= (uint64_t) pow((double) 10, (double) - diff);
534 else
535 val *= (uint64_t) pow((double) 10, (double) diff);
536
537 out->setIntField(val, i);
538 break;
539 }
540
541 default:
542 ostringstream os;
543 os << "TupleUnion::normalize(): tried an illegal conversion: integer to "
544 << out->getColTypes()[i];
545 throw logic_error(os.str());
546 }
547
548 break;
549
550 case CalpontSystemCatalog::UTINYINT:
551 case CalpontSystemCatalog::USMALLINT:
552 case CalpontSystemCatalog::UMEDINT:
553 case CalpontSystemCatalog::UINT:
554 case CalpontSystemCatalog::UBIGINT:
555 switch (out->getColTypes()[i])
556 {
557 case CalpontSystemCatalog::TINYINT:
558 case CalpontSystemCatalog::SMALLINT:
559 case CalpontSystemCatalog::MEDINT:
560 case CalpontSystemCatalog::INT:
561 case CalpontSystemCatalog::BIGINT:
562 if (out->getScale(i))
563 goto dec2;
564
565 out->setIntField(in.getUintField(i), i);
566 break;
567
568 case CalpontSystemCatalog::UTINYINT:
569 case CalpontSystemCatalog::USMALLINT:
570 case CalpontSystemCatalog::UMEDINT:
571 case CalpontSystemCatalog::UINT:
572 case CalpontSystemCatalog::UBIGINT:
573 out->setUintField(in.getUintField(i), i);
574 break;
575
576 case CalpontSystemCatalog::CHAR:
577 case CalpontSystemCatalog::TEXT:
578 case CalpontSystemCatalog::VARCHAR:
579 {
580 ostringstream os;
581
582 if (in.getScale(i))
583 {
584 double d = in.getUintField(i);
585 d /= exp10(in.getScale(i));
586 os.precision(15);
587 os << d;
588 }
589 else
590 os << in.getUintField(i);
591
592 out->setStringField(os.str(), i);
593 break;
594 }
595
596 case CalpontSystemCatalog::DATE:
597 case CalpontSystemCatalog::DATETIME:
598 case CalpontSystemCatalog::TIME:
599 case CalpontSystemCatalog::TIMESTAMP:
600 throw logic_error("TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime");
601
602 case CalpontSystemCatalog::FLOAT:
603 case CalpontSystemCatalog::UFLOAT:
604 {
605 int scale = in.getScale(i);
606
607 if (scale != 0)
608 {
609 float f = in.getUintField(i);
610 f /= (uint64_t) pow(10.0, scale);
611 out->setFloatField(f, i);
612 }
613 else
614 out->setFloatField(in.getUintField(i), i);
615
616 break;
617 }
618
619 case CalpontSystemCatalog::DOUBLE:
620 case CalpontSystemCatalog::UDOUBLE:
621 {
622 int scale = in.getScale(i);
623
624 if (scale != 0)
625 {
626 double d = in.getUintField(i);
627 d /= (uint64_t) pow(10.0, scale);
628 out->setDoubleField(d, i);
629 }
630 else
631 out->setDoubleField(in.getUintField(i), i);
632
633 break;
634 }
635
636 case CalpontSystemCatalog::LONGDOUBLE:
637 {
638 int scale = in.getScale(i);
639
640 if (scale != 0)
641 {
642 long double d = in.getUintField(i);
643 d /= (uint64_t) pow(10.0, scale);
644 out->setLongDoubleField(d, i);
645 }
646 else
647 out->setLongDoubleField(in.getUintField(i), i);
648
649 break;
650 }
651
652 case CalpontSystemCatalog::DECIMAL:
653 case CalpontSystemCatalog::UDECIMAL:
654 {
655 dec2:
656 uint64_t val = in.getIntField(i);
657 int diff = out->getScale(i) - in.getScale(i);
658
659 if (diff < 0)
660 val /= (uint64_t) pow((double) 10, (double) - diff);
661 else
662 val *= (uint64_t) pow((double) 10, (double) diff);
663
664 out->setIntField(val, i);
665 break;
666 }
667
668 default:
669 ostringstream os;
670 os << "TupleUnion::normalize(): tried an illegal conversion: integer to "
671 << out->getColTypes()[i];
672 throw logic_error(os.str());
673 }
674
675 break;
676
677 case CalpontSystemCatalog::CHAR:
678 case CalpontSystemCatalog::TEXT:
679 case CalpontSystemCatalog::VARCHAR:
680 switch (out->getColTypes()[i])
681 {
682 case CalpontSystemCatalog::CHAR:
683 case CalpontSystemCatalog::TEXT:
684 case CalpontSystemCatalog::VARCHAR:
685 out->setStringField(in.getStringField(i), i);
686 break;
687
688 default:
689 {
690 ostringstream os;
691 os << "TupleUnion::normalize(): tried an illegal conversion: string to "
692 << out->getColTypes()[i];
693 throw logic_error(os.str());
694 }
695 }
696
697 break;
698
699 case CalpontSystemCatalog::DATE:
700 switch (out->getColTypes()[i])
701 {
702 case CalpontSystemCatalog::DATE:
703 out->setIntField(in.getIntField(i), i);
704 break;
705
706 case CalpontSystemCatalog::DATETIME:
707 {
708 uint64_t date = in.getUintField(i);
709 date &= ~0x3f; // zero the 'spare' field
710 date <<= 32;
711 out->setUintField(date, i);
712 break;
713 }
714
715 case CalpontSystemCatalog::TIMESTAMP:
716 {
717 dataconvert::Date date(in.getUintField(i));
718 dataconvert::MySQLTime m_time;
719 m_time.year = date.year;
720 m_time.month = date.month;
721 m_time.day = date.day;
722 m_time.hour = 0;
723 m_time.minute = 0;
724 m_time.second = 0;
725 m_time.second_part = 0;
726
727 dataconvert::TimeStamp timeStamp;
728 bool isValid = true;
729 int64_t seconds = dataconvert::mySQLTimeToGmtSec(m_time, fTimeZone, isValid);
730
731 if (!isValid)
732 {
733 timeStamp.reset();
734 }
735 else
736 {
737 timeStamp.second = seconds;
738 timeStamp.msecond = m_time.second_part;
739 }
740
741 uint64_t outValue = (uint64_t) *(reinterpret_cast<uint64_t*>(&timeStamp));
742 out->setUintField(outValue, i);
743 break;
744 }
745
746 case CalpontSystemCatalog::CHAR:
747 case CalpontSystemCatalog::TEXT:
748 case CalpontSystemCatalog::VARCHAR:
749 {
750 string d = DataConvert::dateToString(in.getUintField(i));
751 out->setStringField(d, i);
752 break;
753 }
754
755 default:
756 {
757 ostringstream os;
758 os << "TupleUnion::normalize(): tried an illegal conversion: date to "
759 << out->getColTypes()[i];
760 throw logic_error(os.str());
761 }
762 }
763
764 break;
765
766 case CalpontSystemCatalog::DATETIME:
767 switch (out->getColTypes()[i])
768 {
769 case CalpontSystemCatalog::DATETIME:
770 out->setIntField(in.getIntField(i), i);
771 break;
772
773 case CalpontSystemCatalog::DATE:
774 {
775 uint64_t val = in.getUintField(i);
776 val >>= 32;
777 out->setUintField(val, i);
778 break;
779 }
780
781 case CalpontSystemCatalog::TIMESTAMP:
782 {
783 uint64_t val = in.getUintField(i);
784 dataconvert::DateTime dtime(val);
785 dataconvert::MySQLTime m_time;
786 dataconvert::TimeStamp timeStamp;
787
788 m_time.year = dtime.year;
789 m_time.month = dtime.month;
790 m_time.day = dtime.day;
791 m_time.hour = dtime.hour;
792 m_time.minute = dtime.minute;
793 m_time.second = dtime.second;
794 m_time.second_part = dtime.msecond;
795
796 bool isValid = true;
797 int64_t seconds = mySQLTimeToGmtSec(m_time, fTimeZone, isValid);
798
799 if (!isValid)
800 {
801 timeStamp.reset();
802 }
803 else
804 {
805 timeStamp.second = seconds;
806 timeStamp.msecond = m_time.second_part;
807 }
808
809 uint64_t outValue = (uint64_t) *(reinterpret_cast<uint64_t*>(&timeStamp));
810 out->setUintField(outValue, i);
811 break;
812 }
813
814 case CalpontSystemCatalog::CHAR:
815 case CalpontSystemCatalog::TEXT:
816 case CalpontSystemCatalog::VARCHAR:
817 {
818 string d = DataConvert::datetimeToString(in.getUintField(i));
819 out->setStringField(d, i);
820 break;
821 }
822
823 default:
824 {
825 ostringstream os;
826 os << "TupleUnion::normalize(): tried an illegal conversion: datetime to "
827 << out->getColTypes()[i];
828 throw logic_error(os.str());
829 }
830 }
831
832 break;
833
834 case CalpontSystemCatalog::TIMESTAMP:
835 switch (out->getColTypes()[i])
836 {
837 case CalpontSystemCatalog::TIMESTAMP:
838 out->setIntField(in.getIntField(i), i);
839 break;
840
841 case CalpontSystemCatalog::DATE:
842 case CalpontSystemCatalog::DATETIME:
843 {
844 uint64_t val = in.getUintField(i);
845 dataconvert::TimeStamp timestamp(val);
846 int64_t seconds = timestamp.second;
847 uint64_t outValue;
848
849 dataconvert::MySQLTime time;
850 dataconvert::gmtSecToMySQLTime(seconds, time, fTimeZone);
851
852 if (out->getColTypes()[i] == CalpontSystemCatalog::DATE)
853 {
854 dataconvert::Date date;
855 date.year = time.year;
856 date.month = time.month;
857 date.day = time.day;
858 date.spare = 0;
859 outValue = (uint32_t) *(reinterpret_cast<uint32_t*>(&date));
860 }
861 else
862 {
863 dataconvert::DateTime datetime;
864 datetime.year = time.year;
865 datetime.month = time.month;
866 datetime.day = time.day;
867 datetime.hour = time.hour;
868 datetime.minute = time.minute;
869 datetime.second = time.second;
870 datetime.msecond = timestamp.msecond;
871 outValue = (uint64_t) *(reinterpret_cast<uint64_t*>(&datetime));
872 }
873
874 out->setUintField(outValue, i);
875 break;
876 }
877
878 case CalpontSystemCatalog::CHAR:
879 case CalpontSystemCatalog::TEXT:
880 case CalpontSystemCatalog::VARCHAR:
881 {
882 string d = DataConvert::timestampToString(in.getUintField(i), fTimeZone);
883 out->setStringField(d, i);
884 break;
885 }
886
887 default:
888 {
889 ostringstream os;
890 os << "TupleUnion::normalize(): tried an illegal conversion: timestamp to "
891 << out->getColTypes()[i];
892 throw logic_error(os.str());
893 }
894 }
895
896 break;
897
898 case CalpontSystemCatalog::TIME:
899 switch (out->getColTypes()[i])
900 {
901 case CalpontSystemCatalog::TIME:
902 out->setIntField(in.getIntField(i), i);
903 break;
904
905 case CalpontSystemCatalog::CHAR:
906 case CalpontSystemCatalog::TEXT:
907 case CalpontSystemCatalog::VARCHAR:
908 {
909 string d = DataConvert::timeToString(in.getIntField(i));
910 out->setStringField(d, i);
911 break;
912 }
913
914 default:
915 {
916 ostringstream os;
917 os << "TupleUnion::normalize(): tried an illegal conversion: time to "
918 << out->getColTypes()[i];
919 throw logic_error(os.str());
920 }
921 }
922
923 break;
924
925 case CalpontSystemCatalog::FLOAT:
926 case CalpontSystemCatalog::UFLOAT:
927 case CalpontSystemCatalog::DOUBLE:
928 case CalpontSystemCatalog::UDOUBLE:
929 {
930 double val = (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT ?
931 in.getFloatField(i) : in.getDoubleField(i));
932
933 switch (out->getColTypes()[i])
934 {
935 case CalpontSystemCatalog::TINYINT:
936 case CalpontSystemCatalog::SMALLINT:
937 case CalpontSystemCatalog::MEDINT:
938 case CalpontSystemCatalog::INT:
939 case CalpontSystemCatalog::BIGINT:
940 if (out->getScale(i))
941 goto dec3;
942
943 out->setIntField((int64_t) val, i);
944 break;
945
946 case CalpontSystemCatalog::UTINYINT:
947 case CalpontSystemCatalog::USMALLINT:
948 case CalpontSystemCatalog::UMEDINT:
949 case CalpontSystemCatalog::UINT:
950 case CalpontSystemCatalog::UBIGINT:
951 out->setUintField((uint64_t) val, i);
952 break;
953
954 case CalpontSystemCatalog::FLOAT:
955 case CalpontSystemCatalog::UFLOAT:
956 out->setFloatField(val, i);
957 break;
958
959 case CalpontSystemCatalog::DOUBLE:
960 case CalpontSystemCatalog::UDOUBLE:
961 out->setDoubleField(val, i);
962 break;
963
964 case CalpontSystemCatalog::LONGDOUBLE:
965 out->setLongDoubleField(val, i);
966 break;
967
968 case CalpontSystemCatalog::CHAR:
969 case CalpontSystemCatalog::TEXT:
970 case CalpontSystemCatalog::VARCHAR:
971 {
972 ostringstream os;
973 os.precision(15); // to match mysql's output
974 os << val;
975 out->setStringField(os.str(), i);
976 break;
977 }
978
979 case CalpontSystemCatalog::DECIMAL:
980 case CalpontSystemCatalog::UDECIMAL:
981 {
982 dec3: /* have to pick a scale to use for the double. using 5... */
983 uint32_t scale = 5;
984 uint64_t ival = (uint64_t) (double) (val * pow((double) 10, (double) scale));
985 int diff = out->getScale(i) - scale;
986
987 if (diff < 0)
988 ival /= (uint64_t) pow((double) 10, (double) - diff);
989 else
990 ival *= (uint64_t) pow((double) 10, (double) diff);
991
992 out->setIntField((int64_t) val, i);
993 break;
994 }
995
996 default:
997 ostringstream os;
998 os << "TupleUnion::normalize(): tried an illegal conversion: floating point to "
999 << out->getColTypes()[i];
1000 throw logic_error(os.str());
1001 }
1002
1003 break;
1004 }
1005
1006 case CalpontSystemCatalog::LONGDOUBLE:
1007 {
1008 long double val = in.getLongDoubleField(i);
1009
1010 switch (out->getColTypes()[i])
1011 {
1012 case CalpontSystemCatalog::TINYINT:
1013 case CalpontSystemCatalog::SMALLINT:
1014 case CalpontSystemCatalog::MEDINT:
1015 case CalpontSystemCatalog::INT:
1016 case CalpontSystemCatalog::BIGINT:
1017 if (out->getScale(i))
1018 goto dec4;
1019
1020 out->setIntField((int64_t) val, i);
1021 break;
1022
1023 case CalpontSystemCatalog::UTINYINT:
1024 case CalpontSystemCatalog::USMALLINT:
1025 case CalpontSystemCatalog::UMEDINT:
1026 case CalpontSystemCatalog::UINT:
1027 case CalpontSystemCatalog::UBIGINT:
1028 out->setUintField((uint64_t) val, i);
1029 break;
1030
1031 case CalpontSystemCatalog::FLOAT:
1032 case CalpontSystemCatalog::UFLOAT:
1033 out->setFloatField(val, i);
1034 break;
1035
1036 case CalpontSystemCatalog::DOUBLE:
1037 case CalpontSystemCatalog::UDOUBLE:
1038 out->setDoubleField(val, i);
1039 break;
1040
1041 case CalpontSystemCatalog::LONGDOUBLE:
1042 out->setLongDoubleField(val, i);
1043 break;
1044
1045 case CalpontSystemCatalog::CHAR:
1046 case CalpontSystemCatalog::TEXT:
1047 case CalpontSystemCatalog::VARCHAR:
1048 {
1049 ostringstream os;
1050 os.precision(15); // to match mysql's output
1051 os << val;
1052 out->setStringField(os.str(), i);
1053 break;
1054 }
1055
1056 case CalpontSystemCatalog::DECIMAL:
1057 case CalpontSystemCatalog::UDECIMAL:
1058 {
1059 dec4: /* have to pick a scale to use for the double. using 5... */
1060 uint32_t scale = 5;
1061 uint64_t ival = (uint64_t) (double) (val * pow((double) 10, (double) scale));
1062 int diff = out->getScale(i) - scale;
1063
1064 if (diff < 0)
1065 ival /= (uint64_t) pow((double) 10, (double) - diff);
1066 else
1067 ival *= (uint64_t) pow((double) 10, (double) diff);
1068
1069 out->setIntField((int64_t) val, i);
1070 break;
1071 }
1072
1073 default:
1074 ostringstream os;
1075 os << "TupleUnion::normalize(): tried an illegal conversion: floating point to "
1076 << out->getColTypes()[i];
1077 throw logic_error(os.str());
1078 }
1079
1080 break;
1081 }
1082
1083 case CalpontSystemCatalog::DECIMAL:
1084 case CalpontSystemCatalog::UDECIMAL:
1085 {
1086 int64_t val = in.getIntField(i);
1087 uint32_t scale = in.getScale(i);
1088
1089 switch (out->getColTypes()[i])
1090 {
1091 case CalpontSystemCatalog::TINYINT:
1092 case CalpontSystemCatalog::SMALLINT:
1093 case CalpontSystemCatalog::MEDINT:
1094 case CalpontSystemCatalog::INT:
1095 case CalpontSystemCatalog::BIGINT:
1096 case CalpontSystemCatalog::UTINYINT:
1097 case CalpontSystemCatalog::USMALLINT:
1098 case CalpontSystemCatalog::UMEDINT:
1099 case CalpontSystemCatalog::UINT:
1100 case CalpontSystemCatalog::UBIGINT:
1101 case CalpontSystemCatalog::DECIMAL:
1102 case CalpontSystemCatalog::UDECIMAL:
1103 {
1104 if (out->getScale(i) == scale)
1105 out->setIntField(val, i);
1106 else if (out->getScale(i) > scale)
1107 out->setIntField(IDB_pow[out->getScale(i) - scale] * val, i);
1108 else // should not happen, the output's scale is the largest
1109 throw logic_error("TupleUnion::normalize(): incorrect scale setting");
1110
1111 break;
1112 }
1113
1114 case CalpontSystemCatalog::FLOAT:
1115 case CalpontSystemCatalog::UFLOAT:
1116 {
1117 float fval = ((float) val) / IDB_pow[scale];
1118 out->setFloatField(fval, i);
1119 break;
1120 }
1121
1122 case CalpontSystemCatalog::DOUBLE:
1123 case CalpontSystemCatalog::UDOUBLE:
1124 {
1125 double dval = ((double) val) / IDB_pow[scale];
1126 out->setDoubleField(dval, i);
1127 break;
1128 }
1129
1130 case CalpontSystemCatalog::LONGDOUBLE:
1131 {
1132 long double dval = ((long double) val) / IDB_pow[scale];
1133 out->setLongDoubleField(dval, i);
1134 break;
1135 }
1136
1137 case CalpontSystemCatalog::CHAR:
1138 case CalpontSystemCatalog::TEXT:
1139 case CalpontSystemCatalog::VARCHAR:
1140 default:
1141 {
1142 char buf[50];
1143 dataconvert::DataConvert::decimalToString(val, scale, buf, 50, out->getColTypes()[i]);
1144 /* ostringstream oss;
1145 if (scale == 0)
1146 oss << val;
1147 else
1148 oss << (val / IDB_pow[scale]) << "."
1149 << setw(scale) << setfill('0') << (val % IDB_pow[scale]); */
1150 out->setStringField(string(buf), i);
1151 break;
1152 }
1153 }
1154
1155 break;
1156 }
1157
1158 case CalpontSystemCatalog::BLOB:
1159 case CalpontSystemCatalog::VARBINARY:
1160 {
1161 // out->setVarBinaryField(in.getVarBinaryStringField(i), i); // not efficient
1162 out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), i);
1163 break;
1164 }
1165
1166 default:
1167 {
1168 ostringstream os;
1169 os << "TupleUnion::normalize(): unknown input type (" << in.getColTypes()[i]
1170 << ")";
1171 cout << os.str() << endl;
1172 throw logic_error(os.str());
1173 }
1174 }
1175 }
1176 }
1177
run()1178 void TupleUnion::run()
1179 {
1180 uint32_t i;
1181
1182 boost::mutex::scoped_lock lk(jlLock);
1183
1184 if (runRan)
1185 return;
1186
1187 runRan = true;
1188 lk.unlock();
1189
1190 for (i = 0; i < fInputJobStepAssociation.outSize(); i++)
1191 inputs.push_back(fInputJobStepAssociation.outAt(i)->rowGroupDL());
1192
1193 output = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
1194
1195 if (fDelivery)
1196 {
1197 outputIt = output->getIterator();
1198 }
1199
1200 outputRG.initRow(&row);
1201 outputRG.initRow(&row2);
1202
1203 distinctCount = 0;
1204 normalizedData.reset(new RGData[inputs.size()]);
1205
1206 for (i = 0; i < inputs.size(); i++)
1207 {
1208 if (distinctFlags[i])
1209 {
1210 distinctCount++;
1211 normalizedData[i].reinit(outputRG);
1212 }
1213 }
1214
1215 runners.reserve(inputs.size());
1216
1217 for (i = 0; i < inputs.size(); i++)
1218 {
1219 runners.push_back(jobstepThreadPool.invoke(Runner(this, i)));
1220 }
1221 }
1222
join()1223 void TupleUnion::join()
1224 {
1225 boost::mutex::scoped_lock lk(jlLock);
1226
1227 if (joinRan)
1228 return;
1229
1230 joinRan = true;
1231 lk.unlock();
1232
1233 jobstepThreadPool.join(runners);
1234 runners.clear();
1235 uniquer->clear();
1236 rowMemory.clear();
1237 rm->returnMemory(memUsage, sessionMemLimit);
1238 memUsage = 0;
1239 }
1240
toString() const1241 const string TupleUnion::toString() const
1242 {
1243 ostringstream oss;
1244 oss << "TupleUnion ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId;
1245 oss << " st:" << fStepId;
1246 oss << " in:";
1247
1248 for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
1249 oss << ((i == 0) ? " " : ", ") << fInputJobStepAssociation.outAt(i);
1250
1251 oss << " out:";
1252
1253 for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
1254 oss << ((i == 0) ? " " : ", ") << fOutputJobStepAssociation.outAt(i);
1255
1256 oss << endl;
1257
1258 return oss.str();
1259
1260 }
1261
writeNull(Row * out,uint32_t col)1262 void TupleUnion::writeNull(Row* out, uint32_t col)
1263 {
1264 switch (out->getColTypes()[col])
1265 {
1266 case CalpontSystemCatalog::TINYINT:
1267 out->setUintField<1>(joblist::TINYINTNULL, col);
1268 break;
1269
1270 case CalpontSystemCatalog::SMALLINT:
1271 out->setUintField<1>(joblist::SMALLINTNULL, col);
1272 break;
1273
1274 case CalpontSystemCatalog::UTINYINT:
1275 out->setUintField<1>(joblist::UTINYINTNULL, col);
1276 break;
1277
1278 case CalpontSystemCatalog::USMALLINT:
1279 out->setUintField<1>(joblist::USMALLINTNULL, col);
1280 break;
1281
1282 case CalpontSystemCatalog::DECIMAL:
1283 case CalpontSystemCatalog::UDECIMAL:
1284 {
1285 uint32_t len = out->getColumnWidth(col);
1286
1287 switch (len)
1288 {
1289 case 1:
1290 out->setUintField<1>(joblist::TINYINTNULL, col);
1291 break;
1292
1293 case 2:
1294 out->setUintField<1>(joblist::SMALLINTNULL, col);
1295 break;
1296
1297 case 4:
1298 out->setUintField<4>(joblist::INTNULL, col);
1299 break;
1300
1301 case 8:
1302 out->setUintField<8>(joblist::BIGINTNULL, col);
1303 break;
1304
1305 default:
1306 {}
1307 }
1308
1309 break;
1310 }
1311
1312 case CalpontSystemCatalog::MEDINT:
1313 case CalpontSystemCatalog::INT:
1314 out->setUintField<4>(joblist::INTNULL, col);
1315 break;
1316
1317 case CalpontSystemCatalog::UMEDINT:
1318 case CalpontSystemCatalog::UINT:
1319 out->setUintField<4>(joblist::UINTNULL, col);
1320 break;
1321
1322 case CalpontSystemCatalog::FLOAT:
1323 case CalpontSystemCatalog::UFLOAT:
1324 out->setUintField<4>(joblist::FLOATNULL, col);
1325 break;
1326
1327 case CalpontSystemCatalog::DATE:
1328 out->setUintField<4>(joblist::DATENULL, col);
1329 break;
1330
1331 case CalpontSystemCatalog::BIGINT:
1332 out->setUintField<8>(joblist::BIGINTNULL, col);
1333 break;
1334
1335 case CalpontSystemCatalog::UBIGINT:
1336 out->setUintField<8>(joblist::UBIGINTNULL, col);
1337 break;
1338
1339 case CalpontSystemCatalog::DOUBLE:
1340 case CalpontSystemCatalog::UDOUBLE:
1341 out->setUintField<8>(joblist::DOUBLENULL, col);
1342 break;
1343
1344 case CalpontSystemCatalog::DATETIME:
1345 out->setUintField<8>(joblist::DATETIMENULL, col);
1346 break;
1347
1348 case CalpontSystemCatalog::TIMESTAMP:
1349 out->setUintField<8>(joblist::TIMESTAMPNULL, col);
1350 break;
1351
1352 case CalpontSystemCatalog::TIME:
1353 out->setUintField<8>(joblist::TIMENULL, col);
1354 break;
1355
1356 case CalpontSystemCatalog::CHAR:
1357 case CalpontSystemCatalog::TEXT:
1358 case CalpontSystemCatalog::VARCHAR:
1359 {
1360 uint32_t len = out->getColumnWidth(col);
1361
1362 switch (len)
1363 {
1364 case 1:
1365 out->setUintField<1>(joblist::CHAR1NULL, col);
1366 break;
1367
1368 case 2:
1369 out->setUintField<2>(joblist::CHAR2NULL, col);
1370 break;
1371
1372 case 3:
1373 case 4:
1374 out->setUintField<4>(joblist::CHAR4NULL, col);
1375 break;
1376
1377 case 5:
1378 case 6:
1379 case 7:
1380 case 8:
1381 out->setUintField<8>(joblist::CHAR8NULL, col);
1382 break;
1383
1384 default:
1385 out->setStringField(joblist::CPNULLSTRMARK, col);
1386 break;
1387 }
1388
1389 break;
1390 }
1391
1392 case CalpontSystemCatalog::BLOB:
1393 case CalpontSystemCatalog::VARBINARY:
1394 // could use below if zero length and NULL are treated the same
1395 // out->setVarBinaryField("", col); break;
1396 out->setVarBinaryField(joblist::CPNULLSTRMARK, col);
1397 break;
1398
1399 default:
1400 { }
1401 }
1402 }
1403
formatMiniStats()1404 void TupleUnion::formatMiniStats()
1405 {
1406 ostringstream oss;
1407 oss << "TUS "
1408 << "UM "
1409 << "- "
1410 << "- "
1411 << "- "
1412 << "- "
1413 << "- "
1414 << "- "
1415 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
1416 << fRowsReturned << " ";
1417 fMiniInfo += oss.str();
1418 }
1419
1420 }
1421