1 // -V::512
2
3 #include "iwkv_internal.h"
4 #include "iwconv.h"
5 #include <stdalign.h>
6
7 static iwrc _dbcache_fill_lw(IWLCTX *lx);
8 static iwrc _dbcache_get(IWLCTX *lx);
9 static iwrc _dbcache_put_lw(IWLCTX *lx, SBLK *sblk);
10 static void _dbcache_remove_lw(IWLCTX *lx, SBLK *sblk);
11 static void _dbcache_update_lw(IWLCTX *lx, SBLK *sblk);
12 static void _dbcache_destroy_lw(IWDB db);
13
14 #define _wnw_db_wl(db_) _api_db_wlock(db_)
15
16 //-------------------------- GLOBALS
17
18 #ifdef IW_TESTS
19 volatile int8_t iwkv_next_level = -1;
20 #endif
21 atomic_uint_fast64_t g_trigger;
22
23 #define IWKV_IS_INTERNAL_RC(rc_) ((rc_) > _IWKV_ERROR_END && (rc_) < _IWKV_RC_END)
24
25 //-------------------------- UTILS
26
_to_effective_key(struct _IWDB * db,const IWKV_val * key,IWKV_val * okey,uint8_t nbuf[static IW_VNUMBUFSZ])27 IW_SOFT_INLINE iwrc _to_effective_key(
28 struct _IWDB *db, const IWKV_val *key, IWKV_val *okey,
29 uint8_t nbuf[static IW_VNUMBUFSZ]) {
30 static_assert(IW_VNUMBUFSZ >= sizeof(uint64_t), "IW_VNUMBUFSZ >= sizeof(uint64_t)");
31 iwdb_flags_t dbflg = db->dbflg;
32 // Keys compound will be processed at lower levels at `addkv` routines
33 okey->compound = key->compound;
34 if (dbflg & IWDB_VNUM64_KEYS) {
35 unsigned len;
36 if (key->size == 8) {
37 uint64_t llv;
38 memcpy(&llv, key->data, sizeof(llv));
39 IW_SETVNUMBUF64(len, nbuf, llv);
40 if (!len) {
41 return IW_ERROR_OVERFLOW;
42 }
43 okey->size = len;
44 okey->data = nbuf;
45 } else if (key->size == 4) {
46 uint32_t lv;
47 memcpy(&lv, key->data, sizeof(lv));
48 IW_SETVNUMBUF(len, nbuf, lv);
49 if (!len) {
50 return IW_ERROR_OVERFLOW;
51 }
52 okey->size = len;
53 okey->data = nbuf;
54 } else {
55 return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
56 }
57 } else {
58 okey->data = key->data;
59 okey->size = key->size;
60 }
61 return 0;
62 }
63
64 // NOTE: at least `2*IW_VNUMBUFSZ` must be allocated for key->data
_unpack_effective_key(struct _IWDB * db,IWKV_val * key,bool no_move_key_data)65 static iwrc _unpack_effective_key(struct _IWDB *db, IWKV_val *key, bool no_move_key_data) {
66 iwdb_flags_t dbflg = db->dbflg;
67 uint8_t *data = key->data;
68 if (dbflg & IWDB_COMPOUND_KEYS) {
69 int step;
70 IW_READVNUMBUF64(key->data, key->compound, step);
71 if (step >= key->size) {
72 return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
73 }
74 data += step;
75 key->size -= step;
76 if (!no_move_key_data && !(dbflg & IWDB_VNUM64_KEYS)) {
77 memmove(key->data, data, key->size);
78 }
79 } else {
80 key->compound = 0;
81 }
82 if (dbflg & IWDB_VNUM64_KEYS) {
83 int64_t llv;
84 char nbuf[IW_VNUMBUFSZ];
85 if (key->size > IW_VNUMBUFSZ) {
86 return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
87 }
88 memcpy(nbuf, data, key->size);
89 IW_READVNUMBUF64_2(nbuf, llv);
90 memcpy(key->data, &llv, sizeof(llv));
91 key->size = sizeof(llv);
92 }
93 return 0;
94 }
95
/**
 * Compares a stored key (`v1`, `v1len`) against `key` according to the
 * database key mode in `dbflg`.
 *
 * Numeric (`IWDB_VNUM64_KEYS`) keys are decoded and compared as `int64_t`,
 * `IWDB_REALNUM_KEYS` keys are compared with `iwafcmp()`, everything else
 * with `IW_CMP2`. In compound mode the leading variable-length number of
 * `v1` is skipped first and, for numeric/real keys, used as a tie breaker
 * against `key->compound`.
 *
 * @return Negative, zero or positive value; for numeric keys the result is
 *         -1 when the stored number is greater than the one in `key`.
 */
static int _cmp_keys_prefix(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
  int ret;
  int v2len = (int) key->size;

  if (!(dbflg & IWDB_COMPOUND_KEYS)) {
    const void *v2 = key->data;
    if (dbflg & IWDB_VNUM64_KEYS) {
      if ((v2len != v1len) || (v2len > IW_VNUMBUFSZ) || (v1len > IW_VNUMBUFSZ)) {
        return v2len - v1len;
      }
      int64_t n1, n2;
      char vbuf[IW_VNUMBUFSZ];
      memcpy(vbuf, v1, v1len);
      IW_READVNUMBUF64_2(vbuf, n1);
      memcpy(vbuf, v2, v2len);
      IW_READVNUMBUF64_2(vbuf, n2);
      return (n1 < n2) - (n1 > n2);
    }
    if (dbflg & IWDB_REALNUM_KEYS) {
      return iwafcmp(v2, v2len, v1, v1len);
    }
    IW_CMP2(ret, v2, v2len, v1, v1len);
    return ret;
  }

  // Compound keys mode
  const char *u1 = v1;
  const char *u2 = key->data;
  int step;
  int64_t c1, c2 = key->compound;
  IW_READVNUMBUF64(v1, c1, step);
  u1 += step;
  v1len -= step;
  if (v1len < 1) {
    // Inconsistent data?
    return v2len - v1len;
  }

  if (dbflg & IWDB_VNUM64_KEYS) {
    if ((v2len != v1len) || (v2len > IW_VNUMBUFSZ) || (v1len > IW_VNUMBUFSZ)) {
      return v2len - v1len;
    }
    int64_t n1, n2;
    char vbuf[IW_VNUMBUFSZ];
    memcpy(vbuf, u1, v1len);
    IW_READVNUMBUF64_2(vbuf, n1);
    memcpy(vbuf, u2, v2len);
    IW_READVNUMBUF64_2(vbuf, n2);
    ret = (n1 < n2) - (n1 > n2);
    if (ret == 0) {
      ret = (c1 < c2) - (c1 > c2);
    }
    return ret;
  }

  if (dbflg & IWDB_REALNUM_KEYS) {
    ret = iwafcmp(u2, v2len, u1, v1len);
    if (ret == 0) {
      ret = (c1 < c2) - (c1 > c2);
    }
    return ret;
  }

  IW_CMP2(ret, u2, v2len, u1, v1len);
  return ret;
}
156
_cmp_keys(iwdb_flags_t dbflg,const void * v1,int v1len,const IWKV_val * key)157 IW_INLINE int _cmp_keys(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
158 int rv = _cmp_keys_prefix(dbflg, v1, v1len, key);
159 if ((rv == 0) && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
160 if (dbflg & IWDB_COMPOUND_KEYS) {
161 int step;
162 int64_t c1, c2 = key->compound;
163 IW_READVNUMBUF64(v1, c1, step);
164 v1len -= step;
165 if ((int) key->size == v1len) {
166 return c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
167 }
168 }
169 return (int) key->size - v1len;
170 } else {
171 return rv;
172 }
173 }
174
_kv_val_dispose(IWKV_val * v)175 IW_INLINE void _kv_val_dispose(IWKV_val *v) {
176 if (v) {
177 free(v->data);
178 v->size = 0;
179 v->data = 0;
180 }
181 }
182
_kv_dispose(IWKV_val * key,IWKV_val * val)183 IW_INLINE void _kv_dispose(IWKV_val *key, IWKV_val *val) {
184 _kv_val_dispose(key);
185 _kv_val_dispose(val);
186 }
187
/**
 * Public wrapper around `_kv_val_dispose()`: releases the data buffer of `v`
 * and resets it to an empty value.
 */
void iwkv_val_dispose(IWKV_val *v) {
  IWKV_val *target = v;
  _kv_val_dispose(target);
}
191
/**
 * Public wrapper around `_kv_dispose()`: releases the data buffers of both
 * `key` and `val`.
 */
void iwkv_kv_dispose(IWKV_val *key, IWKV_val *val) {
  IWKV_val *k = key;
  IWKV_val *v = val;
  _kv_dispose(k, v);
}
195
_num2lebuf(uint8_t buf[static8],void * numdata,size_t sz)196 IW_INLINE void _num2lebuf(uint8_t buf[static 8], void *numdata, size_t sz) {
197 assert(sz == 4 || sz == 8);
198 if (sz > 4) {
199 uint64_t llv;
200 memcpy(&llv, numdata, sizeof(llv));
201 llv = IW_HTOILL(llv);
202 memcpy(buf, &llv, sizeof(llv));
203 } else {
204 uint32_t lv;
205 memcpy(&lv, numdata, sizeof(lv));
206 lv = IW_HTOIL(lv);
207 memcpy(buf, &lv, sizeof(lv));
208 }
209 }
210
211 //-------------------------- IWKV/IWDB WORKERS
212
_iwkv_worker_inc_nolk(IWKV iwkv)213 static WUR iwrc _iwkv_worker_inc_nolk(IWKV iwkv) {
214 if (!iwkv || !iwkv->open) {
215 return IW_ERROR_INVALID_STATE;
216 }
217 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
218 if (rci) {
219 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
220 }
221 if (!iwkv->open) { // -V547
222 pthread_mutex_unlock(&iwkv->wk_mtx);
223 return IW_ERROR_INVALID_STATE;
224 }
225 while (iwkv->wk_pending_exclusive) {
226 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
227 }
228 ++iwkv->wk_count;
229 pthread_cond_broadcast(&iwkv->wk_cond);
230 pthread_mutex_unlock(&iwkv->wk_mtx);
231 return 0;
232 }
233
_db_worker_inc_nolk(IWDB db)234 static WUR iwrc _db_worker_inc_nolk(IWDB db) {
235 if (!db || !db->iwkv || !db->iwkv->open || !db->open) {
236 return IW_ERROR_INVALID_STATE;
237 }
238 IWKV iwkv = db->iwkv;
239 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
240 if (rci) {
241 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
242 }
243 if (!iwkv->open || !db->open) { // -V560
244 pthread_mutex_unlock(&iwkv->wk_mtx);
245 return IW_ERROR_INVALID_STATE;
246 }
247 while (db->wk_pending_exclusive) {
248 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
249 }
250 ++iwkv->wk_count;
251 ++db->wk_count;
252 pthread_cond_broadcast(&iwkv->wk_cond);
253 pthread_mutex_unlock(&iwkv->wk_mtx);
254 return 0;
255 }
256
_iwkv_worker_dec_nolk(IWKV iwkv)257 static iwrc _iwkv_worker_dec_nolk(IWKV iwkv) {
258 if (!iwkv) {
259 return IW_ERROR_INVALID_STATE;
260 }
261 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
262 if (rci) {
263 // Last chanсe to be consistent
264 --iwkv->wk_count;
265 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
266 }
267 --iwkv->wk_count;
268 pthread_cond_broadcast(&iwkv->wk_cond);
269 pthread_mutex_unlock(&iwkv->wk_mtx);
270 return 0;
271 }
272
_db_worker_dec_nolk(IWDB db)273 static iwrc _db_worker_dec_nolk(IWDB db) {
274 if (!db || !db->iwkv) { // do not use ENSURE_OPEN_DB here
275 return IW_ERROR_INVALID_STATE;
276 }
277 IWKV iwkv = db->iwkv;
278 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
279 if (rci) {
280 // Last chanсe to be consistent
281 --iwkv->wk_count;
282 --db->wk_count;
283 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
284 }
285 --iwkv->wk_count;
286 --db->wk_count;
287 pthread_cond_broadcast(&iwkv->wk_cond);
288 pthread_mutex_unlock(&iwkv->wk_mtx);
289 return 0;
290 }
291
_wnw_iwkw_wl(IWKV iwkv)292 static WUR iwrc _wnw_iwkw_wl(IWKV iwkv) {
293 int rci = pthread_rwlock_wrlock(&iwkv->rwl);
294 if (rci) {
295 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
296 }
297 return 0;
298 }
299
_wnw(IWKV iwkv,iwrc (* after)(IWKV iwkv))300 static WUR iwrc _wnw(IWKV iwkv, iwrc (*after)(IWKV iwkv)) {
301 iwrc rc = 0;
302 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
303 if (rci) {
304 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
305 }
306 iwkv->wk_pending_exclusive = true;
307 while (iwkv->wk_count > 0) {
308 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
309 }
310 if (after) {
311 rc = after(iwkv);
312 }
313 iwkv->wk_pending_exclusive = false;
314 pthread_cond_broadcast(&iwkv->wk_cond);
315 rci = pthread_mutex_unlock(&iwkv->wk_mtx);
316 if (rci) {
317 IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
318 }
319 return rc;
320 }
321
_wnw_db(IWDB db,iwrc (* after)(IWDB db))322 static WUR iwrc _wnw_db(IWDB db, iwrc (*after)(IWDB db)) {
323 iwrc rc = 0;
324 IWKV iwkv = db->iwkv;
325 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
326 if (rci) {
327 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
328 }
329 db->wk_pending_exclusive = true;
330 while (db->wk_count > 0) {
331 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
332 }
333 if (after) {
334 rc = after(db);
335 }
336 db->wk_pending_exclusive = false;
337 pthread_cond_broadcast(&iwkv->wk_cond);
338 rci = pthread_mutex_unlock(&iwkv->wk_mtx);
339 if (rci) {
340 IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
341 }
342 return rc;
343 }
344
345 //-------------------------- DB
346
_db_at(IWKV iwkv,IWDB * dbp,off_t addr,uint8_t * mm)347 static WUR iwrc _db_at(IWKV iwkv, IWDB *dbp, off_t addr, uint8_t *mm) {
348 iwrc rc = 0;
349 uint8_t *rp, bv;
350 uint32_t lv;
351 int rci;
352 IWDB db = calloc(1, sizeof(struct _IWDB));
353 *dbp = 0;
354 if (!db) {
355 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
356 }
357 pthread_rwlockattr_t attr;
358 pthread_rwlockattr_init(&attr);
359 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
360 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
361 #endif
362 rci = pthread_rwlock_init(&db->rwl, &attr);
363 if (rci) {
364 free(db);
365 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
366 }
367 rci = pthread_spin_init(&db->cursors_slk, 0);
368 if (rci) {
369 pthread_rwlock_destroy(&db->rwl);
370 free(db);
371 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
372 }
373 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
374 db->flags = SBLK_DB;
375 db->addr = addr;
376 db->db = db;
377 db->iwkv = iwkv;
378 rp = mm + addr;
379 IW_READLV(rp, lv, lv);
380 if (lv != IWDB_MAGIC) {
381 rc = IWKV_ERROR_CORRUPTED;
382 iwlog_ecode_error3(rc);
383 goto finish;
384 }
385 IW_READBV(rp, bv, db->dbflg);
386 IW_READLV(rp, lv, db->id);
387 IW_READLV(rp, lv, db->next_db_addr);
388 db->next_db_addr = BLK2ADDR(db->next_db_addr); // blknum -> addr
389 rp = mm + addr + DOFF_C0_U4;
390 for (int i = 0; i < SLEVELS; ++i) {
391 IW_READLV(rp, lv, db->lcnt[i]);
392 }
393 if (iwkv->fmt_version >= 1) {
394 IW_READLV(rp, lv, db->meta_blk);
395 IW_READLV(rp, lv, db->meta_blkn);
396 }
397 db->open = true;
398 *dbp = db;
399
400 finish:
401 if (rc) {
402 pthread_rwlock_destroy(&db->rwl);
403 free(db);
404 }
405 return rc;
406 }
407
_db_save(IWDB db,bool newdb,uint8_t * mm)408 static WUR iwrc _db_save(IWDB db, bool newdb, uint8_t *mm) {
409 iwrc rc = 0;
410 uint32_t lv;
411 uint8_t *wp = mm + db->addr, bv;
412 uint8_t *sp = wp;
413 IWDLSNR *dlsnr = db->iwkv->dlsnr;
414 db->next_db_addr = db->next ? db->next->addr : 0;
415 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
416 IW_WRITELV(wp, lv, IWDB_MAGIC);
417 IW_WRITEBV(wp, bv, db->dbflg);
418 IW_WRITELV(wp, lv, db->id);
419 IW_WRITELV(wp, lv, ADDR2BLK(db->next_db_addr));
420 if (dlsnr) {
421 rc = dlsnr->onwrite(dlsnr, db->addr, sp, wp - sp, 0);
422 RCRET(rc);
423 }
424 if (db->iwkv->fmt_version >= 1) {
425 if (newdb) {
426 memset(wp, 0, 4 + SLEVELS * 4 * 2); // p0 + n[24] + c[24]
427 sp = wp;
428 wp += 4 + SLEVELS * 4 * 2; // set to zero
429 } else {
430 wp += 4 + SLEVELS * 4 * 2; // skip
431 sp = wp;
432 }
433 IW_WRITELV(wp, lv, db->meta_blk);
434 IW_WRITELV(wp, lv, db->meta_blkn);
435 if (dlsnr) {
436 rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
437 }
438 }
439 return rc;
440 }
441
_db_load_chain(IWKV iwkv,off_t addr,uint8_t * mm)442 static WUR iwrc _db_load_chain(IWKV iwkv, off_t addr, uint8_t *mm) {
443 iwrc rc;
444 int rci;
445 IWDB db = 0, ndb;
446 if (!addr) {
447 return 0;
448 }
449 do {
450 rc = _db_at(iwkv, &ndb, addr, mm);
451 RCRET(rc);
452 if (db) {
453 db->next = ndb;
454 ndb->prev = db;
455 } else {
456 iwkv->first_db = ndb;
457 }
458 db = ndb;
459 addr = db->next_db_addr;
460 iwkv->last_db = db;
461 khiter_t k = kh_put(DBS, iwkv->dbs, db->id, &rci);
462 if (rci != -1) {
463 kh_value(iwkv->dbs, k) = db;
464 } else {
465 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
466 }
467 } while (db->next_db_addr);
468 return rc;
469 }
470
/**
 * Releases the in-memory resources of a database handle: its cache (via
 * `_dbcache_destroy_lw()`), both locks and the handle itself. `*dbp` is set
 * to 0 afterwards. Nothing is changed in the storage file here.
 */
static void _db_release_lw(IWDB *dbp) {
  assert(dbp && *dbp);
  IWDB victim = *dbp;
  *dbp = 0;
  _dbcache_destroy_lw(victim);
  pthread_rwlock_destroy(&victim->rwl);
  pthread_spin_destroy(&victim->cursors_slk);
  free(victim);
}
480
481 typedef struct DISPOSE_DB_CTX {
482 IWKV iwkv;
483 IWDB db;
484 blkn_t sbn; // First `SBLK` block in DB
485 } DISPOSE_DB_CTX;
486
_db_dispose_chain(DISPOSE_DB_CTX * dctx)487 static iwrc _db_dispose_chain(DISPOSE_DB_CTX *dctx) {
488 iwrc rc = 0;
489 uint8_t *mm, kvszpow;
490 IWFS_FSM *fsm = &dctx->iwkv->fsm;
491 blkn_t sbn = dctx->sbn, kvblkn;
492 off_t page = 0;
493
494 while (sbn) {
495 off_t sba = BLK2ADDR(sbn);
496 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
497 RCBREAK(rc);
498 memcpy(&kvblkn, mm + sba + SOFF_KBLK_U4, 4);
499 kvblkn = IW_ITOHL(kvblkn);
500 memcpy(&sbn, mm + sba + SOFF_N0_U4, 4);
501 sbn = IW_ITOHL(sbn);
502 if (kvblkn) {
503 memcpy(&kvszpow, mm + BLK2ADDR(kvblkn) + KBLK_SZPOW_OFF, 1);
504 }
505 if (dctx->iwkv->fmt_version > 1) {
506 uint8_t bpos;
507 memcpy(&bpos, mm + sba + SOFF_BPOS_U1_V2, 1);
508 rc = fsm->release_mmap(fsm);
509 RCBREAK(rc);
510 if ((bpos > 0) && (bpos <= SBLK_PAGE_SBLK_NUM_V2)) {
511 off_t npage = sba - (bpos - 1) * SBLK_SZ;
512 if (npage != page) {
513 if (page) {
514 if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
515 rc = fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2);
516 }
517 RCBREAK(rc);
518 }
519 page = npage;
520 }
521 }
522 } else {
523 rc = fsm->release_mmap(fsm);
524 RCBREAK(rc);
525 // Deallocate `SBLK`
526 rc = fsm->deallocate(fsm, sba, SBLK_SZ);
527 RCBREAK(rc);
528 }
529 // Deallocate `KVBLK`
530 if (kvblkn) {
531 rc = fsm->deallocate(fsm, BLK2ADDR(kvblkn), 1ULL << kvszpow);
532 RCBREAK(rc);
533 }
534 }
535 if (page) {
536 if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
537 IWRC(fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2), rc);
538 }
539 }
540 _db_release_lw(&dctx->db);
541 return rc;
542 }
543
_db_destroy_lw(IWDB * dbp)544 static WUR iwrc _db_destroy_lw(IWDB *dbp) {
545 iwrc rc;
546 uint8_t *mm;
547 IWDB db = *dbp;
548 IWKV iwkv = db->iwkv;
549 IWDB prev = db->prev;
550 IWDB next = db->next;
551 IWFS_FSM *fsm = &iwkv->fsm;
552 uint32_t first_sblkn;
553
554 khiter_t k = kh_get(DBS, iwkv->dbs, db->id);
555 if (k == kh_end(iwkv->dbs)) {
556 iwlog_ecode_error3(IW_ERROR_INVALID_STATE);
557 return IW_ERROR_INVALID_STATE;
558 }
559 kh_del(DBS, iwkv->dbs, k);
560
561 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
562 RCRET(rc);
563 if (prev) {
564 prev->next = next;
565 rc = _db_save(prev, false, mm);
566 if (rc) {
567 fsm->release_mmap(fsm);
568 return rc;
569 }
570 }
571 if (next) {
572 next->prev = prev;
573 rc = _db_save(next, false, mm);
574 if (rc) {
575 fsm->release_mmap(fsm);
576 return rc;
577 }
578 }
579 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
580 memcpy(&first_sblkn, mm + db->addr + DOFF_N0_U4, 4);
581 first_sblkn = IW_ITOHL(first_sblkn);
582 fsm->release_mmap(fsm);
583
584 if (iwkv->first_db && (iwkv->first_db->addr == db->addr)) {
585 uint64_t llv;
586 db->iwkv->first_db = next;
587 llv = next ? (uint64_t) next->addr : 0;
588 llv = IW_HTOILL(llv);
589 rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
590 }
591 if (iwkv->last_db && (iwkv->last_db->addr == db->addr)) {
592 iwkv->last_db = prev;
593 }
594 // Cleanup DB
595 off_t db_addr = db->addr;
596 blkn_t meta_blk = db->meta_blk;
597 blkn_t meta_blkn = db->meta_blkn;
598 db->open = false;
599
600 DISPOSE_DB_CTX dctx = {
601 .sbn = first_sblkn,
602 .iwkv = iwkv,
603 .db = db
604 };
605 IWRC(_db_dispose_chain(&dctx), rc);
606 if (meta_blk && meta_blkn) {
607 IWRC(fsm->deallocate(fsm, BLK2ADDR(meta_blk), BLK2ADDR(meta_blkn)), rc);
608 }
609 IWRC(fsm->deallocate(fsm, db_addr, DB_SZ), rc);
610 return rc;
611 }
612
_db_create_lw(IWKV iwkv,dbid_t dbid,iwdb_flags_t dbflg,IWDB * odb)613 static WUR iwrc _db_create_lw(IWKV iwkv, dbid_t dbid, iwdb_flags_t dbflg, IWDB *odb) {
614 iwrc rc;
615 int rci;
616 uint8_t *mm = 0;
617 off_t baddr = 0, blen;
618 IWFS_FSM *fsm = &iwkv->fsm;
619 *odb = 0;
620 IWDB db = calloc(1, sizeof(struct _IWDB));
621 if (!db) {
622 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
623 }
624 pthread_rwlockattr_t attr;
625 pthread_rwlockattr_init(&attr);
626 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
627 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
628 #endif
629 rci = pthread_rwlock_init(&db->rwl, &attr);
630 if (rci) {
631 free(db);
632 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
633 }
634 rci = pthread_spin_init(&db->cursors_slk, 0);
635 if (rci) {
636 pthread_rwlock_destroy(&db->rwl);
637 free(db);
638 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
639 }
640 rc = fsm->allocate(fsm, DB_SZ, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
641 if (rc) {
642 _db_release_lw(&db);
643 return rc;
644 }
645 db->iwkv = iwkv;
646 db->dbflg = dbflg;
647 db->addr = baddr;
648 db->id = dbid;
649 db->prev = iwkv->last_db;
650 if (!iwkv->first_db) {
651 uint64_t llv;
652 iwkv->first_db = db;
653 llv = (uint64_t) db->addr;
654 llv = IW_HTOILL(llv);
655 rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
656 } else if (iwkv->last_db) {
657 iwkv->last_db->next = db;
658 }
659 iwkv->last_db = db;
660 khiter_t k = kh_put(DBS, iwkv->dbs, db->id, &rci);
661 if (rci != -1) {
662 kh_value(iwkv->dbs, k) = db;
663 } else {
664 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
665 goto finish;
666 }
667 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
668 RCGO(rc, finish);
669 rc = _db_save(db, true, mm);
670 RCGO(rc, finish);
671 if (db->prev) {
672 rc = _db_save(db->prev, false, mm);
673 RCGO(rc, finish);
674 }
675 db->open = true;
676 *odb = db;
677
678 finish:
679 if (mm) {
680 fsm->release_mmap(fsm);
681 }
682 if (rc) {
683 fsm->deallocate(fsm, baddr, blen);
684 _db_release_lw(&db);
685 }
686 return rc;
687 }
688
689 //-------------------------- KVBLK
690
_kvblk_create(IWLCTX * lx,off_t baddr,uint8_t kvbpow,KVBLK ** oblk)691 IW_INLINE void _kvblk_create(IWLCTX *lx, off_t baddr, uint8_t kvbpow, KVBLK **oblk) {
692 KVBLK *kblk = &lx->kaa[lx->kaan];
693 kblk->db = lx->db;
694 kblk->addr = baddr;
695 kblk->maxoff = 0;
696 kblk->idxsz = 2 * IW_VNUMSIZE(0) * KVBLK_IDXNUM;
697 kblk->zidx = 0;
698 kblk->szpow = kvbpow;
699 kblk->flags = KVBLK_DURTY;
700 memset(kblk->pidx, 0, sizeof(kblk->pidx));
701 *oblk = kblk;
702 AAPOS_INC(lx->kaan);
703 }
704
_kvblk_key_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)705 IW_INLINE WUR iwrc _kvblk_key_peek(
706 const KVBLK *kb,
707 uint8_t idx, const uint8_t *mm, uint8_t **obuf,
708 uint32_t *olen) {
709 if (kb->pidx[idx].len) {
710 uint32_t klen, step;
711 const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
712 IW_READVNUMBUF(rp, klen, step);
713 if (!klen) {
714 *obuf = 0;
715 *olen = 0;
716 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
717 return IWKV_ERROR_CORRUPTED;
718 }
719 rp += step;
720 *obuf = (uint8_t*) rp;
721 *olen = klen;
722 } else {
723 *obuf = 0;
724 *olen = 0;
725 }
726 return 0;
727 }
728
_kvblk_value_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)729 IW_INLINE void _kvblk_value_peek(const KVBLK *kb, uint8_t idx, const uint8_t *mm, uint8_t **obuf, uint32_t *olen) {
730 assert(idx < KVBLK_IDXNUM);
731 if (kb->pidx[idx].len) {
732 uint32_t klen, step;
733 const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
734 IW_READVNUMBUF(rp, klen, step);
735 rp += step;
736 rp += klen;
737 *obuf = (uint8_t*) rp;
738 *olen = kb->pidx[idx].len - klen - step;
739 } else {
740 *obuf = 0;
741 *olen = 0;
742 }
743 }
744
_kvblk_key_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key)745 static WUR iwrc _kvblk_key_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key) {
746 assert(mm && idx < KVBLK_IDXNUM);
747 int32_t klen;
748 int step;
749 KVP *kvp = &kb->pidx[idx];
750 key->compound = 0;
751 if (!kvp->len) {
752 key->data = 0;
753 key->size = 0;
754 return 0;
755 }
756 // [klen:vn,key,value]
757 uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
758 IW_READVNUMBUF(rp, klen, step);
759 rp += step;
760 if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
761 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
762 return IWKV_ERROR_CORRUPTED;
763 }
764 key->size = (size_t) klen;
765 if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
766 // Needed to provide enough buffer in _unpack_effective_key()
767 key->data = malloc(MAX(key->size, sizeof(int64_t)));
768 } else {
769 key->data = malloc(key->size);
770 }
771 if (!key->data) {
772 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
773 }
774 memcpy(key->data, rp, key->size);
775 return 0;
776 }
777
_kvblk_value_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * val)778 static WUR iwrc _kvblk_value_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *val) {
779 assert(mm && idx < KVBLK_IDXNUM);
780 int32_t klen;
781 int step;
782 KVP *kvp = &kb->pidx[idx];
783 val->compound = 0;
784 if (!kvp->len) {
785 val->data = 0;
786 val->size = 0;
787 return 0;
788 }
789 // [klen:vn,key,value]
790 uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
791 IW_READVNUMBUF(rp, klen, step);
792 rp += step;
793 if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
794 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
795 return IWKV_ERROR_CORRUPTED;
796 }
797 rp += klen;
798 if (kvp->len > klen + step) {
799 val->size = kvp->len - klen - step;
800 val->data = malloc(val->size);
801 if (!val->data) {
802 iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
803 val->size = 0;
804 return rc;
805 }
806 memcpy(val->data, rp, val->size);
807 } else {
808 val->data = 0;
809 val->size = 0;
810 }
811 return 0;
812 }
813
_kvblk_kv_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key,IWKV_val * val)814 static WUR iwrc _kvblk_kv_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key, IWKV_val *val) {
815 assert(mm && idx < KVBLK_IDXNUM);
816 int32_t klen;
817 int step;
818 KVP *kvp = &kb->pidx[idx];
819 key->compound = 0;
820 val->compound = 0;
821 if (!kvp->len) {
822 key->data = 0;
823 key->size = 0;
824 val->data = 0;
825 val->size = 0;
826 return 0;
827 }
828 // [klen:vn,key,value]
829 uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
830 IW_READVNUMBUF(rp, klen, step);
831 rp += step;
832 if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
833 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
834 return IWKV_ERROR_CORRUPTED;
835 }
836 key->size = (size_t) klen;
837 if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
838 // Needed to provide enough buffer in _unpack_effective_key()
839 key->data = malloc(MAX(key->size, sizeof(int64_t)));
840 } else {
841 key->data = malloc(key->size);
842 }
843 if (!key->data) {
844 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
845 }
846 memcpy(key->data, rp, key->size);
847 rp += klen;
848 if (kvp->len > klen + step) {
849 val->size = kvp->len - klen - step;
850 val->data = malloc(val->size);
851 if (!val->data) {
852 iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
853 free(key->data);
854 key->data = 0;
855 key->size = 0;
856 val->size = 0;
857 return rc;
858 }
859 memcpy(val->data, rp, val->size);
860 } else {
861 val->data = 0;
862 val->size = 0;
863 }
864 return 0;
865 }
866
_kvblk_at_mm(IWLCTX * lx,off_t addr,uint8_t * mm,KVBLK * kbp,KVBLK ** blkp)867 static WUR iwrc _kvblk_at_mm(IWLCTX *lx, off_t addr, uint8_t *mm, KVBLK *kbp, KVBLK **blkp) {
868 uint8_t *rp;
869 uint16_t sv;
870 int step;
871 iwrc rc = 0;
872 KVBLK *kb = kbp ? kbp : &lx->kaa[lx->kaan];
873 kb->db = lx->db;
874 kb->addr = addr;
875 kb->maxoff = 0;
876 kb->idxsz = 0;
877 kb->zidx = -1;
878 kb->szpow = 0;
879 kb->flags = KVBLK_DEFAULT;
880 memset(kb->pidx, 0, sizeof(kb->pidx));
881
882 *blkp = 0;
883 rp = mm + addr;
884 memcpy(&kb->szpow, rp, 1);
885 rp += 1;
886 IW_READSV(rp, sv, kb->idxsz);
887 if (IW_UNLIKELY(kb->idxsz > KVBLK_MAX_IDX_SZ)) {
888 rc = IWKV_ERROR_CORRUPTED;
889 iwlog_ecode_error3(rc);
890 goto finish;
891 }
892 for (uint8_t i = 0; i < KVBLK_IDXNUM; ++i) {
893 IW_READVNUMBUF64(rp, kb->pidx[i].off, step);
894 rp += step;
895 IW_READVNUMBUF(rp, kb->pidx[i].len, step);
896 rp += step;
897 if (kb->pidx[i].len) {
898 if (IW_UNLIKELY(!kb->pidx[i].off)) {
899 rc = IWKV_ERROR_CORRUPTED;
900 iwlog_ecode_error3(rc);
901 goto finish;
902 }
903 if (kb->pidx[i].off > kb->maxoff) {
904 kb->maxoff = kb->pidx[i].off;
905 }
906 } else if (kb->zidx < 0) {
907 kb->zidx = i;
908 }
909 kb->pidx[i].ridx = i;
910 }
911 *blkp = kb;
912 assert(rp - (mm + addr) <= (1ULL << kb->szpow));
913 if (!kbp) {
914 AAPOS_INC(lx->kaan);
915 }
916
917 finish:
918 return rc;
919 }
920
_kvblk_compacted_offset(KVBLK * kb)921 IW_INLINE off_t _kvblk_compacted_offset(KVBLK *kb) {
922 off_t coff = 0;
923 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
924 coff += kb->pidx[i].len;
925 }
926 return coff;
927 }
928
_kvblk_compacted_dsize(KVBLK * kb)929 IW_INLINE off_t _kvblk_compacted_dsize(KVBLK *kb) {
930 off_t coff = KVBLK_HDRSZ;
931 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
932 coff += kb->pidx[i].len;
933 coff += IW_VNUMSIZE32(kb->pidx[i].len);
934 coff += IW_VNUMSIZE(kb->pidx[i].off);
935 }
936 return coff;
937 }
938
_kvblk_sync_mm(KVBLK * kb,uint8_t * mm)939 static WUR iwrc _kvblk_sync_mm(KVBLK *kb, uint8_t *mm) {
940 iwrc rc = 0;
941 if (!(kb->flags & KVBLK_DURTY)) {
942 return rc;
943 }
944 uint16_t sp;
945 uint8_t *szp;
946 uint8_t *wp = mm + kb->addr;
947 uint8_t *sptr = wp;
948 IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
949 memcpy(wp, &kb->szpow, 1);
950 wp += 1;
951 szp = wp;
952 wp += sizeof(uint16_t);
953 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
954 KVP *kvp = &kb->pidx[i];
955 IW_SETVNUMBUF64(sp, wp, kvp->off);
956 wp += sp;
957 IW_SETVNUMBUF(sp, wp, kvp->len);
958 wp += sp;
959 }
960 sp = wp - szp - sizeof(uint16_t);
961 kb->idxsz = sp;
962 assert(kb->idxsz <= KVBLK_MAX_IDX_SZ);
963 sp = IW_HTOIS(sp);
964 memcpy(szp, &sp, sizeof(uint16_t));
965 assert(wp - (mm + kb->addr) <= (1ULL << kb->szpow));
966 if (dlsnr) {
967 rc = dlsnr->onwrite(dlsnr, kb->addr, sptr, wp - sptr, 0);
968 }
969 kb->flags &= ~KVBLK_DURTY;
970 return rc;
971 }
972
973 #define _kvblk_sort_kv_lt(v1, v2, o) \
974 (((v1).off > 0 ? (v1).off : -1UL) < ((v2).off > 0 ? (v2).off : -1UL))
975
976 // -V:KSORT_INIT:522, 756, 769
KSORT_INIT(kvblk,KVP,_kvblk_sort_kv_lt)977 KSORT_INIT(kvblk, KVP, _kvblk_sort_kv_lt)
978
979 static WUR iwrc _kvblk_compact_mm(KVBLK *kb, uint8_t *mm) {
980 uint8_t i;
981 off_t coff = _kvblk_compacted_offset(kb);
982 if (coff == kb->maxoff) { // compacted
983 return 0;
984 }
985 KVP tidx[KVBLK_IDXNUM];
986 KVP tidx_tmp[KVBLK_IDXNUM];
987 iwrc rc = 0;
988 uint16_t idxsiz = 0;
989 IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
990 off_t blkend = kb->addr + (1ULL << kb->szpow);
991 uint8_t *wp = mm + blkend;
992 memcpy(tidx, kb->pidx, sizeof(tidx));
993 ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp, 0);
994
995 coff = 0;
996 for (i = 0; i < KVBLK_IDXNUM && tidx[i].off; ++i) {
997 #ifndef NDEBUG
998 if (i > 0) {
999 assert(tidx[i - 1].off < tidx[i].off);
1000 }
1001 #endif
1002 KVP *kvp = &kb->pidx[tidx[i].ridx];
1003 off_t noff = coff + kvp->len;
1004 if (kvp->off > noff) {
1005 assert(noff <= (1ULL << kb->szpow) && kvp->len <= noff);
1006 if (dlsnr) {
1007 rc = dlsnr->onwrite(dlsnr, blkend - noff, wp - kvp->off, kvp->len, 0);
1008 }
1009 memmove(wp - noff, wp - kvp->off, kvp->len);
1010 kvp->off = noff;
1011 }
1012 coff += kvp->len;
1013 idxsiz += IW_VNUMSIZE(kvp->off);
1014 idxsiz += IW_VNUMSIZE32(kvp->len);
1015 }
1016 idxsiz += (KVBLK_IDXNUM - i) * 2;
1017 for (i = 0; i < KVBLK_IDXNUM; ++i) {
1018 if (!kb->pidx[i].len) {
1019 kb->zidx = i;
1020 break;
1021 }
1022 }
1023 assert(idxsiz <= kb->idxsz);
1024 kb->idxsz = idxsiz;
1025 kb->maxoff = coff;
1026 if (i == KVBLK_IDXNUM) {
1027 kb->zidx = -1;
1028 }
1029 kb->flags |= KVBLK_DURTY;
1030 assert(_kvblk_compacted_offset(kb) == kb->maxoff);
1031 return rc;
1032 }
1033
_kvblk_maxkvoff(KVBLK * kb)1034 IW_INLINE off_t _kvblk_maxkvoff(KVBLK *kb) {
1035 off_t off = 0;
1036 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1037 if (kb->pidx[i].off > off) {
1038 off = kb->pidx[i].off;
1039 }
1040 }
1041 return off;
1042 }
1043
_kvblk_rmkv(KVBLK * kb,uint8_t idx,kvblk_rmkv_opts_t opts)1044 static WUR iwrc _kvblk_rmkv(KVBLK *kb, uint8_t idx, kvblk_rmkv_opts_t opts) {
1045 iwrc rc = 0;
1046 uint8_t *mm = 0;
1047 IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1048 IWFS_FSM *fsm = &kb->db->iwkv->fsm;
1049 if (kb->pidx[idx].off >= kb->maxoff) {
1050 kb->maxoff = 0;
1051 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1052 if ((i != idx) && (kb->pidx[i].off > kb->maxoff)) {
1053 kb->maxoff = kb->pidx[i].off;
1054 }
1055 }
1056 }
1057 kb->pidx[idx].len = 0;
1058 kb->pidx[idx].off = 0;
1059 kb->flags |= KVBLK_DURTY;
1060 if ((kb->zidx < 0) || (idx < kb->zidx)) {
1061 kb->zidx = idx;
1062 }
1063 if (!(RMKV_NO_RESIZE & opts) && (kb->szpow > KVBLK_INISZPOW)) {
1064 off_t nlen = 1ULL << kb->szpow;
1065 off_t dsz = _kvblk_compacted_dsize(kb);
1066 if (nlen >= 2 * dsz) {
1067 uint8_t npow = kb->szpow - 1;
1068 while (npow > KVBLK_INISZPOW && (1ULL << (npow - 1)) >= dsz) {
1069 --npow;
1070 }
1071 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1072 RCGO(rc, finish);
1073
1074 rc = _kvblk_compact_mm(kb, mm);
1075 RCGO(rc, finish);
1076
1077 off_t maxoff = _kvblk_maxkvoff(kb);
1078 if (dlsnr) {
1079 rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << npow) - maxoff, mm + kb->addr + nlen - maxoff, maxoff, 0);
1080 RCGO(rc, finish);
1081 }
1082 memmove(mm + kb->addr + (1ULL << npow) - maxoff,
1083 mm + kb->addr + nlen - maxoff,
1084 (size_t) maxoff);
1085
1086 fsm->release_mmap(fsm);
1087 mm = 0;
1088 rc = fsm->reallocate(fsm, (1ULL << npow), &kb->addr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1089 RCGO(rc, finish);
1090 kb->szpow = npow;
1091 assert(nlen == (1ULL << kb->szpow));
1092 opts |= RMKV_SYNC;
1093 }
1094 }
1095 if (RMKV_SYNC & opts) {
1096 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1097 RCGO(rc, finish);
1098 IWRC(_kvblk_sync_mm(kb, mm), rc);
1099 }
1100
1101 finish:
1102 if (mm) {
1103 fsm->release_mmap(fsm);
1104 }
1105 return rc;
1106 }
1107
_kvblk_addkv(KVBLK * kb,const IWKV_val * key,const IWKV_val * val,uint8_t * oidx,bool raw_key)1108 static WUR iwrc _kvblk_addkv(
1109 KVBLK *kb,
1110 const IWKV_val *key,
1111 const IWKV_val *val,
1112 uint8_t *oidx,
1113 bool raw_key) {
1114 *oidx = 0;
1115
1116 iwrc rc = 0;
1117 off_t msz; // max available free space
1118 off_t rsz; // required size to add new key/value pair
1119 off_t noff; // offset of new kvpair from end of block
1120 uint8_t *mm, *wp, *sptr;
1121 size_t i, sp;
1122 KVP *kvp;
1123 IWDB db = kb->db;
1124 bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1125 IWFS_FSM *fsm = &db->iwkv->fsm;
1126 bool compacted = false;
1127 IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1128 IWKV_val *uval = (IWKV_val*) val;
1129
1130 size_t ksize = key->size;
1131 if (compound) {
1132 ksize += IW_VNUMSIZE(key->compound);
1133 }
1134 off_t psz = IW_VNUMSIZE(ksize) + ksize;
1135
1136 if (kb->zidx < 0) {
1137 return _IWKV_RC_KVBLOCK_FULL;
1138 }
1139 psz += uval->size;
1140 if (psz > IWKV_MAX_KVSZ) {
1141 return IWKV_ERROR_MAXKVSZ;
1142 }
1143
1144 start:
1145 // [szpow:u1,idxsz:u2,[ps0:vn,pl0:vn,..., ps32,pl32]____[[KV],...]] // KVBLK
1146 msz = (1ULL << kb->szpow) - (KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1147 assert(msz >= 0);
1148 noff = kb->maxoff + psz;
1149 rsz = psz + IW_VNUMSIZE(noff) + IW_VNUMSIZE(psz);
1150
1151 if (msz < rsz) { // not enough space
1152 if (!compacted) {
1153 compacted = true;
1154 if (_kvblk_compacted_offset(kb) != kb->maxoff) {
1155 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1156 RCGO(rc, finish);
1157 rc = _kvblk_compact_mm(kb, mm);
1158 RCGO(rc, finish);
1159 fsm->release_mmap(fsm);
1160 goto start;
1161 }
1162 }
1163 // resize the whole block
1164 off_t nlen = 1ULL << kb->szpow;
1165 off_t nsz = rsz - msz + nlen;
1166 off_t naddr = kb->addr;
1167 off_t olen = nlen;
1168
1169 uint8_t npow = kb->szpow;
1170 while ((1ULL << ++npow) < nsz);
1171
1172 rc = fsm->allocate(fsm, (1ULL << npow), &naddr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1173 RCGO(rc, finish);
1174 assert(nlen == (1ULL << npow));
1175 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1176 RCGO(rc, finish);
1177 if (dlsnr) {
1178 rc = dlsnr->onwrite(dlsnr, naddr, mm + kb->addr, KVBLK_HDRSZ, 0);
1179 RCGO(rc, finish);
1180 memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1181 rc = dlsnr->onwrite(dlsnr, naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, kb->maxoff, 0);
1182 RCGO(rc, finish);
1183 memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1184 } else {
1185 memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1186 memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1187 }
1188 fsm->release_mmap(fsm);
1189 rc = fsm->deallocate(fsm, kb->addr, olen);
1190 RCGO(rc, finish);
1191
1192 kb->addr = naddr;
1193 kb->szpow = npow;
1194 }
1195 *oidx = (uint8_t) kb->zidx;
1196 kvp = &kb->pidx[kb->zidx];
1197 kvp->len = (uint32_t) psz;
1198 kvp->off = noff;
1199 kvp->ridx = (uint8_t) kb->zidx;
1200 kb->maxoff = noff;
1201 kb->flags |= KVBLK_DURTY;
1202 for (i = 0; i < KVBLK_IDXNUM; ++i) {
1203 if (!kb->pidx[i].len && (i != kb->zidx)) {
1204 kb->zidx = i;
1205 break;
1206 }
1207 }
1208 if (i >= KVBLK_IDXNUM) {
1209 kb->zidx = -1;
1210 }
1211 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1212 RCGO(rc, finish);
1213 assert((1ULL << kb->szpow) >= KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1214 assert(kvp->off < (1ULL << kb->szpow) && kvp->len <= kvp->off);
1215 wp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
1216 sptr = wp;
1217 // [klen:vn,key,value]
1218 IW_SETVNUMBUF(sp, wp, ksize);
1219 wp += sp;
1220 if (compound) {
1221 IW_SETVNUMBUF64(sp, wp, key->compound);
1222 wp += sp;
1223 }
1224 memcpy(wp, key->data, key->size);
1225 wp += key->size;
1226 memcpy(wp, uval->data, uval->size);
1227 wp += uval->size;
1228 #ifndef NDEBUG
1229 assert(wp - sptr == kvp->len);
1230 #endif
1231 if (dlsnr) {
1232 rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << kb->szpow) - kvp->off, sptr, wp - sptr, 0);
1233 }
1234 fsm->release_mmap(fsm);
1235
1236 finish:
1237 return rc;
1238 }
1239
_kvblk_updatev(KVBLK * kb,uint8_t * idxp,const IWKV_val * key,const IWKV_val * val)1240 static WUR iwrc _kvblk_updatev(
1241 KVBLK *kb,
1242 uint8_t *idxp,
1243 const IWKV_val *key, /* Nullable */
1244 const IWKV_val *val) {
1245 assert(*idxp < KVBLK_IDXNUM);
1246 int32_t i;
1247 uint32_t len, nlen, sz;
1248 uint8_t pidx = *idxp, *mm = 0, *wp, *sp;
1249 IWDB db = kb->db;
1250 IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1251 IWKV_val *uval = (IWKV_val*) val;
1252 IWKV_val *ukey = (IWKV_val*) key;
1253 IWKV_val skey; // stack allocated key/val
1254 KVP *kvp = &kb->pidx[pidx];
1255 size_t kbsz = 1ULL << kb->szpow; // kvblk size
1256 off_t freesz = kbsz - KVBLK_HDRSZ - kb->idxsz - kb->maxoff; // free space available
1257 IWFS_FSM *fsm = &db->iwkv->fsm;
1258
1259 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1260 RCRET(rc);
1261 assert(freesz >= 0);
1262
1263 wp = mm + kb->addr + kbsz - kvp->off;
1264 sp = wp;
1265 IW_READVNUMBUF(wp, len, sz);
1266 wp += sz;
1267 if (ukey && (len != ukey->size)) {
1268 rc = IWKV_ERROR_CORRUPTED;
1269 iwlog_ecode_error3(rc);
1270 goto finish;
1271 }
1272 wp += len;
1273 off_t rsize = sz + len + uval->size; // required size
1274 if (rsize <= kvp->len) {
1275 memcpy(wp, uval->data, uval->size);
1276 if (dlsnr) {
1277 rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
1278 RCGO(rc, finish);
1279 }
1280 wp += uval->size;
1281 if ((wp - sp) != kvp->len) {
1282 kvp->len = wp - sp;
1283 kb->flags |= KVBLK_DURTY;
1284 }
1285 } else {
1286 KVP tidx[KVBLK_IDXNUM];
1287 KVP tidx_tmp[KVBLK_IDXNUM];
1288 off_t koff = kb->pidx[pidx].off;
1289 memcpy(tidx, kb->pidx, KVBLK_IDXNUM * sizeof(kb->pidx[0]));
1290 ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp, 0);
1291 kb->flags |= KVBLK_DURTY;
1292 if (!ukey) { // we need a key
1293 ukey = &skey;
1294 rc = _kvblk_key_get(kb, mm, pidx, ukey);
1295 RCGO(rc, finish);
1296 }
1297 for (i = 0; i < KVBLK_IDXNUM; ++i) {
1298 if (tidx[i].off == koff) {
1299 if (koff - ((i > 0) ? tidx[i - 1].off : 0) >= rsize) {
1300 nlen = wp + uval->size - sp;
1301 if (!((nlen > kvp->len) && (freesz - IW_VNUMSIZE32(nlen) + IW_VNUMSIZE32(kvp->len) < 0))) { // enough space?
1302 memcpy(wp, uval->data, uval->size);
1303 if (dlsnr) {
1304 rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
1305 RCGO(rc, finish);
1306 }
1307 wp += uval->size;
1308 kvp->len = nlen;
1309 break;
1310 ;
1311 }
1312 }
1313 mm = 0;
1314 fsm->release_mmap(fsm);
1315 rc = _kvblk_rmkv(kb, pidx, RMKV_NO_RESIZE);
1316 RCGO(rc, finish);
1317 rc = _kvblk_addkv(kb, ukey, uval, idxp, false);
1318 break;
1319 }
1320 }
1321 }
1322
1323 finish:
1324 if (ukey != key) {
1325 _kv_val_dispose(ukey);
1326 }
1327 if (mm) {
1328 IWRC(fsm->release_mmap(fsm), rc);
1329 }
1330 return rc;
1331 }
1332
1333 //-------------------------- SBLK
1334
_sblk_release(IWLCTX * lx,SBLK ** sblkp)1335 IW_INLINE void _sblk_release(IWLCTX *lx, SBLK **sblkp) {
1336 assert(sblkp && *sblkp);
1337 SBLK *sblk = *sblkp;
1338 sblk->flags &= ~SBLK_CACHE_FLAGS; // clear cache flags
1339 sblk->flags &= ~SBLK_DURTY; // clear dirty flag
1340 sblk->kvblk = 0;
1341 *sblkp = 0;
1342 }
1343
_sblk_loadkvblk_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1344 IW_INLINE WUR iwrc _sblk_loadkvblk_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1345 if (!sblk->kvblk && sblk->kvblkn) {
1346 return _kvblk_at_mm(lx, BLK2ADDR(sblk->kvblkn), mm, 0, &sblk->kvblk);
1347 } else {
1348 return 0;
1349 }
1350 }
1351
/**
 * Tell whether `sblk` is the only occupied slot of its v2 SBLK page.
 *
 * A slot counts as occupied when its `bpos` byte in the mapped file is
 * non-zero. `*page_addr` receives the page start address, or 0 when
 * `sblk->bpos` is outside `1..SBLK_PAGE_SBLK_NUM_V2` (in which case false is
 * returned to stay on the safe side).
 */
static bool _sblk_is_only_one_on_page_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *page_addr) {
  *page_addr = 0;
  if ((sblk->bpos < 1) || (sblk->bpos > SBLK_PAGE_SBLK_NUM_V2)) {
    return false; // be safe
  }
  const off_t page = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;
  *page_addr = page;
  for (int slot = 0; slot < SBLK_PAGE_SBLK_NUM_V2; ++slot) {
    if (slot == sblk->bpos - 1) {
      continue; // our own slot
    }
    if (*(mm + page + slot * SBLK_SZ + SOFF_BPOS_U1_V2)) {
      return false;
    }
  }
  return true;
}
1371
_sblk_destroy(IWLCTX * lx,SBLK ** sblkp)1372 IW_INLINE WUR iwrc _sblk_destroy(IWLCTX *lx, SBLK **sblkp) {
1373 assert(sblkp && *sblkp && (*sblkp)->addr);
1374 iwrc rc = 0;
1375 SBLK *sblk = *sblkp;
1376 lx->destroy_addr = sblk->addr;
1377
1378 if (!(sblk->flags & SBLK_DB)) {
1379 uint8_t kvb_szpow, *mm;
1380 IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1381 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1382 off_t kvb_addr = BLK2ADDR(sblk->kvblkn);
1383 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1384 RCRET(rc);
1385
1386 if (!sblk->kvblk) {
1387 // Read KVBLK size as power of two
1388 memcpy(&kvb_szpow, mm + kvb_addr + KBLK_SZPOW_OFF, 1);
1389 } else {
1390 kvb_szpow = sblk->kvblk->szpow;
1391 }
1392 if (lx->db->lcnt[sblk->lvl]) {
1393 lx->db->lcnt[sblk->lvl]--;
1394 lx->db->flags |= SBLK_DURTY;
1395 }
1396 _dbcache_remove_lw(lx, sblk);
1397 if (lx->db->iwkv->fmt_version > 1) {
1398 off_t paddr;
1399 if (_sblk_is_only_one_on_page_v2(lx, mm, sblk, &paddr)) {
1400 fsm->release_mmap(fsm);
1401 // Deallocate whole page
1402 rc = fsm->deallocate(fsm, paddr, SBLK_PAGE_SZ_V2);
1403 } else {
1404 memset(mm + sblk->addr + SOFF_BPOS_U1_V2, 0, 1);
1405 fsm->release_mmap(fsm);
1406 if (dlsnr) {
1407 dlsnr->onset(dlsnr, sblk->addr + SOFF_BPOS_U1_V2, 0, 1, 0);
1408 }
1409 }
1410 } else {
1411 fsm->release_mmap(fsm);
1412 rc = fsm->deallocate(fsm, sblk->addr, SBLK_SZ);
1413 }
1414 IWRC(fsm->deallocate(fsm, kvb_addr, 1ULL << kvb_szpow), rc);
1415 }
1416 _sblk_release(lx, sblkp);
1417 return rc;
1418 }
1419
_sblk_genlevel(IWDB db)1420 IW_INLINE uint8_t _sblk_genlevel(IWDB db) {
1421 uint8_t lvl;
1422 #ifdef IW_TESTS
1423 if (iwkv_next_level >= 0) {
1424 lvl = (uint8_t) iwkv_next_level;
1425 iwkv_next_level = -1;
1426 assert(lvl < SLEVELS);
1427 return lvl;
1428 }
1429 #endif
1430 uint32_t r = iwu_rand_u32();
1431 for (lvl = 0; lvl < SLEVELS && !(r & 1); ++lvl) r >>= 1;
1432 uint8_t ret = IW_UNLIKELY(lvl >= SLEVELS) ? SLEVELS - 1 : lvl;
1433 while (ret > 0 && db->lcnt[ret - 1] == 0) {
1434 --ret;
1435 }
1436 return ret;
1437 }
1438
_sblk_create_v1(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,off_t baddr,uint8_t bpos,SBLK ** oblk)1439 static WUR iwrc _sblk_create_v1(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, off_t baddr, uint8_t bpos, SBLK **oblk) {
1440 iwrc rc;
1441 SBLK *sblk;
1442 KVBLK *kvblk;
1443 off_t blen;
1444 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1445 if (kvbpow < KVBLK_INISZPOW) {
1446 kvbpow = KVBLK_INISZPOW;
1447 }
1448 *oblk = 0;
1449 if (!bpos) {
1450 rc = fsm->allocate(fsm, SBLK_SZ + (1ULL << kvbpow), &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1451 RCRET(rc);
1452 assert(blen - SBLK_SZ == (1ULL << kvbpow));
1453 _kvblk_create(lx, baddr + SBLK_SZ, kvbpow, &kvblk);
1454 } else {
1455 // Allocate kvblk as separate chunk
1456 off_t kblkaddr = 0;
1457 rc = fsm->allocate(fsm, (1ULL << kvbpow), &kblkaddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1458 assert(blen == (1ULL << kvbpow));
1459 _kvblk_create(lx, kblkaddr, kvbpow, &kvblk);
1460 }
1461 sblk = &lx->saa[lx->saan];
1462 sblk->db = lx->db;
1463 sblk->db->lcnt[nlevel]++;
1464 sblk->db->flags |= SBLK_DURTY;
1465 sblk->addr = baddr;
1466 sblk->flags = (SBLK_DURTY | SBLK_CACHE_PUT);
1467 sblk->lvl = nlevel;
1468 sblk->p0 = 0;
1469 memset(sblk->n, 0, sizeof(sblk->n));
1470 sblk->kvblk = kvblk;
1471 sblk->kvblkn = ADDR2BLK(kvblk->addr);
1472 sblk->lkl = 0;
1473 sblk->pnum = 0;
1474 sblk->bpos = bpos;
1475 memset(sblk->pi, 0, sizeof(sblk->pi));
1476 *oblk = sblk;
1477 AAPOS_INC(lx->saan);
1478 return 0;
1479 }
1480
/**
 * Look for a free slot on the v2 SBLK page that contains `sblk`.
 *
 * Slots after `sblk` are examined first (ascending), then the slots before it
 * (descending). A slot is free when its `bpos` byte in the mapped file is 0.
 *
 * @param obaddr Receives the address of the free slot, or 0 if none was found
 *               or `sblk->bpos` is outside `1..SBLK_PAGE_SBLK_NUM_V2`.
 * @param oslot  Receives the 1-based slot position, or 0.
 */
static void _sblk_find_free_page_slot_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *obaddr, uint8_t *oslot) {
  int free_pos = 0; // 1-based position of the free slot, 0 = none

  if ((sblk->bpos >= 1) && (sblk->bpos <= SBLK_PAGE_SBLK_NUM_V2)) {
    const off_t paddr = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;

    for (int pos = sblk->bpos + 1; pos <= SBLK_PAGE_SBLK_NUM_V2; ++pos) {
      if (!*(mm + paddr + (pos - 1) * SBLK_SZ + SOFF_BPOS_U1_V2)) {
        free_pos = pos;
        break;
      }
    }
    if (!free_pos) {
      for (int pos = sblk->bpos - 1; pos > 0; --pos) {
        if (!*(mm + paddr + (pos - 1) * SBLK_SZ + SOFF_BPOS_U1_V2)) {
          free_pos = pos;
          break;
        }
      }
    }
    if (free_pos) {
      *obaddr = paddr + (free_pos - 1) * SBLK_SZ;
      *oslot = free_pos;
      return;
    }
  }
  *obaddr = 0;
  *oslot = 0;
}
1509
1510 /// Create
_sblk_create_v2(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1511 static WUR iwrc _sblk_create_v2(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1512 off_t baddr = 0;
1513 uint8_t bpos = 0, *mm;
1514 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1515 SBLK *_lower = lower;
1516 SBLK *_upper = upper;
1517
1518 for (int i = SLEVELS - 1; i >= 0; --i) {
1519 if (lx->pupper[i] && (lx->pupper[i]->lvl >= nlevel)) {
1520 _upper = lx->pupper[i];
1521 }
1522 if (lx->plower[i] && (lx->plower[i]->lvl >= nlevel)) {
1523 _lower = lx->plower[i];
1524 }
1525 }
1526
1527 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1528 RCRET(rc);
1529 _sblk_find_free_page_slot_v2(lx, mm, _lower, &baddr, &bpos);
1530 if (!baddr && _upper && (_upper->addr != _lower->addr)) {
1531 _sblk_find_free_page_slot_v2(lx, mm, _upper, &baddr, &bpos);
1532 }
1533 if (!baddr) {
1534 if (_lower->addr != lower->addr) {
1535 _sblk_find_free_page_slot_v2(lx, mm, lower, &baddr, &bpos);
1536 }
1537 if (!baddr && upper && _upper && (_upper->addr != upper->addr)) {
1538 _sblk_find_free_page_slot_v2(lx, mm, upper, &baddr, &bpos);
1539 }
1540 }
1541 fsm->release_mmap(fsm);
1542
1543 if (!baddr) {
1544 // No free slots - allocate new SBLK page
1545 off_t blen;
1546 bpos = 1;
1547 IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1548 rc = fsm->allocate(fsm, SBLK_PAGE_SZ_V2, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1549 RCRET(rc);
1550 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1551 RCRET(rc);
1552 // Fill page to zero
1553 memset(mm + baddr, 0, blen);
1554 if (dlsnr) {
1555 rc = dlsnr->onset(dlsnr, baddr, 0, blen, 0);
1556 }
1557 fsm->release_mmap(fsm);
1558 RCRET(rc);
1559 }
1560 return _sblk_create_v1(lx, nlevel, kvbpow, baddr, bpos, oblk);
1561 }
1562
_sblk_create(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1563 IW_INLINE WUR iwrc _sblk_create(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1564 if (lx->db->iwkv->fmt_version > 1) {
1565 return _sblk_create_v2(lx, nlevel, kvbpow, lower, upper, oblk);
1566 } else {
1567 return _sblk_create_v1(lx, nlevel, kvbpow, lower->addr, 0, oblk);
1568 }
1569 }
1570
_sblk_at2(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK * sblk)1571 static WUR iwrc _sblk_at2(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK *sblk) {
1572 iwrc rc;
1573 uint8_t *mm;
1574 uint32_t lv;
1575 sblk_flags_t flags = lx->sbflags | flgs;
1576 IWDB db = lx->db;
1577 IWFS_FSM *fsm = &db->iwkv->fsm;
1578 sblk->kvblk = 0;
1579 sblk->bpos = 0;
1580 sblk->db = db;
1581
1582 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1583 RCRET(rc);
1584
1585 if (IW_UNLIKELY(addr == db->addr)) {
1586 uint8_t *rp = mm + addr + DOFF_N0_U4;
1587 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
1588 sblk->addr = addr;
1589 sblk->flags = SBLK_DB | flags;
1590 sblk->lvl = 0;
1591 sblk->p0 = 0;
1592 sblk->kvblkn = 0;
1593 sblk->lkl = 0;
1594 sblk->pnum = KVBLK_IDXNUM;
1595 memset(sblk->pi, 0, sizeof(sblk->pi));
1596 for (int i = 0; i < SLEVELS; ++i) {
1597 IW_READLV(rp, lv, sblk->n[i]);
1598 if (sblk->n[i]) {
1599 ++sblk->lvl;
1600 } else {
1601 break;
1602 }
1603 }
1604 if (sblk->lvl) {
1605 --sblk->lvl;
1606 }
1607 } else if (addr) {
1608 uint8_t uflags;
1609 uint8_t *rp = mm + addr;
1610 sblk->addr = addr;
1611 // [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],bpos:u1,lk:u115]:u256
1612 memcpy(&uflags, rp++, 1);
1613 sblk->flags = uflags;
1614 if (sblk->flags & ~SBLK_PERSISTENT_FLAGS) {
1615 rc = IWKV_ERROR_CORRUPTED;
1616 iwlog_ecode_error3(rc);
1617 goto finish;
1618 }
1619 sblk->flags |= flags;
1620 memcpy(&sblk->lvl, rp++, 1);
1621 if (sblk->lvl >= SLEVELS) {
1622 rc = IWKV_ERROR_CORRUPTED;
1623 iwlog_ecode_error3(rc);
1624 goto finish;
1625 }
1626 memcpy(&sblk->lkl, rp++, 1);
1627 if (sblk->lkl > db->iwkv->pklen) {
1628 rc = IWKV_ERROR_CORRUPTED;
1629 iwlog_ecode_error3(rc);
1630 goto finish;
1631 }
1632 memcpy(&sblk->pnum, rp++, 1);
1633 if (sblk->pnum < 0) {
1634 rc = IWKV_ERROR_CORRUPTED;
1635 iwlog_ecode_error3(rc);
1636 goto finish;
1637 }
1638 memcpy(&sblk->p0, rp, 4);
1639 sblk->p0 = IW_ITOHL(sblk->p0);
1640 rp += 4;
1641 memcpy(&sblk->kvblkn, rp, 4);
1642 sblk->kvblkn = IW_ITOHL(sblk->kvblkn);
1643 rp += 4;
1644 memcpy(sblk->pi, rp, KVBLK_IDXNUM);
1645 rp += KVBLK_IDXNUM;
1646
1647 #ifdef IW_BIGENDIAN
1648 for (int i = 0; i <= sblk->lvl; ++i) {
1649 memcpy(&sblk->n[i], rp, 4);
1650 sblk->n[i] = IW_ITOHL(sblk->n[i]);
1651 rp += 4;
1652 }
1653 #else
1654 memcpy(sblk->n, rp, 4 * (sblk->lvl + 1));
1655 rp += 4 * (sblk->lvl + 1);
1656 #endif
1657 if (db->iwkv->fmt_version > 1) {
1658 rp = mm + addr + SOFF_BPOS_U1_V2;
1659 memcpy(&sblk->bpos, rp++, 1);
1660 } else {
1661 rp = mm + addr + SOFF_LK_V1;
1662 }
1663 // Lower key
1664 memcpy(sblk->lk, rp, (size_t) sblk->lkl);
1665 } else { // Database tail
1666 uint8_t *rp = mm + db->addr + DOFF_P0_U4;
1667 sblk->addr = 0;
1668 sblk->flags = SBLK_DB | flags;
1669 sblk->lvl = 0;
1670 sblk->kvblkn = 0;
1671 sblk->lkl = 0;
1672 sblk->pnum = KVBLK_IDXNUM;
1673 memset(sblk->pi, 0, sizeof(sblk->pi));
1674 IW_READLV(rp, lv, sblk->p0);
1675 if (!sblk->p0) {
1676 sblk->p0 = ADDR2BLK(db->addr);
1677 }
1678 }
1679
1680 finish:
1681 fsm->release_mmap(fsm);
1682 return rc;
1683 }
1684
_sblk_at(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK ** sblkp)1685 IW_INLINE WUR iwrc _sblk_at(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK **sblkp) {
1686 *sblkp = 0;
1687 SBLK *sblk = &lx->saa[lx->saan];
1688 iwrc rc = _sblk_at2(lx, addr, flgs, sblk);
1689 AAPOS_INC(lx->saan);
1690 *sblkp = sblk;
1691 return rc;
1692 }
1693
_sblk_sync_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1694 static WUR iwrc _sblk_sync_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1695 iwrc rc = 0;
1696 if (sblk->flags & SBLK_DURTY) {
1697 uint32_t lv;
1698 IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1699 sblk->flags &= ~SBLK_DURTY;
1700 if (IW_UNLIKELY(sblk->flags & SBLK_DB)) {
1701 uint8_t *sp;
1702 uint8_t *wp = mm + sblk->db->addr;
1703 if (sblk->addr) {
1704 assert(sblk->addr == sblk->db->addr);
1705 wp += DOFF_N0_U4;
1706 sp = wp;
1707 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
1708 for (int i = 0; i < SLEVELS; ++i) {
1709 IW_WRITELV(wp, lv, sblk->n[i]);
1710 }
1711 assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
1712 for (int i = 0; i < SLEVELS; ++i) {
1713 IW_WRITELV(wp, lv, lx->db->lcnt[i]);
1714 }
1715 } else { // Database tail
1716 wp += DOFF_P0_U4;
1717 sp = wp;
1718 IW_WRITELV(wp, lv, sblk->p0);
1719 assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
1720 }
1721 if (dlsnr) {
1722 rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
1723 }
1724 return rc;
1725 } else {
1726 uint8_t *wp = mm + sblk->addr;
1727 sblk_flags_t flags = (sblk->flags & SBLK_PERSISTENT_FLAGS);
1728 uint8_t uflags = flags;
1729 assert(sblk->lkl <= lx->db->iwkv->pklen);
1730 // [u1:flags,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,[pi0:u1,... pi32],n0-n23:u4,lk:u116]:u256
1731 wp += SOFF_FLAGS_U1;
1732 memcpy(wp++, &uflags, 1);
1733 memcpy(wp++, &sblk->lvl, 1);
1734 memcpy(wp++, &sblk->lkl, 1);
1735 memcpy(wp++, &sblk->pnum, 1);
1736 IW_WRITELV(wp, lv, sblk->p0);
1737 IW_WRITELV(wp, lv, sblk->kvblkn);
1738 memcpy(wp, sblk->pi, KVBLK_IDXNUM);
1739 wp = mm + sblk->addr + SOFF_N0_U4;
1740
1741 #ifdef IW_BIGENDIAN
1742 for (int i = 0; i <= sblk->lvl; ++i) {
1743 IW_WRITELV(wp, lv, sblk->n[i]);
1744 }
1745 #else
1746 memcpy(wp, sblk->n, 4 * (sblk->lvl + 1));
1747 wp += 4 * (sblk->lvl + 1);
1748 #endif
1749
1750 if (lx->db->iwkv->fmt_version > 1) {
1751 wp = mm + sblk->addr + SOFF_BPOS_U1_V2;
1752 memcpy(wp++, &sblk->bpos, 1);
1753 } else {
1754 wp = mm + sblk->addr + SOFF_LK_V1;
1755 }
1756 memcpy(wp, sblk->lk, (size_t) sblk->lkl);
1757 if (dlsnr) {
1758 rc = dlsnr->onwrite(dlsnr, sblk->addr, mm + sblk->addr, SOFF_END, 0);
1759 RCRET(rc);
1760 }
1761 }
1762 }
1763 if (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY)) {
1764 IWRC(_kvblk_sync_mm(sblk->kvblk, mm), rc);
1765 }
1766 if (sblk->flags & SBLK_CACHE_UPDATE) {
1767 _dbcache_update_lw(lx, sblk);
1768 }
1769 return rc;
1770 }
1771
_sblk_sync(IWLCTX * lx,SBLK * sblk)1772 IW_INLINE WUR iwrc _sblk_sync(IWLCTX *lx, SBLK *sblk) {
1773 if ((sblk->flags & SBLK_DURTY) || (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY))) {
1774 uint8_t *mm;
1775 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1776 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1777 RCRET(rc);
1778 rc = _sblk_sync_mm(lx, sblk, mm);
1779 fsm->release_mmap(fsm);
1780 return rc;
1781 }
1782 return 0;
1783 }
1784
_sblk_sync_and_release_mm(IWLCTX * lx,SBLK ** sblkp,uint8_t * mm)1785 IW_INLINE WUR iwrc _sblk_sync_and_release_mm(IWLCTX *lx, SBLK **sblkp, uint8_t *mm) {
1786 SBLK *sblk = *sblkp;
1787 if (lx->destroy_addr && (lx->destroy_addr == sblk->addr)) {
1788 return 0;
1789 }
1790 iwrc rc = 0;
1791 if (mm) {
1792 rc = _sblk_sync_mm(lx, *sblkp, mm);
1793 }
1794 _sblk_release(lx, sblkp);
1795 return rc;
1796 }
1797
_sblk_find_pi_mm(SBLK * sblk,IWLCTX * lx,const uint8_t * mm,bool * found,uint8_t * idxp)1798 static WUR iwrc _sblk_find_pi_mm(SBLK *sblk, IWLCTX *lx, const uint8_t *mm, bool *found, uint8_t *idxp) {
1799 *found = false;
1800 if (sblk->flags & SBLK_DB) {
1801 *idxp = KVBLK_IDXNUM;
1802 return 0;
1803 }
1804 uint8_t *k;
1805 uint32_t kl;
1806 int idx = 0, lb = 0, ub = sblk->pnum - 1;
1807 iwdb_flags_t dbflg = lx->db->dbflg;
1808
1809 if (sblk->pnum < 1) {
1810 *idxp = 0;
1811 return 0;
1812 }
1813 while (1) {
1814 idx = (ub + lb) / 2;
1815 iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1816 RCRET(rc);
1817 int cr = _cmp_keys(dbflg, k, kl, lx->key);
1818 if (!cr) {
1819 *found = true;
1820 break;
1821 } else if (cr < 0) {
1822 lb = idx + 1;
1823 if (lb > ub) {
1824 idx = lb;
1825 break;
1826 }
1827 } else {
1828 ub = idx - 1;
1829 if (lb > ub) {
1830 break;
1831 }
1832 }
1833 }
1834 *idxp = idx;
1835 return 0;
1836 }
1837
_sblk_insert_pi_mm(SBLK * sblk,uint8_t nidx,IWLCTX * lx,const uint8_t * mm,uint8_t * idxp)1838 static WUR iwrc _sblk_insert_pi_mm(
1839 SBLK *sblk, uint8_t nidx, IWLCTX *lx,
1840 const uint8_t *mm, uint8_t *idxp) {
1841 assert(sblk->kvblk);
1842
1843 uint8_t *k;
1844 uint32_t kl;
1845 int idx = 0, lb = 0, ub = sblk->pnum - 1, nels = sblk->pnum; // NOLINT
1846
1847 if (nels < 1) {
1848 sblk->pi[0] = nidx;
1849 ++sblk->pnum;
1850 *idxp = 0;
1851 return 0;
1852 }
1853 iwdb_flags_t dbflg = sblk->db->dbflg;
1854 while (1) {
1855 idx = (ub + lb) / 2;
1856 iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1857 RCRET(rc);
1858 int cr = _cmp_keys(dbflg, k, kl, lx->key);
1859 if (!cr) {
1860 break;
1861 } else if (cr < 0) {
1862 lb = idx + 1;
1863 if (lb > ub) {
1864 idx = lb;
1865 ++sblk->pnum;
1866 break;
1867 }
1868 } else {
1869 ub = idx - 1;
1870 if (lb > ub) {
1871 ++sblk->pnum;
1872 break;
1873 }
1874 }
1875 }
1876 if (nels - idx > 0) {
1877 memmove(sblk->pi + idx + 1, sblk->pi + idx, nels - idx);
1878 }
1879 sblk->pi[idx] = nidx;
1880 *idxp = idx;
1881 return 0;
1882 }
1883
_sblk_addkv2(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val,bool raw_key)1884 static WUR iwrc _sblk_addkv2(
1885 SBLK *sblk,
1886 int8_t idx,
1887 const IWKV_val *key,
1888 const IWKV_val *val,
1889 bool raw_key) {
1890 assert(sblk && key && key->size && key->data && val && idx >= 0 && sblk->kvblk);
1891
1892 uint8_t kvidx;
1893 IWDB db = sblk->db;
1894 KVBLK *kvblk = sblk->kvblk;
1895 if (sblk->pnum >= KVBLK_IDXNUM) {
1896 return _IWKV_RC_KVBLOCK_FULL;
1897 }
1898
1899 iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, raw_key);
1900 RCRET(rc);
1901 if (sblk->pnum - idx > 0) {
1902 memmove(sblk->pi + idx + 1, sblk->pi + idx, sblk->pnum - idx);
1903 }
1904 sblk->pi[idx] = kvidx;
1905 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1906 sblk->kvblkn = ADDR2BLK(kvblk->addr);
1907 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1908 sblk->flags |= SBLK_CACHE_UPDATE;
1909 }
1910 }
1911 ++sblk->pnum;
1912 sblk->flags |= SBLK_DURTY;
1913 if (idx == 0) { // the lowest key inserted
1914 size_t ksize = key->size;
1915 bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1916 if (compound) {
1917 ksize += IW_VNUMSIZE(key->compound);
1918 }
1919 sblk->lkl = MIN(db->iwkv->pklen, ksize);
1920 uint8_t *wp = sblk->lk;
1921 if (compound) {
1922 int len;
1923 IW_SETVNUMBUF64(len, wp, key->compound);
1924 wp += len;
1925 }
1926 memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1927 if (ksize <= db->iwkv->pklen) {
1928 sblk->flags |= SBLK_FULL_LKEY;
1929 } else {
1930 sblk->flags &= ~SBLK_FULL_LKEY;
1931 }
1932 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1933 sblk->flags |= SBLK_CACHE_UPDATE;
1934 }
1935 }
1936 if (!raw_key) {
1937 // Update active cursors inside this block
1938 pthread_spin_lock(&db->cursors_slk);
1939 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1940 if (cur->cn && (cur->cn->addr == sblk->addr)) {
1941 if (cur->cn != sblk) {
1942 memcpy(cur->cn, sblk, sizeof(*cur->cn));
1943 cur->cn->kvblk = 0;
1944 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1945 }
1946 if (cur->cnpos >= idx) {
1947 cur->cnpos++;
1948 }
1949 }
1950 }
1951 pthread_spin_unlock(&db->cursors_slk);
1952 }
1953 return 0;
1954 }
1955
_sblk_addkv(SBLK * sblk,IWLCTX * lx)1956 static WUR iwrc _sblk_addkv(SBLK *sblk, IWLCTX *lx) {
1957 const IWKV_val *key = lx->key;
1958 const IWKV_val *val = lx->val;
1959 assert(key && key->size && key->data && val && sblk->kvblk);
1960 if (!sblk) {
1961 iwlog_error2("sblk != 0");
1962 return IW_ERROR_ASSERTION;
1963 }
1964 uint8_t *mm, idx, kvidx;
1965 IWDB db = sblk->db;
1966 KVBLK *kvblk = sblk->kvblk;
1967 IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
1968 if (sblk->pnum >= KVBLK_IDXNUM) {
1969 return _IWKV_RC_KVBLOCK_FULL;
1970 }
1971 iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, false);
1972 RCRET(rc);
1973 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1974 RCRET(rc);
1975 rc = _sblk_insert_pi_mm(sblk, kvidx, lx, mm, &idx);
1976 RCRET(rc);
1977 fsm->release_mmap(fsm);
1978 if (idx == 0) { // the lowest key inserted
1979 size_t ksize = key->size;
1980 bool compound = (db->dbflg & IWDB_COMPOUND_KEYS);
1981 if (compound) {
1982 ksize += IW_VNUMSIZE(key->compound);
1983 }
1984 sblk->lkl = MIN(db->iwkv->pklen, ksize);
1985 uint8_t *wp = sblk->lk;
1986 if (compound) {
1987 int len;
1988 IW_SETVNUMBUF64(len, wp, key->compound);
1989 wp += len;
1990 }
1991 memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1992 if (ksize <= db->iwkv->pklen) {
1993 sblk->flags |= SBLK_FULL_LKEY;
1994 } else {
1995 sblk->flags &= ~SBLK_FULL_LKEY;
1996 }
1997 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1998 sblk->flags |= SBLK_CACHE_UPDATE;
1999 }
2000 }
2001 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2002 sblk->kvblkn = ADDR2BLK(kvblk->addr);
2003 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2004 sblk->flags |= SBLK_CACHE_UPDATE;
2005 }
2006 }
2007 sblk->flags |= SBLK_DURTY;
2008
2009 // Update active cursors inside this block
2010 pthread_spin_lock(&db->cursors_slk);
2011 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2012 if (cur->cn && (cur->cn->addr == sblk->addr)) {
2013 if (cur->cn != sblk) {
2014 memcpy(cur->cn, sblk, sizeof(*cur->cn));
2015 cur->cn->kvblk = 0;
2016 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2017 }
2018 if (cur->cnpos >= idx) {
2019 cur->cnpos++;
2020 }
2021 }
2022 }
2023 pthread_spin_unlock(&db->cursors_slk);
2024
2025 return 0;
2026 }
2027
_sblk_updatekv(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val)2028 static WUR iwrc _sblk_updatekv(
2029 SBLK *sblk, int8_t idx,
2030 const IWKV_val *key, const IWKV_val *val) {
2031 assert(sblk && sblk->kvblk && idx >= 0 && idx < sblk->pnum);
2032 IWDB db = sblk->db;
2033 KVBLK *kvblk = sblk->kvblk;
2034 uint8_t kvidx = sblk->pi[idx];
2035 iwrc intrc = 0;
2036 iwrc rc = _kvblk_updatev(kvblk, &kvidx, key, val);
2037 if (IWKV_IS_INTERNAL_RC(rc)) {
2038 intrc = rc;
2039 rc = 0;
2040 }
2041 RCRET(rc);
2042 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2043 sblk->kvblkn = ADDR2BLK(kvblk->addr);
2044 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2045 sblk->flags |= SBLK_CACHE_UPDATE;
2046 }
2047 }
2048 sblk->pi[idx] = kvidx;
2049 sblk->flags |= SBLK_DURTY;
2050 // Update active cursors inside this block
2051 pthread_spin_lock(&db->cursors_slk);
2052 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2053 if (cur->cn && (cur->cn != sblk) && (cur->cn->addr == sblk->addr)) {
2054 memcpy(cur->cn, sblk, sizeof(*cur->cn));
2055 cur->cn->kvblk = 0;
2056 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2057 }
2058 }
2059 pthread_spin_unlock(&db->cursors_slk);
2060 return intrc;
2061 }
2062
_sblk_rmkv(SBLK * sblk,uint8_t idx)2063 static WUR iwrc _sblk_rmkv(SBLK *sblk, uint8_t idx) {
2064 assert(sblk && sblk->kvblk);
2065 IWDB db = sblk->db;
2066 KVBLK *kvblk = sblk->kvblk;
2067 IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
2068 assert(kvblk && idx < sblk->pnum && sblk->pi[idx] < KVBLK_IDXNUM);
2069
2070 iwrc rc = _kvblk_rmkv(kvblk, sblk->pi[idx], 0);
2071 RCRET(rc);
2072
2073 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2074 sblk->kvblkn = ADDR2BLK(kvblk->addr);
2075 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2076 sblk->flags |= SBLK_CACHE_UPDATE;
2077 }
2078 }
2079 --sblk->pnum;
2080 sblk->flags |= SBLK_DURTY;
2081
2082 if ((idx < sblk->pnum) && (sblk->pnum > 0)) {
2083 memmove(sblk->pi + idx, sblk->pi + idx + 1, sblk->pnum - idx);
2084 }
2085
2086 if (idx == 0) { // Lowest key removed
2087 // Replace the lowest key with the next one or reset
2088 if (sblk->pnum > 0) {
2089 uint8_t *mm, *kbuf;
2090 uint32_t klen;
2091 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2092 RCRET(rc);
2093 rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &kbuf, &klen);
2094 if (rc) {
2095 fsm->release_mmap(fsm);
2096 return rc;
2097 }
2098 sblk->lkl = MIN(db->iwkv->pklen, klen);
2099 memcpy(sblk->lk, kbuf, sblk->lkl);
2100 fsm->release_mmap(fsm);
2101 if (klen <= db->iwkv->pklen) {
2102 sblk->flags |= SBLK_FULL_LKEY;
2103 } else {
2104 sblk->flags &= ~SBLK_FULL_LKEY;
2105 }
2106 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2107 sblk->flags |= SBLK_CACHE_UPDATE;
2108 }
2109 } else {
2110 sblk->lkl = 0;
2111 sblk->flags |= SBLK_CACHE_REMOVE;
2112 }
2113 }
2114
2115 // Update active cursors
2116 pthread_spin_lock(&db->cursors_slk);
2117 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2118 if (cur->cn && (cur->cn->addr == sblk->addr)) {
2119 cur->skip_next = 0;
2120 if (cur->cn != sblk) {
2121 memcpy(cur->cn, sblk, sizeof(*cur->cn));
2122 cur->cn->kvblk = 0;
2123 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2124 }
2125 if (cur->cnpos == idx) {
2126 if (idx && (idx == sblk->pnum)) {
2127 cur->cnpos--;
2128 cur->skip_next = -1;
2129 } else {
2130 cur->skip_next = 1;
2131 }
2132 } else if (cur->cnpos > idx) {
2133 cur->cnpos--;
2134 }
2135 }
2136 }
2137 pthread_spin_unlock(&db->cursors_slk);
2138 return 0;
2139 }
2140
2141 //-------------------------- IWLCTX
2142
_lx_sblk_cmp_key(IWLCTX * lx,SBLK * sblk,int * resp)2143 WUR iwrc _lx_sblk_cmp_key(IWLCTX *lx, SBLK *sblk, int *resp) {
2144 int res = 0;
2145 iwrc rc = 0;
2146 iwdb_flags_t dbflg = sblk->db->dbflg;
2147 const IWKV_val *key = lx->key;
2148 uint8_t lkl = sblk->lkl;
2149 size_t ksize = key->size;
2150
2151 if (IW_UNLIKELY((sblk->pnum < 1) || (sblk->flags & SBLK_DB))) {
2152 *resp = 0;
2153 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
2154 return IWKV_ERROR_CORRUPTED;
2155 }
2156 if (dbflg & IWDB_COMPOUND_KEYS) {
2157 ksize += IW_VNUMSIZE(key->compound);
2158 }
2159 if ( (sblk->flags & SBLK_FULL_LKEY)
2160 || (ksize < lkl)
2161 || (dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
2162 res = _cmp_keys(dbflg, sblk->lk, lkl, key);
2163 } else {
2164 res = _cmp_keys_prefix(dbflg, sblk->lk, lkl, key);
2165 if (res == 0) {
2166 uint32_t kl;
2167 uint8_t *mm, *k;
2168 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2169 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2170 if (rc) {
2171 *resp = 0;
2172 return rc;
2173 }
2174 if (!sblk->kvblk) {
2175 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2176 if (rc) {
2177 *resp = 0;
2178 fsm->release_mmap(fsm);
2179 return rc;
2180 }
2181 }
2182 rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[0], mm, &k, &kl);
2183 RCRET(rc);
2184 res = _cmp_keys(dbflg, k, kl, key);
2185 fsm->release_mmap(fsm);
2186 }
2187 }
2188 *resp = res;
2189 return rc;
2190 }
2191
_lx_roll_forward(IWLCTX * lx,uint8_t lvl)2192 static WUR iwrc _lx_roll_forward(IWLCTX *lx, uint8_t lvl) {
2193 iwrc rc = 0;
2194 int cret;
2195 SBLK *sblk;
2196 blkn_t blkn;
2197 assert(lx->lower);
2198
2199 while ((blkn = lx->lower->n[lvl])) {
2200 off_t blkaddr = BLK2ADDR(blkn);
2201 if ((lx->nlvl > -1) && (lvl < lx->nlvl)) {
2202 uint8_t ulvl = lvl + 1;
2203 if (lx->pupper[ulvl] && (lx->pupper[ulvl]->addr == blkaddr)) {
2204 sblk = lx->pupper[ulvl];
2205 } else if (lx->plower[ulvl] && (lx->plower[ulvl]->addr == blkaddr)) {
2206 sblk = lx->plower[ulvl];
2207 } else {
2208 rc = _sblk_at(lx, blkaddr, 0, &sblk);
2209 }
2210 } else {
2211 rc = _sblk_at(lx, blkaddr, 0, &sblk);
2212 }
2213 RCRET(rc);
2214 #ifndef NDEBUG
2215 ++lx->num_cmps;
2216 #endif
2217 rc = _lx_sblk_cmp_key(lx, sblk, &cret);
2218 RCRET(rc);
2219 if ((cret > 0) || (lx->upper_addr == sblk->addr)) { // upper > key
2220 lx->upper = sblk;
2221 break;
2222 } else {
2223 lx->lower = sblk;
2224 }
2225 }
2226 return 0;
2227 }
2228
_lx_find_bounds(IWLCTX * lx)2229 static WUR iwrc _lx_find_bounds(IWLCTX *lx) {
2230 iwrc rc = 0;
2231 int lvl;
2232 blkn_t blkn;
2233 SBLK *dblk = &lx->dblk;
2234 if (!dblk->addr) {
2235 SBLK *s;
2236 rc = _sblk_at(lx, lx->db->addr, 0, &s);
2237 RCRET(rc);
2238 memcpy(dblk, s, sizeof(*dblk));
2239 }
2240 if (!lx->lower) {
2241 rc = _dbcache_get(lx);
2242 RCRET(rc);
2243 }
2244 if (lx->nlvl > dblk->lvl) {
2245 // New level in DB
2246 dblk->lvl = (uint8_t) lx->nlvl;
2247 dblk->flags |= SBLK_DURTY;
2248 }
2249 lvl = lx->lower->lvl;
2250 while (lvl > -1) {
2251 rc = _lx_roll_forward(lx, (uint8_t) lvl);
2252 RCRET(rc);
2253 if (lx->upper) {
2254 blkn = ADDR2BLK(lx->upper->addr);
2255 } else {
2256 blkn = 0;
2257 }
2258 do {
2259 if (lx->nlvl >= lvl) {
2260 lx->plower[lvl] = lx->lower;
2261 lx->pupper[lvl] = lx->upper;
2262 }
2263 } while (lvl-- && lx->lower->n[lvl] == blkn);
2264 }
2265 return 0;
2266 }
2267
_lx_release_mm(IWLCTX * lx,uint8_t * mm)2268 static iwrc _lx_release_mm(IWLCTX *lx, uint8_t *mm) {
2269 iwrc rc = 0;
2270 if (lx->nlvl > -1) {
2271 SBLK *lsb = 0, *usb = 0;
2272 if (lx->nb) {
2273 rc = _sblk_sync_mm(lx, lx->nb, mm);
2274 RCGO(rc, finish);
2275 }
2276 if (lx->pupper[0] == lx->upper) {
2277 lx->upper = 0;
2278 }
2279 if (lx->plower[0] == lx->lower) {
2280 lx->lower = 0;
2281 }
2282 for (int i = 0; i <= lx->nlvl; ++i) {
2283 if (lx->pupper[i]) {
2284 if (lx->pupper[i] != usb) {
2285 usb = lx->pupper[i];
2286 rc = _sblk_sync_and_release_mm(lx, &lx->pupper[i], mm);
2287 RCGO(rc, finish);
2288 }
2289 lx->pupper[i] = 0;
2290 }
2291 if (lx->plower[i]) {
2292 if (lx->plower[i] != lsb) {
2293 lsb = lx->plower[i];
2294 rc = _sblk_sync_and_release_mm(lx, &lx->plower[i], mm);
2295 RCGO(rc, finish);
2296 }
2297 lx->plower[i] = 0;
2298 }
2299 }
2300 }
2301 if (lx->upper) {
2302 rc = _sblk_sync_and_release_mm(lx, &lx->upper, mm);
2303 RCGO(rc, finish);
2304 }
2305 if (lx->lower) {
2306 rc = _sblk_sync_and_release_mm(lx, &lx->lower, mm);
2307 RCGO(rc, finish);
2308 }
2309 if (lx->dblk.flags & SBLK_DURTY) {
2310 rc = _sblk_sync_mm(lx, &lx->dblk, mm);
2311 RCGO(rc, finish);
2312 }
2313 if (lx->nb) {
2314 if (lx->nb->flags & SBLK_CACHE_PUT) {
2315 rc = _dbcache_put_lw(lx, lx->nb);
2316 }
2317 _sblk_release(lx, &lx->nb);
2318 RCGO(rc, finish);
2319 }
2320 if (lx->cache_reload) {
2321 rc = _dbcache_fill_lw(lx);
2322 }
2323
2324 finish:
2325 lx->destroy_addr = 0;
2326 return rc;
2327 }
2328
_lx_release(IWLCTX * lx)2329 iwrc _lx_release(IWLCTX *lx) {
2330 uint8_t *mm;
2331 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2332 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2333 RCRET(rc);
2334 rc = _lx_release_mm(lx, mm);
2335 IWRC(fsm->release_mmap(fsm), rc);
2336 return rc;
2337 }
2338
_lx_split_addkv(IWLCTX * lx,int idx,SBLK * sblk)2339 static iwrc _lx_split_addkv(IWLCTX *lx, int idx, SBLK *sblk) {
2340 iwrc rc;
2341 SBLK *nb;
2342 blkn_t nblk;
2343 IWDB db = sblk->db;
2344 bool uside = (idx == sblk->pnum);
2345 register const int8_t pivot = (KVBLK_IDXNUM / 2) + 1; // 32
2346
2347 if (uside) { // Upper side
2348 rc = _sblk_create(lx, (uint8_t) lx->nlvl, 0, sblk, lx->upper, &nb);
2349 RCRET(rc);
2350 rc = _sblk_addkv(nb, lx);
2351 RCGO(rc, finish);
2352 } else { // New key is somewhere in a middle of sblk->kvblk
2353 assert(sblk->kvblk);
2354 // We are in the middle
2355 // Do the partial split
2356 // Move kv pairs into new `nb`
2357 // Compute space required for the new sblk which stores kv pairs after pivot `idx`
2358 size_t sz = 0;
2359 for (int8_t i = pivot; i < sblk->pnum; ++i) {
2360 sz += sblk->kvblk->pidx[sblk->pi[i]].len;
2361 }
2362 if (idx > pivot) {
2363 sz += IW_VNUMSIZE(lx->key->size) + lx->key->size + lx->val->size;
2364 }
2365 sz += KVBLK_MAX_NKV_SZ;
2366 uint8_t kvbpow = (uint8_t) iwlog2_64(sz);
2367 while ((1ULL << kvbpow) < sz) kvbpow++;
2368
2369 rc = _sblk_create(lx, (uint8_t) lx->nlvl, kvbpow, sblk, lx->upper, &nb);
2370 RCRET(rc);
2371
2372 IWKV_val key, val;
2373 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2374 for (int8_t i = pivot, end = sblk->pnum; i < end; ++i) {
2375 uint8_t *mm;
2376 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2377 RCBREAK(rc);
2378
2379 rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[i], &key, &val);
2380 assert(key.size);
2381 fsm->release_mmap(fsm);
2382 RCBREAK(rc);
2383
2384 rc = _sblk_addkv2(nb, i - pivot, &key, &val, true);
2385 _kv_dispose(&key, &val);
2386
2387 RCBREAK(rc);
2388 sblk->kvblk->pidx[sblk->pi[i]].len = 0;
2389 sblk->kvblk->pidx[sblk->pi[i]].off = 0;
2390 --sblk->pnum;
2391 }
2392 sblk->kvblk->flags |= KVBLK_DURTY;
2393 sblk->kvblk->zidx = sblk->pi[pivot];
2394 sblk->kvblk->maxoff = 0;
2395 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
2396 if (sblk->kvblk->pidx[i].off > sblk->kvblk->maxoff) {
2397 sblk->kvblk->maxoff = sblk->kvblk->pidx[i].off;
2398 }
2399 }
2400 }
2401
2402 // Fix levels:
2403 // [ lb -> sblk -> ub ]
2404 // [ lb -> sblk -> nb -> ub ]
2405 nblk = ADDR2BLK(nb->addr);
2406 lx->pupper[0]->p0 = nblk;
2407 lx->pupper[0]->flags |= SBLK_DURTY;
2408 nb->p0 = ADDR2BLK(lx->plower[0]->addr);
2409 for (int i = 0; i <= nb->lvl; ++i) {
2410 lx->plower[i]->n[i] = nblk;
2411 lx->plower[i]->flags |= SBLK_DURTY;
2412 nb->n[i] = ADDR2BLK(lx->pupper[i]->addr);
2413 }
2414
2415 pthread_spin_lock(&db->cursors_slk);
2416 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2417 if (cur->cn && (cur->cn->addr == sblk->addr)) {
2418 if (cur->cnpos >= pivot) {
2419 memcpy(cur->cn, nb, sizeof(*cur->cn));
2420 cur->cn->kvblk = 0;
2421 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2422 cur->cnpos -= pivot;
2423 }
2424 }
2425 }
2426 pthread_spin_unlock(&db->cursors_slk);
2427
2428 if (!uside) {
2429 if (idx > pivot) {
2430 rc = _sblk_addkv(nb, lx);
2431 } else {
2432 rc = _sblk_addkv(sblk, lx);
2433 }
2434 RCGO(rc, finish);
2435 }
2436
2437 finish:
2438 if (rc) {
2439 lx->nb = 0;
2440 IWRC(_sblk_destroy(lx, &nb), rc);
2441 } else {
2442 lx->nb = nb;
2443 }
2444 return rc;
2445 }
2446
_lx_init_chute(IWLCTX * lx)2447 IW_INLINE iwrc _lx_init_chute(IWLCTX *lx) {
2448 assert(lx->nlvl >= 0);
2449 iwrc rc = 0;
2450 if (!lx->pupper[lx->nlvl]) { // fix zero upper by dbtail
2451 SBLK *dbtail;
2452 rc = _sblk_at(lx, 0, 0, &dbtail);
2453 RCRET(rc);
2454 for (int8_t i = lx->nlvl; i >= 0 && !lx->pupper[i]; --i) {
2455 lx->pupper[i] = dbtail;
2456 }
2457 }
2458 return 0;
2459 }
2460
_lx_addkv(IWLCTX * lx)2461 static WUR iwrc _lx_addkv(IWLCTX *lx) {
2462 iwrc rc;
2463 bool found, uadd;
2464 uint8_t *mm = 0, idx;
2465 SBLK *sblk = lx->lower;
2466 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2467 if (lx->nlvl > -1) {
2468 rc = _lx_init_chute(lx);
2469 RCRET(rc);
2470 }
2471 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2472 RCRET(rc);
2473 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2474 if (rc) {
2475 fsm->release_mmap(fsm);
2476 return rc;
2477 }
2478 rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2479 RCRET(rc);
2480 if (found && (lx->opflags & IWKV_NO_OVERWRITE)) {
2481 fsm->release_mmap(fsm);
2482 return IWKV_ERROR_KEY_EXISTS;
2483 }
2484 uadd = ( !found
2485 && sblk->pnum > KVBLK_IDXNUM - 1 && idx > KVBLK_IDXNUM - 1
2486 && lx->upper && lx->upper->pnum < KVBLK_IDXNUM);
2487 if (uadd) {
2488 rc = _sblk_loadkvblk_mm(lx, lx->upper, mm);
2489 if (rc) {
2490 fsm->release_mmap(fsm);
2491 return rc;
2492 }
2493 }
2494 if (found) {
2495 IWKV_val sval, *val = lx->val;
2496 if (lx->opflags & IWKV_VAL_INCREMENT) {
2497 int64_t ival;
2498 uint8_t *rp;
2499 uint32_t len;
2500 if (val->size == 4) {
2501 int32_t lv;
2502 memcpy(&lv, val->data, val->size);
2503 lv = IW_ITOHL(lv);
2504 ival = lv;
2505 } else if (val->size == 8) {
2506 memcpy(&ival, val->data, val->size);
2507 ival = IW_ITOHLL(ival);
2508 } else {
2509 rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2510 fsm->release_mmap(fsm);
2511 return rc;
2512 }
2513 _kvblk_value_peek(sblk->kvblk, sblk->pi[idx], mm, &rp, &len);
2514 sval.data = rp;
2515 sval.size = len;
2516 if (sval.size == 4) {
2517 uint32_t lv;
2518 memcpy(&lv, sval.data, 4);
2519 lv = IW_ITOHL(lv);
2520 lv += ival;
2521 _num2lebuf(lx->incbuf, &lv, 4);
2522 } else if (sval.size == 8) {
2523 uint64_t llv;
2524 memcpy(&llv, sval.data, 8);
2525 llv = IW_ITOHLL(llv);
2526 llv += ival;
2527 _num2lebuf(lx->incbuf, &llv, 8);
2528 } else {
2529 rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2530 fsm->release_mmap(fsm);
2531 return rc;
2532 }
2533 sval.data = lx->incbuf;
2534 val = &sval;
2535 }
2536 if (lx->ph) {
2537 IWKV_val oldval;
2538 rc = _kvblk_value_get(sblk->kvblk, mm, sblk->pi[idx], &oldval);
2539 fsm->release_mmap(fsm);
2540 if (!rc) {
2541 // note: oldval should be disposed by ph
2542 rc = lx->ph(lx->key, lx->val, &oldval, lx->phop);
2543 }
2544 RCRET(rc);
2545 } else {
2546 fsm->release_mmap(fsm);
2547 }
2548 return _sblk_updatekv(sblk, idx, lx->key, val);
2549 } else {
2550 fsm->release_mmap(fsm);
2551 if (sblk->pnum > KVBLK_IDXNUM - 1) {
2552 if (uadd) {
2553 if (lx->ph) {
2554 rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2555 RCRET(rc);
2556 }
2557 return _sblk_addkv(lx->upper, lx);
2558 }
2559 if (lx->nlvl < 0) {
2560 return _IWKV_RC_REQUIRE_NLEVEL;
2561 }
2562 if (lx->ph) {
2563 rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2564 RCRET(rc);
2565 }
2566 return _lx_split_addkv(lx, idx, sblk);
2567 } else {
2568 if (lx->ph) {
2569 rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2570 RCRET(rc);
2571 }
2572 return _sblk_addkv2(sblk, idx, lx->key, lx->val, false);
2573 }
2574 }
2575 }
2576
_lx_put_lw(IWLCTX * lx)2577 IW_INLINE WUR iwrc _lx_put_lw(IWLCTX *lx) {
2578 iwrc rc;
2579 start:
2580 rc = _lx_find_bounds(lx);
2581 if (rc) {
2582 _lx_release_mm(lx, 0);
2583 return rc;
2584 }
2585 rc = _lx_addkv(lx);
2586 if (rc == _IWKV_RC_REQUIRE_NLEVEL) {
2587 SBLK *lower = lx->lower;
2588 lx->lower = 0;
2589 _lx_release_mm(lx, 0);
2590 lx->nlvl = _sblk_genlevel(lx->db);
2591 if (lower->lvl >= lx->nlvl) {
2592 lx->lower = lower;
2593 }
2594 goto start;
2595 }
2596 if (rc == _IWKV_RC_KVBLOCK_FULL) {
2597 rc = IWKV_ERROR_CORRUPTED;
2598 iwlog_ecode_error3(rc);
2599 }
2600 IWRC(_lx_release(lx), rc);
2601 return rc;
2602 }
2603
_lx_get_lr(IWLCTX * lx)2604 IW_INLINE WUR iwrc _lx_get_lr(IWLCTX *lx) {
2605 iwrc rc = _lx_find_bounds(lx);
2606 RCRET(rc);
2607 bool found;
2608 uint8_t *mm, idx;
2609 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2610 lx->val->size = 0;
2611 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2612 RCRET(rc);
2613 rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
2614 RCGO(rc, finish);
2615 rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
2616 RCGO(rc, finish);
2617 if (found) {
2618 rc = _kvblk_value_get(lx->lower->kvblk, mm, lx->lower->pi[idx], lx->val);
2619 } else {
2620 rc = IWKV_ERROR_NOTFOUND;
2621 }
2622
2623 finish:
2624 IWRC(fsm->release_mmap(fsm), rc);
2625 _lx_release_mm(lx, 0);
2626 return rc;
2627 }
2628
_lx_del_sblk_lw(IWLCTX * lx,SBLK * sblk,uint8_t idx)2629 static WUR iwrc _lx_del_sblk_lw(IWLCTX *lx, SBLK *sblk, uint8_t idx) {
2630 assert(sblk->pnum == 1 && sblk->kvblk);
2631
2632 iwrc rc;
2633 IWDB db = lx->db;
2634 KVBLK *kvblk = sblk->kvblk;
2635 blkn_t sblk_blkn = ADDR2BLK(sblk->addr);
2636
2637 _lx_release_mm(lx, 0);
2638 lx->nlvl = sblk->lvl;
2639 lx->upper_addr = sblk->addr;
2640
2641 rc = _lx_find_bounds(lx);
2642 RCRET(rc);
2643 assert(lx->upper->pnum == 1 && lx->upper->addr == lx->upper_addr);
2644
2645 lx->upper->kvblk = kvblk;
2646 rc = _sblk_rmkv(lx->upper, idx);
2647 RCGO(rc, finish);
2648
2649 for (int i = 0; i <= lx->nlvl; ++i) {
2650 lx->plower[i]->n[i] = lx->upper->n[i];
2651 lx->plower[i]->flags |= SBLK_DURTY;
2652 if (lx->plower[i]->flags & SBLK_DB) {
2653 if (!lx->plower[i]->n[i]) {
2654 --lx->plower[i]->lvl;
2655 }
2656 }
2657 if (lx->pupper[i] == lx->upper) {
2658 // Do not touch `lx->upper` in next `_lx_release_mm()` call
2659 lx->pupper[i] = 0;
2660 }
2661 }
2662
2663 SBLK rb; // Block to remove
2664 memcpy(&rb, lx->upper, sizeof(rb));
2665
2666 SBLK *nb, // Block after lx->upper
2667 *rbp = &rb;
2668
2669 assert(!lx->nb);
2670 rc = _sblk_at(lx, BLK2ADDR(rb.n[0]), 0, &nb);
2671 RCGO(rc, finish);
2672 lx->nb = nb;
2673 lx->nb->p0 = rb.p0;
2674 lx->nb->flags |= SBLK_DURTY;
2675
2676 // Update cursors within sblk removed
2677 pthread_spin_lock(&db->cursors_slk);
2678 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2679 if (cur->cn) {
2680 if (cur->cn->addr == sblk->addr) {
2681 if (nb->flags & SBLK_DB) {
2682 if (!(lx->plower[0]->flags & SBLK_DB)) {
2683 memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
2684 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2685 cur->cn->kvblk = 0;
2686 cur->skip_next = -1;
2687 cur->cnpos = lx->plower[0]->pnum;
2688 if (cur->cnpos) {
2689 cur->cnpos--;
2690 }
2691 } else {
2692 cur->cn = 0;
2693 cur->cnpos = 0;
2694 cur->skip_next = 0;
2695 }
2696 } else {
2697 memcpy(cur->cn, nb, sizeof(*nb));
2698 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2699 cur->cn->kvblk = 0;
2700 cur->cnpos = 0;
2701 cur->skip_next = 1;
2702 }
2703 } else if (cur->cn->n[0] == sblk_blkn) {
2704 memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
2705 cur->cn->kvblk = 0;
2706 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2707 } else if (cur->cn->p0 == sblk_blkn) {
2708 memcpy(cur->cn, nb, sizeof(*nb));
2709 cur->cn->kvblk = 0;
2710 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2711 }
2712 }
2713 }
2714 pthread_spin_unlock(&db->cursors_slk);
2715
2716 rc = _sblk_destroy(lx, &rbp);
2717
2718 finish:
2719 return rc;
2720 }
2721
_lx_del_lw(IWLCTX * lx)2722 static WUR iwrc _lx_del_lw(IWLCTX *lx) {
2723 iwrc rc;
2724 bool found;
2725 uint8_t *mm = 0, idx;
2726 IWDB db = lx->db;
2727 IWFS_FSM *fsm = &db->iwkv->fsm;
2728 SBLK *sblk;
2729
2730 rc = _lx_find_bounds(lx);
2731 RCRET(rc);
2732
2733 sblk = lx->lower;
2734 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2735 RCGO(rc, finish);
2736 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2737 RCGO(rc, finish);
2738 rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2739 RCGO(rc, finish);
2740 if (!found) {
2741 rc = IWKV_ERROR_NOTFOUND;
2742 goto finish;
2743 }
2744 fsm->release_mmap(fsm);
2745 mm = 0;
2746
2747 if (sblk->pnum == 1) { // last kv in block
2748 rc = _lx_del_sblk_lw(lx, sblk, idx);
2749 } else {
2750 rc = _sblk_rmkv(sblk, idx);
2751 }
2752
2753 finish:
2754 if (mm) {
2755 fsm->release_mmap(fsm);
2756 }
2757 if (rc) {
2758 _lx_release_mm(lx, 0);
2759 } else {
2760 rc = _lx_release(lx);
2761 }
2762 return rc;
2763 }
2764
2765 //-------------------------- CACHE
2766
// Frees the node array of the database cache and zeroes the cache structure.
static void _dbcache_destroy_lw(IWDB db) {
  DBCACHE *cache = &db->cache;
  free(cache->nodes);
  memset(cache, 0, sizeof(*cache));
}
2771
_dbcache_lvl(uint8_t lvl)2772 IW_INLINE uint8_t _dbcache_lvl(uint8_t lvl) {
2773 uint8_t clvl = (lvl >= DBCACHE_LEVELS) ? (lvl - DBCACHE_LEVELS + 1) : DBCACHE_MIN_LEVEL;
2774 if (clvl < DBCACHE_MIN_LEVEL) {
2775 clvl = DBCACHE_MIN_LEVEL;
2776 }
2777 return clvl;
2778 }
2779
_dbcache_cmp_nodes(const void * v1,const void * v2,void * op,int * res)2780 static WUR iwrc _dbcache_cmp_nodes(const void *v1, const void *v2, void *op, int *res) {
2781 iwrc rc = 0;
2782 uint8_t *mm = 0;
2783 IWLCTX *lx = op;
2784 IWDB db = lx->db;
2785 IWFS_FSM *fsm = &db->iwkv->fsm;
2786 iwdb_flags_t dbflg = db->dbflg;
2787 int rv = 0, step;
2788
2789 const DBCNODE *cn1 = v1, *cn2 = v2;
2790 uint8_t *k1 = (uint8_t*) cn1->lk, *k2 = (uint8_t*) cn2->lk;
2791 uint32_t kl1 = cn1->lkl, kl2 = cn2->lkl;
2792 KVBLK *kb;
2793
2794 if (!kl1 && cn1->fullkey) {
2795 kl1 = cn1->sblkn;
2796 }
2797 if (!kl2 && cn2->fullkey) {
2798 kl2 = cn2->sblkn;
2799 }
2800
2801 IWKV_val key2 = {
2802 .size = kl2,
2803 .data = k2
2804 };
2805
2806 if (dbflg & IWDB_COMPOUND_KEYS) {
2807 IW_READVNUMBUF64(k2, key2.compound, step);
2808 key2.size -= step;
2809 key2.data = (char*) key2.data + step;
2810 }
2811
2812 rv = _cmp_keys_prefix(dbflg, k1, kl1, &key2);
2813
2814 if ((rv == 0) && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
2815
2816 if (!cn1->fullkey || !cn2->fullkey) {
2817 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2818 RCRET(rc);
2819 if (!cn1->fullkey) {
2820 rc = _kvblk_at_mm(lx, BLK2ADDR(cn1->kblkn), mm, 0, &kb);
2821 RCGO(rc, finish);
2822 rc = _kvblk_key_peek(kb, cn1->k0idx, mm, &k1, &kl1);
2823 RCGO(rc, finish);
2824 }
2825 if (!cn2->fullkey) {
2826 rc = _kvblk_at_mm(lx, BLK2ADDR(cn2->kblkn), mm, 0, &kb);
2827 RCGO(rc, finish);
2828 rc = _kvblk_key_peek(kb, cn2->k0idx, mm, &k2, &kl2);
2829 RCGO(rc, finish);
2830 key2.size = kl2;
2831 key2.data = k2;
2832 if (dbflg & IWDB_COMPOUND_KEYS) {
2833 IW_READVNUMBUF64(k2, key2.compound, step);
2834 key2.size -= step;
2835 key2.data = (char*) key2.data + step;
2836 }
2837 }
2838
2839 rv = _cmp_keys(dbflg, k1, kl1, &key2);
2840 } else if (dbflg & IWDB_COMPOUND_KEYS) {
2841
2842 int64_t c1, c2 = key2.compound;
2843 IW_READVNUMBUF64(k1, c1, step);
2844 kl1 -= step;
2845 if (key2.size == kl1) {
2846 rv = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
2847 } else {
2848 rv = (int) key2.size - (int) kl1;
2849 }
2850 } else {
2851 rv = (int) kl2 - (int) kl1;
2852 }
2853 }
2854
2855 finish:
2856 *res = rv;
2857 if (mm) {
2858 fsm->release_mmap(fsm);
2859 }
2860 return rc;
2861 }
2862
_dbcache_fill_lw(IWLCTX * lx)2863 static WUR iwrc _dbcache_fill_lw(IWLCTX *lx) {
2864 iwrc rc = 0;
2865 IWDB db = lx->db;
2866 lx->cache_reload = 0;
2867 if (!lx->dblk.addr) {
2868 SBLK *s;
2869 rc = _sblk_at(lx, lx->db->addr, 0, &s);
2870 RCRET(rc);
2871 memcpy(&lx->dblk, s, sizeof(lx->dblk));
2872 }
2873 SBLK *sdb = &lx->dblk;
2874 SBLK *sblk = sdb;
2875 DBCACHE *c = &db->cache;
2876 assert(lx->db->addr == sdb->addr);
2877 c->num = 0;
2878 if (c->nodes) {
2879 free(c->nodes);
2880 c->nodes = 0;
2881 }
2882 if (sdb->lvl < DBCACHE_MIN_LEVEL) {
2883 c->open = true;
2884 return 0;
2885 }
2886 c->lvl = _dbcache_lvl(sdb->lvl);
2887 c->nsize = (lx->db->dbflg & IWDB_VNUM64_KEYS) ? DBCNODE_VNUM_SZ : DBCNODE_STR_SZ;
2888 c->asize = c->nsize * ((1U << DBCACHE_LEVELS) + DBCACHE_ALLOC_STEP);
2889
2890 size_t nsize = c->nsize;
2891 c->nodes = malloc(c->asize);
2892 if (!c->nodes) {
2893 c->open = false;
2894 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
2895 }
2896 blkn_t n;
2897 uint8_t *wp;
2898 size_t num = 0;
2899 while ((n = sblk->n[c->lvl])) {
2900 rc = _sblk_at(lx, BLK2ADDR(n), 0, &sblk);
2901 RCRET(rc);
2902 if (offsetof(DBCNODE, lk) + sblk->lkl > nsize) {
2903 free(c->nodes);
2904 c->nodes = 0;
2905 rc = IWKV_ERROR_CORRUPTED;
2906 iwlog_ecode_error3(rc);
2907 return rc;
2908 }
2909 DBCNODE cn = {
2910 .lkl = sblk->lkl,
2911 .fullkey = (sblk->flags & SBLK_FULL_LKEY),
2912 .k0idx = sblk->pi[0],
2913 .sblkn = ADDR2BLK(sblk->addr),
2914 .kblkn = sblk->kvblkn
2915 };
2916 if (c->asize < nsize * (num + 1)) {
2917 c->asize += (nsize * DBCACHE_ALLOC_STEP);
2918 wp = (uint8_t*) c->nodes;
2919 DBCNODE *nn = realloc(c->nodes, c->asize);
2920 if (!nn) {
2921 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
2922 free(wp);
2923 return rc;
2924 }
2925 c->nodes = nn;
2926 }
2927 wp = (uint8_t*) c->nodes + nsize * num;
2928 memcpy(wp, &cn, offsetof(DBCNODE, lk));
2929 wp += offsetof(DBCNODE, lk);
2930 memcpy(wp, sblk->lk, sblk->lkl);
2931 ++num;
2932 }
2933 c->num = num;
2934 c->open = true;
2935 return 0;
2936 }
2937
_dbcache_get(IWLCTX * lx)2938 static WUR iwrc _dbcache_get(IWLCTX *lx) {
2939 iwrc rc = 0;
2940 off_t idx;
2941 bool found;
2942 DBCNODE *n;
2943 alignas(DBCNODE) uint8_t dbcbuf[255];
2944 IWDB db = lx->db;
2945 DBCACHE *cache = &db->cache;
2946 const IWKV_val *key = lx->key;
2947 if ((lx->nlvl > -1) || (cache->num < 1)) {
2948 lx->lower = &lx->dblk;
2949 return 0;
2950 }
2951 assert(cache->nodes);
2952 size_t lxksiz = key->size;
2953 if (db->dbflg & IWDB_COMPOUND_KEYS) {
2954 lxksiz += IW_VNUMSIZE(key->compound);
2955 }
2956
2957 if (sizeof(DBCNODE) + lxksiz <= sizeof(dbcbuf)) {
2958 n = (DBCNODE*) dbcbuf;
2959 } else {
2960 n = malloc(sizeof(DBCNODE) + lxksiz);
2961 if (!n) {
2962 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
2963 }
2964 }
2965 n->sblkn = (uint32_t) lxksiz; // `sblkn` used to store key size (to keep DBCNODE compact)
2966 n->kblkn = 0;
2967 n->fullkey = 1;
2968 n->lkl = 0;
2969 n->k0idx = 0;
2970
2971 uint8_t *wp = (uint8_t*) n + offsetof(DBCNODE, lk);
2972 if (db->dbflg & IWDB_COMPOUND_KEYS) {
2973 size_t step;
2974 char vbuf[IW_VNUMBUFSZ];
2975 IW_SETVNUMBUF(step, vbuf, key->compound);
2976 memcpy(wp, vbuf, step);
2977 wp += step;
2978 }
2979 memcpy(wp, key->data, key->size);
2980
2981 idx = iwarr_sorted_find2(cache->nodes, cache->num, cache->nsize, n, lx, &found, _dbcache_cmp_nodes);
2982 if (idx > 0) {
2983 DBCNODE *fn = (DBCNODE*) ((uint8_t*) cache->nodes + (idx - 1) * cache->nsize);
2984 assert(fn && idx - 1 < cache->num);
2985 rc = _sblk_at(lx, BLK2ADDR(fn->sblkn), 0, &lx->lower);
2986 } else {
2987 lx->lower = &lx->dblk;
2988 }
2989 if ((uint8_t*) n != dbcbuf) {
2990 free(n);
2991 }
2992 return rc;
2993 }
2994
_dbcache_put_lw(IWLCTX * lx,SBLK * sblk)2995 static WUR iwrc _dbcache_put_lw(IWLCTX *lx, SBLK *sblk) {
2996 off_t idx;
2997 bool found;
2998 IWDB db = lx->db;
2999 alignas(DBCNODE) uint8_t dbcbuf[255];
3000 DBCNODE *n = (DBCNODE*) dbcbuf;
3001 DBCACHE *cache = &db->cache;
3002 size_t nsize = cache->nsize;
3003
3004 sblk->flags &= ~SBLK_CACHE_PUT;
3005 assert(sizeof(*cache) + sblk->lkl <= sizeof(dbcbuf));
3006 if ((sblk->pnum < 1) || (sblk->lvl < cache->lvl)) {
3007 return 0;
3008 }
3009 if ((sblk->lvl >= cache->lvl + DBCACHE_LEVELS) || !cache->nodes) { // need to reload full cache
3010 lx->cache_reload = 1;
3011 return 0;
3012 }
3013 if (!sblk->kvblk) {
3014 assert(sblk->kvblk);
3015 return IW_ERROR_INVALID_STATE;
3016 }
3017 n->lkl = sblk->lkl;
3018 n->fullkey = (sblk->flags & SBLK_FULL_LKEY);
3019 n->k0idx = sblk->pi[0];
3020 n->sblkn = ADDR2BLK(sblk->addr);
3021 n->kblkn = sblk->kvblkn;
3022 memcpy((uint8_t*) n + offsetof(DBCNODE, lk), sblk->lk, sblk->lkl);
3023
3024 idx = iwarr_sorted_find2(cache->nodes, cache->num, nsize, n, lx, &found, _dbcache_cmp_nodes);
3025 assert(!found);
3026
3027 if (cache->asize <= cache->num * nsize) {
3028 size_t nsz = cache->asize + (nsize * DBCACHE_ALLOC_STEP);
3029 DBCNODE *nodes = realloc(cache->nodes, nsz);
3030 if (!nodes) {
3031 iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3032 free(cache->nodes);
3033 cache->nodes = 0;
3034 return rc;
3035 }
3036 cache->asize = nsz;
3037 cache->nodes = nodes;
3038 }
3039
3040 uint8_t *cptr = (uint8_t*) cache->nodes;
3041 if (cache->num != idx) {
3042 memmove(cptr + (idx + 1) * nsize, cptr + idx * nsize, (cache->num - idx) * nsize);
3043 }
3044 memcpy(cptr + idx * nsize, n, nsize);
3045 ++cache->num;
3046 return 0;
3047 }
3048
// Removes the cache node that refers to `sblk`, if there is one.
//
// The `SBLK_CACHE_REMOVE` flag of the block is always cleared. Blocks below
// the cache level are ignored. When the cache level is above
// `DBCACHE_MIN_LEVEL` and the database level is now lower than the block
// level, a full cache reload is requested instead of a single removal.
static void _dbcache_remove_lw(IWLCTX *lx, SBLK *sblk) {
  DBCACHE *cache = &lx->db->cache;

  sblk->flags &= ~SBLK_CACHE_REMOVE;
  if ((sblk->lvl < cache->lvl) || (cache->num < 1)) {
    return;
  }
  if ((cache->lvl > DBCACHE_MIN_LEVEL) && (lx->dblk.lvl < sblk->lvl)) {
    // Database level reduced so we need to shift cache down
    lx->cache_reload = 1;
    return;
  }

  const blkn_t target = ADDR2BLK(sblk->addr);
  const size_t total = cache->num;
  const size_t nsize = cache->nsize;
  uint8_t *base = (uint8_t*) cache->nodes;

  // Linear search for the node of this block
  size_t pos = 0;
  while (pos < total && ((DBCNODE*) (base + pos * nsize))->sblkn != target) {
    ++pos;
  }
  if (pos == total) {
    return; // block is not cached
  }
  if (pos < total - 1) {
    // Close the gap left by the removed node
    memmove(base + pos * nsize, base + (pos + 1) * nsize, (total - pos - 1) * nsize);
  }
  --cache->num;
}
3076
// Refreshes the cache node that refers to `sblk` with the block's current
// key/value block number, cached lowest key, full-key flag and first pair
// index. The `SBLK_CACHE_UPDATE` flag of the block is always cleared; blocks
// below the cache level or not present in the cache are left alone.
static void _dbcache_update_lw(IWLCTX *lx, SBLK *sblk) {
  DBCACHE *cache = &lx->db->cache;

  assert(sblk->pnum > 0);
  sblk->flags &= ~SBLK_CACHE_UPDATE;
  if ((sblk->lvl < cache->lvl) || (cache->num < 1)) {
    return;
  }

  const blkn_t target = ADDR2BLK(sblk->addr);
  const size_t nsize = cache->nsize;
  uint8_t *cursor = (uint8_t*) cache->nodes;
  uint8_t *const end = cursor + cache->num * nsize;

  for ( ; cursor < end; cursor += nsize) {
    DBCNODE *node = (DBCNODE*) cursor;
    if (node->sblkn != target) {
      continue;
    }
    node->kblkn = sblk->kvblkn;
    node->lkl = sblk->lkl;
    node->fullkey = (sblk->flags & SBLK_FULL_LKEY);
    node->k0idx = sblk->pi[0];
    memcpy(cursor + offsetof(DBCNODE, lk), sblk->lk, sblk->lkl);
    return;
  }
}
3101
3102 //-------------------------- CURSOR
3103
_cursor_get_ge_idx(IWLCTX * lx,IWKV_cursor_op op,uint8_t * oidx)3104 IW_INLINE WUR iwrc _cursor_get_ge_idx(IWLCTX *lx, IWKV_cursor_op op, uint8_t *oidx) {
3105 iwrc rc = _lx_find_bounds(lx);
3106 RCRET(rc);
3107 bool found;
3108 uint8_t *mm, idx;
3109 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3110 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3111 RCRET(rc);
3112 rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
3113 RCGO(rc, finish);
3114 rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
3115 RCGO(rc, finish);
3116 if (found) {
3117 *oidx = idx;
3118 } else {
3119 if ((op == IWKV_CURSOR_EQ) || (lx->lower->flags & SBLK_DB) || (lx->lower->pnum < 1)) {
3120 rc = IWKV_ERROR_NOTFOUND;
3121 } else {
3122 *oidx = idx ? idx - 1 : idx;
3123 }
3124 }
3125
3126 finish:
3127 IWRC(fsm->release_mmap(fsm), rc);
3128 return rc;
3129 }
3130
_cursor_to_lr(IWKV_cursor cur,IWKV_cursor_op op)3131 static WUR iwrc _cursor_to_lr(IWKV_cursor cur, IWKV_cursor_op op) {
3132 iwrc rc = 0;
3133 IWDB db = cur->lx.db;
3134 IWLCTX *lx = &cur->lx;
3135 blkn_t dblk = ADDR2BLK(db->addr);
3136 if (op < IWKV_CURSOR_NEXT) { // IWKV_CURSOR_BEFORE_FIRST | IWKV_CURSOR_AFTER_LAST
3137 if (cur->cn) {
3138 _sblk_release(lx, &cur->cn);
3139 }
3140 if (op == IWKV_CURSOR_BEFORE_FIRST) {
3141 cur->dbaddr = db->addr;
3142 cur->cnpos = KVBLK_IDXNUM - 1;
3143 } else {
3144 cur->dbaddr = -1; // Negative as sign of dbtail
3145 cur->cnpos = 0;
3146 }
3147 return 0;
3148 }
3149
3150 start:
3151 if (op < IWKV_CURSOR_EQ) { // IWKV_CURSOR_NEXT | IWKV_CURSOR_PREV
3152 blkn_t n = 0;
3153 if (!cur->cn) {
3154 if (cur->dbaddr) {
3155 rc = _sblk_at(lx, (cur->dbaddr < 0 ? 0 : cur->dbaddr), 0, &cur->cn);
3156 cur->dbaddr = 0;
3157 RCGO(rc, finish);
3158 } else {
3159 rc = IWKV_ERROR_NOTFOUND;
3160 goto finish;
3161 }
3162 }
3163 if (op == IWKV_CURSOR_NEXT) {
3164 if (cur->skip_next > 0) {
3165 goto finish;
3166 }
3167 if (cur->cnpos + 1 >= cur->cn->pnum) {
3168 n = cur->cn->n[0];
3169 if (!n) {
3170 rc = IWKV_ERROR_NOTFOUND;
3171 goto finish;
3172 }
3173 _sblk_release(lx, &cur->cn);
3174 rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
3175 RCGO(rc, finish);
3176 cur->cnpos = 0;
3177 if (IW_UNLIKELY(!cur->cn->pnum)) {
3178 goto start;
3179 }
3180 } else {
3181 if (cur->cn->flags & SBLK_DB) {
3182 rc = IWKV_ERROR_NOTFOUND;
3183 goto finish;
3184 }
3185 ++cur->cnpos;
3186 }
3187 } else { // IWKV_CURSOR_PREV
3188 if (cur->skip_next < 0) {
3189 goto finish;
3190 }
3191 if (cur->cnpos == 0) {
3192 n = cur->cn->p0;
3193 if (!n || (n == dblk)) {
3194 rc = IWKV_ERROR_NOTFOUND;
3195 goto finish;
3196 }
3197 _sblk_release(lx, &cur->cn);
3198 RCGO(rc, finish);
3199 rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
3200 RCGO(rc, finish);
3201 if (IW_LIKELY(cur->cn->pnum)) {
3202 cur->cnpos = cur->cn->pnum - 1;
3203 } else {
3204 goto start;
3205 }
3206 } else {
3207 if (cur->cn->flags & SBLK_DB) {
3208 rc = IWKV_ERROR_NOTFOUND;
3209 goto finish;
3210 }
3211 --cur->cnpos;
3212 }
3213 }
3214 } else { // IWKV_CURSOR_EQ | IWKV_CURSOR_GE
3215 if (!lx->key) {
3216 rc = IW_ERROR_INVALID_STATE;
3217 goto finish;
3218 }
3219 rc = _cursor_get_ge_idx(lx, op, &cur->cnpos);
3220 if (lx->upper) {
3221 _sblk_release(lx, &lx->upper);
3222 }
3223 if (!rc) {
3224 cur->cn = lx->lower;
3225 lx->lower = 0;
3226 }
3227 }
3228
3229 finish:
3230 cur->skip_next = 0;
3231 if (rc && (rc != IWKV_ERROR_NOTFOUND)) {
3232 if (cur->cn) {
3233 _sblk_release(lx, &cur->cn);
3234 }
3235 }
3236 return rc;
3237 }
3238
3239 //-------------------------- PUBLIC API
3240
// Translates an IWKV error code into a human readable message.
//
// Returns zero for codes outside of the
// (`_IWKV_ERROR_START`, `_IWKV_ERROR_END`) range and for codes without a
// message. `locale` is not used.
static const char* _kv_ecodefn(locale_t locale, uint32_t ecode) {
  if (!((ecode > _IWKV_ERROR_START) && (ecode < _IWKV_ERROR_END))) {
    return 0;
  }
  switch (ecode) {
    case IWKV_ERROR_NOTFOUND:
      return "Key not found. (IWKV_ERROR_NOTFOUND)";
    case IWKV_ERROR_KEY_EXISTS:
      return "Key exists. (IWKV_ERROR_KEY_EXISTS)";
    case IWKV_ERROR_MAXKVSZ:
      return "Size of Key+value must be not greater than 0xfffffff bytes (IWKV_ERROR_MAXKVSZ)";
    case IWKV_ERROR_CORRUPTED:
      return "Database file invalid or corrupted (IWKV_ERROR_CORRUPTED)";
    case IWKV_ERROR_DUP_VALUE_SIZE:
      return "Value size is not compatible for insertion into sorted values array (IWKV_ERROR_DUP_VALUE_SIZE)";
    case IWKV_ERROR_KEY_NUM_VALUE_SIZE:
      return "Given key is not compatible to store as number (IWKV_ERROR_KEY_NUM_VALUE_SIZE)";
    case IWKV_ERROR_INCOMPATIBLE_DB_MODE:
      return "Incompatible database open mode (IWKV_ERROR_INCOMPATIBLE_DB_MODE)";
    case IWKV_ERROR_INCOMPATIBLE_DB_FORMAT:
      return "Incompatible database format version, please migrate database data (IWKV_ERROR_INCOMPATIBLE_DB_FORMAT)";
    case IWKV_ERROR_CORRUPTED_WAL_FILE:
      return "Corrupted WAL file (IWKV_ERROR_CORRUPTED_WAL_FILE)";
    case IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED:
      return "Stored value cannot be incremented/decremented (IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED)";
    case IWKV_ERROR_WAL_MODE_REQUIRED:
      return "Operation requires WAL enabled database. (IWKV_ERROR_WAL_MODE_REQUIRED)";
    case IWKV_ERROR_BACKUP_IN_PROGRESS:
      return "Backup operation in progress. (IWKV_ERROR_BACKUP_IN_PROGRESS)";
    default:
      break;
  }
  return 0;
}
3275
iwkv_init(void)3276 iwrc iwkv_init(void) {
3277 static int _kv_initialized = 0;
3278 if (!__sync_bool_compare_and_swap(&_kv_initialized, 0, 1)) {
3279 return 0;
3280 }
3281 return iwlog_register_ecodefn(_kv_ecodefn);
3282 }
3283
// File resize policy: computes the size the file should be grown to so that
// it holds at least `nsize` bytes, given the current size `csize`.
//
// While the current size is below 64M the size is doubled (starting from the
// current size, or from one allocation unit for an empty file) until it is
// not less than `nsize`; above that `nsize` plus 10M is used. The result is
// rounded up to the allocation unit. `f` and `_ctx` are not used.
static off_t _szpolicy(off_t nsize, off_t csize, struct IWFS_EXT *f, void **_ctx) {
  const size_t aunit = iwp_alloc_unit();
  off_t res;

  if (csize >= 0x4000000) {
    res = nsize + 10 * 1024 * 1024; // + 10M extra space
  } else { // Doubled alloc up to 64M
    res = csize ? csize : aunit;
    while (res < nsize) {
      res <<= 1;
    }
  }
  res = IW_ROUNDUP(res, aunit);
  return res;
}
3298
iwkv_state(IWKV iwkv,IWFS_FSM_STATE * out)3299 iwrc iwkv_state(IWKV iwkv, IWFS_FSM_STATE *out) {
3300 if (!iwkv || !out) {
3301 return IW_ERROR_INVALID_ARGS;
3302 }
3303 int rci;
3304 API_RLOCK(iwkv, rci);
3305 IWFS_FSM fsm = iwkv->fsm;
3306 iwrc rc = fsm.state(&fsm, out);
3307 API_UNLOCK(iwkv, rci, rc);
3308 return rc;
3309 }
3310
iwkv_online_backup(IWKV iwkv,uint64_t * ts,const char * target_file)3311 iwrc iwkv_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
3312 return iwal_online_backup(iwkv, ts, target_file);
3313 }
3314
// Checks whether the file at `path` is an online backup image and, if so,
// splits it back into a database file and a separate `<path>-wal` file.
//
// Layout expected at the tail of a backup image, as read by the code below:
//   [... WAL data starting at `waloff` ...][waloff: u64][IWKV_BACKUP_MAGIC: u32]
//
//  - path:               database file to inspect.
//  - extra_lock_flags:   OR-ed into IWP_WLOCK when the file is opened.
//  - out_has_online_bkp: set to true once the image is recognized as a backup
//                        (before the WAL is copied out), false otherwise.
//
// Returns 0 when the file does not exist or is not a backup image; otherwise
// the first error met while reading, copying, truncating or syncing.
static iwrc _iwkv_check_online_backup(const char *path, iwp_lockmode extra_lock_flags, bool *out_has_online_bkp) {
  size_t sp;
  uint32_t lv;
  off_t fsz, pos;
  uint64_t waloff; // WAL offset
  char buf[16384];

  *out_has_online_bkp = false;
  const size_t aunit = iwp_alloc_unit();
  char *wpath = 0;

  IWFS_FILE f = { 0 }, w = { 0 };
  IWFS_FILE_STATE fs, fw;
  iwrc rc = iwfs_file_open(&f, &(IWFS_FILE_OPTS) {
    .path = path,
    .omode = IWFS_OREAD | IWFS_OWRITE,
    .lock_mode = IWP_WLOCK | extra_lock_flags
  });
  // A missing file is not an error here: there is simply nothing to unpack.
  if (rc == IW_ERROR_NOT_EXISTS) {
    return 0;
  }
  RCRET(rc);

  rc = f.state(&f, &fs);
  RCGO(rc, finish);

  // File size; anything smaller than one allocation unit is not examined further.
  rc = iwp_lseek(fs.fh, 0, IWP_SEEK_END, &fsz);
  RCGO(rc, finish);
  if (fsz < iwp_alloc_unit()) {
    goto finish;
  }

  // First 4 bytes must be the fsm file magic.
  rc = iwp_pread(fs.fh, 0, &lv, sizeof(lv), &sp);
  RCGO(rc, finish);
  lv = IW_ITOHL(lv);
  if ((sp != sizeof(lv)) || (lv != IWFSM_MAGICK)) {
    goto finish;
  }

  // The custom header area must start with the IWKV magic.
  rc = iwp_pread(fs.fh, IWFSM_CUSTOM_HDR_DATA_OFFSET, &lv, sizeof(lv), &sp);
  RCGO(rc, finish);
  lv = IW_ITOHL(lv);
  if ((sp != sizeof(lv)) || (lv != IWKV_MAGIC)) {
    goto finish;
  }

  // The last 4 bytes of the file must be the backup magic.
  rc = iwp_lseek(fs.fh, (off_t) -1 * sizeof(lv), IWP_SEEK_END, 0);
  RCGO(rc, finish);

  rc = iwp_read(fs.fh, &lv, sizeof(lv), &sp);
  RCGO(rc, finish);
  lv = IW_ITOHL(lv);
  if ((sp != sizeof(lv)) || (lv != IWKV_BACKUP_MAGIC)) {
    goto finish;
  }

  // Get WAL data offset
  // `pos` receives the file position of the 8-byte offset field itself.
  rc = iwp_lseek(fs.fh, (off_t) -1 * (sizeof(waloff) + sizeof(lv)), IWP_SEEK_END, &pos);
  RCGO(rc, finish);

  rc = iwp_read(fs.fh, &waloff, sizeof(waloff), &sp);
  RCGO(rc, finish);

  waloff = IW_ITOHLL(waloff);
  // Reject the image unless the WAL is empty (waloff == pos) or leaves room
  // for at least one WBSEP record, and unless waloff is aligned to the
  // allocation unit.
  if (((waloff != pos) && (waloff > pos - sizeof(WBSEP))) || (waloff & (aunit - 1))) {
    goto finish;
  }

  // Read the first WAL instruction: WBSEP
  if (waloff != pos) { // Not an empty WAL?
    WBSEP wbsep = { 0 };
    rc = iwp_pread(fs.fh, waloff, &wbsep, sizeof(wbsep), &sp);
    RCGO(rc, finish);
    if (wbsep.id != WOP_SEP) {
      goto finish;
    }
  }

  // Now we have an online backup image, unpack WAL file

  // Build "<path>-wal".
  sp = strlen(path);
  wpath = malloc(sp + 4 /*-wal*/ + 1 /*\0*/);
  if (!wpath) {
    rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
    goto finish;
  }
  memcpy(wpath, path, sp);
  memcpy(wpath + sp, "-wal", 4);
  wpath[sp + 4] = '\0';

  iwlog_warn("Unpacking WAL from online backup into: %s", wpath);
  *out_has_online_bkp = true;

  // WAL file
  rc = iwfs_file_open(&w, &(IWFS_FILE_OPTS) {
    .path = wpath,
    .omode = IWFS_OREAD | IWFS_OWRITE | IWFS_OTRUNC
  });
  RCGO(rc, finish);

  rc = w.state(&w, &fw);
  RCGO(rc, finish);

  // WAL content copy
  rc = iwp_lseek(fs.fh, waloff, IWP_SEEK_SET, 0);
  RCGO(rc, finish);
  // From here on `fsz` is the number of WAL bytes still to be copied.
  fsz = fsz - waloff - sizeof(lv) /* magic */ - sizeof(waloff) /* wal offset */;
  if (fsz > 0) {
    sp = 0;
    do {
      rc = iwp_read(fs.fh, buf, sizeof(buf), &sp);
      RCGO(rc, finish);
      // The last chunk also contains the trailer; do not copy past the WAL data.
      if (sp > fsz) {
        sp = fsz;
      }
      fsz -= sp;
      rc = iwp_write(fw.fh, buf, sp);
      RCGO(rc, finish);
    } while (fsz > 0 && sp > 0);
  }
  rc = iwp_fsync(fw.fh);
  RCGO(rc, finish);

  // Cut the WAL data and the trailer off the database file.
  rc = iwp_ftruncate(fs.fh, waloff);
  RCGO(rc, finish);

  rc = iwp_fsync(fs.fh);
  RCGO(rc, finish);

finish:
  // Close whichever files were opened, keeping the first error in `rc`.
  if (f.impl) {
    IWRC(f.close(&f), rc);
  }
  if (w.impl) {
    IWRC(w.close(&w), rc);
  }
  free(wpath);
  return rc;
}
3454
iwkv_open(const IWKV_OPTS * opts,IWKV * iwkvp)3455 iwrc iwkv_open(const IWKV_OPTS *opts, IWKV *iwkvp) {
3456 if (!opts || !iwkvp || !opts->path) {
3457 return IW_ERROR_INVALID_ARGS;
3458 }
3459 *iwkvp = 0;
3460 int rci;
3461 iwrc rc = 0;
3462 uint32_t lv;
3463 uint64_t llv;
3464 uint8_t *rp, *mm;
3465 bool has_online_bkp = false;
3466
3467 rc = iw_init();
3468 RCRET(rc);
3469
3470 if (opts->random_seed) {
3471 iwu_rand_seed(opts->random_seed);
3472 }
3473 iwkv_openflags oflags = opts->oflags;
3474 iwfs_omode omode = IWFS_OREAD;
3475 if (oflags & IWKV_TRUNC) {
3476 oflags &= ~IWKV_RDONLY;
3477 omode |= IWFS_OTRUNC;
3478 }
3479 if (!(oflags & IWKV_RDONLY)) {
3480 omode |= IWFS_OWRITE;
3481 omode |= IWFS_OCREATE;
3482 }
3483 if ((omode & IWFS_OWRITE) && !(omode & IWFS_OTRUNC)) {
3484 iwp_lockmode extra_lock_flags = 0;
3485 if (opts->file_lock_fail_fast) {
3486 extra_lock_flags |= IWP_NBLOCK;
3487 }
3488 rc = _iwkv_check_online_backup(opts->path, extra_lock_flags, &has_online_bkp);
3489 RCRET(rc);
3490 }
3491
3492 *iwkvp = calloc(1, sizeof(struct _IWKV));
3493 if (!*iwkvp) {
3494 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
3495 }
3496 IWKV iwkv = *iwkvp;
3497 iwkv->fmt_version = opts->fmt_version > 0 ? opts->fmt_version : IWKV_FORMAT;
3498 if (iwkv->fmt_version > IWKV_FORMAT) {
3499 rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3500 iwlog_ecode_error3(rc);
3501 return rc;
3502 }
3503 // Adjust lower key len accourding to database format version
3504 if (iwkv->fmt_version < 2) {
3505 iwkv->pklen = PREFIX_KEY_LEN_V1;
3506 } else {
3507 iwkv->pklen = PREFIX_KEY_LEN_V2;
3508 }
3509
3510 pthread_rwlockattr_t attr;
3511 pthread_rwlockattr_init(&attr);
3512 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
3513 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
3514 #endif
3515 rci = pthread_rwlock_init(&iwkv->rwl, &attr);
3516 if (rci) {
3517 free(*iwkvp);
3518 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3519 }
3520 rci = pthread_mutex_init(&iwkv->wk_mtx, 0);
3521 if (rci) {
3522 pthread_rwlock_destroy(&iwkv->rwl);
3523 free(*iwkvp);
3524 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3525 }
3526 rci = pthread_cond_init(&iwkv->wk_cond, 0);
3527 if (rci) {
3528 pthread_rwlock_destroy(&iwkv->rwl);
3529 pthread_mutex_destroy(&iwkv->wk_mtx);
3530 free(*iwkvp);
3531 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3532 }
3533
3534 iwkv->oflags = oflags;
3535 IWFS_FSM_STATE fsmstate;
3536 IWFS_FSM_OPTS fsmopts = {
3537 .exfile = {
3538 .file = {
3539 .path = opts->path,
3540 .omode = omode,
3541 .lock_mode = (oflags & IWKV_RDONLY) ? IWP_RLOCK : IWP_WLOCK
3542 },
3543 .rspolicy = _szpolicy,
3544 .maxoff = IWKV_MAX_DBSZ,
3545 .use_locks = true
3546 },
3547 .bpow = IWKV_FSM_BPOW, // 64 bytes block size
3548 .hdrlen = KVHDRSZ, // Size of custom file header
3549 .oflags = ((oflags & IWKV_RDONLY) ? IWFSM_NOLOCKS : 0),
3550 .mmap_all = true
3551 };
3552 #ifndef NDEBUG
3553 fsmopts.oflags |= IWFSM_STRICT;
3554 #endif
3555 if (oflags & IWKV_NO_TRIM_ON_CLOSE) {
3556 fsmopts.oflags |= IWFSM_NO_TRIM_ON_CLOSE;
3557 }
3558 if (opts->file_lock_fail_fast) {
3559 fsmopts.exfile.file.lock_mode |= IWP_NBLOCK;
3560 }
3561 // Init WAL
3562 rc = iwal_create(iwkv, opts, &fsmopts, has_online_bkp);
3563 RCGO(rc, finish);
3564
3565 // Now open database file
3566 rc = iwfs_fsmfile_open(&iwkv->fsm, &fsmopts);
3567 RCGO(rc, finish);
3568
3569 IWFS_FSM *fsm = &iwkv->fsm;
3570 iwkv->dbs = kh_init(DBS);
3571 rc = fsm->state(fsm, &fsmstate);
3572 RCGO(rc, finish);
3573
3574 // Database header: [magic:u4, first_addr:u8, db_format_version:u4]
3575 if (fsmstate.exfile.file.ostatus & IWFS_OPEN_NEW) {
3576 uint8_t hdr[KVHDRSZ] = { 0 };
3577 uint8_t *wp = hdr;
3578 IW_WRITELV(wp, lv, IWKV_MAGIC);
3579 wp += sizeof(llv); // skip first db addr
3580 IW_WRITELV(wp, lv, iwkv->fmt_version);
3581 rc = fsm->writehdr(fsm, 0, hdr, sizeof(hdr));
3582 RCGO(rc, finish);
3583 rc = fsm->sync(fsm, 0);
3584 RCGO(rc, finish);
3585 } else {
3586 off_t dbaddr; // first database address
3587 uint8_t hdr[KVHDRSZ];
3588 rc = fsm->readhdr(fsm, 0, hdr, KVHDRSZ);
3589 RCGO(rc, finish);
3590 rp = hdr; // -V507
3591 IW_READLV(rp, lv, lv);
3592 IW_READLLV(rp, llv, dbaddr);
3593 if ((lv != IWKV_MAGIC) || (dbaddr < 0)) {
3594 rc = IWKV_ERROR_CORRUPTED;
3595 iwlog_ecode_error3(rc);
3596 goto finish;
3597 }
3598 IW_READLV(rp, lv, iwkv->fmt_version);
3599 if ((iwkv->fmt_version > IWKV_FORMAT)) {
3600 rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3601 iwlog_ecode_error3(rc);
3602 goto finish;
3603 }
3604 if (iwkv->fmt_version < 2) {
3605 iwkv->pklen = PREFIX_KEY_LEN_V1;
3606 } else {
3607 iwkv->pklen = PREFIX_KEY_LEN_V2;
3608 }
3609 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3610 RCGO(rc, finish);
3611 rc = _db_load_chain(iwkv, dbaddr, mm);
3612 fsm->release_mmap(fsm);
3613 }
3614 (*iwkvp)->open = true;
3615
3616 finish:
3617 if (rc) {
3618 (*iwkvp)->open = true; // will be closed in iwkv_close
3619 IWRC(iwkv_close(iwkvp), rc);
3620 }
3621 return rc;
3622 }
3623
iwkv_exclusive_lock(IWKV iwkv)3624 iwrc iwkv_exclusive_lock(IWKV iwkv) {
3625 return _wnw(iwkv, _wnw_iwkw_wl);
3626 }
3627
iwkv_exclusive_unlock(IWKV iwkv)3628 iwrc iwkv_exclusive_unlock(IWKV iwkv) {
3629 int rci;
3630 iwrc rc = 0;
3631 API_UNLOCK(iwkv, rci, rc);
3632 return rc;
3633 }
3634
iwkv_close(IWKV * iwkvp)3635 iwrc iwkv_close(IWKV *iwkvp) {
3636 ENSURE_OPEN((*iwkvp));
3637 IWKV iwkv = *iwkvp;
3638 iwkv->open = false;
3639 iwal_shutdown(iwkv);
3640 iwrc rc = iwkv_exclusive_lock(iwkv);
3641 RCRET(rc);
3642 IWDB db = iwkv->first_db;
3643 while (db) {
3644 IWDB ndb = db->next;
3645 _db_release_lw(&db);
3646 db = ndb;
3647 }
3648 IWRC(iwkv->fsm.close(&iwkv->fsm), rc);
3649 // Below the memory cleanup only
3650 if (iwkv->dbs) {
3651 kh_destroy(DBS, iwkv->dbs);
3652 iwkv->dbs = 0;
3653 }
3654 iwkv_exclusive_unlock(iwkv);
3655 pthread_rwlock_destroy(&iwkv->rwl);
3656 pthread_mutex_destroy(&iwkv->wk_mtx);
3657 pthread_cond_destroy(&iwkv->wk_cond);
3658 free(iwkv);
3659 *iwkvp = 0;
3660 return rc;
3661 }
3662
_iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3663 static iwrc _iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3664 ENSURE_OPEN(iwkv);
3665 if (iwkv->oflags & IWKV_RDONLY) {
3666 return IW_ERROR_READONLY;
3667 }
3668 iwrc rc;
3669 if (iwkv->dlsnr) {
3670 rc = iwal_poke_savepoint(iwkv);
3671 } else {
3672 IWFS_FSM *fsm = &iwkv->fsm;
3673 pthread_rwlock_wrlock(&iwkv->rwl);
3674 iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3675 rc = fsm->sync(fsm, flags);
3676 pthread_rwlock_unlock(&iwkv->rwl);
3677 }
3678 return rc;
3679 }
3680
iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3681 iwrc iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3682 ENSURE_OPEN(iwkv);
3683 if (iwkv->oflags & IWKV_RDONLY) {
3684 return IW_ERROR_READONLY;
3685 }
3686 iwrc rc;
3687 if (iwkv->dlsnr) {
3688 rc = iwkv_exclusive_lock(iwkv);
3689 RCRET(rc);
3690 rc = iwal_savepoint_exl(iwkv, true);
3691 iwkv_exclusive_unlock(iwkv);
3692 } else {
3693 IWFS_FSM *fsm = &iwkv->fsm;
3694 pthread_rwlock_wrlock(&iwkv->rwl);
3695 iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3696 rc = fsm->sync(fsm, flags);
3697 pthread_rwlock_unlock(&iwkv->rwl);
3698 }
3699 return rc;
3700 }
3701
iwkv_db(IWKV iwkv,uint32_t dbid,iwdb_flags_t dbflg,IWDB * dbp)3702 iwrc iwkv_db(IWKV iwkv, uint32_t dbid, iwdb_flags_t dbflg, IWDB *dbp) {
3703 int rci;
3704 iwrc rc = 0;
3705 IWDB db = 0;
3706 *dbp = 0;
3707 API_RLOCK(iwkv, rci);
3708 khiter_t ki = kh_get(DBS, iwkv->dbs, dbid);
3709 if (ki != kh_end(iwkv->dbs)) {
3710 db = kh_value(iwkv->dbs, ki);
3711 }
3712 API_UNLOCK(iwkv, rci, rc);
3713 RCRET(rc);
3714 if (db) {
3715 if (db->dbflg != dbflg) {
3716 return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3717 }
3718 *dbp = db;
3719 return 0;
3720 }
3721 if (iwkv->oflags & IWKV_RDONLY) {
3722 return IW_ERROR_READONLY;
3723 }
3724 rc = iwkv_exclusive_lock(iwkv);
3725 RCRET(rc);
3726 ki = kh_get(DBS, iwkv->dbs, dbid);
3727 if (ki != kh_end(iwkv->dbs)) {
3728 db = kh_value(iwkv->dbs, ki);
3729 }
3730 if (db) {
3731 if (db->dbflg != dbflg) {
3732 return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3733 }
3734 *dbp = db;
3735 } else {
3736 rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3737 }
3738 if (!rc) {
3739 rc = iwal_savepoint_exl(iwkv, true);
3740 }
3741 iwkv_exclusive_unlock(iwkv);
3742 return rc;
3743 }
3744
iwkv_new_db(IWKV iwkv,iwdb_flags_t dbflg,uint32_t * dbidp,IWDB * dbp)3745 iwrc iwkv_new_db(IWKV iwkv, iwdb_flags_t dbflg, uint32_t *dbidp, IWDB *dbp) {
3746 *dbp = 0;
3747 *dbidp = 0;
3748 if (iwkv->oflags & IWKV_RDONLY) {
3749 return IW_ERROR_READONLY;
3750 }
3751 uint32_t dbid = 0;
3752 iwrc rc = iwkv_exclusive_lock(iwkv);
3753 RCRET(rc);
3754 for (khiter_t k = kh_begin(iwkv->dbs); k != kh_end(iwkv->dbs); ++k) {
3755 if (!kh_exist(iwkv->dbs, k)) {
3756 continue;
3757 }
3758 uint32_t id = kh_key(iwkv->dbs, k);
3759 if (id > dbid) {
3760 dbid = id;
3761 }
3762 }
3763 dbid++;
3764 rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3765 if (!rc) {
3766 *dbidp = dbid;
3767 rc = iwal_savepoint_exl(iwkv, true);
3768 }
3769 iwkv_exclusive_unlock(iwkv);
3770 return rc;
3771 }
3772
iwkv_db_cache_release(IWDB db)3773 iwrc iwkv_db_cache_release(IWDB db) {
3774 if (!db || !db->iwkv) {
3775 return IW_ERROR_INVALID_ARGS;
3776 }
3777 int rci;
3778 iwrc rc = 0;
3779 API_DB_WLOCK(db, rci);
3780 _dbcache_destroy_lw(db);
3781 API_DB_UNLOCK(db, rci, rc);
3782 return rc;
3783 }
3784
iwkv_db_destroy(IWDB * dbp)3785 iwrc iwkv_db_destroy(IWDB *dbp) {
3786 if (!dbp || !*dbp) {
3787 return IW_ERROR_INVALID_ARGS;
3788 }
3789 IWDB db = *dbp;
3790 IWKV iwkv = db->iwkv;
3791 *dbp = 0;
3792 if (iwkv->oflags & IWKV_RDONLY) {
3793 return IW_ERROR_READONLY;
3794 }
3795 iwrc rc = iwkv_exclusive_lock(iwkv);
3796 RCRET(rc);
3797 rc = _db_destroy_lw(&db);
3798 iwkv_exclusive_unlock(iwkv);
3799 return rc;
3800 }
3801
iwkv_puth(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)3802 iwrc iwkv_puth(
3803 IWDB db, const IWKV_val *key, const IWKV_val *val,
3804 iwkv_opflags opflags, IWKV_PUT_HANDLER ph, void *phop) {
3805 if (!db || !db->iwkv || !key || !key->size || !val) {
3806 return IW_ERROR_INVALID_ARGS;
3807 }
3808 IWKV iwkv = db->iwkv;
3809 if (iwkv->oflags & IWKV_RDONLY) {
3810 return IW_ERROR_READONLY;
3811 }
3812 if (opflags & IWKV_VAL_INCREMENT) {
3813 // No overwrite for increment
3814 opflags &= ~IWKV_NO_OVERWRITE;
3815 }
3816
3817 int rci;
3818 IWKV_val ekey;
3819 uint8_t nbuf[IW_VNUMBUFSZ];
3820 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3821 RCRET(rc);
3822
3823 IWLCTX lx = {
3824 .db = db,
3825 .key = &ekey,
3826 .val = (IWKV_val*) val,
3827 .nlvl = -1,
3828 .op = IWLCTX_PUT,
3829 .opflags = opflags,
3830 .ph = ph,
3831 .phop = phop
3832 };
3833 API_DB_WLOCK(db, rci);
3834 if (!db->cache.open) {
3835 rc = _dbcache_fill_lw(&lx);
3836 RCGO(rc, finish);
3837 }
3838 rc = _lx_put_lw(&lx);
3839
3840 finish:
3841 API_DB_UNLOCK(db, rci, rc);
3842 if (!rc) {
3843 if (lx.opflags & IWKV_SYNC) {
3844 rc = _iwkv_sync(iwkv, 0);
3845 } else {
3846 rc = iwal_poke_checkpoint(iwkv, false);
3847 }
3848 }
3849 return rc;
3850 }
3851
iwkv_put(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags)3852 iwrc iwkv_put(IWDB db, const IWKV_val *key, const IWKV_val *val, iwkv_opflags opflags) {
3853 return iwkv_puth(db, key, val, opflags, 0, 0);
3854 }
3855
iwkv_get(IWDB db,const IWKV_val * key,IWKV_val * oval)3856 iwrc iwkv_get(IWDB db, const IWKV_val *key, IWKV_val *oval) {
3857 if (!db || !db->iwkv || !key || !oval) {
3858 return IW_ERROR_INVALID_ARGS;
3859 }
3860
3861 int rci;
3862 IWKV_val ekey;
3863 uint8_t nbuf[IW_VNUMBUFSZ];
3864 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3865 RCRET(rc);
3866
3867 IWLCTX lx = {
3868 .db = db,
3869 .key = &ekey,
3870 .val = oval,
3871 .nlvl = -1
3872 };
3873 oval->size = 0;
3874 if (IW_LIKELY(db->cache.open)) {
3875 API_DB_RLOCK(db, rci);
3876 } else {
3877 API_DB_WLOCK(db, rci);
3878 if (!db->cache.open) { // -V547
3879 rc = _dbcache_fill_lw(&lx);
3880 RCGO(rc, finish);
3881 }
3882 }
3883 rc = _lx_get_lr(&lx);
3884
3885 finish:
3886 API_DB_UNLOCK(db, rci, rc);
3887 return rc;
3888 }
3889
iwkv_get_copy(IWDB db,const IWKV_val * key,void * vbuf,size_t vbufsz,size_t * vsz)3890 iwrc iwkv_get_copy(IWDB db, const IWKV_val *key, void *vbuf, size_t vbufsz, size_t *vsz) {
3891 if (!db || !db->iwkv || !key || !vbuf) {
3892 return IW_ERROR_INVALID_ARGS;
3893 }
3894 *vsz = 0;
3895
3896 int rci;
3897 bool found;
3898 IWKV_val ekey;
3899 uint32_t ovalsz;
3900 uint8_t *mm = 0, *oval, idx;
3901 IWFS_FSM *fsm = &db->iwkv->fsm;
3902 uint8_t nbuf[IW_VNUMBUFSZ];
3903 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3904 RCRET(rc);
3905
3906 IWLCTX lx = {
3907 .db = db,
3908 .key = &ekey,
3909 .nlvl = -1
3910 };
3911 if (IW_LIKELY(db->cache.open)) {
3912 API_DB_RLOCK(db, rci);
3913 } else {
3914 API_DB_WLOCK(db, rci);
3915 if (!db->cache.open) { // -V547
3916 rc = _dbcache_fill_lw(&lx);
3917 RCGO(rc, finish);
3918 }
3919 }
3920 rc = _lx_find_bounds(&lx);
3921 RCGO(rc, finish);
3922 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3923 RCGO(rc, finish);
3924 rc = _sblk_loadkvblk_mm(&lx, lx.lower, mm);
3925 RCGO(rc, finish);
3926 rc = _sblk_find_pi_mm(lx.lower, &lx, mm, &found, &idx);
3927 RCGO(rc, finish);
3928 if (found) {
3929 _kvblk_value_peek(lx.lower->kvblk, lx.lower->pi[idx], mm, &oval, &ovalsz);
3930 *vsz = ovalsz;
3931 memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
3932 } else {
3933 rc = IWKV_ERROR_NOTFOUND;
3934 }
3935
3936 finish:
3937 if (mm) {
3938 IWRC(fsm->release_mmap(fsm), rc);
3939 }
3940 _lx_release_mm(&lx, 0);
3941 API_DB_UNLOCK(db, rci, rc);
3942 return rc;
3943 }
3944
iwkv_db_set_meta(IWDB db,void * buf,size_t sz)3945 iwrc iwkv_db_set_meta(IWDB db, void *buf, size_t sz) {
3946 if (!db || !db->iwkv || !buf) {
3947 return IW_ERROR_INVALID_ARGS;
3948 }
3949 if (!sz) {
3950 return 0;
3951 }
3952
3953 int rci;
3954 iwrc rc = 0;
3955 bool resized = false;
3956 uint8_t *mm = 0, *wp, *sp;
3957 IWFS_FSM *fsm = &db->iwkv->fsm;
3958 size_t asz = IW_ROUNDUP(sz, 1U << IWKV_FSM_BPOW);
3959
3960 API_DB_WLOCK(db, rci);
3961 if ((asz > db->meta_blkn) || (asz * 2 <= db->meta_blkn)) {
3962 off_t oaddr = 0;
3963 off_t olen = 0;
3964 if (db->meta_blk) {
3965 rc = fsm->deallocate(fsm, BLK2ADDR(db->meta_blk), BLK2ADDR(db->meta_blkn));
3966 RCGO(rc, finish);
3967 }
3968 rc = fsm->allocate(fsm, asz, &oaddr, &olen, IWKV_FSM_ALLOC_FLAGS);
3969 RCGO(rc, finish);
3970 db->meta_blk = ADDR2BLK(oaddr);
3971 db->meta_blkn = ADDR2BLK(olen);
3972 resized = true;
3973 }
3974 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3975 RCGO(rc, finish);
3976 wp = mm + BLK2ADDR(db->meta_blk);
3977 memcpy(wp, buf, sz);
3978 if (db->iwkv->dlsnr) {
3979 rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, wp - mm, wp, sz, 0);
3980 RCGO(rc, finish);
3981 }
3982 if (resized) {
3983 uint32_t lv;
3984 wp = mm + db->addr + DOFF_METABLK_U4;
3985 sp = wp;
3986 IW_WRITELV(wp, lv, db->meta_blk);
3987 IW_WRITELV(wp, lv, db->meta_blkn);
3988 if (db->iwkv->dlsnr) {
3989 rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, sp - mm, sp, wp - sp, 0);
3990 RCGO(rc, finish);
3991 }
3992 }
3993 fsm->release_mmap(fsm);
3994 mm = 0;
3995
3996 finish:
3997 if (mm) {
3998 fsm->release_mmap(fsm);
3999 }
4000 API_DB_UNLOCK(db, rci, rc);
4001 return rc;
4002 }
4003
iwkv_db_get_meta(IWDB db,void * buf,size_t sz,size_t * rsz)4004 iwrc iwkv_db_get_meta(IWDB db, void *buf, size_t sz, size_t *rsz) {
4005 if (!db || !db->iwkv || !buf) {
4006 return IW_ERROR_INVALID_ARGS;
4007 }
4008 *rsz = 0;
4009 if (!sz || !db->meta_blkn) {
4010 return 0;
4011 }
4012 int rci;
4013 iwrc rc = 0;
4014 uint8_t *mm = 0;
4015 IWFS_FSM *fsm = &db->iwkv->fsm;
4016 size_t rmax = BLK2ADDR(db->meta_blkn);
4017 if (sz > rmax) {
4018 sz = rmax;
4019 }
4020 API_DB_RLOCK(db, rci);
4021 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4022 RCGO(rc, finish);
4023 memcpy(buf, mm + BLK2ADDR(db->meta_blk), sz);
4024 *rsz = sz;
4025
4026 finish:
4027 if (mm) {
4028 fsm->release_mmap(fsm);
4029 }
4030 API_DB_UNLOCK(db, rci, rc);
4031 return rc;
4032 }
4033
iwkv_del(IWDB db,const IWKV_val * key,iwkv_opflags opflags)4034 iwrc iwkv_del(IWDB db, const IWKV_val *key, iwkv_opflags opflags) {
4035 if (!db || !db->iwkv || !key) {
4036 return IW_ERROR_INVALID_ARGS;
4037 }
4038 int rci;
4039 IWKV_val ekey;
4040 IWKV iwkv = db->iwkv;
4041
4042 uint8_t nbuf[IW_VNUMBUFSZ];
4043 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
4044 RCRET(rc);
4045 IWLCTX lx = {
4046 .db = db,
4047 .key = &ekey,
4048 .nlvl = -1,
4049 .op = IWLCTX_DEL,
4050 .opflags = opflags
4051 };
4052 API_DB_WLOCK(db, rci);
4053 if (!db->cache.open) {
4054 rc = _dbcache_fill_lw(&lx);
4055 RCGO(rc, finish);
4056 }
4057 rc = _lx_del_lw(&lx);
4058
4059 finish:
4060 API_DB_UNLOCK(db, rci, rc);
4061 if (!rc) {
4062 if (lx.opflags & IWKV_SYNC) {
4063 rc = _iwkv_sync(iwkv, 0);
4064 } else {
4065 rc = iwal_poke_checkpoint(iwkv, false);
4066 }
4067 }
4068 return rc;
4069 }
4070
_cursor_close_lw(IWKV_cursor cur)4071 IW_INLINE iwrc _cursor_close_lw(IWKV_cursor cur) {
4072 iwrc rc = 0;
4073 cur->closed = true;
4074 IWDB db = cur->lx.db;
4075 pthread_spin_lock(&db->cursors_slk);
4076 for (IWKV_cursor c = db->cursors, pc = 0; c; pc = c, c = c->next) {
4077 if (c == cur) {
4078 if (pc) {
4079 pc->next = c->next;
4080 } else {
4081 db->cursors = c->next;
4082 }
4083 break;
4084 }
4085 }
4086 pthread_spin_unlock(&db->cursors_slk);
4087 return rc;
4088 }
4089
iwkv_cursor_open(IWDB db,IWKV_cursor * curptr,IWKV_cursor_op op,const IWKV_val * key)4090 iwrc iwkv_cursor_open(
4091 IWDB db,
4092 IWKV_cursor *curptr,
4093 IWKV_cursor_op op,
4094 const IWKV_val *key) {
4095 if ( !db || !db->iwkv || !curptr
4096 || (key && (op < IWKV_CURSOR_EQ)) || (op < IWKV_CURSOR_BEFORE_FIRST)) {
4097 return IW_ERROR_INVALID_ARGS;
4098 }
4099 iwrc rc;
4100 int rci;
4101 rc = _db_worker_inc_nolk(db);
4102 RCRET(rc);
4103 if (IW_LIKELY(db->cache.open)) {
4104 rc = _api_db_rlock(db);
4105 } else {
4106 rc = _api_db_wlock(db);
4107 }
4108 if (rc) {
4109 _db_worker_dec_nolk(db);
4110 return rc;
4111 }
4112 IWKV_cursor cur = 0;
4113 *curptr = calloc(1, sizeof(**curptr));
4114 if (!(*curptr)) {
4115 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
4116 goto finish;
4117 }
4118 cur = *curptr;
4119 IWLCTX *lx = &cur->lx;
4120 lx->db = db;
4121 lx->nlvl = -1;
4122 if (key) {
4123 rc = _to_effective_key(db, key, &lx->ekey, lx->nbuf);
4124 RCGO(rc, finish);
4125 lx->key = &lx->ekey;
4126 }
4127 if (!db->cache.open) {
4128 rc = _dbcache_fill_lw(lx);
4129 RCGO(rc, finish);
4130 }
4131 rc = _cursor_to_lr(cur, op);
4132
4133 finish:
4134 if (cur) {
4135 if (rc) {
4136 *curptr = 0;
4137 IWRC(_cursor_close_lw(cur), rc);
4138 free(cur);
4139 } else {
4140 pthread_spin_lock(&db->cursors_slk);
4141 cur->next = db->cursors;
4142 db->cursors = cur;
4143 pthread_spin_unlock(&db->cursors_slk);
4144 }
4145 }
4146 API_DB_UNLOCK(db, rci, rc);
4147 if (rc) {
4148 _db_worker_dec_nolk(db);
4149 }
4150 return rc;
4151 }
4152
iwkv_cursor_close(IWKV_cursor * curp)4153 iwrc iwkv_cursor_close(IWKV_cursor *curp) {
4154 iwrc rc = 0;
4155 int rci;
4156 if (!curp || !*curp) {
4157 return 0;
4158 }
4159 IWKV_cursor cur = *curp;
4160 *curp = 0;
4161 IWKV iwkv = cur->lx.db->iwkv;
4162 if (cur->closed) {
4163 free(cur);
4164 return 0;
4165 }
4166 if (!cur->lx.db) {
4167 return IW_ERROR_INVALID_ARGS;
4168 }
4169 API_DB_WLOCK(cur->lx.db, rci);
4170 rc = _cursor_close_lw(cur);
4171 API_DB_UNLOCK(cur->lx.db, rci, rc);
4172 IWRC(_db_worker_dec_nolk(cur->lx.db), rc);
4173 free(cur);
4174 if (!rc) {
4175 rc = iwal_poke_checkpoint(iwkv, false);
4176 }
4177 return rc;
4178 }
4179
iwkv_cursor_to(IWKV_cursor cur,IWKV_cursor_op op)4180 iwrc iwkv_cursor_to(IWKV_cursor cur, IWKV_cursor_op op) {
4181 int rci;
4182 if (!cur) {
4183 return IW_ERROR_INVALID_ARGS;
4184 }
4185 if (!cur->lx.db) {
4186 return IW_ERROR_INVALID_ARGS;
4187 }
4188 API_DB_RLOCK(cur->lx.db, rci);
4189 iwrc rc = _cursor_to_lr(cur, op);
4190 API_DB_UNLOCK(cur->lx.db, rci, rc);
4191 return rc;
4192 }
4193
iwkv_cursor_to_key(IWKV_cursor cur,IWKV_cursor_op op,const IWKV_val * key)4194 iwrc iwkv_cursor_to_key(IWKV_cursor cur, IWKV_cursor_op op, const IWKV_val *key) {
4195 int rci;
4196 if (!cur || ((op != IWKV_CURSOR_EQ) && (op != IWKV_CURSOR_GE))) {
4197 return IW_ERROR_INVALID_ARGS;
4198 }
4199 IWLCTX *lx = &cur->lx;
4200 if (!lx->db) {
4201 return IW_ERROR_INVALID_STATE;
4202 }
4203 iwrc rc = _to_effective_key(lx->db, key, &lx->ekey, lx->nbuf);
4204 RCRET(rc);
4205
4206 API_DB_RLOCK(lx->db, rci);
4207 lx->key = &lx->ekey;
4208 rc = _cursor_to_lr(cur, op);
4209 API_DB_UNLOCK(lx->db, rci, rc);
4210 return rc;
4211 }
4212
iwkv_cursor_get(IWKV_cursor cur,IWKV_val * okey,IWKV_val * oval)4213 iwrc iwkv_cursor_get(
4214 IWKV_cursor cur,
4215 IWKV_val *okey, /* Nullable */
4216 IWKV_val *oval) { /* Nullable */
4217 int rci;
4218 iwrc rc = 0;
4219 if (!cur || !cur->lx.db) {
4220 return IW_ERROR_INVALID_ARGS;
4221 }
4222 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4223 return IWKV_ERROR_NOTFOUND;
4224 }
4225 IWLCTX *lx = &cur->lx;
4226 API_DB_RLOCK(lx->db, rci);
4227 uint8_t *mm = 0;
4228 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4229 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4230 RCGO(rc, finish);
4231 if (!cur->cn->kvblk) {
4232 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4233 RCGO(rc, finish);
4234 }
4235 uint8_t idx = cur->cn->pi[cur->cnpos];
4236 if (okey && oval) {
4237 rc = _kvblk_kv_get(cur->cn->kvblk, mm, idx, okey, oval);
4238 } else if (oval) {
4239 rc = _kvblk_value_get(cur->cn->kvblk, mm, idx, oval);
4240 } else if (okey) {
4241 rc = _kvblk_key_get(cur->cn->kvblk, mm, idx, okey);
4242 } else {
4243 rc = IW_ERROR_INVALID_ARGS;
4244 }
4245 if (!rc && okey) {
4246 _unpack_effective_key(lx->db, okey, false);
4247 }
4248 finish:
4249 if (mm) {
4250 fsm->release_mmap(fsm);
4251 }
4252 API_DB_UNLOCK(lx->db, rci, rc);
4253 return rc;
4254 }
4255
iwkv_cursor_copy_val(IWKV_cursor cur,void * vbuf,size_t vbufsz,size_t * vsz)4256 iwrc iwkv_cursor_copy_val(IWKV_cursor cur, void *vbuf, size_t vbufsz, size_t *vsz) {
4257 int rci;
4258 iwrc rc = 0;
4259 if (!cur || !vbuf || !cur->lx.db) {
4260 return IW_ERROR_INVALID_ARGS;
4261 }
4262 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4263 return IWKV_ERROR_NOTFOUND;
4264 }
4265
4266 *vsz = 0;
4267 IWLCTX *lx = &cur->lx;
4268 API_DB_RLOCK(lx->db, rci);
4269 uint8_t *mm = 0, *oval;
4270 uint32_t ovalsz;
4271 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4272 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4273 RCGO(rc, finish);
4274 if (!cur->cn->kvblk) {
4275 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4276 RCGO(rc, finish);
4277 }
4278 uint8_t idx = cur->cn->pi[cur->cnpos];
4279 _kvblk_value_peek(cur->cn->kvblk, idx, mm, &oval, &ovalsz);
4280 *vsz = ovalsz;
4281 memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
4282
4283 finish:
4284 if (mm) {
4285 fsm->release_mmap(fsm);
4286 }
4287 API_DB_UNLOCK(lx->db, rci, rc);
4288 return rc;
4289 }
4290
iwkv_cursor_is_matched_key(IWKV_cursor cur,const IWKV_val * key,bool * ores,int64_t * ocompound)4291 iwrc iwkv_cursor_is_matched_key(IWKV_cursor cur, const IWKV_val *key, bool *ores, int64_t *ocompound) {
4292 int rci;
4293 iwrc rc = 0;
4294 if (!cur || !ores || !key || !cur->lx.db) {
4295 return IW_ERROR_INVALID_ARGS;
4296 }
4297 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4298 return IWKV_ERROR_NOTFOUND;
4299 }
4300
4301 *ores = 0;
4302 if (ocompound) {
4303 *ocompound = 0;
4304 }
4305
4306 IWLCTX *lx = &cur->lx;
4307 API_DB_RLOCK(lx->db, rci);
4308 uint8_t *mm = 0, *okey;
4309 uint32_t okeysz;
4310 iwdb_flags_t dbflg = lx->db->dbflg;
4311 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4312 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4313 RCGO(rc, finish);
4314 if (!cur->cn->kvblk) {
4315 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4316 RCGO(rc, finish);
4317 }
4318
4319 uint8_t idx = cur->cn->pi[cur->cnpos];
4320 rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
4321 RCGO(rc, finish);
4322
4323 if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
4324 char nbuf[2 * IW_VNUMBUFSZ];
4325 IWKV_val rkey = { .data = nbuf, .size = okeysz };
4326 memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
4327 rc = _unpack_effective_key(lx->db, &rkey, true);
4328 RCGO(rc, finish);
4329 if (ocompound) {
4330 *ocompound = rkey.compound;
4331 }
4332 if (rkey.size != key->size) {
4333 *ores = false;
4334 goto finish;
4335 }
4336 if (dbflg & IWDB_VNUM64_KEYS) {
4337 *ores = !memcmp(rkey.data, key->data, key->size);
4338 } else {
4339 *ores = !memcmp(okey + (okeysz - rkey.size), key->data, key->size);
4340 }
4341 } else {
4342 *ores = (okeysz == key->size) && !memcmp(okey, key->data, key->size);
4343 }
4344
4345 finish:
4346 if (mm) {
4347 fsm->release_mmap(fsm);
4348 }
4349 API_DB_UNLOCK(cur->lx.db, rci, rc);
4350 return rc;
4351 }
4352
iwkv_cursor_copy_key(IWKV_cursor cur,void * kbuf,size_t kbufsz,size_t * ksz,int64_t * compound)4353 iwrc iwkv_cursor_copy_key(IWKV_cursor cur, void *kbuf, size_t kbufsz, size_t *ksz, int64_t *compound) {
4354 int rci;
4355 iwrc rc = 0;
4356 if (!cur || !cur->lx.db) {
4357 return IW_ERROR_INVALID_ARGS;
4358 }
4359 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4360 return IWKV_ERROR_NOTFOUND;
4361 }
4362
4363 *ksz = 0;
4364 IWLCTX *lx = &cur->lx;
4365 API_DB_RLOCK(lx->db, rci);
4366 uint8_t *mm = 0, *okey;
4367 uint32_t okeysz;
4368 iwdb_flags_t dbflg = lx->db->dbflg;
4369 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4370 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4371 RCGO(rc, finish);
4372 if (!cur->cn->kvblk) {
4373 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4374 RCGO(rc, finish);
4375 }
4376
4377 uint8_t idx = cur->cn->pi[cur->cnpos];
4378 rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
4379 RCGO(rc, finish);
4380
4381 if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
4382 char nbuf[2 * IW_VNUMBUFSZ];
4383 IWKV_val rkey = { .data = nbuf, .size = okeysz };
4384 memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
4385 rc = _unpack_effective_key(lx->db, &rkey, true);
4386 RCGO(rc, finish);
4387 if (compound) {
4388 *compound = rkey.compound;
4389 }
4390 *ksz = rkey.size;
4391 if (dbflg & IWDB_VNUM64_KEYS) {
4392 memcpy(kbuf, rkey.data, MIN(kbufsz, rkey.size));
4393 } else {
4394 memcpy(kbuf, okey + (okeysz - rkey.size), MIN(kbufsz, rkey.size));
4395 }
4396 } else {
4397 *ksz = okeysz;
4398 if (compound) {
4399 *compound = 0;
4400 }
4401 memcpy(kbuf, okey, MIN(kbufsz, okeysz));
4402 }
4403
4404 finish:
4405 if (mm) {
4406 fsm->release_mmap(fsm);
4407 }
4408 API_DB_UNLOCK(cur->lx.db, rci, rc);
4409 return rc;
4410 }
4411
iwkv_cursor_seth(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)4412 IW_EXPORT iwrc iwkv_cursor_seth(
4413 IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags,
4414 IWKV_PUT_HANDLER ph, void *phop) {
4415 int rci;
4416 iwrc rc = 0, irc = 0;
4417 if (!cur || !cur->lx.db) {
4418 return IW_ERROR_INVALID_ARGS;
4419 }
4420 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4421 return IWKV_ERROR_NOTFOUND;
4422 }
4423
4424 IWLCTX *lx = &cur->lx;
4425 IWDB db = lx->db;
4426 IWKV iwkv = db->iwkv;
4427 SBLK *sblk = cur->cn;
4428
4429 API_DB_WLOCK(db, rci);
4430 if (ph) {
4431 uint8_t *mm;
4432 IWKV_val key, oldval;
4433 IWFS_FSM *fsm = &db->iwkv->fsm;
4434 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4435 RCGO(rc, finish);
4436 rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key, &oldval);
4437 fsm->release_mmap(fsm);
4438 if (!rc) {
4439 // note: oldval should be disposed by ph
4440 rc = ph(&key, val, &oldval, phop);
4441 _kv_val_dispose(&key);
4442 }
4443 RCGO(rc, finish);
4444 }
4445
4446 rc = _sblk_updatekv(sblk, cur->cnpos, 0, val);
4447 if (IWKV_IS_INTERNAL_RC(rc)) {
4448 irc = rc;
4449 rc = 0;
4450 }
4451 RCGO(rc, finish);
4452
4453 rc = _sblk_sync(lx, sblk);
4454 RCGO(rc, finish);
4455
4456 // Update active cursors inside this block
4457 pthread_spin_lock(&db->cursors_slk);
4458 for (IWKV_cursor c = db->cursors; c; c = c->next) {
4459 if (c->cn && (c->cn->addr == sblk->addr)) {
4460 if (c->cn != sblk) {
4461 memcpy(c->cn, sblk, sizeof(*c->cn));
4462 c->cn->kvblk = 0;
4463 c->cn->flags &= SBLK_PERSISTENT_FLAGS;
4464 }
4465 }
4466 }
4467 pthread_spin_unlock(&db->cursors_slk);
4468
4469 finish:
4470 API_DB_UNLOCK(db, rci, rc);
4471 if (!rc) {
4472 if (opflags & IWKV_SYNC) {
4473 rc = _iwkv_sync(iwkv, 0);
4474 } else {
4475 rc = iwal_poke_checkpoint(iwkv, false);
4476 }
4477 }
4478 return rc ? rc : irc;
4479 }
4480
iwkv_cursor_set(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags)4481 iwrc iwkv_cursor_set(IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags) {
4482 return iwkv_cursor_seth(cur, val, opflags, 0, 0);
4483 }
4484
iwkv_cursor_val(IWKV_cursor cur,IWKV_val * oval)4485 iwrc iwkv_cursor_val(IWKV_cursor cur, IWKV_val *oval) {
4486 return iwkv_cursor_get(cur, 0, oval);
4487 }
4488
iwkv_cursor_key(IWKV_cursor cur,IWKV_val * okey)4489 iwrc iwkv_cursor_key(IWKV_cursor cur, IWKV_val *okey) {
4490 return iwkv_cursor_get(cur, okey, 0);
4491 }
4492
iwkv_cursor_del(IWKV_cursor cur,iwkv_opflags opflags)4493 iwrc iwkv_cursor_del(IWKV_cursor cur, iwkv_opflags opflags) {
4494 int rci;
4495 iwrc rc = 0;
4496 if (!cur || !cur->lx.db) {
4497 return IW_ERROR_INVALID_ARGS;
4498 }
4499 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4500 return IWKV_ERROR_NOTFOUND;
4501 }
4502
4503 uint8_t *mm;
4504 SBLK *sblk = cur->cn;
4505 IWLCTX *lx = &cur->lx;
4506 IWDB db = lx->db;
4507 IWKV iwkv = db->iwkv;
4508 IWFS_FSM *fsm = &iwkv->fsm;
4509
4510 API_DB_WLOCK(db, rci);
4511 if (!db->cache.open) {
4512 rc = _dbcache_fill_lw(lx);
4513 RCGO(rc, finish);
4514 }
4515 if (sblk->pnum == 1) { // sblk will be removed
4516 IWKV_val key = { 0 };
4517 // Key a key
4518 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4519 RCGO(rc, finish2);
4520 if (!sblk->kvblk) {
4521 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4522 fsm->release_mmap(fsm);
4523 RCGO(rc, finish2);
4524 }
4525 rc = _kvblk_key_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key);
4526 fsm->release_mmap(fsm);
4527 RCGO(rc, finish2);
4528
4529 lx->key = &key;
4530 rc = _lx_del_sblk_lw(lx, sblk, cur->cnpos);
4531 lx->key = 0;
4532
4533 finish2:
4534 if (rc) {
4535 _lx_release_mm(lx, 0);
4536 } else {
4537 rc = _lx_release(lx);
4538 }
4539 if (key.data) {
4540 _kv_val_dispose(&key);
4541 }
4542 } else { // Simple case
4543 if (!sblk->kvblk) {
4544 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4545 RCGO(rc, finish);
4546 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4547 fsm->release_mmap(fsm);
4548 RCGO(rc, finish);
4549 }
4550 rc = _sblk_rmkv(sblk, cur->cnpos);
4551 RCGO(rc, finish);
4552 rc = _sblk_sync(lx, sblk);
4553 }
4554
4555 finish:
4556 API_DB_UNLOCK(db, rci, rc);
4557 if (!rc) {
4558 if (opflags & IWKV_SYNC) {
4559 rc = _iwkv_sync(iwkv, 0);
4560 } else {
4561 rc = iwal_poke_checkpoint(iwkv, false);
4562 }
4563 }
4564 return rc;
4565 }
4566
4567 #include "./dbg/iwkvdbg.c"
4568