1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // CegoLogManager.cc
4 // -----------------
5 // Cego database log manager implementation
6 //
7 // Design and Implementation by Bjoern Lemke
8 //
9 // (C)opyright 2000-2019 Bjoern Lemke
10 //
11 // IMPLEMENTATION MODULE
12 //
13 // Class: CegoLogManager
14 //
15 // Description: Cego database log management
16 //
17 // Status: CLEAN
18 //
19 ///////////////////////////////////////////////////////////////////////////////
20 
21 // LFC INCLUDES
22 #include <lfcbase/Exception.h>
23 #include <lfcbase/File.h>
24 #include <lfcbase/Net.h>
25 #include <lfcbase/Datetime.h>
26 
27 // CEGO INCLUDES
28 #include "CegoLogManager.h"
29 #include "CegoXMLdef.h"
30 #include "CegoDefs.h"
31 
32 // POSIX INCLUDES
33 #include <string.h>
34 #include <stdlib.h>
35 
36 extern bool __fsyncOn;
37 
CegoLogManager(const Chain & xmlDef,const Chain & logFile,const Chain & progName)38 CegoLogManager::CegoLogManager(const Chain& xmlDef, const Chain& logFile, const Chain& progName) : CegoFileHandler(logFile, progName), CegoXMLSpace(xmlDef)
39 {
40     for (int i=0;i< TABMNG_MAXTABSET;i++)
41     {
42 	_lsn[i]=0;
43 	_pLog[i]=0;
44 	_logActive[i] = false;
45 	_pLogHandler[i] = 0;
46 	_pNetHandler[i] = 0;
47 	_logBuf[i] = 0;
48 	_logBufLen[i] = 0;
49     }
50     _modId = getModId("CegoLogManager");
51 }
52 
~CegoLogManager()53 CegoLogManager::~CegoLogManager()
54 {
55     for (int i=0;i< TABMNG_MAXTABSET;i++)
56     {
57 	if (_pLog[i])
58 	{
59 	    _pLog[i]->close();
60 	    delete _pLog[i];
61 	}
62 	if ( _logBuf[i] )
63 	    free(_logBuf[i]);
64     }
65 }
66 
setCurrentLSN(int tabSetId,unsigned long long lsn)67 void CegoLogManager::setCurrentLSN(int tabSetId, unsigned long long lsn)
68 {
69     _lsn[tabSetId] = lsn;
70 }
71 
getCurrentLSN(int tabSetId)72 unsigned long long CegoLogManager::getCurrentLSN(int tabSetId)
73 {
74     return _lsn[tabSetId];
75 }
76 
nextLSN(int tabSetId)77 unsigned long long CegoLogManager::nextLSN(int tabSetId)
78 {
79     // we just increase, if log is active
80     if ( _logActive[tabSetId] == false )
81 	return 0;
82 
83     _lsn[tabSetId]++;
84     return _lsn[tabSetId];
85 }
86 
// Builds the archive log file name "<tableSet>-<lsn part>.dbf".
// The lsn is appended to a string of twelve zeros and the tail of that string
// is cut out again with subChain, presumably yielding a fixed width, zero
// padded number - the exact width depends on the index conventions of
// Chain::length() and Chain::subChain(), which are not visible here.
Chain CegoLogManager::getArchiveLogName(const Chain& tableSet, unsigned long long lsn)
{
    Chain padded = Chain("000000000000") + Chain(lsn);

    Chain fixedPart = padded.subChain(padded.length() - 12, padded.length());

    Chain archName = tableSet + "-" + fixedPart + ".dbf";
    return archName;
}
93 
setLogFile(int tabSetId,const Chain & logFile,bool readOnly)94 void CegoLogManager::setLogFile(int tabSetId, const Chain& logFile, bool readOnly)
95 {
96     if (_pLog[tabSetId])
97     {
98 	_pLog[tabSetId]->close();
99 	delete _pLog[tabSetId];
100     }
101 
102     _logFile[tabSetId] = logFile;
103 
104     _pLog[tabSetId] = new File(logFile);
105 
106     if ( readOnly )
107 	_pLog[tabSetId]->open(File::READ);
108     else
109 	_pLog[tabSetId]->open(File::READWRITE);
110 
111     _logSize[tabSetId] = _pLog[tabSetId]->Size();
112 
113     _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
114 
115     _logPos[tabSetId] = sizeof(int);
116     _logActive[tabSetId] = false;
117 }
118 
allocateLogConnection(int tabSetId,const Chain & tableSet,const Chain & logHost,int logPort,const Chain & logUser,const Chain & logPwd)119 void CegoLogManager::allocateLogConnection(int tabSetId, const Chain& tableSet, const Chain& logHost, int logPort, const Chain& logUser, const Chain& logPwd)
120 {
121     Net n ( NETMNG_MSG_BUFLEN, NETMNG_SIZEBUFLEN, NETMNG_MAXSENDLEN );
122 
123     log(_modId, Logger::NOTICE, Chain("Connecting to loghost ") + logHost + Chain(":") + Chain(logPort) + Chain(" ..."));
124 
125     try
126     {
127 	_pNetHandler[tabSetId] = n.connect(logHost, logPort);
128     }
129     catch ( Exception e )
130     {
131 	Chain msg = Chain("Cannot connect to loghost ") + logHost + Chain(":") + Chain(logPort);
132 	log(_modId, Logger::LOGERR, msg);
133 	throw Exception(EXLOC, msg);
134     }
135     _pLogHandler[tabSetId] = new CegoLogHandler(this, _pNetHandler[tabSetId]);
136     _pLogHandler[tabSetId]->requestLogSession(tableSet, logUser, logPwd);
137 }
138 
hasLogConnection(int tabSetId)139 bool CegoLogManager::hasLogConnection(int tabSetId)
140 {
141     if ( _pLogHandler[tabSetId] )
142     {
143 	return true;
144     }
145     return false;
146 }
147 
releaseLogConnection(int tabSetId)148 void CegoLogManager::releaseLogConnection(int tabSetId)
149 {
150     if ( _pLogHandler[tabSetId] )
151     {
152 	try
153 	{
154 	    _pLogHandler[tabSetId]->closeSession();
155 	}
156 	catch ( Exception e )
157 	{
158 	    log(_modId, Logger::LOGERR, Chain("Cannot close session to loghost, ignoring"));
159 	}
160 	delete _pLogHandler[tabSetId];
161 	_pLogHandler[tabSetId] = 0;
162 
163 	delete _pNetHandler[tabSetId];
164     }
165 }
166 
initLog(int tabSetId,int size)167 void CegoLogManager::initLog(int tabSetId, int size)
168 {
169     if (_pLog[tabSetId] == 0)
170     {
171 	Chain msg = "No logfile set up for tableset";
172 	throw Exception(EXLOC, msg);
173     }
174 
175     _pLog[tabSetId]->seek(0);
176 
177     char wBuf[LOGMNG_WBUFSIZE];
178 
179     _logOffset[tabSetId] = sizeof(int);
180 
181     _pLog[tabSetId]->writeByte((char*)&_logOffset[tabSetId], sizeof(int));
182 
183     int wBytes = sizeof(int);
184     while (wBytes < size )
185     {
186 	int n = wBytes + LOGMNG_WBUFSIZE > size ? size - wBytes : LOGMNG_WBUFSIZE;
187 	_pLog[tabSetId]->writeByte(wBuf, n);
188 	wBytes += n;
189     }
190 
191     _pLog[tabSetId]->close();
192     delete _pLog[tabSetId];
193     _pLog[tabSetId]=0;
194 }
195 
getLogOffset(int tabSetId)196 int CegoLogManager::getLogOffset(int tabSetId)
197 {
198     return _logOffset[tabSetId];
199 }
200 
resetLog(int tabSetId)201 void CegoLogManager::resetLog(int tabSetId)
202 {
203     _logOffset[tabSetId] = sizeof(int);
204     _pLog[tabSetId]->seek(0);
205     _pLog[tabSetId]->writeByte((char*)&_logOffset[tabSetId], sizeof(int));
206     if ( __fsyncOn )
207 	_pLog[tabSetId]->flush();
208 }
209 
stopLog(int tabSetId)210 void CegoLogManager::stopLog(int tabSetId)
211 {
212     if (_pLog[tabSetId])
213     {
214 	_pLog[tabSetId]->close();
215 	delete _pLog[tabSetId];
216 	_pLog[tabSetId] = 0;
217     }
218 
219     _logActive[tabSetId] = false;
220 }
221 
startLog(int tabSetId)222 void CegoLogManager::startLog(int tabSetId)
223 {
224     if ( _pLog[tabSetId] != 0 )
225     {
226 	_pLog[tabSetId]->seek(0);
227 	_pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
228 	_pLog[tabSetId]->seek(_logOffset[tabSetId]);
229     }
230     _logActive[tabSetId] = true;
231 }
232 
233 
logAction(int tabSetId,CegoLogRecord & logRec,bool flushLog)234 CegoLogManager::LogResult CegoLogManager::logAction(int tabSetId, CegoLogRecord& logRec, bool flushLog)
235 {
236 
237     if ( _logActive[tabSetId] == false )
238 	return LOG_SUCCESS;
239 
240     // cout << "Logging " <<  _lsn[tabSetId] << " " << logRec << endl;
241 
242     if ( logRec.getLSN() == 0 )
243     {
244 	logRec.setLSN(nextLSN(tabSetId));
245     }
246 
247     Datetime ts;
248     logRec.setTS( ts.asLong());
249 
250     int len = logRec.getEncodingLength();
251 
252     // cout << "Logging record of len " << len << endl;
253 
254     if ( _logBuf[tabSetId] == 0 )
255     {
256 	_logBuf[tabSetId] = (char*)malloc(len);
257 	_logBufLen[tabSetId] = len;
258     }
259     else
260     {
261 	if ( _logBufLen[tabSetId] < len )
262 	{
263 	    // reallocate buffer
264 	    // cout << "Reallocating buffer to size " << len << endl;
265 	    free ( _logBuf[tabSetId] );
266 	    _logBuf[tabSetId] = (char*)malloc(len);
267 	    _logBufLen[tabSetId] = len;
268 	}
269     }
270 
271     logRec.encode(_logBuf[tabSetId]);
272 
273     bool isFull = false;
274 
275     if ( _pLogHandler[tabSetId] )
276     {
277 #ifdef CGDEBUG
278 	log(_modId, Logger::DEBUG, Chain("Sending log msg ( lsn=") + Chain(logRec.getLSN()) + Chain(", size=") + Chain(len) + Chain(")"));
279 #endif
280 	if ( _pLogHandler[tabSetId]->sendLogEntry(_logBuf[tabSetId], len) == false )
281 	{
282 	    return LOG_ERROR;
283 	}
284     }
285     else
286     {
287 #ifdef CGDEBUG
288 	log(_modId, Logger::DEBUG, Chain("Logging local ( lsn=") + Chain(logRec.getLSN()) + Chain(", size=") + Chain(len) + Chain(")"));
289 #endif
290 
291 	if ( _logOffset[tabSetId] + len > _logSize[tabSetId] )
292 	{
293 	    // is log is full, we anyway write the log entry to complete log for checkpoint
294 	    isFull=true;
295 	}
296 
297 	_pLog[tabSetId]->writeByte((char*)&len, sizeof(int));
298 	_pLog[tabSetId]->writeByte(_logBuf[tabSetId], len);
299 	_logOffset[tabSetId] += len + sizeof(int);
300 
301 	_pLog[tabSetId]->seek(0);
302 	_pLog[tabSetId]->writeByte((char*)&_logOffset[tabSetId], sizeof(int));
303 	_pLog[tabSetId]->seek(_logOffset[tabSetId]);
304 
305 	if ( flushLog )
306 	{
307 	    if ( __fsyncOn )
308 		_pLog[tabSetId]->flush();
309 	}
310     }
311 
312     if ( isFull )
313 	return LOG_FULL;
314     else
315 	return LOG_SUCCESS;
316 }
317 
getMinLSN(int tabSetId)318 unsigned long long CegoLogManager::getMinLSN(int tabSetId)
319 {
320     if (_pLog[tabSetId] == 0)
321     {
322 	Chain msg = "No logfile set up for tableset";
323 	throw Exception(EXLOC, msg);
324     }
325 
326     _pLog[tabSetId]->seek(0);
327     _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
328 
329     _logPos[tabSetId] = sizeof(int);
330 
331     unsigned long long minlsn=0;
332 
333     if ( _logPos[tabSetId] < _logOffset[tabSetId] )
334     {
335 	int len;
336 	_pLog[tabSetId]->readByte((char*)&len, sizeof(int));
337 
338 	if ( len > LOGMNG_RECBUFSIZE )
339 	{
340 	    Chain msg = "Log read buffer exceeded";
341 	    throw Exception(EXLOC, msg);
342 	}
343 	char buf[LOGMNG_RECBUFSIZE];
344 
345 	_pLog[tabSetId]->readByte(buf, len);
346 
347 	CegoLogRecord lr;
348 	lr.decode(buf);
349 
350 	minlsn = lr.getLSN();
351 
352 	_logPos[tabSetId] += len + sizeof(int);
353 
354     }
355     return minlsn;
356 }
357 
getMaxLSN(int tabSetId)358 unsigned long long CegoLogManager::getMaxLSN(int tabSetId)
359 {
360     if (_pLog[tabSetId] == 0)
361     {
362 	Chain msg = "No logfile set up for tableset";
363 	throw Exception(EXLOC, msg);
364     }
365 
366     _pLog[tabSetId]->seek(0);
367     _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
368 
369 #ifdef CGDEBUG
370     log(_modId, Logger::DEBUG, Chain("Read logoffset ") + Chain(_logOffset[tabSetId]));
371 #endif
372 
373     _logPos[tabSetId] = sizeof(int);
374 
375     unsigned long long maxlsn=0;
376 
377     while (_logPos[tabSetId] < _logOffset[tabSetId])
378     {
379 	int len;
380 	_pLog[tabSetId]->readByte((char*)&len, sizeof(int));
381 
382 	if ( len > LOGMNG_RECBUFSIZE )
383 	{
384 	    Chain msg = "Log read buffer exceeded";
385 	    throw Exception(EXLOC, msg);
386 	}
387 	char buf[LOGMNG_RECBUFSIZE];
388 
389 	_pLog[tabSetId]->readByte(buf, len);
390 
391 	CegoLogRecord lr;
392 	lr.decode(buf);
393 
394 	if ( lr.getLSN() > maxlsn )
395 	    maxlsn = lr.getLSN();
396 
397 	_logPos[tabSetId] += len + sizeof(int);
398 
399     }
400     return maxlsn;
401 }
402 
seekToStart(int tabSetId)403 void CegoLogManager::seekToStart(int tabSetId)
404 {
405     if (_pLog[tabSetId] == 0)
406     {
407 	Chain msg = "No logfile set up for tableset";
408 	throw Exception(EXLOC, msg);
409     }
410 
411     _pLog[tabSetId]->seek(0);
412     _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
413     _logPos[tabSetId] = sizeof(int);
414 }
415 
logSeek(int tabSetId,unsigned long long lsn)416 bool CegoLogManager::logSeek(int tabSetId, unsigned long long lsn)
417 {
418     if (_pLog[tabSetId] == 0)
419     {
420 	Chain msg = "No logfile set up for tableset";
421 	throw Exception(EXLOC, msg);
422     }
423 
424     _pLog[tabSetId]->seek(0);
425     _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
426 
427     _logPos[tabSetId] = sizeof(int);
428 
429     unsigned long long loglsn=0;
430 
431     while (_logPos[tabSetId] < _logOffset[tabSetId])
432     {
433 	int len;
434 	_pLog[tabSetId]->readByte((char*)&len, sizeof(int));
435 
436 	if ( len > LOGMNG_RECBUFSIZE )
437 	{
438 	    Chain msg = "Log read buffer exceeded";
439 	    throw Exception(EXLOC, msg);
440 	}
441 	char buf[LOGMNG_RECBUFSIZE];
442 
443 	_pLog[tabSetId]->readByte(buf, len);
444 
445 	CegoLogRecord lr;
446 	lr.decode(buf);
447 
448 	loglsn = lr.getLSN();
449 
450 	if (  loglsn == lsn )
451 	{
452 	    _pLog[tabSetId]->seek(_logPos[tabSetId]);
453 	    return true;
454 	}
455 	else if ( loglsn > lsn )
456 	{
457 
458 	    Chain msg = "LSN=" + Chain(lsn) + " too small, not found in log";
459 	    throw Exception(EXLOC, msg);
460 	}
461 
462 	_logPos[tabSetId] += len + sizeof(int);
463     }
464 
465     if ( loglsn + 1 == lsn )
466     {
467 	return false;
468     }
469     else
470     {
471 	Chain msg = "LSN=" + Chain(lsn) + " too high, log delta is missing";
472 	throw Exception(EXLOC, msg);
473     }
474 }
475 
logRead(int tabSetId,CegoLogRecord & logRec)476 bool CegoLogManager::logRead(int tabSetId, CegoLogRecord& logRec)
477 {
478     if (_logPos[tabSetId] < _logOffset[tabSetId])
479     {
480 	int len;
481 	_pLog[tabSetId]->readByte((char*)&len, sizeof(int));
482 
483 	if ( len > LOGMNG_RECBUFSIZE )
484 	{
485 	    Chain msg = "Log read buffer exceeded";
486 	    throw Exception(EXLOC, msg);
487 	}
488 
489 	char buf[LOGMNG_RECBUFSIZE];
490 
491 	_pLog[tabSetId]->readByte(buf, len);
492 
493 	logRec.decode(buf);
494 
495 	_logPos[tabSetId] += len + sizeof(int);
496 
497 	return true;
498     }
499     return false;
500 }
501 
switchLogFile(int tabSetId)502 bool CegoLogManager::switchLogFile(int tabSetId)
503 {
504     if ( _logActive[tabSetId] == false )
505 	return true;
506 
507     if ( _pLogHandler[tabSetId] == 0 )
508     {
509 	Chain tableSet = getTabSetName(tabSetId);
510 
511 	ListT<Chain> lfList;
512 	ListT<int> sizeList;
513 	ListT<Chain> statusList;
514 
515 	getLogFileInfo(tableSet, lfList, sizeList, statusList);
516 
517 	Chain *pLogFile = lfList.First();
518 	Chain *pStatus = statusList.First();
519 
520 	bool found = false;
521 	while ( pLogFile && pStatus && found == false)
522 	{
523 	    if ( *pStatus == Chain(XML_ACTIVE_VALUE))
524 	    {
525 		Chain *pNextLogFile = lfList.Next();
526 		Chain *pNextLogStatus = statusList.Next();
527 		if ( pNextLogFile == 0)
528 		{
529 		    pNextLogFile = lfList.First();
530 		    pNextLogStatus = statusList.First();
531 		}
532 
533 
534 		if ( isArchiveMode(tabSetId) )
535 		{
536 		    // if next logfile is still occupied, we skip switch and return false
537 		    if ( *pNextLogStatus == Chain(XML_OCCUPIED_VALUE))
538 		    {
539 			return false;
540 		    }
541 
542 		    // first we have to switch log file to synchronize with CegoLogThreadPool::shiftRedoLogs()
543 		    setLogFile(tabSetId, *pNextLogFile, false);
544 		    setLogFileStatus(tableSet, *pLogFile, XML_OCCUPIED_VALUE);
545 		}
546 		else
547 		{
548 		    // switch log file first also for non archiving mode
549 		    setLogFile(tabSetId, *pNextLogFile, false);
550 		    setLogFileStatus(tableSet, *pLogFile, XML_FREE_VALUE);
551 		}
552 
553 		setLogFileStatus(tableSet, *pNextLogFile, XML_ACTIVE_VALUE);
554 
555 		log(_modId, Logger::NOTICE, Chain("Logfile switch to logfile ") + *pNextLogFile + Chain(" for tableSet ") +  tableSet);
556 
557 		found = true;
558 	    }
559 	    else
560 	    {
561 		pLogFile = lfList.Next();
562 		pStatus = statusList.Next();
563 	    }
564 	}
565 
566 	doc2Xml();
567 
568 	resetLog(tabSetId);
569 	startLog(tabSetId);
570     }
571 
572     CegoLogRecord lr;
573     lr.setAction(CegoLogRecord::LOGREC_SYNC);
574     logAction(tabSetId, lr);
575 
576     return true;
577 }
578 
setActiveLogFile(const Chain & tableSet)579 void CegoLogManager::setActiveLogFile(const Chain& tableSet)
580 {
581     int tabSetId = getTabSetId(tableSet);
582 
583     ListT<Chain> lfList;
584     ListT<int> sizeList;
585     ListT<Chain> statusList;
586 
587     getLogFileInfo(tableSet, lfList, sizeList, statusList);
588 
589     Chain *pLogFile = lfList.First();
590     Chain *pStatus = statusList.First();
591 
592     while ( pLogFile && pStatus )
593     {
594 	if ( *pStatus == Chain(XML_ACTIVE_VALUE))
595 	{
596 	    log(_modId, Logger::NOTICE, Chain("Setting active logfile to ") + Chain(*pLogFile) + Chain(" ..."));
597 	    setLogFile(tabSetId, *pLogFile, false);
598 	    return;
599 	}
600 	else
601 	{
602 	    pLogFile = lfList.Next();
603 	    pStatus = statusList.Next();
604 	}
605     }
606 }
607