1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // CegoLogManager.cc
4 // -----------------
5 // Cego database log manager implementation
6 //
7 // Design and Implementation by Bjoern Lemke
8 //
9 // (C)opyright 2000-2019 Bjoern Lemke
10 //
11 // IMPLEMENTATION MODULE
12 //
13 // Class: CegoLogManager
14 //
15 // Description: Cego database log management
16 //
17 // Status: CLEAN
18 //
19 ///////////////////////////////////////////////////////////////////////////////
20
21 // LFC INCLUDES
22 #include <lfcbase/Exception.h>
23 #include <lfcbase/File.h>
24 #include <lfcbase/Net.h>
25 #include <lfcbase/Datetime.h>
26
27 // CEGO INCLUDES
28 #include "CegoLogManager.h"
29 #include "CegoXMLdef.h"
30 #include "CegoDefs.h"
31
32 // POSIX INCLUDES
33 #include <string.h>
34 #include <stdlib.h>
35
36 extern bool __fsyncOn;
37
CegoLogManager(const Chain & xmlDef,const Chain & logFile,const Chain & progName)38 CegoLogManager::CegoLogManager(const Chain& xmlDef, const Chain& logFile, const Chain& progName) : CegoFileHandler(logFile, progName), CegoXMLSpace(xmlDef)
39 {
40 for (int i=0;i< TABMNG_MAXTABSET;i++)
41 {
42 _lsn[i]=0;
43 _pLog[i]=0;
44 _logActive[i] = false;
45 _pLogHandler[i] = 0;
46 _pNetHandler[i] = 0;
47 _logBuf[i] = 0;
48 _logBufLen[i] = 0;
49 }
50 _modId = getModId("CegoLogManager");
51 }
52
~CegoLogManager()53 CegoLogManager::~CegoLogManager()
54 {
55 for (int i=0;i< TABMNG_MAXTABSET;i++)
56 {
57 if (_pLog[i])
58 {
59 _pLog[i]->close();
60 delete _pLog[i];
61 }
62 if ( _logBuf[i] )
63 free(_logBuf[i]);
64 }
65 }
66
setCurrentLSN(int tabSetId,unsigned long long lsn)67 void CegoLogManager::setCurrentLSN(int tabSetId, unsigned long long lsn)
68 {
69 _lsn[tabSetId] = lsn;
70 }
71
getCurrentLSN(int tabSetId)72 unsigned long long CegoLogManager::getCurrentLSN(int tabSetId)
73 {
74 return _lsn[tabSetId];
75 }
76
nextLSN(int tabSetId)77 unsigned long long CegoLogManager::nextLSN(int tabSetId)
78 {
79 // we just increase, if log is active
80 if ( _logActive[tabSetId] == false )
81 return 0;
82
83 _lsn[tabSetId]++;
84 return _lsn[tabSetId];
85 }
86
getArchiveLogName(const Chain & tableSet,unsigned long long lsn)87 Chain CegoLogManager::getArchiveLogName(const Chain& tableSet, unsigned long long lsn)
88 {
89 Chain lsnStr = Chain("000000000000") + Chain(lsn);
90 Chain lsnFix = lsnStr.subChain(lsnStr.length() - 12, lsnStr.length());
91 return tableSet + "-" + lsnFix + ".dbf";
92 }
93
setLogFile(int tabSetId,const Chain & logFile,bool readOnly)94 void CegoLogManager::setLogFile(int tabSetId, const Chain& logFile, bool readOnly)
95 {
96 if (_pLog[tabSetId])
97 {
98 _pLog[tabSetId]->close();
99 delete _pLog[tabSetId];
100 }
101
102 _logFile[tabSetId] = logFile;
103
104 _pLog[tabSetId] = new File(logFile);
105
106 if ( readOnly )
107 _pLog[tabSetId]->open(File::READ);
108 else
109 _pLog[tabSetId]->open(File::READWRITE);
110
111 _logSize[tabSetId] = _pLog[tabSetId]->Size();
112
113 _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
114
115 _logPos[tabSetId] = sizeof(int);
116 _logActive[tabSetId] = false;
117 }
118
allocateLogConnection(int tabSetId,const Chain & tableSet,const Chain & logHost,int logPort,const Chain & logUser,const Chain & logPwd)119 void CegoLogManager::allocateLogConnection(int tabSetId, const Chain& tableSet, const Chain& logHost, int logPort, const Chain& logUser, const Chain& logPwd)
120 {
121 Net n ( NETMNG_MSG_BUFLEN, NETMNG_SIZEBUFLEN, NETMNG_MAXSENDLEN );
122
123 log(_modId, Logger::NOTICE, Chain("Connecting to loghost ") + logHost + Chain(":") + Chain(logPort) + Chain(" ..."));
124
125 try
126 {
127 _pNetHandler[tabSetId] = n.connect(logHost, logPort);
128 }
129 catch ( Exception e )
130 {
131 Chain msg = Chain("Cannot connect to loghost ") + logHost + Chain(":") + Chain(logPort);
132 log(_modId, Logger::LOGERR, msg);
133 throw Exception(EXLOC, msg);
134 }
135 _pLogHandler[tabSetId] = new CegoLogHandler(this, _pNetHandler[tabSetId]);
136 _pLogHandler[tabSetId]->requestLogSession(tableSet, logUser, logPwd);
137 }
138
hasLogConnection(int tabSetId)139 bool CegoLogManager::hasLogConnection(int tabSetId)
140 {
141 if ( _pLogHandler[tabSetId] )
142 {
143 return true;
144 }
145 return false;
146 }
147
releaseLogConnection(int tabSetId)148 void CegoLogManager::releaseLogConnection(int tabSetId)
149 {
150 if ( _pLogHandler[tabSetId] )
151 {
152 try
153 {
154 _pLogHandler[tabSetId]->closeSession();
155 }
156 catch ( Exception e )
157 {
158 log(_modId, Logger::LOGERR, Chain("Cannot close session to loghost, ignoring"));
159 }
160 delete _pLogHandler[tabSetId];
161 _pLogHandler[tabSetId] = 0;
162
163 delete _pNetHandler[tabSetId];
164 }
165 }
166
initLog(int tabSetId,int size)167 void CegoLogManager::initLog(int tabSetId, int size)
168 {
169 if (_pLog[tabSetId] == 0)
170 {
171 Chain msg = "No logfile set up for tableset";
172 throw Exception(EXLOC, msg);
173 }
174
175 _pLog[tabSetId]->seek(0);
176
177 char wBuf[LOGMNG_WBUFSIZE];
178
179 _logOffset[tabSetId] = sizeof(int);
180
181 _pLog[tabSetId]->writeByte((char*)&_logOffset[tabSetId], sizeof(int));
182
183 int wBytes = sizeof(int);
184 while (wBytes < size )
185 {
186 int n = wBytes + LOGMNG_WBUFSIZE > size ? size - wBytes : LOGMNG_WBUFSIZE;
187 _pLog[tabSetId]->writeByte(wBuf, n);
188 wBytes += n;
189 }
190
191 _pLog[tabSetId]->close();
192 delete _pLog[tabSetId];
193 _pLog[tabSetId]=0;
194 }
195
getLogOffset(int tabSetId)196 int CegoLogManager::getLogOffset(int tabSetId)
197 {
198 return _logOffset[tabSetId];
199 }
200
resetLog(int tabSetId)201 void CegoLogManager::resetLog(int tabSetId)
202 {
203 _logOffset[tabSetId] = sizeof(int);
204 _pLog[tabSetId]->seek(0);
205 _pLog[tabSetId]->writeByte((char*)&_logOffset[tabSetId], sizeof(int));
206 if ( __fsyncOn )
207 _pLog[tabSetId]->flush();
208 }
209
stopLog(int tabSetId)210 void CegoLogManager::stopLog(int tabSetId)
211 {
212 if (_pLog[tabSetId])
213 {
214 _pLog[tabSetId]->close();
215 delete _pLog[tabSetId];
216 _pLog[tabSetId] = 0;
217 }
218
219 _logActive[tabSetId] = false;
220 }
221
startLog(int tabSetId)222 void CegoLogManager::startLog(int tabSetId)
223 {
224 if ( _pLog[tabSetId] != 0 )
225 {
226 _pLog[tabSetId]->seek(0);
227 _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
228 _pLog[tabSetId]->seek(_logOffset[tabSetId]);
229 }
230 _logActive[tabSetId] = true;
231 }
232
233
logAction(int tabSetId,CegoLogRecord & logRec,bool flushLog)234 CegoLogManager::LogResult CegoLogManager::logAction(int tabSetId, CegoLogRecord& logRec, bool flushLog)
235 {
236
237 if ( _logActive[tabSetId] == false )
238 return LOG_SUCCESS;
239
240 // cout << "Logging " << _lsn[tabSetId] << " " << logRec << endl;
241
242 if ( logRec.getLSN() == 0 )
243 {
244 logRec.setLSN(nextLSN(tabSetId));
245 }
246
247 Datetime ts;
248 logRec.setTS( ts.asLong());
249
250 int len = logRec.getEncodingLength();
251
252 // cout << "Logging record of len " << len << endl;
253
254 if ( _logBuf[tabSetId] == 0 )
255 {
256 _logBuf[tabSetId] = (char*)malloc(len);
257 _logBufLen[tabSetId] = len;
258 }
259 else
260 {
261 if ( _logBufLen[tabSetId] < len )
262 {
263 // reallocate buffer
264 // cout << "Reallocating buffer to size " << len << endl;
265 free ( _logBuf[tabSetId] );
266 _logBuf[tabSetId] = (char*)malloc(len);
267 _logBufLen[tabSetId] = len;
268 }
269 }
270
271 logRec.encode(_logBuf[tabSetId]);
272
273 bool isFull = false;
274
275 if ( _pLogHandler[tabSetId] )
276 {
277 #ifdef CGDEBUG
278 log(_modId, Logger::DEBUG, Chain("Sending log msg ( lsn=") + Chain(logRec.getLSN()) + Chain(", size=") + Chain(len) + Chain(")"));
279 #endif
280 if ( _pLogHandler[tabSetId]->sendLogEntry(_logBuf[tabSetId], len) == false )
281 {
282 return LOG_ERROR;
283 }
284 }
285 else
286 {
287 #ifdef CGDEBUG
288 log(_modId, Logger::DEBUG, Chain("Logging local ( lsn=") + Chain(logRec.getLSN()) + Chain(", size=") + Chain(len) + Chain(")"));
289 #endif
290
291 if ( _logOffset[tabSetId] + len > _logSize[tabSetId] )
292 {
293 // is log is full, we anyway write the log entry to complete log for checkpoint
294 isFull=true;
295 }
296
297 _pLog[tabSetId]->writeByte((char*)&len, sizeof(int));
298 _pLog[tabSetId]->writeByte(_logBuf[tabSetId], len);
299 _logOffset[tabSetId] += len + sizeof(int);
300
301 _pLog[tabSetId]->seek(0);
302 _pLog[tabSetId]->writeByte((char*)&_logOffset[tabSetId], sizeof(int));
303 _pLog[tabSetId]->seek(_logOffset[tabSetId]);
304
305 if ( flushLog )
306 {
307 if ( __fsyncOn )
308 _pLog[tabSetId]->flush();
309 }
310 }
311
312 if ( isFull )
313 return LOG_FULL;
314 else
315 return LOG_SUCCESS;
316 }
317
getMinLSN(int tabSetId)318 unsigned long long CegoLogManager::getMinLSN(int tabSetId)
319 {
320 if (_pLog[tabSetId] == 0)
321 {
322 Chain msg = "No logfile set up for tableset";
323 throw Exception(EXLOC, msg);
324 }
325
326 _pLog[tabSetId]->seek(0);
327 _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
328
329 _logPos[tabSetId] = sizeof(int);
330
331 unsigned long long minlsn=0;
332
333 if ( _logPos[tabSetId] < _logOffset[tabSetId] )
334 {
335 int len;
336 _pLog[tabSetId]->readByte((char*)&len, sizeof(int));
337
338 if ( len > LOGMNG_RECBUFSIZE )
339 {
340 Chain msg = "Log read buffer exceeded";
341 throw Exception(EXLOC, msg);
342 }
343 char buf[LOGMNG_RECBUFSIZE];
344
345 _pLog[tabSetId]->readByte(buf, len);
346
347 CegoLogRecord lr;
348 lr.decode(buf);
349
350 minlsn = lr.getLSN();
351
352 _logPos[tabSetId] += len + sizeof(int);
353
354 }
355 return minlsn;
356 }
357
getMaxLSN(int tabSetId)358 unsigned long long CegoLogManager::getMaxLSN(int tabSetId)
359 {
360 if (_pLog[tabSetId] == 0)
361 {
362 Chain msg = "No logfile set up for tableset";
363 throw Exception(EXLOC, msg);
364 }
365
366 _pLog[tabSetId]->seek(0);
367 _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
368
369 #ifdef CGDEBUG
370 log(_modId, Logger::DEBUG, Chain("Read logoffset ") + Chain(_logOffset[tabSetId]));
371 #endif
372
373 _logPos[tabSetId] = sizeof(int);
374
375 unsigned long long maxlsn=0;
376
377 while (_logPos[tabSetId] < _logOffset[tabSetId])
378 {
379 int len;
380 _pLog[tabSetId]->readByte((char*)&len, sizeof(int));
381
382 if ( len > LOGMNG_RECBUFSIZE )
383 {
384 Chain msg = "Log read buffer exceeded";
385 throw Exception(EXLOC, msg);
386 }
387 char buf[LOGMNG_RECBUFSIZE];
388
389 _pLog[tabSetId]->readByte(buf, len);
390
391 CegoLogRecord lr;
392 lr.decode(buf);
393
394 if ( lr.getLSN() > maxlsn )
395 maxlsn = lr.getLSN();
396
397 _logPos[tabSetId] += len + sizeof(int);
398
399 }
400 return maxlsn;
401 }
402
seekToStart(int tabSetId)403 void CegoLogManager::seekToStart(int tabSetId)
404 {
405 if (_pLog[tabSetId] == 0)
406 {
407 Chain msg = "No logfile set up for tableset";
408 throw Exception(EXLOC, msg);
409 }
410
411 _pLog[tabSetId]->seek(0);
412 _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
413 _logPos[tabSetId] = sizeof(int);
414 }
415
logSeek(int tabSetId,unsigned long long lsn)416 bool CegoLogManager::logSeek(int tabSetId, unsigned long long lsn)
417 {
418 if (_pLog[tabSetId] == 0)
419 {
420 Chain msg = "No logfile set up for tableset";
421 throw Exception(EXLOC, msg);
422 }
423
424 _pLog[tabSetId]->seek(0);
425 _pLog[tabSetId]->readByte((char*)&_logOffset[tabSetId], sizeof(int));
426
427 _logPos[tabSetId] = sizeof(int);
428
429 unsigned long long loglsn=0;
430
431 while (_logPos[tabSetId] < _logOffset[tabSetId])
432 {
433 int len;
434 _pLog[tabSetId]->readByte((char*)&len, sizeof(int));
435
436 if ( len > LOGMNG_RECBUFSIZE )
437 {
438 Chain msg = "Log read buffer exceeded";
439 throw Exception(EXLOC, msg);
440 }
441 char buf[LOGMNG_RECBUFSIZE];
442
443 _pLog[tabSetId]->readByte(buf, len);
444
445 CegoLogRecord lr;
446 lr.decode(buf);
447
448 loglsn = lr.getLSN();
449
450 if ( loglsn == lsn )
451 {
452 _pLog[tabSetId]->seek(_logPos[tabSetId]);
453 return true;
454 }
455 else if ( loglsn > lsn )
456 {
457
458 Chain msg = "LSN=" + Chain(lsn) + " too small, not found in log";
459 throw Exception(EXLOC, msg);
460 }
461
462 _logPos[tabSetId] += len + sizeof(int);
463 }
464
465 if ( loglsn + 1 == lsn )
466 {
467 return false;
468 }
469 else
470 {
471 Chain msg = "LSN=" + Chain(lsn) + " too high, log delta is missing";
472 throw Exception(EXLOC, msg);
473 }
474 }
475
logRead(int tabSetId,CegoLogRecord & logRec)476 bool CegoLogManager::logRead(int tabSetId, CegoLogRecord& logRec)
477 {
478 if (_logPos[tabSetId] < _logOffset[tabSetId])
479 {
480 int len;
481 _pLog[tabSetId]->readByte((char*)&len, sizeof(int));
482
483 if ( len > LOGMNG_RECBUFSIZE )
484 {
485 Chain msg = "Log read buffer exceeded";
486 throw Exception(EXLOC, msg);
487 }
488
489 char buf[LOGMNG_RECBUFSIZE];
490
491 _pLog[tabSetId]->readByte(buf, len);
492
493 logRec.decode(buf);
494
495 _logPos[tabSetId] += len + sizeof(int);
496
497 return true;
498 }
499 return false;
500 }
501
switchLogFile(int tabSetId)502 bool CegoLogManager::switchLogFile(int tabSetId)
503 {
504 if ( _logActive[tabSetId] == false )
505 return true;
506
507 if ( _pLogHandler[tabSetId] == 0 )
508 {
509 Chain tableSet = getTabSetName(tabSetId);
510
511 ListT<Chain> lfList;
512 ListT<int> sizeList;
513 ListT<Chain> statusList;
514
515 getLogFileInfo(tableSet, lfList, sizeList, statusList);
516
517 Chain *pLogFile = lfList.First();
518 Chain *pStatus = statusList.First();
519
520 bool found = false;
521 while ( pLogFile && pStatus && found == false)
522 {
523 if ( *pStatus == Chain(XML_ACTIVE_VALUE))
524 {
525 Chain *pNextLogFile = lfList.Next();
526 Chain *pNextLogStatus = statusList.Next();
527 if ( pNextLogFile == 0)
528 {
529 pNextLogFile = lfList.First();
530 pNextLogStatus = statusList.First();
531 }
532
533
534 if ( isArchiveMode(tabSetId) )
535 {
536 // if next logfile is still occupied, we skip switch and return false
537 if ( *pNextLogStatus == Chain(XML_OCCUPIED_VALUE))
538 {
539 return false;
540 }
541
542 // first we have to switch log file to synchronize with CegoLogThreadPool::shiftRedoLogs()
543 setLogFile(tabSetId, *pNextLogFile, false);
544 setLogFileStatus(tableSet, *pLogFile, XML_OCCUPIED_VALUE);
545 }
546 else
547 {
548 // switch log file first also for non archiving mode
549 setLogFile(tabSetId, *pNextLogFile, false);
550 setLogFileStatus(tableSet, *pLogFile, XML_FREE_VALUE);
551 }
552
553 setLogFileStatus(tableSet, *pNextLogFile, XML_ACTIVE_VALUE);
554
555 log(_modId, Logger::NOTICE, Chain("Logfile switch to logfile ") + *pNextLogFile + Chain(" for tableSet ") + tableSet);
556
557 found = true;
558 }
559 else
560 {
561 pLogFile = lfList.Next();
562 pStatus = statusList.Next();
563 }
564 }
565
566 doc2Xml();
567
568 resetLog(tabSetId);
569 startLog(tabSetId);
570 }
571
572 CegoLogRecord lr;
573 lr.setAction(CegoLogRecord::LOGREC_SYNC);
574 logAction(tabSetId, lr);
575
576 return true;
577 }
578
setActiveLogFile(const Chain & tableSet)579 void CegoLogManager::setActiveLogFile(const Chain& tableSet)
580 {
581 int tabSetId = getTabSetId(tableSet);
582
583 ListT<Chain> lfList;
584 ListT<int> sizeList;
585 ListT<Chain> statusList;
586
587 getLogFileInfo(tableSet, lfList, sizeList, statusList);
588
589 Chain *pLogFile = lfList.First();
590 Chain *pStatus = statusList.First();
591
592 while ( pLogFile && pStatus )
593 {
594 if ( *pStatus == Chain(XML_ACTIVE_VALUE))
595 {
596 log(_modId, Logger::NOTICE, Chain("Setting active logfile to ") + Chain(*pLogFile) + Chain(" ..."));
597 setLogFile(tabSetId, *pLogFile, false);
598 return;
599 }
600 else
601 {
602 pLogFile = lfList.Next();
603 pStatus = statusList.Next();
604 }
605 }
606 }
607