1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // CegoBufferPool.cc
4 // -----------------
5 // Cego buffer pool implementation module
6 //
7 // Design and Implementation by Bjoern Lemke
8 //
9 // (C)opyright 2000-2019 Bjoern Lemke
10 //
11 // IMPLEMENTATION MODULE
12 //
13 // Class: CegoBufferPool
14 //
15 // Description: The buffer pool management class
16 //
17 // Status: CLEAN
18 //
19 ///////////////////////////////////////////////////////////////////////////////
20
21 // LFC INCLUDES
22 #include <lfcbase/Exception.h>
23 #include <lfcbase/CommandExecuter.h>
24 #include <lfcbase/Datetime.h>
25 #include <lfcbase/Sleeper.h>
26 #include <lfcbase/ThreadLock.h>
27
28 // CEGO INCLUDES
29 #include "CegoBufferPool.h"
30 #include "CegoDefs.h"
31 #include "CegoXMLdef.h"
32 #include "CegoCheckpointDump.h"
33
34 // POSIX INCLUDES
35 #include <string.h>
36 #include <stdlib.h>
37
38 #define BUFFERPOOLHEAD_SIZE (((sizeof(BufferPoolHead)-1)/BUPMNG_ALIGNMENT)+1)*BUPMNG_ALIGNMENT
39 #define BUFFERHEAD_SIZE (((sizeof(BufferHead)-1)/BUPMNG_ALIGNMENT)+1)*BUPMNG_ALIGNMENT
40
41 static ThreadLock _lmLock[TABMNG_MAXTABSET];
42 extern bool __lockStatOn;
43
CegoBufferPool(const Chain & xmlDef,const Chain & logFile,const Chain & progName)44 CegoBufferPool::CegoBufferPool(const Chain& xmlDef, const Chain& logFile, const Chain& progName) : CegoLogManager(xmlDef, logFile, progName)
45 {
46 _pBufPool = 0;
47 _numDiskRead=0;
48 _numDiskWrite=0;
49 _fixCount=0;
50 _fixTry=0;
51 _avgReadDelay=0;
52 _avgWriteDelay=0;
53 _cpCount = 0;
54
55 for (int i=0;i< TABMNG_MAXTABSET;i++)
56 {
57 _lmLock[i].init(LCKMNG_LOCKWAITDELAY, __lockStatOn);
58 }
59
60 Datetime ts;
61 _statStart = ts.asLong();
62 _poolStart = ts.asLong();
63 _modId = getModId("CegoBufferPool");
64 }
65
~CegoBufferPool()66 CegoBufferPool::~CegoBufferPool()
67 {
68 }
69
initPool(unsigned long long numSegment,unsigned long long numPages)70 void CegoBufferPool::initPool(unsigned long long numSegment, unsigned long long numPages)
71 {
72 #ifdef DEBUG
73 log(_modId, Logger::DEBUG, Chain("Reading xml def ..."));
74 #endif
75
76 Chain dbName = getDbName();
77 int pageSize = getPageSize();
78 _maxFixTries = getMaxFixTries();
79 _maxPageDelete = getMaxPageDelete();
80
81 _dbName = dbName;
82 _numSegment = numSegment;
83 _numPages = numPages;
84 _pageSize = pageSize;
85
86 if (_pBufPool == 0)
87 {
88
89 log(_modId, Logger::NOTICE, Chain("Allocating ") + Chain(_numSegment) + Chain(" buffer pool segments ( each ") + Chain(_numPages) + Chain(" pages ) ..."));
90
91
92 _pBufPool = (void**)malloc(_numSegment * sizeof(void*));
93 if ( _pBufPool == NULL )
94 {
95 throw Exception(EXLOC, "Cannot initialize pool");
96 }
97
98
99 for ( int i=0; i<_numSegment; i++)
100 {
101 _pBufPool[i] = malloc(_numPages * (BUFFERHEAD_SIZE + _pageSize) + BUFFERPOOLHEAD_SIZE);
102
103 if ( _pBufPool[i] == NULL )
104 {
105 throw Exception(EXLOC, "Cannot initialize pool");
106 }
107
108 log(_modId, Logger::NOTICE, Chain("Initializing buffer pool pages for segment ") + Chain(i) + Chain(" ..."));
109
110 void *base = (void*)_pBufPool[i];
111 void *ptr = (void*)_pBufPool[i];
112
113 BufferPoolHead bph;
114 bph.numPages=_numPages;
115
116 memcpy(base, &bph, sizeof(BufferPoolHead));
117 // memcpy(base, &bph, BUFFERPOOLHEAD_SIZE);
118
119 ptr = (void*)(BUFFERPOOLHEAD_SIZE + (long long)base);
120
121 BufferHead ibh;
122 ibh.isOccupied = NOT_OCCUPIED;
123 ibh.isDirty = 0;
124 ibh.numFixes = 0;
125 ibh.tabSetId = 0;
126 ibh.pageId = 0;
127 ibh.fixStat = 0;
128 ibh.numUsage = 0;
129
130 for (int i = 0; i<_numPages; i++)
131 {
132 memcpy(ptr, &ibh, BUFFERHEAD_SIZE );
133 ptr = (void*)((long long)ptr + BUFFERHEAD_SIZE + _pageSize);
134 }
135
136 log(_modId, Logger::NOTICE, Chain("Buffer pool initialized"));
137 }
138 }
139 else
140 {
141 throw Exception(EXLOC, "Buffer pool already created");
142 }
143 }
144
removePool()145 void CegoBufferPool::removePool()
146 {
147 if (_pBufPool)
148 {
149 log(_modId, Logger::NOTICE, Chain("Removing pool ..."));
150
151 for ( int i=0; i<_numSegment; i++)
152 {
153 free ( _pBufPool[i] );
154 }
155 free ( _pBufPool );
156
157 _pBufPool = 0;
158 log(_modId, Logger::NOTICE, Chain("Pool removed"));
159 }
160 }
161
getDBName()162 Chain CegoBufferPool::getDBName()
163 {
164 return _dbName;
165 }
166
bufferFix(CegoBufferPage & bp,int tabSetId,PageIdType pageId,FixMode m,CegoLockHandler * pLockHandle,int numTry)167 void CegoBufferPool::bufferFix(CegoBufferPage &bp, int tabSetId, PageIdType pageId, FixMode m, CegoLockHandler *pLockHandle, int numTry)
168 {
169 _fixCount++;
170
171 if ( _fixCount % BUPMNG_STATSPERIOD == 0 )
172 {
173 resetStats();
174 }
175
176 if (_pBufPool == 0)
177 {
178 throw Exception(EXLOC, "No valid bufferpool" );
179 }
180
181 int segid = calcSegment(pageId);
182
183 void *base = (void*)_pBufPool[segid];
184
185 int hashId = calcHash(pageId);
186
187 // cout << "PageId=" << pageId << ", Seg=" << segid << ", HashId=" << hashId << endl;
188
189 void* bufPtr = (void*)((long long)base + (long long)BUFFERPOOLHEAD_SIZE + (long long )hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ));
190
191 bool isFixed = false;
192
193 int numTries = 0;
194
195 // Step I : searching for possible slot
196
197 while ( isFixed == false && numTries < _maxFixTries )
198 {
199 _fixTry++;
200
201 BufferHead* pBH = (BufferHead*)bufPtr;
202
203 pLockHandle->lockBufferPool(hashId, CegoLockHandler::WRITE);
204
205 if ( pBH->isOccupied != NOT_OCCUPIED && pBH->pageId == pageId)
206 {
207 isFixed = true;
208
209 if ( m == CegoBufferPool::PERSISTENT )
210 pBH->isOccupied = PERSISTENT_OCCUPIED;
211 else if ( m == CegoBufferPool::NOSYNC && pBH->isOccupied == WRITE_ON_SYNC)
212 pBH->isOccupied = WRITE_ON_DIRTY;
213 else
214 {
215 pBH->isOccupied = WRITE_ON_SYNC;
216 }
217
218 pBH->numFixes++;
219 pBH->fixStat++;
220 pBH->numUsage++;
221
222 pLockHandle->unlockBufferPool(hashId);
223 }
224 else
225 {
226 pLockHandle->unlockBufferPool(hashId);
227
228 hashId = ( hashId + 1 ) % _numPages;
229 bufPtr = (void*)( (long long)hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ) + (long long)base + (long long)BUFFERPOOLHEAD_SIZE );
230 numTries++;
231 }
232 }
233
234 // Step II : page is not in buffercache yet, searching free and not occupied slot
235
236 void* minFixStatBufPtr = 0;
237 int minHashId = -1;
238
239 if ( ! isFixed )
240 {
241 numTries = 0;
242 hashId = calcHash(pageId);
243 bufPtr = (void*)((long long)base + (long long)BUFFERPOOLHEAD_SIZE + (long long)hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ));
244
245 int minFixStat = -1;
246
247 while ( ! isFixed && numTries < _maxFixTries )
248 {
249 try
250 {
251 pLockHandle->lockBufferPool(hashId, CegoLockHandler::WRITE);
252 }
253 catch ( Exception e )
254 {
255 if ( minHashId != -1 )
256 pLockHandle->unlockBufferPool(minHashId);
257 throw Exception(EXLOC, "Cannot lock bufferpool", e);
258 }
259
260 // cout << "Trying to fix with try " << numTries << endl;
261
262 BufferHead* pBH = (BufferHead*)bufPtr;
263
264 if ( pBH->isOccupied == NOT_OCCUPIED )
265 {
266 if ( m == CegoBufferPool::PERSISTENT )
267 pBH->isOccupied = PERSISTENT_OCCUPIED;
268 else if ( m == CegoBufferPool::SYNC )
269 pBH->isOccupied = WRITE_ON_SYNC;
270 else
271 pBH->isOccupied = WRITE_ON_DIRTY;
272
273 pBH->pageId = pageId;
274 pBH->isDirty = 0;
275 pBH->numFixes = 1;
276
277 try
278 {
279 _numDiskRead++;
280 _diskReadTimer.start();
281 readPage(pageId, pBH->tabSetId, pBH->fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
282 _diskReadTimer.stop();
283 _avgReadDelay = ( _diskReadTimer.getSum() / (_numDiskRead+(unsigned long long)1) ) / 1000;
284 }
285 catch ( Exception e )
286 {
287 pLockHandle->unlockBufferPool(hashId);
288 if ( minHashId != -1 )
289 pLockHandle->unlockBufferPool(minHashId);
290 throw e;
291 }
292
293 pBH->fixStat++;
294 pBH->numUsage++;
295
296 isFixed = true;
297
298 pLockHandle->unlockBufferPool(hashId);
299 if ( minHashId != -1 )
300 pLockHandle->unlockBufferPool(minHashId);
301
302 }
303 else
304 {
305
306 // for now, we take any tabSetId
307 // tabSetId = bh.tabSetId;
308
309 if ( ( ( pBH->isOccupied != PERSISTENT_OCCUPIED && pBH->isDirty == 0 )
310 || pBH->isOccupied == WRITE_ON_DIRTY )
311 && pBH->numFixes == 0 && ( minFixStat > pBH->fixStat || minFixStat == -1))
312 {
313 minFixStatBufPtr = bufPtr;
314 // cout << "Found page with minFixStat = " << bh.fixStat << endl;
315 minFixStat = pBH->fixStat;
316
317 if ( minHashId != -1 )
318 {
319 pLockHandle->unlockBufferPool(minHashId);
320 }
321
322 minHashId = hashId;
323
324 }
325 else
326 {
327 pLockHandle->unlockBufferPool(hashId);
328 }
329
330 hashId = ( hashId + 1 ) % _numPages;
331
332 bufPtr = (void*)( (long long)hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ) + (long long)base + (long long)BUFFERPOOLHEAD_SIZE );
333 numTries++;
334 }
335 }
336
337 // Step III : page is not in buffercache yet and all slots are occupied, using the occupied slot
338 // with lowest fix stat
339
340 if ( ! isFixed && minFixStatBufPtr != 0)
341 {
342
343 bufPtr = minFixStatBufPtr;
344
345 // minHashId is already locked
346 // pLockHandle->lockBufferPool(minHashId);
347
348 BufferHead *pBH = (BufferHead*)bufPtr;
349
350 if ( pBH->isOccupied == WRITE_ON_DIRTY && pBH->isDirty == 1 )
351 {
352 _numDiskWrite++;
353
354 #ifdef DEBUG
355 log(_modId, Logger::DEBUG, Chain("Async write of page [") + Chain(pBH->pageId) + Chain("]"));
356 #endif
357
358 // cout << "---- >>>> Async write of page [" << bh.fileId << "," << bh.pageId << "]" << endl;
359
360 _diskWriteTimer.start();
361 writePage(pBH->pageId, pBH->fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
362 _diskWriteTimer.stop();
363
364 _avgWriteDelay = ( _diskWriteTimer.getSum() / (_numDiskWrite+(unsigned long long)1) ) / 1000;
365
366 }
367
368 // cout << "Using occupied page ( type " << bh.isOccupied << " ) with fileId " << bh.fileId << " pageId " << bh.pageId << " numFixes " << bh.numFixes << endl;
369
370 if ( m == CegoBufferPool::PERSISTENT )
371 pBH->isOccupied = PERSISTENT_OCCUPIED;
372 else if ( m == CegoBufferPool::SYNC )
373 pBH->isOccupied = WRITE_ON_SYNC;
374 else if ( m == CegoBufferPool::NOSYNC )
375 pBH->isOccupied = WRITE_ON_DIRTY;
376
377 pBH->pageId = pageId;
378 pBH->isDirty = 0;
379 pBH->numFixes = 1;
380
381 try
382 {
383 _numDiskRead++;
384 _diskReadTimer.start();
385 readPage(pageId, pBH->tabSetId, pBH->fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
386 _diskReadTimer.stop();
387
388 _avgReadDelay = ( _diskReadTimer.getSum() / (_numDiskRead+(unsigned long long)1) ) / 1000;
389
390 }
391 catch ( Exception e )
392 {
393 pLockHandle->unlockBufferPool(minHashId);
394 throw e;
395 }
396 pBH->fixStat++;
397 pBH->numUsage++;
398
399 pLockHandle->unlockBufferPool(minHashId);
400
401 isFixed = true;
402
403 }
404
405 if ( ! isFixed )
406 {
407 if ( numTry >= 2 )
408 throw Exception(EXLOC, "No more buffers available");
409
410 Chain tableSet = getTabSetName(tabSetId);
411 log(_modId, Logger::NOTICE, Chain("Forced checkpoint by bufferFix for tableset ") + tableSet);
412
413 if ( numTry == 0 )
414 writeCheckPoint(tabSetId, true, Chain(""), 0, pLockHandle);
415
416 if ( numTry == 1 )
417 {
418
419 // we force checkpoints for all active tableset,
420 ListT<int> tsList = getOnlineTableSet();
421 int* pTS = tsList.First();
422 while ( pTS )
423 {
424 if ( *pTS != tabSetId )
425 {
426 Chain foreignTableSet = getTabSetName(*pTS);
427 log(_modId, Logger::NOTICE, Chain("Forced checkpoint by bufferFix for foreign tableset ") + foreignTableSet);
428 writeCheckPoint(*pTS, true, Chain(""), 0, pLockHandle);
429 }
430 pTS = tsList.Next();
431 }
432 }
433
434 numTry++;
435
436 // try again
437 return bufferFix(bp, tabSetId, pageId, m, pLockHandle, numTry);
438
439 }
440 }
441
442 bp.setPageSize(_pageSize);
443 bp.setPagePtr((char*)((long long)bufPtr + BUFFERHEAD_SIZE));
444 bp.setPageHead((CegoBufferPage::PageHead*)((long long)bufPtr + BUFFERHEAD_SIZE));
445 bp.setPageId(pageId);
446 bp.setFixed(true);
447
448 }
449
emptyFix(CegoBufferPage & bp,int tabSetId,FixMode m,CegoFileHandler::FileType ft,CegoLockHandler * pLockHandle,bool doAppend)450 void CegoBufferPool::emptyFix(CegoBufferPage &bp, int tabSetId, FixMode m, CegoFileHandler::FileType ft, CegoLockHandler *pLockHandle, bool doAppend)
451 {
452 if (_pBufPool == 0)
453 {
454 throw Exception(EXLOC, "No valid bufferpool" );
455 }
456
457 PageIdType pageId;
458
459 unsigned* fbm;
460 int fbmSize=0;
461
462 allocatePage(tabSetId, ft, pageId, pLockHandle, fbm, fbmSize, doAppend);
463
464 if ( fbmSize > 0 )
465 {
466 int fileId = getFileIdForPageId(pageId);
467 logBM(tabSetId, fileId, fbm, fbmSize, pLockHandle);
468 delete[] fbm;
469 }
470
471 bufferFix(bp, tabSetId, pageId, m, pLockHandle);
472 bp.initPage(CegoBufferPage::TABLE);
473 bp.setFixed(true);
474
475 }
476
bufferUnfix(CegoBufferPage & bp,bool isDirty,CegoLockHandler * pLockHandle)477 void CegoBufferPool::bufferUnfix(CegoBufferPage &bp, bool isDirty, CegoLockHandler *pLockHandle)
478 {
479 if (_pBufPool == 0)
480 {
481 throw Exception(EXLOC, "No valid bufferpool" );
482 }
483
484 int segid = calcSegment(bp.getPageId());
485
486 void *base = (void*)_pBufPool[segid];
487 char* bufPtr = (char*)bp.getPagePtr();
488 int hashId = ( (long long)bufPtr - (long long)base - BUFFERPOOLHEAD_SIZE ) / ( BUFFERHEAD_SIZE + _pageSize );
489
490 bufPtr = bufPtr - BUFFERHEAD_SIZE;
491
492 pLockHandle->lockBufferPool(hashId, CegoLockHandler::WRITE);
493
494 BufferHead* pBH = (BufferHead*)bufPtr;
495
496 if ( pBH->numFixes > 0 )
497 {
498 if ( isDirty )
499 {
500 pBH->isDirty = 1;
501 }
502 pBH->numFixes--;
503 }
504 else
505 {
506 pLockHandle->unlockBufferPool(hashId);
507 throw Exception(EXLOC, "Number of fixes is already zero");
508 }
509
510 bp.setFixed(false);
511
512 pLockHandle->unlockBufferPool(hashId);
513 }
514
bufferRelease(CegoBufferPage & bp,CegoLockHandler * pLockHandle)515 void CegoBufferPool::bufferRelease(CegoBufferPage &bp, CegoLockHandler *pLockHandle)
516 {
517 if (_pBufPool == 0)
518 {
519 throw Exception(EXLOC, "No valid bufferpool" );
520 }
521
522 char* bufPtr = (char*)bp.getPagePtr();
523 bufPtr = bufPtr - BUFFERHEAD_SIZE;
524
525 BufferHead bh;
526
527 int hashId = calcHash(bp.getPageId());
528
529 pLockHandle->lockBufferPool(hashId, CegoLockHandler::WRITE);
530
531
532 int tabSetId = 0;
533
534 try
535 {
536 memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
537
538 // save tabSetId value
539 tabSetId = bh.tabSetId;
540
541 bh.isOccupied = NOT_OCCUPIED;
542 bh.numFixes=0;
543 bh.isDirty = 0;
544 bh.tabSetId = 0;
545 bh.pageId = 0;
546 bh.fixStat = 0;
547 bh.numUsage = 0;
548
549 memcpy(bufPtr, &bh, BUFFERHEAD_SIZE);
550
551 // we just collect freed page ids
552 // they are physically deleted at next checkpoint
553 _deletedPageList.Insert(bp.getPageId());
554
555 pLockHandle->unlockBufferPool(hashId);
556
557 }
558 catch ( Exception e )
559 {
560 pLockHandle->unlockBufferPool(hashId);
561 throw Exception(EXLOC, "Cannot release file page", e);
562 }
563 bp.setFixed(false);
564
565 if ( _deletedPageList.Size() > _maxPageDelete )
566 {
567 Chain tableSet = getTabSetName(tabSetId);
568 log(_modId, Logger::NOTICE, Chain("Forced checkpoint by bufferRelease for tableset ") + tableSet);
569 writeCheckPoint(tableSet, true, false, pLockHandle);
570 }
571
572 }
573
writeCheckPoint(const Chain & tableSet,bool switchLog,bool archComplete,CegoLockHandler * pLockHandle,const Chain & escCmd,int escTimeout,int archTimeout)574 unsigned long long CegoBufferPool::writeCheckPoint(const Chain& tableSet, bool switchLog, bool archComplete, CegoLockHandler *pLockHandle, const Chain& escCmd, int escTimeout, int archTimeout)
575 {
576 int tabSetId = getTabSetId(tableSet);
577
578 log(_modId, Logger::NOTICE, Chain("Writing checkpoint for tableset ") + tableSet + Chain(" as lsn = ")
579 + Chain(getCurrentLSN(tabSetId)) + Chain(" ..."));
580
581 unsigned long long lsn = writeCheckPoint(tabSetId, switchLog, escCmd, escTimeout, pLockHandle);
582
583 Datetime tsStart;
584
585 unsigned long long archExceed = tsStart.asLong() + archTimeout;
586 if ( archComplete )
587 {
588 while ( archiveComplete(tableSet) == false )
589 {
590 log(_modId, Logger::NOTICE, Chain("Waiting to complete archiving in tableset ") + tableSet + Chain(" ..."));
591
592 Datetime tsNow;
593 if ( tsNow.asLong() > archExceed )
594 {
595 throw Exception(EXLOC, "Archiving timeout reached");
596 }
597 Sleeper s;
598 s.secSleep(LOGMNG_LOGSWITCH_WAIT_DELAY);
599 }
600 }
601 return lsn;
602 }
603
604
writeCheckPoint(int tabSetId,bool switchLog,const Chain & escCmd,int timeout,CegoLockHandler * pLockHandle)605 unsigned long long CegoBufferPool::writeCheckPoint(int tabSetId, bool switchLog, const Chain& escCmd, int timeout, CegoLockHandler *pLockHandle)
606 {
607
608 if (_pBufPool == 0)
609 {
610 throw Exception(EXLOC, "No valid bufferpool" );
611 }
612
613 try
614 {
615 pLockHandle->lockBufferPool();
616
617 unsigned long long lsn;
618
619 if ( escCmd != Chain("") )
620 {
621 log(_modId, Logger::NOTICE, Chain("Executing escape command <") + escCmd + Chain(">"));
622
623 char *pShell = getenv(CGEXESHELLVARNAME);
624
625 Chain shellCmd;
626 if ( pShell == NULL )
627 {
628 shellCmd = Chain(CGSTDEXESHELL);
629 }
630 else
631 {
632 shellCmd = Chain(pShell);
633 }
634
635 CommandExecuter cmdExe(shellCmd);
636
637 try
638 {
639 int retCode = cmdExe.execute(escCmd, timeout);
640 log(_modId, Logger::NOTICE, Chain("Escape command finished with return code : <") + Chain(retCode) + Chain(">"));
641 }
642 catch ( Exception e )
643 {
644 Chain msg;
645 e.pop(msg);
646 log(_modId, Logger::LOGERR, msg);
647 throw Exception(EXLOC, msg);
648 }
649 }
650
651
652 PageIdType *pDelPage = _deletedPageList.First();
653 while ( pDelPage )
654 {
655 int fbmSize=0;
656 unsigned* fbm;
657
658 // cout << "Releasing page " << *pDelPage << endl;
659
660 releasePage(*pDelPage, pLockHandle, fbm, fbmSize);
661
662 if ( fbmSize > 0 )
663 {
664 int fileId = getFileIdForPageId(*pDelPage);
665 logBM(tabSetId, fileId, fbm, fbmSize, pLockHandle);
666 delete[] fbm;
667 }
668 pDelPage = _deletedPageList.Next();
669 }
670
671 // and reset the list
672 _deletedPageList.Empty();
673
674
675 // is checkpoint dump enabled for tableset ?
676
677 if ( checkPointDumpEnabled(tabSetId) )
678 {
679
680 // free all collected deleted page id's
681 // we don't treat this in dump, since a crash during checkpoint
682 // might just result in page leaks which can be corrected via cleanup startup option
683
684 dumpCheckpoint(tabSetId);
685
686 if ( switchLog )
687 {
688 // set commited lsn to current lsn to sync with switchLogFile if log is active
689 lsn = getCurrentLSN(tabSetId);
690 setCommittedLSN(tabSetId, lsn);
691
692 while ( switchLogFile(tabSetId) == false )
693 {
694 log(_modId, Logger::NOTICE, Chain("Logfile for tabSetId ") + Chain(tabSetId) + Chain(" still active, switch failed"));
695 Sleeper s;
696 s.secSleep(LOGMNG_LOGSWITCH_WAIT_DELAY);
697 }
698 }
699 else
700 {
701 // we write a sync in any case
702 CegoLogRecord lr;
703 lr.setAction(CegoLogRecord::LOGREC_SYNC);
704 logAction(tabSetId, lr);
705 }
706
707 lsn = getCurrentLSN(tabSetId);
708 setCommittedLSN(tabSetId, lsn);
709
710 // force sync
711 doc2Xml();
712
713 // now dump to data file
714 commitCheckpoint(tabSetId, pLockHandle);
715
716 }
717 else // ( checkPointDumpEnabled(tabSetId) == false )
718 {
719
720 if ( switchLog )
721 {
722 // set lsn before logfile switch, the method switchLogFile synchronizes with doc2Xml
723 lsn = getCurrentLSN(tabSetId);
724 setCommittedLSN(tabSetId, lsn);
725
726 while ( switchLogFile(tabSetId) == false )
727 {
728 log(_modId, Logger::NOTICE, Chain("Logfile for tabSetId ") + Chain(tabSetId) + Chain(" still active, switch failed"));
729 Sleeper s;
730 s.secSleep(LOGMNG_LOGSWITCH_WAIT_DELAY);
731 }
732 }
733 else
734 {
735 // we write a sync in any case
736 CegoLogRecord lr;
737 lr.setAction(CegoLogRecord::LOGREC_SYNC);
738 logAction(tabSetId, lr);
739
740 lsn = getCurrentLSN(tabSetId);
741 setCommittedLSN(tabSetId, lsn);
742 }
743
744 Chain currentState = getTableSetRunState(tabSetId);
745 setTableSetRunState(tabSetId, XML_CHECKPOINT_VALUE);
746
747 // sync control information to xml database file
748 doc2Xml();
749
750 for ( int i=0; i<_numSegment; i++)
751 {
752 void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
753
754 int pageCount = 0;
755
756 while ( pageCount < _numPages )
757 {
758 BufferHead bh;
759 memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
760
761 if ( bh.isOccupied != NOT_OCCUPIED && bh.isDirty != 0 && bh.tabSetId == tabSetId )
762 {
763 if ( currentState == Chain(XML_BACKUP_VALUE) )
764 {
765 if ( needPageBackup(bh.pageId) )
766 {
767 #ifdef DEBUG
768 log(_modId, Logger::DEBUG, Chain("Reading page (") + Chain(bh.fileId) + Chain(",") + Chain(bh.pageId) + Chain(") to log ..."));
769 #endif
770 int ts;
771 unsigned fixStat;
772 char *pageData = new char[_pageSize];
773 readPage(bh.pageId, ts, fixStat, pageData, pLockHandle);
774
775 CegoLogRecord lr;
776 lr.setAction(CegoLogRecord::LOGREC_BUPAGE);
777 lr.setData(pageData);
778 lr.setDataLen(_pageSize);
779 lr.setPageId(bh.pageId);
780
781 if ( logAction(tabSetId, lr) == CegoLogManager::LOG_FULL )
782 {
783 #ifdef DEBUG
784 log(_modId, Logger::DEBUG, Chain("Switching logFiles ..."));
785 #endif
786
787 while ( switchLogFile(tabSetId) == false )
788 {
789 log(_modId, Logger::NOTICE, Chain("Logfile for tabSetId ") + Chain(tabSetId) + Chain(" still active, switch failed"));
790
791 Sleeper s;
792 s.secSleep(LOGMNG_LOGSWITCH_WAIT_DELAY);
793
794 }
795
796 if ( logAction(tabSetId, lr) != CegoLogManager::LOG_SUCCESS )
797 {
798 delete[] pageData;
799 throw Exception(EXLOC, "Cannot write to log");
800 }
801 }
802
803 delete[] pageData;
804 }
805 }
806 _numDiskWrite++;
807 _diskWriteTimer.start();
808
809 writePage(bh.pageId, bh.fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
810 _diskWriteTimer.stop();
811
812 _avgWriteDelay = ( _diskWriteTimer.getSum() / (_numDiskWrite+(unsigned long long)1) ) / 1000;
813
814 bh.isDirty = 0;
815 memcpy(bufPtr, &bh, BUFFERHEAD_SIZE);
816 }
817 bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
818 pageCount++;
819 }
820 }
821 // set back tableset runstate to saved value
822 setTableSetRunState(tabSetId, currentState);
823 // sync control information to xml database file
824 doc2Xml();
825 }
826
827 optimizePool(tabSetId);
828
829 _cpCount++;
830
831 pLockHandle->unlockBufferPool();
832 return lsn;
833
834 }
835 catch ( Exception e )
836 {
837 // if we got an exception, we have to unlock the still locked bufferpool area
838 pLockHandle->unlockBufferPool();
839 throw e;
840 }
841 }
842
getCPCount() const843 unsigned long long CegoBufferPool::getCPCount() const
844 {
845 return _cpCount;
846 }
847
dumpCheckpoint(int tabSetId)848 void CegoBufferPool::dumpCheckpoint(int tabSetId)
849 {
850 Chain tableSet = getTabSetName(tabSetId);
851 Chain tsRoot = getTSRoot(tableSet);
852
853 CegoCheckpointDump cpd(tableSet, tsRoot, _pageSize);
854
855 cpd.startWrite();
856
857 for ( int i=0; i<_numSegment; i++)
858 {
859 void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
860
861 int pageCount = 0;
862
863 while ( pageCount < _numPages )
864 {
865 BufferHead bh;
866 memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
867
868 if ( bh.isOccupied != NOT_OCCUPIED && bh.isDirty != 0 && bh.tabSetId == tabSetId )
869 {
870 cpd.dumpPage(bh.pageId, bh.fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE));
871 bh.isDirty = 0;
872 memcpy(bufPtr, &bh, BUFFERHEAD_SIZE);
873 }
874 bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
875 pageCount++;
876 }
877 }
878
879 cpd.finish();
880 }
881
commitCheckpoint(int tabSetId,CegoLockHandler * pLockHandle)882 void CegoBufferPool::commitCheckpoint(int tabSetId, CegoLockHandler *pLockHandle)
883 {
884 Chain currentState = getTableSetRunState(tabSetId);
885
886 Chain tableSet = getTabSetName(tabSetId);
887 Chain tsRoot = getTSRoot(tableSet);
888
889 CegoCheckpointDump cpd(tableSet, tsRoot, _pageSize);
890
891
892 // commit the dump
893 cpd.commitDump();
894
895 // cout << "CRASH SIMULATION .." << endl;
896 // for Debuging: Simuldate crash here
897 // int *pCrash;
898 // *pCrash = 42;
899
900 PageIdType pageId;
901 unsigned fixStat;
902 char *buf = new char[_pageSize];
903
904 cpd.startRead();
905
906 bool logExceedDetected=false;
907
908 while ( cpd.readDump(pageId, fixStat, buf) )
909 {
910 if ( currentState == Chain(XML_BACKUP_VALUE) )
911 {
912 if ( needPageBackup(pageId) )
913 {
914 #ifdef DEBUG
915 log(_modId, Logger::DEBUG, Chain("Reading page (") + Chain(bh.fileId) + Chain(",") + Chain(bh.pageId) + Chain(") to log ..."));
916 #endif
917 int ts;
918 unsigned oldFixStat;
919 char *oldPageData = new char[_pageSize];
920 try
921 {
922 readPage(pageId, ts, oldFixStat, oldPageData, pLockHandle);
923
924 CegoLogRecord lr;
925 lr.setAction(CegoLogRecord::LOGREC_BUPAGE);
926 lr.setData(oldPageData);
927 lr.setDataLen(_pageSize);
928 lr.setPageId(pageId);
929
930 if ( logAction(tabSetId, lr) == CegoLogManager::LOG_FULL && logExceedDetected == false )
931 {
932 logExceedDetected = true;
933 log(_modId, Logger::NOTICE, Chain("Logfile exceeded limit during checkpoint dump at offset ") + Chain(getLogOffset(tabSetId)));
934 }
935 }
936 catch ( Exception e )
937 {
938 delete[] oldPageData;
939 throw e;
940 }
941 delete[] oldPageData;
942 }
943 }
944 _numDiskWrite++;
945 _diskWriteTimer.start();
946
947 writePage(pageId, fixStat, buf, pLockHandle);
948 _diskWriteTimer.stop();
949
950 _avgWriteDelay = ( _diskWriteTimer.getSum() / (_numDiskWrite+(unsigned long long)1) ) / 1000;
951 }
952
953 delete[] buf;
954
955 cpd.remove();
956 }
957
restoreCheckpointDump(int tabSetId,CegoLockHandler * pLockHandle)958 void CegoBufferPool::restoreCheckpointDump(int tabSetId, CegoLockHandler *pLockHandle)
959 {
960 Chain tableSet = getTabSetName(tabSetId);
961 Chain tsRoot = getTSRoot(tableSet);
962
963 CegoCheckpointDump cpd(tableSet, tsRoot, _pageSize);
964
965 if ( cpd.readyDumpExists() )
966 {
967
968 PageIdType pageId;
969 unsigned fixStat;
970 char *buf = new char[_pageSize];
971
972 cpd.startRead();
973
974 while ( cpd.readDump(pageId, fixStat, buf) )
975 {
976 _numDiskWrite++;
977 _diskWriteTimer.start();
978
979 writePage(pageId, fixStat, buf, pLockHandle);
980 _diskWriteTimer.stop();
981
982 _avgWriteDelay = ( _diskWriteTimer.getSum() / (_numDiskWrite+(unsigned long long)1) ) / 1000;
983 }
984
985 delete[] buf;
986
987 cpd.remove();
988 }
989
990 cpd.cleanUp();
991 }
992
writeAndRemoveTabSet(int tabSetId,CegoLockHandler * pLockHandle)993 void CegoBufferPool::writeAndRemoveTabSet(int tabSetId, CegoLockHandler *pLockHandle)
994 {
995 if (_pBufPool == 0)
996 {
997 throw Exception(EXLOC, "No valid bufferpool");
998 }
999
1000 for ( int i=0; i<_numSegment; i++)
1001 {
1002 void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
1003
1004 int pageCount = 0;
1005
1006 while ( pageCount < _numPages )
1007 {
1008 BufferHead bh;
1009 memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1010
1011 if ( bh.tabSetId == tabSetId )
1012 {
1013 if ( bh.isOccupied != NOT_OCCUPIED && bh.isDirty != 0 )
1014 {
1015 _numDiskWrite++;
1016
1017 _diskWriteTimer.start();
1018 writePage(bh.pageId, bh.fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
1019 _diskWriteTimer.stop();
1020
1021 _avgWriteDelay = ( _diskWriteTimer.getSum() / (_numDiskWrite+(unsigned long long)1) ) / 1000;
1022 }
1023
1024 bh.isOccupied = NOT_OCCUPIED;
1025 bh.numFixes=0;
1026
1027 memcpy(bufPtr, &bh, BUFFERHEAD_SIZE);
1028
1029 }
1030 bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1031 pageCount++;
1032 }
1033 }
1034 releaseFiles(tabSetId);
1035 }
1036
uptime() const1037 unsigned long long CegoBufferPool::uptime() const
1038 {
1039 Datetime ts;
1040 return ts.asLong() - _poolStart;
1041 }
1042
poolInfo(int & pageSize,unsigned long long & numTotal,unsigned long long & numUsed,unsigned long long & numFree,unsigned long long & numDirty,unsigned long long & numFixes,unsigned long long & numPersistent,unsigned long long & numNoSync,unsigned long long & numDiskRead,unsigned long long & numDiskWrite,double & hitRate,double & spreadRate,unsigned long long & readDelay,unsigned long long & writeDelay,unsigned long long & curFixCount,unsigned long long & maxFixCount,unsigned long long & avgFixTry,unsigned long long & statStart,unsigned long long & uptime) const1043 void CegoBufferPool::poolInfo(int& pageSize,
1044 unsigned long long& numTotal,
1045 unsigned long long& numUsed,
1046 unsigned long long& numFree,
1047 unsigned long long& numDirty,
1048 unsigned long long& numFixes,
1049 unsigned long long& numPersistent,
1050 unsigned long long& numNoSync,
1051 unsigned long long& numDiskRead,
1052 unsigned long long& numDiskWrite,
1053 double& hitRate,
1054 double& spreadRate,
1055 unsigned long long& readDelay,
1056 unsigned long long& writeDelay,
1057 unsigned long long& curFixCount,
1058 unsigned long long& maxFixCount,
1059 unsigned long long& avgFixTry,
1060 unsigned long long& statStart,
1061 unsigned long long& uptime) const
1062 {
1063 if (_pBufPool == 0)
1064 {
1065 throw Exception(EXLOC, "No valid bufferpool" );
1066 }
1067
1068 pageSize = _pageSize;
1069 numTotal = _numPages * _numSegment;
1070 numUsed=0;
1071 numFree=0;
1072 numFixes=0;
1073 numDirty=0;
1074 numPersistent=0;
1075 numNoSync=0;
1076
1077 unsigned long long numUsage = 0;
1078
1079 for ( int i=0; i<_numSegment; i++)
1080 {
1081 void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
1082
1083 unsigned long long pageCount = 0;
1084
1085 unsigned long long numSegPageUsed = 0;
1086
1087 while ( pageCount < _numPages )
1088 {
1089 BufferHead bh;
1090 memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1091
1092 numFixes += (long)bh.numFixes;
1093 numDirty += (long)bh.isDirty;
1094
1095 // we don't count persistent hot spot pages
1096 if ( bh.isOccupied != PERSISTENT_OCCUPIED )
1097 numUsage += bh.numUsage;
1098
1099 if (bh.isOccupied == NOT_OCCUPIED)
1100 {
1101 numFree++;
1102 }
1103 else if (bh.isOccupied == WRITE_ON_SYNC)
1104 {
1105 numSegPageUsed++;
1106 numUsed++;
1107 }
1108 else if (bh.isOccupied == WRITE_ON_DIRTY)
1109 {
1110 numSegPageUsed++;
1111 numUsed++;
1112 numNoSync++;
1113 }
1114 else if (bh.isOccupied == PERSISTENT_OCCUPIED)
1115 {
1116 numSegPageUsed++;
1117 numUsed++;
1118 numPersistent++;
1119 }
1120
1121 bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1122 pageCount++;
1123 }
1124 // cout << "Segment " << i << " : " << numSegPageUsed << " pages used" << endl;
1125 }
1126
1127 numDiskWrite = _numDiskWrite;
1128 numDiskRead = _numDiskRead;
1129
1130 // hitRate = BUPMNG_STATSPERIOD - _numDiskRead / ( _fixCount / BUPMNG_STATSPERIOD + 1) ;
1131 hitRate = 100.0 * ( (double)_fixCount - _numDiskRead + 1 ) / ( (double)_fixCount + 1 );
1132
1133 double usageMedian = (double)numUsage / (double)( _numSegment * _numPages );
1134 double usageDist = 0.0;
1135
1136 for ( int i=0; i<_numSegment; i++)
1137 {
1138 void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
1139
1140 unsigned long long pageCount = 0;
1141
1142 while ( pageCount < _numPages )
1143 {
1144 BufferHead bh;
1145 memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1146
1147 if ( bh.isOccupied != PERSISTENT_OCCUPIED )
1148 {
1149 usageMedian > bh.numUsage ? usageDist += usageMedian - (double)bh.numUsage : usageDist += (double)bh.numUsage - usageMedian;
1150 }
1151
1152 bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1153 pageCount++;
1154 }
1155 }
1156
1157 double avgDist = usageDist / (double)( _numSegment * _numPages );
1158 // cout << "DistMed = " << usageMedian << endl;
1159 // cout << "AvgDist = " << avgDist << endl;
1160 // cout << "NumUsage = " << numUsage << endl;
1161 spreadRate = avgDist / ( usageMedian + 1 );
1162
1163 // cout << "Fix Dist Rate = " << fixDistRate << endl;
1164
1165 readDelay = _avgReadDelay;
1166 writeDelay = _avgWriteDelay;
1167 statStart = _statStart;
1168 curFixCount = _fixCount;
1169 avgFixTry = _fixCount > 0 ? _fixTry / _fixCount : 0;
1170 maxFixCount = BUPMNG_STATSPERIOD;
1171
1172 Datetime ts;
1173 uptime = ts.asLong() - _poolStart;
1174 }
1175
getPoolEntryList(ListT<CegoBufferPoolEntry> & entryList)1176 void CegoBufferPool::getPoolEntryList(ListT<CegoBufferPoolEntry>& entryList)
1177 {
1178 if (_pBufPool == 0)
1179 {
1180 throw Exception(EXLOC, "No valid bufferpool" );
1181 }
1182
1183 entryList.Empty();
1184
1185 for ( int seg=0; seg<_numSegment; seg++)
1186 {
1187 void *bufPtr = (void*)( (long long)_pBufPool[seg] + BUFFERPOOLHEAD_SIZE );
1188
1189 unsigned long long pageCount = 0;
1190
1191 while ( pageCount < _numPages )
1192 {
1193 BufferHead bh;
1194 memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1195
1196 bool isDirty = false;
1197 if ( bh.isDirty )
1198 isDirty=true;
1199
1200 Chain occState;
1201 if (bh.isOccupied == NOT_OCCUPIED)
1202 occState=Chain("NOTOCCUPIED");
1203 else if (bh.isOccupied == WRITE_ON_SYNC)
1204 occState=Chain("WRITEONSYNC");
1205 else if (bh.isOccupied == WRITE_ON_DIRTY)
1206 occState=Chain("WRITEONDIRTY");
1207 else if (bh.isOccupied == PERSISTENT_OCCUPIED)
1208 occState=Chain("PERSISTENT");
1209
1210 CegoBufferPoolEntry bpe(seg, pageCount, occState, isDirty, bh.numFixes, bh.tabSetId, bh.pageId, bh.fixStat, bh.numUsage);
1211 entryList.Insert(bpe);
1212
1213 bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1214 pageCount++;
1215 }
1216 }
1217 }
1218
resetStats()1219 void CegoBufferPool::resetStats()
1220 {
1221 _numDiskRead=0;
1222 _numDiskWrite=0;
1223 _fixCount=0;
1224 _fixTry=0;
1225 _avgReadDelay=0;
1226 _avgWriteDelay=0;
1227 _diskReadTimer.reset();
1228 _diskWriteTimer.reset();
1229 Datetime ts;
1230 _statStart = ts.asLong();
1231 }
1232
logIt(int tabSetId,CegoLogRecord & lr,CegoLockHandler * pLockHandle,bool flushLog)1233 void CegoBufferPool::logIt(int tabSetId, CegoLogRecord& lr, CegoLockHandler* pLockHandle, bool flushLog)
1234 {
1235 _lmLock[tabSetId].writeLock(LM_LOCKTIMEOUT);
1236
1237 try
1238 {
1239 CegoLogManager::LogResult res = logAction(tabSetId, lr, flushLog);
1240
1241 if ( res == CegoLogManager::LOG_FULL )
1242 {
1243 // if log is full, we have to force a checkpoint
1244 // to be consistent, log entry has already been written to log
1245
1246 Chain tableSet = getTabSetName(tabSetId);
1247 writeCheckPoint(tableSet, true, false, pLockHandle);
1248 }
1249
1250 if ( res == CegoLogManager::LOG_ERROR )
1251 {
1252 Chain tableSet = getTabSetName(tabSetId);
1253 setTableSetSyncState(tableSet, XML_LOG_LOSS_VALUE);
1254
1255 throw Exception(EXLOC, Chain("Cannot write to log"));
1256 }
1257 }
1258 catch ( Exception e )
1259 {
1260 _lmLock[tabSetId].unlock();
1261 throw e;
1262 }
1263
1264 _lmLock[tabSetId].unlock();
1265 }
1266
printPool()1267 void CegoBufferPool::printPool()
1268 {
1269 if (_pBufPool == 0)
1270 {
1271 throw Exception(EXLOC, "No valid bufferpool" );
1272 }
1273
1274 cout << "--- BufferPool ---" << endl;
1275 cout << "BasePtr: " << (long long)_pBufPool << endl;
1276 cout << "PageSize: " << _pageSize << endl;
1277 cout << "NumPages: " << _numPages << endl;
1278
1279 unsigned long long numUsed=0;
1280 unsigned long long numFree=0;
1281 unsigned long long numFixes=0;
1282 unsigned long long numDirty=0;
1283
1284 for ( int i=0; i<_numSegment; i++)
1285 {
1286 void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
1287
1288 unsigned long long pageCount = 0;
1289
1290 while ( pageCount < _numPages )
1291 {
1292 BufferHead bh;
1293 memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1294
1295 numFixes += bh.numFixes;
1296 numDirty += (unsigned long long)bh.isDirty;
1297
1298 if (bh.isOccupied == 0)
1299 numFree++;
1300 else
1301 numUsed++;
1302
1303 bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1304 pageCount++;
1305 }
1306 }
1307
1308 cout << "NumUsed: " << numUsed << endl;
1309 cout << "NumFixes: " << numFixes << endl;
1310 cout << "NumDirty: " << numDirty << endl;
1311 cout << "NumFree: " << numFree << endl;
1312
1313 cout << "NumFree: " << numFree << endl;
1314 }
1315
getLMLockStat(int tabSetId,Chain & lockName,int & lockCount,unsigned long long & numRdLock,unsigned long long & numWrLock,unsigned long long & sumRdDelay,unsigned long long & sumWrDelay)1316 void CegoBufferPool::getLMLockStat(int tabSetId, Chain& lockName, int& lockCount, unsigned long long &numRdLock, unsigned long long &numWrLock, unsigned long long &sumRdDelay, unsigned long long &sumWrDelay)
1317 {
1318 lockName = getTabSetName(tabSetId);
1319 lockCount = _lmLock[tabSetId].numLockTry();
1320
1321 numRdLock = _lmLock[tabSetId].numReadLock();
1322 numWrLock = _lmLock[tabSetId].numWriteLock();
1323 sumRdDelay = 0;
1324 sumWrDelay = 0;
1325
1326 if ( _lmLock[tabSetId].numReadLock() > 0 )
1327 sumRdDelay = _lmLock[tabSetId].sumReadDelay() / LCKMNG_DELRES;
1328 if ( _lmLock[tabSetId].numWriteLock() > 0 )
1329 sumWrDelay = _lmLock[tabSetId].sumWriteDelay() / LCKMNG_DELRES;
1330 }
1331
1332 /////////////////////
1333 // private methods //
1334 /////////////////////
1335
calcHash(PageIdType pageId)1336 int CegoBufferPool::calcHash(PageIdType pageId)
1337 {
1338 // we have to cast to long, since for large fileid values, we might exceed integer range
1339 // unsigned long long s = (unsigned long long)(fileId+1) * (unsigned long long)_hashkey + (unsigned long long)(pageId+1);
1340
1341 unsigned long long s = (unsigned long long)(pageId+1);
1342 unsigned long long d = (unsigned long long)calcSegment(pageId) * (unsigned long long)_numPages;
1343
1344 if ( s > d )
1345 s = s - d;
1346
1347 return (int)( s % (unsigned long long)_numPages);
1348 }
1349
calcSegment(PageIdType pageId)1350 int CegoBufferPool::calcSegment(PageIdType pageId)
1351 {
1352 return (int)( ( pageId / (unsigned long long)_numPages ) % (unsigned long long)_numSegment);
1353 }
1354
1355
logBM(int tabSetId,int fileId,unsigned * fbm,int fbmSize,CegoLockHandler * pLockHandle)1356 void CegoBufferPool::logBM(int tabSetId, int fileId, unsigned* fbm, int fbmSize, CegoLockHandler* pLockHandle)
1357 {
1358 CegoLogRecord lr;
1359
1360 lr.setAction(CegoLogRecord::LOGREC_BUFBM);
1361 lr.setData((char*)fbm);
1362 lr.setDataLen(fbmSize * sizeof(unsigned));
1363 lr.setFileId(fileId);
1364
1365 logIt(tabSetId, lr, pLockHandle);
1366 }
1367
archiveComplete(const Chain & tableSet)1368 bool CegoBufferPool::archiveComplete(const Chain& tableSet)
1369 {
1370 ListT<Chain> lfList;
1371 ListT<int> sizeList;
1372 ListT<Chain> statusList;
1373
1374 getLogFileInfo(tableSet, lfList, sizeList, statusList);
1375
1376 Chain *pStatus = statusList.First();
1377
1378 while ( pStatus )
1379 {
1380 if ( *pStatus == Chain(XML_OCCUPIED_VALUE))
1381 {
1382 return false;
1383 }
1384 else
1385 {
1386 pStatus = statusList.Next();
1387 }
1388 }
1389 return true;
1390 }
1391
optimizePool(int tabSetId)1392 void CegoBufferPool::optimizePool(int tabSetId)
1393 {
1394 Chain tableSet = getTabSetName(tabSetId);
1395 log(_modId, Logger::NOTICE, Chain("Optimizing bufferpool for tableset ") + tableSet + Chain(" ..."));
1396
1397 unsigned long long numRelocated = 0;
1398
1399 for ( int seg=0; seg<_numSegment; seg++)
1400 {
1401 void *basePtr = (void*)( (long long)_pBufPool[seg] + BUFFERPOOLHEAD_SIZE );
1402
1403 unsigned long long pageIndex = 0;
1404
1405 while ( pageIndex < _numPages )
1406 {
1407
1408 BufferHead bh;
1409 memcpy(&bh, basePtr, BUFFERHEAD_SIZE);
1410
1411 if ( ( bh.isOccupied == WRITE_ON_SYNC || bh.isOccupied == WRITE_ON_DIRTY ) && bh.tabSetId == tabSetId && bh.numFixes == 0 )
1412 {
1413 int hashId = calcHash(bh.pageId);
1414
1415 if ( pageIndex != hashId )
1416 {
1417 // cout << "PageId=" << pageId << ", Seg=" << segid << ", HashId=" << hashId << endl;
1418
1419 void* checkPtr = (void*)((long long)_pBufPool[seg] + BUFFERPOOLHEAD_SIZE + (long long )hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ));
1420
1421 BufferHead cbh;
1422 memcpy(&cbh, checkPtr, BUFFERHEAD_SIZE);
1423
1424
1425 if ( cbh.isOccupied == NOT_OCCUPIED
1426 || ( cbh.numFixes == 0 && ( cbh.fixStat + BUPMNG_RELOCATION_THRESHOLD ) < bh.fixStat && cbh.isDirty == 0 ) )
1427 {
1428
1429 /*
1430 cout << "Drop Page:" << endl;
1431 cout << "PageId = " << cbh.pageId << endl;
1432 cout << "FixStat = " << cbh.fixStat << endl;
1433 cout << "Relocate Page:" << endl;
1434 cout << "PageId = " << bh.pageId << endl;
1435 cout << "FixStat = " << bh.fixStat << endl;
1436 */
1437
1438 numRelocated++;
1439
1440 // relocate page
1441 memcpy(checkPtr, basePtr, BUFFERHEAD_SIZE + _pageSize);
1442
1443 // clean slot
1444 BufferHead ibh;
1445 ibh.isOccupied = NOT_OCCUPIED;
1446 ibh.isDirty = 0;
1447 ibh.numFixes = 0;
1448 ibh.tabSetId = 0;
1449 ibh.pageId = 0;
1450 ibh.fixStat = 0;
1451 ibh.numUsage = 0;
1452 memcpy(basePtr, &ibh, BUFFERHEAD_SIZE );
1453
1454 }
1455 }
1456 }
1457
1458 pageIndex++;
1459
1460 basePtr = (void*)((long long)basePtr + (long long)( BUFFERHEAD_SIZE + _pageSize ));
1461 }
1462 }
1463
1464
1465 log(_modId, Logger::NOTICE, Chain(numRelocated) + Chain(" pages relocated "));
1466 }
1467