1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (c) 2016-2020 MariaDB
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 /***********************************************************************
20 * $Id: pdictionaryscan.cpp 9655 2013-06-25 23:08:13Z xlou $
21 *
22 *
23 ***********************************************************************/
24 #include <stdexcept>
25 #include <cstring>
26 #include <utility>
27 #include <sstream>
28 //#define NDEBUG
29 #include <cassert>
30 #include <ctime>
31 using namespace std;
32
33 #include <boost/thread.hpp>
34 #include <boost/thread/condition.hpp>
35
36 #include "distributedenginecomm.h"
37 #include "elementtype.h"
38 #include "unique32generator.h"
39 #include "oamcache.h"
40 #include "jlf_common.h"
41 #include "primitivestep.h"
42
43 #include "messagequeue.h"
44 using namespace messageqcpp;
45 #include "configcpp.h"
46 using namespace config;
47
48 #include "messagelog.h"
49 #include "messageobj.h"
50 #include "loggingid.h"
51 #include "liboamcpp.h"
52 using namespace logging;
53
54 #include "calpontsystemcatalog.h"
55 #include "logicoperator.h"
56 using namespace execplan;
57
58 #include "brm.h"
59 using namespace BRM;
60
61 #include "rowgroup.h"
62 using namespace rowgroup;
63
64 #include "querytele.h"
65 using namespace querytele;
66
67 #include "threadnaming.h"
68 #include "checks.h"
69
70 namespace joblist
71 {
72
73 struct pDictionaryScanPrimitive
74 {
pDictionaryScanPrimitivejoblist::pDictionaryScanPrimitive75 pDictionaryScanPrimitive(pDictionaryScan* pDictScan) : fPDictScan(pDictScan)
76 {}
77 pDictionaryScan* fPDictScan;
operator ()joblist::pDictionaryScanPrimitive78 void operator()()
79 {
80 try
81 {
82 utils::setThreadName("DSSScan");
83 fPDictScan->sendPrimitiveMessages();
84 }
85 catch (runtime_error& re)
86 {
87 catchHandler(re.what(), ERR_DICTIONARY_SCAN, fPDictScan->errorInfo(), fPDictScan->sessionId());
88 }
89 catch (...)
90 {
91 catchHandler("pDictionaryScan send caught an unknown exception",
92 ERR_DICTIONARY_SCAN, fPDictScan->errorInfo(), fPDictScan->sessionId());
93 }
94
95 }
96 };
97
98 struct pDictionaryScanAggregator
99 {
pDictionaryScanAggregatorjoblist::pDictionaryScanAggregator100 pDictionaryScanAggregator(pDictionaryScan* pDictScan) : fPDictScan(pDictScan)
101 {}
102 pDictionaryScan* fPDictScan;
operator ()joblist::pDictionaryScanAggregator103 void operator()()
104 {
105 try
106 {
107 utils::setThreadName("DSSAgg");
108 fPDictScan->receivePrimitiveMessages();
109 }
110 catch (runtime_error& re)
111 {
112 catchHandler(re.what(), ERR_DICTIONARY_SCAN, fPDictScan->errorInfo(), fPDictScan->sessionId());
113 }
114 catch (...)
115 {
116 catchHandler("pDictionaryScan receive caught an unknown exception",
117 ERR_DICTIONARY_SCAN, fPDictScan->errorInfo(), fPDictScan->sessionId());
118 }
119
120 }
121 };
122
123
pDictionaryScan(CalpontSystemCatalog::OID o,CalpontSystemCatalog::OID t,const CalpontSystemCatalog::ColType & ct,const JobInfo & jobInfo)124 pDictionaryScan::pDictionaryScan(
125 CalpontSystemCatalog::OID o,
126 CalpontSystemCatalog::OID t,
127 const CalpontSystemCatalog::ColType& ct,
128 const JobInfo& jobInfo) :
129 JobStep(jobInfo),
130 fDec(NULL),
131 sysCat(jobInfo.csc),
132 fOid(o),
133 fTableOid(t),
134 fFilterCount(0),
135 fBOP(BOP_NONE),
136 msgsSent(0),
137 msgsRecvd(0),
138 finishedSending(false),
139 recvWaiting(false),
140 sendWaiting(false),
141 ridCount(0),
142 ridList(0),
143 fColType(ct),
144 pThread(0),
145 cThread(0),
146 fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()),
147 fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()),
148 fStopSending(false),
149 fSingleThread(false),
150 fPhysicalIO(0),
151 fCacheIO(0),
152 fMsgBytesIn(0),
153 fMsgBytesOut(0),
154 fMsgsToPm(0),
155 fMsgsExpect(0),
156 fRm(jobInfo.rm),
157 isEquality(false)
158 {
159 int err;
160 DBRM dbrm;
161
162 // Get list of non outOfService extents for the OID of interest
163 err = dbrm.lookup(fOid, fDictlbids);
164
165 if (err)
166 {
167 ostringstream oss;
168 oss << "pDictionaryScan: lookup error (2)! For OID-" << fOid;
169 throw runtime_error(oss.str());
170 }
171
172 err = dbrm.getExtents(fOid, extents);
173
174 if (err)
175 {
176 ostringstream oss;
177 oss << "pDictionaryScan: dbrm.getExtents error! For OID-" << fOid;
178 throw runtime_error(oss.str());
179 }
180
181 sort(extents.begin(), extents.end(), ExtentSorter());
182 numExtents = extents.size();
183 extentSize = (fRm->getExtentRows() * 8) / BLOCK_SIZE;
184
185 uint64_t i = 1, mask = 1;
186
187 for (; i <= 32; i++)
188 {
189 mask <<= 1;
190
191 if (extentSize & mask)
192 {
193 divShift = i;
194 break;
195 }
196 }
197
198 for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
199 if (extentSize & mask)
200 throw runtime_error("pDictionaryScan: Extent size must be a power of 2 in blocks");
201
202 fCOP1 = COMPARE_NIL;
203 fCOP2 = COMPARE_NIL;
204
205 uniqueID = UniqueNumberGenerator::instance()->getUnique32();
206 initializeConfigParms();
207 fExtendedInfo = "DSS: ";
208 fQtc.stepParms().stepType = StepTeleStats::T_DSS;
209 }
210
~pDictionaryScan()211 pDictionaryScan::~pDictionaryScan()
212 {
213 if (fDec)
214 {
215 if (isEquality)
216 destroyEqualityFilter();
217
218 fDec->removeQueue(uniqueID);
219 }
220 }
221
222 //------------------------------------------------------------------------------
223 // Initialize configurable parameters
224 //------------------------------------------------------------------------------
initializeConfigParms()225 void pDictionaryScan::initializeConfigParms()
226 {
227 fLogicalBlocksPerScan = fRm->getJlLogicalBlocksPerScan();
228 }
229
startPrimitiveThread()230 void pDictionaryScan::startPrimitiveThread()
231 {
232 pThread = jobstepThreadPool.invoke(pDictionaryScanPrimitive(this));
233 }
234
startAggregationThread()235 void pDictionaryScan::startAggregationThread()
236 {
237 cThread = jobstepThreadPool.invoke(pDictionaryScanAggregator(this));
238 }
239
run()240 void pDictionaryScan::run()
241 {
242 if (traceOn())
243 {
244 syslogStartStep(16, // exemgr subsystem
245 std::string("pDictionaryScan")); // step name
246 }
247
248 //For now, we cannot handle an input DL to this step
249 if (fInputJobStepAssociation.outSize() > 0)
250 throw logic_error("pDictionaryScan::run: don't know what to do with an input DL!");
251
252 if (isEquality)
253 serializeEqualityFilter();
254
255 startPrimitiveThread();
256 startAggregationThread();
257 }
258
join()259 void pDictionaryScan::join()
260 {
261 jobstepThreadPool.join(pThread);
262 jobstepThreadPool.join(cThread);
263
264 if (isEquality && fDec)
265 {
266 destroyEqualityFilter();
267 isEquality = false;
268 }
269 }
270
addFilter(int8_t COP,const string & value)271 void pDictionaryScan::addFilter(int8_t COP, const string& value)
272 {
273 // uint8_t* s = (uint8_t*)alloca(value.size() * sizeof(uint8_t));
274
275 // memcpy(s, value.data(), value.size());
276 // fFilterString << (uint16_t) value.size();
277 // fFilterString.append(s, value.size());
278 fFilterCount++;
279
280 if (fFilterCount == 1)
281 {
282 fCOP1 = COP;
283
284 if (COP == COMPARE_EQ || COP == COMPARE_NE)
285 {
286 isEquality = true;
287 equalityFilter.push_back(value);
288 }
289 }
290
291 if (fFilterCount == 2)
292 {
293 fCOP2 = COP;
294
295 // This static_cast should be safe since COP's are small, non-negative numbers
296 if ((COP == COMPARE_EQ || COP == COMPARE_NE) && COP == static_cast<int8_t>(fCOP1))
297 {
298 isEquality = true;
299 equalityFilter.push_back(value);
300 }
301 else
302 {
303 isEquality = false;
304 equalityFilter.clear();
305 }
306 }
307
308 if (fFilterCount > 2 && isEquality)
309 {
310 fFilterString.reset();
311 equalityFilter.push_back(value);
312 }
313 else
314 {
315 fFilterString << (uint16_t) value.size();
316 fFilterString.append((const uint8_t*)value.data(), value.size());
317 }
318 }
319
setRidList(DataList<ElementType> * dl)320 void pDictionaryScan::setRidList(DataList<ElementType>* dl)
321 {
322 ridList = dl;
323 }
324
setBOP(int8_t b)325 void pDictionaryScan::setBOP(int8_t b)
326 {
327 fBOP = b;
328 }
329
sendPrimitiveMessages()330 void pDictionaryScan::sendPrimitiveMessages()
331 {
332 LBIDRange_v::iterator it;
333 HWM_t hwm;
334 uint32_t fbo;
335 ByteStream primMsg(65536);
336 DBRM dbrm;
337 uint16_t dbroot;
338 uint32_t partNum;
339 uint16_t segNum;
340 BRM::OID_t oid;
341 boost::shared_ptr<map<int, int> > dbRootConnectionMap;
342 boost::shared_ptr<map<int, int> > dbRootPMMap;
343 oam::OamCache* oamCache = oam::OamCache::makeOamCache();
344 int localPMId = oamCache->getLocalPMId();
345
346 try
347 {
348 dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
349 dbRootPMMap = oamCache->getDBRootToPMMap();
350
351 it = fDictlbids.begin();
352
353 for (; it != fDictlbids.end() && !cancelled(); it++)
354 {
355 LBID_t msgLbidStart = it->start;
356 dbrm.lookupLocal(msgLbidStart,
357 (BRM::VER_t)fVerId.currentScn,
358 false,
359 oid,
360 dbroot,
361 partNum,
362 segNum,
363 fbo);
364
365 // Bug5741 If we are local only and this doesn't belongs to us, skip it
366 if (fLocalQuery == execplan::CalpontSelectExecutionPlan::LOCAL_QUERY)
367 {
368 if (localPMId == 0)
369 throw IDBExcept(ERR_LOCAL_QUERY_UM);
370
371 if (dbRootPMMap->find(dbroot)->second != localPMId)
372 continue;
373 }
374
375 // Retrieve the HWM blk for the segment file specified by the oid,
376 // partition, and segment number. The extState arg indicates
377 // whether the hwm block is outOfService or not, but we already
378 // filtered out the outOfService extents when we constructed the
379 // fDictlbids list, so extState is extraneous info at this point.
380 int extState;
381 dbrm.getLocalHWM(oid, partNum, segNum, hwm, extState);
382
383 uint32_t remainingLbids = fMsgsExpect =
384 ( (hwm > (fbo + it->size - 1)) ? (it->size) : (hwm - fbo + 1) );
385
386 uint32_t msgLbidCount = fLogicalBlocksPerScan;
387
388 while ( remainingLbids && !cancelled())
389 {
390 if ( remainingLbids < msgLbidCount)
391 msgLbidCount = remainingLbids;
392
393 if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end())
394 {
395 // MCOL-259 force a reload of the xml. This usualy fixes it.
396 Logger log;
397 log.logMessage(logging::LOG_TYPE_DEBUG, "dictionary forcing reload of columnstore.xml for dbRootConnectionMap");
398 oamCache->forceReload();
399 dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
400
401 if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end())
402 {
403 log.logMessage(logging::LOG_TYPE_DEBUG, "dictionary still not in dbRootConnectionMap");
404 throw IDBExcept(ERR_DATA_OFFLINE);
405 }
406 }
407
408 sendAPrimitiveMessage(primMsg, msgLbidStart, msgLbidCount, (*dbRootConnectionMap)[dbroot]);
409 primMsg.restart();
410
411 mutex.lock();
412 msgsSent += msgLbidCount;
413
414 if (recvWaiting)
415 condvar.notify_all();
416
417 while (( msgsSent - msgsRecvd) > fScanLbidReqThreshold)
418 {
419 sendWaiting = true;
420 condvarWakeupProducer.wait(mutex);
421 sendWaiting = false;
422 }
423
424 mutex.unlock();
425
426 remainingLbids -= msgLbidCount;
427 msgLbidStart += msgLbidCount;
428 }
429 } // end of loop through LBID ranges to be requested from primproc
430 }//try
431 catch (...)
432 {
433 handleException(std::current_exception(),
434 logging::ERR_DICTIONARY_SCAN,
435 logging::ERR_ALWAYS_CRITICAL,
436 "pDictionaryScan::sendPrimitiveMessages()");
437 sendError(ERR_DICTIONARY_SCAN);
438 }
439
440 mutex.lock();
441 finishedSending = true;
442
443 if (recvWaiting)
444 {
445 condvar.notify_one();
446 }
447
448 mutex.unlock();
449
450 #ifdef DEBUG2
451
452 if (fOid >= 3000)
453 {
454 time_t t = time(0);
455 char timeString[50];
456 ctime_r(&t, timeString);
457 timeString[strlen(timeString) - 1 ] = '\0';
458 cout << "pDictionaryScan Finished sending primitives for: " <<
459 fOid << " at " << timeString << endl;
460 }
461
462 #endif
463
464 }
465
sendError(uint16_t s)466 void pDictionaryScan::sendError(uint16_t s)
467 {
468 status(s);
469 }
470
471 //------------------------------------------------------------------------------
472 // Construct and send a single primitive message to primproc
473 //------------------------------------------------------------------------------
sendAPrimitiveMessage(ByteStream & primMsg,BRM::LBID_t msgLbidStart,uint32_t msgLbidCount,uint16_t pm)474 void pDictionaryScan::sendAPrimitiveMessage(
475 ByteStream& primMsg,
476 BRM::LBID_t msgLbidStart,
477 uint32_t msgLbidCount,
478 uint16_t pm
479 )
480 {
481 DictTokenByScanRequestHeader hdr;
482 void *hdrp = static_cast<void*>(&hdr);
483 memset(hdrp, 0, sizeof(hdr));
484
485 hdr.ism.Interleave = pm;
486 hdr.ism.Flags = planFlagsToPrimFlags(fTraceFlags);
487 hdr.ism.Command = DICT_TOKEN_BY_SCAN_COMPARE;
488 hdr.ism.Size = sizeof(DictTokenByScanRequestHeader) +
489 fFilterString.length();
490 hdr.ism.Type = 2;
491
492 hdr.Hdr.SessionID = fSessionId;
493 hdr.Hdr.TransactionID = fTxnId;
494 hdr.Hdr.VerID = fVerId.currentScn;
495 hdr.Hdr.StepID = fStepId;
496 hdr.Hdr.UniqueID = uniqueID;
497 hdr.Hdr.Priority = priority();
498
499 hdr.LBID = msgLbidStart;
500 idbassert(utils::is_nonnegative(hdr.LBID));
501 hdr.OutputType = OT_TOKEN;
502 hdr.BOP = fBOP;
503 hdr.COP1 = fCOP1;
504 hdr.COP2 = fCOP2;
505 hdr.NVALS = fFilterCount;
506 hdr.Count = msgLbidCount;
507 hdr.CompType = fColType.ddn.compressionType;
508 hdr.charsetNumber = fColType.charsetNumber;
509 idbassert(hdr.Count > 0);
510
511 if (isEquality)
512 hdr.flags |= HAS_EQ_FILTER;
513
514 if (fSessionId & 0x80000000)
515 hdr.flags |= IS_SYSCAT;
516
517
518 /* TODO: Need to figure out how to get the full fVerID into this msg.
519 * XXXPAT: The way I did it is IMO the least kludgy, while requiring only a couple
520 * changes.
521 * The old msg was: TokenByScanRequestHeader + fFilterString
522 * The new msg is: TokenByScanRequestHeader + fVerId + old message
523 * Prepending verid wastes a few bytes that go over the network, but that is better
524 * than putting it in the middle or at the end in terms of simplicity & memory usage,
525 * given the current code.
526 */
527
528 primMsg.load((const uint8_t*) &hdr, sizeof(DictTokenByScanRequestHeader));
529 primMsg << fVerId;
530 primMsg.append((const uint8_t*) &hdr, sizeof(DictTokenByScanRequestHeader));
531 primMsg += fFilterString;
532
533 //cout << "Sending rqst LBIDS " << msgLbidStart
534 // << " hdr.Count " << hdr.Count
535 // << " filterCount " << fFilterCount << endl;
536 #ifdef DEBUG2
537
538 if (fOid >= 3000)
539 cout << "pDictionaryScan producer st: " << fStepId <<
540 ": sending req for lbid start " << msgLbidStart <<
541 "; lbid count " << msgLbidCount << endl;
542
543 #endif
544
545 try
546 {
547 fDec->write(uniqueID, primMsg);
548 }
549 catch (...)
550 {
551 abort();
552 handleException(std::current_exception(),
553 logging::ERR_DICTIONARY_SCAN,
554 logging::ERR_ALWAYS_CRITICAL,
555 "pDictionaryScan::sendAPrimitiveMessage()");
556 sendError(ERR_DICTIONARY_SCAN);
557 }
558
559 fMsgsToPm++;
560 }
561
receivePrimitiveMessages()562 void pDictionaryScan::receivePrimitiveMessages()
563 {
564 RowGroupDL* rgFifo = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
565 boost::shared_ptr<ByteStream> bs;
566 RGData rgData;
567 Row r;
568
569 fRidResults = 0;
570
571 idbassert(fOutputRowGroup.getColumnCount() > 0);
572 fOutputRowGroup.initRow(&r);
573 rgData = RGData(fOutputRowGroup);
574 fOutputRowGroup.setData(&rgData);
575 fOutputRowGroup.resetRowGroup(0);
576
577 StepTeleStats sts;
578 sts.query_uuid = fQueryUuid;
579 sts.step_uuid = fStepUuid;
580
581 if (fOid >= 3000)
582 {
583 sts.msg_type = StepTeleStats::ST_START;
584 sts.total_units_of_work = fMsgsExpect;
585 postStepStartTele(sts);
586 }
587
588 uint16_t error = status();
589
590 //...Be careful here. Mutex is locked prior to entering the loop, so
591 //...any continue statement in the loop must be sure the mutex is locked.
592 //error condition will not go through loop
593 if (!error) mutex.lock();
594
595 try
596 {
597 while (!error)
598 {
599
600 // sync with the send side
601 while (!finishedSending && msgsSent == msgsRecvd)
602 {
603 recvWaiting = true;
604 condvar.wait(mutex);
605 recvWaiting = false;
606 }
607
608 if (finishedSending && (msgsSent == msgsRecvd))
609 {
610 mutex.unlock();
611 break;
612 }
613
614 mutex.unlock();
615
616 fDec->read(uniqueID, bs);
617
618 if (fOid >= 3000 && traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
619 dlTimes.setFirstReadTime();
620
621 if (fOid >= 3000 && traceOn())
622 dlTimes.setLastReadTime();
623
624 if (bs->length() == 0)
625 {
626 mutex.lock();
627 fStopSending = true;
628 condvarWakeupProducer.notify_one();
629 mutex.unlock();
630 break;
631 }
632
633 ISMPacketHeader* hdr = (ISMPacketHeader*)(bs->buf());
634 error = hdr->Status;
635
636 if (! error)
637 {
638 const ByteStream::byte* bsp = bs->buf();
639
640 // get the ResultHeader out of the bytestream
641 const TokenByScanResultHeader* crh = reinterpret_cast<const TokenByScanResultHeader*>(bsp);
642 bsp += sizeof(TokenByScanResultHeader);
643
644 fCacheIO += crh->CacheIO;
645 fPhysicalIO += crh->PhysicalIO;
646
647 // From this point on the rest of the bytestream is the data that comes back from the primitive server
648 // This needs to be fed to a datalist that is retrieved from the outputassociation object.
649
650 PrimToken pt;
651 uint64_t token;
652 #ifdef DEBUG
653 cout << "dict step " << fStepId << " NVALS = " << crh->NVALS << endl;
654 #endif
655
656 if (fOid >= 3000 && traceOn() && dlTimes.FirstInsertTime().tv_sec == 0)
657 dlTimes.setFirstInsertTime();
658
659 for (int j = 0; j < crh->NVALS && !cancelled(); j++)
660 {
661 memcpy(&pt, bsp, sizeof(pt));
662 bsp += sizeof(pt);
663 uint64_t rid = fRidResults++;
664 token = (pt.LBID << 10) | pt.offset;
665
666 fOutputRowGroup.getRow(fOutputRowGroup.getRowCount(), &r);
667 // load r up w/ values
668 r.setRid(rid);
669 r.setUintField<8>(token, 0);
670 fOutputRowGroup.incRowCount();
671
672 if (fOutputRowGroup.getRowCount() == 8192)
673 {
674 //INSERT_ADAPTER(rgFifo, rgData);
675 //fOutputRowGroup.convertToInlineDataInPlace();
676 rgFifo->insert(rgData);
677 rgData = RGData(fOutputRowGroup);
678 fOutputRowGroup.setData(&rgData);
679 fOutputRowGroup.resetRowGroup(0);
680 }
681 }
682
683 mutex.lock();
684 msgsRecvd++;
685
686 if (fOid >= 3000)
687 {
688 uint64_t progress = msgsRecvd * 100 / fMsgsExpect;
689
690 if (progress > fProgress)
691 {
692 fProgress = progress;
693 sts.msg_type = StepTeleStats::ST_PROGRESS;
694 sts.total_units_of_work = fMsgsExpect;
695 sts.units_of_work_completed = msgsRecvd;
696 postStepProgressTele(sts);
697 }
698 }
699
700 //...If producer is waiting, and we have gone below our threshold value,
701 //...then we signal the producer to request more data from primproc
702 if ( (sendWaiting) &&
703 ( (msgsSent - msgsRecvd) < fScanLbidReqThreshold ) )
704 {
705 #ifdef DEBUG2
706
707 if (fOid >= 3000)
708 cout << "pDictionaryScan consumer signaling producer for "
709 "more data: " <<
710 "st:" << fStepId <<
711 "; sentCount-" << msgsSent <<
712 "; recvCount-" << msgsRecvd <<
713 "; threshold-" << fScanLbidReqThreshold << endl;
714
715 #endif
716 condvarWakeupProducer.notify_one();
717 }
718 } //if !error
719 else
720 {
721 mutex.lock();
722 fStopSending = true;
723 condvarWakeupProducer.notify_one();
724 mutex.unlock();
725 string errMsg;
726
727 //bs->advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
728 //*bs >> errMsg;
729 if (error < 1000)
730 {
731 logging::ErrorCodes errorcodes;
732 errMsg = errorcodes.errorString(error);
733 }
734 else
735 {
736 errMsg = IDBErrorInfo::instance()->errorMsg(error);
737 }
738
739 errorMessage(errMsg);
740 status(error);
741 }
742 } // end of loop to read LBID responses from primproc
743 }
744 catch (const LargeDataListExcept& ex)
745 {
746 catchHandler(ex.what(), ERR_DICTIONARY_SCAN, fErrorInfo, fSessionId);
747 mutex.unlock();
748 }
749 catch (...)
750 {
751 handleException(std::current_exception(),
752 logging::ERR_DICTIONARY_SCAN,
753 logging::ERR_ALWAYS_CRITICAL,
754 "pDictionaryScan::receivePrimitiveMessages()");
755 mutex.unlock();
756 }
757
758 if (fOutputRowGroup.getRowCount() > 0)
759 {
760 //fOutputRowGroup.convertToInlineDataInPlace();
761 //INSERT_ADAPTER(rgFifo, rgData);
762 rgFifo->insert(rgData);
763 rgData = RGData(fOutputRowGroup);
764 fOutputRowGroup.setData(&rgData);
765 fOutputRowGroup.resetRowGroup(0);
766 }
767
768 Stats stats = fDec->getNetworkStats(uniqueID);
769 fMsgBytesIn = stats.dataRecvd();
770 fMsgBytesOut = stats.dataSent();
771
772 //@bug 699: Reset StepMsgQueue
773 fDec->removeQueue(uniqueID);
774
775 if (fTableOid >= 3000)
776 {
777 //...Construct timestamp using ctime_r() instead of ctime() not
778 //...necessarily due to re-entrancy, but because we want to strip
779 //...the newline ('\n') off the end of the formatted string.
780 time_t t = time(0);
781 char timeString[50];
782 ctime_r(&t, timeString);
783 timeString[strlen(timeString) - 1 ] = '\0';
784
785 //...Roundoff inbound msg byte count to nearest KB for display;
786 //...no need to do so for outbound, because it should be small.
787 uint64_t msgBytesInKB = fMsgBytesIn >> 10;
788
789 if (fMsgBytesIn & 512)
790 msgBytesInKB++;
791
792 if (traceOn())
793 {
794 dlTimes.setEndOfInputTime();
795
796 //...Print job step completion information
797 ostringstream logStr;
798 logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " <<
799 timeString << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" <<
800 fCacheIO << "; MsgsSent-" << fMsgsToPm << "; MsgsRcvd-" << msgsRecvd <<
801 "; output size-" << fRidResults << endl <<
802 "\tMsgBytesIn-" << msgBytesInKB << "KB"
803 "; MsgBytesOut-" << fMsgBytesOut << "B" << endl <<
804 "\t1st read " << dlTimes.FirstReadTimeString() <<
805 "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" <<
806 JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << "s" << endl
807 << "\tJob completion status " << status() << endl;
808
809 logEnd(logStr.str().c_str());
810
811 syslogReadBlockCounts(16, // exemgr subsystem
812 fPhysicalIO, // # blocks read from disk
813 fCacheIO, // # blocks read from cache
814 0); // # casual partition block hits
815 syslogProcessingTimes(16, // exemgr subsystem
816 dlTimes.FirstReadTime(), // first datalist read
817 dlTimes.LastReadTime(), // last datalist read
818 dlTimes.FirstInsertTime(), // first datlist write
819 dlTimes.EndOfInputTime()); // last (endOfInput) datalist write
820 syslogEndStep(16, // exemgr subsystem
821 fMsgBytesIn, // incoming msg byte count
822 fMsgBytesOut); // outgoing msg byte count
823 fExtendedInfo += toString() + logStr.str();
824 formatMiniStats();
825 }
826
827 sts.msg_type = StepTeleStats::ST_SUMMARY;
828 sts.phy_io = fPhysicalIO;
829 sts.cache_io = fCacheIO;
830 sts.msg_rcv_cnt = sts.total_units_of_work = sts.units_of_work_completed = msgsRecvd;
831 sts.msg_bytes_in = fMsgBytesIn;
832 sts.msg_bytes_out = fMsgBytesOut;
833 sts.rows = fRidResults;
834 postStepSummaryTele(sts);
835 }
836
837 rgFifo->endOfInput();
838 }
839
toString() const840 const string pDictionaryScan::toString() const
841 {
842 ostringstream oss;
843 oss << "pDictionaryScan ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId
844 << " st:" << fStepId << " alias: " << (fAlias.length() ? fAlias : "none")
845 << " tb/col:" << fTableOid << "/" << fOid;
846 oss << " " << omitOidInDL
847 << fOutputJobStepAssociation.outAt(0) << showOidInDL;
848 oss << " nf:" << fFilterCount;
849 oss << " in:";
850
851 for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
852 {
853 oss << fInputJobStepAssociation.outAt(i) << ", ";
854 }
855
856 return oss.str();
857 }
858
formatMiniStats()859 void pDictionaryScan::formatMiniStats()
860 {
861 ostringstream oss;
862 oss << "DSS "
863 << "PM "
864 << fAlias << " "
865 << fTableOid << " (" << fName << ") "
866 << fPhysicalIO << " "
867 << fCacheIO << " "
868 << "- "
869 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
870 << fRidResults << " ";
871 fMiniInfo += oss.str();
872 }
873
874
addFilter(const Filter * f)875 void pDictionaryScan::addFilter(const Filter* f)
876 {
877 if (NULL != f)
878 fFilters.push_back(f);
879 }
880
881
appendFilter(const std::vector<const execplan::Filter * > & fs)882 void pDictionaryScan::appendFilter(const std::vector<const execplan::Filter*>& fs)
883 {
884 fFilters.insert(fFilters.end(), fs.begin(), fs.end());
885 }
886
887
appendFilter(const messageqcpp::ByteStream & filter,unsigned count)888 void pDictionaryScan::appendFilter(const messageqcpp::ByteStream& filter, unsigned count)
889 {
890 fFilterString += filter;
891 fFilterCount += count;
892 }
893
894
serializeEqualityFilter()895 void pDictionaryScan::serializeEqualityFilter()
896 {
897 ByteStream msg;
898 ISMPacketHeader ism;
899 uint32_t i;
900 vector<string> empty;
901
902 void *ismp = static_cast<void*>(&ism);
903 memset(ismp, 0, sizeof(ISMPacketHeader));
904 ism.Command = DICT_CREATE_EQUALITY_FILTER;
905 msg.load((uint8_t*) &ism, sizeof(ISMPacketHeader));
906 msg << uniqueID;
907 msg << (uint32_t) colType().charsetNumber;
908 msg << (uint32_t) equalityFilter.size();
909
910 for (i = 0; i < equalityFilter.size(); i++)
911 msg << equalityFilter[i];
912
913 try
914 {
915 fDec->write(uniqueID, msg);
916 }
917 catch (...)
918 {
919 abort();
920 handleException(std::current_exception(),
921 logging::ERR_DICTIONARY_SCAN,
922 logging::ERR_ALWAYS_CRITICAL,
923 "pDictionaryScan::serializeEqualityFilter()");
924 }
925
926 empty.swap(equalityFilter);
927 }
928
destroyEqualityFilter()929 void pDictionaryScan::destroyEqualityFilter()
930 {
931 ByteStream msg;
932 ISMPacketHeader ism;
933
934 void *ismp = static_cast<void*>(&ism);
935 memset(ismp, 0, sizeof(ISMPacketHeader));
936 ism.Command = DICT_DESTROY_EQUALITY_FILTER;
937 msg.load((uint8_t*) &ism, sizeof(ISMPacketHeader));
938 msg << uniqueID;
939
940 try
941 {
942 fDec->write(uniqueID, msg);
943 }
944 catch (...)
945 {
946 abort();
947 handleException(std::current_exception(),
948 logging::ERR_DICTIONARY_SCAN,
949 logging::ERR_ALWAYS_CRITICAL,
950 "pDictionaryScan::destroyEqualityFilter()");
951 }
952 }
953
abort()954 void pDictionaryScan::abort()
955 {
956 JobStep::abort();
957
958 if (fDec)
959 fDec->shutdownQueue(uniqueID);
960 }
961
962
963
964
965 } //namespace
966 // vim:ts=4 sw=4:
967
968