1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // CegoDatabaseManager.cc
4 // ----------------------
5 // Cego Database Manager implementation
6 //
7 // Design and Implementation by Bjoern Lemke
8 //
9 // (C)opyright 2000-2019 Bjoern Lemke
10 //
11 // IMPLEMENTATION MODULE
12 //
13 // Class: CegoDatabaseManager
14 //
15 // Description: General database manager
16 //
17 // Status: CLEAN
18 //
19 ///////////////////////////////////////////////////////////////////////////////
20
21 // LFC INCLUDES
22 #include <lfcbase/ThreadLock.h>
23 #include <lfcbase/File.h>
24 #include <lfcbase/Datetime.h>
25 #include <lfcbase/Sleeper.h>
26 #include <lfcbase/Net.h>
27 #include <lfcbase/NetHandler.h>
28 #include <lfcxml/XMLSuite.h>
29 #include <lfcxml/Element.h>
30 #include <lfcxml/Document.h>
31
32 // CEGO INCLUDES
33 #include "CegoDatabaseManager.h"
34 #include "CegoTypeConverter.h"
35 #include "CegoDefs.h"
36 #include "CegoXMLdef.h"
37 #include "CegoXMLSpace.h"
38
39 // POSIX INCLUDES
40 #include <string.h>
41 #include <stdlib.h>
42
43 static ThreadLock dbmLock("DBM");
44 extern bool __lockStatOn;
45
46 //////////////////
47 // ObjectRecord //
48 //////////////////
49
ObjectRecord()50 CegoDatabaseManager::ObjectRecord::ObjectRecord()
51 {
52 }
53
ObjectRecord(const int tabSetId,const Chain & objName,CegoObject::ObjectType type)54 CegoDatabaseManager::ObjectRecord::ObjectRecord(const int tabSetId, const Chain& objName, CegoObject::ObjectType type)
55 {
56 _objName = objName;
57 _type = type;
58 _tabSetId = tabSetId;
59 _numUsed = 0;
60 _mode = SHARED;
61 _tid = 0;
62 }
63
~ObjectRecord()64 CegoDatabaseManager::ObjectRecord::~ObjectRecord()
65 {
66 }
67
getTabSetId() const68 const int CegoDatabaseManager::ObjectRecord::getTabSetId() const
69 {
70 return _tabSetId;
71 }
72
incUsed()73 void CegoDatabaseManager::ObjectRecord::incUsed()
74 {
75 _numUsed++;
76 }
77
decUsed()78 void CegoDatabaseManager::ObjectRecord::decUsed()
79 {
80 if ( _numUsed > 0 )
81 _numUsed--;
82 }
83
numUsed() const84 int CegoDatabaseManager::ObjectRecord::numUsed() const
85 {
86 return _numUsed;
87 }
88
setMode(ObjectUseMode mode)89 void CegoDatabaseManager::ObjectRecord::setMode(ObjectUseMode mode)
90 {
91 _mode = mode;
92 }
93
getMode() const94 CegoDatabaseManager::ObjectUseMode CegoDatabaseManager::ObjectRecord::getMode() const
95 {
96 return _mode;
97 }
98
setTid(unsigned long long tid)99 void CegoDatabaseManager::ObjectRecord::setTid(unsigned long long tid)
100 {
101 _tid = tid;
102 }
103
getTid() const104 unsigned long long CegoDatabaseManager::ObjectRecord::getTid() const
105 {
106 return _tid;
107 }
108
getName() const109 const Chain& CegoDatabaseManager::ObjectRecord::getName() const
110 {
111 return _objName;
112 }
113
getType() const114 const CegoObject::ObjectType CegoDatabaseManager::ObjectRecord::getType() const
115 {
116 return _type;
117 }
118
// Assignment: copies identity (tableset id, name, type) and usage state
// (use count, mode, tid) from t.
CegoDatabaseManager::ObjectRecord& CegoDatabaseManager::ObjectRecord::operator = ( const CegoDatabaseManager::ObjectRecord& t)
{
    // identity
    _objName = t._objName;
    _tabSetId = t._tabSetId;
    _type = t._type;

    // usage state
    _mode = t._mode;
    _tid = t._tid;
    _numUsed = t._numUsed;

    return *this;
}
129
operator ==(const CegoDatabaseManager::ObjectRecord & r)130 bool CegoDatabaseManager::ObjectRecord::operator == ( const CegoDatabaseManager::ObjectRecord& r)
131 {
132 bool typeMatch=false;
133 if ( ( _type == CegoObject::PAVLTREE || _type == CegoObject::UAVLTREE || _type == CegoObject::AVLTREE )
134 && ( r._type == CegoObject::PAVLTREE || r._type == CegoObject::UAVLTREE || r._type == CegoObject::AVLTREE ) )
135 {
136 typeMatch=true;
137 }
138 else if ( ( _type == CegoObject::PBTREE || _type == CegoObject::UBTREE || _type == CegoObject::BTREE )
139 && ( r._type == CegoObject::PBTREE || r._type == CegoObject::UBTREE || r._type == CegoObject::BTREE ) )
140 {
141 typeMatch=true;
142 }
143 else
144 {
145 typeMatch = _type == r._type;
146 }
147
148 if ( _tabSetId == r._tabSetId && _objName == r._objName && typeMatch )
149 return true;
150 return false;
151 }
152
153 //////////////////
154 // CopyRecord //
155 //////////////////
156
CopyRecord()157 CegoDatabaseManager::CopyRecord::CopyRecord()
158 {
159 }
160
CopyRecord(const Chain & tableSet,const Chain & targetHost,const Chain & mediatorHost,const Chain & user,const Chain & passwd,const Chain & msg)161 CegoDatabaseManager::CopyRecord::CopyRecord(const Chain& tableSet, const Chain& targetHost, const Chain& mediatorHost, const Chain& user, const Chain& passwd, const Chain& msg)
162 {
163 _id = 0;
164 _tableSet = tableSet;
165 _targetHost = targetHost;
166 _mediatorHost = mediatorHost;
167 _user = user;
168 _passwd = passwd;
169 _msg = msg;
170 }
171
~CopyRecord()172 CegoDatabaseManager::CopyRecord::~CopyRecord()
173 {
174 }
175
getTableSet() const176 const Chain& CegoDatabaseManager::CopyRecord::getTableSet() const
177 {
178 return _tableSet;
179 }
180
getTargetHost() const181 const Chain& CegoDatabaseManager::CopyRecord::getTargetHost() const
182 {
183 return _targetHost;
184 }
185
getMediatorHost() const186 const Chain& CegoDatabaseManager::CopyRecord::getMediatorHost() const
187 {
188 return _mediatorHost;
189 }
190
getUser() const191 const Chain& CegoDatabaseManager::CopyRecord::getUser() const
192 {
193 return _user;
194 }
195
getPasswd() const196 const Chain& CegoDatabaseManager::CopyRecord::getPasswd() const
197 {
198 return _passwd;
199 }
200
setMsg(const Chain & msg)201 void CegoDatabaseManager::CopyRecord::setMsg(const Chain& msg)
202 {
203 _msg = msg;
204 }
205
getMsg() const206 const Chain& CegoDatabaseManager::CopyRecord::getMsg() const
207 {
208 return _msg;
209 }
210
getId() const211 int CegoDatabaseManager::CopyRecord::getId() const
212 {
213 return _id;
214 }
215
setId(int id)216 void CegoDatabaseManager::CopyRecord::setId(int id)
217 {
218 _id = id;
219 }
220
// Assignment: copies id, tableset, hosts, credentials and status message.
CegoDatabaseManager::CopyRecord& CegoDatabaseManager::CopyRecord::operator = ( const CegoDatabaseManager::CopyRecord& cr)
{
    _id = cr._id;

    // copy source and destination
    _tableSet = cr._tableSet;
    _mediatorHost = cr._mediatorHost;
    _targetHost = cr._targetHost;

    // credentials and status
    _passwd = cr._passwd;
    _user = cr._user;
    _msg = cr._msg;

    return *this;
}
234
operator ==(const CegoDatabaseManager::CopyRecord & cr)235 bool CegoDatabaseManager::CopyRecord::operator == ( const CegoDatabaseManager::CopyRecord& cr)
236 {
237 if ( _tableSet == cr._tableSet && _targetHost == cr._targetHost)
238 return true;
239 return false;
240 }
241
242 /////////////////////
243 // DbSessionRecord //
244 /////////////////////
245
DbSessionRecord()246 CegoDatabaseManager::DbSessionRecord::DbSessionRecord()
247 {
248 }
249
DbSessionRecord(CegoDistDbHandler * pHandler)250 CegoDatabaseManager::DbSessionRecord::DbSessionRecord(CegoDistDbHandler* pHandler)
251 {
252 _pHandler = pHandler;
253 }
254
DbSessionRecord(const Chain & hostName,const Chain & tableSet,const Chain & userName,CegoDistDbHandler * pHandler)255 CegoDatabaseManager::DbSessionRecord::DbSessionRecord(const Chain& hostName, const Chain& tableSet,
256 const Chain& userName, CegoDistDbHandler* pHandler)
257 {
258 _hostName = hostName;
259 _tableSet = tableSet;
260 _userName = userName;
261 _pHandler = pHandler;
262 _lastUsed = Datetime().asLong();
263 }
264
~DbSessionRecord()265 CegoDatabaseManager::DbSessionRecord::~DbSessionRecord()
266 {
267 }
268
getHostName() const269 const Chain& CegoDatabaseManager::DbSessionRecord::getHostName() const
270 {
271 return _hostName;
272 }
273
getTableSet() const274 const Chain& CegoDatabaseManager::DbSessionRecord::getTableSet() const
275 {
276 return _tableSet;
277 }
278
getUserName() const279 const Chain& CegoDatabaseManager::DbSessionRecord::getUserName() const
280 {
281 return _userName;
282 }
283
getDbHandler() const284 CegoDistDbHandler* CegoDatabaseManager::DbSessionRecord::getDbHandler() const
285 {
286 return _pHandler;
287 }
288
isUsed() const289 bool CegoDatabaseManager::DbSessionRecord::isUsed() const
290 {
291 return _isUsed;
292 }
setUsed(bool isUsed)293 void CegoDatabaseManager::DbSessionRecord::setUsed(bool isUsed)
294 {
295 _isUsed = isUsed;
296 }
297
getTSLastUsed()298 unsigned long long CegoDatabaseManager::DbSessionRecord::getTSLastUsed()
299 {
300 return _lastUsed;
301 }
302
setTSLastUsed(unsigned long long ts)303 void CegoDatabaseManager::DbSessionRecord::setTSLastUsed(unsigned long long ts)
304 {
305 _lastUsed = ts;
306 }
307
// Assignment: copies every member of sr.
// The in-use flag is copied as well; the previous version skipped it, so an
// assigned-to record kept whatever flag value it had before.
CegoDatabaseManager::DbSessionRecord& CegoDatabaseManager::DbSessionRecord::operator = ( const CegoDatabaseManager::DbSessionRecord& sr)
{
    _hostName = sr._hostName;
    _tableSet = sr._tableSet;
    _userName = sr._userName;
    _pHandler = sr._pHandler;
    _isUsed = sr._isUsed;
    _lastUsed = sr._lastUsed;
    return (*this);
}
318
operator ==(const CegoDatabaseManager::DbSessionRecord & sr)319 bool CegoDatabaseManager::DbSessionRecord::operator == ( const CegoDatabaseManager::DbSessionRecord& sr)
320 {
321 if ( _pHandler == sr._pHandler )
322 return true;
323 return false;
324 }
325
326 /////////////////////////
327 // CegoDatabaseManager //
328 /////////////////////////
329
CegoDatabaseManager(const Chain & xmlDef,const Chain & lckFileName,const Chain & logFile,const Chain & progName,CegoDbHandler::ProtocolType protType)330 CegoDatabaseManager::CegoDatabaseManager(const Chain& xmlDef, const Chain& lckFileName, const Chain& logFile, const Chain& progName, CegoDbHandler::ProtocolType protType) : CegoBufferPool(xmlDef, logFile, progName)
331 {
332 _protType = protType;
333
334 _lckFileName = lckFileName;
335
336 File lckFile(_lckFileName);
337 if ( lckFile.exists() )
338 {
339 Chain msg = Chain("Running database instance detected at lock file ") + lckFileName;
340 throw Exception(EXLOC, msg);
341 }
342 else
343 {
344 lckFile.open(File::WRITE);
345 lckFile.writeChain("Locked");
346 _nextBeat=0;
347 lckFile.close();
348 }
349
350 dbmLock.init(LCKMNG_LOCKWAITDELAY, __lockStatOn);
351 for ( int i=0; i<TABMNG_MAXTABSET; i++)
352 {
353 _recoveryMode[i] = OFF;
354 _pQueryCache[i] = 0;
355 _pTableCache[i]= 0;
356 }
357 _nextCopyId=1;
358
359 _logConfigured=false;
360
361 _modId = getModId("CegoDatabaseManager");
362 }
363
~CegoDatabaseManager()364 CegoDatabaseManager::~CegoDatabaseManager()
365 {
366 File lckFile(_lckFileName);
367 lckFile.remove();
368
369 for ( int i=0; i<TABMNG_MAXTABSET; i++)
370 {
371 if ( _pQueryCache[i] )
372 delete _pQueryCache[i];
373 if ( _pTableCache[i] )
374 delete _pTableCache[i];
375 }
376 }
377
allocateQueryCache(const Chain & tableSet)378 void CegoDatabaseManager::allocateQueryCache(const Chain& tableSet)
379 {
380 if ( getQueryCacheMode(tableSet) )
381 {
382 int maxEntry = getMaxQueryCacheEntry(tableSet);
383 int maxSize = getMaxQueryCacheSize(tableSet);
384 int hashRange = getQueryCacheHashRange(tableSet);
385
386 if ( maxEntry > 0 && maxSize > 0 && hashRange > 0)
387 {
388 int tabSetId = getTabSetId(tableSet);
389
390 if ( _pQueryCache[tabSetId] )
391 delete _pQueryCache[tabSetId];
392
393 _pQueryCache[tabSetId] = new CegoQueryCache(maxEntry, maxSize, hashRange);
394 }
395 else
396 {
397 log(_modId, Logger::NOTICE, Chain("Query Cache Size/Entry not appropriate, skipping cache allocation"));
398 }
399 }
400 }
401
allocateTableCache(const Chain & tableSet)402 void CegoDatabaseManager::allocateTableCache(const Chain& tableSet)
403 {
404 if ( getTableCacheMode(tableSet) )
405 {
406
407 int numEntry = getMaxTableCacheEntry(tableSet);
408 int cacheSize = getMaxTableCacheSize(tableSet);
409
410 if ( numEntry > 0 && cacheSize > 0 )
411 {
412 int tabSetId = getTabSetId(tableSet);
413
414 if ( _pTableCache[tabSetId] )
415 delete _pTableCache[tabSetId];
416 _pTableCache[tabSetId] = new CegoTableCache(numEntry,cacheSize, this);
417 }
418 else
419 {
420 log(_modId, Logger::NOTICE, Chain("Table Cache Size/Entry not appropriate, skipping cache allocation"));
421 }
422 }
423 }
424
releaseQueryCache(const Chain & tableSet)425 void CegoDatabaseManager::releaseQueryCache(const Chain& tableSet)
426 {
427 int tabSetId = getTabSetId(tableSet);
428
429 if ( _pQueryCache[tabSetId] )
430 {
431 delete _pQueryCache[tabSetId];
432 _pQueryCache[tabSetId]=0;
433 }
434 }
435
releaseTableCache(const Chain & tableSet)436 void CegoDatabaseManager::releaseTableCache(const Chain& tableSet)
437 {
438 int tabSetId = getTabSetId(tableSet);
439
440 if ( _pTableCache[tabSetId] )
441 {
442 delete _pTableCache[tabSetId];
443 _pTableCache[tabSetId]=0;
444 }
445 }
446
cleanCache(int tabSetId,CegoObject::ObjectType objType,const Chain & objName)447 void CegoDatabaseManager::cleanCache(int tabSetId, CegoObject::ObjectType objType, const Chain& objName)
448 {
449 if ( _pQueryCache[tabSetId] )
450 {
451 _pQueryCache[tabSetId]->invalidate(CegoObject(objType, objName, tabSetId));
452 }
453
454 if ( _pTableCache[tabSetId] && objType == CegoObject::TABLE )
455 {
456 _pTableCache[tabSetId]->invalidate(tabSetId, objName);
457 }
458 }
459
getQueryCache(const Chain & tableSet) const460 CegoQueryCache* CegoDatabaseManager::getQueryCache(const Chain& tableSet) const
461 {
462 int tabSetId = getTabSetId(tableSet);
463 return _pQueryCache[tabSetId];
464 }
465
getQueryCache(int tabSetId) const466 CegoQueryCache* CegoDatabaseManager::getQueryCache(int tabSetId) const
467 {
468 return _pQueryCache[tabSetId];
469 }
470
getTableCache(const Chain & tableSet) const471 CegoTableCache* CegoDatabaseManager::getTableCache(const Chain& tableSet) const
472 {
473 int tabSetId = getTabSetId(tableSet);
474 return _pTableCache[tabSetId];
475 }
476
getTableCache(int tabSetId) const477 CegoTableCache* CegoDatabaseManager::getTableCache(int tabSetId) const
478 {
479 return _pTableCache[tabSetId];
480 }
481
beat()482 void CegoDatabaseManager::beat()
483 {
484 File lckFile(_lckFileName);
485 lckFile.open(File::WRITE);
486 lckFile.writeChain(Chain("Beat=") + Chain(_nextBeat) + Chain("\n"));
487 _nextBeat++;
488 lckFile.close();
489 }
490
491
checkTableSetRunState(int tabSetId)492 void CegoDatabaseManager::checkTableSetRunState(int tabSetId)
493 {
494 Chain runState = getTableSetRunState(tabSetId);
495
496 if ( runState != Chain(XML_ONLINE_VALUE)
497 && runState != Chain(XML_BACKUP_VALUE)
498 && runState != Chain(XML_RECOVERY_VALUE)
499 && runState != Chain(XML_CHECKPOINT_VALUE) )
500 {
501 Chain msg = Chain("Tableset ") + getTabSetName(tabSetId) + Chain(" not online ( run state is ") + runState + Chain(" )");
502 throw Exception(EXLOC, msg);
503 }
504 }
505
506
PR()507 void CegoDatabaseManager::PR()
508 {
509 dbmLock.readLock(DBM_LOCKTIMEOUT);
510 }
511
PW()512 void CegoDatabaseManager::PW()
513 {
514 dbmLock.writeLock(DBM_LOCKTIMEOUT);
515 }
516
V()517 void CegoDatabaseManager::V()
518 {
519 dbmLock.unlock();
520 }
521
isLoggerConfigured()522 bool CegoDatabaseManager::isLoggerConfigured()
523 {
524 return _logConfigured;
525 }
526
getRecoveryMode(int tabSetId)527 CegoDatabaseManager::RecoveryMode CegoDatabaseManager::getRecoveryMode(int tabSetId)
528 {
529 return _recoveryMode[tabSetId];
530 }
531
setRecoveryMode(int tabSetId,CegoDatabaseManager::RecoveryMode m)532 void CegoDatabaseManager::setRecoveryMode(int tabSetId, CegoDatabaseManager::RecoveryMode m)
533 {
534 _recoveryMode[tabSetId] = m;
535 }
536
setAllRecoveryOff()537 void CegoDatabaseManager::setAllRecoveryOff()
538 {
539 for ( int i=0; i<TABMNG_MAXTABSET; i++)
540 {
541 _recoveryMode[i] = OFF;
542 }
543 }
544
startRecovery(const Chain & tableSet)545 void CegoDatabaseManager::startRecovery(const Chain& tableSet)
546 {
547 PW();
548 _recoveryList.Insert(tableSet);
549 V();
550 }
551
startCopy(const Chain & tableSet,const Chain & targetHost,const Chain & mediatorHost,const Chain & user,const Chain & passwd,const Chain & msg)552 void CegoDatabaseManager::startCopy(const Chain& tableSet, const Chain& targetHost, const Chain& mediatorHost, const Chain& user, const Chain& passwd, const Chain& msg)
553 {
554 PW();
555 _copyList.Insert( CopyRecord( tableSet, targetHost, mediatorHost, user, passwd, msg));
556 V();
557 }
558
nextRecovery(Chain & tableSet)559 bool CegoDatabaseManager::nextRecovery(Chain& tableSet)
560 {
561 PW();
562 Chain *pS = _recoveryList.First();
563 if ( pS )
564 {
565 tableSet = *pS;
566 _recoveryList.Remove(tableSet);
567 V();
568 return true;
569 }
570 V();
571 return false;
572 }
573
nextCopy(int & id,Chain & tableSet,Chain & targetHost,Chain & mediatorHost,Chain & user,Chain & passwd)574 bool CegoDatabaseManager::nextCopy(int& id, Chain& tableSet, Chain& targetHost, Chain& mediatorHost, Chain& user, Chain& passwd)
575 {
576 PW();
577 CopyRecord *pCR = _copyList.First();
578 while ( pCR )
579 {
580 if ( pCR->getId() == 0 )
581 {
582 id = _nextCopyId++;
583 pCR->setId(id);
584
585 tableSet = pCR->getTableSet();
586 targetHost = pCR->getTargetHost();
587 mediatorHost = pCR->getMediatorHost();
588 user = pCR->getUser();
589 passwd = pCR->getPasswd();
590
591 // _copyList.Remove(*pCR);
592 V();
593 return true;
594 }
595 pCR = _copyList.Next();
596 }
597 V();
598 return false;
599 }
600
setCopyStatus(int id,const Chain & msg)601 void CegoDatabaseManager::setCopyStatus(int id, const Chain& msg)
602 {
603 PW();
604 CopyRecord *pCR = _copyList.First();
605 while ( pCR )
606 {
607 if ( pCR->getId() == id )
608 {
609 pCR->setMsg(msg);
610
611 V();
612 return;
613 }
614 pCR = _copyList.Next();
615 }
616 V();
617 return;
618 }
619
getCopyInfo()620 Element* CegoDatabaseManager::getCopyInfo()
621 {
622 Element* pCopyInfo = new Element(XML_COPYINFO_ELEMENT);
623
624 PR();
625 CopyRecord *pCR = _copyList.First();
626 while ( pCR )
627 {
628
629 Element *pN = new Element(XML_COPY_ELEMENT);
630
631 pN->setAttribute(XML_CID_ATTR, pCR->getId());
632 pN->setAttribute(XML_HOSTNAME_ATTR, pCR->getTargetHost());
633 pN->setAttribute(XML_TABLESET_ATTR, pCR->getTableSet());
634 pN->setAttribute(XML_STATUS_ATTR, pCR->getMsg());
635
636 pCopyInfo->addContent(pN);
637
638 pCR = _copyList.Next();
639 }
640
641 V();
642
643 return pCopyInfo;
644 }
645
useObject(int tabSetId,const Chain & objName,CegoObject::ObjectType type,ObjectUseMode mode,unsigned long long tid)646 void CegoDatabaseManager::useObject(int tabSetId, const Chain& objName, CegoObject::ObjectType type, ObjectUseMode mode, unsigned long long tid)
647 {
648 // In SHARED mode, an object can be use in parallel by several threads. Normally, this is done, if the object is accessed read only.
649
650 // we fist check, if corresponding tableset is really not offline
651 checkTableSetRunState(tabSetId);
652
653
654 /*
655 Chain runState = getTableSetRunState(tabSetId);
656 if ( runState != Chain(XML_ONLINE_VALUE) && runState != Chain(XML_BACKUP_VALUE) && runState != Chain(XML_CHECKPOINT_VALUE) )
657 {
658 Chain msg = Chain("Tableset ") + getTabSetName(tabSetId) + Chain(" not online ( run state is ") + runState + Chain(" )");
659 throw Exception(EXLOC, msg);
660 }
661 */
662
663 if ( mode == SHARED )
664 {
665
666 PW();
667 ObjectRecord *pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
668 if ( pOR == 0 )
669 {
670 V();
671 Chain msg = Chain("Cannot access object <") + objName + Chain(">");
672 throw Exception(EXLOC, msg);
673 }
674
675 if ( pOR->numUsed() > MAX_OBJECT_USAGE )
676 {
677 V();
678 Chain msg = Chain("Usage exceeded for <") + objName + Chain(">");
679 throw Exception(EXLOC, msg);
680 }
681
682 if ( pOR->getTid() != 0 && pOR->getTid() == tid )
683 {
684 pOR->incUsed();
685 V();
686 }
687 else
688 {
689 int numTries=0;
690
691 while ( pOR && pOR->getMode() == EXCLUSIVE_WRITE && numTries < DBM_MAXLOCKTRIES )
692 {
693 V();
694
695 if ( numTries > 0 )
696 {
697 if ( numTries > 1 )
698 {
699 log(_modId, Logger::NOTICE, Chain("Repeated shared lock delay on ") + objName + Chain(" ( ") + Chain(numTries) + Chain(" tries )"));
700 }
701
702 Sleeper ns;
703 int sec = DBM_LOCKDELAY / 1000;
704 if ( sec > 0 )
705 ns.secSleep(sec);
706 int msec = DBM_LOCKDELAY % 1000;
707 if ( msec > 0 )
708 ns.milliSleep(msec);
709 }
710
711 PW();
712
713 pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
714
715 if ( pOR == 0 )
716 {
717 V();
718 Chain msg = Chain("Cannot access object ") + objName;
719 throw Exception(EXLOC, msg);
720 }
721 numTries++;
722 }
723
724 if ( numTries == DBM_MAXLOCKTRIES )
725 {
726 V();
727 Chain msg = Chain("Access timeout on object ") + objName;
728 throw Exception(EXLOC, msg);
729 }
730
731 pOR->incUsed();
732 V();
733 }
734 }
735
736 // In EXCLUSIVE_WRITE mode, an object is accessed exclusively by one thread only
737
738 else if ( mode == EXCLUSIVE_WRITE )
739 {
740 PW();
741 ObjectRecord *pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
742 if ( pOR == 0 )
743 {
744 V();
745 Chain msg = Chain("Cannot access object ") + objName;
746 throw Exception(EXLOC, msg);
747 }
748
749 if ( pOR->getTid() != 0 && pOR->getTid() == tid )
750 {
751 pOR->incUsed();
752 V();
753 }
754 else
755 {
756 if ( pOR->getMode() == SHARED && pOR->numUsed() == 0 )
757 {
758 pOR->setMode(mode);
759 pOR->setTid(tid);
760 }
761 else
762 {
763 int numTries=0;
764 while ( pOR && pOR->numUsed() != 0 && numTries < DBM_MAXLOCKTRIES )
765 {
766
767 V();
768
769 if ( numTries > 0 )
770 {
771 if ( numTries > 1 )
772 {
773 log(_modId, Logger::NOTICE, Chain("Repeated exclusive write lock delay on ") + objName + Chain(" ( ") + Chain(numTries) + Chain(" tries )"));
774 }
775
776 Sleeper ns;
777
778 int sec = DBM_LOCKDELAY / 1000;
779 if ( sec > 0 )
780 ns.secSleep(sec);
781 int msec = DBM_LOCKDELAY % 1000;
782 if ( msec > 0 )
783 ns.milliSleep(msec);
784 }
785
786 PW();
787
788 pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
789 if ( pOR == 0 )
790 {
791 V();
792 Chain msg = Chain("Cannot access object ") + objName;
793 throw Exception(EXLOC, msg);
794 }
795
796 numTries++;
797
798 }
799
800 if ( numTries == DBM_MAXLOCKTRIES )
801 {
802 V();
803 Chain msg = Chain("Access timeout on object ") + objName;
804 throw Exception(EXLOC, msg);
805 }
806
807 pOR->setMode(mode);
808 pOR->setTid(tid);
809
810 }
811
812 pOR->incUsed();
813 V();
814 }
815 }
816 return;
817 }
818
unuseObject(int tabSetId,const Chain & objName,CegoObject::ObjectType type,unsigned long long tid)819 void CegoDatabaseManager::unuseObject(int tabSetId, const Chain& objName, CegoObject::ObjectType type, unsigned long long tid )
820 {
821
822 // we fist check, if corresponding tableset is really not offline
823 checkTableSetRunState(tabSetId);
824
825 /*
826 Chain runState = getTableSetRunState(tabSetId);
827 if ( runState != Chain(XML_ONLINE_VALUE) && runState != Chain(XML_BACKUP_VALUE) && runState != Chain(XML_CHECKPOINT_VALUE) )
828 {
829 Chain msg = Chain("Tableset ") + getTabSetName(tabSetId) + Chain(" not online ( run state is ") + runState + Chain(" )");
830 throw Exception(EXLOC, msg);
831 }
832 */
833
834
835 PW();
836 ObjectRecord *pOR = _objList.Find(ObjectRecord(tabSetId, objName, type));
837
838 if ( pOR == 0 )
839 {
840 V();
841 Chain msg = Chain("Cannot access object ") + objName;
842 throw Exception(EXLOC, msg);
843 return;
844 }
845
846 // we just unuse object, if tid not specified or lockmode of object is SHARED or if transaction owns object lockmode of obejct id EXCLUSIVE_WRITE
847 if ( ( tid == pOR->getTid() && pOR->getMode() == EXCLUSIVE_WRITE ) || tid == 0 || pOR->getMode() == SHARED )
848 {
849 pOR->decUsed();
850
851 if ( pOR->numUsed() == 0 )
852 {
853 pOR->setMode(SHARED);
854 pOR->setTid(0);
855 }
856 }
857 else
858 {
859 V();
860 Chain msg = Chain("Table ") + objName + Chain(" not owned by thread");
861 throw Exception(EXLOC, msg);
862 return;
863 }
864
865 V();
866 }
867
addObject(int tabSetId,const Chain & objName,CegoObject::ObjectType type)868 void CegoDatabaseManager::addObject(int tabSetId, const Chain& objName, CegoObject::ObjectType type)
869 {
870 PW();
871 _objList.Insert(ObjectRecord(tabSetId, objName, type));
872 V();
873 }
874
removeObject(int tabSetId,const Chain & objName,CegoObject::ObjectType type)875 void CegoDatabaseManager::removeObject(int tabSetId, const Chain& objName, CegoObject::ObjectType type)
876 {
877 PW();
878 _objList.Remove(ObjectRecord(tabSetId, objName, type));
879 V();
880 }
881
removeAllObjects(int tabSetId)882 void CegoDatabaseManager::removeAllObjects(int tabSetId)
883 {
884 PW();
885 ObjectRecord *pOR = _objList.First();
886 while ( pOR )
887 {
888 if ( pOR->getTabSetId() == tabSetId )
889 {
890 _objList.Remove(*pOR);
891 pOR = _objList.First();
892 }
893 else
894 {
895 pOR = _objList.Next();
896 }
897 }
898 V();
899 }
900
objectExists(int tabSetId,const Chain & objName,CegoObject::ObjectType type)901 bool CegoDatabaseManager::objectExists(int tabSetId, const Chain& objName, CegoObject::ObjectType type)
902 {
903
904 // we fist check, if corresponding tableset is really not offline
905 checkTableSetRunState(tabSetId);
906
907 /*
908 Chain runState = getTableSetRunState(tabSetId);
909 if ( runState != Chain(XML_ONLINE_VALUE) && runState != Chain(XML_BACKUP_VALUE) && runState != Chain(XML_CHECKPOINT_VALUE) )
910 {
911 Chain msg = Chain("Tableset ") + getTabSetName(tabSetId) + Chain(" not online ( run state is ") + runState + Chain(" )");
912 throw Exception(EXLOC, msg);
913 }
914 */
915
916 PR();
917 ObjectRecord *pTR = _objList.Find(ObjectRecord(tabSetId, objName, type));
918 V();
919 if ( pTR )
920 {
921 return true;
922 }
923 else
924 {
925 return false;
926 }
927 }
928
printObjectList()929 void CegoDatabaseManager::printObjectList()
930 {
931 PW();
932 ObjectRecord *pOR = _objList.First();
933 while ( pOR )
934 {
935 cout << "ObjListEntry : " << pOR->getName() << " Type = " << pOR->getType() << endl;
936 pOR = _objList.Next();
937 }
938 V();
939 }
940
setThreadInfo(int numDbThread,int numAdmThread,int numLogThread)941 void CegoDatabaseManager::setThreadInfo(int numDbThread, int numAdmThread, int numLogThread)
942 {
943 _numDbThread = numDbThread;
944 _numAdmThread = numAdmThread;
945 _numLogThread = numLogThread;
946 _activeDbThread = 0;
947 _activeAdmThread = 0;
948 _activeLogThread = 0;
949 }
950
getThreadInfo(int & numDbThread,int & numAdmThread,int & numLogThread,int & activeDbThread,int & activeAdmThread,int & activeLogThread)951 void CegoDatabaseManager::getThreadInfo(int& numDbThread, int& numAdmThread, int& numLogThread,
952 int& activeDbThread, int& activeAdmThread, int& activeLogThread)
953 {
954 numDbThread = _numDbThread;
955 numAdmThread = _numAdmThread;
956 numLogThread = _numLogThread;
957 activeDbThread = _activeDbThread;
958 activeAdmThread = _activeAdmThread;
959 activeLogThread = _activeLogThread;
960 }
961
increaseActiveAdmThread()962 void CegoDatabaseManager::increaseActiveAdmThread()
963 {
964 _activeAdmThread++;
965 }
966
decreaseActiveAdmThread()967 void CegoDatabaseManager::decreaseActiveAdmThread()
968 {
969 _activeAdmThread--;
970 }
971
increaseActiveDbThread()972 void CegoDatabaseManager::increaseActiveDbThread()
973 {
974 _activeDbThread++;
975 }
976
decreaseActiveDbThread()977 void CegoDatabaseManager::decreaseActiveDbThread()
978 {
979 _activeDbThread--;
980 }
981
increaseActiveLogThread()982 void CegoDatabaseManager::increaseActiveLogThread()
983 {
984 _activeLogThread++;
985 }
986
decreaseActiveLogThread()987 void CegoDatabaseManager::decreaseActiveLogThread()
988 {
989 _activeLogThread--;
990 }
991
getDBMLockStat(Chain & lockName,int & lockCount,unsigned long long & numRdLock,unsigned long long & numWrLock,unsigned long long & sumRdDelay,unsigned long long & sumWrDelay)992 void CegoDatabaseManager::getDBMLockStat(Chain& lockName, int& lockCount, unsigned long long &numRdLock, unsigned long long &numWrLock, unsigned long long &sumRdDelay, unsigned long long &sumWrDelay)
993 {
994 lockName = dbmLock.getId();
995 lockCount= dbmLock.numLockTry();
996
997 numRdLock = dbmLock.numReadLock();
998 numWrLock = dbmLock.numWriteLock();
999 sumRdDelay = 0;
1000 sumWrDelay = 0;
1001
1002 if ( dbmLock.numReadLock() > 0 )
1003 sumRdDelay = dbmLock.sumReadDelay() / LCKMNG_DELRES;
1004 if ( dbmLock.numWriteLock() > 0 )
1005 sumWrDelay = dbmLock.sumWriteDelay() / LCKMNG_DELRES;
1006 }
1007
allocateSession(const Chain & hostName,const Chain & tableSet,const Chain & userName,const Chain & password)1008 CegoDistDbHandler* CegoDatabaseManager::allocateSession(const Chain& hostName, const Chain& tableSet,
1009 const Chain& userName, const Chain& password)
1010 {
1011 PW();
1012
1013 CegoDistDbHandler* pSession = 0;
1014
1015 try
1016 {
1017 DbSessionRecord *pSR = _dbSessionList.First();
1018 while ( pSR )
1019 {
1020 if ( pSR->getHostName() == hostName
1021 && pSR->getTableSet() == tableSet
1022 && pSR->getUserName() == userName
1023 && pSR->isUsed() == false )
1024 {
1025 pSR->setUsed(true);
1026 Datetime dt;
1027 pSR->setTSLastUsed( dt.asLong() );
1028
1029 V();
1030
1031 return pSR->getDbHandler();
1032 }
1033 pSR = _dbSessionList.Next();
1034 }
1035
1036 // create new session
1037
1038 pSession = createSession(hostName, tableSet, userName, password);
1039 _dbSessionList.Insert( DbSessionRecord(hostName, tableSet, userName, pSession));
1040
1041 }
1042 catch ( Exception e )
1043 {
1044 V();
1045 throw Exception(EXLOC, "Cannot allocate db session", e);
1046 }
1047
1048 V();
1049 return pSession;
1050 }
1051
releaseSession(CegoDistDbHandler * pHandler)1052 void CegoDatabaseManager::releaseSession(CegoDistDbHandler* pHandler)
1053 {
1054 PW();
1055
1056 DbSessionRecord *pSR = _dbSessionList.First();
1057 while ( pSR )
1058 {
1059 if ( pSR->getDbHandler() == pHandler )
1060 {
1061 pSR->setUsed(false);
1062
1063 V();
1064
1065 return;
1066 }
1067 pSR = _dbSessionList.Next();
1068 }
1069
1070 V();
1071
1072 Chain msg = Chain("Cannot release session for unknown db handle");
1073 throw Exception(EXLOC, msg);
1074 }
1075
cleanSession(int lifetime)1076 void CegoDatabaseManager::cleanSession(int lifetime)
1077 {
1078 PW();
1079
1080 Datetime dt;
1081
1082 DbSessionRecord *pSR = _dbSessionList.First();
1083 while ( pSR )
1084 {
1085 if ( pSR->getTSLastUsed() < ( dt.asLong() - lifetime ) )
1086 {
1087 if ( pSR->isUsed() == false )
1088 {
1089 closeSession(pSR->getDbHandler());
1090 _dbSessionList.Remove( DbSessionRecord(pSR->getDbHandler()));
1091 pSR = _dbSessionList.First();
1092 }
1093 }
1094 pSR = _dbSessionList.Next();
1095 }
1096
1097 V();
1098 }
1099
getSessionInfo(int lifetime)1100 Element* CegoDatabaseManager::getSessionInfo(int lifetime)
1101 {
1102 Element* pSessionInfo = new Element(XML_DBSESSIONINFO_ELEMENT);
1103
1104 DbSessionRecord *pSR = _dbSessionList.First();
1105 while ( pSR )
1106 {
1107 Element *pN = new Element(XML_DBSESSION_ELEMENT);
1108 pN->setAttribute(XML_HOSTNAME_ATTR, pSR->getHostName());
1109 pN->setAttribute(XML_TABLESET_ATTR, pSR->getTableSet());
1110 pN->setAttribute(XML_USER_ATTR, pSR->getUserName());
1111 if ( pSR->isUsed() )
1112 pN->setAttribute(XML_ISUSED_ATTR, XML_TRUE_VALUE);
1113 else
1114 pN->setAttribute(XML_ISUSED_ATTR, XML_FALSE_VALUE);
1115
1116 Datetime dt;
1117 pN->setAttribute(XML_TTL_ATTR, Chain(pSR->getTSLastUsed() + (unsigned long long)lifetime - dt.asLong()));
1118
1119 pSessionInfo->addContent(pN);
1120
1121 pSR = _dbSessionList.Next();
1122 }
1123 return pSessionInfo;
1124 }
1125
createSession(const Chain & hostName,const Chain & tableSet,const Chain & userName,const Chain & password)1126 CegoDistDbHandler* CegoDatabaseManager::createSession(const Chain& hostName, const Chain& tableSet,
1127 const Chain& userName, const Chain& password)
1128 {
1129 int portNo;
1130 getDataPort(portNo);
1131
1132 Net n( NETMNG_MSG_BUFLEN, NETMNG_SIZEBUFLEN, getMaxSendLen() );
1133 NetHandler* pN = 0;
1134 CegoDistDbHandler *pSH = 0;
1135
1136 try
1137 {
1138
1139 #ifdef DEBUG
1140 log(_modId, Logger::DEBUG, Chain("Connecting to ") + Chain(hostName) + Chain(" on port ") + Chain(portNo));
1141 #endif
1142
1143 pN = n.connect(hostName, portNo);
1144 pSH = new CegoDistDbHandler(pN, _protType, this);
1145
1146 #ifdef DEBUG
1147 log(_modId, Logger::DEBUG, Chain("Using user ") + userName + Chain("/") + password);
1148 #endif
1149 pSH->requestSession(tableSet, userName, password, false);
1150
1151 }
1152 catch ( Exception e )
1153 {
1154
1155 if ( pSH )
1156 {
1157 delete pSH;
1158 }
1159
1160 if ( pN )
1161 {
1162 delete pN;
1163 }
1164
1165 Chain msg = Chain("Cannot create session to ") + hostName;
1166 throw Exception(EXLOC, msg);
1167 }
1168
1169 return pSH;
1170 }
1171
closeSession(CegoDistDbHandler * pSH)1172 void CegoDatabaseManager::closeSession(CegoDistDbHandler* pSH)
1173 {
1174 #ifdef DEBUG
1175 log(_modId, Logger::DEBUG, Chain("Closing session ..."));
1176 #endif
1177
1178 pSH->closeSession();
1179
1180 NetHandler *pN = pSH->getNetHandler();
1181
1182 delete pSH;
1183 delete pN;
1184 }
1185
configureLogger(Logger::LogLevel level)1186 void CegoDatabaseManager::configureLogger(Logger::LogLevel level)
1187 {
1188 for ( int i=1; i< getMapSize() ; i++)
1189 {
1190 logModule(i, getModName(i), level);
1191 }
1192 _logConfigured=true;
1193 }
1194
configureLogger()1195 void CegoDatabaseManager::configureLogger()
1196 {
1197 ListT<Chain> modList;
1198 _logConfigured = getModuleList(modList);
1199
1200 Chain *pMod = modList.First();
1201 while ( pMod )
1202 {
1203 if ( (Chain)pMod->toUpper() == Chain("ALL"))
1204 {
1205 Logger::LogLevel level = getLogLevel(*pMod);
1206 for ( int i=1; i< getMapSize() ; i++)
1207 {
1208 logModule(i, getModName(i), level);
1209 }
1210 }
1211 else
1212 {
1213 unsigned long modId = getModId(*pMod);
1214 logModule(modId, *pMod, getLogLevel(*pMod));
1215 }
1216 pMod = modList.Next();
1217 }
1218 }
1219
verifyJDBC(const Chain & user)1220 bool CegoDatabaseManager::verifyJDBC(const Chain& user)
1221 {
1222 SetT<Chain> roleSet;
1223 getRoleSet(user, roleSet);
1224 return roleSet.Find(Chain(ROLE_JDBC));
1225 }
1226
verifyAccess(const int tabSetId,const Chain & objName,CegoObject::ObjectType type,CegoXMLSpace::AccessMode mode,const Chain & user)1227 bool CegoDatabaseManager::verifyAccess(const int tabSetId, const Chain& objName, CegoObject::ObjectType type, CegoXMLSpace::AccessMode mode, const Chain& user)
1228 {
1229 SetT<Chain> roleSet;
1230
1231 getRoleSet(user, roleSet);
1232 Chain tableSet = getTabSetName(tabSetId);
1233
1234 Chain *pRole = roleSet.First();
1235 while ( pRole )
1236 {
1237
1238 Chain objPattern = objName;
1239
1240 if ( matchRole( *pRole, tableSet, objPattern, mode ) )
1241 return true;
1242 pRole = roleSet.Next();
1243 }
1244 return false;
1245 }
1246
initLogFiles(const Chain & tableSet,bool overwrite)1247 void CegoDatabaseManager::initLogFiles(const Chain& tableSet, bool overwrite)
1248 {
1249 ListT<Chain> lfList;
1250 ListT<int> sizeList;
1251 ListT<Chain> statusList;
1252
1253 int tabSetId = getTabSetId(tableSet);
1254 getLogFileInfo(tableSet, lfList, sizeList, statusList);
1255
1256 Chain *pLog = lfList.First();
1257 int *pSize = sizeList.First();
1258
1259 bool isFirst = true;
1260 while ( pLog )
1261 {
1262
1263 if ( isFirst )
1264 setLogFileStatus(tableSet, *pLog, XML_ACTIVE_VALUE);
1265 else
1266 setLogFileStatus(tableSet, *pLog, XML_FREE_VALUE);
1267
1268 isFirst=false;
1269
1270 log(_modId, Logger::NOTICE, Chain("Initializing logfile ") + *pLog + Chain(" ..."));
1271
1272 if ( overwrite == false )
1273 {
1274 File checkLog(*pLog);
1275 if ( checkLog.exists() )
1276 {
1277 Chain msg = Chain("Cannot initialize logfile <") + *pLog + Chain(">, file already exists");
1278 throw Exception(EXLOC, msg);
1279 }
1280 }
1281
1282 setLogFile(tabSetId, *pLog, false);
1283 initLog(tabSetId, *pSize);
1284
1285 pLog = lfList.Next();
1286 pSize = sizeList.Next();
1287
1288 }
1289 }
1290
releaseLogFiles(const Chain & tableSet,bool waitForArch)1291 void CegoDatabaseManager::releaseLogFiles(const Chain& tableSet, bool waitForArch)
1292 {
1293 ListT<Chain> lfList;
1294 ListT<int> sizeList;
1295 ListT<Chain> statusList;
1296
1297 int tabSetId = getTabSetId(tableSet);
1298 getLogFileInfo(tableSet, lfList, sizeList, statusList);
1299
1300 Chain *pLog = lfList.First();
1301 Chain *pStatus = statusList.First();
1302
1303 while ( pLog && pStatus )
1304 {
1305 if ( *pStatus == Chain(XML_ACTIVE_VALUE) && File::exists(*pLog) )
1306 {
1307 setLogFile(tabSetId, *pLog, true);
1308 unsigned long long minlsn = getMinLSN(tabSetId);
1309 if ( minlsn > 0 )
1310 {
1311 log(_modId, Logger::NOTICE, Chain("Releasing logfile ") + *pLog + Chain(" LSN=") + Chain(minlsn));
1312 setLogFileStatus(tableSet, *pLog, XML_OCCUPIED_VALUE);
1313 }
1314 }
1315 pStatus = statusList.Next();
1316 pLog = lfList.Next();
1317 }
1318
1319 if ( waitForArch )
1320 {
1321 bool notArchived = true;
1322
1323 while ( notArchived )
1324 {
1325
1326 log(_modId, Logger::NOTICE, Chain("Waiting for archive ... "));
1327
1328 ListT<Chain> lfList;
1329 ListT<int> sizeList;
1330 ListT<Chain> statusList;
1331
1332 getLogFileInfo(tableSet, lfList, sizeList, statusList);
1333
1334 notArchived = false;
1335
1336 Chain *pStatus = statusList.First();
1337 Chain *pLog = lfList.First();
1338 while ( pLog && pStatus )
1339 {
1340 if ( *pStatus != Chain(XML_FREE_VALUE) && File::exists(*pLog) )
1341 notArchived = true;
1342 pStatus = statusList.Next();
1343 pLog = lfList.Next();
1344 }
1345
1346 lfList.Empty();
1347 sizeList.Empty();
1348 statusList.Empty();
1349
1350 Sleeper s;
1351 s.secSleep(LOGMNG_RECOVERY_DELAY);
1352 }
1353 }
1354 }
1355
getProtType() const1356 CegoDbHandler::ProtocolType CegoDatabaseManager::getProtType() const
1357 {
1358 return _protType;
1359 }
1360