1 /* $OpenBSD: btree.c,v 1.38 2017/05/26 21:23:14 sthen Exp $ */
2
3 /*
4 * Copyright (c) 2009, 2010 Martin Hedenfalk <martin@bzero.se>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 #include <sys/types.h>
20 #include <sys/tree.h>
21 #include <sys/stat.h>
22 #include <sys/queue.h>
23 #include <sys/uio.h>
24
25 #include <assert.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <stddef.h>
30 #include <stdint.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <time.h>
35 #include <unistd.h>
36
37 #include "btree.h"
38
39 /* #define DEBUG */
40
41 #ifdef DEBUG
42 # define DPRINTF(...) do { fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
43 fprintf(stderr, __VA_ARGS__); \
44 fprintf(stderr, "\n"); } while(0)
45 #else
46 # define DPRINTF(...)
47 #endif
48
49 #define PAGESIZE 4096
50 #define BT_MINKEYS 4
51 #define BT_MAGIC 0xB3DBB3DB
52 #define BT_VERSION 4
53 #define MAXKEYSIZE 255
54
55 #define P_INVALID 0xFFFFFFFF
56
57 #define F_ISSET(w, f) (((w) & (f)) == (f))
58 #define MINIMUM(a,b) ((a) < (b) ? (a) : (b))
59
60 typedef uint32_t pgno_t;
61 typedef uint16_t indx_t;
62
63 /* There are four page types: meta, branch (index), leaf and overflow,
64 * plus a single header page. They all share the same page header.
65 */
66 struct page { /* represents an on-disk page */
67 pgno_t pgno; /* page number */
68 #define P_BRANCH 0x01 /* branch page */
69 #define P_LEAF 0x02 /* leaf page */
70 #define P_OVERFLOW 0x04 /* overflow page */
71 #define P_META 0x08 /* meta page */
72 #define P_HEAD 0x10 /* header page */
73 uint32_t flags;
74 #define lower b.fb.fb_lower
75 #define upper b.fb.fb_upper
76 #define p_next_pgno b.pb_next_pgno
77 union page_bounds {
78 struct {
79 indx_t fb_lower; /* lower bound of free space */
80 indx_t fb_upper; /* upper bound of free space */
81 } fb;
82 pgno_t pb_next_pgno; /* overflow page linked list */
83 } b;
84 indx_t ptrs[1]; /* dynamic size */
85 } __packed;
86
87 #define PAGEHDRSZ offsetof(struct page, ptrs)
88
89 #define NUMKEYSP(p) (((p)->lower - PAGEHDRSZ) >> 1)
90 #define NUMKEYS(mp) (((mp)->page->lower - PAGEHDRSZ) >> 1)
91 #define SIZELEFT(mp) (indx_t)((mp)->page->upper - (mp)->page->lower)
92 #define PAGEFILL(bt, mp) (1000 * ((bt)->head.psize - PAGEHDRSZ - SIZELEFT(mp)) / \
93 ((bt)->head.psize - PAGEHDRSZ))
94 #define IS_LEAF(mp) F_ISSET((mp)->page->flags, P_LEAF)
95 #define IS_BRANCH(mp) F_ISSET((mp)->page->flags, P_BRANCH)
96 #define IS_OVERFLOW(mp) F_ISSET((mp)->page->flags, P_OVERFLOW)
97
98 struct bt_head { /* header page content */
99 uint32_t magic;
100 uint32_t version;
101 uint32_t flags;
102 uint32_t psize; /* page size */
103 } __packed;
104
105 struct bt_meta { /* meta (footer) page content */
106 #define BT_TOMBSTONE 0x01 /* file is replaced */
107 uint32_t flags;
108 pgno_t root; /* page number of root page */
109 pgno_t prev_meta; /* previous meta page number */
110 time_t created_at;
111 uint32_t branch_pages;
112 uint32_t leaf_pages;
113 uint32_t overflow_pages;
114 uint32_t revisions;
115 uint32_t depth;
116 uint64_t entries;
117 unsigned char hash[SHA_DIGEST_LENGTH];
118 } __packed;
119
120 struct btkey {
121 size_t len;
122 char str[MAXKEYSIZE];
123 };
124
125 struct mpage { /* an in-memory cached page */
126 RB_ENTRY(mpage) entry; /* page cache entry */
127 SIMPLEQ_ENTRY(mpage) next; /* queue of dirty pages */
128 TAILQ_ENTRY(mpage) lru_next; /* LRU queue */
129 struct mpage *parent; /* NULL if root */
130 unsigned int parent_index; /* keep track of node index */
131 struct btkey prefix;
132 struct page *page;
133 pgno_t pgno; /* copy of page->pgno */
134 short ref; /* increased by cursors */
135 short dirty; /* 1 if on dirty queue */
136 };
137 RB_HEAD(page_cache, mpage);
138 SIMPLEQ_HEAD(dirty_queue, mpage);
139 TAILQ_HEAD(lru_queue, mpage);
140
141 static int mpage_cmp(struct mpage *a, struct mpage *b);
142 static struct mpage *mpage_lookup(struct btree *bt, pgno_t pgno);
143 static void mpage_add(struct btree *bt, struct mpage *mp);
144 static void mpage_free(struct mpage *mp);
145 static void mpage_del(struct btree *bt, struct mpage *mp);
146 static void mpage_flush(struct btree *bt);
147 static struct mpage *mpage_copy(struct btree *bt, struct mpage *mp);
148 static void mpage_prune(struct btree *bt);
149 static void mpage_dirty(struct btree *bt, struct mpage *mp);
150 static struct mpage *mpage_touch(struct btree *bt, struct mpage *mp);
151
152 RB_PROTOTYPE(page_cache, mpage, entry, mpage_cmp);
153 RB_GENERATE(page_cache, mpage, entry, mpage_cmp);
154
155 struct ppage { /* ordered list of pages */
156 SLIST_ENTRY(ppage) entry;
157 struct mpage *mpage;
158 unsigned int ki; /* cursor index on page */
159 };
160 SLIST_HEAD(page_stack, ppage);
161
162 #define CURSOR_EMPTY(c) SLIST_EMPTY(&(c)->stack)
163 #define CURSOR_TOP(c) SLIST_FIRST(&(c)->stack)
164 #define CURSOR_POP(c) SLIST_REMOVE_HEAD(&(c)->stack, entry)
165 #define CURSOR_PUSH(c,p) SLIST_INSERT_HEAD(&(c)->stack, p, entry)
166
167 struct cursor {
168 struct btree *bt;
169 struct btree_txn *txn;
170 struct page_stack stack; /* stack of parent pages */
171 short initialized; /* 1 if initialized */
172 short eof; /* 1 if end is reached */
173 };
174
175 #define METAHASHLEN offsetof(struct bt_meta, hash)
176 #define METADATA(p) ((void *)((char *)p + PAGEHDRSZ))
177
178 struct node {
179 #define n_pgno p.np_pgno
180 #define n_dsize p.np_dsize
181 union {
182 pgno_t np_pgno; /* child page number */
183 uint32_t np_dsize; /* leaf data size */
184 } p;
185 uint16_t ksize; /* key size */
186 #define F_BIGDATA 0x01 /* data put on overflow page */
187 uint8_t flags;
188 char data[1];
189 } __packed;
190
191 struct btree_txn {
192 pgno_t root; /* current / new root page */
193 pgno_t next_pgno; /* next unallocated page */
194 struct btree *bt; /* btree is ref'd */
195 struct dirty_queue *dirty_queue; /* modified pages */
196 #define BT_TXN_RDONLY 0x01 /* read-only transaction */
197 #define BT_TXN_ERROR 0x02 /* an error has occurred */
198 unsigned int flags;
199 };
200
201 struct btree {
202 int fd;
203 char *path;
204 #define BT_FIXPADDING 0x01 /* internal */
205 unsigned int flags;
206 bt_cmp_func cmp; /* user compare function */
207 struct bt_head head;
208 struct bt_meta meta;
209 struct page_cache *page_cache;
210 struct lru_queue *lru_queue;
211 struct btree_txn *txn; /* current write transaction */
212 int ref; /* increased by cursors & txn */
213 struct btree_stat stat;
214 off_t size; /* current file size */
215 };
216
217 #define NODESIZE offsetof(struct node, data)
218
219 #define INDXSIZE(k) (NODESIZE + ((k) == NULL ? 0 : (k)->size))
220 #define LEAFSIZE(k, d) (NODESIZE + (k)->size + (d)->size)
221 #define NODEPTRP(p, i) ((struct node *)((char *)(p) + (p)->ptrs[i]))
222 #define NODEPTR(mp, i) NODEPTRP((mp)->page, i)
223 #define NODEKEY(node) (void *)((node)->data)
224 #define NODEDATA(node) (void *)((char *)(node)->data + (node)->ksize)
225 #define NODEPGNO(node) ((node)->p.np_pgno)
226 #define NODEDSZ(node) ((node)->p.np_dsize)
227
228 #define BT_COMMIT_PAGES 64 /* max number of pages to write in one commit */
229 #define BT_MAXCACHE_DEF 1024 /* max number of pages to keep in cache */
230
231 static int btree_read_page(struct btree *bt, pgno_t pgno,
232 struct page *page);
233 static struct mpage *btree_get_mpage(struct btree *bt, pgno_t pgno);
234 static int btree_search_page_root(struct btree *bt,
235 struct mpage *root, struct btval *key,
236 struct cursor *cursor, int modify,
237 struct mpage **mpp);
238 static int btree_search_page(struct btree *bt,
239 struct btree_txn *txn, struct btval *key,
240 struct cursor *cursor, int modify,
241 struct mpage **mpp);
242
243 static int btree_write_header(struct btree *bt, int fd);
244 static int btree_read_header(struct btree *bt);
245 static int btree_is_meta_page(struct page *p);
246 static int btree_read_meta(struct btree *bt, pgno_t *p_next);
247 static int btree_write_meta(struct btree *bt, pgno_t root,
248 unsigned int flags);
249 static void btree_ref(struct btree *bt);
250
251 static struct node *btree_search_node(struct btree *bt, struct mpage *mp,
252 struct btval *key, int *exactp, unsigned int *kip);
253 static int btree_add_node(struct btree *bt, struct mpage *mp,
254 indx_t indx, struct btval *key, struct btval *data,
255 pgno_t pgno, uint8_t flags);
256 static void btree_del_node(struct btree *bt, struct mpage *mp,
257 indx_t indx);
258 static int btree_read_data(struct btree *bt, struct mpage *mp,
259 struct node *leaf, struct btval *data);
260
261 static int btree_rebalance(struct btree *bt, struct mpage *mp);
262 static int btree_update_key(struct btree *bt, struct mpage *mp,
263 indx_t indx, struct btval *key);
264 static int btree_adjust_prefix(struct btree *bt,
265 struct mpage *src, int delta);
266 static int btree_move_node(struct btree *bt, struct mpage *src,
267 indx_t srcindx, struct mpage *dst, indx_t dstindx);
268 static int btree_merge(struct btree *bt, struct mpage *src,
269 struct mpage *dst);
270 static int btree_split(struct btree *bt, struct mpage **mpp,
271 unsigned int *newindxp, struct btval *newkey,
272 struct btval *newdata, pgno_t newpgno);
273 static struct mpage *btree_new_page(struct btree *bt, uint32_t flags);
274 static int btree_write_overflow_data(struct btree *bt,
275 struct page *p, struct btval *data);
276
277 static void cursor_pop_page(struct cursor *cursor);
278 static struct ppage *cursor_push_page(struct cursor *cursor,
279 struct mpage *mp);
280
281 static int bt_set_key(struct btree *bt, struct mpage *mp,
282 struct node *node, struct btval *key);
283 static int btree_sibling(struct cursor *cursor, int move_right);
284 static int btree_cursor_next(struct cursor *cursor,
285 struct btval *key, struct btval *data);
286 static int btree_cursor_set(struct cursor *cursor,
287 struct btval *key, struct btval *data, int *exactp);
288 static int btree_cursor_first(struct cursor *cursor,
289 struct btval *key, struct btval *data);
290
291 static void bt_reduce_separator(struct btree *bt, struct node *min,
292 struct btval *sep);
293 static void remove_prefix(struct btree *bt, struct btval *key,
294 size_t pfxlen);
295 static void expand_prefix(struct btree *bt, struct mpage *mp,
296 indx_t indx, struct btkey *expkey);
297 static void concat_prefix(struct btree *bt, char *s1, size_t n1,
298 char *s2, size_t n2, char *cs, size_t *cn);
299 static void common_prefix(struct btree *bt, struct btkey *min,
300 struct btkey *max, struct btkey *pfx);
301 static void find_common_prefix(struct btree *bt, struct mpage *mp);
302
303 static size_t bt_leaf_size(struct btree *bt, struct btval *key,
304 struct btval *data);
305 static size_t bt_branch_size(struct btree *bt, struct btval *key);
306
307 static pgno_t btree_compact_tree(struct btree *bt, pgno_t pgno,
308 struct btree *btc);
309
310 static int memncmp(const void *s1, size_t n1,
311 const void *s2, size_t n2);
312 static int memnrcmp(const void *s1, size_t n1,
313 const void *s2, size_t n2);
314
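/* Compare two byte strings of possibly different length. If one string
 * is a prefix of the other, the shorter string sorts first.
 */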
315 static int
316 memncmp(const void *s1, size_t n1, const void *s2, size_t n2)
317 {
318 if (n1 < n2) {
319 if (memcmp(s1, s2, n1) == 0)
320 return -1;
321 }
322 else if (n1 > n2) {
323 if (memcmp(s1, s2, n2) == 0)
324 return 1;
325 }
326 return memcmp(s1, s2, n1);
327 }
328
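/* Compare two byte strings starting from their last byte and moving
 * backwards, as used for reverse-key (BT_REVERSEKEY) databases. An empty
 * string sorts before a non-empty one.
 */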
329 static int
330 memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2)
331 {
332 const unsigned char *p1;
333 const unsigned char *p2;
334
335 if (n1 == 0)
336 return n2 == 0 ? 0 : -1;
337
338 if (n2 == 0)
339 return n1 == 0 ? 0 : 1;
340
341 p1 = (const unsigned char *)s1 + n1 - 1;
342 p2 = (const unsigned char *)s2 + n2 - 1;
343
344 while (*p1 == *p2) {
345 if (p1 == s1)
346 return (p2 == s2) ? 0 : -1;
347 if (p2 == s2)
348 return (p1 == p2) ? 0 : 1;
349 p1--;
350 p2--;
351 }
352 return *p1 - *p2;
353 }
354
355 int
356 btree_cmp(struct btree *bt, const struct btval *a, const struct btval *b)
357 {
358 return bt->cmp(a, b);
359 }
360
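/* Compute the longest prefix (or suffix, for reverse-key databases)
 * shared by the min and max keys and store it in pfx.
 */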
361 static void
362 common_prefix(struct btree *bt, struct btkey *min, struct btkey *max,
363 struct btkey *pfx)
364 {
365 size_t n = 0;
366 char *p1;
367 char *p2;
368
369 if (min->len == 0 || max->len == 0) {
370 pfx->len = 0;
371 return;
372 }
373
374 if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
375 p1 = min->str + min->len - 1;
376 p2 = max->str + max->len - 1;
377
378 while (*p1 == *p2) {
379 if (p1 < min->str || p2 < max->str)
380 break;
381 p1--;
382 p2--;
383 n++;
384 }
385
386 assert(n <= (int)sizeof(pfx->str));
387 pfx->len = n;
388 bcopy(p2 + 1, pfx->str, n);
389 } else {
390 p1 = min->str;
391 p2 = max->str;
392
393 while (*p1 == *p2) {
394 if (n == min->len || n == max->len)
395 break;
396 p1++;
397 p2++;
398 n++;
399 }
400
401 assert(n <= (int)sizeof(pfx->str));
402 pfx->len = n;
403 bcopy(max->str, pfx->str, n);
404 }
405 }
406
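/* Strip pfxlen bytes of the page prefix from a key: leading bytes for
 * normal keys, trailing bytes for reverse keys. A no-op when a custom
 * compare function is in use.
 */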
407 static void
408 remove_prefix(struct btree *bt, struct btval *key, size_t pfxlen)
409 {
410 if (pfxlen == 0 || bt->cmp != NULL)
411 return;
412
413 DPRINTF("removing %zu bytes of prefix from key [%.*s]", pfxlen,
414 (int)key->size, (char *)key->data);
415 assert(pfxlen <= key->size);
416 key->size -= pfxlen;
417 if (!F_ISSET(bt->flags, BT_REVERSEKEY))
418 key->data = (char *)key->data + pfxlen;
419 }
420
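/* Reconstruct the full key of the node at indx by concatenating the
 * page's common prefix with the node's stored (stripped) key.
 */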
421 static void
422 expand_prefix(struct btree *bt, struct mpage *mp, indx_t indx,
423 struct btkey *expkey)
424 {
425 struct node *node;
426
427 node = NODEPTR(mp, indx);
428 expkey->len = sizeof(expkey->str);
429 concat_prefix(bt, mp->prefix.str, mp->prefix.len,
430 NODEKEY(node), node->ksize, expkey->str, &expkey->len);
431 }
432
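/* Compare a full search key (key1) against a node key (key2) from which
 * the page prefix pfx has already been stripped.
 */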
433 static int
434 bt_cmp(struct btree *bt, const struct btval *key1, const struct btval *key2,
435 struct btkey *pfx)
436 {
437 if (F_ISSET(bt->flags, BT_REVERSEKEY))
438 return memnrcmp(key1->data, key1->size - pfx->len,
439 key2->data, key2->size);
440 else
441 return memncmp((char *)key1->data + pfx->len, key1->size - pfx->len,
442 key2->data, key2->size);
443 }
444
445 void
446 btval_reset(struct btval *btv)
447 {
448 if (btv) {
449 if (btv->mp)
450 btv->mp->ref--;
451 if (btv->free_data)
452 free(btv->data);
453 memset(btv, 0, sizeof(*btv));
454 }
455 }
456
457 static int
458 mpage_cmp(struct mpage *a, struct mpage *b)
459 {
460 if (a->pgno > b->pgno)
461 return 1;
462 if (a->pgno < b->pgno)
463 return -1;
464 return 0;
465 }
466
467 static struct mpage *
468 mpage_lookup(struct btree *bt, pgno_t pgno)
469 {
470 struct mpage find, *mp;
471
472 find.pgno = pgno;
473 mp = RB_FIND(page_cache, bt->page_cache, &find);
474 if (mp) {
475 bt->stat.hits++;
476 /* Update LRU queue. Move page to the end. */
477 TAILQ_REMOVE(bt->lru_queue, mp, lru_next);
478 TAILQ_INSERT_TAIL(bt->lru_queue, mp, lru_next);
479 }
480 return mp;
481 }
482
483 static void
484 mpage_add(struct btree *bt, struct mpage *mp)
485 {
486 assert(RB_INSERT(page_cache, bt->page_cache, mp) == NULL);
487 bt->stat.cache_size++;
488 TAILQ_INSERT_TAIL(bt->lru_queue, mp, lru_next);
489 }
490
491 static void
492 mpage_free(struct mpage *mp)
493 {
494 if (mp != NULL) {
495 free(mp->page);
496 free(mp);
497 }
498 }
499
500 static void
501 mpage_del(struct btree *bt, struct mpage *mp)
502 {
503 assert(RB_REMOVE(page_cache, bt->page_cache, mp) == mp);
504 assert(bt->stat.cache_size > 0);
505 bt->stat.cache_size--;
506 TAILQ_REMOVE(bt->lru_queue, mp, lru_next);
507 }
508
509 static void
510 mpage_flush(struct btree *bt)
511 {
512 struct mpage *mp;
513
514 while ((mp = RB_MIN(page_cache, bt->page_cache)) != NULL) {
515 mpage_del(bt, mp);
516 mpage_free(mp);
517 }
518 }
519
520 static struct mpage *
521 mpage_copy(struct btree *bt, struct mpage *mp)
522 {
523 struct mpage *copy;
524
525 if ((copy = calloc(1, sizeof(*copy))) == NULL)
526 return NULL;
527 if ((copy->page = malloc(bt->head.psize)) == NULL) {
528 free(copy);
529 return NULL;
530 }
531 bcopy(mp->page, copy->page, bt->head.psize);
532 bcopy(&mp->prefix, &copy->prefix, sizeof(mp->prefix));
533 copy->parent = mp->parent;
534 copy->parent_index = mp->parent_index;
535 copy->pgno = mp->pgno;
536
537 return copy;
538 }
539
540 /* Remove the least recently used memory pages until the cache size is
541 * within the configured bounds. Pages referenced by cursors or returned
542 * key/data are not pruned.
543 */
544 static void
545 mpage_prune(struct btree *bt)
546 {
547 struct mpage *mp, *next;
548
549 for (mp = TAILQ_FIRST(bt->lru_queue); mp; mp = next) {
550 if (bt->stat.cache_size <= bt->stat.max_cache)
551 break;
552 next = TAILQ_NEXT(mp, lru_next);
553 if (!mp->dirty && mp->ref <= 0) {
554 mpage_del(bt, mp);
555 mpage_free(mp);
556 }
557 }
558 }
559
560 /* Mark a page as dirty and push it on the dirty queue.
561 */
562 static void
563 mpage_dirty(struct btree *bt, struct mpage *mp)
564 {
565 assert(bt != NULL);
566 assert(bt->txn != NULL);
567
568 if (!mp->dirty) {
569 mp->dirty = 1;
570 SIMPLEQ_INSERT_TAIL(bt->txn->dirty_queue, mp, next);
571 }
572 }
573
574 /* Touch a page: make it dirty and re-insert into tree with updated pgno.
575 */
576 static struct mpage *
577 mpage_touch(struct btree *bt, struct mpage *mp)
578 {
579 assert(bt != NULL);
580 assert(bt->txn != NULL);
581 assert(mp != NULL);
582
583 if (!mp->dirty) {
584 DPRINTF("touching page %u -> %u", mp->pgno, bt->txn->next_pgno);
585 if (mp->ref == 0)
586 mpage_del(bt, mp);
587 else {
588 if ((mp = mpage_copy(bt, mp)) == NULL)
589 return NULL;
590 }
591 mp->pgno = mp->page->pgno = bt->txn->next_pgno++;
592 mpage_dirty(bt, mp);
593 mpage_add(bt, mp);
594
595 /* Update the page number to new touched page. */
596 if (mp->parent != NULL)
597 NODEPGNO(NODEPTR(mp->parent,
598 mp->parent_index)) = mp->pgno;
599 }
600
601 return mp;
602 }
603
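/* Read page pgno from disk into the caller-supplied buffer, verifying
 * that a full page was read and that its stored page number matches.
 */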
604 static int
605 btree_read_page(struct btree *bt, pgno_t pgno, struct page *page)
606 {
607 ssize_t rc;
608
609 DPRINTF("reading page %u", pgno);
610 bt->stat.reads++;
611 if ((rc = pread(bt->fd, page, bt->head.psize, (off_t)pgno*bt->head.psize)) == 0) {
612 DPRINTF("page %u doesn't exist", pgno);
613 errno = ENOENT;
614 return BT_FAIL;
615 } else if (rc != (ssize_t)bt->head.psize) {
616 if (rc > 0)
617 errno = EINVAL;
618 DPRINTF("read: %s", strerror(errno));
619 return BT_FAIL;
620 }
621
622 if (page->pgno != pgno) {
623 DPRINTF("page numbers don't match: %u != %u", pgno, page->pgno);
624 errno = EINVAL;
625 return BT_FAIL;
626 }
627
628 DPRINTF("page %u has flags 0x%X", pgno, page->flags);
629
630 return BT_SUCCESS;
631 }
632
633 int
634 btree_sync(struct btree *bt)
635 {
636 if (!F_ISSET(bt->flags, BT_NOSYNC))
637 return fsync(bt->fd);
638 return 0;
639 }
640
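/* Begin a transaction. A read-only transaction just snapshots the current
 * meta page; a write transaction also takes an exclusive flock() on the
 * file and sets up the dirty page queue. Only one write transaction may
 * be active at a time.
 */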
641 struct btree_txn *
642 btree_txn_begin(struct btree *bt, int rdonly)
643 {
644 struct btree_txn *txn;
645
646 if (!rdonly && bt->txn != NULL) {
647 DPRINTF("write transaction already begun");
648 errno = EBUSY;
649 return NULL;
650 }
651
652 if ((txn = calloc(1, sizeof(*txn))) == NULL) {
653 DPRINTF("calloc: %s", strerror(errno));
654 return NULL;
655 }
656
657 if (rdonly) {
658 txn->flags |= BT_TXN_RDONLY;
659 } else {
660 txn->dirty_queue = calloc(1, sizeof(*txn->dirty_queue));
661 if (txn->dirty_queue == NULL) {
662 free(txn);
663 return NULL;
664 }
665 SIMPLEQ_INIT(txn->dirty_queue);
666
667 DPRINTF("taking write lock on txn %p", txn);
668 if (flock(bt->fd, LOCK_EX | LOCK_NB) != 0) {
669 DPRINTF("flock: %s", strerror(errno));
670 errno = EBUSY;
671 free(txn->dirty_queue);
672 free(txn);
673 return NULL;
674 }
675 bt->txn = txn;
676 }
677
678 txn->bt = bt;
679 btree_ref(bt);
680
681 if (btree_read_meta(bt, &txn->next_pgno) != BT_SUCCESS) {
682 btree_txn_abort(txn);
683 return NULL;
684 }
685
686 txn->root = bt->meta.root;
687 DPRINTF("begin transaction on btree %p, root page %u", bt, txn->root);
688
689 return txn;
690 }
691
692 void
693 btree_txn_abort(struct btree_txn *txn)
694 {
695 struct mpage *mp;
696 struct btree *bt;
697
698 if (txn == NULL)
699 return;
700
701 bt = txn->bt;
702 DPRINTF("abort transaction on btree %p, root page %u", bt, txn->root);
703
704 if (!F_ISSET(txn->flags, BT_TXN_RDONLY)) {
705 /* Discard all dirty pages.
706 */
707 while (!SIMPLEQ_EMPTY(txn->dirty_queue)) {
708 mp = SIMPLEQ_FIRST(txn->dirty_queue);
709 assert(mp->ref == 0); /* cursors should be closed */
710 mpage_del(bt, mp);
711 SIMPLEQ_REMOVE_HEAD(txn->dirty_queue, next);
712 mpage_free(mp);
713 }
714
715 DPRINTF("releasing write lock on txn %p", txn);
716 txn->bt->txn = NULL;
717 if (flock(txn->bt->fd, LOCK_UN) != 0) {
718 DPRINTF("failed to unlock fd %d: %s",
719 txn->bt->fd, strerror(errno));
720 }
721 free(txn->dirty_queue);
722 }
723
724 btree_close(txn->bt);
725 free(txn);
726 }
727
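/* Commit a write transaction: write the dirty pages to disk in batches
 * of at most BT_COMMIT_PAGES using writev(), sync, then append and sync
 * a new meta page. The transaction is freed before returning.
 */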
728 int
729 btree_txn_commit(struct btree_txn *txn)
730 {
731 int n, done;
732 ssize_t rc;
733 off_t size;
734 struct mpage *mp;
735 struct btree *bt;
736 struct iovec iov[BT_COMMIT_PAGES];
737
738 assert(txn != NULL);
739 assert(txn->bt != NULL);
740
741 bt = txn->bt;
742
743 if (F_ISSET(txn->flags, BT_TXN_RDONLY)) {
744 DPRINTF("attempt to commit read-only transaction");
745 btree_txn_abort(txn);
746 errno = EPERM;
747 return BT_FAIL;
748 }
749
750 if (txn != bt->txn) {
751 DPRINTF("attempt to commit unknown transaction");
752 btree_txn_abort(txn);
753 errno = EINVAL;
754 return BT_FAIL;
755 }
756
757 if (F_ISSET(txn->flags, BT_TXN_ERROR)) {
758 DPRINTF("error flag is set, can't commit");
759 btree_txn_abort(txn);
760 errno = EINVAL;
761 return BT_FAIL;
762 }
763
764 if (SIMPLEQ_EMPTY(txn->dirty_queue))
765 goto done;
766
767 if (F_ISSET(bt->flags, BT_FIXPADDING)) {
768 size = lseek(bt->fd, 0, SEEK_END);
769 size += bt->head.psize - (size % bt->head.psize);
770 DPRINTF("extending to multiple of page size: %llu", size);
771 if (ftruncate(bt->fd, size) != 0) {
772 DPRINTF("ftruncate: %s", strerror(errno));
773 btree_txn_abort(txn);
774 return BT_FAIL;
775 }
776 bt->flags &= ~BT_FIXPADDING;
777 }
778
779 DPRINTF("committing transaction on btree %p, root page %u",
780 bt, txn->root);
781
782 /* Commit up to BT_COMMIT_PAGES dirty pages to disk until done.
783 */
784 do {
785 n = 0;
786 done = 1;
787 SIMPLEQ_FOREACH(mp, txn->dirty_queue, next) {
788 DPRINTF("committing page %u", mp->pgno);
789 iov[n].iov_len = bt->head.psize;
790 iov[n].iov_base = mp->page;
791 if (++n >= BT_COMMIT_PAGES) {
792 done = 0;
793 break;
794 }
795 }
796
797 if (n == 0)
798 break;
799
800 DPRINTF("committing %u dirty pages", n);
801 rc = writev(bt->fd, iov, n);
802 if (rc != (ssize_t)bt->head.psize*n) {
803 if (rc > 0)
804 DPRINTF("short write, filesystem full?");
805 else
806 DPRINTF("writev: %s", strerror(errno));
807 btree_txn_abort(txn);
808 return BT_FAIL;
809 }
810
811 /* Remove the dirty flag from the written pages.
812 */
813 while (!SIMPLEQ_EMPTY(txn->dirty_queue)) {
814 mp = SIMPLEQ_FIRST(txn->dirty_queue);
815 mp->dirty = 0;
816 SIMPLEQ_REMOVE_HEAD(txn->dirty_queue, next);
817 if (--n == 0)
818 break;
819 }
820 } while (!done);
821
822 if (btree_sync(bt) != 0 ||
823 btree_write_meta(bt, txn->root, 0) != BT_SUCCESS ||
824 btree_sync(bt) != 0) {
825 btree_txn_abort(txn);
826 return BT_FAIL;
827 }
828
829 done:
830 mpage_prune(bt);
831 btree_txn_abort(txn);
832
833 return BT_SUCCESS;
834 }
835
836 static int
837 btree_write_header(struct btree *bt, int fd)
838 {
839 struct stat sb;
840 struct bt_head *h;
841 struct page *p;
842 ssize_t rc;
843 unsigned int psize;
844
845 DPRINTF("writing header page");
846 assert(bt != NULL);
847
848 /*
849 * Ask stat for 'optimal blocksize for I/O', but cap to fit in indx_t.
850 */
851 if (fstat(fd, &sb) == 0)
852 psize = MINIMUM(32*1024, sb.st_blksize);
853 else
854 psize = PAGESIZE;
855
856 if ((p = calloc(1, psize)) == NULL)
857 return -1;
858 p->flags = P_HEAD;
859
860 h = METADATA(p);
861 h->magic = BT_MAGIC;
862 h->version = BT_VERSION;
863 h->psize = psize;
864 bcopy(h, &bt->head, sizeof(*h));
865
866 rc = write(fd, p, bt->head.psize);
867 free(p);
868 if (rc != (ssize_t)bt->head.psize) {
869 if (rc > 0)
870 DPRINTF("short write, filesystem full?");
871 return BT_FAIL;
872 }
873
874 return BT_SUCCESS;
875 }
876
877 static int
878 btree_read_header(struct btree *bt)
879 {
880 char page[PAGESIZE];
881 struct page *p;
882 struct bt_head *h;
883 int rc;
884
885 assert(bt != NULL);
886
887 /* We don't know the page size yet, so use a minimum value.
888 */
889
890 if ((rc = pread(bt->fd, page, PAGESIZE, 0)) == 0) {
891 errno = ENOENT;
892 return -1;
893 } else if (rc != PAGESIZE) {
894 if (rc > 0)
895 errno = EINVAL;
896 DPRINTF("read: %s", strerror(errno));
897 return -1;
898 }
899
900 p = (struct page *)page;
901
902 if (!F_ISSET(p->flags, P_HEAD)) {
903 DPRINTF("page %d not a header page", p->pgno);
904 errno = EINVAL;
905 return -1;
906 }
907
908 h = METADATA(p);
909 if (h->magic != BT_MAGIC) {
910 DPRINTF("header has invalid magic");
911 errno = EINVAL;
912 return -1;
913 }
914
915 if (h->version != BT_VERSION) {
916 DPRINTF("database is version %u, expected version %u",
917 bt->head.version, BT_VERSION);
918 errno = EINVAL;
919 return -1;
920 }
921
922 bcopy(h, &bt->head, sizeof(*h));
923 return 0;
924 }
925
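/* Append a new meta page recording the given root page, protected by a
 * SHA-1 digest of its contents, and update the cached file size.
 */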
926 static int
927 btree_write_meta(struct btree *bt, pgno_t root, unsigned int flags)
928 {
929 struct mpage *mp;
930 struct bt_meta *meta;
931 ssize_t rc;
932
933 DPRINTF("writing meta page for root page %u", root);
934
935 assert(bt != NULL);
936 assert(bt->txn != NULL);
937
938 if ((mp = btree_new_page(bt, P_META)) == NULL)
939 return -1;
940
941 bt->meta.prev_meta = bt->meta.root;
942 bt->meta.root = root;
943 bt->meta.flags = flags;
944 bt->meta.created_at = time(0);
945 bt->meta.revisions++;
946 SHA1((unsigned char *)&bt->meta, METAHASHLEN, bt->meta.hash);
947
948 /* Copy the meta data changes to the new meta page. */
949 meta = METADATA(mp->page);
950 bcopy(&bt->meta, meta, sizeof(*meta));
951
952 rc = write(bt->fd, mp->page, bt->head.psize);
953 mp->dirty = 0;
954 SIMPLEQ_REMOVE_HEAD(bt->txn->dirty_queue, next);
955 if (rc != (ssize_t)bt->head.psize) {
956 if (rc > 0)
957 DPRINTF("short write, filesystem full?");
958 return BT_FAIL;
959 }
960
961 if ((bt->size = lseek(bt->fd, 0, SEEK_END)) == -1) {
962 DPRINTF("failed to update file size: %s", strerror(errno));
963 bt->size = 0;
964 }
965
966 return BT_SUCCESS;
967 }
968
969 /* Returns true if page p is a valid meta page, false otherwise.
970 */
971 static int
972 btree_is_meta_page(struct page *p)
973 {
974 struct bt_meta *m;
975 unsigned char hash[SHA_DIGEST_LENGTH];
976
977 m = METADATA(p);
978 if (!F_ISSET(p->flags, P_META)) {
979 DPRINTF("page %d not a meta page", p->pgno);
980 errno = EINVAL;
981 return 0;
982 }
983
984 if (m->root >= p->pgno && m->root != P_INVALID) {
985 DPRINTF("page %d points to an invalid root page", p->pgno);
986 errno = EINVAL;
987 return 0;
988 }
989
990 SHA1((unsigned char *)m, METAHASHLEN, hash);
991 if (bcmp(hash, m->hash, SHA_DIGEST_LENGTH) != 0) {
992 DPRINTF("page %d has an invalid digest", p->pgno);
993 errno = EINVAL;
994 return 0;
995 }
996
997 return 1;
998 }
999
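/* Locate the most recent valid meta page by scanning backwards from the
 * end of the file and copy it into bt->meta. If p_next is non-null, the
 * next unallocated page number is stored in *p_next.
 */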
1000 static int
1001 btree_read_meta(struct btree *bt, pgno_t *p_next)
1002 {
1003 struct mpage *mp;
1004 struct bt_meta *meta;
1005 pgno_t meta_pgno, next_pgno;
1006 off_t size;
1007
1008 assert(bt != NULL);
1009
1010 if ((size = lseek(bt->fd, 0, SEEK_END)) == -1)
1011 goto fail;
1012
1013 DPRINTF("btree_read_meta: size = %llu", size);
1014
1015 if (size < bt->size) {
1016 DPRINTF("file has shrunk!");
1017 errno = EIO;
1018 goto fail;
1019 }
1020
1021 if (size == bt->head.psize) { /* there is only the header */
1022 if (p_next != NULL)
1023 *p_next = 1;
1024 return BT_SUCCESS; /* new file */
1025 }
1026
1027 next_pgno = size / bt->head.psize;
1028 if (next_pgno == 0) {
1029 DPRINTF("corrupt file");
1030 errno = EIO;
1031 goto fail;
1032 }
1033
1034 meta_pgno = next_pgno - 1;
1035
1036 if (size % bt->head.psize != 0) {
1037 DPRINTF("filesize not a multiple of the page size!");
1038 bt->flags |= BT_FIXPADDING;
1039 next_pgno++;
1040 }
1041
1042 if (p_next != NULL)
1043 *p_next = next_pgno;
1044
1045 if (size == bt->size) {
1046 DPRINTF("size unchanged, keeping current meta page");
1047 if (F_ISSET(bt->meta.flags, BT_TOMBSTONE)) {
1048 DPRINTF("file is dead");
1049 errno = ESTALE;
1050 return BT_FAIL;
1051 } else
1052 return BT_SUCCESS;
1053 }
1054 bt->size = size;
1055
1056 while (meta_pgno > 0) {
1057 if ((mp = btree_get_mpage(bt, meta_pgno)) == NULL)
1058 break;
1059 if (btree_is_meta_page(mp->page)) {
1060 meta = METADATA(mp->page);
1061 DPRINTF("flags = 0x%x", meta->flags);
1062 if (F_ISSET(meta->flags, BT_TOMBSTONE)) {
1063 DPRINTF("file is dead");
1064 errno = ESTALE;
1065 return BT_FAIL;
1066 } else {
1067 /* Make copy of last meta page. */
1068 bcopy(meta, &bt->meta, sizeof(bt->meta));
1069 return BT_SUCCESS;
1070 }
1071 }
1072 --meta_pgno; /* scan backwards to first valid meta page */
1073 }
1074
1075 errno = EIO;
1076 fail:
1077 if (p_next != NULL)
1078 *p_next = P_INVALID;
1079 return BT_FAIL;
1080 }
1081
1082 struct btree *
1083 btree_open_fd(int fd, unsigned int flags)
1084 {
1085 struct btree *bt;
1086 int fl;
1087
1088 fl = fcntl(fd, F_GETFL);
1089 if (fcntl(fd, F_SETFL, fl | O_APPEND) == -1)
1090 return NULL;
1091
1092 if ((bt = calloc(1, sizeof(*bt))) == NULL)
1093 return NULL;
1094 bt->fd = fd;
1095 bt->flags = flags;
1096 bt->flags &= ~BT_FIXPADDING;
1097 bt->ref = 1;
1098 bt->meta.root = P_INVALID;
1099
1100 if ((bt->page_cache = calloc(1, sizeof(*bt->page_cache))) == NULL)
1101 goto fail;
1102 bt->stat.max_cache = BT_MAXCACHE_DEF;
1103 RB_INIT(bt->page_cache);
1104
1105 if ((bt->lru_queue = calloc(1, sizeof(*bt->lru_queue))) == NULL)
1106 goto fail;
1107 TAILQ_INIT(bt->lru_queue);
1108
1109 if (btree_read_header(bt) != 0) {
1110 if (errno != ENOENT)
1111 goto fail;
1112 DPRINTF("new database");
1113 btree_write_header(bt, bt->fd);
1114 }
1115
1116 if (btree_read_meta(bt, NULL) != 0)
1117 goto fail;
1118
1119 DPRINTF("opened database version %u, pagesize %u",
1120 bt->head.version, bt->head.psize);
1121 DPRINTF("timestamp: %s", ctime(&bt->meta.created_at));
1122 DPRINTF("depth: %u", bt->meta.depth);
1123 DPRINTF("entries: %llu", bt->meta.entries);
1124 DPRINTF("revisions: %u", bt->meta.revisions);
1125 DPRINTF("branch pages: %u", bt->meta.branch_pages);
1126 DPRINTF("leaf pages: %u", bt->meta.leaf_pages);
1127 DPRINTF("overflow pages: %u", bt->meta.overflow_pages);
1128 DPRINTF("root: %u", bt->meta.root);
1129 DPRINTF("previous meta page: %u", bt->meta.prev_meta);
1130
1131 return bt;
1132
1133 fail:
1134 free(bt->lru_queue);
1135 free(bt->page_cache);
1136 free(bt);
1137 return NULL;
1138 }
1139
1140 struct btree *
1141 btree_open(const char *path, unsigned int flags, mode_t mode)
1142 {
1143 int fd, oflags;
1144 struct btree *bt;
1145
1146 if (F_ISSET(flags, BT_RDONLY))
1147 oflags = O_RDONLY;
1148 else
1149 oflags = O_RDWR | O_CREAT | O_APPEND;
1150
1151 if ((fd = open(path, oflags, mode)) == -1)
1152 return NULL;
1153
1154 if ((bt = btree_open_fd(fd, flags)) == NULL)
1155 close(fd);
1156 else {
1157 bt->path = strdup(path);
1158 DPRINTF("opened btree %p", bt);
1159 }
1160
1161 return bt;
1162 }
1163
1164 static void
1165 btree_ref(struct btree *bt)
1166 {
1167 bt->ref++;
1168 DPRINTF("ref is now %d on btree %p", bt->ref, bt);
1169 }
1170
1171 void
1172 btree_close(struct btree *bt)
1173 {
1174 if (bt == NULL)
1175 return;
1176
1177 if (--bt->ref == 0) {
1178 DPRINTF("ref is zero, closing btree %p", bt);
1179 close(bt->fd);
1180 mpage_flush(bt);
1181 free(bt->lru_queue);
1182 free(bt->path);
1183 free(bt->page_cache);
1184 free(bt);
1185 } else
1186 DPRINTF("ref is now %d on btree %p", bt->ref, bt);
1187 }
1188
1189 /* Search for key within a page (leaf or branch), using binary search.
1190 * Returns the smallest entry larger than or equal to the key.
1191 * If exactp is non-null, stores whether the found entry was an exact match
1192 * in *exactp (1 or 0).
1193 * If kip is non-null, stores the index of the found entry in *kip.
1194 * If no entry larger than or equal to the key is found, returns NULL.
1195 */
1196 static struct node *
1197 btree_search_node(struct btree *bt, struct mpage *mp, struct btval *key,
1198 int *exactp, unsigned int *kip)
1199 {
1200 unsigned int i = 0;
1201 int low, high;
1202 int rc = 0;
1203 struct node *node;
1204 struct btval nodekey;
1205
1206 DPRINTF("searching %lu keys in %s page %u with prefix [%.*s]",
1207 NUMKEYS(mp),
1208 IS_LEAF(mp) ? "leaf" : "branch",
1209 mp->pgno, (int)mp->prefix.len, (char *)mp->prefix.str);
1210
1211 assert(NUMKEYS(mp) > 0);
1212
1213 memset(&nodekey, 0, sizeof(nodekey));
1214
1215 low = IS_LEAF(mp) ? 0 : 1;
1216 high = NUMKEYS(mp) - 1;
1217 while (low <= high) {
1218 i = (low + high) >> 1;
1219 node = NODEPTR(mp, i);
1220
1221 nodekey.size = node->ksize;
1222 nodekey.data = NODEKEY(node);
1223
1224 if (bt->cmp)
1225 rc = bt->cmp(key, &nodekey);
1226 else
1227 rc = bt_cmp(bt, key, &nodekey, &mp->prefix);
1228
1229 if (IS_LEAF(mp))
1230 DPRINTF("found leaf index %u [%.*s], rc = %d",
1231 i, (int)nodekey.size, (char *)nodekey.data, rc);
1232 else
1233 DPRINTF("found branch index %u [%.*s -> %u], rc = %d",
1234 i, (int)node->ksize, (char *)NODEKEY(node),
1235 node->n_pgno, rc);
1236
1237 if (rc == 0)
1238 break;
1239 if (rc > 0)
1240 low = i + 1;
1241 else
1242 high = i - 1;
1243 }
1244
1245 if (rc > 0) { /* Found entry is less than the key. */
1246 i++; /* Skip to get the smallest entry larger than key. */
1247 if (i >= NUMKEYS(mp))
1248 /* There is no entry larger or equal to the key. */
1249 return NULL;
1250 }
1251 if (exactp)
1252 *exactp = (rc == 0);
1253 if (kip) /* Store the key index if requested. */
1254 *kip = i;
1255
1256 return NODEPTR(mp, i);
1257 }
1258
1259 static void
1260 cursor_pop_page(struct cursor *cursor)
1261 {
1262 struct ppage *top;
1263
1264 top = CURSOR_TOP(cursor);
1265 CURSOR_POP(cursor);
1266 top->mpage->ref--;
1267
1268 DPRINTF("popped page %u off cursor %p", top->mpage->pgno, cursor);
1269
1270 free(top);
1271 }
1272
1273 static struct ppage *
1274 cursor_push_page(struct cursor *cursor, struct mpage *mp)
1275 {
1276 struct ppage *ppage;
1277
1278 DPRINTF("pushing page %u on cursor %p", mp->pgno, cursor);
1279
1280 if ((ppage = calloc(1, sizeof(*ppage))) == NULL)
1281 return NULL;
1282 ppage->mpage = mp;
1283 mp->ref++;
1284 CURSOR_PUSH(cursor, ppage);
1285 return ppage;
1286 }
1287
1288 static struct mpage *
1289 btree_get_mpage(struct btree *bt, pgno_t pgno)
1290 {
1291 struct mpage *mp;
1292
1293 mp = mpage_lookup(bt, pgno);
1294 if (mp == NULL) {
1295 if ((mp = calloc(1, sizeof(*mp))) == NULL)
1296 return NULL;
1297 if ((mp->page = malloc(bt->head.psize)) == NULL) {
1298 free(mp);
1299 return NULL;
1300 }
1301 if (btree_read_page(bt, pgno, mp->page) != BT_SUCCESS) {
1302 mpage_free(mp);
1303 return NULL;
1304 }
1305 mp->pgno = pgno;
1306 mpage_add(bt, mp);
1307 } else
1308 DPRINTF("returning page %u from cache", pgno);
1309
1310 return mp;
1311 }
1312
1313 static void
1314 concat_prefix(struct btree *bt, char *s1, size_t n1, char *s2, size_t n2,
1315 char *cs, size_t *cn)
1316 {
1317 assert(*cn >= n1 + n2);
1318 if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
1319 bcopy(s2, cs, n2);
1320 bcopy(s1, cs + n2, n1);
1321 } else {
1322 bcopy(s1, cs, n1);
1323 bcopy(s2, cs + n1, n2);
1324 }
1325 *cn = n1 + n2;
1326 }
1327
1328 static void
1329 find_common_prefix(struct btree *bt, struct mpage *mp)
1330 {
1331 indx_t lbound = 0, ubound = 0;
1332 struct mpage *lp, *up;
1333 struct btkey lprefix, uprefix;
1334
1335 mp->prefix.len = 0;
1336 if (bt->cmp != NULL)
1337 return;
1338
1339 lp = mp;
1340 while (lp->parent != NULL) {
1341 if (lp->parent_index > 0) {
1342 lbound = lp->parent_index;
1343 break;
1344 }
1345 lp = lp->parent;
1346 }
1347
1348 up = mp;
1349 while (up->parent != NULL) {
1350 if (up->parent_index + 1 < (indx_t)NUMKEYS(up->parent)) {
1351 ubound = up->parent_index + 1;
1352 break;
1353 }
1354 up = up->parent;
1355 }
1356
1357 if (lp->parent != NULL && up->parent != NULL) {
1358 expand_prefix(bt, lp->parent, lbound, &lprefix);
1359 expand_prefix(bt, up->parent, ubound, &uprefix);
1360 common_prefix(bt, &lprefix, &uprefix, &mp->prefix);
1361 }
1362 else if (mp->parent)
1363 bcopy(&mp->parent->prefix, &mp->prefix, sizeof(mp->prefix));
1364
1365 DPRINTF("found common prefix [%.*s] (len %zu) for page %u",
1366 (int)mp->prefix.len, mp->prefix.str, mp->prefix.len, mp->pgno);
1367 }
1368
1369 static int
1370 btree_search_page_root(struct btree *bt, struct mpage *root, struct btval *key,
1371 struct cursor *cursor, int modify, struct mpage **mpp)
1372 {
1373 struct mpage *mp, *parent;
1374
1375 if (cursor && cursor_push_page(cursor, root) == NULL)
1376 return BT_FAIL;
1377
1378 mp = root;
1379 while (IS_BRANCH(mp)) {
1380 unsigned int i = 0;
1381 struct node *node;
1382
1383 DPRINTF("branch page %u has %lu keys", mp->pgno, NUMKEYS(mp));
1384 assert(NUMKEYS(mp) > 1);
1385 DPRINTF("found index 0 to page %u", NODEPGNO(NODEPTR(mp, 0)));
1386
1387 if (key == NULL) /* Initialize cursor to first page. */
1388 i = 0;
1389 else {
1390 int exact;
1391 node = btree_search_node(bt, mp, key, &exact, &i);
1392 if (node == NULL)
1393 i = NUMKEYS(mp) - 1;
1394 else if (!exact) {
1395 assert(i > 0);
1396 i--;
1397 }
1398 }
1399
1400 if (key)
1401 DPRINTF("following index %u for key %.*s",
1402 i, (int)key->size, (char *)key->data);
1403 assert(i >= 0 && i < NUMKEYS(mp));
1404 node = NODEPTR(mp, i);
1405
1406 if (cursor)
1407 CURSOR_TOP(cursor)->ki = i;
1408
1409 parent = mp;
1410 if ((mp = btree_get_mpage(bt, NODEPGNO(node))) == NULL)
1411 return BT_FAIL;
1412 mp->parent = parent;
1413 mp->parent_index = i;
1414 find_common_prefix(bt, mp);
1415
1416 if (cursor && cursor_push_page(cursor, mp) == NULL)
1417 return BT_FAIL;
1418
1419 if (modify && (mp = mpage_touch(bt, mp)) == NULL)
1420 return BT_FAIL;
1421 }
1422
1423 if (!IS_LEAF(mp)) {
1424 DPRINTF("internal error, index points to a %02X page!?",
1425 mp->page->flags);
1426 return BT_FAIL;
1427 }
1428
1429 DPRINTF("found leaf page %u for key %.*s", mp->pgno,
1430 key ? (int)key->size : 0, key ? (char *)key->data : NULL);
1431
1432 *mpp = mp;
1433 return BT_SUCCESS;
1434 }
1435
1436 /* Search for the page a given key should be in.
1437 * Stores a pointer to the found page in *mpp.
1438 * If key is NULL, search for the lowest page (used by btree_cursor_first).
1439 * If cursor is non-null, pushes parent pages on the cursor stack.
1440 * If modify is true, visited pages are updated with new page numbers.
1441 */
1442 static int
1443 btree_search_page(struct btree *bt, struct btree_txn *txn, struct btval *key,
1444 struct cursor *cursor, int modify, struct mpage **mpp)
1445 {
1446 int rc;
1447 pgno_t root;
1448 struct mpage *mp;
1449
1450 /* Can't modify pages outside a transaction. */
1451 if (txn == NULL && modify) {
1452 errno = EINVAL;
1453 return BT_FAIL;
1454 }
1455
1456 /* Choose which root page to start with. If a transaction is given
1457 * use the root page from the transaction, otherwise read the last
1458 * committed root page.
1459 */
1460 if (txn == NULL) {
1461 if ((rc = btree_read_meta(bt, NULL)) != BT_SUCCESS)
1462 return rc;
1463 root = bt->meta.root;
1464 } else if (F_ISSET(txn->flags, BT_TXN_ERROR)) {
1465 DPRINTF("transaction has failed, must abort");
1466 errno = EINVAL;
1467 return BT_FAIL;
1468 } else
1469 root = txn->root;
1470
1471 if (root == P_INVALID) { /* Tree is empty. */
1472 DPRINTF("tree is empty");
1473 errno = ENOENT;
1474 return BT_FAIL;
1475 }
1476
1477 if ((mp = btree_get_mpage(bt, root)) == NULL)
1478 return BT_FAIL;
1479
1480 DPRINTF("root page has flags 0x%X", mp->page->flags);
1481
1482 assert(mp->parent == NULL);
1483 assert(mp->prefix.len == 0);
1484
1485 if (modify && !mp->dirty) {
1486 if ((mp = mpage_touch(bt, mp)) == NULL)
1487 return BT_FAIL;
1488 txn->root = mp->pgno;
1489 }
1490
1491 return btree_search_page_root(bt, mp, key, cursor, modify, mpp);
1492 }
1493
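/* Return the data for a leaf node. Small values either reference the
 * cached page directly or are copied into new memory; F_BIGDATA values
 * are reassembled by following the chain of overflow pages.
 */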
1494 static int
1495 btree_read_data(struct btree *bt, struct mpage *mp, struct node *leaf,
1496 struct btval *data)
1497 {
1498 struct mpage *omp; /* overflow mpage */
1499 size_t psz;
1500 size_t max;
1501 size_t sz = 0;
1502 pgno_t pgno;
1503
1504 memset(data, 0, sizeof(*data));
1505 max = bt->head.psize - PAGEHDRSZ;
1506
1507 if (!F_ISSET(leaf->flags, F_BIGDATA)) {
1508 data->size = leaf->n_dsize;
1509 if (data->size > 0) {
1510 if (mp == NULL) {
1511 if ((data->data = malloc(data->size)) == NULL)
1512 return BT_FAIL;
1513 bcopy(NODEDATA(leaf), data->data, data->size);
1514 data->free_data = 1;
1515 data->mp = NULL;
1516 } else {
1517 data->data = NODEDATA(leaf);
1518 data->free_data = 0;
1519 data->mp = mp;
1520 mp->ref++;
1521 }
1522 }
1523 return BT_SUCCESS;
1524 }
1525
1526 /* Read overflow data.
1527 */
1528 DPRINTF("allocating %u byte for overflow data", leaf->n_dsize);
1529 if ((data->data = malloc(leaf->n_dsize)) == NULL)
1530 return BT_FAIL;
1531 data->size = leaf->n_dsize;
1532 data->free_data = 1;
1533 data->mp = NULL;
1534 bcopy(NODEDATA(leaf), &pgno, sizeof(pgno));
1535 for (sz = 0; sz < data->size; ) {
1536 if ((omp = btree_get_mpage(bt, pgno)) == NULL ||
1537 !F_ISSET(omp->page->flags, P_OVERFLOW)) {
1538 DPRINTF("read overflow page %u failed", pgno);
1539 free(data->data);
1540 mpage_free(omp);
1541 return BT_FAIL;
1542 }
1543 psz = data->size - sz;
1544 if (psz > max)
1545 psz = max;
1546 bcopy(omp->page->ptrs, (char *)data->data + sz, psz);
1547 sz += psz;
1548 pgno = omp->page->p_next_pgno;
1549 }
1550
1551 return BT_SUCCESS;
1552 }
1553
1554 int
1555 btree_txn_get(struct btree *bt, struct btree_txn *txn,
1556 struct btval *key, struct btval *data)
1557 {
1558 int rc, exact;
1559 struct node *leaf;
1560 struct mpage *mp;
1561
1562 assert(key);
1563 assert(data);
1564 DPRINTF("===> get key [%.*s]", (int)key->size, (char *)key->data);
1565
1566 if (bt != NULL && txn != NULL && bt != txn->bt) {
1567 errno = EINVAL;
1568 return BT_FAIL;
1569 }
1570
1571 if (bt == NULL) {
1572 if (txn == NULL) {
1573 errno = EINVAL;
1574 return BT_FAIL;
1575 }
1576 bt = txn->bt;
1577 }
1578
1579 if (key->size == 0 || key->size > MAXKEYSIZE) {
1580 errno = EINVAL;
1581 return BT_FAIL;
1582 }
1583
1584 if ((rc = btree_search_page(bt, txn, key, NULL, 0, &mp)) != BT_SUCCESS)
1585 return rc;
1586
1587 leaf = btree_search_node(bt, mp, key, &exact, NULL);
1588 if (leaf && exact)
1589 rc = btree_read_data(bt, mp, leaf, data);
1590 else {
1591 errno = ENOENT;
1592 rc = BT_FAIL;
1593 }
1594
1595 mpage_prune(bt);
1596 return rc;
1597 }
1598
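/* Move the cursor to the right (move_right) or left sibling page,
 * walking up the stack of parent pages as needed and descending into
 * the adjacent subtree.
 */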
1599 static int
1600 btree_sibling(struct cursor *cursor, int move_right)
1601 {
1602 int rc;
1603 struct node *indx;
1604 struct ppage *parent, *top;
1605 struct mpage *mp;
1606
1607 top = CURSOR_TOP(cursor);
1608 if ((parent = SLIST_NEXT(top, entry)) == NULL) {
1609 errno = ENOENT;
1610 return BT_FAIL; /* root has no siblings */
1611 }
1612
1613 DPRINTF("parent page is page %u, index %u",
1614 parent->mpage->pgno, parent->ki);
1615
1616 cursor_pop_page(cursor);
1617 if (move_right ? (parent->ki + 1 >= NUMKEYS(parent->mpage))
1618 : (parent->ki == 0)) {
1619 DPRINTF("no more keys left, moving to %s sibling",
1620 move_right ? "right" : "left");
1621 if ((rc = btree_sibling(cursor, move_right)) != BT_SUCCESS)
1622 return rc;
1623 parent = CURSOR_TOP(cursor);
1624 } else {
1625 if (move_right)
1626 parent->ki++;
1627 else
1628 parent->ki--;
1629 DPRINTF("just moving to %s index key %u",
1630 move_right ? "right" : "left", parent->ki);
1631 }
1632 assert(IS_BRANCH(parent->mpage));
1633
1634 indx = NODEPTR(parent->mpage, parent->ki);
1635 if ((mp = btree_get_mpage(cursor->bt, indx->n_pgno)) == NULL)
1636 return BT_FAIL;
1637 mp->parent = parent->mpage;
1638 mp->parent_index = parent->ki;
1639
1640 cursor_push_page(cursor, mp);
1641 find_common_prefix(cursor->bt, mp);
1642
1643 return BT_SUCCESS;
1644 }
1645
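/* Fill in key with the full key of node. If the page has a common
 * prefix, the key is reassembled into newly allocated memory; otherwise
 * it points into the cached page, which gets an extra reference.
 */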
1646 static int
1647 bt_set_key(struct btree *bt, struct mpage *mp, struct node *node,
1648 struct btval *key)
1649 {
1650 if (key == NULL)
1651 return 0;
1652
1653 if (mp->prefix.len > 0) {
1654 key->size = node->ksize + mp->prefix.len;
1655 key->data = malloc(key->size);
1656 if (key->data == NULL)
1657 return -1;
1658 concat_prefix(bt,
1659 mp->prefix.str, mp->prefix.len,
1660 NODEKEY(node), node->ksize,
1661 key->data, &key->size);
1662 key->free_data = 1;
1663 } else {
1664 key->size = node->ksize;
1665 key->data = NODEKEY(node);
1666 key->free_data = 0;
1667 key->mp = mp;
1668 mp->ref++;
1669 }
1670
1671 return 0;
1672 }
1673
1674 static int
1675 btree_cursor_next(struct cursor *cursor, struct btval *key, struct btval *data)
1676 {
1677 struct ppage *top;
1678 struct mpage *mp;
1679 struct node *leaf;
1680
1681 if (cursor->eof) {
1682 errno = ENOENT;
1683 return BT_FAIL;
1684 }
1685
1686 assert(cursor->initialized);
1687
1688 top = CURSOR_TOP(cursor);
1689 mp = top->mpage;
1690
1691 DPRINTF("cursor_next: top page is %u in cursor %p", mp->pgno, cursor);
1692
1693 if (top->ki + 1 >= NUMKEYS(mp)) {
1694 DPRINTF("=====> move to next sibling page");
1695 if (btree_sibling(cursor, 1) != BT_SUCCESS) {
1696 cursor->eof = 1;
1697 return BT_FAIL;
1698 }
1699 top = CURSOR_TOP(cursor);
1700 mp = top->mpage;
1701 DPRINTF("next page is %u, key index %u", mp->pgno, top->ki);
1702 } else
1703 top->ki++;
1704
1705 DPRINTF("==> cursor points to page %u with %lu keys, key index %u",
1706 mp->pgno, NUMKEYS(mp), top->ki);
1707
1708 assert(IS_LEAF(mp));
1709 leaf = NODEPTR(mp, top->ki);
1710
1711 if (data && btree_read_data(cursor->bt, mp, leaf, data) != BT_SUCCESS)
1712 return BT_FAIL;
1713
1714 if (bt_set_key(cursor->bt, mp, leaf, key) != 0)
1715 return BT_FAIL;
1716
1717 return BT_SUCCESS;
1718 }
1719
1720 static int
1721 btree_cursor_set(struct cursor *cursor, struct btval *key, struct btval *data,
1722 int *exactp)
1723 {
1724 int rc;
1725 struct node *leaf;
1726 struct mpage *mp;
1727 struct ppage *top;
1728
1729 assert(cursor);
1730 assert(key);
1731 assert(key->size > 0);
1732
1733 rc = btree_search_page(cursor->bt, cursor->txn, key, cursor, 0, &mp);
1734 if (rc != BT_SUCCESS)
1735 return rc;
1736 assert(IS_LEAF(mp));
1737
1738 top = CURSOR_TOP(cursor);
1739 leaf = btree_search_node(cursor->bt, mp, key, exactp, &top->ki);
1740 if (exactp != NULL && !*exactp) {
1741 /* BT_CURSOR_EXACT specified and not an exact match. */
1742 errno = ENOENT;
1743 return BT_FAIL;
1744 }
1745
1746 if (leaf == NULL) {
1747 DPRINTF("===> inexact leaf not found, goto sibling");
1748 if (btree_sibling(cursor, 1) != BT_SUCCESS)
1749 return BT_FAIL; /* no entries matched */
1750 top = CURSOR_TOP(cursor);
1751 top->ki = 0;
1752 mp = top->mpage;
1753 assert(IS_LEAF(mp));
1754 leaf = NODEPTR(mp, 0);
1755 }
1756
1757 cursor->initialized = 1;
1758 cursor->eof = 0;
1759
1760 if (data && btree_read_data(cursor->bt, mp, leaf, data) != BT_SUCCESS)
1761 return BT_FAIL;
1762
1763 if (bt_set_key(cursor->bt, mp, leaf, key) != 0)
1764 return BT_FAIL;
1765 DPRINTF("==> cursor placed on key %.*s",
1766 (int)key->size, (char *)key->data);
1767
1768 return BT_SUCCESS;
1769 }
1770
1771 static int
1772 btree_cursor_first(struct cursor *cursor, struct btval *key, struct btval *data)
1773 {
1774 int rc;
1775 struct mpage *mp;
1776 struct node *leaf;
1777
1778 rc = btree_search_page(cursor->bt, cursor->txn, NULL, cursor, 0, &mp);
1779 if (rc != BT_SUCCESS)
1780 return rc;
1781 assert(IS_LEAF(mp));
1782
1783 leaf = NODEPTR(mp, 0);
1784 cursor->initialized = 1;
1785 cursor->eof = 0;
1786
1787 if (data && btree_read_data(cursor->bt, mp, leaf, data) != BT_SUCCESS)
1788 return BT_FAIL;
1789
1790 if (bt_set_key(cursor->bt, mp, leaf, key) != 0)
1791 return BT_FAIL;
1792
1793 return BT_SUCCESS;
1794 }
1795
1796 int
1797 btree_cursor_get(struct cursor *cursor, struct btval *key, struct btval *data,
1798 enum cursor_op op)
1799 {
1800 int rc;
1801 int exact = 0;
1802
1803 assert(cursor);
1804
1805 switch (op) {
1806 case BT_CURSOR:
1807 case BT_CURSOR_EXACT:
1808 while (CURSOR_TOP(cursor) != NULL)
1809 cursor_pop_page(cursor);
1810 if (key == NULL || key->size == 0 || key->size > MAXKEYSIZE) {
1811 errno = EINVAL;
1812 rc = BT_FAIL;
1813 } else if (op == BT_CURSOR_EXACT)
1814 rc = btree_cursor_set(cursor, key, data, &exact);
1815 else
1816 rc = btree_cursor_set(cursor, key, data, NULL);
1817 break;
1818 case BT_NEXT:
1819 if (!cursor->initialized)
1820 rc = btree_cursor_first(cursor, key, data);
1821 else
1822 rc = btree_cursor_next(cursor, key, data);
1823 break;
1824 case BT_FIRST:
1825 while (CURSOR_TOP(cursor) != NULL)
1826 cursor_pop_page(cursor);
1827 rc = btree_cursor_first(cursor, key, data);
1828 break;
1829 default:
1830 DPRINTF("unhandled/unimplemented cursor operation %u", op);
1831 rc = BT_FAIL;
1832 break;
1833 }
1834
1835 mpage_prune(cursor->bt);
1836
1837 return rc;
1838 }
1839
1840 static struct mpage *
1841 btree_new_page(struct btree *bt, uint32_t flags)
1842 {
1843 struct mpage *mp;
1844
1845 assert(bt != NULL);
1846 assert(bt->txn != NULL);
1847
1848 DPRINTF("allocating new mpage %u, page size %u",
1849 bt->txn->next_pgno, bt->head.psize);
1850 if ((mp = calloc(1, sizeof(*mp))) == NULL)
1851 return NULL;
1852 if ((mp->page = malloc(bt->head.psize)) == NULL) {
1853 free(mp);
1854 return NULL;
1855 }
1856 mp->pgno = mp->page->pgno = bt->txn->next_pgno++;
1857 mp->page->flags = flags;
1858 mp->page->lower = PAGEHDRSZ;
1859 mp->page->upper = bt->head.psize;
1860
1861 if (IS_BRANCH(mp))
1862 bt->meta.branch_pages++;
1863 else if (IS_LEAF(mp))
1864 bt->meta.leaf_pages++;
1865 else if (IS_OVERFLOW(mp))
1866 bt->meta.overflow_pages++;
1867
1868 mpage_add(bt, mp);
1869 mpage_dirty(bt, mp);
1870
1871 return mp;
1872 }
1873
1874 static size_t
1875 bt_leaf_size(struct btree *bt, struct btval *key, struct btval *data)
1876 {
1877 size_t sz;
1878
1879 sz = LEAFSIZE(key, data);
1880 if (data->size >= bt->head.psize / BT_MINKEYS) {
1881 /* put on overflow page */
1882 sz -= data->size - sizeof(pgno_t);
1883 }
1884
1885 return sz + sizeof(indx_t);
1886 }
1887
1888 static size_t
1889 bt_branch_size(struct btree *bt, struct btval *key)
1890 {
1891 size_t sz;
1892
1893 sz = INDXSIZE(key);
1894 if (sz >= bt->head.psize / BT_MINKEYS) {
1895 /* put on overflow page */
1896 /* not implemented */
1897 /* sz -= key->size - sizeof(pgno_t); */
1898 }
1899
1900 return sz + sizeof(indx_t);
1901 }
1902
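/* Spread data over one or more overflow pages, allocating additional
 * pages as needed and linking them through p_next_pgno (0 terminates
 * the chain).
 */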
1903 static int
1904 btree_write_overflow_data(struct btree *bt, struct page *p, struct btval *data)
1905 {
1906 size_t done = 0;
1907 size_t sz;
1908 size_t max;
1909 pgno_t *linkp; /* linked page stored here */
1910 struct mpage *next = NULL;
1911
1912 max = bt->head.psize - PAGEHDRSZ;
1913
1914 while (done < data->size) {
1915 if (next != NULL)
1916 p = next->page;
1917 linkp = &p->p_next_pgno;
1918 if (data->size - done > max) {
1919 /* need another overflow page */
1920 if ((next = btree_new_page(bt, P_OVERFLOW)) == NULL)
1921 return BT_FAIL;
1922 *linkp = next->pgno;
1923 DPRINTF("linking overflow page %u", next->pgno);
1924 } else
1925 *linkp = 0; /* indicates end of list */
1926 sz = data->size - done;
1927 if (sz > max)
1928 sz = max;
1929 DPRINTF("copying %zu bytes to overflow page %u", sz, p->pgno);
1930 bcopy((char *)data->data + done, p->ptrs, sz);
1931 done += sz;
1932 }
1933
1934 return BT_SUCCESS;
1935 }
1936
1937 /* Key prefix should already be stripped.
1938 */
1939 static int
1940 btree_add_node(struct btree *bt, struct mpage *mp, indx_t indx,
1941 struct btval *key, struct btval *data, pgno_t pgno, uint8_t flags)
1942 {
1943 unsigned int i;
1944 size_t node_size = NODESIZE;
1945 indx_t ofs;
1946 struct node *node;
1947 struct page *p;
1948 struct mpage *ofp = NULL; /* overflow page */
1949
1950 p = mp->page;
1951 assert(p->upper >= p->lower);
1952
1953 DPRINTF("add node [%.*s] to %s page %u at index %d, key size %zu",
1954 key ? (int)key->size : 0, key ? (char *)key->data : NULL,
1955 IS_LEAF(mp) ? "leaf" : "branch",
1956 mp->pgno, indx, key ? key->size : 0);
1957
1958 if (key != NULL)
1959 node_size += key->size;
1960
1961 if (IS_LEAF(mp)) {
1962 assert(data);
1963 node_size += data->size;
1964 if (F_ISSET(flags, F_BIGDATA)) {
1965 /* Data already on overflow page. */
1966 node_size -= data->size - sizeof(pgno_t);
1967 } else if (data->size >= bt->head.psize / BT_MINKEYS) {
1968 /* Put data on overflow page. */
1969 DPRINTF("data size is %zu, put on overflow page",
1970 data->size);
1971 node_size -= data->size - sizeof(pgno_t);
1972 if ((ofp = btree_new_page(bt, P_OVERFLOW)) == NULL)
1973 return BT_FAIL;
1974 DPRINTF("allocated overflow page %u", ofp->pgno);
1975 flags |= F_BIGDATA;
1976 }
1977 }
1978
1979 if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
1980 DPRINTF("not enough room in page %u, got %lu ptrs",
1981 mp->pgno, NUMKEYS(mp));
1982 DPRINTF("upper - lower = %u - %u = %u", p->upper, p->lower,
1983 p->upper - p->lower);
1984 DPRINTF("node size = %zu", node_size);
1985 return BT_FAIL;
1986 }
1987
1988 /* Move higher pointers up one slot. */
1989 for (i = NUMKEYS(mp); i > indx; i--)
1990 p->ptrs[i] = p->ptrs[i - 1];
1991
1992 /* Adjust free space offsets. */
1993 ofs = p->upper - node_size;
1994 assert(ofs >= p->lower + sizeof(indx_t));
1995 p->ptrs[indx] = ofs;
1996 p->upper = ofs;
1997 p->lower += sizeof(indx_t);
1998
1999 /* Write the node data. */
2000 node = NODEPTR(mp, indx);
2001 node->ksize = (key == NULL) ? 0 : key->size;
2002 node->flags = flags;
2003 if (IS_LEAF(mp))
2004 node->n_dsize = data->size;
2005 else
2006 node->n_pgno = pgno;
2007
2008 if (key)
2009 bcopy(key->data, NODEKEY(node), key->size);
2010
2011 if (IS_LEAF(mp)) {
2012 assert(key);
2013 if (ofp == NULL) {
2014 if (F_ISSET(flags, F_BIGDATA))
2015 bcopy(data->data, node->data + key->size,
2016 sizeof(pgno_t));
2017 else
2018 bcopy(data->data, node->data + key->size,
2019 data->size);
2020 } else {
2021 bcopy(&ofp->pgno, node->data + key->size,
2022 sizeof(pgno_t));
2023 if (btree_write_overflow_data(bt, ofp->page,
2024 data) == BT_FAIL)
2025 return BT_FAIL;
2026 }
2027 }
2028
2029 return BT_SUCCESS;
2030 }
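/*
 * Page layout sketch: ptrs[] grows upward from PAGEHDRSZ (page->lower)
 * while node bodies grow downward from psize (page->upper), so the free
 * space is the gap in the middle.  Adding a node of node_size bytes to a
 * fresh page (lower = PAGEHDRSZ, upper = psize) moves upper down by
 * node_size, records that offset in ptrs[indx], and moves lower up by
 * sizeof(indx_t), exactly as done above.
 */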
2031
2032 static void
2033 btree_del_node(struct btree *bt, struct mpage *mp, indx_t indx)
2034 {
2035 unsigned int sz;
2036 indx_t i, j, numkeys, ptr;
2037 struct node *node;
2038 char *base;
2039
2040 DPRINTF("delete node %u on %s page %u", indx,
2041 IS_LEAF(mp) ? "leaf" : "branch", mp->pgno);
2042 assert(indx < NUMKEYS(mp));
2043
2044 node = NODEPTR(mp, indx);
2045 sz = NODESIZE + node->ksize;
2046 if (IS_LEAF(mp)) {
2047 if (F_ISSET(node->flags, F_BIGDATA))
2048 sz += sizeof(pgno_t);
2049 else
2050 sz += NODEDSZ(node);
2051 }
2052
2053 ptr = mp->page->ptrs[indx];
2054 numkeys = NUMKEYS(mp);
2055 for (i = j = 0; i < numkeys; i++) {
2056 if (i != indx) {
2057 mp->page->ptrs[j] = mp->page->ptrs[i];
2058 if (mp->page->ptrs[i] < ptr)
2059 mp->page->ptrs[j] += sz;
2060 j++;
2061 }
2062 }
2063
2064 base = (char *)mp->page + mp->page->upper;
2065 bcopy(base, base + sz, ptr - mp->page->upper);
2066
2067 mp->page->lower -= sizeof(indx_t);
2068 mp->page->upper += sz;
2069 }
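/*
 * Deletion works in reverse: sz bytes (node header, key, and either the
 * inline data or the overflow page number) are reclaimed.  Every surviving
 * ptrs[] entry that pointed below the removed node is bumped up by sz, the
 * byte range [upper, ptr) is slid up by sz with the bcopy above, and
 * finally lower shrinks by one indx_t slot while upper grows by sz, so the
 * freed space rejoins the gap in the middle of the page.
 */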
2070
2071 struct cursor *
2072 btree_txn_cursor_open(struct btree *bt, struct btree_txn *txn)
2073 {
2074 struct cursor *cursor;
2075
2076 if (bt != NULL && txn != NULL && bt != txn->bt) {
2077 errno = EINVAL;
2078 return NULL;
2079 }
2080
2081 if (bt == NULL) {
2082 if (txn == NULL) {
2083 errno = EINVAL;
2084 return NULL;
2085 }
2086 bt = txn->bt;
2087 }
2088
2089 if ((cursor = calloc(1, sizeof(*cursor))) != NULL) {
2090 SLIST_INIT(&cursor->stack);
2091 cursor->bt = bt;
2092 cursor->txn = txn;
2093 btree_ref(bt);
2094 }
2095
2096 return cursor;
2097 }
2098
2099 void
2100 btree_cursor_close(struct cursor *cursor)
2101 {
2102 if (cursor != NULL) {
2103 while (!CURSOR_EMPTY(cursor))
2104 cursor_pop_page(cursor);
2105
2106 btree_close(cursor->bt);
2107 free(cursor);
2108 }
2109 }
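/*
 * Typical cursor usage (a sketch; btree_cursor_get(), the BT_NEXT cursor
 * op and btval_reset() are assumed from btree.h and are not defined in
 * this file, and BT_NEXT is assumed to start at the first key on a fresh
 * cursor):
 *
 *	struct cursor	*c;
 *	struct btval	 key, data;
 *
 *	memset(&key, 0, sizeof(key));
 *	memset(&data, 0, sizeof(data));
 *	if ((c = btree_txn_cursor_open(bt, NULL)) == NULL)
 *		err(1, "cursor open");
 *	while (btree_cursor_get(c, &key, &data, BT_NEXT) == BT_SUCCESS) {
 *		... use key.data/key.size and data.data/data.size ...
 *		btval_reset(&key);
 *		btval_reset(&data);
 *	}
 *	btree_cursor_close(c);
 */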
2110
2111 static int
2112 btree_update_key(struct btree *bt, struct mpage *mp, indx_t indx,
2113 struct btval *key)
2114 {
2115 indx_t ptr, i, numkeys;
2116 int delta;
2117 size_t len;
2118 struct node *node;
2119 char *base;
2120
2121 node = NODEPTR(mp, indx);
2122 ptr = mp->page->ptrs[indx];
2123 DPRINTF("update key %u (ofs %u) [%.*s] to [%.*s] on page %u",
2124 indx, ptr,
2125 (int)node->ksize, (char *)NODEKEY(node),
2126 (int)key->size, (char *)key->data,
2127 mp->pgno);
2128
2129 if (key->size != node->ksize) {
2130 delta = key->size - node->ksize;
2131 if (delta > 0 && SIZELEFT(mp) < delta) {
2132 DPRINTF("OUCH! Not enough room, delta = %d", delta);
2133 return BT_FAIL;
2134 }
2135
2136 numkeys = NUMKEYS(mp);
2137 for (i = 0; i < numkeys; i++) {
2138 if (mp->page->ptrs[i] <= ptr)
2139 mp->page->ptrs[i] -= delta;
2140 }
2141
2142 base = (char *)mp->page + mp->page->upper;
2143 len = ptr - mp->page->upper + NODESIZE;
2144 bcopy(base, base - delta, len);
2145 mp->page->upper -= delta;
2146
2147 node = NODEPTR(mp, indx);
2148 node->ksize = key->size;
2149 }
2150
2151 bcopy(key->data, NODEKEY(node), key->size);
2152
2153 return BT_SUCCESS;
2154 }
2155
2156 static int
2157 btree_adjust_prefix(struct btree *bt, struct mpage *src, int delta)
2158 {
2159 indx_t i;
2160 struct node *node;
2161 struct btkey tmpkey;
2162 struct btval key;
2163
2164 DPRINTF("adjusting prefix lengths on page %u with delta %d",
2165 src->pgno, delta);
2166 assert(delta != 0);
2167
2168 for (i = 0; i < NUMKEYS(src); i++) {
2169 node = NODEPTR(src, i);
2170 tmpkey.len = node->ksize - delta;
2171 if (delta > 0) {
2172 if (F_ISSET(bt->flags, BT_REVERSEKEY))
2173 bcopy(NODEKEY(node), tmpkey.str, tmpkey.len);
2174 else
2175 bcopy((char *)NODEKEY(node) + delta, tmpkey.str,
2176 tmpkey.len);
2177 } else {
2178 if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
2179 bcopy(NODEKEY(node), tmpkey.str, node->ksize);
2180 bcopy(src->prefix.str, tmpkey.str + node->ksize,
2181 -delta);
2182 } else {
2183 bcopy(src->prefix.str + src->prefix.len + delta,
2184 tmpkey.str, -delta);
2185 bcopy(NODEKEY(node), tmpkey.str - delta,
2186 node->ksize);
2187 }
2188 }
2189 key.size = tmpkey.len;
2190 key.data = tmpkey.str;
2191 if (btree_update_key(bt, src, i, &key) != BT_SUCCESS)
2192 return BT_FAIL;
2193 }
2194
2195 return BT_SUCCESS;
2196 }
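/*
 * Example (forward keys): if the page prefix grows from "app" to "appl"
 * (delta = +1), a key stored as "le" (for "apple") is rewritten as "e";
 * if the prefix shrinks from "appl" back to "app" (delta = -1), the
 * dropped prefix byte is put back in front and "e" becomes "le" again.
 * With BT_REVERSEKEY the same adjustment is applied at the tail of the
 * key instead of the head.
 */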
2197
2198 /* Move a node from src to dst.
2199 */
2200 static int
2201 btree_move_node(struct btree *bt, struct mpage *src, indx_t srcindx,
2202 struct mpage *dst, indx_t dstindx)
2203 {
2204 int rc;
2205 unsigned int pfxlen, mp_pfxlen = 0;
2206 struct node *srcnode;
2207 struct mpage *mp = NULL;
2208 struct btkey tmpkey, srckey;
2209 struct btval key, data;
2210
2211 assert(src->parent);
2212 assert(dst->parent);
2213
2214 srcnode = NODEPTR(src, srcindx);
2215 DPRINTF("moving %s node %u [%.*s] on page %u to node %u on page %u",
2216 IS_LEAF(src) ? "leaf" : "branch",
2217 srcindx,
2218 (int)srcnode->ksize, (char *)NODEKEY(srcnode),
2219 src->pgno,
2220 dstindx, dst->pgno);
2221
2222 find_common_prefix(bt, src);
2223
2224 if (IS_BRANCH(src)) {
2225 /* Need to check if the page the moved node points to
2226 * changes prefix.
2227 */
2228 if ((mp = btree_get_mpage(bt, NODEPGNO(srcnode))) == NULL)
2229 return BT_FAIL;
2230 mp->parent = src;
2231 mp->parent_index = srcindx;
2232 find_common_prefix(bt, mp);
2233 mp_pfxlen = mp->prefix.len;
2234 }
2235
2236 /* Mark src and dst as dirty. */
2237 if ((src = mpage_touch(bt, src)) == NULL ||
2238 (dst = mpage_touch(bt, dst)) == NULL)
2239 return BT_FAIL;
2240
2241 find_common_prefix(bt, dst);
2242
2243 /* Check if the src node already carries the destination page's prefix.
2244 * Otherwise the destination page must expand its prefix on all its nodes.
2245 */
2246 srckey.len = srcnode->ksize;
2247 bcopy(NODEKEY(srcnode), srckey.str, srckey.len);
2248 common_prefix(bt, &srckey, &dst->prefix, &tmpkey);
2249 if (tmpkey.len != dst->prefix.len) {
2250 if (btree_adjust_prefix(bt, dst,
2251 tmpkey.len - dst->prefix.len) != BT_SUCCESS)
2252 return BT_FAIL;
2253 bcopy(&tmpkey, &dst->prefix, sizeof(tmpkey));
2254 }
2255
2256 if (srcindx == 0 && IS_BRANCH(src)) {
2257 struct mpage *low;
2258
2259 /* must find the lowest key below src
2260 */
2261 assert(btree_search_page_root(bt, src, NULL, NULL, 0,
2262 &low) == BT_SUCCESS);
2263 expand_prefix(bt, low, 0, &srckey);
2264 DPRINTF("found lowest key [%.*s] on leaf page %u",
2265 (int)srckey.len, srckey.str, low->pgno);
2266 } else {
2267 srckey.len = srcnode->ksize;
2268 bcopy(NODEKEY(srcnode), srckey.str, srcnode->ksize);
2269 }
2270 find_common_prefix(bt, src);
2271
2272 /* expand the prefix */
2273 tmpkey.len = sizeof(tmpkey.str);
2274 concat_prefix(bt, src->prefix.str, src->prefix.len,
2275 srckey.str, srckey.len, tmpkey.str, &tmpkey.len);
2276
2277 /* Add the node to the destination page. Adjust prefix for
2278 * destination page.
2279 */
2280 key.size = tmpkey.len;
2281 key.data = tmpkey.str;
2282 remove_prefix(bt, &key, dst->prefix.len);
2283 data.size = NODEDSZ(srcnode);
2284 data.data = NODEDATA(srcnode);
2285 rc = btree_add_node(bt, dst, dstindx, &key, &data, NODEPGNO(srcnode),
2286 srcnode->flags);
2287 if (rc != BT_SUCCESS)
2288 return rc;
2289
2290 /* Delete the node from the source page.
2291 */
2292 btree_del_node(bt, src, srcindx);
2293
2294 /* Update the parent separators.
2295 */
2296 if (srcindx == 0 && src->parent_index != 0) {
2297 expand_prefix(bt, src, 0, &tmpkey);
2298 key.size = tmpkey.len;
2299 key.data = tmpkey.str;
2300 remove_prefix(bt, &key, src->parent->prefix.len);
2301
2302 DPRINTF("update separator for source page %u to [%.*s]",
2303 src->pgno, (int)key.size, (char *)key.data);
2304 if (btree_update_key(bt, src->parent, src->parent_index,
2305 &key) != BT_SUCCESS)
2306 return BT_FAIL;
2307 }
2308
2309 if (srcindx == 0 && IS_BRANCH(src)) {
2310 struct btval nullkey;
2311 nullkey.size = 0;
2312 assert(btree_update_key(bt, src, 0, &nullkey) == BT_SUCCESS);
2313 }
2314
2315 if (dstindx == 0 && dst->parent_index != 0) {
2316 expand_prefix(bt, dst, 0, &tmpkey);
2317 key.size = tmpkey.len;
2318 key.data = tmpkey.str;
2319 remove_prefix(bt, &key, dst->parent->prefix.len);
2320
2321 DPRINTF("update separator for destination page %u to [%.*s]",
2322 dst->pgno, (int)key.size, (char *)key.data);
2323 if (btree_update_key(bt, dst->parent, dst->parent_index,
2324 &key) != BT_SUCCESS)
2325 return BT_FAIL;
2326 }
2327
2328 if (dstindx == 0 && IS_BRANCH(dst)) {
2329 struct btval nullkey;
2330 nullkey.size = 0;
2331 assert(btree_update_key(bt, dst, 0, &nullkey) == BT_SUCCESS);
2332 }
2333
2334 /* We can get a new page prefix here!
2335 * Must update keys in all nodes of this page!
2336 */
2337 pfxlen = src->prefix.len;
2338 find_common_prefix(bt, src);
2339 if (src->prefix.len != pfxlen) {
2340 if (btree_adjust_prefix(bt, src,
2341 src->prefix.len - pfxlen) != BT_SUCCESS)
2342 return BT_FAIL;
2343 }
2344
2345 pfxlen = dst->prefix.len;
2346 find_common_prefix(bt, dst);
2347 if (dst->prefix.len != pfxlen) {
2348 if (btree_adjust_prefix(bt, dst,
2349 dst->prefix.len - pfxlen) != BT_SUCCESS)
2350 return BT_FAIL;
2351 }
2352
2353 if (IS_BRANCH(dst)) {
2354 assert(mp);
2355 mp->parent = dst;
2356 mp->parent_index = dstindx;
2357 find_common_prefix(bt, mp);
2358 if (mp->prefix.len != mp_pfxlen) {
2359 DPRINTF("moved branch node has changed prefix");
2360 if ((mp = mpage_touch(bt, mp)) == NULL)
2361 return BT_FAIL;
2362 if (btree_adjust_prefix(bt, mp,
2363 mp->prefix.len - mp_pfxlen) != BT_SUCCESS)
2364 return BT_FAIL;
2365 }
2366 }
2367
2368 return BT_SUCCESS;
2369 }
2370
2371 static int
2372 btree_merge(struct btree *bt, struct mpage *src, struct mpage *dst)
2373 {
2374 int rc;
2375 indx_t i;
2376 unsigned int pfxlen;
2377 struct node *srcnode;
2378 struct btkey tmpkey, dstpfx;
2379 struct btval key, data;
2380
2381 DPRINTF("merging page %u and %u", src->pgno, dst->pgno);
2382
2383 assert(src->parent); /* can't merge root page */
2384 assert(dst->parent);
2385 assert(bt->txn != NULL);
2386
2387 /* Mark src and dst as dirty. */
2388 if ((src = mpage_touch(bt, src)) == NULL ||
2389 (dst = mpage_touch(bt, dst)) == NULL)
2390 return BT_FAIL;
2391
2392 find_common_prefix(bt, src);
2393 find_common_prefix(bt, dst);
2394
2395 /* Check if the source nodes carry the destination page prefix. Otherwise
2396 * the destination page must expand its prefix on all its nodes.
2397 */
2398 common_prefix(bt, &src->prefix, &dst->prefix, &dstpfx);
2399 if (dstpfx.len != dst->prefix.len) {
2400 if (btree_adjust_prefix(bt, dst,
2401 dstpfx.len - dst->prefix.len) != BT_SUCCESS)
2402 return BT_FAIL;
2403 bcopy(&dstpfx, &dst->prefix, sizeof(dstpfx));
2404 }
2405
2406 /* Move all nodes from src to dst.
2407 */
2408 for (i = 0; i < NUMKEYS(src); i++) {
2409 srcnode = NODEPTR(src, i);
2410
2411 /* If branch node 0 (implicit key), find the real key.
2412 */
2413 if (i == 0 && IS_BRANCH(src)) {
2414 struct mpage *low;
2415
2416 /* must find the lowest key below src
2417 */
2418 assert(btree_search_page_root(bt, src, NULL, NULL, 0,
2419 &low) == BT_SUCCESS);
2420 expand_prefix(bt, low, 0, &tmpkey);
2421 DPRINTF("found lowest key [%.*s] on leaf page %u",
2422 (int)tmpkey.len, tmpkey.str, low->pgno);
2423 } else {
2424 expand_prefix(bt, src, i, &tmpkey);
2425 }
2426
2427 key.size = tmpkey.len;
2428 key.data = tmpkey.str;
2429
2430 remove_prefix(bt, &key, dst->prefix.len);
2431 data.size = NODEDSZ(srcnode);
2432 data.data = NODEDATA(srcnode);
2433 rc = btree_add_node(bt, dst, NUMKEYS(dst), &key,
2434 &data, NODEPGNO(srcnode), srcnode->flags);
2435 if (rc != BT_SUCCESS)
2436 return rc;
2437 }
2438
2439 DPRINTF("dst page %u now has %lu keys (%.1f%% filled)",
2440 dst->pgno, NUMKEYS(dst), (float)PAGEFILL(bt, dst) / 10);
2441
2442 /* Unlink the src page from parent.
2443 */
2444 btree_del_node(bt, src->parent, src->parent_index);
2445 if (src->parent_index == 0) {
2446 key.size = 0;
2447 if (btree_update_key(bt, src->parent, 0, &key) != BT_SUCCESS)
2448 return BT_FAIL;
2449
2450 pfxlen = src->prefix.len;
2451 find_common_prefix(bt, src);
2452 assert(src->prefix.len == pfxlen);
2453 }
2454
2455 if (IS_LEAF(src))
2456 bt->meta.leaf_pages--;
2457 else
2458 bt->meta.branch_pages--;
2459
2460 return btree_rebalance(bt, src->parent);
2461 }
2462
2463 #define FILL_THRESHOLD 250
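/* PAGEFILL() is expressed in tenths of a percent, so 250 means a page that
 * is 25.0% full; pages below this fill factor are rebalanced.
 */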
2464
2465 static int
2466 btree_rebalance(struct btree *bt, struct mpage *mp)
2467 {
2468 struct node *node;
2469 struct mpage *parent;
2470 struct mpage *root;
2471 struct mpage *neighbor = NULL;
2472 indx_t si = 0, di = 0;
2473
2474 assert(bt != NULL);
2475 assert(bt->txn != NULL);
2476 assert(mp != NULL);
2477
2478 DPRINTF("rebalancing %s page %u (has %lu keys, %.1f%% full)",
2479 IS_LEAF(mp) ? "leaf" : "branch",
2480 mp->pgno, NUMKEYS(mp), (float)PAGEFILL(bt, mp) / 10);
2481
2482 if (PAGEFILL(bt, mp) >= FILL_THRESHOLD) {
2483 DPRINTF("no need to rebalance page %u, above fill threshold",
2484 mp->pgno);
2485 return BT_SUCCESS;
2486 }
2487
2488 parent = mp->parent;
2489
2490 if (parent == NULL) {
2491 if (NUMKEYS(mp) == 0) {
2492 DPRINTF("tree is completely empty");
2493 bt->txn->root = P_INVALID;
2494 bt->meta.depth--;
2495 bt->meta.leaf_pages--;
2496 } else if (IS_BRANCH(mp) && NUMKEYS(mp) == 1) {
2497 DPRINTF("collapsing root page!");
2498 bt->txn->root = NODEPGNO(NODEPTR(mp, 0));
2499 if ((root = btree_get_mpage(bt, bt->txn->root)) == NULL)
2500 return BT_FAIL;
2501 root->parent = NULL;
2502 bt->meta.depth--;
2503 bt->meta.branch_pages--;
2504 } else
2505 DPRINTF("root page doesn't need rebalancing");
2506 return BT_SUCCESS;
2507 }
2508
2509 /* The parent (branch page) must have at least 2 pointers,
2510 * otherwise the tree is invalid.
2511 */
2512 assert(NUMKEYS(parent) > 1);
2513
2514 /* The page's fill factor is below the threshold.
2515 * Try to move keys from the left or right neighbor, or
2516 * merge with a neighbor page.
2517 */
2518
2519 /* Find neighbors.
2520 */
2521 if (mp->parent_index == 0) {
2522 /* We're the leftmost child of our parent.
2523 */
2524 DPRINTF("reading right neighbor");
2525 node = NODEPTR(parent, mp->parent_index + 1);
2526 if ((neighbor = btree_get_mpage(bt, NODEPGNO(node))) == NULL)
2527 return BT_FAIL;
2528 neighbor->parent_index = mp->parent_index + 1;
2529 si = 0;
2530 di = NUMKEYS(mp);
2531 } else {
2532 /* There is at least one neighbor to the left.
2533 */
2534 DPRINTF("reading left neighbor");
2535 node = NODEPTR(parent, mp->parent_index - 1);
2536 if ((neighbor = btree_get_mpage(bt, NODEPGNO(node))) == NULL)
2537 return BT_FAIL;
2538 neighbor->parent_index = mp->parent_index - 1;
2539 si = NUMKEYS(neighbor) - 1;
2540 di = 0;
2541 }
2542 neighbor->parent = parent;
2543
2544 DPRINTF("found neighbor page %u (%lu keys, %.1f%% full)",
2545 neighbor->pgno, NUMKEYS(neighbor), (float)PAGEFILL(bt, neighbor) / 10);
2546
2547 /* If the neighbor page is above threshold and has at least two
2548 * keys, move one key from it.
2549 *
2550 * Otherwise we should try to merge them, but that might not be
2551 * possible, even if both are below threshold, as prefix expansion
2552 * might make keys larger. FIXME: detect this
2553 */
2554 if (PAGEFILL(bt, neighbor) >= FILL_THRESHOLD && NUMKEYS(neighbor) >= 2)
2555 return btree_move_node(bt, neighbor, si, mp, di);
2556 else { /* FIXME: if (has_enough_room()) */
2557 if (mp->parent_index == 0)
2558 return btree_merge(bt, neighbor, mp);
2559 else
2560 return btree_merge(bt, mp, neighbor);
2561 }
2562 }
2563
2564 int
2565 btree_txn_del(struct btree *bt, struct btree_txn *txn,
2566 struct btval *key, struct btval *data)
2567 {
2568 int rc, exact, close_txn = 0;
2569 unsigned int ki;
2570 struct node *leaf;
2571 struct mpage *mp;
2572
2573 DPRINTF("========> delete key %.*s", (int)key->size, (char *)key->data);
2574
2575 assert(key != NULL);
2576
2577 if (bt != NULL && txn != NULL && bt != txn->bt) {
2578 errno = EINVAL;
2579 return BT_FAIL;
2580 }
2581
2582 if (txn != NULL && F_ISSET(txn->flags, BT_TXN_RDONLY)) {
2583 errno = EINVAL;
2584 return BT_FAIL;
2585 }
2586
2587 if (bt == NULL) {
2588 if (txn == NULL) {
2589 errno = EINVAL;
2590 return BT_FAIL;
2591 }
2592 bt = txn->bt;
2593 }
2594
2595 if (key->size == 0 || key->size > MAXKEYSIZE) {
2596 errno = EINVAL;
2597 return BT_FAIL;
2598 }
2599
2600 if (txn == NULL) {
2601 close_txn = 1;
2602 if ((txn = btree_txn_begin(bt, 0)) == NULL)
2603 return BT_FAIL;
2604 }
2605
2606 if ((rc = btree_search_page(bt, txn, key, NULL, 1, &mp)) != BT_SUCCESS)
2607 goto done;
2608
2609 leaf = btree_search_node(bt, mp, key, &exact, &ki);
2610 if (leaf == NULL || !exact) {
2611 errno = ENOENT;
2612 rc = BT_FAIL;
2613 goto done;
2614 }
2615
2616 if (data && (rc = btree_read_data(bt, NULL, leaf, data)) != BT_SUCCESS)
2617 goto done;
2618
2619 btree_del_node(bt, mp, ki);
2620 bt->meta.entries--;
2621 rc = btree_rebalance(bt, mp);
2622 if (rc != BT_SUCCESS)
2623 txn->flags |= BT_TXN_ERROR;
2624
2625 done:
2626 if (close_txn) {
2627 if (rc == BT_SUCCESS)
2628 rc = btree_txn_commit(txn);
2629 else
2630 btree_txn_abort(txn);
2631 }
2632 mpage_prune(bt);
2633 return rc;
2634 }
2635
2636 /* Reduce the length of the prefix separator <sep> to the minimum length that
2637 * still makes it uniquely distinguishable from <min>.
2638 *
2639 * <min> is guaranteed to sort less than <sep>.
2640 *
2641 * On return, <sep> is modified to the minimum length.
2642 */
2643 static void
2644 bt_reduce_separator(struct btree *bt, struct node *min, struct btval *sep)
2645 {
2646 size_t n = 0;
2647 char *p1;
2648 char *p2;
2649
2650 if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
2651
2652 assert(sep->size > 0);
2653
2654 p1 = (char *)NODEKEY(min) + min->ksize - 1;
2655 p2 = (char *)sep->data + sep->size - 1;
2656
2657 while (p1 >= (char *)NODEKEY(min) && *p1 == *p2) {
2658 assert(p2 > (char *)sep->data);
2659 p1--;
2660 p2--;
2661 n++;
2662 }
2663
2664 sep->data = p2;
2665 sep->size = n + 1;
2666 } else {
2667
2668 assert(min->ksize > 0);
2669 assert(sep->size > 0);
2670
2671 p1 = (char *)NODEKEY(min);
2672 p2 = (char *)sep->data;
2673
2674 while (*p1 == *p2) {
2675 p1++;
2676 p2++;
2677 n++;
2678 if (n == min->ksize || n == sep->size)
2679 break;
2680 }
2681
2682 sep->size = n + 1;
2683 }
2684
2685 DPRINTF("reduced separator to [%.*s] > [%.*s]",
2686 (int)sep->size, (char *)sep->data,
2687 (int)min->ksize, (char *)NODEKEY(min));
2688 }
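/*
 * Example with the default byte-wise comparison: min = "foobar" and
 * sep = "foofoo" share the 3-byte prefix "foo", so sep is cut down to the
 * 4-byte separator "foof", which still sorts after every key on the left
 * page and at or before every key on the right page.  With BT_REVERSEKEY
 * the scan runs from the end of the keys and the reduced separator keeps
 * the distinguishing suffix instead.
 */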
2689
2690 /* Split page <*mpp>, and insert <key,(data|newpgno)> in either the left or
2691 * the right sibling, at index <*newindxp> (as if unsplit). Updates *mpp and
2692 * *newindxp with the actual values after the split, i.e. they may now refer
2693 * to a node in the new right sibling page.
2694 */
2695 static int
2696 btree_split(struct btree *bt, struct mpage **mpp, unsigned int *newindxp,
2697 struct btval *newkey, struct btval *newdata, pgno_t newpgno)
2698 {
2699 uint8_t flags;
2700 int rc = BT_SUCCESS, ins_new = 0;
2701 indx_t newindx;
2702 pgno_t pgno = 0;
2703 size_t orig_pfx_len, left_pfx_diff, right_pfx_diff, pfx_diff;
2704 unsigned int i, j, split_indx;
2705 struct node *node;
2706 struct mpage *pright, *p, *mp;
2707 struct btval sepkey, rkey, rdata;
2708 struct btkey tmpkey;
2709 struct page *copy;
2710
2711 assert(bt != NULL);
2712 assert(bt->txn != NULL);
2713
2714 mp = *mpp;
2715 newindx = *newindxp;
2716
2717 DPRINTF("-----> splitting %s page %u and adding [%.*s] at index %d",
2718 IS_LEAF(mp) ? "leaf" : "branch", mp->pgno,
2719 (int)newkey->size, (char *)newkey->data, *newindxp);
2720 DPRINTF("page %u has prefix [%.*s]", mp->pgno,
2721 (int)mp->prefix.len, (char *)mp->prefix.str);
2722 orig_pfx_len = mp->prefix.len;
2723
2724 if (mp->parent == NULL) {
2725 if ((mp->parent = btree_new_page(bt, P_BRANCH)) == NULL)
2726 return BT_FAIL;
2727 mp->parent_index = 0;
2728 bt->txn->root = mp->parent->pgno;
2729 DPRINTF("root split! new root = %u", mp->parent->pgno);
2730 bt->meta.depth++;
2731
2732 /* Add left (implicit) pointer. */
2733 if (btree_add_node(bt, mp->parent, 0, NULL, NULL,
2734 mp->pgno, 0) != BT_SUCCESS)
2735 return BT_FAIL;
2736 } else {
2737 DPRINTF("parent branch page is %u", mp->parent->pgno);
2738 }
2739
2740 /* Create a right sibling. */
2741 if ((pright = btree_new_page(bt, mp->page->flags)) == NULL)
2742 return BT_FAIL;
2743 pright->parent = mp->parent;
2744 pright->parent_index = mp->parent_index + 1;
2745 DPRINTF("new right sibling: page %u", pright->pgno);
2746
2747 /* Move half of the keys to the right sibling. */
2748 if ((copy = malloc(bt->head.psize)) == NULL)
2749 return BT_FAIL;
2750 bcopy(mp->page, copy, bt->head.psize);
2751 assert(mp->ref == 0); /* XXX */
2752 memset(&mp->page->ptrs, 0, bt->head.psize - PAGEHDRSZ);
2753 mp->page->lower = PAGEHDRSZ;
2754 mp->page->upper = bt->head.psize;
2755
2756 split_indx = NUMKEYSP(copy) / 2 + 1;
2757
2758 /* First find the separating key between the split pages.
2759 */
2760 memset(&sepkey, 0, sizeof(sepkey));
2761 if (newindx == split_indx) {
2762 sepkey.size = newkey->size;
2763 sepkey.data = newkey->data;
2764 remove_prefix(bt, &sepkey, mp->prefix.len);
2765 } else {
2766 node = NODEPTRP(copy, split_indx);
2767 sepkey.size = node->ksize;
2768 sepkey.data = NODEKEY(node);
2769 }
2770
2771 if (IS_LEAF(mp) && bt->cmp == NULL) {
2772 /* Find the smallest separator. */
2773 /* Ref: Prefix B-trees, R. Bayer, K. Unterauer, 1977 */
2774 node = NODEPTRP(copy, split_indx - 1);
2775 bt_reduce_separator(bt, node, &sepkey);
2776 }
2777
2778 /* Fix separator wrt parent prefix. */
2779 if (bt->cmp == NULL) {
2780 tmpkey.len = sizeof(tmpkey.str);
2781 concat_prefix(bt, mp->prefix.str, mp->prefix.len,
2782 sepkey.data, sepkey.size, tmpkey.str, &tmpkey.len);
2783 sepkey.data = tmpkey.str;
2784 sepkey.size = tmpkey.len;
2785 }
2786
2787 DPRINTF("separator is [%.*s]", (int)sepkey.size, (char *)sepkey.data);
2788
2789 /* Copy separator key to the parent.
2790 */
2791 if (SIZELEFT(pright->parent) < bt_branch_size(bt, &sepkey)) {
2792 rc = btree_split(bt, &pright->parent, &pright->parent_index,
2793 &sepkey, NULL, pright->pgno);
2794
2795 /* Right page might now have changed parent.
2796 * Check if left page also changed parent.
2797 */
2798 if (pright->parent != mp->parent &&
2799 mp->parent_index >= NUMKEYS(mp->parent)) {
2800 mp->parent = pright->parent;
2801 mp->parent_index = pright->parent_index - 1;
2802 }
2803 } else {
2804 remove_prefix(bt, &sepkey, pright->parent->prefix.len);
2805 rc = btree_add_node(bt, pright->parent, pright->parent_index,
2806 &sepkey, NULL, pright->pgno, 0);
2807 }
2808 if (rc != BT_SUCCESS) {
2809 free(copy);
2810 return BT_FAIL;
2811 }
2812
2813 /* Update prefix for right and left page, if the parent was split.
2814 */
2815 find_common_prefix(bt, pright);
2816 assert(orig_pfx_len <= pright->prefix.len);
2817 right_pfx_diff = pright->prefix.len - orig_pfx_len;
2818
2819 find_common_prefix(bt, mp);
2820 assert(orig_pfx_len <= mp->prefix.len);
2821 left_pfx_diff = mp->prefix.len - orig_pfx_len;
2822
2823 for (i = j = 0; i <= NUMKEYSP(copy); j++) {
2824 if (i < split_indx) {
2825 /* Re-insert in left sibling. */
2826 p = mp;
2827 pfx_diff = left_pfx_diff;
2828 } else {
2829 /* Insert in right sibling. */
2830 if (i == split_indx)
2831 /* Reset insert index for right sibling. */
2832 j = (i == newindx && ins_new);
2833 p = pright;
2834 pfx_diff = right_pfx_diff;
2835 }
2836
2837 if (i == newindx && !ins_new) {
2838 /* Insert the original entry that caused the split. */
2839 rkey.data = newkey->data;
2840 rkey.size = newkey->size;
2841 if (IS_LEAF(mp)) {
2842 rdata.data = newdata->data;
2843 rdata.size = newdata->size;
2844 } else
2845 pgno = newpgno;
2846 flags = 0;
2847 pfx_diff = p->prefix.len;
2848
2849 ins_new = 1;
2850
2851 /* Update page and index for the new key. */
2852 *newindxp = j;
2853 *mpp = p;
2854 } else if (i == NUMKEYSP(copy)) {
2855 break;
2856 } else {
2857 node = NODEPTRP(copy, i);
2858 rkey.data = NODEKEY(node);
2859 rkey.size = node->ksize;
2860 if (IS_LEAF(mp)) {
2861 rdata.data = NODEDATA(node);
2862 rdata.size = node->n_dsize;
2863 } else
2864 pgno = node->n_pgno;
2865 flags = node->flags;
2866
2867 i++;
2868 }
2869
2870 if (!IS_LEAF(mp) && j == 0) {
2871 /* First branch index doesn't need key data. */
2872 rkey.size = 0;
2873 } else
2874 remove_prefix(bt, &rkey, pfx_diff);
2875
2876 rc = btree_add_node(bt, p, j, &rkey, &rdata, pgno, flags);
2877 }
2878
2879 free(copy);
2880 return rc;
2881 }
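/*
 * Worked example: a leaf holding 7 keys is split at
 * split_indx = 7 / 2 + 1 = 4, so keys 0-3 are re-added to the (emptied)
 * left page and keys 4-6 go to the new right sibling; the entry that
 * triggered the split is slotted into whichever side newindx falls on,
 * and *mpp/*newindxp are updated accordingly.  The separator pushed to
 * the parent is the first right-hand key, shortened by
 * bt_reduce_separator() for leaf pages when the default byte-wise
 * comparison is in use.
 */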
2882
2883 int
2884 btree_txn_put(struct btree *bt, struct btree_txn *txn,
2885 struct btval *key, struct btval *data, unsigned int flags)
2886 {
2887 int rc = BT_SUCCESS, exact, close_txn = 0;
2888 unsigned int ki;
2889 struct node *leaf;
2890 struct mpage *mp;
2891 struct btval xkey;
2892
2893 assert(key != NULL);
2894 assert(data != NULL);
2895
2896 if (bt != NULL && txn != NULL && bt != txn->bt) {
2897 errno = EINVAL;
2898 return BT_FAIL;
2899 }
2900
2901 if (txn != NULL && F_ISSET(txn->flags, BT_TXN_RDONLY)) {
2902 errno = EINVAL;
2903 return BT_FAIL;
2904 }
2905
2906 if (bt == NULL) {
2907 if (txn == NULL) {
2908 errno = EINVAL;
2909 return BT_FAIL;
2910 }
2911 bt = txn->bt;
2912 }
2913
2914 if (key->size == 0 || key->size > MAXKEYSIZE) {
2915 errno = EINVAL;
2916 return BT_FAIL;
2917 }
2918
2919 DPRINTF("==> put key %.*s, size %zu, data size %zu",
2920 (int)key->size, (char *)key->data, key->size, data->size);
2921
2922 if (txn == NULL) {
2923 close_txn = 1;
2924 if ((txn = btree_txn_begin(bt, 0)) == NULL)
2925 return BT_FAIL;
2926 }
2927
2928 rc = btree_search_page(bt, txn, key, NULL, 1, &mp);
2929 if (rc == BT_SUCCESS) {
2930 leaf = btree_search_node(bt, mp, key, &exact, &ki);
2931 if (leaf && exact) {
2932 if (F_ISSET(flags, BT_NOOVERWRITE)) {
2933 DPRINTF("duplicate key %.*s",
2934 (int)key->size, (char *)key->data);
2935 errno = EEXIST;
2936 rc = BT_FAIL;
2937 goto done;
2938 }
2939 btree_del_node(bt, mp, ki);
2940 }
2941 if (leaf == NULL) { /* append if not found */
2942 ki = NUMKEYS(mp);
2943 DPRINTF("appending key at index %d", ki);
2944 }
2945 } else if (errno == ENOENT) {
2946 /* new file, just write a root leaf page */
2947 DPRINTF("allocating new root leaf page");
2948 if ((mp = btree_new_page(bt, P_LEAF)) == NULL) {
2949 rc = BT_FAIL;
2950 goto done;
2951 }
2952 txn->root = mp->pgno;
2953 bt->meta.depth++;
2954 ki = 0;
2955 }
2956 else
2957 goto done;
2958
2959 assert(IS_LEAF(mp));
2960 DPRINTF("there are %lu keys, should insert new key at index %u",
2961 NUMKEYS(mp), ki);
2962
2963 /* Copy the key pointer as it is modified by the prefix code. The
2964 * caller might have malloc'ed the data.
2965 */
2966 xkey.data = key->data;
2967 xkey.size = key->size;
2968
2969 if (SIZELEFT(mp) < bt_leaf_size(bt, key, data)) {
2970 rc = btree_split(bt, &mp, &ki, &xkey, data, P_INVALID);
2971 } else {
2972 /* There is room already in this leaf page. */
2973 remove_prefix(bt, &xkey, mp->prefix.len);
2974 rc = btree_add_node(bt, mp, ki, &xkey, data, 0, 0);
2975 }
2976
2977 if (rc != BT_SUCCESS)
2978 txn->flags |= BT_TXN_ERROR;
2979 else
2980 bt->meta.entries++;
2981
2982 done:
2983 if (close_txn) {
2984 if (rc == BT_SUCCESS)
2985 rc = btree_txn_commit(txn);
2986 else
2987 btree_txn_abort(txn);
2988 }
2989 mpage_prune(bt);
2990 return rc;
2991 }
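/*
 * Typical usage (a sketch; btree_open() and its exact signature are
 * assumed from btree.h and may differ):
 *
 *	struct btree	*bt;
 *	struct btval	 key, data;
 *	char		 k[] = "greeting", v[] = "hello world";
 *
 *	if ((bt = btree_open("test.db", 0, 0644)) == NULL)
 *		err(1, "btree_open");
 *	memset(&key, 0, sizeof(key));
 *	memset(&data, 0, sizeof(data));
 *	key.data = k;	key.size = sizeof(k) - 1;
 *	data.data = v;	data.size = sizeof(v) - 1;
 *	if (btree_txn_put(bt, NULL, &key, &data, 0) != BT_SUCCESS)
 *		err(1, "btree_txn_put");
 *	if (btree_txn_del(bt, NULL, &key, NULL) != BT_SUCCESS)
 *		err(1, "btree_txn_del");
 *	btree_close(bt);
 *
 * Passing a NULL txn makes both calls wrap themselves in an implicit
 * transaction, as handled by the close_txn logic above.
 */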
2992
2993 static pgno_t
2994 btree_compact_tree(struct btree *bt, pgno_t pgno, struct btree *btc)
2995 {
2996 ssize_t rc;
2997 indx_t i;
2998 pgno_t *pnext, next;
2999 struct node *node;
3000 struct page *p;
3001 struct mpage *mp;
3002
3003 /* Get the page and make a copy of it.
3004 */
3005 if ((mp = btree_get_mpage(bt, pgno)) == NULL)
3006 return P_INVALID;
3007 if ((p = malloc(bt->head.psize)) == NULL)
3008 return P_INVALID;
3009 bcopy(mp->page, p, bt->head.psize);
3010
3011 /* Go through all nodes in the (copied) page and update the
3012 * page pointers.
3013 */
3014 if (F_ISSET(p->flags, P_BRANCH)) {
3015 for (i = 0; i < NUMKEYSP(p); i++) {
3016 node = NODEPTRP(p, i);
3017 node->n_pgno = btree_compact_tree(bt, node->n_pgno, btc);
3018 if (node->n_pgno == P_INVALID) {
3019 free(p);
3020 return P_INVALID;
3021 }
3022 }
3023 } else if (F_ISSET(p->flags, P_LEAF)) {
3024 for (i = 0; i < NUMKEYSP(p); i++) {
3025 node = NODEPTRP(p, i);
3026 if (F_ISSET(node->flags, F_BIGDATA)) {
3027 bcopy(NODEDATA(node), &next, sizeof(next));
3028 next = btree_compact_tree(bt, next, btc);
3029 if (next == P_INVALID) {
3030 free(p);
3031 return P_INVALID;
3032 }
3033 bcopy(&next, NODEDATA(node), sizeof(next));
3034 }
3035 }
3036 } else if (F_ISSET(p->flags, P_OVERFLOW)) {
3037 pnext = &p->p_next_pgno;
3038 if (*pnext > 0) {
3039 *pnext = btree_compact_tree(bt, *pnext, btc);
3040 if (*pnext == P_INVALID) {
3041 free(p);
3042 return P_INVALID;
3043 }
3044 }
3045 } else
3046 assert(0);
3047
3048 pgno = p->pgno = btc->txn->next_pgno++;
3049 rc = write(btc->fd, p, bt->head.psize);
3050 free(p);
3051 if (rc != (ssize_t)bt->head.psize)
3052 return P_INVALID;
3053 mpage_prune(bt);
3054 return pgno;
3055 }
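/*
 * Note on the traversal order: the tree is rewritten bottom-up; child
 * pages (and any overflow chains) are written to the compacted file first,
 * so the parent copy written last already references the new page numbers
 * assigned from btc->txn->next_pgno.
 */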
3056
3057 int
3058 btree_compact(struct btree *bt)
3059 {
3060 char *compact_path = NULL;
3061 struct btree *btc;
3062 struct btree_txn *txn, *txnc = NULL;
3063 int fd;
3064 pgno_t root;
3065
3066 assert(bt != NULL);
3067
3068 DPRINTF("compacting btree %p with path %s", bt, bt->path);
3069
3070 if (bt->path == NULL) {
3071 errno = EINVAL;
3072 return BT_FAIL;
3073 }
3074
3075 if ((txn = btree_txn_begin(bt, 0)) == NULL)
3076 return BT_FAIL;
3077
3078 if (asprintf(&compact_path, "%s.compact.XXXXXX", bt->path) == -1) {
3079 btree_txn_abort(txn);
3080 return BT_FAIL;
3081 }
3082 fd = mkstemp(compact_path);
3083 if (fd == -1) {
3084 free(compact_path);
3085 btree_txn_abort(txn);
3086 return BT_FAIL;
3087 }
3088
3089 if ((btc = btree_open_fd(fd, 0)) == NULL)
3090 goto failed;
3091 bcopy(&bt->meta, &btc->meta, sizeof(bt->meta));
3092 btc->meta.revisions = 0;
3093
3094 if ((txnc = btree_txn_begin(btc, 0)) == NULL)
3095 goto failed;
3096
3097 if (bt->meta.root != P_INVALID) {
3098 root = btree_compact_tree(bt, bt->meta.root, btc);
3099 if (root == P_INVALID)
3100 goto failed;
3101 if (btree_write_meta(btc, root, 0) != BT_SUCCESS)
3102 goto failed;
3103 }
3104
3105 fsync(fd);
3106
3107 DPRINTF("renaming %s to %s", compact_path, bt->path);
3108 if (rename(compact_path, bt->path) != 0)
3109 goto failed;
3110
3111 /* Write a "tombstone" meta page so other processes can pick up
3112 * the change and re-open the file.
3113 */
3114 if (btree_write_meta(bt, P_INVALID, BT_TOMBSTONE) != BT_SUCCESS)
3115 goto failed;
3116
3117 btree_txn_abort(txn);
3118 btree_txn_abort(txnc);
3119 free(compact_path);
3120 btree_close(btc);
3121 mpage_prune(bt);
3122 return 0;
3123
3124 failed:
3125 btree_txn_abort(txn);
3126 btree_txn_abort(txnc);
3127 unlink(compact_path);
3128 free(compact_path);
3129 btree_close(btc);
3130 mpage_prune(bt);
3131 return BT_FAIL;
3132 }
3133
3134 /* Reverts the last change. Truncates the file at the last root page.
3135 */
3136 int
3137 btree_revert(struct btree *bt)
3138 {
3139 if (btree_read_meta(bt, NULL) != 0)
3140 return -1;
3141
3142 DPRINTF("truncating file at page %u", bt->meta.root);
3143 return ftruncate(bt->fd, bt->head.psize * bt->meta.root);
3144 }
3145
3146 void
3147 btree_set_cache_size(struct btree *bt, unsigned int cache_size)
3148 {
3149 bt->stat.max_cache = cache_size;
3150 }
3151
3152 unsigned int
3153 btree_get_flags(struct btree *bt)
3154 {
3155 return (bt->flags & ~BT_FIXPADDING);
3156 }
3157
3158 const char *
3159 btree_get_path(struct btree *bt)
3160 {
3161 return bt->path;
3162 }
3163
3164 const struct btree_stat *
3165 btree_stat(struct btree *bt)
3166 {
3167 if (bt == NULL)
3168 return NULL;
3169
3170 bt->stat.branch_pages = bt->meta.branch_pages;
3171 bt->stat.leaf_pages = bt->meta.leaf_pages;
3172 bt->stat.overflow_pages = bt->meta.overflow_pages;
3173 bt->stat.revisions = bt->meta.revisions;
3174 bt->stat.depth = bt->meta.depth;
3175 bt->stat.entries = bt->meta.entries;
3176 bt->stat.psize = bt->head.psize;
3177 bt->stat.created_at = bt->meta.created_at;
3178
3179 return &bt->stat;
3180 }
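/*
 * Example (a sketch): dumping a few of the statistics filled in above;
 * the casts keep the format specifiers safe regardless of the exact
 * field types declared in btree.h.
 *
 *	const struct btree_stat	*st;
 *
 *	if ((st = btree_stat(bt)) != NULL)
 *		printf("depth %u, %llu entries, page size %u\n",
 *		    (unsigned)st->depth,
 *		    (unsigned long long)st->entries,
 *		    (unsigned)st->psize);
 */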
3181
3182