1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // CegoBufferPool.cc
4 // -----------------
5 // Cego buffer pool implementation module
6 //
7 // Design and Implementation by Bjoern Lemke
8 //
9 // (C)opyright 2000-2019 Bjoern Lemke
10 //
11 // IMPLEMENTATION MODULE
12 //
13 // Class: CegoBufferPool
14 //
15 // Description: The buffer pool management class
16 //
17 // Status: CLEAN
18 //
19 ///////////////////////////////////////////////////////////////////////////////
20 
21 // LFC INCLUDES
22 #include <lfcbase/Exception.h>
23 #include <lfcbase/CommandExecuter.h>
24 #include <lfcbase/Datetime.h>
25 #include <lfcbase/Sleeper.h>
26 #include <lfcbase/ThreadLock.h>
27 
28 // CEGO INCLUDES
29 #include "CegoBufferPool.h"
30 #include "CegoDefs.h"
31 #include "CegoXMLdef.h"
32 #include "CegoCheckpointDump.h"
33 
34 // POSIX INCLUDES
35 #include <string.h>
36 #include <stdlib.h>
37 
// Sizes of the pool head and the per-slot buffer head, rounded up to the next
// multiple of BUPMNG_ALIGNMENT.
// The expansions are fully parenthesized, so the macros behave like a single value
// inside any larger expression ( e.g. as a divisor or as the operand of a cast ).
#define BUFFERPOOLHEAD_SIZE ((((sizeof(BufferPoolHead)-1)/BUPMNG_ALIGNMENT)+1)*BUPMNG_ALIGNMENT)
#define BUFFERHEAD_SIZE ((((sizeof(BufferHead)-1)/BUPMNG_ALIGNMENT)+1)*BUPMNG_ALIGNMENT)
40 
// File-local array of TABMNG_MAXTABSET locks; every entry is initialized
// in the CegoBufferPool constructor
static ThreadLock _lmLock[TABMNG_MAXTABSET];
// Declared here, defined in another translation unit; handed to ThreadLock::init
// as second argument ( presumably enables lock statistics - confirm against ThreadLock )
extern bool __lockStatOn;
43 
CegoBufferPool(const Chain & xmlDef,const Chain & logFile,const Chain & progName)44 CegoBufferPool::CegoBufferPool(const Chain& xmlDef, const Chain& logFile, const Chain& progName) :  CegoLogManager(xmlDef, logFile, progName)
45 {
46     _pBufPool = 0;
47     _numDiskRead=0;
48     _numDiskWrite=0;
49     _fixCount=0;
50     _fixTry=0;
51     _avgReadDelay=0;
52     _avgWriteDelay=0;
53     _cpCount = 0;
54 
55     for (int i=0;i< TABMNG_MAXTABSET;i++)
56     {
57 	_lmLock[i].init(LCKMNG_LOCKWAITDELAY, __lockStatOn);
58     }
59 
60     Datetime ts;
61     _statStart = ts.asLong();
62     _poolStart = ts.asLong();
63     _modId = getModId("CegoBufferPool");
64 }
65 
~CegoBufferPool()66 CegoBufferPool::~CegoBufferPool()
67 {
68 }
69 
initPool(unsigned long long numSegment,unsigned long long numPages)70 void CegoBufferPool::initPool(unsigned long long numSegment, unsigned long long numPages)
71 {
72 #ifdef DEBUG
73     log(_modId, Logger::DEBUG, Chain("Reading xml def ..."));
74 #endif
75 
76     Chain dbName = getDbName();
77     int pageSize = getPageSize();
78     _maxFixTries = getMaxFixTries();
79     _maxPageDelete = getMaxPageDelete();
80 
81     _dbName = dbName;
82     _numSegment = numSegment;
83     _numPages = numPages;
84     _pageSize = pageSize;
85 
86     if (_pBufPool == 0)
87     {
88 
89 	log(_modId, Logger::NOTICE, Chain("Allocating ") + Chain(_numSegment) + Chain(" buffer pool segments ( each ") +  Chain(_numPages) + Chain(" pages ) ..."));
90 
91 
92 	_pBufPool = (void**)malloc(_numSegment * sizeof(void*));
93 	if ( _pBufPool == NULL )
94 	{
95 	    throw Exception(EXLOC, "Cannot initialize pool");
96 	}
97 
98 
99 	for ( int i=0; i<_numSegment; i++)
100 	{
101 	    _pBufPool[i] = malloc(_numPages * (BUFFERHEAD_SIZE + _pageSize) + BUFFERPOOLHEAD_SIZE);
102 
103 	    if ( _pBufPool[i] == NULL )
104 	    {
105 		throw Exception(EXLOC, "Cannot initialize pool");
106 	    }
107 
108 	    log(_modId, Logger::NOTICE, Chain("Initializing buffer pool pages for segment ") + Chain(i) + Chain(" ..."));
109 
110 	    void *base = (void*)_pBufPool[i];
111 	    void *ptr = (void*)_pBufPool[i];
112 
113 	    BufferPoolHead bph;
114 	    bph.numPages=_numPages;
115 
116 	    memcpy(base, &bph, sizeof(BufferPoolHead));
117 	    // memcpy(base, &bph, BUFFERPOOLHEAD_SIZE);
118 
119 	    ptr = (void*)(BUFFERPOOLHEAD_SIZE + (long long)base);
120 
121 	    BufferHead ibh;
122 	    ibh.isOccupied = NOT_OCCUPIED;
123 	    ibh.isDirty = 0;
124 	    ibh.numFixes = 0;
125 	    ibh.tabSetId = 0;
126 	    ibh.pageId = 0;
127 	    ibh.fixStat = 0;
128 	    ibh.numUsage = 0;
129 
130 	    for (int i = 0; i<_numPages; i++)
131 	    {
132 		memcpy(ptr, &ibh, BUFFERHEAD_SIZE );
133 		ptr = (void*)((long long)ptr + BUFFERHEAD_SIZE + _pageSize);
134 	    }
135 
136 	    log(_modId, Logger::NOTICE, Chain("Buffer pool initialized"));
137 	}
138     }
139     else
140     {
141 	throw Exception(EXLOC, "Buffer pool already created");
142     }
143 }
144 
removePool()145 void CegoBufferPool::removePool()
146 {
147     if (_pBufPool)
148     {
149 	log(_modId, Logger::NOTICE, Chain("Removing pool ..."));
150 
151 	for ( int i=0; i<_numSegment; i++)
152 	{
153 	    free ( _pBufPool[i] );
154 	}
155 	free ( _pBufPool );
156 
157 	_pBufPool = 0;
158 	log(_modId, Logger::NOTICE, Chain("Pool removed"));
159     }
160 }
161 
getDBName()162 Chain CegoBufferPool::getDBName()
163 {
164     return _dbName;
165 }
166 
// Fixes the page with the given page id in the buffer pool and attaches it to bp.
//
// bp          : buffer page, set up with page size, page pointer, page head and page id of the fixed slot
// tabSetId    : tableset id, in this method just used for the forced checkpoint
// pageId      : id of the page to fix
// m           : fix mode ( PERSISTENT, SYNC or NOSYNC ), determines the occupation state of the slot
// pLockHandle : lock handler used to lock the inspected pool slots
// numTry      : retry counter, incremented for each retry after a forced checkpoint
//
// The segment is chosen via calcSegment, the start slot via calcHash. From the start slot,
// up to _maxFixTries slots are probed in linear order ( wrapping at _numPages ).
//
//   Step I   : search for a slot, which already contains the page
//   Step II  : search for a free slot, the page is read from disk into this slot.
//              On the way, the best replaceable slot is remembered
//   Step III : no free slot found, the remembered slot is replaced
//
// If no slot was found at all, a checkpoint is forced and the fix is tried again
// ( numTry 0 : checkpoint for the given tableset, numTry 1 : checkpoint for all other online tablesets ).
// With numTry >= 2, an Exception "No more buffers available" is thrown.
// An Exception is also thrown, if no pool is allocated or if locking or reading the page fails.
void CegoBufferPool::bufferFix(CegoBufferPage &bp, int tabSetId, PageIdType pageId, FixMode m, CegoLockHandler *pLockHandle, int numTry)
{
    _fixCount++;

    // statistics are reset each time the fix counter reaches a multiple of BUPMNG_STATSPERIOD
    if ( _fixCount % BUPMNG_STATSPERIOD == 0 )
    {
	resetStats();
    }

    if (_pBufPool == 0)
    {
	throw Exception(EXLOC, "No valid bufferpool" );
    }

    int segid = calcSegment(pageId);

    void *base = (void*)_pBufPool[segid];

    int hashId = calcHash(pageId);

    // cout << "PageId=" << pageId << ", Seg=" << segid << ", HashId=" << hashId << endl;

    // address of the slot head : segment base + padded pool head + slot index * slot size
    void* bufPtr = (void*)((long long)base + (long long)BUFFERPOOLHEAD_SIZE + (long long )hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ));

    bool isFixed = false;

    int numTries = 0;

    // Step I : searching for possible slot

    while ( isFixed == false && numTries < _maxFixTries )
    {
	_fixTry++;

	BufferHead* pBH = (BufferHead*)bufPtr;

	pLockHandle->lockBufferPool(hashId, CegoLockHandler::WRITE);

	if ( pBH->isOccupied != NOT_OCCUPIED && pBH->pageId == pageId)
	{
	    // page is already in the pool, just adjust occupation state and counters
	    isFixed = true;

	    if ( m == CegoBufferPool::PERSISTENT )
	       pBH->isOccupied = PERSISTENT_OCCUPIED;
	    else if ( m == CegoBufferPool::NOSYNC && pBH->isOccupied == WRITE_ON_SYNC)
	       pBH->isOccupied = WRITE_ON_DIRTY;
	    else
	    {
		pBH->isOccupied = WRITE_ON_SYNC;
	    }

	    pBH->numFixes++;
	    pBH->fixStat++;
	    pBH->numUsage++;

	    pLockHandle->unlockBufferPool(hashId);
	}
	else
	{
	    pLockHandle->unlockBufferPool(hashId);

	    // probe the next slot, wrapping around at the end of the segment
	    hashId =  ( hashId + 1 ) % _numPages;
	    bufPtr = (void*)( (long long)hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ) + (long long)base + (long long)BUFFERPOOLHEAD_SIZE );
	    numTries++;
	}
    }

    // Step II : page is not in buffercache yet, searching free and not occupied slot

    // best replacement candidate found so far, the slot with index minHashId is kept locked
    void* minFixStatBufPtr = 0;
    int minHashId = -1;

    if ( ! isFixed )
    {
	// restart probing at the original hash position
	numTries = 0;
	hashId = calcHash(pageId);
	bufPtr = (void*)((long long)base + (long long)BUFFERPOOLHEAD_SIZE + (long long)hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ));

	// -1 means, no candidate has been found yet
	int minFixStat = -1;

	while ( ! isFixed && numTries < _maxFixTries )
	{
	    try
	    {
		pLockHandle->lockBufferPool(hashId, CegoLockHandler::WRITE);
	    }
	    catch ( Exception e )
	    {
		// release the candidate lock, which is still held
		if ( minHashId != -1 )
		    pLockHandle->unlockBufferPool(minHashId);
		throw Exception(EXLOC, "Cannot lock bufferpool", e);
	    }

	    // cout << "Trying to fix with try " << numTries << endl;

	    BufferHead* pBH = (BufferHead*)bufPtr;

	    if ( pBH->isOccupied == NOT_OCCUPIED )
	    {
		// free slot found, occupy it and read the page from disk
		if ( m == CegoBufferPool::PERSISTENT )
		    pBH->isOccupied = PERSISTENT_OCCUPIED;
		else if ( m == CegoBufferPool::SYNC )
		    pBH->isOccupied = WRITE_ON_SYNC;
		else
		    pBH->isOccupied = WRITE_ON_DIRTY;

		pBH->pageId = pageId;
		pBH->isDirty = 0;
		pBH->numFixes = 1;

		try
		{
		    _numDiskRead++;
		    _diskReadTimer.start();
		    // tabSetId and fixStat of the slot head are passed as output arguments of readPage
		    readPage(pageId, pBH->tabSetId, pBH->fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
		    _diskReadTimer.stop();
		    _avgReadDelay = ( _diskReadTimer.getSum() /  (_numDiskRead+(unsigned long long)1)  ) / 1000;
		}
		catch ( Exception e )
		{
		    // NOTE(review): the slot stays marked as occupied for pageId, although the read failed
		    pLockHandle->unlockBufferPool(hashId);
		    if ( minHashId != -1 )
			pLockHandle->unlockBufferPool(minHashId);
		    throw e;
		}

		pBH->fixStat++;
		pBH->numUsage++;

		isFixed = true;

		// the candidate slot is not needed anymore, so release both locks
		pLockHandle->unlockBufferPool(hashId);
		if ( minHashId != -1 )
		    pLockHandle->unlockBufferPool(minHashId);

	    }
	    else
	    {

		// for now, we take any tabSetId
		// tabSetId = bh.tabSetId;

		// a slot is a replacement candidate, if it is not fixed and either clean and not persistent
		// or of type WRITE_ON_DIRTY, and if its fixStat is lower than the one of the current candidate
		if ( ( ( pBH->isOccupied != PERSISTENT_OCCUPIED && pBH->isDirty == 0 )
		       || pBH->isOccupied == WRITE_ON_DIRTY )
		     && pBH->numFixes == 0 && ( minFixStat > pBH->fixStat || minFixStat == -1))
		{
		    minFixStatBufPtr = bufPtr;
		    // cout << "Found page with minFixStat = " << bh.fixStat << endl;
		    minFixStat = pBH->fixStat;

		    // the previous candidate is released, the new one stays locked
		    if ( minHashId != -1 )
		    {
			pLockHandle->unlockBufferPool(minHashId);
		    }

		    minHashId = hashId;

		}
		else
		{
		    pLockHandle->unlockBufferPool(hashId);
		}

		hashId =  ( hashId + 1 ) % _numPages;

		bufPtr = (void*)( (long long)hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ) + (long long)base + (long long)BUFFERPOOLHEAD_SIZE );
		numTries++;
	    }
	}

	// Step III : page is not in buffercache yet and all slots are occupied, using the occupied slot
	//            with lowest fix stat

	if ( ! isFixed && minFixStatBufPtr != 0)
	{

	    bufPtr = minFixStatBufPtr;

	    // minHashId is already locked
	    // pLockHandle->lockBufferPool(minHashId);

	    BufferHead *pBH = (BufferHead*)bufPtr;

	    // a dirty WRITE_ON_DIRTY page has to be written to disk, before the slot is reused
	    if ( pBH->isOccupied == WRITE_ON_DIRTY && pBH->isDirty == 1 )
	    {
		_numDiskWrite++;

#ifdef DEBUG
		log(_modId, Logger::DEBUG, Chain("Async write of page [") + Chain(pBH->pageId) + Chain("]"));
#endif

		// cout << "---- >>>> Async write of page [" << bh.fileId << "," << bh.pageId << "]" << endl;

		// NOTE(review): writePage is not guarded by a try block, in case of an exception
		// the lock for minHashId is not released here
		_diskWriteTimer.start();
		writePage(pBH->pageId, pBH->fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
		_diskWriteTimer.stop();

		_avgWriteDelay = ( _diskWriteTimer.getSum() /  (_numDiskWrite+(unsigned long long)1)  ) / 1000;

	    }

	    // cout << "Using occupied page ( type " << bh.isOccupied << " ) with fileId " << bh.fileId << " pageId " << bh.pageId << " numFixes " << bh.numFixes << endl;

	    if ( m == CegoBufferPool::PERSISTENT )
		pBH->isOccupied = PERSISTENT_OCCUPIED;
	    else if ( m == CegoBufferPool::SYNC )
		pBH->isOccupied = WRITE_ON_SYNC;
	    else if ( m == CegoBufferPool::NOSYNC )
		pBH->isOccupied = WRITE_ON_DIRTY;

	    pBH->pageId = pageId;
	    pBH->isDirty = 0;
	    pBH->numFixes = 1;

	    try
	    {
		_numDiskRead++;
		_diskReadTimer.start();
		readPage(pageId, pBH->tabSetId, pBH->fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
		_diskReadTimer.stop();

		_avgReadDelay = ( _diskReadTimer.getSum() /  (_numDiskRead+(unsigned long long)1) ) / 1000;

	    }
	    catch ( Exception e )
	    {
		pLockHandle->unlockBufferPool(minHashId);
		throw e;
	    }
	    pBH->fixStat++;
	    pBH->numUsage++;

	    pLockHandle->unlockBufferPool(minHashId);

	    isFixed = true;

	}

	// no slot available at all, force checkpoints and try again
	if ( ! isFixed )
	{
	    if ( numTry >= 2 )
		throw Exception(EXLOC, "No more buffers available");

	    Chain tableSet = getTabSetName(tabSetId);
	    log(_modId, Logger::NOTICE, Chain("Forced checkpoint by bufferFix for tableset ") + tableSet);

	    // first retry : checkpoint for the own tableset only
	    if ( numTry == 0 )
		writeCheckPoint(tabSetId, true, Chain(""), 0,  pLockHandle);

	    // second retry : checkpoints for all other online tablesets
	    if ( numTry == 1 )
	    {

		// we force checkpoints for all active tableset,
		ListT<int> tsList = getOnlineTableSet();
		int* pTS = tsList.First();
		while ( pTS )
		{
		    if ( *pTS != tabSetId )
		    {
			Chain foreignTableSet = getTabSetName(*pTS);
			log(_modId, Logger::NOTICE, Chain("Forced checkpoint by bufferFix for foreign tableset ") + foreignTableSet);
			writeCheckPoint(*pTS, true, Chain(""), 0,  pLockHandle);
		    }
		    pTS = tsList.Next();
		}
	    }

	    numTry++;

	    // try again
	    return bufferFix(bp, tabSetId, pageId, m, pLockHandle, numTry);

	}
    }

    // attach the fixed slot to the buffer page, the page data starts behind the padded slot head
    bp.setPageSize(_pageSize);
    bp.setPagePtr((char*)((long long)bufPtr + BUFFERHEAD_SIZE));
    bp.setPageHead((CegoBufferPage::PageHead*)((long long)bufPtr + BUFFERHEAD_SIZE));
    bp.setPageId(pageId);
    bp.setFixed(true);

}
449 
emptyFix(CegoBufferPage & bp,int tabSetId,FixMode m,CegoFileHandler::FileType ft,CegoLockHandler * pLockHandle,bool doAppend)450 void CegoBufferPool::emptyFix(CegoBufferPage &bp, int tabSetId, FixMode m, CegoFileHandler::FileType ft, CegoLockHandler *pLockHandle, bool doAppend)
451 {
452     if (_pBufPool == 0)
453     {
454 	throw Exception(EXLOC, "No valid bufferpool" );
455     }
456 
457     PageIdType pageId;
458 
459     unsigned* fbm;
460     int fbmSize=0;
461 
462     allocatePage(tabSetId, ft, pageId, pLockHandle, fbm, fbmSize, doAppend);
463 
464     if ( fbmSize > 0 )
465     {
466 	int fileId = getFileIdForPageId(pageId);
467 	logBM(tabSetId, fileId, fbm, fbmSize, pLockHandle);
468 	delete[] fbm;
469     }
470 
471     bufferFix(bp, tabSetId, pageId, m, pLockHandle);
472     bp.initPage(CegoBufferPage::TABLE);
473     bp.setFixed(true);
474 
475 }
476 
bufferUnfix(CegoBufferPage & bp,bool isDirty,CegoLockHandler * pLockHandle)477 void CegoBufferPool::bufferUnfix(CegoBufferPage &bp, bool isDirty, CegoLockHandler *pLockHandle)
478 {
479     if (_pBufPool == 0)
480     {
481 	throw Exception(EXLOC, "No valid bufferpool" );
482     }
483 
484     int segid = calcSegment(bp.getPageId());
485 
486     void *base = (void*)_pBufPool[segid];
487     char* bufPtr = (char*)bp.getPagePtr();
488     int hashId = ( (long long)bufPtr - (long long)base - BUFFERPOOLHEAD_SIZE ) / (  BUFFERHEAD_SIZE + _pageSize );
489 
490     bufPtr = bufPtr - BUFFERHEAD_SIZE;
491 
492     pLockHandle->lockBufferPool(hashId, CegoLockHandler::WRITE);
493 
494     BufferHead* pBH = (BufferHead*)bufPtr;
495 
496     if ( pBH->numFixes > 0 )
497     {
498 	if ( isDirty )
499 	{
500 	    pBH->isDirty = 1;
501 	}
502 	pBH->numFixes--;
503     }
504     else
505     {
506 	pLockHandle->unlockBufferPool(hashId);
507 	throw Exception(EXLOC, "Number of fixes is already zero");
508     }
509 
510     bp.setFixed(false);
511 
512     pLockHandle->unlockBufferPool(hashId);
513 }
514 
bufferRelease(CegoBufferPage & bp,CegoLockHandler * pLockHandle)515 void CegoBufferPool::bufferRelease(CegoBufferPage &bp, CegoLockHandler *pLockHandle)
516 {
517     if (_pBufPool == 0)
518     {
519 	throw Exception(EXLOC, "No valid bufferpool" );
520     }
521 
522     char* bufPtr = (char*)bp.getPagePtr();
523     bufPtr = bufPtr - BUFFERHEAD_SIZE;
524 
525     BufferHead bh;
526 
527     int hashId = calcHash(bp.getPageId());
528 
529     pLockHandle->lockBufferPool(hashId, CegoLockHandler::WRITE);
530 
531 
532     int tabSetId = 0;
533 
534     try
535     {
536 	memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
537 
538 	// save tabSetId value
539 	tabSetId = bh.tabSetId;
540 
541 	bh.isOccupied = NOT_OCCUPIED;
542 	bh.numFixes=0;
543 	bh.isDirty = 0;
544 	bh.tabSetId = 0;
545 	bh.pageId = 0;
546 	bh.fixStat = 0;
547 	bh.numUsage = 0;
548 
549 	memcpy(bufPtr, &bh, BUFFERHEAD_SIZE);
550 
551 	// we just collect freed page ids
552 	// they are physically deleted at next checkpoint
553 	_deletedPageList.Insert(bp.getPageId());
554 
555 	pLockHandle->unlockBufferPool(hashId);
556 
557     }
558     catch ( Exception e )
559     {
560 	pLockHandle->unlockBufferPool(hashId);
561 	throw Exception(EXLOC, "Cannot release file page", e);
562     }
563     bp.setFixed(false);
564 
565     if ( _deletedPageList.Size() > _maxPageDelete )
566     {
567 	Chain tableSet = getTabSetName(tabSetId);
568 	log(_modId, Logger::NOTICE, Chain("Forced checkpoint by bufferRelease for tableset ") + tableSet);
569 	writeCheckPoint(tableSet, true, false, pLockHandle);
570     }
571 
572 }
573 
writeCheckPoint(const Chain & tableSet,bool switchLog,bool archComplete,CegoLockHandler * pLockHandle,const Chain & escCmd,int escTimeout,int archTimeout)574 unsigned long long CegoBufferPool::writeCheckPoint(const Chain& tableSet, bool switchLog, bool archComplete, CegoLockHandler *pLockHandle, const Chain& escCmd, int escTimeout, int archTimeout)
575 {
576     int tabSetId = getTabSetId(tableSet);
577 
578     log(_modId, Logger::NOTICE, Chain("Writing checkpoint for tableset ") + tableSet + Chain(" as lsn = ")
579 	+ Chain(getCurrentLSN(tabSetId)) + Chain(" ..."));
580 
581     unsigned long long lsn = writeCheckPoint(tabSetId, switchLog, escCmd, escTimeout, pLockHandle);
582 
583     Datetime tsStart;
584 
585     unsigned long long archExceed = tsStart.asLong() + archTimeout;
586     if ( archComplete )
587     {
588 	while ( archiveComplete(tableSet) == false )
589 	{
590 	    log(_modId, Logger::NOTICE, Chain("Waiting to complete archiving in tableset ") + tableSet + Chain(" ..."));
591 
592 	    Datetime tsNow;
593 	    if ( tsNow.asLong() > archExceed )
594 	    {
595 		throw Exception(EXLOC, "Archiving timeout reached");
596 	    }
597 	    Sleeper s;
598 	    s.secSleep(LOGMNG_LOGSWITCH_WAIT_DELAY);
599 	}
600     }
601     return lsn;
602 }
603 
604 
writeCheckPoint(int tabSetId,bool switchLog,const Chain & escCmd,int timeout,CegoLockHandler * pLockHandle)605 unsigned long long CegoBufferPool::writeCheckPoint(int tabSetId, bool switchLog, const Chain& escCmd, int timeout, CegoLockHandler *pLockHandle)
606 {
607 
608     if (_pBufPool == 0)
609     {
610 	throw Exception(EXLOC, "No valid bufferpool" );
611     }
612 
613     try
614     {
615 	pLockHandle->lockBufferPool();
616 
617 	unsigned long long lsn;
618 
619 	if ( escCmd != Chain("") )
620 	{
621 	    log(_modId, Logger::NOTICE, Chain("Executing escape command <") + escCmd + Chain(">"));
622 
623 	    char *pShell = getenv(CGEXESHELLVARNAME);
624 
625 	    Chain shellCmd;
626 	    if ( pShell == NULL )
627 	    {
628 		shellCmd = Chain(CGSTDEXESHELL);
629 	    }
630 	    else
631 	    {
632 		shellCmd = Chain(pShell);
633 	    }
634 
635 	    CommandExecuter cmdExe(shellCmd);
636 
637 	    try
638 	    {
639 		int retCode = cmdExe.execute(escCmd, timeout);
640 		log(_modId, Logger::NOTICE, Chain("Escape command finished with return code : <") + Chain(retCode) + Chain(">"));
641 	    }
642 	    catch ( Exception e )
643 	    {
644 		Chain msg;
645 		e.pop(msg);
646 		log(_modId, Logger::LOGERR, msg);
647 		throw Exception(EXLOC, msg);
648 	    }
649 	}
650 
651 
652 	PageIdType *pDelPage = _deletedPageList.First();
653 	while ( pDelPage )
654 	{
655 	    int fbmSize=0;
656 	    unsigned* fbm;
657 
658 	    // cout << "Releasing page " << *pDelPage << endl;
659 
660 	    releasePage(*pDelPage, pLockHandle, fbm, fbmSize);
661 
662 	    if ( fbmSize > 0 )
663 	    {
664 		int fileId = getFileIdForPageId(*pDelPage);
665 		logBM(tabSetId, fileId, fbm, fbmSize, pLockHandle);
666 		delete[] fbm;
667 	    }
668 	    pDelPage = _deletedPageList.Next();
669 	}
670 
671 	// and reset the list
672 	_deletedPageList.Empty();
673 
674 
675 	// is checkpoint dump enabled for tableset ?
676 
677 	if ( checkPointDumpEnabled(tabSetId) )
678 	{
679 
680 	    // free all collected deleted page id's
681 	    // we don't treat this in dump, since a crash during checkpoint
682 	    // might just result in page leaks which can be corrected via cleanup startup option
683 
684 	    dumpCheckpoint(tabSetId);
685 
686 	    if ( switchLog )
687 	    {
688 		// set commited lsn to current lsn to sync with switchLogFile if log is active
689 		lsn = getCurrentLSN(tabSetId);
690 		setCommittedLSN(tabSetId, lsn);
691 
692 		while ( switchLogFile(tabSetId) == false )
693 		{
694 		    log(_modId, Logger::NOTICE, Chain("Logfile for tabSetId ") + Chain(tabSetId) + Chain(" still active, switch failed"));
695 		    Sleeper s;
696 		    s.secSleep(LOGMNG_LOGSWITCH_WAIT_DELAY);
697 		}
698 	    }
699 	    else
700 	    {
701 		// we write a sync in any case
702 		CegoLogRecord lr;
703 		lr.setAction(CegoLogRecord::LOGREC_SYNC);
704 		logAction(tabSetId, lr);
705 	    }
706 
707 	    lsn = getCurrentLSN(tabSetId);
708 	    setCommittedLSN(tabSetId, lsn);
709 
710 	    // force sync
711 	    doc2Xml();
712 
713 	    // now dump to data file
714 	    commitCheckpoint(tabSetId, pLockHandle);
715 
716 	}
717 	else // ( checkPointDumpEnabled(tabSetId) == false )
718 	{
719 
720 	    if ( switchLog )
721 	    {
722 		// set lsn before logfile switch, the method switchLogFile synchronizes with doc2Xml
723 		lsn = getCurrentLSN(tabSetId);
724 		setCommittedLSN(tabSetId, lsn);
725 
726 		while ( switchLogFile(tabSetId) == false )
727 		{
728 		    log(_modId, Logger::NOTICE, Chain("Logfile for tabSetId ") + Chain(tabSetId) + Chain(" still active, switch failed"));
729 		    Sleeper s;
730 		    s.secSleep(LOGMNG_LOGSWITCH_WAIT_DELAY);
731 		}
732 	    }
733 	    else
734 	    {
735 		// we write a sync in any case
736 		CegoLogRecord lr;
737 		lr.setAction(CegoLogRecord::LOGREC_SYNC);
738 		logAction(tabSetId, lr);
739 
740 		lsn = getCurrentLSN(tabSetId);
741 		setCommittedLSN(tabSetId, lsn);
742 	    }
743 
744 	    Chain currentState = getTableSetRunState(tabSetId);
745 	    setTableSetRunState(tabSetId, XML_CHECKPOINT_VALUE);
746 
747 	    // sync control information to xml database file
748 	    doc2Xml();
749 
750 	    for ( int i=0; i<_numSegment; i++)
751 	    {
752 		void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
753 
754 		int pageCount = 0;
755 
756 		while ( pageCount < _numPages )
757 		{
758 		    BufferHead bh;
759 		    memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
760 
761 		    if ( bh.isOccupied != NOT_OCCUPIED && bh.isDirty != 0 && bh.tabSetId == tabSetId )
762 		    {
763 			if ( currentState == Chain(XML_BACKUP_VALUE) )
764 			{
765 			    if ( needPageBackup(bh.pageId) )
766 			    {
767 #ifdef DEBUG
768 				log(_modId, Logger::DEBUG, Chain("Reading page (") + Chain(bh.fileId) + Chain(",") + Chain(bh.pageId) + Chain(") to log ..."));
769 #endif
770 				int ts;
771 				unsigned fixStat;
772 				char *pageData = new char[_pageSize];
773 				readPage(bh.pageId, ts, fixStat, pageData, pLockHandle);
774 
775 				CegoLogRecord lr;
776 				lr.setAction(CegoLogRecord::LOGREC_BUPAGE);
777 				lr.setData(pageData);
778 				lr.setDataLen(_pageSize);
779 				lr.setPageId(bh.pageId);
780 
781 				if ( logAction(tabSetId, lr) == CegoLogManager::LOG_FULL )
782 				{
783 #ifdef DEBUG
784 				    log(_modId, Logger::DEBUG, Chain("Switching logFiles ..."));
785 #endif
786 
787 				    while ( switchLogFile(tabSetId) == false )
788 				    {
789 					log(_modId, Logger::NOTICE, Chain("Logfile for tabSetId ") + Chain(tabSetId) + Chain(" still active, switch failed"));
790 
791 					Sleeper s;
792 					s.secSleep(LOGMNG_LOGSWITCH_WAIT_DELAY);
793 
794 				    }
795 
796 				    if ( logAction(tabSetId, lr) != CegoLogManager::LOG_SUCCESS )
797 				    {
798 					delete[] pageData;
799 					throw Exception(EXLOC, "Cannot write to log");
800 				    }
801 				}
802 
803 				delete[] pageData;
804 			    }
805 			}
806 			_numDiskWrite++;
807 			_diskWriteTimer.start();
808 
809 			writePage(bh.pageId, bh.fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
810 			_diskWriteTimer.stop();
811 
812 			_avgWriteDelay = ( _diskWriteTimer.getSum() /  (_numDiskWrite+(unsigned long long)1) ) / 1000;
813 
814 			bh.isDirty = 0;
815 			memcpy(bufPtr, &bh, BUFFERHEAD_SIZE);
816 		    }
817 		    bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
818 		    pageCount++;
819 		}
820 	    }
821 	    // set back tableset runstate to saved value
822 	    setTableSetRunState(tabSetId, currentState);
823 	    // sync control information to xml database file
824 	    doc2Xml();
825 	}
826 
827 	optimizePool(tabSetId);
828 
829 	_cpCount++;
830 
831 	pLockHandle->unlockBufferPool();
832 	return lsn;
833 
834     }
835     catch ( Exception e )
836     {
837 	// if we got an exception, we have to unlock the still locked bufferpool area
838 	pLockHandle->unlockBufferPool();
839 	throw e;
840     }
841 }
842 
getCPCount() const843 unsigned long long CegoBufferPool::getCPCount() const
844 {
845     return _cpCount;
846 }
847 
dumpCheckpoint(int tabSetId)848 void CegoBufferPool::dumpCheckpoint(int tabSetId)
849 {
850     Chain tableSet = getTabSetName(tabSetId);
851     Chain tsRoot = getTSRoot(tableSet);
852 
853     CegoCheckpointDump cpd(tableSet, tsRoot, _pageSize);
854 
855     cpd.startWrite();
856 
857     for ( int i=0; i<_numSegment; i++)
858     {
859 	void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
860 
861 	int pageCount = 0;
862 
863 	while ( pageCount < _numPages )
864 	{
865 	    BufferHead bh;
866 	    memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
867 
868 	    if ( bh.isOccupied != NOT_OCCUPIED && bh.isDirty != 0 && bh.tabSetId == tabSetId )
869 	    {
870 		cpd.dumpPage(bh.pageId, bh.fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE));
871 		bh.isDirty = 0;
872 		memcpy(bufPtr, &bh, BUFFERHEAD_SIZE);
873 	    }
874 	    bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
875 	    pageCount++;
876 	}
877     }
878 
879     cpd.finish();
880 }
881 
commitCheckpoint(int tabSetId,CegoLockHandler * pLockHandle)882 void CegoBufferPool::commitCheckpoint(int tabSetId, CegoLockHandler *pLockHandle)
883 {
884     Chain currentState = getTableSetRunState(tabSetId);
885 
886     Chain tableSet = getTabSetName(tabSetId);
887     Chain tsRoot = getTSRoot(tableSet);
888 
889     CegoCheckpointDump cpd(tableSet, tsRoot, _pageSize);
890 
891 
892     // commit the dump
893     cpd.commitDump();
894 
895     // cout << "CRASH SIMULATION .." << endl;
896     // for Debuging: Simuldate crash here
897     // int *pCrash;
898     // *pCrash = 42;
899 
900     PageIdType pageId;
901     unsigned fixStat;
902     char *buf = new char[_pageSize];
903 
904     cpd.startRead();
905 
906     bool logExceedDetected=false;
907 
908     while ( cpd.readDump(pageId, fixStat, buf) )
909     {
910 	if ( currentState == Chain(XML_BACKUP_VALUE) )
911 	{
912 	    if ( needPageBackup(pageId) )
913 	    {
914 #ifdef DEBUG
915 		log(_modId, Logger::DEBUG, Chain("Reading page (") + Chain(bh.fileId) + Chain(",") + Chain(bh.pageId) + Chain(") to log ..."));
916 #endif
917 		int ts;
918 		unsigned oldFixStat;
919 		char *oldPageData = new char[_pageSize];
920 		try
921 		{
922 		    readPage(pageId, ts, oldFixStat, oldPageData, pLockHandle);
923 
924 		    CegoLogRecord lr;
925 		    lr.setAction(CegoLogRecord::LOGREC_BUPAGE);
926 		    lr.setData(oldPageData);
927 		    lr.setDataLen(_pageSize);
928 		    lr.setPageId(pageId);
929 
930 		    if ( logAction(tabSetId, lr) == CegoLogManager::LOG_FULL && logExceedDetected == false )
931 		    {
932 			logExceedDetected = true;
933 			log(_modId, Logger::NOTICE, Chain("Logfile exceeded limit during checkpoint dump at offset ") + Chain(getLogOffset(tabSetId)));
934 		    }
935 		}
936 		catch ( Exception e )
937 		{
938 		    delete[] oldPageData;
939 		    throw e;
940 		}
941 		delete[] oldPageData;
942 	    }
943 	}
944 	_numDiskWrite++;
945 	_diskWriteTimer.start();
946 
947 	writePage(pageId, fixStat, buf, pLockHandle);
948 	_diskWriteTimer.stop();
949 
950 	_avgWriteDelay = ( _diskWriteTimer.getSum() /  (_numDiskWrite+(unsigned long long)1) ) / 1000;
951     }
952 
953     delete[] buf;
954 
955     cpd.remove();
956 }
957 
restoreCheckpointDump(int tabSetId,CegoLockHandler * pLockHandle)958 void CegoBufferPool::restoreCheckpointDump(int tabSetId, CegoLockHandler *pLockHandle)
959 {
960     Chain tableSet = getTabSetName(tabSetId);
961     Chain tsRoot = getTSRoot(tableSet);
962 
963     CegoCheckpointDump cpd(tableSet, tsRoot, _pageSize);
964 
965     if ( cpd.readyDumpExists() )
966     {
967 
968 	PageIdType pageId;
969 	unsigned fixStat;
970 	char *buf = new char[_pageSize];
971 
972 	cpd.startRead();
973 
974 	while ( cpd.readDump(pageId, fixStat, buf) )
975 	{
976 	    _numDiskWrite++;
977 	    _diskWriteTimer.start();
978 
979 	    writePage(pageId, fixStat, buf, pLockHandle);
980 	    _diskWriteTimer.stop();
981 
982 	    _avgWriteDelay = ( _diskWriteTimer.getSum() /  (_numDiskWrite+(unsigned long long)1) ) / 1000;
983 	}
984 
985 	delete[] buf;
986 
987 	cpd.remove();
988     }
989 
990     cpd.cleanUp();
991 }
992 
writeAndRemoveTabSet(int tabSetId,CegoLockHandler * pLockHandle)993 void CegoBufferPool::writeAndRemoveTabSet(int tabSetId, CegoLockHandler *pLockHandle)
994 {
995     if (_pBufPool == 0)
996     {
997 	throw Exception(EXLOC, "No valid bufferpool");
998     }
999 
1000     for ( int i=0; i<_numSegment; i++)
1001     {
1002 	void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
1003 
1004 	int pageCount = 0;
1005 
1006 	while ( pageCount < _numPages )
1007 	{
1008 	    BufferHead bh;
1009 	    memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1010 
1011 	    if ( bh.tabSetId == tabSetId )
1012 	    {
1013 		if ( bh.isOccupied != NOT_OCCUPIED && bh.isDirty != 0 )
1014 		{
1015 		    _numDiskWrite++;
1016 
1017 		    _diskWriteTimer.start();
1018 		    writePage(bh.pageId, bh.fixStat, (char*)((long long)bufPtr + BUFFERHEAD_SIZE), pLockHandle);
1019 		    _diskWriteTimer.stop();
1020 
1021 		    _avgWriteDelay = ( _diskWriteTimer.getSum() /  (_numDiskWrite+(unsigned long long)1) ) / 1000;
1022 		}
1023 
1024 		bh.isOccupied = NOT_OCCUPIED;
1025 		bh.numFixes=0;
1026 
1027 		memcpy(bufPtr, &bh, BUFFERHEAD_SIZE);
1028 
1029 	    }
1030 	    bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1031 	    pageCount++;
1032 	}
1033     }
1034     releaseFiles(tabSetId);
1035 }
1036 
uptime() const1037 unsigned long long CegoBufferPool::uptime() const
1038 {
1039     Datetime ts;
1040     return ts.asLong() - _poolStart;
1041 }
1042 
poolInfo(int & pageSize,unsigned long long & numTotal,unsigned long long & numUsed,unsigned long long & numFree,unsigned long long & numDirty,unsigned long long & numFixes,unsigned long long & numPersistent,unsigned long long & numNoSync,unsigned long long & numDiskRead,unsigned long long & numDiskWrite,double & hitRate,double & spreadRate,unsigned long long & readDelay,unsigned long long & writeDelay,unsigned long long & curFixCount,unsigned long long & maxFixCount,unsigned long long & avgFixTry,unsigned long long & statStart,unsigned long long & uptime) const1043 void CegoBufferPool::poolInfo(int& pageSize,
1044 			      unsigned long long& numTotal,
1045 			      unsigned long long& numUsed,
1046 			      unsigned long long& numFree,
1047 			      unsigned long long& numDirty,
1048 			      unsigned long long& numFixes,
1049 			      unsigned long long& numPersistent,
1050 			      unsigned long long& numNoSync,
1051 			      unsigned long long& numDiskRead,
1052 			      unsigned long long& numDiskWrite,
1053 			      double& hitRate,
1054 			      double& spreadRate,
1055 			      unsigned long long& readDelay,
1056 			      unsigned long long& writeDelay,
1057 			      unsigned long long& curFixCount,
1058 			      unsigned long long& maxFixCount,
1059 			      unsigned long long& avgFixTry,
1060 			      unsigned long long& statStart,
1061                               unsigned long long& uptime) const
1062 {
1063     if (_pBufPool == 0)
1064     {
1065 	throw Exception(EXLOC, "No valid bufferpool" );
1066     }
1067 
1068     pageSize = _pageSize;
1069     numTotal = _numPages * _numSegment;
1070     numUsed=0;
1071     numFree=0;
1072     numFixes=0;
1073     numDirty=0;
1074     numPersistent=0;
1075     numNoSync=0;
1076 
1077     unsigned long long numUsage = 0;
1078 
1079     for ( int i=0; i<_numSegment; i++)
1080     {
1081 	void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
1082 
1083 	unsigned long long pageCount = 0;
1084 
1085 	unsigned long long numSegPageUsed = 0;
1086 
1087 	while ( pageCount < _numPages )
1088 	{
1089 	    BufferHead bh;
1090 	    memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1091 
1092 	    numFixes += (long)bh.numFixes;
1093 	    numDirty += (long)bh.isDirty;
1094 
1095 	    // we don't count persistent hot spot pages
1096 	    if ( bh.isOccupied != PERSISTENT_OCCUPIED )
1097 		numUsage += bh.numUsage;
1098 
1099 	    if (bh.isOccupied == NOT_OCCUPIED)
1100 	    {
1101 		numFree++;
1102 	    }
1103 	    else if (bh.isOccupied == WRITE_ON_SYNC)
1104 	    {
1105 		numSegPageUsed++;
1106 		numUsed++;
1107 	    }
1108 	    else if (bh.isOccupied == WRITE_ON_DIRTY)
1109 	    {
1110 		numSegPageUsed++;
1111 		numUsed++;
1112 		numNoSync++;
1113 	    }
1114 	    else if (bh.isOccupied == PERSISTENT_OCCUPIED)
1115 	    {
1116 		numSegPageUsed++;
1117 		numUsed++;
1118 		numPersistent++;
1119 	    }
1120 
1121 	    bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1122 	    pageCount++;
1123 	}
1124 	// cout << "Segment " << i << " : " << numSegPageUsed << " pages used" << endl;
1125     }
1126 
1127     numDiskWrite = _numDiskWrite;
1128     numDiskRead = _numDiskRead;
1129 
1130     // hitRate = BUPMNG_STATSPERIOD - _numDiskRead / ( _fixCount / BUPMNG_STATSPERIOD + 1) ;
1131     hitRate = 100.0 * ( (double)_fixCount - _numDiskRead + 1 ) / ( (double)_fixCount + 1 );
1132 
1133     double usageMedian = (double)numUsage / (double)( _numSegment * _numPages );
1134     double usageDist = 0.0;
1135 
1136     for ( int i=0; i<_numSegment; i++)
1137     {
1138 	void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
1139 
1140 	unsigned long long pageCount = 0;
1141 
1142 	while ( pageCount < _numPages )
1143 	{
1144 	    BufferHead bh;
1145 	    memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1146 
1147 	    if ( bh.isOccupied != PERSISTENT_OCCUPIED )
1148 	    {
1149 		usageMedian > bh.numUsage ? usageDist += usageMedian - (double)bh.numUsage : usageDist += (double)bh.numUsage - usageMedian;
1150 	    }
1151 
1152 	    bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1153 	    pageCount++;
1154 	}
1155     }
1156 
1157     double avgDist = usageDist / (double)( _numSegment * _numPages );
1158     // cout << "DistMed = " << usageMedian << endl;
1159     // cout << "AvgDist = " << avgDist << endl;
1160     // cout << "NumUsage  = " << numUsage << endl;
1161     spreadRate =  avgDist / ( usageMedian + 1 );
1162 
1163     // cout << "Fix Dist Rate = " << fixDistRate << endl;
1164 
1165     readDelay = _avgReadDelay;
1166     writeDelay = _avgWriteDelay;
1167     statStart = _statStart;
1168     curFixCount = _fixCount;
1169     avgFixTry = _fixCount > 0 ? _fixTry / _fixCount : 0;
1170     maxFixCount = BUPMNG_STATSPERIOD;
1171 
1172     Datetime ts;
1173     uptime = ts.asLong() - _poolStart;
1174 }
1175 
getPoolEntryList(ListT<CegoBufferPoolEntry> & entryList)1176 void CegoBufferPool::getPoolEntryList(ListT<CegoBufferPoolEntry>& entryList)
1177 {
1178     if (_pBufPool == 0)
1179     {
1180 	throw Exception(EXLOC, "No valid bufferpool" );
1181     }
1182 
1183     entryList.Empty();
1184 
1185     for ( int seg=0; seg<_numSegment; seg++)
1186     {
1187 	void *bufPtr = (void*)( (long long)_pBufPool[seg] + BUFFERPOOLHEAD_SIZE );
1188 
1189 	unsigned long long pageCount = 0;
1190 
1191 	while ( pageCount < _numPages )
1192 	{
1193 	    BufferHead bh;
1194 	    memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1195 
1196 	    bool isDirty = false;
1197 	    if ( bh.isDirty )
1198 		isDirty=true;
1199 
1200 	    Chain occState;
1201 	    if (bh.isOccupied == NOT_OCCUPIED)
1202 		occState=Chain("NOTOCCUPIED");
1203 	    else if (bh.isOccupied == WRITE_ON_SYNC)
1204 		occState=Chain("WRITEONSYNC");
1205 	    else if (bh.isOccupied == WRITE_ON_DIRTY)
1206 		occState=Chain("WRITEONDIRTY");
1207 	    else if (bh.isOccupied == PERSISTENT_OCCUPIED)
1208 		occState=Chain("PERSISTENT");
1209 
1210 	    CegoBufferPoolEntry bpe(seg, pageCount, occState, isDirty, bh.numFixes, bh.tabSetId, bh.pageId, bh.fixStat, bh.numUsage);
1211 	    entryList.Insert(bpe);
1212 
1213 	    bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1214 	    pageCount++;
1215 	}
1216     }
1217 }
1218 
resetStats()1219 void CegoBufferPool::resetStats()
1220 {
1221     _numDiskRead=0;
1222     _numDiskWrite=0;
1223     _fixCount=0;
1224     _fixTry=0;
1225     _avgReadDelay=0;
1226     _avgWriteDelay=0;
1227     _diskReadTimer.reset();
1228     _diskWriteTimer.reset();
1229     Datetime ts;
1230     _statStart = ts.asLong();
1231 }
1232 
logIt(int tabSetId,CegoLogRecord & lr,CegoLockHandler * pLockHandle,bool flushLog)1233 void CegoBufferPool::logIt(int tabSetId, CegoLogRecord& lr, CegoLockHandler* pLockHandle, bool flushLog)
1234 {
1235     _lmLock[tabSetId].writeLock(LM_LOCKTIMEOUT);
1236 
1237     try
1238     {
1239 	CegoLogManager::LogResult res = logAction(tabSetId, lr, flushLog);
1240 
1241 	if ( res == CegoLogManager::LOG_FULL )
1242 	{
1243 	    // if log is full, we have to force a checkpoint
1244 	    // to be consistent, log entry has already been written to log
1245 
1246 	    Chain tableSet = getTabSetName(tabSetId);
1247 	    writeCheckPoint(tableSet, true, false, pLockHandle);
1248 	}
1249 
1250 	if ( res == CegoLogManager::LOG_ERROR )
1251 	{
1252 	    Chain tableSet = getTabSetName(tabSetId);
1253 	    setTableSetSyncState(tableSet, XML_LOG_LOSS_VALUE);
1254 
1255 	    throw Exception(EXLOC, Chain("Cannot write to log"));
1256 	}
1257     }
1258     catch ( Exception e )
1259     {
1260 	_lmLock[tabSetId].unlock();
1261 	throw e;
1262     }
1263 
1264     _lmLock[tabSetId].unlock();
1265 }
1266 
printPool()1267 void CegoBufferPool::printPool()
1268 {
1269     if (_pBufPool == 0)
1270     {
1271 	throw Exception(EXLOC, "No valid bufferpool" );
1272     }
1273 
1274     cout << "--- BufferPool ---" << endl;
1275     cout << "BasePtr: " << (long long)_pBufPool << endl;
1276     cout << "PageSize: " << _pageSize << endl;
1277     cout << "NumPages: " << _numPages << endl;
1278 
1279     unsigned long long numUsed=0;
1280     unsigned long long numFree=0;
1281     unsigned long long numFixes=0;
1282     unsigned long long numDirty=0;
1283 
1284     for ( int i=0; i<_numSegment; i++)
1285     {
1286 	void *bufPtr = (void*)( (long long)_pBufPool[i] + BUFFERPOOLHEAD_SIZE );
1287 
1288 	unsigned long long pageCount = 0;
1289 
1290 	while ( pageCount < _numPages )
1291 	{
1292 	    BufferHead bh;
1293 	    memcpy(&bh, bufPtr, BUFFERHEAD_SIZE);
1294 
1295 	    numFixes += bh.numFixes;
1296 	    numDirty += (unsigned long long)bh.isDirty;
1297 
1298 	    if (bh.isOccupied == 0)
1299 		numFree++;
1300 	    else
1301 		numUsed++;
1302 
1303 	    bufPtr = (void*)((long long)bufPtr + (long long)(BUFFERHEAD_SIZE + _pageSize));
1304 	    pageCount++;
1305 	}
1306     }
1307 
1308     cout << "NumUsed: " << numUsed << endl;
1309     cout << "NumFixes: " << numFixes << endl;
1310     cout << "NumDirty: " << numDirty << endl;
1311     cout << "NumFree: " << numFree << endl;
1312 
1313     cout << "NumFree: " << numFree << endl;
1314 }
1315 
getLMLockStat(int tabSetId,Chain & lockName,int & lockCount,unsigned long long & numRdLock,unsigned long long & numWrLock,unsigned long long & sumRdDelay,unsigned long long & sumWrDelay)1316 void CegoBufferPool::getLMLockStat(int tabSetId, Chain& lockName, int& lockCount, unsigned long long &numRdLock, unsigned long long &numWrLock, unsigned long long &sumRdDelay, unsigned long long &sumWrDelay)
1317 {
1318     lockName = getTabSetName(tabSetId);
1319     lockCount = _lmLock[tabSetId].numLockTry();
1320 
1321     numRdLock = _lmLock[tabSetId].numReadLock();
1322     numWrLock = _lmLock[tabSetId].numWriteLock();
1323     sumRdDelay = 0;
1324     sumWrDelay = 0;
1325 
1326     if ( _lmLock[tabSetId].numReadLock() > 0 )
1327 	sumRdDelay = _lmLock[tabSetId].sumReadDelay() / LCKMNG_DELRES;
1328     if ( _lmLock[tabSetId].numWriteLock() > 0 )
1329 	sumWrDelay = _lmLock[tabSetId].sumWriteDelay() / LCKMNG_DELRES;
1330 }
1331 
1332 /////////////////////
1333 // private methods //
1334 /////////////////////
1335 
calcHash(PageIdType pageId)1336 int CegoBufferPool::calcHash(PageIdType pageId)
1337 {
1338     // we have to cast to long, since for large fileid values, we might exceed integer range
1339     // unsigned long long s = (unsigned long long)(fileId+1) * (unsigned long long)_hashkey + (unsigned long long)(pageId+1);
1340 
1341     unsigned long long s = (unsigned long long)(pageId+1);
1342     unsigned long long d = (unsigned long long)calcSegment(pageId) * (unsigned long long)_numPages;
1343 
1344     if ( s > d )
1345 	s = s - d;
1346 
1347     return (int)( s % (unsigned long long)_numPages);
1348 }
1349 
calcSegment(PageIdType pageId)1350 int CegoBufferPool::calcSegment(PageIdType pageId)
1351 {
1352     return (int)( ( pageId / (unsigned long long)_numPages ) % (unsigned long long)_numSegment);
1353 }
1354 
1355 
logBM(int tabSetId,int fileId,unsigned * fbm,int fbmSize,CegoLockHandler * pLockHandle)1356 void CegoBufferPool::logBM(int tabSetId, int fileId, unsigned* fbm, int fbmSize, CegoLockHandler* pLockHandle)
1357 {
1358     CegoLogRecord lr;
1359 
1360     lr.setAction(CegoLogRecord::LOGREC_BUFBM);
1361     lr.setData((char*)fbm);
1362     lr.setDataLen(fbmSize * sizeof(unsigned));
1363     lr.setFileId(fileId);
1364 
1365     logIt(tabSetId, lr, pLockHandle);
1366 }
1367 
archiveComplete(const Chain & tableSet)1368 bool CegoBufferPool::archiveComplete(const Chain& tableSet)
1369 {
1370     ListT<Chain> lfList;
1371     ListT<int> sizeList;
1372     ListT<Chain> statusList;
1373 
1374     getLogFileInfo(tableSet, lfList, sizeList, statusList);
1375 
1376     Chain *pStatus = statusList.First();
1377 
1378     while ( pStatus )
1379     {
1380 	if ( *pStatus == Chain(XML_OCCUPIED_VALUE))
1381 	{
1382 	    return false;
1383 	}
1384 	else
1385 	{
1386 	    pStatus = statusList.Next();
1387 	}
1388     }
1389     return true;
1390 }
1391 
optimizePool(int tabSetId)1392 void CegoBufferPool::optimizePool(int tabSetId)
1393 {
1394     Chain tableSet = getTabSetName(tabSetId);
1395     log(_modId, Logger::NOTICE, Chain("Optimizing bufferpool for tableset ") + tableSet + Chain(" ..."));
1396 
1397     unsigned long long numRelocated = 0;
1398 
1399     for ( int seg=0; seg<_numSegment; seg++)
1400     {
1401 	void *basePtr = (void*)( (long long)_pBufPool[seg] + BUFFERPOOLHEAD_SIZE );
1402 
1403 	unsigned long long pageIndex = 0;
1404 
1405 	while ( pageIndex < _numPages )
1406 	{
1407 
1408 	    BufferHead bh;
1409 	    memcpy(&bh, basePtr, BUFFERHEAD_SIZE);
1410 
1411 	    if ( ( bh.isOccupied == WRITE_ON_SYNC || bh.isOccupied == WRITE_ON_DIRTY ) && bh.tabSetId == tabSetId && bh.numFixes == 0 )
1412 	    {
1413 		int hashId = calcHash(bh.pageId);
1414 
1415 		if ( pageIndex != hashId )
1416 		{
1417 		    // cout << "PageId=" << pageId << ", Seg=" << segid << ", HashId=" << hashId << endl;
1418 
1419 		    void* checkPtr = (void*)((long long)_pBufPool[seg] + BUFFERPOOLHEAD_SIZE + (long long )hashId * (long long)( BUFFERHEAD_SIZE + _pageSize ));
1420 
1421 		    BufferHead cbh;
1422 		    memcpy(&cbh, checkPtr, BUFFERHEAD_SIZE);
1423 
1424 
1425 		    if ( cbh.isOccupied == NOT_OCCUPIED
1426 			 || ( cbh.numFixes == 0 && ( cbh.fixStat + BUPMNG_RELOCATION_THRESHOLD ) < bh.fixStat && cbh.isDirty == 0 ) )
1427 		    {
1428 
1429 			/*
1430 			cout << "Drop Page:" << endl;
1431 			cout << "PageId = " << cbh.pageId << endl;
1432 			cout << "FixStat = " << cbh.fixStat << endl;
1433 			cout << "Relocate Page:" << endl;
1434 			cout << "PageId = " << bh.pageId << endl;
1435 			cout << "FixStat = " << bh.fixStat << endl;
1436 			*/
1437 
1438 			numRelocated++;
1439 
1440 			// relocate page
1441 			memcpy(checkPtr, basePtr, BUFFERHEAD_SIZE + _pageSize);
1442 
1443 			// clean slot
1444 			BufferHead ibh;
1445 			ibh.isOccupied = NOT_OCCUPIED;
1446 			ibh.isDirty = 0;
1447 			ibh.numFixes = 0;
1448 			ibh.tabSetId = 0;
1449 			ibh.pageId = 0;
1450 			ibh.fixStat = 0;
1451 			ibh.numUsage = 0;
1452 			memcpy(basePtr, &ibh, BUFFERHEAD_SIZE );
1453 
1454 		    }
1455 		}
1456 	    }
1457 
1458 	    pageIndex++;
1459 
1460 	    basePtr = (void*)((long long)basePtr + (long long)( BUFFERHEAD_SIZE + _pageSize ));
1461 	}
1462     }
1463 
1464 
1465     log(_modId, Logger::NOTICE, Chain(numRelocated) + Chain(" pages relocated "));
1466 }
1467