1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 // $Id: iomanager.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
20 //
21 // C++ Implementation: iomanager
22 //
23 // Description:
24 //
25 //
26 // Author: Jason Rodriguez <jrodriguez@calpont.com>
27 //
28 //
29 //
30 
31 #include "mcsconfig.h"
32 
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35 #ifdef __linux__
36 #include <sys/mount.h>
37 #include <linux/fs.h>
38 #endif
39 #ifdef BLOCK_SIZE
40 #undef BLOCK_SIZE
41 #endif
42 #ifdef READ
43 #undef READ
44 #endif
45 #ifdef WRITE
46 #undef WRITE
47 #endif
#include <stdexcept>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <string>
#include <sstream>
53 #ifdef _MSC_VER
54 #include <unordered_map>
55 #include <unordered_set>
56 #else
57 #include <tr1/unordered_map>
58 #include <tr1/unordered_set>
59 #endif
60 #include <set>
61 #include <sys/types.h>
62 #include <sys/stat.h>
63 #include <sys/time.h>
64 #include <fcntl.h>
65 #include <errno.h>
66 #include <boost/shared_ptr.hpp>
67 #include <boost/scoped_array.hpp>
68 #include <boost/thread.hpp>
69 #include <boost/thread/condition.hpp>
70 #ifdef _MSC_VER
71 typedef int pthread_t;
72 #else
73 #include <pthread.h>
74 #endif
75 //#define NDEBUG
76 #include <cassert>
77 
78 using namespace std;
79 
80 #include "configcpp.h"
81 using namespace config;
82 
83 #include "messageobj.h"
84 #include "messageids.h"
85 using namespace logging;
86 
87 #include "brmtypes.h"
88 
89 #include "pp_logger.h"
90 
91 #include "fsutils.h"
92 
93 #include "rwlock_local.h"
94 
95 #include "iomanager.h"
96 #include "liboamcpp.h"
97 
98 #include "idbcompress.h"
99 using namespace compress;
100 
101 #include "IDBDataFile.h"
102 #include "IDBPolicy.h"
103 #include "IDBLogger.h"
104 using namespace idbdatafile;
105 
106 #include "mcsconfig.h"
107 
// Hash set of object IDs. NOTE(review): not referenced in the visible part of this file.
typedef tr1::unordered_set<BRM::OID_t> USOID;

// Primitive-processor globals declared here and defined in another translation unit.
namespace primitiveprocessor
{
extern Logger* mlp;        // logger used for the error/info messages emitted in this file
extern int directIOFlag;   // non-zero => data files are opened with IDBDataFile::USE_ODIRECT
extern int noVB;           // NOTE(review): not referenced in the visible part of this file
}
116 
// Portability fallbacks: when the platform headers do not provide one of these
// open(2) flags, define it as 0 so OR-ing it into a flag set is a no-op.
#if !defined(O_BINARY)
#define O_BINARY 0
#endif

#if !defined(O_LARGEFILE)
#define O_LARGEFILE 0
#endif

#if !defined(O_NOATIME)
#define O_NOATIME 0
#endif
126 
127 namespace
128 {
129 
130 using namespace dbbc;
131 using namespace std;
132 
// ANSI escape sequences used to highlight compression debug output
// (IDB_COMP_POC_DEBUG builds).
const std::string boldStart("\033[0;1m");
const std::string boldStop("\033[0;39m");

// Open-file cache limits. The visible code reads the live limits from
// iom->MaxOpenFiles() / iom->DecreaseOpenFilesCount(); presumably these are
// the defaults -- confirm.
const uint32_t MAX_OPEN_FILES = 16 * 1024;
const uint32_t DECREASE_OPEN_FILES = 4 * 1024;
138 
// Elapsed time from tv1 to tv2 (tv2 - tv1), in seconds, written to tm.
// The result is negative when tv2 precedes tv1.
void timespec_sub(const struct timespec& tv1,
                  const struct timespec& tv2,
                  double& tm)
{
    const double wholeSeconds = (double)(tv2.tv_sec - tv1.tv_sec);
    const double fraction = 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec);
    tm = wholeSeconds + fraction;
}
145 
146 struct IOMThreadArg
147 {
148     ioManager* iom;
149     int32_t thdId;
150 };
151 
152 typedef IOMThreadArg IOMThreadArg_t;
153 
154 /* structures shared across all iomanagers */
155 class FdEntry
156 {
157 public:
FdEntry()158     FdEntry() : oid(0), dbroot(0), partNum(0), segNum(0),
159         fp(0), c(0), inUse(0), compType(0)
160     {
161         cmpMTime = 0;
162     }
163 
FdEntry(const BRM::OID_t o,const uint16_t d,const uint32_t p,const uint16_t s,const int ct,IDBDataFile * f)164     FdEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const int ct, IDBDataFile* f) :
165         oid(o), dbroot(d), partNum(p), segNum(s), fp(f), c(0), inUse(0), compType(0)
166     {
167         cmpMTime = 0;
168 
169         if (oid >= 1000)
170             compType = ct;
171     }
172 
~FdEntry()173     ~FdEntry()
174     {
175         delete fp;
176         fp = 0;
177     }
178 
179     BRM::OID_t oid;
180     uint16_t dbroot;
181     uint32_t partNum;
182     uint16_t segNum;
183     IDBDataFile* fp;
184     uint32_t c;
185     int inUse;
186 
187     CompChunkPtrList ptrList;
188 
189     int compType;
isCompressed() const190     bool isCompressed() const
191     {
192         return (oid >= 1000 && compType != 0);
193     }
194     time_t cmpMTime;
operator <<(ostream & out,const FdEntry & o)195     friend ostream& operator<<(ostream& out, const FdEntry& o)
196     {
197         out << " o: " << o.oid
198             << " f: " << o.fp
199             << " d: " << o.dbroot
200             << " p: " << o.partNum
201             << " s: " << o.segNum
202             << " c: " << o.c
203             << " t: " << o.compType
204             << " m: " << o.cmpMTime;
205         return out;
206     }
207 };
208 
209 struct fdCacheMapLessThan
210 {
operator ()__anon7384bac30111::fdCacheMapLessThan211     bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
212     {
213         if (lhs.oid < rhs.oid)
214             return true;
215 
216         if (lhs.oid == rhs.oid && lhs.dbroot < rhs.dbroot)
217             return true;
218 
219         if (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum < rhs.partNum)
220             return true;
221 
222         if (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum == rhs.partNum && lhs.segNum < rhs.segNum)
223             return true;
224 
225         return false;
226 
227     }
228 };
229 
230 struct fdMapEqual
231 {
operator ()__anon7384bac30111::fdMapEqual232     bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
233     {
234         return (lhs.oid == rhs.oid &&
235                 lhs.dbroot == rhs.dbroot &&
236                 lhs.partNum == rhs.partNum &&
237                 lhs.segNum == rhs.segNum);
238     }
239 
240 };
241 
// Cache entries are held through boost::shared_ptr.
typedef boost::shared_ptr<FdEntry> SPFdEntry_t;
// Open-file cache: keyed by an FdEntry compared on (oid, dbroot, partNum, segNum)
// via fdCacheMapLessThan; the mapped value is the owning entry.
typedef std::map<FdEntry, SPFdEntry_t, fdCacheMapLessThan> FdCacheType_t;
244 
245 struct FdCountEntry
246 {
FdCountEntry__anon7384bac30111::FdCountEntry247     FdCountEntry() {}
FdCountEntry__anon7384bac30111::FdCountEntry248     FdCountEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const uint32_t c,
249                  const FdCacheType_t::iterator it) : oid(o), dbroot(d), partNum(p), segNum(s), cnt(c), fdit(it) {}
~FdCountEntry__anon7384bac30111::FdCountEntry250     ~FdCountEntry() {}
251 
252     BRM::OID_t oid;
253     uint16_t dbroot;
254     uint32_t partNum;
255     uint16_t segNum;
256     uint32_t cnt;
257     FdCacheType_t::iterator fdit;
258 
operator <<(ostream & out,const FdCountEntry & o)259     friend ostream& operator<<(ostream& out, const FdCountEntry& o)
260     {
261         out << " o: " << o.oid
262             << " d: " << o.dbroot
263             << " p: " << o.partNum
264             << " s: " << o.segNum
265             << " c: " << o.cnt;
266 
267         return out;
268     }
269 
270 }; // FdCountEntry
271 
272 typedef FdCountEntry FdCountEntry_t;
273 
274 struct fdCountCompare
275 {
operator ()__anon7384bac30111::fdCountCompare276     bool operator() (const FdCountEntry_t& lhs, const FdCountEntry_t& rhs)
277     {
278         return lhs.cnt > rhs.cnt;
279     }
280 };
281 
282 typedef multiset<FdCountEntry_t, fdCountCompare> FdCacheCountType_t;
283 
// Open-file cache shared by all ioManager threads in this translation unit.
FdCacheType_t fdcache;
// Guards fdcache; thr_popper locks/unlocks it manually, and updateptrs
// expects its caller to hold it.
boost::mutex fdMapMutex;
// Read-locked by thr_popper while it services each request.
// NOTE(review): the write side is not visible in this part of the file.
rwlock::RWLock_local localLock;
287 
// Round `in` up to the next multiple of `av` bytes and return it as a writable
// pointer. An address that is already aligned is returned unchanged, and
// av <= 1 means "no alignment requirement".
//
// The result may lie up to av - 1 bytes past `in`, so the caller must have
// allocated that much slack (e.g. new char[n + 4095] for av == 4096).
char* alignTo(const char* in, int av)
{
    if (av <= 1)
        return const_cast<char*>(in);   // previously av == 0 divided by zero

    // Unsigned address arithmetic, rounding by remainder rather than by bit
    // mask, so non-power-of-two alignments round up correctly as well.
    const uintptr_t addr = reinterpret_cast<uintptr_t>(in);
    const uintptr_t alignment = static_cast<uintptr_t>(av);
    const uintptr_t rem = addr % alignment;

    if (rem == 0)
        return const_cast<char*>(in);

    return reinterpret_cast<char*>(addr + (alignment - rem));
}
302 
// Linear back-off between retries: sleeps for count * 5 milliseconds.
void waitForRetry(long count)
{
    const long delayMicros = count * 5000;
    usleep(delayMicros);
}
308 
309 
310 //Must hold the FD cache lock!
updateptrs(char * ptr,FdCacheType_t::iterator fdit,const IDBCompressInterface & decompressor)311 int updateptrs(char* ptr, FdCacheType_t::iterator fdit, const IDBCompressInterface& decompressor)
312 {
313     ssize_t i;
314     uint32_t progress;
315 
316 
317     // ptr is taken from buffer, already been checked: realbuff.get() == 0
318     if (ptr == 0)
319         return -1;
320 
321     // already checked before: fdit->second->isCompressed()
322     if (fdit->second.get() == 0)
323         return -2;
324 
325     IDBDataFile* fp = fdit->second->fp;
326 
327     if (fp == INVALID_HANDLE_VALUE)
328     {
329         Message::Args args;
330         args.add("updateptrs got invalid fp.");
331         primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
332         return -3;
333     }
334 
335     //We need to read one extra block because we need the first ptr in the 3rd block
336     // to know if we're done.
337     //FIXME: re-work all of this so we don't have to re-read the 3rd block.
338     progress = 0;
339 
340     while (progress < 4096 * 3)
341     {
342         i = fp->pread(&ptr[progress], progress, (4096 * 3) - progress);
343 
344         if (i <= 0)
345             break;
346 
347         progress += i;
348     }
349 
350     if (progress != 4096 * 3)
351         return -4;   // let it retry. Not likely, but ...
352 
353     fdit->second->cmpMTime = 0;
354     time_t mtime = fp->mtime();
355 
356     if ( mtime != (time_t) - 1 )
357         fdit->second->cmpMTime = mtime;
358 
359     int gplRc = 0;
360     gplRc = decompressor.getPtrList(&ptr[4096], 4096, fdit->second->ptrList);
361 
362     if (gplRc != 0)
363         return -5; // go for a retry.
364 
365     if (fdit->second->ptrList.size() == 0)
366         return -6; // go for a retry.
367 
368     uint64_t numHdrs = fdit->second->ptrList[0].first / 4096ULL - 2ULL;
369 
370     if (numHdrs > 0)
371     {
372         boost::scoped_array<char> nextHdrBufsa(new char[numHdrs * 4096 + 4095]);
373         char* nextHdrBufPtr = 0;
374 
375         nextHdrBufPtr = alignTo(nextHdrBufsa.get(), 4096);
376 
377         progress = 0;
378 
379         while (progress < numHdrs * 4096)
380         {
381             i = fp->pread(&nextHdrBufPtr[progress], (4096 * 2) + progress,
382                           (numHdrs * 4096) - progress);
383 
384             if (i <= 0)
385                 break;
386 
387             progress += i;
388         }
389 
390         if (progress != numHdrs * 4096)
391             return -8;
392 
393         CompChunkPtrList nextPtrList;
394         gplRc = decompressor.getPtrList(&nextHdrBufPtr[0], numHdrs * 4096, nextPtrList);
395 
396         if (gplRc != 0)
397             return -7; // go for a retry.
398 
399         fdit->second->ptrList.insert(fdit->second->ptrList.end(), nextPtrList.begin(), nextPtrList.end());
400     }
401 
402     return 0;
403 }
404 
thr_popper(ioManager * arg)405 void* thr_popper(ioManager* arg)
406 {
407     ioManager* iom = arg;
408     FileBufferMgr* fbm;
409     int totalRqst = 0;
410     fileRequest* fr = 0;
411     BRM::LBID_t lbid = 0;
412     BRM::OID_t oid = 0;
413     BRM::VER_t ver = 0;
414     BRM::QueryContext qc;
415     BRM::VER_t txn = 0;
416     int compType = 0;
417     int blocksLoaded = 0;
418     int blocksRead = 0;
419     const unsigned pageSize = 4096;
420     fbm = &iom->fileBufferManager();
421     char fileName[WriteEngine::FILE_NAME_SIZE];
422     const uint64_t fileBlockSize = BLOCK_SIZE;
423     bool flg = false;
424     bool useCache;
425     uint16_t dbroot = 0;
426     uint32_t partNum = 0;
427     uint16_t segNum = 0;
428     uint32_t offset = 0;
429     char* fileNamePtr = fileName;
430     uint64_t longSeekOffset = 0;
431     int err;
432     uint32_t dlen = 0, acc, readSize, blocksThisRead, j;
433     uint32_t blocksRequested = 0;
434     ssize_t i;
435     char* alignedbuff = 0;
436     boost::scoped_array<char> realbuff;
437     pthread_t threadId = 0;
438     ostringstream iomLogFileName;
439     ofstream lFile;
440     struct timespec rqst1;
441     struct timespec rqst2;
442     struct timespec tm;
443     struct timespec tm2;
444     double tm3;
445     double rqst3;
446     bool locked = false;
447     SPFdEntry_t fe;
448     IDBCompressInterface decompressor;
449     vector<CacheInsert_t> cacheInsertOps;
450     bool copyLocked = false;
451 
452     if (iom->IOTrace())
453     {
454 #ifdef _MSC_VER
455         threadId = GetCurrentThreadId();
456         iomLogFileName << "C:/Calpont/log/trace/iom." << threadId;
457 #else
458         threadId = pthread_self();
459         iomLogFileName << MCSLOGDIR << "/trace/iom." << threadId;
460 #endif
461         lFile.open(iomLogFileName.str().c_str(), ios_base::app | ios_base::ate);
462     }
463 
464     FdCacheType_t::iterator fdit;
465     IDBDataFile* fp = 0;
466     uint32_t maxCompSz = IDBCompressInterface::maxCompressedSize(iom->blocksPerRead * BLOCK_SIZE);
467     uint32_t readBufferSz = maxCompSz + pageSize;
468 
469     realbuff.reset(new char[readBufferSz]);
470 
471     if (realbuff.get() == 0)
472     {
473         cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
474         return 0;
475     }
476 
477     alignedbuff = alignTo(realbuff.get(), 4096);
478 
479     if ((((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff.get()) >= (ptrdiff_t)pageSize) ||
480             (((ptrdiff_t)alignedbuff % pageSize) != 0))
481         throw runtime_error("aligned buffer size is not matching the page size.");
482 
483     uint8_t* uCmpBuf = 0;
484     uCmpBuf = new uint8_t[4 * 1024 * 1024 + 4];
485 
486     for ( ; ; )
487     {
488         if (copyLocked)
489         {
490             iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
491             copyLocked = false;
492         }
493 
494         if (locked)
495         {
496             localLock.read_unlock();
497             locked = false;
498         }
499 
500         fr = iom->getNextRequest();
501 
502         localLock.read_lock();
503         locked = true;
504 
505         if (iom->IOTrace())
506             clock_gettime(CLOCK_REALTIME, &rqst1);
507 
508         lbid = fr->Lbid();
509         qc = fr->Ver();
510         txn = fr->Txn();
511         flg = fr->Flg();
512         compType = fr->CompType();
513         useCache = fr->useCache();
514         blocksLoaded = 0;
515         blocksRead = 0;
516         dlen = fr->BlocksRequested();
517         blocksRequested = fr->BlocksRequested();
518         oid = 0;
519         dbroot = 0;
520         partNum = 0;
521         segNum = 0;
522         offset = 0;
523 
524         // special case for getBlock.
525         iom->dbrm()->lockLBIDRange(lbid, blocksRequested);
526         copyLocked = true;
527 
528         // special case for getBlock.
529         if (blocksRequested == 1)
530         {
531             BRM::VER_t outVer;
532             iom->dbrm()->vssLookup((BRM::LBID_t) lbid, qc, txn, &outVer, &flg);
533             ver = outVer;
534             fr->versioned(flg);
535         }
536         else
537         {
538             fr->versioned(false);
539             ver = qc.currentScn;
540         }
541 
542         err = iom->localLbidLookup(lbid,
543                                    ver,
544                                    flg,
545                                    oid,
546                                    dbroot,
547                                    partNum,
548                                    segNum,
549                                    offset);
550 
551         if (err == BRM::ERR_SNAPSHOT_TOO_OLD)
552         {
553             ostringstream errMsg;
554             errMsg << "thr_popper: version " << ver << " of LBID " << lbid <<
555                    "is too old";
556             iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
557             continue;
558         }
559         else if (err < 0)
560         {
561             ostringstream errMsg;
562             errMsg << "thr_popper: BRM lookup failure; lbid=" <<
563                    lbid << "; ver=" << ver << "; flg=" << (flg ? 1 : 0);
564             iom->handleBlockReadError( fr, errMsg.str(), &copyLocked, fileRequest::BRM_LOOKUP_ERROR);
565             continue;
566         }
567 
568 #ifdef IDB_COMP_POC_DEBUG
569         {
570             boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
571 
572             if (compType != 0) cout << boldStart;
573 
574             cout << "fileRequest: " << *fr << endl;
575 
576             if (compType != 0) cout << boldStop;
577         }
578 #endif
579         const uint32_t extentSize = iom->getExtentRows();
580         FdEntry fdKey(oid, dbroot, partNum, segNum, compType, NULL);
581         //cout << "Looking for " << fdKey << endl
582         //   << "O: " << oid << " D: " << dbroot << " P: " << partNum << " S: " << segNum << endl;
583 
584         fdMapMutex.lock();
585         fdit = fdcache.find(fdKey);
586 
587         if (fdit == fdcache.end())
588         {
589             try
590             {
591                 iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
592             }
593             catch (exception& exc)
594             {
595                 fdMapMutex.unlock();
596                 Message::Args args;
597                 args.add(oid);
598                 args.add(exc.what());
599                 primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
600                 ostringstream errMsg;
601                 errMsg << "thr_popper: Error building filename for OID " <<
602                        oid << "; " << exc.what();
603                 iom->handleBlockReadError( fr, errMsg.str(), &copyLocked );
604                 continue;
605             }
606 
607 #ifdef IDB_COMP_USE_CMP_SUFFIX
608 
609             if (compType != 0)
610             {
611                 char* ptr = strrchr(fileNamePtr, '.');
612                 idbassert(ptr);
613                 strcpy(ptr, ".cmp");
614             }
615 
616 #endif
617 
618             if (oid > 3000)
619             {
620                 //TODO: should syscat columns be considered when reducing open file count
621                 //  They are always needed why should they be closed?
622                 if (fdcache.size() >= iom->MaxOpenFiles())
623                 {
624                     FdCacheCountType_t fdCountSort;
625 
626                     for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
627                     {
628                         struct FdCountEntry fdc(it->second->oid,
629                                                 it->second->dbroot,
630                                                 it->second->partNum,
631                                                 it->second->segNum,
632                                                 it->second->c,
633                                                 it);
634 
635                         fdCountSort.insert(fdc);
636                     }
637 
638                     if (iom->FDCacheTrace())
639                     {
640                         iom->FDTraceFile()
641                                 << "Before flushing sz: " << fdcache.size()
642                                 << " delCount: " << iom->DecreaseOpenFilesCount()
643                                 << endl;
644 
645                         for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
646                             iom->FDTraceFile() << *(*it).second << endl;
647 
648                         iom->FDTraceFile() << "==================" << endl << endl;
649                     }
650 
651                     // TODO: should we consider a minimum number of open files
652                     //      currently, there is nothing to prevent all open files
653                     //      from being closed by the IOManager.
654 
655                     uint32_t delCount = 0;
656 
657                     for (FdCacheCountType_t::reverse_iterator rit = fdCountSort.rbegin();
658                             rit != fdCountSort.rend() &&
659                             fdcache.size() > 0 &&
660                             delCount < iom->DecreaseOpenFilesCount();
661                             rit++)
662                     {
663                         FdEntry oldfdKey(rit->oid, rit->dbroot, rit->partNum, rit->segNum, 0, NULL);
664                         FdCacheType_t::iterator it = fdcache.find(oldfdKey);
665 
666                         if (it != fdcache.end())
667                         {
668                             if (iom->FDCacheTrace())
669                             {
670                                 if (!rit->fdit->second->inUse)
671                                     iom->FDTraceFile()
672                                             << "Removing dc: " << delCount << " sz: " << fdcache.size()
673                                             << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
674                                 else
675                                     iom->FDTraceFile()
676                                             << "Skip Remove in use dc: " << delCount << " sz: " << fdcache.size()
677                                             << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
678                             }
679 
680                             if (rit->fdit->second->inUse <= 0)
681                             {
682                                 fdcache.erase(it);
683                                 delCount++;
684                             }
685                         }
686                     } // for (FdCacheCountType_t...
687 
688                     if (iom->FDCacheTrace())
689                     {
690                         iom->FDTraceFile() << "After flushing sz: " << fdcache.size() << endl;
691 
692                         for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
693                         {
694                             iom->FDTraceFile()
695                                     << *(*it).second << endl;
696                         }
697 
698                         iom->FDTraceFile() << "==================" << endl << endl;
699                     }
700 
701                     fdCountSort.clear();
702 
703                 } // if (fdcache.size()...
704             } // if (oid > 3000)
705 
706             int opts = primitiveprocessor::directIOFlag ? IDBDataFile::USE_ODIRECT : 0;
707             fp = NULL;
708             uint32_t openRetries = 0;
709             int saveErrno = 0;
710 
711             while (fp == NULL && openRetries++ < 5)
712             {
713                 fp = IDBDataFile::open(
714                          IDBPolicy::getType( fileNamePtr, IDBPolicy::PRIMPROC ),
715                          fileNamePtr,
716                          "r",
717                          opts);
718                 saveErrno = errno;
719 
720                 if (fp == NULL)
721                     sleep(1);
722             }
723 
724             if ( fp == NULL )
725             {
726                 Message::Args args;
727                 fdit = fdcache.end();
728                 fdMapMutex.unlock();
729                 args.add(oid);
730                 args.add(string(fileNamePtr) + ":" + strerror(saveErrno));
731                 primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
732                 ostringstream errMsg;
733                 errMsg << "thr_popper: Error opening file for OID " << oid << "; "
734                        << fileNamePtr << "; " << strerror(saveErrno);
735                 int errorCode = fileRequest::FAILED;
736 
737                 if (saveErrno == EINVAL)
738                     errorCode = fileRequest::FS_EINVAL;
739                 else if (saveErrno == ENOENT)
740                     errorCode = fileRequest::FS_ENOENT;
741 
742                 iom->handleBlockReadError(fr, errMsg.str(), &copyLocked, errorCode);
743                 continue;
744             }
745 
746             fe.reset( new FdEntry(oid, dbroot, partNum, segNum, compType, fp) );
747             fe->inUse++;
748             fdcache[fdKey] = fe;
749             fdit = fdcache.find(fdKey);
750             fe.reset();
751         }
752 
753         else
754         {
755             if (fdit->second.get())
756             {
757                 fdit->second->c++;
758                 fdit->second->inUse++;
759                 fp = fdit->second->fp;
760             }
761             else
762             {
763                 Message::Args args;
764                 fdit = fdcache.end();
765                 fdMapMutex.unlock();
766                 args.add(oid);
767                 ostringstream errMsg;
768                 errMsg << "Null FD cache entry. (dbroot, partNum, segNum, compType) = ("
769                        << dbroot << ", " << partNum << ", " << segNum << ", " << compType << ")";
770                 args.add(errMsg.str());
771                 primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
772                 iom->handleBlockReadError( fr, errMsg.str(), &copyLocked );
773                 continue;
774             }
775         }
776 
777         fdMapMutex.unlock();
778 
779 #ifdef SHARED_NOTHING_DEMO_2
780 
781         // change offset if it's shared nothing
782         /* Get the extent #, divide by # of PMs, calculate base offset for the new extent #,
783         	add extent offset */
784         if (oid >= 10000)
785             offset = (((offset / extentSize) / iom->pmCount) * extentSize) + (offset % extentSize);
786 
787 #endif
788 
789         longSeekOffset = (uint64_t)offset * (uint64_t)fileBlockSize;
790         lldiv_t cmpOffFact = lldiv(longSeekOffset, (4LL * 1024LL * 1024LL));
791         totalRqst++;
792 
793         uint32_t readCount = 0;
794         uint32_t bytesRead = 0;
795         uint32_t compressedBytesRead = 0; // @Bug 3149.  IOMTrace was not reporting bytesRead correctly for compressed columns.
796         uint32_t jend = blocksRequested / iom->blocksPerRead;
797 
798         if (iom->IOTrace())
799             clock_gettime(CLOCK_REALTIME, &tm);
800 
801         ostringstream errMsg;
802         bool errorOccurred = false;
803         string errorString;
804 #ifdef IDB_COMP_POC_DEBUG
805         bool debugWrite = false;
806 #endif
807 
808 #ifdef EM_AS_A_TABLE_POC__
809         dlen = 1;
810 #endif
811 
812         if (blocksRequested % iom->blocksPerRead)
813             jend++;
814 
815         for (j = 0; j < jend; j++)
816         {
817 
818             int decompRetryCount = 0;
819             int retryReadHeadersCount = 0;
820 
821 decompRetry:
822             blocksThisRead = std::min(dlen, iom->blocksPerRead);
823             readSize = blocksThisRead * BLOCK_SIZE;
824 
825             acc = 0;
826 
827             while (acc < readSize)
828             {
829 #if defined(EM_AS_A_TABLE_POC__)
830 
831                 if (oid == 1084)
832                 {
833                     uint32_t h;
834                     int32_t o = 0;
835                     int32_t* ip;
836                     ip = (int32_t*)(&alignedbuff[acc]);
837 
838                     for (o = 0; o < 2048; o++)
839                     {
840                         if (iom->dbrm()->getHWM(o + 3000, h) == 0)
841                             *ip++ = h;
842                         else
843                             *ip++ = numeric_limits<int32_t>::min() + 1;
844                     }
845 
846                     i = BLOCK_SIZE;
847                 }
848                 else
849                     i = pread(fd, &alignedbuff[acc], readSize - acc, longSeekOffset);
850 
851 #else
852 
853                 if (fdit->second->isCompressed())
854                 {
855 retryReadHeaders:
856                     //hdrs may have been modified since we cached them in fdit->second...
857                     time_t cur_mtime = numeric_limits<time_t>::max();
858                     int updatePtrsRc = 0;
859                     fdMapMutex.lock();
860                     time_t fp_mtime = fp->mtime();
861 
862                     if ( fp_mtime != (time_t) - 1)
863                         cur_mtime = fp_mtime;
864 
865                     if (decompRetryCount > 0 || retryReadHeadersCount > 0 || cur_mtime > fdit->second->cmpMTime)
866                         updatePtrsRc = updateptrs(&alignedbuff[0], fdit, decompressor);
867 
868                     fdMapMutex.unlock();
869 
870                     int idx = cmpOffFact.quot;
871 
872                     if (updatePtrsRc != 0 || idx >= (signed)fdit->second->ptrList.size())
873                     {
874                         // Due to race condition, the header on disk may not upated yet.
875                         // Log an info message and retry.
876                         if (retryReadHeadersCount == 0)
877                         {
878                             Message::Args args;
879                             args.add(oid);
880                             ostringstream infoMsg;
881                             iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
882                             infoMsg << "retry updateptrs for " << fileNamePtr
883                                     << ". rc=" << updatePtrsRc << ", idx="
884                                     << idx << ", ptr.size=" << fdit->second->ptrList.size();
885                             args.add(infoMsg.str());
886                             primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
887                         }
888 
889                         if (++retryReadHeadersCount < 30)
890                         {
891                             waitForRetry(retryReadHeadersCount);
892                             fdit->second->cmpMTime = 0;
893                             goto retryReadHeaders;
894                         }
895                         else
896                         {
897                             // still fail after all the retries.
898                             errorOccurred = true;
899                             errMsg << "Error reading compression header. rc=" << updatePtrsRc << ", idx="
900                                    << idx << ", ptr.size=" << fdit->second->ptrList.size();
901                             errorString = errMsg.str();
902                             break;
903                         }
904                     }
905 
906                     //FIXME: make sure alignedbuff can hold fdit->second->ptrList[idx].second bytes
907                     if (fdit->second->ptrList[idx].second > maxCompSz)
908                     {
909                         errorOccurred = true;
910                         errMsg << "aligned buff too small. dataSize="
911                                << fdit->second->ptrList[idx].second
912                                << ", buffSize=" << maxCompSz;
913                         errorString = errMsg.str();
914                         break;
915                     }
916 
917                     i = fp->pread(&alignedbuff[0], fdit->second->ptrList[idx].first, fdit->second->ptrList[idx].second );
918 #ifdef IDB_COMP_POC_DEBUG
919                     {
920                         boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
921                         cout << boldStart << "pread1.1(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec << ", " << fdit->second->ptrList[idx].second <<
922                              ", " << fdit->second->ptrList[idx].first << ") = " << i << ' ' << cmpOffFact.quot << ' ' << cmpOffFact.rem << boldStop << endl;
923                     }
924 #endif
925 
926                     // @bug3407, give it some retries if pread failed.
927                     if (i != (ssize_t)fdit->second->ptrList[idx].second)
928                     {
929                         // log an info message
930                         if (retryReadHeadersCount == 0)
931                         {
932                             Message::Args args;
933                             args.add(oid);
934                             ostringstream infoMsg;
935                             iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
936                             infoMsg << " Read from " << fileNamePtr << " failed at chunk " << idx
937                                     << ". (offset, size, actuall read) = ("
938                                     << fdit->second->ptrList[idx].first << ", "
939                                     << fdit->second->ptrList[idx].second << ", " << i << ")";
940                             args.add(infoMsg.str());
941                             primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
942                         }
943 
944                         if (++retryReadHeadersCount < 30)
945                         {
946                             waitForRetry(retryReadHeadersCount);
947                             fdit->second->cmpMTime = 0;
948                             goto retryReadHeaders;
949                         }
950                         else
951                         {
952                             errorOccurred = true;
953                             errMsg << "Error reading chunk " << idx;
954                             errorString = errMsg.str();
955                             break;
956                         }
957                     }
958 
959                     compressedBytesRead += i; // @Bug 3149.
960                     i = readSize;
961                 }
962                 else
963                 {
964                     i = fp->pread(&alignedbuff[acc], longSeekOffset, readSize - acc);
965 #ifdef IDB_COMP_POC_DEBUG
966                     {
967                         boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
968                         cout << "pread1.2(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[acc] << dec << ", " << (readSize - acc) <<
969                              ", " << longSeekOffset << ") = " << i << ' ' << cmpOffFact.quot << ' ' << cmpOffFact.rem << endl;
970                     }
971 #endif
972                 }
973 
974 #endif
975 
976                 if (i < 0 && errno == EINTR)
977                 {
978                     continue;
979                 }
980                 else if (i < 0)
981                 {
982                     try
983                     {
984                         iom->buildOidFileName(oid, dbroot, partNum, segNum,  fileNamePtr);
985                     }
986                     catch (exception& exc)
987                     {
988                         cerr << "FileName Err:" << exc.what() << endl;
989                         strcpy(fileNamePtr, "unknown filename");
990                     }
991 
992                     errorString   = strerror(errno);
993                     errorOccurred = true;
994                     errMsg << "thr_popper: Error reading file for OID " << oid << "; "
995                            << " fp " << fp << "; offset " << longSeekOffset << "; fileName "
996                            << fileNamePtr << "; " << errorString;
997                     break; // break from "while(acc..." loop
998                 }
999                 else if (i == 0)
1000                 {
1001                     iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
1002                     errorString   = "early EOF";
1003                     errorOccurred = true;
1004                     errMsg << "thr_popper: Early EOF reading file for OID " <<
1005                            oid << "; " << fileNamePtr;
1006                     break; // break from "while(acc..." loop
1007                 }
1008 
1009                 acc += i;
1010                 longSeekOffset += (uint64_t)i;
1011                 readCount++;
1012                 bytesRead += i;
1013             } // while(acc...
1014 
1015             //..Break out of "for (j..." read loop if error occurred
1016             if (errorOccurred)
1017             {
1018                 Message::Args args;
1019                 args.add(oid);
1020                 args.add(errorString);
1021                 primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
1022                 iom->handleBlockReadError( fr, errMsg.str(), &copyLocked );
1023                 break;
1024             }
1025 
1026             blocksRead += blocksThisRead;
1027 
1028             if (iom->IOTrace())
1029                 clock_gettime(CLOCK_REALTIME, &tm2);
1030 
1031             /* New bulk VSS lookup code */
1032             {
1033                 vector<BRM::LBID_t> lbids;
1034                 vector<BRM::VER_t> versions;
1035                 vector<bool> isLocked;
1036 
1037                 for (i = 0; (uint32_t) i < blocksThisRead; i++)
1038                     lbids.push_back((BRM::LBID_t) (lbid + i) + (j * iom->blocksPerRead));
1039 
1040                 if (blocksRequested > 1 || !flg)  // prefetch, or an unversioned single-block read
1041                     iom->dbrm()->bulkGetCurrentVersion(lbids, &versions, &isLocked);
1042                 else     // a single-block read that was versioned
1043                 {
1044                     versions.push_back(ver);
1045                     isLocked.push_back(false);
1046                 }
1047 
1048                 uint8_t* ptr = (uint8_t*)&alignedbuff[0];
1049 
1050                 if (blocksThisRead > 0 && fdit->second->isCompressed())
1051                 {
1052 #ifdef _MSC_VER
1053                     unsigned int blen = 4 * 1024 * 1024 + 4;
1054 #else
1055                     uint32_t blen = 4 * 1024 * 1024 + 4;
1056 #endif
1057 #ifdef IDB_COMP_POC_DEBUG
1058                     {
1059                         boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
1060                         cout << "decompress(0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec << ", " << fdit->second->ptrList[cmpOffFact.quot].second << ", 0x" << hex << (ptrdiff_t)uCmpBuf << dec << ", " << blen << ")" << endl;
1061                     }
1062 #endif
1063                     int dcrc = decompressor.uncompressBlock(&alignedbuff[0],
1064                                                             fdit->second->ptrList[cmpOffFact.quot].second, uCmpBuf, blen);
1065 
1066                     if (dcrc != 0)
1067                     {
1068 #ifdef IDB_COMP_POC_DEBUG
1069                         boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
1070 #endif
1071 
1072                         if (++decompRetryCount < 30)
1073                         {
1074                             blocksRead -= blocksThisRead;
1075                             waitForRetry(decompRetryCount);
1076 
1077                             // log an info message every 10 retries
1078                             if (decompRetryCount == 1)
1079                             {
1080                                 Message::Args args;
1081                                 args.add(oid);
1082                                 ostringstream infoMsg;
1083                                 iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
1084                                 infoMsg << "decompress retry for " << fileNamePtr
1085                                         << " decompRetry chunk " << cmpOffFact.quot
1086                                         << " code=" << dcrc;
1087                                 args.add(infoMsg.str());
1088                                 primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
1089                             }
1090 
1091                             goto decompRetry;
1092                         }
1093 
1094                         cout << boldStart << "decomp returned " << dcrc << boldStop << endl;
1095 
1096                         errorOccurred = true;
1097                         Message::Args args;
1098                         args.add(oid);
1099                         errMsg << "Error decompressing block " << cmpOffFact.quot << " code=" << dcrc << " part=" << partNum << " seg=" << segNum;
1100                         args.add(errMsg.str());
1101                         primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
1102                         iom->handleBlockReadError( fr, errMsg.str(), &copyLocked );
1103                         break;
1104                     }
1105 
1106                     //FIXME: why doesn't this work??? (See later for why)
1107                     //ptr = &uCmpBuf[cmpOffFact.rem];
1108                     memcpy(ptr, &uCmpBuf[cmpOffFact.rem], blocksThisRead * BLOCK_SIZE);
1109 
1110                     // log the retries, if any
1111                     if (retryReadHeadersCount > 0 || decompRetryCount > 0)
1112                     {
1113                         Message::Args args;
1114                         args.add(oid);
1115                         ostringstream infoMsg;
1116                         iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
1117                         infoMsg << "Successfully uncompress " << fileNamePtr << " chunk "
1118                                 << cmpOffFact.quot << " @";
1119 
1120                         if (retryReadHeadersCount > 0)
1121                             infoMsg << " HeaderRetry:" << retryReadHeadersCount;
1122 
1123                         if (decompRetryCount > 0)
1124                             infoMsg << " UncompressRetry:" << decompRetryCount;
1125 
1126                         args.add(infoMsg.str());
1127                         primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
1128                     }
1129                 }
1130 
1131                 for (i = 0; useCache && (uint32_t) i < lbids.size(); i++)
1132                 {
1133                     if (!isLocked[i])
1134                     {
1135 #ifdef IDB_COMP_POC_DEBUG
1136                         {
1137                             if (debugWrite)
1138                             {
1139                                 boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
1140                                 cout << boldStart << "i = " << i << ", ptr = 0x" << hex << (ptrdiff_t)&ptr[i * BLOCK_SIZE] << dec << boldStop << endl;
1141                                 cout << boldStart;
1142 #if 0
1143                                 int32_t* i32p;
1144                                 i32p = (int32_t*)&ptr[i * BLOCK_SIZE];
1145 
1146                                 for (int iy = 0; iy < 2; iy++)
1147                                 {
1148                                     for (int ix = 0; ix < 8; ix++, i32p++)
1149                                         cout << *i32p << ' ';
1150 
1151                                     cout << endl;
1152                                 }
1153 
1154 #else
1155                                 int64_t* i64p;
1156                                 i64p = (int64_t*)&ptr[i * BLOCK_SIZE];
1157 
1158                                 for (int iy = 0; iy < 2; iy++)
1159                                 {
1160                                     for (int ix = 0; ix < 8; ix++, i64p++)
1161                                         cout << *i64p << ' ';
1162 
1163                                     cout << endl;
1164                                 }
1165 
1166 #endif
1167                                 cout << boldStop << endl;
1168                             }
1169                         }
1170 #endif
1171                         cacheInsertOps.push_back(CacheInsert_t(lbids[i], versions[i], (uint8_t*)
1172                                                                &alignedbuff[i * BLOCK_SIZE]));
1173                     }
1174                 }
1175 
1176                 if (useCache)
1177                 {
1178                     blocksLoaded += fbm->bulkInsert(cacheInsertOps);
1179                     cacheInsertOps.clear();
1180                 }
1181             }
1182 
1183             dlen -= blocksThisRead;
1184 
1185         } // for (j...
1186 
1187         fdMapMutex.lock();
1188 
1189         if (fdit->second.get())
1190             fdit->second->inUse--;
1191 
1192         fdit = fdcache.end();
1193         fdMapMutex.unlock();
1194 
1195         if (errorOccurred)
1196             continue;
1197 
1198         try
1199         {
1200             iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
1201             copyLocked = false;
1202         }
1203         catch (exception& e)
1204         {
1205             cout << "releaseRange: " << e.what() << endl;
1206         }
1207 
1208         fr->BlocksRead(blocksRead);
1209         fr->BlocksLoaded(blocksLoaded);
1210 
1211         //FIXME: This is why some code above doesn't work...
1212         if (fr->data != 0 && blocksRequested == 1)
1213             memcpy(fr->data, alignedbuff, BLOCK_SIZE);
1214 
1215         fr->frMutex().lock();
1216         fr->SetPredicate(fileRequest::COMPLETE);
1217         fr->frCond().notify_one();
1218         fr->frMutex().unlock();
1219 
1220         if (iom->IOTrace())
1221         {
1222             clock_gettime(CLOCK_REALTIME, &rqst2);
1223             timespec_sub(tm, tm2, tm3);
1224             timespec_sub(rqst1, rqst2, rqst3);
1225 
1226             // @Bug 3149.  IOMTrace was not reporting bytesRead correctly for compressed columns.
1227             uint32_t reportBytesRead = (compressedBytesRead > 0) ? compressedBytesRead : bytesRead;
1228 
1229             lFile
1230                     << left  << setw(5) << setfill(' ') << oid
1231                     << right << setw(5) << setfill(' ') << offset / extentSize << " "
1232                     << right << setw(11) << setfill(' ') << lbid << " "
1233                     << right << setw(9) << bytesRead / (readCount << 13) << " "
1234                     << right << setw(9)	<< blocksRequested << " "
1235                     << right << setw(10) << fixed << tm3 << " "
1236                     << right << fixed << ((double)(rqst1.tv_sec + (1.e-9 * rqst1.tv_nsec))) << " "
1237                     << right << setw(10) << fixed << rqst3 << " "
1238                     << right << setw(10) << fixed << longSeekOffset << " "
1239                     << right << setw(10) << fixed << reportBytesRead << " "
1240                     << right << setw(3) << fixed << dbroot << " "
1241                     << right << setw(3) << fixed << partNum << " "
1242                     << right << setw(3) << fixed << segNum << " "
1243                     << endl;
1244         }
1245 
1246     } // for(;;)
1247 
1248     delete [] uCmpBuf;
1249 
1250     lFile.close();
1251 
1252     //reaching here is an error...
1253 
1254     return 0;
1255 } // end thr_popper
1256 
1257 } // anonymous namespace
1258 
1259 namespace dbbc
1260 {
1261 
setReadLock()1262 void setReadLock()
1263 {
1264     localLock.read_lock();
1265 }
1266 
releaseReadLock()1267 void releaseReadLock()
1268 {
1269     localLock.read_unlock();
1270 }
1271 
dropFDCache()1272 void dropFDCache()
1273 {
1274     localLock.write_lock();
1275     fdcache.clear();
1276     localLock.write_unlock();
1277 }
purgeFDCache(std::vector<BRM::FileInfo> & files)1278 void purgeFDCache(std::vector<BRM::FileInfo>& files)
1279 {
1280     localLock.write_lock();
1281 
1282     FdCacheType_t::iterator fdit;
1283 
1284     for ( uint32_t i = 0; i < files.size(); i++)
1285     {
1286         FdEntry fdKey(files[i].oid, files[i].dbRoot, files[i].partitionNum, files[i].segmentNum, files[i].compType, NULL);
1287         fdit = fdcache.find(fdKey);
1288 
1289         if (fdit != fdcache.end())
1290             fdcache.erase(fdit);
1291     }
1292 
1293     localLock.write_unlock();
1294 }
1295 
ioManager(FileBufferMgr & fbm,fileBlockRequestQueue & fbrq,int thrCount,int bsPerRead)1296 ioManager::ioManager(FileBufferMgr& fbm,
1297                      fileBlockRequestQueue& fbrq,
1298                      int thrCount,
1299                      int bsPerRead):
1300     blocksPerRead(bsPerRead),
1301     fIOMfbMgr(fbm),
1302     fIOMRequestQueue(fbrq),
1303     fFileOp(false)
1304 {
1305     if (thrCount <= 0)
1306         thrCount = 1;
1307 
1308     if (thrCount > 256)
1309         thrCount = 256;
1310 
1311     fConfig = Config::makeConfig();
1312     string val = fConfig->getConfig("DBBC", "IOMTracing");
1313     int temp = 0;
1314 
1315     if (val.length() > 0) temp = static_cast<int>(Config::fromText(val));
1316 
1317     if (temp > 0)
1318         fIOTrace = true;
1319     else
1320         fIOTrace = false;
1321 
1322     val = fConfig->getConfig("DBBC", "MaxOpenFiles");
1323     temp = 0;
1324     fMaxOpenFiles = MAX_OPEN_FILES;
1325 
1326     if (val.length() > 0) temp = static_cast<int>(Config::fromText(val));
1327 
1328     if (temp > 0)
1329         fMaxOpenFiles = temp;
1330 
1331     val = fConfig->getConfig("DBBC", "DecreaseOpenFilesCount");
1332     temp = 0;
1333     fDecreaseOpenFilesCount = DECREASE_OPEN_FILES;
1334 
1335     if (val.length() > 0) temp = static_cast<int>(Config::fromText(val));
1336 
1337     if (temp > 0)
1338         fDecreaseOpenFilesCount = temp;
1339 
1340     // limit the number of files closed
1341     if (fDecreaseOpenFilesCount > (uint32_t)(0.75 * fMaxOpenFiles))
1342         fDecreaseOpenFilesCount = (uint32_t)(0.75 * fMaxOpenFiles);
1343 
1344     val = fConfig->getConfig("DBBC", "FDCacheTrace");
1345     temp = 0;
1346     fFDCacheTrace = false;
1347 
1348     if (val.length() > 0) temp = static_cast<int>(Config::fromText(val));
1349 
1350     if (temp > 0)
1351     {
1352         fFDCacheTrace = true;
1353 #ifdef _MSC_VER
1354         FDTraceFile().open("C:/Calpont/log/trace/fdcache", ios_base::ate | ios_base::app);
1355 #else
1356         FDTraceFile().open(string(MCSLOGDIR) + "/trace/fdcache", ios_base::ate | ios_base::app);
1357 #endif
1358     }
1359 
1360     fThreadCount = thrCount;
1361     go();
1362 }
1363 
buildOidFileName(const BRM::OID_t oid,uint16_t dbRoot,const uint32_t partNum,const uint16_t segNum,char * file_name)1364 void ioManager::buildOidFileName(const BRM::OID_t oid, uint16_t dbRoot, const uint32_t partNum, const uint16_t segNum, char* file_name)
1365 {
1366     // when it's a request for the version buffer, the dbroot comes in as 0 for legacy reasons
1367     if (dbRoot == 0 && oid < 1000)
1368         dbRoot = fdbrm.getDBRootOfVBOID(oid);
1369 
1370     fFileOp.getFileNameForPrimProc(oid, file_name, dbRoot, partNum, segNum);
1371 }
1372 
localLbidLookup(BRM::LBID_t lbid,BRM::VER_t verid,bool vbFlag,BRM::OID_t & oid,uint16_t & dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,uint32_t & fileBlockOffset)1373 int ioManager::localLbidLookup(BRM::LBID_t lbid,
1374                                      BRM::VER_t verid,
1375                                      bool vbFlag,
1376                                      BRM::OID_t& oid,
1377                                      uint16_t& dbRoot,
1378                                      uint32_t& partitionNum,
1379                                      uint16_t& segmentNum,
1380                                      uint32_t& fileBlockOffset)
1381 {
1382     if (primitiveprocessor::noVB > 0)
1383         vbFlag = false;
1384 
1385     int rc = fdbrm.lookupLocal(lbid,
1386                                verid,
1387                                vbFlag,
1388                                oid,
1389                                dbRoot,
1390                                partitionNum,
1391                                segmentNum,
1392                                fileBlockOffset);
1393 
1394     return rc;
1395 }
1396 
1397 struct LambdaKludge
1398 {
LambdaKludgedbbc::LambdaKludge1399     LambdaKludge(ioManager* i) : iom(i) { }
~LambdaKludgedbbc::LambdaKludge1400     ~LambdaKludge()
1401     {
1402         iom = NULL;
1403     }
1404     ioManager* iom;
operator ()dbbc::LambdaKludge1405     void operator()()
1406     {
1407         thr_popper(iom);
1408     }
1409 };
1410 
createReaders()1411 void ioManager::createReaders()
1412 {
1413     int idx;
1414 
1415     for (idx = 0; idx < fThreadCount; idx++)
1416     {
1417         try
1418         {
1419             fThreadArr.create_thread(LambdaKludge(this));
1420         }
1421         catch (exception& e)
1422         {
1423             cerr << "IOM::createReaders() caught " << e.what() << endl;
1424             idx--;
1425             sleep(1);
1426             continue;
1427         }
1428     }
1429 }
1430 
~ioManager()1431 ioManager::~ioManager()
1432 {
1433     stop();
1434 }
1435 
go(void)1436 void ioManager::go(void)
1437 {
1438     createReaders();
1439 }
1440 
1441 
1442 //FIXME: is this right? what does this method do?
stop()1443 void ioManager::stop()
1444 {
1445     for (int idx = 0; idx < fThreadCount; idx++)
1446     {
1447         (void)0; //pthread_detach(fThreadArr[idx]);
1448     }
1449 }
1450 
1451 
getNextRequest()1452 fileRequest* ioManager::getNextRequest()
1453 {
1454     fileRequest* blk = 0;
1455 
1456     try
1457     {
1458         blk = fIOMRequestQueue.pop();
1459         return blk;
1460     }
1461     catch (exception&)
1462     {
1463         cerr << "ioManager::getNextRequest() ERROR " << endl;
1464     }
1465 
1466     return blk;
1467 
1468 }
1469 
1470 //------------------------------------------------------------------------------
1471 // Prints stderr msg and updates fileRequest object to reflect an error.
1472 // Lastly, notifies waiting thread that fileRequest has been completed.
1473 //------------------------------------------------------------------------------
handleBlockReadError(fileRequest * fr,const string & errMsg,bool * copyLocked,int errorCode)1474 void ioManager::handleBlockReadError( fileRequest* fr,
1475                                       const string& errMsg, bool* copyLocked, int errorCode )
1476 {
1477     try
1478     {
1479         dbrm()->releaseLBIDRange(fr->Lbid(), fr->BlocksRequested());
1480         *copyLocked = false;
1481     }
1482     catch (exception& e)
1483     {
1484         cout << "releaseRange on read error: " << e.what() << endl;
1485     }
1486 
1487     cerr << errMsg << endl;
1488     fr->RequestStatus(errorCode);
1489     fr->RequestStatusStr(errMsg);
1490 
1491     fr->frMutex().lock();
1492     fr->SetPredicate(fileRequest::COMPLETE);
1493     fr->frCond().notify_one();
1494     fr->frMutex().unlock();
1495 }
1496 
1497 }
1498 // vim:ts=4 sw=4:
1499