1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996, 2013 Oracle and/or its affiliates.  All rights reserved.
5  */
6 /*
7  * Copyright (c) 1990, 1993, 1994
8  *	The Regents of the University of California.  All rights reserved.
9  *
10  * This code is derived from software contributed to Berkeley by
11  * Margo Seltzer.
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions
15  * are met:
16  * 1. Redistributions of source code must retain the above copyright
17  *    notice, this list of conditions and the following disclaimer.
18  * 2. Redistributions in binary form must reproduce the above copyright
19  *    notice, this list of conditions and the following disclaimer in the
20  *    documentation and/or other materials provided with the distribution.
21  * 3. Neither the name of the University nor the names of its contributors
22  *    may be used to endorse or promote products derived from this software
23  *    without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
26  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
29  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
30  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
31  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
32  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
34  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35  * SUCH DAMAGE.
36  *
37  * $Id$
38  */
39 
40 /*
41  * PACKAGE:  hashing
42  *
43  * DESCRIPTION:
44  *	Manipulation of duplicates for the hash package.
45  */
46 
47 #include "db_config.h"
48 
49 #include "db_int.h"
50 #include "dbinc/db_page.h"
51 #include "dbinc/hash.h"
52 #include "dbinc/btree.h"
53 #include "dbinc/mp.h"
54 
55 static int __hamc_chgpg __P((DBC *,
56     db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
57 static int __ham_check_move __P((DBC *, u_int32_t));
58 static int __ham_dcursor __P((DBC *, db_pgno_t, u_int32_t));
59 static int __ham_move_offpage __P((DBC *, PAGE *, u_int32_t, db_pgno_t));
60 static int __hamc_chgpg_func
61     __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
62 
63 /*
64  * Called from hash_access to add a duplicate key. nval is the new
65  * value that we want to add.  The flags correspond to the flag values
66  * to cursor_put indicating where to add the new element.
67  * There are 4 cases.
68  * Case 1: The existing duplicate set already resides on a separate page.
69  *	   We return and let the common code handle this.
70  * Case 2: The element is small enough to just be added to the existing set.
71  * Case 3: The element is large enough to be a big item, so we're going to
72  *	   have to push the set onto a new page.
73  * Case 4: The element is large enough to push the duplicate set onto a
74  *	   separate page.
75  *
76  * PUBLIC: int __ham_add_dup __P((DBC *, DBT *, u_int32_t, db_pgno_t *));
77  */
78 int
__ham_add_dup(dbc,nval,flags,pgnop)79 __ham_add_dup(dbc, nval, flags, pgnop)
80 	DBC *dbc;
81 	DBT *nval;
82 	u_int32_t flags;
83 	db_pgno_t *pgnop;
84 {
85 	DB *dbp;
86 	DBT pval, tmp_val;
87 	DB_MPOOLFILE *mpf;
88 	ENV *env;
89 	HASH_CURSOR *hcp;
90 	u_int32_t add_bytes, new_size;
91 	int cmp, ret;
92 	u_int8_t *hk;
93 
94 	dbp = dbc->dbp;
95 	env = dbp->env;
96 	mpf = dbp->mpf;
97 	hcp = (HASH_CURSOR *)dbc->internal;
98 
99 	DB_ASSERT(env, flags != DB_CURRENT);
100 
101 	add_bytes = nval->size +
102 	    (F_ISSET(nval, DB_DBT_PARTIAL) ? nval->doff : 0);
103 	add_bytes = DUP_SIZE(add_bytes);
104 
105 	if ((ret = __ham_check_move(dbc, add_bytes)) != 0)
106 		return (ret);
107 
108 	/*
109 	 * Check if resulting duplicate set is going to need to go
110 	 * onto a separate duplicate page.  If so, convert the
111 	 * duplicate set and add the new one.  After conversion,
112 	 * hcp->dndx is the first free ndx or the index of the
113 	 * current pointer into the duplicate set.
114 	 */
115 	hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
116 	/* Add the len bytes to the current singleton. */
117 	if (HPAGE_PTYPE(hk) != H_DUPLICATE)
118 		add_bytes += DUP_SIZE(0);
119 	new_size =
120 	    LEN_HKEYDATA(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx)) +
121 	    add_bytes;
122 
123 	/*
124 	 * We convert to off-page duplicates if the item is a big item,
125 	 * the addition of the new item will make the set large, or
126 	 * if there isn't enough room on this page to add the next item.
127 	 */
128 	if (HPAGE_PTYPE(hk) != H_OFFDUP &&
129 	    (HPAGE_PTYPE(hk) == H_OFFPAGE || ISBIG(hcp, new_size) ||
130 	    add_bytes > P_FREESPACE(dbp, hcp->page))) {
131 
132 		if ((ret = __ham_dup_convert(dbc)) != 0)
133 			return (ret);
134 		return (hcp->opd->am_put(hcp->opd,
135 		    NULL, nval, flags, NULL));
136 	}
137 
138 	/* There are two separate cases here: on page and off page. */
139 	if (HPAGE_PTYPE(hk) != H_OFFDUP) {
140 		if (HPAGE_PTYPE(hk) != H_DUPLICATE) {
141 			pval.flags = 0;
142 			pval.data = HKEYDATA_DATA(hk);
143 			pval.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize,
144 			    hcp->indx);
145 			if ((ret = __ham_make_dup(env,
146 			    &pval, &tmp_val, &dbc->my_rdata.data,
147 			    &dbc->my_rdata.ulen)) != 0 || (ret =
148 			    __ham_replpair(dbc, &tmp_val, H_DUPLICATE)) != 0)
149 				return (ret);
150 			hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
151 			HPAGE_PTYPE(hk) = H_DUPLICATE;
152 
153 			/*
154 			 * Update the cursor position since we now are in
155 			 * duplicates.
156 			 */
157 			F_SET(hcp, H_ISDUP);
158 			hcp->dup_off = 0;
159 			hcp->dup_len = pval.size;
160 			hcp->dup_tlen = DUP_SIZE(hcp->dup_len);
161 		}
162 
163 		/* Now make the new entry a duplicate. */
164 		if ((ret = __ham_make_dup(env, nval,
165 		    &tmp_val, &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0)
166 			return (ret);
167 
168 		tmp_val.dlen = 0;
169 		switch (flags) {			/* On page. */
170 		case DB_KEYFIRST:
171 		case DB_KEYLAST:
172 		case DB_NODUPDATA:
173 		case DB_OVERWRITE_DUP:
174 			if (dbp->dup_compare != NULL) {
175 				__ham_dsearch(dbc,
176 				    nval, &tmp_val.doff, &cmp, flags);
177 
178 				/*
179 				 * Duplicate duplicates are not supported w/
180 				 * sorted dups.  We can either overwrite or
181 				 * return DB_KEYEXIST.
182 				 */
183 				if (cmp == 0) {
184 					if (flags == DB_OVERWRITE_DUP)
185 						return (__ham_overwrite(dbc,
186 						    nval, flags));
187 					return (__db_duperr(dbp, flags));
188 				}
189 			} else {
190 				hcp->dup_tlen = LEN_HDATA(dbp, hcp->page,
191 				    dbp->pgsize, hcp->indx);
192 				hcp->dup_len = nval->size;
193 				F_SET(hcp, H_ISDUP);
194 				if (flags == DB_KEYFIRST)
195 					hcp->dup_off = tmp_val.doff = 0;
196 				else
197 					hcp->dup_off =
198 					    tmp_val.doff = hcp->dup_tlen;
199 			}
200 			break;
201 		case DB_BEFORE:
202 			tmp_val.doff = hcp->dup_off;
203 			break;
204 		case DB_AFTER:
205 			tmp_val.doff = hcp->dup_off + DUP_SIZE(hcp->dup_len);
206 			break;
207 		default:
208 			return (__db_unknown_path(env, "__ham_add_dup"));
209 		}
210 
211 		/* Add the duplicate. */
212 		if ((ret = __memp_dirty(mpf, &hcp->page,
213 		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 ||
214 		    (ret = __ham_replpair(dbc, &tmp_val, H_DUPLICATE)) != 0)
215 			return (ret);
216 
217 		/* Now, update the cursor if necessary. */
218 		switch (flags) {
219 		case DB_AFTER:
220 			hcp->dup_off += DUP_SIZE(hcp->dup_len);
221 			hcp->dup_len = nval->size;
222 			hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
223 			break;
224 		case DB_BEFORE:
225 		case DB_KEYFIRST:
226 		case DB_KEYLAST:
227 		case DB_NODUPDATA:
228 		case DB_OVERWRITE_DUP:
229 			hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
230 			hcp->dup_len = nval->size;
231 			break;
232 		default:
233 			return (__db_unknown_path(env, "__ham_add_dup"));
234 		}
235 		ret = __hamc_update(dbc, tmp_val.size, DB_HAM_CURADJ_ADD, 1);
236 		return (ret);
237 	}
238 
239 	/*
240 	 * If we get here, then we're on duplicate pages; set pgnop and
241 	 * return so the common code can handle it.
242 	 */
243 	memcpy(pgnop, HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)),
244 	    sizeof(db_pgno_t));
245 
246 	return (ret);
247 }
248 
249 /*
250  * Convert an on-page set of duplicates to an offpage set of duplicates.
251  *
252  * PUBLIC: int __ham_dup_convert __P((DBC *));
253  */
254 int
__ham_dup_convert(dbc)255 __ham_dup_convert(dbc)
256 	DBC *dbc;
257 {
258 	BOVERFLOW bo;
259 	DB *dbp;
260 	DBC **hcs;
261 	DBT dbt;
262 	DB_LSN lsn;
263 	DB_MPOOLFILE *mpf;
264 	ENV *env;
265 	HASH_CURSOR *hcp;
266 	HOFFPAGE ho;
267 	PAGE *dp;
268 	db_indx_t i, len, off;
269 	int c, ret, t_ret;
270 	u_int8_t *p, *pend;
271 
272 	dbp = dbc->dbp;
273 	env = dbp->env;
274 	mpf = dbp->mpf;
275 	hcp = (HASH_CURSOR *)dbc->internal;
276 
277 	/*
278 	 * Create a new page for the duplicates.
279 	 */
280 	if ((ret = __db_new(dbc,
281 	    dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, NULL, &dp)) != 0)
282 		return (ret);
283 	P_INIT(dp, dbp->pgsize,
284 	    dp->pgno, PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp));
285 
286 	/*
287 	 * Get the list of cursors that may need to be updated.
288 	 */
289 	if ((ret = __ham_get_clist(dbp,
290 	    PGNO(hcp->page), (u_int32_t)hcp->indx, &hcs)) != 0)
291 		goto err;
292 
293 	/*
294 	 * Now put the duplicates onto the new page.
295 	 */
296 	dbt.flags = 0;
297 	switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) {
298 	case H_KEYDATA:
299 		/* Simple case, one key on page; move it to dup page. */
300 		dbt.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
301 		dbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
302 		ret = __db_pitem(dbc,
303 		    dp, 0, BKEYDATA_SIZE(dbt.size), NULL, &dbt);
304 		goto finish;
305 	case H_OFFPAGE:
306 		/* Simple case, one key on page; move it to dup page. */
307 		memcpy(&ho, P_ENTRY(dbp, hcp->page, H_DATAINDEX(hcp->indx)),
308 		    HOFFPAGE_SIZE);
309 		UMRW_SET(bo.unused1);
310 		B_TSET(bo.type, ho.type);
311 		UMRW_SET(bo.unused2);
312 		bo.pgno = ho.pgno;
313 		bo.tlen = ho.tlen;
314 		dbt.size = BOVERFLOW_SIZE;
315 		dbt.data = &bo;
316 
317 		ret = __db_pitem(dbc, dp, 0, dbt.size, &dbt, NULL);
318 finish:		if (ret == 0) {
319 			/* Update any other cursors. */
320 			if (hcs != NULL && DBC_LOGGING(dbc) &&
321 			    IS_SUBTRANSACTION(dbc->txn)) {
322 				if ((ret = __ham_chgpg_log(dbp, dbc->txn,
323 				    &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
324 				    PGNO(dp), hcp->indx, 0)) != 0)
325 					break;
326 			}
327 			for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
328 				if ((ret = __ham_dcursor(hcs[c],
329 				    PGNO(dp), 0)) != 0)
330 					break;
331 		}
332 		break;
333 	case H_DUPLICATE:
334 		p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
335 		pend = p +
336 		    LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
337 
338 		/*
339 		 * We need to maintain the duplicate cursor position.
340 		 * Keep track of where we are in the duplicate set via
341 		 * the offset, and when it matches the one in the cursor,
342 		 * set the off-page duplicate cursor index to the current
343 		 * index.
344 		 */
345 		for (off = 0, i = 0; p < pend; i++) {
346 			memcpy(&len, p, sizeof(db_indx_t));
347 			dbt.size = len;
348 			p += sizeof(db_indx_t);
349 			dbt.data = p;
350 			p += len + sizeof(db_indx_t);
351 			if ((ret = __db_pitem(dbc, dp,
352 			    i, BKEYDATA_SIZE(dbt.size), NULL, &dbt)) != 0)
353 				break;
354 
355 			/* Update any other cursors */
356 			if (hcs != NULL && DBC_LOGGING(dbc) &&
357 			    IS_SUBTRANSACTION(dbc->txn)) {
358 				if ((ret = __ham_chgpg_log(dbp, dbc->txn,
359 				    &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
360 				    PGNO(dp), hcp->indx, i)) != 0)
361 					break;
362 			}
363 			for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
364 				if (((HASH_CURSOR *)(hcs[c]->internal))->dup_off
365 				    == off && (ret = __ham_dcursor(hcs[c],
366 				    PGNO(dp), i)) != 0)
367 					goto err;
368 			off += len + 2 * sizeof(db_indx_t);
369 		}
370 		break;
371 	default:
372 		ret = __db_pgfmt(env, hcp->pgno);
373 		break;
374 	}
375 
376 	/*
377 	 * Now attach this to the source page in place of the old duplicate
378 	 * item.
379 	 */
380 	if (ret == 0)
381 		ret = __memp_dirty(mpf,
382 		    &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0);
383 
384 	if (ret == 0)
385 		ret = __ham_move_offpage(dbc, hcp->page,
386 		    (u_int32_t)H_DATAINDEX(hcp->indx), PGNO(dp));
387 
388 err:	if ((t_ret = __memp_fput(mpf,
389 	    dbc->thread_info, dp, dbc->priority)) != 0 && ret == 0)
390 		ret = t_ret;
391 
392 	if (ret == 0)
393 		hcp->dup_tlen = hcp->dup_off = hcp->dup_len = 0;
394 
395 	if (hcs != NULL)
396 		__os_free(env, hcs);
397 
398 	return (ret);
399 }
400 
401 /*
402  * __ham_make_dup
403  *
404  * Take a regular dbt and make it into a duplicate item with all the partial
405  * information set appropriately. If the incoming dbt is a partial, assume
406  * we are creating a new entry and make sure that we do any initial padding.
407  *
408  * PUBLIC: int __ham_make_dup __P((ENV *,
409  * PUBLIC:     const DBT *, DBT *d, void **, u_int32_t *));
410  */
411 int
__ham_make_dup(env,notdup,duplicate,bufp,sizep)412 __ham_make_dup(env, notdup, duplicate, bufp, sizep)
413 	ENV *env;
414 	const DBT *notdup;
415 	DBT *duplicate;
416 	void **bufp;
417 	u_int32_t *sizep;
418 {
419 	db_indx_t tsize, item_size;
420 	int ret;
421 	u_int8_t *p;
422 
423 	item_size = (db_indx_t)notdup->size;
424 	if (F_ISSET(notdup, DB_DBT_PARTIAL))
425 		item_size += notdup->doff;
426 
427 	tsize = DUP_SIZE(item_size);
428 	if ((ret = __ham_init_dbt(env, duplicate, tsize, bufp, sizep)) != 0)
429 		return (ret);
430 
431 	duplicate->dlen = 0;
432 	duplicate->flags = notdup->flags;
433 	F_SET(duplicate, DB_DBT_PARTIAL);
434 
435 	p = duplicate->data;
436 	memcpy(p, &item_size, sizeof(db_indx_t));
437 	p += sizeof(db_indx_t);
438 	if (F_ISSET(notdup, DB_DBT_PARTIAL)) {
439 		memset(p, 0, notdup->doff);
440 		p += notdup->doff;
441 	}
442 	memcpy(p, notdup->data, notdup->size);
443 	p += notdup->size;
444 	memcpy(p, &item_size, sizeof(db_indx_t));
445 
446 	duplicate->doff = 0;
447 	duplicate->dlen = notdup->size;
448 
449 	return (0);
450 }
451 
452 /*
453  * __ham_check_move --
454  *
455  * Check if we can do whatever we need to on this page.  If not,
456  * then we'll have to move the current element to a new page.
457  */
458 static int
__ham_check_move(dbc,add_len)459 __ham_check_move(dbc, add_len)
460 	DBC *dbc;
461 	u_int32_t add_len;
462 {
463 	DB *dbp;
464 	DBT k, d;
465 	DB_LSN new_lsn;
466 	DB_MPOOLFILE *mpf;
467 	HASH_CURSOR *hcp;
468 	PAGE *new_pagep, *next_pagep;
469 	db_pgno_t next_pgno;
470 	u_int32_t data_type, key_type, new_datalen, old_len;
471 	db_indx_t new_indx;
472 	u_int8_t *hk;
473 	int found, match, ret;
474 
475 	dbp = dbc->dbp;
476 	mpf = dbp->mpf;
477 	hcp = (HASH_CURSOR *)dbc->internal;
478 
479 	hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
480 	found = 0;
481 
482 	/*
483 	 * If the item is already off page duplicates or an offpage item,
484 	 * then we know we can do whatever we need to do in-place
485 	 */
486 	if (HPAGE_PTYPE(hk) == H_OFFDUP || HPAGE_PTYPE(hk) == H_OFFPAGE)
487 		return (0);
488 
489 	old_len =
490 	    LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx));
491 	new_datalen = (old_len - HKEYDATA_SIZE(0)) + add_len;
492 	if (HPAGE_PTYPE(hk) != H_DUPLICATE)
493 		new_datalen += DUP_SIZE(0);
494 
495 	/*
496 	 * We need to add a new page under two conditions:
497 	 * 1. The addition makes the total data length cross the BIG
498 	 *    threshold and the OFFDUP structure won't fit on this page.
499 	 * 2. The addition does not make the total data cross the
500 	 *    threshold, but the new data won't fit on the page.
501 	 * If neither of these is true, then we can return.
502 	 */
503 	if (ISBIG(hcp, new_datalen) && (old_len > HOFFDUP_SIZE ||
504 	    HOFFDUP_SIZE - old_len <= P_FREESPACE(dbp, hcp->page)))
505 		return (0);
506 
507 	if (!ISBIG(hcp, new_datalen) &&
508 	    (new_datalen - old_len) <= P_FREESPACE(dbp, hcp->page))
509 		return (0);
510 
511 	/*
512 	 * If we get here, then we need to move the item to a new page.
513 	 * Check if there are more pages in the chain.  We now need to
514 	 * update new_datalen to include the size of both the key and
515 	 * the data that we need to move.
516 	 */
517 
518 	new_datalen = ISBIG(hcp, new_datalen) ?
519 	    HOFFDUP_SIZE : HKEYDATA_SIZE(new_datalen);
520 	new_datalen +=
521 	    LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_KEYINDEX(hcp->indx));
522 
523 	new_pagep = NULL;
524 	next_pagep = hcp->page;
525 	for (next_pgno = NEXT_PGNO(hcp->page); next_pgno != PGNO_INVALID;
526 	    next_pgno = NEXT_PGNO(next_pagep)) {
527 		if (next_pagep != hcp->page && (ret = __memp_fput(mpf,
528 		    dbc->thread_info, next_pagep, dbc->priority)) != 0)
529 			return (ret);
530 
531 		if ((ret = __memp_fget(mpf,
532 		    &next_pgno, dbc->thread_info, dbc->txn,
533 		    DB_MPOOL_CREATE, &next_pagep)) != 0)
534 			return (ret);
535 
536 		if (P_FREESPACE(dbp, next_pagep) >= new_datalen) {
537 			found = 1;
538 			break;
539 		}
540 	}
541 
542 	if (found != 0) {
543 		/* Found a page with space, dirty it and the original. */
544 		new_pagep = next_pagep;
545 		if ((ret = __memp_dirty(mpf, &hcp->page,
546 		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
547 			goto err;
548 		if ((ret = __memp_dirty(mpf, &new_pagep,
549 		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
550 			goto err;
551 	} else {
552 		if ((ret = __memp_dirty(mpf, &next_pagep,
553 		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
554 			goto err;
555 
556 		/* Add new page at the end of the chain. */
557 		new_pagep = next_pagep;
558 		if ((ret = __ham_add_ovflpage(dbc, &new_pagep)) != 0)
559 			goto err;
560 
561 		if (next_pagep != hcp->page) {
562 			if ((ret = __memp_fput(mpf,
563 			    dbc->thread_info, next_pagep, dbc->priority)) != 0)
564 				goto err;
565 			next_pagep = NULL;
566 			/* Dirty the original page to update it. */
567 			if ((ret = __memp_dirty(mpf, &hcp->page,
568 			    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
569 				goto err;
570 		}
571 	}
572 
573 	/* Copy the item to the new page. */
574 	if (DBC_LOGGING(dbc)) {
575 		memset(&k, 0, sizeof(DBT));
576 		d.flags = 0;
577 		if (HPAGE_PTYPE(
578 		    H_PAIRKEY(dbp, hcp->page, hcp->indx)) == H_OFFPAGE) {
579 			k.data = H_PAIRKEY(dbp, hcp->page, hcp->indx);
580 			k.size = HOFFPAGE_SIZE;
581 			key_type = H_OFFPAGE;
582 		} else {
583 			k.data =
584 			    HKEYDATA_DATA(H_PAIRKEY(dbp, hcp->page, hcp->indx));
585 			k.size =
586 			    LEN_HKEY(dbp, hcp->page, dbp->pgsize, hcp->indx);
587 			key_type = H_KEYDATA;
588 		}
589 
590 		/* Resolve the insert index so it can be written to the log. */
591 		if ((ret = __ham_getindex(dbc, new_pagep, &k,
592 		    key_type, &match, &new_indx)) != 0)
593 			return (ret);
594 
595 		if ((data_type = HPAGE_PTYPE(hk)) == H_OFFPAGE) {
596 			d.data = hk;
597 			d.size = HOFFPAGE_SIZE;
598 		} else if (data_type == H_OFFDUP) {
599 			d.data = hk;
600 			d.size = HOFFDUP_SIZE;
601 		} else {
602 			d.data = HKEYDATA_DATA(hk);
603 			d.size = LEN_HDATA(dbp,
604 			     hcp->page, dbp->pgsize, hcp->indx);
605 		}
606 
607 		if ((ret = __ham_insdel_log(dbp, dbc->txn, &new_lsn,
608 		    0, PUTPAIR, PGNO(new_pagep), (u_int32_t)new_indx,
609 		    &LSN(new_pagep), OP_SET(key_type, new_pagep), &k,
610 		    OP_SET(data_type, new_pagep), &d)) != 0) {
611 			(void)__memp_fput(mpf,
612 			    dbc->thread_info, new_pagep, dbc->priority);
613 			return (ret);
614 		}
615 	} else {
616 		LSN_NOT_LOGGED(new_lsn);
617 		/*
618 		 * Ensure that an invalid index is passed to __ham_copypair, so
619 		 * it knows to resolve the index. Resolving the insert index
620 		 * here would require creating a temporary DBT with the key,
621 		 * and calling __ham_getindex. Let __ham_copypair do the
622 		 * resolution using the final key DBT.
623 		 */
624 		new_indx = NDX_INVALID;
625 	}
626 
627 	/* Move lsn onto page. */
628 	LSN(new_pagep) = new_lsn;	/* Structure assignment. */
629 
630 	if ((ret = __ham_copypair(dbc, hcp->page,
631 	    H_KEYINDEX(hcp->indx), new_pagep, &new_indx, 0)) != 0)
632 		goto err;
633 
634 	/* Update all cursors that used to point to this item. */
635 	if ((ret = __hamc_chgpg(dbc, PGNO(hcp->page), H_KEYINDEX(hcp->indx),
636 	    PGNO(new_pagep), new_indx)) != 0)
637 		goto err;
638 
639 	/* Now delete the pair from the current page. */
640 	if ((ret = __ham_del_pair(dbc, HAM_DEL_NO_RECLAIM, NULL)) != 0)
641 		goto err;
642 
643 	/*
644 	 * __ham_del_pair decremented nelem.  This is incorrect;  we
645 	 * manually copied the element elsewhere, so the total number
646 	 * of elements hasn't changed.  Increment it again.
647 	 *
648 	 * !!!
649 	 * Note that we still have the metadata page pinned, and
650 	 * __ham_del_pair dirtied it, so we don't need to set the dirty
651 	 * flag again.
652 	 */
653 	if (!STD_LOCKING(dbc))
654 		hcp->hdr->nelem++;
655 
656 	ret = __memp_fput(mpf, dbc->thread_info, hcp->page, dbc->priority);
657 	hcp->page = new_pagep;
658 	hcp->pgno = PGNO(hcp->page);
659 	hcp->indx = new_indx;
660 	F_SET(hcp, H_EXPAND);
661 	F_CLR(hcp, H_DELETED);
662 
663 	return (ret);
664 
665 err:	if (new_pagep != NULL)
666 		(void)__memp_fput(mpf,
667 			dbc->thread_info, new_pagep, dbc->priority);
668 	if (next_pagep != NULL &&
669 	    next_pagep != hcp->page && next_pagep != new_pagep)
670 		(void)__memp_fput(mpf,
671 			dbc->thread_info, next_pagep, dbc->priority);
672 	return (ret);
673 
674 }
675 
676 /*
677  * __ham_move_offpage --
678  *	Replace an onpage set of duplicates with the OFFDUP structure
679  *	that references the duplicate page.
680  *
681  * XXX
682  * This is really just a special case of __onpage_replace; we should
683  * probably combine them.
684  *
685  */
686 static int
__ham_move_offpage(dbc,pagep,ndx,pgno)687 __ham_move_offpage(dbc, pagep, ndx, pgno)
688 	DBC *dbc;
689 	PAGE *pagep;
690 	u_int32_t ndx;
691 	db_pgno_t pgno;
692 {
693 	DB *dbp;
694 	DBT new_dbt;
695 	DBT old_dbt;
696 	HOFFDUP od;
697 	db_indx_t i, *inp;
698 	int32_t difflen;
699 	u_int8_t *src;
700 	int ret;
701 
702 	dbp = dbc->dbp;
703 	od.type = H_OFFDUP;
704 	UMRW_SET(od.unused[0]);
705 	UMRW_SET(od.unused[1]);
706 	UMRW_SET(od.unused[2]);
707 	od.pgno = pgno;
708 	ret = 0;
709 
710 	if (DBC_LOGGING(dbc)) {
711 		HKEYDATA *hk;
712 		new_dbt.data = &od;
713 		new_dbt.size = HOFFDUP_SIZE;
714 		hk = (HKEYDATA *)P_ENTRY(dbp, pagep, ndx);
715 		if (hk->type == H_KEYDATA || hk->type == H_DUPLICATE) {
716 			old_dbt.data = hk->data;
717 			old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) -
718 			     SSZA(HKEYDATA, data);
719 		} else {
720 			old_dbt.data = hk;
721 			old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx);
722 		}
723 		if ((ret = __ham_replace_log(dbp, dbc->txn, &LSN(pagep), 0,
724 		    PGNO(pagep), (u_int32_t)ndx, &LSN(pagep), -1,
725 		    OP_SET(hk->type, pagep), &old_dbt,
726 		    OP_SET(H_OFFDUP, pagep), &new_dbt)) != 0)
727 			return (ret);
728 	} else
729 		LSN_NOT_LOGGED(LSN(pagep));
730 
731 	/*
732 	 * difflen is the difference in the lengths, and so may be negative.
733 	 * We know that the difference between two unsigned lengths from a
734 	 * database page will fit into an int32_t.
735 	 */
736 	difflen =
737 	    (int32_t)LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) -
738 	    (int32_t)HOFFDUP_SIZE;
739 	if (difflen != 0) {
740 		/* Copy data. */
741 		inp = P_INP(dbp, pagep);
742 		src = (u_int8_t *)(pagep) + HOFFSET(pagep);
743 		memmove(src + difflen, src, inp[ndx] - HOFFSET(pagep));
744 		HOFFSET(pagep) += difflen;
745 
746 		/* Update index table. */
747 		for (i = ndx; i < NUM_ENT(pagep); i++)
748 			inp[i] += difflen;
749 	}
750 
751 	/* Now copy the offdup entry onto the page. */
752 	memcpy(P_ENTRY(dbp, pagep, ndx), &od, HOFFDUP_SIZE);
753 	return (ret);
754 }
755 
756 /*
757  * __ham_dsearch:
758  *	Locate a particular duplicate in a duplicate set.  Make sure that
759  *	we exit with the cursor set appropriately.
760  *
761  * PUBLIC: void __ham_dsearch
762  * PUBLIC:     __P((DBC *, DBT *, u_int32_t *, int *, u_int32_t));
763  */
764 void
__ham_dsearch(dbc,dbt,offp,cmpp,flags)765 __ham_dsearch(dbc, dbt, offp, cmpp, flags)
766 	DBC *dbc;
767 	DBT *dbt;
768 	u_int32_t *offp, flags;
769 	int *cmpp;
770 {
771 	DB *dbp;
772 	DBT cur;
773 	HASH_CURSOR *hcp;
774 	db_indx_t i, len;
775 	int (*func) __P((DB *, const DBT *, const DBT *));
776 	u_int8_t *data;
777 
778 	dbp = dbc->dbp;
779 	hcp = (HASH_CURSOR *)dbc->internal;
780 	func = dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare;
781 
782 	i = F_ISSET(hcp, H_CONTINUE) ? hcp->dup_off: 0;
783 	data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) + i;
784 	hcp->dup_tlen = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
785 	len = hcp->dup_len;
786 	while (i < hcp->dup_tlen) {
787 		memcpy(&len, data, sizeof(db_indx_t));
788 		data += sizeof(db_indx_t);
789 		DB_SET_DBT(cur, data, len);
790 
791 		/*
792 		 * If we find an exact match, we're done.  If in a sorted
793 		 * duplicate set and the item is larger than our test item,
794 		 * we're done.  In the latter case, if permitting partial
795 		 * matches, it's not a failure.
796 		 */
797 		*cmpp = func(dbp, dbt, &cur);
798 		if (*cmpp == 0)
799 			break;
800 		if (*cmpp < 0 && dbp->dup_compare != NULL) {
801 			if (flags == DB_GET_BOTH_RANGE)
802 				*cmpp = 0;
803 			break;
804 		}
805 
806 		i += len + 2 * sizeof(db_indx_t);
807 		data += len + sizeof(db_indx_t);
808 	}
809 
810 	*offp = i;
811 	hcp->dup_off = i;
812 	hcp->dup_len = len;
813 	F_SET(hcp, H_ISDUP);
814 }
815 
816 /*
817  * __ham_dcursor --
818  *
819  *	Create an off page duplicate cursor for this cursor.
820  */
821 static int
__ham_dcursor(dbc,pgno,indx)822 __ham_dcursor(dbc, pgno, indx)
823 	DBC *dbc;
824 	db_pgno_t pgno;
825 	u_int32_t indx;
826 {
827 	BTREE_CURSOR *dcp;
828 	DB *dbp;
829 	HASH_CURSOR *hcp;
830 	int ret;
831 
832 	dbp = dbc->dbp;
833 	hcp = (HASH_CURSOR *)dbc->internal;
834 
835 	if ((ret = __dbc_newopd(dbc, pgno, hcp->opd, &hcp->opd)) != 0)
836 		return (ret);
837 
838 	dcp = (BTREE_CURSOR *)hcp->opd->internal;
839 	dcp->pgno = pgno;
840 	dcp->indx = indx;
841 
842 	if (dbp->dup_compare == NULL) {
843 		/*
844 		 * Converting to off-page Recno trees is tricky.  The
845 		 * record number for the cursor is the index + 1 (to
846 		 * convert to 1-based record numbers).
847 		 */
848 		dcp->recno = indx + 1;
849 	}
850 
851 	/*
852 	 * Transfer the deleted flag from the top-level cursor to the
853 	 * created one.
854 	 */
855 	if (F_ISSET(hcp, H_DELETED)) {
856 		F_SET(dcp, C_DELETED);
857 		F_CLR(hcp, H_DELETED);
858 	}
859 
860 	return (0);
861 }
862 
863 struct __hamc_chgpg_args {
864 	db_pgno_t new_pgno;
865 	db_indx_t new_index;
866 	DB_TXN *my_txn;
867 };
868 
869 static int
__hamc_chgpg_func(cp,my_dbc,foundp,old_pgno,old_index,vargs)870 __hamc_chgpg_func(cp, my_dbc, foundp, old_pgno, old_index, vargs)
871 	DBC *cp, *my_dbc;
872 	u_int32_t *foundp;
873 	db_pgno_t old_pgno;
874 	u_int32_t old_index;
875 	void *vargs;
876 {
877 	HASH_CURSOR *hcp;
878 	struct __hamc_chgpg_args *args;
879 
880 	if (cp == my_dbc || cp->dbtype != DB_HASH)
881 		return (0);
882 
883 	hcp = (HASH_CURSOR *)cp->internal;
884 
885 	/*
886 	 * If a cursor is deleted, it doesn't refer to this
887 	 * item--it just happens to have the same indx, but
888 	 * it points to a former neighbor.  Don't move it.
889 	 */
890 	if (F_ISSET(hcp, H_DELETED))
891 		return (0);
892 
893 	args = vargs;
894 
895 	if (hcp->pgno == old_pgno &&
896 	    hcp->indx == old_index &&
897 	    !MVCC_SKIP_CURADJ(cp, old_pgno)) {
898 		hcp->pgno = args->new_pgno;
899 		hcp->indx = args->new_index;
900 		if (args->my_txn != NULL && cp->txn != args->my_txn)
901 			*foundp = 1;
902 	}
903 	return (0);
904 }
905 
906 /*
907  * __hamc_chgpg --
908  *	Adjust the cursors after moving an item to a new page.  We only
909  *	move cursors that are pointing at this one item and are not
910  *	deleted;  since we only touch non-deleted cursors, and since
911  *	(by definition) no item existed at the pgno/indx we're moving the
912  *	item to, we're guaranteed that all the cursors we affect here or
913  *	on abort really do refer to this one item.
914  */
915 static int
__hamc_chgpg(dbc,old_pgno,old_index,new_pgno,new_index)916 __hamc_chgpg(dbc, old_pgno, old_index, new_pgno, new_index)
917 	DBC *dbc;
918 	db_pgno_t old_pgno, new_pgno;
919 	u_int32_t old_index, new_index;
920 {
921 	DB *dbp;
922 	DB_LSN lsn;
923 	int ret;
924 	u_int32_t found;
925 	struct __hamc_chgpg_args args;
926 
927 	dbp = dbc->dbp;
928 
929 	args.my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
930 	args.new_pgno = new_pgno;
931 	args.new_index = new_index;
932 
933 	if ((ret = __db_walk_cursors(dbp, dbc,
934 	    __hamc_chgpg_func, &found, old_pgno, old_index, &args)) != 0)
935 		return (ret);
936 	if (found != 0 && DBC_LOGGING(dbc)) {
937 		if ((ret = __ham_chgpg_log(dbp,
938 		    args.my_txn, &lsn, 0, DB_HAM_CHGPG,
939 		    old_pgno, new_pgno, old_index, new_index)) != 0)
940 			return (ret);
941 	}
942 	return (0);
943 }
944