1 /*
2 ** 2011-09-11
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 **
13 ** This file contains code to read and write checkpoints.
14 **
15 ** A checkpoint represents the database layout at a single point in time.
16 ** It includes a log offset. When an existing database is opened, the
17 ** current state is determined by reading the newest checkpoint and updating
18 ** it with all committed transactions from the log that follow the specified
19 ** offset.
20 */
21 #include "lsmInt.h"
22 
23 /*
24 ** CHECKPOINT BLOB FORMAT:
25 **
26 ** A checkpoint blob is a series of unsigned 32-bit integers stored in
27 ** big-endian byte order. As follows:
28 **
29 **   Checkpoint header (see the CKPT_HDR_XXX #defines):
30 **
31 **     1. The checkpoint id MSW.
32 **     2. The checkpoint id LSW.
33 **     3. The number of integer values in the entire checkpoint, including
34 **        the two checksum values.
35 **     4. The compression scheme id.
36 **     5. The total number of blocks in the database.
37 **     6. The block size.
38 **     7. The number of levels.
39 **     8. The nominal database page size.
40 **     9. The number of pages (in total) written to the database file.
41 **
42 **   Log pointer:
43 **
44 **     1. The log offset MSW.
45 **     2. The log offset LSW.
46 **     3. Log checksum 0.
47 **     4. Log checksum 1.
48 **
49 **     Note that the "log offset" is not the literal byte offset. Instead,
50 **     it is the byte offset multiplied by 2, with least significant bit
51 **     toggled each time the log pointer value is changed. This is to make
52 **     sure that this field changes each time the log pointer is updated,
53 **     even if the log file itself is disabled. See lsmTreeMakeOld().
54 **
55 **     See ckptExportLog() and ckptImportLog().
56 **
57 **   Append points:
58 **
59 **     8 integers (4 * 64-bit page numbers). See ckptExportAppendlist().
60 **
61 **   For each level in the database, a level record. Formatted as follows:
62 **
63 **     0. Age of the level (least significant 16-bits). And flags mask (most
64 **        significant 16-bits).
65 **     1. The number of right-hand segments (nRight, possibly 0),
66 **     2. Segment record for left-hand segment (8 integers defined below),
67 **     3. Segment record for each right-hand segment (8 integers defined below),
**     4. If nRight>0, the number of segments involved in the merge,
**     5. If nRight>0, current nSkip value (see Merge structure defn.),
**     6. If nRight>0, for each segment in the merge:
**        6a. Page number of next cell to read during merge (this field
**            is 64-bits - 2 integers)
**        6b. Cell number of next cell to read during merge
**     7. If nRight>0, page containing current split-key (64-bits - 2
**        integers).
**     8. If nRight>0, cell within page containing current split-key.
**     9. If nRight>0, current pointer value (64-bits - 2 integers).
77 **
78 **   The block redirect array:
79 **
80 **     1. Number of redirections (maximum LSM_MAX_BLOCK_REDIRECTS).
81 **     2. For each redirection:
82 **        a. "from" block number
83 **        b. "to" block number
84 **
85 **   The in-memory freelist entries. Each entry is either an insert or a
86 **   delete. The in-memory freelist is to the free-block-list as the
87 **   in-memory tree is to the users database content.
88 **
89 **     1. Number of free-list entries stored in checkpoint header.
90 **     2. Number of free blocks (in total).
91 **     3. Total number of blocks freed during database lifetime.
92 **     4. For each entry:
93 **        2a. Block number of free block.
94 **        2b. A 64-bit integer (MSW followed by LSW). -1 for a delete entry,
95 **            or the associated checkpoint id for an insert.
96 **
97 **   The checksum:
98 **
99 **     1. Checksum value 1.
100 **     2. Checksum value 2.
101 **
102 ** In the above, a segment record consists of the following four 64-bit
103 ** fields (converted to 2 * u32 by storing the MSW followed by LSW):
104 **
105 **     1. First page of array,
106 **     2. Last page of array,
107 **     3. Root page of array (or 0),
108 **     4. Size of array in pages.
109 */
110 
/*
** LARGE NUMBERS OF LEVEL RECORDS:
**
** A limit on the number of rhs segments that may be present in the database
** file. Defining this limit ensures that all level records fit within
** the 4096 byte limit for checkpoint blobs.
**
** The number of right-hand-side segments in a database is counted as
** follows:
**
**   * For each level in the database not undergoing a merge, add 1.
**
**   * For each level in the database that is undergoing a merge, add
**     the number of segments on the rhs of the level.
**
** A level record not undergoing a merge is 10 integers. A level record
** with nRhs rhs segments and (nRhs+1) input segments (i.e. including the
** separators from the next level) is (11*nRhs+20) integers. The maximum
** per right-hand-side level is therefore 21 integers. So the maximum
** size of all level records in a checkpoint is 21*40=820 integers.
**
** NOTE(review): the totals quoted in the paragraph above do not follow
** from the formula it gives. (11*nRhs+20) with nRhs==1 is 31 integers,
** not 21, and 21*40 is 840, not 820. The figures look stale - confirm
** before relying on them.
**
** TODO: Before pointer values were changed from 32 to 64 bits, the above
** used to come to 420 bytes - leaving significant space for a free-list
** prefix. No more. To fix this, reduce the size of the level records in
** a db snapshot, and improve management of the free-list tail in
** lsm_sorted.c.
*/
#define LSM_MAX_RHS_SEGMENTS 40
139 
140 /*
141 ** LARGE NUMBERS OF FREELIST ENTRIES:
142 **
143 ** There is also a limit (LSM_MAX_FREELIST_ENTRIES - defined in lsmInt.h)
144 ** on the number of free-list entries stored in a checkpoint. Since each
145 ** free-list entry consists of 3 integers, the maximum free-list size is
146 ** 3*100=300 integers. Combined with the limit on rhs segments defined
147 ** above, this ensures that a checkpoint always fits within a 4096 byte
148 ** meta page.
149 **
150 ** If the database contains more than 100 free blocks, the "overflow" flag
151 ** in the checkpoint header is set and the remainder are stored in the
152 ** system FREELIST entry in the LSM (along with user data). The value
153 ** accompanying the FREELIST key in the LSM is, like a checkpoint, an array
154 ** of 32-bit big-endian integers. As follows:
155 **
156 **     For each entry:
157 **       a. Block number of free block.
158 **       b. MSW of associated checkpoint id.
159 **       c. LSW of associated checkpoint id.
160 **
161 ** The number of entries is not required - it is implied by the size of the
162 ** value blob containing the integer array.
163 **
164 ** Note that the limit defined by LSM_MAX_FREELIST_ENTRIES is a hard limit.
165 ** The actual value used may be configured using LSM_CONFIG_MAX_FREELIST.
166 */
167 
/*
** The argument to this macro must be of type u32. On a little-endian
** architecture, it returns the u32 value that results from interpreting
** the 4 bytes as a big-endian value. On a big-endian architecture, it
** returns the value that would be produced by interpreting the 4 bytes
** of the input value as a little-endian integer.
**
** The macro argument is expanded four times, so it must not have side
** effects.
*/
#define BYTESWAP32(x) ( \
   (((x)&0x000000FF)<<24) + (((x)&0x0000FF00)<<8)  \
 + (((x)&0x00FF0000)>>8)  + (((x)&0xFF000000)>>24) \
)

/*
** LSM_LITTLE_ENDIAN evaluates to the first byte in memory of the integer
** constant "one" (value 1). That byte is non-zero only if the least
** significant byte is stored first - i.e. on a little-endian host.
*/
static const int one = 1;
#define LSM_LITTLE_ENDIAN (*(u8 *)(&one))
182 
/* Sizes, in integers, of various parts of the checkpoint. */
#define CKPT_HDR_SIZE         9
#define CKPT_LOGPTR_SIZE      4
#define CKPT_APPENDLIST_SIZE  (LSM_APPLIST_SZ * 2)

/* A #define to describe each integer in the checkpoint header. Each value
** is an index into the array of 32-bit integers that makes up the
** checkpoint. */
#define CKPT_HDR_ID_MSW   0
#define CKPT_HDR_ID_LSW   1
#define CKPT_HDR_NCKPT    2
#define CKPT_HDR_CMPID    3
#define CKPT_HDR_NBLOCK   4
#define CKPT_HDR_BLKSZ    5
#define CKPT_HDR_NLEVEL   6
#define CKPT_HDR_PGSZ     7
#define CKPT_HDR_NWRITE   8

/* Indexes of the four log-pointer integers. These immediately follow the
** CKPT_HDR_SIZE header integers (see ckptExportLog()). */
#define CKPT_HDR_LO_MSW     9
#define CKPT_HDR_LO_LSW    10
#define CKPT_HDR_LO_CKSUM1 11
#define CKPT_HDR_LO_CKSUM2 12
203 
typedef struct CkptBuffer CkptBuffer;

/*
** Dynamic buffer used to accumulate data for a checkpoint. The aCkpt[]
** array is enlarged on demand by ckptSetValue().
*/
struct CkptBuffer {
  lsm_env *pEnv;                  /* Environment handle passed to allocator */
  int nAlloc;                     /* Number of u32 slots allocated at aCkpt[] */
  u32 *aCkpt;                     /* Array of checkpoint integers (or NULL) */
};
214 
215 /*
216 ** Calculate the checksum of the checkpoint specified by arguments aCkpt and
217 ** nCkpt. Store the checksum in *piCksum1 and *piCksum2 before returning.
218 **
219 ** The value of the nCkpt parameter includes the two checksum values at
220 ** the end of the checkpoint. They are not used as inputs to the checksum
221 ** calculation. The checksum is based on the array of (nCkpt-2) integers
222 ** at aCkpt[].
223 */
ckptChecksum(u32 * aCkpt,u32 nCkpt,u32 * piCksum1,u32 * piCksum2)224 static void ckptChecksum(u32 *aCkpt, u32 nCkpt, u32 *piCksum1, u32 *piCksum2){
225   u32 i;
226   u32 cksum1 = 1;
227   u32 cksum2 = 2;
228 
229   if( nCkpt % 2 ){
230     cksum1 += aCkpt[nCkpt-3] & 0x0000FFFF;
231     cksum2 += aCkpt[nCkpt-3] & 0xFFFF0000;
232   }
233 
234   for(i=0; (i+3)<nCkpt; i+=2){
235     cksum1 += cksum2 + aCkpt[i];
236     cksum2 += cksum1 + aCkpt[i+1];
237   }
238 
239   *piCksum1 = cksum1;
240   *piCksum2 = cksum2;
241 }
242 
243 /*
244 ** Set integer iIdx of the checkpoint accumulating in buffer *p to iVal.
245 */
ckptSetValue(CkptBuffer * p,int iIdx,u32 iVal,int * pRc)246 static void ckptSetValue(CkptBuffer *p, int iIdx, u32 iVal, int *pRc){
247   if( *pRc ) return;
248   if( iIdx>=p->nAlloc ){
249     int nNew = LSM_MAX(8, iIdx*2);
250     p->aCkpt = (u32 *)lsmReallocOrFree(p->pEnv, p->aCkpt, nNew*sizeof(u32));
251     if( !p->aCkpt ){
252       *pRc = LSM_NOMEM_BKPT;
253       return;
254     }
255     p->nAlloc = nNew;
256   }
257   p->aCkpt[iIdx] = iVal;
258 }
259 
260 /*
261 ** Argument aInt points to an array nInt elements in size. Switch the
262 ** endian-ness of each element of the array.
263 */
ckptChangeEndianness(u32 * aInt,int nInt)264 static void ckptChangeEndianness(u32 *aInt, int nInt){
265   if( LSM_LITTLE_ENDIAN ){
266     int i;
267     for(i=0; i<nInt; i++) aInt[i] = BYTESWAP32(aInt[i]);
268   }
269 }
270 
271 /*
272 ** Object *p contains a checkpoint in native byte-order. The checkpoint is
273 ** nCkpt integers in size, not including any checksum. This function sets
274 ** the two checksum elements of the checkpoint accordingly.
275 */
ckptAddChecksum(CkptBuffer * p,int nCkpt,int * pRc)276 static void ckptAddChecksum(CkptBuffer *p, int nCkpt, int *pRc){
277   if( *pRc==LSM_OK ){
278     u32 aCksum[2] = {0, 0};
279     ckptChecksum(p->aCkpt, nCkpt+2, &aCksum[0], &aCksum[1]);
280     ckptSetValue(p, nCkpt, aCksum[0], pRc);
281     ckptSetValue(p, nCkpt+1, aCksum[1], pRc);
282   }
283 }
284 
/*
** Write the 64-bit value iVal to checkpoint buffer *p as two 32-bit
** integers - most significant word first - starting at index *piOut.
** *piOut is advanced by two before returning.
*/
static void ckptAppend64(CkptBuffer *p, int *piOut, i64 iVal, int *pRc){
  int iMsw = *piOut;
  int iLsw = iMsw + 1;
  ckptSetValue(p, iMsw, (iVal >> 32) & 0xFFFFFFFF, pRc);
  ckptSetValue(p, iLsw, (iVal & 0xFFFFFFFF), pRc);
  *piOut = iMsw + 2;
}
291 
ckptRead64(u32 * a)292 static i64 ckptRead64(u32 *a){
293   return (((i64)a[0]) << 32) + (i64)a[1];
294 }
295 
ckptGobble64(u32 * a,int * piIn)296 static i64 ckptGobble64(u32 *a, int *piIn){
297   int iIn = *piIn;
298   *piIn += 2;
299   return ckptRead64(&a[iIn]);
300 }
301 
302 
303 /*
304 ** Append a 6-value segment record corresponding to pSeg to the checkpoint
305 ** buffer passed as the third argument.
306 */
ckptExportSegment(Segment * pSeg,CkptBuffer * p,int * piOut,int * pRc)307 static void ckptExportSegment(
308   Segment *pSeg,
309   CkptBuffer *p,
310   int *piOut,
311   int *pRc
312 ){
313   ckptAppend64(p, piOut, pSeg->iFirst, pRc);
314   ckptAppend64(p, piOut, pSeg->iLastPg, pRc);
315   ckptAppend64(p, piOut, pSeg->iRoot, pRc);
316   ckptAppend64(p, piOut, pSeg->nSize, pRc);
317 }
318 
/*
** Append the level record for pLevel to checkpoint buffer *p, starting at
** index *piOut. The record consists of:
**
**   * One integer holding the level age (low 16 bits) and flags (high 16
**     bits),
**   * The number of right-hand segments,
**   * The segment record for the left-hand segment.
**
** And, if the level has a Merge object (which the assert() below requires
** to be the case if and only if nRight>0):
**
**   * A segment record for each right-hand segment,
**   * The number of merge inputs and the nSkip value,
**   * A page number (64-bit) and cell number for each merge input,
**   * The split-key page number (64-bit) and cell number,
**   * The current pointer value (64-bit).
**
** *piOut is advanced past the new record before returning.
*/
static void ckptExportLevel(
  Level *pLevel,                  /* Level object to serialize */
  CkptBuffer *p,                  /* Append new level record to this ckpt */
  int *piOut,                     /* IN/OUT: Size of checkpoint so far */
  int *pRc                        /* IN/OUT: Error code */
){
  Merge *pMerge = pLevel->pMerge;
  int iOut = *piOut;
  u32 iAgeFlags = (u32)pLevel->iAge + (u32)(pLevel->flags<<16);

  /* Fixed part of the record */
  ckptSetValue(p, iOut++, iAgeFlags, pRc);
  ckptSetValue(p, iOut++, pLevel->nRight, pRc);
  ckptExportSegment(&pLevel->lhs, p, &iOut, pRc);

  assert( (pLevel->nRight>0)==(pMerge!=0) );
  if( pMerge ){
    int iSeg;                     /* Used to iterate through aRhs[] */
    int iInput;                   /* Used to iterate through aInput[] */

    /* Right-hand segments */
    for(iSeg=0; iSeg<pLevel->nRight; iSeg++){
      ckptExportSegment(&pLevel->aRhs[iSeg], p, &iOut, pRc);
    }

    /* State of the merge */
    assert( pMerge->nInput==pLevel->nRight
         || pMerge->nInput==pLevel->nRight+1
    );
    ckptSetValue(p, iOut++, pMerge->nInput, pRc);
    ckptSetValue(p, iOut++, pMerge->nSkip, pRc);
    for(iInput=0; iInput<pMerge->nInput; iInput++){
      MergeInput *pInput = &pMerge->aInput[iInput];
      ckptAppend64(p, &iOut, pInput->iPg, pRc);
      ckptSetValue(p, iOut++, pInput->iCell, pRc);
    }
    ckptAppend64(p, &iOut, pMerge->splitkey.iPg, pRc);
    ckptSetValue(p, iOut++, pMerge->splitkey.iCell, pRc);
    ckptAppend64(p, &iOut, pMerge->iCurrentPtr, pRc);
  }

  *piOut = iOut;
}
355 
356 /*
357 ** Populate the log offset fields of the checkpoint buffer. 4 values.
358 */
ckptExportLog(lsm_db * pDb,int bFlush,CkptBuffer * p,int * piOut,int * pRc)359 static void ckptExportLog(
360   lsm_db *pDb,
361   int bFlush,
362   CkptBuffer *p,
363   int *piOut,
364   int *pRc
365 ){
366   int iOut = *piOut;
367 
368   assert( iOut==CKPT_HDR_LO_MSW );
369 
370   if( bFlush ){
371     i64 iOff = pDb->treehdr.iOldLog;
372     ckptAppend64(p, &iOut, iOff, pRc);
373     ckptSetValue(p, iOut++, pDb->treehdr.oldcksum0, pRc);
374     ckptSetValue(p, iOut++, pDb->treehdr.oldcksum1, pRc);
375   }else{
376     for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){
377       ckptSetValue(p, iOut, pDb->pShmhdr->aSnap2[iOut], pRc);
378     }
379   }
380 
381   assert( *pRc || iOut==CKPT_HDR_LO_CKSUM2+1 );
382   *piOut = iOut;
383 }
384 
/*
** Append the append-point list of the worker snapshot to checkpoint
** buffer *p. Each of the LSM_APPLIST_SZ entries of db->pWorker->aiAppend[]
** is written as a 64-bit value (two integers) by ckptAppend64(), which
** also advances *piOut.
*/
static void ckptExportAppendlist(
  lsm_db *db,                     /* Database connection */
  CkptBuffer *p,                  /* Checkpoint buffer to write to */
  int *piOut,                     /* IN/OUT: Offset within checkpoint buffer */
  int *pRc                        /* IN/OUT: Error code */
){
  int i;
  Pgno *aiAppend = db->pWorker->aiAppend;

  for(i=0; i<LSM_APPLIST_SZ; i++){
    ckptAppend64(p, piOut, aiAppend[i], pRc);
  }
}
398 
/*
** Serialize the worker snapshot (pDb->pWorker) into a newly allocated
** array of 32-bit integers. No byte-swapping is done by this function -
** the integers are left in native byte-order.
**
** The body of the checkpoint (log pointer, append-point list, level
** records, block-redirect array and freelist) is written first, starting
** at index CKPT_HDR_SIZE. The header is filled in afterwards, as the
** CKPT_HDR_NCKPT field can only be set once the total size is known.
**
** If bCksum is true, the final two integers are set to the checkpoint
** checksum. Otherwise they are both set to zero.
**
** Before returning, *ppCkpt is set to point to the buffer and, if pnCkpt
** is not NULL, *pnCkpt is set to its size in bytes. The return value is
** an LSM error code.
**
** NOTE(review): presumably ownership of the buffer passes to the caller -
** confirm against the callers.
*/
static int ckptExportSnapshot(
  lsm_db *pDb,                    /* Connection handle */
  int bLog,                       /* True to update log-offset fields */
  i64 iId,                        /* Checkpoint id */
  int bCksum,                     /* If true, include checksums */
  void **ppCkpt,                  /* OUT: Buffer containing checkpoint */
  int *pnCkpt                     /* OUT: Size of checkpoint in bytes */
){
  int rc = LSM_OK;                /* Return Code */
  FileSystem *pFS = pDb->pFS;     /* File system object */
  Snapshot *pSnap = pDb->pWorker; /* Worker snapshot */
  int nLevel = 0;                 /* Number of levels in checkpoint */
  int iLevel;                     /* Used to count out nLevel levels */
  int iOut = 0;                   /* Current offset in aCkpt[] */
  Level *pLevel;                  /* Level iterator */
  int i;                          /* Iterator used while serializing freelist */
  CkptBuffer ckpt;

  /* Initialize the output buffer. Skip over the header slots for now -
  ** they are populated below, once the size of the checkpoint is known. */
  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;
  iOut = CKPT_HDR_SIZE;

  /* Write the log offset into the checkpoint. */
  ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);

  /* Write the append-point list */
  ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);

  /* Figure out how many levels will be written to the checkpoint. */
  for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nLevel++;

  /* Serialize nLevel levels. */
  iLevel = 0;
  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
    ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
    iLevel++;
  }

  /* Write the block-redirect list */
  ckptSetValue(&ckpt, iOut++, pSnap->redirect.n, &rc);
  for(i=0; i<pSnap->redirect.n; i++){
    ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iFrom, &rc);
    ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iTo, &rc);
  }

  /* Write the freelist. The number of entries, followed by three integers
  ** for each entry - the block number and the 64-bit id (MSW then LSW). */
  assert( pSnap->freelist.nEntry<=pDb->nMaxFreelist );
  if( rc==LSM_OK ){
    int nFree = pSnap->freelist.nEntry;
    ckptSetValue(&ckpt, iOut++, nFree, &rc);
    for(i=0; i<nFree; i++){
      FreelistEntry *p = &pSnap->freelist.aEntry[i];
      ckptSetValue(&ckpt, iOut++, p->iBlk, &rc);
      ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
      ckptSetValue(&ckpt, iOut++, p->iId & 0xFFFFFFFF, &rc);
    }
  }

  /* Write the checkpoint header. The value stored in CKPT_HDR_NCKPT
  ** includes the two checksum integers that follow the data written so
  ** far. */
  assert( iId>=0 );
  assert( pSnap->iCmpId==pDb->compress.iId
       || pSnap->iCmpId==LSM_COMPRESSION_EMPTY
  );
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_CMPID, pDb->compress.iId, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  /* Append either the real checksum or two zeroed placeholder integers. */
  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
  }
  iOut += 2;
  assert( iOut<=1024 );

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, rc,
      "ckptExportSnapshot(): id=%lld freelist: %d", iId, pSnap->freelist.nEntry
  );
  for(i=0; i<pSnap->freelist.nEntry; i++){
  lsmLogMessage(pDb, rc,
      "ckptExportSnapshot(): iBlk=%d id=%lld",
      pSnap->freelist.aEntry[i].iBlk,
      pSnap->freelist.aEntry[i].iId
  );
  }
#endif

  *ppCkpt = (void *)ckpt.aCkpt;
  if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
  return rc;
}
499 
500 
501 /*
502 ** Helper function for ckptImport().
503 */
ckptNewSegment(u32 * aIn,int * piIn,Segment * pSegment)504 static void ckptNewSegment(
505   u32 *aIn,
506   int *piIn,
507   Segment *pSegment               /* Populate this structure */
508 ){
509   assert( pSegment->iFirst==0 && pSegment->iLastPg==0 );
510   assert( pSegment->nSize==0 && pSegment->iRoot==0 );
511   pSegment->iFirst = ckptGobble64(aIn, piIn);
512   pSegment->iLastPg = ckptGobble64(aIn, piIn);
513   pSegment->iRoot = ckptGobble64(aIn, piIn);
514   pSegment->nSize = (int)ckptGobble64(aIn, piIn);
515   assert( pSegment->iFirst );
516 }
517 
/*
** Read the merge state of a level record from array aInt[], starting at
** offset *piIn, into a newly allocated Merge object. The new object is
** stored in pLevel->pMerge. The Merge structure and its aInput[] array
** share a single allocation.
**
** If successful, *piIn is advanced past the values read and LSM_OK is
** returned. Or, if the allocation fails, LSM_NOMEM_BKPT is returned and
** *piIn is left unchanged.
*/
static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){
  int iIn = *piIn;                /* Next value to read from aInt[] */
  int nInput;                     /* Number of input segments in merge */
  int nByte;                      /* Number of bytes to allocate */
  int iInput;                     /* Iterator variable */
  Merge *pNew;                    /* Allocated Merge object */

  /* The first value is the number of inputs. This determines the size of
  ** the allocation. */
  nInput = (int)aInt[iIn++];
  nByte = sizeof(Merge) + sizeof(MergeInput) * nInput;
  pNew = (Merge *)lsmMallocZero(pDb->pEnv, nByte);
  if( pNew==0 ){
    return LSM_NOMEM_BKPT;
  }
  pLevel->pMerge = pNew;

  /* The aInput[] array lives directly after the Merge structure. */
  pNew->aInput = (MergeInput *)&pNew[1];
  pNew->nInput = nInput;
  pNew->iOutputOff = -1;
  pNew->nSkip = (int)aInt[iIn++];

  for(iInput=0; iInput<nInput; iInput++){
    MergeInput *pInput = &pNew->aInput[iInput];
    pInput->iPg = ckptGobble64(aInt, &iIn);
    pInput->iCell = (int)aInt[iIn++];
  }

  pNew->splitkey.iPg = ckptGobble64(aInt, &iIn);
  pNew->splitkey.iCell = (int)aInt[iIn++];
  pNew->iCurrentPtr = ckptGobble64(aInt, &iIn);

  *piIn = iIn;
  return LSM_OK;
}
549 
550 
/*
** Deserialize nLevel level records from array aIn[], starting at offset
** *piIn, into a newly allocated linked list of Level objects (linked by
** Level.pNext, in the order in which the records are read).
**
** If successful, LSM_OK is returned and *ppLevel is set to point to the
** first Level in the list (or to NULL if no levels were read). If an
** allocation fails, all Level objects allocated by this call are freed,
** *ppLevel is set to NULL and an LSM error code is returned.
**
** In either case *piIn is set to the offset of the next unread integer.
*/
static int ckptLoadLevels(
  lsm_db *pDb,                    /* Database handle */
  u32 *aIn,                       /* Array of checkpoint integers */
  int *piIn,                      /* IN/OUT: Current offset within aIn[] */
  int nLevel,                     /* Number of level records to read */
  Level **ppLevel                 /* OUT: First level of new list */
){
  int i;
  int rc = LSM_OK;
  Level *pRet = 0;
  Level **ppNext;
  int iIn = *piIn;

  ppNext = &pRet;
  for(i=0; rc==LSM_OK && i<nLevel; i++){
    int iRight;
    Level *pLevel;

    /* Allocate space for the Level structure and Level.aRhs[] array */
    pLevel = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
    if( rc==LSM_OK ){
      pLevel->iAge = (u16)(aIn[iIn] & 0x0000FFFF);
      pLevel->flags = (u16)((aIn[iIn]>>16) & 0x0000FFFF);
      iIn++;
      pLevel->nRight = aIn[iIn++];
      if( pLevel->nRight ){
        int nByte = sizeof(Segment) * pLevel->nRight;
        pLevel->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
      }
      if( rc==LSM_OK ){
        *ppNext = pLevel;
        ppNext = &pLevel->pNext;

        /* Allocate the main segment */
        ckptNewSegment(aIn, &iIn, &pLevel->lhs);

        /* Allocate each of the right-hand segments, if any */
        for(iRight=0; iRight<pLevel->nRight; iRight++){
          ckptNewSegment(aIn, &iIn, &pLevel->aRhs[iRight]);
        }

        /* Set up the Merge object, if required */
        if( pLevel->nRight>0 ){
          rc = ckptSetupMerge(pDb, aIn, &iIn, pLevel);
        }
      }else{
        /* The aRhs[] allocation failed. pLevel has not yet been linked
        ** into the list at pRet, so the cleanup code below cannot see it.
        ** Free it here to avoid a memory leak. */
        lsmFree(pDb->pEnv, pLevel);
      }
    }
  }

  if( rc!=LSM_OK ){
    /* An OOM must have occurred. Free any level structures allocated and
    ** return the error to the caller. */
    lsmSortedFreeLevel(pDb->pEnv, pRet);
    pRet = 0;
  }

  *ppLevel = pRet;
  *piIn = iIn;
  return rc;
}
611 
612 
/*
** Argument pVal points to a blob nVal bytes in size, in the format written
** by lsmCheckpointLevels(): an array of big-endian 32-bit integers, the
** first of which is the number of level records that follow.
**
** Deserialize the level records and append the resulting Level objects to
** the end of the list of levels belonging to the worker snapshot. This
** function is a no-op if nVal is zero or negative.
**
** Return LSM_OK if successful, or an LSM error code otherwise.
*/
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal){
  int rc = LSM_OK;
  int iIn = 1;                    /* Offset of first level record in aIn[] */
  u32 *aIn;                       /* Native byte-order copy of pVal */
  Level *pNew = 0;                /* List of deserialized levels */

  if( nVal<=0 ) return rc;

  aIn = lsmMallocRc(pDb->pEnv, nVal, &rc);
  if( aIn==0 ) return rc;

  /* Take a copy of the blob and convert it to native byte-order. */
  memcpy(aIn, pVal, nVal);
  ckptChangeEndianness(aIn, nVal / sizeof(u32));

  rc = ckptLoadLevels(pDb, aIn, &iIn, aIn[0], &pNew);
  lsmFree(pDb->pEnv, aIn);
  assert( rc==LSM_OK || pNew==0 );

  if( rc==LSM_OK ){
    /* Find the last level of the worker snapshot and link the new levels
    ** in after it. */
    Level *pTail = lsmDbSnapshotLevel(pDb->pWorker);
    assert( pTail );
    while( pTail->pNext ){
      pTail = pTail->pNext;
    }
    pTail->pNext = pNew;
  }

  return rc;
}
645 
646 /*
647 ** Return the data for the LEVELS record.
648 **
649 ** The size of the checkpoint that can be stored in the database header
650 ** must not exceed 1024 32-bit integers. Normally, it does not. However,
651 ** if it does, part of the checkpoint must be stored in the LSM. This
652 ** routine returns that part.
653 */
lsmCheckpointLevels(lsm_db * pDb,int nLevel,void ** paVal,int * pnVal)654 int lsmCheckpointLevels(
655   lsm_db *pDb,                    /* Database handle */
656   int nLevel,                     /* Number of levels to write to blob */
657   void **paVal,                   /* OUT: Pointer to LEVELS blob */
658   int *pnVal                      /* OUT: Size of LEVELS blob in bytes */
659 ){
660   Level *p;                       /* Used to iterate through levels */
661   int nAll= 0;
662   int rc;
663   int i;
664   int iOut;
665   CkptBuffer ckpt;
666   assert( nLevel>0 );
667 
668   for(p=lsmDbSnapshotLevel(pDb->pWorker); p; p=p->pNext) nAll++;
669 
670   assert( nAll>nLevel );
671   nAll -= nLevel;
672   for(p=lsmDbSnapshotLevel(pDb->pWorker); p && nAll>0; p=p->pNext) nAll--;
673 
674   memset(&ckpt, 0, sizeof(CkptBuffer));
675   ckpt.pEnv = pDb->pEnv;
676 
677   ckptSetValue(&ckpt, 0, nLevel, &rc);
678   iOut = 1;
679   for(i=0; rc==LSM_OK && i<nLevel; i++){
680     ckptExportLevel(p, &ckpt, &iOut, &rc);
681     p = p->pNext;
682   }
683   assert( rc!=LSM_OK || p==0 );
684 
685   if( rc==LSM_OK ){
686     ckptChangeEndianness(ckpt.aCkpt, iOut);
687     *paVal = (void *)ckpt.aCkpt;
688     *pnVal = iOut * sizeof(u32);
689   }else{
690     *pnVal = 0;
691     *paVal = 0;
692   }
693 
694   return rc;
695 }
696 
697 /*
698 ** Read the checkpoint id from meta-page pPg.
699 */
ckptLoadId(MetaPage * pPg)700 static i64 ckptLoadId(MetaPage *pPg){
701   i64 ret = 0;
702   if( pPg ){
703     int nData;
704     u8 *aData = lsmFsMetaPageData(pPg, &nData);
705     ret = (((i64)lsmGetU32(&aData[CKPT_HDR_ID_MSW*4])) << 32) +
706           ((i64)lsmGetU32(&aData[CKPT_HDR_ID_LSW*4]));
707   }
708   return ret;
709 }
710 
711 /*
712 ** Return true if the buffer passed as an argument contains a valid
713 ** checkpoint.
714 */
ckptChecksumOk(u32 * aCkpt)715 static int ckptChecksumOk(u32 *aCkpt){
716   u32 nCkpt = aCkpt[CKPT_HDR_NCKPT];
717   u32 cksum1;
718   u32 cksum2;
719 
720   if( nCkpt<CKPT_HDR_NCKPT || nCkpt>(LSM_META_RW_PAGE_SIZE)/sizeof(u32) ){
721     return 0;
722   }
723   ckptChecksum(aCkpt, nCkpt, &cksum1, &cksum2);
724   return (cksum1==aCkpt[nCkpt-2] && cksum2==aCkpt[nCkpt-1]);
725 }
726 
727 /*
728 ** Attempt to load a checkpoint from meta page iMeta.
729 **
730 ** This function is a no-op if *pRc is set to any value other than LSM_OK
731 ** when it is called. If an error occurs, *pRc is set to an LSM error code
732 ** before returning.
733 **
734 ** If no error occurs and the checkpoint is successfully loaded, copy it to
735 ** ShmHeader.aSnap1[] and ShmHeader.aSnap2[], and set ShmHeader.iMetaPage
736 ** to indicate its origin. In this case return 1. Or, if the checkpoint
737 ** cannot be loaded (because the checksum does not compute), return 0.
738 */
ckptTryLoad(lsm_db * pDb,MetaPage * pPg,u32 iMeta,int * pRc)739 static int ckptTryLoad(lsm_db *pDb, MetaPage *pPg, u32 iMeta, int *pRc){
740   int bLoaded = 0;                /* Return value */
741   if( *pRc==LSM_OK ){
742     int rc = LSM_OK;              /* Error code */
743     u32 *aCkpt = 0;               /* Pointer to buffer containing checkpoint */
744     u32 nCkpt;                    /* Number of elements in aCkpt[] */
745     int nData;                    /* Bytes of data in aData[] */
746     u8 *aData;                    /* Meta page data */
747 
748     aData = lsmFsMetaPageData(pPg, &nData);
749     nCkpt = (u32)lsmGetU32(&aData[CKPT_HDR_NCKPT*sizeof(u32)]);
750     if( nCkpt<=nData/sizeof(u32) && nCkpt>CKPT_HDR_NCKPT ){
751       aCkpt = (u32 *)lsmMallocRc(pDb->pEnv, nCkpt*sizeof(u32), &rc);
752     }
753     if( aCkpt ){
754       memcpy(aCkpt, aData, nCkpt*sizeof(u32));
755       ckptChangeEndianness(aCkpt, nCkpt);
756       if( ckptChecksumOk(aCkpt) ){
757         ShmHeader *pShm = pDb->pShmhdr;
758         memcpy(pShm->aSnap1, aCkpt, nCkpt*sizeof(u32));
759         memcpy(pShm->aSnap2, aCkpt, nCkpt*sizeof(u32));
760         memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32));
761         pShm->iMetaPage = iMeta;
762         bLoaded = 1;
763       }
764     }
765 
766     lsmFree(pDb->pEnv, aCkpt);
767     *pRc = rc;
768   }
769   return bLoaded;
770 }
771 
772 /*
773 ** Initialize the shared-memory header with an empty snapshot. This function
774 ** is called when no valid snapshot can be found in the database header.
775 */
ckptLoadEmpty(lsm_db * pDb)776 static void ckptLoadEmpty(lsm_db *pDb){
777   u32 aCkpt[] = {
778     0,                       /* CKPT_HDR_ID_MSW */
779     10,                      /* CKPT_HDR_ID_LSW */
780     0,                       /* CKPT_HDR_NCKPT */
781     LSM_COMPRESSION_EMPTY,   /* CKPT_HDR_CMPID */
782     0,                       /* CKPT_HDR_NBLOCK */
783     0,                       /* CKPT_HDR_BLKSZ */
784     0,                       /* CKPT_HDR_NLEVEL */
785     0,                       /* CKPT_HDR_PGSZ */
786     0,                       /* CKPT_HDR_NWRITE */
787     0, 0, 1234, 5678,        /* The log pointer and initial checksum */
788     0,0,0,0, 0,0,0,0,        /* The append list */
789     0,                       /* The redirected block list */
790     0,                       /* The free block list */
791     0, 0                     /* Space for checksum values */
792   };
793   u32 nCkpt = array_size(aCkpt);
794   ShmHeader *pShm = pDb->pShmhdr;
795 
796   aCkpt[CKPT_HDR_NCKPT] = nCkpt;
797   aCkpt[CKPT_HDR_BLKSZ] = pDb->nDfltBlksz;
798   aCkpt[CKPT_HDR_PGSZ] = pDb->nDfltPgsz;
799   ckptChecksum(aCkpt, array_size(aCkpt), &aCkpt[nCkpt-2], &aCkpt[nCkpt-1]);
800 
801   memcpy(pShm->aSnap1, aCkpt, nCkpt*sizeof(u32));
802   memcpy(pShm->aSnap2, aCkpt, nCkpt*sizeof(u32));
803   memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32));
804 }
805 
806 /*
807 ** This function is called as part of database recovery to initialize the
808 ** ShmHeader.aSnap1[] and ShmHeader.aSnap2[] snapshots.
809 */
lsmCheckpointRecover(lsm_db * pDb)810 int lsmCheckpointRecover(lsm_db *pDb){
811   int rc = LSM_OK;                /* Return Code */
812   i64 iId1;                       /* Id of checkpoint on meta-page 1 */
813   i64 iId2;                       /* Id of checkpoint on meta-page 2 */
814   int bLoaded = 0;                /* True once checkpoint has been loaded */
815   int cmp;                        /* True if (iId2>iId1) */
816   MetaPage *apPg[2] = {0, 0};     /* Meta-pages 1 and 2 */
817 
818   rc = lsmFsMetaPageGet(pDb->pFS, 0, 1, &apPg[0]);
819   if( rc==LSM_OK ) rc = lsmFsMetaPageGet(pDb->pFS, 0, 2, &apPg[1]);
820 
821   iId1 = ckptLoadId(apPg[0]);
822   iId2 = ckptLoadId(apPg[1]);
823   cmp = (iId2 > iId1);
824   bLoaded = ckptTryLoad(pDb, apPg[cmp?1:0], (cmp?2:1), &rc);
825   if( bLoaded==0 ){
826     bLoaded = ckptTryLoad(pDb, apPg[cmp?0:1], (cmp?1:2), &rc);
827   }
828 
829   /* The database does not contain a valid checkpoint. Initialize the shared
830   ** memory header with an empty checkpoint.  */
831   if( bLoaded==0 ){
832     ckptLoadEmpty(pDb);
833   }
834 
835   lsmFsMetaPageRelease(apPg[0]);
836   lsmFsMetaPageRelease(apPg[1]);
837 
838   return rc;
839 }
840 
841 /*
842 ** Store the snapshot in pDb->aSnapshot[] in meta-page iMeta.
843 */
lsmCheckpointStore(lsm_db * pDb,int iMeta)844 int lsmCheckpointStore(lsm_db *pDb, int iMeta){
845   MetaPage *pPg = 0;
846   int rc;
847 
848   assert( iMeta==1 || iMeta==2 );
849   rc = lsmFsMetaPageGet(pDb->pFS, 1, iMeta, &pPg);
850   if( rc==LSM_OK ){
851     u8 *aData;
852     int nData;
853     int nCkpt;
854 
855     nCkpt = (int)pDb->aSnapshot[CKPT_HDR_NCKPT];
856     aData = lsmFsMetaPageData(pPg, &nData);
857     memcpy(aData, pDb->aSnapshot, nCkpt*sizeof(u32));
858     ckptChangeEndianness((u32 *)aData, nCkpt);
859     rc = lsmFsMetaPageRelease(pPg);
860   }
861 
862   return rc;
863 }
864 
865 /*
866 ** Copy the current client snapshot from shared-memory to pDb->aSnapshot[].
867 */
lsmCheckpointLoad(lsm_db * pDb,int * piRead)868 int lsmCheckpointLoad(lsm_db *pDb, int *piRead){
869   int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
870   ShmHeader *pShm = pDb->pShmhdr;
871   while( (nRem--)>0 ){
872     int nInt;
873 
874     nInt = pShm->aSnap1[CKPT_HDR_NCKPT];
875     if( nInt<=(LSM_META_RW_PAGE_SIZE / sizeof(u32)) ){
876       memcpy(pDb->aSnapshot, pShm->aSnap1, nInt*sizeof(u32));
877       if( ckptChecksumOk(pDb->aSnapshot) ){
878         if( piRead ) *piRead = 1;
879         return LSM_OK;
880       }
881     }
882 
883     nInt = pShm->aSnap2[CKPT_HDR_NCKPT];
884     if( nInt<=(LSM_META_RW_PAGE_SIZE / sizeof(u32)) ){
885       memcpy(pDb->aSnapshot, pShm->aSnap2, nInt*sizeof(u32));
886       if( ckptChecksumOk(pDb->aSnapshot) ){
887         if( piRead ) *piRead = 2;
888         return LSM_OK;
889       }
890     }
891 
892     lsmShmBarrier(pDb);
893   }
894   return LSM_PROTOCOL_BKPT;
895 }
896 
/*
** Load the current client snapshot using lsmCheckpointLoad() and, if that
** succeeds, set *piCmpId to the compression scheme id stored in its header.
** The connection must not currently hold a client or worker snapshot
** (asserted).
**
** Returns LSM_OK on success. If the load fails, its error code is returned
** and *piCmpId is left untouched.
*/
int lsmInfoCompressionId(lsm_db *db, u32 *piCmpId){
  int rc;

  assert( db->pClient==0 && db->pWorker==0 );

  rc = lsmCheckpointLoad(db, 0);
  if( rc!=LSM_OK ) return rc;

  *piCmpId = db->aSnapshot[CKPT_HDR_CMPID];
  return rc;
}
908 
/*
** Return true if the checkpoint id of the snapshot in pDb->aSnapshot[] is
** the same as that of shared-memory snapshot iSnap (1 for aSnap1[], 2 for
** aSnap2[]), or false otherwise.
*/
int lsmCheckpointLoadOk(lsm_db *pDb, int iSnap){
  ShmHeader *pHdr = pDb->pShmhdr;
  u32 *aShared;

  assert( iSnap==1 || iSnap==2 );
  if( iSnap==1 ){
    aShared = pHdr->aSnap1;
  }else{
    aShared = pHdr->aSnap2;
  }

  return (lsmCheckpointId(pDb->aSnapshot, 0)==lsmCheckpointId(aShared, 0) );
}
915 
/*
** Return true if the connection has a client snapshot (pDb->pClient) and
** the id of that snapshot matches the checkpoint ids of all three of
** pDb->aSnapshot[], ShmHeader.aSnap1[] and ShmHeader.aSnap2[]. Return
** false otherwise.
*/
int lsmCheckpointClientCacheOk(lsm_db *pDb){
  if( !pDb->pClient ) return 0;

  if( pDb->pClient->iId!=lsmCheckpointId(pDb->aSnapshot, 0) ) return 0;
  if( pDb->pClient->iId!=lsmCheckpointId(pDb->pShmhdr->aSnap1, 0) ) return 0;

  return ( pDb->pClient->iId==lsmCheckpointId(pDb->pShmhdr->aSnap2, 0) );
}
923 
/*
** Load the worker snapshot from shared memory into pDb->pWorker.
**
** The caller must hold an exclusive lock on either LSM_LOCK_WORKER or
** LSM_LOCK_DMS1 (asserted).
**
** If the two shared-memory snapshots are not identical they are repaired
** first: whichever passes ckptChecksumOk() (aSnap1[] is checked first) is
** copied over the other. If neither does, LSM_PROTOCOL_BKPT is returned.
**
** aSnap1[] is then deserialized, free-list included, and the compression
** id of the new snapshot checked with lsmCheckCompressionId(). Returns
** LSM_OK if successful, or an LSM error code otherwise.
*/
int lsmCheckpointLoadWorker(lsm_db *pDb){
  ShmHeader *pHdr = pDb->pShmhdr;
  int nSnap1;                     /* Size of aSnap1[] in u32 values */
  int nSnap2;                     /* Size of aSnap2[] in u32 values */
  int bMismatch;                  /* True if aSnap1[] and aSnap2[] differ */
  int rc;

  assert(
      lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL)
   || lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL)
  );

  /* Determine whether or not the two snapshots match. The contents are
  ** only compared if the declared sizes are the same. */
  nSnap1 = pHdr->aSnap1[CKPT_HDR_NCKPT];
  nSnap2 = pHdr->aSnap2[CKPT_HDR_NCKPT];
  bMismatch = (nSnap1!=nSnap2);
  if( bMismatch==0 ){
    bMismatch = (0!=memcmp(pHdr->aSnap1, pHdr->aSnap2, nSnap2*sizeof(u32)));
  }

  /* Repair the snapshots if required. */
  if( bMismatch ){
    if( ckptChecksumOk(pHdr->aSnap1) ){
      memcpy(pHdr->aSnap2, pHdr->aSnap1, sizeof(u32)*nSnap1);
    }else if( ckptChecksumOk(pHdr->aSnap2) ){
      memcpy(pHdr->aSnap1, pHdr->aSnap2, sizeof(u32)*nSnap2);
    }else{
      return LSM_PROTOCOL_BKPT;
    }
  }

  rc = lsmCheckpointDeserialize(pDb, 1, pHdr->aSnap1, &pDb->pWorker);
  if( pDb->pWorker ){
    pDb->pWorker->pDatabase = pDb->pDatabase;
  }
  if( rc==LSM_OK ){
    rc = lsmCheckCompressionId(pDb, pDb->pWorker->iCmpId);
  }

#if 0
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
#endif
  return rc;
}
961 
/*
** Allocate a new Snapshot object and populate it from the checkpoint blob
** aCkpt[], an array of u32 values in machine byte order.
**
**   pDb            Database handle (supplies the allocator environment).
**   bInclFreelist  If true, the free-list is deserialized as well.
**   aCkpt          The checkpoint blob to read.
**   ppSnap         OUT: the new snapshot, or NULL if an error occurs.
**
** If successful, LSM_OK is returned and *ppSnap is set to point to the new
** object. Otherwise an LSM error code is returned, any partially built
** snapshot is released with lsmFreeSnapshot() and *ppSnap is set to NULL.
*/
int lsmCheckpointDeserialize(
  lsm_db *pDb,
  int bInclFreelist,              /* If true, deserialize free-list */
  u32 *aCkpt,
  Snapshot **ppSnap
){
  int rc = LSM_OK;
  Snapshot *pNew;

  pNew = (Snapshot *)lsmMallocZeroRc(pDb->pEnv, sizeof(Snapshot), &rc);
  if( rc==LSM_OK ){
    int nFree;
    int i;
    int nLevel = (int)aCkpt[CKPT_HDR_NLEVEL];
    int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE;

    pNew->iId = lsmCheckpointId(aCkpt, 0);
    pNew->nBlock = aCkpt[CKPT_HDR_NBLOCK];
    pNew->nWrite = aCkpt[CKPT_HDR_NWRITE];
    rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel);
    pNew->iLogOff = lsmCheckpointLogOffset(aCkpt);
    pNew->iCmpId = aCkpt[CKPT_HDR_CMPID];

    /* Make a copy of the append-list */
    for(i=0; i<LSM_APPLIST_SZ; i++){
      u32 *a = &aCkpt[CKPT_HDR_SIZE + CKPT_LOGPTR_SIZE + i*2];
      pNew->aiAppend[i] = ckptRead64(a);
    }

    /* Read the block-redirect list. This is skipped if ckptLoadLevels()
    ** failed: in that case iIn may not have been advanced past the level
    ** records and the level list may be empty, so neither reading
    ** aCkpt[iIn] nor walking pNew->pLevel is safe.
    **
    ** NOTE(review): redirect.n is not checked against
    ** LSM_MAX_BLOCK_REDIRECTS before the array is filled. This assumes the
    ** blob was validated by the caller - TODO confirm.  */
    if( rc==LSM_OK ){
      pNew->redirect.n = aCkpt[iIn++];
      if( pNew->redirect.n ){
        pNew->redirect.a = lsmMallocZeroRc(pDb->pEnv,
            (sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS), &rc
        );
      }
    }
    if( rc==LSM_OK && pNew->redirect.n ){
      Level *pLvl = pNew->pLevel;

      for(i=0; i<pNew->redirect.n; i++){
        pNew->redirect.a[i].iFrom = aCkpt[iIn++];
        pNew->redirect.a[i].iTo = aCkpt[iIn++];
      }

      /* Attach the redirect list to the final segment of the last level in
      ** the list. There is nothing to attach it to if the checkpoint
      ** contains no levels at all. */
      if( pLvl ){
        while( pLvl->pNext ) pLvl = pLvl->pNext;
        if( pLvl->nRight ){
          pLvl->aRhs[pLvl->nRight-1].pRedirect = &pNew->redirect;
        }else{
          pLvl->lhs.pRedirect = &pNew->redirect;
        }
      }
    }

    /* Copy the free-list */
    if( rc==LSM_OK && bInclFreelist ){
      nFree = aCkpt[iIn++];
      if( nFree ){
        pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc(
            pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc
        );
        if( rc==LSM_OK ){
          int j;
          for(j=0; j<nFree; j++){
            FreelistEntry *p = &pNew->freelist.aEntry[j];
            /* Each entry is three integers: block number, id MSW, id LSW */
            p->iBlk = aCkpt[iIn++];
            p->iId = ((i64)(aCkpt[iIn])<<32) + aCkpt[iIn+1];
            iIn += 2;
          }
          pNew->freelist.nEntry = pNew->freelist.nAlloc = nFree;
        }
      }
    }
  }

  if( rc!=LSM_OK ){
    lsmFreeSnapshot(pDb->pEnv, pNew);
    pNew = 0;
  }

  *ppSnap = pNew;
  return rc;
}
1041 
1042 /*
1043 ** Connection pDb must be the worker connection in order to call this
1044 ** function. It returns true if the database already contains the maximum
1045 ** number of levels or false otherwise.
1046 **
1047 ** This is used when flushing the in-memory tree to disk. If the database
1048 ** is already full, then the caller should invoke lsm_work() or similar
1049 ** until it is not full before creating a new level by flushing the in-memory
1050 ** tree to disk. Limiting the number of levels in the database ensures that
1051 ** the records describing them always fit within the checkpoint blob.
1052 */
lsmDatabaseFull(lsm_db * pDb)1053 int lsmDatabaseFull(lsm_db *pDb){
1054   Level *p;
1055   int nRhs = 0;
1056 
1057   assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );
1058   assert( pDb->pWorker );
1059 
1060   for(p=pDb->pWorker->pLevel; p; p=p->pNext){
1061     nRhs += (p->nRight ? p->nRight : 1);
1062   }
1063 
1064   return (nRhs >= LSM_MAX_RHS_SEGMENTS);
1065 }
1066 
1067 /*
1068 ** The connection passed as the only argument is currently the worker
1069 ** connection. Some work has been performed on the database by the connection,
1070 ** but no new snapshot has been written into shared memory.
1071 **
1072 ** This function updates the shared-memory worker and client snapshots with
1073 ** the new snapshot produced by the work performed by pDb.
1074 **
1075 ** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM
1076 ** error code is returned.
1077 */
lsmCheckpointSaveWorker(lsm_db * pDb,int bFlush)1078 int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){
1079   Snapshot *pSnap = pDb->pWorker;
1080   ShmHeader *pShm = pDb->pShmhdr;
1081   void *p = 0;
1082   int n = 0;
1083   int rc;
1084 
1085   pSnap->iId++;
1086   rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId, 1, &p, &n);
1087   if( rc!=LSM_OK ) return rc;
1088   assert( ckptChecksumOk((u32 *)p) );
1089 
1090   assert( n<=LSM_META_RW_PAGE_SIZE );
1091   memcpy(pShm->aSnap2, p, n);
1092   lsmShmBarrier(pDb);
1093   memcpy(pShm->aSnap1, p, n);
1094   lsmFree(pDb->pEnv, p);
1095 
1096   /* assert( lsmFsIntegrityCheck(pDb) ); */
1097   return LSM_OK;
1098 }
1099 
1100 /*
1101 ** This function is used to determine the snapshot-id of the most recently
1102 ** checkpointed snapshot. Variable ShmHeader.iMetaPage indicates which of
1103 ** the two meta-pages said snapshot resides on (if any).
1104 **
1105 ** If successful, this function loads the snapshot from the meta-page,
1106 ** verifies its checksum and sets *piId to the snapshot-id before returning
1107 ** LSM_OK. Or, if the checksum attempt fails, *piId is set to zero and
1108 ** LSM_OK returned. If an error occurs, an LSM error code is returned and
1109 ** the final value of *piId is undefined.
1110 */
lsmCheckpointSynced(lsm_db * pDb,i64 * piId,i64 * piLog,u32 * pnWrite)1111 int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite){
1112   int rc = LSM_OK;
1113   MetaPage *pPg;
1114   u32 iMeta;
1115 
1116   iMeta = pDb->pShmhdr->iMetaPage;
1117   if( iMeta==1 || iMeta==2 ){
1118     rc = lsmFsMetaPageGet(pDb->pFS, 0, iMeta, &pPg);
1119     if( rc==LSM_OK ){
1120       int nCkpt;
1121       int nData;
1122       u8 *aData;
1123 
1124       aData = lsmFsMetaPageData(pPg, &nData);
1125       assert( nData==LSM_META_RW_PAGE_SIZE );
1126       nCkpt = lsmGetU32(&aData[CKPT_HDR_NCKPT*sizeof(u32)]);
1127       if( nCkpt<(LSM_META_RW_PAGE_SIZE/sizeof(u32)) ){
1128         u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc);
1129         if( aCopy ){
1130           memcpy(aCopy, aData, nCkpt*sizeof(u32));
1131           ckptChangeEndianness(aCopy, nCkpt);
1132           if( ckptChecksumOk(aCopy) ){
1133             if( piId ) *piId = lsmCheckpointId(aCopy, 0);
1134             if( piLog ) *piLog = (lsmCheckpointLogOffset(aCopy) >> 1);
1135             if( pnWrite ) *pnWrite = aCopy[CKPT_HDR_NWRITE];
1136           }
1137           lsmFree(pDb->pEnv, aCopy);
1138         }
1139       }
1140       lsmFsMetaPageRelease(pPg);
1141     }
1142   }
1143 
1144   if( (iMeta!=1 && iMeta!=2) || rc!=LSM_OK || pDb->pShmhdr->iMetaPage!=iMeta ){
1145     if( piId ) *piId = 0;
1146     if( piLog ) *piLog = 0;
1147     if( pnWrite ) *pnWrite = 0;
1148   }
1149   return rc;
1150 }
1151 
1152 /*
1153 ** Return the checkpoint-id of the checkpoint array passed as the first
1154 ** argument to this function. If the second argument is true, then assume
1155 ** that the checkpoint is made up of 32-bit big-endian integers. If it
1156 ** is false, assume that the integers are in machine byte order.
1157 */
lsmCheckpointId(u32 * aCkpt,int bDisk)1158 i64 lsmCheckpointId(u32 *aCkpt, int bDisk){
1159   i64 iId;
1160   if( bDisk ){
1161     u8 *aData = (u8 *)aCkpt;
1162     iId = (((i64)lsmGetU32(&aData[CKPT_HDR_ID_MSW*4])) << 32);
1163     iId += ((i64)lsmGetU32(&aData[CKPT_HDR_ID_LSW*4]));
1164   }else{
1165     iId = ((i64)aCkpt[CKPT_HDR_ID_MSW] << 32) + (i64)aCkpt[CKPT_HDR_ID_LSW];
1166   }
1167   return iId;
1168 }
1169 
/*
** Return the value of the CKPT_HDR_NBLOCK header field of checkpoint
** aCkpt[], which must be in machine byte order.
*/
u32 lsmCheckpointNBlock(u32 *aCkpt){
  u32 nBlock = aCkpt[CKPT_HDR_NBLOCK];
  return nBlock;
}
1173 
/*
** Return the value of the CKPT_HDR_NWRITE header field of checkpoint
** aCkpt[]. If bDisk is true the field is decoded with lsmGetU32().
** Otherwise, it is read directly as a machine byte order integer.
*/
u32 lsmCheckpointNWrite(u32 *aCkpt, int bDisk){
  u32 *pField = &aCkpt[CKPT_HDR_NWRITE];

  if( !bDisk ) return *pField;
  return lsmGetU32((u8 *)pField);
}
1181 
lsmCheckpointLogOffset(u32 * aCkpt)1182 i64 lsmCheckpointLogOffset(u32 *aCkpt){
1183   return ((i64)aCkpt[CKPT_HDR_LO_MSW] << 32) + (i64)aCkpt[CKPT_HDR_LO_LSW];
1184 }
1185 
/*
** Return the CKPT_HDR_PGSZ header field of checkpoint aCkpt[] as an int.
*/
int lsmCheckpointPgsz(u32 *aCkpt){
  int nPgsz = (int)aCkpt[CKPT_HDR_PGSZ];
  return nPgsz;
}
1187 
/*
** Return the CKPT_HDR_BLKSZ header field of checkpoint aCkpt[] as an int.
*/
int lsmCheckpointBlksz(u32 *aCkpt){
  int nBlksz = (int)aCkpt[CKPT_HDR_BLKSZ];
  return nBlksz;
}
1189 
/*
** Initialize the following fields of *pLog from checkpoint aCkpt[]
** (machine byte order):
**
**   aRegion[2].iStart   The stored log offset shifted right by one bit.
**   cksum0, cksum1      The CKPT_HDR_LO_CKSUM1 and CKPT_HDR_LO_CKSUM2
**                       fields, respectively.
**   iSnapshotId         The checkpoint id.
*/
void lsmCheckpointLogoffset(
  u32 *aCkpt,
  DbLog *pLog
){
  i64 iStored = lsmCheckpointLogOffset(aCkpt);

  pLog->aRegion[2].iStart = (iStored >> 1);
  pLog->cksum0 = aCkpt[CKPT_HDR_LO_CKSUM1];
  pLog->cksum1 = aCkpt[CKPT_HDR_LO_CKSUM2];
  pLog->iSnapshotId = lsmCheckpointId(aCkpt, 0);
}
1200 
/*
** Set the log offset stored in pDb->aSnapshot[] to zero, recalculate the
** checksum stored in the final two integers of the snapshot, and copy the
** result over both ShmHeader.aSnap1[] and ShmHeader.aSnap2[].
**
** When this function is called, pDb->aSnapshot[] must be identical to both
** shared-memory snapshots (asserted).
*/
void lsmCheckpointZeroLogoffset(lsm_db *pDb){
  ShmHeader *pHdr = pDb->pShmhdr;
  u32 *aSnap = pDb->aSnapshot;
  u32 nCkpt = aSnap[CKPT_HDR_NCKPT];

  assert( nCkpt>CKPT_HDR_NCKPT );
  assert( nCkpt==pHdr->aSnap1[CKPT_HDR_NCKPT] );
  assert( 0==memcmp(aSnap, pHdr->aSnap1, nCkpt*sizeof(u32)) );
  assert( 0==memcmp(aSnap, pHdr->aSnap2, nCkpt*sizeof(u32)) );

  aSnap[CKPT_HDR_LO_MSW] = 0;
  aSnap[CKPT_HDR_LO_LSW] = 0;
  ckptChecksum(aSnap, nCkpt, &aSnap[nCkpt-2], &aSnap[nCkpt-1]);

  memcpy(pHdr->aSnap1, aSnap, nCkpt*sizeof(u32));
  memcpy(pHdr->aSnap2, aSnap, nCkpt*sizeof(u32));
}
1219 
1220 /*
1221 ** Set the output variable to the number of KB of data written into the
1222 ** database file since the most recent checkpoint.
1223 */
lsmCheckpointSize(lsm_db * db,int * pnKB)1224 int lsmCheckpointSize(lsm_db *db, int *pnKB){
1225   int rc = LSM_OK;
1226   u32 nSynced;
1227 
1228   /* Set nSynced to the number of pages that had been written when the
1229   ** database was last checkpointed. */
1230   rc = lsmCheckpointSynced(db, 0, 0, &nSynced);
1231 
1232   if( rc==LSM_OK ){
1233     u32 nPgsz = db->pShmhdr->aSnap1[CKPT_HDR_PGSZ];
1234     u32 nWrite = db->pShmhdr->aSnap1[CKPT_HDR_NWRITE];
1235     *pnKB = (int)(( ((i64)(nWrite - nSynced) * nPgsz) + 1023) / 1024);
1236   }
1237 
1238   return rc;
1239 }
1240