1 /*
2 ** 2011-08-14
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 **
13 ** PAGE FORMAT:
14 **
15 **   The maximum page size is 65536 bytes.
16 **
17 **   Since all records are equal to or larger than 2 bytes in size, and
18 **   some space within the page is consumed by the page footer, there must
19 **   be less than 2^15 records on each page.
20 **
21 **   Each page ends with a footer that describes the pages contents. This
22 **   footer serves as similar purpose to the page header in an SQLite database.
23 **   A footer is used instead of a header because it makes it easier to
24 **   populate a new page based on a sorted list of key/value pairs.
25 **
26 **   The footer consists of the following values (starting at the end of
27 **   the page and continuing backwards towards the start). All values are
28 **   stored as unsigned big-endian integers.
29 **
30 **     * Number of records on page (2 bytes).
31 **     * Flags field (2 bytes).
32 **     * Left-hand pointer value (8 bytes).
33 **     * The starting offset of each record (2 bytes per record).
34 **
35 **   Records may span pages. Unless it happens to be an exact fit, the part
36 **   of the final record that starts on page X that does not fit on page X
37 **   is stored at the start of page (X+1). This means there may be pages where
38 **   (N==0). And on most pages the first record that starts on the page will
39 **   not start at byte offset 0. For example:
40 **
41 **      aaaaa bbbbb ccc <footer>    cc eeeee fffff g <footer>    gggg....
42 **
43 ** RECORD FORMAT:
44 **
45 **   The first byte of the record is a flags byte. It is a combination
46 **   of the following flags (defined in lsmInt.h):
47 **
48 **       LSM_START_DELETE
49 **       LSM_END_DELETE
50 **       LSM_POINT_DELETE
51 **       LSM_INSERT
52 **       LSM_SEPARATOR
53 **       LSM_SYSTEMKEY
54 **
55 **   Immediately following the type byte is a pointer to the smallest key
56 **   in the next file that is larger than the key in the current record. The
57 **   pointer is encoded as a varint. When added to the 32-bit page number
58 **   stored in the footer, it is the page number of the page that contains the
59 **   smallest key in the next sorted file that is larger than this key.
60 **
61 **   Next is the number of bytes in the key, encoded as a varint.
62 **
63 **   If the LSM_INSERT flag is set, the number of bytes in the value, as
64 **   a varint, is next.
65 **
66 **   Finally, the blob of data containing the key, and for LSM_INSERT
67 **   records, the value as well.
68 */
69 
70 #ifndef _LSM_INT_H
71 # include "lsmInt.h"
72 #endif
73 
74 #define LSM_LOG_STRUCTURE 0
75 #define LSM_LOG_DATA      0
76 
77 /*
78 ** Macros to help decode record types.
79 */
80 #define rtTopic(eType)       ((eType) & LSM_SYSTEMKEY)
81 #define rtIsDelete(eType)    (((eType) & 0x0F)==LSM_POINT_DELETE)
82 
83 #define rtIsSeparator(eType) (((eType) & LSM_SEPARATOR)!=0)
84 #define rtIsWrite(eType)     (((eType) & LSM_INSERT)!=0)
85 #define rtIsSystem(eType)    (((eType) & LSM_SYSTEMKEY)!=0)
86 
87 /*
88 ** The following macros are used to access a page footer.
89 */
90 #define SEGMENT_NRECORD_OFFSET(pgsz)        ((pgsz) - 2)
91 #define SEGMENT_FLAGS_OFFSET(pgsz)          ((pgsz) - 2 - 2)
92 #define SEGMENT_POINTER_OFFSET(pgsz)        ((pgsz) - 2 - 2 - 8)
93 #define SEGMENT_CELLPTR_OFFSET(pgsz, iCell) ((pgsz) - 2 - 2 - 8 - 2 - (iCell)*2)
94 
95 #define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, nEntry-1)
96 
97 #define SEGMENT_BTREE_FLAG     0x0001
98 #define PGFTR_SKIP_NEXT_FLAG   0x0002
99 #define PGFTR_SKIP_THIS_FLAG   0x0004
100 
101 
102 #ifndef LSM_SEGMENTPTR_FREE_THRESHOLD
103 # define LSM_SEGMENTPTR_FREE_THRESHOLD 1024
104 #endif
105 
106 typedef struct SegmentPtr SegmentPtr;
107 typedef struct LsmBlob LsmBlob;
108 
109 struct LsmBlob {
110   lsm_env *pEnv;
111   void *pData;
112   int nData;
113   int nAlloc;
114 };
115 
116 /*
117 ** A SegmentPtr object may be used for one of two purposes:
118 **
119 **   * To iterate and/or seek within a single Segment (the combination of a
120 **     main run and an optional sorted run).
121 **
122 **   * To iterate through the separators array of a segment.
123 */
124 struct SegmentPtr {
125   Level *pLevel;                /* Level object segment is part of */
126   Segment *pSeg;                /* Segment to access */
127 
128   /* Current page. See segmentPtrLoadPage(). */
129   Page *pPg;                    /* Current page */
130   u16 flags;                    /* Copy of page flags field */
131   int nCell;                    /* Number of cells on pPg */
132   LsmPgno iPtr;                 /* Base cascade pointer */
133 
134   /* Current cell. See segmentPtrLoadCell() */
135   int iCell;                    /* Current record within page pPg */
136   int eType;                    /* Type of current record */
137   LsmPgno iPgPtr;               /* Cascade pointer offset */
138   void *pKey; int nKey;         /* Key associated with current record */
139   void *pVal; int nVal;         /* Current record value (eType==WRITE only) */
140 
141   /* Blobs used to allocate buffers for pKey and pVal as required */
142   LsmBlob blob1;
143   LsmBlob blob2;
144 };
145 
146 /*
147 ** Used to iterate through the keys stored in a b-tree hierarchy from start
148 ** to finish. Only First() and Next() operations are required.
149 **
150 **   btreeCursorNew()
151 **   btreeCursorFirst()
152 **   btreeCursorNext()
153 **   btreeCursorFree()
154 **   btreeCursorPosition()
155 **   btreeCursorRestore()
156 */
157 typedef struct BtreePg BtreePg;
158 typedef struct BtreeCursor BtreeCursor;
159 struct BtreePg {
160   Page *pPage;
161   int iCell;
162 };
163 struct BtreeCursor {
164   Segment *pSeg;                  /* Iterate through this segments btree */
165   FileSystem *pFS;                /* File system to read pages from */
166   int nDepth;                     /* Allocated size of aPg[] */
167   int iPg;                        /* Current entry in aPg[]. -1 -> EOF. */
168   BtreePg *aPg;                   /* Pages from root to current location */
169 
170   /* Cache of current entry. pKey==0 for EOF. */
171   void *pKey;
172   int nKey;
173   int eType;
174   LsmPgno iPtr;
175 
176   /* Storage for key, if not local */
177   LsmBlob blob;
178 };
179 
180 
181 /*
182 ** A cursor used for merged searches or iterations through up to one
183 ** Tree structure and any number of sorted files.
184 **
185 **   lsmMCursorNew()
186 **   lsmMCursorSeek()
187 **   lsmMCursorNext()
188 **   lsmMCursorPrev()
189 **   lsmMCursorFirst()
190 **   lsmMCursorLast()
191 **   lsmMCursorKey()
192 **   lsmMCursorValue()
193 **   lsmMCursorValid()
194 **
195 ** iFree:
196 **   This variable is only used by cursors providing input data for a
197 **   new top-level segment. Such cursors only ever iterate forwards, not
198 **   backwards.
199 */
200 struct MultiCursor {
201   lsm_db *pDb;                    /* Connection that owns this cursor */
202   MultiCursor *pNext;             /* Next cursor owned by connection pDb */
203   int flags;                      /* Mask of CURSOR_XXX flags */
204 
205   int eType;                      /* Cache of current key type */
206   LsmBlob key;                    /* Cache of current key (or NULL) */
207   LsmBlob val;                    /* Cache of current value */
208 
209   /* All the component cursors: */
210   TreeCursor *apTreeCsr[2];       /* Up to two tree cursors */
211   int iFree;                      /* Next element of free-list (-ve for eof) */
212   SegmentPtr *aPtr;               /* Array of segment pointers */
213   int nPtr;                       /* Size of array aPtr[] */
214   BtreeCursor *pBtCsr;            /* b-tree cursor (db writes only) */
215 
216   /* Comparison results */
217   int nTree;                      /* Size of aTree[] array */
218   int *aTree;                     /* Array of comparison results */
219 
220   /* Used by cursors flushing the in-memory tree only */
221   void *pSystemVal;               /* Pointer to buffer to free */
222 
223   /* Used by worker cursors only */
224   LsmPgno *pPrevMergePtr;
225 };
226 
227 /*
228 ** The following constants are used to assign integers to each component
229 ** cursor of a multi-cursor.
230 */
231 #define CURSOR_DATA_TREE0     0   /* Current tree cursor (apTreeCsr[0]) */
232 #define CURSOR_DATA_TREE1     1   /* The "old" tree, if any (apTreeCsr[1]) */
233 #define CURSOR_DATA_SYSTEM    2   /* Free-list entries (new-toplevel only) */
234 #define CURSOR_DATA_SEGMENT   3   /* First segment pointer (aPtr[0]) */
235 
236 /*
237 ** CURSOR_IGNORE_DELETE
238 **   If set, this cursor will not visit SORTED_DELETE keys.
239 **
240 ** CURSOR_FLUSH_FREELIST
241 **   This cursor is being used to create a new toplevel. It should also
242 **   iterate through the contents of the in-memory free block list.
243 **
244 ** CURSOR_IGNORE_SYSTEM
245 **   If set, this cursor ignores system keys.
246 **
247 ** CURSOR_NEXT_OK
248 **   Set if it is Ok to call lsm_csr_next().
249 **
250 ** CURSOR_PREV_OK
251 **   Set if it is Ok to call lsm_csr_prev().
252 **
253 ** CURSOR_READ_SEPARATORS
254 **   Set if this cursor should visit the separator keys in segment
255 **   aPtr[nPtr-1].
256 **
257 ** CURSOR_SEEK_EQ
258 **   Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation.
259 **   The key and value are stored in MultiCursor.key and MultiCursor.val
260 **   respectively.
261 */
262 #define CURSOR_IGNORE_DELETE    0x00000001
263 #define CURSOR_FLUSH_FREELIST   0x00000002
264 #define CURSOR_IGNORE_SYSTEM    0x00000010
265 #define CURSOR_NEXT_OK          0x00000020
266 #define CURSOR_PREV_OK          0x00000040
267 #define CURSOR_READ_SEPARATORS  0x00000080
268 #define CURSOR_SEEK_EQ          0x00000100
269 
270 typedef struct MergeWorker MergeWorker;
271 typedef struct Hierarchy Hierarchy;
272 
273 struct Hierarchy {
274   Page **apHier;
275   int nHier;
276 };
277 
278 /*
279 ** aSave:
280 **   When mergeWorkerNextPage() is called to advance to the next page in
281 **   the output segment, if the bStore flag for an element of aSave[] is
282 **   true, it is cleared and the corresponding iPgno value is set to the
283 **   page number of the page just completed.
284 **
285 **   aSave[0] is used to record the pointer value to be pushed into the
286 **   b-tree hierarchy. aSave[1] is used to save the page number of the
287 **   page containing the indirect key most recently written to the b-tree.
288 **   see mergeWorkerPushHierarchy() for details.
289 */
290 struct MergeWorker {
291   lsm_db *pDb;                    /* Database handle */
292   Level *pLevel;                  /* Worker snapshot Level being merged */
293   MultiCursor *pCsr;              /* Cursor to read new segment contents from */
294   int bFlush;                     /* True if this is an in-memory tree flush */
295   Hierarchy hier;                 /* B-tree hierarchy under construction */
296   Page *pPage;                    /* Current output page */
297   int nWork;                      /* Number of calls to mergeWorkerNextPage() */
298   LsmPgno *aGobble;               /* Gobble point for each input segment */
299 
300   LsmPgno iIndirect;
301   struct SavedPgno {
302     LsmPgno iPgno;
303     int bStore;
304   } aSave[2];
305 };
306 
307 #ifdef LSM_DEBUG_EXPENSIVE
308 static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
309 static int assertBtreeOk(lsm_db *, Segment *);
310 static void assertRunInOrder(lsm_db *pDb, Segment *pSeg);
311 #else
312 #define assertRunInOrder(x,y)
313 #define assertBtreeOk(x,y)
314 #endif
315 
316 
317 struct FilePage { u8 *aData; int nData; };
fsPageData(Page * pPg,int * pnData)318 static u8 *fsPageData(Page *pPg, int *pnData){
319   *pnData = ((struct FilePage *)(pPg))->nData;
320   return ((struct FilePage *)(pPg))->aData;
321 }
322 /*UNUSED static u8 *fsPageDataPtr(Page *pPg){
323   return ((struct FilePage *)(pPg))->aData;
324 }*/
325 
326 /*
327 ** Write nVal as a 16-bit unsigned big-endian integer into buffer aOut.
328 */
lsmPutU16(u8 * aOut,u16 nVal)329 void lsmPutU16(u8 *aOut, u16 nVal){
330   aOut[0] = (u8)((nVal>>8) & 0xFF);
331   aOut[1] = (u8)(nVal & 0xFF);
332 }
333 
lsmPutU32(u8 * aOut,u32 nVal)334 void lsmPutU32(u8 *aOut, u32 nVal){
335   aOut[0] = (u8)((nVal>>24) & 0xFF);
336   aOut[1] = (u8)((nVal>>16) & 0xFF);
337   aOut[2] = (u8)((nVal>> 8) & 0xFF);
338   aOut[3] = (u8)((nVal    ) & 0xFF);
339 }
340 
lsmGetU16(u8 * aOut)341 int lsmGetU16(u8 *aOut){
342   return (aOut[0] << 8) + aOut[1];
343 }
344 
lsmGetU32(u8 * aOut)345 u32 lsmGetU32(u8 *aOut){
346   return ((u32)aOut[0] << 24)
347        + ((u32)aOut[1] << 16)
348        + ((u32)aOut[2] << 8)
349        + ((u32)aOut[3]);
350 }
351 
lsmGetU64(u8 * aOut)352 u64 lsmGetU64(u8 *aOut){
353   return ((u64)aOut[0] << 56)
354        + ((u64)aOut[1] << 48)
355        + ((u64)aOut[2] << 40)
356        + ((u64)aOut[3] << 32)
357        + ((u64)aOut[4] << 24)
358        + ((u32)aOut[5] << 16)
359        + ((u32)aOut[6] << 8)
360        + ((u32)aOut[7]);
361 }
362 
lsmPutU64(u8 * aOut,u64 nVal)363 void lsmPutU64(u8 *aOut, u64 nVal){
364   aOut[0] = (u8)((nVal>>56) & 0xFF);
365   aOut[1] = (u8)((nVal>>48) & 0xFF);
366   aOut[2] = (u8)((nVal>>40) & 0xFF);
367   aOut[3] = (u8)((nVal>>32) & 0xFF);
368   aOut[4] = (u8)((nVal>>24) & 0xFF);
369   aOut[5] = (u8)((nVal>>16) & 0xFF);
370   aOut[6] = (u8)((nVal>> 8) & 0xFF);
371   aOut[7] = (u8)((nVal    ) & 0xFF);
372 }
373 
sortedBlobGrow(lsm_env * pEnv,LsmBlob * pBlob,int nData)374 static int sortedBlobGrow(lsm_env *pEnv, LsmBlob *pBlob, int nData){
375   assert( pBlob->pEnv==pEnv || (pBlob->pEnv==0 && pBlob->pData==0) );
376   if( pBlob->nAlloc<nData ){
377     pBlob->pData = lsmReallocOrFree(pEnv, pBlob->pData, nData);
378     if( !pBlob->pData ) return LSM_NOMEM_BKPT;
379     pBlob->nAlloc = nData;
380     pBlob->pEnv = pEnv;
381   }
382   return LSM_OK;
383 }
384 
sortedBlobSet(lsm_env * pEnv,LsmBlob * pBlob,void * pData,int nData)385 static int sortedBlobSet(lsm_env *pEnv, LsmBlob *pBlob, void *pData, int nData){
386   if( sortedBlobGrow(pEnv, pBlob, nData) ) return LSM_NOMEM;
387   memcpy(pBlob->pData, pData, nData);
388   pBlob->nData = nData;
389   return LSM_OK;
390 }
391 
392 #if 0
393 static int sortedBlobCopy(LsmBlob *pDest, LsmBlob *pSrc){
394   return sortedBlobSet(pDest, pSrc->pData, pSrc->nData);
395 }
396 #endif
397 
sortedBlobFree(LsmBlob * pBlob)398 static void sortedBlobFree(LsmBlob *pBlob){
399   assert( pBlob->pEnv || pBlob->pData==0 );
400   if( pBlob->pData ) lsmFree(pBlob->pEnv, pBlob->pData);
401   memset(pBlob, 0, sizeof(LsmBlob));
402 }
403 
sortedReadData(Segment * pSeg,Page * pPg,int iOff,int nByte,void ** ppData,LsmBlob * pBlob)404 static int sortedReadData(
405   Segment *pSeg,
406   Page *pPg,
407   int iOff,
408   int nByte,
409   void **ppData,
410   LsmBlob *pBlob
411 ){
412   int rc = LSM_OK;
413   int iEnd;
414   int nData;
415   int nCell;
416   u8 *aData;
417 
418   aData = fsPageData(pPg, &nData);
419   nCell = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
420   iEnd = SEGMENT_EOF(nData, nCell);
421   assert( iEnd>0 && iEnd<nData );
422 
423   if( iOff+nByte<=iEnd ){
424     *ppData = (void *)&aData[iOff];
425   }else{
426     int nRem = nByte;
427     int i = iOff;
428     u8 *aDest;
429 
430     /* Make sure the blob is big enough to store the value being loaded. */
431     rc = sortedBlobGrow(lsmPageEnv(pPg), pBlob, nByte);
432     if( rc!=LSM_OK ) return rc;
433     pBlob->nData = nByte;
434     aDest = (u8 *)pBlob->pData;
435     *ppData = pBlob->pData;
436 
437     /* Increment the pointer pages ref-count. */
438     lsmFsPageRef(pPg);
439 
440     while( rc==LSM_OK ){
441       Page *pNext;
442       int flags;
443 
444       /* Copy data from pPg into the output buffer. */
445       int nCopy = LSM_MIN(nRem, iEnd-i);
446       if( nCopy>0 ){
447         memcpy(&aDest[nByte-nRem], &aData[i], nCopy);
448         nRem -= nCopy;
449         i += nCopy;
450         assert( nRem==0 || i==iEnd );
451       }
452       assert( nRem>=0 );
453       if( nRem==0 ) break;
454       i -= iEnd;
455 
456       /* Grab the next page in the segment */
457 
458       do {
459         rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
460         if( rc==LSM_OK && pNext==0 ){
461           rc = LSM_CORRUPT_BKPT;
462         }
463         if( rc ) break;
464         lsmFsPageRelease(pPg);
465         pPg = pNext;
466         aData = fsPageData(pPg, &nData);
467         flags = lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]);
468       }while( flags&SEGMENT_BTREE_FLAG );
469 
470       iEnd = SEGMENT_EOF(nData, lsmGetU16(&aData[nData-2]));
471       assert( iEnd>0 && iEnd<nData );
472     }
473 
474     lsmFsPageRelease(pPg);
475   }
476 
477   return rc;
478 }
479 
pageGetNRec(u8 * aData,int nData)480 static int pageGetNRec(u8 *aData, int nData){
481   return (int)lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
482 }
483 
pageGetPtr(u8 * aData,int nData)484 static LsmPgno pageGetPtr(u8 *aData, int nData){
485   return (LsmPgno)lsmGetU64(&aData[SEGMENT_POINTER_OFFSET(nData)]);
486 }
487 
pageGetFlags(u8 * aData,int nData)488 static int pageGetFlags(u8 *aData, int nData){
489   return (int)lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]);
490 }
491 
pageGetCell(u8 * aData,int nData,int iCell)492 static u8 *pageGetCell(u8 *aData, int nData, int iCell){
493   return &aData[lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, iCell)])];
494 }
495 
496 /*
497 ** Return the number of cells on page pPg.
498 */
pageObjGetNRec(Page * pPg)499 static int pageObjGetNRec(Page *pPg){
500   int nData;
501   u8 *aData = lsmFsPageData(pPg, &nData);
502   return pageGetNRec(aData, nData);
503 }
504 
505 /*
506 ** Return the decoded (possibly relative) pointer value stored in cell
507 ** iCell from page aData/nData.
508 */
pageGetRecordPtr(u8 * aData,int nData,int iCell)509 static LsmPgno pageGetRecordPtr(u8 *aData, int nData, int iCell){
510   LsmPgno iRet;                   /* Return value */
511   u8 *aCell;                      /* Pointer to cell iCell */
512 
513   assert( iCell<pageGetNRec(aData, nData) && iCell>=0 );
514   aCell = pageGetCell(aData, nData, iCell);
515   lsmVarintGet64(&aCell[1], &iRet);
516   return iRet;
517 }
518 
pageGetKey(Segment * pSeg,Page * pPg,int iCell,int * piTopic,int * pnKey,LsmBlob * pBlob)519 static u8 *pageGetKey(
520   Segment *pSeg,                  /* Segment pPg belongs to */
521   Page *pPg,                      /* Page to read from */
522   int iCell,                      /* Index of cell on page to read */
523   int *piTopic,                   /* OUT: Topic associated with this key */
524   int *pnKey,                     /* OUT: Size of key in bytes */
525   LsmBlob *pBlob                  /* If required, use this for dynamic memory */
526 ){
527   u8 *pKey;
528   int nDummy;
529   int eType;
530   u8 *aData;
531   int nData;
532 
533   aData = fsPageData(pPg, &nData);
534 
535   assert( !(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );
536   assert( iCell<pageGetNRec(aData, nData) );
537 
538   pKey = pageGetCell(aData, nData, iCell);
539   eType = *pKey++;
540   pKey += lsmVarintGet32(pKey, &nDummy);
541   pKey += lsmVarintGet32(pKey, pnKey);
542   if( rtIsWrite(eType) ){
543     pKey += lsmVarintGet32(pKey, &nDummy);
544   }
545   *piTopic = rtTopic(eType);
546 
547   sortedReadData(pSeg, pPg, pKey-aData, *pnKey, (void **)&pKey, pBlob);
548   return pKey;
549 }
550 
pageGetKeyCopy(lsm_env * pEnv,Segment * pSeg,Page * pPg,int iCell,int * piTopic,LsmBlob * pBlob)551 static int pageGetKeyCopy(
552   lsm_env *pEnv,                  /* Environment handle */
553   Segment *pSeg,                  /* Segment pPg belongs to */
554   Page *pPg,                      /* Page to read from */
555   int iCell,                      /* Index of cell on page to read */
556   int *piTopic,                   /* OUT: Topic associated with this key */
557   LsmBlob *pBlob                  /* If required, use this for dynamic memory */
558 ){
559   int rc = LSM_OK;
560   int nKey;
561   u8 *aKey;
562 
563   aKey = pageGetKey(pSeg, pPg, iCell, piTopic, &nKey, pBlob);
564   assert( (void *)aKey!=pBlob->pData || nKey==pBlob->nData );
565   if( (void *)aKey!=pBlob->pData ){
566     rc = sortedBlobSet(pEnv, pBlob, aKey, nKey);
567   }
568 
569   return rc;
570 }
571 
pageGetBtreeRef(Page * pPg,int iKey)572 static LsmPgno pageGetBtreeRef(Page *pPg, int iKey){
573   LsmPgno iRef;
574   u8 *aData;
575   int nData;
576   u8 *aCell;
577 
578   aData = fsPageData(pPg, &nData);
579   aCell = pageGetCell(aData, nData, iKey);
580   assert( aCell[0]==0 );
581   aCell++;
582   aCell += lsmVarintGet64(aCell, &iRef);
583   lsmVarintGet64(aCell, &iRef);
584   assert( iRef>0 );
585   return iRef;
586 }
587 
588 #define GETVARINT64(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet64((a), &(i)))
589 #define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i)))
590 
pageGetBtreeKey(Segment * pSeg,Page * pPg,int iKey,LsmPgno * piPtr,int * piTopic,void ** ppKey,int * pnKey,LsmBlob * pBlob)591 static int pageGetBtreeKey(
592   Segment *pSeg,                  /* Segment page pPg belongs to */
593   Page *pPg,
594   int iKey,
595   LsmPgno *piPtr,
596   int *piTopic,
597   void **ppKey,
598   int *pnKey,
599   LsmBlob *pBlob
600 ){
601   u8 *aData;
602   int nData;
603   u8 *aCell;
604   int eType;
605 
606   aData = fsPageData(pPg, &nData);
607   assert( SEGMENT_BTREE_FLAG & pageGetFlags(aData, nData) );
608   assert( iKey>=0 && iKey<pageGetNRec(aData, nData) );
609 
610   aCell = pageGetCell(aData, nData, iKey);
611   eType = *aCell++;
612   aCell += GETVARINT64(aCell, *piPtr);
613 
614   if( eType==0 ){
615     int rc;
616     LsmPgno iRef;               /* Page number of referenced page */
617     Page *pRef;
618     aCell += GETVARINT64(aCell, iRef);
619     rc = lsmFsDbPageGet(lsmPageFS(pPg), pSeg, iRef, &pRef);
620     if( rc!=LSM_OK ) return rc;
621     pageGetKeyCopy(lsmPageEnv(pPg), pSeg, pRef, 0, &eType, pBlob);
622     lsmFsPageRelease(pRef);
623     *ppKey = pBlob->pData;
624     *pnKey = pBlob->nData;
625   }else{
626     aCell += GETVARINT32(aCell, *pnKey);
627     *ppKey = aCell;
628   }
629   if( piTopic ) *piTopic = rtTopic(eType);
630 
631   return LSM_OK;
632 }
633 
btreeCursorLoadKey(BtreeCursor * pCsr)634 static int btreeCursorLoadKey(BtreeCursor *pCsr){
635   int rc = LSM_OK;
636   if( pCsr->iPg<0 ){
637     pCsr->pKey = 0;
638     pCsr->nKey = 0;
639     pCsr->eType = 0;
640   }else{
641     LsmPgno dummy;
642     int iPg = pCsr->iPg;
643     int iCell = pCsr->aPg[iPg].iCell;
644     while( iCell<0 && (--iPg)>=0 ){
645       iCell = pCsr->aPg[iPg].iCell-1;
646     }
647     if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;
648 
649     rc = pageGetBtreeKey(
650         pCsr->pSeg,
651         pCsr->aPg[iPg].pPage, iCell,
652         &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
653     );
654     pCsr->eType |= LSM_SEPARATOR;
655   }
656 
657   return rc;
658 }
659 
btreeCursorPtr(u8 * aData,int nData,int iCell)660 static int btreeCursorPtr(u8 *aData, int nData, int iCell){
661   int nCell;
662 
663   nCell = pageGetNRec(aData, nData);
664   if( iCell>=nCell ){
665     return (int)pageGetPtr(aData, nData);
666   }
667   return (int)pageGetRecordPtr(aData, nData, iCell);
668 }
669 
btreeCursorNext(BtreeCursor * pCsr)670 static int btreeCursorNext(BtreeCursor *pCsr){
671   int rc = LSM_OK;
672 
673   BtreePg *pPg = &pCsr->aPg[pCsr->iPg];
674   int nCell;
675   u8 *aData;
676   int nData;
677 
678   assert( pCsr->iPg>=0 );
679   assert( pCsr->iPg==pCsr->nDepth-1 );
680 
681   aData = fsPageData(pPg->pPage, &nData);
682   nCell = pageGetNRec(aData, nData);
683   assert( pPg->iCell<=nCell );
684   pPg->iCell++;
685   if( pPg->iCell==nCell ){
686     LsmPgno iLoad;
687 
688     /* Up to parent. */
689     lsmFsPageRelease(pPg->pPage);
690     pPg->pPage = 0;
691     pCsr->iPg--;
692     while( pCsr->iPg>=0 ){
693       pPg = &pCsr->aPg[pCsr->iPg];
694       aData = fsPageData(pPg->pPage, &nData);
695       if( pPg->iCell<pageGetNRec(aData, nData) ) break;
696       lsmFsPageRelease(pPg->pPage);
697       pCsr->iPg--;
698     }
699 
700     /* Read the key */
701     rc = btreeCursorLoadKey(pCsr);
702 
703     /* Unless the cursor is at EOF, descend to cell -1 (yes, negative one) of
704     ** the left-most most descendent. */
705     if( pCsr->iPg>=0 ){
706       pCsr->aPg[pCsr->iPg].iCell++;
707 
708       iLoad = btreeCursorPtr(aData, nData, pPg->iCell);
709       do {
710         Page *pLoad;
711         pCsr->iPg++;
712         rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLoad, &pLoad);
713         pCsr->aPg[pCsr->iPg].pPage = pLoad;
714         pCsr->aPg[pCsr->iPg].iCell = 0;
715         if( rc==LSM_OK ){
716           if( pCsr->iPg==(pCsr->nDepth-1) ) break;
717           aData = fsPageData(pLoad, &nData);
718           iLoad = btreeCursorPtr(aData, nData, 0);
719         }
720       }while( rc==LSM_OK && pCsr->iPg<(pCsr->nDepth-1) );
721       pCsr->aPg[pCsr->iPg].iCell = -1;
722     }
723 
724   }else{
725     rc = btreeCursorLoadKey(pCsr);
726   }
727 
728   if( rc==LSM_OK && pCsr->iPg>=0 ){
729     aData = fsPageData(pCsr->aPg[pCsr->iPg].pPage, &nData);
730     pCsr->iPtr = btreeCursorPtr(aData, nData, pCsr->aPg[pCsr->iPg].iCell+1);
731   }
732 
733   return rc;
734 }
735 
btreeCursorFree(BtreeCursor * pCsr)736 static void btreeCursorFree(BtreeCursor *pCsr){
737   if( pCsr ){
738     int i;
739     lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
740     for(i=0; i<=pCsr->iPg; i++){
741       lsmFsPageRelease(pCsr->aPg[i].pPage);
742     }
743     sortedBlobFree(&pCsr->blob);
744     lsmFree(pEnv, pCsr->aPg);
745     lsmFree(pEnv, pCsr);
746   }
747 }
748 
btreeCursorFirst(BtreeCursor * pCsr)749 static int btreeCursorFirst(BtreeCursor *pCsr){
750   int rc;
751 
752   Page *pPg = 0;
753   FileSystem *pFS = pCsr->pFS;
754   int iPg = (int)pCsr->pSeg->iRoot;
755 
756   do {
757     rc = lsmFsDbPageGet(pFS, pCsr->pSeg, iPg, &pPg);
758     assert( (rc==LSM_OK)==(pPg!=0) );
759     if( rc==LSM_OK ){
760       u8 *aData;
761       int nData;
762       int flags;
763 
764       aData = fsPageData(pPg, &nData);
765       flags = pageGetFlags(aData, nData);
766       if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;
767 
768       if( (pCsr->nDepth % 8)==0 ){
769         int nNew = pCsr->nDepth + 8;
770         pCsr->aPg = (BtreePg *)lsmReallocOrFreeRc(
771             lsmFsEnv(pFS), pCsr->aPg, sizeof(BtreePg) * nNew, &rc
772         );
773         if( rc==LSM_OK ){
774           memset(&pCsr->aPg[pCsr->nDepth], 0, sizeof(BtreePg) * 8);
775         }
776       }
777 
778       if( rc==LSM_OK ){
779         assert( pCsr->aPg[pCsr->nDepth].iCell==0 );
780         pCsr->aPg[pCsr->nDepth].pPage = pPg;
781         pCsr->nDepth++;
782         iPg = (int)pageGetRecordPtr(aData, nData, 0);
783       }
784     }
785   }while( rc==LSM_OK );
786   lsmFsPageRelease(pPg);
787   pCsr->iPg = pCsr->nDepth-1;
788 
789   if( rc==LSM_OK && pCsr->nDepth ){
790     pCsr->aPg[pCsr->iPg].iCell = -1;
791     rc = btreeCursorNext(pCsr);
792   }
793 
794   return rc;
795 }
796 
btreeCursorPosition(BtreeCursor * pCsr,MergeInput * p)797 static void btreeCursorPosition(BtreeCursor *pCsr, MergeInput *p){
798   if( pCsr->iPg>=0 ){
799     p->iPg = lsmFsPageNumber(pCsr->aPg[pCsr->iPg].pPage);
800     p->iCell = ((pCsr->aPg[pCsr->iPg].iCell + 1) << 8) + pCsr->nDepth;
801   }else{
802     p->iPg = 0;
803     p->iCell = 0;
804   }
805 }
806 
btreeCursorSplitkey(BtreeCursor * pCsr,MergeInput * p)807 static void btreeCursorSplitkey(BtreeCursor *pCsr, MergeInput *p){
808   int iCell = pCsr->aPg[pCsr->iPg].iCell;
809   if( iCell>=0 ){
810     p->iCell = iCell;
811     p->iPg = lsmFsPageNumber(pCsr->aPg[pCsr->iPg].pPage);
812   }else{
813     int i;
814     for(i=pCsr->iPg-1; i>=0; i--){
815       if( pCsr->aPg[i].iCell>0 ) break;
816     }
817     assert( i>=0 );
818     p->iCell = pCsr->aPg[i].iCell-1;
819     p->iPg = lsmFsPageNumber(pCsr->aPg[i].pPage);
820   }
821 }
822 
sortedKeyCompare(int (* xCmp)(void *,int,void *,int),int iLhsTopic,void * pLhsKey,int nLhsKey,int iRhsTopic,void * pRhsKey,int nRhsKey)823 static int sortedKeyCompare(
824   int (*xCmp)(void *, int, void *, int),
825   int iLhsTopic, void *pLhsKey, int nLhsKey,
826   int iRhsTopic, void *pRhsKey, int nRhsKey
827 ){
828   int res = iLhsTopic - iRhsTopic;
829   if( res==0 ){
830     res = xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey);
831   }
832   return res;
833 }
834 
btreeCursorRestore(BtreeCursor * pCsr,int (* xCmp)(void *,int,void *,int),MergeInput * p)835 static int btreeCursorRestore(
836   BtreeCursor *pCsr,
837   int (*xCmp)(void *, int, void *, int),
838   MergeInput *p
839 ){
840   int rc = LSM_OK;
841 
842   if( p->iPg ){
843     lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
844     int iCell;                    /* Current cell number on leaf page */
845     LsmPgno iLeaf;                /* Page number of current leaf page */
846     int nDepth;                   /* Depth of b-tree structure */
847     Segment *pSeg = pCsr->pSeg;
848 
849     /* Decode the MergeInput structure */
850     iLeaf = p->iPg;
851     nDepth = (p->iCell & 0x00FF);
852     iCell = (p->iCell >> 8) - 1;
853 
854     /* Allocate the BtreeCursor.aPg[] array */
855     assert( pCsr->aPg==0 );
856     pCsr->aPg = (BtreePg *)lsmMallocZeroRc(pEnv, sizeof(BtreePg) * nDepth, &rc);
857 
858     /* Populate the last entry of the aPg[] array */
859     if( rc==LSM_OK ){
860       Page **pp = &pCsr->aPg[nDepth-1].pPage;
861       pCsr->iPg = nDepth-1;
862       pCsr->nDepth = nDepth;
863       pCsr->aPg[pCsr->iPg].iCell = iCell;
864       rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLeaf, pp);
865     }
866 
867     /* Populate any other aPg[] array entries */
868     if( rc==LSM_OK && nDepth>1 ){
869       LsmBlob blob = {0,0,0};
870       void *pSeek;
871       int nSeek;
872       int iTopicSeek;
873       int iPg = 0;
874       int iLoad = (int)pSeg->iRoot;
875       Page *pPg = pCsr->aPg[nDepth-1].pPage;
876 
877       if( pageObjGetNRec(pPg)==0 ){
878         /* This can happen when pPg is the right-most leaf in the b-tree.
879         ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
880         ** greater than any real key.  */
881         assert( iCell==-1 );
882         iTopicSeek = 1000;
883         pSeek = 0;
884         nSeek = 0;
885       }else{
886         LsmPgno dummy;
887         rc = pageGetBtreeKey(pSeg, pPg,
888             0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob
889         );
890       }
891 
892       do {
893         Page *pPg2;
894         rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLoad, &pPg2);
895         assert( rc==LSM_OK || pPg2==0 );
896         if( rc==LSM_OK ){
897           u8 *aData;                  /* Buffer containing page data */
898           int nData;                  /* Size of aData[] in bytes */
899           int iMin;
900           int iMax;
901           int iCell2;
902 
903           aData = fsPageData(pPg2, &nData);
904           assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );
905 
906           iLoad = (int)pageGetPtr(aData, nData);
907           iCell2 = pageGetNRec(aData, nData);
908           iMax = iCell2-1;
909           iMin = 0;
910 
911           while( iMax>=iMin ){
912             int iTry = (iMin+iMax)/2;
913             void *pKey; int nKey;         /* Key for cell iTry */
914             int iTopic;                   /* Topic for key pKeyT/nKeyT */
915             LsmPgno iPtr;                 /* Pointer for cell iTry */
916             int res;                      /* (pSeek - pKeyT) */
917 
918             rc = pageGetBtreeKey(
919                 pSeg, pPg2, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob
920             );
921             if( rc!=LSM_OK ) break;
922 
923             res = sortedKeyCompare(
924                 xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
925             );
926             assert( res!=0 );
927 
928             if( res<0 ){
929               iLoad = (int)iPtr;
930               iCell2 = iTry;
931               iMax = iTry-1;
932             }else{
933               iMin = iTry+1;
934             }
935           }
936 
937           pCsr->aPg[iPg].pPage = pPg2;
938           pCsr->aPg[iPg].iCell = iCell2;
939           iPg++;
940           assert( iPg!=nDepth-1
941                || lsmFsRedirectPage(pCsr->pFS, pSeg->pRedirect, iLoad)==iLeaf
942           );
943         }
944       }while( rc==LSM_OK && iPg<(nDepth-1) );
945       sortedBlobFree(&blob);
946     }
947 
948     /* Load the current key and pointer */
949     if( rc==LSM_OK ){
950       BtreePg *pBtreePg;
951       u8 *aData;
952       int nData;
953 
954       pBtreePg = &pCsr->aPg[pCsr->iPg];
955       aData = fsPageData(pBtreePg->pPage, &nData);
956       pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1);
957       if( pBtreePg->iCell<0 ){
958         LsmPgno dummy;
959         int i;
960         for(i=pCsr->iPg-1; i>=0; i--){
961           if( pCsr->aPg[i].iCell>0 ) break;
962         }
963         assert( i>=0 );
964         rc = pageGetBtreeKey(pSeg,
965             pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
966             &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
967         );
968         pCsr->eType |= LSM_SEPARATOR;
969 
970       }else{
971         rc = btreeCursorLoadKey(pCsr);
972       }
973     }
974   }
975   return rc;
976 }
977 
btreeCursorNew(lsm_db * pDb,Segment * pSeg,BtreeCursor ** ppCsr)978 static int btreeCursorNew(
979   lsm_db *pDb,
980   Segment *pSeg,
981   BtreeCursor **ppCsr
982 ){
983   int rc = LSM_OK;
984   BtreeCursor *pCsr;
985 
986   assert( pSeg->iRoot );
987   pCsr = lsmMallocZeroRc(pDb->pEnv, sizeof(BtreeCursor), &rc);
988   if( pCsr ){
989     pCsr->pFS = pDb->pFS;
990     pCsr->pSeg = pSeg;
991     pCsr->iPg = -1;
992   }
993 
994   *ppCsr = pCsr;
995   return rc;
996 }
997 
segmentPtrSetPage(SegmentPtr * pPtr,Page * pNext)998 static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){
999   lsmFsPageRelease(pPtr->pPg);
1000   if( pNext ){
1001     int nData;
1002     u8 *aData = fsPageData(pNext, &nData);
1003     pPtr->nCell = pageGetNRec(aData, nData);
1004     pPtr->flags = (u16)pageGetFlags(aData, nData);
1005     pPtr->iPtr = pageGetPtr(aData, nData);
1006   }
1007   pPtr->pPg = pNext;
1008 }
1009 
1010 /*
1011 ** Load a new page into the SegmentPtr object pPtr.
1012 */
segmentPtrLoadPage(FileSystem * pFS,SegmentPtr * pPtr,int iNew)1013 static int segmentPtrLoadPage(
1014   FileSystem *pFS,
1015   SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
1016   int iNew                       /* Page number of new page */
1017 ){
1018   Page *pPg = 0;                 /* The new page */
1019   int rc;                        /* Return Code */
1020 
1021   rc = lsmFsDbPageGet(pFS, pPtr->pSeg, iNew, &pPg);
1022   assert( rc==LSM_OK || pPg==0 );
1023   segmentPtrSetPage(pPtr, pPg);
1024 
1025   return rc;
1026 }
1027 
segmentPtrReadData(SegmentPtr * pPtr,int iOff,int nByte,void ** ppData,LsmBlob * pBlob)1028 static int segmentPtrReadData(
1029   SegmentPtr *pPtr,
1030   int iOff,
1031   int nByte,
1032   void **ppData,
1033   LsmBlob *pBlob
1034 ){
1035   return sortedReadData(pPtr->pSeg, pPtr->pPg, iOff, nByte, ppData, pBlob);
1036 }
1037 
segmentPtrNextPage(SegmentPtr * pPtr,int eDir)1038 static int segmentPtrNextPage(
1039   SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
1040   int eDir                       /* +1 for next(), -1 for prev() */
1041 ){
1042   Page *pNext;                   /* New page to load */
1043   int rc;                        /* Return code */
1044 
1045   assert( eDir==1 || eDir==-1 );
1046   assert( pPtr->pPg );
1047   assert( pPtr->pSeg || eDir>0 );
1048 
1049   rc = lsmFsDbPageNext(pPtr->pSeg, pPtr->pPg, eDir, &pNext);
1050   assert( rc==LSM_OK || pNext==0 );
1051   segmentPtrSetPage(pPtr, pNext);
1052   return rc;
1053 }
1054 
segmentPtrLoadCell(SegmentPtr * pPtr,int iNew)1055 static int segmentPtrLoadCell(
1056   SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
1057   int iNew                       /* Cell number of new cell */
1058 ){
1059   int rc = LSM_OK;
1060   if( pPtr->pPg ){
1061     u8 *aData;                    /* Pointer to page data buffer */
1062     int iOff;                     /* Offset in aData[] to read from */
1063     int nPgsz;                    /* Size of page (aData[]) in bytes */
1064 
1065     assert( iNew<pPtr->nCell );
1066     pPtr->iCell = iNew;
1067     aData = fsPageData(pPtr->pPg, &nPgsz);
1068     iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nPgsz, pPtr->iCell)]);
1069     pPtr->eType = aData[iOff];
1070     iOff++;
1071     iOff += GETVARINT64(&aData[iOff], pPtr->iPgPtr);
1072     iOff += GETVARINT32(&aData[iOff], pPtr->nKey);
1073     if( rtIsWrite(pPtr->eType) ){
1074       iOff += GETVARINT32(&aData[iOff], pPtr->nVal);
1075     }
1076     assert( pPtr->nKey>=0 );
1077 
1078     rc = segmentPtrReadData(
1079         pPtr, iOff, pPtr->nKey, &pPtr->pKey, &pPtr->blob1
1080     );
1081     if( rc==LSM_OK && rtIsWrite(pPtr->eType) ){
1082       rc = segmentPtrReadData(
1083           pPtr, iOff+pPtr->nKey, pPtr->nVal, &pPtr->pVal, &pPtr->blob2
1084       );
1085     }else{
1086       pPtr->nVal = 0;
1087       pPtr->pVal = 0;
1088     }
1089   }
1090 
1091   return rc;
1092 }
1093 
1094 
sortedSplitkeySegment(Level * pLevel)1095 static Segment *sortedSplitkeySegment(Level *pLevel){
1096   Merge *pMerge = pLevel->pMerge;
1097   MergeInput *p = &pMerge->splitkey;
1098   Segment *pSeg;
1099   int i;
1100 
1101   for(i=0; i<pMerge->nInput; i++){
1102     if( p->iPg==pMerge->aInput[i].iPg ) break;
1103   }
1104   if( pMerge->nInput==(pLevel->nRight+1) && i>=(pMerge->nInput-1) ){
1105     pSeg = &pLevel->pNext->lhs;
1106   }else{
1107     pSeg = &pLevel->aRhs[i];
1108   }
1109 
1110   return pSeg;
1111 }
1112 
sortedSplitkey(lsm_db * pDb,Level * pLevel,int * pRc)1113 static void sortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
1114   Segment *pSeg;
1115   Page *pPg = 0;
1116   lsm_env *pEnv = pDb->pEnv;      /* Environment handle */
1117   int rc = *pRc;
1118   Merge *pMerge = pLevel->pMerge;
1119 
1120   pSeg = sortedSplitkeySegment(pLevel);
1121   if( rc==LSM_OK ){
1122     rc = lsmFsDbPageGet(pDb->pFS, pSeg, pMerge->splitkey.iPg, &pPg);
1123   }
1124   if( rc==LSM_OK ){
1125     int iTopic;
1126     LsmBlob blob = {0, 0, 0, 0};
1127     u8 *aData;
1128     int nData;
1129 
1130     aData = lsmFsPageData(pPg, &nData);
1131     if( pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG ){
1132       void *pKey;
1133       int nKey;
1134       LsmPgno dummy;
1135       rc = pageGetBtreeKey(pSeg,
1136           pPg, pMerge->splitkey.iCell, &dummy, &iTopic, &pKey, &nKey, &blob
1137       );
1138       if( rc==LSM_OK && blob.pData!=pKey ){
1139         rc = sortedBlobSet(pEnv, &blob, pKey, nKey);
1140       }
1141     }else{
1142       rc = pageGetKeyCopy(
1143           pEnv, pSeg, pPg, pMerge->splitkey.iCell, &iTopic, &blob
1144       );
1145     }
1146 
1147     pLevel->iSplitTopic = iTopic;
1148     pLevel->pSplitKey = blob.pData;
1149     pLevel->nSplitKey = blob.nData;
1150     lsmFsPageRelease(pPg);
1151   }
1152 
1153   *pRc = rc;
1154 }
1155 
1156 /*
1157 ** Reset a segment cursor. Also free its buffers if they are nThreshold
1158 ** bytes or larger in size.
1159 */
segmentPtrReset(SegmentPtr * pPtr,int nThreshold)1160 static void segmentPtrReset(SegmentPtr *pPtr, int nThreshold){
1161   lsmFsPageRelease(pPtr->pPg);
1162   pPtr->pPg = 0;
1163   pPtr->nCell = 0;
1164   pPtr->pKey = 0;
1165   pPtr->nKey = 0;
1166   pPtr->pVal = 0;
1167   pPtr->nVal = 0;
1168   pPtr->eType = 0;
1169   pPtr->iCell = 0;
1170   if( pPtr->blob1.nAlloc>=nThreshold ) sortedBlobFree(&pPtr->blob1);
1171   if( pPtr->blob2.nAlloc>=nThreshold ) sortedBlobFree(&pPtr->blob2);
1172 }
1173 
segmentPtrIgnoreSeparators(MultiCursor * pCsr,SegmentPtr * pPtr)1174 static int segmentPtrIgnoreSeparators(MultiCursor *pCsr, SegmentPtr *pPtr){
1175   return (pCsr->flags & CURSOR_READ_SEPARATORS)==0
1176       || (pPtr!=&pCsr->aPtr[pCsr->nPtr-1]);
1177 }
1178 
segmentPtrAdvance(MultiCursor * pCsr,SegmentPtr * pPtr,int bReverse)1179 static int segmentPtrAdvance(
1180   MultiCursor *pCsr,
1181   SegmentPtr *pPtr,
1182   int bReverse
1183 ){
1184   int eDir = (bReverse ? -1 : 1);
1185   Level *pLvl = pPtr->pLevel;
1186   do {
1187     int rc;
1188     int iCell;                    /* Number of new cell in page */
1189     int svFlags = 0;              /* SegmentPtr.eType before advance */
1190 
1191     iCell = pPtr->iCell + eDir;
1192     assert( pPtr->pPg );
1193     assert( iCell<=pPtr->nCell && iCell>=-1 );
1194 
1195     if( bReverse && pPtr->pSeg!=&pPtr->pLevel->lhs ){
1196       svFlags = pPtr->eType;
1197       assert( svFlags );
1198     }
1199 
1200     if( iCell>=pPtr->nCell || iCell<0 ){
1201       do {
1202         rc = segmentPtrNextPage(pPtr, eDir);
1203       }while( rc==LSM_OK
1204            && pPtr->pPg
1205            && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) )
1206       );
1207       if( rc!=LSM_OK ) return rc;
1208       iCell = bReverse ? (pPtr->nCell-1) : 0;
1209     }
1210     rc = segmentPtrLoadCell(pPtr, iCell);
1211     if( rc!=LSM_OK ) return rc;
1212 
1213     if( svFlags && pPtr->pPg ){
1214       int res = sortedKeyCompare(pCsr->pDb->xCmp,
1215           rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
1216           pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
1217       );
1218       if( res<0 ) segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1219     }
1220 
1221     if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){
1222       Segment *pSeg = pPtr->pSeg;
1223       rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, pSeg->iFirst, &pPtr->pPg);
1224       if( rc!=LSM_OK ) return rc;
1225       pPtr->eType = LSM_START_DELETE | LSM_POINT_DELETE;
1226       pPtr->eType |= (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
1227       pPtr->pKey = pLvl->pSplitKey;
1228       pPtr->nKey = pLvl->nSplitKey;
1229     }
1230 
1231   }while( pCsr
1232        && pPtr->pPg
1233        && segmentPtrIgnoreSeparators(pCsr, pPtr)
1234        && rtIsSeparator(pPtr->eType)
1235   );
1236 
1237   return LSM_OK;
1238 }
1239 
segmentPtrEndPage(FileSystem * pFS,SegmentPtr * pPtr,int bLast,int * pRc)1240 static void segmentPtrEndPage(
1241   FileSystem *pFS,
1242   SegmentPtr *pPtr,
1243   int bLast,
1244   int *pRc
1245 ){
1246   if( *pRc==LSM_OK ){
1247     Segment *pSeg = pPtr->pSeg;
1248     Page *pNew = 0;
1249     if( bLast ){
1250       *pRc = lsmFsDbPageLast(pFS, pSeg, &pNew);
1251     }else{
1252       *pRc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pNew);
1253     }
1254     segmentPtrSetPage(pPtr, pNew);
1255   }
1256 }
1257 
1258 
1259 /*
1260 ** Try to move the segment pointer passed as the second argument so that it
1261 ** points at either the first (bLast==0) or last (bLast==1) cell in the valid
1262 ** region of the segment defined by pPtr->iFirst and pPtr->iLast.
1263 **
1264 ** Return LSM_OK if successful or an lsm error code if something goes
1265 ** wrong (IO error, OOM etc.).
1266 */
segmentPtrEnd(MultiCursor * pCsr,SegmentPtr * pPtr,int bLast)1267 static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){
1268   Level *pLvl = pPtr->pLevel;
1269   int rc = LSM_OK;
1270   FileSystem *pFS = pCsr->pDb->pFS;
1271   int bIgnore;
1272 
1273   segmentPtrEndPage(pFS, pPtr, bLast, &rc);
1274   while( rc==LSM_OK && pPtr->pPg
1275       && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
1276   ){
1277     rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
1278   }
1279 
1280   if( rc==LSM_OK && pPtr->pPg ){
1281     rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
1282     if( rc==LSM_OK && bLast && pPtr->pSeg!=&pLvl->lhs ){
1283       int res = sortedKeyCompare(pCsr->pDb->xCmp,
1284           rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
1285           pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
1286       );
1287       if( res<0 ) segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1288     }
1289   }
1290 
1291   bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
1292   if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
1293     rc = segmentPtrAdvance(pCsr, pPtr, bLast);
1294   }
1295 
1296 #if 0
1297   if( bLast && rc==LSM_OK && pPtr->pPg
1298    && pPtr->pSeg==&pLvl->lhs
1299    && pLvl->nRight && (pPtr->eType & LSM_START_DELETE)
1300   ){
1301     pPtr->iCell++;
1302     pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic);
1303     pPtr->pKey = pLvl->pSplitKey;
1304     pPtr->nKey = pLvl->nSplitKey;
1305     pPtr->pVal = 0;
1306     pPtr->nVal = 0;
1307   }
1308 #endif
1309 
1310   return rc;
1311 }
1312 
segmentPtrKey(SegmentPtr * pPtr,void ** ppKey,int * pnKey)1313 static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
1314   assert( pPtr->pPg );
1315   *ppKey = pPtr->pKey;
1316   *pnKey = pPtr->nKey;
1317 }
1318 
1319 #if 0 /* NOT USED */
1320 static char *keyToString(lsm_env *pEnv, void *pKey, int nKey){
1321   int i;
1322   u8 *aKey = (u8 *)pKey;
1323   char *zRet = (char *)lsmMalloc(pEnv, nKey+1);
1324 
1325   for(i=0; i<nKey; i++){
1326     zRet[i] = (char)(isalnum(aKey[i]) ? aKey[i] : '.');
1327   }
1328   zRet[nKey] = '\0';
1329   return zRet;
1330 }
1331 #endif
1332 
1333 #if 0 /* NOT USED */
1334 /*
1335 ** Check that the page that pPtr currently has loaded is the correct page
1336 ** to search for key (pKey/nKey). If it is, return 1. Otherwise, an assert
1337 ** fails and this function does not return.
1338 */
1339 static int assertKeyLocation(
1340   MultiCursor *pCsr,
1341   SegmentPtr *pPtr,
1342   void *pKey, int nKey
1343 ){
1344   lsm_env *pEnv = lsmFsEnv(pCsr->pDb->pFS);
1345   LsmBlob blob = {0, 0, 0};
1346   int eDir;
1347   int iTopic = 0;                 /* TODO: Fix me */
1348 
1349   for(eDir=-1; eDir<=1; eDir+=2){
1350     Page *pTest = pPtr->pPg;
1351 
1352     lsmFsPageRef(pTest);
1353     while( pTest ){
1354       Segment *pSeg = pPtr->pSeg;
1355       Page *pNext;
1356 
1357       int rc = lsmFsDbPageNext(pSeg, pTest, eDir, &pNext);
1358       lsmFsPageRelease(pTest);
1359       if( rc ) return 1;
1360       pTest = pNext;
1361 
1362       if( pTest ){
1363         int nData;
1364         u8 *aData = fsPageData(pTest, &nData);
1365         int nCell = pageGetNRec(aData, nData);
1366         int flags = pageGetFlags(aData, nData);
1367         if( nCell && 0==(flags&SEGMENT_BTREE_FLAG) ){
1368           int nPgKey;
1369           int iPgTopic;
1370           u8 *pPgKey;
1371           int res;
1372           int iCell;
1373 
1374           iCell = ((eDir < 0) ? (nCell-1) : 0);
1375           pPgKey = pageGetKey(pSeg, pTest, iCell, &iPgTopic, &nPgKey, &blob);
1376           res = iTopic - iPgTopic;
1377           if( res==0 ) res = pCsr->pDb->xCmp(pKey, nKey, pPgKey, nPgKey);
1378           if( (eDir==1 && res>0) || (eDir==-1 && res<0) ){
1379             /* Taking this branch means something has gone wrong. */
1380             char *zMsg = lsmMallocPrintf(pEnv, "Key \"%s\" is not on page %d",
1381                 keyToString(pEnv, pKey, nKey), lsmFsPageNumber(pPtr->pPg)
1382             );
1383             fprintf(stderr, "%s\n", zMsg);
1384             assert( !"assertKeyLocation() failed" );
1385           }
1386           lsmFsPageRelease(pTest);
1387           pTest = 0;
1388         }
1389       }
1390     }
1391   }
1392 
1393   sortedBlobFree(&blob);
1394   return 1;
1395 }
1396 #endif
1397 
1398 #ifndef NDEBUG
assertSeekResult(MultiCursor * pCsr,SegmentPtr * pPtr,int iTopic,void * pKey,int nKey,int eSeek)1399 static int assertSeekResult(
1400   MultiCursor *pCsr,
1401   SegmentPtr *pPtr,
1402   int iTopic,
1403   void *pKey,
1404   int nKey,
1405   int eSeek
1406 ){
1407   if( pPtr->pPg ){
1408     int res;
1409     res = sortedKeyCompare(pCsr->pDb->xCmp, iTopic, pKey, nKey,
1410         rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
1411     );
1412 
1413     if( eSeek==LSM_SEEK_EQ ) return (res==0);
1414     if( eSeek==LSM_SEEK_LE ) return (res>=0);
1415     if( eSeek==LSM_SEEK_GE ) return (res<=0);
1416   }
1417 
1418   return 1;
1419 }
1420 #endif
1421 
segmentPtrSearchOversized(MultiCursor * pCsr,SegmentPtr * pPtr,int iTopic,void * pKey,int nKey)1422 static int segmentPtrSearchOversized(
1423   MultiCursor *pCsr,              /* Cursor context */
1424   SegmentPtr *pPtr,               /* Pointer to seek */
1425   int iTopic,                     /* Topic of key to search for */
1426   void *pKey, int nKey            /* Key to seek to */
1427 ){
1428   int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
1429   int rc = LSM_OK;
1430 
1431   /* If the OVERSIZED flag is set, then there is no pointer in the
1432   ** upper level to the next page in the segment that contains at least
1433   ** one key. So compare the largest key on the current page with the
1434   ** key being sought (pKey/nKey). If (pKey/nKey) is larger, advance
1435   ** to the next page in the segment that contains at least one key.
1436   */
1437   while( rc==LSM_OK && (pPtr->flags & PGFTR_SKIP_NEXT_FLAG) ){
1438     u8 *pLastKey;
1439     int nLastKey;
1440     int iLastTopic;
1441     int res;                      /* Result of comparison */
1442     Page *pNext;
1443 
1444     /* Load the last key on the current page. */
1445     pLastKey = pageGetKey(pPtr->pSeg,
1446         pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1
1447     );
1448 
1449     /* If the loaded key is >= than (pKey/nKey), break out of the loop.
1450     ** If (pKey/nKey) is present in this array, it must be on the current
1451     ** page.  */
1452     res = sortedKeyCompare(
1453         xCmp, iLastTopic, pLastKey, nLastKey, iTopic, pKey, nKey
1454     );
1455     if( res>=0 ) break;
1456 
1457     /* Advance to the next page that contains at least one key. */
1458     pNext = pPtr->pPg;
1459     lsmFsPageRef(pNext);
1460     while( 1 ){
1461       Page *pLoad;
1462       u8 *aData; int nData;
1463 
1464       rc = lsmFsDbPageNext(pPtr->pSeg, pNext, 1, &pLoad);
1465       lsmFsPageRelease(pNext);
1466       pNext = pLoad;
1467       if( pNext==0 ) break;
1468 
1469       assert( rc==LSM_OK );
1470       aData = lsmFsPageData(pNext, &nData);
1471       if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0
1472        && pageGetNRec(aData, nData)>0
1473       ){
1474         break;
1475       }
1476     }
1477     if( pNext==0 ) break;
1478     segmentPtrSetPage(pPtr, pNext);
1479 
1480     /* This should probably be an LSM_CORRUPT error. */
1481     assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) );
1482   }
1483 
1484   return rc;
1485 }
1486 
ptrFwdPointer(Page * pPage,int iCell,Segment * pSeg,LsmPgno * piPtr,int * pbFound)1487 static int ptrFwdPointer(
1488   Page *pPage,
1489   int iCell,
1490   Segment *pSeg,
1491   LsmPgno *piPtr,
1492   int *pbFound
1493 ){
1494   Page *pPg = pPage;
1495   int iFirst = iCell;
1496   int rc = LSM_OK;
1497 
1498   do {
1499     Page *pNext = 0;
1500     u8 *aData;
1501     int nData;
1502 
1503     aData = lsmFsPageData(pPg, &nData);
1504     if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ){
1505       int i;
1506       int nCell = pageGetNRec(aData, nData);
1507       for(i=iFirst; i<nCell; i++){
1508         u8 eType = *pageGetCell(aData, nData, i);
1509         if( (eType & LSM_START_DELETE)==0 ){
1510           *pbFound = 1;
1511           *piPtr = pageGetRecordPtr(aData, nData, i) + pageGetPtr(aData, nData);
1512           lsmFsPageRelease(pPg);
1513           return LSM_OK;
1514         }
1515       }
1516     }
1517 
1518     rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
1519     lsmFsPageRelease(pPg);
1520     pPg = pNext;
1521     iFirst = 0;
1522   }while( pPg && rc==LSM_OK );
1523   lsmFsPageRelease(pPg);
1524 
1525   *pbFound = 0;
1526   return rc;
1527 }
1528 
sortedRhsFirst(MultiCursor * pCsr,Level * pLvl,SegmentPtr * pPtr)1529 static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
1530   int rc;
1531   rc = segmentPtrEnd(pCsr, pPtr, 0);
1532   while( pPtr->pPg && rc==LSM_OK ){
1533     int res = sortedKeyCompare(pCsr->pDb->xCmp,
1534         pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
1535         rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
1536     );
1537     if( res<=0 ) break;
1538     rc = segmentPtrAdvance(pCsr, pPtr, 0);
1539   }
1540   return rc;
1541 }
1542 
1543 
1544 /*
1545 ** This function is called as part of a SEEK_GE op on a multi-cursor if the
1546 ** FC pointer read from segment *pPtr comes from an entry with the
1547 ** LSM_START_DELETE flag set. In this case the pointer value cannot be
1548 ** trusted. Instead, the pointer that should be followed is that associated
1549 ** with the next entry in *pPtr that does not have LSM_START_DELETE set.
1550 **
1551 ** Why the pointers can't be trusted:
1552 **
1553 **
1554 **
1555 ** TODO: This is a stop-gap solution:
1556 **
1557 **   At the moment, this function is called from within segmentPtrSeek(),
1558 **   as part of the initial lsmMCursorSeek() call. However, consider a
1559 **   database where the following has occurred:
1560 **
1561 **      1. A range delete removes keys 1..9999 using a range delete.
1562 **      2. Keys 1 through 9999 are reinserted.
1563 **      3. The levels containing the ops in 1. and 2. above are merged. Call
1564 **         this level N. Level N contains FC pointers to level N+1.
1565 **
1566 **   Then, if the user attempts to query for (key>=2 LIMIT 10), the
1567 **   lsmMCursorSeek() call will iterate through 9998 entries searching for a
1568 **   pointer down to the level N+1 that is never actually used. It would be
1569 **   much better if the multi-cursor could do this lazily - only seek to the
1570 **   level (N+1) page after the user has moved the cursor on level N passed
1571 **   the big range-delete.
1572 */
segmentPtrFwdPointer(MultiCursor * pCsr,SegmentPtr * pPtr,LsmPgno * piPtr)1573 static int segmentPtrFwdPointer(
1574   MultiCursor *pCsr,              /* Multi-cursor pPtr belongs to */
1575   SegmentPtr *pPtr,               /* Segment-pointer to extract FC ptr from */
1576   LsmPgno *piPtr                  /* OUT: FC pointer value */
1577 ){
1578   Level *pLvl = pPtr->pLevel;
1579   Level *pNext = pLvl->pNext;
1580   Page *pPg = pPtr->pPg;
1581   int rc;
1582   int bFound;
1583   LsmPgno iOut = 0;
1584 
1585   if( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[pLvl->nRight-1] ){
1586     if( pNext==0
1587         || (pNext->nRight==0 && pNext->lhs.iRoot)
1588         || (pNext->nRight!=0 && pNext->aRhs[0].iRoot)
1589       ){
1590       /* Do nothing. The pointer will not be used anyway. */
1591       return LSM_OK;
1592     }
1593   }else{
1594     if( pPtr[1].pSeg->iRoot ){
1595       return LSM_OK;
1596     }
1597   }
1598 
1599   /* Search for a pointer within the current segment. */
1600   lsmFsPageRef(pPg);
1601   rc = ptrFwdPointer(pPg, pPtr->iCell, pPtr->pSeg, &iOut, &bFound);
1602 
1603   if( rc==LSM_OK && bFound==0 ){
1604     /* This case happens when pPtr points to the left-hand-side of a segment
1605     ** currently undergoing an incremental merge. In this case, jump to the
1606     ** oldest segment in the right-hand-side of the same level and continue
1607     ** searching. But - do not consider any keys smaller than the levels
1608     ** split-key. */
1609     SegmentPtr ptr;
1610 
1611     if( pPtr->pLevel->nRight==0 || pPtr->pSeg!=&pPtr->pLevel->lhs ){
1612       return LSM_CORRUPT_BKPT;
1613     }
1614 
1615     memset(&ptr, 0, sizeof(SegmentPtr));
1616     ptr.pLevel = pPtr->pLevel;
1617     ptr.pSeg = &ptr.pLevel->aRhs[ptr.pLevel->nRight-1];
1618     rc = sortedRhsFirst(pCsr, ptr.pLevel, &ptr);
1619     if( rc==LSM_OK ){
1620       rc = ptrFwdPointer(ptr.pPg, ptr.iCell, ptr.pSeg, &iOut, &bFound);
1621       ptr.pPg = 0;
1622     }
1623     segmentPtrReset(&ptr, 0);
1624   }
1625 
1626   *piPtr = iOut;
1627   return rc;
1628 }
1629 
segmentPtrSeek(MultiCursor * pCsr,SegmentPtr * pPtr,int iTopic,void * pKey,int nKey,int eSeek,int * piPtr,int * pbStop)1630 static int segmentPtrSeek(
1631   MultiCursor *pCsr,              /* Cursor context */
1632   SegmentPtr *pPtr,               /* Pointer to seek */
1633   int iTopic,                     /* Key topic to seek to */
1634   void *pKey, int nKey,           /* Key to seek to */
1635   int eSeek,                      /* Search bias - see above */
1636   int *piPtr,                     /* OUT: FC pointer */
1637   int *pbStop
1638 ){
1639   int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
1640   int res = 0;                        /* Result of comparison operation */
1641   int rc = LSM_OK;
1642   int iMin;
1643   int iMax;
1644   LsmPgno iPtrOut = 0;
1645 
1646   /* If the current page contains an oversized entry, then there are no
1647   ** pointers to one or more of the subsequent pages in the sorted run.
1648   ** The following call ensures that the segment-ptr points to the correct
1649   ** page in this case.  */
1650   rc = segmentPtrSearchOversized(pCsr, pPtr, iTopic, pKey, nKey);
1651   iPtrOut = pPtr->iPtr;
1652 
1653   /* Assert that this page is the right page of this segment for the key
1654   ** that we are searching for. Do this by loading page (iPg-1) and testing
1655   ** that pKey/nKey is greater than all keys on that page, and then by
1656   ** loading (iPg+1) and testing that pKey/nKey is smaller than all
1657   ** the keys it houses.
1658   **
1659   ** TODO: With range-deletes in the tree, the test described above may fail.
1660   */
1661 #if 0
1662   assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
1663 #endif
1664 
1665   assert( pPtr->nCell>0
1666        || pPtr->pSeg->nSize==1
1667        || lsmFsDbPageIsLast(pPtr->pSeg, pPtr->pPg)
1668   );
1669   if( pPtr->nCell==0 ){
1670     segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1671   }else{
1672     iMin = 0;
1673     iMax = pPtr->nCell-1;
1674 
1675     while( 1 ){
1676       int iTry = (iMin+iMax)/2;
1677       void *pKeyT; int nKeyT;       /* Key for cell iTry */
1678       int iTopicT;
1679 
1680       assert( iTry<iMax || iMin==iMax );
1681 
1682       rc = segmentPtrLoadCell(pPtr, iTry);
1683       if( rc!=LSM_OK ) break;
1684 
1685       segmentPtrKey(pPtr, &pKeyT, &nKeyT);
1686       iTopicT = rtTopic(pPtr->eType);
1687 
1688       res = sortedKeyCompare(xCmp, iTopicT, pKeyT, nKeyT, iTopic, pKey, nKey);
1689       if( res<=0 ){
1690         iPtrOut = pPtr->iPtr + pPtr->iPgPtr;
1691       }
1692 
1693       if( res==0 || iMin==iMax ){
1694         break;
1695       }else if( res>0 ){
1696         iMax = LSM_MAX(iTry-1, iMin);
1697       }else{
1698         iMin = iTry+1;
1699       }
1700     }
1701 
1702     if( rc==LSM_OK ){
1703       assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) );
1704       if( res ){
1705         rc = segmentPtrLoadCell(pPtr, iMin);
1706       }
1707       assert( rc!=LSM_OK || res>0 || iPtrOut==(pPtr->iPtr + pPtr->iPgPtr) );
1708 
1709       if( rc==LSM_OK ){
1710         switch( eSeek ){
1711           case LSM_SEEK_EQ: {
1712             int eType = pPtr->eType;
1713             if( (res<0 && (eType & LSM_START_DELETE))
1714              || (res>0 && (eType & LSM_END_DELETE))
1715              || (res==0 && (eType & LSM_POINT_DELETE))
1716             ){
1717               *pbStop = 1;
1718             }else if( res==0 && (eType & LSM_INSERT) ){
1719               lsm_env *pEnv = pCsr->pDb->pEnv;
1720               *pbStop = 1;
1721               pCsr->eType = pPtr->eType;
1722               rc = sortedBlobSet(pEnv, &pCsr->key, pPtr->pKey, pPtr->nKey);
1723               if( rc==LSM_OK ){
1724                 rc = sortedBlobSet(pEnv, &pCsr->val, pPtr->pVal, pPtr->nVal);
1725               }
1726               pCsr->flags |= CURSOR_SEEK_EQ;
1727             }
1728             segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1729             break;
1730           }
1731           case LSM_SEEK_LE:
1732             if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1);
1733             break;
1734           case LSM_SEEK_GE: {
1735             /* Figure out if we need to 'skip' the pointer forward or not */
1736             if( (res<=0 && (pPtr->eType & LSM_START_DELETE))
1737              || (res>0  && (pPtr->eType & LSM_END_DELETE))
1738             ){
1739               rc = segmentPtrFwdPointer(pCsr, pPtr, &iPtrOut);
1740             }
1741             if( res<0 && rc==LSM_OK ){
1742               rc = segmentPtrAdvance(pCsr, pPtr, 0);
1743             }
1744             break;
1745           }
1746         }
1747       }
1748     }
1749 
1750     /* If the cursor seek has found a separator key, and this cursor is
1751     ** supposed to ignore separators keys, advance to the next entry.  */
1752     if( rc==LSM_OK && pPtr->pPg
1753      && segmentPtrIgnoreSeparators(pCsr, pPtr)
1754      && rtIsSeparator(pPtr->eType)
1755     ){
1756       assert( eSeek!=LSM_SEEK_EQ );
1757       rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
1758     }
1759   }
1760 
1761   assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
1762   *piPtr = (int)iPtrOut;
1763   return rc;
1764 }
1765 
seekInBtree(MultiCursor * pCsr,Segment * pSeg,int iTopic,void * pKey,int nKey,LsmPgno * aPg,Page ** ppPg)1766 static int seekInBtree(
1767   MultiCursor *pCsr,              /* Multi-cursor object */
1768   Segment *pSeg,                  /* Seek within this segment */
1769   int iTopic,
1770   void *pKey, int nKey,           /* Key to seek to */
1771   LsmPgno *aPg,                   /* OUT: Page numbers */
1772   Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
1773 ){
1774   int i = 0;
1775   int rc;
1776   int iPg;
1777   Page *pPg = 0;
1778   LsmBlob blob = {0, 0, 0};
1779 
1780   iPg = (int)pSeg->iRoot;
1781   do {
1782     LsmPgno *piFirst = 0;
1783     if( aPg ){
1784       aPg[i++] = iPg;
1785       piFirst = &aPg[i];
1786     }
1787 
1788     rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, iPg, &pPg);
1789     assert( rc==LSM_OK || pPg==0 );
1790     if( rc==LSM_OK ){
1791       u8 *aData;                  /* Buffer containing page data */
1792       int nData;                  /* Size of aData[] in bytes */
1793       int iMin;
1794       int iMax;
1795       int nRec;
1796       int flags;
1797 
1798       aData = fsPageData(pPg, &nData);
1799       flags = pageGetFlags(aData, nData);
1800       if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;
1801 
1802       iPg = (int)pageGetPtr(aData, nData);
1803       nRec = pageGetNRec(aData, nData);
1804 
1805       iMin = 0;
1806       iMax = nRec-1;
1807       while( iMax>=iMin ){
1808         int iTry = (iMin+iMax)/2;
1809         void *pKeyT; int nKeyT;       /* Key for cell iTry */
1810         int iTopicT;                  /* Topic for key pKeyT/nKeyT */
1811         LsmPgno iPtr;                 /* Pointer associated with cell iTry */
1812         int res;                      /* (pKey - pKeyT) */
1813 
1814         rc = pageGetBtreeKey(
1815             pSeg, pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob
1816         );
1817         if( rc!=LSM_OK ) break;
1818         if( piFirst && pKeyT==blob.pData ){
1819           *piFirst = pageGetBtreeRef(pPg, iTry);
1820           piFirst = 0;
1821           i++;
1822         }
1823 
1824         res = sortedKeyCompare(
1825             pCsr->pDb->xCmp, iTopic, pKey, nKey, iTopicT, pKeyT, nKeyT
1826         );
1827         if( res<0 ){
1828           iPg = (int)iPtr;
1829           iMax = iTry-1;
1830         }else{
1831           iMin = iTry+1;
1832         }
1833       }
1834       lsmFsPageRelease(pPg);
1835       pPg = 0;
1836     }
1837   }while( rc==LSM_OK );
1838 
1839   sortedBlobFree(&blob);
1840   assert( (rc==LSM_OK)==(pPg!=0) );
1841   if( ppPg ){
1842     *ppPg = pPg;
1843   }else{
1844     lsmFsPageRelease(pPg);
1845   }
1846   return rc;
1847 }
1848 
seekInSegment(MultiCursor * pCsr,SegmentPtr * pPtr,int iTopic,void * pKey,int nKey,int iPg,int eSeek,int * piPtr,int * pbStop)1849 static int seekInSegment(
1850   MultiCursor *pCsr,
1851   SegmentPtr *pPtr,
1852   int iTopic,
1853   void *pKey, int nKey,
1854   int iPg,                        /* Page to search */
1855   int eSeek,                      /* Search bias - see above */
1856   int *piPtr,                     /* OUT: FC pointer */
1857   int *pbStop                     /* OUT: Stop search flag */
1858 ){
1859   int iPtr = iPg;
1860   int rc = LSM_OK;
1861 
1862   if( pPtr->pSeg->iRoot ){
1863     Page *pPg;
1864     assert( pPtr->pSeg->iRoot!=0 );
1865     rc = seekInBtree(pCsr, pPtr->pSeg, iTopic, pKey, nKey, 0, &pPg);
1866     if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg);
1867   }else{
1868     if( iPtr==0 ){
1869       iPtr = (int)pPtr->pSeg->iFirst;
1870     }
1871     if( rc==LSM_OK ){
1872       rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
1873     }
1874   }
1875 
1876   if( rc==LSM_OK ){
1877     rc = segmentPtrSeek(pCsr, pPtr, iTopic, pKey, nKey, eSeek, piPtr, pbStop);
1878   }
1879   return rc;
1880 }
1881 
1882 /*
1883 ** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].
1884 **
1885 ** pbStop:
1886 **   This parameter is only significant if parameter eSeek is set to
1887 **   LSM_SEEK_EQ. In this case, it is set to true before returning if
1888 **   the seek operation is finished. This can happen in two ways:
1889 **
1890 **     a) A key matching (pKey/nKey) is found, or
1891 **     b) A point-delete or range-delete deleting the key is found.
1892 **
1893 **   In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key
1894 **   and pCsr->val blobs populated before returning.
1895 */
seekInLevel(MultiCursor * pCsr,SegmentPtr * aPtr,int eSeek,int iTopic,void * pKey,int nKey,LsmPgno * piPgno,int * pbStop)1896 static int seekInLevel(
1897   MultiCursor *pCsr,              /* Sorted cursor object to seek */
1898   SegmentPtr *aPtr,               /* Pointer to array of (nRhs+1) SPs */
1899   int eSeek,                      /* Search bias - see above */
1900   int iTopic,                     /* Key topic to search for */
1901   void *pKey, int nKey,           /* Key to search for */
1902   LsmPgno *piPgno,                /* IN/OUT: fraction cascade pointer (or 0) */
1903   int *pbStop                     /* OUT: See above */
1904 ){
1905   Level *pLvl = aPtr[0].pLevel;   /* Level to seek within */
1906   int rc = LSM_OK;                /* Return code */
1907   int iOut = 0;                   /* Pointer to return to caller */
1908   int res = -1;                   /* Result of xCmp(pKey, split) */
1909   int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */
1910   int bStop = 0;
1911 
1912   /* If this is a composite level (one currently undergoing an incremental
1913   ** merge), figure out if the search key is larger or smaller than the
1914   ** levels split-key.  */
1915   if( nRhs ){
1916     res = sortedKeyCompare(pCsr->pDb->xCmp, iTopic, pKey, nKey,
1917         pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
1918     );
1919   }
1920 
1921   /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
1922   ** is not a composite level and there is no split-key). Search the
1923   ** left-hand-side of the level in this case.  */
1924   if( res<0 ){
1925     int i;
1926     int iPtr = 0;
1927     if( nRhs==0 ) iPtr = (int)*piPgno;
1928 
1929     rc = seekInSegment(
1930         pCsr, &aPtr[0], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
1931     );
1932     if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
1933       res = 0;
1934     }
1935     for(i=1; i<=nRhs; i++){
1936       segmentPtrReset(&aPtr[i], LSM_SEGMENTPTR_FREE_THRESHOLD);
1937     }
1938   }
1939 
1940   if( res>=0 ){
1941     int bHit = 0;                 /* True if at least one rhs is not EOF */
1942     int iPtr = (int)*piPgno;
1943     int i;
1944     segmentPtrReset(&aPtr[0], LSM_SEGMENTPTR_FREE_THRESHOLD);
1945     for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
1946       SegmentPtr *pPtr = &aPtr[i];
1947       iOut = 0;
1948       rc = seekInSegment(
1949           pCsr, pPtr, iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
1950       );
1951       iPtr = iOut;
1952 
1953       /* If the segment-pointer has settled on a key that is smaller than
1954       ** the splitkey, invalidate the segment-pointer.  */
1955       if( pPtr->pPg ){
1956         res = sortedKeyCompare(pCsr->pDb->xCmp,
1957             rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
1958             pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
1959         );
1960         if( res<0 ){
1961           if( pPtr->eType & LSM_START_DELETE ){
1962             pPtr->eType &= ~LSM_INSERT;
1963             pPtr->pKey = pLvl->pSplitKey;
1964             pPtr->nKey = pLvl->nSplitKey;
1965             pPtr->pVal = 0;
1966             pPtr->nVal = 0;
1967           }else{
1968             segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
1969           }
1970         }
1971       }
1972 
1973       if( aPtr[i].pKey ) bHit = 1;
1974     }
1975 
1976     if( rc==LSM_OK && eSeek==LSM_SEEK_LE && bHit==0 ){
1977       rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
1978     }
1979   }
1980 
1981   assert( eSeek==LSM_SEEK_EQ || bStop==0 );
1982   *piPgno = iOut;
1983   *pbStop = bStop;
1984   return rc;
1985 }
1986 
multiCursorGetKey(MultiCursor * pCsr,int iKey,int * peType,void ** ppKey,int * pnKey)1987 static void multiCursorGetKey(
1988   MultiCursor *pCsr,
1989   int iKey,
1990   int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
1991   void **ppKey,                   /* OUT: Pointer to buffer containing key */
1992   int *pnKey                      /* OUT: Size of *ppKey in bytes */
1993 ){
1994   int nKey = 0;
1995   void *pKey = 0;
1996   int eType = 0;
1997 
1998   switch( iKey ){
1999     case CURSOR_DATA_TREE0:
2000     case CURSOR_DATA_TREE1: {
2001       TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
2002       if( lsmTreeCursorValid(pTreeCsr) ){
2003         lsmTreeCursorKey(pTreeCsr, &eType, &pKey, &nKey);
2004       }
2005       break;
2006     }
2007 
2008     case CURSOR_DATA_SYSTEM: {
2009       Snapshot *pWorker = pCsr->pDb->pWorker;
2010       if( pWorker && (pCsr->flags & CURSOR_FLUSH_FREELIST) ){
2011         int nEntry = pWorker->freelist.nEntry;
2012         if( pCsr->iFree < (nEntry*2) ){
2013           FreelistEntry *aEntry = pWorker->freelist.aEntry;
2014           int i = nEntry - 1 - (pCsr->iFree / 2);
2015           u32 iKey2 = 0;
2016 
2017           if( (pCsr->iFree % 2) ){
2018             eType = LSM_END_DELETE|LSM_SYSTEMKEY;
2019             iKey2 = aEntry[i].iBlk-1;
2020           }else if( aEntry[i].iId>=0 ){
2021             eType = LSM_INSERT|LSM_SYSTEMKEY;
2022             iKey2 = aEntry[i].iBlk;
2023 
2024             /* If the in-memory entry immediately before this one was a
2025              ** DELETE, and the block number is one greater than the current
2026              ** block number, mark this entry as an "end-delete-range". */
2027             if( i<(nEntry-1) && aEntry[i+1].iBlk==iKey2+1 && aEntry[i+1].iId<0 ){
2028               eType |= LSM_END_DELETE;
2029             }
2030 
2031           }else{
2032             eType = LSM_START_DELETE|LSM_SYSTEMKEY;
2033             iKey2 = aEntry[i].iBlk + 1;
2034           }
2035 
2036           /* If the in-memory entry immediately after this one is a
2037           ** DELETE, and the block number is one less than the current
2038           ** key, mark this entry as an "start-delete-range".  */
2039           if( i>0 && aEntry[i-1].iBlk==iKey2-1 && aEntry[i-1].iId<0 ){
2040             eType |= LSM_START_DELETE;
2041           }
2042 
2043           pKey = pCsr->pSystemVal;
2044           nKey = 4;
2045           lsmPutU32(pKey, ~iKey2);
2046         }
2047       }
2048       break;
2049     }
2050 
2051     default: {
2052       int iPtr = iKey - CURSOR_DATA_SEGMENT;
2053       assert( iPtr>=0 );
2054       if( iPtr==pCsr->nPtr ){
2055         if( pCsr->pBtCsr ){
2056           pKey = pCsr->pBtCsr->pKey;
2057           nKey = pCsr->pBtCsr->nKey;
2058           eType = pCsr->pBtCsr->eType;
2059         }
2060       }else if( iPtr<pCsr->nPtr ){
2061         SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
2062         if( pPtr->pPg ){
2063           pKey = pPtr->pKey;
2064           nKey = pPtr->nKey;
2065           eType = pPtr->eType;
2066         }
2067       }
2068       break;
2069     }
2070   }
2071 
2072   if( peType ) *peType = eType;
2073   if( pnKey ) *pnKey = nKey;
2074   if( ppKey ) *ppKey = pKey;
2075 }
2076 
sortedDbKeyCompare(MultiCursor * pCsr,int iLhsFlags,void * pLhsKey,int nLhsKey,int iRhsFlags,void * pRhsKey,int nRhsKey)2077 static int sortedDbKeyCompare(
2078   MultiCursor *pCsr,
2079   int iLhsFlags, void *pLhsKey, int nLhsKey,
2080   int iRhsFlags, void *pRhsKey, int nRhsKey
2081 ){
2082   int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
2083   int res;
2084 
2085   /* Compare the keys, including the system flag. */
2086   res = sortedKeyCompare(xCmp,
2087     rtTopic(iLhsFlags), pLhsKey, nLhsKey,
2088     rtTopic(iRhsFlags), pRhsKey, nRhsKey
2089   );
2090 
2091   /* If a key has the LSM_START_DELETE flag set, but not the LSM_INSERT or
2092   ** LSM_POINT_DELETE flags, it is considered a delta larger. This prevents
2093   ** the beginning of an open-ended set from masking a database entry or
2094   ** delete at a lower level.  */
2095   if( res==0 && (pCsr->flags & CURSOR_IGNORE_DELETE) ){
2096     const int m = LSM_POINT_DELETE|LSM_INSERT|LSM_END_DELETE |LSM_START_DELETE;
2097     int iDel1 = 0;
2098     int iDel2 = 0;
2099 
2100     if( LSM_START_DELETE==(iLhsFlags & m) ) iDel1 = +1;
2101     if( LSM_END_DELETE  ==(iLhsFlags & m) ) iDel1 = -1;
2102     if( LSM_START_DELETE==(iRhsFlags & m) ) iDel2 = +1;
2103     if( LSM_END_DELETE  ==(iRhsFlags & m) ) iDel2 = -1;
2104 
2105     res = (iDel1 - iDel2);
2106   }
2107 
2108   return res;
2109 }
2110 
multiCursorDoCompare(MultiCursor * pCsr,int iOut,int bReverse)2111 static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
2112   int i1;
2113   int i2;
2114   int iRes;
2115   void *pKey1; int nKey1; int eType1;
2116   void *pKey2; int nKey2; int eType2;
2117   const int mul = (bReverse ? -1 : 1);
2118 
2119   assert( pCsr->aTree && iOut<pCsr->nTree );
2120   if( iOut>=(pCsr->nTree/2) ){
2121     i1 = (iOut - pCsr->nTree/2) * 2;
2122     i2 = i1 + 1;
2123   }else{
2124     i1 = pCsr->aTree[iOut*2];
2125     i2 = pCsr->aTree[iOut*2+1];
2126   }
2127 
2128   multiCursorGetKey(pCsr, i1, &eType1, &pKey1, &nKey1);
2129   multiCursorGetKey(pCsr, i2, &eType2, &pKey2, &nKey2);
2130 
2131   if( pKey1==0 ){
2132     iRes = i2;
2133   }else if( pKey2==0 ){
2134     iRes = i1;
2135   }else{
2136     int res;
2137 
2138     /* Compare the keys */
2139     res = sortedDbKeyCompare(pCsr,
2140         eType1, pKey1, nKey1, eType2, pKey2, nKey2
2141     );
2142 
2143     res = res * mul;
2144     if( res==0 ){
2145       /* The two keys are identical. Normally, this means that the key from
2146       ** the newer run clobbers the old. However, if the newer key is a
2147       ** separator key, or a range-delete-boundary only, do not allow it
2148       ** to clobber an older entry.  */
2149       int nc1 = (eType1 & (LSM_INSERT|LSM_POINT_DELETE))==0;
2150       int nc2 = (eType2 & (LSM_INSERT|LSM_POINT_DELETE))==0;
2151       iRes = (nc1 > nc2) ? i2 : i1;
2152     }else if( res<0 ){
2153       iRes = i1;
2154     }else{
2155       iRes = i2;
2156     }
2157   }
2158 
2159   pCsr->aTree[iOut] = iRes;
2160 }
2161 
2162 /*
2163 ** This function advances segment pointer iPtr belonging to multi-cursor
2164 ** pCsr forward (bReverse==0) or backward (bReverse!=0).
2165 **
2166 ** If the segment pointer points to a segment that is part of a composite
2167 ** level, then the following special case is handled.
2168 **
2169 **   * If iPtr is the lhs of a composite level, and the cursor is being
2170 **     advanced forwards, and segment iPtr is at EOF, move all pointers
2171 **     that correspond to rhs segments of the same level to the first
2172 **     key in their respective data.
2173 */
segmentCursorAdvance(MultiCursor * pCsr,int iPtr,int bReverse)2174 static int segmentCursorAdvance(
2175   MultiCursor *pCsr,
2176   int iPtr,
2177   int bReverse
2178 ){
2179   int rc;
2180   SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
2181   Level *pLvl = pPtr->pLevel;
2182   int bComposite;                 /* True if pPtr is part of composite level */
2183 
2184   /* Advance the segment-pointer object. */
2185   rc = segmentPtrAdvance(pCsr, pPtr, bReverse);
2186   if( rc!=LSM_OK ) return rc;
2187 
2188   bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);
2189   if( bComposite && pPtr->pPg==0 ){
2190     int bFix = 0;
2191     if( (bReverse==0)==(pPtr->pSeg==&pLvl->lhs) ){
2192       int i;
2193       if( bReverse ){
2194         SegmentPtr *pLhs = &pCsr->aPtr[iPtr - 1 - (pPtr->pSeg - pLvl->aRhs)];
2195         for(i=0; i<pLvl->nRight; i++){
2196           if( pLhs[i+1].pPg ) break;
2197         }
2198         if( i==pLvl->nRight ){
2199           bFix = 1;
2200           rc = segmentPtrEnd(pCsr, pLhs, 1);
2201         }
2202       }else{
2203         bFix = 1;
2204         for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
2205           rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
2206         }
2207       }
2208     }
2209 
2210     if( bFix ){
2211       int i;
2212       for(i=pCsr->nTree-1; i>0; i--){
2213         multiCursorDoCompare(pCsr, i, bReverse);
2214       }
2215     }
2216   }
2217 
2218 #if 0
2219   if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
2220    && bReverse==0                                /* csr advanced forwards */
2221    && pPtr->pPg==0                               /* segment at EOF */
2222   ){
2223     int i;
2224     for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
2225       rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
2226     }
2227     for(i=pCsr->nTree-1; i>0; i--){
2228       multiCursorDoCompare(pCsr, i, 0);
2229     }
2230   }
2231 #endif
2232 
2233   return rc;
2234 }
2235 
mcursorFreeComponents(MultiCursor * pCsr)2236 static void mcursorFreeComponents(MultiCursor *pCsr){
2237   int i;
2238   lsm_env *pEnv = pCsr->pDb->pEnv;
2239 
2240   /* Close the tree cursor, if any. */
2241   lsmTreeCursorDestroy(pCsr->apTreeCsr[0]);
2242   lsmTreeCursorDestroy(pCsr->apTreeCsr[1]);
2243 
2244   /* Reset the segment pointers */
2245   for(i=0; i<pCsr->nPtr; i++){
2246     segmentPtrReset(&pCsr->aPtr[i], 0);
2247   }
2248 
2249   /* And the b-tree cursor, if any */
2250   btreeCursorFree(pCsr->pBtCsr);
2251 
2252   /* Free allocations */
2253   lsmFree(pEnv, pCsr->aPtr);
2254   lsmFree(pEnv, pCsr->aTree);
2255   lsmFree(pEnv, pCsr->pSystemVal);
2256 
2257   /* Zero fields */
2258   pCsr->nPtr = 0;
2259   pCsr->aPtr = 0;
2260   pCsr->nTree = 0;
2261   pCsr->aTree = 0;
2262   pCsr->pSystemVal = 0;
2263   pCsr->apTreeCsr[0] = 0;
2264   pCsr->apTreeCsr[1] = 0;
2265   pCsr->pBtCsr = 0;
2266 }
2267 
lsmMCursorFreeCache(lsm_db * pDb)2268 void lsmMCursorFreeCache(lsm_db *pDb){
2269   MultiCursor *p;
2270   MultiCursor *pNext;
2271   for(p=pDb->pCsrCache; p; p=pNext){
2272     pNext = p->pNext;
2273     lsmMCursorClose(p, 0);
2274   }
2275   pDb->pCsrCache = 0;
2276 }
2277 
2278 /*
2279 ** Close the cursor passed as the first argument.
2280 **
2281 ** If the bCache parameter is true, then shift the cursor to the pCsrCache
2282 ** list for possible reuse instead of actually deleting it.
2283 */
lsmMCursorClose(MultiCursor * pCsr,int bCache)2284 void lsmMCursorClose(MultiCursor *pCsr, int bCache){
2285   if( pCsr ){
2286     lsm_db *pDb = pCsr->pDb;
2287     MultiCursor **pp;             /* Iterator variable */
2288 
2289     /* The cursor may or may not be currently part of the linked list
2290     ** starting at lsm_db.pCsr. If it is, extract it.  */
2291     for(pp=&pDb->pCsr; *pp; pp=&((*pp)->pNext)){
2292       if( *pp==pCsr ){
2293         *pp = pCsr->pNext;
2294         break;
2295       }
2296     }
2297 
2298     if( bCache ){
2299       int i;                      /* Used to iterate through segment-pointers */
2300 
2301       /* Release any page references held by this cursor. */
2302       assert( !pCsr->pBtCsr );
2303       for(i=0; i<pCsr->nPtr; i++){
2304         SegmentPtr *pPtr = &pCsr->aPtr[i];
2305         lsmFsPageRelease(pPtr->pPg);
2306         pPtr->pPg = 0;
2307       }
2308 
2309       /* Reset the tree cursors */
2310       lsmTreeCursorReset(pCsr->apTreeCsr[0]);
2311       lsmTreeCursorReset(pCsr->apTreeCsr[1]);
2312 
2313       /* Add the cursor to the pCsrCache list */
2314       pCsr->pNext = pDb->pCsrCache;
2315       pDb->pCsrCache = pCsr;
2316     }else{
2317       /* Free the allocation used to cache the current key, if any. */
2318       sortedBlobFree(&pCsr->key);
2319       sortedBlobFree(&pCsr->val);
2320 
2321       /* Free the component cursors */
2322       mcursorFreeComponents(pCsr);
2323 
2324       /* Free the cursor structure itself */
2325       lsmFree(pDb->pEnv, pCsr);
2326     }
2327   }
2328 }
2329 
2330 #define TREE_NONE 0
2331 #define TREE_OLD  1
2332 #define TREE_BOTH 2
2333 
2334 /*
2335 ** Parameter eTree is one of TREE_OLD or TREE_BOTH.
2336 */
multiCursorAddTree(MultiCursor * pCsr,Snapshot * pSnap,int eTree)2337 static int multiCursorAddTree(MultiCursor *pCsr, Snapshot *pSnap, int eTree){
2338   int rc = LSM_OK;
2339   lsm_db *db = pCsr->pDb;
2340 
2341   /* Add a tree cursor on the 'old' tree, if it exists. */
2342   if( eTree!=TREE_NONE
2343    && lsmTreeHasOld(db)
2344    && db->treehdr.iOldLog!=pSnap->iLogOff
2345   ){
2346     rc = lsmTreeCursorNew(db, 1, &pCsr->apTreeCsr[1]);
2347   }
2348 
2349   /* Add a tree cursor on the 'current' tree, if required. */
2350   if( rc==LSM_OK && eTree==TREE_BOTH ){
2351     rc = lsmTreeCursorNew(db, 0, &pCsr->apTreeCsr[0]);
2352   }
2353 
2354   return rc;
2355 }
2356 
multiCursorAddRhs(MultiCursor * pCsr,Level * pLvl)2357 static int multiCursorAddRhs(MultiCursor *pCsr, Level *pLvl){
2358   int i;
2359   int nRhs = pLvl->nRight;
2360 
2361   assert( pLvl->nRight>0 );
2362   assert( pCsr->aPtr==0 );
2363   pCsr->aPtr = lsmMallocZero(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nRhs);
2364   if( !pCsr->aPtr ) return LSM_NOMEM_BKPT;
2365   pCsr->nPtr = nRhs;
2366 
2367   for(i=0; i<nRhs; i++){
2368     pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
2369     pCsr->aPtr[i].pLevel = pLvl;
2370   }
2371 
2372   return LSM_OK;
2373 }
2374 
multiCursorAddOne(MultiCursor * pCsr,Level * pLvl,int * pRc)2375 static void multiCursorAddOne(MultiCursor *pCsr, Level *pLvl, int *pRc){
2376   if( *pRc==LSM_OK ){
2377     int iPtr = pCsr->nPtr;
2378     int i;
2379     pCsr->aPtr[iPtr].pLevel = pLvl;
2380     pCsr->aPtr[iPtr].pSeg = &pLvl->lhs;
2381     iPtr++;
2382     for(i=0; i<pLvl->nRight; i++){
2383       pCsr->aPtr[iPtr].pLevel = pLvl;
2384       pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i];
2385       iPtr++;
2386     }
2387 
2388     if( pLvl->nRight && pLvl->pSplitKey==0 ){
2389       sortedSplitkey(pCsr->pDb, pLvl, pRc);
2390     }
2391     pCsr->nPtr = iPtr;
2392   }
2393 }
2394 
multiCursorAddAll(MultiCursor * pCsr,Snapshot * pSnap)2395 static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){
2396   Level *pLvl;
2397   int nPtr = 0;
2398   int rc = LSM_OK;
2399 
2400   for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
2401     /* If the LEVEL_INCOMPLETE flag is set, then this function is being
2402     ** called (indirectly) from within a sortedNewToplevel() call to
2403     ** construct pLvl. In this case ignore pLvl - this cursor is going to
2404     ** be used to retrieve a freelist entry from the LSM, and the partially
2405     ** complete level may confuse it.  */
2406     if( pLvl->flags & LEVEL_INCOMPLETE ) continue;
2407     nPtr += (1 + pLvl->nRight);
2408   }
2409 
2410   assert( pCsr->aPtr==0 );
2411   pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nPtr, &rc);
2412 
2413   for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
2414     if( (pLvl->flags & LEVEL_INCOMPLETE)==0 ){
2415       multiCursorAddOne(pCsr, pLvl, &rc);
2416     }
2417   }
2418 
2419   return rc;
2420 }
2421 
multiCursorInit(MultiCursor * pCsr,Snapshot * pSnap)2422 static int multiCursorInit(MultiCursor *pCsr, Snapshot *pSnap){
2423   int rc;
2424   rc = multiCursorAddAll(pCsr, pSnap);
2425   if( rc==LSM_OK ){
2426     rc = multiCursorAddTree(pCsr, pSnap, TREE_BOTH);
2427   }
2428   pCsr->flags |= (CURSOR_IGNORE_SYSTEM | CURSOR_IGNORE_DELETE);
2429   return rc;
2430 }
2431 
multiCursorNew(lsm_db * db,int * pRc)2432 static MultiCursor *multiCursorNew(lsm_db *db, int *pRc){
2433   MultiCursor *pCsr;
2434   pCsr = (MultiCursor *)lsmMallocZeroRc(db->pEnv, sizeof(MultiCursor), pRc);
2435   if( pCsr ){
2436     pCsr->pNext = db->pCsr;
2437     db->pCsr = pCsr;
2438     pCsr->pDb = db;
2439   }
2440   return pCsr;
2441 }
2442 
2443 
lsmSortedRemap(lsm_db * pDb)2444 void lsmSortedRemap(lsm_db *pDb){
2445   MultiCursor *pCsr;
2446   for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
2447     int iPtr;
2448     if( pCsr->pBtCsr ){
2449       btreeCursorLoadKey(pCsr->pBtCsr);
2450     }
2451     for(iPtr=0; iPtr<pCsr->nPtr; iPtr++){
2452       segmentPtrLoadCell(&pCsr->aPtr[iPtr], pCsr->aPtr[iPtr].iCell);
2453     }
2454   }
2455 }
2456 
multiCursorReadSeparators(MultiCursor * pCsr)2457 static void multiCursorReadSeparators(MultiCursor *pCsr){
2458   if( pCsr->nPtr>0 ){
2459     pCsr->flags |= CURSOR_READ_SEPARATORS;
2460   }
2461 }
2462 
2463 /*
2464 ** Have this cursor skip over SORTED_DELETE entries.
2465 */
multiCursorIgnoreDelete(MultiCursor * pCsr)2466 static void multiCursorIgnoreDelete(MultiCursor *pCsr){
2467   if( pCsr ) pCsr->flags |= CURSOR_IGNORE_DELETE;
2468 }
2469 
2470 /*
2471 ** If the free-block list is not empty, then have this cursor visit a key
2472 ** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value
2473 ** blob containing the serialized free-block list.
2474 */
multiCursorVisitFreelist(MultiCursor * pCsr)2475 static int multiCursorVisitFreelist(MultiCursor *pCsr){
2476   int rc = LSM_OK;
2477   pCsr->flags |= CURSOR_FLUSH_FREELIST;
2478   pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
2479   return rc;
2480 }
2481 
2482 /*
2483 ** Allocate and return a new database cursor.
2484 **
2485 ** This method should only be called to allocate user cursors. As it may
2486 ** recycle a cursor from lsm_db.pCsrCache.
2487 */
lsmMCursorNew(lsm_db * pDb,MultiCursor ** ppCsr)2488 int lsmMCursorNew(
2489   lsm_db *pDb,                    /* Database handle */
2490   MultiCursor **ppCsr             /* OUT: Allocated cursor */
2491 ){
2492   MultiCursor *pCsr = 0;
2493   int rc = LSM_OK;
2494 
2495   if( pDb->pCsrCache ){
2496     int bOld;                     /* True if there is an old in-memory tree */
2497 
2498     /* Remove a cursor from the pCsrCache list and add it to the open list. */
2499     pCsr = pDb->pCsrCache;
2500     pDb->pCsrCache = pCsr->pNext;
2501     pCsr->pNext = pDb->pCsr;
2502     pDb->pCsr = pCsr;
2503 
2504     /* The cursor can almost be used as is, except that the old in-memory
2505     ** tree cursor may be present and not required, or required and not
2506     ** present. Fix this if required.  */
2507     bOld = (lsmTreeHasOld(pDb) && pDb->treehdr.iOldLog!=pDb->pClient->iLogOff);
2508     if( !bOld && pCsr->apTreeCsr[1] ){
2509       lsmTreeCursorDestroy(pCsr->apTreeCsr[1]);
2510       pCsr->apTreeCsr[1] = 0;
2511     }else if( bOld && !pCsr->apTreeCsr[1] ){
2512       rc = lsmTreeCursorNew(pDb, 1, &pCsr->apTreeCsr[1]);
2513     }
2514 
2515     pCsr->flags = (CURSOR_IGNORE_SYSTEM | CURSOR_IGNORE_DELETE);
2516 
2517   }else{
2518     pCsr = multiCursorNew(pDb, &rc);
2519     if( rc==LSM_OK ) rc = multiCursorInit(pCsr, pDb->pClient);
2520   }
2521 
2522   if( rc!=LSM_OK ){
2523     lsmMCursorClose(pCsr, 0);
2524     pCsr = 0;
2525   }
2526   assert( (rc==LSM_OK)==(pCsr!=0) );
2527   *ppCsr = pCsr;
2528   return rc;
2529 }
2530 
multiCursorGetVal(MultiCursor * pCsr,int iVal,void ** ppVal,int * pnVal)2531 static int multiCursorGetVal(
2532   MultiCursor *pCsr,
2533   int iVal,
2534   void **ppVal,
2535   int *pnVal
2536 ){
2537   int rc = LSM_OK;
2538 
2539   *ppVal = 0;
2540   *pnVal = 0;
2541 
2542   switch( iVal ){
2543     case CURSOR_DATA_TREE0:
2544     case CURSOR_DATA_TREE1: {
2545       TreeCursor *pTreeCsr = pCsr->apTreeCsr[iVal-CURSOR_DATA_TREE0];
2546       if( lsmTreeCursorValid(pTreeCsr) ){
2547         lsmTreeCursorValue(pTreeCsr, ppVal, pnVal);
2548       }else{
2549         *ppVal = 0;
2550         *pnVal = 0;
2551       }
2552       break;
2553     }
2554 
2555     case CURSOR_DATA_SYSTEM: {
2556       Snapshot *pWorker = pCsr->pDb->pWorker;
2557       if( pWorker
2558        && (pCsr->iFree % 2)==0
2559        && pCsr->iFree < (pWorker->freelist.nEntry*2)
2560       ){
2561         int iEntry = pWorker->freelist.nEntry - 1 - (pCsr->iFree / 2);
2562         u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
2563         lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
2564         *ppVal = aVal;
2565         *pnVal = 8;
2566       }
2567       break;
2568     }
2569 
2570     default: {
2571       int iPtr = iVal-CURSOR_DATA_SEGMENT;
2572       if( iPtr<pCsr->nPtr ){
2573         SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
2574         if( pPtr->pPg ){
2575           *ppVal = pPtr->pVal;
2576           *pnVal = pPtr->nVal;
2577         }
2578       }
2579     }
2580   }
2581 
2582   assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) );
2583   return rc;
2584 }
2585 
2586 static int multiCursorAdvance(MultiCursor *pCsr, int bReverse);
2587 
2588 /*
2589 ** This function is called by worker connections to walk the part of the
2590 ** free-list stored within the LSM data structure.
2591 */
lsmSortedWalkFreelist(lsm_db * pDb,int bReverse,int (* x)(void *,int,i64),void * pCtx)2592 int lsmSortedWalkFreelist(
2593   lsm_db *pDb,                    /* Database handle */
2594   int bReverse,                   /* True to iterate from largest to smallest */
2595   int (*x)(void *, int, i64),     /* Callback function */
2596   void *pCtx                      /* First argument to pass to callback */
2597 ){
2598   MultiCursor *pCsr;              /* Cursor used to read db */
2599   int rc = LSM_OK;                /* Return Code */
2600   Snapshot *pSnap = 0;
2601 
2602   assert( pDb->pWorker );
2603   if( pDb->bIncrMerge ){
2604     rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap);
2605     if( rc!=LSM_OK ) return rc;
2606   }else{
2607     pSnap = pDb->pWorker;
2608   }
2609 
2610   pCsr = multiCursorNew(pDb, &rc);
2611   if( pCsr ){
2612     rc = multiCursorAddAll(pCsr, pSnap);
2613     pCsr->flags |= CURSOR_IGNORE_DELETE;
2614   }
2615 
2616   if( rc==LSM_OK ){
2617     if( bReverse==0 ){
2618       rc = lsmMCursorLast(pCsr);
2619     }else{
2620       rc = lsmMCursorSeek(pCsr, 1, "", 0, LSM_SEEK_GE);
2621     }
2622 
2623     while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){
2624       void *pKey; int nKey;
2625       void *pVal = 0; int nVal = 0;
2626 
2627       rc = lsmMCursorKey(pCsr, &pKey, &nKey);
2628       if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
2629       if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT;
2630 
2631       if( rc==LSM_OK ){
2632         int iBlk;
2633         i64 iSnap;
2634         iBlk = (int)(~(lsmGetU32((u8 *)pKey)));
2635         iSnap = (i64)lsmGetU64((u8 *)pVal);
2636         if( x(pCtx, iBlk, iSnap) ) break;
2637         rc = multiCursorAdvance(pCsr, !bReverse);
2638       }
2639     }
2640   }
2641 
2642   lsmMCursorClose(pCsr, 0);
2643   if( pSnap!=pDb->pWorker ){
2644     lsmFreeSnapshot(pDb->pEnv, pSnap);
2645   }
2646 
2647   return rc;
2648 }
2649 
lsmSortedLoadFreelist(lsm_db * pDb,void ** ppVal,int * pnVal)2650 int lsmSortedLoadFreelist(
2651   lsm_db *pDb,                    /* Database handle (must be worker) */
2652   void **ppVal,                   /* OUT: Blob containing LSM free-list */
2653   int *pnVal                      /* OUT: Size of *ppVal blob in bytes */
2654 ){
2655   MultiCursor *pCsr;              /* Cursor used to retreive free-list */
2656   int rc = LSM_OK;                /* Return Code */
2657 
2658   assert( pDb->pWorker );
2659   assert( *ppVal==0 && *pnVal==0 );
2660 
2661   pCsr = multiCursorNew(pDb, &rc);
2662   if( pCsr ){
2663     rc = multiCursorAddAll(pCsr, pDb->pWorker);
2664     pCsr->flags |= CURSOR_IGNORE_DELETE;
2665   }
2666 
2667   if( rc==LSM_OK ){
2668     rc = lsmMCursorLast(pCsr);
2669     if( rc==LSM_OK
2670      && rtIsWrite(pCsr->eType) && rtIsSystem(pCsr->eType)
2671      && pCsr->key.nData==8
2672      && 0==memcmp(pCsr->key.pData, "FREELIST", 8)
2673     ){
2674       void *pVal; int nVal;         /* Value read from database */
2675       rc = lsmMCursorValue(pCsr, &pVal, &nVal);
2676       if( rc==LSM_OK ){
2677         *ppVal = lsmMallocRc(pDb->pEnv, nVal, &rc);
2678         if( *ppVal ){
2679           memcpy(*ppVal, pVal, nVal);
2680           *pnVal = nVal;
2681         }
2682       }
2683     }
2684 
2685     lsmMCursorClose(pCsr, 0);
2686   }
2687 
2688   return rc;
2689 }
2690 
multiCursorAllocTree(MultiCursor * pCsr)2691 static int multiCursorAllocTree(MultiCursor *pCsr){
2692   int rc = LSM_OK;
2693   if( pCsr->aTree==0 ){
2694     int nByte;                    /* Bytes of space to allocate */
2695     int nMin;                     /* Total number of cursors being merged */
2696 
2697     nMin = CURSOR_DATA_SEGMENT + pCsr->nPtr + (pCsr->pBtCsr!=0);
2698     pCsr->nTree = 2;
2699     while( pCsr->nTree<nMin ){
2700       pCsr->nTree = pCsr->nTree*2;
2701     }
2702 
2703     nByte = sizeof(int)*pCsr->nTree*2;
2704     pCsr->aTree = (int *)lsmMallocZeroRc(pCsr->pDb->pEnv, nByte, &rc);
2705   }
2706   return rc;
2707 }
2708 
multiCursorCacheKey(MultiCursor * pCsr,int * pRc)2709 static void multiCursorCacheKey(MultiCursor *pCsr, int *pRc){
2710   if( *pRc==LSM_OK ){
2711     void *pKey;
2712     int nKey;
2713     multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey);
2714     *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey);
2715   }
2716 }
2717 
2718 #ifdef LSM_DEBUG_EXPENSIVE
assertCursorTree(MultiCursor * pCsr)2719 static void assertCursorTree(MultiCursor *pCsr){
2720   int bRev = !!(pCsr->flags & CURSOR_PREV_OK);
2721   int *aSave = pCsr->aTree;
2722   int nSave = pCsr->nTree;
2723   int rc;
2724 
2725   pCsr->aTree = 0;
2726   pCsr->nTree = 0;
2727   rc = multiCursorAllocTree(pCsr);
2728   if( rc==LSM_OK ){
2729     int i;
2730     for(i=pCsr->nTree-1; i>0; i--){
2731       multiCursorDoCompare(pCsr, i, bRev);
2732     }
2733 
2734     assert( nSave==pCsr->nTree
2735         && 0==memcmp(aSave, pCsr->aTree, sizeof(int)*nSave)
2736     );
2737 
2738     lsmFree(pCsr->pDb->pEnv, pCsr->aTree);
2739   }
2740 
2741   pCsr->aTree = aSave;
2742   pCsr->nTree = nSave;
2743 }
2744 #else
2745 # define assertCursorTree(x)
2746 #endif
2747 
mcursorLocationOk(MultiCursor * pCsr,int bDeleteOk)2748 static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
2749   int eType = pCsr->eType;
2750   int iKey;
2751   int i;
2752   int rdmask;
2753 
2754   assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
2755   assertCursorTree(pCsr);
2756 
2757   rdmask = (pCsr->flags & CURSOR_NEXT_OK) ? LSM_END_DELETE : LSM_START_DELETE;
2758 
2759   /* If the cursor does not currently point to an actual database key (i.e.
2760   ** it points to a delete key, or the start or end of a range-delete), and
2761   ** the CURSOR_IGNORE_DELETE flag is set, skip past this entry.  */
2762   if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
2763     if( (eType & LSM_INSERT)==0 ) return 0;
2764   }
2765 
2766   /* If the cursor points to a system key (free-list entry), and the
2767   ** CURSOR_IGNORE_SYSTEM flag is set, skip thie entry.  */
2768   if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
2769     return 0;
2770   }
2771 
2772 #ifndef NDEBUG
2773   /* This block fires assert() statements to check one of the assumptions
2774   ** in the comment below - that if the lhs sub-cursor of a level undergoing
2775   ** a merge is valid, then all the rhs sub-cursors must be at EOF.
2776   **
2777   ** Also assert that all rhs sub-cursors are either at EOF or point to
2778   ** a key that is not less than the level split-key.  */
2779   for(i=0; i<pCsr->nPtr; i++){
2780     SegmentPtr *pPtr = &pCsr->aPtr[i];
2781     Level *pLvl = pPtr->pLevel;
2782     if( pLvl->nRight && pPtr->pPg ){
2783       if( pPtr->pSeg==&pLvl->lhs ){
2784         int j;
2785         for(j=0; j<pLvl->nRight; j++) assert( pPtr[j+1].pPg==0 );
2786       }else{
2787         int res = sortedKeyCompare(pCsr->pDb->xCmp,
2788             rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
2789             pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
2790         );
2791         assert( res>=0 );
2792       }
2793     }
2794   }
2795 #endif
2796 
2797   /* Now check if this key has already been deleted by a range-delete. If
2798   ** so, skip past it.
2799   **
2800   ** Assume, for the moment, that the tree contains no levels currently
2801   ** undergoing incremental merge, and that this cursor is iterating forwards
2802   ** through the database keys. The cursor currently points to a key in
2803   ** level L. This key has already been deleted if any of the sub-cursors
2804   ** that point to levels newer than L (or to the in-memory tree) point to
2805   ** a key greater than the current key with the LSM_END_DELETE flag set.
2806   **
2807   ** Or, if the cursor is iterating backwards through data keys, if any
2808   ** such sub-cursor points to a key smaller than the current key with the
2809   ** LSM_START_DELETE flag set.
2810   **
2811   ** Why it works with levels undergoing a merge too:
2812   **
2813   ** When a cursor iterates forwards, the sub-cursors for the rhs of a
2814   ** level are only activated once the lhs reaches EOF. So when iterating
2815   ** forwards, the keys visited are the same as if the level was completely
2816   ** merged.
2817   **
2818   ** If the cursor is iterating backwards, then the lhs sub-cursor is not
2819   ** initialized until the last of the rhs sub-cursors has reached EOF.
2820   ** Additionally, if the START_DELETE flag is set on the last entry (in
2821   ** reverse order - so the entry with the smallest key) of a rhs sub-cursor,
2822   ** then a pseudo-key equal to the levels split-key with the END_DELETE
2823   ** flag set is visited by the sub-cursor.
2824   */
2825   iKey = pCsr->aTree[1];
2826   for(i=0; i<iKey; i++){
2827     int csrflags;
2828     multiCursorGetKey(pCsr, i, &csrflags, 0, 0);
2829     if( (rdmask & csrflags) ){
2830       const int SD_ED = (LSM_START_DELETE|LSM_END_DELETE);
2831       if( (csrflags & SD_ED)==SD_ED
2832        || (pCsr->flags & CURSOR_IGNORE_DELETE)==0
2833       ){
2834         void *pKey; int nKey;
2835         multiCursorGetKey(pCsr, i, 0, &pKey, &nKey);
2836         if( 0==sortedKeyCompare(pCsr->pDb->xCmp,
2837               rtTopic(eType), pCsr->key.pData, pCsr->key.nData,
2838               rtTopic(csrflags), pKey, nKey
2839         )){
2840           continue;
2841         }
2842       }
2843       return 0;
2844     }
2845   }
2846 
2847   /* The current cursor position is one this cursor should visit. Return 1. */
2848   return 1;
2849 }
2850 
multiCursorSetupTree(MultiCursor * pCsr,int bRev)2851 static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
2852   int rc;
2853 
2854   rc = multiCursorAllocTree(pCsr);
2855   if( rc==LSM_OK ){
2856     int i;
2857     for(i=pCsr->nTree-1; i>0; i--){
2858       multiCursorDoCompare(pCsr, i, bRev);
2859     }
2860   }
2861 
2862   assertCursorTree(pCsr);
2863   multiCursorCacheKey(pCsr, &rc);
2864 
2865   if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){
2866     rc = multiCursorAdvance(pCsr, bRev);
2867   }
2868   return rc;
2869 }
2870 
2871 
multiCursorEnd(MultiCursor * pCsr,int bLast)2872 static int multiCursorEnd(MultiCursor *pCsr, int bLast){
2873   int rc = LSM_OK;
2874   int i;
2875 
2876   pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
2877   pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);
2878   pCsr->iFree = 0;
2879 
2880   /* Position the two in-memory tree cursors */
2881   for(i=0; rc==LSM_OK && i<2; i++){
2882     if( pCsr->apTreeCsr[i] ){
2883       rc = lsmTreeCursorEnd(pCsr->apTreeCsr[i], bLast);
2884     }
2885   }
2886 
2887   for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
2888     SegmentPtr *pPtr = &pCsr->aPtr[i];
2889     Level *pLvl = pPtr->pLevel;
2890     int iRhs;
2891     int bHit = 0;
2892 
2893     if( bLast ){
2894       for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
2895         rc = segmentPtrEnd(pCsr, &pPtr[iRhs+1], 1);
2896         if( pPtr[iRhs+1].pPg ) bHit = 1;
2897       }
2898       if( bHit==0 && rc==LSM_OK ){
2899         rc = segmentPtrEnd(pCsr, pPtr, 1);
2900       }else{
2901         segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
2902       }
2903     }else{
2904       int bLhs = (pPtr->pSeg==&pLvl->lhs);
2905       assert( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[0] );
2906 
2907       if( bLhs ){
2908         rc = segmentPtrEnd(pCsr, pPtr, 0);
2909         if( pPtr->pKey ) bHit = 1;
2910       }
2911       for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
2912         if( bHit ){
2913           segmentPtrReset(&pPtr[iRhs+1], LSM_SEGMENTPTR_FREE_THRESHOLD);
2914         }else{
2915           rc = sortedRhsFirst(pCsr, pLvl, &pPtr[iRhs+bLhs]);
2916         }
2917       }
2918     }
2919     i += pLvl->nRight;
2920   }
2921 
2922   /* And the b-tree cursor, if applicable */
2923   if( rc==LSM_OK && pCsr->pBtCsr ){
2924     assert( bLast==0 );
2925     rc = btreeCursorFirst(pCsr->pBtCsr);
2926   }
2927 
2928   if( rc==LSM_OK ){
2929     rc = multiCursorSetupTree(pCsr, bLast);
2930   }
2931 
2932   return rc;
2933 }
2934 
2935 
mcursorSave(MultiCursor * pCsr)2936 int mcursorSave(MultiCursor *pCsr){
2937   int rc = LSM_OK;
2938   if( pCsr->aTree ){
2939     int iTree = pCsr->aTree[1];
2940     if( iTree==CURSOR_DATA_TREE0 || iTree==CURSOR_DATA_TREE1 ){
2941       multiCursorCacheKey(pCsr, &rc);
2942     }
2943   }
2944   mcursorFreeComponents(pCsr);
2945   return rc;
2946 }
2947 
mcursorRestore(lsm_db * pDb,MultiCursor * pCsr)2948 int mcursorRestore(lsm_db *pDb, MultiCursor *pCsr){
2949   int rc;
2950   rc = multiCursorInit(pCsr, pDb->pClient);
2951   if( rc==LSM_OK && pCsr->key.pData ){
2952     rc = lsmMCursorSeek(pCsr,
2953          rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, +1
2954     );
2955   }
2956   return rc;
2957 }
2958 
lsmSaveCursors(lsm_db * pDb)2959 int lsmSaveCursors(lsm_db *pDb){
2960   int rc = LSM_OK;
2961   MultiCursor *pCsr;
2962 
2963   for(pCsr=pDb->pCsr; rc==LSM_OK && pCsr; pCsr=pCsr->pNext){
2964     rc = mcursorSave(pCsr);
2965   }
2966   return rc;
2967 }
2968 
lsmRestoreCursors(lsm_db * pDb)2969 int lsmRestoreCursors(lsm_db *pDb){
2970   int rc = LSM_OK;
2971   MultiCursor *pCsr;
2972 
2973   for(pCsr=pDb->pCsr; rc==LSM_OK && pCsr; pCsr=pCsr->pNext){
2974     rc = mcursorRestore(pDb, pCsr);
2975   }
2976   return rc;
2977 }
2978 
lsmMCursorFirst(MultiCursor * pCsr)2979 int lsmMCursorFirst(MultiCursor *pCsr){
2980   return multiCursorEnd(pCsr, 0);
2981 }
2982 
lsmMCursorLast(MultiCursor * pCsr)2983 int lsmMCursorLast(MultiCursor *pCsr){
2984   return multiCursorEnd(pCsr, 1);
2985 }
2986 
lsmMCursorDb(MultiCursor * pCsr)2987 lsm_db *lsmMCursorDb(MultiCursor *pCsr){
2988   return pCsr->pDb;
2989 }
2990 
lsmMCursorReset(MultiCursor * pCsr)2991 void lsmMCursorReset(MultiCursor *pCsr){
2992   int i;
2993   lsmTreeCursorReset(pCsr->apTreeCsr[0]);
2994   lsmTreeCursorReset(pCsr->apTreeCsr[1]);
2995   for(i=0; i<pCsr->nPtr; i++){
2996     segmentPtrReset(&pCsr->aPtr[i], LSM_SEGMENTPTR_FREE_THRESHOLD);
2997   }
2998   pCsr->key.nData = 0;
2999 }
3000 
treeCursorSeek(MultiCursor * pCsr,TreeCursor * pTreeCsr,void * pKey,int nKey,int eSeek,int * pbStop)3001 static int treeCursorSeek(
3002   MultiCursor *pCsr,
3003   TreeCursor *pTreeCsr,
3004   void *pKey, int nKey,
3005   int eSeek,
3006   int *pbStop
3007 ){
3008   int rc = LSM_OK;
3009   if( pTreeCsr ){
3010     int res = 0;
3011     lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &res);
3012     switch( eSeek ){
3013       case LSM_SEEK_EQ: {
3014         int eType = lsmTreeCursorFlags(pTreeCsr);
3015         if( (res<0 && (eType & LSM_START_DELETE))
3016          || (res>0 && (eType & LSM_END_DELETE))
3017          || (res==0 && (eType & LSM_POINT_DELETE))
3018         ){
3019           *pbStop = 1;
3020         }else if( res==0 && (eType & LSM_INSERT) ){
3021           lsm_env *pEnv = pCsr->pDb->pEnv;
3022           void *p; int n;         /* Key/value from tree-cursor */
3023           *pbStop = 1;
3024           pCsr->flags |= CURSOR_SEEK_EQ;
3025           rc = lsmTreeCursorKey(pTreeCsr, &pCsr->eType, &p, &n);
3026           if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->key, p, n);
3027           if( rc==LSM_OK ) rc = lsmTreeCursorValue(pTreeCsr, &p, &n);
3028           if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->val, p, n);
3029         }
3030         lsmTreeCursorReset(pTreeCsr);
3031         break;
3032       }
3033       case LSM_SEEK_GE:
3034         if( res<0 && lsmTreeCursorValid(pTreeCsr) ){
3035           lsmTreeCursorNext(pTreeCsr);
3036         }
3037         break;
3038       default:
3039         if( res>0 ){
3040           assert( lsmTreeCursorValid(pTreeCsr) );
3041           lsmTreeCursorPrev(pTreeCsr);
3042         }
3043         break;
3044     }
3045   }
3046   return rc;
3047 }
3048 
3049 
3050 /*
3051 ** Seek the cursor.
3052 */
lsmMCursorSeek(MultiCursor * pCsr,int iTopic,void * pKey,int nKey,int eSeek)3053 int lsmMCursorSeek(
3054   MultiCursor *pCsr,
3055   int iTopic,
3056   void *pKey, int nKey,
3057   int eSeek
3058 ){
3059   int eESeek = eSeek;             /* Effective eSeek parameter */
3060   int bStop = 0;                  /* Set to true to halt search operation */
3061   int rc = LSM_OK;                /* Return code */
3062   int iPtr = 0;                   /* Used to iterate through pCsr->aPtr[] */
3063   LsmPgno iPgno = 0;              /* FC pointer value */
3064 
3065   assert( pCsr->apTreeCsr[0]==0 || iTopic==0 );
3066   assert( pCsr->apTreeCsr[1]==0 || iTopic==0 );
3067 
3068   if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;
3069 
3070   assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
3071   assert( (pCsr->flags & CURSOR_FLUSH_FREELIST)==0 );
3072   assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );
3073 
3074   pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
3075   rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop);
3076   if( rc==LSM_OK && bStop==0 ){
3077     rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
3078   }
3079 
3080   /* Seek all segment pointers. */
3081   for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){
3082     SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
3083     assert( pPtr->pSeg==&pPtr->pLevel->lhs );
3084     rc = seekInLevel(pCsr, pPtr, eESeek, iTopic, pKey, nKey, &iPgno, &bStop);
3085     iPtr += pPtr->pLevel->nRight;
3086   }
3087 
3088   if( eSeek!=LSM_SEEK_EQ ){
3089     if( rc==LSM_OK ){
3090       rc = multiCursorAllocTree(pCsr);
3091     }
3092     if( rc==LSM_OK ){
3093       int i;
3094       for(i=pCsr->nTree-1; i>0; i--){
3095         multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
3096       }
3097       if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
3098       if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
3099     }
3100 
3101     multiCursorCacheKey(pCsr, &rc);
3102     if( rc==LSM_OK && eSeek!=LSM_SEEK_LEFAST && 0==mcursorLocationOk(pCsr, 0) ){
3103       switch( eESeek ){
3104         case LSM_SEEK_EQ:
3105           lsmMCursorReset(pCsr);
3106           break;
3107         case LSM_SEEK_GE:
3108           rc = lsmMCursorNext(pCsr);
3109           break;
3110         default:
3111           rc = lsmMCursorPrev(pCsr);
3112           break;
3113       }
3114     }
3115   }
3116 
3117   return rc;
3118 }
3119 
lsmMCursorValid(MultiCursor * pCsr)3120 int lsmMCursorValid(MultiCursor *pCsr){
3121   int res = 0;
3122   if( pCsr->flags & CURSOR_SEEK_EQ ){
3123     res = 1;
3124   }else if( pCsr->aTree ){
3125     int iKey = pCsr->aTree[1];
3126     if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
3127       res = lsmTreeCursorValid(pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]);
3128     }else{
3129       void *pKey;
3130       multiCursorGetKey(pCsr, iKey, 0, &pKey, 0);
3131       res = pKey!=0;
3132     }
3133   }
3134   return res;
3135 }
3136 
mcursorAdvanceOk(MultiCursor * pCsr,int bReverse,int * pRc)3137 static int mcursorAdvanceOk(
3138   MultiCursor *pCsr,
3139   int bReverse,
3140   int *pRc
3141 ){
3142   void *pNew;                     /* Pointer to buffer containing new key */
3143   int nNew;                       /* Size of buffer pNew in bytes */
3144   int eNewType;                   /* Type of new record */
3145 
3146   if( *pRc ) return 1;
3147 
3148   /* Check the current key value. If it is not greater than (if bReverse==0)
3149   ** or less than (if bReverse!=0) the key currently cached in pCsr->key,
3150   ** then the cursor has not yet been successfully advanced.
3151   */
3152   multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew);
3153   if( pNew ){
3154     int typemask = (pCsr->flags & CURSOR_IGNORE_DELETE) ? ~(0) : LSM_SYSTEMKEY;
3155     int res = sortedDbKeyCompare(pCsr,
3156       eNewType & typemask, pNew, nNew,
3157       pCsr->eType & typemask, pCsr->key.pData, pCsr->key.nData
3158     );
3159 
3160     if( (bReverse==0 && res<=0) || (bReverse!=0 && res>=0) ){
3161       return 0;
3162     }
3163 
3164     multiCursorCacheKey(pCsr, pRc);
3165     assert( pCsr->eType==eNewType );
3166 
3167     /* If this cursor is configured to skip deleted keys, and the current
3168     ** cursor points to a SORTED_DELETE entry, then the cursor has not been
3169     ** successfully advanced.
3170     **
3171     ** Similarly, if the cursor is configured to skip system keys and the
3172     ** current cursor points to a system key, it has not yet been advanced.
3173     */
3174     if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;
3175   }
3176   return 1;
3177 }
3178 
flCsrAdvance(MultiCursor * pCsr)3179 static void flCsrAdvance(MultiCursor *pCsr){
3180   assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
3181   if( pCsr->iFree % 2 ){
3182     pCsr->iFree++;
3183   }else{
3184     int nEntry = pCsr->pDb->pWorker->freelist.nEntry;
3185     FreelistEntry *aEntry = pCsr->pDb->pWorker->freelist.aEntry;
3186 
3187     int i = nEntry - 1 - (pCsr->iFree / 2);
3188 
3189     /* If the current entry is a delete and the "end-delete" key will not
3190     ** be attached to the next entry, increment iFree by 1 only. */
3191     if( aEntry[i].iId<0 ){
3192       while( 1 ){
3193         if( i==0 || aEntry[i-1].iBlk!=aEntry[i].iBlk-1 ){
3194           pCsr->iFree--;
3195           break;
3196         }
3197         if( aEntry[i-1].iId>=0 ) break;
3198         pCsr->iFree += 2;
3199         i--;
3200       }
3201     }
3202     pCsr->iFree += 2;
3203   }
3204 }
3205 
multiCursorAdvance(MultiCursor * pCsr,int bReverse)3206 static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
3207   int rc = LSM_OK;                /* Return Code */
3208   if( lsmMCursorValid(pCsr) ){
3209     do {
3210       int iKey = pCsr->aTree[1];
3211 
3212       assertCursorTree(pCsr);
3213 
3214       /* If this multi-cursor is advancing forwards, and the sub-cursor
3215       ** being advanced is the one that separator keys may be being read
3216       ** from, record the current absolute pointer value.  */
3217       if( pCsr->pPrevMergePtr ){
3218         if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
3219           assert( pCsr->pBtCsr );
3220           *pCsr->pPrevMergePtr = pCsr->pBtCsr->iPtr;
3221         }else if( pCsr->pBtCsr==0 && pCsr->nPtr>0
3222                && iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr-1)
3223         ){
3224           SegmentPtr *pPtr = &pCsr->aPtr[iKey-CURSOR_DATA_SEGMENT];
3225           *pCsr->pPrevMergePtr = pPtr->iPtr+pPtr->iPgPtr;
3226         }
3227       }
3228 
3229       if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
3230         TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
3231         if( bReverse ){
3232           rc = lsmTreeCursorPrev(pTreeCsr);
3233         }else{
3234           rc = lsmTreeCursorNext(pTreeCsr);
3235         }
3236       }else if( iKey==CURSOR_DATA_SYSTEM ){
3237         assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
3238         assert( bReverse==0 );
3239         flCsrAdvance(pCsr);
3240       }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
3241         assert( bReverse==0 && pCsr->pBtCsr );
3242         rc = btreeCursorNext(pCsr->pBtCsr);
3243       }else{
3244         rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
3245       }
3246       if( rc==LSM_OK ){
3247         int i;
3248         for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
3249           multiCursorDoCompare(pCsr, i, bReverse);
3250         }
3251         assertCursorTree(pCsr);
3252       }
3253     }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );
3254   }
3255   return rc;
3256 }
3257 
lsmMCursorNext(MultiCursor * pCsr)3258 int lsmMCursorNext(MultiCursor *pCsr){
3259   if( (pCsr->flags & CURSOR_NEXT_OK)==0 ) return LSM_MISUSE_BKPT;
3260   return multiCursorAdvance(pCsr, 0);
3261 }
3262 
lsmMCursorPrev(MultiCursor * pCsr)3263 int lsmMCursorPrev(MultiCursor *pCsr){
3264   if( (pCsr->flags & CURSOR_PREV_OK)==0 ) return LSM_MISUSE_BKPT;
3265   return multiCursorAdvance(pCsr, 1);
3266 }
3267 
lsmMCursorKey(MultiCursor * pCsr,void ** ppKey,int * pnKey)3268 int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){
3269   if( (pCsr->flags & CURSOR_SEEK_EQ) || pCsr->aTree==0 ){
3270     *pnKey = pCsr->key.nData;
3271     *ppKey = pCsr->key.pData;
3272   }else{
3273     int iKey = pCsr->aTree[1];
3274 
3275     if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
3276       TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
3277       lsmTreeCursorKey(pTreeCsr, 0, ppKey, pnKey);
3278     }else{
3279       int nKey;
3280 
3281 #ifndef NDEBUG
3282       void *pKey;
3283       int eType;
3284       multiCursorGetKey(pCsr, iKey, &eType, &pKey, &nKey);
3285       assert( eType==pCsr->eType );
3286       assert( nKey==pCsr->key.nData );
3287       assert( memcmp(pKey, pCsr->key.pData, nKey)==0 );
3288 #endif
3289 
3290       nKey = pCsr->key.nData;
3291       if( nKey==0 ){
3292         *ppKey = 0;
3293       }else{
3294         *ppKey = pCsr->key.pData;
3295       }
3296       *pnKey = nKey;
3297     }
3298   }
3299   return LSM_OK;
3300 }
3301 
3302 /*
3303 ** Compare the current key that cursor csr points to with pKey/nKey. Set
3304 ** *piRes to the result and return LSM_OK.
3305 */
lsm_csr_cmp(lsm_cursor * csr,const void * pKey,int nKey,int * piRes)3306 int lsm_csr_cmp(lsm_cursor *csr, const void *pKey, int nKey, int *piRes){
3307   MultiCursor *pCsr = (MultiCursor *)csr;
3308   void *pCsrkey; int nCsrkey;
3309   int rc;
3310   rc = lsmMCursorKey(pCsr, &pCsrkey, &nCsrkey);
3311   if( rc==LSM_OK ){
3312     int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
3313     *piRes = sortedKeyCompare(xCmp, 0, pCsrkey, nCsrkey, 0, (void *)pKey, nKey);
3314   }
3315   return rc;
3316 }
3317 
lsmMCursorValue(MultiCursor * pCsr,void ** ppVal,int * pnVal)3318 int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
3319   void *pVal;
3320   int nVal;
3321   int rc;
3322   if( (pCsr->flags & CURSOR_SEEK_EQ) || pCsr->aTree==0 ){
3323     rc = LSM_OK;
3324     nVal = pCsr->val.nData;
3325     pVal = pCsr->val.pData;
3326   }else{
3327 
3328     assert( pCsr->aTree );
3329     assert( mcursorLocationOk(pCsr, (pCsr->flags & CURSOR_IGNORE_DELETE)) );
3330 
3331     rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pVal, &nVal);
3332     if( pVal && rc==LSM_OK ){
3333       rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pVal, nVal);
3334       pVal = pCsr->val.pData;
3335     }
3336 
3337     if( rc!=LSM_OK ){
3338       pVal = 0;
3339       nVal = 0;
3340     }
3341   }
3342   *ppVal = pVal;
3343   *pnVal = nVal;
3344   return rc;
3345 }
3346 
lsmMCursorType(MultiCursor * pCsr,int * peType)3347 int lsmMCursorType(MultiCursor *pCsr, int *peType){
3348   assert( pCsr->aTree );
3349   multiCursorGetKey(pCsr, pCsr->aTree[1], peType, 0, 0);
3350   return LSM_OK;
3351 }
3352 
3353 /*
3354 ** Buffer aData[], size nData, is assumed to contain a valid b-tree
3355 ** hierarchy page image. Return the offset in aData[] of the next free
3356 ** byte in the data area (where a new cell may be written if there is
3357 ** space).
3358 */
mergeWorkerPageOffset(u8 * aData,int nData)3359 static int mergeWorkerPageOffset(u8 *aData, int nData){
3360   int nRec;
3361   int iOff;
3362   int nKey;
3363   int eType;
3364 
3365   nRec = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
3366   iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec-1)]);
3367   eType = aData[iOff++];
3368   assert( eType==0
3369        || eType==(LSM_SYSTEMKEY|LSM_SEPARATOR)
3370        || eType==(LSM_SEPARATOR)
3371   );
3372 
3373   iOff += lsmVarintGet32(&aData[iOff], &nKey);
3374   iOff += lsmVarintGet32(&aData[iOff], &nKey);
3375 
3376   return iOff + (eType ? nKey : 0);
3377 }
3378 
3379 /*
3380 ** Following a checkpoint operation, database pages that are part of the
3381 ** checkpointed state of the LSM are deemed read-only. This includes the
3382 ** right-most page of the b-tree hierarchy of any separators array under
3383 ** construction, and all pages between it and the b-tree root, inclusive.
3384 ** This is a problem, as when further pages are appended to the separators
3385 ** array, entries must be added to the indicated b-tree hierarchy pages.
3386 **
3387 ** This function copies all such b-tree pages to new locations, so that
3388 ** they can be modified as required.
3389 **
3390 ** The complication is that not all database pages are the same size - due
3391 ** to the way the file.c module works some (the first and last in each block)
3392 ** are 4 bytes smaller than the others.
3393 */
mergeWorkerMoveHierarchy(MergeWorker * pMW,int bSep)3394 static int mergeWorkerMoveHierarchy(
3395   MergeWorker *pMW,               /* Merge worker */
3396   int bSep                        /* True for separators run */
3397 ){
3398   lsm_db *pDb = pMW->pDb;         /* Database handle */
3399   int rc = LSM_OK;                /* Return code */
3400   int i;
3401   Page **apHier = pMW->hier.apHier;
3402   int nHier = pMW->hier.nHier;
3403 
3404   for(i=0; rc==LSM_OK && i<nHier; i++){
3405     Page *pNew = 0;
3406     rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &pNew);
3407     assert( rc==LSM_OK );
3408 
3409     if( rc==LSM_OK ){
3410       u8 *a1; int n1;
3411       u8 *a2; int n2;
3412 
3413       a1 = fsPageData(pNew, &n1);
3414       a2 = fsPageData(apHier[i], &n2);
3415 
3416       assert( n1==n2 || n1+4==n2 );
3417 
3418       if( n1==n2 ){
3419         memcpy(a1, a2, n2);
3420       }else{
3421         int nEntry = pageGetNRec(a2, n2);
3422         int iEof1 = SEGMENT_EOF(n1, nEntry);
3423         int iEof2 = SEGMENT_EOF(n2, nEntry);
3424 
3425         memcpy(a1, a2, iEof2 - 4);
3426         memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
3427       }
3428 
3429       lsmFsPageRelease(apHier[i]);
3430       apHier[i] = pNew;
3431 
3432 #if 0
3433       assert( n1==n2 || n1+4==n2 || n2+4==n1 );
3434       if( n1>=n2 ){
3435         /* If n1 (size of the new page) is equal to or greater than n2 (the
3436         ** size of the old page), then copy the data into the new page. If
3437         ** n1==n2, this could be done with a single memcpy(). However,
3438         ** since sometimes n1>n2, the page content and footer must be copied
3439         ** separately. */
3440         int nEntry = pageGetNRec(a2, n2);
3441         int iEof1 = SEGMENT_EOF(n1, nEntry);
3442         int iEof2 = SEGMENT_EOF(n2, nEntry);
3443         memcpy(a1, a2, iEof2);
3444         memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
3445         lsmFsPageRelease(apHier[i]);
3446         apHier[i] = pNew;
3447       }else{
3448         lsmPutU16(&a1[SEGMENT_FLAGS_OFFSET(n1)], SEGMENT_BTREE_FLAG);
3449         lsmPutU16(&a1[SEGMENT_NRECORD_OFFSET(n1)], 0);
3450         lsmPutU64(&a1[SEGMENT_POINTER_OFFSET(n1)], 0);
3451         i = i - 1;
3452         lsmFsPageRelease(pNew);
3453       }
3454 #endif
3455     }
3456   }
3457 
3458 #ifdef LSM_DEBUG
3459   if( rc==LSM_OK ){
3460     for(i=0; i<nHier; i++) assert( lsmFsPageWritable(apHier[i]) );
3461   }
3462 #endif
3463 
3464   return rc;
3465 }
3466 
3467 /*
3468 ** Allocate and populate the MergeWorker.apHier[] array.
3469 */
mergeWorkerLoadHierarchy(MergeWorker * pMW)3470 static int mergeWorkerLoadHierarchy(MergeWorker *pMW){
3471   int rc = LSM_OK;
3472   Segment *pSeg;
3473   Hierarchy *p;
3474 
3475   pSeg = &pMW->pLevel->lhs;
3476   p = &pMW->hier;
3477 
3478   if( p->apHier==0 && pSeg->iRoot!=0 ){
3479     FileSystem *pFS = pMW->pDb->pFS;
3480     lsm_env *pEnv = pMW->pDb->pEnv;
3481     Page **apHier = 0;
3482     int nHier = 0;
3483     int iPg = (int)pSeg->iRoot;
3484 
3485     do {
3486       Page *pPg = 0;
3487       u8 *aData;
3488       int nData;
3489       int flags;
3490 
3491       rc = lsmFsDbPageGet(pFS, pSeg, iPg, &pPg);
3492       if( rc!=LSM_OK ) break;
3493 
3494       aData = fsPageData(pPg, &nData);
3495       flags = pageGetFlags(aData, nData);
3496       if( flags&SEGMENT_BTREE_FLAG ){
3497         Page **apNew = (Page **)lsmRealloc(
3498             pEnv, apHier, sizeof(Page *)*(nHier+1)
3499         );
3500         if( apNew==0 ){
3501           rc = LSM_NOMEM_BKPT;
3502           break;
3503         }
3504         apHier = apNew;
3505         memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier);
3506         nHier++;
3507 
3508         apHier[0] = pPg;
3509         iPg = (int)pageGetPtr(aData, nData);
3510       }else{
3511         lsmFsPageRelease(pPg);
3512         break;
3513       }
3514     }while( 1 );
3515 
3516     if( rc==LSM_OK ){
3517       u8 *aData;
3518       int nData;
3519       aData = fsPageData(apHier[0], &nData);
3520       pMW->aSave[0].iPgno = pageGetPtr(aData, nData);
3521       p->nHier = nHier;
3522       p->apHier = apHier;
3523       rc = mergeWorkerMoveHierarchy(pMW, 0);
3524     }else{
3525       int i;
3526       for(i=0; i<nHier; i++){
3527         lsmFsPageRelease(apHier[i]);
3528       }
3529       lsmFree(pEnv, apHier);
3530     }
3531   }
3532 
3533   return rc;
3534 }
3535 
3536 /*
3537 ** B-tree pages use almost the same format as regular pages. The
3538 ** differences are:
3539 **
3540 **   1. The record format is (usually, see below) as follows:
3541 **
3542 **         + Type byte (always SORTED_SEPARATOR or SORTED_SYSTEM_SEPARATOR),
3543 **         + Absolute pointer value (varint),
3544 **         + Number of bytes in key (varint),
3545 **         + LsmBlob containing key data.
3546 **
3547 **   2. All pointer values are stored as absolute values (not offsets
3548 **      relative to the footer pointer value).
3549 **
3550 **   3. Each pointer that is part of a record points to a page that
3551 **      contains keys smaller than the records key (note: not "equal to or
3552 **      smaller than - smaller than").
3553 **
3554 **   4. The pointer in the page footer of a b-tree page points to a page
3555 **      that contains keys equal to or larger than the largest key on the
3556 **      b-tree page.
3557 **
3558 ** The reason for having the page footer pointer point to the right-child
3559 ** (instead of the left) is that doing things this way makes the
3560 ** mergeWorkerMoveHierarchy() operation less complicated (since the pointers
3561 ** that need to be updated are all stored as fixed-size integers within the
3562 ** page footer, not varints in page records).
3563 **
3564 ** Records may not span b-tree pages. If this function is called to add a
3565 ** record larger than (page-size / 4) bytes, then a pointer to the indexed
3566 ** array page that contains the main record is added to the b-tree instead.
3567 ** In this case the record format is:
3568 **
3569 **         + 0x00 byte (1 byte)
3570 **         + Absolute pointer value (varint),
3571 **         + Absolute page number of page containing key (varint).
3572 **
3573 ** See function seekInBtree() for the code that traverses b-tree pages.
3574 */
3575 
mergeWorkerBtreeWrite(MergeWorker * pMW,u8 eType,LsmPgno iPtr,LsmPgno iKeyPg,void * pKey,int nKey)3576 static int mergeWorkerBtreeWrite(
3577   MergeWorker *pMW,
3578   u8 eType,
3579   LsmPgno iPtr,
3580   LsmPgno iKeyPg,
3581   void *pKey,
3582   int nKey
3583 ){
3584   Hierarchy *p = &pMW->hier;
3585   lsm_db *pDb = pMW->pDb;         /* Database handle */
3586   int rc = LSM_OK;                /* Return Code */
3587   int iLevel;                     /* Level of b-tree hierachy to write to */
3588   int nData;                      /* Size of aData[] in bytes */
3589   u8 *aData;                      /* Page data for level iLevel */
3590   int iOff;                       /* Offset on b-tree page to write record to */
3591   int nRec;                       /* Initial number of records on b-tree page */
3592 
3593   /* iKeyPg should be zero for an ordinary b-tree key, or non-zero for an
3594   ** indirect key. The flags byte for an indirect key is 0x00.  */
3595   assert( (eType==0)==(iKeyPg!=0) );
3596 
3597   /* The MergeWorker.apHier[] array contains the right-most leaf of the b-tree
3598   ** hierarchy, the root node, and all nodes that lie on the path between.
3599   ** apHier[0] is the right-most leaf and apHier[pMW->nHier-1] is the current
3600   ** root page.
3601   **
3602   ** This loop searches for a node with enough space to store the key on,
3603   ** starting with the leaf and iterating up towards the root. When the loop
3604   ** exits, the key may be written to apHier[iLevel].  */
3605   for(iLevel=0; iLevel<=p->nHier; iLevel++){
3606     int nByte;                    /* Number of free bytes required */
3607 
3608     if( iLevel==p->nHier ){
3609       /* Extend the array and allocate a new root page. */
3610       Page **aNew;
3611       aNew = (Page **)lsmRealloc(
3612           pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1)
3613       );
3614       if( !aNew ){
3615         return LSM_NOMEM_BKPT;
3616       }
3617       p->apHier = aNew;
3618     }else{
3619       Page *pOld;
3620       int nFree;
3621 
3622       /* If the key will fit on this page, break out of the loop here.
3623       ** The new entry will be written to page apHier[iLevel]. */
3624       pOld = p->apHier[iLevel];
3625       assert( lsmFsPageWritable(pOld) );
3626       aData = fsPageData(pOld, &nData);
3627       if( eType==0 ){
3628         nByte = 2 + 1 + lsmVarintLen32((int)iPtr) + lsmVarintLen32((int)iKeyPg);
3629       }else{
3630         nByte = 2 + 1 + lsmVarintLen32((int)iPtr) + lsmVarintLen32(nKey) + nKey;
3631       }
3632       nRec = pageGetNRec(aData, nData);
3633       nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
3634       if( nByte<=nFree ) break;
3635 
3636       /* Otherwise, this page is full. Set the right-hand-child pointer
3637       ** to iPtr and release it.  */
3638       lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr);
3639       assert( lsmFsPageNumber(pOld)==0 );
3640       rc = lsmFsPagePersist(pOld);
3641       if( rc==LSM_OK ){
3642         iPtr = lsmFsPageNumber(pOld);
3643         lsmFsPageRelease(pOld);
3644       }
3645     }
3646 
3647     /* Allocate a new page for apHier[iLevel]. */
3648     p->apHier[iLevel] = 0;
3649     if( rc==LSM_OK ){
3650       rc = lsmFsSortedAppend(
3651           pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &p->apHier[iLevel]
3652       );
3653     }
3654     if( rc!=LSM_OK ) return rc;
3655 
3656     aData = fsPageData(p->apHier[iLevel], &nData);
3657     memset(aData, 0, nData);
3658     lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
3659     lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
3660 
3661     if( iLevel==p->nHier ){
3662       p->nHier++;
3663       break;
3664     }
3665   }
3666 
3667   /* Write the key into page apHier[iLevel]. */
3668   aData = fsPageData(p->apHier[iLevel], &nData);
3669   iOff = mergeWorkerPageOffset(aData, nData);
3670   nRec = pageGetNRec(aData, nData);
3671   lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
3672   lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
3673   if( eType==0 ){
3674     aData[iOff++] = 0x00;
3675     iOff += lsmVarintPut32(&aData[iOff], (int)iPtr);
3676     iOff += lsmVarintPut32(&aData[iOff], (int)iKeyPg);
3677   }else{
3678     aData[iOff++] = eType;
3679     iOff += lsmVarintPut32(&aData[iOff], (int)iPtr);
3680     iOff += lsmVarintPut32(&aData[iOff], nKey);
3681     memcpy(&aData[iOff], pKey, nKey);
3682   }
3683 
3684   return rc;
3685 }
3686 
mergeWorkerBtreeIndirect(MergeWorker * pMW)3687 static int mergeWorkerBtreeIndirect(MergeWorker *pMW){
3688   int rc = LSM_OK;
3689   if( pMW->iIndirect ){
3690     LsmPgno iKeyPg = pMW->aSave[1].iPgno;
3691     rc = mergeWorkerBtreeWrite(pMW, 0, pMW->iIndirect, iKeyPg, 0, 0);
3692     pMW->iIndirect = 0;
3693   }
3694   return rc;
3695 }
3696 
3697 /*
3698 ** Append the database key (iTopic/pKey/nKey) to the b-tree under
3699 ** construction. This key has not yet been written to a segment page.
3700 ** The pointer that will accompany the new key in the b-tree - that
3701 ** points to the completed segment page that contains keys smaller than
3702 ** (pKey/nKey) is currently stored in pMW->aSave[0].iPgno.
3703 */
mergeWorkerPushHierarchy(MergeWorker * pMW,int iTopic,void * pKey,int nKey)3704 static int mergeWorkerPushHierarchy(
3705   MergeWorker *pMW,               /* Merge worker object */
3706   int iTopic,                     /* Topic value for this key */
3707   void *pKey,                     /* Pointer to key buffer */
3708   int nKey                        /* Size of pKey buffer in bytes */
3709 ){
3710   int rc = LSM_OK;                /* Return Code */
3711   LsmPgno iPtr;                   /* Pointer value to accompany pKey/nKey */
3712 
3713   assert( pMW->aSave[0].bStore==0 );
3714   assert( pMW->aSave[1].bStore==0 );
3715   rc = mergeWorkerBtreeIndirect(pMW);
3716 
3717   /* Obtain the absolute pointer value to store along with the key in the
3718   ** page body. This pointer points to a page that contains keys that are
3719   ** smaller than pKey/nKey.  */
3720   iPtr = pMW->aSave[0].iPgno;
3721   assert( iPtr!=0 );
3722 
3723   /* Determine if the indirect format should be used. */
3724   if( (nKey*4 > lsmFsPageSize(pMW->pDb->pFS)) ){
3725     pMW->iIndirect = iPtr;
3726     pMW->aSave[1].bStore = 1;
3727   }else{
3728     rc = mergeWorkerBtreeWrite(
3729         pMW, (u8)(iTopic | LSM_SEPARATOR), iPtr, 0, pKey, nKey
3730     );
3731   }
3732 
3733   /* Ensure that the SortedRun.iRoot field is correct. */
3734   return rc;
3735 }
3736 
mergeWorkerFinishHierarchy(MergeWorker * pMW)3737 static int mergeWorkerFinishHierarchy(
3738   MergeWorker *pMW                /* Merge worker object */
3739 ){
3740   int i;                          /* Used to loop through apHier[] */
3741   int rc = LSM_OK;                /* Return code */
3742   LsmPgno iPtr;                   /* New right-hand-child pointer value */
3743 
3744   iPtr = pMW->aSave[0].iPgno;
3745   for(i=0; i<pMW->hier.nHier && rc==LSM_OK; i++){
3746     Page *pPg = pMW->hier.apHier[i];
3747     int nData;                    /* Size of aData[] in bytes */
3748     u8 *aData;                    /* Page data for pPg */
3749 
3750     aData = fsPageData(pPg, &nData);
3751     lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr);
3752 
3753     rc = lsmFsPagePersist(pPg);
3754     iPtr = lsmFsPageNumber(pPg);
3755     lsmFsPageRelease(pPg);
3756   }
3757 
3758   if( pMW->hier.nHier ){
3759     pMW->pLevel->lhs.iRoot = iPtr;
3760     lsmFree(pMW->pDb->pEnv, pMW->hier.apHier);
3761     pMW->hier.apHier = 0;
3762     pMW->hier.nHier = 0;
3763   }
3764 
3765   return rc;
3766 }
3767 
mergeWorkerAddPadding(MergeWorker * pMW)3768 static int mergeWorkerAddPadding(
3769   MergeWorker *pMW                /* Merge worker object */
3770 ){
3771   FileSystem *pFS = pMW->pDb->pFS;
3772   return lsmFsSortedPadding(pFS, pMW->pDb->pWorker, &pMW->pLevel->lhs);
3773 }
3774 
3775 /*
3776 ** Release all page references currently held by the merge-worker passed
3777 ** as the only argument. Unless an error has occurred, all pages have
3778 ** already been released.
3779 */
mergeWorkerReleaseAll(MergeWorker * pMW)3780 static void mergeWorkerReleaseAll(MergeWorker *pMW){
3781   int i;
3782   lsmFsPageRelease(pMW->pPage);
3783   pMW->pPage = 0;
3784 
3785   for(i=0; i<pMW->hier.nHier; i++){
3786     lsmFsPageRelease(pMW->hier.apHier[i]);
3787     pMW->hier.apHier[i] = 0;
3788   }
3789   lsmFree(pMW->pDb->pEnv, pMW->hier.apHier);
3790   pMW->hier.apHier = 0;
3791   pMW->hier.nHier = 0;
3792 }
3793 
keyszToSkip(FileSystem * pFS,int nKey)3794 static int keyszToSkip(FileSystem *pFS, int nKey){
3795   int nPgsz;                /* Nominal database page size */
3796   nPgsz = lsmFsPageSize(pFS);
3797   return LSM_MIN(((nKey * 4) / nPgsz), 3);
3798 }
3799 
3800 /*
3801 ** Release the reference to the current output page of merge-worker *pMW
3802 ** (reference pMW->pPage). Set the page number values in aSave[] as
3803 ** required (see comments above struct MergeWorker for details).
3804 */
mergeWorkerPersistAndRelease(MergeWorker * pMW)3805 static int mergeWorkerPersistAndRelease(MergeWorker *pMW){
3806   int rc;
3807   int i;
3808 
3809   assert( pMW->pPage || (pMW->aSave[0].bStore==0 && pMW->aSave[1].bStore==0) );
3810 
3811   /* Persist the page */
3812   rc = lsmFsPagePersist(pMW->pPage);
3813 
3814   /* If required, save the page number. */
3815   for(i=0; i<2; i++){
3816     if( pMW->aSave[i].bStore ){
3817       pMW->aSave[i].iPgno = lsmFsPageNumber(pMW->pPage);
3818       pMW->aSave[i].bStore = 0;
3819     }
3820   }
3821 
3822   /* Release the completed output page. */
3823   lsmFsPageRelease(pMW->pPage);
3824   pMW->pPage = 0;
3825   return rc;
3826 }
3827 
3828 /*
3829 ** Advance to the next page of an output run being populated by merge-worker
3830 ** pMW. The footer of the new page is initialized to indicate that it contains
3831 ** zero records. The flags field is cleared. The page footer pointer field
3832 ** is set to iFPtr.
3833 **
3834 ** If successful, LSM_OK is returned. Otherwise, an error code.
3835 */
mergeWorkerNextPage(MergeWorker * pMW,LsmPgno iFPtr)3836 static int mergeWorkerNextPage(
3837   MergeWorker *pMW,               /* Merge worker object to append page to */
3838   LsmPgno iFPtr                   /* Pointer value for footer of new page */
3839 ){
3840   int rc = LSM_OK;                /* Return code */
3841   Page *pNext = 0;                /* New page appended to run */
3842   lsm_db *pDb = pMW->pDb;         /* Database handle */
3843 
3844   rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 0, &pNext);
3845   assert( rc || pMW->pLevel->lhs.iFirst>0 || pMW->pDb->compress.xCompress );
3846 
3847   if( rc==LSM_OK ){
3848     u8 *aData;                    /* Data buffer belonging to page pNext */
3849     int nData;                    /* Size of aData[] in bytes */
3850 
3851     rc = mergeWorkerPersistAndRelease(pMW);
3852 
3853     pMW->pPage = pNext;
3854     pMW->pLevel->pMerge->iOutputOff = 0;
3855     aData = fsPageData(pNext, &nData);
3856     lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
3857     lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
3858     lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);
3859     pMW->nWork++;
3860   }
3861 
3862   return rc;
3863 }
3864 
3865 /*
3866 ** Write a blob of data into an output segment being populated by a
3867 ** merge-worker object. If argument bSep is true, write into the separators
3868 ** array. Otherwise, the main array.
3869 **
3870 ** This function is used to write the blobs of data for keys and values.
3871 */
mergeWorkerData(MergeWorker * pMW,int bSep,int iFPtr,u8 * aWrite,int nWrite)3872 static int mergeWorkerData(
3873   MergeWorker *pMW,               /* Merge worker object */
3874   int bSep,                       /* True to write to separators run */
3875   int iFPtr,                      /* Footer ptr for new pages */
3876   u8 *aWrite,                     /* Write data from this buffer */
3877   int nWrite                      /* Size of aWrite[] in bytes */
3878 ){
3879   int rc = LSM_OK;                /* Return code */
3880   int nRem = nWrite;              /* Number of bytes still to write */
3881 
3882   while( rc==LSM_OK && nRem>0 ){
3883     Merge *pMerge = pMW->pLevel->pMerge;
3884     int nCopy;                    /* Number of bytes to copy */
3885     u8 *aData;                    /* Pointer to buffer of current output page */
3886     int nData;                    /* Size of aData[] in bytes */
3887     int nRec;                     /* Number of records on current output page */
3888     int iOff;                     /* Offset in aData[] to write to */
3889 
3890     assert( lsmFsPageWritable(pMW->pPage) );
3891 
3892     aData = fsPageData(pMW->pPage, &nData);
3893     nRec = pageGetNRec(aData, nData);
3894     iOff = pMerge->iOutputOff;
3895     nCopy = LSM_MIN(nRem, SEGMENT_EOF(nData, nRec) - iOff);
3896 
3897     memcpy(&aData[iOff], &aWrite[nWrite-nRem], nCopy);
3898     nRem -= nCopy;
3899 
3900     if( nRem>0 ){
3901       rc = mergeWorkerNextPage(pMW, iFPtr);
3902     }else{
3903       pMerge->iOutputOff = iOff + nCopy;
3904     }
3905   }
3906 
3907   return rc;
3908 }
3909 
3910 
3911 /*
3912 ** The MergeWorker passed as the only argument is working to merge two or
3913 ** more existing segments together (not to flush an in-memory tree). It
3914 ** has not yet written the first key to the first page of the output.
3915 */
mergeWorkerFirstPage(MergeWorker * pMW)3916 static int mergeWorkerFirstPage(MergeWorker *pMW){
3917   int rc = LSM_OK;                /* Return code */
3918   Page *pPg = 0;                  /* First page of run pSeg */
3919   int iFPtr = 0;                  /* Pointer value read from footer of pPg */
3920   MultiCursor *pCsr = pMW->pCsr;
3921 
3922   assert( pMW->pPage==0 );
3923 
3924   if( pCsr->pBtCsr ){
3925     rc = LSM_OK;
3926     iFPtr = (int)pMW->pLevel->pNext->lhs.iFirst;
3927   }else if( pCsr->nPtr>0 ){
3928     Segment *pSeg;
3929     pSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg;
3930     rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg, pSeg->iFirst, &pPg);
3931     if( rc==LSM_OK ){
3932       u8 *aData;                    /* Buffer for page pPg */
3933       int nData;                    /* Size of aData[] in bytes */
3934       aData = fsPageData(pPg, &nData);
3935       iFPtr = (int)pageGetPtr(aData, nData);
3936       lsmFsPageRelease(pPg);
3937     }
3938   }
3939 
3940   if( rc==LSM_OK ){
3941     rc = mergeWorkerNextPage(pMW, iFPtr);
3942     if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr;
3943     pMW->aSave[0].bStore = 1;
3944   }
3945 
3946   return rc;
3947 }
3948 
mergeWorkerWrite(MergeWorker * pMW,int eType,void * pKey,int nKey,void * pVal,int nVal,int iPtr)3949 static int mergeWorkerWrite(
3950   MergeWorker *pMW,               /* Merge worker object to write into */
3951   int eType,                      /* One of SORTED_SEPARATOR, WRITE or DELETE */
3952   void *pKey, int nKey,           /* Key value */
3953   void *pVal, int nVal,           /* Value value */
3954   int iPtr                        /* Absolute value of page pointer, or 0 */
3955 ){
3956   int rc = LSM_OK;                /* Return code */
3957   Merge *pMerge;                  /* Persistent part of level merge state */
3958   int nHdr;                       /* Space required for this record header */
3959   Page *pPg;                      /* Page to write to */
3960   u8 *aData;                      /* Data buffer for page pWriter->pPage */
3961   int nData = 0;                  /* Size of buffer aData[] in bytes */
3962   int nRec = 0;                   /* Number of records on page pPg */
3963   int iFPtr = 0;                  /* Value of pointer in footer of pPg */
3964   int iRPtr = 0;                  /* Value of pointer written into record */
3965   int iOff = 0;                   /* Current write offset within page pPg */
3966   Segment *pSeg;                  /* Segment being written */
3967   int flags = 0;                  /* If != 0, flags value for page footer */
3968   int bFirst = 0;                 /* True for first key of output run */
3969 
3970   pMerge = pMW->pLevel->pMerge;
3971   pSeg = &pMW->pLevel->lhs;
3972 
3973   if( pSeg->iFirst==0 && pMW->pPage==0 ){
3974     rc = mergeWorkerFirstPage(pMW);
3975     bFirst = 1;
3976   }
3977   pPg = pMW->pPage;
3978   if( pPg ){
3979     aData = fsPageData(pPg, &nData);
3980     nRec = pageGetNRec(aData, nData);
3981     iFPtr = (int)pageGetPtr(aData, nData);
3982     iRPtr = iPtr - iFPtr;
3983   }
3984 
3985   /* Figure out how much space is required by the new record. The space
3986   ** required is divided into two sections: the header and the body. The
3987   ** header consists of the intial varint fields. The body are the blobs
3988   ** of data that correspond to the key and value data. The entire header
3989   ** must be stored on the page. The body may overflow onto the next and
3990   ** subsequent pages.
3991   **
3992   ** The header space is:
3993   **
3994   **     1) record type - 1 byte.
3995   **     2) Page-pointer-offset - 1 varint
3996   **     3) Key size - 1 varint
3997   **     4) Value size - 1 varint (only if LSM_INSERT flag is set)
3998   */
3999   if( rc==LSM_OK ){
4000     nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
4001     if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);
4002 
4003     /* If the entire header will not fit on page pPg, or if page pPg is
4004     ** marked read-only, advance to the next page of the output run. */
4005     iOff = pMerge->iOutputOff;
4006     if( iOff<0 || pPg==0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
4007       if( iOff>=0 && pPg ){
4008         /* Zero any free space on the page */
4009         assert( aData );
4010         memset(&aData[iOff], 0, SEGMENT_EOF(nData, nRec)-iOff);
4011       }
4012       iFPtr = (int)*pMW->pCsr->pPrevMergePtr;
4013       iRPtr = iPtr - iFPtr;
4014       iOff = 0;
4015       nRec = 0;
4016       rc = mergeWorkerNextPage(pMW, iFPtr);
4017       pPg = pMW->pPage;
4018     }
4019   }
4020 
4021   /* If this record header will be the first on the page, and the page is
4022   ** not the very first in the entire run, add a copy of the key to the
4023   ** b-tree hierarchy.
4024   */
4025   if( rc==LSM_OK && nRec==0 && bFirst==0 ){
4026     assert( pMerge->nSkip>=0 );
4027 
4028     if( pMerge->nSkip==0 ){
4029       rc = mergeWorkerPushHierarchy(pMW, rtTopic(eType), pKey, nKey);
4030       assert( pMW->aSave[0].bStore==0 );
4031       pMW->aSave[0].bStore = 1;
4032       pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey);
4033     }else{
4034       pMerge->nSkip--;
4035       flags = PGFTR_SKIP_THIS_FLAG;
4036     }
4037 
4038     if( pMerge->nSkip ) flags |= PGFTR_SKIP_NEXT_FLAG;
4039   }
4040 
4041   /* Update the output segment */
4042   if( rc==LSM_OK ){
4043     aData = fsPageData(pPg, &nData);
4044 
4045     /* Update the page footer. */
4046     lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
4047     lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
4048     if( flags ) lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], (u16)flags);
4049 
4050     /* Write the entry header into the current page. */
4051     aData[iOff++] = (u8)eType;                                           /* 1 */
4052     iOff += lsmVarintPut32(&aData[iOff], iRPtr);                         /* 2 */
4053     iOff += lsmVarintPut32(&aData[iOff], nKey);                          /* 3 */
4054     if( rtIsWrite(eType) ) iOff += lsmVarintPut32(&aData[iOff], nVal);   /* 4 */
4055     pMerge->iOutputOff = iOff;
4056 
4057     /* Write the key and data into the segment. */
4058     assert( iFPtr==pageGetPtr(aData, nData) );
4059     rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pKey, nKey);
4060     if( rc==LSM_OK && rtIsWrite(eType) ){
4061       if( rc==LSM_OK ){
4062         rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pVal, nVal);
4063       }
4064     }
4065   }
4066 
4067   return rc;
4068 }
4069 
4070 
4071 /*
4072 ** Free all resources allocated by mergeWorkerInit().
4073 */
mergeWorkerShutdown(MergeWorker * pMW,int * pRc)4074 static void mergeWorkerShutdown(MergeWorker *pMW, int *pRc){
4075   int i;                          /* Iterator variable */
4076   int rc = *pRc;
4077   MultiCursor *pCsr = pMW->pCsr;
4078 
4079   /* Unless the merge has finished, save the cursor position in the
4080   ** Merge.aInput[] array. See function mergeWorkerInit() for the
4081   ** code to restore a cursor position based on aInput[].  */
4082   if( rc==LSM_OK && pCsr ){
4083     Merge *pMerge = pMW->pLevel->pMerge;
4084     if( lsmMCursorValid(pCsr) ){
4085       int bBtree = (pCsr->pBtCsr!=0);
4086       int iPtr;
4087 
4088       /* pMerge->nInput==0 indicates that this is a FlushTree() operation. */
4089       assert( pMerge->nInput==0 || pMW->pLevel->nRight>0 );
4090       assert( pMerge->nInput==0 || pMerge->nInput==(pCsr->nPtr+bBtree) );
4091 
4092       for(i=0; i<(pMerge->nInput-bBtree); i++){
4093         SegmentPtr *pPtr = &pCsr->aPtr[i];
4094         if( pPtr->pPg ){
4095           pMerge->aInput[i].iPg = lsmFsPageNumber(pPtr->pPg);
4096           pMerge->aInput[i].iCell = pPtr->iCell;
4097         }else{
4098           pMerge->aInput[i].iPg = 0;
4099           pMerge->aInput[i].iCell = 0;
4100         }
4101       }
4102       if( bBtree && pMerge->nInput ){
4103         assert( i==pCsr->nPtr );
4104         btreeCursorPosition(pCsr->pBtCsr, &pMerge->aInput[i]);
4105       }
4106 
4107       /* Store the location of the split-key */
4108       iPtr = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
4109       if( iPtr<pCsr->nPtr ){
4110         pMerge->splitkey = pMerge->aInput[iPtr];
4111       }else{
4112         btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
4113       }
4114     }
4115 
4116     /* Zero any free space left on the final page. This helps with
4117     ** compression if using a compression hook. And prevents valgrind
4118     ** from complaining about uninitialized byte passed to write(). */
4119     if( pMW->pPage ){
4120       int nData;
4121       u8 *aData = fsPageData(pMW->pPage, &nData);
4122       int iOff = pMerge->iOutputOff;
4123       int iEof = SEGMENT_EOF(nData, pageGetNRec(aData, nData));
4124       memset(&aData[iOff], 0, iEof - iOff);
4125     }
4126 
4127     pMerge->iOutputOff = -1;
4128   }
4129 
4130   lsmMCursorClose(pCsr, 0);
4131 
4132   /* Persist and release the output page. */
4133   if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW);
4134   if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);
4135   if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);
4136   if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW);
4137   lsmFsFlushWaiting(pMW->pDb->pFS, &rc);
4138   mergeWorkerReleaseAll(pMW);
4139 
4140   lsmFree(pMW->pDb->pEnv, pMW->aGobble);
4141   pMW->aGobble = 0;
4142   pMW->pCsr = 0;
4143 
4144   *pRc = rc;
4145 }
4146 
4147 /*
4148 ** The cursor passed as the first argument is being used as the input for
4149 ** a merge operation. When this function is called, *piFlags contains the
4150 ** database entry flags for the current entry. The entry about to be written
4151 ** to the output.
4152 **
4153 ** Note that this function only has to work for cursors configured to
4154 ** iterate forwards (not backwards).
4155 */
mergeRangeDeletes(MultiCursor * pCsr,int * piVal,int * piFlags)4156 static void mergeRangeDeletes(MultiCursor *pCsr, int *piVal, int *piFlags){
4157   int f = *piFlags;
4158   int iKey = pCsr->aTree[1];
4159   int i;
4160 
4161   assert( pCsr->flags & CURSOR_NEXT_OK );
4162   if( pCsr->flags & CURSOR_IGNORE_DELETE ){
4163     /* The ignore-delete flag is set when the output of the merge will form
4164     ** the oldest level in the database. In this case there is no point in
4165     ** retaining any range-delete flags.  */
4166     assert( (f & LSM_POINT_DELETE)==0 );
4167     f &= ~(LSM_START_DELETE|LSM_END_DELETE);
4168   }else{
4169     for(i=0; i<(CURSOR_DATA_SEGMENT + pCsr->nPtr); i++){
4170       if( i!=iKey ){
4171         int eType;
4172         void *pKey;
4173         int nKey;
4174         int res;
4175         multiCursorGetKey(pCsr, i, &eType, &pKey, &nKey);
4176 
4177         if( pKey ){
4178           res = sortedKeyCompare(pCsr->pDb->xCmp,
4179               rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData,
4180               rtTopic(eType), pKey, nKey
4181           );
4182           assert( res<=0 );
4183           if( res==0 ){
4184             if( (f & (LSM_INSERT|LSM_POINT_DELETE))==0 ){
4185               if( eType & LSM_INSERT ){
4186                 f |= LSM_INSERT;
4187                 *piVal = i;
4188               }
4189               else if( eType & LSM_POINT_DELETE ){
4190                 f |= LSM_POINT_DELETE;
4191               }
4192             }
4193             f |= (eType & (LSM_END_DELETE|LSM_START_DELETE));
4194           }
4195 
4196           if( i>iKey && (eType & LSM_END_DELETE) && res<0 ){
4197             if( f & (LSM_INSERT|LSM_POINT_DELETE) ){
4198               f |= (LSM_END_DELETE|LSM_START_DELETE);
4199             }else{
4200               f = 0;
4201             }
4202             break;
4203           }
4204         }
4205       }
4206     }
4207 
4208     assert( (f & LSM_INSERT)==0 || (f & LSM_POINT_DELETE)==0 );
4209     if( (f & LSM_START_DELETE)
4210      && (f & LSM_END_DELETE)
4211      && (f & LSM_POINT_DELETE )
4212     ){
4213       f = 0;
4214     }
4215   }
4216 
4217   *piFlags = f;
4218 }
4219 
mergeWorkerStep(MergeWorker * pMW)4220 static int mergeWorkerStep(MergeWorker *pMW){
4221   lsm_db *pDb = pMW->pDb;       /* Database handle */
4222   MultiCursor *pCsr;            /* Cursor to read input data from */
4223   int rc = LSM_OK;              /* Return code */
4224   int eType;                    /* SORTED_SEPARATOR, WRITE or DELETE */
4225   void *pKey; int nKey;         /* Key */
4226   LsmPgno iPtr;
4227   int iVal;
4228 
4229   pCsr = pMW->pCsr;
4230 
4231   /* Pull the next record out of the source cursor. */
4232   lsmMCursorKey(pCsr, &pKey, &nKey);
4233   eType = pCsr->eType;
4234 
4235   /* Figure out if the output record may have a different pointer value
4236   ** than the previous. This is the case if the current key is identical to
4237   ** a key that appears in the lowest level run being merged. If so, set
4238   ** iPtr to the absolute pointer value. If not, leave iPtr set to zero,
4239   ** indicating that the output pointer value should be a copy of the pointer
4240   ** value written with the previous key.  */
4241   iPtr = (pCsr->pPrevMergePtr ? *pCsr->pPrevMergePtr : 0);
4242   if( pCsr->pBtCsr ){
4243     BtreeCursor *pBtCsr = pCsr->pBtCsr;
4244     if( pBtCsr->pKey ){
4245       int res = rtTopic(pBtCsr->eType) - rtTopic(eType);
4246       if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey);
4247       if( 0==res ) iPtr = pBtCsr->iPtr;
4248       assert( res>=0 );
4249     }
4250   }else if( pCsr->nPtr ){
4251     SegmentPtr *pPtr = &pCsr->aPtr[pCsr->nPtr-1];
4252     if( pPtr->pPg
4253      && 0==pDb->xCmp(pPtr->pKey, pPtr->nKey, pKey, nKey)
4254     ){
4255       iPtr = pPtr->iPtr+pPtr->iPgPtr;
4256     }
4257   }
4258 
4259   iVal = pCsr->aTree[1];
4260   mergeRangeDeletes(pCsr, &iVal, &eType);
4261 
4262   if( eType!=0 ){
4263     if( pMW->aGobble ){
4264       int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
4265       if( iGobble<pCsr->nPtr && iGobble>=0 ){
4266         SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
4267         if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
4268           pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
4269         }
4270       }
4271     }
4272 
4273     /* If this is a separator key and we know that the output pointer has not
4274     ** changed, there is no point in writing an output record. Otherwise,
4275     ** proceed. */
4276     if( rc==LSM_OK && (rtIsSeparator(eType)==0 || iPtr!=0) ){
4277       /* Write the record into the main run. */
4278       void *pVal; int nVal;
4279       rc = multiCursorGetVal(pCsr, iVal, &pVal, &nVal);
4280       if( pVal && rc==LSM_OK ){
4281         assert( nVal>=0 );
4282         rc = sortedBlobSet(pDb->pEnv, &pCsr->val, pVal, nVal);
4283         pVal = pCsr->val.pData;
4284       }
4285       if( rc==LSM_OK ){
4286         rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pVal, nVal, (int)iPtr);
4287       }
4288     }
4289   }
4290 
4291   /* Advance the cursor to the next input record (assuming one exists). */
4292   assert( lsmMCursorValid(pMW->pCsr) );
4293   if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);
4294 
4295   return rc;
4296 }
4297 
mergeWorkerDone(MergeWorker * pMW)4298 static int mergeWorkerDone(MergeWorker *pMW){
4299   return pMW->pCsr==0 || !lsmMCursorValid(pMW->pCsr);
4300 }
4301 
sortedFreeLevel(lsm_env * pEnv,Level * p)4302 static void sortedFreeLevel(lsm_env *pEnv, Level *p){
4303   if( p ){
4304     lsmFree(pEnv, p->pSplitKey);
4305     lsmFree(pEnv, p->pMerge);
4306     lsmFree(pEnv, p->aRhs);
4307     lsmFree(pEnv, p);
4308   }
4309 }
4310 
sortedInvokeWorkHook(lsm_db * pDb)4311 static void sortedInvokeWorkHook(lsm_db *pDb){
4312   if( pDb->xWork ){
4313     pDb->xWork(pDb, pDb->pWorkCtx);
4314   }
4315 }
4316 
sortedNewToplevel(lsm_db * pDb,int eTree,int * pnWrite)4317 static int sortedNewToplevel(
4318   lsm_db *pDb,                    /* Connection handle */
4319   int eTree,                      /* One of the TREE_XXX constants */
4320   int *pnWrite                    /* OUT: Number of database pages written */
4321 ){
4322   int rc = LSM_OK;                /* Return Code */
4323   MultiCursor *pCsr = 0;
4324   Level *pNext = 0;               /* The current top level */
4325   Level *pNew;                    /* The new level itself */
4326   Segment *pLinked = 0;           /* Delete separators from this segment */
4327   Level *pDel = 0;                /* Delete this entire level */
4328   int nWrite = 0;                 /* Number of database pages written */
4329   Freelist freelist;
4330 
4331   if( eTree!=TREE_NONE ){
4332     rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
4333   }
4334 
4335   assert( pDb->bUseFreelist==0 );
4336   pDb->pFreelist = &freelist;
4337   pDb->bUseFreelist = 1;
4338   memset(&freelist, 0, sizeof(freelist));
4339 
4340   /* Allocate the new level structure to write to. */
4341   pNext = lsmDbSnapshotLevel(pDb->pWorker);
4342   pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
4343   if( pNew ){
4344     pNew->pNext = pNext;
4345     lsmDbSnapshotSetLevel(pDb->pWorker, pNew);
4346   }
4347 
4348   /* Create a cursor to gather the data required by the new segment. The new
4349   ** segment contains everything in the tree and pointers to the next segment
4350   ** in the database (if any).  */
4351   pCsr = multiCursorNew(pDb, &rc);
4352   if( pCsr ){
4353     pCsr->pDb = pDb;
4354     rc = multiCursorVisitFreelist(pCsr);
4355     if( rc==LSM_OK ){
4356       rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
4357     }
4358     if( rc==LSM_OK && pNext && pNext->pMerge==0 ){
4359       if( (pNext->flags & LEVEL_FREELIST_ONLY) ){
4360         pDel = pNext;
4361         pCsr->aPtr = lsmMallocZeroRc(pDb->pEnv, sizeof(SegmentPtr), &rc);
4362         multiCursorAddOne(pCsr, pNext, &rc);
4363       }else if( eTree!=TREE_NONE && pNext->lhs.iRoot ){
4364         pLinked = &pNext->lhs;
4365         rc = btreeCursorNew(pDb, pLinked, &pCsr->pBtCsr);
4366       }
4367     }
4368 
4369     /* If this will be the only segment in the database, discard any delete
4370     ** markers present in the in-memory tree.  */
4371     if( pNext==0 ){
4372       multiCursorIgnoreDelete(pCsr);
4373     }
4374   }
4375 
4376   if( rc!=LSM_OK ){
4377     lsmMCursorClose(pCsr, 0);
4378   }else{
4379     LsmPgno iLeftPtr = 0;
4380     Merge merge;                  /* Merge object used to create new level */
4381     MergeWorker mergeworker;      /* MergeWorker object for the same purpose */
4382 
4383     memset(&merge, 0, sizeof(Merge));
4384     memset(&mergeworker, 0, sizeof(MergeWorker));
4385 
4386     pNew->pMerge = &merge;
4387     pNew->flags |= LEVEL_INCOMPLETE;
4388     mergeworker.pDb = pDb;
4389     mergeworker.pLevel = pNew;
4390     mergeworker.pCsr = pCsr;
4391     pCsr->pPrevMergePtr = &iLeftPtr;
4392 
4393     /* Mark the separators array for the new level as a "phantom". */
4394     mergeworker.bFlush = 1;
4395 
4396     /* Do the work to create the new merged segment on disk */
4397     if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
4398     while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
4399       rc = mergeWorkerStep(&mergeworker);
4400     }
4401     mergeWorkerShutdown(&mergeworker, &rc);
4402     assert( rc!=LSM_OK || mergeworker.nWork==0 || pNew->lhs.iFirst );
4403     if( rc==LSM_OK && pNew->lhs.iFirst ){
4404       rc = lsmFsSortedFinish(pDb->pFS, &pNew->lhs);
4405     }
4406     nWrite = mergeworker.nWork;
4407     pNew->flags &= ~LEVEL_INCOMPLETE;
4408     if( eTree==TREE_NONE ){
4409       pNew->flags |= LEVEL_FREELIST_ONLY;
4410     }
4411     pNew->pMerge = 0;
4412   }
4413 
4414   if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
4415     assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
4416     lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
4417     sortedFreeLevel(pDb->pEnv, pNew);
4418   }else{
4419     if( pLinked ){
4420       pLinked->iRoot = 0;
4421     }else if( pDel ){
4422       assert( pNew->pNext==pDel );
4423       pNew->pNext = pDel->pNext;
4424       lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
4425       sortedFreeLevel(pDb->pEnv, pDel);
4426     }
4427 
4428 #if LSM_LOG_STRUCTURE
4429     lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "new-toplevel");
4430 #endif
4431 
4432     if( freelist.nEntry ){
4433       Freelist *p = &pDb->pWorker->freelist;
4434       lsmFree(pDb->pEnv, p->aEntry);
4435       memcpy(p, &freelist, sizeof(freelist));
4436       freelist.aEntry = 0;
4437     }else{
4438       pDb->pWorker->freelist.nEntry = 0;
4439     }
4440 
4441     assertBtreeOk(pDb, &pNew->lhs);
4442     sortedInvokeWorkHook(pDb);
4443   }
4444 
4445   if( pnWrite ) *pnWrite = nWrite;
4446   pDb->pWorker->nWrite += nWrite;
4447   pDb->pFreelist = 0;
4448   pDb->bUseFreelist = 0;
4449   lsmFree(pDb->pEnv, freelist.aEntry);
4450   return rc;
4451 }
4452 
4453 /*
4454 ** The nMerge levels in the LSM beginning with pLevel consist of a
4455 ** left-hand-side segment only. Replace these levels with a single new
4456 ** level consisting of a new empty segment on the left-hand-side and the
4457 ** nMerge segments from the replaced levels on the right-hand-side.
4458 **
4459 ** Also, allocate and populate a Merge object and set Level.pMerge to
4460 ** point to it.
4461 */
sortedMergeSetup(lsm_db * pDb,Level * pLevel,int nMerge,Level ** ppNew)4462 static int sortedMergeSetup(
4463   lsm_db *pDb,                    /* Database handle */
4464   Level *pLevel,                  /* First level to merge */
4465   int nMerge,                     /* Merge this many levels together */
4466   Level **ppNew                   /* New, merged, level */
4467 ){
4468   int rc = LSM_OK;                /* Return Code */
4469   Level *pNew;                    /* New Level object */
4470   int bUseNext = 0;               /* True to link in next separators */
4471   Merge *pMerge;                  /* New Merge object */
4472   int nByte;                      /* Bytes of space allocated at pMerge */
4473 
4474 #ifdef LSM_DEBUG
4475   int iLevel;
4476   Level *pX = pLevel;
4477   for(iLevel=0; iLevel<nMerge; iLevel++){
4478     assert( pX->nRight==0 );
4479     pX = pX->pNext;
4480   }
4481 #endif
4482 
4483   /* Allocate the new Level object */
4484   pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
4485   if( pNew ){
4486     pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv,
4487                                         nMerge * sizeof(Segment), &rc);
4488   }
4489 
4490   /* Populate the new Level object */
4491   if( rc==LSM_OK ){
4492     Level *pNext = 0;             /* Level following pNew */
4493     int i;
4494     int bFreeOnly = 1;
4495     Level *pTopLevel;
4496     Level *p = pLevel;
4497     Level **pp;
4498     pNew->nRight = nMerge;
4499     pNew->iAge = pLevel->iAge+1;
4500     for(i=0; i<nMerge; i++){
4501       assert( p->nRight==0 );
4502       pNext = p->pNext;
4503       pNew->aRhs[i] = p->lhs;
4504       if( (p->flags & LEVEL_FREELIST_ONLY)==0 ) bFreeOnly = 0;
4505       sortedFreeLevel(pDb->pEnv, p);
4506       p = pNext;
4507     }
4508 
4509     if( bFreeOnly ) pNew->flags |= LEVEL_FREELIST_ONLY;
4510 
4511     /* Replace the old levels with the new. */
4512     pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
4513     pNew->pNext = p;
4514     for(pp=&pTopLevel; *pp!=pLevel; pp=&((*pp)->pNext));
4515     *pp = pNew;
4516     lsmDbSnapshotSetLevel(pDb->pWorker, pTopLevel);
4517 
4518     /* Determine whether or not the next separators will be linked in */
4519     if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot && pNext
4520      && (bFreeOnly==0 || (pNext->flags & LEVEL_FREELIST_ONLY))
4521     ){
4522       bUseNext = 1;
4523     }
4524   }
4525 
4526   /* Allocate the merge object */
4527   nByte = sizeof(Merge) + sizeof(MergeInput) * (nMerge + bUseNext);
4528   pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
4529   if( pMerge ){
4530     pMerge->aInput = (MergeInput *)&pMerge[1];
4531     pMerge->nInput = nMerge + bUseNext;
4532     pNew->pMerge = pMerge;
4533   }
4534 
4535   *ppNew = pNew;
4536   return rc;
4537 }
4538 
mergeWorkerInit(lsm_db * pDb,Level * pLevel,MergeWorker * pMW)4539 static int mergeWorkerInit(
4540   lsm_db *pDb,                    /* Db connection to do merge work */
4541   Level *pLevel,                  /* Level to work on merging */
4542   MergeWorker *pMW                /* Object to initialize */
4543 ){
4544   int rc = LSM_OK;                /* Return code */
4545   Merge *pMerge = pLevel->pMerge; /* Persistent part of merge state */
4546   MultiCursor *pCsr = 0;          /* Cursor opened for pMW */
4547   Level *pNext = pLevel->pNext;   /* Next level in LSM */
4548 
4549   assert( pDb->pWorker );
4550   assert( pLevel->pMerge );
4551   assert( pLevel->nRight>0 );
4552 
4553   memset(pMW, 0, sizeof(MergeWorker));
4554   pMW->pDb = pDb;
4555   pMW->pLevel = pLevel;
4556   pMW->aGobble = lsmMallocZeroRc(pDb->pEnv, sizeof(LsmPgno)*pLevel->nRight,&rc);
4557 
4558   /* Create a multi-cursor to read the data to write to the new
4559   ** segment. The new segment contains:
4560   **
4561   **   1. Records from LHS of each of the nMerge levels being merged.
4562   **   2. Separators from either the last level being merged, or the
4563   **      separators attached to the LHS of the following level, or neither.
4564   **
4565   ** If the new level is the lowest (oldest) in the db, discard any
4566   ** delete keys. Key annihilation.
4567   */
4568   pCsr = multiCursorNew(pDb, &rc);
4569   if( pCsr ){
4570     pCsr->flags |= CURSOR_NEXT_OK;
4571     rc = multiCursorAddRhs(pCsr, pLevel);
4572   }
4573   if( rc==LSM_OK && pMerge->nInput > pLevel->nRight ){
4574     rc = btreeCursorNew(pDb, &pNext->lhs, &pCsr->pBtCsr);
4575   }else if( pNext ){
4576     multiCursorReadSeparators(pCsr);
4577   }else{
4578     multiCursorIgnoreDelete(pCsr);
4579   }
4580 
4581   assert( rc!=LSM_OK || pMerge->nInput==(pCsr->nPtr+(pCsr->pBtCsr!=0)) );
4582   pMW->pCsr = pCsr;
4583 
4584   /* Load the b-tree hierarchy into memory. */
4585   if( rc==LSM_OK ) rc = mergeWorkerLoadHierarchy(pMW);
4586   if( rc==LSM_OK && pMW->hier.nHier==0 ){
4587     pMW->aSave[0].iPgno = pLevel->lhs.iFirst;
4588   }
4589 
4590   /* Position the cursor. */
4591   if( rc==LSM_OK ){
4592     pCsr->pPrevMergePtr = &pMerge->iCurrentPtr;
4593     if( pLevel->lhs.iFirst==0 ){
4594       /* The output array is still empty. So position the cursor at the very
4595       ** start of the input.  */
4596       rc = multiCursorEnd(pCsr, 0);
4597     }else{
4598       /* The output array is non-empty. Position the cursor based on the
4599       ** page/cell data saved in the Merge.aInput[] array.  */
4600       int i;
4601       for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
4602         MergeInput *pInput = &pMerge->aInput[i];
4603         if( pInput->iPg ){
4604           SegmentPtr *pPtr;
4605           assert( pCsr->aPtr[i].pPg==0 );
4606           pPtr = &pCsr->aPtr[i];
4607           rc = segmentPtrLoadPage(pDb->pFS, pPtr, (int)pInput->iPg);
4608           if( rc==LSM_OK && pPtr->nCell>0 ){
4609             rc = segmentPtrLoadCell(pPtr, pInput->iCell);
4610           }
4611         }
4612       }
4613 
4614       if( rc==LSM_OK && pCsr->pBtCsr ){
4615         int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
4616         assert( i==pCsr->nPtr );
4617         rc = btreeCursorRestore(pCsr->pBtCsr, xCmp, &pMerge->aInput[i]);
4618       }
4619 
4620       if( rc==LSM_OK ){
4621         rc = multiCursorSetupTree(pCsr, 0);
4622       }
4623     }
4624     pCsr->flags |= CURSOR_NEXT_OK;
4625   }
4626 
4627   return rc;
4628 }
4629 
sortedBtreeGobble(lsm_db * pDb,MultiCursor * pCsr,int iGobble)4630 static int sortedBtreeGobble(
4631   lsm_db *pDb,                    /* Worker connection */
4632   MultiCursor *pCsr,              /* Multi-cursor being used for a merge */
4633   int iGobble                     /* pCsr->aPtr[] entry to operate on */
4634 ){
4635   int rc = LSM_OK;
4636   if( rtTopic(pCsr->eType)==0 ){
4637     Segment *pSeg = pCsr->aPtr[iGobble].pSeg;
4638     LsmPgno *aPg;
4639     int nPg;
4640 
4641     /* Seek from the root of the b-tree to the segment leaf that may contain
4642     ** a key equal to the one multi-cursor currently points to. Record the
4643     ** page number of each b-tree page and the leaf. The segment may be
4644     ** gobbled up to (but not including) the first of these page numbers.
4645     */
4646     assert( pSeg->iRoot>0 );
4647     aPg = lsmMallocZeroRc(pDb->pEnv, sizeof(LsmPgno)*32, &rc);
4648     if( rc==LSM_OK ){
4649       rc = seekInBtree(pCsr, pSeg,
4650           rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, aPg, 0
4651       );
4652     }
4653 
4654     if( rc==LSM_OK ){
4655       for(nPg=0; aPg[nPg]; nPg++);
4656       lsmFsGobble(pDb, pSeg, aPg, nPg);
4657     }
4658 
4659     lsmFree(pDb->pEnv, aPg);
4660   }
4661   return rc;
4662 }
4663 
4664 /*
4665 ** Argument p points to a level of age N. Return the number of levels in
4666 ** the linked list starting at p that have age=N (always at least 1).
4667 */
sortedCountLevels(Level * p)4668 static int sortedCountLevels(Level *p){
4669   int iAge = p->iAge;
4670   int nRet = 0;
4671   do {
4672     nRet++;
4673     p = p->pNext;
4674   }while( p && p->iAge==iAge );
4675   return nRet;
4676 }
4677 
sortedSelectLevel(lsm_db * pDb,int nMerge,Level ** ppOut)4678 static int sortedSelectLevel(lsm_db *pDb, int nMerge, Level **ppOut){
4679   Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
4680   int rc = LSM_OK;
4681   Level *pLevel = 0;            /* Output value */
4682   Level *pBest = 0;             /* Best level to work on found so far */
4683   int nBest;                    /* Number of segments merged at pBest */
4684   Level *pThis = 0;             /* First in run of levels with age=iAge */
4685   int nThis = 0;                /* Number of levels starting at pThis */
4686 
4687   assert( nMerge>=1 );
4688   nBest = LSM_MAX(1, nMerge-1);
4689 
4690   /* Find the longest contiguous run of levels not currently undergoing a
4691   ** merge with the same age in the structure. Or the level being merged
4692   ** with the largest number of right-hand segments. Work on it. */
4693   for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
4694     if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
4695       nThis++;
4696     }else{
4697       if( nThis>nBest ){
4698         if( (pLevel->iAge!=pThis->iAge+1)
4699          || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge)
4700         ){
4701           pBest = pThis;
4702           nBest = nThis;
4703         }
4704       }
4705       if( pLevel->nRight ){
4706         if( pLevel->nRight>nBest ){
4707           nBest = pLevel->nRight;
4708           pBest = pLevel;
4709         }
4710         nThis = 0;
4711         pThis = 0;
4712       }else{
4713         pThis = pLevel;
4714         nThis = 1;
4715       }
4716     }
4717   }
4718   if( nThis>nBest ){
4719     assert( pThis );
4720     pBest = pThis;
4721     nBest = nThis;
4722   }
4723 
4724   if( pBest==0 && nMerge==1 ){
4725     int nFree = 0;
4726     int nUsr = 0;
4727     for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
4728       assert( !pLevel->nRight );
4729       if( pLevel->flags & LEVEL_FREELIST_ONLY ){
4730         nFree++;
4731       }else{
4732         nUsr++;
4733       }
4734     }
4735     if( nUsr>1 ){
4736       pBest = pTopLevel;
4737       nBest = nFree + nUsr;
4738     }
4739   }
4740 
4741   if( pBest ){
4742     if( pBest->nRight==0 ){
4743       rc = sortedMergeSetup(pDb, pBest, nBest, ppOut);
4744     }else{
4745       *ppOut = pBest;
4746     }
4747   }
4748 
4749   return rc;
4750 }
4751 
sortedDbIsFull(lsm_db * pDb)4752 static int sortedDbIsFull(lsm_db *pDb){
4753   Level *pTop = lsmDbSnapshotLevel(pDb->pWorker);
4754 
4755   if( lsmDatabaseFull(pDb) ) return 1;
4756   if( pTop && pTop->iAge==0
4757    && (pTop->nRight || sortedCountLevels(pTop)>=pDb->nMerge)
4758   ){
4759     return 1;
4760   }
4761   return 0;
4762 }
4763 
4764 typedef struct MoveBlockCtx MoveBlockCtx;
4765 struct MoveBlockCtx {
4766   int iSeen;                      /* Previous free block on list */
4767   int iFrom;                      /* Total number of blocks in file */
4768 };
4769 
moveBlockCb(void * pCtx,int iBlk,i64 iSnapshot)4770 static int moveBlockCb(void *pCtx, int iBlk, i64 iSnapshot){
4771   MoveBlockCtx *p = (MoveBlockCtx *)pCtx;
4772   assert( p->iFrom==0 );
4773   if( iBlk==(p->iSeen-1) ){
4774     p->iSeen = iBlk;
4775     return 0;
4776   }
4777   p->iFrom = p->iSeen-1;
4778   return 1;
4779 }
4780 
4781 /*
4782 ** This function is called to further compact a database for which all
4783 ** of the content has already been merged into a single segment. If
4784 ** possible, it moves the contents of a single block from the end of the
4785 ** file to a free-block that lies closer to the start of the file (allowing
4786 ** the file to be eventually truncated).
4787 */
sortedMoveBlock(lsm_db * pDb,int * pnWrite)4788 static int sortedMoveBlock(lsm_db *pDb, int *pnWrite){
4789   Snapshot *p = pDb->pWorker;
4790   Level *pLvl = lsmDbSnapshotLevel(p);
4791   int iFrom;                      /* Block to move */
4792   int iTo;                        /* Destination to move block to */
4793   int rc;                         /* Return code */
4794 
4795   MoveBlockCtx sCtx;
4796 
4797   assert( pLvl->pNext==0 && pLvl->nRight==0 );
4798   assert( p->redirect.n<=LSM_MAX_BLOCK_REDIRECTS );
4799 
4800   *pnWrite = 0;
4801 
4802   /* Check that the redirect array is not already full. If it is, return
4803   ** without moving any database content.  */
4804   if( p->redirect.n>=LSM_MAX_BLOCK_REDIRECTS ) return LSM_OK;
4805 
4806   /* Find the last block of content in the database file. Do this by
4807   ** traversing the free-list in reverse (descending block number) order.
4808   ** The first block not on the free list is the one that will be moved.
4809   ** Since the db consists of a single segment, there is no ambiguity as
4810   ** to which segment the block belongs to.  */
4811   sCtx.iSeen = p->nBlock+1;
4812   sCtx.iFrom = 0;
4813   rc = lsmWalkFreelist(pDb, 1, moveBlockCb, &sCtx);
4814   if( rc!=LSM_OK || sCtx.iFrom==0 ) return rc;
4815   iFrom = sCtx.iFrom;
4816 
4817   /* Find the first free block in the database, ignoring block 1. Block
4818   ** 1 is tricky as it is smaller than the other blocks.  */
4819   rc = lsmBlockAllocate(pDb, iFrom, &iTo);
4820   if( rc!=LSM_OK || iTo==0 ) return rc;
4821   assert( iTo!=1 && iTo<iFrom );
4822 
4823   rc = lsmFsMoveBlock(pDb->pFS, &pLvl->lhs, iTo, iFrom);
4824   if( rc==LSM_OK ){
4825     if( p->redirect.a==0 ){
4826       int nByte = sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS;
4827       p->redirect.a = lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
4828     }
4829     if( rc==LSM_OK ){
4830 
4831       /* Check if the block just moved was already redirected. */
4832       int i;
4833       for(i=0; i<p->redirect.n; i++){
4834         if( p->redirect.a[i].iTo==iFrom ) break;
4835       }
4836 
4837       if( i==p->redirect.n ){
4838         /* Block iFrom was not already redirected. Add a new array entry. */
4839         memmove(&p->redirect.a[1], &p->redirect.a[0],
4840             sizeof(struct RedirectEntry) * p->redirect.n
4841             );
4842         p->redirect.a[0].iFrom = iFrom;
4843         p->redirect.a[0].iTo = iTo;
4844         p->redirect.n++;
4845       }else{
4846         /* Block iFrom was already redirected. Overwrite existing entry. */
4847         p->redirect.a[i].iTo = iTo;
4848       }
4849 
4850       rc = lsmBlockFree(pDb, iFrom);
4851 
4852       *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
4853       pLvl->lhs.pRedirect = &p->redirect;
4854     }
4855   }
4856 
4857 #if LSM_LOG_STRUCTURE
4858   if( rc==LSM_OK ){
4859     char aBuf[64];
4860     sprintf(aBuf, "move-block %d/%d", p->redirect.n-1, LSM_MAX_BLOCK_REDIRECTS);
4861     lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, aBuf);
4862   }
4863 #endif
4864   return rc;
4865 }
4866 
4867 /*
4868 */
mergeInsertFreelistSegments(lsm_db * pDb,int nFree,MergeWorker * pMW)4869 static int mergeInsertFreelistSegments(
4870   lsm_db *pDb,
4871   int nFree,
4872   MergeWorker *pMW
4873 ){
4874   int rc = LSM_OK;
4875   if( nFree>0 ){
4876     MultiCursor *pCsr = pMW->pCsr;
4877     Level *pLvl = pMW->pLevel;
4878     SegmentPtr *aNew1;
4879     Segment *aNew2;
4880 
4881     Level *pIter;
4882     Level *pNext;
4883     int i = 0;
4884 
4885     aNew1 = (SegmentPtr *)lsmMallocZeroRc(
4886         pDb->pEnv, sizeof(SegmentPtr) * (pCsr->nPtr+nFree), &rc
4887     );
4888     if( rc ) return rc;
4889     memcpy(&aNew1[nFree], pCsr->aPtr, sizeof(SegmentPtr)*pCsr->nPtr);
4890     pCsr->nPtr += nFree;
4891     lsmFree(pDb->pEnv, pCsr->aTree);
4892     lsmFree(pDb->pEnv, pCsr->aPtr);
4893     pCsr->aTree = 0;
4894     pCsr->aPtr = aNew1;
4895 
4896     aNew2 = (Segment *)lsmMallocZeroRc(
4897         pDb->pEnv, sizeof(Segment) * (pLvl->nRight+nFree), &rc
4898     );
4899     if( rc ) return rc;
4900     memcpy(&aNew2[nFree], pLvl->aRhs, sizeof(Segment)*pLvl->nRight);
4901     pLvl->nRight += nFree;
4902     lsmFree(pDb->pEnv, pLvl->aRhs);
4903     pLvl->aRhs = aNew2;
4904 
4905     for(pIter=pDb->pWorker->pLevel; rc==LSM_OK && pIter!=pLvl; pIter=pNext){
4906       Segment *pSeg = &pLvl->aRhs[i];
4907       memcpy(pSeg, &pIter->lhs, sizeof(Segment));
4908 
4909       pCsr->aPtr[i].pSeg = pSeg;
4910       pCsr->aPtr[i].pLevel = pLvl;
4911       rc = segmentPtrEnd(pCsr, &pCsr->aPtr[i], 0);
4912 
4913       pDb->pWorker->pLevel = pNext = pIter->pNext;
4914       sortedFreeLevel(pDb->pEnv, pIter);
4915       i++;
4916     }
4917     assert( i==nFree );
4918     assert( rc!=LSM_OK || pDb->pWorker->pLevel==pLvl );
4919 
4920     for(i=nFree; i<pCsr->nPtr; i++){
4921       pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
4922     }
4923 
4924     lsmFree(pDb->pEnv, pMW->aGobble);
4925     pMW->aGobble = 0;
4926   }
4927   return rc;
4928 }
4929 
sortedWork(lsm_db * pDb,int nWork,int nMerge,int bFlush,int * pnWrite)4930 static int sortedWork(
4931   lsm_db *pDb,                    /* Database handle. Must be worker. */
4932   int nWork,                      /* Number of pages of work to do */
4933   int nMerge,                     /* Try to merge this many levels at once */
4934   int bFlush,                     /* Set if call is to make room for a flush */
4935   int *pnWrite                    /* OUT: Actual number of pages written */
4936 ){
4937   int rc = LSM_OK;                /* Return Code */
4938   int nRemaining = nWork;         /* Units of work to do before returning */
4939   Snapshot *pWorker = pDb->pWorker;
4940 
4941   assert( pWorker );
4942   if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;
4943 
4944   while( nRemaining>0 ){
4945     Level *pLevel = 0;
4946 
4947     /* Find a level to work on. */
4948     rc = sortedSelectLevel(pDb, nMerge, &pLevel);
4949     assert( rc==LSM_OK || pLevel==0 );
4950 
4951     if( pLevel==0 ){
4952       int nDone = 0;
4953       Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
4954       if( bFlush==0 && nMerge==1 && pTopLevel && pTopLevel->pNext==0 ){
4955         rc = sortedMoveBlock(pDb, &nDone);
4956       }
4957       nRemaining -= nDone;
4958 
4959       /* Could not find any work to do. Finished. */
4960       if( nDone==0 ) break;
4961     }else{
4962       int bSave = 0;
4963       Freelist freelist = {0, 0, 0};
4964       MergeWorker mergeworker;    /* State used to work on the level merge */
4965 
4966       assert( pDb->bIncrMerge==0 );
4967       assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );
4968 
4969       pDb->bIncrMerge = 1;
4970       rc = mergeWorkerInit(pDb, pLevel, &mergeworker);
4971       assert( mergeworker.nWork==0 );
4972 
4973       while( rc==LSM_OK
4974           && 0==mergeWorkerDone(&mergeworker)
4975           && (mergeworker.nWork<nRemaining || pDb->bUseFreelist)
4976       ){
4977         int eType = rtTopic(mergeworker.pCsr->eType);
4978         rc = mergeWorkerStep(&mergeworker);
4979 
4980         /* If the cursor now points at the first entry past the end of the
4981         ** user data (i.e. either to EOF or to the first free-list entry
4982         ** that will be added to the run), then check if it is possible to
4983         ** merge in any free-list entries that are either in-memory or in
4984         ** free-list-only blocks.  */
4985         if( rc==LSM_OK && nMerge==1 && eType==0
4986          && (rtTopic(mergeworker.pCsr->eType) || mergeWorkerDone(&mergeworker))
4987         ){
4988           int nFree = 0;          /* Number of free-list-only levels to merge */
4989           Level *pLvl;
4990           assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );
4991 
4992           /* Now check if all levels containing data newer than this one
4993           ** are single-segment free-list only levels. If so, they will be
4994           ** merged in now.  */
4995           for(pLvl=pDb->pWorker->pLevel;
4996               pLvl!=mergeworker.pLevel && (pLvl->flags & LEVEL_FREELIST_ONLY);
4997               pLvl=pLvl->pNext
4998           ){
4999             assert( pLvl->nRight==0 );
5000             nFree++;
5001           }
5002           if( pLvl==mergeworker.pLevel ){
5003 
5004             rc = mergeInsertFreelistSegments(pDb, nFree, &mergeworker);
5005             if( rc==LSM_OK ){
5006               rc = multiCursorVisitFreelist(mergeworker.pCsr);
5007             }
5008             if( rc==LSM_OK ){
5009               rc = multiCursorSetupTree(mergeworker.pCsr, 0);
5010               pDb->pFreelist = &freelist;
5011               pDb->bUseFreelist = 1;
5012             }
5013           }
5014         }
5015       }
5016       nRemaining -= LSM_MAX(mergeworker.nWork, 1);
5017 
5018       if( rc==LSM_OK ){
5019         /* Check if the merge operation is completely finished. If not,
5020         ** gobble up (declare eligible for recycling) any pages from rhs
5021         ** segments for which the content has been completely merged into
5022         ** the lhs of the level.  */
5023         if( mergeWorkerDone(&mergeworker)==0 ){
5024           int i;
5025           for(i=0; i<pLevel->nRight; i++){
5026             SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
5027             if( pGobble->pSeg->iRoot ){
5028               rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
5029             }else if( mergeworker.aGobble[i] ){
5030               lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
5031             }
5032           }
5033         }else{
5034           int i;
5035           int bEmpty;
5036           mergeWorkerShutdown(&mergeworker, &rc);
5037           bEmpty = (pLevel->lhs.iFirst==0);
5038 
5039           if( bEmpty==0 && rc==LSM_OK ){
5040             rc = lsmFsSortedFinish(pDb->pFS, &pLevel->lhs);
5041           }
5042 
5043           if( pDb->bUseFreelist ){
5044             Freelist *p = &pDb->pWorker->freelist;
5045             lsmFree(pDb->pEnv, p->aEntry);
5046             memcpy(p, &freelist, sizeof(freelist));
5047             pDb->bUseFreelist = 0;
5048             pDb->pFreelist = 0;
5049             bSave = 1;
5050           }
5051 
5052           for(i=0; i<pLevel->nRight; i++){
5053             lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
5054           }
5055 
5056           if( bEmpty ){
5057             /* If the new level is completely empty, remove it from the
5058             ** database snapshot. This can only happen if all input keys were
5059             ** annihilated. Since keys are only annihilated if the new level
5060             ** is the last in the linked list (contains the most ancient of
5061             ** database content), this guarantees that pLevel->pNext==0.  */
5062             Level *pTop;          /* Top level of worker snapshot */
5063             Level **pp;           /* Read/write iterator for Level.pNext list */
5064 
5065             assert( pLevel->pNext==0 );
5066 
5067             /* Remove the level from the worker snapshot. */
5068             pTop = lsmDbSnapshotLevel(pWorker);
5069             for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
5070             *pp = pLevel->pNext;
5071             lsmDbSnapshotSetLevel(pWorker, pTop);
5072 
5073             /* Free the Level structure. */
5074             sortedFreeLevel(pDb->pEnv, pLevel);
5075           }else{
5076 
5077             /* Free the separators of the next level, if required. */
5078             if( pLevel->pMerge->nInput > pLevel->nRight ){
5079               assert( pLevel->pNext->lhs.iRoot );
5080               pLevel->pNext->lhs.iRoot = 0;
5081             }
5082 
5083             /* Zero the right-hand-side of pLevel */
5084             lsmFree(pDb->pEnv, pLevel->aRhs);
5085             pLevel->nRight = 0;
5086             pLevel->aRhs = 0;
5087 
5088             /* Free the Merge object */
5089             lsmFree(pDb->pEnv, pLevel->pMerge);
5090             pLevel->pMerge = 0;
5091           }
5092 
5093           if( bSave && rc==LSM_OK ){
5094             pDb->bIncrMerge = 0;
5095             rc = lsmSaveWorker(pDb, 0);
5096           }
5097         }
5098       }
5099 
5100       /* Clean up the MergeWorker object initialized above. If no error
5101       ** has occurred, invoke the work-hook to inform the application that
5102       ** the database structure has changed. */
5103       mergeWorkerShutdown(&mergeworker, &rc);
5104       pDb->bIncrMerge = 0;
5105       if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);
5106 
5107 #if LSM_LOG_STRUCTURE
5108       lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "work");
5109 #endif
5110       assertBtreeOk(pDb, &pLevel->lhs);
5111       assertRunInOrder(pDb, &pLevel->lhs);
5112 
5113       /* If bFlush is true and the database is no longer considered "full",
5114       ** break out of the loop even if nRemaining is still greater than
5115       ** zero. The caller has an in-memory tree to flush to disk.  */
5116       if( bFlush && sortedDbIsFull(pDb)==0 ) break;
5117     }
5118   }
5119 
5120   if( pnWrite ) *pnWrite = (nWork - nRemaining);
5121   pWorker->nWrite += (nWork - nRemaining);
5122 
5123 #ifdef LSM_LOG_WORK
5124   lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
5125 #endif
5126   return rc;
5127 }
5128 
5129 /*
5130 ** The database connection passed as the first argument must be a worker
5131 ** connection. This function checks if there exists an "old" in-memory tree
5132 ** ready to be flushed to disk. If so, true is returned. Otherwise false.
5133 **
5134 ** If an error occurs, *pRc is set to an LSM error code before returning.
5135 ** It is assumed that *pRc is set to LSM_OK when this function is called.
5136 */
sortedTreeHasOld(lsm_db * pDb,int * pRc)5137 static int sortedTreeHasOld(lsm_db *pDb, int *pRc){
5138   int rc = LSM_OK;
5139   int bRet = 0;
5140 
5141   assert( pDb->pWorker );
5142   if( *pRc==LSM_OK ){
5143     if( rc==LSM_OK
5144         && pDb->treehdr.iOldShmid
5145         && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff
5146       ){
5147       bRet = 1;
5148     }else{
5149       bRet = 0;
5150     }
5151     *pRc = rc;
5152   }
5153   assert( *pRc==LSM_OK || bRet==0 );
5154   return bRet;
5155 }
5156 
5157 /*
5158 ** Create a new free-list only top-level segment. Return LSM_OK if successful
5159 ** or an LSM error code if some error occurs.
5160 */
sortedNewFreelistOnly(lsm_db * pDb)5161 static int sortedNewFreelistOnly(lsm_db *pDb){
5162   return sortedNewToplevel(pDb, TREE_NONE, 0);
5163 }
5164 
lsmSaveWorker(lsm_db * pDb,int bFlush)5165 int lsmSaveWorker(lsm_db *pDb, int bFlush){
5166   Snapshot *p = pDb->pWorker;
5167   if( p->freelist.nEntry>pDb->nMaxFreelist ){
5168     int rc = sortedNewFreelistOnly(pDb);
5169     if( rc!=LSM_OK ) return rc;
5170   }
5171   return lsmCheckpointSaveWorker(pDb, bFlush);
5172 }
5173 
doLsmSingleWork(lsm_db * pDb,int bShutdown,int nMerge,int nPage,int * pnWrite,int * pbCkpt)5174 static int doLsmSingleWork(
5175   lsm_db *pDb,
5176   int bShutdown,
5177   int nMerge,                     /* Minimum segments to merge together */
5178   int nPage,                      /* Number of pages to write to disk */
5179   int *pnWrite,                   /* OUT: Pages actually written to disk */
5180   int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
5181 ){
5182   Snapshot *pWorker;              /* Worker snapshot */
5183   int rc = LSM_OK;                /* Return code */
5184   int bDirty = 0;
5185   int nMax = nPage;               /* Maximum pages to write to disk */
5186   int nRem = nPage;
5187   int bCkpt = 0;
5188 
5189   assert( nPage>0 );
5190 
5191   /* Open the worker 'transaction'. It will be closed before this function
5192   ** returns.  */
5193   assert( pDb->pWorker==0 );
5194   rc = lsmBeginWork(pDb);
5195   if( rc!=LSM_OK ) return rc;
5196   pWorker = pDb->pWorker;
5197 
5198   /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
5199   ** that this call stops writing when the auto-checkpoint is due. The
5200   ** caller will do the checkpoint, then possibly call this function again. */
5201   if( bShutdown==0 && pDb->nAutockpt ){
5202     u32 nSync;
5203     u32 nUnsync;
5204     int nPgsz;
5205 
5206     lsmCheckpointSynced(pDb, 0, 0, &nSync);
5207     nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
5208     nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);
5209 
5210     nMax = (int)LSM_MIN(nMax, (pDb->nAutockpt/nPgsz) - (int)(nUnsync-nSync));
5211     if( nMax<nRem ){
5212       bCkpt = 1;
5213       nRem = LSM_MAX(nMax, 0);
5214     }
5215   }
5216 
5217   /* If there exists in-memory data ready to be flushed to disk, attempt
5218   ** to flush it now.  */
5219   if( pDb->nTransOpen==0 ){
5220     rc = lsmTreeLoadHeader(pDb, 0);
5221   }
5222   if( sortedTreeHasOld(pDb, &rc) ){
5223     /* sortedDbIsFull() returns non-zero if either (a) there are too many
5224     ** levels in total in the db, or (b) there are too many levels with the
5225     ** the same age in the db. Either way, call sortedWork() to merge
5226     ** existing segments together until this condition is cleared.  */
5227     if( sortedDbIsFull(pDb) ){
5228       int nPg = 0;
5229       rc = sortedWork(pDb, nRem, nMerge, 1, &nPg);
5230       nRem -= nPg;
5231       assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
5232       bDirty = 1;
5233     }
5234 
5235     if( rc==LSM_OK && nRem>0 ){
5236       int nPg = 0;
5237       rc = sortedNewToplevel(pDb, TREE_OLD, &nPg);
5238       nRem -= nPg;
5239       if( rc==LSM_OK ){
5240         if( pDb->nTransOpen>0 ){
5241           lsmTreeDiscardOld(pDb);
5242         }
5243         rc = lsmSaveWorker(pDb, 1);
5244         bDirty = 0;
5245       }
5246     }
5247   }
5248 
5249   /* If nPage is still greater than zero, do some merging. */
5250   if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
5251     int nPg = 0;
5252     rc = sortedWork(pDb, nRem, nMerge, 0, &nPg);
5253     nRem -= nPg;
5254     if( nPg ) bDirty = 1;
5255   }
5256 
5257   /* If the in-memory part of the free-list is too large, write a new
5258   ** top-level containing just the in-memory free-list entries to disk. */
5259   if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
5260     while( rc==LSM_OK && lsmDatabaseFull(pDb) ){
5261       int nPg = 0;
5262       rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
5263       nRem -= nPg;
5264     }
5265     if( rc==LSM_OK ){
5266       rc = sortedNewFreelistOnly(pDb);
5267     }
5268     bDirty = 1;
5269   }
5270 
5271   if( rc==LSM_OK ){
5272     *pnWrite = (nMax - nRem);
5273     *pbCkpt = (bCkpt && nRem<=0);
5274     if( nMerge==1 && pDb->nAutockpt>0 && *pnWrite>0
5275      && pWorker->pLevel
5276      && pWorker->pLevel->nRight==0
5277      && pWorker->pLevel->pNext==0
5278     ){
5279       *pbCkpt = 1;
5280     }
5281   }
5282 
5283   if( rc==LSM_OK && bDirty ){
5284     lsmFinishWork(pDb, 0, &rc);
5285   }else{
5286     int rcdummy = LSM_BUSY;
5287     lsmFinishWork(pDb, 0, &rcdummy);
5288     *pnWrite = 0;
5289   }
5290   assert( pDb->pWorker==0 );
5291   return rc;
5292 }
5293 
doLsmWork(lsm_db * pDb,int nMerge,int nPage,int * pnWrite)5294 static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
5295   int rc = LSM_OK;                /* Return code */
5296   int nWrite = 0;                 /* Number of pages written */
5297 
5298   assert( nMerge>=1 );
5299 
5300   if( nPage!=0 ){
5301     int bCkpt = 0;
5302     do {
5303       int nThis = 0;
5304       int nReq = (nPage>=0) ? (nPage-nWrite) : ((int)0x7FFFFFFF);
5305 
5306       bCkpt = 0;
5307       rc = doLsmSingleWork(pDb, 0, nMerge, nReq, &nThis, &bCkpt);
5308       nWrite += nThis;
5309       if( rc==LSM_OK && bCkpt ){
5310         rc = lsm_checkpoint(pDb, 0);
5311       }
5312     }while( rc==LSM_OK && bCkpt && (nWrite<nPage || nPage<0) );
5313   }
5314 
5315   if( pnWrite ){
5316     if( rc==LSM_OK ){
5317       *pnWrite = nWrite;
5318     }else{
5319       *pnWrite = 0;
5320     }
5321   }
5322   return rc;
5323 }
5324 
5325 /*
5326 ** Perform work to merge database segments together.
5327 */
lsm_work(lsm_db * pDb,int nMerge,int nKB,int * pnWrite)5328 int lsm_work(lsm_db *pDb, int nMerge, int nKB, int *pnWrite){
5329   int rc;                         /* Return code */
5330   int nPgsz;                      /* Nominal page size in bytes */
5331   int nPage;                      /* Equivalent of nKB in pages */
5332   int nWrite = 0;                 /* Number of pages written */
5333 
5334   /* This function may not be called if pDb has an open read or write
5335   ** transaction. Return LSM_MISUSE if an application attempts this.  */
5336   if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;
5337   if( nMerge<=0 ) nMerge = pDb->nMerge;
5338 
5339   lsmFsPurgeCache(pDb->pFS);
5340 
5341   /* Convert from KB to pages */
5342   nPgsz = lsmFsPageSize(pDb->pFS);
5343   if( nKB>=0 ){
5344     nPage = ((i64)nKB * 1024 + nPgsz - 1) / nPgsz;
5345   }else{
5346     nPage = -1;
5347   }
5348 
5349   rc = doLsmWork(pDb, nMerge, nPage, &nWrite);
5350 
5351   if( pnWrite ){
5352     /* Convert back from pages to KB */
5353     *pnWrite = (int)(((i64)nWrite * 1024 + nPgsz - 1) / nPgsz);
5354   }
5355   return rc;
5356 }
5357 
lsm_flush(lsm_db * db)5358 int lsm_flush(lsm_db *db){
5359   int rc;
5360 
5361   if( db->nTransOpen>0 || db->pCsr ){
5362     rc = LSM_MISUSE_BKPT;
5363   }else{
5364     rc = lsmBeginWriteTrans(db);
5365     if( rc==LSM_OK ){
5366       lsmFlushTreeToDisk(db);
5367       lsmTreeDiscardOld(db);
5368       lsmTreeMakeOld(db);
5369       lsmTreeDiscardOld(db);
5370     }
5371 
5372     if( rc==LSM_OK ){
5373       rc = lsmFinishWriteTrans(db, 1);
5374     }else{
5375       lsmFinishWriteTrans(db, 0);
5376     }
5377     lsmFinishReadTrans(db);
5378   }
5379 
5380   return rc;
5381 }
5382 
5383 /*
5384 ** This function is called in auto-work mode to perform merging work on
5385 ** the data structure. It performs enough merging work to prevent the
5386 ** height of the tree from growing indefinitely assuming that roughly
5387 ** nUnit database pages worth of data have been written to the database
5388 ** (i.e. the in-memory tree) since the last call.
5389 */
lsmSortedAutoWork(lsm_db * pDb,int nUnit)5390 int lsmSortedAutoWork(
5391   lsm_db *pDb,                    /* Database handle */
5392   int nUnit                       /* Pages of data written to in-memory tree */
5393 ){
5394   int rc = LSM_OK;                /* Return code */
5395   int nDepth = 0;                 /* Current height of tree (longest path) */
5396   Level *pLevel;                  /* Used to iterate through levels */
5397   int bRestore = 0;
5398 
5399   assert( pDb->pWorker==0 );
5400   assert( pDb->nTransOpen>0 );
5401 
5402   /* Determine how many units of work to do before returning. One unit of
5403   ** work is achieved by writing one page (~4KB) of merged data.  */
5404   for(pLevel=lsmDbSnapshotLevel(pDb->pClient); pLevel; pLevel=pLevel->pNext){
5405     /* nDepth += LSM_MAX(1, pLevel->nRight); */
5406     nDepth += 1;
5407   }
5408   if( lsmTreeHasOld(pDb) ){
5409     nDepth += 1;
5410     bRestore = 1;
5411     rc = lsmSaveCursors(pDb);
5412     if( rc!=LSM_OK ) return rc;
5413   }
5414 
5415   if( nDepth>0 ){
5416     int nRemaining;               /* Units of work to do before returning */
5417 
5418     nRemaining = nUnit * nDepth;
5419 #ifdef LSM_LOG_WORK
5420     lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages",
5421         nUnit, nDepth, nRemaining);
5422 #endif
5423     assert( nRemaining>=0 );
5424     rc = doLsmWork(pDb, pDb->nMerge, nRemaining, 0);
5425     if( rc==LSM_BUSY ) rc = LSM_OK;
5426 
5427     if( bRestore && pDb->pCsr ){
5428       lsmMCursorFreeCache(pDb);
5429       lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
5430       pDb->pClient = 0;
5431       if( rc==LSM_OK ){
5432         rc = lsmCheckpointLoad(pDb, 0);
5433       }
5434       if( rc==LSM_OK ){
5435         rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
5436       }
5437       if( rc==LSM_OK ){
5438         rc = lsmRestoreCursors(pDb);
5439       }
5440     }
5441   }
5442 
5443   return rc;
5444 }
5445 
5446 /*
5447 ** This function is only called during system shutdown. The contents of
5448 ** any in-memory trees present (old or current) are written out to disk.
5449 */
lsmFlushTreeToDisk(lsm_db * pDb)5450 int lsmFlushTreeToDisk(lsm_db *pDb){
5451   int rc;
5452 
5453   rc = lsmBeginWork(pDb);
5454   while( rc==LSM_OK && sortedDbIsFull(pDb) ){
5455     rc = sortedWork(pDb, 256, pDb->nMerge, 1, 0);
5456   }
5457 
5458   if( rc==LSM_OK ){
5459     rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
5460   }
5461 
5462   lsmFinishWork(pDb, 1, &rc);
5463   return rc;
5464 }
5465 
5466 /*
5467 ** Return a string representation of the segment passed as the only argument.
5468 ** Space for the returned string is allocated using lsmMalloc(), and should
5469 ** be freed by the caller using lsmFree().
5470 */
segToString(lsm_env * pEnv,Segment * pSeg,int nMin)5471 static char *segToString(lsm_env *pEnv, Segment *pSeg, int nMin){
5472   int nSize = pSeg->nSize;
5473   LsmPgno iRoot = pSeg->iRoot;
5474   LsmPgno iFirst = pSeg->iFirst;
5475   LsmPgno iLast = pSeg->iLastPg;
5476   char *z;
5477 
5478   char *z1;
5479   char *z2;
5480   int nPad;
5481 
5482   z1 = lsmMallocPrintf(pEnv, "%d.%d", iFirst, iLast);
5483   if( iRoot ){
5484     z2 = lsmMallocPrintf(pEnv, "root=%d", iRoot);
5485   }else{
5486     z2 = lsmMallocPrintf(pEnv, "size=%d", nSize);
5487   }
5488 
5489   nPad = nMin - 2 - strlen(z1) - 1 - strlen(z2);
5490   nPad = LSM_MAX(0, nPad);
5491 
5492   if( iRoot ){
5493     z = lsmMallocPrintf(pEnv, "/%s %*s%s\\", z1, nPad, "", z2);
5494   }else{
5495     z = lsmMallocPrintf(pEnv, "|%s %*s%s|", z1, nPad, "", z2);
5496   }
5497   lsmFree(pEnv, z1);
5498   lsmFree(pEnv, z2);
5499 
5500   return z;
5501 }
5502 
fileToString(lsm_db * pDb,char * aBuf,int nBuf,int nMin,Segment * pSeg)5503 static int fileToString(
5504   lsm_db *pDb,                    /* For xMalloc() */
5505   char *aBuf,
5506   int nBuf,
5507   int nMin,
5508   Segment *pSeg
5509 ){
5510   int i = 0;
5511   if( pSeg ){
5512     char *zSeg;
5513 
5514     zSeg = segToString(pDb->pEnv, pSeg, nMin);
5515     snprintf(&aBuf[i], nBuf-i, "%s", zSeg);
5516     i += strlen(&aBuf[i]);
5517     lsmFree(pDb->pEnv, zSeg);
5518 
5519 #ifdef LSM_LOG_FREELIST
5520     lsmInfoArrayStructure(pDb, 1, pSeg->iFirst, &zSeg);
5521     snprintf(&aBuf[i], nBuf-1, "    (%s)", zSeg);
5522     i += strlen(&aBuf[i]);
5523     lsmFree(pDb->pEnv, zSeg);
5524 #endif
5525     aBuf[nBuf] = 0;
5526   }else{
5527     aBuf[0] = '\0';
5528   }
5529 
5530   return i;
5531 }
5532 
sortedDumpPage(lsm_db * pDb,Segment * pRun,Page * pPg,int bVals)5533 void sortedDumpPage(lsm_db *pDb, Segment *pRun, Page *pPg, int bVals){
5534   LsmBlob blob = {0, 0, 0};       /* LsmBlob used for keys */
5535   LsmString s;
5536   int i;
5537 
5538   int nRec;
5539   int iPtr;
5540   int flags;
5541   u8 *aData;
5542   int nData;
5543 
5544   aData = fsPageData(pPg, &nData);
5545 
5546   nRec = pageGetNRec(aData, nData);
5547   iPtr = (int)pageGetPtr(aData, nData);
5548   flags = pageGetFlags(aData, nData);
5549 
5550   lsmStringInit(&s, pDb->pEnv);
5551   lsmStringAppendf(&s,"nCell=%d iPtr=%d flags=%d {", nRec, iPtr, flags);
5552   if( flags&SEGMENT_BTREE_FLAG ) iPtr = 0;
5553 
5554   for(i=0; i<nRec; i++){
5555     Page *pRef = 0;               /* Pointer to page iRef */
5556     int iChar;
5557     u8 *aKey; int nKey = 0;       /* Key */
5558     u8 *aVal = 0; int nVal = 0;   /* Value */
5559     int iTopic;
5560     u8 *aCell;
5561     int iPgPtr;
5562     int eType;
5563 
5564     aCell = pageGetCell(aData, nData, i);
5565     eType = *aCell++;
5566     assert( (flags & SEGMENT_BTREE_FLAG) || eType!=0 );
5567     aCell += lsmVarintGet32(aCell, &iPgPtr);
5568 
5569     if( eType==0 ){
5570       LsmPgno iRef;               /* Page number of referenced page */
5571       aCell += lsmVarintGet64(aCell, &iRef);
5572       lsmFsDbPageGet(pDb->pFS, pRun, iRef, &pRef);
5573       aKey = pageGetKey(pRun, pRef, 0, &iTopic, &nKey, &blob);
5574     }else{
5575       aCell += lsmVarintGet32(aCell, &nKey);
5576       if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
5577       sortedReadData(0, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob);
5578       aVal = &aKey[nKey];
5579       iTopic = eType;
5580     }
5581 
5582     lsmStringAppendf(&s, "%s%2X:", (i==0?"":" "), iTopic);
5583     for(iChar=0; iChar<nKey; iChar++){
5584       lsmStringAppendf(&s, "%c", isalnum(aKey[iChar]) ? aKey[iChar] : '.');
5585     }
5586     if( nVal>0 && bVals ){
5587       lsmStringAppendf(&s, "##");
5588       for(iChar=0; iChar<nVal; iChar++){
5589         lsmStringAppendf(&s, "%c", isalnum(aVal[iChar]) ? aVal[iChar] : '.');
5590       }
5591     }
5592 
5593     lsmStringAppendf(&s, " %d", iPgPtr+iPtr);
5594     lsmFsPageRelease(pRef);
5595   }
5596   lsmStringAppend(&s, "}", 1);
5597 
5598   lsmLogMessage(pDb, LSM_OK, "      Page %d: %s", lsmFsPageNumber(pPg), s.z);
5599   lsmStringClear(&s);
5600 
5601   sortedBlobFree(&blob);
5602 }
5603 
infoCellDump(lsm_db * pDb,Segment * pSeg,int bIndirect,Page * pPg,int iCell,int * peType,int * piPgPtr,u8 ** paKey,int * pnKey,u8 ** paVal,int * pnVal,LsmBlob * pBlob)5604 static void infoCellDump(
5605   lsm_db *pDb,                    /* Database handle */
5606   Segment *pSeg,                  /* Segment page belongs to */
5607   int bIndirect,                  /* True to follow indirect refs */
5608   Page *pPg,
5609   int iCell,
5610   int *peType,
5611   int *piPgPtr,
5612   u8 **paKey, int *pnKey,
5613   u8 **paVal, int *pnVal,
5614   LsmBlob *pBlob
5615 ){
5616   u8 *aData; int nData;           /* Page data */
5617   u8 *aKey; int nKey = 0;         /* Key */
5618   u8 *aVal = 0; int nVal = 0;     /* Value */
5619   int eType;
5620   int iPgPtr;
5621   Page *pRef = 0;                 /* Pointer to page iRef */
5622   u8 *aCell;
5623 
5624   aData = fsPageData(pPg, &nData);
5625 
5626   aCell = pageGetCell(aData, nData, iCell);
5627   eType = *aCell++;
5628   aCell += lsmVarintGet32(aCell, &iPgPtr);
5629 
5630   if( eType==0 ){
5631     int dummy;
5632     LsmPgno iRef;                 /* Page number of referenced page */
5633     aCell += lsmVarintGet64(aCell, &iRef);
5634     if( bIndirect ){
5635       lsmFsDbPageGet(pDb->pFS, pSeg, iRef, &pRef);
5636       pageGetKeyCopy(pDb->pEnv, pSeg, pRef, 0, &dummy, pBlob);
5637       aKey = (u8 *)pBlob->pData;
5638       nKey = pBlob->nData;
5639       lsmFsPageRelease(pRef);
5640     }else{
5641       aKey = (u8 *)"<indirect>";
5642       nKey = 11;
5643     }
5644   }else{
5645     aCell += lsmVarintGet32(aCell, &nKey);
5646     if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
5647     sortedReadData(pSeg, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, pBlob);
5648     aVal = &aKey[nKey];
5649   }
5650 
5651   if( peType ) *peType = eType;
5652   if( piPgPtr ) *piPgPtr = iPgPtr;
5653   if( paKey ) *paKey = aKey;
5654   if( paVal ) *paVal = aVal;
5655   if( pnKey ) *pnKey = nKey;
5656   if( pnVal ) *pnVal = nVal;
5657 }
5658 
infoAppendBlob(LsmString * pStr,int bHex,u8 * z,int n)5659 static int infoAppendBlob(LsmString *pStr, int bHex, u8 *z, int n){
5660   int iChar;
5661   for(iChar=0; iChar<n; iChar++){
5662     if( bHex ){
5663       lsmStringAppendf(pStr, "%02X", z[iChar]);
5664     }else{
5665       lsmStringAppendf(pStr, "%c", isalnum(z[iChar]) ?z[iChar] : '.');
5666     }
5667   }
5668   return LSM_OK;
5669 }
5670 
5671 #define INFO_PAGE_DUMP_DATA     0x01
5672 #define INFO_PAGE_DUMP_VALUES   0x02
5673 #define INFO_PAGE_DUMP_HEX      0x04
5674 #define INFO_PAGE_DUMP_INDIRECT 0x08
5675 
infoPageDump(lsm_db * pDb,LsmPgno iPg,int flags,char ** pzOut)5676 static int infoPageDump(
5677   lsm_db *pDb,                    /* Database handle */
5678   LsmPgno iPg,                    /* Page number of page to dump */
5679   int flags,
5680   char **pzOut                    /* OUT: lsmMalloc'd string */
5681 ){
5682   int rc = LSM_OK;                /* Return code */
5683   Page *pPg = 0;                  /* Handle for page iPg */
5684   int i, j;                       /* Loop counters */
5685   const int perLine = 16;         /* Bytes per line in the raw hex dump */
5686   Segment *pSeg = 0;
5687   Snapshot *pSnap;
5688 
5689   int bValues = (flags & INFO_PAGE_DUMP_VALUES);
5690   int bHex = (flags & INFO_PAGE_DUMP_HEX);
5691   int bData = (flags & INFO_PAGE_DUMP_DATA);
5692   int bIndirect = (flags & INFO_PAGE_DUMP_INDIRECT);
5693 
5694   *pzOut = 0;
5695   if( iPg==0 ) return LSM_ERROR;
5696 
5697   assert( pDb->pClient || pDb->pWorker );
5698   pSnap = pDb->pClient;
5699   if( pSnap==0 ) pSnap = pDb->pWorker;
5700   if( pSnap->redirect.n>0 ){
5701     Level *pLvl;
5702     int bUse = 0;
5703     for(pLvl=pSnap->pLevel; pLvl->pNext; pLvl=pLvl->pNext);
5704     pSeg = (pLvl->nRight==0 ? &pLvl->lhs : &pLvl->aRhs[pLvl->nRight-1]);
5705     rc = lsmFsSegmentContainsPg(pDb->pFS, pSeg, iPg, &bUse);
5706     if( bUse==0 ){
5707       pSeg = 0;
5708     }
5709   }
5710 
5711   /* iPg is a real page number (not subject to redirection). So it is safe
5712   ** to pass a NULL in place of the segment pointer as the second argument
5713   ** to lsmFsDbPageGet() here.  */
5714   if( rc==LSM_OK ){
5715     rc = lsmFsDbPageGet(pDb->pFS, 0, iPg, &pPg);
5716   }
5717 
5718   if( rc==LSM_OK ){
5719     LsmBlob blob = {0, 0, 0, 0};
5720     int nKeyWidth = 0;
5721     LsmString str;
5722     int nRec;
5723     int iPtr;
5724     int flags2;
5725     int iCell;
5726     u8 *aData; int nData;         /* Page data and size thereof */
5727 
5728     aData = fsPageData(pPg, &nData);
5729     nRec = pageGetNRec(aData, nData);
5730     iPtr = (int)pageGetPtr(aData, nData);
5731     flags2 = pageGetFlags(aData, nData);
5732 
5733     lsmStringInit(&str, pDb->pEnv);
5734     lsmStringAppendf(&str, "Page : %lld  (%d bytes)\n", iPg, nData);
5735     lsmStringAppendf(&str, "nRec : %d\n", nRec);
5736     lsmStringAppendf(&str, "iPtr : %d\n", iPtr);
5737     lsmStringAppendf(&str, "flags: %04x\n", flags2);
5738     lsmStringAppendf(&str, "\n");
5739 
5740     for(iCell=0; iCell<nRec; iCell++){
5741       int nKey;
5742       infoCellDump(
5743           pDb, pSeg, bIndirect, pPg, iCell, 0, 0, 0, &nKey, 0, 0, &blob
5744       );
5745       if( nKey>nKeyWidth ) nKeyWidth = nKey;
5746     }
5747     if( bHex ) nKeyWidth = nKeyWidth * 2;
5748 
5749     for(iCell=0; iCell<nRec; iCell++){
5750       u8 *aKey; int nKey = 0;       /* Key */
5751       u8 *aVal; int nVal = 0;       /* Value */
5752       int iPgPtr;
5753       int eType;
5754       LsmPgno iAbsPtr;
5755       char zFlags[8];
5756 
5757       infoCellDump(pDb, pSeg, bIndirect, pPg, iCell, &eType, &iPgPtr,
5758           &aKey, &nKey, &aVal, &nVal, &blob
5759       );
5760       iAbsPtr = iPgPtr + ((flags2 & SEGMENT_BTREE_FLAG) ? 0 : iPtr);
5761 
5762       lsmFlagsToString(eType, zFlags);
5763       lsmStringAppendf(&str, "%s %d (%s) ",
5764           zFlags, iAbsPtr, (rtTopic(eType) ? "sys" : "usr")
5765       );
5766       infoAppendBlob(&str, bHex, aKey, nKey);
5767       if( nVal>0 && bValues ){
5768         lsmStringAppendf(&str, "%*s", nKeyWidth - (nKey*(1+bHex)), "");
5769         lsmStringAppendf(&str, " ");
5770         infoAppendBlob(&str, bHex, aVal, nVal);
5771       }
5772       if( rtTopic(eType) ){
5773         int iBlk = (int)~lsmGetU32(aKey);
5774         lsmStringAppendf(&str, "  (block=%d", iBlk);
5775         if( nVal>0 ){
5776           i64 iSnap = lsmGetU64(aVal);
5777           lsmStringAppendf(&str, " snapshot=%lld", iSnap);
5778         }
5779         lsmStringAppendf(&str, ")");
5780       }
5781       lsmStringAppendf(&str, "\n");
5782     }
5783 
5784     if( bData ){
5785       lsmStringAppendf(&str, "\n-------------------"
5786           "-------------------------------------------------------------\n");
5787       lsmStringAppendf(&str, "Page %d\n",
5788           iPg, (iPg-1)*nData, iPg*nData - 1);
5789       for(i=0; i<nData; i += perLine){
5790         lsmStringAppendf(&str, "%04x: ", i);
5791         for(j=0; j<perLine; j++){
5792           if( i+j>nData ){
5793             lsmStringAppendf(&str, "   ");
5794           }else{
5795             lsmStringAppendf(&str, "%02x ", aData[i+j]);
5796           }
5797         }
5798         lsmStringAppendf(&str, "  ");
5799         for(j=0; j<perLine; j++){
5800           if( i+j>nData ){
5801             lsmStringAppendf(&str, " ");
5802           }else{
5803             lsmStringAppendf(&str,"%c", isprint(aData[i+j]) ? aData[i+j] : '.');
5804           }
5805         }
5806         lsmStringAppendf(&str,"\n");
5807       }
5808     }
5809 
5810     *pzOut = str.z;
5811     sortedBlobFree(&blob);
5812     lsmFsPageRelease(pPg);
5813   }
5814 
5815   return rc;
5816 }
5817 
lsmInfoPageDump(lsm_db * pDb,LsmPgno iPg,int bHex,char ** pzOut)5818 int lsmInfoPageDump(
5819   lsm_db *pDb,                    /* Database handle */
5820   LsmPgno iPg,                    /* Page number of page to dump */
5821   int bHex,                       /* True to output key/value in hex form */
5822   char **pzOut                    /* OUT: lsmMalloc'd string */
5823 ){
5824   int flags = INFO_PAGE_DUMP_DATA | INFO_PAGE_DUMP_VALUES;
5825   if( bHex ) flags |= INFO_PAGE_DUMP_HEX;
5826   return infoPageDump(pDb, iPg, flags, pzOut);
5827 }
5828 
sortedDumpSegment(lsm_db * pDb,Segment * pRun,int bVals)5829 void sortedDumpSegment(lsm_db *pDb, Segment *pRun, int bVals){
5830   assert( pDb->xLog );
5831   if( pRun && pRun->iFirst ){
5832     int flags = (bVals ? INFO_PAGE_DUMP_VALUES : 0);
5833     char *zSeg;
5834     Page *pPg;
5835 
5836     zSeg = segToString(pDb->pEnv, pRun, 0);
5837     lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg);
5838     lsmFree(pDb->pEnv, zSeg);
5839 
5840     lsmFsDbPageGet(pDb->pFS, pRun, pRun->iFirst, &pPg);
5841     while( pPg ){
5842       Page *pNext;
5843       char *z = 0;
5844       infoPageDump(pDb, lsmFsPageNumber(pPg), flags, &z);
5845       lsmLogMessage(pDb, LSM_OK, "%s", z);
5846       lsmFree(pDb->pEnv, z);
5847 #if 0
5848       sortedDumpPage(pDb, pRun, pPg, bVals);
5849 #endif
5850       lsmFsDbPageNext(pRun, pPg, 1, &pNext);
5851       lsmFsPageRelease(pPg);
5852       pPg = pNext;
5853     }
5854   }
5855 }
5856 
5857 /*
5858 ** Invoke the log callback zero or more times with messages that describe
5859 ** the current database structure.
5860 */
lsmSortedDumpStructure(lsm_db * pDb,Snapshot * pSnap,int bKeys,int bVals,const char * zWhy)5861 void lsmSortedDumpStructure(
5862   lsm_db *pDb,                    /* Database handle (used for xLog callback) */
5863   Snapshot *pSnap,                /* Snapshot to dump */
5864   int bKeys,                      /* Output the keys from each segment */
5865   int bVals,                      /* Output the values from each segment */
5866   const char *zWhy                /* Caption to print near top of dump */
5867 ){
5868   Snapshot *pDump = pSnap;
5869   Level *pTopLevel;
5870   char *zFree = 0;
5871 
5872   assert( pSnap );
5873   pTopLevel = lsmDbSnapshotLevel(pDump);
5874   if( pDb->xLog && pTopLevel ){
5875     static int nCall = 0;
5876     Level *pLevel;
5877     int iLevel = 0;
5878 
5879     nCall++;
5880     lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);
5881 
5882 #if 0
5883     if( nCall==1031 || nCall==1032 ) bKeys=1;
5884 #endif
5885 
5886     for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
5887       char zLeft[1024];
5888       char zRight[1024];
5889       int i = 0;
5890 
5891       Segment *aLeft[24];
5892       Segment *aRight[24];
5893 
5894       int nLeft = 0;
5895       int nRight = 0;
5896 
5897       Segment *pSeg = &pLevel->lhs;
5898       aLeft[nLeft++] = pSeg;
5899 
5900       for(i=0; i<pLevel->nRight; i++){
5901         aRight[nRight++] = &pLevel->aRhs[i];
5902       }
5903 
5904 #ifdef LSM_LOG_FREELIST
5905       if( nRight ){
5906         memmove(&aRight[1], aRight, sizeof(aRight[0])*nRight);
5907         aRight[0] = 0;
5908         nRight++;
5909       }
5910 #endif
5911 
5912       for(i=0; i<nLeft || i<nRight; i++){
5913         int iPad = 0;
5914         char zLevel[32];
5915         zLeft[0] = '\0';
5916         zRight[0] = '\0';
5917 
5918         if( i<nLeft ){
5919           fileToString(pDb, zLeft, sizeof(zLeft), 24, aLeft[i]);
5920         }
5921         if( i<nRight ){
5922           fileToString(pDb, zRight, sizeof(zRight), 24, aRight[i]);
5923         }
5924 
5925         if( i==0 ){
5926           snprintf(zLevel, sizeof(zLevel), "L%d: (age=%d) (flags=%.4x)",
5927               iLevel, (int)pLevel->iAge, (int)pLevel->flags
5928           );
5929         }else{
5930           zLevel[0] = '\0';
5931         }
5932 
5933         if( nRight==0 ){
5934           iPad = 10;
5935         }
5936 
5937         lsmLogMessage(pDb, LSM_OK, "% 25s % *s% -35s %s",
5938             zLevel, iPad, "", zLeft, zRight
5939         );
5940       }
5941 
5942       iLevel++;
5943     }
5944 
5945     if( bKeys ){
5946       for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
5947         int i;
5948         sortedDumpSegment(pDb, &pLevel->lhs, bVals);
5949         for(i=0; i<pLevel->nRight; i++){
5950           sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals);
5951         }
5952       }
5953     }
5954   }
5955 
5956   lsmInfoFreelist(pDb, &zFree);
5957   lsmLogMessage(pDb, LSM_OK, "Freelist: %s", zFree);
5958   lsmFree(pDb->pEnv, zFree);
5959 
5960   assert( lsmFsIntegrityCheck(pDb) );
5961 }
5962 
lsmSortedFreeLevel(lsm_env * pEnv,Level * pLevel)5963 void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){
5964   Level *pNext;
5965   Level *p;
5966 
5967   for(p=pLevel; p; p=pNext){
5968     pNext = p->pNext;
5969     sortedFreeLevel(pEnv, p);
5970   }
5971 }
5972 
lsmSortedSaveTreeCursors(lsm_db * pDb)5973 void lsmSortedSaveTreeCursors(lsm_db *pDb){
5974   MultiCursor *pCsr;
5975   for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
5976     lsmTreeCursorSave(pCsr->apTreeCsr[0]);
5977     lsmTreeCursorSave(pCsr->apTreeCsr[1]);
5978   }
5979 }
5980 
lsmSortedExpandBtreePage(Page * pPg,int nOrig)5981 void lsmSortedExpandBtreePage(Page *pPg, int nOrig){
5982   u8 *aData;
5983   int nData;
5984   int nEntry;
5985   int iHdr;
5986 
5987   aData = lsmFsPageData(pPg, &nData);
5988   nEntry = pageGetNRec(aData, nOrig);
5989   iHdr = SEGMENT_EOF(nOrig, nEntry);
5990   memmove(&aData[iHdr + (nData-nOrig)], &aData[iHdr], nOrig-iHdr);
5991 }
5992 
5993 #ifdef LSM_DEBUG_EXPENSIVE
assertRunInOrder(lsm_db * pDb,Segment * pSeg)5994 static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){
5995   Page *pPg = 0;
5996   LsmBlob blob1 = {0, 0, 0, 0};
5997   LsmBlob blob2 = {0, 0, 0, 0};
5998 
5999   lsmFsDbPageGet(pDb->pFS, pSeg, pSeg->iFirst, &pPg);
6000   while( pPg ){
6001     u8 *aData; int nData;
6002     Page *pNext;
6003 
6004     aData = lsmFsPageData(pPg, &nData);
6005     if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){
6006       int i;
6007       int nRec = pageGetNRec(aData, nData);
6008       for(i=0; i<nRec; i++){
6009         int iTopic1, iTopic2;
6010         pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i, &iTopic1, &blob1);
6011 
6012         if( i==0 && blob2.nData ){
6013           assert( sortedKeyCompare(
6014                 pDb->xCmp, iTopic2, blob2.pData, blob2.nData,
6015                 iTopic1, blob1.pData, blob1.nData
6016           )<0 );
6017         }
6018 
6019         if( i<(nRec-1) ){
6020           pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i+1, &iTopic2, &blob2);
6021           assert( sortedKeyCompare(
6022                 pDb->xCmp, iTopic1, blob1.pData, blob1.nData,
6023                 iTopic2, blob2.pData, blob2.nData
6024           )<0 );
6025         }
6026       }
6027     }
6028 
6029     lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
6030     lsmFsPageRelease(pPg);
6031     pPg = pNext;
6032   }
6033 
6034   sortedBlobFree(&blob1);
6035   sortedBlobFree(&blob2);
6036 }
6037 #endif
6038 
6039 #ifdef LSM_DEBUG_EXPENSIVE
6040 /*
6041 ** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
6042 ** defined. Its only purpose is to evaluate various assert() statements to
6043 ** verify that the database is well formed in certain respects.
6044 **
6045 ** More specifically, it checks that the array pOne contains the required
6046 ** pointers to pTwo. Array pTwo must be a main array. pOne may be either a
6047 ** separators array or another main array. If pOne does not contain the
6048 ** correct set of pointers, an assert() statement fails.
6049 */
assertPointersOk(lsm_db * pDb,Segment * pOne,Segment * pTwo,int bRhs)6050 static int assertPointersOk(
6051   lsm_db *pDb,                    /* Database handle */
6052   Segment *pOne,                  /* Segment containing pointers */
6053   Segment *pTwo,                  /* Segment containing pointer targets */
6054   int bRhs                        /* True if pTwo may have been Gobble()d */
6055 ){
6056   int rc = LSM_OK;                /* Error code */
6057   SegmentPtr ptr1;                /* Iterates through pOne */
6058   SegmentPtr ptr2;                /* Iterates through pTwo */
6059   LsmPgno iPrev;
6060 
6061   assert( pOne && pTwo );
6062 
6063   memset(&ptr1, 0, sizeof(ptr1));
6064   memset(&ptr2, 0, sizeof(ptr1));
6065   ptr1.pSeg = pOne;
6066   ptr2.pSeg = pTwo;
6067   segmentPtrEndPage(pDb->pFS, &ptr1, 0, &rc);
6068   segmentPtrEndPage(pDb->pFS, &ptr2, 0, &rc);
6069 
6070   /* Check that the footer pointer of the first page of pOne points to
6071   ** the first page of pTwo. */
6072   iPrev = pTwo->iFirst;
6073   if( ptr1.iPtr!=iPrev && !bRhs ){
6074     assert( 0 );
6075   }
6076 
6077   if( rc==LSM_OK && ptr1.nCell>0 ){
6078     rc = segmentPtrLoadCell(&ptr1, 0);
6079   }
6080 
6081   while( rc==LSM_OK && ptr2.pPg ){
6082     LsmPgno iThis;
6083 
6084     /* Advance to the next page of segment pTwo that contains at least
6085     ** one cell. Break out of the loop if the iterator reaches EOF.  */
6086     do{
6087       rc = segmentPtrNextPage(&ptr2, 1);
6088       assert( rc==LSM_OK );
6089     }while( rc==LSM_OK && ptr2.pPg && ptr2.nCell==0 );
6090     if( rc!=LSM_OK || ptr2.pPg==0 ) break;
6091     iThis = lsmFsPageNumber(ptr2.pPg);
6092 
6093     if( (ptr2.flags & (PGFTR_SKIP_THIS_FLAG|SEGMENT_BTREE_FLAG))==0 ){
6094 
6095       /* Load the first cell in the array pTwo page. */
6096       rc = segmentPtrLoadCell(&ptr2, 0);
6097 
6098       /* Iterate forwards through pOne, searching for a key that matches the
6099       ** key ptr2.pKey/nKey. This key should have a pointer to the page that
6100       ** ptr2 currently points to. */
6101       while( rc==LSM_OK ){
6102         int res = rtTopic(ptr1.eType) - rtTopic(ptr2.eType);
6103         if( res==0 ){
6104           res = pDb->xCmp(ptr1.pKey, ptr1.nKey, ptr2.pKey, ptr2.nKey);
6105         }
6106 
6107         if( res<0 ){
6108           assert( bRhs || ptr1.iPtr+ptr1.iPgPtr==iPrev );
6109         }else if( res>0 ){
6110           assert( 0 );
6111         }else{
6112           assert( ptr1.iPtr+ptr1.iPgPtr==iThis );
6113           iPrev = iThis;
6114           break;
6115         }
6116 
6117         rc = segmentPtrAdvance(0, &ptr1, 0);
6118         if( ptr1.pPg==0 ){
6119           assert( 0 );
6120         }
6121       }
6122     }
6123   }
6124 
6125   segmentPtrReset(&ptr1, 0);
6126   segmentPtrReset(&ptr2, 0);
6127   return LSM_OK;
6128 }
6129 
6130 /*
6131 ** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
6132 ** defined. Its only purpose is to evaluate various assert() statements to
6133 ** verify that the database is well formed in certain respects.
6134 **
6135 ** More specifically, it checks that the b-tree embedded in array pRun
6136 ** contains the correct keys. If not, an assert() fails.
6137 */
assertBtreeOk(lsm_db * pDb,Segment * pSeg)6138 static int assertBtreeOk(
6139   lsm_db *pDb,
6140   Segment *pSeg
6141 ){
6142   int rc = LSM_OK;                /* Return code */
6143   if( pSeg->iRoot ){
6144     LsmBlob blob = {0, 0, 0};     /* Buffer used to cache overflow keys */
6145     FileSystem *pFS = pDb->pFS;   /* File system to read from */
6146     Page *pPg = 0;                /* Main run page */
6147     BtreeCursor *pCsr = 0;        /* Btree cursor */
6148 
6149     rc = btreeCursorNew(pDb, pSeg, &pCsr);
6150     if( rc==LSM_OK ){
6151       rc = btreeCursorFirst(pCsr);
6152     }
6153     if( rc==LSM_OK ){
6154       rc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pPg);
6155     }
6156 
6157     while( rc==LSM_OK ){
6158       Page *pNext;
6159       u8 *aData;
6160       int nData;
6161       int flags;
6162 
6163       rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
6164       lsmFsPageRelease(pPg);
6165       pPg = pNext;
6166       if( pPg==0 ) break;
6167       aData = fsPageData(pPg, &nData);
6168       flags = pageGetFlags(aData, nData);
6169       if( rc==LSM_OK
6170        && 0==((SEGMENT_BTREE_FLAG|PGFTR_SKIP_THIS_FLAG) & flags)
6171        && 0!=pageGetNRec(aData, nData)
6172       ){
6173         u8 *pKey;
6174         int nKey;
6175         int iTopic;
6176         pKey = pageGetKey(pSeg, pPg, 0, &iTopic, &nKey, &blob);
6177         assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
6178         assert( lsmFsPageNumber(pPg)==pCsr->iPtr );
6179         rc = btreeCursorNext(pCsr);
6180       }
6181     }
6182     assert( rc!=LSM_OK || pCsr->pKey==0 );
6183 
6184     if( pPg ) lsmFsPageRelease(pPg);
6185 
6186     btreeCursorFree(pCsr);
6187     sortedBlobFree(&blob);
6188   }
6189 
6190   return rc;
6191 }
6192 #endif /* ifdef LSM_DEBUG_EXPENSIVE */
6193