1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1997, 1998, 1999
5  *	Sleepycat Software.  All rights reserved.
6  */
7 
8 #include "db_config.h"
9 
10 #ifndef lint
11 static const char sccsid[] = "@(#)bt_recno.c	11.9 (Sleepycat) 10/29/99";
12 #endif /* not lint */
13 
14 #ifndef NO_SYSTEM_INCLUDES
15 #include <sys/types.h>
16 
17 #include <errno.h>
18 #include <limits.h>
19 #include <string.h>
20 #endif
21 
22 #include "db_int.h"
23 #include "db_page.h"
24 #include "btree.h"
25 #include "db_ext.h"
26 #include "db_shash.h"
27 #include "lock.h"
28 #include "lock_ext.h"
29 #include "qam.h"
30 
31 static int CDB___ram_add __P((DBC *, db_recno_t *, DBT *, u_int32_t, u_int32_t));
32 static int CDB___ram_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
33 static int CDB___ram_fmap __P((DBC *, db_recno_t));
34 static int CDB___ram_i_delete __P((DBC *));
35 static int CDB___ram_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
36 static int CDB___ram_source __P((DB *));
37 static int CDB___ram_update __P((DBC *, db_recno_t, int));
38 static int CDB___ram_vmap __P((DBC *, db_recno_t));
39 
40 /*
41  * In recno, there are two meanings to the on-page "deleted" flag.  If we're
42  * re-numbering records, it means the record was implicitly created.  We skip
43  * over implicitly created records if doing a cursor "next" or "prev", and
44  * return DB_KEYEMPTY if they're explicitly requested..  If not re-numbering
45  * records, it means that the record was implicitly created, or was deleted.
46  * We skip over implicitly created or deleted records if doing a cursor "next"
47  * or "prev", and return DB_KEYEMPTY if they're explicitly requested.
48  *
49  * If we're re-numbering records, then we have to detect in the cursor that
50  * a record was deleted, and adjust the cursor as necessary on the next get.
51  * If we're not re-numbering records, then we can detect that a record has
52  * been deleted by looking at the actual on-page record, so we completely
53  * ignore the cursor's delete flag.  This is different from the B+tree code.
54  * It also maintains whether the cursor references a deleted record in the
55  * cursor, and it doesn't always check the on-page value.
56  */
57 #define	CD_SET(dbp, cp) {						\
58 	if (F_ISSET(dbp, DB_RE_RENUMBER))				\
59 		F_SET(cp, C_DELETED);					\
60 }
61 #define	CD_CLR(dbp, cp) {						\
62 	if (F_ISSET(dbp, DB_RE_RENUMBER))				\
63 		F_CLR(cp, C_DELETED);					\
64 }
65 #define	CD_ISSET(dbp, cp)						\
66 	(F_ISSET(dbp, DB_RE_RENUMBER) && F_ISSET(cp, C_DELETED))
67 
68 /*
69  * CDB___ram_open --
70  *	Recno open function.
71  *
72  * PUBLIC: int CDB___ram_open __P((DB *, const char *, db_pgno_t));
73  */
74 int
CDB___ram_open(dbp,name,base_pgno)75 CDB___ram_open(dbp, name, base_pgno)
76 	DB *dbp;
77 	const char *name;
78 	db_pgno_t base_pgno;
79 {
80 	BTREE *t;
81 	DBC *dbc;
82 	int ret, t_ret;
83 
84 	t = dbp->bt_internal;
85 
86 	/* Initialize the remaining fields/methods of the DB. */
87 	dbp->del = CDB___ram_delete;
88 	dbp->put = CDB___ram_put;
89 	dbp->stat = CDB___bam_stat;
90 
91 	/* Set the overflow page size. */
92 	CDB___bam_setovflsize(dbp);
93 
94 	/* Start up the tree. */
95 	if ((ret = CDB___bam_read_root(dbp, name, base_pgno)) != 0)
96 		goto err;
97 
98 	/*
99 	 * If the user specified a source tree, open it and map it in.
100 	 *
101 	 * !!!
102 	 * We don't complain if the user specified transactions or threads.
103 	 * It's possible to make it work, but you'd better know what you're
104 	 * doing!
105 	 */
106 	if (t->re_source == NULL)
107 		F_SET(t, RECNO_EOF);
108 	else
109 		if ((ret = CDB___ram_source(dbp)) != 0)
110 			goto err;
111 
112 	/* If we're snapshotting an underlying source file, do it now. */
113 	if (F_ISSET(dbp, DB_RE_SNAPSHOT)) {
114 		/* Allocate a cursor. */
115 		if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
116 			goto err;
117 
118 		/* Do the snapshot. */
119 		if ((ret = CDB___ram_update(dbc,
120 		    DB_MAX_RECORDS, 0)) != 0 && ret == DB_NOTFOUND)
121 			ret = 0;
122 
123 		/* Discard the cursor. */
124 		if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
125 			ret = t_ret;
126 
127 		if (ret != 0)
128 			goto err;
129 	}
130 
131 	return (0);
132 
133 err:	/* If we mmap'd a source file, discard it. */
134 	if (t->re_smap != NULL)
135 		(void)CDB___os_unmapfile(dbp->dbenv, t->re_smap, t->re_msize);
136 
137 	/* If we opened a source file, discard it. */
138 	if (F_ISSET(&t->re_fh, DB_FH_VALID))
139 		(void)CDB___os_closehandle(&t->re_fh);
140 	if (t->re_source != NULL)
141 		CDB___os_freestr(t->re_source);
142 
143 	return (ret);
144 }
145 
146 /*
147  * CDB___ram_delete --
148  *	Recno db->del function.
149  */
150 static int
CDB___ram_delete(dbp,txn,key,flags)151 CDB___ram_delete(dbp, txn, key, flags)
152 	DB *dbp;
153 	DB_TXN *txn;
154 	DBT *key;
155 	u_int32_t flags;
156 {
157 	BTREE_CURSOR *cp;
158 	DBC *dbc;
159 	db_recno_t recno;
160 	int ret, t_ret;
161 
162 	PANIC_CHECK(dbp->dbenv);
163 
164 	/* Check for invalid flags. */
165 	if ((ret = CDB___db_delchk(dbp,
166 	    key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
167 		return (ret);
168 
169 	/* Acquire a cursor. */
170 	if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
171 		return (ret);
172 
173 	DEBUG_LWRITE(dbc, txn, "ram_delete", key, NULL, flags);
174 
175 	/* Check the user's record number and fill in as necessary. */
176 	if ((ret = CDB___ram_getno(dbc, key, &recno, 0)) != 0)
177 		goto err;
178 
179 	/* Do the delete. */
180 	cp = dbc->internal;
181 	cp->recno = recno;
182 	ret = CDB___ram_i_delete(dbc);
183 
184 	/* Release the cursor. */
185 err:	if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
186 		ret = t_ret;
187 
188 	return (ret);
189 }
190 
191 /*
192  * CDB___ram_i_delete --
193  *	Internal version of recno delete, called by CDB___ram_delete and
194  *	CDB___ram_c_del.
195  */
196 static int
CDB___ram_i_delete(dbc)197 CDB___ram_i_delete(dbc)
198 	DBC *dbc;
199 {
200 	BKEYDATA bk;
201 	BTREE *t;
202 	BTREE_CURSOR *cp;
203 	DB *dbp;
204 	DBT hdr, data;
205 	PAGE *h;
206 	db_indx_t indx;
207 	int exact, ret, stack;
208 
209 	dbp = dbc->dbp;
210 	cp = dbc->internal;
211 	t = dbp->bt_internal;
212 	stack = 0;
213 
214 	/*
215 	 * If this is CDB and this isn't a write cursor, then it's an error.
216 	 * If it is a write cursor, but we don't yet hold the write lock, then
217 	 * we need to upgrade to the write lock.
218 	 */
219 	if (F_ISSET(dbp->dbenv, DB_ENV_CDB)) {
220 		/* Make sure it's a valid update cursor. */
221 		if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))
222 			return (EINVAL);
223 
224 		if (F_ISSET(dbc, DBC_WRITECURSOR) &&
225 		    (ret = CDB_lock_get(dbp->dbenv, dbc->locker,
226 		    DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
227 		    &dbc->mylock)) != 0)
228 			return (ret);
229 	}
230 
231 	/* Search the tree for the key; delete only deletes exact matches. */
232 	if ((ret = CDB___bam_rsearch(dbc, &cp->recno, S_DELETE, 1, &exact)) != 0)
233 		goto err;
234 	if (!exact) {
235 		ret = DB_NOTFOUND;
236 		goto err;
237 	}
238 	stack = 1;
239 
240 	h = cp->csp->page;
241 	indx = cp->csp->indx;
242 
243 	/*
244 	 * If re-numbering records, the on-page deleted flag can only mean
245 	 * that this record was implicitly created.  Applications aren't
246 	 * permitted to delete records they never created, return an error.
247 	 *
248 	 * If not re-numbering records, the on-page deleted flag means that
249 	 * this record was implicitly created, or, was deleted at some time.
250 	 * The former is an error because applications aren't permitted to
251 	 * delete records they never created, the latter is an error because
252 	 * if the record was "deleted", we could never have found it.
253 	 */
254 	if (B_DISSET(GET_BKEYDATA(h, indx)->type)) {
255 		ret = DB_KEYEMPTY;
256 		goto err;
257 	}
258 
259 	if (F_ISSET(dbp, DB_RE_RENUMBER)) {
260 		/* Delete the item, adjust the counts, adjust the cursors. */
261 		if ((ret = CDB___bam_ditem(dbc, h, indx)) != 0)
262 			goto err;
263 		CDB___bam_adjust(dbc, -1);
264 		CDB___ram_ca(dbp, cp->recno, CA_DELETE);
265 
266 		/*
267 		 * If the page is empty, delete it.   The whole tree is locked
268 		 * so there are no preparations to make.
269 		 */
270 		if (NUM_ENT(h) == 0 && h->pgno != t->bt_root) {
271 			stack = 0;
272 			ret = CDB___bam_dpages(dbc);
273 		}
274 	} else {
275 		/* Use a delete/put pair to replace the record with a marker. */
276 		if ((ret = CDB___bam_ditem(dbc, h, indx)) != 0)
277 			goto err;
278 
279 		B_TSET(bk.type, B_KEYDATA, 1);
280 		bk.len = 0;
281 		memset(&hdr, 0, sizeof(hdr));
282 		hdr.data = &bk;
283 		hdr.size = SSZA(BKEYDATA, data);
284 		memset(&data, 0, sizeof(data));
285 		data.data = (void *)"";
286 		data.size = 0;
287 		if ((ret = CDB___db_pitem(dbc,
288 		    h, indx, BKEYDATA_SIZE(0), &hdr, &data)) != 0)
289 			goto err;
290 	}
291 	F_SET(t, RECNO_MODIFIED);
292 
293 err:	if (stack)
294 		CDB___bam_stkrel(dbc, 0);
295 
296 	/* If we upgraded the CDB lock upon entry; downgrade it now. */
297 	if (F_ISSET(dbc, DBC_WRITECURSOR))
298 		(void)CDB___lock_downgrade(dbp->dbenv,
299 		    &dbc->mylock, DB_LOCK_IWRITE, 0);
300 	return (ret);
301 }
302 
303 /*
304  * CDB___ram_put --
305  *	Recno db->put function.
306  */
307 static int
CDB___ram_put(dbp,txn,key,data,flags)308 CDB___ram_put(dbp, txn, key, data, flags)
309 	DB *dbp;
310 	DB_TXN *txn;
311 	DBT *key, *data;
312 	u_int32_t flags;
313 {
314 	DBC *dbc;
315 	db_recno_t recno;
316 	int ret, t_ret;
317 
318 	PANIC_CHECK(dbp->dbenv);
319 
320 	/* Check for invalid flags. */
321 	if ((ret = CDB___db_putchk(dbp,
322 	    key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), 0)) != 0)
323 		return (ret);
324 
325 	/* Allocate a cursor. */
326 	if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
327 		return (ret);
328 
329 	DEBUG_LWRITE(dbc, txn, "ram_put", key, data, flags);
330 
331 	/*
332 	 * If we're appending to the tree, make sure we've read in all of
333 	 * the backing source file.  Otherwise, check the user's record
334 	 * number and fill in as necessary.
335 	 */
336 	if (flags == DB_APPEND) {
337 		if ((ret = CDB___ram_update(
338 		    dbc, DB_MAX_RECORDS, 0)) != 0 && ret == DB_NOTFOUND)
339 			ret = 0;
340 	} else
341 		ret = CDB___ram_getno(dbc, key, &recno, 1);
342 
343 	/* Add the record. */
344 	if (ret == 0)
345 		ret = CDB___ram_add(dbc, &recno, data, flags, 0);
346 
347 	/* Discard the cursor. */
348 	if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
349 		ret = t_ret;
350 
351 	/* Return the record number if we're appending to the tree. */
352 	if (ret == 0 && flags == DB_APPEND)
353 		*(db_recno_t *)key->data = recno;
354 
355 	return (ret);
356 }
357 
358 /*
359  * CDB___ram_c_del --
360  *	Recno cursor->c_del function.
361  *
362  * PUBLIC: int CDB___ram_c_del __P((DBC *, u_int32_t));
363  */
364 int
CDB___ram_c_del(dbc,flags)365 CDB___ram_c_del(dbc, flags)
366 	DBC *dbc;
367 	u_int32_t flags;
368 {
369 	BTREE_CURSOR *cp;
370 	DB *dbp;
371 	int ret;
372 
373 	dbp = dbc->dbp;
374 	cp = dbc->internal;
375 
376 	PANIC_CHECK(dbp->dbenv);
377 
378 	/* Check for invalid flags. */
379 	if ((ret = CDB___db_cdelchk(dbp, flags,
380 	    F_ISSET(dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0)
381 		return (ret);
382 
383 	DEBUG_LWRITE(dbc, dbc->txn, "ram_c_del", NULL, NULL, flags);
384 
385 	/*
386 	 * The semantics of cursors during delete are as follows: if record
387 	 * numbers are mutable (DB_RE_RENUMBER is set), deleting a record
388 	 * causes the cursor to automatically point to the record immediately
389 	 * following.  In this case it is possible to use a single cursor for
390 	 * repeated delete operations, without intervening operations.
391 	 *
392 	 * If record numbers are not mutable, then records are replaced with
393 	 * a marker containing a delete flag.  If the record referenced by
394 	 * this cursor has already been deleted, we will detect that as part
395 	 * of the delete operation, and fail.
396 	 */
397 	return (CDB___ram_i_delete(dbc));
398 }
399 
400 /*
401  * CDB___ram_c_get --
402  *	Recno cursor->c_get function.
403  *
404  * PUBLIC: int CDB___ram_c_get __P((DBC *, DBT *, DBT *, u_int32_t));
405  */
406 int
CDB___ram_c_get(dbc,key,data,flags)407 CDB___ram_c_get(dbc, key, data, flags)
408 	DBC *dbc;
409 	DBT *key, *data;
410 	u_int32_t flags;
411 {
412 	BTREE_CURSOR *cp, copy;
413 	DB *dbp;
414 	PAGE *h;
415 	db_indx_t indx;
416 	int exact, ret, stack, tmp_rmw;
417 
418 	dbp = dbc->dbp;
419 	cp = dbc->internal;
420 
421 	PANIC_CHECK(dbp->dbenv);
422 
423 	/* Check for invalid flags. */
424 	if ((ret = CDB___db_cgetchk(dbc->dbp,
425 	    key, data, flags, cp->recno != RECNO_OOB)) != 0)
426 		return (ret);
427 
428 	/* Clear OR'd in additional bits so we can check for flag equality. */
429 	tmp_rmw = 0;
430 	if (LF_ISSET(DB_RMW)) {
431 		tmp_rmw = 1;
432 		F_SET(dbc, DBC_RMW);
433 		LF_CLR(DB_RMW);
434 	}
435 
436 	DEBUG_LREAD(dbc, dbc->txn, "ram_c_get",
437 	    flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
438 
439 	/* Initialize the cursor for a new retrieval. */
440 	copy = *cp;
441 
442 retry:	/* Update the record number. */
443 	stack = 0;
444 	switch (flags) {
445 	case DB_CURRENT:
446 		/*
447 		 * If record numbers are mutable: if we just deleted a record,
448 		 * there is no action necessary, we return the record following
449 		 * the deleted item by virtue of renumbering the tree.
450 		 */
451 		break;
452 	case DB_NEXT:
453 		/*
454 		 * If record numbers are mutable: if we just deleted a record,
455 		 * we have to avoid incrementing the record number so that we
456 		 * return the right record by virtue of renumbering the tree.
457 		 */
458 		if (CD_ISSET(dbp, cp))
459 			break;
460 
461 		if (cp->recno != RECNO_OOB) {
462 			++cp->recno;
463 			break;
464 		}
465 		/* FALLTHROUGH */
466 	case DB_FIRST:
467 		flags = DB_NEXT;
468 		cp->recno = 1;
469 		break;
470 	case DB_PREV:
471 		if (cp->recno != RECNO_OOB) {
472 			if (cp->recno == 1) {
473 				ret = DB_NOTFOUND;
474 				goto err;
475 			}
476 			--cp->recno;
477 			break;
478 		}
479 		/* FALLTHROUGH */
480 	case DB_LAST:
481 		flags = DB_PREV;
482 		if (((ret = CDB___ram_update(dbc,
483 		    DB_MAX_RECORDS, 0)) != 0) && ret != DB_NOTFOUND)
484 			goto err;
485 		if ((ret = CDB___bam_nrecs(dbc, &cp->recno)) != 0)
486 			goto err;
487 		if (cp->recno == 0) {
488 			ret = DB_NOTFOUND;
489 			goto err;
490 		}
491 		break;
492 	case DB_SET:
493 	case DB_SET_RANGE:
494 		if ((ret = CDB___ram_getno(dbc, key, &cp->recno, 0)) != 0)
495 			goto err;
496 		break;
497 	}
498 
499 	/*
500 	 * For DB_PREV, DB_LAST, DB_SET and DB_SET_RANGE, we have already
501 	 * called CDB___ram_update() to make sure sufficient records have been
502 	 * read from the backing source file.  Do it now for DB_CURRENT (if
503 	 * the current record was deleted we may need more records from the
504 	 * backing file for a DB_CURRENT operation), DB_FIRST and DB_NEXT.
505 	 */
506 	if (flags == DB_NEXT && ((ret =
507 	    CDB___ram_update(dbc, cp->recno, 0)) != 0) && ret != DB_NOTFOUND)
508 		goto err;
509 
510 	/* Search the tree for the record. */
511 	if ((ret = CDB___bam_rsearch(dbc, &cp->recno,
512 	    F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND, 1, &exact)) != 0)
513 		goto err;
514 	stack = 1;
515 	if (!exact) {
516 		ret = DB_NOTFOUND;
517 		goto err;
518 	}
519 	h = cp->csp->page;
520 	indx = cp->csp->indx;
521 
522 	/*
523 	 * If re-numbering records, the on-page deleted flag means this record
524 	 * was implicitly created.  If not re-numbering records, the on-page
525 	 * deleted flag means this record was implicitly created, or, it was
526 	 * deleted at some time.  Regardless, we skip such records if doing
527 	 * cursor next/prev operations, and fail if the application requested
528 	 * them explicitly.
529 	 */
530 	if (B_DISSET(GET_BKEYDATA(h, indx)->type)) {
531 		if (flags == DB_NEXT || flags == DB_PREV) {
532 			(void)CDB___bam_stkrel(dbc, 0);
533 			goto retry;
534 		}
535 		ret = DB_KEYEMPTY;
536 		goto err;
537 	}
538 
539 	/* Return the key if the user didn't give us one. */
540 	if (flags != DB_SET && flags != DB_SET_RANGE &&
541 	    (ret = CDB___db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno),
542 	    &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
543 		goto err;
544 
545 	/* Return the data item. */
546 	if ((ret = CDB___db_ret(dbp,
547 	    h, indx, data, &dbc->rdata.data, &dbc->rdata.ulen)) != 0)
548 		goto err;
549 
550 	/* The cursor was reset, no further delete adjustment is necessary. */
551 	CD_CLR(dbp, cp);
552 
553 err:	if (stack)
554 		(void)CDB___bam_stkrel(dbc, 0);
555 
556 	/* Release temporary lock upgrade. */
557 	if (tmp_rmw)
558 		F_CLR(dbc, DBC_RMW);
559 
560 	if (ret != 0)
561 		*cp = copy;
562 
563 	return (ret);
564 }
565 
566 /*
567  * CDB___ram_c_put --
568  *	Recno cursor->c_put function.
569  *
570  * PUBLIC: int CDB___ram_c_put __P((DBC *, DBT *, DBT *, u_int32_t));
571  */
572 int
CDB___ram_c_put(dbc,key,data,flags)573 CDB___ram_c_put(dbc, key, data, flags)
574 	DBC *dbc;
575 	DBT *key, *data;
576 	u_int32_t flags;
577 {
578 	BTREE_CURSOR *cp, copy;
579 	DB *dbp;
580 	int exact, ret;
581 	void *arg;
582 
583 	dbp = dbc->dbp;
584 	cp = dbc->internal;
585 
586 	PANIC_CHECK(dbp->dbenv);
587 
588 	if ((ret = CDB___db_cputchk(dbc->dbp, key, data, flags,
589 	    F_ISSET(dbc->dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0)
590 		return (ret);
591 
592 	DEBUG_LWRITE(dbc, dbc->txn, "ram_c_put", NULL, data, flags);
593 
594 	/*
595 	 * If we are running CDB, this had better be either a write
596 	 * cursor or an immediate writer.  If it's a regular writer,
597 	 * that means we have an IWRITE lock and we need to upgrade
598 	 * it to a write lock.
599 	 */
600 	if (F_ISSET(dbp->dbenv, DB_ENV_CDB)) {
601 		if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))
602 			return (EINVAL);
603 
604 		if (F_ISSET(dbc, DBC_WRITECURSOR) &&
605 		    (ret = CDB_lock_get(dbp->dbenv, dbc->locker, DB_LOCK_UPGRADE,
606 		    &dbc->lock_dbt, DB_LOCK_WRITE, &dbc->mylock)) != 0)
607 			return (ret);
608 	}
609 
610 	/* Initialize the cursor for a new retrieval. */
611 	copy = *cp;
612 
613 	/*
614 	 * To split, we need a valid key for the page.
615 	 *
616 	 * The split code discards all short-term locks and stack pages.
617 	 */
618 	if (0) {
619 split:		arg = &cp->recno;
620 		if ((ret = CDB___bam_split(dbc, arg)) != 0)
621 			goto err;
622 	}
623 
624 	if ((ret = CDB___bam_rsearch(dbc, &cp->recno, S_INSERT, 1, &exact)) != 0)
625 		goto err;
626 	if (!exact) {
627 		ret = DB_NOTFOUND;
628 		goto err;
629 	}
630 	if ((ret = CDB___bam_iitem(dbc, &cp->csp->page,
631 	    &cp->csp->indx, key, data, flags, 0)) == DB_NEEDSPLIT) {
632 		if ((ret = CDB___bam_stkrel(dbc, 0)) != 0)
633 			goto err;
634 		goto split;
635 	}
636 	if ((ret = CDB___bam_stkrel(dbc, 0)) != 0)
637 		goto err;
638 
639 	switch (flags) {
640 	case DB_AFTER:
641 		/* Adjust the cursors. */
642 		CDB___ram_ca(dbp, cp->recno, CA_IAFTER);
643 
644 		/* Set this cursor to reference the new record. */
645 		cp->recno = copy.recno + 1;
646 		break;
647 	case DB_BEFORE:
648 		/* Adjust the cursors. */
649 		CDB___ram_ca(dbp, cp->recno, CA_IBEFORE);
650 
651 		/* Set this cursor to reference the new record. */
652 		cp->recno = copy.recno;
653 		break;
654 	}
655 
656 	/* Return the key if we've created a new record. */
657 	if ((flags == DB_AFTER || flags == DB_BEFORE) &&
658 	    (ret = CDB___db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno),
659 	    &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
660 		goto err;
661 
662 	/* The cursor was reset, no further delete adjustment is necessary. */
663 	CD_CLR(dbp, cp);
664 
665 err:	if (F_ISSET(dbc, DBC_WRITECURSOR))
666 		(void)CDB___lock_downgrade(dbp->dbenv,
667 		    &dbc->mylock, DB_LOCK_IWRITE, 0);
668 
669 	if (ret != 0)
670 		*cp = copy;
671 
672 	return (ret);
673 }
674 
675 /*
676  * CDB___ram_ca --
677  *	Adjust cursors.
678  *
679  * PUBLIC: void CDB___ram_ca __P((DB *, db_recno_t, ca_recno_arg));
680  */
681 void
CDB___ram_ca(dbp,recno,op)682 CDB___ram_ca(dbp, recno, op)
683 	DB *dbp;
684 	db_recno_t recno;
685 	ca_recno_arg op;
686 {
687 	BTREE_CURSOR *cp;
688 	DBC *dbc;
689 	db_recno_t nrecs;
690 
691 	/*
692 	 * Adjust the cursors.  See the comment in CDB___bam_ca_delete().
693 	 */
694 	MUTEX_THREAD_LOCK(dbp->mutexp);
695 	for (dbc = TAILQ_FIRST(&dbp->active_queue);
696 	    dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
697 		cp = dbc->internal;
698 		switch (op) {
699 		case CA_DELETE:
700 			if (recno < cp->recno)
701 				--cp->recno;
702 			if (recno == cp->recno) {
703 				if (CDB___bam_nrecs(
704 				    dbc, &nrecs) == 0 && recno > nrecs)
705 					--cp->recno;
706 				else
707 					CD_SET(dbp, cp);
708 			}
709 			break;
710 		case CA_IAFTER:
711 			if (recno < cp->recno)
712 				++cp->recno;
713 			break;
714 		case CA_IBEFORE:
715 			if (recno <= cp->recno)
716 				++cp->recno;
717 			break;
718 		}
719 	}
720 	MUTEX_THREAD_UNLOCK(dbp->mutexp);
721 }
722 
723 /*
724  * CDB___ram_getno --
725  *	Check the user's record number, and make sure we've seen it.
726  *
727  * PUBLIC: int CDB___ram_getno __P((DBC *, const DBT *, db_recno_t *, int));
728  */
729 int
CDB___ram_getno(dbc,key,rep,can_create)730 CDB___ram_getno(dbc, key, rep, can_create)
731 	DBC *dbc;
732 	const DBT *key;
733 	db_recno_t *rep;
734 	int can_create;
735 {
736 	DB *dbp;
737 	db_recno_t recno;
738 
739 	dbp = dbc->dbp;
740 
741 	/* Check the user's record number. */
742 	if ((recno = *(db_recno_t *)key->data) == 0) {
743 		CDB___db_err(dbp->dbenv, "illegal record number of 0");
744 		return (EINVAL);
745 	}
746 	if (rep != NULL)
747 		*rep = recno;
748 
749 	/*
750 	 * Btree can neither create records nor read them in.  Recno can
751 	 * do both, see if we can find the record.
752 	 */
753 	return (dbp->type == DB_RECNO ?
754 	    CDB___ram_update(dbc, recno, can_create) : 0);
755 }
756 
757 /*
758  * CDB___ram_update --
759  *	Ensure the tree has records up to and including the specified one.
760  */
761 static int
CDB___ram_update(dbc,recno,can_create)762 CDB___ram_update(dbc, recno, can_create)
763 	DBC *dbc;
764 	db_recno_t recno;
765 	int can_create;
766 {
767 	BTREE *t;
768 	DB *dbp;
769 	db_recno_t nrecs;
770 	int ret;
771 
772 	dbp = dbc->dbp;
773 	t = dbp->bt_internal;
774 
775 	/*
776 	 * If we can't create records and we've read the entire backing input
777 	 * file, we're done.
778 	 */
779 	if (!can_create && F_ISSET(t, RECNO_EOF))
780 		return (0);
781 
782 	/*
783 	 * If we haven't seen this record yet, try to get it from the original
784 	 * file.
785 	 */
786 	if ((ret = CDB___bam_nrecs(dbc, &nrecs)) != 0)
787 		return (ret);
788 	if (!F_ISSET(t, RECNO_EOF) && recno > nrecs) {
789 		if ((ret = t->re_irec(dbc, recno)) != 0)
790 			return (ret);
791 		if ((ret = CDB___bam_nrecs(dbc, &nrecs)) != 0)
792 			return (ret);
793 	}
794 
795 	/*
796 	 * If we can create records, create empty ones up to the requested
797 	 * record.
798 	 */
799 	if (!can_create || recno <= nrecs + 1)
800 		return (0);
801 
802 	dbc->rdata.dlen = 0;
803 	dbc->rdata.doff = 0;
804 	dbc->rdata.flags = 0;
805 	if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
806 		if (dbc->rdata.ulen < t->re_len) {
807 			if ((ret = CDB___os_realloc(t->re_len,
808 			    NULL, &dbc->rdata.data)) != 0) {
809 				dbc->rdata.ulen = 0;
810 				dbc->rdata.data = NULL;
811 				return (ret);
812 			}
813 			dbc->rdata.ulen = t->re_len;
814 		}
815 		dbc->rdata.size = t->re_len;
816 		memset(dbc->rdata.data, t->re_pad, t->re_len);
817 	} else
818 		dbc->rdata.size = 0;
819 
820 	while (recno > ++nrecs)
821 		if ((ret = CDB___ram_add(dbc,
822 		    &nrecs, &dbc->rdata, 0, BI_DELETED)) != 0)
823 			return (ret);
824 	return (0);
825 }
826 
827 /*
828  * CDB___ram_source --
829  *	Load information about the backing file.
830  */
831 static int
CDB___ram_source(dbp)832 CDB___ram_source(dbp)
833 	DB *dbp;
834 {
835 	BTREE *t;
836 	size_t size;
837 	u_int32_t bytes, mbytes;
838 	int ret;
839 
840 	t = dbp->bt_internal;
841 
842 	/*
843 	 * !!!
844 	 * The caller has full responsibility for cleaning up on error --
845 	 * (it has to anyway, in case it fails after this routine succeeds).
846 	 */
847 	if ((ret = CDB___db_appname(dbp->dbenv,
848 	    DB_APP_DATA, NULL, t->re_source, 0, NULL, &t->re_source)) != 0)
849 		return (ret);
850 
851 	/*
852 	 * !!!
853 	 * It's possible that the backing source file is read-only.  We don't
854 	 * much care other than we'll complain if there are any modifications
855 	 * when it comes time to write the database back to the source.
856 	 */
857 	ret = CDB___os_open(t->re_source,
858 	    F_ISSET(dbp, DB_AM_RDONLY) ? DB_OSO_RDONLY : 0, 0, &t->re_fh);
859 	if (ret != 0 && !F_ISSET(dbp, DB_AM_RDONLY))
860 		ret = CDB___os_open(t->re_source, DB_OSO_RDONLY, 0, &t->re_fh);
861 	if (ret != 0) {
862 		CDB___db_err(dbp->dbenv, "%s: %s", t->re_source, CDB_db_strerror(ret));
863 		return (ret);
864 	}
865 
866 	/*
867 	 * XXX
868 	 * We'd like to test to see if the file is too big to mmap.  Since we
869 	 * don't know what size or type off_t's or size_t's are, or the largest
870 	 * unsigned integral type is, or what random insanity the local C
871 	 * compiler will perpetrate, doing the comparison in a portable way is
872 	 * flatly impossible.  Hope that mmap fails if the file is too large.
873 	 */
874 	if ((ret = CDB___os_ioinfo(t->re_source,
875 	    &t->re_fh, &mbytes, &bytes, NULL)) != 0) {
876 		CDB___db_err(dbp->dbenv, "%s: %s", t->re_source, CDB_db_strerror(ret));
877 		return (ret);
878 	}
879 	if (mbytes == 0 && bytes == 0) {
880 		F_SET(t, RECNO_EOF);
881 		return (0);
882 	}
883 
884 	size = mbytes * MEGABYTE + bytes;
885 	if ((ret = CDB___os_mapfile(dbp->dbenv, t->re_source,
886 	    &t->re_fh, (size_t)size, 1, &t->re_smap)) != 0)
887 		return (ret);
888 	t->re_cmap = t->re_smap;
889 	t->re_emap = (u_int8_t *)t->re_smap + (t->re_msize = size);
890 	t->re_irec = F_ISSET(dbp, DB_RE_FIXEDLEN) ?  CDB___ram_fmap : CDB___ram_vmap;
891 	return (0);
892 }
893 
894 /*
895  * CDB___ram_writeback --
896  *	Rewrite the backing file.
897  *
898  * PUBLIC: int CDB___ram_writeback __P((DB *));
899  */
900 int
CDB___ram_writeback(dbp)901 CDB___ram_writeback(dbp)
902 	DB *dbp;
903 {
904 	BTREE *t;
905 	DBC *dbc;
906 	DBT key, data;
907 	DB_FH fh;
908 	db_recno_t keyno;
909 	ssize_t nw;
910 	int ret, t_ret;
911 	u_int8_t delim, *pad;
912 
913 	t = dbp->bt_internal;
914 
915 	/* If the file wasn't modified, we're done. */
916 	if (!F_ISSET(t, RECNO_MODIFIED))
917 		return (0);
918 
919 	/* If there's no backing source file, we're done. */
920 	if (t->re_source == NULL) {
921 		F_CLR(t, RECNO_MODIFIED);
922 		return (0);
923 	}
924 
925 	/* Allocate a cursor. */
926 	if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
927 		return (ret);
928 
929 	/*
930 	 * Read any remaining records into the tree.
931 	 *
932 	 * !!!
933 	 * This is why we can't support transactions when applications specify
934 	 * backing (re_source) files.  At this point we have to read in the
935 	 * rest of the records from the file so that we can write all of the
936 	 * records back out again, which could modify a page for which we'd
937 	 * have to log changes and which we don't have locked.  This could be
938 	 * partially fixed by taking a snapshot of the entire file during the
939 	 * DB->open as DB->open is transaction protected.  But, if a checkpoint
940 	 * occurs then, the part of the log holding the copy of the file could
941 	 * be discarded, and that would make it impossible to recover in the
942 	 * face of disaster.  This could all probably be fixed, but it would
943 	 * require transaction protecting the backing source file, i.e. mpool
944 	 * would have to know about it, and we don't want to go there.
945 	 */
946 	if ((ret =
947 	    CDB___ram_update(dbc, DB_MAX_RECORDS, 0)) != 0 && ret != DB_NOTFOUND)
948 		return (ret);
949 
950 	/*
951 	 * !!!
952 	 * Close any underlying mmap region.  This is required for Windows NT
953 	 * (4.0, Service Pack 2) -- if the file is still mapped, the following
954 	 * open will fail.
955 	 */
956 	if (t->re_smap != NULL) {
957 		(void)CDB___os_unmapfile(dbp->dbenv, t->re_smap, t->re_msize);
958 		t->re_smap = NULL;
959 	}
960 
961 	/* Get rid of any backing file descriptor, just on GP's. */
962 	if (F_ISSET(&t->re_fh, DB_FH_VALID))
963 		(void)CDB___os_closehandle(&t->re_fh);
964 
965 	/* Open the file, truncating it. */
966 	if ((ret = CDB___os_open(
967 	    t->re_source, DB_OSO_SEQ | DB_OSO_TRUNC, 0, &fh)) != 0) {
968 		CDB___db_err(dbp->dbenv, "%s: %s", t->re_source, CDB_db_strerror(ret));
969 		goto err;
970 	}
971 
972 	/*
973 	 * We step through the records, writing each one out.  Use the record
974 	 * number and the dbp->get() function, instead of a cursor, so we find
975 	 * and write out "deleted" or non-existent records.
976 	 */
977 	memset(&key, 0, sizeof(key));
978 	memset(&data, 0, sizeof(data));
979 	key.size = sizeof(db_recno_t);
980 	key.data = &keyno;
981 
982 	/*
983 	 * We'll need the delimiter if we're doing variable-length records,
984 	 * and the pad character if we're doing fixed-length records.
985 	 */
986 	delim = t->re_delim;
987 	if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
988 		if ((ret = CDB___os_malloc(t->re_len, NULL, &pad)) != 0)
989 			goto err;
990 		memset(pad, t->re_pad, t->re_len);
991 	} else
992 		COMPQUIET(pad, NULL);
993 	for (keyno = 1;; ++keyno) {
994 		switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
995 		case 0:
996 			if ((ret =
997 			    CDB___os_write(&fh, data.data, data.size, &nw)) != 0)
998 				goto err;
999 			if (nw != (ssize_t)data.size) {
1000 				ret = EIO;
1001 				goto err;
1002 			}
1003 			break;
1004 		case DB_KEYEMPTY:
1005 			if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
1006 				if ((ret = CDB___os_write(
1007 				    &fh, pad, t->re_len, &nw)) != 0)
1008 					goto err;
1009 				if (nw != (ssize_t)t->re_len) {
1010 					ret = EIO;
1011 					goto err;
1012 				}
1013 			}
1014 			break;
1015 		case DB_NOTFOUND:
1016 			ret = 0;
1017 			goto done;
1018 		}
1019 		if (!F_ISSET(dbp, DB_RE_FIXEDLEN)) {
1020 			if ((ret = CDB___os_write(&fh, &delim, 1, &nw)) != 0)
1021 				goto err;
1022 			if (nw != 1) {
1023 				ret = EIO;
1024 				goto err;
1025 			}
1026 		}
1027 	}
1028 
1029 err:
1030 done:	/* Close the file descriptor. */
1031 	if (F_ISSET(&fh, DB_FH_VALID) &&
1032 	    (t_ret = CDB___os_closehandle(&fh)) != 0 && ret == 0)
1033 		ret = t_ret;
1034 
1035 	/* Discard the cursor. */
1036 	if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
1037 		ret = t_ret;
1038 
1039 	if (ret == 0)
1040 		F_CLR(t, RECNO_MODIFIED);
1041 
1042 	return (ret);
1043 }
1044 
1045 /*
1046  * CDB___ram_fmap --
1047  *	Get fixed length records from a file.
1048  */
1049 static int
CDB___ram_fmap(dbc,top)1050 CDB___ram_fmap(dbc, top)
1051 	DBC *dbc;
1052 	db_recno_t top;
1053 {
1054 	BTREE *t;
1055 	DB *dbp;
1056 	DBT data;
1057 	db_recno_t recno;
1058 	u_int32_t len;
1059 	u_int8_t *sp, *ep, *p;
1060 	int is_modified, ret;
1061 
1062 	dbp = dbc->dbp;
1063 	t = dbp->bt_internal;
1064 
1065 	if ((ret = CDB___bam_nrecs(dbc, &recno)) != 0)
1066 		return (ret);
1067 
1068 	if (dbc->rdata.ulen < t->re_len) {
1069 		if ((ret =
1070 		    CDB___os_realloc(t->re_len, NULL, &dbc->rdata.data)) != 0) {
1071 			dbc->rdata.ulen = 0;
1072 			dbc->rdata.data = NULL;
1073 			return (ret);
1074 		}
1075 		dbc->rdata.ulen = t->re_len;
1076 	}
1077 
1078 	is_modified = F_ISSET(t, RECNO_MODIFIED);
1079 
1080 	memset(&data, 0, sizeof(data));
1081 	data.data = dbc->rdata.data;
1082 	data.size = t->re_len;
1083 
1084 	sp = (u_int8_t *)t->re_cmap;
1085 	ep = (u_int8_t *)t->re_emap;
1086 	while (recno < top) {
1087 		if (sp >= ep) {
1088 			F_SET(t, RECNO_EOF);
1089 			ret = DB_NOTFOUND;
1090 			goto err;
1091 		}
1092 		len = t->re_len;
1093 		for (p = dbc->rdata.data;
1094 		    sp < ep && len > 0; *p++ = *sp++, --len)
1095 			;
1096 
1097 		/*
1098 		 * Another process may have read this record from the input
1099 		 * file and stored it into the database already, in which
1100 		 * case we don't need to repeat that operation.  We detect
1101 		 * this by checking if the last record we've read is greater
1102 		 * or equal to the number of records in the database.
1103 		 *
1104 		 * XXX
1105 		 * We should just do a seek, since the records are fixed
1106 		 * length.
1107 		 */
1108 		if (t->re_last >= recno) {
1109 			if (len != 0)
1110 				memset(p, t->re_pad, len);
1111 
1112 			++recno;
1113 			if ((ret = CDB___ram_add(dbc, &recno, &data, 0, 0)) != 0)
1114 				goto err;
1115 		}
1116 		++t->re_last;
1117 	}
1118 	t->re_cmap = sp;
1119 
1120 err:	if (!is_modified)
1121 		F_CLR(t, RECNO_MODIFIED);
1122 
1123 	return (0);
1124 }
1125 
1126 /*
1127  * CDB___ram_vmap --
1128  *	Get variable length records from a file.
1129  */
1130 static int
CDB___ram_vmap(dbc,top)1131 CDB___ram_vmap(dbc, top)
1132 	DBC *dbc;
1133 	db_recno_t top;
1134 {
1135 	BTREE *t;
1136 	DBT data;
1137 	db_recno_t recno;
1138 	u_int8_t *sp, *ep;
1139 	int delim, is_modified, ret;
1140 
1141 	t = dbc->dbp->bt_internal;
1142 
1143 	if ((ret = CDB___bam_nrecs(dbc, &recno)) != 0)
1144 		return (ret);
1145 
1146 	delim = t->re_delim;
1147 	is_modified = F_ISSET(t, RECNO_MODIFIED);
1148 
1149 	memset(&data, 0, sizeof(data));
1150 
1151 	sp = (u_int8_t *)t->re_cmap;
1152 	ep = (u_int8_t *)t->re_emap;
1153 	while (recno < top) {
1154 		if (sp >= ep) {
1155 			F_SET(t, RECNO_EOF);
1156 			ret = DB_NOTFOUND;
1157 			goto err;
1158 		}
1159 		for (data.data = sp; sp < ep && *sp != delim; ++sp)
1160 			;
1161 
1162 		/*
1163 		 * Another process may have read this record from the input
1164 		 * file and stored it into the database already, in which
1165 		 * case we don't need to repeat that operation.  We detect
1166 		 * this by checking if the last record we've read is greater
1167 		 * or equal to the number of records in the database.
1168 		 */
1169 		if (t->re_last >= recno) {
1170 			data.size = sp - (u_int8_t *)data.data;
1171 			++recno;
1172 			if ((ret = CDB___ram_add(dbc, &recno, &data, 0, 0)) != 0)
1173 				goto err;
1174 		}
1175 		++t->re_last;
1176 		++sp;
1177 	}
1178 	t->re_cmap = sp;
1179 
1180 err:	if (!is_modified)
1181 		F_CLR(t, RECNO_MODIFIED);
1182 
1183 	return (ret);
1184 }
1185 
1186 /*
1187  * CDB___ram_add --
1188  *	Add records into the tree.
1189  */
1190 static int
CDB___ram_add(dbc,recnop,data,flags,bi_flags)1191 CDB___ram_add(dbc, recnop, data, flags, bi_flags)
1192 	DBC *dbc;
1193 	db_recno_t *recnop;
1194 	DBT *data;
1195 	u_int32_t flags, bi_flags;
1196 {
1197 	BKEYDATA *bk;
1198 	BTREE_CURSOR *cp;
1199 	PAGE *h;
1200 	db_indx_t indx;
1201 	int exact, ret, stack;
1202 
1203 	cp = dbc->internal;
1204 
1205 retry:	/* Find the slot for insertion. */
1206 	if ((ret = CDB___bam_rsearch(dbc, recnop,
1207 	    S_INSERT | (flags == DB_APPEND ? S_APPEND : 0), 1, &exact)) != 0)
1208 		return (ret);
1209 	h = cp->csp->page;
1210 	indx = cp->csp->indx;
1211 	stack = 1;
1212 
1213 	/*
1214 	 * If re-numbering records, the on-page deleted flag means this record
1215 	 * was implicitly created.  If not re-numbering records, the on-page
1216 	 * deleted flag means this record was implicitly created, or, it was
1217 	 * deleted at some time.
1218 	 *
1219 	 * If DB_NOOVERWRITE is set and the item already exists in the tree,
1220 	 * return an error unless the item was either marked for deletion or
1221 	 * only implicitly created.
1222 	 */
1223 	if (exact) {
1224 		bk = GET_BKEYDATA(h, indx);
1225 		if (!B_DISSET(bk->type) && flags == DB_NOOVERWRITE) {
1226 			ret = DB_KEYEXIST;
1227 			goto err;
1228 		}
1229 	}
1230 
1231 	/*
1232 	 * Select the arguments for CDB___bam_iitem() and do the insert.  If the
1233 	 * key is an exact match, or we're replacing the data item with a
1234 	 * new data item, replace the current item.  If the key isn't an exact
1235 	 * match, we're inserting a new key/data pair, before the search
1236 	 * location.
1237 	 */
1238 	switch (ret = CDB___bam_iitem(dbc,
1239 	    &h, &indx, NULL, data, exact ? DB_CURRENT : DB_BEFORE, bi_flags)) {
1240 	case 0:
1241 		/*
1242 		 * Don't adjust anything.
1243 		 *
1244 		 * If we inserted a record, no cursors need adjusting because
1245 		 * the only new record it's possible to insert is at the very
1246 		 * end of the tree.  The necessary adjustments to the internal
1247 		 * page counts were made by CDB___bam_iitem().
1248 		 *
1249 		 * If we overwrote a record, no cursors need adjusting because
1250 		 * future DBcursor->get calls will simply return the underlying
1251 		 * record (there's no adjustment made for the DB_CURRENT flag
1252 		 * when a cursor get operation immediately follows a cursor
1253 		 * delete operation, and the normal adjustment for the DB_NEXT
1254 		 * flag is still correct).
1255 		 */
1256 		break;
1257 	case DB_NEEDSPLIT:
1258 		/* Discard the stack of pages and split the page. */
1259 		(void)CDB___bam_stkrel(dbc, 0);
1260 		stack = 0;
1261 
1262 		if ((ret = CDB___bam_split(dbc, recnop)) != 0)
1263 			goto err;
1264 
1265 		goto retry;
1266 		/* NOTREACHED */
1267 	default:
1268 		goto err;
1269 	}
1270 
1271 
1272 err:	if (stack)
1273 		CDB___bam_stkrel(dbc, 0);
1274 
1275 	return (ret);
1276 }
1277