1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996, 1997, 1998, 1999, 2000
5 * Sleepycat Software. All rights reserved.
6 */
7 /*
8 * Copyright (c) 1990, 1993, 1994, 1995, 1996
9 * Keith Bostic. All rights reserved.
10 */
11 /*
12 * Copyright (c) 1990, 1993, 1994, 1995
13 * The Regents of the University of California. All rights reserved.
14 *
15 * This code is derived from software contributed to Berkeley by
16 * Mike Olson.
17 *
18 * Redistribution and use in source and binary forms, with or without
19 * modification, are permitted provided that the following conditions
20 * are met:
21 * 1. Redistributions of source code must retain the above copyright
22 * notice, this list of conditions and the following disclaimer.
23 * 2. Redistributions in binary form must reproduce the above copyright
24 * notice, this list of conditions and the following disclaimer in the
25 * documentation and/or other materials provided with the distribution.
26 * 3. Neither the name of the University nor the names of its contributors
27 * may be used to endorse or promote products derived from this software
28 * without specific prior written permission.
29 *
30 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
31 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
32 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
34 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
35 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
36 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
37 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
38 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
39 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
40 * SUCH DAMAGE.
41 */
42
43 #include "config.h"
44
45 #ifndef lint
46 static const char revid[] = "$Id: bt_put.c,v 1.6 2014/04/17 20:27:25 sebdiaz Exp $";
47 #endif /* not lint */
48
49 #ifndef NO_SYSTEM_INCLUDES
50 #include <sys/types.h>
51
52 #include <errno.h>
53 #include <string.h>
54 #endif
55
56 #include "db_int.h"
57 #include "db_page.h"
58 #include "btree.h"
59
60 static int __bam_dup_convert __P((DBC *, PAGE *, u_int32_t));
61 static int __bam_ovput
62 __P((DBC *, u_int32_t, db_pgno_t, PAGE *, u_int32_t, DBT *));
63
64 /*
65 * CDB___bam_iitem --
66 * Insert an item into the tree.
67 *
68 * PUBLIC: int CDB___bam_iitem __P((DBC *, DBT *, DBT *, u_int32_t, u_int32_t));
69 */
70 int
CDB___bam_iitem(dbc,key,data,op,flags)71 CDB___bam_iitem(dbc, key, data, op, flags)
72 DBC *dbc;
73 DBT *key, *data;
74 u_int32_t op, flags;
75 {
76 BKEYDATA *bk, bk_tmp;
77 BTREE *t;
78 BTREE_CURSOR *cp;
79 DB *dbp;
80 DBT bk_hdr, tdbt;
81 PAGE *h;
82 db_indx_t indx;
83 u_int32_t data_size, have_bytes, need_bytes, needed;
84 int cmp, bigkey, bigdata, dupadjust, padrec, replace, ret, was_deleted;
85
86 COMPQUIET(bk, NULL);
87
88 dbp = dbc->dbp;
89 cp = (BTREE_CURSOR *)dbc->internal;
90 t = dbp->bt_internal;
91 h = cp->page;
92 indx = cp->indx;
93 dupadjust = replace = was_deleted = 0;
94
95 /*
96 * Fixed-length records with partial puts: it's an error to specify
97 * anything other simple overwrite.
98 */
99 if (F_ISSET(dbp, DB_RE_FIXEDLEN) &&
100 F_ISSET(data, DB_DBT_PARTIAL) && data->dlen != data->size) {
101 data_size = data->size;
102 goto len_err;
103 }
104
105 /*
106 * Figure out how much space the data will take, including if it's a
107 * partial record.
108 *
109 * Fixed-length records: it's an error to specify a record that's
110 * longer than the fixed-length, and we never require less than
111 * the fixed-length record size.
112 */
113 data_size = F_ISSET(data, DB_DBT_PARTIAL) ?
114 CDB___bam_partsize(op, data, h, indx) : data->size;
115 padrec = 0;
116 if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
117 if (data_size > t->re_len) {
118 len_err: CDB___db_err(dbp->dbenv,
119 "Length improper for fixed length record %lu",
120 (u_long)data_size);
121 return (EINVAL);
122 }
123 if (data_size < t->re_len) {
124 padrec = 1;
125 data_size = t->re_len;
126 }
127 }
128
129 /*
130 * Handle partial puts or short fixed-length records: build the
131 * real record.
132 */
133 if (padrec || F_ISSET(data, DB_DBT_PARTIAL)) {
134 tdbt = *data;
135 if ((ret =
136 CDB___bam_build(dbc, op, &tdbt, h, indx, data_size)) != 0)
137 return (ret);
138 data = &tdbt;
139 }
140
141 /*
142 * If the user has specified a duplicate comparison function, return
143 * an error if DB_CURRENT was specified and the replacement data
144 * doesn't compare equal to the current data. This stops apps from
145 * screwing up the duplicate sort order. We have to do this after
146 * we build the real record so that we're comparing the real items.
147 */
148 if (op == DB_CURRENT && dbp->dup_compare != NULL) {
149 if ((ret = CDB___bam_cmp(dbp, data, h,
150 indx + (TYPE(h) == P_LBTREE ? O_INDX : 0),
151 dbp->dup_compare, &cmp)) != 0)
152 return (ret);
153 if (cmp != 0) {
154 CDB___db_err(dbp->dbenv,
155 "Current data differs from put data");
156 return (EINVAL);
157 }
158 }
159
160 /*
161 * If the key or data item won't fit on a page, we'll have to store
162 * them on overflow pages.
163 */
164 needed = 0;
165 bigdata = data_size > cp->ovflsize;
166 switch (op) {
167 case DB_KEYFIRST:
168 /* We're adding a new key and data pair. */
169 bigkey = key->size > cp->ovflsize;
170 if (bigkey)
171 needed += BOVERFLOW_PSIZE;
172 else
173 needed += BKEYDATA_PSIZE(key->size);
174 if (bigdata)
175 needed += BOVERFLOW_PSIZE;
176 else
177 needed += BKEYDATA_PSIZE(data_size);
178 break;
179 case DB_AFTER:
180 case DB_BEFORE:
181 case DB_CURRENT:
182 /*
183 * We're either overwriting the data item of a key/data pair
184 * or we're creating a new on-page duplicate and only adding
185 * a data item.
186 *
187 * !!!
188 * We're not currently correcting for space reclaimed from
189 * already deleted items, but I don't think it's worth the
190 * complexity.
191 */
192 bigkey = 0;
193 if (op == DB_CURRENT) {
194 bk = GET_BKEYDATA(h,
195 indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
196 if (B_TYPE(bk->type) == B_KEYDATA)
197 have_bytes = BKEYDATA_PSIZE(bk->len);
198 else
199 have_bytes = BOVERFLOW_PSIZE;
200 need_bytes = 0;
201 } else {
202 have_bytes = 0;
203 need_bytes = sizeof(db_indx_t);
204 }
205 if (bigdata)
206 need_bytes += BOVERFLOW_PSIZE;
207 else
208 need_bytes += BKEYDATA_PSIZE(data_size);
209
210 if (have_bytes < need_bytes)
211 needed += need_bytes - have_bytes;
212 break;
213 default:
214 return (CDB___db_unknown_flag(dbp->dbenv, "CDB___bam_iitem", op));
215 }
216
217 /*
218 * If there's not enough room, or the user has put a ceiling on the
219 * number of keys permitted in the page, split the page.
220 *
221 * XXX
222 * The t->bt_maxkey test here may be insufficient -- do we have to
223 * check in the btree split code, so we don't undo it there!?!?
224 */
225 if (P_FREESPACE(h) < needed ||
226 (t->bt_maxkey != 0 && NUM_ENT(h) > t->bt_maxkey))
227 return (DB_NEEDSPLIT);
228
229 /*
230 * The code breaks it up into five cases:
231 *
232 * 1. Insert a new key/data pair.
233 * 2. Append a new data item (a new duplicate).
234 * 3. Insert a new data item (a new duplicate).
235 * 4. Delete and re-add the data item (overflow item).
236 * 5. Overwrite the data item.
237 */
238 switch (op) {
239 case DB_KEYFIRST: /* 1. Insert a new key/data pair. */
240 if (bigkey) {
241 if ((ret = __bam_ovput(dbc,
242 B_OVERFLOW, PGNO_INVALID, h, indx, key)) != 0)
243 return (ret);
244 } else
245 if ((ret = CDB___db_pitem(dbc, h, indx,
246 BKEYDATA_SIZE(key->size), NULL, key)) != 0)
247 return (ret);
248
249 CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
250 ++indx;
251 break;
252 case DB_AFTER: /* 2. Append a new data item. */
253 if (TYPE(h) == P_LBTREE) {
254 /* Copy the key for the duplicate and adjust cursors. */
255 if ((ret =
256 CDB___bam_adjindx(dbc, h, indx + P_INDX, indx, 1)) != 0)
257 return (ret);
258 CDB___bam_ca_di(dbp, PGNO(h), indx + P_INDX, 1);
259
260 indx += 3;
261 dupadjust = 1;
262
263 cp->indx += 2;
264 } else {
265 ++indx;
266 CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
267
268 cp->indx += 1;
269 }
270 break;
271 case DB_BEFORE: /* 3. Insert a new data item. */
272 if (TYPE(h) == P_LBTREE) {
273 /* Copy the key for the duplicate and adjust cursors. */
274 if ((ret = CDB___bam_adjindx(dbc, h, indx, indx, 1)) != 0)
275 return (ret);
276 CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
277
278 ++indx;
279 dupadjust = 1;
280 } else
281 CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
282 break;
283 case DB_CURRENT:
284 if (TYPE(h) == P_LBTREE) {
285 ++indx;
286 dupadjust = 1;
287
288 /*
289 * In a Btree deleted records aren't counted (deleted
290 * records are counted in a Recno because all accesses
291 * are based on record number). If it's a Btree and
292 * it's a DB_CURRENT operation overwriting a previously
293 * deleted record, increment the record count.
294 */
295 was_deleted = B_DISSET(bk->type);
296 }
297
298 /*
299 * 4. Delete and re-add the data item.
300 *
301 * If we're changing the type of the on-page structure, or we
302 * are referencing offpage items, we have to delete and then
303 * re-add the item. We do not do any cursor adjustments here
304 * because we're going to immediately re-add the item into the
305 * same slot.
306 */
307 if (bigdata || B_TYPE(bk->type) != B_KEYDATA) {
308 if ((ret = CDB___bam_ditem(dbc, h, indx)) != 0)
309 return (ret);
310 break;
311 }
312
313 /* 5. Overwrite the data item. */
314 replace = 1;
315 break;
316 default:
317 return (CDB___db_unknown_flag(dbp->dbenv, "CDB___bam_iitem", op));
318 }
319
320 /* Add the data. */
321 if (bigdata) {
322 if ((ret = __bam_ovput(dbc,
323 B_OVERFLOW, PGNO_INVALID, h, indx, data)) != 0)
324 return (ret);
325 } else {
326 if (LF_ISSET(BI_DELETED)) {
327 B_TSET(bk_tmp.type, B_KEYDATA, 1);
328 bk_tmp.len = data->size;
329 bk_hdr.data = &bk_tmp;
330 bk_hdr.size = SSZA(BKEYDATA, data);
331 ret = CDB___db_pitem(dbc, h, indx,
332 BKEYDATA_SIZE(data->size), &bk_hdr, data);
333 } else if (replace)
334 ret = CDB___bam_ritem(dbc, h, indx, data);
335 else
336 ret = CDB___db_pitem(dbc, h, indx,
337 BKEYDATA_SIZE(data->size), NULL, data);
338 if (ret != 0)
339 return (ret);
340 }
341 if ((ret = CDB_memp_fset(dbp->mpf, h, DB_MPOOL_DIRTY)) != 0)
342 return (ret);
343
344 /*
345 * Adjust the cursors in general. After that's done, reset the current
346 * cursor to point to the new item.
347 */
348 if (op == DB_CURRENT)
349 (void)CDB___bam_ca_delete(dbp, PGNO(h),
350 TYPE(h) == P_LBTREE ? indx - O_INDX : indx, 0);
351 else {
352 CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
353 cp->indx = TYPE(h) == P_LBTREE ? indx - O_INDX : indx;
354 }
355
356 /*
357 * If we've changed the record count, update the tree. There's no
358 * need to adjust the count if the operation not performed on the
359 * current record or when the current record was previously deleted.
360 */
361 if (F_ISSET(cp, C_RECNUM) && (op != DB_CURRENT || was_deleted))
362 if ((ret = CDB___bam_adjust(dbc, 1)) != 0)
363 return (ret);
364
365 /*
366 * If a Btree leaf page is at least 50% full and we may have added or
367 * modified a duplicate data item, see if the set of duplicates takes
368 * up at least 25% of the space on the page. If it does, move it onto
369 * its own page.
370 */
371 if (dupadjust && P_FREESPACE(h) <= dbp->pgsize / 2) {
372 if ((ret = __bam_dup_convert(dbc, h, indx - O_INDX)) != 0)
373 return (ret);
374 }
375
376 /* If we've modified a recno file, set the flag. */
377 if (dbc->dbtype == DB_RECNO)
378 F_SET(t, RECNO_MODIFIED);
379
380 return (ret);
381 }
382
383 /*
384 * CDB___bam_partsize --
385 * Figure out how much space a partial data item is in total.
386 *
387 * PUBLIC: u_int32_t CDB___bam_partsize __P((u_int32_t, DBT *, PAGE *, u_int32_t));
388 */
389 u_int32_t
CDB___bam_partsize(op,data,h,indx)390 CDB___bam_partsize(op, data, h, indx)
391 u_int32_t op, indx;
392 DBT *data;
393 PAGE *h;
394 {
395 BKEYDATA *bk;
396 u_int32_t nbytes;
397
398 /*
399 * If the record doesn't already exist, it's simply the data we're
400 * provided.
401 */
402 if (op != DB_CURRENT)
403 return (data->doff + data->size);
404
405 /*
406 * Otherwise, it's the data provided plus any already existing data
407 * that we're not replacing.
408 */
409 bk = GET_BKEYDATA(h, indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
410 nbytes =
411 B_TYPE(bk->type) == B_OVERFLOW ? ((BOVERFLOW *)bk)->tlen : bk->len;
412
413 /*
414 * There are really two cases here:
415 *
416 * Case 1: We are replacing some bytes that do not exist (i.e., they
417 * are past the end of the record). In this case the number of bytes
418 * we are replacing is irrelevant and all we care about is how many
419 * bytes we are going to add from offset. So, the new record length
420 * is going to be the size of the new bytes (size) plus wherever those
421 * new bytes begin (doff).
422 *
423 * Case 2: All the bytes we are replacing exist. Therefore, the new
424 * size is the oldsize (nbytes) minus the bytes we are replacing (dlen)
425 * plus the bytes we are adding (size).
426 */
427 if (nbytes < data->doff + data->dlen) /* Case 1 */
428 return (data->doff + data->size);
429
430 return (nbytes + data->size - data->dlen); /* Case 2 */
431 }
432
433 /*
434 * CDB___bam_build --
435 * Build the real record for a partial put, or short fixed-length record.
436 *
437 * PUBLIC: int CDB___bam_build __P((DBC *, u_int32_t,
438 * PUBLIC: DBT *, PAGE *, u_int32_t, u_int32_t));
439 */
440 int
CDB___bam_build(dbc,op,dbt,h,indx,nbytes)441 CDB___bam_build(dbc, op, dbt, h, indx, nbytes)
442 DBC *dbc;
443 u_int32_t op, indx, nbytes;
444 DBT *dbt;
445 PAGE *h;
446 {
447 BKEYDATA *bk, tbk;
448 BOVERFLOW *bo;
449 BTREE *t;
450
451 DB *dbp;
452 DBT copy;
453 u_int32_t len, tlen;
454 u_int8_t *p;
455 int ret;
456
457 COMPQUIET(bo, NULL);
458
459 dbp = dbc->dbp;
460 t = dbp->bt_internal;
461
462 /* We use the record data return memory, it's only a short-term use. */
463 if (dbc->rdata.ulen < nbytes) {
464 if ((ret = CDB___os_realloc(dbp->dbenv,
465 nbytes, NULL, &dbc->rdata.data)) != 0) {
466 dbc->rdata.ulen = 0;
467 dbc->rdata.data = NULL;
468 return (ret);
469 }
470 dbc->rdata.ulen = nbytes;
471 }
472
473 /*
474 * We use nul or pad bytes for any part of the record that isn't
475 * specified; get it over with.
476 */
477 memset(dbc->rdata.data,
478 F_ISSET(dbp, DB_RE_FIXEDLEN) ? t->re_pad : 0, nbytes);
479
480 /*
481 * In the next clauses, we need to do three things: a) set p to point
482 * to the place at which to copy the user's data, b) set tlen to the
483 * total length of the record, not including the bytes contributed by
484 * the user, and c) copy any valid data from an existing record. If
485 * it's not a partial put (this code is called for both partial puts
486 * and fixed-length record padding) or it's a new key, we can cut to
487 * the chase.
488 */
489 if (!F_ISSET(dbt, DB_DBT_PARTIAL) || op != DB_CURRENT) {
490 p = (u_int8_t *)dbc->rdata.data + dbt->doff;
491 tlen = dbt->doff;
492 goto user_copy;
493 }
494
495 /* Find the current record. */
496 if (indx < NUM_ENT(h)) {
497 bk = GET_BKEYDATA(h, indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
498 bo = (BOVERFLOW *)bk;
499 } else {
500 bk = &tbk;
501 B_TSET(bk->type, B_KEYDATA, 0);
502 bk->len = 0;
503 }
504 if (B_TYPE(bk->type) == B_OVERFLOW) {
505 /*
506 * In the case of an overflow record, we shift things around
507 * in the current record rather than allocate a separate copy.
508 */
509 memset(©, 0, sizeof(copy));
510 if ((ret = CDB___db_goff(dbp, ©, bo->tlen,
511 bo->pgno, &dbc->rdata.data, &dbc->rdata.ulen)) != 0)
512 return (ret);
513
514 /* Skip any leading data from the original record. */
515 tlen = dbt->doff;
516 p = (u_int8_t *)dbc->rdata.data + dbt->doff;
517
518 /*
519 * Copy in any trailing data from the original record.
520 *
521 * If the original record was larger than the original offset
522 * plus the bytes being deleted, there is trailing data in the
523 * original record we need to preserve. If we aren't deleting
524 * the same number of bytes as we're inserting, copy it up or
525 * down, into place.
526 *
527 * Use memmove(), the regions may overlap.
528 */
529 if (bo->tlen > dbt->doff + dbt->dlen) {
530 len = bo->tlen - (dbt->doff + dbt->dlen);
531 if (dbt->dlen != dbt->size)
532 memmove(p + dbt->size, p + dbt->dlen, len);
533 tlen += len;
534 }
535 } else {
536 /* Copy in any leading data from the original record. */
537 memcpy(dbc->rdata.data,
538 bk->data, dbt->doff > bk->len ? bk->len : dbt->doff);
539 tlen = dbt->doff;
540 p = (u_int8_t *)dbc->rdata.data + dbt->doff;
541
542 /* Copy in any trailing data from the original record. */
543 len = dbt->doff + dbt->dlen;
544 if (bk->len > len) {
545 memcpy(p + dbt->size, bk->data + len, bk->len - len);
546 tlen += bk->len - len;
547 }
548 }
549
550 user_copy:
551 /*
552 * Copy in the application provided data -- p and tlen must have been
553 * initialized above.
554 */
555 memcpy(p, dbt->data, dbt->size);
556 tlen += dbt->size;
557
558 /* Set the DBT to reference our new record. */
559 dbc->rdata.size = F_ISSET(dbp, DB_RE_FIXEDLEN) ? t->re_len : tlen;
560 dbc->rdata.dlen = 0;
561 dbc->rdata.doff = 0;
562 dbc->rdata.flags = 0;
563 *dbt = dbc->rdata;
564 return (0);
565 }
566
567 /*
568 * CDB___bam_ritem --
569 * Replace an item on a page.
570 *
571 * PUBLIC: int CDB___bam_ritem __P((DBC *, PAGE *, u_int32_t, DBT *));
572 */
573 int
CDB___bam_ritem(dbc,h,indx,data)574 CDB___bam_ritem(dbc, h, indx, data)
575 DBC *dbc;
576 PAGE *h;
577 u_int32_t indx;
578 DBT *data;
579 {
580 BKEYDATA *bk;
581 DB *dbp;
582 DBT orig, repl;
583 db_indx_t cnt, lo, ln, min, off, prefix, suffix;
584 int32_t nbytes;
585 int ret;
586 u_int8_t *p, *t;
587
588 dbp = dbc->dbp;
589
590 /*
591 * Replace a single item onto a page. The logic figuring out where
592 * to insert and whether it fits is handled in the caller. All we do
593 * here is manage the page shuffling.
594 */
595 bk = GET_BKEYDATA(h, indx);
596
597 /* Log the change. */
598 if (DB_LOGGING(dbc)) {
599 /*
600 * We might as well check to see if the two data items share
601 * a common prefix and suffix -- it can save us a lot of log
602 * message if they're large.
603 */
604 min = data->size < bk->len ? data->size : bk->len;
605 for (prefix = 0,
606 p = bk->data, t = data->data;
607 prefix < min && *p == *t; ++prefix, ++p, ++t)
608 ;
609
610 min -= prefix;
611 for (suffix = 0,
612 p = (u_int8_t *)bk->data + bk->len - 1,
613 t = (u_int8_t *)data->data + data->size - 1;
614 suffix < min && *p == *t; ++suffix, --p, --t)
615 ;
616
617 /* We only log the parts of the keys that have changed. */
618 orig.data = (u_int8_t *)bk->data + prefix;
619 orig.size = bk->len - (prefix + suffix);
620 repl.data = (u_int8_t *)data->data + prefix;
621 repl.size = data->size - (prefix + suffix);
622 if ((ret = CDB___bam_repl_log(dbp->dbenv, dbc->txn,
623 &LSN(h), 0, dbp->log_fileid, PGNO(h), &LSN(h),
624 (u_int32_t)indx, (u_int32_t)B_DISSET(bk->type),
625 &orig, &repl, (u_int32_t)prefix, (u_int32_t)suffix)) != 0)
626 return (ret);
627 }
628
629 /*
630 * Set references to the first in-use byte on the page and the
631 * first byte of the item being replaced.
632 */
633 p = (u_int8_t *)h + HOFFSET(h);
634 t = (u_int8_t *)bk;
635
636 /*
637 * If the entry is growing in size, shift the beginning of the data
638 * part of the page down. If the entry is shrinking in size, shift
639 * the beginning of the data part of the page up. Use memmove(3),
640 * the regions overlap.
641 */
642 lo = BKEYDATA_SIZE(bk->len);
643 ln = BKEYDATA_SIZE(data->size);
644 if (lo != ln) {
645 nbytes = lo - ln; /* Signed difference. */
646 if (p == t) /* First index is fast. */
647 h->inp[indx] += nbytes;
648 else { /* Else, shift the page. */
649 memmove(p + nbytes, p, t - p);
650
651 /* Adjust the indices' offsets. */
652 off = h->inp[indx];
653 for (cnt = 0; cnt < NUM_ENT(h); ++cnt)
654 if (h->inp[cnt] <= off)
655 h->inp[cnt] += nbytes;
656 }
657
658 /* Clean up the page and adjust the item's reference. */
659 HOFFSET(h) += nbytes;
660 t += nbytes;
661 }
662
663 /* Copy the new item onto the page. */
664 bk = (BKEYDATA *)t;
665 B_TSET(bk->type, B_KEYDATA, 0);
666 bk->len = data->size;
667 memcpy(bk->data, data->data, data->size);
668
669 return (0);
670 }
671
672 /*
673 * __bam_dup_convert --
674 * Check to see if the duplicate set at indx should have its own page.
675 * If it should, create it.
676 */
677 static int
__bam_dup_convert(dbc,h,indx)678 __bam_dup_convert(dbc, h, indx)
679 DBC *dbc;
680 PAGE *h;
681 u_int32_t indx;
682 {
683 BKEYDATA *bk;
684 DB *dbp;
685 DBT hdr;
686 PAGE *dp;
687 db_indx_t cnt, cpindx, first, sz;
688 int ret;
689
690 dbp = dbc->dbp;
691
692 /*
693 * Count the duplicate records and calculate how much room they're
694 * using on the page.
695 */
696 while (indx > 0 && h->inp[indx] == h->inp[indx - P_INDX])
697 indx -= P_INDX;
698 for (cnt = 0, sz = 0, first = indx;; ++cnt, indx += P_INDX) {
699 if (indx >= NUM_ENT(h) || h->inp[first] != h->inp[indx])
700 break;
701 bk = GET_BKEYDATA(h, indx);
702 sz += B_TYPE(bk->type) == B_KEYDATA ?
703 BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE;
704 bk = GET_BKEYDATA(h, indx + O_INDX);
705 sz += B_TYPE(bk->type) == B_KEYDATA ?
706 BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE;
707 }
708
709 /*
710 * We have to do these checks when the user is replacing the cursor's
711 * data item -- if the application replaces a duplicate item with a
712 * larger data item, it can increase the amount of space used by the
713 * duplicates, requiring this check. But that means we may have done
714 * this check when it wasn't a duplicate item after all.
715 */
716 if (cnt == 1)
717 return (0);
718
719 /*
720 * If this set of duplicates is using more than 25% of the page, move
721 * them off. The choice of 25% is a WAG, but the value must be small
722 * enough that we can always split a page without putting duplicates
723 * on two different pages.
724 */
725 if (sz < dbp->pgsize / 4)
726 return (0);
727
728 /* Get a new page. */
729 if ((ret = CDB___db_new(dbc,
730 ((dbp->dup_compare == NULL ? P_LRECNO : P_LDUP) | dbp->tags), &dp)) != 0)
731 return (ret);
732 P_INIT(dp, dbp->pgsize, dp->pgno,
733 PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp), TAGS(dp));
734
735 /*
736 * Move this set of duplicates off the page. First points to the first
737 * key of the first duplicate key/data pair, cnt is the number of pairs
738 * we're dealing with.
739 */
740 memset(&hdr, 0, sizeof(hdr));
741 for (indx = first + O_INDX, cpindx = 0;;) {
742 /* Move cursors referencing the old entry to the new entry. */
743 if ((ret = CDB___bam_ca_dup(dbp, first,
744 PGNO(h), indx - O_INDX, PGNO(dp), cpindx)) != 0)
745 goto err;
746
747 /*
748 * Copy the entry to the new page. If the off-duplicate page
749 * is a Btree page, deleted entries move normally. If it's a
750 * Recno page, deleted entries are discarded.
751 */
752 bk = GET_BKEYDATA(h, indx);
753 hdr.data = bk;
754 hdr.size = B_TYPE(bk->type) == B_KEYDATA ?
755 BKEYDATA_SIZE(bk->len) : BOVERFLOW_SIZE;
756 if (dbp->dup_compare != NULL || !B_DISSET(bk->type)) {
757 if ((ret = CDB___db_pitem(
758 dbc, dp, cpindx, hdr.size, &hdr, NULL)) != 0)
759 goto err;
760 ++cpindx;
761 }
762
763 /* Delete the data item. */
764 if ((ret = CDB___db_ditem(dbc, h, indx, hdr.size)) != 0)
765 goto err;
766 CDB___bam_ca_di(dbp, PGNO(h), indx, -1);
767
768 /* Delete all but the first reference to the key. */
769 if (--cnt == 0)
770 break;
771 if ((ret = CDB___bam_adjindx(dbc, h, indx, first, 0)) != 0)
772 goto err;
773 CDB___bam_ca_di(dbp, PGNO(h), indx, -1);
774 }
775
776 /* Put in a new data item that points to the duplicates page. */
777 if ((ret = __bam_ovput(dbc, B_DUPLICATE, dp->pgno, h, indx, NULL)) != 0)
778 goto err;
779
780 return (CDB_memp_fput(dbp->mpf, dp, DB_MPOOL_DIRTY));
781
782 err: (void)CDB___db_free(dbc, dp);
783 return (ret);
784 }
785
786 /*
787 * __bam_ovput --
788 * Build an item for an off-page duplicates page or overflow page and
789 * insert it on the page.
790 */
791 static int
__bam_ovput(dbc,type,pgno,h,indx,item)792 __bam_ovput(dbc, type, pgno, h, indx, item)
793 DBC *dbc;
794 u_int32_t type, indx;
795 db_pgno_t pgno;
796 PAGE *h;
797 DBT *item;
798 {
799 BOVERFLOW bo;
800 DBT hdr;
801 int ret;
802
803 UMRW(bo.unused1);
804 B_TSET(bo.type, type, 0);
805 UMRW(bo.unused2);
806
807 /*
808 * If we're creating an overflow item, do so and acquire the page
809 * number for it. If we're creating an off-page duplicates tree,
810 * we are giving the page number as an argument.
811 */
812 if (type == B_OVERFLOW) {
813 if ((ret = CDB___db_poff(dbc, item, &bo.pgno)) != 0)
814 return (ret);
815 bo.tlen = item->size;
816 } else {
817 bo.pgno = pgno;
818 bo.tlen = 0;
819 }
820
821 /* Store the new record on the page. */
822 memset(&hdr, 0, sizeof(hdr));
823 hdr.data = &bo;
824 hdr.size = BOVERFLOW_SIZE;
825 return (CDB___db_pitem(dbc, h, indx, BOVERFLOW_SIZE, &hdr, NULL));
826 }
827