/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2010, 2013 Oracle and/or its affiliates.  All rights reserved.
 *
 * $Id$
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/btree.h"
#include "dbinc/heap.h"
#include "dbinc/lock.h"
#include "dbinc/mp.h"

static int  __heap_bulk __P((DBC *, DBT *, u_int32_t));
static int  __heap_getpage __P((DBC *, u_int32_t, u_int8_t *));
static int  __heapc_close __P((DBC *, db_pgno_t, int *));
static int  __heapc_del __P((DBC *, u_int32_t));
static int  __heapc_destroy __P((DBC *));
static int  __heapc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int  __heapc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int  __heapc_reloc __P((DBC *, DBT *, DBT *));
static int  __heapc_reloc_partial __P((DBC *, DBT *, DBT *));
static int  __heapc_split __P((DBC *, DBT *, DBT *, int));

/*
 * Acquire a new page/lock.  If we are already holding a page and a lock
 * we discard those and get the new ones.  In this case we can use
 * LCK_COUPLE to save trips to the lock manager.  If we are not holding a
 * page or lock, we just get a new lock and page.  The lock is released
 * with a transactional lock put.
 */
#undef  ACQUIRE
#define	ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, flags, mflags, ret) do { \
	DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;				\
	if ((pagep) != NULL) {						\
		ret = __memp_fput(__mpf,				\
		    (dbc)->thread_info, pagep, dbc->priority);		\
		pagep = NULL;						\
	}								\
	if ((ret) == 0 && STD_LOCKING(dbc))				\
		ret = __db_lget(dbc,					\
		    LOCK_ISSET(lock) ? LCK_COUPLE : 0,			\
		    lpgno, mode, flags, &(lock));			\
	if ((ret) == 0)							\
		ret = __memp_fget(__mpf, &(fpgno),			\
		    (dbc)->thread_info, (dbc)->txn, mflags, &(pagep));	\
} while (0)
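
/*
 * For example, the DB_LAST case in __heapc_get below uses ACQUIRE to couple
 * from whatever lock/page the cursor holds over to the metadata page:
 *
 *	ACQUIRE(dbc, DB_LOCK_READ, pgno, meta_lock, pgno, meta, 0, 0, ret);
 */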

/* Acquire a new page/lock for a heap cursor */
#undef  ACQUIRE_CUR
#define	ACQUIRE_CUR(dbc, mode, p, flags, mflags, ret) do {		\
	HEAP_CURSOR *__cp = (HEAP_CURSOR *)(dbc)->internal;		\
	if (p != __cp->pgno)						\
		__cp->pgno = PGNO_INVALID;				\
	ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, flags, mflags, ret); \
	if ((ret) == 0) {						\
		__cp->pgno = p;						\
		__cp->lock_mode = (mode);				\
	}								\
} while (0)
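
/*
 * Unlike the bare ACQUIRE, ACQUIRE_CUR also keeps the heap cursor's state
 * coherent: the cached pgno is invalidated if the target page changes, and
 * on success pgno and lock_mode track the newly acquired page.  A typical
 * call from this file:
 *
 *	ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
 */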
65 
66 /* Discard the current page/lock for a cursor, indicate txn lock release */
67 #undef  DISCARD
68 #define	DISCARD(dbc, pagep, lock, tlock, ret) do {			\
69 	DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;				\
70 	int __t_ret;							\
71 	__t_ret = 0;							\
72 	if ((pagep) != NULL) {						\
73 		__t_ret = __memp_fput(__mpf,				\
74 		    (dbc)->thread_info, pagep, dbc->priority);		\
75 		pagep = NULL;						\
76 	}								\
77 	if (__t_ret != 0 && (ret) == 0)					\
78 		ret = __t_ret;						\
79 	if (tlock == 1)							\
80 		__t_ret = __TLPUT((dbc), lock);				\
81 	else								\
82 		__t_ret = __LPUT((dbc), lock);				\
83 	if (__t_ret != 0 && (ret) == 0)					\
84 		ret = __t_ret;						\
85 } while (0)
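
/*
 * Passing tlock == 1 releases the lock with __TLPUT, which is
 * transaction-aware: a lock that must be held until commit is retained.
 * tlock == 0 uses __LPUT, which always discards the lock immediately.
 */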

/*
 * __heapc_init --
 *	Initialize the access private portion of a cursor
 *
 * PUBLIC: int __heapc_init __P((DBC *));
 */
int
__heapc_init(dbc)
	DBC *dbc;
{
	ENV *env;
	int ret;

	env = dbc->env;

	if (dbc->internal == NULL)
		if ((ret = __os_calloc(
		    env, 1, sizeof(HEAP_CURSOR), &dbc->internal)) != 0)
			return (ret);

	/* Initialize methods. */
	dbc->close = dbc->c_close = __dbc_close_pp;
	dbc->cmp = __dbc_cmp_pp;
	dbc->count = dbc->c_count = __dbc_count_pp;
	dbc->del = dbc->c_del = __dbc_del_pp;
	dbc->dup = dbc->c_dup = __dbc_dup_pp;
	dbc->get = dbc->c_get = __dbc_get_pp;
	dbc->pget = dbc->c_pget = __dbc_pget_pp;
	dbc->put = dbc->c_put = __dbc_put_pp;
	dbc->am_bulk = __heap_bulk;
	dbc->am_close = __heapc_close;
	dbc->am_del = __heapc_del;
	dbc->am_destroy = __heapc_destroy;
	dbc->am_get = __heapc_get;
	dbc->am_put = __heapc_put;
	dbc->am_writelock = NULL;

	return (0);
}

static int
__heap_bulk(dbc, data, flags)
	DBC *dbc;
	DBT *data;
	u_int32_t flags;
{
	DB *dbp;
	DB_HEAP_RID prev_rid, rid;
	DBT sdata;
	HEAP_CURSOR *cp;
	HEAPHDR *hdr;
	HEAPSPLITHDR *shdr;
	PAGE *pg;
	db_lockmode_t lock_type;
	int is_key, ret;
	int32_t *offp;
	u_int32_t data_size, key_size, needed, space;
	u_int8_t *dbuf, *np;

	ret = 0;
	dbp = dbc->dbp;
	cp = (HEAP_CURSOR *)dbc->internal;
	hdr = NULL;
	shdr = NULL;

	/* Use a write lock if the cursor is configured read-modify-write. */
	if (F_ISSET(dbc, DBC_RMW))
		lock_type = DB_LOCK_WRITE;
	else
		lock_type = DB_LOCK_READ;

	/*
	 * np is the next place to copy things into the buffer.
	 * dbuf always stays at the beginning of the buffer.
	 */
	dbuf = data->data;
	np = dbuf;

	/* Keep track of the space that is left.  There is a termination entry. */
	space = data->ulen;
	space -= sizeof(*offp);

	/* Build the offset/size table from the end up. */
	offp = (int32_t *)((u_int8_t *)dbuf + data->ulen);
	offp--;
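
	/*
	 * Illustrative buffer layout (see the DB_MULTIPLE macros in db.h):
	 * record data is copied forward from dbuf while int32_t offset/length
	 * pairs grow backward from dbuf + ulen, with a -1 terminator between
	 * the two ends (diagram shows the no-key case, ascending addresses):
	 *
	 *	[rec0][rec1]...        ...[-1][len1][off1][len0][off0]
	 */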

	/*
	 * key_size and data_size hold the 32-bit aligned size of the key and
	 * data values written to the buffer.
	 */
	key_size = DB_ALIGN(DB_HEAP_RID_SZ, sizeof(u_int32_t));
	data_size = 0;

	/* is_key indicates whether keys are returned. */
	is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;

next_pg:
	rid.indx = cp->indx;
	rid.pgno = cp->pgno;
	pg = cp->page;

	/*
	 * Write records to the buffer, in the format needed by the DB_MULTIPLE
	 * macros.  For a description of the data layout, see db.h.
	 */
	do {
		if (HEAP_OFFSETTBL(dbp, pg)[rid.indx] == 0)
			continue;
		hdr = (HEAPHDR *)P_ENTRY(dbp, pg, rid.indx);
		/*
		 * If this is a split record and not the first piece of the
		 * record, skip it.
		 */
		if (F_ISSET(hdr, HEAP_RECSPLIT) &&
		    !F_ISSET(hdr, HEAP_RECFIRST))
			continue;

		/*
		 * Calculate how much space is needed to add this record.  If
		 * there's not enough, we're done.  If we haven't written any
		 * data to the buffer, or if we are doing a DBP->get, return
		 * DB_BUFFER_SMALL.
		 */
		needed = 0;
		if (is_key)
			needed = 2 * sizeof(*offp) + key_size;
		if (F_ISSET(hdr, HEAP_RECSPLIT)) {
			shdr = (HEAPSPLITHDR *)hdr;
			data_size = DB_ALIGN(shdr->tsize, sizeof(u_int32_t));
		} else
			data_size = DB_ALIGN(hdr->size, sizeof(u_int32_t));
		needed += 2 * sizeof(*offp) + data_size;

		if (needed > space) {
			if (np == dbuf || F_ISSET(dbc, DBC_FROM_DB_GET)) {
				data->size = (u_int32_t)DB_ALIGN(
				    needed + data->ulen - space, 1024);
				return (DB_BUFFER_SMALL);
			}
			break;
		}

		if (is_key) {
			memcpy(np, &rid, key_size);
			*offp-- = (int32_t)(np - dbuf);
			*offp-- = (int32_t)DB_HEAP_RID_SZ;
			np += key_size;
		}

		if (F_ISSET(hdr, HEAP_RECSPLIT)) {
			/*
			 * Use __heapc_gsplit to write a split record to the
			 * return buffer.  gsplit will return any fetched pages
			 * to the cache, but will leave the cursor's current
			 * page alone.
			 */
			memset(&sdata, 0, sizeof(DBT));
			sdata.data = np;
			sdata.size = sdata.ulen = shdr->tsize;
			sdata.flags = DB_DBT_USERMEM;
			/* gsplit expects the cursor to be positioned. */
			cp->pgno = rid.pgno;
			cp->indx = rid.indx;
			if ((ret = __heapc_gsplit(
			    dbc, &sdata, NULL, NULL)) != 0)
				return (ret);
		} else {
			memcpy(np,
			    (u_int8_t *)hdr + sizeof(HEAPHDR), hdr->size);
		}
		*offp-- = (int32_t)(np - dbuf);
		if (F_ISSET(hdr, HEAP_RECSPLIT))
			*offp-- = (int32_t)shdr->tsize;
		else
			*offp-- = (int32_t)hdr->size;
		np += data_size;
		space -= needed;
		prev_rid = rid;

		/*
		 * The data and "metadata" ends of the buffer should never
		 * overlap.
		 */
		DB_ASSERT(dbp->env, (void *)np <= (void *)offp);
	} while (++rid.indx < NUM_ENT(pg));

	/* If we are off the page then try the next page. */
	if (rid.indx >= NUM_ENT(pg)) {
		rid.pgno++;
		ACQUIRE_CUR(dbc, lock_type, rid.pgno, 0, 0, ret);
		if (ret == 0) {
			cp->indx = 0;
			goto next_pg;
		} else if (ret != DB_PAGE_NOTFOUND)
			return (ret);
	}

	DB_ASSERT(dbp->env, (ret == 0 || ret == DB_PAGE_NOTFOUND));
	cp->indx = prev_rid.indx;
	cp->pgno = prev_rid.pgno;

	*offp = -1;

	return (0);
}
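
/*
 * A minimal sketch of how an application drains a buffer that __heap_bulk
 * fills via DBC->get(..., DB_MULTIPLE_KEY); bulkbuf and bulkbuflen are
 * assumptions here, the macros are the public ones from db.h:
 *
 *	DBT key, data;
 *	void *ptr, *kptr, *dptr;
 *	u_int32_t klen, dlen;
 *	int ret;
 *
 *	memset(&key, 0, sizeof(key));
 *	memset(&data, 0, sizeof(data));
 *	data.data = bulkbuf;
 *	data.ulen = bulkbuflen;
 *	data.flags = DB_DBT_USERMEM;
 *	for (ret = dbc->get(dbc, &key, &data, DB_MULTIPLE_KEY | DB_FIRST);
 *	    ret == 0;
 *	    ret = dbc->get(dbc, &key, &data, DB_MULTIPLE_KEY | DB_NEXT)) {
 *		DB_MULTIPLE_INIT(ptr, &data);
 *		for (;;) {
 *			DB_MULTIPLE_KEY_NEXT(ptr, &data,
 *			    kptr, klen, dptr, dlen);
 *			if (ptr == NULL)
 *				break;
 *			(kptr points at a DB_HEAP_RID; dptr/dlen is the data.)
 *		}
 *	}
 */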

static int
__heapc_close(dbc, root_pgno, rmroot)
	DBC *dbc;
	db_pgno_t root_pgno;
	int *rmroot;
{
	DB_MPOOLFILE *mpf;
	HEAP_CURSOR *cp;
	int ret;

	COMPQUIET(root_pgno, 0);
	COMPQUIET(rmroot, 0);

	cp = (HEAP_CURSOR *)dbc->internal;
	mpf = dbc->dbp->mpf;
	ret = 0;

	/* Release the page/lock held by the cursor. */
	DISCARD(dbc, cp->page, cp->lock, 1, ret);
	if (ret == 0 && !LOCK_ISSET(cp->lock))
		cp->lock_mode = DB_LOCK_NG;

	return (ret);
}

static int
__heapc_del(dbc, flags)
	DBC *dbc;
	u_int32_t flags;
{
	DB *dbp;
	DB_HEAP_RID next_rid, orig_rid;
	DB_MPOOLFILE *mpf;
	DBT hdr_dbt, log_dbt;
	HEAP *h;
	HEAPHDR *hdr;
	HEAPPG *rpage;
	HEAP_CURSOR *cp;
	db_pgno_t region_pgno;
	int oldspacebits, ret, spacebits, t_ret;
	u_int16_t data_size, size;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	h = dbp->heap_internal;
	cp = (HEAP_CURSOR *)dbc->internal;
	rpage = NULL;
	COMPQUIET(flags, 0);

	/*
	 * We need to be able to reset the cursor after deleting a record split
	 * across multiple pages.
	 */
	orig_rid.pgno = cp->pgno;
	orig_rid.indx = cp->indx;

	/*
	 * This code is always called with a page lock but no page.
	 */
	DB_ASSERT(dbp->env, cp->page == NULL);

	/* We have a read lock, but need a write lock. */
start:	if (STD_LOCKING(dbc) && (ret = __db_lget(dbc,
	    LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
		return (ret);

	if ((ret = __memp_fget(mpf, &cp->pgno,
	    dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &cp->page)) != 0)
		return (ret);

	HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), oldspacebits);

	hdr = (HEAPHDR *)P_ENTRY(dbp, cp->page, cp->indx);
	data_size = DB_ALIGN(hdr->size, sizeof(u_int32_t));
	size = data_size + HEAP_HDRSIZE(hdr);
	if (size < sizeof(HEAPSPLITHDR))
		size = sizeof(HEAPSPLITHDR);
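	/*
	 * (Each slot is accounted as at least a split header's worth of
	 * bytes; __heap_ditem asserts the same minimum, apparently so a
	 * deleted slot can later hold a split piece in place.)
	 */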
	if (F_ISSET(hdr, HEAP_RECSPLIT) && !F_ISSET(hdr, HEAP_RECLAST)) {
		next_rid.pgno = F_ISSET(hdr, HEAP_RECLAST) ?
			PGNO_INVALID : ((HEAPSPLITHDR *)hdr)->nextpg;
		next_rid.indx = F_ISSET(hdr, HEAP_RECLAST) ?
			PGNO_INVALID : ((HEAPSPLITHDR *)hdr)->nextindx;
	} else {
		next_rid.pgno = PGNO_INVALID;
		next_rid.indx = 0;
	}

	/* Log the deletion. */
	if (DBC_LOGGING(dbc)) {
		hdr_dbt.data = hdr;
		hdr_dbt.size = HEAP_HDRSIZE(hdr);
		log_dbt.data = (u_int8_t *)hdr + hdr_dbt.size;
		log_dbt.size = data_size;
		if ((ret = __heap_addrem_log(dbp, dbc->txn, &LSN(cp->page),
		    0, DB_REM_HEAP, cp->pgno, (u_int32_t)cp->indx,
		    size, &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(cp->page));

	if ((ret = __heap_ditem(dbc, cp->page, cp->indx, size)) != 0)
		goto err;

	/*
	 * If the deleted item lived in a region prior to our current one,
	 * back up the current region, giving us a chance to reuse the newly
	 * available space on the next insert.
	 */
	region_pgno = HEAP_REGION_PGNO(dbp, cp->pgno);
	if (region_pgno < h->curregion)
		h->curregion = region_pgno;

	HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), spacebits);

	if (spacebits != oldspacebits) {
		/*
		 * Get the region page.  We never lock the region page: the
		 * data page lock covers the corresponding bits in the bitmap,
		 * and latching serializes access.
		 */
		if ((ret = __memp_fget(mpf, &region_pgno,
		    dbc->thread_info, NULL, DB_MPOOL_DIRTY, &rpage)) != 0)
			goto err;
		HEAP_SETSPACE(dbp, rpage,
		    cp->pgno - region_pgno - 1, spacebits);
	}

err:	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
	if (rpage != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	rpage = NULL;

	if ((t_ret = __memp_fput(mpf,
	    dbc->thread_info, cp->page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;

	cp->page = NULL;

	if (ret == 0 && next_rid.pgno != PGNO_INVALID) {
		cp->pgno = next_rid.pgno;
		cp->indx = next_rid.indx;
		goto start;
	}

	cp->pgno = orig_rid.pgno;
	cp->indx = orig_rid.indx;

	return (ret);
}

/*
 * __heap_ditem --
 *   Remove an item from a page.
 *
 * PUBLIC: int __heap_ditem
 * PUBLIC:   __P((DBC *, PAGE *, u_int32_t, u_int32_t));
 */
int
__heap_ditem(dbc, pagep, indx, nbytes)
	DBC *dbc;
	PAGE *pagep;
	u_int32_t indx, nbytes;
{
	DB *dbp;
	db_indx_t first, i, max, off, *offtbl, span;
	u_int8_t *src, *dest;

	dbp = dbc->dbp;

	DB_ASSERT(dbp->env, TYPE(pagep) == P_HEAP);
	DB_ASSERT(dbp->env, nbytes == DB_ALIGN(nbytes, sizeof(u_int32_t)));
	DB_ASSERT(dbp->env, nbytes >= sizeof(HEAPSPLITHDR));

	offtbl = (db_indx_t *)HEAP_OFFSETTBL(dbp, pagep);
	off = offtbl[indx];
	/*
	 * Find the lowest offset on the page, and adjust offsets that are
	 * about to be moved.  If the deleted item is the lowest offset on the
	 * page, everything still works; that is not a special case.
	 */
	max = HEAP_HIGHINDX(pagep);
	first = HOFFSET(pagep);
	for (i = 0; i <= max; i++) {
		if (offtbl[i] < off && offtbl[i] != 0)
			offtbl[i] += nbytes;
	}
	offtbl[indx] = 0;

	/*
	 * Coalesce free space at the beginning of the page.  Shift all the data
	 * preceding the deleted entry down, overwriting the deleted entry.
	 */
	src = (u_int8_t *)(pagep) + first;
	dest = src + nbytes;
	span = off - first;
	memmove(dest, src, span);
#ifdef DIAGNOSTIC
	memset(src, CLEAR_BYTE, nbytes);
#endif

	/* Update the page's metadata. */
	NUM_ENT(pagep)--;
	HOFFSET(pagep) += nbytes;
	if (indx < HEAP_FREEINDX(pagep))
		HEAP_FREEINDX(pagep) = indx;
	while (HEAP_HIGHINDX(pagep) > 0 && offtbl[HEAP_HIGHINDX(pagep)] == 0)
		HEAP_HIGHINDX(pagep)--;
	if (NUM_ENT(pagep) == 0)
		HEAP_FREEINDX(pagep) = 0;
	else if (HEAP_FREEINDX(pagep) > HEAP_HIGHINDX(pagep) + 1)
		HEAP_FREEINDX(pagep) = HEAP_HIGHINDX(pagep) + 1;

	return (0);
}
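
/*
 * A worked example of the shift above (numbers are illustrative): with
 * HOFFSET = 100 and entries at off[0] = 150, off[1] = 120 (30 bytes, being
 * deleted) and off[2] = 100 (20 bytes), deleting index 1 with nbytes = 30
 * moves the 20 bytes at offset 100 up to offset 130, bumps off[2] to 130,
 * zeroes off[1], and leaves HOFFSET at 130.  Offsets above the deleted
 * entry, like off[0], are untouched.
 */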

static int
__heapc_destroy(dbc)
	DBC *dbc;
{
	HEAP_CURSOR *cp;

	cp = (HEAP_CURSOR *)dbc->internal;
	__os_free(dbc->env, cp);
	dbc->internal = NULL;

	return (0);
}

/*
 * __heapc_get --
 *	Get using a cursor (heap).
 */
static int
__heapc_get(dbc, key, data, flags, pgnop)
	DBC *dbc;
	DBT *key;
	DBT *data;
	u_int32_t flags;
	db_pgno_t *pgnop;
{
	DB *dbp;
	DB_HEAP_RID rid;
	DB_MPOOLFILE *mpf;
	DB_LOCK meta_lock;
	DBT tmp_val;
	HEAP *h;
	HEAPHDR *hdr;
	HEAPMETA *meta;
	HEAPPG *dpage;
	HEAP_CURSOR *cp;
	db_lockmode_t lock_type;
	db_pgno_t pgno;
	int cmp, f_indx, found, getpage, indx, ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	h = dbp->heap_internal;
	cp = (HEAP_CURSOR *)dbc->internal;
	LOCK_INIT(meta_lock);
	COMPQUIET(pgnop, NULL);

	if (F_ISSET(key, DB_DBT_USERMEM) && key->ulen < DB_HEAP_RID_SZ) {
		key->size = DB_HEAP_RID_SZ;
		return (DB_BUFFER_SMALL);
	}

	/* Use a write lock if the cursor is configured read-modify-write. */
	if (F_ISSET(dbc, DBC_RMW))
		lock_type = DB_LOCK_WRITE;
	else
		lock_type = DB_LOCK_READ;

	ret = 0;
	found = getpage = FALSE;
	meta = NULL;
	dpage = NULL;
	switch (flags) {
	case DB_CURRENT:

		/*
		 * Acquire the current page with a read lock unless the user
		 * has asked for a write lock.  Ensure the page and record
		 * still exist.
		 */
		ACQUIRE_CUR(dbc, lock_type, cp->pgno, 0, 0, ret);
		if (ret != 0) {
			if (ret == DB_PAGE_NOTFOUND)
				ret = DB_NOTFOUND;
			goto err;
		}

		if (HEAP_OFFSETTBL(dbp, cp->page)[cp->indx] == 0) {
			ret = DB_NOTFOUND;
			goto err;
		}
		dpage = (HEAPPG *)cp->page;
		hdr = (HEAPHDR *)P_ENTRY(dbp, dpage, cp->indx);
		if (F_ISSET(hdr, HEAP_RECSPLIT) &&
		    !F_ISSET(hdr, HEAP_RECFIRST)) {
			ret = DB_NOTFOUND;
			goto err;
		}

		break;
	case DB_FIRST:
		/*
		 * The region pages do not distinguish between an empty page
		 * and a page with something on it.  So, we grab the first
		 * possible data page and look for the lowest index with
		 * data.  If the page is empty, we move on to the next page
		 * and look again.  If there are no more pages, there are no
		 * records.
		 */
first:		pgno = FIRST_HEAP_DPAGE;
		while (!found) {
			/* Put old lock/page and get the new lock/page */
			ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
			if (ret != 0) {
				if (ret == DB_PAGE_NOTFOUND)
					ret = DB_NOTFOUND;
				goto err;
			}
			dpage = (HEAPPG *)cp->page;
			/*
			 * The page needs to be a data page with entries on
			 * it.  If the page is good, loop through the offset
			 * table finding the first non-split record or the
			 * first piece of a split record, then set up the
			 * cursor.
			 */
			if (TYPE(dpage) == P_HEAP && NUM_ENT(dpage) != 0) {
				for (indx = 0;
				     indx <= HEAP_HIGHINDX(dpage); indx++) {
					if (HEAP_OFFSETTBL(
					    dbp, dpage)[indx] == 0)
						continue;
					hdr = (HEAPHDR *)P_ENTRY(
					    dbp, dpage, indx);
					if (!F_ISSET(hdr, HEAP_RECSPLIT) ||
					    F_ISSET(hdr, HEAP_RECFIRST)) {
						found = TRUE;
						cp->pgno = pgno;
						cp->indx = indx;
						break;
					}
				}
				if (!found)
					pgno++;
			} else
				pgno++;
		}
		break;
	case DB_LAST:
		/*
		 * Grab the metadata page to find the last page, and start
		 * there looking backwards for the record with the highest
		 * index and return that one.
		 */
last:		pgno = PGNO_BASE_MD;
		ACQUIRE(dbc, DB_LOCK_READ,
		    pgno, meta_lock, pgno, meta, 0, 0, ret);
		if (ret != 0)
			goto err;

		pgno = meta->dbmeta.last_pgno;

		/*
		 * It is possible for another page to be added while we are
		 * searching backwards for the last record.  There is no need
		 * to block that from happening by keeping the meta page lock.
		 */
		DISCARD(dbc, meta, meta_lock, 1, ret);
		if (ret != 0)
			goto err;

		while (!found) {
			/* Don't look earlier than the first data page. */
			if (pgno < FIRST_HEAP_DPAGE) {
				ret = DB_NOTFOUND;
				goto err;
			}

			/* Put old lock/page and get the new lock/page. */
			ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
			if (ret != 0)
				goto err;
			dpage = (HEAPPG *)cp->page;
			/*
			 * The page needs to be a data page with entries on
			 * it.  If the page is good, search backwards until a
			 * non-split record or the first piece of a split
			 * record is found.
			 */
			if (TYPE(dpage) == P_HEAP && NUM_ENT(dpage) != 0) {
				for (indx = HEAP_HIGHINDX(dpage);
				     indx >= 0; indx--) {
					if (HEAP_OFFSETTBL(
					    dbp, dpage)[indx] == 0)
						continue;
					hdr = (HEAPHDR *)P_ENTRY(
					    dbp, dpage, indx);
					if (!F_ISSET(hdr, HEAP_RECSPLIT) ||
					    F_ISSET(hdr, HEAP_RECFIRST)) {
						found = TRUE;
						cp->pgno = pgno;
						cp->indx = indx;
						break;
					}
				}
				if (!found)
					pgno--;
			} else
				pgno--;
		}
		break;
	case DB_NEXT_NODUP:
	case DB_NEXT:
		/* If the cursor is not initialized, behave as DB_FIRST. */
		if (dbc->internal->pgno == PGNO_INVALID)
			goto first;

		/*
		 * Acquire the current page with the lock we already have,
		 * unless the user has asked for a write lock.
		 */
		ACQUIRE_CUR(dbc, lock_type, cp->pgno, 0, 0, ret);
		if (ret != 0)
			goto err;
		dpage = (HEAPPG *)cp->page;

		/* At the end of the current page, must get the next page. */
		if (cp->indx >= HEAP_HIGHINDX(dpage))
			getpage = TRUE;

		while (!found) {
			if (getpage) {
				pgno = cp->pgno + 1;

				/* Put current page/lock and get next one */
				ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
				if (ret != 0) {
					/* Beyond last page? */
					if (ret == DB_PAGE_NOTFOUND)
						ret = DB_NOTFOUND;
					goto err;
				}
				dpage = (HEAPPG *)cp->page;

				/*
				 * If the page is not a data page, or it is a
				 * data page without entries, try again.
				 */
				if (TYPE(dpage) != P_HEAP ||
				    (TYPE(dpage) == P_HEAP &&
				    NUM_ENT(dpage) == 0))
					continue;

				/* When searching, indx gets bumped to 0 */
				cp->indx = -1;
				getpage = FALSE;
			}

			/*
			 * Bump index and loop through the offset table finding
			 * first nonzero entry.  If the offset is for a split
			 * record, make sure it's the first piece of the split
			 * record. HEAP_HIGHINDX always points to highest filled
			 * entry on page.
			 */
			cp->indx++;
			for (indx = cp->indx;
			     indx <= HEAP_HIGHINDX(dpage); indx++) {
				if (HEAP_OFFSETTBL(dbp, dpage)[indx] == 0)
					continue;
				hdr = (HEAPHDR *)P_ENTRY(dbp, dpage, indx);
				if (!F_ISSET(hdr, HEAP_RECSPLIT) ||
				    F_ISSET(hdr, HEAP_RECFIRST)) {
					found = TRUE;
					cp->indx = indx;
					break;
				}
			}

			/* Nothing of interest on page, so try next */
			if (!found)
				getpage = TRUE;
		}
		break;
	case DB_PREV_NODUP:
	case DB_PREV:
		/* If the cursor is not initialized, behave as DB_LAST. */
		if (dbc->internal->pgno == PGNO_INVALID)
			goto last;

		/*
		 * Acquire the current page with the lock we already have,
		 * unless the user has asked for a write lock.
		 */
		ACQUIRE_CUR(dbc, lock_type, cp->pgno, 0, 0, ret);
		if (ret != 0)
			goto err;
		dpage = (HEAPPG *)cp->page;

		/*
		 * Loop through the indexes and find the first used slot.
		 * Check if we are already at the first slot.
		 */
		for (f_indx = 0; (f_indx <= HEAP_HIGHINDX(dpage)) &&
		    (HEAP_OFFSETTBL(dbp, dpage)[f_indx] == 0); f_indx++) ;

		/* At the beginning of the current page, must get a new page */
		if (cp->indx == 0 || cp->indx <= f_indx) {
			if (cp->pgno == FIRST_HEAP_DPAGE) {
				ret = DB_NOTFOUND;
				goto err;
			}
			getpage = TRUE;
		}

		while (!found) {
			if (getpage) {
				pgno = cp->pgno - 1;
				/* Do not go past the first page. */
				if (pgno < FIRST_HEAP_DPAGE) {
					ret = DB_NOTFOUND;
					goto err;
				}
				/* Put current page/lock and get prev page. */
				ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
				if (ret != 0)
					goto err;

				dpage = (HEAPPG *)cp->page;

				/*
				 * If the page is not a data page, or it is a
				 * data page without entries, try again.
				 */
				if (TYPE(dpage) != P_HEAP ||
				    (TYPE(dpage) == P_HEAP &&
				    NUM_ENT(dpage) == 0))
					continue;

				/*
				 * When searching, this gets bumped to the
				 * high index.
				 */
				cp->indx = HEAP_HIGHINDX(dpage) + 1;
				getpage = FALSE;
			}

			/*
			 * Decrement index and loop through the offset table
			 * finding previous nonzero entry.
			 */
			cp->indx--;
			for (indx = cp->indx;
			     indx >= 0; indx--) {
				if (HEAP_OFFSETTBL(dbp, dpage)[indx] == 0)
					continue;
				hdr = (HEAPHDR *)P_ENTRY(dbp, dpage, indx);
				if (!F_ISSET(hdr, HEAP_RECSPLIT) ||
				    F_ISSET(hdr, HEAP_RECFIRST)) {
					found = TRUE;
					cp->indx = indx;
					break;
				}
			}

			/* Nothing of interest on page, so try previous */
			if (!found)
				getpage = TRUE;
		}
		break;
	case DB_GET_BOTH_RANGE:
	case DB_GET_BOTH:
	case DB_SET_RANGE:
	case DB_SET:
		pgno = ((DB_HEAP_RID *)key->data)->pgno;
		indx = ((DB_HEAP_RID *)key->data)->indx;

		/* First make sure we're trying to get a data page. */
		if (pgno == PGNO_BASE_MD ||
		    pgno == HEAP_REGION_PGNO(dbp, pgno)) {
			ret = DB_NOTFOUND;
			goto err;
		}

		/* Lock the data page and get it. */
		ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);

		if (ret != 0) {
			if (ret == DB_PAGE_NOTFOUND)
				ret = DB_NOTFOUND;
			goto err;
		}
		dpage = (HEAPPG *)cp->page;

		/* Validate the requested index; error if out of range. */
		if ((indx > HEAP_HIGHINDX(dpage)) ||
		    (HEAP_OFFSETTBL(dbp, dpage)[indx] == 0)) {
			DISCARD(dbc, cp->page, cp->lock, 0, ret);
			ret = DB_NOTFOUND;
			goto err;
		}
		hdr = (HEAPHDR *)P_ENTRY(dbp, dpage, indx);
		if (F_ISSET(hdr, HEAP_RECSPLIT) &&
		    !F_ISSET(hdr, HEAP_RECFIRST)) {
			DISCARD(dbc, cp->page, cp->lock, 0, ret);
			ret = DB_NOTFOUND;
			goto err;
		}

		cp->pgno = pgno;
		cp->indx = indx;

		if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
			memset(&tmp_val, 0, sizeof(DBT));
			/* Does the data match? */
			if (F_ISSET(hdr, HEAP_RECSPLIT)) {
				tmp_val.flags = DB_DBT_MALLOC;
				if ((ret = __heapc_gsplit(
				    dbc, &tmp_val, NULL, 0)) != 0)
					goto err;
			} else {
				tmp_val.data =
				    (void *)((u_int8_t *)hdr + sizeof(HEAPHDR));
				tmp_val.size = hdr->size;
			}
			cmp = __bam_defcmp(dbp, &tmp_val, data);
			if (F_ISSET(&tmp_val, DB_DBT_MALLOC))
				__os_ufree(dbp->env, tmp_val.data);
			if (cmp != 0) {
				ret = DB_NOTFOUND;
				goto err;
			}
		}

		break;
	case DB_NEXT_DUP:
	case DB_PREV_DUP:
		ret = DB_NOTFOUND;
		goto err;
	default:
		/* DB_GET_RECNO, DB_JOIN_ITEM, DB_SET_RECNO are invalid */
		ret = __db_unknown_flag(dbp->env, "__heap_get", flags);
		goto err;

	}

err:	if (ret == 0) {
		if (key != NULL) {
			rid.pgno = cp->pgno;
			rid.indx = cp->indx;
			ret = __db_retcopy(dbp->env, key, &rid,
			    DB_HEAP_RID_SZ, &dbc->rkey->data, &dbc->rkey->ulen);
			F_SET(key, DB_DBT_ISSET);
		}

	} else {
		if (meta != NULL)
			(void)__memp_fput(mpf,
			    dbc->thread_info, meta, dbc->priority);
		if (LOCK_ISSET(meta_lock))
			(void)__LPUT(dbc, meta_lock);
		if (LOCK_ISSET(cp->lock))
			(void)__LPUT(dbc, cp->lock);
	}
	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
	return (ret);
}

#undef	IS_FIRST
#define	IS_FIRST (last_rid.pgno == PGNO_INVALID)
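/*
 * last_rid tracks the most recently written piece of the record; its pgno
 * stays PGNO_INVALID until the first piece is written, which is what
 * IS_FIRST tests.
 */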
/*
 * __heapc_reloc_partial --
 *	 Move data from a too-full page to a new page.  The old data page must
 *	 be write locked before calling this method.
 */
static int
__heapc_reloc_partial(dbc, key, data)
	DBC *dbc;
	DBT *key;
	DBT *data;
{
	DB *dbp;
	DBT hdr_dbt, log_dbt, t_data, t_key;
	DB_HEAP_RID last_rid, next_rid;
	HEAPHDR *old_hdr;
	HEAPSPLITHDR new_hdr;
	HEAP_CURSOR *cp;
	int add_bytes, ret;
	u_int32_t buflen, data_size, dlen, doff, left, old_size;
	u_int32_t remaining, size;
	u_int8_t *buf, *olddata;

	dbp = dbc->dbp;
	cp = (HEAP_CURSOR *)dbc->internal;
	old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
	memset(&hdr_dbt, 0, sizeof(DBT));
	memset(&log_dbt, 0, sizeof(DBT));
	buf = NULL;
	COMPQUIET(key, NULL);

	/* We only work on partial puts. */
	DB_ASSERT(dbp->env, F_ISSET(data, DB_DBT_PARTIAL));

	/*
	 * Start by calculating data_size, the total size of the new record,
	 * and dlen, the number of bytes we will actually overwrite.  Keep a
	 * local copy of doff; we'll adjust it as we see pieces of the record
	 * so that it's always relative to the current piece of data.
	 */
	if (F_ISSET(old_hdr, HEAP_RECSPLIT))
		old_size = ((HEAPSPLITHDR *)old_hdr)->tsize;
	else
		old_size = old_hdr->size;
	doff = data->doff;
	if (old_size < doff) {
		/* Post-pending */
		dlen = data->dlen;
		data_size = doff + data->size;
	} else {
		if (old_size - doff < data->dlen)
			dlen = old_size - doff;
		else
			dlen = data->dlen;
		data_size = old_size - dlen + data->size;
	}
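	/*
	 * For example (numbers are illustrative): with old_size = 100,
	 * doff = 40, data->dlen = 20 and data->size = 50, bytes 40..59 are
	 * replaced by 50 new bytes, so data_size = 100 - 20 + 50 = 130.
	 * With doff = 120 the put is post-pending: bytes 100..119 will be
	 * zero-filled and data_size = 120 + 50 = 170.
	 */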

	/*
	 * We don't need a buffer large enough to hold the data_size
	 * bytes, just one large enough to hold the bytes that will be
	 * written to an individual page.  We'll realloc to the necessary size
	 * as needed.
	 */
	buflen = 0;
	buf = NULL;

	/*
	 * We are updating an existing record, which will grow into a split
	 * record.  The strategy is to overwrite the existing record (or each
	 * piece of the record if the record is already split.)  If the new
	 * record is shorter than the old, delete any extra pieces.  If the new
	 * record is longer than the old, use heapc_split() to write the extra
	 * data.
	 *
	 * We start each loop with old_hdr pointed at the header for the old
	 * record and the necessary page write locked in cp->page.
	 */
	last_rid.pgno = PGNO_INVALID;
	last_rid.indx = 0;
	add_bytes = 1;
	left = data_size;
	memset(&t_data, 0, sizeof(DBT));
	remaining = 0;
	for (;;) {
		/* Figure out if we have a next piece. */
		if (F_ISSET(old_hdr, HEAP_RECSPLIT)) {
			next_rid.pgno = ((HEAPSPLITHDR *)old_hdr)->nextpg;
			next_rid.indx = ((HEAPSPLITHDR *)old_hdr)->nextindx;
		} else {
			next_rid.pgno = PGNO_INVALID;
			next_rid.indx = 0;
		}

		/*
		 * Before we delete the old data, use it to construct the new
		 * data. First figure out the size of the new piece, including
		 * any remaining data from the last piece.
		 */
		if (doff >= old_hdr->size)
			if (F_ISSET(old_hdr, HEAP_RECLAST) ||
			    !F_ISSET(old_hdr, HEAP_RECSPLIT)) {
				/* Post-pending. */
				data_size = doff + data->size;
			} else {
				/* The new piece is just the old piece. */
				data_size = old_hdr->size;
			}
		else if (doff + dlen > old_hdr->size)
			/*
			 * Some of the to-be-overwritten bytes are on the next
			 * piece, but we'll append all the new bytes to this
			 * piece if we haven't already written them.
			 */
			data_size = doff + (add_bytes ? data->size : 0);
		else
			data_size = old_hdr->size -
				dlen + (add_bytes ? data->size : 0);
		data_size += remaining;

		if (data_size > buflen) {
			/* Propagate the allocator's error; buf is freed at err. */
			if ((ret = __os_realloc(
			    dbp->env, data_size, &buf)) != 0)
				goto err;
			buflen = data_size;
		}
		t_data.data = buf;

		/*
		 * Adjust past any remaining bytes, they've already been moved
		 * to the beginning of the buffer.
		 */
		buf += remaining;
		remaining = 0;

		olddata = (u_int8_t *)old_hdr + HEAP_HDRSIZE(old_hdr);
		if (doff >= old_hdr->size) {
			memcpy(buf, olddata, old_hdr->size);
			doff -= old_hdr->size;
			if (F_ISSET(old_hdr, HEAP_RECLAST) ||
			    !F_ISSET(old_hdr, HEAP_RECSPLIT)) {
				/* Post-pending. */
				buf += old_hdr->size;
				memset(buf, '\0', doff);
				buf += doff;
				memcpy(buf, data->data, data->size);
			}
		} else {
			/* Preserve the first doff bytes. */
			memcpy(buf, olddata, doff);
			buf += doff;
			olddata += doff;
			/* Copy in the new bytes, if needed. */
			if (add_bytes) {
				memcpy(buf, data->data, data->size);
				buf += data->size;
				add_bytes = 0;
			}
			/* Skip dlen bytes. */
			if (doff + dlen < old_hdr->size) {
				olddata += dlen;
				memcpy(buf,
				    olddata, old_hdr->size - doff - dlen);
				dlen = 0;
			} else
				/*
				 * The data to be removed spills over onto the
				 * following page(s).  Adjust dlen to account
				 * for the bytes removed from this page.
				 */
				dlen = doff + dlen - old_hdr->size;
			doff = 0;
		}
		buf = t_data.data;

		/* Delete the old data, after logging it. */
		old_size = DB_ALIGN(
		    old_hdr->size + HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
		if (old_size < sizeof(HEAPSPLITHDR))
			old_size = sizeof(HEAPSPLITHDR);
		if (DBC_LOGGING(dbc)) {
			hdr_dbt.data = old_hdr;
			hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
			log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
			log_dbt.size = DB_ALIGN(
			    old_hdr->size, sizeof(u_int32_t));
			if ((ret = __heap_addrem_log(dbp, dbc->txn,
			    &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
			    (u_int32_t)cp->indx, old_size,
			    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));
		if ((ret = __heap_ditem(
		    dbc, cp->page, cp->indx, old_size)) != 0)
			goto err;

		if (left == 0)
			/*
			 * We've finished writing the new record, we're just
			 * cleaning up the old record now.
			 */
			goto next_pg;

		if (data_size == 0 && !IS_FIRST) {
			/*
			 * This piece is being completely removed.  We need to
			 * adjust the header of the previous piece now.
			 */
			ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
			    last_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
			if (ret != 0)
				goto err;

			cp->indx = last_rid.indx;
			old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));

			if (DBC_LOGGING(dbc)) {
				old_size = DB_ALIGN(old_hdr->size +
				    HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
				hdr_dbt.data = old_hdr;
				hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
				log_dbt.data =
				    (u_int8_t *)old_hdr + hdr_dbt.size;
				log_dbt.size = DB_ALIGN(
				    old_hdr->size, sizeof(u_int32_t));
				if ((ret = __heap_addrem_log(dbp, dbc->txn,
				    &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
				    (u_int32_t)cp->indx, old_size,
				    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
					goto err;
			} else
				LSN_NOT_LOGGED(LSN(cp->page));

			((HEAPSPLITHDR *)old_hdr)->nextpg = next_rid.pgno;
			((HEAPSPLITHDR *)old_hdr)->nextindx = next_rid.indx;

			if (DBC_LOGGING(dbc)) {
				if ((ret = __heap_addrem_log(dbp, dbc->txn,
				    &LSN(cp->page), 0, DB_ADD_HEAP, cp->pgno,
				    (u_int32_t)cp->indx, old_size,
				    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
					goto err;
			} else
				LSN_NOT_LOGGED(LSN(cp->page));

			DISCARD(dbc, cp->page, cp->lock, 1, ret);
			if (ret != 0)
				goto err;

			goto next_pg;
		}

		/* Set up the header for the new record. */
		memset(&new_hdr, 0, sizeof(HEAPSPLITHDR));
		new_hdr.std_hdr.flags = HEAP_RECSPLIT;
		/*
		 * If next_rid.pgno == PGNO_INVALID and there's still more data,
		 * we'll come back and correct the header once we know where the
		 * next piece lives.
		 */
		new_hdr.nextpg = next_rid.pgno;
		new_hdr.nextindx = next_rid.indx;
		/*
		 * Figure out how much we can fit on the page, rounding down to
		 * a multiple of 4.  If we will have to expand the offset table,
		 * account for that.  It needs to be enough to at least fit the
		 * split header.
		 */
		size = HEAP_FREESPACE(dbp, cp->page);
		if (NUM_ENT(cp->page) == 0 ||
		    cp->indx > HEAP_HIGHINDX(cp->page))
			size -= sizeof(db_indx_t);
		/* Round down to a multiple of 4. */
		size = DB_ALIGN(
		    size - sizeof(u_int32_t) + 1, sizeof(u_int32_t));
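		/*
		 * (DB_ALIGN rounds up, so aligning size - sizeof(u_int32_t)
		 * + 1 rounds size down: e.g., 23 becomes DB_ALIGN(20, 4) =
		 * 20, while 24 stays 24.)
		 */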
		DB_ASSERT(dbp->env, size >= sizeof(HEAPSPLITHDR));

		/*
		 * We try to fill the page, but cannot write more than
		 * t_data.size bytes, that's all we have in-memory.
		 */
		new_hdr.std_hdr.size = (u_int16_t)
		    (size - sizeof(HEAPSPLITHDR));
		if (new_hdr.std_hdr.size > data_size)
			new_hdr.std_hdr.size = data_size;
		if (new_hdr.std_hdr.size >= left) {
			new_hdr.std_hdr.size = left;
			new_hdr.std_hdr.flags |= HEAP_RECLAST;
			new_hdr.nextpg = PGNO_INVALID;
			new_hdr.nextindx = 0;
		}
		if (IS_FIRST) {
			new_hdr.std_hdr.flags |= HEAP_RECFIRST;
			new_hdr.tsize = left;
		}

		/* Now write the new data to the page. */
		t_data.size = new_hdr.std_hdr.size;
		hdr_dbt.data = &new_hdr;
		hdr_dbt.size = sizeof(HEAPSPLITHDR);
		/* Log the write. */
		if (DBC_LOGGING(dbc)) {
			if ((ret = __heap_addrem_log(dbp,
			    dbc->txn, &LSN(cp->page), 0,
			    DB_ADD_HEAP, cp->pgno, (u_int32_t)cp->indx,
			    size, &hdr_dbt, &t_data, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));
		if ((ret = __heap_pitem(dbc,
		    (PAGE *)cp->page, cp->indx, size, &hdr_dbt, &t_data)) != 0)
			goto err;

		left -= new_hdr.std_hdr.size;
		/*
		 * If any data couldn't fit on this page, it has to go onto the
		 * next.  Copy it to the front of the buffer and it will be
		 * preserved in the next loop.
		 */
		if (new_hdr.std_hdr.size < data_size) {
			remaining = data_size - new_hdr.std_hdr.size;
			memmove(buf, buf + new_hdr.std_hdr.size, remaining);
		}

		/*
		 * Remember this piece's RID, we may need to update the header
		 * if the next data piece is removed, or if this is the final
		 * piece and we add data to the end of the record.
		 */
next_pg:	last_rid.pgno = cp->pgno;
		last_rid.indx = cp->indx;
		/* Get the next page, if any. */
		if (next_rid.pgno != PGNO_INVALID) {
			ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
			    next_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
			if (ret != 0)
				goto err;
			cp->indx = next_rid.indx;
			old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
			DB_ASSERT(dbp->env,
			    HEAP_HIGHINDX(cp->page) <= cp->indx);
			DB_ASSERT(dbp->env, F_ISSET(old_hdr, HEAP_RECSPLIT));
		} else {
			/* Discard the page and drop the lock, txn-ally. */
			DISCARD(dbc, cp->page, cp->lock, 1, ret);
			if (ret != 0)
				goto err;
			break;
		}
	}

	/*
	 * If there is more work to do, let heapc_split do it.  After
	 * heapc_split returns we need to update nextpg and nextindx in the
	 * header of the last piece we wrote above.
	 *
	 * For logging purposes, we "delete" the old record and then "add" the
	 * record.  This makes redo/undo work as-is, but we won't actually
	 * delete and re-add the record.
	 */
	if (left > 0) {
		memset(&t_key, 0, sizeof(DBT));
		t_key.size = t_key.ulen = sizeof(DB_HEAP_RID);
		t_key.data = &next_rid;
		t_key.flags = DB_DBT_USERMEM;
		t_data.size = left;
		if ((ret = __heapc_split(dbc, &t_key, &t_data, 0)) != 0)
			goto err;

		ACQUIRE_CUR(dbc,
		    DB_LOCK_WRITE, last_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
		if (ret != 0)
			goto err;

		cp->indx = last_rid.indx;
		old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));

		if (DBC_LOGGING(dbc)) {
			old_size = DB_ALIGN(old_hdr->size +
			    HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
			hdr_dbt.data = old_hdr;
			hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
			log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
			log_dbt.size = DB_ALIGN(
			    old_hdr->size, sizeof(u_int32_t));
			if ((ret = __heap_addrem_log(dbp, dbc->txn,
			    &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
			    (u_int32_t)cp->indx, old_size,
			    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));

		((HEAPSPLITHDR *)old_hdr)->nextpg = next_rid.pgno;
		((HEAPSPLITHDR *)old_hdr)->nextindx = next_rid.indx;

		if (DBC_LOGGING(dbc)) {
			if ((ret = __heap_addrem_log(dbp, dbc->txn,
			    &LSN(cp->page), 0, DB_ADD_HEAP, cp->pgno,
			    (u_int32_t)cp->indx, old_size,
			    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));

		DISCARD(dbc, cp->page, cp->lock, 1, ret);
	}

err:	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
	if (buf != NULL)
		__os_free(dbp->env, buf);
	return (ret);
}

/*
 * __heapc_reloc --
 *	 Move data from a too-full page to a new page.  The old data page must
 *	 be write locked before calling this method.
 */
static int
__heapc_reloc(dbc, key, data)
	DBC *dbc;
	DBT *key;
	DBT *data;
{
	DB *dbp;
	DBT hdr_dbt, log_dbt, t_data, t_key;
	DB_HEAP_RID last_rid, next_rid;
	HEAPHDR *old_hdr;
	HEAPSPLITHDR new_hdr;
	HEAP_CURSOR *cp;
	int is_first, ret;
	u_int32_t left, old_size, size;

	dbp = dbc->dbp;
	cp = (HEAP_CURSOR *)dbc->internal;
	old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
	memset(&hdr_dbt, 0, sizeof(DBT));
	memset(&log_dbt, 0, sizeof(DBT));
	COMPQUIET(key, NULL);

	/*
	 * We are updating an existing record, which will grow into a split
	 * record.  The strategy is to overwrite the existing record (or each
	 * piece of the record if the record is already split.)  If the new
	 * record is shorter than the old, delete any extra pieces.  If the new
	 * record is longer than the old, use heapc_split() to write the extra
	 * data.
	 *
	 * We start each loop with t_data.data positioned to the next byte to be
	 * written, old_hdr pointed at the header for the old record and the
	 * necessary page write locked in cp->page.
	 */
	is_first = 1;
	left = data->size;
	memset(&t_data, 0, sizeof(DBT));
	t_data.data = data->data;
	for (;;) {
		/* Figure out if we have a next piece. */
		if (F_ISSET(old_hdr, HEAP_RECSPLIT)) {
			next_rid.pgno = ((HEAPSPLITHDR *)old_hdr)->nextpg;
			next_rid.indx = ((HEAPSPLITHDR *)old_hdr)->nextindx;
		} else {
			next_rid.pgno = PGNO_INVALID;
			next_rid.indx = 0;
		}

		/* Delete the old data, after logging it. */
		old_size = DB_ALIGN(
		    old_hdr->size + HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
		if (old_size < sizeof(HEAPSPLITHDR))
			old_size = sizeof(HEAPSPLITHDR);
		if (DBC_LOGGING(dbc)) {
			hdr_dbt.data = old_hdr;
			hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
			log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
			log_dbt.size = DB_ALIGN(
			    old_hdr->size, sizeof(u_int32_t));
			if ((ret = __heap_addrem_log(dbp, dbc->txn,
			    &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
			    (u_int32_t)cp->indx, old_size,
			    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));
		if ((ret = __heap_ditem(
		    dbc, cp->page, cp->indx, old_size)) != 0)
			goto err;

		if (left == 0)
			/*
			 * We've finished writing the new record, we're just
			 * cleaning up the old record now.
			 */
			goto next_pg;

		/* Set up the header for the new record. */
		memset(&new_hdr, 0, sizeof(HEAPSPLITHDR));
		new_hdr.std_hdr.flags = HEAP_RECSPLIT;
		/* We'll set this later if next_rid.pgno == PGNO_INVALID. */
		new_hdr.nextpg = next_rid.pgno;
		new_hdr.nextindx = next_rid.indx;
		/*
		 * Figure out how much we can fit on the page, rounding down to
		 * a multiple of 4.  If we will have to expand the offset table,
		 * account for that.  It needs to be enough to at least fit the
		 * split header.
		 */
		size = HEAP_FREESPACE(dbp, cp->page);
		if (NUM_ENT(cp->page) == 0 ||
		    cp->indx > HEAP_HIGHINDX(cp->page))
			size -= sizeof(db_indx_t);
		/* Round down to a multiple of 4. */
		size = DB_ALIGN(
		    size - sizeof(u_int32_t) + 1, sizeof(u_int32_t));
		DB_ASSERT(dbp->env, size >= sizeof(HEAPSPLITHDR));
		new_hdr.std_hdr.size =
		    (u_int16_t)(size - sizeof(HEAPSPLITHDR));
		if (new_hdr.std_hdr.size >= left) {
			new_hdr.std_hdr.size = left;
			new_hdr.std_hdr.flags |= HEAP_RECLAST;
			new_hdr.nextpg = PGNO_INVALID;
			new_hdr.nextindx = 0;
		}
		if (is_first) {
			new_hdr.std_hdr.flags |= HEAP_RECFIRST;
			new_hdr.tsize = left;
			is_first = 0;
		}

		/* Now write the new data to the page. */
		t_data.size = new_hdr.std_hdr.size;
		hdr_dbt.data = &new_hdr;
		hdr_dbt.size = sizeof(HEAPSPLITHDR);
		/* Log the write. */
		if (DBC_LOGGING(dbc)) {
			if ((ret = __heap_addrem_log(dbp,
			    dbc->txn, &LSN(cp->page), 0,
			    DB_ADD_HEAP, cp->pgno, (u_int32_t)cp->indx,
			    size, &hdr_dbt, &t_data, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));
		if ((ret = __heap_pitem(dbc,
		    (PAGE *)cp->page, cp->indx, size, &hdr_dbt, &t_data)) != 0)
			goto err;

		left -= new_hdr.std_hdr.size;
		t_data.data = (u_int8_t *)(t_data.data) + new_hdr.std_hdr.size;

		/* Get the next page, if any. */
next_pg:	if (next_rid.pgno != PGNO_INVALID) {
			ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
			    next_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
			if (ret != 0)
				goto err;
			cp->indx = next_rid.indx;
			old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
		} else {
			/*
			 * Remember the final piece's RID, we may need to update
			 * the header after writing the rest of the record.
			 */
			last_rid.pgno = cp->pgno;
			last_rid.indx = cp->indx;
			/* Discard the page and drop the lock, txn-ally. */
			DISCARD(dbc, cp->page, cp->lock, 1, ret);
			if (ret != 0)
				goto err;
			break;
		}
	}

	/*
	 * If there is more work to do, let heapc_split do it.  After
	 * heapc_split returns we need to update nextpg and nextindx in the
	 * header of the last piece we wrote above.
	 *
	 * For logging purposes, we "delete" the old record and then "add" the
	 * record.  This makes redo/undo work as-is, but we won't actually
	 * delete and re-add the record.
	 */
	if (left > 0) {
		memset(&t_key, 0, sizeof(DBT));
		t_key.size = t_key.ulen = sizeof(DB_HEAP_RID);
		t_key.data = &next_rid;
		t_key.flags = DB_DBT_USERMEM;
		t_data.size = left;
		if ((ret = __heapc_split(dbc, &t_key, &t_data, 0)) != 0)
			goto err;

		ACQUIRE_CUR(dbc,
		    DB_LOCK_WRITE, last_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
		if (ret != 0)
			goto err;

		cp->indx = last_rid.indx;
		old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));

		if (DBC_LOGGING(dbc)) {
			old_size = DB_ALIGN(old_hdr->size +
			    HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
			hdr_dbt.data = old_hdr;
			hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
			log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
			log_dbt.size = DB_ALIGN(
			    old_hdr->size, sizeof(u_int32_t));
			if ((ret = __heap_addrem_log(dbp, dbc->txn,
			    &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
			    (u_int32_t)cp->indx, old_size,
			    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));

		((HEAPSPLITHDR *)old_hdr)->nextpg = next_rid.pgno;
		((HEAPSPLITHDR *)old_hdr)->nextindx = next_rid.indx;

		if (DBC_LOGGING(dbc)) {
			if ((ret = __heap_addrem_log(dbp, dbc->txn,
			    &LSN(cp->page), 0, DB_ADD_HEAP, cp->pgno,
			    (u_int32_t)cp->indx, old_size,
			    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));

		DISCARD(dbc, cp->page, cp->lock, 1, ret);
	}

err:	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
	return (ret);
}

/*
 * __heapc_put --
 *	Put using a cursor.  If the given key exists, update the associated
 *	data.  If the given key does not exist, return an error.
 */
static int
__heapc_put(dbc, key, data, flags, pgnop)
	DBC *dbc;
	DBT *key;
	DBT *data;
	u_int32_t flags;
	db_pgno_t *pgnop;
{
	DB *dbp;
	DBT hdr_dbt, log_dbt, new_data;
	DB_MPOOLFILE *mpf;
	HEAPHDR hdr, *old_hdr;
	HEAP_CURSOR *cp;
	PAGE *rpage;
	db_pgno_t region_pgno;
	int oldspace, ret, space, t_ret;
	u_int32_t data_size, dlen, new_size, old_flags, old_size, tot_size;
	u_int8_t *buf, *olddata, *src, *dest;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	cp = (HEAP_CURSOR *)dbc->internal;
	rpage = NULL;
	buf = dest = src = NULL;
	dlen = 0;

	if (flags != DB_CURRENT) {
		/* We're going to write following the get, so use RMW. */
		old_flags = dbc->flags;
		F_SET(dbc, DBC_RMW);
		ret = __heapc_get(dbc, key, data, DB_SET, pgnop);
		F_CLR(key, DB_DBT_ISSET);
		dbc->flags = old_flags;
		DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
		if (ret != 0)
			return (ret);
		else if (flags == DB_NOOVERWRITE)
			return (DB_KEYEXIST);
		if ((ret = __memp_dirty(mpf, &cp->page,
		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
			return (ret);
	} else {
		/* We have a read lock, but need a write lock. */
		if (STD_LOCKING(dbc) && cp->lock_mode != DB_LOCK_WRITE &&
		    (ret = __db_lget(dbc,
		    LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
			return (ret);

		if ((ret = __memp_fget(mpf, &cp->pgno, dbc->thread_info,
		    dbc->txn, DB_MPOOL_DIRTY, &cp->page)) != 0)
			return (ret);
	}

	/* We've got the page locked and stored in cp->page. */
	HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), oldspace);

	/*
	 * Figure out the space requirements.  There is a very rare corner
	 * case where we don't have enough space on the page to expand the
	 * data.  Splitting the record results in a larger header; if the page
	 * is jam-packed there might not be room for the larger header.
	 *
	 * hdr->size is the size of the stored data, it doesn't include any
	 * padding.
	 */
	old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
	/* Need data.size + header size, 4-byte aligned. */
	old_size =
	    DB_ALIGN(old_hdr->size + HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
	if (old_size < sizeof(HEAPSPLITHDR))
		old_size = sizeof(HEAPSPLITHDR);
	if (F_ISSET(data, DB_DBT_PARTIAL)) {
		if (F_ISSET(old_hdr, HEAP_RECSPLIT))
			tot_size = ((HEAPSPLITHDR *)old_hdr)->tsize;
		else
			tot_size = old_hdr->size;
		if (tot_size < data->doff) {
			/* Post-pending */
			dlen = data->dlen;
			data_size = data->doff + data->size;
		} else {
			if (tot_size - data->doff < data->dlen)
				dlen = tot_size - data->doff;
			else
				dlen = data->dlen;
			data_size = tot_size - dlen + data->size;
		}
	} else
		data_size = data->size;
	new_size = DB_ALIGN(data_size + sizeof(HEAPHDR), sizeof(u_int32_t));
	if (new_size < sizeof(HEAPSPLITHDR))
		new_size = sizeof(HEAPSPLITHDR);

	/* Check whether we actually have enough space on this page. */
	if (F_ISSET(old_hdr, HEAP_RECSPLIT) ||
	    (new_size > old_size &&
	    new_size - old_size > HEAP_FREESPACE(dbp, cp->page))) {
		/*
		 * We've got to split the record, not enough room on the
		 * page.  Splitting the record will remove old_size bytes and
		 * introduce at least sizeof(HEAPSPLITHDR).
		 */
		if (F_ISSET(data, DB_DBT_PARTIAL))
			return (__heapc_reloc_partial(dbc, key, data));
		else
			return (__heapc_reloc(dbc, key, data));
	}

	memset(&new_data, 0, sizeof(DBT));
	new_data.size = data_size;
	if (F_ISSET(data, DB_DBT_PARTIAL)) {
		/*
		 * Before replacing the old data, we need to use it to build the
		 * new data.
		 */
		if ((ret = __os_malloc(dbp->env, data_size, &buf)) != 0)
			goto err;
		new_data.data = buf;

		/*
		 * Preserve data->doff bytes at the start, or all of the old
		 * record plus padding, if post-pending.
		 */
		olddata = (u_int8_t *)old_hdr + sizeof(HEAPHDR);
		if (data->doff > old_hdr->size) {
			memcpy(buf, olddata, old_hdr->size);
			buf += old_hdr->size;
			memset(buf, '\0', data->doff - old_hdr->size);
			buf += data->doff - old_hdr->size;
		} else {
			memcpy(buf, olddata, data->doff);
			buf += data->doff;
		}

		/* Now copy in the user's data. */
		memcpy(buf, data->data, data->size);
		buf += data->size;

		/* Fill in remaining data from the old record, skipping dlen. */
		if (data->doff < old_hdr->size) {
			olddata += data->doff + data->dlen;
			memcpy(buf,
			    olddata, old_hdr->size - data->doff - data->dlen);
		}
	} else {
		new_data.data = data->data;
	}

	/*
	 * Do the update by deleting the old record and writing the new
	 * record.  Start by logging the entire operation.
	 */
	memset(&hdr, 0, sizeof(HEAPHDR));
	hdr.size = data_size;
	if (DBC_LOGGING(dbc)) {
		hdr_dbt.data = old_hdr;
		hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
		log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
		log_dbt.size = DB_ALIGN(old_hdr->size, sizeof(u_int32_t));
		if ((ret = __heap_addrem_log(dbp, dbc->txn, &LSN(cp->page),
		    0, DB_REM_HEAP, cp->pgno, (u_int32_t)cp->indx,
		    old_size, &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
			goto err;
		hdr_dbt.data = &hdr;
		hdr_dbt.size = HEAP_HDRSIZE(&hdr);
		if ((ret = __heap_addrem_log(dbp, dbc->txn, &LSN(cp->page),
		    0, DB_ADD_HEAP, cp->pgno, (u_int32_t)cp->indx,
		    new_size, &hdr_dbt, &new_data, &LSN(cp->page))) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(cp->page));

	if ((ret = __heap_ditem(dbc, cp->page, cp->indx, old_size)) != 0)
		goto err;
	hdr_dbt.data = &hdr;
	hdr_dbt.size = HEAP_HDRSIZE(&hdr);
	if ((ret = __heap_pitem(dbc,
	    (PAGE *)cp->page, cp->indx, new_size, &hdr_dbt, &new_data)) != 0)
		goto err;

	/* Check whether we need to update the space bitmap. */
	HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), space);

	if (space != oldspace) {
		/* Get the region page with an exclusive latch. */
		region_pgno = HEAP_REGION_PGNO(dbp, cp->pgno);

		if ((ret = __memp_fget(mpf, &region_pgno,
		    dbc->thread_info, NULL, DB_MPOOL_DIRTY, &rpage)) != 0)
			goto err;

		HEAP_SETSPACE(dbp, rpage, cp->pgno - region_pgno - 1, space);
	}

err:	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
	if (rpage != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if (F_ISSET(data, DB_DBT_PARTIAL))
		__os_free(dbp->env, new_data.data);

	if (ret != 0 && LOCK_ISSET(cp->lock))
		(void)__TLPUT(dbc, cp->lock);

	return (ret);
}
1806 
1807 /*
1808  * __heap_getpage --
1809  *	Return a page with sufficient free space.  The page will be write locked
1810  *	and marked dirty.
1811  */
1812 static int
1813 __heap_getpage(dbc, size, avail)
1814 	DBC *dbc;
1815 	u_int32_t size;
1816 	u_int8_t *avail;
1817 {
1818 	DB *dbp;
1819 	DBMETA *meta;
1820 	DB_LOCK meta_lock;
1821 	DB_LSN meta_lsn;
1822 	DB_MPOOLFILE *mpf;
1823 	HEAP *h;
1824 	HEAPPG *rpage;
1825 	HEAP_CURSOR *cp;
1826 	db_pgno_t data_pgno, *lkd_pgs, meta_pgno, region_pgno, start_region;
1827 	int i, lk_mode, max, p, ret, space, start, t_ret;
1828 
1829 	LOCK_INIT(meta_lock);
1830 	dbp = dbc->dbp;
1831 	mpf = dbp->mpf;
1832 	cp = (HEAP_CURSOR *)dbc->internal;
1833 	h = dbp->heap_internal;
1834 	start_region = region_pgno = h->curregion;
1835 	max = HEAP_REGION_SIZE(dbp);
1836 	i = ret = t_ret = 0;
1837 	lkd_pgs = NULL;
1838 
1839 	/*
1840 	 * The algorithm for finding a page:
1841 	 *
1842 	 * Look in the space bitmap of the current region page for a data page
1843 	 * with at least size bytes free.  Once we find a page, try to lock it
1844 	 * and if we get the lock we're done.
1845 	 *
1846 	 * Don't wait for a locked region page, just move on to the next region
1847 	 * page, creating it if it doesn't exist.  If the size of the heap
1848 	 * database is not constrained, just keep creating regions and extending
1849 	 * the database until we find a page with space.  If the database size
1850 	 * is constrained, loop back to the first region page from the final
1851 	 * region page.  If we wind up making it all the way back to where our
1852 	 * search began, we need to start waiting for locked region pages.  If
1853 	 * we finish another loop through the database waiting for every region
1854 	 * page, we know there's no room.
1855 	 */
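	/*
	 * In outline, the search below behaves like this sketch (pseudocode
	 * only; locking and error-handling details omitted):
	 *
	 *	waited = 0;
	 *	for (rpgno = current region;;) {
	 *		latch region page rpgno, try-latch if !waited;
	 *		for (each data page p in rpgno's bitmap)
	 *			if (bitmap says p has room && p locks cleanly)
	 *				return (p);
	 *		rpgno = next region, wrapping in a fixed size db;
	 *		if (rpgno == starting region && waited++)
	 *			return (DB_HEAP_FULL);
	 *	}
	 */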
1856 
1857 	/*
1858 	 * Figure out the % of the page the data will occupy and translate that
1859 	 * to the relevant bit-map value we need to look for.
1860 	 */
1861 	HEAP_CALCSPACEBITS(dbp, size, space);
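	/*
	 * The bitmap entries are a coarse quantization, not a byte count.
	 * As an illustrative sketch only (the exact bucket boundaries live
	 * in HEAP_CALCSPACEBITS): a value of 0 might mean nearly all of the
	 * page is free and the maximum value nearly full, so the loop below
	 * can skip any page whose entry is greater than space without
	 * latching it.
	 */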
1862 
1863 	/*
1864 	 * Get the current region page, with a shared latch.  On the first loop
1865 	 * through a fixed size database, we move on to the next region if the
1866 	 * page is locked.  On the second loop, we wait for locked region
1867 	 * pages.  If the database isn't fixed size, we never wait, we'll
1868 	 * eventually get to use one of the region pages we create.
1869 	 */
1870 	lk_mode = DB_MPOOL_TRY;
1871 find:	while ((ret = __memp_fget(mpf, &region_pgno,
1872 	    dbc->thread_info, NULL, lk_mode, &rpage)) != 0 ||
1873 	    TYPE(rpage) != P_IHEAP) {
1874 		if (ret == DB_LOCK_NOTGRANTED)
1875 			goto next_region;
1876 		if (ret != 0 && ret != DB_PAGE_NOTFOUND)
1877 			return (ret);
1878 		/*
1879 		 * The region page doesn't exist, or hasn't been initialized;
1880 		 * create it, then try again.  If the page exists, we have to
1881 		 * drop it before initializing the region.
1882 		 */
1883 		if (ret == 0 && (ret = __memp_fput(
1884 		    mpf, dbc->thread_info, rpage, dbc->priority)) != 0)
1885 			return (ret);
1886 
1887 		if ((ret = __heap_create_region(dbc, region_pgno)) != 0)
1888 			return (ret);
1889 	}
1890 
1891 	start = h->curpgindx;
1892 	/*
1893 	 * If this is the last region page in a fixed size db, figure out the
1894 	 * maximum pgno in the bitmap.
1895 	 */
1896 	if (region_pgno + max > h->maxpgno)
1897 		max = h->maxpgno - region_pgno;
1898 	/*
1899 	 * Look in the bitmap for a page with sufficient free space.  We use i
1900 	 * in a slightly strange way.  Because the 2-bit values in the bitmap
1901 	 * are only an estimate, there is a chance the data won't fit on the page we
1902 	 * choose.  In that case, we re-start the process and want to be able to
1903 	 * resume this loop where we left off.
1904 	 */
1905 	for (; i < max; i++) {
1906 		p = start + i;
1907 		if (p >= max)
1908 			p -= max;
1909 		if ((*avail = HEAP_SPACE(dbp, rpage, p)) > space)
1910 			continue;
1911 		data_pgno = region_pgno + p + 1;
1912 		ACQUIRE_CUR(dbc,
1913 		    DB_LOCK_WRITE, data_pgno, DB_LOCK_NOWAIT, 0, ret);
1914 		/*
1915 		 * If we have the lock and the page or have the lock and need to
1916 		 * create the page, we're good.  If we don't have the lock, try
1917 		 * to find a different page.
1918 		 */
1919 		if (ret == 0 || ret == DB_PAGE_NOTFOUND)
1920 			break;
1921 		else if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) {
1922 			ret = 0;
1923 			continue;
1924 		} else
1925 			goto err;
1926 	}
1927 
1928 	/*
1929 	 * Keep a worst-case estimate of the highest used page in the region.
1930 	 */
1931 	if (i < max && data_pgno > rpage->high_pgno) {
1932 		if ((ret = __memp_dirty(mpf,
1933 		    &rpage, dbc->thread_info, NULL, dbc->priority, 0)) != 0)
1934 			goto err;
1935 		/* We might have blocked; check again. */
1936 		if (data_pgno > rpage->high_pgno)
1937 			rpage->high_pgno = data_pgno;
1938 	}
1939 
1940 	/* Done with the region page, even if we didn't find a page. */
1941 	if ((ret = __memp_fput(mpf,
1942 	    dbc->thread_info, rpage, dbc->priority)) != 0) {
1943 		/* Did not read the data page, so we can release its lock. */
1944 		DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
1945 		goto err;
1946 	}
1947 	rpage = NULL;
1948 
1949 	if (i >= max) {
1950 		/*
1951 		 * No free pages on this region page; advance to the next
1952 		 * region page.  If we're at the end of a fixed size heap db,
1953 		 * loop around to the first region page.  No data page is
1954 		 * locked at this point.
1955 		 */
1956 next_region:	region_pgno += HEAP_REGION_SIZE(dbp) + 1;
1957 
1958 		if (region_pgno > h->maxpgno)
1959 			region_pgno = FIRST_HEAP_RPAGE;
1960 
1961 		if (region_pgno == start_region) {
1962 			/*
1963 			 * We're in a fixed size db and we've looped through all
1964 			 * region pages.
1965 			 */
1966 
1967 			if (lk_mode == DB_MPOOL_TRY) {
1968 				/*
1969 				 * We may have missed a region page with room,
1970 				 * because we didn't wait for locked pages.  Try
1971 				 * another loop, waiting for all pages.
1972 				 */
1973 				lk_mode = 0;
1974 			} else {
1975 				/*
1976 				 * We've seen every region page, because we
1977 				 * waited for all pages.  No room.
1978 				 */
1979 				ret = DB_HEAP_FULL;
1980 				goto err;
1981 			}
1982 		}
1983 
1984 		h->curregion = region_pgno;
1985 		h->curpgindx = 0;
1986 		i = 0;
1987 		goto find;
1988 	}
1989 
1990 	/*
1991 	 * At this point we have the page locked.  If we have the page, we need
1992 	 * to mark it dirty.  If we don't have the page (or if the page is
1993 	 * empty) we need to create and initialize it.
1994 	 */
1995 	if (cp->pgno == PGNO_INVALID || PGNO(cp->page) == PGNO_INVALID) {
1996 		/*
1997 		 * The data page needs to be created and the metadata page needs
1998 		 * to be updated.  Once we get the metadata page, we must not
1999 		 * jump to err; the metadata page and lock are put back here.
2000 		 *
2001 		 * It is possible that the page was created by an aborted txn,
2002 		 * in which case the page exists but is all zeros.  We still
2003 		 * need to "create" it and log the creation.
2004 		 *
2005 		 */
2006 
2007 		meta_pgno = PGNO_BASE_MD;
2008 		if ((ret = __db_lget(dbc, LCK_ALWAYS, meta_pgno,
2009 		    DB_LOCK_WRITE, DB_LOCK_NOWAIT, &meta_lock)) != 0) {
2010 			/*
2011 			 * We don't want to block while holding a latch
2012 			 * on a page off the end of the file.  The file
2013 			 * could get truncated by another thread and we
2014 			 * would deadlock.
2015 			 */
2016 			p = cp->page != NULL;
2017 			DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
2018 			if (t_ret != 0 ||
2019 			     (ret != DB_LOCK_NOTGRANTED &&
2020 			     ret != DB_LOCK_DEADLOCK))
2021 				goto pg_err;
2022 			if ((ret = __db_lget(dbc, LCK_ALWAYS, meta_pgno,
2023 			    DB_LOCK_WRITE, 0, &meta_lock)) != 0)
2024 				goto pg_err;
2025 			ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
2026 			    data_pgno, 0, DB_MPOOL_CREATE, ret);
2027 			/*
2028 			 * We can race: we read this page while it was
2029 			 * within last_pgno, but an aborted allocation
2030 			 * can now leave this page beyond last_pgno, so
2031 			 * we must free it.  If we can't get the lock
2032 			 * on the page again, then some other thread
2033 			 * will handle the issue.
2034 			 */
2035 			if (ret != 0) {
2036 pg_err:				if (p != 0) {
2037 					ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
2038 					    data_pgno, 0, 0, t_ret);
2039 					if (t_ret == 0 &&
2040 					     PGNO(cp->page) == PGNO_INVALID) {
2041 						(void)__memp_fput(mpf,
2042 						     dbc->thread_info,
2043 						     cp->page, dbc->priority);
2044 						(void)__memp_fget(mpf,
2045 						     &data_pgno,
2046 						     dbc->thread_info, dbc->txn,
2047 						     DB_MPOOL_FREE, &cp->page);
2048 					}
2049 					(void)__LPUT(dbc, cp->lock);
2050 				}
2051 				(void)__LPUT(dbc, meta_lock);
2052 				goto err;
2053 			}
2054 			/* Check if we lost a race. */
2055 			if (PGNO(cp->page) != PGNO_INVALID) {
2056 				if ((ret = __LPUT(dbc, meta_lock)) != 0)
2057 					goto err;
2058 				goto check;
2059 			}
2060 		}
2061 
2062 		/*
2063 		 * Before creating a new page in this region, check that the
2064 		 * region page still exists.  By this point, the transaction
2065 		 * that created the region must have aborted or committed,
2066 		 * because we now hold the metadata lock.  If we can't get the
2067 		 * latch, the page must exist.
2068 		 */
2069 		ret = __memp_fget(mpf, &region_pgno,
2070 		    dbc->thread_info, NULL, DB_MPOOL_TRY, &rpage);
2071 		if (ret == DB_LOCK_NOTGRANTED)
2072 			ret = 0;
2073 		else if (ret != 0) {
2074 			/*
2075 			 * Free up the metadata lock.  If this was an error
2076 			 * other than a missing region page, bail.
2077 			 */
2078 			if ((t_ret = __LPUT(dbc, meta_lock)) != 0)
2079 				ret = t_ret;
2080 			if (ret != DB_PAGE_NOTFOUND)
2081 				goto err;
2082 			/*
2083 			 * The region no longer exists.  Release the page's lock
2084 			 * (we haven't created the page yet) and find a new page
2085 			 * in a different region.
2086 			 */
2087 			DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
2088 			goto find;
2089 		} else
2090 			ret = __memp_fput(mpf,
2091 			    dbc->thread_info, rpage, dbc->priority);
2092 		rpage = NULL;
2093 		if (ret != 0)
2094 			goto meta_unlock;
2095 
2096 		if ((ret = __memp_fget(mpf, &meta_pgno,
2097 		    dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &meta)) != 0)
2098 			goto err;
2099 
2100 		/* Log the page creation.  Can't jump to err if it fails. */
2101 		if (DBC_LOGGING(dbc))
2102 			ret = __heap_pg_alloc_log(dbp,
2103 			    dbc->txn, &LSN(meta), 0, &LSN(meta), meta_pgno,
2104 				data_pgno, (u_int32_t)P_HEAP, meta->last_pgno);
2105 		else
2106 			LSN_NOT_LOGGED(LSN(meta));
2107 
2108 		/*
2109 		 * We may have created a page earlier with a larger page number;
2110 		 * check before updating the metadata page.
2111 		 */
2112 		if (ret == 0 && data_pgno > meta->last_pgno)
2113 			meta->last_pgno = data_pgno;
2114 		meta_lsn = LSN(meta);
2115 
2116 		if ((t_ret = __memp_fput(mpf,
2117 		    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
2118 			ret = t_ret;
2119 		meta = NULL;
2120 		if (ret != 0)
2121 			goto meta_unlock;
2122 
2123 		/* If the page doesn't actually exist we need to create it. */
2124 		if (cp->pgno == PGNO_INVALID) {
2125 			cp->pgno = data_pgno;
2126 			if ((ret = __memp_fget(mpf, &cp->pgno,
2127 			    dbc->thread_info, dbc->txn,
2128 			    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &cp->page)) != 0)
2129 				goto meta_unlock;
2130 			DB_ASSERT(dbp->env, cp->pgno == data_pgno);
2131 		} else if ((ret = __memp_dirty(mpf, &cp->page,
2132 		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
2133 			/* Did not read the page, so we can release the lock. */
2134 			DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
2135 			goto meta_unlock;
2136 		}
2137 
2138 		/* Now that we have the page, we initialize it and we're done. */
2139 		P_INIT(cp->page,
2140 		    dbp->pgsize, cp->pgno, P_INVALID, P_INVALID, 0, P_HEAP);
2141 		LSN(cp->page) = meta_lsn;
2142 
2143 meta_unlock:	if ((t_ret = __TLPUT(dbc, meta_lock)) != 0 && ret == 0)
2144 			ret = t_ret;
2145 		if (ret != 0)
2146 			goto err;
2147 	} else {
2148 		/* Check whether we actually have enough space on this page. */
2149 check:		if (size + sizeof(db_indx_t) > HEAP_FREESPACE(dbp, cp->page)) {
2150 			/* Put back the page and lock; they were never used. */
2151 			DISCARD(dbc, cp->page, cp->lock, 0, ret);
2152 			if (ret != 0)
2153 				goto err;
2154 
2155 			/* Re-start the bitmap check on the next page. */
2156 			i++;
2157 			goto find;
2158 		}
2159 
2160 		if ((ret = __memp_dirty(mpf, &cp->page,
2161 		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
2162 			/* Did not read the page, so we can release the lock. */
2163 			DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
2164 			goto err;
2165 		}
2166 	}
2167 
2168 	h->curpgindx = data_pgno - region_pgno - 1;
2169 err:	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
2170 	if (rpage != NULL && (t_ret = __memp_fput(mpf,
2171 	    dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
2172 		ret = t_ret;
2173 
2174 	return (ret);
2175 }
2176 
2177 /*
2178  * __heap_append --
2179  *	Add an item to a heap database.
2180  *
2181  * PUBLIC: int __heap_append
2182  * PUBLIC:     __P((DBC *, DBT *, DBT *));
2183  */
2184 int
2185 __heap_append(dbc, key, data)
2186 	DBC *dbc;
2187 	DBT *data, *key;
2188 {
2189 	DB *dbp;
2190 	DBT tmp_dbt;
2191 	DB_HEAP_RID rid;
2192 	DB_MPOOLFILE *mpf;
2193 	HEAPPG *rpage;
2194 	HEAPHDR hdr;
2195 	HEAP_CURSOR *cp;
2196 	db_indx_t indx;
2197 	db_pgno_t region_pgno;
2198 	int ret, space, t_ret;
2199 	u_int8_t avail;
2200 	u_int32_t data_size;
2201 
2202 	dbp = dbc->dbp;
2203 	mpf = dbp->mpf;
2204 	ret = t_ret = 0;
2205 	rpage = NULL;
2206 	cp = (HEAP_CURSOR *)dbc->internal;
2207 
2208 	/* Need data.size + header size, 4-byte aligned. */
2209 	if (F_ISSET(data, DB_DBT_PARTIAL))
2210 		data_size = DB_ALIGN(data->doff +
2211 		    data->size + sizeof(HEAPHDR), sizeof(u_int32_t));
2212 	else
2213 		data_size = DB_ALIGN(
2214 		    data->size + sizeof(HEAPHDR), sizeof(u_int32_t));
2215 
2216 	if (data_size >= HEAP_MAXDATASIZE(dbp))
2217 		return (__heapc_split(dbc, key, data, 1));
2218 	else if (data_size < sizeof(HEAPSPLITHDR))
2219 		data_size = sizeof(HEAPSPLITHDR);
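	/*
	 * Sizing example (illustrative; assumes sizeof(HEAPHDR) == 4):
	 * a 25-byte record needs DB_ALIGN(25 + 4, 4) == 32 bytes on the
	 * page.  A tiny record is padded up to sizeof(HEAPSPLITHDR) so
	 * that, if the record is ever split or relocated, a split header
	 * is guaranteed to fit in the space it vacates.
	 */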
2220 
2221 	if ((ret = __heap_getpage(dbc, data_size, &avail)) != 0)
2222 		goto err;
2223 
2224 	indx = HEAP_FREEINDX(cp->page);
2225 	memset(&hdr, 0, sizeof(HEAPHDR));
2226 	hdr.size = data->size;
2227 	if (F_ISSET(data, DB_DBT_PARTIAL))
2228 		hdr.size += data->doff;
2229 	tmp_dbt.data = &hdr;
2230 	tmp_dbt.size = sizeof(HEAPHDR);
2231 
2232 	/* Log the write. */
2233 	if (DBC_LOGGING(dbc)) {
2234 		if ((ret = __heap_addrem_log(dbp, dbc->txn, &LSN(cp->page),
2235 		    0, DB_ADD_HEAP, cp->pgno, (u_int32_t)indx,
2236 		    data_size, &tmp_dbt, data, &LSN(cp->page))) != 0)
2237 			goto err;
2238 	} else
2239 		LSN_NOT_LOGGED(LSN(cp->page));
2240 
2241 	if ((ret = __heap_pitem(
2242 	    dbc, (PAGE *)cp->page, indx, data_size, &tmp_dbt, data)) != 0)
2243 		goto err;
2244 
2245 	rid.pgno = cp->pgno;
2246 	rid.indx = indx;
2247 	cp->indx = indx;
2248 
2249 	/* Check whether we need to update the space bitmap. */
2250 	HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), space);
2251 
2252 	if (space != avail) {
2253 		/* Get the region page with an exclusive latch. */
2254 		region_pgno = HEAP_REGION_PGNO(dbp, cp->pgno);
2255 		if ((ret = __memp_fget(mpf, &region_pgno,
2256 		    dbc->thread_info, NULL, DB_MPOOL_DIRTY, &rpage)) != 0)
2257 			goto err;
2258 
2259 		HEAP_SETSPACE(dbp, rpage, cp->pgno - region_pgno - 1, space);
2260 	}
2261 
2262 err:	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
2263 	if (rpage != NULL && (t_ret = __memp_fput(mpf,
2264 	    dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
2265 		ret = t_ret;
2266 
2267 	if (cp->page != NULL) {
2268 		DISCARD(dbc, cp->page, cp->lock, 1, t_ret);
2269 		if (ret == 0)
2270 			ret = t_ret;
2271 	}
2272 
2273 	if (ret == 0 && key != NULL)
2274 		ret = __db_retcopy(dbp->env, key,
2275 		    &rid, DB_HEAP_RID_SZ, &dbc->rkey->data, &dbc->rkey->ulen);
2276 
2277 	return (ret);
2278 }
2279 
2280 static int
2281 __heapc_split(dbc, key, data, is_first)
2282 	DBC *dbc;
2283 	DBT *key, *data;
2284 	int is_first;
2285 {
2286 	DB *dbp;
2287 	DBT hdr_dbt, t_data;
2288 	DB_HEAP_RID rid;
2289 	DB_MPOOLFILE *mpf;
2290 	HEAPPG *rpage;
2291 	HEAPSPLITHDR hdrs;
2292 	HEAP_CURSOR *cp;
2293 	db_indx_t indx;
2294 	db_pgno_t region_pgno;
2295 	int ret, spacebits, t_ret;
2296 	u_int32_t buflen, doff, left, size;
2297 	u_int8_t availbits, *buf;
2298 
2299 	dbp = dbc->dbp;
2300 	mpf = dbp->mpf;
2301 	cp = (HEAP_CURSOR *)dbc->internal;
2302 	memset(&hdrs, 0, sizeof(HEAPSPLITHDR));
2303 	memset(&t_data, 0, sizeof(DBT));
2304 	hdrs.std_hdr.flags = HEAP_RECSPLIT | HEAP_RECLAST;
2305 
2306 	doff = data->doff;
2307 	rpage = NULL;
2308 	ret = t_ret = 0;
2309 	indx = 0;
2310 	buf = NULL;
2311 	buflen = 0;
2312 
2313 	/*
2314 	 * Write the record to multiple pages, in chunks starting from the end.
2315 	 * To reconstruct the record during a get we need the RID of the next
2316 	 * chunk, so if we work our way from back to front during writing we
2317 	 * always know the rid of the "next" chunk: it's the chunk we just wrote.
2318 	 */
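	/*
	 * For example (illustrative RIDs), a record split across three pages
	 * is written last chunk first:
	 *
	 *	chunk C: HEAP_RECLAST               written at rid 7.0
	 *	chunk B: next = 7.0                 written at rid 5.2
	 *	chunk A: HEAP_RECFIRST, next = 5.2  written at rid 3.1
	 *
	 * The rid returned for the record is 3.1, and a get follows the
	 * next pointers forward through the chunks.
	 */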
2319 	t_data.data = (u_int8_t *)data->data + data->size;
2320 	left = data->size;
2321 	if (F_ISSET(data, DB_DBT_PARTIAL)) {
2322 		left += data->doff;
2323 	}
2324 	hdrs.tsize = left;
2325 	while (left > 0) {
2326 		size = DB_ALIGN(left + sizeof(HEAPSPLITHDR), sizeof(u_int32_t));
2327 		if (size < sizeof(HEAPSPLITHDR))
2328 			size = sizeof(HEAPSPLITHDR);
2329 
2330 		if (size > HEAP_MAXDATASIZE(dbp))
2331 			/*
2332 			 * The data won't fit on a single page; find one that
2333 			 * is at least 33% free.
2334 			 */
2335 			size = DB_ALIGN(dbp->pgsize / 3, sizeof(u_int32_t));
2336 		else
2337 			hdrs.std_hdr.flags |= HEAP_RECFIRST;
2338 
2339 		if ((ret = __heap_getpage(dbc, size, &availbits)) != 0)
2340 			return (ret);
2341 
2342 		/*
2343 		 * size is the total number of bytes being written to the page.
2344 		 * The header holds the size of the data being written.
2345 		 */
2346 		if (F_ISSET(&(hdrs.std_hdr), HEAP_RECFIRST)) {
2347 			hdrs.std_hdr.size = left;
2348 			/*
2349 			 * If we're called from heapc_reloc, we are only writing
2350 			 * a piece of the full record and shouldn't set
2351 			 * HEAP_RECFIRST.
2352 			 */
2353 			if (!is_first)
2354 				F_CLR(&(hdrs.std_hdr), HEAP_RECFIRST);
2355 		} else {
2356 			/*
2357 			 * Figure out how much room is on the page.  If we will
2358 			 * have to expand the offset table, account for that.
2359 			 */
2360 			size = HEAP_FREESPACE(dbp, cp->page);
2361 			if (NUM_ENT(cp->page) == 0 ||
2362 			    HEAP_FREEINDX(cp->page) > HEAP_HIGHINDX(cp->page))
2363 				size -= sizeof(db_indx_t);
2364 			/* Round down to a multiple of 4. */
2365 			size = DB_ALIGN(
2366 			    size - sizeof(u_int32_t) + 1, sizeof(u_int32_t));
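			/*
			 * E.g. (illustrative): with size == 30 this computes
			 * DB_ALIGN(27, 4) == 28, rounding 30 down to 28; an
			 * already-aligned size such as 32 is left unchanged.
			 */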
2367 			DB_ASSERT(dbp->env, size >= sizeof(HEAPSPLITHDR));
2368 			hdrs.std_hdr.size =
2369 			    (u_int16_t)(size - sizeof(HEAPSPLITHDR));
2370 		}
2371 
2372 		/*
2373 		 * t_data.data points at the end of the data left to write.  Now
2374 		 * that we know how much we're going to write to this page, we
2375 		 * can adjust the pointer to point at the start of the data to
2376 		 * be written.
2377 		 *
2378 		 * If DB_DBT_PARTIAL is set, once data->data is exhausted, we
2379 		 * have to pad with data->doff bytes (or as much as can fit on
2380 		 * this page).  left - doff gives the number of bytes to use
2381 		 * from data->data.  Once that can't fill t_data, we have to
2382 		 * start padding.
2383 		 */
2384 		t_data.data = (u_int8_t *)(t_data.data) - hdrs.std_hdr.size;
2385 		DB_ASSERT(dbp->env, (F_ISSET(data, DB_DBT_PARTIAL) ||
2386 		    t_data.data >= data->data));
2387 		t_data.size = hdrs.std_hdr.size;
2388 		if (F_ISSET(data, DB_DBT_PARTIAL) &&
2389 		    t_data.size > left - doff) {
2390 			if (buflen < t_data.size) {
2391 				if (__os_realloc(
2392 				    dbp->env, t_data.size, &buf) != 0)
2393 					return (ENOMEM);
2394 				buflen = t_data.size;
2395 			}
2396 			/*
2397 			 * We have to figure out how much data remains.  left
2398 			 * includes doff, so we need (left - doff) bytes from
2399 			 * data.  We also need the amount of padding that can
2400 			 * fit on the page.  That's the amount we can fit on the
2401 			 * page minus the bytes we're taking from data.
2402 			 */
2403 			t_data.data = buf;
2404 			memset(buf, '\0', t_data.size - left + doff);
2405 			buf += t_data.size - left + doff;
2406 			memcpy(buf, data->data, left - doff);
2407 			doff -= t_data.size - left + doff;
2408 			buf = t_data.data;
2409 		}
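		/*
		 * Padding example (illustrative numbers): with left == 5000,
		 * doff == 3000 and a chunk of t_data.size == 2500, only
		 * left - doff == 2000 real bytes remain.  The chunk is built
		 * in buf as 500 zero bytes followed by those 2000 data
		 * bytes, and doff drops to 2500; any chunks written after
		 * this one (earlier in the record) are pure padding.
		 */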
2410 		hdr_dbt.data = &hdrs;
2411 		hdr_dbt.size = sizeof(HEAPSPLITHDR);
2412 		indx = HEAP_FREEINDX(cp->page);
2413 
2414 		/* Log the write. */
2415 		if (DBC_LOGGING(dbc)) {
2416 			if ((ret = __heap_addrem_log(dbp,
2417 			    dbc->txn, &LSN(cp->page), 0,
2418 			    DB_ADD_HEAP, cp->pgno, (u_int32_t)indx,
2419 			    size, &hdr_dbt, &t_data, &LSN(cp->page))) != 0)
2420 				goto err;
2421 		} else
2422 			LSN_NOT_LOGGED(LSN(cp->page));
2423 
2424 		if ((ret = __heap_pitem(dbc,
2425 		    (PAGE *)cp->page, indx, size, &hdr_dbt, &t_data)) != 0)
2426 			goto err;
2427 		F_CLR(&(hdrs.std_hdr), HEAP_RECLAST);
2428 		left -= hdrs.std_hdr.size;
2429 
2430 		/*
2431 		 * Save the rid where we just wrote; this is the "next"
2432 		 * chunk.
2433 		 */
2434 		hdrs.nextpg = cp->pgno;
2435 		hdrs.nextindx = indx;
2436 
2437 		/* Check whether we need to update the space bitmap. */
2438 		HEAP_CALCSPACEBITS(dbp,
2439 		    HEAP_FREESPACE(dbp, cp->page), spacebits);
2440 
2441 		if (spacebits != availbits) {
2442 			/* Get the region page with an exclusive latch. */
2443 			region_pgno = HEAP_REGION_PGNO(dbp, cp->pgno);
2444 			if ((ret = __memp_fget(mpf, &region_pgno,
2445 			    dbc->thread_info,
2446 			    NULL, DB_MPOOL_DIRTY, &rpage)) != 0)
2447 				goto err;
2448 
2449 			HEAP_SETSPACE(dbp,
2450 			    rpage, cp->pgno - region_pgno - 1, spacebits);
2451 			ret = __memp_fput(mpf,
2452 			    dbc->thread_info, rpage, dbc->priority);
2453 			rpage = NULL;
2454 			if (ret != 0)
2455 				goto err;
2456 		}
2457 
2458 	}
2459 
2460 	rid.pgno = cp->pgno;
2461 	rid.indx = indx;
2462 	cp->indx = indx;
2463 
2464 err:	if (rpage != NULL && (t_ret = __memp_fput(mpf,
2465 	    dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
2466 		ret = t_ret;
2467 	if (cp->page != NULL) {
2468 		DISCARD(dbc, cp->page, cp->lock, 1, t_ret);
2469 		if (ret == 0)
2470 			ret = t_ret;
2471 	}
2472 	if (buf != NULL)
2473 		__os_free(dbp->env, buf);
2474 
2475 	if (ret == 0 && key != NULL)
2476 		ret = __db_retcopy(dbp->env, key,
2477 		    &rid, DB_HEAP_RID_SZ, &dbc->rkey->data, &dbc->rkey->ulen);
2478 	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
2479 	return (ret);
2480 }
2481 
2482 /*
2483  * __heap_pitem --
2484  *	Put an item on a heap page.  Copy all bytes from the header (if any)
2485  *	first and then copy from data.
2486  *
2487  * PUBLIC: int __heap_pitem __P((DBC *,
2488  * PUBLIC:	PAGE *, u_int32_t, u_int32_t, DBT *, DBT *));
2489  */
2490 int
2491 __heap_pitem(dbc, pagep, indx, nbytes, hdr, data)
2492 	DBC *dbc;
2493 	PAGE *pagep;
2494 	u_int32_t indx;
2495 	u_int32_t nbytes;
2496 	DBT *hdr, *data;
2497 {
2498 	DB *dbp;
2499 	u_int8_t *buf;
2500 
2501 	dbp = dbc->dbp;
2502 
2503 	DB_ASSERT(dbp->env, TYPE(pagep) == P_HEAP);
2504 	DB_ASSERT(dbp->env, IS_DIRTY(pagep));
2505 	DB_ASSERT(dbp->env, nbytes == DB_ALIGN(nbytes, sizeof(u_int32_t)));
2506 	DB_ASSERT(dbp->env, DB_ALIGN(((HEAPHDR *)hdr->data)->size,
2507 	    sizeof(u_int32_t)) >= data->size);
2508 	DB_ASSERT(dbp->env, nbytes >= hdr->size + data->size);
2509 
2510 	/*
2511 	 * We're writing data either as a result of DB->put or as a result of
2512 	 * undoing a delete.  If we're undoing a delete we just need to write
2513 	 * the bytes from hdr to the page.  Otherwise, we need to construct a
2514 	 * heap header, etc.
2515 	 */
2516 	HEAP_OFFSETTBL(dbp, pagep)[indx] = HOFFSET(pagep) - nbytes;
2517 	buf = P_ENTRY(dbp, pagep, indx);
2518 	DB_ASSERT(dbp->env, buf > (u_int8_t*)&HEAP_OFFSETTBL(dbp, pagep)[indx]);
2519 
2520 	if (hdr != NULL) {
2521 		memcpy(buf, hdr->data, hdr->size);
2522 		buf += hdr->size;
2523 	}
2524 	if (F_ISSET(data, DB_DBT_PARTIAL)) {
2525 		memset(buf, 0, data->doff);
2526 		buf += data->doff;
2527 	}
2528 	memcpy(buf, data->data, data->size);
2529 
2530 	/*
2531 	 * Update data page header.  If DEBUG/DIAGNOSTIC is set, the page might
2532 	 * be filled with 0xdb, so we can't just look for a 0 in the offset
2533 	 * table.  We used the first available index, so start there and scan
2534 	 * forward.  If the table is full, the first available index is the
2535 	 * highest index plus one.
2536 	 */
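	/*
	 * For example (illustrative state): on a page with entries at
	 * indexes 0, 1 and 3 (HEAP_HIGHINDX == 3, HEAP_FREEINDX == 2),
	 * a put at index 2 takes the else branch and the scan moves
	 * HEAP_FREEINDX to 4.  A put at index 6 instead takes the first
	 * branch: it zeroes the offset-table slots for 4 and 5, bumps
	 * HEAP_HIGHINDX to 6 and, since index 2 is still free, leaves
	 * HEAP_FREEINDX alone.
	 */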
2537 	if (indx > HEAP_HIGHINDX(pagep)) {
2538 		if (NUM_ENT(pagep) == 0)
2539 			HEAP_FREEINDX(pagep) = 0;
2540 		else if (HEAP_FREEINDX(pagep) >= indx) {
2541 			if (indx > (u_int32_t)HEAP_HIGHINDX(pagep) + 1)
2542 				HEAP_FREEINDX(pagep) = HEAP_HIGHINDX(pagep) + 1;
2543 			else
2544 				HEAP_FREEINDX(pagep) = indx + 1;
2545 		}
2546 		while (++HEAP_HIGHINDX(pagep) < indx)
2547 			HEAP_OFFSETTBL(dbp,pagep)[HEAP_HIGHINDX(pagep)] = 0;
2548 	} else {
2549 		for (; indx <= HEAP_HIGHINDX(pagep); indx++)
2550 			if (HEAP_OFFSETTBL(dbp, pagep)[indx] == 0)
2551 				break;
2552 		HEAP_FREEINDX(pagep) = indx;
2553 	}
2554 	HOFFSET(pagep) -= nbytes;
2555 	NUM_ENT(pagep)++;
2556 
2557 	return (0);
2558 }
2559 
2560 /*
2561  * __heapc_dup --
2562  *      Duplicate a heap cursor, such that the new one holds appropriate
2563  *      locks for the position of the original.
2564  *
2565  * PUBLIC: int __heapc_dup __P((DBC *, DBC *));
2566  */
2567 int
2568 __heapc_dup(orig_dbc, new_dbc)
2569 	DBC *orig_dbc, *new_dbc;
2570 {
2571 	HEAP_CURSOR *orig, *new;
2572 
2573 	orig = (HEAP_CURSOR *)orig_dbc->internal;
2574 	new = (HEAP_CURSOR *)new_dbc->internal;
2575 	new->flags = orig->flags;
2576 	return (0);
2577 }
2578 
2579 /*
2580  * __heapc_gsplit --
2581  *      Get a heap split record.  The page pointed to by the cursor must
2582  *	be the first segment of this record.
2583  *
2584  * PUBLIC: int __heapc_gsplit __P((DBC *,
2585  * PUBLIC:     DBT *, void **, u_int32_t *));
2586  */
2587 int
2588 __heapc_gsplit(dbc, dbt, bpp, bpsz)
2589 	DBC *dbc;
2590 	DBT *dbt;
2591 	void **bpp;
2592 	u_int32_t *bpsz;
2593 {
2594 	DB *dbp;
2595 	DB_MPOOLFILE *mpf;
2596 	DB_HEAP_RID rid;
2597 	DB_LOCK data_lock;
2598 	HEAP_CURSOR *cp;
2599 	ENV *env;
2600 	HEAPPG *dpage;
2601 	HEAPSPLITHDR *hdr;
2602 	db_indx_t bytes;
2603 	u_int32_t curoff, needed, start, tlen;
2604 	u_int8_t *p, *src;
2605 	int putpage, ret, t_ret;
2606 
2607 	LOCK_INIT(data_lock);
2608 	dbp = dbc->dbp;
2609 	env = dbp->env;
2610 	mpf = dbp->mpf;
2611 	cp = (HEAP_CURSOR *)dbc->internal;
2612 	putpage = FALSE;
2613 	ret = 0;
2614 
2615 	/*
2616 	 * We should already have the first page locked in the cursor.  Get the
2617 	 * record id out of the cursor and set up local variables.
2618 	 */
2619 	DB_ASSERT(env, cp->page != NULL);
2620 	rid.pgno = cp->pgno;
2621 	rid.indx = cp->indx;
2622 	dpage = cp->page;
2623 	hdr = (HEAPSPLITHDR *)P_ENTRY(dbp, dpage, rid.indx);
2624 	DB_ASSERT(env, hdr->tsize != 0);
2625 	tlen = hdr->tsize;
2626 
2627 	/*
2628 	 * If we're doing a partial retrieval, figure out how much we are
2629 	 * actually going to get.
2630 	 */
2631 	if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
2632 		start = dbt->doff;
2633 		if (start > tlen)
2634 			needed = 0;
2635 		else if (dbt->dlen > tlen - start)
2636 			needed = tlen - start;
2637 		else
2638 			needed = dbt->dlen;
2639 	} else {
2640 		start = 0;
2641 		needed = tlen;
2642 	}
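	/*
	 * E.g. (illustrative numbers): for a 1000-byte record, a partial
	 * get with doff == 900 and dlen == 200 can return at most
	 * tlen - start == 100 bytes; with doff == 1200 nothing overlaps
	 * the record and needed == 0, handled just below.
	 */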
2643 
2644 	/*
2645 	 * If the caller has not requested any data, return success. This
2646 	 * "early-out" also avoids setting up the streaming optimization when
2647 	 * no page would be retrieved. If it were removed, the streaming code
2648 	 * should only initialize when needed is not 0.
2649 	 */
2650 	if (needed == 0) {
2651 		dbt->size = 0;
2652 		return (0);
2653 	}
2654 
2655 	/*
2656 	 * Check if the buffer is big enough; if it is not and we are
2657 	 * allowed to malloc space, then we'll malloc it.  If we are
2658 	 * not (DB_DBT_USERMEM), then we'll set the dbt and return
2659 	 * appropriately.
2660 	 */
2661 	if (F_ISSET(dbt, DB_DBT_USERCOPY))
2662 		goto skip_alloc;
2663 
2664 	/* Allocate any necessary memory. */
2665 	if (F_ISSET(dbt, DB_DBT_USERMEM)) {
2666 		if (needed > dbt->ulen) {
2667 			dbt->size = needed;
2668 			return (DB_BUFFER_SMALL);
2669 		}
2670 	} else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
2671 		if ((ret = __os_umalloc(env, needed, &dbt->data)) != 0)
2672 			return (ret);
2673 	} else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
2674 		if ((ret = __os_urealloc(env, needed, &dbt->data)) != 0)
2675 			return (ret);
2676 	} else if (bpsz != NULL && (*bpsz == 0 || *bpsz < needed)) {
2677 		if ((ret = __os_realloc(env, needed, bpp)) != 0)
2678 			return (ret);
2679 		*bpsz = needed;
2680 		dbt->data = *bpp;
2681 	} else if (bpp != NULL)
2682 		dbt->data = *bpp;
2683 	else {
2684 		DB_ASSERT(env,
2685 		    F_ISSET(dbt,
2686 		    DB_DBT_USERMEM | DB_DBT_MALLOC | DB_DBT_REALLOC) ||
2687 		    bpsz != NULL || bpp != NULL);
2688 		return (DB_BUFFER_SMALL);
2689 	}
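	/*
	 * From the application's side this is the usual DBT protocol; a
	 * minimal sketch of the DB_DBT_USERMEM retry loop (hypothetical
	 * caller code, not part of this file):
	 *
	 *	DBT data;
	 *	memset(&data, 0, sizeof(data));
	 *	data.data = buf;
	 *	data.ulen = buflen;
	 *	data.flags = DB_DBT_USERMEM;
	 *	if ((ret = dbp->get(dbp, txn, &key, &data, 0)) ==
	 *	    DB_BUFFER_SMALL) {
	 *		grow buf to data.size bytes and retry;
	 *	}
	 */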
2690 
2691 skip_alloc:
2692 	/*
2693 	 * Go through each of the pieces, copying the data on each one
2694 	 * into the buffer.  Never copy more than the total data length.
2695 	 * We are starting off with the page that is currently pointed to by
2696 	 * the cursor.
2697 	 */
2698 	curoff = 0;
2699 	dbt->size = needed;
2700 	for (p = dbt->data; needed > 0;) {
2701 		/* Check if we need any bytes from this page */
2702 		if (curoff + hdr->std_hdr.size >= start) {
2703 			bytes = hdr->std_hdr.size;
2704 			src = (u_int8_t *)hdr +
2705 			    P_TO_UINT16(sizeof(HEAPSPLITHDR));
2706 			if (start > curoff) {
2707 				src += start - curoff;
2708 				bytes -= start - curoff;
2709 			}
2710 			if (bytes > needed)
2711 				bytes = needed;
2712 			if (F_ISSET(dbt, DB_DBT_USERCOPY)) {
2713 				/*
2714 				 * The offset into the DBT is the total size
2715 				 * less the amount of data still needed.  Care
2716 				 * needs to be taken if doing a partial copy
2717 				 * beginning at an offset other than 0.
2718 				 */
2719 				if ((ret = env->dbt_usercopy(
2720 				    dbt, dbt->size - needed,
2721 				    src, bytes, DB_USERCOPY_SETDATA)) != 0) {
2722 					if (putpage)
2723 						(void)__memp_fput(
2724 						    mpf, dbc->thread_info,
2725 						    dpage, dbp->priority);
2726 
2727 					return (ret);
2728 				}
2729 			} else
2730 				memcpy(p, src, bytes);
2731 			p += bytes;
2732 			needed -= bytes;
2733 		}
2734 		curoff += hdr->std_hdr.size;
2735 
2736 		/* Find next record piece as long as it exists */
2737 		if (!F_ISSET((HEAPHDR *)hdr, HEAP_RECLAST)) {
2738 			rid.pgno = hdr->nextpg;
2739 			rid.indx = hdr->nextindx;
2740 
2741 			/*
2742 			 * On the first pass through here, we are using
2743 			 * the page pointed to by the cursor, and this
2744 			 * page will get put when the cursor is closed.
2745 			 * Only pages specifically gotten in this loop
2746 			 * need to be put back.
2747 			 */
2748 			if (putpage) {
2749 				if ((ret = __memp_fput(mpf, dbc->thread_info,
2750 				    dpage, dbp->priority) ) != 0)
2751 					goto err;
2752 				dpage = NULL;
2753 				if ((ret = __TLPUT(dbc, data_lock)) != 0)
2754 					goto err;
2755 			}
2756 
2757 			if ((ret = __db_lget(dbc, 0, rid.pgno,
2758 			    DB_LOCK_READ, 0, &data_lock)) != 0)
2759 				goto err;
2760 			if ((ret = __memp_fget(mpf, &rid.pgno,
2761 			    dbc->thread_info, dbc->txn, 0, &dpage)) != 0)
2762 				goto err;
2763 			hdr = (HEAPSPLITHDR *)P_ENTRY(dbp, dpage, rid.indx);
2764 			putpage = TRUE;
2765 
2766 			/*
2767 			 * If we have the last piece of this record and we're
2768 			 * reading the entire record, then what we need should
2769 			 * equal what is remaining.
2770 			 */
2771 			if (F_ISSET((HEAPHDR *)hdr, HEAP_RECLAST) &&
2772 			    !F_ISSET(dbt, DB_DBT_PARTIAL) &&
2773 			    (hdr->std_hdr.size != needed)) {
2774 				__db_errx(env, DB_STR_A("1167",
2775 			     "Incorrect record size in header: %s: rid %lu.%lu",
2776 				    "%s %lu %lu"), dbc->dbp->fname,
2777 				    (u_long)(cp->pgno), (u_long)(cp->indx));
2778 				ret = __env_panic(env, DB_RUNRECOVERY);
2779 				goto err;
2780 			}
2781 		}
2782 	}
2783 
2784 err:	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
2785 	if (putpage && dpage != NULL && (t_ret = __memp_fput(mpf,
2786 	    dbc->thread_info, dpage, dbp->priority)) != 0 && ret == 0)
2787 		ret = t_ret;
2788 	if ((t_ret = __TLPUT(dbc, data_lock)) != 0 && ret == 0)
2789 		ret = t_ret;
2790 
2791 	return (ret);
2792 }
2793 /*
2794  * __heapc_refresh --
2795  *      Do the proper setup for cursor reuse.
2796  *
2797  * PUBLIC: int __heapc_refresh __P((DBC *));
2798  */
2799 int
2800 __heapc_refresh(dbc)
2801 	DBC *dbc;
2802 {
2803 	HEAP_CURSOR *cp;
2804 
2805 	cp = (HEAP_CURSOR *)dbc->internal;
2806 
2807 	LOCK_INIT(cp->lock);
2808 	cp->lock_mode = DB_LOCK_NG;
2809 	cp->flags = 0;
2810 
2811 	return (0);
2812 }
2813