1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996, 1997, 1998, 1999
5  *	Sleepycat Software.  All rights reserved.
6  */
7 /*
8  * Copyright (c) 1990, 1993, 1994, 1995, 1996
9  *	Keith Bostic.  All rights reserved.
10  */
11 /*
12  * Copyright (c) 1990, 1993, 1994, 1995
13  *	The Regents of the University of California.  All rights reserved.
14  *
15  * This code is derived from software contributed to Berkeley by
16  * Mike Olson.
17  *
18  * Redistribution and use in source and binary forms, with or without
19  * modification, are permitted provided that the following conditions
20  * are met:
21  * 1. Redistributions of source code must retain the above copyright
22  *    notice, this list of conditions and the following disclaimer.
23  * 2. Redistributions in binary form must reproduce the above copyright
24  *    notice, this list of conditions and the following disclaimer in the
25  *    documentation and/or other materials provided with the distribution.
26  * 3. Neither the name of the University nor the names of its contributors
27  *    may be used to endorse or promote products derived from this software
28  *    without specific prior written permission.
29  *
30  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
31  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
32  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
34  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
35  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
36  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
37  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
38  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
39  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
40  * SUCH DAMAGE.
41  */
42 
43 #include "db_config.h"
44 
45 #ifndef lint
46 static const char sccsid[] = "@(#)db_overflow.c	11.2 (Sleepycat) 9/9/99";
47 #endif /* not lint */
48 
49 #ifndef NO_SYSTEM_INCLUDES
50 #include <sys/types.h>
51 
52 #include <errno.h>
53 #include <string.h>
54 #endif
55 
56 #include "db_int.h"
57 #include "db_page.h"
58 #include "db_am.h"
59 
60 /*
61  * Big key/data code.
62  *
63  * Big key and data entries are stored on linked lists of pages.  The initial
64  * reference is a structure with the total length of the item and the page
65  * number where it begins.  Each entry in the linked list contains a pointer
66  * to the next page of data, and so on.
67  */
68 
69 /*
70  * CDB___db_goff --
71  *	Get an offpage item.
72  *
73  * PUBLIC: int CDB___db_goff __P((DB *, DBT *,
74  * PUBLIC:     u_int32_t, db_pgno_t, void **, u_int32_t *));
75  */
76 int
CDB___db_goff(dbp,dbt,tlen,pgno,bpp,bpsz)77 CDB___db_goff(dbp, dbt, tlen, pgno, bpp, bpsz)
78 	DB *dbp;
79 	DBT *dbt;
80 	u_int32_t tlen;
81 	db_pgno_t pgno;
82 	void **bpp;
83 	u_int32_t *bpsz;
84 {
85 	PAGE *h;
86 	db_indx_t bytes;
87 	u_int32_t curoff, needed, start;
88 	u_int8_t *p, *src;
89 	int ret;
90 
91 	/*
92 	 * Check if the buffer is big enough; if it is not and we are
93 	 * allowed to malloc space, then we'll malloc it.  If we are
94 	 * not (DB_DBT_USERMEM), then we'll set the dbt and return
95 	 * appropriately.
96 	 */
97 	if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
98 		start = dbt->doff;
99 		needed = dbt->dlen;
100 	} else {
101 		start = 0;
102 		needed = tlen;
103 	}
104 
105 	/* Allocate any necessary memory. */
106 	if (F_ISSET(dbt, DB_DBT_USERMEM)) {
107 		if (needed > dbt->ulen) {
108 			dbt->size = needed;
109 			return (ENOMEM);
110 		}
111 	} else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
112 		if ((ret =
113 		    CDB___os_malloc(needed, dbp->db_malloc, &dbt->data)) != 0)
114 			return (ret);
115 	} else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
116 		if ((ret =
117 		    CDB___os_realloc(needed, dbp->db_realloc, &dbt->data)) != 0)
118 			return (ret);
119 	} else if (*bpsz == 0 || *bpsz < needed) {
120 		if ((ret = CDB___os_realloc(needed, NULL, bpp)) != 0)
121 			return (ret);
122 		*bpsz = needed;
123 		dbt->data = *bpp;
124 	} else
125 		dbt->data = *bpp;
126 
127 	/*
128 	 * Step through the linked list of pages, copying the data on each
129 	 * one into the buffer.  Never copy more than the total data length.
130 	 */
131 	dbt->size = needed;
132 	for (curoff = 0, p = dbt->data; pgno != P_INVALID && needed > 0;) {
133 		if ((ret = CDB_memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) {
134 			(void)CDB___db_pgerr(dbp, pgno);
135 			return (ret);
136 		}
137 		/* Check if we need any bytes from this page. */
138 		if (curoff + OV_LEN(h) >= start) {
139 			src = (u_int8_t *)h + P_OVERHEAD;
140 			bytes = OV_LEN(h);
141 			if (start > curoff) {
142 				src += start - curoff;
143 				bytes -= start - curoff;
144 			}
145 			if (bytes > needed)
146 				bytes = needed;
147 			memcpy(p, src, bytes);
148 			p += bytes;
149 			needed -= bytes;
150 		}
151 		curoff += OV_LEN(h);
152 		pgno = h->next_pgno;
153 		CDB_memp_fput(dbp->mpf, h, 0);
154 	}
155 	return (0);
156 }
157 
158 /*
159  * CDB___db_poff --
160  *	Put an offpage item.
161  *
162  * PUBLIC: int CDB___db_poff __P((DBC *, const DBT *, db_pgno_t *));
163  */
164 int
CDB___db_poff(dbc,dbt,pgnop)165 CDB___db_poff(dbc, dbt, pgnop)
166 	DBC *dbc;
167 	const DBT *dbt;
168 	db_pgno_t *pgnop;
169 {
170 	DB *dbp;
171 	PAGE *pagep, *lastp;
172 	DB_LSN new_lsn, null_lsn;
173 	DBT tmp_dbt;
174 	db_indx_t pagespace;
175 	u_int32_t sz;
176 	u_int8_t *p;
177 	int ret;
178 
179 	/*
180 	 * Allocate pages and copy the key/data item into them.  Calculate the
181 	 * number of bytes we get for pages we fill completely with a single
182 	 * item.
183 	 */
184 	dbp = dbc->dbp;
185 	pagespace = P_MAXSPACE(dbp->pgsize);
186 
187 	lastp = NULL;
188 	for (p = dbt->data,
189 	    sz = dbt->size; sz > 0; p += pagespace, sz -= pagespace) {
190 		/*
191 		 * Reduce pagespace so we terminate the loop correctly and
192 		 * don't copy too much data.
193 		 */
194 		if (sz < pagespace)
195 			pagespace = sz;
196 
197 		/*
198 		 * Allocate and initialize a new page and copy all or part of
199 		 * the item onto the page.  If sz is less than pagespace, we
200 		 * have a partial record.
201 		 */
202 		if ((ret = CDB___db_new(dbc, P_OVERFLOW, &pagep)) != 0)
203 			return (ret);
204 		if (DB_LOGGING(dbc)) {
205 			tmp_dbt.data = p;
206 			tmp_dbt.size = pagespace;
207 			ZERO_LSN(null_lsn);
208 			if ((ret = CDB___db_big_log(dbp->dbenv, dbc->txn,
209 			    &new_lsn, 0, DB_ADD_BIG, dbp->log_fileid,
210 			    PGNO(pagep), lastp ? PGNO(lastp) : PGNO_INVALID,
211 			    PGNO_INVALID, &tmp_dbt, &LSN(pagep),
212 			    lastp == NULL ? &null_lsn : &LSN(lastp),
213 			    &null_lsn)) != 0)
214 				return (ret);
215 
216 			/* Move lsn onto page. */
217 			if (lastp)
218 				LSN(lastp) = new_lsn;
219 			LSN(pagep) = new_lsn;
220 		}
221 
222 		P_INIT(pagep, dbp->pgsize,
223 		    PGNO(pagep), PGNO_INVALID, PGNO_INVALID, 0, P_OVERFLOW);
224 		OV_LEN(pagep) = pagespace;
225 		OV_REF(pagep) = 1;
226 		memcpy((u_int8_t *)pagep + P_OVERHEAD, p, pagespace);
227 
228 		/*
229 		 * If this is the first entry, update the user's info.
230 		 * Otherwise, update the entry on the last page filled
231 		 * in and release that page.
232 		 */
233 		if (lastp == NULL)
234 			*pgnop = PGNO(pagep);
235 		else {
236 			lastp->next_pgno = PGNO(pagep);
237 			pagep->prev_pgno = PGNO(lastp);
238 			(void)CDB_memp_fput(dbp->mpf, lastp, DB_MPOOL_DIRTY);
239 		}
240 		lastp = pagep;
241 	}
242 	(void)CDB_memp_fput(dbp->mpf, lastp, DB_MPOOL_DIRTY);
243 	return (0);
244 }
245 
246 /*
247  * CDB___db_ovref --
248  *	Increment/decrement the reference count on an overflow page.
249  *
250  * PUBLIC: int CDB___db_ovref __P((DBC *, db_pgno_t, int32_t));
251  */
252 int
CDB___db_ovref(dbc,pgno,adjust)253 CDB___db_ovref(dbc, pgno, adjust)
254 	DBC *dbc;
255 	db_pgno_t pgno;
256 	int32_t adjust;
257 {
258 	DB *dbp;
259 	PAGE *h;
260 	int ret;
261 
262 	dbp = dbc->dbp;
263 	if ((ret = CDB_memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) {
264 		(void)CDB___db_pgerr(dbp, pgno);
265 		return (ret);
266 	}
267 
268 	if (DB_LOGGING(dbc))
269 		if ((ret = CDB___db_ovref_log(dbp->dbenv, dbc->txn,
270 		    &LSN(h), 0, dbp->log_fileid, h->pgno, adjust,
271 		    &LSN(h))) != 0)
272 			return (ret);
273 	OV_REF(h) += adjust;
274 
275 	(void)CDB_memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY);
276 	return (0);
277 }
278 
279 /*
280  * CDB___db_doff --
281  *	Delete an offpage chain of overflow pages.
282  *
283  * PUBLIC: int CDB___db_doff __P((DBC *, db_pgno_t));
284  */
285 int
CDB___db_doff(dbc,pgno)286 CDB___db_doff(dbc, pgno)
287 	DBC *dbc;
288 	db_pgno_t pgno;
289 {
290 	DB *dbp;
291 	PAGE *pagep;
292 	DB_LSN null_lsn;
293 	DBT tmp_dbt;
294 	int ret;
295 
296 	dbp = dbc->dbp;
297 	do {
298 		if ((ret = CDB_memp_fget(dbp->mpf, &pgno, 0, &pagep)) != 0) {
299 			(void)CDB___db_pgerr(dbp, pgno);
300 			return (ret);
301 		}
302 
303 		/*
304 		 * If it's an overflow page and it's referenced by more than
305 		 * one key/data item, decrement the reference count and return.
306 		 */
307 		if (TYPE(pagep) == P_OVERFLOW && OV_REF(pagep) > 1) {
308 			(void)CDB_memp_fput(dbp->mpf, pagep, 0);
309 			return (CDB___db_ovref(dbc, pgno, -1));
310 		}
311 
312 		if (DB_LOGGING(dbc)) {
313 			tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD;
314 			tmp_dbt.size = OV_LEN(pagep);
315 			ZERO_LSN(null_lsn);
316 			if ((ret = CDB___db_big_log(dbp->dbenv, dbc->txn,
317 			    &LSN(pagep), 0, DB_REM_BIG, dbp->log_fileid,
318 			    PGNO(pagep), PREV_PGNO(pagep), NEXT_PGNO(pagep),
319 			    &tmp_dbt, &LSN(pagep), &null_lsn, &null_lsn)) != 0)
320 				return (ret);
321 		}
322 		pgno = pagep->next_pgno;
323 		if ((ret = CDB___db_free(dbc, pagep)) != 0)
324 			return (ret);
325 	} while (pgno != PGNO_INVALID);
326 
327 	return (0);
328 }
329 
330 /*
331  * CDB___db_moff --
332  *	Match on overflow pages.
333  *
334  * Given a starting page number and a key, return <0, 0, >0 to indicate if the
335  * key on the page is less than, equal to or greater than the key specified.
336  * We optimize this by doing chunk at a time comparison unless the user has
337  * specified a comparison function.  In this case, we need to materialize
338  * the entire object and call their comparison routine.
339  *
340  * PUBLIC: int CDB___db_moff __P((DB *, const DBT *, db_pgno_t, u_int32_t,
341  * PUBLIC:     int (*)(const DBT *, const DBT *), int *));
342  */
343 int
CDB___db_moff(dbp,dbt,pgno,tlen,cmpfunc,cmpp)344 CDB___db_moff(dbp, dbt, pgno, tlen, cmpfunc, cmpp)
345 	DB *dbp;
346 	const DBT *dbt;
347 	db_pgno_t pgno;
348 	u_int32_t tlen;
349 	int (*cmpfunc) __P((const DBT *, const DBT *)), *cmpp;
350 {
351 	PAGE *pagep;
352 	DBT local_dbt;
353 	void *buf;
354 	u_int32_t bufsize, cmp_bytes, key_left;
355 	u_int8_t *p1, *p2;
356 	int ret;
357 
358 	/*
359 	 * If there is a user-specified comparison function, build a
360 	 * contiguous copy of the key, and call it.
361 	 */
362 	if (cmpfunc != NULL) {
363 		memset(&local_dbt, 0, sizeof(local_dbt));
364 		buf = NULL;
365 		bufsize = 0;
366 
367 		if ((ret = CDB___db_goff(dbp,
368 		    &local_dbt, tlen, pgno, &buf, &bufsize)) != 0)
369 			return (ret);
370 		*cmpp = cmpfunc(&local_dbt, dbt);
371 		CDB___os_free(buf, bufsize);
372 		return (0);
373 	}
374 
375 	/* While there are both keys to compare. */
376 	for (*cmpp = 0, p1 = dbt->data,
377 	    key_left = dbt->size; key_left > 0 && pgno != PGNO_INVALID;) {
378 		if ((ret = CDB_memp_fget(dbp->mpf, &pgno, 0, &pagep)) != 0)
379 			return (ret);
380 
381 		cmp_bytes = OV_LEN(pagep) < key_left ? OV_LEN(pagep) : key_left;
382 		tlen -= cmp_bytes;
383 		key_left -= cmp_bytes;
384 		for (p2 =
385 		    (u_int8_t *)pagep + P_OVERHEAD; cmp_bytes-- > 0; ++p1, ++p2)
386 			if (*p1 != *p2) {
387 				*cmpp = (long)*p1 - (long)*p2;
388 				break;
389 			}
390 		pgno = NEXT_PGNO(pagep);
391 		if ((ret = CDB_memp_fput(dbp->mpf, pagep, 0)) != 0)
392 			return (ret);
393 		if (*cmpp != 0)
394 			return (0);
395 	}
396 	if (key_left > 0)		/* DBT is longer than the page key. */
397 		*cmpp = -1;
398 	else if (tlen > 0)		/* DBT is shorter than the page key. */
399 		*cmpp = 1;
400 	else
401 		*cmpp = 0;
402 
403 	return (0);
404 }
405