1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996, 1997, 1998, 1999, 2000
5 * Sleepycat Software. All rights reserved.
6 */
7 /*
8 * Copyright (c) 1990, 1993, 1994, 1995, 1996
9 * Keith Bostic. All rights reserved.
10 */
11 /*
12 * Copyright (c) 1990, 1993, 1994, 1995
13 * The Regents of the University of California. All rights reserved.
14 *
15 * Redistribution and use in source and binary forms, with or without
16 * modification, are permitted provided that the following conditions
17 * are met:
18 * 1. Redistributions of source code must retain the above copyright
19 * notice, this list of conditions and the following disclaimer.
20 * 2. Redistributions in binary form must reproduce the above copyright
21 * notice, this list of conditions and the following disclaimer in the
22 * documentation and/or other materials provided with the distribution.
23 * 3. Neither the name of the University nor the names of its contributors
24 * may be used to endorse or promote products derived from this software
25 * without specific prior written permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
28 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
30 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
31 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
32 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
33 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
34 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
35 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
36 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
37 * SUCH DAMAGE.
38 */
39
40 #include "config.h"
41
42 #ifndef lint
43 static const char revid[] = "$Id: bt_split.c,v 1.7 2014/04/17 20:27:25 sebdiaz Exp $";
44 #endif /* not lint */
45
46 #ifndef NO_SYSTEM_INCLUDES
47 #include <sys/types.h>
48
49 #include <errno.h>
50 #include <limits.h>
51 #include <string.h>
52 #endif
53
54 #include "db_int.h"
55 #include "db_page.h"
56 #include "db_shash.h"
57 #include "lock.h"
58 #include "btree.h"
59
60 #ifdef DEBUG
61 #include "WordMonitor.h"
62 #endif /* DEBUG */
63
64 static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
65 static int __bam_page __P((DBC *, EPG *, EPG *));
66 static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *, int));
67 static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
68 static int __bam_root __P((DBC *, EPG *));
69 static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
70
71 /*
72 * CDB___bam_split --
73 * Split a page.
74 *
75 * PUBLIC: int CDB___bam_split __P((DBC *, void *));
76 */
int
CDB___bam_split(dbc, arg)
	DBC *dbc;		/* Cursor into the tree being split. */
	void *arg;		/* Search key (btree) or db_recno_t * (recno) that overflowed. */
{

	BTREE_CURSOR *cp;
	enum { UP, DOWN } dir;	/* Direction the target level is moving. */
	db_pgno_t root_pgno;
	int exact, level, ret;

	cp = (BTREE_CURSOR *)dbc->internal;
	root_pgno = cp->root;

	/*
	 * The locking protocol we use to avoid deadlock is to acquire locks
	 * by walking down the tree, but we do it as lazily as possible,
	 * locking the root only as a last resort.  We expect all stack pages
	 * to have been discarded before we're called; we discard all
	 * short-term locks.
	 *
	 * When CDB___bam_split is first called, we know that a leaf page was too
	 * full for an insert.  We don't know what leaf page it was, but we
	 * have the key/recno that caused the problem.  We call XX_search to
	 * reacquire the leaf page, but this time get both the leaf page and
	 * its parent, locked.  We then split the leaf page and see if the new
	 * internal key will fit into the parent page.  If it will, we're done.
	 *
	 * If it won't, we discard our current locks and repeat the process,
	 * only this time acquiring the parent page and its parent, locked.
	 * This process repeats until we succeed in the split, splitting the
	 * root page as the final resort.  The entire process then repeats,
	 * as necessary, until we split a leaf page.
	 *
	 * XXX
	 * A traditional method of speeding this up is to maintain a stack of
	 * the pages traversed in the original search.  You can detect if the
	 * stack is correct by storing the page's LSN when it was searched and
	 * comparing that LSN with the current one when it's locked during the
	 * split.  This would be an easy change for this code, but I have no
	 * numbers that indicate it's worthwhile.
	 */
	for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
		/*
		 * Acquire a page and its parent, locked.
		 */
		if ((ret = (dbc->dbtype == DB_BTREE ?
		    CDB___bam_search(dbc, arg, S_WRPAIR, level, NULL, &exact) :
		    CDB___bam_rsearch(dbc,
		    (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
			return (ret);

		/*
		 * Split the page if it still needs it (it's possible another
		 * thread of control has already split the page).  If we are
		 * guaranteed that two items will fit on the page, the split
		 * is no longer necessary.
		 */
		if (cp->ovflsize * 2 <=
		    (db_indx_t)P_FREESPACE(cp->csp[0].page)) {
			CDB___bam_stkrel(dbc, STK_NOLOCK);
			return (0);
		}

		/* Root pages split in place; others split into their parent. */
		ret = cp->csp[0].page->pgno == root_pgno ?
		    __bam_root(dbc, &cp->csp[0]) :
		    __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
		BT_STK_CLR(cp);

		switch (ret) {
		case 0:
			/* Once we've split the leaf page, we're done. */
			if (level == LEAFLEVEL)
				return (0);

			/* Switch directions. */
			if (dir == UP)
				dir = DOWN;
			break;
		case DB_NEEDSPLIT:
			/*
			 * It's possible to fail to split repeatedly, as other
			 * threads may be modifying the tree, or the page usage
			 * is sufficiently bad that we don't get enough space
			 * the first time.
			 */
			if (dir == DOWN)
				dir = UP;
			break;
		default:
			return (ret);
		}
	}
	/* NOTREACHED */
}
170
171 /*
172 * __bam_root --
173 * Split the root page of a btree.
174 */
175 static int
__bam_root(dbc,cp)176 __bam_root(dbc, cp)
177 DBC *dbc;
178 EPG *cp;
179 {
180 DB *dbp;
181 DBT log_dbt;
182 DB_LSN log_lsn;
183 PAGE *lp, *rp;
184 db_indx_t split;
185 u_int32_t opflags;
186 int ret;
187
188 dbp = dbc->dbp;
189
190 /* Yeah, right. */
191 if (cp->page->level >= MAXBTREELEVEL) {
192 CDB___db_err(dbp->dbenv,
193 "Too many btree levels: %d", cp->page->level);
194 ret = ENOSPC;
195 goto err;
196 }
197
198 /* Create new left and right pages for the split. */
199 lp = rp = NULL;
200 if ((ret = CDB___db_new(dbc, TYPE_TAGS(cp->page), &lp)) != 0 ||
201 (ret = CDB___db_new(dbc, TYPE_TAGS(cp->page), &rp)) != 0)
202 goto err;
203 P_INIT(lp, dbp->pgsize, lp->pgno,
204 PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
205 cp->page->level, TYPE(cp->page), TAGS(cp->page));
206 P_INIT(rp, dbp->pgsize, rp->pgno,
207 ISINTERNAL(cp->page) ? PGNO_INVALID : lp->pgno, PGNO_INVALID,
208 cp->page->level, TYPE(cp->page), TAGS(cp->page));
209
210 /* Split the page. */
211 if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
212 goto err;
213
214 /* Log the change. */
215 if (DB_LOGGING(dbc)) {
216 memset(&log_dbt, 0, sizeof(log_dbt));
217 log_dbt.data = cp->page;
218 log_dbt.size = dbp->pgsize;
219 ZERO_LSN(log_lsn);
220 opflags = F_ISSET(
221 (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? SPL_NRECS : 0;
222 if ((ret = CDB___bam_split_log(dbp->dbenv, dbc->txn,
223 &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
224 PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &log_lsn,
225 dbc->internal->root, &log_dbt, opflags)) != 0)
226 goto err;
227 LSN(lp) = LSN(rp) = LSN(cp->page);
228 }
229
230 /* Clean up the new root page. */
231 if ((ret = (dbc->dbtype == DB_RECNO ?
232 __ram_root(dbc, cp->page, lp, rp) :
233 __bam_broot(dbc, cp->page, lp, rp))) != 0)
234 goto err;
235
236 /* Adjust any cursors. Do it last so we don't have to undo it. */
237 CDB___bam_ca_split(dbp, cp->page->pgno, lp->pgno, rp->pgno, split, 1);
238
239 /* Success -- write the real pages back to the store. */
240 (void)CDB_memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
241 (void)__TLPUT(dbc, cp->lock);
242 (void)CDB_memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
243 (void)CDB_memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
244
245 return (0);
246
247 err: if (lp != NULL)
248 (void)CDB___db_free(dbc, lp);
249 if (rp != NULL)
250 (void)CDB___db_free(dbc, rp);
251 (void)CDB_memp_fput(dbp->mpf, cp->page, 0);
252 (void)__TLPUT(dbc, cp->lock);
253 return (ret);
254 }
255
256 /*
257 * __bam_page --
258 * Split the non-root page of a btree.
259 */
static int
__bam_page(dbc, pp, cp)
	DBC *dbc;
	EPG *pp, *cp;		/* Parent and child stack entries, write-locked. */
{
	BTREE_CURSOR *bc;
	DBT log_dbt;
	DB_LSN log_lsn;
	DB *dbp;
	DB_LOCK tplock;
	DB_LSN save_lsn;
	PAGE *lp, *rp, *alloc_rp, *tp;
	db_indx_t split;
	u_int32_t opflags;
	int ret, t_ret;

	dbp = dbc->dbp;
	/* NULL/invalid markers tell the error path which resources to free. */
	alloc_rp = lp = rp = tp = NULL;
	tplock.off = LOCK_INVALID;
	ret = -1;

	/*
	 * Create a new right page for the split, and fill in everything
	 * except its LSN and page number.
	 *
	 * We malloc space for both the left and right pages, so we don't get
	 * a new page from the underlying buffer pool until we know the split
	 * is going to succeed.  The reason is that we can't release locks
	 * acquired during the get-a-new-page process because metadata page
	 * locks can't be discarded on failure since we may have modified the
	 * free list.  So, if you assume that we're holding a write lock on the
	 * leaf page which ran out of space and started this split (e.g., we
	 * have already written records to the page, or we retrieved a record
	 * from it with the DB_RMW flag set), failing in a split with both a
	 * leaf page locked and the metadata page locked can potentially lock
	 * up the tree badly, because we've violated the rule of always locking
	 * down the tree, and never up.
	 */
	if ((ret = CDB___os_malloc(dbp->dbenv, dbp->pgsize, NULL, &rp)) != 0)
		goto err;
	P_INIT(rp, dbp->pgsize, 0,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page),
	    ISINTERNAL(cp->page) ? PGNO_INVALID : NEXT_PGNO(cp->page),
	    cp->page->level, TYPE(cp->page), TAGS(cp->page));

	/*
	 * Create new left page for the split, and fill in everything
	 * except its LSN and next-page page number.
	 */
	if ((ret = CDB___os_malloc(dbp->dbenv, dbp->pgsize, NULL, &lp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, PGNO(cp->page),
	    ISINTERNAL(cp->page) ? PGNO_INVALID : PREV_PGNO(cp->page),
	    ISINTERNAL(cp->page) ? PGNO_INVALID : 0,
	    cp->page->level, TYPE(cp->page), TAGS(cp->page));

	/*
	 * Split right.
	 *
	 * Only the indices are sorted on the page, i.e., the key/data pairs
	 * aren't, so it's simpler to copy the data from the split page onto
	 * two new pages instead of copying half the data to a new right page
	 * and compacting the left page in place.  Since the left page can't
	 * change, we swap the original and the allocated left page after the
	 * split.
	 */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;

	/*
	 * Test to see if we are going to be able to insert the new pages into
	 * the parent page.  The interesting failure here is that the parent
	 * page can't hold the new keys, and has to be split in turn, in which
	 * case we want to release all the locks we can.
	 */
	if ((ret = __bam_pinsert(dbc, pp, lp, rp, 1)) != 0)
		goto err;

	/*
	 * Fix up the previous pointer of any leaf page following the split
	 * page.
	 *
	 * There's interesting deadlock situations here as we try to write-lock
	 * a page that's not in our direct ancestry.  Consider a cursor walking
	 * backward through the leaf pages, that has our following page locked,
	 * and is waiting on a lock for the page we're splitting.  In that case
	 * we're going to deadlock here.  It's probably OK, stepping backward
	 * through the tree isn't a common operation.
	 */
	if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID) {
		if ((ret = CDB___db_lget(dbc,
		    0, NEXT_PGNO(cp->page), DB_LOCK_WRITE, 0, &tplock)) != 0)
			goto err;
		if ((ret =
		    CDB_memp_fget(dbp->mpf, &NEXT_PGNO(cp->page), 0, &tp)) != 0)
			goto err;
	}

	/*
	 * We've got everything locked down we need, and we know the split
	 * is going to succeed.  Go and get the additional page we'll need.
	 */
	if ((ret = CDB___db_new(dbc, TYPE_TAGS(cp->page), &alloc_rp)) != 0)
		goto err;

	/*
	 * Fix up the page numbers we didn't have before.  We have to do this
	 * before calling __bam_pinsert because it may copy a page number onto
	 * the parent page and it takes the page number from its page argument.
	 */
	PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp);

	/* Actually update the parent page. */
	if ((ret = __bam_pinsert(dbc, pp, lp, rp, 0)) != 0)
		goto err;

	bc = (BTREE_CURSOR *)dbc->internal;
	/* Log the change. */
	if (DB_LOGGING(dbc)) {
		memset(&log_dbt, 0, sizeof(log_dbt));
		log_dbt.data = cp->page;
		log_dbt.size = dbp->pgsize;
		/* No following page: log a zero LSN in its place. */
		if (tp == NULL)
			ZERO_LSN(log_lsn);
		opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
		if ((ret = CDB___bam_split_log(dbp->dbenv, dbc->txn,
		    &LSN(cp->page), 0, dbp->log_fileid, PGNO(cp->page),
		    &LSN(cp->page), PGNO(alloc_rp), &LSN(alloc_rp),
		    (u_int32_t)NUM_ENT(lp),
		    tp == NULL ? 0 : PGNO(tp),
		    tp == NULL ? &log_lsn : &LSN(tp),
		    bc->root, &log_dbt, opflags)) != 0)
			goto err;

		/* Update the LSNs for all involved pages. */
		LSN(alloc_rp) = LSN(lp) = LSN(rp) = LSN(cp->page);
		if (tp != NULL)
			LSN(tp) = LSN(cp->page);
	}

	/*
	 * Copy the left and right pages into place.  There are two paths
	 * through here.  Either we are logging and we set the LSNs in the
	 * logging path.  However, if we are not logging, then we do not
	 * have valid LSNs on lp or rp.  The correct LSNs to use are the
	 * ones on the page we got from CDB___db_new or the one that was
	 * originally on cp->page.  In both cases, we save the LSN from the
	 * real database page (not a malloc'd one) and reapply it after we
	 * do the copy.
	 */
	save_lsn = alloc_rp->lsn;
	memcpy(alloc_rp, rp, LOFFSET(rp));
	memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
	    (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
	alloc_rp->lsn = save_lsn;

	save_lsn = cp->page->lsn;
	memcpy(cp->page, lp, LOFFSET(lp));
	memcpy((u_int8_t *)cp->page + HOFFSET(lp),
	    (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
	cp->page->lsn = save_lsn;

	/* Fix up the next-page link. */
	if (tp != NULL)
		PREV_PGNO(tp) = PGNO(rp);

	/* Adjust any cursors.  Do it last so we don't have to undo it. */
	CDB___bam_ca_split(dbp, PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0);

	/* The malloc'd shadow pages have been copied out; release them. */
	CDB___os_free(lp, dbp->pgsize);
	CDB___os_free(rp, dbp->pgsize);

	/*
	 * Success -- write the real pages back to the store.  As we never
	 * acquired any sort of lock on the new page, we release it before
	 * releasing locks on the pages that reference it.  We're finished
	 * modifying the page so it's not really necessary, but it's neater.
	 */
	if ((t_ret =
	    CDB_memp_fput(dbp->mpf, alloc_rp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret =
	    CDB_memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
		ret = t_ret;
	(void)__TLPUT(dbc, pp->lock);
	if ((t_ret =
	    CDB_memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
		ret = t_ret;
	(void)__TLPUT(dbc, cp->lock);
	if (tp != NULL) {
		if ((t_ret =
		    CDB_memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
			ret = t_ret;
		(void)__TLPUT(dbc, tplock);
	}
	return (ret);

err:	if (lp != NULL)
		CDB___os_free(lp, dbp->pgsize);
	if (rp != NULL)
		CDB___os_free(rp, dbp->pgsize);
	if (alloc_rp != NULL)
		(void)CDB___db_free(dbc, alloc_rp);

	if (tp != NULL)
		(void)CDB_memp_fput(dbp->mpf, tp, 0);
	if (tplock.off != LOCK_INVALID)
		/* We never updated the next page, we can release it. */
		(void)__LPUT(dbc, tplock);

	/* On DB_NEEDSPLIT drop locks outright so the caller can retry higher up. */
	(void)CDB_memp_fput(dbp->mpf, pp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__LPUT(dbc, pp->lock);
	else
		(void)__TLPUT(dbc, pp->lock);

	(void)CDB_memp_fput(dbp->mpf, cp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__LPUT(dbc, cp->lock);
	else
		(void)__TLPUT(dbc, cp->lock);

	return (ret);
}
484
485 /*
486 * __bam_broot --
487 * Fix up the btree root page after it has been split.
488 */
static int
__bam_broot(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;	/* Old root and the two new child pages. */
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT hdr, data;
	db_pgno_t root_pgno;
	int ret;

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;

	/*
	 * If the root page was a leaf page, change it into an internal page.
	 * We copy the key we split on (but not the key's data, in the case of
	 * a leaf page) to the new root page.
	 */
	root_pgno = cp->root;
	P_INIT(rootp, dbp->pgsize,
	    root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE, TAGS(lp));
#ifdef DEBUG
	word_monitor_set(DB_MONITOR(dbp->dbenv), WORD_MONITOR_LEVEL, LEVEL(rootp));
#endif /* DEBUG */

	memset(&data, 0, sizeof(data));
	memset(&hdr, 0, sizeof(hdr));

	/*
	 * The btree comparison code guarantees that the left-most key on any
	 * internal btree page is never used, so it doesn't need to be filled
	 * in.  Set the record count if necessary.
	 */
	memset(&bi, 0, sizeof(bi));
	bi.len = 0;
	B_TSET(bi.type, B_KEYDATA, 0);
	bi.pgno = lp->pgno;
	if (F_ISSET(cp, C_RECNUM)) {
		bi.nrecs = CDB___bam_total(lp);
		RE_NREC_SET(rootp, bi.nrecs);
	}
	hdr.data = &bi;
	hdr.size = SSZA(BINTERNAL, data);
	/* Slot 0: zero-length key pointing at the left child. */
	if ((ret =
	    CDB___db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
		return (ret);

	/* Slot 1: first key of the right child, format depends on its type. */
	switch (TYPE(rp)) {
	case P_IBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bi = GET_BINTERNAL(rp, 0);

		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rp->pgno;
		if (F_ISSET(cp, C_RECNUM)) {
			bi.nrecs = CDB___bam_total(rp);
			RE_NREC_ADJ(rootp, bi.nrecs);
		}
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = CDB___db_pitem(dbc, rootp, 1,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = CDB___db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LDUP:
	case P_LBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bk = GET_BKEYDATA(rp, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			bi.len = child_bk->len;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(cp, C_RECNUM)) {
				bi.nrecs = CDB___bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk->data;
			data.size = child_bk->len;
			if ((ret = CDB___db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			/* Copy the BOVERFLOW reference itself, not the off-page data. */
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(cp, C_RECNUM)) {
				bi.nrecs = CDB___bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = CDB___db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW)
				if ((ret = CDB___db_ovref(dbc,
				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
					return (ret);
			break;
		default:
			return (CDB___db_pgfmt(dbp, rp->pgno));
		}
		break;
	default:
		return (CDB___db_pgfmt(dbp, rp->pgno));
	}
	return (0);
}
618
619 /*
620 * __ram_root --
621 * Fix up the recno root page after it has been split.
622 */
623 static int
__ram_root(dbc,rootp,lp,rp)624 __ram_root(dbc, rootp, lp, rp)
625 DBC *dbc;
626 PAGE *rootp, *lp, *rp;
627 {
628 DB *dbp;
629 DBT hdr;
630 RINTERNAL ri;
631 db_pgno_t root_pgno;
632 int ret;
633
634 dbp = dbc->dbp;
635 root_pgno = dbc->internal->root;
636
637 /* Initialize the page. */
638 P_INIT(rootp, dbp->pgsize,
639 root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO, TAGS(lp));
640
641 /* Initialize the header. */
642 memset(&hdr, 0, sizeof(hdr));
643 hdr.data = &ri;
644 hdr.size = RINTERNAL_SIZE;
645
646 /* Insert the left and right keys, set the header information. */
647 ri.pgno = lp->pgno;
648 ri.nrecs = CDB___bam_total(lp);
649 if ((ret = CDB___db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
650 return (ret);
651 RE_NREC_SET(rootp, ri.nrecs);
652 ri.pgno = rp->pgno;
653 ri.nrecs = CDB___bam_total(rp);
654 if ((ret = CDB___db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
655 return (ret);
656 RE_NREC_ADJ(rootp, ri.nrecs);
657 return (0);
658 }
659
660 /*
661 * __bam_pinsert --
662 * Insert a new key into a parent page, completing the split.
663 */
static int
__bam_pinsert(dbc, parent, lchild, rchild, space_check)
	DBC *dbc;
	EPG *parent;		/* Stack entry for the parent page. */
	PAGE *lchild, *rchild;	/* Left and right halves of the split. */
	int space_check;	/* Nonzero: only verify the key will fit. */
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk, *tmp_bk;
	BTREE *t;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT a, b, hdr, data;
	PAGE *ppage;
	RINTERNAL ri;
	db_indx_t off;
	db_recno_t nrecs;
	size_t (*func) __P((const DBT *, const DBT *));
	u_int32_t n, nbytes, nksize;
	int ret;

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;
	t = dbp->bt_internal;
	ppage = parent->page;

	/* If handling record numbers, count records split to the right page. */
	nrecs = F_ISSET(cp, C_RECNUM) && !space_check ? CDB___bam_total(rchild) : 0;

	/*
	 * Now we insert the new page's first key into the parent page, which
	 * completes the split.  The parent points to a PAGE and a page index
	 * offset, where the new key goes ONE AFTER the index, because we split
	 * to the right.
	 *
	 * XXX
	 * Some btree algorithms replace the key for the old page as well as
	 * the new page.  We don't, as there's no reason to believe that the
	 * first key on the old page is any better than the key we have, and,
	 * in the case of a key being placed at index 0 causing the split, the
	 * key is unavailable.
	 */
	off = parent->indx + O_INDX;

	/*
	 * Calculate the space needed on the parent page.
	 *
	 * Prefix trees: space hack used when inserting into BINTERNAL pages.
	 * Retain only what's needed to distinguish between the new entry and
	 * the LAST entry on the page to its left.  If the keys compare equal,
	 * retain the entire key.  We ignore overflow keys, and the entire key
	 * must be retained for the next-to-leftmost key on the leftmost page
	 * of each level, or the search will fail.  Applicable ONLY to internal
	 * pages that have leaf pages as children.  Further reduction of the
	 * key between pairs of internal pages loses too much information.
	 */
	switch (TYPE(rchild)) {
	case P_IBTREE:
		child_bi = GET_BINTERNAL(rchild, 0);
		nbytes = BINTERNAL_PSIZE(child_bi->len);

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);
		if (space_check)
			return (0);

		/* Add a new record for the right page. */
		memset(&bi, 0, sizeof(bi));
		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rchild->pgno;
		bi.nrecs = nrecs;
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		memset(&data, 0, sizeof(data));
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = CDB___db_pitem(dbc, ppage, off,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = CDB___db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LDUP:
	case P_LBTREE:
		child_bk = GET_BKEYDATA(rchild, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			/*
			 * We set t->bt_prefix to NULL if we have a comparison
			 * callback but no prefix compression callback.  But,
			 * if we're splitting in an off-page duplicates tree,
			 * we still have to do some checking.  If using the
			 * default off-page duplicates comparison routine we
			 * can use the default prefix compression callback. If
			 * not using the default off-page duplicates comparison
			 * routine, we can't do any kind of prefix compression
			 * as there's no way for an application to specify a
			 * prefix compression callback that corresponds to its
			 * comparison callback.
			 */
			if (F_ISSET(dbc, DBC_OPD)) {
				if (dbp->dup_compare == CDB___bam_defcmp)
					func = CDB___bam_defpfx;
				else
					func = NULL;
			} else
				func = t->bt_prefix;

			nbytes = BINTERNAL_PSIZE(child_bk->len);
			nksize = child_bk->len;
			if (func == NULL)
				goto noprefix;
			/* Leftmost page of a level: must keep the whole key. */
			if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
				goto noprefix;
			tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) -
			    (TYPE(lchild) == P_LDUP ? O_INDX : P_INDX));
			if (B_TYPE(tmp_bk->type) != B_KEYDATA)
				goto noprefix;
			memset(&a, 0, sizeof(a));
			a.size = tmp_bk->len;
			a.data = tmp_bk->data;
			memset(&b, 0, sizeof(b));
			b.size = child_bk->len;
			b.data = child_bk->data;
			/* func returns the number of key bytes to retain. */
			nksize = func(&a, &b);
			if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
				nbytes = n;
			else
noprefix:			nksize = child_bk->len;

			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);
			if (space_check)
				return (0);

			memset(&bi, 0, sizeof(bi));
			bi.len = nksize;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk->data;
			data.size = nksize;
			if ((ret = CDB___db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			/* Copy the BOVERFLOW reference; no prefix compression. */
			nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);

			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);
			if (space_check)
				return (0);

			memset(&bi, 0, sizeof(bi));
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = CDB___db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW)
				if ((ret = CDB___db_ovref(dbc,
				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
					return (ret);
			break;
		default:
			return (CDB___db_pgfmt(dbp, rchild->pgno));
		}
		break;
	case P_IRECNO:
	case P_LRECNO:
		nbytes = RINTERNAL_PSIZE;

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);
		if (space_check)
			return (0);

		/* Add a new record for the right page. */
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &ri;
		hdr.size = RINTERNAL_SIZE;
		ri.pgno = rchild->pgno;
		ri.nrecs = nrecs;
		if ((ret = CDB___db_pitem(dbc,
		    ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
			return (ret);
		break;
	default:
		return (CDB___db_pgfmt(dbp, rchild->pgno));
	}

	/*
	 * If a Recno or Btree with record numbers AM page, or an off-page
	 * duplicates tree, adjust the parent page's left page record count.
	 */
	if (F_ISSET(cp, C_RECNUM)) {
		/* Log the change. */
		if (DB_LOGGING(dbc) &&
		    (ret = CDB___bam_cadjust_log(dbp->dbenv, dbc->txn,
		    &LSN(ppage), 0, dbp->log_fileid, PGNO(ppage),
		    &LSN(ppage), parent->indx, -(int32_t)nrecs, 0)) != 0)
			return (ret);

		/* Update the left page count. */
		if (dbc->dbtype == DB_RECNO)
			GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
		else
			GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
	}

	return (0);
}
898
899 /*
900 * __bam_psplit --
901 * Do the real work of splitting the page.
902 */
903 static int
__bam_psplit(dbc,cp,lp,rp,splitret)904 __bam_psplit(dbc, cp, lp, rp, splitret)
905 DBC *dbc;
906 EPG *cp;
907 PAGE *lp, *rp;
908 db_indx_t *splitret;
909 {
910 DB *dbp;
911 PAGE *pp;
912 db_indx_t half, nbytes, off, splitp, top;
913 int adjust, cnt, iflag, isbigkey, ret;
914
915 dbp = dbc->dbp;
916 pp = cp->page;
917 adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
918
919 /*
920 * If we're splitting the first (last) page on a level because we're
921 * inserting (appending) a key to it, it's likely that the data is
922 * sorted. Moving a single item to the new page is less work and can
923 * push the fill factor higher than normal. If we're wrong it's not
924 * a big deal, we'll just do the split the right way next time.
925 */
926 off = 0;
927 if (NEXT_PGNO(pp) == PGNO_INVALID &&
928 ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
929 (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
930 off = NUM_ENT(cp->page) - adjust;
931 else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
932 off = adjust;
933
934 if (off != 0)
935 goto sort;
936
937 /*
938 * Split the data to the left and right pages. Try not to split on
939 * an overflow key. (Overflow keys on internal pages will slow down
940 * searches.) Refuse to split in the middle of a set of duplicates.
941 *
942 * First, find the optimum place to split.
943 *
944 * It's possible to try and split past the last record on the page if
945 * there's a very large record at the end of the page. Make sure this
946 * doesn't happen by bounding the check at the next-to-last entry on
947 * the page.
948 *
949 * Note, we try and split half the data present on the page. This is
950 * because another process may have already split the page and left
951 * it half empty. We don't try and skip the split -- we don't know
952 * how much space we're going to need on the page, and we may need up
953 * to half the page for a big item, so there's no easy test to decide
954 * if we need to split or not. Besides, if two threads are inserting
955 * data into the same place in the database, we're probably going to
956 * need more space soon anyway.
957 */
958 top = NUM_ENT(pp) - adjust;
959 half = (dbp->pgsize - HOFFSET(pp)) / 2;
960 for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
961 switch (TYPE(pp)) {
962 case P_IBTREE:
963 if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
964 nbytes +=
965 BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
966 else
967 nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
968 break;
969 case P_LBTREE:
970 if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
971 nbytes +=
972 BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
973 else
974 nbytes += BOVERFLOW_SIZE;
975
976 ++off;
977 /* FALLTHROUGH */
978 case P_LDUP:
979 case P_LRECNO:
980 if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
981 nbytes +=
982 BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
983 else
984 nbytes += BOVERFLOW_SIZE;
985 break;
986 case P_IRECNO:
987 nbytes += RINTERNAL_SIZE;
988 break;
989 default:
990 return (CDB___db_pgfmt(dbp, pp->pgno));
991 }
992 sort: splitp = off;
993
994 /*
995 * Splitp is either at or just past the optimum split point. If the
996 * tree type is such that we're going to promote a key to an internal
997 * page, and our current choice is an overflow key, look for something
998 * close by that's smaller.
999 */
1000 switch (TYPE(pp)) {
1001 case P_IBTREE:
1002 iflag = 1;
1003 isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
1004 break;
1005 case P_LBTREE:
1006 case P_LDUP:
1007 iflag = 0;
1008 isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
1009 break;
1010 default:
1011 iflag = isbigkey = 0;
1012 }
1013 if (isbigkey)
1014 for (cnt = 1; cnt <= 3; ++cnt) {
1015 off = splitp + cnt * adjust;
1016 if (off < (db_indx_t)NUM_ENT(pp) &&
1017 ((iflag &&
1018 B_TYPE(GET_BINTERNAL(pp,off)->type) == B_KEYDATA) ||
1019 B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
1020 splitp = off;
1021 break;
1022 }
1023 if (splitp <= (db_indx_t)(cnt * adjust))
1024 continue;
1025 off = splitp - cnt * adjust;
1026 if (iflag ?
1027 B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
1028 B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
1029 splitp = off;
1030 break;
1031 }
1032 }
1033
1034 /*
1035 * We can't split in the middle a set of duplicates. We know that
1036 * no duplicate set can take up more than about 25% of the page,
1037 * because that's the point where we push it off onto a duplicate
1038 * page set. So, this loop can't be unbounded.
1039 */
1040 if (TYPE(pp) == P_LBTREE &&
1041 pp->inp[splitp] == pp->inp[splitp - adjust])
1042 for (cnt = 1;; ++cnt) {
1043 off = splitp + cnt * adjust;
1044 if (off < NUM_ENT(pp) &&
1045 pp->inp[splitp] != pp->inp[off]) {
1046 splitp = off;
1047 break;
1048 }
1049 if (splitp <= (db_indx_t)(cnt * adjust))
1050 continue;
1051 off = splitp - cnt * adjust;
1052 if (pp->inp[splitp] != pp->inp[off]) {
1053 splitp = off + adjust;
1054 break;
1055 }
1056 }
1057
1058 /* We're going to split at splitp. */
1059 if ((ret = CDB___bam_copy(dbp, pp, lp, 0, splitp)) != 0)
1060 return (ret);
1061 if ((ret = CDB___bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
1062 return (ret);
1063
1064 *splitret = splitp;
1065 return (0);
1066 }
1067
1068 /*
1069 * CDB___bam_copy --
1070 * Copy a set of records from one page to another.
1071 *
1072 * PUBLIC: int CDB___bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
1073 */
1074 int
CDB___bam_copy(dbp,pp,cp,nxt,stop)1075 CDB___bam_copy(dbp, pp, cp, nxt, stop)
1076 DB *dbp;
1077 PAGE *pp, *cp;
1078 u_int32_t nxt, stop;
1079 {
1080 db_indx_t nbytes, off;
1081
1082 /*
1083 * Copy the rest of the data to the right page. Nxt is the next
1084 * offset placed on the target page.
1085 */
1086 for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
1087 switch (TYPE(pp)) {
1088 case P_IBTREE:
1089 if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
1090 nbytes =
1091 BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
1092 else
1093 nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
1094 break;
1095 case P_LBTREE:
1096 /*
1097 * If we're on a key and it's a duplicate, just copy
1098 * the offset.
1099 */
1100 if (off != 0 && (nxt % P_INDX) == 0 &&
1101 pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
1102 cp->inp[off] = cp->inp[off - P_INDX];
1103 continue;
1104 }
1105 /* FALLTHROUGH */
1106 case P_LDUP:
1107 case P_LRECNO:
1108 if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
1109 nbytes =
1110 BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
1111 else
1112 nbytes = BOVERFLOW_SIZE;
1113 break;
1114 case P_IRECNO:
1115 nbytes = RINTERNAL_SIZE;
1116 break;
1117 default:
1118 return (CDB___db_pgfmt(dbp, pp->pgno));
1119 }
1120 cp->inp[off] = HOFFSET(cp) -= nbytes;
1121 memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
1122 }
1123 return (0);
1124 }
1125