1 /*	$OpenBSD: btree.c,v 1.38 2017/05/26 21:23:14 sthen Exp $ */
2 
3 /*
4  * Copyright (c) 2009, 2010 Martin Hedenfalk <martin@bzero.se>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/tree.h>
21 #include <sys/stat.h>
22 #include <sys/queue.h>
23 #include <sys/uio.h>
24 
25 #include <assert.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <stddef.h>
30 #include <stdint.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <time.h>
35 #include <unistd.h>
36 
37 #include "btree.h"
38 
39 /* #define DEBUG */
40 
41 #ifdef DEBUG
42 # define DPRINTF(...)	do { fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
43 			     fprintf(stderr, __VA_ARGS__); \
44 			     fprintf(stderr, "\n"); } while(0)
45 #else
46 # define DPRINTF(...)
47 #endif
48 
49 #define PAGESIZE	 4096
50 #define BT_MINKEYS	 4
51 #define BT_MAGIC	 0xB3DBB3DB
52 #define BT_VERSION	 4
53 #define MAXKEYSIZE	 255
54 
55 #define P_INVALID	 0xFFFFFFFF
56 
57 #define F_ISSET(w, f)	 (((w) & (f)) == (f))
58 #define MINIMUM(a,b)	 ((a) < (b) ? (a) : (b))
59 
60 typedef uint32_t	 pgno_t;
61 typedef uint16_t	 indx_t;
62 
63 /* There are four page types: meta, branch (index), leaf and overflow.
64  * They all share the same page header.
65  */
66 struct page {				/* represents an on-disk page */
67 	pgno_t		 pgno;		/* page number */
68 #define	P_BRANCH	 0x01		/* branch page */
69 #define	P_LEAF		 0x02		/* leaf page */
70 #define	P_OVERFLOW	 0x04		/* overflow page */
71 #define	P_META		 0x08		/* meta page */
72 #define	P_HEAD		 0x10		/* header page */
73 	uint32_t	 flags;
74 #define lower		 b.fb.fb_lower
75 #define upper		 b.fb.fb_upper
76 #define p_next_pgno	 b.pb_next_pgno
77 	union page_bounds {
78 		struct {
79 			indx_t	 fb_lower;	/* lower bound of free space */
80 			indx_t	 fb_upper;	/* upper bound of free space */
81 		} fb;
82 		pgno_t		 pb_next_pgno;	/* overflow page linked list */
83 	} b;
84 	indx_t		 ptrs[1];		/* dynamic size */
85 } __packed;
86 
87 #define PAGEHDRSZ	 offsetof(struct page, ptrs)
88 
89 #define NUMKEYSP(p)	 (((p)->lower - PAGEHDRSZ) >> 1)
90 #define NUMKEYS(mp)	 (((mp)->page->lower - PAGEHDRSZ) >> 1)
91 #define SIZELEFT(mp)	 (indx_t)((mp)->page->upper - (mp)->page->lower)
92 #define PAGEFILL(bt, mp) (1000 * ((bt)->head.psize - PAGEHDRSZ - SIZELEFT(mp)) / \
93 				((bt)->head.psize - PAGEHDRSZ))
94 #define IS_LEAF(mp)	 F_ISSET((mp)->page->flags, P_LEAF)
95 #define IS_BRANCH(mp)	 F_ISSET((mp)->page->flags, P_BRANCH)
96 #define IS_OVERFLOW(mp)	 F_ISSET((mp)->page->flags, P_OVERFLOW)
97 
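/*
 * Worked example (sketch, not part of the original source): with the
 * default 4096-byte page, the __packed header above gives PAGEHDRSZ =
 * offsetof(struct page, ptrs) = 4 (pgno) + 4 (flags) + 4 (bounds) = 12.
 * A freshly initialized page thus has lower = 12 and upper = 4096, so
 * NUMKEYS() = (12 - 12) >> 1 = 0 and SIZELEFT() = 4084.  Each node
 * added by btree_add_node() grows lower by sizeof(indx_t) and shrinks
 * upper by the node size, so SIZELEFT() is the free gap between the
 * pointer array and the packed node data.
 */
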
98 struct bt_head {				/* header page content */
99 	uint32_t	 magic;
100 	uint32_t	 version;
101 	uint32_t	 flags;
102 	uint32_t	 psize;			/* page size */
103 } __packed;
104 
105 struct bt_meta {				/* meta (footer) page content */
106 #define BT_TOMBSTONE	 0x01			/* file is replaced */
107 	uint32_t	 flags;
108 	pgno_t		 root;			/* page number of root page */
109 	pgno_t		 prev_meta;		/* previous meta page number */
110 	time_t		 created_at;
111 	uint32_t	 branch_pages;
112 	uint32_t	 leaf_pages;
113 	uint32_t	 overflow_pages;
114 	uint32_t	 revisions;
115 	uint32_t	 depth;
116 	uint64_t	 entries;
117 	unsigned char	 hash[SHA_DIGEST_LENGTH];
118 } __packed;
119 
120 struct btkey {
121 	size_t			 len;
122 	char			 str[MAXKEYSIZE];
123 };
124 
125 struct mpage {					/* an in-memory cached page */
126 	RB_ENTRY(mpage)		 entry;		/* page cache entry */
127 	SIMPLEQ_ENTRY(mpage)	 next;		/* queue of dirty pages */
128 	TAILQ_ENTRY(mpage)	 lru_next;	/* LRU queue */
129 	struct mpage		*parent;	/* NULL if root */
130 	unsigned int		 parent_index;	/* keep track of node index */
131 	struct btkey		 prefix;
132 	struct page		*page;
133 	pgno_t			 pgno;		/* copy of page->pgno */
134 	short			 ref;		/* increased by cursors */
135 	short			 dirty;		/* 1 if on dirty queue */
136 };
137 RB_HEAD(page_cache, mpage);
138 SIMPLEQ_HEAD(dirty_queue, mpage);
139 TAILQ_HEAD(lru_queue, mpage);
140 
141 static int		 mpage_cmp(struct mpage *a, struct mpage *b);
142 static struct mpage	*mpage_lookup(struct btree *bt, pgno_t pgno);
143 static void		 mpage_add(struct btree *bt, struct mpage *mp);
144 static void		 mpage_free(struct mpage *mp);
145 static void		 mpage_del(struct btree *bt, struct mpage *mp);
146 static void		 mpage_flush(struct btree *bt);
147 static struct mpage	*mpage_copy(struct btree *bt, struct mpage *mp);
148 static void		 mpage_prune(struct btree *bt);
149 static void		 mpage_dirty(struct btree *bt, struct mpage *mp);
150 static struct mpage	*mpage_touch(struct btree *bt, struct mpage *mp);
151 
152 RB_PROTOTYPE(page_cache, mpage, entry, mpage_cmp);
153 RB_GENERATE(page_cache, mpage, entry, mpage_cmp);
154 
155 struct ppage {					/* ordered list of pages */
156 	SLIST_ENTRY(ppage)	 entry;
157 	struct mpage		*mpage;
158 	unsigned int		 ki;		/* cursor index on page */
159 };
160 SLIST_HEAD(page_stack, ppage);
161 
162 #define CURSOR_EMPTY(c)		 SLIST_EMPTY(&(c)->stack)
163 #define CURSOR_TOP(c)		 SLIST_FIRST(&(c)->stack)
164 #define CURSOR_POP(c)		 SLIST_REMOVE_HEAD(&(c)->stack, entry)
165 #define CURSOR_PUSH(c,p)	 SLIST_INSERT_HEAD(&(c)->stack, p, entry)
166 
167 struct cursor {
168 	struct btree		*bt;
169 	struct btree_txn	*txn;
170 	struct page_stack	 stack;		/* stack of parent pages */
171 	short			 initialized;	/* 1 if initialized */
172 	short			 eof;		/* 1 if end is reached */
173 };
174 
175 #define METAHASHLEN	 offsetof(struct bt_meta, hash)
176 #define METADATA(p)	 ((void *)((char *)p + PAGEHDRSZ))
177 
178 struct node {
179 #define n_pgno		 p.np_pgno
180 #define n_dsize		 p.np_dsize
181 	union {
182 		pgno_t		 np_pgno;	/* child page number */
183 		uint32_t	 np_dsize;	/* leaf data size */
184 	}		 p;
185 	uint16_t	 ksize;			/* key size */
186 #define F_BIGDATA	 0x01			/* data put on overflow page */
187 	uint8_t		 flags;
188 	char		 data[1];
189 } __packed;
190 
191 struct btree_txn {
192 	pgno_t			 root;		/* current / new root page */
193 	pgno_t			 next_pgno;	/* next unallocated page */
194 	struct btree		*bt;		/* btree is ref'd */
195 	struct dirty_queue	*dirty_queue;	/* modified pages */
196 #define BT_TXN_RDONLY		 0x01		/* read-only transaction */
197 #define BT_TXN_ERROR		 0x02		/* an error has occurred */
198 	unsigned int		 flags;
199 };
200 
201 struct btree {
202 	int			 fd;
203 	char			*path;
204 #define BT_FIXPADDING		 0x01		/* internal */
205 	unsigned int		 flags;
206 	bt_cmp_func		 cmp;		/* user compare function */
207 	struct bt_head		 head;
208 	struct bt_meta		 meta;
209 	struct page_cache	*page_cache;
210 	struct lru_queue	*lru_queue;
211 	struct btree_txn	*txn;		/* current write transaction */
212 	int			 ref;		/* increased by cursors & txn */
213 	struct btree_stat	 stat;
214 	off_t			 size;		/* current file size */
215 };
216 
217 #define NODESIZE	 offsetof(struct node, data)
218 
219 #define INDXSIZE(k)	 (NODESIZE + ((k) == NULL ? 0 : (k)->size))
220 #define LEAFSIZE(k, d)	 (NODESIZE + (k)->size + (d)->size)
221 #define NODEPTRP(p, i)	 ((struct node *)((char *)(p) + (p)->ptrs[i]))
222 #define NODEPTR(mp, i)	 NODEPTRP((mp)->page, i)
223 #define NODEKEY(node)	 (void *)((node)->data)
224 #define NODEDATA(node)	 (void *)((char *)(node)->data + (node)->ksize)
225 #define NODEPGNO(node)	 ((node)->p.np_pgno)
226 #define NODEDSZ(node)	 ((node)->p.np_dsize)
227 
228 #define BT_COMMIT_PAGES	 64	/* max number of pages to write in one commit */
229 #define BT_MAXCACHE_DEF	 1024	/* max number of pages to keep in cache  */
230 
231 static int		 btree_read_page(struct btree *bt, pgno_t pgno,
232 			    struct page *page);
233 static struct mpage	*btree_get_mpage(struct btree *bt, pgno_t pgno);
234 static int		 btree_search_page_root(struct btree *bt,
235 			    struct mpage *root, struct btval *key,
236 			    struct cursor *cursor, int modify,
237 			    struct mpage **mpp);
238 static int		 btree_search_page(struct btree *bt,
239 			    struct btree_txn *txn, struct btval *key,
240 			    struct cursor *cursor, int modify,
241 			    struct mpage **mpp);
242 
243 static int		 btree_write_header(struct btree *bt, int fd);
244 static int		 btree_read_header(struct btree *bt);
245 static int		 btree_is_meta_page(struct page *p);
246 static int		 btree_read_meta(struct btree *bt, pgno_t *p_next);
247 static int		 btree_write_meta(struct btree *bt, pgno_t root,
248 			    unsigned int flags);
249 static void		 btree_ref(struct btree *bt);
250 
251 static struct node	*btree_search_node(struct btree *bt, struct mpage *mp,
252 			    struct btval *key, int *exactp, unsigned int *kip);
253 static int		 btree_add_node(struct btree *bt, struct mpage *mp,
254 			    indx_t indx, struct btval *key, struct btval *data,
255 			    pgno_t pgno, uint8_t flags);
256 static void		 btree_del_node(struct btree *bt, struct mpage *mp,
257 			    indx_t indx);
258 static int		 btree_read_data(struct btree *bt, struct mpage *mp,
259 			    struct node *leaf, struct btval *data);
260 
261 static int		 btree_rebalance(struct btree *bt, struct mpage *mp);
262 static int		 btree_update_key(struct btree *bt, struct mpage *mp,
263 			    indx_t indx, struct btval *key);
264 static int		 btree_adjust_prefix(struct btree *bt,
265 			    struct mpage *src, int delta);
266 static int		 btree_move_node(struct btree *bt, struct mpage *src,
267 			    indx_t srcindx, struct mpage *dst, indx_t dstindx);
268 static int		 btree_merge(struct btree *bt, struct mpage *src,
269 			    struct mpage *dst);
270 static int		 btree_split(struct btree *bt, struct mpage **mpp,
271 			    unsigned int *newindxp, struct btval *newkey,
272 			    struct btval *newdata, pgno_t newpgno);
273 static struct mpage	*btree_new_page(struct btree *bt, uint32_t flags);
274 static int		 btree_write_overflow_data(struct btree *bt,
275 			    struct page *p, struct btval *data);
276 
277 static void		 cursor_pop_page(struct cursor *cursor);
278 static struct ppage	 *cursor_push_page(struct cursor *cursor,
279 			    struct mpage *mp);
280 
281 static int		 bt_set_key(struct btree *bt, struct mpage *mp,
282 			    struct node *node, struct btval *key);
283 static int		 btree_sibling(struct cursor *cursor, int move_right);
284 static int		 btree_cursor_next(struct cursor *cursor,
285 			    struct btval *key, struct btval *data);
286 static int		 btree_cursor_set(struct cursor *cursor,
287 			    struct btval *key, struct btval *data, int *exactp);
288 static int		 btree_cursor_first(struct cursor *cursor,
289 			    struct btval *key, struct btval *data);
290 
291 static void		 bt_reduce_separator(struct btree *bt, struct node *min,
292 			    struct btval *sep);
293 static void		 remove_prefix(struct btree *bt, struct btval *key,
294 			    size_t pfxlen);
295 static void		 expand_prefix(struct btree *bt, struct mpage *mp,
296 			    indx_t indx, struct btkey *expkey);
297 static void		 concat_prefix(struct btree *bt, char *s1, size_t n1,
298 			    char *s2, size_t n2, char *cs, size_t *cn);
299 static void		 common_prefix(struct btree *bt, struct btkey *min,
300 			    struct btkey *max, struct btkey *pfx);
301 static void		 find_common_prefix(struct btree *bt, struct mpage *mp);
302 
303 static size_t		 bt_leaf_size(struct btree *bt, struct btval *key,
304 			    struct btval *data);
305 static size_t		 bt_branch_size(struct btree *bt, struct btval *key);
306 
307 static pgno_t		 btree_compact_tree(struct btree *bt, pgno_t pgno,
308 			    struct btree *btc);
309 
310 static int		 memncmp(const void *s1, size_t n1,
311 				 const void *s2, size_t n2);
312 static int		 memnrcmp(const void *s1, size_t n1,
313 				  const void *s2, size_t n2);
314 
315 static int
316 memncmp(const void *s1, size_t n1, const void *s2, size_t n2)
317 {
318 	if (n1 < n2) {
319 		if (memcmp(s1, s2, n1) == 0)
320 			return -1;
321 	}
322 	else if (n1 > n2) {
323 		if (memcmp(s1, s2, n2) == 0)
324 			return 1;
325 	}
326 	return memcmp(s1, s2, MINIMUM(n1, n2));	/* don't read past the shorter buffer */
327 }
328 
329 static int
330 memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2)
331 {
332 	const unsigned char	*p1;
333 	const unsigned char	*p2;
334 
335 	if (n1 == 0)
336 		return n2 == 0 ? 0 : -1;
337 
338 	if (n2 == 0)
339 		return n1 == 0 ? 0 : 1;
340 
341 	p1 = (const unsigned char *)s1 + n1 - 1;
342 	p2 = (const unsigned char *)s2 + n2 - 1;
343 
344 	while (*p1 == *p2) {
345 		if (p1 == s1)
346 			return (p2 == s2) ? 0 : -1;
347 		if (p2 == s2)
348 			return (p1 == p2) ? 0 : 1;
349 		p1--;
350 		p2--;
351 	}
352 	return *p1 - *p2;
353 }
354 
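/*
 * Behaviour sketch (illustrative values, not from the original source):
 *
 *	memncmp("app", 3, "apple", 5) < 0	a key that is a strict
 *	memncmp("apple", 5, "app", 3) > 0	prefix of another sorts first
 *
 * memnrcmp() applies the same rule starting from the last byte, which
 * is what BT_REVERSEKEY uses so that keys sharing a common suffix end
 * up adjacent in the tree.
 */
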
355 int
356 btree_cmp(struct btree *bt, const struct btval *a, const struct btval *b)
357 {
358 	return bt->cmp(a, b);
359 }
360 
361 static void
362 common_prefix(struct btree *bt, struct btkey *min, struct btkey *max,
363     struct btkey *pfx)
364 {
365 	size_t		 n = 0;
366 	char		*p1;
367 	char		*p2;
368 
369 	if (min->len == 0 || max->len == 0) {
370 		pfx->len = 0;
371 		return;
372 	}
373 
374 	if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
375 		p1 = min->str + min->len - 1;
376 		p2 = max->str + max->len - 1;
377 
378 		while (*p1 == *p2) {
379 			if (p1 < min->str || p2 < max->str)
380 				break;
381 			p1--;
382 			p2--;
383 			n++;
384 		}
385 
386 		assert(n <= (int)sizeof(pfx->str));
387 		pfx->len = n;
388 		bcopy(p2 + 1, pfx->str, n);
389 	} else {
390 		p1 = min->str;
391 		p2 = max->str;
392 
393 		while (*p1 == *p2) {
394 			if (n == min->len || n == max->len)
395 				break;
396 			p1++;
397 			p2++;
398 			n++;
399 		}
400 
401 		assert(n <= (int)sizeof(pfx->str));
402 		pfx->len = n;
403 		bcopy(max->str, pfx->str, n);
404 	}
405 }
406 
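/*
 * Example (sketch): for min = "example.com" and max = "example.org" the
 * forward scan stops at the first differing byte and yields the shared
 * prefix "example.", which the page stores once in mp->prefix and strips
 * from every key it holds (see remove_prefix() and expand_prefix()).
 * With BT_REVERSEKEY the same scan runs from the last byte backwards.
 */
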
407 static void
408 remove_prefix(struct btree *bt, struct btval *key, size_t pfxlen)
409 {
410 	if (pfxlen == 0 || bt->cmp != NULL)
411 		return;
412 
413 	DPRINTF("removing %zu bytes of prefix from key [%.*s]", pfxlen,
414 	    (int)key->size, (char *)key->data);
415 	assert(pfxlen <= key->size);
416 	key->size -= pfxlen;
417 	if (!F_ISSET(bt->flags, BT_REVERSEKEY))
418 		key->data = (char *)key->data + pfxlen;
419 }
420 
421 static void
422 expand_prefix(struct btree *bt, struct mpage *mp, indx_t indx,
423     struct btkey *expkey)
424 {
425 	struct node	*node;
426 
427 	node = NODEPTR(mp, indx);
428 	expkey->len = sizeof(expkey->str);
429 	concat_prefix(bt, mp->prefix.str, mp->prefix.len,
430 	    NODEKEY(node), node->ksize, expkey->str, &expkey->len);
431 }
432 
433 static int
434 bt_cmp(struct btree *bt, const struct btval *key1, const struct btval *key2,
435     struct btkey *pfx)
436 {
437 	if (F_ISSET(bt->flags, BT_REVERSEKEY))
438 		return memnrcmp(key1->data, key1->size - pfx->len,
439 		    key2->data, key2->size);
440 	else
441 		return memncmp((char *)key1->data + pfx->len, key1->size - pfx->len,
442 		    key2->data, key2->size);
443 }
444 
445 void
446 btval_reset(struct btval *btv)
447 {
448 	if (btv) {
449 		if (btv->mp)
450 			btv->mp->ref--;
451 		if (btv->free_data)
452 			free(btv->data);
453 		memset(btv, 0, sizeof(*btv));
454 	}
455 }
456 
457 static int
458 mpage_cmp(struct mpage *a, struct mpage *b)
459 {
460 	if (a->pgno > b->pgno)
461 		return 1;
462 	if (a->pgno < b->pgno)
463 		return -1;
464 	return 0;
465 }
466 
467 static struct mpage *
468 mpage_lookup(struct btree *bt, pgno_t pgno)
469 {
470 	struct mpage	 find, *mp;
471 
472 	find.pgno = pgno;
473 	mp = RB_FIND(page_cache, bt->page_cache, &find);
474 	if (mp) {
475 		bt->stat.hits++;
476 		/* Update LRU queue. Move page to the end. */
477 		TAILQ_REMOVE(bt->lru_queue, mp, lru_next);
478 		TAILQ_INSERT_TAIL(bt->lru_queue, mp, lru_next);
479 	}
480 	return mp;
481 }
482 
483 static void
484 mpage_add(struct btree *bt, struct mpage *mp)
485 {
486 	assert(RB_INSERT(page_cache, bt->page_cache, mp) == NULL);
487 	bt->stat.cache_size++;
488 	TAILQ_INSERT_TAIL(bt->lru_queue, mp, lru_next);
489 }
490 
491 static void
492 mpage_free(struct mpage *mp)
493 {
494 	if (mp != NULL) {
495 		free(mp->page);
496 		free(mp);
497 	}
498 }
499 
500 static void
501 mpage_del(struct btree *bt, struct mpage *mp)
502 {
503 	assert(RB_REMOVE(page_cache, bt->page_cache, mp) == mp);
504 	assert(bt->stat.cache_size > 0);
505 	bt->stat.cache_size--;
506 	TAILQ_REMOVE(bt->lru_queue, mp, lru_next);
507 }
508 
509 static void
510 mpage_flush(struct btree *bt)
511 {
512 	struct mpage	*mp;
513 
514 	while ((mp = RB_MIN(page_cache, bt->page_cache)) != NULL) {
515 		mpage_del(bt, mp);
516 		mpage_free(mp);
517 	}
518 }
519 
520 static struct mpage *
521 mpage_copy(struct btree *bt, struct mpage *mp)
522 {
523 	struct mpage	*copy;
524 
525 	if ((copy = calloc(1, sizeof(*copy))) == NULL)
526 		return NULL;
527 	if ((copy->page = malloc(bt->head.psize)) == NULL) {
528 		free(copy);
529 		return NULL;
530 	}
531 	bcopy(mp->page, copy->page, bt->head.psize);
532 	bcopy(&mp->prefix, &copy->prefix, sizeof(mp->prefix));
533 	copy->parent = mp->parent;
534 	copy->parent_index = mp->parent_index;
535 	copy->pgno = mp->pgno;
536 
537 	return copy;
538 }
539 
540 /* Remove the least recently used memory pages until the cache size is
541  * within the configured bounds. Pages referenced by cursors or returned
542  * key/data are not pruned.
543  */
544 static void
545 mpage_prune(struct btree *bt)
546 {
547 	struct mpage	*mp, *next;
548 
549 	for (mp = TAILQ_FIRST(bt->lru_queue); mp; mp = next) {
550 		if (bt->stat.cache_size <= bt->stat.max_cache)
551 			break;
552 		next = TAILQ_NEXT(mp, lru_next);
553 		if (!mp->dirty && mp->ref <= 0) {
554 			mpage_del(bt, mp);
555 			mpage_free(mp);
556 		}
557 	}
558 }
559 
560 /* Mark a page as dirty and push it on the dirty queue.
561  */
562 static void
563 mpage_dirty(struct btree *bt, struct mpage *mp)
564 {
565 	assert(bt != NULL);
566 	assert(bt->txn != NULL);
567 
568 	if (!mp->dirty) {
569 		mp->dirty = 1;
570 		SIMPLEQ_INSERT_TAIL(bt->txn->dirty_queue, mp, next);
571 	}
572 }
573 
574 /* Touch a page: make it dirty and re-insert into tree with updated pgno.
575  */
576 static struct mpage *
577 mpage_touch(struct btree *bt, struct mpage *mp)
578 {
579 	assert(bt != NULL);
580 	assert(bt->txn != NULL);
581 	assert(mp != NULL);
582 
583 	if (!mp->dirty) {
584 		DPRINTF("touching page %u -> %u", mp->pgno, bt->txn->next_pgno);
585 		if (mp->ref == 0)
586 			mpage_del(bt, mp);
587 		else {
588 			if ((mp = mpage_copy(bt, mp)) == NULL)
589 				return NULL;
590 		}
591 		mp->pgno = mp->page->pgno = bt->txn->next_pgno++;
592 		mpage_dirty(bt, mp);
593 		mpage_add(bt, mp);
594 
595 		/* Update the page number to new touched page. */
596 		if (mp->parent != NULL)
597 			NODEPGNO(NODEPTR(mp->parent,
598 			    mp->parent_index)) = mp->pgno;
599 	}
600 
601 	return mp;
602 }
603 
604 static int
605 btree_read_page(struct btree *bt, pgno_t pgno, struct page *page)
606 {
607 	ssize_t		 rc;
608 
609 	DPRINTF("reading page %u", pgno);
610 	bt->stat.reads++;
611 	if ((rc = pread(bt->fd, page, bt->head.psize, (off_t)pgno*bt->head.psize)) == 0) {
612 		DPRINTF("page %u doesn't exist", pgno);
613 		errno = ENOENT;
614 		return BT_FAIL;
615 	} else if (rc != (ssize_t)bt->head.psize) {
616 		if (rc > 0)
617 			errno = EINVAL;
618 		DPRINTF("read: %s", strerror(errno));
619 		return BT_FAIL;
620 	}
621 
622 	if (page->pgno != pgno) {
623 		DPRINTF("page numbers don't match: %u != %u", pgno, page->pgno);
624 		errno = EINVAL;
625 		return BT_FAIL;
626 	}
627 
628 	DPRINTF("page %u has flags 0x%X", pgno, page->flags);
629 
630 	return BT_SUCCESS;
631 }
632 
633 int
634 btree_sync(struct btree *bt)
635 {
636 	if (!F_ISSET(bt->flags, BT_NOSYNC))
637 		return fsync(bt->fd);
638 	return 0;
639 }
640 
641 struct btree_txn *
642 btree_txn_begin(struct btree *bt, int rdonly)
643 {
644 	struct btree_txn	*txn;
645 
646 	if (!rdonly && bt->txn != NULL) {
647 		DPRINTF("write transaction already begun");
648 		errno = EBUSY;
649 		return NULL;
650 	}
651 
652 	if ((txn = calloc(1, sizeof(*txn))) == NULL) {
653 		DPRINTF("calloc: %s", strerror(errno));
654 		return NULL;
655 	}
656 
657 	if (rdonly) {
658 		txn->flags |= BT_TXN_RDONLY;
659 	} else {
660 		txn->dirty_queue = calloc(1, sizeof(*txn->dirty_queue));
661 		if (txn->dirty_queue == NULL) {
662 			free(txn);
663 			return NULL;
664 		}
665 		SIMPLEQ_INIT(txn->dirty_queue);
666 
667 		DPRINTF("taking write lock on txn %p", txn);
668 		if (flock(bt->fd, LOCK_EX | LOCK_NB) != 0) {
669 			DPRINTF("flock: %s", strerror(errno));
670 			errno = EBUSY;
671 			free(txn->dirty_queue);
672 			free(txn);
673 			return NULL;
674 		}
675 		bt->txn = txn;
676 	}
677 
678 	txn->bt = bt;
679 	btree_ref(bt);
680 
681 	if (btree_read_meta(bt, &txn->next_pgno) != BT_SUCCESS) {
682 		btree_txn_abort(txn);
683 		return NULL;
684 	}
685 
686 	txn->root = bt->meta.root;
687 	DPRINTF("begin transaction on btree %p, root page %u", bt, txn->root);
688 
689 	return txn;
690 }
691 
692 void
693 btree_txn_abort(struct btree_txn *txn)
694 {
695 	struct mpage	*mp;
696 	struct btree	*bt;
697 
698 	if (txn == NULL)
699 		return;
700 
701 	bt = txn->bt;
702 	DPRINTF("abort transaction on btree %p, root page %u", bt, txn->root);
703 
704 	if (!F_ISSET(txn->flags, BT_TXN_RDONLY)) {
705 		/* Discard all dirty pages.
706 		 */
707 		while (!SIMPLEQ_EMPTY(txn->dirty_queue)) {
708 			mp = SIMPLEQ_FIRST(txn->dirty_queue);
709 			assert(mp->ref == 0);	/* cursors should be closed */
710 			mpage_del(bt, mp);
711 			SIMPLEQ_REMOVE_HEAD(txn->dirty_queue, next);
712 			mpage_free(mp);
713 		}
714 
715 		DPRINTF("releasing write lock on txn %p", txn);
716 		txn->bt->txn = NULL;
717 		if (flock(txn->bt->fd, LOCK_UN) != 0) {
718 			DPRINTF("failed to unlock fd %d: %s",
719 			    txn->bt->fd, strerror(errno));
720 		}
721 		free(txn->dirty_queue);
722 	}
723 
724 	btree_close(txn->bt);
725 	free(txn);
726 }
727 
728 int
729 btree_txn_commit(struct btree_txn *txn)
730 {
731 	int		 n, done;
732 	ssize_t		 rc;
733 	off_t		 size;
734 	struct mpage	*mp;
735 	struct btree	*bt;
736 	struct iovec	 iov[BT_COMMIT_PAGES];
737 
738 	assert(txn != NULL);
739 	assert(txn->bt != NULL);
740 
741 	bt = txn->bt;
742 
743 	if (F_ISSET(txn->flags, BT_TXN_RDONLY)) {
744 		DPRINTF("attempt to commit read-only transaction");
745 		btree_txn_abort(txn);
746 		errno = EPERM;
747 		return BT_FAIL;
748 	}
749 
750 	if (txn != bt->txn) {
751 		DPRINTF("attempt to commit unknown transaction");
752 		btree_txn_abort(txn);
753 		errno = EINVAL;
754 		return BT_FAIL;
755 	}
756 
757 	if (F_ISSET(txn->flags, BT_TXN_ERROR)) {
758 		DPRINTF("error flag is set, can't commit");
759 		btree_txn_abort(txn);
760 		errno = EINVAL;
761 		return BT_FAIL;
762 	}
763 
764 	if (SIMPLEQ_EMPTY(txn->dirty_queue))
765 		goto done;
766 
767 	if (F_ISSET(bt->flags, BT_FIXPADDING)) {
768 		size = lseek(bt->fd, 0, SEEK_END);
769 		size += bt->head.psize - (size % bt->head.psize);
770 		DPRINTF("extending to multiple of page size: %llu", size);
771 		if (ftruncate(bt->fd, size) != 0) {
772 			DPRINTF("ftruncate: %s", strerror(errno));
773 			btree_txn_abort(txn);
774 			return BT_FAIL;
775 		}
776 		bt->flags &= ~BT_FIXPADDING;
777 	}
778 
779 	DPRINTF("committing transaction on btree %p, root page %u",
780 	    bt, txn->root);
781 
782 	/* Commit up to BT_COMMIT_PAGES dirty pages to disk until done.
783 	 */
784 	do {
785 		n = 0;
786 		done = 1;
787 		SIMPLEQ_FOREACH(mp, txn->dirty_queue, next) {
788 			DPRINTF("committing page %u", mp->pgno);
789 			iov[n].iov_len = bt->head.psize;
790 			iov[n].iov_base = mp->page;
791 			if (++n >= BT_COMMIT_PAGES) {
792 				done = 0;
793 				break;
794 			}
795 		}
796 
797 		if (n == 0)
798 			break;
799 
800 		DPRINTF("committing %u dirty pages", n);
801 		rc = writev(bt->fd, iov, n);
802 		if (rc != (ssize_t)bt->head.psize*n) {
803 			if (rc > 0)
804 				DPRINTF("short write, filesystem full?");
805 			else
806 				DPRINTF("writev: %s", strerror(errno));
807 			btree_txn_abort(txn);
808 			return BT_FAIL;
809 		}
810 
811 		/* Remove the dirty flag from the written pages.
812 		 */
813 		while (!SIMPLEQ_EMPTY(txn->dirty_queue)) {
814 			mp = SIMPLEQ_FIRST(txn->dirty_queue);
815 			mp->dirty = 0;
816 			SIMPLEQ_REMOVE_HEAD(txn->dirty_queue, next);
817 			if (--n == 0)
818 				break;
819 		}
820 	} while (!done);
821 
822 	if (btree_sync(bt) != 0 ||
823 	    btree_write_meta(bt, txn->root, 0) != BT_SUCCESS ||
824 	    btree_sync(bt) != 0) {
825 		btree_txn_abort(txn);
826 		return BT_FAIL;
827 	}
828 
829 done:
830 	mpage_prune(bt);
831 	btree_txn_abort(txn);
832 
833 	return BT_SUCCESS;
834 }
835 
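/*
 * Usage sketch of the write-transaction API (illustrative only;
 * btree_txn_put() is assumed from the declarations in btree.h, and the
 * path and values are made up):
 *
 *	struct btree *bt = btree_open("/tmp/sample.db", 0, 0644);
 *	struct btree_txn *txn = btree_txn_begin(bt, 0);
 *	struct btval key = { 0 }, data = { 0 };
 *
 *	key.data = "key";    key.size = 3;
 *	data.data = "value"; data.size = 5;
 *	if (txn == NULL || btree_txn_put(bt, txn, &key, &data, 0) != BT_SUCCESS)
 *		btree_txn_abort(txn);
 *	else
 *		btree_txn_commit(txn);
 *	btree_close(bt);
 *
 * On commit the dirty pages are appended in writev() batches of at most
 * BT_COMMIT_PAGES, then a new meta page makes the new root visible.
 */
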
836 static int
837 btree_write_header(struct btree *bt, int fd)
838 {
839 	struct stat	 sb;
840 	struct bt_head	*h;
841 	struct page	*p;
842 	ssize_t		 rc;
843 	unsigned int	 psize;
844 
845 	DPRINTF("writing header page");
846 	assert(bt != NULL);
847 
848 	/*
849 	 * Ask stat for 'optimal blocksize for I/O', but cap to fit in indx_t.
850 	 */
851 	if (fstat(fd, &sb) == 0)
852 		psize = MINIMUM(32*1024, sb.st_blksize);
853 	else
854 		psize = PAGESIZE;
855 
856 	if ((p = calloc(1, psize)) == NULL)
857 		return -1;
858 	p->flags = P_HEAD;
859 
860 	h = METADATA(p);
861 	h->magic = BT_MAGIC;
862 	h->version = BT_VERSION;
863 	h->psize = psize;
864 	bcopy(h, &bt->head, sizeof(*h));
865 
866 	rc = write(fd, p, bt->head.psize);
867 	free(p);
868 	if (rc != (ssize_t)bt->head.psize) {
869 		if (rc > 0)
870 			DPRINTF("short write, filesystem full?");
871 		return BT_FAIL;
872 	}
873 
874 	return BT_SUCCESS;
875 }
876 
877 static int
878 btree_read_header(struct btree *bt)
879 {
880 	char		 page[PAGESIZE];
881 	struct page	*p;
882 	struct bt_head	*h;
883 	int		 rc;
884 
885 	assert(bt != NULL);
886 
887 	/* We don't know the page size yet, so use a minimum value.
888 	 */
889 
890 	if ((rc = pread(bt->fd, page, PAGESIZE, 0)) == 0) {
891 		errno = ENOENT;
892 		return -1;
893 	} else if (rc != PAGESIZE) {
894 		if (rc > 0)
895 			errno = EINVAL;
896 		DPRINTF("read: %s", strerror(errno));
897 		return -1;
898 	}
899 
900 	p = (struct page *)page;
901 
902 	if (!F_ISSET(p->flags, P_HEAD)) {
903 		DPRINTF("page %d not a header page", p->pgno);
904 		errno = EINVAL;
905 		return -1;
906 	}
907 
908 	h = METADATA(p);
909 	if (h->magic != BT_MAGIC) {
910 		DPRINTF("header has invalid magic");
911 		errno = EINVAL;
912 		return -1;
913 	}
914 
915 	if (h->version != BT_VERSION) {
916 		DPRINTF("database is version %u, expected version %u",
917 		    h->version, BT_VERSION);
918 		errno = EINVAL;
919 		return -1;
920 	}
921 
922 	bcopy(h, &bt->head, sizeof(*h));
923 	return 0;
924 }
925 
926 static int
927 btree_write_meta(struct btree *bt, pgno_t root, unsigned int flags)
928 {
929 	struct mpage	*mp;
930 	struct bt_meta	*meta;
931 	ssize_t		 rc;
932 
933 	DPRINTF("writing meta page for root page %u", root);
934 
935 	assert(bt != NULL);
936 	assert(bt->txn != NULL);
937 
938 	if ((mp = btree_new_page(bt, P_META)) == NULL)
939 		return -1;
940 
941 	bt->meta.prev_meta = bt->meta.root;
942 	bt->meta.root = root;
943 	bt->meta.flags = flags;
944 	bt->meta.created_at = time(0);
945 	bt->meta.revisions++;
946 	SHA1((unsigned char *)&bt->meta, METAHASHLEN, bt->meta.hash);
947 
948 	/* Copy the meta data changes to the new meta page. */
949 	meta = METADATA(mp->page);
950 	bcopy(&bt->meta, meta, sizeof(*meta));
951 
952 	rc = write(bt->fd, mp->page, bt->head.psize);
953 	mp->dirty = 0;
954 	SIMPLEQ_REMOVE_HEAD(bt->txn->dirty_queue, next);
955 	if (rc != (ssize_t)bt->head.psize) {
956 		if (rc > 0)
957 			DPRINTF("short write, filesystem full?");
958 		return BT_FAIL;
959 	}
960 
961 	if ((bt->size = lseek(bt->fd, 0, SEEK_END)) == -1) {
962 		DPRINTF("failed to update file size: %s", strerror(errno));
963 		bt->size = 0;
964 	}
965 
966 	return BT_SUCCESS;
967 }
968 
969 /* Returns true if page p is a valid meta page, false otherwise.
970  */
971 static int
972 btree_is_meta_page(struct page *p)
973 {
974 	struct bt_meta	*m;
975 	unsigned char	 hash[SHA_DIGEST_LENGTH];
976 
977 	m = METADATA(p);
978 	if (!F_ISSET(p->flags, P_META)) {
979 		DPRINTF("page %d not a meta page", p->pgno);
980 		errno = EINVAL;
981 		return 0;
982 	}
983 
984 	if (m->root >= p->pgno && m->root != P_INVALID) {
985 		DPRINTF("page %d points to an invalid root page", p->pgno);
986 		errno = EINVAL;
987 		return 0;
988 	}
989 
990 	SHA1((unsigned char *)m, METAHASHLEN, hash);
991 	if (bcmp(hash, m->hash, SHA_DIGEST_LENGTH) != 0) {
992 		DPRINTF("page %d has an invalid digest", p->pgno);
993 		errno = EINVAL;
994 		return 0;
995 	}
996 
997 	return 1;
998 }
999 
1000 static int
1001 btree_read_meta(struct btree *bt, pgno_t *p_next)
1002 {
1003 	struct mpage	*mp;
1004 	struct bt_meta	*meta;
1005 	pgno_t		 meta_pgno, next_pgno;
1006 	off_t		 size;
1007 
1008 	assert(bt != NULL);
1009 
1010 	if ((size = lseek(bt->fd, 0, SEEK_END)) == -1)
1011 		goto fail;
1012 
1013 	DPRINTF("btree_read_meta: size = %llu", size);
1014 
1015 	if (size < bt->size) {
1016 		DPRINTF("file has shrunk!");
1017 		errno = EIO;
1018 		goto fail;
1019 	}
1020 
1021 	if (size == bt->head.psize) {		/* there is only the header */
1022 		if (p_next != NULL)
1023 			*p_next = 1;
1024 		return BT_SUCCESS;		/* new file */
1025 	}
1026 
1027 	next_pgno = size / bt->head.psize;
1028 	if (next_pgno == 0) {
1029 		DPRINTF("corrupt file");
1030 		errno = EIO;
1031 		goto fail;
1032 	}
1033 
1034 	meta_pgno = next_pgno - 1;
1035 
1036 	if (size % bt->head.psize != 0) {
1037 		DPRINTF("filesize not a multiple of the page size!");
1038 		bt->flags |= BT_FIXPADDING;
1039 		next_pgno++;
1040 	}
1041 
1042 	if (p_next != NULL)
1043 		*p_next = next_pgno;
1044 
1045 	if (size == bt->size) {
1046 		DPRINTF("size unchanged, keeping current meta page");
1047 		if (F_ISSET(bt->meta.flags, BT_TOMBSTONE)) {
1048 			DPRINTF("file is dead");
1049 			errno = ESTALE;
1050 			return BT_FAIL;
1051 		} else
1052 			return BT_SUCCESS;
1053 	}
1054 	bt->size = size;
1055 
1056 	while (meta_pgno > 0) {
1057 		if ((mp = btree_get_mpage(bt, meta_pgno)) == NULL)
1058 			break;
1059 		if (btree_is_meta_page(mp->page)) {
1060 			meta = METADATA(mp->page);
1061 			DPRINTF("flags = 0x%x", meta->flags);
1062 			if (F_ISSET(meta->flags, BT_TOMBSTONE)) {
1063 				DPRINTF("file is dead");
1064 				errno = ESTALE;
1065 				return BT_FAIL;
1066 			} else {
1067 				/* Make copy of last meta page. */
1068 				bcopy(meta, &bt->meta, sizeof(bt->meta));
1069 				return BT_SUCCESS;
1070 			}
1071 		}
1072 		--meta_pgno;	/* scan backwards to first valid meta page */
1073 	}
1074 
1075 	errno = EIO;
1076 fail:
1077 	if (p_next != NULL)
1078 		*p_next = P_INVALID;
1079 	return BT_FAIL;
1080 }
1081 
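/*
 * File-layout sketch (illustrative): the file is append-only, so after
 * two committed write transactions it looks roughly like
 *
 *	page 0:		P_HEAD page (btree_write_header)
 *	pages 1..n:	leaf/branch/overflow pages written by txn 1
 *	page n+1:	P_META page, root of txn 1
 *	pages n+2..m:	pages written by txn 2
 *	page m+1:	P_META page, root of txn 2
 *
 * btree_read_meta() therefore scans backwards from the last page until
 * btree_is_meta_page() accepts one, so a torn or partial final write is
 * recovered by falling back to the previous valid meta page.
 */
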
1082 struct btree *
1083 btree_open_fd(int fd, unsigned int flags)
1084 {
1085 	struct btree	*bt;
1086 	int		 fl;
1087 
1088 	fl = fcntl(fd, F_GETFL);
1089 	if (fcntl(fd, F_SETFL, fl | O_APPEND) == -1)
1090 		return NULL;
1091 
1092 	if ((bt = calloc(1, sizeof(*bt))) == NULL)
1093 		return NULL;
1094 	bt->fd = fd;
1095 	bt->flags = flags;
1096 	bt->flags &= ~BT_FIXPADDING;
1097 	bt->ref = 1;
1098 	bt->meta.root = P_INVALID;
1099 
1100 	if ((bt->page_cache = calloc(1, sizeof(*bt->page_cache))) == NULL)
1101 		goto fail;
1102 	bt->stat.max_cache = BT_MAXCACHE_DEF;
1103 	RB_INIT(bt->page_cache);
1104 
1105 	if ((bt->lru_queue = calloc(1, sizeof(*bt->lru_queue))) == NULL)
1106 		goto fail;
1107 	TAILQ_INIT(bt->lru_queue);
1108 
1109 	if (btree_read_header(bt) != 0) {
1110 		if (errno != ENOENT)
1111 			goto fail;
1112 		DPRINTF("new database");
1113 		btree_write_header(bt, bt->fd);
1114 	}
1115 
1116 	if (btree_read_meta(bt, NULL) != 0)
1117 		goto fail;
1118 
1119 	DPRINTF("opened database version %u, pagesize %u",
1120 	    bt->head.version, bt->head.psize);
1121 	DPRINTF("timestamp: %s", ctime(&bt->meta.created_at));
1122 	DPRINTF("depth: %u", bt->meta.depth);
1123 	DPRINTF("entries: %llu", bt->meta.entries);
1124 	DPRINTF("revisions: %u", bt->meta.revisions);
1125 	DPRINTF("branch pages: %u", bt->meta.branch_pages);
1126 	DPRINTF("leaf pages: %u", bt->meta.leaf_pages);
1127 	DPRINTF("overflow pages: %u", bt->meta.overflow_pages);
1128 	DPRINTF("root: %u", bt->meta.root);
1129 	DPRINTF("previous meta page: %u", bt->meta.prev_meta);
1130 
1131 	return bt;
1132 
1133 fail:
1134 	free(bt->lru_queue);
1135 	free(bt->page_cache);
1136 	free(bt);
1137 	return NULL;
1138 }
1139 
1140 struct btree *
1141 btree_open(const char *path, unsigned int flags, mode_t mode)
1142 {
1143 	int		 fd, oflags;
1144 	struct btree	*bt;
1145 
1146 	if (F_ISSET(flags, BT_RDONLY))
1147 		oflags = O_RDONLY;
1148 	else
1149 		oflags = O_RDWR | O_CREAT | O_APPEND;
1150 
1151 	if ((fd = open(path, oflags, mode)) == -1)
1152 		return NULL;
1153 
1154 	if ((bt = btree_open_fd(fd, flags)) == NULL)
1155 		close(fd);
1156 	else {
1157 		bt->path = strdup(path);
1158 		DPRINTF("opened btree %p", bt);
1159 	}
1160 
1161 	return bt;
1162 }
1163 
1164 static void
1165 btree_ref(struct btree *bt)
1166 {
1167 	bt->ref++;
1168 	DPRINTF("ref is now %d on btree %p", bt->ref, bt);
1169 }
1170 
1171 void
1172 btree_close(struct btree *bt)
1173 {
1174 	if (bt == NULL)
1175 		return;
1176 
1177 	if (--bt->ref == 0) {
1178 		DPRINTF("ref is zero, closing btree %p", bt);
1179 		close(bt->fd);
1180 		mpage_flush(bt);
1181 		free(bt->lru_queue);
1182 		free(bt->path);
1183 		free(bt->page_cache);
1184 		free(bt);
1185 	} else
1186 		DPRINTF("ref is now %d on btree %p", bt->ref, bt);
1187 }
1188 
1189 /* Search for key within a leaf page, using binary search.
1190  * Returns the smallest entry larger than or equal to the key.
1191  * If exactp is non-null, stores whether the found entry was an exact match
1192  * in *exactp (1 or 0).
1193  * If kip is non-null, stores the index of the found entry in *kip.
1194  * If no entry larger than or equal to the key is found, returns NULL.
1195  */
1196 static struct node *
1197 btree_search_node(struct btree *bt, struct mpage *mp, struct btval *key,
1198     int *exactp, unsigned int *kip)
1199 {
1200 	unsigned int	 i = 0;
1201 	int		 low, high;
1202 	int		 rc = 0;
1203 	struct node	*node;
1204 	struct btval	 nodekey;
1205 
1206 	DPRINTF("searching %lu keys in %s page %u with prefix [%.*s]",
1207 	    NUMKEYS(mp),
1208 	    IS_LEAF(mp) ? "leaf" : "branch",
1209 	    mp->pgno, (int)mp->prefix.len, (char *)mp->prefix.str);
1210 
1211 	assert(NUMKEYS(mp) > 0);
1212 
1213 	memset(&nodekey, 0, sizeof(nodekey));
1214 
1215 	low = IS_LEAF(mp) ? 0 : 1;
1216 	high = NUMKEYS(mp) - 1;
1217 	while (low <= high) {
1218 		i = (low + high) >> 1;
1219 		node = NODEPTR(mp, i);
1220 
1221 		nodekey.size = node->ksize;
1222 		nodekey.data = NODEKEY(node);
1223 
1224 		if (bt->cmp)
1225 			rc = bt->cmp(key, &nodekey);
1226 		else
1227 			rc = bt_cmp(bt, key, &nodekey, &mp->prefix);
1228 
1229 		if (IS_LEAF(mp))
1230 			DPRINTF("found leaf index %u [%.*s], rc = %d",
1231 			    i, (int)nodekey.size, (char *)nodekey.data, rc);
1232 		else
1233 			DPRINTF("found branch index %u [%.*s -> %u], rc = %d",
1234 			    i, (int)node->ksize, (char *)NODEKEY(node),
1235 			    node->n_pgno, rc);
1236 
1237 		if (rc == 0)
1238 			break;
1239 		if (rc > 0)
1240 			low = i + 1;
1241 		else
1242 			high = i - 1;
1243 	}
1244 
1245 	if (rc > 0) {	/* Found entry is less than the key. */
1246 		i++;	/* Skip to get the smallest entry larger than key. */
1247 		if (i >= NUMKEYS(mp))
1248 			/* There is no entry larger or equal to the key. */
1249 			return NULL;
1250 	}
1251 	if (exactp)
1252 		*exactp = (rc == 0);
1253 	if (kip)	/* Store the key index if requested. */
1254 		*kip = i;
1255 
1256 	return NODEPTR(mp, i);
1257 }
1258 
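/*
 * Example (sketch): searching a leaf holding [ "apple", "banana",
 * "cherry" ] for "carrot" returns the node for "cherry" with *exactp = 0
 * and *kip = 2, i.e. the smallest entry greater than or equal to the
 * key; searching for anything past "cherry" returns NULL.
 */
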
1259 static void
1260 cursor_pop_page(struct cursor *cursor)
1261 {
1262 	struct ppage	*top;
1263 
1264 	top = CURSOR_TOP(cursor);
1265 	CURSOR_POP(cursor);
1266 	top->mpage->ref--;
1267 
1268 	DPRINTF("popped page %u off cursor %p", top->mpage->pgno, cursor);
1269 
1270 	free(top);
1271 }
1272 
1273 static struct ppage *
1274 cursor_push_page(struct cursor *cursor, struct mpage *mp)
1275 {
1276 	struct ppage	*ppage;
1277 
1278 	DPRINTF("pushing page %u on cursor %p", mp->pgno, cursor);
1279 
1280 	if ((ppage = calloc(1, sizeof(*ppage))) == NULL)
1281 		return NULL;
1282 	ppage->mpage = mp;
1283 	mp->ref++;
1284 	CURSOR_PUSH(cursor, ppage);
1285 	return ppage;
1286 }
1287 
1288 static struct mpage *
1289 btree_get_mpage(struct btree *bt, pgno_t pgno)
1290 {
1291 	struct mpage	*mp;
1292 
1293 	mp = mpage_lookup(bt, pgno);
1294 	if (mp == NULL) {
1295 		if ((mp = calloc(1, sizeof(*mp))) == NULL)
1296 			return NULL;
1297 		if ((mp->page = malloc(bt->head.psize)) == NULL) {
1298 			free(mp);
1299 			return NULL;
1300 		}
1301 		if (btree_read_page(bt, pgno, mp->page) != BT_SUCCESS) {
1302 			mpage_free(mp);
1303 			return NULL;
1304 		}
1305 		mp->pgno = pgno;
1306 		mpage_add(bt, mp);
1307 	} else
1308 		DPRINTF("returning page %u from cache", pgno);
1309 
1310 	return mp;
1311 }
1312 
1313 static void
1314 concat_prefix(struct btree *bt, char *s1, size_t n1, char *s2, size_t n2,
1315     char *cs, size_t *cn)
1316 {
1317 	assert(*cn >= n1 + n2);
1318 	if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
1319 		bcopy(s2, cs, n2);
1320 		bcopy(s1, cs + n2, n1);
1321 	} else {
1322 		bcopy(s1, cs, n1);
1323 		bcopy(s2, cs + n1, n2);
1324 	}
1325 	*cn = n1 + n2;
1326 }
1327 
1328 static void
1329 find_common_prefix(struct btree *bt, struct mpage *mp)
1330 {
1331 	indx_t			 lbound = 0, ubound = 0;
1332 	struct mpage		*lp, *up;
1333 	struct btkey		 lprefix, uprefix;
1334 
1335 	mp->prefix.len = 0;
1336 	if (bt->cmp != NULL)
1337 		return;
1338 
1339 	lp = mp;
1340 	while (lp->parent != NULL) {
1341 		if (lp->parent_index > 0) {
1342 			lbound = lp->parent_index;
1343 			break;
1344 		}
1345 		lp = lp->parent;
1346 	}
1347 
1348 	up = mp;
1349 	while (up->parent != NULL) {
1350 		if (up->parent_index + 1 < (indx_t)NUMKEYS(up->parent)) {
1351 			ubound = up->parent_index + 1;
1352 			break;
1353 		}
1354 		up = up->parent;
1355 	}
1356 
1357 	if (lp->parent != NULL && up->parent != NULL) {
1358 		expand_prefix(bt, lp->parent, lbound, &lprefix);
1359 		expand_prefix(bt, up->parent, ubound, &uprefix);
1360 		common_prefix(bt, &lprefix, &uprefix, &mp->prefix);
1361 	}
1362 	else if (mp->parent)
1363 		bcopy(&mp->parent->prefix, &mp->prefix, sizeof(mp->prefix));
1364 
1365 	DPRINTF("found common prefix [%.*s] (len %zu) for page %u",
1366 	    (int)mp->prefix.len, mp->prefix.str, mp->prefix.len, mp->pgno);
1367 }
1368 
1369 static int
1370 btree_search_page_root(struct btree *bt, struct mpage *root, struct btval *key,
1371     struct cursor *cursor, int modify, struct mpage **mpp)
1372 {
1373 	struct mpage	*mp, *parent;
1374 
1375 	if (cursor && cursor_push_page(cursor, root) == NULL)
1376 		return BT_FAIL;
1377 
1378 	mp = root;
1379 	while (IS_BRANCH(mp)) {
1380 		unsigned int	 i = 0;
1381 		struct node	*node;
1382 
1383 		DPRINTF("branch page %u has %lu keys", mp->pgno, NUMKEYS(mp));
1384 		assert(NUMKEYS(mp) > 1);
1385 		DPRINTF("found index 0 to page %u", NODEPGNO(NODEPTR(mp, 0)));
1386 
1387 		if (key == NULL)	/* Initialize cursor to first page. */
1388 			i = 0;
1389 		else {
1390 			int	 exact;
1391 			node = btree_search_node(bt, mp, key, &exact, &i);
1392 			if (node == NULL)
1393 				i = NUMKEYS(mp) - 1;
1394 			else if (!exact) {
1395 				assert(i > 0);
1396 				i--;
1397 			}
1398 		}
1399 
1400 		if (key)
1401 			DPRINTF("following index %u for key %.*s",
1402 			    i, (int)key->size, (char *)key->data);
1403 		assert(i >= 0 && i < NUMKEYS(mp));
1404 		node = NODEPTR(mp, i);
1405 
1406 		if (cursor)
1407 			CURSOR_TOP(cursor)->ki = i;
1408 
1409 		parent = mp;
1410 		if ((mp = btree_get_mpage(bt, NODEPGNO(node))) == NULL)
1411 			return BT_FAIL;
1412 		mp->parent = parent;
1413 		mp->parent_index = i;
1414 		find_common_prefix(bt, mp);
1415 
1416 		if (cursor && cursor_push_page(cursor, mp) == NULL)
1417 			return BT_FAIL;
1418 
1419 		if (modify && (mp = mpage_touch(bt, mp)) == NULL)
1420 			return BT_FAIL;
1421 	}
1422 
1423 	if (!IS_LEAF(mp)) {
1424 		DPRINTF("internal error, index points to a %02X page!?",
1425 		    mp->page->flags);
1426 		return BT_FAIL;
1427 	}
1428 
1429 	DPRINTF("found leaf page %u for key %.*s", mp->pgno,
1430 	    key ? (int)key->size : 0, key ? (char *)key->data : NULL);
1431 
1432 	*mpp = mp;
1433 	return BT_SUCCESS;
1434 }
1435 
1436 /* Search for the page a given key should be in.
1437  * Stores a pointer to the found page in *mpp.
1438  * If key is NULL, search for the lowest page (used by btree_cursor_first).
1439  * If cursor is non-null, pushes parent pages on the cursor stack.
1440  * If modify is true, visited pages are updated with new page numbers.
1441  */
1442 static int
1443 btree_search_page(struct btree *bt, struct btree_txn *txn, struct btval *key,
1444     struct cursor *cursor, int modify, struct mpage **mpp)
1445 {
1446 	int		 rc;
1447 	pgno_t		 root;
1448 	struct mpage	*mp;
1449 
1450 	/* Can't modify pages outside a transaction. */
1451 	if (txn == NULL && modify) {
1452 		errno = EINVAL;
1453 		return BT_FAIL;
1454 	}
1455 
1456 	/* Choose which root page to start with. If a transaction is given
1457          * use the root page from the transaction, otherwise read the last
1458          * committed root page.
1459 	 */
1460 	if (txn == NULL) {
1461 		if ((rc = btree_read_meta(bt, NULL)) != BT_SUCCESS)
1462 			return rc;
1463 		root = bt->meta.root;
1464 	} else if (F_ISSET(txn->flags, BT_TXN_ERROR)) {
1465 		DPRINTF("transaction has failed, must abort");
1466 		errno = EINVAL;
1467 		return BT_FAIL;
1468 	} else
1469 		root = txn->root;
1470 
1471 	if (root == P_INVALID) {		/* Tree is empty. */
1472 		DPRINTF("tree is empty");
1473 		errno = ENOENT;
1474 		return BT_FAIL;
1475 	}
1476 
1477 	if ((mp = btree_get_mpage(bt, root)) == NULL)
1478 		return BT_FAIL;
1479 
1480 	DPRINTF("root page has flags 0x%X", mp->page->flags);
1481 
1482 	assert(mp->parent == NULL);
1483 	assert(mp->prefix.len == 0);
1484 
1485 	if (modify && !mp->dirty) {
1486 		if ((mp = mpage_touch(bt, mp)) == NULL)
1487 			return BT_FAIL;
1488 		txn->root = mp->pgno;
1489 	}
1490 
1491 	return btree_search_page_root(bt, mp, key, cursor, modify, mpp);
1492 }
1493 
1494 static int
1495 btree_read_data(struct btree *bt, struct mpage *mp, struct node *leaf,
1496     struct btval *data)
1497 {
1498 	struct mpage	*omp;		/* overflow mpage */
1499 	size_t		 psz;
1500 	size_t		 max;
1501 	size_t		 sz = 0;
1502 	pgno_t		 pgno;
1503 
1504 	memset(data, 0, sizeof(*data));
1505 	max = bt->head.psize - PAGEHDRSZ;
1506 
1507 	if (!F_ISSET(leaf->flags, F_BIGDATA)) {
1508 		data->size = leaf->n_dsize;
1509 		if (data->size > 0) {
1510 			if (mp == NULL) {
1511 				if ((data->data = malloc(data->size)) == NULL)
1512 					return BT_FAIL;
1513 				bcopy(NODEDATA(leaf), data->data, data->size);
1514 				data->free_data = 1;
1515 				data->mp = NULL;
1516 			} else {
1517 				data->data = NODEDATA(leaf);
1518 				data->free_data = 0;
1519 				data->mp = mp;
1520 				mp->ref++;
1521 			}
1522 		}
1523 		return BT_SUCCESS;
1524 	}
1525 
1526 	/* Read overflow data.
1527 	 */
1528 	DPRINTF("allocating %u byte for overflow data", leaf->n_dsize);
1529 	if ((data->data = malloc(leaf->n_dsize)) == NULL)
1530 		return BT_FAIL;
1531 	data->size = leaf->n_dsize;
1532 	data->free_data = 1;
1533 	data->mp = NULL;
1534 	bcopy(NODEDATA(leaf), &pgno, sizeof(pgno));
1535 	for (sz = 0; sz < data->size; ) {
1536 		if ((omp = btree_get_mpage(bt, pgno)) == NULL ||
1537 		    !F_ISSET(omp->page->flags, P_OVERFLOW)) {
1538 			DPRINTF("read overflow page %u failed", pgno);
1539 			free(data->data);
1540 			mpage_free(omp);
1541 			return BT_FAIL;
1542 		}
1543 		psz = data->size - sz;
1544 		if (psz > max)
1545 			psz = max;
1546 		bcopy(omp->page->ptrs, (char *)data->data + sz, psz);
1547 		sz += psz;
1548 		pgno = omp->page->p_next_pgno;
1549 	}
1550 
1551 	return BT_SUCCESS;
1552 }
1553 
1554 int
1555 btree_txn_get(struct btree *bt, struct btree_txn *txn,
1556     struct btval *key, struct btval *data)
1557 {
1558 	int		 rc, exact;
1559 	struct node	*leaf;
1560 	struct mpage	*mp;
1561 
1562 	assert(key);
1563 	assert(data);
1564 	DPRINTF("===> get key [%.*s]", (int)key->size, (char *)key->data);
1565 
1566 	if (bt != NULL && txn != NULL && bt != txn->bt) {
1567 		errno = EINVAL;
1568 		return BT_FAIL;
1569 	}
1570 
1571 	if (bt == NULL) {
1572 		if (txn == NULL) {
1573 			errno = EINVAL;
1574 			return BT_FAIL;
1575 		}
1576 		bt = txn->bt;
1577 	}
1578 
1579 	if (key->size == 0 || key->size > MAXKEYSIZE) {
1580 		errno = EINVAL;
1581 		return BT_FAIL;
1582 	}
1583 
1584 	if ((rc = btree_search_page(bt, txn, key, NULL, 0, &mp)) != BT_SUCCESS)
1585 		return rc;
1586 
1587 	leaf = btree_search_node(bt, mp, key, &exact, NULL);
1588 	if (leaf && exact)
1589 		rc = btree_read_data(bt, mp, leaf, data);
1590 	else {
1591 		errno = ENOENT;
1592 		rc = BT_FAIL;
1593 	}
1594 
1595 	mpage_prune(bt);
1596 	return rc;
1597 }
1598 
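/*
 * Read-path usage sketch (illustrative): a read-only transaction pins
 * the meta/root in effect at btree_txn_begin() time, and btval_reset()
 * releases the page reference or copy handed out by btree_read_data():
 *
 *	struct btval key = { 0 }, data = { 0 };
 *	struct btree_txn *txn = btree_txn_begin(bt, 1);
 *
 *	key.data = "key"; key.size = 3;
 *	if (btree_txn_get(bt, txn, &key, &data) == BT_SUCCESS) {
 *		... use data.data and data.size here ...
 *		btval_reset(&data);
 *	}
 *	btree_txn_abort(txn);
 *
 * Read-only transactions are finished with btree_txn_abort(); there is
 * nothing to commit.
 */
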
1599 static int
1600 btree_sibling(struct cursor *cursor, int move_right)
1601 {
1602 	int		 rc;
1603 	struct node	*indx;
1604 	struct ppage	*parent, *top;
1605 	struct mpage	*mp;
1606 
1607 	top = CURSOR_TOP(cursor);
1608 	if ((parent = SLIST_NEXT(top, entry)) == NULL) {
1609 		errno = ENOENT;
1610 		return BT_FAIL;			/* root has no siblings */
1611 	}
1612 
1613 	DPRINTF("parent page is page %u, index %u",
1614 	    parent->mpage->pgno, parent->ki);
1615 
1616 	cursor_pop_page(cursor);
1617 	if (move_right ? (parent->ki + 1 >= NUMKEYS(parent->mpage))
1618 		       : (parent->ki == 0)) {
1619 		DPRINTF("no more keys left, moving to %s sibling",
1620 		    move_right ? "right" : "left");
1621 		if ((rc = btree_sibling(cursor, move_right)) != BT_SUCCESS)
1622 			return rc;
1623 		parent = CURSOR_TOP(cursor);
1624 	} else {
1625 		if (move_right)
1626 			parent->ki++;
1627 		else
1628 			parent->ki--;
1629 		DPRINTF("just moving to %s index key %u",
1630 		    move_right ? "right" : "left", parent->ki);
1631 	}
1632 	assert(IS_BRANCH(parent->mpage));
1633 
1634 	indx = NODEPTR(parent->mpage, parent->ki);
1635 	if ((mp = btree_get_mpage(cursor->bt, indx->n_pgno)) == NULL)
1636 		return BT_FAIL;
1637 	mp->parent = parent->mpage;
1638 	mp->parent_index = parent->ki;
1639 
1640 	cursor_push_page(cursor, mp);
1641 	find_common_prefix(cursor->bt, mp);
1642 
1643 	return BT_SUCCESS;
1644 }
1645 
1646 static int
1647 bt_set_key(struct btree *bt, struct mpage *mp, struct node *node,
1648     struct btval *key)
1649 {
1650 	if (key == NULL)
1651 		return 0;
1652 
1653 	if (mp->prefix.len > 0) {
1654 		key->size = node->ksize + mp->prefix.len;
1655 		key->data = malloc(key->size);
1656 		if (key->data == NULL)
1657 			return -1;
1658 		concat_prefix(bt,
1659 		    mp->prefix.str, mp->prefix.len,
1660 		    NODEKEY(node), node->ksize,
1661 		    key->data, &key->size);
1662 		key->free_data = 1;
1663 	} else {
1664 		key->size = node->ksize;
1665 		key->data = NODEKEY(node);
1666 		key->free_data = 0;
1667 		key->mp = mp;
1668 		mp->ref++;
1669 	}
1670 
1671 	return 0;
1672 }
1673 
1674 static int
1675 btree_cursor_next(struct cursor *cursor, struct btval *key, struct btval *data)
1676 {
1677 	struct ppage	*top;
1678 	struct mpage	*mp;
1679 	struct node	*leaf;
1680 
1681 	if (cursor->eof) {
1682 		errno = ENOENT;
1683 		return BT_FAIL;
1684 	}
1685 
1686 	assert(cursor->initialized);
1687 
1688 	top = CURSOR_TOP(cursor);
1689 	mp = top->mpage;
1690 
1691 	DPRINTF("cursor_next: top page is %u in cursor %p", mp->pgno, cursor);
1692 
1693 	if (top->ki + 1 >= NUMKEYS(mp)) {
1694 		DPRINTF("=====> move to next sibling page");
1695 		if (btree_sibling(cursor, 1) != BT_SUCCESS) {
1696 			cursor->eof = 1;
1697 			return BT_FAIL;
1698 		}
1699 		top = CURSOR_TOP(cursor);
1700 		mp = top->mpage;
1701 		DPRINTF("next page is %u, key index %u", mp->pgno, top->ki);
1702 	} else
1703 		top->ki++;
1704 
1705 	DPRINTF("==> cursor points to page %u with %lu keys, key index %u",
1706 	    mp->pgno, NUMKEYS(mp), top->ki);
1707 
1708 	assert(IS_LEAF(mp));
1709 	leaf = NODEPTR(mp, top->ki);
1710 
1711 	if (data && btree_read_data(cursor->bt, mp, leaf, data) != BT_SUCCESS)
1712 		return BT_FAIL;
1713 
1714 	if (bt_set_key(cursor->bt, mp, leaf, key) != 0)
1715 		return BT_FAIL;
1716 
1717 	return BT_SUCCESS;
1718 }
1719 
1720 static int
1721 btree_cursor_set(struct cursor *cursor, struct btval *key, struct btval *data,
1722     int *exactp)
1723 {
1724 	int		 rc;
1725 	struct node	*leaf;
1726 	struct mpage	*mp;
1727 	struct ppage	*top;
1728 
1729 	assert(cursor);
1730 	assert(key);
1731 	assert(key->size > 0);
1732 
1733 	rc = btree_search_page(cursor->bt, cursor->txn, key, cursor, 0, &mp);
1734 	if (rc != BT_SUCCESS)
1735 		return rc;
1736 	assert(IS_LEAF(mp));
1737 
1738 	top = CURSOR_TOP(cursor);
1739 	leaf = btree_search_node(cursor->bt, mp, key, exactp, &top->ki);
1740 	if (exactp != NULL && !*exactp) {
1741 		/* BT_CURSOR_EXACT specified and not an exact match. */
1742 		errno = ENOENT;
1743 		return BT_FAIL;
1744 	}
1745 
1746 	if (leaf == NULL) {
1747 		DPRINTF("===> inexact leaf not found, goto sibling");
1748 		if (btree_sibling(cursor, 1) != BT_SUCCESS)
1749 			return BT_FAIL;		/* no entries matched */
1750 		top = CURSOR_TOP(cursor);
1751 		top->ki = 0;
1752 		mp = top->mpage;
1753 		assert(IS_LEAF(mp));
1754 		leaf = NODEPTR(mp, 0);
1755 	}
1756 
1757 	cursor->initialized = 1;
1758 	cursor->eof = 0;
1759 
1760 	if (data && btree_read_data(cursor->bt, mp, leaf, data) != BT_SUCCESS)
1761 		return BT_FAIL;
1762 
1763 	if (bt_set_key(cursor->bt, mp, leaf, key) != 0)
1764 		return BT_FAIL;
1765 	DPRINTF("==> cursor placed on key %.*s",
1766 	    (int)key->size, (char *)key->data);
1767 
1768 	return BT_SUCCESS;
1769 }
1770 
1771 static int
1772 btree_cursor_first(struct cursor *cursor, struct btval *key, struct btval *data)
1773 {
1774 	int		 rc;
1775 	struct mpage	*mp;
1776 	struct node	*leaf;
1777 
1778 	rc = btree_search_page(cursor->bt, cursor->txn, NULL, cursor, 0, &mp);
1779 	if (rc != BT_SUCCESS)
1780 		return rc;
1781 	assert(IS_LEAF(mp));
1782 
1783 	leaf = NODEPTR(mp, 0);
1784 	cursor->initialized = 1;
1785 	cursor->eof = 0;
1786 
1787 	if (data && btree_read_data(cursor->bt, mp, leaf, data) != BT_SUCCESS)
1788 		return BT_FAIL;
1789 
1790 	if (bt_set_key(cursor->bt, mp, leaf, key) != 0)
1791 		return BT_FAIL;
1792 
1793 	return BT_SUCCESS;
1794 }
1795 
1796 int
1797 btree_cursor_get(struct cursor *cursor, struct btval *key, struct btval *data,
1798     enum cursor_op op)
1799 {
1800 	int		 rc;
1801 	int		 exact = 0;
1802 
1803 	assert(cursor);
1804 
1805 	switch (op) {
1806 	case BT_CURSOR:
1807 	case BT_CURSOR_EXACT:
1808 		while (CURSOR_TOP(cursor) != NULL)
1809 			cursor_pop_page(cursor);
1810 		if (key == NULL || key->size == 0 || key->size > MAXKEYSIZE) {
1811 			errno = EINVAL;
1812 			rc = BT_FAIL;
1813 		} else if (op == BT_CURSOR_EXACT)
1814 			rc = btree_cursor_set(cursor, key, data, &exact);
1815 		else
1816 			rc = btree_cursor_set(cursor, key, data, NULL);
1817 		break;
1818 	case BT_NEXT:
1819 		if (!cursor->initialized)
1820 			rc = btree_cursor_first(cursor, key, data);
1821 		else
1822 			rc = btree_cursor_next(cursor, key, data);
1823 		break;
1824 	case BT_FIRST:
1825 		while (CURSOR_TOP(cursor) != NULL)
1826 			cursor_pop_page(cursor);
1827 		rc = btree_cursor_first(cursor, key, data);
1828 		break;
1829 	default:
1830 		DPRINTF("unhandled/unimplemented cursor operation %u", op);
1831 		rc = BT_FAIL;
1832 		break;
1833 	}
1834 
1835 	mpage_prune(cursor->bt);
1836 
1837 	return rc;
1838 }
1839 
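/*
 * Iteration sketch (illustrative; the cursor open/close helpers,
 * btree_txn_cursor_open() and btree_cursor_close(), are assumed from
 * the declarations in btree.h):
 *
 *	struct cursor *c = btree_txn_cursor_open(bt, txn);
 *	struct btval key = { 0 }, data = { 0 };
 *
 *	while (btree_cursor_get(c, &key, &data, BT_NEXT) == BT_SUCCESS) {
 *		... key and data are valid here ...
 *		btval_reset(&key);
 *		btval_reset(&data);
 *	}
 *	btree_cursor_close(c);
 *
 * The first BT_NEXT on an uninitialized cursor falls through to
 * btree_cursor_first() above, so the loop walks the tree in key order.
 */
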
1840 static struct mpage *
1841 btree_new_page(struct btree *bt, uint32_t flags)
1842 {
1843 	struct mpage	*mp;
1844 
1845 	assert(bt != NULL);
1846 	assert(bt->txn != NULL);
1847 
1848 	DPRINTF("allocating new mpage %u, page size %u",
1849 	    bt->txn->next_pgno, bt->head.psize);
1850 	if ((mp = calloc(1, sizeof(*mp))) == NULL)
1851 		return NULL;
1852 	if ((mp->page = malloc(bt->head.psize)) == NULL) {
1853 		free(mp);
1854 		return NULL;
1855 	}
1856 	mp->pgno = mp->page->pgno = bt->txn->next_pgno++;
1857 	mp->page->flags = flags;
1858 	mp->page->lower = PAGEHDRSZ;
1859 	mp->page->upper = bt->head.psize;
1860 
1861 	if (IS_BRANCH(mp))
1862 		bt->meta.branch_pages++;
1863 	else if (IS_LEAF(mp))
1864 		bt->meta.leaf_pages++;
1865 	else if (IS_OVERFLOW(mp))
1866 		bt->meta.overflow_pages++;
1867 
1868 	mpage_add(bt, mp);
1869 	mpage_dirty(bt, mp);
1870 
1871 	return mp;
1872 }
1873 
1874 static size_t
1875 bt_leaf_size(struct btree *bt, struct btval *key, struct btval *data)
1876 {
1877 	size_t		 sz;
1878 
1879 	sz = LEAFSIZE(key, data);
1880 	if (data->size >= bt->head.psize / BT_MINKEYS) {
1881 		/* put on overflow page */
1882 		sz -= data->size - sizeof(pgno_t);
1883 	}
1884 
1885 	return sz + sizeof(indx_t);
1886 }
1887 
1888 static size_t
1889 bt_branch_size(struct btree *bt, struct btval *key)
1890 {
1891 	size_t		 sz;
1892 
1893 	sz = INDXSIZE(key);
1894 	if (sz >= bt->head.psize / BT_MINKEYS) {
1895 		/* put on overflow page */
1896 		/* not implemented */
1897 		/* sz -= key->size - sizeof(pgno_t); */
1898 	}
1899 
1900 	return sz + sizeof(indx_t);
1901 }
1902 
1903 static int
1904 btree_write_overflow_data(struct btree *bt, struct page *p, struct btval *data)
1905 {
1906 	size_t		 done = 0;
1907 	size_t		 sz;
1908 	size_t		 max;
1909 	pgno_t		*linkp;			/* linked page stored here */
1910 	struct mpage	*next = NULL;
1911 
1912 	max = bt->head.psize - PAGEHDRSZ;
1913 
1914 	while (done < data->size) {
1915 		if (next != NULL)
1916 			p = next->page;
1917 		linkp = &p->p_next_pgno;
1918 		if (data->size - done > max) {
1919 			/* need another overflow page */
1920 			if ((next = btree_new_page(bt, P_OVERFLOW)) == NULL)
1921 				return BT_FAIL;
1922 			*linkp = next->pgno;
1923 			DPRINTF("linking overflow page %u", next->pgno);
1924 		} else
1925 			*linkp = 0;		/* indicates end of list */
1926 		sz = data->size - done;
1927 		if (sz > max)
1928 			sz = max;
1929 		DPRINTF("copying %zu bytes to overflow page %u", sz, p->pgno);
1930 		bcopy((char *)data->data + done, p->ptrs, sz);
1931 		done += sz;
1932 	}
1933 
1934 	return BT_SUCCESS;
1935 }
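
/*
 * Overflow chains built by btree_write_overflow_data() hold at most
 * psize - PAGEHDRSZ payload bytes per page and are linked through
 * p_next_pgno, with 0 marking the end of the chain.  With 4096-byte
 * pages, for example, a 10000-byte value spans three chained overflow
 * pages: two full ones and a final partial page.
 */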
1936 
1937 /* Key prefix should already be stripped.
1938  */
1939 static int
1940 btree_add_node(struct btree *bt, struct mpage *mp, indx_t indx,
1941     struct btval *key, struct btval *data, pgno_t pgno, uint8_t flags)
1942 {
1943 	unsigned int	 i;
1944 	size_t		 node_size = NODESIZE;
1945 	indx_t		 ofs;
1946 	struct node	*node;
1947 	struct page	*p;
1948 	struct mpage	*ofp = NULL;		/* overflow page */
1949 
1950 	p = mp->page;
1951 	assert(p->upper >= p->lower);
1952 
1953 	DPRINTF("add node [%.*s] to %s page %u at index %d, key size %zu",
1954 	    key ? (int)key->size : 0, key ? (char *)key->data : NULL,
1955 	    IS_LEAF(mp) ? "leaf" : "branch",
1956 	    mp->pgno, indx, key ? key->size : 0);
1957 
1958 	if (key != NULL)
1959 		node_size += key->size;
1960 
1961 	if (IS_LEAF(mp)) {
1962 		assert(data);
1963 		node_size += data->size;
1964 		if (F_ISSET(flags, F_BIGDATA)) {
1965 			/* Data already on overflow page. */
1966 			node_size -= data->size - sizeof(pgno_t);
1967 		} else if (data->size >= bt->head.psize / BT_MINKEYS) {
1968 			/* Put data on overflow page. */
1969 			DPRINTF("data size is %zu, put on overflow page",
1970 			    data->size);
1971 			node_size -= data->size - sizeof(pgno_t);
1972 			if ((ofp = btree_new_page(bt, P_OVERFLOW)) == NULL)
1973 				return BT_FAIL;
1974 			DPRINTF("allocated overflow page %u", ofp->pgno);
1975 			flags |= F_BIGDATA;
1976 		}
1977 	}
1978 
1979 	if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
1980 		DPRINTF("not enough room in page %u, got %lu ptrs",
1981 		    mp->pgno, NUMKEYS(mp));
1982 		DPRINTF("upper - lower = %u - %u = %u", p->upper, p->lower,
1983 		    p->upper - p->lower);
1984 		DPRINTF("node size = %zu", node_size);
1985 		return BT_FAIL;
1986 	}
1987 
1988 	/* Move higher pointers up one slot. */
1989 	for (i = NUMKEYS(mp); i > indx; i--)
1990 		p->ptrs[i] = p->ptrs[i - 1];
1991 
1992 	/* Adjust free space offsets. */
1993 	ofs = p->upper - node_size;
1994 	assert(ofs >= p->lower + sizeof(indx_t));
1995 	p->ptrs[indx] = ofs;
1996 	p->upper = ofs;
1997 	p->lower += sizeof(indx_t);
1998 
1999 	/* Write the node data. */
2000 	node = NODEPTR(mp, indx);
2001 	node->ksize = (key == NULL) ? 0 : key->size;
2002 	node->flags = flags;
2003 	if (IS_LEAF(mp))
2004 		node->n_dsize = data->size;
2005 	else
2006 		node->n_pgno = pgno;
2007 
2008 	if (key)
2009 		bcopy(key->data, NODEKEY(node), key->size);
2010 
2011 	if (IS_LEAF(mp)) {
2012 		assert(key);
2013 		if (ofp == NULL) {
2014 			if (F_ISSET(flags, F_BIGDATA))
2015 				bcopy(data->data, node->data + key->size,
2016 				    sizeof(pgno_t));
2017 			else
2018 				bcopy(data->data, node->data + key->size,
2019 				    data->size);
2020 		} else {
2021 			bcopy(&ofp->pgno, node->data + key->size,
2022 			    sizeof(pgno_t));
2023 			if (btree_write_overflow_data(bt, ofp->page,
2024 			    data) == BT_FAIL)
2025 				return BT_FAIL;
2026 		}
2027 	}
2028 
2029 	return BT_SUCCESS;
2030 }
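
/*
 * btree_add_node() fills a page from both ends: node bodies are carved
 * out downward from "upper" while the ptrs[] slot array grows upward
 * from "lower".  Inserting a node of node_size bytes therefore lowers
 * "upper" by node_size, raises "lower" by sizeof(indx_t), stores the
 * node's offset in ptrs[indx], and shifts the slots at and above indx
 * up by one position.
 */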
2031 
2032 static void
2033 btree_del_node(struct btree *bt, struct mpage *mp, indx_t indx)
2034 {
2035 	unsigned int	 sz;
2036 	indx_t		 i, j, numkeys, ptr;
2037 	struct node	*node;
2038 	char		*base;
2039 
2040 	DPRINTF("delete node %u on %s page %u", indx,
2041 	    IS_LEAF(mp) ? "leaf" : "branch", mp->pgno);
2042 	assert(indx < NUMKEYS(mp));
2043 
2044 	node = NODEPTR(mp, indx);
2045 	sz = NODESIZE + node->ksize;
2046 	if (IS_LEAF(mp)) {
2047 		if (F_ISSET(node->flags, F_BIGDATA))
2048 			sz += sizeof(pgno_t);
2049 		else
2050 			sz += NODEDSZ(node);
2051 	}
2052 
2053 	ptr = mp->page->ptrs[indx];
2054 	numkeys = NUMKEYS(mp);
2055 	for (i = j = 0; i < numkeys; i++) {
2056 		if (i != indx) {
2057 			mp->page->ptrs[j] = mp->page->ptrs[i];
2058 			if (mp->page->ptrs[i] < ptr)
2059 				mp->page->ptrs[j] += sz;
2060 			j++;
2061 		}
2062 	}
2063 
2064 	base = (char *)mp->page + mp->page->upper;
2065 	bcopy(base, base + sz, ptr - mp->page->upper);
2066 
2067 	mp->page->lower -= sizeof(indx_t);
2068 	mp->page->upper += sz;
2069 }
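
/*
 * btree_del_node() reverses that layout change: the slot for indx is
 * dropped from ptrs[], every remaining slot that points below the
 * deleted node is bumped by the node's size, the node area between
 * "upper" and the deleted node is slid toward the end of the page by
 * the same amount, and finally "lower" shrinks by one slot while
 * "upper" grows by the freed bytes.
 */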
2070 
2071 struct cursor *
2072 btree_txn_cursor_open(struct btree *bt, struct btree_txn *txn)
2073 {
2074 	struct cursor	*cursor;
2075 
2076 	if (bt != NULL && txn != NULL && bt != txn->bt) {
2077 		errno = EINVAL;
2078 		return NULL;
2079 	}
2080 
2081 	if (bt == NULL) {
2082 		if (txn == NULL) {
2083 			errno = EINVAL;
2084 			return NULL;
2085 		}
2086 		bt = txn->bt;
2087 	}
2088 
2089 	if ((cursor = calloc(1, sizeof(*cursor))) != NULL) {
2090 		SLIST_INIT(&cursor->stack);
2091 		cursor->bt = bt;
2092 		cursor->txn = txn;
2093 		btree_ref(bt);
2094 	}
2095 
2096 	return cursor;
2097 }
2098 
2099 void
2100 btree_cursor_close(struct cursor *cursor)
2101 {
2102 	if (cursor != NULL) {
2103 		while (!CURSOR_EMPTY(cursor))
2104 			cursor_pop_page(cursor);
2105 
2106 		btree_close(cursor->bt);
2107 		free(cursor);
2108 	}
2109 }
2110 
2111 static int
2112 btree_update_key(struct btree *bt, struct mpage *mp, indx_t indx,
2113     struct btval *key)
2114 {
2115 	indx_t			 ptr, i, numkeys;
2116 	int			 delta;
2117 	size_t			 len;
2118 	struct node		*node;
2119 	char			*base;
2120 
2121 	node = NODEPTR(mp, indx);
2122 	ptr = mp->page->ptrs[indx];
2123 	DPRINTF("update key %u (ofs %u) [%.*s] to [%.*s] on page %u",
2124 	    indx, ptr,
2125 	    (int)node->ksize, (char *)NODEKEY(node),
2126 	    (int)key->size, (char *)key->data,
2127 	    mp->pgno);
2128 
2129 	if (key->size != node->ksize) {
2130 		delta = key->size - node->ksize;
2131 		if (delta > 0 && SIZELEFT(mp) < delta) {
2132 			DPRINTF("OUCH! Not enough room, delta = %d", delta);
2133 			return BT_FAIL;
2134 		}
2135 
2136 		numkeys = NUMKEYS(mp);
2137 		for (i = 0; i < numkeys; i++) {
2138 			if (mp->page->ptrs[i] <= ptr)
2139 				mp->page->ptrs[i] -= delta;
2140 		}
2141 
2142 		base = (char *)mp->page + mp->page->upper;
2143 		len = ptr - mp->page->upper + NODESIZE;
2144 		bcopy(base, base - delta, len);
2145 		mp->page->upper -= delta;
2146 
2147 		node = NODEPTR(mp, indx);
2148 		node->ksize = key->size;
2149 	}
2150 
2151 	bcopy(key->data, NODEKEY(node), key->size);
2152 
2153 	return BT_SUCCESS;
2154 }
2155 
2156 static int
2157 btree_adjust_prefix(struct btree *bt, struct mpage *src, int delta)
2158 {
2159 	indx_t		 i;
2160 	struct node	*node;
2161 	struct btkey	 tmpkey;
2162 	struct btval	 key;
2163 
2164 	DPRINTF("adjusting prefix lengths on page %u with delta %d",
2165 	    src->pgno, delta);
2166 	assert(delta != 0);
2167 
2168 	for (i = 0; i < NUMKEYS(src); i++) {
2169 		node = NODEPTR(src, i);
2170 		tmpkey.len = node->ksize - delta;
2171 		if (delta > 0) {
2172 			if (F_ISSET(bt->flags, BT_REVERSEKEY))
2173 				bcopy(NODEKEY(node), tmpkey.str, tmpkey.len);
2174 			else
2175 				bcopy((char *)NODEKEY(node) + delta, tmpkey.str,
2176 				    tmpkey.len);
2177 		} else {
2178 			if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
2179 				bcopy(NODEKEY(node), tmpkey.str, node->ksize);
2180 				bcopy(src->prefix.str, tmpkey.str + node->ksize,
2181 				    -delta);
2182 			} else {
2183 				bcopy(src->prefix.str + src->prefix.len + delta,
2184 				    tmpkey.str, -delta);
2185 				bcopy(NODEKEY(node), tmpkey.str - delta,
2186 				    node->ksize);
2187 			}
2188 		}
2189 		key.size = tmpkey.len;
2190 		key.data = tmpkey.str;
2191 		if (btree_update_key(bt, src, i, &key) != BT_SUCCESS)
2192 			return BT_FAIL;
2193 	}
2194 
2195 	return BT_SUCCESS;
2196 }
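
/*
 * Example for btree_adjust_prefix(): if a page's prefix shrinks from
 * "apple" to "app" (delta = -2), a key stored as "sauce" (full key
 * "applesauce") is rewritten as "lesauce", taking the two bytes back
 * from the end of the old prefix; with a positive delta the extra
 * leading bytes are stripped from each key instead (trailing bytes for
 * BT_REVERSEKEY databases).
 */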
2197 
2198 /* Move a node from src to dst.
2199  */
2200 static int
2201 btree_move_node(struct btree *bt, struct mpage *src, indx_t srcindx,
2202     struct mpage *dst, indx_t dstindx)
2203 {
2204 	int			 rc;
2205 	unsigned int		 pfxlen, mp_pfxlen = 0;
2206 	struct node		*srcnode;
2207 	struct mpage		*mp = NULL;
2208 	struct btkey		 tmpkey, srckey;
2209 	struct btval		 key, data;
2210 
2211 	assert(src->parent);
2212 	assert(dst->parent);
2213 
2214 	srcnode = NODEPTR(src, srcindx);
2215 	DPRINTF("moving %s node %u [%.*s] on page %u to node %u on page %u",
2216 	    IS_LEAF(src) ? "leaf" : "branch",
2217 	    srcindx,
2218 	    (int)srcnode->ksize, (char *)NODEKEY(srcnode),
2219 	    src->pgno,
2220 	    dstindx, dst->pgno);
2221 
2222 	find_common_prefix(bt, src);
2223 
2224 	if (IS_BRANCH(src)) {
2225 		/* Need to check if the page the moved node points to
2226 		 * changes prefix.
2227 		 */
2228 		if ((mp = btree_get_mpage(bt, NODEPGNO(srcnode))) == NULL)
2229 			return BT_FAIL;
2230 		mp->parent = src;
2231 		mp->parent_index = srcindx;
2232 		find_common_prefix(bt, mp);
2233 		mp_pfxlen = mp->prefix.len;
2234 	}
2235 
2236 	/* Mark src and dst as dirty. */
2237 	if ((src = mpage_touch(bt, src)) == NULL ||
2238 	    (dst = mpage_touch(bt, dst)) == NULL)
2239 		return BT_FAIL;
2240 
2241 	find_common_prefix(bt, dst);
2242 
2243 	/* Check if the src node has the destination page prefix. Otherwise the
2244 	 * destination page must expand its prefix on all its nodes.
2245 	 */
2246 	srckey.len = srcnode->ksize;
2247 	bcopy(NODEKEY(srcnode), srckey.str, srckey.len);
2248 	common_prefix(bt, &srckey, &dst->prefix, &tmpkey);
2249 	if (tmpkey.len != dst->prefix.len) {
2250 		if (btree_adjust_prefix(bt, dst,
2251 		    tmpkey.len - dst->prefix.len) != BT_SUCCESS)
2252 			return BT_FAIL;
2253 		bcopy(&tmpkey, &dst->prefix, sizeof(tmpkey));
2254 	}
2255 
2256 	if (srcindx == 0 && IS_BRANCH(src)) {
2257 		struct mpage	*low;
2258 
2259 		/* must find the lowest key below src
2260 		 */
2261 		assert(btree_search_page_root(bt, src, NULL, NULL, 0,
2262 		    &low) == BT_SUCCESS);
2263 		expand_prefix(bt, low, 0, &srckey);
2264 		DPRINTF("found lowest key [%.*s] on leaf page %u",
2265 		    (int)srckey.len, srckey.str, low->pgno);
2266 	} else {
2267 		srckey.len = srcnode->ksize;
2268 		bcopy(NODEKEY(srcnode), srckey.str, srcnode->ksize);
2269 	}
2270 	find_common_prefix(bt, src);
2271 
2272 	/* expand the prefix */
2273 	tmpkey.len = sizeof(tmpkey.str);
2274 	concat_prefix(bt, src->prefix.str, src->prefix.len,
2275 	    srckey.str, srckey.len, tmpkey.str, &tmpkey.len);
2276 
2277 	/* Add the node to the destination page. Adjust prefix for
2278 	 * destination page.
2279 	 */
2280 	key.size = tmpkey.len;
2281 	key.data = tmpkey.str;
2282 	remove_prefix(bt, &key, dst->prefix.len);
2283 	data.size = NODEDSZ(srcnode);
2284 	data.data = NODEDATA(srcnode);
2285 	rc = btree_add_node(bt, dst, dstindx, &key, &data, NODEPGNO(srcnode),
2286 	    srcnode->flags);
2287 	if (rc != BT_SUCCESS)
2288 		return rc;
2289 
2290 	/* Delete the node from the source page.
2291 	 */
2292 	btree_del_node(bt, src, srcindx);
2293 
2294 	/* Update the parent separators.
2295 	 */
2296 	if (srcindx == 0 && src->parent_index != 0) {
2297 		expand_prefix(bt, src, 0, &tmpkey);
2298 		key.size = tmpkey.len;
2299 		key.data = tmpkey.str;
2300 		remove_prefix(bt, &key, src->parent->prefix.len);
2301 
2302 		DPRINTF("update separator for source page %u to [%.*s]",
2303 		    src->pgno, (int)key.size, (char *)key.data);
2304 		if (btree_update_key(bt, src->parent, src->parent_index,
2305 		    &key) != BT_SUCCESS)
2306 			return BT_FAIL;
2307 	}
2308 
2309 	if (srcindx == 0 && IS_BRANCH(src)) {
2310 		struct btval	 nullkey;
2311 		nullkey.size = 0;
2312 		assert(btree_update_key(bt, src, 0, &nullkey) == BT_SUCCESS);
2313 	}
2314 
2315 	if (dstindx == 0 && dst->parent_index != 0) {
2316 		expand_prefix(bt, dst, 0, &tmpkey);
2317 		key.size = tmpkey.len;
2318 		key.data = tmpkey.str;
2319 		remove_prefix(bt, &key, dst->parent->prefix.len);
2320 
2321 		DPRINTF("update separator for destination page %u to [%.*s]",
2322 		    dst->pgno, (int)key.size, (char *)key.data);
2323 		if (btree_update_key(bt, dst->parent, dst->parent_index,
2324 		    &key) != BT_SUCCESS)
2325 			return BT_FAIL;
2326 	}
2327 
2328 	if (dstindx == 0 && IS_BRANCH(dst)) {
2329 		struct btval	 nullkey;
2330 		nullkey.size = 0;
2331 		assert(btree_update_key(bt, dst, 0, &nullkey) == BT_SUCCESS);
2332 	}
2333 
2334 	/* We can get a new page prefix here!
2335 	 * Must update keys in all nodes of this page!
2336 	 */
2337 	pfxlen = src->prefix.len;
2338 	find_common_prefix(bt, src);
2339 	if (src->prefix.len != pfxlen) {
2340 		if (btree_adjust_prefix(bt, src,
2341 		    src->prefix.len - pfxlen) != BT_SUCCESS)
2342 			return BT_FAIL;
2343 	}
2344 
2345 	pfxlen = dst->prefix.len;
2346 	find_common_prefix(bt, dst);
2347 	if (dst->prefix.len != pfxlen) {
2348 		if (btree_adjust_prefix(bt, dst,
2349 		    dst->prefix.len - pfxlen) != BT_SUCCESS)
2350 			return BT_FAIL;
2351 	}
2352 
2353 	if (IS_BRANCH(dst)) {
2354 		assert(mp);
2355 		mp->parent = dst;
2356 		mp->parent_index = dstindx;
2357 		find_common_prefix(bt, mp);
2358 		if (mp->prefix.len != mp_pfxlen) {
2359 			DPRINTF("moved branch node has changed prefix");
2360 			if ((mp = mpage_touch(bt, mp)) == NULL)
2361 				return BT_FAIL;
2362 			if (btree_adjust_prefix(bt, mp,
2363 			    mp->prefix.len - mp_pfxlen) != BT_SUCCESS)
2364 				return BT_FAIL;
2365 		}
2366 	}
2367 
2368 	return BT_SUCCESS;
2369 }
2370 
2371 static int
2372 btree_merge(struct btree *bt, struct mpage *src, struct mpage *dst)
2373 {
2374 	int			 rc;
2375 	indx_t			 i;
2376 	unsigned int		 pfxlen;
2377 	struct node		*srcnode;
2378 	struct btkey		 tmpkey, dstpfx;
2379 	struct btval		 key, data;
2380 
2381 	DPRINTF("merging page %u and %u", src->pgno, dst->pgno);
2382 
2383 	assert(src->parent);	/* can't merge root page */
2384 	assert(dst->parent);
2385 	assert(bt->txn != NULL);
2386 
2387 	/* Mark src and dst as dirty. */
2388 	if ((src = mpage_touch(bt, src)) == NULL ||
2389 	    (dst = mpage_touch(bt, dst)) == NULL)
2390 		return BT_FAIL;
2391 
2392 	find_common_prefix(bt, src);
2393 	find_common_prefix(bt, dst);
2394 
2395 	/* Check if the source nodes have the destination page prefix. Otherwise
2396 	 * the destination page must expand its prefix on all its nodes.
2397 	 */
2398 	common_prefix(bt, &src->prefix, &dst->prefix, &dstpfx);
2399 	if (dstpfx.len != dst->prefix.len) {
2400 		if (btree_adjust_prefix(bt, dst,
2401 		    dstpfx.len - dst->prefix.len) != BT_SUCCESS)
2402 			return BT_FAIL;
2403 		bcopy(&dstpfx, &dst->prefix, sizeof(dstpfx));
2404 	}
2405 
2406 	/* Move all nodes from src to dst.
2407 	 */
2408 	for (i = 0; i < NUMKEYS(src); i++) {
2409 		srcnode = NODEPTR(src, i);
2410 
2411 		/* If branch node 0 (implicit key), find the real key.
2412 		 */
2413 		if (i == 0 && IS_BRANCH(src)) {
2414 			struct mpage	*low;
2415 
2416 			/* must find the lowest key below src
2417 			 */
2418 			assert(btree_search_page_root(bt, src, NULL, NULL, 0,
2419 			    &low) == BT_SUCCESS);
2420 			expand_prefix(bt, low, 0, &tmpkey);
2421 			DPRINTF("found lowest key [%.*s] on leaf page %u",
2422 			    (int)tmpkey.len, tmpkey.str, low->pgno);
2423 		} else {
2424 			expand_prefix(bt, src, i, &tmpkey);
2425 		}
2426 
2427 		key.size = tmpkey.len;
2428 		key.data = tmpkey.str;
2429 
2430 		remove_prefix(bt, &key, dst->prefix.len);
2431 		data.size = NODEDSZ(srcnode);
2432 		data.data = NODEDATA(srcnode);
2433 		rc = btree_add_node(bt, dst, NUMKEYS(dst), &key,
2434 		    &data, NODEPGNO(srcnode), srcnode->flags);
2435 		if (rc != BT_SUCCESS)
2436 			return rc;
2437 	}
2438 
2439 	DPRINTF("dst page %u now has %lu keys (%.1f%% filled)",
2440 	    dst->pgno, NUMKEYS(dst), (float)PAGEFILL(bt, dst) / 10);
2441 
2442 	/* Unlink the src page from parent.
2443 	 */
2444 	btree_del_node(bt, src->parent, src->parent_index);
2445 	if (src->parent_index == 0) {
2446 		key.size = 0;
2447 		if (btree_update_key(bt, src->parent, 0, &key) != BT_SUCCESS)
2448 			return BT_FAIL;
2449 
2450 		pfxlen = src->prefix.len;
2451 		find_common_prefix(bt, src);
2452 		assert (src->prefix.len == pfxlen);
2453 	}
2454 
2455 	if (IS_LEAF(src))
2456 		bt->meta.leaf_pages--;
2457 	else
2458 		bt->meta.branch_pages--;
2459 
2460 	return btree_rebalance(bt, src->parent);
2461 }
2462 
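/*
 * PAGEFILL() is measured in tenths of a percent (hence the "/ 10" in the
 * debug output), so a threshold of 250 marks pages that are less than
 * 25% full as candidates for rebalancing.
 */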
2463 #define FILL_THRESHOLD	 250
2464 
2465 static int
2466 btree_rebalance(struct btree *bt, struct mpage *mp)
2467 {
2468 	struct node	*node;
2469 	struct mpage	*parent;
2470 	struct mpage	*root;
2471 	struct mpage	*neighbor = NULL;
2472 	indx_t		 si = 0, di = 0;
2473 
2474 	assert(bt != NULL);
2475 	assert(bt->txn != NULL);
2476 	assert(mp != NULL);
2477 
2478 	DPRINTF("rebalancing %s page %u (has %lu keys, %.1f%% full)",
2479 	    IS_LEAF(mp) ? "leaf" : "branch",
2480 	    mp->pgno, NUMKEYS(mp), (float)PAGEFILL(bt, mp) / 10);
2481 
2482 	if (PAGEFILL(bt, mp) >= FILL_THRESHOLD) {
2483 		DPRINTF("no need to rebalance page %u, above fill threshold",
2484 		    mp->pgno);
2485 		return BT_SUCCESS;
2486 	}
2487 
2488 	parent = mp->parent;
2489 
2490 	if (parent == NULL) {
2491 		if (NUMKEYS(mp) == 0) {
2492 			DPRINTF("tree is completely empty");
2493 			bt->txn->root = P_INVALID;
2494 			bt->meta.depth--;
2495 			bt->meta.leaf_pages--;
2496 		} else if (IS_BRANCH(mp) && NUMKEYS(mp) == 1) {
2497 			DPRINTF("collapsing root page!");
2498 			bt->txn->root = NODEPGNO(NODEPTR(mp, 0));
2499 			if ((root = btree_get_mpage(bt, bt->txn->root)) == NULL)
2500 				return BT_FAIL;
2501 			root->parent = NULL;
2502 			bt->meta.depth--;
2503 			bt->meta.branch_pages--;
2504 		} else
2505 			DPRINTF("root page doesn't need rebalancing");
2506 		return BT_SUCCESS;
2507 	}
2508 
2509 	/* The parent (branch page) must have at least 2 pointers,
2510 	 * otherwise the tree is invalid.
2511 	 */
2512 	assert(NUMKEYS(parent) > 1);
2513 
2514 	/* Leaf page fill factor is below the threshold.
2515 	 * Try to move keys from left or right neighbor, or
2516 	 * merge with a neighbor page.
2517 	 */
2518 
2519 	/* Find neighbors.
2520 	 */
2521 	if (mp->parent_index == 0) {
2522 		/* We're the leftmost leaf in our parent.
2523 		 */
2524 		DPRINTF("reading right neighbor");
2525 		node = NODEPTR(parent, mp->parent_index + 1);
2526 		if ((neighbor = btree_get_mpage(bt, NODEPGNO(node))) == NULL)
2527 			return BT_FAIL;
2528 		neighbor->parent_index = mp->parent_index + 1;
2529 		si = 0;
2530 		di = NUMKEYS(mp);
2531 	} else {
2532 		/* There is at least one neighbor to the left.
2533 		 */
2534 		DPRINTF("reading left neighbor");
2535 		node = NODEPTR(parent, mp->parent_index - 1);
2536 		if ((neighbor = btree_get_mpage(bt, NODEPGNO(node))) == NULL)
2537 			return BT_FAIL;
2538 		neighbor->parent_index = mp->parent_index - 1;
2539 		si = NUMKEYS(neighbor) - 1;
2540 		di = 0;
2541 	}
2542 	neighbor->parent = parent;
2543 
2544 	DPRINTF("found neighbor page %u (%lu keys, %.1f%% full)",
2545 	    neighbor->pgno, NUMKEYS(neighbor), (float)PAGEFILL(bt, neighbor) / 10);
2546 
2547 	/* If the neighbor page is above threshold and has at least two
2548 	 * keys, move one key from it.
2549 	 *
2550 	 * Otherwise we should try to merge them, but that might not be
2551 	 * possible, even if both are below threshold, as prefix expansion
2552 	 * might make keys larger. FIXME: detect this
2553 	 */
2554 	if (PAGEFILL(bt, neighbor) >= FILL_THRESHOLD && NUMKEYS(neighbor) >= 2)
2555 		return btree_move_node(bt, neighbor, si, mp, di);
2556 	else { /* FIXME: if (has_enough_room()) */
2557 		if (mp->parent_index == 0)
2558 			return btree_merge(bt, neighbor, mp);
2559 		else
2560 			return btree_merge(bt, mp, neighbor);
2561 	}
2562 }
2563 
2564 int
2565 btree_txn_del(struct btree *bt, struct btree_txn *txn,
2566     struct btval *key, struct btval *data)
2567 {
2568 	int		 rc, exact, close_txn = 0;
2569 	unsigned int	 ki;
2570 	struct node	*leaf;
2571 	struct mpage	*mp;
2572 
2573 	DPRINTF("========> delete key %.*s", (int)key->size, (char *)key->data);
2574 
2575 	assert(key != NULL);
2576 
2577 	if (bt != NULL && txn != NULL && bt != txn->bt) {
2578 		errno = EINVAL;
2579 		return BT_FAIL;
2580 	}
2581 
2582 	if (txn != NULL && F_ISSET(txn->flags, BT_TXN_RDONLY)) {
2583 		errno = EINVAL;
2584 		return BT_FAIL;
2585 	}
2586 
2587 	if (bt == NULL) {
2588 		if (txn == NULL) {
2589 			errno = EINVAL;
2590 			return BT_FAIL;
2591 		}
2592 		bt = txn->bt;
2593 	}
2594 
2595 	if (key->size == 0 || key->size > MAXKEYSIZE) {
2596 		errno = EINVAL;
2597 		return BT_FAIL;
2598 	}
2599 
2600 	if (txn == NULL) {
2601 		close_txn = 1;
2602 		if ((txn = btree_txn_begin(bt, 0)) == NULL)
2603 			return BT_FAIL;
2604 	}
2605 
2606 	if ((rc = btree_search_page(bt, txn, key, NULL, 1, &mp)) != BT_SUCCESS)
2607 		goto done;
2608 
2609 	leaf = btree_search_node(bt, mp, key, &exact, &ki);
2610 	if (leaf == NULL || !exact) {
2611 		errno = ENOENT;
2612 		rc = BT_FAIL;
2613 		goto done;
2614 	}
2615 
2616 	if (data && (rc = btree_read_data(bt, NULL, leaf, data)) != BT_SUCCESS)
2617 		goto done;
2618 
2619 	btree_del_node(bt, mp, ki);
2620 	bt->meta.entries--;
2621 	rc = btree_rebalance(bt, mp);
2622 	if (rc != BT_SUCCESS)
2623 		txn->flags |= BT_TXN_ERROR;
2624 
2625 done:
2626 	if (close_txn) {
2627 		if (rc == BT_SUCCESS)
2628 			rc = btree_txn_commit(txn);
2629 		else
2630 			btree_txn_abort(txn);
2631 	}
2632 	mpage_prune(bt);
2633 	return rc;
2634 }
2635 
2636 /* Reduce the length of the prefix separator <sep> to the minimum length that
2637  * still makes it uniquely distinguishable from <min>.
2638  *
2639  * <min> is guaranteed to be sorted less than <sep>
2640  *
2641  * On return, <sep> is modified to the minimum length.
2642  */
2643 static void
2644 bt_reduce_separator(struct btree *bt, struct node *min, struct btval *sep)
2645 {
2646 	size_t		 n = 0;
2647 	char		*p1;
2648 	char		*p2;
2649 
2650 	if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
2651 
2652 		assert(sep->size > 0);
2653 
2654 		p1 = (char *)NODEKEY(min) + min->ksize - 1;
2655 		p2 = (char *)sep->data + sep->size - 1;
2656 
2657 		while (p1 >= (char *)NODEKEY(min) && *p1 == *p2) {
2658 			assert(p2 > (char *)sep->data);
2659 			p1--;
2660 			p2--;
2661 			n++;
2662 		}
2663 
2664 		sep->data = p2;
2665 		sep->size = n + 1;
2666 	} else {
2667 
2668 		assert(min->ksize > 0);
2669 		assert(sep->size > 0);
2670 
2671 		p1 = (char *)NODEKEY(min);
2672 		p2 = (char *)sep->data;
2673 
2674 		while (*p1 == *p2) {
2675 			p1++;
2676 			p2++;
2677 			n++;
2678 			if (n == min->ksize || n == sep->size)
2679 				break;
2680 		}
2681 
2682 		sep->size = n + 1;
2683 	}
2684 
2685 	DPRINTF("reduced separator to [%.*s] > [%.*s]",
2686 	    (int)sep->size, (char *)sep->data,
2687 	    (int)min->ksize, (char *)NODEKEY(min));
2688 }
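
/*
 * Example for bt_reduce_separator(): if the last key on the left page is
 * "janet" and the first key on the right page is "john", the two share
 * one leading byte, so the separator is cut down to "jo", the shortest
 * prefix of "john" that still sorts strictly above "janet".
 */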
2689 
2690 /* Split page <*mpp>, and insert <key,(data|newpgno)> in either left or
2691  * right sibling, at index <*newindxp> (as if unsplit). Updates *mpp and
2692  * *newindxp with the actual values after the split, i.e. they may now
2693  * refer to a node in the new right sibling page.
2694  */
2695 static int
2696 btree_split(struct btree *bt, struct mpage **mpp, unsigned int *newindxp,
2697     struct btval *newkey, struct btval *newdata, pgno_t newpgno)
2698 {
2699 	uint8_t		 flags;
2700 	int		 rc = BT_SUCCESS, ins_new = 0;
2701 	indx_t		 newindx;
2702 	pgno_t		 pgno = 0;
2703 	size_t		 orig_pfx_len, left_pfx_diff, right_pfx_diff, pfx_diff;
2704 	unsigned int	 i, j, split_indx;
2705 	struct node	*node;
2706 	struct mpage	*pright, *p, *mp;
2707 	struct btval	 sepkey, rkey, rdata;
2708 	struct btkey	 tmpkey;
2709 	struct page	*copy;
2710 
2711 	assert(bt != NULL);
2712 	assert(bt->txn != NULL);
2713 
2714 	mp = *mpp;
2715 	newindx = *newindxp;
2716 
2717 	DPRINTF("-----> splitting %s page %u and adding [%.*s] at index %d",
2718 	    IS_LEAF(mp) ? "leaf" : "branch", mp->pgno,
2719 	    (int)newkey->size, (char *)newkey->data, *newindxp);
2720 	DPRINTF("page %u has prefix [%.*s]", mp->pgno,
2721 	    (int)mp->prefix.len, (char *)mp->prefix.str);
2722 	orig_pfx_len = mp->prefix.len;
2723 
2724 	if (mp->parent == NULL) {
2725 		if ((mp->parent = btree_new_page(bt, P_BRANCH)) == NULL)
2726 			return BT_FAIL;
2727 		mp->parent_index = 0;
2728 		bt->txn->root = mp->parent->pgno;
2729 		DPRINTF("root split! new root = %u", mp->parent->pgno);
2730 		bt->meta.depth++;
2731 
2732 		/* Add left (implicit) pointer. */
2733 		if (btree_add_node(bt, mp->parent, 0, NULL, NULL,
2734 		    mp->pgno, 0) != BT_SUCCESS)
2735 			return BT_FAIL;
2736 	} else {
2737 		DPRINTF("parent branch page is %u", mp->parent->pgno);
2738 	}
2739 
2740 	/* Create a right sibling. */
2741 	if ((pright = btree_new_page(bt, mp->page->flags)) == NULL)
2742 		return BT_FAIL;
2743 	pright->parent = mp->parent;
2744 	pright->parent_index = mp->parent_index + 1;
2745 	DPRINTF("new right sibling: page %u", pright->pgno);
2746 
2747 	/* Move half of the keys to the right sibling. */
2748 	if ((copy = malloc(bt->head.psize)) == NULL)
2749 		return BT_FAIL;
2750 	bcopy(mp->page, copy, bt->head.psize);
2751 	assert(mp->ref == 0);				/* XXX */
2752 	memset(&mp->page->ptrs, 0, bt->head.psize - PAGEHDRSZ);
2753 	mp->page->lower = PAGEHDRSZ;
2754 	mp->page->upper = bt->head.psize;
2755 
2756 	split_indx = NUMKEYSP(copy) / 2 + 1;
2757 
2758 	/* First find the separating key between the split pages.
2759 	 */
2760 	memset(&sepkey, 0, sizeof(sepkey));
2761 	if (newindx == split_indx) {
2762 		sepkey.size = newkey->size;
2763 		sepkey.data = newkey->data;
2764 		remove_prefix(bt, &sepkey, mp->prefix.len);
2765 	} else {
2766 		node = NODEPTRP(copy, split_indx);
2767 		sepkey.size = node->ksize;
2768 		sepkey.data = NODEKEY(node);
2769 	}
2770 
2771 	if (IS_LEAF(mp) && bt->cmp == NULL) {
2772 		/* Find the smallest separator. */
2773 		/* Ref: Prefix B-trees, R. Bayer, K. Unterauer, 1977 */
2774 		node = NODEPTRP(copy, split_indx - 1);
2775 		bt_reduce_separator(bt, node, &sepkey);
2776 	}
2777 
2778 	/* Fix separator wrt parent prefix. */
2779 	if (bt->cmp == NULL) {
2780 		tmpkey.len = sizeof(tmpkey.str);
2781 		concat_prefix(bt, mp->prefix.str, mp->prefix.len,
2782 		    sepkey.data, sepkey.size, tmpkey.str, &tmpkey.len);
2783 		sepkey.data = tmpkey.str;
2784 		sepkey.size = tmpkey.len;
2785 	}
2786 
2787 	DPRINTF("separator is [%.*s]", (int)sepkey.size, (char *)sepkey.data);
2788 
2789 	/* Copy separator key to the parent.
2790 	 */
2791 	if (SIZELEFT(pright->parent) < bt_branch_size(bt, &sepkey)) {
2792 		rc = btree_split(bt, &pright->parent, &pright->parent_index,
2793 		    &sepkey, NULL, pright->pgno);
2794 
2795 		/* Right page might now have changed parent.
2796 		 * Check if left page also changed parent.
2797 		 */
2798 		if (pright->parent != mp->parent &&
2799 		    mp->parent_index >= NUMKEYS(mp->parent)) {
2800 			mp->parent = pright->parent;
2801 			mp->parent_index = pright->parent_index - 1;
2802 		}
2803 	} else {
2804 		remove_prefix(bt, &sepkey, pright->parent->prefix.len);
2805 		rc = btree_add_node(bt, pright->parent, pright->parent_index,
2806 		    &sepkey, NULL, pright->pgno, 0);
2807 	}
2808 	if (rc != BT_SUCCESS) {
2809 		free(copy);
2810 		return BT_FAIL;
2811 	}
2812 
2813 	/* Update prefix for right and left page, if the parent was split.
2814 	 */
2815 	find_common_prefix(bt, pright);
2816 	assert(orig_pfx_len <= pright->prefix.len);
2817 	right_pfx_diff = pright->prefix.len - orig_pfx_len;
2818 
2819 	find_common_prefix(bt, mp);
2820 	assert(orig_pfx_len <= mp->prefix.len);
2821 	left_pfx_diff = mp->prefix.len - orig_pfx_len;
2822 
2823 	for (i = j = 0; i <= NUMKEYSP(copy); j++) {
2824 		if (i < split_indx) {
2825 			/* Re-insert in left sibling. */
2826 			p = mp;
2827 			pfx_diff = left_pfx_diff;
2828 		} else {
2829 			/* Insert in right sibling. */
2830 			if (i == split_indx)
2831 				/* Reset insert index for right sibling. */
2832 				j = (i == newindx && ins_new);
2833 			p = pright;
2834 			pfx_diff = right_pfx_diff;
2835 		}
2836 
2837 		if (i == newindx && !ins_new) {
2838 			/* Insert the original entry that caused the split. */
2839 			rkey.data = newkey->data;
2840 			rkey.size = newkey->size;
2841 			if (IS_LEAF(mp)) {
2842 				rdata.data = newdata->data;
2843 				rdata.size = newdata->size;
2844 			} else
2845 				pgno = newpgno;
2846 			flags = 0;
2847 			pfx_diff = p->prefix.len;
2848 
2849 			ins_new = 1;
2850 
2851 			/* Update page and index for the new key. */
2852 			*newindxp = j;
2853 			*mpp = p;
2854 		} else if (i == NUMKEYSP(copy)) {
2855 			break;
2856 		} else {
2857 			node = NODEPTRP(copy, i);
2858 			rkey.data = NODEKEY(node);
2859 			rkey.size = node->ksize;
2860 			if (IS_LEAF(mp)) {
2861 				rdata.data = NODEDATA(node);
2862 				rdata.size = node->n_dsize;
2863 			} else
2864 				pgno = node->n_pgno;
2865 			flags = node->flags;
2866 
2867 			i++;
2868 		}
2869 
2870 		if (!IS_LEAF(mp) && j == 0) {
2871 			/* First branch index doesn't need key data. */
2872 			rkey.size = 0;
2873 		} else
2874 			remove_prefix(bt, &rkey, pfx_diff);
2875 
2876 		rc = btree_add_node(bt, p, j, &rkey, &rdata, pgno, flags);
2877 	}
2878 
2879 	free(copy);
2880 	return rc;
2881 }
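
/*
 * The redistribution loop in btree_split() rebuilds both siblings from
 * the saved copy: entries before split_indx go back into the left page,
 * the rest into the new right page, and the entry that triggered the
 * split is slotted in when its index comes up; *mpp and *newindxp are
 * then redirected to whichever sibling received it.
 */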
2882 
2883 int
2884 btree_txn_put(struct btree *bt, struct btree_txn *txn,
2885     struct btval *key, struct btval *data, unsigned int flags)
2886 {
2887 	int		 rc = BT_SUCCESS, exact, close_txn = 0;
2888 	unsigned int	 ki;
2889 	struct node	*leaf;
2890 	struct mpage	*mp;
2891 	struct btval	 xkey;
2892 
2893 	assert(key != NULL);
2894 	assert(data != NULL);
2895 
2896 	if (bt != NULL && txn != NULL && bt != txn->bt) {
2897 		errno = EINVAL;
2898 		return BT_FAIL;
2899 	}
2900 
2901 	if (txn != NULL && F_ISSET(txn->flags, BT_TXN_RDONLY)) {
2902 		errno = EINVAL;
2903 		return BT_FAIL;
2904 	}
2905 
2906 	if (bt == NULL) {
2907 		if (txn == NULL) {
2908 			errno = EINVAL;
2909 			return BT_FAIL;
2910 		}
2911 		bt = txn->bt;
2912 	}
2913 
2914 	if (key->size == 0 || key->size > MAXKEYSIZE) {
2915 		errno = EINVAL;
2916 		return BT_FAIL;
2917 	}
2918 
2919 	DPRINTF("==> put key %.*s, size %zu, data size %zu",
2920 		(int)key->size, (char *)key->data, key->size, data->size);
2921 
2922 	if (txn == NULL) {
2923 		close_txn = 1;
2924 		if ((txn = btree_txn_begin(bt, 0)) == NULL)
2925 			return BT_FAIL;
2926 	}
2927 
2928 	rc = btree_search_page(bt, txn, key, NULL, 1, &mp);
2929 	if (rc == BT_SUCCESS) {
2930 		leaf = btree_search_node(bt, mp, key, &exact, &ki);
2931 		if (leaf && exact) {
2932 			if (F_ISSET(flags, BT_NOOVERWRITE)) {
2933 				DPRINTF("duplicate key %.*s",
2934 				    (int)key->size, (char *)key->data);
2935 				errno = EEXIST;
2936 				rc = BT_FAIL;
2937 				goto done;
2938 			}
2939 			btree_del_node(bt, mp, ki);
2940 		}
2941 		if (leaf == NULL) {		/* append if not found */
2942 			ki = NUMKEYS(mp);
2943 			DPRINTF("appending key at index %d", ki);
2944 		}
2945 	} else if (errno == ENOENT) {
2946 		/* new file, just write a root leaf page */
2947 		DPRINTF("allocating new root leaf page");
2948 		if ((mp = btree_new_page(bt, P_LEAF)) == NULL) {
2949 			rc = BT_FAIL;
2950 			goto done;
2951 		}
2952 		txn->root = mp->pgno;
2953 		bt->meta.depth++;
2954 		ki = 0;
2955 	}
2956 	else
2957 		goto done;
2958 
2959 	assert(IS_LEAF(mp));
2960 	DPRINTF("there are %lu keys, should insert new key at index %u",
2961 		NUMKEYS(mp), ki);
2962 
2963 	/* Copy the key pointer as it is modified by the prefix code. The
2964 	 * caller might have malloc'ed the data.
2965 	 */
2966 	xkey.data = key->data;
2967 	xkey.size = key->size;
2968 
2969 	if (SIZELEFT(mp) < bt_leaf_size(bt, key, data)) {
2970 		rc = btree_split(bt, &mp, &ki, &xkey, data, P_INVALID);
2971 	} else {
2972 		/* There is room already in this leaf page. */
2973 		remove_prefix(bt, &xkey, mp->prefix.len);
2974 		rc = btree_add_node(bt, mp, ki, &xkey, data, 0, 0);
2975 	}
2976 
2977 	if (rc != BT_SUCCESS)
2978 		txn->flags |= BT_TXN_ERROR;
2979 	else
2980 		bt->meta.entries++;
2981 
2982 done:
2983 	if (close_txn) {
2984 		if (rc == BT_SUCCESS)
2985 			rc = btree_txn_commit(txn);
2986 		else
2987 			btree_txn_abort(txn);
2988 	}
2989 	mpage_prune(bt);
2990 	return rc;
2991 }
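
/*
 * A minimal usage sketch for btree_txn_put() with an implicit
 * transaction (passing txn == NULL makes the function begin and commit
 * one internally), assuming an open btree handle "bt":
 *
 *	struct btval	 key, data;
 *
 *	key.data = "greeting";	key.size = 8;
 *	data.data = "hello";	data.size = 5;
 *	if (btree_txn_put(bt, NULL, &key, &data, BT_NOOVERWRITE) != BT_SUCCESS)
 *		err(1, "btree_txn_put");
 */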
2992 
2993 static pgno_t
2994 btree_compact_tree(struct btree *bt, pgno_t pgno, struct btree *btc)
2995 {
2996 	ssize_t		 rc;
2997 	indx_t		 i;
2998 	pgno_t		*pnext, next;
2999 	struct node	*node;
3000 	struct page	*p;
3001 	struct mpage	*mp;
3002 
3003 	/* Get the page and make a copy of it.
3004 	 */
3005 	if ((mp = btree_get_mpage(bt, pgno)) == NULL)
3006 		return P_INVALID;
3007 	if ((p = malloc(bt->head.psize)) == NULL)
3008 		return P_INVALID;
3009 	bcopy(mp->page, p, bt->head.psize);
3010 
3011 	/* Go through all nodes in the (copied) page and update the
3012 	 * page pointers.
3013 	 */
3014 	if (F_ISSET(p->flags, P_BRANCH)) {
3015 		for (i = 0; i < NUMKEYSP(p); i++) {
3016 			node = NODEPTRP(p, i);
3017 			node->n_pgno = btree_compact_tree(bt, node->n_pgno, btc);
3018 			if (node->n_pgno == P_INVALID) {
3019 				free(p);
3020 				return P_INVALID;
3021 			}
3022 		}
3023 	} else if (F_ISSET(p->flags, P_LEAF)) {
3024 		for (i = 0; i < NUMKEYSP(p); i++) {
3025 			node = NODEPTRP(p, i);
3026 			if (F_ISSET(node->flags, F_BIGDATA)) {
3027 				bcopy(NODEDATA(node), &next, sizeof(next));
3028 				next = btree_compact_tree(bt, next, btc);
3029 				if (next == P_INVALID) {
3030 					free(p);
3031 					return P_INVALID;
3032 				}
3033 				bcopy(&next, NODEDATA(node), sizeof(next));
3034 			}
3035 		}
3036 	} else if (F_ISSET(p->flags, P_OVERFLOW)) {
3037 		pnext = &p->p_next_pgno;
3038 		if (*pnext > 0) {
3039 			*pnext = btree_compact_tree(bt, *pnext, btc);
3040 			if (*pnext == P_INVALID) {
3041 				free(p);
3042 				return P_INVALID;
3043 			}
3044 		}
3045 	} else
3046 		assert(0);
3047 
3048 	pgno = p->pgno = btc->txn->next_pgno++;
3049 	rc = write(btc->fd, p, bt->head.psize);
3050 	free(p);
3051 	if (rc != (ssize_t)bt->head.psize)
3052 		return P_INVALID;
3053 	mpage_prune(bt);
3054 	return pgno;
3055 }
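
/*
 * btree_compact_tree() copies the tree depth-first, so overflow chains
 * and child pages are written to the new file before the page that
 * references them; btree_compact() below then records the returned root
 * page number with btree_write_meta().
 */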
3056 
3057 int
3058 btree_compact(struct btree *bt)
3059 {
3060 	char			*compact_path = NULL;
3061 	struct btree		*btc;
3062 	struct btree_txn	*txn, *txnc = NULL;
3063 	int			 fd;
3064 	pgno_t			 root;
3065 
3066 	assert(bt != NULL);
3067 
3068 	DPRINTF("compacting btree %p with path %s", bt, bt->path);
3069 
3070 	if (bt->path == NULL) {
3071 		errno = EINVAL;
3072 		return BT_FAIL;
3073 	}
3074 
3075 	if ((txn = btree_txn_begin(bt, 0)) == NULL)
3076 		return BT_FAIL;
3077 
3078 	if (asprintf(&compact_path, "%s.compact.XXXXXX", bt->path) == -1) {
3079 		btree_txn_abort(txn);
3080 		return BT_FAIL;
3081 	}
3082 	fd = mkstemp(compact_path);
3083 	if (fd == -1) {
3084 		free(compact_path);
3085 		btree_txn_abort(txn);
3086 		return BT_FAIL;
3087 	}
3088 
3089 	if ((btc = btree_open_fd(fd, 0)) == NULL)
3090 		goto failed;
3091 	bcopy(&bt->meta, &btc->meta, sizeof(bt->meta));
3092 	btc->meta.revisions = 0;
3093 
3094 	if ((txnc = btree_txn_begin(btc, 0)) == NULL)
3095 		goto failed;
3096 
3097 	if (bt->meta.root != P_INVALID) {
3098 		root = btree_compact_tree(bt, bt->meta.root, btc);
3099 		if (root == P_INVALID)
3100 			goto failed;
3101 		if (btree_write_meta(btc, root, 0) != BT_SUCCESS)
3102 			goto failed;
3103 	}
3104 
3105 	fsync(fd);
3106 
3107 	DPRINTF("renaming %s to %s", compact_path, bt->path);
3108 	if (rename(compact_path, bt->path) != 0)
3109 		goto failed;
3110 
3111 	/* Write a "tombstone" meta page so other processes can pick up
3112 	 * the change and re-open the file.
3113 	 */
3114 	if (btree_write_meta(bt, P_INVALID, BT_TOMBSTONE) != BT_SUCCESS)
3115 		goto failed;
3116 
3117 	btree_txn_abort(txn);
3118 	btree_txn_abort(txnc);
3119 	free(compact_path);
3120 	btree_close(btc);
3121 	mpage_prune(bt);
3122 	return 0;
3123 
3124 failed:
3125 	btree_txn_abort(txn);
3126 	btree_txn_abort(txnc);
3127 	unlink(compact_path);
3128 	free(compact_path);
3129 	btree_close(btc);
3130 	mpage_prune(bt);
3131 	return BT_FAIL;
3132 }
3133 
3134 /* Reverts the last change. Truncates the file at the last root page.
3135  */
3136 int
3137 btree_revert(struct btree *bt)
3138 {
3139 	if (btree_read_meta(bt, NULL) != 0)
3140 		return -1;
3141 
3142 	DPRINTF("truncating file at page %u", bt->meta.root);
3143 	return ftruncate(bt->fd, bt->head.psize * bt->meta.root);
3144 }
3145 
3146 void
3147 btree_set_cache_size(struct btree *bt, unsigned int cache_size)
3148 {
3149 	bt->stat.max_cache = cache_size;
3150 }
3151 
3152 unsigned int
3153 btree_get_flags(struct btree *bt)
3154 {
3155 	return (bt->flags & ~BT_FIXPADDING);
3156 }
3157 
3158 const char *
3159 btree_get_path(struct btree *bt)
3160 {
3161 	return bt->path;
3162 }
3163 
3164 const struct btree_stat *
3165 btree_stat(struct btree *bt)
3166 {
3167 	if (bt == NULL)
3168 		return NULL;
3169 
3170 	bt->stat.branch_pages = bt->meta.branch_pages;
3171 	bt->stat.leaf_pages = bt->meta.leaf_pages;
3172 	bt->stat.overflow_pages = bt->meta.overflow_pages;
3173 	bt->stat.revisions = bt->meta.revisions;
3174 	bt->stat.depth = bt->meta.depth;
3175 	bt->stat.entries = bt->meta.entries;
3176 	bt->stat.psize = bt->head.psize;
3177 	bt->stat.created_at = bt->meta.created_at;
3178 
3179 	return &bt->stat;
3180 }
3181 
3182