1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 /***********************************************************************
20 * $Id: primitiveserver.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
21 *
22 *
23 ***********************************************************************/
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26 #include <unistd.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <stdexcept>
31 //#define NDEBUG
32 #include <cassert>
33 #include <boost/thread.hpp>
34 #include <boost/thread/condition.hpp>
35 #include <boost/foreach.hpp>
36 #ifdef _MSC_VER
37 #include <unordered_map>
38 typedef int pthread_t;
39 #else
40 #include <tr1/unordered_map>
41 #include <tr1/unordered_set>
42 #include <pthread.h>
43 #endif
44 #include <cerrno>
45
46 using namespace std;
47
48 #include <boost/scoped_ptr.hpp>
49 #include <boost/scoped_array.hpp>
50 #include <boost/thread.hpp>
51 using namespace boost;
52
53 #include "primproc.h"
54 #include "primitiveserver.h"
55 #include "primitivemsg.h"
56 #include "umsocketselector.h"
57 #include "brm.h"
58 using namespace BRM;
59
60 #include "writeengine.h"
61
62 #include "messagequeue.h"
63 using namespace messageqcpp;
64
65 #include "blockrequestprocessor.h"
66 #include "blockcacheclient.h"
67 #include "stats.h"
68 using namespace dbbc;
69
70 #include "liboamcpp.h"
71 using namespace oam;
72
73 #include "configcpp.h"
74 using namespace config;
75
76 #include "bppseeder.h"
77 #include "primitiveprocessor.h"
78 #include "pp_logger.h"
79 using namespace primitives;
80
81 #include "errorcodes.h"
82 #include "exceptclasses.h"
83
84 #include "idbcompress.h"
85 using namespace compress;
86
87 #include "IDBDataFile.h"
88 #include "IDBPolicy.h"
89 using namespace idbdatafile;
90
91 using namespace threadpool;
92
93 #include "threadnaming.h"
94
95 #include "atomicops.h"
96
97 #ifndef O_BINARY
98 # define O_BINARY 0
99 #endif
100 #ifndef O_DIRECT
101 # define O_DIRECT 0
102 #endif
103 #ifndef O_LARGEFILE
104 # define O_LARGEFILE 0
105 #endif
106 #ifndef O_NOATIME
107 # define O_NOATIME 0
108 #endif
109
// Hash set of BRM object IDs.
typedef tr1::unordered_set<BRM::OID_t> USOID;

// make global for blockcache
//
// Name passed to the pmstats constructor below.
static const char* statsName = {"pm"};
dbbc::Stats* gPMStatsPtr = 0;
bool gPMProfOn = false;
uint32_t gSession = 0;
#ifndef _MSC_VER
// Stats object used in this file to mark 'M' events after a prefetch when
// primitiveprocessor::fPMProfOn is set.
dbbc::Stats pmstats(statsName);
#endif

// NOTE(review): presumably the process-wide OAM cache instance -- confirm against OamCache.
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
123
//FIXME: there is an anonymous namespace buried later on, in between two named namespaces...
125 namespace primitiveprocessor
126 {
boost::shared_ptr<threadpool::PriorityThreadPool> OOBPool;

// Array of block request processors; the functions in this file index it with cacheNum(lbid).
BlockRequestProcessor** BRPp;
#ifndef _MSC_VER
// Receives touchedLBID() calls from loadBlock()/loadBlocks() when LBID tracing is requested.
dbbc::Stats stats;
#endif
extern DebugLevel gDebugLevel;
BRM::DBRM* brm;
// cacheNum() reduces an extent index modulo this value to pick an entry of BRPp.
int fCacheCount;
// When set, a pmstats 'M' event is recorded after each prefetch.
bool fPMProfOn;
bool fLBIDTraceOn;

/* params from the config file */
uint32_t BPPCount;
// Size, in blocks, of the aligned window prefetchBlocks() reads ahead.
uint32_t blocksReadAhead;
uint32_t defaultBufferSize;
uint32_t connectionsPerUM;
uint32_t highPriorityThreads;
uint32_t medPriorityThreads;
uint32_t lowPriorityThreads;
// Nonzero makes loadBlock() open files with IDBDataFile::USE_ODIRECT.
int directIOFlag = O_DIRECT;
// Nonzero disables loadBlock()'s direct-from-disk path for blocks owned by the calling txn.
int noVB = 0;

const uint8_t fMaxColWidth(8);
BPPMap bppMap;
boost::mutex bppLock;

#define DJLOCK_READ 0
#define DJLOCK_WRITE 1
boost::mutex djMutex;                       // lock for djLock, lol.
std::map<uint64_t, shared_mutex *> djLock;  // djLock synchronizes destroy and joiner msgs, see bug 2619

// Current number of asynchronous loads: incremented by loadBlockAsync(), decremented by
// AsynchLoader when it finishes.
volatile int32_t asyncCounter;
// Cap on asyncCounter; loadBlockAsync() returns without starting a loader once it is reached.
const int asyncMax = 20;
161
162 struct preFetchCond
163 {
164 //uint64_t lbid;
165 boost::condition cond;
166 unsigned waiters;
167
preFetchCondprimitiveprocessor::preFetchCond168 preFetchCond(const uint64_t l)
169 {
170 waiters = 0;
171 }
172
~preFetchCondprimitiveprocessor::preFetchCond173 ~preFetchCond() { }
174 };
175
typedef preFetchCond preFetchBlock_t;
// Maps the first LBID of a read-ahead window to its in-flight prefetch record.
typedef std::tr1::unordered_map<uint64_t, preFetchBlock_t*> pfBlockMap_t;
typedef std::tr1::unordered_map<uint64_t, preFetchBlock_t*>::iterator pfBlockMapIter_t;

// Prefetches currently in progress; guarded by pfbMutex (see prefetchBlocks()).
pfBlockMap_t pfBlockMap;
boost::mutex pfbMutex; // = PTHREAD_MUTEX_INITIALIZER;

// NOTE(review): not referenced in the visible part of this file -- presumably the
// extent-level counterpart of pfBlockMap, guarded by pfMutex; confirm.
pfBlockMap_t pfExtentMap;
boost::mutex pfMutex; // = PTHREAD_MUTEX_INITIALIZER;

// Equality filters for dictionary scans, looked up by the request's unique ID;
// guarded by eqFilterMutex.
map<uint32_t, boost::shared_ptr<DictEqualityFilter> > dictEqualityFilters;
boost::mutex eqFilterMutex;
188
cacheNum(uint64_t lbid)189 uint32_t cacheNum(uint64_t lbid)
190 {
191 return ( lbid / brm->getExtentSize() ) % fCacheCount;
192 }
193
buildOidFileName(const BRM::OID_t oid,const uint16_t dbRoot,const uint16_t partNum,const uint32_t segNum,char * file_name)194 void buildOidFileName(const BRM::OID_t oid,
195 const uint16_t dbRoot,
196 const uint16_t partNum,
197 const uint32_t segNum,
198 char* file_name)
199 {
200 WriteEngine::FileOp fileOp(false);
201
202 if (fileOp.getFileName(oid, file_name, dbRoot, partNum, segNum) != WriteEngine::NO_ERROR)
203 {
204 file_name[0] = 0;
205 throw std::runtime_error("fileOp.getFileName failed");
206 }
207
208 //cout << "Oid2Filename o: " << oid << " n: " << file_name << endl;
209 }
210
211
/** Sleeps for count * 0.5 seconds before a read is retried.
 *
 * @param count  retry attempt number; the delay grows linearly with it
 */
void waitForRetry(long count)
{
    // Work in tenths of a second: 5 tenths per attempt.
    const long tenths = 5L * count;

    timespec delay;
    delay.tv_sec = tenths / 10L;
    delay.tv_nsec = (tenths % 10L) * 100000000L;
#ifdef _MSC_VER
    Sleep(delay.tv_sec * 1000 + delay.tv_nsec / 1000 / 1000);
#else
    nanosleep(&delay, 0);
#endif
}
223
224
prefetchBlocks(const uint64_t lbid,const int compType,uint32_t * rCount)225 void prefetchBlocks(const uint64_t lbid,
226 const int compType,
227 uint32_t* rCount)
228 {
229 uint16_t dbRoot;
230 uint32_t partNum;
231 uint16_t segNum;
232 uint32_t hwm;
233 uint32_t fbo;
234 uint32_t lowfbo;
235 uint32_t highfbo;
236 BRM::OID_t oid;
237 pfBlockMap_t::const_iterator iter;
238 uint64_t lowlbid = (lbid / blocksReadAhead) * blocksReadAhead;
239 blockCacheClient bc(*BRPp[cacheNum(lbid)]);
240 BRM::InlineLBIDRange range;
241 int err;
242
243 pfbMutex.lock();
244
245 iter = pfBlockMap.find(lowlbid);
246
247 if (iter != pfBlockMap.end())
248 {
249 iter->second->waiters++;
250 iter->second->cond.wait(pfbMutex);
251 iter->second->waiters--;
252 pfbMutex.unlock();
253 return;
254 }
255
256 preFetchBlock_t* pfb = 0;
257 pfb = new preFetchBlock_t(lowlbid);
258
259 pfBlockMap[lowlbid] = pfb;
260 pfbMutex.unlock();
261
262 // loadBlock will catch a versioned block so vbflag can be set to false here
263 err = brm->lookupLocal(lbid, 0, false, oid, dbRoot, partNum, segNum, fbo); // need the oid
264
265 if (err < 0)
266 {
267 cerr << "prefetchBlocks(): BRM lookupLocal failed! Expect more errors.\n";
268 goto cleanup;
269 }
270
271 // We ignore extState that tells us whether the extent is
272 // an outOfService extent to be ignored. The filtering for
273 // outOfService extents is done elsewhere.
274 int extState;
275 err = brm->getLocalHWM(oid, partNum, segNum, hwm, extState);
276
277 if (err < 0)
278 {
279 cerr << "prefetchBlock(): BRM getLocalHWM failed! Expect more errors.\n";
280 goto cleanup;
281 }
282
283 lowfbo = fbo - (lbid - lowlbid);
284 highfbo = lowfbo + blocksReadAhead - 1;
285 range.start = lowlbid;
286
287 if (hwm < highfbo)
288 range.size = hwm - lowfbo + 1;
289 else
290 range.size = blocksReadAhead;
291
292 try
293 {
294 if (range.size > blocksReadAhead)
295 {
296 ostringstream os;
297 os << "Invalid Range from HWM for lbid " << lbid
298 << ", range size should be <= blocksReadAhead: HWM " << hwm
299 << ", dbroot " << dbRoot
300 << ", highfbo " << highfbo << ", lowfbo " << lowfbo
301 << ", blocksReadAhead " << blocksReadAhead
302 << ", range size " << (int) range.size << endl;
303 throw logging::InvalidRangeHWMExcept(os.str());
304 }
305
306 idbassert(range.size <= blocksReadAhead);
307
308 bc.check(range, QueryContext(numeric_limits<VER_t>::max()), 0, compType, *rCount);
309 }
310 catch (...)
311 {
312 // Perform necessary cleanup before rethrowing the exception
313 pfb->cond.notify_all();
314
315 pfbMutex.lock();
316
317 while (pfb->waiters > 0)
318 {
319 pfbMutex.unlock();
320 //handle race condition with other threads going into wait before the broadcast above
321 pfb->cond.notify_one();
322 usleep(1);
323 pfbMutex.lock();
324 }
325
326 if (pfBlockMap.erase(lowlbid) > 0)
327 delete pfb;
328
329 pfb = 0;
330 pfbMutex.unlock();
331 throw;
332 }
333
334 cleanup:
335 pfb->cond.notify_all();
336
337 pfbMutex.lock();
338
339 while (pfb->waiters > 0)
340 {
341 pfbMutex.unlock();
342 //handle race condition with other threads going into wait before the broadcast above
343 pfb->cond.notify_one();
344 usleep(1);
345 pfbMutex.lock();
346 }
347
348 if (pfBlockMap.erase(lowlbid) > 0)
349 delete pfb;
350
351 pfb = 0;
352 pfbMutex.unlock();
353
354 } // prefetchBlocks()
355
356 // returns the # that were cached.
loadBlocks(LBID_t * lbids,QueryContext qc,VER_t txn,int compType,uint8_t ** bufferPtrs,uint32_t * rCount,bool LBIDTrace,uint32_t sessionID,uint32_t blockCount,bool * blocksWereVersioned,bool doPrefetch,VSSCache * vssCache)357 uint32_t loadBlocks (
358 LBID_t* lbids,
359 QueryContext qc,
360 VER_t txn,
361 int compType,
362 uint8_t** bufferPtrs,
363 uint32_t* rCount,
364 bool LBIDTrace,
365 uint32_t sessionID,
366 uint32_t blockCount,
367 bool* blocksWereVersioned,
368 bool doPrefetch,
369 VSSCache* vssCache)
370 {
371 blockCacheClient bc(*BRPp[cacheNum(lbids[0])]);
372 uint32_t blksRead = 0;
373 VSSCache::iterator it;
374 uint32_t i, ret;
375 bool* vbFlags;
376 int* vssRCs;
377 bool* cacheThisBlock;
378 bool* wasCached;
379
380 *blocksWereVersioned = false;
381
382 #ifndef _MSC_VER
383
384 if (LBIDTrace)
385 {
386 for (i = 0; i < blockCount; i++)
387 {
388
389 stats.touchedLBID(lbids[i], pthread_self(), sessionID);
390 }
391 }
392
393 #endif
394 VER_t* vers = (VER_t*) alloca(blockCount * sizeof(VER_t));
395 vbFlags = (bool*) alloca(blockCount);
396 vssRCs = (int*) alloca(blockCount * sizeof(int));
397 cacheThisBlock = (bool*) alloca(blockCount);
398 wasCached = (bool*) alloca(blockCount);
399
400 for (i = 0; i < blockCount; i++)
401 {
402 if (vssCache)
403 {
404 it = vssCache->find(lbids[i]);
405
406 if (it != vssCache->end())
407 {
408 VSSData& vd = it->second;
409 vers[i] = vd.verID;
410 vbFlags[i] = vd.vbFlag;
411 vssRCs[i] = vd.returnCode;
412
413 if (vssRCs[i] == ERR_SNAPSHOT_TOO_OLD)
414 throw runtime_error("Snapshot too old");
415 }
416 }
417
418 if (!vssCache || it == vssCache->end())
419 vssRCs[i] = brm->vssLookup(lbids[i], qc, txn, &vers[i], &vbFlags[i]);
420
421 *blocksWereVersioned |= vbFlags[i];
422
423 // If the block is being modified by this txn, set the useCache flag to false
424 if (txn > 0 && vers[i] == txn && !vbFlags[i])
425 cacheThisBlock[i] = false;
426 else
427 cacheThisBlock[i] = true;
428 }
429
430 /*
431 cout << " resolved ver #s: ";
432 for (uint32_t i = 0; i < blockCount; i++)
433 cout << " <" << vers[i] << ", " << (int) vbFlags[i] << ", " << (int)
434 cacheThisBlock[i] << ">";
435 cout << endl;
436 */
437
438 ret = bc.getCachedBlocks(lbids, vers, bufferPtrs, wasCached, blockCount);
439
440 // Do we want to check any VB flags here? Initial thought: no, because we have
441 // no idea whether any other blocks in the prefetch range are versioned,
442 // what's the difference if one in the visible range is?
443 if (ret != blockCount && doPrefetch)
444 {
445 prefetchBlocks(lbids[0], compType, &blksRead);
446
447 #ifndef _MSC_VER
448
449 if (fPMProfOn)
450 pmstats.markEvent(lbids[0], (pthread_t) - 1, sessionID, 'M');
451
452 #endif
453 /* After the prefetch they're all cached if they are in the same range, so
454 * prune the block list and try getCachedBlocks again first, then fall back
455 * to single-block IO requests if for some reason they aren't. */
456 uint32_t l_blockCount = 0;
457
458 for (i = 0; i < blockCount; i++)
459 {
460 if (!wasCached[i])
461 {
462 lbids[l_blockCount] = lbids[i];
463 vers[l_blockCount] = vers[i];
464 bufferPtrs[l_blockCount] = bufferPtrs[i];
465 vbFlags[l_blockCount] = vbFlags[i];
466 cacheThisBlock[l_blockCount] = cacheThisBlock[i];
467 ++l_blockCount;
468 }
469 }
470
471 ret += bc.getCachedBlocks(lbids, vers, bufferPtrs, wasCached, l_blockCount);
472
473 if (ret != blockCount)
474 {
475 for (i = 0; i < l_blockCount; i++)
476 if (!wasCached[i])
477 {
478 bool ver;
479
480 qc.currentScn = vers[i];
481 bc.getBlock(lbids[i], qc, txn, compType, (void*) bufferPtrs[i],
482 vbFlags[i], wasCached[i], &ver, cacheThisBlock[i], false);
483 *blocksWereVersioned |= ver;
484 blksRead++;
485 }
486 }
487 }
488 /* Some blocks weren't cached, prefetch is disabled -> issue single-block IO requests,
489 * skip checking the cache again. */
490 else if (ret != blockCount)
491 {
492 for (i = 0; i < blockCount; i++)
493 {
494 if (!wasCached[i])
495 {
496 bool ver;
497
498 qc.currentScn = vers[i];
499 bc.getBlock(lbids[i], qc, txn, compType, (void*) bufferPtrs[i], vbFlags[i],
500 wasCached[i], &ver, cacheThisBlock[i], false);
501 *blocksWereVersioned |= ver;
502 blksRead++;
503 }
504 }
505 }
506
507 if (rCount)
508 *rCount = blksRead;
509
510 //if (*verBlocks)
511 // cout << "loadBlock says there were versioned blocks\n";
512 return ret;
513 }
514
loadBlock(uint64_t lbid,QueryContext v,uint32_t t,int compType,void * bufferPtr,bool * pWasBlockInCache,uint32_t * rCount,bool LBIDTrace,uint32_t sessionID,bool doPrefetch,VSSCache * vssCache)515 void loadBlock (
516 uint64_t lbid,
517 QueryContext v,
518 uint32_t t,
519 int compType,
520 void* bufferPtr,
521 bool* pWasBlockInCache,
522 uint32_t* rCount,
523 bool LBIDTrace,
524 uint32_t sessionID,
525 bool doPrefetch,
526 VSSCache* vssCache)
527 {
528 bool flg = false;
529 BRM::OID_t oid;
530 BRM::VER_t txn = (BRM::VER_t)t;
531 uint16_t dbRoot = 0;
532 uint32_t partitionNum = 0;
533 uint16_t segmentNum = 0;
534 int rc;
535 BRM::VER_t ver;
536 blockCacheClient bc(*BRPp[cacheNum(lbid)]);
537 char file_name[WriteEngine::FILE_NAME_SIZE] = {0};
538 char* fileNamePtr = file_name;
539 uint32_t blksRead = 0;
540 VSSCache::iterator it;
541
542 #ifndef _MSC_VER
543
544 if (LBIDTrace)
545 stats.touchedLBID(lbid, pthread_self(), sessionID);
546
547 #endif
548
549 if (vssCache)
550 {
551 it = vssCache->find(lbid);
552
553 if (it != vssCache->end())
554 {
555 VSSData& vd = it->second;
556 ver = vd.verID;
557 flg = vd.vbFlag;
558 rc = vd.returnCode;
559 }
560 }
561
562 if (!vssCache || it == vssCache->end())
563 rc = brm->vssLookup((BRM::LBID_t)lbid, v, txn, &ver, &flg);
564
565 v.currentScn = ver;
566 //cout << "VSS l/u: l=" << lbid << " v=" << ver << " t=" << txn << " flg=" << flg << " rc: " << rc << endl;
567
568 // if this block is locked by this session, don't cache it, just read it directly from disk
569 if (txn > 0 && ver == txn && !flg && !noVB)
570 {
571 uint64_t offset;
572 uint32_t fbo;
573 boost::scoped_array<uint8_t> newBufferSa;
574 boost::scoped_array<char> cmpHdrBufSa;
575 boost::scoped_array<char> cmpBufSa;
576 boost::scoped_array<unsigned char> uCmpBufSa;
577
578
579
580 ptrdiff_t alignedBuffer = 0;
581 void* readBufferPtr = NULL;
582 char* cmpHdrBuf = NULL;
583 char* cmpBuf = NULL;
584 unsigned char* uCmpBuf = NULL;
585 uint64_t cmpBufLen = 0;
586 int blockReadRetryCount = 0;
587 unsigned idx = 0;
588 int pageSize = getpagesize();
589 IDBDataFile* fp = 0;
590
591 try
592 {
593 rc = brm->lookupLocal((BRM::LBID_t)lbid,
594 ver,
595 flg,
596 oid,
597 dbRoot,
598 partitionNum,
599 segmentNum,
600 fbo);
601
602
603 // load the block
604 buildOidFileName(oid, dbRoot, partitionNum, segmentNum, fileNamePtr);
605 int opts = directIOFlag ? IDBDataFile::USE_ODIRECT : 0;
606 fp = IDBDataFile::open(
607 IDBPolicy::getType( fileNamePtr, IDBPolicy::PRIMPROC ),
608 fileNamePtr,
609 "r",
610 opts);
611
612 if ( fp == NULL )
613 {
614 int errCode = errno;
615 SUMMARY_INFO2("open failed: ", fileNamePtr);
616 char errbuf[80];
617 string errMsg;
618 #ifdef __linux__
619 //#if STRERROR_R_CHAR_P
620 const char* p;
621
622 if ((p = strerror_r(errCode, errbuf, 80)) != 0)
623 errMsg = p;
624
625 #else
626 int p;
627
628 if ((p = strerror_r(errCode, errbuf, 80)) == 0)
629 errMsg = errbuf;
630
631 #endif
632
633 #ifndef _MSC_VER
634
635 if (errCode == EINVAL)
636 {
637 throw logging::IDBExcept(logging::IDBErrorInfo::instance()->
638 errorMsg(logging::ERR_O_DIRECT), logging::ERR_O_DIRECT);
639 }
640
641 #endif
642 string errStr(fileNamePtr);
643 errStr += ": open: ";
644 errStr += errMsg;
645 throw std::runtime_error(errStr);
646 }
647
648 // fd >= 0 must be true, otherwise above exception thrown.
649 offset = (uint64_t)fbo * (uint64_t)DATA_BLOCK_SIZE;
650 idx = offset / (4 * 1024 * 1024);
651
652 errno = 0;
653 rc = 0;
654 int i = -1;
655
656
657 if (compType == 0)
658 {
659 newBufferSa.reset(new uint8_t[DATA_BLOCK_SIZE + pageSize]);
660 alignedBuffer = (ptrdiff_t) newBufferSa.get();
661
662 if ((alignedBuffer % pageSize) != 0)
663 {
664 alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
665 alignedBuffer += pageSize;
666 }
667
668 readBufferPtr = (void*) alignedBuffer;
669 i = fp->pread( readBufferPtr, offset, DATA_BLOCK_SIZE );
670 memcpy(bufferPtr, readBufferPtr, i);
671 #ifdef IDB_COMP_POC_DEBUG
672 {
673 boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
674 cout << "pread2(" << fd << ", 0x" << hex << (ptrdiff_t)readBufferPtr << dec << ", " << DATA_BLOCK_SIZE << ", " << offset << ") = " << i << endl;
675 }
676 #endif // IDB_COMP_POC_DEBUG
677 } // if (compType == 0)
678 else // if (compType != 0)
679 {
680 // retry if file is out of sync -- compressed column file only.
681 blockReadRetry:
682
683 uCmpBufSa.reset(new unsigned char[4 * 1024 * 1024 + 4]);
684 uCmpBuf = uCmpBufSa.get();
685 cmpHdrBufSa.reset(new char[4096 * 3 + pageSize]);
686 alignedBuffer = (ptrdiff_t) cmpHdrBufSa.get();
687
688 if ((alignedBuffer % pageSize) != 0)
689 {
690 alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
691 alignedBuffer += pageSize;
692 }
693
694 cmpHdrBuf = (char*) alignedBuffer;
695
696 i = fp->pread( &cmpHdrBuf[0], 0, 4096 * 3);
697
698 CompChunkPtrList ptrList;
699 IDBCompressInterface decompressor;
700 int dcrc = 0;
701
702 if (i == 4096 * 3)
703 {
704 uint64_t numHdrs = 0; // extra headers
705 dcrc = decompressor.getPtrList(&cmpHdrBuf[4096], 4096, ptrList);
706
707 if (dcrc == 0 && ptrList.size() > 0)
708 numHdrs = ptrList[0].first / 4096ULL - 2ULL;
709
710 if (numHdrs > 0)
711 {
712 boost::scoped_array<char> nextHdrBufsa(new char[numHdrs * 4096 + pageSize]);
713 alignedBuffer = (ptrdiff_t) nextHdrBufsa.get();
714
715 if ((alignedBuffer % pageSize) != 0)
716 {
717 alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
718 alignedBuffer += pageSize;
719 }
720
721 char* nextHdrBufPtr = (char*) alignedBuffer;
722
723 i = fp->pread( &nextHdrBufPtr[0], 4096 * 2, numHdrs * 4096 );
724
725 CompChunkPtrList nextPtrList;
726 dcrc = decompressor.getPtrList(&nextHdrBufPtr[0], numHdrs * 4096, nextPtrList);
727
728 if (dcrc == 0)
729 ptrList.insert(ptrList.end(), nextPtrList.begin(), nextPtrList.end());
730 }
731 }
732
733 if (dcrc != 0 || idx >= ptrList.size())
734 {
735 // Due to race condition, the header on disk may not upated yet.
736 // Log an info message and retry.
737 if (blockReadRetryCount == 0)
738 {
739 logging::Message::Args args;
740 args.add(oid);
741 ostringstream infoMsg;
742 infoMsg << "retry read from " << fileNamePtr << ". dcrc=" << dcrc
743 << ", idx=" << idx << ", ptr.size=" << ptrList.size();
744 args.add(infoMsg.str());
745 mlp->logInfoMessage(logging::M0061, args);
746 }
747
748 if (++blockReadRetryCount < 30)
749 {
750 waitForRetry(blockReadRetryCount);
751 goto blockReadRetry;
752 }
753 else
754 {
755 rc = -1004;
756 }
757 }
758
759 if (rc == 0)
760 {
761 unsigned cmpBlkOff = offset % (4 * 1024 * 1024);
762 uint64_t cmpBufOff = ptrList[idx].first;
763 uint64_t cmpBufSz = ptrList[idx].second;
764
765 if (cmpBufSa.get() == NULL || cmpBufLen < cmpBufSz)
766 {
767 cmpBufSa.reset(new char[cmpBufSz + pageSize]);
768 cmpBufLen = cmpBufSz;
769 alignedBuffer = (ptrdiff_t) cmpBufSa.get();
770
771 if ((alignedBuffer % pageSize) != 0)
772 {
773 alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
774 alignedBuffer += pageSize;
775 }
776
777 cmpBuf = (char*) alignedBuffer;
778 }
779
780 unsigned blen = 4 * 1024 * 1024;
781
782 i = fp->pread( cmpBuf, cmpBufOff, cmpBufSz );
783
784 dcrc = decompressor.uncompressBlock(cmpBuf, cmpBufSz, uCmpBuf, blen);
785
786 if (dcrc == 0)
787 {
788 memcpy(bufferPtr, &uCmpBuf[cmpBlkOff], DATA_BLOCK_SIZE);
789 }
790 else
791 {
792 // Due to race condition, the header on disk may not upated yet.
793 // Log an info message and retry.
794 if (blockReadRetryCount == 0)
795 {
796 logging::Message::Args args;
797 args.add(oid);
798 ostringstream infoMsg;
799 infoMsg << "retry read from " << fileNamePtr << ". dcrc=" << dcrc
800 << ", idx=" << idx << ", ptr.size=" << ptrList.size();
801 args.add(infoMsg.str());
802 mlp->logInfoMessage(logging::M0061, args);
803 }
804
805 if (++blockReadRetryCount < 30)
806 {
807 waitForRetry(blockReadRetryCount);
808 goto blockReadRetry;
809 }
810 else
811 {
812 rc = -1006;
813 }
814 }
815 }
816 }
817
818 if ( rc < 0 )
819 {
820 string msg("pread failed");
821 ostringstream infoMsg;
822 infoMsg << " rc:" << rc << ")";
823 msg = msg + ", error:" + strerror(errno) + infoMsg.str();
824 SUMMARY_INFO(msg);
825 //FIXME: free-up allocated memory!
826 throw std::runtime_error(msg);
827 }
828 }
829 catch (...)
830 {
831 delete fp;
832 fp = 0;
833 throw;
834 }
835
836 delete fp;
837 fp = 0;
838
839 // log the retries
840 if (blockReadRetryCount > 0)
841 {
842 logging::Message::Args args;
843 args.add(oid);
844 ostringstream infoMsg;
845 infoMsg << "Successfully uncompress " << fileNamePtr << " chunk "
846 << idx << " @" << " blockReadRetry:" << blockReadRetryCount;
847 args.add(infoMsg.str());
848 mlp->logInfoMessage(logging::M0006, args);
849 }
850
851 if (pWasBlockInCache)
852 *pWasBlockInCache = false;
853
854 if (rCount)
855 *rCount = 1;
856
857 return;
858 }
859
860 FileBuffer* fbPtr = 0;
861 bool wasBlockInCache = false;
862
863 fbPtr = bc.getBlockPtr(lbid, ver, flg);
864
865 if (fbPtr)
866 {
867 memcpy(bufferPtr, fbPtr->getData(), BLOCK_SIZE);
868 wasBlockInCache = true;
869 }
870
871 if (doPrefetch && !wasBlockInCache && !flg)
872 {
873 prefetchBlocks(lbid, compType, &blksRead);
874
875 #ifndef _MSC_VER
876
877 if (fPMProfOn)
878 pmstats.markEvent(lbid, (pthread_t) - 1, sessionID, 'M');
879
880 #endif
881 bc.getBlock(lbid, v, txn, compType, (uint8_t*) bufferPtr, flg, wasBlockInCache);
882
883 if (!wasBlockInCache)
884 blksRead++;
885 }
886 else if (!wasBlockInCache)
887 {
888 bc.getBlock(lbid, v, txn, compType, (uint8_t*) bufferPtr, flg, wasBlockInCache);
889
890 if (!wasBlockInCache)
891 blksRead++;
892 }
893
894 if (pWasBlockInCache)
895 *pWasBlockInCache = wasBlockInCache;
896
897 if (rCount)
898 *rCount = blksRead;
899
900 }
901
902 struct AsynchLoader
903 {
AsynchLoaderprimitiveprocessor::AsynchLoader904 AsynchLoader(uint64_t l,
905 const QueryContext& v,
906 uint32_t t,
907 int ct,
908 uint32_t* cCount,
909 uint32_t* rCount,
910 bool trace,
911 uint32_t sesID,
912 boost::mutex* m,
913 uint32_t* loaderCount,
914 boost::shared_ptr<BPPSendThread> st, // sendThread for abort upon exception.
915 VSSCache* vCache) :
916 lbid(l),
917 ver(v),
918 txn(t),
919 compType(ct),
920 LBIDTrace(trace),
921 sessionID(sesID),
922 cacheCount(cCount),
923 readCount(rCount),
924 busyLoaders(loaderCount),
925 mutex(m),
926 sendThread(st),
927 vssCache(vCache)
928 { }
929
operator ()primitiveprocessor::AsynchLoader930 void operator()()
931 {
932 utils::setThreadName("PPAsyncLoader");
933 bool cached = false;
934 uint32_t rCount = 0;
935 char buf[BLOCK_SIZE];
936
937 //cout << "asynch started " << pthread_self() << " l: " << lbid << endl;
938 try
939 {
940 loadBlock(lbid, ver, txn, compType, buf, &cached, &rCount, LBIDTrace, true, vssCache);
941 }
942 catch (std::exception& ex)
943 {
944 sendThread->abort();
945 cerr << "AsynchLoader caught loadBlock exception: " << ex.what() << endl;
946 idbassert(asyncCounter > 0);
947 (void)atomicops::atomicDec(&asyncCounter);
948 mutex->lock();
949 --(*busyLoaders);
950 mutex->unlock();
951 logging::Message::Args args;
952 args.add(string("PrimProc AsyncLoader caught error: "));
953 args.add(ex.what());
954 primitiveprocessor::mlp->logMessage(logging::M0000, args, false);
955 return;
956 }
957 catch (...)
958 {
959 sendThread->abort();
960 cerr << "AsynchLoader caught unknown exception: " << endl;
961 //FIXME Use a locked processor primitive?
962 idbassert(asyncCounter > 0);
963 (void)atomicops::atomicDec(&asyncCounter);
964 mutex->lock();
965 --(*busyLoaders);
966 mutex->unlock();
967 logging::Message::Args args;
968 args.add(string("PrimProc AsyncLoader caught unknown error"));
969 primitiveprocessor::mlp->logMessage(logging::M0000, args, false);
970 return;
971 }
972
973 idbassert(asyncCounter > 0);
974 (void)atomicops::atomicDec(&asyncCounter);
975 mutex->lock();
976
977 if (cached)
978 (*cacheCount)++;
979
980 *readCount += rCount;
981 --(*busyLoaders);
982 mutex->unlock();
983 // cerr << "done\n";
984 }
985
986 private:
987 uint64_t lbid;
988 QueryContext ver;
989 uint32_t txn;
990 int compType;
991 uint8_t dataWidth;
992 bool LBIDTrace;
993 uint32_t sessionID;
994 uint32_t* cacheCount;
995 uint32_t* readCount;
996 uint32_t* busyLoaders;
997 boost::mutex* mutex;
998 boost::shared_ptr<BPPSendThread> sendThread;
999 VSSCache* vssCache;
1000 };
1001
loadBlockAsync(uint64_t lbid,const QueryContext & c,uint32_t txn,int compType,uint32_t * cCount,uint32_t * rCount,bool LBIDTrace,uint32_t sessionID,boost::mutex * m,uint32_t * busyLoaders,boost::shared_ptr<BPPSendThread> sendThread,VSSCache * vssCache)1002 void loadBlockAsync(uint64_t lbid,
1003 const QueryContext& c,
1004 uint32_t txn,
1005 int compType,
1006 uint32_t* cCount,
1007 uint32_t* rCount,
1008 bool LBIDTrace,
1009 uint32_t sessionID,
1010 boost::mutex* m,
1011 uint32_t* busyLoaders,
1012 boost::shared_ptr<BPPSendThread> sendThread, // sendThread for abort upon exception.
1013 VSSCache* vssCache)
1014 {
1015 blockCacheClient bc(*BRPp[cacheNum(lbid)]);
1016 bool vbFlag;
1017 BRM::VER_t ver;
1018 VSSCache::iterator it;
1019
1020 if (vssCache)
1021 {
1022 it = vssCache->find(lbid);
1023
1024 if (it != vssCache->end())
1025 {
1026 //cout << "async: vss cache hit on " << lbid << endl;
1027 VSSData& vd = it->second;
1028 ver = vd.verID;
1029 vbFlag = vd.vbFlag;
1030 }
1031 }
1032
1033 if (!vssCache || it == vssCache->end())
1034 brm->vssLookup((BRM::LBID_t) lbid, c, txn, &ver, &vbFlag);
1035
1036 if (bc.exists(lbid, ver))
1037 return;
1038
1039 /* a quick and easy stand-in for a threadpool for loaders */
1040 atomicops::atomicMb();
1041
1042 if (asyncCounter >= asyncMax)
1043 return;
1044
1045 (void)atomicops::atomicInc(&asyncCounter);
1046
1047 boost::mutex::scoped_lock sl(*m);
1048
1049 try
1050 {
1051 boost::thread thd(AsynchLoader(lbid, c, txn, compType, cCount, rCount,
1052 LBIDTrace, sessionID, m, busyLoaders, sendThread, vssCache));
1053 (*busyLoaders)++;
1054 }
1055 catch (boost::thread_resource_error& e)
1056 {
1057 cerr << "AsynchLoader: caught a thread resource error, need to lower asyncMax\n";
1058 idbassert(asyncCounter > 0);
1059 (void)atomicops::atomicDec(&asyncCounter);
1060 }
1061 }
1062
1063 } //namespace primitiveprocessor
1064
// Uncomment to compile the DCT_DEBUG sections of DictScanJob::operator()().
//#define DCT_DEBUG 1

// Debug-only guard-band macros, currently disabled: each one is defined with a real
// body and then immediately #undef'd and redefined as empty, so uses expand to nothing.
// The bodies reference an `outputp` buffer and `ouput_buf_size` (sic) that the
// expanding scope would have to provide.

// SETUP_GUARD: fills 3 * ouput_buf_size bytes of the buffer with the 0xa5 pattern.
#define SETUP_GUARD \
{ \
	unsigned char* o = outputp.get(); \
	memset(o, 0xa5, ouput_buf_size*3); \
}
#undef SETUP_GUARD
#define SETUP_GUARD
// CHECK_GUARD: verifies that the first and third ouput_buf_size-sized regions still hold
// the 0xa5 pattern, reporting an underrun/overrun for the given LBID otherwise.
#define CHECK_GUARD(lbid) \
{ \
	unsigned char* o = outputp.get(); \
	for (int i = 0; i < ouput_buf_size; i++) \
	{ \
		if (*o++ != 0xa5) \
		{ \
			cerr << "Buffer underrun on LBID " << lbid << endl; \
			idbassert(0); \
		} \
	} \
	o += ouput_buf_size; \
	for (int i = 0; i < ouput_buf_size; i++) \
	{ \
		if (*o++ != 0xa5) \
		{ \
			cerr << "Buffer overrun on LBID " << lbid << endl; \
			idbassert(0); \
		} \
	} \
}
#undef CHECK_GUARD
#define CHECK_GUARD(x)
1096
1097 namespace
1098 {
1099 using namespace primitiveprocessor;
1100
1101 /** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM)
1102 * TODO: Move this & the impl into different files
1103 */
1104 class DictScanJob : public threadpool::PriorityThreadPool::Functor
1105 {
1106 public:
1107 DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
1108 virtual ~DictScanJob();
1109
1110 void write(const ByteStream&);
1111 int operator()();
1112 void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr);
1113 void sendErrorMsg(uint32_t id, uint16_t code);
1114
1115 private:
1116 SP_UM_IOSOCK fIos;
1117 SBS fByteStream;
1118 SP_UM_MUTEX fWriteLock;
1119 posix_time::ptime dieTime;
1120 };
1121
DictScanJob(SP_UM_IOSOCK ios,SBS bs,SP_UM_MUTEX writeLock)1122 DictScanJob::DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock) :
1123 fIos(ios), fByteStream(bs), fWriteLock(writeLock)
1124 {
1125 dieTime = posix_time::second_clock::universal_time() + posix_time::seconds(100);
1126 }
1127
~DictScanJob()1128 DictScanJob::~DictScanJob()
1129 {
1130 }
1131
write(const ByteStream & bs)1132 void DictScanJob::write(const ByteStream& bs)
1133 {
1134 boost::mutex::scoped_lock lk(*fWriteLock);
1135 fIos->write(bs);
1136 }
1137
operator ()()1138 int DictScanJob::operator()()
1139 {
1140 utils::setThreadName("PPDictScanJob");
1141 uint8_t data[DATA_BLOCK_SIZE];
1142 uint32_t output_buf_size = MAX_BUFFER_SIZE;
1143 uint32_t session;
1144 uint32_t uniqueId = 0;
1145 bool wasBlockInCache = false;
1146 uint32_t blocksRead = 0;
1147 uint16_t runCount;
1148
1149
1150 boost::shared_ptr<DictEqualityFilter> eqFilter;
1151 ByteStream results(output_buf_size);
1152 TokenByScanRequestHeader* cmd;
1153 PrimitiveProcessor pproc(gDebugLevel);
1154 TokenByScanResultHeader* output;
1155 QueryContext verInfo;
1156
1157 try
1158 {
1159 #ifdef DCT_DEBUG
1160 DebugLevel oldDebugLevel = gDebugLevel;
1161 gDebugLevel = VERBOSE;
1162 #endif
1163 fByteStream->advance(sizeof(TokenByScanRequestHeader));
1164 *fByteStream >> verInfo;
1165 cmd = (TokenByScanRequestHeader*) fByteStream->buf();
1166
1167 session = cmd->Hdr.SessionID;
1168 uniqueId = cmd->Hdr.UniqueID;
1169 runCount = cmd->Count;
1170 output = (TokenByScanResultHeader*) results.getInputPtr();
1171 #ifdef VALGRIND
1172 memset(output, 0, sizeof(TokenByScanResultHeader));
1173 #endif
1174
1175 /* Grab the equality filter if one is specified */
1176 if (cmd->flags & HAS_EQ_FILTER)
1177 {
1178 boost::mutex::scoped_lock sl(eqFilterMutex);
1179 map<uint32_t, boost::shared_ptr<DictEqualityFilter> >::iterator it;
1180 it = dictEqualityFilters.find(uniqueId);
1181
1182 if (it != dictEqualityFilters.end())
1183 eqFilter = it->second;
1184
1185 sl.unlock();
1186
1187 if (!eqFilter)
1188 {
1189 if (posix_time::second_clock::universal_time() < dieTime)
1190 {
1191 fByteStream->rewind();
1192 return -1; // it's still being built, wait for it
1193 }
1194 else
1195 return 0; // was probably aborted, go away...
1196 }
1197 }
1198
1199 for (uint16_t i = 0; i < runCount; ++i)
1200 {
1201 loadBlock(cmd->LBID,
1202 verInfo,
1203 cmd->Hdr.TransactionID,
1204 cmd->CompType,
1205 data,
1206 &wasBlockInCache,
1207 &blocksRead,
1208 fLBIDTraceOn,
1209 session);
1210 pproc.setBlockPtr((int*) data);
1211 pproc.p_TokenByScan(cmd, output, output_buf_size, eqFilter);
1212
1213 if (wasBlockInCache)
1214 output->CacheIO++;
1215 else
1216 output->PhysicalIO += blocksRead;
1217
1218 results.advanceInputPtr(output->NBYTES);
1219 write(results);
1220 results.restart();
1221 cmd->LBID++;
1222 }
1223
1224 #ifdef DCT_DEBUG
1225 gDebugLevel = oldDebugLevel;
1226 #endif
1227 }
1228 catch (logging::IDBExcept& iex)
1229 {
1230 cerr << "DictScanJob caught an IDBException: " << iex.what() << endl;
1231 catchHandler(iex.what(), uniqueId, iex.errorCode());
1232 }
1233 catch (std::exception& re)
1234 {
1235 cerr << "DictScanJob caught an exception: " << re.what() << endl;
1236 catchHandler(re.what(), uniqueId);
1237 }
1238 catch (...)
1239 {
1240 string msg("Unknown exception caught in DictScanJob.");
1241 cerr << msg << endl;
1242 catchHandler(msg, uniqueId);
1243 }
1244
1245 return 0;
1246 }
1247
catchHandler(const string & ex,uint32_t id,uint16_t code)1248 void DictScanJob::catchHandler(const string& ex, uint32_t id, uint16_t code)
1249 {
1250 Logger log;
1251 log.logMessage(ex);
1252 sendErrorMsg(id, code);
1253 }
1254
sendErrorMsg(uint32_t id,uint16_t code)1255 void DictScanJob::sendErrorMsg(uint32_t id, uint16_t code)
1256 {
1257 ISMPacketHeader ism;
1258 PrimitiveHeader ph;
1259 ism.Status = code;
1260 ph.UniqueID = id;
1261
1262 ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
1263 msg.append((uint8_t*) &ism, sizeof(ism));
1264 msg.append((uint8_t*) &ph, sizeof(ph));
1265
1266 write(msg);
1267 }
1268
1269 struct BPPHandler
1270 {
BPPHandler__anon59b59a2d0111::BPPHandler1271 BPPHandler(PrimitiveServer* ps) : fPrimitiveServerPtr(ps) { }
1272
1273 // Keep a list of keys so that if connection fails we don't leave BPP
1274 // threads lying around
1275 std::vector<uint32_t> bppKeys;
1276 std::vector<uint32_t>::iterator bppKeysIt;
1277
~BPPHandler__anon59b59a2d0111::BPPHandler1278 ~BPPHandler()
1279 {
1280 boost::mutex::scoped_lock scoped(bppLock);
1281
1282 for (bppKeysIt = bppKeys.begin() ; bppKeysIt != bppKeys.end(); ++bppKeysIt)
1283 {
1284 uint32_t key = *bppKeysIt;
1285 BPPMap::iterator it;
1286
1287 it = bppMap.find(key);
1288
1289 if (it != bppMap.end())
1290 {
1291 it->second->abort();
1292 bppMap.erase(it);
1293 }
1294
1295 fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
1296 OOBPool->removeJobs(key);
1297 }
1298
1299 scoped.unlock();
1300 }
1301
1302 struct BPPHandlerFunctor : public PriorityThreadPool::Functor
1303 {
BPPHandlerFunctor__anon59b59a2d0111::BPPHandler::BPPHandlerFunctor1304 BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : bs(b)
1305 {
1306 rt = r;
1307 dieTime = posix_time::second_clock::universal_time() + posix_time::seconds(100);
1308 }
1309
1310 boost::shared_ptr<BPPHandler> rt;
1311 SBS bs;
1312 posix_time::ptime dieTime;
1313 };
1314
1315 struct LastJoiner : public BPPHandlerFunctor
1316 {
LastJoiner__anon59b59a2d0111::BPPHandler::LastJoiner1317 LastJoiner(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::LastJoiner1318 int operator()()
1319 {
1320 utils::setThreadName("PPHandLastJoiner");
1321 return rt->lastJoinerMsg(*bs, dieTime);
1322 }
1323 };
1324
1325 struct Create : public BPPHandlerFunctor
1326 {
Create__anon59b59a2d0111::BPPHandler::Create1327 Create(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::Create1328 int operator()()
1329 {
1330 utils::setThreadName("PPHandCreate");
1331 rt->createBPP(*bs);
1332 return 0;
1333 }
1334 };
1335
1336 struct Destroy : public BPPHandlerFunctor
1337 {
Destroy__anon59b59a2d0111::BPPHandler::Destroy1338 Destroy(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::Destroy1339 int operator()()
1340 {
1341 utils::setThreadName("PPHandDestroy");
1342 return rt->destroyBPP(*bs, dieTime);
1343 }
1344 };
1345
1346 struct AddJoiner : public BPPHandlerFunctor
1347 {
AddJoiner__anon59b59a2d0111::BPPHandler::AddJoiner1348 AddJoiner(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::AddJoiner1349 int operator()()
1350 {
1351 utils::setThreadName("PPHandAddJoiner");
1352 return rt->addJoinerToBPP(*bs, dieTime);
1353 }
1354 };
1355
1356 struct Abort : public BPPHandlerFunctor
1357 {
Abort__anon59b59a2d0111::BPPHandler::Abort1358 Abort(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(r, b) { }
operator ()__anon59b59a2d0111::BPPHandler::Abort1359 int operator()()
1360 {
1361 utils::setThreadName("PPHandAbort");
1362 return rt->doAbort(*bs, dieTime);
1363 }
1364 };
1365
doAbort__anon59b59a2d0111::BPPHandler1366 int doAbort(ByteStream& bs, const posix_time::ptime& dieTime)
1367 {
1368 uint32_t key;
1369 BPPMap::iterator it;
1370
1371 try
1372 {
1373 bs.advance(sizeof(ISMPacketHeader));
1374 bs >> key;
1375 }
1376 catch (...)
1377 {
1378 // MCOL-857 We don't have the full packet yet
1379 bs.rewind();
1380 return -1;
1381 }
1382
1383 boost::mutex::scoped_lock scoped(bppLock);
1384 bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), key);
1385
1386 if (bppKeysIt != bppKeys.end())
1387 {
1388 bppKeys.erase(bppKeysIt);
1389 }
1390
1391 it = bppMap.find(key);
1392
1393 if (it != bppMap.end())
1394 {
1395 it->second->abort();
1396 bppMap.erase(it);
1397 }
1398 else
1399 {
1400 bs.rewind();
1401
1402 if (posix_time::second_clock::universal_time() > dieTime)
1403 return 0;
1404 else
1405 return -1;
1406 }
1407
1408 scoped.unlock();
1409 fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
1410 OOBPool->removeJobs(key);
1411 return 0;
1412 }
1413
doAck__anon59b59a2d0111::BPPHandler1414 int doAck(ByteStream& bs)
1415 {
1416 uint32_t key;
1417 int16_t msgCount;
1418 SBPPV bpps;
1419 const ISMPacketHeader* ism = (const ISMPacketHeader*) bs.buf();
1420
1421 key = ism->Interleave;
1422 msgCount = (int16_t) ism->Size;
1423 bs.advance(sizeof(ISMPacketHeader));
1424
1425 bpps = grabBPPs(key);
1426
1427 if (bpps)
1428 {
1429 bpps->getSendThread()->sendMore(msgCount);
1430 return 0;
1431 }
1432 else
1433 return -1;
1434 }
1435
createBPP__anon59b59a2d0111::BPPHandler1436 void createBPP(ByteStream& bs)
1437 {
1438 uint32_t i;
1439 uint32_t key, initMsgsLeft;
1440 SBPP bpp;
1441 SBPPV bppv;
1442
1443 // make the new BPP object
1444 bppv.reset(new BPPV());
1445 bpp.reset(new BatchPrimitiveProcessor(bs, fPrimitiveServerPtr->prefetchThreshold(),
1446 bppv->getSendThread(), fPrimitiveServerPtr->ProcessorThreads()));
1447
1448 if (bs.length() > 0)
1449 bs >> initMsgsLeft;
1450 else
1451 {
1452 initMsgsLeft = -1;
1453 }
1454
1455 idbassert(bs.length() == 0);
1456 bppv->getSendThread()->sendMore(initMsgsLeft);
1457 bppv->add(bpp);
1458
1459 // this block of code creates some BPP instances up front for user queries,
1460 // seems to get us slightly better query times
1461 if ((bpp->getSessionID() & 0x80000000) == 0)
1462 {
1463 for (i = 1; i < BPPCount / 8; i++)
1464 {
1465 SBPP dup = bpp->duplicate();
1466
1467 /* Uncomment these lines to verify duplicate(). == op might need updating */
1468 // if (*bpp != *dup)
1469 // cerr << "createBPP: duplicate mismatch at index " << i << endl;
1470 // idbassert(*bpp == *dup);
1471 bppv->add(dup);
1472 }
1473 }
1474
1475 boost::mutex::scoped_lock scoped(bppLock);
1476 key = bpp->getUniqueID();
1477 bppKeys.push_back(key);
1478 bool newInsert;
1479 newInsert = bppMap.insert(pair<uint32_t, SBPPV>(key, bppv)).second;
1480 //cout << "creating BPP # " << key << endl;
1481 scoped.unlock();
1482
1483 if (!newInsert)
1484 {
1485 if (bpp->getSessionID() & 0x80000000)
1486 cerr << "warning: createBPP() tried to clobber a BPP with duplicate sessionID & stepID. sessionID=" <<
1487 (int) (bpp->getSessionID() ^ 0x80000000) << " stepID=" <<
1488 bpp->getStepID() << " (syscat)" << endl;
1489 else
1490 cerr << "warning: createBPP() tried to clobber a BPP with duplicate sessionID & stepID. sessionID=" <<
1491 bpp->getSessionID() << " stepID=" << bpp->getStepID() << endl;
1492 }
1493 }
1494
grabBPPs__anon59b59a2d0111::BPPHandler1495 inline SBPPV grabBPPs(uint32_t uniqueID)
1496 {
1497 BPPMap::iterator it;
1498 /*
1499 uint32_t failCount = 0;
1500 uint32_t maxFailCount = (fatal ? 500 : 5000);
1501 */
1502 SBPPV ret;
1503
1504 boost::mutex::scoped_lock scoped(bppLock);
1505 it = bppMap.find(uniqueID);
1506
1507 if (it != bppMap.end())
1508 return it->second;
1509 else
1510 return SBPPV();
1511
1512 /*
1513 do
1514 {
1515 if (++failCount == maxFailCount) {
1516 //cout << "grabBPPs couldn't find the BPPs for " << uniqueID << endl;
1517 return ret;
1518 //throw logic_error("grabBPPs couldn't find the unique ID");
1519 }
1520 scoped.unlock();
1521 usleep(5000);
1522 scoped.lock();
1523 it = bppMap.find(uniqueID);
1524 } while (it == bppMap.end());
1525
1526 ret = it->second;
1527 return ret;
1528 */
1529 }
1530
getDJLock__anon59b59a2d0111::BPPHandler1531 inline shared_mutex & getDJLock(uint32_t uniqueID)
1532 {
1533 boost::mutex::scoped_lock lk(djMutex);
1534 auto it = djLock.find(uniqueID);
1535 if (it != djLock.end())
1536 return *it->second;
1537 else
1538 {
1539 auto ret = djLock.insert(make_pair(uniqueID, new shared_mutex())).first;
1540 return *ret->second;
1541 }
1542 }
1543
deleteDJLock__anon59b59a2d0111::BPPHandler1544 inline void deleteDJLock(uint32_t uniqueID)
1545 {
1546 boost::mutex::scoped_lock lk(djMutex);
1547 auto it = djLock.find(uniqueID);
1548 if (it != djLock.end())
1549 {
1550 delete it->second;
1551 djLock.erase(it);
1552 }
1553 }
1554
addJoinerToBPP__anon59b59a2d0111::BPPHandler1555 int addJoinerToBPP(ByteStream& bs, const posix_time::ptime& dieTime)
1556 {
1557 SBPPV bppv;
1558 uint32_t uniqueID;
1559 const uint8_t* buf;
1560
1561 /* call addToJoiner() on the first BPP */
1562
1563 buf = bs.buf();
1564 /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
1565 uniqueID = *((const uint32_t*) &buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
1566 bppv = grabBPPs(uniqueID);
1567
1568 if (bppv)
1569 {
1570 shared_lock<shared_mutex> lk(getDJLock(uniqueID));
1571 bppv->get()[0]->addToJoiner(bs);
1572 return 0;
1573 }
1574 else
1575 {
1576 if (posix_time::second_clock::universal_time() > dieTime)
1577 return 0;
1578 else
1579 return -1;
1580 }
1581 }
1582
lastJoinerMsg__anon59b59a2d0111::BPPHandler1583 int lastJoinerMsg(ByteStream& bs, const posix_time::ptime& dieTime)
1584 {
1585 SBPPV bppv;
1586 uint32_t uniqueID, i;
1587 const uint8_t* buf;
1588 int err;
1589
1590 /* call endOfJoiner() on the every BPP */
1591
1592 buf = bs.buf();
1593 /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
1594 uniqueID = *((const uint32_t*) &buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
1595
1596 bppv = grabBPPs(uniqueID);
1597
1598 if (!bppv)
1599 {
1600 //cout << "got a lastJoiner msg for an unknown obj " << uniqueID << endl;
1601 if (posix_time::second_clock::universal_time() > dieTime)
1602 return 0;
1603 else
1604 return -1;
1605 }
1606
1607 boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
1608
1609
1610 for (i = 0; i < bppv->get().size(); i++)
1611 {
1612 err = bppv->get()[i]->endOfJoiner();
1613
1614 if (err == -1)
1615 {
1616 if (posix_time::second_clock::universal_time() > dieTime)
1617 return 0;
1618 else
1619 return -1;
1620 }
1621 }
1622 bppv->get()[0]->doneSendingJoinerData();
1623
1624 /* Note: some of the duplicate/run/join sync was moved to the BPPV class to do
1625 more intelligent scheduling. Once the join data is received, BPPV will
1626 start letting jobs run and create more BPP instances on demand. */
1627
1628 atomicops::atomicMb(); // make sure the joinDataReceived assignment doesn't migrate upward...
1629 bppv->joinDataReceived = true;
1630 return 0;
1631 }
1632
destroyBPP__anon59b59a2d0111::BPPHandler1633 int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime)
1634 {
1635 uint32_t uniqueID, sessionID, stepID;
1636 BPPMap::iterator it;
1637
1638 try
1639 {
1640
1641 bs.advance(sizeof(ISMPacketHeader));
1642 bs >> sessionID;
1643 bs >> stepID;
1644 bs >> uniqueID;
1645 }
1646 catch (...)
1647 {
1648 // MCOL-857 We don't appear to have the full packet yet!
1649 bs.rewind();
1650 return -1;
1651 }
1652
1653 boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
1654 boost::mutex::scoped_lock scoped(bppLock);
1655
1656 bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID);
1657
1658 if (bppKeysIt != bppKeys.end())
1659 {
1660 bppKeys.erase(bppKeysIt);
1661 }
1662
1663 it = bppMap.find(uniqueID);
1664
1665 if (it != bppMap.end())
1666 {
1667 boost::shared_ptr<BPPV> bppv = it->second;
1668
1669 if (bppv->joinDataReceived)
1670 {
1671 bppv->abort();
1672 bppMap.erase(it);
1673 }
1674 else
1675 {
1676 // MCOL-5. On ubuntu, a crash was happening. Checking
1677 // joinDataReceived here fixes it.
1678 // We're not ready for a destroy. Reschedule.
1679 return -1;
1680 }
1681 }
1682 else
1683 {
1684 //cout << "got a destroy for an unknown obj " << uniqueID << endl;
1685 bs.rewind();
1686
1687 if (posix_time::second_clock::universal_time() > dieTime)
1688 {
1689 // XXXPAT: going to let this fall through and delete jobs for
1690 // uniqueID if there are any. Not clear what the downside is.
1691 /*
1692 lk.unlock();
1693 deleteDJLock(uniqueID);
1694 return 0;
1695 */
1696 }
1697 else
1698 return -1;
1699 }
1700
1701 // cout << " destroy: new size is " << bppMap.size() << endl;
1702 /*
1703 if (sessionID & 0x80000000)
1704 cerr << "destroyed BPP instances for sessionID " << (int)
1705 (sessionID ^ 0x80000000) << " stepID "<< stepID << " (syscat)\n";
1706 else
1707 cerr << "destroyed BPP instances for sessionID " << sessionID <<
1708 " stepID "<< stepID << endl;
1709 */
1710 fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID);
1711 OOBPool->removeJobs(uniqueID);
1712 lk.unlock();
1713 deleteDJLock(uniqueID);
1714 return 0;
1715 }
1716
setBPPToError__anon59b59a2d0111::BPPHandler1717 void setBPPToError(uint32_t uniqueID, const string& error, logging::ErrorCodeValues errorCode)
1718 {
1719 SBPPV bppv;
1720
1721 bppv = grabBPPs(uniqueID);
1722
1723 if (!bppv)
1724 return;
1725
1726 for (uint32_t i = 0; i < bppv->get().size(); i++)
1727 bppv->get()[i]->setError(error, errorCode);
1728
1729 if (bppv->get().empty() && !bppMap.empty() )
1730 bppMap.begin()->second.get()->get()[0]->setError(error, errorCode);
1731 }
1732
1733 // Would be good to define the structure of these msgs somewhere...
getUniqueID__anon59b59a2d0111::BPPHandler1734 inline uint32_t getUniqueID(SBS bs, uint8_t command)
1735 {
1736 uint8_t* buf;
1737
1738 buf = bs->buf();
1739
1740 switch (command)
1741 {
1742 case BATCH_PRIMITIVE_ABORT:
1743 return *((uint32_t*) &buf[sizeof(ISMPacketHeader)]);
1744
1745 case BATCH_PRIMITIVE_ACK:
1746 {
1747 ISMPacketHeader* ism = (ISMPacketHeader*) buf;
1748 return ism->Interleave;
1749 }
1750
1751 case BATCH_PRIMITIVE_ADD_JOINER:
1752 case BATCH_PRIMITIVE_END_JOINER:
1753 case BATCH_PRIMITIVE_DESTROY:
1754 return *((uint32_t*) &buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
1755
1756 default:
1757 return 0;
1758 }
1759 }
1760
1761 PrimitiveServer* fPrimitiveServerPtr;
1762 };
1763
1764 class DictionaryOp : public PriorityThreadPool::Functor
1765 {
1766 public:
DictionaryOp(SBS cmd)1767 DictionaryOp(SBS cmd) : bs(cmd)
1768 {
1769 dieTime = posix_time::second_clock::universal_time() + posix_time::seconds(100);
1770 }
1771 virtual int execute() = 0;
operator ()()1772 int operator()()
1773 {
1774 utils::setThreadName("PPDictOp");
1775 int ret;
1776 ret = execute();
1777
1778 if (ret != 0)
1779 {
1780 bs->rewind();
1781
1782 if (posix_time::second_clock::universal_time() > dieTime)
1783 return 0;
1784 }
1785
1786 return ret;
1787 }
1788 protected:
1789 SBS bs;
1790 private:
1791 posix_time::ptime dieTime;
1792 };
1793
1794 class CreateEqualityFilter : public DictionaryOp
1795 {
1796 public:
CreateEqualityFilter(SBS cmd)1797 CreateEqualityFilter(SBS cmd) : DictionaryOp(cmd) { }
execute()1798 int execute()
1799 {
1800 createEqualityFilter();
1801 return 0;
1802 }
1803
1804 private:
createEqualityFilter()1805 void createEqualityFilter()
1806 {
1807 uint32_t uniqueID, count, i, charsetNumber;
1808 string str;
1809 bs->advance(sizeof(ISMPacketHeader));
1810 *bs >> uniqueID;
1811 *bs >> charsetNumber;
1812
1813 datatypes::Charset cs(charsetNumber);
1814 boost::shared_ptr<DictEqualityFilter> filter(new DictEqualityFilter(cs));
1815
1816 *bs >> count;
1817
1818 for (i = 0; i < count; i++)
1819 {
1820 *bs >> str;
1821 filter->insert(str);
1822 }
1823
1824 boost::mutex::scoped_lock sl(eqFilterMutex);
1825 dictEqualityFilters[uniqueID] = filter;
1826 }
1827 };
1828
1829 class DestroyEqualityFilter : public DictionaryOp
1830 {
1831 public:
DestroyEqualityFilter(SBS cmd)1832 DestroyEqualityFilter(SBS cmd) : DictionaryOp(cmd) { }
execute()1833 int execute()
1834 {
1835 return destroyEqualityFilter();
1836 }
1837
destroyEqualityFilter()1838 int destroyEqualityFilter()
1839 {
1840 uint32_t uniqueID;
1841 map<uint32_t, boost::shared_ptr<DictEqualityFilter> >::iterator it;
1842
1843 bs->advance(sizeof(ISMPacketHeader));
1844 *bs >> uniqueID;
1845
1846 boost::mutex::scoped_lock sl(eqFilterMutex);
1847 it = dictEqualityFilters.find(uniqueID);
1848
1849 if (it != dictEqualityFilters.end())
1850 {
1851 dictEqualityFilters.erase(it);
1852 return 0;
1853 }
1854 else
1855 return -1;
1856 }
1857 };
1858
1859 struct ReadThread
1860 {
ReadThread__anon59b59a2d0111::ReadThread1861 ReadThread(const string& serverName, IOSocket& ios, PrimitiveServer* ps) :
1862 fServerName(serverName), fIos(ios), fPrimitiveServerPtr(ps)
1863 {
1864 fBPPHandler.reset(new BPPHandler(ps));
1865 }
1866
buildCacheOpResp__anon59b59a2d0111::ReadThread1867 const ByteStream buildCacheOpResp(int32_t result)
1868 {
1869 const int msgsize = sizeof(ISMPacketHeader) + sizeof(int32_t);
1870 ByteStream::byte msgbuf[msgsize];
1871 memset(msgbuf, 0, sizeof(ISMPacketHeader));
1872 ISMPacketHeader* hdrp = reinterpret_cast<ISMPacketHeader*>(&msgbuf[0]);
1873 hdrp->Command = CACHE_OP_RESULTS;
1874 int32_t* resp = reinterpret_cast<int32_t*>(&msgbuf[sizeof(ISMPacketHeader)]);
1875 *resp = result;
1876 return ByteStream(msgbuf, msgsize);
1877 }
1878
1879 /* Message format:
1880 * ISMPacketHeader
1881 * OID count - 32 bits
1882 * OID array - 32 bits * count
1883 */
doCacheFlushByOID__anon59b59a2d0111::ReadThread1884 void doCacheFlushByOID(SP_UM_IOSOCK ios, ByteStream& bs)
1885 {
1886 uint8_t* buf = bs.buf();
1887 buf += sizeof(ISMPacketHeader);
1888 uint32_t count = *((uint32_t*) buf);
1889 buf += 4;
1890 uint32_t* oids = (uint32_t*) buf;
1891
1892 for (int i = 0; i < fCacheCount; i++)
1893 {
1894 blockCacheClient bc(*BRPp[i]);
1895 bc.flushOIDs(oids, count);
1896 }
1897
1898 ios->write(buildCacheOpResp(0));
1899 }
1900
1901 /* Message format:
1902 * ISMPacketHeader
1903 * Partition count - 32 bits
1904 * Partition set - sizeof(LogicalPartition) * count
1905 * OID count - 32 bits
1906 * OID array - 32 bits * count
1907 */
doCacheFlushByPartition__anon59b59a2d0111::ReadThread1908 void doCacheFlushByPartition(SP_UM_IOSOCK ios, ByteStream& bs)
1909 {
1910 set<BRM::LogicalPartition> partitions;
1911 vector<OID_t> oids;
1912
1913 bs.advance(sizeof(ISMPacketHeader));
1914 deserializeSet<LogicalPartition>(bs, partitions);
1915 deserializeInlineVector<OID_t>(bs, oids);
1916
1917 idbassert(bs.length() == 0);
1918
1919 for (int i = 0; i < fCacheCount; i++)
1920 {
1921 blockCacheClient bc(*BRPp[i]);
1922 bc.flushPartition(oids, partitions);
1923 }
1924
1925 ios->write(buildCacheOpResp(0));
1926 }
1927
doCacheFlushCmd__anon59b59a2d0111::ReadThread1928 void doCacheFlushCmd(SP_UM_IOSOCK ios, const ByteStream& bs)
1929 {
1930 for (int i = 0; i < fCacheCount; i++)
1931 {
1932 blockCacheClient bc(*BRPp[i]);
1933 bc.flushCache();
1934 }
1935
1936 ios->write(buildCacheOpResp(0));
1937 }
1938
doCacheDropFDs__anon59b59a2d0111::ReadThread1939 void doCacheDropFDs(SP_UM_IOSOCK ios, ByteStream& bs)
1940 {
1941 std::vector<BRM::FileInfo> files;
1942
1943 bs.advance(sizeof(ISMPacketHeader));
1944 dropFDCache();
1945 ios->write(buildCacheOpResp(0));
1946 }
1947
doCachePurgeFDs__anon59b59a2d0111::ReadThread1948 void doCachePurgeFDs(SP_UM_IOSOCK ios, ByteStream& bs)
1949 {
1950 std::vector<BRM::FileInfo> files;
1951
1952 bs.advance(sizeof(ISMPacketHeader));
1953 deserializeInlineVector<BRM::FileInfo>(bs, files);
1954 purgeFDCache(files);
1955 ios->write(buildCacheOpResp(0));
1956 }
1957
1958 //N.B. this fcn doesn't actually clean the VSS, but rather instructs PP to flush its
1959 // cache of specific LBID's
doCacheCleanVSSCmd__anon59b59a2d0111::ReadThread1960 void doCacheCleanVSSCmd(SP_UM_IOSOCK ios, const ByteStream& bs)
1961 {
1962 const ByteStream::byte* bytePtr = bs.buf();
1963 const uint32_t* cntp = reinterpret_cast<const uint32_t*>(&bytePtr[sizeof(ISMPacketHeader)]);
1964 const LbidAtVer* itemp =
1965 reinterpret_cast<const LbidAtVer*>(&bytePtr[sizeof(ISMPacketHeader) + sizeof(uint32_t)]);
1966
1967 for (int i = 0; i < fCacheCount; i++)
1968 {
1969 blockCacheClient bc(*BRPp[i]);
1970 bc.flushMany(itemp, *cntp);
1971 }
1972
1973 ios->write(buildCacheOpResp(0));
1974 }
1975
doCacheFlushAllversion__anon59b59a2d0111::ReadThread1976 void doCacheFlushAllversion(SP_UM_IOSOCK ios, const ByteStream& bs)
1977 {
1978 const ByteStream::byte* bytePtr = bs.buf();
1979 const uint32_t* cntp = reinterpret_cast<const uint32_t*>(&bytePtr[sizeof(ISMPacketHeader)]);
1980 const LBID_t* itemp =
1981 reinterpret_cast<const LBID_t*>(&bytePtr[sizeof(ISMPacketHeader) + sizeof(uint32_t)]);
1982
1983 for (int i = 0; i < fCacheCount; i++)
1984 {
1985 blockCacheClient bc(*BRPp[i]);
1986 bc.flushManyAllversion(itemp, *cntp);
1987 }
1988
1989 ios->write(buildCacheOpResp(0));
1990 }
1991
operator ()__anon59b59a2d0111::ReadThread1992 void operator()()
1993 {
1994 utils::setThreadName("PPReadThread");
1995 boost::shared_ptr<threadpool::PriorityThreadPool> procPoolPtr =
1996 fPrimitiveServerPtr->getProcessorThreadPool();
1997 SBS bs;
1998 UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
1999
2000 // Establish default output IOSocket (and mutex) based on the input
2001 // IOSocket. If we end up rotating through multiple output sockets
2002 // for the same UM, we will use UmSocketSelector to select output.
2003 SP_UM_IOSOCK outIosDefault(new IOSocket(fIos));
2004 SP_UM_MUTEX writeLockDefault(new boost::mutex());
2005
2006 bool bRotateDest = fPrimitiveServerPtr->rotatingDestination();
2007
2008 if (bRotateDest)
2009 {
2010 // If we tried adding an IP address not listed as UM in config
2011 // file; probably a DMLProc connection. We allow the connection
2012 // but disable destination rotation since not in Columnstore.xml.
2013 if (!pUmSocketSelector->addConnection(outIosDefault, writeLockDefault))
2014 {
2015 bRotateDest = false;
2016 }
2017 }
2018
2019 SP_UM_IOSOCK outIos(outIosDefault);
2020 SP_UM_MUTEX writeLock(writeLockDefault);
2021
2022 //..Loop to process incoming messages on IOSocket fIos
2023 for (;;)
2024 {
2025 try
2026 {
2027 bs = fIos.read();
2028 }
2029 catch (...)
2030 {
2031 //This connection is dead, nothing useful will come from it ever again
2032 //We can't rely on the state of bs at this point...
2033 if (bRotateDest && pUmSocketSelector)
2034 pUmSocketSelector->delConnection(fIos);
2035
2036 fIos.close();
2037 break;
2038 }
2039
2040 try
2041 {
2042 if (bs->length() != 0)
2043 {
2044 idbassert(bs->length() >= sizeof(ISMPacketHeader));
2045
2046 const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());
2047
2048 /* This switch is for the OOB commands */
2049 switch (ismHdr->Command)
2050 {
2051 case CACHE_FLUSH_PARTITION:
2052 doCacheFlushByPartition(outIos, *bs);
2053 fIos.close();
2054 return;
2055
2056 case CACHE_FLUSH_BY_OID:
2057 doCacheFlushByOID(outIos, *bs);
2058 fIos.close();
2059 return;
2060
2061 case CACHE_FLUSH:
2062 doCacheFlushCmd(outIos, *bs);
2063 fIos.close();
2064 return;
2065
2066 case CACHE_CLEAN_VSS:
2067 doCacheCleanVSSCmd(outIos, *bs);
2068 fIos.close();
2069 return;
2070
2071 case FLUSH_ALL_VERSION:
2072 doCacheFlushAllversion(outIos, *bs);
2073 fIos.close();
2074 return;
2075
2076 case CACHE_DROP_FDS:
2077 doCacheDropFDs(outIos, *bs);
2078 fIos.close();
2079 return;
2080
2081 case CACHE_PURGE_FDS:
2082 doCachePurgeFDs(outIos, *bs);
2083 fIos.close();
2084 return;
2085
2086 default:
2087 break;
2088 }
2089
2090 switch (ismHdr->Command)
2091 {
2092 case DICT_CREATE_EQUALITY_FILTER:
2093 {
2094 PriorityThreadPool::Job job;
2095 const uint8_t* buf = bs->buf();
2096 uint32_t pos = sizeof(ISMPacketHeader) - 2;
2097 job.stepID = *((uint32_t*) &buf[pos + 6]);
2098 job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2099 job.sock = outIos;
2100 job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new CreateEqualityFilter(bs));
2101 OOBPool->addJob(job);
2102 break;
2103 }
2104
2105 case DICT_DESTROY_EQUALITY_FILTER:
2106 {
2107 PriorityThreadPool::Job job;
2108 const uint8_t* buf = bs->buf();
2109 uint32_t pos = sizeof(ISMPacketHeader) - 2;
2110 job.stepID = *((uint32_t*) &buf[pos + 6]);
2111 job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2112 job.sock = outIos;
2113 job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new DestroyEqualityFilter(bs));
2114 OOBPool->addJob(job);
2115 break;
2116 }
2117
2118 case DICT_TOKEN_BY_SCAN_COMPARE:
2119 {
2120 idbassert(bs->length() >= sizeof(TokenByScanRequestHeader));
2121 TokenByScanRequestHeader* hdr = (TokenByScanRequestHeader*) ismHdr;
2122
2123 if (bRotateDest)
2124 {
2125 if (!pUmSocketSelector->nextIOSocket(
2126 fIos, outIos, writeLock))
2127 {
2128 // If we ever fall into this part of the
2129 // code we have a "bug" of some sort.
2130 // See handleUmSockSelErr() for more info.
2131 // We reset ios and mutex to defaults.
2132 handleUmSockSelErr(string("default cmd"));
2133 outIos = outIosDefault;
2134 writeLock = writeLockDefault;
2135 pUmSocketSelector->delConnection(fIos);
2136 bRotateDest = false;
2137 }
2138 }
2139
2140 PriorityThreadPool::Job job;
2141 job.functor = boost::shared_ptr<DictScanJob>(new DictScanJob(outIos,
2142 bs, writeLock));
2143 job.id = hdr->Hdr.UniqueID;
2144 job.weight = LOGICAL_BLOCK_RIDS;
2145 job.priority = hdr->Hdr.Priority;
2146 const uint8_t* buf = bs->buf();
2147 uint32_t pos = sizeof(ISMPacketHeader) - 2;
2148 job.stepID = *((uint32_t*) &buf[pos + 6]);
2149 job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2150 job.sock = outIos;
2151
2152 if (hdr->flags & IS_SYSCAT)
2153 {
2154 //boost::thread t(DictScanJob(outIos, bs, writeLock));
2155 // using already-existing threads may cut latency
2156 // if it's changed back to running in an independent thread
2157 // change the issyscat() checks in BPPSeeder as well
2158 OOBPool->addJob(job);
2159 }
2160 else
2161 {
2162 procPoolPtr->addJob(job);
2163 }
2164
2165 break;
2166 }
2167
2168 case BATCH_PRIMITIVE_RUN:
2169 {
2170 if (bRotateDest)
2171 {
2172 if (!pUmSocketSelector->nextIOSocket(
2173 fIos, outIos, writeLock))
2174 {
2175
2176 // If we ever fall into this part of the
2177 // code we have a "bug" of some sort.
2178 // See handleUmSockSelErr() for more info.
2179 // We reset ios and mutex to defaults.
2180 handleUmSockSelErr(string("BPR cmd"));
2181 outIos = outIosDefault;
2182 writeLock = writeLockDefault;
2183 pUmSocketSelector->delConnection(fIos);
2184 bRotateDest = false;
2185 }
2186 }
2187
2188 /* Decide whether this is a syscat call and run
2189 right away instead of queueing */
2190 boost::shared_ptr<BPPSeeder> bpps(new BPPSeeder(bs, writeLock, outIos,
2191 fPrimitiveServerPtr->ProcessorThreads(),
2192 fPrimitiveServerPtr->PTTrace()));
2193 PriorityThreadPool::Job job;
2194 job.functor = bpps;
2195 job.id = bpps->getID();
2196 job.weight = ismHdr->Size;
2197 job.priority = bpps->priority();
2198 const uint8_t* buf = bs->buf();
2199 uint32_t pos = sizeof(ISMPacketHeader) - 2;
2200 job.stepID = *((uint32_t*) &buf[pos + 6]);
2201 job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2202 job.sock = outIos;
2203
2204 if (bpps->isSysCat())
2205 {
2206
2207 //boost::thread t(*bpps);
2208 // using already-existing threads may cut latency
2209 // if it's changed back to running in an independent thread
2210 // change the issyscat() checks in BPPSeeder as well
2211 OOBPool->addJob(job);
2212 }
2213 else
2214 {
2215 procPoolPtr->addJob(job);
2216 }
2217
2218 break;
2219 }
2220
2221 case BATCH_PRIMITIVE_CREATE:
2222 {
2223 PriorityThreadPool::Job job;
2224 job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Create(fBPPHandler, bs));
2225 const uint8_t* buf = bs->buf();
2226 uint32_t pos = sizeof(ISMPacketHeader) - 2;
2227 job.stepID = *((uint32_t*) &buf[pos + 6]);
2228 job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2229 job.sock = outIos;
2230 OOBPool->addJob(job);
2231 //fBPPHandler->createBPP(*bs);
2232 break;
2233 }
2234
2235 case BATCH_PRIMITIVE_ADD_JOINER:
2236 {
2237 PriorityThreadPool::Job job;
2238 job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::AddJoiner(fBPPHandler, bs));
2239 job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
2240 const uint8_t* buf = bs->buf();
2241 uint32_t pos = sizeof(ISMPacketHeader) - 2;
2242 job.stepID = *((uint32_t*) &buf[pos + 6]);
2243 job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2244 job.sock = outIos;
2245 OOBPool->addJob(job);
2246 //fBPPHandler->addJoinerToBPP(*bs);
2247 break;
2248 }
2249
2250 case BATCH_PRIMITIVE_END_JOINER:
2251 {
2252 // lastJoinerMsg can block; must do this in a different thread
2253 //OOBPool->invoke(BPPHandler::LastJoiner(fBPPHandler, bs)); // needs a threadpool that can resched
2254 //boost::thread tmp(BPPHandler::LastJoiner(fBPPHandler, bs));
2255 PriorityThreadPool::Job job;
2256 job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::LastJoiner(fBPPHandler, bs));
2257 job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
2258 const uint8_t* buf = bs->buf();
2259 uint32_t pos = sizeof(ISMPacketHeader) - 2;
2260 job.stepID = *((uint32_t*) &buf[pos + 6]);
2261 job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2262 job.sock = outIos;
2263 OOBPool->addJob(job);
2264 break;
2265 }
2266
2267 case BATCH_PRIMITIVE_DESTROY:
2268 {
2269 //OOBPool->invoke(BPPHandler::Destroy(fBPPHandler, bs)); // needs a threadpool that can resched
2270 //boost::thread tmp(BPPHandler::Destroy(fBPPHandler, bs));
2271 PriorityThreadPool::Job job;
2272 job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Destroy(fBPPHandler, bs));
2273 job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
2274 const uint8_t* buf = bs->buf();
2275 uint32_t pos = sizeof(ISMPacketHeader) - 2;
2276 job.stepID = *((uint32_t*) &buf[pos + 6]);
2277 job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2278 job.sock = outIos;
2279 OOBPool->addJob(job);
2280 //fBPPHandler->destroyBPP(*bs);
2281 break;
2282 }
2283
2284 case BATCH_PRIMITIVE_ACK:
2285 {
2286 fBPPHandler->doAck(*bs);
2287 break;
2288 }
2289
2290 case BATCH_PRIMITIVE_ABORT:
2291 {
2292 //OBPool->invoke(BPPHandler::Abort(fBPPHandler, bs));
2293 //fBPPHandler->doAbort(*bs);
2294 PriorityThreadPool::Job job;
2295 job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Abort(fBPPHandler, bs));
2296 job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
2297 const uint8_t* buf = bs->buf();
2298 uint32_t pos = sizeof(ISMPacketHeader) - 2;
2299 job.stepID = *((uint32_t*) &buf[pos + 6]);
2300 job.uniqueID = *((uint32_t*) &buf[pos + 10]);
2301 job.sock = outIos;
2302 OOBPool->addJob(job);
2303 break;
2304 }
2305
2306 default:
2307 {
2308 std::ostringstream os;
2309 Logger log;
2310 os << "unknown primitive cmd: " << ismHdr->Command;
2311 log.logMessage(os.str());
2312 break;
2313 }
2314 } // the switch stmt
2315 }
2316 else // bs.length() == 0
2317 {
2318 if (bRotateDest)
2319 pUmSocketSelector->delConnection(fIos);
2320
2321 fIos.close();
2322 break;
2323 }
2324 } // the try- surrounding the if stmt
2325 catch (std::exception& e)
2326 {
2327 Logger logger;
2328 logger.logMessage(e.what());
2329 }
2330 }
2331 }
2332
// Reaching this function means there is a bug somewhere. The "fIos"
// connection was added to UmSocketSelector earlier, so at the very least
// UmSocketSelector should have been able to return that connection/port.
// We try to recover by using the original fIos to send the response
// message, but if this ever happens the underlying bug still needs to be
// found and resolved.
handleUmSockSelErr__anon59b59a2d0111::ReadThread2339 void handleUmSockSelErr(const string& cmd)
2340 {
2341 ostringstream oss;
2342 oss << "Unable to rotate through socket destinations (" <<
2343 cmd << ") for connection: " << fIos.toString();
2344 cerr << oss.str() << endl;
2345 logging::Message::Args args;
2346 args.add(oss.str());
2347 mlp->logMessage(logging::M0058, args, false);
2348 }
2349
~ReadThread__anon59b59a2d0111::ReadThread2350 ~ReadThread() {}
2351 string fServerName;
2352 IOSocket fIos;
2353 PrimitiveServer* fPrimitiveServerPtr;
2354 boost::shared_ptr<BPPHandler> fBPPHandler;
2355 };
2356
2357 /** @brief accept a primitive command from the user module
2358 */
2359 struct ServerThread
2360 {
ServerThread__anon59b59a2d0111::ServerThread2361 ServerThread(string serverName, PrimitiveServer* ps) :
2362 fServerName(serverName), fPrimitiveServerPtr(ps)
2363 {
2364 SUMMARY_INFO2("starting server ", fServerName);
2365
2366 bool tellUser = true;
2367 bool toldUser = false;
2368
2369 for (;;)
2370 {
2371 try
2372 {
2373 mqServerPtr = new MessageQueueServer(fServerName);
2374 break;
2375 }
2376 catch (runtime_error& re)
2377 {
2378 string what = re.what();
2379
2380 if (what.find("Address already in use") != string::npos)
2381 {
2382 if (tellUser)
2383 {
2384 cerr << "Address already in use, retrying..." << endl;
2385 tellUser = false;
2386 toldUser = true;
2387 }
2388
2389 sleep(5);
2390 }
2391 else
2392 {
2393 throw;
2394 }
2395 }
2396 }
2397
2398 if (toldUser)
2399 cerr << "Ready." << endl;
2400 }
2401
operator ()__anon59b59a2d0111::ServerThread2402 void operator()()
2403 {
2404 utils::setThreadName("PPServerThr");
2405 IOSocket ios;
2406
2407 try
2408 {
2409 for (;;)
2410 {
2411 ios = mqServerPtr->accept();
2412 //startup a detached thread to handle this socket's I/O
2413 boost::thread rt(ReadThread(fServerName, ios, fPrimitiveServerPtr));
2414 }
2415 }
2416 catch (std::exception& ex)
2417 {
2418 SUMMARY_INFO2("exception caught in ServerThread: ", ex.what());
2419 }
2420 catch (...)
2421 {
2422 SUMMARY_INFO("exception caught in ServerThread.");
2423 }
2424 }
2425
2426 string fServerName;
2427 PrimitiveServer* fPrimitiveServerPtr;
2428 MessageQueueServer* mqServerPtr;
2429 };
2430
2431 } // namespace anon
2432
2433 namespace primitiveprocessor
2434 {
PrimitiveServer(int serverThreads,int serverQueueSize,int processorWeight,int processorQueueSize,bool rotatingDestination,uint32_t BRPBlocks,int BRPThreads,int cacheCount,int maxBlocksPerRead,int readAheadBlocks,uint32_t deleteBlocks,bool ptTrace,double prefetch,uint64_t smallSide)2435 PrimitiveServer::PrimitiveServer(int serverThreads,
2436 int serverQueueSize,
2437 int processorWeight,
2438 int processorQueueSize,
2439 bool rotatingDestination,
2440 uint32_t BRPBlocks,
2441 int BRPThreads,
2442 int cacheCount,
2443 int maxBlocksPerRead,
2444 int readAheadBlocks,
2445 uint32_t deleteBlocks,
2446 bool ptTrace,
2447 double prefetch,
2448 uint64_t smallSide
2449 ):
2450 fServerThreads(serverThreads),
2451 fServerQueueSize(serverQueueSize),
2452 fProcessorWeight(processorWeight),
2453 fProcessorQueueSize(processorQueueSize),
2454 fMaxBlocksPerRead(maxBlocksPerRead),
2455 fReadAheadBlocks(readAheadBlocks),
2456 fRotatingDestination(rotatingDestination),
2457 fPTTrace(ptTrace),
2458 fPrefetchThreshold(prefetch),
2459 fPMSmallSide(smallSide)
2460 {
2461 fCacheCount = cacheCount;
2462 fServerpool.setMaxThreads(fServerThreads);
2463 fServerpool.setQueueSize(fServerQueueSize);
2464 fServerpool.setName("PrimitiveServer");
2465
2466 fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
2467 medPriorityThreads, lowPriorityThreads, 0));
2468
2469 // We're not using either the priority or the job-clustering features, just need a threadpool
2470 // that can reschedule jobs, and an unlimited non-blocking queue
2471 OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
2472
2473 asyncCounter = 0;
2474
2475 brm = new DBRM();
2476
2477 BRPp = new BlockRequestProcessor*[fCacheCount];
2478
2479 try
2480 {
2481 for (int i = 0; i < fCacheCount; i++)
2482 BRPp[i] = new BlockRequestProcessor(BRPBlocks / fCacheCount, BRPThreads / fCacheCount,
2483 fMaxBlocksPerRead, deleteBlocks / fCacheCount);
2484 }
2485 catch (...)
2486 {
2487 cerr << "Unable to allocate " << BRPBlocks << " cache blocks. Adjust the DBBC config parameter "
2488 "downward." << endl;
2489 mlp->logMessage(logging::M0045, logging::Message::Args(), true);
2490 exit(1);
2491 }
2492 }
2493
~PrimitiveServer()2494 PrimitiveServer::~PrimitiveServer()
2495 {
2496 }
2497
start(Service * service)2498 void PrimitiveServer::start(Service *service)
2499 {
2500 // start all the server threads
2501 for ( int i = 1; i <= fServerThreads; i++)
2502 {
2503 string s("PMS");
2504 stringstream oss;
2505 oss << s << i;
2506
2507 fServerpool.invoke(ServerThread(oss.str(), this));
2508 }
2509
2510 service->NotifyServiceStarted();
2511
2512 fServerpool.wait();
2513
2514 cerr << "PrimitiveServer::start() exiting!" << endl;
2515 }
2516
BPPV()2517 BPPV::BPPV()
2518 {
2519 sendThread.reset(new BPPSendThread());
2520 v.reserve(BPPCount);
2521 pos = 0;
2522 joinDataReceived = false;
2523 }
2524
~BPPV()2525 BPPV::~BPPV()
2526 {
2527 BOOST_FOREACH( boost::shared_ptr<BatchPrimitiveProcessor> bpp, v )
2528 {
2529 bpp->resetMem();
2530 }
2531 }
2532
add(boost::shared_ptr<BatchPrimitiveProcessor> a)2533 void BPPV::add(boost::shared_ptr<BatchPrimitiveProcessor> a)
2534 {
2535 /* Right now BPP initialization locks the object if there is a join to
2536 prevent the instance from being used before the join data is received.
2537 The init/join/run sync code is quite old and confusing at this point,
2538 and this makes it a little worse by circumventing the original design.
2539 Due for a rewrite. */
2540
2541 if (!unusedInstance)
2542 {
2543 unusedInstance = a->duplicate();
2544
2545 if (a->hasJoin())
2546 {
2547 joinDataReceived = false;
2548 unusedInstance->unlock();
2549 }
2550 else
2551 joinDataReceived = true;
2552 }
2553
2554 v.push_back(a);
2555 }
get()2556 const vector<boost::shared_ptr<BatchPrimitiveProcessor> >& BPPV::get()
2557 {
2558 return v;
2559 }
2560
2561
next()2562 boost::shared_ptr<BatchPrimitiveProcessor> BPPV::next()
2563 {
2564 uint32_t size = v.size();
2565 uint32_t i;
2566
2567 #if 0
2568
2569 // This block of code scans for the first available BPP instance,
2570 // makes BPPSeeder reschedule it if none are available. Relies on BPP instance
2571 // being preallocated.
2572 for (i = 0; i < size; i++)
2573 {
2574 uint32_t index = (i + pos) % size;
2575
2576 if (!(v[index]->busy()))
2577 {
2578 pos = (index + 1) % size;
2579 v[index]->busy(true);
2580 return v[index];
2581 }
2582 }
2583
2584 // They're all busy, make threadpool reschedule the job
2585 return boost::shared_ptr<BatchPrimitiveProcessor>();
2586 #endif
2587
2588 //This block of code creates BPP instances if/when they are needed
2589
2590 // don't use a processor thread when it will just block, reschedule it instead
2591 if (!joinDataReceived)
2592 return boost::shared_ptr<BatchPrimitiveProcessor>();
2593
2594 for (i = 0; i < size; i++)
2595 {
2596 uint32_t index = (i + pos) % size;
2597
2598 if (!(v[index]->busy()))
2599 {
2600 pos = (index + 1) % size;
2601 v[index]->busy(true);
2602 return v[index];
2603 }
2604 }
2605
2606 // honor the BPPCount limit, mostly for debugging purposes.
2607 if (size >= BPPCount)
2608 return boost::shared_ptr<BatchPrimitiveProcessor>();
2609
2610 SBPP newone = unusedInstance->duplicate();
2611
2612 if (newone->hasJoin())
2613 newone->unlock();
2614
2615 newone->busy(true);
2616 v.push_back(newone);
2617 return newone;
2618 }
2619
abort()2620 void BPPV::abort()
2621 {
2622 sendThread->abort();
2623 BOOST_FOREACH( boost::shared_ptr<BatchPrimitiveProcessor> bpp, v )
2624 {
2625 bpp->unlock();
2626 }
2627 }
2628
aborted()2629 bool BPPV::aborted()
2630 {
2631 return sendThread->aborted();
2632 }
2633
2634 // end workaround
2635
2636 } // namespace primitiveprocessor
2637 // vim:ts=4 sw=4:
2638
2639