1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (c) 2016-2020 MariaDB
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 /***********************************************************************
20 *   $Id: pdictionaryscan.cpp 9655 2013-06-25 23:08:13Z xlou $
21 *
22 *
23 ***********************************************************************/
24 #include <stdexcept>
25 #include <cstring>
26 #include <utility>
27 #include <sstream>
28 //#define NDEBUG
29 #include <cassert>
30 #include <ctime>
31 using namespace std;
32 
33 #include <boost/thread.hpp>
34 #include <boost/thread/condition.hpp>
35 
36 #include "distributedenginecomm.h"
37 #include "elementtype.h"
38 #include "unique32generator.h"
39 #include "oamcache.h"
40 #include "jlf_common.h"
41 #include "primitivestep.h"
42 
43 #include "messagequeue.h"
44 using namespace messageqcpp;
45 #include "configcpp.h"
46 using namespace config;
47 
48 #include "messagelog.h"
49 #include "messageobj.h"
50 #include "loggingid.h"
51 #include "liboamcpp.h"
52 using namespace logging;
53 
54 #include "calpontsystemcatalog.h"
55 #include "logicoperator.h"
56 using namespace execplan;
57 
58 #include "brm.h"
59 using namespace BRM;
60 
61 #include "rowgroup.h"
62 using namespace rowgroup;
63 
64 #include "querytele.h"
65 using namespace querytele;
66 
67 #include "threadnaming.h"
68 #include "checks.h"
69 
70 namespace joblist
71 {
72 
73 struct pDictionaryScanPrimitive
74 {
pDictionaryScanPrimitivejoblist::pDictionaryScanPrimitive75     pDictionaryScanPrimitive(pDictionaryScan* pDictScan) : fPDictScan(pDictScan)
76     {}
77     pDictionaryScan* fPDictScan;
operator ()joblist::pDictionaryScanPrimitive78     void operator()()
79     {
80         try
81         {
82             utils::setThreadName("DSSScan");
83             fPDictScan->sendPrimitiveMessages();
84         }
85         catch (runtime_error& re)
86         {
87             catchHandler(re.what(), ERR_DICTIONARY_SCAN, fPDictScan->errorInfo(), fPDictScan->sessionId());
88         }
89         catch (...)
90         {
91             catchHandler("pDictionaryScan send caught an unknown exception",
92                          ERR_DICTIONARY_SCAN, fPDictScan->errorInfo(), fPDictScan->sessionId());
93         }
94 
95     }
96 };
97 
98 struct pDictionaryScanAggregator
99 {
pDictionaryScanAggregatorjoblist::pDictionaryScanAggregator100     pDictionaryScanAggregator(pDictionaryScan* pDictScan) : fPDictScan(pDictScan)
101     {}
102     pDictionaryScan* fPDictScan;
operator ()joblist::pDictionaryScanAggregator103     void operator()()
104     {
105         try
106         {
107             utils::setThreadName("DSSAgg");
108             fPDictScan->receivePrimitiveMessages();
109         }
110         catch (runtime_error& re)
111         {
112             catchHandler(re.what(), ERR_DICTIONARY_SCAN, fPDictScan->errorInfo(), fPDictScan->sessionId());
113         }
114         catch (...)
115         {
116             catchHandler("pDictionaryScan receive caught an unknown exception",
117                          ERR_DICTIONARY_SCAN, fPDictScan->errorInfo(), fPDictScan->sessionId());
118         }
119 
120     }
121 };
122 
123 
pDictionaryScan(CalpontSystemCatalog::OID o,CalpontSystemCatalog::OID t,const CalpontSystemCatalog::ColType & ct,const JobInfo & jobInfo)124 pDictionaryScan::pDictionaryScan(
125     CalpontSystemCatalog::OID o,
126     CalpontSystemCatalog::OID t,
127     const CalpontSystemCatalog::ColType& ct,
128     const JobInfo& jobInfo) :
129     JobStep(jobInfo),
130     fDec(NULL),
131     sysCat(jobInfo.csc),
132     fOid(o),
133     fTableOid(t),
134     fFilterCount(0),
135     fBOP(BOP_NONE),
136     msgsSent(0),
137     msgsRecvd(0),
138     finishedSending(false),
139     recvWaiting(false),
140     sendWaiting(false),
141     ridCount(0),
142     ridList(0),
143     fColType(ct),
144     pThread(0),
145     cThread(0),
146     fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()),
147     fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()),
148     fStopSending(false),
149     fSingleThread(false),
150     fPhysicalIO(0),
151     fCacheIO(0),
152     fMsgBytesIn(0),
153     fMsgBytesOut(0),
154     fMsgsToPm(0),
155     fMsgsExpect(0),
156     fRm(jobInfo.rm),
157     isEquality(false)
158 {
159     int err;
160     DBRM dbrm;
161 
162     // Get list of non outOfService extents for the OID of interest
163     err = dbrm.lookup(fOid, fDictlbids);
164 
165     if (err)
166     {
167         ostringstream oss;
168         oss << "pDictionaryScan: lookup error (2)! For OID-" << fOid;
169         throw runtime_error(oss.str());
170     }
171 
172     err = dbrm.getExtents(fOid, extents);
173 
174     if (err)
175     {
176         ostringstream oss;
177         oss << "pDictionaryScan: dbrm.getExtents error! For OID-" << fOid;
178         throw runtime_error(oss.str());
179     }
180 
181     sort(extents.begin(), extents.end(), ExtentSorter());
182     numExtents = extents.size();
183     extentSize = (fRm->getExtentRows() * 8) / BLOCK_SIZE;
184 
185     uint64_t i = 1, mask = 1;
186 
187     for (; i <= 32; i++)
188     {
189         mask <<= 1;
190 
191         if (extentSize & mask)
192         {
193             divShift = i;
194             break;
195         }
196     }
197 
198     for (i++, mask <<= 1; i <= 32; i++, mask <<= 1)
199         if (extentSize & mask)
200             throw runtime_error("pDictionaryScan: Extent size must be a power of 2 in blocks");
201 
202     fCOP1 = COMPARE_NIL;
203     fCOP2 = COMPARE_NIL;
204 
205     uniqueID = UniqueNumberGenerator::instance()->getUnique32();
206     initializeConfigParms();
207     fExtendedInfo = "DSS: ";
208     fQtc.stepParms().stepType = StepTeleStats::T_DSS;
209 }
210 
~pDictionaryScan()211 pDictionaryScan::~pDictionaryScan()
212 {
213     if (fDec)
214     {
215         if (isEquality)
216             destroyEqualityFilter();
217 
218         fDec->removeQueue(uniqueID);
219     }
220 }
221 
222 //------------------------------------------------------------------------------
223 // Initialize configurable parameters
224 //------------------------------------------------------------------------------
initializeConfigParms()225 void pDictionaryScan::initializeConfigParms()
226 {
227     fLogicalBlocksPerScan = fRm->getJlLogicalBlocksPerScan();
228 }
229 
startPrimitiveThread()230 void pDictionaryScan::startPrimitiveThread()
231 {
232     pThread = jobstepThreadPool.invoke(pDictionaryScanPrimitive(this));
233 }
234 
startAggregationThread()235 void pDictionaryScan::startAggregationThread()
236 {
237     cThread = jobstepThreadPool.invoke(pDictionaryScanAggregator(this));
238 }
239 
run()240 void pDictionaryScan::run()
241 {
242     if (traceOn())
243     {
244         syslogStartStep(16,                  // exemgr subsystem
245                         std::string("pDictionaryScan")); // step name
246     }
247 
248     //For now, we cannot handle an input DL to this step
249     if (fInputJobStepAssociation.outSize() > 0)
250         throw logic_error("pDictionaryScan::run: don't know what to do with an input DL!");
251 
252     if (isEquality)
253         serializeEqualityFilter();
254 
255     startPrimitiveThread();
256     startAggregationThread();
257 }
258 
join()259 void pDictionaryScan::join()
260 {
261     jobstepThreadPool.join(pThread);
262     jobstepThreadPool.join(cThread);
263 
264     if (isEquality && fDec)
265     {
266         destroyEqualityFilter();
267         isEquality = false;
268     }
269 }
270 
addFilter(int8_t COP,const string & value)271 void pDictionaryScan::addFilter(int8_t COP, const string& value)
272 {
273 //	uint8_t* s = (uint8_t*)alloca(value.size() * sizeof(uint8_t));
274 
275 //	memcpy(s, value.data(), value.size());
276 //	fFilterString << (uint16_t) value.size();
277 //	fFilterString.append(s, value.size());
278     fFilterCount++;
279 
280     if (fFilterCount == 1)
281     {
282         fCOP1 = COP;
283 
284         if (COP == COMPARE_EQ || COP == COMPARE_NE)
285         {
286             isEquality = true;
287             equalityFilter.push_back(value);
288         }
289     }
290 
291     if (fFilterCount == 2)
292     {
293         fCOP2 = COP;
294 
295         // This static_cast should be safe since COP's are small, non-negative numbers
296         if ((COP == COMPARE_EQ || COP == COMPARE_NE) && COP == static_cast<int8_t>(fCOP1))
297         {
298             isEquality = true;
299             equalityFilter.push_back(value);
300         }
301         else
302         {
303             isEquality = false;
304             equalityFilter.clear();
305         }
306     }
307 
308     if (fFilterCount > 2 && isEquality)
309     {
310         fFilterString.reset();
311         equalityFilter.push_back(value);
312     }
313     else
314     {
315         fFilterString << (uint16_t) value.size();
316         fFilterString.append((const uint8_t*)value.data(), value.size());
317     }
318 }
319 
setRidList(DataList<ElementType> * dl)320 void pDictionaryScan::setRidList(DataList<ElementType>* dl)
321 {
322     ridList = dl;
323 }
324 
setBOP(int8_t b)325 void pDictionaryScan::setBOP(int8_t b)
326 {
327     fBOP = b;
328 }
329 
sendPrimitiveMessages()330 void pDictionaryScan::sendPrimitiveMessages()
331 {
332     LBIDRange_v::iterator it;
333     HWM_t hwm;
334     uint32_t fbo;
335     ByteStream primMsg(65536);
336     DBRM dbrm;
337     uint16_t dbroot;
338     uint32_t partNum;
339     uint16_t segNum;
340     BRM::OID_t oid;
341     boost::shared_ptr<map<int, int> > dbRootConnectionMap;
342     boost::shared_ptr<map<int, int> > dbRootPMMap;
343     oam::OamCache* oamCache = oam::OamCache::makeOamCache();
344     int localPMId = oamCache->getLocalPMId();
345 
346     try
347     {
348         dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
349         dbRootPMMap = oamCache->getDBRootToPMMap();
350 
351         it = fDictlbids.begin();
352 
353         for (; it != fDictlbids.end() && !cancelled(); it++)
354         {
355             LBID_t	msgLbidStart = it->start;
356             dbrm.lookupLocal(msgLbidStart,
357                              (BRM::VER_t)fVerId.currentScn,
358                              false,
359                              oid,
360                              dbroot,
361                              partNum,
362                              segNum,
363                              fbo);
364 
365             // Bug5741 If we are local only and this doesn't belongs to us, skip it
366             if (fLocalQuery == execplan::CalpontSelectExecutionPlan::LOCAL_QUERY)
367             {
368                 if (localPMId == 0)
369                     throw IDBExcept(ERR_LOCAL_QUERY_UM);
370 
371                 if (dbRootPMMap->find(dbroot)->second != localPMId)
372                     continue;
373             }
374 
375             // Retrieve the HWM blk for the segment file specified by the oid,
376             // partition, and segment number.  The extState arg indicates
377             // whether the hwm block is outOfService or not, but we already
378             // filtered out the outOfService extents when we constructed the
379             // fDictlbids list, so extState is extraneous info at this point.
380             int extState;
381             dbrm.getLocalHWM(oid, partNum, segNum, hwm, extState);
382 
383             uint32_t remainingLbids = fMsgsExpect =
384                                           ( (hwm > (fbo + it->size - 1)) ? (it->size) : (hwm - fbo + 1) );
385 
386             uint32_t msgLbidCount   =  fLogicalBlocksPerScan;
387 
388             while ( remainingLbids && !cancelled())
389             {
390                 if ( remainingLbids < msgLbidCount)
391                     msgLbidCount = remainingLbids;
392 
393                 if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end())
394                 {
395                     // MCOL-259 force a reload of the xml. This usualy fixes it.
396                     Logger log;
397                     log.logMessage(logging::LOG_TYPE_DEBUG, "dictionary forcing reload of columnstore.xml for dbRootConnectionMap");
398                     oamCache->forceReload();
399                     dbRootConnectionMap = oamCache->getDBRootToConnectionMap();
400 
401                     if (dbRootConnectionMap->find(dbroot) == dbRootConnectionMap->end())
402                     {
403                         log.logMessage(logging::LOG_TYPE_DEBUG, "dictionary still not in dbRootConnectionMap");
404                         throw IDBExcept(ERR_DATA_OFFLINE);
405                     }
406                 }
407 
408                 sendAPrimitiveMessage(primMsg, msgLbidStart, msgLbidCount, (*dbRootConnectionMap)[dbroot]);
409                 primMsg.restart();
410 
411                 mutex.lock();
412                 msgsSent += msgLbidCount;
413 
414                 if (recvWaiting)
415                     condvar.notify_all();
416 
417                 while (( msgsSent - msgsRecvd) >  fScanLbidReqThreshold)
418                 {
419                     sendWaiting = true;
420                     condvarWakeupProducer.wait(mutex);
421                     sendWaiting = false;
422                 }
423 
424                 mutex.unlock();
425 
426                 remainingLbids -= msgLbidCount;
427                 msgLbidStart   += msgLbidCount;
428             }
429         } // end of loop through LBID ranges to be requested from primproc
430     }//try
431     catch (...)
432     {
433         handleException(std::current_exception(),
434                         logging::ERR_DICTIONARY_SCAN,
435                         logging::ERR_ALWAYS_CRITICAL,
436                         "pDictionaryScan::sendPrimitiveMessages()");
437         sendError(ERR_DICTIONARY_SCAN);
438     }
439 
440     mutex.lock();
441     finishedSending = true;
442 
443     if (recvWaiting)
444     {
445         condvar.notify_one();
446     }
447 
448     mutex.unlock();
449 
450 #ifdef DEBUG2
451 
452     if (fOid >= 3000)
453     {
454         time_t t = time(0);
455         char timeString[50];
456         ctime_r(&t, timeString);
457         timeString[strlen(timeString) - 1 ] = '\0';
458         cout << "pDictionaryScan Finished sending primitives for: " <<
459              fOid << " at " << timeString << endl;
460     }
461 
462 #endif
463 
464 }
465 
sendError(uint16_t s)466 void pDictionaryScan::sendError(uint16_t s)
467 {
468     status(s);
469 }
470 
//------------------------------------------------------------------------------
// Construct and send a single DICT_TOKEN_BY_SCAN_COMPARE request to primproc.
//
// primMsg      - buffer that is (re)loaded with the request; the caller
//                restart()s it after each send.
// msgLbidStart - first LBID of the block run to scan.
// msgLbidCount - number of blocks in the run (asserted > 0).
// pm           - stored in hdr.ism.Interleave; the caller passes the value
//                mapped from the extent's DBRoot in dbRootConnectionMap.
//
// On a write failure the step is aborted, the exception is logged via
// handleException() and ERR_DICTIONARY_SCAN is recorded; fMsgsToPm is
// incremented in either case.
//------------------------------------------------------------------------------
void pDictionaryScan::sendAPrimitiveMessage(
    ByteStream& primMsg,
    BRM::LBID_t	msgLbidStart,
    uint32_t	msgLbidCount,
    uint16_t	pm
)
{
    // Zero the whole header so unset fields/padding go out as 0.
    DictTokenByScanRequestHeader hdr;
    void *hdrp = static_cast<void*>(&hdr);
    memset(hdrp, 0, sizeof(hdr));

    hdr.ism.Interleave    = pm;
    hdr.ism.Flags         = planFlagsToPrimFlags(fTraceFlags);
    hdr.ism.Command       = DICT_TOKEN_BY_SCAN_COMPARE;
    // NOTE(review): Size covers one header plus the filter string only; it
    // does not count the prepended header copy / fVerId written below.
    hdr.ism.Size          = sizeof(DictTokenByScanRequestHeader) +
                            fFilterString.length();
    hdr.ism.Type          = 2;

    hdr.Hdr.SessionID     = fSessionId;
    hdr.Hdr.TransactionID = fTxnId;
    hdr.Hdr.VerID         = fVerId.currentScn;
    hdr.Hdr.StepID        = fStepId;
    hdr.Hdr.UniqueID	  = uniqueID;
    hdr.Hdr.Priority	  = priority();

    hdr.LBID              = msgLbidStart;
    idbassert(utils::is_nonnegative(hdr.LBID));
    hdr.OutputType        = OT_TOKEN;
    hdr.BOP               = fBOP;
    hdr.COP1              = fCOP1;
    hdr.COP2              = fCOP2;
    // NVALS is the total filter count, even when fFilterString was emptied
    // because the values travel in the separate equality filter.
    hdr.NVALS             = fFilterCount;
    hdr.Count             = msgLbidCount;
    hdr.CompType          = fColType.ddn.compressionType;
    hdr.charsetNumber     = fColType.charsetNumber;
    idbassert(hdr.Count > 0);

    // Tell PrimProc to use the equality filter registered by
    // serializeEqualityFilter() for this uniqueID.
    if (isEquality)
        hdr.flags |= HAS_EQ_FILTER;

    if (fSessionId & 0x80000000)
        hdr.flags |= IS_SYSCAT;


    /* TODO: Need to figure out how to get the full fVerID into this msg.
     * XXXPAT: The way I did it is IMO the least kludgy, while requiring only a couple
     * changes.
     * The old msg was: TokenByScanRequestHeader + fFilterString
     * The new msg is: TokenByScanRequestHeader + fVerId + old message
     * Prepending verid wastes a few bytes that go over the network, but that is better
     * than putting it in the middle or at the end in terms of simplicity & memory usage,
     * given the current code.
     */

    // Wire layout (order matters): header, fVerId, header again, filters.
    primMsg.load((const uint8_t*) &hdr, sizeof(DictTokenByScanRequestHeader));
    primMsg << fVerId;
    primMsg.append((const uint8_t*) &hdr, sizeof(DictTokenByScanRequestHeader));
    primMsg += fFilterString;

    //cout << "Sending rqst LBIDS " << msgLbidStart
    //	<< " hdr.Count " << hdr.Count
    //	<< " filterCount " << fFilterCount << endl;
#ifdef DEBUG2

    if (fOid >= 3000)
        cout << "pDictionaryScan producer st: " << fStepId <<
             ": sending req for lbid start "     << msgLbidStart <<
             "; lbid count " << msgLbidCount     << endl;

#endif

    try
    {
        fDec->write(uniqueID, primMsg);
    }
    catch (...)
    {
        // Shut the step's queue down before reporting the failure.
        abort();
        handleException(std::current_exception(),
                        logging::ERR_DICTIONARY_SCAN,
                        logging::ERR_ALWAYS_CRITICAL,
                        "pDictionaryScan::sendAPrimitiveMessage()");
        sendError(ERR_DICTIONARY_SCAN);
    }

    fMsgsToPm++;
}
561 
receivePrimitiveMessages()562 void pDictionaryScan::receivePrimitiveMessages()
563 {
564     RowGroupDL* rgFifo = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
565     boost::shared_ptr<ByteStream> bs;
566     RGData rgData;
567     Row r;
568 
569     fRidResults = 0;
570 
571     idbassert(fOutputRowGroup.getColumnCount() > 0);
572     fOutputRowGroup.initRow(&r);
573     rgData = RGData(fOutputRowGroup);
574     fOutputRowGroup.setData(&rgData);
575     fOutputRowGroup.resetRowGroup(0);
576 
577     StepTeleStats sts;
578     sts.query_uuid = fQueryUuid;
579     sts.step_uuid = fStepUuid;
580 
581     if (fOid >= 3000)
582     {
583         sts.msg_type = StepTeleStats::ST_START;
584         sts.total_units_of_work = fMsgsExpect;
585         postStepStartTele(sts);
586     }
587 
588     uint16_t error = status();
589 
590     //...Be careful here.  Mutex is locked prior to entering the loop, so
591     //...any continue statement in the loop must be sure the mutex is locked.
592     //error condition will not go through loop
593     if (!error) mutex.lock();
594 
595     try
596     {
597         while (!error)
598         {
599 
600             // sync with the send side
601             while (!finishedSending && msgsSent == msgsRecvd)
602             {
603                 recvWaiting = true;
604                 condvar.wait(mutex);
605                 recvWaiting = false;
606             }
607 
608             if (finishedSending && (msgsSent == msgsRecvd))
609             {
610                 mutex.unlock();
611                 break;
612             }
613 
614             mutex.unlock();
615 
616             fDec->read(uniqueID, bs);
617 
618             if (fOid >= 3000 && traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
619                 dlTimes.setFirstReadTime();
620 
621             if (fOid >= 3000 && traceOn())
622                 dlTimes.setLastReadTime();
623 
624             if (bs->length() == 0)
625             {
626                 mutex.lock();
627                 fStopSending = true;
628                 condvarWakeupProducer.notify_one();
629                 mutex.unlock();
630                 break;
631             }
632 
633             ISMPacketHeader* hdr = (ISMPacketHeader*)(bs->buf());
634             error = hdr->Status;
635 
636             if (! error)
637             {
638                 const ByteStream::byte* bsp = bs->buf();
639 
640                 // get the ResultHeader out of the bytestream
641                 const TokenByScanResultHeader* crh = reinterpret_cast<const TokenByScanResultHeader*>(bsp);
642                 bsp += sizeof(TokenByScanResultHeader);
643 
644                 fCacheIO    += crh->CacheIO;
645                 fPhysicalIO += crh->PhysicalIO;
646 
647                 // From this point on the rest of the bytestream is the data that comes back from the primitive server
648                 // This needs to be fed to a datalist that is retrieved from the outputassociation object.
649 
650                 PrimToken pt;
651                 uint64_t token;
652 #ifdef DEBUG
653                 cout << "dict step " << fStepId << "  NVALS = " << crh->NVALS << endl;
654 #endif
655 
656                 if (fOid >= 3000 && traceOn() && dlTimes.FirstInsertTime().tv_sec == 0)
657                     dlTimes.setFirstInsertTime();
658 
659                 for (int j = 0; j < crh->NVALS && !cancelled(); j++)
660                 {
661                     memcpy(&pt, bsp, sizeof(pt));
662                     bsp += sizeof(pt);
663                     uint64_t rid = fRidResults++;
664                     token = (pt.LBID << 10) | pt.offset;
665 
666                     fOutputRowGroup.getRow(fOutputRowGroup.getRowCount(), &r);
667                     // load r up w/ values
668                     r.setRid(rid);
669                     r.setUintField<8>(token, 0);
670                     fOutputRowGroup.incRowCount();
671 
672                     if (fOutputRowGroup.getRowCount() == 8192)
673                     {
674                         //INSERT_ADAPTER(rgFifo, rgData);
675                         //fOutputRowGroup.convertToInlineDataInPlace();
676                         rgFifo->insert(rgData);
677                         rgData = RGData(fOutputRowGroup);
678                         fOutputRowGroup.setData(&rgData);
679                         fOutputRowGroup.resetRowGroup(0);
680                     }
681                 }
682 
683                 mutex.lock();
684                 msgsRecvd++;
685 
686                 if (fOid >= 3000)
687                 {
688                     uint64_t progress = msgsRecvd * 100 / fMsgsExpect;
689 
690                     if (progress > fProgress)
691                     {
692                         fProgress = progress;
693                         sts.msg_type = StepTeleStats::ST_PROGRESS;
694                         sts.total_units_of_work = fMsgsExpect;
695                         sts.units_of_work_completed = msgsRecvd;
696                         postStepProgressTele(sts);
697                     }
698                 }
699 
700                 //...If producer is waiting, and we have gone below our threshold value,
701                 //...then we signal the producer to request more data from primproc
702                 if ( (sendWaiting) &&
703                         ( (msgsSent - msgsRecvd) < fScanLbidReqThreshold ) )
704                 {
705 #ifdef DEBUG2
706 
707                     if (fOid >= 3000)
708                         cout << "pDictionaryScan consumer signaling producer for "
709                              "more data: "  <<
710                              "st:"          << fStepId   <<
711                              "; sentCount-" << msgsSent  <<
712                              "; recvCount-" << msgsRecvd <<
713                              "; threshold-" << fScanLbidReqThreshold << endl;
714 
715 #endif
716                     condvarWakeupProducer.notify_one();
717                 }
718             }   //if !error
719             else
720             {
721                 mutex.lock();
722                 fStopSending = true;
723                 condvarWakeupProducer.notify_one();
724                 mutex.unlock();
725                 string errMsg;
726 
727                 //bs->advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
728                 //*bs >> errMsg;
729                 if (error < 1000)
730                 {
731                     logging::ErrorCodes errorcodes;
732                     errMsg = errorcodes.errorString(error);
733                 }
734                 else
735                 {
736                     errMsg = IDBErrorInfo::instance()->errorMsg(error);
737                 }
738 
739                 errorMessage(errMsg);
740                 status(error);
741             }
742         } // end of loop to read LBID responses from primproc
743     }
744     catch (const LargeDataListExcept& ex)
745     {
746         catchHandler(ex.what(), ERR_DICTIONARY_SCAN, fErrorInfo, fSessionId);
747         mutex.unlock();
748     }
749     catch (...)
750     {
751         handleException(std::current_exception(),
752                         logging::ERR_DICTIONARY_SCAN,
753                         logging::ERR_ALWAYS_CRITICAL,
754                         "pDictionaryScan::receivePrimitiveMessages()");
755         mutex.unlock();
756     }
757 
758     if (fOutputRowGroup.getRowCount() > 0)
759     {
760         //fOutputRowGroup.convertToInlineDataInPlace();
761         //INSERT_ADAPTER(rgFifo, rgData);
762         rgFifo->insert(rgData);
763         rgData = RGData(fOutputRowGroup);
764         fOutputRowGroup.setData(&rgData);
765         fOutputRowGroup.resetRowGroup(0);
766     }
767 
768     Stats stats = fDec->getNetworkStats(uniqueID);
769     fMsgBytesIn = stats.dataRecvd();
770     fMsgBytesOut = stats.dataSent();
771 
772     //@bug 699: Reset StepMsgQueue
773     fDec->removeQueue(uniqueID);
774 
775     if (fTableOid >= 3000)
776     {
777         //...Construct timestamp using ctime_r() instead of ctime() not
778         //...necessarily due to re-entrancy, but because we want to strip
779         //...the newline ('\n') off the end of the formatted string.
780         time_t t = time(0);
781         char timeString[50];
782         ctime_r(&t, timeString);
783         timeString[strlen(timeString) - 1 ] = '\0';
784 
785         //...Roundoff inbound msg byte count to nearest KB for display;
786         //...no need to do so for outbound, because it should be small.
787         uint64_t msgBytesInKB = fMsgBytesIn >> 10;
788 
789         if (fMsgBytesIn & 512)
790             msgBytesInKB++;
791 
792         if (traceOn())
793         {
794             dlTimes.setEndOfInputTime();
795 
796             //...Print job step completion information
797             ostringstream logStr;
798             logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " <<
799                    timeString << "; PhyI/O-" << fPhysicalIO << "; CacheI/O-" <<
800                    fCacheIO << "; MsgsSent-" << fMsgsToPm << "; MsgsRcvd-" << msgsRecvd <<
801                    "; output size-" << fRidResults << endl <<
802                    "\tMsgBytesIn-"  << msgBytesInKB  << "KB"
803                    "; MsgBytesOut-" << fMsgBytesOut  << "B" << endl    <<
804                    "\t1st read " << dlTimes.FirstReadTimeString() <<
805                    "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" <<
806                    JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << "s" << endl
807                    << "\tJob completion status " << status() << endl;
808 
809             logEnd(logStr.str().c_str());
810 
811             syslogReadBlockCounts(16,    // exemgr subsystem
812                                   fPhysicalIO,             // # blocks read from disk
813                                   fCacheIO,                // # blocks read from cache
814                                   0);                      // # casual partition block hits
815             syslogProcessingTimes(16,    // exemgr subsystem
816                                   dlTimes.FirstReadTime(),   // first datalist read
817                                   dlTimes.LastReadTime(),    // last  datalist read
818                                   dlTimes.FirstInsertTime(), // first datlist write
819                                   dlTimes.EndOfInputTime()); // last (endOfInput) datalist write
820             syslogEndStep(16,            // exemgr subsystem
821                           fMsgBytesIn,             // incoming msg byte count
822                           fMsgBytesOut);           // outgoing msg byte count
823             fExtendedInfo += toString() + logStr.str();
824             formatMiniStats();
825         }
826 
827         sts.msg_type = StepTeleStats::ST_SUMMARY;
828         sts.phy_io = fPhysicalIO;
829         sts.cache_io = fCacheIO;
830         sts.msg_rcv_cnt = sts.total_units_of_work = sts.units_of_work_completed = msgsRecvd;
831         sts.msg_bytes_in = fMsgBytesIn;
832         sts.msg_bytes_out = fMsgBytesOut;
833         sts.rows = fRidResults;
834         postStepSummaryTele(sts);
835     }
836 
837     rgFifo->endOfInput();
838 }
839 
toString() const840 const string pDictionaryScan::toString() const
841 {
842     ostringstream oss;
843     oss << "pDictionaryScan ses:" << fSessionId << " txn:" << fTxnId << " ver:" << fVerId
844         << " st:" << fStepId << " alias: " << (fAlias.length() ? fAlias : "none")
845         << " tb/col:" << fTableOid << "/" << fOid;
846     oss << " " << omitOidInDL
847         << fOutputJobStepAssociation.outAt(0) << showOidInDL;
848     oss << " nf:" << fFilterCount;
849     oss << " in:";
850 
851     for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
852     {
853         oss << fInputJobStepAssociation.outAt(i) << ", ";
854     }
855 
856     return oss.str();
857 }
858 
formatMiniStats()859 void pDictionaryScan::formatMiniStats()
860 {
861     ostringstream oss;
862     oss << "DSS "
863         << "PM "
864         << fAlias << " "
865         << fTableOid << " (" << fName << ") "
866         << fPhysicalIO << " "
867         << fCacheIO << " "
868         << "- "
869         << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
870         << fRidResults << " ";
871     fMiniInfo += oss.str();
872 }
873 
874 
addFilter(const Filter * f)875 void pDictionaryScan::addFilter(const Filter* f)
876 {
877     if (NULL != f)
878         fFilters.push_back(f);
879 }
880 
881 
appendFilter(const std::vector<const execplan::Filter * > & fs)882 void pDictionaryScan::appendFilter(const std::vector<const execplan::Filter*>& fs)
883 {
884     fFilters.insert(fFilters.end(), fs.begin(), fs.end());
885 }
886 
887 
appendFilter(const messageqcpp::ByteStream & filter,unsigned count)888 void pDictionaryScan::appendFilter(const messageqcpp::ByteStream& filter, unsigned count)
889 {
890     fFilterString += filter;
891     fFilterCount += count;
892 }
893 
894 
serializeEqualityFilter()895 void pDictionaryScan::serializeEqualityFilter()
896 {
897     ByteStream msg;
898     ISMPacketHeader ism;
899     uint32_t i;
900     vector<string> empty;
901 
902     void *ismp = static_cast<void*>(&ism);
903     memset(ismp, 0, sizeof(ISMPacketHeader));
904     ism.Command  = DICT_CREATE_EQUALITY_FILTER;
905     msg.load((uint8_t*) &ism, sizeof(ISMPacketHeader));
906     msg << uniqueID;
907     msg << (uint32_t) colType().charsetNumber;
908     msg << (uint32_t) equalityFilter.size();
909 
910     for (i = 0; i < equalityFilter.size(); i++)
911         msg << equalityFilter[i];
912 
913     try
914     {
915         fDec->write(uniqueID, msg);
916     }
917     catch (...)
918     {
919         abort();
920         handleException(std::current_exception(),
921                         logging::ERR_DICTIONARY_SCAN,
922                         logging::ERR_ALWAYS_CRITICAL,
923                         "pDictionaryScan::serializeEqualityFilter()");
924     }
925 
926     empty.swap(equalityFilter);
927 }
928 
destroyEqualityFilter()929 void pDictionaryScan::destroyEqualityFilter()
930 {
931     ByteStream msg;
932     ISMPacketHeader ism;
933 
934     void *ismp = static_cast<void*>(&ism);
935     memset(ismp, 0, sizeof(ISMPacketHeader));
936     ism.Command  = DICT_DESTROY_EQUALITY_FILTER;
937     msg.load((uint8_t*) &ism, sizeof(ISMPacketHeader));
938     msg << uniqueID;
939 
940     try
941     {
942         fDec->write(uniqueID, msg);
943     }
944     catch (...)
945     {
946         abort();
947         handleException(std::current_exception(),
948                         logging::ERR_DICTIONARY_SCAN,
949                         logging::ERR_ALWAYS_CRITICAL,
950                         "pDictionaryScan::destroyEqualityFilter()");
951     }
952 }
953 
abort()954 void pDictionaryScan::abort()
955 {
956     JobStep::abort();
957 
958     if (fDec)
959         fDec->shutdownQueue(uniqueID);
960 }
961 
962 
963 
964 
965 }   //namespace
966 // vim:ts=4 sw=4:
967 
968