1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996, 1997, 1998, 1999, 2000
5  *	Sleepycat Software.  All rights reserved.
6  */
7 /*
8  * Copyright (c) 1990, 1993, 1994, 1995, 1996
9  *	Keith Bostic.  All rights reserved.
10  */
11 /*
12  * Copyright (c) 1990, 1993, 1994, 1995
13  *	The Regents of the University of California.  All rights reserved.
14  *
15  * This code is derived from software contributed to Berkeley by
16  * Mike Olson.
17  *
18  * Redistribution and use in source and binary forms, with or without
19  * modification, are permitted provided that the following conditions
20  * are met:
21  * 1. Redistributions of source code must retain the above copyright
22  *    notice, this list of conditions and the following disclaimer.
23  * 2. Redistributions in binary form must reproduce the above copyright
24  *    notice, this list of conditions and the following disclaimer in the
25  *    documentation and/or other materials provided with the distribution.
26  * 3. Neither the name of the University nor the names of its contributors
27  *    may be used to endorse or promote products derived from this software
28  *    without specific prior written permission.
29  *
30  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
31  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
32  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
34  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
35  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
36  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
37  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
38  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
39  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
40  * SUCH DAMAGE.
41  */
42 
43 #include "config.h"
44 
45 #ifndef lint
46 static const char revid[] = "$Id: bt_put.c,v 1.6 2014/04/17 20:27:25 sebdiaz Exp $";
47 #endif /* not lint */
48 
49 #ifndef NO_SYSTEM_INCLUDES
50 #include <sys/types.h>
51 
52 #include <errno.h>
53 #include <string.h>
54 #endif
55 
56 #include "db_int.h"
57 #include "db_page.h"
58 #include "btree.h"
59 
60 static int __bam_dup_convert __P((DBC *, PAGE *, u_int32_t));
61 static int __bam_ovput
62 	       __P((DBC *, u_int32_t, db_pgno_t, PAGE *, u_int32_t, DBT *));
63 
64 /*
65  * CDB___bam_iitem --
66  *	Insert an item into the tree.
67  *
68  * PUBLIC: int CDB___bam_iitem __P((DBC *, DBT *, DBT *, u_int32_t, u_int32_t));
69  */
70 int
CDB___bam_iitem(dbc,key,data,op,flags)71 CDB___bam_iitem(dbc, key, data, op, flags)
72 	DBC *dbc;
73 	DBT *key, *data;
74 	u_int32_t op, flags;
75 {
76 	BKEYDATA *bk, bk_tmp;
77 	BTREE *t;
78 	BTREE_CURSOR *cp;
79 	DB *dbp;
80 	DBT bk_hdr, tdbt;
81 	PAGE *h;
82 	db_indx_t indx;
83 	u_int32_t data_size, have_bytes, need_bytes, needed;
84 	int cmp, bigkey, bigdata, dupadjust, padrec, replace, ret, was_deleted;
85 
86 	COMPQUIET(bk, NULL);
87 
88 	dbp = dbc->dbp;
89 	cp = (BTREE_CURSOR *)dbc->internal;
90 	t = dbp->bt_internal;
91 	h = cp->page;
92 	indx = cp->indx;
93 	dupadjust = replace = was_deleted = 0;
94 
95 	/*
96 	 * Fixed-length records with partial puts: it's an error to specify
97 	 * anything other simple overwrite.
98 	 */
99 	if (F_ISSET(dbp, DB_RE_FIXEDLEN) &&
100 	    F_ISSET(data, DB_DBT_PARTIAL) && data->dlen != data->size) {
101 		data_size = data->size;
102 		goto len_err;
103 	}
104 
105 	/*
106 	 * Figure out how much space the data will take, including if it's a
107 	 * partial record.
108 	 *
109 	 * Fixed-length records: it's an error to specify a record that's
110 	 * longer than the fixed-length, and we never require less than
111 	 * the fixed-length record size.
112 	 */
113 	data_size = F_ISSET(data, DB_DBT_PARTIAL) ?
114 	    CDB___bam_partsize(op, data, h, indx) : data->size;
115 	padrec = 0;
116 	if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
117 		if (data_size > t->re_len) {
118 len_err:		CDB___db_err(dbp->dbenv,
119 			    "Length improper for fixed length record %lu",
120 			    (u_long)data_size);
121 			return (EINVAL);
122 		}
123 		if (data_size < t->re_len) {
124 			padrec = 1;
125 			data_size = t->re_len;
126 		}
127 	}
128 
129 	/*
130 	 * Handle partial puts or short fixed-length records: build the
131 	 * real record.
132 	 */
133 	if (padrec || F_ISSET(data, DB_DBT_PARTIAL)) {
134 		tdbt = *data;
135 		if ((ret =
136 		    CDB___bam_build(dbc, op, &tdbt, h, indx, data_size)) != 0)
137 			return (ret);
138 		data = &tdbt;
139 	}
140 
141 	/*
142 	 * If the user has specified a duplicate comparison function, return
143 	 * an error if DB_CURRENT was specified and the replacement data
144 	 * doesn't compare equal to the current data.  This stops apps from
145 	 * screwing up the duplicate sort order.  We have to do this after
146 	 * we build the real record so that we're comparing the real items.
147 	 */
148 	if (op == DB_CURRENT && dbp->dup_compare != NULL) {
149 		if ((ret = CDB___bam_cmp(dbp, data, h,
150 		     indx + (TYPE(h) == P_LBTREE ? O_INDX : 0),
151 		     dbp->dup_compare, &cmp)) != 0)
152 			return (ret);
153 		if (cmp != 0) {
154 			CDB___db_err(dbp->dbenv,
155 			    "Current data differs from put data");
156 			return (EINVAL);
157 		}
158 	}
159 
160 	/*
161 	 * If the key or data item won't fit on a page, we'll have to store
162 	 * them on overflow pages.
163 	 */
164 	needed = 0;
165 	bigdata = data_size > cp->ovflsize;
166 	switch (op) {
167 	case DB_KEYFIRST:
168 		/* We're adding a new key and data pair. */
169 		bigkey = key->size > cp->ovflsize;
170 		if (bigkey)
171 			needed += BOVERFLOW_PSIZE;
172 		else
173 			needed += BKEYDATA_PSIZE(key->size);
174 		if (bigdata)
175 			needed += BOVERFLOW_PSIZE;
176 		else
177 			needed += BKEYDATA_PSIZE(data_size);
178 		break;
179 	case DB_AFTER:
180 	case DB_BEFORE:
181 	case DB_CURRENT:
182 		/*
183 		 * We're either overwriting the data item of a key/data pair
184 		 * or we're creating a new on-page duplicate and only adding
185 		 * a data item.
186 		 *
187 		 * !!!
188 		 * We're not currently correcting for space reclaimed from
189 		 * already deleted items, but I don't think it's worth the
190 		 * complexity.
191 		 */
192 		bigkey = 0;
193 		if (op == DB_CURRENT) {
194 			bk = GET_BKEYDATA(h,
195 			    indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
196 			if (B_TYPE(bk->type) == B_KEYDATA)
197 				have_bytes = BKEYDATA_PSIZE(bk->len);
198 			else
199 				have_bytes = BOVERFLOW_PSIZE;
200 			need_bytes = 0;
201 		} else {
202 			have_bytes = 0;
203 			need_bytes = sizeof(db_indx_t);
204 		}
205 		if (bigdata)
206 			need_bytes += BOVERFLOW_PSIZE;
207 		else
208 			need_bytes += BKEYDATA_PSIZE(data_size);
209 
210 		if (have_bytes < need_bytes)
211 			needed += need_bytes - have_bytes;
212 		break;
213 	default:
214 		return (CDB___db_unknown_flag(dbp->dbenv, "CDB___bam_iitem", op));
215 	}
216 
217 	/*
218 	 * If there's not enough room, or the user has put a ceiling on the
219 	 * number of keys permitted in the page, split the page.
220 	 *
221 	 * XXX
222 	 * The t->bt_maxkey test here may be insufficient -- do we have to
223 	 * check in the btree split code, so we don't undo it there!?!?
224 	 */
225 	if (P_FREESPACE(h) < needed ||
226 	    (t->bt_maxkey != 0 && NUM_ENT(h) > t->bt_maxkey))
227 		return (DB_NEEDSPLIT);
228 
229 	/*
230 	 * The code breaks it up into five cases:
231 	 *
232 	 * 1. Insert a new key/data pair.
233 	 * 2. Append a new data item (a new duplicate).
234 	 * 3. Insert a new data item (a new duplicate).
235 	 * 4. Delete and re-add the data item (overflow item).
236 	 * 5. Overwrite the data item.
237 	 */
238 	switch (op) {
239 	case DB_KEYFIRST:		/* 1. Insert a new key/data pair. */
240 		if (bigkey) {
241 			if ((ret = __bam_ovput(dbc,
242 			    B_OVERFLOW, PGNO_INVALID, h, indx, key)) != 0)
243 				return (ret);
244 		} else
245 			if ((ret = CDB___db_pitem(dbc, h, indx,
246 			    BKEYDATA_SIZE(key->size), NULL, key)) != 0)
247 				return (ret);
248 
249 		CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
250 		++indx;
251 		break;
252 	case DB_AFTER:			/* 2. Append a new data item. */
253 		if (TYPE(h) == P_LBTREE) {
254 			/* Copy the key for the duplicate and adjust cursors. */
255 			if ((ret =
256 			    CDB___bam_adjindx(dbc, h, indx + P_INDX, indx, 1)) != 0)
257 				return (ret);
258 			CDB___bam_ca_di(dbp, PGNO(h), indx + P_INDX, 1);
259 
260 			indx += 3;
261 			dupadjust = 1;
262 
263 			cp->indx += 2;
264 		} else {
265 			++indx;
266 			CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
267 
268 			cp->indx += 1;
269 		}
270 		break;
271 	case DB_BEFORE:			/* 3. Insert a new data item. */
272 		if (TYPE(h) == P_LBTREE) {
273 			/* Copy the key for the duplicate and adjust cursors. */
274 			if ((ret = CDB___bam_adjindx(dbc, h, indx, indx, 1)) != 0)
275 				return (ret);
276 			CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
277 
278 			++indx;
279 			dupadjust = 1;
280 		} else
281 			CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
282 		break;
283 	case DB_CURRENT:
284 		if (TYPE(h) == P_LBTREE) {
285 			++indx;
286 			dupadjust = 1;
287 
288 			/*
289 			 * In a Btree deleted records aren't counted (deleted
290 			 * records are counted in a Recno because all accesses
291 			 * are based on record number).  If it's a Btree and
292 			 * it's a DB_CURRENT operation overwriting a previously
293 			 * deleted record, increment the record count.
294 			 */
295 			was_deleted = B_DISSET(bk->type);
296 		}
297 
298 		/*
299 		 * 4. Delete and re-add the data item.
300 		 *
301 		 * If we're changing the type of the on-page structure, or we
302 		 * are referencing offpage items, we have to delete and then
303 		 * re-add the item.  We do not do any cursor adjustments here
304 		 * because we're going to immediately re-add the item into the
305 		 * same slot.
306 		 */
307 		if (bigdata || B_TYPE(bk->type) != B_KEYDATA) {
308 			if ((ret = CDB___bam_ditem(dbc, h, indx)) != 0)
309 				return (ret);
310 			break;
311 		}
312 
313 		/* 5. Overwrite the data item. */
314 		replace = 1;
315 		break;
316 	default:
317 		return (CDB___db_unknown_flag(dbp->dbenv, "CDB___bam_iitem", op));
318 	}
319 
320 	/* Add the data. */
321 	if (bigdata) {
322 		if ((ret = __bam_ovput(dbc,
323 		    B_OVERFLOW, PGNO_INVALID, h, indx, data)) != 0)
324 			return (ret);
325 	} else {
326 		if (LF_ISSET(BI_DELETED)) {
327 			B_TSET(bk_tmp.type, B_KEYDATA, 1);
328 			bk_tmp.len = data->size;
329 			bk_hdr.data = &bk_tmp;
330 			bk_hdr.size = SSZA(BKEYDATA, data);
331 			ret = CDB___db_pitem(dbc, h, indx,
332 			    BKEYDATA_SIZE(data->size), &bk_hdr, data);
333 		} else if (replace)
334 			ret = CDB___bam_ritem(dbc, h, indx, data);
335 		else
336 			ret = CDB___db_pitem(dbc, h, indx,
337 			    BKEYDATA_SIZE(data->size), NULL, data);
338 		if (ret != 0)
339 			return (ret);
340 	}
341 	if ((ret = CDB_memp_fset(dbp->mpf, h, DB_MPOOL_DIRTY)) != 0)
342 		return (ret);
343 
344 	/*
345 	 * Adjust the cursors in general.  After that's done, reset the current
346 	 * cursor to point to the new item.
347 	 */
348 	if (op == DB_CURRENT)
349 		(void)CDB___bam_ca_delete(dbp, PGNO(h),
350 		    TYPE(h) == P_LBTREE ? indx - O_INDX : indx, 0);
351 	else {
352 		CDB___bam_ca_di(dbp, PGNO(h), indx, 1);
353 		cp->indx = TYPE(h) == P_LBTREE ? indx - O_INDX : indx;
354 	}
355 
356 	/*
357 	 * If we've changed the record count, update the tree.  There's no
358 	 * need to adjust the count if the operation not performed on the
359 	 * current record or when the current record was previously deleted.
360 	 */
361 	if (F_ISSET(cp, C_RECNUM) && (op != DB_CURRENT || was_deleted))
362 		if ((ret = CDB___bam_adjust(dbc, 1)) != 0)
363 			return (ret);
364 
365 	/*
366 	 * If a Btree leaf page is at least 50% full and we may have added or
367 	 * modified a duplicate data item, see if the set of duplicates takes
368 	 * up at least 25% of the space on the page.  If it does, move it onto
369 	 * its own page.
370 	 */
371 	if (dupadjust && P_FREESPACE(h) <= dbp->pgsize / 2) {
372 		if ((ret = __bam_dup_convert(dbc, h, indx - O_INDX)) != 0)
373 			return (ret);
374 	}
375 
376 	/* If we've modified a recno file, set the flag. */
377 	if (dbc->dbtype == DB_RECNO)
378 		F_SET(t, RECNO_MODIFIED);
379 
380 	return (ret);
381 }
382 
383 /*
384  * CDB___bam_partsize --
385  *	Figure out how much space a partial data item is in total.
386  *
387  * PUBLIC: u_int32_t CDB___bam_partsize __P((u_int32_t, DBT *, PAGE *, u_int32_t));
388  */
389 u_int32_t
CDB___bam_partsize(op,data,h,indx)390 CDB___bam_partsize(op, data, h, indx)
391 	u_int32_t op, indx;
392 	DBT *data;
393 	PAGE *h;
394 {
395 	BKEYDATA *bk;
396 	u_int32_t nbytes;
397 
398 	/*
399 	 * If the record doesn't already exist, it's simply the data we're
400 	 * provided.
401 	 */
402 	if (op != DB_CURRENT)
403 		return (data->doff + data->size);
404 
405 	/*
406 	 * Otherwise, it's the data provided plus any already existing data
407 	 * that we're not replacing.
408 	 */
409 	bk = GET_BKEYDATA(h, indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
410 	nbytes =
411 	    B_TYPE(bk->type) == B_OVERFLOW ? ((BOVERFLOW *)bk)->tlen : bk->len;
412 
413 	/*
414 	 * There are really two cases here:
415 	 *
416 	 * Case 1: We are replacing some bytes that do not exist (i.e., they
417 	 * are past the end of the record).  In this case the number of bytes
418 	 * we are replacing is irrelevant and all we care about is how many
419 	 * bytes we are going to add from offset.  So, the new record length
420 	 * is going to be the size of the new bytes (size) plus wherever those
421 	 * new bytes begin (doff).
422 	 *
423 	 * Case 2: All the bytes we are replacing exist.  Therefore, the new
424 	 * size is the oldsize (nbytes) minus the bytes we are replacing (dlen)
425 	 * plus the bytes we are adding (size).
426 	 */
427 	if (nbytes < data->doff + data->dlen)		/* Case 1 */
428 		return (data->doff + data->size);
429 
430 	return (nbytes + data->size - data->dlen);	/* Case 2 */
431 }
432 
433 /*
434  * CDB___bam_build --
435  *	Build the real record for a partial put, or short fixed-length record.
436  *
437  * PUBLIC: int CDB___bam_build __P((DBC *, u_int32_t,
438  * PUBLIC:     DBT *, PAGE *, u_int32_t, u_int32_t));
439  */
440 int
CDB___bam_build(dbc,op,dbt,h,indx,nbytes)441 CDB___bam_build(dbc, op, dbt, h, indx, nbytes)
442 	DBC *dbc;
443 	u_int32_t op, indx, nbytes;
444 	DBT *dbt;
445 	PAGE *h;
446 {
447 	BKEYDATA *bk, tbk;
448 	BOVERFLOW *bo;
449 	BTREE *t;
450 
451 	DB *dbp;
452 	DBT copy;
453 	u_int32_t len, tlen;
454 	u_int8_t *p;
455 	int ret;
456 
457 	COMPQUIET(bo, NULL);
458 
459 	dbp = dbc->dbp;
460 	t = dbp->bt_internal;
461 
462 	/* We use the record data return memory, it's only a short-term use. */
463 	if (dbc->rdata.ulen < nbytes) {
464 		if ((ret = CDB___os_realloc(dbp->dbenv,
465 		    nbytes, NULL, &dbc->rdata.data)) != 0) {
466 			dbc->rdata.ulen = 0;
467 			dbc->rdata.data = NULL;
468 			return (ret);
469 		}
470 		dbc->rdata.ulen = nbytes;
471 	}
472 
473 	/*
474 	 * We use nul or pad bytes for any part of the record that isn't
475 	 * specified; get it over with.
476 	 */
477 	memset(dbc->rdata.data,
478 	   F_ISSET(dbp, DB_RE_FIXEDLEN) ? t->re_pad : 0, nbytes);
479 
480 	/*
481 	 * In the next clauses, we need to do three things: a) set p to point
482 	 * to the place at which to copy the user's data, b) set tlen to the
483 	 * total length of the record, not including the bytes contributed by
484 	 * the user, and c) copy any valid data from an existing record.  If
485 	 * it's not a partial put (this code is called for both partial puts
486 	 * and fixed-length record padding) or it's a new key, we can cut to
487 	 * the chase.
488 	 */
489 	if (!F_ISSET(dbt, DB_DBT_PARTIAL) || op != DB_CURRENT) {
490 		p = (u_int8_t *)dbc->rdata.data + dbt->doff;
491 		tlen = dbt->doff;
492 		goto user_copy;
493 	}
494 
495 	/* Find the current record. */
496 	if (indx < NUM_ENT(h)) {
497 		bk = GET_BKEYDATA(h, indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
498 		bo = (BOVERFLOW *)bk;
499 	} else {
500 		bk = &tbk;
501 		B_TSET(bk->type, B_KEYDATA, 0);
502 		bk->len = 0;
503 	}
504 	if (B_TYPE(bk->type) == B_OVERFLOW) {
505 		/*
506 		 * In the case of an overflow record, we shift things around
507 		 * in the current record rather than allocate a separate copy.
508 		 */
509 		memset(&copy, 0, sizeof(copy));
510 		if ((ret = CDB___db_goff(dbp, &copy, bo->tlen,
511 		    bo->pgno, &dbc->rdata.data, &dbc->rdata.ulen)) != 0)
512 			return (ret);
513 
514 		/* Skip any leading data from the original record. */
515 		tlen = dbt->doff;
516 		p = (u_int8_t *)dbc->rdata.data + dbt->doff;
517 
518 		/*
519 		 * Copy in any trailing data from the original record.
520 		 *
521 		 * If the original record was larger than the original offset
522 		 * plus the bytes being deleted, there is trailing data in the
523 		 * original record we need to preserve.  If we aren't deleting
524 		 * the same number of bytes as we're inserting, copy it up or
525 		 * down, into place.
526 		 *
527 		 * Use memmove(), the regions may overlap.
528 		 */
529 		if (bo->tlen > dbt->doff + dbt->dlen) {
530 			len = bo->tlen - (dbt->doff + dbt->dlen);
531 			if (dbt->dlen != dbt->size)
532 				memmove(p + dbt->size, p + dbt->dlen, len);
533 			tlen += len;
534 		}
535 	} else {
536 		/* Copy in any leading data from the original record. */
537 		memcpy(dbc->rdata.data,
538 		    bk->data, dbt->doff > bk->len ? bk->len : dbt->doff);
539 		tlen = dbt->doff;
540 		p = (u_int8_t *)dbc->rdata.data + dbt->doff;
541 
542 		/* Copy in any trailing data from the original record. */
543 		len = dbt->doff + dbt->dlen;
544 		if (bk->len > len) {
545 			memcpy(p + dbt->size, bk->data + len, bk->len - len);
546 			tlen += bk->len - len;
547 		}
548 	}
549 
550 user_copy:
551 	/*
552 	 * Copy in the application provided data -- p and tlen must have been
553 	 * initialized above.
554 	 */
555 	memcpy(p, dbt->data, dbt->size);
556 	tlen += dbt->size;
557 
558 	/* Set the DBT to reference our new record. */
559 	dbc->rdata.size = F_ISSET(dbp, DB_RE_FIXEDLEN) ? t->re_len : tlen;
560 	dbc->rdata.dlen = 0;
561 	dbc->rdata.doff = 0;
562 	dbc->rdata.flags = 0;
563 	*dbt = dbc->rdata;
564 	return (0);
565 }
566 
567 /*
568  * CDB___bam_ritem --
569  *	Replace an item on a page.
570  *
571  * PUBLIC: int CDB___bam_ritem __P((DBC *, PAGE *, u_int32_t, DBT *));
572  */
573 int
CDB___bam_ritem(dbc,h,indx,data)574 CDB___bam_ritem(dbc, h, indx, data)
575 	DBC *dbc;
576 	PAGE *h;
577 	u_int32_t indx;
578 	DBT *data;
579 {
580 	BKEYDATA *bk;
581 	DB *dbp;
582 	DBT orig, repl;
583 	db_indx_t cnt, lo, ln, min, off, prefix, suffix;
584 	int32_t nbytes;
585 	int ret;
586 	u_int8_t *p, *t;
587 
588 	dbp = dbc->dbp;
589 
590 	/*
591 	 * Replace a single item onto a page.  The logic figuring out where
592 	 * to insert and whether it fits is handled in the caller.  All we do
593 	 * here is manage the page shuffling.
594 	 */
595 	bk = GET_BKEYDATA(h, indx);
596 
597 	/* Log the change. */
598 	if (DB_LOGGING(dbc)) {
599 		/*
600 		 * We might as well check to see if the two data items share
601 		 * a common prefix and suffix -- it can save us a lot of log
602 		 * message if they're large.
603 		 */
604 		min = data->size < bk->len ? data->size : bk->len;
605 		for (prefix = 0,
606 		    p = bk->data, t = data->data;
607 		    prefix < min && *p == *t; ++prefix, ++p, ++t)
608 			;
609 
610 		min -= prefix;
611 		for (suffix = 0,
612 		    p = (u_int8_t *)bk->data + bk->len - 1,
613 		    t = (u_int8_t *)data->data + data->size - 1;
614 		    suffix < min && *p == *t; ++suffix, --p, --t)
615 			;
616 
617 		/* We only log the parts of the keys that have changed. */
618 		orig.data = (u_int8_t *)bk->data + prefix;
619 		orig.size = bk->len - (prefix + suffix);
620 		repl.data = (u_int8_t *)data->data + prefix;
621 		repl.size = data->size - (prefix + suffix);
622 		if ((ret = CDB___bam_repl_log(dbp->dbenv, dbc->txn,
623 		    &LSN(h), 0, dbp->log_fileid, PGNO(h), &LSN(h),
624 		    (u_int32_t)indx, (u_int32_t)B_DISSET(bk->type),
625 		    &orig, &repl, (u_int32_t)prefix, (u_int32_t)suffix)) != 0)
626 			return (ret);
627 	}
628 
629 	/*
630 	 * Set references to the first in-use byte on the page and the
631 	 * first byte of the item being replaced.
632 	 */
633 	p = (u_int8_t *)h + HOFFSET(h);
634 	t = (u_int8_t *)bk;
635 
636 	/*
637 	 * If the entry is growing in size, shift the beginning of the data
638 	 * part of the page down.  If the entry is shrinking in size, shift
639 	 * the beginning of the data part of the page up.  Use memmove(3),
640 	 * the regions overlap.
641 	 */
642 	lo = BKEYDATA_SIZE(bk->len);
643 	ln = BKEYDATA_SIZE(data->size);
644 	if (lo != ln) {
645 		nbytes = lo - ln;		/* Signed difference. */
646 		if (p == t)			/* First index is fast. */
647 			h->inp[indx] += nbytes;
648 		else {				/* Else, shift the page. */
649 			memmove(p + nbytes, p, t - p);
650 
651 			/* Adjust the indices' offsets. */
652 			off = h->inp[indx];
653 			for (cnt = 0; cnt < NUM_ENT(h); ++cnt)
654 				if (h->inp[cnt] <= off)
655 					h->inp[cnt] += nbytes;
656 		}
657 
658 		/* Clean up the page and adjust the item's reference. */
659 		HOFFSET(h) += nbytes;
660 		t += nbytes;
661 	}
662 
663 	/* Copy the new item onto the page. */
664 	bk = (BKEYDATA *)t;
665 	B_TSET(bk->type, B_KEYDATA, 0);
666 	bk->len = data->size;
667 	memcpy(bk->data, data->data, data->size);
668 
669 	return (0);
670 }
671 
672 /*
673  * __bam_dup_convert --
674  *	Check to see if the duplicate set at indx should have its own page.
675  *	If it should, create it.
676  */
677 static int
__bam_dup_convert(dbc,h,indx)678 __bam_dup_convert(dbc, h, indx)
679 	DBC *dbc;
680 	PAGE *h;
681 	u_int32_t indx;
682 {
683 	BKEYDATA *bk;
684 	DB *dbp;
685 	DBT hdr;
686 	PAGE *dp;
687 	db_indx_t cnt, cpindx, first, sz;
688 	int ret;
689 
690 	dbp = dbc->dbp;
691 
692 	/*
693 	 * Count the duplicate records and calculate how much room they're
694 	 * using on the page.
695 	 */
696 	while (indx > 0 && h->inp[indx] == h->inp[indx - P_INDX])
697 		indx -= P_INDX;
698 	for (cnt = 0, sz = 0, first = indx;; ++cnt, indx += P_INDX) {
699 		if (indx >= NUM_ENT(h) || h->inp[first] != h->inp[indx])
700 			break;
701 		bk = GET_BKEYDATA(h, indx);
702 		sz += B_TYPE(bk->type) == B_KEYDATA ?
703 		    BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE;
704 		bk = GET_BKEYDATA(h, indx + O_INDX);
705 		sz += B_TYPE(bk->type) == B_KEYDATA ?
706 		    BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE;
707 	}
708 
709 	/*
710 	 * We have to do these checks when the user is replacing the cursor's
711 	 * data item -- if the application replaces a duplicate item with a
712 	 * larger data item, it can increase the amount of space used by the
713 	 * duplicates, requiring this check.  But that means we may have done
714 	 * this check when it wasn't a duplicate item after all.
715 	 */
716 	if (cnt == 1)
717 		return (0);
718 
719 	/*
720 	 * If this set of duplicates is using more than 25% of the page, move
721 	 * them off.  The choice of 25% is a WAG, but the value must be small
722 	 * enough that we can always split a page without putting duplicates
723 	 * on two different pages.
724 	 */
725 	if (sz < dbp->pgsize / 4)
726 		return (0);
727 
728 	/* Get a new page. */
729 	if ((ret = CDB___db_new(dbc,
730 	    ((dbp->dup_compare == NULL ? P_LRECNO : P_LDUP) | dbp->tags), &dp)) != 0)
731 		return (ret);
732 	P_INIT(dp, dbp->pgsize, dp->pgno,
733 	    PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp), TAGS(dp));
734 
735 	/*
736 	 * Move this set of duplicates off the page.  First points to the first
737 	 * key of the first duplicate key/data pair, cnt is the number of pairs
738 	 * we're dealing with.
739 	 */
740 	memset(&hdr, 0, sizeof(hdr));
741 	for (indx = first + O_INDX, cpindx = 0;;) {
742 		/* Move cursors referencing the old entry to the new entry. */
743 		if ((ret = CDB___bam_ca_dup(dbp, first,
744 		    PGNO(h), indx - O_INDX, PGNO(dp), cpindx)) != 0)
745 			goto err;
746 
747 		/*
748 		 * Copy the entry to the new page.  If the off-duplicate page
749 		 * is a Btree page, deleted entries move normally.  If it's a
750 		 * Recno page, deleted entries are discarded.
751 		 */
752 		bk = GET_BKEYDATA(h, indx);
753 		hdr.data = bk;
754 		hdr.size = B_TYPE(bk->type) == B_KEYDATA ?
755 		    BKEYDATA_SIZE(bk->len) : BOVERFLOW_SIZE;
756 		if (dbp->dup_compare != NULL || !B_DISSET(bk->type)) {
757 			if ((ret = CDB___db_pitem(
758 			    dbc, dp, cpindx, hdr.size, &hdr, NULL)) != 0)
759 				goto err;
760 			++cpindx;
761 		}
762 
763 		/* Delete the data item. */
764 		if ((ret = CDB___db_ditem(dbc, h, indx, hdr.size)) != 0)
765 			goto err;
766 		CDB___bam_ca_di(dbp, PGNO(h), indx, -1);
767 
768 		/* Delete all but the first reference to the key. */
769 		if (--cnt == 0)
770 			break;
771 		if ((ret = CDB___bam_adjindx(dbc, h, indx, first, 0)) != 0)
772 			goto err;
773 		CDB___bam_ca_di(dbp, PGNO(h), indx, -1);
774 	}
775 
776 	/* Put in a new data item that points to the duplicates page. */
777 	if ((ret = __bam_ovput(dbc, B_DUPLICATE, dp->pgno, h, indx, NULL)) != 0)
778 		goto err;
779 
780 	return (CDB_memp_fput(dbp->mpf, dp, DB_MPOOL_DIRTY));
781 
782 err:	(void)CDB___db_free(dbc, dp);
783 	return (ret);
784 }
785 
786 /*
787  * __bam_ovput --
788  *	Build an item for an off-page duplicates page or overflow page and
789  *	insert it on the page.
790  */
791 static int
__bam_ovput(dbc,type,pgno,h,indx,item)792 __bam_ovput(dbc, type, pgno, h, indx, item)
793 	DBC *dbc;
794 	u_int32_t type, indx;
795 	db_pgno_t pgno;
796 	PAGE *h;
797 	DBT *item;
798 {
799 	BOVERFLOW bo;
800 	DBT hdr;
801 	int ret;
802 
803 	UMRW(bo.unused1);
804 	B_TSET(bo.type, type, 0);
805 	UMRW(bo.unused2);
806 
807 	/*
808 	 * If we're creating an overflow item, do so and acquire the page
809 	 * number for it.  If we're creating an off-page duplicates tree,
810 	 * we are giving the page number as an argument.
811 	 */
812 	if (type == B_OVERFLOW) {
813 		if ((ret = CDB___db_poff(dbc, item, &bo.pgno)) != 0)
814 			return (ret);
815 		bo.tlen = item->size;
816 	} else {
817 		bo.pgno = pgno;
818 		bo.tlen = 0;
819 	}
820 
821 	/* Store the new record on the page. */
822 	memset(&hdr, 0, sizeof(hdr));
823 	hdr.data = &bo;
824 	hdr.size = BOVERFLOW_SIZE;
825 	return (CDB___db_pitem(dbc, h, indx, BOVERFLOW_SIZE, &hdr, NULL));
826 }
827