1 /*
2 ** 2011-08-13
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 **
13 ** This file contains the implementation of LSM database logging. Logging
14 ** has one purpose in LSM - to make transactions durable.
15 **
16 ** When data is written to an LSM database, it is initially stored in an
17 ** in-memory tree structure. Since this structure is in volatile memory,
18 ** if a power failure or application crash occurs it may be lost. To
19 ** prevent loss of data in this case, each time a record is written to the
20 ** in-memory tree an equivalent record is appended to the log on disk.
21 ** If a power failure or application crash does occur, data can be recovered
22 ** by reading the log.
23 **
24 ** A log file consists of the following types of records representing data
25 ** written into the database:
26 **
27 **   LOG_WRITE:  A key-value pair written to the database.
28 **   LOG_DELETE: A delete key issued to the database.
29 **   LOG_COMMIT: A transaction commit.
30 **
31 ** And the following types of records for ancillary purposes..
32 **
33 **   LOG_EOF:    A record indicating the end of a log file.
34 **   LOG_PAD1:   A single byte padding record.
35 **   LOG_PAD2:   An N byte padding record (N>1).
36 **   LOG_JUMP:   A pointer to another offset within the log file.
37 **
38 ** Each transaction written to the log contains one or more LOG_WRITE and/or
39 ** LOG_DELETE records, followed by a LOG_COMMIT record. The LOG_COMMIT record
40 ** contains an 8-byte checksum based on all previous data written to the
41 ** log file.
42 **
43 ** LOG CHECKSUMS & RECOVERY
44 **
45 **   Checksums are found in two types of log records: LOG_COMMIT and
46 **   LOG_CKSUM records. In order to recover content from a log, a client
47 **   reads each record from the start of the log, calculating a checksum as
48 **   it does. Each time a LOG_COMMIT or LOG_CKSUM is encountered, the
49 **   recovery process verifies that the checksum stored in the log
50 **   matches the calculated checksum. If it does not, the recovery process
51 **   can stop reading the log.
52 **
53 **   If a recovery process reads records (other than COMMIT or CKSUM)
54 **   consisting of at least LSM_CKSUM_MAXDATA bytes, then the next record in
55 **   the log must be either a LOG_CKSUM or LOG_COMMIT record. If it is
56 **   not, the recovery process also stops reading the log.
57 **
58 **   To recover the log file, it must be read twice. The first time to
59 **   determine the location of the last valid commit record. And the second
60 **   time to load data into the in-memory tree.
61 **
62 **   Todo: Surely there is a better way...
63 **
64 ** LOG WRAPPING
65 **
66 **   If the log file were never deleted or wrapped, it would be possible to
**   read it from start to end each time recovery is required (i.e. each time
68 **   the number of database clients changes from 0 to 1). Effectively reading
69 **   the entire history of the database each time. This would quickly become
70 **   inefficient. Additionally, since the log file would grow without bound,
71 **   it wastes storage space.
72 **
73 **   Instead, part of each checkpoint written into the database file contains
**   a log offset (and other information required to read the log starting
**   at this offset) at which to begin recovery. Offset $O.
76 **
77 **   Once a checkpoint has been written and synced into the database file, it
78 **   is guaranteed that no recovery process will need to read any data before
79 **   offset $O of the log file. It is therefore safe to begin overwriting
80 **   any data that occurs before offset $O.
81 **
82 **   This implementation separates the log into three regions mapped into
83 **   the log file - regions 0, 1 and 2. During recovery, regions are read
84 **   in ascending order (i.e. 0, then 1, then 2). Each region is zero or
85 **   more bytes in size.
86 **
87 **     |---1---|..|--0--|.|--2--|....
88 **
89 **   New records are always appended to the end of region 2.
90 **
91 **   Initially (when it is empty), all three regions are zero bytes in size.
**   Each of them is located at the beginning of the file. As records are
93 **   added to the log, region 2 grows, so that the log consists of a zero
94 **   byte region 1, followed by a zero byte region 0, followed by an N byte
95 **   region 2. After one or more checkpoints have been written to disk,
96 **   the start point of region 2 is moved to $O. For example:
97 **
98 **     A) ||.........|--2--|....
99 **
100 **   (both regions 0 and 1 are 0 bytes in size at offset 0).
101 **
102 **   Eventually, the log wraps around to write new records into the start.
103 **   At this point, region 2 is renamed to region 0. Region 0 is renamed
104 **   to region 2. After appending a few records to the new region 2, the
105 **   log file looks like this:
106 **
107 **     B) ||--2--|...|--0--|....
108 **
109 **   (region 1 is still 0 bytes in size, located at offset 0).
110 **
111 **   Any checkpoints made at this point may reduce the size of region 0.
112 **   However, if they do not, and region 2 expands so that it is about to
113 **   overwrite the start of region 0, then region 2 is renamed to region 1,
114 **   and a new region 2 created at the end of the file following the existing
115 **   region 0.
116 **
117 **     C) |---1---|..|--0--|.|-2-|
118 **
119 **   In this state records are appended to region 2 until checkpoints have
**   contracted regions 0 and 1 until they are both zero bytes in size. They
121 **   are then shifted to the start of the log file, leaving the system in
122 **   the equivalent of state A above.
123 **
124 **   Alternatively, state B may transition directly to state A if the size
125 **   of region 0 is reduced to zero bytes before region 2 threatens to
126 **   encroach upon it.
127 **
128 ** LOG_PAD1 & LOG_PAD2 RECORDS
129 **
130 **   PAD1 and PAD2 records may appear in a log file at any point. They allow
**   a process writing the log file to align the beginning of transactions with
132 **   the beginning of disk sectors, which increases robustness.
133 **
134 ** RECORD FORMATS:
135 **
136 **   LOG_EOF:    * A single 0x00 byte.
137 **
138 **   LOG_PAD1:   * A single 0x01 byte.
139 **
140 **   LOG_PAD2:   * A single 0x02 byte, followed by
141 **               * The number of unused bytes (N) as a varint,
142 **               * An N byte block of unused space.
143 **
144 **   LOG_COMMIT: * A single 0x03 byte.
145 **               * An 8-byte checksum.
146 **
147 **   LOG_JUMP:   * A single 0x04 byte.
148 **               * Absolute file offset to jump to, encoded as a varint.
149 **
150 **   LOG_WRITE:  * A single 0x06 or 0x07 byte,
151 **               * The number of bytes in the key, encoded as a varint,
152 **               * The number of bytes in the value, encoded as a varint,
153 **               * If the first byte was 0x07, an 8 byte checksum.
154 **               * The key data,
155 **               * The value data.
156 **
157 **   LOG_DELETE: * A single 0x08 or 0x09 byte,
158 **               * The number of bytes in the key, encoded as a varint,
159 **               * If the first byte was 0x09, an 8 byte checksum.
160 **               * The key data.
161 **
162 **   Varints are as described in lsm_varint.c (SQLite 4 format).
163 **
164 ** CHECKSUMS:
165 **
166 **   The checksum is calculated using two 32-bit unsigned integers, s0 and
167 **   s1. The initial value for both is 42. It is updated each time a record
168 **   is written into the log file by treating the encoded (binary) record as
169 **   an array of 32-bit little-endian integers. Then, if x[] is the integer
170 **   array, updating the checksum accumulators as follows:
171 **
172 **     for i from 0 to n-1 step 2:
173 **       s0 += x[i] + s1;
174 **       s1 += x[i+1] + s0;
175 **     endfor
176 **
177 **   If the record is not an even multiple of 8-bytes in size it is padded
178 **   with zeroes to make it so before the checksum is updated.
179 **
180 **   The checksum stored in a COMMIT, WRITE or DELETE is based on all bytes
181 **   up to the start of the 8-byte checksum itself, including the COMMIT,
182 **   WRITE or DELETE fields that appear before the checksum in the record.
183 **
184 ** VARINT FORMAT
185 **
186 ** See lsm_varint.c.
187 */
188 
189 #ifndef _LSM_INT_H
190 # include "lsmInt.h"
191 #endif
192 
/*
** Log record types. Each value is the first byte of the corresponding
** serialized record (see RECORD FORMATS in the header comment of this file).
**
** For WRITE, DELETE and DRANGE records the "_CKSUM" variant is the base
** value with the least-significant bit set. lsmLogWrite() depends on this:
** it ORs a 0/1 "embed checksum" flag directly into the type byte.
*/
#define LSM_LOG_EOF          0x00
#define LSM_LOG_PAD1         0x01
#define LSM_LOG_PAD2         0x02
#define LSM_LOG_COMMIT       0x03
#define LSM_LOG_JUMP         0x04

#define LSM_LOG_WRITE        0x06
#define LSM_LOG_WRITE_CKSUM  0x07

#define LSM_LOG_DELETE       0x08
#define LSM_LOG_DELETE_CKSUM 0x09

/* DRANGE records carry both a key and a value. lsmLogWrite() serializes
** them with the same field layout it uses for WRITE records. */
#define LSM_LOG_DRANGE       0x0A
#define LSM_LOG_DRANGE_CKSUM 0x0B

/* Require a checksum every 32KB. lsmLogWrite() embeds a checksum in a record
** whenever the buffered data plus that record would exceed this size. */
#define LSM_CKSUM_MAXDATA (32*1024)

/* Do not wrap a log file smaller than this in bytes. lsmLogBegin() only
** wraps to the start of the file when region 2 begins at or beyond this
** offset. */
#define LSM_MIN_LOGWRAP      (128*1024)
214 
/*
** State used to append records to the log file during a write transaction.
** The object is created by lsmLogBegin() and stored in lsm_db.pLogWriter;
** later calls to lsmLogBegin() reset and reuse it.
**
** szSector:
**   Commit records must be aligned to end on szSector boundaries. If
**   the safety-mode is set to NORMAL or OFF, this value is 1. Otherwise,
**   if the safety-mode is set to FULL, it is the size of the file-system
**   sectors as reported by lsmFsSectorSize().
**
** buf:
**   Must remain the final member of the structure. When lsmLogBegin()
**   recycles an existing LogWriter it zeroes every byte that precedes buf
**   and then sets buf.n to 0, leaving the rest of buf untouched.
*/
struct LogWriter {
  u32 cksum0;                     /* Checksum 0 at offset iOff */
  u32 cksum1;                     /* Checksum 1 at offset iOff */
  int iCksumBuf;                  /* Bytes of buf that have been checksummed */
  i64 iOff;                       /* Offset at start of buffer buf */
  int szSector;                   /* Sector size for this transaction */
  LogRegion jump;                 /* Avoid writing to this region */
  i64 iRegion1End;                /* End of first region written by trans */
  i64 iRegion2Start;              /* Start of second regions written by trans */
  LsmString buf;                  /* Buffer containing data not yet written */
};
233 
234 /*
235 ** Return the result of interpreting the first 4 bytes in buffer aIn as
236 ** a 32-bit unsigned little-endian integer.
237 */
getU32le(u8 * aIn)238 static u32 getU32le(u8 *aIn){
239   return ((u32)aIn[3] << 24)
240        + ((u32)aIn[2] << 16)
241        + ((u32)aIn[1] << 8)
242        + ((u32)aIn[0]);
243 }
244 
245 
246 /*
247 ** This function is the same as logCksum(), except that pointer "a" need
248 ** not be aligned to an 8-byte boundary or padded with zero bytes. This
249 ** version is slower, but sometimes more convenient to use.
250 */
logCksumUnaligned(char * z,int n,u32 * pCksum0,u32 * pCksum1)251 static void logCksumUnaligned(
252   char *z,                        /* Input buffer */
253   int n,                          /* Size of input buffer in bytes */
254   u32 *pCksum0,                   /* IN/OUT: Checksum value 1 */
255   u32 *pCksum1                    /* IN/OUT: Checksum value 2 */
256 ){
257   u8 *a = (u8 *)z;
258   u32 cksum0 = *pCksum0;
259   u32 cksum1 = *pCksum1;
260   int nIn = (n/8) * 8;
261   int i;
262 
263   assert( n>0 );
264   for(i=0; i<nIn; i+=8){
265     cksum0 += getU32le(&a[i]) + cksum1;
266     cksum1 += getU32le(&a[i+4]) + cksum0;
267   }
268 
269   if( nIn!=n ){
270     u8 aBuf[8] = {0, 0, 0, 0, 0, 0, 0, 0};
271     assert( (n-nIn)<8 && n>nIn );
272     memcpy(aBuf, &a[nIn], n-nIn);
273     cksum0 += getU32le(aBuf) + cksum1;
274     cksum1 += getU32le(&aBuf[4]) + cksum0;
275   }
276 
277   *pCksum0 = cksum0;
278   *pCksum1 = cksum1;
279 }
280 
281 /*
282 ** Update pLog->cksum0 and pLog->cksum1 so that the first nBuf bytes in the
283 ** write buffer (pLog->buf) are included in the checksum.
284 */
logUpdateCksum(LogWriter * pLog,int nBuf)285 static void logUpdateCksum(LogWriter *pLog, int nBuf){
286   assert( (pLog->iCksumBuf % 8)==0 );
287   assert( pLog->iCksumBuf<=nBuf );
288   assert( (nBuf % 8)==0 || nBuf==pLog->buf.n );
289   if( nBuf>pLog->iCksumBuf ){
290     logCksumUnaligned(
291         &pLog->buf.z[pLog->iCksumBuf], nBuf-pLog->iCksumBuf,
292         &pLog->cksum0, &pLog->cksum1
293     );
294   }
295   pLog->iCksumBuf = nBuf;
296 }
297 
/*
** Return the offset of the first byte of the pLog->szSector sized sector
** that contains file offset iOff.
*/
static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){
  i64 szSector = pLog->szSector;
  return iOff - (iOff % szSector);
}
/*
** Return the offset of the last byte of the pLog->szSector sized sector
** that contains file offset iOff.
*/
static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){
  i64 iFirst = firstByteOnSector(pLog, iOff);
  return iFirst + (pLog->szSector - 1);
}
304 
305 /*
306 ** If possible, reclaim log file space. Log file space is reclaimed after
307 ** a snapshot that points to the same data in the database file is synced
308 ** into the db header.
309 */
logReclaimSpace(lsm_db * pDb)310 static int logReclaimSpace(lsm_db *pDb){
311   int rc;
312   int iMeta;
313   int bRotrans;                   /* True if there exists some ro-trans */
314 
315   /* Test if there exists some other connection with a read-only transaction
316   ** open. If there does, then log file space may not be reclaimed.  */
317   rc = lsmDetectRoTrans(pDb, &bRotrans);
318   if( rc!=LSM_OK || bRotrans ) return rc;
319 
320   iMeta = (int)pDb->pShmhdr->iMetaPage;
321   if( iMeta==1 || iMeta==2 ){
322     DbLog *pLog = &pDb->treehdr.log;
323     i64 iSyncedId;
324 
325     /* Read the snapshot-id of the snapshot stored on meta-page iMeta. Note
326     ** that in theory, the value read is untrustworthy (due to a race
327     ** condition - see comments above lsmFsReadSyncedId()). So it is only
328     ** ever used to conclude that no log space can be reclaimed. If it seems
329     ** to indicate that it may be possible to reclaim log space, a
330     ** second call to lsmCheckpointSynced() (which does return trustworthy
331     ** values) is made below to confirm.  */
332     rc = lsmFsReadSyncedId(pDb, iMeta, &iSyncedId);
333 
334     if( rc==LSM_OK && pLog->iSnapshotId!=iSyncedId ){
335       i64 iSnapshotId = 0;
336       i64 iOff = 0;
337       rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff, 0);
338       if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){
339         int iRegion;
340         for(iRegion=0; iRegion<3; iRegion++){
341           LogRegion *p = &pLog->aRegion[iRegion];
342           if( iOff>=p->iStart && iOff<=p->iEnd ) break;
343           p->iStart = 0;
344           p->iEnd = 0;
345         }
346         assert( iRegion<3 );
347         pLog->aRegion[iRegion].iStart = iOff;
348         pLog->iSnapshotId = iSnapshotId;
349       }
350     }
351   }
352   return rc;
353 }
354 
355 /*
356 ** This function is called when a write-transaction is first opened. It
357 ** is assumed that the caller is holding the client-mutex when it is
358 ** called.
359 **
360 ** Before returning, this function allocates the LogWriter object that
361 ** will be used to write to the log file during the write transaction.
362 ** LSM_OK is returned if no error occurs, otherwise an LSM error code.
363 */
lsmLogBegin(lsm_db * pDb)364 int lsmLogBegin(lsm_db *pDb){
365   int rc = LSM_OK;
366   LogWriter *pNew;
367   LogRegion *aReg;
368 
369   if( pDb->bUseLog==0 ) return LSM_OK;
370 
371   /* If the log file has not yet been opened, open it now. Also allocate
372   ** the LogWriter structure, if it has not already been allocated.  */
373   rc = lsmFsOpenLog(pDb, 0);
374   if( pDb->pLogWriter==0 ){
375     pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
376     if( pNew ){
377       lsmStringInit(&pNew->buf, pDb->pEnv);
378       rc = lsmStringExtend(&pNew->buf, 2);
379     }
380     pDb->pLogWriter = pNew;
381   }else{
382     pNew = pDb->pLogWriter;
383     assert( (u8 *)(&pNew[1])==(u8 *)(&((&pNew->buf)[1])) );
384     memset(pNew, 0, ((u8 *)&pNew->buf) - (u8 *)pNew);
385     pNew->buf.n = 0;
386   }
387 
388   if( rc==LSM_OK ){
389     /* The following call detects whether or not a new snapshot has been
390     ** synced into the database file. If so, it updates the contents of
391     ** the pDb->treehdr.log structure to reclaim any space in the log
392     ** file that is no longer required.
393     **
394     ** TODO: Calling this every transaction is overkill. And since the
395     ** call has to read and checksum a snapshot from the database file,
396     ** it is expensive. It would be better to figure out a way so that
397     ** this is only called occasionally - say for every 32KB written to
398     ** the log file.
399     */
400     rc = logReclaimSpace(pDb);
401   }
402   if( rc!=LSM_OK ){
403     lsmLogClose(pDb);
404     return rc;
405   }
406 
407   /* Set the effective sector-size for this transaction. Sectors are assumed
408   ** to be one byte in size if the safety-mode is OFF or NORMAL, or as
409   ** reported by lsmFsSectorSize if it is FULL.  */
410   if( pDb->eSafety==LSM_SAFETY_FULL ){
411     pNew->szSector = lsmFsSectorSize(pDb->pFS);
412     assert( pNew->szSector>0 );
413   }else{
414     pNew->szSector = 1;
415   }
416 
417   /* There are now three scenarios:
418   **
419   **   1) Regions 0 and 1 are both zero bytes in size and region 2 begins
420   **      at a file offset greater than LSM_MIN_LOGWRAP. In this case, wrap
421   **      around to the start and write data into the start of the log file.
422   **
423   **   2) Region 1 is zero bytes in size and region 2 occurs earlier in the
424   **      file than region 0. In this case, append data to region 2, but
425   **      remember to jump over region 1 if required.
426   **
427   **   3) Region 2 is the last in the file. Append to it.
428   */
429   aReg = &pDb->treehdr.log.aRegion[0];
430 
431   assert( aReg[0].iEnd==0 || aReg[0].iEnd>aReg[0].iStart );
432   assert( aReg[1].iEnd==0 || aReg[1].iEnd>aReg[1].iStart );
433 
434   pNew->cksum0 = pDb->treehdr.log.cksum0;
435   pNew->cksum1 = pDb->treehdr.log.cksum1;
436 
437   if( aReg[0].iEnd==0 && aReg[1].iEnd==0 && aReg[2].iStart>=LSM_MIN_LOGWRAP ){
438     /* Case 1. Wrap around to the start of the file. Write an LSM_LOG_JUMP
439     ** into the log file in this case. Pad it out to 8 bytes using a PAD2
440     ** record so that the checksums can be updated immediately.  */
441     u8 aJump[] = {
442       LSM_LOG_PAD2, 0x04, 0x00, 0x00, 0x00, 0x00, LSM_LOG_JUMP, 0x00
443     };
444 
445     lsmStringBinAppend(&pNew->buf, aJump, sizeof(aJump));
446     logUpdateCksum(pNew, pNew->buf.n);
447     rc = lsmFsWriteLog(pDb->pFS, aReg[2].iEnd, &pNew->buf);
448     pNew->iCksumBuf = pNew->buf.n = 0;
449 
450     aReg[2].iEnd += 8;
451     pNew->jump = aReg[0] = aReg[2];
452     aReg[2].iStart = aReg[2].iEnd = 0;
453   }else if( aReg[1].iEnd==0 && aReg[2].iEnd<aReg[0].iEnd ){
454     /* Case 2. */
455     pNew->iOff = aReg[2].iEnd;
456     pNew->jump = aReg[0];
457   }else{
458     /* Case 3. */
459     assert( aReg[2].iStart>=aReg[0].iEnd && aReg[2].iStart>=aReg[1].iEnd );
460     pNew->iOff = aReg[2].iEnd;
461   }
462 
463   if( pNew->jump.iStart ){
464     i64 iRound;
465     assert( pNew->jump.iStart>pNew->iOff );
466 
467     iRound = firstByteOnSector(pNew, pNew->jump.iStart);
468     if( iRound>pNew->iOff ) pNew->jump.iStart = iRound;
469     pNew->jump.iEnd = lastByteOnSector(pNew, pNew->jump.iEnd);
470   }
471 
472   assert( pDb->pLogWriter==pNew );
473   return rc;
474 }
475 
476 /*
477 ** This function is called when a write-transaction is being closed.
478 ** Parameter bCommit is true if the transaction is being committed,
479 ** or false otherwise. The caller must hold the client-mutex to call
480 ** this function.
481 **
482 ** A call to this function deletes the LogWriter object allocated by
483 ** lsmLogBegin(). If the transaction is being committed, the shared state
484 ** in *pLog is updated before returning.
485 */
lsmLogEnd(lsm_db * pDb,int bCommit)486 void lsmLogEnd(lsm_db *pDb, int bCommit){
487   DbLog *pLog;
488   LogWriter *p;
489   p = pDb->pLogWriter;
490 
491   if( p==0 ) return;
492   pLog = &pDb->treehdr.log;
493 
494   if( bCommit ){
495     pLog->aRegion[2].iEnd = p->iOff;
496     pLog->cksum0 = p->cksum0;
497     pLog->cksum1 = p->cksum1;
498     if( p->iRegion1End ){
499       /* This happens when the transaction had to jump over some other
500       ** part of the log.  */
501       assert( pLog->aRegion[1].iEnd==0 );
502       assert( pLog->aRegion[2].iStart<p->iRegion1End );
503       pLog->aRegion[1].iStart = pLog->aRegion[2].iStart;
504       pLog->aRegion[1].iEnd = p->iRegion1End;
505       pLog->aRegion[2].iStart = p->iRegion2Start;
506     }
507   }
508 }
509 
/*
** Determine if it is necessary to add an LSM_LOG_JUMP to jump over the
** jump region (pLog->jump) before a record of nReq bytes is written. This
** is necessary if the jump region lies ahead of the current write position
** and there is insufficient room before it to fit the new record plus the
** largest possible JUMP record with up to 7 bytes of padding (a total of
** 17 bytes).
**
** If a jump is required, the buffer is padded so that it is a multiple of
** 8 bytes once the JUMP record is appended, flushed to disk and folded into
** the running checksum. The writer is then repositioned to the first byte
** past the jump region and, if pbJump is not NULL, *pbJump is set to 1.
** If no jump is required, neither the writer nor *pbJump is modified.
**
** Returns LSM_OK on success, or the error code of the buffer append or log
** write that failed.
*/
static int jumpIfRequired(
  lsm_db *pDb,
  LogWriter *pLog,
  int nReq,
  int *pbJump
){
  i64 iCur = pLog->iOff + pLog->buf.n;  /* Offset the next byte is written to */
  i64 iTarget;                    /* Offset to jump to */
  u8 aJump[10];                   /* Encoded jump record */
  int nJump;                      /* Valid bytes in aJump[] */
  int nPad;                       /* Bytes of padding required */
  int rc;                         /* Return code */

  /* No jump is needed if the jump region is not ahead of the write position
  ** (or does not exist), or if there is still enough room before it.  */
  if( pLog->jump.iStart<=iCur ) return LSM_OK;
  if( pLog->jump.iStart>=(iCur + (nReq + 17)) ) return LSM_OK;

  /* Serialize the JUMP record */
  iTarget = pLog->jump.iEnd+1;
  aJump[0] = LSM_LOG_JUMP;
  nJump = 1 + lsmVarintPut64(&aJump[1], iTarget);

  /* Add padding to the contents of the buffer so that it will be a multiple
  ** of 8 bytes in size after the JUMP record is appended. This is not
  ** strictly required, it just makes keeping the running checksum up to
  ** date in this file a little simpler.  */
  nPad = (8 - ((pLog->buf.n + nJump) % 8)) % 8;
  if( nPad>0 ){
    u8 aPad[7] = {0,0,0,0,0,0,0};
    if( nPad==1 ){
      aPad[0] = LSM_LOG_PAD1;
    }else{
      aPad[0] = LSM_LOG_PAD2;
      aPad[1] = (u8)(nPad-2);
    }
    rc = lsmStringBinAppend(&pLog->buf, aPad, nPad);
    if( rc!=LSM_OK ) return rc;
  }

  /* Append the JUMP record to the buffer. Then flush the buffer to disk
  ** and update the checksums. The next write to the log file (assuming
  ** there is no transaction rollback) will be to offset iTarget (just past
  ** the jump region).  */
  rc = lsmStringBinAppend(&pLog->buf, aJump, nJump);
  if( rc!=LSM_OK ) return rc;
  assert( (pLog->buf.n % 8)==0 );
  rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
  if( rc!=LSM_OK ) return rc;
  logUpdateCksum(pLog, pLog->buf.n);

  /* Record where the first written region ended and where the second one
  ** begins, then restart with an empty buffer at the jump target.  */
  pLog->iRegion1End = (pLog->iOff + pLog->buf.n);
  pLog->iRegion2Start = iTarget;
  pLog->iOff = iTarget;
  pLog->buf.n = 0;
  pLog->iCksumBuf = 0;
  if( pbJump ) *pbJump = 1;

  return LSM_OK;
}
573 
/*
** Finish the record at the end of the write buffer by appending the 8-byte
** running checksum to it, then write the whole buffer to the log file.
**
** The checksum is updated to cover the current buffer contents, and its
** two 32-bit halves are stored immediately after them using lsmPutU32().
** This function does not grow the buffer: the caller must already have
** reserved 8 bytes of space beyond pLog->buf.n.
**
** After the write is attempted the writer's file offset is advanced past
** the buffer and the buffer is emptied, whether or not the write succeeded.
** Returns the result of lsmFsWriteLog().
*/
static int logCksumAndFlush(lsm_db *pDb){
  LogWriter *pLog = pDb->pLogWriter;
  u8 *aCksum;                     /* Where the checksum is stored in buf */
  int rc;                         /* Return code */

  /* Calculate the checksum value. Append it to the buffer. */
  logUpdateCksum(pLog, pLog->buf.n);
  aCksum = (u8 *)&pLog->buf.z[pLog->buf.n];
  lsmPutU32(&aCksum[0], pLog->cksum0);
  lsmPutU32(&aCksum[4], pLog->cksum1);
  pLog->buf.n += 8;

  /* Write the contents of the buffer to disk. */
  rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
  pLog->iOff += pLog->buf.n;
  pLog->buf.n = 0;
  pLog->iCksumBuf = 0;

  return rc;
}
592 
593 /*
594 ** Write the contents of the log-buffer to disk. Then write either a CKSUM
595 ** or COMMIT record, depending on the value of parameter eType.
596 */
logFlush(lsm_db * pDb,int eType)597 static int logFlush(lsm_db *pDb, int eType){
598   int rc;
599   int nReq;
600   LogWriter *pLog = pDb->pLogWriter;
601 
602   assert( eType==LSM_LOG_COMMIT );
603   assert( pLog );
604 
605   /* Commit record is always 9 bytes in size. */
606   nReq = 9;
607   if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ) nReq += pLog->szSector + 17;
608   rc = jumpIfRequired(pDb, pLog, nReq, 0);
609 
610   /* If this is a COMMIT, add padding to the log so that the COMMIT record
611   ** is aligned against the end of a disk sector. In other words, add padding
612   ** so that the first byte following the COMMIT record lies on a different
613   ** sector.  */
614   if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ){
615     int nPad;                     /* Bytes of padding to add */
616 
617     /* Determine the value of nPad. */
618     nPad = ((pLog->iOff + pLog->buf.n + 9) % pLog->szSector);
619     if( nPad ) nPad = pLog->szSector - nPad;
620     rc = lsmStringExtend(&pLog->buf, nPad);
621     if( rc!=LSM_OK ) return rc;
622 
623     while( nPad ){
624       if( nPad==1 ){
625         pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD1;
626         nPad = 0;
627       }else{
628         int n = LSM_MIN(200, nPad-2);
629         pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD2;
630         pLog->buf.z[pLog->buf.n++] = (char)n;
631         nPad -= 2;
632         memset(&pLog->buf.z[pLog->buf.n], 0x2B, n);
633         pLog->buf.n += n;
634         nPad -= n;
635       }
636     }
637   }
638 
639   /* Make sure there is room in the log-buffer to add the CKSUM or COMMIT
640   ** record. Then add the first byte of it.  */
641   rc = lsmStringExtend(&pLog->buf, 9);
642   if( rc!=LSM_OK ) return rc;
643   pLog->buf.z[pLog->buf.n++] = (char)eType;
644   memset(&pLog->buf.z[pLog->buf.n], 0, 8);
645 
646   rc = logCksumAndFlush(pDb);
647 
648   /* If this is a commit and synchronous=full, sync the log to disk. */
649   if( rc==LSM_OK && eType==LSM_LOG_COMMIT && pDb->eSafety==LSM_SAFETY_FULL ){
650     rc = lsmFsSyncLog(pDb->pFS);
651   }
652   return rc;
653 }
654 
655 /*
656 ** Append an LSM_LOG_WRITE (if nVal>=0) or LSM_LOG_DELETE (if nVal<0)
657 ** record to the database log.
658 */
lsmLogWrite(lsm_db * pDb,int eType,void * pKey,int nKey,void * pVal,int nVal)659 int lsmLogWrite(
660   lsm_db *pDb,                    /* Database handle */
661   int eType,
662   void *pKey, int nKey,           /* Database key to write to log */
663   void *pVal, int nVal            /* Database value (or nVal<0) to write */
664 ){
665   int rc = LSM_OK;
666   LogWriter *pLog;                /* Log object to write to */
667   int nReq;                       /* Bytes of space required in log */
668   int bCksum = 0;                 /* True to embed a checksum in this record */
669 
670   assert( eType==LSM_WRITE || eType==LSM_DELETE || eType==LSM_DRANGE );
671   assert( LSM_LOG_WRITE==LSM_WRITE );
672   assert( LSM_LOG_DELETE==LSM_DELETE );
673   assert( LSM_LOG_DRANGE==LSM_DRANGE );
674   assert( (eType==LSM_LOG_DELETE)==(nVal<0) );
675 
676   if( pDb->bUseLog==0 ) return LSM_OK;
677   pLog = pDb->pLogWriter;
678 
679   /* Determine how many bytes of space are required, assuming that a checksum
680   ** will be embedded in this record (even though it may not be).  */
681   nReq = 1 + lsmVarintLen32(nKey) + 8 + nKey;
682   if( eType!=LSM_LOG_DELETE ) nReq += lsmVarintLen32(nVal) + nVal;
683 
684   /* Jump over the jump region if required. Set bCksum to true to tell the
685   ** code below to include a checksum in the record if either (a) writing
686   ** this record would mean that more than LSM_CKSUM_MAXDATA bytes of data
687   ** have been written to the log since the last checksum, or (b) the jump
688   ** is taken.  */
689   rc = jumpIfRequired(pDb, pLog, nReq, &bCksum);
690   if( (pLog->buf.n+nReq) > LSM_CKSUM_MAXDATA ) bCksum = 1;
691 
692   if( rc==LSM_OK ){
693     rc = lsmStringExtend(&pLog->buf, nReq);
694   }
695   if( rc==LSM_OK ){
696     u8 *a = (u8 *)&pLog->buf.z[pLog->buf.n];
697 
698     /* Write the record header - the type byte followed by either 1 (for
699     ** DELETE) or 2 (for WRITE) varints.  */
700     assert( LSM_LOG_WRITE_CKSUM == (LSM_LOG_WRITE | 0x0001) );
701     assert( LSM_LOG_DELETE_CKSUM == (LSM_LOG_DELETE | 0x0001) );
702     assert( LSM_LOG_DRANGE_CKSUM == (LSM_LOG_DRANGE | 0x0001) );
703     *(a++) = (u8)eType | (u8)bCksum;
704     a += lsmVarintPut32(a, nKey);
705     if( eType!=LSM_LOG_DELETE ) a += lsmVarintPut32(a, nVal);
706 
707     if( bCksum ){
708       pLog->buf.n = (a - (u8 *)pLog->buf.z);
709       rc = logCksumAndFlush(pDb);
710       a = (u8 *)&pLog->buf.z[pLog->buf.n];
711     }
712 
713     memcpy(a, pKey, nKey);
714     a += nKey;
715     if( eType!=LSM_LOG_DELETE ){
716       memcpy(a, pVal, nVal);
717       a += nVal;
718     }
719     pLog->buf.n = a - (u8 *)pLog->buf.z;
720     assert( pLog->buf.n<=pLog->buf.nAlloc );
721   }
722 
723   return rc;
724 }
725 
726 /*
727 ** Append an LSM_LOG_COMMIT record to the database log.
728 */
lsmLogCommit(lsm_db * pDb)729 int lsmLogCommit(lsm_db *pDb){
730   if( pDb->bUseLog==0 ) return LSM_OK;
731   return logFlush(pDb, LSM_LOG_COMMIT);
732 }
733 
734 /*
735 ** Store the current offset and other checksum related information in the
736 ** structure *pMark. Later, *pMark can be passed to lsmLogSeek() to "rewind"
737 ** the LogWriter object to the current log file offset. This is used when
738 ** rolling back savepoint transactions.
739 */
lsmLogTell(lsm_db * pDb,LogMark * pMark)740 void lsmLogTell(
741   lsm_db *pDb,                    /* Database handle */
742   LogMark *pMark                  /* Populate this object with current offset */
743 ){
744   LogWriter *pLog;
745   int nCksum;
746 
747   if( pDb->bUseLog==0 ) return;
748   pLog = pDb->pLogWriter;
749   nCksum = pLog->buf.n & 0xFFFFFFF8;
750   logUpdateCksum(pLog, nCksum);
751   assert( pLog->iCksumBuf==nCksum );
752   pMark->nBuf = pLog->buf.n - nCksum;
753   memcpy(pMark->aBuf, &pLog->buf.z[nCksum], pMark->nBuf);
754 
755   pMark->iOff = pLog->iOff + pLog->buf.n;
756   pMark->cksum0 = pLog->cksum0;
757   pMark->cksum1 = pLog->cksum1;
758 }
759 
760 /*
761 ** Seek (rewind) back to the log file offset stored by an ealier call to
762 ** lsmLogTell() in *pMark.
763 */
lsmLogSeek(lsm_db * pDb,LogMark * pMark)764 void lsmLogSeek(
765   lsm_db *pDb,                    /* Database handle */
766   LogMark *pMark                  /* Object containing log offset to seek to */
767 ){
768   LogWriter *pLog;
769 
770   if( pDb->bUseLog==0 ) return;
771   pLog = pDb->pLogWriter;
772 
773   assert( pMark->iOff<=pLog->iOff+pLog->buf.n );
774   if( (pMark->iOff & 0xFFFFFFF8)>=pLog->iOff ){
775     pLog->buf.n = (int)(pMark->iOff - pLog->iOff);
776     pLog->iCksumBuf = (pLog->buf.n & 0xFFFFFFF8);
777   }else{
778     pLog->buf.n = pMark->nBuf;
779     memcpy(pLog->buf.z, pMark->aBuf, pMark->nBuf);
780     pLog->iCksumBuf = 0;
781     pLog->iOff = pMark->iOff - pMark->nBuf;
782   }
783   pLog->cksum0 = pMark->cksum0;
784   pLog->cksum1 = pMark->cksum1;
785 
786   if( pMark->iOff > pLog->iRegion1End ) pLog->iRegion1End = 0;
787   if( pMark->iOff > pLog->iRegion2Start ) pLog->iRegion2Start = 0;
788 }
789 
790 /*
791 ** This function does the work for an lsm_info(LOG_STRUCTURE) request.
792 */
lsmInfoLogStructure(lsm_db * pDb,char ** pzVal)793 int lsmInfoLogStructure(lsm_db *pDb, char **pzVal){
794   int rc = LSM_OK;
795   char *zVal = 0;
796 
797   /* If there is no read or write transaction open, read the latest
798   ** tree-header from shared-memory to report on. If necessary, update
799   ** it based on the contents of the database header.
800   **
801   ** No locks are taken here - these are passive read operations only.
802   */
803   if( pDb->pCsr==0 && pDb->nTransOpen==0 ){
804     rc = lsmTreeLoadHeader(pDb, 0);
805     if( rc==LSM_OK ) rc = logReclaimSpace(pDb);
806   }
807 
808   if( rc==LSM_OK ){
809     DbLog *pLog = &pDb->treehdr.log;
810     zVal = lsmMallocPrintf(pDb->pEnv,
811         "%d %d %d %d %d %d",
812         (int)pLog->aRegion[0].iStart, (int)pLog->aRegion[0].iEnd,
813         (int)pLog->aRegion[1].iStart, (int)pLog->aRegion[1].iEnd,
814         (int)pLog->aRegion[2].iStart, (int)pLog->aRegion[2].iEnd
815     );
816     if( !zVal ) rc = LSM_NOMEM_BKPT;
817   }
818 
819   *pzVal = zVal;
820   return rc;
821 }
822 
823 /*************************************************************************
824 ** Begin code for log recovery.
825 */
826 
/*
** A LogReader is a sequential read cursor over the log file, used during
** recovery. File content is loaded into buf in chunks. iBuf is the offset
** within buf of the next byte to hand out, and iOff is the file offset
** corresponding to the end of buf, so the file offset of the next unread
** byte is (iOff - buf.n + iBuf).
**
** cksum0/cksum1 hold the running checksum of the log content consumed so
** far, up to (but not including) offset iCksumBuf of buf. Bytes between
** iCksumBuf and iBuf have been read but not yet folded into the checksum.
*/
typedef struct LogReader LogReader;
struct LogReader {
  FileSystem *pFS;                /* File system to read from */
  i64 iOff;                       /* File offset at end of buf content */
  int iBuf;                       /* Current read offset in buf */
  LsmString buf;                  /* Buffer containing file content */

  int iCksumBuf;                  /* Offset in buf corresponding to cksum[01] */
  u32 cksum0;                     /* Checksum 0 at offset iCksumBuf */
  u32 cksum1;                     /* Checksum 1 at offset iCksumBuf */
};
838 
/*
** Read the next nBlob bytes from the log, advancing the reader position.
**
** If *pRc is not LSM_OK when this function is called, or if nBlob is not
** greater than zero, it is a no-op (and *ppBlob is left untouched).
**
** If ppBlob is not NULL and the entire blob is already available as a
** contiguous run in the reader's internal buffer, *ppBlob is set to point
** directly into that buffer and nothing is copied. Such a pointer is only
** good until the reader next refills its buffer. Otherwise the bytes are
** accumulated in pBuf (whose previous content is discarded) and, if ppBlob
** is not NULL, *ppBlob is set to pBuf->z once all bytes have been read.
** Callers that pass pBuf==0 are relying on the contiguous case applying.
**
** Each time the internal buffer is exhausted, all complete 8-byte units
** that have not yet been checksummed are folded into p->cksum0/cksum1,
** any remaining (fewer than 8) bytes are moved to the start of the buffer,
** and lsmFsReadLog() is asked for the next LOG_READ_SIZE bytes, which it
** is expected to append to the buffer.
*/
static void logReaderBlob(
  LogReader *p,                   /* Log reader object */
  LsmString *pBuf,                /* Dynamic storage, if required */
  int nBlob,                      /* Number of bytes to read */
  u8 **ppBlob,                    /* OUT: Pointer to blob read */
  int *pRc                        /* IN/OUT: Error code */
){
  static const int LOG_READ_SIZE = 512;
  int rc = *pRc;                  /* Return code */
  int nReq = nBlob;               /* Bytes required */

  while( rc==LSM_OK && nReq>0 ){
    int nAvail;                   /* Bytes of data available in p->buf */
    if( p->buf.n==p->iBuf ){
      int nCksum;                 /* Total bytes requiring checksum */
      int nCarry = 0;             /* Trailing bytes (<8) carried forward */

      nCksum = p->iBuf - p->iCksumBuf;
      if( nCksum>0 ){
        nCarry = nCksum % 8;
        nCksum = ((nCksum / 8) * 8);
        if( nCksum>0 ){
          logCksumUnaligned(
              &p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1
          );
        }
      }

      /* Use memmove(), not memcpy(): source and destination are in the
      ** same allocation and overlap whenever fewer than 2*nCarry bytes
      ** were buffered (e.g. after a jump truncated the buffer), in which
      ** case memcpy() would be undefined behaviour. */
      if( nCarry>0 ) memmove(p->buf.z, &p->buf.z[p->iBuf-nCarry], nCarry);
      p->buf.n = nCarry;
      p->iBuf = nCarry;

      rc = lsmFsReadLog(p->pFS, p->iOff, LOG_READ_SIZE, &p->buf);
      if( rc!=LSM_OK ) break;
      p->iCksumBuf = 0;
      p->iOff += LOG_READ_SIZE;
    }

    nAvail = p->buf.n - p->iBuf;
    if( ppBlob && nReq==nBlob && nBlob<=nAvail ){
      /* Whole blob is available in one piece: return it in place. */
      *ppBlob = (u8 *)&p->buf.z[p->iBuf];
      p->iBuf += nBlob;
      nReq = 0;
    }else{
      /* Copy as much as is available into pBuf. On the first chunk of a
      ** blob, discard whatever pBuf held before. */
      int nCopy = LSM_MIN(nAvail, nReq);
      if( nBlob==nReq ){
        pBuf->n = 0;
      }
      rc = lsmStringBinAppend(pBuf, (u8 *)&p->buf.z[p->iBuf], nCopy);
      nReq -= nCopy;
      p->iBuf += nCopy;
      if( nReq==0 && ppBlob ){
        *ppBlob = (u8*)pBuf->z;
      }
    }
  }

  *pRc = rc;
}
897 
/*
** Read a varint from the log and store the decoded 32-bit value in
** *piVal. This is a no-op if *pRc is not LSM_OK on entry; *piVal is only
** written if the read succeeds.
**
** If the reader's buffer still holds unread bytes, the first byte of the
** varint determines its size and exactly that many bytes are read (using
** pBuf as scratch space if they are not contiguous). If the buffer is
** exhausted, 10 bytes are requested so that the buffer is refilled, the
** varint is decoded from them, and the read position is then moved back
** over the bytes that lsmVarintGet32() did not report as consumed.
*/
static void logReaderVarint(
  LogReader *p,
  LsmString *pBuf,
  int *piVal,                     /* OUT: Value read from log */
  int *pRc                        /* IN/OUT: Error code */
){
  u8 *aData;                      /* Points at the encoded varint */

  if( *pRc!=LSM_OK ) return;

  if( p->iBuf!=p->buf.n ){
    int nByte = lsmVarintSize(p->buf.z[p->iBuf]);
    logReaderBlob(p, pBuf, nByte, &aData, pRc);
    if( *pRc==LSM_OK ){
      lsmVarintGet32(aData, piVal);
    }
  }else{
    logReaderBlob(p, 0, 10, &aData, pRc);
    if( *pRc==LSM_OK ){
      p->iBuf -= (10 - lsmVarintGet32(aData, piVal));
    }
  }
}
915 
/*
** Read a single byte from the log into *pByte. If the byte cannot be
** read (including when *pRc is already an error on entry), *pByte is
** left unmodified.
*/
static void logReaderByte(LogReader *p, u8 *pByte, int *pRc){
  u8 *aByte = 0;                  /* Set by logReaderBlob() on success */

  logReaderBlob(p, 0, 1, &aByte, pRc);
  if( aByte!=0 ){
    *pByte = aByte[0];
  }
}
921 
/*
** Read an 8-byte checksum from the log and compare it against the
** checksum computed over the content read so far. *pbEof is set to true
** if the two differ and to false if they match. If *pRc is not LSM_OK on
** entry this is a no-op; if the read fails, *pbEof is not modified.
*/
static void logReaderCksum(LogReader *p, LsmString *pBuf, int *pbEof, int *pRc){
  u8 *aStored = 0;                /* The 8 checksum bytes from the file */
  int nPending;                   /* Bytes read but not yet checksummed */

  if( *pRc!=LSM_OK ) return;

  /* Fold everything consumed so far into the expected checksum. The
  ** 8 bytes about to be read are the stored checksum itself and must not
  ** be included, so move iCksumBuf past them before reading. */
  nPending = p->iBuf - p->iCksumBuf;
  assert( nPending>=0 );
  logCksumUnaligned(&p->buf.z[p->iCksumBuf], nPending, &p->cksum0, &p->cksum1);
  p->iCksumBuf = p->iBuf + 8;

  logReaderBlob(p, pBuf, 8, &aStored, pRc);
  assert( aStored || *pRc );

  /* Decode the stored checksum and flag end-of-log on any mismatch. */
  if( aStored ){
    u32 stored0 = lsmGetU32(aStored);
    u32 stored1 = lsmGetU32(&aStored[4]);
    *pbEof = (stored0!=p->cksum0 || stored1!=p->cksum1);
    p->iCksumBuf = p->iBuf;
  }
}
944 
/*
** Prepare LogReader *p to read the log of database pDb, starting at the
** start offset of region 2 of *pLog and seeded with the checksum stored
** in *pLog. The buffer is emptied. Pass bInitBuf==1 the first time a
** LogReader is set up so that p->buf itself is initialized; pass 0 to
** reset a reader that has been used before.
*/
static void logReaderInit(
  lsm_db *pDb,                    /* Database handle */
  DbLog *pLog,                    /* Log object associated with pDb */
  int bInitBuf,                   /* True if p->buf is uninitialized */
  LogReader *p                    /* Initialize this LogReader object */
){
  /* Buffer state: (optionally) initialize, then mark as empty. */
  if( bInitBuf ){
    lsmStringInit(&p->buf, pDb->pEnv);
  }
  p->buf.n = 0;
  p->iBuf = 0;
  p->iCksumBuf = 0;

  /* Starting checksum and file position come from the DbLog object. */
  p->cksum0 = pLog->cksum0;
  p->cksum1 = pLog->cksum1;
  p->iOff = pLog->aRegion[2].iStart;

  p->pFS = pDb->pFS;
}
960 
961 /*
962 ** This function is called after reading the header of a LOG_DELETE or
963 ** LOG_WRITE record. Parameter nByte is the total size of the key and
964 ** value that follow the header just read. Return true if the size and
965 ** position of the record indicate that it should contain a checksum.
966 */
logRequireCksum(LogReader * p,int nByte)967 static int logRequireCksum(LogReader *p, int nByte){
968   return ((p->iBuf + nByte - p->iCksumBuf) > LSM_CKSUM_MAXDATA);
969 }
970 
971 /*
972 ** Recover the contents of the log file.
973 */
lsmLogRecover(lsm_db * pDb)974 int lsmLogRecover(lsm_db *pDb){
975   LsmString buf1;                 /* Key buffer */
976   LsmString buf2;                 /* Value buffer */
977   LogReader reader;               /* Log reader object */
978   int rc = LSM_OK;                /* Return code */
979   int nCommit = 0;                /* Number of transactions to recover */
980   int iPass;
981   int nJump = 0;                  /* Number of LSM_LOG_JUMP records in pass 0 */
982   DbLog *pLog;
983   int bOpen;
984 
985   rc = lsmFsOpenLog(pDb, &bOpen);
986   if( rc!=LSM_OK ) return rc;
987 
988   rc = lsmTreeInit(pDb);
989   if( rc!=LSM_OK ) return rc;
990 
991   pLog = &pDb->treehdr.log;
992   lsmCheckpointLogoffset(pDb->pShmhdr->aSnap2, pLog);
993 
994   logReaderInit(pDb, pLog, 1, &reader);
995   lsmStringInit(&buf1, pDb->pEnv);
996   lsmStringInit(&buf2, pDb->pEnv);
997 
998   /* The outer for() loop runs at most twice. The first iteration is to
999   ** count the number of committed transactions in the log. The second
1000   ** iterates through those transactions and updates the in-memory tree
1001   ** structure with their contents.  */
1002   if( bOpen ){
1003     for(iPass=0; iPass<2 && rc==LSM_OK; iPass++){
1004       int bEof = 0;
1005 
1006       while( rc==LSM_OK && !bEof ){
1007         u8 eType = 0;
1008         logReaderByte(&reader, &eType, &rc);
1009 
1010         switch( eType ){
1011           case LSM_LOG_PAD1:
1012             break;
1013 
1014           case LSM_LOG_PAD2: {
1015             int nPad;
1016             logReaderVarint(&reader, &buf1, &nPad, &rc);
1017             logReaderBlob(&reader, &buf1, nPad, 0, &rc);
1018             break;
1019           }
1020 
1021           case LSM_LOG_DRANGE:
1022           case LSM_LOG_DRANGE_CKSUM:
1023           case LSM_LOG_WRITE:
1024           case LSM_LOG_WRITE_CKSUM: {
1025             int nKey;
1026             int nVal;
1027             u8 *aVal;
1028             logReaderVarint(&reader, &buf1, &nKey, &rc);
1029             logReaderVarint(&reader, &buf2, &nVal, &rc);
1030 
1031             if( eType==LSM_LOG_WRITE_CKSUM || eType==LSM_LOG_DRANGE_CKSUM ){
1032               logReaderCksum(&reader, &buf1, &bEof, &rc);
1033             }else{
1034               bEof = logRequireCksum(&reader, nKey+nVal);
1035             }
1036             if( bEof ) break;
1037 
1038             logReaderBlob(&reader, &buf1, nKey, 0, &rc);
1039             logReaderBlob(&reader, &buf2, nVal, &aVal, &rc);
1040             if( iPass==1 && rc==LSM_OK ){
1041               if( eType==LSM_LOG_WRITE || eType==LSM_LOG_WRITE_CKSUM ){
1042                 rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
1043               }else{
1044                 rc = lsmTreeDelete(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
1045               }
1046             }
1047             break;
1048           }
1049 
1050           case LSM_LOG_DELETE:
1051           case LSM_LOG_DELETE_CKSUM: {
1052             int nKey; u8 *aKey;
1053             logReaderVarint(&reader, &buf1, &nKey, &rc);
1054 
1055             if( eType==LSM_LOG_DELETE_CKSUM ){
1056               logReaderCksum(&reader, &buf1, &bEof, &rc);
1057             }else{
1058               bEof = logRequireCksum(&reader, nKey);
1059             }
1060             if( bEof ) break;
1061 
1062             logReaderBlob(&reader, &buf1, nKey, &aKey, &rc);
1063             if( iPass==1 && rc==LSM_OK ){
1064               rc = lsmTreeInsert(pDb, aKey, nKey, NULL, -1);
1065             }
1066             break;
1067           }
1068 
1069           case LSM_LOG_COMMIT:
1070             logReaderCksum(&reader, &buf1, &bEof, &rc);
1071             if( bEof==0 ){
1072               nCommit++;
1073               assert( nCommit>0 || iPass==1 );
1074               if( nCommit==0 ) bEof = 1;
1075             }
1076             break;
1077 
1078           case LSM_LOG_JUMP: {
1079             int iOff = 0;
1080             logReaderVarint(&reader, &buf1, &iOff, &rc);
1081             if( rc==LSM_OK ){
1082               if( iPass==1 ){
1083                 if( pLog->aRegion[2].iStart==0 ){
1084                   assert( pLog->aRegion[1].iStart==0 );
1085                   pLog->aRegion[1].iEnd = reader.iOff;
1086                 }else{
1087                   assert( pLog->aRegion[0].iStart==0 );
1088                   pLog->aRegion[0].iStart = pLog->aRegion[2].iStart;
1089                   pLog->aRegion[0].iEnd = reader.iOff-reader.buf.n+reader.iBuf;
1090                 }
1091                 pLog->aRegion[2].iStart = iOff;
1092               }else{
1093                 if( (nJump++)==2 ){
1094                   bEof = 1;
1095                 }
1096               }
1097 
1098               reader.iOff = iOff;
1099               reader.buf.n = reader.iBuf;
1100             }
1101             break;
1102           }
1103 
1104           default:
1105             /* Including LSM_LOG_EOF */
1106             bEof = 1;
1107             break;
1108         }
1109       }
1110 
1111       if( rc==LSM_OK && iPass==0 ){
1112         if( nCommit==0 ){
1113           if( pLog->aRegion[2].iStart==0 ){
1114             iPass = 1;
1115           }else{
1116             pLog->aRegion[2].iStart = 0;
1117             iPass = -1;
1118             lsmCheckpointZeroLogoffset(pDb);
1119           }
1120         }
1121         logReaderInit(pDb, pLog, 0, &reader);
1122         nCommit = nCommit * -1;
1123       }
1124     }
1125   }
1126 
1127   /* Initialize DbLog object */
1128   if( rc==LSM_OK ){
1129     pLog->aRegion[2].iEnd = reader.iOff - reader.buf.n + reader.iBuf;
1130     pLog->cksum0 = reader.cksum0;
1131     pLog->cksum1 = reader.cksum1;
1132   }
1133 
1134   if( rc==LSM_OK ){
1135     rc = lsmFinishRecovery(pDb);
1136   }else{
1137     lsmFinishRecovery(pDb);
1138   }
1139 
1140   if( pDb->bRoTrans ){
1141     lsmFsCloseLog(pDb);
1142   }
1143 
1144   lsmStringClear(&buf1);
1145   lsmStringClear(&buf2);
1146   lsmStringClear(&reader.buf);
1147   return rc;
1148 }
1149 
/*
** Free the log writer object belonging to database handle db, together
** with its write buffer, and clear db->pLogWriter. This is a no-op if
** the handle has no log writer.
*/
void lsmLogClose(lsm_db *db){
  LogWriter *pWriter = db->pLogWriter;

  if( pWriter==0 ) return;

  lsmFree(db->pEnv, pWriter->buf.z);
  lsmFree(db->pEnv, pWriter);
  db->pLogWriter = 0;
}
1157