1 /*
2 ** 2011-08-14
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 **
13 ** PAGE FORMAT:
14 **
15 **   The maximum page size is 65536 bytes.
16 **
17 **   Since all records are equal to or larger than 2 bytes in size, and
18 **   some space within the page is consumed by the page footer, there must
19 **   be less than 2^15 records on each page.
20 **
21 **   Each page ends with a footer that describes the pages contents. This
22 **   footer serves as similar purpose to the page header in an SQLite database.
23 **   A footer is used instead of a header because it makes it easier to
24 **   populate a new page based on a sorted list of key/value pairs.
25 **
26 **   The footer consists of the following values (starting at the end of
27 **   the page and continuing backwards towards the start). All values are
28 **   stored as unsigned big-endian integers.
29 **
30 **     * Number of records on page (2 bytes).
31 **     * Flags field (2 bytes).
32 **     * Left-hand pointer value (8 bytes).
33 **     * The starting offset of each record (2 bytes per record).
34 **
35 **   Records may span pages. Unless it happens to be an exact fit, the part
36 **   of the final record that starts on page X that does not fit on page X
37 **   is stored at the start of page (X+1). This means there may be pages where
38 **   (N==0). And on most pages the first record that starts on the page will
39 **   not start at byte offset 0. For example:
40 **
41 **      aaaaa bbbbb ccc <footer>    cc eeeee fffff g <footer>    gggg....
42 **
43 ** RECORD FORMAT:
44 **
45 **   The first byte of the record is a flags byte. It is a combination
46 **   of the following flags (defined in lsmInt.h):
47 **
48 **       LSM_START_DELETE
49 **       LSM_END_DELETE
50 **       LSM_POINT_DELETE
51 **       LSM_INSERT
52 **       LSM_SEPARATOR
53 **       LSM_SYSTEMKEY
54 **
55 **   Immediately following the type byte is a pointer to the smallest key
56 **   in the next file that is larger than the key in the current record. The
57 **   pointer is encoded as a varint. When added to the 32-bit page number
58 **   stored in the footer, it is the page number of the page that contains the
59 **   smallest key in the next sorted file that is larger than this key.
60 **
61 **   Next is the number of bytes in the key, encoded as a varint.
62 **
63 **   If the LSM_INSERT flag is set, the number of bytes in the value, as
64 **   a varint, is next.
65 **
66 **   Finally, the blob of data containing the key, and for LSM_INSERT
67 **   records, the value as well.
68 */
69 
70 #ifndef _LSM_INT_H
71 # include "lsmInt.h"
72 #endif
73 
74 #define LSM_LOG_STRUCTURE 0
75 #define LSM_LOG_DATA      0
76 
77 /*
78 ** Macros to help decode record types.
79 */
80 #define rtTopic(eType)       ((eType) & LSM_SYSTEMKEY)
81 #define rtIsDelete(eType)    (((eType) & 0x0F)==LSM_POINT_DELETE)
82 
83 #define rtIsSeparator(eType) (((eType) & LSM_SEPARATOR)!=0)
84 #define rtIsWrite(eType)     (((eType) & LSM_INSERT)!=0)
85 #define rtIsSystem(eType)    (((eType) & LSM_SYSTEMKEY)!=0)
86 
87 /*
88 ** The following macros are used to access a page footer.
89 */
90 #define SEGMENT_NRECORD_OFFSET(pgsz)        ((pgsz) - 2)
91 #define SEGMENT_FLAGS_OFFSET(pgsz)          ((pgsz) - 2 - 2)
92 #define SEGMENT_POINTER_OFFSET(pgsz)        ((pgsz) - 2 - 2 - 8)
93 #define SEGMENT_CELLPTR_OFFSET(pgsz, iCell) ((pgsz) - 2 - 2 - 8 - 2 - (iCell)*2)
94 
95 #define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, nEntry)
96 
97 #define SEGMENT_BTREE_FLAG     0x0001
98 #define PGFTR_SKIP_NEXT_FLAG   0x0002
99 #define PGFTR_SKIP_THIS_FLAG   0x0004
100 
101 
102 #ifndef LSM_SEGMENTPTR_FREE_THRESHOLD
103 # define LSM_SEGMENTPTR_FREE_THRESHOLD 1024
104 #endif
105 
106 typedef struct SegmentPtr SegmentPtr;
107 typedef struct Blob Blob;
108 
109 struct Blob {
110   lsm_env *pEnv;
111   void *pData;
112   int nData;
113   int nAlloc;
114 };
115 
116 /*
117 ** A SegmentPtr object may be used for one of two purposes:
118 **
119 **   * To iterate and/or seek within a single Segment (the combination of a
120 **     main run and an optional sorted run).
121 **
122 **   * To iterate through the separators array of a segment.
123 */
124 struct SegmentPtr {
125   Level *pLevel;                /* Level object segment is part of */
126   Segment *pSeg;                /* Segment to access */
127 
128   /* Current page. See segmentPtrLoadPage(). */
129   Page *pPg;                    /* Current page */
130   u16 flags;                    /* Copy of page flags field */
131   int nCell;                    /* Number of cells on pPg */
132   Pgno iPtr;                    /* Base cascade pointer */
133 
134   /* Current cell. See segmentPtrLoadCell() */
135   int iCell;                    /* Current record within page pPg */
136   int eType;                    /* Type of current record */
137   Pgno iPgPtr;                  /* Cascade pointer offset */
138   void *pKey; int nKey;         /* Key associated with current record */
139   void *pVal; int nVal;         /* Current record value (eType==WRITE only) */
140 
141   /* Blobs used to allocate buffers for pKey and pVal as required */
142   Blob blob1;
143   Blob blob2;
144 };
145 
146 /*
147 ** Used to iterate through the keys stored in a b-tree hierarchy from start
148 ** to finish. Only First() and Next() operations are required.
149 **
150 **   btreeCursorNew()
151 **   btreeCursorFirst()
152 **   btreeCursorNext()
153 **   btreeCursorFree()
154 **   btreeCursorPosition()
155 **   btreeCursorRestore()
156 */
157 typedef struct BtreePg BtreePg;
158 typedef struct BtreeCursor BtreeCursor;
159 struct BtreePg {
160   Page *pPage;
161   int iCell;
162 };
163 struct BtreeCursor {
164   Segment *pSeg;                  /* Iterate through this segments btree */
165   FileSystem *pFS;                /* File system to read pages from */
166   int nDepth;                     /* Allocated size of aPg[] */
167   int iPg;                        /* Current entry in aPg[]. -1 -> EOF. */
168   BtreePg *aPg;                   /* Pages from root to current location */
169 
170   /* Cache of current entry. pKey==0 for EOF. */
171   void *pKey;
172   int nKey;
173   int eType;
174   Pgno iPtr;
175 
176   /* Storage for key, if not local */
177   Blob blob;
178 };
179 
180 
181 /*
182 ** A cursor used for merged searches or iterations through up to one
183 ** Tree structure and any number of sorted files.
184 **
185 **   lsmMCursorNew()
186 **   lsmMCursorSeek()
187 **   lsmMCursorNext()
188 **   lsmMCursorPrev()
189 **   lsmMCursorFirst()
190 **   lsmMCursorLast()
191 **   lsmMCursorKey()
192 **   lsmMCursorValue()
193 **   lsmMCursorValid()
194 **
195 ** iFree:
196 **   This variable is only used by cursors providing input data for a
197 **   new top-level segment. Such cursors only ever iterate forwards, not
198 **   backwards.
199 */
200 struct MultiCursor {
201   lsm_db *pDb;                    /* Connection that owns this cursor */
202   MultiCursor *pNext;             /* Next cursor owned by connection pDb */
203   int flags;                      /* Mask of CURSOR_XXX flags */
204 
205   int eType;                      /* Cache of current key type */
206   Blob key;                       /* Cache of current key (or NULL) */
207   Blob val;                       /* Cache of current value */
208 
209   /* All the component cursors: */
210   TreeCursor *apTreeCsr[2];       /* Up to two tree cursors */
211   int iFree;                      /* Next element of free-list (-ve for eof) */
212   SegmentPtr *aPtr;               /* Array of segment pointers */
213   int nPtr;                       /* Size of array aPtr[] */
214   BtreeCursor *pBtCsr;            /* b-tree cursor (db writes only) */
215 
216   /* Comparison results */
217   int nTree;                      /* Size of aTree[] array */
218   int *aTree;                     /* Array of comparison results */
219 
220   /* Used by cursors flushing the in-memory tree only */
221   void *pSystemVal;               /* Pointer to buffer to free */
222 
223   /* Used by worker cursors only */
224   Pgno *pPrevMergePtr;
225 };
226 
227 /*
228 ** The following constants are used to assign integers to each component
229 ** cursor of a multi-cursor.
230 */
231 #define CURSOR_DATA_TREE0     0   /* Current tree cursor (apTreeCsr[0]) */
232 #define CURSOR_DATA_TREE1     1   /* The "old" tree, if any (apTreeCsr[1]) */
233 #define CURSOR_DATA_SYSTEM    2   /* Free-list entries (new-toplevel only) */
234 #define CURSOR_DATA_SEGMENT   3   /* First segment pointer (aPtr[0]) */
235 
236 /*
237 ** CURSOR_IGNORE_DELETE
238 **   If set, this cursor will not visit SORTED_DELETE keys.
239 **
240 ** CURSOR_FLUSH_FREELIST
241 **   This cursor is being used to create a new toplevel. It should also
242 **   iterate through the contents of the in-memory free block list.
243 **
244 ** CURSOR_IGNORE_SYSTEM
245 **   If set, this cursor ignores system keys.
246 **
247 ** CURSOR_NEXT_OK
248 **   Set if it is Ok to call lsm_csr_next().
249 **
250 ** CURSOR_PREV_OK
251 **   Set if it is Ok to call lsm_csr_prev().
252 **
253 ** CURSOR_READ_SEPARATORS
254 **   Set if this cursor should visit the separator keys in segment
255 **   aPtr[nPtr-1].
256 **
257 ** CURSOR_SEEK_EQ
258 **   Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation.
259 **   The key and value are stored in MultiCursor.key and MultiCursor.val
260 **   respectively.
261 */
262 #define CURSOR_IGNORE_DELETE    0x00000001
263 #define CURSOR_FLUSH_FREELIST   0x00000002
264 #define CURSOR_IGNORE_SYSTEM    0x00000010
265 #define CURSOR_NEXT_OK          0x00000020
266 #define CURSOR_PREV_OK          0x00000040
267 #define CURSOR_READ_SEPARATORS  0x00000080
268 #define CURSOR_SEEK_EQ          0x00000100
269 
270 typedef struct MergeWorker MergeWorker;
271 typedef struct Hierarchy Hierarchy;
272 
273 struct Hierarchy {
274   Page **apHier;
275   int nHier;
276 };
277 
278 /*
279 ** aSave:
280 **   When mergeWorkerNextPage() is called to advance to the next page in
281 **   the output segment, if the bStore flag for an element of aSave[] is
282 **   true, it is cleared and the corresponding iPgno value is set to the
283 **   page number of the page just completed.
284 **
285 **   aSave[0] is used to record the pointer value to be pushed into the
286 **   b-tree hierarchy. aSave[1] is used to save the page number of the
287 **   page containing the indirect key most recently written to the b-tree.
288 **   see mergeWorkerPushHierarchy() for details.
289 */
290 struct MergeWorker {
291   lsm_db *pDb;                    /* Database handle */
292   Level *pLevel;                  /* Worker snapshot Level being merged */
293   MultiCursor *pCsr;              /* Cursor to read new segment contents from */
294   int bFlush;                     /* True if this is an in-memory tree flush */
295   Hierarchy hier;                 /* B-tree hierarchy under construction */
296   Page *pPage;                    /* Current output page */
297   int nWork;                      /* Number of calls to mergeWorkerNextPage() */
298   Pgno *aGobble;                  /* Gobble point for each input segment */
299 
300   Pgno iIndirect;
301   struct SavedPgno {
302     Pgno iPgno;
303     int bStore;
304   } aSave[2];
305 };
306 
307 #ifdef LSM_DEBUG_EXPENSIVE
308 static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
309 static int assertBtreeOk(lsm_db *, Segment *);
310 static void assertRunInOrder(lsm_db *pDb, Segment *pSeg);
311 #else
312 #define assertRunInOrder(x,y)
313 #define assertBtreeOk(x,y)
314 #endif
315 
316 
317 struct FilePage { u8 *aData; int nData; };
fsPageData(Page * pPg,int * pnData)318 static u8 *fsPageData(Page *pPg, int *pnData){
319   *pnData = ((struct FilePage *)(pPg))->nData;
320   return ((struct FilePage *)(pPg))->aData;
321 }
322 /*UNUSED static u8 *fsPageDataPtr(Page *pPg){
323   return ((struct FilePage *)(pPg))->aData;
324 }*/
325 
326 /*
327 ** Write nVal as a 16-bit unsigned big-endian integer into buffer aOut.
328 */
lsmPutU16(u8 * aOut,u16 nVal)329 void lsmPutU16(u8 *aOut, u16 nVal){
330   aOut[0] = (u8)((nVal>>8) & 0xFF);
331   aOut[1] = (u8)(nVal & 0xFF);
332 }
333 
lsmPutU32(u8 * aOut,u32 nVal)334 void lsmPutU32(u8 *aOut, u32 nVal){
335   aOut[0] = (u8)((nVal>>24) & 0xFF);
336   aOut[1] = (u8)((nVal>>16) & 0xFF);
337   aOut[2] = (u8)((nVal>> 8) & 0xFF);
338   aOut[3] = (u8)((nVal    ) & 0xFF);
339 }
340 
lsmGetU16(u8 * aOut)341 int lsmGetU16(u8 *aOut){
342   return (aOut[0] << 8) + aOut[1];
343 }
344 
lsmGetU32(u8 * aOut)345 u32 lsmGetU32(u8 *aOut){
346   return ((u32)aOut[0] << 24)
347        + ((u32)aOut[1] << 16)
348        + ((u32)aOut[2] << 8)
349        + ((u32)aOut[3]);
350 }
351 
lsmGetU64(u8 * aOut)352 u64 lsmGetU64(u8 *aOut){
353   return ((u64)aOut[0] << 56)
354        + ((u64)aOut[1] << 48)
355        + ((u64)aOut[2] << 40)
356        + ((u64)aOut[3] << 32)
357        + ((u64)aOut[4] << 24)
358        + ((u32)aOut[5] << 16)
359        + ((u32)aOut[6] << 8)
360        + ((u32)aOut[7]);
361 }
362 
lsmPutU64(u8 * aOut,u64 nVal)363 void lsmPutU64(u8 *aOut, u64 nVal){
364   aOut[0] = (u8)((nVal>>56) & 0xFF);
365   aOut[1] = (u8)((nVal>>48) & 0xFF);
366   aOut[2] = (u8)((nVal>>40) & 0xFF);
367   aOut[3] = (u8)((nVal>>32) & 0xFF);
368   aOut[4] = (u8)((nVal>>24) & 0xFF);
369   aOut[5] = (u8)((nVal>>16) & 0xFF);
370   aOut[6] = (u8)((nVal>> 8) & 0xFF);
371   aOut[7] = (u8)((nVal    ) & 0xFF);
372 }
373 
sortedBlobGrow(lsm_env * pEnv,Blob * pBlob,int nData)374 static int sortedBlobGrow(lsm_env *pEnv, Blob *pBlob, int nData){
375   assert( pBlob->pEnv==pEnv || (pBlob->pEnv==0 && pBlob->pData==0) );
376   if( pBlob->nAlloc<nData ){
377     pBlob->pData = lsmReallocOrFree(pEnv, pBlob->pData, nData);
378     if( !pBlob->pData ) return LSM_NOMEM_BKPT;
379     pBlob->nAlloc = nData;
380     pBlob->pEnv = pEnv;
381   }
382   return LSM_OK;
383 }
384 
sortedBlobSet(lsm_env * pEnv,Blob * pBlob,void * pData,int nData)385 static int sortedBlobSet(lsm_env *pEnv, Blob *pBlob, void *pData, int nData){
386   if( sortedBlobGrow(pEnv, pBlob, nData) ) return LSM_NOMEM;
387   memcpy(pBlob->pData, pData, nData);
388   pBlob->nData = nData;
389   return LSM_OK;
390 }
391 
392 #if 0
393 static int sortedBlobCopy(Blob *pDest, Blob *pSrc){
394   return sortedBlobSet(pDest, pSrc->pData, pSrc->nData);
395 }
396 #endif
397 
sortedBlobFree(Blob * pBlob)398 static void sortedBlobFree(Blob *pBlob){
399   assert( pBlob->pEnv || pBlob->pData==0 );
400   if( pBlob->pData ) lsmFree(pBlob->pEnv, pBlob->pData);
401   memset(pBlob, 0, sizeof(Blob));
402 }
403 
sortedReadData(Segment * pSeg,Page * pPg,int iOff,int nByte,void ** ppData,Blob * pBlob)404 static int sortedReadData(
405   Segment *pSeg,
406   Page *pPg,
407   int iOff,
408   int nByte,
409   void **ppData,
410   Blob *pBlob
411 ){
412   int rc = LSM_OK;
413   int iEnd;
414   int nData;
415   int nCell;
416   u8 *aData;
417 
418   aData = fsPageData(pPg, &nData);
419   nCell = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
420   iEnd = SEGMENT_EOF(nData, nCell);
421   assert( iEnd>0 && iEnd<nData );
422 
423   if( iOff+nByte<=iEnd ){
424     *ppData = (void *)&aData[iOff];
425   }else{
426     int nRem = nByte;
427     int i = iOff;
428     u8 *aDest;
429 
430     /* Make sure the blob is big enough to store the value being loaded. */
431     rc = sortedBlobGrow(lsmPageEnv(pPg), pBlob, nByte);
432     if( rc!=LSM_OK ) return rc;
433     pBlob->nData = nByte;
434     aDest = (u8 *)pBlob->pData;
435     *ppData = pBlob->pData;
436 
437     /* Increment the pointer pages ref-count. */
438     lsmFsPageRef(pPg);
439 
440     while( rc==LSM_OK ){
441       Page *pNext;
442       int flags;
443 
444       /* Copy data from pPg into the output buffer. */
445       int nCopy = LSM_MIN(nRem, iEnd-i);
446       if( nCopy>0 ){
447         memcpy(&aDest[nByte-nRem], &aData[i], nCopy);
448         nRem -= nCopy;
449         i += nCopy;
450         assert( nRem==0 || i==iEnd );
451       }
452       assert( nRem>=0 );
453       if( nRem==0 ) break;
454       i -= iEnd;
455 
456       /* Grab the next page in the segment */
457 
458       do {
459         rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
460         if( rc==LSM_OK && pNext==0 ){
461           rc = LSM_CORRUPT_BKPT;
462         }
463         if( rc ) break;
464         lsmFsPageRelease(pPg);
465         pPg = pNext;
466         aData = fsPageData(pPg, &nData);
467         flags = lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]);
468       }while( flags&SEGMENT_BTREE_FLAG );
469 
470       iEnd = SEGMENT_EOF(nData, lsmGetU16(&aData[nData-2]));
471       assert( iEnd>0 && iEnd<nData );
472     }
473 
474     lsmFsPageRelease(pPg);
475   }
476 
477   return rc;
478 }
479 
pageGetNRec(u8 * aData,int nData)480 static int pageGetNRec(u8 *aData, int nData){
481   return (int)lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
482 }
483 
pageGetPtr(u8 * aData,int nData)484 static Pgno pageGetPtr(u8 *aData, int nData){
485   return (Pgno)lsmGetU64(&aData[SEGMENT_POINTER_OFFSET(nData)]);
486 }
487 
pageGetFlags(u8 * aData,int nData)488 static int pageGetFlags(u8 *aData, int nData){
489   return (int)lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]);
490 }
491 
pageGetCell(u8 * aData,int nData,int iCell)492 static u8 *pageGetCell(u8 *aData, int nData, int iCell){
493   return &aData[lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, iCell)])];
494 }
495 
496 /*
497 ** Return the number of cells on page pPg.
498 */
pageObjGetNRec(Page * pPg)499 static int pageObjGetNRec(Page *pPg){
500   int nData;
501   u8 *aData = lsmFsPageData(pPg, &nData);
502   return pageGetNRec(aData, nData);
503 }
504 
505 /*
506 ** Return the decoded (possibly relative) pointer value stored in cell
507 ** iCell from page aData/nData.
508 */
pageGetRecordPtr(u8 * aData,int nData,int iCell)509 static Pgno pageGetRecordPtr(u8 *aData, int nData, int iCell){
510   Pgno iRet;                      /* Return value */
511   u8 *aCell;                      /* Pointer to cell iCell */
512 
513   assert( iCell<pageGetNRec(aData, nData) && iCell>=0 );
514   aCell = pageGetCell(aData, nData, iCell);
515   lsmVarintGet64(&aCell[1], &iRet);
516   return iRet;
517 }
518 
pageGetKey(Segment * pSeg,Page * pPg,int iCell,int * piTopic,int * pnKey,Blob * pBlob)519 static u8 *pageGetKey(
520   Segment *pSeg,                  /* Segment pPg belongs to */
521   Page *pPg,                      /* Page to read from */
522   int iCell,                      /* Index of cell on page to read */
523   int *piTopic,                   /* OUT: Topic associated with this key */
524   int *pnKey,                     /* OUT: Size of key in bytes */
525   Blob *pBlob                     /* If required, use this for dynamic memory */
526 ){
527   u8 *pKey;
528   int nDummy;
529   int eType;
530   u8 *aData;
531   int nData;
532 
533   aData = fsPageData(pPg, &nData);
534 
535   assert( !(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );
536   assert( iCell<pageGetNRec(aData, nData) );
537 
538   pKey = pageGetCell(aData, nData, iCell);
539   eType = *pKey++;
540   pKey += lsmVarintGet32(pKey, &nDummy);
541   pKey += lsmVarintGet32(pKey, pnKey);
542   if( rtIsWrite(eType) ){
543     pKey += lsmVarintGet32(pKey, &nDummy);
544   }
545   *piTopic = rtTopic(eType);
546 
547   sortedReadData(pSeg, pPg, pKey-aData, *pnKey, (void **)&pKey, pBlob);
548   return pKey;
549 }
550 
pageGetKeyCopy(lsm_env * pEnv,Segment * pSeg,Page * pPg,int iCell,int * piTopic,Blob * pBlob)551 static int pageGetKeyCopy(
552   lsm_env *pEnv,                  /* Environment handle */
553   Segment *pSeg,                  /* Segment pPg belongs to */
554   Page *pPg,                      /* Page to read from */
555   int iCell,                      /* Index of cell on page to read */
556   int *piTopic,                   /* OUT: Topic associated with this key */
557   Blob *pBlob                     /* If required, use this for dynamic memory */
558 ){
559   int rc = LSM_OK;
560   int nKey;
561   u8 *aKey;
562 
563   aKey = pageGetKey(pSeg, pPg, iCell, piTopic, &nKey, pBlob);
564   assert( (void *)aKey!=pBlob->pData || nKey==pBlob->nData );
565   if( (void *)aKey!=pBlob->pData ){
566     rc = sortedBlobSet(pEnv, pBlob, aKey, nKey);
567   }
568 
569   return rc;
570 }
571 
pageGetBtreeRef(Page * pPg,int iKey)572 static Pgno pageGetBtreeRef(Page *pPg, int iKey){
573   Pgno iRef;
574   u8 *aData;
575   int nData;
576   u8 *aCell;
577 
578   aData = fsPageData(pPg, &nData);
579   aCell = pageGetCell(aData, nData, iKey);
580   assert( aCell[0]==0 );
581   aCell++;
582   aCell += lsmVarintGet64(aCell, &iRef);
583   lsmVarintGet64(aCell, &iRef);
584   assert( iRef>0 );
585   return iRef;
586 }
587 
588 #define GETVARINT64(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet64((a), &(i)))
589 #define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i)))
590 
pageGetBtreeKey(Segment * pSeg,Page * pPg,int iKey,Pgno * piPtr,int * piTopic,void ** ppKey,int * pnKey,Blob * pBlob)591 static int pageGetBtreeKey(
592   Segment *pSeg,                  /* Segment page pPg belongs to */
593   Page *pPg,
594   int iKey,
595   Pgno *piPtr,
596   int *piTopic,
597   void **ppKey,
598   int *pnKey,
599   Blob *pBlob
600 ){
601   u8 *aData;
602   int nData;
603   u8 *aCell;
604   int eType;
605 
606   aData = fsPageData(pPg, &nData);
607   assert( SEGMENT_BTREE_FLAG & pageGetFlags(aData, nData) );
608   assert( iKey>=0 && iKey<pageGetNRec(aData, nData) );
609 
610   aCell = pageGetCell(aData, nData, iKey);
611   eType = *aCell++;
612   aCell += GETVARINT64(aCell, *piPtr);
613 
614   if( eType==0 ){
615     int rc;
616     Pgno iRef;                  /* Page number of referenced page */
617     Page *pRef;
618     aCell += GETVARINT64(aCell, iRef);
619     rc = lsmFsDbPageGet(lsmPageFS(pPg), pSeg, iRef, &pRef);
620     if( rc!=LSM_OK ) return rc;
621     pageGetKeyCopy(lsmPageEnv(pPg), pSeg, pRef, 0, &eType, pBlob);
622     lsmFsPageRelease(pRef);
623     *ppKey = pBlob->pData;
624     *pnKey = pBlob->nData;
625   }else{
626     aCell += GETVARINT32(aCell, *pnKey);
627     *ppKey = aCell;
628   }
629   if( piTopic ) *piTopic = rtTopic(eType);
630 
631   return LSM_OK;
632 }
633 
btreeCursorLoadKey(BtreeCursor * pCsr)634 static int btreeCursorLoadKey(BtreeCursor *pCsr){
635   int rc = LSM_OK;
636   if( pCsr->iPg<0 ){
637     pCsr->pKey = 0;
638     pCsr->nKey = 0;
639     pCsr->eType = 0;
640   }else{
641     Pgno dummy;
642     int iPg = pCsr->iPg;
643     int iCell = pCsr->aPg[iPg].iCell;
644     while( iCell<0 && (--iPg)>=0 ){
645       iCell = pCsr->aPg[iPg].iCell-1;
646     }
647     if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;
648 
649     rc = pageGetBtreeKey(
650         pCsr->pSeg,
651         pCsr->aPg[iPg].pPage, iCell,
652         &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
653     );
654     pCsr->eType |= LSM_SEPARATOR;
655   }
656 
657   return rc;
658 }
659 
btreeCursorPtr(u8 * aData,int nData,int iCell)660 static int btreeCursorPtr(u8 *aData, int nData, int iCell){
661   int nCell;
662 
663   nCell = pageGetNRec(aData, nData);
664   if( iCell>=nCell ){
665     return (int)pageGetPtr(aData, nData);
666   }
667   return (int)pageGetRecordPtr(aData, nData, iCell);
668 }
669 
btreeCursorNext(BtreeCursor * pCsr)670 static int btreeCursorNext(BtreeCursor *pCsr){
671   int rc = LSM_OK;
672 
673   BtreePg *pPg = &pCsr->aPg[pCsr->iPg];
674   int nCell;
675   u8 *aData;
676   int nData;
677 
678   assert( pCsr->iPg>=0 );
679   assert( pCsr->iPg==pCsr->nDepth-1 );
680 
681   aData = fsPageData(pPg->pPage, &nData);
682   nCell = pageGetNRec(aData, nData);
683   assert( pPg->iCell<=nCell );
684   pPg->iCell++;
685   if( pPg->iCell==nCell ){
686     Pgno iLoad;
687 
688     /* Up to parent. */
689     lsmFsPageRelease(pPg->pPage);
690     pPg->pPage = 0;
691     pCsr->iPg--;
692     while( pCsr->iPg>=0 ){
693       pPg = &pCsr->aPg[pCsr->iPg];
694       aData = fsPageData(pPg->pPage, &nData);
695       if( pPg->iCell<pageGetNRec(aData, nData) ) break;
696       lsmFsPageRelease(pPg->pPage);
697       pCsr->iPg--;
698     }
699 
700     /* Read the key */
701     rc = btreeCursorLoadKey(pCsr);
702 
703     /* Unless the cursor is at EOF, descend to cell -1 (yes, negative one) of
704     ** the left-most most descendent. */
705     if( pCsr->iPg>=0 ){
706       pCsr->aPg[pCsr->iPg].iCell++;
707 
708       iLoad = btreeCursorPtr(aData, nData, pPg->iCell);
709       do {
710         Page *pLoad;
711         pCsr->iPg++;
712         rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLoad, &pLoad);
713         pCsr->aPg[pCsr->iPg].pPage = pLoad;
714         pCsr->aPg[pCsr->iPg].iCell = 0;
715         if( rc==LSM_OK ){
716           if( pCsr->iPg==(pCsr->nDepth-1) ) break;
717           aData = fsPageData(pLoad, &nData);
718           iLoad = btreeCursorPtr(aData, nData, 0);
719         }
720       }while( rc==LSM_OK && pCsr->iPg<(pCsr->nDepth-1) );
721       pCsr->aPg[pCsr->iPg].iCell = -1;
722     }
723 
724   }else{
725     rc = btreeCursorLoadKey(pCsr);
726   }
727 
728   if( rc==LSM_OK && pCsr->iPg>=0 ){
729     aData = fsPageData(pCsr->aPg[pCsr->iPg].pPage, &nData);
730     pCsr->iPtr = btreeCursorPtr(aData, nData, pCsr->aPg[pCsr->iPg].iCell+1);
731   }
732 
733   return rc;
734 }
735 
btreeCursorFree(BtreeCursor * pCsr)736 static void btreeCursorFree(BtreeCursor *pCsr){
737   if( pCsr ){
738     int i;
739     lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
740     for(i=0; i<=pCsr->iPg; i++){
741       lsmFsPageRelease(pCsr->aPg[i].pPage);
742     }
743     sortedBlobFree(&pCsr->blob);
744     lsmFree(pEnv, pCsr->aPg);
745     lsmFree(pEnv, pCsr);
746   }
747 }
748 
btreeCursorFirst(BtreeCursor * pCsr)749 static int btreeCursorFirst(BtreeCursor *pCsr){
750   int rc;
751 
752   Page *pPg = 0;
753   FileSystem *pFS = pCsr->pFS;
754   int iPg = (int)pCsr->pSeg->iRoot;
755 
756   do {
757     rc = lsmFsDbPageGet(pFS, pCsr->pSeg, iPg, &pPg);
758     assert( (rc==LSM_OK)==(pPg!=0) );
759     if( rc==LSM_OK ){
760       u8 *aData;
761       int nData;
762       int flags;
763 
764       aData = fsPageData(pPg, &nData);
765       flags = pageGetFlags(aData, nData);
766       if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;
767 
768       if( (pCsr->nDepth % 8)==0 ){
769         int nNew = pCsr->nDepth + 8;
770         pCsr->aPg = (BtreePg *)lsmReallocOrFreeRc(
771             lsmFsEnv(pFS), pCsr->aPg, sizeof(BtreePg) * nNew, &rc
772         );
773         if( rc==LSM_OK ){
774           memset(&pCsr->aPg[pCsr->nDepth], 0, sizeof(BtreePg) * 8);
775         }
776       }
777 
778       if( rc==LSM_OK ){
779         assert( pCsr->aPg[pCsr->nDepth].iCell==0 );
780         pCsr->aPg[pCsr->nDepth].pPage = pPg;
781         pCsr->nDepth++;
782         iPg = (int)pageGetRecordPtr(aData, nData, 0);
783       }
784     }
785   }while( rc==LSM_OK );
786   lsmFsPageRelease(pPg);
787   pCsr->iPg = pCsr->nDepth-1;
788 
789   if( rc==LSM_OK && pCsr->nDepth ){
790     pCsr->aPg[pCsr->iPg].iCell = -1;
791     rc = btreeCursorNext(pCsr);
792   }
793 
794   return rc;
795 }
796 
btreeCursorPosition(BtreeCursor * pCsr,MergeInput * p)797 static void btreeCursorPosition(BtreeCursor *pCsr, MergeInput *p){
798   if( pCsr->iPg>=0 ){
799     p->iPg = lsmFsPageNumber(pCsr->aPg[pCsr->iPg].pPage);
800     p->iCell = ((pCsr->aPg[pCsr->iPg].iCell + 1) << 8) + pCsr->nDepth;
801   }else{
802     p->iPg = 0;
803     p->iCell = 0;
804   }
805 }
806 
btreeCursorSplitkey(BtreeCursor * pCsr,MergeInput * p)807 static void btreeCursorSplitkey(BtreeCursor *pCsr, MergeInput *p){
808   int iCell = pCsr->aPg[pCsr->iPg].iCell;
809   if( iCell>=0 ){
810     p->iCell = iCell;
811     p->iPg = lsmFsPageNumber(pCsr->aPg[pCsr->iPg].pPage);
812   }else{
813     int i;
814     for(i=pCsr->iPg-1; i>=0; i--){
815       if( pCsr->aPg[i].iCell>0 ) break;
816     }
817     assert( i>=0 );
818     p->iCell = pCsr->aPg[i].iCell-1;
819     p->iPg = lsmFsPageNumber(pCsr->aPg[i].pPage);
820   }
821 }
822 
sortedKeyCompare(int (* xCmp)(void *,int,void *,int),int iLhsTopic,void * pLhsKey,int nLhsKey,int iRhsTopic,void * pRhsKey,int nRhsKey)823 static int sortedKeyCompare(
824   int (*xCmp)(void *, int, void *, int),
825   int iLhsTopic, void *pLhsKey, int nLhsKey,
826   int iRhsTopic, void *pRhsKey, int nRhsKey
827 ){
828   int res = iLhsTopic - iRhsTopic;
829   if( res==0 ){
830     res = xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey);
831   }
832   return res;
833 }
834 
btreeCursorRestore(BtreeCursor * pCsr,int (* xCmp)(void *,int,void *,int),MergeInput * p)835 static int btreeCursorRestore(
836   BtreeCursor *pCsr,
837   int (*xCmp)(void *, int, void *, int),
838   MergeInput *p
839 ){
840   int rc = LSM_OK;
841 
842   if( p->iPg ){
843     lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
844     int iCell;                    /* Current cell number on leaf page */
845     Pgno iLeaf;                   /* Page number of current leaf page */
846     int nDepth;                   /* Depth of b-tree structure */
847     Segment *pSeg = pCsr->pSeg;
848 
849     /* Decode the MergeInput structure */
850     iLeaf = p->iPg;
851     nDepth = (p->iCell & 0x00FF);
852     iCell = (p->iCell >> 8) - 1;
853 
854     /* Allocate the BtreeCursor.aPg[] array */
855     assert( pCsr->aPg==0 );
856     pCsr->aPg = (BtreePg *)lsmMallocZeroRc(pEnv, sizeof(BtreePg) * nDepth, &rc);
857 
858     /* Populate the last entry of the aPg[] array */
859     if( rc==LSM_OK ){
860       Page **pp = &pCsr->aPg[nDepth-1].pPage;
861       pCsr->iPg = nDepth-1;
862       pCsr->nDepth = nDepth;
863       pCsr->aPg[pCsr->iPg].iCell = iCell;
864       rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLeaf, pp);
865     }
866 
867     /* Populate any other aPg[] array entries */
868     if( rc==LSM_OK && nDepth>1 ){
869       Blob blob = {0,0,0};
870       void *pSeek;
871       int nSeek;
872       int iTopicSeek;
873       int iPg = 0;
874       int iLoad = (int)pSeg->iRoot;
875       Page *pPg = pCsr->aPg[nDepth-1].pPage;
876 
877       if( pageObjGetNRec(pPg)==0 ){
878         /* This can happen when pPg is the right-most leaf in the b-tree.
879         ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
880         ** greater than any real key.  */
881         assert( iCell==-1 );
882         iTopicSeek = 1000;
883         pSeek = 0;
884         nSeek = 0;
885       }else{
886         Pgno dummy;
887         rc = pageGetBtreeKey(pSeg, pPg,
888             0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob
889         );
890       }
891 
892       do {
893         Page *pPg2;
894         rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLoad, &pPg2);
895         assert( rc==LSM_OK || pPg2==0 );
896         if( rc==LSM_OK ){
897           u8 *aData;                  /* Buffer containing page data */
898           int nData;                  /* Size of aData[] in bytes */
899           int iMin;
900           int iMax;
901           int iCell2;
902 
903           aData = fsPageData(pPg2, &nData);
904           assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );
905 
906           iLoad = (int)pageGetPtr(aData, nData);
907           iCell2 = pageGetNRec(aData, nData);
908           iMax = iCell2-1;
909           iMin = 0;
910 
911           while( iMax>=iMin ){
912             int iTry = (iMin+iMax)/2;
913             void *pKey; int nKey;         /* Key for cell iTry */
914             int iTopic;                   /* Topic for key pKeyT/nKeyT */
915             Pgno iPtr;                    /* Pointer for cell iTry */
916             int res;                      /* (pSeek - pKeyT) */
917 
918             rc = pageGetBtreeKey(
919                 pSeg, pPg2, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob
920             );
921             if( rc!=LSM_OK ) break;
922 
923             res = sortedKeyCompare(
924                 xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
925             );
926             assert( res!=0 );
927 
928             if( res<0 ){
929               iLoad = (int)iPtr;
930               iCell2 = iTry;
931               iMax = iTry-1;
932             }else{
933               iMin = iTry+1;
934             }
935           }
936 
937           pCsr->aPg[iPg].pPage = pPg2;
938           pCsr->aPg[iPg].iCell = iCell2;
939           iPg++;
940           assert( iPg!=nDepth-1
941                || lsmFsRedirectPage(pCsr->pFS, pSeg->pRedirect, iLoad)==iLeaf
942           );
943         }
944       }while( rc==LSM_OK && iPg<(nDepth-1) );
945       sortedBlobFree(&blob);
946     }
947 
948     /* Load the current key and pointer */
949     if( rc==LSM_OK ){
950       BtreePg *pBtreePg;
951       u8 *aData;
952       int nData;
953 
954       pBtreePg = &pCsr->aPg[pCsr->iPg];
955       aData = fsPageData(pBtreePg->pPage, &nData);
956       pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1);
957       if( pBtreePg->iCell<0 ){
958         Pgno dummy;
959         int i;
960         for(i=pCsr->iPg-1; i>=0; i--){
961           if( pCsr->aPg[i].iCell>0 ) break;
962         }
963         assert( i>=0 );
964         rc = pageGetBtreeKey(pSeg,
965             pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
966             &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
967         );
968         pCsr->eType |= LSM_SEPARATOR;
969 
970       }else{
971         rc = btreeCursorLoadKey(pCsr);
972       }
973     }
974   }
975   return rc;
976 }
977 
btreeCursorNew(lsm_db * pDb,Segment * pSeg,BtreeCursor ** ppCsr)978 static int btreeCursorNew(
979   lsm_db *pDb,
980   Segment *pSeg,
981   BtreeCursor **ppCsr
982 ){
983   int rc = LSM_OK;
984   BtreeCursor *pCsr;
985 
986   assert( pSeg->iRoot );
987   pCsr = lsmMallocZeroRc(pDb->pEnv, sizeof(BtreeCursor), &rc);
988   if( pCsr ){
989     pCsr->pFS = pDb->pFS;
990     pCsr->pSeg = pSeg;
991     pCsr->iPg = -1;
992   }
993 
994   *ppCsr = pCsr;
995   return rc;
996 }
997 
segmentPtrSetPage(SegmentPtr * pPtr,Page * pNext)998 static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){
999   lsmFsPageRelease(pPtr->pPg);
1000   if( pNext ){
1001     int nData;
1002     u8 *aData = fsPageData(pNext, &nData);
1003     pPtr->nCell = pageGetNRec(aData, nData);
1004     pPtr->flags = (u16)pageGetFlags(aData, nData);
1005     pPtr->iPtr = pageGetPtr(aData, nData);
1006   }
1007   pPtr->pPg = pNext;
1008 }
1009 
1010 /*
1011 ** Load a new page into the SegmentPtr object pPtr.
1012 */
segmentPtrLoadPage(FileSystem * pFS,SegmentPtr * pPtr,int iNew)1013 static int segmentPtrLoadPage(
1014   FileSystem *pFS,
1015   SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
1016   int iNew                       /* Page number of new page */
1017 ){
1018   Page *pPg = 0;                 /* The new page */
1019   int rc;                        /* Return Code */
1020 
1021   rc = lsmFsDbPageGet(pFS, pPtr->pSeg, iNew, &pPg);
1022   assert( rc==LSM_OK || pPg==0 );
1023   segmentPtrSetPage(pPtr, pPg);
1024 
1025   return rc;
1026 }
1027 
segmentPtrReadData(SegmentPtr * pPtr,int iOff,int nByte,void ** ppData,Blob * pBlob)1028 static int segmentPtrReadData(
1029   SegmentPtr *pPtr,
1030   int iOff,
1031   int nByte,
1032   void **ppData,
1033   Blob *pBlob
1034 ){
1035   return sortedReadData(pPtr->pSeg, pPtr->pPg, iOff, nByte, ppData, pBlob);
1036 }
1037 
segmentPtrNextPage(SegmentPtr * pPtr,int eDir)1038 static int segmentPtrNextPage(
1039   SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
1040   int eDir                       /* +1 for next(), -1 for prev() */
1041 ){
1042   Page *pNext;                   /* New page to load */
1043   int rc;                        /* Return code */
1044 
1045   assert( eDir==1 || eDir==-1 );
1046   assert( pPtr->pPg );
1047   assert( pPtr->pSeg || eDir>0 );
1048 
1049   rc = lsmFsDbPageNext(pPtr->pSeg, pPtr->pPg, eDir, &pNext);
1050   assert( rc==LSM_OK || pNext==0 );
1051   segmentPtrSetPage(pPtr, pNext);
1052   return rc;
1053 }
1054 
segmentPtrLoadCell(SegmentPtr * pPtr,int iNew)1055 static int segmentPtrLoadCell(
1056   SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
1057   int iNew                       /* Cell number of new cell */
1058 ){
1059   int rc = LSM_OK;
1060   if( pPtr->pPg ){
1061     u8 *aData;                    /* Pointer to page data buffer */
1062     int iOff;                     /* Offset in aData[] to read from */
1063     int nPgsz;                    /* Size of page (aData[]) in bytes */
1064 
1065     assert( iNew<pPtr->nCell );
1066     pPtr->iCell = iNew;
1067     aData = fsPageData(pPtr->pPg, &nPgsz);
1068     iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nPgsz, pPtr->iCell)]);
1069     pPtr->eType = aData[iOff];
1070     iOff++;
1071     iOff += GETVARINT64(&aData[iOff], pPtr->iPgPtr);
1072     iOff += GETVARINT32(&aData[iOff], pPtr->nKey);
1073     if( rtIsWrite(pPtr->eType) ){
1074       iOff += GETVARINT32(&aData[iOff], pPtr->nVal);
1075     }
1076     assert( pPtr->nKey>=0 );
1077 
1078     rc = segmentPtrReadData(
1079         pPtr, iOff, pPtr->nKey, &pPtr->pKey, &pPtr->blob1
1080     );
1081     if( rc==LSM_OK && rtIsWrite(pPtr->eType) ){
1082       rc = segmentPtrReadData(
1083           pPtr, iOff+pPtr->nKey, pPtr->nVal, &pPtr->pVal, &pPtr->blob2
1084       );
1085     }else{
1086       pPtr->nVal = 0;
1087       pPtr->pVal = 0;
1088     }
1089   }
1090 
1091   return rc;
1092 }
1093 
1094 
sortedSplitkeySegment(Level * pLevel)1095 static Segment *sortedSplitkeySegment(Level *pLevel){
1096   Merge *pMerge = pLevel->pMerge;
1097   MergeInput *p = &pMerge->splitkey;
1098   Segment *pSeg;
1099   int i;
1100 
1101   for(i=0; i<pMerge->nInput; i++){
1102     if( p->iPg==pMerge->aInput[i].iPg ) break;
1103   }
1104   if( pMerge->nInput==(pLevel->nRight+1) && i>=(pMerge->nInput-1) ){
1105     pSeg = &pLevel->pNext->lhs;
1106   }else{
1107     pSeg = &pLevel->aRhs[i];
1108   }
1109 
1110   return pSeg;
1111 }
1112 
sortedSplitkey(lsm_db * pDb,Level * pLevel,int * pRc)1113 static void sortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
1114   Segment *pSeg;
1115   Page *pPg = 0;
1116   lsm_env *pEnv = pDb->pEnv;      /* Environment handle */
1117   int rc = *pRc;
1118   Merge *pMerge = pLevel->pMerge;
1119 
1120   pSeg = sortedSplitkeySegment(pLevel);
1121   if( rc==LSM_OK ){
1122     rc = lsmFsDbPageGet(pDb->pFS, pSeg, pMerge->splitkey.iPg, &pPg);
1123   }
1124   if( rc==LSM_OK ){
1125     int iTopic;
1126     Blob blob = {0, 0, 0, 0};
1127     u8 *aData;
1128     int nData;
1129 
1130     aData = lsmFsPageData(pPg, &nData);
1131     if( pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG ){
1132       void *pKey;
1133       int nKey;
1134       Pgno dummy;
1135       rc = pageGetBtreeKey(pSeg,
1136           pPg, pMerge->splitkey.iCell, &dummy, &iTopic, &pKey, &nKey, &blob
1137       );
1138       if( rc==LSM_OK && blob.pData!=pKey ){
1139         rc = sortedBlobSet(pEnv, &blob, pKey, nKey);
1140       }
1141     }else{
1142       rc = pageGetKeyCopy(
1143           pEnv, pSeg, pPg, pMerge->splitkey.iCell, &iTopic, &blob
1144       );
1145     }
1146 
1147     pLevel->iSplitTopic = iTopic;
1148     pLevel->pSplitKey = blob.pData;
1149     pLevel->nSplitKey = blob.nData;
1150     lsmFsPageRelease(pPg);
1151   }
1152 
1153   *pRc = rc;
1154 }
1155 
1156 /*
1157 ** Reset a segment cursor. Also free its buffers if they are nThreshold
1158 ** bytes or larger in size.
1159 */
segmentPtrReset(SegmentPtr * pPtr,int nThreshold)1160 static void segmentPtrReset(SegmentPtr *pPtr, int nThreshold){
1161   lsmFsPageRelease(pPtr->pPg);
1162   pPtr->pPg = 0;
1163   pPtr->nCell = 0;
1164   pPtr->pKey = 0;
1165   pPtr->nKey = 0;
1166   pPtr->pVal = 0;
1167   pPtr->nVal = 0;
1168   pPtr->eType = 0;
1169   pPtr->iCell = 0;
1170   if( pPtr->blob1.nAlloc>=nThreshold ) sortedBlobFree(&pPtr->blob1);
1171   if( pPtr->blob2.nAlloc>=nThreshold ) sortedBlobFree(&pPtr->blob2);
1172 }
1173 
segmentPtrIgnoreSeparators(MultiCursor * pCsr,SegmentPtr * pPtr)1174 static int segmentPtrIgnoreSeparators(MultiCursor *pCsr, SegmentPtr *pPtr){
1175   return (pCsr->flags & CURSOR_READ_SEPARATORS)==0
1176       || (pPtr!=&pCsr->aPtr[pCsr->nPtr-1]);
1177 }
1178 
segmentPtrAdvance(MultiCursor * pCsr,SegmentPtr * pPtr,int bReverse)1179 static int segmentPtrAdvance(
1180   MultiCursor *pCsr,
1181   SegmentPtr *pPtr,
1182   int bReverse
1183 ){
1184   int eDir = (bReverse ? -1 : 1);
1185   Level *pLvl = pPtr->pLevel;
1186   do {
1187     int rc;
1188     int iCell;                    /* Number of new cell in page */
1189     int svFlags = 0;              /* SegmentPtr.eType before advance */
1190 
1191     iCell = pPtr->iCell + eDir;
1192     assert( pPtr->pPg );
1193     assert( iCell<=pPtr->nCell && iCell>=-1 );
1194 
1195     if( bReverse && pPtr->pSeg!=&pPtr->pLevel->lhs ){
1196       svFlags = pPtr->eType;
1197       assert( svFlags );
1198     }
1199 
1200     if( iCell>=pPtr->nCell || iCell<0 ){
1201       do {
1202         rc = segmentPtrNextPage(pPtr, eDir);
1203       }while( rc==LSM_OK
1204            && pPtr->pPg
1205            && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) )
1206       );
1207       if( rc!=LSM_OK ) return rc;
1208       iCell = bReverse ? (pPtr->nCell-1) : 0;
1209     }
1210     rc = segmentPtrLoadCell(pPtr, iCell);
1211     if( rc!=LSM_OK ) return rc;
1212 
1213     if( svFlags && pPtr->pPg ){
1214       int res = sortedKeyCompare(pCsr->pDb->xCmp,
1215           rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
1216           pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
1217       );
1218       if( res<0 ) segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1219     }
1220 
1221     if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){
1222       Segment *pSeg = pPtr->pSeg;
1223       rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, pSeg->iFirst, &pPtr->pPg);
1224       if( rc!=LSM_OK ) return rc;
1225       pPtr->eType = LSM_START_DELETE | LSM_POINT_DELETE;
1226       pPtr->eType |= (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
1227       pPtr->pKey = pLvl->pSplitKey;
1228       pPtr->nKey = pLvl->nSplitKey;
1229     }
1230 
1231   }while( pCsr
1232        && pPtr->pPg
1233        && segmentPtrIgnoreSeparators(pCsr, pPtr)
1234        && rtIsSeparator(pPtr->eType)
1235   );
1236 
1237   return LSM_OK;
1238 }
1239 
segmentPtrEndPage(FileSystem * pFS,SegmentPtr * pPtr,int bLast,int * pRc)1240 static void segmentPtrEndPage(
1241   FileSystem *pFS,
1242   SegmentPtr *pPtr,
1243   int bLast,
1244   int *pRc
1245 ){
1246   if( *pRc==LSM_OK ){
1247     Segment *pSeg = pPtr->pSeg;
1248     Page *pNew = 0;
1249     if( bLast ){
1250       *pRc = lsmFsDbPageLast(pFS, pSeg, &pNew);
1251     }else{
1252       *pRc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pNew);
1253     }
1254     segmentPtrSetPage(pPtr, pNew);
1255   }
1256 }
1257 
1258 
1259 /*
1260 ** Try to move the segment pointer passed as the second argument so that it
1261 ** points at either the first (bLast==0) or last (bLast==1) cell in the valid
1262 ** region of the segment defined by pPtr->iFirst and pPtr->iLast.
1263 **
1264 ** Return LSM_OK if successful or an lsm error code if something goes
1265 ** wrong (IO error, OOM etc.).
1266 */
segmentPtrEnd(MultiCursor * pCsr,SegmentPtr * pPtr,int bLast)1267 static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){
1268   Level *pLvl = pPtr->pLevel;
1269   int rc = LSM_OK;
1270   FileSystem *pFS = pCsr->pDb->pFS;
1271   int bIgnore;
1272 
1273   segmentPtrEndPage(pFS, pPtr, bLast, &rc);
1274   while( rc==LSM_OK && pPtr->pPg
1275       && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
1276   ){
1277     rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
1278   }
1279 
1280   if( rc==LSM_OK && pPtr->pPg ){
1281     rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
1282     if( rc==LSM_OK && bLast && pPtr->pSeg!=&pLvl->lhs ){
1283       int res = sortedKeyCompare(pCsr->pDb->xCmp,
1284           rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
1285           pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
1286       );
1287       if( res<0 ) segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1288     }
1289   }
1290 
1291   bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
1292   if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
1293     rc = segmentPtrAdvance(pCsr, pPtr, bLast);
1294   }
1295 
1296 #if 0
1297   if( bLast && rc==LSM_OK && pPtr->pPg
1298    && pPtr->pSeg==&pLvl->lhs
1299    && pLvl->nRight && (pPtr->eType & LSM_START_DELETE)
1300   ){
1301     pPtr->iCell++;
1302     pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic);
1303     pPtr->pKey = pLvl->pSplitKey;
1304     pPtr->nKey = pLvl->nSplitKey;
1305     pPtr->pVal = 0;
1306     pPtr->nVal = 0;
1307   }
1308 #endif
1309 
1310   return rc;
1311 }
1312 
segmentPtrKey(SegmentPtr * pPtr,void ** ppKey,int * pnKey)1313 static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
1314   assert( pPtr->pPg );
1315   *ppKey = pPtr->pKey;
1316   *pnKey = pPtr->nKey;
1317 }
1318 
1319 #if 0 /* NOT USED */
1320 static char *keyToString(lsm_env *pEnv, void *pKey, int nKey){
1321   int i;
1322   u8 *aKey = (u8 *)pKey;
1323   char *zRet = (char *)lsmMalloc(pEnv, nKey+1);
1324 
1325   for(i=0; i<nKey; i++){
1326     zRet[i] = (char)(isalnum(aKey[i]) ? aKey[i] : '.');
1327   }
1328   zRet[nKey] = '\0';
1329   return zRet;
1330 }
1331 #endif
1332 
1333 #if 0 /* NOT USED */
1334 /*
1335 ** Check that the page that pPtr currently has loaded is the correct page
1336 ** to search for key (pKey/nKey). If it is, return 1. Otherwise, an assert
1337 ** fails and this function does not return.
1338 */
1339 static int assertKeyLocation(
1340   MultiCursor *pCsr,
1341   SegmentPtr *pPtr,
1342   void *pKey, int nKey
1343 ){
1344   lsm_env *pEnv = lsmFsEnv(pCsr->pDb->pFS);
1345   Blob blob = {0, 0, 0};
1346   int eDir;
1347   int iTopic = 0;                 /* TODO: Fix me */
1348 
1349   for(eDir=-1; eDir<=1; eDir+=2){
1350     Page *pTest = pPtr->pPg;
1351 
1352     lsmFsPageRef(pTest);
1353     while( pTest ){
1354       Segment *pSeg = pPtr->pSeg;
1355       Page *pNext;
1356 
1357       int rc = lsmFsDbPageNext(pSeg, pTest, eDir, &pNext);
1358       lsmFsPageRelease(pTest);
1359       if( rc ) return 1;
1360       pTest = pNext;
1361 
1362       if( pTest ){
1363         int nData;
1364         u8 *aData = fsPageData(pTest, &nData);
1365         int nCell = pageGetNRec(aData, nData);
1366         int flags = pageGetFlags(aData, nData);
1367         if( nCell && 0==(flags&SEGMENT_BTREE_FLAG) ){
1368           int nPgKey;
1369           int iPgTopic;
1370           u8 *pPgKey;
1371           int res;
1372           int iCell;
1373 
1374           iCell = ((eDir < 0) ? (nCell-1) : 0);
1375           pPgKey = pageGetKey(pSeg, pTest, iCell, &iPgTopic, &nPgKey, &blob);
1376           res = iTopic - iPgTopic;
1377           if( res==0 ) res = pCsr->pDb->xCmp(pKey, nKey, pPgKey, nPgKey);
1378           if( (eDir==1 && res>0) || (eDir==-1 && res<0) ){
1379             /* Taking this branch means something has gone wrong. */
1380             char *zMsg = lsmMallocPrintf(pEnv, "Key \"%s\" is not on page %d",
1381                 keyToString(pEnv, pKey, nKey), lsmFsPageNumber(pPtr->pPg)
1382             );
1383             fprintf(stderr, "%s\n", zMsg);
1384             assert( !"assertKeyLocation() failed" );
1385           }
1386           lsmFsPageRelease(pTest);
1387           pTest = 0;
1388         }
1389       }
1390     }
1391   }
1392 
1393   sortedBlobFree(&blob);
1394   return 1;
1395 }
1396 #endif
1397 
1398 #ifndef NDEBUG
assertSeekResult(MultiCursor * pCsr,SegmentPtr * pPtr,int iTopic,void * pKey,int nKey,int eSeek)1399 static int assertSeekResult(
1400   MultiCursor *pCsr,
1401   SegmentPtr *pPtr,
1402   int iTopic,
1403   void *pKey,
1404   int nKey,
1405   int eSeek
1406 ){
1407   if( pPtr->pPg ){
1408     int res;
1409     res = sortedKeyCompare(pCsr->pDb->xCmp, iTopic, pKey, nKey,
1410         rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
1411     );
1412 
1413     if( eSeek==LSM_SEEK_EQ ) return (res==0);
1414     if( eSeek==LSM_SEEK_LE ) return (res>=0);
1415     if( eSeek==LSM_SEEK_GE ) return (res<=0);
1416   }
1417 
1418   return 1;
1419 }
1420 #endif
1421 
segmentPtrSearchOversized(MultiCursor * pCsr,SegmentPtr * pPtr,int iTopic,void * pKey,int nKey)1422 static int segmentPtrSearchOversized(
1423   MultiCursor *pCsr,              /* Cursor context */
1424   SegmentPtr *pPtr,               /* Pointer to seek */
1425   int iTopic,                     /* Topic of key to search for */
1426   void *pKey, int nKey            /* Key to seek to */
1427 ){
1428   int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
1429   int rc = LSM_OK;
1430 
1431   /* If the OVERSIZED flag is set, then there is no pointer in the
1432   ** upper level to the next page in the segment that contains at least
1433   ** one key. So compare the largest key on the current page with the
1434   ** key being sought (pKey/nKey). If (pKey/nKey) is larger, advance
1435   ** to the next page in the segment that contains at least one key.
1436   */
1437   while( rc==LSM_OK && (pPtr->flags & PGFTR_SKIP_NEXT_FLAG) ){
1438     u8 *pLastKey;
1439     int nLastKey;
1440     int iLastTopic;
1441     int res;                      /* Result of comparison */
1442     Page *pNext;
1443 
1444     /* Load the last key on the current page. */
1445     pLastKey = pageGetKey(pPtr->pSeg,
1446         pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1
1447     );
1448 
1449     /* If the loaded key is >= than (pKey/nKey), break out of the loop.
1450     ** If (pKey/nKey) is present in this array, it must be on the current
1451     ** page.  */
1452     res = sortedKeyCompare(
1453         xCmp, iLastTopic, pLastKey, nLastKey, iTopic, pKey, nKey
1454     );
1455     if( res>=0 ) break;
1456 
1457     /* Advance to the next page that contains at least one key. */
1458     pNext = pPtr->pPg;
1459     lsmFsPageRef(pNext);
1460     while( 1 ){
1461       Page *pLoad;
1462       u8 *aData; int nData;
1463 
1464       rc = lsmFsDbPageNext(pPtr->pSeg, pNext, 1, &pLoad);
1465       lsmFsPageRelease(pNext);
1466       pNext = pLoad;
1467       if( pNext==0 ) break;
1468 
1469       assert( rc==LSM_OK );
1470       aData = lsmFsPageData(pNext, &nData);
1471       if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0
1472        && pageGetNRec(aData, nData)>0
1473       ){
1474         break;
1475       }
1476     }
1477     if( pNext==0 ) break;
1478     segmentPtrSetPage(pPtr, pNext);
1479 
1480     /* This should probably be an LSM_CORRUPT error. */
1481     assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) );
1482   }
1483 
1484   return rc;
1485 }
1486 
ptrFwdPointer(Page * pPage,int iCell,Segment * pSeg,Pgno * piPtr,int * pbFound)1487 static int ptrFwdPointer(
1488   Page *pPage,
1489   int iCell,
1490   Segment *pSeg,
1491   Pgno *piPtr,
1492   int *pbFound
1493 ){
1494   Page *pPg = pPage;
1495   int iFirst = iCell;
1496   int rc = LSM_OK;
1497 
1498   do {
1499     Page *pNext = 0;
1500     u8 *aData;
1501     int nData;
1502 
1503     aData = lsmFsPageData(pPg, &nData);
1504     if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ){
1505       int i;
1506       int nCell = pageGetNRec(aData, nData);
1507       for(i=iFirst; i<nCell; i++){
1508         u8 eType = *pageGetCell(aData, nData, i);
1509         if( (eType & LSM_START_DELETE)==0 ){
1510           *pbFound = 1;
1511           *piPtr = pageGetRecordPtr(aData, nData, i) + pageGetPtr(aData, nData);
1512           lsmFsPageRelease(pPg);
1513           return LSM_OK;
1514         }
1515       }
1516     }
1517 
1518     rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
1519     lsmFsPageRelease(pPg);
1520     pPg = pNext;
1521     iFirst = 0;
1522   }while( pPg && rc==LSM_OK );
1523   lsmFsPageRelease(pPg);
1524 
1525   *pbFound = 0;
1526   return rc;
1527 }
1528 
sortedRhsFirst(MultiCursor * pCsr,Level * pLvl,SegmentPtr * pPtr)1529 static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
1530   int rc;
1531   rc = segmentPtrEnd(pCsr, pPtr, 0);
1532   while( pPtr->pPg && rc==LSM_OK ){
1533     int res = sortedKeyCompare(pCsr->pDb->xCmp,
1534         pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
1535         rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
1536     );
1537     if( res<=0 ) break;
1538     rc = segmentPtrAdvance(pCsr, pPtr, 0);
1539   }
1540   return rc;
1541 }
1542 
1543 
1544 /*
1545 ** This function is called as part of a SEEK_GE op on a multi-cursor if the
1546 ** FC pointer read from segment *pPtr comes from an entry with the
1547 ** LSM_START_DELETE flag set. In this case the pointer value cannot be
1548 ** trusted. Instead, the pointer that should be followed is that associated
1549 ** with the next entry in *pPtr that does not have LSM_START_DELETE set.
1550 **
1551 ** Why the pointers can't be trusted:
1552 **
1553 **
1554 **
1555 ** TODO: This is a stop-gap solution:
1556 **
1557 **   At the moment, this function is called from within segmentPtrSeek(),
1558 **   as part of the initial lsmMCursorSeek() call. However, consider a
1559 **   database where the following has occurred:
1560 **
1561 **      1. A range delete removes keys 1..9999 using a range delete.
1562 **      2. Keys 1 through 9999 are reinserted.
1563 **      3. The levels containing the ops in 1. and 2. above are merged. Call
1564 **         this level N. Level N contains FC pointers to level N+1.
1565 **
1566 **   Then, if the user attempts to query for (key>=2 LIMIT 10), the
1567 **   lsmMCursorSeek() call will iterate through 9998 entries searching for a
1568 **   pointer down to the level N+1 that is never actually used. It would be
1569 **   much better if the multi-cursor could do this lazily - only seek to the
1570 **   level (N+1) page after the user has moved the cursor on level N passed
1571 **   the big range-delete.
1572 */
segmentPtrFwdPointer(MultiCursor * pCsr,SegmentPtr * pPtr,Pgno * piPtr)1573 static int segmentPtrFwdPointer(
1574   MultiCursor *pCsr,              /* Multi-cursor pPtr belongs to */
1575   SegmentPtr *pPtr,               /* Segment-pointer to extract FC ptr from */
1576   Pgno *piPtr                     /* OUT: FC pointer value */
1577 ){
1578   Level *pLvl = pPtr->pLevel;
1579   Level *pNext = pLvl->pNext;
1580   Page *pPg = pPtr->pPg;
1581   int rc;
1582   int bFound;
1583   Pgno iOut = 0;
1584 
1585   if( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[pLvl->nRight-1] ){
1586     if( pNext==0
1587         || (pNext->nRight==0 && pNext->lhs.iRoot)
1588         || (pNext->nRight!=0 && pNext->aRhs[0].iRoot)
1589       ){
1590       /* Do nothing. The pointer will not be used anyway. */
1591       return LSM_OK;
1592     }
1593   }else{
1594     if( pPtr[1].pSeg->iRoot ){
1595       return LSM_OK;
1596     }
1597   }
1598 
1599   /* Search for a pointer within the current segment. */
1600   lsmFsPageRef(pPg);
1601   rc = ptrFwdPointer(pPg, pPtr->iCell, pPtr->pSeg, &iOut, &bFound);
1602 
1603   if( rc==LSM_OK && bFound==0 ){
1604     /* This case happens when pPtr points to the left-hand-side of a segment
1605     ** currently undergoing an incremental merge. In this case, jump to the
1606     ** oldest segment in the right-hand-side of the same level and continue
1607     ** searching. But - do not consider any keys smaller than the levels
1608     ** split-key. */
1609     SegmentPtr ptr;
1610 
1611     if( pPtr->pLevel->nRight==0 || pPtr->pSeg!=&pPtr->pLevel->lhs ){
1612       return LSM_CORRUPT_BKPT;
1613     }
1614 
1615     memset(&ptr, 0, sizeof(SegmentPtr));
1616     ptr.pLevel = pPtr->pLevel;
1617     ptr.pSeg = &ptr.pLevel->aRhs[ptr.pLevel->nRight-1];
1618     rc = sortedRhsFirst(pCsr, ptr.pLevel, &ptr);
1619     if( rc==LSM_OK ){
1620       rc = ptrFwdPointer(ptr.pPg, ptr.iCell, ptr.pSeg, &iOut, &bFound);
1621       ptr.pPg = 0;
1622     }
1623     segmentPtrReset(&ptr, 0);
1624   }
1625 
1626   *piPtr = iOut;
1627   return rc;
1628 }
1629 
segmentPtrSeek(MultiCursor * pCsr,SegmentPtr * pPtr,int iTopic,void * pKey,int nKey,int eSeek,int * piPtr,int * pbStop)1630 static int segmentPtrSeek(
1631   MultiCursor *pCsr,              /* Cursor context */
1632   SegmentPtr *pPtr,               /* Pointer to seek */
1633   int iTopic,                     /* Key topic to seek to */
1634   void *pKey, int nKey,           /* Key to seek to */
1635   int eSeek,                      /* Search bias - see above */
1636   int *piPtr,                     /* OUT: FC pointer */
1637   int *pbStop
1638 ){
1639   int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
1640   int res = 0;                        /* Result of comparison operation */
1641   int rc = LSM_OK;
1642   int iMin;
1643   int iMax;
1644   Pgno iPtrOut = 0;
1645 
1646   /* If the current page contains an oversized entry, then there are no
1647   ** pointers to one or more of the subsequent pages in the sorted run.
1648   ** The following call ensures that the segment-ptr points to the correct
1649   ** page in this case.  */
1650   rc = segmentPtrSearchOversized(pCsr, pPtr, iTopic, pKey, nKey);
1651   iPtrOut = pPtr->iPtr;
1652 
1653   /* Assert that this page is the right page of this segment for the key
1654   ** that we are searching for. Do this by loading page (iPg-1) and testing
1655   ** that pKey/nKey is greater than all keys on that page, and then by
1656   ** loading (iPg+1) and testing that pKey/nKey is smaller than all
1657   ** the keys it houses.
1658   **
1659   ** TODO: With range-deletes in the tree, the test described above may fail.
1660   */
1661 #if 0
1662   assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
1663 #endif
1664 
1665   assert( pPtr->nCell>0
1666        || pPtr->pSeg->nSize==1
1667        || lsmFsDbPageIsLast(pPtr->pSeg, pPtr->pPg)
1668   );
1669   if( pPtr->nCell==0 ){
1670     segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1671   }else{
1672     iMin = 0;
1673     iMax = pPtr->nCell-1;
1674 
1675     while( 1 ){
1676       int iTry = (iMin+iMax)/2;
1677       void *pKeyT; int nKeyT;       /* Key for cell iTry */
1678       int iTopicT;
1679 
1680       assert( iTry<iMax || iMin==iMax );
1681 
1682       rc = segmentPtrLoadCell(pPtr, iTry);
1683       if( rc!=LSM_OK ) break;
1684 
1685       segmentPtrKey(pPtr, &pKeyT, &nKeyT);
1686       iTopicT = rtTopic(pPtr->eType);
1687 
1688       res = sortedKeyCompare(xCmp, iTopicT, pKeyT, nKeyT, iTopic, pKey, nKey);
1689       if( res<=0 ){
1690         iPtrOut = pPtr->iPtr + pPtr->iPgPtr;
1691       }
1692 
1693       if( res==0 || iMin==iMax ){
1694         break;
1695       }else if( res>0 ){
1696         iMax = LSM_MAX(iTry-1, iMin);
1697       }else{
1698         iMin = iTry+1;
1699       }
1700     }
1701 
1702     if( rc==LSM_OK ){
1703       assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) );
1704       if( res ){
1705         rc = segmentPtrLoadCell(pPtr, iMin);
1706       }
1707       assert( rc!=LSM_OK || res>0 || iPtrOut==(pPtr->iPtr + pPtr->iPgPtr) );
1708 
1709       if( rc==LSM_OK ){
1710         switch( eSeek ){
1711           case LSM_SEEK_EQ: {
1712             int eType = pPtr->eType;
1713             if( (res<0 && (eType & LSM_START_DELETE))
1714              || (res>0 && (eType & LSM_END_DELETE))
1715              || (res==0 && (eType & LSM_POINT_DELETE))
1716             ){
1717               *pbStop = 1;
1718             }else if( res==0 && (eType & LSM_INSERT) ){
1719               lsm_env *pEnv = pCsr->pDb->pEnv;
1720               *pbStop = 1;
1721               pCsr->eType = pPtr->eType;
1722               rc = sortedBlobSet(pEnv, &pCsr->key, pPtr->pKey, pPtr->nKey);
1723               if( rc==LSM_OK ){
1724                 rc = sortedBlobSet(pEnv, &pCsr->val, pPtr->pVal, pPtr->nVal);
1725               }
1726               pCsr->flags |= CURSOR_SEEK_EQ;
1727             }
1728             segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1729             break;
1730           }
1731           case LSM_SEEK_LE:
1732             if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1);
1733             break;
1734           case LSM_SEEK_GE: {
1735             /* Figure out if we need to 'skip' the pointer forward or not */
1736             if( (res<=0 && (pPtr->eType & LSM_START_DELETE))
1737              || (res>0  && (pPtr->eType & LSM_END_DELETE))
1738             ){
1739               rc = segmentPtrFwdPointer(pCsr, pPtr, &iPtrOut);
1740             }
1741             if( res<0 && rc==LSM_OK ){
1742               rc = segmentPtrAdvance(pCsr, pPtr, 0);
1743             }
1744             break;
1745           }
1746         }
1747       }
1748     }
1749 
1750     /* If the cursor seek has found a separator key, and this cursor is
1751     ** supposed to ignore separators keys, advance to the next entry.  */
1752     if( rc==LSM_OK && pPtr->pPg
1753      && segmentPtrIgnoreSeparators(pCsr, pPtr)
1754      && rtIsSeparator(pPtr->eType)
1755     ){
1756       assert( eSeek!=LSM_SEEK_EQ );
1757       rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
1758     }
1759   }
1760 
1761   assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
1762   *piPtr = (int)iPtrOut;
1763   return rc;
1764 }
1765 
seekInBtree(MultiCursor * pCsr,Segment * pSeg,int iTopic,void * pKey,int nKey,Pgno * aPg,Page ** ppPg)1766 static int seekInBtree(
1767   MultiCursor *pCsr,              /* Multi-cursor object */
1768   Segment *pSeg,                  /* Seek within this segment */
1769   int iTopic,
1770   void *pKey, int nKey,           /* Key to seek to */
1771   Pgno *aPg,                      /* OUT: Page numbers */
1772   Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
1773 ){
1774   int i = 0;
1775   int rc;
1776   int iPg;
1777   Page *pPg = 0;
1778   Blob blob = {0, 0, 0};
1779 
1780   iPg = (int)pSeg->iRoot;
1781   do {
1782     Pgno *piFirst = 0;
1783     if( aPg ){
1784       aPg[i++] = iPg;
1785       piFirst = &aPg[i];
1786     }
1787 
1788     rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, iPg, &pPg);
1789     assert( rc==LSM_OK || pPg==0 );
1790     if( rc==LSM_OK ){
1791       u8 *aData;                  /* Buffer containing page data */
1792       int nData;                  /* Size of aData[] in bytes */
1793       int iMin;
1794       int iMax;
1795       int nRec;
1796       int flags;
1797 
1798       aData = fsPageData(pPg, &nData);
1799       flags = pageGetFlags(aData, nData);
1800       if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;
1801 
1802       iPg = (int)pageGetPtr(aData, nData);
1803       nRec = pageGetNRec(aData, nData);
1804 
1805       iMin = 0;
1806       iMax = nRec-1;
1807       while( iMax>=iMin ){
1808         int iTry = (iMin+iMax)/2;
1809         void *pKeyT; int nKeyT;       /* Key for cell iTry */
1810         int iTopicT;                  /* Topic for key pKeyT/nKeyT */
1811         Pgno iPtr;                    /* Pointer associated with cell iTry */
1812         int res;                      /* (pKey - pKeyT) */
1813 
1814         rc = pageGetBtreeKey(
1815             pSeg, pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob
1816         );
1817         if( rc!=LSM_OK ) break;
1818         if( piFirst && pKeyT==blob.pData ){
1819           *piFirst = pageGetBtreeRef(pPg, iTry);
1820           piFirst = 0;
1821           i++;
1822         }
1823 
1824         res = sortedKeyCompare(
1825             pCsr->pDb->xCmp, iTopic, pKey, nKey, iTopicT, pKeyT, nKeyT
1826         );
1827         if( res<0 ){
1828           iPg = (int)iPtr;
1829           iMax = iTry-1;
1830         }else{
1831           iMin = iTry+1;
1832         }
1833       }
1834       lsmFsPageRelease(pPg);
1835       pPg = 0;
1836     }
1837   }while( rc==LSM_OK );
1838 
1839   sortedBlobFree(&blob);
1840   assert( (rc==LSM_OK)==(pPg!=0) );
1841   if( ppPg ){
1842     *ppPg = pPg;
1843   }else{
1844     lsmFsPageRelease(pPg);
1845   }
1846   return rc;
1847 }
1848 
seekInSegment(MultiCursor * pCsr,SegmentPtr * pPtr,int iTopic,void * pKey,int nKey,int iPg,int eSeek,int * piPtr,int * pbStop)1849 static int seekInSegment(
1850   MultiCursor *pCsr,
1851   SegmentPtr *pPtr,
1852   int iTopic,
1853   void *pKey, int nKey,
1854   int iPg,                        /* Page to search */
1855   int eSeek,                      /* Search bias - see above */
1856   int *piPtr,                     /* OUT: FC pointer */
1857   int *pbStop                     /* OUT: Stop search flag */
1858 ){
1859   int iPtr = iPg;
1860   int rc = LSM_OK;
1861 
1862   if( pPtr->pSeg->iRoot ){
1863     Page *pPg;
1864     assert( pPtr->pSeg->iRoot!=0 );
1865     rc = seekInBtree(pCsr, pPtr->pSeg, iTopic, pKey, nKey, 0, &pPg);
1866     if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg);
1867   }else{
1868     if( iPtr==0 ){
1869       iPtr = (int)pPtr->pSeg->iFirst;
1870     }
1871     if( rc==LSM_OK ){
1872       rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
1873     }
1874   }
1875 
1876   if( rc==LSM_OK ){
1877     rc = segmentPtrSeek(pCsr, pPtr, iTopic, pKey, nKey, eSeek, piPtr, pbStop);
1878   }
1879   return rc;
1880 }
1881 
1882 /*
1883 ** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].
1884 **
1885 ** pbStop:
1886 **   This parameter is only significant if parameter eSeek is set to
1887 **   LSM_SEEK_EQ. In this case, it is set to true before returning if
1888 **   the seek operation is finished. This can happen in two ways:
1889 **
1890 **     a) A key matching (pKey/nKey) is found, or
1891 **     b) A point-delete or range-delete deleting the key is found.
1892 **
1893 **   In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key
1894 **   and pCsr->val blobs populated before returning.
1895 */
seekInLevel(MultiCursor * pCsr,SegmentPtr * aPtr,int eSeek,int iTopic,void * pKey,int nKey,Pgno * piPgno,int * pbStop)1896 static int seekInLevel(
1897   MultiCursor *pCsr,              /* Sorted cursor object to seek */
1898   SegmentPtr *aPtr,               /* Pointer to array of (nRhs+1) SPs */
1899   int eSeek,                      /* Search bias - see above */
1900   int iTopic,                     /* Key topic to search for */
1901   void *pKey, int nKey,           /* Key to search for */
1902   Pgno *piPgno,                   /* IN/OUT: fraction cascade pointer (or 0) */
1903   int *pbStop                     /* OUT: See above */
1904 ){
1905   Level *pLvl = aPtr[0].pLevel;   /* Level to seek within */
1906   int rc = LSM_OK;                /* Return code */
1907   int iOut = 0;                   /* Pointer to return to caller */
1908   int res = -1;                   /* Result of xCmp(pKey, split) */
1909   int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */
1910   int bStop = 0;
1911 
1912   /* If this is a composite level (one currently undergoing an incremental
1913   ** merge), figure out if the search key is larger or smaller than the
1914   ** levels split-key.  */
1915   if( nRhs ){
1916     res = sortedKeyCompare(pCsr->pDb->xCmp, iTopic, pKey, nKey,
1917         pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
1918     );
1919   }
1920 
1921   /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
1922   ** is not a composite level and there is no split-key). Search the
1923   ** left-hand-side of the level in this case.  */
1924   if( res<0 ){
1925     int iPtr = 0;
1926     if( nRhs==0 ) iPtr = (int)*piPgno;
1927 
1928     rc = seekInSegment(
1929         pCsr, &aPtr[0], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
1930     );
1931     if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
1932       res = 0;
1933     }
1934   }
1935 
1936   if( res>=0 ){
1937     int bHit = 0;                 /* True if at least one rhs is not EOF */
1938     int iPtr = (int)*piPgno;
1939     int i;
1940     for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
1941       SegmentPtr *pPtr = &aPtr[i];
1942       iOut = 0;
1943       rc = seekInSegment(
1944           pCsr, pPtr, iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
1945       );
1946       iPtr = iOut;
1947 
1948       /* If the segment-pointer has settled on a key that is smaller than
1949       ** the splitkey, invalidate the segment-pointer.  */
1950       if( pPtr->pPg ){
1951         res = sortedKeyCompare(pCsr->pDb->xCmp,
1952             rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
1953             pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
1954         );
1955         if( res<0 ){
1956           if( pPtr->eType & LSM_START_DELETE ){
1957             pPtr->eType &= ~LSM_INSERT;
1958             pPtr->pKey = pLvl->pSplitKey;
1959             pPtr->nKey = pLvl->nSplitKey;
1960             pPtr->pVal = 0;
1961             pPtr->nVal = 0;
1962           }else{
1963             segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1964           }
1965         }
1966       }
1967 
1968       if( aPtr[i].pKey ) bHit = 1;
1969     }
1970 
1971     if( rc==LSM_OK && eSeek==LSM_SEEK_LE && bHit==0 ){
1972       rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
1973     }
1974   }
1975 
1976   assert( eSeek==LSM_SEEK_EQ || bStop==0 );
1977   *piPgno = iOut;
1978   *pbStop = bStop;
1979   return rc;
1980 }
1981 
multiCursorGetKey(MultiCursor * pCsr,int iKey,int * peType,void ** ppKey,int * pnKey)1982 static void multiCursorGetKey(
1983   MultiCursor *pCsr,
1984   int iKey,
1985   int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
1986   void **ppKey,                   /* OUT: Pointer to buffer containing key */
1987   int *pnKey                      /* OUT: Size of *ppKey in bytes */
1988 ){
1989   int nKey = 0;
1990   void *pKey = 0;
1991   int eType = 0;
1992 
1993   switch( iKey ){
1994     case CURSOR_DATA_TREE0:
1995     case CURSOR_DATA_TREE1: {
1996       TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
1997       if( lsmTreeCursorValid(pTreeCsr) ){
1998         lsmTreeCursorKey(pTreeCsr, &eType, &pKey, &nKey);
1999       }
2000       break;
2001     }
2002 
2003     case CURSOR_DATA_SYSTEM: {
2004       Snapshot *pWorker = pCsr->pDb->pWorker;
2005       if( pWorker && (pCsr->flags & CURSOR_FLUSH_FREELIST) ){
2006         int nEntry = pWorker->freelist.nEntry;
2007         if( pCsr->iFree < (nEntry*2) ){
2008           FreelistEntry *aEntry = pWorker->freelist.aEntry;
2009           int i = nEntry - 1 - (pCsr->iFree / 2);
2010           u32 iKey2 = 0;
2011 
2012           if( (pCsr->iFree % 2) ){
2013             eType = LSM_END_DELETE|LSM_SYSTEMKEY;
2014             iKey2 = aEntry[i].iBlk-1;
2015           }else if( aEntry[i].iId>=0 ){
2016             eType = LSM_INSERT|LSM_SYSTEMKEY;
2017             iKey2 = aEntry[i].iBlk;
2018 
2019             /* If the in-memory entry immediately before this one was a
2020              ** DELETE, and the block number is one greater than the current
2021              ** block number, mark this entry as an "end-delete-range". */
2022             if( i<(nEntry-1) && aEntry[i+1].iBlk==iKey2+1 && aEntry[i+1].iId<0 ){
2023               eType |= LSM_END_DELETE;
2024             }
2025 
2026           }else{
2027             eType = LSM_START_DELETE|LSM_SYSTEMKEY;
2028             iKey2 = aEntry[i].iBlk + 1;
2029           }
2030 
2031           /* If the in-memory entry immediately after this one is a
2032           ** DELETE, and the block number is one less than the current
2033           ** key, mark this entry as an "start-delete-range".  */
2034           if( i>0 && aEntry[i-1].iBlk==iKey2-1 && aEntry[i-1].iId<0 ){
2035             eType |= LSM_START_DELETE;
2036           }
2037 
2038           pKey = pCsr->pSystemVal;
2039           nKey = 4;
2040           lsmPutU32(pKey, ~iKey2);
2041         }
2042       }
2043       break;
2044     }
2045 
2046     default: {
2047       int iPtr = iKey - CURSOR_DATA_SEGMENT;
2048       assert( iPtr>=0 );
2049       if( iPtr==pCsr->nPtr ){
2050         if( pCsr->pBtCsr ){
2051           pKey = pCsr->pBtCsr->pKey;
2052           nKey = pCsr->pBtCsr->nKey;
2053           eType = pCsr->pBtCsr->eType;
2054         }
2055       }else if( iPtr<pCsr->nPtr ){
2056         SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
2057         if( pPtr->pPg ){
2058           pKey = pPtr->pKey;
2059           nKey = pPtr->nKey;
2060           eType = pPtr->eType;
2061         }
2062       }
2063       break;
2064     }
2065   }
2066 
2067   if( peType ) *peType = eType;
2068   if( pnKey ) *pnKey = nKey;
2069   if( ppKey ) *ppKey = pKey;
2070 }
2071 
sortedDbKeyCompare(MultiCursor * pCsr,int iLhsFlags,void * pLhsKey,int nLhsKey,int iRhsFlags,void * pRhsKey,int nRhsKey)2072 static int sortedDbKeyCompare(
2073   MultiCursor *pCsr,
2074   int iLhsFlags, void *pLhsKey, int nLhsKey,
2075   int iRhsFlags, void *pRhsKey, int nRhsKey
2076 ){
2077   int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
2078   int res;
2079 
2080   /* Compare the keys, including the system flag. */
2081   res = sortedKeyCompare(xCmp,
2082     rtTopic(iLhsFlags), pLhsKey, nLhsKey,
2083     rtTopic(iRhsFlags), pRhsKey, nRhsKey
2084   );
2085 
2086   /* If a key has the LSM_START_DELETE flag set, but not the LSM_INSERT or
2087   ** LSM_POINT_DELETE flags, it is considered a delta larger. This prevents
2088   ** the beginning of an open-ended set from masking a database entry or
2089   ** delete at a lower level.  */
2090   if( res==0 && (pCsr->flags & CURSOR_IGNORE_DELETE) ){
2091     const int m = LSM_POINT_DELETE|LSM_INSERT|LSM_END_DELETE |LSM_START_DELETE;
2092     int iDel1 = 0;
2093     int iDel2 = 0;
2094 
2095     if( LSM_START_DELETE==(iLhsFlags & m) ) iDel1 = +1;
2096     if( LSM_END_DELETE  ==(iLhsFlags & m) ) iDel1 = -1;
2097     if( LSM_START_DELETE==(iRhsFlags & m) ) iDel2 = +1;
2098     if( LSM_END_DELETE  ==(iRhsFlags & m) ) iDel2 = -1;
2099 
2100     res = (iDel1 - iDel2);
2101   }
2102 
2103   return res;
2104 }
2105 
multiCursorDoCompare(MultiCursor * pCsr,int iOut,int bReverse)2106 static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
2107   int i1;
2108   int i2;
2109   int iRes;
2110   void *pKey1; int nKey1; int eType1;
2111   void *pKey2; int nKey2; int eType2;
2112   const int mul = (bReverse ? -1 : 1);
2113 
2114   assert( pCsr->aTree && iOut<pCsr->nTree );
2115   if( iOut>=(pCsr->nTree/2) ){
2116     i1 = (iOut - pCsr->nTree/2) * 2;
2117     i2 = i1 + 1;
2118   }else{
2119     i1 = pCsr->aTree[iOut*2];
2120     i2 = pCsr->aTree[iOut*2+1];
2121   }
2122 
2123   multiCursorGetKey(pCsr, i1, &eType1, &pKey1, &nKey1);
2124   multiCursorGetKey(pCsr, i2, &eType2, &pKey2, &nKey2);
2125 
2126   if( pKey1==0 ){
2127     iRes = i2;
2128   }else if( pKey2==0 ){
2129     iRes = i1;
2130   }else{
2131     int res;
2132 
2133     /* Compare the keys */
2134     res = sortedDbKeyCompare(pCsr,
2135         eType1, pKey1, nKey1, eType2, pKey2, nKey2
2136     );
2137 
2138     res = res * mul;
2139     if( res==0 ){
2140       /* The two keys are identical. Normally, this means that the key from
2141       ** the newer run clobbers the old. However, if the newer key is a
2142       ** separator key, or a range-delete-boundary only, do not allow it
2143       ** to clobber an older entry.  */
2144       int nc1 = (eType1 & (LSM_INSERT|LSM_POINT_DELETE))==0;
2145       int nc2 = (eType2 & (LSM_INSERT|LSM_POINT_DELETE))==0;
2146       iRes = (nc1 > nc2) ? i2 : i1;
2147     }else if( res<0 ){
2148       iRes = i1;
2149     }else{
2150       iRes = i2;
2151     }
2152   }
2153 
2154   pCsr->aTree[iOut] = iRes;
2155 }
2156 
2157 /*
2158 ** This function advances segment pointer iPtr belonging to multi-cursor
2159 ** pCsr forward (bReverse==0) or backward (bReverse!=0).
2160 **
2161 ** If the segment pointer points to a segment that is part of a composite
2162 ** level, then the following special case is handled.
2163 **
2164 **   * If iPtr is the lhs of a composite level, and the cursor is being
2165 **     advanced forwards, and segment iPtr is at EOF, move all pointers
2166 **     that correspond to rhs segments of the same level to the first
2167 **     key in their respective data.
2168 */
segmentCursorAdvance(MultiCursor * pCsr,int iPtr,int bReverse)2169 static int segmentCursorAdvance(
2170   MultiCursor *pCsr,
2171   int iPtr,
2172   int bReverse
2173 ){
2174   int rc;
2175   SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
2176   Level *pLvl = pPtr->pLevel;
2177   int bComposite;                 /* True if pPtr is part of composite level */
2178 
2179   /* Advance the segment-pointer object. */
2180   rc = segmentPtrAdvance(pCsr, pPtr, bReverse);
2181   if( rc!=LSM_OK ) return rc;
2182 
2183   bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);
2184   if( bComposite && pPtr->pPg==0 ){
2185     int bFix = 0;
2186     if( (bReverse==0)==(pPtr->pSeg==&pLvl->lhs) ){
2187       int i;
2188       if( bReverse ){
2189         SegmentPtr *pLhs = &pCsr->aPtr[iPtr - 1 - (pPtr->pSeg - pLvl->aRhs)];
2190         for(i=0; i<pLvl->nRight; i++){
2191           if( pLhs[i+1].pPg ) break;
2192         }
2193         if( i==pLvl->nRight ){
2194           bFix = 1;
2195           rc = segmentPtrEnd(pCsr, pLhs, 1);
2196         }
2197       }else{
2198         bFix = 1;
2199         for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
2200           rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
2201         }
2202       }
2203     }
2204 
2205     if( bFix ){
2206       int i;
2207       for(i=pCsr->nTree-1; i>0; i--){
2208         multiCursorDoCompare(pCsr, i, bReverse);
2209       }
2210     }
2211   }
2212 
2213 #if 0
2214   if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
2215    && bReverse==0                                /* csr advanced forwards */
2216    && pPtr->pPg==0                               /* segment at EOF */
2217   ){
2218     int i;
2219     for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
2220       rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
2221     }
2222     for(i=pCsr->nTree-1; i>0; i--){
2223       multiCursorDoCompare(pCsr, i, 0);
2224     }
2225   }
2226 #endif
2227 
2228   return rc;
2229 }
2230 
mcursorFreeComponents(MultiCursor * pCsr)2231 static void mcursorFreeComponents(MultiCursor *pCsr){
2232   int i;
2233   lsm_env *pEnv = pCsr->pDb->pEnv;
2234 
2235   /* Close the tree cursor, if any. */
2236   lsmTreeCursorDestroy(pCsr->apTreeCsr[0]);
2237   lsmTreeCursorDestroy(pCsr->apTreeCsr[1]);
2238 
2239   /* Reset the segment pointers */
2240   for(i=0; i<pCsr->nPtr; i++){
2241     segmentPtrReset(&pCsr->aPtr[i], 0);
2242   }
2243 
2244   /* And the b-tree cursor, if any */
2245   btreeCursorFree(pCsr->pBtCsr);
2246 
2247   /* Free allocations */
2248   lsmFree(pEnv, pCsr->aPtr);
2249   lsmFree(pEnv, pCsr->aTree);
2250   lsmFree(pEnv, pCsr->pSystemVal);
2251 
2252   /* Zero fields */
2253   pCsr->nPtr = 0;
2254   pCsr->aPtr = 0;
2255   pCsr->nTree = 0;
2256   pCsr->aTree = 0;
2257   pCsr->pSystemVal = 0;
2258   pCsr->apTreeCsr[0] = 0;
2259   pCsr->apTreeCsr[1] = 0;
2260   pCsr->pBtCsr = 0;
2261 }
2262 
lsmMCursorFreeCache(lsm_db * pDb)2263 void lsmMCursorFreeCache(lsm_db *pDb){
2264   MultiCursor *p;
2265   MultiCursor *pNext;
2266   for(p=pDb->pCsrCache; p; p=pNext){
2267     pNext = p->pNext;
2268     lsmMCursorClose(p, 0);
2269   }
2270   pDb->pCsrCache = 0;
2271 }
2272 
2273 /*
2274 ** Close the cursor passed as the first argument.
2275 **
2276 ** If the bCache parameter is true, then shift the cursor to the pCsrCache
2277 ** list for possible reuse instead of actually deleting it.
2278 */
lsmMCursorClose(MultiCursor * pCsr,int bCache)2279 void lsmMCursorClose(MultiCursor *pCsr, int bCache){
2280   if( pCsr ){
2281     lsm_db *pDb = pCsr->pDb;
2282     MultiCursor **pp;             /* Iterator variable */
2283 
2284     /* The cursor may or may not be currently part of the linked list
2285     ** starting at lsm_db.pCsr. If it is, extract it.  */
2286     for(pp=&pDb->pCsr; *pp; pp=&((*pp)->pNext)){
2287       if( *pp==pCsr ){
2288         *pp = pCsr->pNext;
2289         break;
2290       }
2291     }
2292 
2293     if( bCache ){
2294       int i;                      /* Used to iterate through segment-pointers */
2295 
2296       /* Release any page references held by this cursor. */
2297       assert( !pCsr->pBtCsr );
2298       for(i=0; i<pCsr->nPtr; i++){
2299         SegmentPtr *pPtr = &pCsr->aPtr[i];
2300         lsmFsPageRelease(pPtr->pPg);
2301         pPtr->pPg = 0;
2302       }
2303 
2304       /* Reset the tree cursors */
2305       lsmTreeCursorReset(pCsr->apTreeCsr[0]);
2306       lsmTreeCursorReset(pCsr->apTreeCsr[1]);
2307 
2308       /* Add the cursor to the pCsrCache list */
2309       pCsr->pNext = pDb->pCsrCache;
2310       pDb->pCsrCache = pCsr;
2311     }else{
2312       /* Free the allocation used to cache the current key, if any. */
2313       sortedBlobFree(&pCsr->key);
2314       sortedBlobFree(&pCsr->val);
2315 
2316       /* Free the component cursors */
2317       mcursorFreeComponents(pCsr);
2318 
2319       /* Free the cursor structure itself */
2320       lsmFree(pDb->pEnv, pCsr);
2321     }
2322   }
2323 }
2324 
2325 #define TREE_NONE 0
2326 #define TREE_OLD  1
2327 #define TREE_BOTH 2
2328 
2329 /*
2330 ** Parameter eTree is one of TREE_OLD or TREE_BOTH.
2331 */
multiCursorAddTree(MultiCursor * pCsr,Snapshot * pSnap,int eTree)2332 static int multiCursorAddTree(MultiCursor *pCsr, Snapshot *pSnap, int eTree){
2333   int rc = LSM_OK;
2334   lsm_db *db = pCsr->pDb;
2335 
2336   /* Add a tree cursor on the 'old' tree, if it exists. */
2337   if( eTree!=TREE_NONE
2338    && lsmTreeHasOld(db)
2339    && db->treehdr.iOldLog!=pSnap->iLogOff
2340   ){
2341     rc = lsmTreeCursorNew(db, 1, &pCsr->apTreeCsr[1]);
2342   }
2343 
2344   /* Add a tree cursor on the 'current' tree, if required. */
2345   if( rc==LSM_OK && eTree==TREE_BOTH ){
2346     rc = lsmTreeCursorNew(db, 0, &pCsr->apTreeCsr[0]);
2347   }
2348 
2349   return rc;
2350 }
2351 
multiCursorAddRhs(MultiCursor * pCsr,Level * pLvl)2352 static int multiCursorAddRhs(MultiCursor *pCsr, Level *pLvl){
2353   int i;
2354   int nRhs = pLvl->nRight;
2355 
2356   assert( pLvl->nRight>0 );
2357   assert( pCsr->aPtr==0 );
2358   pCsr->aPtr = lsmMallocZero(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nRhs);
2359   if( !pCsr->aPtr ) return LSM_NOMEM_BKPT;
2360   pCsr->nPtr = nRhs;
2361 
2362   for(i=0; i<nRhs; i++){
2363     pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
2364     pCsr->aPtr[i].pLevel = pLvl;
2365   }
2366 
2367   return LSM_OK;
2368 }
2369 
multiCursorAddOne(MultiCursor * pCsr,Level * pLvl,int * pRc)2370 static void multiCursorAddOne(MultiCursor *pCsr, Level *pLvl, int *pRc){
2371   if( *pRc==LSM_OK ){
2372     int iPtr = pCsr->nPtr;
2373     int i;
2374     pCsr->aPtr[iPtr].pLevel = pLvl;
2375     pCsr->aPtr[iPtr].pSeg = &pLvl->lhs;
2376     iPtr++;
2377     for(i=0; i<pLvl->nRight; i++){
2378       pCsr->aPtr[iPtr].pLevel = pLvl;
2379       pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i];
2380       iPtr++;
2381     }
2382 
2383     if( pLvl->nRight && pLvl->pSplitKey==0 ){
2384       sortedSplitkey(pCsr->pDb, pLvl, pRc);
2385     }
2386     pCsr->nPtr = iPtr;
2387   }
2388 }
2389 
multiCursorAddAll(MultiCursor * pCsr,Snapshot * pSnap)2390 static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){
2391   Level *pLvl;
2392   int nPtr = 0;
2393   int rc = LSM_OK;
2394 
2395   for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
2396     /* If the LEVEL_INCOMPLETE flag is set, then this function is being
2397     ** called (indirectly) from within a sortedNewToplevel() call to
2398     ** construct pLvl. In this case ignore pLvl - this cursor is going to
2399     ** be used to retrieve a freelist entry from the LSM, and the partially
2400     ** complete level may confuse it.  */
2401     if( pLvl->flags & LEVEL_INCOMPLETE ) continue;
2402     nPtr += (1 + pLvl->nRight);
2403   }
2404 
2405   assert( pCsr->aPtr==0 );
2406   pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nPtr, &rc);
2407 
2408   for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
2409     if( (pLvl->flags & LEVEL_INCOMPLETE)==0 ){
2410       multiCursorAddOne(pCsr, pLvl, &rc);
2411     }
2412   }
2413 
2414   return rc;
2415 }
2416 
multiCursorInit(MultiCursor * pCsr,Snapshot * pSnap)2417 static int multiCursorInit(MultiCursor *pCsr, Snapshot *pSnap){
2418   int rc;
2419   rc = multiCursorAddAll(pCsr, pSnap);
2420   if( rc==LSM_OK ){
2421     rc = multiCursorAddTree(pCsr, pSnap, TREE_BOTH);
2422   }
2423   pCsr->flags |= (CURSOR_IGNORE_SYSTEM | CURSOR_IGNORE_DELETE);
2424   return rc;
2425 }
2426 
multiCursorNew(lsm_db * db,int * pRc)2427 static MultiCursor *multiCursorNew(lsm_db *db, int *pRc){
2428   MultiCursor *pCsr;
2429   pCsr = (MultiCursor *)lsmMallocZeroRc(db->pEnv, sizeof(MultiCursor), pRc);
2430   if( pCsr ){
2431     pCsr->pNext = db->pCsr;
2432     db->pCsr = pCsr;
2433     pCsr->pDb = db;
2434   }
2435   return pCsr;
2436 }
2437 
2438 
lsmSortedRemap(lsm_db * pDb)2439 void lsmSortedRemap(lsm_db *pDb){
2440   MultiCursor *pCsr;
2441   for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
2442     int iPtr;
2443     if( pCsr->pBtCsr ){
2444       btreeCursorLoadKey(pCsr->pBtCsr);
2445     }
2446     for(iPtr=0; iPtr<pCsr->nPtr; iPtr++){
2447       segmentPtrLoadCell(&pCsr->aPtr[iPtr], pCsr->aPtr[iPtr].iCell);
2448     }
2449   }
2450 }
2451 
multiCursorReadSeparators(MultiCursor * pCsr)2452 static void multiCursorReadSeparators(MultiCursor *pCsr){
2453   if( pCsr->nPtr>0 ){
2454     pCsr->flags |= CURSOR_READ_SEPARATORS;
2455   }
2456 }
2457 
2458 /*
2459 ** Have this cursor skip over SORTED_DELETE entries.
2460 */
multiCursorIgnoreDelete(MultiCursor * pCsr)2461 static void multiCursorIgnoreDelete(MultiCursor *pCsr){
2462   if( pCsr ) pCsr->flags |= CURSOR_IGNORE_DELETE;
2463 }
2464 
2465 /*
2466 ** If the free-block list is not empty, then have this cursor visit a key
2467 ** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value
2468 ** blob containing the serialized free-block list.
2469 */
multiCursorVisitFreelist(MultiCursor * pCsr)2470 static int multiCursorVisitFreelist(MultiCursor *pCsr){
2471   int rc = LSM_OK;
2472   pCsr->flags |= CURSOR_FLUSH_FREELIST;
2473   pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
2474   return rc;
2475 }
2476 
2477 /*
2478 ** Allocate and return a new database cursor.
2479 **
2480 ** This method should only be called to allocate user cursors. As it may
2481 ** recycle a cursor from lsm_db.pCsrCache.
2482 */
lsmMCursorNew(lsm_db * pDb,MultiCursor ** ppCsr)2483 int lsmMCursorNew(
2484   lsm_db *pDb,                    /* Database handle */
2485   MultiCursor **ppCsr             /* OUT: Allocated cursor */
2486 ){
2487   MultiCursor *pCsr = 0;
2488   int rc = LSM_OK;
2489 
2490   if( pDb->pCsrCache ){
2491     int bOld;                     /* True if there is an old in-memory tree */
2492 
2493     /* Remove a cursor from the pCsrCache list and add it to the open list. */
2494     pCsr = pDb->pCsrCache;
2495     pDb->pCsrCache = pCsr->pNext;
2496     pCsr->pNext = pDb->pCsr;
2497     pDb->pCsr = pCsr;
2498 
2499     /* The cursor can almost be used as is, except that the old in-memory
2500     ** tree cursor may be present and not required, or required and not
2501     ** present. Fix this if required.  */
2502     bOld = (lsmTreeHasOld(pDb) && pDb->treehdr.iOldLog!=pDb->pClient->iLogOff);
2503     if( !bOld && pCsr->apTreeCsr[1] ){
2504       lsmTreeCursorDestroy(pCsr->apTreeCsr[1]);
2505       pCsr->apTreeCsr[1] = 0;
2506     }else if( bOld && !pCsr->apTreeCsr[1] ){
2507       rc = lsmTreeCursorNew(pDb, 1, &pCsr->apTreeCsr[1]);
2508     }
2509 
2510     pCsr->flags = (CURSOR_IGNORE_SYSTEM | CURSOR_IGNORE_DELETE);
2511 
2512   }else{
2513     pCsr = multiCursorNew(pDb, &rc);
2514     if( rc==LSM_OK ) rc = multiCursorInit(pCsr, pDb->pClient);
2515   }
2516 
2517   if( rc!=LSM_OK ){
2518     lsmMCursorClose(pCsr, 0);
2519     pCsr = 0;
2520   }
2521   assert( (rc==LSM_OK)==(pCsr!=0) );
2522   *ppCsr = pCsr;
2523   return rc;
2524 }
2525 
multiCursorGetVal(MultiCursor * pCsr,int iVal,void ** ppVal,int * pnVal)2526 static int multiCursorGetVal(
2527   MultiCursor *pCsr,
2528   int iVal,
2529   void **ppVal,
2530   int *pnVal
2531 ){
2532   int rc = LSM_OK;
2533 
2534   *ppVal = 0;
2535   *pnVal = 0;
2536 
2537   switch( iVal ){
2538     case CURSOR_DATA_TREE0:
2539     case CURSOR_DATA_TREE1: {
2540       TreeCursor *pTreeCsr = pCsr->apTreeCsr[iVal-CURSOR_DATA_TREE0];
2541       if( lsmTreeCursorValid(pTreeCsr) ){
2542         lsmTreeCursorValue(pTreeCsr, ppVal, pnVal);
2543       }else{
2544         *ppVal = 0;
2545         *pnVal = 0;
2546       }
2547       break;
2548     }
2549 
2550     case CURSOR_DATA_SYSTEM: {
2551       Snapshot *pWorker = pCsr->pDb->pWorker;
2552       if( pWorker
2553        && (pCsr->iFree % 2)==0
2554        && pCsr->iFree < (pWorker->freelist.nEntry*2)
2555       ){
2556         int iEntry = pWorker->freelist.nEntry - 1 - (pCsr->iFree / 2);
2557         u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
2558         lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
2559         *ppVal = aVal;
2560         *pnVal = 8;
2561       }
2562       break;
2563     }
2564 
2565     default: {
2566       int iPtr = iVal-CURSOR_DATA_SEGMENT;
2567       if( iPtr<pCsr->nPtr ){
2568         SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
2569         if( pPtr->pPg ){
2570           *ppVal = pPtr->pVal;
2571           *pnVal = pPtr->nVal;
2572         }
2573       }
2574     }
2575   }
2576 
2577   assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) );
2578   return rc;
2579 }
2580 
2581 static int multiCursorAdvance(MultiCursor *pCsr, int bReverse);
2582 
2583 /*
2584 ** This function is called by worker connections to walk the part of the
2585 ** free-list stored within the LSM data structure.
2586 */
lsmSortedWalkFreelist(lsm_db * pDb,int bReverse,int (* x)(void *,int,i64),void * pCtx)2587 int lsmSortedWalkFreelist(
2588   lsm_db *pDb,                    /* Database handle */
2589   int bReverse,                   /* True to iterate from largest to smallest */
2590   int (*x)(void *, int, i64),     /* Callback function */
2591   void *pCtx                      /* First argument to pass to callback */
2592 ){
2593   MultiCursor *pCsr;              /* Cursor used to read db */
2594   int rc = LSM_OK;                /* Return Code */
2595   Snapshot *pSnap = 0;
2596 
2597   assert( pDb->pWorker );
2598   if( pDb->bIncrMerge ){
2599     rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap);
2600     if( rc!=LSM_OK ) return rc;
2601   }else{
2602     pSnap = pDb->pWorker;
2603   }
2604 
2605   pCsr = multiCursorNew(pDb, &rc);
2606   if( pCsr ){
2607     rc = multiCursorAddAll(pCsr, pSnap);
2608     pCsr->flags |= CURSOR_IGNORE_DELETE;
2609   }
2610 
2611   if( rc==LSM_OK ){
2612     if( bReverse==0 ){
2613       rc = lsmMCursorLast(pCsr);
2614     }else{
2615       rc = lsmMCursorSeek(pCsr, 1, "", 0, LSM_SEEK_GE);
2616     }
2617 
2618     while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){
2619       void *pKey; int nKey;
2620       void *pVal = 0; int nVal = 0;
2621 
2622       rc = lsmMCursorKey(pCsr, &pKey, &nKey);
2623       if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
2624       if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT;
2625 
2626       if( rc==LSM_OK ){
2627         int iBlk;
2628         i64 iSnap;
2629         iBlk = (int)(~(lsmGetU32((u8 *)pKey)));
2630         iSnap = (i64)lsmGetU64((u8 *)pVal);
2631         if( x(pCtx, iBlk, iSnap) ) break;
2632         rc = multiCursorAdvance(pCsr, !bReverse);
2633       }
2634     }
2635   }
2636 
2637   lsmMCursorClose(pCsr, 0);
2638   if( pSnap!=pDb->pWorker ){
2639     lsmFreeSnapshot(pDb->pEnv, pSnap);
2640   }
2641 
2642   return rc;
2643 }
2644 
lsmSortedLoadFreelist(lsm_db * pDb,void ** ppVal,int * pnVal)2645 int lsmSortedLoadFreelist(
2646   lsm_db *pDb,                    /* Database handle (must be worker) */
2647   void **ppVal,                   /* OUT: Blob containing LSM free-list */
2648   int *pnVal                      /* OUT: Size of *ppVal blob in bytes */
2649 ){
2650   MultiCursor *pCsr;              /* Cursor used to retreive free-list */
2651   int rc = LSM_OK;                /* Return Code */
2652 
2653   assert( pDb->pWorker );
2654   assert( *ppVal==0 && *pnVal==0 );
2655 
2656   pCsr = multiCursorNew(pDb, &rc);
2657   if( pCsr ){
2658     rc = multiCursorAddAll(pCsr, pDb->pWorker);
2659     pCsr->flags |= CURSOR_IGNORE_DELETE;
2660   }
2661 
2662   if( rc==LSM_OK ){
2663     rc = lsmMCursorLast(pCsr);
2664     if( rc==LSM_OK
2665      && rtIsWrite(pCsr->eType) && rtIsSystem(pCsr->eType)
2666      && pCsr->key.nData==8
2667      && 0==memcmp(pCsr->key.pData, "FREELIST", 8)
2668     ){
2669       void *pVal; int nVal;         /* Value read from database */
2670       rc = lsmMCursorValue(pCsr, &pVal, &nVal);
2671       if( rc==LSM_OK ){
2672         *ppVal = lsmMallocRc(pDb->pEnv, nVal, &rc);
2673         if( *ppVal ){
2674           memcpy(*ppVal, pVal, nVal);
2675           *pnVal = nVal;
2676         }
2677       }
2678     }
2679 
2680     lsmMCursorClose(pCsr, 0);
2681   }
2682 
2683   return rc;
2684 }
2685 
multiCursorAllocTree(MultiCursor * pCsr)2686 static int multiCursorAllocTree(MultiCursor *pCsr){
2687   int rc = LSM_OK;
2688   if( pCsr->aTree==0 ){
2689     int nByte;                    /* Bytes of space to allocate */
2690     int nMin;                     /* Total number of cursors being merged */
2691 
2692     nMin = CURSOR_DATA_SEGMENT + pCsr->nPtr + (pCsr->pBtCsr!=0);
2693     pCsr->nTree = 2;
2694     while( pCsr->nTree<nMin ){
2695       pCsr->nTree = pCsr->nTree*2;
2696     }
2697 
2698     nByte = sizeof(int)*pCsr->nTree*2;
2699     pCsr->aTree = (int *)lsmMallocZeroRc(pCsr->pDb->pEnv, nByte, &rc);
2700   }
2701   return rc;
2702 }
2703 
multiCursorCacheKey(MultiCursor * pCsr,int * pRc)2704 static void multiCursorCacheKey(MultiCursor *pCsr, int *pRc){
2705   if( *pRc==LSM_OK ){
2706     void *pKey;
2707     int nKey;
2708     multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey);
2709     *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey);
2710   }
2711 }
2712 
2713 #ifdef LSM_DEBUG_EXPENSIVE
assertCursorTree(MultiCursor * pCsr)2714 static void assertCursorTree(MultiCursor *pCsr){
2715   int bRev = !!(pCsr->flags & CURSOR_PREV_OK);
2716   int *aSave = pCsr->aTree;
2717   int nSave = pCsr->nTree;
2718   int rc;
2719 
2720   pCsr->aTree = 0;
2721   pCsr->nTree = 0;
2722   rc = multiCursorAllocTree(pCsr);
2723   if( rc==LSM_OK ){
2724     int i;
2725     for(i=pCsr->nTree-1; i>0; i--){
2726       multiCursorDoCompare(pCsr, i, bRev);
2727     }
2728 
2729     assert( nSave==pCsr->nTree
2730         && 0==memcmp(aSave, pCsr->aTree, sizeof(int)*nSave)
2731     );
2732 
2733     lsmFree(pCsr->pDb->pEnv, pCsr->aTree);
2734   }
2735 
2736   pCsr->aTree = aSave;
2737   pCsr->nTree = nSave;
2738 }
2739 #else
2740 # define assertCursorTree(x)
2741 #endif
2742 
mcursorLocationOk(MultiCursor * pCsr,int bDeleteOk)2743 static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
2744   int eType = pCsr->eType;
2745   int iKey;
2746   int i;
2747   int rdmask;
2748 
2749   assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
2750   assertCursorTree(pCsr);
2751 
2752   rdmask = (pCsr->flags & CURSOR_NEXT_OK) ? LSM_END_DELETE : LSM_START_DELETE;
2753 
2754   /* If the cursor does not currently point to an actual database key (i.e.
2755   ** it points to a delete key, or the start or end of a range-delete), and
2756   ** the CURSOR_IGNORE_DELETE flag is set, skip past this entry.  */
2757   if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
2758     if( (eType & LSM_INSERT)==0 ) return 0;
2759   }
2760 
2761   /* If the cursor points to a system key (free-list entry), and the
2762   ** CURSOR_IGNORE_SYSTEM flag is set, skip thie entry.  */
2763   if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
2764     return 0;
2765   }
2766 
2767 #ifndef NDEBUG
2768   /* This block fires assert() statements to check one of the assumptions
2769   ** in the comment below - that if the lhs sub-cursor of a level undergoing
2770   ** a merge is valid, then all the rhs sub-cursors must be at EOF.
2771   **
2772   ** Also assert that all rhs sub-cursors are either at EOF or point to
2773   ** a key that is not less than the level split-key.  */
2774   for(i=0; i<pCsr->nPtr; i++){
2775     SegmentPtr *pPtr = &pCsr->aPtr[i];
2776     Level *pLvl = pPtr->pLevel;
2777     if( pLvl->nRight && pPtr->pPg ){
2778       if( pPtr->pSeg==&pLvl->lhs ){
2779         int j;
2780         for(j=0; j<pLvl->nRight; j++) assert( pPtr[j+1].pPg==0 );
2781       }else{
2782         int res = sortedKeyCompare(pCsr->pDb->xCmp,
2783             rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
2784             pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
2785         );
2786         assert( res>=0 );
2787       }
2788     }
2789   }
2790 #endif
2791 
2792   /* Now check if this key has already been deleted by a range-delete. If
2793   ** so, skip past it.
2794   **
2795   ** Assume, for the moment, that the tree contains no levels currently
2796   ** undergoing incremental merge, and that this cursor is iterating forwards
2797   ** through the database keys. The cursor currently points to a key in
2798   ** level L. This key has already been deleted if any of the sub-cursors
2799   ** that point to levels newer than L (or to the in-memory tree) point to
2800   ** a key greater than the current key with the LSM_END_DELETE flag set.
2801   **
2802   ** Or, if the cursor is iterating backwards through data keys, if any
2803   ** such sub-cursor points to a key smaller than the current key with the
2804   ** LSM_START_DELETE flag set.
2805   **
2806   ** Why it works with levels undergoing a merge too:
2807   **
2808   ** When a cursor iterates forwards, the sub-cursors for the rhs of a
2809   ** level are only activated once the lhs reaches EOF. So when iterating
2810   ** forwards, the keys visited are the same as if the level was completely
2811   ** merged.
2812   **
2813   ** If the cursor is iterating backwards, then the lhs sub-cursor is not
2814   ** initialized until the last of the rhs sub-cursors has reached EOF.
2815   ** Additionally, if the START_DELETE flag is set on the last entry (in
2816   ** reverse order - so the entry with the smallest key) of a rhs sub-cursor,
2817   ** then a pseudo-key equal to the levels split-key with the END_DELETE
2818   ** flag set is visited by the sub-cursor.
2819   */
2820   iKey = pCsr->aTree[1];
2821   for(i=0; i<iKey; i++){
2822     int csrflags;
2823     multiCursorGetKey(pCsr, i, &csrflags, 0, 0);
2824     if( (rdmask & csrflags) ){
2825       const int SD_ED = (LSM_START_DELETE|LSM_END_DELETE);
2826       if( (csrflags & SD_ED)==SD_ED
2827        || (pCsr->flags & CURSOR_IGNORE_DELETE)==0
2828       ){
2829         void *pKey; int nKey;
2830         multiCursorGetKey(pCsr, i, 0, &pKey, &nKey);
2831         if( 0==sortedKeyCompare(pCsr->pDb->xCmp,
2832               rtTopic(eType), pCsr->key.pData, pCsr->key.nData,
2833               rtTopic(csrflags), pKey, nKey
2834         )){
2835           continue;
2836         }
2837       }
2838       return 0;
2839     }
2840   }
2841 
2842   /* The current cursor position is one this cursor should visit. Return 1. */
2843   return 1;
2844 }
2845 
multiCursorSetupTree(MultiCursor * pCsr,int bRev)2846 static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
2847   int rc;
2848 
2849   rc = multiCursorAllocTree(pCsr);
2850   if( rc==LSM_OK ){
2851     int i;
2852     for(i=pCsr->nTree-1; i>0; i--){
2853       multiCursorDoCompare(pCsr, i, bRev);
2854     }
2855   }
2856 
2857   assertCursorTree(pCsr);
2858   multiCursorCacheKey(pCsr, &rc);
2859 
2860   if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){
2861     rc = multiCursorAdvance(pCsr, bRev);
2862   }
2863   return rc;
2864 }
2865 
2866 
multiCursorEnd(MultiCursor * pCsr,int bLast)2867 static int multiCursorEnd(MultiCursor *pCsr, int bLast){
2868   int rc = LSM_OK;
2869   int i;
2870 
2871   pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
2872   pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);
2873   pCsr->iFree = 0;
2874 
2875   /* Position the two in-memory tree cursors */
2876   for(i=0; rc==LSM_OK && i<2; i++){
2877     if( pCsr->apTreeCsr[i] ){
2878       rc = lsmTreeCursorEnd(pCsr->apTreeCsr[i], bLast);
2879     }
2880   }
2881 
2882   for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
2883     SegmentPtr *pPtr = &pCsr->aPtr[i];
2884     Level *pLvl = pPtr->pLevel;
2885     int iRhs;
2886     int bHit = 0;
2887 
2888     if( bLast ){
2889       for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
2890         rc = segmentPtrEnd(pCsr, &pPtr[iRhs+1], 1);
2891         if( pPtr[iRhs+1].pPg ) bHit = 1;
2892       }
2893       if( bHit==0 && rc==LSM_OK ){
2894         rc = segmentPtrEnd(pCsr, pPtr, 1);
2895       }else{
2896         segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
2897       }
2898     }else{
2899       int bLhs = (pPtr->pSeg==&pLvl->lhs);
2900       assert( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[0] );
2901 
2902       if( bLhs ){
2903         rc = segmentPtrEnd(pCsr, pPtr, 0);
2904         if( pPtr->pKey ) bHit = 1;
2905       }
2906       for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
2907         if( bHit ){
2908           segmentPtrReset(&pPtr[iRhs+1], LSM_SEGMENTPTR_FREE_THRESHOLD);
2909         }else{
2910           rc = sortedRhsFirst(pCsr, pLvl, &pPtr[iRhs+bLhs]);
2911         }
2912       }
2913     }
2914     i += pLvl->nRight;
2915   }
2916 
2917   /* And the b-tree cursor, if applicable */
2918   if( rc==LSM_OK && pCsr->pBtCsr ){
2919     assert( bLast==0 );
2920     rc = btreeCursorFirst(pCsr->pBtCsr);
2921   }
2922 
2923   if( rc==LSM_OK ){
2924     rc = multiCursorSetupTree(pCsr, bLast);
2925   }
2926 
2927   return rc;
2928 }
2929 
2930 
mcursorSave(MultiCursor * pCsr)2931 int mcursorSave(MultiCursor *pCsr){
2932   int rc = LSM_OK;
2933   if( pCsr->aTree ){
2934     int iTree = pCsr->aTree[1];
2935     if( iTree==CURSOR_DATA_TREE0 || iTree==CURSOR_DATA_TREE1 ){
2936       multiCursorCacheKey(pCsr, &rc);
2937     }
2938   }
2939   mcursorFreeComponents(pCsr);
2940   return rc;
2941 }
2942 
mcursorRestore(lsm_db * pDb,MultiCursor * pCsr)2943 int mcursorRestore(lsm_db *pDb, MultiCursor *pCsr){
2944   int rc;
2945   rc = multiCursorInit(pCsr, pDb->pClient);
2946   if( rc==LSM_OK && pCsr->key.pData ){
2947     rc = lsmMCursorSeek(pCsr,
2948          rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, +1
2949     );
2950   }
2951   return rc;
2952 }
2953 
lsmSaveCursors(lsm_db * pDb)2954 int lsmSaveCursors(lsm_db *pDb){
2955   int rc = LSM_OK;
2956   MultiCursor *pCsr;
2957 
2958   for(pCsr=pDb->pCsr; rc==LSM_OK && pCsr; pCsr=pCsr->pNext){
2959     rc = mcursorSave(pCsr);
2960   }
2961   return rc;
2962 }
2963 
lsmRestoreCursors(lsm_db * pDb)2964 int lsmRestoreCursors(lsm_db *pDb){
2965   int rc = LSM_OK;
2966   MultiCursor *pCsr;
2967 
2968   for(pCsr=pDb->pCsr; rc==LSM_OK && pCsr; pCsr=pCsr->pNext){
2969     rc = mcursorRestore(pDb, pCsr);
2970   }
2971   return rc;
2972 }
2973 
lsmMCursorFirst(MultiCursor * pCsr)2974 int lsmMCursorFirst(MultiCursor *pCsr){
2975   return multiCursorEnd(pCsr, 0);
2976 }
2977 
lsmMCursorLast(MultiCursor * pCsr)2978 int lsmMCursorLast(MultiCursor *pCsr){
2979   return multiCursorEnd(pCsr, 1);
2980 }
2981 
lsmMCursorDb(MultiCursor * pCsr)2982 lsm_db *lsmMCursorDb(MultiCursor *pCsr){
2983   return pCsr->pDb;
2984 }
2985 
lsmMCursorReset(MultiCursor * pCsr)2986 void lsmMCursorReset(MultiCursor *pCsr){
2987   int i;
2988   lsmTreeCursorReset(pCsr->apTreeCsr[0]);
2989   lsmTreeCursorReset(pCsr->apTreeCsr[1]);
2990   for(i=0; i<pCsr->nPtr; i++){
2991     segmentPtrReset(&pCsr->aPtr[i], LSM_SEGMENTPTR_FREE_THRESHOLD);
2992   }
2993   pCsr->key.nData = 0;
2994 }
2995 
treeCursorSeek(MultiCursor * pCsr,TreeCursor * pTreeCsr,void * pKey,int nKey,int eSeek,int * pbStop)2996 static int treeCursorSeek(
2997   MultiCursor *pCsr,
2998   TreeCursor *pTreeCsr,
2999   void *pKey, int nKey,
3000   int eSeek,
3001   int *pbStop
3002 ){
3003   int rc = LSM_OK;
3004   if( pTreeCsr ){
3005     int res = 0;
3006     lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &res);
3007     switch( eSeek ){
3008       case LSM_SEEK_EQ: {
3009         int eType = lsmTreeCursorFlags(pTreeCsr);
3010         if( (res<0 && (eType & LSM_START_DELETE))
3011          || (res>0 && (eType & LSM_END_DELETE))
3012          || (res==0 && (eType & LSM_POINT_DELETE))
3013         ){
3014           *pbStop = 1;
3015         }else if( res==0 && (eType & LSM_INSERT) ){
3016           lsm_env *pEnv = pCsr->pDb->pEnv;
3017           void *p; int n;         /* Key/value from tree-cursor */
3018           *pbStop = 1;
3019           pCsr->flags |= CURSOR_SEEK_EQ;
3020           rc = lsmTreeCursorKey(pTreeCsr, &pCsr->eType, &p, &n);
3021           if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->key, p, n);
3022           if( rc==LSM_OK ) rc = lsmTreeCursorValue(pTreeCsr, &p, &n);
3023           if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->val, p, n);
3024         }
3025         lsmTreeCursorReset(pTreeCsr);
3026         break;
3027       }
3028       case LSM_SEEK_GE:
3029         if( res<0 && lsmTreeCursorValid(pTreeCsr) ){
3030           lsmTreeCursorNext(pTreeCsr);
3031         }
3032         break;
3033       default:
3034         if( res>0 ){
3035           assert( lsmTreeCursorValid(pTreeCsr) );
3036           lsmTreeCursorPrev(pTreeCsr);
3037         }
3038         break;
3039     }
3040   }
3041   return rc;
3042 }
3043 
3044 
3045 /*
3046 ** Seek the cursor.
3047 */
lsmMCursorSeek(MultiCursor * pCsr,int iTopic,void * pKey,int nKey,int eSeek)3048 int lsmMCursorSeek(
3049   MultiCursor *pCsr,
3050   int iTopic,
3051   void *pKey, int nKey,
3052   int eSeek
3053 ){
3054   int eESeek = eSeek;             /* Effective eSeek parameter */
3055   int bStop = 0;                  /* Set to true to halt search operation */
3056   int rc = LSM_OK;                /* Return code */
3057   int iPtr = 0;                   /* Used to iterate through pCsr->aPtr[] */
3058   Pgno iPgno = 0;                 /* FC pointer value */
3059 
3060   assert( pCsr->apTreeCsr[0]==0 || iTopic==0 );
3061   assert( pCsr->apTreeCsr[1]==0 || iTopic==0 );
3062 
3063   if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;
3064 
3065   assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
3066   assert( (pCsr->flags & CURSOR_FLUSH_FREELIST)==0 );
3067   assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );
3068 
3069   pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
3070   rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop);
3071   if( rc==LSM_OK && bStop==0 ){
3072     rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
3073   }
3074 
3075   /* Seek all segment pointers. */
3076   for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){
3077     SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
3078     assert( pPtr->pSeg==&pPtr->pLevel->lhs );
3079     rc = seekInLevel(pCsr, pPtr, eESeek, iTopic, pKey, nKey, &iPgno, &bStop);
3080     iPtr += pPtr->pLevel->nRight;
3081   }
3082 
3083   if( eSeek!=LSM_SEEK_EQ ){
3084     if( rc==LSM_OK ){
3085       rc = multiCursorAllocTree(pCsr);
3086     }
3087     if( rc==LSM_OK ){
3088       int i;
3089       for(i=pCsr->nTree-1; i>0; i--){
3090         multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
3091       }
3092       if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
3093       if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
3094     }
3095 
3096     multiCursorCacheKey(pCsr, &rc);
3097     if( rc==LSM_OK && eSeek!=LSM_SEEK_LEFAST && 0==mcursorLocationOk(pCsr, 0) ){
3098       switch( eESeek ){
3099         case LSM_SEEK_EQ:
3100           lsmMCursorReset(pCsr);
3101           break;
3102         case LSM_SEEK_GE:
3103           rc = lsmMCursorNext(pCsr);
3104           break;
3105         default:
3106           rc = lsmMCursorPrev(pCsr);
3107           break;
3108       }
3109     }
3110   }
3111 
3112   return rc;
3113 }
3114 
lsmMCursorValid(MultiCursor * pCsr)3115 int lsmMCursorValid(MultiCursor *pCsr){
3116   int res = 0;
3117   if( pCsr->flags & CURSOR_SEEK_EQ ){
3118     res = 1;
3119   }else if( pCsr->aTree ){
3120     int iKey = pCsr->aTree[1];
3121     if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
3122       res = lsmTreeCursorValid(pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]);
3123     }else{
3124       void *pKey;
3125       multiCursorGetKey(pCsr, iKey, 0, &pKey, 0);
3126       res = pKey!=0;
3127     }
3128   }
3129   return res;
3130 }
3131 
mcursorAdvanceOk(MultiCursor * pCsr,int bReverse,int * pRc)3132 static int mcursorAdvanceOk(
3133   MultiCursor *pCsr,
3134   int bReverse,
3135   int *pRc
3136 ){
3137   void *pNew;                     /* Pointer to buffer containing new key */
3138   int nNew;                       /* Size of buffer pNew in bytes */
3139   int eNewType;                   /* Type of new record */
3140 
3141   if( *pRc ) return 1;
3142 
3143   /* Check the current key value. If it is not greater than (if bReverse==0)
3144   ** or less than (if bReverse!=0) the key currently cached in pCsr->key,
3145   ** then the cursor has not yet been successfully advanced.
3146   */
3147   multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew);
3148   if( pNew ){
3149     int typemask = (pCsr->flags & CURSOR_IGNORE_DELETE) ? ~(0) : LSM_SYSTEMKEY;
3150     int res = sortedDbKeyCompare(pCsr,
3151       eNewType & typemask, pNew, nNew,
3152       pCsr->eType & typemask, pCsr->key.pData, pCsr->key.nData
3153     );
3154 
3155     if( (bReverse==0 && res<=0) || (bReverse!=0 && res>=0) ){
3156       return 0;
3157     }
3158 
3159     multiCursorCacheKey(pCsr, pRc);
3160     assert( pCsr->eType==eNewType );
3161 
3162     /* If this cursor is configured to skip deleted keys, and the current
3163     ** cursor points to a SORTED_DELETE entry, then the cursor has not been
3164     ** successfully advanced.
3165     **
3166     ** Similarly, if the cursor is configured to skip system keys and the
3167     ** current cursor points to a system key, it has not yet been advanced.
3168     */
3169     if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;
3170   }
3171   return 1;
3172 }
3173 
flCsrAdvance(MultiCursor * pCsr)3174 static void flCsrAdvance(MultiCursor *pCsr){
3175   assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
3176   if( pCsr->iFree % 2 ){
3177     pCsr->iFree++;
3178   }else{
3179     int nEntry = pCsr->pDb->pWorker->freelist.nEntry;
3180     FreelistEntry *aEntry = pCsr->pDb->pWorker->freelist.aEntry;
3181 
3182     int i = nEntry - 1 - (pCsr->iFree / 2);
3183 
3184     /* If the current entry is a delete and the "end-delete" key will not
3185     ** be attached to the next entry, increment iFree by 1 only. */
3186     if( aEntry[i].iId<0 ){
3187       while( 1 ){
3188         if( i==0 || aEntry[i-1].iBlk!=aEntry[i].iBlk-1 ){
3189           pCsr->iFree--;
3190           break;
3191         }
3192         if( aEntry[i-1].iId>=0 ) break;
3193         pCsr->iFree += 2;
3194         i--;
3195       }
3196     }
3197     pCsr->iFree += 2;
3198   }
3199 }
3200 
multiCursorAdvance(MultiCursor * pCsr,int bReverse)3201 static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
3202   int rc = LSM_OK;                /* Return Code */
3203   if( lsmMCursorValid(pCsr) ){
3204     do {
3205       int iKey = pCsr->aTree[1];
3206 
3207       assertCursorTree(pCsr);
3208 
3209       /* If this multi-cursor is advancing forwards, and the sub-cursor
3210       ** being advanced is the one that separator keys may be being read
3211       ** from, record the current absolute pointer value.  */
3212       if( pCsr->pPrevMergePtr ){
3213         if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
3214           assert( pCsr->pBtCsr );
3215           *pCsr->pPrevMergePtr = pCsr->pBtCsr->iPtr;
3216         }else if( pCsr->pBtCsr==0 && pCsr->nPtr>0
3217                && iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr-1)
3218         ){
3219           SegmentPtr *pPtr = &pCsr->aPtr[iKey-CURSOR_DATA_SEGMENT];
3220           *pCsr->pPrevMergePtr = pPtr->iPtr+pPtr->iPgPtr;
3221         }
3222       }
3223 
3224       if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
3225         TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
3226         if( bReverse ){
3227           rc = lsmTreeCursorPrev(pTreeCsr);
3228         }else{
3229           rc = lsmTreeCursorNext(pTreeCsr);
3230         }
3231       }else if( iKey==CURSOR_DATA_SYSTEM ){
3232         assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
3233         assert( bReverse==0 );
3234         flCsrAdvance(pCsr);
3235       }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
3236         assert( bReverse==0 && pCsr->pBtCsr );
3237         rc = btreeCursorNext(pCsr->pBtCsr);
3238       }else{
3239         rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
3240       }
3241       if( rc==LSM_OK ){
3242         int i;
3243         for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
3244           multiCursorDoCompare(pCsr, i, bReverse);
3245         }
3246         assertCursorTree(pCsr);
3247       }
3248     }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );
3249   }
3250   return rc;
3251 }
3252 
lsmMCursorNext(MultiCursor * pCsr)3253 int lsmMCursorNext(MultiCursor *pCsr){
3254   if( (pCsr->flags & CURSOR_NEXT_OK)==0 ) return LSM_MISUSE_BKPT;
3255   return multiCursorAdvance(pCsr, 0);
3256 }
3257 
lsmMCursorPrev(MultiCursor * pCsr)3258 int lsmMCursorPrev(MultiCursor *pCsr){
3259   if( (pCsr->flags & CURSOR_PREV_OK)==0 ) return LSM_MISUSE_BKPT;
3260   return multiCursorAdvance(pCsr, 1);
3261 }
3262 
lsmMCursorKey(MultiCursor * pCsr,void ** ppKey,int * pnKey)3263 int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){
3264   if( (pCsr->flags & CURSOR_SEEK_EQ) || pCsr->aTree==0 ){
3265     *pnKey = pCsr->key.nData;
3266     *ppKey = pCsr->key.pData;
3267   }else{
3268     int iKey = pCsr->aTree[1];
3269 
3270     if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
3271       TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
3272       lsmTreeCursorKey(pTreeCsr, 0, ppKey, pnKey);
3273     }else{
3274       int nKey;
3275 
3276 #ifndef NDEBUG
3277       void *pKey;
3278       int eType;
3279       multiCursorGetKey(pCsr, iKey, &eType, &pKey, &nKey);
3280       assert( eType==pCsr->eType );
3281       assert( nKey==pCsr->key.nData );
3282       assert( memcmp(pKey, pCsr->key.pData, nKey)==0 );
3283 #endif
3284 
3285       nKey = pCsr->key.nData;
3286       if( nKey==0 ){
3287         *ppKey = 0;
3288       }else{
3289         *ppKey = pCsr->key.pData;
3290       }
3291       *pnKey = nKey;
3292     }
3293   }
3294   return LSM_OK;
3295 }
3296 
3297 /*
3298 ** Compare the current key that cursor csr points to with pKey/nKey. Set
3299 ** *piRes to the result and return LSM_OK.
3300 */
lsm_csr_cmp(lsm_cursor * csr,const void * pKey,int nKey,int * piRes)3301 int lsm_csr_cmp(lsm_cursor *csr, const void *pKey, int nKey, int *piRes){
3302   MultiCursor *pCsr = (MultiCursor *)csr;
3303   void *pCsrkey; int nCsrkey;
3304   int rc;
3305   rc = lsmMCursorKey(pCsr, &pCsrkey, &nCsrkey);
3306   if( rc==LSM_OK ){
3307     int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
3308     *piRes = sortedKeyCompare(xCmp, 0, pCsrkey, nCsrkey, 0, (void *)pKey, nKey);
3309   }
3310   return rc;
3311 }
3312 
lsmMCursorValue(MultiCursor * pCsr,void ** ppVal,int * pnVal)3313 int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
3314   void *pVal;
3315   int nVal;
3316   int rc;
3317   if( (pCsr->flags & CURSOR_SEEK_EQ) || pCsr->aTree==0 ){
3318     rc = LSM_OK;
3319     nVal = pCsr->val.nData;
3320     pVal = pCsr->val.pData;
3321   }else{
3322 
3323     assert( pCsr->aTree );
3324     assert( mcursorLocationOk(pCsr, (pCsr->flags & CURSOR_IGNORE_DELETE)) );
3325 
3326     rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pVal, &nVal);
3327     if( pVal && rc==LSM_OK ){
3328       rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pVal, nVal);
3329       pVal = pCsr->val.pData;
3330     }
3331 
3332     if( rc!=LSM_OK ){
3333       pVal = 0;
3334       nVal = 0;
3335     }
3336   }
3337   *ppVal = pVal;
3338   *pnVal = nVal;
3339   return rc;
3340 }
3341 
lsmMCursorType(MultiCursor * pCsr,int * peType)3342 int lsmMCursorType(MultiCursor *pCsr, int *peType){
3343   assert( pCsr->aTree );
3344   multiCursorGetKey(pCsr, pCsr->aTree[1], peType, 0, 0);
3345   return LSM_OK;
3346 }
3347 
3348 /*
3349 ** Buffer aData[], size nData, is assumed to contain a valid b-tree
3350 ** hierarchy page image. Return the offset in aData[] of the next free
3351 ** byte in the data area (where a new cell may be written if there is
3352 ** space).
3353 */
mergeWorkerPageOffset(u8 * aData,int nData)3354 static int mergeWorkerPageOffset(u8 *aData, int nData){
3355   int nRec;
3356   int iOff;
3357   int nKey;
3358   int eType;
3359 
3360   nRec = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
3361   iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec-1)]);
3362   eType = aData[iOff++];
3363   assert( eType==0
3364        || eType==(LSM_SYSTEMKEY|LSM_SEPARATOR)
3365        || eType==(LSM_SEPARATOR)
3366   );
3367 
3368   iOff += lsmVarintGet32(&aData[iOff], &nKey);
3369   iOff += lsmVarintGet32(&aData[iOff], &nKey);
3370 
3371   return iOff + (eType ? nKey : 0);
3372 }
3373 
3374 /*
3375 ** Following a checkpoint operation, database pages that are part of the
3376 ** checkpointed state of the LSM are deemed read-only. This includes the
3377 ** right-most page of the b-tree hierarchy of any separators array under
3378 ** construction, and all pages between it and the b-tree root, inclusive.
3379 ** This is a problem, as when further pages are appended to the separators
3380 ** array, entries must be added to the indicated b-tree hierarchy pages.
3381 **
3382 ** This function copies all such b-tree pages to new locations, so that
3383 ** they can be modified as required.
3384 **
3385 ** The complication is that not all database pages are the same size - due
3386 ** to the way the file.c module works some (the first and last in each block)
3387 ** are 4 bytes smaller than the others.
3388 */
mergeWorkerMoveHierarchy(MergeWorker * pMW,int bSep)3389 static int mergeWorkerMoveHierarchy(
3390   MergeWorker *pMW,               /* Merge worker */
3391   int bSep                        /* True for separators run */
3392 ){
3393   lsm_db *pDb = pMW->pDb;         /* Database handle */
3394   int rc = LSM_OK;                /* Return code */
3395   int i;
3396   Page **apHier = pMW->hier.apHier;
3397   int nHier = pMW->hier.nHier;
3398 
3399   for(i=0; rc==LSM_OK && i<nHier; i++){
3400     Page *pNew = 0;
3401     rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &pNew);
3402     assert( rc==LSM_OK );
3403 
3404     if( rc==LSM_OK ){
3405       u8 *a1; int n1;
3406       u8 *a2; int n2;
3407 
3408       a1 = fsPageData(pNew, &n1);
3409       a2 = fsPageData(apHier[i], &n2);
3410 
3411       assert( n1==n2 || n1+4==n2 );
3412 
3413       if( n1==n2 ){
3414         memcpy(a1, a2, n2);
3415       }else{
3416         int nEntry = pageGetNRec(a2, n2);
3417         int iEof1 = SEGMENT_EOF(n1, nEntry);
3418         int iEof2 = SEGMENT_EOF(n2, nEntry);
3419 
3420         memcpy(a1, a2, iEof2 - 4);
3421         memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
3422       }
3423 
3424       lsmFsPageRelease(apHier[i]);
3425       apHier[i] = pNew;
3426 
3427 #if 0
3428       assert( n1==n2 || n1+4==n2 || n2+4==n1 );
3429       if( n1>=n2 ){
3430         /* If n1 (size of the new page) is equal to or greater than n2 (the
3431         ** size of the old page), then copy the data into the new page. If
3432         ** n1==n2, this could be done with a single memcpy(). However,
3433         ** since sometimes n1>n2, the page content and footer must be copied
3434         ** separately. */
3435         int nEntry = pageGetNRec(a2, n2);
3436         int iEof1 = SEGMENT_EOF(n1, nEntry);
3437         int iEof2 = SEGMENT_EOF(n2, nEntry);
3438         memcpy(a1, a2, iEof2);
3439         memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
3440         lsmFsPageRelease(apHier[i]);
3441         apHier[i] = pNew;
3442       }else{
3443         lsmPutU16(&a1[SEGMENT_FLAGS_OFFSET(n1)], SEGMENT_BTREE_FLAG);
3444         lsmPutU16(&a1[SEGMENT_NRECORD_OFFSET(n1)], 0);
3445         lsmPutU64(&a1[SEGMENT_POINTER_OFFSET(n1)], 0);
3446         i = i - 1;
3447         lsmFsPageRelease(pNew);
3448       }
3449 #endif
3450     }
3451   }
3452 
3453 #ifdef LSM_DEBUG
3454   if( rc==LSM_OK ){
3455     for(i=0; i<nHier; i++) assert( lsmFsPageWritable(apHier[i]) );
3456   }
3457 #endif
3458 
3459   return rc;
3460 }
3461 
3462 /*
3463 ** Allocate and populate the MergeWorker.apHier[] array.
3464 */
mergeWorkerLoadHierarchy(MergeWorker * pMW)3465 static int mergeWorkerLoadHierarchy(MergeWorker *pMW){
3466   int rc = LSM_OK;
3467   Segment *pSeg;
3468   Hierarchy *p;
3469 
3470   pSeg = &pMW->pLevel->lhs;
3471   p = &pMW->hier;
3472 
3473   if( p->apHier==0 && pSeg->iRoot!=0 ){
3474     FileSystem *pFS = pMW->pDb->pFS;
3475     lsm_env *pEnv = pMW->pDb->pEnv;
3476     Page **apHier = 0;
3477     int nHier = 0;
3478     int iPg = (int)pSeg->iRoot;
3479 
3480     do {
3481       Page *pPg = 0;
3482       u8 *aData;
3483       int nData;
3484       int flags;
3485 
3486       rc = lsmFsDbPageGet(pFS, pSeg, iPg, &pPg);
3487       if( rc!=LSM_OK ) break;
3488 
3489       aData = fsPageData(pPg, &nData);
3490       flags = pageGetFlags(aData, nData);
3491       if( flags&SEGMENT_BTREE_FLAG ){
3492         Page **apNew = (Page **)lsmRealloc(
3493             pEnv, apHier, sizeof(Page *)*(nHier+1)
3494         );
3495         if( apNew==0 ){
3496           rc = LSM_NOMEM_BKPT;
3497           break;
3498         }
3499         apHier = apNew;
3500         memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier);
3501         nHier++;
3502 
3503         apHier[0] = pPg;
3504         iPg = (int)pageGetPtr(aData, nData);
3505       }else{
3506         lsmFsPageRelease(pPg);
3507         break;
3508       }
3509     }while( 1 );
3510 
3511     if( rc==LSM_OK ){
3512       u8 *aData;
3513       int nData;
3514       aData = fsPageData(apHier[0], &nData);
3515       pMW->aSave[0].iPgno = pageGetPtr(aData, nData);
3516       p->nHier = nHier;
3517       p->apHier = apHier;
3518       rc = mergeWorkerMoveHierarchy(pMW, 0);
3519     }else{
3520       int i;
3521       for(i=0; i<nHier; i++){
3522         lsmFsPageRelease(apHier[i]);
3523       }
3524       lsmFree(pEnv, apHier);
3525     }
3526   }
3527 
3528   return rc;
3529 }
3530 
3531 /*
3532 ** B-tree pages use almost the same format as regular pages. The
3533 ** differences are:
3534 **
3535 **   1. The record format is (usually, see below) as follows:
3536 **
3537 **         + Type byte (always SORTED_SEPARATOR or SORTED_SYSTEM_SEPARATOR),
3538 **         + Absolute pointer value (varint),
3539 **         + Number of bytes in key (varint),
3540 **         + Blob containing key data.
3541 **
3542 **   2. All pointer values are stored as absolute values (not offsets
3543 **      relative to the footer pointer value).
3544 **
3545 **   3. Each pointer that is part of a record points to a page that
3546 **      contains keys smaller than the records key (note: not "equal to or
3547 **      smaller than - smaller than").
3548 **
3549 **   4. The pointer in the page footer of a b-tree page points to a page
3550 **      that contains keys equal to or larger than the largest key on the
3551 **      b-tree page.
3552 **
3553 ** The reason for having the page footer pointer point to the right-child
3554 ** (instead of the left) is that doing things this way makes the
3555 ** mergeWorkerMoveHierarchy() operation less complicated (since the pointers
3556 ** that need to be updated are all stored as fixed-size integers within the
3557 ** page footer, not varints in page records).
3558 **
3559 ** Records may not span b-tree pages. If this function is called to add a
3560 ** record larger than (page-size / 4) bytes, then a pointer to the indexed
3561 ** array page that contains the main record is added to the b-tree instead.
3562 ** In this case the record format is:
3563 **
3564 **         + 0x00 byte (1 byte)
3565 **         + Absolute pointer value (varint),
3566 **         + Absolute page number of page containing key (varint).
3567 **
3568 ** See function seekInBtree() for the code that traverses b-tree pages.
3569 */
3570 
mergeWorkerBtreeWrite(MergeWorker * pMW,u8 eType,Pgno iPtr,Pgno iKeyPg,void * pKey,int nKey)3571 static int mergeWorkerBtreeWrite(
3572   MergeWorker *pMW,
3573   u8 eType,
3574   Pgno iPtr,
3575   Pgno iKeyPg,
3576   void *pKey,
3577   int nKey
3578 ){
3579   Hierarchy *p = &pMW->hier;
3580   lsm_db *pDb = pMW->pDb;         /* Database handle */
3581   int rc = LSM_OK;                /* Return Code */
3582   int iLevel;                     /* Level of b-tree hierachy to write to */
3583   int nData;                      /* Size of aData[] in bytes */
3584   u8 *aData;                      /* Page data for level iLevel */
3585   int iOff;                       /* Offset on b-tree page to write record to */
3586   int nRec;                       /* Initial number of records on b-tree page */
3587 
3588   /* iKeyPg should be zero for an ordinary b-tree key, or non-zero for an
3589   ** indirect key. The flags byte for an indirect key is 0x00.  */
3590   assert( (eType==0)==(iKeyPg!=0) );
3591 
3592   /* The MergeWorker.apHier[] array contains the right-most leaf of the b-tree
3593   ** hierarchy, the root node, and all nodes that lie on the path between.
3594   ** apHier[0] is the right-most leaf and apHier[pMW->nHier-1] is the current
3595   ** root page.
3596   **
3597   ** This loop searches for a node with enough space to store the key on,
3598   ** starting with the leaf and iterating up towards the root. When the loop
3599   ** exits, the key may be written to apHier[iLevel].  */
3600   for(iLevel=0; iLevel<=p->nHier; iLevel++){
3601     int nByte;                    /* Number of free bytes required */
3602 
3603     if( iLevel==p->nHier ){
3604       /* Extend the array and allocate a new root page. */
3605       Page **aNew;
3606       aNew = (Page **)lsmRealloc(
3607           pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1)
3608       );
3609       if( !aNew ){
3610         return LSM_NOMEM_BKPT;
3611       }
3612       p->apHier = aNew;
3613     }else{
3614       Page *pOld;
3615       int nFree;
3616 
3617       /* If the key will fit on this page, break out of the loop here.
3618       ** The new entry will be written to page apHier[iLevel]. */
3619       pOld = p->apHier[iLevel];
3620       assert( lsmFsPageWritable(pOld) );
3621       aData = fsPageData(pOld, &nData);
3622       if( eType==0 ){
3623         nByte = 2 + 1 + lsmVarintLen32((int)iPtr) + lsmVarintLen32((int)iKeyPg);
3624       }else{
3625         nByte = 2 + 1 + lsmVarintLen32((int)iPtr) + lsmVarintLen32(nKey) + nKey;
3626       }
3627       nRec = pageGetNRec(aData, nData);
3628       nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
3629       if( nByte<=nFree ) break;
3630 
3631       /* Otherwise, this page is full. Set the right-hand-child pointer
3632       ** to iPtr and release it.  */
3633       lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr);
3634       assert( lsmFsPageNumber(pOld)==0 );
3635       rc = lsmFsPagePersist(pOld);
3636       if( rc==LSM_OK ){
3637         iPtr = lsmFsPageNumber(pOld);
3638         lsmFsPageRelease(pOld);
3639       }
3640     }
3641 
3642     /* Allocate a new page for apHier[iLevel]. */
3643     p->apHier[iLevel] = 0;
3644     if( rc==LSM_OK ){
3645       rc = lsmFsSortedAppend(
3646           pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &p->apHier[iLevel]
3647       );
3648     }
3649     if( rc!=LSM_OK ) return rc;
3650 
3651     aData = fsPageData(p->apHier[iLevel], &nData);
3652     memset(aData, 0, nData);
3653     lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
3654     lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
3655 
3656     if( iLevel==p->nHier ){
3657       p->nHier++;
3658       break;
3659     }
3660   }
3661 
3662   /* Write the key into page apHier[iLevel]. */
3663   aData = fsPageData(p->apHier[iLevel], &nData);
3664   iOff = mergeWorkerPageOffset(aData, nData);
3665   nRec = pageGetNRec(aData, nData);
3666   lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
3667   lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
3668   if( eType==0 ){
3669     aData[iOff++] = 0x00;
3670     iOff += lsmVarintPut32(&aData[iOff], (int)iPtr);
3671     iOff += lsmVarintPut32(&aData[iOff], (int)iKeyPg);
3672   }else{
3673     aData[iOff++] = eType;
3674     iOff += lsmVarintPut32(&aData[iOff], (int)iPtr);
3675     iOff += lsmVarintPut32(&aData[iOff], nKey);
3676     memcpy(&aData[iOff], pKey, nKey);
3677   }
3678 
3679   return rc;
3680 }
3681 
mergeWorkerBtreeIndirect(MergeWorker * pMW)3682 static int mergeWorkerBtreeIndirect(MergeWorker *pMW){
3683   int rc = LSM_OK;
3684   if( pMW->iIndirect ){
3685     Pgno iKeyPg = pMW->aSave[1].iPgno;
3686     rc = mergeWorkerBtreeWrite(pMW, 0, pMW->iIndirect, iKeyPg, 0, 0);
3687     pMW->iIndirect = 0;
3688   }
3689   return rc;
3690 }
3691 
3692 /*
3693 ** Append the database key (iTopic/pKey/nKey) to the b-tree under
3694 ** construction. This key has not yet been written to a segment page.
3695 ** The pointer that will accompany the new key in the b-tree - that
3696 ** points to the completed segment page that contains keys smaller than
3697 ** (pKey/nKey) is currently stored in pMW->aSave[0].iPgno.
3698 */
mergeWorkerPushHierarchy(MergeWorker * pMW,int iTopic,void * pKey,int nKey)3699 static int mergeWorkerPushHierarchy(
3700   MergeWorker *pMW,               /* Merge worker object */
3701   int iTopic,                     /* Topic value for this key */
3702   void *pKey,                     /* Pointer to key buffer */
3703   int nKey                        /* Size of pKey buffer in bytes */
3704 ){
3705   int rc = LSM_OK;                /* Return Code */
3706   Pgno iPtr;                      /* Pointer value to accompany pKey/nKey */
3707 
3708   assert( pMW->aSave[0].bStore==0 );
3709   assert( pMW->aSave[1].bStore==0 );
3710   rc = mergeWorkerBtreeIndirect(pMW);
3711 
3712   /* Obtain the absolute pointer value to store along with the key in the
3713   ** page body. This pointer points to a page that contains keys that are
3714   ** smaller than pKey/nKey.  */
3715   iPtr = pMW->aSave[0].iPgno;
3716   assert( iPtr!=0 );
3717 
3718   /* Determine if the indirect format should be used. */
3719   if( (nKey*4 > lsmFsPageSize(pMW->pDb->pFS)) ){
3720     pMW->iIndirect = iPtr;
3721     pMW->aSave[1].bStore = 1;
3722   }else{
3723     rc = mergeWorkerBtreeWrite(
3724         pMW, (u8)(iTopic | LSM_SEPARATOR), iPtr, 0, pKey, nKey
3725     );
3726   }
3727 
3728   /* Ensure that the SortedRun.iRoot field is correct. */
3729   return rc;
3730 }
3731 
mergeWorkerFinishHierarchy(MergeWorker * pMW)3732 static int mergeWorkerFinishHierarchy(
3733   MergeWorker *pMW                /* Merge worker object */
3734 ){
3735   int i;                          /* Used to loop through apHier[] */
3736   int rc = LSM_OK;                /* Return code */
3737   Pgno iPtr;                      /* New right-hand-child pointer value */
3738 
3739   iPtr = pMW->aSave[0].iPgno;
3740   for(i=0; i<pMW->hier.nHier && rc==LSM_OK; i++){
3741     Page *pPg = pMW->hier.apHier[i];
3742     int nData;                    /* Size of aData[] in bytes */
3743     u8 *aData;                    /* Page data for pPg */
3744 
3745     aData = fsPageData(pPg, &nData);
3746     lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr);
3747 
3748     rc = lsmFsPagePersist(pPg);
3749     iPtr = lsmFsPageNumber(pPg);
3750     lsmFsPageRelease(pPg);
3751   }
3752 
3753   if( pMW->hier.nHier ){
3754     pMW->pLevel->lhs.iRoot = iPtr;
3755     lsmFree(pMW->pDb->pEnv, pMW->hier.apHier);
3756     pMW->hier.apHier = 0;
3757     pMW->hier.nHier = 0;
3758   }
3759 
3760   return rc;
3761 }
3762 
mergeWorkerAddPadding(MergeWorker * pMW)3763 static int mergeWorkerAddPadding(
3764   MergeWorker *pMW                /* Merge worker object */
3765 ){
3766   FileSystem *pFS = pMW->pDb->pFS;
3767   return lsmFsSortedPadding(pFS, pMW->pDb->pWorker, &pMW->pLevel->lhs);
3768 }
3769 
3770 /*
3771 ** Release all page references currently held by the merge-worker passed
3772 ** as the only argument. Unless an error has occurred, all pages have
3773 ** already been released.
3774 */
mergeWorkerReleaseAll(MergeWorker * pMW)3775 static void mergeWorkerReleaseAll(MergeWorker *pMW){
3776   int i;
3777   lsmFsPageRelease(pMW->pPage);
3778   pMW->pPage = 0;
3779 
3780   for(i=0; i<pMW->hier.nHier; i++){
3781     lsmFsPageRelease(pMW->hier.apHier[i]);
3782     pMW->hier.apHier[i] = 0;
3783   }
3784   lsmFree(pMW->pDb->pEnv, pMW->hier.apHier);
3785   pMW->hier.apHier = 0;
3786   pMW->hier.nHier = 0;
3787 }
3788 
keyszToSkip(FileSystem * pFS,int nKey)3789 static int keyszToSkip(FileSystem *pFS, int nKey){
3790   int nPgsz;                /* Nominal database page size */
3791   nPgsz = lsmFsPageSize(pFS);
3792   return LSM_MIN(((nKey * 4) / nPgsz), 3);
3793 }
3794 
3795 /*
3796 ** Release the reference to the current output page of merge-worker *pMW
3797 ** (reference pMW->pPage). Set the page number values in aSave[] as
3798 ** required (see comments above struct MergeWorker for details).
3799 */
mergeWorkerPersistAndRelease(MergeWorker * pMW)3800 static int mergeWorkerPersistAndRelease(MergeWorker *pMW){
3801   int rc;
3802   int i;
3803 
3804   assert( pMW->pPage || (pMW->aSave[0].bStore==0 && pMW->aSave[1].bStore==0) );
3805 
3806   /* Persist the page */
3807   rc = lsmFsPagePersist(pMW->pPage);
3808 
3809   /* If required, save the page number. */
3810   for(i=0; i<2; i++){
3811     if( pMW->aSave[i].bStore ){
3812       pMW->aSave[i].iPgno = lsmFsPageNumber(pMW->pPage);
3813       pMW->aSave[i].bStore = 0;
3814     }
3815   }
3816 
3817   /* Release the completed output page. */
3818   lsmFsPageRelease(pMW->pPage);
3819   pMW->pPage = 0;
3820   return rc;
3821 }
3822 
3823 /*
3824 ** Advance to the next page of an output run being populated by merge-worker
3825 ** pMW. The footer of the new page is initialized to indicate that it contains
3826 ** zero records. The flags field is cleared. The page footer pointer field
3827 ** is set to iFPtr.
3828 **
3829 ** If successful, LSM_OK is returned. Otherwise, an error code.
3830 */
mergeWorkerNextPage(MergeWorker * pMW,Pgno iFPtr)3831 static int mergeWorkerNextPage(
3832   MergeWorker *pMW,               /* Merge worker object to append page to */
3833   Pgno iFPtr                      /* Pointer value for footer of new page */
3834 ){
3835   int rc = LSM_OK;                /* Return code */
3836   Page *pNext = 0;                /* New page appended to run */
3837   lsm_db *pDb = pMW->pDb;         /* Database handle */
3838 
3839   rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 0, &pNext);
3840   assert( rc || pMW->pLevel->lhs.iFirst>0 || pMW->pDb->compress.xCompress );
3841 
3842   if( rc==LSM_OK ){
3843     u8 *aData;                    /* Data buffer belonging to page pNext */
3844     int nData;                    /* Size of aData[] in bytes */
3845 
3846     rc = mergeWorkerPersistAndRelease(pMW);
3847 
3848     pMW->pPage = pNext;
3849     pMW->pLevel->pMerge->iOutputOff = 0;
3850     aData = fsPageData(pNext, &nData);
3851     lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
3852     lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
3853     lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);
3854     pMW->nWork++;
3855   }
3856 
3857   return rc;
3858 }
3859 
3860 /*
3861 ** Write a blob of data into an output segment being populated by a
3862 ** merge-worker object. If argument bSep is true, write into the separators
3863 ** array. Otherwise, the main array.
3864 **
3865 ** This function is used to write the blobs of data for keys and values.
3866 */
mergeWorkerData(MergeWorker * pMW,int bSep,int iFPtr,u8 * aWrite,int nWrite)3867 static int mergeWorkerData(
3868   MergeWorker *pMW,               /* Merge worker object */
3869   int bSep,                       /* True to write to separators run */
3870   int iFPtr,                      /* Footer ptr for new pages */
3871   u8 *aWrite,                     /* Write data from this buffer */
3872   int nWrite                      /* Size of aWrite[] in bytes */
3873 ){
3874   int rc = LSM_OK;                /* Return code */
3875   int nRem = nWrite;              /* Number of bytes still to write */
3876 
3877   while( rc==LSM_OK && nRem>0 ){
3878     Merge *pMerge = pMW->pLevel->pMerge;
3879     int nCopy;                    /* Number of bytes to copy */
3880     u8 *aData;                    /* Pointer to buffer of current output page */
3881     int nData;                    /* Size of aData[] in bytes */
3882     int nRec;                     /* Number of records on current output page */
3883     int iOff;                     /* Offset in aData[] to write to */
3884 
3885     assert( lsmFsPageWritable(pMW->pPage) );
3886 
3887     aData = fsPageData(pMW->pPage, &nData);
3888     nRec = pageGetNRec(aData, nData);
3889     iOff = pMerge->iOutputOff;
3890     nCopy = LSM_MIN(nRem, SEGMENT_EOF(nData, nRec) - iOff);
3891 
3892     memcpy(&aData[iOff], &aWrite[nWrite-nRem], nCopy);
3893     nRem -= nCopy;
3894 
3895     if( nRem>0 ){
3896       rc = mergeWorkerNextPage(pMW, iFPtr);
3897     }else{
3898       pMerge->iOutputOff = iOff + nCopy;
3899     }
3900   }
3901 
3902   return rc;
3903 }
3904 
3905 
3906 /*
3907 ** The MergeWorker passed as the only argument is working to merge two or
3908 ** more existing segments together (not to flush an in-memory tree). It
3909 ** has not yet written the first key to the first page of the output.
3910 */
mergeWorkerFirstPage(MergeWorker * pMW)3911 static int mergeWorkerFirstPage(MergeWorker *pMW){
3912   int rc = LSM_OK;                /* Return code */
3913   Page *pPg = 0;                  /* First page of run pSeg */
3914   int iFPtr = 0;                  /* Pointer value read from footer of pPg */
3915   MultiCursor *pCsr = pMW->pCsr;
3916 
3917   assert( pMW->pPage==0 );
3918 
3919   if( pCsr->pBtCsr ){
3920     rc = LSM_OK;
3921     iFPtr = (int)pMW->pLevel->pNext->lhs.iFirst;
3922   }else if( pCsr->nPtr>0 ){
3923     Segment *pSeg;
3924     pSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg;
3925     rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg, pSeg->iFirst, &pPg);
3926     if( rc==LSM_OK ){
3927       u8 *aData;                    /* Buffer for page pPg */
3928       int nData;                    /* Size of aData[] in bytes */
3929       aData = fsPageData(pPg, &nData);
3930       iFPtr = (int)pageGetPtr(aData, nData);
3931       lsmFsPageRelease(pPg);
3932     }
3933   }
3934 
3935   if( rc==LSM_OK ){
3936     rc = mergeWorkerNextPage(pMW, iFPtr);
3937     if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr;
3938     pMW->aSave[0].bStore = 1;
3939   }
3940 
3941   return rc;
3942 }
3943 
mergeWorkerWrite(MergeWorker * pMW,int eType,void * pKey,int nKey,void * pVal,int nVal,int iPtr)3944 static int mergeWorkerWrite(
3945   MergeWorker *pMW,               /* Merge worker object to write into */
3946   int eType,                      /* One of SORTED_SEPARATOR, WRITE or DELETE */
3947   void *pKey, int nKey,           /* Key value */
3948   void *pVal, int nVal,           /* Value value */
3949   int iPtr                        /* Absolute value of page pointer, or 0 */
3950 ){
3951   int rc = LSM_OK;                /* Return code */
3952   Merge *pMerge;                  /* Persistent part of level merge state */
3953   int nHdr;                       /* Space required for this record header */
3954   Page *pPg;                      /* Page to write to */
3955   u8 *aData;                      /* Data buffer for page pWriter->pPage */
3956   int nData = 0;                  /* Size of buffer aData[] in bytes */
3957   int nRec = 0;                   /* Number of records on page pPg */
3958   int iFPtr = 0;                  /* Value of pointer in footer of pPg */
3959   int iRPtr = 0;                  /* Value of pointer written into record */
3960   int iOff = 0;                   /* Current write offset within page pPg */
3961   Segment *pSeg;                  /* Segment being written */
3962   int flags = 0;                  /* If != 0, flags value for page footer */
3963   int bFirst = 0;                 /* True for first key of output run */
3964 
3965   pMerge = pMW->pLevel->pMerge;
3966   pSeg = &pMW->pLevel->lhs;
3967 
3968   if( pSeg->iFirst==0 && pMW->pPage==0 ){
3969     rc = mergeWorkerFirstPage(pMW);
3970     bFirst = 1;
3971   }
3972   pPg = pMW->pPage;
3973   if( pPg ){
3974     aData = fsPageData(pPg, &nData);
3975     nRec = pageGetNRec(aData, nData);
3976     iFPtr = (int)pageGetPtr(aData, nData);
3977     iRPtr = iPtr - iFPtr;
3978   }
3979 
3980   /* Figure out how much space is required by the new record. The space
3981   ** required is divided into two sections: the header and the body. The
3982   ** header consists of the intial varint fields. The body are the blobs
3983   ** of data that correspond to the key and value data. The entire header
3984   ** must be stored on the page. The body may overflow onto the next and
3985   ** subsequent pages.
3986   **
3987   ** The header space is:
3988   **
3989   **     1) record type - 1 byte.
3990   **     2) Page-pointer-offset - 1 varint
3991   **     3) Key size - 1 varint
3992   **     4) Value size - 1 varint (only if LSM_INSERT flag is set)
3993   */
3994   if( rc==LSM_OK ){
3995     nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
3996     if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);
3997 
3998     /* If the entire header will not fit on page pPg, or if page pPg is
3999     ** marked read-only, advance to the next page of the output run. */
4000     iOff = pMerge->iOutputOff;
4001     if( iOff<0 || pPg==0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
4002       iFPtr = (int)*pMW->pCsr->pPrevMergePtr;
4003       iRPtr = iPtr - iFPtr;
4004       iOff = 0;
4005       nRec = 0;
4006       rc = mergeWorkerNextPage(pMW, iFPtr);
4007       pPg = pMW->pPage;
4008     }
4009   }
4010 
4011   /* If this record header will be the first on the page, and the page is
4012   ** not the very first in the entire run, add a copy of the key to the
4013   ** b-tree hierarchy.
4014   */
4015   if( rc==LSM_OK && nRec==0 && bFirst==0 ){
4016     assert( pMerge->nSkip>=0 );
4017 
4018     if( pMerge->nSkip==0 ){
4019       rc = mergeWorkerPushHierarchy(pMW, rtTopic(eType), pKey, nKey);
4020       assert( pMW->aSave[0].bStore==0 );
4021       pMW->aSave[0].bStore = 1;
4022       pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey);
4023     }else{
4024       pMerge->nSkip--;
4025       flags = PGFTR_SKIP_THIS_FLAG;
4026     }
4027 
4028     if( pMerge->nSkip ) flags |= PGFTR_SKIP_NEXT_FLAG;
4029   }
4030 
4031   /* Update the output segment */
4032   if( rc==LSM_OK ){
4033     aData = fsPageData(pPg, &nData);
4034 
4035     /* Update the page footer. */
4036     lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
4037     lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
4038     if( flags ) lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], (u16)flags);
4039 
4040     /* Write the entry header into the current page. */
4041     aData[iOff++] = (u8)eType;                                           /* 1 */
4042     iOff += lsmVarintPut32(&aData[iOff], iRPtr);                         /* 2 */
4043     iOff += lsmVarintPut32(&aData[iOff], nKey);                          /* 3 */
4044     if( rtIsWrite(eType) ) iOff += lsmVarintPut32(&aData[iOff], nVal);   /* 4 */
4045     pMerge->iOutputOff = iOff;
4046 
4047     /* Write the key and data into the segment. */
4048     assert( iFPtr==pageGetPtr(aData, nData) );
4049     rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pKey, nKey);
4050     if( rc==LSM_OK && rtIsWrite(eType) ){
4051       if( rc==LSM_OK ){
4052         rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pVal, nVal);
4053       }
4054     }
4055   }
4056 
4057   return rc;
4058 }
4059 
4060 
4061 /*
4062 ** Free all resources allocated by mergeWorkerInit().
4063 */
mergeWorkerShutdown(MergeWorker * pMW,int * pRc)4064 static void mergeWorkerShutdown(MergeWorker *pMW, int *pRc){
4065   int i;                          /* Iterator variable */
4066   int rc = *pRc;
4067   MultiCursor *pCsr = pMW->pCsr;
4068 
4069   /* Unless the merge has finished, save the cursor position in the
4070   ** Merge.aInput[] array. See function mergeWorkerInit() for the
4071   ** code to restore a cursor position based on aInput[].  */
4072   if( rc==LSM_OK && pCsr && lsmMCursorValid(pCsr) ){
4073     Merge *pMerge = pMW->pLevel->pMerge;
4074     int bBtree = (pCsr->pBtCsr!=0);
4075     int iPtr;
4076 
4077     /* pMerge->nInput==0 indicates that this is a FlushTree() operation. */
4078     assert( pMerge->nInput==0 || pMW->pLevel->nRight>0 );
4079     assert( pMerge->nInput==0 || pMerge->nInput==(pCsr->nPtr+bBtree) );
4080 
4081     for(i=0; i<(pMerge->nInput-bBtree); i++){
4082       SegmentPtr *pPtr = &pCsr->aPtr[i];
4083       if( pPtr->pPg ){
4084         pMerge->aInput[i].iPg = lsmFsPageNumber(pPtr->pPg);
4085         pMerge->aInput[i].iCell = pPtr->iCell;
4086       }else{
4087         pMerge->aInput[i].iPg = 0;
4088         pMerge->aInput[i].iCell = 0;
4089       }
4090     }
4091     if( bBtree && pMerge->nInput ){
4092       assert( i==pCsr->nPtr );
4093       btreeCursorPosition(pCsr->pBtCsr, &pMerge->aInput[i]);
4094     }
4095 
4096     /* Store the location of the split-key */
4097     iPtr = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
4098     if( iPtr<pCsr->nPtr ){
4099       pMerge->splitkey = pMerge->aInput[iPtr];
4100     }else{
4101       btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
4102     }
4103 
4104     pMerge->iOutputOff = -1;
4105   }
4106 
4107   lsmMCursorClose(pCsr, 0);
4108 
4109   /* Persist and release the output page. */
4110   if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW);
4111   if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);
4112   if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);
4113   if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW);
4114   lsmFsFlushWaiting(pMW->pDb->pFS, &rc);
4115   mergeWorkerReleaseAll(pMW);
4116 
4117   lsmFree(pMW->pDb->pEnv, pMW->aGobble);
4118   pMW->aGobble = 0;
4119   pMW->pCsr = 0;
4120 
4121   *pRc = rc;
4122 }
4123 
4124 /*
4125 ** The cursor passed as the first argument is being used as the input for
4126 ** a merge operation. When this function is called, *piFlags contains the
4127 ** database entry flags for the current entry. The entry about to be written
4128 ** to the output.
4129 **
4130 ** Note that this function only has to work for cursors configured to
4131 ** iterate forwards (not backwards).
4132 */
mergeRangeDeletes(MultiCursor * pCsr,int * piVal,int * piFlags)4133 static void mergeRangeDeletes(MultiCursor *pCsr, int *piVal, int *piFlags){
4134   int f = *piFlags;
4135   int iKey = pCsr->aTree[1];
4136   int i;
4137 
4138   assert( pCsr->flags & CURSOR_NEXT_OK );
4139   if( pCsr->flags & CURSOR_IGNORE_DELETE ){
4140     /* The ignore-delete flag is set when the output of the merge will form
4141     ** the oldest level in the database. In this case there is no point in
4142     ** retaining any range-delete flags.  */
4143     assert( (f & LSM_POINT_DELETE)==0 );
4144     f &= ~(LSM_START_DELETE|LSM_END_DELETE);
4145   }else{
4146     for(i=0; i<(CURSOR_DATA_SEGMENT + pCsr->nPtr); i++){
4147       if( i!=iKey ){
4148         int eType;
4149         void *pKey;
4150         int nKey;
4151         int res;
4152         multiCursorGetKey(pCsr, i, &eType, &pKey, &nKey);
4153 
4154         if( pKey ){
4155           res = sortedKeyCompare(pCsr->pDb->xCmp,
4156               rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData,
4157               rtTopic(eType), pKey, nKey
4158           );
4159           assert( res<=0 );
4160           if( res==0 ){
4161             if( (f & (LSM_INSERT|LSM_POINT_DELETE))==0 ){
4162               if( eType & LSM_INSERT ){
4163                 f |= LSM_INSERT;
4164                 *piVal = i;
4165               }
4166               else if( eType & LSM_POINT_DELETE ){
4167                 f |= LSM_POINT_DELETE;
4168               }
4169             }
4170             f |= (eType & (LSM_END_DELETE|LSM_START_DELETE));
4171           }
4172 
4173           if( i>iKey && (eType & LSM_END_DELETE) && res<0 ){
4174             if( f & (LSM_INSERT|LSM_POINT_DELETE) ){
4175               f |= (LSM_END_DELETE|LSM_START_DELETE);
4176             }else{
4177               f = 0;
4178             }
4179             break;
4180           }
4181         }
4182       }
4183     }
4184 
4185     assert( (f & LSM_INSERT)==0 || (f & LSM_POINT_DELETE)==0 );
4186     if( (f & LSM_START_DELETE)
4187      && (f & LSM_END_DELETE)
4188      && (f & LSM_POINT_DELETE )
4189     ){
4190       f = 0;
4191     }
4192   }
4193 
4194   *piFlags = f;
4195 }
4196 
mergeWorkerStep(MergeWorker * pMW)4197 static int mergeWorkerStep(MergeWorker *pMW){
4198   lsm_db *pDb = pMW->pDb;       /* Database handle */
4199   MultiCursor *pCsr;            /* Cursor to read input data from */
4200   int rc = LSM_OK;              /* Return code */
4201   int eType;                    /* SORTED_SEPARATOR, WRITE or DELETE */
4202   void *pKey; int nKey;         /* Key */
4203   Pgno iPtr;
4204   int iVal;
4205 
4206   pCsr = pMW->pCsr;
4207 
4208   /* Pull the next record out of the source cursor. */
4209   lsmMCursorKey(pCsr, &pKey, &nKey);
4210   eType = pCsr->eType;
4211 
4212   /* Figure out if the output record may have a different pointer value
4213   ** than the previous. This is the case if the current key is identical to
4214   ** a key that appears in the lowest level run being merged. If so, set
4215   ** iPtr to the absolute pointer value. If not, leave iPtr set to zero,
4216   ** indicating that the output pointer value should be a copy of the pointer
4217   ** value written with the previous key.  */
4218   iPtr = (pCsr->pPrevMergePtr ? *pCsr->pPrevMergePtr : 0);
4219   if( pCsr->pBtCsr ){
4220     BtreeCursor *pBtCsr = pCsr->pBtCsr;
4221     if( pBtCsr->pKey ){
4222       int res = rtTopic(pBtCsr->eType) - rtTopic(eType);
4223       if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey);
4224       if( 0==res ) iPtr = pBtCsr->iPtr;
4225       assert( res>=0 );
4226     }
4227   }else if( pCsr->nPtr ){
4228     SegmentPtr *pPtr = &pCsr->aPtr[pCsr->nPtr-1];
4229     if( pPtr->pPg
4230      && 0==pDb->xCmp(pPtr->pKey, pPtr->nKey, pKey, nKey)
4231     ){
4232       iPtr = pPtr->iPtr+pPtr->iPgPtr;
4233     }
4234   }
4235 
4236   iVal = pCsr->aTree[1];
4237   mergeRangeDeletes(pCsr, &iVal, &eType);
4238 
4239   if( eType!=0 ){
4240     if( pMW->aGobble ){
4241       int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
4242       if( iGobble<pCsr->nPtr && iGobble>=0 ){
4243         SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
4244         if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
4245           pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
4246         }
4247       }
4248     }
4249 
4250     /* If this is a separator key and we know that the output pointer has not
4251     ** changed, there is no point in writing an output record. Otherwise,
4252     ** proceed. */
4253     if( rc==LSM_OK && (rtIsSeparator(eType)==0 || iPtr!=0) ){
4254       /* Write the record into the main run. */
4255       void *pVal; int nVal;
4256       rc = multiCursorGetVal(pCsr, iVal, &pVal, &nVal);
4257       if( pVal && rc==LSM_OK ){
4258         assert( nVal>=0 );
4259         rc = sortedBlobSet(pDb->pEnv, &pCsr->val, pVal, nVal);
4260         pVal = pCsr->val.pData;
4261       }
4262       if( rc==LSM_OK ){
4263         rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pVal, nVal, (int)iPtr);
4264       }
4265     }
4266   }
4267 
4268   /* Advance the cursor to the next input record (assuming one exists). */
4269   assert( lsmMCursorValid(pMW->pCsr) );
4270   if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);
4271 
4272   return rc;
4273 }
4274 
mergeWorkerDone(MergeWorker * pMW)4275 static int mergeWorkerDone(MergeWorker *pMW){
4276   return pMW->pCsr==0 || !lsmMCursorValid(pMW->pCsr);
4277 }
4278 
sortedFreeLevel(lsm_env * pEnv,Level * p)4279 static void sortedFreeLevel(lsm_env *pEnv, Level *p){
4280   if( p ){
4281     lsmFree(pEnv, p->pSplitKey);
4282     lsmFree(pEnv, p->pMerge);
4283     lsmFree(pEnv, p->aRhs);
4284     lsmFree(pEnv, p);
4285   }
4286 }
4287 
sortedInvokeWorkHook(lsm_db * pDb)4288 static void sortedInvokeWorkHook(lsm_db *pDb){
4289   if( pDb->xWork ){
4290     pDb->xWork(pDb, pDb->pWorkCtx);
4291   }
4292 }
4293 
sortedNewToplevel(lsm_db * pDb,int eTree,int * pnWrite)4294 static int sortedNewToplevel(
4295   lsm_db *pDb,                    /* Connection handle */
4296   int eTree,                      /* One of the TREE_XXX constants */
4297   int *pnWrite                    /* OUT: Number of database pages written */
4298 ){
4299   int rc = LSM_OK;                /* Return Code */
4300   MultiCursor *pCsr = 0;
4301   Level *pNext = 0;               /* The current top level */
4302   Level *pNew;                    /* The new level itself */
4303   Segment *pLinked = 0;           /* Delete separators from this segment */
4304   Level *pDel = 0;                /* Delete this entire level */
4305   int nWrite = 0;                 /* Number of database pages written */
4306   Freelist freelist;
4307 
4308   if( eTree!=TREE_NONE ){
4309     rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
4310   }
4311 
4312   assert( pDb->bUseFreelist==0 );
4313   pDb->pFreelist = &freelist;
4314   pDb->bUseFreelist = 1;
4315   memset(&freelist, 0, sizeof(freelist));
4316 
4317   /* Allocate the new level structure to write to. */
4318   pNext = lsmDbSnapshotLevel(pDb->pWorker);
4319   pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
4320   if( pNew ){
4321     pNew->pNext = pNext;
4322     lsmDbSnapshotSetLevel(pDb->pWorker, pNew);
4323   }
4324 
4325   /* Create a cursor to gather the data required by the new segment. The new
4326   ** segment contains everything in the tree and pointers to the next segment
4327   ** in the database (if any).  */
4328   pCsr = multiCursorNew(pDb, &rc);
4329   if( pCsr ){
4330     pCsr->pDb = pDb;
4331     rc = multiCursorVisitFreelist(pCsr);
4332     if( rc==LSM_OK ){
4333       rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
4334     }
4335     if( rc==LSM_OK && pNext && pNext->pMerge==0 ){
4336       if( (pNext->flags & LEVEL_FREELIST_ONLY) ){
4337         pDel = pNext;
4338         pCsr->aPtr = lsmMallocZeroRc(pDb->pEnv, sizeof(SegmentPtr), &rc);
4339         multiCursorAddOne(pCsr, pNext, &rc);
4340       }else if( eTree!=TREE_NONE && pNext->lhs.iRoot ){
4341         pLinked = &pNext->lhs;
4342         rc = btreeCursorNew(pDb, pLinked, &pCsr->pBtCsr);
4343       }
4344     }
4345 
4346     /* If this will be the only segment in the database, discard any delete
4347     ** markers present in the in-memory tree.  */
4348     if( pNext==0 ){
4349       multiCursorIgnoreDelete(pCsr);
4350     }
4351   }
4352 
4353   if( rc!=LSM_OK ){
4354     lsmMCursorClose(pCsr, 0);
4355   }else{
4356     Pgno iLeftPtr = 0;
4357     Merge merge;                  /* Merge object used to create new level */
4358     MergeWorker mergeworker;      /* MergeWorker object for the same purpose */
4359 
4360     memset(&merge, 0, sizeof(Merge));
4361     memset(&mergeworker, 0, sizeof(MergeWorker));
4362 
4363     pNew->pMerge = &merge;
4364     pNew->flags |= LEVEL_INCOMPLETE;
4365     mergeworker.pDb = pDb;
4366     mergeworker.pLevel = pNew;
4367     mergeworker.pCsr = pCsr;
4368     pCsr->pPrevMergePtr = &iLeftPtr;
4369 
4370     /* Mark the separators array for the new level as a "phantom". */
4371     mergeworker.bFlush = 1;
4372 
4373     /* Do the work to create the new merged segment on disk */
4374     if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
4375     while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
4376       rc = mergeWorkerStep(&mergeworker);
4377     }
4378     mergeWorkerShutdown(&mergeworker, &rc);
4379     assert( rc!=LSM_OK || mergeworker.nWork==0 || pNew->lhs.iFirst );
4380     if( rc==LSM_OK && pNew->lhs.iFirst ){
4381       rc = lsmFsSortedFinish(pDb->pFS, &pNew->lhs);
4382     }
4383     nWrite = mergeworker.nWork;
4384     pNew->flags &= ~LEVEL_INCOMPLETE;
4385     if( eTree==TREE_NONE ){
4386       pNew->flags |= LEVEL_FREELIST_ONLY;
4387     }
4388     pNew->pMerge = 0;
4389   }
4390 
4391   if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
4392     assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
4393     lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
4394     sortedFreeLevel(pDb->pEnv, pNew);
4395   }else{
4396     if( pLinked ){
4397       pLinked->iRoot = 0;
4398     }else if( pDel ){
4399       assert( pNew->pNext==pDel );
4400       pNew->pNext = pDel->pNext;
4401       lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
4402       sortedFreeLevel(pDb->pEnv, pDel);
4403     }
4404 
4405 #if LSM_LOG_STRUCTURE
4406     lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "new-toplevel");
4407 #endif
4408 
4409     if( freelist.nEntry ){
4410       Freelist *p = &pDb->pWorker->freelist;
4411       lsmFree(pDb->pEnv, p->aEntry);
4412       memcpy(p, &freelist, sizeof(freelist));
4413       freelist.aEntry = 0;
4414     }else{
4415       pDb->pWorker->freelist.nEntry = 0;
4416     }
4417 
4418     assertBtreeOk(pDb, &pNew->lhs);
4419     sortedInvokeWorkHook(pDb);
4420   }
4421 
4422   if( pnWrite ) *pnWrite = nWrite;
4423   pDb->pWorker->nWrite += nWrite;
4424   pDb->pFreelist = 0;
4425   pDb->bUseFreelist = 0;
4426   lsmFree(pDb->pEnv, freelist.aEntry);
4427   return rc;
4428 }
4429 
4430 /*
4431 ** The nMerge levels in the LSM beginning with pLevel consist of a
4432 ** left-hand-side segment only. Replace these levels with a single new
4433 ** level consisting of a new empty segment on the left-hand-side and the
4434 ** nMerge segments from the replaced levels on the right-hand-side.
4435 **
4436 ** Also, allocate and populate a Merge object and set Level.pMerge to
4437 ** point to it.
4438 */
sortedMergeSetup(lsm_db * pDb,Level * pLevel,int nMerge,Level ** ppNew)4439 static int sortedMergeSetup(
4440   lsm_db *pDb,                    /* Database handle */
4441   Level *pLevel,                  /* First level to merge */
4442   int nMerge,                     /* Merge this many levels together */
4443   Level **ppNew                   /* New, merged, level */
4444 ){
4445   int rc = LSM_OK;                /* Return Code */
4446   Level *pNew;                    /* New Level object */
4447   int bUseNext = 0;               /* True to link in next separators */
4448   Merge *pMerge;                  /* New Merge object */
4449   int nByte;                      /* Bytes of space allocated at pMerge */
4450 
4451 #ifdef LSM_DEBUG
4452   int iLevel;
4453   Level *pX = pLevel;
4454   for(iLevel=0; iLevel<nMerge; iLevel++){
4455     assert( pX->nRight==0 );
4456     pX = pX->pNext;
4457   }
4458 #endif
4459 
4460   /* Allocate the new Level object */
4461   pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
4462   if( pNew ){
4463     pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv,
4464                                         nMerge * sizeof(Segment), &rc);
4465   }
4466 
4467   /* Populate the new Level object */
4468   if( rc==LSM_OK ){
4469     Level *pNext = 0;             /* Level following pNew */
4470     int i;
4471     int bFreeOnly = 1;
4472     Level *pTopLevel;
4473     Level *p = pLevel;
4474     Level **pp;
4475     pNew->nRight = nMerge;
4476     pNew->iAge = pLevel->iAge+1;
4477     for(i=0; i<nMerge; i++){
4478       assert( p->nRight==0 );
4479       pNext = p->pNext;
4480       pNew->aRhs[i] = p->lhs;
4481       if( (p->flags & LEVEL_FREELIST_ONLY)==0 ) bFreeOnly = 0;
4482       sortedFreeLevel(pDb->pEnv, p);
4483       p = pNext;
4484     }
4485 
4486     if( bFreeOnly ) pNew->flags |= LEVEL_FREELIST_ONLY;
4487 
4488     /* Replace the old levels with the new. */
4489     pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
4490     pNew->pNext = p;
4491     for(pp=&pTopLevel; *pp!=pLevel; pp=&((*pp)->pNext));
4492     *pp = pNew;
4493     lsmDbSnapshotSetLevel(pDb->pWorker, pTopLevel);
4494 
4495     /* Determine whether or not the next separators will be linked in */
4496     if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot && pNext
4497      && (bFreeOnly==0 || (pNext->flags & LEVEL_FREELIST_ONLY))
4498     ){
4499       bUseNext = 1;
4500     }
4501   }
4502 
4503   /* Allocate the merge object */
4504   nByte = sizeof(Merge) + sizeof(MergeInput) * (nMerge + bUseNext);
4505   pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
4506   if( pMerge ){
4507     pMerge->aInput = (MergeInput *)&pMerge[1];
4508     pMerge->nInput = nMerge + bUseNext;
4509     pNew->pMerge = pMerge;
4510   }
4511 
4512   *ppNew = pNew;
4513   return rc;
4514 }
4515 
mergeWorkerInit(lsm_db * pDb,Level * pLevel,MergeWorker * pMW)4516 static int mergeWorkerInit(
4517   lsm_db *pDb,                    /* Db connection to do merge work */
4518   Level *pLevel,                  /* Level to work on merging */
4519   MergeWorker *pMW                /* Object to initialize */
4520 ){
4521   int rc = LSM_OK;                /* Return code */
4522   Merge *pMerge = pLevel->pMerge; /* Persistent part of merge state */
4523   MultiCursor *pCsr = 0;          /* Cursor opened for pMW */
4524   Level *pNext = pLevel->pNext;   /* Next level in LSM */
4525 
4526   assert( pDb->pWorker );
4527   assert( pLevel->pMerge );
4528   assert( pLevel->nRight>0 );
4529 
4530   memset(pMW, 0, sizeof(MergeWorker));
4531   pMW->pDb = pDb;
4532   pMW->pLevel = pLevel;
4533   pMW->aGobble = lsmMallocZeroRc(pDb->pEnv, sizeof(Pgno) * pLevel->nRight, &rc);
4534 
4535   /* Create a multi-cursor to read the data to write to the new
4536   ** segment. The new segment contains:
4537   **
4538   **   1. Records from LHS of each of the nMerge levels being merged.
4539   **   2. Separators from either the last level being merged, or the
4540   **      separators attached to the LHS of the following level, or neither.
4541   **
4542   ** If the new level is the lowest (oldest) in the db, discard any
4543   ** delete keys. Key annihilation.
4544   */
4545   pCsr = multiCursorNew(pDb, &rc);
4546   if( pCsr ){
4547     pCsr->flags |= CURSOR_NEXT_OK;
4548     rc = multiCursorAddRhs(pCsr, pLevel);
4549   }
4550   if( rc==LSM_OK && pMerge->nInput > pLevel->nRight ){
4551     rc = btreeCursorNew(pDb, &pNext->lhs, &pCsr->pBtCsr);
4552   }else if( pNext ){
4553     multiCursorReadSeparators(pCsr);
4554   }else{
4555     multiCursorIgnoreDelete(pCsr);
4556   }
4557 
4558   assert( rc!=LSM_OK || pMerge->nInput==(pCsr->nPtr+(pCsr->pBtCsr!=0)) );
4559   pMW->pCsr = pCsr;
4560 
4561   /* Load the b-tree hierarchy into memory. */
4562   if( rc==LSM_OK ) rc = mergeWorkerLoadHierarchy(pMW);
4563   if( rc==LSM_OK && pMW->hier.nHier==0 ){
4564     pMW->aSave[0].iPgno = pLevel->lhs.iFirst;
4565   }
4566 
4567   /* Position the cursor. */
4568   if( rc==LSM_OK ){
4569     pCsr->pPrevMergePtr = &pMerge->iCurrentPtr;
4570     if( pLevel->lhs.iFirst==0 ){
4571       /* The output array is still empty. So position the cursor at the very
4572       ** start of the input.  */
4573       rc = multiCursorEnd(pCsr, 0);
4574     }else{
4575       /* The output array is non-empty. Position the cursor based on the
4576       ** page/cell data saved in the Merge.aInput[] array.  */
4577       int i;
4578       for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
4579         MergeInput *pInput = &pMerge->aInput[i];
4580         if( pInput->iPg ){
4581           SegmentPtr *pPtr;
4582           assert( pCsr->aPtr[i].pPg==0 );
4583           pPtr = &pCsr->aPtr[i];
4584           rc = segmentPtrLoadPage(pDb->pFS, pPtr, (int)pInput->iPg);
4585           if( rc==LSM_OK && pPtr->nCell>0 ){
4586             rc = segmentPtrLoadCell(pPtr, pInput->iCell);
4587           }
4588         }
4589       }
4590 
4591       if( rc==LSM_OK && pCsr->pBtCsr ){
4592         int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
4593         assert( i==pCsr->nPtr );
4594         rc = btreeCursorRestore(pCsr->pBtCsr, xCmp, &pMerge->aInput[i]);
4595       }
4596 
4597       if( rc==LSM_OK ){
4598         rc = multiCursorSetupTree(pCsr, 0);
4599       }
4600     }
4601     pCsr->flags |= CURSOR_NEXT_OK;
4602   }
4603 
4604   return rc;
4605 }
4606 
sortedBtreeGobble(lsm_db * pDb,MultiCursor * pCsr,int iGobble)4607 static int sortedBtreeGobble(
4608   lsm_db *pDb,                    /* Worker connection */
4609   MultiCursor *pCsr,              /* Multi-cursor being used for a merge */
4610   int iGobble                     /* pCsr->aPtr[] entry to operate on */
4611 ){
4612   int rc = LSM_OK;
4613   if( rtTopic(pCsr->eType)==0 ){
4614     Segment *pSeg = pCsr->aPtr[iGobble].pSeg;
4615     Pgno *aPg;
4616     int nPg;
4617 
4618     /* Seek from the root of the b-tree to the segment leaf that may contain
4619     ** a key equal to the one multi-cursor currently points to. Record the
4620     ** page number of each b-tree page and the leaf. The segment may be
4621     ** gobbled up to (but not including) the first of these page numbers.
4622     */
4623     assert( pSeg->iRoot>0 );
4624     aPg = lsmMallocZeroRc(pDb->pEnv, sizeof(Pgno)*32, &rc);
4625     if( rc==LSM_OK ){
4626       rc = seekInBtree(pCsr, pSeg,
4627           rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, aPg, 0
4628       );
4629     }
4630 
4631     if( rc==LSM_OK ){
4632       for(nPg=0; aPg[nPg]; nPg++);
4633       lsmFsGobble(pDb, pSeg, aPg, nPg);
4634     }
4635 
4636     lsmFree(pDb->pEnv, aPg);
4637   }
4638   return rc;
4639 }
4640 
4641 /*
4642 ** Argument p points to a level of age N. Return the number of levels in
4643 ** the linked list starting at p that have age=N (always at least 1).
4644 */
sortedCountLevels(Level * p)4645 static int sortedCountLevels(Level *p){
4646   int iAge = p->iAge;
4647   int nRet = 0;
4648   do {
4649     nRet++;
4650     p = p->pNext;
4651   }while( p && p->iAge==iAge );
4652   return nRet;
4653 }
4654 
sortedSelectLevel(lsm_db * pDb,int nMerge,Level ** ppOut)4655 static int sortedSelectLevel(lsm_db *pDb, int nMerge, Level **ppOut){
4656   Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
4657   int rc = LSM_OK;
4658   Level *pLevel = 0;            /* Output value */
4659   Level *pBest = 0;             /* Best level to work on found so far */
4660   int nBest;                    /* Number of segments merged at pBest */
4661   Level *pThis = 0;             /* First in run of levels with age=iAge */
4662   int nThis = 0;                /* Number of levels starting at pThis */
4663 
4664   assert( nMerge>=1 );
4665   nBest = LSM_MAX(1, nMerge-1);
4666 
4667   /* Find the longest contiguous run of levels not currently undergoing a
4668   ** merge with the same age in the structure. Or the level being merged
4669   ** with the largest number of right-hand segments. Work on it. */
4670   for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
4671     if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
4672       nThis++;
4673     }else{
4674       if( nThis>nBest ){
4675         if( (pLevel->iAge!=pThis->iAge+1)
4676          || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge)
4677         ){
4678           pBest = pThis;
4679           nBest = nThis;
4680         }
4681       }
4682       if( pLevel->nRight ){
4683         if( pLevel->nRight>nBest ){
4684           nBest = pLevel->nRight;
4685           pBest = pLevel;
4686         }
4687         nThis = 0;
4688         pThis = 0;
4689       }else{
4690         pThis = pLevel;
4691         nThis = 1;
4692       }
4693     }
4694   }
4695   if( nThis>nBest ){
4696     assert( pThis );
4697     pBest = pThis;
4698     nBest = nThis;
4699   }
4700 
4701   if( pBest==0 && nMerge==1 ){
4702     int nFree = 0;
4703     int nUsr = 0;
4704     for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
4705       assert( !pLevel->nRight );
4706       if( pLevel->flags & LEVEL_FREELIST_ONLY ){
4707         nFree++;
4708       }else{
4709         nUsr++;
4710       }
4711     }
4712     if( nUsr>1 ){
4713       pBest = pTopLevel;
4714       nBest = nFree + nUsr;
4715     }
4716   }
4717 
4718   if( pBest ){
4719     if( pBest->nRight==0 ){
4720       rc = sortedMergeSetup(pDb, pBest, nBest, ppOut);
4721     }else{
4722       *ppOut = pBest;
4723     }
4724   }
4725 
4726   return rc;
4727 }
4728 
sortedDbIsFull(lsm_db * pDb)4729 static int sortedDbIsFull(lsm_db *pDb){
4730   Level *pTop = lsmDbSnapshotLevel(pDb->pWorker);
4731 
4732   if( lsmDatabaseFull(pDb) ) return 1;
4733   if( pTop && pTop->iAge==0
4734    && (pTop->nRight || sortedCountLevels(pTop)>=pDb->nMerge)
4735   ){
4736     return 1;
4737   }
4738   return 0;
4739 }
4740 
4741 typedef struct MoveBlockCtx MoveBlockCtx;
4742 struct MoveBlockCtx {
4743   int iSeen;                      /* Previous free block on list */
4744   int iFrom;                      /* Total number of blocks in file */
4745 };
4746 
moveBlockCb(void * pCtx,int iBlk,i64 iSnapshot)4747 static int moveBlockCb(void *pCtx, int iBlk, i64 iSnapshot){
4748   MoveBlockCtx *p = (MoveBlockCtx *)pCtx;
4749   assert( p->iFrom==0 );
4750   if( iBlk==(p->iSeen-1) ){
4751     p->iSeen = iBlk;
4752     return 0;
4753   }
4754   p->iFrom = p->iSeen-1;
4755   return 1;
4756 }
4757 
4758 /*
4759 ** This function is called to further compact a database for which all
4760 ** of the content has already been merged into a single segment. If
4761 ** possible, it moves the contents of a single block from the end of the
4762 ** file to a free-block that lies closer to the start of the file (allowing
4763 ** the file to be eventually truncated).
4764 */
sortedMoveBlock(lsm_db * pDb,int * pnWrite)4765 static int sortedMoveBlock(lsm_db *pDb, int *pnWrite){
4766   Snapshot *p = pDb->pWorker;
4767   Level *pLvl = lsmDbSnapshotLevel(p);
4768   int iFrom;                      /* Block to move */
4769   int iTo;                        /* Destination to move block to */
4770   int rc;                         /* Return code */
4771 
4772   MoveBlockCtx sCtx;
4773 
4774   assert( pLvl->pNext==0 && pLvl->nRight==0 );
4775   assert( p->redirect.n<=LSM_MAX_BLOCK_REDIRECTS );
4776 
4777   *pnWrite = 0;
4778 
4779   /* Check that the redirect array is not already full. If it is, return
4780   ** without moving any database content.  */
4781   if( p->redirect.n>=LSM_MAX_BLOCK_REDIRECTS ) return LSM_OK;
4782 
4783   /* Find the last block of content in the database file. Do this by
4784   ** traversing the free-list in reverse (descending block number) order.
4785   ** The first block not on the free list is the one that will be moved.
4786   ** Since the db consists of a single segment, there is no ambiguity as
4787   ** to which segment the block belongs to.  */
4788   sCtx.iSeen = p->nBlock+1;
4789   sCtx.iFrom = 0;
4790   rc = lsmWalkFreelist(pDb, 1, moveBlockCb, &sCtx);
4791   if( rc!=LSM_OK || sCtx.iFrom==0 ) return rc;
4792   iFrom = sCtx.iFrom;
4793 
4794   /* Find the first free block in the database, ignoring block 1. Block
4795   ** 1 is tricky as it is smaller than the other blocks.  */
4796   rc = lsmBlockAllocate(pDb, iFrom, &iTo);
4797   if( rc!=LSM_OK || iTo==0 ) return rc;
4798   assert( iTo!=1 && iTo<iFrom );
4799 
4800   rc = lsmFsMoveBlock(pDb->pFS, &pLvl->lhs, iTo, iFrom);
4801   if( rc==LSM_OK ){
4802     if( p->redirect.a==0 ){
4803       int nByte = sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS;
4804       p->redirect.a = lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
4805     }
4806     if( rc==LSM_OK ){
4807 
4808       /* Check if the block just moved was already redirected. */
4809       int i;
4810       for(i=0; i<p->redirect.n; i++){
4811         if( p->redirect.a[i].iTo==iFrom ) break;
4812       }
4813 
4814       if( i==p->redirect.n ){
4815         /* Block iFrom was not already redirected. Add a new array entry. */
4816         memmove(&p->redirect.a[1], &p->redirect.a[0],
4817             sizeof(struct RedirectEntry) * p->redirect.n
4818             );
4819         p->redirect.a[0].iFrom = iFrom;
4820         p->redirect.a[0].iTo = iTo;
4821         p->redirect.n++;
4822       }else{
4823         /* Block iFrom was already redirected. Overwrite existing entry. */
4824         p->redirect.a[i].iTo = iTo;
4825       }
4826 
4827       rc = lsmBlockFree(pDb, iFrom);
4828 
4829       *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
4830       pLvl->lhs.pRedirect = &p->redirect;
4831     }
4832   }
4833 
4834 #if LSM_LOG_STRUCTURE
4835   if( rc==LSM_OK ){
4836     char aBuf[64];
4837     sprintf(aBuf, "move-block %d/%d", p->redirect.n-1, LSM_MAX_BLOCK_REDIRECTS);
4838     lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, aBuf);
4839   }
4840 #endif
4841   return rc;
4842 }
4843 
4844 /*
4845 */
mergeInsertFreelistSegments(lsm_db * pDb,int nFree,MergeWorker * pMW)4846 static int mergeInsertFreelistSegments(
4847   lsm_db *pDb,
4848   int nFree,
4849   MergeWorker *pMW
4850 ){
4851   int rc = LSM_OK;
4852   if( nFree>0 ){
4853     MultiCursor *pCsr = pMW->pCsr;
4854     Level *pLvl = pMW->pLevel;
4855     SegmentPtr *aNew1;
4856     Segment *aNew2;
4857 
4858     Level *pIter;
4859     Level *pNext;
4860     int i = 0;
4861 
4862     aNew1 = (SegmentPtr *)lsmMallocZeroRc(
4863         pDb->pEnv, sizeof(SegmentPtr) * (pCsr->nPtr+nFree), &rc
4864     );
4865     if( rc ) return rc;
4866     memcpy(&aNew1[nFree], pCsr->aPtr, sizeof(SegmentPtr)*pCsr->nPtr);
4867     pCsr->nPtr += nFree;
4868     lsmFree(pDb->pEnv, pCsr->aTree);
4869     lsmFree(pDb->pEnv, pCsr->aPtr);
4870     pCsr->aTree = 0;
4871     pCsr->aPtr = aNew1;
4872 
4873     aNew2 = (Segment *)lsmMallocZeroRc(
4874         pDb->pEnv, sizeof(Segment) * (pLvl->nRight+nFree), &rc
4875     );
4876     if( rc ) return rc;
4877     memcpy(&aNew2[nFree], pLvl->aRhs, sizeof(Segment)*pLvl->nRight);
4878     pLvl->nRight += nFree;
4879     lsmFree(pDb->pEnv, pLvl->aRhs);
4880     pLvl->aRhs = aNew2;
4881 
4882     for(pIter=pDb->pWorker->pLevel; rc==LSM_OK && pIter!=pLvl; pIter=pNext){
4883       Segment *pSeg = &pLvl->aRhs[i];
4884       memcpy(pSeg, &pIter->lhs, sizeof(Segment));
4885 
4886       pCsr->aPtr[i].pSeg = pSeg;
4887       pCsr->aPtr[i].pLevel = pLvl;
4888       rc = segmentPtrEnd(pCsr, &pCsr->aPtr[i], 0);
4889 
4890       pDb->pWorker->pLevel = pNext = pIter->pNext;
4891       sortedFreeLevel(pDb->pEnv, pIter);
4892       i++;
4893     }
4894     assert( i==nFree );
4895     assert( rc!=LSM_OK || pDb->pWorker->pLevel==pLvl );
4896 
4897     for(i=nFree; i<pCsr->nPtr; i++){
4898       pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
4899     }
4900 
4901     lsmFree(pDb->pEnv, pMW->aGobble);
4902     pMW->aGobble = 0;
4903   }
4904   return rc;
4905 }
4906 
sortedWork(lsm_db * pDb,int nWork,int nMerge,int bFlush,int * pnWrite)4907 static int sortedWork(
4908   lsm_db *pDb,                    /* Database handle. Must be worker. */
4909   int nWork,                      /* Number of pages of work to do */
4910   int nMerge,                     /* Try to merge this many levels at once */
4911   int bFlush,                     /* Set if call is to make room for a flush */
4912   int *pnWrite                    /* OUT: Actual number of pages written */
4913 ){
4914   int rc = LSM_OK;                /* Return Code */
4915   int nRemaining = nWork;         /* Units of work to do before returning */
4916   Snapshot *pWorker = pDb->pWorker;
4917 
4918   assert( pWorker );
4919   if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;
4920 
4921   while( nRemaining>0 ){
4922     Level *pLevel = 0;
4923 
4924     /* Find a level to work on. */
4925     rc = sortedSelectLevel(pDb, nMerge, &pLevel);
4926     assert( rc==LSM_OK || pLevel==0 );
4927 
4928     if( pLevel==0 ){
4929       int nDone = 0;
4930       Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
4931       if( bFlush==0 && nMerge==1 && pTopLevel && pTopLevel->pNext==0 ){
4932         rc = sortedMoveBlock(pDb, &nDone);
4933       }
4934       nRemaining -= nDone;
4935 
4936       /* Could not find any work to do. Finished. */
4937       if( nDone==0 ) break;
4938     }else{
4939       int bSave = 0;
4940       Freelist freelist = {0, 0, 0};
4941       MergeWorker mergeworker;    /* State used to work on the level merge */
4942 
4943       assert( pDb->bIncrMerge==0 );
4944       assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );
4945 
4946       pDb->bIncrMerge = 1;
4947       rc = mergeWorkerInit(pDb, pLevel, &mergeworker);
4948       assert( mergeworker.nWork==0 );
4949 
4950       while( rc==LSM_OK
4951           && 0==mergeWorkerDone(&mergeworker)
4952           && (mergeworker.nWork<nRemaining || pDb->bUseFreelist)
4953       ){
4954         int eType = rtTopic(mergeworker.pCsr->eType);
4955         rc = mergeWorkerStep(&mergeworker);
4956 
4957         /* If the cursor now points at the first entry past the end of the
4958         ** user data (i.e. either to EOF or to the first free-list entry
4959         ** that will be added to the run), then check if it is possible to
4960         ** merge in any free-list entries that are either in-memory or in
4961         ** free-list-only blocks.  */
4962         if( rc==LSM_OK && nMerge==1 && eType==0
4963          && (rtTopic(mergeworker.pCsr->eType) || mergeWorkerDone(&mergeworker))
4964         ){
4965           int nFree = 0;          /* Number of free-list-only levels to merge */
4966           Level *pLvl;
4967           assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );
4968 
4969           /* Now check if all levels containing data newer than this one
4970           ** are single-segment free-list only levels. If so, they will be
4971           ** merged in now.  */
4972           for(pLvl=pDb->pWorker->pLevel;
4973               pLvl!=mergeworker.pLevel && (pLvl->flags & LEVEL_FREELIST_ONLY);
4974               pLvl=pLvl->pNext
4975           ){
4976             assert( pLvl->nRight==0 );
4977             nFree++;
4978           }
4979           if( pLvl==mergeworker.pLevel ){
4980 
4981             rc = mergeInsertFreelistSegments(pDb, nFree, &mergeworker);
4982             if( rc==LSM_OK ){
4983               rc = multiCursorVisitFreelist(mergeworker.pCsr);
4984             }
4985             if( rc==LSM_OK ){
4986               rc = multiCursorSetupTree(mergeworker.pCsr, 0);
4987               pDb->pFreelist = &freelist;
4988               pDb->bUseFreelist = 1;
4989             }
4990           }
4991         }
4992       }
4993       nRemaining -= LSM_MAX(mergeworker.nWork, 1);
4994 
4995       if( rc==LSM_OK ){
4996         /* Check if the merge operation is completely finished. If not,
4997         ** gobble up (declare eligible for recycling) any pages from rhs
4998         ** segments for which the content has been completely merged into
4999         ** the lhs of the level.  */
5000         if( mergeWorkerDone(&mergeworker)==0 ){
5001           int i;
5002           for(i=0; i<pLevel->nRight; i++){
5003             SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
5004             if( pGobble->pSeg->iRoot ){
5005               rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
5006             }else if( mergeworker.aGobble[i] ){
5007               lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
5008             }
5009           }
5010         }else{
5011           int i;
5012           int bEmpty;
5013           mergeWorkerShutdown(&mergeworker, &rc);
5014           bEmpty = (pLevel->lhs.iFirst==0);
5015 
5016           if( bEmpty==0 && rc==LSM_OK ){
5017             rc = lsmFsSortedFinish(pDb->pFS, &pLevel->lhs);
5018           }
5019 
5020           if( pDb->bUseFreelist ){
5021             Freelist *p = &pDb->pWorker->freelist;
5022             lsmFree(pDb->pEnv, p->aEntry);
5023             memcpy(p, &freelist, sizeof(freelist));
5024             pDb->bUseFreelist = 0;
5025             pDb->pFreelist = 0;
5026             bSave = 1;
5027           }
5028 
5029           for(i=0; i<pLevel->nRight; i++){
5030             lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
5031           }
5032 
5033           if( bEmpty ){
5034             /* If the new level is completely empty, remove it from the
5035             ** database snapshot. This can only happen if all input keys were
5036             ** annihilated. Since keys are only annihilated if the new level
5037             ** is the last in the linked list (contains the most ancient of
5038             ** database content), this guarantees that pLevel->pNext==0.  */
5039             Level *pTop;          /* Top level of worker snapshot */
5040             Level **pp;           /* Read/write iterator for Level.pNext list */
5041 
5042             assert( pLevel->pNext==0 );
5043 
5044             /* Remove the level from the worker snapshot. */
5045             pTop = lsmDbSnapshotLevel(pWorker);
5046             for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
5047             *pp = pLevel->pNext;
5048             lsmDbSnapshotSetLevel(pWorker, pTop);
5049 
5050             /* Free the Level structure. */
5051             sortedFreeLevel(pDb->pEnv, pLevel);
5052           }else{
5053 
5054             /* Free the separators of the next level, if required. */
5055             if( pLevel->pMerge->nInput > pLevel->nRight ){
5056               assert( pLevel->pNext->lhs.iRoot );
5057               pLevel->pNext->lhs.iRoot = 0;
5058             }
5059 
5060             /* Zero the right-hand-side of pLevel */
5061             lsmFree(pDb->pEnv, pLevel->aRhs);
5062             pLevel->nRight = 0;
5063             pLevel->aRhs = 0;
5064 
5065             /* Free the Merge object */
5066             lsmFree(pDb->pEnv, pLevel->pMerge);
5067             pLevel->pMerge = 0;
5068           }
5069 
5070           if( bSave && rc==LSM_OK ){
5071             pDb->bIncrMerge = 0;
5072             rc = lsmSaveWorker(pDb, 0);
5073           }
5074         }
5075       }
5076 
5077       /* Clean up the MergeWorker object initialized above. If no error
5078       ** has occurred, invoke the work-hook to inform the application that
5079       ** the database structure has changed. */
5080       mergeWorkerShutdown(&mergeworker, &rc);
5081       pDb->bIncrMerge = 0;
5082       if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);
5083 
5084 #if LSM_LOG_STRUCTURE
5085       lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "work");
5086 #endif
5087       assertBtreeOk(pDb, &pLevel->lhs);
5088       assertRunInOrder(pDb, &pLevel->lhs);
5089 
5090       /* If bFlush is true and the database is no longer considered "full",
5091       ** break out of the loop even if nRemaining is still greater than
5092       ** zero. The caller has an in-memory tree to flush to disk.  */
5093       if( bFlush && sortedDbIsFull(pDb)==0 ) break;
5094     }
5095   }
5096 
5097   if( pnWrite ) *pnWrite = (nWork - nRemaining);
5098   pWorker->nWrite += (nWork - nRemaining);
5099 
5100 #ifdef LSM_LOG_WORK
5101   lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
5102 #endif
5103   return rc;
5104 }
5105 
5106 /*
5107 ** The database connection passed as the first argument must be a worker
5108 ** connection. This function checks if there exists an "old" in-memory tree
5109 ** ready to be flushed to disk. If so, true is returned. Otherwise false.
5110 **
5111 ** If an error occurs, *pRc is set to an LSM error code before returning.
5112 ** It is assumed that *pRc is set to LSM_OK when this function is called.
5113 */
sortedTreeHasOld(lsm_db * pDb,int * pRc)5114 static int sortedTreeHasOld(lsm_db *pDb, int *pRc){
5115   int rc = LSM_OK;
5116   int bRet = 0;
5117 
5118   assert( pDb->pWorker );
5119   if( *pRc==LSM_OK ){
5120     if( rc==LSM_OK
5121         && pDb->treehdr.iOldShmid
5122         && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff
5123       ){
5124       bRet = 1;
5125     }else{
5126       bRet = 0;
5127     }
5128     *pRc = rc;
5129   }
5130   assert( *pRc==LSM_OK || bRet==0 );
5131   return bRet;
5132 }
5133 
5134 /*
5135 ** Create a new free-list only top-level segment. Return LSM_OK if successful
5136 ** or an LSM error code if some error occurs.
5137 */
sortedNewFreelistOnly(lsm_db * pDb)5138 static int sortedNewFreelistOnly(lsm_db *pDb){
5139   return sortedNewToplevel(pDb, TREE_NONE, 0);
5140 }
5141 
lsmSaveWorker(lsm_db * pDb,int bFlush)5142 int lsmSaveWorker(lsm_db *pDb, int bFlush){
5143   Snapshot *p = pDb->pWorker;
5144   if( p->freelist.nEntry>pDb->nMaxFreelist ){
5145     int rc = sortedNewFreelistOnly(pDb);
5146     if( rc!=LSM_OK ) return rc;
5147   }
5148   return lsmCheckpointSaveWorker(pDb, bFlush);
5149 }
5150 
doLsmSingleWork(lsm_db * pDb,int bShutdown,int nMerge,int nPage,int * pnWrite,int * pbCkpt)5151 static int doLsmSingleWork(
5152   lsm_db *pDb,
5153   int bShutdown,
5154   int nMerge,                     /* Minimum segments to merge together */
5155   int nPage,                      /* Number of pages to write to disk */
5156   int *pnWrite,                   /* OUT: Pages actually written to disk */
5157   int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
5158 ){
5159   Snapshot *pWorker;              /* Worker snapshot */
5160   int rc = LSM_OK;                /* Return code */
5161   int bDirty = 0;
5162   int nMax = nPage;               /* Maximum pages to write to disk */
5163   int nRem = nPage;
5164   int bCkpt = 0;
5165 
5166   assert( nPage>0 );
5167 
5168   /* Open the worker 'transaction'. It will be closed before this function
5169   ** returns.  */
5170   assert( pDb->pWorker==0 );
5171   rc = lsmBeginWork(pDb);
5172   if( rc!=LSM_OK ) return rc;
5173   pWorker = pDb->pWorker;
5174 
5175   /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
5176   ** that this call stops writing when the auto-checkpoint is due. The
5177   ** caller will do the checkpoint, then possibly call this function again. */
5178   if( bShutdown==0 && pDb->nAutockpt ){
5179     u32 nSync;
5180     u32 nUnsync;
5181     int nPgsz;
5182 
5183     lsmCheckpointSynced(pDb, 0, 0, &nSync);
5184     nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
5185     nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);
5186 
5187     nMax = (int)LSM_MIN(nMax, (pDb->nAutockpt/nPgsz) - (int)(nUnsync-nSync));
5188     if( nMax<nRem ){
5189       bCkpt = 1;
5190       nRem = LSM_MAX(nMax, 0);
5191     }
5192   }
5193 
5194   /* If there exists in-memory data ready to be flushed to disk, attempt
5195   ** to flush it now.  */
5196   if( pDb->nTransOpen==0 ){
5197     rc = lsmTreeLoadHeader(pDb, 0);
5198   }
5199   if( sortedTreeHasOld(pDb, &rc) ){
5200     /* sortedDbIsFull() returns non-zero if either (a) there are too many
5201     ** levels in total in the db, or (b) there are too many levels with the
5202     ** the same age in the db. Either way, call sortedWork() to merge
5203     ** existing segments together until this condition is cleared.  */
5204     if( sortedDbIsFull(pDb) ){
5205       int nPg = 0;
5206       rc = sortedWork(pDb, nRem, nMerge, 1, &nPg);
5207       nRem -= nPg;
5208       assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
5209       bDirty = 1;
5210     }
5211 
5212     if( rc==LSM_OK && nRem>0 ){
5213       int nPg = 0;
5214       rc = sortedNewToplevel(pDb, TREE_OLD, &nPg);
5215       nRem -= nPg;
5216       if( rc==LSM_OK ){
5217         if( pDb->nTransOpen>0 ){
5218           lsmTreeDiscardOld(pDb);
5219         }
5220         rc = lsmSaveWorker(pDb, 1);
5221         bDirty = 0;
5222       }
5223     }
5224   }
5225 
5226   /* If nPage is still greater than zero, do some merging. */
5227   if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
5228     int nPg = 0;
5229     rc = sortedWork(pDb, nRem, nMerge, 0, &nPg);
5230     nRem -= nPg;
5231     if( nPg ) bDirty = 1;
5232   }
5233 
5234   /* If the in-memory part of the free-list is too large, write a new
5235   ** top-level containing just the in-memory free-list entries to disk. */
5236   if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
5237     int nPg = 0;
5238     while( rc==LSM_OK && lsmDatabaseFull(pDb) ){
5239       rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
5240       nRem -= nPg;
5241     }
5242     if( rc==LSM_OK ){
5243       rc = sortedNewFreelistOnly(pDb);
5244     }
5245     nRem -= nPg;
5246     if( nPg ) bDirty = 1;
5247   }
5248 
5249   if( rc==LSM_OK ){
5250     *pnWrite = (nMax - nRem);
5251     *pbCkpt = (bCkpt && nRem<=0);
5252     if( nMerge==1 && pDb->nAutockpt>0 && *pnWrite>0
5253      && pWorker->pLevel
5254      && pWorker->pLevel->nRight==0
5255      && pWorker->pLevel->pNext==0
5256     ){
5257       *pbCkpt = 1;
5258     }
5259   }
5260 
5261   if( rc==LSM_OK && bDirty ){
5262     lsmFinishWork(pDb, 0, &rc);
5263   }else{
5264     int rcdummy = LSM_BUSY;
5265     lsmFinishWork(pDb, 0, &rcdummy);
5266     *pnWrite = 0;
5267   }
5268   assert( pDb->pWorker==0 );
5269   return rc;
5270 }
5271 
doLsmWork(lsm_db * pDb,int nMerge,int nPage,int * pnWrite)5272 static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
5273   int rc = LSM_OK;                /* Return code */
5274   int nWrite = 0;                 /* Number of pages written */
5275 
5276   assert( nMerge>=1 );
5277 
5278   if( nPage!=0 ){
5279     int bCkpt = 0;
5280     do {
5281       int nThis = 0;
5282       int nReq = (nPage>=0) ? (nPage-nWrite) : ((int)0x7FFFFFFF);
5283 
5284       bCkpt = 0;
5285       rc = doLsmSingleWork(pDb, 0, nMerge, nReq, &nThis, &bCkpt);
5286       nWrite += nThis;
5287       if( rc==LSM_OK && bCkpt ){
5288         rc = lsm_checkpoint(pDb, 0);
5289       }
5290     }while( rc==LSM_OK && bCkpt && (nWrite<nPage || nPage<0) );
5291   }
5292 
5293   if( pnWrite ){
5294     if( rc==LSM_OK ){
5295       *pnWrite = nWrite;
5296     }else{
5297       *pnWrite = 0;
5298     }
5299   }
5300   return rc;
5301 }
5302 
5303 /*
5304 ** Perform work to merge database segments together.
5305 */
lsm_work(lsm_db * pDb,int nMerge,int nKB,int * pnWrite)5306 int lsm_work(lsm_db *pDb, int nMerge, int nKB, int *pnWrite){
5307   int rc;                         /* Return code */
5308   int nPgsz;                      /* Nominal page size in bytes */
5309   int nPage;                      /* Equivalent of nKB in pages */
5310   int nWrite = 0;                 /* Number of pages written */
5311 
5312   /* This function may not be called if pDb has an open read or write
5313   ** transaction. Return LSM_MISUSE if an application attempts this.  */
5314   if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;
5315   if( nMerge<=0 ) nMerge = pDb->nMerge;
5316 
5317   lsmFsPurgeCache(pDb->pFS);
5318 
5319   /* Convert from KB to pages */
5320   nPgsz = lsmFsPageSize(pDb->pFS);
5321   if( nKB>=0 ){
5322     nPage = ((i64)nKB * 1024 + nPgsz - 1) / nPgsz;
5323   }else{
5324     nPage = -1;
5325   }
5326 
5327   rc = doLsmWork(pDb, nMerge, nPage, &nWrite);
5328 
5329   if( pnWrite ){
5330     /* Convert back from pages to KB */
5331     *pnWrite = (int)(((i64)nWrite * 1024 + nPgsz - 1) / nPgsz);
5332   }
5333   return rc;
5334 }
5335 
lsm_flush(lsm_db * db)5336 int lsm_flush(lsm_db *db){
5337   int rc;
5338 
5339   if( db->nTransOpen>0 || db->pCsr ){
5340     rc = LSM_MISUSE_BKPT;
5341   }else{
5342     rc = lsmBeginWriteTrans(db);
5343     if( rc==LSM_OK ){
5344       lsmFlushTreeToDisk(db);
5345       lsmTreeDiscardOld(db);
5346       lsmTreeMakeOld(db);
5347       lsmTreeDiscardOld(db);
5348     }
5349 
5350     if( rc==LSM_OK ){
5351       rc = lsmFinishWriteTrans(db, 1);
5352     }else{
5353       lsmFinishWriteTrans(db, 0);
5354     }
5355     lsmFinishReadTrans(db);
5356   }
5357 
5358   return rc;
5359 }
5360 
5361 /*
5362 ** This function is called in auto-work mode to perform merging work on
5363 ** the data structure. It performs enough merging work to prevent the
5364 ** height of the tree from growing indefinitely assuming that roughly
5365 ** nUnit database pages worth of data have been written to the database
5366 ** (i.e. the in-memory tree) since the last call.
5367 */
lsmSortedAutoWork(lsm_db * pDb,int nUnit)5368 int lsmSortedAutoWork(
5369   lsm_db *pDb,                    /* Database handle */
5370   int nUnit                       /* Pages of data written to in-memory tree */
5371 ){
5372   int rc = LSM_OK;                /* Return code */
5373   int nDepth = 0;                 /* Current height of tree (longest path) */
5374   Level *pLevel;                  /* Used to iterate through levels */
5375   int bRestore = 0;
5376 
5377   assert( pDb->pWorker==0 );
5378   assert( pDb->nTransOpen>0 );
5379 
5380   /* Determine how many units of work to do before returning. One unit of
5381   ** work is achieved by writing one page (~4KB) of merged data.  */
5382   for(pLevel=lsmDbSnapshotLevel(pDb->pClient); pLevel; pLevel=pLevel->pNext){
5383     /* nDepth += LSM_MAX(1, pLevel->nRight); */
5384     nDepth += 1;
5385   }
5386   if( lsmTreeHasOld(pDb) ){
5387     nDepth += 1;
5388     bRestore = 1;
5389     rc = lsmSaveCursors(pDb);
5390     if( rc!=LSM_OK ) return rc;
5391   }
5392 
5393   if( nDepth>0 ){
5394     int nRemaining;               /* Units of work to do before returning */
5395 
5396     nRemaining = nUnit * nDepth;
5397 #ifdef LSM_LOG_WORK
5398     lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages",
5399         nUnit, nDepth, nRemaining);
5400 #endif
5401     assert( nRemaining>=0 );
5402     rc = doLsmWork(pDb, pDb->nMerge, nRemaining, 0);
5403     if( rc==LSM_BUSY ) rc = LSM_OK;
5404 
5405     if( bRestore && pDb->pCsr ){
5406       lsmMCursorFreeCache(pDb);
5407       lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
5408       pDb->pClient = 0;
5409       if( rc==LSM_OK ){
5410         rc = lsmCheckpointLoad(pDb, 0);
5411       }
5412       if( rc==LSM_OK ){
5413         rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
5414       }
5415       if( rc==LSM_OK ){
5416         rc = lsmRestoreCursors(pDb);
5417       }
5418     }
5419   }
5420 
5421   return rc;
5422 }
5423 
5424 /*
5425 ** This function is only called during system shutdown. The contents of
5426 ** any in-memory trees present (old or current) are written out to disk.
5427 */
lsmFlushTreeToDisk(lsm_db * pDb)5428 int lsmFlushTreeToDisk(lsm_db *pDb){
5429   int rc;
5430 
5431   rc = lsmBeginWork(pDb);
5432   while( rc==LSM_OK && sortedDbIsFull(pDb) ){
5433     rc = sortedWork(pDb, 256, pDb->nMerge, 1, 0);
5434   }
5435 
5436   if( rc==LSM_OK ){
5437     rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
5438   }
5439 
5440   lsmFinishWork(pDb, 1, &rc);
5441   return rc;
5442 }
5443 
5444 /*
5445 ** Return a string representation of the segment passed as the only argument.
5446 ** Space for the returned string is allocated using lsmMalloc(), and should
5447 ** be freed by the caller using lsmFree().
5448 */
segToString(lsm_env * pEnv,Segment * pSeg,int nMin)5449 static char *segToString(lsm_env *pEnv, Segment *pSeg, int nMin){
5450   int nSize = pSeg->nSize;
5451   Pgno iRoot = pSeg->iRoot;
5452   Pgno iFirst = pSeg->iFirst;
5453   Pgno iLast = pSeg->iLastPg;
5454   char *z;
5455 
5456   char *z1;
5457   char *z2;
5458   int nPad;
5459 
5460   z1 = lsmMallocPrintf(pEnv, "%d.%d", iFirst, iLast);
5461   if( iRoot ){
5462     z2 = lsmMallocPrintf(pEnv, "root=%d", iRoot);
5463   }else{
5464     z2 = lsmMallocPrintf(pEnv, "size=%d", nSize);
5465   }
5466 
5467   nPad = nMin - 2 - strlen(z1) - 1 - strlen(z2);
5468   nPad = LSM_MAX(0, nPad);
5469 
5470   if( iRoot ){
5471     z = lsmMallocPrintf(pEnv, "/%s %*s%s\\", z1, nPad, "", z2);
5472   }else{
5473     z = lsmMallocPrintf(pEnv, "|%s %*s%s|", z1, nPad, "", z2);
5474   }
5475   lsmFree(pEnv, z1);
5476   lsmFree(pEnv, z2);
5477 
5478   return z;
5479 }
5480 
fileToString(lsm_db * pDb,char * aBuf,int nBuf,int nMin,Segment * pSeg)5481 static int fileToString(
5482   lsm_db *pDb,                    /* For xMalloc() */
5483   char *aBuf,
5484   int nBuf,
5485   int nMin,
5486   Segment *pSeg
5487 ){
5488   int i = 0;
5489   if( pSeg ){
5490     char *zSeg;
5491 
5492     zSeg = segToString(pDb->pEnv, pSeg, nMin);
5493     snprintf(&aBuf[i], nBuf-i, "%s", zSeg);
5494     i += strlen(&aBuf[i]);
5495     lsmFree(pDb->pEnv, zSeg);
5496 
5497 #ifdef LSM_LOG_FREELIST
5498     lsmInfoArrayStructure(pDb, 1, pSeg->iFirst, &zSeg);
5499     snprintf(&aBuf[i], nBuf-1, "    (%s)", zSeg);
5500     i += strlen(&aBuf[i]);
5501     lsmFree(pDb->pEnv, zSeg);
5502 #endif
5503     aBuf[nBuf] = 0;
5504   }else{
5505     aBuf[0] = '\0';
5506   }
5507 
5508   return i;
5509 }
5510 
sortedDumpPage(lsm_db * pDb,Segment * pRun,Page * pPg,int bVals)5511 void sortedDumpPage(lsm_db *pDb, Segment *pRun, Page *pPg, int bVals){
5512   Blob blob = {0, 0, 0};         /* Blob used for keys */
5513   LsmString s;
5514   int i;
5515 
5516   int nRec;
5517   int iPtr;
5518   int flags;
5519   u8 *aData;
5520   int nData;
5521 
5522   aData = fsPageData(pPg, &nData);
5523 
5524   nRec = pageGetNRec(aData, nData);
5525   iPtr = (int)pageGetPtr(aData, nData);
5526   flags = pageGetFlags(aData, nData);
5527 
5528   lsmStringInit(&s, pDb->pEnv);
5529   lsmStringAppendf(&s,"nCell=%d iPtr=%d flags=%d {", nRec, iPtr, flags);
5530   if( flags&SEGMENT_BTREE_FLAG ) iPtr = 0;
5531 
5532   for(i=0; i<nRec; i++){
5533     Page *pRef = 0;               /* Pointer to page iRef */
5534     int iChar;
5535     u8 *aKey; int nKey = 0;       /* Key */
5536     u8 *aVal = 0; int nVal = 0;   /* Value */
5537     int iTopic;
5538     u8 *aCell;
5539     int iPgPtr;
5540     int eType;
5541 
5542     aCell = pageGetCell(aData, nData, i);
5543     eType = *aCell++;
5544     assert( (flags & SEGMENT_BTREE_FLAG) || eType!=0 );
5545     aCell += lsmVarintGet32(aCell, &iPgPtr);
5546 
5547     if( eType==0 ){
5548       Pgno iRef;                  /* Page number of referenced page */
5549       aCell += lsmVarintGet64(aCell, &iRef);
5550       lsmFsDbPageGet(pDb->pFS, pRun, iRef, &pRef);
5551       aKey = pageGetKey(pRun, pRef, 0, &iTopic, &nKey, &blob);
5552     }else{
5553       aCell += lsmVarintGet32(aCell, &nKey);
5554       if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
5555       sortedReadData(0, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob);
5556       aVal = &aKey[nKey];
5557       iTopic = eType;
5558     }
5559 
5560     lsmStringAppendf(&s, "%s%2X:", (i==0?"":" "), iTopic);
5561     for(iChar=0; iChar<nKey; iChar++){
5562       lsmStringAppendf(&s, "%c", isalnum(aKey[iChar]) ? aKey[iChar] : '.');
5563     }
5564     if( nVal>0 && bVals ){
5565       lsmStringAppendf(&s, "##");
5566       for(iChar=0; iChar<nVal; iChar++){
5567         lsmStringAppendf(&s, "%c", isalnum(aVal[iChar]) ? aVal[iChar] : '.');
5568       }
5569     }
5570 
5571     lsmStringAppendf(&s, " %d", iPgPtr+iPtr);
5572     lsmFsPageRelease(pRef);
5573   }
5574   lsmStringAppend(&s, "}", 1);
5575 
5576   lsmLogMessage(pDb, LSM_OK, "      Page %d: %s", lsmFsPageNumber(pPg), s.z);
5577   lsmStringClear(&s);
5578 
5579   sortedBlobFree(&blob);
5580 }
5581 
infoCellDump(lsm_db * pDb,Segment * pSeg,int bIndirect,Page * pPg,int iCell,int * peType,int * piPgPtr,u8 ** paKey,int * pnKey,u8 ** paVal,int * pnVal,Blob * pBlob)5582 static void infoCellDump(
5583   lsm_db *pDb,                    /* Database handle */
5584   Segment *pSeg,                  /* Segment page belongs to */
5585   int bIndirect,                  /* True to follow indirect refs */
5586   Page *pPg,
5587   int iCell,
5588   int *peType,
5589   int *piPgPtr,
5590   u8 **paKey, int *pnKey,
5591   u8 **paVal, int *pnVal,
5592   Blob *pBlob
5593 ){
5594   u8 *aData; int nData;           /* Page data */
5595   u8 *aKey; int nKey = 0;         /* Key */
5596   u8 *aVal = 0; int nVal = 0;     /* Value */
5597   int eType;
5598   int iPgPtr;
5599   Page *pRef = 0;                 /* Pointer to page iRef */
5600   u8 *aCell;
5601 
5602   aData = fsPageData(pPg, &nData);
5603 
5604   aCell = pageGetCell(aData, nData, iCell);
5605   eType = *aCell++;
5606   aCell += lsmVarintGet32(aCell, &iPgPtr);
5607 
5608   if( eType==0 ){
5609     int dummy;
5610     Pgno iRef;                  /* Page number of referenced page */
5611     aCell += lsmVarintGet64(aCell, &iRef);
5612     if( bIndirect ){
5613       lsmFsDbPageGet(pDb->pFS, pSeg, iRef, &pRef);
5614       pageGetKeyCopy(pDb->pEnv, pSeg, pRef, 0, &dummy, pBlob);
5615       aKey = (u8 *)pBlob->pData;
5616       nKey = pBlob->nData;
5617       lsmFsPageRelease(pRef);
5618     }else{
5619       aKey = (u8 *)"<indirect>";
5620       nKey = 11;
5621     }
5622   }else{
5623     aCell += lsmVarintGet32(aCell, &nKey);
5624     if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
5625     sortedReadData(pSeg, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, pBlob);
5626     aVal = &aKey[nKey];
5627   }
5628 
5629   if( peType ) *peType = eType;
5630   if( piPgPtr ) *piPgPtr = iPgPtr;
5631   if( paKey ) *paKey = aKey;
5632   if( paVal ) *paVal = aVal;
5633   if( pnKey ) *pnKey = nKey;
5634   if( pnVal ) *pnVal = nVal;
5635 }
5636 
infoAppendBlob(LsmString * pStr,int bHex,u8 * z,int n)5637 static int infoAppendBlob(LsmString *pStr, int bHex, u8 *z, int n){
5638   int iChar;
5639   for(iChar=0; iChar<n; iChar++){
5640     if( bHex ){
5641       lsmStringAppendf(pStr, "%02X", z[iChar]);
5642     }else{
5643       lsmStringAppendf(pStr, "%c", isalnum(z[iChar]) ?z[iChar] : '.');
5644     }
5645   }
5646   return LSM_OK;
5647 }
5648 
5649 #define INFO_PAGE_DUMP_DATA     0x01
5650 #define INFO_PAGE_DUMP_VALUES   0x02
5651 #define INFO_PAGE_DUMP_HEX      0x04
5652 #define INFO_PAGE_DUMP_INDIRECT 0x08
5653 
infoPageDump(lsm_db * pDb,Pgno iPg,int flags,char ** pzOut)5654 static int infoPageDump(
5655   lsm_db *pDb,                    /* Database handle */
5656   Pgno iPg,                       /* Page number of page to dump */
5657   int flags,
5658   char **pzOut                    /* OUT: lsmMalloc'd string */
5659 ){
5660   int rc = LSM_OK;                /* Return code */
5661   Page *pPg = 0;                  /* Handle for page iPg */
5662   int i, j;                       /* Loop counters */
5663   const int perLine = 16;         /* Bytes per line in the raw hex dump */
5664   Segment *pSeg = 0;
5665   Snapshot *pSnap;
5666 
5667   int bValues = (flags & INFO_PAGE_DUMP_VALUES);
5668   int bHex = (flags & INFO_PAGE_DUMP_HEX);
5669   int bData = (flags & INFO_PAGE_DUMP_DATA);
5670   int bIndirect = (flags & INFO_PAGE_DUMP_INDIRECT);
5671 
5672   *pzOut = 0;
5673   if( iPg==0 ) return LSM_ERROR;
5674 
5675   assert( pDb->pClient || pDb->pWorker );
5676   pSnap = pDb->pClient;
5677   if( pSnap==0 ) pSnap = pDb->pWorker;
5678   if( pSnap->redirect.n>0 ){
5679     Level *pLvl;
5680     int bUse = 0;
5681     for(pLvl=pSnap->pLevel; pLvl->pNext; pLvl=pLvl->pNext);
5682     pSeg = (pLvl->nRight==0 ? &pLvl->lhs : &pLvl->aRhs[pLvl->nRight-1]);
5683     rc = lsmFsSegmentContainsPg(pDb->pFS, pSeg, iPg, &bUse);
5684     if( bUse==0 ){
5685       pSeg = 0;
5686     }
5687   }
5688 
5689   /* iPg is a real page number (not subject to redirection). So it is safe
5690   ** to pass a NULL in place of the segment pointer as the second argument
5691   ** to lsmFsDbPageGet() here.  */
5692   if( rc==LSM_OK ){
5693     rc = lsmFsDbPageGet(pDb->pFS, 0, iPg, &pPg);
5694   }
5695 
5696   if( rc==LSM_OK ){
5697     Blob blob = {0, 0, 0, 0};
5698     int nKeyWidth = 0;
5699     LsmString str;
5700     int nRec;
5701     int iPtr;
5702     int flags2;
5703     int iCell;
5704     u8 *aData; int nData;         /* Page data and size thereof */
5705 
5706     aData = fsPageData(pPg, &nData);
5707     nRec = pageGetNRec(aData, nData);
5708     iPtr = (int)pageGetPtr(aData, nData);
5709     flags2 = pageGetFlags(aData, nData);
5710 
5711     lsmStringInit(&str, pDb->pEnv);
5712     lsmStringAppendf(&str, "Page : %lld  (%d bytes)\n", iPg, nData);
5713     lsmStringAppendf(&str, "nRec : %d\n", nRec);
5714     lsmStringAppendf(&str, "iPtr : %d\n", iPtr);
5715     lsmStringAppendf(&str, "flags: %04x\n", flags2);
5716     lsmStringAppendf(&str, "\n");
5717 
5718     for(iCell=0; iCell<nRec; iCell++){
5719       int nKey;
5720       infoCellDump(
5721           pDb, pSeg, bIndirect, pPg, iCell, 0, 0, 0, &nKey, 0, 0, &blob
5722       );
5723       if( nKey>nKeyWidth ) nKeyWidth = nKey;
5724     }
5725     if( bHex ) nKeyWidth = nKeyWidth * 2;
5726 
5727     for(iCell=0; iCell<nRec; iCell++){
5728       u8 *aKey; int nKey = 0;       /* Key */
5729       u8 *aVal; int nVal = 0;       /* Value */
5730       int iPgPtr;
5731       int eType;
5732       Pgno iAbsPtr;
5733       char zFlags[8];
5734 
5735       infoCellDump(pDb, pSeg, bIndirect, pPg, iCell, &eType, &iPgPtr,
5736           &aKey, &nKey, &aVal, &nVal, &blob
5737       );
5738       iAbsPtr = iPgPtr + ((flags2 & SEGMENT_BTREE_FLAG) ? 0 : iPtr);
5739 
5740       lsmFlagsToString(eType, zFlags);
5741       lsmStringAppendf(&str, "%s %d (%s) ",
5742           zFlags, iAbsPtr, (rtTopic(eType) ? "sys" : "usr")
5743       );
5744       infoAppendBlob(&str, bHex, aKey, nKey);
5745       if( nVal>0 && bValues ){
5746         lsmStringAppendf(&str, "%*s", nKeyWidth - (nKey*(1+bHex)), "");
5747         lsmStringAppendf(&str, " ");
5748         infoAppendBlob(&str, bHex, aVal, nVal);
5749       }
5750       if( rtTopic(eType) ){
5751         int iBlk = (int)~lsmGetU32(aKey);
5752         lsmStringAppendf(&str, "  (block=%d", iBlk);
5753         if( nVal>0 ){
5754           i64 iSnap = lsmGetU64(aVal);
5755           lsmStringAppendf(&str, " snapshot=%lld", iSnap);
5756         }
5757         lsmStringAppendf(&str, ")");
5758       }
5759       lsmStringAppendf(&str, "\n");
5760     }
5761 
5762     if( bData ){
5763       lsmStringAppendf(&str, "\n-------------------"
5764           "-------------------------------------------------------------\n");
5765       lsmStringAppendf(&str, "Page %d\n",
5766           iPg, (iPg-1)*nData, iPg*nData - 1);
5767       for(i=0; i<nData; i += perLine){
5768         lsmStringAppendf(&str, "%04x: ", i);
5769         for(j=0; j<perLine; j++){
5770           if( i+j>nData ){
5771             lsmStringAppendf(&str, "   ");
5772           }else{
5773             lsmStringAppendf(&str, "%02x ", aData[i+j]);
5774           }
5775         }
5776         lsmStringAppendf(&str, "  ");
5777         for(j=0; j<perLine; j++){
5778           if( i+j>nData ){
5779             lsmStringAppendf(&str, " ");
5780           }else{
5781             lsmStringAppendf(&str,"%c", isprint(aData[i+j]) ? aData[i+j] : '.');
5782           }
5783         }
5784         lsmStringAppendf(&str,"\n");
5785       }
5786     }
5787 
5788     *pzOut = str.z;
5789     sortedBlobFree(&blob);
5790     lsmFsPageRelease(pPg);
5791   }
5792 
5793   return rc;
5794 }
5795 
lsmInfoPageDump(lsm_db * pDb,Pgno iPg,int bHex,char ** pzOut)5796 int lsmInfoPageDump(
5797   lsm_db *pDb,                    /* Database handle */
5798   Pgno iPg,                       /* Page number of page to dump */
5799   int bHex,                       /* True to output key/value in hex form */
5800   char **pzOut                    /* OUT: lsmMalloc'd string */
5801 ){
5802   int flags = INFO_PAGE_DUMP_DATA | INFO_PAGE_DUMP_VALUES;
5803   if( bHex ) flags |= INFO_PAGE_DUMP_HEX;
5804   return infoPageDump(pDb, iPg, flags, pzOut);
5805 }
5806 
sortedDumpSegment(lsm_db * pDb,Segment * pRun,int bVals)5807 void sortedDumpSegment(lsm_db *pDb, Segment *pRun, int bVals){
5808   assert( pDb->xLog );
5809   if( pRun && pRun->iFirst ){
5810     int flags = (bVals ? INFO_PAGE_DUMP_VALUES : 0);
5811     char *zSeg;
5812     Page *pPg;
5813 
5814     zSeg = segToString(pDb->pEnv, pRun, 0);
5815     lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg);
5816     lsmFree(pDb->pEnv, zSeg);
5817 
5818     lsmFsDbPageGet(pDb->pFS, pRun, pRun->iFirst, &pPg);
5819     while( pPg ){
5820       Page *pNext;
5821       char *z = 0;
5822       infoPageDump(pDb, lsmFsPageNumber(pPg), flags, &z);
5823       lsmLogMessage(pDb, LSM_OK, "%s", z);
5824       lsmFree(pDb->pEnv, z);
5825 #if 0
5826       sortedDumpPage(pDb, pRun, pPg, bVals);
5827 #endif
5828       lsmFsDbPageNext(pRun, pPg, 1, &pNext);
5829       lsmFsPageRelease(pPg);
5830       pPg = pNext;
5831     }
5832   }
5833 }
5834 
5835 /*
5836 ** Invoke the log callback zero or more times with messages that describe
5837 ** the current database structure.
5838 */
lsmSortedDumpStructure(lsm_db * pDb,Snapshot * pSnap,int bKeys,int bVals,const char * zWhy)5839 void lsmSortedDumpStructure(
5840   lsm_db *pDb,                    /* Database handle (used for xLog callback) */
5841   Snapshot *pSnap,                /* Snapshot to dump */
5842   int bKeys,                      /* Output the keys from each segment */
5843   int bVals,                      /* Output the values from each segment */
5844   const char *zWhy                /* Caption to print near top of dump */
5845 ){
5846   Snapshot *pDump = pSnap;
5847   Level *pTopLevel;
5848   char *zFree = 0;
5849 
5850   assert( pSnap );
5851   pTopLevel = lsmDbSnapshotLevel(pDump);
5852   if( pDb->xLog && pTopLevel ){
5853     static int nCall = 0;
5854     Level *pLevel;
5855     int iLevel = 0;
5856 
5857     nCall++;
5858     lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);
5859 
5860 #if 0
5861     if( nCall==1031 || nCall==1032 ) bKeys=1;
5862 #endif
5863 
5864     for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
5865       char zLeft[1024];
5866       char zRight[1024];
5867       int i = 0;
5868 
5869       Segment *aLeft[24];
5870       Segment *aRight[24];
5871 
5872       int nLeft = 0;
5873       int nRight = 0;
5874 
5875       Segment *pSeg = &pLevel->lhs;
5876       aLeft[nLeft++] = pSeg;
5877 
5878       for(i=0; i<pLevel->nRight; i++){
5879         aRight[nRight++] = &pLevel->aRhs[i];
5880       }
5881 
5882 #ifdef LSM_LOG_FREELIST
5883       if( nRight ){
5884         memmove(&aRight[1], aRight, sizeof(aRight[0])*nRight);
5885         aRight[0] = 0;
5886         nRight++;
5887       }
5888 #endif
5889 
5890       for(i=0; i<nLeft || i<nRight; i++){
5891         int iPad = 0;
5892         char zLevel[32];
5893         zLeft[0] = '\0';
5894         zRight[0] = '\0';
5895 
5896         if( i<nLeft ){
5897           fileToString(pDb, zLeft, sizeof(zLeft), 24, aLeft[i]);
5898         }
5899         if( i<nRight ){
5900           fileToString(pDb, zRight, sizeof(zRight), 24, aRight[i]);
5901         }
5902 
5903         if( i==0 ){
5904           snprintf(zLevel, sizeof(zLevel), "L%d: (age=%d) (flags=%.4x)",
5905               iLevel, (int)pLevel->iAge, (int)pLevel->flags
5906           );
5907         }else{
5908           zLevel[0] = '\0';
5909         }
5910 
5911         if( nRight==0 ){
5912           iPad = 10;
5913         }
5914 
5915         lsmLogMessage(pDb, LSM_OK, "% 25s % *s% -35s %s",
5916             zLevel, iPad, "", zLeft, zRight
5917         );
5918       }
5919 
5920       iLevel++;
5921     }
5922 
5923     if( bKeys ){
5924       for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
5925         int i;
5926         sortedDumpSegment(pDb, &pLevel->lhs, bVals);
5927         for(i=0; i<pLevel->nRight; i++){
5928           sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals);
5929         }
5930       }
5931     }
5932   }
5933 
5934   lsmInfoFreelist(pDb, &zFree);
5935   lsmLogMessage(pDb, LSM_OK, "Freelist: %s", zFree);
5936   lsmFree(pDb->pEnv, zFree);
5937 
5938   assert( lsmFsIntegrityCheck(pDb) );
5939 }
5940 
lsmSortedFreeLevel(lsm_env * pEnv,Level * pLevel)5941 void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){
5942   Level *pNext;
5943   Level *p;
5944 
5945   for(p=pLevel; p; p=pNext){
5946     pNext = p->pNext;
5947     sortedFreeLevel(pEnv, p);
5948   }
5949 }
5950 
lsmSortedSaveTreeCursors(lsm_db * pDb)5951 void lsmSortedSaveTreeCursors(lsm_db *pDb){
5952   MultiCursor *pCsr;
5953   for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
5954     lsmTreeCursorSave(pCsr->apTreeCsr[0]);
5955     lsmTreeCursorSave(pCsr->apTreeCsr[1]);
5956   }
5957 }
5958 
lsmSortedExpandBtreePage(Page * pPg,int nOrig)5959 void lsmSortedExpandBtreePage(Page *pPg, int nOrig){
5960   u8 *aData;
5961   int nData;
5962   int nEntry;
5963   int iHdr;
5964 
5965   aData = lsmFsPageData(pPg, &nData);
5966   nEntry = pageGetNRec(aData, nOrig);
5967   iHdr = SEGMENT_EOF(nOrig, nEntry);
5968   memmove(&aData[iHdr + (nData-nOrig)], &aData[iHdr], nOrig-iHdr);
5969 }
5970 
5971 #ifdef LSM_DEBUG_EXPENSIVE
assertRunInOrder(lsm_db * pDb,Segment * pSeg)5972 static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){
5973   Page *pPg = 0;
5974   Blob blob1 = {0, 0, 0, 0};
5975   Blob blob2 = {0, 0, 0, 0};
5976 
5977   lsmFsDbPageGet(pDb->pFS, pSeg, pSeg->iFirst, &pPg);
5978   while( pPg ){
5979     u8 *aData; int nData;
5980     Page *pNext;
5981 
5982     aData = lsmFsPageData(pPg, &nData);
5983     if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){
5984       int i;
5985       int nRec = pageGetNRec(aData, nData);
5986       for(i=0; i<nRec; i++){
5987         int iTopic1, iTopic2;
5988         pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i, &iTopic1, &blob1);
5989 
5990         if( i==0 && blob2.nData ){
5991           assert( sortedKeyCompare(
5992                 pDb->xCmp, iTopic2, blob2.pData, blob2.nData,
5993                 iTopic1, blob1.pData, blob1.nData
5994           )<0 );
5995         }
5996 
5997         if( i<(nRec-1) ){
5998           pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i+1, &iTopic2, &blob2);
5999           assert( sortedKeyCompare(
6000                 pDb->xCmp, iTopic1, blob1.pData, blob1.nData,
6001                 iTopic2, blob2.pData, blob2.nData
6002           )<0 );
6003         }
6004       }
6005     }
6006 
6007     lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
6008     lsmFsPageRelease(pPg);
6009     pPg = pNext;
6010   }
6011 
6012   sortedBlobFree(&blob1);
6013   sortedBlobFree(&blob2);
6014 }
6015 #endif
6016 
6017 #ifdef LSM_DEBUG_EXPENSIVE
6018 /*
6019 ** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
6020 ** defined. Its only purpose is to evaluate various assert() statements to
6021 ** verify that the database is well formed in certain respects.
6022 **
6023 ** More specifically, it checks that the array pOne contains the required
6024 ** pointers to pTwo. Array pTwo must be a main array. pOne may be either a
6025 ** separators array or another main array. If pOne does not contain the
6026 ** correct set of pointers, an assert() statement fails.
6027 */
assertPointersOk(lsm_db * pDb,Segment * pOne,Segment * pTwo,int bRhs)6028 static int assertPointersOk(
6029   lsm_db *pDb,                    /* Database handle */
6030   Segment *pOne,                  /* Segment containing pointers */
6031   Segment *pTwo,                  /* Segment containing pointer targets */
6032   int bRhs                        /* True if pTwo may have been Gobble()d */
6033 ){
6034   int rc = LSM_OK;                /* Error code */
6035   SegmentPtr ptr1;                /* Iterates through pOne */
6036   SegmentPtr ptr2;                /* Iterates through pTwo */
6037   Pgno iPrev;
6038 
6039   assert( pOne && pTwo );
6040 
6041   memset(&ptr1, 0, sizeof(ptr1));
6042   memset(&ptr2, 0, sizeof(ptr1));
6043   ptr1.pSeg = pOne;
6044   ptr2.pSeg = pTwo;
6045   segmentPtrEndPage(pDb->pFS, &ptr1, 0, &rc);
6046   segmentPtrEndPage(pDb->pFS, &ptr2, 0, &rc);
6047 
6048   /* Check that the footer pointer of the first page of pOne points to
6049   ** the first page of pTwo. */
6050   iPrev = pTwo->iFirst;
6051   if( ptr1.iPtr!=iPrev && !bRhs ){
6052     assert( 0 );
6053   }
6054 
6055   if( rc==LSM_OK && ptr1.nCell>0 ){
6056     rc = segmentPtrLoadCell(&ptr1, 0);
6057   }
6058 
6059   while( rc==LSM_OK && ptr2.pPg ){
6060     Pgno iThis;
6061 
6062     /* Advance to the next page of segment pTwo that contains at least
6063     ** one cell. Break out of the loop if the iterator reaches EOF.  */
6064     do{
6065       rc = segmentPtrNextPage(&ptr2, 1);
6066       assert( rc==LSM_OK );
6067     }while( rc==LSM_OK && ptr2.pPg && ptr2.nCell==0 );
6068     if( rc!=LSM_OK || ptr2.pPg==0 ) break;
6069     iThis = lsmFsPageNumber(ptr2.pPg);
6070 
6071     if( (ptr2.flags & (PGFTR_SKIP_THIS_FLAG|SEGMENT_BTREE_FLAG))==0 ){
6072 
6073       /* Load the first cell in the array pTwo page. */
6074       rc = segmentPtrLoadCell(&ptr2, 0);
6075 
6076       /* Iterate forwards through pOne, searching for a key that matches the
6077       ** key ptr2.pKey/nKey. This key should have a pointer to the page that
6078       ** ptr2 currently points to. */
6079       while( rc==LSM_OK ){
6080         int res = rtTopic(ptr1.eType) - rtTopic(ptr2.eType);
6081         if( res==0 ){
6082           res = pDb->xCmp(ptr1.pKey, ptr1.nKey, ptr2.pKey, ptr2.nKey);
6083         }
6084 
6085         if( res<0 ){
6086           assert( bRhs || ptr1.iPtr+ptr1.iPgPtr==iPrev );
6087         }else if( res>0 ){
6088           assert( 0 );
6089         }else{
6090           assert( ptr1.iPtr+ptr1.iPgPtr==iThis );
6091           iPrev = iThis;
6092           break;
6093         }
6094 
6095         rc = segmentPtrAdvance(0, &ptr1, 0);
6096         if( ptr1.pPg==0 ){
6097           assert( 0 );
6098         }
6099       }
6100     }
6101   }
6102 
6103   segmentPtrReset(&ptr1, 0);
6104   segmentPtrReset(&ptr2, 0);
6105   return LSM_OK;
6106 }
6107 
6108 /*
6109 ** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
6110 ** defined. Its only purpose is to evaluate various assert() statements to
6111 ** verify that the database is well formed in certain respects.
6112 **
6113 ** More specifically, it checks that the b-tree embedded in array pRun
6114 ** contains the correct keys. If not, an assert() fails.
6115 */
assertBtreeOk(lsm_db * pDb,Segment * pSeg)6116 static int assertBtreeOk(
6117   lsm_db *pDb,
6118   Segment *pSeg
6119 ){
6120   int rc = LSM_OK;                /* Return code */
6121   if( pSeg->iRoot ){
6122     Blob blob = {0, 0, 0};        /* Buffer used to cache overflow keys */
6123     FileSystem *pFS = pDb->pFS;   /* File system to read from */
6124     Page *pPg = 0;                /* Main run page */
6125     BtreeCursor *pCsr = 0;        /* Btree cursor */
6126 
6127     rc = btreeCursorNew(pDb, pSeg, &pCsr);
6128     if( rc==LSM_OK ){
6129       rc = btreeCursorFirst(pCsr);
6130     }
6131     if( rc==LSM_OK ){
6132       rc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pPg);
6133     }
6134 
6135     while( rc==LSM_OK ){
6136       Page *pNext;
6137       u8 *aData;
6138       int nData;
6139       int flags;
6140 
6141       rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
6142       lsmFsPageRelease(pPg);
6143       pPg = pNext;
6144       if( pPg==0 ) break;
6145       aData = fsPageData(pPg, &nData);
6146       flags = pageGetFlags(aData, nData);
6147       if( rc==LSM_OK
6148        && 0==((SEGMENT_BTREE_FLAG|PGFTR_SKIP_THIS_FLAG) & flags)
6149        && 0!=pageGetNRec(aData, nData)
6150       ){
6151         u8 *pKey;
6152         int nKey;
6153         int iTopic;
6154         pKey = pageGetKey(pSeg, pPg, 0, &iTopic, &nKey, &blob);
6155         assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
6156         assert( lsmFsPageNumber(pPg)==pCsr->iPtr );
6157         rc = btreeCursorNext(pCsr);
6158       }
6159     }
6160     assert( rc!=LSM_OK || pCsr->pKey==0 );
6161 
6162     if( pPg ) lsmFsPageRelease(pPg);
6163 
6164     btreeCursorFree(pCsr);
6165     sortedBlobFree(&blob);
6166   }
6167 
6168   return rc;
6169 }
6170 #endif /* ifdef LSM_DEBUG_EXPENSIVE */
6171