1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1997, 1998, 1999
5 * Sleepycat Software. All rights reserved.
6 */
7
8 #include "db_config.h"
9
10 #ifndef lint
11 static const char sccsid[] = "@(#)bt_recno.c 11.9 (Sleepycat) 10/29/99";
12 #endif /* not lint */
13
14 #ifndef NO_SYSTEM_INCLUDES
15 #include <sys/types.h>
16
17 #include <errno.h>
18 #include <limits.h>
19 #include <string.h>
20 #endif
21
22 #include "db_int.h"
23 #include "db_page.h"
24 #include "btree.h"
25 #include "db_ext.h"
26 #include "db_shash.h"
27 #include "lock.h"
28 #include "lock_ext.h"
29 #include "qam.h"
30
31 static int CDB___ram_add __P((DBC *, db_recno_t *, DBT *, u_int32_t, u_int32_t));
32 static int CDB___ram_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
33 static int CDB___ram_fmap __P((DBC *, db_recno_t));
34 static int CDB___ram_i_delete __P((DBC *));
35 static int CDB___ram_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
36 static int CDB___ram_source __P((DB *));
37 static int CDB___ram_update __P((DBC *, db_recno_t, int));
38 static int CDB___ram_vmap __P((DBC *, db_recno_t));
39
40 /*
41 * In recno, there are two meanings to the on-page "deleted" flag. If we're
42 * re-numbering records, it means the record was implicitly created. We skip
43 * over implicitly created records if doing a cursor "next" or "prev", and
44 * return DB_KEYEMPTY if they're explicitly requested.. If not re-numbering
45 * records, it means that the record was implicitly created, or was deleted.
46 * We skip over implicitly created or deleted records if doing a cursor "next"
47 * or "prev", and return DB_KEYEMPTY if they're explicitly requested.
48 *
49 * If we're re-numbering records, then we have to detect in the cursor that
50 * a record was deleted, and adjust the cursor as necessary on the next get.
51 * If we're not re-numbering records, then we can detect that a record has
52 * been deleted by looking at the actual on-page record, so we completely
53 * ignore the cursor's delete flag. This is different from the B+tree code.
54 * It also maintains whether the cursor references a deleted record in the
55 * cursor, and it doesn't always check the on-page value.
56 */
57 #define CD_SET(dbp, cp) { \
58 if (F_ISSET(dbp, DB_RE_RENUMBER)) \
59 F_SET(cp, C_DELETED); \
60 }
61 #define CD_CLR(dbp, cp) { \
62 if (F_ISSET(dbp, DB_RE_RENUMBER)) \
63 F_CLR(cp, C_DELETED); \
64 }
65 #define CD_ISSET(dbp, cp) \
66 (F_ISSET(dbp, DB_RE_RENUMBER) && F_ISSET(cp, C_DELETED))
67
68 /*
69 * CDB___ram_open --
70 * Recno open function.
71 *
72 * PUBLIC: int CDB___ram_open __P((DB *, const char *, db_pgno_t));
73 */
74 int
CDB___ram_open(dbp,name,base_pgno)75 CDB___ram_open(dbp, name, base_pgno)
76 DB *dbp;
77 const char *name;
78 db_pgno_t base_pgno;
79 {
80 BTREE *t;
81 DBC *dbc;
82 int ret, t_ret;
83
84 t = dbp->bt_internal;
85
86 /* Initialize the remaining fields/methods of the DB. */
87 dbp->del = CDB___ram_delete;
88 dbp->put = CDB___ram_put;
89 dbp->stat = CDB___bam_stat;
90
91 /* Set the overflow page size. */
92 CDB___bam_setovflsize(dbp);
93
94 /* Start up the tree. */
95 if ((ret = CDB___bam_read_root(dbp, name, base_pgno)) != 0)
96 goto err;
97
98 /*
99 * If the user specified a source tree, open it and map it in.
100 *
101 * !!!
102 * We don't complain if the user specified transactions or threads.
103 * It's possible to make it work, but you'd better know what you're
104 * doing!
105 */
106 if (t->re_source == NULL)
107 F_SET(t, RECNO_EOF);
108 else
109 if ((ret = CDB___ram_source(dbp)) != 0)
110 goto err;
111
112 /* If we're snapshotting an underlying source file, do it now. */
113 if (F_ISSET(dbp, DB_RE_SNAPSHOT)) {
114 /* Allocate a cursor. */
115 if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
116 goto err;
117
118 /* Do the snapshot. */
119 if ((ret = CDB___ram_update(dbc,
120 DB_MAX_RECORDS, 0)) != 0 && ret == DB_NOTFOUND)
121 ret = 0;
122
123 /* Discard the cursor. */
124 if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
125 ret = t_ret;
126
127 if (ret != 0)
128 goto err;
129 }
130
131 return (0);
132
133 err: /* If we mmap'd a source file, discard it. */
134 if (t->re_smap != NULL)
135 (void)CDB___os_unmapfile(dbp->dbenv, t->re_smap, t->re_msize);
136
137 /* If we opened a source file, discard it. */
138 if (F_ISSET(&t->re_fh, DB_FH_VALID))
139 (void)CDB___os_closehandle(&t->re_fh);
140 if (t->re_source != NULL)
141 CDB___os_freestr(t->re_source);
142
143 return (ret);
144 }
145
146 /*
147 * CDB___ram_delete --
148 * Recno db->del function.
149 */
150 static int
CDB___ram_delete(dbp,txn,key,flags)151 CDB___ram_delete(dbp, txn, key, flags)
152 DB *dbp;
153 DB_TXN *txn;
154 DBT *key;
155 u_int32_t flags;
156 {
157 BTREE_CURSOR *cp;
158 DBC *dbc;
159 db_recno_t recno;
160 int ret, t_ret;
161
162 PANIC_CHECK(dbp->dbenv);
163
164 /* Check for invalid flags. */
165 if ((ret = CDB___db_delchk(dbp,
166 key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
167 return (ret);
168
169 /* Acquire a cursor. */
170 if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
171 return (ret);
172
173 DEBUG_LWRITE(dbc, txn, "ram_delete", key, NULL, flags);
174
175 /* Check the user's record number and fill in as necessary. */
176 if ((ret = CDB___ram_getno(dbc, key, &recno, 0)) != 0)
177 goto err;
178
179 /* Do the delete. */
180 cp = dbc->internal;
181 cp->recno = recno;
182 ret = CDB___ram_i_delete(dbc);
183
184 /* Release the cursor. */
185 err: if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
186 ret = t_ret;
187
188 return (ret);
189 }
190
191 /*
192 * CDB___ram_i_delete --
193 * Internal version of recno delete, called by CDB___ram_delete and
194 * CDB___ram_c_del.
195 */
196 static int
CDB___ram_i_delete(dbc)197 CDB___ram_i_delete(dbc)
198 DBC *dbc;
199 {
200 BKEYDATA bk;
201 BTREE *t;
202 BTREE_CURSOR *cp;
203 DB *dbp;
204 DBT hdr, data;
205 PAGE *h;
206 db_indx_t indx;
207 int exact, ret, stack;
208
209 dbp = dbc->dbp;
210 cp = dbc->internal;
211 t = dbp->bt_internal;
212 stack = 0;
213
214 /*
215 * If this is CDB and this isn't a write cursor, then it's an error.
216 * If it is a write cursor, but we don't yet hold the write lock, then
217 * we need to upgrade to the write lock.
218 */
219 if (F_ISSET(dbp->dbenv, DB_ENV_CDB)) {
220 /* Make sure it's a valid update cursor. */
221 if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))
222 return (EINVAL);
223
224 if (F_ISSET(dbc, DBC_WRITECURSOR) &&
225 (ret = CDB_lock_get(dbp->dbenv, dbc->locker,
226 DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
227 &dbc->mylock)) != 0)
228 return (ret);
229 }
230
231 /* Search the tree for the key; delete only deletes exact matches. */
232 if ((ret = CDB___bam_rsearch(dbc, &cp->recno, S_DELETE, 1, &exact)) != 0)
233 goto err;
234 if (!exact) {
235 ret = DB_NOTFOUND;
236 goto err;
237 }
238 stack = 1;
239
240 h = cp->csp->page;
241 indx = cp->csp->indx;
242
243 /*
244 * If re-numbering records, the on-page deleted flag can only mean
245 * that this record was implicitly created. Applications aren't
246 * permitted to delete records they never created, return an error.
247 *
248 * If not re-numbering records, the on-page deleted flag means that
249 * this record was implicitly created, or, was deleted at some time.
250 * The former is an error because applications aren't permitted to
251 * delete records they never created, the latter is an error because
252 * if the record was "deleted", we could never have found it.
253 */
254 if (B_DISSET(GET_BKEYDATA(h, indx)->type)) {
255 ret = DB_KEYEMPTY;
256 goto err;
257 }
258
259 if (F_ISSET(dbp, DB_RE_RENUMBER)) {
260 /* Delete the item, adjust the counts, adjust the cursors. */
261 if ((ret = CDB___bam_ditem(dbc, h, indx)) != 0)
262 goto err;
263 CDB___bam_adjust(dbc, -1);
264 CDB___ram_ca(dbp, cp->recno, CA_DELETE);
265
266 /*
267 * If the page is empty, delete it. The whole tree is locked
268 * so there are no preparations to make.
269 */
270 if (NUM_ENT(h) == 0 && h->pgno != t->bt_root) {
271 stack = 0;
272 ret = CDB___bam_dpages(dbc);
273 }
274 } else {
275 /* Use a delete/put pair to replace the record with a marker. */
276 if ((ret = CDB___bam_ditem(dbc, h, indx)) != 0)
277 goto err;
278
279 B_TSET(bk.type, B_KEYDATA, 1);
280 bk.len = 0;
281 memset(&hdr, 0, sizeof(hdr));
282 hdr.data = &bk;
283 hdr.size = SSZA(BKEYDATA, data);
284 memset(&data, 0, sizeof(data));
285 data.data = (void *)"";
286 data.size = 0;
287 if ((ret = CDB___db_pitem(dbc,
288 h, indx, BKEYDATA_SIZE(0), &hdr, &data)) != 0)
289 goto err;
290 }
291 F_SET(t, RECNO_MODIFIED);
292
293 err: if (stack)
294 CDB___bam_stkrel(dbc, 0);
295
296 /* If we upgraded the CDB lock upon entry; downgrade it now. */
297 if (F_ISSET(dbc, DBC_WRITECURSOR))
298 (void)CDB___lock_downgrade(dbp->dbenv,
299 &dbc->mylock, DB_LOCK_IWRITE, 0);
300 return (ret);
301 }
302
303 /*
304 * CDB___ram_put --
305 * Recno db->put function.
306 */
307 static int
CDB___ram_put(dbp,txn,key,data,flags)308 CDB___ram_put(dbp, txn, key, data, flags)
309 DB *dbp;
310 DB_TXN *txn;
311 DBT *key, *data;
312 u_int32_t flags;
313 {
314 DBC *dbc;
315 db_recno_t recno;
316 int ret, t_ret;
317
318 PANIC_CHECK(dbp->dbenv);
319
320 /* Check for invalid flags. */
321 if ((ret = CDB___db_putchk(dbp,
322 key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), 0)) != 0)
323 return (ret);
324
325 /* Allocate a cursor. */
326 if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
327 return (ret);
328
329 DEBUG_LWRITE(dbc, txn, "ram_put", key, data, flags);
330
331 /*
332 * If we're appending to the tree, make sure we've read in all of
333 * the backing source file. Otherwise, check the user's record
334 * number and fill in as necessary.
335 */
336 if (flags == DB_APPEND) {
337 if ((ret = CDB___ram_update(
338 dbc, DB_MAX_RECORDS, 0)) != 0 && ret == DB_NOTFOUND)
339 ret = 0;
340 } else
341 ret = CDB___ram_getno(dbc, key, &recno, 1);
342
343 /* Add the record. */
344 if (ret == 0)
345 ret = CDB___ram_add(dbc, &recno, data, flags, 0);
346
347 /* Discard the cursor. */
348 if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
349 ret = t_ret;
350
351 /* Return the record number if we're appending to the tree. */
352 if (ret == 0 && flags == DB_APPEND)
353 *(db_recno_t *)key->data = recno;
354
355 return (ret);
356 }
357
358 /*
359 * CDB___ram_c_del --
360 * Recno cursor->c_del function.
361 *
362 * PUBLIC: int CDB___ram_c_del __P((DBC *, u_int32_t));
363 */
364 int
CDB___ram_c_del(dbc,flags)365 CDB___ram_c_del(dbc, flags)
366 DBC *dbc;
367 u_int32_t flags;
368 {
369 BTREE_CURSOR *cp;
370 DB *dbp;
371 int ret;
372
373 dbp = dbc->dbp;
374 cp = dbc->internal;
375
376 PANIC_CHECK(dbp->dbenv);
377
378 /* Check for invalid flags. */
379 if ((ret = CDB___db_cdelchk(dbp, flags,
380 F_ISSET(dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0)
381 return (ret);
382
383 DEBUG_LWRITE(dbc, dbc->txn, "ram_c_del", NULL, NULL, flags);
384
385 /*
386 * The semantics of cursors during delete are as follows: if record
387 * numbers are mutable (DB_RE_RENUMBER is set), deleting a record
388 * causes the cursor to automatically point to the record immediately
389 * following. In this case it is possible to use a single cursor for
390 * repeated delete operations, without intervening operations.
391 *
392 * If record numbers are not mutable, then records are replaced with
393 * a marker containing a delete flag. If the record referenced by
394 * this cursor has already been deleted, we will detect that as part
395 * of the delete operation, and fail.
396 */
397 return (CDB___ram_i_delete(dbc));
398 }
399
400 /*
401 * CDB___ram_c_get --
402 * Recno cursor->c_get function.
403 *
404 * PUBLIC: int CDB___ram_c_get __P((DBC *, DBT *, DBT *, u_int32_t));
405 */
406 int
CDB___ram_c_get(dbc,key,data,flags)407 CDB___ram_c_get(dbc, key, data, flags)
408 DBC *dbc;
409 DBT *key, *data;
410 u_int32_t flags;
411 {
412 BTREE_CURSOR *cp, copy;
413 DB *dbp;
414 PAGE *h;
415 db_indx_t indx;
416 int exact, ret, stack, tmp_rmw;
417
418 dbp = dbc->dbp;
419 cp = dbc->internal;
420
421 PANIC_CHECK(dbp->dbenv);
422
423 /* Check for invalid flags. */
424 if ((ret = CDB___db_cgetchk(dbc->dbp,
425 key, data, flags, cp->recno != RECNO_OOB)) != 0)
426 return (ret);
427
428 /* Clear OR'd in additional bits so we can check for flag equality. */
429 tmp_rmw = 0;
430 if (LF_ISSET(DB_RMW)) {
431 tmp_rmw = 1;
432 F_SET(dbc, DBC_RMW);
433 LF_CLR(DB_RMW);
434 }
435
436 DEBUG_LREAD(dbc, dbc->txn, "ram_c_get",
437 flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
438
439 /* Initialize the cursor for a new retrieval. */
440 copy = *cp;
441
442 retry: /* Update the record number. */
443 stack = 0;
444 switch (flags) {
445 case DB_CURRENT:
446 /*
447 * If record numbers are mutable: if we just deleted a record,
448 * there is no action necessary, we return the record following
449 * the deleted item by virtue of renumbering the tree.
450 */
451 break;
452 case DB_NEXT:
453 /*
454 * If record numbers are mutable: if we just deleted a record,
455 * we have to avoid incrementing the record number so that we
456 * return the right record by virtue of renumbering the tree.
457 */
458 if (CD_ISSET(dbp, cp))
459 break;
460
461 if (cp->recno != RECNO_OOB) {
462 ++cp->recno;
463 break;
464 }
465 /* FALLTHROUGH */
466 case DB_FIRST:
467 flags = DB_NEXT;
468 cp->recno = 1;
469 break;
470 case DB_PREV:
471 if (cp->recno != RECNO_OOB) {
472 if (cp->recno == 1) {
473 ret = DB_NOTFOUND;
474 goto err;
475 }
476 --cp->recno;
477 break;
478 }
479 /* FALLTHROUGH */
480 case DB_LAST:
481 flags = DB_PREV;
482 if (((ret = CDB___ram_update(dbc,
483 DB_MAX_RECORDS, 0)) != 0) && ret != DB_NOTFOUND)
484 goto err;
485 if ((ret = CDB___bam_nrecs(dbc, &cp->recno)) != 0)
486 goto err;
487 if (cp->recno == 0) {
488 ret = DB_NOTFOUND;
489 goto err;
490 }
491 break;
492 case DB_SET:
493 case DB_SET_RANGE:
494 if ((ret = CDB___ram_getno(dbc, key, &cp->recno, 0)) != 0)
495 goto err;
496 break;
497 }
498
499 /*
500 * For DB_PREV, DB_LAST, DB_SET and DB_SET_RANGE, we have already
501 * called CDB___ram_update() to make sure sufficient records have been
502 * read from the backing source file. Do it now for DB_CURRENT (if
503 * the current record was deleted we may need more records from the
504 * backing file for a DB_CURRENT operation), DB_FIRST and DB_NEXT.
505 */
506 if (flags == DB_NEXT && ((ret =
507 CDB___ram_update(dbc, cp->recno, 0)) != 0) && ret != DB_NOTFOUND)
508 goto err;
509
510 /* Search the tree for the record. */
511 if ((ret = CDB___bam_rsearch(dbc, &cp->recno,
512 F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND, 1, &exact)) != 0)
513 goto err;
514 stack = 1;
515 if (!exact) {
516 ret = DB_NOTFOUND;
517 goto err;
518 }
519 h = cp->csp->page;
520 indx = cp->csp->indx;
521
522 /*
523 * If re-numbering records, the on-page deleted flag means this record
524 * was implicitly created. If not re-numbering records, the on-page
525 * deleted flag means this record was implicitly created, or, it was
526 * deleted at some time. Regardless, we skip such records if doing
527 * cursor next/prev operations, and fail if the application requested
528 * them explicitly.
529 */
530 if (B_DISSET(GET_BKEYDATA(h, indx)->type)) {
531 if (flags == DB_NEXT || flags == DB_PREV) {
532 (void)CDB___bam_stkrel(dbc, 0);
533 goto retry;
534 }
535 ret = DB_KEYEMPTY;
536 goto err;
537 }
538
539 /* Return the key if the user didn't give us one. */
540 if (flags != DB_SET && flags != DB_SET_RANGE &&
541 (ret = CDB___db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno),
542 &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
543 goto err;
544
545 /* Return the data item. */
546 if ((ret = CDB___db_ret(dbp,
547 h, indx, data, &dbc->rdata.data, &dbc->rdata.ulen)) != 0)
548 goto err;
549
550 /* The cursor was reset, no further delete adjustment is necessary. */
551 CD_CLR(dbp, cp);
552
553 err: if (stack)
554 (void)CDB___bam_stkrel(dbc, 0);
555
556 /* Release temporary lock upgrade. */
557 if (tmp_rmw)
558 F_CLR(dbc, DBC_RMW);
559
560 if (ret != 0)
561 *cp = copy;
562
563 return (ret);
564 }
565
566 /*
567 * CDB___ram_c_put --
568 * Recno cursor->c_put function.
569 *
570 * PUBLIC: int CDB___ram_c_put __P((DBC *, DBT *, DBT *, u_int32_t));
571 */
572 int
CDB___ram_c_put(dbc,key,data,flags)573 CDB___ram_c_put(dbc, key, data, flags)
574 DBC *dbc;
575 DBT *key, *data;
576 u_int32_t flags;
577 {
578 BTREE_CURSOR *cp, copy;
579 DB *dbp;
580 int exact, ret;
581 void *arg;
582
583 dbp = dbc->dbp;
584 cp = dbc->internal;
585
586 PANIC_CHECK(dbp->dbenv);
587
588 if ((ret = CDB___db_cputchk(dbc->dbp, key, data, flags,
589 F_ISSET(dbc->dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0)
590 return (ret);
591
592 DEBUG_LWRITE(dbc, dbc->txn, "ram_c_put", NULL, data, flags);
593
594 /*
595 * If we are running CDB, this had better be either a write
596 * cursor or an immediate writer. If it's a regular writer,
597 * that means we have an IWRITE lock and we need to upgrade
598 * it to a write lock.
599 */
600 if (F_ISSET(dbp->dbenv, DB_ENV_CDB)) {
601 if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))
602 return (EINVAL);
603
604 if (F_ISSET(dbc, DBC_WRITECURSOR) &&
605 (ret = CDB_lock_get(dbp->dbenv, dbc->locker, DB_LOCK_UPGRADE,
606 &dbc->lock_dbt, DB_LOCK_WRITE, &dbc->mylock)) != 0)
607 return (ret);
608 }
609
610 /* Initialize the cursor for a new retrieval. */
611 copy = *cp;
612
613 /*
614 * To split, we need a valid key for the page.
615 *
616 * The split code discards all short-term locks and stack pages.
617 */
618 if (0) {
619 split: arg = &cp->recno;
620 if ((ret = CDB___bam_split(dbc, arg)) != 0)
621 goto err;
622 }
623
624 if ((ret = CDB___bam_rsearch(dbc, &cp->recno, S_INSERT, 1, &exact)) != 0)
625 goto err;
626 if (!exact) {
627 ret = DB_NOTFOUND;
628 goto err;
629 }
630 if ((ret = CDB___bam_iitem(dbc, &cp->csp->page,
631 &cp->csp->indx, key, data, flags, 0)) == DB_NEEDSPLIT) {
632 if ((ret = CDB___bam_stkrel(dbc, 0)) != 0)
633 goto err;
634 goto split;
635 }
636 if ((ret = CDB___bam_stkrel(dbc, 0)) != 0)
637 goto err;
638
639 switch (flags) {
640 case DB_AFTER:
641 /* Adjust the cursors. */
642 CDB___ram_ca(dbp, cp->recno, CA_IAFTER);
643
644 /* Set this cursor to reference the new record. */
645 cp->recno = copy.recno + 1;
646 break;
647 case DB_BEFORE:
648 /* Adjust the cursors. */
649 CDB___ram_ca(dbp, cp->recno, CA_IBEFORE);
650
651 /* Set this cursor to reference the new record. */
652 cp->recno = copy.recno;
653 break;
654 }
655
656 /* Return the key if we've created a new record. */
657 if ((flags == DB_AFTER || flags == DB_BEFORE) &&
658 (ret = CDB___db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno),
659 &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
660 goto err;
661
662 /* The cursor was reset, no further delete adjustment is necessary. */
663 CD_CLR(dbp, cp);
664
665 err: if (F_ISSET(dbc, DBC_WRITECURSOR))
666 (void)CDB___lock_downgrade(dbp->dbenv,
667 &dbc->mylock, DB_LOCK_IWRITE, 0);
668
669 if (ret != 0)
670 *cp = copy;
671
672 return (ret);
673 }
674
675 /*
676 * CDB___ram_ca --
677 * Adjust cursors.
678 *
679 * PUBLIC: void CDB___ram_ca __P((DB *, db_recno_t, ca_recno_arg));
680 */
681 void
CDB___ram_ca(dbp,recno,op)682 CDB___ram_ca(dbp, recno, op)
683 DB *dbp;
684 db_recno_t recno;
685 ca_recno_arg op;
686 {
687 BTREE_CURSOR *cp;
688 DBC *dbc;
689 db_recno_t nrecs;
690
691 /*
692 * Adjust the cursors. See the comment in CDB___bam_ca_delete().
693 */
694 MUTEX_THREAD_LOCK(dbp->mutexp);
695 for (dbc = TAILQ_FIRST(&dbp->active_queue);
696 dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
697 cp = dbc->internal;
698 switch (op) {
699 case CA_DELETE:
700 if (recno < cp->recno)
701 --cp->recno;
702 if (recno == cp->recno) {
703 if (CDB___bam_nrecs(
704 dbc, &nrecs) == 0 && recno > nrecs)
705 --cp->recno;
706 else
707 CD_SET(dbp, cp);
708 }
709 break;
710 case CA_IAFTER:
711 if (recno < cp->recno)
712 ++cp->recno;
713 break;
714 case CA_IBEFORE:
715 if (recno <= cp->recno)
716 ++cp->recno;
717 break;
718 }
719 }
720 MUTEX_THREAD_UNLOCK(dbp->mutexp);
721 }
722
723 /*
724 * CDB___ram_getno --
725 * Check the user's record number, and make sure we've seen it.
726 *
727 * PUBLIC: int CDB___ram_getno __P((DBC *, const DBT *, db_recno_t *, int));
728 */
729 int
CDB___ram_getno(dbc,key,rep,can_create)730 CDB___ram_getno(dbc, key, rep, can_create)
731 DBC *dbc;
732 const DBT *key;
733 db_recno_t *rep;
734 int can_create;
735 {
736 DB *dbp;
737 db_recno_t recno;
738
739 dbp = dbc->dbp;
740
741 /* Check the user's record number. */
742 if ((recno = *(db_recno_t *)key->data) == 0) {
743 CDB___db_err(dbp->dbenv, "illegal record number of 0");
744 return (EINVAL);
745 }
746 if (rep != NULL)
747 *rep = recno;
748
749 /*
750 * Btree can neither create records nor read them in. Recno can
751 * do both, see if we can find the record.
752 */
753 return (dbp->type == DB_RECNO ?
754 CDB___ram_update(dbc, recno, can_create) : 0);
755 }
756
757 /*
758 * CDB___ram_update --
759 * Ensure the tree has records up to and including the specified one.
760 */
761 static int
CDB___ram_update(dbc,recno,can_create)762 CDB___ram_update(dbc, recno, can_create)
763 DBC *dbc;
764 db_recno_t recno;
765 int can_create;
766 {
767 BTREE *t;
768 DB *dbp;
769 db_recno_t nrecs;
770 int ret;
771
772 dbp = dbc->dbp;
773 t = dbp->bt_internal;
774
775 /*
776 * If we can't create records and we've read the entire backing input
777 * file, we're done.
778 */
779 if (!can_create && F_ISSET(t, RECNO_EOF))
780 return (0);
781
782 /*
783 * If we haven't seen this record yet, try to get it from the original
784 * file.
785 */
786 if ((ret = CDB___bam_nrecs(dbc, &nrecs)) != 0)
787 return (ret);
788 if (!F_ISSET(t, RECNO_EOF) && recno > nrecs) {
789 if ((ret = t->re_irec(dbc, recno)) != 0)
790 return (ret);
791 if ((ret = CDB___bam_nrecs(dbc, &nrecs)) != 0)
792 return (ret);
793 }
794
795 /*
796 * If we can create records, create empty ones up to the requested
797 * record.
798 */
799 if (!can_create || recno <= nrecs + 1)
800 return (0);
801
802 dbc->rdata.dlen = 0;
803 dbc->rdata.doff = 0;
804 dbc->rdata.flags = 0;
805 if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
806 if (dbc->rdata.ulen < t->re_len) {
807 if ((ret = CDB___os_realloc(t->re_len,
808 NULL, &dbc->rdata.data)) != 0) {
809 dbc->rdata.ulen = 0;
810 dbc->rdata.data = NULL;
811 return (ret);
812 }
813 dbc->rdata.ulen = t->re_len;
814 }
815 dbc->rdata.size = t->re_len;
816 memset(dbc->rdata.data, t->re_pad, t->re_len);
817 } else
818 dbc->rdata.size = 0;
819
820 while (recno > ++nrecs)
821 if ((ret = CDB___ram_add(dbc,
822 &nrecs, &dbc->rdata, 0, BI_DELETED)) != 0)
823 return (ret);
824 return (0);
825 }
826
827 /*
828 * CDB___ram_source --
829 * Load information about the backing file.
830 */
831 static int
CDB___ram_source(dbp)832 CDB___ram_source(dbp)
833 DB *dbp;
834 {
835 BTREE *t;
836 size_t size;
837 u_int32_t bytes, mbytes;
838 int ret;
839
840 t = dbp->bt_internal;
841
842 /*
843 * !!!
844 * The caller has full responsibility for cleaning up on error --
845 * (it has to anyway, in case it fails after this routine succeeds).
846 */
847 if ((ret = CDB___db_appname(dbp->dbenv,
848 DB_APP_DATA, NULL, t->re_source, 0, NULL, &t->re_source)) != 0)
849 return (ret);
850
851 /*
852 * !!!
853 * It's possible that the backing source file is read-only. We don't
854 * much care other than we'll complain if there are any modifications
855 * when it comes time to write the database back to the source.
856 */
857 ret = CDB___os_open(t->re_source,
858 F_ISSET(dbp, DB_AM_RDONLY) ? DB_OSO_RDONLY : 0, 0, &t->re_fh);
859 if (ret != 0 && !F_ISSET(dbp, DB_AM_RDONLY))
860 ret = CDB___os_open(t->re_source, DB_OSO_RDONLY, 0, &t->re_fh);
861 if (ret != 0) {
862 CDB___db_err(dbp->dbenv, "%s: %s", t->re_source, CDB_db_strerror(ret));
863 return (ret);
864 }
865
866 /*
867 * XXX
868 * We'd like to test to see if the file is too big to mmap. Since we
869 * don't know what size or type off_t's or size_t's are, or the largest
870 * unsigned integral type is, or what random insanity the local C
871 * compiler will perpetrate, doing the comparison in a portable way is
872 * flatly impossible. Hope that mmap fails if the file is too large.
873 */
874 if ((ret = CDB___os_ioinfo(t->re_source,
875 &t->re_fh, &mbytes, &bytes, NULL)) != 0) {
876 CDB___db_err(dbp->dbenv, "%s: %s", t->re_source, CDB_db_strerror(ret));
877 return (ret);
878 }
879 if (mbytes == 0 && bytes == 0) {
880 F_SET(t, RECNO_EOF);
881 return (0);
882 }
883
884 size = mbytes * MEGABYTE + bytes;
885 if ((ret = CDB___os_mapfile(dbp->dbenv, t->re_source,
886 &t->re_fh, (size_t)size, 1, &t->re_smap)) != 0)
887 return (ret);
888 t->re_cmap = t->re_smap;
889 t->re_emap = (u_int8_t *)t->re_smap + (t->re_msize = size);
890 t->re_irec = F_ISSET(dbp, DB_RE_FIXEDLEN) ? CDB___ram_fmap : CDB___ram_vmap;
891 return (0);
892 }
893
894 /*
895 * CDB___ram_writeback --
896 * Rewrite the backing file.
897 *
898 * PUBLIC: int CDB___ram_writeback __P((DB *));
899 */
900 int
CDB___ram_writeback(dbp)901 CDB___ram_writeback(dbp)
902 DB *dbp;
903 {
904 BTREE *t;
905 DBC *dbc;
906 DBT key, data;
907 DB_FH fh;
908 db_recno_t keyno;
909 ssize_t nw;
910 int ret, t_ret;
911 u_int8_t delim, *pad;
912
913 t = dbp->bt_internal;
914
915 /* If the file wasn't modified, we're done. */
916 if (!F_ISSET(t, RECNO_MODIFIED))
917 return (0);
918
919 /* If there's no backing source file, we're done. */
920 if (t->re_source == NULL) {
921 F_CLR(t, RECNO_MODIFIED);
922 return (0);
923 }
924
925 /* Allocate a cursor. */
926 if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
927 return (ret);
928
929 /*
930 * Read any remaining records into the tree.
931 *
932 * !!!
933 * This is why we can't support transactions when applications specify
934 * backing (re_source) files. At this point we have to read in the
935 * rest of the records from the file so that we can write all of the
936 * records back out again, which could modify a page for which we'd
937 * have to log changes and which we don't have locked. This could be
938 * partially fixed by taking a snapshot of the entire file during the
939 * DB->open as DB->open is transaction protected. But, if a checkpoint
940 * occurs then, the part of the log holding the copy of the file could
941 * be discarded, and that would make it impossible to recover in the
942 * face of disaster. This could all probably be fixed, but it would
943 * require transaction protecting the backing source file, i.e. mpool
944 * would have to know about it, and we don't want to go there.
945 */
946 if ((ret =
947 CDB___ram_update(dbc, DB_MAX_RECORDS, 0)) != 0 && ret != DB_NOTFOUND)
948 return (ret);
949
950 /*
951 * !!!
952 * Close any underlying mmap region. This is required for Windows NT
953 * (4.0, Service Pack 2) -- if the file is still mapped, the following
954 * open will fail.
955 */
956 if (t->re_smap != NULL) {
957 (void)CDB___os_unmapfile(dbp->dbenv, t->re_smap, t->re_msize);
958 t->re_smap = NULL;
959 }
960
961 /* Get rid of any backing file descriptor, just on GP's. */
962 if (F_ISSET(&t->re_fh, DB_FH_VALID))
963 (void)CDB___os_closehandle(&t->re_fh);
964
965 /* Open the file, truncating it. */
966 if ((ret = CDB___os_open(
967 t->re_source, DB_OSO_SEQ | DB_OSO_TRUNC, 0, &fh)) != 0) {
968 CDB___db_err(dbp->dbenv, "%s: %s", t->re_source, CDB_db_strerror(ret));
969 goto err;
970 }
971
972 /*
973 * We step through the records, writing each one out. Use the record
974 * number and the dbp->get() function, instead of a cursor, so we find
975 * and write out "deleted" or non-existent records.
976 */
977 memset(&key, 0, sizeof(key));
978 memset(&data, 0, sizeof(data));
979 key.size = sizeof(db_recno_t);
980 key.data = &keyno;
981
982 /*
983 * We'll need the delimiter if we're doing variable-length records,
984 * and the pad character if we're doing fixed-length records.
985 */
986 delim = t->re_delim;
987 if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
988 if ((ret = CDB___os_malloc(t->re_len, NULL, &pad)) != 0)
989 goto err;
990 memset(pad, t->re_pad, t->re_len);
991 } else
992 COMPQUIET(pad, NULL);
993 for (keyno = 1;; ++keyno) {
994 switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
995 case 0:
996 if ((ret =
997 CDB___os_write(&fh, data.data, data.size, &nw)) != 0)
998 goto err;
999 if (nw != (ssize_t)data.size) {
1000 ret = EIO;
1001 goto err;
1002 }
1003 break;
1004 case DB_KEYEMPTY:
1005 if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
1006 if ((ret = CDB___os_write(
1007 &fh, pad, t->re_len, &nw)) != 0)
1008 goto err;
1009 if (nw != (ssize_t)t->re_len) {
1010 ret = EIO;
1011 goto err;
1012 }
1013 }
1014 break;
1015 case DB_NOTFOUND:
1016 ret = 0;
1017 goto done;
1018 }
1019 if (!F_ISSET(dbp, DB_RE_FIXEDLEN)) {
1020 if ((ret = CDB___os_write(&fh, &delim, 1, &nw)) != 0)
1021 goto err;
1022 if (nw != 1) {
1023 ret = EIO;
1024 goto err;
1025 }
1026 }
1027 }
1028
1029 err:
1030 done: /* Close the file descriptor. */
1031 if (F_ISSET(&fh, DB_FH_VALID) &&
1032 (t_ret = CDB___os_closehandle(&fh)) != 0 && ret == 0)
1033 ret = t_ret;
1034
1035 /* Discard the cursor. */
1036 if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
1037 ret = t_ret;
1038
1039 if (ret == 0)
1040 F_CLR(t, RECNO_MODIFIED);
1041
1042 return (ret);
1043 }
1044
1045 /*
1046 * CDB___ram_fmap --
1047 * Get fixed length records from a file.
1048 */
1049 static int
CDB___ram_fmap(dbc,top)1050 CDB___ram_fmap(dbc, top)
1051 DBC *dbc;
1052 db_recno_t top;
1053 {
1054 BTREE *t;
1055 DB *dbp;
1056 DBT data;
1057 db_recno_t recno;
1058 u_int32_t len;
1059 u_int8_t *sp, *ep, *p;
1060 int is_modified, ret;
1061
1062 dbp = dbc->dbp;
1063 t = dbp->bt_internal;
1064
1065 if ((ret = CDB___bam_nrecs(dbc, &recno)) != 0)
1066 return (ret);
1067
1068 if (dbc->rdata.ulen < t->re_len) {
1069 if ((ret =
1070 CDB___os_realloc(t->re_len, NULL, &dbc->rdata.data)) != 0) {
1071 dbc->rdata.ulen = 0;
1072 dbc->rdata.data = NULL;
1073 return (ret);
1074 }
1075 dbc->rdata.ulen = t->re_len;
1076 }
1077
1078 is_modified = F_ISSET(t, RECNO_MODIFIED);
1079
1080 memset(&data, 0, sizeof(data));
1081 data.data = dbc->rdata.data;
1082 data.size = t->re_len;
1083
1084 sp = (u_int8_t *)t->re_cmap;
1085 ep = (u_int8_t *)t->re_emap;
1086 while (recno < top) {
1087 if (sp >= ep) {
1088 F_SET(t, RECNO_EOF);
1089 ret = DB_NOTFOUND;
1090 goto err;
1091 }
1092 len = t->re_len;
1093 for (p = dbc->rdata.data;
1094 sp < ep && len > 0; *p++ = *sp++, --len)
1095 ;
1096
1097 /*
1098 * Another process may have read this record from the input
1099 * file and stored it into the database already, in which
1100 * case we don't need to repeat that operation. We detect
1101 * this by checking if the last record we've read is greater
1102 * or equal to the number of records in the database.
1103 *
1104 * XXX
1105 * We should just do a seek, since the records are fixed
1106 * length.
1107 */
1108 if (t->re_last >= recno) {
1109 if (len != 0)
1110 memset(p, t->re_pad, len);
1111
1112 ++recno;
1113 if ((ret = CDB___ram_add(dbc, &recno, &data, 0, 0)) != 0)
1114 goto err;
1115 }
1116 ++t->re_last;
1117 }
1118 t->re_cmap = sp;
1119
1120 err: if (!is_modified)
1121 F_CLR(t, RECNO_MODIFIED);
1122
1123 return (0);
1124 }
1125
1126 /*
1127 * CDB___ram_vmap --
1128 * Get variable length records from a file.
1129 */
1130 static int
CDB___ram_vmap(dbc,top)1131 CDB___ram_vmap(dbc, top)
1132 DBC *dbc;
1133 db_recno_t top;
1134 {
1135 BTREE *t;
1136 DBT data;
1137 db_recno_t recno;
1138 u_int8_t *sp, *ep;
1139 int delim, is_modified, ret;
1140
1141 t = dbc->dbp->bt_internal;
1142
1143 if ((ret = CDB___bam_nrecs(dbc, &recno)) != 0)
1144 return (ret);
1145
1146 delim = t->re_delim;
1147 is_modified = F_ISSET(t, RECNO_MODIFIED);
1148
1149 memset(&data, 0, sizeof(data));
1150
1151 sp = (u_int8_t *)t->re_cmap;
1152 ep = (u_int8_t *)t->re_emap;
1153 while (recno < top) {
1154 if (sp >= ep) {
1155 F_SET(t, RECNO_EOF);
1156 ret = DB_NOTFOUND;
1157 goto err;
1158 }
1159 for (data.data = sp; sp < ep && *sp != delim; ++sp)
1160 ;
1161
1162 /*
1163 * Another process may have read this record from the input
1164 * file and stored it into the database already, in which
1165 * case we don't need to repeat that operation. We detect
1166 * this by checking if the last record we've read is greater
1167 * or equal to the number of records in the database.
1168 */
1169 if (t->re_last >= recno) {
1170 data.size = sp - (u_int8_t *)data.data;
1171 ++recno;
1172 if ((ret = CDB___ram_add(dbc, &recno, &data, 0, 0)) != 0)
1173 goto err;
1174 }
1175 ++t->re_last;
1176 ++sp;
1177 }
1178 t->re_cmap = sp;
1179
1180 err: if (!is_modified)
1181 F_CLR(t, RECNO_MODIFIED);
1182
1183 return (ret);
1184 }
1185
1186 /*
1187 * CDB___ram_add --
1188 * Add records into the tree.
1189 */
1190 static int
CDB___ram_add(dbc,recnop,data,flags,bi_flags)1191 CDB___ram_add(dbc, recnop, data, flags, bi_flags)
1192 DBC *dbc;
1193 db_recno_t *recnop;
1194 DBT *data;
1195 u_int32_t flags, bi_flags;
1196 {
1197 BKEYDATA *bk;
1198 BTREE_CURSOR *cp;
1199 PAGE *h;
1200 db_indx_t indx;
1201 int exact, ret, stack;
1202
1203 cp = dbc->internal;
1204
1205 retry: /* Find the slot for insertion. */
1206 if ((ret = CDB___bam_rsearch(dbc, recnop,
1207 S_INSERT | (flags == DB_APPEND ? S_APPEND : 0), 1, &exact)) != 0)
1208 return (ret);
1209 h = cp->csp->page;
1210 indx = cp->csp->indx;
1211 stack = 1;
1212
1213 /*
1214 * If re-numbering records, the on-page deleted flag means this record
1215 * was implicitly created. If not re-numbering records, the on-page
1216 * deleted flag means this record was implicitly created, or, it was
1217 * deleted at some time.
1218 *
1219 * If DB_NOOVERWRITE is set and the item already exists in the tree,
1220 * return an error unless the item was either marked for deletion or
1221 * only implicitly created.
1222 */
1223 if (exact) {
1224 bk = GET_BKEYDATA(h, indx);
1225 if (!B_DISSET(bk->type) && flags == DB_NOOVERWRITE) {
1226 ret = DB_KEYEXIST;
1227 goto err;
1228 }
1229 }
1230
1231 /*
1232 * Select the arguments for CDB___bam_iitem() and do the insert. If the
1233 * key is an exact match, or we're replacing the data item with a
1234 * new data item, replace the current item. If the key isn't an exact
1235 * match, we're inserting a new key/data pair, before the search
1236 * location.
1237 */
1238 switch (ret = CDB___bam_iitem(dbc,
1239 &h, &indx, NULL, data, exact ? DB_CURRENT : DB_BEFORE, bi_flags)) {
1240 case 0:
1241 /*
1242 * Don't adjust anything.
1243 *
1244 * If we inserted a record, no cursors need adjusting because
1245 * the only new record it's possible to insert is at the very
1246 * end of the tree. The necessary adjustments to the internal
1247 * page counts were made by CDB___bam_iitem().
1248 *
1249 * If we overwrote a record, no cursors need adjusting because
1250 * future DBcursor->get calls will simply return the underlying
1251 * record (there's no adjustment made for the DB_CURRENT flag
1252 * when a cursor get operation immediately follows a cursor
1253 * delete operation, and the normal adjustment for the DB_NEXT
1254 * flag is still correct).
1255 */
1256 break;
1257 case DB_NEEDSPLIT:
1258 /* Discard the stack of pages and split the page. */
1259 (void)CDB___bam_stkrel(dbc, 0);
1260 stack = 0;
1261
1262 if ((ret = CDB___bam_split(dbc, recnop)) != 0)
1263 goto err;
1264
1265 goto retry;
1266 /* NOTREACHED */
1267 default:
1268 goto err;
1269 }
1270
1271
1272 err: if (stack)
1273 CDB___bam_stkrel(dbc, 0);
1274
1275 return (ret);
1276 }
1277