1 // -V::512
2 
3 #include "iwkv_internal.h"
4 #include "iwconv.h"
5 #include <stdalign.h>
6 
7 static iwrc _dbcache_fill_lw(IWLCTX *lx);
8 static iwrc _dbcache_get(IWLCTX *lx);
9 static iwrc _dbcache_put_lw(IWLCTX *lx, SBLK *sblk);
10 static void _dbcache_remove_lw(IWLCTX *lx, SBLK *sblk);
11 static void _dbcache_update_lw(IWLCTX *lx, SBLK *sblk);
12 static void _dbcache_destroy_lw(IWDB db);
13 
14 #define _wnw_db_wl(db_) _api_db_wlock(db_)
15 
16 //-------------------------- GLOBALS
17 
18 #ifdef IW_TESTS
19 volatile int8_t iwkv_next_level = -1;
20 #endif
21 atomic_uint_fast64_t g_trigger;
22 
23 #define IWKV_IS_INTERNAL_RC(rc_) ((rc_) > _IWKV_ERROR_END && (rc_) < _IWKV_RC_END)
24 
25 //-------------------------- UTILS
26 
_to_effective_key(struct _IWDB * db,const IWKV_val * key,IWKV_val * okey,uint8_t nbuf[static IW_VNUMBUFSZ])27 IW_SOFT_INLINE iwrc _to_effective_key(
28   struct _IWDB *db, const IWKV_val *key, IWKV_val *okey,
29   uint8_t nbuf[static IW_VNUMBUFSZ]) {
30   static_assert(IW_VNUMBUFSZ >= sizeof(uint64_t), "IW_VNUMBUFSZ >= sizeof(uint64_t)");
31   iwdb_flags_t dbflg = db->dbflg;
32   // Keys compound will be processed at lower levels at `addkv` routines
33   okey->compound = key->compound;
34   if (dbflg & IWDB_VNUM64_KEYS) {
35     unsigned len;
36     if (key->size == 8) {
37       uint64_t llv;
38       memcpy(&llv, key->data, sizeof(llv));
39       IW_SETVNUMBUF64(len, nbuf, llv);
40       if (!len) {
41         return IW_ERROR_OVERFLOW;
42       }
43       okey->size = len;
44       okey->data = nbuf;
45     } else if (key->size == 4) {
46       uint32_t lv;
47       memcpy(&lv, key->data, sizeof(lv));
48       IW_SETVNUMBUF(len, nbuf, lv);
49       if (!len) {
50         return IW_ERROR_OVERFLOW;
51       }
52       okey->size = len;
53       okey->data = nbuf;
54     } else {
55       return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
56     }
57   } else {
58     okey->data = key->data;
59     okey->size = key->size;
60   }
61   return 0;
62 }
63 
64 // NOTE: at least `2*IW_VNUMBUFSZ` must be allocated for key->data
_unpack_effective_key(struct _IWDB * db,IWKV_val * key,bool no_move_key_data)65 static iwrc _unpack_effective_key(struct _IWDB *db, IWKV_val *key, bool no_move_key_data) {
66   iwdb_flags_t dbflg = db->dbflg;
67   uint8_t *data = key->data;
68   if (dbflg & IWDB_COMPOUND_KEYS) {
69     int step;
70     IW_READVNUMBUF64(key->data, key->compound, step);
71     if (step >= key->size) {
72       return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
73     }
74     data += step;
75     key->size -= step;
76     if (!no_move_key_data && !(dbflg & IWDB_VNUM64_KEYS)) {
77       memmove(key->data, data, key->size);
78     }
79   } else {
80     key->compound = 0;
81   }
82   if (dbflg & IWDB_VNUM64_KEYS) {
83     int64_t llv;
84     char nbuf[IW_VNUMBUFSZ];
85     if (key->size > IW_VNUMBUFSZ) {
86       return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
87     }
88     memcpy(nbuf, data, key->size);
89     IW_READVNUMBUF64_2(nbuf, llv);
90     memcpy(key->data, &llv, sizeof(llv));
91     key->size = sizeof(llv);
92   }
93   return 0;
94 }
95 
_cmp_keys_prefix(iwdb_flags_t dbflg,const void * v1,int v1len,const IWKV_val * key)96 static int _cmp_keys_prefix(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
97   int ret;
98   if (dbflg & IWDB_COMPOUND_KEYS) {
99     // Compound keys mode
100     const char *u1 = v1;
101     const char *u2 = key->data;
102     int step, v2len = (int) key->size;
103     int64_t c1, c2 = key->compound;
104     IW_READVNUMBUF64(v1, c1, step);
105     v1len -= step;
106     u1 += step;
107     if (v1len < 1) {
108       // Inconsistent data?
109       return v2len - v1len;
110     }
111     if (dbflg & IWDB_VNUM64_KEYS) {
112       if ((v2len != v1len) || (v2len > IW_VNUMBUFSZ) || (v1len > IW_VNUMBUFSZ)) {
113         return v2len - v1len;
114       }
115       int64_t n1, n2;
116       char vbuf[IW_VNUMBUFSZ];
117       memcpy(vbuf, u1, v1len);
118       IW_READVNUMBUF64_2(vbuf, n1);
119       memcpy(vbuf, u2, v2len);
120       IW_READVNUMBUF64_2(vbuf, n2);
121       ret = n1 > n2 ? -1 : n1 < n2 ? 1 : 0;
122       if (ret == 0) {
123         ret = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
124       }
125     } else if (dbflg & IWDB_REALNUM_KEYS) {
126       ret = iwafcmp(u2, v2len, u1, v1len);
127       if (ret == 0) {
128         ret = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
129       }
130     } else {
131       IW_CMP2(ret, u2, v2len, u1, v1len);
132     }
133     return ret;
134   } else {
135     int v2len = (int) key->size;
136     const void *v2 = key->data;
137     if (dbflg & IWDB_VNUM64_KEYS) {
138       if ((v2len != v1len) || (v2len > IW_VNUMBUFSZ) || (v1len > IW_VNUMBUFSZ)) {
139         return v2len - v1len;
140       }
141       int64_t n1, n2;
142       char vbuf[IW_VNUMBUFSZ];
143       memcpy(vbuf, v1, v1len);
144       IW_READVNUMBUF64_2(vbuf, n1);
145       memcpy(vbuf, v2, v2len);
146       IW_READVNUMBUF64_2(vbuf, n2);
147       return n1 > n2 ? -1 : n1 < n2 ? 1 : 0;
148     } else if (dbflg & IWDB_REALNUM_KEYS) {
149       return iwafcmp(v2, v2len, v1, v1len);
150     } else {
151       IW_CMP2(ret, v2, v2len, v1, v1len);
152       return ret;
153     }
154   }
155 }
156 
_cmp_keys(iwdb_flags_t dbflg,const void * v1,int v1len,const IWKV_val * key)157 IW_INLINE int _cmp_keys(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
158   int rv = _cmp_keys_prefix(dbflg, v1, v1len, key);
159   if ((rv == 0) && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
160     if (dbflg & IWDB_COMPOUND_KEYS) {
161       int step;
162       int64_t c1, c2 = key->compound;
163       IW_READVNUMBUF64(v1, c1, step);
164       v1len -= step;
165       if ((int) key->size == v1len) {
166         return c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
167       }
168     }
169     return (int) key->size - v1len;
170   } else {
171     return rv;
172   }
173 }
174 
_kv_val_dispose(IWKV_val * v)175 IW_INLINE void _kv_val_dispose(IWKV_val *v) {
176   if (v) {
177     free(v->data);
178     v->size = 0;
179     v->data = 0;
180   }
181 }
182 
_kv_dispose(IWKV_val * key,IWKV_val * val)183 IW_INLINE void _kv_dispose(IWKV_val *key, IWKV_val *val) {
184   _kv_val_dispose(key);
185   _kv_val_dispose(val);
186 }
187 
iwkv_val_dispose(IWKV_val * v)188 void iwkv_val_dispose(IWKV_val *v) {
189   _kv_val_dispose(v);
190 }
191 
iwkv_kv_dispose(IWKV_val * key,IWKV_val * val)192 void iwkv_kv_dispose(IWKV_val *key, IWKV_val *val) {
193   _kv_dispose(key, val);
194 }
195 
_num2lebuf(uint8_t buf[static8],void * numdata,size_t sz)196 IW_INLINE void _num2lebuf(uint8_t buf[static 8], void *numdata, size_t sz) {
197   assert(sz == 4 || sz == 8);
198   if (sz > 4) {
199     uint64_t llv;
200     memcpy(&llv, numdata, sizeof(llv));
201     llv = IW_HTOILL(llv);
202     memcpy(buf, &llv, sizeof(llv));
203   } else {
204     uint32_t lv;
205     memcpy(&lv, numdata, sizeof(lv));
206     lv = IW_HTOIL(lv);
207     memcpy(buf, &lv, sizeof(lv));
208   }
209 }
210 
211 //-------------------------- IWKV/IWDB WORKERS
212 
_iwkv_worker_inc_nolk(IWKV iwkv)213 static WUR iwrc _iwkv_worker_inc_nolk(IWKV iwkv) {
214   if (!iwkv || !iwkv->open) {
215     return IW_ERROR_INVALID_STATE;
216   }
217   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
218   if (rci) {
219     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
220   }
221   if (!iwkv->open) { // -V547
222     pthread_mutex_unlock(&iwkv->wk_mtx);
223     return IW_ERROR_INVALID_STATE;
224   }
225   while (iwkv->wk_pending_exclusive) {
226     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
227   }
228   ++iwkv->wk_count;
229   pthread_cond_broadcast(&iwkv->wk_cond);
230   pthread_mutex_unlock(&iwkv->wk_mtx);
231   return 0;
232 }
233 
_db_worker_inc_nolk(IWDB db)234 static WUR iwrc _db_worker_inc_nolk(IWDB db) {
235   if (!db || !db->iwkv || !db->iwkv->open || !db->open) {
236     return IW_ERROR_INVALID_STATE;
237   }
238   IWKV iwkv = db->iwkv;
239   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
240   if (rci) {
241     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
242   }
243   if (!iwkv->open || !db->open) { // -V560
244     pthread_mutex_unlock(&iwkv->wk_mtx);
245     return IW_ERROR_INVALID_STATE;
246   }
247   while (db->wk_pending_exclusive) {
248     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
249   }
250   ++iwkv->wk_count;
251   ++db->wk_count;
252   pthread_cond_broadcast(&iwkv->wk_cond);
253   pthread_mutex_unlock(&iwkv->wk_mtx);
254   return 0;
255 }
256 
_iwkv_worker_dec_nolk(IWKV iwkv)257 static iwrc _iwkv_worker_dec_nolk(IWKV iwkv) {
258   if (!iwkv) {
259     return IW_ERROR_INVALID_STATE;
260   }
261   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
262   if (rci) {
263     // Last chanсe to be consistent
264     --iwkv->wk_count;
265     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
266   }
267   --iwkv->wk_count;
268   pthread_cond_broadcast(&iwkv->wk_cond);
269   pthread_mutex_unlock(&iwkv->wk_mtx);
270   return 0;
271 }
272 
_db_worker_dec_nolk(IWDB db)273 static iwrc _db_worker_dec_nolk(IWDB db) {
274   if (!db || !db->iwkv) { // do not use ENSURE_OPEN_DB here
275     return IW_ERROR_INVALID_STATE;
276   }
277   IWKV iwkv = db->iwkv;
278   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
279   if (rci) {
280     // Last chanсe to be consistent
281     --iwkv->wk_count;
282     --db->wk_count;
283     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
284   }
285   --iwkv->wk_count;
286   --db->wk_count;
287   pthread_cond_broadcast(&iwkv->wk_cond);
288   pthread_mutex_unlock(&iwkv->wk_mtx);
289   return 0;
290 }
291 
_wnw_iwkw_wl(IWKV iwkv)292 static WUR iwrc _wnw_iwkw_wl(IWKV iwkv) {
293   int rci = pthread_rwlock_wrlock(&iwkv->rwl);
294   if (rci) {
295     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
296   }
297   return 0;
298 }
299 
_wnw(IWKV iwkv,iwrc (* after)(IWKV iwkv))300 static WUR iwrc _wnw(IWKV iwkv, iwrc (*after)(IWKV iwkv)) {
301   iwrc rc = 0;
302   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
303   if (rci) {
304     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
305   }
306   iwkv->wk_pending_exclusive = true;
307   while (iwkv->wk_count > 0) {
308     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
309   }
310   if (after) {
311     rc = after(iwkv);
312   }
313   iwkv->wk_pending_exclusive = false;
314   pthread_cond_broadcast(&iwkv->wk_cond);
315   rci = pthread_mutex_unlock(&iwkv->wk_mtx);
316   if (rci) {
317     IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
318   }
319   return rc;
320 }
321 
_wnw_db(IWDB db,iwrc (* after)(IWDB db))322 static WUR iwrc _wnw_db(IWDB db, iwrc (*after)(IWDB db)) {
323   iwrc rc = 0;
324   IWKV iwkv = db->iwkv;
325   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
326   if (rci) {
327     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
328   }
329   db->wk_pending_exclusive = true;
330   while (db->wk_count > 0) {
331     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
332   }
333   if (after) {
334     rc = after(db);
335   }
336   db->wk_pending_exclusive = false;
337   pthread_cond_broadcast(&iwkv->wk_cond);
338   rci = pthread_mutex_unlock(&iwkv->wk_mtx);
339   if (rci) {
340     IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
341   }
342   return rc;
343 }
344 
345 //--------------------------  DB
346 
_db_at(IWKV iwkv,IWDB * dbp,off_t addr,uint8_t * mm)347 static WUR iwrc _db_at(IWKV iwkv, IWDB *dbp, off_t addr, uint8_t *mm) {
348   iwrc rc = 0;
349   uint8_t *rp, bv;
350   uint32_t lv;
351   int rci;
352   IWDB db = calloc(1, sizeof(struct _IWDB));
353   *dbp = 0;
354   if (!db) {
355     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
356   }
357   pthread_rwlockattr_t attr;
358   pthread_rwlockattr_init(&attr);
359 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
360   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
361 #endif
362   rci = pthread_rwlock_init(&db->rwl, &attr);
363   if (rci) {
364     free(db);
365     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
366   }
367   rci = pthread_spin_init(&db->cursors_slk, 0);
368   if (rci) {
369     pthread_rwlock_destroy(&db->rwl);
370     free(db);
371     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
372   }
373   // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
374   db->flags = SBLK_DB;
375   db->addr = addr;
376   db->db = db;
377   db->iwkv = iwkv;
378   rp = mm + addr;
379   IW_READLV(rp, lv, lv);
380   if (lv != IWDB_MAGIC) {
381     rc = IWKV_ERROR_CORRUPTED;
382     iwlog_ecode_error3(rc);
383     goto finish;
384   }
385   IW_READBV(rp, bv, db->dbflg);
386   IW_READLV(rp, lv, db->id);
387   IW_READLV(rp, lv, db->next_db_addr);
388   db->next_db_addr = BLK2ADDR(db->next_db_addr); // blknum -> addr
389   rp = mm + addr + DOFF_C0_U4;
390   for (int i = 0; i < SLEVELS; ++i) {
391     IW_READLV(rp, lv, db->lcnt[i]);
392   }
393   if (iwkv->fmt_version >= 1) {
394     IW_READLV(rp, lv, db->meta_blk);
395     IW_READLV(rp, lv, db->meta_blkn);
396   }
397   db->open = true;
398   *dbp = db;
399 
400 finish:
401   if (rc) {
402     pthread_rwlock_destroy(&db->rwl);
403     free(db);
404   }
405   return rc;
406 }
407 
_db_save(IWDB db,bool newdb,uint8_t * mm)408 static WUR iwrc _db_save(IWDB db, bool newdb, uint8_t *mm) {
409   iwrc rc = 0;
410   uint32_t lv;
411   uint8_t *wp = mm + db->addr, bv;
412   uint8_t *sp = wp;
413   IWDLSNR *dlsnr = db->iwkv->dlsnr;
414   db->next_db_addr = db->next ? db->next->addr : 0;
415   // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
416   IW_WRITELV(wp, lv, IWDB_MAGIC);
417   IW_WRITEBV(wp, bv, db->dbflg);
418   IW_WRITELV(wp, lv, db->id);
419   IW_WRITELV(wp, lv, ADDR2BLK(db->next_db_addr));
420   if (dlsnr) {
421     rc = dlsnr->onwrite(dlsnr, db->addr, sp, wp - sp, 0);
422     RCRET(rc);
423   }
424   if (db->iwkv->fmt_version >= 1) {
425     if (newdb) {
426       memset(wp, 0, 4 + SLEVELS * 4 * 2); // p0 + n[24] + c[24]
427       sp = wp;
428       wp += 4 + SLEVELS * 4 * 2; // set to zero
429     } else {
430       wp += 4 + SLEVELS * 4 * 2; // skip
431       sp = wp;
432     }
433     IW_WRITELV(wp, lv, db->meta_blk);
434     IW_WRITELV(wp, lv, db->meta_blkn);
435     if (dlsnr) {
436       rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
437     }
438   }
439   return rc;
440 }
441 
_db_load_chain(IWKV iwkv,off_t addr,uint8_t * mm)442 static WUR iwrc _db_load_chain(IWKV iwkv, off_t addr, uint8_t *mm) {
443   iwrc rc;
444   int rci;
445   IWDB db = 0, ndb;
446   if (!addr) {
447     return 0;
448   }
449   do {
450     rc = _db_at(iwkv, &ndb, addr, mm);
451     RCRET(rc);
452     if (db) {
453       db->next = ndb;
454       ndb->prev = db;
455     } else {
456       iwkv->first_db = ndb;
457     }
458     db = ndb;
459     addr = db->next_db_addr;
460     iwkv->last_db = db;
461     khiter_t k = kh_put(DBS, iwkv->dbs, db->id, &rci);
462     if (rci != -1) {
463       kh_value(iwkv->dbs, k) = db;
464     } else {
465       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
466     }
467   } while (db->next_db_addr);
468   return rc;
469 }
470 
_db_release_lw(IWDB * dbp)471 static void _db_release_lw(IWDB *dbp) {
472   assert(dbp && *dbp);
473   IWDB db = *dbp;
474   _dbcache_destroy_lw(db);
475   pthread_rwlock_destroy(&db->rwl);
476   pthread_spin_destroy(&db->cursors_slk);
477   free(db);
478   *dbp = 0;
479 }
480 
481 typedef struct DISPOSE_DB_CTX {
482   IWKV   iwkv;
483   IWDB   db;
484   blkn_t sbn; // First `SBLK` block in DB
485 } DISPOSE_DB_CTX;
486 
_db_dispose_chain(DISPOSE_DB_CTX * dctx)487 static iwrc _db_dispose_chain(DISPOSE_DB_CTX *dctx) {
488   iwrc rc = 0;
489   uint8_t *mm, kvszpow;
490   IWFS_FSM *fsm = &dctx->iwkv->fsm;
491   blkn_t sbn = dctx->sbn, kvblkn;
492   off_t page = 0;
493 
494   while (sbn) {
495     off_t sba = BLK2ADDR(sbn);
496     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
497     RCBREAK(rc);
498     memcpy(&kvblkn, mm + sba + SOFF_KBLK_U4, 4);
499     kvblkn = IW_ITOHL(kvblkn);
500     memcpy(&sbn, mm + sba + SOFF_N0_U4, 4);
501     sbn = IW_ITOHL(sbn);
502     if (kvblkn) {
503       memcpy(&kvszpow, mm + BLK2ADDR(kvblkn) + KBLK_SZPOW_OFF, 1);
504     }
505     if (dctx->iwkv->fmt_version > 1) {
506       uint8_t bpos;
507       memcpy(&bpos, mm + sba + SOFF_BPOS_U1_V2, 1);
508       rc = fsm->release_mmap(fsm);
509       RCBREAK(rc);
510       if ((bpos > 0) && (bpos <= SBLK_PAGE_SBLK_NUM_V2)) {
511         off_t npage = sba - (bpos - 1) * SBLK_SZ;
512         if (npage != page) {
513           if (page) {
514             if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
515               rc = fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2);
516             }
517             RCBREAK(rc);
518           }
519           page = npage;
520         }
521       }
522     } else {
523       rc = fsm->release_mmap(fsm);
524       RCBREAK(rc);
525       // Deallocate `SBLK`
526       rc = fsm->deallocate(fsm, sba, SBLK_SZ);
527       RCBREAK(rc);
528     }
529     // Deallocate `KVBLK`
530     if (kvblkn) {
531       rc = fsm->deallocate(fsm, BLK2ADDR(kvblkn), 1ULL << kvszpow);
532       RCBREAK(rc);
533     }
534   }
535   if (page) {
536     if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
537       IWRC(fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2), rc);
538     }
539   }
540   _db_release_lw(&dctx->db);
541   return rc;
542 }
543 
_db_destroy_lw(IWDB * dbp)544 static WUR iwrc _db_destroy_lw(IWDB *dbp) {
545   iwrc rc;
546   uint8_t *mm;
547   IWDB db = *dbp;
548   IWKV iwkv = db->iwkv;
549   IWDB prev = db->prev;
550   IWDB next = db->next;
551   IWFS_FSM *fsm = &iwkv->fsm;
552   uint32_t first_sblkn;
553 
554   khiter_t k = kh_get(DBS, iwkv->dbs, db->id);
555   if (k == kh_end(iwkv->dbs)) {
556     iwlog_ecode_error3(IW_ERROR_INVALID_STATE);
557     return IW_ERROR_INVALID_STATE;
558   }
559   kh_del(DBS, iwkv->dbs, k);
560 
561   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
562   RCRET(rc);
563   if (prev) {
564     prev->next = next;
565     rc = _db_save(prev, false, mm);
566     if (rc) {
567       fsm->release_mmap(fsm);
568       return rc;
569     }
570   }
571   if (next) {
572     next->prev = prev;
573     rc = _db_save(next, false, mm);
574     if (rc) {
575       fsm->release_mmap(fsm);
576       return rc;
577     }
578   }
579   // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
580   memcpy(&first_sblkn, mm + db->addr + DOFF_N0_U4, 4);
581   first_sblkn = IW_ITOHL(first_sblkn);
582   fsm->release_mmap(fsm);
583 
584   if (iwkv->first_db && (iwkv->first_db->addr == db->addr)) {
585     uint64_t llv;
586     db->iwkv->first_db = next;
587     llv = next ? (uint64_t) next->addr : 0;
588     llv = IW_HTOILL(llv);
589     rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
590   }
591   if (iwkv->last_db && (iwkv->last_db->addr == db->addr)) {
592     iwkv->last_db = prev;
593   }
594   // Cleanup DB
595   off_t db_addr = db->addr;
596   blkn_t meta_blk = db->meta_blk;
597   blkn_t meta_blkn = db->meta_blkn;
598   db->open = false;
599 
600   DISPOSE_DB_CTX dctx = {
601     .sbn  = first_sblkn,
602     .iwkv = iwkv,
603     .db   = db
604   };
605   IWRC(_db_dispose_chain(&dctx), rc);
606   if (meta_blk && meta_blkn) {
607     IWRC(fsm->deallocate(fsm, BLK2ADDR(meta_blk), BLK2ADDR(meta_blkn)), rc);
608   }
609   IWRC(fsm->deallocate(fsm, db_addr, DB_SZ), rc);
610   return rc;
611 }
612 
_db_create_lw(IWKV iwkv,dbid_t dbid,iwdb_flags_t dbflg,IWDB * odb)613 static WUR iwrc _db_create_lw(IWKV iwkv, dbid_t dbid, iwdb_flags_t dbflg, IWDB *odb) {
614   iwrc rc;
615   int rci;
616   uint8_t *mm = 0;
617   off_t baddr = 0, blen;
618   IWFS_FSM *fsm = &iwkv->fsm;
619   *odb = 0;
620   IWDB db = calloc(1, sizeof(struct _IWDB));
621   if (!db) {
622     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
623   }
624   pthread_rwlockattr_t attr;
625   pthread_rwlockattr_init(&attr);
626 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
627   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
628 #endif
629   rci = pthread_rwlock_init(&db->rwl, &attr);
630   if (rci) {
631     free(db);
632     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
633   }
634   rci = pthread_spin_init(&db->cursors_slk, 0);
635   if (rci) {
636     pthread_rwlock_destroy(&db->rwl);
637     free(db);
638     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
639   }
640   rc = fsm->allocate(fsm, DB_SZ, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
641   if (rc) {
642     _db_release_lw(&db);
643     return rc;
644   }
645   db->iwkv = iwkv;
646   db->dbflg = dbflg;
647   db->addr = baddr;
648   db->id = dbid;
649   db->prev = iwkv->last_db;
650   if (!iwkv->first_db) {
651     uint64_t llv;
652     iwkv->first_db = db;
653     llv = (uint64_t) db->addr;
654     llv = IW_HTOILL(llv);
655     rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
656   } else if (iwkv->last_db) {
657     iwkv->last_db->next = db;
658   }
659   iwkv->last_db = db;
660   khiter_t k = kh_put(DBS, iwkv->dbs, db->id, &rci);
661   if (rci != -1) {
662     kh_value(iwkv->dbs, k) = db;
663   } else {
664     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
665     goto finish;
666   }
667   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
668   RCGO(rc, finish);
669   rc = _db_save(db, true, mm);
670   RCGO(rc, finish);
671   if (db->prev) {
672     rc = _db_save(db->prev, false, mm);
673     RCGO(rc, finish);
674   }
675   db->open = true;
676   *odb = db;
677 
678 finish:
679   if (mm) {
680     fsm->release_mmap(fsm);
681   }
682   if (rc) {
683     fsm->deallocate(fsm, baddr, blen);
684     _db_release_lw(&db);
685   }
686   return rc;
687 }
688 
689 //--------------------------  KVBLK
690 
_kvblk_create(IWLCTX * lx,off_t baddr,uint8_t kvbpow,KVBLK ** oblk)691 IW_INLINE void _kvblk_create(IWLCTX *lx, off_t baddr, uint8_t kvbpow, KVBLK **oblk) {
692   KVBLK *kblk = &lx->kaa[lx->kaan];
693   kblk->db = lx->db;
694   kblk->addr = baddr;
695   kblk->maxoff = 0;
696   kblk->idxsz = 2 * IW_VNUMSIZE(0) * KVBLK_IDXNUM;
697   kblk->zidx = 0;
698   kblk->szpow = kvbpow;
699   kblk->flags = KVBLK_DURTY;
700   memset(kblk->pidx, 0, sizeof(kblk->pidx));
701   *oblk = kblk;
702   AAPOS_INC(lx->kaan);
703 }
704 
_kvblk_key_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)705 IW_INLINE WUR iwrc _kvblk_key_peek(
706   const KVBLK *kb,
707   uint8_t idx, const uint8_t *mm, uint8_t **obuf,
708   uint32_t *olen) {
709   if (kb->pidx[idx].len) {
710     uint32_t klen, step;
711     const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
712     IW_READVNUMBUF(rp, klen, step);
713     if (!klen) {
714       *obuf = 0;
715       *olen = 0;
716       iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
717       return IWKV_ERROR_CORRUPTED;
718     }
719     rp += step;
720     *obuf = (uint8_t*) rp;
721     *olen = klen;
722   } else {
723     *obuf = 0;
724     *olen = 0;
725   }
726   return 0;
727 }
728 
_kvblk_value_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)729 IW_INLINE void _kvblk_value_peek(const KVBLK *kb, uint8_t idx, const uint8_t *mm, uint8_t **obuf, uint32_t *olen) {
730   assert(idx < KVBLK_IDXNUM);
731   if (kb->pidx[idx].len) {
732     uint32_t klen, step;
733     const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
734     IW_READVNUMBUF(rp, klen, step);
735     rp += step;
736     rp += klen;
737     *obuf = (uint8_t*) rp;
738     *olen = kb->pidx[idx].len - klen - step;
739   } else {
740     *obuf = 0;
741     *olen = 0;
742   }
743 }
744 
_kvblk_key_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key)745 static WUR iwrc _kvblk_key_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key) {
746   assert(mm && idx < KVBLK_IDXNUM);
747   int32_t klen;
748   int step;
749   KVP *kvp = &kb->pidx[idx];
750   key->compound = 0;
751   if (!kvp->len) {
752     key->data = 0;
753     key->size = 0;
754     return 0;
755   }
756   // [klen:vn,key,value]
757   uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
758   IW_READVNUMBUF(rp, klen, step);
759   rp += step;
760   if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
761     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
762     return IWKV_ERROR_CORRUPTED;
763   }
764   key->size = (size_t) klen;
765   if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
766     // Needed to provide enough buffer in _unpack_effective_key()
767     key->data = malloc(MAX(key->size, sizeof(int64_t)));
768   } else {
769     key->data = malloc(key->size);
770   }
771   if (!key->data) {
772     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
773   }
774   memcpy(key->data, rp, key->size);
775   return 0;
776 }
777 
_kvblk_value_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * val)778 static WUR iwrc _kvblk_value_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *val) {
779   assert(mm && idx < KVBLK_IDXNUM);
780   int32_t klen;
781   int step;
782   KVP *kvp = &kb->pidx[idx];
783   val->compound = 0;
784   if (!kvp->len) {
785     val->data = 0;
786     val->size = 0;
787     return 0;
788   }
789   // [klen:vn,key,value]
790   uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
791   IW_READVNUMBUF(rp, klen, step);
792   rp += step;
793   if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
794     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
795     return IWKV_ERROR_CORRUPTED;
796   }
797   rp += klen;
798   if (kvp->len > klen + step) {
799     val->size = kvp->len - klen - step;
800     val->data = malloc(val->size);
801     if (!val->data) {
802       iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
803       val->size = 0;
804       return rc;
805     }
806     memcpy(val->data, rp, val->size);
807   } else {
808     val->data = 0;
809     val->size = 0;
810   }
811   return 0;
812 }
813 
_kvblk_kv_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key,IWKV_val * val)814 static WUR iwrc _kvblk_kv_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key, IWKV_val *val) {
815   assert(mm && idx < KVBLK_IDXNUM);
816   int32_t klen;
817   int step;
818   KVP *kvp = &kb->pidx[idx];
819   key->compound = 0;
820   val->compound = 0;
821   if (!kvp->len) {
822     key->data = 0;
823     key->size = 0;
824     val->data = 0;
825     val->size = 0;
826     return 0;
827   }
828   // [klen:vn,key,value]
829   uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
830   IW_READVNUMBUF(rp, klen, step);
831   rp += step;
832   if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
833     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
834     return IWKV_ERROR_CORRUPTED;
835   }
836   key->size = (size_t) klen;
837   if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
838     // Needed to provide enough buffer in _unpack_effective_key()
839     key->data = malloc(MAX(key->size, sizeof(int64_t)));
840   } else {
841     key->data = malloc(key->size);
842   }
843   if (!key->data) {
844     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
845   }
846   memcpy(key->data, rp, key->size);
847   rp += klen;
848   if (kvp->len > klen + step) {
849     val->size = kvp->len - klen - step;
850     val->data = malloc(val->size);
851     if (!val->data) {
852       iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
853       free(key->data);
854       key->data = 0;
855       key->size = 0;
856       val->size = 0;
857       return rc;
858     }
859     memcpy(val->data, rp, val->size);
860   } else {
861     val->data = 0;
862     val->size = 0;
863   }
864   return 0;
865 }
866 
_kvblk_at_mm(IWLCTX * lx,off_t addr,uint8_t * mm,KVBLK * kbp,KVBLK ** blkp)867 static WUR iwrc _kvblk_at_mm(IWLCTX *lx, off_t addr, uint8_t *mm, KVBLK *kbp, KVBLK **blkp) {
868   uint8_t *rp;
869   uint16_t sv;
870   int step;
871   iwrc rc = 0;
872   KVBLK *kb = kbp ? kbp : &lx->kaa[lx->kaan];
873   kb->db = lx->db;
874   kb->addr = addr;
875   kb->maxoff = 0;
876   kb->idxsz = 0;
877   kb->zidx = -1;
878   kb->szpow = 0;
879   kb->flags = KVBLK_DEFAULT;
880   memset(kb->pidx, 0, sizeof(kb->pidx));
881 
882   *blkp = 0;
883   rp = mm + addr;
884   memcpy(&kb->szpow, rp, 1);
885   rp += 1;
886   IW_READSV(rp, sv, kb->idxsz);
887   if (IW_UNLIKELY(kb->idxsz > KVBLK_MAX_IDX_SZ)) {
888     rc = IWKV_ERROR_CORRUPTED;
889     iwlog_ecode_error3(rc);
890     goto finish;
891   }
892   for (uint8_t i = 0; i < KVBLK_IDXNUM; ++i) {
893     IW_READVNUMBUF64(rp, kb->pidx[i].off, step);
894     rp += step;
895     IW_READVNUMBUF(rp, kb->pidx[i].len, step);
896     rp += step;
897     if (kb->pidx[i].len) {
898       if (IW_UNLIKELY(!kb->pidx[i].off)) {
899         rc = IWKV_ERROR_CORRUPTED;
900         iwlog_ecode_error3(rc);
901         goto finish;
902       }
903       if (kb->pidx[i].off > kb->maxoff) {
904         kb->maxoff = kb->pidx[i].off;
905       }
906     } else if (kb->zidx < 0) {
907       kb->zidx = i;
908     }
909     kb->pidx[i].ridx = i;
910   }
911   *blkp = kb;
912   assert(rp - (mm + addr) <= (1ULL << kb->szpow));
913   if (!kbp) {
914     AAPOS_INC(lx->kaan);
915   }
916 
917 finish:
918   return rc;
919 }
920 
_kvblk_compacted_offset(KVBLK * kb)921 IW_INLINE off_t _kvblk_compacted_offset(KVBLK *kb) {
922   off_t coff = 0;
923   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
924     coff += kb->pidx[i].len;
925   }
926   return coff;
927 }
928 
_kvblk_compacted_dsize(KVBLK * kb)929 IW_INLINE off_t _kvblk_compacted_dsize(KVBLK *kb) {
930   off_t coff = KVBLK_HDRSZ;
931   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
932     coff += kb->pidx[i].len;
933     coff += IW_VNUMSIZE32(kb->pidx[i].len);
934     coff += IW_VNUMSIZE(kb->pidx[i].off);
935   }
936   return coff;
937 }
938 
_kvblk_sync_mm(KVBLK * kb,uint8_t * mm)939 static WUR iwrc _kvblk_sync_mm(KVBLK *kb, uint8_t *mm) {
940   iwrc rc = 0;
941   if (!(kb->flags & KVBLK_DURTY)) {
942     return rc;
943   }
944   uint16_t sp;
945   uint8_t *szp;
946   uint8_t *wp = mm + kb->addr;
947   uint8_t *sptr = wp;
948   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
949   memcpy(wp, &kb->szpow, 1);
950   wp += 1;
951   szp = wp;
952   wp += sizeof(uint16_t);
953   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
954     KVP *kvp = &kb->pidx[i];
955     IW_SETVNUMBUF64(sp, wp, kvp->off);
956     wp += sp;
957     IW_SETVNUMBUF(sp, wp, kvp->len);
958     wp += sp;
959   }
960   sp = wp - szp - sizeof(uint16_t);
961   kb->idxsz = sp;
962   assert(kb->idxsz <= KVBLK_MAX_IDX_SZ);
963   sp = IW_HTOIS(sp);
964   memcpy(szp, &sp, sizeof(uint16_t));
965   assert(wp - (mm + kb->addr) <= (1ULL << kb->szpow));
966   if (dlsnr) {
967     rc = dlsnr->onwrite(dlsnr, kb->addr, sptr, wp - sptr, 0);
968   }
969   kb->flags &= ~KVBLK_DURTY;
970   return rc;
971 }
972 
973 #define _kvblk_sort_kv_lt(v1, v2, o) \
974   (((v1).off > 0 ? (v1).off : -1UL) < ((v2).off > 0 ? (v2).off : -1UL))
975 
976 // -V:KSORT_INIT:522, 756, 769
KSORT_INIT(kvblk,KVP,_kvblk_sort_kv_lt)977 KSORT_INIT(kvblk, KVP, _kvblk_sort_kv_lt)
978 
979 static WUR iwrc _kvblk_compact_mm(KVBLK *kb, uint8_t *mm) {
980   uint8_t i;
981   off_t coff = _kvblk_compacted_offset(kb);
982   if (coff == kb->maxoff) { // compacted
983     return 0;
984   }
985   KVP tidx[KVBLK_IDXNUM];
986   KVP tidx_tmp[KVBLK_IDXNUM];
987   iwrc rc = 0;
988   uint16_t idxsiz = 0;
989   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
990   off_t blkend = kb->addr + (1ULL << kb->szpow);
991   uint8_t *wp = mm + blkend;
992   memcpy(tidx, kb->pidx, sizeof(tidx));
993   ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp, 0);
994 
995   coff = 0;
996   for (i = 0; i < KVBLK_IDXNUM && tidx[i].off; ++i) {
997 #ifndef NDEBUG
998     if (i > 0) {
999       assert(tidx[i - 1].off < tidx[i].off);
1000     }
1001 #endif
1002     KVP *kvp = &kb->pidx[tidx[i].ridx];
1003     off_t noff = coff + kvp->len;
1004     if (kvp->off > noff) {
1005       assert(noff <= (1ULL << kb->szpow) && kvp->len <= noff);
1006       if (dlsnr) {
1007         rc = dlsnr->onwrite(dlsnr, blkend - noff, wp - kvp->off, kvp->len, 0);
1008       }
1009       memmove(wp - noff, wp - kvp->off, kvp->len);
1010       kvp->off = noff;
1011     }
1012     coff += kvp->len;
1013     idxsiz += IW_VNUMSIZE(kvp->off);
1014     idxsiz += IW_VNUMSIZE32(kvp->len);
1015   }
1016   idxsiz += (KVBLK_IDXNUM - i) * 2;
1017   for (i = 0; i < KVBLK_IDXNUM; ++i) {
1018     if (!kb->pidx[i].len) {
1019       kb->zidx = i;
1020       break;
1021     }
1022   }
1023   assert(idxsiz <= kb->idxsz);
1024   kb->idxsz = idxsiz;
1025   kb->maxoff = coff;
1026   if (i == KVBLK_IDXNUM) {
1027     kb->zidx = -1;
1028   }
1029   kb->flags |= KVBLK_DURTY;
1030   assert(_kvblk_compacted_offset(kb) == kb->maxoff);
1031   return rc;
1032 }
1033 
_kvblk_maxkvoff(KVBLK * kb)1034 IW_INLINE off_t _kvblk_maxkvoff(KVBLK *kb) {
1035   off_t off = 0;
1036   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1037     if (kb->pidx[i].off > off) {
1038       off = kb->pidx[i].off;
1039     }
1040   }
1041   return off;
1042 }
1043 
_kvblk_rmkv(KVBLK * kb,uint8_t idx,kvblk_rmkv_opts_t opts)1044 static WUR iwrc _kvblk_rmkv(KVBLK *kb, uint8_t idx, kvblk_rmkv_opts_t opts) {
1045   iwrc rc = 0;
1046   uint8_t *mm = 0;
1047   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1048   IWFS_FSM *fsm = &kb->db->iwkv->fsm;
1049   if (kb->pidx[idx].off >= kb->maxoff) {
1050     kb->maxoff = 0;
1051     for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1052       if ((i != idx) && (kb->pidx[i].off > kb->maxoff)) {
1053         kb->maxoff = kb->pidx[i].off;
1054       }
1055     }
1056   }
1057   kb->pidx[idx].len = 0;
1058   kb->pidx[idx].off = 0;
1059   kb->flags |= KVBLK_DURTY;
1060   if ((kb->zidx < 0) || (idx < kb->zidx)) {
1061     kb->zidx = idx;
1062   }
1063   if (!(RMKV_NO_RESIZE & opts) && (kb->szpow > KVBLK_INISZPOW)) {
1064     off_t nlen = 1ULL << kb->szpow;
1065     off_t dsz = _kvblk_compacted_dsize(kb);
1066     if (nlen >= 2 * dsz) {
1067       uint8_t npow = kb->szpow - 1;
1068       while (npow > KVBLK_INISZPOW && (1ULL << (npow - 1)) >= dsz) {
1069         --npow;
1070       }
1071       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1072       RCGO(rc, finish);
1073 
1074       rc = _kvblk_compact_mm(kb, mm);
1075       RCGO(rc, finish);
1076 
1077       off_t maxoff = _kvblk_maxkvoff(kb);
1078       if (dlsnr) {
1079         rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << npow) - maxoff, mm + kb->addr + nlen - maxoff, maxoff, 0);
1080         RCGO(rc, finish);
1081       }
1082       memmove(mm + kb->addr + (1ULL << npow) - maxoff,
1083               mm + kb->addr + nlen - maxoff,
1084               (size_t) maxoff);
1085 
1086       fsm->release_mmap(fsm);
1087       mm = 0;
1088       rc = fsm->reallocate(fsm, (1ULL << npow), &kb->addr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1089       RCGO(rc, finish);
1090       kb->szpow = npow;
1091       assert(nlen == (1ULL << kb->szpow));
1092       opts |= RMKV_SYNC;
1093     }
1094   }
1095   if (RMKV_SYNC & opts) {
1096     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1097     RCGO(rc, finish);
1098     IWRC(_kvblk_sync_mm(kb, mm), rc);
1099   }
1100 
1101 finish:
1102   if (mm) {
1103     fsm->release_mmap(fsm);
1104   }
1105   return rc;
1106 }
1107 
_kvblk_addkv(KVBLK * kb,const IWKV_val * key,const IWKV_val * val,uint8_t * oidx,bool raw_key)1108 static WUR iwrc _kvblk_addkv(
1109   KVBLK          *kb,
1110   const IWKV_val *key,
1111   const IWKV_val *val,
1112   uint8_t        *oidx,
1113   bool            raw_key) {
1114   *oidx = 0;
1115 
1116   iwrc rc = 0;
1117   off_t msz;    // max available free space
1118   off_t rsz;    // required size to add new key/value pair
1119   off_t noff;   // offset of new kvpair from end of block
1120   uint8_t *mm, *wp, *sptr;
1121   size_t i, sp;
1122   KVP *kvp;
1123   IWDB db = kb->db;
1124   bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1125   IWFS_FSM *fsm = &db->iwkv->fsm;
1126   bool compacted = false;
1127   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1128   IWKV_val *uval = (IWKV_val*) val;
1129 
1130   size_t ksize = key->size;
1131   if (compound) {
1132     ksize += IW_VNUMSIZE(key->compound);
1133   }
1134   off_t psz = IW_VNUMSIZE(ksize) + ksize;
1135 
1136   if (kb->zidx < 0) {
1137     return _IWKV_RC_KVBLOCK_FULL;
1138   }
1139   psz += uval->size;
1140   if (psz > IWKV_MAX_KVSZ) {
1141     return IWKV_ERROR_MAXKVSZ;
1142   }
1143 
1144 start:
1145   // [szpow:u1,idxsz:u2,[ps0:vn,pl0:vn,..., ps32,pl32]____[[KV],...]] // KVBLK
1146   msz = (1ULL << kb->szpow) - (KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1147   assert(msz >= 0);
1148   noff = kb->maxoff + psz;
1149   rsz = psz + IW_VNUMSIZE(noff) + IW_VNUMSIZE(psz);
1150 
1151   if (msz < rsz) { // not enough space
1152     if (!compacted) {
1153       compacted = true;
1154       if (_kvblk_compacted_offset(kb) != kb->maxoff) {
1155         rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1156         RCGO(rc, finish);
1157         rc = _kvblk_compact_mm(kb, mm);
1158         RCGO(rc, finish);
1159         fsm->release_mmap(fsm);
1160         goto start;
1161       }
1162     }
1163     // resize the whole block
1164     off_t nlen = 1ULL << kb->szpow;
1165     off_t nsz = rsz - msz + nlen;
1166     off_t naddr = kb->addr;
1167     off_t olen = nlen;
1168 
1169     uint8_t npow = kb->szpow;
1170     while ((1ULL << ++npow) < nsz);
1171 
1172     rc = fsm->allocate(fsm, (1ULL << npow), &naddr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1173     RCGO(rc, finish);
1174     assert(nlen == (1ULL << npow));
1175     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1176     RCGO(rc, finish);
1177     if (dlsnr) {
1178       rc = dlsnr->onwrite(dlsnr, naddr, mm + kb->addr, KVBLK_HDRSZ, 0);
1179       RCGO(rc, finish);
1180       memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1181       rc = dlsnr->onwrite(dlsnr, naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, kb->maxoff, 0);
1182       RCGO(rc, finish);
1183       memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1184     } else {
1185       memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1186       memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1187     }
1188     fsm->release_mmap(fsm);
1189     rc = fsm->deallocate(fsm, kb->addr, olen);
1190     RCGO(rc, finish);
1191 
1192     kb->addr = naddr;
1193     kb->szpow = npow;
1194   }
1195   *oidx = (uint8_t) kb->zidx;
1196   kvp = &kb->pidx[kb->zidx];
1197   kvp->len = (uint32_t) psz;
1198   kvp->off = noff;
1199   kvp->ridx = (uint8_t) kb->zidx;
1200   kb->maxoff = noff;
1201   kb->flags |= KVBLK_DURTY;
1202   for (i = 0; i < KVBLK_IDXNUM; ++i) {
1203     if (!kb->pidx[i].len && (i != kb->zidx)) {
1204       kb->zidx = i;
1205       break;
1206     }
1207   }
1208   if (i >= KVBLK_IDXNUM) {
1209     kb->zidx = -1;
1210   }
1211   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1212   RCGO(rc, finish);
1213   assert((1ULL << kb->szpow) >= KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1214   assert(kvp->off < (1ULL << kb->szpow) && kvp->len <= kvp->off);
1215   wp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
1216   sptr = wp;
1217   // [klen:vn,key,value]
1218   IW_SETVNUMBUF(sp, wp, ksize);
1219   wp += sp;
1220   if (compound) {
1221     IW_SETVNUMBUF64(sp, wp, key->compound);
1222     wp += sp;
1223   }
1224   memcpy(wp, key->data, key->size);
1225   wp += key->size;
1226   memcpy(wp, uval->data, uval->size);
1227   wp += uval->size;
1228 #ifndef NDEBUG
1229   assert(wp - sptr == kvp->len);
1230 #endif
1231   if (dlsnr) {
1232     rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << kb->szpow) - kvp->off, sptr, wp - sptr, 0);
1233   }
1234   fsm->release_mmap(fsm);
1235 
1236 finish:
1237   return rc;
1238 }
1239 
_kvblk_updatev(KVBLK * kb,uint8_t * idxp,const IWKV_val * key,const IWKV_val * val)1240 static WUR iwrc _kvblk_updatev(
1241   KVBLK          *kb,
1242   uint8_t        *idxp,
1243   const IWKV_val *key,                              /* Nullable */
1244   const IWKV_val *val) {
1245   assert(*idxp < KVBLK_IDXNUM);
1246   int32_t i;
1247   uint32_t len, nlen, sz;
1248   uint8_t pidx = *idxp, *mm = 0, *wp, *sp;
1249   IWDB db = kb->db;
1250   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1251   IWKV_val *uval = (IWKV_val*) val;
1252   IWKV_val *ukey = (IWKV_val*) key;
1253   IWKV_val skey; // stack allocated key/val
1254   KVP *kvp = &kb->pidx[pidx];
1255   size_t kbsz = 1ULL << kb->szpow;                            // kvblk size
1256   off_t freesz = kbsz - KVBLK_HDRSZ - kb->idxsz - kb->maxoff; // free space available
1257   IWFS_FSM *fsm = &db->iwkv->fsm;
1258 
1259   iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1260   RCRET(rc);
1261   assert(freesz >= 0);
1262 
1263   wp = mm + kb->addr + kbsz - kvp->off;
1264   sp = wp;
1265   IW_READVNUMBUF(wp, len, sz);
1266   wp += sz;
1267   if (ukey && (len != ukey->size)) {
1268     rc = IWKV_ERROR_CORRUPTED;
1269     iwlog_ecode_error3(rc);
1270     goto finish;
1271   }
1272   wp += len;
1273   off_t rsize = sz + len + uval->size; // required size
1274   if (rsize <= kvp->len) {
1275     memcpy(wp, uval->data, uval->size);
1276     if (dlsnr) {
1277       rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
1278       RCGO(rc, finish);
1279     }
1280     wp += uval->size;
1281     if ((wp - sp) != kvp->len) {
1282       kvp->len = wp - sp;
1283       kb->flags |= KVBLK_DURTY;
1284     }
1285   } else {
1286     KVP tidx[KVBLK_IDXNUM];
1287     KVP tidx_tmp[KVBLK_IDXNUM];
1288     off_t koff = kb->pidx[pidx].off;
1289     memcpy(tidx, kb->pidx, KVBLK_IDXNUM * sizeof(kb->pidx[0]));
1290     ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp, 0);
1291     kb->flags |= KVBLK_DURTY;
1292     if (!ukey) { // we need a key
1293       ukey = &skey;
1294       rc = _kvblk_key_get(kb, mm, pidx, ukey);
1295       RCGO(rc, finish);
1296     }
1297     for (i = 0; i < KVBLK_IDXNUM; ++i) {
1298       if (tidx[i].off == koff) {
1299         if (koff - ((i > 0) ? tidx[i - 1].off : 0) >= rsize) {
1300           nlen = wp + uval->size - sp;
1301           if (!((nlen > kvp->len) && (freesz - IW_VNUMSIZE32(nlen) + IW_VNUMSIZE32(kvp->len) < 0))) { // enough space?
1302             memcpy(wp, uval->data, uval->size);
1303             if (dlsnr) {
1304               rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
1305               RCGO(rc, finish);
1306             }
1307             wp += uval->size;
1308             kvp->len = nlen;
1309             break;
1310             ;
1311           }
1312         }
1313         mm = 0;
1314         fsm->release_mmap(fsm);
1315         rc = _kvblk_rmkv(kb, pidx, RMKV_NO_RESIZE);
1316         RCGO(rc, finish);
1317         rc = _kvblk_addkv(kb, ukey, uval, idxp, false);
1318         break;
1319       }
1320     }
1321   }
1322 
1323 finish:
1324   if (ukey != key) {
1325     _kv_val_dispose(ukey);
1326   }
1327   if (mm) {
1328     IWRC(fsm->release_mmap(fsm), rc);
1329   }
1330   return rc;
1331 }
1332 
1333 //--------------------------  SBLK
1334 
_sblk_release(IWLCTX * lx,SBLK ** sblkp)1335 IW_INLINE void _sblk_release(IWLCTX *lx, SBLK **sblkp) {
1336   assert(sblkp && *sblkp);
1337   SBLK *sblk = *sblkp;
1338   sblk->flags &= ~SBLK_CACHE_FLAGS; // clear cache flags
1339   sblk->flags &= ~SBLK_DURTY;       // clear dirty flag
1340   sblk->kvblk = 0;
1341   *sblkp = 0;
1342 }
1343 
_sblk_loadkvblk_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1344 IW_INLINE WUR iwrc _sblk_loadkvblk_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1345   if (!sblk->kvblk && sblk->kvblkn) {
1346     return _kvblk_at_mm(lx, BLK2ADDR(sblk->kvblkn), mm, 0, &sblk->kvblk);
1347   } else {
1348     return 0;
1349   }
1350 }
1351 
_sblk_is_only_one_on_page_v2(IWLCTX * lx,uint8_t * mm,SBLK * sblk,off_t * page_addr)1352 static bool _sblk_is_only_one_on_page_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *page_addr) {
1353   *page_addr = 0;
1354   if ((sblk->bpos > 0) && (sblk->bpos <= SBLK_PAGE_SBLK_NUM_V2)) {
1355     off_t addr = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;
1356     *page_addr = addr;
1357     for (int i = 0; i < SBLK_PAGE_SBLK_NUM_V2; ++i) {
1358       if (i != sblk->bpos - 1) {
1359         uint8_t bv;
1360         memcpy(&bv, mm + addr + i * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
1361         if (bv) {
1362           return false;
1363         }
1364       }
1365     }
1366   } else {
1367     return false; // be safe
1368   }
1369   return true;
1370 }
1371 
_sblk_destroy(IWLCTX * lx,SBLK ** sblkp)1372 IW_INLINE WUR iwrc _sblk_destroy(IWLCTX *lx, SBLK **sblkp) {
1373   assert(sblkp && *sblkp && (*sblkp)->addr);
1374   iwrc rc = 0;
1375   SBLK *sblk = *sblkp;
1376   lx->destroy_addr = sblk->addr;
1377 
1378   if (!(sblk->flags & SBLK_DB)) {
1379     uint8_t kvb_szpow, *mm;
1380     IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1381     IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1382     off_t kvb_addr = BLK2ADDR(sblk->kvblkn);
1383     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1384     RCRET(rc);
1385 
1386     if (!sblk->kvblk) {
1387       // Read KVBLK size as power of two
1388       memcpy(&kvb_szpow, mm + kvb_addr + KBLK_SZPOW_OFF, 1);
1389     } else {
1390       kvb_szpow = sblk->kvblk->szpow;
1391     }
1392     if (lx->db->lcnt[sblk->lvl]) {
1393       lx->db->lcnt[sblk->lvl]--;
1394       lx->db->flags |= SBLK_DURTY;
1395     }
1396     _dbcache_remove_lw(lx, sblk);
1397     if (lx->db->iwkv->fmt_version > 1) {
1398       off_t paddr;
1399       if (_sblk_is_only_one_on_page_v2(lx, mm, sblk, &paddr)) {
1400         fsm->release_mmap(fsm);
1401         // Deallocate whole page
1402         rc = fsm->deallocate(fsm, paddr, SBLK_PAGE_SZ_V2);
1403       } else {
1404         memset(mm + sblk->addr + SOFF_BPOS_U1_V2, 0, 1);
1405         fsm->release_mmap(fsm);
1406         if (dlsnr) {
1407           dlsnr->onset(dlsnr, sblk->addr + SOFF_BPOS_U1_V2, 0, 1, 0);
1408         }
1409       }
1410     } else {
1411       fsm->release_mmap(fsm);
1412       rc = fsm->deallocate(fsm, sblk->addr, SBLK_SZ);
1413     }
1414     IWRC(fsm->deallocate(fsm, kvb_addr, 1ULL << kvb_szpow), rc);
1415   }
1416   _sblk_release(lx, sblkp);
1417   return rc;
1418 }
1419 
_sblk_genlevel(IWDB db)1420 IW_INLINE uint8_t _sblk_genlevel(IWDB db) {
1421   uint8_t lvl;
1422 #ifdef IW_TESTS
1423   if (iwkv_next_level >= 0) {
1424     lvl = (uint8_t) iwkv_next_level;
1425     iwkv_next_level = -1;
1426     assert(lvl < SLEVELS);
1427     return lvl;
1428   }
1429 #endif
1430   uint32_t r = iwu_rand_u32();
1431   for (lvl = 0; lvl < SLEVELS && !(r & 1); ++lvl) r >>= 1;
1432   uint8_t ret = IW_UNLIKELY(lvl >= SLEVELS) ? SLEVELS - 1 : lvl;
1433   while (ret > 0 && db->lcnt[ret - 1] == 0) {
1434     --ret;
1435   }
1436   return ret;
1437 }
1438 
_sblk_create_v1(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,off_t baddr,uint8_t bpos,SBLK ** oblk)1439 static WUR iwrc _sblk_create_v1(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, off_t baddr, uint8_t bpos, SBLK **oblk) {
1440   iwrc rc;
1441   SBLK *sblk;
1442   KVBLK *kvblk;
1443   off_t blen;
1444   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1445   if (kvbpow < KVBLK_INISZPOW) {
1446     kvbpow = KVBLK_INISZPOW;
1447   }
1448   *oblk = 0;
1449   if (!bpos) {
1450     rc = fsm->allocate(fsm, SBLK_SZ + (1ULL << kvbpow), &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1451     RCRET(rc);
1452     assert(blen - SBLK_SZ == (1ULL << kvbpow));
1453     _kvblk_create(lx, baddr + SBLK_SZ, kvbpow, &kvblk);
1454   } else {
1455     // Allocate kvblk as separate chunk
1456     off_t kblkaddr = 0;
1457     rc = fsm->allocate(fsm, (1ULL << kvbpow), &kblkaddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1458     assert(blen == (1ULL << kvbpow));
1459     _kvblk_create(lx, kblkaddr, kvbpow, &kvblk);
1460   }
1461   sblk = &lx->saa[lx->saan];
1462   sblk->db = lx->db;
1463   sblk->db->lcnt[nlevel]++;
1464   sblk->db->flags |= SBLK_DURTY;
1465   sblk->addr = baddr;
1466   sblk->flags = (SBLK_DURTY | SBLK_CACHE_PUT);
1467   sblk->lvl = nlevel;
1468   sblk->p0 = 0;
1469   memset(sblk->n, 0, sizeof(sblk->n));
1470   sblk->kvblk = kvblk;
1471   sblk->kvblkn = ADDR2BLK(kvblk->addr);
1472   sblk->lkl = 0;
1473   sblk->pnum = 0;
1474   sblk->bpos = bpos;
1475   memset(sblk->pi, 0, sizeof(sblk->pi));
1476   *oblk = sblk;
1477   AAPOS_INC(lx->saan);
1478   return 0;
1479 }
1480 
_sblk_find_free_page_slot_v2(IWLCTX * lx,uint8_t * mm,SBLK * sblk,off_t * obaddr,uint8_t * oslot)1481 static void _sblk_find_free_page_slot_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *obaddr, uint8_t *oslot) {
1482   if ((sblk->bpos < 1) || (sblk->bpos > SBLK_PAGE_SBLK_NUM_V2)) {
1483     *obaddr = 0;
1484     *oslot = 0;
1485     return;
1486   }
1487   off_t paddr = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;
1488   for (int i = sblk->bpos + 1; i <= SBLK_PAGE_SBLK_NUM_V2; ++i) {
1489     uint8_t slot;
1490     memcpy(&slot, mm + paddr + (i - 1) * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
1491     if (!slot) {
1492       *obaddr = paddr + (i - 1) * SBLK_SZ;
1493       *oslot = i;
1494       return;
1495     }
1496   }
1497   for (int i = sblk->bpos - 1; i > 0; --i) {
1498     uint8_t slot;
1499     memcpy(&slot, mm + paddr + (i - 1) * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
1500     if (!slot) {
1501       *obaddr = paddr + (i - 1) * SBLK_SZ;
1502       *oslot = i;
1503       return;
1504     }
1505   }
1506   *obaddr = 0;
1507   *oslot = 0;
1508 }
1509 
1510 /// Create
_sblk_create_v2(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1511 static WUR iwrc _sblk_create_v2(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1512   off_t baddr = 0;
1513   uint8_t bpos = 0, *mm;
1514   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1515   SBLK *_lower = lower;
1516   SBLK *_upper = upper;
1517 
1518   for (int i = SLEVELS - 1; i >= 0; --i) {
1519     if (lx->pupper[i] && (lx->pupper[i]->lvl >= nlevel)) {
1520       _upper = lx->pupper[i];
1521     }
1522     if (lx->plower[i] && (lx->plower[i]->lvl >= nlevel)) {
1523       _lower = lx->plower[i];
1524     }
1525   }
1526 
1527   iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1528   RCRET(rc);
1529   _sblk_find_free_page_slot_v2(lx, mm, _lower, &baddr, &bpos);
1530   if (!baddr && _upper && (_upper->addr != _lower->addr)) {
1531     _sblk_find_free_page_slot_v2(lx, mm, _upper, &baddr, &bpos);
1532   }
1533   if (!baddr) {
1534     if (_lower->addr != lower->addr) {
1535       _sblk_find_free_page_slot_v2(lx, mm, lower, &baddr, &bpos);
1536     }
1537     if (!baddr && upper && _upper && (_upper->addr != upper->addr)) {
1538       _sblk_find_free_page_slot_v2(lx, mm, upper, &baddr, &bpos);
1539     }
1540   }
1541   fsm->release_mmap(fsm);
1542 
1543   if (!baddr) {
1544     // No free slots - allocate new SBLK page
1545     off_t blen;
1546     bpos = 1;
1547     IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1548     rc = fsm->allocate(fsm, SBLK_PAGE_SZ_V2, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1549     RCRET(rc);
1550     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1551     RCRET(rc);
1552     // Fill page to zero
1553     memset(mm + baddr, 0, blen);
1554     if (dlsnr) {
1555       rc = dlsnr->onset(dlsnr, baddr, 0, blen, 0);
1556     }
1557     fsm->release_mmap(fsm);
1558     RCRET(rc);
1559   }
1560   return _sblk_create_v1(lx, nlevel, kvbpow, baddr, bpos, oblk);
1561 }
1562 
_sblk_create(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1563 IW_INLINE WUR iwrc _sblk_create(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1564   if (lx->db->iwkv->fmt_version > 1) {
1565     return _sblk_create_v2(lx, nlevel, kvbpow, lower, upper, oblk);
1566   } else {
1567     return _sblk_create_v1(lx, nlevel, kvbpow, lower->addr, 0, oblk);
1568   }
1569 }
1570 
_sblk_at2(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK * sblk)1571 static WUR iwrc _sblk_at2(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK *sblk) {
1572   iwrc rc;
1573   uint8_t *mm;
1574   uint32_t lv;
1575   sblk_flags_t flags = lx->sbflags | flgs;
1576   IWDB db = lx->db;
1577   IWFS_FSM *fsm = &db->iwkv->fsm;
1578   sblk->kvblk = 0;
1579   sblk->bpos = 0;
1580   sblk->db = db;
1581 
1582   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1583   RCRET(rc);
1584 
1585   if (IW_UNLIKELY(addr == db->addr)) {
1586     uint8_t *rp = mm + addr + DOFF_N0_U4;
1587     // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
1588     sblk->addr = addr;
1589     sblk->flags = SBLK_DB | flags;
1590     sblk->lvl = 0;
1591     sblk->p0 = 0;
1592     sblk->kvblkn = 0;
1593     sblk->lkl = 0;
1594     sblk->pnum = KVBLK_IDXNUM;
1595     memset(sblk->pi, 0, sizeof(sblk->pi));
1596     for (int i = 0; i < SLEVELS; ++i) {
1597       IW_READLV(rp, lv, sblk->n[i]);
1598       if (sblk->n[i]) {
1599         ++sblk->lvl;
1600       } else {
1601         break;
1602       }
1603     }
1604     if (sblk->lvl) {
1605       --sblk->lvl;
1606     }
1607   } else if (addr) {
1608     uint8_t uflags;
1609     uint8_t *rp = mm + addr;
1610     sblk->addr = addr;
1611     // [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],bpos:u1,lk:u115]:u256
1612     memcpy(&uflags, rp++, 1);
1613     sblk->flags = uflags;
1614     if (sblk->flags & ~SBLK_PERSISTENT_FLAGS) {
1615       rc = IWKV_ERROR_CORRUPTED;
1616       iwlog_ecode_error3(rc);
1617       goto finish;
1618     }
1619     sblk->flags |= flags;
1620     memcpy(&sblk->lvl, rp++, 1);
1621     if (sblk->lvl >= SLEVELS) {
1622       rc = IWKV_ERROR_CORRUPTED;
1623       iwlog_ecode_error3(rc);
1624       goto finish;
1625     }
1626     memcpy(&sblk->lkl, rp++, 1);
1627     if (sblk->lkl > db->iwkv->pklen) {
1628       rc = IWKV_ERROR_CORRUPTED;
1629       iwlog_ecode_error3(rc);
1630       goto finish;
1631     }
1632     memcpy(&sblk->pnum, rp++, 1);
1633     if (sblk->pnum < 0) {
1634       rc = IWKV_ERROR_CORRUPTED;
1635       iwlog_ecode_error3(rc);
1636       goto finish;
1637     }
1638     memcpy(&sblk->p0, rp, 4);
1639     sblk->p0 = IW_ITOHL(sblk->p0);
1640     rp += 4;
1641     memcpy(&sblk->kvblkn, rp, 4);
1642     sblk->kvblkn = IW_ITOHL(sblk->kvblkn);
1643     rp += 4;
1644     memcpy(sblk->pi, rp, KVBLK_IDXNUM);
1645     rp += KVBLK_IDXNUM;
1646 
1647 #ifdef IW_BIGENDIAN
1648     for (int i = 0; i <= sblk->lvl; ++i) {
1649       memcpy(&sblk->n[i], rp, 4);
1650       sblk->n[i] = IW_ITOHL(sblk->n[i]);
1651       rp += 4;
1652     }
1653 #else
1654    memcpy(sblk->n, rp, 4 * (sblk->lvl + 1));
1655    rp += 4 * (sblk->lvl + 1);
1656 #endif
1657     if (db->iwkv->fmt_version > 1) {
1658       rp = mm + addr + SOFF_BPOS_U1_V2;
1659       memcpy(&sblk->bpos, rp++, 1);
1660     } else {
1661       rp = mm + addr + SOFF_LK_V1;
1662     }
1663     // Lower key
1664     memcpy(sblk->lk, rp, (size_t) sblk->lkl);
1665   } else { // Database tail
1666     uint8_t *rp = mm + db->addr + DOFF_P0_U4;
1667     sblk->addr = 0;
1668     sblk->flags = SBLK_DB | flags;
1669     sblk->lvl = 0;
1670     sblk->kvblkn = 0;
1671     sblk->lkl = 0;
1672     sblk->pnum = KVBLK_IDXNUM;
1673     memset(sblk->pi, 0, sizeof(sblk->pi));
1674     IW_READLV(rp, lv, sblk->p0);
1675     if (!sblk->p0) {
1676       sblk->p0 = ADDR2BLK(db->addr);
1677     }
1678   }
1679 
1680 finish:
1681   fsm->release_mmap(fsm);
1682   return rc;
1683 }
1684 
_sblk_at(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK ** sblkp)1685 IW_INLINE WUR iwrc _sblk_at(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK **sblkp) {
1686   *sblkp = 0;
1687   SBLK *sblk = &lx->saa[lx->saan];
1688   iwrc rc = _sblk_at2(lx, addr, flgs, sblk);
1689   AAPOS_INC(lx->saan);
1690   *sblkp = sblk;
1691   return rc;
1692 }
1693 
_sblk_sync_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1694 static WUR iwrc _sblk_sync_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1695   iwrc rc = 0;
1696   if (sblk->flags & SBLK_DURTY) {
1697     uint32_t lv;
1698     IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1699     sblk->flags &= ~SBLK_DURTY;
1700     if (IW_UNLIKELY(sblk->flags & SBLK_DB)) {
1701       uint8_t *sp;
1702       uint8_t *wp = mm + sblk->db->addr;
1703       if (sblk->addr) {
1704         assert(sblk->addr == sblk->db->addr);
1705         wp += DOFF_N0_U4;
1706         sp = wp;
1707         // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
1708         for (int i = 0; i < SLEVELS; ++i) {
1709           IW_WRITELV(wp, lv, sblk->n[i]);
1710         }
1711         assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
1712         for (int i = 0; i < SLEVELS; ++i) {
1713           IW_WRITELV(wp, lv, lx->db->lcnt[i]);
1714         }
1715       } else { // Database tail
1716         wp += DOFF_P0_U4;
1717         sp = wp;
1718         IW_WRITELV(wp, lv, sblk->p0);
1719         assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
1720       }
1721       if (dlsnr) {
1722         rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
1723       }
1724       return rc;
1725     } else {
1726       uint8_t *wp = mm + sblk->addr;
1727       sblk_flags_t flags = (sblk->flags & SBLK_PERSISTENT_FLAGS);
1728       uint8_t uflags = flags;
1729       assert(sblk->lkl <= lx->db->iwkv->pklen);
1730       // [u1:flags,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,[pi0:u1,... pi32],n0-n23:u4,lk:u116]:u256
1731       wp += SOFF_FLAGS_U1;
1732       memcpy(wp++, &uflags, 1);
1733       memcpy(wp++, &sblk->lvl, 1);
1734       memcpy(wp++, &sblk->lkl, 1);
1735       memcpy(wp++, &sblk->pnum, 1);
1736       IW_WRITELV(wp, lv, sblk->p0);
1737       IW_WRITELV(wp, lv, sblk->kvblkn);
1738       memcpy(wp, sblk->pi, KVBLK_IDXNUM);
1739       wp = mm + sblk->addr + SOFF_N0_U4;
1740 
1741 #ifdef IW_BIGENDIAN
1742       for (int i = 0; i <= sblk->lvl; ++i) {
1743         IW_WRITELV(wp, lv, sblk->n[i]);
1744       }
1745 #else
1746       memcpy(wp, sblk->n,  4 * (sblk->lvl + 1));
1747       wp += 4 * (sblk->lvl + 1);
1748 #endif
1749 
1750       if (lx->db->iwkv->fmt_version > 1) {
1751         wp = mm + sblk->addr + SOFF_BPOS_U1_V2;
1752         memcpy(wp++, &sblk->bpos, 1);
1753       } else {
1754         wp = mm + sblk->addr + SOFF_LK_V1;
1755       }
1756       memcpy(wp, sblk->lk, (size_t) sblk->lkl);
1757       if (dlsnr) {
1758         rc = dlsnr->onwrite(dlsnr, sblk->addr, mm + sblk->addr, SOFF_END, 0);
1759         RCRET(rc);
1760       }
1761     }
1762   }
1763   if (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY)) {
1764     IWRC(_kvblk_sync_mm(sblk->kvblk, mm), rc);
1765   }
1766   if (sblk->flags & SBLK_CACHE_UPDATE) {
1767     _dbcache_update_lw(lx, sblk);
1768   }
1769   return rc;
1770 }
1771 
_sblk_sync(IWLCTX * lx,SBLK * sblk)1772 IW_INLINE WUR iwrc _sblk_sync(IWLCTX *lx, SBLK *sblk) {
1773   if ((sblk->flags & SBLK_DURTY) || (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY))) {
1774     uint8_t *mm;
1775     IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1776     iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1777     RCRET(rc);
1778     rc = _sblk_sync_mm(lx, sblk, mm);
1779     fsm->release_mmap(fsm);
1780     return rc;
1781   }
1782   return 0;
1783 }
1784 
_sblk_sync_and_release_mm(IWLCTX * lx,SBLK ** sblkp,uint8_t * mm)1785 IW_INLINE WUR iwrc _sblk_sync_and_release_mm(IWLCTX *lx, SBLK **sblkp, uint8_t *mm) {
1786   SBLK *sblk = *sblkp;
1787   if (lx->destroy_addr && (lx->destroy_addr == sblk->addr)) {
1788     return 0;
1789   }
1790   iwrc rc = 0;
1791   if (mm) {
1792     rc = _sblk_sync_mm(lx, *sblkp, mm);
1793   }
1794   _sblk_release(lx, sblkp);
1795   return rc;
1796 }
1797 
_sblk_find_pi_mm(SBLK * sblk,IWLCTX * lx,const uint8_t * mm,bool * found,uint8_t * idxp)1798 static WUR iwrc _sblk_find_pi_mm(SBLK *sblk, IWLCTX *lx, const uint8_t *mm, bool *found, uint8_t *idxp) {
1799   *found = false;
1800   if (sblk->flags & SBLK_DB) {
1801     *idxp = KVBLK_IDXNUM;
1802     return 0;
1803   }
1804   uint8_t *k;
1805   uint32_t kl;
1806   int idx = 0, lb = 0, ub = sblk->pnum - 1;
1807   iwdb_flags_t dbflg = lx->db->dbflg;
1808 
1809   if (sblk->pnum < 1) {
1810     *idxp = 0;
1811     return 0;
1812   }
1813   while (1) {
1814     idx = (ub + lb) / 2;
1815     iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1816     RCRET(rc);
1817     int cr = _cmp_keys(dbflg, k, kl, lx->key);
1818     if (!cr) {
1819       *found = true;
1820       break;
1821     } else if (cr < 0) {
1822       lb = idx + 1;
1823       if (lb > ub) {
1824         idx = lb;
1825         break;
1826       }
1827     } else {
1828       ub = idx - 1;
1829       if (lb > ub) {
1830         break;
1831       }
1832     }
1833   }
1834   *idxp = idx;
1835   return 0;
1836 }
1837 
_sblk_insert_pi_mm(SBLK * sblk,uint8_t nidx,IWLCTX * lx,const uint8_t * mm,uint8_t * idxp)1838 static WUR iwrc _sblk_insert_pi_mm(
1839   SBLK *sblk, uint8_t nidx, IWLCTX *lx,
1840   const uint8_t *mm, uint8_t *idxp) {
1841   assert(sblk->kvblk);
1842 
1843   uint8_t *k;
1844   uint32_t kl;
1845   int idx = 0, lb = 0, ub = sblk->pnum - 1, nels = sblk->pnum; // NOLINT
1846 
1847   if (nels < 1) {
1848     sblk->pi[0] = nidx;
1849     ++sblk->pnum;
1850     *idxp = 0;
1851     return 0;
1852   }
1853   iwdb_flags_t dbflg = sblk->db->dbflg;
1854   while (1) {
1855     idx = (ub + lb) / 2;
1856     iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1857     RCRET(rc);
1858     int cr = _cmp_keys(dbflg, k, kl, lx->key);
1859     if (!cr) {
1860       break;
1861     } else if (cr < 0) {
1862       lb = idx + 1;
1863       if (lb > ub) {
1864         idx = lb;
1865         ++sblk->pnum;
1866         break;
1867       }
1868     } else {
1869       ub = idx - 1;
1870       if (lb > ub) {
1871         ++sblk->pnum;
1872         break;
1873       }
1874     }
1875   }
1876   if (nels - idx > 0) {
1877     memmove(sblk->pi + idx + 1, sblk->pi + idx, nels - idx);
1878   }
1879   sblk->pi[idx] = nidx;
1880   *idxp = idx;
1881   return 0;
1882 }
1883 
_sblk_addkv2(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val,bool raw_key)1884 static WUR iwrc _sblk_addkv2(
1885   SBLK           *sblk,
1886   int8_t          idx,
1887   const IWKV_val *key,
1888   const IWKV_val *val,
1889   bool            raw_key) {
1890   assert(sblk && key && key->size && key->data && val && idx >= 0 && sblk->kvblk);
1891 
1892   uint8_t kvidx;
1893   IWDB db = sblk->db;
1894   KVBLK *kvblk = sblk->kvblk;
1895   if (sblk->pnum >= KVBLK_IDXNUM) {
1896     return _IWKV_RC_KVBLOCK_FULL;
1897   }
1898 
1899   iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, raw_key);
1900   RCRET(rc);
1901   if (sblk->pnum - idx > 0) {
1902     memmove(sblk->pi + idx + 1, sblk->pi + idx, sblk->pnum - idx);
1903   }
1904   sblk->pi[idx] = kvidx;
1905   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1906     sblk->kvblkn = ADDR2BLK(kvblk->addr);
1907     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1908       sblk->flags |= SBLK_CACHE_UPDATE;
1909     }
1910   }
1911   ++sblk->pnum;
1912   sblk->flags |= SBLK_DURTY;
1913   if (idx == 0) { // the lowest key inserted
1914     size_t ksize = key->size;
1915     bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1916     if (compound) {
1917       ksize += IW_VNUMSIZE(key->compound);
1918     }
1919     sblk->lkl = MIN(db->iwkv->pklen, ksize);
1920     uint8_t *wp = sblk->lk;
1921     if (compound) {
1922       int len;
1923       IW_SETVNUMBUF64(len, wp, key->compound);
1924       wp += len;
1925     }
1926     memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1927     if (ksize <= db->iwkv->pklen) {
1928       sblk->flags |= SBLK_FULL_LKEY;
1929     } else {
1930       sblk->flags &= ~SBLK_FULL_LKEY;
1931     }
1932     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1933       sblk->flags |= SBLK_CACHE_UPDATE;
1934     }
1935   }
1936   if (!raw_key) {
1937     // Update active cursors inside this block
1938     pthread_spin_lock(&db->cursors_slk);
1939     for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1940       if (cur->cn && (cur->cn->addr == sblk->addr)) {
1941         if (cur->cn != sblk) {
1942           memcpy(cur->cn, sblk, sizeof(*cur->cn));
1943           cur->cn->kvblk = 0;
1944           cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1945         }
1946         if (cur->cnpos >= idx) {
1947           cur->cnpos++;
1948         }
1949       }
1950     }
1951     pthread_spin_unlock(&db->cursors_slk);
1952   }
1953   return 0;
1954 }
1955 
_sblk_addkv(SBLK * sblk,IWLCTX * lx)1956 static WUR iwrc _sblk_addkv(SBLK *sblk, IWLCTX *lx) {
1957   const IWKV_val *key = lx->key;
1958   const IWKV_val *val = lx->val;
1959   assert(key && key->size && key->data && val && sblk->kvblk);
1960   if (!sblk) {
1961     iwlog_error2("sblk != 0");
1962     return IW_ERROR_ASSERTION;
1963   }
1964   uint8_t *mm, idx, kvidx;
1965   IWDB db = sblk->db;
1966   KVBLK *kvblk = sblk->kvblk;
1967   IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
1968   if (sblk->pnum >= KVBLK_IDXNUM) {
1969     return _IWKV_RC_KVBLOCK_FULL;
1970   }
1971   iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, false);
1972   RCRET(rc);
1973   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1974   RCRET(rc);
1975   rc = _sblk_insert_pi_mm(sblk, kvidx, lx, mm, &idx);
1976   RCRET(rc);
1977   fsm->release_mmap(fsm);
1978   if (idx == 0) { // the lowest key inserted
1979     size_t ksize = key->size;
1980     bool compound = (db->dbflg & IWDB_COMPOUND_KEYS);
1981     if (compound) {
1982       ksize += IW_VNUMSIZE(key->compound);
1983     }
1984     sblk->lkl = MIN(db->iwkv->pklen, ksize);
1985     uint8_t *wp = sblk->lk;
1986     if (compound) {
1987       int len;
1988       IW_SETVNUMBUF64(len, wp, key->compound);
1989       wp += len;
1990     }
1991     memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1992     if (ksize <= db->iwkv->pklen) {
1993       sblk->flags |= SBLK_FULL_LKEY;
1994     } else {
1995       sblk->flags &= ~SBLK_FULL_LKEY;
1996     }
1997     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1998       sblk->flags |= SBLK_CACHE_UPDATE;
1999     }
2000   }
2001   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2002     sblk->kvblkn = ADDR2BLK(kvblk->addr);
2003     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2004       sblk->flags |= SBLK_CACHE_UPDATE;
2005     }
2006   }
2007   sblk->flags |= SBLK_DURTY;
2008 
2009   // Update active cursors inside this block
2010   pthread_spin_lock(&db->cursors_slk);
2011   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2012     if (cur->cn && (cur->cn->addr == sblk->addr)) {
2013       if (cur->cn != sblk) {
2014         memcpy(cur->cn, sblk, sizeof(*cur->cn));
2015         cur->cn->kvblk = 0;
2016         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2017       }
2018       if (cur->cnpos >= idx) {
2019         cur->cnpos++;
2020       }
2021     }
2022   }
2023   pthread_spin_unlock(&db->cursors_slk);
2024 
2025   return 0;
2026 }
2027 
_sblk_updatekv(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val)2028 static WUR iwrc _sblk_updatekv(
2029   SBLK *sblk, int8_t idx,
2030   const IWKV_val *key, const IWKV_val *val) {
2031   assert(sblk && sblk->kvblk && idx >= 0 && idx < sblk->pnum);
2032   IWDB db = sblk->db;
2033   KVBLK *kvblk = sblk->kvblk;
2034   uint8_t kvidx = sblk->pi[idx];
2035   iwrc intrc = 0;
2036   iwrc rc = _kvblk_updatev(kvblk, &kvidx, key, val);
2037   if (IWKV_IS_INTERNAL_RC(rc)) {
2038     intrc = rc;
2039     rc = 0;
2040   }
2041   RCRET(rc);
2042   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2043     sblk->kvblkn = ADDR2BLK(kvblk->addr);
2044     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2045       sblk->flags |= SBLK_CACHE_UPDATE;
2046     }
2047   }
2048   sblk->pi[idx] = kvidx;
2049   sblk->flags |= SBLK_DURTY;
2050   // Update active cursors inside this block
2051   pthread_spin_lock(&db->cursors_slk);
2052   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2053     if (cur->cn && (cur->cn != sblk) && (cur->cn->addr == sblk->addr)) {
2054       memcpy(cur->cn, sblk, sizeof(*cur->cn));
2055       cur->cn->kvblk = 0;
2056       cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2057     }
2058   }
2059   pthread_spin_unlock(&db->cursors_slk);
2060   return intrc;
2061 }
2062 
_sblk_rmkv(SBLK * sblk,uint8_t idx)2063 static WUR iwrc _sblk_rmkv(SBLK *sblk, uint8_t idx) {
2064   assert(sblk && sblk->kvblk);
2065   IWDB db = sblk->db;
2066   KVBLK *kvblk = sblk->kvblk;
2067   IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
2068   assert(kvblk && idx < sblk->pnum && sblk->pi[idx] < KVBLK_IDXNUM);
2069 
2070   iwrc rc = _kvblk_rmkv(kvblk, sblk->pi[idx], 0);
2071   RCRET(rc);
2072 
2073   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2074     sblk->kvblkn = ADDR2BLK(kvblk->addr);
2075     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2076       sblk->flags |= SBLK_CACHE_UPDATE;
2077     }
2078   }
2079   --sblk->pnum;
2080   sblk->flags |= SBLK_DURTY;
2081 
2082   if ((idx < sblk->pnum) && (sblk->pnum > 0)) {
2083     memmove(sblk->pi + idx, sblk->pi + idx + 1, sblk->pnum - idx);
2084   }
2085 
2086   if (idx == 0) { // Lowest key removed
2087     // Replace the lowest key with the next one or reset
2088     if (sblk->pnum > 0) {
2089       uint8_t *mm, *kbuf;
2090       uint32_t klen;
2091       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2092       RCRET(rc);
2093       rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &kbuf, &klen);
2094       if (rc) {
2095         fsm->release_mmap(fsm);
2096         return rc;
2097       }
2098       sblk->lkl = MIN(db->iwkv->pklen, klen);
2099       memcpy(sblk->lk, kbuf, sblk->lkl);
2100       fsm->release_mmap(fsm);
2101       if (klen <= db->iwkv->pklen) {
2102         sblk->flags |= SBLK_FULL_LKEY;
2103       } else {
2104         sblk->flags &= ~SBLK_FULL_LKEY;
2105       }
2106       if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2107         sblk->flags |= SBLK_CACHE_UPDATE;
2108       }
2109     } else {
2110       sblk->lkl = 0;
2111       sblk->flags |= SBLK_CACHE_REMOVE;
2112     }
2113   }
2114 
2115   // Update active cursors
2116   pthread_spin_lock(&db->cursors_slk);
2117   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2118     if (cur->cn && (cur->cn->addr == sblk->addr)) {
2119       cur->skip_next = 0;
2120       if (cur->cn != sblk) {
2121         memcpy(cur->cn, sblk, sizeof(*cur->cn));
2122         cur->cn->kvblk = 0;
2123         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2124       }
2125       if (cur->cnpos == idx) {
2126         if (idx && (idx == sblk->pnum)) {
2127           cur->cnpos--;
2128           cur->skip_next = -1;
2129         } else {
2130           cur->skip_next = 1;
2131         }
2132       } else if (cur->cnpos > idx) {
2133         cur->cnpos--;
2134       }
2135     }
2136   }
2137   pthread_spin_unlock(&db->cursors_slk);
2138   return 0;
2139 }
2140 
2141 //--------------------------  IWLCTX
2142 
_lx_sblk_cmp_key(IWLCTX * lx,SBLK * sblk,int * resp)2143 WUR iwrc _lx_sblk_cmp_key(IWLCTX *lx, SBLK *sblk, int *resp) {
2144   int res = 0;
2145   iwrc rc = 0;
2146   iwdb_flags_t dbflg = sblk->db->dbflg;
2147   const IWKV_val *key = lx->key;
2148   uint8_t lkl = sblk->lkl;
2149   size_t ksize = key->size;
2150 
2151   if (IW_UNLIKELY((sblk->pnum < 1) || (sblk->flags & SBLK_DB))) {
2152     *resp = 0;
2153     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
2154     return IWKV_ERROR_CORRUPTED;
2155   }
2156   if (dbflg & IWDB_COMPOUND_KEYS) {
2157     ksize += IW_VNUMSIZE(key->compound);
2158   }
2159   if (  (sblk->flags & SBLK_FULL_LKEY)
2160      || (ksize < lkl)
2161      || (dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
2162     res = _cmp_keys(dbflg, sblk->lk, lkl, key);
2163   } else {
2164     res = _cmp_keys_prefix(dbflg, sblk->lk, lkl, key);
2165     if (res == 0) {
2166       uint32_t kl;
2167       uint8_t *mm, *k;
2168       IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2169       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2170       if (rc) {
2171         *resp = 0;
2172         return rc;
2173       }
2174       if (!sblk->kvblk) {
2175         rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2176         if (rc) {
2177           *resp = 0;
2178           fsm->release_mmap(fsm);
2179           return rc;
2180         }
2181       }
2182       rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[0], mm, &k, &kl);
2183       RCRET(rc);
2184       res = _cmp_keys(dbflg, k, kl, key);
2185       fsm->release_mmap(fsm);
2186     }
2187   }
2188   *resp = res;
2189   return rc;
2190 }
2191 
_lx_roll_forward(IWLCTX * lx,uint8_t lvl)2192 static WUR iwrc _lx_roll_forward(IWLCTX *lx, uint8_t lvl) {
2193   iwrc rc = 0;
2194   int cret;
2195   SBLK *sblk;
2196   blkn_t blkn;
2197   assert(lx->lower);
2198 
2199   while ((blkn = lx->lower->n[lvl])) {
2200     off_t blkaddr = BLK2ADDR(blkn);
2201     if ((lx->nlvl > -1) && (lvl < lx->nlvl)) {
2202       uint8_t ulvl = lvl + 1;
2203       if (lx->pupper[ulvl] && (lx->pupper[ulvl]->addr == blkaddr)) {
2204         sblk = lx->pupper[ulvl];
2205       } else if (lx->plower[ulvl] && (lx->plower[ulvl]->addr == blkaddr)) {
2206         sblk = lx->plower[ulvl];
2207       } else {
2208         rc = _sblk_at(lx, blkaddr, 0, &sblk);
2209       }
2210     } else {
2211       rc = _sblk_at(lx, blkaddr, 0, &sblk);
2212     }
2213     RCRET(rc);
2214 #ifndef NDEBUG
2215     ++lx->num_cmps;
2216 #endif
2217     rc = _lx_sblk_cmp_key(lx, sblk, &cret);
2218     RCRET(rc);
2219     if ((cret > 0) || (lx->upper_addr == sblk->addr)) { // upper > key
2220       lx->upper = sblk;
2221       break;
2222     } else {
2223       lx->lower = sblk;
2224     }
2225   }
2226   return 0;
2227 }
2228 
_lx_find_bounds(IWLCTX * lx)2229 static WUR iwrc _lx_find_bounds(IWLCTX *lx) {
2230   iwrc rc = 0;
2231   int lvl;
2232   blkn_t blkn;
2233   SBLK *dblk = &lx->dblk;
2234   if (!dblk->addr) {
2235     SBLK *s;
2236     rc = _sblk_at(lx, lx->db->addr, 0, &s);
2237     RCRET(rc);
2238     memcpy(dblk, s, sizeof(*dblk));
2239   }
2240   if (!lx->lower) {
2241     rc = _dbcache_get(lx);
2242     RCRET(rc);
2243   }
2244   if (lx->nlvl > dblk->lvl) {
2245     // New level in DB
2246     dblk->lvl = (uint8_t) lx->nlvl;
2247     dblk->flags |= SBLK_DURTY;
2248   }
2249   lvl = lx->lower->lvl;
2250   while (lvl > -1) {
2251     rc = _lx_roll_forward(lx, (uint8_t) lvl);
2252     RCRET(rc);
2253     if (lx->upper) {
2254       blkn = ADDR2BLK(lx->upper->addr);
2255     } else {
2256       blkn = 0;
2257     }
2258     do {
2259       if (lx->nlvl >= lvl) {
2260         lx->plower[lvl] = lx->lower;
2261         lx->pupper[lvl] = lx->upper;
2262       }
2263     } while (lvl-- && lx->lower->n[lvl] == blkn);
2264   }
2265   return 0;
2266 }
2267 
_lx_release_mm(IWLCTX * lx,uint8_t * mm)2268 static iwrc _lx_release_mm(IWLCTX *lx, uint8_t *mm) {
2269   iwrc rc = 0;
2270   if (lx->nlvl > -1) {
2271     SBLK *lsb = 0, *usb = 0;
2272     if (lx->nb) {
2273       rc = _sblk_sync_mm(lx, lx->nb, mm);
2274       RCGO(rc, finish);
2275     }
2276     if (lx->pupper[0] == lx->upper) {
2277       lx->upper = 0;
2278     }
2279     if (lx->plower[0] == lx->lower) {
2280       lx->lower = 0;
2281     }
2282     for (int i = 0; i <= lx->nlvl; ++i) {
2283       if (lx->pupper[i]) {
2284         if (lx->pupper[i] != usb) {
2285           usb = lx->pupper[i];
2286           rc = _sblk_sync_and_release_mm(lx, &lx->pupper[i], mm);
2287           RCGO(rc, finish);
2288         }
2289         lx->pupper[i] = 0;
2290       }
2291       if (lx->plower[i]) {
2292         if (lx->plower[i] != lsb) {
2293           lsb = lx->plower[i];
2294           rc = _sblk_sync_and_release_mm(lx, &lx->plower[i], mm);
2295           RCGO(rc, finish);
2296         }
2297         lx->plower[i] = 0;
2298       }
2299     }
2300   }
2301   if (lx->upper) {
2302     rc = _sblk_sync_and_release_mm(lx, &lx->upper, mm);
2303     RCGO(rc, finish);
2304   }
2305   if (lx->lower) {
2306     rc = _sblk_sync_and_release_mm(lx, &lx->lower, mm);
2307     RCGO(rc, finish);
2308   }
2309   if (lx->dblk.flags & SBLK_DURTY) {
2310     rc = _sblk_sync_mm(lx, &lx->dblk, mm);
2311     RCGO(rc, finish);
2312   }
2313   if (lx->nb) {
2314     if (lx->nb->flags & SBLK_CACHE_PUT) {
2315       rc = _dbcache_put_lw(lx, lx->nb);
2316     }
2317     _sblk_release(lx, &lx->nb);
2318     RCGO(rc, finish);
2319   }
2320   if (lx->cache_reload) {
2321     rc = _dbcache_fill_lw(lx);
2322   }
2323 
2324 finish:
2325   lx->destroy_addr = 0;
2326   return rc;
2327 }
2328 
_lx_release(IWLCTX * lx)2329 iwrc _lx_release(IWLCTX *lx) {
2330   uint8_t *mm;
2331   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2332   iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2333   RCRET(rc);
2334   rc = _lx_release_mm(lx, mm);
2335   IWRC(fsm->release_mmap(fsm), rc);
2336   return rc;
2337 }
2338 
_lx_split_addkv(IWLCTX * lx,int idx,SBLK * sblk)2339 static iwrc _lx_split_addkv(IWLCTX *lx, int idx, SBLK *sblk) {
2340   iwrc rc;
2341   SBLK *nb;
2342   blkn_t nblk;
2343   IWDB db = sblk->db;
2344   bool uside = (idx == sblk->pnum);
2345   register const int8_t pivot = (KVBLK_IDXNUM / 2) + 1; // 32
2346 
2347   if (uside) { // Upper side
2348     rc = _sblk_create(lx, (uint8_t) lx->nlvl, 0, sblk, lx->upper, &nb);
2349     RCRET(rc);
2350     rc = _sblk_addkv(nb, lx);
2351     RCGO(rc, finish);
2352   } else { // New key is somewhere in a middle of sblk->kvblk
2353     assert(sblk->kvblk);
2354     // We are in the middle
2355     // Do the partial split
2356     // Move kv pairs into new `nb`
2357     // Compute space required for the new sblk which stores kv pairs after pivot `idx`
2358     size_t sz = 0;
2359     for (int8_t i = pivot; i < sblk->pnum; ++i) {
2360       sz += sblk->kvblk->pidx[sblk->pi[i]].len;
2361     }
2362     if (idx > pivot) {
2363       sz += IW_VNUMSIZE(lx->key->size) + lx->key->size + lx->val->size;
2364     }
2365     sz += KVBLK_MAX_NKV_SZ;
2366     uint8_t kvbpow = (uint8_t) iwlog2_64(sz);
2367     while ((1ULL << kvbpow) < sz) kvbpow++;
2368 
2369     rc = _sblk_create(lx, (uint8_t) lx->nlvl, kvbpow, sblk, lx->upper, &nb);
2370     RCRET(rc);
2371 
2372     IWKV_val key, val;
2373     IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2374     for (int8_t i = pivot, end = sblk->pnum; i < end; ++i) {
2375       uint8_t *mm;
2376       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2377       RCBREAK(rc);
2378 
2379       rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[i], &key, &val);
2380       assert(key.size);
2381       fsm->release_mmap(fsm);
2382       RCBREAK(rc);
2383 
2384       rc = _sblk_addkv2(nb, i - pivot, &key, &val, true);
2385       _kv_dispose(&key, &val);
2386 
2387       RCBREAK(rc);
2388       sblk->kvblk->pidx[sblk->pi[i]].len = 0;
2389       sblk->kvblk->pidx[sblk->pi[i]].off = 0;
2390       --sblk->pnum;
2391     }
2392     sblk->kvblk->flags |= KVBLK_DURTY;
2393     sblk->kvblk->zidx = sblk->pi[pivot];
2394     sblk->kvblk->maxoff = 0;
2395     for (int i = 0; i < KVBLK_IDXNUM; ++i) {
2396       if (sblk->kvblk->pidx[i].off > sblk->kvblk->maxoff) {
2397         sblk->kvblk->maxoff = sblk->kvblk->pidx[i].off;
2398       }
2399     }
2400   }
2401 
2402   // Fix levels:
2403   //  [ lb -> sblk -> ub ]
2404   //  [ lb -> sblk -> nb -> ub ]
2405   nblk = ADDR2BLK(nb->addr);
2406   lx->pupper[0]->p0 = nblk;
2407   lx->pupper[0]->flags |= SBLK_DURTY;
2408   nb->p0 = ADDR2BLK(lx->plower[0]->addr);
2409   for (int i = 0; i <= nb->lvl; ++i) {
2410     lx->plower[i]->n[i] = nblk;
2411     lx->plower[i]->flags |= SBLK_DURTY;
2412     nb->n[i] = ADDR2BLK(lx->pupper[i]->addr);
2413   }
2414 
2415   pthread_spin_lock(&db->cursors_slk);
2416   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2417     if (cur->cn && (cur->cn->addr == sblk->addr)) {
2418       if (cur->cnpos >= pivot) {
2419         memcpy(cur->cn, nb, sizeof(*cur->cn));
2420         cur->cn->kvblk = 0;
2421         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2422         cur->cnpos -= pivot;
2423       }
2424     }
2425   }
2426   pthread_spin_unlock(&db->cursors_slk);
2427 
2428   if (!uside) {
2429     if (idx > pivot) {
2430       rc = _sblk_addkv(nb, lx);
2431     } else {
2432       rc = _sblk_addkv(sblk, lx);
2433     }
2434     RCGO(rc, finish);
2435   }
2436 
2437 finish:
2438   if (rc) {
2439     lx->nb = 0;
2440     IWRC(_sblk_destroy(lx, &nb), rc);
2441   } else {
2442     lx->nb = nb;
2443   }
2444   return rc;
2445 }
2446 
_lx_init_chute(IWLCTX * lx)2447 IW_INLINE iwrc _lx_init_chute(IWLCTX *lx) {
2448   assert(lx->nlvl >= 0);
2449   iwrc rc = 0;
2450   if (!lx->pupper[lx->nlvl]) { // fix zero upper by dbtail
2451     SBLK *dbtail;
2452     rc = _sblk_at(lx, 0, 0, &dbtail);
2453     RCRET(rc);
2454     for (int8_t i = lx->nlvl; i >= 0 && !lx->pupper[i]; --i) {
2455       lx->pupper[i] = dbtail;
2456     }
2457   }
2458   return 0;
2459 }
2460 
_lx_addkv(IWLCTX * lx)2461 static WUR iwrc _lx_addkv(IWLCTX *lx) {
2462   iwrc rc;
2463   bool found, uadd;
2464   uint8_t *mm = 0, idx;
2465   SBLK *sblk = lx->lower;
2466   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2467   if (lx->nlvl > -1) {
2468     rc = _lx_init_chute(lx);
2469     RCRET(rc);
2470   }
2471   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2472   RCRET(rc);
2473   rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2474   if (rc) {
2475     fsm->release_mmap(fsm);
2476     return rc;
2477   }
2478   rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2479   RCRET(rc);
2480   if (found && (lx->opflags & IWKV_NO_OVERWRITE)) {
2481     fsm->release_mmap(fsm);
2482     return IWKV_ERROR_KEY_EXISTS;
2483   }
2484   uadd = (  !found
2485          && sblk->pnum > KVBLK_IDXNUM - 1 && idx > KVBLK_IDXNUM - 1
2486          && lx->upper && lx->upper->pnum < KVBLK_IDXNUM);
2487   if (uadd) {
2488     rc = _sblk_loadkvblk_mm(lx, lx->upper, mm);
2489     if (rc) {
2490       fsm->release_mmap(fsm);
2491       return rc;
2492     }
2493   }
2494   if (found) {
2495     IWKV_val sval, *val = lx->val;
2496     if (lx->opflags & IWKV_VAL_INCREMENT) {
2497       int64_t ival;
2498       uint8_t *rp;
2499       uint32_t len;
2500       if (val->size == 4) {
2501         int32_t lv;
2502         memcpy(&lv, val->data, val->size);
2503         lv = IW_ITOHL(lv);
2504         ival = lv;
2505       } else if (val->size == 8) {
2506         memcpy(&ival, val->data, val->size);
2507         ival = IW_ITOHLL(ival);
2508       } else {
2509         rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2510         fsm->release_mmap(fsm);
2511         return rc;
2512       }
2513       _kvblk_value_peek(sblk->kvblk, sblk->pi[idx], mm, &rp, &len);
2514       sval.data = rp;
2515       sval.size = len;
2516       if (sval.size == 4) {
2517         uint32_t lv;
2518         memcpy(&lv, sval.data, 4);
2519         lv = IW_ITOHL(lv);
2520         lv += ival;
2521         _num2lebuf(lx->incbuf, &lv, 4);
2522       } else if (sval.size == 8) {
2523         uint64_t llv;
2524         memcpy(&llv, sval.data, 8);
2525         llv = IW_ITOHLL(llv);
2526         llv += ival;
2527         _num2lebuf(lx->incbuf, &llv, 8);
2528       } else {
2529         rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2530         fsm->release_mmap(fsm);
2531         return rc;
2532       }
2533       sval.data = lx->incbuf;
2534       val = &sval;
2535     }
2536     if (lx->ph) {
2537       IWKV_val oldval;
2538       rc = _kvblk_value_get(sblk->kvblk, mm, sblk->pi[idx], &oldval);
2539       fsm->release_mmap(fsm);
2540       if (!rc) {
2541         // note: oldval should be disposed by ph
2542         rc = lx->ph(lx->key, lx->val, &oldval, lx->phop);
2543       }
2544       RCRET(rc);
2545     } else {
2546       fsm->release_mmap(fsm);
2547     }
2548     return _sblk_updatekv(sblk, idx, lx->key, val);
2549   } else {
2550     fsm->release_mmap(fsm);
2551     if (sblk->pnum > KVBLK_IDXNUM - 1) {
2552       if (uadd) {
2553         if (lx->ph) {
2554           rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2555           RCRET(rc);
2556         }
2557         return _sblk_addkv(lx->upper, lx);
2558       }
2559       if (lx->nlvl < 0) {
2560         return _IWKV_RC_REQUIRE_NLEVEL;
2561       }
2562       if (lx->ph) {
2563         rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2564         RCRET(rc);
2565       }
2566       return _lx_split_addkv(lx, idx, sblk);
2567     } else {
2568       if (lx->ph) {
2569         rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2570         RCRET(rc);
2571       }
2572       return _sblk_addkv2(sblk, idx, lx->key, lx->val, false);
2573     }
2574   }
2575 }
2576 
_lx_put_lw(IWLCTX * lx)2577 IW_INLINE WUR iwrc _lx_put_lw(IWLCTX *lx) {
2578   iwrc rc;
2579 start:
2580   rc = _lx_find_bounds(lx);
2581   if (rc) {
2582     _lx_release_mm(lx, 0);
2583     return rc;
2584   }
2585   rc = _lx_addkv(lx);
2586   if (rc == _IWKV_RC_REQUIRE_NLEVEL) {
2587     SBLK *lower = lx->lower;
2588     lx->lower = 0;
2589     _lx_release_mm(lx, 0);
2590     lx->nlvl = _sblk_genlevel(lx->db);
2591     if (lower->lvl >= lx->nlvl) {
2592       lx->lower = lower;
2593     }
2594     goto start;
2595   }
2596   if (rc == _IWKV_RC_KVBLOCK_FULL) {
2597     rc = IWKV_ERROR_CORRUPTED;
2598     iwlog_ecode_error3(rc);
2599   }
2600   IWRC(_lx_release(lx), rc);
2601   return rc;
2602 }
2603 
_lx_get_lr(IWLCTX * lx)2604 IW_INLINE WUR iwrc _lx_get_lr(IWLCTX *lx) {
2605   iwrc rc = _lx_find_bounds(lx);
2606   RCRET(rc);
2607   bool found;
2608   uint8_t *mm, idx;
2609   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2610   lx->val->size = 0;
2611   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2612   RCRET(rc);
2613   rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
2614   RCGO(rc, finish);
2615   rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
2616   RCGO(rc, finish);
2617   if (found) {
2618     rc = _kvblk_value_get(lx->lower->kvblk, mm, lx->lower->pi[idx], lx->val);
2619   } else {
2620     rc = IWKV_ERROR_NOTFOUND;
2621   }
2622 
2623 finish:
2624   IWRC(fsm->release_mmap(fsm), rc);
2625   _lx_release_mm(lx, 0);
2626   return rc;
2627 }
2628 
_lx_del_sblk_lw(IWLCTX * lx,SBLK * sblk,uint8_t idx)2629 static WUR iwrc _lx_del_sblk_lw(IWLCTX *lx, SBLK *sblk, uint8_t idx) {
2630   assert(sblk->pnum == 1 && sblk->kvblk);
2631 
2632   iwrc rc;
2633   IWDB db = lx->db;
2634   KVBLK *kvblk = sblk->kvblk;
2635   blkn_t sblk_blkn = ADDR2BLK(sblk->addr);
2636 
2637   _lx_release_mm(lx, 0);
2638   lx->nlvl = sblk->lvl;
2639   lx->upper_addr = sblk->addr;
2640 
2641   rc = _lx_find_bounds(lx);
2642   RCRET(rc);
2643   assert(lx->upper->pnum == 1 && lx->upper->addr == lx->upper_addr);
2644 
2645   lx->upper->kvblk = kvblk;
2646   rc = _sblk_rmkv(lx->upper, idx);
2647   RCGO(rc, finish);
2648 
2649   for (int i = 0; i <= lx->nlvl; ++i) {
2650     lx->plower[i]->n[i] = lx->upper->n[i];
2651     lx->plower[i]->flags |= SBLK_DURTY;
2652     if (lx->plower[i]->flags & SBLK_DB) {
2653       if (!lx->plower[i]->n[i]) {
2654         --lx->plower[i]->lvl;
2655       }
2656     }
2657     if (lx->pupper[i] == lx->upper) {
2658       // Do not touch `lx->upper` in next `_lx_release_mm()` call
2659       lx->pupper[i] = 0;
2660     }
2661   }
2662 
2663   SBLK rb;  // Block to remove
2664   memcpy(&rb, lx->upper, sizeof(rb));
2665 
2666   SBLK *nb, // Block after lx->upper
2667        *rbp = &rb;
2668 
2669   assert(!lx->nb);
2670   rc = _sblk_at(lx, BLK2ADDR(rb.n[0]), 0, &nb);
2671   RCGO(rc, finish);
2672   lx->nb = nb;
2673   lx->nb->p0 = rb.p0;
2674   lx->nb->flags |= SBLK_DURTY;
2675 
2676   // Update cursors within sblk removed
2677   pthread_spin_lock(&db->cursors_slk);
2678   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2679     if (cur->cn) {
2680       if (cur->cn->addr == sblk->addr) {
2681         if (nb->flags & SBLK_DB) {
2682           if (!(lx->plower[0]->flags & SBLK_DB)) {
2683             memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
2684             cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2685             cur->cn->kvblk = 0;
2686             cur->skip_next = -1;
2687             cur->cnpos = lx->plower[0]->pnum;
2688             if (cur->cnpos) {
2689               cur->cnpos--;
2690             }
2691           } else {
2692             cur->cn = 0;
2693             cur->cnpos = 0;
2694             cur->skip_next = 0;
2695           }
2696         } else {
2697           memcpy(cur->cn, nb, sizeof(*nb));
2698           cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2699           cur->cn->kvblk = 0;
2700           cur->cnpos = 0;
2701           cur->skip_next = 1;
2702         }
2703       } else if (cur->cn->n[0] == sblk_blkn) {
2704         memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
2705         cur->cn->kvblk = 0;
2706         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2707       } else if (cur->cn->p0 == sblk_blkn) {
2708         memcpy(cur->cn, nb, sizeof(*nb));
2709         cur->cn->kvblk = 0;
2710         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2711       }
2712     }
2713   }
2714   pthread_spin_unlock(&db->cursors_slk);
2715 
2716   rc = _sblk_destroy(lx, &rbp);
2717 
2718 finish:
2719   return rc;
2720 }
2721 
_lx_del_lw(IWLCTX * lx)2722 static WUR iwrc _lx_del_lw(IWLCTX *lx) {
2723   iwrc rc;
2724   bool found;
2725   uint8_t *mm = 0, idx;
2726   IWDB db = lx->db;
2727   IWFS_FSM *fsm = &db->iwkv->fsm;
2728   SBLK *sblk;
2729 
2730   rc = _lx_find_bounds(lx);
2731   RCRET(rc);
2732 
2733   sblk = lx->lower;
2734   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2735   RCGO(rc, finish);
2736   rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2737   RCGO(rc, finish);
2738   rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2739   RCGO(rc, finish);
2740   if (!found) {
2741     rc = IWKV_ERROR_NOTFOUND;
2742     goto finish;
2743   }
2744   fsm->release_mmap(fsm);
2745   mm = 0;
2746 
2747   if (sblk->pnum == 1) { // last kv in block
2748     rc = _lx_del_sblk_lw(lx, sblk, idx);
2749   } else {
2750     rc = _sblk_rmkv(sblk, idx);
2751   }
2752 
2753 finish:
2754   if (mm) {
2755     fsm->release_mmap(fsm);
2756   }
2757   if (rc) {
2758     _lx_release_mm(lx, 0);
2759   } else {
2760     rc = _lx_release(lx);
2761   }
2762   return rc;
2763 }
2764 
2765 //-------------------------- CACHE
2766 
_dbcache_destroy_lw(IWDB db)2767 static void _dbcache_destroy_lw(IWDB db) {
2768   free(db->cache.nodes);
2769   memset(&db->cache, 0, sizeof(db->cache));
2770 }
2771 
_dbcache_lvl(uint8_t lvl)2772 IW_INLINE uint8_t _dbcache_lvl(uint8_t lvl) {
2773   uint8_t clvl = (lvl >= DBCACHE_LEVELS) ? (lvl - DBCACHE_LEVELS + 1) : DBCACHE_MIN_LEVEL;
2774   if (clvl < DBCACHE_MIN_LEVEL) {
2775     clvl = DBCACHE_MIN_LEVEL;
2776   }
2777   return clvl;
2778 }
2779 
_dbcache_cmp_nodes(const void * v1,const void * v2,void * op,int * res)2780 static WUR iwrc _dbcache_cmp_nodes(const void *v1, const void *v2, void *op, int *res) {
2781   iwrc rc = 0;
2782   uint8_t *mm = 0;
2783   IWLCTX *lx = op;
2784   IWDB db = lx->db;
2785   IWFS_FSM *fsm = &db->iwkv->fsm;
2786   iwdb_flags_t dbflg = db->dbflg;
2787   int rv = 0, step;
2788 
2789   const DBCNODE *cn1 = v1, *cn2 = v2;
2790   uint8_t *k1 = (uint8_t*) cn1->lk, *k2 = (uint8_t*) cn2->lk;
2791   uint32_t kl1 = cn1->lkl, kl2 = cn2->lkl;
2792   KVBLK *kb;
2793 
2794   if (!kl1 && cn1->fullkey) {
2795     kl1 = cn1->sblkn;
2796   }
2797   if (!kl2 && cn2->fullkey) {
2798     kl2 = cn2->sblkn;
2799   }
2800 
2801   IWKV_val key2 = {
2802     .size = kl2,
2803     .data = k2
2804   };
2805 
2806   if (dbflg & IWDB_COMPOUND_KEYS) {
2807     IW_READVNUMBUF64(k2, key2.compound, step);
2808     key2.size -= step;
2809     key2.data = (char*) key2.data + step;
2810   }
2811 
2812   rv = _cmp_keys_prefix(dbflg, k1, kl1, &key2);
2813 
2814   if ((rv == 0) && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
2815 
2816     if (!cn1->fullkey || !cn2->fullkey) {
2817       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2818       RCRET(rc);
2819       if (!cn1->fullkey) {
2820         rc = _kvblk_at_mm(lx, BLK2ADDR(cn1->kblkn), mm, 0, &kb);
2821         RCGO(rc, finish);
2822         rc = _kvblk_key_peek(kb, cn1->k0idx, mm, &k1, &kl1);
2823         RCGO(rc, finish);
2824       }
2825       if (!cn2->fullkey) {
2826         rc = _kvblk_at_mm(lx, BLK2ADDR(cn2->kblkn), mm, 0, &kb);
2827         RCGO(rc, finish);
2828         rc = _kvblk_key_peek(kb, cn2->k0idx, mm, &k2, &kl2);
2829         RCGO(rc, finish);
2830         key2.size = kl2;
2831         key2.data = k2;
2832         if (dbflg & IWDB_COMPOUND_KEYS) {
2833           IW_READVNUMBUF64(k2, key2.compound, step);
2834           key2.size -= step;
2835           key2.data = (char*) key2.data + step;
2836         }
2837       }
2838 
2839       rv = _cmp_keys(dbflg, k1, kl1, &key2);
2840     } else if (dbflg & IWDB_COMPOUND_KEYS) {
2841 
2842       int64_t c1, c2 = key2.compound;
2843       IW_READVNUMBUF64(k1, c1, step);
2844       kl1 -= step;
2845       if (key2.size == kl1) {
2846         rv = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
2847       } else {
2848         rv = (int) key2.size - (int) kl1;
2849       }
2850     } else {
2851       rv = (int) kl2 - (int) kl1;
2852     }
2853   }
2854 
2855 finish:
2856   *res = rv;
2857   if (mm) {
2858     fsm->release_mmap(fsm);
2859   }
2860   return rc;
2861 }
2862 
_dbcache_fill_lw(IWLCTX * lx)2863 static WUR iwrc _dbcache_fill_lw(IWLCTX *lx) {
2864   iwrc rc = 0;
2865   IWDB db = lx->db;
2866   lx->cache_reload = 0;
2867   if (!lx->dblk.addr) {
2868     SBLK *s;
2869     rc = _sblk_at(lx, lx->db->addr, 0, &s);
2870     RCRET(rc);
2871     memcpy(&lx->dblk, s, sizeof(lx->dblk));
2872   }
2873   SBLK *sdb = &lx->dblk;
2874   SBLK *sblk = sdb;
2875   DBCACHE *c = &db->cache;
2876   assert(lx->db->addr == sdb->addr);
2877   c->num = 0;
2878   if (c->nodes) {
2879     free(c->nodes);
2880     c->nodes = 0;
2881   }
2882   if (sdb->lvl < DBCACHE_MIN_LEVEL) {
2883     c->open = true;
2884     return 0;
2885   }
2886   c->lvl = _dbcache_lvl(sdb->lvl);
2887   c->nsize = (lx->db->dbflg & IWDB_VNUM64_KEYS) ? DBCNODE_VNUM_SZ : DBCNODE_STR_SZ;
2888   c->asize = c->nsize * ((1U << DBCACHE_LEVELS) + DBCACHE_ALLOC_STEP);
2889 
2890   size_t nsize = c->nsize;
2891   c->nodes = malloc(c->asize);
2892   if (!c->nodes) {
2893     c->open = false;
2894     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
2895   }
2896   blkn_t n;
2897   uint8_t *wp;
2898   size_t num = 0;
2899   while ((n = sblk->n[c->lvl])) {
2900     rc = _sblk_at(lx, BLK2ADDR(n), 0, &sblk);
2901     RCRET(rc);
2902     if (offsetof(DBCNODE, lk) + sblk->lkl > nsize) {
2903       free(c->nodes);
2904       c->nodes = 0;
2905       rc = IWKV_ERROR_CORRUPTED;
2906       iwlog_ecode_error3(rc);
2907       return rc;
2908     }
2909     DBCNODE cn = {
2910       .lkl     = sblk->lkl,
2911       .fullkey = (sblk->flags & SBLK_FULL_LKEY),
2912       .k0idx   = sblk->pi[0],
2913       .sblkn   = ADDR2BLK(sblk->addr),
2914       .kblkn   = sblk->kvblkn
2915     };
2916     if (c->asize < nsize * (num + 1)) {
2917       c->asize += (nsize * DBCACHE_ALLOC_STEP);
2918       wp = (uint8_t*) c->nodes;
2919       DBCNODE *nn = realloc(c->nodes, c->asize);
2920       if (!nn) {
2921         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
2922         free(wp);
2923         return rc;
2924       }
2925       c->nodes = nn;
2926     }
2927     wp = (uint8_t*) c->nodes + nsize * num;
2928     memcpy(wp, &cn, offsetof(DBCNODE, lk));
2929     wp += offsetof(DBCNODE, lk);
2930     memcpy(wp, sblk->lk, sblk->lkl);
2931     ++num;
2932   }
2933   c->num = num;
2934   c->open = true;
2935   return 0;
2936 }
2937 
_dbcache_get(IWLCTX * lx)2938 static WUR iwrc _dbcache_get(IWLCTX *lx) {
2939   iwrc rc = 0;
2940   off_t idx;
2941   bool found;
2942   DBCNODE *n;
2943   alignas(DBCNODE) uint8_t dbcbuf[255];
2944   IWDB db = lx->db;
2945   DBCACHE *cache = &db->cache;
2946   const IWKV_val *key = lx->key;
2947   if ((lx->nlvl > -1) || (cache->num < 1)) {
2948     lx->lower = &lx->dblk;
2949     return 0;
2950   }
2951   assert(cache->nodes);
2952   size_t lxksiz = key->size;
2953   if (db->dbflg & IWDB_COMPOUND_KEYS) {
2954     lxksiz += IW_VNUMSIZE(key->compound);
2955   }
2956 
2957   if (sizeof(DBCNODE) + lxksiz <= sizeof(dbcbuf)) {
2958     n = (DBCNODE*) dbcbuf;
2959   } else {
2960     n = malloc(sizeof(DBCNODE) + lxksiz);
2961     if (!n) {
2962       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
2963     }
2964   }
2965   n->sblkn = (uint32_t) lxksiz; // `sblkn` used to store key size (to keep DBCNODE compact)
2966   n->kblkn = 0;
2967   n->fullkey = 1;
2968   n->lkl = 0;
2969   n->k0idx = 0;
2970 
2971   uint8_t *wp = (uint8_t*) n + offsetof(DBCNODE, lk);
2972   if (db->dbflg & IWDB_COMPOUND_KEYS) {
2973     size_t step;
2974     char vbuf[IW_VNUMBUFSZ];
2975     IW_SETVNUMBUF(step, vbuf, key->compound);
2976     memcpy(wp, vbuf, step);
2977     wp += step;
2978   }
2979   memcpy(wp, key->data, key->size);
2980 
2981   idx = iwarr_sorted_find2(cache->nodes, cache->num, cache->nsize, n, lx, &found, _dbcache_cmp_nodes);
2982   if (idx > 0) {
2983     DBCNODE *fn = (DBCNODE*) ((uint8_t*) cache->nodes + (idx - 1) * cache->nsize);
2984     assert(fn && idx - 1 < cache->num);
2985     rc = _sblk_at(lx, BLK2ADDR(fn->sblkn), 0, &lx->lower);
2986   } else {
2987     lx->lower = &lx->dblk;
2988   }
2989   if ((uint8_t*) n != dbcbuf) {
2990     free(n);
2991   }
2992   return rc;
2993 }
2994 
_dbcache_put_lw(IWLCTX * lx,SBLK * sblk)2995 static WUR iwrc _dbcache_put_lw(IWLCTX *lx, SBLK *sblk) {
2996   off_t idx;
2997   bool found;
2998   IWDB db = lx->db;
2999   alignas(DBCNODE) uint8_t dbcbuf[255];
3000   DBCNODE *n = (DBCNODE*) dbcbuf;
3001   DBCACHE *cache = &db->cache;
3002   size_t nsize = cache->nsize;
3003 
3004   sblk->flags &= ~SBLK_CACHE_PUT;
3005   assert(sizeof(*cache) + sblk->lkl <= sizeof(dbcbuf));
3006   if ((sblk->pnum < 1) || (sblk->lvl < cache->lvl)) {
3007     return 0;
3008   }
3009   if ((sblk->lvl >= cache->lvl + DBCACHE_LEVELS) || !cache->nodes) { // need to reload full cache
3010     lx->cache_reload = 1;
3011     return 0;
3012   }
3013   if (!sblk->kvblk) {
3014     assert(sblk->kvblk);
3015     return IW_ERROR_INVALID_STATE;
3016   }
3017   n->lkl = sblk->lkl;
3018   n->fullkey = (sblk->flags & SBLK_FULL_LKEY);
3019   n->k0idx = sblk->pi[0];
3020   n->sblkn = ADDR2BLK(sblk->addr);
3021   n->kblkn = sblk->kvblkn;
3022   memcpy((uint8_t*) n + offsetof(DBCNODE, lk), sblk->lk, sblk->lkl);
3023 
3024   idx = iwarr_sorted_find2(cache->nodes, cache->num, nsize, n, lx, &found, _dbcache_cmp_nodes);
3025   assert(!found);
3026 
3027   if (cache->asize <= cache->num * nsize) {
3028     size_t nsz = cache->asize + (nsize * DBCACHE_ALLOC_STEP);
3029     DBCNODE *nodes = realloc(cache->nodes, nsz);
3030     if (!nodes) {
3031       iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3032       free(cache->nodes);
3033       cache->nodes = 0;
3034       return rc;
3035     }
3036     cache->asize = nsz;
3037     cache->nodes = nodes;
3038   }
3039 
3040   uint8_t *cptr = (uint8_t*) cache->nodes;
3041   if (cache->num != idx) {
3042     memmove(cptr + (idx + 1) * nsize, cptr + idx * nsize, (cache->num - idx) * nsize);
3043   }
3044   memcpy(cptr + idx * nsize, n, nsize);
3045   ++cache->num;
3046   return 0;
3047 }
3048 
_dbcache_remove_lw(IWLCTX * lx,SBLK * sblk)3049 static void _dbcache_remove_lw(IWLCTX *lx, SBLK *sblk) {
3050   IWDB db = lx->db;
3051   DBCACHE *cache = &db->cache;
3052   sblk->flags &= ~SBLK_CACHE_REMOVE;
3053   if ((sblk->lvl < cache->lvl) || (cache->num < 1)) {
3054     return;
3055   }
3056   if ((cache->lvl > DBCACHE_MIN_LEVEL) && (lx->dblk.lvl < sblk->lvl)) {
3057     // Database level reduced so we need to shift cache down
3058     lx->cache_reload = 1;
3059     return;
3060   }
3061   blkn_t sblkn = ADDR2BLK(sblk->addr);
3062   size_t num = cache->num;
3063   size_t nsize = cache->nsize;
3064   uint8_t *rp = (uint8_t*) cache->nodes;
3065   for (size_t i = 0; i < num; ++i) {
3066     DBCNODE *n = (DBCNODE*) (rp + i * nsize);
3067     if (sblkn == n->sblkn) {
3068       if (i < num - 1) {
3069         memmove(rp + i * nsize, rp + (i + 1) * nsize, (num - i - 1) * nsize);
3070       }
3071       --cache->num;
3072       break;
3073     }
3074   }
3075 }
3076 
_dbcache_update_lw(IWLCTX * lx,SBLK * sblk)3077 static void _dbcache_update_lw(IWLCTX *lx, SBLK *sblk) {
3078   IWDB db = lx->db;
3079   DBCACHE *cache = &db->cache;
3080   assert(sblk->pnum > 0);
3081   sblk->flags &= ~SBLK_CACHE_UPDATE;
3082   if ((sblk->lvl < cache->lvl) || (cache->num < 1)) {
3083     return;
3084   }
3085   blkn_t sblkn = ADDR2BLK(sblk->addr);
3086   size_t num = cache->num;
3087   size_t nsize = cache->nsize;
3088   uint8_t *rp = (uint8_t*) cache->nodes;
3089   for (size_t i = 0; i < num; ++i) {
3090     DBCNODE *n = (DBCNODE*) (rp + i * nsize);
3091     if (sblkn == n->sblkn) {
3092       n->kblkn = sblk->kvblkn;
3093       n->lkl = sblk->lkl;
3094       n->fullkey = (sblk->flags & SBLK_FULL_LKEY);
3095       n->k0idx = sblk->pi[0];
3096       memcpy((uint8_t*) n + offsetof(DBCNODE, lk), sblk->lk, sblk->lkl);
3097       break;
3098     }
3099   }
3100 }
3101 
3102 //--------------------------  CURSOR
3103 
_cursor_get_ge_idx(IWLCTX * lx,IWKV_cursor_op op,uint8_t * oidx)3104 IW_INLINE WUR iwrc _cursor_get_ge_idx(IWLCTX *lx, IWKV_cursor_op op, uint8_t *oidx) {
3105   iwrc rc = _lx_find_bounds(lx);
3106   RCRET(rc);
3107   bool found;
3108   uint8_t *mm, idx;
3109   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3110   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3111   RCRET(rc);
3112   rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
3113   RCGO(rc, finish);
3114   rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
3115   RCGO(rc, finish);
3116   if (found) {
3117     *oidx = idx;
3118   } else {
3119     if ((op == IWKV_CURSOR_EQ) || (lx->lower->flags & SBLK_DB) || (lx->lower->pnum < 1)) {
3120       rc = IWKV_ERROR_NOTFOUND;
3121     } else {
3122       *oidx = idx ? idx - 1 : idx;
3123     }
3124   }
3125 
3126 finish:
3127   IWRC(fsm->release_mmap(fsm), rc);
3128   return rc;
3129 }
3130 
_cursor_to_lr(IWKV_cursor cur,IWKV_cursor_op op)3131 static WUR iwrc _cursor_to_lr(IWKV_cursor cur, IWKV_cursor_op op) {
3132   iwrc rc = 0;
3133   IWDB db = cur->lx.db;
3134   IWLCTX *lx = &cur->lx;
3135   blkn_t dblk = ADDR2BLK(db->addr);
3136   if (op < IWKV_CURSOR_NEXT) { // IWKV_CURSOR_BEFORE_FIRST | IWKV_CURSOR_AFTER_LAST
3137     if (cur->cn) {
3138       _sblk_release(lx, &cur->cn);
3139     }
3140     if (op == IWKV_CURSOR_BEFORE_FIRST) {
3141       cur->dbaddr = db->addr;
3142       cur->cnpos = KVBLK_IDXNUM - 1;
3143     } else {
3144       cur->dbaddr = -1; // Negative as sign of dbtail
3145       cur->cnpos = 0;
3146     }
3147     return 0;
3148   }
3149 
3150 start:
3151   if (op < IWKV_CURSOR_EQ) { // IWKV_CURSOR_NEXT | IWKV_CURSOR_PREV
3152     blkn_t n = 0;
3153     if (!cur->cn) {
3154       if (cur->dbaddr) {
3155         rc = _sblk_at(lx, (cur->dbaddr < 0 ? 0 : cur->dbaddr), 0, &cur->cn);
3156         cur->dbaddr = 0;
3157         RCGO(rc, finish);
3158       } else {
3159         rc = IWKV_ERROR_NOTFOUND;
3160         goto finish;
3161       }
3162     }
3163     if (op == IWKV_CURSOR_NEXT) {
3164       if (cur->skip_next > 0) {
3165         goto finish;
3166       }
3167       if (cur->cnpos + 1 >= cur->cn->pnum) {
3168         n = cur->cn->n[0];
3169         if (!n) {
3170           rc = IWKV_ERROR_NOTFOUND;
3171           goto finish;
3172         }
3173         _sblk_release(lx, &cur->cn);
3174         rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
3175         RCGO(rc, finish);
3176         cur->cnpos = 0;
3177         if (IW_UNLIKELY(!cur->cn->pnum)) {
3178           goto start;
3179         }
3180       } else {
3181         if (cur->cn->flags & SBLK_DB) {
3182           rc = IWKV_ERROR_NOTFOUND;
3183           goto finish;
3184         }
3185         ++cur->cnpos;
3186       }
3187     } else { // IWKV_CURSOR_PREV
3188       if (cur->skip_next < 0) {
3189         goto finish;
3190       }
3191       if (cur->cnpos == 0) {
3192         n = cur->cn->p0;
3193         if (!n || (n == dblk)) {
3194           rc = IWKV_ERROR_NOTFOUND;
3195           goto finish;
3196         }
3197         _sblk_release(lx, &cur->cn);
3198         RCGO(rc, finish);
3199         rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
3200         RCGO(rc, finish);
3201         if (IW_LIKELY(cur->cn->pnum)) {
3202           cur->cnpos = cur->cn->pnum - 1;
3203         } else {
3204           goto start;
3205         }
3206       } else {
3207         if (cur->cn->flags & SBLK_DB) {
3208           rc = IWKV_ERROR_NOTFOUND;
3209           goto finish;
3210         }
3211         --cur->cnpos;
3212       }
3213     }
3214   } else { // IWKV_CURSOR_EQ | IWKV_CURSOR_GE
3215     if (!lx->key) {
3216       rc = IW_ERROR_INVALID_STATE;
3217       goto finish;
3218     }
3219     rc = _cursor_get_ge_idx(lx, op, &cur->cnpos);
3220     if (lx->upper) {
3221       _sblk_release(lx, &lx->upper);
3222     }
3223     if (!rc) {
3224       cur->cn = lx->lower;
3225       lx->lower = 0;
3226     }
3227   }
3228 
3229 finish:
3230   cur->skip_next = 0;
3231   if (rc && (rc != IWKV_ERROR_NOTFOUND)) {
3232     if (cur->cn) {
3233       _sblk_release(lx, &cur->cn);
3234     }
3235   }
3236   return rc;
3237 }
3238 
3239 //--------------------------  PUBLIC API
3240 
_kv_ecodefn(locale_t locale,uint32_t ecode)3241 static const char* _kv_ecodefn(locale_t locale, uint32_t ecode) {
3242   if (!((ecode > _IWKV_ERROR_START) && (ecode < _IWKV_ERROR_END))) {
3243     return 0;
3244   }
3245   switch (ecode) {
3246     case IWKV_ERROR_NOTFOUND:
3247       return "Key not found. (IWKV_ERROR_NOTFOUND)";
3248     case IWKV_ERROR_KEY_EXISTS:
3249       return "Key exists. (IWKV_ERROR_KEY_EXISTS)";
3250     case IWKV_ERROR_MAXKVSZ:
3251       return "Size of Key+value must be not greater than 0xfffffff bytes (IWKV_ERROR_MAXKVSZ)";
3252     case IWKV_ERROR_CORRUPTED:
3253       return "Database file invalid or corrupted (IWKV_ERROR_CORRUPTED)";
3254     case IWKV_ERROR_DUP_VALUE_SIZE:
3255       return "Value size is not compatible for insertion into sorted values array (IWKV_ERROR_DUP_VALUE_SIZE)";
3256     case IWKV_ERROR_KEY_NUM_VALUE_SIZE:
3257       return "Given key is not compatible to store as number (IWKV_ERROR_KEY_NUM_VALUE_SIZE)";
3258     case IWKV_ERROR_INCOMPATIBLE_DB_MODE:
3259       return "Incompatible database open mode (IWKV_ERROR_INCOMPATIBLE_DB_MODE)";
3260     case IWKV_ERROR_INCOMPATIBLE_DB_FORMAT:
3261       return "Incompatible database format version, please migrate database data (IWKV_ERROR_INCOMPATIBLE_DB_FORMAT)";
3262     case IWKV_ERROR_CORRUPTED_WAL_FILE:
3263       return "Corrupted WAL file (IWKV_ERROR_CORRUPTED_WAL_FILE)";
3264     case IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED:
3265       return "Stored value cannot be incremented/descremented (IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED)";
3266     case IWKV_ERROR_WAL_MODE_REQUIRED:
3267       return "Operation requires WAL enabled database. (IWKV_ERROR_WAL_MODE_REQUIRED)";
3268     case IWKV_ERROR_BACKUP_IN_PROGRESS:
3269       return "ackup operation in progress. (IWKV_ERROR_BACKUP_IN_PROGRESS)";
3270     default:
3271       break;
3272   }
3273   return 0;
3274 }
3275 
iwkv_init(void)3276 iwrc iwkv_init(void) {
3277   static int _kv_initialized = 0;
3278   if (!__sync_bool_compare_and_swap(&_kv_initialized, 0, 1)) {
3279     return 0;
3280   }
3281   return iwlog_register_ecodefn(_kv_ecodefn);
3282 }
3283 
_szpolicy(off_t nsize,off_t csize,struct IWFS_EXT * f,void ** _ctx)3284 static off_t _szpolicy(off_t nsize, off_t csize, struct IWFS_EXT *f, void **_ctx) {
3285   off_t res;
3286   size_t aunit = iwp_alloc_unit();
3287   if (csize < 0x4000000) { // Doubled alloc up to 64M
3288     res = csize ? csize : aunit;
3289     while (res < nsize) {
3290       res <<= 1;
3291     }
3292   } else {
3293     res = nsize + 10 * 1024 * 1024; // + 10M extra space
3294   }
3295   res = IW_ROUNDUP(res, aunit);
3296   return res;
3297 }
3298 
iwkv_state(IWKV iwkv,IWFS_FSM_STATE * out)3299 iwrc iwkv_state(IWKV iwkv, IWFS_FSM_STATE *out) {
3300   if (!iwkv || !out) {
3301     return IW_ERROR_INVALID_ARGS;
3302   }
3303   int rci;
3304   API_RLOCK(iwkv, rci);
3305   IWFS_FSM fsm = iwkv->fsm;
3306   iwrc rc = fsm.state(&fsm, out);
3307   API_UNLOCK(iwkv, rci, rc);
3308   return rc;
3309 }
3310 
iwkv_online_backup(IWKV iwkv,uint64_t * ts,const char * target_file)3311 iwrc iwkv_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
3312   return iwal_online_backup(iwkv, ts, target_file);
3313 }
3314 
_iwkv_check_online_backup(const char * path,iwp_lockmode extra_lock_flags,bool * out_has_online_bkp)3315 static iwrc _iwkv_check_online_backup(const char *path, iwp_lockmode extra_lock_flags, bool *out_has_online_bkp) {
3316   size_t sp;
3317   uint32_t lv;
3318   off_t fsz, pos;
3319   uint64_t waloff; // WAL offset
3320   char buf[16384];
3321 
3322   *out_has_online_bkp = false;
3323   const size_t aunit = iwp_alloc_unit();
3324   char *wpath = 0;
3325 
3326   IWFS_FILE f = { 0 }, w = { 0 };
3327   IWFS_FILE_STATE fs, fw;
3328   iwrc rc = iwfs_file_open(&f, &(IWFS_FILE_OPTS) {
3329     .path = path,
3330     .omode = IWFS_OREAD | IWFS_OWRITE,
3331     .lock_mode = IWP_WLOCK | extra_lock_flags
3332   });
3333   if (rc == IW_ERROR_NOT_EXISTS) {
3334     return 0;
3335   }
3336   RCRET(rc);
3337 
3338   rc = f.state(&f, &fs);
3339   RCGO(rc, finish);
3340 
3341   rc = iwp_lseek(fs.fh, 0, IWP_SEEK_END, &fsz);
3342   RCGO(rc, finish);
3343   if (fsz < iwp_alloc_unit()) {
3344     goto finish;
3345   }
3346 
3347   rc = iwp_pread(fs.fh, 0, &lv, sizeof(lv), &sp);
3348   RCGO(rc, finish);
3349   lv = IW_ITOHL(lv);
3350   if ((sp != sizeof(lv)) || (lv != IWFSM_MAGICK)) {
3351     goto finish;
3352   }
3353 
3354   rc = iwp_pread(fs.fh, IWFSM_CUSTOM_HDR_DATA_OFFSET, &lv, sizeof(lv), &sp);
3355   RCGO(rc, finish);
3356   lv = IW_ITOHL(lv);
3357   if ((sp != sizeof(lv)) || (lv != IWKV_MAGIC)) {
3358     goto finish;
3359   }
3360 
3361   rc = iwp_lseek(fs.fh, (off_t) -1 * sizeof(lv), IWP_SEEK_END, 0);
3362   RCGO(rc, finish);
3363 
3364   rc = iwp_read(fs.fh, &lv, sizeof(lv), &sp);
3365   RCGO(rc, finish);
3366   lv = IW_ITOHL(lv);
3367   if ((sp != sizeof(lv)) || (lv != IWKV_BACKUP_MAGIC)) {
3368     goto finish;
3369   }
3370 
3371   // Get WAL data offset
3372   rc = iwp_lseek(fs.fh, (off_t) -1 * (sizeof(waloff) + sizeof(lv)), IWP_SEEK_END, &pos);
3373   RCGO(rc, finish);
3374 
3375   rc = iwp_read(fs.fh, &waloff, sizeof(waloff), &sp);
3376   RCGO(rc, finish);
3377 
3378   waloff = IW_ITOHLL(waloff);
3379   if (((waloff != pos) && (waloff > pos - sizeof(WBSEP))) || (waloff & (aunit - 1))) {
3380     goto finish;
3381   }
3382 
3383   // Read the first WAL instruction: WBSEP
3384   if (waloff != pos) { // Not an empty WAL?
3385     WBSEP wbsep = { 0 };
3386     rc = iwp_pread(fs.fh, waloff, &wbsep, sizeof(wbsep), &sp);
3387     RCGO(rc, finish);
3388     if (wbsep.id != WOP_SEP) {
3389       goto finish;
3390     }
3391   }
3392 
3393   // Now we have an online backup image, unpack WAL file
3394 
3395   sp = strlen(path);
3396   wpath = malloc(sp + 4 /*-wal*/ + 1 /*\0*/);
3397   if (!wpath) {
3398     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3399     goto finish;
3400   }
3401   memcpy(wpath, path, sp);
3402   memcpy(wpath + sp, "-wal", 4);
3403   wpath[sp + 4] = '\0';
3404 
3405   iwlog_warn("Unpacking WAL from online backup into: %s", wpath);
3406   *out_has_online_bkp = true;
3407 
3408   // WAL file
3409   rc = iwfs_file_open(&w, &(IWFS_FILE_OPTS) {
3410     .path = wpath,
3411     .omode = IWFS_OREAD | IWFS_OWRITE | IWFS_OTRUNC
3412   });
3413   RCGO(rc, finish);
3414 
3415   rc = w.state(&w, &fw);
3416   RCGO(rc, finish);
3417 
3418   // WAL content copy
3419   rc = iwp_lseek(fs.fh, waloff, IWP_SEEK_SET, 0);
3420   RCGO(rc, finish);
3421   fsz = fsz - waloff - sizeof(lv) /* magic */ - sizeof(waloff) /* wal offset */;
3422   if (fsz > 0) {
3423     sp = 0;
3424     do {
3425       rc = iwp_read(fs.fh, buf, sizeof(buf), &sp);
3426       RCGO(rc, finish);
3427       if (sp > fsz) {
3428         sp = fsz;
3429       }
3430       fsz -= sp;
3431       rc = iwp_write(fw.fh, buf, sp);
3432       RCGO(rc, finish);
3433     } while (fsz > 0 && sp > 0);
3434   }
3435   rc = iwp_fsync(fw.fh);
3436   RCGO(rc, finish);
3437 
3438   rc = iwp_ftruncate(fs.fh, waloff);
3439   RCGO(rc, finish);
3440 
3441   rc = iwp_fsync(fs.fh);
3442   RCGO(rc, finish);
3443 
3444 finish:
3445   if (f.impl) {
3446     IWRC(f.close(&f), rc);
3447   }
3448   if (w.impl) {
3449     IWRC(w.close(&w), rc);
3450   }
3451   free(wpath);
3452   return rc;
3453 }
3454 
iwkv_open(const IWKV_OPTS * opts,IWKV * iwkvp)3455 iwrc iwkv_open(const IWKV_OPTS *opts, IWKV *iwkvp) {
3456   if (!opts || !iwkvp || !opts->path) {
3457     return IW_ERROR_INVALID_ARGS;
3458   }
3459   *iwkvp = 0;
3460   int rci;
3461   iwrc rc = 0;
3462   uint32_t lv;
3463   uint64_t llv;
3464   uint8_t *rp, *mm;
3465   bool has_online_bkp = false;
3466 
3467   rc = iw_init();
3468   RCRET(rc);
3469 
3470   if (opts->random_seed) {
3471     iwu_rand_seed(opts->random_seed);
3472   }
3473   iwkv_openflags oflags = opts->oflags;
3474   iwfs_omode omode = IWFS_OREAD;
3475   if (oflags & IWKV_TRUNC) {
3476     oflags &= ~IWKV_RDONLY;
3477     omode |= IWFS_OTRUNC;
3478   }
3479   if (!(oflags & IWKV_RDONLY)) {
3480     omode |= IWFS_OWRITE;
3481     omode |= IWFS_OCREATE;
3482   }
3483   if ((omode & IWFS_OWRITE) && !(omode & IWFS_OTRUNC)) {
3484     iwp_lockmode extra_lock_flags = 0;
3485     if (opts->file_lock_fail_fast) {
3486       extra_lock_flags |= IWP_NBLOCK;
3487     }
3488     rc = _iwkv_check_online_backup(opts->path, extra_lock_flags, &has_online_bkp);
3489     RCRET(rc);
3490   }
3491 
3492   *iwkvp = calloc(1, sizeof(struct _IWKV));
3493   if (!*iwkvp) {
3494     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
3495   }
3496   IWKV iwkv = *iwkvp;
3497   iwkv->fmt_version = opts->fmt_version > 0 ? opts->fmt_version : IWKV_FORMAT;
3498   if (iwkv->fmt_version > IWKV_FORMAT) {
3499     rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3500     iwlog_ecode_error3(rc);
3501     return rc;
3502   }
3503   // Adjust lower key len accourding to database format version
3504   if (iwkv->fmt_version < 2) {
3505     iwkv->pklen = PREFIX_KEY_LEN_V1;
3506   } else {
3507     iwkv->pklen = PREFIX_KEY_LEN_V2;
3508   }
3509 
3510   pthread_rwlockattr_t attr;
3511   pthread_rwlockattr_init(&attr);
3512 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
3513   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
3514 #endif
3515   rci = pthread_rwlock_init(&iwkv->rwl, &attr);
3516   if (rci) {
3517     free(*iwkvp);
3518     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3519   }
3520   rci = pthread_mutex_init(&iwkv->wk_mtx, 0);
3521   if (rci) {
3522     pthread_rwlock_destroy(&iwkv->rwl);
3523     free(*iwkvp);
3524     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3525   }
3526   rci = pthread_cond_init(&iwkv->wk_cond, 0);
3527   if (rci) {
3528     pthread_rwlock_destroy(&iwkv->rwl);
3529     pthread_mutex_destroy(&iwkv->wk_mtx);
3530     free(*iwkvp);
3531     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3532   }
3533 
3534   iwkv->oflags = oflags;
3535   IWFS_FSM_STATE fsmstate;
3536   IWFS_FSM_OPTS fsmopts = {
3537     .exfile        = {
3538       .file        = {
3539         .path      = opts->path,
3540         .omode     = omode,
3541         .lock_mode = (oflags & IWKV_RDONLY) ? IWP_RLOCK : IWP_WLOCK
3542       },
3543       .rspolicy    = _szpolicy,
3544       .maxoff      = IWKV_MAX_DBSZ,
3545       .use_locks   = true
3546     },
3547     .bpow          = IWKV_FSM_BPOW, // 64 bytes block size
3548     .hdrlen        = KVHDRSZ,       // Size of custom file header
3549     .oflags        = ((oflags & IWKV_RDONLY) ? IWFSM_NOLOCKS : 0),
3550     .mmap_all      = true
3551   };
3552 #ifndef NDEBUG
3553   fsmopts.oflags |= IWFSM_STRICT;
3554 #endif
3555   if (oflags & IWKV_NO_TRIM_ON_CLOSE) {
3556     fsmopts.oflags |= IWFSM_NO_TRIM_ON_CLOSE;
3557   }
3558   if (opts->file_lock_fail_fast) {
3559     fsmopts.exfile.file.lock_mode |= IWP_NBLOCK;
3560   }
3561   // Init WAL
3562   rc = iwal_create(iwkv, opts, &fsmopts, has_online_bkp);
3563   RCGO(rc, finish);
3564 
3565   // Now open database file
3566   rc = iwfs_fsmfile_open(&iwkv->fsm, &fsmopts);
3567   RCGO(rc, finish);
3568 
3569   IWFS_FSM *fsm = &iwkv->fsm;
3570   iwkv->dbs = kh_init(DBS);
3571   rc = fsm->state(fsm, &fsmstate);
3572   RCGO(rc, finish);
3573 
3574   // Database header: [magic:u4, first_addr:u8, db_format_version:u4]
3575   if (fsmstate.exfile.file.ostatus & IWFS_OPEN_NEW) {
3576     uint8_t hdr[KVHDRSZ] = { 0 };
3577     uint8_t *wp = hdr;
3578     IW_WRITELV(wp, lv, IWKV_MAGIC);
3579     wp += sizeof(llv); // skip first db addr
3580     IW_WRITELV(wp, lv, iwkv->fmt_version);
3581     rc = fsm->writehdr(fsm, 0, hdr, sizeof(hdr));
3582     RCGO(rc, finish);
3583     rc = fsm->sync(fsm, 0);
3584     RCGO(rc, finish);
3585   } else {
3586     off_t dbaddr; // first database address
3587     uint8_t hdr[KVHDRSZ];
3588     rc = fsm->readhdr(fsm, 0, hdr, KVHDRSZ);
3589     RCGO(rc, finish);
3590     rp = hdr; // -V507
3591     IW_READLV(rp, lv, lv);
3592     IW_READLLV(rp, llv, dbaddr);
3593     if ((lv != IWKV_MAGIC) || (dbaddr < 0)) {
3594       rc = IWKV_ERROR_CORRUPTED;
3595       iwlog_ecode_error3(rc);
3596       goto finish;
3597     }
3598     IW_READLV(rp, lv, iwkv->fmt_version);
3599     if ((iwkv->fmt_version > IWKV_FORMAT)) {
3600       rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3601       iwlog_ecode_error3(rc);
3602       goto finish;
3603     }
3604     if (iwkv->fmt_version < 2) {
3605       iwkv->pklen = PREFIX_KEY_LEN_V1;
3606     } else {
3607       iwkv->pklen = PREFIX_KEY_LEN_V2;
3608     }
3609     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3610     RCGO(rc, finish);
3611     rc = _db_load_chain(iwkv, dbaddr, mm);
3612     fsm->release_mmap(fsm);
3613   }
3614   (*iwkvp)->open = true;
3615 
3616 finish:
3617   if (rc) {
3618     (*iwkvp)->open = true; // will be closed in iwkv_close
3619     IWRC(iwkv_close(iwkvp), rc);
3620   }
3621   return rc;
3622 }
3623 
iwkv_exclusive_lock(IWKV iwkv)3624 iwrc iwkv_exclusive_lock(IWKV iwkv) {
3625   return _wnw(iwkv, _wnw_iwkw_wl);
3626 }
3627 
iwkv_exclusive_unlock(IWKV iwkv)3628 iwrc iwkv_exclusive_unlock(IWKV iwkv) {
3629   int rci;
3630   iwrc rc = 0;
3631   API_UNLOCK(iwkv, rci, rc);
3632   return rc;
3633 }
3634 
iwkv_close(IWKV * iwkvp)3635 iwrc iwkv_close(IWKV *iwkvp) {
3636   ENSURE_OPEN((*iwkvp));
3637   IWKV iwkv = *iwkvp;
3638   iwkv->open = false;
3639   iwal_shutdown(iwkv);
3640   iwrc rc = iwkv_exclusive_lock(iwkv);
3641   RCRET(rc);
3642   IWDB db = iwkv->first_db;
3643   while (db) {
3644     IWDB ndb = db->next;
3645     _db_release_lw(&db);
3646     db = ndb;
3647   }
3648   IWRC(iwkv->fsm.close(&iwkv->fsm), rc);
3649   // Below the memory cleanup only
3650   if (iwkv->dbs) {
3651     kh_destroy(DBS, iwkv->dbs);
3652     iwkv->dbs = 0;
3653   }
3654   iwkv_exclusive_unlock(iwkv);
3655   pthread_rwlock_destroy(&iwkv->rwl);
3656   pthread_mutex_destroy(&iwkv->wk_mtx);
3657   pthread_cond_destroy(&iwkv->wk_cond);
3658   free(iwkv);
3659   *iwkvp = 0;
3660   return rc;
3661 }
3662 
_iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3663 static iwrc _iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3664   ENSURE_OPEN(iwkv);
3665   if (iwkv->oflags & IWKV_RDONLY) {
3666     return IW_ERROR_READONLY;
3667   }
3668   iwrc rc;
3669   if (iwkv->dlsnr) {
3670     rc = iwal_poke_savepoint(iwkv);
3671   } else {
3672     IWFS_FSM *fsm = &iwkv->fsm;
3673     pthread_rwlock_wrlock(&iwkv->rwl);
3674     iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3675     rc = fsm->sync(fsm, flags);
3676     pthread_rwlock_unlock(&iwkv->rwl);
3677   }
3678   return rc;
3679 }
3680 
iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3681 iwrc iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3682   ENSURE_OPEN(iwkv);
3683   if (iwkv->oflags & IWKV_RDONLY) {
3684     return IW_ERROR_READONLY;
3685   }
3686   iwrc rc;
3687   if (iwkv->dlsnr) {
3688     rc = iwkv_exclusive_lock(iwkv);
3689     RCRET(rc);
3690     rc = iwal_savepoint_exl(iwkv, true);
3691     iwkv_exclusive_unlock(iwkv);
3692   } else {
3693     IWFS_FSM *fsm = &iwkv->fsm;
3694     pthread_rwlock_wrlock(&iwkv->rwl);
3695     iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3696     rc = fsm->sync(fsm, flags);
3697     pthread_rwlock_unlock(&iwkv->rwl);
3698   }
3699   return rc;
3700 }
3701 
iwkv_db(IWKV iwkv,uint32_t dbid,iwdb_flags_t dbflg,IWDB * dbp)3702 iwrc iwkv_db(IWKV iwkv, uint32_t dbid, iwdb_flags_t dbflg, IWDB *dbp) {
3703   int rci;
3704   iwrc rc = 0;
3705   IWDB db = 0;
3706   *dbp = 0;
3707   API_RLOCK(iwkv, rci);
3708   khiter_t ki = kh_get(DBS, iwkv->dbs, dbid);
3709   if (ki != kh_end(iwkv->dbs)) {
3710     db = kh_value(iwkv->dbs, ki);
3711   }
3712   API_UNLOCK(iwkv, rci, rc);
3713   RCRET(rc);
3714   if (db) {
3715     if (db->dbflg != dbflg) {
3716       return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3717     }
3718     *dbp = db;
3719     return 0;
3720   }
3721   if (iwkv->oflags & IWKV_RDONLY) {
3722     return IW_ERROR_READONLY;
3723   }
3724   rc = iwkv_exclusive_lock(iwkv);
3725   RCRET(rc);
3726   ki = kh_get(DBS, iwkv->dbs, dbid);
3727   if (ki != kh_end(iwkv->dbs)) {
3728     db = kh_value(iwkv->dbs, ki);
3729   }
3730   if (db) {
3731     if (db->dbflg != dbflg) {
3732       return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3733     }
3734     *dbp = db;
3735   } else {
3736     rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3737   }
3738   if (!rc) {
3739     rc = iwal_savepoint_exl(iwkv, true);
3740   }
3741   iwkv_exclusive_unlock(iwkv);
3742   return rc;
3743 }
3744 
iwkv_new_db(IWKV iwkv,iwdb_flags_t dbflg,uint32_t * dbidp,IWDB * dbp)3745 iwrc iwkv_new_db(IWKV iwkv, iwdb_flags_t dbflg, uint32_t *dbidp, IWDB *dbp) {
3746   *dbp = 0;
3747   *dbidp = 0;
3748   if (iwkv->oflags & IWKV_RDONLY) {
3749     return IW_ERROR_READONLY;
3750   }
3751   uint32_t dbid = 0;
3752   iwrc rc = iwkv_exclusive_lock(iwkv);
3753   RCRET(rc);
3754   for (khiter_t k = kh_begin(iwkv->dbs); k != kh_end(iwkv->dbs); ++k) {
3755     if (!kh_exist(iwkv->dbs, k)) {
3756       continue;
3757     }
3758     uint32_t id = kh_key(iwkv->dbs, k);
3759     if (id > dbid) {
3760       dbid = id;
3761     }
3762   }
3763   dbid++;
3764   rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3765   if (!rc) {
3766     *dbidp = dbid;
3767     rc = iwal_savepoint_exl(iwkv, true);
3768   }
3769   iwkv_exclusive_unlock(iwkv);
3770   return rc;
3771 }
3772 
iwkv_db_cache_release(IWDB db)3773 iwrc iwkv_db_cache_release(IWDB db) {
3774   if (!db || !db->iwkv) {
3775     return IW_ERROR_INVALID_ARGS;
3776   }
3777   int rci;
3778   iwrc rc = 0;
3779   API_DB_WLOCK(db, rci);
3780   _dbcache_destroy_lw(db);
3781   API_DB_UNLOCK(db, rci, rc);
3782   return rc;
3783 }
3784 
iwkv_db_destroy(IWDB * dbp)3785 iwrc iwkv_db_destroy(IWDB *dbp) {
3786   if (!dbp || !*dbp) {
3787     return IW_ERROR_INVALID_ARGS;
3788   }
3789   IWDB db = *dbp;
3790   IWKV iwkv = db->iwkv;
3791   *dbp = 0;
3792   if (iwkv->oflags & IWKV_RDONLY) {
3793     return IW_ERROR_READONLY;
3794   }
3795   iwrc rc = iwkv_exclusive_lock(iwkv);
3796   RCRET(rc);
3797   rc = _db_destroy_lw(&db);
3798   iwkv_exclusive_unlock(iwkv);
3799   return rc;
3800 }
3801 
iwkv_puth(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)3802 iwrc iwkv_puth(
3803   IWDB db, const IWKV_val *key, const IWKV_val *val,
3804   iwkv_opflags opflags, IWKV_PUT_HANDLER ph, void *phop) {
3805   if (!db || !db->iwkv || !key || !key->size || !val) {
3806     return IW_ERROR_INVALID_ARGS;
3807   }
3808   IWKV iwkv = db->iwkv;
3809   if (iwkv->oflags & IWKV_RDONLY) {
3810     return IW_ERROR_READONLY;
3811   }
3812   if (opflags & IWKV_VAL_INCREMENT) {
3813     // No overwrite for increment
3814     opflags &= ~IWKV_NO_OVERWRITE;
3815   }
3816 
3817   int rci;
3818   IWKV_val ekey;
3819   uint8_t nbuf[IW_VNUMBUFSZ];
3820   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3821   RCRET(rc);
3822 
3823   IWLCTX lx = {
3824     .db      = db,
3825     .key     = &ekey,
3826     .val     = (IWKV_val*) val,
3827     .nlvl    = -1,
3828     .op      = IWLCTX_PUT,
3829     .opflags = opflags,
3830     .ph      = ph,
3831     .phop    = phop
3832   };
3833   API_DB_WLOCK(db, rci);
3834   if (!db->cache.open) {
3835     rc = _dbcache_fill_lw(&lx);
3836     RCGO(rc, finish);
3837   }
3838   rc = _lx_put_lw(&lx);
3839 
3840 finish:
3841   API_DB_UNLOCK(db, rci, rc);
3842   if (!rc) {
3843     if (lx.opflags & IWKV_SYNC) {
3844       rc = _iwkv_sync(iwkv, 0);
3845     } else {
3846       rc = iwal_poke_checkpoint(iwkv, false);
3847     }
3848   }
3849   return rc;
3850 }
3851 
iwkv_put(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags)3852 iwrc iwkv_put(IWDB db, const IWKV_val *key, const IWKV_val *val, iwkv_opflags opflags) {
3853   return iwkv_puth(db, key, val, opflags, 0, 0);
3854 }
3855 
iwkv_get(IWDB db,const IWKV_val * key,IWKV_val * oval)3856 iwrc iwkv_get(IWDB db, const IWKV_val *key, IWKV_val *oval) {
3857   if (!db || !db->iwkv || !key || !oval) {
3858     return IW_ERROR_INVALID_ARGS;
3859   }
3860 
3861   int rci;
3862   IWKV_val ekey;
3863   uint8_t nbuf[IW_VNUMBUFSZ];
3864   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3865   RCRET(rc);
3866 
3867   IWLCTX lx = {
3868     .db   = db,
3869     .key  = &ekey,
3870     .val  = oval,
3871     .nlvl = -1
3872   };
3873   oval->size = 0;
3874   if (IW_LIKELY(db->cache.open)) {
3875     API_DB_RLOCK(db, rci);
3876   } else {
3877     API_DB_WLOCK(db, rci);
3878     if (!db->cache.open) { // -V547
3879       rc = _dbcache_fill_lw(&lx);
3880       RCGO(rc, finish);
3881     }
3882   }
3883   rc = _lx_get_lr(&lx);
3884 
3885 finish:
3886   API_DB_UNLOCK(db, rci, rc);
3887   return rc;
3888 }
3889 
iwkv_get_copy(IWDB db,const IWKV_val * key,void * vbuf,size_t vbufsz,size_t * vsz)3890 iwrc iwkv_get_copy(IWDB db, const IWKV_val *key, void *vbuf, size_t vbufsz, size_t *vsz) {
3891   if (!db || !db->iwkv || !key || !vbuf) {
3892     return IW_ERROR_INVALID_ARGS;
3893   }
3894   *vsz = 0;
3895 
3896   int rci;
3897   bool found;
3898   IWKV_val ekey;
3899   uint32_t ovalsz;
3900   uint8_t *mm = 0, *oval, idx;
3901   IWFS_FSM *fsm = &db->iwkv->fsm;
3902   uint8_t nbuf[IW_VNUMBUFSZ];
3903   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3904   RCRET(rc);
3905 
3906   IWLCTX lx = {
3907     .db   = db,
3908     .key  = &ekey,
3909     .nlvl = -1
3910   };
3911   if (IW_LIKELY(db->cache.open)) {
3912     API_DB_RLOCK(db, rci);
3913   } else {
3914     API_DB_WLOCK(db, rci);
3915     if (!db->cache.open) { // -V547
3916       rc = _dbcache_fill_lw(&lx);
3917       RCGO(rc, finish);
3918     }
3919   }
3920   rc = _lx_find_bounds(&lx);
3921   RCGO(rc, finish);
3922   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3923   RCGO(rc, finish);
3924   rc = _sblk_loadkvblk_mm(&lx, lx.lower, mm);
3925   RCGO(rc, finish);
3926   rc = _sblk_find_pi_mm(lx.lower, &lx, mm, &found, &idx);
3927   RCGO(rc, finish);
3928   if (found) {
3929     _kvblk_value_peek(lx.lower->kvblk, lx.lower->pi[idx], mm, &oval, &ovalsz);
3930     *vsz = ovalsz;
3931     memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
3932   } else {
3933     rc = IWKV_ERROR_NOTFOUND;
3934   }
3935 
3936 finish:
3937   if (mm) {
3938     IWRC(fsm->release_mmap(fsm), rc);
3939   }
3940   _lx_release_mm(&lx, 0);
3941   API_DB_UNLOCK(db, rci, rc);
3942   return rc;
3943 }
3944 
iwkv_db_set_meta(IWDB db,void * buf,size_t sz)3945 iwrc iwkv_db_set_meta(IWDB db, void *buf, size_t sz) {
3946   if (!db || !db->iwkv || !buf) {
3947     return IW_ERROR_INVALID_ARGS;
3948   }
3949   if (!sz) {
3950     return 0;
3951   }
3952 
3953   int rci;
3954   iwrc rc = 0;
3955   bool resized = false;
3956   uint8_t *mm = 0, *wp, *sp;
3957   IWFS_FSM *fsm = &db->iwkv->fsm;
3958   size_t asz = IW_ROUNDUP(sz, 1U << IWKV_FSM_BPOW);
3959 
3960   API_DB_WLOCK(db, rci);
3961   if ((asz > db->meta_blkn) || (asz * 2 <= db->meta_blkn)) {
3962     off_t oaddr = 0;
3963     off_t olen = 0;
3964     if (db->meta_blk) {
3965       rc = fsm->deallocate(fsm, BLK2ADDR(db->meta_blk), BLK2ADDR(db->meta_blkn));
3966       RCGO(rc, finish);
3967     }
3968     rc = fsm->allocate(fsm, asz, &oaddr, &olen, IWKV_FSM_ALLOC_FLAGS);
3969     RCGO(rc, finish);
3970     db->meta_blk = ADDR2BLK(oaddr);
3971     db->meta_blkn = ADDR2BLK(olen);
3972     resized = true;
3973   }
3974   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3975   RCGO(rc, finish);
3976   wp = mm + BLK2ADDR(db->meta_blk);
3977   memcpy(wp, buf, sz);
3978   if (db->iwkv->dlsnr) {
3979     rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, wp - mm, wp, sz, 0);
3980     RCGO(rc, finish);
3981   }
3982   if (resized) {
3983     uint32_t lv;
3984     wp = mm + db->addr + DOFF_METABLK_U4;
3985     sp = wp;
3986     IW_WRITELV(wp, lv, db->meta_blk);
3987     IW_WRITELV(wp, lv, db->meta_blkn);
3988     if (db->iwkv->dlsnr) {
3989       rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, sp - mm, sp, wp - sp, 0);
3990       RCGO(rc, finish);
3991     }
3992   }
3993   fsm->release_mmap(fsm);
3994   mm = 0;
3995 
3996 finish:
3997   if (mm) {
3998     fsm->release_mmap(fsm);
3999   }
4000   API_DB_UNLOCK(db, rci, rc);
4001   return rc;
4002 }
4003 
iwkv_db_get_meta(IWDB db,void * buf,size_t sz,size_t * rsz)4004 iwrc iwkv_db_get_meta(IWDB db, void *buf, size_t sz, size_t *rsz) {
4005   if (!db || !db->iwkv || !buf) {
4006     return IW_ERROR_INVALID_ARGS;
4007   }
4008   *rsz = 0;
4009   if (!sz || !db->meta_blkn) {
4010     return 0;
4011   }
4012   int rci;
4013   iwrc rc = 0;
4014   uint8_t *mm = 0;
4015   IWFS_FSM *fsm = &db->iwkv->fsm;
4016   size_t rmax = BLK2ADDR(db->meta_blkn);
4017   if (sz > rmax) {
4018     sz = rmax;
4019   }
4020   API_DB_RLOCK(db, rci);
4021   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4022   RCGO(rc, finish);
4023   memcpy(buf, mm + BLK2ADDR(db->meta_blk), sz);
4024   *rsz = sz;
4025 
4026 finish:
4027   if (mm) {
4028     fsm->release_mmap(fsm);
4029   }
4030   API_DB_UNLOCK(db, rci, rc);
4031   return rc;
4032 }
4033 
iwkv_del(IWDB db,const IWKV_val * key,iwkv_opflags opflags)4034 iwrc iwkv_del(IWDB db, const IWKV_val *key, iwkv_opflags opflags) {
4035   if (!db || !db->iwkv || !key) {
4036     return IW_ERROR_INVALID_ARGS;
4037   }
4038   int rci;
4039   IWKV_val ekey;
4040   IWKV iwkv = db->iwkv;
4041 
4042   uint8_t nbuf[IW_VNUMBUFSZ];
4043   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
4044   RCRET(rc);
4045   IWLCTX lx = {
4046     .db      = db,
4047     .key     = &ekey,
4048     .nlvl    = -1,
4049     .op      = IWLCTX_DEL,
4050     .opflags = opflags
4051   };
4052   API_DB_WLOCK(db, rci);
4053   if (!db->cache.open) {
4054     rc = _dbcache_fill_lw(&lx);
4055     RCGO(rc, finish);
4056   }
4057   rc = _lx_del_lw(&lx);
4058 
4059 finish:
4060   API_DB_UNLOCK(db, rci, rc);
4061   if (!rc) {
4062     if (lx.opflags & IWKV_SYNC) {
4063       rc = _iwkv_sync(iwkv, 0);
4064     } else {
4065       rc = iwal_poke_checkpoint(iwkv, false);
4066     }
4067   }
4068   return rc;
4069 }
4070 
_cursor_close_lw(IWKV_cursor cur)4071 IW_INLINE iwrc _cursor_close_lw(IWKV_cursor cur) {
4072   iwrc rc = 0;
4073   cur->closed = true;
4074   IWDB db = cur->lx.db;
4075   pthread_spin_lock(&db->cursors_slk);
4076   for (IWKV_cursor c = db->cursors, pc = 0; c; pc = c, c = c->next) {
4077     if (c == cur) {
4078       if (pc) {
4079         pc->next = c->next;
4080       } else {
4081         db->cursors = c->next;
4082       }
4083       break;
4084     }
4085   }
4086   pthread_spin_unlock(&db->cursors_slk);
4087   return rc;
4088 }
4089 
iwkv_cursor_open(IWDB db,IWKV_cursor * curptr,IWKV_cursor_op op,const IWKV_val * key)4090 iwrc iwkv_cursor_open(
4091   IWDB            db,
4092   IWKV_cursor    *curptr,
4093   IWKV_cursor_op  op,
4094   const IWKV_val *key) {
4095   if (  !db || !db->iwkv || !curptr
4096      || (key && (op < IWKV_CURSOR_EQ)) || (op < IWKV_CURSOR_BEFORE_FIRST)) {
4097     return IW_ERROR_INVALID_ARGS;
4098   }
4099   iwrc rc;
4100   int rci;
4101   rc = _db_worker_inc_nolk(db);
4102   RCRET(rc);
4103   if (IW_LIKELY(db->cache.open)) {
4104     rc = _api_db_rlock(db);
4105   } else {
4106     rc = _api_db_wlock(db);
4107   }
4108   if (rc) {
4109     _db_worker_dec_nolk(db);
4110     return rc;
4111   }
4112   IWKV_cursor cur = 0;
4113   *curptr = calloc(1, sizeof(**curptr));
4114   if (!(*curptr)) {
4115     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
4116     goto finish;
4117   }
4118   cur = *curptr;
4119   IWLCTX *lx = &cur->lx;
4120   lx->db = db;
4121   lx->nlvl = -1;
4122   if (key) {
4123     rc = _to_effective_key(db, key, &lx->ekey, lx->nbuf);
4124     RCGO(rc, finish);
4125     lx->key = &lx->ekey;
4126   }
4127   if (!db->cache.open) {
4128     rc = _dbcache_fill_lw(lx);
4129     RCGO(rc, finish);
4130   }
4131   rc = _cursor_to_lr(cur, op);
4132 
4133 finish:
4134   if (cur) {
4135     if (rc) {
4136       *curptr = 0;
4137       IWRC(_cursor_close_lw(cur), rc);
4138       free(cur);
4139     } else {
4140       pthread_spin_lock(&db->cursors_slk);
4141       cur->next = db->cursors;
4142       db->cursors = cur;
4143       pthread_spin_unlock(&db->cursors_slk);
4144     }
4145   }
4146   API_DB_UNLOCK(db, rci, rc);
4147   if (rc) {
4148     _db_worker_dec_nolk(db);
4149   }
4150   return rc;
4151 }
4152 
iwkv_cursor_close(IWKV_cursor * curp)4153 iwrc iwkv_cursor_close(IWKV_cursor *curp) {
4154   iwrc rc = 0;
4155   int rci;
4156   if (!curp || !*curp) {
4157     return 0;
4158   }
4159   IWKV_cursor cur = *curp;
4160   *curp = 0;
4161   IWKV iwkv = cur->lx.db->iwkv;
4162   if (cur->closed) {
4163     free(cur);
4164     return 0;
4165   }
4166   if (!cur->lx.db) {
4167     return IW_ERROR_INVALID_ARGS;
4168   }
4169   API_DB_WLOCK(cur->lx.db, rci);
4170   rc = _cursor_close_lw(cur);
4171   API_DB_UNLOCK(cur->lx.db, rci, rc);
4172   IWRC(_db_worker_dec_nolk(cur->lx.db), rc);
4173   free(cur);
4174   if (!rc) {
4175     rc = iwal_poke_checkpoint(iwkv, false);
4176   }
4177   return rc;
4178 }
4179 
iwkv_cursor_to(IWKV_cursor cur,IWKV_cursor_op op)4180 iwrc iwkv_cursor_to(IWKV_cursor cur, IWKV_cursor_op op) {
4181   int rci;
4182   if (!cur) {
4183     return IW_ERROR_INVALID_ARGS;
4184   }
4185   if (!cur->lx.db) {
4186     return IW_ERROR_INVALID_ARGS;
4187   }
4188   API_DB_RLOCK(cur->lx.db, rci);
4189   iwrc rc = _cursor_to_lr(cur, op);
4190   API_DB_UNLOCK(cur->lx.db, rci, rc);
4191   return rc;
4192 }
4193 
iwkv_cursor_to_key(IWKV_cursor cur,IWKV_cursor_op op,const IWKV_val * key)4194 iwrc iwkv_cursor_to_key(IWKV_cursor cur, IWKV_cursor_op op, const IWKV_val *key) {
4195   int rci;
4196   if (!cur || ((op != IWKV_CURSOR_EQ) && (op != IWKV_CURSOR_GE))) {
4197     return IW_ERROR_INVALID_ARGS;
4198   }
4199   IWLCTX *lx = &cur->lx;
4200   if (!lx->db) {
4201     return IW_ERROR_INVALID_STATE;
4202   }
4203   iwrc rc = _to_effective_key(lx->db, key, &lx->ekey, lx->nbuf);
4204   RCRET(rc);
4205 
4206   API_DB_RLOCK(lx->db, rci);
4207   lx->key = &lx->ekey;
4208   rc = _cursor_to_lr(cur, op);
4209   API_DB_UNLOCK(lx->db, rci, rc);
4210   return rc;
4211 }
4212 
iwkv_cursor_get(IWKV_cursor cur,IWKV_val * okey,IWKV_val * oval)4213 iwrc iwkv_cursor_get(
4214   IWKV_cursor cur,
4215   IWKV_val   *okey,                    /* Nullable */
4216   IWKV_val   *oval) {                  /* Nullable */
4217   int rci;
4218   iwrc rc = 0;
4219   if (!cur || !cur->lx.db) {
4220     return IW_ERROR_INVALID_ARGS;
4221   }
4222   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4223     return IWKV_ERROR_NOTFOUND;
4224   }
4225   IWLCTX *lx = &cur->lx;
4226   API_DB_RLOCK(lx->db, rci);
4227   uint8_t *mm = 0;
4228   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4229   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4230   RCGO(rc, finish);
4231   if (!cur->cn->kvblk) {
4232     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4233     RCGO(rc, finish);
4234   }
4235   uint8_t idx = cur->cn->pi[cur->cnpos];
4236   if (okey && oval) {
4237     rc = _kvblk_kv_get(cur->cn->kvblk, mm, idx, okey, oval);
4238   } else if (oval) {
4239     rc = _kvblk_value_get(cur->cn->kvblk, mm, idx, oval);
4240   } else if (okey) {
4241     rc = _kvblk_key_get(cur->cn->kvblk, mm, idx, okey);
4242   } else {
4243     rc = IW_ERROR_INVALID_ARGS;
4244   }
4245   if (!rc && okey) {
4246     _unpack_effective_key(lx->db, okey, false);
4247   }
4248 finish:
4249   if (mm) {
4250     fsm->release_mmap(fsm);
4251   }
4252   API_DB_UNLOCK(lx->db, rci, rc);
4253   return rc;
4254 }
4255 
iwkv_cursor_copy_val(IWKV_cursor cur,void * vbuf,size_t vbufsz,size_t * vsz)4256 iwrc iwkv_cursor_copy_val(IWKV_cursor cur, void *vbuf, size_t vbufsz, size_t *vsz) {
4257   int rci;
4258   iwrc rc = 0;
4259   if (!cur || !vbuf || !cur->lx.db) {
4260     return IW_ERROR_INVALID_ARGS;
4261   }
4262   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4263     return IWKV_ERROR_NOTFOUND;
4264   }
4265 
4266   *vsz = 0;
4267   IWLCTX *lx = &cur->lx;
4268   API_DB_RLOCK(lx->db, rci);
4269   uint8_t *mm = 0, *oval;
4270   uint32_t ovalsz;
4271   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4272   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4273   RCGO(rc, finish);
4274   if (!cur->cn->kvblk) {
4275     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4276     RCGO(rc, finish);
4277   }
4278   uint8_t idx = cur->cn->pi[cur->cnpos];
4279   _kvblk_value_peek(cur->cn->kvblk, idx, mm, &oval, &ovalsz);
4280   *vsz = ovalsz;
4281   memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
4282 
4283 finish:
4284   if (mm) {
4285     fsm->release_mmap(fsm);
4286   }
4287   API_DB_UNLOCK(lx->db, rci, rc);
4288   return rc;
4289 }
4290 
iwkv_cursor_is_matched_key(IWKV_cursor cur,const IWKV_val * key,bool * ores,int64_t * ocompound)4291 iwrc iwkv_cursor_is_matched_key(IWKV_cursor cur, const IWKV_val *key, bool *ores, int64_t *ocompound) {
4292   int rci;
4293   iwrc rc = 0;
4294   if (!cur || !ores || !key || !cur->lx.db) {
4295     return IW_ERROR_INVALID_ARGS;
4296   }
4297   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4298     return IWKV_ERROR_NOTFOUND;
4299   }
4300 
4301   *ores = 0;
4302   if (ocompound) {
4303     *ocompound = 0;
4304   }
4305 
4306   IWLCTX *lx = &cur->lx;
4307   API_DB_RLOCK(lx->db, rci);
4308   uint8_t *mm = 0, *okey;
4309   uint32_t okeysz;
4310   iwdb_flags_t dbflg = lx->db->dbflg;
4311   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4312   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4313   RCGO(rc, finish);
4314   if (!cur->cn->kvblk) {
4315     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4316     RCGO(rc, finish);
4317   }
4318 
4319   uint8_t idx = cur->cn->pi[cur->cnpos];
4320   rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
4321   RCGO(rc, finish);
4322 
4323   if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
4324     char nbuf[2 * IW_VNUMBUFSZ];
4325     IWKV_val rkey = { .data = nbuf, .size = okeysz };
4326     memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
4327     rc = _unpack_effective_key(lx->db, &rkey, true);
4328     RCGO(rc, finish);
4329     if (ocompound) {
4330       *ocompound = rkey.compound;
4331     }
4332     if (rkey.size != key->size) {
4333       *ores = false;
4334       goto finish;
4335     }
4336     if (dbflg & IWDB_VNUM64_KEYS) {
4337       *ores = !memcmp(rkey.data, key->data, key->size);
4338     } else {
4339       *ores = !memcmp(okey + (okeysz - rkey.size), key->data, key->size);
4340     }
4341   } else {
4342     *ores = (okeysz == key->size) && !memcmp(okey, key->data, key->size);
4343   }
4344 
4345 finish:
4346   if (mm) {
4347     fsm->release_mmap(fsm);
4348   }
4349   API_DB_UNLOCK(cur->lx.db, rci, rc);
4350   return rc;
4351 }
4352 
iwkv_cursor_copy_key(IWKV_cursor cur,void * kbuf,size_t kbufsz,size_t * ksz,int64_t * compound)4353 iwrc iwkv_cursor_copy_key(IWKV_cursor cur, void *kbuf, size_t kbufsz, size_t *ksz, int64_t *compound) {
4354   int rci;
4355   iwrc rc = 0;
4356   if (!cur || !cur->lx.db) {
4357     return IW_ERROR_INVALID_ARGS;
4358   }
4359   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4360     return IWKV_ERROR_NOTFOUND;
4361   }
4362 
4363   *ksz = 0;
4364   IWLCTX *lx = &cur->lx;
4365   API_DB_RLOCK(lx->db, rci);
4366   uint8_t *mm = 0, *okey;
4367   uint32_t okeysz;
4368   iwdb_flags_t dbflg = lx->db->dbflg;
4369   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4370   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4371   RCGO(rc, finish);
4372   if (!cur->cn->kvblk) {
4373     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4374     RCGO(rc, finish);
4375   }
4376 
4377   uint8_t idx = cur->cn->pi[cur->cnpos];
4378   rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
4379   RCGO(rc, finish);
4380 
4381   if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
4382     char nbuf[2 * IW_VNUMBUFSZ];
4383     IWKV_val rkey = { .data = nbuf, .size = okeysz };
4384     memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
4385     rc = _unpack_effective_key(lx->db, &rkey, true);
4386     RCGO(rc, finish);
4387     if (compound) {
4388       *compound = rkey.compound;
4389     }
4390     *ksz = rkey.size;
4391     if (dbflg & IWDB_VNUM64_KEYS) {
4392       memcpy(kbuf, rkey.data, MIN(kbufsz, rkey.size));
4393     } else {
4394       memcpy(kbuf, okey + (okeysz - rkey.size), MIN(kbufsz, rkey.size));
4395     }
4396   } else {
4397     *ksz = okeysz;
4398     if (compound) {
4399       *compound = 0;
4400     }
4401     memcpy(kbuf, okey, MIN(kbufsz, okeysz));
4402   }
4403 
4404 finish:
4405   if (mm) {
4406     fsm->release_mmap(fsm);
4407   }
4408   API_DB_UNLOCK(cur->lx.db, rci, rc);
4409   return rc;
4410 }
4411 
iwkv_cursor_seth(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)4412 IW_EXPORT iwrc iwkv_cursor_seth(
4413   IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags,
4414   IWKV_PUT_HANDLER ph, void *phop) {
4415   int rci;
4416   iwrc rc = 0, irc = 0;
4417   if (!cur || !cur->lx.db) {
4418     return IW_ERROR_INVALID_ARGS;
4419   }
4420   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4421     return IWKV_ERROR_NOTFOUND;
4422   }
4423 
4424   IWLCTX *lx = &cur->lx;
4425   IWDB db = lx->db;
4426   IWKV iwkv = db->iwkv;
4427   SBLK *sblk = cur->cn;
4428 
4429   API_DB_WLOCK(db, rci);
4430   if (ph) {
4431     uint8_t *mm;
4432     IWKV_val key, oldval;
4433     IWFS_FSM *fsm = &db->iwkv->fsm;
4434     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4435     RCGO(rc, finish);
4436     rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key, &oldval);
4437     fsm->release_mmap(fsm);
4438     if (!rc) {
4439       // note: oldval should be disposed by ph
4440       rc = ph(&key, val, &oldval, phop);
4441       _kv_val_dispose(&key);
4442     }
4443     RCGO(rc, finish);
4444   }
4445 
4446   rc = _sblk_updatekv(sblk, cur->cnpos, 0, val);
4447   if (IWKV_IS_INTERNAL_RC(rc)) {
4448     irc = rc;
4449     rc = 0;
4450   }
4451   RCGO(rc, finish);
4452 
4453   rc = _sblk_sync(lx, sblk);
4454   RCGO(rc, finish);
4455 
4456   // Update active cursors inside this block
4457   pthread_spin_lock(&db->cursors_slk);
4458   for (IWKV_cursor c = db->cursors; c; c = c->next) {
4459     if (c->cn && (c->cn->addr == sblk->addr)) {
4460       if (c->cn != sblk) {
4461         memcpy(c->cn, sblk, sizeof(*c->cn));
4462         c->cn->kvblk = 0;
4463         c->cn->flags &= SBLK_PERSISTENT_FLAGS;
4464       }
4465     }
4466   }
4467   pthread_spin_unlock(&db->cursors_slk);
4468 
4469 finish:
4470   API_DB_UNLOCK(db, rci, rc);
4471   if (!rc) {
4472     if (opflags & IWKV_SYNC) {
4473       rc = _iwkv_sync(iwkv, 0);
4474     } else {
4475       rc = iwal_poke_checkpoint(iwkv, false);
4476     }
4477   }
4478   return rc ? rc : irc;
4479 }
4480 
iwkv_cursor_set(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags)4481 iwrc iwkv_cursor_set(IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags) {
4482   return iwkv_cursor_seth(cur, val, opflags, 0, 0);
4483 }
4484 
iwkv_cursor_val(IWKV_cursor cur,IWKV_val * oval)4485 iwrc iwkv_cursor_val(IWKV_cursor cur, IWKV_val *oval) {
4486   return iwkv_cursor_get(cur, 0, oval);
4487 }
4488 
iwkv_cursor_key(IWKV_cursor cur,IWKV_val * okey)4489 iwrc iwkv_cursor_key(IWKV_cursor cur, IWKV_val *okey) {
4490   return iwkv_cursor_get(cur, okey, 0);
4491 }
4492 
iwkv_cursor_del(IWKV_cursor cur,iwkv_opflags opflags)4493 iwrc iwkv_cursor_del(IWKV_cursor cur, iwkv_opflags opflags) {
4494   int rci;
4495   iwrc rc = 0;
4496   if (!cur || !cur->lx.db) {
4497     return IW_ERROR_INVALID_ARGS;
4498   }
4499   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4500     return IWKV_ERROR_NOTFOUND;
4501   }
4502 
4503   uint8_t *mm;
4504   SBLK *sblk = cur->cn;
4505   IWLCTX *lx = &cur->lx;
4506   IWDB db = lx->db;
4507   IWKV iwkv = db->iwkv;
4508   IWFS_FSM *fsm = &iwkv->fsm;
4509 
4510   API_DB_WLOCK(db, rci);
4511   if (!db->cache.open) {
4512     rc = _dbcache_fill_lw(lx);
4513     RCGO(rc, finish);
4514   }
4515   if (sblk->pnum == 1) { // sblk will be removed
4516     IWKV_val key = { 0 };
4517     // Key a key
4518     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4519     RCGO(rc, finish2);
4520     if (!sblk->kvblk) {
4521       rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4522       fsm->release_mmap(fsm);
4523       RCGO(rc, finish2);
4524     }
4525     rc = _kvblk_key_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key);
4526     fsm->release_mmap(fsm);
4527     RCGO(rc, finish2);
4528 
4529     lx->key = &key;
4530     rc = _lx_del_sblk_lw(lx, sblk, cur->cnpos);
4531     lx->key = 0;
4532 
4533 finish2:
4534     if (rc) {
4535       _lx_release_mm(lx, 0);
4536     } else {
4537       rc = _lx_release(lx);
4538     }
4539     if (key.data) {
4540       _kv_val_dispose(&key);
4541     }
4542   } else { // Simple case
4543     if (!sblk->kvblk) {
4544       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4545       RCGO(rc, finish);
4546       rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4547       fsm->release_mmap(fsm);
4548       RCGO(rc, finish);
4549     }
4550     rc = _sblk_rmkv(sblk, cur->cnpos);
4551     RCGO(rc, finish);
4552     rc = _sblk_sync(lx, sblk);
4553   }
4554 
4555 finish:
4556   API_DB_UNLOCK(db, rci, rc);
4557   if (!rc) {
4558     if (opflags & IWKV_SYNC) {
4559       rc = _iwkv_sync(iwkv, 0);
4560     } else {
4561       rc = iwal_poke_checkpoint(iwkv, false);
4562     }
4563   }
4564   return rc;
4565 }
4566 
4567 #include "./dbg/iwkvdbg.c"
4568