1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 /***********************************************************************
20  *   $Id: primitiveserver.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
21  *
22  *
23  ***********************************************************************/
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26 #include <unistd.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <stdexcept>
31 //#define NDEBUG
32 #include <cassert>
33 #include <boost/thread.hpp>
34 #include <boost/thread/condition.hpp>
35 #include <boost/foreach.hpp>
36 #ifdef _MSC_VER
37 #include <unordered_map>
38 typedef int pthread_t;
39 #else
40 #include <tr1/unordered_map>
41 #include <tr1/unordered_set>
42 #include <pthread.h>
43 #endif
44 #include <cerrno>
45 
46 using namespace std;
47 
48 #include <boost/scoped_ptr.hpp>
49 #include <boost/scoped_array.hpp>
50 #include <boost/thread.hpp>
51 using namespace boost;
52 
53 #include "primproc.h"
54 #include "primitiveserver.h"
55 #include "primitivemsg.h"
56 #include "umsocketselector.h"
57 #include "brm.h"
58 using namespace BRM;
59 
60 #include "writeengine.h"
61 
62 #include "messagequeue.h"
63 using namespace messageqcpp;
64 
65 #include "blockrequestprocessor.h"
66 #include "blockcacheclient.h"
67 #include "stats.h"
68 using namespace dbbc;
69 
70 #include "liboamcpp.h"
71 using namespace oam;
72 
73 #include "configcpp.h"
74 using namespace config;
75 
76 #include "bppseeder.h"
77 #include "primitiveprocessor.h"
78 #include "pp_logger.h"
79 using namespace primitives;
80 
81 #include "errorcodes.h"
82 #include "exceptclasses.h"
83 
84 #include "idbcompress.h"
85 using namespace compress;
86 
87 #include "IDBDataFile.h"
88 #include "IDBPolicy.h"
89 using namespace idbdatafile;
90 
91 using namespace threadpool;
92 
93 #include "threadnaming.h"
94 
95 #include "atomicops.h"
96 
97 #ifndef O_BINARY
98 #  define O_BINARY 0
99 #endif
100 #ifndef O_DIRECT
101 #  define O_DIRECT 0
102 #endif
103 #ifndef O_LARGEFILE
104 #  define O_LARGEFILE 0
105 #endif
106 #ifndef O_NOATIME
107 #  define O_NOATIME 0
108 #endif
109 
// Set of object IDs. NOTE(review): not referenced in this part of the file.
typedef tr1::unordered_set<BRM::OID_t> USOID;

// make global for blockcache
//
// Name under which the PM profiling statistics below are registered.
static const char* statsName = {"pm"};
dbbc::Stats* gPMStatsPtr = 0;
bool gPMProfOn = false;
uint32_t gSession = 0;
#ifndef _MSC_VER
// PM profiling statistics; loadBlock()/loadBlocks() record prefetch ('M') events here
// when primitiveprocessor::fPMProfOn is set.
dbbc::Stats pmstats(statsName);
#endif

// Process-wide OAM cache, created during static initialization.
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
123 
//FIXME: there is an anonymous namespace buried later in this file, between two named namespaces...
125 namespace primitiveprocessor
126 {
// NOTE(review): not referenced in this part of the file; "out-of-band" pool is inferred from the name only — confirm.
boost::shared_ptr<threadpool::PriorityThreadPool> OOBPool;

// Block-cache request processors, indexed by cacheNum(lbid) (fCacheCount entries).
BlockRequestProcessor** BRPp;
#ifndef _MSC_VER
// Per-LBID trace statistics; loadBlock()/loadBlocks() call stats.touchedLBID() when LBIDTrace is set.
dbbc::Stats stats;
#endif
extern DebugLevel gDebugLevel;
// BRM handle used here for extent-size, VSS, extent-location and HWM lookups.
BRM::DBRM* brm;
// Number of block caches in BRPp; cacheNum() spreads extents across them.
int fCacheCount;
// When set, loadBlock()/loadBlocks() record prefetch ('M') events in pmstats.
bool fPMProfOn;
bool fLBIDTraceOn;

/* params from the config file */
uint32_t BPPCount;
// Read-ahead size in blocks; prefetchBlocks() loads blocksReadAhead-aligned ranges of this length.
uint32_t blocksReadAhead;
uint32_t defaultBufferSize;
uint32_t connectionsPerUM;
uint32_t highPriorityThreads;
uint32_t medPriorityThreads;
uint32_t lowPriorityThreads;
// Nonzero: loadBlock() opens segment files with IDBDataFile::USE_ODIRECT when it reads past the cache.
int  directIOFlag = O_DIRECT;
// Nonzero: loadBlock() does not bypass the block cache for blocks the current transaction has locked.
int  noVB = 0;

const uint8_t fMaxColWidth(8);
BPPMap bppMap;
boost::mutex bppLock;

#define DJLOCK_READ 0
#define DJLOCK_WRITE 1
boost::mutex djMutex;   // lock for djLock, lol.
std::map<uint64_t, shared_mutex *> djLock;  // djLock synchronizes destroy and joiner msgs, see bug 2619

// Count of asynchronous block loads in flight; each AsynchLoader decrements it with
// atomicops::atomicDec when it finishes (presumably incremented when a load is queued).
volatile int32_t asyncCounter;
const int asyncMax = 20;	// presumably the limit on concurrent asynchronous loads (vs. asyncCounter) — confirm at use site
161 
162 struct preFetchCond
163 {
164     //uint64_t lbid;
165     boost::condition cond;
166     unsigned waiters;
167 
preFetchCondprimitiveprocessor::preFetchCond168     preFetchCond(const uint64_t l)
169     {
170         waiters = 0;
171     }
172 
~preFetchCondprimitiveprocessor::preFetchCond173     ~preFetchCond() { }
174 };
175 
// One in-flight prefetch, shared by the loading thread and any threads waiting on it.
typedef preFetchCond preFetchBlock_t;
typedef std::tr1::unordered_map<uint64_t, preFetchBlock_t*> pfBlockMap_t;
typedef std::tr1::unordered_map<uint64_t, preFetchBlock_t*>::iterator pfBlockMapIter_t;

// blocksReadAhead-aligned start LBID -> in-flight prefetch entry (see prefetchBlocks()).
// Entries are heap-allocated by the loading thread and deleted when it removes them.
pfBlockMap_t pfBlockMap;
// Guards pfBlockMap and each entry's waiters count.
boost::mutex pfbMutex; // = PTHREAD_MUTEX_INITIALIZER;

// NOTE(review): pfExtentMap and pfMutex are not referenced in this part of the file.
pfBlockMap_t pfExtentMap;
boost::mutex pfMutex; // = PTHREAD_MUTEX_INITIALIZER;

// NOTE(review): not referenced in this part of the file; presumably keyed by a filter id — confirm.
map<uint32_t, boost::shared_ptr<DictEqualityFilter> > dictEqualityFilters;
// Presumably guards dictEqualityFilters — confirm.
boost::mutex eqFilterMutex;
188 
cacheNum(uint64_t lbid)189 uint32_t cacheNum(uint64_t lbid)
190 {
191     return ( lbid / brm->getExtentSize() ) % fCacheCount;
192 }
193 
buildOidFileName(const BRM::OID_t oid,const uint16_t dbRoot,const uint16_t partNum,const uint32_t segNum,char * file_name)194 void buildOidFileName(const BRM::OID_t oid,
195                       const uint16_t dbRoot,
196                       const uint16_t partNum,
197                       const uint32_t segNum,
198                       char* file_name)
199 {
200     WriteEngine::FileOp fileOp(false);
201 
202     if (fileOp.getFileName(oid, file_name, dbRoot, partNum, segNum) != WriteEngine::NO_ERROR)
203     {
204         file_name[0] = 0;
205         throw std::runtime_error("fileOp.getFileName failed");
206     }
207 
208     //cout << "Oid2Filename o: " << oid << " n: " << file_name << endl;
209 }
210 
211 
/* Sleep for count * 0.5 seconds.  This is the back-off loadBlock() uses between retries
 * of a compressed-chunk read. */
void waitForRetry(long count)
{
    timespec delay;
    delay.tv_sec = count / 2;                   // whole seconds
    delay.tv_nsec = (count % 2) * 500000000L;   // plus half a second for odd counts
#ifdef _MSC_VER
    Sleep(delay.tv_sec * 1000 + delay.tv_nsec / 1000 / 1000);
#else
    nanosleep(&delay, 0);
#endif
}
223 
224 
prefetchBlocks(const uint64_t lbid,const int compType,uint32_t * rCount)225 void prefetchBlocks(const uint64_t lbid,
226                     const int compType,
227                     uint32_t* rCount)
228 {
229     uint16_t dbRoot;
230     uint32_t partNum;
231     uint16_t segNum;
232     uint32_t hwm;
233     uint32_t fbo;
234     uint32_t lowfbo;
235     uint32_t highfbo;
236     BRM::OID_t oid;
237     pfBlockMap_t::const_iterator iter;
238     uint64_t lowlbid = (lbid / blocksReadAhead) * blocksReadAhead;
239     blockCacheClient bc(*BRPp[cacheNum(lbid)]);
240     BRM::InlineLBIDRange range;
241     int err;
242 
243     pfbMutex.lock();
244 
245     iter = pfBlockMap.find(lowlbid);
246 
247     if (iter != pfBlockMap.end())
248     {
249         iter->second->waiters++;
250         iter->second->cond.wait(pfbMutex);
251         iter->second->waiters--;
252         pfbMutex.unlock();
253         return;
254     }
255 
256     preFetchBlock_t* pfb = 0;
257     pfb = new preFetchBlock_t(lowlbid);
258 
259     pfBlockMap[lowlbid] = pfb;
260     pfbMutex.unlock();
261 
262     // loadBlock will catch a versioned block so vbflag can be set to false here
263     err = brm->lookupLocal(lbid, 0, false, oid, dbRoot, partNum, segNum, fbo); // need the oid
264 
265     if (err < 0)
266     {
267         cerr << "prefetchBlocks(): BRM lookupLocal failed! Expect more errors.\n";
268         goto cleanup;
269     }
270 
271     // We ignore extState that tells us whether the extent is
272     // an outOfService extent to be ignored.  The filtering for
273     // outOfService extents is done elsewhere.
274     int extState;
275     err = brm->getLocalHWM(oid, partNum, segNum, hwm, extState);
276 
277     if (err < 0)
278     {
279         cerr << "prefetchBlock(): BRM getLocalHWM failed! Expect more errors.\n";
280         goto cleanup;
281     }
282 
283     lowfbo = fbo - (lbid - lowlbid);
284     highfbo = lowfbo + blocksReadAhead - 1;
285     range.start = lowlbid;
286 
287     if (hwm < highfbo)
288         range.size = hwm - lowfbo + 1;
289     else
290         range.size = blocksReadAhead;
291 
292     try
293     {
294         if (range.size > blocksReadAhead)
295         {
296             ostringstream os;
297             os << "Invalid Range from HWM for lbid " << lbid
298                << ", range size should be <= blocksReadAhead: HWM " << hwm
299                << ", dbroot " << dbRoot
300                << ", highfbo " << highfbo << ", lowfbo " << lowfbo
301                << ", blocksReadAhead " << blocksReadAhead
302                << ", range size " << (int) range.size << endl;
303             throw logging::InvalidRangeHWMExcept(os.str());
304         }
305 
306         idbassert(range.size <= blocksReadAhead);
307 
308         bc.check(range, QueryContext(numeric_limits<VER_t>::max()), 0, compType, *rCount);
309     }
310     catch (...)
311     {
312         // Perform necessary cleanup before rethrowing the exception
313         pfb->cond.notify_all();
314 
315         pfbMutex.lock();
316 
317         while (pfb->waiters > 0)
318         {
319             pfbMutex.unlock();
320             //handle race condition with other threads going into wait before the broadcast above
321             pfb->cond.notify_one();
322             usleep(1);
323             pfbMutex.lock();
324         }
325 
326         if (pfBlockMap.erase(lowlbid) > 0)
327             delete pfb;
328 
329         pfb = 0;
330         pfbMutex.unlock();
331         throw;
332     }
333 
334 cleanup:
335     pfb->cond.notify_all();
336 
337     pfbMutex.lock();
338 
339     while (pfb->waiters > 0)
340     {
341         pfbMutex.unlock();
342         //handle race condition with other threads going into wait before the broadcast above
343         pfb->cond.notify_one();
344         usleep(1);
345         pfbMutex.lock();
346     }
347 
348     if (pfBlockMap.erase(lowlbid) > 0)
349         delete pfb;
350 
351     pfb = 0;
352     pfbMutex.unlock();
353 
354 } // prefetchBlocks()
355 
356 // returns the # that were cached.
loadBlocks(LBID_t * lbids,QueryContext qc,VER_t txn,int compType,uint8_t ** bufferPtrs,uint32_t * rCount,bool LBIDTrace,uint32_t sessionID,uint32_t blockCount,bool * blocksWereVersioned,bool doPrefetch,VSSCache * vssCache)357 uint32_t loadBlocks (
358     LBID_t* lbids,
359     QueryContext qc,
360     VER_t txn,
361     int compType,
362     uint8_t** bufferPtrs,
363     uint32_t* rCount,
364     bool LBIDTrace,
365     uint32_t sessionID,
366     uint32_t blockCount,
367     bool* blocksWereVersioned,
368     bool doPrefetch,
369     VSSCache* vssCache)
370 {
371     blockCacheClient bc(*BRPp[cacheNum(lbids[0])]);
372     uint32_t blksRead = 0;
373     VSSCache::iterator it;
374     uint32_t i, ret;
375     bool* vbFlags;
376     int* vssRCs;
377     bool* cacheThisBlock;
378     bool* wasCached;
379 
380     *blocksWereVersioned = false;
381 
382 #ifndef _MSC_VER
383 
384     if (LBIDTrace)
385     {
386         for (i = 0; i < blockCount; i++)
387         {
388 
389             stats.touchedLBID(lbids[i], pthread_self(), sessionID);
390         }
391     }
392 
393 #endif
394     VER_t* vers = (VER_t*) alloca(blockCount * sizeof(VER_t));
395     vbFlags = (bool*) alloca(blockCount);
396     vssRCs = (int*) alloca(blockCount * sizeof(int));
397     cacheThisBlock = (bool*) alloca(blockCount);
398     wasCached = (bool*) alloca(blockCount);
399 
400     for (i = 0; i < blockCount; i++)
401     {
402         if (vssCache)
403         {
404             it = vssCache->find(lbids[i]);
405 
406             if (it != vssCache->end())
407             {
408                 VSSData& vd = it->second;
409                 vers[i] = vd.verID;
410                 vbFlags[i] = vd.vbFlag;
411                 vssRCs[i] = vd.returnCode;
412 
413                 if (vssRCs[i] == ERR_SNAPSHOT_TOO_OLD)
414                     throw runtime_error("Snapshot too old");
415             }
416         }
417 
418         if (!vssCache || it == vssCache->end())
419             vssRCs[i] = brm->vssLookup(lbids[i], qc, txn, &vers[i], &vbFlags[i]);
420 
421         *blocksWereVersioned |= vbFlags[i];
422 
423         // If the block is being modified by this txn, set the useCache flag to false
424         if (txn > 0 && vers[i] == txn && !vbFlags[i])
425             cacheThisBlock[i] = false;
426         else
427             cacheThisBlock[i] = true;
428     }
429 
430     /*
431     cout << "  resolved ver #s: ";
432     for (uint32_t i = 0; i < blockCount; i++)
433     	cout << " <" << vers[i] << ", " << (int) vbFlags[i] << ", " << (int)
434     		cacheThisBlock[i] << ">";
435     cout << endl;
436     */
437 
438     ret = bc.getCachedBlocks(lbids, vers, bufferPtrs, wasCached, blockCount);
439 
440     // Do we want to check any VB flags here?  Initial thought: no, because we have
441     // no idea whether any other blocks in the prefetch range are versioned,
442     // what's the difference if one in the visible range is?
443     if (ret != blockCount && doPrefetch)
444     {
445         prefetchBlocks(lbids[0], compType, &blksRead);
446 
447 #ifndef _MSC_VER
448 
449         if (fPMProfOn)
450             pmstats.markEvent(lbids[0], (pthread_t) - 1, sessionID, 'M');
451 
452 #endif
453         /* After the prefetch they're all cached if they are in the same range, so
454          * prune the block list and try getCachedBlocks again first, then fall back
455          * to single-block IO requests if for some reason they aren't. */
456         uint32_t l_blockCount = 0;
457 
458         for (i = 0; i < blockCount; i++)
459         {
460             if (!wasCached[i])
461             {
462                 lbids[l_blockCount] = lbids[i];
463                 vers[l_blockCount] = vers[i];
464                 bufferPtrs[l_blockCount] = bufferPtrs[i];
465                 vbFlags[l_blockCount] = vbFlags[i];
466                 cacheThisBlock[l_blockCount] = cacheThisBlock[i];
467                 ++l_blockCount;
468             }
469         }
470 
471         ret += bc.getCachedBlocks(lbids, vers, bufferPtrs, wasCached, l_blockCount);
472 
473         if (ret != blockCount)
474         {
475             for (i = 0; i < l_blockCount; i++)
476                 if (!wasCached[i])
477                 {
478                     bool ver;
479 
480                     qc.currentScn = vers[i];
481                     bc.getBlock(lbids[i], qc, txn, compType, (void*) bufferPtrs[i],
482                                 vbFlags[i], wasCached[i], &ver, cacheThisBlock[i], false);
483                     *blocksWereVersioned |= ver;
484                     blksRead++;
485                 }
486         }
487     }
488     /* Some blocks weren't cached, prefetch is disabled -> issue single-block IO requests,
489      * skip checking the cache again. */
490     else if (ret != blockCount)
491     {
492         for (i = 0; i < blockCount; i++)
493         {
494             if (!wasCached[i])
495             {
496                 bool ver;
497 
498                 qc.currentScn = vers[i];
499                 bc.getBlock(lbids[i], qc, txn, compType, (void*) bufferPtrs[i], vbFlags[i],
500                             wasCached[i], &ver, cacheThisBlock[i], false);
501                 *blocksWereVersioned |= ver;
502                 blksRead++;
503             }
504         }
505     }
506 
507     if (rCount)
508         *rCount = blksRead;
509 
510     //if (*verBlocks)
511     //	cout << "loadBlock says there were versioned blocks\n";
512     return ret;
513 }
514 
loadBlock(uint64_t lbid,QueryContext v,uint32_t t,int compType,void * bufferPtr,bool * pWasBlockInCache,uint32_t * rCount,bool LBIDTrace,uint32_t sessionID,bool doPrefetch,VSSCache * vssCache)515 void loadBlock (
516     uint64_t lbid,
517     QueryContext v,
518     uint32_t t,
519     int compType,
520     void* bufferPtr,
521     bool* pWasBlockInCache,
522     uint32_t* rCount,
523     bool LBIDTrace,
524     uint32_t sessionID,
525     bool doPrefetch,
526     VSSCache* vssCache)
527 {
528     bool flg = false;
529     BRM::OID_t oid;
530     BRM::VER_t txn = (BRM::VER_t)t;
531     uint16_t dbRoot = 0;
532     uint32_t partitionNum = 0;
533     uint16_t segmentNum = 0;
534     int rc;
535     BRM::VER_t ver;
536     blockCacheClient bc(*BRPp[cacheNum(lbid)]);
537     char file_name[WriteEngine::FILE_NAME_SIZE] = {0};
538     char* fileNamePtr = file_name;
539     uint32_t blksRead = 0;
540     VSSCache::iterator it;
541 
542 #ifndef _MSC_VER
543 
544     if (LBIDTrace)
545         stats.touchedLBID(lbid, pthread_self(), sessionID);
546 
547 #endif
548 
549     if (vssCache)
550     {
551         it = vssCache->find(lbid);
552 
553         if (it != vssCache->end())
554         {
555             VSSData& vd = it->second;
556             ver = vd.verID;
557             flg	= vd.vbFlag;
558             rc = vd.returnCode;
559         }
560     }
561 
562     if (!vssCache || it == vssCache->end())
563         rc = brm->vssLookup((BRM::LBID_t)lbid, v, txn, &ver, &flg);
564 
565     v.currentScn = ver;
566     //cout << "VSS l/u: l=" << lbid << " v=" << ver << " t=" << txn << " flg=" << flg << " rc: " << rc << endl;
567 
568     // if this block is locked by this session, don't cache it, just read it directly from disk
569     if (txn > 0 && ver == txn && !flg && !noVB)
570     {
571         uint64_t offset;
572         uint32_t fbo;
573         boost::scoped_array<uint8_t> newBufferSa;
574         boost::scoped_array<char> cmpHdrBufSa;
575         boost::scoped_array<char> cmpBufSa;
576         boost::scoped_array<unsigned char> uCmpBufSa;
577 
578 
579 
580         ptrdiff_t alignedBuffer = 0;
581         void* readBufferPtr = NULL;
582         char* cmpHdrBuf = NULL;
583         char* cmpBuf = NULL;
584         unsigned char* uCmpBuf = NULL;
585         uint64_t cmpBufLen = 0;
586         int blockReadRetryCount = 0;
587         unsigned idx = 0;
588         int pageSize = getpagesize();
589         IDBDataFile* fp = 0;
590 
591         try
592         {
593             rc = brm->lookupLocal((BRM::LBID_t)lbid,
594                                   ver,
595                                   flg,
596                                   oid,
597                                   dbRoot,
598                                   partitionNum,
599                                   segmentNum,
600                                   fbo);
601 
602 
603             // load the block
604             buildOidFileName(oid, dbRoot, partitionNum, segmentNum, fileNamePtr);
605             int opts = directIOFlag ? IDBDataFile::USE_ODIRECT : 0;
606             fp = IDBDataFile::open(
607                      IDBPolicy::getType( fileNamePtr, IDBPolicy::PRIMPROC ),
608                      fileNamePtr,
609                      "r",
610                      opts);
611 
612             if ( fp == NULL )
613             {
614                 int errCode = errno;
615                 SUMMARY_INFO2("open failed: ", fileNamePtr);
616                 char errbuf[80];
617                 string errMsg;
618 #ifdef __linux__
619 //#if STRERROR_R_CHAR_P
620                 const char* p;
621 
622                 if ((p = strerror_r(errCode, errbuf, 80)) != 0)
623                     errMsg = p;
624 
625 #else
626                 int p;
627 
628                 if ((p = strerror_r(errCode, errbuf, 80)) == 0)
629                     errMsg = errbuf;
630 
631 #endif
632 
633 #ifndef _MSC_VER
634 
635                 if (errCode == EINVAL)
636                 {
637                     throw logging::IDBExcept(logging::IDBErrorInfo::instance()->
638                                              errorMsg(logging::ERR_O_DIRECT), logging::ERR_O_DIRECT);
639                 }
640 
641 #endif
642                 string errStr(fileNamePtr);
643                 errStr += ": open: ";
644                 errStr += errMsg;
645                 throw std::runtime_error(errStr);
646             }
647 
648             //  fd >= 0 must be true, otherwise above exception thrown.
649             offset = (uint64_t)fbo * (uint64_t)DATA_BLOCK_SIZE;
650             idx = offset / (4 * 1024 * 1024);
651 
652             errno = 0;
653             rc = 0;
654             int i = -1;
655 
656 
657             if (compType == 0)
658             {
659                 newBufferSa.reset(new uint8_t[DATA_BLOCK_SIZE + pageSize]);
660                 alignedBuffer = (ptrdiff_t) newBufferSa.get();
661 
662                 if ((alignedBuffer % pageSize) != 0)
663                 {
664                     alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
665                     alignedBuffer += pageSize;
666                 }
667 
668                 readBufferPtr = (void*) alignedBuffer;
669                 i = fp->pread( readBufferPtr, offset, DATA_BLOCK_SIZE );
670                 memcpy(bufferPtr, readBufferPtr, i);
671 #ifdef IDB_COMP_POC_DEBUG
672                 {
673                     boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
674                     cout << "pread2(" << fd << ", 0x" << hex << (ptrdiff_t)readBufferPtr << dec << ", " << DATA_BLOCK_SIZE << ", " << offset << ") = " << i << endl;
675                 }
676 #endif // IDB_COMP_POC_DEBUG
677             }  // if (compType == 0)
678             else    // if (compType != 0)
679             {
680 // retry if file is out of sync -- compressed column file only.
681 blockReadRetry:
682 
683                 uCmpBufSa.reset(new unsigned char[4 * 1024 * 1024 + 4]);
684                 uCmpBuf = uCmpBufSa.get();
685                 cmpHdrBufSa.reset(new char[4096 * 3 + pageSize]);
686                 alignedBuffer = (ptrdiff_t) cmpHdrBufSa.get();
687 
688                 if ((alignedBuffer % pageSize) != 0)
689                 {
690                     alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
691                     alignedBuffer += pageSize;
692                 }
693 
694                 cmpHdrBuf = (char*) alignedBuffer;
695 
696                 i = fp->pread( &cmpHdrBuf[0], 0, 4096 * 3);
697 
698                 CompChunkPtrList ptrList;
699                 IDBCompressInterface decompressor;
700                 int dcrc = 0;
701 
702                 if (i == 4096 * 3)
703                 {
704                     uint64_t numHdrs = 0; // extra headers
705                     dcrc = decompressor.getPtrList(&cmpHdrBuf[4096], 4096, ptrList);
706 
707                     if (dcrc == 0 && ptrList.size() > 0)
708                         numHdrs = ptrList[0].first / 4096ULL - 2ULL;
709 
710                     if (numHdrs > 0)
711                     {
712                         boost::scoped_array<char> nextHdrBufsa(new char[numHdrs * 4096 + pageSize]);
713                         alignedBuffer = (ptrdiff_t) nextHdrBufsa.get();
714 
715                         if ((alignedBuffer % pageSize) != 0)
716                         {
717                             alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
718                             alignedBuffer += pageSize;
719                         }
720 
721                         char* nextHdrBufPtr = (char*) alignedBuffer;
722 
723                         i = fp->pread( &nextHdrBufPtr[0], 4096 * 2, numHdrs * 4096 );
724 
725                         CompChunkPtrList nextPtrList;
726                         dcrc = decompressor.getPtrList(&nextHdrBufPtr[0], numHdrs * 4096, nextPtrList);
727 
728                         if (dcrc == 0)
729                             ptrList.insert(ptrList.end(), nextPtrList.begin(), nextPtrList.end());
730                     }
731                 }
732 
733                 if (dcrc != 0 || idx >= ptrList.size())
734                 {
735                     // Due to race condition, the header on disk may not upated yet.
736                     // Log an info message and retry.
737                     if (blockReadRetryCount == 0)
738                     {
739                         logging::Message::Args args;
740                         args.add(oid);
741                         ostringstream infoMsg;
742                         infoMsg << "retry read from " << fileNamePtr << ". dcrc=" << dcrc
743                                 << ", idx=" << idx << ", ptr.size=" << ptrList.size();
744                         args.add(infoMsg.str());
745                         mlp->logInfoMessage(logging::M0061, args);
746                     }
747 
748                     if (++blockReadRetryCount < 30)
749                     {
750                         waitForRetry(blockReadRetryCount);
751                         goto blockReadRetry;
752                     }
753                     else
754                     {
755                         rc = -1004;
756                     }
757                 }
758 
759                 if (rc == 0)
760                 {
761                     unsigned cmpBlkOff = offset % (4 * 1024 * 1024);
762                     uint64_t cmpBufOff = ptrList[idx].first;
763                     uint64_t cmpBufSz = ptrList[idx].second;
764 
765                     if (cmpBufSa.get() == NULL || cmpBufLen < cmpBufSz)
766                     {
767                         cmpBufSa.reset(new char[cmpBufSz + pageSize]);
768                         cmpBufLen = cmpBufSz;
769                         alignedBuffer = (ptrdiff_t) cmpBufSa.get();
770 
771                         if ((alignedBuffer % pageSize) != 0)
772                         {
773                             alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
774                             alignedBuffer += pageSize;
775                         }
776 
777                         cmpBuf = (char*) alignedBuffer;
778                     }
779 
780                     unsigned blen = 4 * 1024 * 1024;
781 
782                     i = fp->pread( cmpBuf, cmpBufOff, cmpBufSz );
783 
784                     dcrc = decompressor.uncompressBlock(cmpBuf, cmpBufSz, uCmpBuf, blen);
785 
786                     if (dcrc == 0)
787                     {
788                         memcpy(bufferPtr, &uCmpBuf[cmpBlkOff], DATA_BLOCK_SIZE);
789                     }
790                     else
791                     {
792                         // Due to race condition, the header on disk may not upated yet.
793                         // Log an info message and retry.
794                         if (blockReadRetryCount == 0)
795                         {
796                             logging::Message::Args args;
797                             args.add(oid);
798                             ostringstream infoMsg;
799                             infoMsg << "retry read from " << fileNamePtr << ". dcrc=" << dcrc
800                                     << ", idx=" << idx << ", ptr.size=" << ptrList.size();
801                             args.add(infoMsg.str());
802                             mlp->logInfoMessage(logging::M0061, args);
803                         }
804 
805                         if (++blockReadRetryCount < 30)
806                         {
807                             waitForRetry(blockReadRetryCount);
808                             goto blockReadRetry;
809                         }
810                         else
811                         {
812                             rc = -1006;
813                         }
814                     }
815                 }
816             }
817 
818             if ( rc < 0 )
819             {
820                 string msg("pread failed");
821                 ostringstream infoMsg;
822                 infoMsg << " rc:" << rc << ")";
823                 msg = msg + ", error:" + strerror(errno) + infoMsg.str();
824                 SUMMARY_INFO(msg);
825                 //FIXME: free-up allocated memory!
826                 throw std::runtime_error(msg);
827             }
828         }
829         catch (...)
830         {
831             delete fp;
832             fp = 0;
833             throw;
834         }
835 
836         delete fp;
837         fp = 0;
838 
839         // log the retries
840         if (blockReadRetryCount > 0)
841         {
842             logging::Message::Args args;
843             args.add(oid);
844             ostringstream infoMsg;
845             infoMsg << "Successfully uncompress " << fileNamePtr << " chunk "
846                     << idx << " @" << " blockReadRetry:" << blockReadRetryCount;
847             args.add(infoMsg.str());
848             mlp->logInfoMessage(logging::M0006, args);
849         }
850 
851         if (pWasBlockInCache)
852             *pWasBlockInCache = false;
853 
854         if (rCount)
855             *rCount = 1;
856 
857         return;
858     }
859 
860     FileBuffer* fbPtr = 0;
861     bool wasBlockInCache = false;
862 
863     fbPtr = bc.getBlockPtr(lbid, ver, flg);
864 
865     if (fbPtr)
866     {
867         memcpy(bufferPtr, fbPtr->getData(), BLOCK_SIZE);
868         wasBlockInCache = true;
869     }
870 
871     if (doPrefetch && !wasBlockInCache && !flg)
872     {
873         prefetchBlocks(lbid, compType, &blksRead);
874 
875 #ifndef _MSC_VER
876 
877         if (fPMProfOn)
878             pmstats.markEvent(lbid, (pthread_t) - 1, sessionID, 'M');
879 
880 #endif
881         bc.getBlock(lbid, v, txn, compType, (uint8_t*) bufferPtr, flg, wasBlockInCache);
882 
883         if (!wasBlockInCache)
884             blksRead++;
885     }
886     else if (!wasBlockInCache)
887     {
888         bc.getBlock(lbid, v, txn, compType, (uint8_t*) bufferPtr, flg, wasBlockInCache);
889 
890         if (!wasBlockInCache)
891             blksRead++;
892     }
893 
894     if (pWasBlockInCache)
895         *pWasBlockInCache = wasBlockInCache;
896 
897     if (rCount)
898         *rCount = blksRead;
899 
900 }
901 
902 struct AsynchLoader
903 {
AsynchLoaderprimitiveprocessor::AsynchLoader904     AsynchLoader(uint64_t l,
905                  const QueryContext& v,
906                  uint32_t t,
907                  int ct,
908                  uint32_t* cCount,
909                  uint32_t* rCount,
910                  bool trace,
911                  uint32_t sesID,
912                  boost::mutex* m,
913                  uint32_t* loaderCount,
914                  boost::shared_ptr<BPPSendThread> st,	// sendThread for abort upon exception.
915                  VSSCache* vCache) :
916         lbid(l),
917         ver(v),
918         txn(t),
919         compType(ct),
920         LBIDTrace(trace),
921         sessionID(sesID),
922         cacheCount(cCount),
923         readCount(rCount),
924         busyLoaders(loaderCount),
925         mutex(m),
926         sendThread(st),
927         vssCache(vCache)
928     { }
929 
operator ()primitiveprocessor::AsynchLoader930     void operator()()
931     {
932         utils::setThreadName("PPAsyncLoader");
933         bool cached = false;
934         uint32_t rCount = 0;
935         char buf[BLOCK_SIZE];
936 
937         //cout << "asynch started " << pthread_self() << " l: " << lbid << endl;
938         try
939         {
940             loadBlock(lbid, ver, txn, compType, buf, &cached, &rCount, LBIDTrace, true, vssCache);
941         }
942         catch (std::exception& ex)
943         {
944             sendThread->abort();
945             cerr << "AsynchLoader caught loadBlock exception: " << ex.what() << endl;
946             idbassert(asyncCounter > 0);
947             (void)atomicops::atomicDec(&asyncCounter);
948             mutex->lock();
949             --(*busyLoaders);
950             mutex->unlock();
951             logging::Message::Args args;
952             args.add(string("PrimProc AsyncLoader caught error: "));
953             args.add(ex.what());
954             primitiveprocessor::mlp->logMessage(logging::M0000, args, false);
955             return;
956         }
957         catch (...)
958         {
959             sendThread->abort();
960             cerr << "AsynchLoader caught unknown exception: " << endl;
961             //FIXME Use a locked processor primitive?
962             idbassert(asyncCounter > 0);
963             (void)atomicops::atomicDec(&asyncCounter);
964             mutex->lock();
965             --(*busyLoaders);
966             mutex->unlock();
967             logging::Message::Args args;
968             args.add(string("PrimProc AsyncLoader caught unknown error"));
969             primitiveprocessor::mlp->logMessage(logging::M0000, args, false);
970             return;
971         }
972 
973         idbassert(asyncCounter > 0);
974         (void)atomicops::atomicDec(&asyncCounter);
975         mutex->lock();
976 
977         if (cached)
978             (*cacheCount)++;
979 
980         *readCount += rCount;
981         --(*busyLoaders);
982         mutex->unlock();
983 // 			cerr << "done\n";
984     }
985 
986 private:
987     uint64_t lbid;
988     QueryContext ver;
989     uint32_t txn;
990     int compType;
991     uint8_t dataWidth;
992     bool LBIDTrace;
993     uint32_t sessionID;
994     uint32_t* cacheCount;
995     uint32_t* readCount;
996     uint32_t* busyLoaders;
997     boost::mutex* mutex;
998     boost::shared_ptr<BPPSendThread> sendThread;
999     VSSCache* vssCache;
1000 };
1001 
loadBlockAsync(uint64_t lbid,const QueryContext & c,uint32_t txn,int compType,uint32_t * cCount,uint32_t * rCount,bool LBIDTrace,uint32_t sessionID,boost::mutex * m,uint32_t * busyLoaders,boost::shared_ptr<BPPSendThread> sendThread,VSSCache * vssCache)1002 void loadBlockAsync(uint64_t lbid,
1003                     const QueryContext& c,
1004                     uint32_t txn,
1005                     int compType,
1006                     uint32_t* cCount,
1007                     uint32_t* rCount,
1008                     bool LBIDTrace,
1009                     uint32_t sessionID,
1010                     boost::mutex* m,
1011                     uint32_t* busyLoaders,
1012                     boost::shared_ptr<BPPSendThread> sendThread,		// sendThread for abort upon exception.
1013                     VSSCache* vssCache)
1014 {
1015     blockCacheClient bc(*BRPp[cacheNum(lbid)]);
1016     bool vbFlag;
1017     BRM::VER_t ver;
1018     VSSCache::iterator it;
1019 
1020     if (vssCache)
1021     {
1022         it = vssCache->find(lbid);
1023 
1024         if (it != vssCache->end())
1025         {
1026             //cout << "async: vss cache hit on " << lbid << endl;
1027             VSSData& vd = it->second;
1028             ver = vd.verID;
1029             vbFlag = vd.vbFlag;
1030         }
1031     }
1032 
1033     if (!vssCache || it == vssCache->end())
1034         brm->vssLookup((BRM::LBID_t) lbid, c, txn, &ver, &vbFlag);
1035 
1036     if (bc.exists(lbid, ver))
1037         return;
1038 
1039     /* a quick and easy stand-in for a threadpool for loaders */
1040     atomicops::atomicMb();
1041 
1042     if (asyncCounter >= asyncMax)
1043         return;
1044 
1045     (void)atomicops::atomicInc(&asyncCounter);
1046 
1047     boost::mutex::scoped_lock sl(*m);
1048 
1049     try
1050     {
1051         boost::thread thd(AsynchLoader(lbid, c, txn, compType, cCount, rCount,
1052                                        LBIDTrace, sessionID, m, busyLoaders, sendThread, vssCache));
1053         (*busyLoaders)++;
1054     }
1055     catch (boost::thread_resource_error& e)
1056     {
1057         cerr << "AsynchLoader: caught a thread resource error, need to lower asyncMax\n";
1058         idbassert(asyncCounter > 0);
1059         (void)atomicops::atomicDec(&asyncCounter);
1060     }
1061 }
1062 
1063 } //namespace primitiveprocessor
1064 
// Uncomment to force gDebugLevel to VERBOSE for the duration of DictScanJob::operator().
//#define DCT_DEBUG 1

// Output-buffer guard helpers (debug aid, currently DISABLED).
// SETUP_GUARD fills 3 * ouput_buf_size bytes starting at outputp.get() with the 0xa5
// sentinel.  CHECK_GUARD(lbid) verifies that the first and the last ouput_buf_size bytes
// of that region still hold the sentinel (the middle third is skipped), printing
// "Buffer underrun"/"Buffer overrun" and asserting otherwise.
// Each real definition is #undef'd immediately and replaced with an empty macro, so as
// compiled here both expand to nothing.
// NOTE(review): the real bodies reference `outputp` and `ouput_buf_size` (sic); those
// names must exist at the expansion site if the guards are ever re-enabled.
#define SETUP_GUARD \
{ \
	unsigned char* o = outputp.get(); \
	memset(o, 0xa5, ouput_buf_size*3); \
}
#undef SETUP_GUARD
#define SETUP_GUARD
#define CHECK_GUARD(lbid) \
{ \
	unsigned char* o = outputp.get(); \
	for (int i = 0; i < ouput_buf_size; i++) \
	{ \
		if (*o++ != 0xa5) \
		{ \
			cerr << "Buffer underrun on LBID " << lbid << endl; \
			idbassert(0); \
		} \
	} \
	o += ouput_buf_size; \
	for (int i = 0; i < ouput_buf_size; i++) \
	{ \
		if (*o++ != 0xa5) \
		{ \
			cerr << "Buffer overrun on LBID " << lbid << endl; \
			idbassert(0); \
		} \
	} \
}
#undef CHECK_GUARD
#define CHECK_GUARD(x)
1096 
1097 namespace
1098 {
1099 using namespace primitiveprocessor;
1100 
1101 /** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM)
1102  * TODO: Move this & the impl into different files
1103  */
1104 class DictScanJob : public threadpool::PriorityThreadPool::Functor
1105 {
1106 public:
1107     DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
1108     virtual ~DictScanJob();
1109 
1110     void write(const ByteStream&);
1111     int operator()();
1112     void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr);
1113     void sendErrorMsg(uint32_t id, uint16_t code);
1114 
1115 private:
1116     SP_UM_IOSOCK fIos;
1117     SBS fByteStream;
1118     SP_UM_MUTEX fWriteLock;
1119     posix_time::ptime dieTime;
1120 };
1121 
DictScanJob(SP_UM_IOSOCK ios,SBS bs,SP_UM_MUTEX writeLock)1122 DictScanJob::DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock) :
1123     fIos(ios), fByteStream(bs), fWriteLock(writeLock)
1124 {
1125     dieTime = posix_time::second_clock::universal_time() + posix_time::seconds(100);
1126 }
1127 
~DictScanJob()1128 DictScanJob::~DictScanJob()
1129 {
1130 }
1131 
write(const ByteStream & bs)1132 void DictScanJob::write(const ByteStream& bs)
1133 {
1134     boost::mutex::scoped_lock lk(*fWriteLock);
1135     fIos->write(bs);
1136 }
1137 
operator ()()1138 int DictScanJob::operator()()
1139 {
1140     utils::setThreadName("PPDictScanJob");
1141     uint8_t data[DATA_BLOCK_SIZE];
1142     uint32_t output_buf_size = MAX_BUFFER_SIZE;
1143     uint32_t session;
1144     uint32_t uniqueId = 0;
1145     bool wasBlockInCache = false;
1146     uint32_t blocksRead = 0;
1147     uint16_t runCount;
1148 
1149 
1150     boost::shared_ptr<DictEqualityFilter> eqFilter;
1151     ByteStream results(output_buf_size);
1152     TokenByScanRequestHeader* cmd;
1153     PrimitiveProcessor pproc(gDebugLevel);
1154     TokenByScanResultHeader* output;
1155     QueryContext verInfo;
1156 
1157     try
1158     {
1159 #ifdef DCT_DEBUG
1160         DebugLevel oldDebugLevel = gDebugLevel;
1161         gDebugLevel = VERBOSE;
1162 #endif
1163         fByteStream->advance(sizeof(TokenByScanRequestHeader));
1164         *fByteStream >> verInfo;
1165         cmd = (TokenByScanRequestHeader*) fByteStream->buf();
1166 
1167         session = cmd->Hdr.SessionID;
1168         uniqueId = cmd->Hdr.UniqueID;
1169         runCount = cmd->Count;
1170         output = (TokenByScanResultHeader*) results.getInputPtr();
1171 #ifdef VALGRIND
1172         memset(output, 0, sizeof(TokenByScanResultHeader));
1173 #endif
1174 
1175         /* Grab the equality filter if one is specified */
1176         if (cmd->flags & HAS_EQ_FILTER)
1177         {
1178             boost::mutex::scoped_lock sl(eqFilterMutex);
1179             map<uint32_t, boost::shared_ptr<DictEqualityFilter> >::iterator it;
1180             it = dictEqualityFilters.find(uniqueId);
1181 
1182             if (it != dictEqualityFilters.end())
1183                 eqFilter = it->second;
1184 
1185             sl.unlock();
1186 
1187             if (!eqFilter)
1188             {
1189                 if (posix_time::second_clock::universal_time() < dieTime)
1190                 {
1191                     fByteStream->rewind();
1192                     return -1;    // it's still being built, wait for it
1193                 }
1194                 else
1195                     return 0;   // was probably aborted, go away...
1196             }
1197         }
1198 
1199         for (uint16_t i = 0; i < runCount; ++i)
1200         {
1201             loadBlock(cmd->LBID,
1202                       verInfo,
1203                       cmd->Hdr.TransactionID,
1204                       cmd->CompType,
1205                       data,
1206                       &wasBlockInCache,
1207                       &blocksRead,
1208                       fLBIDTraceOn,
1209                       session);
1210             pproc.setBlockPtr((int*) data);
1211             pproc.p_TokenByScan(cmd, output, output_buf_size, eqFilter);
1212 
1213             if (wasBlockInCache)
1214                 output->CacheIO++;
1215             else
1216                 output->PhysicalIO += blocksRead;
1217 
1218             results.advanceInputPtr(output->NBYTES);
1219             write(results);
1220             results.restart();
1221             cmd->LBID++;
1222         }
1223 
1224 #ifdef DCT_DEBUG
1225         gDebugLevel = oldDebugLevel;
1226 #endif
1227     }
1228     catch (logging::IDBExcept& iex)
1229     {
1230         cerr << "DictScanJob caught an IDBException: " << iex.what() << endl;
1231         catchHandler(iex.what(), uniqueId, iex.errorCode());
1232     }
1233     catch (std::exception& re)
1234     {
1235         cerr << "DictScanJob caught an exception: " << re.what() << endl;
1236         catchHandler(re.what(), uniqueId);
1237     }
1238     catch (...)
1239     {
1240         string msg("Unknown exception caught in DictScanJob.");
1241         cerr << msg << endl;
1242         catchHandler(msg, uniqueId);
1243     }
1244 
1245     return 0;
1246 }
1247 
catchHandler(const string & ex,uint32_t id,uint16_t code)1248 void DictScanJob::catchHandler(const string& ex, uint32_t id, uint16_t code)
1249 {
1250     Logger log;
1251     log.logMessage(ex);
1252     sendErrorMsg(id, code);
1253 }
1254 
sendErrorMsg(uint32_t id,uint16_t code)1255 void DictScanJob::sendErrorMsg(uint32_t id, uint16_t code)
1256 {
1257     ISMPacketHeader ism;
1258     PrimitiveHeader ph;
1259     ism.Status =  code;
1260     ph.UniqueID = id;
1261 
1262     ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
1263     msg.append((uint8_t*) &ism, sizeof(ism));
1264     msg.append((uint8_t*) &ph, sizeof(ph));
1265 
1266     write(msg);
1267 }
1268 
1269 struct BPPHandler
1270 {
BPPHandler__anon59b59a2d0111::BPPHandler1271     BPPHandler(PrimitiveServer* ps) : fPrimitiveServerPtr(ps) { }
1272 
1273     // Keep a list of keys so that if connection fails we don't leave BPP
1274     // threads lying around
1275     std::vector<uint32_t> bppKeys;
1276     std::vector<uint32_t>::iterator bppKeysIt;
1277 
~BPPHandler__anon59b59a2d0111::BPPHandler1278     ~BPPHandler()
1279     {
1280         boost::mutex::scoped_lock scoped(bppLock);
1281 
1282         for (bppKeysIt = bppKeys.begin() ; bppKeysIt != bppKeys.end(); ++bppKeysIt)
1283         {
1284             uint32_t key = *bppKeysIt;
1285             BPPMap::iterator it;
1286 
1287             it = bppMap.find(key);
1288 
1289             if (it != bppMap.end())
1290             {
1291                 it->second->abort();
1292                 bppMap.erase(it);
1293             }
1294 
1295             fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
1296             OOBPool->removeJobs(key);
1297         }
1298 
1299         scoped.unlock();
1300     }
1301 
1302     struct BPPHandlerFunctor : public PriorityThreadPool::Functor
1303     {
BPPHandlerFunctor__anon59b59a2d0111::BPPHandler::BPPHandlerFunctor1304         BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : bs(b)
1305         {
1306             rt = r;
1307             dieTime = posix_time::second_clock::universal_time() + posix_time::seconds(100);
1308         }
1309 
1310         boost::shared_ptr<BPPHandler> rt;
1311         SBS bs;
1312         posix_time::ptime dieTime;
1313     };
1314 
1315     struct LastJoiner : public BPPHandlerFunctor
1316     {
LastJoiner__anon59b59a2d0111::BPPHandler::LastJoiner1317         LastJoiner(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::LastJoiner1318         int operator()()
1319         {
1320             utils::setThreadName("PPHandLastJoiner");
1321             return rt->lastJoinerMsg(*bs, dieTime);
1322         }
1323     };
1324 
1325     struct Create : public BPPHandlerFunctor
1326     {
Create__anon59b59a2d0111::BPPHandler::Create1327         Create(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::Create1328         int operator()()
1329         {
1330             utils::setThreadName("PPHandCreate");
1331             rt->createBPP(*bs);
1332             return 0;
1333         }
1334     };
1335 
1336     struct Destroy : public BPPHandlerFunctor
1337     {
Destroy__anon59b59a2d0111::BPPHandler::Destroy1338         Destroy(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::Destroy1339         int operator()()
1340         {
1341             utils::setThreadName("PPHandDestroy");
1342             return rt->destroyBPP(*bs, dieTime);
1343         }
1344     };
1345 
1346     struct AddJoiner : public BPPHandlerFunctor
1347     {
AddJoiner__anon59b59a2d0111::BPPHandler::AddJoiner1348         AddJoiner(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::AddJoiner1349         int operator()()
1350         {
1351             utils::setThreadName("PPHandAddJoiner");
1352             return rt->addJoinerToBPP(*bs, dieTime);
1353         }
1354     };
1355 
1356     struct Abort : public BPPHandlerFunctor
1357     {
Abort__anon59b59a2d0111::BPPHandler::Abort1358         Abort(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::Abort1359         int operator()()
1360         {
1361             utils::setThreadName("PPHandAbort");
1362             return rt->doAbort(*bs, dieTime);
1363         }
1364     };
1365 
doAbort__anon59b59a2d0111::BPPHandler1366     int doAbort(ByteStream& bs, const posix_time::ptime& dieTime)
1367     {
1368         uint32_t key;
1369         BPPMap::iterator it;
1370 
1371         try
1372         {
1373             bs.advance(sizeof(ISMPacketHeader));
1374             bs >> key;
1375         }
1376         catch (...)
1377         {
1378             // MCOL-857 We don't have the full packet yet
1379             bs.rewind();
1380             return -1;
1381         }
1382 
1383         boost::mutex::scoped_lock scoped(bppLock);
1384         bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), key);
1385 
1386         if (bppKeysIt != bppKeys.end())
1387         {
1388             bppKeys.erase(bppKeysIt);
1389         }
1390 
1391         it = bppMap.find(key);
1392 
1393         if (it != bppMap.end())
1394         {
1395             it->second->abort();
1396             bppMap.erase(it);
1397         }
1398         else
1399         {
1400             bs.rewind();
1401 
1402             if (posix_time::second_clock::universal_time() > dieTime)
1403                 return 0;
1404             else
1405                 return -1;
1406         }
1407 
1408         scoped.unlock();
1409         fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
1410         OOBPool->removeJobs(key);
1411         return 0;
1412     }
1413 
doAck__anon59b59a2d0111::BPPHandler1414     int doAck(ByteStream& bs)
1415     {
1416         uint32_t key;
1417         int16_t msgCount;
1418         SBPPV bpps;
1419         const ISMPacketHeader* ism = (const ISMPacketHeader*) bs.buf();
1420 
1421         key = ism->Interleave;
1422         msgCount = (int16_t) ism->Size;
1423         bs.advance(sizeof(ISMPacketHeader));
1424 
1425         bpps = grabBPPs(key);
1426 
1427         if (bpps)
1428         {
1429             bpps->getSendThread()->sendMore(msgCount);
1430             return 0;
1431         }
1432         else
1433             return -1;
1434     }
1435 
createBPP__anon59b59a2d0111::BPPHandler1436     void createBPP(ByteStream& bs)
1437     {
1438         uint32_t i;
1439         uint32_t key, initMsgsLeft;
1440         SBPP bpp;
1441         SBPPV bppv;
1442 
1443         // make the new BPP object
1444         bppv.reset(new BPPV());
1445         bpp.reset(new BatchPrimitiveProcessor(bs, fPrimitiveServerPtr->prefetchThreshold(),
1446                                               bppv->getSendThread(), fPrimitiveServerPtr->ProcessorThreads()));
1447 
1448         if (bs.length() > 0)
1449             bs >> initMsgsLeft;
1450         else
1451         {
1452             initMsgsLeft = -1;
1453         }
1454 
1455         idbassert(bs.length() == 0);
1456         bppv->getSendThread()->sendMore(initMsgsLeft);
1457         bppv->add(bpp);
1458 
1459         // this block of code creates some BPP instances up front for user queries,
1460         // seems to get us slightly better query times
1461         if ((bpp->getSessionID() & 0x80000000) == 0)
1462         {
1463             for (i = 1; i < BPPCount / 8; i++)
1464             {
1465                 SBPP dup = bpp->duplicate();
1466 
1467                 /* Uncomment these lines to verify duplicate().  == op might need updating */
1468 //  					if (*bpp != *dup)
1469 // 	 					cerr << "createBPP: duplicate mismatch at index " << i << endl;
1470 //  					idbassert(*bpp == *dup);
1471                 bppv->add(dup);
1472             }
1473         }
1474 
1475         boost::mutex::scoped_lock scoped(bppLock);
1476         key = bpp->getUniqueID();
1477         bppKeys.push_back(key);
1478         bool newInsert;
1479         newInsert = bppMap.insert(pair<uint32_t, SBPPV>(key, bppv)).second;
1480         //cout << "creating BPP # " << key << endl;
1481         scoped.unlock();
1482 
1483         if (!newInsert)
1484         {
1485             if (bpp->getSessionID() & 0x80000000)
1486                 cerr << "warning: createBPP() tried to clobber a BPP with duplicate sessionID & stepID. sessionID=" <<
1487                      (int) (bpp->getSessionID() ^ 0x80000000) << " stepID=" <<
1488                      bpp->getStepID() << " (syscat)" << endl;
1489             else
1490                 cerr << "warning: createBPP() tried to clobber a BPP with duplicate sessionID & stepID.  sessionID=" <<
1491                      bpp->getSessionID() << " stepID=" << bpp->getStepID() << endl;
1492         }
1493     }
1494 
grabBPPs__anon59b59a2d0111::BPPHandler1495     inline SBPPV grabBPPs(uint32_t uniqueID)
1496     {
1497         BPPMap::iterator it;
1498         /*
1499         		uint32_t failCount = 0;
1500         		uint32_t maxFailCount = (fatal ? 500 : 5000);
1501         */
1502         SBPPV ret;
1503 
1504         boost::mutex::scoped_lock scoped(bppLock);
1505         it = bppMap.find(uniqueID);
1506 
1507         if (it != bppMap.end())
1508             return it->second;
1509         else
1510             return SBPPV();
1511 
1512         /*
1513         		do
1514         		{
1515         			if (++failCount == maxFailCount) {
1516         				//cout << "grabBPPs couldn't find the BPPs for " << uniqueID << endl;
1517         				return ret;
1518         				//throw logic_error("grabBPPs couldn't find the unique ID");
1519         			}
1520         			scoped.unlock();
1521         			usleep(5000);
1522         			scoped.lock();
1523         			it = bppMap.find(uniqueID);
1524         		} while (it == bppMap.end());
1525 
1526         		ret = it->second;
1527         		return ret;
1528         */
1529     }
1530 
getDJLock__anon59b59a2d0111::BPPHandler1531     inline shared_mutex & getDJLock(uint32_t uniqueID)
1532     {
1533         boost::mutex::scoped_lock lk(djMutex);
1534         auto it = djLock.find(uniqueID);
1535         if (it != djLock.end())
1536             return *it->second;
1537         else
1538         {
1539             auto ret = djLock.insert(make_pair(uniqueID, new shared_mutex())).first;
1540             return *ret->second;
1541         }
1542     }
1543 
deleteDJLock__anon59b59a2d0111::BPPHandler1544     inline void deleteDJLock(uint32_t uniqueID)
1545     {
1546         boost::mutex::scoped_lock lk(djMutex);
1547         auto it = djLock.find(uniqueID);
1548         if (it != djLock.end())
1549         {
1550             delete it->second;
1551             djLock.erase(it);
1552         }
1553     }
1554 
addJoinerToBPP__anon59b59a2d0111::BPPHandler1555     int addJoinerToBPP(ByteStream& bs, const posix_time::ptime& dieTime)
1556     {
1557         SBPPV bppv;
1558         uint32_t uniqueID;
1559         const uint8_t* buf;
1560 
1561         /* call addToJoiner() on the first BPP */
1562 
1563         buf = bs.buf();
1564         /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
1565         uniqueID = *((const uint32_t*) &buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
1566         bppv = grabBPPs(uniqueID);
1567 
1568         if (bppv)
1569         {
1570             shared_lock<shared_mutex> lk(getDJLock(uniqueID));
1571             bppv->get()[0]->addToJoiner(bs);
1572             return 0;
1573         }
1574         else
1575         {
1576             if (posix_time::second_clock::universal_time() > dieTime)
1577                 return 0;
1578             else
1579                 return -1;
1580         }
1581     }
1582 
lastJoinerMsg__anon59b59a2d0111::BPPHandler1583     int lastJoinerMsg(ByteStream& bs, const posix_time::ptime& dieTime)
1584     {
1585         SBPPV bppv;
1586         uint32_t uniqueID, i;
1587         const uint8_t* buf;
1588         int err;
1589 
1590         /* call endOfJoiner() on the every BPP */
1591 
1592         buf = bs.buf();
1593         /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
1594         uniqueID = *((const uint32_t*) &buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
1595 
1596         bppv = grabBPPs(uniqueID);
1597 
1598         if (!bppv)
1599         {
1600             //cout << "got a lastJoiner msg for an unknown obj " << uniqueID << endl;
1601             if (posix_time::second_clock::universal_time() > dieTime)
1602                 return 0;
1603             else
1604                 return -1;
1605         }
1606 
1607         boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
1608 
1609 
1610         for (i = 0; i < bppv->get().size(); i++)
1611         {
1612             err = bppv->get()[i]->endOfJoiner();
1613 
1614             if (err == -1)
1615             {
1616                 if (posix_time::second_clock::universal_time() > dieTime)
1617                     return 0;
1618                 else
1619                     return -1;
1620             }
1621         }
1622         bppv->get()[0]->doneSendingJoinerData();
1623 
1624         /* Note: some of the duplicate/run/join sync was moved to the BPPV class to do
1625         more intelligent scheduling.  Once the join data is received, BPPV will
1626         start letting jobs run and create more BPP instances on demand. */
1627 
1628         atomicops::atomicMb();  // make sure the joinDataReceived assignment doesn't migrate upward...
1629         bppv->joinDataReceived = true;
1630         return 0;
1631     }
1632 
destroyBPP__anon59b59a2d0111::BPPHandler1633     int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime)
1634     {
1635         uint32_t uniqueID, sessionID, stepID;
1636         BPPMap::iterator it;
1637 
1638         try
1639         {
1640 
1641             bs.advance(sizeof(ISMPacketHeader));
1642             bs >> sessionID;
1643             bs >> stepID;
1644             bs >> uniqueID;
1645         }
1646         catch (...)
1647         {
1648             // MCOL-857 We don't appear to have the full packet yet!
1649             bs.rewind();
1650             return -1;
1651         }
1652 
1653         boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
1654         boost::mutex::scoped_lock scoped(bppLock);
1655 
1656         bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID);
1657 
1658         if (bppKeysIt != bppKeys.end())
1659         {
1660             bppKeys.erase(bppKeysIt);
1661         }
1662 
1663         it = bppMap.find(uniqueID);
1664 
1665         if (it != bppMap.end())
1666         {
1667             boost::shared_ptr<BPPV> bppv = it->second;
1668 
1669             if (bppv->joinDataReceived)
1670             {
1671                 bppv->abort();
1672                 bppMap.erase(it);
1673             }
1674             else
1675             {
1676                 // MCOL-5. On ubuntu, a crash was happening. Checking
1677                 // joinDataReceived here fixes it.
1678                 // We're not ready for a destroy. Reschedule.
1679                 return -1;
1680             }
1681         }
1682         else
1683         {
1684             //cout << "got a destroy for an unknown obj " << uniqueID << endl;
1685             bs.rewind();
1686 
1687             if (posix_time::second_clock::universal_time() > dieTime)
1688             {
1689                 // XXXPAT: going to let this fall through and delete jobs for
1690 				// uniqueID if there are any.  Not clear what the downside is.
1691 				/*
1692                 lk.unlock();
1693                 deleteDJLock(uniqueID);
1694                 return 0;
1695 				*/
1696             }
1697             else
1698                 return -1;
1699         }
1700 
1701 // 			cout << "  destroy: new size is " << bppMap.size() << endl;
1702         /*
1703         		if (sessionID & 0x80000000)
1704         			cerr << "destroyed BPP instances for sessionID " << (int)
1705         			(sessionID ^ 0x80000000) << " stepID "<< stepID << " (syscat)\n";
1706         		else
1707         			cerr << "destroyed BPP instances for sessionID " << sessionID <<
1708         			" stepID "<< stepID << endl;
1709         */
1710         fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID);
1711         OOBPool->removeJobs(uniqueID);
1712         lk.unlock();
1713         deleteDJLock(uniqueID);
1714         return 0;
1715     }
1716 
setBPPToError__anon59b59a2d0111::BPPHandler1717     void setBPPToError(uint32_t uniqueID, const string& error, logging::ErrorCodeValues errorCode)
1718     {
1719         SBPPV bppv;
1720 
1721         bppv = grabBPPs(uniqueID);
1722 
1723         if (!bppv)
1724             return;
1725 
1726         for (uint32_t i = 0; i < bppv->get().size(); i++)
1727             bppv->get()[i]->setError(error, errorCode);
1728 
1729         if (bppv->get().empty() && !bppMap.empty() )
1730             bppMap.begin()->second.get()->get()[0]->setError(error, errorCode);
1731     }
1732 
1733     // Would be good to define the structure of these msgs somewhere...
getUniqueID__anon59b59a2d0111::BPPHandler1734     inline uint32_t getUniqueID(SBS bs, uint8_t command)
1735     {
1736         uint8_t* buf;
1737 
1738         buf = bs->buf();
1739 
1740         switch (command)
1741         {
1742             case BATCH_PRIMITIVE_ABORT:
1743                 return *((uint32_t*) &buf[sizeof(ISMPacketHeader)]);
1744 
1745             case BATCH_PRIMITIVE_ACK:
1746             {
1747                 ISMPacketHeader* ism = (ISMPacketHeader*) buf;
1748                 return ism->Interleave;
1749             }
1750 
1751             case BATCH_PRIMITIVE_ADD_JOINER:
1752             case BATCH_PRIMITIVE_END_JOINER:
1753             case BATCH_PRIMITIVE_DESTROY:
1754                 return *((uint32_t*) &buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
1755 
1756             default:
1757                 return 0;
1758         }
1759     }
1760 
1761     PrimitiveServer* fPrimitiveServerPtr;
1762 };
1763 
1764 class DictionaryOp : public PriorityThreadPool::Functor
1765 {
1766 public:
DictionaryOp(SBS cmd)1767     DictionaryOp(SBS cmd) : bs(cmd)
1768     {
1769         dieTime = posix_time::second_clock::universal_time() + posix_time::seconds(100);
1770     }
1771     virtual int execute() = 0;
operator ()()1772     int operator()()
1773     {
1774         utils::setThreadName("PPDictOp");
1775         int ret;
1776         ret = execute();
1777 
1778         if (ret != 0)
1779         {
1780             bs->rewind();
1781 
1782             if (posix_time::second_clock::universal_time() > dieTime)
1783                 return 0;
1784         }
1785 
1786         return ret;
1787     }
1788 protected:
1789     SBS bs;
1790 private:
1791     posix_time::ptime dieTime;
1792 };
1793 
1794 class CreateEqualityFilter : public DictionaryOp
1795 {
1796 public:
CreateEqualityFilter(SBS cmd)1797     CreateEqualityFilter(SBS cmd) : DictionaryOp(cmd) { }
execute()1798     int execute()
1799     {
1800         createEqualityFilter();
1801         return 0;
1802     }
1803 
1804 private:
createEqualityFilter()1805     void createEqualityFilter()
1806     {
1807         uint32_t uniqueID, count, i, charsetNumber;
1808         string str;
1809         bs->advance(sizeof(ISMPacketHeader));
1810         *bs >> uniqueID;
1811         *bs >> charsetNumber;
1812 
1813         datatypes::Charset cs(charsetNumber);
1814         boost::shared_ptr<DictEqualityFilter> filter(new DictEqualityFilter(cs));
1815 
1816         *bs >> count;
1817 
1818         for (i = 0; i < count; i++)
1819         {
1820             *bs >> str;
1821             filter->insert(str);
1822         }
1823 
1824         boost::mutex::scoped_lock sl(eqFilterMutex);
1825         dictEqualityFilters[uniqueID] = filter;
1826     }
1827 };
1828 
1829 class DestroyEqualityFilter : public DictionaryOp
1830 {
1831 public:
DestroyEqualityFilter(SBS cmd)1832     DestroyEqualityFilter(SBS cmd) : DictionaryOp(cmd) { }
execute()1833     int execute()
1834     {
1835         return destroyEqualityFilter();
1836     }
1837 
destroyEqualityFilter()1838     int destroyEqualityFilter()
1839     {
1840         uint32_t uniqueID;
1841         map<uint32_t, boost::shared_ptr<DictEqualityFilter> >::iterator it;
1842 
1843         bs->advance(sizeof(ISMPacketHeader));
1844         *bs >> uniqueID;
1845 
1846         boost::mutex::scoped_lock sl(eqFilterMutex);
1847         it = dictEqualityFilters.find(uniqueID);
1848 
1849         if (it != dictEqualityFilters.end())
1850         {
1851             dictEqualityFilters.erase(it);
1852             return 0;
1853         }
1854         else
1855             return -1;
1856     }
1857 };
1858 
1859 struct ReadThread
1860 {
ReadThread__anon59b59a2d0111::ReadThread1861     ReadThread(const string& serverName, IOSocket& ios, PrimitiveServer* ps) :
1862         fServerName(serverName), fIos(ios), fPrimitiveServerPtr(ps)
1863     {
1864         fBPPHandler.reset(new BPPHandler(ps));
1865     }
1866 
buildCacheOpResp__anon59b59a2d0111::ReadThread1867     const ByteStream buildCacheOpResp(int32_t result)
1868     {
1869         const int msgsize = sizeof(ISMPacketHeader) + sizeof(int32_t);
1870         ByteStream::byte msgbuf[msgsize];
1871         memset(msgbuf, 0, sizeof(ISMPacketHeader));
1872         ISMPacketHeader* hdrp = reinterpret_cast<ISMPacketHeader*>(&msgbuf[0]);
1873         hdrp->Command = CACHE_OP_RESULTS;
1874         int32_t* resp = reinterpret_cast<int32_t*>(&msgbuf[sizeof(ISMPacketHeader)]);
1875         *resp = result;
1876         return ByteStream(msgbuf, msgsize);
1877     }
1878 
1879     /* Message format:
1880      * 	ISMPacketHeader
1881      * 	OID count - 32 bits
1882      *  OID array - 32 bits * count
1883      */
doCacheFlushByOID__anon59b59a2d0111::ReadThread1884     void doCacheFlushByOID(SP_UM_IOSOCK ios, ByteStream& bs)
1885     {
1886         uint8_t* buf = bs.buf();
1887         buf += sizeof(ISMPacketHeader);
1888         uint32_t count = *((uint32_t*) buf);
1889         buf += 4;
1890         uint32_t* oids = (uint32_t*) buf;
1891 
1892         for (int i = 0; i < fCacheCount; i++)
1893         {
1894             blockCacheClient bc(*BRPp[i]);
1895             bc.flushOIDs(oids, count);
1896         }
1897 
1898         ios->write(buildCacheOpResp(0));
1899     }
1900 
1901     /* Message format:
1902     	 * 	ISMPacketHeader
1903     	 * 	Partition count - 32 bits
1904     	 * 	Partition set - sizeof(LogicalPartition) * count
1905     	 * 	OID count - 32 bits
1906     	 * 	OID array - 32 bits * count
1907     */
doCacheFlushByPartition__anon59b59a2d0111::ReadThread1908     void doCacheFlushByPartition(SP_UM_IOSOCK ios, ByteStream& bs)
1909     {
1910         set<BRM::LogicalPartition> partitions;
1911         vector<OID_t> oids;
1912 
1913         bs.advance(sizeof(ISMPacketHeader));
1914         deserializeSet<LogicalPartition>(bs, partitions);
1915         deserializeInlineVector<OID_t>(bs, oids);
1916 
1917         idbassert(bs.length() == 0);
1918 
1919         for (int i = 0; i < fCacheCount; i++)
1920         {
1921             blockCacheClient bc(*BRPp[i]);
1922             bc.flushPartition(oids, partitions);
1923         }
1924 
1925         ios->write(buildCacheOpResp(0));
1926     }
1927 
doCacheFlushCmd__anon59b59a2d0111::ReadThread1928     void doCacheFlushCmd(SP_UM_IOSOCK ios, const ByteStream& bs)
1929     {
1930         for (int i = 0; i < fCacheCount; i++)
1931         {
1932             blockCacheClient bc(*BRPp[i]);
1933             bc.flushCache();
1934         }
1935 
1936         ios->write(buildCacheOpResp(0));
1937     }
1938 
doCacheDropFDs__anon59b59a2d0111::ReadThread1939     void doCacheDropFDs(SP_UM_IOSOCK ios, ByteStream& bs)
1940     {
1941         std::vector<BRM::FileInfo> files;
1942 
1943         bs.advance(sizeof(ISMPacketHeader));
1944         dropFDCache();
1945         ios->write(buildCacheOpResp(0));
1946     }
1947 
doCachePurgeFDs__anon59b59a2d0111::ReadThread1948     void doCachePurgeFDs(SP_UM_IOSOCK ios, ByteStream& bs)
1949     {
1950         std::vector<BRM::FileInfo> files;
1951 
1952         bs.advance(sizeof(ISMPacketHeader));
1953         deserializeInlineVector<BRM::FileInfo>(bs, files);
1954         purgeFDCache(files);
1955         ios->write(buildCacheOpResp(0));
1956     }
1957 
1958     //N.B. this fcn doesn't actually clean the VSS, but rather instructs PP to flush its
1959     //   cache of specific LBID's
doCacheCleanVSSCmd__anon59b59a2d0111::ReadThread1960     void doCacheCleanVSSCmd(SP_UM_IOSOCK ios, const ByteStream& bs)
1961     {
1962         const ByteStream::byte* bytePtr = bs.buf();
1963         const uint32_t* cntp = reinterpret_cast<const uint32_t*>(&bytePtr[sizeof(ISMPacketHeader)]);
1964         const LbidAtVer* itemp =
1965             reinterpret_cast<const LbidAtVer*>(&bytePtr[sizeof(ISMPacketHeader) + sizeof(uint32_t)]);
1966 
1967         for (int i = 0; i < fCacheCount; i++)
1968         {
1969             blockCacheClient bc(*BRPp[i]);
1970             bc.flushMany(itemp, *cntp);
1971         }
1972 
1973         ios->write(buildCacheOpResp(0));
1974     }
1975 
doCacheFlushAllversion__anon59b59a2d0111::ReadThread1976     void doCacheFlushAllversion(SP_UM_IOSOCK ios, const ByteStream& bs)
1977     {
1978         const ByteStream::byte* bytePtr = bs.buf();
1979         const uint32_t* cntp = reinterpret_cast<const uint32_t*>(&bytePtr[sizeof(ISMPacketHeader)]);
1980         const LBID_t* itemp =
1981             reinterpret_cast<const LBID_t*>(&bytePtr[sizeof(ISMPacketHeader) + sizeof(uint32_t)]);
1982 
1983         for (int i = 0; i < fCacheCount; i++)
1984         {
1985             blockCacheClient bc(*BRPp[i]);
1986             bc.flushManyAllversion(itemp, *cntp);
1987         }
1988 
1989         ios->write(buildCacheOpResp(0));
1990     }
1991 
operator ()__anon59b59a2d0111::ReadThread1992     void operator()()
1993     {
1994         utils::setThreadName("PPReadThread");
1995         boost::shared_ptr<threadpool::PriorityThreadPool> procPoolPtr =
1996             fPrimitiveServerPtr->getProcessorThreadPool();
1997         SBS bs;
1998         UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
1999 
2000         // Establish default output IOSocket (and mutex) based on the input
2001         // IOSocket. If we end up rotating through multiple output sockets
2002         // for the same UM, we will use UmSocketSelector to select output.
2003         SP_UM_IOSOCK outIosDefault(new IOSocket(fIos));
2004         SP_UM_MUTEX  writeLockDefault(new boost::mutex());
2005 
2006         bool bRotateDest = fPrimitiveServerPtr->rotatingDestination();
2007 
2008         if (bRotateDest)
2009         {
2010             // If we tried adding an IP address not listed as UM in config
2011             // file; probably a DMLProc connection.  We allow the connection
2012             // but disable destination rotation since not in Columnstore.xml.
2013             if (!pUmSocketSelector->addConnection(outIosDefault, writeLockDefault))
2014             {
2015                 bRotateDest = false;
2016             }
2017         }
2018 
2019         SP_UM_IOSOCK outIos(outIosDefault);
2020         SP_UM_MUTEX  writeLock(writeLockDefault);
2021 
2022         //..Loop to process incoming messages on IOSocket fIos
2023         for (;;)
2024         {
2025             try
2026             {
2027                 bs = fIos.read();
2028             }
2029             catch (...)
2030             {
2031                 //This connection is dead, nothing useful will come from it ever again
2032                 //We can't rely on the state of bs at this point...
2033                 if (bRotateDest && pUmSocketSelector)
2034                     pUmSocketSelector->delConnection(fIos);
2035 
2036                 fIos.close();
2037                 break;
2038             }
2039 
2040             try
2041             {
2042                 if (bs->length() != 0)
2043                 {
2044                     idbassert(bs->length() >= sizeof(ISMPacketHeader));
2045 
2046                     const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());
2047 
2048                     /* This switch is for the OOB commands */
2049                     switch (ismHdr->Command)
2050                     {
2051                         case CACHE_FLUSH_PARTITION:
2052                             doCacheFlushByPartition(outIos, *bs);
2053                             fIos.close();
2054                             return;
2055 
2056                         case CACHE_FLUSH_BY_OID:
2057                             doCacheFlushByOID(outIos, *bs);
2058                             fIos.close();
2059                             return;
2060 
2061                         case CACHE_FLUSH:
2062                             doCacheFlushCmd(outIos, *bs);
2063                             fIos.close();
2064                             return;
2065 
2066                         case CACHE_CLEAN_VSS:
2067                             doCacheCleanVSSCmd(outIos, *bs);
2068                             fIos.close();
2069                             return;
2070 
2071                         case FLUSH_ALL_VERSION:
2072                             doCacheFlushAllversion(outIos, *bs);
2073                             fIos.close();
2074                             return;
2075 
2076                         case CACHE_DROP_FDS:
2077                             doCacheDropFDs(outIos, *bs);
2078                             fIos.close();
2079                             return;
2080 
2081                         case CACHE_PURGE_FDS:
2082                             doCachePurgeFDs(outIos, *bs);
2083                             fIos.close();
2084                             return;
2085 
2086                         default:
2087                             break;
2088                     }
2089 
2090                     switch (ismHdr->Command)
2091                     {
2092                         case DICT_CREATE_EQUALITY_FILTER:
2093                         {
2094                             PriorityThreadPool::Job job;
2095                             const uint8_t* buf = bs->buf();
2096                             uint32_t pos = sizeof(ISMPacketHeader) - 2;
2097                             job.stepID = *((uint32_t*) &buf[pos + 6]);
2098                             job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2099                             job.sock = outIos;
2100                             job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new CreateEqualityFilter(bs));
2101                             OOBPool->addJob(job);
2102                             break;
2103                         }
2104 
2105                         case DICT_DESTROY_EQUALITY_FILTER:
2106                         {
2107                             PriorityThreadPool::Job job;
2108                             const uint8_t* buf = bs->buf();
2109                             uint32_t pos = sizeof(ISMPacketHeader) - 2;
2110                             job.stepID = *((uint32_t*) &buf[pos + 6]);
2111                             job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2112                             job.sock = outIos;
2113                             job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new DestroyEqualityFilter(bs));
2114                             OOBPool->addJob(job);
2115                             break;
2116                         }
2117 
2118                         case DICT_TOKEN_BY_SCAN_COMPARE:
2119                         {
2120                             idbassert(bs->length() >= sizeof(TokenByScanRequestHeader));
2121                             TokenByScanRequestHeader* hdr = (TokenByScanRequestHeader*) ismHdr;
2122 
2123                             if (bRotateDest)
2124                             {
2125                                 if (!pUmSocketSelector->nextIOSocket(
2126                                             fIos, outIos, writeLock))
2127                                 {
2128                                     // If we ever fall into this part of the
2129                                     // code we have a "bug" of some sort.
2130                                     // See handleUmSockSelErr() for more info.
2131                                     // We reset ios and mutex to defaults.
2132                                     handleUmSockSelErr(string("default cmd"));
2133                                     outIos		= outIosDefault;
2134                                     writeLock	= writeLockDefault;
2135                                     pUmSocketSelector->delConnection(fIos);
2136                                     bRotateDest = false;
2137                                 }
2138                             }
2139 
2140                             PriorityThreadPool::Job job;
2141                             job.functor = boost::shared_ptr<DictScanJob>(new DictScanJob(outIos,
2142                                           bs, writeLock));
2143                             job.id = hdr->Hdr.UniqueID;
2144                             job.weight = LOGICAL_BLOCK_RIDS;
2145                             job.priority = hdr->Hdr.Priority;
2146                             const uint8_t* buf = bs->buf();
2147                             uint32_t pos = sizeof(ISMPacketHeader) - 2;
2148                             job.stepID = *((uint32_t*) &buf[pos + 6]);
2149                             job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2150                             job.sock = outIos;
2151 
2152                             if (hdr->flags & IS_SYSCAT)
2153                             {
2154                                 //boost::thread t(DictScanJob(outIos, bs, writeLock));
2155                                 // using already-existing threads may cut latency
2156                                 // if it's changed back to running in an independent thread
2157                                 // change the issyscat() checks in BPPSeeder as well
2158                                 OOBPool->addJob(job);
2159                             }
2160                             else
2161                             {
2162                                 procPoolPtr->addJob(job);
2163                             }
2164 
2165                             break;
2166                         }
2167 
2168                         case BATCH_PRIMITIVE_RUN:
2169                         {
2170                             if (bRotateDest)
2171                             {
2172                                 if (!pUmSocketSelector->nextIOSocket(
2173                                             fIos, outIos, writeLock))
2174                                 {
2175 
2176                                     // If we ever fall into this part of the
2177                                     // code we have a "bug" of some sort.
2178                                     // See handleUmSockSelErr() for more info.
2179                                     // We reset ios and mutex to defaults.
2180                                     handleUmSockSelErr(string("BPR cmd"));
2181                                     outIos		= outIosDefault;
2182                                     writeLock	= writeLockDefault;
2183                                     pUmSocketSelector->delConnection(fIos);
2184                                     bRotateDest = false;
2185                                 }
2186                             }
2187 
2188                             /* Decide whether this is a syscat call and run
2189                             right away instead of queueing */
2190                             boost::shared_ptr<BPPSeeder> bpps(new BPPSeeder(bs, writeLock, outIos,
2191                                                               fPrimitiveServerPtr->ProcessorThreads(),
2192                                                               fPrimitiveServerPtr->PTTrace()));
2193                             PriorityThreadPool::Job job;
2194                             job.functor = bpps;
2195                             job.id = bpps->getID();
2196                             job.weight = ismHdr->Size;
2197                             job.priority = bpps->priority();
2198                             const uint8_t* buf = bs->buf();
2199                             uint32_t pos = sizeof(ISMPacketHeader) - 2;
2200                             job.stepID = *((uint32_t*) &buf[pos + 6]);
2201                             job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2202                             job.sock = outIos;
2203 
2204                             if (bpps->isSysCat())
2205                             {
2206 
2207                                 //boost::thread t(*bpps);
2208                                 // using already-existing threads may cut latency
2209                                 // if it's changed back to running in an independent thread
2210                                 // change the issyscat() checks in BPPSeeder as well
2211                                 OOBPool->addJob(job);
2212                             }
2213                             else
2214                             {
2215                                 procPoolPtr->addJob(job);
2216                             }
2217 
2218                             break;
2219                         }
2220 
2221                         case BATCH_PRIMITIVE_CREATE:
2222                         {
2223                             PriorityThreadPool::Job job;
2224                             job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Create(fBPPHandler, bs));
2225                             const uint8_t* buf = bs->buf();
2226                             uint32_t pos = sizeof(ISMPacketHeader) - 2;
2227                             job.stepID = *((uint32_t*) &buf[pos + 6]);
2228                             job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2229                             job.sock = outIos;
2230                             OOBPool->addJob(job);
2231                             //fBPPHandler->createBPP(*bs);
2232                             break;
2233                         }
2234 
2235                         case BATCH_PRIMITIVE_ADD_JOINER:
2236                         {
2237                             PriorityThreadPool::Job job;
2238                             job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::AddJoiner(fBPPHandler, bs));
2239                             job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
2240                             const uint8_t* buf = bs->buf();
2241                             uint32_t pos = sizeof(ISMPacketHeader) - 2;
2242                             job.stepID = *((uint32_t*) &buf[pos + 6]);
2243                             job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2244                             job.sock = outIos;
2245                             OOBPool->addJob(job);
2246                             //fBPPHandler->addJoinerToBPP(*bs);
2247                             break;
2248                         }
2249 
2250                         case BATCH_PRIMITIVE_END_JOINER:
2251                         {
2252                             // lastJoinerMsg can block; must do this in a different thread
2253                             //OOBPool->invoke(BPPHandler::LastJoiner(fBPPHandler, bs));  // needs a threadpool that can resched
2254                             //boost::thread tmp(BPPHandler::LastJoiner(fBPPHandler, bs));
2255                             PriorityThreadPool::Job job;
2256                             job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::LastJoiner(fBPPHandler, bs));
2257                             job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
2258                             const uint8_t* buf = bs->buf();
2259                             uint32_t pos = sizeof(ISMPacketHeader) - 2;
2260                             job.stepID = *((uint32_t*) &buf[pos + 6]);
2261                             job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2262                             job.sock = outIos;
2263                             OOBPool->addJob(job);
2264                             break;
2265                         }
2266 
2267                         case BATCH_PRIMITIVE_DESTROY:
2268                         {
2269                             //OOBPool->invoke(BPPHandler::Destroy(fBPPHandler, bs));  // needs a threadpool that can resched
2270                             //boost::thread tmp(BPPHandler::Destroy(fBPPHandler, bs));
2271                             PriorityThreadPool::Job job;
2272                             job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Destroy(fBPPHandler, bs));
2273                             job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
2274                             const uint8_t* buf = bs->buf();
2275                             uint32_t pos = sizeof(ISMPacketHeader) - 2;
2276                             job.stepID = *((uint32_t*) &buf[pos + 6]);
2277                             job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2278                             job.sock = outIos;
2279                             OOBPool->addJob(job);
2280                             //fBPPHandler->destroyBPP(*bs);
2281                             break;
2282                         }
2283 
2284                         case BATCH_PRIMITIVE_ACK:
2285                         {
2286                             fBPPHandler->doAck(*bs);
2287                             break;
2288                         }
2289 
2290                         case BATCH_PRIMITIVE_ABORT:
2291                         {
2292                             //OBPool->invoke(BPPHandler::Abort(fBPPHandler, bs));
2293                             //fBPPHandler->doAbort(*bs);
2294                             PriorityThreadPool::Job job;
2295                             job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Abort(fBPPHandler, bs));
2296                             job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
2297                             const uint8_t* buf = bs->buf();
2298                             uint32_t pos = sizeof(ISMPacketHeader) - 2;
2299                             job.stepID = *((uint32_t*) &buf[pos + 6]);
2300                             job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2301                             job.sock = outIos;
2302                             OOBPool->addJob(job);
2303                             break;
2304                         }
2305 
2306                         default:
2307                         {
2308                             std::ostringstream os;
2309                             Logger log;
2310                             os << "unknown primitive cmd: " << ismHdr->Command;
2311                             log.logMessage(os.str());
2312                             break;
2313                         }
2314                     }  // the switch stmt
2315                 }
2316                 else // bs.length() == 0
2317                 {
2318                     if (bRotateDest)
2319                         pUmSocketSelector->delConnection(fIos);
2320 
2321                     fIos.close();
2322                     break;
2323                 }
2324             }   // the try- surrounding the if stmt
2325             catch (std::exception& e)
2326             {
2327                 Logger logger;
2328                 logger.logMessage(e.what());
2329             }
2330         }
2331     }
2332 
2333 // If this function is called, we have a "bug" of some sort.  We added
2334 // the "fIos" connection to UmSocketSelector earlier, so at the very
2335 // least, UmSocketSelector should have been able to return that con-
2336 // nection/port.  We will try to recover by using the original fIos to
2337 // send the response msg; but as stated, if this ever happens we have
2338 // a bug we need to resolve.
handleUmSockSelErr__anon59b59a2d0111::ReadThread2339     void handleUmSockSelErr(const string& cmd)
2340     {
2341         ostringstream oss;
2342         oss << "Unable to rotate through socket destinations (" <<
2343             cmd << ") for connection: " << fIos.toString();
2344         cerr << oss.str() << endl;
2345         logging::Message::Args args;
2346         args.add(oss.str());
2347         mlp->logMessage(logging::M0058, args, false);
2348     }
2349 
~ReadThread__anon59b59a2d0111::ReadThread2350     ~ReadThread() {}
2351     string fServerName;
2352     IOSocket fIos;
2353     PrimitiveServer* fPrimitiveServerPtr;
2354     boost::shared_ptr<BPPHandler> fBPPHandler;
2355 };
2356 
2357 /** @brief accept a primitive command from the user module
2358  */
2359 struct ServerThread
2360 {
ServerThread__anon59b59a2d0111::ServerThread2361     ServerThread(string serverName, PrimitiveServer* ps) :
2362         fServerName(serverName), fPrimitiveServerPtr(ps)
2363     {
2364         SUMMARY_INFO2("starting server ", fServerName);
2365 
2366         bool tellUser = true;
2367         bool toldUser = false;
2368 
2369         for (;;)
2370         {
2371             try
2372             {
2373                 mqServerPtr = new MessageQueueServer(fServerName);
2374                 break;
2375             }
2376             catch (runtime_error& re)
2377             {
2378                 string what = re.what();
2379 
2380                 if (what.find("Address already in use") != string::npos)
2381                 {
2382                     if (tellUser)
2383                     {
2384                         cerr << "Address already in use, retrying..." << endl;
2385                         tellUser = false;
2386                         toldUser = true;
2387                     }
2388 
2389                     sleep(5);
2390                 }
2391                 else
2392                 {
2393                     throw;
2394                 }
2395             }
2396         }
2397 
2398         if (toldUser)
2399             cerr << "Ready." << endl;
2400     }
2401 
operator ()__anon59b59a2d0111::ServerThread2402     void operator()()
2403     {
2404         utils::setThreadName("PPServerThr");
2405         IOSocket ios;
2406 
2407         try
2408         {
2409             for (;;)
2410             {
2411                 ios = mqServerPtr->accept();
2412                 //startup a detached thread to handle this socket's I/O
2413                 boost::thread rt(ReadThread(fServerName, ios, fPrimitiveServerPtr));
2414             }
2415         }
2416         catch (std::exception& ex)
2417         {
2418             SUMMARY_INFO2("exception caught in ServerThread: ", ex.what());
2419         }
2420         catch (...)
2421         {
2422             SUMMARY_INFO("exception caught in ServerThread.");
2423         }
2424     }
2425 
2426     string fServerName;
2427     PrimitiveServer* fPrimitiveServerPtr;
2428     MessageQueueServer* mqServerPtr;
2429 };
2430 
2431 } // namespace anon
2432 
2433 namespace primitiveprocessor
2434 {
PrimitiveServer(int serverThreads,int serverQueueSize,int processorWeight,int processorQueueSize,bool rotatingDestination,uint32_t BRPBlocks,int BRPThreads,int cacheCount,int maxBlocksPerRead,int readAheadBlocks,uint32_t deleteBlocks,bool ptTrace,double prefetch,uint64_t smallSide)2435 PrimitiveServer::PrimitiveServer(int serverThreads,
2436                                  int serverQueueSize,
2437                                  int processorWeight,
2438                                  int processorQueueSize,
2439                                  bool rotatingDestination,
2440                                  uint32_t BRPBlocks,
2441                                  int BRPThreads,
2442                                  int cacheCount,
2443                                  int maxBlocksPerRead,
2444                                  int readAheadBlocks,
2445                                  uint32_t deleteBlocks,
2446                                  bool ptTrace,
2447                                  double prefetch,
2448                                  uint64_t smallSide
2449                                 ):
2450     fServerThreads(serverThreads),
2451     fServerQueueSize(serverQueueSize),
2452     fProcessorWeight(processorWeight),
2453     fProcessorQueueSize(processorQueueSize),
2454     fMaxBlocksPerRead(maxBlocksPerRead),
2455     fReadAheadBlocks(readAheadBlocks),
2456     fRotatingDestination(rotatingDestination),
2457     fPTTrace(ptTrace),
2458     fPrefetchThreshold(prefetch),
2459     fPMSmallSide(smallSide)
2460 {
2461     fCacheCount = cacheCount;
2462     fServerpool.setMaxThreads(fServerThreads);
2463     fServerpool.setQueueSize(fServerQueueSize);
2464     fServerpool.setName("PrimitiveServer");
2465 
2466     fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
2467                          medPriorityThreads, lowPriorityThreads, 0));
2468 
2469     // We're not using either the priority or the job-clustering features, just need a threadpool
2470     // that can reschedule jobs, and an unlimited non-blocking queue
2471     OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
2472 
2473     asyncCounter = 0;
2474 
2475     brm = new DBRM();
2476 
2477     BRPp = new BlockRequestProcessor*[fCacheCount];
2478 
2479     try
2480     {
2481         for (int i = 0; i < fCacheCount; i++)
2482             BRPp[i] = new BlockRequestProcessor(BRPBlocks / fCacheCount, BRPThreads / fCacheCount,
2483                                                 fMaxBlocksPerRead, deleteBlocks / fCacheCount);
2484     }
2485     catch (...)
2486     {
2487         cerr << "Unable to allocate " << BRPBlocks << " cache blocks. Adjust the DBBC config parameter "
2488              "downward." << endl;
2489         mlp->logMessage(logging::M0045, logging::Message::Args(), true);
2490         exit(1);
2491     }
2492 }
2493 
~PrimitiveServer()2494 PrimitiveServer::~PrimitiveServer()
2495 {
2496 }
2497 
start(Service * service)2498 void PrimitiveServer::start(Service *service)
2499 {
2500     // start all the server threads
2501     for ( int i = 1; i <= fServerThreads; i++)
2502     {
2503         string s("PMS");
2504         stringstream oss;
2505         oss << s << i;
2506 
2507         fServerpool.invoke(ServerThread(oss.str(), this));
2508     }
2509 
2510     service->NotifyServiceStarted();
2511 
2512     fServerpool.wait();
2513 
2514     cerr << "PrimitiveServer::start() exiting!" << endl;
2515 }
2516 
BPPV()2517 BPPV::BPPV()
2518 {
2519     sendThread.reset(new BPPSendThread());
2520     v.reserve(BPPCount);
2521     pos = 0;
2522     joinDataReceived = false;
2523 }
2524 
~BPPV()2525 BPPV::~BPPV()
2526 {
2527     BOOST_FOREACH( boost::shared_ptr<BatchPrimitiveProcessor> bpp, v )
2528     {
2529         bpp->resetMem();
2530     }
2531 }
2532 
add(boost::shared_ptr<BatchPrimitiveProcessor> a)2533 void BPPV::add(boost::shared_ptr<BatchPrimitiveProcessor> a)
2534 {
2535     /* Right now BPP initialization locks the object if there is a join to
2536     prevent the instance from being used before the join data is received.
2537     The init/join/run sync code is quite old and confusing at this point,
2538     and this makes it a little worse by circumventing the original design.
2539     Due for a rewrite. */
2540 
2541     if (!unusedInstance)
2542     {
2543         unusedInstance = a->duplicate();
2544 
2545         if (a->hasJoin())
2546         {
2547             joinDataReceived = false;
2548             unusedInstance->unlock();
2549         }
2550         else
2551             joinDataReceived = true;
2552     }
2553 
2554     v.push_back(a);
2555 }
get()2556 const vector<boost::shared_ptr<BatchPrimitiveProcessor> >& BPPV::get()
2557 {
2558     return v;
2559 }
2560 
2561 
next()2562 boost::shared_ptr<BatchPrimitiveProcessor> BPPV::next()
2563 {
2564     uint32_t size = v.size();
2565     uint32_t i;
2566 
2567 #if 0
2568 
2569     // This block of code scans for the first available BPP instance,
2570     // makes BPPSeeder reschedule it if none are available. Relies on BPP instance
2571     // being preallocated.
2572     for (i = 0; i < size; i++)
2573     {
2574         uint32_t index = (i + pos) % size;
2575 
2576         if (!(v[index]->busy()))
2577         {
2578             pos = (index + 1) % size;
2579             v[index]->busy(true);
2580             return v[index];
2581         }
2582     }
2583 
2584     // They're all busy, make threadpool reschedule the job
2585     return boost::shared_ptr<BatchPrimitiveProcessor>();
2586 #endif
2587 
2588     //This block of code creates BPP instances if/when they are needed
2589 
2590     // don't use a processor thread when it will just block, reschedule it instead
2591     if (!joinDataReceived)
2592         return boost::shared_ptr<BatchPrimitiveProcessor>();
2593 
2594     for (i = 0; i < size; i++)
2595     {
2596         uint32_t index = (i + pos) % size;
2597 
2598         if (!(v[index]->busy()))
2599         {
2600             pos = (index + 1) % size;
2601             v[index]->busy(true);
2602             return v[index];
2603         }
2604     }
2605 
2606     // honor the BPPCount limit, mostly for debugging purposes.
2607     if (size >= BPPCount)
2608         return boost::shared_ptr<BatchPrimitiveProcessor>();
2609 
2610     SBPP newone = unusedInstance->duplicate();
2611 
2612     if (newone->hasJoin())
2613         newone->unlock();
2614 
2615     newone->busy(true);
2616     v.push_back(newone);
2617     return newone;
2618 }
2619 
abort()2620 void BPPV::abort()
2621 {
2622     sendThread->abort();
2623     BOOST_FOREACH( boost::shared_ptr<BatchPrimitiveProcessor> bpp, v )
2624     {
2625         bpp->unlock();
2626     }
2627 }
2628 
aborted()2629 bool BPPV::aborted()
2630 {
2631     return sendThread->aborted();
2632 }
2633 
2634 // end workaround
2635 
2636 } // namespace primitiveprocessor
2637 // vim:ts=4 sw=4:
2638 
2639