1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // CegoDatabaseManager.cc
4 // ----------------------
5 // Cego Database Manager implementation
6 //
7 // Design and Implementation by Bjoern Lemke
8 //
9 // (C)opyright 2000-2019 Bjoern Lemke
10 //
11 // IMPLEMENTATION MODULE
12 //
13 // Class: CegoDatabaseManager
14 //
15 // Description: General database manager
16 //
17 // Status: CLEAN
18 //
19 ///////////////////////////////////////////////////////////////////////////////
20 
21 // LFC INCLUDES
22 #include <lfcbase/ThreadLock.h>
23 #include <lfcbase/File.h>
24 #include <lfcbase/Datetime.h>
25 #include <lfcbase/Sleeper.h>
26 #include <lfcbase/Net.h>
27 #include <lfcbase/NetHandler.h>
28 #include <lfcxml/XMLSuite.h>
29 #include <lfcxml/Element.h>
30 #include <lfcxml/Document.h>
31 
32 // CEGO INCLUDES
33 #include "CegoDatabaseManager.h"
34 #include "CegoTypeConverter.h"
35 #include "CegoDefs.h"
36 #include "CegoXMLdef.h"
37 #include "CegoXMLSpace.h"
38 
39 // POSIX INCLUDES
40 #include <string.h>
41 #include <stdlib.h>
42 
43 static ThreadLock dbmLock("DBM");
44 extern bool __lockStatOn;
45 
46 //////////////////
47 // ObjectRecord //
48 //////////////////
49 
ObjectRecord()50 CegoDatabaseManager::ObjectRecord::ObjectRecord()
51 {
52 }
53 
ObjectRecord(const int tabSetId,const Chain & objName,CegoObject::ObjectType type)54 CegoDatabaseManager::ObjectRecord::ObjectRecord(const int tabSetId, const Chain& objName, CegoObject::ObjectType type)
55 {
56     _objName = objName;
57     _type = type;
58     _tabSetId = tabSetId;
59     _numUsed = 0;
60     _mode = SHARED;
61     _tid = 0;
62 }
63 
~ObjectRecord()64 CegoDatabaseManager::ObjectRecord::~ObjectRecord()
65 {
66 }
67 
getTabSetId() const68 const int CegoDatabaseManager::ObjectRecord::getTabSetId() const
69 {
70     return _tabSetId;
71 }
72 
incUsed()73 void CegoDatabaseManager::ObjectRecord::incUsed()
74 {
75     _numUsed++;
76 }
77 
decUsed()78 void CegoDatabaseManager::ObjectRecord::decUsed()
79 {
80     if ( _numUsed > 0 )
81 	_numUsed--;
82 }
83 
numUsed() const84 int CegoDatabaseManager::ObjectRecord::numUsed() const
85 {
86     return _numUsed;
87 }
88 
setMode(ObjectUseMode mode)89 void CegoDatabaseManager::ObjectRecord::setMode(ObjectUseMode mode)
90 {
91     _mode = mode;
92 }
93 
getMode() const94 CegoDatabaseManager::ObjectUseMode CegoDatabaseManager::ObjectRecord::getMode() const
95 {
96     return _mode;
97 }
98 
setTid(unsigned long long tid)99 void CegoDatabaseManager::ObjectRecord::setTid(unsigned long long tid)
100 {
101     _tid = tid;
102 }
103 
getTid() const104 unsigned long long CegoDatabaseManager::ObjectRecord::getTid() const
105 {
106     return _tid;
107 }
108 
getName() const109 const Chain& CegoDatabaseManager::ObjectRecord::getName() const
110 {
111     return _objName;
112 }
113 
getType() const114 const CegoObject::ObjectType CegoDatabaseManager::ObjectRecord::getType() const
115 {
116     return _type;
117 }
118 
// Assignment: copies identity (tableset id, name, type) and usage state
// (counter, mode, owner transaction) from the given record.
CegoDatabaseManager::ObjectRecord& CegoDatabaseManager::ObjectRecord::operator = ( const CegoDatabaseManager::ObjectRecord& t)
{
    // identity
    _objName = t._objName;
    _type = t._type;
    _tabSetId = t._tabSetId;

    // usage state
    _mode = t._mode;
    _tid = t._tid;
    _numUsed = t._numUsed;

    return *this;
}
129 
operator ==(const CegoDatabaseManager::ObjectRecord & r)130 bool CegoDatabaseManager::ObjectRecord::operator == ( const CegoDatabaseManager::ObjectRecord& r)
131 {
132     bool typeMatch=false;
133     if ( ( _type == CegoObject::PAVLTREE || _type == CegoObject::UAVLTREE || _type == CegoObject::AVLTREE )
134 	 && ( r._type == CegoObject::PAVLTREE || r._type == CegoObject::UAVLTREE || r._type == CegoObject::AVLTREE ) )
135     {
136 	typeMatch=true;
137     }
138     else if ( ( _type == CegoObject::PBTREE || _type == CegoObject::UBTREE || _type == CegoObject::BTREE )
139 	&& ( r._type == CegoObject::PBTREE || r._type == CegoObject::UBTREE || r._type == CegoObject::BTREE ) )
140     {
141 	typeMatch=true;
142     }
143     else
144     {
145 	typeMatch = _type == r._type;
146     }
147 
148     if ( _tabSetId == r._tabSetId && _objName == r._objName && typeMatch )
149 	return true;
150     return false;
151 }
152 
153 //////////////////
154 // CopyRecord //
155 //////////////////
156 
CopyRecord()157 CegoDatabaseManager::CopyRecord::CopyRecord()
158 {
159 }
160 
CopyRecord(const Chain & tableSet,const Chain & targetHost,const Chain & mediatorHost,const Chain & user,const Chain & passwd,const Chain & msg)161 CegoDatabaseManager::CopyRecord::CopyRecord(const Chain& tableSet, const Chain& targetHost, const Chain& mediatorHost, const Chain& user, const Chain& passwd, const Chain& msg)
162 {
163     _id = 0;
164     _tableSet = tableSet;
165     _targetHost = targetHost;
166     _mediatorHost = mediatorHost;
167     _user = user;
168     _passwd = passwd;
169     _msg = msg;
170 }
171 
~CopyRecord()172 CegoDatabaseManager::CopyRecord::~CopyRecord()
173 {
174 }
175 
getTableSet() const176 const Chain& CegoDatabaseManager::CopyRecord::getTableSet() const
177 {
178     return _tableSet;
179 }
180 
getTargetHost() const181 const Chain& CegoDatabaseManager::CopyRecord::getTargetHost() const
182 {
183     return _targetHost;
184 }
185 
getMediatorHost() const186 const Chain& CegoDatabaseManager::CopyRecord::getMediatorHost() const
187 {
188     return _mediatorHost;
189 }
190 
getUser() const191 const Chain& CegoDatabaseManager::CopyRecord::getUser() const
192 {
193     return _user;
194 }
195 
getPasswd() const196 const Chain& CegoDatabaseManager::CopyRecord::getPasswd() const
197 {
198     return _passwd;
199 }
200 
setMsg(const Chain & msg)201 void CegoDatabaseManager::CopyRecord::setMsg(const Chain& msg)
202 {
203     _msg = msg;
204 }
205 
getMsg() const206 const Chain& CegoDatabaseManager::CopyRecord::getMsg() const
207 {
208     return _msg;
209 }
210 
getId() const211 int CegoDatabaseManager::CopyRecord::getId() const
212 {
213     return _id;
214 }
215 
setId(int id)216 void CegoDatabaseManager::CopyRecord::setId(int id)
217 {
218     _id = id;
219 }
220 
// Assignment: copies every field of the given copy request.
CegoDatabaseManager::CopyRecord& CegoDatabaseManager::CopyRecord::operator = ( const CegoDatabaseManager::CopyRecord& cr)
{
    // hosts and tableset
    _tableSet = cr._tableSet;
    _targetHost = cr._targetHost;
    _mediatorHost = cr._mediatorHost;

    // credentials
    _user = cr._user;
    _passwd = cr._passwd;

    // id and status message
    _id = cr._id;
    _msg = cr._msg;

    return *this;
}
234 
operator ==(const CegoDatabaseManager::CopyRecord & cr)235 bool CegoDatabaseManager::CopyRecord::operator == ( const CegoDatabaseManager::CopyRecord& cr)
236 {
237     if ( _tableSet == cr._tableSet && _targetHost == cr._targetHost)
238 	return true;
239     return false;
240 }
241 
242 /////////////////////
243 // DbSessionRecord //
244 /////////////////////
245 
DbSessionRecord()246 CegoDatabaseManager::DbSessionRecord::DbSessionRecord()
247 {
248 }
249 
DbSessionRecord(CegoDistDbHandler * pHandler)250 CegoDatabaseManager::DbSessionRecord::DbSessionRecord(CegoDistDbHandler* pHandler)
251 {
252     _pHandler = pHandler;
253 }
254 
DbSessionRecord(const Chain & hostName,const Chain & tableSet,const Chain & userName,CegoDistDbHandler * pHandler)255 CegoDatabaseManager::DbSessionRecord::DbSessionRecord(const Chain& hostName, const Chain& tableSet,
256 				     const Chain& userName, CegoDistDbHandler* pHandler)
257 {
258     _hostName = hostName;
259     _tableSet = tableSet;
260     _userName = userName;
261     _pHandler = pHandler;
262     _lastUsed = Datetime().asLong();
263 }
264 
~DbSessionRecord()265 CegoDatabaseManager::DbSessionRecord::~DbSessionRecord()
266 {
267 }
268 
getHostName() const269 const Chain& CegoDatabaseManager::DbSessionRecord::getHostName() const
270 {
271     return _hostName;
272 }
273 
getTableSet() const274 const Chain& CegoDatabaseManager::DbSessionRecord::getTableSet() const
275 {
276     return _tableSet;
277 }
278 
getUserName() const279 const Chain& CegoDatabaseManager::DbSessionRecord::getUserName() const
280 {
281     return _userName;
282 }
283 
getDbHandler() const284 CegoDistDbHandler* CegoDatabaseManager::DbSessionRecord::getDbHandler() const
285 {
286     return _pHandler;
287 }
288 
isUsed() const289 bool CegoDatabaseManager::DbSessionRecord::isUsed() const
290 {
291     return _isUsed;
292 }
setUsed(bool isUsed)293 void CegoDatabaseManager::DbSessionRecord::setUsed(bool isUsed)
294 {
295     _isUsed = isUsed;
296 }
297 
getTSLastUsed()298 unsigned long long CegoDatabaseManager::DbSessionRecord::getTSLastUsed()
299 {
300     return _lastUsed;
301 }
302 
setTSLastUsed(unsigned long long ts)303 void CegoDatabaseManager::DbSessionRecord::setTSLastUsed(unsigned long long ts)
304 {
305     _lastUsed = ts;
306 }
307 
// Assignment: copies every field of the given session record. The used flag
// is copied as well; it was previously omitted, so an assigned record lost
// the information whether the session is currently handed out.
CegoDatabaseManager::DbSessionRecord& CegoDatabaseManager::DbSessionRecord::operator = ( const CegoDatabaseManager::DbSessionRecord& sr)
{
    _hostName = sr._hostName;
    _tableSet = sr._tableSet;
    _userName = sr._userName;
    _pHandler = sr._pHandler;
    _isUsed = sr._isUsed;
    _lastUsed = sr._lastUsed;
    return (*this);
}
318 
operator ==(const CegoDatabaseManager::DbSessionRecord & sr)319 bool CegoDatabaseManager::DbSessionRecord::operator == ( const CegoDatabaseManager::DbSessionRecord& sr)
320 {
321     if ( _pHandler == sr._pHandler )
322 	return true;
323     return false;
324 }
325 
326 /////////////////////////
327 // CegoDatabaseManager //
328 /////////////////////////
329 
CegoDatabaseManager(const Chain & xmlDef,const Chain & lckFileName,const Chain & logFile,const Chain & progName,CegoDbHandler::ProtocolType protType)330 CegoDatabaseManager::CegoDatabaseManager(const Chain& xmlDef, const Chain& lckFileName, const Chain& logFile, const Chain& progName, CegoDbHandler::ProtocolType protType) :  CegoBufferPool(xmlDef, logFile, progName)
331 {
332     _protType = protType;
333 
334     _lckFileName = lckFileName;
335 
336     File lckFile(_lckFileName);
337     if ( lckFile.exists() )
338     {
339 	Chain msg = Chain("Running database instance detected at lock file ") + lckFileName;
340 	throw Exception(EXLOC, msg);
341     }
342     else
343     {
344 	lckFile.open(File::WRITE);
345 	lckFile.writeChain("Locked");
346 	_nextBeat=0;
347 	lckFile.close();
348     }
349 
350     dbmLock.init(LCKMNG_LOCKWAITDELAY, __lockStatOn);
351     for ( int i=0; i<TABMNG_MAXTABSET; i++)
352     {
353 	_recoveryMode[i] = OFF;
354 	_pQueryCache[i] = 0;
355 	_pTableCache[i]= 0;
356     }
357     _nextCopyId=1;
358 
359     _logConfigured=false;
360 
361     _modId = getModId("CegoDatabaseManager");
362 }
363 
~CegoDatabaseManager()364 CegoDatabaseManager::~CegoDatabaseManager()
365 {
366     File lckFile(_lckFileName);
367     lckFile.remove();
368 
369     for ( int i=0; i<TABMNG_MAXTABSET; i++)
370     {
371 	if ( _pQueryCache[i] )
372 	    delete _pQueryCache[i];
373 	if ( _pTableCache[i] )
374 	    delete _pTableCache[i];
375     }
376 }
377 
allocateQueryCache(const Chain & tableSet)378 void CegoDatabaseManager::allocateQueryCache(const Chain& tableSet)
379 {
380     if ( getQueryCacheMode(tableSet) )
381     {
382 	int maxEntry = getMaxQueryCacheEntry(tableSet);
383 	int maxSize = getMaxQueryCacheSize(tableSet);
384 	int hashRange = getQueryCacheHashRange(tableSet);
385 
386 	if ( maxEntry > 0 && maxSize > 0 && hashRange > 0)
387 	{
388 	    int tabSetId = getTabSetId(tableSet);
389 
390 	    if ( _pQueryCache[tabSetId] )
391 		delete _pQueryCache[tabSetId];
392 
393 	    _pQueryCache[tabSetId] = new CegoQueryCache(maxEntry, maxSize, hashRange);
394 	}
395 	else
396 	{
397 	    log(_modId, Logger::NOTICE, Chain("Query Cache Size/Entry not appropriate, skipping cache allocation"));
398 	}
399     }
400 }
401 
allocateTableCache(const Chain & tableSet)402 void CegoDatabaseManager::allocateTableCache(const Chain& tableSet)
403 {
404     if ( getTableCacheMode(tableSet) )
405     {
406 
407 	int numEntry = getMaxTableCacheEntry(tableSet);
408 	int cacheSize = getMaxTableCacheSize(tableSet);
409 
410 	if ( numEntry > 0 && cacheSize > 0 )
411 	{
412 	    int tabSetId = getTabSetId(tableSet);
413 
414 	    if ( _pTableCache[tabSetId] )
415 		delete _pTableCache[tabSetId];
416 	    _pTableCache[tabSetId] = new CegoTableCache(numEntry,cacheSize, this);
417 	}
418 	else
419 	{
420 	    log(_modId, Logger::NOTICE, Chain("Table Cache Size/Entry not appropriate, skipping cache allocation"));
421 	}
422     }
423 }
424 
releaseQueryCache(const Chain & tableSet)425 void CegoDatabaseManager::releaseQueryCache(const Chain& tableSet)
426 {
427     int tabSetId = getTabSetId(tableSet);
428 
429     if ( _pQueryCache[tabSetId] )
430     {
431 	delete _pQueryCache[tabSetId];
432 	_pQueryCache[tabSetId]=0;
433     }
434 }
435 
releaseTableCache(const Chain & tableSet)436 void CegoDatabaseManager::releaseTableCache(const Chain& tableSet)
437 {
438     int tabSetId = getTabSetId(tableSet);
439 
440     if ( _pTableCache[tabSetId] )
441     {
442 	delete _pTableCache[tabSetId];
443 	_pTableCache[tabSetId]=0;
444     }
445 }
446 
cleanCache(int tabSetId,CegoObject::ObjectType objType,const Chain & objName)447 void CegoDatabaseManager::cleanCache(int tabSetId, CegoObject::ObjectType objType, const Chain& objName)
448 {
449     if ( _pQueryCache[tabSetId] )
450     {
451 	_pQueryCache[tabSetId]->invalidate(CegoObject(objType, objName, tabSetId));
452     }
453 
454     if ( _pTableCache[tabSetId] && objType == CegoObject::TABLE )
455     {
456 	_pTableCache[tabSetId]->invalidate(tabSetId, objName);
457     }
458 }
459 
getQueryCache(const Chain & tableSet) const460 CegoQueryCache* CegoDatabaseManager::getQueryCache(const Chain& tableSet) const
461 {
462     int tabSetId = getTabSetId(tableSet);
463     return _pQueryCache[tabSetId];
464 }
465 
getQueryCache(int tabSetId) const466 CegoQueryCache* CegoDatabaseManager::getQueryCache(int tabSetId) const
467 {
468     return _pQueryCache[tabSetId];
469 }
470 
getTableCache(const Chain & tableSet) const471 CegoTableCache* CegoDatabaseManager::getTableCache(const Chain& tableSet) const
472 {
473     int tabSetId = getTabSetId(tableSet);
474     return _pTableCache[tabSetId];
475 }
476 
getTableCache(int tabSetId) const477 CegoTableCache* CegoDatabaseManager::getTableCache(int tabSetId) const
478 {
479     return _pTableCache[tabSetId];
480 }
481 
beat()482 void CegoDatabaseManager::beat()
483 {
484     File lckFile(_lckFileName);
485     lckFile.open(File::WRITE);
486     lckFile.writeChain(Chain("Beat=") + Chain(_nextBeat) + Chain("\n"));
487     _nextBeat++;
488     lckFile.close();
489 }
490 
491 
checkTableSetRunState(int tabSetId)492 void CegoDatabaseManager::checkTableSetRunState(int tabSetId)
493 {
494     Chain runState = getTableSetRunState(tabSetId);
495 
496     if ( runState != Chain(XML_ONLINE_VALUE)
497 	 && runState != Chain(XML_BACKUP_VALUE)
498 	 && runState != Chain(XML_RECOVERY_VALUE)
499 	 && runState != Chain(XML_CHECKPOINT_VALUE) )
500     {
501 	Chain msg = Chain("Tableset ") + getTabSetName(tabSetId) + Chain(" not online ( run state is ") + runState + Chain(" )");
502 	throw Exception(EXLOC, msg);
503     }
504 }
505 
506 
PR()507 void CegoDatabaseManager::PR()
508 {
509     dbmLock.readLock(DBM_LOCKTIMEOUT);
510 }
511 
PW()512 void CegoDatabaseManager::PW()
513 {
514     dbmLock.writeLock(DBM_LOCKTIMEOUT);
515 }
516 
V()517 void CegoDatabaseManager::V()
518 {
519     dbmLock.unlock();
520 }
521 
isLoggerConfigured()522 bool CegoDatabaseManager::isLoggerConfigured()
523 {
524     return _logConfigured;
525 }
526 
getRecoveryMode(int tabSetId)527 CegoDatabaseManager::RecoveryMode CegoDatabaseManager::getRecoveryMode(int tabSetId)
528 {
529     return _recoveryMode[tabSetId];
530 }
531 
setRecoveryMode(int tabSetId,CegoDatabaseManager::RecoveryMode m)532 void CegoDatabaseManager::setRecoveryMode(int tabSetId, CegoDatabaseManager::RecoveryMode m)
533 {
534     _recoveryMode[tabSetId] = m;
535 }
536 
setAllRecoveryOff()537 void CegoDatabaseManager::setAllRecoveryOff()
538 {
539     for ( int i=0; i<TABMNG_MAXTABSET; i++)
540     {
541 	_recoveryMode[i] = OFF;
542     }
543 }
544 
startRecovery(const Chain & tableSet)545 void CegoDatabaseManager::startRecovery(const Chain& tableSet)
546 {
547     PW();
548     _recoveryList.Insert(tableSet);
549     V();
550 }
551 
startCopy(const Chain & tableSet,const Chain & targetHost,const Chain & mediatorHost,const Chain & user,const Chain & passwd,const Chain & msg)552 void CegoDatabaseManager::startCopy(const Chain& tableSet, const Chain& targetHost, const Chain& mediatorHost, const Chain& user, const Chain& passwd, const Chain& msg)
553 {
554     PW();
555     _copyList.Insert( CopyRecord( tableSet, targetHost, mediatorHost, user, passwd, msg));
556     V();
557 }
558 
nextRecovery(Chain & tableSet)559 bool CegoDatabaseManager::nextRecovery(Chain& tableSet)
560 {
561     PW();
562     Chain *pS = _recoveryList.First();
563     if ( pS )
564     {
565 	tableSet = *pS;
566 	_recoveryList.Remove(tableSet);
567 	V();
568 	return true;
569     }
570     V();
571     return false;
572 }
573 
nextCopy(int & id,Chain & tableSet,Chain & targetHost,Chain & mediatorHost,Chain & user,Chain & passwd)574 bool CegoDatabaseManager::nextCopy(int& id, Chain& tableSet, Chain& targetHost, Chain& mediatorHost, Chain& user, Chain& passwd)
575 {
576     PW();
577     CopyRecord *pCR = _copyList.First();
578     while ( pCR )
579     {
580 	if ( pCR->getId() == 0 )
581 	{
582 	    id = _nextCopyId++;
583 	    pCR->setId(id);
584 
585 	    tableSet = pCR->getTableSet();
586 	    targetHost = pCR->getTargetHost();
587 	    mediatorHost = pCR->getMediatorHost();
588 	    user = pCR->getUser();
589 	    passwd = pCR->getPasswd();
590 
591 	    // _copyList.Remove(*pCR);
592 	    V();
593 	    return true;
594 	}
595 	pCR = _copyList.Next();
596     }
597     V();
598     return false;
599 }
600 
setCopyStatus(int id,const Chain & msg)601 void CegoDatabaseManager::setCopyStatus(int id, const Chain& msg)
602 {
603     PW();
604     CopyRecord *pCR = _copyList.First();
605     while ( pCR )
606     {
607 	if ( pCR->getId() == id )
608 	{
609 	    pCR->setMsg(msg);
610 
611 	    V();
612 	    return;
613 	}
614 	pCR = _copyList.Next();
615     }
616     V();
617     return;
618 }
619 
getCopyInfo()620 Element* CegoDatabaseManager::getCopyInfo()
621 {
622     Element* pCopyInfo = new Element(XML_COPYINFO_ELEMENT);
623 
624     PR();
625     CopyRecord *pCR = _copyList.First();
626     while ( pCR )
627     {
628 
629 	Element *pN = new Element(XML_COPY_ELEMENT);
630 
631 	pN->setAttribute(XML_CID_ATTR, pCR->getId());
632 	pN->setAttribute(XML_HOSTNAME_ATTR, pCR->getTargetHost());
633 	pN->setAttribute(XML_TABLESET_ATTR, pCR->getTableSet());
634 	pN->setAttribute(XML_STATUS_ATTR, pCR->getMsg());
635 
636 	pCopyInfo->addContent(pN);
637 
638 	pCR = _copyList.Next();
639     }
640 
641     V();
642 
643     return pCopyInfo;
644 }
645 
useObject(int tabSetId,const Chain & objName,CegoObject::ObjectType type,ObjectUseMode mode,unsigned long long tid)646 void CegoDatabaseManager::useObject(int tabSetId, const Chain& objName, CegoObject::ObjectType type, ObjectUseMode mode, unsigned long long tid)
647 {
648     // In SHARED mode, an object can be use in parallel by several threads. Normally, this is done, if the object is accessed read only.
649 
650     // we fist check, if corresponding tableset is really not offline
651     checkTableSetRunState(tabSetId);
652 
653 
654     /*
655     Chain runState = getTableSetRunState(tabSetId);
656     if ( runState != Chain(XML_ONLINE_VALUE) && runState != Chain(XML_BACKUP_VALUE) && runState != Chain(XML_CHECKPOINT_VALUE) )
657     {
658 	Chain msg = Chain("Tableset ") + getTabSetName(tabSetId) + Chain(" not online ( run state is ") + runState + Chain(" )");
659 	throw Exception(EXLOC, msg);
660     }
661     */
662 
663     if ( mode == SHARED )
664     {
665 
666         PW();
667         ObjectRecord *pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
668         if ( pOR == 0 )
669         {
670             V();
671 	    Chain msg = Chain("Cannot access object <") + objName + Chain(">");
672             throw Exception(EXLOC, msg);
673         }
674 
675 	if ( pOR->numUsed() > MAX_OBJECT_USAGE )
676 	{
677             V();
678 	    Chain msg = Chain("Usage exceeded for <") + objName + Chain(">");
679             throw Exception(EXLOC, msg);
680 	}
681 
682 	if ( pOR->getTid() != 0 && pOR->getTid() == tid )
683 	{
684 	    pOR->incUsed();
685 	    V();
686 	}
687 	else
688 	{
689 	    int numTries=0;
690 
691 	    while ( pOR && pOR->getMode() == EXCLUSIVE_WRITE && numTries < DBM_MAXLOCKTRIES )
692 	    {
693 		V();
694 
695 		if ( numTries > 0 )
696 		{
697 		    if ( numTries > 1 )
698 		    {
699 			log(_modId, Logger::NOTICE, Chain("Repeated shared lock delay on ") + objName + Chain(" ( ") + Chain(numTries) + Chain(" tries )"));
700 		    }
701 
702 		    Sleeper ns;
703 		    int sec = DBM_LOCKDELAY / 1000;
704 		    if ( sec > 0 )
705 			ns.secSleep(sec);
706 		    int msec = DBM_LOCKDELAY % 1000;
707 		    if ( msec > 0 )
708 			ns.milliSleep(msec);
709 		}
710 
711 		PW();
712 
713 		pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
714 
715 		if ( pOR == 0 )
716 		{
717 		    V();
718 		    Chain msg = Chain("Cannot access object ") + objName;
719 		    throw Exception(EXLOC, msg);
720 		}
721 		numTries++;
722 	    }
723 
724 	    if ( numTries == DBM_MAXLOCKTRIES )
725 	    {
726 		V();
727 		Chain msg = Chain("Access timeout on object ") + objName;
728 		throw Exception(EXLOC, msg);
729 	    }
730 
731 	    pOR->incUsed();
732 	    V();
733 	}
734     }
735 
736     // In EXCLUSIVE_WRITE mode, an object is accessed exclusively by one thread only
737 
738     else if ( mode == EXCLUSIVE_WRITE )
739     {
740         PW();
741         ObjectRecord *pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
742         if ( pOR == 0 )
743         {
744             V();
745             Chain msg = Chain("Cannot access object ") + objName;
746             throw Exception(EXLOC, msg);
747         }
748 
749 	if ( pOR->getTid() != 0 && pOR->getTid() == tid )
750 	{
751 	    pOR->incUsed();
752 	    V();
753 	}
754 	else
755 	{
756 	    if ( pOR->getMode() == SHARED && pOR->numUsed() == 0 )
757 	    {
758 		pOR->setMode(mode);
759 		pOR->setTid(tid);
760 	    }
761 	    else
762 	    {
763 		int numTries=0;
764 		while ( pOR && pOR->numUsed() != 0 && numTries < DBM_MAXLOCKTRIES )
765 		{
766 
767 		    V();
768 
769 		    if ( numTries > 0 )
770 		    {
771 			if ( numTries > 1 )
772 			{
773 			    log(_modId, Logger::NOTICE, Chain("Repeated exclusive write lock delay on ") + objName + Chain(" ( ") + Chain(numTries) + Chain(" tries )"));
774 			}
775 
776 			Sleeper ns;
777 
778 			int sec = DBM_LOCKDELAY / 1000;
779 			if ( sec > 0 )
780 			    ns.secSleep(sec);
781 			int msec = DBM_LOCKDELAY % 1000;
782 			if ( msec > 0 )
783 			    ns.milliSleep(msec);
784 		    }
785 
786 		    PW();
787 
788 		    pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
789 		    if ( pOR == 0 )
790 		    {
791 			V();
792 			Chain msg = Chain("Cannot access object ") + objName;
793 			throw Exception(EXLOC, msg);
794 		    }
795 
796 		    numTries++;
797 
798 		}
799 
800 		if ( numTries == DBM_MAXLOCKTRIES )
801 		{
802 		    V();
803 		    Chain msg = Chain("Access timeout on object ") + objName;
804 		    throw Exception(EXLOC, msg);
805 		}
806 
807 		pOR->setMode(mode);
808 		pOR->setTid(tid);
809 
810 	    }
811 
812 	    pOR->incUsed();
813 	    V();
814 	}
815     }
816     return;
817 }
818 
unuseObject(int tabSetId,const Chain & objName,CegoObject::ObjectType type,unsigned long long tid)819 void CegoDatabaseManager::unuseObject(int tabSetId, const Chain& objName, CegoObject::ObjectType type, unsigned long long tid )
820 {
821 
822     // we fist check, if corresponding tableset is really not offline
823     checkTableSetRunState(tabSetId);
824 
825     /*
826     Chain runState = getTableSetRunState(tabSetId);
827     if ( runState != Chain(XML_ONLINE_VALUE) && runState != Chain(XML_BACKUP_VALUE) && runState != Chain(XML_CHECKPOINT_VALUE) )
828     {
829 	Chain msg = Chain("Tableset ") + getTabSetName(tabSetId) + Chain(" not online ( run state is ") + runState + Chain(" )");
830 	throw Exception(EXLOC, msg);
831     }
832     */
833 
834 
835     PW();
836     ObjectRecord *pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
837 
838     if ( pOR == 0 )
839     {
840         V();
841 	Chain msg = Chain("Cannot access object ") + objName;
842 	throw Exception(EXLOC, msg);
843 	return;
844     }
845 
846     // we just unuse object, if tid not specified or lockmode of object is SHARED or if transaction owns object lockmode of obejct id EXCLUSIVE_WRITE
847     if ( ( tid == pOR->getTid() && pOR->getMode() == EXCLUSIVE_WRITE ) || tid == 0 || pOR->getMode() == SHARED )
848     {
849 	pOR->decUsed();
850 
851 	if ( pOR->numUsed() == 0 )
852 	{
853 	    pOR->setMode(SHARED);
854 	    pOR->setTid(0);
855 	}
856     }
857     else
858     {
859         V();
860 	Chain msg = Chain("Table ") + objName + Chain(" not owned by thread");
861 	throw Exception(EXLOC, msg);
862 	return;
863     }
864 
865     V();
866 }
867 
addObject(int tabSetId,const Chain & objName,CegoObject::ObjectType type)868 void CegoDatabaseManager::addObject(int tabSetId, const Chain& objName, CegoObject::ObjectType type)
869 {
870     PW();
871     _objList.Insert(ObjectRecord(tabSetId, objName, type));
872     V();
873 }
874 
removeObject(int tabSetId,const Chain & objName,CegoObject::ObjectType type)875 void CegoDatabaseManager::removeObject(int tabSetId, const Chain& objName, CegoObject::ObjectType type)
876 {
877     PW();
878     _objList.Remove(ObjectRecord(tabSetId, objName, type));
879     V();
880 }
881 
removeAllObjects(int tabSetId)882 void CegoDatabaseManager::removeAllObjects(int tabSetId)
883 {
884     PW();
885     ObjectRecord *pOR = _objList.First();
886     while ( pOR )
887     {
888 	if ( pOR->getTabSetId() == tabSetId )
889 	{
890 	    _objList.Remove(*pOR);
891 	    pOR = _objList.First();
892 	}
893 	else
894 	{
895 	    pOR = _objList.Next();
896 	}
897     }
898     V();
899 }
900 
objectExists(int tabSetId,const Chain & objName,CegoObject::ObjectType type)901 bool CegoDatabaseManager::objectExists(int tabSetId, const Chain& objName, CegoObject::ObjectType type)
902 {
903 
904     // we fist check, if corresponding tableset is really not offline
905     checkTableSetRunState(tabSetId);
906 
907     /*
908     Chain runState = getTableSetRunState(tabSetId);
909     if ( runState != Chain(XML_ONLINE_VALUE) && runState != Chain(XML_BACKUP_VALUE) && runState != Chain(XML_CHECKPOINT_VALUE) )
910     {
911 	Chain msg = Chain("Tableset ") + getTabSetName(tabSetId) + Chain(" not online ( run state is ") + runState + Chain(" )");
912 	throw Exception(EXLOC, msg);
913     }
914     */
915 
916     PR();
917     ObjectRecord *pTR = _objList.Find(ObjectRecord(tabSetId, objName, type));
918     V();
919     if ( pTR )
920     {
921 	return true;
922     }
923     else
924     {
925 	return false;
926     }
927 }
928 
printObjectList()929 void CegoDatabaseManager::printObjectList()
930 {
931     PW();
932     ObjectRecord *pOR = _objList.First();
933     while ( pOR )
934     {
935 	cout << "ObjListEntry : " << pOR->getName() << " Type = " << pOR->getType() << endl;
936 	pOR = _objList.Next();
937     }
938     V();
939 }
940 
setThreadInfo(int numDbThread,int numAdmThread,int numLogThread)941 void CegoDatabaseManager::setThreadInfo(int numDbThread, int numAdmThread, int numLogThread)
942 {
943     _numDbThread = numDbThread;
944     _numAdmThread = numAdmThread;
945     _numLogThread = numLogThread;
946     _activeDbThread = 0;
947     _activeAdmThread = 0;
948     _activeLogThread = 0;
949 }
950 
getThreadInfo(int & numDbThread,int & numAdmThread,int & numLogThread,int & activeDbThread,int & activeAdmThread,int & activeLogThread)951 void CegoDatabaseManager::getThreadInfo(int& numDbThread, int& numAdmThread, int& numLogThread,
952 		   int& activeDbThread, int& activeAdmThread, int& activeLogThread)
953 {
954     numDbThread = _numDbThread;
955     numAdmThread = _numAdmThread;
956     numLogThread = _numLogThread;
957     activeDbThread = _activeDbThread;
958     activeAdmThread = _activeAdmThread;
959     activeLogThread = _activeLogThread;
960 }
961 
increaseActiveAdmThread()962 void CegoDatabaseManager::increaseActiveAdmThread()
963 {
964     _activeAdmThread++;
965 }
966 
decreaseActiveAdmThread()967 void CegoDatabaseManager::decreaseActiveAdmThread()
968 {
969     _activeAdmThread--;
970 }
971 
increaseActiveDbThread()972 void CegoDatabaseManager::increaseActiveDbThread()
973 {
974     _activeDbThread++;
975 }
976 
decreaseActiveDbThread()977 void CegoDatabaseManager::decreaseActiveDbThread()
978 {
979     _activeDbThread--;
980 }
981 
increaseActiveLogThread()982 void CegoDatabaseManager::increaseActiveLogThread()
983 {
984     _activeLogThread++;
985 }
986 
decreaseActiveLogThread()987 void CegoDatabaseManager::decreaseActiveLogThread()
988 {
989     _activeLogThread--;
990 }
991 
getDBMLockStat(Chain & lockName,int & lockCount,unsigned long long & numRdLock,unsigned long long & numWrLock,unsigned long long & sumRdDelay,unsigned long long & sumWrDelay)992 void CegoDatabaseManager::getDBMLockStat(Chain& lockName, int& lockCount, unsigned long long &numRdLock, unsigned long long &numWrLock, unsigned long long &sumRdDelay, unsigned long long &sumWrDelay)
993 {
994     lockName = dbmLock.getId();
995     lockCount= dbmLock.numLockTry();
996 
997     numRdLock = dbmLock.numReadLock();
998     numWrLock = dbmLock.numWriteLock();
999     sumRdDelay = 0;
1000     sumWrDelay = 0;
1001 
1002     if ( dbmLock.numReadLock() > 0 )
1003 	sumRdDelay = dbmLock.sumReadDelay() / LCKMNG_DELRES;
1004     if ( dbmLock.numWriteLock() > 0 )
1005 	sumWrDelay = dbmLock.sumWriteDelay() / LCKMNG_DELRES;
1006 }
1007 
allocateSession(const Chain & hostName,const Chain & tableSet,const Chain & userName,const Chain & password)1008 CegoDistDbHandler* CegoDatabaseManager::allocateSession(const Chain& hostName, const Chain& tableSet,
1009 							const Chain& userName, const Chain& password)
1010 {
1011     PW();
1012 
1013     CegoDistDbHandler* pSession = 0;
1014 
1015     try
1016     {
1017 	DbSessionRecord *pSR = _dbSessionList.First();
1018 	while ( pSR )
1019 	{
1020 	    if ( pSR->getHostName() == hostName
1021 		 && pSR->getTableSet() == tableSet
1022 		 && pSR->getUserName() == userName
1023 		 && pSR->isUsed() == false )
1024 	    {
1025 		pSR->setUsed(true);
1026 		Datetime dt;
1027 		pSR->setTSLastUsed( dt.asLong() );
1028 
1029 		V();
1030 
1031 		return pSR->getDbHandler();
1032 	    }
1033 	    pSR = _dbSessionList.Next();
1034 	}
1035 
1036 	// create new session
1037 
1038 	pSession = createSession(hostName, tableSet, userName, password);
1039 	_dbSessionList.Insert( DbSessionRecord(hostName, tableSet, userName, pSession));
1040 
1041     }
1042     catch ( Exception e )
1043     {
1044 	V();
1045 	throw Exception(EXLOC, "Cannot allocate db session", e);
1046     }
1047 
1048     V();
1049     return pSession;
1050 }
1051 
releaseSession(CegoDistDbHandler * pHandler)1052 void CegoDatabaseManager::releaseSession(CegoDistDbHandler* pHandler)
1053 {
1054     PW();
1055 
1056     DbSessionRecord *pSR = _dbSessionList.First();
1057     while ( pSR )
1058     {
1059 	if ( pSR->getDbHandler() == pHandler  )
1060 	{
1061 	    pSR->setUsed(false);
1062 
1063 	    V();
1064 
1065 	    return;
1066 	}
1067 	pSR = _dbSessionList.Next();
1068     }
1069 
1070     V();
1071 
1072     Chain msg = Chain("Cannot release session for unknown db handle");
1073     throw Exception(EXLOC, msg);
1074 }
1075 
cleanSession(int lifetime)1076 void CegoDatabaseManager::cleanSession(int lifetime)
1077 {
1078     PW();
1079 
1080     Datetime dt;
1081 
1082     DbSessionRecord *pSR = _dbSessionList.First();
1083     while ( pSR )
1084     {
1085 	if ( pSR->getTSLastUsed() <  ( dt.asLong() - lifetime )  )
1086 	{
1087 	    if ( pSR->isUsed() == false )
1088 	    {
1089 		closeSession(pSR->getDbHandler());
1090 		_dbSessionList.Remove( DbSessionRecord(pSR->getDbHandler()));
1091 		pSR = _dbSessionList.First();
1092 	    }
1093 	}
1094 	pSR = _dbSessionList.Next();
1095     }
1096 
1097     V();
1098 }
1099 
getSessionInfo(int lifetime)1100 Element* CegoDatabaseManager::getSessionInfo(int lifetime)
1101 {
1102     Element* pSessionInfo = new Element(XML_DBSESSIONINFO_ELEMENT);
1103 
1104     DbSessionRecord *pSR = _dbSessionList.First();
1105     while ( pSR )
1106     {
1107 	Element *pN = new Element(XML_DBSESSION_ELEMENT);
1108 	pN->setAttribute(XML_HOSTNAME_ATTR, pSR->getHostName());
1109 	pN->setAttribute(XML_TABLESET_ATTR, pSR->getTableSet());
1110 	pN->setAttribute(XML_USER_ATTR, pSR->getUserName());
1111 	if ( pSR->isUsed() )
1112 	    pN->setAttribute(XML_ISUSED_ATTR, XML_TRUE_VALUE);
1113 	else
1114 	    pN->setAttribute(XML_ISUSED_ATTR, XML_FALSE_VALUE);
1115 
1116 	Datetime dt;
1117 	pN->setAttribute(XML_TTL_ATTR, Chain(pSR->getTSLastUsed() + (unsigned long long)lifetime - dt.asLong()));
1118 
1119 	pSessionInfo->addContent(pN);
1120 
1121 	pSR = _dbSessionList.Next();
1122     }
1123     return pSessionInfo;
1124 }
1125 
createSession(const Chain & hostName,const Chain & tableSet,const Chain & userName,const Chain & password)1126 CegoDistDbHandler* CegoDatabaseManager::createSession(const Chain& hostName, const Chain& tableSet,
1127 						  const Chain& userName, const Chain& password)
1128 {
1129     int portNo;
1130     getDataPort(portNo);
1131 
1132     Net n( NETMNG_MSG_BUFLEN, NETMNG_SIZEBUFLEN, getMaxSendLen() );
1133     NetHandler* pN = 0;
1134     CegoDistDbHandler *pSH = 0;
1135 
1136     try
1137     {
1138 
1139 #ifdef DEBUG
1140 	log(_modId, Logger::DEBUG, Chain("Connecting to ") + Chain(hostName) + Chain(" on port ") + Chain(portNo));
1141 #endif
1142 
1143 	pN = n.connect(hostName, portNo);
1144 	pSH = new CegoDistDbHandler(pN, _protType, this);
1145 
1146 #ifdef DEBUG
1147 	log(_modId, Logger::DEBUG, Chain("Using user ") + userName + Chain("/") + password);
1148 #endif
1149 	pSH->requestSession(tableSet, userName, password, false);
1150 
1151     }
1152     catch ( Exception e )
1153     {
1154 
1155 	if ( pSH )
1156 	{
1157 	    delete pSH;
1158 	}
1159 
1160 	if ( pN )
1161 	{
1162 	    delete pN;
1163 	}
1164 
1165 	Chain msg = Chain("Cannot create session to ") + hostName;
1166 	throw Exception(EXLOC, msg);
1167     }
1168 
1169     return pSH;
1170 }
1171 
closeSession(CegoDistDbHandler * pSH)1172 void CegoDatabaseManager::closeSession(CegoDistDbHandler* pSH)
1173 {
1174 #ifdef DEBUG
1175     log(_modId, Logger::DEBUG, Chain("Closing session ..."));
1176 #endif
1177 
1178     pSH->closeSession();
1179 
1180     NetHandler *pN = pSH->getNetHandler();
1181 
1182     delete pSH;
1183     delete pN;
1184 }
1185 
configureLogger(Logger::LogLevel level)1186 void CegoDatabaseManager::configureLogger(Logger::LogLevel level)
1187 {
1188     for ( int i=1; i< getMapSize() ; i++)
1189     {
1190 	logModule(i, getModName(i), level);
1191     }
1192     _logConfigured=true;
1193 }
1194 
configureLogger()1195 void CegoDatabaseManager::configureLogger()
1196 {
1197     ListT<Chain> modList;
1198     _logConfigured =  getModuleList(modList);
1199 
1200     Chain *pMod = modList.First();
1201     while ( pMod )
1202     {
1203 	if ( (Chain)pMod->toUpper() == Chain("ALL"))
1204 	{
1205 	    Logger::LogLevel level = getLogLevel(*pMod);
1206 	    for ( int i=1; i< getMapSize() ; i++)
1207 	    {
1208 		logModule(i, getModName(i), level);
1209 	    }
1210 	}
1211 	else
1212 	{
1213 	    unsigned long modId = getModId(*pMod);
1214 	    logModule(modId, *pMod, getLogLevel(*pMod));
1215 	}
1216 	pMod = modList.Next();
1217     }
1218 }
1219 
verifyJDBC(const Chain & user)1220 bool CegoDatabaseManager::verifyJDBC(const Chain& user)
1221 {
1222     SetT<Chain> roleSet;
1223     getRoleSet(user, roleSet);
1224     return roleSet.Find(Chain(ROLE_JDBC));
1225 }
1226 
verifyAccess(const int tabSetId,const Chain & objName,CegoObject::ObjectType type,CegoXMLSpace::AccessMode mode,const Chain & user)1227 bool CegoDatabaseManager::verifyAccess(const int tabSetId, const Chain& objName, CegoObject::ObjectType type, CegoXMLSpace::AccessMode mode, const Chain& user)
1228 {
1229     SetT<Chain> roleSet;
1230 
1231     getRoleSet(user, roleSet);
1232     Chain tableSet = getTabSetName(tabSetId);
1233 
1234     Chain *pRole = roleSet.First();
1235     while ( pRole )
1236     {
1237 
1238 	Chain objPattern = objName;
1239 
1240 	if ( matchRole( *pRole, tableSet, objPattern, mode ) )
1241 	    return true;
1242 	pRole = roleSet.Next();
1243     }
1244     return false;
1245 }
1246 
initLogFiles(const Chain & tableSet,bool overwrite)1247 void CegoDatabaseManager::initLogFiles(const Chain& tableSet, bool overwrite)
1248 {
1249     ListT<Chain> lfList;
1250     ListT<int> sizeList;
1251     ListT<Chain> statusList;
1252 
1253     int tabSetId = getTabSetId(tableSet);
1254     getLogFileInfo(tableSet, lfList, sizeList, statusList);
1255 
1256     Chain *pLog = lfList.First();
1257     int *pSize = sizeList.First();
1258 
1259     bool isFirst = true;
1260     while ( pLog )
1261     {
1262 
1263 	if ( isFirst )
1264 	    setLogFileStatus(tableSet, *pLog, XML_ACTIVE_VALUE);
1265 	else
1266 	    setLogFileStatus(tableSet, *pLog,  XML_FREE_VALUE);
1267 
1268 	isFirst=false;
1269 
1270 	log(_modId, Logger::NOTICE, Chain("Initializing logfile ") + *pLog + Chain(" ..."));
1271 
1272 	if ( overwrite == false )
1273 	{
1274 	    File checkLog(*pLog);
1275 	    if ( checkLog.exists() )
1276 	    {
1277 		Chain msg = Chain("Cannot initialize logfile <") + *pLog + Chain(">, file already exists");
1278 		throw Exception(EXLOC, msg);
1279 	    }
1280 	}
1281 
1282 	setLogFile(tabSetId, *pLog, false);
1283 	initLog(tabSetId, *pSize);
1284 
1285 	pLog = lfList.Next();
1286 	pSize = sizeList.Next();
1287 
1288     }
1289 }
1290 
releaseLogFiles(const Chain & tableSet,bool waitForArch)1291 void CegoDatabaseManager::releaseLogFiles(const Chain& tableSet, bool waitForArch)
1292 {
1293     ListT<Chain> lfList;
1294     ListT<int> sizeList;
1295     ListT<Chain> statusList;
1296 
1297     int tabSetId = getTabSetId(tableSet);
1298     getLogFileInfo(tableSet, lfList, sizeList, statusList);
1299 
1300     Chain *pLog = lfList.First();
1301     Chain *pStatus = statusList.First();
1302 
1303     while ( pLog && pStatus )
1304     {
1305         if ( *pStatus == Chain(XML_ACTIVE_VALUE) && File::exists(*pLog) )
1306         {
1307             setLogFile(tabSetId, *pLog, true);
1308             unsigned long long minlsn = getMinLSN(tabSetId);
1309             if ( minlsn > 0 )
1310             {
1311                 log(_modId, Logger::NOTICE, Chain("Releasing logfile ") + *pLog + Chain(" LSN=") + Chain(minlsn));
1312                 setLogFileStatus(tableSet, *pLog, XML_OCCUPIED_VALUE);
1313             }
1314         }
1315         pStatus = statusList.Next();
1316         pLog = lfList.Next();
1317     }
1318 
1319     if ( waitForArch )
1320     {
1321 	bool notArchived = true;
1322 
1323 	while ( notArchived )
1324 	{
1325 
1326 	    log(_modId, Logger::NOTICE, Chain("Waiting for archive ... "));
1327 
1328 	    ListT<Chain> lfList;
1329 	    ListT<int> sizeList;
1330 	    ListT<Chain> statusList;
1331 
1332 	    getLogFileInfo(tableSet, lfList, sizeList, statusList);
1333 
1334 	    notArchived = false;
1335 
1336 	    Chain *pStatus = statusList.First();
1337             Chain *pLog = lfList.First();
1338             while ( pLog && pStatus )
1339             {
1340                 if ( *pStatus != Chain(XML_FREE_VALUE) && File::exists(*pLog) )
1341                     notArchived = true;
1342                 pStatus = statusList.Next();
1343                 pLog = lfList.Next();
1344             }
1345 
1346 	    lfList.Empty();
1347 	    sizeList.Empty();
1348 	    statusList.Empty();
1349 
1350 	    Sleeper s;
1351 	    s.secSleep(LOGMNG_RECOVERY_DELAY);
1352 	}
1353     }
1354 }
1355 
getProtType() const1356 CegoDbHandler::ProtocolType CegoDatabaseManager::getProtType() const
1357 {
1358     return _protType;
1359 }
1360