/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/***************************************************************************
 *
 *   $Id: filebuffermgr.cpp 2045 2013-01-30 20:26:59Z pleblanc $
 *
 *   jrodriguez@calpont.com                                                *
 *                                                                         *
 ***************************************************************************/
/**
 *  InitialDBBCSize - the starting number of elements in the unordered_set used to store disk blocks.
 *  This does not instantiate InitialDBBCSize disk blocks; it only sets the initial size of the unordered_set.
 **/

//#define NDEBUG
#include <cassert>
#include <limits>
#include <boost/thread.hpp>

#ifndef _MSC_VER
#include <pthread.h>
#endif

#include "stats.h"
#include "configcpp.h"
#include "filebuffermgr.h"
#include "mcsconfig.h"

using namespace config;
using namespace boost;
using namespace std;
using namespace BRM;

extern dbbc::Stats* gPMStatsPtr;
extern bool gPMProfOn;
extern uint32_t gSession;

namespace dbbc
{
const uint32_t gReportingFrequencyMin(32768);

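// numBlcks is the maximum number of blocks the cache will hold, blkSz is the
// block size, and deleteBlocks is how many LRU entries depleteCache() evicts
// per call. The FileBuffer pool's capacity is reserved up front and the
// block-cache trace log is opened in append mode.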
FileBufferMgr::FileBufferMgr(const uint32_t numBlcks, const uint32_t blkSz, const uint32_t deleteBlocks)
    : fMaxNumBlocks(numBlcks),
      fBlockSz(blkSz),
      fWLock(),
      fbSet(),
      fbList(),
      fCacheSize(0),
      fFBPool(),
      fDeleteBlocks(deleteBlocks),
      fEmptyPoolSlots(),
      fReportFrequency(0)
{
    fFBPool.reserve(numBlcks);
    fConfig = Config::makeConfig();
    setReportingFrequency(0);
#ifdef _MSC_VER
    fLog.open("C:/Calpont/log/trace/bc", ios_base::app | ios_base::ate);
#else
    fLog.open(string(MCSLOGDIR) + "/trace/bc", ios_base::app | ios_base::ate);
#endif
}

FileBufferMgr::~FileBufferMgr()
{
    flushCache();
}

// param d is used as a toggle only
void FileBufferMgr::setReportingFrequency(const uint32_t d)
{
    if (d == 0)
    {
        fReportFrequency = 0;
        return;
    }

    const string val = fConfig->getConfig("DBBC", "ReportFrequency");
    uint32_t temp = 0;

    if (val.length() > 0) temp = static_cast<int>(Config::fromText(val));

    if (temp > 0 && temp <= gReportingFrequencyMin)
        fReportFrequency = gReportingFrequencyMin;
    else
        fReportFrequency = temp;
}

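// Drops every cached block: the lookup set, the LRU list and the empty-slot
// list are cleared and fCacheSize is reset. The FileBuffer pool itself is
// cleared afterwards, outside the swap block (see the comment below).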
void FileBufferMgr::flushCache()
{
    boost::mutex::scoped_lock lk(fWLock);
    {
        filebuffer_uset_t sEmpty;
        filebuffer_list_t lEmpty;
        emptylist_t vEmpty;

        fbList.swap(lEmpty);
        fbSet.swap(sEmpty);
        fEmptyPoolSlots.swap(vEmpty);
    }
    fCacheSize = 0;

    // the block pool should not be freed in the above block to allow us
    // to continue doing concurrent unprotected-but-"safe" memcpys
    // from that memory
    if (fReportFrequency)
    {
        fLog << "Clearing entire cache" << endl;
    }
    fFBPool.clear();
//	fFBPool.reserve(fMaxNumBlocks);
}

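// Removes the single (lbid, ver) entry from the cache, if present, and
// returns its FileBuffer pool slot to fEmptyPoolSlots for reuse.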
void FileBufferMgr::flushOne(const BRM::LBID_t lbid, const BRM::VER_t ver)
{
    //similar in function to depleteCache()
    boost::mutex::scoped_lock lk(fWLock);

    filebuffer_uset_iter_t iter = fbSet.find(HashObject_t(lbid, ver, 0));

    if (iter != fbSet.end())
    {
        //remove it from fbList
        uint32_t idx = iter->poolIdx;
        fbList.erase(fFBPool[idx].listLoc());
        //add to fEmptyPoolSlots
        fEmptyPoolSlots.push_back(idx);
        //remove it from fbSet
        fbSet.erase(iter);
        //adjust fCacheSize
        fCacheSize--;
    }
}

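// Removes each (LBID, version) pair in laVptr[0..cnt-1] from the cache,
// returning the freed pool slots to fEmptyPoolSlots.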
void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt)
{
    boost::mutex::scoped_lock lk(fWLock);

    BRM::LBID_t lbid;
    BRM::VER_t ver;
    filebuffer_uset_iter_t iter;

    if (fReportFrequency)
    {
        fLog << "flushMany " << cnt << " items: ";
        for (uint32_t j = 0; j < cnt; j++)
        {
            fLog << "lbid: " << laVptr[j].LBID << " ver: " << laVptr[j].Ver << ", ";
        }
        fLog << endl;
    }

    for (uint32_t j = 0; j < cnt; j++)
    {
        lbid = static_cast<BRM::LBID_t>(laVptr->LBID);
        ver = static_cast<BRM::VER_t>(laVptr->Ver);
        iter = fbSet.find(HashObject_t(lbid, ver, 0));

        if (iter != fbSet.end())
        {
            if (fReportFrequency)
            {
                fLog << "flushMany hit, lbid: " << lbid << " index: " << iter->poolIdx << endl;
            }
            //remove it from fbList
            uint32_t idx = iter->poolIdx;
            fbList.erase(fFBPool[idx].listLoc());
            //add to fEmptyPoolSlots
            fEmptyPoolSlots.push_back(idx);
            //remove it from fbSet
            fbSet.erase(iter);
            //adjust fCacheSize
            fCacheSize--;
        }

        ++laVptr;
    }
}

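// Removes every cached version of each LBID listed in laVptr[0..cnt-1].
// The LBIDs are de-duplicated first, then the whole set is scanned once.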
void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt)
{
    filebuffer_uset_t::iterator it, tmpIt;
    tr1::unordered_set<LBID_t> uniquer;
    tr1::unordered_set<LBID_t>::iterator uit;

    boost::mutex::scoped_lock lk(fWLock);

    if (fReportFrequency)
    {
        fLog << "flushManyAllversion " << cnt << " items: ";
        for (uint32_t i = 0; i < cnt; i++)
        {
            fLog << laVptr[i] << ", ";
        }
        fLog << endl;
    }

    if (fCacheSize == 0 || cnt == 0)
        return;

    for (uint32_t i = 0; i < cnt; i++)
        uniquer.insert(laVptr[i]);

    for (it = fbSet.begin(); it != fbSet.end();)
    {
        if (uniquer.find(it->lbid) != uniquer.end())
        {
            if (fReportFrequency)
            {
                fLog << "flushManyAllversion hit: " << it->lbid << " index: " << it->poolIdx << endl;
            }
            const uint32_t idx = it->poolIdx;
            fbList.erase(fFBPool[idx].listLoc());
            fEmptyPoolSlots.push_back(idx);
            tmpIt = it;
            ++it;
            fbSet.erase(tmpIt);
            fCacheSize--;
        }
        else
            ++it;
    }
}

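// Removes all cached blocks belonging to the given column OIDs. The cache is
// first indexed by LBID, then every extent of every OID is walked. If the
// estimated number of extents to drop exceeds clearThreshold, or getExtents
// fails, the entire cache is flushed instead.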
void FileBufferMgr::flushOIDs(const uint32_t* oids, uint32_t count)
{
    DBRM dbrm;
    uint32_t i;
    vector<EMEntry> extents;
    int err;
    uint32_t currentExtent;
    LBID_t currentLBID;
    typedef tr1::unordered_multimap<LBID_t, filebuffer_uset_t::iterator> byLBID_t;
    byLBID_t byLBID;
    pair<byLBID_t::iterator, byLBID_t::iterator> itList;
    filebuffer_uset_t::iterator it;

    if (fReportFrequency)
    {
        fLog << "flushOIDs " << count << " items: ";
        for (uint32_t i = 0; i < count; i++)
        {
            fLog << oids[i] << ", ";
        }
        fLog << endl;
    }

    // If there are more than this # of extents to drop, the whole cache will be cleared
    const uint32_t clearThreshold = 50000;

    boost::mutex::scoped_lock lk(fWLock);

    if (fCacheSize == 0 || count == 0)
        return;

    /* Index the cache by LBID */
    for (it = fbSet.begin(); it != fbSet.end(); it++)
        byLBID.insert(pair<LBID_t, filebuffer_uset_t::iterator>(it->lbid, it));

    for (i = 0; i < count; i++)
    {
        extents.clear();
        err = dbrm.getExtents(oids[i], extents, true, true, true); // @Bug 3838 Include outofservice extents

        if (err < 0 || (i == 0 && (extents.size() * count) > clearThreshold))
        {
            // (The i == 0 should ensure it's not a dictionary column)
            lk.unlock();
            flushCache();
            return;
        }

        for (currentExtent = 0; currentExtent < extents.size(); currentExtent++)
        {
            EMEntry& range = extents[currentExtent];
            LBID_t lastLBID = range.range.start + (range.range.size * 1024);

            for (currentLBID = range.range.start; currentLBID < lastLBID;
                    currentLBID++)
            {
                itList = byLBID.equal_range(currentLBID);

                for (byLBID_t::iterator tmpIt = itList.first; tmpIt != itList.second;
                        tmpIt++)
                {
                    fbList.erase(fFBPool[tmpIt->second->poolIdx].listLoc());
                    fEmptyPoolSlots.push_back(tmpIt->second->poolIdx);
                    fbSet.erase(tmpIt->second);
                    fCacheSize--;
                }
            }
        }
    }
}

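// Removes cached blocks for the given OIDs, restricted to the given logical
// partitions (dbroot, partition, segment). Falls back to flushing the whole
// cache if getExtents fails, rather than returning an error to the caller.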
void FileBufferMgr::flushPartition(const vector<OID_t>& oids, const set<BRM::LogicalPartition>& partitions)
{
    DBRM dbrm;
    uint32_t i;
    vector<EMEntry> extents;
    int err;
    uint32_t currentExtent;
    LBID_t currentLBID;
    typedef tr1::unordered_multimap<LBID_t, filebuffer_uset_t::iterator> byLBID_t;
    byLBID_t byLBID;
    pair<byLBID_t::iterator, byLBID_t::iterator> itList;
    filebuffer_uset_t::iterator it;
    uint32_t count = oids.size();

    boost::mutex::scoped_lock lk(fWLock);

    if (fReportFrequency)
    {
        std::set<BRM::LogicalPartition>::iterator sit;
        fLog << "flushPartition oids: ";
        for (uint32_t i = 0; i < count; i++)
        {
            fLog << oids[i] << ", ";
        }
        fLog << "flushPartition partitions: ";
        for (sit = partitions.begin(); sit != partitions.end(); ++sit)
        {
            fLog << (*sit).toString() << ", ";
        }
        fLog << endl;
    }

    if (fCacheSize == 0 || oids.size() == 0 || partitions.size() == 0)
        return;

    /* Index the cache by LBID */
    for (it = fbSet.begin(); it != fbSet.end(); it++)
        byLBID.insert(pair<LBID_t, filebuffer_uset_t::iterator>(it->lbid, it));

    for (i = 0; i < count; i++)
    {
        extents.clear();
        err = dbrm.getExtents(oids[i], extents, true, true, true); // @Bug 3838 Include outofservice extents

        if (err < 0)
        {
            lk.unlock();
            flushCache();   // better than returning an error code to the user
            return;
        }

        for (currentExtent = 0; currentExtent < extents.size(); currentExtent++)
        {
            EMEntry& range = extents[currentExtent];

            LogicalPartition logicalPartNum(range.dbRoot, range.partitionNum, range.segmentNum);

            if (partitions.find(logicalPartNum) == partitions.end())
                continue;

            LBID_t lastLBID = range.range.start + (range.range.size * 1024);

            for (currentLBID = range.range.start; currentLBID < lastLBID; currentLBID++)
            {
                itList = byLBID.equal_range(currentLBID);

                for (byLBID_t::iterator tmpIt = itList.first; tmpIt != itList.second;
                        tmpIt++)
                {
                    fbList.erase(fFBPool[tmpIt->second->poolIdx].listLoc());
                    fEmptyPoolSlots.push_back(tmpIt->second->poolIdx);
                    fbSet.erase(tmpIt->second);
                    fCacheSize--;
                }
            }
        }
    }
}


bool FileBufferMgr::exists(const BRM::LBID_t& lbid, const BRM::VER_t& ver) const
{
    const HashObject_t fb(lbid, ver, 0);
    const bool b = exists(fb);
    return b;
}

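// Returns a pointer to the cached FileBuffer for keyFb, or NULL if it is not
// cached. A hit bumps the block's hit count and moves it to the front of the
// LRU list.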
FileBuffer* FileBufferMgr::findPtr(const HashObject_t& keyFb)
{
    boost::mutex::scoped_lock lk(fWLock);

    filebuffer_uset_iter_t it = fbSet.find(keyFb);

    if (fbSet.end() != it)
    {
        FileBuffer* fb = &(fFBPool[it->poolIdx]);
        fFBPool[it->poolIdx].listLoc()->hits++;
        fbList.splice( fbList.begin(), fbList, (fFBPool[it->poolIdx]).listLoc() );
        return fb;
    }

    return NULL;
}

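// Copies the cached FileBuffer for keyFb into fb and refreshes its LRU
// position. Returns true on a cache hit, false otherwise.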
bool FileBufferMgr::find(const HashObject_t& keyFb, FileBuffer& fb)
{
    bool ret = false;

    boost::mutex::scoped_lock lk(fWLock);

    filebuffer_uset_iter_t it = fbSet.find(keyFb);

    if (fbSet.end() != it)
    {
        fFBPool[it->poolIdx].listLoc()->hits++;
        fbList.splice( fbList.begin(), fbList, (fFBPool[it->poolIdx]).listLoc() );
        fb = fFBPool[it->poolIdx];
        ret = true;
    }

    return ret;
}

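// Copies the cached 8K block for keyFb into bufferPtr. The memcpy is done
// after releasing the lock; the pool's memory is not released by flushCache()
// (see the comment there). Returns true on a cache hit.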
bool FileBufferMgr::find(const HashObject_t& keyFb, void* bufferPtr)
{
    bool ret = false;

    if (gPMProfOn && gPMStatsPtr)
#ifdef _MSC_VER
        gPMStatsPtr->markEvent(keyFb.lbid, GetCurrentThreadId(), gSession, 'L');

#else
        gPMStatsPtr->markEvent(keyFb.lbid, pthread_self(), gSession, 'L');
#endif
    boost::mutex::scoped_lock lk(fWLock);

    if (gPMProfOn && gPMStatsPtr)
#ifdef _MSC_VER
        gPMStatsPtr->markEvent(keyFb.lbid, GetCurrentThreadId(), gSession, 'M');

#else
        gPMStatsPtr->markEvent(keyFb.lbid, pthread_self(), gSession, 'M');
#endif
    filebuffer_uset_iter_t it = fbSet.find(keyFb);

    if (fbSet.end() != it)
    {
        uint32_t idx = it->poolIdx;

        //@bug 669 LRU cache, move block to front of list as most recently used.
        fFBPool[idx].listLoc()->hits++;
        fbList.splice(fbList.begin(), fbList, (fFBPool[idx]).listLoc());
        lk.unlock();
        memcpy(bufferPtr, (fFBPool[idx]).getData(), 8192);

        if (gPMProfOn && gPMStatsPtr)
#ifdef _MSC_VER
            gPMStatsPtr->markEvent(keyFb.lbid, GetCurrentThreadId(), gSession, 'U');

#else
            gPMStatsPtr->markEvent(keyFb.lbid, pthread_self(), gSession, 'U');
#endif
        ret = true;
    }

    return ret;
}

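// Looks up 'count' (lbid, ver) pairs in one pass under a single lock. For
// each hit, wasCached[i] is set and the 8K block is copied into buffers[i]
// after the lock is released; returns the number of hits. The iterators are
// constructed with placement new on alloca'd storage and destroyed explicitly.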
uint32_t FileBufferMgr::bulkFind(const BRM::LBID_t* lbids, const BRM::VER_t* vers, uint8_t** buffers,
                                 bool* wasCached, uint32_t count)
{
    uint32_t i, ret = 0;
    filebuffer_uset_iter_t* it = (filebuffer_uset_iter_t*) alloca(count * sizeof(filebuffer_uset_iter_t));
    uint32_t* indexes = (uint32_t*) alloca(count * 4);

    if (gPMProfOn && gPMStatsPtr)
    {
        for (i = 0; i < count; i++)
        {
#ifdef _MSC_VER
            gPMStatsPtr->markEvent(lbids[i], GetCurrentThreadId(), gSession, 'L');
#else
            gPMStatsPtr->markEvent(lbids[i], pthread_self(), gSession, 'L');
#endif
        }
    }

    boost::mutex::scoped_lock lk(fWLock);

    if (gPMProfOn && gPMStatsPtr)
    {
        for (i = 0; i < count; i++)
        {
#ifdef _MSC_VER
            gPMStatsPtr->markEvent(lbids[i], GetCurrentThreadId(), gSession, 'M');
#else
            gPMStatsPtr->markEvent(lbids[i], pthread_self(), gSession, 'M');
#endif
        }
    }

    for (i = 0; i < count; i++)
    {
        new ((void*) &it[i]) filebuffer_uset_iter_t();
        it[i] = fbSet.find(HashObject_t(lbids[i], vers[i], 0));

        if (it[i] != fbSet.end())
        {
            indexes[i] = it[i]->poolIdx;
            wasCached[i] = true;
            fFBPool[it[i]->poolIdx].listLoc()->hits++;
            fbList.splice(fbList.begin(), fbList, (fFBPool[it[i]->poolIdx]).listLoc());
        }
        else
        {
            wasCached[i] = false;
            indexes[i] = 0;
        }
    }

    lk.unlock();

    for (i = 0; i < count; i++)
    {
        if (wasCached[i])
        {
            memcpy(buffers[i], fFBPool[indexes[i]].getData(), 8192);
            ret++;

            if (gPMProfOn && gPMStatsPtr)
            {
#ifdef _MSC_VER
                gPMStatsPtr->markEvent(lbids[i], GetCurrentThreadId(), gSession, 'U');
#else
                gPMStatsPtr->markEvent(lbids[i], pthread_self(), gSession, 'U');
#endif
            }
        }

        it[i].filebuffer_uset_iter_t::~filebuffer_uset_iter_t();
    }

    return ret;
}

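// Returns true if fb is cached. Note that a hit also bumps the block's hit
// count and refreshes its position in the LRU list.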
bool FileBufferMgr::exists(const HashObject_t& fb) const
{
    bool find_bool = false;
    boost::mutex::scoped_lock lk(fWLock);

    filebuffer_uset_iter_t it = fbSet.find(fb);

    if (it != fbSet.end())
    {
        find_bool = true;
        fFBPool[it->poolIdx].listLoc()->hits++;
        fbList.splice(fbList.begin(), fbList, (fFBPool[it->poolIdx]).listLoc());
    }

    return find_bool;
}

// default insert operation.
// add a new fb into fbSet and to fbList;
// new fbs are added to the front of the list and age out from the back.
//@bug 665: keep filebuffer in a vector. HashObject keeps the index of the filebuffer

int FileBufferMgr::insert(const BRM::LBID_t lbid, const BRM::VER_t ver, const uint8_t* data)
{
    int ret = 0;

    if (gPMProfOn && gPMStatsPtr)
#ifdef _MSC_VER
        gPMStatsPtr->markEvent(lbid, GetCurrentThreadId(), gSession, 'I');

#else
        gPMStatsPtr->markEvent(lbid, pthread_self(), gSession, 'I');
#endif

    boost::mutex::scoped_lock lk(fWLock);

    HashObject_t fbIndex(lbid, ver, 0);
    filebuffer_pair_t pr = fbSet.insert(fbIndex);

    if (pr.second)
    {
        // It was inserted (it wasn't there before)
        // Right now we have an invalid cache: we have inserted an entry with a -1 index.
        // We need to fix this quickly...
        fCacheSize++;
        FBData_t fbdata = {lbid, ver, 0};
        fbList.push_front(fbdata);
        fBlksLoaded++;

        if (fReportFrequency && (fBlksLoaded % fReportFrequency) == 0)
        {
            struct timespec tm;
            clock_gettime(CLOCK_MONOTONIC, &tm);
            fLog << "insert: "
                    << left << fixed << ((double)(tm.tv_sec + (1.e-9 * tm.tv_nsec))) << " "
                    << right << setw(12) << fBlksLoaded << " "
                    << right << setw(12) << fBlksNotUsed << endl;
        }
    }
    else
    {
        // if it's a duplicate there's nothing to do
        if (gPMProfOn && gPMStatsPtr)
#ifdef _MSC_VER
            gPMStatsPtr->markEvent(lbid, GetCurrentThreadId(), gSession, 'D');

#else
            gPMStatsPtr->markEvent(lbid, pthread_self(), gSession, 'D');
#endif
        return ret;
    }

    uint32_t pi = numeric_limits<int>::max();

    if (fCacheSize > maxCacheSize())
    {
        // If the insert above caused the cache to exceed its max size, find the lru block in
        // the cache and use its pool index to store the block data.
        FBData_t& fbdata = fbList.back();   //the lru block
        HashObject_t lastFB(fbdata.lbid, fbdata.ver, 0);
        filebuffer_uset_iter_t iter = fbSet.find( lastFB ); //should be there

        idbassert(iter != fbSet.end());
        pi = iter->poolIdx;
        idbassert(pi < maxCacheSize());
        idbassert(pi < fFBPool.size());

        // set iters are always const. We are not changing the hash here, and this gets us
        // the pointer we need cheaply...
        HashObject_t& ref = const_cast<HashObject_t&>(*pr.first);
        ref.poolIdx = pi;

        //replace the lru block with this block
        FileBuffer fb(lbid, ver, NULL, 0);
        fFBPool[pi] = fb;
        fFBPool[pi].setData(data, 8192);
        fbSet.erase(iter);

        if (fbList.back().hits == 0)
            fBlksNotUsed++;

        fbList.pop_back();
        fCacheSize--;
        depleteCache();
        ret = 1;
    }
    else
    {
        if ( ! fEmptyPoolSlots.empty() )
        {
            pi = fEmptyPoolSlots.front();
            fEmptyPoolSlots.pop_front();
            FileBuffer fb(lbid, ver, NULL, 0);
            fFBPool[pi] = fb;
            fFBPool[pi].setData(data, 8192);
        }
        else
        {
            pi = fFBPool.size();
            FileBuffer fb(lbid, ver, NULL, 0);
            fFBPool.push_back(fb);
            fFBPool[pi].setData(data, 8192);
        }

        // See comment above
        HashObject_t& ref = const_cast<HashObject_t&>(*pr.first);
        ref.poolIdx = pi;
        ret = 1;
    }

    idbassert(pi < fFBPool.size());
    fFBPool[pi].listLoc(fbList.begin());

    if (gPMProfOn && gPMStatsPtr)
#ifdef _MSC_VER
        gPMStatsPtr->markEvent(lbid, GetCurrentThreadId(), gSession, 'J');

#else
        gPMStatsPtr->markEvent(lbid, pthread_self(), gSession, 'J');
#endif

    idbassert(fCacheSize <= maxCacheSize());
//	idbassert(fCacheSize == fbSet.size());
//	idbassert(fCacheSize == fbList.size());
    return ret;
}

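// Evicts up to fDeleteBlocks entries from the LRU end of the list, returning
// their pool slots to fEmptyPoolSlots for reuse.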
void FileBufferMgr::depleteCache()
{
    for (uint32_t i = 0; i < fDeleteBlocks && !fbList.empty(); ++i)
    {
        FBData_t fbdata(fbList.back());   //the lru block
        HashObject_t lastFB(fbdata.lbid, fbdata.ver, 0);
        filebuffer_uset_iter_t iter = fbSet.find( lastFB );

        idbassert(iter != fbSet.end());
        uint32_t idx = iter->poolIdx;
        idbassert(idx < fFBPool.size());
        //Save position in FileBuffer pool for reuse.
        fEmptyPoolSlots.push_back(idx);
        fbSet.erase(iter);

        if (fbList.back().hits == 0)
            fBlksNotUsed++;

        fbList.pop_back();
        fCacheSize--;
    }
}

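// Writes the LRU list to os, one "lbid<tab>ver" pair per line, from most
// recently used (front) to least recently used (back).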
ostream& FileBufferMgr::formatLRUList(ostream& os) const
{
    filebuffer_list_t::const_iterator iter = fbList.begin();
    filebuffer_list_t::const_iterator end = fbList.end();

    while (iter != end)
    {
        os << iter->lbid << '\t' << iter->ver << endl;
        ++iter;
    }

    return os;
}

// puts the new entry at the front of the list
void FileBufferMgr::updateLRU(const FBData_t& f)
{
    if (fCacheSize > maxCacheSize())
    {
        list<FBData_t>::iterator last = fbList.end();
        last--;
        FBData_t& fbdata = *last;
        HashObject_t lastFB(fbdata.lbid, fbdata.ver, 0);
        filebuffer_uset_iter_t iter = fbSet.find(lastFB);
        fEmptyPoolSlots.push_back(iter->poolIdx);

        if (fbdata.hits == 0)
            fBlksNotUsed++;

        fbSet.erase(iter);
        fbList.splice(fbList.begin(), fbList, last);
        fbdata = f;
        fCacheSize--;
        //cout << "booted an entry\n";
    }
    else
    {
        //cout << "new entry\n";
        fbList.push_front(f);
    }
}

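// Copies 'data' into a pool slot (a recycled slot from fEmptyPoolSlots if one
// is available, otherwise a new slot appended to the pool) and returns the
// slot index.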
uint32_t FileBufferMgr::doBlockCopy(const BRM::LBID_t& lbid, const BRM::VER_t& ver, const uint8_t* data)
{
    uint32_t poolIdx;

    if (!fEmptyPoolSlots.empty())
    {
        poolIdx = fEmptyPoolSlots.front();
        fEmptyPoolSlots.pop_front();
    }
    else
    {
        poolIdx = fFBPool.size();
        fFBPool.resize(poolIdx + 1);   //shouldn't trigger a 'real' resize b/c of the reserve call
    }

    fFBPool[poolIdx].Lbid(lbid);
    fFBPool[poolIdx].Verid(ver);
    fFBPool[poolIdx].setData(data);
    return poolIdx;
}

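// Inserts a batch of blocks under a single lock acquisition. Entries already
// in the cache are skipped; returns the number of blocks actually inserted.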
int FileBufferMgr::bulkInsert(const vector<CacheInsert_t>& ops)
{
    uint32_t i;
    int32_t pi;
    int ret = 0;

    boost::mutex::scoped_lock lk(fWLock);

    if (fReportFrequency)
    {
        fLog << "bulkInsert: ";
    }

    for (i = 0; i < ops.size(); i++)
    {
        const CacheInsert_t& op = ops[i];

        if (gPMProfOn && gPMStatsPtr)
#ifdef _MSC_VER
            gPMStatsPtr->markEvent(op.lbid, GetCurrentThreadId(), gSession, 'I');

#else
            gPMStatsPtr->markEvent(op.lbid, pthread_self(), gSession, 'I');
#endif

        HashObject_t fbIndex(op.lbid, op.ver, 0);
        filebuffer_pair_t pr = fbSet.insert(fbIndex);

        if (!pr.second)
        {
            if (gPMProfOn && gPMStatsPtr)
#ifdef _MSC_VER
                gPMStatsPtr->markEvent(op.lbid, GetCurrentThreadId(), gSession, 'D');

#else
                gPMStatsPtr->markEvent(op.lbid, pthread_self(), gSession, 'D');
#endif
            continue;
        }

        if (fReportFrequency)
        {
            fLog << op.lbid << " " << op.ver << ", ";
        }
        fCacheSize++;
        fBlksLoaded++;
        FBData_t fbdata = {op.lbid, op.ver, 0};
        updateLRU(fbdata);
        pi = doBlockCopy(op.lbid, op.ver, op.data);

        HashObject_t& ref = const_cast<HashObject_t&>(*pr.first);
        ref.poolIdx = pi;
        fFBPool[pi].listLoc(fbList.begin());

        if (gPMProfOn && gPMStatsPtr)
#ifdef _MSC_VER
            gPMStatsPtr->markEvent(op.lbid, GetCurrentThreadId(), gSession, 'J');

#else
            gPMStatsPtr->markEvent(op.lbid, pthread_self(), gSession, 'J');
#endif
        ret++;
    }

    if (fReportFrequency)
    {
        fLog << endl;
    }
    idbassert(fCacheSize <= maxCacheSize());

    return ret;
}


}