1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996, 2013 Oracle and/or its affiliates. All rights reserved.
5 */
6 /*
7 * Copyright (c) 1990, 1993, 1994, 1995, 1996
8 * Keith Bostic. All rights reserved.
9 */
10 /*
11 * Copyright (c) 1990, 1993, 1994, 1995
12 * The Regents of the University of California. All rights reserved.
13 *
14 * This code is derived from software contributed to Berkeley by
15 * Mike Olson.
16 *
17 * Redistribution and use in source and binary forms, with or without
18 * modification, are permitted provided that the following conditions
19 * are met:
20 * 1. Redistributions of source code must retain the above copyright
21 * notice, this list of conditions and the following disclaimer.
22 * 2. Redistributions in binary form must reproduce the above copyright
23 * notice, this list of conditions and the following disclaimer in the
24 * documentation and/or other materials provided with the distribution.
25 * 3. Neither the name of the University nor the names of its contributors
26 * may be used to endorse or promote products derived from this software
27 * without specific prior written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
30 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
31 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
32 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
33 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
34 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
35 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
37 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
38 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39 * SUCH DAMAGE.
40 *
41 * $Id$
42 */
43
44 #include "db_config.h"
45
46 #include "db_int.h"
47 #include "dbinc/db_page.h"
48 #include "dbinc/db_am.h"
49 #include "dbinc/mp.h"
50
51 /*
52 * Big key/data code.
53 *
54 * Big key and data entries are stored on linked lists of pages. The initial
55 * reference is a structure with the total length of the item and the page
56 * number where it begins. Each entry in the linked list contains a pointer
57 * to the next page of data, and so on.
58 */
59
60 /*
61 * __db_goff --
62 * Get an offpage item.
63 *
64 * PUBLIC: int __db_goff __P((DBC *,
65 * PUBLIC: DBT *, u_int32_t, db_pgno_t, void **, u_int32_t *));
66 */
67 int
__db_goff(dbc,dbt,tlen,pgno,bpp,bpsz)68 __db_goff(dbc, dbt, tlen, pgno, bpp, bpsz)
69 DBC *dbc;
70 DBT *dbt;
71 u_int32_t tlen;
72 db_pgno_t pgno;
73 void **bpp;
74 u_int32_t *bpsz;
75 {
76 DB *dbp;
77 DB_MPOOLFILE *mpf;
78 DB_TXN *txn;
79 DBC_INTERNAL *cp;
80 ENV *env;
81 PAGE *h;
82 DB_THREAD_INFO *ip;
83 db_indx_t bytes;
84 u_int32_t curoff, needed, start;
85 u_int8_t *p, *src;
86 int ret;
87
88 dbp = dbc->dbp;
89 cp = dbc->internal;
90 env = dbp->env;
91 ip = dbc->thread_info;
92 mpf = dbp->mpf;
93 txn = dbc->txn;
94
95 /*
96 * Check if the buffer is big enough; if it is not and we are
97 * allowed to malloc space, then we'll malloc it. If we are
98 * not (DB_DBT_USERMEM), then we'll set the dbt and return
99 * appropriately.
100 */
101 if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
102 start = dbt->doff;
103 if (start > tlen)
104 needed = 0;
105 else if (dbt->dlen > tlen - start)
106 needed = tlen - start;
107 else
108 needed = dbt->dlen;
109 } else {
110 start = 0;
111 needed = tlen;
112 }
113
114 /*
115 * If the caller has not requested any data, return success. This
116 * "early-out" also avoids setting up the streaming optimization when
117 * no page would be retrieved. If it were removed, the streaming code
118 * should only initialize when needed is not 0.
119 */
120 if (needed == 0) {
121 dbt->size = 0;
122 return (0);
123 }
124
125 if (F_ISSET(dbt, DB_DBT_USERCOPY))
126 goto skip_alloc;
127
128 /* Allocate any necessary memory. */
129 if (F_ISSET(dbt, DB_DBT_USERMEM)) {
130 if (needed > dbt->ulen) {
131 dbt->size = needed;
132 return (DB_BUFFER_SMALL);
133 }
134 } else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
135 if ((ret = __os_umalloc(env, needed, &dbt->data)) != 0)
136 return (ret);
137 } else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
138 if ((ret = __os_urealloc(env, needed, &dbt->data)) != 0)
139 return (ret);
140 } else if (bpsz != NULL && (*bpsz == 0 || *bpsz < needed)) {
141 if ((ret = __os_realloc(env, needed, bpp)) != 0)
142 return (ret);
143 *bpsz = needed;
144 dbt->data = *bpp;
145 } else if (bpp != NULL)
146 dbt->data = *bpp;
147 else {
148 DB_ASSERT(env,
149 F_ISSET(dbt,
150 DB_DBT_USERMEM | DB_DBT_MALLOC | DB_DBT_REALLOC) ||
151 bpsz != NULL);
152 return (DB_BUFFER_SMALL);
153 }
154
155 skip_alloc:
156 /* Set up a start page in the overflow chain if streaming. */
157 if (cp->stream_start_pgno != PGNO_INVALID &&
158 pgno == cp->stream_start_pgno && start >= cp->stream_off &&
159 start < cp->stream_off + P_MAXSPACE(dbp, dbp->pgsize)) {
160 pgno = cp->stream_curr_pgno;
161 curoff = cp->stream_off;
162 } else {
163 cp->stream_start_pgno = cp->stream_curr_pgno = pgno;
164 cp->stream_off = curoff = 0;
165 }
166
167 /*
168 * Step through the linked list of pages, copying the data on each
169 * one into the buffer. Never copy more than the total data length.
170 */
171 dbt->size = needed;
172 for (p = dbt->data; pgno != PGNO_INVALID && needed > 0;) {
173 if ((ret = __memp_fget(mpf,
174 &pgno, ip, txn, 0, &h)) != 0)
175 return (ret);
176 DB_ASSERT(env, TYPE(h) == P_OVERFLOW);
177
178 /* Check if we need any bytes from this page. */
179 if (curoff + OV_LEN(h) >= start) {
180 bytes = OV_LEN(h);
181 src = (u_int8_t *)h + P_OVERHEAD(dbp);
182 if (start > curoff) {
183 src += start - curoff;
184 bytes -= start - curoff;
185 }
186 if (bytes > needed)
187 bytes = needed;
188 if (F_ISSET(dbt, DB_DBT_USERCOPY)) {
189 /*
190 * The offset into the DBT is the total size
191 * less the amount of data still needed. Care
192 * needs to be taken if doing a partial copy
193 * beginning at an offset other than 0.
194 */
195 if ((ret = env->dbt_usercopy(
196 dbt, dbt->size - needed,
197 src, bytes, DB_USERCOPY_SETDATA)) != 0) {
198 (void)__memp_fput(mpf,
199 ip, h, dbp->priority);
200 return (ret);
201 }
202 } else
203 memcpy(p, src, bytes);
204 p += bytes;
205 needed -= bytes;
206 }
207 cp->stream_off = curoff;
208 curoff += OV_LEN(h);
209 cp->stream_curr_pgno = pgno;
210 pgno = h->next_pgno;
211 (void)__memp_fput(mpf, ip, h, dbp->priority);
212 }
213
214 return (0);
215 }
216
217 /*
218 * __db_poff --
219 * Put an offpage item.
220 *
221 * PUBLIC: int __db_poff __P((DBC *, const DBT *, db_pgno_t *));
222 */
223 int
__db_poff(dbc,dbt,pgnop)224 __db_poff(dbc, dbt, pgnop)
225 DBC *dbc;
226 const DBT *dbt;
227 db_pgno_t *pgnop;
228 {
229 DB *dbp;
230 DBT tmp_dbt;
231 DB_LSN null_lsn;
232 DB_MPOOLFILE *mpf;
233 PAGE *pagep, *lastp;
234 db_indx_t pagespace;
235 db_pgno_t pgno;
236 u_int32_t space, sz, tlen;
237 u_int8_t *p;
238 int ret, t_ret;
239
240 /*
241 * Allocate pages and copy the key/data item into them. Calculate the
242 * number of bytes we get for pages we fill completely with a single
243 * item.
244 */
245 dbp = dbc->dbp;
246 lastp = NULL;
247 mpf = dbp->mpf;
248 pagespace = P_MAXSPACE(dbp, dbp->pgsize);
249 p = dbt->data;
250 sz = dbt->size;
251
252 /*
253 * Check whether we are streaming at the end of the overflow item.
254 * If so, the last pgno and offset will be cached in the cursor.
255 */
256 if (F_ISSET(dbt, DB_DBT_STREAMING)) {
257 tlen = dbt->size - dbt->dlen;
258 pgno = dbc->internal->stream_curr_pgno;
259 if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info,
260 dbc->txn, DB_MPOOL_DIRTY, &lastp)) != 0)
261 return (ret);
262
263 /*
264 * Calculate how much we can write on the last page of the
265 * overflow item.
266 */
267 DB_ASSERT(dbp->env,
268 OV_LEN(lastp) == (tlen - dbc->internal->stream_off));
269 space = pagespace - OV_LEN(lastp);
270
271 /* Only copy as much data as we have. */
272 if (space > dbt->dlen)
273 space = dbt->dlen;
274
275 if (DBC_LOGGING(dbc)) {
276 tmp_dbt.data = dbt->data;
277 tmp_dbt.size = space;
278 ZERO_LSN(null_lsn);
279 if ((ret = __db_big_log(dbp, dbc->txn, &LSN(lastp), 0,
280 OP_SET(DB_APPEND_BIG, lastp), pgno,
281 PGNO_INVALID, PGNO_INVALID, &tmp_dbt,
282 &LSN(lastp), &null_lsn, &null_lsn)) != 0)
283 goto err;
284 } else
285 LSN_NOT_LOGGED(LSN(lastp));
286
287 memcpy((u_int8_t *)lastp + P_OVERHEAD(dbp) + OV_LEN(lastp),
288 dbt->data, space);
289 OV_LEN(lastp) += space;
290 sz -= space + dbt->doff;
291 p += space;
292 *pgnop = dbc->internal->stream_start_pgno;
293 }
294
295 ret = 0;
296 for (; sz > 0; p += pagespace, sz -= pagespace) {
297 /*
298 * Reduce pagespace so we terminate the loop correctly and
299 * don't copy too much data.
300 */
301 if (sz < pagespace)
302 pagespace = sz;
303
304 /*
305 * Allocate and initialize a new page and copy all or part of
306 * the item onto the page. If sz is less than pagespace, we
307 * have a partial record.
308 */
309 if ((ret = __db_new(dbc, P_OVERFLOW, NULL, &pagep)) != 0)
310 break;
311 if (DBC_LOGGING(dbc)) {
312 tmp_dbt.data = p;
313 tmp_dbt.size = pagespace;
314 ZERO_LSN(null_lsn);
315 if ((ret = __db_big_log(dbp, dbc->txn, &LSN(pagep), 0,
316 OP_SET(DB_ADD_BIG, pagep),
317 PGNO(pagep), lastp ? PGNO(lastp) : PGNO_INVALID,
318 PGNO_INVALID, &tmp_dbt, &LSN(pagep),
319 lastp == NULL ? &null_lsn : &LSN(lastp),
320 &null_lsn)) != 0) {
321 (void)__memp_fput(mpf, dbc->thread_info,
322 pagep, dbc->priority);
323 goto err;
324 }
325 } else
326 LSN_NOT_LOGGED(LSN(pagep));
327
328 /* Move LSN onto page. */
329 if (lastp != NULL)
330 LSN(lastp) = LSN(pagep);
331
332 OV_LEN(pagep) = pagespace;
333 OV_REF(pagep) = 1;
334 memcpy((u_int8_t *)pagep + P_OVERHEAD(dbp), p, pagespace);
335
336 /*
337 * If this is the first entry, update the user's info and
338 * initialize the cursor to allow for streaming of subsequent
339 * updates. Otherwise, update the entry on the last page
340 * filled in and release that page.
341 */
342 if (lastp == NULL) {
343 *pgnop = PGNO(pagep);
344 dbc->internal->stream_start_pgno =
345 dbc->internal->stream_curr_pgno = *pgnop;
346 dbc->internal->stream_off = 0;
347 } else {
348 lastp->next_pgno = PGNO(pagep);
349 pagep->prev_pgno = PGNO(lastp);
350 if ((ret = __memp_fput(mpf,
351 dbc->thread_info, lastp, dbc->priority)) != 0) {
352 lastp = NULL;
353 goto err;
354 }
355 }
356 lastp = pagep;
357 }
358 err: if (lastp != NULL) {
359 if (ret == 0) {
360 dbc->internal->stream_curr_pgno = PGNO(lastp);
361 dbc->internal->stream_off = dbt->size - OV_LEN(lastp);
362 }
363
364 if ((t_ret = __memp_fput(mpf, dbc->thread_info, lastp,
365 dbc->priority)) != 0 && ret == 0)
366 ret = t_ret;
367 }
368 return (ret);
369 }
370
371 /*
372 * __db_ovref --
373 * Decrement the reference count on an overflow page.
374 *
375 * PUBLIC: int __db_ovref __P((DBC *, db_pgno_t));
376 */
377 int
__db_ovref(dbc,pgno)378 __db_ovref(dbc, pgno)
379 DBC *dbc;
380 db_pgno_t pgno;
381 {
382 DB *dbp;
383 DB_MPOOLFILE *mpf;
384 PAGE *h;
385 int ret;
386
387 dbp = dbc->dbp;
388 mpf = dbp->mpf;
389
390 if ((ret = __memp_fget(mpf, &pgno,
391 dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &h)) != 0)
392 return (ret);
393
394 if (DBC_LOGGING(dbc)) {
395 if ((ret = __db_ovref_log(dbp,
396 dbc->txn, &LSN(h), 0, h->pgno, -1, &LSN(h))) != 0) {
397 (void)__memp_fput(mpf,
398 dbc->thread_info, h, dbc->priority);
399 return (ret);
400 }
401 } else
402 LSN_NOT_LOGGED(LSN(h));
403
404 /*
405 * In BDB releases before 4.5, the overflow reference counts were
406 * incremented when an overflow item was split onto an internal
407 * page. There was a lock race in that code, and rather than fix
408 * the race, we changed BDB to copy overflow items when splitting
409 * them onto internal pages. The code to decrement reference
410 * counts remains so databases already in the field continue to
411 * work.
412 */
413 --OV_REF(h);
414
415 return (__memp_fput(mpf, dbc->thread_info, h, dbc->priority));
416 }
417
418 /*
419 * __db_doff --
420 * Delete an offpage chain of overflow pages.
421 *
422 * PUBLIC: int __db_doff __P((DBC *, db_pgno_t));
423 */
424 int
__db_doff(dbc,pgno)425 __db_doff(dbc, pgno)
426 DBC *dbc;
427 db_pgno_t pgno;
428 {
429 DB *dbp;
430 DBT tmp_dbt;
431 DB_LSN null_lsn;
432 DB_MPOOLFILE *mpf;
433 PAGE *pagep;
434 int ret;
435
436 dbp = dbc->dbp;
437 mpf = dbp->mpf;
438
439 do {
440 if ((ret = __memp_fget(mpf, &pgno,
441 dbc->thread_info, dbc->txn, 0, &pagep)) != 0)
442 return (ret);
443
444 DB_ASSERT(dbp->env, TYPE(pagep) == P_OVERFLOW);
445 /*
446 * If it's referenced by more than one key/data item,
447 * decrement the reference count and return.
448 */
449 if (OV_REF(pagep) > 1) {
450 (void)__memp_fput(mpf,
451 dbc->thread_info, pagep, dbc->priority);
452 return (__db_ovref(dbc, pgno));
453 }
454
455 if ((ret = __memp_dirty(mpf, &pagep,
456 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
457 if (pagep != NULL)
458 (void)__memp_fput(mpf,
459 dbc->thread_info, pagep, dbc->priority);
460 return (ret);
461 }
462
463 if (DBC_LOGGING(dbc)) {
464 tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD(dbp);
465 tmp_dbt.size = OV_LEN(pagep);
466 ZERO_LSN(null_lsn);
467 if ((ret = __db_big_log(dbp, dbc->txn, &LSN(pagep), 0,
468 OP_SET(DB_REM_BIG, pagep), PGNO(pagep),
469 PREV_PGNO(pagep), NEXT_PGNO(pagep), &tmp_dbt,
470 &LSN(pagep), &null_lsn, &null_lsn)) != 0) {
471 (void)__memp_fput(mpf,
472 dbc->thread_info, pagep, dbc->priority);
473 return (ret);
474 }
475 } else
476 LSN_NOT_LOGGED(LSN(pagep));
477 pgno = pagep->next_pgno;
478 OV_LEN(pagep) = 0;
479 if ((ret = __db_free(dbc, pagep, 0)) != 0)
480 return (ret);
481 } while (pgno != PGNO_INVALID);
482
483 return (0);
484 }
485
486 /*
487 * __db_moff --
488 * Match on overflow pages.
489 *
490 * Given a starting page number and a key, return <0, 0, >0 to indicate if the
491 * key on the page is less than, equal to or greater than the key specified.
492 * We optimize this by doing chunk at a time comparison unless the user has
493 * specified a comparison function. In this case, we need to materialize
494 * the entire object and call their comparison routine.
495 *
496 * __db_moff and __db_coff are generic functions useful in searching and
497 * ordering off page items. __db_moff matches an overflow DBT with an offpage
498 * item. __db_coff compares two offpage items for lexicographic sort order.
499 *
500 * PUBLIC: int __db_moff __P((DBC *, const DBT *, db_pgno_t, u_int32_t,
501 * PUBLIC: int (*)(DB *, const DBT *, const DBT *), int *));
502 */
503 int
__db_moff(dbc,dbt,pgno,tlen,cmpfunc,cmpp)504 __db_moff(dbc, dbt, pgno, tlen, cmpfunc, cmpp)
505 DBC *dbc;
506 const DBT *dbt;
507 db_pgno_t pgno;
508 u_int32_t tlen;
509 int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp;
510 {
511 DB *dbp;
512 DBT local_dbt;
513 DB_MPOOLFILE *mpf;
514 DB_THREAD_INFO *ip;
515 PAGE *pagep;
516 void *buf;
517 u_int32_t bufsize, cmp_bytes, key_left;
518 u_int8_t *p1, *p2;
519 int ret;
520
521 dbp = dbc->dbp;
522 ip = dbc->thread_info;
523 mpf = dbp->mpf;
524
525 /*
526 * If there is a user-specified comparison function, build a
527 * contiguous copy of the key, and call it.
528 */
529 if (cmpfunc != NULL) {
530 memset(&local_dbt, 0, sizeof(local_dbt));
531 buf = NULL;
532 bufsize = 0;
533
534 if ((ret = __db_goff(dbc,
535 &local_dbt, tlen, pgno, &buf, &bufsize)) != 0)
536 return (ret);
537 /* Pass the key as the first argument */
538 *cmpp = cmpfunc(dbp, dbt, &local_dbt);
539 __os_free(dbp->env, buf);
540 return (0);
541 }
542
543 /* While there are both keys to compare. */
544 for (*cmpp = 0, p1 = dbt->data,
545 key_left = dbt->size; key_left > 0 && pgno != PGNO_INVALID;) {
546 if ((ret =
547 __memp_fget(mpf, &pgno, ip, dbc->txn, 0, &pagep)) != 0)
548 return (ret);
549
550 DB_ASSERT(dbc->env, TYPE(pagep) == P_OVERFLOW);
551 cmp_bytes = OV_LEN(pagep) < key_left ? OV_LEN(pagep) : key_left;
552 tlen -= cmp_bytes;
553 key_left -= cmp_bytes;
554 for (p2 = (u_int8_t *)pagep + P_OVERHEAD(dbp);
555 cmp_bytes-- > 0; ++p1, ++p2)
556 if (*p1 != *p2) {
557 *cmpp = (long)*p1 - (long)*p2;
558 break;
559 }
560 pgno = NEXT_PGNO(pagep);
561 if ((ret = __memp_fput(mpf, ip, pagep, dbp->priority)) != 0)
562 return (ret);
563 if (*cmpp != 0)
564 return (0);
565 }
566 if (key_left > 0) /* DBT is longer than the page key. */
567 *cmpp = 1;
568 else if (tlen > 0) /* DBT is shorter than the page key. */
569 *cmpp = -1;
570 else
571 *cmpp = 0;
572
573 return (0);
574 }
575
576 /*
577 * __db_coff --
578 * Match two offpage dbts.
579 *
580 * The DBTs must both refer to offpage items.
581 * The match happens a chunk (page) at a time unless a user defined comparison
582 * function exists. It is not possible to optimize this comparison away when
583 * a lexicographic sort order is required on mismatch.
584 *
585 * NOTE: For now this function only works for H_OFFPAGE type items. It would
586 * be simple to extend it for use with B_OVERFLOW type items. It would only
587 * require extracting the total length, and page number, dependent on the
588 * DBT type.
589 *
590 * PUBLIC: int __db_coff __P((DBC *, const DBT *, const DBT *,
591 * PUBLIC: int (*)(DB *, const DBT *, const DBT *), int *));
592 */
593 int
__db_coff(dbc,dbt,match,cmpfunc,cmpp)594 __db_coff(dbc, dbt, match, cmpfunc, cmpp)
595 DBC *dbc;
596 const DBT *dbt, *match;
597 int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp;
598 {
599 DB *dbp;
600 DB_THREAD_INFO *ip;
601 DB_MPOOLFILE *mpf;
602 DB_TXN *txn;
603 DBT local_key, local_match;
604 PAGE *dbt_pagep, *match_pagep;
605 db_pgno_t dbt_pgno, match_pgno;
606 u_int32_t cmp_bytes, dbt_bufsz, dbt_len, match_bufsz;
607 u_int32_t match_len, max_data, page_space;
608 u_int8_t *p1, *p2;
609 int ret;
610 void *dbt_buf, *match_buf;
611
612 dbp = dbc->dbp;
613 ip = dbc->thread_info;
614 txn = dbc->txn;
615 mpf = dbp->mpf;
616 page_space = P_MAXSPACE(dbp, dbp->pgsize);
617 *cmpp = 0;
618 dbt_buf = match_buf = NULL;
619
620 DB_ASSERT(dbp->env, HPAGE_PTYPE(dbt->data) == H_OFFPAGE);
621 DB_ASSERT(dbp->env, HPAGE_PTYPE(match->data) == H_OFFPAGE);
622
623 /* Extract potentially unaligned length and pgno fields from DBTs */
624 memcpy(&dbt_len, HOFFPAGE_TLEN(dbt->data), sizeof(u_int32_t));
625 memcpy(&dbt_pgno, HOFFPAGE_PGNO(dbt->data), sizeof(db_pgno_t));
626 memcpy(&match_len, HOFFPAGE_TLEN(match->data), sizeof(u_int32_t));
627 memcpy(&match_pgno, HOFFPAGE_PGNO(match->data), sizeof(db_pgno_t));
628 max_data = (dbt_len < match_len ? dbt_len : match_len);
629
630 /*
631 * If there is a custom comparator, fully resolve both DBTs.
632 * Then call the users comparator.
633 */
634 if (cmpfunc != NULL) {
635 memset(&local_key, 0, sizeof(local_key));
636 memset(&local_match, 0, sizeof(local_match));
637 dbt_buf = match_buf = NULL;
638 dbt_bufsz = match_bufsz = 0;
639
640 if ((ret = __db_goff(dbc, &local_key, dbt_len,
641 dbt_pgno, &dbt_buf, &dbt_bufsz)) != 0)
642 goto err1;
643 if ((ret = __db_goff(dbc, &local_match, match_len,
644 match_pgno, &match_buf, &match_bufsz)) != 0)
645 goto err1;
646 /* The key needs to be the first argument for sort order */
647 *cmpp = cmpfunc(dbp, &local_key, &local_match);
648
649 err1: if (dbt_buf != NULL)
650 __os_free(dbp->env, dbt_buf);
651 if (match_buf != NULL)
652 __os_free(dbp->env, match_buf);
653 return (ret);
654 }
655
656 /* Match the offpage DBTs a page at a time. */
657 while (dbt_pgno != PGNO_INVALID && match_pgno != PGNO_INVALID) {
658 if ((ret =
659 __memp_fget(mpf, &dbt_pgno, ip, txn, 0, &dbt_pagep)) != 0)
660 return (ret);
661 DB_ASSERT(dbc->env, TYPE(dbt_pagep) == P_OVERFLOW);
662 if ((ret =
663 __memp_fget(mpf, &match_pgno,
664 ip, txn, 0, &match_pagep)) != 0) {
665 (void)__memp_fput(
666 mpf, ip, dbt_pagep, DB_PRIORITY_UNCHANGED);
667 return (ret);
668 }
669 DB_ASSERT(dbc->env, TYPE(match_pagep) == P_OVERFLOW);
670 cmp_bytes = page_space < max_data ? page_space : max_data;
671 for (p1 = (u_int8_t *)dbt_pagep + P_OVERHEAD(dbp),
672 p2 = (u_int8_t *)match_pagep + P_OVERHEAD(dbp);
673 cmp_bytes-- > 0; ++p1, ++p2)
674 if (*p1 != *p2) {
675 *cmpp = (long)*p1 - (long)*p2;
676 break;
677 }
678
679 dbt_pgno = NEXT_PGNO(dbt_pagep);
680 match_pgno = NEXT_PGNO(match_pagep);
681 max_data -= page_space;
682 if ((ret = __memp_fput(mpf,
683 ip, dbt_pagep, DB_PRIORITY_UNCHANGED)) != 0) {
684 (void)__memp_fput(mpf,
685 ip, match_pagep, DB_PRIORITY_UNCHANGED);
686 return (ret);
687 }
688 if ((ret = __memp_fput(mpf,
689 ip, match_pagep, DB_PRIORITY_UNCHANGED)) != 0)
690 return (ret);
691 if (*cmpp != 0)
692 return (0);
693 }
694
695 /* If a lexicographic mismatch was found, then the result has already
696 * been returned. If the DBTs matched, consider the lengths of the
697 * items, and return appropriately.
698 */
699 if (dbt_len > match_len) /* DBT is longer than the match key. */
700 *cmpp = 1;
701 else if (match_len > dbt_len) /* DBT is shorter than the match key. */
702 *cmpp = -1;
703 else
704 *cmpp = 0;
705
706 return (0);
707
708 }
709