1 #pragma once
2 #ifndef IWKV_INTERNAL_H
3 #define IWKV_INTERNAL_H
4 
5 #include "iwkv.h"
6 #include "iwlog.h"
7 #include "iwarr.h"
8 #include "iwutils.h"
9 #include "iwfsmfile.h"
10 #include "iwdlsnr.h"
11 #include "iwal.h"
12 #include "khash.h"
13 #include "ksort.h"
14 #include <pthread.h>
15 #include <stdatomic.h>
16 #include <unistd.h>
17 #include "iwcfg.h"
18 
19 #if defined(__APPLE__) || defined(__ANDROID__)
20 #include "pthread_spin_lock_shim.h"
21 #endif
22 
// IWKV magic number (hex bytes spell the ASCII characters "iwkv")
#define IWKV_MAGIC 0x69776b76U

// IWKV backup magic number
#define IWKV_BACKUP_MAGIC 0xBACBAC69U

// IWKV file format version
#define IWKV_FORMAT 2

// IWDB magic number (hex bytes spell the ASCII characters "iwdb")
#define IWDB_MAGIC 0x69776462U

#ifdef IW_32
// Max database file size on 32 bit systems: 2Gb
#define IWKV_MAX_DBSZ 0x7fffffff
#else
// Max database file size: ~512Gb (0x8000000000 minus 0x80 bytes)
#define IWKV_MAX_DBSZ 0x7fffffff80ULL
#endif
42 
// Size of KV fsm block as power of 2 (1 << 7 == 128 bytes)
#define IWKV_FSM_BPOW 7U

// Combination of IWFSM allocation flags used by IWKV
#define IWKV_FSM_ALLOC_FLAGS (IWFSM_ALLOC_NO_OVERALLOCATE | IWFSM_SOLID_ALLOCATED_SPACE | IWFSM_ALLOC_NO_STATS)

// Length of KV fsm header in bytes
#define KVHDRSZ 255U

// SBLK layout with a 116 byte lower key (the `lk` width equals PREFIX_KEY_LEN_V1):
// [u1:flags,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,[pi0:u1,... pi32],n0-n23:u4,lk:u116]:u256

// Maximum length of prefix key to compare, v1 format
#define PREFIX_KEY_LEN_V1 116U

// Maximum length of prefix key to compare, v2 format
#define PREFIX_KEY_LEN_V2 115U

// Number of skip list levels
#define SLEVELS 24U

// Number of elements in the `saa`/`kaa` allocation areas of IWLCTX
#define AANUM (2U * SLEVELS + 2 /* levels + (new block created) + (db block may be updated) */)

// Lower key length in SBLK
#define SBLK_LKLEN PREFIX_KEY_LEN_V2

// Size of database start block in bytes (two fsm blocks: 256)
#define DB_SZ (2U * (1U << IWKV_FSM_BPOW))

// Size of `SBLK` in bytes (two fsm blocks: 256)
#define SBLK_SZ (2U * (1U << IWKV_FSM_BPOW))

// Number of SBLK blocks in one page
#define SBLK_PAGE_SBLK_NUM_V2 16U

// Size of page with adjacent SBLK blocks: 16 * 256 == 4096 bytes.
// Data format version: v2
#define SBLK_PAGE_SZ_V2 (SBLK_PAGE_SBLK_NUM_V2 * SBLK_SZ)
78 
// Number of `KV` blocks in KVBLK
#define KVBLK_IDXNUM 32U

// Initial `KVBLK` size power of 2 (1 << 9 == 512 bytes)
#define KVBLK_INISZPOW 9U

// KVBLK header size: blen:u1,idxsz:u2
#define KVBLK_HDRSZ 3U

// Max kvp offset bytes
#define KVP_MAX_OFF_VLEN 8U

// Max kvp len 0xfffffffULL bytes
#define KVP_MAX_LEN_VLEN 5U

// Upper bound of the KV pairs index size: (offset + length bytes) per slot * number of slots == 416
#define KVBLK_MAX_IDX_SZ ((KVP_MAX_OFF_VLEN + KVP_MAX_LEN_VLEN) * KVBLK_IDXNUM)

// Max non KV size [blen:u1,idxsz:u2,[ps1:vn,pl1:vn,...,ps32,pl32]] == 419
#define KVBLK_MAX_NKV_SZ (KVBLK_HDRSZ + KVBLK_MAX_IDX_SZ)
98 
// Converts a byte address into a block number: address >> IWKV_FSM_BPOW, narrowed to `blkn_t`
#define ADDR2BLK(addr_) ((blkn_t) (((uint64_t) (addr_)) >> IWKV_FSM_BPOW))

// Converts a block number into a 64-bit byte address: block << IWKV_FSM_BPOW
#define BLK2ADDR(blk_) (((uint64_t) (blk_)) << IWKV_FSM_BPOW)

struct _IWKV;
struct _IWDB;

typedef uint32_t blkn_t; // Block number
typedef uint32_t dbid_t; // Database identifier
108 
/* Key/Value pair stored in `KVBLK` */
typedef struct KV {
  size_t   keysz;  /**< Key size in bytes */
  size_t   valsz;  /**< Value size in bytes */
  uint8_t *key;    /**< Key data */
  uint8_t *val;    /**< Value data */
} KV;
116 
/* Key/Value (KV) index entry: offset and length. */
typedef struct KVP {
  off_t    off;   /**< KV block offset relative to `end` of KVBLK */
  uint32_t len;   /**< Length of kv pair block */
  uint8_t  ridx;  /**< Position of the actually persisted slot in `KVBLK` */
} KVP;
123 
/* `KVBLK` state flags */
typedef uint8_t kvblk_flags_t;
#define KVBLK_DEFAULT ((kvblk_flags_t) 0x00U)
/** KVBLK data is dirty and should be flushed to mm */
#define KVBLK_DURTY ((kvblk_flags_t) 0x01U)

/* Options of the kv pair removal operation on `KVBLK` */
typedef uint8_t kvblk_rmkv_opts_t;
#define RMKV_SYNC      ((kvblk_rmkv_opts_t) 0x01U)
#define RMKV_NO_RESIZE ((kvblk_rmkv_opts_t) 0x02U)

/* `SBLK` flags. Only SBLK_FULL_LKEY is documented as persistent. */
typedef uint8_t sblk_flags_t;
/** The lowest `SBLK` key is fully contained in `SBLK`. Persistent flag. */
#define SBLK_FULL_LKEY ((sblk_flags_t) 0x01U)
/** This block is the start database block. */
#define SBLK_DB ((sblk_flags_t) 0x08U)
/** Block data changed, block marked as dirty and needs to be persisted */
#define SBLK_DURTY ((sblk_flags_t) 0x10U)
/** Put this `SBLK` into dbcache */
#define SBLK_CACHE_PUT    ((sblk_flags_t) 0x20U)
/** Update this `SBLK` in dbcache */
#define SBLK_CACHE_UPDATE ((sblk_flags_t) 0x40U)
/** Remove this `SBLK` from dbcache */
#define SBLK_CACHE_REMOVE ((sblk_flags_t) 0x80U)
144 
/* Kind of operation a lookup context (`IWLCTX::op`) is performing */
typedef uint8_t iwlctx_op_t;
/** Put key value operation */
#define IWLCTX_PUT ((iwlctx_op_t) 0x01U)
/** Delete key operation.
 *  NOTE(review): previously defined as 0x01U, i.e. indistinguishable from IWLCTX_PUT.
 *  Given a distinct value so the two operations can be told apart; verify users of `IWLCTX::op`. */
#define IWLCTX_DEL ((iwlctx_op_t) 0x02U)
150 
151 /* KVBLK: [szpow:u1,idxsz:u2,[ps0:vn,pl0:vn,..., ps32,pl32]____[[KV],...]] */
152 typedef struct KVBLK {
153   IWDB     db;
154   off_t    addr;              /**< Block address */
155   off_t    maxoff;            /**< Max pair offset */
156   uint16_t idxsz;             /**< Size of KV pairs index in bytes */
157   int8_t   zidx;              /**< Index of first empty pair slot (zero index), or -1 */
158   uint8_t  szpow;             /**< Block size as power of 2 */
159   kvblk_flags_t flags;        /**< Flags */
160   KVP pidx[KVBLK_IDXNUM];     /**< KV pairs index */
161 } KVBLK;
162 
// Subset of `sblk_flags_t` marked as persistent
#define SBLK_PERSISTENT_FLAGS (SBLK_FULL_LKEY)
// Subset of `sblk_flags_t` describing pending dbcache actions
#define SBLK_CACHE_FLAGS      (SBLK_CACHE_UPDATE | SBLK_CACHE_PUT | SBLK_CACHE_REMOVE)

// Number of top levels to cache (~ (1<<DBCACHE_LEVELS) cached elements)
#define DBCACHE_LEVELS 10U

// Minimal cached level
#define DBCACHE_MIN_LEVEL 5U

// Single allocation step - number of DBCNODEs
#define DBCACHE_ALLOC_STEP 32U
174 
175 /** Cached SBLK node */
176 typedef struct DBCNODE {
177   blkn_t  sblkn;              /**< SBLK block number or used to store key size (to keep DBCNODE compact) */
178   blkn_t  kblkn;              /**< KVBLK block number */
179   uint8_t lkl;                /**< Lower key length */
180   uint8_t fullkey;            /**< SBLK is full key */
181   uint8_t k0idx;              /**< KVBLK Zero KVP index */
182   uint8_t pad;                /**< 1 byte pad */
183   uint8_t lk[1];              /**< Lower key buffer */
184 } DBCNODE;
185 
186 #define DBCNODE_VNUM_SZ 24
187 #define DBCNODE_STR_SZ  128
188 
189 static_assert(DBCNODE_VNUM_SZ >= offsetof(DBCNODE, lk) + IW_VNUMBUFSZ,
190               "DBCNODE_VNUM_SZ >= offsetof(DBCNODE, lk) + IW_VNUMBUFSZ");
191 static_assert(DBCNODE_STR_SZ >= offsetof(DBCNODE, lk) + SBLK_LKLEN,
192               "DBCNODE_STR_SZ >= offsetof(DBCNODE, lk) + SBLK_LKLEN");
193 
194 /** Tallest SBLK nodes cache */
195 typedef struct DBCACHE {
196   size_t   asize;               /**< Size of allocated cache buffer */
197   size_t   num;                 /**< Actual number of nodes */
198   size_t   nsize;               /**< Cached node size */
199   uint8_t  lvl;                 /**< Lowes cached level */
200   bool     open;                /**< Is cache open */
201   DBCNODE *nodes;               /**< Sorted nodes array */
202 } DBCACHE;
203 
204 struct _IWKV_cursor;
205 
206 /* Database: [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4]:209 */
207 struct _IWDB {
208   // SBH
209   IWDB  db;                       /**< Database ref */
210   off_t addr;                     /**< Database block address */
211   sblk_flags_t flags;             /**< Flags */
212   // !SBH
213   IWKV    iwkv;
214   DBCACHE cache;                      /**< SBLK nodes cache */
215   pthread_rwlock_t   rwl;             /**< Database API RW lock */
216   pthread_spinlock_t cursors_slk;     /**< Cursors set guard lock */
217   off_t next_db_addr;                 /**< Next IWDB addr */
218   struct _IWKV_cursor *cursors;       /**< Active (currently in-use) database cursors */
219   struct _IWDB *next;                 /**< Next IWDB meta */
220   struct _IWDB *prev;                 /**< Prev IWDB meta */
221   dbid_t id;                          /**< Database ID */
222   volatile int32_t wk_count;          /**< Number of active database workers */
223   blkn_t       meta_blk;              /**< Database meta block number */
224   blkn_t       meta_blkn;             /**< Database meta length (number of blocks) */
225   iwdb_flags_t dbflg;                 /**< Database specific flags */
226   atomic_bool  open;                  /**< True if DB is in OPEN state */
227   volatile bool wk_pending_exclusive; /**< If true someone wants to acquire exclusive lock on IWDB */
228   uint32_t      lcnt[SLEVELS];        /**< SBLK count per level */
229 };
230 
231 /* Skiplist block: [u1:flags,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,[pi0:u1,... pi32],n0-n23:u4,lk:u116]:u256 // SBLK */
232 typedef struct SBLK {
233   // SBH
234   IWDB  db;                   /**< Database ref */
235   off_t addr;                 /**< Block address */
236   sblk_flags_t flags;         /**< Flags */
237   uint8_t      lvl;           /**< Skip list node level */
238   uint8_t      bpos;          /**< Position of SBLK in a page block starting with 1 (zero means SBLK deleted) */
239   blkn_t       p0;            /**< Prev node, if IWDB it is the last node */
240   blkn_t       n[SLEVELS];    /**< Next nodes */
241   // !SBH
242   KVBLK  *kvblk;                 /**< Associated KVBLK */
243   blkn_t  kvblkn;                /**< Associated KVBLK block number */
244   int8_t  pnum;                  /**< Number of active kv indexes in `SBLK::pi` */
245   uint8_t lkl;                   /**< Lower key length within a buffer */
246   uint8_t pi[KVBLK_IDXNUM];      /**< Sorted KV slots, value is an index of kv slot in `KVBLK` */
247   uint8_t lk[PREFIX_KEY_LEN_V1]; /**< Lower key buffer */
248 } SBLK;
249 
250 // -V:KHASH_MAP_INIT_INT:522
251 KHASH_MAP_INIT_INT(DBS, IWDB)
252 
253 /** IWKV instance */
254 struct _IWKV {
255   IWFS_FSM fsm;                          /**< FSM pool */
256   pthread_rwlock_t rwl;                  /**< API RW lock */
257   iwrc     fatalrc;                      /**< Fatal error occuried, no farther operations can be performed */
258   IWDB     first_db;                     /**< First database in chain */
259   IWDB     last_db;                      /**< Last database in chain */
260   IWDLSNR *dlsnr;                        /**< WAL data events listener */
261   khash_t(DBS) * dbs;                    /**< Database id -> IWDB mapping */
262   iwkv_openflags  oflags;                /**< Open flags */
263   pthread_cond_t  wk_cond;               /**< Workers cond variable */
264   pthread_mutex_t wk_mtx;                /**< Workers cond mutext */
265   int32_t fmt_version;                   /**< Database format version */
266   int32_t pklen;                         /**< Prefix key length in use */
267   volatile int32_t wk_count;             /**< Number of active workers */
268   volatile bool    wk_pending_exclusive; /**< If true someone wants to acquire exclusive lock on IWKV */
269   atomic_bool      open;                 /**< True if kvstore is in OPEN state */
270 };
271 
272 /** Database lookup context */
273 typedef struct IWLCTX {
274   IWDB db;
275   const IWKV_val *key;        /**< Search key */
276   IWKV_val       *val;        /**< Update value */
277   SBLK *lower;                /**< Next to upper bound block */
278   SBLK *upper;                /**< Upper bound block */
279   SBLK *nb;                   /**< New block */
280   off_t destroy_addr;         /**< Block to destroy address */
281   off_t upper_addr;           /**< Upper block address used in `_lx_del_lr()` */
282 #ifndef NDEBUG
283   uint32_t num_cmps;
284 #endif
285   iwkv_opflags opflags;       /**< Operation flags */
286   sblk_flags_t sbflags;       /**< `SBLK` flags applied to all new/looked blocks in this context */
287   iwlctx_op_t  op;            /**< Context operation */
288   uint8_t      saan;          /**< Position of next free `SBLK` element in the `saa` area */
289   uint8_t      kaan;          /**< Position of next free `KVBLK` element in the `kaa` area */
290   int8_t       nlvl;          /**< Level of new inserted/deleted `SBLK` node. -1 if no new node inserted/deleted */
291   int8_t       cache_reload;  /**< If true dbcache should be refreshed after operation */
292   IWKV_PUT_HANDLER ph;        /**< Optional put handler */
293   void    *phop;              /**< Put handler opaque data */
294   SBLK    *plower[SLEVELS];   /**< Pinned lower nodes per level */
295   SBLK    *pupper[SLEVELS];   /**< Pinned upper nodes per level */
296   IWKV_val ekey;
297   SBLK     dblk;              /**< First database block */
298   SBLK     saa[AANUM];        /**< `SBLK` allocation area */
299   KVBLK    kaa[AANUM];        /**< `KVBLK` allocation area */
300   uint8_t  nbuf[IW_VNUMBUFSZ];
301   uint8_t  incbuf[8];         /**< Buffer used to store incremented/decremented values `IWKV_VAL_INCREMENT` opflag */
302 } IWLCTX;
303 
304 /** Cursor context */
305 struct _IWKV_cursor {
306   uint8_t cnpos;              /**< Position in the current `SBLK` node */
307   bool    closed;             /**< Cursor closed */
308   int8_t  skip_next;          /**< When to skip next IWKV_CURSOR_NEXT|IWKV_CURSOR_PREV cursor move
309                                    due to the side effect of `iwkv_cursor_del()` call.
310                                    If `skip_next > 0` `IWKV_CURSOR_NEXT` will be skipped
311                                    If `skip_next < 0` `IWKV_CURSOR_PREV` will be skipped */
312   SBLK *cn;                   /**< Current `SBLK` node */
313   struct _IWKV_cursor *next;  /**< Next cursor in active db cursors chain */
314   off_t  dbaddr;              /**< Database address used as `cn` */
315   IWLCTX lx;                  /**< Lookup context */
316 };
317 
/**
 * Returns from the ENCLOSING function when `iwkv_` is not usable:
 *  - `IW_ERROR_INVALID_STATE` if `iwkv_` is NULL or not open,
 *  - `iwkv_->fatalrc` if a fatal error code is set.
 * The argument is evaluated more than once. Wrapped in `do { } while (0)` so it
 * behaves as a single statement (safe under an unbraced `if`/`else`).
 */
#define ENSURE_OPEN(iwkv_)                                            \
  do {                                                                \
    if (!(iwkv_) || !((iwkv_)->open)) return IW_ERROR_INVALID_STATE;  \
    if ((iwkv_)->fatalrc) return (iwkv_)->fatalrc;                    \
  } while (0)

/**
 * Returns `IW_ERROR_INVALID_STATE` from the ENCLOSING function if `db_` is NULL,
 * has no `iwkv`, is not open, or its `iwkv` is not open.
 * Unlike ENSURE_OPEN it does not inspect `fatalrc`.
 */
#define ENSURE_OPEN_DB(db_)                                                   \
  do {                                                                        \
    if (!(db_) || !(db_)->iwkv || !(db_)->open || !((db_)->iwkv->open)) {     \
      return IW_ERROR_INVALID_STATE;                                          \
    }                                                                         \
  } while (0)
324 
325 #define API_RLOCK(iwkv_, rci_) \
326   ENSURE_OPEN(iwkv_);  \
327   (rci_) = pthread_rwlock_rdlock(&(iwkv_)->rwl); \
328   if (rci_) return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_)
329 
_api_rlock(IWKV iwkv)330 IW_INLINE iwrc _api_rlock(IWKV iwkv) {
331   int rci;
332   API_RLOCK(iwkv, rci);
333   return 0;
334 }
335 
336 #define API_WLOCK(iwkv_, rci_) \
337   ENSURE_OPEN(iwkv_);  \
338   (rci_) = pthread_rwlock_wrlock(&(iwkv_)->rwl); \
339   if (rci_) return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_)
340 
_api_wlock(IWKV iwkv)341 IW_INLINE iwrc _api_wlock(IWKV iwkv) {
342   int rci;
343   API_WLOCK(iwkv, rci);
344   return 0;
345 }
346 
/**
 * Releases the IWKV API lock (`iwkv_->rwl`).
 * The unlock result is stored in `rci_`; when non-zero it is converted to
 * `IW_ERROR_THREADING_ERRNO` and handed to `IWRC` together with `rc_`.
 * Does not return from the enclosing function.
 * Wrapped in `do { } while (0)` so it expands to a single statement.
 */
#define API_UNLOCK(iwkv_, rci_, rc_)                                      \
  do {                                                                    \
    (rci_) = pthread_rwlock_unlock(&(iwkv_)->rwl);                        \
    if (rci_) IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_), rc_);  \
  } while (0)
350 
351 #define API_DB_RLOCK(db_, rci_)                               \
352   do {                                                        \
353     API_RLOCK((db_)->iwkv, rci_);                             \
354     (rci_) = pthread_rwlock_rdlock(&(db_)->rwl);                \
355     if (rci_) {                                               \
356       pthread_rwlock_unlock(&(db_)->iwkv->rwl);               \
357       return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_);  \
358     }                                                         \
359   } while (0)
360 
_api_db_rlock(IWDB db)361 IW_INLINE iwrc _api_db_rlock(IWDB db) {
362   int rci;
363   API_DB_RLOCK(db, rci);
364   return 0;
365 }
366 
367 #define API_DB_WLOCK(db_, rci_)                               \
368   do {                                                        \
369     API_RLOCK((db_)->iwkv, rci_);                             \
370     (rci_) = pthread_rwlock_wrlock(&(db_)->rwl);                \
371     if (rci_) {                                               \
372       pthread_rwlock_unlock(&(db_)->iwkv->rwl);               \
373       return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_);  \
374     }                                                         \
375   } while (0)
376 
_api_db_wlock(IWDB db)377 IW_INLINE iwrc _api_db_wlock(IWDB db) {
378   int rci;
379   API_DB_WLOCK(db, rci);
380   return 0;
381 }
382 
/**
 * Releases the database lock `db_->rwl` and then the IWKV API lock (via API_UNLOCK),
 * i.e. the reverse of the acquisition order used by API_DB_RLOCK / API_DB_WLOCK.
 * A failed database unlock is passed to `IWRC` with `rc_`; the IWKV lock is
 * released regardless. Does not return from the enclosing function.
 */
#define API_DB_UNLOCK(db_, rci_, rc_)                                     \
  do {                                                                    \
    (rci_) = pthread_rwlock_unlock(&(db_)->rwl);                          \
    if (rci_) IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_), rc_);  \
    API_UNLOCK((db_)->iwkv, rci_, rc_);                                   \
  } while (0)
389 
/**
 * Advances the allocation area position `aan_` by one; any value at or beyond
 * `AANUM - 1` is reset to 0. `aan_` must be a modifiable lvalue and is
 * evaluated more than once.
 */
#define AAPOS_INC(aan_)         \
  do {                          \
    if ((aan_) < AANUM - 1) {   \
      (aan_) = (aan_) + 1;      \
    } else {                    \
      (aan_) = 0;               \
    }                           \
  } while (0)
398 
399 
400 // SBLK
401 // [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],bpos:u1,lk:u115]:u256
402 
403 #define SOFF_FLAGS_U1   0
404 #define SOFF_LVL_U1     (SOFF_FLAGS_U1 + 1)
405 #define SOFF_LKL_U1     (SOFF_LVL_U1 + 1)
406 #define SOFF_PNUM_U1    (SOFF_LKL_U1 + 1)
407 #define SOFF_P0_U4      (SOFF_PNUM_U1 + 1)
408 #define SOFF_KBLK_U4    (SOFF_P0_U4 + 4)
409 #define SOFF_PI0_U1     (SOFF_KBLK_U4 + 4)
410 #define SOFF_N0_U4      (SOFF_PI0_U1 + 1 * KVBLK_IDXNUM)
411 #define SOFF_BPOS_U1_V2 (SOFF_N0_U4 + 4 * SLEVELS)
412 #define SOFF_LK_V2      (SOFF_BPOS_U1_V2 + 1)
413 #define SOFF_LK_V1      (SOFF_N0_U4 + 4 * SLEVELS)
414 #define SOFF_END        (SOFF_LK_V2 + SBLK_LKLEN)
415 static_assert(SOFF_END == 256, "SOFF_END == 256");
416 static_assert(SBLK_SZ >= SOFF_END, "SBLK_SZ >= SOFF_END");
417 
418 // DB
419 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
420 #define DOFF_MAGIC_U4    0
421 #define DOFF_DBFLG_U1    (DOFF_MAGIC_U4 + 4)
422 #define DOFF_DBID_U4     (DOFF_DBFLG_U1 + 1)
423 #define DOFF_NEXTDB_U4   (DOFF_DBID_U4 + 4)
424 #define DOFF_P0_U4       (DOFF_NEXTDB_U4 + 4)
425 #define DOFF_N0_U4       (DOFF_P0_U4 + 4)
426 #define DOFF_C0_U4       (DOFF_N0_U4 + 4 * SLEVELS)
427 #define DOFF_METABLK_U4  (DOFF_C0_U4 + 4 * SLEVELS)
428 #define DOFF_METABLKN_U4 (DOFF_METABLK_U4 + 4)
429 #define DOFF_END         (DOFF_METABLKN_U4 + 4)
430 static_assert(DOFF_END == 217, "DOFF_END == 217");
431 static_assert(DB_SZ >= DOFF_END, "DB_SZ >= DOFF_END");
432 
// KVBLK
// [szpow:u1,idxsz:u2,[ps1:vn,pl1:vn,...,ps32,pl32]____[[_KV],...]] // KVBLK
// Byte offset of the `szpow` field (first byte of the block)
#define KBLK_SZPOW_OFF 0
436 
437 
438 iwrc iwkv_exclusive_lock(IWKV iwkv);
439 iwrc iwkv_exclusive_unlock(IWKV iwkv);
440 void iwkvd_trigger_xor(uint64_t val);
441 void iwkvd_kvblk(FILE *f, KVBLK *kb, int maxvlen);
442 iwrc iwkvd_sblk(FILE *f, IWLCTX *lx, SBLK *sb, int flags);
443 void iwkvd_db(FILE *f, IWDB db, int flags, int plvl);
444 
445 // IWKVD Trigger commands
446 #ifdef IW_TESTS
447 #define IWKVD_WAL_NO_CHECKPOINT_ON_CLOSE 1UL
448 #endif
449 
450 #endif
451