1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 // $Id: iomanager.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
20 //
21 // C++ Implementation: iomanager
22 //
23 // Description:
24 //
25 //
26 // Author: Jason Rodriguez <jrodriguez@calpont.com>
27 //
28 //
29 //
30
31 #include "mcsconfig.h"
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35 #ifdef __linux__
36 #include <sys/mount.h>
37 #include <linux/fs.h>
38 #endif
39 #ifdef BLOCK_SIZE
40 #undef BLOCK_SIZE
41 #endif
42 #ifdef READ
43 #undef READ
44 #endif
45 #ifdef WRITE
46 #undef WRITE
47 #endif
48 #include <stdexcept>
49 #include <unistd.h>
50 #include <stdlib.h>
51 #include <string>
52 #include <sstream>
53 #ifdef _MSC_VER
54 #include <unordered_map>
55 #include <unordered_set>
56 #else
57 #include <tr1/unordered_map>
58 #include <tr1/unordered_set>
59 #endif
60 #include <set>
61 #include <sys/types.h>
62 #include <sys/stat.h>
63 #include <sys/time.h>
64 #include <fcntl.h>
65 #include <errno.h>
66 #include <boost/shared_ptr.hpp>
67 #include <boost/scoped_array.hpp>
68 #include <boost/thread.hpp>
69 #include <boost/thread/condition.hpp>
70 #ifdef _MSC_VER
71 typedef int pthread_t;
72 #else
73 #include <pthread.h>
74 #endif
75 //#define NDEBUG
76 #include <cassert>
77
78 using namespace std;
79
80 #include "configcpp.h"
81 using namespace config;
82
83 #include "messageobj.h"
84 #include "messageids.h"
85 using namespace logging;
86
87 #include "brmtypes.h"
88
89 #include "pp_logger.h"
90
91 #include "fsutils.h"
92
93 #include "rwlock_local.h"
94
95 #include "iomanager.h"
96 #include "liboamcpp.h"
97
98 #include "idbcompress.h"
99 using namespace compress;
100
101 #include "IDBDataFile.h"
102 #include "IDBPolicy.h"
103 #include "IDBLogger.h"
104 using namespace idbdatafile;
105
106 #include "mcsconfig.h"
107
108 typedef tr1::unordered_set<BRM::OID_t> USOID;
109
110 namespace primitiveprocessor
111 {
112 extern Logger* mlp;
113 extern int directIOFlag;
114 extern int noVB;
115 }
116
117 #ifndef O_BINARY
118 # define O_BINARY 0
119 #endif
120 #ifndef O_LARGEFILE
121 # define O_LARGEFILE 0
122 #endif
123 #ifndef O_NOATIME
124 # define O_NOATIME 0
125 #endif
126
127 namespace
128 {
129
130 using namespace dbbc;
131 using namespace std;
132
133 const std::string boldStart = "\033[0;1m";
134 const std::string boldStop = "\033[0;39m";
135
136 const uint32_t MAX_OPEN_FILES = 16384;
137 const uint32_t DECREASE_OPEN_FILES = 4096;
138
/**
 * Computes the elapsed time from tv1 to tv2.
 *
 * @param tv1 start timestamp
 * @param tv2 end timestamp
 * @param tm  [out] receives (tv2 - tv1) expressed in seconds; negative when
 *            tv2 is earlier than tv1
 */
void timespec_sub(const struct timespec& tv1,
                  const struct timespec& tv2,
                  double& tm)
{
    // Whole-second part and nanosecond part are differenced separately and
    // then combined; the nanosecond delta may be negative.
    const double wholeSeconds = (double)(tv2.tv_sec - tv1.tv_sec);
    const double fraction = 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec);

    tm = wholeSeconds + fraction;
}
145
146 struct IOMThreadArg
147 {
148 ioManager* iom;
149 int32_t thdId;
150 };
151
152 typedef IOMThreadArg IOMThreadArg_t;
153
154 /* structures shared across all iomanagers */
155 class FdEntry
156 {
157 public:
FdEntry()158 FdEntry() : oid(0), dbroot(0), partNum(0), segNum(0),
159 fp(0), c(0), inUse(0), compType(0)
160 {
161 cmpMTime = 0;
162 }
163
FdEntry(const BRM::OID_t o,const uint16_t d,const uint32_t p,const uint16_t s,const int ct,IDBDataFile * f)164 FdEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const int ct, IDBDataFile* f) :
165 oid(o), dbroot(d), partNum(p), segNum(s), fp(f), c(0), inUse(0), compType(0)
166 {
167 cmpMTime = 0;
168
169 if (oid >= 1000)
170 compType = ct;
171 }
172
~FdEntry()173 ~FdEntry()
174 {
175 delete fp;
176 fp = 0;
177 }
178
179 BRM::OID_t oid;
180 uint16_t dbroot;
181 uint32_t partNum;
182 uint16_t segNum;
183 IDBDataFile* fp;
184 uint32_t c;
185 int inUse;
186
187 CompChunkPtrList ptrList;
188
189 int compType;
isCompressed() const190 bool isCompressed() const
191 {
192 return (oid >= 1000 && compType != 0);
193 }
194 time_t cmpMTime;
operator <<(ostream & out,const FdEntry & o)195 friend ostream& operator<<(ostream& out, const FdEntry& o)
196 {
197 out << " o: " << o.oid
198 << " f: " << o.fp
199 << " d: " << o.dbroot
200 << " p: " << o.partNum
201 << " s: " << o.segNum
202 << " c: " << o.c
203 << " t: " << o.compType
204 << " m: " << o.cmpMTime;
205 return out;
206 }
207 };
208
209 struct fdCacheMapLessThan
210 {
operator ()__anone26ae9ab0111::fdCacheMapLessThan211 bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
212 {
213 if (lhs.oid < rhs.oid)
214 return true;
215
216 if (lhs.oid == rhs.oid && lhs.dbroot < rhs.dbroot)
217 return true;
218
219 if (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum < rhs.partNum)
220 return true;
221
222 if (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum == rhs.partNum && lhs.segNum < rhs.segNum)
223 return true;
224
225 return false;
226
227 }
228 };
229
230 struct fdMapEqual
231 {
operator ()__anone26ae9ab0111::fdMapEqual232 bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
233 {
234 return (lhs.oid == rhs.oid &&
235 lhs.dbroot == rhs.dbroot &&
236 lhs.partNum == rhs.partNum &&
237 lhs.segNum == rhs.segNum);
238 }
239
240 };
241
242 typedef boost::shared_ptr<FdEntry> SPFdEntry_t;
243 typedef std::map<FdEntry, SPFdEntry_t, fdCacheMapLessThan> FdCacheType_t;
244
245 struct FdCountEntry
246 {
FdCountEntry__anone26ae9ab0111::FdCountEntry247 FdCountEntry() {}
FdCountEntry__anone26ae9ab0111::FdCountEntry248 FdCountEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const uint32_t c,
249 const FdCacheType_t::iterator it) : oid(o), dbroot(d), partNum(p), segNum(s), cnt(c), fdit(it) {}
~FdCountEntry__anone26ae9ab0111::FdCountEntry250 ~FdCountEntry() {}
251
252 BRM::OID_t oid;
253 uint16_t dbroot;
254 uint32_t partNum;
255 uint16_t segNum;
256 uint32_t cnt;
257 FdCacheType_t::iterator fdit;
258
operator <<(ostream & out,const FdCountEntry & o)259 friend ostream& operator<<(ostream& out, const FdCountEntry& o)
260 {
261 out << " o: " << o.oid
262 << " d: " << o.dbroot
263 << " p: " << o.partNum
264 << " s: " << o.segNum
265 << " c: " << o.cnt;
266
267 return out;
268 }
269
270 }; // FdCountEntry
271
272 typedef FdCountEntry FdCountEntry_t;
273
274 struct fdCountCompare
275 {
operator ()__anone26ae9ab0111::fdCountCompare276 bool operator() (const FdCountEntry_t& lhs, const FdCountEntry_t& rhs)
277 {
278 return lhs.cnt > rhs.cnt;
279 }
280 };
281
282 typedef multiset<FdCountEntry_t, fdCountCompare> FdCacheCountType_t;
283
284 FdCacheType_t fdcache;
285 boost::mutex fdMapMutex;
286 rwlock::RWLock_local localLock;
287
/**
 * Rounds a pointer up to the next address that is a multiple of `av`.
 *
 * The rounding uses remainder arithmetic, so it is correct for any positive
 * alignment, not only powers of two. For power-of-two alignments the result
 * is identical to the classic mask-and-add form.
 *
 * @param in address to align
 * @param av alignment in bytes; values <= 1 leave the address unchanged
 * @return `in` when it is already aligned, otherwise the smallest aligned
 *         address greater than `in` (at most av - 1 bytes further)
 */
char* alignTo(const char* in, int av)
{
    // Nothing to align against (also avoids a modulo by zero).
    if (av <= 1)
        return const_cast<char*>(in);

    const uintptr_t alignment = static_cast<uintptr_t>(av);
    uintptr_t address = reinterpret_cast<uintptr_t>(in);
    const uintptr_t remainder = address % alignment;

    if (remainder != 0)
        address += alignment - remainder;

    return reinterpret_cast<char*>(address);
}
302
/**
 * Sleeps before a retry; the delay grows linearly with the attempt number.
 *
 * @param count retry attempt number; the pause is 5000 microseconds per count
 */
void waitForRetry(long count)
{
    const long pauseMicros = 5000 * count;
    usleep(pauseMicros);
}
308
309
310 //Must hold the FD cache lock!
updateptrs(char * ptr,FdCacheType_t::iterator fdit,const IDBCompressInterface & decompressor)311 int updateptrs(char* ptr, FdCacheType_t::iterator fdit, const IDBCompressInterface& decompressor)
312 {
313 ssize_t i;
314 uint32_t progress;
315
316
317 // ptr is taken from buffer, already been checked: realbuff.get() == 0
318 if (ptr == 0)
319 return -1;
320
321 // already checked before: fdit->second->isCompressed()
322 if (fdit->second.get() == 0)
323 return -2;
324
325 IDBDataFile* fp = fdit->second->fp;
326
327 if (fp == INVALID_HANDLE_VALUE)
328 {
329 Message::Args args;
330 args.add("updateptrs got invalid fp.");
331 primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
332 return -3;
333 }
334
335 //We need to read one extra block because we need the first ptr in the 3rd block
336 // to know if we're done.
337 //FIXME: re-work all of this so we don't have to re-read the 3rd block.
338 progress = 0;
339
340 while (progress < 4096 * 3)
341 {
342 i = fp->pread(&ptr[progress], progress, (4096 * 3) - progress);
343
344 if (i <= 0)
345 break;
346
347 progress += i;
348 }
349
350 if (progress != 4096 * 3)
351 return -4; // let it retry. Not likely, but ...
352
353 fdit->second->cmpMTime = 0;
354 time_t mtime = fp->mtime();
355
356 if ( mtime != (time_t) - 1 )
357 fdit->second->cmpMTime = mtime;
358
359 int gplRc = 0;
360 gplRc = decompressor.getPtrList(&ptr[4096], 4096, fdit->second->ptrList);
361
362 if (gplRc != 0)
363 return -5; // go for a retry.
364
365 if (fdit->second->ptrList.size() == 0)
366 return -6; // go for a retry.
367
368 uint64_t numHdrs = fdit->second->ptrList[0].first / 4096ULL - 2ULL;
369
370 if (numHdrs > 0)
371 {
372 boost::scoped_array<char> nextHdrBufsa(new char[numHdrs * 4096 + 4095]);
373 char* nextHdrBufPtr = 0;
374
375 nextHdrBufPtr = alignTo(nextHdrBufsa.get(), 4096);
376
377 progress = 0;
378
379 while (progress < numHdrs * 4096)
380 {
381 i = fp->pread(&nextHdrBufPtr[progress], (4096 * 2) + progress,
382 (numHdrs * 4096) - progress);
383
384 if (i <= 0)
385 break;
386
387 progress += i;
388 }
389
390 if (progress != numHdrs * 4096)
391 return -8;
392
393 CompChunkPtrList nextPtrList;
394 gplRc = decompressor.getPtrList(&nextHdrBufPtr[0], numHdrs * 4096, nextPtrList);
395
396 if (gplRc != 0)
397 return -7; // go for a retry.
398
399 fdit->second->ptrList.insert(fdit->second->ptrList.end(), nextPtrList.begin(), nextPtrList.end());
400 }
401
402 return 0;
403 }
404
thr_popper(ioManager * arg)405 void* thr_popper(ioManager* arg)
406 {
407 ioManager* iom = arg;
408 FileBufferMgr* fbm;
409 int totalRqst = 0;
410 fileRequest* fr = 0;
411 BRM::LBID_t lbid = 0;
412 BRM::OID_t oid = 0;
413 BRM::VER_t ver = 0;
414 BRM::QueryContext qc;
415 BRM::VER_t txn = 0;
416 int compType = 0;
417 int blocksLoaded = 0;
418 int blocksRead = 0;
419 const unsigned pageSize = 4096;
420 fbm = &iom->fileBufferManager();
421 char fileName[WriteEngine::FILE_NAME_SIZE];
422 const uint64_t fileBlockSize = BLOCK_SIZE;
423 bool flg = false;
424 bool useCache;
425 uint16_t dbroot = 0;
426 uint32_t partNum = 0;
427 uint16_t segNum = 0;
428 uint32_t offset = 0;
429 char* fileNamePtr = fileName;
430 uint64_t longSeekOffset = 0;
431 int err;
432 uint32_t dlen = 0, acc, readSize, blocksThisRead, j;
433 uint32_t blocksRequested = 0;
434 ssize_t i;
435 char* alignedbuff = 0;
436 boost::scoped_array<char> realbuff;
437 pthread_t threadId = 0;
438 ostringstream iomLogFileName;
439 ofstream lFile;
440 struct timespec rqst1;
441 struct timespec rqst2;
442 struct timespec tm;
443 struct timespec tm2;
444 double tm3;
445 double rqst3;
446 bool locked = false;
447 SPFdEntry_t fe;
448 IDBCompressInterface decompressor;
449 vector<CacheInsert_t> cacheInsertOps;
450 bool copyLocked = false;
451
452 if (iom->IOTrace())
453 {
454 #ifdef _MSC_VER
455 threadId = GetCurrentThreadId();
456 iomLogFileName << "C:/Calpont/log/trace/iom." << threadId;
457 #else
458 threadId = pthread_self();
459 iomLogFileName << MCSLOGDIR << "/trace/iom." << threadId;
460 #endif
461 lFile.open(iomLogFileName.str().c_str(), ios_base::app | ios_base::ate);
462 }
463
464 FdCacheType_t::iterator fdit;
465 IDBDataFile* fp = 0;
466 uint32_t maxCompSz = IDBCompressInterface::maxCompressedSize(iom->blocksPerRead * BLOCK_SIZE);
467 uint32_t readBufferSz = maxCompSz + pageSize;
468
469 realbuff.reset(new char[readBufferSz]);
470
471 if (realbuff.get() == 0)
472 {
473 cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
474 return 0;
475 }
476
477 alignedbuff = alignTo(realbuff.get(), 4096);
478
479 if ((((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff.get()) >= (ptrdiff_t)pageSize) ||
480 (((ptrdiff_t)alignedbuff % pageSize) != 0))
481 throw runtime_error("aligned buffer size is not matching the page size.");
482
483 uint8_t* uCmpBuf = 0;
484 uCmpBuf = new uint8_t[4 * 1024 * 1024 + 4];
485
486 for ( ; ; )
487 {
488 if (copyLocked)
489 {
490 iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
491 copyLocked = false;
492 }
493
494 if (locked)
495 {
496 localLock.read_unlock();
497 locked = false;
498 }
499
500 fr = iom->getNextRequest();
501
502 localLock.read_lock();
503 locked = true;
504
505 if (iom->IOTrace())
506 clock_gettime(CLOCK_REALTIME, &rqst1);
507
508 lbid = fr->Lbid();
509 qc = fr->Ver();
510 txn = fr->Txn();
511 flg = fr->Flg();
512 compType = fr->CompType();
513 useCache = fr->useCache();
514 blocksLoaded = 0;
515 blocksRead = 0;
516 dlen = fr->BlocksRequested();
517 blocksRequested = fr->BlocksRequested();
518 oid = 0;
519 dbroot = 0;
520 partNum = 0;
521 segNum = 0;
522 offset = 0;
523
524 // special case for getBlock.
525 iom->dbrm()->lockLBIDRange(lbid, blocksRequested);
526 copyLocked = true;
527
528 // special case for getBlock.
529 if (blocksRequested == 1)
530 {
531 BRM::VER_t outVer;
532 iom->dbrm()->vssLookup((BRM::LBID_t) lbid, qc, txn, &outVer, &flg);
533 ver = outVer;
534 fr->versioned(flg);
535 }
536 else
537 {
538 fr->versioned(false);
539 ver = qc.currentScn;
540 }
541
542 err = iom->localLbidLookup(lbid,
543 ver,
544 flg,
545 oid,
546 dbroot,
547 partNum,
548 segNum,
549 offset);
550
551 if (err == BRM::ERR_SNAPSHOT_TOO_OLD)
552 {
553 ostringstream errMsg;
554 errMsg << "thr_popper: version " << ver << " of LBID " << lbid <<
555 "is too old";
556 iom->handleBlockReadError(fr, errMsg.str(), ©Locked);
557 continue;
558 }
559 else if (err < 0)
560 {
561 ostringstream errMsg;
562 errMsg << "thr_popper: BRM lookup failure; lbid=" <<
563 lbid << "; ver=" << ver << "; flg=" << (flg ? 1 : 0);
564 iom->handleBlockReadError( fr, errMsg.str(), ©Locked, fileRequest::BRM_LOOKUP_ERROR);
565 continue;
566 }
567
568 #ifdef IDB_COMP_POC_DEBUG
569 {
570 boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
571
572 if (compType != 0) cout << boldStart;
573
574 cout << "fileRequest: " << *fr << endl;
575
576 if (compType != 0) cout << boldStop;
577 }
578 #endif
579 const uint32_t extentSize = iom->getExtentRows();
580 FdEntry fdKey(oid, dbroot, partNum, segNum, compType, NULL);
581 //cout << "Looking for " << fdKey << endl
582 // << "O: " << oid << " D: " << dbroot << " P: " << partNum << " S: " << segNum << endl;
583
584 fdMapMutex.lock();
585 fdit = fdcache.find(fdKey);
586
587 if (fdit == fdcache.end())
588 {
589 try
590 {
591 iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
592 }
593 catch (exception& exc)
594 {
595 fdMapMutex.unlock();
596 Message::Args args;
597 args.add(oid);
598 args.add(exc.what());
599 primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
600 ostringstream errMsg;
601 errMsg << "thr_popper: Error building filename for OID " <<
602 oid << "; " << exc.what();
603 iom->handleBlockReadError( fr, errMsg.str(), ©Locked );
604 continue;
605 }
606
607 #ifdef IDB_COMP_USE_CMP_SUFFIX
608
609 if (compType != 0)
610 {
611 char* ptr = strrchr(fileNamePtr, '.');
612 idbassert(ptr);
613 strcpy(ptr, ".cmp");
614 }
615
616 #endif
617
618 if (oid > 3000)
619 {
620 //TODO: should syscat columns be considered when reducing open file count
621 // They are always needed why should they be closed?
622 if (fdcache.size() >= iom->MaxOpenFiles())
623 {
624 FdCacheCountType_t fdCountSort;
625
626 for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
627 {
628 struct FdCountEntry fdc(it->second->oid,
629 it->second->dbroot,
630 it->second->partNum,
631 it->second->segNum,
632 it->second->c,
633 it);
634
635 fdCountSort.insert(fdc);
636 }
637
638 if (iom->FDCacheTrace())
639 {
640 iom->FDTraceFile()
641 << "Before flushing sz: " << fdcache.size()
642 << " delCount: " << iom->DecreaseOpenFilesCount()
643 << endl;
644
645 for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
646 iom->FDTraceFile() << *(*it).second << endl;
647
648 iom->FDTraceFile() << "==================" << endl << endl;
649 }
650
651 // TODO: should we consider a minimum number of open files
652 // currently, there is nothing to prevent all open files
653 // from being closed by the IOManager.
654
655 uint32_t delCount = 0;
656
657 for (FdCacheCountType_t::reverse_iterator rit = fdCountSort.rbegin();
658 rit != fdCountSort.rend() &&
659 fdcache.size() > 0 &&
660 delCount < iom->DecreaseOpenFilesCount();
661 rit++)
662 {
663 FdEntry oldfdKey(rit->oid, rit->dbroot, rit->partNum, rit->segNum, 0, NULL);
664 FdCacheType_t::iterator it = fdcache.find(oldfdKey);
665
666 if (it != fdcache.end())
667 {
668 if (iom->FDCacheTrace())
669 {
670 if (!rit->fdit->second->inUse)
671 iom->FDTraceFile()
672 << "Removing dc: " << delCount << " sz: " << fdcache.size()
673 << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
674 else
675 iom->FDTraceFile()
676 << "Skip Remove in use dc: " << delCount << " sz: " << fdcache.size()
677 << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
678 }
679
680 if (rit->fdit->second->inUse <= 0)
681 {
682 fdcache.erase(it);
683 delCount++;
684 }
685 }
686 } // for (FdCacheCountType_t...
687
688 if (iom->FDCacheTrace())
689 {
690 iom->FDTraceFile() << "After flushing sz: " << fdcache.size() << endl;
691
692 for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
693 {
694 iom->FDTraceFile()
695 << *(*it).second << endl;
696 }
697
698 iom->FDTraceFile() << "==================" << endl << endl;
699 }
700
701 fdCountSort.clear();
702
703 } // if (fdcache.size()...
704 } // if (oid > 3000)
705
706 int opts = primitiveprocessor::directIOFlag ? IDBDataFile::USE_ODIRECT : 0;
707 fp = NULL;
708 uint32_t openRetries = 0;
709 int saveErrno = 0;
710
711 while (fp == NULL && openRetries++ < 5)
712 {
713 fp = IDBDataFile::open(
714 IDBPolicy::getType( fileNamePtr, IDBPolicy::PRIMPROC ),
715 fileNamePtr,
716 "r",
717 opts);
718 saveErrno = errno;
719
720 if (fp == NULL)
721 sleep(1);
722 }
723
724 if ( fp == NULL )
725 {
726 Message::Args args;
727 fdit = fdcache.end();
728 fdMapMutex.unlock();
729 args.add(oid);
730 args.add(string(fileNamePtr) + ":" + strerror(saveErrno));
731 primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
732 ostringstream errMsg;
733 errMsg << "thr_popper: Error opening file for OID " << oid << "; "
734 << fileNamePtr << "; " << strerror(saveErrno);
735 int errorCode = fileRequest::FAILED;
736
737 if (saveErrno == EINVAL)
738 errorCode = fileRequest::FS_EINVAL;
739 else if (saveErrno == ENOENT)
740 errorCode = fileRequest::FS_ENOENT;
741
742 iom->handleBlockReadError(fr, errMsg.str(), ©Locked, errorCode);
743 continue;
744 }
745
746 fe.reset( new FdEntry(oid, dbroot, partNum, segNum, compType, fp) );
747 fe->inUse++;
748 fdcache[fdKey] = fe;
749 fdit = fdcache.find(fdKey);
750 fe.reset();
751 }
752
753 else
754 {
755 if (fdit->second.get())
756 {
757 fdit->second->c++;
758 fdit->second->inUse++;
759 fp = fdit->second->fp;
760 }
761 else
762 {
763 Message::Args args;
764 fdit = fdcache.end();
765 fdMapMutex.unlock();
766 args.add(oid);
767 ostringstream errMsg;
768 errMsg << "Null FD cache entry. (dbroot, partNum, segNum, compType) = ("
769 << dbroot << ", " << partNum << ", " << segNum << ", " << compType << ")";
770 args.add(errMsg.str());
771 primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
772 iom->handleBlockReadError( fr, errMsg.str(), ©Locked );
773 continue;
774 }
775 }
776
777 fdMapMutex.unlock();
778
779 #ifdef SHARED_NOTHING_DEMO_2
780
781 // change offset if it's shared nothing
782 /* Get the extent #, divide by # of PMs, calculate base offset for the new extent #,
783 add extent offset */
784 if (oid >= 10000)
785 offset = (((offset / extentSize) / iom->pmCount) * extentSize) + (offset % extentSize);
786
787 #endif
788
789 longSeekOffset = (uint64_t)offset * (uint64_t)fileBlockSize;
790 lldiv_t cmpOffFact = lldiv(longSeekOffset, (4LL * 1024LL * 1024LL));
791 totalRqst++;
792
793 uint32_t readCount = 0;
794 uint32_t bytesRead = 0;
795 uint32_t compressedBytesRead = 0; // @Bug 3149. IOMTrace was not reporting bytesRead correctly for compressed columns.
796 uint32_t jend = blocksRequested / iom->blocksPerRead;
797
798 if (iom->IOTrace())
799 clock_gettime(CLOCK_REALTIME, &tm);
800
801 ostringstream errMsg;
802 bool errorOccurred = false;
803 string errorString;
804 #ifdef IDB_COMP_POC_DEBUG
805 bool debugWrite = false;
806 #endif
807
808 #ifdef EM_AS_A_TABLE_POC__
809 dlen = 1;
810 #endif
811
812 if (blocksRequested % iom->blocksPerRead)
813 jend++;
814
815 for (j = 0; j < jend; j++)
816 {
817
818 int decompRetryCount = 0;
819 int retryReadHeadersCount = 0;
820
821 decompRetry:
822 blocksThisRead = std::min(dlen, iom->blocksPerRead);
823 readSize = blocksThisRead * BLOCK_SIZE;
824
825 acc = 0;
826
827 while (acc < readSize)
828 {
829 #if defined(EM_AS_A_TABLE_POC__)
830
831 if (oid == 1084)
832 {
833 uint32_t h;
834 int32_t o = 0;
835 int32_t* ip;
836 ip = (int32_t*)(&alignedbuff[acc]);
837
838 for (o = 0; o < 2048; o++)
839 {
840 if (iom->dbrm()->getHWM(o + 3000, h) == 0)
841 *ip++ = h;
842 else
843 *ip++ = numeric_limits<int32_t>::min() + 1;
844 }
845
846 i = BLOCK_SIZE;
847 }
848 else
849 i = pread(fd, &alignedbuff[acc], readSize - acc, longSeekOffset);
850
851 #else
852
853 if (fdit->second->isCompressed())
854 {
855 retryReadHeaders:
856 //hdrs may have been modified since we cached them in fdit->second...
857 time_t cur_mtime = numeric_limits<time_t>::max();
858 int updatePtrsRc = 0;
859 fdMapMutex.lock();
860 time_t fp_mtime = fp->mtime();
861
862 if ( fp_mtime != (time_t) - 1)
863 cur_mtime = fp_mtime;
864
865 if (decompRetryCount > 0 || retryReadHeadersCount > 0 || cur_mtime > fdit->second->cmpMTime)
866 updatePtrsRc = updateptrs(&alignedbuff[0], fdit, decompressor);
867
868 fdMapMutex.unlock();
869
870 int idx = cmpOffFact.quot;
871
872 if (updatePtrsRc != 0 || idx >= (signed)fdit->second->ptrList.size())
873 {
874 // Due to race condition, the header on disk may not upated yet.
875 // Log an info message and retry.
876 if (retryReadHeadersCount == 0)
877 {
878 Message::Args args;
879 args.add(oid);
880 ostringstream infoMsg;
881 iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
882 infoMsg << "retry updateptrs for " << fileNamePtr
883 << ". rc=" << updatePtrsRc << ", idx="
884 << idx << ", ptr.size=" << fdit->second->ptrList.size();
885 args.add(infoMsg.str());
886 primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
887 }
888
889 if (++retryReadHeadersCount < 30)
890 {
891 waitForRetry(retryReadHeadersCount);
892 fdit->second->cmpMTime = 0;
893 goto retryReadHeaders;
894 }
895 else
896 {
897 // still fail after all the retries.
898 errorOccurred = true;
899 errMsg << "Error reading compression header. rc=" << updatePtrsRc << ", idx="
900 << idx << ", ptr.size=" << fdit->second->ptrList.size();
901 errorString = errMsg.str();
902 break;
903 }
904 }
905
906 //FIXME: make sure alignedbuff can hold fdit->second->ptrList[idx].second bytes
907 if (fdit->second->ptrList[idx].second > maxCompSz)
908 {
909 errorOccurred = true;
910 errMsg << "aligned buff too small. dataSize="
911 << fdit->second->ptrList[idx].second
912 << ", buffSize=" << maxCompSz;
913 errorString = errMsg.str();
914 break;
915 }
916
917 i = fp->pread(&alignedbuff[0], fdit->second->ptrList[idx].first, fdit->second->ptrList[idx].second );
918 #ifdef IDB_COMP_POC_DEBUG
919 {
920 boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
921 cout << boldStart << "pread1.1(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec << ", " << fdit->second->ptrList[idx].second <<
922 ", " << fdit->second->ptrList[idx].first << ") = " << i << ' ' << cmpOffFact.quot << ' ' << cmpOffFact.rem << boldStop << endl;
923 }
924 #endif
925
926 // @bug3407, give it some retries if pread failed.
927 if (i != (ssize_t)fdit->second->ptrList[idx].second)
928 {
929 // log an info message
930 if (retryReadHeadersCount == 0)
931 {
932 Message::Args args;
933 args.add(oid);
934 ostringstream infoMsg;
935 iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
936 infoMsg << " Read from " << fileNamePtr << " failed at chunk " << idx
937 << ". (offset, size, actuall read) = ("
938 << fdit->second->ptrList[idx].first << ", "
939 << fdit->second->ptrList[idx].second << ", " << i << ")";
940 args.add(infoMsg.str());
941 primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
942 }
943
944 if (++retryReadHeadersCount < 30)
945 {
946 waitForRetry(retryReadHeadersCount);
947 fdit->second->cmpMTime = 0;
948 goto retryReadHeaders;
949 }
950 else
951 {
952 errorOccurred = true;
953 errMsg << "Error reading chunk " << idx;
954 errorString = errMsg.str();
955 break;
956 }
957 }
958
959 compressedBytesRead += i; // @Bug 3149.
960 i = readSize;
961 }
962 else
963 {
964 i = fp->pread(&alignedbuff[acc], longSeekOffset, readSize - acc);
965 #ifdef IDB_COMP_POC_DEBUG
966 {
967 boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
968 cout << "pread1.2(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[acc] << dec << ", " << (readSize - acc) <<
969 ", " << longSeekOffset << ") = " << i << ' ' << cmpOffFact.quot << ' ' << cmpOffFact.rem << endl;
970 }
971 #endif
972 }
973
974 #endif
975
976 if (i < 0 && errno == EINTR)
977 {
978 continue;
979 }
980 else if (i < 0)
981 {
982 try
983 {
984 iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
985 }
986 catch (exception& exc)
987 {
988 cerr << "FileName Err:" << exc.what() << endl;
989 strcpy(fileNamePtr, "unknown filename");
990 }
991
992 errorString = strerror(errno);
993 errorOccurred = true;
994 errMsg << "thr_popper: Error reading file for OID " << oid << "; "
995 << " fp " << fp << "; offset " << longSeekOffset << "; fileName "
996 << fileNamePtr << "; " << errorString;
997 break; // break from "while(acc..." loop
998 }
999 else if (i == 0)
1000 {
1001 iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
1002 errorString = "early EOF";
1003 errorOccurred = true;
1004 errMsg << "thr_popper: Early EOF reading file for OID " <<
1005 oid << "; " << fileNamePtr;
1006 break; // break from "while(acc..." loop
1007 }
1008
1009 acc += i;
1010 longSeekOffset += (uint64_t)i;
1011 readCount++;
1012 bytesRead += i;
1013 } // while(acc...
1014
1015 //..Break out of "for (j..." read loop if error occurred
1016 if (errorOccurred)
1017 {
1018 Message::Args args;
1019 args.add(oid);
1020 args.add(errorString);
1021 primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
1022 iom->handleBlockReadError( fr, errMsg.str(), ©Locked );
1023 break;
1024 }
1025
1026 blocksRead += blocksThisRead;
1027
1028 if (iom->IOTrace())
1029 clock_gettime(CLOCK_REALTIME, &tm2);
1030
1031 /* New bulk VSS lookup code */
1032 {
1033 vector<BRM::LBID_t> lbids;
1034 vector<BRM::VER_t> versions;
1035 vector<bool> isLocked;
1036
1037 for (i = 0; (uint32_t) i < blocksThisRead; i++)
1038 lbids.push_back((BRM::LBID_t) (lbid + i) + (j * iom->blocksPerRead));
1039
1040 if (blocksRequested > 1 || !flg) // prefetch, or an unversioned single-block read
1041 iom->dbrm()->bulkGetCurrentVersion(lbids, &versions, &isLocked);
1042 else // a single-block read that was versioned
1043 {
1044 versions.push_back(ver);
1045 isLocked.push_back(false);
1046 }
1047
1048 uint8_t* ptr = (uint8_t*)&alignedbuff[0];
1049
1050 if (blocksThisRead > 0 && fdit->second->isCompressed())
1051 {
1052 #ifdef _MSC_VER
1053 unsigned int blen = 4 * 1024 * 1024 + 4;
1054 #else
1055 uint32_t blen = 4 * 1024 * 1024 + 4;
1056 #endif
1057 #ifdef IDB_COMP_POC_DEBUG
1058 {
1059 boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
1060 cout << "decompress(0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec << ", " << fdit->second->ptrList[cmpOffFact.quot].second << ", 0x" << hex << (ptrdiff_t)uCmpBuf << dec << ", " << blen << ")" << endl;
1061 }
1062 #endif
1063 int dcrc = decompressor.uncompressBlock(&alignedbuff[0],
1064 fdit->second->ptrList[cmpOffFact.quot].second, uCmpBuf, blen);
1065
1066 if (dcrc != 0)
1067 {
1068 #ifdef IDB_COMP_POC_DEBUG
1069 boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
1070 #endif
1071
1072 if (++decompRetryCount < 30)
1073 {
1074 blocksRead -= blocksThisRead;
1075 waitForRetry(decompRetryCount);
1076
1077 // log an info message every 10 retries
1078 if (decompRetryCount == 1)
1079 {
1080 Message::Args args;
1081 args.add(oid);
1082 ostringstream infoMsg;
1083 iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
1084 infoMsg << "decompress retry for " << fileNamePtr
1085 << " decompRetry chunk " << cmpOffFact.quot
1086 << " code=" << dcrc;
1087 args.add(infoMsg.str());
1088 primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
1089 }
1090
1091 goto decompRetry;
1092 }
1093
1094 cout << boldStart << "decomp returned " << dcrc << boldStop << endl;
1095
1096 errorOccurred = true;
1097 Message::Args args;
1098 args.add(oid);
1099 errMsg << "Error decompressing block " << cmpOffFact.quot << " code=" << dcrc << " part=" << partNum << " seg=" << segNum;
1100 args.add(errMsg.str());
1101 primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
1102 iom->handleBlockReadError( fr, errMsg.str(), ©Locked );
1103 break;
1104 }
1105
1106 //FIXME: why doesn't this work??? (See later for why)
1107 //ptr = &uCmpBuf[cmpOffFact.rem];
1108 memcpy(ptr, &uCmpBuf[cmpOffFact.rem], blocksThisRead * BLOCK_SIZE);
1109
1110 // log the retries, if any
1111 if (retryReadHeadersCount > 0 || decompRetryCount > 0)
1112 {
1113 Message::Args args;
1114 args.add(oid);
1115 ostringstream infoMsg;
1116 iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
1117 infoMsg << "Successfully uncompress " << fileNamePtr << " chunk "
1118 << cmpOffFact.quot << " @";
1119
1120 if (retryReadHeadersCount > 0)
1121 infoMsg << " HeaderRetry:" << retryReadHeadersCount;
1122
1123 if (decompRetryCount > 0)
1124 infoMsg << " UncompressRetry:" << decompRetryCount;
1125
1126 args.add(infoMsg.str());
1127 primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
1128 }
1129 }
1130
1131 for (i = 0; useCache && (uint32_t) i < lbids.size(); i++)
1132 {
1133 if (!isLocked[i])
1134 {
1135 #ifdef IDB_COMP_POC_DEBUG
1136 {
1137 if (debugWrite)
1138 {
1139 boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
1140 cout << boldStart << "i = " << i << ", ptr = 0x" << hex << (ptrdiff_t)&ptr[i * BLOCK_SIZE] << dec << boldStop << endl;
1141 cout << boldStart;
1142 #if 0
1143 int32_t* i32p;
1144 i32p = (int32_t*)&ptr[i * BLOCK_SIZE];
1145
1146 for (int iy = 0; iy < 2; iy++)
1147 {
1148 for (int ix = 0; ix < 8; ix++, i32p++)
1149 cout << *i32p << ' ';
1150
1151 cout << endl;
1152 }
1153
1154 #else
1155 int64_t* i64p;
1156 i64p = (int64_t*)&ptr[i * BLOCK_SIZE];
1157
1158 for (int iy = 0; iy < 2; iy++)
1159 {
1160 for (int ix = 0; ix < 8; ix++, i64p++)
1161 cout << *i64p << ' ';
1162
1163 cout << endl;
1164 }
1165
1166 #endif
1167 cout << boldStop << endl;
1168 }
1169 }
1170 #endif
1171 cacheInsertOps.push_back(CacheInsert_t(lbids[i], versions[i], (uint8_t*)
1172 &alignedbuff[i * BLOCK_SIZE]));
1173 }
1174 }
1175
1176 if (useCache)
1177 {
1178 blocksLoaded += fbm->bulkInsert(cacheInsertOps);
1179 cacheInsertOps.clear();
1180 }
1181 }
1182
1183 dlen -= blocksThisRead;
1184
1185 } // for (j...
1186
1187 fdMapMutex.lock();
1188
1189 if (fdit->second.get())
1190 fdit->second->inUse--;
1191
1192 fdit = fdcache.end();
1193 fdMapMutex.unlock();
1194
1195 if (errorOccurred)
1196 continue;
1197
1198 try
1199 {
1200 iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
1201 copyLocked = false;
1202 }
1203 catch (exception& e)
1204 {
1205 cout << "releaseRange: " << e.what() << endl;
1206 }
1207
1208 fr->BlocksRead(blocksRead);
1209 fr->BlocksLoaded(blocksLoaded);
1210
1211 //FIXME: This is why some code above doesn't work...
1212 if (fr->data != 0 && blocksRequested == 1)
1213 memcpy(fr->data, alignedbuff, BLOCK_SIZE);
1214
1215 fr->frMutex().lock();
1216 fr->SetPredicate(fileRequest::COMPLETE);
1217 fr->frCond().notify_one();
1218 fr->frMutex().unlock();
1219
1220 if (iom->IOTrace())
1221 {
1222 clock_gettime(CLOCK_REALTIME, &rqst2);
1223 timespec_sub(tm, tm2, tm3);
1224 timespec_sub(rqst1, rqst2, rqst3);
1225
1226 // @Bug 3149. IOMTrace was not reporting bytesRead correctly for compressed columns.
1227 uint32_t reportBytesRead = (compressedBytesRead > 0) ? compressedBytesRead : bytesRead;
1228
1229 lFile
1230 << left << setw(5) << setfill(' ') << oid
1231 << right << setw(5) << setfill(' ') << offset / extentSize << " "
1232 << right << setw(11) << setfill(' ') << lbid << " "
1233 << right << setw(9) << bytesRead / (readCount << 13) << " "
1234 << right << setw(9) << blocksRequested << " "
1235 << right << setw(10) << fixed << tm3 << " "
1236 << right << fixed << ((double)(rqst1.tv_sec + (1.e-9 * rqst1.tv_nsec))) << " "
1237 << right << setw(10) << fixed << rqst3 << " "
1238 << right << setw(10) << fixed << longSeekOffset << " "
1239 << right << setw(10) << fixed << reportBytesRead << " "
1240 << right << setw(3) << fixed << dbroot << " "
1241 << right << setw(3) << fixed << partNum << " "
1242 << right << setw(3) << fixed << segNum << " "
1243 << endl;
1244 }
1245
1246 } // for(;;)
1247
1248 delete [] uCmpBuf;
1249
1250 lFile.close();
1251
1252 //reaching here is an error...
1253
1254 return 0;
1255 } // end thr_popper
1256
1257 } // anonymous namespace
1258
1259 namespace dbbc
1260 {
1261
setReadLock()1262 void setReadLock()
1263 {
1264 localLock.read_lock();
1265 }
1266
releaseReadLock()1267 void releaseReadLock()
1268 {
1269 localLock.read_unlock();
1270 }
1271
dropFDCache()1272 void dropFDCache()
1273 {
1274 localLock.write_lock();
1275 fdcache.clear();
1276 localLock.write_unlock();
1277 }
purgeFDCache(std::vector<BRM::FileInfo> & files)1278 void purgeFDCache(std::vector<BRM::FileInfo>& files)
1279 {
1280 localLock.write_lock();
1281
1282 FdCacheType_t::iterator fdit;
1283
1284 for ( uint32_t i = 0; i < files.size(); i++)
1285 {
1286 FdEntry fdKey(files[i].oid, files[i].dbRoot, files[i].partitionNum, files[i].segmentNum, files[i].compType, NULL);
1287 fdit = fdcache.find(fdKey);
1288
1289 if (fdit != fdcache.end())
1290 fdcache.erase(fdit);
1291 }
1292
1293 localLock.write_unlock();
1294 }
1295
ioManager(FileBufferMgr & fbm,fileBlockRequestQueue & fbrq,int thrCount,int bsPerRead)1296 ioManager::ioManager(FileBufferMgr& fbm,
1297 fileBlockRequestQueue& fbrq,
1298 int thrCount,
1299 int bsPerRead):
1300 blocksPerRead(bsPerRead),
1301 fIOMfbMgr(fbm),
1302 fIOMRequestQueue(fbrq),
1303 fFileOp(false)
1304 {
1305 if (thrCount <= 0)
1306 thrCount = 1;
1307
1308 if (thrCount > 256)
1309 thrCount = 256;
1310
1311 fConfig = Config::makeConfig();
1312 string val = fConfig->getConfig("DBBC", "IOMTracing");
1313 int temp = 0;
1314
1315 if (val.length() > 0) temp = static_cast<int>(Config::fromText(val));
1316
1317 if (temp > 0)
1318 fIOTrace = true;
1319 else
1320 fIOTrace = false;
1321
1322 val = fConfig->getConfig("DBBC", "MaxOpenFiles");
1323 temp = 0;
1324 fMaxOpenFiles = MAX_OPEN_FILES;
1325
1326 if (val.length() > 0) temp = static_cast<int>(Config::fromText(val));
1327
1328 if (temp > 0)
1329 fMaxOpenFiles = temp;
1330
1331 val = fConfig->getConfig("DBBC", "DecreaseOpenFilesCount");
1332 temp = 0;
1333 fDecreaseOpenFilesCount = DECREASE_OPEN_FILES;
1334
1335 if (val.length() > 0) temp = static_cast<int>(Config::fromText(val));
1336
1337 if (temp > 0)
1338 fDecreaseOpenFilesCount = temp;
1339
1340 // limit the number of files closed
1341 if (fDecreaseOpenFilesCount > (uint32_t)(0.75 * fMaxOpenFiles))
1342 fDecreaseOpenFilesCount = (uint32_t)(0.75 * fMaxOpenFiles);
1343
1344 val = fConfig->getConfig("DBBC", "FDCacheTrace");
1345 temp = 0;
1346 fFDCacheTrace = false;
1347
1348 if (val.length() > 0) temp = static_cast<int>(Config::fromText(val));
1349
1350 if (temp > 0)
1351 {
1352 fFDCacheTrace = true;
1353 #ifdef _MSC_VER
1354 FDTraceFile().open("C:/Calpont/log/trace/fdcache", ios_base::ate | ios_base::app);
1355 #else
1356 FDTraceFile().open(string(MCSLOGDIR) + "/trace/fdcache", ios_base::ate | ios_base::app);
1357 #endif
1358 }
1359
1360 fThreadCount = thrCount;
1361 go();
1362 }
1363
buildOidFileName(const BRM::OID_t oid,uint16_t dbRoot,const uint32_t partNum,const uint16_t segNum,char * file_name)1364 void ioManager::buildOidFileName(const BRM::OID_t oid, uint16_t dbRoot, const uint32_t partNum, const uint16_t segNum, char* file_name)
1365 {
1366 // when it's a request for the version buffer, the dbroot comes in as 0 for legacy reasons
1367 if (dbRoot == 0 && oid < 1000)
1368 dbRoot = fdbrm.getDBRootOfVBOID(oid);
1369
1370 fFileOp.getFileNameForPrimProc(oid, file_name, dbRoot, partNum, segNum);
1371 }
1372
localLbidLookup(BRM::LBID_t lbid,BRM::VER_t verid,bool vbFlag,BRM::OID_t & oid,uint16_t & dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,uint32_t & fileBlockOffset)1373 int ioManager::localLbidLookup(BRM::LBID_t lbid,
1374 BRM::VER_t verid,
1375 bool vbFlag,
1376 BRM::OID_t& oid,
1377 uint16_t& dbRoot,
1378 uint32_t& partitionNum,
1379 uint16_t& segmentNum,
1380 uint32_t& fileBlockOffset)
1381 {
1382 if (primitiveprocessor::noVB > 0)
1383 vbFlag = false;
1384
1385 int rc = fdbrm.lookupLocal(lbid,
1386 verid,
1387 vbFlag,
1388 oid,
1389 dbRoot,
1390 partitionNum,
1391 segmentNum,
1392 fileBlockOffset);
1393
1394 return rc;
1395 }
1396
1397 struct LambdaKludge
1398 {
LambdaKludgedbbc::LambdaKludge1399 LambdaKludge(ioManager* i) : iom(i) { }
~LambdaKludgedbbc::LambdaKludge1400 ~LambdaKludge()
1401 {
1402 iom = NULL;
1403 }
1404 ioManager* iom;
operator ()dbbc::LambdaKludge1405 void operator()()
1406 {
1407 thr_popper(iom);
1408 }
1409 };
1410
createReaders()1411 void ioManager::createReaders()
1412 {
1413 int idx;
1414
1415 for (idx = 0; idx < fThreadCount; idx++)
1416 {
1417 try
1418 {
1419 fThreadArr.create_thread(LambdaKludge(this));
1420 }
1421 catch (exception& e)
1422 {
1423 cerr << "IOM::createReaders() caught " << e.what() << endl;
1424 idx--;
1425 sleep(1);
1426 continue;
1427 }
1428 }
1429 }
1430
~ioManager()1431 ioManager::~ioManager()
1432 {
1433 stop();
1434 }
1435
go(void)1436 void ioManager::go(void)
1437 {
1438 createReaders();
1439 }
1440
1441
1442 //FIXME: is this right? what does this method do?
stop()1443 void ioManager::stop()
1444 {
1445 for (int idx = 0; idx < fThreadCount; idx++)
1446 {
1447 (void)0; //pthread_detach(fThreadArr[idx]);
1448 }
1449 }
1450
1451
getNextRequest()1452 fileRequest* ioManager::getNextRequest()
1453 {
1454 fileRequest* blk = 0;
1455
1456 try
1457 {
1458 blk = fIOMRequestQueue.pop();
1459 return blk;
1460 }
1461 catch (exception&)
1462 {
1463 cerr << "ioManager::getNextRequest() ERROR " << endl;
1464 }
1465
1466 return blk;
1467
1468 }
1469
1470 //------------------------------------------------------------------------------
1471 // Prints stderr msg and updates fileRequest object to reflect an error.
1472 // Lastly, notifies waiting thread that fileRequest has been completed.
1473 //------------------------------------------------------------------------------
handleBlockReadError(fileRequest * fr,const string & errMsg,bool * copyLocked,int errorCode)1474 void ioManager::handleBlockReadError( fileRequest* fr,
1475 const string& errMsg, bool* copyLocked, int errorCode )
1476 {
1477 try
1478 {
1479 dbrm()->releaseLBIDRange(fr->Lbid(), fr->BlocksRequested());
1480 *copyLocked = false;
1481 }
1482 catch (exception& e)
1483 {
1484 cout << "releaseRange on read error: " << e.what() << endl;
1485 }
1486
1487 cerr << errMsg << endl;
1488 fr->RequestStatus(errorCode);
1489 fr->RequestStatusStr(errMsg);
1490
1491 fr->frMutex().lock();
1492 fr->SetPredicate(fileRequest::COMPLETE);
1493 fr->frCond().notify_one();
1494 fr->frMutex().unlock();
1495 }
1496
1497 }
1498 // vim:ts=4 sw=4:
1499