1 /*
2 ** 2011-08-14
3 **
4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
6 **
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 **
13 ** PAGE FORMAT:
14 **
15 ** The maximum page size is 65536 bytes.
16 **
17 ** Since all records are equal to or larger than 2 bytes in size, and
18 ** some space within the page is consumed by the page footer, there must
19 ** be less than 2^15 records on each page.
20 **
** Each page ends with a footer that describes the page's contents. This
** footer serves a similar purpose to the page header in an SQLite database.
23 ** A footer is used instead of a header because it makes it easier to
24 ** populate a new page based on a sorted list of key/value pairs.
25 **
26 ** The footer consists of the following values (starting at the end of
27 ** the page and continuing backwards towards the start). All values are
28 ** stored as unsigned big-endian integers.
29 **
30 ** * Number of records on page (2 bytes).
31 ** * Flags field (2 bytes).
32 ** * Left-hand pointer value (8 bytes).
33 ** * The starting offset of each record (2 bytes per record).
34 **
35 ** Records may span pages. Unless it happens to be an exact fit, the part
36 ** of the final record that starts on page X that does not fit on page X
37 ** is stored at the start of page (X+1). This means there may be pages where
38 ** (N==0). And on most pages the first record that starts on the page will
39 ** not start at byte offset 0. For example:
40 **
41 ** aaaaa bbbbb ccc <footer> cc eeeee fffff g <footer> gggg....
42 **
43 ** RECORD FORMAT:
44 **
45 ** The first byte of the record is a flags byte. It is a combination
46 ** of the following flags (defined in lsmInt.h):
47 **
48 ** LSM_START_DELETE
49 ** LSM_END_DELETE
50 ** LSM_POINT_DELETE
51 ** LSM_INSERT
52 ** LSM_SEPARATOR
53 ** LSM_SYSTEMKEY
54 **
** Immediately following the flags byte is a pointer to the smallest key
** in the next file that is larger than the key in the current record. The
** pointer is encoded as a varint. When added to the left-hand pointer value
** stored in the page footer, it is the page number of the page that contains
** the smallest key in the next sorted file that is larger than this key.
60 **
61 ** Next is the number of bytes in the key, encoded as a varint.
62 **
63 ** If the LSM_INSERT flag is set, the number of bytes in the value, as
64 ** a varint, is next.
65 **
66 ** Finally, the blob of data containing the key, and for LSM_INSERT
67 ** records, the value as well.
68 */
69
70 #ifndef _LSM_INT_H
71 # include "lsmInt.h"
72 #endif
73
/*
** Compile-time switches, both set to 0 here.
** NOTE(review): presumably these enable logging of segment structure and
** page data when non-zero - confirm against the code that tests them.
*/
#define LSM_LOG_STRUCTURE 0
#define LSM_LOG_DATA 0

/*
** Macros to help decode record types. Each takes the flags byte found at
** the start of a record (a combination of the LSM_XXX flags listed in the
** header comment above).
**
**   rtTopic()        The LSM_SYSTEMKEY bit of the flags, unshifted.
**   rtIsDelete()     True if the low four bits equal LSM_POINT_DELETE.
**   rtIsSeparator()  True if LSM_SEPARATOR is set.
**   rtIsWrite()      True if LSM_INSERT is set.
**   rtIsSystem()     True if LSM_SYSTEMKEY is set.
*/
#define rtTopic(eType) ((eType) & LSM_SYSTEMKEY)
#define rtIsDelete(eType) (((eType) & 0x0F)==LSM_POINT_DELETE)

#define rtIsSeparator(eType) (((eType) & LSM_SEPARATOR)!=0)
#define rtIsWrite(eType) (((eType) & LSM_INSERT)!=0)
#define rtIsSystem(eType) (((eType) & LSM_SYSTEMKEY)!=0)

/*
** The following macros are used to access a page footer. Each evaluates
** to a byte offset within a page of size pgsz bytes:
**
**   SEGMENT_NRECORD_OFFSET   2-byte record count (last two bytes of page).
**   SEGMENT_FLAGS_OFFSET     2-byte flags field.
**   SEGMENT_POINTER_OFFSET   8-byte left-hand pointer.
**   SEGMENT_CELLPTR_OFFSET   2-byte offset slot for cell iCell. Slot 0 sits
**                            directly below the 12 fixed footer bytes and
**                            higher cell indexes are stored at lower
**                            addresses.
**   SEGMENT_EOF              The slot position for cell index nEntry, i.e.
**                            two bytes below the slot of the last cell on a
**                            page holding nEntry cells. sortedReadData()
**                            treats this as the end of record data.
*/
#define SEGMENT_NRECORD_OFFSET(pgsz) ((pgsz) - 2)
#define SEGMENT_FLAGS_OFFSET(pgsz) ((pgsz) - 2 - 2)
#define SEGMENT_POINTER_OFFSET(pgsz) ((pgsz) - 2 - 2 - 8)
#define SEGMENT_CELLPTR_OFFSET(pgsz, iCell) ((pgsz) - 2 - 2 - 8 - 2 - (iCell)*2)

#define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, nEntry)

/*
** Bits of the footer flags field. SEGMENT_BTREE_FLAG is set on pages that
** belong to the b-tree hierarchy (see the assert() calls in pageGetKey()
** and pageGetBtreeKey()).
** NOTE(review): the two PGFTR_SKIP_XXX flags are not used in this part of
** the file - confirm their meaning where they are set and tested.
*/
#define SEGMENT_BTREE_FLAG 0x0001
#define PGFTR_SKIP_NEXT_FLAG 0x0002
#define PGFTR_SKIP_THIS_FLAG 0x0004


/*
** Default value of LSM_SEGMENTPTR_FREE_THRESHOLD, which may be overridden
** at compile time.
** NOTE(review): presumably a size in bytes above which SegmentPtr buffers
** are released - confirm where it is used.
*/
#ifndef LSM_SEGMENTPTR_FREE_THRESHOLD
# define LSM_SEGMENTPTR_FREE_THRESHOLD 1024
#endif
105
106 typedef struct SegmentPtr SegmentPtr;
107 typedef struct Blob Blob;
108
/*
** A growable byte buffer. See sortedBlobGrow(), sortedBlobSet() and
** sortedBlobFree().
*/
struct Blob {
  lsm_env *pEnv;        /* Environment pData was allocated with (0 if none) */
  void *pData;          /* Buffer, or 0 if nothing is allocated */
  int nData;            /* Number of bytes of pData currently in use */
  int nAlloc;           /* Allocated size of pData in bytes */
};
115
116 /*
117 ** A SegmentPtr object may be used for one of two purposes:
118 **
119 ** * To iterate and/or seek within a single Segment (the combination of a
120 ** main run and an optional sorted run).
121 **
122 ** * To iterate through the separators array of a segment.
123 */
struct SegmentPtr {
  Level *pLevel;                /* Level object segment is part of */
  Segment *pSeg;                /* Segment to access */

  /* Current page. See segmentPtrLoadPage(). The three fields that follow
  ** pPg are copied from the page footer by segmentPtrSetPage(). */
  Page *pPg;                    /* Current page */
  u16 flags;                    /* Copy of page flags field */
  int nCell;                    /* Number of cells on pPg */
  Pgno iPtr;                    /* Base cascade pointer */

  /* Current cell. See segmentPtrLoadCell() */
  int iCell;                    /* Current record within page pPg */
  int eType;                    /* Type of current record */
  Pgno iPgPtr;                  /* Cascade pointer offset */
  void *pKey; int nKey;         /* Key associated with current record */
  void *pVal; int nVal;         /* Current record value (eType==WRITE only) */

  /* Blobs used to allocate buffers for pKey and pVal as required.
  ** segmentPtrLoadCell() uses blob1 for the key and blob2 for the value
  ** when the data is not contiguous on the page. */
  Blob blob1;
  Blob blob2;
};
145
146 /*
147 ** Used to iterate through the keys stored in a b-tree hierarchy from start
148 ** to finish. Only First() and Next() operations are required.
149 **
150 ** btreeCursorNew()
151 ** btreeCursorFirst()
152 ** btreeCursorNext()
153 ** btreeCursorFree()
154 ** btreeCursorPosition()
155 ** btreeCursorRestore()
156 */
typedef struct BtreePg BtreePg;
typedef struct BtreeCursor BtreeCursor;

/*
** One level of the path from the b-tree root to the cursor's current
** position.
*/
struct BtreePg {
  Page *pPage;                    /* Page at this level of the hierarchy */
  int iCell;                      /* Cell index on pPage (may be -1 on the
                                  ** deepest page; see btreeCursorNext()) */
};
struct BtreeCursor {
  Segment *pSeg;                  /* Iterate through this segment's btree */
  FileSystem *pFS;                /* File system to read pages from */
  int nDepth;                     /* Allocated size of aPg[] */
  int iPg;                        /* Current entry in aPg[]. -1 -> EOF. */
  BtreePg *aPg;                   /* Pages from root to current location */

  /* Cache of current entry. pKey==0 for EOF. */
  void *pKey;                     /* Current key */
  int nKey;                       /* Size of pKey in bytes */
  int eType;                      /* Key topic, with LSM_SEPARATOR set */
  Pgno iPtr;                      /* Pointer read via btreeCursorPtr() */

  /* Storage for key, if not local */
  Blob blob;
};
179
180
181 /*
182 ** A cursor used for merged searches or iterations through up to one
183 ** Tree structure and any number of sorted files.
184 **
185 ** lsmMCursorNew()
186 ** lsmMCursorSeek()
187 ** lsmMCursorNext()
188 ** lsmMCursorPrev()
189 ** lsmMCursorFirst()
190 ** lsmMCursorLast()
191 ** lsmMCursorKey()
192 ** lsmMCursorValue()
193 ** lsmMCursorValid()
194 **
195 ** iFree:
196 ** This variable is only used by cursors providing input data for a
197 ** new top-level segment. Such cursors only ever iterate forwards, not
198 ** backwards.
199 */
struct MultiCursor {
  lsm_db *pDb;                    /* Connection that owns this cursor */
  MultiCursor *pNext;             /* Next cursor owned by connection pDb */
  int flags;                      /* Mask of CURSOR_XXX flags */

  int eType;                      /* Cache of current key type */
  Blob key;                       /* Cache of current key (or NULL) */
  Blob val;                       /* Cache of current value */

  /* All the component cursors: */
  TreeCursor *apTreeCsr[2];       /* Up to two tree cursors */
  int iFree;                      /* Next element of free-list (-ve for eof) */
  SegmentPtr *aPtr;               /* Array of segment pointers */
  int nPtr;                       /* Size of array aPtr[] */
  BtreeCursor *pBtCsr;            /* b-tree cursor (db writes only) */

  /* Comparison results */
  int nTree;                      /* Size of aTree[] array */
  int *aTree;                     /* Array of comparison results */

  /* Used by cursors flushing the in-memory tree only */
  void *pSystemVal;               /* Pointer to buffer to free */

  /* Used by worker cursors only.
  ** NOTE(review): the role of pPrevMergePtr is not visible in this part of
  ** the file - confirm against the merge-worker code. */
  Pgno *pPrevMergePtr;
};
226
227 /*
228 ** The following constants are used to assign integers to each component
229 ** cursor of a multi-cursor.
230 */
/* NOTE(review): presumably segment pointer aPtr[i] is identified by
** (CURSOR_DATA_SEGMENT + i) - confirm against the multi-cursor code. */
#define CURSOR_DATA_TREE0 0     /* Current tree cursor (apTreeCsr[0]) */
#define CURSOR_DATA_TREE1 1     /* The "old" tree, if any (apTreeCsr[1]) */
#define CURSOR_DATA_SYSTEM 2    /* Free-list entries (new-toplevel only) */
#define CURSOR_DATA_SEGMENT 3   /* First segment pointer (aPtr[0]) */
235
236 /*
237 ** CURSOR_IGNORE_DELETE
238 ** If set, this cursor will not visit SORTED_DELETE keys.
239 **
240 ** CURSOR_FLUSH_FREELIST
241 ** This cursor is being used to create a new toplevel. It should also
242 ** iterate through the contents of the in-memory free block list.
243 **
244 ** CURSOR_IGNORE_SYSTEM
245 ** If set, this cursor ignores system keys.
246 **
247 ** CURSOR_NEXT_OK
248 ** Set if it is Ok to call lsm_csr_next().
249 **
250 ** CURSOR_PREV_OK
251 ** Set if it is Ok to call lsm_csr_prev().
252 **
253 ** CURSOR_READ_SEPARATORS
254 ** Set if this cursor should visit the separator keys in segment
255 ** aPtr[nPtr-1].
256 **
257 ** CURSOR_SEEK_EQ
258 ** Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation.
259 ** The key and value are stored in MultiCursor.key and MultiCursor.val
260 ** respectively.
261 */
/* Bit values for MultiCursor.flags. Bits 0x04 and 0x08 are not assigned
** by this list. */
#define CURSOR_IGNORE_DELETE 0x00000001
#define CURSOR_FLUSH_FREELIST 0x00000002
#define CURSOR_IGNORE_SYSTEM 0x00000010
#define CURSOR_NEXT_OK 0x00000020
#define CURSOR_PREV_OK 0x00000040
#define CURSOR_READ_SEPARATORS 0x00000080
#define CURSOR_SEEK_EQ 0x00000100
269
270 typedef struct MergeWorker MergeWorker;
271 typedef struct Hierarchy Hierarchy;
272
/*
** An array of page handles. MergeWorker.hier uses one of these for the
** b-tree hierarchy under construction.
** NOTE(review): presumably one page per level of the hierarchy - confirm
** against mergeWorkerPushHierarchy().
*/
struct Hierarchy {
  Page **apHier;                  /* Array of page pointers */
  int nHier;                      /* Number of entries in apHier[] */
};
277
278 /*
279 ** aSave:
280 ** When mergeWorkerNextPage() is called to advance to the next page in
281 ** the output segment, if the bStore flag for an element of aSave[] is
282 ** true, it is cleared and the corresponding iPgno value is set to the
283 ** page number of the page just completed.
284 **
285 ** aSave[0] is used to record the pointer value to be pushed into the
286 ** b-tree hierarchy. aSave[1] is used to save the page number of the
287 ** page containing the indirect key most recently written to the b-tree.
288 ** see mergeWorkerPushHierarchy() for details.
289 */
struct MergeWorker {
  lsm_db *pDb;                    /* Database handle */
  Level *pLevel;                  /* Worker snapshot Level being merged */
  MultiCursor *pCsr;              /* Cursor to read new segment contents from */
  int bFlush;                     /* True if this is an in-memory tree flush */
  Hierarchy hier;                 /* B-tree hierarchy under construction */
  Page *pPage;                    /* Current output page */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
  Pgno *aGobble;                  /* Gobble point for each input segment */

  /* NOTE(review): iIndirect is not documented in this part of the file;
  ** presumably related to the "indirect key" mentioned in the aSave[]
  ** description above - confirm in mergeWorkerPushHierarchy(). */
  Pgno iIndirect;
  struct SavedPgno {
    Pgno iPgno;                   /* Saved page number */
    int bStore;                   /* True to store into iPgno on next page */
  } aSave[2];
};
306
/*
** Expensive consistency checks. They are declared as real functions only
** when LSM_DEBUG_EXPENSIVE is defined. Otherwise assertRunInOrder() and
** assertBtreeOk() expand to nothing; no stub is provided here for
** assertPointersOk(), so any call to it must itself be inside an
** LSM_DEBUG_EXPENSIVE block.
*/
#ifdef LSM_DEBUG_EXPENSIVE
static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
static int assertBtreeOk(lsm_db *, Segment *);
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg);
#else
#define assertRunInOrder(x,y)
#define assertBtreeOk(x,y)
#endif
315
316
317 struct FilePage { u8 *aData; int nData; };
fsPageData(Page * pPg,int * pnData)318 static u8 *fsPageData(Page *pPg, int *pnData){
319 *pnData = ((struct FilePage *)(pPg))->nData;
320 return ((struct FilePage *)(pPg))->aData;
321 }
322 /*UNUSED static u8 *fsPageDataPtr(Page *pPg){
323 return ((struct FilePage *)(pPg))->aData;
324 }*/
325
326 /*
327 ** Write nVal as a 16-bit unsigned big-endian integer into buffer aOut.
328 */
void lsmPutU16(u8 *aOut, u16 nVal){
  u8 hi = (u8)((nVal>>8) & 0xFF);     /* most significant byte first */
  u8 lo = (u8)(nVal & 0xFF);
  aOut[0] = hi;
  aOut[1] = lo;
}
333
/*
** Write nVal as a 32-bit unsigned big-endian integer into buffer aOut.
*/
void lsmPutU32(u8 *aOut, u32 nVal){
  int i;
  /* Fill from the least significant byte (aOut[3]) backwards. */
  for(i=3; i>=0; i--){
    aOut[i] = (u8)(nVal & 0xFF);
    nVal >>= 8;
  }
}
340
/*
** Decode and return the 16-bit unsigned big-endian integer stored in the
** first two bytes of aOut.
*/
int lsmGetU16(u8 *aOut){
  int hi = aOut[0];
  int lo = aOut[1];
  return (hi<<8) | lo;
}
344
lsmGetU32(u8 * aOut)345 u32 lsmGetU32(u8 *aOut){
346 return ((u32)aOut[0] << 24)
347 + ((u32)aOut[1] << 16)
348 + ((u32)aOut[2] << 8)
349 + ((u32)aOut[3]);
350 }
351
lsmGetU64(u8 * aOut)352 u64 lsmGetU64(u8 *aOut){
353 return ((u64)aOut[0] << 56)
354 + ((u64)aOut[1] << 48)
355 + ((u64)aOut[2] << 40)
356 + ((u64)aOut[3] << 32)
357 + ((u64)aOut[4] << 24)
358 + ((u32)aOut[5] << 16)
359 + ((u32)aOut[6] << 8)
360 + ((u32)aOut[7]);
361 }
362
/*
** Write nVal as a 64-bit unsigned big-endian integer into buffer aOut.
*/
void lsmPutU64(u8 *aOut, u64 nVal){
  int i;
  /* Fill from the least significant byte (aOut[7]) backwards. */
  for(i=7; i>=0; i--){
    aOut[i] = (u8)(nVal & 0xFF);
    nVal >>= 8;
  }
}
373
/*
** Ensure that the buffer managed by pBlob is at least nData bytes in size.
** The buffer is only ever enlarged, never shrunk. pBlob->nData is left
** unchanged on success.
**
** Return LSM_OK if successful. If the allocation fails, LSM_NOMEM_BKPT is
** returned and the blob is left empty (pData==0, nAlloc==0, nData==0).
** Resetting nAlloc matters: were it left at its old value, a later call
** asking for fewer bytes would skip the allocation and report success
** while pData is still NULL.
*/
static int sortedBlobGrow(lsm_env *pEnv, Blob *pBlob, int nData){
  assert( pBlob->pEnv==pEnv || (pBlob->pEnv==0 && pBlob->pData==0) );
  if( pBlob->nAlloc<nData ){
    pBlob->pData = lsmReallocOrFree(pEnv, pBlob->pData, nData);
    if( !pBlob->pData ){
      /* The buffer is gone. Do not leave stale sizes behind. */
      pBlob->nAlloc = 0;
      pBlob->nData = 0;
      return LSM_NOMEM_BKPT;
    }
    pBlob->nAlloc = nData;
    pBlob->pEnv = pEnv;
  }
  return LSM_OK;
}
384
/*
** Set the contents of pBlob to a copy of the nData byte buffer pData,
** enlarging the blob's allocation first if required.
**
** Return LSM_OK if successful, or LSM_NOMEM if the buffer could not be
** enlarged.
*/
static int sortedBlobSet(lsm_env *pEnv, Blob *pBlob, void *pData, int nData){
  if( sortedBlobGrow(pEnv, pBlob, nData) ) return LSM_NOMEM;
  /* For a zero-length value on a blob that has never been allocated,
  ** pBlob->pData is NULL. Passing a NULL pointer to memcpy() is undefined
  ** behaviour even when the length is zero, so skip the call. */
  if( nData>0 ){
    memcpy(pBlob->pData, pData, nData);
  }
  pBlob->nData = nData;
  return LSM_OK;
}
391
392 #if 0
393 static int sortedBlobCopy(Blob *pDest, Blob *pSrc){
394 return sortedBlobSet(pDest, pSrc->pData, pSrc->nData);
395 }
396 #endif
397
/*
** Release any buffer held by pBlob and reset every field of the blob
** to zero.
*/
static void sortedBlobFree(Blob *pBlob){
  void *pOld = pBlob->pData;
  assert( pOld==0 || pBlob->pEnv );
  if( pOld!=0 ){
    lsmFree(pBlob->pEnv, pOld);
  }
  memset(pBlob, 0, sizeof(*pBlob));
}
403
/*
** Obtain a pointer to nByte bytes of record data that begins at byte
** offset iOff of page pPg.
**
** If the whole range lies before the end of the record area on pPg, then
** *ppData is set to point directly into the page buffer and pBlob is not
** touched. Otherwise the data continues on following pages: pBlob is grown
** to nByte bytes, the pieces are copied into it from pPg and from the pages
** that follow it in segment pSeg (pages with SEGMENT_BTREE_FLAG set are
** skipped), and *ppData is set to pBlob->pData.
**
** Returns LSM_OK on success, the error from sortedBlobGrow() or
** lsmFsDbPageNext() on failure, or LSM_CORRUPT_BKPT if the segment runs
** out of pages before nByte bytes have been read.
*/
static int sortedReadData(
  Segment *pSeg,                  /* Segment pPg belongs to */
  Page *pPg,                      /* Page the data starts on */
  int iOff,                       /* Offset of first byte within pPg */
  int nByte,                      /* Number of bytes required */
  void **ppData,                  /* OUT: Pointer to the data */
  Blob *pBlob                     /* Used if the data must be copied */
){
  int rc = LSM_OK;
  int iEnd;
  int nData;
  int nCell;
  u8 *aData;

  /* iEnd is the offset at which record data on this page ends. */
  aData = fsPageData(pPg, &nData);
  nCell = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
  iEnd = SEGMENT_EOF(nData, nCell);
  assert( iEnd>0 && iEnd<nData );

  if( iOff+nByte<=iEnd ){
    *ppData = (void *)&aData[iOff];
  }else{
    int nRem = nByte;             /* Bytes still to be copied */
    int i = iOff;                 /* Read offset within the current page */
    u8 *aDest;

    /* Make sure the blob is big enough to store the value being loaded. */
    rc = sortedBlobGrow(lsmPageEnv(pPg), pBlob, nByte);
    if( rc!=LSM_OK ) return rc;
    pBlob->nData = nByte;
    aDest = (u8 *)pBlob->pData;
    *ppData = pBlob->pData;

    /* Increment the page's ref-count. The loop below releases each page
    ** before moving on to the next, and the final page is released after
    ** the loop, so this extra reference leaves the caller's own reference
    ** to pPg untouched. */
    lsmFsPageRef(pPg);

    while( rc==LSM_OK ){
      Page *pNext;
      int flags;

      /* Copy data from pPg into the output buffer. */
      int nCopy = LSM_MIN(nRem, iEnd-i);
      if( nCopy>0 ){
        memcpy(&aDest[nByte-nRem], &aData[i], nCopy);
        nRem -= nCopy;
        i += nCopy;
        assert( nRem==0 || i==iEnd );
      }
      assert( nRem>=0 );
      if( nRem==0 ) break;
      /* Convert i into an offset relative to the start of the next page. */
      i -= iEnd;

      /* Grab the next page in the segment, skipping any b-tree pages. */

      do {
        rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
        if( rc==LSM_OK && pNext==0 ){
          rc = LSM_CORRUPT_BKPT;
        }
        if( rc ) break;
        lsmFsPageRelease(pPg);
        pPg = pNext;
        aData = fsPageData(pPg, &nData);
        flags = lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]);
      }while( flags&SEGMENT_BTREE_FLAG );

      iEnd = SEGMENT_EOF(nData, lsmGetU16(&aData[nData-2]));
      assert( iEnd>0 && iEnd<nData );
    }

    lsmFsPageRelease(pPg);
  }

  return rc;
}
479
/*
** Return the number of records stored in the footer of the nData byte
** page image aData.
*/
static int pageGetNRec(u8 *aData, int nData){
  u8 *aField = &aData[SEGMENT_NRECORD_OFFSET(nData)];
  return (int)lsmGetU16(aField);
}
483
pageGetPtr(u8 * aData,int nData)484 static Pgno pageGetPtr(u8 *aData, int nData){
485 return (Pgno)lsmGetU64(&aData[SEGMENT_POINTER_OFFSET(nData)]);
486 }
487
/*
** Return the flags field stored in the footer of the nData byte page
** image aData.
*/
static int pageGetFlags(u8 *aData, int nData){
  u8 *aField = &aData[SEGMENT_FLAGS_OFFSET(nData)];
  return (int)lsmGetU16(aField);
}
491
/*
** Return a pointer to the first byte of cell iCell within the nData byte
** page image aData. The cell offset is read from the footer's cell-pointer
** array; it is not range checked.
*/
static u8 *pageGetCell(u8 *aData, int nData, int iCell){
  int iSlot = SEGMENT_CELLPTR_OFFSET(nData, iCell);
  int iOff = lsmGetU16(&aData[iSlot]);
  return &aData[iOff];
}
495
496 /*
497 ** Return the number of cells on page pPg.
498 */
static int pageObjGetNRec(Page *pPg){
  int nByte;                      /* Size of page buffer in bytes */
  u8 *aPage;                      /* Page buffer */

  aPage = lsmFsPageData(pPg, &nByte);
  /* The record count is the 16-bit field at the very end of the page. */
  return (int)lsmGetU16(&aPage[SEGMENT_NRECORD_OFFSET(nByte)]);
}
504
505 /*
506 ** Return the decoded (possibly relative) pointer value stored in cell
507 ** iCell from page aData/nData.
508 */
pageGetRecordPtr(u8 * aData,int nData,int iCell)509 static Pgno pageGetRecordPtr(u8 *aData, int nData, int iCell){
510 Pgno iRet; /* Return value */
511 u8 *aCell; /* Pointer to cell iCell */
512
513 assert( iCell<pageGetNRec(aData, nData) && iCell>=0 );
514 aCell = pageGetCell(aData, nData, iCell);
515 lsmVarintGet64(&aCell[1], &iRet);
516 return iRet;
517 }
518
pageGetKey(Segment * pSeg,Page * pPg,int iCell,int * piTopic,int * pnKey,Blob * pBlob)519 static u8 *pageGetKey(
520 Segment *pSeg, /* Segment pPg belongs to */
521 Page *pPg, /* Page to read from */
522 int iCell, /* Index of cell on page to read */
523 int *piTopic, /* OUT: Topic associated with this key */
524 int *pnKey, /* OUT: Size of key in bytes */
525 Blob *pBlob /* If required, use this for dynamic memory */
526 ){
527 u8 *pKey;
528 int nDummy;
529 int eType;
530 u8 *aData;
531 int nData;
532
533 aData = fsPageData(pPg, &nData);
534
535 assert( !(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );
536 assert( iCell<pageGetNRec(aData, nData) );
537
538 pKey = pageGetCell(aData, nData, iCell);
539 eType = *pKey++;
540 pKey += lsmVarintGet32(pKey, &nDummy);
541 pKey += lsmVarintGet32(pKey, pnKey);
542 if( rtIsWrite(eType) ){
543 pKey += lsmVarintGet32(pKey, &nDummy);
544 }
545 *piTopic = rtTopic(eType);
546
547 sortedReadData(pSeg, pPg, pKey-aData, *pnKey, (void **)&pKey, pBlob);
548 return pKey;
549 }
550
/*
** Load the key of cell iCell on page pPg into pBlob, so that on success
** pBlob->pData/nData hold a private copy of the key. The key topic is
** written to *piTopic. Returns LSM_OK or the error from sortedBlobSet().
*/
static int pageGetKeyCopy(
  lsm_env *pEnv,                  /* Environment handle */
  Segment *pSeg,                  /* Segment pPg belongs to */
  Page *pPg,                      /* Page to read from */
  int iCell,                      /* Index of cell on page to read */
  int *piTopic,                   /* OUT: Topic associated with this key */
  Blob *pBlob                     /* If required, use this for dynamic memory */
){
  int nKey;                       /* Size of key in bytes */
  u8 *aKey = pageGetKey(pSeg, pPg, iCell, piTopic, &nKey, pBlob);

  assert( (void *)aKey!=pBlob->pData || nKey==pBlob->nData );

  /* If pageGetKey() already assembled the key inside the blob there is
  ** nothing left to copy. */
  if( (void *)aKey==pBlob->pData ) return LSM_OK;
  return sortedBlobSet(pEnv, pBlob, aKey, nKey);
}
571
pageGetBtreeRef(Page * pPg,int iKey)572 static Pgno pageGetBtreeRef(Page *pPg, int iKey){
573 Pgno iRef;
574 u8 *aData;
575 int nData;
576 u8 *aCell;
577
578 aData = fsPageData(pPg, &nData);
579 aCell = pageGetCell(aData, nData, iKey);
580 assert( aCell[0]==0 );
581 aCell++;
582 aCell += lsmVarintGet64(aCell, &iRef);
583 lsmVarintGet64(aCell, &iRef);
584 assert( iRef>0 );
585 return iRef;
586 }
587
/*
** Decode the varint stored at address a into lvalue i and evaluate to the
** number of bytes consumed. A first byte of 240 or less is the value
** itself (one byte consumed); anything else is handed to lsmVarintGet64()
** or lsmVarintGet32().
**
** Both arguments may be evaluated more than once, so they must be free of
** side effects.
*/
#define GETVARINT64(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet64((a), &(i)))
#define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i)))
590
/*
** Read the key and pointer of cell iKey on b-tree page pPg.
**
** The cell's pointer value is written to *piPtr. If the cell's flags byte
** is zero, the cell does not hold the key itself but the number of another
** page; in that case the key of cell 0 of that page is copied into pBlob
** and *ppKey/*pnKey are set to the blob's buffer. Otherwise *ppKey points
** at the key within the page buffer of pPg. If piTopic is not NULL, the
** key topic is written to *piTopic.
**
** Returns LSM_OK on success. If the referenced page cannot be loaded, or
** if copying its key fails, the error code is returned and *ppKey, *pnKey
** and *piTopic are left unmodified.
*/
static int pageGetBtreeKey(
  Segment *pSeg,                  /* Segment page pPg belongs to */
  Page *pPg,                      /* B-tree page to read from */
  int iKey,                       /* Index of cell on pPg */
  Pgno *piPtr,                    /* OUT: Pointer value of the cell */
  int *piTopic,                   /* OUT: Topic of key (may be NULL) */
  void **ppKey,                   /* OUT: Pointer to key */
  int *pnKey,                     /* OUT: Size of key in bytes */
  Blob *pBlob                     /* Storage for keys read from other pages */
){
  u8 *aData;
  int nData;
  u8 *aCell;
  int eType;

  aData = fsPageData(pPg, &nData);
  assert( SEGMENT_BTREE_FLAG & pageGetFlags(aData, nData) );
  assert( iKey>=0 && iKey<pageGetNRec(aData, nData) );

  aCell = pageGetCell(aData, nData, iKey);
  eType = *aCell++;
  aCell += GETVARINT64(aCell, *piPtr);

  if( eType==0 ){
    int rc;
    Pgno iRef;                  /* Page number of referenced page */
    Page *pRef;
    aCell += GETVARINT64(aCell, iRef);
    rc = lsmFsDbPageGet(lsmPageFS(pPg), pSeg, iRef, &pRef);
    if( rc!=LSM_OK ) return rc;
    rc = pageGetKeyCopy(lsmPageEnv(pPg), pSeg, pRef, 0, &eType, pBlob);
    lsmFsPageRelease(pRef);
    /* Do not hand back the blob's buffer if the copy failed. */
    if( rc!=LSM_OK ) return rc;
    *ppKey = pBlob->pData;
    *pnKey = pBlob->nData;
  }else{
    aCell += GETVARINT32(aCell, *pnKey);
    *ppKey = aCell;
  }
  if( piTopic ) *piTopic = rtTopic(eType);

  return LSM_OK;
}
633
/*
** Refresh the cached key (pCsr->pKey/nKey/eType) for the current cursor
** position. At EOF (iPg<0) the cache is cleared. Otherwise, if the cell
** index at the current level is negative, walk up the aPg[] array and use
** the cell before each ancestor's current cell, stopping at the first
** level that yields a non-negative index. LSM_CORRUPT_BKPT is returned if
** no level does.
*/
static int btreeCursorLoadKey(BtreeCursor *pCsr){
  Pgno iUnused;                   /* Cell pointer value, not needed here */
  int iLevel;                     /* Index into pCsr->aPg[] */
  int iKey;                       /* Cell index on level iLevel */
  int rc;

  if( pCsr->iPg<0 ){
    pCsr->pKey = 0;
    pCsr->nKey = 0;
    pCsr->eType = 0;
    return LSM_OK;
  }

  iLevel = pCsr->iPg;
  iKey = pCsr->aPg[iLevel].iCell;
  while( iKey<0 ){
    iLevel--;
    if( iLevel<0 ) break;
    iKey = pCsr->aPg[iLevel].iCell - 1;
  }
  if( iLevel<0 || iKey<0 ) return LSM_CORRUPT_BKPT;

  rc = pageGetBtreeKey(
      pCsr->pSeg, pCsr->aPg[iLevel].pPage, iKey,
      &iUnused, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
  );
  pCsr->eType |= LSM_SEPARATOR;
  return rc;
}
659
btreeCursorPtr(u8 * aData,int nData,int iCell)660 static int btreeCursorPtr(u8 *aData, int nData, int iCell){
661 int nCell;
662
663 nCell = pageGetNRec(aData, nData);
664 if( iCell>=nCell ){
665 return (int)pageGetPtr(aData, nData);
666 }
667 return (int)pageGetRecordPtr(aData, nData, iCell);
668 }
669
/*
** Advance the b-tree cursor to its next entry. The cursor must not be at
** EOF and must currently be positioned on the deepest level of aPg[].
**
** The cell index on the deepest page is incremented. If that moves it to
** the end of the page, the page is released and the cursor climbs to the
** nearest ancestor that still has a cell at its current index, loads the
** key from there and then descends again through the pointer selected by
** btreeCursorPtr(), leaving the new deepest page at cell index -1.
** Otherwise the key for the new cell is loaded directly.
**
** On success, unless the cursor is now at EOF (iPg<0), pCsr->iPtr is set
** to the pointer for the cell following the current one on the deepest
** page. Returns LSM_OK or an error code from the page or key loaders.
*/
static int btreeCursorNext(BtreeCursor *pCsr){
  int rc = LSM_OK;

  BtreePg *pPg = &pCsr->aPg[pCsr->iPg];
  int nCell;
  u8 *aData;
  int nData;

  assert( pCsr->iPg>=0 );
  assert( pCsr->iPg==pCsr->nDepth-1 );

  aData = fsPageData(pPg->pPage, &nData);
  nCell = pageGetNRec(aData, nData);
  assert( pPg->iCell<=nCell );
  pPg->iCell++;
  if( pPg->iCell==nCell ){
    Pgno iLoad;

    /* Up to parent. Release each exhausted page on the way; stop at the
    ** first ancestor whose current cell index is still on the page. */
    lsmFsPageRelease(pPg->pPage);
    pPg->pPage = 0;
    pCsr->iPg--;
    while( pCsr->iPg>=0 ){
      pPg = &pCsr->aPg[pCsr->iPg];
      aData = fsPageData(pPg->pPage, &nData);
      if( pPg->iCell<pageGetNRec(aData, nData) ) break;
      lsmFsPageRelease(pPg->pPage);
      pCsr->iPg--;
    }

    /* Read the key */
    rc = btreeCursorLoadKey(pCsr);

    /* Unless the cursor is at EOF, descend to cell -1 (yes, negative one) of
    ** the left-most descendant. */
    if( pCsr->iPg>=0 ){
      pCsr->aPg[pCsr->iPg].iCell++;

      /* pPg/aData/nData still describe the ancestor found above. */
      iLoad = btreeCursorPtr(aData, nData, pPg->iCell);
      do {
        Page *pLoad;
        pCsr->iPg++;
        rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLoad, &pLoad);
        pCsr->aPg[pCsr->iPg].pPage = pLoad;
        pCsr->aPg[pCsr->iPg].iCell = 0;
        if( rc==LSM_OK ){
          if( pCsr->iPg==(pCsr->nDepth-1) ) break;
          aData = fsPageData(pLoad, &nData);
          iLoad = btreeCursorPtr(aData, nData, 0);
        }
      }while( rc==LSM_OK && pCsr->iPg<(pCsr->nDepth-1) );
      pCsr->aPg[pCsr->iPg].iCell = -1;
    }

  }else{
    rc = btreeCursorLoadKey(pCsr);
  }

  /* Cache the pointer that belongs to the cell after the current one. */
  if( rc==LSM_OK && pCsr->iPg>=0 ){
    aData = fsPageData(pCsr->aPg[pCsr->iPg].pPage, &nData);
    pCsr->iPtr = btreeCursorPtr(aData, nData, pCsr->aPg[pCsr->iPg].iCell+1);
  }

  return rc;
}
735
/*
** Free a b-tree cursor: release the pages held in aPg[0..iPg], free the
** key blob, the aPg[] array and the cursor itself. A NULL argument is a
** no-op.
*/
static void btreeCursorFree(BtreeCursor *pCsr){
  lsm_env *pEnv;
  int iLevel;

  if( pCsr==0 ) return;

  pEnv = lsmFsEnv(pCsr->pFS);
  for(iLevel=0; iLevel<=pCsr->iPg; iLevel++){
    lsmFsPageRelease(pCsr->aPg[iLevel].pPage);
  }
  sortedBlobFree(&pCsr->blob);
  lsmFree(pEnv, pCsr->aPg);
  lsmFree(pEnv, pCsr);
}
748
/*
** Position the cursor on the first entry of the b-tree.
**
** Starting at the segment's root page, pages are followed through the
** pointer of their first cell and appended to aPg[] for as long as they
** carry SEGMENT_BTREE_FLAG. The first page without that flag ends the
** descent and is released without being stored. The aPg[] array is grown
** in steps of eight entries. If at least one b-tree page was stored, the
** deepest one is set to cell index -1 and btreeCursorNext() is called to
** load the first key.
**
** Page numbers are held in a Pgno throughout. Both Segment.iRoot and the
** value returned by pageGetRecordPtr() are Pgno values, so keeping them in
** an int would discard the upper bits of large page numbers.
**
** NOTE(review): if lsmReallocOrFreeRc() fails, nDepth still counts the
** pages that were stored in the old array - confirm that callers free the
** cursor safely in that case.
*/
static int btreeCursorFirst(BtreeCursor *pCsr){
  int rc;

  Page *pPg = 0;
  FileSystem *pFS = pCsr->pFS;
  Pgno iPg = pCsr->pSeg->iRoot;

  do {
    rc = lsmFsDbPageGet(pFS, pCsr->pSeg, iPg, &pPg);
    assert( (rc==LSM_OK)==(pPg!=0) );
    if( rc==LSM_OK ){
      u8 *aData;
      int nData;
      int flags;

      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;

      /* Grow aPg[] by eight zeroed entries each time it fills up. */
      if( (pCsr->nDepth % 8)==0 ){
        int nNew = pCsr->nDepth + 8;
        pCsr->aPg = (BtreePg *)lsmReallocOrFreeRc(
            lsmFsEnv(pFS), pCsr->aPg, sizeof(BtreePg) * nNew, &rc
        );
        if( rc==LSM_OK ){
          memset(&pCsr->aPg[pCsr->nDepth], 0, sizeof(BtreePg) * 8);
        }
      }

      if( rc==LSM_OK ){
        assert( pCsr->aPg[pCsr->nDepth].iCell==0 );
        pCsr->aPg[pCsr->nDepth].pPage = pPg;
        pCsr->nDepth++;
        iPg = pageGetRecordPtr(aData, nData, 0);
      }
    }
  }while( rc==LSM_OK );

  /* pPg is either NULL or a page that was not stored in aPg[]. */
  lsmFsPageRelease(pPg);
  pCsr->iPg = pCsr->nDepth-1;

  if( rc==LSM_OK && pCsr->nDepth ){
    pCsr->aPg[pCsr->iPg].iCell = -1;
    rc = btreeCursorNext(pCsr);
  }

  return rc;
}
796
/*
** Encode the current cursor position into *p, in the form decoded by
** btreeCursorRestore(): p->iPg is the page number of the deepest page and
** p->iCell packs (cell index + 1) above the low eight bits, which hold the
** cursor depth. At EOF both fields are set to zero.
*/
static void btreeCursorPosition(BtreeCursor *pCsr, MergeInput *p){
  BtreePg *pLeaf;

  if( pCsr->iPg<0 ){
    p->iPg = 0;
    p->iCell = 0;
    return;
  }

  pLeaf = &pCsr->aPg[pCsr->iPg];
  p->iPg = lsmFsPageNumber(pLeaf->pPage);
  p->iCell = ((pLeaf->iCell + 1) << 8) + pCsr->nDepth;
}
806
/*
** Store in *p the page number and cell index of the cell that holds the
** cursor's current key. If the cell index on the deepest page is
** non-negative, that is the cell. Otherwise it is the cell before the
** current one on the nearest ancestor whose cell index is greater than
** zero.
*/
static void btreeCursorSplitkey(BtreeCursor *pCsr, MergeInput *p){
  int iLevel = pCsr->iPg;

  if( pCsr->aPg[iLevel].iCell<0 ){
    /* Search upwards for an ancestor with a preceding cell. */
    iLevel--;
    while( iLevel>=0 && pCsr->aPg[iLevel].iCell<=0 ){
      iLevel--;
    }
    assert( iLevel>=0 );
    p->iCell = pCsr->aPg[iLevel].iCell - 1;
    p->iPg = lsmFsPageNumber(pCsr->aPg[iLevel].pPage);
  }else{
    p->iCell = pCsr->aPg[iLevel].iCell;
    p->iPg = lsmFsPageNumber(pCsr->aPg[iLevel].pPage);
  }
}
822
/*
** Compare two keys, each qualified by a topic. Keys are ordered by topic
** first; the comparison function xCmp is consulted only when both topics
** are equal. Returns a negative, zero or positive value if the left-hand
** key is less than, equal to or greater than the right-hand key.
*/
static int sortedKeyCompare(
  int (*xCmp)(void *, int, void *, int),
  int iLhsTopic, void *pLhsKey, int nLhsKey,
  int iRhsTopic, void *pRhsKey, int nRhsKey
){
  int iTopicDiff = iLhsTopic - iRhsTopic;
  if( iTopicDiff!=0 ) return iTopicDiff;
  return xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey);
}
834
/*
** Restore a b-tree cursor to the position that btreeCursorPosition()
** encoded in *p. If p->iPg is zero this is a no-op.
**
** The depth and the cell index on the deepest page are decoded from
** p->iCell, aPg[] is allocated and the deepest page (p->iPg) is loaded.
** If the depth is greater than one, the remaining entries of aPg[] are
** rebuilt by searching down from the segment's root page for the first
** key of the deepest page, using xCmp to compare keys. Finally the cached
** key and pointer (pCsr->pKey/nKey/eType/iPtr) are reloaded.
**
** Returns LSM_OK on success or an error code otherwise. An error returned
** while loading the seek key now stops the descent. Previously that error
** was overwritten by the first page load of the descent loop, and the
** search then ran with an uninitialised seek key. Page numbers used for
** the descent are held in a Pgno rather than an int so that large page
** numbers are not truncated.
*/
static int btreeCursorRestore(
  BtreeCursor *pCsr,
  int (*xCmp)(void *, int, void *, int),
  MergeInput *p
){
  int rc = LSM_OK;

  if( p->iPg ){
    lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
    int iCell;                    /* Current cell number on leaf page */
    Pgno iLeaf;                   /* Page number of current leaf page */
    int nDepth;                   /* Depth of b-tree structure */
    Segment *pSeg = pCsr->pSeg;

    /* Decode the MergeInput structure */
    iLeaf = p->iPg;
    nDepth = (p->iCell & 0x00FF);
    iCell = (p->iCell >> 8) - 1;

    /* Allocate the BtreeCursor.aPg[] array */
    assert( pCsr->aPg==0 );
    pCsr->aPg = (BtreePg *)lsmMallocZeroRc(pEnv, sizeof(BtreePg) * nDepth, &rc);

    /* Populate the last entry of the aPg[] array */
    if( rc==LSM_OK ){
      Page **pp = &pCsr->aPg[nDepth-1].pPage;
      pCsr->iPg = nDepth-1;
      pCsr->nDepth = nDepth;
      pCsr->aPg[pCsr->iPg].iCell = iCell;
      rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLeaf, pp);
    }

    /* Populate any other aPg[] array entries */
    if( rc==LSM_OK && nDepth>1 ){
      Blob blob = {0,0,0};
      void *pSeek = 0;
      int nSeek = 0;
      int iTopicSeek = 0;
      int iPg = 0;
      Pgno iLoad = pSeg->iRoot;
      Page *pPg = pCsr->aPg[nDepth-1].pPage;

      if( pageObjGetNRec(pPg)==0 ){
        /* This can happen when pPg is the right-most leaf in the b-tree.
        ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
        ** greater than any real key.  */
        assert( iCell==-1 );
        iTopicSeek = 1000;
        pSeek = 0;
        nSeek = 0;
      }else{
        Pgno dummy;
        rc = pageGetBtreeKey(pSeg, pPg,
            0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob
        );
      }

      /* Descend from the root, one aPg[] entry per iteration. The loop is
      ** skipped entirely if loading the seek key failed. Since nDepth>1
      ** and iPg==0 here, it otherwise runs at least once. */
      while( rc==LSM_OK && iPg<(nDepth-1) ){
        Page *pPg2;
        rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLoad, &pPg2);
        assert( rc==LSM_OK || pPg2==0 );
        if( rc==LSM_OK ){
          u8 *aData;              /* Buffer containing page data */
          int nData;              /* Size of aData[] in bytes */
          int iMin;
          int iMax;
          int iCell2;

          aData = fsPageData(pPg2, &nData);
          assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );

          /* Default to the footer pointer, which is used if the seek key
          ** is larger than every key on this page. */
          iLoad = pageGetPtr(aData, nData);
          iCell2 = pageGetNRec(aData, nData);
          iMax = iCell2-1;
          iMin = 0;

          /* Binary search for the smallest key larger than the seek key. */
          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
            int iTopic;                   /* Topic for key pKey/nKey */
            Pgno iPtr;                    /* Pointer for cell iTry */
            int res;                      /* (pSeek - pKey) */

            rc = pageGetBtreeKey(
                pSeg, pPg2, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob
            );
            if( rc!=LSM_OK ) break;

            res = sortedKeyCompare(
                xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
            );
            assert( res!=0 );

            if( res<0 ){
              iLoad = iPtr;
              iCell2 = iTry;
              iMax = iTry-1;
            }else{
              iMin = iTry+1;
            }
          }

          pCsr->aPg[iPg].pPage = pPg2;
          pCsr->aPg[iPg].iCell = iCell2;
          iPg++;
          assert( iPg!=nDepth-1
               || lsmFsRedirectPage(pCsr->pFS, pSeg->pRedirect, iLoad)==iLeaf
          );
        }
      }
      sortedBlobFree(&blob);
    }

    /* Load the current key and pointer */
    if( rc==LSM_OK ){
      BtreePg *pBtreePg;
      u8 *aData;
      int nData;

      pBtreePg = &pCsr->aPg[pCsr->iPg];
      aData = fsPageData(pBtreePg->pPage, &nData);
      pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1);
      if( pBtreePg->iCell<0 ){
        /* The current key lives in the nearest ancestor that has a cell
        ** before its current one. */
        Pgno dummy;
        int i;
        for(i=pCsr->iPg-1; i>=0; i--){
          if( pCsr->aPg[i].iCell>0 ) break;
        }
        assert( i>=0 );
        rc = pageGetBtreeKey(pSeg,
            pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
            &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
        );
        pCsr->eType |= LSM_SEPARATOR;

      }else{
        rc = btreeCursorLoadKey(pCsr);
      }
    }
  }
  return rc;
}
977
/*
** Allocate a new b-tree cursor for segment pSeg, which must have a root
** page (pSeg->iRoot!=0). The new cursor starts at EOF (iPg==-1). The
** cursor, or NULL if the allocation failed, is written to *ppCsr, and the
** return code produced by lsmMallocZeroRc() is returned.
*/
static int btreeCursorNew(
  lsm_db *pDb,
  Segment *pSeg,
  BtreeCursor **ppCsr
){
  int rc = LSM_OK;
  BtreeCursor *pNew;

  assert( pSeg->iRoot );
  pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(BtreeCursor), &rc);
  if( pNew!=0 ){
    pNew->iPg = -1;
    pNew->pSeg = pSeg;
    pNew->pFS = pDb->pFS;
  }

  *ppCsr = pNew;
  return rc;
}
997
/*
** Make pNext the current page of pPtr, releasing the reference to the
** previous page. If pNext is not NULL, the cell count, flags and footer
** pointer cached in pPtr are refreshed from it; if it is NULL, those
** cached values are left as they were.
*/
static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){
  u8 *aPage;
  int nPage;

  lsmFsPageRelease(pPtr->pPg);
  pPtr->pPg = pNext;
  if( pNext==0 ) return;

  aPage = fsPageData(pNext, &nPage);
  pPtr->nCell = pageGetNRec(aPage, nPage);
  pPtr->flags = (u16)pageGetFlags(aPage, nPage);
  pPtr->iPtr = pageGetPtr(aPage, nPage);
}
1009
1010 /*
1011 ** Load a new page into the SegmentPtr object pPtr.
1012 */
static int segmentPtrLoadPage(
  FileSystem *pFS,
  SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
  int iNew                       /* Page number of new page */
){
  Page *pLoaded = 0;             /* The new page, or NULL on error */
  int rc = lsmFsDbPageGet(pFS, pPtr->pSeg, iNew, &pLoaded);

  assert( pLoaded==0 || rc==LSM_OK );

  /* The previous page is released even if the load failed. */
  segmentPtrSetPage(pPtr, pLoaded);
  return rc;
}
1027
/*
** Wrapper around sortedReadData() that reads from the current page and
** segment of pPtr.
*/
static int segmentPtrReadData(
  SegmentPtr *pPtr,
  int iOff,
  int nByte,
  void **ppData,
  Blob *pBlob
){
  Segment *pSeg = pPtr->pSeg;
  Page *pPg = pPtr->pPg;
  return sortedReadData(pSeg, pPg, iOff, nByte, ppData, pBlob);
}
1037
/*
** Move pPtr to the page following (eDir==1) or preceding (eDir==-1) its
** current page, as determined by lsmFsDbPageNext(). The current page is
** released; the new current page may be NULL.
*/
static int segmentPtrNextPage(
  SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
  int eDir                       /* +1 for next(), -1 for prev() */
){
  Page *pAdjacent;               /* New page to load */
  int rc;                        /* Return code */

  assert( eDir==-1 || eDir==1 );
  assert( pPtr->pPg );
  assert( eDir>0 || pPtr->pSeg );

  rc = lsmFsDbPageNext(pPtr->pSeg, pPtr->pPg, eDir, &pAdjacent);
  assert( pAdjacent==0 || rc==LSM_OK );

  segmentPtrSetPage(pPtr, pAdjacent);
  return rc;
}
1054
/*
** Decode cell iNew of the page currently held by pPtr and populate the
** eType, iPgPtr, nKey/pKey and nVal/pVal fields of pPtr from it. The value
** fields are only read for records for which rtIsWrite() is true; for
** every other record (and if reading the key fails) they are zeroed.
**
** If pPtr holds no page this function is a no-op that returns LSM_OK.
** Otherwise it returns the result of reading the key/value data.
*/
static int segmentPtrLoadCell(
  SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
  int iNew                       /* Cell number of new cell */
){
  u8 *aPage;                     /* Pointer to page data buffer */
  int nPage;                     /* Size of page (aPage[]) in bytes */
  int iRead;                     /* Offset in aPage[] to read from */
  int bHasValue;                 /* True if the record carries a value */
  int rc;

  if( pPtr->pPg==0 ) return LSM_OK;

  assert( iNew<pPtr->nCell );
  pPtr->iCell = iNew;
  aPage = fsPageData(pPtr->pPg, &nPage);

  /* Locate the record, then decode its header: flags byte, page pointer,
  ** key size and (for write records only) value size. */
  iRead = lsmGetU16(&aPage[SEGMENT_CELLPTR_OFFSET(nPage, pPtr->iCell)]);
  pPtr->eType = aPage[iRead];
  iRead++;
  bHasValue = rtIsWrite(pPtr->eType);
  iRead += GETVARINT64(&aPage[iRead], pPtr->iPgPtr);
  iRead += GETVARINT32(&aPage[iRead], pPtr->nKey);
  if( bHasValue ){
    iRead += GETVARINT32(&aPage[iRead], pPtr->nVal);
  }
  assert( pPtr->nKey>=0 );

  rc = segmentPtrReadData(pPtr, iRead, pPtr->nKey, &pPtr->pKey, &pPtr->blob1);
  if( rc!=LSM_OK || !bHasValue ){
    pPtr->nVal = 0;
    pPtr->pVal = 0;
  }else{
    rc = segmentPtrReadData(
        pPtr, iRead+pPtr->nKey, pPtr->nVal, &pPtr->pVal, &pPtr->blob2
    );
  }

  return rc;
}
1093
1094
sortedSplitkeySegment(Level * pLevel)1095 static Segment *sortedSplitkeySegment(Level *pLevel){
1096 Merge *pMerge = pLevel->pMerge;
1097 MergeInput *p = &pMerge->splitkey;
1098 Segment *pSeg;
1099 int i;
1100
1101 for(i=0; i<pMerge->nInput; i++){
1102 if( p->iPg==pMerge->aInput[i].iPg ) break;
1103 }
1104 if( pMerge->nInput==(pLevel->nRight+1) && i>=(pMerge->nInput-1) ){
1105 pSeg = &pLevel->pNext->lhs;
1106 }else{
1107 pSeg = &pLevel->aRhs[i];
1108 }
1109
1110 return pSeg;
1111 }
1112
/*
** Load the split-key of the merge underway on pLevel and store it in
** pLevel->iSplitTopic, pLevel->pSplitKey and pLevel->nSplitKey.
**
** The key is read from cell pMerge->splitkey.iCell of page
** pMerge->splitkey.iPg, located in the segment returned by
** sortedSplitkeySegment(). Both b-tree pages and ordinary pages are
** handled. The key is copied into a buffer whose pointer is stored in
** pLevel->pSplitKey.
**
** *pRc is an IN/OUT error code. If it is not LSM_OK on entry no page is
** read and pLevel is left unmodified. Any error that occurs is written
** back to *pRc.
*/
static void sortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
  Segment *pSeg;
  Page *pPg = 0;
  lsm_env *pEnv = pDb->pEnv;      /* Environment handle */
  int rc = *pRc;
  Merge *pMerge = pLevel->pMerge;

  pSeg = sortedSplitkeySegment(pLevel);
  if( rc==LSM_OK ){
    rc = lsmFsDbPageGet(pDb->pFS, pSeg, pMerge->splitkey.iPg, &pPg);
  }
  if( rc==LSM_OK ){
    /* Initialized so that a defined value is stored in pLevel->iSplitTopic
    ** even if the key-read helper fails before writing its OUT parameter. */
    int iTopic = 0;
    Blob blob = {0, 0, 0, 0};
    u8 *aData;
    int nData;

    aData = lsmFsPageData(pPg, &nData);
    if( pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG ){
      void *pKey;
      int nKey;
      Pgno dummy;
      rc = pageGetBtreeKey(pSeg,
          pPg, pMerge->splitkey.iCell, &dummy, &iTopic, &pKey, &nKey, &blob
      );
      /* If the key was not returned in blob itself, copy it there so
      ** that it can be handed over to pLevel below. */
      if( rc==LSM_OK && blob.pData!=pKey ){
        rc = sortedBlobSet(pEnv, &blob, pKey, nKey);
      }
    }else{
      rc = pageGetKeyCopy(
          pEnv, pSeg, pPg, pMerge->splitkey.iCell, &iTopic, &blob
      );
    }

    pLevel->iSplitTopic = iTopic;
    pLevel->pSplitKey = blob.pData;
    pLevel->nSplitKey = blob.nData;
    lsmFsPageRelease(pPg);
  }

  *pRc = rc;
}
1155
1156 /*
1157 ** Reset a segment cursor. Also free its buffers if they are nThreshold
1158 ** bytes or larger in size.
1159 */
segmentPtrReset(SegmentPtr * pPtr,int nThreshold)1160 static void segmentPtrReset(SegmentPtr *pPtr, int nThreshold){
1161 lsmFsPageRelease(pPtr->pPg);
1162 pPtr->pPg = 0;
1163 pPtr->nCell = 0;
1164 pPtr->pKey = 0;
1165 pPtr->nKey = 0;
1166 pPtr->pVal = 0;
1167 pPtr->nVal = 0;
1168 pPtr->eType = 0;
1169 pPtr->iCell = 0;
1170 if( pPtr->blob1.nAlloc>=nThreshold ) sortedBlobFree(&pPtr->blob1);
1171 if( pPtr->blob2.nAlloc>=nThreshold ) sortedBlobFree(&pPtr->blob2);
1172 }
1173
/*
** Return true if separator keys encountered by pPtr should be skipped.
** Separators are only visible when the cursor has the
** CURSOR_READ_SEPARATORS flag set and pPtr is the last element of
** pCsr->aPtr[]. In every other case this function returns 1.
*/
static int segmentPtrIgnoreSeparators(MultiCursor *pCsr, SegmentPtr *pPtr){
  if( (pCsr->flags & CURSOR_READ_SEPARATORS)==0 ){
    return 1;
  }
  return (pPtr!=&pCsr->aPtr[pCsr->nPtr-1]);
}
1178
/*
** Move segment-pointer pPtr to the next entry (bReverse==0) or the
** previous entry (bReverse!=0) in its segment. pPtr must currently hold
** a page.
**
** Pages that contain no cells, or that have SEGMENT_BTREE_FLAG set, are
** stepped over. If pCsr is not NULL and segmentPtrIgnoreSeparators() is
** true for pPtr, entries for which rtIsSeparator() is true are skipped
** as well. If there are no further entries, pPtr is left holding no page
** (pPtr->pPg==0) and LSM_OK is returned.
**
** When moving backwards through a segment other than the left-hand
** segment of its level, entries smaller than the level's split-key are
** not visited - see the inline comments.
**
** Returns LSM_OK or an error code from loading a page or cell.
*/
static int segmentPtrAdvance(
  MultiCursor *pCsr,
  SegmentPtr *pPtr,
  int bReverse
){
  int eDir = (bReverse ? -1 : 1);
  Level *pLvl = pPtr->pLevel;
  do {
    int rc;
    int iCell;                    /* Number of new cell in page */
    int svFlags = 0;              /* SegmentPtr.eType before advance */

    iCell = pPtr->iCell + eDir;
    assert( pPtr->pPg );
    assert( iCell<=pPtr->nCell && iCell>=-1 );

    /* Only when iterating backwards through a segment that is not the
    ** lhs of its level: remember the flags of the entry being left. A
    ** non-zero svFlags enables the split-key checks below. */
    if( bReverse && pPtr->pSeg!=&pPtr->pLevel->lhs ){
      svFlags = pPtr->eType;
      assert( svFlags );
    }

    /* If the new cell index is off the end of the current page, move to
    ** the nearest page in direction eDir that has at least one cell and
    ** is not a b-tree page. This may leave pPtr->pPg set to NULL. */
    if( iCell>=pPtr->nCell || iCell<0 ){
      do {
        rc = segmentPtrNextPage(pPtr, eDir);
      }while( rc==LSM_OK
           && pPtr->pPg
           && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) )
      );
      if( rc!=LSM_OK ) return rc;
      iCell = bReverse ? (pPtr->nCell-1) : 0;
    }
    /* segmentPtrLoadCell() is a no-op if pPtr->pPg is NULL. */
    rc = segmentPtrLoadCell(pPtr, iCell);
    if( rc!=LSM_OK ) return rc;

    /* If the new key is smaller than the split-key of the level, reset
    ** the pointer so that it holds no page. */
    if( svFlags && pPtr->pPg ){
      int res = sortedKeyCompare(pCsr->pDb->xCmp,
          rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
          pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
      );
      if( res<0 ) segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
    }

    /* The pointer has run out of entries, but the entry just left had
    ** the LSM_END_DELETE flag set. Leave the pointer on an entry with
    ** LSM_START_DELETE|LSM_POINT_DELETE set and the level's split-key as
    ** its key. The first page of the segment is loaded into pPtr->pPg
    ** here; pPtr->nCell and pPtr->iCell are not updated for it. */
    if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){
      Segment *pSeg = pPtr->pSeg;
      rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, pSeg->iFirst, &pPtr->pPg);
      if( rc!=LSM_OK ) return rc;
      pPtr->eType = LSM_START_DELETE | LSM_POINT_DELETE;
      pPtr->eType |= (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
      pPtr->pKey = pLvl->pSplitKey;
      pPtr->nKey = pLvl->nSplitKey;
    }

  }while( pCsr
      && pPtr->pPg
      && segmentPtrIgnoreSeparators(pCsr, pPtr)
      && rtIsSeparator(pPtr->eType)
  );

  return LSM_OK;
}
1239
/*
** Load either the last page (bLast!=0) or the first page (bLast==0) of
** segment pPtr->pSeg into pPtr.
**
** *pRc is an IN/OUT error code. If it is not LSM_OK when this function is
** called, it is a no-op. Otherwise *pRc is set to the result of the page
** fetch, and pPtr is updated with whatever page (possibly none) was
** obtained.
*/
static void segmentPtrEndPage(
  FileSystem *pFS,
  SegmentPtr *pPtr,
  int bLast,
  int *pRc
){
  Segment *pSeg;
  Page *pEnd = 0;

  if( *pRc!=LSM_OK ) return;

  pSeg = pPtr->pSeg;
  *pRc = bLast
       ? lsmFsDbPageLast(pFS, pSeg, &pEnd)
       : lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pEnd);
  segmentPtrSetPage(pPtr, pEnd);
}
1257
1258
/*
** Try to move the segment pointer passed as the second argument so that it
** points at either the first (bLast==0) or last (bLast==1) cell of
** segment pPtr->pSeg.
**
** When seeking to the last cell of a segment that is not the left-hand
** segment of its level, the pointer is reset (left with no page) if that
** cell's key is smaller than the level's split-key. If the cell found is
** a separator that this cursor ignores, the pointer is advanced past it.
**
** Return LSM_OK if successful or an lsm error code if something goes
** wrong (IO error, OOM etc.).
*/
static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){
  Level *pLvl = pPtr->pLevel;
  int rc = LSM_OK;
  FileSystem *pFS = pCsr->pDb->pFS;
  int bIgnore;

  /* Load the first or last page of the segment, then step over any pages
  ** that contain no cells or that are b-tree pages. */
  segmentPtrEndPage(pFS, pPtr, bLast, &rc);
  while( rc==LSM_OK && pPtr->pPg
      && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
  ){
    rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
  }

  if( rc==LSM_OK && pPtr->pPg ){
    rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
    /* For a segment other than the lhs of its level, the last cell only
    ** counts if its key is not smaller than the split-key. */
    if( rc==LSM_OK && bLast && pPtr->pSeg!=&pLvl->lhs ){
      int res = sortedKeyCompare(pCsr->pDb->xCmp,
          rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
          pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
      );
      if( res<0 ) segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
    }
  }

  /* Skip over a separator key, if separators are ignored for pPtr. When
  ** bLast is true the pointer is moved backwards, otherwise forwards. */
  bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
  if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
    rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  }

  /* The following block is disabled and is not compiled. */
#if 0
  if( bLast && rc==LSM_OK && pPtr->pPg
   && pPtr->pSeg==&pLvl->lhs
   && pLvl->nRight && (pPtr->eType & LSM_START_DELETE)
  ){
    pPtr->iCell++;
    pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic);
    pPtr->pKey = pLvl->pSplitKey;
    pPtr->nKey = pLvl->nSplitKey;
    pPtr->pVal = 0;
    pPtr->nVal = 0;
  }
#endif

  return rc;
}
1312
/*
** Output the key of the entry that pPtr currently points at. *ppKey is
** set to point at the key and *pnKey to its size in bytes. pPtr must
** hold a page.
*/
static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  assert( pPtr->pPg );

  *pnKey = pPtr->nKey;
  *ppKey = pPtr->pKey;
}
1318
#if 0 /* NOT USED */
/*
** Debugging helper (currently compiled out). Return a nul-terminated
** string of nKey characters obtained from lsmMalloc(), in which each
** alphanumeric byte of the key is copied as is and every other byte is
** replaced by '.'.
**
** NOTE(review): the return value of lsmMalloc() is not checked before
** it is written to.
*/
static char *keyToString(lsm_env *pEnv, void *pKey, int nKey){
  int i;
  u8 *aKey = (u8 *)pKey;
  char *zRet = (char *)lsmMalloc(pEnv, nKey+1);

  for(i=0; i<nKey; i++){
    zRet[i] = (char)(isalnum(aKey[i]) ? aKey[i] : '.');
  }
  zRet[nKey] = '\0';
  return zRet;
}
#endif
1332
#if 0 /* NOT USED */
/*
** Check that the page that pPtr currently has loaded is the correct page
** to search for key (pKey/nKey). If it is, return 1. Otherwise, an assert
** fails and this function does not return.
**
** The check walks away from the current page in each direction until it
** finds a non-b-tree page with at least one cell, then compares the key
** against the nearest cell of that page (the last cell when walking
** backwards, the first when walking forwards). If stepping between pages
** fails, the check is abandoned and 1 is returned.
**
** This function is currently compiled out. The topic of the search key
** is hard-coded to 0 (see the TODO below).
*/
static int assertKeyLocation(
  MultiCursor *pCsr,
  SegmentPtr *pPtr,
  void *pKey, int nKey
){
  lsm_env *pEnv = lsmFsEnv(pCsr->pDb->pFS);
  Blob blob = {0, 0, 0};
  int eDir;
  int iTopic = 0;                 /* TODO: Fix me */

  /* eDir takes the values -1 (walk backwards) and then +1 (forwards). */
  for(eDir=-1; eDir<=1; eDir+=2){
    Page *pTest = pPtr->pPg;

    /* Take an extra reference, as pTest is released inside the loop. */
    lsmFsPageRef(pTest);
    while( pTest ){
      Segment *pSeg = pPtr->pSeg;
      Page *pNext;

      int rc = lsmFsDbPageNext(pSeg, pTest, eDir, &pNext);
      lsmFsPageRelease(pTest);
      if( rc ) return 1;
      pTest = pNext;

      if( pTest ){
        int nData;
        u8 *aData = fsPageData(pTest, &nData);
        int nCell = pageGetNRec(aData, nData);
        int flags = pageGetFlags(aData, nData);
        if( nCell && 0==(flags&SEGMENT_BTREE_FLAG) ){
          int nPgKey;
          int iPgTopic;
          u8 *pPgKey;
          int res;
          int iCell;

          iCell = ((eDir < 0) ? (nCell-1) : 0);
          pPgKey = pageGetKey(pSeg, pTest, iCell, &iPgTopic, &nPgKey, &blob);
          res = iTopic - iPgTopic;
          if( res==0 ) res = pCsr->pDb->xCmp(pKey, nKey, pPgKey, nPgKey);
          if( (eDir==1 && res>0) || (eDir==-1 && res<0) ){
            /* Taking this branch means something has gone wrong. */
            char *zMsg = lsmMallocPrintf(pEnv, "Key \"%s\" is not on page %d",
                keyToString(pEnv, pKey, nKey), lsmFsPageNumber(pPtr->pPg)
            );
            fprintf(stderr, "%s\n", zMsg);
            assert( !"assertKeyLocation() failed" );
          }
          /* A page with keys has been checked. Stop walking this way. */
          lsmFsPageRelease(pTest);
          pTest = 0;
        }
      }
    }
  }

  sortedBlobFree(&blob);
  return 1;
}
#endif
1397
1398 #ifndef NDEBUG
assertSeekResult(MultiCursor * pCsr,SegmentPtr * pPtr,int iTopic,void * pKey,int nKey,int eSeek)1399 static int assertSeekResult(
1400 MultiCursor *pCsr,
1401 SegmentPtr *pPtr,
1402 int iTopic,
1403 void *pKey,
1404 int nKey,
1405 int eSeek
1406 ){
1407 if( pPtr->pPg ){
1408 int res;
1409 res = sortedKeyCompare(pCsr->pDb->xCmp, iTopic, pKey, nKey,
1410 rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
1411 );
1412
1413 if( eSeek==LSM_SEEK_EQ ) return (res==0);
1414 if( eSeek==LSM_SEEK_LE ) return (res>=0);
1415 if( eSeek==LSM_SEEK_GE ) return (res<=0);
1416 }
1417
1418 return 1;
1419 }
1420 #endif
1421
/*
** While the page held by pPtr has PGFTR_SKIP_NEXT_FLAG set and its
** largest key is smaller than the key being sought, move pPtr forward to
** the next page in the segment that is not a b-tree page and contains at
** least one key. On return pPtr holds the page on which the search for
** (iTopic/pKey/nKey) should take place.
**
** If there is no such following page, pPtr is left on its current page.
** Note that pPtr->blob1 is used as scratch space for loading keys.
**
** Returns LSM_OK, or the error code if stepping to the next page fails.
*/
static int segmentPtrSearchOversized(
  MultiCursor *pCsr,              /* Cursor context */
  SegmentPtr *pPtr,               /* Pointer to seek */
  int iTopic,                     /* Topic of key to search for */
  void *pKey, int nKey            /* Key to seek to */
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int rc = LSM_OK;

  /* If the PGFTR_SKIP_NEXT_FLAG flag is set, then there is no pointer in
  ** the upper level to the next page in the segment that contains at least
  ** one key. So compare the largest key on the current page with the
  ** key being sought (pKey/nKey). If (pKey/nKey) is larger, advance
  ** to the next page in the segment that contains at least one key.
  */
  while( rc==LSM_OK && (pPtr->flags & PGFTR_SKIP_NEXT_FLAG) ){
    u8 *pLastKey;
    int nLastKey;
    int iLastTopic;
    int res;                      /* Result of comparison */
    Page *pNext;

    /* Load the last key on the current page. */
    pLastKey = pageGetKey(pPtr->pSeg,
        pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1
    );

    /* If the loaded key is >= than (pKey/nKey), break out of the loop.
    ** If (pKey/nKey) is present in this array, it must be on the current
    ** page.  */
    res = sortedKeyCompare(
        xCmp, iLastTopic, pLastKey, nLastKey, iTopic, pKey, nKey
    );
    if( res>=0 ) break;

    /* Advance to the next page that contains at least one key. An extra
    ** reference is taken on the current page first, as the loop below
    ** releases each page that it steps away from while pPtr continues to
    ** hold its own reference until segmentPtrSetPage() is called. */
    pNext = pPtr->pPg;
    lsmFsPageRef(pNext);
    while( 1 ){
      Page *pLoad;
      u8 *aData; int nData;

      rc = lsmFsDbPageNext(pPtr->pSeg, pNext, 1, &pLoad);
      lsmFsPageRelease(pNext);
      pNext = pLoad;
      if( pNext==0 ) break;

      assert( rc==LSM_OK );
      aData = lsmFsPageData(pNext, &nData);
      if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0
       && pageGetNRec(aData, nData)>0
      ){
        break;
      }
    }
    /* No suitable page was found (end of segment or an error). Leave
    ** pPtr on its current page. */
    if( pNext==0 ) break;
    segmentPtrSetPage(pPtr, pNext);

    /* This should probably be an LSM_CORRUPT error. */
    assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) );
  }

  return rc;
}
1486
/*
** Starting at cell iCell of page pPage and moving forwards through
** segment pSeg, search for the first record on a non-b-tree page that
** does not have the LSM_START_DELETE flag set.
**
** If such a record is found, *pbFound is set to 1, *piPtr is set to the
** sum of the record's pointer and the pointer of the page it is on, and
** LSM_OK is returned. Otherwise *pbFound is set to 0, *piPtr is left
** unmodified and the result of the final attempt to step to the next page
** is returned.
**
** This function consumes the caller's reference to pPage. It is released
** before returning in all cases.
*/
static int ptrFwdPointer(
  Page *pPage,
  int iCell,
  Segment *pSeg,
  Pgno *piPtr,
  int *pbFound
){
  Page *pCur = pPage;             /* Page currently being scanned */
  int iStart = iCell;             /* First cell to examine on pCur */
  int rc = LSM_OK;

  while( 1 ){
    Page *pFollowing = 0;
    int nByte;
    u8 *aPage = lsmFsPageData(pCur, &nByte);

    if( 0==(pageGetFlags(aPage, nByte) & SEGMENT_BTREE_FLAG) ){
      int nRec = pageGetNRec(aPage, nByte);
      int iRec;
      for(iRec=iStart; iRec<nRec; iRec++){
        u8 eType = *pageGetCell(aPage, nByte, iRec);
        if( eType & LSM_START_DELETE ) continue;

        *pbFound = 1;
        *piPtr = pageGetRecordPtr(aPage, nByte, iRec) + pageGetPtr(aPage, nByte);
        lsmFsPageRelease(pCur);
        return LSM_OK;
      }
    }

    /* Nothing usable on this page. Move on to the next one, scanning it
    ** from its first cell. */
    rc = lsmFsDbPageNext(pSeg, pCur, 1, &pFollowing);
    lsmFsPageRelease(pCur);
    pCur = pFollowing;
    iStart = 0;
    if( pCur==0 || rc!=LSM_OK ) break;
  }
  lsmFsPageRelease(pCur);

  *pbFound = 0;
  return rc;
}
1528
/*
** Position pPtr on the first entry in its segment whose key is greater
** than or equal to the split-key of level pLvl. The pointer is moved to
** the first entry of the segment and then advanced for as long as the
** split-key is larger than the current key.
**
** If there is no such entry, pPtr is left holding no page. Returns
** LSM_OK or an error code.
*/
static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
  int rc = segmentPtrEnd(pCsr, pPtr, 0);

  for(;;){
    int cmp;                      /* (split-key) - (current key) */
    if( pPtr->pPg==0 || rc!=LSM_OK ) break;

    cmp = sortedKeyCompare(pCsr->pDb->xCmp,
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
    );
    if( cmp<=0 ) break;

    rc = segmentPtrAdvance(pCsr, pPtr, 0);
  }
  return rc;
}
1542
1543
1544 /*
1545 ** This function is called as part of a SEEK_GE op on a multi-cursor if the
1546 ** FC pointer read from segment *pPtr comes from an entry with the
1547 ** LSM_START_DELETE flag set. In this case the pointer value cannot be
1548 ** trusted. Instead, the pointer that should be followed is that associated
1549 ** with the next entry in *pPtr that does not have LSM_START_DELETE set.
1550 **
1551 ** Why the pointers can't be trusted:
1552 **
1553 **
1554 **
1555 ** TODO: This is a stop-gap solution:
1556 **
1557 ** At the moment, this function is called from within segmentPtrSeek(),
1558 ** as part of the initial lsmMCursorSeek() call. However, consider a
1559 ** database where the following has occurred:
1560 **
1561 ** 1. A range delete removes keys 1..9999 using a range delete.
1562 ** 2. Keys 1 through 9999 are reinserted.
1563 ** 3. The levels containing the ops in 1. and 2. above are merged. Call
1564 ** this level N. Level N contains FC pointers to level N+1.
1565 **
1566 ** Then, if the user attempts to query for (key>=2 LIMIT 10), the
1567 ** lsmMCursorSeek() call will iterate through 9998 entries searching for a
1568 ** pointer down to the level N+1 that is never actually used. It would be
1569 ** much better if the multi-cursor could do this lazily - only seek to the
1570 ** level (N+1) page after the user has moved the cursor on level N passed
1571 ** the big range-delete.
1572 */
segmentPtrFwdPointer(MultiCursor * pCsr,SegmentPtr * pPtr,Pgno * piPtr)1573 static int segmentPtrFwdPointer(
1574 MultiCursor *pCsr, /* Multi-cursor pPtr belongs to */
1575 SegmentPtr *pPtr, /* Segment-pointer to extract FC ptr from */
1576 Pgno *piPtr /* OUT: FC pointer value */
1577 ){
1578 Level *pLvl = pPtr->pLevel;
1579 Level *pNext = pLvl->pNext;
1580 Page *pPg = pPtr->pPg;
1581 int rc;
1582 int bFound;
1583 Pgno iOut = 0;
1584
1585 if( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[pLvl->nRight-1] ){
1586 if( pNext==0
1587 || (pNext->nRight==0 && pNext->lhs.iRoot)
1588 || (pNext->nRight!=0 && pNext->aRhs[0].iRoot)
1589 ){
1590 /* Do nothing. The pointer will not be used anyway. */
1591 return LSM_OK;
1592 }
1593 }else{
1594 if( pPtr[1].pSeg->iRoot ){
1595 return LSM_OK;
1596 }
1597 }
1598
1599 /* Search for a pointer within the current segment. */
1600 lsmFsPageRef(pPg);
1601 rc = ptrFwdPointer(pPg, pPtr->iCell, pPtr->pSeg, &iOut, &bFound);
1602
1603 if( rc==LSM_OK && bFound==0 ){
1604 /* This case happens when pPtr points to the left-hand-side of a segment
1605 ** currently undergoing an incremental merge. In this case, jump to the
1606 ** oldest segment in the right-hand-side of the same level and continue
1607 ** searching. But - do not consider any keys smaller than the levels
1608 ** split-key. */
1609 SegmentPtr ptr;
1610
1611 if( pPtr->pLevel->nRight==0 || pPtr->pSeg!=&pPtr->pLevel->lhs ){
1612 return LSM_CORRUPT_BKPT;
1613 }
1614
1615 memset(&ptr, 0, sizeof(SegmentPtr));
1616 ptr.pLevel = pPtr->pLevel;
1617 ptr.pSeg = &ptr.pLevel->aRhs[ptr.pLevel->nRight-1];
1618 rc = sortedRhsFirst(pCsr, ptr.pLevel, &ptr);
1619 if( rc==LSM_OK ){
1620 rc = ptrFwdPointer(ptr.pPg, ptr.iCell, ptr.pSeg, &iOut, &bFound);
1621 ptr.pPg = 0;
1622 }
1623 segmentPtrReset(&ptr, 0);
1624 }
1625
1626 *piPtr = iOut;
1627 return rc;
1628 }
1629
/*
** Seek segment-pointer pPtr to key (iTopic/pKey/nKey) within the page it
** currently holds (or, if that page has PGFTR_SKIP_NEXT_FLAG set, within
** one of the pages that follow it - see segmentPtrSearchOversized()).
**
** Parameter eSeek determines what happens once the binary search of the
** page is complete:
**
**   LSM_SEEK_EQ:  If the key is found, or a delete marker covering it is
**                 found, *pbStop is set to 1. If the key is found as an
**                 LSM_INSERT entry, it is also copied into pCsr->key and
**                 pCsr->val and the CURSOR_SEEK_EQ flag is set. The
**                 pointer is always reset afterwards.
**   LSM_SEEK_LE:  The pointer is left on the largest entry that is less
**                 than or equal to the key.
**   LSM_SEEK_GE:  The pointer is left on the smallest entry that is
**                 greater than or equal to the key.
**
** Separator keys are skipped if segmentPtrIgnoreSeparators() is true.
**
** Before returning, *piPtr is set to the pointer value that may be used
** to continue the search in the next segment. *pbStop is only ever written
** for LSM_SEEK_EQ seeks.
**
** Returns LSM_OK if successful, or an error code otherwise. An error
** from segmentPtrSearchOversized() is returned immediately, without
** searching the page.
*/
static int segmentPtrSeek(
  MultiCursor *pCsr,              /* Cursor context */
  SegmentPtr *pPtr,               /* Pointer to seek */
  int iTopic,                     /* Key topic to seek to */
  void *pKey, int nKey,           /* Key to seek to */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res = 0;                    /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  Pgno iPtrOut = 0;

  /* If the current page contains an oversized entry, then there are no
  ** pointers to one or more of the subsequent pages in the sorted run.
  ** The following call ensures that the segment-ptr points to the correct
  ** page in this case.  */
  rc = segmentPtrSearchOversized(pCsr, pPtr, iTopic, pKey, nKey);
  iPtrOut = pPtr->iPtr;

  /* If the call above failed, bail out now. Otherwise the error code
  ** would be overwritten by the binary search below. */
  if( rc!=LSM_OK ){
    *piPtr = (int)iPtrOut;
    return rc;
  }

  /* Assert that this page is the right page of this segment for the key
  ** that we are searching for. Do this by loading page (iPg-1) and testing
  ** that pKey/nKey is greater than all keys on that page, and then by
  ** loading (iPg+1) and testing that pKey/nKey is smaller than all
  ** the keys it houses.
  **
  ** TODO: With range-deletes in the tree, the test described above may fail.
  */
#if 0
  assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
#endif

  assert( pPtr->nCell>0
       || pPtr->pSeg->nSize==1
       || lsmFsDbPageIsLast(pPtr->pSeg, pPtr->pPg)
  );
  if( pPtr->nCell==0 ){
    segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
  }else{
    iMin = 0;
    iMax = pPtr->nCell-1;

    /* Binary search the cells on the page. iPtrOut tracks the pointer
    ** of the most recently tested cell that is <= the search key. */
    while( 1 ){
      int iTry = (iMin+iMax)/2;
      void *pKeyT; int nKeyT;       /* Key for cell iTry */
      int iTopicT;

      assert( iTry<iMax || iMin==iMax );

      rc = segmentPtrLoadCell(pPtr, iTry);
      if( rc!=LSM_OK ) break;

      segmentPtrKey(pPtr, &pKeyT, &nKeyT);
      iTopicT = rtTopic(pPtr->eType);

      res = sortedKeyCompare(xCmp, iTopicT, pKeyT, nKeyT, iTopic, pKey, nKey);
      if( res<=0 ){
        iPtrOut = pPtr->iPtr + pPtr->iPgPtr;
      }

      if( res==0 || iMin==iMax ){
        break;
      }else if( res>0 ){
        iMax = LSM_MAX(iTry-1, iMin);
      }else{
        iMin = iTry+1;
      }
    }

    if( rc==LSM_OK ){
      assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) );
      if( res ){
        rc = segmentPtrLoadCell(pPtr, iMin);
      }
      assert( rc!=LSM_OK || res>0 || iPtrOut==(pPtr->iPtr + pPtr->iPgPtr) );

      if( rc==LSM_OK ){
        switch( eSeek ){
          case LSM_SEEK_EQ: {
            int eType = pPtr->eType;
            /* The search is finished if the key is covered by a range
            ** delete ending or starting at the current entry, or if the
            ** current entry is a point-delete of the key itself. */
            if( (res<0 && (eType & LSM_START_DELETE))
             || (res>0 && (eType & LSM_END_DELETE))
             || (res==0 && (eType & LSM_POINT_DELETE))
            ){
              *pbStop = 1;
            }else if( res==0 && (eType & LSM_INSERT) ){
              lsm_env *pEnv = pCsr->pDb->pEnv;
              *pbStop = 1;
              pCsr->eType = pPtr->eType;
              rc = sortedBlobSet(pEnv, &pCsr->key, pPtr->pKey, pPtr->nKey);
              if( rc==LSM_OK ){
                rc = sortedBlobSet(pEnv, &pCsr->val, pPtr->pVal, pPtr->nVal);
              }
              pCsr->flags |= CURSOR_SEEK_EQ;
            }
            segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
            break;
          }
          case LSM_SEEK_LE:
            /* Current entry is larger than the key. Step backwards. */
            if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1);
            break;
          case LSM_SEEK_GE: {
            /* Figure out if we need to 'skip' the pointer forward or not */
            if( (res<=0 && (pPtr->eType & LSM_START_DELETE))
             || (res>0  && (pPtr->eType & LSM_END_DELETE))
            ){
              rc = segmentPtrFwdPointer(pCsr, pPtr, &iPtrOut);
            }
            /* Current entry is smaller than the key. Step forwards. */
            if( res<0 && rc==LSM_OK ){
              rc = segmentPtrAdvance(pCsr, pPtr, 0);
            }
            break;
          }
        }
      }
    }

    /* If the cursor seek has found a separator key, and this cursor is
    ** supposed to ignore separators keys, advance to the next entry.  */
    if( rc==LSM_OK && pPtr->pPg
     && segmentPtrIgnoreSeparators(pCsr, pPtr)
     && rtIsSeparator(pPtr->eType)
    ){
      assert( eSeek!=LSM_SEEK_EQ );
      rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
    }
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = (int)iPtrOut;
  return rc;
}
1765
/*
** Use the b-tree of segment pSeg, starting at page pSeg->iRoot, to find
** the page on which a search for key (iTopic/pKey/nKey) should take
** place. The tree is descended until a page without SEGMENT_BTREE_FLAG
** is reached.
**
** If ppPg is not NULL, *ppPg is set to a reference to that page (or to
** NULL if an error occurs). If ppPg is NULL, the page is released before
** returning.
**
** If aPg is not NULL, the number of each page visited is appended to it.
** After each b-tree page number, at most one extra entry is written:
** the value returned by pageGetBtreeRef() for the first cell examined
** whose key was returned in the temporary blob buffer.
** NOTE(review): the caller must size aPg[] appropriately; no bound is
** checked here.
**
** Returns LSM_OK if successful, or an error code otherwise.
*/
static int seekInBtree(
  MultiCursor *pCsr,              /* Multi-cursor object */
  Segment *pSeg,                  /* Seek within this segment */
  int iTopic,
  void *pKey, int nKey,           /* Key to seek to */
  Pgno *aPg,                      /* OUT: Page numbers */
  Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
){
  int i = 0;
  int rc;
  int iPg;
  Page *pPg = 0;
  Blob blob = {0, 0, 0};

  iPg = (int)pSeg->iRoot;
  do {
    Pgno *piFirst = 0;
    if( aPg ){
      aPg[i++] = iPg;
      piFirst = &aPg[i];
    }

    rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, iPg, &pPg);
    assert( rc==LSM_OK || pPg==0 );
    if( rc==LSM_OK ){
      u8 *aData;                  /* Buffer containing page data */
      int nData;                  /* Size of aData[] in bytes */
      int iMin;
      int iMax;
      int nRec;
      int flags;

      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      /* Not a b-tree page. The descent is finished. The reference to
      ** pPg is retained and handed to the caller below. */
      if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;

      /* Start with the page's own pointer. It is followed if the search
      ** key is not smaller than any key on this page. */
      iPg = (int)pageGetPtr(aData, nData);
      nRec = pageGetNRec(aData, nData);

      /* Binary search for the smallest key on the page that is larger
      ** than the search key. Each time a larger key is found, iPg is set
      ** to the pointer associated with it. */
      iMin = 0;
      iMax = nRec-1;
      while( iMax>=iMin ){
        int iTry = (iMin+iMax)/2;
        void *pKeyT; int nKeyT;       /* Key for cell iTry */
        int iTopicT;                  /* Topic for key pKeyT/nKeyT */
        Pgno iPtr;                    /* Pointer associated with cell iTry */
        int res;                      /* (pKey - pKeyT) */

        rc = pageGetBtreeKey(
            pSeg, pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob
        );
        if( rc!=LSM_OK ) break;
        /* Record the reference for the first cell on this page whose key
        ** was returned in blob. */
        if( piFirst && pKeyT==blob.pData ){
          *piFirst = pageGetBtreeRef(pPg, iTry);
          piFirst = 0;
          i++;
        }

        res = sortedKeyCompare(
            pCsr->pDb->xCmp, iTopic, pKey, nKey, iTopicT, pKeyT, nKeyT
        );
        if( res<0 ){
          iPg = (int)iPtr;
          iMax = iTry-1;
        }else{
          iMin = iTry+1;
        }
      }
      lsmFsPageRelease(pPg);
      pPg = 0;
    }
  }while( rc==LSM_OK );

  sortedBlobFree(&blob);
  assert( (rc==LSM_OK)==(pPg!=0) );
  if( ppPg ){
    *ppPg = pPg;
  }else{
    lsmFsPageRelease(pPg);
  }
  return rc;
}
1848
/*
** Seek segment-pointer pPtr to key (iTopic/pKey/nKey) within its segment.
**
** The page to search is determined as follows. If the segment has a
** b-tree (pSeg->iRoot is non-zero), the b-tree is used and iPg is
** ignored. Otherwise page iPg is searched or, if iPg is zero, the first
** page of the segment.
**
** Once the page has been loaded the search is handed to
** segmentPtrSeek(), which is where eSeek, piPtr and pbStop are
** interpreted. Returns LSM_OK or an error code.
*/
static int seekInSegment(
  MultiCursor *pCsr,
  SegmentPtr *pPtr,
  int iTopic,
  void *pKey, int nKey,
  int iPg,                        /* Page to search */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop                     /* OUT: Stop search flag */
){
  int rc;

  if( pPtr->pSeg->iRoot==0 ){
    int iLoad = iPg;
    if( iLoad==0 ){
      iLoad = (int)pPtr->pSeg->iFirst;
    }
    rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iLoad);
  }else{
    Page *pLeaf;
    rc = seekInBtree(pCsr, pPtr->pSeg, iTopic, pKey, nKey, 0, &pLeaf);
    if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pLeaf);
  }
  if( rc!=LSM_OK ) return rc;

  return segmentPtrSeek(pCsr, pPtr, iTopic, pKey, nKey, eSeek, piPtr, pbStop);
}
1881
/*
** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].
** aPtr[0] corresponds to the left-hand-side of the level and
** aPtr[1..nRight] to its right-hand-side segments.
**
** piPgno:
**   On entry, *piPgno is the pointer (page number) at which to begin the
**   search, as produced by the seek of the previous level, or 0. Before
**   returning it is set to the pointer to use for the next level.
**
** pbStop:
**   This parameter is only significant if parameter eSeek is set to
**   LSM_SEEK_EQ. In this case, it is set to true before returning if
**   the seek operation is finished. This can happen in two ways:
**
**     a) A key matching (pKey/nKey) is found, or
**     b) A point-delete or range-delete deleting the key is found.
**
**   In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key
**   and pCsr->val blobs populated before returning.
**
** Returns LSM_OK if successful, or an error code otherwise.
*/
static int seekInLevel(
  MultiCursor *pCsr,              /* Sorted cursor object to seek */
  SegmentPtr *aPtr,               /* Pointer to array of (nRhs+1) SPs */
  int eSeek,                      /* Search bias - see above */
  int iTopic,                     /* Key topic to search for */
  void *pKey, int nKey,           /* Key to search for */
  Pgno *piPgno,                   /* IN/OUT: fraction cascade pointer (or 0) */
  int *pbStop                     /* OUT: See above */
){
  Level *pLvl = aPtr[0].pLevel;   /* Level to seek within */
  int rc = LSM_OK;                /* Return code */
  int iOut = 0;                   /* Pointer to return to caller */
  int res = -1;                   /* Result of xCmp(pKey, split) */
  int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */
  int bStop = 0;

  /* If this is a composite level (one currently undergoing an incremental
  ** merge), figure out if the search key is larger or smaller than the
  ** levels split-key.  */
  if( nRhs ){
    res = sortedKeyCompare(pCsr->pDb->xCmp, iTopic, pKey, nKey,
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
    );
  }

  /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  ** is not a composite level and there is no split-key). Search the
  ** left-hand-side of the level in this case.  */
  if( res<0 ){
    int iPtr = 0;
    /* The incoming pointer is only used for the lhs if the level is not
    ** a composite level. */
    if( nRhs==0 ) iPtr = (int)*piPgno;

    rc = seekInSegment(
        pCsr, &aPtr[0], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
    );
    /* A SEEK_GE that ran off the end of the lhs of a composite level
    ** continues into the rhs segments below. */
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      res = 0;
    }
  }

  if( res>=0 ){
    int bHit = 0;                 /* True if at least one rhs is not EOF */
    int iPtr = (int)*piPgno;
    int i;
    /* Seek each rhs segment in turn. The pointer output by the seek of
    ** one segment is the starting point for the seek of the next. */
    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
      SegmentPtr *pPtr = &aPtr[i];
      iOut = 0;
      rc = seekInSegment(
          pCsr, pPtr, iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
      );
      iPtr = iOut;

      /* If the segment-pointer has settled on a key that is smaller than
      ** the splitkey, invalidate the segment-pointer.  */
      if( pPtr->pPg ){
        res = sortedKeyCompare(pCsr->pDb->xCmp,
            rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
            pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
        );
        if( res<0 ){
          /* Exception: an entry that opens a delete range is retained,
          ** but with its key replaced by the split-key and its
          ** LSM_INSERT flag and value removed. */
          if( pPtr->eType & LSM_START_DELETE ){
            pPtr->eType &= ~LSM_INSERT;
            pPtr->pKey = pLvl->pSplitKey;
            pPtr->nKey = pLvl->nSplitKey;
            pPtr->pVal = 0;
            pPtr->nVal = 0;
          }else{
            segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
          }
        }
      }

      if( aPtr[i].pKey ) bHit = 1;
    }

    /* For a SEEK_LE, if none of the rhs segments contain a suitable
    ** entry, position the lhs pointer on the last entry of the lhs. */
    if( rc==LSM_OK && eSeek==LSM_SEEK_LE && bHit==0 ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }

  assert( eSeek==LSM_SEEK_EQ || bStop==0 );
  *piPgno = iOut;
  *pbStop = bStop;
  return rc;
}
1981
/*
** Retrieve the current key, and its type flags, from one of the
** components of multi-cursor pCsr. Parameter iKey identifies the
** component:
**
**   CURSOR_DATA_TREE0, CURSOR_DATA_TREE1:
**     One of the two in-memory tree cursors in pCsr->apTreeCsr[].
**
**   CURSOR_DATA_SYSTEM:
**     The synthetic entries generated from the free-list of the worker
**     snapshot. These are only available if there is a worker snapshot
**     and the CURSOR_FLUSH_FREELIST flag is set.
**
**   CURSOR_DATA_SEGMENT+i:
**     Segment-pointer pCsr->aPtr[i] or, if i==pCsr->nPtr, the b-tree
**     cursor pCsr->pBtCsr.
**
** Any of the three output parameters may be NULL. If the component is at
** EOF or does not exist, the outputs are set to 0, NULL and 0.
*/
static void multiCursorGetKey(
  MultiCursor *pCsr,
  int iKey,
  int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
  void **ppKey,                   /* OUT: Pointer to buffer containing key */
  int *pnKey                      /* OUT: Size of *ppKey in bytes */
){
  int nKey = 0;
  void *pKey = 0;
  int eType = 0;

  switch( iKey ){
    case CURSOR_DATA_TREE0:
    case CURSOR_DATA_TREE1: {
      TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
      if( lsmTreeCursorValid(pTreeCsr) ){
        lsmTreeCursorKey(pTreeCsr, &eType, &pKey, &nKey);
      }
      break;
    }

    case CURSOR_DATA_SYSTEM: {
      Snapshot *pWorker = pCsr->pDb->pWorker;
      if( pWorker && (pCsr->flags & CURSOR_FLUSH_FREELIST) ){
        int nEntry = pWorker->freelist.nEntry;
        /* pCsr->iFree is the iterator position. Each free-list entry
        ** accounts for two positions and entries are visited from last
        ** to first. */
        if( pCsr->iFree < (nEntry*2) ){
          FreelistEntry *aEntry = pWorker->freelist.aEntry;
          int i = nEntry - 1 - (pCsr->iFree / 2);
          u32 iKey2 = 0;

          if( (pCsr->iFree % 2) ){
            /* Odd position: an end-delete for the preceding block. */
            eType = LSM_END_DELETE|LSM_SYSTEMKEY;
            iKey2 = aEntry[i].iBlk-1;
          }else if( aEntry[i].iId>=0 ){
            eType = LSM_INSERT|LSM_SYSTEMKEY;
            iKey2 = aEntry[i].iBlk;

            /* If the in-memory entry immediately before this one was a
            ** DELETE, and the block number is one greater than the current
            ** block number, mark this entry as an "end-delete-range". */
            if( i<(nEntry-1) && aEntry[i+1].iBlk==iKey2+1 && aEntry[i+1].iId<0 ){
              eType |= LSM_END_DELETE;
            }

          }else{
            /* Even position, negative iId: a start-delete for the
            ** following block. */
            eType = LSM_START_DELETE|LSM_SYSTEMKEY;
            iKey2 = aEntry[i].iBlk + 1;
          }

          /* If the in-memory entry immediately after this one is a
          ** DELETE, and the block number is one less than the current
          ** key, mark this entry as an "start-delete-range".  */
          if( i>0 && aEntry[i-1].iBlk==iKey2-1 && aEntry[i-1].iId<0 ){
            eType |= LSM_START_DELETE;
          }

          /* The key is the bitwise complement of the block number,
          ** written as 4 bytes into the buffer at pCsr->pSystemVal. */
          pKey = pCsr->pSystemVal;
          nKey = 4;
          lsmPutU32(pKey, ~iKey2);
        }
      }
      break;
    }

    default: {
      int iPtr = iKey - CURSOR_DATA_SEGMENT;
      assert( iPtr>=0 );
      if( iPtr==pCsr->nPtr ){
        if( pCsr->pBtCsr ){
          pKey = pCsr->pBtCsr->pKey;
          nKey = pCsr->pBtCsr->nKey;
          eType = pCsr->pBtCsr->eType;
        }
      }else if( iPtr<pCsr->nPtr ){
        SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
        /* A segment-pointer with no page is at EOF. */
        if( pPtr->pPg ){
          pKey = pPtr->pKey;
          nKey = pPtr->nKey;
          eType = pPtr->eType;
        }
      }
      break;
    }
  }

  if( peType ) *peType = eType;
  if( pnKey ) *pnKey = nKey;
  if( ppKey ) *ppKey = pKey;
}
2071
/*
** Compare two database keys, each identified by its type flags, key
** buffer and key size. The topic of each key, as extracted from its
** flags by rtTopic(), takes part in the comparison. The result is
** negative, zero or positive if the left-hand key is respectively less
** than, equal to or greater than the right-hand key.
**
** If the keys compare equal and the cursor has CURSOR_IGNORE_DELETE set,
** ties are broken using the delete flags. Considering only the
** INSERT, POINT_DELETE, START_DELETE and END_DELETE bits, a key whose
** flags are exactly LSM_START_DELETE is treated as slightly larger, and
** one whose flags are exactly LSM_END_DELETE as slightly smaller, than a
** key with any other combination. This prevents the beginning of an
** open-ended set from masking a database entry or delete at a lower
** level.
*/
static int sortedDbKeyCompare(
  MultiCursor *pCsr,
  int iLhsFlags, void *pLhsKey, int nLhsKey,
  int iRhsFlags, void *pRhsKey, int nRhsKey
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;

  /* Compare the keys, including the system flag. */
  res = sortedKeyCompare(xCmp,
      rtTopic(iLhsFlags), pLhsKey, nLhsKey,
      rtTopic(iRhsFlags), pRhsKey, nRhsKey
  );
  if( res!=0 || (pCsr->flags & CURSOR_IGNORE_DELETE)==0 ){
    return res;
  }

  {
    const int m = LSM_POINT_DELETE|LSM_INSERT|LSM_END_DELETE|LSM_START_DELETE;
    const int eLhs = (iLhsFlags & m);
    const int eRhs = (iRhsFlags & m);
    int iLhsBias;                 /* +1, -1 or 0 for the left-hand key */
    int iRhsBias;                 /* +1, -1 or 0 for the right-hand key */

    iLhsBias = (eLhs==LSM_END_DELETE) ? -1 : ((eLhs==LSM_START_DELETE) ? +1 : 0);
    iRhsBias = (eRhs==LSM_END_DELETE) ? -1 : ((eRhs==LSM_START_DELETE) ? +1 : 0);

    return (iLhsBias - iRhsBias);
  }
}
2105
/*
** Recompute entry iOut of the pCsr->aTree[] array.
**
** Entries in the upper half of aTree[] (iOut>=nTree/2) compare a fixed
** pair of adjacent sub-cursors; entries in the lower half compare the
** winners stored in their two child entries (iOut*2 and iOut*2+1). The
** index of the winning sub-cursor is written to aTree[iOut].
**
** A sub-cursor with no current key always loses. Otherwise the smaller key
** wins when bReverse is zero and the larger key wins when it is non-zero.
*/
static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
  int nHalf;                      /* Half the size of the aTree[] array */
  int iLeft;                      /* Index of first sub-cursor to compare */
  int iRight;                     /* Index of second sub-cursor to compare */
  int iWinner;                    /* Value to store in aTree[iOut] */
  void *pLeft; int nLeft; int eLeft;
  void *pRight; int nRight; int eRight;

  assert( pCsr->aTree && iOut<pCsr->nTree );
  nHalf = pCsr->nTree/2;
  if( iOut<nHalf ){
    iLeft = pCsr->aTree[iOut*2];
    iRight = pCsr->aTree[iOut*2+1];
  }else{
    iLeft = (iOut - nHalf) * 2;
    iRight = iLeft + 1;
  }

  multiCursorGetKey(pCsr, iLeft, &eLeft, &pLeft, &nLeft);
  multiCursorGetKey(pCsr, iRight, &eRight, &pRight, &nRight);

  if( pLeft==0 ){
    iWinner = iRight;
  }else if( pRight==0 ){
    iWinner = iLeft;
  }else{
    int res = sortedDbKeyCompare(pCsr,
        eLeft, pLeft, nLeft, eRight, pRight, nRight
    );
    if( bReverse ) res = -res;

    if( res<0 ){
      iWinner = iLeft;
    }else if( res>0 ){
      iWinner = iRight;
    }else{
      /* The two keys are identical. Normally the key from the newer run
      ** (the left one) clobbers the old. However, if the newer key is a
      ** separator key, or a range-delete-boundary only, while the older
      ** one is an insert or point-delete, the older entry is kept. */
      int bLeftBoundary = ((eLeft & (LSM_INSERT|LSM_POINT_DELETE))==0);
      int bRightBoundary = ((eRight & (LSM_INSERT|LSM_POINT_DELETE))==0);
      iWinner = (bLeftBoundary && !bRightBoundary) ? iRight : iLeft;
    }
  }

  pCsr->aTree[iOut] = iWinner;
}
2156
/*
** This function advances segment pointer iPtr belonging to multi-cursor
** pCsr forward (bReverse==0) or backward (bReverse!=0).
**
** If the segment pointer points to a segment that is part of a composite
** level, and the advance leaves the pointer at EOF (pPg==0), then the
** following special cases are handled:
**
**   * If iPtr is the lhs of a composite level and the cursor is being
**     advanced forwards, move all pointers that correspond to rhs segments
**     of the same level to the first key in their respective data
**     (see sortedRhsFirst()).
**
**   * If iPtr is an rhs segment of a composite level, the cursor is being
**     advanced backwards, and every rhs pointer of the level is now at
**     EOF, position the lhs pointer of the level using
**     segmentPtrEnd(..., 1).
**
** In either case the entire aTree[] array is recomputed afterwards, as
** pointers other than iPtr may have moved.
**
** Returns LSM_OK if successful, or the error code returned by one of the
** helper functions otherwise.
*/
static int segmentCursorAdvance(
  MultiCursor *pCsr,
  int iPtr,
  int bReverse
){
  int rc;
  SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
  Level *pLvl = pPtr->pLevel;
  int bComposite;                 /* True if pPtr is part of composite level */

  /* Advance the segment-pointer object. */
  rc = segmentPtrAdvance(pCsr, pPtr, bReverse);
  if( rc!=LSM_OK ) return rc;

  /* The level is treated as composite only if it has rhs segments and the
  ** cursor holds more pointers than there are rhs segments in the level. */
  bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);
  if( bComposite && pPtr->pPg==0 ){
    int bFix = 0;                 /* True if aTree[] must be rebuilt */

    /* True for (forwards && lhs) or (backwards && rhs). */
    if( (bReverse==0)==(pPtr->pSeg==&pLvl->lhs) ){
      int i;
      if( bReverse ){
        /* pPtr is an rhs pointer. The lhs pointer of the level precedes
        ** the first rhs pointer in aPtr[], so step back over the rhs
        ** entries before this one, plus one more. */
        SegmentPtr *pLhs = &pCsr->aPtr[iPtr - 1 - (pPtr->pSeg - pLvl->aRhs)];
        for(i=0; i<pLvl->nRight; i++){
          if( pLhs[i+1].pPg ) break;
        }
        /* Only once all rhs pointers are at EOF is the lhs activated. */
        if( i==pLvl->nRight ){
          bFix = 1;
          rc = segmentPtrEnd(pCsr, pLhs, 1);
        }
      }else{
        /* pPtr is the lhs pointer and has hit EOF: activate each rhs. */
        bFix = 1;
        for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
          rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
        }
      }
    }

    /* Rebuild every entry of aTree[], leaves first and the root
    ** (aTree[1]) last. Note this runs even if rc is not LSM_OK. */
    if( bFix ){
      int i;
      for(i=pCsr->nTree-1; i>0; i--){
        multiCursorDoCompare(pCsr, i, bReverse);
      }
    }
  }

  /* Disabled earlier version of the block above. It handled only the
  ** forwards/lhs case. */
#if 0
  if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
   && bReverse==0                                /* csr advanced forwards */
   && pPtr->pPg==0                               /* segment at EOF */
  ){
    int i;
    for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
      rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
    }
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, 0);
    }
  }
#endif

  return rc;
}
2230
/*
** Release all component cursors and heap allocations owned by multi-cursor
** pCsr: both in-memory tree cursors, every segment pointer, the b-tree
** cursor, and the aPtr[], aTree[] and pSystemVal buffers. The
** corresponding fields are zeroed afterwards.
**
** The MultiCursor structure itself and its cached key/value blobs are
** not freed by this function.
*/
static void mcursorFreeComponents(MultiCursor *pCsr){
  lsm_env *pEnv = pCsr->pDb->pEnv;
  int iTree;
  int iPtr;

  /* Close the tree cursors, if any. */
  for(iTree=0; iTree<2; iTree++){
    lsmTreeCursorDestroy(pCsr->apTreeCsr[iTree]);
  }

  /* Reset the segment pointers */
  for(iPtr=0; iPtr<pCsr->nPtr; iPtr++){
    segmentPtrReset(&pCsr->aPtr[iPtr], 0);
  }

  /* And the b-tree cursor, if any */
  btreeCursorFree(pCsr->pBtCsr);

  /* Free allocations */
  lsmFree(pEnv, pCsr->aPtr);
  lsmFree(pEnv, pCsr->aTree);
  lsmFree(pEnv, pCsr->pSystemVal);

  /* Zero the fields so that no dangling pointers are left behind. */
  pCsr->apTreeCsr[0] = 0;
  pCsr->apTreeCsr[1] = 0;
  pCsr->pBtCsr = 0;
  pCsr->aPtr = 0;
  pCsr->nPtr = 0;
  pCsr->aTree = 0;
  pCsr->nTree = 0;
  pCsr->pSystemVal = 0;
}
2262
/*
** Close and free every cursor on the pDb->pCsrCache list, then set the
** list head to NULL.
*/
void lsmMCursorFreeCache(lsm_db *pDb){
  MultiCursor *pCsr = pDb->pCsrCache;
  while( pCsr ){
    /* Grab the link first; lsmMCursorClose() frees the structure. */
    MultiCursor *pFollowing = pCsr->pNext;
    lsmMCursorClose(pCsr, 0);
    pCsr = pFollowing;
  }
  pDb->pCsrCache = 0;
}
2272
2273 /*
2274 ** Close the cursor passed as the first argument.
2275 **
2276 ** If the bCache parameter is true, then shift the cursor to the pCsrCache
2277 ** list for possible reuse instead of actually deleting it.
2278 */
lsmMCursorClose(MultiCursor * pCsr,int bCache)2279 void lsmMCursorClose(MultiCursor *pCsr, int bCache){
2280 if( pCsr ){
2281 lsm_db *pDb = pCsr->pDb;
2282 MultiCursor **pp; /* Iterator variable */
2283
2284 /* The cursor may or may not be currently part of the linked list
2285 ** starting at lsm_db.pCsr. If it is, extract it. */
2286 for(pp=&pDb->pCsr; *pp; pp=&((*pp)->pNext)){
2287 if( *pp==pCsr ){
2288 *pp = pCsr->pNext;
2289 break;
2290 }
2291 }
2292
2293 if( bCache ){
2294 int i; /* Used to iterate through segment-pointers */
2295
2296 /* Release any page references held by this cursor. */
2297 assert( !pCsr->pBtCsr );
2298 for(i=0; i<pCsr->nPtr; i++){
2299 SegmentPtr *pPtr = &pCsr->aPtr[i];
2300 lsmFsPageRelease(pPtr->pPg);
2301 pPtr->pPg = 0;
2302 }
2303
2304 /* Reset the tree cursors */
2305 lsmTreeCursorReset(pCsr->apTreeCsr[0]);
2306 lsmTreeCursorReset(pCsr->apTreeCsr[1]);
2307
2308 /* Add the cursor to the pCsrCache list */
2309 pCsr->pNext = pDb->pCsrCache;
2310 pDb->pCsrCache = pCsr;
2311 }else{
2312 /* Free the allocation used to cache the current key, if any. */
2313 sortedBlobFree(&pCsr->key);
2314 sortedBlobFree(&pCsr->val);
2315
2316 /* Free the component cursors */
2317 mcursorFreeComponents(pCsr);
2318
2319 /* Free the cursor structure itself */
2320 lsmFree(pDb->pEnv, pCsr);
2321 }
2322 }
2323 }
2324
/*
** Values that may be passed as the eTree parameter of multiCursorAddTree():
**
**   TREE_NONE:  Do not open any in-memory tree cursor.
**   TREE_OLD:   Open a cursor on the 'old' in-memory tree only (and only
**               if such a cursor is required).
**   TREE_BOTH:  As TREE_OLD, and also open a cursor on the 'current'
**               in-memory tree.
*/
#define TREE_NONE 0
#define TREE_OLD 1
#define TREE_BOTH 2
2328
2329 /*
2330 ** Parameter eTree is one of TREE_OLD or TREE_BOTH.
2331 */
multiCursorAddTree(MultiCursor * pCsr,Snapshot * pSnap,int eTree)2332 static int multiCursorAddTree(MultiCursor *pCsr, Snapshot *pSnap, int eTree){
2333 int rc = LSM_OK;
2334 lsm_db *db = pCsr->pDb;
2335
2336 /* Add a tree cursor on the 'old' tree, if it exists. */
2337 if( eTree!=TREE_NONE
2338 && lsmTreeHasOld(db)
2339 && db->treehdr.iOldLog!=pSnap->iLogOff
2340 ){
2341 rc = lsmTreeCursorNew(db, 1, &pCsr->apTreeCsr[1]);
2342 }
2343
2344 /* Add a tree cursor on the 'current' tree, if required. */
2345 if( rc==LSM_OK && eTree==TREE_BOTH ){
2346 rc = lsmTreeCursorNew(db, 0, &pCsr->apTreeCsr[0]);
2347 }
2348
2349 return rc;
2350 }
2351
/*
** Populate the (currently empty) aPtr[] array of multi-cursor pCsr with
** one segment pointer for each right-hand-side segment of level pLvl. No
** pointer is added for the lhs of the level.
**
** Returns LSM_OK if successful, or LSM_NOMEM_BKPT if the array cannot be
** allocated.
*/
static int multiCursorAddRhs(MultiCursor *pCsr, Level *pLvl){
  const int nRhs = pLvl->nRight;
  int iRhs;

  assert( pLvl->nRight>0 );
  assert( pCsr->aPtr==0 );

  pCsr->aPtr = lsmMallocZero(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nRhs);
  if( pCsr->aPtr==0 ){
    return LSM_NOMEM_BKPT;
  }
  pCsr->nPtr = nRhs;

  for(iRhs=0; iRhs<nRhs; iRhs++){
    SegmentPtr *pPtr = &pCsr->aPtr[iRhs];
    pPtr->pLevel = pLvl;
    pPtr->pSeg = &pLvl->aRhs[iRhs];
  }

  return LSM_OK;
}
2369
/*
** Append segment pointers for level pLvl to the aPtr[] array of pCsr: one
** for the lhs segment followed by one for each rhs segment, in order. The
** caller must already have allocated sufficient space in aPtr[].
**
** If the level has rhs segments but no split-key loaded yet, the split-key
** is loaded by calling sortedSplitkey(). pCsr->nPtr is only updated after
** that call returns.
**
** This function is a no-op if *pRc is not LSM_OK when it is called. If an
** error occurs in sortedSplitkey(), it is reported via *pRc.
*/
static void multiCursorAddOne(MultiCursor *pCsr, Level *pLvl, int *pRc){
  int iNext;                      /* Index of next aPtr[] slot to populate */
  int iRhs;

  if( *pRc!=LSM_OK ) return;

  /* The lhs pointer always comes first... */
  iNext = pCsr->nPtr;
  pCsr->aPtr[iNext].pLevel = pLvl;
  pCsr->aPtr[iNext].pSeg = &pLvl->lhs;
  iNext++;

  /* ...followed by the rhs pointers, if any. */
  for(iRhs=0; iRhs<pLvl->nRight; iRhs++, iNext++){
    pCsr->aPtr[iNext].pLevel = pLvl;
    pCsr->aPtr[iNext].pSeg = &pLvl->aRhs[iRhs];
  }

  if( pLvl->nRight && pLvl->pSplitKey==0 ){
    sortedSplitkey(pCsr->pDb, pLvl, pRc);
  }
  pCsr->nPtr = iNext;
}
2389
/*
** Allocate the aPtr[] array of multi-cursor pCsr and populate it with
** segment pointers for every segment (lhs and rhs) of every level in
** snapshot pSnap, excluding levels with the LEVEL_INCOMPLETE flag set.
**
** If the LEVEL_INCOMPLETE flag is set on a level, then this function is
** being called (indirectly) from within a sortedNewToplevel() call to
** construct that level. Such a level is ignored - the cursor is going to
** be used to retrieve a freelist entry from the LSM, and the partially
** complete level may confuse it.
**
** Returns LSM_OK if successful, or an LSM error code otherwise.
*/
static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){
  int rc = LSM_OK;
  int nTotal = 0;                 /* Number of segment pointers required */
  Level *p;

  /* Pass 1: count the segment pointers needed. */
  for(p=pSnap->pLevel; p; p=p->pNext){
    if( (p->flags & LEVEL_INCOMPLETE)==0 ){
      nTotal += (1 + p->nRight);
    }
  }

  assert( pCsr->aPtr==0 );
  pCsr->aPtr = lsmMallocZeroRc(
      pCsr->pDb->pEnv, sizeof(SegmentPtr) * nTotal, &rc
  );

  /* Pass 2: populate the array. multiCursorAddOne() is a no-op if rc has
  ** already been set to an error code. */
  for(p=pSnap->pLevel; p; p=p->pNext){
    if( p->flags & LEVEL_INCOMPLETE ) continue;
    multiCursorAddOne(pCsr, p, &rc);
  }

  return rc;
}
2416
/*
** Initialize multi-cursor pCsr to read from snapshot pSnap: add segment
** pointers for all of its levels, then cursors on both in-memory trees
** (TREE_BOTH). The CURSOR_IGNORE_SYSTEM and CURSOR_IGNORE_DELETE flags are
** set on the cursor whether or not an error occurs.
**
** Returns LSM_OK if successful, or an LSM error code otherwise.
*/
static int multiCursorInit(MultiCursor *pCsr, Snapshot *pSnap){
  int rc = multiCursorAddAll(pCsr, pSnap);
  if( rc==LSM_OK ){
    rc = multiCursorAddTree(pCsr, pSnap, TREE_BOTH);
  }
  pCsr->flags |= CURSOR_IGNORE_SYSTEM;
  pCsr->flags |= CURSOR_IGNORE_DELETE;
  return rc;
}
2426
multiCursorNew(lsm_db * db,int * pRc)2427 static MultiCursor *multiCursorNew(lsm_db *db, int *pRc){
2428 MultiCursor *pCsr;
2429 pCsr = (MultiCursor *)lsmMallocZeroRc(db->pEnv, sizeof(MultiCursor), pRc);
2430 if( pCsr ){
2431 pCsr->pNext = db->pCsr;
2432 db->pCsr = pCsr;
2433 pCsr->pDb = db;
2434 }
2435 return pCsr;
2436 }
2437
2438
/*
** For every open cursor on the pDb->pCsr list, reload the current key of
** its b-tree cursor (if it has one) and the current cell of each of its
** segment pointers.
**
** NOTE(review): judging by the name, this is presumably invoked after the
** database file mapping has moved - confirm against the callers. Return
** values of the reload helpers are ignored.
*/
void lsmSortedRemap(lsm_db *pDb){
  MultiCursor *pCsr = pDb->pCsr;
  while( pCsr ){
    int i;
    if( pCsr->pBtCsr ){
      btreeCursorLoadKey(pCsr->pBtCsr);
    }
    for(i=0; i<pCsr->nPtr; i++){
      SegmentPtr *pPtr = &pCsr->aPtr[i];
      segmentPtrLoadCell(pPtr, pPtr->iCell);
    }
    pCsr = pCsr->pNext;
  }
}
2451
/*
** Set the CURSOR_READ_SEPARATORS flag on cursor pCsr, provided that it
** has at least one segment pointer. Otherwise this is a no-op.
*/
static void multiCursorReadSeparators(MultiCursor *pCsr){
  if( pCsr->nPtr<=0 ) return;
  pCsr->flags |= CURSOR_READ_SEPARATORS;
}
2457
2458 /*
2459 ** Have this cursor skip over SORTED_DELETE entries.
2460 */
multiCursorIgnoreDelete(MultiCursor * pCsr)2461 static void multiCursorIgnoreDelete(MultiCursor *pCsr){
2462 if( pCsr ) pCsr->flags |= CURSOR_IGNORE_DELETE;
2463 }
2464
2465 /*
2466 ** If the free-block list is not empty, then have this cursor visit a key
2467 ** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value
2468 ** blob containing the serialized free-block list.
2469 */
multiCursorVisitFreelist(MultiCursor * pCsr)2470 static int multiCursorVisitFreelist(MultiCursor *pCsr){
2471 int rc = LSM_OK;
2472 pCsr->flags |= CURSOR_FLUSH_FREELIST;
2473 pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
2474 return rc;
2475 }
2476
2477 /*
2478 ** Allocate and return a new database cursor.
2479 **
2480 ** This method should only be called to allocate user cursors. As it may
2481 ** recycle a cursor from lsm_db.pCsrCache.
2482 */
lsmMCursorNew(lsm_db * pDb,MultiCursor ** ppCsr)2483 int lsmMCursorNew(
2484 lsm_db *pDb, /* Database handle */
2485 MultiCursor **ppCsr /* OUT: Allocated cursor */
2486 ){
2487 MultiCursor *pCsr = 0;
2488 int rc = LSM_OK;
2489
2490 if( pDb->pCsrCache ){
2491 int bOld; /* True if there is an old in-memory tree */
2492
2493 /* Remove a cursor from the pCsrCache list and add it to the open list. */
2494 pCsr = pDb->pCsrCache;
2495 pDb->pCsrCache = pCsr->pNext;
2496 pCsr->pNext = pDb->pCsr;
2497 pDb->pCsr = pCsr;
2498
2499 /* The cursor can almost be used as is, except that the old in-memory
2500 ** tree cursor may be present and not required, or required and not
2501 ** present. Fix this if required. */
2502 bOld = (lsmTreeHasOld(pDb) && pDb->treehdr.iOldLog!=pDb->pClient->iLogOff);
2503 if( !bOld && pCsr->apTreeCsr[1] ){
2504 lsmTreeCursorDestroy(pCsr->apTreeCsr[1]);
2505 pCsr->apTreeCsr[1] = 0;
2506 }else if( bOld && !pCsr->apTreeCsr[1] ){
2507 rc = lsmTreeCursorNew(pDb, 1, &pCsr->apTreeCsr[1]);
2508 }
2509
2510 pCsr->flags = (CURSOR_IGNORE_SYSTEM | CURSOR_IGNORE_DELETE);
2511
2512 }else{
2513 pCsr = multiCursorNew(pDb, &rc);
2514 if( rc==LSM_OK ) rc = multiCursorInit(pCsr, pDb->pClient);
2515 }
2516
2517 if( rc!=LSM_OK ){
2518 lsmMCursorClose(pCsr, 0);
2519 pCsr = 0;
2520 }
2521 assert( (rc==LSM_OK)==(pCsr!=0) );
2522 *ppCsr = pCsr;
2523 return rc;
2524 }
2525
/*
** Retrieve the value associated with the current entry of sub-cursor iVal
** of multi-cursor pCsr. iVal is one of CURSOR_DATA_TREE0,
** CURSOR_DATA_TREE1, CURSOR_DATA_SYSTEM, or (CURSOR_DATA_SEGMENT+i) to
** identify segment pointer aPtr[i].
**
** *ppVal and *pnVal are set to the value blob and its size in bytes. If
** the sub-cursor does not currently point at a valid entry, they are set
** to NULL and zero. This function always returns LSM_OK.
*/
static int multiCursorGetVal(
  MultiCursor *pCsr,
  int iVal,
  void **ppVal,
  int *pnVal
){
  *ppVal = 0;
  *pnVal = 0;

  if( iVal==CURSOR_DATA_TREE0 || iVal==CURSOR_DATA_TREE1 ){
    /* One of the in-memory tree cursors */
    TreeCursor *pTreeCsr = pCsr->apTreeCsr[iVal-CURSOR_DATA_TREE0];
    if( lsmTreeCursorValid(pTreeCsr) ){
      lsmTreeCursorValue(pTreeCsr, ppVal, pnVal);
    }else{
      *ppVal = 0;
      *pnVal = 0;
    }
  }else if( iVal==CURSOR_DATA_SYSTEM ){
    /* Synthesized free-list entry. Only even values of iFree have a value
    ** attached; it is the 64-bit iId of the free-list entry, serialized
    ** into the part of pSystemVal following the 4-byte key. Entries are
    ** visited from the end of the free-list array backwards. */
    Snapshot *pWorker = pCsr->pDb->pWorker;
    if( pWorker
     && (pCsr->iFree % 2)==0
     && pCsr->iFree < (pWorker->freelist.nEntry*2)
    ){
      int iEntry = pWorker->freelist.nEntry - 1 - (pCsr->iFree / 2);
      u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
      lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
      *ppVal = aVal;
      *pnVal = 8;
    }
  }else{
    /* A segment pointer */
    int iPtr = iVal-CURSOR_DATA_SEGMENT;
    if( iPtr<pCsr->nPtr ){
      SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
      if( pPtr->pPg ){
        *ppVal = pPtr->pVal;
        *pnVal = pPtr->nVal;
      }
    }
  }

  return LSM_OK;
}
2580
/* Forward declaration. multiCursorAdvance() is called by functions that
** appear before its definition (e.g. lsmSortedWalkFreelist() and
** multiCursorSetupTree()). */
static int multiCursorAdvance(MultiCursor *pCsr, int bReverse);
2582
2583 /*
2584 ** This function is called by worker connections to walk the part of the
2585 ** free-list stored within the LSM data structure.
2586 */
lsmSortedWalkFreelist(lsm_db * pDb,int bReverse,int (* x)(void *,int,i64),void * pCtx)2587 int lsmSortedWalkFreelist(
2588 lsm_db *pDb, /* Database handle */
2589 int bReverse, /* True to iterate from largest to smallest */
2590 int (*x)(void *, int, i64), /* Callback function */
2591 void *pCtx /* First argument to pass to callback */
2592 ){
2593 MultiCursor *pCsr; /* Cursor used to read db */
2594 int rc = LSM_OK; /* Return Code */
2595 Snapshot *pSnap = 0;
2596
2597 assert( pDb->pWorker );
2598 if( pDb->bIncrMerge ){
2599 rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap);
2600 if( rc!=LSM_OK ) return rc;
2601 }else{
2602 pSnap = pDb->pWorker;
2603 }
2604
2605 pCsr = multiCursorNew(pDb, &rc);
2606 if( pCsr ){
2607 rc = multiCursorAddAll(pCsr, pSnap);
2608 pCsr->flags |= CURSOR_IGNORE_DELETE;
2609 }
2610
2611 if( rc==LSM_OK ){
2612 if( bReverse==0 ){
2613 rc = lsmMCursorLast(pCsr);
2614 }else{
2615 rc = lsmMCursorSeek(pCsr, 1, "", 0, LSM_SEEK_GE);
2616 }
2617
2618 while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){
2619 void *pKey; int nKey;
2620 void *pVal = 0; int nVal = 0;
2621
2622 rc = lsmMCursorKey(pCsr, &pKey, &nKey);
2623 if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
2624 if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT;
2625
2626 if( rc==LSM_OK ){
2627 int iBlk;
2628 i64 iSnap;
2629 iBlk = (int)(~(lsmGetU32((u8 *)pKey)));
2630 iSnap = (i64)lsmGetU64((u8 *)pVal);
2631 if( x(pCtx, iBlk, iSnap) ) break;
2632 rc = multiCursorAdvance(pCsr, !bReverse);
2633 }
2634 }
2635 }
2636
2637 lsmMCursorClose(pCsr, 0);
2638 if( pSnap!=pDb->pWorker ){
2639 lsmFreeSnapshot(pDb->pEnv, pSnap);
2640 }
2641
2642 return rc;
2643 }
2644
/*
** Load the serialized free-list stored in the LSM, if any.
**
** A temporary cursor is opened on the worker snapshot and moved to the
** last entry in the database. If that entry is a write to a system key
** with the 8-byte key "FREELIST", its value is copied into a buffer
** obtained from lsmMallocRc() and *ppVal/*pnVal are set to point to the
** buffer and its size in bytes. The caller is responsible for eventually
** freeing the buffer. If there is no such entry, *ppVal and *pnVal are
** left unchanged (the caller must initialize them to NULL and zero).
**
** The temporary cursor is always closed before returning, whether or not
** an error occurred.
**
** Returns LSM_OK if successful (including the case where no free-list
** entry is found), or an LSM error code otherwise.
*/
int lsmSortedLoadFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  void **ppVal,                   /* OUT: Blob containing LSM free-list */
  int *pnVal                      /* OUT: Size of *ppVal blob in bytes */
){
  MultiCursor *pCsr;              /* Cursor used to retrieve free-list */
  int rc = LSM_OK;                /* Return Code */

  assert( pDb->pWorker );
  assert( *ppVal==0 && *pnVal==0 );

  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    rc = multiCursorAddAll(pCsr, pDb->pWorker);
    pCsr->flags |= CURSOR_IGNORE_DELETE;
  }

  if( rc==LSM_OK ){
    rc = lsmMCursorLast(pCsr);
    if( rc==LSM_OK
     && rtIsWrite(pCsr->eType) && rtIsSystem(pCsr->eType)
     && pCsr->key.nData==8
     && 0==memcmp(pCsr->key.pData, "FREELIST", 8)
    ){
      void *pVal; int nVal;       /* Value read from database */
      rc = lsmMCursorValue(pCsr, &pVal, &nVal);
      if( rc==LSM_OK ){
        *ppVal = lsmMallocRc(pDb->pEnv, nVal, &rc);
        if( *ppVal ){
          memcpy(*ppVal, pVal, nVal);
          *pnVal = nVal;
        }
      }
    }
  }

  /* Close the cursor on every path. Previously this was only done when
  ** rc was LSM_OK on entry to the block above, so a failure in
  ** multiCursorAddAll() leaked the cursor and left it linked into the
  ** pDb->pCsr list. lsmMCursorClose() is a no-op if pCsr is NULL. */
  lsmMCursorClose(pCsr, 0);

  return rc;
}
2685
/*
** Allocate the aTree[] array for multi-cursor pCsr, if it has not already
** been allocated.
**
** pCsr->nTree is set to the smallest power of two that is at least 2 and
** at least as large as the number of sub-cursors being merged
** (CURSOR_DATA_SEGMENT fixed entries, plus one per segment pointer, plus
** one for the b-tree cursor if present). The zeroed allocation holds
** twice that many integers.
**
** Returns LSM_OK if successful or if aTree[] already exists, or the error
** set by lsmMallocZeroRc() otherwise.
*/
static int multiCursorAllocTree(MultiCursor *pCsr){
  int rc = LSM_OK;
  int nMin;                       /* Total number of cursors being merged */
  int nTree;                      /* New value for pCsr->nTree */
  int nByte;                      /* Bytes of space to allocate */

  if( pCsr->aTree ) return rc;

  nMin = CURSOR_DATA_SEGMENT + pCsr->nPtr + (pCsr->pBtCsr!=0);
  for(nTree=2; nTree<nMin; nTree=nTree*2);
  pCsr->nTree = nTree;

  nByte = sizeof(int)*pCsr->nTree*2;
  pCsr->aTree = (int *)lsmMallocZeroRc(pCsr->pDb->pEnv, nByte, &rc);
  return rc;
}
2703
/*
** Copy the key of the sub-cursor currently at the root of the cursor tree
** (aTree[1]) into pCsr->key, and store its type flags in pCsr->eType.
**
** This function is a no-op if *pRc is not LSM_OK when it is called. If
** sortedBlobSet() fails, its error code is stored in *pRc.
*/
static void multiCursorCacheKey(MultiCursor *pCsr, int *pRc){
  void *pCur;                     /* Key of the current entry */
  int nCur;                       /* Size of pCur in bytes */

  if( *pRc!=LSM_OK ) return;

  multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pCur, &nCur);
  *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pCur, nCur);
}
2712
#ifdef LSM_DEBUG_EXPENSIVE
/*
** Debugging aid, compiled in only if LSM_DEBUG_EXPENSIVE is defined.
**
** Rebuild the cursor tree of pCsr from scratch in a temporary array and
** assert() that the result is identical to the existing aTree[] array.
** The existing array is restored before returning. If the temporary array
** cannot be allocated, no check is made.
*/
static void assertCursorTree(MultiCursor *pCsr){
  int *aOrig = pCsr->aTree;       /* Saved copy of pCsr->aTree */
  int nOrig = pCsr->nTree;        /* Saved copy of pCsr->nTree */
  int bRev = (pCsr->flags & CURSOR_PREV_OK) ? 1 : 0;

  /* Clearing aTree forces multiCursorAllocTree() to allocate a new one. */
  pCsr->aTree = 0;
  pCsr->nTree = 0;
  if( multiCursorAllocTree(pCsr)==LSM_OK ){
    int iNode = pCsr->nTree;
    while( --iNode>0 ){
      multiCursorDoCompare(pCsr, iNode, bRev);
    }

    assert( nOrig==pCsr->nTree );
    assert( 0==memcmp(aOrig, pCsr->aTree, sizeof(int)*nOrig) );

    lsmFree(pCsr->pDb->pEnv, pCsr->aTree);
  }

  pCsr->nTree = nOrig;
  pCsr->aTree = aOrig;
}
#else
# define assertCursorTree(x)
#endif
2742
/*
** Return true (1) if the entry that multi-cursor pCsr currently points to
** (as cached in pCsr->eType and pCsr->key) is one that the cursor should
** visit, or false (0) if it should be skipped.
**
** One of the CURSOR_NEXT_OK or CURSOR_PREV_OK flags must be set; it
** determines the iteration direction assumed by the range-delete check.
**
** If parameter bDeleteOk is non-zero, then the test that rejects entries
** without the LSM_INSERT flag on CURSOR_IGNORE_DELETE cursors is omitted.
*/
static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
  int eType = pCsr->eType;
  int iKey;
  int i;
  int rdmask;

  assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
  assertCursorTree(pCsr);

  /* The range-delete flag that, when set on the current entry of a newer
  ** sub-cursor, indicates that the current key lies inside a deleted
  ** range: END_DELETE when iterating forwards, START_DELETE when
  ** iterating backwards. */
  rdmask = (pCsr->flags & CURSOR_NEXT_OK) ? LSM_END_DELETE : LSM_START_DELETE;

  /* If the cursor does not currently point to an actual database key (i.e.
  ** it points to a delete key, or the start or end of a range-delete), and
  ** the CURSOR_IGNORE_DELETE flag is set, skip past this entry. */
  if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
    if( (eType & LSM_INSERT)==0 ) return 0;
  }

  /* If the cursor points to a system key (free-list entry), and the
  ** CURSOR_IGNORE_SYSTEM flag is set, skip this entry. */
  if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
    return 0;
  }

#ifndef NDEBUG
  /* This block fires assert() statements to check one of the assumptions
  ** in the comment below - that if the lhs sub-cursor of a level undergoing
  ** a merge is valid, then all the rhs sub-cursors must be at EOF.
  **
  ** Also assert that all rhs sub-cursors are either at EOF or point to
  ** a key that is not less than the level split-key. */
  for(i=0; i<pCsr->nPtr; i++){
    SegmentPtr *pPtr = &pCsr->aPtr[i];
    Level *pLvl = pPtr->pLevel;
    if( pLvl->nRight && pPtr->pPg ){
      if( pPtr->pSeg==&pLvl->lhs ){
        int j;
        for(j=0; j<pLvl->nRight; j++) assert( pPtr[j+1].pPg==0 );
      }else{
        int res = sortedKeyCompare(pCsr->pDb->xCmp,
            rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
            pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
        );
        assert( res>=0 );
      }
    }
  }
#endif

  /* Now check if this key has already been deleted by a range-delete. If
  ** so, skip past it.
  **
  ** Assume, for the moment, that the tree contains no levels currently
  ** undergoing incremental merge, and that this cursor is iterating forwards
  ** through the database keys. The cursor currently points to a key in
  ** level L. This key has already been deleted if any of the sub-cursors
  ** that point to levels newer than L (or to the in-memory tree) point to
  ** a key greater than the current key with the LSM_END_DELETE flag set.
  **
  ** Or, if the cursor is iterating backwards through data keys, if any
  ** such sub-cursor points to a key smaller than the current key with the
  ** LSM_START_DELETE flag set.
  **
  ** Why it works with levels undergoing a merge too:
  **
  ** When a cursor iterates forwards, the sub-cursors for the rhs of a
  ** level are only activated once the lhs reaches EOF. So when iterating
  ** forwards, the keys visited are the same as if the level was completely
  ** merged.
  **
  ** If the cursor is iterating backwards, then the lhs sub-cursor is not
  ** initialized until the last of the rhs sub-cursors has reached EOF.
  ** Additionally, if the START_DELETE flag is set on the last entry (in
  ** reverse order - so the entry with the smallest key) of a rhs sub-cursor,
  ** then a pseudo-key equal to the levels split-key with the END_DELETE
  ** flag set is visited by the sub-cursor.
  */
  iKey = pCsr->aTree[1];
  /* Sub-cursors with an index smaller than iKey are the newer ones. */
  for(i=0; i<iKey; i++){
    int csrflags;
    multiCursorGetKey(pCsr, i, &csrflags, 0, 0);
    if( (rdmask & csrflags) ){
      const int SD_ED = (LSM_START_DELETE|LSM_END_DELETE);
      /* The newer entry carries the relevant range-delete flag. The
      ** current key is nevertheless not treated as deleted if the newer
      ** entry has the same key as the current one, and either has both
      ** range-delete flags set or the cursor does not ignore deletes. */
      if( (csrflags & SD_ED)==SD_ED
       || (pCsr->flags & CURSOR_IGNORE_DELETE)==0
      ){
        void *pKey; int nKey;
        multiCursorGetKey(pCsr, i, 0, &pKey, &nKey);
        if( 0==sortedKeyCompare(pCsr->pDb->xCmp,
              rtTopic(eType), pCsr->key.pData, pCsr->key.nData,
              rtTopic(csrflags), pKey, nKey
        )){
          continue;
        }
      }
      return 0;
    }
  }

  /* The current cursor position is one this cursor should visit. Return 1. */
  return 1;
}
2845
/*
** Build the cursor tree (aTree[]) of multi-cursor pCsr from the current
** positions of its sub-cursors, allocating the array first if required.
** Parameter bRev is non-zero if the cursor is to iterate backwards.
**
** The key at the root of the tree is then cached in the cursor. If that
** entry is not one the cursor should visit (see mcursorLocationOk()), the
** cursor is advanced in the direction indicated by bRev.
**
** Returns LSM_OK if successful, or an LSM error code otherwise.
*/
static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
  int rc = multiCursorAllocTree(pCsr);

  if( rc==LSM_OK ){
    /* Populate the tree from the leaves up to the root (aTree[1]). */
    int iNode = pCsr->nTree;
    while( --iNode>0 ){
      multiCursorDoCompare(pCsr, iNode, bRev);
    }
  }

  assertCursorTree(pCsr);
  multiCursorCacheKey(pCsr, &rc);
  if( rc!=LSM_OK ) return rc;

  if( mcursorLocationOk(pCsr, 0)==0 ){
    rc = multiCursorAdvance(pCsr, bRev);
  }
  return rc;
}
2865
2866
/*
** Move multi-cursor pCsr to one end of the database: to the first entry
** if bLast is zero, or to the last entry if bLast is non-zero.
**
** The CURSOR_NEXT_OK (bLast==0) or CURSOR_PREV_OK (bLast!=0) flag is set,
** iFree is reset to zero, all sub-cursors are positioned, and then the
** cursor tree is built by multiCursorSetupTree().
**
** Returns LSM_OK if successful, or an LSM error code otherwise.
*/
static int multiCursorEnd(MultiCursor *pCsr, int bLast){
  int rc = LSM_OK;
  int i;

  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);
  pCsr->iFree = 0;

  /* Position the two in-memory tree cursors */
  for(i=0; rc==LSM_OK && i<2; i++){
    if( pCsr->apTreeCsr[i] ){
      rc = lsmTreeCursorEnd(pCsr->apTreeCsr[i], bLast);
    }
  }

  /* Position the segment pointers. Each iteration of this loop handles
  ** all the pointers belonging to a single level; pPtr is the first of
  ** them. */
  for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
    SegmentPtr *pPtr = &pCsr->aPtr[i];
    Level *pLvl = pPtr->pLevel;
    int iRhs;
    int bHit = 0;

    if( bLast ){
      /* Move each rhs pointer to its last entry. The lhs pointer is only
      ** positioned if none of the rhs pointers ends up on a page;
      ** otherwise it is reset.
      ** NOTE(review): this branch indexes pPtr[iRhs+1], i.e. it assumes
      ** that pPtr is the lhs pointer of the level - confirm that it is
      ** never reached for cursors built by multiCursorAddRhs(). */
      for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
        rc = segmentPtrEnd(pCsr, &pPtr[iRhs+1], 1);
        if( pPtr[iRhs+1].pPg ) bHit = 1;
      }
      if( bHit==0 && rc==LSM_OK ){
        rc = segmentPtrEnd(pCsr, pPtr, 1);
      }else{
        segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
      }
    }else{
      /* pPtr is either the lhs pointer of the level or, for a cursor
      ** that has rhs pointers only, the pointer for the first rhs. */
      int bLhs = (pPtr->pSeg==&pLvl->lhs);
      assert( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[0] );

      /* If there is an lhs pointer and it has a first key, the rhs
      ** pointers are reset. Otherwise each rhs pointer is positioned
      ** using sortedRhsFirst(). */
      if( bLhs ){
        rc = segmentPtrEnd(pCsr, pPtr, 0);
        if( pPtr->pKey ) bHit = 1;
      }
      for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
        if( bHit ){
          segmentPtrReset(&pPtr[iRhs+1], LSM_SEGMENTPTR_FREE_THRESHOLD);
        }else{
          rc = sortedRhsFirst(pCsr, pLvl, &pPtr[iRhs+bLhs]);
        }
      }
    }
    /* Skip over the rhs pointers of this level (the loop increment skips
    ** the first pointer). */
    i += pLvl->nRight;
  }

  /* And the b-tree cursor, if applicable */
  if( rc==LSM_OK && pCsr->pBtCsr ){
    assert( bLast==0 );
    rc = btreeCursorFirst(pCsr->pBtCsr);
  }

  if( rc==LSM_OK ){
    rc = multiCursorSetupTree(pCsr, bLast);
  }

  return rc;
}
2929
2930
/*
** Save the position of cursor pCsr and release its component cursors.
**
** If the cursor tree exists and the current entry comes from one of the
** in-memory tree cursors, the current key is first copied into pCsr->key.
** All component cursors and associated allocations are then freed by
** mcursorFreeComponents(), even if caching the key failed.
**
** Returns LSM_OK, or the error code from caching the key.
*/
int mcursorSave(MultiCursor *pCsr){
  int rc = LSM_OK;
  int bFromTree = 0;              /* True if current entry is from a tree */

  if( pCsr->aTree ){
    int iCurrent = pCsr->aTree[1];
    bFromTree = (iCurrent==CURSOR_DATA_TREE0 || iCurrent==CURSOR_DATA_TREE1);
  }
  if( bFromTree ){
    multiCursorCacheKey(pCsr, &rc);
  }

  mcursorFreeComponents(pCsr);
  return rc;
}
2942
/*
** Restore cursor pCsr: re-initialize its component cursors against the
** current client snapshot of pDb and, if the cursor has a cached key, seek
** it to that key with a seek mode of +1.
**
** Returns LSM_OK if successful, or an LSM error code otherwise.
*/
int mcursorRestore(lsm_db *pDb, MultiCursor *pCsr){
  int rc = multiCursorInit(pCsr, pDb->pClient);
  if( rc!=LSM_OK || pCsr->key.pData==0 ){
    return rc;
  }
  return lsmMCursorSeek(pCsr,
      rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, +1
  );
}
2953
/*
** Call mcursorSave() on each cursor on the pDb->pCsr list, stopping at
** the first error. Returns LSM_OK or the error code of the failed call.
*/
int lsmSaveCursors(lsm_db *pDb){
  MultiCursor *pCsr = pDb->pCsr;
  int rc = LSM_OK;

  while( pCsr && rc==LSM_OK ){
    rc = mcursorSave(pCsr);
    pCsr = pCsr->pNext;
  }
  return rc;
}
2963
/*
** Call mcursorRestore() on each cursor on the pDb->pCsr list, stopping at
** the first error. Returns LSM_OK or the error code of the failed call.
*/
int lsmRestoreCursors(lsm_db *pDb){
  MultiCursor *pCsr = pDb->pCsr;
  int rc = LSM_OK;

  while( pCsr && rc==LSM_OK ){
    rc = mcursorRestore(pDb, pCsr);
    pCsr = pCsr->pNext;
  }
  return rc;
}
2973
/*
** Move the cursor to the first entry. Returns LSM_OK or an LSM error code.
*/
int lsmMCursorFirst(MultiCursor *pCsr){
  const int bLast = 0;
  return multiCursorEnd(pCsr, bLast);
}
2977
/*
** Move the cursor to the last entry. Returns LSM_OK or an LSM error code.
*/
int lsmMCursorLast(MultiCursor *pCsr){
  const int bLast = 1;
  return multiCursorEnd(pCsr, bLast);
}
2981
lsmMCursorDb(MultiCursor * pCsr)2982 lsm_db *lsmMCursorDb(MultiCursor *pCsr){
2983 return pCsr->pDb;
2984 }
2985
/*
** Reset cursor pCsr so that it no longer points at any entry: both tree
** cursors and all segment pointers are reset, and the cached key is
** marked empty (its buffer is retained).
*/
void lsmMCursorReset(MultiCursor *pCsr){
  int iTree;
  int iPtr;

  for(iTree=0; iTree<2; iTree++){
    lsmTreeCursorReset(pCsr->apTreeCsr[iTree]);
  }
  for(iPtr=0; iPtr<pCsr->nPtr; iPtr++){
    segmentPtrReset(&pCsr->aPtr[iPtr], LSM_SEGMENTPTR_FREE_THRESHOLD);
  }
  pCsr->key.nData = 0;
}
2995
/*
** Seek in-memory tree cursor pTreeCsr, a component of multi-cursor pCsr,
** to key (pKey/nKey). If pTreeCsr is NULL this function is a no-op.
** Parameter eSeek is one of LSM_SEEK_EQ, LSM_SEEK_GE or LSM_SEEK_LE (any
** value other than EQ and GE is handled as LE).
**
** For LSM_SEEK_EQ, *pbStop is set to true if the tree cursor resolves the
** lookup, so that the caller need not search older data:
**
**   * if the entry found shows that the key has been deleted, or
**   * if the entry found is an insert of exactly the requested key. In
**     this case the key and value are copied into pCsr->key and
**     pCsr->val, and the CURSOR_SEEK_EQ flag is set.
**
** The tree cursor is always reset following an LSM_SEEK_EQ seek.
**
** For the other seek modes, the tree cursor is left pointing at the entry
** lsmTreeCursorSeek() found, stepped once forwards (GE) or backwards (LE)
** if that entry lies on the wrong side of the requested key.
**
** Returns LSM_OK if successful, or an LSM error code otherwise.
*/
static int treeCursorSeek(
  MultiCursor *pCsr,
  TreeCursor *pTreeCsr,
  void *pKey, int nKey,
  int eSeek,
  int *pbStop
){
  int rc = LSM_OK;
  if( pTreeCsr ){
    /* NOTE(review): from its use below, res appears to be the result of
    ** comparing the entry found with the requested key (negative if the
    ** entry is smaller) - confirm against lsmTreeCursorSeek(). */
    int res = 0;
    lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &res);
    switch( eSeek ){
      case LSM_SEEK_EQ: {
        int eType = lsmTreeCursorFlags(pTreeCsr);
        /* The key is covered by a delete if the entry found is a
        ** START_DELETE on one side of the key, an END_DELETE on the
        ** other, or a POINT_DELETE of the key itself. */
        if( (res<0 && (eType & LSM_START_DELETE))
         || (res>0 && (eType & LSM_END_DELETE))
         || (res==0 && (eType & LSM_POINT_DELETE))
        ){
          *pbStop = 1;
        }else if( res==0 && (eType & LSM_INSERT) ){
          lsm_env *pEnv = pCsr->pDb->pEnv;
          void *p; int n;         /* Key/value from tree-cursor */
          *pbStop = 1;
          pCsr->flags |= CURSOR_SEEK_EQ;
          /* Copy the key and value, as the tree cursor is reset below. */
          rc = lsmTreeCursorKey(pTreeCsr, &pCsr->eType, &p, &n);
          if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->key, p, n);
          if( rc==LSM_OK ) rc = lsmTreeCursorValue(pTreeCsr, &p, &n);
          if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->val, p, n);
        }
        lsmTreeCursorReset(pTreeCsr);
        break;
      }
      case LSM_SEEK_GE:
        /* Entry found is too small. Step forwards. */
        if( res<0 && lsmTreeCursorValid(pTreeCsr) ){
          lsmTreeCursorNext(pTreeCsr);
        }
        break;
      default:
        /* LSM_SEEK_LE: Entry found is too large. Step backwards. */
        if( res>0 ){
          assert( lsmTreeCursorValid(pTreeCsr) );
          lsmTreeCursorPrev(pTreeCsr);
        }
        break;
    }
  }
  return rc;
}
3043
3044
/*
** Seek the cursor to key (pKey/nKey) in topic iTopic. Parameter eSeek is
** one of LSM_SEEK_EQ, LSM_SEEK_LE, LSM_SEEK_GE or LSM_SEEK_LEFAST.
**
** LSM_SEEK_LEFAST is handled as LSM_SEEK_LE, except that neither the
** CURSOR_PREV_OK flag nor the check that the entry found is one that the
** cursor should visit (and the subsequent repositioning) is applied.
**
** The sub-cursors are searched from newest to oldest - the current
** in-memory tree, the old in-memory tree, then each level in aPtr[] order
** - stopping early if one of them sets the "stop" flag.
**
** For LSM_SEEK_EQ, no cursor tree is built. A successful lookup in an
** in-memory tree is indicated by the CURSOR_SEEK_EQ flag (see
** treeCursorSeek()); for lookups in the levels see seekInLevel().
**
** Returns LSM_OK if successful, or an LSM error code otherwise.
*/
int lsmMCursorSeek(
  MultiCursor *pCsr,
  int iTopic,
  void *pKey, int nKey,
  int eSeek
){
  int eESeek = eSeek;             /* Effective eSeek parameter */
  int bStop = 0;                  /* Set to true to halt search operation */
  int rc = LSM_OK;                /* Return code */
  int iPtr = 0;                   /* Used to iterate through pCsr->aPtr[] */
  Pgno iPgno = 0;                 /* FC pointer value */

  /* In-memory tree cursors may only be used with topic 0 (user keys). */
  assert( pCsr->apTreeCsr[0]==0 || iTopic==0 );
  assert( pCsr->apTreeCsr[1]==0 || iTopic==0 );

  if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;

  assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
  assert( (pCsr->flags & CURSOR_FLUSH_FREELIST)==0 );
  assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );

  /* Seek the two in-memory tree cursors, newest first. */
  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
  rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop);
  if( rc==LSM_OK && bStop==0 ){
    rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
  }

  /* Seek all segment pointers. Each iteration handles one level: pPtr is
  ** the lhs pointer, and the rhs pointers that follow it are skipped by
  ** the extra increment at the end of the loop body. */
  for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){
    SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
    assert( pPtr->pSeg==&pPtr->pLevel->lhs );
    rc = seekInLevel(pCsr, pPtr, eESeek, iTopic, pKey, nKey, &iPgno, &bStop);
    iPtr += pPtr->pLevel->nRight;
  }

  if( eSeek!=LSM_SEEK_EQ ){
    if( rc==LSM_OK ){
      rc = multiCursorAllocTree(pCsr);
    }
    if( rc==LSM_OK ){
      int i;
      /* Build the cursor tree, leaves first. LE seeks iterate backwards. */
      for(i=pCsr->nTree-1; i>0; i--){
        multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
      }
      if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
      if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
    }

    /* Cache the key found. Then, unless this is an LEFAST seek, if the
    ** entry is not one the cursor should visit, move on to the next
    ** entry in the direction of the seek. */
    multiCursorCacheKey(pCsr, &rc);
    if( rc==LSM_OK && eSeek!=LSM_SEEK_LEFAST && 0==mcursorLocationOk(pCsr, 0) ){
      switch( eESeek ){
        /* This case cannot be reached, as eESeek is only LSM_SEEK_EQ if
        ** eSeek is, and this block is not run in that case. */
        case LSM_SEEK_EQ:
          lsmMCursorReset(pCsr);
          break;
        case LSM_SEEK_GE:
          rc = lsmMCursorNext(pCsr);
          break;
        default:
          rc = lsmMCursorPrev(pCsr);
          break;
      }
    }
  }

  return rc;
}
3114
/*
** Return true if cursor pCsr currently points at a valid entry, or false
** otherwise.
**
** A cursor with the CURSOR_SEEK_EQ flag set is always valid. Otherwise,
** a cursor without a cursor tree is never valid, and one with a tree is
** valid if the sub-cursor at the root of the tree (aTree[1]) is valid.
*/
int lsmMCursorValid(MultiCursor *pCsr){
  int iCurrent;                   /* Index of current sub-cursor */
  void *pCurrentKey;              /* Key of current sub-cursor, or NULL */

  if( pCsr->flags & CURSOR_SEEK_EQ ) return 1;
  if( pCsr->aTree==0 ) return 0;

  iCurrent = pCsr->aTree[1];
  if( iCurrent==CURSOR_DATA_TREE0 || iCurrent==CURSOR_DATA_TREE1 ){
    return lsmTreeCursorValid(pCsr->apTreeCsr[iCurrent-CURSOR_DATA_TREE0]);
  }

  multiCursorGetKey(pCsr, iCurrent, 0, &pCurrentKey, 0);
  return pCurrentKey!=0;
}
3131
/*
** This function is called after the sub-cursor identified by aTree[1] has
** been stepped. It decides whether or not the multi-cursor as a whole has
** been successfully advanced.
**
** Return 0 if the caller must step the cursor again, or 1 if it may stop.
** 1 is returned if *pRc is already set to an error code when this function
** is called, or if the cursor no longer has a current key.
**
** If the new key is accepted it is cached by calling multiCursorCacheKey(),
** which may set *pRc to an error code.
*/
static int mcursorAdvanceOk(
  MultiCursor *pCsr,
  int bReverse,
  int *pRc
){
  void *pCur;                     /* Key the cursor now points at */
  int nCur;                       /* Size of pCur in bytes */
  int eCurType;                   /* Type flags of the current record */
  int mask;                       /* Type bits that take part in comparison */
  int cmp;                        /* Result of comparing new and cached keys */

  if( *pRc ) return 1;

  multiCursorGetKey(pCsr, pCsr->aTree[1], &eCurType, &pCur, &nCur);
  if( pCur==0 ) return 1;

  /* With CURSOR_IGNORE_DELETE set all type bits are compared. Otherwise
  ** only the LSM_SYSTEMKEY bit is. */
  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
    mask = ~(0);
  }else{
    mask = LSM_SYSTEMKEY;
  }
  cmp = sortedDbKeyCompare(pCsr,
      eCurType & mask, pCur, nCur,
      pCsr->eType & mask, pCsr->key.pData, pCsr->key.nData
  );

  /* The new key must be strictly greater than (forwards) or strictly less
  ** than (backwards) the key cached in pCsr->key. If it is not, the cursor
  ** has not yet moved on to a new key. */
  if( bReverse ){
    if( cmp>=0 ) return 0;
  }else{
    if( cmp<=0 ) return 0;
  }

  multiCursorCacheKey(pCsr, pRc);
  assert( pCsr->eType==eCurType );

  /* If mcursorLocationOk() rejects the new position (the original comment
  ** cites deleted keys and system keys the cursor is configured to skip),
  ** the cursor must be stepped again. */
  if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;

  return 1;
}
3173
/*
** Advance the free-list position (pCsr->iFree) of a cursor that has the
** CURSOR_FLUSH_FREELIST flag set.
**
** The value (iFree/2) selects an entry of the worker snapshot free-list,
** counting backwards from the final entry of the array. If iFree is odd,
** it is simply incremented by one.
** NOTE(review): the low bit of iFree presumably selects between two keys
** generated for each free-list entry - confirm against the reader code.
*/
static void flCsrAdvance(MultiCursor *pCsr){
  FreelistEntry *aFree;           /* Free-list entries of worker snapshot */
  int nFree;                      /* Number of entries in aFree[] */
  int iEntry;                     /* Index in aFree[] of current entry */

  assert( pCsr->flags & CURSOR_FLUSH_FREELIST );

  if( (pCsr->iFree % 2)!=0 ){
    pCsr->iFree++;
    return;
  }

  nFree = pCsr->pDb->pWorker->freelist.nEntry;
  aFree = pCsr->pDb->pWorker->freelist.aEntry;
  iEntry = nFree - 1 - (pCsr->iFree / 2);

  /* If the current entry is a delete (iId<0), walk back over the run of
  ** delete entries that have consecutive block numbers, skipping two
  ** positions for each. If the run is ended by the start of the array or by
  ** a gap in the block numbers, the net effect of the decrement here and the
  ** "+= 2" below is to advance by one position only. */
  if( aFree[iEntry].iId<0 ){
    for(;;){
      if( iEntry==0 || aFree[iEntry-1].iBlk!=aFree[iEntry].iBlk-1 ){
        pCsr->iFree--;
        break;
      }
      if( aFree[iEntry-1].iId>=0 ) break;
      pCsr->iFree += 2;
      iEntry--;
    }
  }
  pCsr->iFree += 2;
}
3200
/*
** Move multi-cursor pCsr to the next entry (bReverse==0) or to the previous
** entry (bReverse!=0). If the cursor is not valid when this function is
** called, it is a no-op.
**
** Each iteration of the loop steps the sub-cursor identified by aTree[1]
** and then re-runs multiCursorDoCompare() for each aTree[] node on the path
** from that sub-cursor up to node 1. The loop repeats until
** mcursorAdvanceOk() reports that the cursor has been advanced (or that an
** error has occurred).
**
** Return LSM_OK if successful, or an error code otherwise.
*/
static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
  int rc = LSM_OK;                /* Return Code */

  if( !lsmMCursorValid(pCsr) ) return LSM_OK;

  do {
    int iCur = pCsr->aTree[1];    /* Sub-cursor to step */

    assertCursorTree(pCsr);

    /* If the caller asked for it (pPrevMergePtr!=0), and the sub-cursor
    ** about to be stepped is the one that separator keys may be being read
    ** from, record its current absolute pointer value. */
    if( pCsr->pPrevMergePtr ){
      if( iCur==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
        assert( pCsr->pBtCsr );
        *pCsr->pPrevMergePtr = pCsr->pBtCsr->iPtr;
      }else if( pCsr->pBtCsr==0 && pCsr->nPtr>0
             && iCur==(CURSOR_DATA_SEGMENT+pCsr->nPtr-1)
      ){
        SegmentPtr *pSegPtr = &pCsr->aPtr[iCur-CURSOR_DATA_SEGMENT];
        *pCsr->pPrevMergePtr = pSegPtr->iPtr+pSegPtr->iPgPtr;
      }
    }

    /* Step the sub-cursor. */
    if( iCur==CURSOR_DATA_TREE0 || iCur==CURSOR_DATA_TREE1 ){
      TreeCursor *pTree = pCsr->apTreeCsr[iCur-CURSOR_DATA_TREE0];
      rc = bReverse ? lsmTreeCursorPrev(pTree) : lsmTreeCursorNext(pTree);
    }else if( iCur==CURSOR_DATA_SYSTEM ){
      assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
      assert( bReverse==0 );
      flCsrAdvance(pCsr);
    }else if( iCur==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
      assert( bReverse==0 && pCsr->pBtCsr );
      rc = btreeCursorNext(pCsr->pBtCsr);
    }else{
      rc = segmentCursorAdvance(pCsr, iCur-CURSOR_DATA_SEGMENT, bReverse);
    }

    /* Recompute the aTree[] nodes affected by the step. */
    if( rc==LSM_OK ){
      int iNode = (iCur + pCsr->nTree) / 2;
      while( iNode>0 ){
        multiCursorDoCompare(pCsr, iNode, bReverse);
        iNode = iNode / 2;
      }
      assertCursorTree(pCsr);
    }
  }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );

  return rc;
}
3252
/*
** Advance cursor pCsr to the next entry. This is only permitted if the
** CURSOR_NEXT_OK flag is set on the cursor. If it is not, LSM_MISUSE_BKPT
** is returned and the cursor is left untouched.
*/
int lsmMCursorNext(MultiCursor *pCsr){
  if( pCsr->flags & CURSOR_NEXT_OK ){
    return multiCursorAdvance(pCsr, 0);
  }
  return LSM_MISUSE_BKPT;
}
3257
/*
** Move cursor pCsr back to the previous entry. This is only permitted if
** the CURSOR_PREV_OK flag is set on the cursor. If it is not,
** LSM_MISUSE_BKPT is returned and the cursor is left untouched.
*/
int lsmMCursorPrev(MultiCursor *pCsr){
  if( pCsr->flags & CURSOR_PREV_OK ){
    return multiCursorAdvance(pCsr, 1);
  }
  return LSM_MISUSE_BKPT;
}
3262
/*
** Set *ppKey and *pnKey to point to the key of the entry that cursor pCsr
** currently points to, and to its size in bytes. Always returns LSM_OK.
**
** If the cursor is flagged CURSOR_SEEK_EQ or has no aTree[] array, or if the
** current sub-cursor is a segment cursor, the key returned is the copy
** cached in pCsr->key. If the current sub-cursor is one of the in-memory
** tree cursors, the key is read from that tree cursor instead.
*/
int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){
  int iCur;                       /* Sub-cursor index stored in aTree[1] */

  if( (pCsr->flags & CURSOR_SEEK_EQ) || pCsr->aTree==0 ){
    *pnKey = pCsr->key.nData;
    *ppKey = pCsr->key.pData;
    return LSM_OK;
  }

  iCur = pCsr->aTree[1];
  if( iCur==CURSOR_DATA_TREE0 || iCur==CURSOR_DATA_TREE1 ){
    lsmTreeCursorKey(pCsr->apTreeCsr[iCur-CURSOR_DATA_TREE0], 0, ppKey, pnKey);
    return LSM_OK;
  }

#ifndef NDEBUG
  /* Check that the cached key matches the key the sub-cursor points at. */
  {
    void *pChk;
    int nChk;
    int eChk;
    multiCursorGetKey(pCsr, iCur, &eChk, &pChk, &nChk);
    assert( eChk==pCsr->eType );
    assert( nChk==pCsr->key.nData );
    assert( memcmp(pChk, pCsr->key.pData, nChk)==0 );
  }
#endif

  /* A zero-length key is reported with a NULL pointer. */
  *ppKey = (pCsr->key.nData==0) ? 0 : pCsr->key.pData;
  *pnKey = pCsr->key.nData;
  return LSM_OK;
}
3296
3297 /*
3298 ** Compare the current key that cursor csr points to with pKey/nKey. Set
3299 ** *piRes to the result and return LSM_OK.
3300 */
lsm_csr_cmp(lsm_cursor * csr,const void * pKey,int nKey,int * piRes)3301 int lsm_csr_cmp(lsm_cursor *csr, const void *pKey, int nKey, int *piRes){
3302 MultiCursor *pCsr = (MultiCursor *)csr;
3303 void *pCsrkey; int nCsrkey;
3304 int rc;
3305 rc = lsmMCursorKey(pCsr, &pCsrkey, &nCsrkey);
3306 if( rc==LSM_OK ){
3307 int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
3308 *piRes = sortedKeyCompare(xCmp, 0, pCsrkey, nCsrkey, 0, (void *)pKey, nKey);
3309 }
3310 return rc;
3311 }
3312
/*
** Set *ppVal and *pnVal to point to the value of the entry that cursor pCsr
** currently points to, and to its size in bytes.
**
** If the cursor is flagged CURSOR_SEEK_EQ or has no aTree[] array, the value
** cached in pCsr->val is returned. Otherwise the value is read from the
** current sub-cursor and, if it is not NULL, copied into pCsr->val so that
** the pointer returned to the caller refers to the cursor's own buffer.
**
** Return LSM_OK if successful. If an error occurs, an error code is returned
** and both outputs are zeroed.
*/
int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
  void *pData;
  int nData;
  int rc;

  if( (pCsr->flags & CURSOR_SEEK_EQ)==0 && pCsr->aTree!=0 ){
    assert( pCsr->aTree );
    assert( mcursorLocationOk(pCsr, (pCsr->flags & CURSOR_IGNORE_DELETE)) );

    rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pData, &nData);
    if( rc==LSM_OK && pData ){
      rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pData, nData);
      pData = pCsr->val.pData;
    }

    if( rc!=LSM_OK ){
      pData = 0;
      nData = 0;
    }
  }else{
    rc = LSM_OK;
    nData = pCsr->val.nData;
    pData = pCsr->val.pData;
  }

  *ppVal = pData;
  *pnVal = nData;
  return rc;
}
3341
/*
** Set *peType to the type flags of the entry that the sub-cursor identified
** by aTree[1] currently points to. The cursor must have an aTree[] array.
** Always returns LSM_OK.
*/
int lsmMCursorType(MultiCursor *pCsr, int *peType){
  int iCur;                       /* Sub-cursor index stored in aTree[1] */

  assert( pCsr->aTree );
  iCur = pCsr->aTree[1];
  multiCursorGetKey(pCsr, iCur, peType, 0, 0);
  return LSM_OK;
}
3347
3348 /*
3349 ** Buffer aData[], size nData, is assumed to contain a valid b-tree
3350 ** hierarchy page image. Return the offset in aData[] of the next free
3351 ** byte in the data area (where a new cell may be written if there is
3352 ** space).
3353 */
mergeWorkerPageOffset(u8 * aData,int nData)3354 static int mergeWorkerPageOffset(u8 *aData, int nData){
3355 int nRec;
3356 int iOff;
3357 int nKey;
3358 int eType;
3359
3360 nRec = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
3361 iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec-1)]);
3362 eType = aData[iOff++];
3363 assert( eType==0
3364 || eType==(LSM_SYSTEMKEY|LSM_SEPARATOR)
3365 || eType==(LSM_SEPARATOR)
3366 );
3367
3368 iOff += lsmVarintGet32(&aData[iOff], &nKey);
3369 iOff += lsmVarintGet32(&aData[iOff], &nKey);
3370
3371 return iOff + (eType ? nKey : 0);
3372 }
3373
3374 /*
3375 ** Following a checkpoint operation, database pages that are part of the
3376 ** checkpointed state of the LSM are deemed read-only. This includes the
3377 ** right-most page of the b-tree hierarchy of any separators array under
3378 ** construction, and all pages between it and the b-tree root, inclusive.
3379 ** This is a problem, as when further pages are appended to the separators
3380 ** array, entries must be added to the indicated b-tree hierarchy pages.
3381 **
3382 ** This function copies all such b-tree pages to new locations, so that
3383 ** they can be modified as required.
3384 **
3385 ** The complication is that not all database pages are the same size - due
3386 ** to the way the file.c module works some (the first and last in each block)
3387 ** are 4 bytes smaller than the others.
3388 */
mergeWorkerMoveHierarchy(MergeWorker * pMW,int bSep)3389 static int mergeWorkerMoveHierarchy(
3390 MergeWorker *pMW, /* Merge worker */
3391 int bSep /* True for separators run */
3392 ){
3393 lsm_db *pDb = pMW->pDb; /* Database handle */
3394 int rc = LSM_OK; /* Return code */
3395 int i;
3396 Page **apHier = pMW->hier.apHier;
3397 int nHier = pMW->hier.nHier;
3398
3399 for(i=0; rc==LSM_OK && i<nHier; i++){
3400 Page *pNew = 0;
3401 rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &pNew);
3402 assert( rc==LSM_OK );
3403
3404 if( rc==LSM_OK ){
3405 u8 *a1; int n1;
3406 u8 *a2; int n2;
3407
3408 a1 = fsPageData(pNew, &n1);
3409 a2 = fsPageData(apHier[i], &n2);
3410
3411 assert( n1==n2 || n1+4==n2 );
3412
3413 if( n1==n2 ){
3414 memcpy(a1, a2, n2);
3415 }else{
3416 int nEntry = pageGetNRec(a2, n2);
3417 int iEof1 = SEGMENT_EOF(n1, nEntry);
3418 int iEof2 = SEGMENT_EOF(n2, nEntry);
3419
3420 memcpy(a1, a2, iEof2 - 4);
3421 memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
3422 }
3423
3424 lsmFsPageRelease(apHier[i]);
3425 apHier[i] = pNew;
3426
3427 #if 0
3428 assert( n1==n2 || n1+4==n2 || n2+4==n1 );
3429 if( n1>=n2 ){
3430 /* If n1 (size of the new page) is equal to or greater than n2 (the
3431 ** size of the old page), then copy the data into the new page. If
3432 ** n1==n2, this could be done with a single memcpy(). However,
3433 ** since sometimes n1>n2, the page content and footer must be copied
3434 ** separately. */
3435 int nEntry = pageGetNRec(a2, n2);
3436 int iEof1 = SEGMENT_EOF(n1, nEntry);
3437 int iEof2 = SEGMENT_EOF(n2, nEntry);
3438 memcpy(a1, a2, iEof2);
3439 memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
3440 lsmFsPageRelease(apHier[i]);
3441 apHier[i] = pNew;
3442 }else{
3443 lsmPutU16(&a1[SEGMENT_FLAGS_OFFSET(n1)], SEGMENT_BTREE_FLAG);
3444 lsmPutU16(&a1[SEGMENT_NRECORD_OFFSET(n1)], 0);
3445 lsmPutU64(&a1[SEGMENT_POINTER_OFFSET(n1)], 0);
3446 i = i - 1;
3447 lsmFsPageRelease(pNew);
3448 }
3449 #endif
3450 }
3451 }
3452
3453 #ifdef LSM_DEBUG
3454 if( rc==LSM_OK ){
3455 for(i=0; i<nHier; i++) assert( lsmFsPageWritable(apHier[i]) );
3456 }
3457 #endif
3458
3459 return rc;
3460 }
3461
3462 /*
3463 ** Allocate and populate the MergeWorker.apHier[] array.
3464 */
mergeWorkerLoadHierarchy(MergeWorker * pMW)3465 static int mergeWorkerLoadHierarchy(MergeWorker *pMW){
3466 int rc = LSM_OK;
3467 Segment *pSeg;
3468 Hierarchy *p;
3469
3470 pSeg = &pMW->pLevel->lhs;
3471 p = &pMW->hier;
3472
3473 if( p->apHier==0 && pSeg->iRoot!=0 ){
3474 FileSystem *pFS = pMW->pDb->pFS;
3475 lsm_env *pEnv = pMW->pDb->pEnv;
3476 Page **apHier = 0;
3477 int nHier = 0;
3478 int iPg = (int)pSeg->iRoot;
3479
3480 do {
3481 Page *pPg = 0;
3482 u8 *aData;
3483 int nData;
3484 int flags;
3485
3486 rc = lsmFsDbPageGet(pFS, pSeg, iPg, &pPg);
3487 if( rc!=LSM_OK ) break;
3488
3489 aData = fsPageData(pPg, &nData);
3490 flags = pageGetFlags(aData, nData);
3491 if( flags&SEGMENT_BTREE_FLAG ){
3492 Page **apNew = (Page **)lsmRealloc(
3493 pEnv, apHier, sizeof(Page *)*(nHier+1)
3494 );
3495 if( apNew==0 ){
3496 rc = LSM_NOMEM_BKPT;
3497 break;
3498 }
3499 apHier = apNew;
3500 memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier);
3501 nHier++;
3502
3503 apHier[0] = pPg;
3504 iPg = (int)pageGetPtr(aData, nData);
3505 }else{
3506 lsmFsPageRelease(pPg);
3507 break;
3508 }
3509 }while( 1 );
3510
3511 if( rc==LSM_OK ){
3512 u8 *aData;
3513 int nData;
3514 aData = fsPageData(apHier[0], &nData);
3515 pMW->aSave[0].iPgno = pageGetPtr(aData, nData);
3516 p->nHier = nHier;
3517 p->apHier = apHier;
3518 rc = mergeWorkerMoveHierarchy(pMW, 0);
3519 }else{
3520 int i;
3521 for(i=0; i<nHier; i++){
3522 lsmFsPageRelease(apHier[i]);
3523 }
3524 lsmFree(pEnv, apHier);
3525 }
3526 }
3527
3528 return rc;
3529 }
3530
3531 /*
3532 ** B-tree pages use almost the same format as regular pages. The
3533 ** differences are:
3534 **
3535 ** 1. The record format is (usually, see below) as follows:
3536 **
3537 ** + Type byte (always SORTED_SEPARATOR or SORTED_SYSTEM_SEPARATOR),
3538 ** + Absolute pointer value (varint),
3539 ** + Number of bytes in key (varint),
3540 ** + Blob containing key data.
3541 **
3542 ** 2. All pointer values are stored as absolute values (not offsets
3543 ** relative to the footer pointer value).
3544 **
3545 ** 3. Each pointer that is part of a record points to a page that
3546 ** contains keys smaller than the records key (note: not "equal to or
3547 ** smaller than - smaller than").
3548 **
3549 ** 4. The pointer in the page footer of a b-tree page points to a page
3550 ** that contains keys equal to or larger than the largest key on the
3551 ** b-tree page.
3552 **
3553 ** The reason for having the page footer pointer point to the right-child
3554 ** (instead of the left) is that doing things this way makes the
3555 ** mergeWorkerMoveHierarchy() operation less complicated (since the pointers
3556 ** that need to be updated are all stored as fixed-size integers within the
3557 ** page footer, not varints in page records).
3558 **
3559 ** Records may not span b-tree pages. If this function is called to add a
3560 ** record larger than (page-size / 4) bytes, then a pointer to the indexed
3561 ** array page that contains the main record is added to the b-tree instead.
3562 ** In this case the record format is:
3563 **
3564 ** + 0x00 byte (1 byte)
3565 ** + Absolute pointer value (varint),
3566 ** + Absolute page number of page containing key (varint).
3567 **
3568 ** See function seekInBtree() for the code that traverses b-tree pages.
3569 */
3570
/*
** Append a single record to the b-tree hierarchy under construction by
** merge-worker pMW.
**
**   eType:     Type byte for the new record. Zero for an indirect key.
**   iPtr:      Absolute pointer value to store in the record.
**   iKeyPg:    For an indirect key, the page that contains the key. Zero
**              for an ordinary key.
**   pKey/nKey: Key blob for an ordinary key. Not used for an indirect key.
**
** The record is written to the lowest level of the hierarchy that has space
** for it. Each full page encountered on the way up has its footer pointer
** set, is persisted and is replaced by a new, empty b-tree page. If no
** existing level has space, a new root page is added to the hierarchy.
**
** Return LSM_OK if successful, or an error code otherwise.
*/
static int mergeWorkerBtreeWrite(
  MergeWorker *pMW,
  u8 eType,
  Pgno iPtr,
  Pgno iKeyPg,
  void *pKey,
  int nKey
){
  Hierarchy *p = &pMW->hier;
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  int rc = LSM_OK;                /* Return Code */
  int iLevel;                     /* Level of b-tree hierarchy to write to */
  int nData;                      /* Size of aData[] in bytes */
  u8 *aData;                      /* Page data for level iLevel */
  int iOff;                       /* Offset on b-tree page to write record to */
  int nRec;                       /* Initial number of records on b-tree page */

  /* iKeyPg should be zero for an ordinary b-tree key, or non-zero for an
  ** indirect key. The flags byte for an indirect key is 0x00.  */
  assert( (eType==0)==(iKeyPg!=0) );

  /* The MergeWorker.apHier[] array contains the right-most leaf of the b-tree
  ** hierarchy, the root node, and all nodes that lie on the path between.
  ** apHier[0] is the right-most leaf and apHier[pMW->nHier-1] is the current
  ** root page.
  **
  ** This loop searches for a node with enough space to store the key on,
  ** starting with the leaf and iterating up towards the root. When the loop
  ** exits, the key may be written to apHier[iLevel].  */
  for(iLevel=0; iLevel<=p->nHier; iLevel++){
    int nByte;                    /* Number of free bytes required */

    if( iLevel==p->nHier ){
      /* Extend the array and allocate a new root page. */
      Page **aNew;
      aNew = (Page **)lsmRealloc(
          pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1)
      );
      if( !aNew ){
        return LSM_NOMEM_BKPT;
      }
      p->apHier = aNew;
    }else{
      Page *pOld;
      int nFree;

      /* If the key will fit on this page, break out of the loop here.
      ** The new entry will be written to page apHier[iLevel]. The space
      ** required is 2 bytes for the cell-pointer, 1 for the type byte, plus
      ** the varints and (for an ordinary key) the key blob. */
      pOld = p->apHier[iLevel];
      assert( lsmFsPageWritable(pOld) );
      aData = fsPageData(pOld, &nData);
      if( eType==0 ){
        nByte = 2 + 1 + lsmVarintLen32((int)iPtr) + lsmVarintLen32((int)iKeyPg);
      }else{
        nByte = 2 + 1 + lsmVarintLen32((int)iPtr) + lsmVarintLen32(nKey) + nKey;
      }
      nRec = pageGetNRec(aData, nData);
      nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
      if( nByte<=nFree ) break;

      /* Otherwise, this page is full. Set the right-hand-child pointer
      ** to iPtr and persist it. The pointer to store at the next level up
      ** is the page number of the page just persisted.
      **
      ** The page reference is released whether or not the page could be
      ** persisted. The apHier[iLevel] slot is overwritten below, so if the
      ** reference were retained following an error it would be leaked. */
      lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr);
      assert( lsmFsPageNumber(pOld)==0 );
      rc = lsmFsPagePersist(pOld);
      if( rc==LSM_OK ){
        iPtr = lsmFsPageNumber(pOld);
      }
      lsmFsPageRelease(pOld);
    }

    /* Allocate a new page for apHier[iLevel]. */
    p->apHier[iLevel] = 0;
    if( rc==LSM_OK ){
      rc = lsmFsSortedAppend(
          pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &p->apHier[iLevel]
      );
    }
    if( rc!=LSM_OK ) return rc;

    /* Initialize the new page as an empty b-tree page. */
    aData = fsPageData(p->apHier[iLevel], &nData);
    memset(aData, 0, nData);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);

    if( iLevel==p->nHier ){
      /* A new root page was just added. Write the key to it. */
      p->nHier++;
      break;
    }
  }

  /* Write the key into page apHier[iLevel]. */
  aData = fsPageData(p->apHier[iLevel], &nData);
  iOff = mergeWorkerPageOffset(aData, nData);
  nRec = pageGetNRec(aData, nData);
  lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
  lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
  if( eType==0 ){
    /* Indirect key: 0x00 byte, pointer, page number of key page. */
    aData[iOff++] = 0x00;
    iOff += lsmVarintPut32(&aData[iOff], (int)iPtr);
    iOff += lsmVarintPut32(&aData[iOff], (int)iKeyPg);
  }else{
    /* Ordinary key: type byte, pointer, key size, key blob. */
    aData[iOff++] = eType;
    iOff += lsmVarintPut32(&aData[iOff], (int)iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  return rc;
}
3681
/*
** If there is a pending indirect key (pMW->iIndirect is non-zero), write it
** to the b-tree hierarchy as an indirect record. The record pairs the
** pointer value pMW->iIndirect with the page number stored in
** pMW->aSave[1].iPgno. Then clear pMW->iIndirect, whether or not the write
** was successful.
**
** Return LSM_OK if there is nothing to do or if the record is written
** successfully, or an error code otherwise.
*/
static int mergeWorkerBtreeIndirect(MergeWorker *pMW){
  int rc;
  Pgno iKeyPg;                    /* Page that contains the key */

  if( pMW->iIndirect==0 ) return LSM_OK;

  iKeyPg = pMW->aSave[1].iPgno;
  rc = mergeWorkerBtreeWrite(pMW, 0, pMW->iIndirect, iKeyPg, 0, 0);
  pMW->iIndirect = 0;
  return rc;
}
3691
3692 /*
3693 ** Append the database key (iTopic/pKey/nKey) to the b-tree under
3694 ** construction. This key has not yet been written to a segment page.
3695 ** The pointer that will accompany the new key in the b-tree - that
3696 ** points to the completed segment page that contains keys smaller than
3697 ** (pKey/nKey) is currently stored in pMW->aSave[0].iPgno.
3698 */
mergeWorkerPushHierarchy(MergeWorker * pMW,int iTopic,void * pKey,int nKey)3699 static int mergeWorkerPushHierarchy(
3700 MergeWorker *pMW, /* Merge worker object */
3701 int iTopic, /* Topic value for this key */
3702 void *pKey, /* Pointer to key buffer */
3703 int nKey /* Size of pKey buffer in bytes */
3704 ){
3705 int rc = LSM_OK; /* Return Code */
3706 Pgno iPtr; /* Pointer value to accompany pKey/nKey */
3707
3708 assert( pMW->aSave[0].bStore==0 );
3709 assert( pMW->aSave[1].bStore==0 );
3710 rc = mergeWorkerBtreeIndirect(pMW);
3711
3712 /* Obtain the absolute pointer value to store along with the key in the
3713 ** page body. This pointer points to a page that contains keys that are
3714 ** smaller than pKey/nKey. */
3715 iPtr = pMW->aSave[0].iPgno;
3716 assert( iPtr!=0 );
3717
3718 /* Determine if the indirect format should be used. */
3719 if( (nKey*4 > lsmFsPageSize(pMW->pDb->pFS)) ){
3720 pMW->iIndirect = iPtr;
3721 pMW->aSave[1].bStore = 1;
3722 }else{
3723 rc = mergeWorkerBtreeWrite(
3724 pMW, (u8)(iTopic | LSM_SEPARATOR), iPtr, 0, pKey, nKey
3725 );
3726 }
3727
3728 /* Ensure that the SortedRun.iRoot field is correct. */
3729 return rc;
3730 }
3731
/*
** Finish building the b-tree hierarchy belonging to merge-worker pMW.
**
** Starting with apHier[0], the footer pointer of each page in the hierarchy
** is set and the page persisted. The pointer written to apHier[0] is
** pMW->aSave[0].iPgno. The pointer written to each subsequent page is the
** page number of the page persisted before it.
**
** If the hierarchy contains any pages, the iRoot field of the output segment
** is then set to the page number of the last page processed and the apHier[]
** array is freed.
**
** Return LSM_OK if successful. If a page cannot be persisted, an error code
** is returned and no attempt is made to persist any further pages. All page
** references held in apHier[] are released in either case, as the array
** itself is always freed before returning.
*/
static int mergeWorkerFinishHierarchy(
  MergeWorker *pMW                /* Merge worker object */
){
  int i;                          /* Used to loop through apHier[] */
  int rc = LSM_OK;                /* Return code */
  Pgno iPtr;                      /* New right-hand-child pointer value */

  iPtr = pMW->aSave[0].iPgno;
  for(i=0; i<pMW->hier.nHier; i++){
    Page *pPg = pMW->hier.apHier[i];

    if( rc==LSM_OK ){
      int nData;                  /* Size of aData[] in bytes */
      u8 *aData;                  /* Page data for pPg */

      aData = fsPageData(pPg, &nData);
      lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr);

      rc = lsmFsPagePersist(pPg);
      iPtr = lsmFsPageNumber(pPg);
    }

    /* Release the reference even if an error has occurred. The apHier[]
    ** array is freed below, after which it is too late to do so. */
    lsmFsPageRelease(pPg);
  }

  if( pMW->hier.nHier ){
    pMW->pLevel->lhs.iRoot = iPtr;
    lsmFree(pMW->pDb->pEnv, pMW->hier.apHier);
    pMW->hier.apHier = 0;
    pMW->hier.nHier = 0;
  }

  return rc;
}
3762
/*
** Invoke lsmFsSortedPadding() on the output segment of merge-worker pMW
** (pMW->pLevel->lhs), passing it the worker snapshot of the database
** handle. Return whatever that function returns.
*/
static int mergeWorkerAddPadding(
  MergeWorker *pMW                /* Merge worker object */
){
  lsm_db *pDb = pMW->pDb;
  return lsmFsSortedPadding(pDb->pFS, pDb->pWorker, &pMW->pLevel->lhs);
}
3769
3770 /*
3771 ** Release all page references currently held by the merge-worker passed
3772 ** as the only argument. Unless an error has occurred, all pages have
3773 ** already been released.
3774 */
mergeWorkerReleaseAll(MergeWorker * pMW)3775 static void mergeWorkerReleaseAll(MergeWorker *pMW){
3776 int i;
3777 lsmFsPageRelease(pMW->pPage);
3778 pMW->pPage = 0;
3779
3780 for(i=0; i<pMW->hier.nHier; i++){
3781 lsmFsPageRelease(pMW->hier.apHier[i]);
3782 pMW->hier.apHier[i] = 0;
3783 }
3784 lsmFree(pMW->pDb->pEnv, pMW->hier.apHier);
3785 pMW->hier.apHier = 0;
3786 pMW->hier.nHier = 0;
3787 }
3788
/*
** Return the number of quarter-pages that a key of nKey bytes occupies,
** rounded down and limited to a maximum of 3. The page size used is that
** reported by lsmFsPageSize(). The caller stores the result in
** Merge.nSkip.
*/
static int keyszToSkip(FileSystem *pFS, int nKey){
  int nQuarter;                   /* Key size in units of (page-size / 4) */
  nQuarter = (nKey * 4) / lsmFsPageSize(pFS);
  return LSM_MIN(nQuarter, 3);
}
3794
3795 /*
3796 ** Release the reference to the current output page of merge-worker *pMW
3797 ** (reference pMW->pPage). Set the page number values in aSave[] as
3798 ** required (see comments above struct MergeWorker for details).
3799 */
mergeWorkerPersistAndRelease(MergeWorker * pMW)3800 static int mergeWorkerPersistAndRelease(MergeWorker *pMW){
3801 int rc;
3802 int i;
3803
3804 assert( pMW->pPage || (pMW->aSave[0].bStore==0 && pMW->aSave[1].bStore==0) );
3805
3806 /* Persist the page */
3807 rc = lsmFsPagePersist(pMW->pPage);
3808
3809 /* If required, save the page number. */
3810 for(i=0; i<2; i++){
3811 if( pMW->aSave[i].bStore ){
3812 pMW->aSave[i].iPgno = lsmFsPageNumber(pMW->pPage);
3813 pMW->aSave[i].bStore = 0;
3814 }
3815 }
3816
3817 /* Release the completed output page. */
3818 lsmFsPageRelease(pMW->pPage);
3819 pMW->pPage = 0;
3820 return rc;
3821 }
3822
3823 /*
3824 ** Advance to the next page of an output run being populated by merge-worker
3825 ** pMW. The footer of the new page is initialized to indicate that it contains
3826 ** zero records. The flags field is cleared. The page footer pointer field
3827 ** is set to iFPtr.
3828 **
3829 ** If successful, LSM_OK is returned. Otherwise, an error code.
3830 */
mergeWorkerNextPage(MergeWorker * pMW,Pgno iFPtr)3831 static int mergeWorkerNextPage(
3832 MergeWorker *pMW, /* Merge worker object to append page to */
3833 Pgno iFPtr /* Pointer value for footer of new page */
3834 ){
3835 int rc = LSM_OK; /* Return code */
3836 Page *pNext = 0; /* New page appended to run */
3837 lsm_db *pDb = pMW->pDb; /* Database handle */
3838
3839 rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 0, &pNext);
3840 assert( rc || pMW->pLevel->lhs.iFirst>0 || pMW->pDb->compress.xCompress );
3841
3842 if( rc==LSM_OK ){
3843 u8 *aData; /* Data buffer belonging to page pNext */
3844 int nData; /* Size of aData[] in bytes */
3845
3846 rc = mergeWorkerPersistAndRelease(pMW);
3847
3848 pMW->pPage = pNext;
3849 pMW->pLevel->pMerge->iOutputOff = 0;
3850 aData = fsPageData(pNext, &nData);
3851 lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
3852 lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
3853 lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);
3854 pMW->nWork++;
3855 }
3856
3857 return rc;
3858 }
3859
3860 /*
3861 ** Write a blob of data into an output segment being populated by a
3862 ** merge-worker object. If argument bSep is true, write into the separators
3863 ** array. Otherwise, the main array.
3864 **
3865 ** This function is used to write the blobs of data for keys and values.
3866 */
mergeWorkerData(MergeWorker * pMW,int bSep,int iFPtr,u8 * aWrite,int nWrite)3867 static int mergeWorkerData(
3868 MergeWorker *pMW, /* Merge worker object */
3869 int bSep, /* True to write to separators run */
3870 int iFPtr, /* Footer ptr for new pages */
3871 u8 *aWrite, /* Write data from this buffer */
3872 int nWrite /* Size of aWrite[] in bytes */
3873 ){
3874 int rc = LSM_OK; /* Return code */
3875 int nRem = nWrite; /* Number of bytes still to write */
3876
3877 while( rc==LSM_OK && nRem>0 ){
3878 Merge *pMerge = pMW->pLevel->pMerge;
3879 int nCopy; /* Number of bytes to copy */
3880 u8 *aData; /* Pointer to buffer of current output page */
3881 int nData; /* Size of aData[] in bytes */
3882 int nRec; /* Number of records on current output page */
3883 int iOff; /* Offset in aData[] to write to */
3884
3885 assert( lsmFsPageWritable(pMW->pPage) );
3886
3887 aData = fsPageData(pMW->pPage, &nData);
3888 nRec = pageGetNRec(aData, nData);
3889 iOff = pMerge->iOutputOff;
3890 nCopy = LSM_MIN(nRem, SEGMENT_EOF(nData, nRec) - iOff);
3891
3892 memcpy(&aData[iOff], &aWrite[nWrite-nRem], nCopy);
3893 nRem -= nCopy;
3894
3895 if( nRem>0 ){
3896 rc = mergeWorkerNextPage(pMW, iFPtr);
3897 }else{
3898 pMerge->iOutputOff = iOff + nCopy;
3899 }
3900 }
3901
3902 return rc;
3903 }
3904
3905
3906 /*
3907 ** The MergeWorker passed as the only argument is working to merge two or
3908 ** more existing segments together (not to flush an in-memory tree). It
3909 ** has not yet written the first key to the first page of the output.
3910 */
mergeWorkerFirstPage(MergeWorker * pMW)3911 static int mergeWorkerFirstPage(MergeWorker *pMW){
3912 int rc = LSM_OK; /* Return code */
3913 Page *pPg = 0; /* First page of run pSeg */
3914 int iFPtr = 0; /* Pointer value read from footer of pPg */
3915 MultiCursor *pCsr = pMW->pCsr;
3916
3917 assert( pMW->pPage==0 );
3918
3919 if( pCsr->pBtCsr ){
3920 rc = LSM_OK;
3921 iFPtr = (int)pMW->pLevel->pNext->lhs.iFirst;
3922 }else if( pCsr->nPtr>0 ){
3923 Segment *pSeg;
3924 pSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg;
3925 rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg, pSeg->iFirst, &pPg);
3926 if( rc==LSM_OK ){
3927 u8 *aData; /* Buffer for page pPg */
3928 int nData; /* Size of aData[] in bytes */
3929 aData = fsPageData(pPg, &nData);
3930 iFPtr = (int)pageGetPtr(aData, nData);
3931 lsmFsPageRelease(pPg);
3932 }
3933 }
3934
3935 if( rc==LSM_OK ){
3936 rc = mergeWorkerNextPage(pMW, iFPtr);
3937 if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr;
3938 pMW->aSave[0].bStore = 1;
3939 }
3940
3941 return rc;
3942 }
3943
/*
** Write a single record to the output segment being populated by
** merge-worker pMW. If the record is the first on a page other than the
** very first page of the run, then either a copy of its key is pushed to
** the b-tree hierarchy or the page is flagged PGFTR_SKIP_THIS_FLAG,
** depending on the current value of Merge.nSkip.
**
** Return LSM_OK if successful, or an error code otherwise.
*/
static int mergeWorkerWrite(
  MergeWorker *pMW,               /* Merge worker object to write into */
  int eType,                      /* One of SORTED_SEPARATOR, WRITE or DELETE */
  void *pKey, int nKey,           /* Key value */
  void *pVal, int nVal,           /* Value value */
  int iPtr                        /* Absolute value of page pointer, or 0 */
){
  Merge *pMerge = pMW->pLevel->pMerge;   /* Persistent merge state */
  Segment *pSeg = &pMW->pLevel->lhs;     /* Segment being written */
  int rc = LSM_OK;                /* Return code */
  int bFirst = 0;                 /* True for first key of output run */
  int flags = 0;                  /* If != 0, flags value for page footer */
  int nData = 0;                  /* Size of buffer aData[] in bytes */
  int nRec = 0;                   /* Number of records on page pPg */
  int iFPtr = 0;                  /* Value of pointer in footer of pPg */
  int iRPtr = 0;                  /* Value of pointer written into record */
  int iOff;                       /* Current write offset within page pPg */
  int nHdr;                       /* Space required for this record header */
  u8 *aData;                      /* Data buffer for page pPg */
  Page *pPg;                      /* Page to write to */

  /* If this is the first record of the output run, allocate the first
  ** output page. */
  if( pSeg->iFirst==0 && pMW->pPage==0 ){
    rc = mergeWorkerFirstPage(pMW);
    bFirst = 1;
  }

  /* If there is a current output page, read its record count and footer
  ** pointer. The pointer stored in the record is relative to the footer
  ** pointer. */
  pPg = pMW->pPage;
  if( pPg ){
    aData = fsPageData(pPg, &nData);
    nRec = pageGetNRec(aData, nData);
    iFPtr = (int)pageGetPtr(aData, nData);
    iRPtr = iPtr - iFPtr;
  }
  if( rc!=LSM_OK ) return rc;

  /* Figure out how much space is required by the new record. The space
  ** required is divided into two sections: the header and the body. The
  ** header consists of the initial varint fields. The body are the blobs
  ** of data that correspond to the key and value data. The entire header
  ** must be stored on the page. The body may overflow onto the next and
  ** subsequent pages.
  **
  ** The header space is:
  **
  **     1) record type - 1 byte.
  **     2) Page-pointer-offset - 1 varint
  **     3) Key size - 1 varint
  **     4) Value size - 1 varint (only if rtIsWrite(eType) is true)
  */
  nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
  if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

  /* If the entire header (and the cell-pointer for the new record) will
  ** not fit on page pPg, if there is no current page, or if the output
  ** offset is negative, advance to the next page of the output run. The
  ** footer pointer of the new page is read from *pCsr->pPrevMergePtr. */
  iOff = pMerge->iOutputOff;
  if( iOff<0 || pPg==0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
    iFPtr = (int)*pMW->pCsr->pPrevMergePtr;
    iRPtr = iPtr - iFPtr;
    iOff = 0;
    nRec = 0;
    rc = mergeWorkerNextPage(pMW, iFPtr);
    if( rc!=LSM_OK ) return rc;
    pPg = pMW->pPage;
  }

  /* If this record header will be the first on the page, and the page is
  ** not the very first in the entire run, either add a copy of the key to
  ** the b-tree hierarchy or, if Merge.nSkip is non-zero, mark the page with
  ** PGFTR_SKIP_THIS_FLAG instead. */
  if( nRec==0 && bFirst==0 ){
    assert( pMerge->nSkip>=0 );

    if( pMerge->nSkip==0 ){
      rc = mergeWorkerPushHierarchy(pMW, rtTopic(eType), pKey, nKey);
      assert( pMW->aSave[0].bStore==0 );
      pMW->aSave[0].bStore = 1;
      pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey);
    }else{
      pMerge->nSkip--;
      flags = PGFTR_SKIP_THIS_FLAG;
    }

    if( pMerge->nSkip ) flags |= PGFTR_SKIP_NEXT_FLAG;
    if( rc!=LSM_OK ) return rc;
  }

  /* Update the page footer. */
  aData = fsPageData(pPg, &nData);
  lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
  lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
  if( flags ) lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], (u16)flags);

  /* Write the entry header into the current page. */
  aData[iOff++] = (u8)eType;                                           /* 1 */
  iOff += lsmVarintPut32(&aData[iOff], iRPtr);                         /* 2 */
  iOff += lsmVarintPut32(&aData[iOff], nKey);                          /* 3 */
  if( rtIsWrite(eType) ) iOff += lsmVarintPut32(&aData[iOff], nVal);   /* 4 */
  pMerge->iOutputOff = iOff;

  /* Write the key and, if this is a write, the value into the segment. */
  assert( iFPtr==pageGetPtr(aData, nData) );
  rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pKey, nKey);
  if( rc==LSM_OK && rtIsWrite(eType) ){
    rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pVal, nVal);
  }

  return rc;
}
4059
4060
4061 /*
4062 ** Free all resources allocated by mergeWorkerInit().
4063 */
mergeWorkerShutdown(MergeWorker * pMW,int * pRc)4064 static void mergeWorkerShutdown(MergeWorker *pMW, int *pRc){
4065 int i; /* Iterator variable */
4066 int rc = *pRc;
4067 MultiCursor *pCsr = pMW->pCsr;
4068
4069 /* Unless the merge has finished, save the cursor position in the
4070 ** Merge.aInput[] array. See function mergeWorkerInit() for the
4071 ** code to restore a cursor position based on aInput[]. */
4072 if( rc==LSM_OK && pCsr && lsmMCursorValid(pCsr) ){
4073 Merge *pMerge = pMW->pLevel->pMerge;
4074 int bBtree = (pCsr->pBtCsr!=0);
4075 int iPtr;
4076
4077 /* pMerge->nInput==0 indicates that this is a FlushTree() operation. */
4078 assert( pMerge->nInput==0 || pMW->pLevel->nRight>0 );
4079 assert( pMerge->nInput==0 || pMerge->nInput==(pCsr->nPtr+bBtree) );
4080
4081 for(i=0; i<(pMerge->nInput-bBtree); i++){
4082 SegmentPtr *pPtr = &pCsr->aPtr[i];
4083 if( pPtr->pPg ){
4084 pMerge->aInput[i].iPg = lsmFsPageNumber(pPtr->pPg);
4085 pMerge->aInput[i].iCell = pPtr->iCell;
4086 }else{
4087 pMerge->aInput[i].iPg = 0;
4088 pMerge->aInput[i].iCell = 0;
4089 }
4090 }
4091 if( bBtree && pMerge->nInput ){
4092 assert( i==pCsr->nPtr );
4093 btreeCursorPosition(pCsr->pBtCsr, &pMerge->aInput[i]);
4094 }
4095
4096 /* Store the location of the split-key */
4097 iPtr = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
4098 if( iPtr<pCsr->nPtr ){
4099 pMerge->splitkey = pMerge->aInput[iPtr];
4100 }else{
4101 btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
4102 }
4103
4104 pMerge->iOutputOff = -1;
4105 }
4106
4107 lsmMCursorClose(pCsr, 0);
4108
4109 /* Persist and release the output page. */
4110 if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW);
4111 if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);
4112 if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);
4113 if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW);
4114 lsmFsFlushWaiting(pMW->pDb->pFS, &rc);
4115 mergeWorkerReleaseAll(pMW);
4116
4117 lsmFree(pMW->pDb->pEnv, pMW->aGobble);
4118 pMW->aGobble = 0;
4119 pMW->pCsr = 0;
4120
4121 *pRc = rc;
4122 }
4123
4124 /*
4125 ** The cursor passed as the first argument is being used as the input for
4126 ** a merge operation. When this function is called, *piFlags contains the
4127 ** database entry flags for the current entry. The entry about to be written
4128 ** to the output.
4129 **
4130 ** Note that this function only has to work for cursors configured to
4131 ** iterate forwards (not backwards).
4132 */
mergeRangeDeletes(MultiCursor * pCsr,int * piVal,int * piFlags)4133 static void mergeRangeDeletes(MultiCursor *pCsr, int *piVal, int *piFlags){
4134 int f = *piFlags;
4135 int iKey = pCsr->aTree[1];
4136 int i;
4137
4138 assert( pCsr->flags & CURSOR_NEXT_OK );
4139 if( pCsr->flags & CURSOR_IGNORE_DELETE ){
4140 /* The ignore-delete flag is set when the output of the merge will form
4141 ** the oldest level in the database. In this case there is no point in
4142 ** retaining any range-delete flags. */
4143 assert( (f & LSM_POINT_DELETE)==0 );
4144 f &= ~(LSM_START_DELETE|LSM_END_DELETE);
4145 }else{
4146 for(i=0; i<(CURSOR_DATA_SEGMENT + pCsr->nPtr); i++){
4147 if( i!=iKey ){
4148 int eType;
4149 void *pKey;
4150 int nKey;
4151 int res;
4152 multiCursorGetKey(pCsr, i, &eType, &pKey, &nKey);
4153
4154 if( pKey ){
4155 res = sortedKeyCompare(pCsr->pDb->xCmp,
4156 rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData,
4157 rtTopic(eType), pKey, nKey
4158 );
4159 assert( res<=0 );
4160 if( res==0 ){
4161 if( (f & (LSM_INSERT|LSM_POINT_DELETE))==0 ){
4162 if( eType & LSM_INSERT ){
4163 f |= LSM_INSERT;
4164 *piVal = i;
4165 }
4166 else if( eType & LSM_POINT_DELETE ){
4167 f |= LSM_POINT_DELETE;
4168 }
4169 }
4170 f |= (eType & (LSM_END_DELETE|LSM_START_DELETE));
4171 }
4172
4173 if( i>iKey && (eType & LSM_END_DELETE) && res<0 ){
4174 if( f & (LSM_INSERT|LSM_POINT_DELETE) ){
4175 f |= (LSM_END_DELETE|LSM_START_DELETE);
4176 }else{
4177 f = 0;
4178 }
4179 break;
4180 }
4181 }
4182 }
4183 }
4184
4185 assert( (f & LSM_INSERT)==0 || (f & LSM_POINT_DELETE)==0 );
4186 if( (f & LSM_START_DELETE)
4187 && (f & LSM_END_DELETE)
4188 && (f & LSM_POINT_DELETE )
4189 ){
4190 f = 0;
4191 }
4192 }
4193
4194 *piFlags = f;
4195 }
4196
/*
** Process a single input entry of the merge: read the current key from the
** input cursor, decide which pointer value and flags to use, write the
** record to the output (unless it is dropped) and advance the cursor.
**
** Returns LSM_OK on success or an LSM error code.
*/
static int mergeWorkerStep(MergeWorker *pMW){
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  MultiCursor *pCsr = pMW->pCsr;  /* Cursor to read input data from */
  int rc = LSM_OK;                /* Return code */
  int eType;                      /* Entry flags of the current key */
  void *pKey;                     /* Current key */
  int nKey;                       /* Size of pKey in bytes */
  Pgno iPtr = 0;                  /* Pointer value passed to the writer */
  int iVal;                       /* Cursor component supplying the value */

  /* Pull the next record out of the source cursor. */
  lsmMCursorKey(pCsr, &pKey, &nKey);
  eType = pCsr->eType;

  /* iPtr starts as the pointer value saved from the previous key (zero if
  ** no such value is being tracked). It is replaced by an absolute pointer
  ** value if the current key is identical to a key that appears in the
  ** lowest level run being merged. */
  if( pCsr->pPrevMergePtr ){
    iPtr = *pCsr->pPrevMergePtr;
  }
  if( pCsr->pBtCsr ){
    BtreeCursor *pBtCsr = pCsr->pBtCsr;
    if( pBtCsr->pKey ){
      int res = rtTopic(pBtCsr->eType) - rtTopic(eType);
      if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey);
      if( 0==res ) iPtr = pBtCsr->iPtr;
      assert( res>=0 );
    }
  }else if( pCsr->nPtr ){
    SegmentPtr *pLast = &pCsr->aPtr[pCsr->nPtr-1];
    if( pLast->pPg
     && 0==pDb->xCmp(pLast->pKey, pLast->nKey, pKey, nKey)
    ){
      iPtr = pLast->iPtr+pLast->iPgPtr;
    }
  }

  iVal = pCsr->aTree[1];
  mergeRangeDeletes(pCsr, &iVal, &eType);

  if( eType!=0 ){
    /* Remember the page the current segment input is on, so that the caller
    ** can later gobble the segment up to that page. */
    if( pMW->aGobble ){
      int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
      if( iGobble>=0 && iGobble<pCsr->nPtr ){
        SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
        if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
          pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
        }
      }
    }

    /* If this is a separator key and we know that the output pointer has not
    ** changed, there is no point in writing an output record. Otherwise,
    ** proceed. */
    if( rc==LSM_OK && (rtIsSeparator(eType)==0 || iPtr!=0) ){
      void *pVal;
      int nVal;

      /* Fetch the value and copy it into the cursor's own buffer before
      ** writing the record into the main run. */
      rc = multiCursorGetVal(pCsr, iVal, &pVal, &nVal);
      if( pVal && rc==LSM_OK ){
        assert( nVal>=0 );
        rc = sortedBlobSet(pDb->pEnv, &pCsr->val, pVal, nVal);
        pVal = pCsr->val.pData;
      }
      if( rc==LSM_OK ){
        rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pVal, nVal, (int)iPtr);
      }
    }
  }

  /* Advance the cursor to the next input record (assuming one exists). */
  assert( lsmMCursorValid(pMW->pCsr) );
  if( rc==LSM_OK ){
    rc = lsmMCursorNext(pMW->pCsr);
  }

  return rc;
}
4274
/*
** Return non-zero if the merge worker has no more input to process: either
** it has no cursor, or its cursor no longer points at a valid entry.
*/
static int mergeWorkerDone(MergeWorker *pMW){
  if( pMW->pCsr==0 ){
    return 1;
  }
  return lsmMCursorValid(pMW->pCsr) ? 0 : 1;
}
4278
/*
** Free Level object p together with the split-key, Merge object and
** right-hand-side segment array it points to. A NULL p is a no-op.
*/
static void sortedFreeLevel(lsm_env *pEnv, Level *p){
  if( p==0 ) return;
  lsmFree(pEnv, p->pSplitKey);
  lsmFree(pEnv, p->pMerge);
  lsmFree(pEnv, p->aRhs);
  lsmFree(pEnv, p);
}
4287
/*
** Invoke the work-hook callback registered on pDb, if there is one.
*/
static void sortedInvokeWorkHook(lsm_db *pDb){
  if( pDb->xWork==0 ) return;
  pDb->xWork(pDb, pDb->pWorkCtx);
}
4293
/*
** Create a new top-level segment in the worker snapshot from the contents
** of the in-memory tree selected by eTree and the free-list.
**
** While this function runs, free-list changes are collected in a local
** Freelist object (pDb->pFreelist / pDb->bUseFreelist). If a new level is
** created, that object replaces the worker snapshot's free-list.
**
** If pnWrite is not NULL, *pnWrite is set to the number of pages written.
** Returns LSM_OK or an LSM error code.
*/
static int sortedNewToplevel(
  lsm_db *pDb,                    /* Connection handle */
  int eTree,                      /* One of the TREE_XXX constants */
  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  int nWrite = 0;                 /* Number of database pages written */
  MultiCursor *pCsr = 0;          /* Cursor supplying the input */
  Level *pOldTop = 0;             /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pLinked = 0;           /* Delete separators from this segment */
  Level *pDel = 0;                /* Delete this entire level */
  Freelist freelist;              /* Free-list built up during this call */

  if( eTree!=TREE_NONE ){
    rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
  }

  assert( pDb->bUseFreelist==0 );
  memset(&freelist, 0, sizeof(freelist));
  pDb->pFreelist = &freelist;
  pDb->bUseFreelist = 1;

  /* Allocate the new level structure to write to. */
  pOldTop = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->pNext = pOldTop;
    lsmDbSnapshotSetLevel(pDb->pWorker, pNew);
  }

  /* Create a cursor to gather the data required by the new segment. The new
  ** segment contains everything in the tree and pointers to the next segment
  ** in the database (if any). */
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    pCsr->pDb = pDb;
    rc = multiCursorVisitFreelist(pCsr);
    if( rc==LSM_OK ){
      rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
    }
    if( rc==LSM_OK && pOldTop && pOldTop->pMerge==0 ){
      if( pOldTop->flags & LEVEL_FREELIST_ONLY ){
        /* The old top level holds only free-list data. Read it through the
        ** cursor and remove the whole level afterwards. */
        pDel = pOldTop;
        pCsr->aPtr = lsmMallocZeroRc(pDb->pEnv, sizeof(SegmentPtr), &rc);
        multiCursorAddOne(pCsr, pOldTop, &rc);
      }else if( eTree!=TREE_NONE && pOldTop->lhs.iRoot ){
        /* Read the old top level's b-tree through the cursor as well; its
        ** root is cleared below once the new level exists. */
        pLinked = &pOldTop->lhs;
        rc = btreeCursorNew(pDb, pLinked, &pCsr->pBtCsr);
      }
    }

    /* If this will be the only segment in the database, discard any delete
    ** markers present in the in-memory tree. */
    if( pOldTop==0 ){
      multiCursorIgnoreDelete(pCsr);
    }
  }

  if( rc==LSM_OK ){
    Pgno iLeftPtr = 0;
    Merge merge;                  /* Merge object used to create new level */
    MergeWorker mergeworker;      /* MergeWorker object for the same purpose */

    memset(&merge, 0, sizeof(Merge));
    memset(&mergeworker, 0, sizeof(MergeWorker));

    pNew->pMerge = &merge;
    pNew->flags |= LEVEL_INCOMPLETE;
    mergeworker.pDb = pDb;
    mergeworker.pLevel = pNew;
    mergeworker.pCsr = pCsr;
    pCsr->pPrevMergePtr = &iLeftPtr;

    /* Flag the worker as performing a flush. */
    mergeworker.bFlush = 1;

    /* Do the work to create the new merged segment on disk */
    rc = lsmMCursorFirst(pCsr);
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }
    mergeWorkerShutdown(&mergeworker, &rc);
    assert( rc!=LSM_OK || mergeworker.nWork==0 || pNew->lhs.iFirst );
    if( rc==LSM_OK && pNew->lhs.iFirst ){
      rc = lsmFsSortedFinish(pDb->pFS, &pNew->lhs);
    }
    nWrite = mergeworker.nWork;
    pNew->flags &= ~LEVEL_INCOMPLETE;
    if( eTree==TREE_NONE ){
      pNew->flags |= LEVEL_FREELIST_ONLY;
    }

    /* The Merge object lives on the stack; detach it before returning. */
    pNew->pMerge = 0;
  }else{
    lsmMCursorClose(pCsr, 0);
  }

  if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
    /* Error, or nothing was written: restore the old top level. */
    assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
    lsmDbSnapshotSetLevel(pDb->pWorker, pOldTop);
    sortedFreeLevel(pDb->pEnv, pNew);
  }else{
    if( pLinked ){
      pLinked->iRoot = 0;
    }else if( pDel ){
      assert( pNew->pNext==pDel );
      pNew->pNext = pDel->pNext;
      lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
      sortedFreeLevel(pDb->pEnv, pDel);
    }

#if LSM_LOG_STRUCTURE
    lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "new-toplevel");
#endif

    if( freelist.nEntry ){
      /* Hand the local free-list over to the worker snapshot. Ownership of
      ** the entry array moves with it. */
      Freelist *pDest = &pDb->pWorker->freelist;
      lsmFree(pDb->pEnv, pDest->aEntry);
      memcpy(pDest, &freelist, sizeof(freelist));
      freelist.aEntry = 0;
    }else{
      pDb->pWorker->freelist.nEntry = 0;
    }

    assertBtreeOk(pDb, &pNew->lhs);
    sortedInvokeWorkHook(pDb);
  }

  if( pnWrite ) *pnWrite = nWrite;
  pDb->pWorker->nWrite += nWrite;
  pDb->pFreelist = 0;
  pDb->bUseFreelist = 0;
  lsmFree(pDb->pEnv, freelist.aEntry);
  return rc;
}
4429
4430 /*
4431 ** The nMerge levels in the LSM beginning with pLevel consist of a
4432 ** left-hand-side segment only. Replace these levels with a single new
4433 ** level consisting of a new empty segment on the left-hand-side and the
4434 ** nMerge segments from the replaced levels on the right-hand-side.
4435 **
4436 ** Also, allocate and populate a Merge object and set Level.pMerge to
4437 ** point to it.
4438 */
sortedMergeSetup(lsm_db * pDb,Level * pLevel,int nMerge,Level ** ppNew)4439 static int sortedMergeSetup(
4440 lsm_db *pDb, /* Database handle */
4441 Level *pLevel, /* First level to merge */
4442 int nMerge, /* Merge this many levels together */
4443 Level **ppNew /* New, merged, level */
4444 ){
4445 int rc = LSM_OK; /* Return Code */
4446 Level *pNew; /* New Level object */
4447 int bUseNext = 0; /* True to link in next separators */
4448 Merge *pMerge; /* New Merge object */
4449 int nByte; /* Bytes of space allocated at pMerge */
4450
4451 #ifdef LSM_DEBUG
4452 int iLevel;
4453 Level *pX = pLevel;
4454 for(iLevel=0; iLevel<nMerge; iLevel++){
4455 assert( pX->nRight==0 );
4456 pX = pX->pNext;
4457 }
4458 #endif
4459
4460 /* Allocate the new Level object */
4461 pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
4462 if( pNew ){
4463 pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv,
4464 nMerge * sizeof(Segment), &rc);
4465 }
4466
4467 /* Populate the new Level object */
4468 if( rc==LSM_OK ){
4469 Level *pNext = 0; /* Level following pNew */
4470 int i;
4471 int bFreeOnly = 1;
4472 Level *pTopLevel;
4473 Level *p = pLevel;
4474 Level **pp;
4475 pNew->nRight = nMerge;
4476 pNew->iAge = pLevel->iAge+1;
4477 for(i=0; i<nMerge; i++){
4478 assert( p->nRight==0 );
4479 pNext = p->pNext;
4480 pNew->aRhs[i] = p->lhs;
4481 if( (p->flags & LEVEL_FREELIST_ONLY)==0 ) bFreeOnly = 0;
4482 sortedFreeLevel(pDb->pEnv, p);
4483 p = pNext;
4484 }
4485
4486 if( bFreeOnly ) pNew->flags |= LEVEL_FREELIST_ONLY;
4487
4488 /* Replace the old levels with the new. */
4489 pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
4490 pNew->pNext = p;
4491 for(pp=&pTopLevel; *pp!=pLevel; pp=&((*pp)->pNext));
4492 *pp = pNew;
4493 lsmDbSnapshotSetLevel(pDb->pWorker, pTopLevel);
4494
4495 /* Determine whether or not the next separators will be linked in */
4496 if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot && pNext
4497 && (bFreeOnly==0 || (pNext->flags & LEVEL_FREELIST_ONLY))
4498 ){
4499 bUseNext = 1;
4500 }
4501 }
4502
4503 /* Allocate the merge object */
4504 nByte = sizeof(Merge) + sizeof(MergeInput) * (nMerge + bUseNext);
4505 pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
4506 if( pMerge ){
4507 pMerge->aInput = (MergeInput *)&pMerge[1];
4508 pMerge->nInput = nMerge + bUseNext;
4509 pNew->pMerge = pMerge;
4510 }
4511
4512 *ppNew = pNew;
4513 return rc;
4514 }
4515
/*
** Initialize MergeWorker object pMW to work on the merge described by
** pLevel->pMerge. This allocates the gobble array, opens a multi-cursor on
** the inputs, loads the b-tree hierarchy and positions the cursor - at the
** start of the input if the output segment is still empty, or at the
** position saved in Merge.aInput[] otherwise.
**
** Returns LSM_OK or an LSM error code. pMW->pCsr is set even on failure
** (possibly to NULL); mergeWorkerShutdown() is used to release it.
*/
static int mergeWorkerInit(
  lsm_db *pDb,                    /* Db connection to do merge work */
  Level *pLevel,                  /* Level to work on merging */
  MergeWorker *pMW                /* Object to initialize */
){
  int rc = LSM_OK;                /* Return code */
  Merge *pMerge = pLevel->pMerge; /* Persistent part of merge state */
  MultiCursor *pCsr = 0;          /* Cursor opened for pMW */
  Level *pNext = pLevel->pNext;   /* Next level in LSM */

  assert( pDb->pWorker );
  assert( pLevel->pMerge );
  assert( pLevel->nRight>0 );

  memset(pMW, 0, sizeof(MergeWorker));
  pMW->pDb = pDb;
  pMW->pLevel = pLevel;
  pMW->aGobble = lsmMallocZeroRc(pDb->pEnv, sizeof(Pgno) * pLevel->nRight, &rc);

  /* Create a multi-cursor to read the data to write to the new
  ** segment. The new segment contains:
  **
  **   1. Records from LHS of each of the nMerge levels being merged.
  **   2. Separators from either the last level being merged, or the
  **      separators attached to the LHS of the following level, or neither.
  **
  ** If the new level is the lowest (oldest) in the db, discard any
  ** delete keys. Key annihilation.
  */
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    pCsr->flags |= CURSOR_NEXT_OK;
    rc = multiCursorAddRhs(pCsr, pLevel);
  }

  /* Only configure the cursor if everything so far succeeded. If an error
  ** has occurred pCsr may be NULL, and in any case the cursor will only be
  ** closed, so there is nothing to gain from configuring it. */
  if( rc==LSM_OK ){
    if( pMerge->nInput > pLevel->nRight ){
      rc = btreeCursorNew(pDb, &pNext->lhs, &pCsr->pBtCsr);
    }else if( pNext ){
      multiCursorReadSeparators(pCsr);
    }else{
      multiCursorIgnoreDelete(pCsr);
    }
  }

  assert( rc!=LSM_OK || pMerge->nInput==(pCsr->nPtr+(pCsr->pBtCsr!=0)) );
  pMW->pCsr = pCsr;

  /* Load the b-tree hierarchy into memory. */
  if( rc==LSM_OK ) rc = mergeWorkerLoadHierarchy(pMW);
  if( rc==LSM_OK && pMW->hier.nHier==0 ){
    pMW->aSave[0].iPgno = pLevel->lhs.iFirst;
  }

  /* Position the cursor. */
  if( rc==LSM_OK ){
    pCsr->pPrevMergePtr = &pMerge->iCurrentPtr;
    if( pLevel->lhs.iFirst==0 ){
      /* The output array is still empty. So position the cursor at the very
      ** start of the input. */
      rc = multiCursorEnd(pCsr, 0);
    }else{
      /* The output array is non-empty. Position the cursor based on the
      ** page/cell data saved in the Merge.aInput[] array. */
      int i;
      for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
        MergeInput *pInput = &pMerge->aInput[i];
        if( pInput->iPg ){
          SegmentPtr *pPtr;
          assert( pCsr->aPtr[i].pPg==0 );
          pPtr = &pCsr->aPtr[i];
          rc = segmentPtrLoadPage(pDb->pFS, pPtr, (int)pInput->iPg);
          if( rc==LSM_OK && pPtr->nCell>0 ){
            rc = segmentPtrLoadCell(pPtr, pInput->iCell);
          }
        }
      }

      /* The b-tree cursor position, if any, is stored in the aInput[] slot
      ** that follows the segment inputs. */
      if( rc==LSM_OK && pCsr->pBtCsr ){
        int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
        assert( i==pCsr->nPtr );
        rc = btreeCursorRestore(pCsr->pBtCsr, xCmp, &pMerge->aInput[i]);
      }

      if( rc==LSM_OK ){
        rc = multiCursorSetupTree(pCsr, 0);
      }
    }
    pCsr->flags |= CURSOR_NEXT_OK;
  }

  return rc;
}
4606
/*
** Gobble (declare eligible for recycling) the leading part of the segment
** read by pCsr->aPtr[iGobble], using the segment's b-tree to find out how
** far the merge has progressed. This is a no-op unless the topic of the
** cursor's current key is zero.
**
** Returns LSM_OK or an LSM error code.
*/
static int sortedBtreeGobble(
  lsm_db *pDb,                    /* Worker connection */
  MultiCursor *pCsr,              /* Multi-cursor being used for a merge */
  int iGobble                     /* pCsr->aPtr[] entry to operate on */
){
  enum { GOBBLE_MAX_PG = 32 };    /* Number of slots in the aPg[] array */
  int rc = LSM_OK;

  if( rtTopic(pCsr->eType)==0 ){
    Segment *pSeg = pCsr->aPtr[iGobble].pSeg;
    Pgno *aPg;
    int nPg;

    /* Seek from the root of the b-tree to the segment leaf that may contain
    ** a key equal to the one multi-cursor currently points to. Record the
    ** page number of each b-tree page and the leaf. The segment may be
    ** gobbled up to (but not including) the first of these page numbers.
    */
    assert( pSeg->iRoot>0 );
    aPg = lsmMallocZeroRc(pDb->pEnv, sizeof(Pgno)*GOBBLE_MAX_PG, &rc);
    if( rc==LSM_OK ){
      rc = seekInBtree(pCsr, pSeg,
          rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, aPg, 0
      );
    }

    if( rc==LSM_OK ){
      /* The array was zeroed on allocation, so the first zero entry marks
      ** the end of the list. Stop at the end of the array as well, so that
      ** a completely full array cannot cause a read past the allocation. */
      for(nPg=0; nPg<GOBBLE_MAX_PG && aPg[nPg]; nPg++);
      lsmFsGobble(pDb, pSeg, aPg, nPg);
    }

    lsmFree(pDb->pEnv, aPg);
  }
  return rc;
}
4640
4641 /*
4642 ** Argument p points to a level of age N. Return the number of levels in
4643 ** the linked list starting at p that have age=N (always at least 1).
4644 */
sortedCountLevels(Level * p)4645 static int sortedCountLevels(Level *p){
4646 int iAge = p->iAge;
4647 int nRet = 0;
4648 do {
4649 nRet++;
4650 p = p->pNext;
4651 }while( p && p->iAge==iAge );
4652 return nRet;
4653 }
4654
/*
** Select a level of the worker snapshot to do merge work on.
**
** If a suitable level that is already being merged is found, *ppOut is set
** to point to it. If a run of levels that are not yet being merged is
** chosen instead, sortedMergeSetup() is called to combine them and *ppOut
** is set by that call. If nothing suitable is found, *ppOut is left
** unmodified.
**
** Returns LSM_OK, or an error code from sortedMergeSetup().
*/
static int sortedSelectLevel(lsm_db *pDb, int nMerge, Level **ppOut){
  Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
  int rc = LSM_OK;
  Level *pLevel;                  /* Iterator variable */
  Level *pBest = 0;               /* Best level to work on found so far */
  int nBest;                      /* Number of segments merged at pBest */
  Level *pRun = 0;                /* First in run of levels with same age */
  int nRun = 0;                   /* Number of levels starting at pRun */

  assert( nMerge>=1 );
  nBest = LSM_MAX(1, nMerge-1);

  /* Find the longest contiguous run of levels not currently undergoing a
  ** merge with the same age in the structure. Or the level being merged
  ** with the largest number of right-hand segments. Work on it. */
  for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
    if( pLevel->nRight==0 && pRun && pLevel->iAge==pRun->iAge ){
      /* pLevel extends the current run. */
      nRun++;
      continue;
    }

    /* The current run (if any) ends just before pLevel. */
    if( nRun>nBest ){
      if( (pLevel->iAge!=pRun->iAge+1)
       || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge)
      ){
        pBest = pRun;
        nBest = nRun;
      }
    }

    if( pLevel->nRight==0 ){
      /* pLevel starts a new run. */
      pRun = pLevel;
      nRun = 1;
    }else{
      /* pLevel is already being merged. */
      if( pLevel->nRight>nBest ){
        nBest = pLevel->nRight;
        pBest = pLevel;
      }
      nRun = 0;
      pRun = 0;
    }
  }
  if( nRun>nBest ){
    assert( pRun );
    pBest = pRun;
    nBest = nRun;
  }

  /* Nothing found, and the caller asked for nMerge==1. If there is more
  ** than one level that is not free-list-only, merge every level. */
  if( pBest==0 && nMerge==1 ){
    int nFree = 0;
    int nUsr = 0;
    for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
      assert( !pLevel->nRight );
      if( pLevel->flags & LEVEL_FREELIST_ONLY ){
        nFree++;
      }else{
        nUsr++;
      }
    }
    if( nUsr>1 ){
      pBest = pTopLevel;
      nBest = nFree + nUsr;
    }
  }

  if( pBest==0 ){
    return rc;
  }
  if( pBest->nRight==0 ){
    rc = sortedMergeSetup(pDb, pBest, nBest, ppOut);
  }else{
    *ppOut = pBest;
  }
  return rc;
}
4728
/*
** Return 1 if the database is considered "full", or 0 otherwise. It is
** full if lsmDatabaseFull() says so, or if the top level has age 0 and is
** either being merged or heads a run of at least pDb->nMerge levels of
** that age.
*/
static int sortedDbIsFull(lsm_db *pDb){
  Level *pTop = lsmDbSnapshotLevel(pDb->pWorker);

  if( lsmDatabaseFull(pDb) ) return 1;
  if( pTop==0 || pTop->iAge!=0 ) return 0;
  if( pTop->nRight ) return 1;
  return (sortedCountLevels(pTop)>=pDb->nMerge) ? 1 : 0;
}
4740
/*
** Context passed to moveBlockCb() while sortedMoveBlock() walks the
** free-list looking for the block to move.
*/
typedef struct MoveBlockCtx {
  int iSeen;                      /* Last block accepted by moveBlockCb();
                                  ** initialized to (nBlock+1) */
  int iFrom;                      /* Block to move (iSeen-1 at the point the
                                  ** walk stops), or 0 if not yet set */
} MoveBlockCtx;
4746
/*
** Free-list walker callback used by sortedMoveBlock(). pCtx points to a
** MoveBlockCtx. As long as each block visited is exactly one less than the
** previous one, the block is recorded in iSeen and 0 is returned. On the
** first block that breaks the sequence, iFrom is set to (iSeen-1) and 1 is
** returned.
*/
static int moveBlockCb(void *pCtx, int iBlk, i64 iSnapshot){
  MoveBlockCtx *p = (MoveBlockCtx *)pCtx;
  int iExpect = p->iSeen - 1;     /* Block that would continue the run */

  assert( p->iFrom==0 );
  if( iBlk!=iExpect ){
    p->iFrom = iExpect;
    return 1;
  }
  p->iSeen = iBlk;
  return 0;
}
4757
4758 /*
4759 ** This function is called to further compact a database for which all
4760 ** of the content has already been merged into a single segment. If
4761 ** possible, it moves the contents of a single block from the end of the
4762 ** file to a free-block that lies closer to the start of the file (allowing
4763 ** the file to be eventually truncated).
4764 */
sortedMoveBlock(lsm_db * pDb,int * pnWrite)4765 static int sortedMoveBlock(lsm_db *pDb, int *pnWrite){
4766 Snapshot *p = pDb->pWorker;
4767 Level *pLvl = lsmDbSnapshotLevel(p);
4768 int iFrom; /* Block to move */
4769 int iTo; /* Destination to move block to */
4770 int rc; /* Return code */
4771
4772 MoveBlockCtx sCtx;
4773
4774 assert( pLvl->pNext==0 && pLvl->nRight==0 );
4775 assert( p->redirect.n<=LSM_MAX_BLOCK_REDIRECTS );
4776
4777 *pnWrite = 0;
4778
4779 /* Check that the redirect array is not already full. If it is, return
4780 ** without moving any database content. */
4781 if( p->redirect.n>=LSM_MAX_BLOCK_REDIRECTS ) return LSM_OK;
4782
4783 /* Find the last block of content in the database file. Do this by
4784 ** traversing the free-list in reverse (descending block number) order.
4785 ** The first block not on the free list is the one that will be moved.
4786 ** Since the db consists of a single segment, there is no ambiguity as
4787 ** to which segment the block belongs to. */
4788 sCtx.iSeen = p->nBlock+1;
4789 sCtx.iFrom = 0;
4790 rc = lsmWalkFreelist(pDb, 1, moveBlockCb, &sCtx);
4791 if( rc!=LSM_OK || sCtx.iFrom==0 ) return rc;
4792 iFrom = sCtx.iFrom;
4793
4794 /* Find the first free block in the database, ignoring block 1. Block
4795 ** 1 is tricky as it is smaller than the other blocks. */
4796 rc = lsmBlockAllocate(pDb, iFrom, &iTo);
4797 if( rc!=LSM_OK || iTo==0 ) return rc;
4798 assert( iTo!=1 && iTo<iFrom );
4799
4800 rc = lsmFsMoveBlock(pDb->pFS, &pLvl->lhs, iTo, iFrom);
4801 if( rc==LSM_OK ){
4802 if( p->redirect.a==0 ){
4803 int nByte = sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS;
4804 p->redirect.a = lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
4805 }
4806 if( rc==LSM_OK ){
4807
4808 /* Check if the block just moved was already redirected. */
4809 int i;
4810 for(i=0; i<p->redirect.n; i++){
4811 if( p->redirect.a[i].iTo==iFrom ) break;
4812 }
4813
4814 if( i==p->redirect.n ){
4815 /* Block iFrom was not already redirected. Add a new array entry. */
4816 memmove(&p->redirect.a[1], &p->redirect.a[0],
4817 sizeof(struct RedirectEntry) * p->redirect.n
4818 );
4819 p->redirect.a[0].iFrom = iFrom;
4820 p->redirect.a[0].iTo = iTo;
4821 p->redirect.n++;
4822 }else{
4823 /* Block iFrom was already redirected. Overwrite existing entry. */
4824 p->redirect.a[i].iTo = iTo;
4825 }
4826
4827 rc = lsmBlockFree(pDb, iFrom);
4828
4829 *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
4830 pLvl->lhs.pRedirect = &p->redirect;
4831 }
4832 }
4833
4834 #if LSM_LOG_STRUCTURE
4835 if( rc==LSM_OK ){
4836 char aBuf[64];
4837 sprintf(aBuf, "move-block %d/%d", p->redirect.n-1, LSM_MAX_BLOCK_REDIRECTS);
4838 lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, aBuf);
4839 }
4840 #endif
4841 return rc;
4842 }
4843
4844 /*
4845 */
mergeInsertFreelistSegments(lsm_db * pDb,int nFree,MergeWorker * pMW)4846 static int mergeInsertFreelistSegments(
4847 lsm_db *pDb,
4848 int nFree,
4849 MergeWorker *pMW
4850 ){
4851 int rc = LSM_OK;
4852 if( nFree>0 ){
4853 MultiCursor *pCsr = pMW->pCsr;
4854 Level *pLvl = pMW->pLevel;
4855 SegmentPtr *aNew1;
4856 Segment *aNew2;
4857
4858 Level *pIter;
4859 Level *pNext;
4860 int i = 0;
4861
4862 aNew1 = (SegmentPtr *)lsmMallocZeroRc(
4863 pDb->pEnv, sizeof(SegmentPtr) * (pCsr->nPtr+nFree), &rc
4864 );
4865 if( rc ) return rc;
4866 memcpy(&aNew1[nFree], pCsr->aPtr, sizeof(SegmentPtr)*pCsr->nPtr);
4867 pCsr->nPtr += nFree;
4868 lsmFree(pDb->pEnv, pCsr->aTree);
4869 lsmFree(pDb->pEnv, pCsr->aPtr);
4870 pCsr->aTree = 0;
4871 pCsr->aPtr = aNew1;
4872
4873 aNew2 = (Segment *)lsmMallocZeroRc(
4874 pDb->pEnv, sizeof(Segment) * (pLvl->nRight+nFree), &rc
4875 );
4876 if( rc ) return rc;
4877 memcpy(&aNew2[nFree], pLvl->aRhs, sizeof(Segment)*pLvl->nRight);
4878 pLvl->nRight += nFree;
4879 lsmFree(pDb->pEnv, pLvl->aRhs);
4880 pLvl->aRhs = aNew2;
4881
4882 for(pIter=pDb->pWorker->pLevel; rc==LSM_OK && pIter!=pLvl; pIter=pNext){
4883 Segment *pSeg = &pLvl->aRhs[i];
4884 memcpy(pSeg, &pIter->lhs, sizeof(Segment));
4885
4886 pCsr->aPtr[i].pSeg = pSeg;
4887 pCsr->aPtr[i].pLevel = pLvl;
4888 rc = segmentPtrEnd(pCsr, &pCsr->aPtr[i], 0);
4889
4890 pDb->pWorker->pLevel = pNext = pIter->pNext;
4891 sortedFreeLevel(pDb->pEnv, pIter);
4892 i++;
4893 }
4894 assert( i==nFree );
4895 assert( rc!=LSM_OK || pDb->pWorker->pLevel==pLvl );
4896
4897 for(i=nFree; i<pCsr->nPtr; i++){
4898 pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
4899 }
4900
4901 lsmFree(pDb->pEnv, pMW->aGobble);
4902 pMW->aGobble = 0;
4903 }
4904 return rc;
4905 }
4906
/*
** Do up to nWork pages of merge work on the worker snapshot of pDb.
**
** Each iteration of the main loop selects a level with sortedSelectLevel()
** and runs merge steps on it. If there is no level to work on, a single
** block may be moved with sortedMoveBlock() instead (only if bFlush==0,
** nMerge==1 and the database consists of a single level). The loop ends
** when the work budget is used up, when there is nothing left to do, when
** bFlush is set and the database is no longer "full", or as soon as an
** error occurs.
**
** If pnWrite is not NULL, *pnWrite is set to the number of units of work
** accounted for (nWork minus what remains); this is not done when the
** snapshot has no levels at all. Returns LSM_OK or an LSM error code.
*/
static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int nMerge,                     /* Try to merge this many levels at once */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;

  assert( pWorker );
  if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;

  /* Stop as soon as an error occurs. Otherwise the next call to
  ** sortedSelectLevel() would overwrite the error code. */
  while( rc==LSM_OK && nRemaining>0 ){
    Level *pLevel = 0;

    /* Find a level to work on. */
    rc = sortedSelectLevel(pDb, nMerge, &pLevel);
    assert( rc==LSM_OK || pLevel==0 );
    if( rc!=LSM_OK ) break;

    if( pLevel==0 ){
      int nDone = 0;
      Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
      if( bFlush==0 && nMerge==1 && pTopLevel && pTopLevel->pNext==0 ){
        rc = sortedMoveBlock(pDb, &nDone);
      }
      nRemaining -= nDone;

      /* Could not find any work to do. Finished. */
      if( nDone==0 ) break;
    }else{
      int bSave = 0;
      Freelist freelist = {0, 0, 0};
      MergeWorker mergeworker;    /* State used to work on the level merge */

      assert( pDb->bIncrMerge==0 );
      assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );

      pDb->bIncrMerge = 1;
      rc = mergeWorkerInit(pDb, pLevel, &mergeworker);
      assert( mergeworker.nWork==0 );

      /* Once the local free-list is in use (bUseFreelist), the merge is
      ** always run to completion regardless of the work budget. */
      while( rc==LSM_OK
          && 0==mergeWorkerDone(&mergeworker)
          && (mergeworker.nWork<nRemaining || pDb->bUseFreelist)
      ){
        int eType = rtTopic(mergeworker.pCsr->eType);
        rc = mergeWorkerStep(&mergeworker);

        /* If the cursor now points at the first entry past the end of the
        ** user data (i.e. either to EOF or to the first free-list entry
        ** that will be added to the run), then check if it is possible to
        ** merge in any free-list entries that are either in-memory or in
        ** free-list-only blocks. */
        if( rc==LSM_OK && nMerge==1 && eType==0
         && (rtTopic(mergeworker.pCsr->eType) || mergeWorkerDone(&mergeworker))
        ){
          int nFree = 0;          /* Number of free-list-only levels to merge */
          Level *pLvl;
          assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );

          /* Now check if all levels containing data newer than this one
          ** are single-segment free-list only levels. If so, they will be
          ** merged in now. */
          for(pLvl=pDb->pWorker->pLevel;
              pLvl!=mergeworker.pLevel && (pLvl->flags & LEVEL_FREELIST_ONLY);
              pLvl=pLvl->pNext
          ){
            assert( pLvl->nRight==0 );
            nFree++;
          }
          if( pLvl==mergeworker.pLevel ){

            rc = mergeInsertFreelistSegments(pDb, nFree, &mergeworker);
            if( rc==LSM_OK ){
              rc = multiCursorVisitFreelist(mergeworker.pCsr);
            }
            if( rc==LSM_OK ){
              rc = multiCursorSetupTree(mergeworker.pCsr, 0);
              pDb->pFreelist = &freelist;
              pDb->bUseFreelist = 1;
            }
          }
        }
      }
      nRemaining -= LSM_MAX(mergeworker.nWork, 1);

      if( rc==LSM_OK ){
        /* Check if the merge operation is completely finished. If not,
        ** gobble up (declare eligible for recycling) any pages from rhs
        ** segments for which the content has been completely merged into
        ** the lhs of the level. */
        if( mergeWorkerDone(&mergeworker)==0 ){
          int i;
          /* Stop at the first error so that it is not overwritten by the
          ** result of gobbling a later segment. */
          for(i=0; rc==LSM_OK && i<pLevel->nRight; i++){
            SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
            if( pGobble->pSeg->iRoot ){
              rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
            }else if( mergeworker.aGobble[i] ){
              lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
            }
          }
        }else{
          int i;
          int bEmpty;
          mergeWorkerShutdown(&mergeworker, &rc);
          bEmpty = (pLevel->lhs.iFirst==0);

          if( bEmpty==0 && rc==LSM_OK ){
            rc = lsmFsSortedFinish(pDb->pFS, &pLevel->lhs);
          }

          if( pDb->bUseFreelist ){
            /* Hand the local free-list over to the worker snapshot. The
            ** entry array now belongs to the snapshot. */
            Freelist *p = &pDb->pWorker->freelist;
            lsmFree(pDb->pEnv, p->aEntry);
            memcpy(p, &freelist, sizeof(freelist));
            pDb->bUseFreelist = 0;
            pDb->pFreelist = 0;
            bSave = 1;
          }

          for(i=0; i<pLevel->nRight; i++){
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
          }

          if( bEmpty ){
            /* If the new level is completely empty, remove it from the
            ** database snapshot. This can only happen if all input keys were
            ** annihilated. Since keys are only annihilated if the new level
            ** is the last in the linked list (contains the most ancient of
            ** database content), this guarantees that pLevel->pNext==0. */
            Level *pTop;          /* Top level of worker snapshot */
            Level **pp;           /* Read/write iterator for Level.pNext list */

            assert( pLevel->pNext==0 );

            /* Remove the level from the worker snapshot. */
            pTop = lsmDbSnapshotLevel(pWorker);
            for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
            *pp = pLevel->pNext;
            lsmDbSnapshotSetLevel(pWorker, pTop);

            /* Free the Level structure. Clear the local pointer so that
            ** the freed object is not handed to the checks further down. */
            sortedFreeLevel(pDb->pEnv, pLevel);
            pLevel = 0;
          }else{

            /* Free the separators of the next level, if required. */
            if( pLevel->pMerge->nInput > pLevel->nRight ){
              assert( pLevel->pNext->lhs.iRoot );
              pLevel->pNext->lhs.iRoot = 0;
            }

            /* Zero the right-hand-side of pLevel */
            lsmFree(pDb->pEnv, pLevel->aRhs);
            pLevel->nRight = 0;
            pLevel->aRhs = 0;

            /* Free the Merge object */
            lsmFree(pDb->pEnv, pLevel->pMerge);
            pLevel->pMerge = 0;
          }

          if( bSave && rc==LSM_OK ){
            pDb->bIncrMerge = 0;
            rc = lsmSaveWorker(pDb, 0);
          }
        }
      }

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      pDb->bIncrMerge = 0;

      /* If the local free-list is still installed at this point, an error
      ** prevented it from being handed over to the snapshot. Uninstall it
      ** before the stack object goes out of scope and free its entries. */
      if( pDb->bUseFreelist ){
        lsmFree(pDb->pEnv, freelist.aEntry);
        pDb->bUseFreelist = 0;
        pDb->pFreelist = 0;
      }

      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if LSM_LOG_STRUCTURE
      lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "work");
#endif
      /* pLevel is NULL here if the level turned out empty and was freed. */
      if( pLevel ){
        assertBtreeOk(pDb, &pLevel->lhs);
        assertRunInOrder(pDb, &pLevel->lhs);
      }

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk. */
      if( bFlush && sortedDbIsFull(pDb)==0 ) break;
    }
  }

  if( pnWrite ) *pnWrite = (nWork - nRemaining);
  pWorker->nWrite += (nWork - nRemaining);

#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
#endif
  return rc;
}
5105
5106 /*
5107 ** The database connection passed as the first argument must be a worker
5108 ** connection. This function checks if there exists an "old" in-memory tree
5109 ** ready to be flushed to disk. If so, true is returned. Otherwise false.
5110 **
5111 ** If an error occurs, *pRc is set to an LSM error code before returning.
5112 ** It is assumed that *pRc is set to LSM_OK when this function is called.
5113 */
sortedTreeHasOld(lsm_db * pDb,int * pRc)5114 static int sortedTreeHasOld(lsm_db *pDb, int *pRc){
5115 int rc = LSM_OK;
5116 int bRet = 0;
5117
5118 assert( pDb->pWorker );
5119 if( *pRc==LSM_OK ){
5120 if( rc==LSM_OK
5121 && pDb->treehdr.iOldShmid
5122 && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff
5123 ){
5124 bRet = 1;
5125 }else{
5126 bRet = 0;
5127 }
5128 *pRc = rc;
5129 }
5130 assert( *pRc==LSM_OK || bRet==0 );
5131 return bRet;
5132 }
5133
5134 /*
5135 ** Create a new free-list only top-level segment. Return LSM_OK if successful
5136 ** or an LSM error code if some error occurs.
5137 */
sortedNewFreelistOnly(lsm_db * pDb)5138 static int sortedNewFreelistOnly(lsm_db *pDb){
5139 return sortedNewToplevel(pDb, TREE_NONE, 0);
5140 }
5141
/*
** Save the current worker snapshot by calling lsmCheckpointSaveWorker().
** Parameter bFlush is passed through to that function unchanged.
**
** Before saving, if the in-memory free-list of the worker snapshot holds
** more than pDb->nMaxFreelist entries, a free-list-only top-level segment
** is written first. If that step fails its error code is returned and the
** snapshot is not saved.
*/
int lsmSaveWorker(lsm_db *pDb, int bFlush){
  int rc = LSM_OK;                /* Return code */
  Snapshot *pSnap = pDb->pWorker; /* Worker snapshot being saved */

  if( pSnap->freelist.nEntry>pDb->nMaxFreelist ){
    rc = sortedNewFreelistOnly(pDb);
  }
  if( rc==LSM_OK ){
    rc = lsmCheckpointSaveWorker(pDb, bFlush);
  }
  return rc;
}
5150
/*
** Open a worker 'transaction', do up to nPage pages worth of work, then
** close the worker transaction again. The work consists of, in order:
**
**   1. If sortedTreeHasOld() reports a pending old in-memory tree, merge
**      existing levels while sortedDbIsFull() is true and then write the
**      old tree to a new top-level segment.
**   2. Unless bShutdown is set, general merging via sortedWork().
**   3. If the in-memory free-list of the worker snapshot has more than
**      pDb->nMaxFreelist entries, write a free-list-only top-level.
**
** If bShutdown is zero and auto-checkpoints are enabled (pDb->nAutockpt),
** the page budget is reduced so that this call stops when the next
** auto-checkpoint is due.
**
** Outputs (both pointers must be non-NULL):
**
**   *pnWrite  Set to the number of pages accounted for by this call if
**             the call succeeds and leaves the worker snapshot dirty. Set
**             to zero otherwise.
**   *pbCkpt   Only written if no error occurs. Set to true if the caller
**             should run a checkpoint before calling this function again.
**
** Returns LSM_OK or an LSM error code.
*/
static int doLsmSingleWork(
  lsm_db *pDb,
  int bShutdown,
  int nMerge,                     /* Minimum segments to merge together */
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
  int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
){
  Snapshot *pWorker;              /* Worker snapshot */
  int rc = LSM_OK;                /* Return code */
  int bDirty = 0;                 /* True if worker snapshot needs saving */
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;               /* Pages of budget still remaining */
  int bCkpt = 0;                  /* True if budget was cut for a checkpoint */

  assert( nPage>0 );

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns. */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);
  if( rc!=LSM_OK ) return rc;
  pWorker = pDb->pWorker;

  /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  ** that this call stops writing when the auto-checkpoint is due. The
  ** caller will do the checkpoint, then possibly call this function again. */
  if( bShutdown==0 && pDb->nAutockpt ){
    u32 nSync;
    u32 nUnsync;
    int nPgsz;

    lsmCheckpointSynced(pDb, 0, 0, &nSync);
    nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
    nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);

    nMax = (int)LSM_MIN(nMax, (pDb->nAutockpt/nPgsz) - (int)(nUnsync-nSync));
    if( nMax<nRem ){
      bCkpt = 1;
      nRem = LSM_MAX(nMax, 0);
    }
  }

  /* If there exists in-memory data ready to be flushed to disk, attempt
  ** to flush it now. */
  if( pDb->nTransOpen==0 ){
    rc = lsmTreeLoadHeader(pDb, 0);
  }
  if( sortedTreeHasOld(pDb, &rc) ){
    /* sortedDbIsFull() returns non-zero if either (a) there are too many
    ** levels in total in the db, or (b) there are too many levels with the
    ** the same age in the db. Either way, call sortedWork() to merge
    ** existing segments together until this condition is cleared.  */
    if( sortedDbIsFull(pDb) ){
      int nPg = 0;
      rc = sortedWork(pDb, nRem, nMerge, 1, &nPg);
      nRem -= nPg;
      assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
      bDirty = 1;
    }

    if( rc==LSM_OK && nRem>0 ){
      int nPg = 0;
      rc = sortedNewToplevel(pDb, TREE_OLD, &nPg);
      nRem -= nPg;
      if( rc==LSM_OK ){
        if( pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        /* The snapshot is saved right here, so it is clean again. */
        rc = lsmSaveWorker(pDb, 1);
        bDirty = 0;
      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
    rc = sortedWork(pDb, nRem, nMerge, 0, &nPg);
    nRem -= nPg;
    if( nPg ) bDirty = 1;
  }

  /* If the in-memory part of the free-list is too large, write a new
  ** top-level containing just the in-memory free-list entries to disk. */
  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
    while( rc==LSM_OK && lsmDatabaseFull(pDb) ){
      /* nPg is scoped to one iteration so that each batch of pages is
      ** subtracted from the budget exactly once. */
      int nPg = 0;
      rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
      nRem -= nPg;
    }
    if( rc==LSM_OK ){
      rc = sortedNewFreelistOnly(pDb);
    }
    /* A new top-level has been added to the worker snapshot (or an error
    ** has occurred, in which case this flag is ignored below). Mark the
    ** snapshot dirty so that it is saved even if the loop above wrote no
    ** pages at all. */
    bDirty = 1;
  }

  if( rc==LSM_OK ){
    *pnWrite = (nMax - nRem);
    *pbCkpt = (bCkpt && nRem<=0);
    if( nMerge==1 && pDb->nAutockpt>0 && *pnWrite>0
     && pWorker->pLevel
     && pWorker->pLevel->nRight==0
     && pWorker->pLevel->pNext==0
    ){
      *pbCkpt = 1;
    }
  }

  if( rc==LSM_OK && bDirty ){
    lsmFinishWork(pDb, 0, &rc);
  }else{
    /* Nothing to save (or an error occurred). Close the worker transaction
    ** using a dummy error code so that rc itself is left untouched. */
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcdummy);
    *pnWrite = 0;
  }
  assert( pDb->pWorker==0 );
  return rc;
}
5271
/*
** Repeatedly call doLsmSingleWork() (with bShutdown==0), running a
** checkpoint via lsm_checkpoint() each time it requests one, until one of
** the following is true:
**
**   * an error occurs,
**   * doLsmSingleWork() returns without requesting a checkpoint, or
**   * nPage is non-negative and at least nPage pages have been written.
**
** A negative nPage means "no limit"; nPage==0 means do nothing at all.
**
** If pnWrite is not NULL, *pnWrite is set to the total number of pages
** written if LSM_OK is returned, or to zero if an error code is returned.
*/
static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
  int rc = LSM_OK;                /* Return code */
  int nTotal = 0;                 /* Pages written by all calls so far */
  const int bNoLimit = (nPage<0); /* True if there is no page budget */

  assert( nMerge>=1 );

  if( nPage!=0 ){
    for(;;){
      int bCkpt = 0;              /* True if a checkpoint was requested */
      int nDone = 0;              /* Pages written by this iteration */
      int nAsk;                   /* Pages to request this iteration */

      if( bNoLimit ){
        nAsk = (int)0x7FFFFFFF;
      }else{
        nAsk = nPage - nTotal;
      }

      rc = doLsmSingleWork(pDb, 0, nMerge, nAsk, &nDone, &bCkpt);
      nTotal += nDone;
      if( rc!=LSM_OK || bCkpt==0 ) break;

      rc = lsm_checkpoint(pDb, 0);
      if( rc!=LSM_OK ) break;
      if( bNoLimit==0 && nTotal>=nPage ) break;
    }
  }

  if( pnWrite ){
    *pnWrite = (rc==LSM_OK) ? nTotal : 0;
  }
  return rc;
}
5302
5303 /*
5304 ** Perform work to merge database segments together.
5305 */
lsm_work(lsm_db * pDb,int nMerge,int nKB,int * pnWrite)5306 int lsm_work(lsm_db *pDb, int nMerge, int nKB, int *pnWrite){
5307 int rc; /* Return code */
5308 int nPgsz; /* Nominal page size in bytes */
5309 int nPage; /* Equivalent of nKB in pages */
5310 int nWrite = 0; /* Number of pages written */
5311
5312 /* This function may not be called if pDb has an open read or write
5313 ** transaction. Return LSM_MISUSE if an application attempts this. */
5314 if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;
5315 if( nMerge<=0 ) nMerge = pDb->nMerge;
5316
5317 lsmFsPurgeCache(pDb->pFS);
5318
5319 /* Convert from KB to pages */
5320 nPgsz = lsmFsPageSize(pDb->pFS);
5321 if( nKB>=0 ){
5322 nPage = ((i64)nKB * 1024 + nPgsz - 1) / nPgsz;
5323 }else{
5324 nPage = -1;
5325 }
5326
5327 rc = doLsmWork(pDb, nMerge, nPage, &nWrite);
5328
5329 if( pnWrite ){
5330 /* Convert back from pages to KB */
5331 *pnWrite = (int)(((i64)nWrite * 1024 + nPgsz - 1) / nPgsz);
5332 }
5333 return rc;
5334 }
5335
/*
** Flush the contents of the in-memory tree(s) to disk.
**
** This function may not be called while the connection has an open
** transaction or an open cursor; LSM_MISUSE is returned in that case.
**
** Otherwise a write transaction is opened and lsmFlushTreeToDisk() is
** invoked. Only if the flush succeeds are the in-memory trees discarded
** and the write transaction committed. If the flush fails, the write
** transaction is rolled back, the in-memory trees are left as they were
** and the error code is returned to the caller.
*/
int lsm_flush(lsm_db *db){
  int rc;

  if( db->nTransOpen>0 || db->pCsr ){
    rc = LSM_MISUSE_BKPT;
  }else{
    rc = lsmBeginWriteTrans(db);
    if( rc==LSM_OK ){
      rc = lsmFlushTreeToDisk(db);
    }

    if( rc==LSM_OK ){
      /* Everything is on disk. Drop any existing old tree, turn the
      ** current tree into the old tree and then drop that as well. */
      lsmTreeDiscardOld(db);
      lsmTreeMakeOld(db);
      lsmTreeDiscardOld(db);
      rc = lsmFinishWriteTrans(db, 1);
    }else{
      lsmFinishWriteTrans(db, 0);
    }
    lsmFinishReadTrans(db);
  }

  return rc;
}
5360
5361 /*
5362 ** This function is called in auto-work mode to perform merging work on
5363 ** the data structure. It performs enough merging work to prevent the
5364 ** height of the tree from growing indefinitely assuming that roughly
5365 ** nUnit database pages worth of data have been written to the database
5366 ** (i.e. the in-memory tree) since the last call.
5367 */
lsmSortedAutoWork(lsm_db * pDb,int nUnit)5368 int lsmSortedAutoWork(
5369 lsm_db *pDb, /* Database handle */
5370 int nUnit /* Pages of data written to in-memory tree */
5371 ){
5372 int rc = LSM_OK; /* Return code */
5373 int nDepth = 0; /* Current height of tree (longest path) */
5374 Level *pLevel; /* Used to iterate through levels */
5375 int bRestore = 0;
5376
5377 assert( pDb->pWorker==0 );
5378 assert( pDb->nTransOpen>0 );
5379
5380 /* Determine how many units of work to do before returning. One unit of
5381 ** work is achieved by writing one page (~4KB) of merged data. */
5382 for(pLevel=lsmDbSnapshotLevel(pDb->pClient); pLevel; pLevel=pLevel->pNext){
5383 /* nDepth += LSM_MAX(1, pLevel->nRight); */
5384 nDepth += 1;
5385 }
5386 if( lsmTreeHasOld(pDb) ){
5387 nDepth += 1;
5388 bRestore = 1;
5389 rc = lsmSaveCursors(pDb);
5390 if( rc!=LSM_OK ) return rc;
5391 }
5392
5393 if( nDepth>0 ){
5394 int nRemaining; /* Units of work to do before returning */
5395
5396 nRemaining = nUnit * nDepth;
5397 #ifdef LSM_LOG_WORK
5398 lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages",
5399 nUnit, nDepth, nRemaining);
5400 #endif
5401 assert( nRemaining>=0 );
5402 rc = doLsmWork(pDb, pDb->nMerge, nRemaining, 0);
5403 if( rc==LSM_BUSY ) rc = LSM_OK;
5404
5405 if( bRestore && pDb->pCsr ){
5406 lsmMCursorFreeCache(pDb);
5407 lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
5408 pDb->pClient = 0;
5409 if( rc==LSM_OK ){
5410 rc = lsmCheckpointLoad(pDb, 0);
5411 }
5412 if( rc==LSM_OK ){
5413 rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
5414 }
5415 if( rc==LSM_OK ){
5416 rc = lsmRestoreCursors(pDb);
5417 }
5418 }
5419 }
5420
5421 return rc;
5422 }
5423
5424 /*
5425 ** This function is only called during system shutdown. The contents of
5426 ** any in-memory trees present (old or current) are written out to disk.
5427 */
lsmFlushTreeToDisk(lsm_db * pDb)5428 int lsmFlushTreeToDisk(lsm_db *pDb){
5429 int rc;
5430
5431 rc = lsmBeginWork(pDb);
5432 while( rc==LSM_OK && sortedDbIsFull(pDb) ){
5433 rc = sortedWork(pDb, 256, pDb->nMerge, 1, 0);
5434 }
5435
5436 if( rc==LSM_OK ){
5437 rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
5438 }
5439
5440 lsmFinishWork(pDb, 1, &rc);
5441 return rc;
5442 }
5443
5444 /*
5445 ** Return a string representation of the segment passed as the only argument.
5446 ** Space for the returned string is allocated using lsmMalloc(), and should
5447 ** be freed by the caller using lsmFree().
5448 */
segToString(lsm_env * pEnv,Segment * pSeg,int nMin)5449 static char *segToString(lsm_env *pEnv, Segment *pSeg, int nMin){
5450 int nSize = pSeg->nSize;
5451 Pgno iRoot = pSeg->iRoot;
5452 Pgno iFirst = pSeg->iFirst;
5453 Pgno iLast = pSeg->iLastPg;
5454 char *z;
5455
5456 char *z1;
5457 char *z2;
5458 int nPad;
5459
5460 z1 = lsmMallocPrintf(pEnv, "%d.%d", iFirst, iLast);
5461 if( iRoot ){
5462 z2 = lsmMallocPrintf(pEnv, "root=%d", iRoot);
5463 }else{
5464 z2 = lsmMallocPrintf(pEnv, "size=%d", nSize);
5465 }
5466
5467 nPad = nMin - 2 - strlen(z1) - 1 - strlen(z2);
5468 nPad = LSM_MAX(0, nPad);
5469
5470 if( iRoot ){
5471 z = lsmMallocPrintf(pEnv, "/%s %*s%s\\", z1, nPad, "", z2);
5472 }else{
5473 z = lsmMallocPrintf(pEnv, "|%s %*s%s|", z1, nPad, "", z2);
5474 }
5475 lsmFree(pEnv, z1);
5476 lsmFree(pEnv, z2);
5477
5478 return z;
5479 }
5480
/*
** Write a nul-terminated text description of segment pSeg (as returned by
** segToString() with minimum width nMin) into buffer aBuf, which is nBuf
** bytes in size. The output is truncated if it does not fit. If pSeg is
** NULL, or if the description cannot be allocated, an empty string is
** written.
**
** If LSM_LOG_FREELIST is defined, the string returned by
** lsmInfoArrayStructure() for the segment is appended in parentheses.
**
** Returns the number of bytes written to aBuf, not including the
** nul-terminator. Nothing is written if nBuf is zero or negative.
*/
static int fileToString(
  lsm_db *pDb,                    /* For xMalloc() */
  char *aBuf,
  int nBuf,
  int nMin,
  Segment *pSeg
){
  int i = 0;

  if( nBuf<=0 ) return 0;
  aBuf[0] = '\0';

  if( pSeg ){
    char *zSeg;

    zSeg = segToString(pDb->pEnv, pSeg, nMin);
    if( zSeg ){
      snprintf(&aBuf[i], nBuf-i, "%s", zSeg);
      i += (int)strlen(&aBuf[i]);
      lsmFree(pDb->pEnv, zSeg);
    }

#ifdef LSM_LOG_FREELIST
    zSeg = 0;
    lsmInfoArrayStructure(pDb, 1, pSeg->iFirst, &zSeg);
    if( zSeg ){
      /* Only (nBuf-i) bytes of the buffer remain at offset i. */
      snprintf(&aBuf[i], nBuf-i, " (%s)", zSeg);
      i += (int)strlen(&aBuf[i]);
      lsmFree(pDb->pEnv, zSeg);
    }
#endif

    /* The last valid index of an nBuf byte buffer is (nBuf-1). */
    aBuf[nBuf-1] = '\0';
  }

  return i;
}
5510
/*
** Debugging function. Send a one-line description of page pPg to the log
** callback using lsmLogMessage(). The line contains the footer values
** (record count, pointer and flags) followed by, for each cell on the
** page, its topic/type byte in hex, its key and its page pointer. If bVals
** is true, values are output as well. Key and value bytes that are not
** alphanumeric are shown as '.'.
**
** For a cell with type byte 0, the key is read from the first cell of the
** page that the cell refers to. If that page cannot be loaded, or if key
** data cannot be read, the key is shown as empty.
*/
void sortedDumpPage(lsm_db *pDb, Segment *pRun, Page *pPg, int bVals){
  Blob blob = {0, 0, 0};          /* Blob used for keys */
  LsmString s;
  int i;

  int nRec;
  int iPtr;
  int flags;
  u8 *aData;
  int nData;

  aData = fsPageData(pPg, &nData);

  nRec = pageGetNRec(aData, nData);
  iPtr = (int)pageGetPtr(aData, nData);
  flags = pageGetFlags(aData, nData);

  lsmStringInit(&s, pDb->pEnv);
  lsmStringAppendf(&s,"nCell=%d iPtr=%d flags=%d {", nRec, iPtr, flags);

  /* On b-tree pages cell pointers are printed as stored, without the
  ** footer pointer added. */
  if( flags&SEGMENT_BTREE_FLAG ) iPtr = 0;

  for(i=0; i<nRec; i++){
    Page *pRef = 0;               /* Pointer to page iRef */
    int iChar;
    u8 *aKey = 0; int nKey = 0;   /* Key */
    u8 *aVal = 0; int nVal = 0;   /* Value */
    int iTopic = 0;
    u8 *aCell;
    int iPgPtr;
    int eType;

    aCell = pageGetCell(aData, nData, i);
    eType = *aCell++;
    assert( (flags & SEGMENT_BTREE_FLAG) || eType!=0 );
    aCell += lsmVarintGet32(aCell, &iPgPtr);

    if( eType==0 ){
      Pgno iRef;                  /* Page number of referenced page */
      aCell += lsmVarintGet64(aCell, &iRef);
      lsmFsDbPageGet(pDb->pFS, pRun, iRef, &pRef);
      if( pRef ){
        aKey = pageGetKey(pRun, pRef, 0, &iTopic, &nKey, &blob);
      }
    }else{
      aCell += lsmVarintGet32(aCell, &nKey);
      if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
      sortedReadData(0, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob);
      if( aKey ) aVal = &aKey[nKey];
      iTopic = eType;
    }

    /* If no key data could be obtained, print an empty key rather than
    ** dereferencing a NULL pointer in the loops below. */
    if( aKey==0 ){
      nKey = 0;
      nVal = 0;
    }

    lsmStringAppendf(&s, "%s%2X:", (i==0?"":" "), iTopic);
    for(iChar=0; iChar<nKey; iChar++){
      lsmStringAppendf(&s, "%c", isalnum(aKey[iChar]) ? aKey[iChar] : '.');
    }
    if( nVal>0 && bVals ){
      lsmStringAppendf(&s, "##");
      for(iChar=0; iChar<nVal; iChar++){
        lsmStringAppendf(&s, "%c", isalnum(aVal[iChar]) ? aVal[iChar] : '.');
      }
    }

    lsmStringAppendf(&s, " %d", iPgPtr+iPtr);
    lsmFsPageRelease(pRef);
  }
  lsmStringAppend(&s, "}", 1);

  /* The page number is cast so that the argument matches the conversion
  ** specifier whatever the width of type Pgno. */
  lsmLogMessage(pDb, LSM_OK, " Page %lld: %s",
      (long long)lsmFsPageNumber(pPg), s.z
  );
  lsmStringClear(&s);

  sortedBlobFree(&blob);
}
5581
/*
** Decode cell iCell of page pPg and return its components through the
** output parameters. Any of the output pointers peType, piPgPtr, paKey,
** pnKey, paVal and pnVal may be NULL, in which case that value is not
** returned.
**
** For a cell with a non-zero type byte, the key (and the value, for a
** write record) are read using sortedReadData() with pBlob as the backing
** buffer, so the returned pointers may refer to memory owned by pBlob.
**
** For a cell with type byte 0 (a reference to a key stored on another
** page): if bIndirect is true, the key is copied from the first cell of
** the referenced page into pBlob. Otherwise the 10 byte placeholder key
** "<indirect>" is returned.
**
** If the key cannot be loaded, *paKey is set to NULL and *pnKey and
** *pnVal to zero.
*/
static void infoCellDump(
  lsm_db *pDb,                    /* Database handle */
  Segment *pSeg,                  /* Segment page belongs to */
  int bIndirect,                  /* True to follow indirect refs */
  Page *pPg,
  int iCell,
  int *peType,
  int *piPgPtr,
  u8 **paKey, int *pnKey,
  u8 **paVal, int *pnVal,
  Blob *pBlob
){
  u8 *aData; int nData;           /* Page data */
  u8 *aKey = 0; int nKey = 0;     /* Key */
  u8 *aVal = 0; int nVal = 0;     /* Value */
  int eType;
  int iPgPtr;
  Page *pRef = 0;                 /* Pointer to page iRef */
  u8 *aCell;

  aData = fsPageData(pPg, &nData);

  aCell = pageGetCell(aData, nData, iCell);
  eType = *aCell++;
  aCell += lsmVarintGet32(aCell, &iPgPtr);

  if( eType==0 ){
    int dummy;
    Pgno iRef;                    /* Page number of referenced page */
    aCell += lsmVarintGet64(aCell, &iRef);
    if( bIndirect ){
      lsmFsDbPageGet(pDb->pFS, pSeg, iRef, &pRef);
      if( pRef ){
        pageGetKeyCopy(pDb->pEnv, pSeg, pRef, 0, &dummy, pBlob);
        aKey = (u8 *)pBlob->pData;
        nKey = pBlob->nData;
        lsmFsPageRelease(pRef);
      }
    }else{
      /* The length excludes the string's nul-terminator. */
      aKey = (u8 *)"<indirect>";
      nKey = (int)sizeof("<indirect>") - 1;
    }
  }else{
    aCell += lsmVarintGet32(aCell, &nKey);
    if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
    sortedReadData(pSeg, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, pBlob);
    if( aKey ) aVal = &aKey[nKey];
  }

  /* Never report a non-zero size together with a NULL pointer. */
  if( aKey==0 ){
    nKey = 0;
    nVal = 0;
  }

  if( peType ) *peType = eType;
  if( piPgPtr ) *piPgPtr = iPgPtr;
  if( paKey ) *paKey = aKey;
  if( paVal ) *paVal = aVal;
  if( pnKey ) *pnKey = nKey;
  if( pnVal ) *pnVal = nVal;
}
5636
/*
** Append a printable rendering of the n byte buffer z to string pStr.
**
** If bHex is true, each byte is appended as two upper-case hexadecimal
** digits. Otherwise alphanumeric bytes are appended as they are and all
** other bytes are replaced by '.'.
**
** Always returns LSM_OK.
*/
static int infoAppendBlob(LsmString *pStr, int bHex, u8 *z, int n){
  int iByte = 0;                  /* Index of next byte of z[] to output */
  while( iByte<n ){
    u8 c = z[iByte++];
    if( !bHex ){
      lsmStringAppendf(pStr, "%c", isalnum(c) ? c : '.');
    }else{
      lsmStringAppendf(pStr, "%02X", c);
    }
  }
  return LSM_OK;
}
5648
/*
** Bitmask values that may be combined in the "flags" argument passed to
** infoPageDump().
*/
#define INFO_PAGE_DUMP_DATA     0x01   /* Append raw hex/ASCII dump of page */
#define INFO_PAGE_DUMP_VALUES   0x02   /* Output values as well as keys */
#define INFO_PAGE_DUMP_HEX      0x04   /* Output keys and values in hex */
#define INFO_PAGE_DUMP_INDIRECT 0x08   /* Follow indirect key references */
5653
/*
** Build a human-readable description of database page iPg and return it
** via *pzOut. The string is allocated by the LsmString module and should
** be freed by the caller using lsmFree(). *pzOut is set to NULL if an
** error occurs.
**
** The output consists of the page footer values followed by one line per
** cell (flags, absolute page pointer, "sys" or "usr", and the key).
** Parameter flags is a combination of INFO_PAGE_DUMP_XXX values:
**
**   INFO_PAGE_DUMP_VALUES    Output values as well as keys.
**   INFO_PAGE_DUMP_HEX       Output keys and values in hex.
**   INFO_PAGE_DUMP_DATA      Append a raw hex/ASCII dump of the page.
**   INFO_PAGE_DUMP_INDIRECT  Load keys stored on other pages.
**
** Returns LSM_ERROR if iPg is zero. Otherwise LSM_OK or the error code
** returned when looking up or loading the page.
*/
static int infoPageDump(
  lsm_db *pDb,                    /* Database handle */
  Pgno iPg,                       /* Page number of page to dump */
  int flags,
  char **pzOut                    /* OUT: lsmMalloc'd string */
){
  int rc = LSM_OK;                /* Return code */
  Page *pPg = 0;                  /* Handle for page iPg */
  int i, j;                       /* Loop counters */
  const int perLine = 16;         /* Bytes per line in the raw hex dump */
  Segment *pSeg = 0;
  Snapshot *pSnap;

  /* Each flag is normalized to 0 or 1. This matters for bHex, which is
  ** used arithmetically (as a multiplier) when computing padding below. */
  int bValues = (flags & INFO_PAGE_DUMP_VALUES)!=0;
  int bHex = (flags & INFO_PAGE_DUMP_HEX)!=0;
  int bData = (flags & INFO_PAGE_DUMP_DATA)!=0;
  int bIndirect = (flags & INFO_PAGE_DUMP_INDIRECT)!=0;

  *pzOut = 0;
  if( iPg==0 ) return LSM_ERROR;

  assert( pDb->pClient || pDb->pWorker );
  pSnap = pDb->pClient;
  if( pSnap==0 ) pSnap = pDb->pWorker;
  if( pSnap->redirect.n>0 ){
    /* The snapshot has redirects. If page iPg belongs to the last segment
    ** of the last level, pass that segment to infoCellDump() below. */
    Level *pLvl;
    int bUse = 0;
    for(pLvl=pSnap->pLevel; pLvl->pNext; pLvl=pLvl->pNext);
    pSeg = (pLvl->nRight==0 ? &pLvl->lhs : &pLvl->aRhs[pLvl->nRight-1]);
    rc = lsmFsSegmentContainsPg(pDb->pFS, pSeg, iPg, &bUse);
    if( bUse==0 ){
      pSeg = 0;
    }
  }

  /* iPg is a real page number (not subject to redirection). So it is safe
  ** to pass a NULL in place of the segment pointer as the second argument
  ** to lsmFsDbPageGet() here.  */
  if( rc==LSM_OK ){
    rc = lsmFsDbPageGet(pDb->pFS, 0, iPg, &pPg);
  }

  if( rc==LSM_OK ){
    Blob blob = {0, 0, 0, 0};
    int nKeyWidth = 0;
    LsmString str;
    int nRec;
    int iPtr;
    int flags2;
    int iCell;
    u8 *aData; int nData;         /* Page data and size thereof */

    aData = fsPageData(pPg, &nData);
    nRec = pageGetNRec(aData, nData);
    iPtr = (int)pageGetPtr(aData, nData);
    flags2 = pageGetFlags(aData, nData);

    lsmStringInit(&str, pDb->pEnv);
    lsmStringAppendf(&str, "Page : %lld (%d bytes)\n", (long long)iPg, nData);
    lsmStringAppendf(&str, "nRec : %d\n", nRec);
    lsmStringAppendf(&str, "iPtr : %d\n", iPtr);
    lsmStringAppendf(&str, "flags: %04x\n", flags2);
    lsmStringAppendf(&str, "\n");

    /* First pass: find the size of the largest key, so that values can be
    ** aligned in a column by the second pass. */
    for(iCell=0; iCell<nRec; iCell++){
      int nKey = 0;
      infoCellDump(
          pDb, pSeg, bIndirect, pPg, iCell, 0, 0, 0, &nKey, 0, 0, &blob
      );
      if( nKey>nKeyWidth ) nKeyWidth = nKey;
    }
    if( bHex ) nKeyWidth = nKeyWidth * 2;

    /* Second pass: output one line for each cell. */
    for(iCell=0; iCell<nRec; iCell++){
      u8 *aKey = 0; int nKey = 0; /* Key */
      u8 *aVal = 0; int nVal = 0; /* Value */
      int iPgPtr = 0;
      int eType = 0;
      Pgno iAbsPtr;
      char zFlags[8];

      infoCellDump(pDb, pSeg, bIndirect, pPg, iCell, &eType, &iPgPtr,
          &aKey, &nKey, &aVal, &nVal, &blob
      );
      iAbsPtr = iPgPtr + ((flags2 & SEGMENT_BTREE_FLAG) ? 0 : iPtr);

      lsmFlagsToString(eType, zFlags);
      lsmStringAppendf(&str, "%s %lld (%s) ",
          zFlags, (long long)iAbsPtr, (rtTopic(eType) ? "sys" : "usr")
      );
      infoAppendBlob(&str, bHex, aKey, nKey);
      if( nVal>0 && bValues ){
        /* A key occupies nKey characters in text mode, or 2*nKey in hex. */
        lsmStringAppendf(&str, "%*s", nKeyWidth - (nKey*(1+bHex)), "");
        lsmStringAppendf(&str, " ");
        infoAppendBlob(&str, bHex, aVal, nVal);
      }
      if( rtTopic(eType) ){
        int iBlk = (int)~lsmGetU32(aKey);
        lsmStringAppendf(&str, "  (block=%d", iBlk);
        if( nVal>0 ){
          i64 iSnap = lsmGetU64(aVal);
          lsmStringAppendf(&str, " snapshot=%lld", iSnap);
        }
        lsmStringAppendf(&str, ")");
      }
      lsmStringAppendf(&str, "\n");
    }

    if( bData ){
      lsmStringAppendf(&str, "\n-------------------"
          "-------------------------------------------------------------\n");
      lsmStringAppendf(&str, "Page %lld\n", (long long)iPg);
      for(i=0; i<nData; i += perLine){
        lsmStringAppendf(&str, "%04x: ", i);
        for(j=0; j<perLine; j++){
          /* Valid offsets are 0..(nData-1). Past the end of the page, pad
          ** with the same width as a "%02x " field. */
          if( i+j>=nData ){
            lsmStringAppendf(&str, "   ");
          }else{
            lsmStringAppendf(&str, "%02x ", aData[i+j]);
          }
        }
        lsmStringAppendf(&str, " ");
        for(j=0; j<perLine; j++){
          if( i+j>=nData ){
            lsmStringAppendf(&str, " ");
          }else{
            lsmStringAppendf(&str,"%c", isprint(aData[i+j]) ? aData[i+j] : '.');
          }
        }
        lsmStringAppendf(&str,"\n");
      }
    }

    *pzOut = str.z;
    sortedBlobFree(&blob);
    lsmFsPageRelease(pPg);
  }

  return rc;
}
5795
/*
** Return, via *pzOut, a description of page iPg that includes the values
** of all records and a raw dump of the page data. If bHex is true, keys
** and values are formatted as hex.
**
** This is a wrapper around infoPageDump(); the return code and the
** contents of *pzOut are as for that function.
*/
int lsmInfoPageDump(
  lsm_db *pDb,                    /* Database handle */
  Pgno iPg,                       /* Page number of page to dump */
  int bHex,                       /* True to output key/value in hex form */
  char **pzOut                    /* OUT: lsmMalloc'd string */
){
  int flags;                      /* INFO_PAGE_DUMP_XXX mask to use */
  if( bHex ){
    flags = INFO_PAGE_DUMP_DATA | INFO_PAGE_DUMP_VALUES | INFO_PAGE_DUMP_HEX;
  }else{
    flags = INFO_PAGE_DUMP_DATA | INFO_PAGE_DUMP_VALUES;
  }
  return infoPageDump(pDb, iPg, flags, pzOut);
}
5806
/*
** Debugging function. Send a description of segment pRun, followed by a
** description of each of its pages, to the log callback. If bVals is
** true the page descriptions include record values as well as keys.
**
** This function is a no-op if pRun is NULL or has no first page. The
** connection must have a log callback configured (pDb->xLog). A page
** whose description cannot be built is skipped, and the dump stops at the
** first page that cannot be loaded.
*/
void sortedDumpSegment(lsm_db *pDb, Segment *pRun, int bVals){
  assert( pDb->xLog );
  if( pRun && pRun->iFirst ){
    int flags = (bVals ? INFO_PAGE_DUMP_VALUES : 0);
    char *zSeg;
    Page *pPg = 0;                /* Current page. NULL if the load fails */

    zSeg = segToString(pDb->pEnv, pRun, 0);
    lsmLogMessage(pDb, LSM_OK, "Segment: %s", (zSeg ? zSeg : ""));
    lsmFree(pDb->pEnv, zSeg);

    lsmFsDbPageGet(pDb->pFS, pRun, pRun->iFirst, &pPg);
    while( pPg ){
      Page *pNext = 0;            /* Next page. NULL if the load fails */
      char *z = 0;

      infoPageDump(pDb, lsmFsPageNumber(pPg), flags, &z);
      if( z ){
        lsmLogMessage(pDb, LSM_OK, "%s", z);
        lsmFree(pDb->pEnv, z);
      }

      lsmFsDbPageNext(pRun, pPg, 1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
    }
  }
}
5834
5835 /*
5836 ** Invoke the log callback zero or more times with messages that describe
5837 ** the current database structure.
5838 */
lsmSortedDumpStructure(lsm_db * pDb,Snapshot * pSnap,int bKeys,int bVals,const char * zWhy)5839 void lsmSortedDumpStructure(
5840 lsm_db *pDb, /* Database handle (used for xLog callback) */
5841 Snapshot *pSnap, /* Snapshot to dump */
5842 int bKeys, /* Output the keys from each segment */
5843 int bVals, /* Output the values from each segment */
5844 const char *zWhy /* Caption to print near top of dump */
5845 ){
5846 Snapshot *pDump = pSnap;
5847 Level *pTopLevel;
5848 char *zFree = 0;
5849
5850 assert( pSnap );
5851 pTopLevel = lsmDbSnapshotLevel(pDump);
5852 if( pDb->xLog && pTopLevel ){
5853 static int nCall = 0;
5854 Level *pLevel;
5855 int iLevel = 0;
5856
5857 nCall++;
5858 lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);
5859
5860 #if 0
5861 if( nCall==1031 || nCall==1032 ) bKeys=1;
5862 #endif
5863
5864 for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
5865 char zLeft[1024];
5866 char zRight[1024];
5867 int i = 0;
5868
5869 Segment *aLeft[24];
5870 Segment *aRight[24];
5871
5872 int nLeft = 0;
5873 int nRight = 0;
5874
5875 Segment *pSeg = &pLevel->lhs;
5876 aLeft[nLeft++] = pSeg;
5877
5878 for(i=0; i<pLevel->nRight; i++){
5879 aRight[nRight++] = &pLevel->aRhs[i];
5880 }
5881
5882 #ifdef LSM_LOG_FREELIST
5883 if( nRight ){
5884 memmove(&aRight[1], aRight, sizeof(aRight[0])*nRight);
5885 aRight[0] = 0;
5886 nRight++;
5887 }
5888 #endif
5889
5890 for(i=0; i<nLeft || i<nRight; i++){
5891 int iPad = 0;
5892 char zLevel[32];
5893 zLeft[0] = '\0';
5894 zRight[0] = '\0';
5895
5896 if( i<nLeft ){
5897 fileToString(pDb, zLeft, sizeof(zLeft), 24, aLeft[i]);
5898 }
5899 if( i<nRight ){
5900 fileToString(pDb, zRight, sizeof(zRight), 24, aRight[i]);
5901 }
5902
5903 if( i==0 ){
5904 snprintf(zLevel, sizeof(zLevel), "L%d: (age=%d) (flags=%.4x)",
5905 iLevel, (int)pLevel->iAge, (int)pLevel->flags
5906 );
5907 }else{
5908 zLevel[0] = '\0';
5909 }
5910
5911 if( nRight==0 ){
5912 iPad = 10;
5913 }
5914
5915 lsmLogMessage(pDb, LSM_OK, "% 25s % *s% -35s %s",
5916 zLevel, iPad, "", zLeft, zRight
5917 );
5918 }
5919
5920 iLevel++;
5921 }
5922
5923 if( bKeys ){
5924 for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
5925 int i;
5926 sortedDumpSegment(pDb, &pLevel->lhs, bVals);
5927 for(i=0; i<pLevel->nRight; i++){
5928 sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals);
5929 }
5930 }
5931 }
5932 }
5933
5934 lsmInfoFreelist(pDb, &zFree);
5935 lsmLogMessage(pDb, LSM_OK, "Freelist: %s", zFree);
5936 lsmFree(pDb->pEnv, zFree);
5937
5938 assert( lsmFsIntegrityCheck(pDb) );
5939 }
5940
/*
** Free the linked list of Level structures that starts at pLevel,
** following the Level.pNext pointers. Each element is released using
** sortedFreeLevel(). Passing a NULL pLevel is a harmless no-op.
*/
void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){
  Level *pCur = pLevel;           /* Level to free next */
  while( pCur ){
    /* Read the link before the structure that holds it is freed. */
    Level *pFollow = pCur->pNext;
    sortedFreeLevel(pEnv, pCur);
    pCur = pFollow;
  }
}
5950
/*
** Call lsmTreeCursorSave() on both in-memory tree cursors (apTreeCsr[0]
** and apTreeCsr[1]) of every multi-cursor in the list that starts at
** pDb->pCsr.
*/
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pCur = pDb->pCsr;  /* Iterator for the list of cursors */
  while( pCur ){
    int iTree;
    for(iTree=0; iTree<2; iTree++){
      lsmTreeCursorSave(pCur->apTreeCsr[iTree]);
    }
    pCur = pCur->pNext;
  }
}
5958
/*
** Page pPg holds content laid out for a page size of nOrig bytes, but the
** page buffer is now nData bytes in size, as reported by lsmFsPageData().
** Move the bytes between offset SEGMENT_EOF(nOrig, nEntry) and offset
** nOrig up by (nData-nOrig) bytes, so that they end at the end of the
** enlarged page. nEntry is the record count read from the footer at its
** original position.
*/
void lsmSortedExpandBtreePage(Page *pPg, int nOrig){
  int nData;                      /* Current size of page buffer */
  u8 *aData = lsmFsPageData(pPg, &nData);
  int nEntry = pageGetNRec(aData, nOrig);
  int iHdr = SEGMENT_EOF(nOrig, nEntry);
  int nShift = nData - nOrig;     /* Distance to move the bytes by */

  /* Source and destination may overlap, hence memmove(). */
  memmove(&aData[iHdr + nShift], &aData[iHdr], nOrig-iHdr);
}
5970
5971 #ifdef LSM_DEBUG_EXPENSIVE
/*
** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
** defined. It walks the pages of segment pSeg, skipping those with the
** b-tree flag set, and uses assert() to check that:
**
**   * each key on a page is smaller than the key that follows it on the
**     same page, and
**   * the first key on a page is larger than the key most recently saved
**     in blob2 while checking earlier pages (if there is one).
*/
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){
  Page *pPg = 0;
  Blob blob1 = {0, 0, 0, 0};
  Blob blob2 = {0, 0, 0, 0};
  int iTopic2 = 0;                /* Topic of the key stored in blob2 */

  lsmFsDbPageGet(pDb->pFS, pSeg, pSeg->iFirst, &pPg);
  while( pPg ){
    u8 *aData; int nData;
    Page *pNext = 0;

    aData = lsmFsPageData(pPg, &nData);
    if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){
      int i;
      int nRec = pageGetNRec(aData, nData);
      for(i=0; i<nRec; i++){
        int iTopic1;
        pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i, &iTopic1, &blob1);

        /* blob2 and iTopic2 hold a key saved while checking an earlier
        ** page. iTopic2 is declared at function scope so that its value
        ** is still valid when it is compared here. */
        if( i==0 && blob2.nData ){
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic2, blob2.pData, blob2.nData,
                iTopic1, blob1.pData, blob1.nData
          )<0 );
        }

        if( i<(nRec-1) ){
          pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i+1, &iTopic2, &blob2);
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic1, blob1.pData, blob1.nData,
                iTopic2, blob2.pData, blob2.nData
          )<0 );
        }
      }
    }

    lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
    lsmFsPageRelease(pPg);
    pPg = pNext;
  }

  sortedBlobFree(&blob1);
  sortedBlobFree(&blob2);
}
6015 #endif
6016
6017 #ifdef LSM_DEBUG_EXPENSIVE
/*
** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
** defined. Its only purpose is to evaluate various assert() statements to
** verify that the database is well formed in certain respects.
**
** More specifically, it checks that the array pOne contains the required
** pointers to pTwo. Array pTwo must be a main array. pOne may be either a
** separators array or another main array. If pOne does not contain the
** correct set of pointers, an assert() statement fails.
**
** If bRhs is true, the checks that compare a pointer against the previous
** page of pTwo (iPrev) are skipped; only exact key matches are verified.
**
** This function always returns LSM_OK. Any error encountered while
** iterating merely causes the checks to stop early.
*/
static int assertPointersOk(
  lsm_db *pDb,                    /* Database handle */
  Segment *pOne,                  /* Segment containing pointers */
  Segment *pTwo,                  /* Segment containing pointer targets */
  int bRhs                        /* True if pTwo may have been Gobble()d */
){
  int rc = LSM_OK;                /* Error code */
  SegmentPtr ptr1;                /* Iterates through pOne */
  SegmentPtr ptr2;                /* Iterates through pTwo */
  Pgno iPrev;                     /* Page of pTwo most recently matched */

  assert( pOne && pTwo );

  /* Both iterators are of type SegmentPtr, so sizeof(ptr1) is the correct
  ** size for ptr2 as well. */
  memset(&ptr1, 0, sizeof(ptr1));
  memset(&ptr2, 0, sizeof(ptr1));
  ptr1.pSeg = pOne;
  ptr2.pSeg = pTwo;
  segmentPtrEndPage(pDb->pFS, &ptr1, 0, &rc);
  segmentPtrEndPage(pDb->pFS, &ptr2, 0, &rc);

  /* Check that the footer pointer of the first page of pOne points to
  ** the first page of pTwo. */
  iPrev = pTwo->iFirst;
  if( ptr1.iPtr!=iPrev && !bRhs ){
    assert( 0 );
  }

  if( rc==LSM_OK && ptr1.nCell>0 ){
    rc = segmentPtrLoadCell(&ptr1, 0);
  }

  while( rc==LSM_OK && ptr2.pPg ){
    Pgno iThis;                   /* Page number of current page of pTwo */

    /* Advance to the next page of segment pTwo that contains at least
    ** one cell. Break out of the loop if the iterator reaches EOF.  */
    do{
      rc = segmentPtrNextPage(&ptr2, 1);
      assert( rc==LSM_OK );
    }while( rc==LSM_OK && ptr2.pPg && ptr2.nCell==0 );
    if( rc!=LSM_OK || ptr2.pPg==0 ) break;
    iThis = lsmFsPageNumber(ptr2.pPg);

    /* Pages with either the PGFTR_SKIP_THIS_FLAG or SEGMENT_BTREE_FLAG
    ** flag set are not checked. */
    if( (ptr2.flags & (PGFTR_SKIP_THIS_FLAG|SEGMENT_BTREE_FLAG))==0 ){

      /* Load the first cell in the array pTwo page. */
      rc = segmentPtrLoadCell(&ptr2, 0);

      /* Iterate forwards through pOne, searching for a key that matches the
      ** key ptr2.pKey/nKey. This key should have a pointer to the page that
      ** ptr2 currently points to. */
      while( rc==LSM_OK ){
        /* Compare by topic first, then using the key comparison function */
        int res = rtTopic(ptr1.eType) - rtTopic(ptr2.eType);
        if( res==0 ){
          res = pDb->xCmp(ptr1.pKey, ptr1.nKey, ptr2.pKey, ptr2.nKey);
        }

        if( res<0 ){
          /* A smaller key: it must still point to the previous page. */
          assert( bRhs || ptr1.iPtr+ptr1.iPgPtr==iPrev );
        }else if( res>0 ){
          /* pOne has skipped past the key without matching it. */
          assert( 0 );
        }else{
          /* Matching key: it must point to the current page of pTwo. */
          assert( ptr1.iPtr+ptr1.iPgPtr==iThis );
          iPrev = iThis;
          break;
        }

        rc = segmentPtrAdvance(0, &ptr1, 0);
        if( ptr1.pPg==0 ){
          /* pOne ran out of keys before a match was found. */
          assert( 0 );
        }
      }
    }
  }

  segmentPtrReset(&ptr1, 0);
  segmentPtrReset(&ptr2, 0);
  return LSM_OK;
}
6107
/*
** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
** defined. Its only purpose is to evaluate various assert() statements to
** verify that the database is well formed in certain respects.
**
** More specifically, it checks that the b-tree embedded in segment pSeg
** contains the correct keys. If not, an assert() fails.
**
** The check walks the pages of the segment alongside a b-tree cursor.
** Starting with the second page of the segment, each page that is not a
** b-tree page, does not have PGFTR_SKIP_THIS_FLAG set and contains at
** least one record must have its first key, and its page number, as the
** next entry returned by the cursor. The cursor must be exhausted once
** all pages have been visited.
**
** This is a no-op if the segment has no b-tree root page (iRoot==0).
** Returns LSM_OK or an LSM error code.
*/
static int assertBtreeOk(
  lsm_db *pDb,
  Segment *pSeg
){
  int rc = LSM_OK;                /* Return code */
  if( pSeg->iRoot ){
    Blob blob = {0, 0, 0};        /* Buffer used to cache overflow keys */
    FileSystem *pFS = pDb->pFS;   /* File system to read from */
    Page *pPg = 0;                /* Main run page */
    BtreeCursor *pCsr = 0;        /* Btree cursor */

    rc = btreeCursorNew(pDb, pSeg, &pCsr);
    if( rc==LSM_OK ){
      rc = btreeCursorFirst(pCsr);
    }
    if( rc==LSM_OK ){
      rc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pPg);
    }

    while( rc==LSM_OK ){
      Page *pNext;
      u8 *aData;
      int nData;
      int flags;

      /* Step to the next page before checking anything. This means the
      ** first page of the segment itself is never compared against the
      ** cursor. */
      rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
      if( pPg==0 ) break;
      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( rc==LSM_OK
       && 0==((SEGMENT_BTREE_FLAG|PGFTR_SKIP_THIS_FLAG) & flags)
       && 0!=pageGetNRec(aData, nData)
      ){
        u8 *pKey;
        int nKey;
        int iTopic;
        pKey = pageGetKey(pSeg, pPg, 0, &iTopic, &nKey, &blob);
        /* The cursor entry must match both the key and the page number */
        assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
        assert( lsmFsPageNumber(pPg)==pCsr->iPtr );
        rc = btreeCursorNext(pCsr);
      }
    }
    /* If no error occurred, the cursor must now be at EOF. */
    assert( rc!=LSM_OK || pCsr->pKey==0 );

    if( pPg ) lsmFsPageRelease(pPg);

    btreeCursorFree(pCsr);
    sortedBlobFree(&blob);
  }

  return rc;
}
6170 #endif /* ifdef LSM_DEBUG_EXPENSIVE */
6171