1 /*-
2  * Copyright (c) 1996, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file LICENSE for license information.
5  */
6 /*
7  * Copyright (c) 1990, 1993, 1994, 1995, 1996
8  *	Keith Bostic.  All rights reserved.
9  */
10 /*
11  * Copyright (c) 1990, 1993, 1994, 1995
12  *	The Regents of the University of California.  All rights reserved.
13  *
14  * This code is derived from software contributed to Berkeley by
15  * Mike Olson.
16  *
17  * Redistribution and use in source and binary forms, with or without
18  * modification, are permitted provided that the following conditions
19  * are met:
20  * 1. Redistributions of source code must retain the above copyright
21  *    notice, this list of conditions and the following disclaimer.
22  * 2. Redistributions in binary form must reproduce the above copyright
23  *    notice, this list of conditions and the following disclaimer in the
24  *    documentation and/or other materials provided with the distribution.
25  * 3. Neither the name of the University nor the names of its contributors
26  *    may be used to endorse or promote products derived from this software
27  *    without specific prior written permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
30  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
31  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
32  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
33  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
34  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
35  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
37  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
38  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39  * SUCH DAMAGE.
40  *
41  * $Id$
42  */
43 
44 #include "db_config.h"
45 
46 #include "db_int.h"
47 #include "dbinc/db_page.h"
48 #include "dbinc/db_am.h"
49 #include "dbinc/mp.h"
50 
51 /*
52  * Big key/data code.
53  *
54  * Big key and data entries are stored on linked lists of pages.  The initial
55  * reference is a structure with the total length of the item and the page
56  * number where it begins.  Each entry in the linked list contains a pointer
57  * to the next page of data, and so on.
58  */
59 
60 /*
61  * __db_alloc_dbt
62  *
63  *	Allocate enough space in the dbt to hold the data. Also used by the
64  *	blob file API.
65  *
66  * PUBLIC: int __db_alloc_dbt __P((ENV *, DBT *, u_int32_t, u_int32_t *,
67  * PUBLIC:	u_int32_t *, void **, u_int32_t *));
68  */
69 int
__db_alloc_dbt(env,dbt,tlen,nd,st,bpp,bpsz)70 __db_alloc_dbt(env, dbt, tlen, nd, st, bpp, bpsz)
71 	ENV *env;
72 	DBT *dbt;
73 	u_int32_t tlen;
74 	u_int32_t *nd;
75 	u_int32_t *st;
76 	void **bpp;
77 	u_int32_t *bpsz;
78 {
79 	int ret;
80 	u_int32_t needed, start;
81 
82 	/*
83 	 * Check if the buffer is big enough; if it is not and we are
84 	 * allowed to malloc space, then we'll malloc it.  If we are
85 	 * not (DB_DBT_USERMEM), then we'll set the dbt and return
86 	 * appropriately.
87 	 */
88 	if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
89 		start = dbt->doff;
90 		if (start > tlen)
91 			needed = 0;
92 		else if (dbt->dlen > tlen - start)
93 			needed = tlen - start;
94 		else
95 			needed = dbt->dlen;
96 	} else {
97 		start = 0;
98 		needed = tlen;
99 	}
100 	*nd = needed;
101 	*st = start;
102 
103 	/*
104 	 * If the caller has not requested any data, return success. This
105 	 * "early-out" also avoids setting up the streaming optimization when
106 	 * no page would be retrieved. If it were removed, the streaming code
107 	 * should only initialize when needed is not 0.
108 	 */
109 	if (needed == 0) {
110 		dbt->size = 0;
111 		return (0);
112 	}
113 
114 	if (F_ISSET(dbt, DB_DBT_USERCOPY))
115 		return (0);
116 
117 	/* Allocate any necessary memory. */
118 	if (F_ISSET(dbt, DB_DBT_USERMEM)) {
119 		if (needed > dbt->ulen) {
120 			dbt->size = needed;
121 			return (DB_BUFFER_SMALL);
122 		}
123 	} else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
124 		if ((ret = __os_umalloc(env, needed, &dbt->data)) != 0)
125 			return (ret);
126 	} else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
127 		if ((ret = __os_urealloc(env, needed, &dbt->data)) != 0)
128 			return (ret);
129 	} else if (bpsz != NULL && (*bpsz == 0 || *bpsz < needed)) {
130 		if ((ret = __os_realloc(env, needed, bpp)) != 0)
131 			return (ret);
132 		*bpsz = needed;
133 		dbt->data = *bpp;
134 	} else if (bpp != NULL)
135 		dbt->data = *bpp;
136 	else {
137 		DB_ASSERT(env,
138 		    F_ISSET(dbt,
139 		    DB_DBT_USERMEM | DB_DBT_MALLOC | DB_DBT_REALLOC) ||
140 		    bpsz != NULL);
141 		return (DB_BUFFER_SMALL);
142 	}
143 
144 	return (0);
145 }
146 
147 /*
148  * __db_goff --
149  *	Get an offpage item.
150  *
151  * PUBLIC: int __db_goff __P((DBC *,
152  * PUBLIC:     DBT *, u_int32_t, db_pgno_t, void **, u_int32_t *));
153  */
154 int
__db_goff(dbc,dbt,tlen,pgno,bpp,bpsz)155 __db_goff(dbc, dbt, tlen, pgno, bpp, bpsz)
156 	DBC *dbc;
157 	DBT *dbt;
158 	u_int32_t tlen;
159 	db_pgno_t pgno;
160 	void **bpp;
161 	u_int32_t *bpsz;
162 {
163 	DB *dbp;
164 	DB_MPOOLFILE *mpf;
165 	DB_TXN *txn;
166 	DBC_INTERNAL *cp;
167 	ENV *env;
168 	PAGE *h;
169 	DB_THREAD_INFO *ip;
170 	db_indx_t bytes;
171 	u_int32_t curoff, needed, start;
172 	u_int8_t *p, *src;
173 	int ret;
174 
175 	dbp = dbc->dbp;
176 	cp = dbc->internal;
177 	env = dbp->env;
178 	ip = dbc->thread_info;
179 	mpf = dbp->mpf;
180 	txn = dbc->txn;
181 
182 	if (((ret = __db_alloc_dbt(
183 	    env, dbt, tlen, &needed, &start, bpp, bpsz)) != 0) || needed == 0)
184 		return (ret);
185 
186 	/* Set up a start page in the overflow chain if streaming. */
187 	if (cp->stream_start_pgno != PGNO_INVALID &&
188 	    pgno == cp->stream_start_pgno && start >= cp->stream_off &&
189 	    start < cp->stream_off + P_MAXSPACE(dbp, dbp->pgsize)) {
190 		pgno = cp->stream_curr_pgno;
191 		curoff = cp->stream_off;
192 	} else {
193 		cp->stream_start_pgno = cp->stream_curr_pgno = pgno;
194 		cp->stream_off = curoff = 0;
195 	}
196 
197 	/*
198 	 * Step through the linked list of pages, copying the data on each
199 	 * one into the buffer.  Never copy more than the total data length.
200 	 */
201 	dbt->size = needed;
202 	for (p = dbt->data; pgno != PGNO_INVALID && needed > 0;) {
203 		if ((ret = __memp_fget(mpf,
204 		    &pgno, ip, txn, 0, &h)) != 0)
205 			return (ret);
206 		DB_ASSERT(env, TYPE(h) == P_OVERFLOW);
207 
208 		/* Check if we need any bytes from this page. */
209 		if (curoff + OV_LEN(h) >= start) {
210 			bytes = OV_LEN(h);
211 			src = (u_int8_t *)h + P_OVERHEAD(dbp);
212 			if (start > curoff) {
213 				src += start - curoff;
214 				bytes -= start - curoff;
215 			}
216 			if (bytes > needed)
217 				bytes = needed;
218 			if (F_ISSET(dbt, DB_DBT_USERCOPY)) {
219 				/*
220 				 * The offset into the DBT is the total size
221 				 * less the amount of data still needed.  Care
222 				 * needs to be taken if doing a partial copy
223 				 * beginning at an offset other than 0.
224 				 */
225 				if ((ret = env->dbt_usercopy(
226 				    dbt, dbt->size - needed,
227 				    src, bytes, DB_USERCOPY_SETDATA)) != 0) {
228 					(void)__memp_fput(mpf,
229 					    ip, h, dbp->priority);
230 					return (ret);
231 				}
232 			} else
233 				memcpy(p, src, bytes);
234 			p += bytes;
235 			needed -= bytes;
236 		}
237 		cp->stream_off = curoff;
238 		curoff += OV_LEN(h);
239 		cp->stream_curr_pgno = pgno;
240 		pgno = h->next_pgno;
241 		(void)__memp_fput(mpf, ip, h, dbp->priority);
242 	}
243 
244 	return (0);
245 }
246 
247 /*
248  * __db_poff --
249  *	Put an offpage item.
250  *
251  * PUBLIC: int __db_poff __P((DBC *, const DBT *, db_pgno_t *));
252  */
253 int
__db_poff(dbc,dbt,pgnop)254 __db_poff(dbc, dbt, pgnop)
255 	DBC *dbc;
256 	const DBT *dbt;
257 	db_pgno_t *pgnop;
258 {
259 	DB *dbp;
260 	DBT tmp_dbt;
261 	DB_LSN null_lsn;
262 	DB_MPOOLFILE *mpf;
263 	PAGE *pagep, *lastp;
264 	db_indx_t pagespace;
265 	db_pgno_t pgno;
266 	u_int32_t space, sz, tlen;
267 	u_int8_t *p;
268 	int ret, t_ret;
269 
270 	COMPQUIET(tlen, 0);
271 
272 	/*
273 	 * Allocate pages and copy the key/data item into them.  Calculate the
274 	 * number of bytes we get for pages we fill completely with a single
275 	 * item.
276 	 */
277 	dbp = dbc->dbp;
278 	lastp = NULL;
279 	mpf = dbp->mpf;
280 	pagespace = P_MAXSPACE(dbp, dbp->pgsize);
281 	p = dbt->data;
282 	sz = dbt->size;
283 
284 	/*
285 	 * Check whether we are streaming at the end of the overflow item.
286 	 * If so, the last pgno and offset will be cached in the cursor.
287 	 */
288 	if (F_ISSET(dbt, DB_DBT_STREAMING)) {
289 		tlen = dbt->size - dbt->dlen;
290 		pgno = dbc->internal->stream_curr_pgno;
291 		if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info,
292 		    dbc->txn, DB_MPOOL_DIRTY, &lastp)) != 0)
293 			return (ret);
294 
295 		/*
296 		 * Calculate how much we can write on the last page of the
297 		 * overflow item.
298 		 */
299 		DB_ASSERT(dbp->env,
300 		    OV_LEN(lastp) == (tlen - dbc->internal->stream_off));
301 		space = pagespace - OV_LEN(lastp);
302 
303 		/* Only copy as much data as we have. */
304 		if (space > dbt->dlen)
305 			space = dbt->dlen;
306 
307 		if (DBC_LOGGING(dbc)) {
308 			tmp_dbt.data = dbt->data;
309 			tmp_dbt.size = space;
310 			ZERO_LSN(null_lsn);
311 			if ((ret = __db_big_log(dbp, dbc->txn, &LSN(lastp), 0,
312 			    OP_SET(DB_APPEND_BIG, lastp), pgno,
313 			    PGNO_INVALID, PGNO_INVALID, &tmp_dbt,
314 			    &LSN(lastp), &null_lsn, &null_lsn)) != 0)
315 				goto err;
316 		} else
317 			LSN_NOT_LOGGED(LSN(lastp));
318 
319 		memcpy((u_int8_t *)lastp + P_OVERHEAD(dbp) + OV_LEN(lastp),
320 		    dbt->data, space);
321 		OV_LEN(lastp) += space;
322 		sz -= space + dbt->doff;
323 		p += space;
324 		*pgnop = dbc->internal->stream_start_pgno;
325 	}
326 
327 	ret = 0;
328 	for (; sz > 0; p += pagespace, sz -= pagespace) {
329 		/*
330 		 * Reduce pagespace so we terminate the loop correctly and
331 		 * don't copy too much data.
332 		 */
333 		if (sz < pagespace)
334 			pagespace = sz;
335 
336 		/*
337 		 * Allocate and initialize a new page and copy all or part of
338 		 * the item onto the page.  If sz is less than pagespace, we
339 		 * have a partial record.
340 		 */
341 		if ((ret = __db_new(dbc, P_OVERFLOW, NULL, &pagep)) != 0)
342 			break;
343 		if (DBC_LOGGING(dbc)) {
344 			tmp_dbt.data = p;
345 			tmp_dbt.size = pagespace;
346 			ZERO_LSN(null_lsn);
347 			if ((ret = __db_big_log(dbp, dbc->txn, &LSN(pagep), 0,
348 			    OP_SET(DB_ADD_BIG, pagep),
349 			    PGNO(pagep), lastp ? PGNO(lastp) : PGNO_INVALID,
350 			    PGNO_INVALID, &tmp_dbt, &LSN(pagep),
351 			    lastp == NULL ? &null_lsn : &LSN(lastp),
352 			    &null_lsn)) != 0) {
353 				(void)__memp_fput(mpf, dbc->thread_info,
354 				    pagep, dbc->priority);
355 				goto err;
356 			}
357 		} else
358 			LSN_NOT_LOGGED(LSN(pagep));
359 
360 		/* Move LSN onto page. */
361 		if (lastp != NULL)
362 			LSN(lastp) = LSN(pagep);
363 
364 		OV_LEN(pagep) = pagespace;
365 		OV_REF(pagep) = 1;
366 		memcpy((u_int8_t *)pagep + P_OVERHEAD(dbp), p, pagespace);
367 
368 		/*
369 		 * If this is the first entry, update the user's info and
370 		 * initialize the cursor to allow for streaming of subsequent
371 		 * updates.  Otherwise, update the entry on the last page
372 		 * filled in and release that page.
373 		 */
374 		if (lastp == NULL) {
375 			*pgnop = PGNO(pagep);
376 			dbc->internal->stream_start_pgno =
377 			    dbc->internal->stream_curr_pgno = *pgnop;
378 			dbc->internal->stream_off = 0;
379 		} else {
380 			lastp->next_pgno = PGNO(pagep);
381 			pagep->prev_pgno = PGNO(lastp);
382 			if ((ret = __memp_fput(mpf,
383 			    dbc->thread_info, lastp, dbc->priority)) != 0) {
384 				lastp = NULL;
385 				goto err;
386 			}
387 		}
388 		lastp = pagep;
389 	}
390 err:	if (lastp != NULL) {
391 		if (ret == 0) {
392 			dbc->internal->stream_curr_pgno = PGNO(lastp);
393 			dbc->internal->stream_off = dbt->size - OV_LEN(lastp);
394 		}
395 
396 		if ((t_ret = __memp_fput(mpf, dbc->thread_info, lastp,
397 		    dbc->priority)) != 0 && ret == 0)
398 			ret = t_ret;
399 	}
400 	return (ret);
401 }
402 
403 /*
404  * __db_ovref --
405  *	Decrement the reference count on an overflow page.
406  *
407  * PUBLIC: int __db_ovref __P((DBC *, db_pgno_t));
408  */
409 int
__db_ovref(dbc,pgno)410 __db_ovref(dbc, pgno)
411 	DBC *dbc;
412 	db_pgno_t pgno;
413 {
414 	DB *dbp;
415 	DB_MPOOLFILE *mpf;
416 	PAGE *h;
417 	int ret;
418 
419 	dbp = dbc->dbp;
420 	mpf = dbp->mpf;
421 
422 	if ((ret = __memp_fget(mpf, &pgno,
423 	     dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &h)) != 0)
424 		return (ret);
425 
426 	if (DBC_LOGGING(dbc)) {
427 		if ((ret = __db_ovref_log(dbp,
428 		    dbc->txn, &LSN(h), 0, h->pgno, -1, &LSN(h))) != 0) {
429 			(void)__memp_fput(mpf,
430 			     dbc->thread_info, h, dbc->priority);
431 			return (ret);
432 		}
433 	} else
434 		LSN_NOT_LOGGED(LSN(h));
435 
436 	/*
437 	 * In BDB releases before 4.5, the overflow reference counts were
438 	 * incremented when an overflow item was split onto an internal
439 	 * page.  There was a lock race in that code, and rather than fix
440 	 * the race, we changed BDB to copy overflow items when splitting
441 	 * them onto internal pages.  The code to decrement reference
442 	 * counts remains so databases already in the field continue to
443 	 * work.
444 	 */
445 	--OV_REF(h);
446 
447 	return (__memp_fput(mpf, dbc->thread_info, h, dbc->priority));
448 }
449 
450 /*
451  * __db_doff --
452  *	Delete an offpage chain of overflow pages.
453  *
454  * PUBLIC: int __db_doff __P((DBC *, db_pgno_t));
455  */
456 int
__db_doff(dbc,pgno)457 __db_doff(dbc, pgno)
458 	DBC *dbc;
459 	db_pgno_t pgno;
460 {
461 	DB *dbp;
462 	DBT tmp_dbt;
463 	DB_LSN null_lsn;
464 	DB_MPOOLFILE *mpf;
465 	PAGE *pagep;
466 	int ret;
467 
468 	dbp = dbc->dbp;
469 	mpf = dbp->mpf;
470 
471 	do {
472 		if ((ret = __memp_fget(mpf, &pgno,
473 		     dbc->thread_info, dbc->txn, 0, &pagep)) != 0)
474 			return (ret);
475 
476 		DB_ASSERT(dbp->env, TYPE(pagep) == P_OVERFLOW);
477 		/*
478 		 * If it's referenced by more than one key/data item,
479 		 * decrement the reference count and return.
480 		 */
481 		if (OV_REF(pagep) > 1) {
482 			(void)__memp_fput(mpf,
483 			    dbc->thread_info, pagep, dbc->priority);
484 			return (__db_ovref(dbc, pgno));
485 		}
486 
487 		if ((ret = __memp_dirty(mpf, &pagep,
488 		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
489 			if (pagep != NULL)
490 				(void)__memp_fput(mpf,
491 				    dbc->thread_info, pagep, dbc->priority);
492 			return (ret);
493 		}
494 
495 		if (DBC_LOGGING(dbc)) {
496 			tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD(dbp);
497 			tmp_dbt.size = OV_LEN(pagep);
498 			ZERO_LSN(null_lsn);
499 			if ((ret = __db_big_log(dbp, dbc->txn, &LSN(pagep), 0,
500 			    OP_SET(DB_REM_BIG, pagep), PGNO(pagep),
501 			    PREV_PGNO(pagep), NEXT_PGNO(pagep), &tmp_dbt,
502 			    &LSN(pagep), &null_lsn, &null_lsn)) != 0) {
503 				(void)__memp_fput(mpf,
504 				    dbc->thread_info, pagep, dbc->priority);
505 				return (ret);
506 			}
507 		} else
508 			LSN_NOT_LOGGED(LSN(pagep));
509 		pgno = pagep->next_pgno;
510 		OV_LEN(pagep) = 0;
511 		if ((ret = __db_free(dbc, pagep, 0)) != 0)
512 			return (ret);
513 	} while (pgno != PGNO_INVALID);
514 
515 	return (0);
516 }
517 
518 /*
519  * __db_moff --
520  *	Match on overflow pages from a specific offset.
521  *
522  * Given a starting page number and a key, store <0, 0, >0 in 'cmpp' to indicate
523  * if the key on the page is less than, equal to or greater than the key
524  * specified. We optimize this by doing a chunk at a time comparison unless the
525  * user has specified a comparison function. In this case, we need to
526  * materialize the entire object and call their comparison routine.
527  *
528  * We start the comparison at an offset and update the offset with the
529  * longest matching count after the comparison.
530  *
531  * __db_moff and __db_coff are generic functions useful in searching and
532  * ordering off page items. __db_moff matches an overflow DBT with an offpage
533  * item. __db_coff compares two offpage items for lexicographic sort order.
534  *
535  * PUBLIC: int __db_moff __P((DBC *, const DBT *, db_pgno_t, u_int32_t,
536  * PUBLIC:     int (*)(DB *, const DBT *, const DBT *, size_t *),
537  * PUBLIC:     int *, size_t *));
538  */
539 int
__db_moff(dbc,dbt,pgno,tlen,cmpfunc,cmpp,locp)540 __db_moff(dbc, dbt, pgno, tlen, cmpfunc, cmpp, locp)
541 	DBC *dbc;
542 	const DBT *dbt;
543 	db_pgno_t pgno;
544 	u_int32_t tlen;
545 	int (*cmpfunc) __P((DB *, const DBT *, const DBT *, size_t *)), *cmpp;
546 	size_t *locp;
547 {
548 	DB *dbp;
549 	DBT local_dbt;
550 	DB_MPOOLFILE *mpf;
551 	DB_THREAD_INFO *ip;
552 	PAGE *pagep;
553 	void *buf;
554 	u_int32_t bufsize, cmp_bytes, key_left;
555 	u_int8_t *p1, *p2;
556 	int ret;
557 	size_t pos, start;
558 
559 	dbp = dbc->dbp;
560 	ip = dbc->thread_info;
561 	mpf = dbp->mpf;
562 
563 	/*
564 	 * If there is a user-specified comparison function, build a
565 	 * contiguous copy of the key, and call it.
566 	 */
567 	if (cmpfunc != NULL) {
568 		memset(&local_dbt, 0, sizeof(local_dbt));
569 		buf = NULL;
570 		bufsize = 0;
571 
572 		if ((ret = __db_goff(dbc,
573 		    &local_dbt, tlen, pgno, &buf, &bufsize)) != 0)
574 			return (ret);
575 		/* Pass the key as the first argument */
576 		*cmpp = cmpfunc(dbp, dbt, &local_dbt, NULL);
577 		__os_free(dbp->env, buf);
578 		return (0);
579 	}
580 
581 	/*
582 	 * We start the comparison from the location of 'locp' and store the
583 	 * last matching location into 'locp'.
584 	 */
585 	start = (locp == NULL ? 0 : *locp);
586 	pos = 0;
587 
588 	/* Subtract prefix length from lengths. */
589 	tlen -= (u_int32_t)start;
590 	key_left = dbt->size - (u_int32_t)start;
591 	p1 = (u_int8_t *)dbt->data + start;
592 
593 	/* While there are both keys to compare. */
594 	for (*cmpp = 0; key_left > 0 &&
595 	    tlen > 0 && pgno != PGNO_INVALID;) {
596 		if ((ret =
597 		    __memp_fget(mpf, &pgno, ip, dbc->txn, 0, &pagep)) != 0)
598 			return (ret);
599 
600 		/*
601 		 * Figure out where to start comparison, and how many
602 		 * bytes to compare.
603 		 */
604 		if (pos >= start) {
605 			p2 = (u_int8_t *)pagep + P_OVERHEAD(dbp);
606 			cmp_bytes = OV_LEN(pagep);
607 		} else if (pos + OV_LEN(pagep) > start) {
608 			p2 = (u_int8_t *)pagep +
609 			    P_OVERHEAD(dbp) + (start - pos);
610 			cmp_bytes = OV_LEN(pagep) - (u_int32_t)(start - pos);
611 		} else {
612 			p2 = NULL;
613 			cmp_bytes = 0;
614 		}
615 
616 		pos += OV_LEN(pagep);
617 
618 		if (cmp_bytes != 0) {
619 			if (cmp_bytes > key_left)
620 				cmp_bytes = key_left;
621 			tlen -= cmp_bytes;
622 			key_left -= cmp_bytes;
623 			for (;cmp_bytes-- > 0; ++p1, ++p2) {
624 				if (*p1 != *p2) {
625 					*cmpp = (long)*p1 - (long)*p2;
626 					break;
627 				}
628 				if (locp != NULL)
629 					++(*locp);
630 			}
631 
632 		}
633 		pgno = NEXT_PGNO(pagep);
634 		if ((ret = __memp_fput(mpf, ip, pagep, dbp->priority)) != 0)
635 			return (ret);
636 		if (*cmpp != 0)
637 			return (0);
638 	}
639 
640 	if (*cmpp == 0) {
641 		if (key_left > 0) /* DBT is longer than the page key. */
642 			*cmpp = 1;
643 		else if (tlen > 0) /* DBT is shorter than the page key. */
644 			*cmpp = -1;
645 	}
646 
647 	return (0);
648 }
649 
650 /*
651  * __db_coff --
652  *	Match two offpage dbts.
653  *
654  * The DBTs must both refer to offpage items.
655  * The match happens a chunk (page) at a time unless a user defined comparison
656  * function exists. It is not possible to optimize this comparison away when
657  * a lexicographic sort order is required on mismatch.
658  *
659  * NOTE: For now this function only works for H_OFFPAGE type items. It would
660  * be simple to extend it for use with B_OVERFLOW type items. It would only
661  * require extracting the total length, and page number, dependent on the
662  * DBT type.
663  *
664  * PUBLIC: int __db_coff __P((DBC *, const DBT *, const DBT *,
665  * PUBLIC:     int (*)(DB *, const DBT *, const DBT *, size_t *), int *));
666  */
667 int
__db_coff(dbc,dbt,match,cmpfunc,cmpp)668 __db_coff(dbc, dbt, match, cmpfunc, cmpp)
669 	DBC *dbc;
670 	const DBT *dbt, *match;
671 	int (*cmpfunc) __P((DB *, const DBT *, const DBT *, size_t *)), *cmpp;
672 {
673 	DB *dbp;
674 	DB_THREAD_INFO *ip;
675 	DB_MPOOLFILE *mpf;
676 	DB_TXN *txn;
677 	DBT local_key, local_match;
678 	PAGE *dbt_pagep, *match_pagep;
679 	db_pgno_t dbt_pgno, match_pgno;
680 	u_int32_t cmp_bytes, dbt_bufsz, dbt_len, match_bufsz;
681 	u_int32_t match_len, max_data, page_space;
682 	u_int8_t *p1, *p2;
683 	int ret;
684 	void *dbt_buf, *match_buf;
685 
686 	dbp = dbc->dbp;
687 	ip = dbc->thread_info;
688 	txn = dbc->txn;
689 	mpf = dbp->mpf;
690 	page_space = P_MAXSPACE(dbp, dbp->pgsize);
691 	*cmpp = 0;
692 	dbt_buf = match_buf = NULL;
693 
694 	DB_ASSERT(dbp->env, HPAGE_PTYPE(dbt->data) == H_OFFPAGE);
695 	DB_ASSERT(dbp->env, HPAGE_PTYPE(match->data) == H_OFFPAGE);
696 
697 	/* Extract potentially unaligned length and pgno fields from DBTs */
698 	memcpy(&dbt_len, HOFFPAGE_TLEN(dbt->data), sizeof(u_int32_t));
699 	memcpy(&dbt_pgno, HOFFPAGE_PGNO(dbt->data), sizeof(db_pgno_t));
700 	memcpy(&match_len, HOFFPAGE_TLEN(match->data), sizeof(u_int32_t));
701 	memcpy(&match_pgno, HOFFPAGE_PGNO(match->data), sizeof(db_pgno_t));
702 	max_data = (dbt_len < match_len ? dbt_len : match_len);
703 
704 	/*
705 	 * If there is a custom comparator, fully resolve both DBTs.
706 	 * Then call the users comparator.
707 	 */
708 	if (cmpfunc != NULL) {
709 		memset(&local_key, 0, sizeof(local_key));
710 		memset(&local_match, 0, sizeof(local_match));
711 		dbt_buf = match_buf = NULL;
712 		dbt_bufsz = match_bufsz = 0;
713 
714 		if ((ret = __db_goff(dbc, &local_key, dbt_len,
715 		    dbt_pgno, &dbt_buf, &dbt_bufsz)) != 0)
716 			goto err1;
717 		if ((ret = __db_goff(dbc, &local_match, match_len,
718 		    match_pgno, &match_buf, &match_bufsz)) != 0)
719 			goto err1;
720 		/* The key needs to be the first argument for sort order */
721 		*cmpp = cmpfunc(dbp, &local_key, &local_match, NULL);
722 
723 err1:		if (dbt_buf != NULL)
724 			__os_free(dbp->env, dbt_buf);
725 		if (match_buf != NULL)
726 			__os_free(dbp->env, match_buf);
727 		return (ret);
728 	}
729 
730 	/* Match the offpage DBTs a page at a time. */
731 	while (dbt_pgno != PGNO_INVALID && match_pgno != PGNO_INVALID) {
732 		if ((ret =
733 		    __memp_fget(mpf, &dbt_pgno, ip, txn, 0, &dbt_pagep)) != 0)
734 			return (ret);
735 		DB_ASSERT(dbc->env, TYPE(dbt_pagep) == P_OVERFLOW);
736 		if ((ret =
737 		    __memp_fget(mpf, &match_pgno,
738 			ip, txn, 0, &match_pagep)) != 0) {
739 			(void)__memp_fput(
740 			    mpf, ip, dbt_pagep, DB_PRIORITY_UNCHANGED);
741 			return (ret);
742 		}
743 		DB_ASSERT(dbc->env, TYPE(match_pagep) == P_OVERFLOW);
744 		cmp_bytes = page_space < max_data ? page_space : max_data;
745 		for (p1 = (u_int8_t *)dbt_pagep + P_OVERHEAD(dbp),
746 		    p2 = (u_int8_t *)match_pagep + P_OVERHEAD(dbp);
747 		    cmp_bytes-- > 0; ++p1, ++p2)
748 				if (*p1 != *p2) {
749 					*cmpp = (long)*p1 - (long)*p2;
750 					break;
751 				}
752 
753 		dbt_pgno = NEXT_PGNO(dbt_pagep);
754 		match_pgno = NEXT_PGNO(match_pagep);
755 		max_data -= page_space;
756 		if ((ret = __memp_fput(mpf,
757 		     ip, dbt_pagep, DB_PRIORITY_UNCHANGED)) != 0) {
758 			(void)__memp_fput(mpf,
759 			    ip, match_pagep, DB_PRIORITY_UNCHANGED);
760 			return (ret);
761 		}
762 		if ((ret = __memp_fput(mpf,
763 		    ip, match_pagep, DB_PRIORITY_UNCHANGED)) != 0)
764 			return (ret);
765 		if (*cmpp != 0)
766 			return (0);
767 	}
768 
769 	/* If a lexicographic mismatch was found, then the result has already
770 	 * been returned. If the DBTs matched, consider the lengths of the
771 	 * items, and return appropriately.
772 	 */
773 	if (dbt_len > match_len) /* DBT is longer than the match key. */
774 		*cmpp = 1;
775 	else if (match_len > dbt_len) /* DBT is shorter than the match key. */
776 		*cmpp = -1;
777 	else
778 		*cmpp = 0;
779 
780 	return (0);
781 
782 }
783