1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996, 2013 Oracle and/or its affiliates.  All rights reserved.
5  */
6 /*
7  * Copyright (c) 1990, 1993, 1994, 1995, 1996
8  *	Keith Bostic.  All rights reserved.
9  */
10 /*
11  * Copyright (c) 1990, 1993, 1994, 1995
12  *	The Regents of the University of California.  All rights reserved.
13  *
14  * This code is derived from software contributed to Berkeley by
15  * Mike Olson.
16  *
17  * Redistribution and use in source and binary forms, with or without
18  * modification, are permitted provided that the following conditions
19  * are met:
20  * 1. Redistributions of source code must retain the above copyright
21  *    notice, this list of conditions and the following disclaimer.
22  * 2. Redistributions in binary form must reproduce the above copyright
23  *    notice, this list of conditions and the following disclaimer in the
24  *    documentation and/or other materials provided with the distribution.
25  * 3. Neither the name of the University nor the names of its contributors
26  *    may be used to endorse or promote products derived from this software
27  *    without specific prior written permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
30  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
31  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
32  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
33  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
34  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
35  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
37  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
38  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39  * SUCH DAMAGE.
40  *
41  * $Id$
42  */
43 
44 #include "db_config.h"
45 
46 #include "db_int.h"
47 #include "dbinc/db_page.h"
48 #include "dbinc/db_am.h"
49 #include "dbinc/mp.h"
50 
51 /*
52  * Big key/data code.
53  *
54  * Big key and data entries are stored on linked lists of pages.  The initial
55  * reference is a structure with the total length of the item and the page
56  * number where it begins.  Each entry in the linked list contains a pointer
57  * to the next page of data, and so on.
58  */
59 
60 /*
61  * __db_goff --
62  *	Get an offpage item.
63  *
64  * PUBLIC: int __db_goff __P((DBC *,
65  * PUBLIC:     DBT *, u_int32_t, db_pgno_t, void **, u_int32_t *));
66  */
67 int
__db_goff(dbc,dbt,tlen,pgno,bpp,bpsz)68 __db_goff(dbc, dbt, tlen, pgno, bpp, bpsz)
69 	DBC *dbc;
70 	DBT *dbt;
71 	u_int32_t tlen;
72 	db_pgno_t pgno;
73 	void **bpp;
74 	u_int32_t *bpsz;
75 {
76 	DB *dbp;
77 	DB_MPOOLFILE *mpf;
78 	DB_TXN *txn;
79 	DBC_INTERNAL *cp;
80 	ENV *env;
81 	PAGE *h;
82 	DB_THREAD_INFO *ip;
83 	db_indx_t bytes;
84 	u_int32_t curoff, needed, start;
85 	u_int8_t *p, *src;
86 	int ret;
87 
88 	dbp = dbc->dbp;
89 	cp = dbc->internal;
90 	env = dbp->env;
91 	ip = dbc->thread_info;
92 	mpf = dbp->mpf;
93 	txn = dbc->txn;
94 
95 	/*
96 	 * Check if the buffer is big enough; if it is not and we are
97 	 * allowed to malloc space, then we'll malloc it.  If we are
98 	 * not (DB_DBT_USERMEM), then we'll set the dbt and return
99 	 * appropriately.
100 	 */
101 	if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
102 		start = dbt->doff;
103 		if (start > tlen)
104 			needed = 0;
105 		else if (dbt->dlen > tlen - start)
106 			needed = tlen - start;
107 		else
108 			needed = dbt->dlen;
109 	} else {
110 		start = 0;
111 		needed = tlen;
112 	}
113 
114 	/*
115 	 * If the caller has not requested any data, return success. This
116 	 * "early-out" also avoids setting up the streaming optimization when
117 	 * no page would be retrieved. If it were removed, the streaming code
118 	 * should only initialize when needed is not 0.
119 	 */
120 	if (needed == 0) {
121 		dbt->size = 0;
122 		return (0);
123 	}
124 
125 	if (F_ISSET(dbt, DB_DBT_USERCOPY))
126 		goto skip_alloc;
127 
128 	/* Allocate any necessary memory. */
129 	if (F_ISSET(dbt, DB_DBT_USERMEM)) {
130 		if (needed > dbt->ulen) {
131 			dbt->size = needed;
132 			return (DB_BUFFER_SMALL);
133 		}
134 	} else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
135 		if ((ret = __os_umalloc(env, needed, &dbt->data)) != 0)
136 			return (ret);
137 	} else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
138 		if ((ret = __os_urealloc(env, needed, &dbt->data)) != 0)
139 			return (ret);
140 	} else if (bpsz != NULL && (*bpsz == 0 || *bpsz < needed)) {
141 		if ((ret = __os_realloc(env, needed, bpp)) != 0)
142 			return (ret);
143 		*bpsz = needed;
144 		dbt->data = *bpp;
145 	} else if (bpp != NULL)
146 		dbt->data = *bpp;
147 	else {
148 		DB_ASSERT(env,
149 		    F_ISSET(dbt,
150 		    DB_DBT_USERMEM | DB_DBT_MALLOC | DB_DBT_REALLOC) ||
151 		    bpsz != NULL);
152 		return (DB_BUFFER_SMALL);
153 	}
154 
155 skip_alloc:
156 	/* Set up a start page in the overflow chain if streaming. */
157 	if (cp->stream_start_pgno != PGNO_INVALID &&
158 	    pgno == cp->stream_start_pgno && start >= cp->stream_off &&
159 	    start < cp->stream_off + P_MAXSPACE(dbp, dbp->pgsize)) {
160 		pgno = cp->stream_curr_pgno;
161 		curoff = cp->stream_off;
162 	} else {
163 		cp->stream_start_pgno = cp->stream_curr_pgno = pgno;
164 		cp->stream_off = curoff = 0;
165 	}
166 
167 	/*
168 	 * Step through the linked list of pages, copying the data on each
169 	 * one into the buffer.  Never copy more than the total data length.
170 	 */
171 	dbt->size = needed;
172 	for (p = dbt->data; pgno != PGNO_INVALID && needed > 0;) {
173 		if ((ret = __memp_fget(mpf,
174 		    &pgno, ip, txn, 0, &h)) != 0)
175 			return (ret);
176 		DB_ASSERT(env, TYPE(h) == P_OVERFLOW);
177 
178 		/* Check if we need any bytes from this page. */
179 		if (curoff + OV_LEN(h) >= start) {
180 			bytes = OV_LEN(h);
181 			src = (u_int8_t *)h + P_OVERHEAD(dbp);
182 			if (start > curoff) {
183 				src += start - curoff;
184 				bytes -= start - curoff;
185 			}
186 			if (bytes > needed)
187 				bytes = needed;
188 			if (F_ISSET(dbt, DB_DBT_USERCOPY)) {
189 				/*
190 				 * The offset into the DBT is the total size
191 				 * less the amount of data still needed.  Care
192 				 * needs to be taken if doing a partial copy
193 				 * beginning at an offset other than 0.
194 				 */
195 				if ((ret = env->dbt_usercopy(
196 				    dbt, dbt->size - needed,
197 				    src, bytes, DB_USERCOPY_SETDATA)) != 0) {
198 					(void)__memp_fput(mpf,
199 					    ip, h, dbp->priority);
200 					return (ret);
201 				}
202 			} else
203 				memcpy(p, src, bytes);
204 			p += bytes;
205 			needed -= bytes;
206 		}
207 		cp->stream_off = curoff;
208 		curoff += OV_LEN(h);
209 		cp->stream_curr_pgno = pgno;
210 		pgno = h->next_pgno;
211 		(void)__memp_fput(mpf, ip, h, dbp->priority);
212 	}
213 
214 	return (0);
215 }
216 
217 /*
218  * __db_poff --
219  *	Put an offpage item.
220  *
221  * PUBLIC: int __db_poff __P((DBC *, const DBT *, db_pgno_t *));
222  */
223 int
__db_poff(dbc,dbt,pgnop)224 __db_poff(dbc, dbt, pgnop)
225 	DBC *dbc;
226 	const DBT *dbt;
227 	db_pgno_t *pgnop;
228 {
229 	DB *dbp;
230 	DBT tmp_dbt;
231 	DB_LSN null_lsn;
232 	DB_MPOOLFILE *mpf;
233 	PAGE *pagep, *lastp;
234 	db_indx_t pagespace;
235 	db_pgno_t pgno;
236 	u_int32_t space, sz, tlen;
237 	u_int8_t *p;
238 	int ret, t_ret;
239 
240 	/*
241 	 * Allocate pages and copy the key/data item into them.  Calculate the
242 	 * number of bytes we get for pages we fill completely with a single
243 	 * item.
244 	 */
245 	dbp = dbc->dbp;
246 	lastp = NULL;
247 	mpf = dbp->mpf;
248 	pagespace = P_MAXSPACE(dbp, dbp->pgsize);
249 	p = dbt->data;
250 	sz = dbt->size;
251 
252 	/*
253 	 * Check whether we are streaming at the end of the overflow item.
254 	 * If so, the last pgno and offset will be cached in the cursor.
255 	 */
256 	if (F_ISSET(dbt, DB_DBT_STREAMING)) {
257 		tlen = dbt->size - dbt->dlen;
258 		pgno = dbc->internal->stream_curr_pgno;
259 		if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info,
260 		    dbc->txn, DB_MPOOL_DIRTY, &lastp)) != 0)
261 			return (ret);
262 
263 		/*
264 		 * Calculate how much we can write on the last page of the
265 		 * overflow item.
266 		 */
267 		DB_ASSERT(dbp->env,
268 		    OV_LEN(lastp) == (tlen - dbc->internal->stream_off));
269 		space = pagespace - OV_LEN(lastp);
270 
271 		/* Only copy as much data as we have. */
272 		if (space > dbt->dlen)
273 			space = dbt->dlen;
274 
275 		if (DBC_LOGGING(dbc)) {
276 			tmp_dbt.data = dbt->data;
277 			tmp_dbt.size = space;
278 			ZERO_LSN(null_lsn);
279 			if ((ret = __db_big_log(dbp, dbc->txn, &LSN(lastp), 0,
280 			    OP_SET(DB_APPEND_BIG, lastp), pgno,
281 			    PGNO_INVALID, PGNO_INVALID, &tmp_dbt,
282 			    &LSN(lastp), &null_lsn, &null_lsn)) != 0)
283 				goto err;
284 		} else
285 			LSN_NOT_LOGGED(LSN(lastp));
286 
287 		memcpy((u_int8_t *)lastp + P_OVERHEAD(dbp) + OV_LEN(lastp),
288 		    dbt->data, space);
289 		OV_LEN(lastp) += space;
290 		sz -= space + dbt->doff;
291 		p += space;
292 		*pgnop = dbc->internal->stream_start_pgno;
293 	}
294 
295 	ret = 0;
296 	for (; sz > 0; p += pagespace, sz -= pagespace) {
297 		/*
298 		 * Reduce pagespace so we terminate the loop correctly and
299 		 * don't copy too much data.
300 		 */
301 		if (sz < pagespace)
302 			pagespace = sz;
303 
304 		/*
305 		 * Allocate and initialize a new page and copy all or part of
306 		 * the item onto the page.  If sz is less than pagespace, we
307 		 * have a partial record.
308 		 */
309 		if ((ret = __db_new(dbc, P_OVERFLOW, NULL, &pagep)) != 0)
310 			break;
311 		if (DBC_LOGGING(dbc)) {
312 			tmp_dbt.data = p;
313 			tmp_dbt.size = pagespace;
314 			ZERO_LSN(null_lsn);
315 			if ((ret = __db_big_log(dbp, dbc->txn, &LSN(pagep), 0,
316 			    OP_SET(DB_ADD_BIG, pagep),
317 			    PGNO(pagep), lastp ? PGNO(lastp) : PGNO_INVALID,
318 			    PGNO_INVALID, &tmp_dbt, &LSN(pagep),
319 			    lastp == NULL ? &null_lsn : &LSN(lastp),
320 			    &null_lsn)) != 0) {
321 				(void)__memp_fput(mpf, dbc->thread_info,
322 				    pagep, dbc->priority);
323 				goto err;
324 			}
325 		} else
326 			LSN_NOT_LOGGED(LSN(pagep));
327 
328 		/* Move LSN onto page. */
329 		if (lastp != NULL)
330 			LSN(lastp) = LSN(pagep);
331 
332 		OV_LEN(pagep) = pagespace;
333 		OV_REF(pagep) = 1;
334 		memcpy((u_int8_t *)pagep + P_OVERHEAD(dbp), p, pagespace);
335 
336 		/*
337 		 * If this is the first entry, update the user's info and
338 		 * initialize the cursor to allow for streaming of subsequent
339 		 * updates.  Otherwise, update the entry on the last page
340 		 * filled in and release that page.
341 		 */
342 		if (lastp == NULL) {
343 			*pgnop = PGNO(pagep);
344 			dbc->internal->stream_start_pgno =
345 			    dbc->internal->stream_curr_pgno = *pgnop;
346 			dbc->internal->stream_off = 0;
347 		} else {
348 			lastp->next_pgno = PGNO(pagep);
349 			pagep->prev_pgno = PGNO(lastp);
350 			if ((ret = __memp_fput(mpf,
351 			    dbc->thread_info, lastp, dbc->priority)) != 0) {
352 				lastp = NULL;
353 				goto err;
354 			}
355 		}
356 		lastp = pagep;
357 	}
358 err:	if (lastp != NULL) {
359 		if (ret == 0) {
360 			dbc->internal->stream_curr_pgno = PGNO(lastp);
361 			dbc->internal->stream_off = dbt->size - OV_LEN(lastp);
362 		}
363 
364 		if ((t_ret = __memp_fput(mpf, dbc->thread_info, lastp,
365 		    dbc->priority)) != 0 && ret == 0)
366 			ret = t_ret;
367 	}
368 	return (ret);
369 }
370 
371 /*
372  * __db_ovref --
373  *	Decrement the reference count on an overflow page.
374  *
375  * PUBLIC: int __db_ovref __P((DBC *, db_pgno_t));
376  */
377 int
__db_ovref(dbc,pgno)378 __db_ovref(dbc, pgno)
379 	DBC *dbc;
380 	db_pgno_t pgno;
381 {
382 	DB *dbp;
383 	DB_MPOOLFILE *mpf;
384 	PAGE *h;
385 	int ret;
386 
387 	dbp = dbc->dbp;
388 	mpf = dbp->mpf;
389 
390 	if ((ret = __memp_fget(mpf, &pgno,
391 	     dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &h)) != 0)
392 		return (ret);
393 
394 	if (DBC_LOGGING(dbc)) {
395 		if ((ret = __db_ovref_log(dbp,
396 		    dbc->txn, &LSN(h), 0, h->pgno, -1, &LSN(h))) != 0) {
397 			(void)__memp_fput(mpf,
398 			     dbc->thread_info, h, dbc->priority);
399 			return (ret);
400 		}
401 	} else
402 		LSN_NOT_LOGGED(LSN(h));
403 
404 	/*
405 	 * In BDB releases before 4.5, the overflow reference counts were
406 	 * incremented when an overflow item was split onto an internal
407 	 * page.  There was a lock race in that code, and rather than fix
408 	 * the race, we changed BDB to copy overflow items when splitting
409 	 * them onto internal pages.  The code to decrement reference
410 	 * counts remains so databases already in the field continue to
411 	 * work.
412 	 */
413 	--OV_REF(h);
414 
415 	return (__memp_fput(mpf, dbc->thread_info, h, dbc->priority));
416 }
417 
418 /*
419  * __db_doff --
420  *	Delete an offpage chain of overflow pages.
421  *
422  * PUBLIC: int __db_doff __P((DBC *, db_pgno_t));
423  */
424 int
__db_doff(dbc,pgno)425 __db_doff(dbc, pgno)
426 	DBC *dbc;
427 	db_pgno_t pgno;
428 {
429 	DB *dbp;
430 	DBT tmp_dbt;
431 	DB_LSN null_lsn;
432 	DB_MPOOLFILE *mpf;
433 	PAGE *pagep;
434 	int ret;
435 
436 	dbp = dbc->dbp;
437 	mpf = dbp->mpf;
438 
439 	do {
440 		if ((ret = __memp_fget(mpf, &pgno,
441 		     dbc->thread_info, dbc->txn, 0, &pagep)) != 0)
442 			return (ret);
443 
444 		DB_ASSERT(dbp->env, TYPE(pagep) == P_OVERFLOW);
445 		/*
446 		 * If it's referenced by more than one key/data item,
447 		 * decrement the reference count and return.
448 		 */
449 		if (OV_REF(pagep) > 1) {
450 			(void)__memp_fput(mpf,
451 			    dbc->thread_info, pagep, dbc->priority);
452 			return (__db_ovref(dbc, pgno));
453 		}
454 
455 		if ((ret = __memp_dirty(mpf, &pagep,
456 		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
457 			if (pagep != NULL)
458 				(void)__memp_fput(mpf,
459 				    dbc->thread_info, pagep, dbc->priority);
460 			return (ret);
461 		}
462 
463 		if (DBC_LOGGING(dbc)) {
464 			tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD(dbp);
465 			tmp_dbt.size = OV_LEN(pagep);
466 			ZERO_LSN(null_lsn);
467 			if ((ret = __db_big_log(dbp, dbc->txn, &LSN(pagep), 0,
468 			    OP_SET(DB_REM_BIG, pagep), PGNO(pagep),
469 			    PREV_PGNO(pagep), NEXT_PGNO(pagep), &tmp_dbt,
470 			    &LSN(pagep), &null_lsn, &null_lsn)) != 0) {
471 				(void)__memp_fput(mpf,
472 				    dbc->thread_info, pagep, dbc->priority);
473 				return (ret);
474 			}
475 		} else
476 			LSN_NOT_LOGGED(LSN(pagep));
477 		pgno = pagep->next_pgno;
478 		OV_LEN(pagep) = 0;
479 		if ((ret = __db_free(dbc, pagep, 0)) != 0)
480 			return (ret);
481 	} while (pgno != PGNO_INVALID);
482 
483 	return (0);
484 }
485 
486 /*
487  * __db_moff --
488  *	Match on overflow pages.
489  *
490  * Given a starting page number and a key, return <0, 0, >0 to indicate if the
491  * key on the page is less than, equal to or greater than the key specified.
492  * We optimize this by doing chunk at a time comparison unless the user has
493  * specified a comparison function.  In this case, we need to materialize
494  * the entire object and call their comparison routine.
495  *
496  * __db_moff and __db_coff are generic functions useful in searching and
497  * ordering off page items. __db_moff matches an overflow DBT with an offpage
498  * item. __db_coff compares two offpage items for lexicographic sort order.
499  *
500  * PUBLIC: int __db_moff __P((DBC *, const DBT *, db_pgno_t, u_int32_t,
501  * PUBLIC:     int (*)(DB *, const DBT *, const DBT *), int *));
502  */
503 int
__db_moff(dbc,dbt,pgno,tlen,cmpfunc,cmpp)504 __db_moff(dbc, dbt, pgno, tlen, cmpfunc, cmpp)
505 	DBC *dbc;
506 	const DBT *dbt;
507 	db_pgno_t pgno;
508 	u_int32_t tlen;
509 	int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp;
510 {
511 	DB *dbp;
512 	DBT local_dbt;
513 	DB_MPOOLFILE *mpf;
514 	DB_THREAD_INFO *ip;
515 	PAGE *pagep;
516 	void *buf;
517 	u_int32_t bufsize, cmp_bytes, key_left;
518 	u_int8_t *p1, *p2;
519 	int ret;
520 
521 	dbp = dbc->dbp;
522 	ip = dbc->thread_info;
523 	mpf = dbp->mpf;
524 
525 	/*
526 	 * If there is a user-specified comparison function, build a
527 	 * contiguous copy of the key, and call it.
528 	 */
529 	if (cmpfunc != NULL) {
530 		memset(&local_dbt, 0, sizeof(local_dbt));
531 		buf = NULL;
532 		bufsize = 0;
533 
534 		if ((ret = __db_goff(dbc,
535 		    &local_dbt, tlen, pgno, &buf, &bufsize)) != 0)
536 			return (ret);
537 		/* Pass the key as the first argument */
538 		*cmpp = cmpfunc(dbp, dbt, &local_dbt);
539 		__os_free(dbp->env, buf);
540 		return (0);
541 	}
542 
543 	/* While there are both keys to compare. */
544 	for (*cmpp = 0, p1 = dbt->data,
545 	    key_left = dbt->size; key_left > 0 && pgno != PGNO_INVALID;) {
546 		if ((ret =
547 		    __memp_fget(mpf, &pgno, ip, dbc->txn, 0, &pagep)) != 0)
548 			return (ret);
549 
550 		DB_ASSERT(dbc->env, TYPE(pagep) == P_OVERFLOW);
551 		cmp_bytes = OV_LEN(pagep) < key_left ? OV_LEN(pagep) : key_left;
552 		tlen -= cmp_bytes;
553 		key_left -= cmp_bytes;
554 		for (p2 = (u_int8_t *)pagep + P_OVERHEAD(dbp);
555 		    cmp_bytes-- > 0; ++p1, ++p2)
556 			if (*p1 != *p2) {
557 				*cmpp = (long)*p1 - (long)*p2;
558 				break;
559 			}
560 		pgno = NEXT_PGNO(pagep);
561 		if ((ret = __memp_fput(mpf, ip, pagep, dbp->priority)) != 0)
562 			return (ret);
563 		if (*cmpp != 0)
564 			return (0);
565 	}
566 	if (key_left > 0)		/* DBT is longer than the page key. */
567 		*cmpp = 1;
568 	else if (tlen > 0)		/* DBT is shorter than the page key. */
569 		*cmpp = -1;
570 	else
571 		*cmpp = 0;
572 
573 	return (0);
574 }
575 
576 /*
577  * __db_coff --
578  *	Match two offpage dbts.
579  *
580  * The DBTs must both refer to offpage items.
581  * The match happens a chunk (page) at a time unless a user defined comparison
582  * function exists. It is not possible to optimize this comparison away when
583  * a lexicographic sort order is required on mismatch.
584  *
585  * NOTE: For now this function only works for H_OFFPAGE type items. It would
586  * be simple to extend it for use with B_OVERFLOW type items. It would only
587  * require extracting the total length, and page number, dependent on the
588  * DBT type.
589  *
590  * PUBLIC: int __db_coff __P((DBC *, const DBT *, const DBT *,
591  * PUBLIC:     int (*)(DB *, const DBT *, const DBT *), int *));
592  */
593 int
__db_coff(dbc,dbt,match,cmpfunc,cmpp)594 __db_coff(dbc, dbt, match, cmpfunc, cmpp)
595 	DBC *dbc;
596 	const DBT *dbt, *match;
597 	int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp;
598 {
599 	DB *dbp;
600 	DB_THREAD_INFO *ip;
601 	DB_MPOOLFILE *mpf;
602 	DB_TXN *txn;
603 	DBT local_key, local_match;
604 	PAGE *dbt_pagep, *match_pagep;
605 	db_pgno_t dbt_pgno, match_pgno;
606 	u_int32_t cmp_bytes, dbt_bufsz, dbt_len, match_bufsz;
607 	u_int32_t match_len, max_data, page_space;
608 	u_int8_t *p1, *p2;
609 	int ret;
610 	void *dbt_buf, *match_buf;
611 
612 	dbp = dbc->dbp;
613 	ip = dbc->thread_info;
614 	txn = dbc->txn;
615 	mpf = dbp->mpf;
616 	page_space = P_MAXSPACE(dbp, dbp->pgsize);
617 	*cmpp = 0;
618 	dbt_buf = match_buf = NULL;
619 
620 	DB_ASSERT(dbp->env, HPAGE_PTYPE(dbt->data) == H_OFFPAGE);
621 	DB_ASSERT(dbp->env, HPAGE_PTYPE(match->data) == H_OFFPAGE);
622 
623 	/* Extract potentially unaligned length and pgno fields from DBTs */
624 	memcpy(&dbt_len, HOFFPAGE_TLEN(dbt->data), sizeof(u_int32_t));
625 	memcpy(&dbt_pgno, HOFFPAGE_PGNO(dbt->data), sizeof(db_pgno_t));
626 	memcpy(&match_len, HOFFPAGE_TLEN(match->data), sizeof(u_int32_t));
627 	memcpy(&match_pgno, HOFFPAGE_PGNO(match->data), sizeof(db_pgno_t));
628 	max_data = (dbt_len < match_len ? dbt_len : match_len);
629 
630 	/*
631 	 * If there is a custom comparator, fully resolve both DBTs.
632 	 * Then call the users comparator.
633 	 */
634 	if (cmpfunc != NULL) {
635 		memset(&local_key, 0, sizeof(local_key));
636 		memset(&local_match, 0, sizeof(local_match));
637 		dbt_buf = match_buf = NULL;
638 		dbt_bufsz = match_bufsz = 0;
639 
640 		if ((ret = __db_goff(dbc, &local_key, dbt_len,
641 		    dbt_pgno, &dbt_buf, &dbt_bufsz)) != 0)
642 			goto err1;
643 		if ((ret = __db_goff(dbc, &local_match, match_len,
644 		    match_pgno, &match_buf, &match_bufsz)) != 0)
645 			goto err1;
646 		/* The key needs to be the first argument for sort order */
647 		*cmpp = cmpfunc(dbp, &local_key, &local_match);
648 
649 err1:		if (dbt_buf != NULL)
650 			__os_free(dbp->env, dbt_buf);
651 		if (match_buf != NULL)
652 			__os_free(dbp->env, match_buf);
653 		return (ret);
654 	}
655 
656 	/* Match the offpage DBTs a page at a time. */
657 	while (dbt_pgno != PGNO_INVALID && match_pgno != PGNO_INVALID) {
658 		if ((ret =
659 		    __memp_fget(mpf, &dbt_pgno, ip, txn, 0, &dbt_pagep)) != 0)
660 			return (ret);
661 		DB_ASSERT(dbc->env, TYPE(dbt_pagep) == P_OVERFLOW);
662 		if ((ret =
663 		    __memp_fget(mpf, &match_pgno,
664 			ip, txn, 0, &match_pagep)) != 0) {
665 			(void)__memp_fput(
666 			    mpf, ip, dbt_pagep, DB_PRIORITY_UNCHANGED);
667 			return (ret);
668 		}
669 		DB_ASSERT(dbc->env, TYPE(match_pagep) == P_OVERFLOW);
670 		cmp_bytes = page_space < max_data ? page_space : max_data;
671 		for (p1 = (u_int8_t *)dbt_pagep + P_OVERHEAD(dbp),
672 		    p2 = (u_int8_t *)match_pagep + P_OVERHEAD(dbp);
673 		    cmp_bytes-- > 0; ++p1, ++p2)
674 				if (*p1 != *p2) {
675 					*cmpp = (long)*p1 - (long)*p2;
676 					break;
677 				}
678 
679 		dbt_pgno = NEXT_PGNO(dbt_pagep);
680 		match_pgno = NEXT_PGNO(match_pagep);
681 		max_data -= page_space;
682 		if ((ret = __memp_fput(mpf,
683 		     ip, dbt_pagep, DB_PRIORITY_UNCHANGED)) != 0) {
684 			(void)__memp_fput(mpf,
685 			    ip, match_pagep, DB_PRIORITY_UNCHANGED);
686 			return (ret);
687 		}
688 		if ((ret = __memp_fput(mpf,
689 		    ip, match_pagep, DB_PRIORITY_UNCHANGED)) != 0)
690 			return (ret);
691 		if (*cmpp != 0)
692 			return (0);
693 	}
694 
695 	/* If a lexicographic mismatch was found, then the result has already
696 	 * been returned. If the DBTs matched, consider the lengths of the
697 	 * items, and return appropriately.
698 	 */
699 	if (dbt_len > match_len) /* DBT is longer than the match key. */
700 		*cmpp = 1;
701 	else if (match_len > dbt_len) /* DBT is shorter than the match key. */
702 		*cmpp = -1;
703 	else
704 		*cmpp = 0;
705 
706 	return (0);
707 
708 }
709