1 /*-
2 * Copyright (c) 1996, 2020 Oracle and/or its affiliates. All rights reserved.
3 *
4 * See the file LICENSE for license information.
5 */
6 /*
7 * Copyright (c) 1990, 1993, 1994, 1995, 1996
8 * Keith Bostic. All rights reserved.
9 */
10 /*
11 * Copyright (c) 1990, 1993, 1994, 1995
12 * The Regents of the University of California. All rights reserved.
13 *
14 * This code is derived from software contributed to Berkeley by
15 * Mike Olson.
16 *
17 * Redistribution and use in source and binary forms, with or without
18 * modification, are permitted provided that the following conditions
19 * are met:
20 * 1. Redistributions of source code must retain the above copyright
21 * notice, this list of conditions and the following disclaimer.
22 * 2. Redistributions in binary form must reproduce the above copyright
23 * notice, this list of conditions and the following disclaimer in the
24 * documentation and/or other materials provided with the distribution.
25 * 3. Neither the name of the University nor the names of its contributors
26 * may be used to endorse or promote products derived from this software
27 * without specific prior written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
30 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
31 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
32 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
33 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
34 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
35 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
37 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
38 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39 * SUCH DAMAGE.
40 *
41 * $Id$
42 */
43
44 #include "db_config.h"
45
46 #include "db_int.h"
47 #include "dbinc/db_page.h"
48 #include "dbinc/db_am.h"
49 #include "dbinc/mp.h"
50
51 /*
52 * Big key/data code.
53 *
54 * Big key and data entries are stored on linked lists of pages. The initial
55 * reference is a structure with the total length of the item and the page
56 * number where it begins. Each entry in the linked list contains a pointer
57 * to the next page of data, and so on.
58 */
59
60 /*
61 * __db_alloc_dbt
62 *
63 * Allocate enough space in the dbt to hold the data. Also used by the
64 * blob file API.
65 *
66 * PUBLIC: int __db_alloc_dbt __P((ENV *, DBT *, u_int32_t, u_int32_t *,
67 * PUBLIC: u_int32_t *, void **, u_int32_t *));
68 */
69 int
__db_alloc_dbt(env,dbt,tlen,nd,st,bpp,bpsz)70 __db_alloc_dbt(env, dbt, tlen, nd, st, bpp, bpsz)
71 ENV *env;
72 DBT *dbt;
73 u_int32_t tlen;
74 u_int32_t *nd;
75 u_int32_t *st;
76 void **bpp;
77 u_int32_t *bpsz;
78 {
79 int ret;
80 u_int32_t needed, start;
81
82 /*
83 * Check if the buffer is big enough; if it is not and we are
84 * allowed to malloc space, then we'll malloc it. If we are
85 * not (DB_DBT_USERMEM), then we'll set the dbt and return
86 * appropriately.
87 */
88 if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
89 start = dbt->doff;
90 if (start > tlen)
91 needed = 0;
92 else if (dbt->dlen > tlen - start)
93 needed = tlen - start;
94 else
95 needed = dbt->dlen;
96 } else {
97 start = 0;
98 needed = tlen;
99 }
100 *nd = needed;
101 *st = start;
102
103 /*
104 * If the caller has not requested any data, return success. This
105 * "early-out" also avoids setting up the streaming optimization when
106 * no page would be retrieved. If it were removed, the streaming code
107 * should only initialize when needed is not 0.
108 */
109 if (needed == 0) {
110 dbt->size = 0;
111 return (0);
112 }
113
114 if (F_ISSET(dbt, DB_DBT_USERCOPY))
115 return (0);
116
117 /* Allocate any necessary memory. */
118 if (F_ISSET(dbt, DB_DBT_USERMEM)) {
119 if (needed > dbt->ulen) {
120 dbt->size = needed;
121 return (DB_BUFFER_SMALL);
122 }
123 } else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
124 if ((ret = __os_umalloc(env, needed, &dbt->data)) != 0)
125 return (ret);
126 } else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
127 if ((ret = __os_urealloc(env, needed, &dbt->data)) != 0)
128 return (ret);
129 } else if (bpsz != NULL && (*bpsz == 0 || *bpsz < needed)) {
130 if ((ret = __os_realloc(env, needed, bpp)) != 0)
131 return (ret);
132 *bpsz = needed;
133 dbt->data = *bpp;
134 } else if (bpp != NULL)
135 dbt->data = *bpp;
136 else {
137 DB_ASSERT(env,
138 F_ISSET(dbt,
139 DB_DBT_USERMEM | DB_DBT_MALLOC | DB_DBT_REALLOC) ||
140 bpsz != NULL);
141 return (DB_BUFFER_SMALL);
142 }
143
144 return (0);
145 }
146
147 /*
148 * __db_goff --
149 * Get an offpage item.
150 *
151 * PUBLIC: int __db_goff __P((DBC *,
152 * PUBLIC: DBT *, u_int32_t, db_pgno_t, void **, u_int32_t *));
153 */
154 int
__db_goff(dbc,dbt,tlen,pgno,bpp,bpsz)155 __db_goff(dbc, dbt, tlen, pgno, bpp, bpsz)
156 DBC *dbc;
157 DBT *dbt;
158 u_int32_t tlen;
159 db_pgno_t pgno;
160 void **bpp;
161 u_int32_t *bpsz;
162 {
163 DB *dbp;
164 DB_MPOOLFILE *mpf;
165 DB_TXN *txn;
166 DBC_INTERNAL *cp;
167 ENV *env;
168 PAGE *h;
169 DB_THREAD_INFO *ip;
170 db_indx_t bytes;
171 u_int32_t curoff, needed, start;
172 u_int8_t *p, *src;
173 int ret;
174
175 dbp = dbc->dbp;
176 cp = dbc->internal;
177 env = dbp->env;
178 ip = dbc->thread_info;
179 mpf = dbp->mpf;
180 txn = dbc->txn;
181
182 if (((ret = __db_alloc_dbt(
183 env, dbt, tlen, &needed, &start, bpp, bpsz)) != 0) || needed == 0)
184 return (ret);
185
186 /* Set up a start page in the overflow chain if streaming. */
187 if (cp->stream_start_pgno != PGNO_INVALID &&
188 pgno == cp->stream_start_pgno && start >= cp->stream_off &&
189 start < cp->stream_off + P_MAXSPACE(dbp, dbp->pgsize)) {
190 pgno = cp->stream_curr_pgno;
191 curoff = cp->stream_off;
192 } else {
193 cp->stream_start_pgno = cp->stream_curr_pgno = pgno;
194 cp->stream_off = curoff = 0;
195 }
196
197 /*
198 * Step through the linked list of pages, copying the data on each
199 * one into the buffer. Never copy more than the total data length.
200 */
201 dbt->size = needed;
202 for (p = dbt->data; pgno != PGNO_INVALID && needed > 0;) {
203 if ((ret = __memp_fget(mpf,
204 &pgno, ip, txn, 0, &h)) != 0)
205 return (ret);
206 DB_ASSERT(env, TYPE(h) == P_OVERFLOW);
207
208 /* Check if we need any bytes from this page. */
209 if (curoff + OV_LEN(h) >= start) {
210 bytes = OV_LEN(h);
211 src = (u_int8_t *)h + P_OVERHEAD(dbp);
212 if (start > curoff) {
213 src += start - curoff;
214 bytes -= start - curoff;
215 }
216 if (bytes > needed)
217 bytes = needed;
218 if (F_ISSET(dbt, DB_DBT_USERCOPY)) {
219 /*
220 * The offset into the DBT is the total size
221 * less the amount of data still needed. Care
222 * needs to be taken if doing a partial copy
223 * beginning at an offset other than 0.
224 */
225 if ((ret = env->dbt_usercopy(
226 dbt, dbt->size - needed,
227 src, bytes, DB_USERCOPY_SETDATA)) != 0) {
228 (void)__memp_fput(mpf,
229 ip, h, dbp->priority);
230 return (ret);
231 }
232 } else
233 memcpy(p, src, bytes);
234 p += bytes;
235 needed -= bytes;
236 }
237 cp->stream_off = curoff;
238 curoff += OV_LEN(h);
239 cp->stream_curr_pgno = pgno;
240 pgno = h->next_pgno;
241 (void)__memp_fput(mpf, ip, h, dbp->priority);
242 }
243
244 return (0);
245 }
246
247 /*
248 * __db_poff --
249 * Put an offpage item.
250 *
251 * PUBLIC: int __db_poff __P((DBC *, const DBT *, db_pgno_t *));
252 */
253 int
__db_poff(dbc,dbt,pgnop)254 __db_poff(dbc, dbt, pgnop)
255 DBC *dbc;
256 const DBT *dbt;
257 db_pgno_t *pgnop;
258 {
259 DB *dbp;
260 DBT tmp_dbt;
261 DB_LSN null_lsn;
262 DB_MPOOLFILE *mpf;
263 PAGE *pagep, *lastp;
264 db_indx_t pagespace;
265 db_pgno_t pgno;
266 u_int32_t space, sz, tlen;
267 u_int8_t *p;
268 int ret, t_ret;
269
270 COMPQUIET(tlen, 0);
271
272 /*
273 * Allocate pages and copy the key/data item into them. Calculate the
274 * number of bytes we get for pages we fill completely with a single
275 * item.
276 */
277 dbp = dbc->dbp;
278 lastp = NULL;
279 mpf = dbp->mpf;
280 pagespace = P_MAXSPACE(dbp, dbp->pgsize);
281 p = dbt->data;
282 sz = dbt->size;
283
284 /*
285 * Check whether we are streaming at the end of the overflow item.
286 * If so, the last pgno and offset will be cached in the cursor.
287 */
288 if (F_ISSET(dbt, DB_DBT_STREAMING)) {
289 tlen = dbt->size - dbt->dlen;
290 pgno = dbc->internal->stream_curr_pgno;
291 if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info,
292 dbc->txn, DB_MPOOL_DIRTY, &lastp)) != 0)
293 return (ret);
294
295 /*
296 * Calculate how much we can write on the last page of the
297 * overflow item.
298 */
299 DB_ASSERT(dbp->env,
300 OV_LEN(lastp) == (tlen - dbc->internal->stream_off));
301 space = pagespace - OV_LEN(lastp);
302
303 /* Only copy as much data as we have. */
304 if (space > dbt->dlen)
305 space = dbt->dlen;
306
307 if (DBC_LOGGING(dbc)) {
308 tmp_dbt.data = dbt->data;
309 tmp_dbt.size = space;
310 ZERO_LSN(null_lsn);
311 if ((ret = __db_big_log(dbp, dbc->txn, &LSN(lastp), 0,
312 OP_SET(DB_APPEND_BIG, lastp), pgno,
313 PGNO_INVALID, PGNO_INVALID, &tmp_dbt,
314 &LSN(lastp), &null_lsn, &null_lsn)) != 0)
315 goto err;
316 } else
317 LSN_NOT_LOGGED(LSN(lastp));
318
319 memcpy((u_int8_t *)lastp + P_OVERHEAD(dbp) + OV_LEN(lastp),
320 dbt->data, space);
321 OV_LEN(lastp) += space;
322 sz -= space + dbt->doff;
323 p += space;
324 *pgnop = dbc->internal->stream_start_pgno;
325 }
326
327 ret = 0;
328 for (; sz > 0; p += pagespace, sz -= pagespace) {
329 /*
330 * Reduce pagespace so we terminate the loop correctly and
331 * don't copy too much data.
332 */
333 if (sz < pagespace)
334 pagespace = sz;
335
336 /*
337 * Allocate and initialize a new page and copy all or part of
338 * the item onto the page. If sz is less than pagespace, we
339 * have a partial record.
340 */
341 if ((ret = __db_new(dbc, P_OVERFLOW, NULL, &pagep)) != 0)
342 break;
343 if (DBC_LOGGING(dbc)) {
344 tmp_dbt.data = p;
345 tmp_dbt.size = pagespace;
346 ZERO_LSN(null_lsn);
347 if ((ret = __db_big_log(dbp, dbc->txn, &LSN(pagep), 0,
348 OP_SET(DB_ADD_BIG, pagep),
349 PGNO(pagep), lastp ? PGNO(lastp) : PGNO_INVALID,
350 PGNO_INVALID, &tmp_dbt, &LSN(pagep),
351 lastp == NULL ? &null_lsn : &LSN(lastp),
352 &null_lsn)) != 0) {
353 (void)__memp_fput(mpf, dbc->thread_info,
354 pagep, dbc->priority);
355 goto err;
356 }
357 } else
358 LSN_NOT_LOGGED(LSN(pagep));
359
360 /* Move LSN onto page. */
361 if (lastp != NULL)
362 LSN(lastp) = LSN(pagep);
363
364 OV_LEN(pagep) = pagespace;
365 OV_REF(pagep) = 1;
366 memcpy((u_int8_t *)pagep + P_OVERHEAD(dbp), p, pagespace);
367
368 /*
369 * If this is the first entry, update the user's info and
370 * initialize the cursor to allow for streaming of subsequent
371 * updates. Otherwise, update the entry on the last page
372 * filled in and release that page.
373 */
374 if (lastp == NULL) {
375 *pgnop = PGNO(pagep);
376 dbc->internal->stream_start_pgno =
377 dbc->internal->stream_curr_pgno = *pgnop;
378 dbc->internal->stream_off = 0;
379 } else {
380 lastp->next_pgno = PGNO(pagep);
381 pagep->prev_pgno = PGNO(lastp);
382 if ((ret = __memp_fput(mpf,
383 dbc->thread_info, lastp, dbc->priority)) != 0) {
384 lastp = NULL;
385 goto err;
386 }
387 }
388 lastp = pagep;
389 }
390 err: if (lastp != NULL) {
391 if (ret == 0) {
392 dbc->internal->stream_curr_pgno = PGNO(lastp);
393 dbc->internal->stream_off = dbt->size - OV_LEN(lastp);
394 }
395
396 if ((t_ret = __memp_fput(mpf, dbc->thread_info, lastp,
397 dbc->priority)) != 0 && ret == 0)
398 ret = t_ret;
399 }
400 return (ret);
401 }
402
403 /*
404 * __db_ovref --
405 * Decrement the reference count on an overflow page.
406 *
407 * PUBLIC: int __db_ovref __P((DBC *, db_pgno_t));
408 */
409 int
__db_ovref(dbc,pgno)410 __db_ovref(dbc, pgno)
411 DBC *dbc;
412 db_pgno_t pgno;
413 {
414 DB *dbp;
415 DB_MPOOLFILE *mpf;
416 PAGE *h;
417 int ret;
418
419 dbp = dbc->dbp;
420 mpf = dbp->mpf;
421
422 if ((ret = __memp_fget(mpf, &pgno,
423 dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &h)) != 0)
424 return (ret);
425
426 if (DBC_LOGGING(dbc)) {
427 if ((ret = __db_ovref_log(dbp,
428 dbc->txn, &LSN(h), 0, h->pgno, -1, &LSN(h))) != 0) {
429 (void)__memp_fput(mpf,
430 dbc->thread_info, h, dbc->priority);
431 return (ret);
432 }
433 } else
434 LSN_NOT_LOGGED(LSN(h));
435
436 /*
437 * In BDB releases before 4.5, the overflow reference counts were
438 * incremented when an overflow item was split onto an internal
439 * page. There was a lock race in that code, and rather than fix
440 * the race, we changed BDB to copy overflow items when splitting
441 * them onto internal pages. The code to decrement reference
442 * counts remains so databases already in the field continue to
443 * work.
444 */
445 --OV_REF(h);
446
447 return (__memp_fput(mpf, dbc->thread_info, h, dbc->priority));
448 }
449
450 /*
451 * __db_doff --
452 * Delete an offpage chain of overflow pages.
453 *
454 * PUBLIC: int __db_doff __P((DBC *, db_pgno_t));
455 */
456 int
__db_doff(dbc,pgno)457 __db_doff(dbc, pgno)
458 DBC *dbc;
459 db_pgno_t pgno;
460 {
461 DB *dbp;
462 DBT tmp_dbt;
463 DB_LSN null_lsn;
464 DB_MPOOLFILE *mpf;
465 PAGE *pagep;
466 int ret;
467
468 dbp = dbc->dbp;
469 mpf = dbp->mpf;
470
471 do {
472 if ((ret = __memp_fget(mpf, &pgno,
473 dbc->thread_info, dbc->txn, 0, &pagep)) != 0)
474 return (ret);
475
476 DB_ASSERT(dbp->env, TYPE(pagep) == P_OVERFLOW);
477 /*
478 * If it's referenced by more than one key/data item,
479 * decrement the reference count and return.
480 */
481 if (OV_REF(pagep) > 1) {
482 (void)__memp_fput(mpf,
483 dbc->thread_info, pagep, dbc->priority);
484 return (__db_ovref(dbc, pgno));
485 }
486
487 if ((ret = __memp_dirty(mpf, &pagep,
488 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
489 if (pagep != NULL)
490 (void)__memp_fput(mpf,
491 dbc->thread_info, pagep, dbc->priority);
492 return (ret);
493 }
494
495 if (DBC_LOGGING(dbc)) {
496 tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD(dbp);
497 tmp_dbt.size = OV_LEN(pagep);
498 ZERO_LSN(null_lsn);
499 if ((ret = __db_big_log(dbp, dbc->txn, &LSN(pagep), 0,
500 OP_SET(DB_REM_BIG, pagep), PGNO(pagep),
501 PREV_PGNO(pagep), NEXT_PGNO(pagep), &tmp_dbt,
502 &LSN(pagep), &null_lsn, &null_lsn)) != 0) {
503 (void)__memp_fput(mpf,
504 dbc->thread_info, pagep, dbc->priority);
505 return (ret);
506 }
507 } else
508 LSN_NOT_LOGGED(LSN(pagep));
509 pgno = pagep->next_pgno;
510 OV_LEN(pagep) = 0;
511 if ((ret = __db_free(dbc, pagep, 0)) != 0)
512 return (ret);
513 } while (pgno != PGNO_INVALID);
514
515 return (0);
516 }
517
518 /*
519 * __db_moff --
520 * Match on overflow pages from a specific offset.
521 *
522 * Given a starting page number and a key, store <0, 0, >0 in 'cmpp' to indicate
523 * if the key on the page is less than, equal to or greater than the key
524 * specified. We optimize this by doing a chunk at a time comparison unless the
525 * user has specified a comparison function. In this case, we need to
526 * materialize the entire object and call their comparison routine.
527 *
528 * We start the comparison at an offset and update the offset with the
529 * longest matching count after the comparison.
530 *
531 * __db_moff and __db_coff are generic functions useful in searching and
532 * ordering off page items. __db_moff matches an overflow DBT with an offpage
533 * item. __db_coff compares two offpage items for lexicographic sort order.
534 *
535 * PUBLIC: int __db_moff __P((DBC *, const DBT *, db_pgno_t, u_int32_t,
536 * PUBLIC: int (*)(DB *, const DBT *, const DBT *, size_t *),
537 * PUBLIC: int *, size_t *));
538 */
539 int
__db_moff(dbc,dbt,pgno,tlen,cmpfunc,cmpp,locp)540 __db_moff(dbc, dbt, pgno, tlen, cmpfunc, cmpp, locp)
541 DBC *dbc;
542 const DBT *dbt;
543 db_pgno_t pgno;
544 u_int32_t tlen;
545 int (*cmpfunc) __P((DB *, const DBT *, const DBT *, size_t *)), *cmpp;
546 size_t *locp;
547 {
548 DB *dbp;
549 DBT local_dbt;
550 DB_MPOOLFILE *mpf;
551 DB_THREAD_INFO *ip;
552 PAGE *pagep;
553 void *buf;
554 u_int32_t bufsize, cmp_bytes, key_left;
555 u_int8_t *p1, *p2;
556 int ret;
557 size_t pos, start;
558
559 dbp = dbc->dbp;
560 ip = dbc->thread_info;
561 mpf = dbp->mpf;
562
563 /*
564 * If there is a user-specified comparison function, build a
565 * contiguous copy of the key, and call it.
566 */
567 if (cmpfunc != NULL) {
568 memset(&local_dbt, 0, sizeof(local_dbt));
569 buf = NULL;
570 bufsize = 0;
571
572 if ((ret = __db_goff(dbc,
573 &local_dbt, tlen, pgno, &buf, &bufsize)) != 0)
574 return (ret);
575 /* Pass the key as the first argument */
576 *cmpp = cmpfunc(dbp, dbt, &local_dbt, NULL);
577 __os_free(dbp->env, buf);
578 return (0);
579 }
580
581 /*
582 * We start the comparison from the location of 'locp' and store the
583 * last matching location into 'locp'.
584 */
585 start = (locp == NULL ? 0 : *locp);
586 pos = 0;
587
588 /* Subtract prefix length from lengths. */
589 tlen -= (u_int32_t)start;
590 key_left = dbt->size - (u_int32_t)start;
591 p1 = (u_int8_t *)dbt->data + start;
592
593 /* While there are both keys to compare. */
594 for (*cmpp = 0; key_left > 0 &&
595 tlen > 0 && pgno != PGNO_INVALID;) {
596 if ((ret =
597 __memp_fget(mpf, &pgno, ip, dbc->txn, 0, &pagep)) != 0)
598 return (ret);
599
600 /*
601 * Figure out where to start comparison, and how many
602 * bytes to compare.
603 */
604 if (pos >= start) {
605 p2 = (u_int8_t *)pagep + P_OVERHEAD(dbp);
606 cmp_bytes = OV_LEN(pagep);
607 } else if (pos + OV_LEN(pagep) > start) {
608 p2 = (u_int8_t *)pagep +
609 P_OVERHEAD(dbp) + (start - pos);
610 cmp_bytes = OV_LEN(pagep) - (u_int32_t)(start - pos);
611 } else {
612 p2 = NULL;
613 cmp_bytes = 0;
614 }
615
616 pos += OV_LEN(pagep);
617
618 if (cmp_bytes != 0) {
619 if (cmp_bytes > key_left)
620 cmp_bytes = key_left;
621 tlen -= cmp_bytes;
622 key_left -= cmp_bytes;
623 for (;cmp_bytes-- > 0; ++p1, ++p2) {
624 if (*p1 != *p2) {
625 *cmpp = (long)*p1 - (long)*p2;
626 break;
627 }
628 if (locp != NULL)
629 ++(*locp);
630 }
631
632 }
633 pgno = NEXT_PGNO(pagep);
634 if ((ret = __memp_fput(mpf, ip, pagep, dbp->priority)) != 0)
635 return (ret);
636 if (*cmpp != 0)
637 return (0);
638 }
639
640 if (*cmpp == 0) {
641 if (key_left > 0) /* DBT is longer than the page key. */
642 *cmpp = 1;
643 else if (tlen > 0) /* DBT is shorter than the page key. */
644 *cmpp = -1;
645 }
646
647 return (0);
648 }
649
650 /*
651 * __db_coff --
652 * Match two offpage dbts.
653 *
654 * The DBTs must both refer to offpage items.
655 * The match happens a chunk (page) at a time unless a user defined comparison
656 * function exists. It is not possible to optimize this comparison away when
657 * a lexicographic sort order is required on mismatch.
658 *
659 * NOTE: For now this function only works for H_OFFPAGE type items. It would
660 * be simple to extend it for use with B_OVERFLOW type items. It would only
661 * require extracting the total length, and page number, dependent on the
662 * DBT type.
663 *
664 * PUBLIC: int __db_coff __P((DBC *, const DBT *, const DBT *,
665 * PUBLIC: int (*)(DB *, const DBT *, const DBT *, size_t *), int *));
666 */
667 int
__db_coff(dbc,dbt,match,cmpfunc,cmpp)668 __db_coff(dbc, dbt, match, cmpfunc, cmpp)
669 DBC *dbc;
670 const DBT *dbt, *match;
671 int (*cmpfunc) __P((DB *, const DBT *, const DBT *, size_t *)), *cmpp;
672 {
673 DB *dbp;
674 DB_THREAD_INFO *ip;
675 DB_MPOOLFILE *mpf;
676 DB_TXN *txn;
677 DBT local_key, local_match;
678 PAGE *dbt_pagep, *match_pagep;
679 db_pgno_t dbt_pgno, match_pgno;
680 u_int32_t cmp_bytes, dbt_bufsz, dbt_len, match_bufsz;
681 u_int32_t match_len, max_data, page_space;
682 u_int8_t *p1, *p2;
683 int ret;
684 void *dbt_buf, *match_buf;
685
686 dbp = dbc->dbp;
687 ip = dbc->thread_info;
688 txn = dbc->txn;
689 mpf = dbp->mpf;
690 page_space = P_MAXSPACE(dbp, dbp->pgsize);
691 *cmpp = 0;
692 dbt_buf = match_buf = NULL;
693
694 DB_ASSERT(dbp->env, HPAGE_PTYPE(dbt->data) == H_OFFPAGE);
695 DB_ASSERT(dbp->env, HPAGE_PTYPE(match->data) == H_OFFPAGE);
696
697 /* Extract potentially unaligned length and pgno fields from DBTs */
698 memcpy(&dbt_len, HOFFPAGE_TLEN(dbt->data), sizeof(u_int32_t));
699 memcpy(&dbt_pgno, HOFFPAGE_PGNO(dbt->data), sizeof(db_pgno_t));
700 memcpy(&match_len, HOFFPAGE_TLEN(match->data), sizeof(u_int32_t));
701 memcpy(&match_pgno, HOFFPAGE_PGNO(match->data), sizeof(db_pgno_t));
702 max_data = (dbt_len < match_len ? dbt_len : match_len);
703
704 /*
705 * If there is a custom comparator, fully resolve both DBTs.
706 * Then call the users comparator.
707 */
708 if (cmpfunc != NULL) {
709 memset(&local_key, 0, sizeof(local_key));
710 memset(&local_match, 0, sizeof(local_match));
711 dbt_buf = match_buf = NULL;
712 dbt_bufsz = match_bufsz = 0;
713
714 if ((ret = __db_goff(dbc, &local_key, dbt_len,
715 dbt_pgno, &dbt_buf, &dbt_bufsz)) != 0)
716 goto err1;
717 if ((ret = __db_goff(dbc, &local_match, match_len,
718 match_pgno, &match_buf, &match_bufsz)) != 0)
719 goto err1;
720 /* The key needs to be the first argument for sort order */
721 *cmpp = cmpfunc(dbp, &local_key, &local_match, NULL);
722
723 err1: if (dbt_buf != NULL)
724 __os_free(dbp->env, dbt_buf);
725 if (match_buf != NULL)
726 __os_free(dbp->env, match_buf);
727 return (ret);
728 }
729
730 /* Match the offpage DBTs a page at a time. */
731 while (dbt_pgno != PGNO_INVALID && match_pgno != PGNO_INVALID) {
732 if ((ret =
733 __memp_fget(mpf, &dbt_pgno, ip, txn, 0, &dbt_pagep)) != 0)
734 return (ret);
735 DB_ASSERT(dbc->env, TYPE(dbt_pagep) == P_OVERFLOW);
736 if ((ret =
737 __memp_fget(mpf, &match_pgno,
738 ip, txn, 0, &match_pagep)) != 0) {
739 (void)__memp_fput(
740 mpf, ip, dbt_pagep, DB_PRIORITY_UNCHANGED);
741 return (ret);
742 }
743 DB_ASSERT(dbc->env, TYPE(match_pagep) == P_OVERFLOW);
744 cmp_bytes = page_space < max_data ? page_space : max_data;
745 for (p1 = (u_int8_t *)dbt_pagep + P_OVERHEAD(dbp),
746 p2 = (u_int8_t *)match_pagep + P_OVERHEAD(dbp);
747 cmp_bytes-- > 0; ++p1, ++p2)
748 if (*p1 != *p2) {
749 *cmpp = (long)*p1 - (long)*p2;
750 break;
751 }
752
753 dbt_pgno = NEXT_PGNO(dbt_pagep);
754 match_pgno = NEXT_PGNO(match_pagep);
755 max_data -= page_space;
756 if ((ret = __memp_fput(mpf,
757 ip, dbt_pagep, DB_PRIORITY_UNCHANGED)) != 0) {
758 (void)__memp_fput(mpf,
759 ip, match_pagep, DB_PRIORITY_UNCHANGED);
760 return (ret);
761 }
762 if ((ret = __memp_fput(mpf,
763 ip, match_pagep, DB_PRIORITY_UNCHANGED)) != 0)
764 return (ret);
765 if (*cmpp != 0)
766 return (0);
767 }
768
769 /* If a lexicographic mismatch was found, then the result has already
770 * been returned. If the DBTs matched, consider the lengths of the
771 * items, and return appropriately.
772 */
773 if (dbt_len > match_len) /* DBT is longer than the match key. */
774 *cmpp = 1;
775 else if (match_len > dbt_len) /* DBT is shorter than the match key. */
776 *cmpp = -1;
777 else
778 *cmpp = 0;
779
780 return (0);
781
782 }
783