1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2010, 2013 Oracle and/or its affiliates. All rights reserved.
5 *
6 * $Id$
7 */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/heap.h"
15 #include "dbinc/lock.h"
16 #include "dbinc/mp.h"
17
18 static int __heap_bulk __P((DBC *, DBT *, u_int32_t));
19 static int __heap_getpage __P((DBC *, u_int32_t, u_int8_t *));
20 static int __heapc_close __P((DBC *, db_pgno_t, int *));
21 static int __heapc_del __P((DBC *, u_int32_t));
22 static int __heapc_destroy __P((DBC *));
23 static int __heapc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
24 static int __heapc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
25 static int __heapc_reloc __P((DBC *, DBT *, DBT *));
26 static int __heapc_reloc_partial __P((DBC *, DBT *, DBT *));
27 static int __heapc_split __P((DBC *, DBT *, DBT *, int));
28
29 /*
30 * Acquire a new page/lock. If we are already holding a page and a lock
31 * we discard those and get the new ones. In this case we can use
32 * LCK_COUPLE to save trips to lock manager. If we are not holding a page or
33 * locks, we just get a new lock and page. Lock release done with a
34 * transactional lock put.
35 */
36 #undef ACQUIRE
37 #define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, flags, mflags, ret) do { \
38 DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf; \
39 if ((pagep) != NULL) { \
40 ret = __memp_fput(__mpf, \
41 (dbc)->thread_info, pagep, dbc->priority); \
42 pagep = NULL; \
43 } \
44 if ((ret) == 0 && STD_LOCKING(dbc)) \
45 ret = __db_lget(dbc, \
46 LOCK_ISSET(lock) ? LCK_COUPLE : 0, \
47 lpgno, mode, flags, &(lock)); \
48 if ((ret) == 0) \
49 ret = __memp_fget(__mpf, &(fpgno), \
50 (dbc)->thread_info, (dbc)->txn, mflags, &(pagep)); \
51 } while (0)
52
/*
 * ACQUIRE_CUR --
 *	Acquire a new page/lock for a heap cursor.  Invalidates the cursor's
 *	cached pgno while the swap is in flight so a failed ACQUIRE cannot
 *	leave the cursor claiming a page it no longer holds; on success the
 *	cursor's pgno and lock_mode are updated to the new position.
 */
#undef	ACQUIRE_CUR
#define	ACQUIRE_CUR(dbc, mode, p, flags, mflags, ret) do {		\
	HEAP_CURSOR *__cp = (HEAP_CURSOR *)(dbc)->internal;		\
	if (p != __cp->pgno)						\
		__cp->pgno = PGNO_INVALID;				\
	ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, flags, mflags, ret); \
	if ((ret) == 0) {						\
		__cp->pgno = p;						\
		__cp->lock_mode = (mode);				\
	}								\
} while (0)
65
/*
 * DISCARD --
 *	Discard the current page and lock held by a cursor.  If `tlock' is 1
 *	the lock is released with a transactional put (__TLPUT: held to txn
 *	end when transactional), otherwise it is dropped immediately
 *	(__LPUT).  The first error encountered is preserved in `ret'.
 */
#undef	DISCARD
#define	DISCARD(dbc, pagep, lock, tlock, ret) do {			\
	DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;				\
	int __t_ret;							\
	__t_ret = 0;							\
	if ((pagep) != NULL) {						\
		__t_ret = __memp_fput(__mpf,				\
		    (dbc)->thread_info, pagep, dbc->priority);		\
		pagep = NULL;						\
	}								\
	if (__t_ret != 0 && (ret) == 0)					\
		ret = __t_ret;						\
	if (tlock == 1)							\
		__t_ret = __TLPUT((dbc), lock);				\
	else								\
		__t_ret = __LPUT((dbc), lock);				\
	if (__t_ret != 0 && (ret) == 0)					\
		ret = __t_ret;						\
} while (0)
86
87 /*
88 * __heapc_init --
89 * Initialize the access private portion of a cursor
90 *
91 * PUBLIC: int __heapc_init __P((DBC *));
92 */
93 int
__heapc_init(dbc)94 __heapc_init(dbc)
95 DBC *dbc;
96 {
97 ENV *env;
98 int ret;
99
100 env = dbc->env;
101
102 if (dbc->internal == NULL)
103 if ((ret = __os_calloc(
104 env, 1, sizeof(HEAP_CURSOR), &dbc->internal)) != 0)
105 return (ret);
106
107 /* Initialize methods. */
108 dbc->close = dbc->c_close = __dbc_close_pp;
109 dbc->cmp = __dbc_cmp_pp;
110 dbc->count = dbc->c_count = __dbc_count_pp;
111 dbc->del = dbc->c_del = __dbc_del_pp;
112 dbc->dup = dbc->c_dup = __dbc_dup_pp;
113 dbc->get = dbc->c_get = __dbc_get_pp;
114 dbc->pget = dbc->c_pget = __dbc_pget_pp;
115 dbc->put = dbc->c_put = __dbc_put_pp;
116 dbc->am_bulk = __heap_bulk;
117 dbc->am_close = __heapc_close;
118 dbc->am_del = __heapc_del;
119 dbc->am_destroy = __heapc_destroy;
120 dbc->am_get = __heapc_get;
121 dbc->am_put = __heapc_put;
122 dbc->am_writelock = NULL;
123
124 return (0);
125 }
126
127 static int
__heap_bulk(dbc,data,flags)128 __heap_bulk(dbc, data, flags)
129 DBC *dbc;
130 DBT *data;
131 u_int32_t flags;
132 {
133 DB *dbp;
134 DB_HEAP_RID prev_rid, rid;
135 DBT sdata;
136 HEAP_CURSOR *cp;
137 HEAPHDR *hdr;
138 HEAPSPLITHDR *shdr;
139 PAGE *pg;
140 db_lockmode_t lock_type;
141 int is_key, ret;
142 int32_t *offp;
143 u_int32_t data_size, key_size, needed, space;
144 u_int8_t *dbuf, *np;
145
146 ret = 0;
147 dbp = dbc->dbp;
148 cp = (HEAP_CURSOR *)dbc->internal;
149 hdr = NULL;
150 shdr = NULL;
151
152 /* Check for additional bits for locking */
153 if (F_ISSET(dbc, DBC_RMW))
154 lock_type = DB_LOCK_WRITE;
155 else
156 lock_type = DB_LOCK_READ;
157
158 /*
159 * np is the next place to copy things into the buffer.
160 * dbuf always stays at the beginning of the buffer.
161 */
162 dbuf = data->data;
163 np = dbuf;
164
165 /* Keep track of space that is left. There is a termination entry */
166 space = data->ulen;
167 space -= sizeof(*offp);
168
169 /* Build the offset/size table from the end up. */
170 offp = (int32_t *)((u_int8_t *)dbuf + data->ulen);
171 offp--;
172
173 /*
174 * key_size and data_size hold the 32-bit aligned size of the key and
175 * data values written to the buffer.
176 */
177 key_size = DB_ALIGN(DB_HEAP_RID_SZ, sizeof(u_int32_t));
178 data_size = 0;
179
180 /* is_key indicates whether keys are returned. */
181 is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
182
183 next_pg:
184 rid.indx = cp->indx;
185 rid.pgno = cp->pgno;
186 pg = cp->page;
187
188 /*
189 * Write records to the buffer, in the format needed by the DB_MULTIPLE
190 * macros. For a description of the data layout, see db.h.
191 */
192 do {
193 if (HEAP_OFFSETTBL(dbp, pg)[rid.indx] == 0)
194 continue;
195 hdr = (HEAPHDR *)P_ENTRY(dbp, pg, rid.indx);
196 /*
197 * If this is a split record and not the first piece of the
198 * record, skip it.
199 */
200 if (F_ISSET(hdr, HEAP_RECSPLIT) &&
201 !F_ISSET(hdr, HEAP_RECFIRST))
202 continue;
203
204 /*
205 * Calculate how much space is needed to add this record. If
206 * there's not enough, we're done. If we haven't written any
207 * data to the buffer, or if we are doing a DBP->get, return
208 * DB_BUFFER_SMALL.
209 */
210 needed = 0;
211 if (is_key)
212 needed = 2 * sizeof(*offp) + key_size;
213 if (F_ISSET(hdr, HEAP_RECSPLIT)) {
214 shdr = (HEAPSPLITHDR *)hdr;
215 data_size = DB_ALIGN(shdr->tsize, sizeof(u_int32_t));
216 } else
217 data_size = DB_ALIGN(hdr->size, sizeof(u_int32_t));
218 needed += 2 * sizeof(*offp) + data_size;
219
220 if (needed > space) {
221 if (np == dbuf || F_ISSET(dbc, DBC_FROM_DB_GET)) {
222 data->size = (u_int32_t)DB_ALIGN(
223 needed + data->ulen - space, 1024);
224 return (DB_BUFFER_SMALL);
225 }
226 break;
227 }
228
229 if (is_key) {
230 memcpy(np, &rid, key_size);
231 *offp-- = (int32_t)(np - dbuf);
232 *offp-- = (int32_t)DB_HEAP_RID_SZ;
233 np += key_size;
234 }
235
236 if (F_ISSET(hdr, HEAP_RECSPLIT)) {
237 /*
238 * Use __heapc_gsplit to write a split record to the
239 * return buffer. gsplit will return any fetched pages
240 * to the cache, but will leave the cursor's current
241 * page alone.
242 */
243 memset(&sdata, 0, sizeof(DBT));
244 sdata.data = np;
245 sdata.size = sdata.ulen = shdr->tsize;
246 sdata.flags = DB_DBT_USERMEM;
247 /* gsplit expects the cursor to be positioned. */
248 cp->pgno = rid.pgno;
249 cp->indx = rid.indx;
250 if ((ret = __heapc_gsplit(
251 dbc, &sdata, NULL, NULL)) != 0)
252 return (ret);
253 } else {
254 memcpy(np,
255 (u_int8_t *)hdr + sizeof(HEAPHDR), hdr->size);
256 }
257 *offp-- = (int32_t)(np - dbuf);
258 if (F_ISSET(hdr, HEAP_RECSPLIT))
259 *offp-- = (int32_t)shdr->tsize;
260 else
261 *offp-- = (int32_t)hdr->size;
262 np += data_size;
263 space -= needed;
264 prev_rid = rid;
265
266 /*
267 * The data and "metadata" ends of the buffer should never
268 * overlap.
269 */
270 DB_ASSERT(dbp->env, (void *)np <= (void *)offp);
271 } while (++rid.indx < NUM_ENT(pg));
272
273 /* If we are off the page then try the next page. */
274 if (rid.indx >= NUM_ENT(pg)) {
275 rid.pgno++;
276 ACQUIRE_CUR(dbc, lock_type, rid.pgno, 0, 0, ret);
277 if (ret == 0) {
278 cp->indx = 0;
279 goto next_pg;
280 } else if (ret != DB_PAGE_NOTFOUND)
281 return (ret);
282 }
283
284 DB_ASSERT(dbp->env, (ret == 0 || ret == DB_PAGE_NOTFOUND));
285 cp->indx = prev_rid.indx;
286 cp->pgno = prev_rid.pgno;
287
288 *offp = -1;
289
290 return (0);
291 }
292
293 static int
__heapc_close(dbc,root_pgno,rmroot)294 __heapc_close(dbc, root_pgno, rmroot)
295 DBC *dbc;
296 db_pgno_t root_pgno;
297 int *rmroot;
298 {
299 DB_MPOOLFILE *mpf;
300 HEAP_CURSOR *cp;
301 int ret;
302
303 COMPQUIET(root_pgno, 0);
304 COMPQUIET(rmroot, 0);
305
306 cp = (HEAP_CURSOR *)dbc->internal;
307 mpf = dbc->dbp->mpf;
308 ret = 0;
309
310 /* Release the page/lock held by the cursor. */
311 DISCARD(dbc, cp->page, cp->lock, 1, ret);
312 if (ret == 0 && !LOCK_ISSET(cp->lock))
313 cp->lock_mode = DB_LOCK_NG;
314
315 return (ret);
316 }
317
318 static int
__heapc_del(dbc,flags)319 __heapc_del(dbc, flags)
320 DBC *dbc;
321 u_int32_t flags;
322 {
323 DB *dbp;
324 DB_HEAP_RID next_rid, orig_rid;
325 DB_MPOOLFILE *mpf;
326 DBT hdr_dbt, log_dbt;
327 HEAP *h;
328 HEAPHDR *hdr;
329 HEAPPG *rpage;
330 HEAP_CURSOR *cp;
331 db_pgno_t region_pgno;
332 int oldspacebits, ret, spacebits, t_ret;
333 u_int16_t data_size, size;
334
335 dbp = dbc->dbp;
336 mpf = dbp->mpf;
337 h = dbp->heap_internal;
338 cp = (HEAP_CURSOR *)dbc->internal;
339 rpage = NULL;
340 COMPQUIET(flags, 0);
341
342 /*
343 * We need to be able to reset the cursor after deleting a record split
344 * across multiple pages.
345 */
346 orig_rid.pgno = cp->pgno;
347 orig_rid.indx = cp->indx;
348
349 /*
350 * This code is always called with a page lock but no page.
351 */
352 DB_ASSERT(dbp->env, cp->page == NULL);
353
354 /* We have a read lock, but need a write lock. */
355 start: if (STD_LOCKING(dbc) && (ret = __db_lget(dbc,
356 LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
357 return (ret);
358
359 if ((ret = __memp_fget(mpf, &cp->pgno,
360 dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &cp->page)) != 0)
361 return (ret);
362
363 HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), oldspacebits);
364
365 hdr = (HEAPHDR *)P_ENTRY(dbp, cp->page, cp->indx);
366 data_size = DB_ALIGN(hdr->size, sizeof(u_int32_t));
367 size = data_size + HEAP_HDRSIZE(hdr);
368 if (size < sizeof(HEAPSPLITHDR))
369 size = sizeof(HEAPSPLITHDR);
370 if (F_ISSET(hdr, HEAP_RECSPLIT) && !F_ISSET(hdr, HEAP_RECLAST)) {
371 next_rid.pgno = F_ISSET(hdr, HEAP_RECLAST) ?
372 PGNO_INVALID : ((HEAPSPLITHDR *)hdr)->nextpg;
373 next_rid.indx = F_ISSET(hdr, HEAP_RECLAST) ?
374 PGNO_INVALID : ((HEAPSPLITHDR *)hdr)->nextindx;
375 } else {
376 next_rid.pgno = PGNO_INVALID;
377 next_rid.indx = 0;
378 }
379
380 /* Log the deletion. */
381 if (DBC_LOGGING(dbc)) {
382 hdr_dbt.data = hdr;
383 hdr_dbt.size = HEAP_HDRSIZE(hdr);
384 log_dbt.data = (u_int8_t *)hdr + hdr_dbt.size;
385 log_dbt.size = data_size;
386 if ((ret = __heap_addrem_log(dbp, dbc->txn, &LSN(cp->page),
387 0, DB_REM_HEAP, cp->pgno, (u_int32_t)cp->indx,
388 size, &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
389 goto err;
390 } else
391 LSN_NOT_LOGGED(LSN(cp->page));
392
393 if ((ret = __heap_ditem(dbc, cp->page, cp->indx, size)) != 0)
394 goto err;
395
396 /*
397 * If the deleted item lived in a region prior to our current, back up
398 * the current region, giving us a chance to reuse the newly available
399 * space on the next insert.
400 */
401 region_pgno = HEAP_REGION_PGNO(dbp, cp->pgno);
402 if (region_pgno < h->curregion)
403 h->curregion = region_pgno;
404
405 HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), spacebits);
406
407 if (spacebits != oldspacebits) {
408 /*
409 * Get the region page. We never lock the region page, the data
410 * page lock locks the corresponding bits in the bitmap and
411 * latching serializes access.
412 */
413 if ((ret = __memp_fget(mpf, ®ion_pgno,
414 dbc->thread_info, NULL, DB_MPOOL_DIRTY, &rpage)) != 0)
415 goto err;
416 HEAP_SETSPACE(dbp, rpage,
417 cp->pgno - region_pgno - 1, spacebits);
418 }
419
420 err: DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
421 if (rpage != NULL && (t_ret = __memp_fput(mpf,
422 dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
423 ret = t_ret;
424 rpage = NULL;
425
426 if ((t_ret = __memp_fput(mpf,
427 dbc->thread_info, cp->page, dbc->priority)) != 0 && ret == 0)
428 ret = t_ret;
429
430 cp->page = NULL;
431
432 if (ret == 0 && next_rid.pgno != PGNO_INVALID) {
433 cp->pgno = next_rid.pgno;
434 cp->indx = next_rid.indx;
435 goto start;
436 }
437
438 cp->pgno = orig_rid.pgno;
439 cp->indx = orig_rid.indx;
440
441 return (ret);
442 }
443
444 /*
445 * __heap_ditem --
446 * Remove an item from a page.
447 *
448 * PUBLIC: int __heap_ditem
449 * PUBLIC: __P((DBC *, PAGE *, u_int32_t, u_int32_t));
450 */
451 int
__heap_ditem(dbc,pagep,indx,nbytes)452 __heap_ditem(dbc, pagep, indx, nbytes)
453 DBC *dbc;
454 PAGE *pagep;
455 u_int32_t indx, nbytes;
456 {
457 DB *dbp;
458 db_indx_t first, i, max, off, *offtbl, span;
459 u_int8_t *src, *dest;
460
461 dbp = dbc->dbp;
462
463 DB_ASSERT(dbp->env, TYPE(pagep) == P_HEAP);
464 DB_ASSERT(dbp->env, nbytes == DB_ALIGN(nbytes, sizeof(u_int32_t)));
465 DB_ASSERT(dbp->env, nbytes >= sizeof(HEAPSPLITHDR));
466
467 offtbl = (db_indx_t *)HEAP_OFFSETTBL(dbp, pagep);
468 off = offtbl[indx];
469 /*
470 * Find the lowest offset on the page, and adjust offsets that are about
471 * to be moved. If the deleted item is the lowest offset on the page,
472
473 * everything will work, that is not a special case.
474 */
475 max = HEAP_HIGHINDX(pagep);
476 first = HOFFSET(pagep);
477 for (i = 0; i <= max; i++) {
478 if (offtbl[i] < off && offtbl[i] != 0)
479 offtbl[i] += nbytes;
480 }
481 offtbl[indx] = 0;
482
483 /*
484 * Coalesce free space at the beginning of the page. Shift all the data
485 * preceding the deleted entry down, overwriting the deleted entry.
486 */
487 src = (u_int8_t *)(pagep) + first;
488 dest = src + nbytes;
489 span = off - first;
490 memmove(dest, src, span);
491 #ifdef DIAGNOSTIC
492 memset(src, CLEAR_BYTE, nbytes);
493 #endif
494
495 /* Update the page's metadata. */
496 NUM_ENT(pagep)--;
497 HOFFSET(pagep) += nbytes;
498 if (indx < HEAP_FREEINDX(pagep))
499 HEAP_FREEINDX(pagep) = indx;
500 while (HEAP_HIGHINDX(pagep) > 0 && offtbl[HEAP_HIGHINDX(pagep)] == 0)
501 HEAP_HIGHINDX(pagep)--;
502 if (NUM_ENT(pagep) == 0)
503 HEAP_FREEINDX(pagep) = 0;
504 else if (HEAP_FREEINDX(pagep) > HEAP_HIGHINDX(pagep) + 1)
505 HEAP_FREEINDX(pagep) = HEAP_HIGHINDX(pagep) + 1;
506
507 return (0);
508 }
509
510 static int
__heapc_destroy(dbc)511 __heapc_destroy(dbc)
512 DBC *dbc;
513 {
514 HEAP_CURSOR *cp;
515
516 cp = (HEAP_CURSOR *)dbc->internal;
517 __os_free(dbc->env, cp);
518 dbc->internal = NULL;
519
520 return (0);
521 }
522
523 /*
524 * __heapc_get --
525 * Get using a cursor (heap).
526 */
527 static int
__heapc_get(dbc,key,data,flags,pgnop)528 __heapc_get(dbc, key, data, flags, pgnop)
529 DBC *dbc;
530 DBT *key;
531 DBT *data;
532 u_int32_t flags;
533 db_pgno_t *pgnop;
534 {
535 DB *dbp;
536 DB_HEAP_RID rid;
537 DB_MPOOLFILE *mpf;
538 DB_LOCK meta_lock;
539 DBT tmp_val;
540 HEAP *h;
541 HEAPHDR *hdr;
542 HEAPMETA *meta;
543 HEAPPG *dpage;
544 HEAP_CURSOR *cp;
545 db_lockmode_t lock_type;
546 db_pgno_t pgno;
547 int cmp, f_indx, found, getpage, indx, ret;
548
549 dbp = dbc->dbp;
550 mpf = dbp->mpf;
551 h = dbp->heap_internal;
552 cp = (HEAP_CURSOR *)dbc->internal;
553 LOCK_INIT(meta_lock);
554 COMPQUIET(pgnop, NULL);
555
556 if (F_ISSET(key, DB_DBT_USERMEM) && key->ulen < DB_HEAP_RID_SZ) {
557 key->size = DB_HEAP_RID_SZ;
558 return (DB_BUFFER_SMALL);
559 }
560
561 /* Check for additional bits for locking */
562 if (F_ISSET(dbc, DBC_RMW))
563 lock_type = DB_LOCK_WRITE;
564 else
565 lock_type = DB_LOCK_READ;
566
567 ret = 0;
568 found = getpage = FALSE;
569 meta = NULL;
570 dpage = NULL;
571 switch (flags) {
572 case DB_CURRENT:
573
574 /*
575 * Acquire the current page with read lock unless user
576 * has asked for a write lock. Ensure page and record
577 * exist still.
578 */
579 ACQUIRE_CUR(dbc, lock_type, cp->pgno, 0, 0, ret);
580 if (ret != 0) {
581 if (ret == DB_PAGE_NOTFOUND)
582 ret = DB_NOTFOUND;
583 goto err;
584 }
585
586 if (HEAP_OFFSETTBL(dbp, cp->page)[cp->indx] == 0) {
587 ret = DB_NOTFOUND;
588 goto err;
589 }
590 dpage = (HEAPPG *)cp->page;
591 hdr = (HEAPHDR *)P_ENTRY(dbp, dpage, cp->indx);
592 if (F_ISSET(hdr, HEAP_RECSPLIT) &&
593 !F_ISSET(hdr, HEAP_RECFIRST)) {
594 ret = DB_NOTFOUND;
595 goto err;
596 }
597
598 break;
599 case DB_FIRST:
600 /*
601 * The region pages do not distinguish between an empty
602 * page and page with a something on it. So, we will
603 * grab the first possible data page and look for the
604 * lowest index with data. If page is empty we go on to
605 * the next page and look. If no page, then no records.
606 */
607 first: pgno = FIRST_HEAP_DPAGE;
608 while (!found) {
609 /* Put old lock/page and get the new lock/page */
610 ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
611 if (ret != 0 ) {
612 if (ret == DB_PAGE_NOTFOUND)
613 ret = DB_NOTFOUND;
614 goto err;
615 }
616 dpage = (HEAPPG *)cp->page;
617 /*
618 * The page needs to be a data page with entries on
619 * it. If page is good, loop through the offset table
620 * finding first non-split record or first piece of a
621 * split record, then set up cursor.
622 */
623 if (TYPE(dpage) == P_HEAP && NUM_ENT(dpage) != 0) {
624 for (indx = 0;
625 indx <= HEAP_HIGHINDX(dpage); indx++) {
626 if (HEAP_OFFSETTBL(
627 dbp, dpage)[indx] == 0)
628 continue;
629 hdr = (HEAPHDR *)P_ENTRY(
630 dbp, dpage, indx);
631 if (!F_ISSET(hdr, HEAP_RECSPLIT) ||
632 F_ISSET(hdr, HEAP_RECFIRST)) {
633 found = TRUE;
634 cp->pgno = pgno;
635 cp->indx = indx;
636 break;
637 }
638 }
639 if (!found)
640 pgno++;
641 } else
642 pgno++;
643 }
644 break;
645 case DB_LAST:
646 /*
647 * Grab the metadata page to find the last page, and start
648 * there looking backwards for the record with the highest
649 * index and return that one.
650 */
651 last: pgno = PGNO_BASE_MD;
652 ACQUIRE(dbc, DB_LOCK_READ,
653 pgno, meta_lock, pgno, meta, 0, 0, ret);
654 if (ret != 0)
655 goto err;
656
657 pgno = meta->dbmeta.last_pgno;
658
659 /*
660 * It is possible to have another page added while we are
661 * searching backwards for last record. No need to block
662 * this case from occurring by keeping meta page lock.
663 */
664 DISCARD(dbc, meta, meta_lock, 1, ret);
665 if (ret != 0)
666 goto err;
667
668 while (!found) {
669 /* Don't look earlier than the first data page. */
670 if (pgno < FIRST_HEAP_DPAGE) {
671 ret = DB_NOTFOUND;
672 goto err;
673 }
674
675 /* Put old lock/page and get the new lock/page. */
676 ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
677 if (ret != 0)
678 goto err;
679 dpage = (HEAPPG *)cp->page;
680 /*
681 * The page needs to be a data page with entries on
682 * it. If page is good, search backwards until the a
683 * non-split record or the first piece of a split record
684 * is found.
685 */
686 if (TYPE(dpage) == P_HEAP && NUM_ENT(dpage) != 0) {
687 for (indx = HEAP_HIGHINDX(dpage);
688 indx >= 0; indx--) {
689 if (HEAP_OFFSETTBL(
690 dbp, dpage)[indx] == 0)
691 continue;
692 hdr = (HEAPHDR *)P_ENTRY(
693 dbp, dpage, indx);
694 if (!F_ISSET(hdr, HEAP_RECSPLIT) ||
695 F_ISSET(hdr, HEAP_RECFIRST)) {
696 found = TRUE;
697 cp->pgno = pgno;
698 cp->indx = indx;
699 break;
700 }
701 }
702 if (!found)
703 pgno--;
704 } else
705 pgno--;
706 }
707 break;
708 case DB_NEXT_NODUP:
709 case DB_NEXT:
710 /* If cursor not initialize, behave as DB_FIRST */
711 if (dbc->internal->pgno == PGNO_INVALID)
712 goto first;
713
714 /*
715 * Acquire the current page with the lock we have already,
716 * unless user has asked for a write lock.
717 */
718 ACQUIRE_CUR(dbc, lock_type, cp->pgno, 0, 0, ret);
719 if (ret != 0)
720 goto err;
721 dpage = (HEAPPG *)cp->page;
722
723 /* At end of current page, must get next page */
724 if (cp->indx >= HEAP_HIGHINDX(dpage))
725 getpage = TRUE;
726
727 while (!found) {
728 if (getpage) {
729 pgno = cp->pgno + 1;
730
731 /* Put current page/lock and get next one */
732 ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
733 if (ret != 0) {
734 /* Beyond last page? */
735 if (ret == DB_PAGE_NOTFOUND)
736 ret = DB_NOTFOUND;
737 goto err;
738 }
739 dpage = (HEAPPG *)cp->page;
740
741 /*
742 * If page is a spam page or its a data
743 * page without entries, try again.
744 */
745 if (TYPE(dpage) != P_HEAP ||
746 (TYPE(dpage) == P_HEAP &&
747 NUM_ENT(dpage) == 0))
748 continue;
749
750 /* When searching, indx gets bumped to 0 */
751 cp->indx = -1;
752 getpage = FALSE;
753 }
754
755 /*
756 * Bump index and loop through the offset table finding
757 * first nonzero entry. If the offset is for a split
758 * record, make sure it's the first piece of the split
759 * record. HEAP_HIGHINDX always points to highest filled
760 * entry on page.
761 */
762 cp->indx++;
763 for (indx=cp->indx;
764 indx <= HEAP_HIGHINDX(dpage); indx++) {
765 if (HEAP_OFFSETTBL(dbp, dpage)[indx] == 0)
766 continue;
767 hdr = (HEAPHDR *)P_ENTRY(dbp, dpage, indx);
768 if (!F_ISSET(hdr, HEAP_RECSPLIT) ||
769 F_ISSET(hdr, HEAP_RECFIRST)) {
770 found = TRUE;
771 cp->indx = indx;
772 break;
773 }
774 }
775
776 /* Nothing of interest on page, so try next */
777 if (!found)
778 getpage = TRUE;
779 }
780 break;
781 case DB_PREV_NODUP:
782 case DB_PREV:
783 /* If cursor not initialize, behave as DB_LAST */
784 if (dbc->internal->pgno == PGNO_INVALID)
785 goto last;
786
787 /*
788 * Acquire the current page with the lock we have already,
789 * unless user has asked for a write lock.
790 */
791 ACQUIRE_CUR(dbc, lock_type, cp->pgno, 0, 0, ret);
792 if (ret != 0)
793 goto err;
794 dpage = (HEAPPG *)cp->page;
795
796 /*
797 * Loop through indexes and find first used slot. Check if
798 * already at the first slot.
799 */
800 for (f_indx=0; (f_indx <= HEAP_HIGHINDX(dpage)) &&
801 (HEAP_OFFSETTBL(dbp, dpage)[f_indx] == 0); f_indx++) ;
802
803 /* At the beginning of current page, must get new page */
804 if (cp->indx == 0 || cp->indx <= f_indx) {
805 if (cp->pgno == FIRST_HEAP_DPAGE) {
806 ret = DB_NOTFOUND;
807 goto err;
808 }
809 getpage = TRUE;
810 }
811
812 while (!found) {
813 if (getpage) {
814 pgno = cp->pgno - 1;
815 /* Do not go past first page */
816 if (pgno < FIRST_HEAP_DPAGE) {
817 ret = DB_NOTFOUND;
818 goto err;
819 }
820 /* Put current page/lock and get prev page. */
821 ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
822 if (ret != 0)
823 goto err;
824
825 dpage = (HEAPPG *)cp->page;
826
827 /*
828 * If page is a spam page or its a data
829 * page without entries, try again.
830 */
831 if (TYPE(dpage) != P_HEAP ||
832 (TYPE(dpage) == P_HEAP &&
833 NUM_ENT(dpage) == 0))
834 continue;
835
836 /* When search, this gets bumped to high indx */
837 cp->indx = HEAP_HIGHINDX(dpage) + 1;
838 getpage = FALSE;
839 }
840
841 /*
842 * Decrement index and loop through the offset table
843 * finding previous nonzero entry.
844 */
845 cp->indx--;
846 for (indx=cp->indx;
847 indx >= 0; indx--) {
848 if (HEAP_OFFSETTBL(dbp, dpage)[indx] == 0)
849 continue;
850 hdr = (HEAPHDR *)P_ENTRY(dbp, dpage, indx);
851 if (!F_ISSET(hdr, HEAP_RECSPLIT) ||
852 F_ISSET(hdr, HEAP_RECFIRST)) {
853 found = TRUE;
854 cp->indx = indx;
855 break;
856 }
857 }
858
859 /* Nothing of interest on page, so try previous */
860 if (!found)
861 getpage = TRUE;
862 }
863 break;
864 case DB_GET_BOTH_RANGE:
865 case DB_GET_BOTH:
866 case DB_SET_RANGE:
867 case DB_SET:
868 pgno = ((DB_HEAP_RID *)key->data)->pgno;
869 indx = ((DB_HEAP_RID *)key->data)->indx;
870
871 /* First make sure we're trying to get a data page. */
872 if (pgno == PGNO_BASE_MD ||
873 pgno == HEAP_REGION_PGNO(dbp, pgno)) {
874 ret = DB_NOTFOUND;
875 goto err;
876 }
877
878 /* Lock the data page and get it. */
879 ACQUIRE_CUR(dbc, lock_type, pgno, 0, 0, ret);
880
881 if (ret != 0) {
882 if (ret == DB_PAGE_NOTFOUND)
883 ret = DB_NOTFOUND;
884 goto err;
885 }
886 dpage = (HEAPPG *)cp->page;
887
888 /* validate requested index, throw error if not in range */
889 if ((indx > HEAP_HIGHINDX(dpage)) ||
890 (HEAP_OFFSETTBL(dbp, dpage)[indx] == 0)) {
891 DISCARD(dbc, cp->page, cp->lock, 0, ret);
892 ret = DB_NOTFOUND;
893 goto err;
894 }
895 hdr = (HEAPHDR *)P_ENTRY(dbp, dpage, indx);
896 if (F_ISSET(hdr, HEAP_RECSPLIT) &&
897 !F_ISSET(hdr, HEAP_RECFIRST)) {
898 DISCARD(dbc, cp->page, cp->lock, 0, ret);
899 ret = DB_NOTFOUND;
900 goto err;
901 }
902
903 cp->pgno = pgno;
904 cp->indx = indx;
905
906 if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
907 memset(&tmp_val, 0, sizeof(DBT));
908 /* does the data match ? */
909 if (F_ISSET(hdr, HEAP_RECSPLIT)) {
910 tmp_val.flags = DB_DBT_MALLOC;
911 if ((ret = __heapc_gsplit(
912 dbc, &tmp_val, NULL, 0)) != 0)
913 goto err;
914 } else {
915 tmp_val.data =
916 (void *)((u_int8_t *)hdr + sizeof(HEAPHDR));
917 tmp_val.size = hdr->size;
918 }
919 cmp = __bam_defcmp(dbp, &tmp_val, data);
920 if (F_ISSET(&tmp_val, DB_DBT_MALLOC))
921 __os_ufree(dbp->env, tmp_val.data);
922 if (cmp != 0) {
923 ret = DB_NOTFOUND;
924 goto err;
925 }
926 }
927
928 break;
929 case DB_NEXT_DUP:
930 case DB_PREV_DUP:
931 ret = DB_NOTFOUND;
932 goto err;
933 default:
934 /* DB_GET_RECNO, DB_JOIN_ITEM, DB_SET_RECNO are invalid */
935 ret = __db_unknown_flag(dbp->env, "__heap_get", flags);
936 goto err;
937
938 }
939
940 err: if (ret == 0 ) {
941 if (key != NULL) {
942 rid.pgno = cp->pgno;
943 rid.indx = cp->indx;
944 ret = __db_retcopy(dbp->env, key, &rid,
945 DB_HEAP_RID_SZ, &dbc->rkey->data, &dbc->rkey->ulen);
946 F_SET(key, DB_DBT_ISSET);
947 }
948
949 } else {
950 if (meta != NULL)
951 (void)__memp_fput(mpf,
952 dbc->thread_info, meta, dbc->priority);
953 if (LOCK_ISSET(meta_lock))
954 (void)__LPUT(dbc, meta_lock);
955 if (LOCK_ISSET(cp->lock))
956 (void)__LPUT(dbc, cp->lock);
957 }
958 DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
959 return (ret);
960 }
961
#undef IS_FIRST
/* True while last_rid is unset, i.e. no previous piece has been written. */
#define IS_FIRST (last_rid.pgno == PGNO_INVALID)
964 /*
965 * __heapc_reloc_partial --
966 * Move data from a too-full page to a new page. The old data page must
967 * be write locked before calling this method.
968 */
969 static int
__heapc_reloc_partial(dbc,key,data)970 __heapc_reloc_partial(dbc, key, data)
971 DBC *dbc;
972 DBT *key;
973 DBT *data;
974 {
975 DB *dbp;
976 DBT hdr_dbt, log_dbt, t_data, t_key;
977 DB_HEAP_RID last_rid, next_rid;
978 HEAPHDR *old_hdr;
979 HEAPSPLITHDR new_hdr;
980 HEAP_CURSOR *cp;
981 int add_bytes, ret;
982 u_int32_t buflen, data_size, dlen, doff, left, old_size;
983 u_int32_t remaining, size;
984 u_int8_t *buf, *olddata;
985
986 dbp = dbc->dbp;
987 cp = (HEAP_CURSOR *)dbc->internal;
988 old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
989 memset(&hdr_dbt, 0, sizeof(DBT));
990 memset(&log_dbt, 0, sizeof(DBT));
991 buf = NULL;
992 COMPQUIET(key, NULL);
993
994 /* We only work on partial puts. */
995 DB_ASSERT(dbp->env, F_ISSET(data, DB_DBT_PARTIAL));
996
997 /*
998 * Start by calculating the data_size, total size of the new record, and
999 * dlen, the number of bytes we will actually overwrite. Keep a local
1000 * copy of doff, we'll adjust it as we see pieces of the record so that
1001 * it's always relative to the current piece of data.
1002 */
1003 if (F_ISSET(old_hdr, HEAP_RECSPLIT))
1004 old_size = ((HEAPSPLITHDR *)old_hdr)->tsize;
1005 else
1006 old_size = old_hdr->size;
1007 doff = data->doff;
1008 if (old_size < doff) {
1009 /* Post-pending */
1010 dlen = data->dlen;
1011 data_size = doff + data->size;
1012 } else {
1013 if (old_size - doff < data->dlen)
1014 dlen = old_size - doff;
1015 else
1016 dlen = data->dlen;
1017 data_size = old_size - dlen + data->size;
1018 }
1019
1020 /*
1021 * We don't need a buffer large enough to hold the data_size
1022 * bytes, just one large enough to hold the bytes that will be
1023 * written to an individual page. We'll realloc to the necessary size
1024 * as needed.
1025 */
1026 buflen = 0;
1027 buf = NULL;
1028
1029 /*
1030 * We are updating an existing record, which will grow into a split
1031 * record. The strategy is to overwrite the existing record (or each
1032 * piece of the record if the record is already split.) If the new
1033 * record is shorter than the old, delete any extra pieces. If the new
1034 * record is longer than the old, use heapc_split() to write the extra
1035 * data.
1036 *
1037 * We start each loop with old_hdr pointed at the header for the old
1038 * record and the necessary page write locked in cp->page.
1039 */
1040 last_rid.pgno = PGNO_INVALID;
1041 last_rid.indx = 0;
1042 add_bytes = 1;
1043 left = data_size;
1044 memset(&t_data, 0, sizeof(DBT));
1045 remaining = 0;
1046 for (;;) {
1047 /* Figure out if we have a next piece. */
1048 if (F_ISSET(old_hdr, HEAP_RECSPLIT)) {
1049 next_rid.pgno = ((HEAPSPLITHDR *)old_hdr)->nextpg;
1050 next_rid.indx = ((HEAPSPLITHDR *)old_hdr)->nextindx;
1051 } else {
1052 next_rid.pgno = PGNO_INVALID;
1053 next_rid.indx = 0;
1054 }
1055
1056 /*
1057 * Before we delete the old data, use it to construct the new
1058 * data. First figure out the size of the new piece, including
1059 * any remaining data from the last piece.
1060 */
1061 if (doff >= old_hdr->size)
1062 if (F_ISSET(old_hdr, HEAP_RECLAST) ||
1063 !F_ISSET(old_hdr, HEAP_RECSPLIT)) {
1064 /* Post-pending. */
1065 data_size = doff + data->size;
1066 } else {
1067 /* The new piece is just the old piece. */
1068 data_size = old_hdr->size;
1069 }
1070 else if (doff + dlen > old_hdr->size)
1071 /*
1072 * Some of the to-be-overwritten bytes are on the next
1073 * piece, but we'll append all the new bytes to this
1074 * piece if we haven't already written them.
1075 */
1076 data_size = doff + (add_bytes ? data->size : 0);
1077 else
1078 data_size = old_hdr->size -
1079 dlen + (add_bytes ? data->size : 0);
1080 data_size += remaining;
1081
1082 if (data_size > buflen) {
1083 if (__os_realloc(dbp->env, data_size, &buf) != 0)
1084 return (ENOMEM);
1085 buflen = data_size;
1086 }
1087 t_data.data = buf;
1088
1089 /*
1090 * Adjust past any remaining bytes, they've already been moved
1091 * to the beginning of the buffer.
1092 */
1093 buf += remaining;
1094 remaining = 0;
1095
1096 olddata = (u_int8_t *)old_hdr + HEAP_HDRSIZE(old_hdr);
1097 if (doff >= old_hdr->size) {
1098 memcpy(buf, olddata, old_hdr->size);
1099 doff -= old_hdr->size;
1100 if (F_ISSET(old_hdr, HEAP_RECLAST) ||
1101 !F_ISSET(old_hdr, HEAP_RECSPLIT)) {
1102 /* Post-pending. */
1103 buf += old_hdr->size;
1104 memset(buf, '\0', doff);
1105 buf += doff;
1106 memcpy(buf, data->data, data->size);
1107 }
1108 } else {
1109 /* Preserve the first doff bytes. */
1110 memcpy(buf, olddata, doff);
1111 buf += doff;
1112 olddata += doff;
1113 /* Copy in the new bytes, if needed. */
1114 if (add_bytes) {
1115 memcpy(buf, data->data, data->size);
1116 buf += data->size;
1117 add_bytes = 0;
1118 }
1119 /* Skip dlen bytes. */
1120 if (doff + dlen < old_hdr->size) {
1121 olddata += dlen;
1122 memcpy(buf,
1123 olddata, old_hdr->size - doff - dlen);
1124 dlen = 0;
1125 } else
1126 /*
1127 * The data to be removed spills over onto the
1128 * following page(s). Adjust dlen to account
1129 * for the bytes removed from this page.
1130 */
1131 dlen = doff + dlen - old_hdr->size;
1132 doff = 0;
1133 }
1134 buf = t_data.data;
1135
1136 /* Delete the old data, after logging it. */
1137 old_size = DB_ALIGN(
1138 old_hdr->size + HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
1139 if (old_size < sizeof(HEAPSPLITHDR))
1140 old_size = sizeof(HEAPSPLITHDR);
1141 if (DBC_LOGGING(dbc)) {
1142 hdr_dbt.data = old_hdr;
1143 hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
1144 log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
1145 log_dbt.size = DB_ALIGN(
1146 old_hdr->size, sizeof(u_int32_t));
1147 if ((ret = __heap_addrem_log(dbp, dbc->txn,
1148 &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
1149 (u_int32_t)cp->indx, old_size,
1150 &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
1151 goto err;
1152 } else
1153 LSN_NOT_LOGGED(LSN(cp->page));
1154 if ((ret = __heap_ditem(
1155 dbc, cp->page, cp->indx, old_size)) != 0)
1156 goto err;
1157
1158 if (left == 0)
1159 /*
1160 * We've finished writing the new record, we're just
1161 * cleaning up the old record now.
1162 */
1163 goto next_pg;
1164
1165 if (data_size == 0 && !IS_FIRST) {
1166 /*
1167 * This piece is being completely removed. We need to
1168 * adjust the header of the previous piece now.
1169 */
1170 ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
1171 last_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
1172 if (ret != 0)
1173 goto err;
1174
1175 cp->indx = last_rid.indx;
1176 old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
1177
1178 if (DBC_LOGGING(dbc)) {
1179 old_size = DB_ALIGN(old_hdr->size +
1180 HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
1181 hdr_dbt.data = old_hdr;
1182 hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
1183 log_dbt.data =
1184 (u_int8_t *)old_hdr + hdr_dbt.size;
1185 log_dbt.size = DB_ALIGN(
1186 old_hdr->size, sizeof(u_int32_t));
1187 if ((ret = __heap_addrem_log(dbp, dbc->txn,
1188 &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
1189 (u_int32_t)cp->indx, old_size,
1190 &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
1191 goto err;
1192 } else
1193 LSN_NOT_LOGGED(LSN(cp->page));
1194
1195 ((HEAPSPLITHDR *)old_hdr)->nextpg = next_rid.pgno;
1196 ((HEAPSPLITHDR *)old_hdr)->nextindx = next_rid.indx;
1197
1198 if (DBC_LOGGING(dbc)) {
1199 if ((ret = __heap_addrem_log(dbp, dbc->txn,
1200 &LSN(cp->page), 0, DB_ADD_HEAP, cp->pgno,
1201 (u_int32_t)cp->indx, old_size,
1202 &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
1203 goto err;
1204 } else
1205 LSN_NOT_LOGGED(LSN(cp->page));
1206
1207 DISCARD(dbc, cp->page, cp->lock, 1, ret);
1208
1209 goto next_pg;
1210 }
1211
1212 /* Set up the header for the new record. */
1213 memset(&new_hdr, 0, sizeof(HEAPSPLITHDR));
1214 new_hdr.std_hdr.flags = HEAP_RECSPLIT;
1215 /*
1216 * If next_rid.pgno == PGNO_INVALID and there's still more data,
1217 * we'll come back and correct the header once we know where the
1218 * next piece lives.
1219 */
1220 new_hdr.nextpg = next_rid.pgno;
1221 new_hdr.nextindx = next_rid.indx;
1222 /*
1223 * Figure out how much we can fit on the page, rounding down to
1224 * a multiple of 4. If we will have to expand the offset table,
1225 * account for that. It needs to be enough to at least fit the
1226 * split header.
1227 */
1228 size = HEAP_FREESPACE(dbp, cp->page);
1229 if (NUM_ENT(cp->page) == 0 ||
1230 cp->indx > HEAP_HIGHINDX(cp->page))
1231 size -= sizeof(db_indx_t);
1232 /* Round down to a multiple of 4. */
1233 size = DB_ALIGN(
1234 size - sizeof(u_int32_t) + 1, sizeof(u_int32_t));
1235 DB_ASSERT(dbp->env, size >= sizeof(HEAPSPLITHDR));
1236
1237 /*
1238 * We try to fill the page, but cannot write more than
1239 * t_data.size bytes, that's all we have in-memory.
1240 */
1241 new_hdr.std_hdr.size = (u_int16_t)
1242 (size - sizeof(HEAPSPLITHDR));
1243 if (new_hdr.std_hdr.size > data_size)
1244 new_hdr.std_hdr.size = data_size;
1245 if (new_hdr.std_hdr.size >= left) {
1246 new_hdr.std_hdr.size = left;
1247 new_hdr.std_hdr.flags |= HEAP_RECLAST;
1248 new_hdr.nextpg = PGNO_INVALID;
1249 new_hdr.nextindx = 0;
1250 }
1251 if (IS_FIRST) {
1252 new_hdr.std_hdr.flags |= HEAP_RECFIRST;
1253 new_hdr.tsize = left;
1254 }
1255
1256 /* Now write the new data to the page. */
1257 t_data.size = new_hdr.std_hdr.size;
1258 hdr_dbt.data = &new_hdr;
1259 hdr_dbt.size = sizeof(HEAPSPLITHDR);
1260 /* Log the write. */
1261 if (DBC_LOGGING(dbc)) {
1262 if ((ret = __heap_addrem_log(dbp,
1263 dbc->txn, &LSN(cp->page), 0,
1264 DB_ADD_HEAP, cp->pgno, (u_int32_t)cp->indx,
1265 size, &hdr_dbt, &t_data, &LSN(cp->page))) != 0)
1266 goto err;
1267 } else
1268 LSN_NOT_LOGGED(LSN(cp->page));
1269 if ((ret = __heap_pitem(dbc,
1270 (PAGE *)cp->page, cp->indx, size, &hdr_dbt, &t_data)) != 0)
1271 goto err;
1272
1273 left -= new_hdr.std_hdr.size;
1274 /*
1275 * If any data couldn't fit on this page, it has to go onto the
1276 * next. Copy it to the front of the buffer and it will be
1277 * preserved in the next loop.
1278 */
1279 if (new_hdr.std_hdr.size < data_size) {
1280 remaining = data_size - new_hdr.std_hdr.size;
1281 memmove(buf, buf + new_hdr.std_hdr.size, remaining);
1282 }
1283
1284 /*
1285 * Remember this piece's RID, we may need to update the header
1286 * if the next data piece is removed, or if this is the final
1287 * piece and we add data to the end of the record.
1288 */
1289 next_pg: last_rid.pgno = cp->pgno;
1290 last_rid.indx = cp->indx;
1291 /* Get the next page, if any. */
1292 if (next_rid.pgno != PGNO_INVALID) {
1293 ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
1294 next_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
1295 if (ret != 0)
1296 goto err;
1297 cp->indx = next_rid.indx;
1298 old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
1299 DB_ASSERT(dbp->env,
1300 HEAP_HIGHINDX(cp->page) <= cp->indx);
1301 DB_ASSERT(dbp->env, F_ISSET(old_hdr, HEAP_RECSPLIT));
1302 } else {
1303 /* Discard the page and drop the lock, txn-ally. */
1304 DISCARD(dbc, cp->page, cp->lock, 1, ret);
1305 if (ret != 0)
1306 goto err;
1307 break;
1308 }
1309 }
1310
1311 /*
1312 * If there is more work to do, let heapc_split do it. After
1313 * heapc_split returns we need to update nextpg and nextindx in the
1314 * header of the last piece we wrote above.
1315 *
1316 * For logging purposes, we "delete" the old record and then "add" the
1317 * record. This makes redo/undo work as-is, but we won't actually
1318 * delete and re-add the record.
1319 */
1320 if (left > 0) {
1321 memset(&t_key, 0, sizeof(DBT));
1322 t_key.size = t_key.ulen = sizeof(DB_HEAP_RID);
1323 t_key.data = &next_rid;
1324 t_key.flags = DB_DBT_USERMEM;
1325 t_data.size = left;
1326 if ((ret = __heapc_split(dbc, &t_key, &t_data, 0)) != 0)
1327 goto err;
1328
1329 ACQUIRE_CUR(dbc,
1330 DB_LOCK_WRITE, last_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
1331 if (ret != 0)
1332 goto err;
1333
1334 cp->indx = last_rid.indx;
1335 old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
1336
1337 if (DBC_LOGGING(dbc)) {
1338 old_size = DB_ALIGN(old_hdr->size +
1339 HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
1340 hdr_dbt.data = old_hdr;
1341 hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
1342 log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
1343 log_dbt.size = DB_ALIGN(
1344 old_hdr->size, sizeof(u_int32_t));
1345 if ((ret = __heap_addrem_log(dbp, dbc->txn,
1346 &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
1347 (u_int32_t)cp->indx, old_size,
1348 &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
1349 goto err;
1350 } else
1351 LSN_NOT_LOGGED(LSN(cp->page));
1352
1353 ((HEAPSPLITHDR *)old_hdr)->nextpg = next_rid.pgno;
1354 ((HEAPSPLITHDR *)old_hdr)->nextindx = next_rid.indx;
1355
1356 if (DBC_LOGGING(dbc)) {
1357 if ((ret = __heap_addrem_log(dbp, dbc->txn,
1358 &LSN(cp->page), 0, DB_ADD_HEAP, cp->pgno,
1359 (u_int32_t)cp->indx, old_size,
1360 &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
1361 goto err;
1362 } else
1363 LSN_NOT_LOGGED(LSN(cp->page));
1364
1365 DISCARD(dbc, cp->page, cp->lock, 1, ret);
1366 }
1367
1368 err: DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
1369 if (buf != NULL)
1370 __os_free(dbp->env, buf);
1371 return (ret);
1372 }
1373
1374 /*
1375 * __heapc_reloc --
1376 * Move data from a too-full page to a new page. The old data page must
1377 * be write locked before calling this method.
1378 */
static int
__heapc_reloc(dbc, key, data)
	DBC *dbc;
	DBT *key;
	DBT *data;
{
	DB *dbp;
	DBT hdr_dbt, log_dbt, t_data, t_key;
	DB_HEAP_RID last_rid, next_rid;
	HEAPHDR *old_hdr;
	HEAPSPLITHDR new_hdr;
	HEAP_CURSOR *cp;
	int is_first, ret;
	u_int32_t left, old_size, size;

	dbp = dbc->dbp;
	cp = (HEAP_CURSOR *)dbc->internal;
	old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
	memset(&hdr_dbt, 0, sizeof(DBT));
	memset(&log_dbt, 0, sizeof(DBT));
	/* key is not used by this routine; quiet unused-parameter warnings. */
	COMPQUIET(key, NULL);

	/*
	 * We are updating an existing record, which will grow into a split
	 * record.  The strategy is to overwrite the existing record (or each
	 * piece of the record if the record is already split.)  If the new
	 * record is shorter than the old, delete any extra pieces.  If the new
	 * record is longer than the old, use heapc_split() to write the extra
	 * data.
	 *
	 * We start each loop with t_data.data positioned to the next byte to be
	 * written, old_hdr pointed at the header for the old record and the
	 * necessary page write locked in cp->page.
	 */
	is_first = 1;
	/* left counts the bytes of new data not yet written to a page. */
	left = data->size;
	memset(&t_data, 0, sizeof(DBT));
	t_data.data = data->data;
	for (;;) {
		/* Figure out if we have a next piece. */
		if (F_ISSET(old_hdr, HEAP_RECSPLIT)) {
			next_rid.pgno = ((HEAPSPLITHDR *)old_hdr)->nextpg;
			next_rid.indx = ((HEAPSPLITHDR *)old_hdr)->nextindx;
		} else {
			next_rid.pgno = PGNO_INVALID;
			next_rid.indx = 0;
		}

		/*
		 * Delete the old data, after logging it.  The removed size is
		 * the aligned header-plus-data footprint, never less than a
		 * split header (presumably the minimum on-page item size --
		 * matches the same floor applied in __heapc_put).
		 */
		old_size = DB_ALIGN(
		    old_hdr->size + HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
		if (old_size < sizeof(HEAPSPLITHDR))
			old_size = sizeof(HEAPSPLITHDR);
		if (DBC_LOGGING(dbc)) {
			hdr_dbt.data = old_hdr;
			hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
			log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
			/* Log the 4-byte-aligned data size stored on-page. */
			log_dbt.size = DB_ALIGN(
			    old_hdr->size, sizeof(u_int32_t));
			if ((ret = __heap_addrem_log(dbp, dbc->txn,
			    &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
			    (u_int32_t)cp->indx, old_size,
			    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));
		if ((ret = __heap_ditem(
		    dbc, cp->page, cp->indx, old_size)) != 0)
			goto err;

		if (left == 0)
			/*
			 * We've finished writing the new record, we're just
			 * cleaning up the old record now.
			 */
			goto next_pg;

		/* Set up the header for the new record. */
		memset(&new_hdr, 0, sizeof(HEAPSPLITHDR));
		new_hdr.std_hdr.flags = HEAP_RECSPLIT;
		/* We'll set this later if next_rid.pgno == PGNO_INVALID. */
		new_hdr.nextpg = next_rid.pgno;
		new_hdr.nextindx = next_rid.indx;
		/*
		 * Figure out how much we can fit on the page, rounding down to
		 * a multiple of 4.  If we will have to expand the offset table,
		 * account for that.  It needs to be enough to at least fit the
		 * split header.
		 */
		size = HEAP_FREESPACE(dbp, cp->page);
		if (NUM_ENT(cp->page) == 0 ||
		    cp->indx > HEAP_HIGHINDX(cp->page))
			size -= sizeof(db_indx_t);
		/*
		 * Round down to a multiple of 4.  DB_ALIGN rounds up, so
		 * subtracting sizeof(u_int32_t) - 1 first turns it into a
		 * round-down.
		 */
		size = DB_ALIGN(
		    size - sizeof(u_int32_t) + 1, sizeof(u_int32_t));
		DB_ASSERT(dbp->env, size >= sizeof(HEAPSPLITHDR));
		new_hdr.std_hdr.size =
		    (u_int16_t)(size - sizeof(HEAPSPLITHDR));
		if (new_hdr.std_hdr.size >= left) {
			/* Everything remaining fits: this is the last piece. */
			new_hdr.std_hdr.size = left;
			new_hdr.std_hdr.flags |= HEAP_RECLAST;
			new_hdr.nextpg = PGNO_INVALID;
			new_hdr.nextindx = 0;
		}
		if (is_first) {
			/* Only the first piece records the total size. */
			new_hdr.std_hdr.flags |= HEAP_RECFIRST;
			new_hdr.tsize = left;
			is_first = 0;
		}

		/* Now write the new data to the page. */
		t_data.size = new_hdr.std_hdr.size;
		hdr_dbt.data = &new_hdr;
		hdr_dbt.size = sizeof(HEAPSPLITHDR);
		/* Log the write. */
		if (DBC_LOGGING(dbc)) {
			if ((ret = __heap_addrem_log(dbp,
			    dbc->txn, &LSN(cp->page), 0,
			    DB_ADD_HEAP, cp->pgno, (u_int32_t)cp->indx,
			    size, &hdr_dbt, &t_data, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));
		if ((ret = __heap_pitem(dbc,
		    (PAGE *)cp->page, cp->indx, size, &hdr_dbt, &t_data)) != 0)
			goto err;

		/* Advance past the bytes just written. */
		left -= new_hdr.std_hdr.size;
		t_data.data = (u_int8_t *)(t_data.data) + new_hdr.std_hdr.size;

		/* Get the next page, if any. */
next_pg:	if (next_rid.pgno != PGNO_INVALID) {
			ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
			    next_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
			if (ret != 0)
				goto err;
			cp->indx = next_rid.indx;
			old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
		} else {
			/*
			 * Remember the final piece's RID, we may need to update
			 * the header after writing the rest of the record.
			 */
			last_rid.pgno = cp->pgno;
			last_rid.indx = cp->indx;
			/* Discard the page and drop the lock, txn-ally. */
			DISCARD(dbc, cp->page, cp->lock, 1, ret);
			if (ret != 0)
				goto err;
			break;
		}
	}

	/*
	 * If there is more work to do, let heapc_split do it.  After
	 * heapc_split returns we need to update nextpg and nextindx in the
	 * header of the last piece we wrote above.
	 *
	 * For logging purposes, we "delete" the old record and then "add" the
	 * record.  This makes redo/undo work as-is, but we won't actually
	 * delete and re-add the record.
	 */
	if (left > 0) {
		memset(&t_key, 0, sizeof(DBT));
		t_key.size = t_key.ulen = sizeof(DB_HEAP_RID);
		/* heapc_split returns the new piece's RID in next_rid. */
		t_key.data = &next_rid;
		t_key.flags = DB_DBT_USERMEM;
		t_data.size = left;
		if ((ret = __heapc_split(dbc, &t_key, &t_data, 0)) != 0)
			goto err;

		/* Re-acquire the last piece written above, for update. */
		ACQUIRE_CUR(dbc,
		    DB_LOCK_WRITE, last_rid.pgno, 0, DB_MPOOL_DIRTY, ret);
		if (ret != 0)
			goto err;

		cp->indx = last_rid.indx;
		old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));

		if (DBC_LOGGING(dbc)) {
			old_size = DB_ALIGN(old_hdr->size +
			    HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
			hdr_dbt.data = old_hdr;
			hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
			log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
			log_dbt.size = DB_ALIGN(
			    old_hdr->size, sizeof(u_int32_t));
			if ((ret = __heap_addrem_log(dbp, dbc->txn,
			    &LSN(cp->page), 0, DB_REM_HEAP, cp->pgno,
			    (u_int32_t)cp->indx, old_size,
			    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));

		/* Link the last piece to the piece heapc_split wrote. */
		((HEAPSPLITHDR *)old_hdr)->nextpg = next_rid.pgno;
		((HEAPSPLITHDR *)old_hdr)->nextindx = next_rid.indx;

		if (DBC_LOGGING(dbc)) {
			if ((ret = __heap_addrem_log(dbp, dbc->txn,
			    &LSN(cp->page), 0, DB_ADD_HEAP, cp->pgno,
			    (u_int32_t)cp->indx, old_size,
			    &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
				goto err;
		} else
			LSN_NOT_LOGGED(LSN(cp->page));

		DISCARD(dbc, cp->page, cp->lock, 1, ret);
	}

	/* A missing page here would indicate a corrupt split chain. */
err:	DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
	return (ret);
}
1593
1594 /*
1595 * __heapc_put --
1596 *
1597 * Put using a cursor. If the given key exists, update the associated data. If
1598 * the given key does not exsist, return an error.
1599 */
1600 static int
__heapc_put(dbc,key,data,flags,pgnop)1601 __heapc_put(dbc, key, data, flags, pgnop)
1602 DBC *dbc;
1603 DBT *key;
1604 DBT *data;
1605 u_int32_t flags;
1606 db_pgno_t *pgnop;
1607 {
1608 DB *dbp;
1609 DBT hdr_dbt, log_dbt, new_data;
1610 DB_MPOOLFILE *mpf;
1611 HEAPHDR hdr, *old_hdr;
1612 HEAP_CURSOR *cp;
1613 PAGE *rpage;
1614 db_pgno_t region_pgno;
1615 int oldspace, ret, space, t_ret;
1616 u_int32_t data_size, dlen, new_size, old_flags, old_size, tot_size;
1617 u_int8_t *buf, *olddata, *src, *dest;
1618
1619 dbp = dbc->dbp;
1620 mpf = dbp->mpf;
1621 cp = (HEAP_CURSOR *)dbc->internal;
1622 rpage = NULL;
1623 buf = dest = src = NULL;
1624 dlen = 0;
1625
1626 if (flags != DB_CURRENT) {
1627 /* We're going to write following the get, so use RMW. */
1628 old_flags = dbc->flags;
1629 F_SET(dbc, DBC_RMW);
1630 ret = __heapc_get(dbc, key, data, DB_SET, pgnop);
1631 F_CLR(key, DB_DBT_ISSET);
1632 dbc->flags = old_flags;
1633 DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
1634 if (ret != 0)
1635 return (ret);
1636 else if (flags == DB_NOOVERWRITE)
1637 return (DB_KEYEXIST);
1638 if ((ret = __memp_dirty(mpf, &cp->page,
1639 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
1640 return (ret);
1641 } else {
1642 /* We have a read lock, but need a write lock. */
1643 if (STD_LOCKING(dbc) && cp->lock_mode != DB_LOCK_WRITE &&
1644 (ret = __db_lget(dbc,
1645 LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
1646 return (ret);
1647
1648 if ((ret = __memp_fget(mpf, &cp->pgno, dbc->thread_info,
1649 dbc->txn, DB_MPOOL_DIRTY, &cp->page)) != 0)
1650 return (ret);
1651 }
1652
1653 /* We've got the page locked and stored in cp->page. */
1654 HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), oldspace);
1655
1656 /*
1657 * Figure out the spacing issue. There is a very rare corner case where
1658 * we don't have enough space on the page to expand the data. Splitting
1659 * the record results in a larger header, if the page is jam packed
1660 * there might not be room for the larger header.
1661 *
1662 * hdr->size is the size of the stored data, it doesn't include any
1663 * padding.
1664 */
1665 old_hdr = (HEAPHDR *)(P_ENTRY(dbp, cp->page, cp->indx));
1666 /* Need data.size + header size, 4-byte aligned. */
1667 old_size =
1668 DB_ALIGN(old_hdr->size + HEAP_HDRSIZE(old_hdr), sizeof(u_int32_t));
1669 if (old_size < sizeof(HEAPSPLITHDR))
1670 old_size = sizeof(HEAPSPLITHDR);
1671 if (F_ISSET(data, DB_DBT_PARTIAL)) {
1672 if (F_ISSET(old_hdr, HEAP_RECSPLIT))
1673 tot_size = ((HEAPSPLITHDR *)old_hdr)->tsize;
1674 else
1675 tot_size = old_hdr->size;
1676 if (tot_size < data->doff) {
1677 /* Post-pending */
1678 dlen = data->dlen;
1679 data_size = data->doff + data->size;
1680 } else {
1681 if (tot_size - data->doff < data->dlen)
1682 dlen = tot_size - data->doff;
1683 else
1684 dlen = data->dlen;
1685 data_size = tot_size - dlen + data->size;
1686 }
1687 } else
1688 data_size = data->size;
1689 new_size = DB_ALIGN(data_size + sizeof(HEAPHDR), sizeof(u_int32_t));
1690 if (new_size < sizeof(HEAPSPLITHDR))
1691 new_size = sizeof(HEAPSPLITHDR);
1692
1693 /* Check whether we actually have enough space on this page. */
1694 if (F_ISSET(old_hdr, HEAP_RECSPLIT) ||
1695 (new_size > old_size &&
1696 new_size - old_size > HEAP_FREESPACE(dbp, cp->page))) {
1697 /*
1698 * We've got to split the record, not enough room on the
1699 * page. Splitting the record will remove old_size bytes and
1700 * introduce at least sizeof(HEAPSPLITHDR).
1701 */
1702 if (F_ISSET(data, DB_DBT_PARTIAL))
1703 return (__heapc_reloc_partial(dbc, key, data));
1704 else
1705 return (__heapc_reloc(dbc, key, data));
1706 }
1707
1708 memset(&new_data, 0, sizeof(DBT));
1709 new_data.size = data_size;
1710 if (F_ISSET(data, DB_DBT_PARTIAL)) {
1711 /*
1712 * Before replacing the old data, we need to use it to build the
1713 * new data.
1714 */
1715 if ((ret = __os_malloc(dbp->env, data_size, &buf)) != 0)
1716 goto err;
1717 new_data.data = buf;
1718
1719 /*
1720 * Preserve data->doff bytes at the start, or all of the old
1721 * record plus padding, if post-pending.
1722 */
1723 olddata = (u_int8_t *)old_hdr + sizeof(HEAPHDR);
1724 if (data->doff > old_hdr->size) {
1725 memcpy(buf, olddata, old_hdr->size);
1726 buf += old_hdr->size;
1727 memset(buf, '\0', data->doff - old_hdr->size);
1728 buf += data->doff - old_hdr->size;
1729 } else {
1730 memcpy(buf, olddata, data->doff);
1731 buf += data->doff;
1732 }
1733
1734 /* Now copy in the user's data. */
1735 memcpy(buf, data->data, data->size);
1736 buf += data->size;
1737
1738 /* Fill in remaining data from the old record, skipping dlen. */
1739 if (data->doff < old_hdr->size) {
1740 olddata += data->doff + data->dlen;
1741 memcpy(buf,
1742 olddata, old_hdr->size - data->doff - data->dlen);
1743 }
1744 } else {
1745 new_data.data = data->data;
1746 }
1747
1748 /*
1749 * Do the update by deleting the old record and writing the new
1750 * record. Start by logging the entire operation.
1751 */
1752 memset(&hdr, 0, sizeof(HEAPHDR));
1753 hdr.size = data_size;
1754 if (DBC_LOGGING(dbc)) {
1755 hdr_dbt.data = old_hdr;
1756 hdr_dbt.size = HEAP_HDRSIZE(old_hdr);
1757 log_dbt.data = (u_int8_t *)old_hdr + hdr_dbt.size;
1758 log_dbt.size = DB_ALIGN(old_hdr->size, sizeof(u_int32_t));
1759 if ((ret = __heap_addrem_log(dbp, dbc->txn, &LSN(cp->page),
1760 0, DB_REM_HEAP, cp->pgno, (u_int32_t)cp->indx,
1761 old_size, &hdr_dbt, &log_dbt, &LSN(cp->page))) != 0)
1762 goto err;
1763 hdr_dbt.data = &hdr;
1764 hdr_dbt.size = HEAP_HDRSIZE(&hdr);
1765 if ((ret = __heap_addrem_log(dbp, dbc->txn, &LSN(cp->page),
1766 0, DB_ADD_HEAP, cp->pgno, (u_int32_t)cp->indx,
1767 new_size, &hdr_dbt, &new_data, &LSN(cp->page))) != 0)
1768 goto err;
1769 } else
1770 LSN_NOT_LOGGED(LSN(cp->page));
1771
1772 if ((ret = __heap_ditem(dbc, cp->page, cp->indx, old_size)) != 0)
1773 goto err;
1774 hdr_dbt.data = &hdr;
1775 hdr_dbt.size = HEAP_HDRSIZE(&hdr);
1776 if ((ret = __heap_pitem(dbc,
1777 (PAGE *)cp->page, cp->indx, new_size, &hdr_dbt, &new_data)) != 0)
1778 goto err;
1779
1780 /* Check whether we need to update the space bitmap. */
1781 HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), space);
1782
1783 if (space != oldspace) {
1784 /* Get the region page with an exclusive latch. */
1785 region_pgno = HEAP_REGION_PGNO(dbp, cp->pgno);
1786
1787 if ((ret = __memp_fget(mpf, ®ion_pgno,
1788 dbc->thread_info, NULL, DB_MPOOL_DIRTY, &rpage)) != 0)
1789 goto err;
1790
1791 HEAP_SETSPACE(dbp, rpage, cp->pgno - region_pgno - 1, space);
1792 }
1793
1794 err: DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
1795 if (rpage != NULL && (t_ret = __memp_fput(mpf,
1796 dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
1797 ret = t_ret;
1798 if (F_ISSET(data, DB_DBT_PARTIAL))
1799 __os_free(dbp->env, new_data.data);
1800
1801 if (ret != 0 && LOCK_ISSET(cp->lock))
1802 (void)__TLPUT(dbc, cp->lock);
1803
1804 return (ret);
1805 }
1806
1807 /*
1808 * __heap_getpage --
1809 * Return a page with sufficient free space. The page will be write locked
1810 * and marked dirty.
1811 */
1812 static int
__heap_getpage(dbc,size,avail)1813 __heap_getpage(dbc, size, avail)
1814 DBC *dbc;
1815 u_int32_t size;
1816 u_int8_t *avail;
1817 {
1818 DB *dbp;
1819 DBMETA *meta;
1820 DB_LOCK meta_lock;
1821 DB_LSN meta_lsn;
1822 DB_MPOOLFILE *mpf;
1823 HEAP *h;
1824 HEAPPG *rpage;
1825 HEAP_CURSOR *cp;
1826 db_pgno_t data_pgno, *lkd_pgs, meta_pgno, region_pgno, start_region;
1827 int i, lk_mode, max, p, ret, space, start, t_ret;
1828
1829 LOCK_INIT(meta_lock);
1830 dbp = dbc->dbp;
1831 mpf = dbp->mpf;
1832 cp = (HEAP_CURSOR *)dbc->internal;
1833 h = dbp->heap_internal;
1834 start_region = region_pgno = h->curregion;
1835 max = HEAP_REGION_SIZE(dbp);
1836 i = ret = t_ret = 0;
1837 lkd_pgs = NULL;
1838
1839 /*
1840 * The algorithm for finding a page:
1841 *
1842 * Look in the space bitmap of the current region page for a data page
1843 * with at least size bytes free. Once we find a page, try to lock it
1844 * and if we get the lock we're done.
1845 *
1846 * Don't wait for a locked region page, just move on to the next region
1847 * page, creating it if it doesn't exist. If the size of the heap
1848 * database is not constrained, just keep creating regions and extending
1849 * the database until we find a page with space. If the database size
1850 * is constrained, loop back to the first region page from the final
1851 * region page. If we wind up making it all the way back to where our
1852 * search began, we need to start waiting for locked region pages. If
1853 * we finish another loop through the database waiting for every region
1854 * page, we know there's no room.
1855 */
1856
1857 /*
1858 * Figure out the % of the page the data will occupy and translate that
1859 * to the relevant bit-map value we need to look for.
1860 */
1861 HEAP_CALCSPACEBITS(dbp, size, space);
1862
1863 /*
1864 * Get the current region page, with a shared latch. On the first loop
1865 * through a fixed size database, we move on to the next region if the
1866 * page is locked. On the second loop, we wait for locked region
1867 * pages. If the database isn't fixed size, we never wait, we'll
1868 * eventually get to use one of the region pages we create.
1869 */
1870 lk_mode = DB_MPOOL_TRY;
1871 find: while ((ret = __memp_fget(mpf, ®ion_pgno,
1872 dbc->thread_info, NULL, lk_mode, &rpage)) != 0 ||
1873 TYPE(rpage) != P_IHEAP) {
1874 if (ret == DB_LOCK_NOTGRANTED)
1875 goto next_region;
1876 if (ret != 0 && ret != DB_PAGE_NOTFOUND)
1877 return (ret);
1878 /*
1879 * The region page doesn't exist, or hasn't been initialized,
1880 * create it, then try again. If the page exists, we have to
1881 * drop it before initializing the region.
1882 */
1883 if (ret == 0 && (ret = __memp_fput(
1884 mpf, dbc->thread_info, rpage, dbc->priority)) != 0)
1885 return (ret);
1886
1887 if ((ret = __heap_create_region(dbc, region_pgno)) != 0)
1888 return (ret);
1889 }
1890
1891 start = h->curpgindx;
1892 /*
1893 * If this is the last region page in a fixed size db, figure out the
1894 * maximum pgno in the bitmap.
1895 */
1896 if (region_pgno + max > h->maxpgno)
1897 max = h->maxpgno - region_pgno;
1898 /*
1899 * Look in the bitmap for a page with sufficient free space. We use i
1900 * in a slightly strange way. Because the 2-bits in the bitmap are only
1901 * an estimate, there is a chance the data won't fit on the page we
1902 * choose. In that case, we re-start the process and want to be able to
1903 * resume this loop where we left off.
1904 */
1905 for (; i < max; i++) {
1906 p = start + i;
1907 if (p >= max)
1908 p -= max;
1909 if ((*avail = HEAP_SPACE(dbp, rpage, p)) > space)
1910 continue;
1911 data_pgno = region_pgno + p + 1;
1912 ACQUIRE_CUR(dbc,
1913 DB_LOCK_WRITE, data_pgno, DB_LOCK_NOWAIT, 0, ret);
1914 /*
1915 * If we have the lock and the page or have the lock and need to
1916 * create the page, we're good. If we don't have the lock, try
1917 * to find different page.
1918 */
1919 if (ret == 0 || ret == DB_PAGE_NOTFOUND)
1920 break;
1921 else if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) {
1922 ret = 0;
1923 continue;
1924 } else
1925 goto err;
1926 }
1927
1928 /*
1929 * Keep a worst case range of highest used page in the region.
1930 */
1931 if (i < max && data_pgno > rpage->high_pgno) {
1932 if ((ret = __memp_dirty(mpf,
1933 &rpage, dbc->thread_info, NULL, dbc->priority, 0)) != 0)
1934 goto err;
1935 /* We might have blocked, check again */
1936 if (data_pgno > rpage->high_pgno)
1937 rpage->high_pgno = data_pgno;
1938 }
1939
1940 /* Done with the region page, even if we didn't find a page. */
1941 if ((ret = __memp_fput(mpf,
1942 dbc->thread_info, rpage, dbc->priority)) != 0) {
1943 /* Did not read the data page, so we can release its lock. */
1944 DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
1945 goto err;
1946 }
1947 rpage = NULL;
1948
1949 if (i >= max) {
1950 /*
1951 * No free pages on this region page, advance to the next region
1952 * page. If we're at the end of a fixed size heap db, loop
1953 * around to the first region page. There is not currently a
1954 * data page locked.
1955 */
1956 next_region: region_pgno += HEAP_REGION_SIZE(dbp) + 1;
1957
1958 if (region_pgno > h->maxpgno)
1959 region_pgno = FIRST_HEAP_RPAGE;
1960
1961 if (region_pgno == start_region) {
1962 /*
1963 * We're in a fixed size db and we've looped through all
1964 * region pages.
1965 */
1966
1967 if (lk_mode == DB_MPOOL_TRY) {
1968 /*
1969 * We may have missed a region page with room,
1970 * because we didn't wait for locked pages. Try
1971 * another loop, waiting for all pages.
1972 */
1973 lk_mode = 0;
1974 } else {
1975 /*
1976 * We've seen every region page, because we
1977 * waited for all pages. No room.
1978 */
1979 ret = DB_HEAP_FULL;
1980 goto err;
1981 }
1982 }
1983
1984 h->curregion = region_pgno;
1985 h->curpgindx = 0;
1986 i = 0;
1987 goto find;
1988 }
1989
1990 /*
1991 * At this point we have the page locked. If we have the page, we need
1992 * to mark it dirty. If we don't have the page (or if the page is
1993 * empty) we need to create and initialize it.
1994 */
1995 if (cp->pgno == PGNO_INVALID || PGNO(cp->page) == PGNO_INVALID) {
1996 /*
1997 * The data page needs to be created and the metadata page needs
1998 * to be updated. Once we get the metadata page, we must not
1999 * jump to err, the metadata page and lock are put back here.
2000 *
2001 * It is possible that the page was created by an aborted txn,
2002 * in which case the page exists but is all zeros. We still
2003 * need to "create" it and log the creation.
2004 *
2005 */
2006
2007 meta_pgno = PGNO_BASE_MD;
2008 if ((ret = __db_lget(dbc, LCK_ALWAYS, meta_pgno,
2009 DB_LOCK_WRITE, DB_LOCK_NOWAIT, &meta_lock)) != 0) {
2010 /*
2011 * We don't want to block while having latched
2012 * a page off the end of file. This could
2013 * get truncated by another thread and we
2014 * will deadlock.
2015 */
2016 p = cp->page != NULL;
2017 DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
2018 if (t_ret != 0 ||
2019 (ret != DB_LOCK_NOTGRANTED &&
2020 ret != DB_LOCK_DEADLOCK))
2021 goto pg_err;
2022 if ((ret = __db_lget(dbc, LCK_ALWAYS, meta_pgno,
2023 DB_LOCK_WRITE, 0, &meta_lock)) != 0)
2024 goto pg_err;
2025 ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
2026 data_pgno, 0, DB_MPOOL_CREATE, ret);
2027 /*
2028 * We can race, having read this page when it was
2029 * less than last_pgno but now an aborted
2030 * allocation can make this page beyond last_pgno
2031 * so we must free it. If we can't get the
2032 * lock on the page again, then some other
2033 * thread will handle the issue.
2034 */
2035 if (ret != 0) {
2036 pg_err: if (p != 0) {
2037 ACQUIRE_CUR(dbc, DB_LOCK_WRITE,
2038 data_pgno, 0, 0, t_ret);
2039 if (t_ret == 0 &&
2040 PGNO(cp->page) == PGNO_INVALID) {
2041 (void)__memp_fput(mpf,
2042 dbc->thread_info,
2043 cp->page, dbc->priority);
2044 (void)__memp_fget(mpf,
2045 &data_pgno,
2046 dbc->thread_info, dbc->txn,
2047 DB_MPOOL_FREE, &cp->page);
2048 }
2049 (void)__LPUT(dbc, cp->lock);
2050 }
2051 (void)__LPUT(dbc, meta_lock);
2052 goto err;
2053 }
2054 /* Check if we lost a race. */
2055 if (PGNO(cp->page) != PGNO_INVALID) {
2056 if ((ret = __LPUT(dbc, meta_lock)) != 0)
2057 goto err;
2058 goto check;
2059 }
2060 }
2061
2062 /*
2063 * Before creating a new page in this region, check that the
2064 * region page still exists. By this point, the transaction
2065 * that created the region must have aborted or committed,
2066 * because we now hold the metadata lock. If we can't get the
2067 * latch, the page must exist.
2068 */
2069 ret = __memp_fget(mpf, ®ion_pgno,
2070 dbc->thread_info, NULL, DB_MPOOL_TRY, &rpage);
2071 if (ret == DB_LOCK_NOTGRANTED)
2072 ret = 0;
2073 else if (ret != 0) {
2074 /*
2075 * Free up the metadata lock. If this was an error
2076 * other than a missing region page, bail.
2077 */
2078 if ((t_ret = __LPUT(dbc, meta_lock)) != 0)
2079 ret = t_ret;
2080 if (ret != DB_PAGE_NOTFOUND)
2081 goto err;
2082 /*
2083 * The region no longer exists. Release the page's lock
2084 * (we haven't created the page yet) and find a new page
2085 * on a different region.
2086 */
2087 DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
2088 goto find;
2089 } else
2090 ret = __memp_fput(mpf,
2091 dbc->thread_info, rpage, dbc->priority);
2092 rpage = NULL;
2093 if (ret != 0)
2094 goto meta_unlock;
2095
2096 if ((ret = __memp_fget(mpf, &meta_pgno,
2097 dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &meta)) != 0)
2098 goto err;
2099
2100 /* Log the page creation. Can't jump to err if it fails. */
2101 if (DBC_LOGGING(dbc))
2102 ret = __heap_pg_alloc_log(dbp,
2103 dbc->txn, &LSN(meta), 0, &LSN(meta), meta_pgno,
2104 data_pgno, (u_int32_t)P_HEAP, meta->last_pgno);
2105 else
2106 LSN_NOT_LOGGED(LSN(meta));
2107
2108 /*
2109 * We may have created a page earlier with a larger page number
2110 * check before updating the metadata page.
2111 */
2112 if (ret == 0 && data_pgno > meta->last_pgno)
2113 meta->last_pgno = data_pgno;
2114 meta_lsn = LSN(meta);
2115
2116 if ((t_ret = __memp_fput(mpf,
2117 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
2118 ret = t_ret;
2119 meta = NULL;
2120 if (ret != 0)
2121 goto meta_unlock;
2122
2123 /* If the page doesn't actually exist we need to create it. */
2124 if (cp->pgno == PGNO_INVALID) {
2125 cp->pgno = data_pgno;
2126 if ((ret = __memp_fget(mpf, &cp->pgno,
2127 dbc->thread_info, dbc->txn,
2128 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &cp->page)) != 0)
2129 goto meta_unlock;
2130 DB_ASSERT(dbp->env, cp->pgno == data_pgno);
2131 } else if ((ret = __memp_dirty(mpf, &cp->page,
2132 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
2133 /* Did not read the page, so we can release the lock. */
2134 DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
2135 goto meta_unlock;
2136 }
2137
2138 /* Now that we have the page we initialize it and we're done. */
2139 P_INIT(cp->page,
2140 dbp->pgsize, cp->pgno, P_INVALID, P_INVALID, 0, P_HEAP);
2141 LSN(cp->page) = meta_lsn;
2142
2143 meta_unlock: if ((t_ret = __TLPUT(dbc, meta_lock)) != 0 && ret == 0)
2144 ret = t_ret;
2145 if (ret != 0)
2146 goto err;
2147 } else {
2148 /* Check whether we actually have enough space on this page. */
2149 check: if (size + sizeof(db_indx_t) > HEAP_FREESPACE(dbp, cp->page)) {
2150 /* Put back the page and lock, they were never used. */
2151 DISCARD(dbc, cp->page, cp->lock, 0, ret);
2152 if (ret != 0)
2153 goto err;
2154
2155 /* Re-start the bitmap check on the next page. */
2156 i++;
2157 goto find;
2158 }
2159
2160 if ((ret = __memp_dirty(mpf, &cp->page,
2161 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
2162 /* Did not read the page, so we can release the lock. */
2163 DISCARD(dbc, cp->page, cp->lock, 0, t_ret);
2164 goto err;
2165 }
2166 }
2167
2168 h->curpgindx = data_pgno - region_pgno - 1;
2169 err: DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
2170 if (rpage != NULL && (t_ret = __memp_fput(mpf,
2171 dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
2172 ret = t_ret;
2173
2174 return (ret);
2175 }
2176
2177 /*
2178 * __heap_append --
2179 * Add an item to a heap database.
2180 *
2181 * PUBLIC: int __heap_append
2182 * PUBLIC: __P((DBC *, DBT *, DBT *));
2183 */
2184 int
__heap_append(dbc,key,data)2185 __heap_append(dbc, key, data)
2186 DBC *dbc;
2187 DBT *data, *key;
2188 {
2189 DB *dbp;
2190 DBT tmp_dbt;
2191 DB_HEAP_RID rid;
2192 DB_MPOOLFILE *mpf;
2193 HEAPPG *rpage;
2194 HEAPHDR hdr;
2195 HEAP_CURSOR *cp;
2196 db_indx_t indx;
2197 db_pgno_t region_pgno;
2198 int ret, space, t_ret;
2199 u_int8_t avail;
2200 u_int32_t data_size;
2201
2202 dbp = dbc->dbp;
2203 mpf = dbp->mpf;
2204 ret = t_ret = 0;
2205 rpage = NULL;
2206 cp = (HEAP_CURSOR *)dbc->internal;
2207
2208 /* Need data.size + header size, 4-byte aligned. */
2209 if (F_ISSET(data, DB_DBT_PARTIAL))
2210 data_size = DB_ALIGN(data->doff +
2211 data->size + sizeof(HEAPHDR), sizeof(u_int32_t));
2212 else
2213 data_size = DB_ALIGN(
2214 data->size + sizeof(HEAPHDR), sizeof(u_int32_t));
2215
2216 if (data_size >= HEAP_MAXDATASIZE(dbp))
2217 return (__heapc_split(dbc, key, data, 1));
2218 else if (data_size < sizeof(HEAPSPLITHDR))
2219 data_size = sizeof(HEAPSPLITHDR);
2220
2221 if ((ret = __heap_getpage(dbc, data_size, &avail)) != 0)
2222 goto err;
2223
2224 indx = HEAP_FREEINDX(cp->page);
2225 memset(&hdr, 0, sizeof(HEAPHDR));
2226 hdr.size = data->size;
2227 if (F_ISSET(data, DB_DBT_PARTIAL))
2228 hdr.size += data->doff;
2229 tmp_dbt.data = &hdr;
2230 tmp_dbt.size = sizeof(HEAPHDR);
2231
2232 /* Log the write. */
2233 if (DBC_LOGGING(dbc)) {
2234 if ((ret = __heap_addrem_log(dbp, dbc->txn, &LSN(cp->page),
2235 0, DB_ADD_HEAP, cp->pgno, (u_int32_t)indx,
2236 data_size, &tmp_dbt, data, &LSN(cp->page))) != 0)
2237 goto err;
2238 } else
2239 LSN_NOT_LOGGED(LSN(cp->page));
2240
2241 if ((ret = __heap_pitem(
2242 dbc, (PAGE *)cp->page, indx, data_size, &tmp_dbt, data)) != 0)
2243 goto err;
2244
2245 rid.pgno = cp->pgno;
2246 rid.indx = indx;
2247 cp->indx = indx;
2248
2249 /* Check whether we need to update the space bitmap. */
2250 HEAP_CALCSPACEBITS(dbp, HEAP_FREESPACE(dbp, cp->page), space);
2251
2252 if (space != avail) {
2253 /* Get the region page with an exclusive latch. */
2254 region_pgno = HEAP_REGION_PGNO(dbp, cp->pgno);
2255 if ((ret = __memp_fget(mpf, ®ion_pgno,
2256 dbc->thread_info, NULL, DB_MPOOL_DIRTY, &rpage)) != 0)
2257 goto err;
2258
2259 HEAP_SETSPACE(dbp, rpage, cp->pgno - region_pgno - 1, space);
2260 }
2261
2262 err: DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
2263 if (rpage != NULL && (t_ret = __memp_fput(mpf,
2264 dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
2265 ret = t_ret;
2266
2267 if (cp->page != NULL) {
2268 DISCARD(dbc, cp->page, cp->lock, 1, t_ret);
2269 if (ret == 0)
2270 ret = t_ret;
2271 }
2272
2273 if (ret == 0 && key != NULL)
2274 ret = __db_retcopy(dbp->env, key,
2275 &rid, DB_HEAP_RID_SZ, &dbc->rkey->data, &dbc->rkey->ulen);
2276
2277 return (ret);
2278 }
2279
2280 static int
__heapc_split(dbc,key,data,is_first)2281 __heapc_split(dbc, key, data, is_first)
2282 DBC *dbc;
2283 DBT *key, *data;
2284 int is_first;
2285 {
2286 DB *dbp;
2287 DBT hdr_dbt, t_data;
2288 DB_HEAP_RID rid;
2289 DB_MPOOLFILE *mpf;
2290 HEAPPG *rpage;
2291 HEAPSPLITHDR hdrs;
2292 HEAP_CURSOR *cp;
2293 db_indx_t indx;
2294 db_pgno_t region_pgno;
2295 int ret, spacebits, t_ret;
2296 u_int32_t buflen, doff, left, size;
2297 u_int8_t availbits, *buf;
2298
2299 dbp = dbc->dbp;
2300 mpf = dbp->mpf;
2301 cp = (HEAP_CURSOR *)dbc->internal;
2302 memset(&hdrs, 0, sizeof(HEAPSPLITHDR));
2303 memset(&t_data, 0, sizeof(DBT));
2304 hdrs.std_hdr.flags = HEAP_RECSPLIT | HEAP_RECLAST;
2305
2306 doff = data->doff;
2307 rpage = NULL;
2308 ret = t_ret = 0;
2309 indx = 0;
2310 buf = NULL;
2311 buflen = 0;
2312
2313 /*
2314 * Write the record to multiple pages, in chunks starting from the end.
2315 * To reconstruct during a get we need the RID of the next chunk, so if
2316 * work our way from back to front during writing we always know the rid
2317 * of the "next" chunk, it's the chunk we just wrote.
2318 */
2319 t_data.data = (u_int8_t *)data->data + data->size;
2320 left = data->size;
2321 if (F_ISSET(data, DB_DBT_PARTIAL)) {
2322 left += data->doff;
2323 }
2324 hdrs.tsize = left;
2325 while (left > 0) {
2326 size = DB_ALIGN(left + sizeof(HEAPSPLITHDR), sizeof(u_int32_t));
2327 if (size < sizeof(HEAPSPLITHDR))
2328 size = sizeof(HEAPSPLITHDR);
2329
2330 if (size > HEAP_MAXDATASIZE(dbp))
2331 /*
2332 * Data won't fit on a single page, find one at least
2333 * 33% free.
2334 */
2335 size = DB_ALIGN(dbp->pgsize / 3, sizeof(u_int32_t));
2336 else
2337 hdrs.std_hdr.flags |= HEAP_RECFIRST;
2338
2339 if ((ret = __heap_getpage(dbc, size, &availbits)) != 0)
2340 return (ret);
2341
2342 /*
2343 * size is the total number of bytes being written to the page.
2344 * The header holds the size of the data being written.
2345 */
2346 if (F_ISSET(&(hdrs.std_hdr), HEAP_RECFIRST)) {
2347 hdrs.std_hdr.size = left;
2348 /*
2349 * If we're called from heapc_reloc, we are only writing
2350 * a piece of the full record and shouldn't set
2351 * HEAP_RECFIRST.
2352 */
2353 if (!is_first)
2354 F_CLR(&(hdrs.std_hdr), HEAP_RECFIRST);
2355 } else {
2356 /*
2357 * Figure out how much room is on the page. If we will
2358 * have to expand the offset table, account for that.
2359 */
2360 size = HEAP_FREESPACE(dbp, cp->page);
2361 if (NUM_ENT(cp->page) == 0 ||
2362 HEAP_FREEINDX(cp->page) > HEAP_HIGHINDX(cp->page))
2363 size -= sizeof(db_indx_t);
2364 /* Round down to a multiple of 4. */
2365 size = DB_ALIGN(
2366 size - sizeof(u_int32_t) + 1, sizeof(u_int32_t));
2367 DB_ASSERT(dbp->env, size >= sizeof(HEAPSPLITHDR));
2368 hdrs.std_hdr.size =
2369 (u_int16_t)(size - sizeof(HEAPSPLITHDR));
2370 }
2371
2372 /*
2373 * t_data.data points at the end of the data left to write. Now
2374 * that we know how much we're going to write to this page, we
2375 * can adjust the pointer to point at the start of the data to
2376 * be written.
2377 *
2378 * If DB_DBT_PARTIAL is set, once data->data is exhausted, we
2379 * have to pad with data->doff bytes (or as much as can fit on
2380 * this page.) left - doff gives the number of bytes to use
2381 * from data->data. Once that can't fill t_data, we have to
2382 * start padding.
2383 */
2384 t_data.data = (u_int8_t *)(t_data.data) - hdrs.std_hdr.size;
2385 DB_ASSERT(dbp->env, (F_ISSET(data, DB_DBT_PARTIAL) ||
2386 t_data.data >= data->data));
2387 t_data.size = hdrs.std_hdr.size;
2388 if (F_ISSET(data, DB_DBT_PARTIAL) &&
2389 t_data.size > left - doff) {
2390 if (buflen < t_data.size) {
2391 if (__os_realloc(
2392 dbp->env, t_data.size, &buf) != 0)
2393 return (ENOMEM);
2394 buflen = t_data.size;
2395 }
2396 /*
2397 * We have to figure out how much data remains. left
2398 * includes doff, so we need (left - doff) bytes from
2399 * data. We also need the amount of padding that can
2400 * fit on the page. That's the amount we can fit on the
2401 * page minus the bytes we're taking from data.
2402 */
2403 t_data.data = buf;
2404 memset(buf, '\0', t_data.size - left + doff);
2405 buf += t_data.size - left + doff;
2406 memcpy(buf, data->data, left - doff);
2407 doff -= t_data.size - left + doff;
2408 buf = t_data.data;
2409 }
2410 hdr_dbt.data = &hdrs;
2411 hdr_dbt.size = sizeof(HEAPSPLITHDR);
2412 indx = HEAP_FREEINDX(cp->page);
2413
2414 /* Log the write. */
2415 if (DBC_LOGGING(dbc)) {
2416 if ((ret = __heap_addrem_log(dbp,
2417 dbc->txn, &LSN(cp->page), 0,
2418 DB_ADD_HEAP, cp->pgno, (u_int32_t)indx,
2419 size, &hdr_dbt, &t_data, &LSN(cp->page))) != 0)
2420 goto err;
2421 } else
2422 LSN_NOT_LOGGED(LSN(cp->page));
2423
2424 if ((ret = __heap_pitem(dbc,
2425 (PAGE *)cp->page, indx, size, &hdr_dbt, &t_data)) != 0)
2426 goto err;
2427 F_CLR(&(hdrs.std_hdr), HEAP_RECLAST);
2428 left -= hdrs.std_hdr.size;
2429
2430 /*
2431 * Save the rid where we just wrote, this is the "next"
2432 * chunk.
2433 */
2434 hdrs.nextpg = cp->pgno;
2435 hdrs.nextindx = indx;
2436
2437 /* Check whether we need to update the space bitmap. */
2438 HEAP_CALCSPACEBITS(dbp,
2439 HEAP_FREESPACE(dbp, cp->page), spacebits);
2440
2441 if (spacebits != availbits) {
2442 /* Get the region page with an exclusive latch. */
2443 region_pgno = HEAP_REGION_PGNO(dbp, cp->pgno);
2444 if ((ret = __memp_fget(mpf, ®ion_pgno,
2445 dbc->thread_info,
2446 NULL, DB_MPOOL_DIRTY, &rpage)) != 0)
2447 goto err;
2448
2449 HEAP_SETSPACE(dbp,
2450 rpage, cp->pgno - region_pgno - 1, spacebits);
2451 ret = __memp_fput(mpf,
2452 dbc->thread_info, rpage, dbc->priority);
2453 rpage = NULL;
2454 if (ret != 0)
2455 goto err;
2456 }
2457
2458 }
2459
2460 rid.pgno = cp->pgno;
2461 rid.indx = indx;
2462 cp->indx = indx;
2463
2464 err: if (rpage != NULL && (t_ret = __memp_fput(mpf,
2465 dbc->thread_info, rpage, dbc->priority)) != 0 && ret == 0)
2466 ret = t_ret;
2467 if (cp->page != NULL) {
2468 DISCARD(dbc, cp->page, cp->lock, 1, t_ret);
2469 if (ret == 0)
2470 ret = t_ret;
2471 }
2472 if (buf != NULL)
2473 __os_free(dbp->env, buf);
2474
2475 if (ret == 0 && key != NULL)
2476 ret = __db_retcopy(dbp->env, key,
2477 &rid, DB_HEAP_RID_SZ, &dbc->rkey->data, &dbc->rkey->ulen);
2478 DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
2479 return (ret);
2480 }
2481
2482 /*
 * __heap_pitem --
2484 * Put an item on a heap page. Copy all bytes from the header (if any)
2485 * first and then copy from data.
2486 *
2487 * PUBLIC: int __heap_pitem __P((DBC *,
2488 * PUBLIC: PAGE *, u_int32_t, u_int32_t, DBT *, DBT *));
2489 */
int
__heap_pitem(dbc, pagep, indx, nbytes, hdr, data)
	DBC *dbc;
	PAGE *pagep;
	u_int32_t indx;		/* Slot in the offset table to fill. */
	u_int32_t nbytes;	/* Total aligned bytes consumed on the page. */
	DBT *hdr, *data;
{
	DB *dbp;
	u_int8_t *buf;

	dbp = dbc->dbp;

	/*
	 * The caller must hand us a dirty heap page and an nbytes already
	 * rounded to 4-byte alignment, large enough for header plus data.
	 *
	 * NOTE(review): hdr is dereferenced in the asserts here, yet checked
	 * against NULL below — presumably hdr is never NULL in practice;
	 * confirm against callers.
	 */
	DB_ASSERT(dbp->env, TYPE(pagep) == P_HEAP);
	DB_ASSERT(dbp->env, IS_DIRTY(pagep));
	DB_ASSERT(dbp->env, nbytes == DB_ALIGN(nbytes, sizeof(u_int32_t)));
	DB_ASSERT(dbp->env, DB_ALIGN(((HEAPHDR *)hdr->data)->size,
	    sizeof(u_int32_t)) >= data->size);
	DB_ASSERT(dbp->env, nbytes >= hdr->size + data->size);

	/*
	 * We're writing data either as a result of DB->put or as a result of
	 * undo-ing a delete. If we're undo-ing a delete we just need to write
	 * the bytes from hdr to the page. Otherwise, we need to construct a
	 * heap header, etc.
	 */
	/* Items grow down from HOFFSET; record the new item's page offset. */
	HEAP_OFFSETTBL(dbp, pagep)[indx] = HOFFSET(pagep) - nbytes;
	buf = P_ENTRY(dbp, pagep, indx);
	/* The item must land strictly below the offset table itself. */
	DB_ASSERT(dbp->env, buf > (u_int8_t*)&HEAP_OFFSETTBL(dbp, pagep)[indx]);

	if (hdr != NULL) {
		memcpy(buf, hdr->data, hdr->size);
		buf += hdr->size;
	}
	/* For partial puts, zero-fill the doff bytes preceding the data. */
	if (F_ISSET(data, DB_DBT_PARTIAL)) {
		memset(buf, 0, data->doff);
		buf += data->doff;
	}
	memcpy(buf, data->data, data->size);

	/*
	 * Update data page header. If DEBUG/DIAGNOSTIC is set, the page might
	 * be filled with 0xdb, so we can't just look for a 0 in the offset
	 * table. We used the first available index, so start there and scan
	 * forward. If the table is full, the first available index is the
	 * highest index plus one.
	 */
	if (indx > HEAP_HIGHINDX(pagep)) {
		/* We extended the offset table past its previous high mark. */
		if (NUM_ENT(pagep) == 0)
			HEAP_FREEINDX(pagep) = 0;
		else if (HEAP_FREEINDX(pagep) >= indx) {
			if (indx > (u_int32_t)HEAP_HIGHINDX(pagep) + 1)
				HEAP_FREEINDX(pagep) = HEAP_HIGHINDX(pagep) + 1;
			else
				HEAP_FREEINDX(pagep) = indx + 1;
		}
		/* Zero any offset-table slots we skipped over. */
		while (++HEAP_HIGHINDX(pagep) < indx)
			HEAP_OFFSETTBL(dbp,pagep)[HEAP_HIGHINDX(pagep)] = 0;
	} else {
		/* Scan forward for the next empty slot within the table. */
		for (; indx <= HEAP_HIGHINDX(pagep); indx++)
			if (HEAP_OFFSETTBL(dbp, pagep)[indx] == 0)
				break;
		HEAP_FREEINDX(pagep) = indx;
	}
	HOFFSET(pagep) -= nbytes;
	NUM_ENT(pagep)++;

	return (0);
}
2559
2560 /*
2561 * __heapc_dup --
2562 * Duplicate a heap cursor, such that the new one holds appropriate
2563 * locks for the position of the original.
2564 *
2565 * PUBLIC: int __heapc_dup __P((DBC *, DBC *));
2566 */
2567 int
__heapc_dup(orig_dbc,new_dbc)2568 __heapc_dup(orig_dbc, new_dbc)
2569 DBC *orig_dbc, *new_dbc;
2570 {
2571 HEAP_CURSOR *orig, *new;
2572
2573 orig = (HEAP_CURSOR *)orig_dbc->internal;
2574 new = (HEAP_CURSOR *)new_dbc->internal;
2575 new->flags = orig->flags;
2576 return (0);
2577 }
2578
2579 /*
2580 * __heapc_gsplit --
2581 * Get a heap split record. The page pointed to by the cursor must
2582 * be the first segment of this record.
2583 *
2584 * PUBLIC: int __heapc_gsplit __P((DBC *,
2585 * PUBLIC: DBT *, void **, u_int32_t *));
2586 */
2587 int
__heapc_gsplit(dbc,dbt,bpp,bpsz)2588 __heapc_gsplit(dbc, dbt, bpp, bpsz)
2589 DBC *dbc;
2590 DBT *dbt;
2591 void **bpp;
2592 u_int32_t *bpsz;
2593 {
2594 DB *dbp;
2595 DB_MPOOLFILE *mpf;
2596 DB_HEAP_RID rid;
2597 DB_LOCK data_lock;
2598 HEAP_CURSOR *cp;
2599 ENV *env;
2600 HEAPPG *dpage;
2601 HEAPSPLITHDR *hdr;
2602 db_indx_t bytes;
2603 u_int32_t curoff, needed, start, tlen;
2604 u_int8_t *p, *src;
2605 int putpage, ret, t_ret;
2606
2607 LOCK_INIT(data_lock);
2608 dbp = dbc->dbp;
2609 env = dbp->env;
2610 mpf = dbp->mpf;
2611 cp = (HEAP_CURSOR *)dbc->internal;
2612 putpage = FALSE;
2613 ret = 0;
2614
2615 /*
2616 * We should have first page, locked already in cursor. Get the
2617 * record id out of the cursor and set up local variables.
2618 */
2619 DB_ASSERT(env, cp->page != NULL);
2620 rid.pgno = cp->pgno;
2621 rid.indx = cp->indx;
2622 dpage = cp->page;
2623 hdr = (HEAPSPLITHDR *)P_ENTRY(dbp, dpage, rid.indx);
2624 DB_ASSERT(env, hdr->tsize != 0);
2625 tlen = hdr->tsize;
2626
2627 /*
2628 * If we doing a partial retrieval, figure out how much we are
2629 * actually going to get.
2630 */
2631 if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
2632 start = dbt->doff;
2633 if (start > tlen)
2634 needed = 0;
2635 else if (dbt->dlen > tlen - start)
2636 needed = tlen - start;
2637 else
2638 needed = dbt->dlen;
2639 } else {
2640 start = 0;
2641 needed = tlen;
2642 }
2643
2644 /*
2645 * If the caller has not requested any data, return success. This
2646 * "early-out" also avoids setting up the streaming optimization when
2647 * no page would be retrieved. If it were removed, the streaming code
2648 * should only initialize when needed is not 0.
2649 */
2650 if (needed == 0) {
2651 dbt->size = 0;
2652 return (0);
2653 }
2654
2655 /*
2656 * Check if the buffer is big enough; if it is not and we are
2657 * allowed to malloc space, then we'll malloc it. If we are
2658 * not (DB_DBT_USERMEM), then we'll set the dbt and return
2659 * appropriately.
2660 */
2661 if (F_ISSET(dbt, DB_DBT_USERCOPY))
2662 goto skip_alloc;
2663
2664 /* Allocate any necessary memory. */
2665 if (F_ISSET(dbt, DB_DBT_USERMEM)) {
2666 if (needed > dbt->ulen) {
2667 dbt->size = needed;
2668 return (DB_BUFFER_SMALL);
2669 }
2670 } else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
2671 if ((ret = __os_umalloc(env, needed, &dbt->data)) != 0)
2672 return (ret);
2673 } else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
2674 if ((ret = __os_urealloc(env, needed, &dbt->data)) != 0)
2675 return (ret);
2676 } else if (bpsz != NULL && (*bpsz == 0 || *bpsz < needed)) {
2677 if ((ret = __os_realloc(env, needed, bpp)) != 0)
2678 return (ret);
2679 *bpsz = needed;
2680 dbt->data = *bpp;
2681 } else if (bpp != NULL)
2682 dbt->data = *bpp;
2683 else {
2684 DB_ASSERT(env,
2685 F_ISSET(dbt,
2686 DB_DBT_USERMEM | DB_DBT_MALLOC | DB_DBT_REALLOC) ||
2687 bpsz != NULL || bpp != NULL);
2688 return (DB_BUFFER_SMALL);
2689 }
2690
2691 skip_alloc:
2692 /*
2693 * Go through each of the pieces, copying the data on each one
2694 * into the buffer. Never copy more than the total data length.
2695 * We are starting off with the page that is currently pointed to by
2696 * the cursor,
2697 */
2698 curoff = 0;
2699 dbt->size = needed;
2700 for (p = dbt->data; needed > 0;) {
2701 /* Check if we need any bytes from this page */
2702 if (curoff + hdr->std_hdr.size >= start) {
2703 bytes = hdr->std_hdr.size;
2704 src = (u_int8_t *)hdr +
2705 P_TO_UINT16(sizeof(HEAPSPLITHDR));
2706 if (start > curoff) {
2707 src += start - curoff;
2708 bytes -= start - curoff;
2709 }
2710 if (bytes > needed)
2711 bytes = needed;
2712 if (F_ISSET(dbt, DB_DBT_USERCOPY)) {
2713 /*
2714 * The offset into the DBT is the total size
2715 * less the amount of data still needed. Care
2716 * needs to be taken if doing a partial copy
2717 * beginning at an offset other than 0.
2718 */
2719 if ((ret = env->dbt_usercopy(
2720 dbt, dbt->size - needed,
2721 src, bytes, DB_USERCOPY_SETDATA)) != 0) {
2722 if (putpage)
2723 (void)__memp_fput(
2724 mpf, dbc->thread_info,
2725 dpage, dbp->priority);
2726
2727 return (ret);
2728 }
2729 } else
2730 memcpy(p, src, bytes);
2731 p += bytes;
2732 needed -= bytes;
2733 }
2734 curoff += hdr->std_hdr.size;
2735
2736 /* Find next record piece as long as it exists */
2737 if (!F_ISSET((HEAPHDR *)hdr, HEAP_RECLAST)) {
2738 rid.pgno = hdr->nextpg;
2739 rid.indx = hdr->nextindx;
2740
2741 /*
2742 * First pass through here, we are using the
2743 * page pointed to by the cursor, and this page
2744 * will get put when the cursor is is closed.
2745 * Only pages specifically gotten in this loop
2746 * need to be put back.
2747 */
2748 if (putpage) {
2749 if ((ret = __memp_fput(mpf, dbc->thread_info,
2750 dpage, dbp->priority) ) != 0)
2751 goto err;
2752 dpage = NULL;
2753 if ((ret = __TLPUT(dbc, data_lock)) != 0)
2754 goto err;
2755 }
2756
2757 if ((ret = __db_lget(dbc, 0, rid.pgno,
2758 DB_LOCK_READ, 0, &data_lock)) != 0)
2759 goto err;
2760 if ((ret = __memp_fget(mpf, &rid.pgno,
2761 dbc->thread_info, dbc->txn, 0, &dpage)) != 0)
2762 goto err;
2763 hdr = (HEAPSPLITHDR *)P_ENTRY(dbp, dpage, rid.indx);
2764 putpage = TRUE;
2765
2766 /*
2767 * If we have the last piece of this record and we're
2768 * reading the entire record, then what we need should
2769 * equal what is remaining.
2770 */
2771 if (F_ISSET((HEAPHDR *)hdr, HEAP_RECLAST) &&
2772 !F_ISSET(dbt, DB_DBT_PARTIAL) &&
2773 (hdr->std_hdr.size != needed)) {
2774 __db_errx(env, DB_STR_A("1167",
2775 "Incorrect record size in header: %s: rid %lu.%lu",
2776 "%s %lu %lu"), dbc->dbp->fname,
2777 (u_long)(cp->pgno), (u_long)(cp->indx));
2778 ret = __env_panic(env, DB_RUNRECOVERY);
2779 goto err;
2780 }
2781 }
2782 }
2783
2784 err: DB_ASSERT(dbp->env, ret != DB_PAGE_NOTFOUND);
2785 if (putpage && dpage != NULL && (t_ret = __memp_fput(mpf,
2786 dbc->thread_info, dpage, dbp->priority)) != 0 && ret == 0)
2787 ret = t_ret;
2788 if ((t_ret = __TLPUT(dbc, data_lock)) != 0 && ret == 0)
2789 ret = t_ret;
2790
2791 return (ret);
2792 }
2793 /*
2794 * __heapc_refresh --
2795 * do the proper set up for cursor reuse.
2796 *
2797 * PUBLIC: int __heapc_refresh __P((DBC *));
2798 */
2799 int
__heapc_refresh(dbc)2800 __heapc_refresh(dbc)
2801 DBC *dbc;
2802 {
2803 HEAP_CURSOR *cp;
2804
2805 cp = (HEAP_CURSOR *)dbc->internal;
2806
2807 LOCK_INIT(cp->lock);
2808 cp->lock_mode = DB_LOCK_NG;
2809 cp->flags = 0;
2810
2811 return (0);
2812 }
2813