1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996, 1997, 1998, 1999
5 * Sleepycat Software. All rights reserved.
6 */
7 /*
8 * Copyright (c) 1990, 1993, 1994, 1995, 1996
9 * Keith Bostic. All rights reserved.
10 */
11 /*
12 * Copyright (c) 1990, 1993, 1994, 1995
13 * The Regents of the University of California. All rights reserved.
14 *
15 * This code is derived from software contributed to Berkeley by
16 * Mike Olson.
17 *
18 * Redistribution and use in source and binary forms, with or without
19 * modification, are permitted provided that the following conditions
20 * are met:
21 * 1. Redistributions of source code must retain the above copyright
22 * notice, this list of conditions and the following disclaimer.
23 * 2. Redistributions in binary form must reproduce the above copyright
24 * notice, this list of conditions and the following disclaimer in the
25 * documentation and/or other materials provided with the distribution.
26 * 3. Neither the name of the University nor the names of its contributors
27 * may be used to endorse or promote products derived from this software
28 * without specific prior written permission.
29 *
30 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
31 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
32 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
34 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
35 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
36 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
37 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
38 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
39 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
40 * SUCH DAMAGE.
41 */
42
43 #include "db_config.h"
44
45 #ifndef lint
46 static const char sccsid[] = "@(#)db_overflow.c 11.2 (Sleepycat) 9/9/99";
47 #endif /* not lint */
48
49 #ifndef NO_SYSTEM_INCLUDES
50 #include <sys/types.h>
51
52 #include <errno.h>
53 #include <string.h>
54 #endif
55
56 #include "db_int.h"
57 #include "db_page.h"
58 #include "db_am.h"
59
60 /*
61 * Big key/data code.
62 *
63 * Big key and data entries are stored on linked lists of pages. The initial
64 * reference is a structure with the total length of the item and the page
65 * number where it begins. Each entry in the linked list contains a pointer
66 * to the next page of data, and so on.
67 */
68
69 /*
70 * CDB___db_goff --
71 * Get an offpage item.
72 *
73 * PUBLIC: int CDB___db_goff __P((DB *, DBT *,
74 * PUBLIC: u_int32_t, db_pgno_t, void **, u_int32_t *));
75 */
76 int
CDB___db_goff(dbp,dbt,tlen,pgno,bpp,bpsz)77 CDB___db_goff(dbp, dbt, tlen, pgno, bpp, bpsz)
78 DB *dbp;
79 DBT *dbt;
80 u_int32_t tlen;
81 db_pgno_t pgno;
82 void **bpp;
83 u_int32_t *bpsz;
84 {
85 PAGE *h;
86 db_indx_t bytes;
87 u_int32_t curoff, needed, start;
88 u_int8_t *p, *src;
89 int ret;
90
91 /*
92 * Check if the buffer is big enough; if it is not and we are
93 * allowed to malloc space, then we'll malloc it. If we are
94 * not (DB_DBT_USERMEM), then we'll set the dbt and return
95 * appropriately.
96 */
97 if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
98 start = dbt->doff;
99 needed = dbt->dlen;
100 } else {
101 start = 0;
102 needed = tlen;
103 }
104
105 /* Allocate any necessary memory. */
106 if (F_ISSET(dbt, DB_DBT_USERMEM)) {
107 if (needed > dbt->ulen) {
108 dbt->size = needed;
109 return (ENOMEM);
110 }
111 } else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
112 if ((ret =
113 CDB___os_malloc(needed, dbp->db_malloc, &dbt->data)) != 0)
114 return (ret);
115 } else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
116 if ((ret =
117 CDB___os_realloc(needed, dbp->db_realloc, &dbt->data)) != 0)
118 return (ret);
119 } else if (*bpsz == 0 || *bpsz < needed) {
120 if ((ret = CDB___os_realloc(needed, NULL, bpp)) != 0)
121 return (ret);
122 *bpsz = needed;
123 dbt->data = *bpp;
124 } else
125 dbt->data = *bpp;
126
127 /*
128 * Step through the linked list of pages, copying the data on each
129 * one into the buffer. Never copy more than the total data length.
130 */
131 dbt->size = needed;
132 for (curoff = 0, p = dbt->data; pgno != P_INVALID && needed > 0;) {
133 if ((ret = CDB_memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) {
134 (void)CDB___db_pgerr(dbp, pgno);
135 return (ret);
136 }
137 /* Check if we need any bytes from this page. */
138 if (curoff + OV_LEN(h) >= start) {
139 src = (u_int8_t *)h + P_OVERHEAD;
140 bytes = OV_LEN(h);
141 if (start > curoff) {
142 src += start - curoff;
143 bytes -= start - curoff;
144 }
145 if (bytes > needed)
146 bytes = needed;
147 memcpy(p, src, bytes);
148 p += bytes;
149 needed -= bytes;
150 }
151 curoff += OV_LEN(h);
152 pgno = h->next_pgno;
153 CDB_memp_fput(dbp->mpf, h, 0);
154 }
155 return (0);
156 }
157
158 /*
159 * CDB___db_poff --
160 * Put an offpage item.
161 *
162 * PUBLIC: int CDB___db_poff __P((DBC *, const DBT *, db_pgno_t *));
163 */
164 int
CDB___db_poff(dbc,dbt,pgnop)165 CDB___db_poff(dbc, dbt, pgnop)
166 DBC *dbc;
167 const DBT *dbt;
168 db_pgno_t *pgnop;
169 {
170 DB *dbp;
171 PAGE *pagep, *lastp;
172 DB_LSN new_lsn, null_lsn;
173 DBT tmp_dbt;
174 db_indx_t pagespace;
175 u_int32_t sz;
176 u_int8_t *p;
177 int ret;
178
179 /*
180 * Allocate pages and copy the key/data item into them. Calculate the
181 * number of bytes we get for pages we fill completely with a single
182 * item.
183 */
184 dbp = dbc->dbp;
185 pagespace = P_MAXSPACE(dbp->pgsize);
186
187 lastp = NULL;
188 for (p = dbt->data,
189 sz = dbt->size; sz > 0; p += pagespace, sz -= pagespace) {
190 /*
191 * Reduce pagespace so we terminate the loop correctly and
192 * don't copy too much data.
193 */
194 if (sz < pagespace)
195 pagespace = sz;
196
197 /*
198 * Allocate and initialize a new page and copy all or part of
199 * the item onto the page. If sz is less than pagespace, we
200 * have a partial record.
201 */
202 if ((ret = CDB___db_new(dbc, P_OVERFLOW, &pagep)) != 0)
203 return (ret);
204 if (DB_LOGGING(dbc)) {
205 tmp_dbt.data = p;
206 tmp_dbt.size = pagespace;
207 ZERO_LSN(null_lsn);
208 if ((ret = CDB___db_big_log(dbp->dbenv, dbc->txn,
209 &new_lsn, 0, DB_ADD_BIG, dbp->log_fileid,
210 PGNO(pagep), lastp ? PGNO(lastp) : PGNO_INVALID,
211 PGNO_INVALID, &tmp_dbt, &LSN(pagep),
212 lastp == NULL ? &null_lsn : &LSN(lastp),
213 &null_lsn)) != 0)
214 return (ret);
215
216 /* Move lsn onto page. */
217 if (lastp)
218 LSN(lastp) = new_lsn;
219 LSN(pagep) = new_lsn;
220 }
221
222 P_INIT(pagep, dbp->pgsize,
223 PGNO(pagep), PGNO_INVALID, PGNO_INVALID, 0, P_OVERFLOW);
224 OV_LEN(pagep) = pagespace;
225 OV_REF(pagep) = 1;
226 memcpy((u_int8_t *)pagep + P_OVERHEAD, p, pagespace);
227
228 /*
229 * If this is the first entry, update the user's info.
230 * Otherwise, update the entry on the last page filled
231 * in and release that page.
232 */
233 if (lastp == NULL)
234 *pgnop = PGNO(pagep);
235 else {
236 lastp->next_pgno = PGNO(pagep);
237 pagep->prev_pgno = PGNO(lastp);
238 (void)CDB_memp_fput(dbp->mpf, lastp, DB_MPOOL_DIRTY);
239 }
240 lastp = pagep;
241 }
242 (void)CDB_memp_fput(dbp->mpf, lastp, DB_MPOOL_DIRTY);
243 return (0);
244 }
245
246 /*
247 * CDB___db_ovref --
248 * Increment/decrement the reference count on an overflow page.
249 *
250 * PUBLIC: int CDB___db_ovref __P((DBC *, db_pgno_t, int32_t));
251 */
252 int
CDB___db_ovref(dbc,pgno,adjust)253 CDB___db_ovref(dbc, pgno, adjust)
254 DBC *dbc;
255 db_pgno_t pgno;
256 int32_t adjust;
257 {
258 DB *dbp;
259 PAGE *h;
260 int ret;
261
262 dbp = dbc->dbp;
263 if ((ret = CDB_memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) {
264 (void)CDB___db_pgerr(dbp, pgno);
265 return (ret);
266 }
267
268 if (DB_LOGGING(dbc))
269 if ((ret = CDB___db_ovref_log(dbp->dbenv, dbc->txn,
270 &LSN(h), 0, dbp->log_fileid, h->pgno, adjust,
271 &LSN(h))) != 0)
272 return (ret);
273 OV_REF(h) += adjust;
274
275 (void)CDB_memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY);
276 return (0);
277 }
278
279 /*
280 * CDB___db_doff --
281 * Delete an offpage chain of overflow pages.
282 *
283 * PUBLIC: int CDB___db_doff __P((DBC *, db_pgno_t));
284 */
285 int
CDB___db_doff(dbc,pgno)286 CDB___db_doff(dbc, pgno)
287 DBC *dbc;
288 db_pgno_t pgno;
289 {
290 DB *dbp;
291 PAGE *pagep;
292 DB_LSN null_lsn;
293 DBT tmp_dbt;
294 int ret;
295
296 dbp = dbc->dbp;
297 do {
298 if ((ret = CDB_memp_fget(dbp->mpf, &pgno, 0, &pagep)) != 0) {
299 (void)CDB___db_pgerr(dbp, pgno);
300 return (ret);
301 }
302
303 /*
304 * If it's an overflow page and it's referenced by more than
305 * one key/data item, decrement the reference count and return.
306 */
307 if (TYPE(pagep) == P_OVERFLOW && OV_REF(pagep) > 1) {
308 (void)CDB_memp_fput(dbp->mpf, pagep, 0);
309 return (CDB___db_ovref(dbc, pgno, -1));
310 }
311
312 if (DB_LOGGING(dbc)) {
313 tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD;
314 tmp_dbt.size = OV_LEN(pagep);
315 ZERO_LSN(null_lsn);
316 if ((ret = CDB___db_big_log(dbp->dbenv, dbc->txn,
317 &LSN(pagep), 0, DB_REM_BIG, dbp->log_fileid,
318 PGNO(pagep), PREV_PGNO(pagep), NEXT_PGNO(pagep),
319 &tmp_dbt, &LSN(pagep), &null_lsn, &null_lsn)) != 0)
320 return (ret);
321 }
322 pgno = pagep->next_pgno;
323 if ((ret = CDB___db_free(dbc, pagep)) != 0)
324 return (ret);
325 } while (pgno != PGNO_INVALID);
326
327 return (0);
328 }
329
330 /*
331 * CDB___db_moff --
332 * Match on overflow pages.
333 *
334 * Given a starting page number and a key, return <0, 0, >0 to indicate if the
335 * key on the page is less than, equal to or greater than the key specified.
336 * We optimize this by doing chunk at a time comparison unless the user has
337 * specified a comparison function. In this case, we need to materialize
338 * the entire object and call their comparison routine.
339 *
340 * PUBLIC: int CDB___db_moff __P((DB *, const DBT *, db_pgno_t, u_int32_t,
341 * PUBLIC: int (*)(const DBT *, const DBT *), int *));
342 */
343 int
CDB___db_moff(dbp,dbt,pgno,tlen,cmpfunc,cmpp)344 CDB___db_moff(dbp, dbt, pgno, tlen, cmpfunc, cmpp)
345 DB *dbp;
346 const DBT *dbt;
347 db_pgno_t pgno;
348 u_int32_t tlen;
349 int (*cmpfunc) __P((const DBT *, const DBT *)), *cmpp;
350 {
351 PAGE *pagep;
352 DBT local_dbt;
353 void *buf;
354 u_int32_t bufsize, cmp_bytes, key_left;
355 u_int8_t *p1, *p2;
356 int ret;
357
358 /*
359 * If there is a user-specified comparison function, build a
360 * contiguous copy of the key, and call it.
361 */
362 if (cmpfunc != NULL) {
363 memset(&local_dbt, 0, sizeof(local_dbt));
364 buf = NULL;
365 bufsize = 0;
366
367 if ((ret = CDB___db_goff(dbp,
368 &local_dbt, tlen, pgno, &buf, &bufsize)) != 0)
369 return (ret);
370 *cmpp = cmpfunc(&local_dbt, dbt);
371 CDB___os_free(buf, bufsize);
372 return (0);
373 }
374
375 /* While there are both keys to compare. */
376 for (*cmpp = 0, p1 = dbt->data,
377 key_left = dbt->size; key_left > 0 && pgno != PGNO_INVALID;) {
378 if ((ret = CDB_memp_fget(dbp->mpf, &pgno, 0, &pagep)) != 0)
379 return (ret);
380
381 cmp_bytes = OV_LEN(pagep) < key_left ? OV_LEN(pagep) : key_left;
382 tlen -= cmp_bytes;
383 key_left -= cmp_bytes;
384 for (p2 =
385 (u_int8_t *)pagep + P_OVERHEAD; cmp_bytes-- > 0; ++p1, ++p2)
386 if (*p1 != *p2) {
387 *cmpp = (long)*p1 - (long)*p2;
388 break;
389 }
390 pgno = NEXT_PGNO(pagep);
391 if ((ret = CDB_memp_fput(dbp->mpf, pagep, 0)) != 0)
392 return (ret);
393 if (*cmpp != 0)
394 return (0);
395 }
396 if (key_left > 0) /* DBT is longer than the page key. */
397 *cmpp = -1;
398 else if (tlen > 0) /* DBT is shorter than the page key. */
399 *cmpp = 1;
400 else
401 *cmpp = 0;
402
403 return (0);
404 }
405