1 /*
2 ** 2011-08-18
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 **
13 ** The main interface to the LSM module.
14 */
15 #include "lsmInt.h"
16 
17 
18 #ifdef LSM_DEBUG
19 /*
20 ** This function returns a copy of its only argument.
21 **
22 ** When the library is built with LSM_DEBUG defined, this function is called
23 ** whenever an error code is generated (not propagated - generated). So
24 ** if the library is mysteriously returning (say) LSM_IOERR, a breakpoint
25 ** may be set in this function to determine why.
26 */
int lsmErrorBkpt(int rc){
  /* Deliberately trivial: the function exists only so that a debugger
  ** breakpoint can be placed on it.  Set breakpoint here! */
  int rcCopy = rc;
  return rcCopy;
}
31 
32 /*
33 ** This function contains various assert() statements that test that the
34 ** lsm_db structure passed as an argument is internally consistent.
35 */
static void assert_db_state(lsm_db *pDb){

  /* "Has an open cursor or an open write transaction" must be true exactly
  ** when "has a reader slot (iReader>=0) or a read-only transaction
  ** (bRoTrans)" is true. Neither condition may hold without the other.  */
  assert( (pDb->nTransOpen>0 || pDb->pCsr!=0)
       == (pDb->bRoTrans || pDb->iReader>=0) );

  /* Whenever the handle is reading, it must hold a client snapshot. */
  assert( pDb->pClient!=0 || (pDb->bRoTrans==0 && pDb->iReader<0) );

  /* The transaction nesting depth can never be negative. */
  assert( pDb->nTransOpen>=0 );
}
49 #else
50 # define assert_db_state(x)
51 #endif
52 
53 /*
54 ** The default key-compare function.
55 */
static int xCmp(void *p1, int n1, void *p2, int n2){
  /* Keys are compared as raw byte strings. The result is negative, zero or
  ** positive if key (p1/n1) is respectively less than, equal to or greater
  ** than key (p2/n2). If one key is a prefix of the other, the shorter key
  ** is the smaller one.  */
  int nCmp = (n1<n2 ? n1 : n2);   /* Number of bytes both keys have */
  int res = 0;

  /* memcmp() requires valid pointers even for a zero-length compare, so it
  ** is skipped entirely when there is no common prefix (e.g. a zero-length
  ** key passed as a NULL pointer).  */
  if( nCmp>0 ){
    res = memcmp(p1, p2, (size_t)nCmp);
  }

  /* Common prefix identical: order by length. */
  if( res==0 ) res = (n1-n2);
  return res;
}
62 
/*
** The default log callback (installed by lsm_new()). Message z is written
** to stderr followed by a newline, and stderr is then flushed. The context
** pointer and the error code are ignored.
*/
static void xLog(void *pCtx, int rc, const char *z){
  (void)pCtx;
  (void)rc;
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}
69 
70 /*
71 ** Allocate a new db handle.
72 */
int lsm_new(lsm_env *pEnv, lsm_db **ppDb){
  lsm_db *pNew;                   /* The new database handle */

  /* Fall back to the default environment if the caller passed NULL. */
  if( !pEnv ){
    pEnv = lsm_default_env();
  }
  assert( pEnv );

  /* Allocate the handle using lsmMallocZero(). The output variable is
  ** written before the allocation is checked, so *ppDb is NULL whenever
  ** this function returns LSM_NOMEM.  */
  pNew = (lsm_db *)lsmMallocZero(pEnv, sizeof(lsm_db));
  *ppDb = pNew;
  if( !pNew ) return LSM_NOMEM_BKPT;

  /* Environment and default callbacks */
  pNew->pEnv = pEnv;
  pNew->xCmp = xCmp;
  pNew->xLog = xLog;

  /* No reader or read-write client slot is held yet */
  pNew->iReader = -1;
  pNew->iRwclient = -1;

  /* Configuration parameters start out at their compile-time defaults */
  pNew->nTreeLimit = LSM_DFLT_AUTOFLUSH;
  pNew->nAutockpt = LSM_DFLT_AUTOCHECKPOINT;
  pNew->bAutowork = LSM_DFLT_AUTOWORK;
  pNew->eSafety = LSM_DFLT_SAFETY;
  pNew->nDfltPgsz = LSM_DFLT_PAGE_SIZE;
  pNew->nDfltBlksz = LSM_DFLT_BLOCK_SIZE;
  pNew->nMerge = LSM_DFLT_AUTOMERGE;
  pNew->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
  pNew->bUseLog = LSM_DFLT_USE_LOG;
  pNew->bMultiProc = LSM_DFLT_MULTIPLE_PROCESSES;
  pNew->iMmap = LSM_DFLT_MMAP;
  pNew->compress.iId = LSM_COMPRESSION_NONE;

  return LSM_OK;
}
104 
lsm_get_env(lsm_db * pDb)105 lsm_env *lsm_get_env(lsm_db *pDb){
106   assert( pDb->pEnv );
107   return pDb->pEnv;
108 }
109 
110 /*
111 ** If database handle pDb is currently holding a client snapshot, but does
112 ** not have any open cursors or write transactions, release it.
113 */
static void dbReleaseClientSnapshot(lsm_db *pDb){
  /* Something still depends on the snapshot - leave it alone. */
  if( pDb->pCsr ) return;
  if( pDb->nTransOpen ) return;

  /* No cursors and no open transaction: end the read transaction. */
  lsmFinishReadTrans(pDb);
}
119 
/*
** Translate the possibly relative path zRel into a full pathname using the
** xFullpath method of environment pEnv.
**
** If successful, LSM_OK is returned and *pzAbs is set to point to a buffer
** containing the result. The buffer is allocated by this function and must
** eventually be released by the caller using lsmFree(). If an error occurs,
** an LSM error code is returned and *pzAbs is set to NULL.
*/
static int getFullpathname(
  lsm_env *pEnv,
  const char *zRel,
  char **pzAbs
){
  int rc;                         /* Return code */
  char *zBuf = 0;                 /* Output buffer */
  int nBuf = 0;                   /* Size of zBuf passed to xFullpath */
  int nNeed = 0;                  /* Size reported back by xFullpath */

  /* Call xFullpath repeatedly, growing the buffer each time it reports
  ** (via nNeed) that more space is required than was supplied. The first
  ** call is made with a NULL, zero-byte buffer.  */
  for(;;){
    nBuf = nNeed;
    rc = pEnv->xFullpath(pEnv, zRel, zBuf, &nNeed);
    if( nNeed<=nBuf ) break;
    zBuf = lsmReallocOrFreeRc(pEnv, zBuf, nNeed, &rc);
    if( rc!=LSM_OK ) break;
  }

  if( rc!=LSM_OK ){
    lsmFree(pEnv, zBuf);
    zBuf = 0;
  }
  *pzAbs = zBuf;
  return rc;
}
145 
146 /*
147 ** Check that the bits in the db->mLock mask are consistent with the
148 ** value stored in db->iRwclient. An assert shall fail otherwise.
149 */
static void assertRwclientLockValue(lsm_db *db){
#ifndef NDEBUG
  /* All bits below the bit of the one-past-the-last RWCLIENT lock, and all
  ** bits below the bit of the first RWCLIENT lock. The difference is the
  ** mask of bits that belong to RWCLIENT locks.  */
  u64 mBelowEnd = ((u64)1 << (LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT)-1)) - 1;
  u64 mBelowFirst = ((u64)1 << (LSM_LOCK_RWCLIENT(0)-1)) - 1;
  u64 mRwclient = mBelowEnd - mBelowFirst;

  /* The single bit expected to be set - or no bit at all if the handle
  ** holds no RWCLIENT lock (iRwclient<0).  */
  u64 mExpect = 0;
  if( db->iRwclient>=0 ){
    mExpect = ((u64)1 << (LSM_LOCK_RWCLIENT(db->iRwclient)-1));
  }

  assert( (db->mLock & mRwclient)==mExpect );
#endif
}
164 
165 /*
166 ** Open a new connection to database zFilename.
167 */
int lsm_open(lsm_db *pDb, const char *zFilename){
  int rc = LSM_MISUSE;            /* Returned if the handle is already open */

  if( pDb->pDatabase==0 ){
    char *zFull = 0;              /* Absolute version of zFilename */

    /* The user-supplied path may be relative. It is converted to an
    ** absolute path because it is used more than once - to open the
    ** database file, to open the log file ("-log" appended), and perhaps
    ** to unlink the log file during disconnection. Using an absolute path
    ** guarantees the same files are operated on even if the application
    ** changes its working directory in the meantime.  */
    rc = getFullpathname(pDb->pEnv, zFilename, &zFull);
    assert( rc==LSM_OK || zFull==0 );

    /* Connect to the database. */
    if( rc==LSM_OK ){
      rc = lsmDbDatabaseConnect(pDb, zFull);
    }

    /* For a read-write connection, load the checkpoint and configure the
    ** file-system object with the page-size and block-size of the
    ** database. Even if the database file is zero bytes in size on disk,
    ** these values have been set in shared-memory by now, and so are
    ** guaranteed not to change during the lifetime of this connection.  */
    if( rc==LSM_OK && pDb->bReadonly==0 ){
      rc = lsmCheckpointLoad(pDb, 0);
      if( rc==LSM_OK ){
        lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot));
        lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot));
      }
    }

    lsmFree(pDb->pEnv, zFull);
    assertRwclientLockValue(pDb);
  }

  /* On success, a connection has a shared-memory header if and only if it
  ** is not a read-only connection.  */
  assert( pDb->bReadonly==0 || pDb->bReadonly==1 );
  assert( rc!=LSM_OK || (pDb->pShmhdr==0)==(pDb->bReadonly==1) );

  return rc;
}
212 
/*
** Close database handle pDb and free all resources it holds. Passing NULL
** is a harmless no-op that returns LSM_OK.
**
** If the handle still has open cursors or an open transaction, nothing is
** released and LSM_MISUSE is returned.
*/
int lsm_close(lsm_db *pDb){
  if( pDb==0 ) return LSM_OK;

  assert_db_state(pDb);
  if( pDb->pCsr || pDb->nTransOpen ){
    return LSM_MISUSE_BKPT;
  }

  /* Discard cached cursors and the client snapshot. */
  lsmMCursorFreeCache(pDb);
  lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
  pDb->pClient = 0;

  assertRwclientLockValue(pDb);

  /* Disconnect from the database, then close the log and file-system. */
  lsmDbDatabaseRelease(pDb);
  lsmLogClose(pDb);
  lsmFsClose(pDb->pFS);
  /* assert( pDb->mLock==0 ); */

  /* Invoke any destructors registered for the compression or
  ** compression factory callbacks.  */
  if( pDb->factory.xFree ){
    pDb->factory.xFree(pDb->factory.pCtx);
  }
  if( pDb->compress.xFree ){
    pDb->compress.xFree(pDb->compress.pCtx);
  }

  /* Finally, free the arrays owned by the handle and the handle itself. */
  lsmFree(pDb->pEnv, pDb->rollback.aArray);
  lsmFree(pDb->pEnv, pDb->aTrans);
  lsmFree(pDb->pEnv, pDb->apShm);
  lsmFree(pDb->pEnv, pDb);

  return LSM_OK;
}
244 
/*
** Query and/or modify configuration parameter eParam of database handle
** pDb. A single pointer argument follows eParam:
**
**   * For the integer parameters it is an (int *). If the value it points
**     to is acceptable for the parameter, the parameter is updated. In all
**     cases the final value of the parameter is written back through the
**     pointer before returning.
**
**   * For LSM_CONFIG_SET_COMPRESSION and LSM_CONFIG_GET_COMPRESSION it is
**     an (lsm_compress *), and for LSM_CONFIG_SET_COMPRESSION_FACTORY an
**     (lsm_compress_factory *).
**
** LSM_MISUSE is returned if eParam is not a recognized parameter.
*/
int lsm_config(lsm_db *pDb, int eParam, ...){
  va_list ap;
  int rc = LSM_OK;

  va_start(ap, eParam);
  switch( eParam ){

    case LSM_CONFIG_AUTOFLUSH: {
      /* Read and written in KB, stored internally in bytes. */
      int *pArg = va_arg(ap, int *);
      int nKB = *pArg;
      if( nKB>=0 && nKB<=(1024*1024) ){
        pDb->nTreeLimit = nKB*1024;
      }
      *pArg = (pDb->nTreeLimit / 1024);
      break;
    }

    case LSM_CONFIG_AUTOWORK: {
      /* Negative values query without modifying. */
      int *pArg = va_arg(ap, int *);
      int iNew = *pArg;
      if( iNew>=0 ){
        pDb->bAutowork = iNew;
      }
      *pArg = pDb->bAutowork;
      break;
    }

    case LSM_CONFIG_AUTOCHECKPOINT: {
      /* Read and written in KB, but lsm_db.nAutockpt (like all internal
      ** processing) is in bytes.  */
      int *pArg = va_arg(ap, int *);
      int nKB = *pArg;
      if( nKB>=0 ){
        pDb->nAutockpt = (i64)nKB * 1024;
      }
      *pArg = (int)(pDb->nAutockpt / 1024);
      break;
    }

    case LSM_CONFIG_PAGE_SIZE: {
      int *pArg = va_arg(ap, int *);
      if( pDb->pDatabase ){
        /* Read-only once lsm_open() has been called: report the page-size
        ** according to the FileSystem object.  */
        *pArg = lsmFsPageSize(pDb->pFS);
      }else{
        /* Accept powers of two between 256 and 65536 only. Any other value
        ** leaves the setting unchanged and reads it back.  */
        int nPgsz = *pArg;
        if( nPgsz>=256 && nPgsz<=65536 && ((nPgsz-1) & nPgsz)==0 ){
          pDb->nDfltPgsz = nPgsz;
        }else{
          *pArg = pDb->nDfltPgsz;
        }
      }
      break;
    }

    case LSM_CONFIG_BLOCK_SIZE: {
      /* Read and written in KB, stored internally in bytes. */
      int *pArg = va_arg(ap, int *);
      if( pDb->pDatabase ){
        /* Read-only once lsm_open() has been called: report the block-size
        ** in KB according to the FileSystem object.  */
        *pArg = lsmFsBlockSize(pDb->pFS) / 1024;
      }else{
        /* Accept powers of two between 64 and 65536 (KB) only. */
        int nKB = *pArg;
        if( nKB>=64 && nKB<=65536 && ((nKB-1) & nKB)==0 ){
          pDb->nDfltBlksz = nKB * 1024;
        }else{
          *pArg = pDb->nDfltBlksz / 1024;
        }
      }
      break;
    }

    case LSM_CONFIG_SAFETY: {
      /* Valid settings are 0, 1 and 2. */
      int *pArg = va_arg(ap, int *);
      int eNew = *pArg;
      if( eNew>=0 && eNew<=2 ){
        pDb->eSafety = eNew;
      }
      *pArg = pDb->eSafety;
      break;
    }

    case LSM_CONFIG_MMAP: {
      /* May only be modified while the handle holds no reader slot. */
      int *pArg = va_arg(ap, int *);
      if( pDb->iReader<0 && *pArg>=0 ){
        pDb->iMmap = *pArg;
        rc = lsmFsConfigure(pDb);
      }
      *pArg = pDb->iMmap;
      break;
    }

    case LSM_CONFIG_USE_LOG: {
      /* May only be set to 0 or 1, and only outside of a transaction. */
      int *pArg = va_arg(ap, int *);
      int bNew = *pArg;
      if( pDb->nTransOpen==0 && (bNew==0 || bNew==1) ){
        pDb->bUseLog = bNew;
      }
      *pArg = pDb->bUseLog;
      break;
    }

    case LSM_CONFIG_AUTOMERGE: {
      /* Values of 1 or less query without modifying. */
      int *pArg = va_arg(ap, int *);
      int nNew = *pArg;
      if( nNew>1 ){
        pDb->nMerge = nNew;
      }
      *pArg = pDb->nMerge;
      break;
    }

    case LSM_CONFIG_MAX_FREELIST: {
      int *pArg = va_arg(ap, int *);
      int nNew = *pArg;
      if( nNew>=2 && nNew<=LSM_MAX_FREELIST_ENTRIES ){
        pDb->nMaxFreelist = nNew;
      }
      *pArg = pDb->nMaxFreelist;
      break;
    }

    case LSM_CONFIG_MULTIPLE_PROCESSES: {
      int *pArg = va_arg(ap, int *);
      if( pDb->pDatabase ){
        /* Read-only once lsm_open() has been called: report whether or not
        ** this connection is currently in multi-process mode.  */
        *pArg = lsmDbMultiProc(pDb);
      }else{
        /* Normalize the argument to 0 or 1, store it and write it back. */
        int bMulti = (*pArg!=0);
        *pArg = bMulti;
        pDb->bMultiProc = bMulti;
      }
      break;
    }

    case LSM_CONFIG_READONLY: {
      /* Read-only once lsm_open() has been called. Before that, any
      ** non-negative value is normalized to 0 or 1 and stored.  */
      int *pArg = va_arg(ap, int *);
      if( pDb->pDatabase==0 && *pArg>=0 ){
        pDb->bReadonly = (*pArg!=0);
      }
      *pArg = pDb->bReadonly;
      break;
    }

    case LSM_CONFIG_SET_COMPRESSION: {
      lsm_compress *pNew = va_arg(ap, lsm_compress *);
      if( pDb->iReader>=0 && pDb->bInFactory==0 ){
        /* May not change compression schemes with an open transaction */
        rc = LSM_MISUSE_BKPT;
      }else{
        /* Invoke any destructor belonging to the current compression. */
        if( pDb->compress.xFree ){
          pDb->compress.xFree(pDb->compress.pCtx);
        }

        /* A structure with no xBound method means "no compression". */
        if( pNew->xBound ){
          memcpy(&pDb->compress, pNew, sizeof(lsm_compress));
        }else{
          memset(&pDb->compress, 0, sizeof(lsm_compress));
          pDb->compress.iId = LSM_COMPRESSION_NONE;
        }
        rc = lsmFsConfigure(pDb);
      }
      break;
    }

    case LSM_CONFIG_SET_COMPRESSION_FACTORY: {
      lsm_compress_factory *pNew = va_arg(ap, lsm_compress_factory *);
      /* Invoke any destructor belonging to the current factory. */
      if( pDb->factory.xFree ){
        pDb->factory.xFree(pDb->factory.pCtx);
      }
      memcpy(&pDb->factory, pNew, sizeof(lsm_compress_factory));
      break;
    }

    case LSM_CONFIG_GET_COMPRESSION: {
      /* Copy the current compression structure into the caller's buffer. */
      lsm_compress *pOut = va_arg(ap, lsm_compress *);
      memcpy(pOut, &pDb->compress, sizeof(lsm_compress));
      break;
    }

    default:
      rc = LSM_MISUSE;
      break;
  }
  va_end(ap);

  return rc;
}
433 
/*
** Append a description of segment pSeg to string pStr. The text appended
** is prefix zPre followed by "{iFirst iLastPg iRoot nSize}", where the four
** values are the corresponding fields of pSeg.
*/
void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
  lsmStringAppendf(
      pStr, "%s{%d %d %d %d}",
      zPre, pSeg->iFirst, pSeg->iLastPg, pSeg->iRoot, pSeg->nSize
  );
}
439 
/*
** Ensure that database handle pDb has a worker snapshot. If it does not
** already have one, lsmBeginWork() is called to obtain it and *pbUnlock is
** set to 1 so that the caller knows to pass it to infoFreeWorker() later.
** *pbUnlock must be 0 on entry.
**
** If pp is not NULL, *pp is set to the worker snapshot. LSM_OK is returned
** on success, or the error code returned by lsmBeginWork() otherwise.
*/
static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){
  assert( *pbUnlock==0 );

  if( pDb->pWorker==0 ){
    int rc = lsmBeginWork(pDb);
    if( rc!=LSM_OK ) return rc;
    *pbUnlock = 1;
  }

  if( pp ){
    *pp = pDb->pWorker;
  }
  return LSM_OK;
}
452 
/*
** Counterpart to infoGetWorker(). If bUnlock is true, release the worker
** snapshot by calling lsmFinishWork(). Otherwise, this is a no-op.
*/
static void infoFreeWorker(lsm_db *pDb, int bUnlock){
  /* Passed by reference to lsmFinishWork(); the final value is discarded.
  ** NOTE(review): presumably starting out with a non-OK code (LSM_BUSY)
  ** makes lsmFinishWork() release the snapshot without doing any real
  ** work - confirm against lsmFinishWork().  */
  int rcIgnored = LSM_BUSY;

  if( !bUnlock ) return;
  lsmFinishWork(pDb, 0, &rcIgnored);
}
459 
/*
** Format the level/segment structure of the worker snapshot as text. Each
** level becomes one "{iAge {lhs} {rhs}...}" element, with elements
** separated by a single space.
**
** Once the worker snapshot has been obtained, *pzOut is always set to the
** string buffer (even if LSM_NOMEM is then returned). If the snapshot
** cannot be obtained, an error code is returned and *pzOut is not written.
*/
int lsmStructList(
  lsm_db *pDb,                    /* Database handle */
  char **pzOut                    /* OUT: Nul-terminated string (tcl list) */
){
  Snapshot *pWorker;              /* Worker snapshot */
  Level *pTop;                    /* Top level of snapshot to report on */
  Level *pLvl;                    /* Used to iterate through levels */
  LsmString str;                  /* Output accumulates here */
  int bUnlock = 0;                /* True to release worker before return */
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  /* Format the contents of the snapshot as text */
  pTop = lsmDbSnapshotLevel(pWorker);
  lsmStringInit(&str, pDb->pEnv);
  for(pLvl=pTop; pLvl; pLvl=pLvl->pNext){
    int iRhs;
    const char *zSep = (str.n ? " " : "");
    lsmStringAppendf(&str, "%s{%d", zSep, (int)pLvl->iAge);
    lsmAppendSegmentList(&str, " ", &pLvl->lhs);
    for(iRhs=0; iRhs<pLvl->nRight; iRhs++){
      lsmAppendSegmentList(&str, " ", &pLvl->aRhs[iRhs]);
    }
    lsmStringAppend(&str, "}", 1);
  }

  /* A negative length indicates that an allocation failed along the way */
  rc = (str.n<0) ? LSM_NOMEM : LSM_OK;

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  *pzOut = str.z;
  return rc;
}
494 
/*
** Callback passed to lsmWalkFreelist() by lsmInfoFreelist(). pCtx points to
** the LsmString being built. Each invocation appends one "{iBlk iSnapshot}"
** element, preceded by a space unless the string is still empty. Always
** returns 0.
*/
static int infoFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  LsmString *pStr = (LsmString *)pCtx;
  const char *zSep = (pStr->n ? " " : "");
  lsmStringAppendf(pStr, "%s{%d %lld}", zSep, iBlk, iSnapshot);
  return 0;
}
500 
/*
** Format the free-block list as text: a space separated list of
** "{iBlk iSnapshot}" elements, one for each entry visited by
** lsmWalkFreelist().
**
** If successful, LSM_OK is returned and *pzOut is set to point to the
** nul-terminated result (NULL if the list is empty). It is the
** responsibility of the caller to free the buffer. If an error occurs -
** including an out-of-memory condition while building the string - an LSM
** error code is returned and *pzOut is left unmodified.
*/
int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
  int bUnlock = 0;                /* True to release worker before return */
  LsmString s;                    /* Output accumulates here */
  int rc;

  /* Make sure the handle holds a worker snapshot. The snapshot pointer
  ** itself is not required here.  */
  rc = infoGetWorker(pDb, 0, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  rc = lsmWalkFreelist(pDb, 0, infoFreelistCb, &s);

  /* The callback cannot report allocation failures itself. As in
  ** lsmStructList(), a negative string length means one occurred.  */
  if( rc==LSM_OK && s.n<0 ){
    rc = LSM_NOMEM;
  }

  if( rc==LSM_OK ){
    *pzOut = s.z;
  }else{
    lsmStringClear(&s);
  }

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  return rc;
}
523 
/*
** Implementation of lsm_info(LSM_INFO_TREE_SIZE). Set *pnNewKB to the size
** of the current in-memory tree and *pnOldKB to the size of the "old"
** in-memory tree (or to 0 if there is none), both in KB, rounded up.
**
** The values are read from the shared-memory header. If the handle has no
** shared-memory header - it has not been opened yet, or it is a read-only
** connection - LSM_MISUSE is returned and the outputs are not written.
** Otherwise LSM_OK is returned.
*/
static int infoTreeSize(lsm_db *db, int *pnOldKB, int *pnNewKB){
  ShmHeader *pShm = db->pShmhdr;
  TreeHeader *p;

  /* lsm_open() leaves pShmhdr set to NULL for read-only connections, and
  ** it is also NULL before lsm_open() is called. There is nothing to read
  ** in either case.  */
  if( pShm==0 ){
    return LSM_MISUSE_BKPT;
  }
  p = &pShm->hdr1;

  /* The following code suffers from two race conditions, as it accesses and
  ** trusts the contents of shared memory without verifying checksums:
  **
  **   * The two values read - TreeHeader.root.nByte and oldroot.nByte - are
  **     32-bit fields. It is assumed that reading from one of these
  **     is atomic - that it is not possible to read a partially written
  **     garbage value. However the two values may be mutually inconsistent.
  **
  **   * TreeHeader.iLogOff is a 64-bit value. And lsmCheckpointLogOffset()
  **     reads a 64-bit value from a snapshot stored in shared memory. It
  **     is assumed that in each case it is possible to read a partially
  **     written garbage value. If this occurs, then the value returned
  **     for the size of the "old" tree may reflect the size of an "old"
  **     tree that was recently flushed to disk.
  **
  ** Given the context in which this function is called (as a result of an
  ** lsm_info(LSM_INFO_TREE_SIZE) request), neither of these are considered to
  ** be problems.
  */
  *pnNewKB = ((int)p->root.nByte + 1023) / 1024;
  if( p->iOldShmid ){
    if( p->iOldLog==lsmCheckpointLogOffset(pShm->aSnap1) ){
      /* The old tree's log offset matches that of the snapshot in shared
      ** memory, so the old tree is reported as having zero size.  */
      *pnOldKB = 0;
    }else{
      *pnOldKB = ((int)p->oldroot.nByte + 1023) / 1024;
    }
  }else{
    *pnOldKB = 0;
  }

  return LSM_OK;
}
560 
/*
** Retrieve information about database handle pDb. Parameter eParam selects
** the information requested and determines the number and types of the
** trailing arguments:
**
**   LSM_INFO_NWRITE, LSM_INFO_NREAD, LSM_INFO_CHECKPOINT_SIZE:
**     (int *) output.
**
**   LSM_INFO_DB_STRUCTURE, LSM_INFO_LOG_STRUCTURE, LSM_INFO_FREELIST:
**     (char **) output string.
**
**   LSM_INFO_ARRAY_STRUCTURE, LSM_INFO_ARRAY_PAGES,
**   LSM_INFO_PAGE_HEX_DUMP, LSM_INFO_PAGE_ASCII_DUMP:
**     (LsmPgno) page number followed by (char **) output string.
**
**   LSM_INFO_TREE_SIZE:
**     (int *) old tree size followed by (int *) new tree size.
**
**   LSM_INFO_COMPRESSION_ID:
**     (unsigned int *) output.
**
** LSM_MISUSE is returned if eParam is not one of the above.
*/
int lsm_info(lsm_db *pDb, int eParam, ...){
  va_list ap;
  int rc = LSM_OK;

  va_start(ap, eParam);
  switch( eParam ){

    case LSM_INFO_NWRITE: {
      int *pnOut = va_arg(ap, int *);
      *pnOut = lsmFsNWrite(pDb->pFS);
      break;
    }

    case LSM_INFO_NREAD: {
      int *pnOut = va_arg(ap, int *);
      *pnOut = lsmFsNRead(pDb->pFS);
      break;
    }

    case LSM_INFO_DB_STRUCTURE: {
      char **pzOut = va_arg(ap, char **);
      rc = lsmStructList(pDb, pzOut);
      break;
    }

    case LSM_INFO_ARRAY_STRUCTURE: {
      LsmPgno iPg = va_arg(ap, LsmPgno);
      char **pzOut = va_arg(ap, char **);
      rc = lsmInfoArrayStructure(pDb, 0, iPg, pzOut);
      break;
    }

    case LSM_INFO_ARRAY_PAGES: {
      LsmPgno iPg = va_arg(ap, LsmPgno);
      char **pzOut = va_arg(ap, char **);
      rc = lsmInfoArrayPages(pDb, iPg, pzOut);
      break;
    }

    case LSM_INFO_PAGE_HEX_DUMP:
    case LSM_INFO_PAGE_ASCII_DUMP: {
      LsmPgno iPg = va_arg(ap, LsmPgno);
      char **pzOut = va_arg(ap, char **);
      int bUnlock = 0;

      /* The page is dumped while holding the worker snapshot. */
      rc = infoGetWorker(pDb, 0, &bUnlock);
      if( rc==LSM_OK ){
        rc = lsmInfoPageDump(
            pDb, iPg, (eParam==LSM_INFO_PAGE_HEX_DUMP), pzOut
        );
      }
      infoFreeWorker(pDb, bUnlock);
      break;
    }

    case LSM_INFO_LOG_STRUCTURE: {
      char **pzOut = va_arg(ap, char **);
      rc = lsmInfoLogStructure(pDb, pzOut);
      break;
    }

    case LSM_INFO_FREELIST: {
      char **pzOut = va_arg(ap, char **);
      rc = lsmInfoFreelist(pDb, pzOut);
      break;
    }

    case LSM_INFO_CHECKPOINT_SIZE: {
      int *pnOut = va_arg(ap, int *);
      rc = lsmCheckpointSize(pDb, pnOut);
      break;
    }

    case LSM_INFO_TREE_SIZE: {
      int *pnOldKB = va_arg(ap, int *);
      int *pnNewKB = va_arg(ap, int *);
      rc = infoTreeSize(pDb, pnOldKB, pnNewKB);
      break;
    }

    case LSM_INFO_COMPRESSION_ID: {
      /* Use the id stored in the client snapshot if there is one. */
      unsigned int *piId = va_arg(ap, unsigned int *);
      if( pDb->pClient==0 ){
        rc = lsmInfoCompressionId(pDb, piId);
      }else{
        *piId = pDb->pClient->iCmpId;
      }
      break;
    }

    default:
      rc = LSM_MISUSE;
      break;
  }
  va_end(ap);

  return rc;
}
656 
/*
** Common implementation of lsm_insert(), lsm_delete() and
** lsm_delete_range().
**
** If bDeleteRange is true, the keys between (pKey/nKey) and (pVal/nVal) are
** deleted. Otherwise, if nVal is non-negative, value (pVal/nVal) is written
** for key (pKey/nKey), and if nVal is negative the key is deleted.
**
** If no transaction is open when this function is called, one is opened
** and then committed (or, following an error, rolled back) before
** returning. LSM_OK is returned on success, or an LSM error code otherwise.
*/
static int doWriteOp(
  lsm_db *pDb,
  int bDeleteRange,
  const void *pKey, int nKey,     /* Key to write or delete */
  const void *pVal, int nVal      /* Value to write. Or nVal==-1 for a delete */
){
  int rc = LSM_OK;                /* Return code */
  int bAutoTrans = 0;             /* True if a transaction was opened here */

  /* Open an implicit transaction if the caller has not opened one. */
  if( pDb->nTransOpen==0 ){
    bAutoTrans = 1;
    rc = lsm_begin(pDb, 1);
  }

  /* Write the operation to the log. */
  if( rc==LSM_OK ){
    int eType;
    if( bDeleteRange ){
      eType = LSM_DRANGE;
    }else if( nVal>=0 ){
      eType = LSM_WRITE;
    }else{
      eType = LSM_DELETE;
    }
    rc = lsmLogWrite(pDb, eType, (void *)pKey, nKey, (void *)pVal, nVal);
  }

  /* This is done whether or not the log write succeeded. */
  lsmSortedSaveTreeCursors(pDb);

  if( rc==LSM_OK ){
    int nPgsz = lsmFsPageSize(pDb->pFS);
    int nUnit = LSM_AUTOWORK_QUANT * nPgsz;   /* Bytes per unit of auto-work */
    int nSzBefore;                /* Tree size before the update */
    int nSzAfter;                 /* Tree size after the update */
    int nUnitCrossed;             /* Multiples of nUnit crossed by update */

    /* The unit may not exceed the tree size limit, but is never reduced
    ** to less than a single page.  */
    if( nUnit>pDb->nTreeLimit ){
      nUnit = LSM_MAX(pDb->nTreeLimit, nPgsz);
    }

    /* Update the in-memory tree, noting its size before and after. */
    nSzBefore = lsmTreeSize(pDb);
    if( bDeleteRange ){
      rc = lsmTreeDelete(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
    }else{
      rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
    }
    nSzAfter = lsmTreeSize(pDb);

    /* If auto-work is enabled and the tree size has crossed one or more
    ** multiples of nUnit, do a proportional amount of work.  */
    nUnitCrossed = (nSzAfter/nUnit) - (nSzBefore/nUnit);
    if( rc==LSM_OK && pDb->bAutowork && nUnitCrossed!=0 ){
      rc = lsmSortedAutoWork(pDb, nUnitCrossed * LSM_AUTOWORK_QUANT);
    }
  }

  /* If a transaction was opened at the start of this function, commit it.
  ** Or, if an error has occurred, roll it back.  */
  if( bAutoTrans ){
    if( rc!=LSM_OK ){
      lsm_rollback(pDb, 0);
    }else{
      rc = lsm_commit(pDb, 0);
    }
  }

  return rc;
}
715 
716 /*
717 ** Write a new value into the database.
718 */
int lsm_insert(
  lsm_db *db,                     /* Database connection */
  const void *pKey, int nKey,     /* Key to write or delete */
  const void *pVal, int nVal      /* Value to write. Or nVal==-1 for a delete */
){
  /* A plain (non-range) write operation. */
  int rc = doWriteOp(db, 0, pKey, nKey, pVal, nVal);
  return rc;
}
726 
727 /*
728 ** Delete a value from the database.
729 */
int lsm_delete(lsm_db *db, const void *pKey, int nKey){
  /* A delete is a non-range write with no value and a length of -1. */
  int rc = doWriteOp(db, 0, pKey, nKey, 0, -1);
  return rc;
}
733 
734 /*
735 ** Delete a range of database keys.
736 */
int lsm_delete_range(
  lsm_db *db,                     /* Database handle */
  const void *pKey1, int nKey1,   /* Lower bound of range to delete */
  const void *pKey2, int nKey2    /* Upper bound of range to delete */
){
  /* Unless the lower bound is strictly smaller than the upper bound
  ** according to the configured comparison function, the range is empty.
  ** This is a no-op that returns LSM_OK.  */
  if( db->xCmp((void *)pKey1, nKey1, (void *)pKey2, nKey2)>=0 ){
    return LSM_OK;
  }
  return doWriteOp(db, 1, pKey1, nKey1, pKey2, nKey2);
}
748 
749 /*
750 ** Open a new cursor handle.
751 **
752 ** If there are currently no other open cursor handles, and no open write
753 ** transaction, open a read transaction here.
754 */
int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr){
  int rc = LSM_OK;                /* Return code */
  MultiCursor *pCsr = 0;          /* New cursor object */

  /* Open a read transaction if one is not already open. A handle with no
  ** shared-memory header is a read-only connection, which uses a read-only
  ** transaction instead.  */
  assert_db_state(pDb);

  if( pDb->pShmhdr==0 ){
    assert( pDb->bReadonly );
    rc = lsmBeginRoTrans(pDb);
  }else if( pDb->iReader<0 ){
    rc = lsmBeginReadTrans(pDb);
  }

  /* Allocate the multi-cursor. */
  if( rc==LSM_OK ){
    rc = lsmMCursorNew(pDb, &pCsr);
  }

  /* If an error has occurred, delete any partially allocated cursor and
  ** make sure the output is set to NULL rather than to a pointer to the
  ** object just closed. If this means there are no open cursors, release
  ** the client snapshot.  */
  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr, 0);
    pCsr = 0;
    dbReleaseClientSnapshot(pDb);
  }

  assert_db_state(pDb);
  *ppCsr = (lsm_cursor *)pCsr;
  return rc;
}
786 
787 /*
788 ** Close a cursor opened using lsm_csr_open().
789 */
int lsm_csr_close(lsm_cursor *p){
  MultiCursor *pCsr = (MultiCursor *)p;

  /* Closing a NULL cursor is a no-op. */
  if( pCsr ){
    /* The database handle must be looked up before the cursor is closed. */
    lsm_db *pDb = lsmMCursorDb(pCsr);
    assert_db_state(pDb);
    lsmMCursorClose(pCsr, 1);

    /* If that was the last cursor and no transaction is open, the client
    ** snapshot is no longer required.  */
    dbReleaseClientSnapshot(pDb);
    assert_db_state(pDb);
  }
  return LSM_OK;
}
800 
801 /*
802 ** Attempt to seek the cursor to the database entry specified by pKey/nKey.
803 ** If an error occurs (e.g. an OOM or IO error), return an LSM error code.
804 ** Otherwise, return LSM_OK.
805 */
int lsm_csr_seek(lsm_cursor *pCsr, const void *pKey, int nKey, int eSeek){
  /* An lsm_cursor handle is really a MultiCursor object. */
  MultiCursor *pMulti = (MultiCursor *)pCsr;
  return lsmMCursorSeek(pMulti, 0, (void *)pKey, nKey, eSeek);
}
809 
/*
** Advance the cursor using lsmMCursorNext() and return its result.
*/
int lsm_csr_next(lsm_cursor *pCsr){
  MultiCursor *pMulti = (MultiCursor *)pCsr;
  return lsmMCursorNext(pMulti);
}
813 
/*
** Step the cursor backwards using lsmMCursorPrev() and return its result.
*/
int lsm_csr_prev(lsm_cursor *pCsr){
  MultiCursor *pMulti = (MultiCursor *)pCsr;
  return lsmMCursorPrev(pMulti);
}
817 
/*
** Position the cursor using lsmMCursorFirst() and return its result.
*/
int lsm_csr_first(lsm_cursor *pCsr){
  MultiCursor *pMulti = (MultiCursor *)pCsr;
  return lsmMCursorFirst(pMulti);
}
821 
/*
** Position the cursor using lsmMCursorLast() and return its result.
*/
int lsm_csr_last(lsm_cursor *pCsr){
  MultiCursor *pMulti = (MultiCursor *)pCsr;
  return lsmMCursorLast(pMulti);
}
825 
/*
** Return the result of lsmMCursorValid() for the cursor.
*/
int lsm_csr_valid(lsm_cursor *pCsr){
  MultiCursor *pMulti = (MultiCursor *)pCsr;
  return lsmMCursorValid(pMulti);
}
829 
/*
** Retrieve the key of the current cursor entry via lsmMCursorKey(), which
** writes the key pointer and size to *ppKey and *pnKey. Its return code is
** passed back to the caller.
*/
int lsm_csr_key(lsm_cursor *pCsr, const void **ppKey, int *pnKey){
  MultiCursor *pMulti = (MultiCursor *)pCsr;
  return lsmMCursorKey(pMulti, (void **)ppKey, pnKey);
}
833 
/*
** Retrieve the value of the current cursor entry via lsmMCursorValue(),
** which writes the value pointer and size to *ppVal and *pnVal. Its return
** code is passed back to the caller.
*/
int lsm_csr_value(lsm_cursor *pCsr, const void **ppVal, int *pnVal){
  MultiCursor *pMulti = (MultiCursor *)pCsr;
  return lsmMCursorValue(pMulti, (void **)ppVal, pnVal);
}
837 
/*
** Register log callback xLog with database handle pDb, replacing any
** existing callback. pCtx is stored alongside it and passed as the first
** argument each time the callback is invoked by lsmLogMessage(). Setting
** xLog to NULL disables logging.
*/
void lsm_config_log(
  lsm_db *pDb,
  void (*xLog)(void *, int, const char *),
  void *pCtx
){
  pDb->pLogCtx = pCtx;
  pDb->xLog = xLog;
}
846 
/*
** Store work-hook callback xWork and its context pointer pCtx in database
** handle pDb, replacing any values stored by an earlier call.
*/
void lsm_config_work_hook(
  lsm_db *pDb,
  void (*xWork)(lsm_db *, void *),
  void *pCtx
){
  pDb->pWorkCtx = pCtx;
  pDb->xWork = xWork;
}
855 
/*
** Format a printf-style message and pass it, along with code rc, to the
** log callback configured for database handle pDb. If no log callback is
** configured, this function is a no-op.
*/
void lsmLogMessage(lsm_db *pDb, int rc, const char *zFormat, ...){
  LsmString msg;                  /* The formatted message */
  va_list ap1;                    /* Both argument lists are required ... */
  va_list ap2;                    /* ... by lsmStringVAppendf() */

  if( pDb->xLog==0 ) return;

  lsmStringInit(&msg, pDb->pEnv);
  va_start(ap1, zFormat);
  va_start(ap2, zFormat);
  lsmStringVAppendf(&msg, zFormat, ap1, ap2);
  va_end(ap1);
  va_end(ap2);

  /* Deliver the message, then release the buffer. */
  pDb->xLog(pDb->pLogCtx, rc, msg.z);
  lsmStringClear(&msg);
}
870 
/*
** Open transactions until the nesting depth of database handle pDb is
** iLevel. If iLevel is negative, open exactly one more (nested)
** transaction. If iLevel or more transactions are already open, no
** transaction is opened.
**
** LSM_READONLY is returned, and nothing is modified, if the handle is
** read-only. Otherwise LSM_OK is returned on success or an LSM error code
** if an error occurs.
*/
int lsm_begin(lsm_db *pDb, int iLevel){
  int rc;                         /* Return code */
  int iTrans;                     /* Iterator for newly opened levels */

  assert_db_state( pDb );
  rc = (pDb->bReadonly ? LSM_READONLY : LSM_OK);

  /* A value less than zero means open one more transaction. */
  if( iLevel<0 ){
    iLevel = pDb->nTransOpen + 1;
  }

  /* Nothing to do if the requested depth has already been reached. */
  if( iLevel<=pDb->nTransOpen ) return rc;

  /* Extend the pDb->aTrans[] array if required. The new entries are
  ** zeroed.  */
  if( rc==LSM_OK && pDb->nTransAlloc<iLevel ){
    int nSlot = iLevel+1;
    int nByte = sizeof(TransMark) * nSlot;
    TransMark *aNew = (TransMark *)lsmRealloc(pDb->pEnv, pDb->aTrans, nByte);
    if( aNew ){
      nByte = sizeof(TransMark) * (nSlot - pDb->nTransAlloc);
      memset(&aNew[pDb->nTransAlloc], 0, nByte);
      pDb->nTransAlloc = nSlot;
      pDb->aTrans = aNew;
    }else{
      rc = LSM_NOMEM;
    }
  }

  /* Opening the outermost transaction opens the write transaction. */
  if( rc==LSM_OK && pDb->nTransOpen==0 ){
    rc = lsmBeginWriteTrans(pDb);
  }

  /* Record the current state of the in-memory tree and the log for each
  ** newly opened level, so that lsm_rollback() can restore it.  */
  if( rc==LSM_OK ){
    for(iTrans=pDb->nTransOpen; iTrans<iLevel; iTrans++){
      TransMark *pMark = &pDb->aTrans[iTrans];
      lsmTreeMark(pDb, &pMark->tree);
      lsmLogTell(pDb, &pMark->log);
    }
    pDb->nTransOpen = iLevel;
  }

  return rc;
}
912 
/*
** Commit transactions until the nesting depth of database handle pDb is
** iLevel. If iLevel is negative, commit the innermost transaction only. If
** the nesting depth is already iLevel or less, no transaction is committed.
**
** Only when the outermost transaction is committed (iLevel==0) is anything
** written to the log. Return LSM_OK if successful, or an LSM error code
** otherwise.
*/
int lsm_commit(lsm_db *pDb, int iLevel){
  int rc = LSM_OK;

  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ){
    iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
  }

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){
      int rcFinish;

      /* Commit the transaction to disk. The log is only synced if the
      ** safety level is LSM_SAFETY_FULL.  */
      rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }

      /* The write transaction is always finished, but an earlier error
      ** takes precedence over any error that this returns.  */
      rcFinish = lsmFinishWriteTrans(pDb, (rc==LSM_OK));
      if( rc==LSM_OK ) rc = rcFinish;
    }
    pDb->nTransOpen = iLevel;
  }

  dbReleaseClientSnapshot(pDb);
  return rc;
}
937 
/*
** Roll back transactions until the nesting depth of database handle pDb is
** iLevel, and then roll back the changes made within level iLevel itself
** (that transaction remains open). If iLevel is zero, all transactions are
** rolled back and closed. If iLevel is negative, the innermost transaction
** is rolled back and closed.
**
** This is a no-op if no transaction is open. LSM_OK is always returned.
*/
int lsm_rollback(lsm_db *pDb, int iLevel){
  assert_db_state( pDb );

  if( pDb->nTransOpen==0 ) return LSM_OK;

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ){
    iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
  }

  if( iLevel<=pDb->nTransOpen ){
    /* The mark for level N is stored in aTrans[N-1]. Rolling back to level
    ** zero uses the mark of the outermost transaction, but does not seek
    ** the log.  */
    int iMark = (iLevel>0 ? iLevel-1 : 0);
    TransMark *pMark = &pDb->aTrans[iMark];

    lsmTreeRollback(pDb, &pMark->tree);
    if( iLevel ){
      lsmLogSeek(pDb, &pMark->log);
    }
    pDb->nTransOpen = iLevel;
  }

  /* If the outermost transaction was just closed, finish the write
  ** transaction without committing.  */
  if( pDb->nTransOpen==0 ){
    lsmFinishWriteTrans(pDb, 0);
  }
  dbReleaseClientSnapshot(pDb);

  return LSM_OK;
}
961 
/*
** Set *piUsr to the user-version stored in the tree header of database
** handle pDb. A read transaction is opened for the duration of the call if
** the handle does not already have one.
**
** Return LSM_OK if successful. Otherwise return an LSM error code and leave
** *piUsr unmodified.
*/
int lsm_get_user_version(lsm_db *pDb, unsigned int *piUsr){
  int rc = LSM_OK;                /* Return code */

  /* Open a read transaction if one is not already open. A handle with no
  ** shared-memory header is a read-only connection, which uses a read-only
  ** transaction instead.  */
  assert_db_state(pDb);
  if( pDb->pShmhdr==0 ){
    assert( pDb->bReadonly );
    rc = lsmBeginRoTrans(pDb);
  }else if( pDb->iReader<0 ){
    rc = lsmBeginReadTrans(pDb);
  }

  /* Read the user-version from the handle's copy of the tree header. */
  if( rc==LSM_OK ){
    *piUsr = pDb->treehdr.iUsrVersion;
  }

  /* Close the read transaction again, unless a cursor or an open write
  ** transaction still requires it.  */
  dbReleaseClientSnapshot(pDb);
  assert_db_state(pDb);
  return rc;
}
983 
/*
** Set the user-version stored in the tree header of database handle pDb
** to iUsr.
**
** If no transaction is open when this function is called, one is opened
** and then committed (or, following an error, rolled back) before
** returning. Return LSM_OK if successful, or an LSM error code otherwise.
*/
int lsm_set_user_version(lsm_db *pDb, unsigned int iUsr){
  int rc = LSM_OK;                /* Return code */
  int bAutoTrans = 0;             /* True if a transaction was opened here */

  /* Open an implicit transaction if the caller has not opened one. */
  if( pDb->nTransOpen==0 ){
    bAutoTrans = 1;
    rc = lsm_begin(pDb, 1);
  }

  if( rc==LSM_OK ){
    pDb->treehdr.iUsrVersion = iUsr;
  }

  /* If a transaction was opened at the start of this function, commit it.
  ** Or, if an error has occurred, roll it back.  */
  if( bAutoTrans ){
    if( rc!=LSM_OK ){
      lsm_rollback(pDb, 0);
    }else{
      rc = lsm_commit(pDb, 0);
    }
  }

  return rc;
}
1009