1 /*-
2  * Copyright (c) 2000, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file LICENSE for license information.
5  *
6  * $Id$
7  */
8 
9 #include "db_config.h"
10 
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/fop.h"
15 #include "dbinc/hash.h"
16 #include "dbinc/heap.h"
17 #include "dbinc/lock.h"
18 #include "dbinc/mp.h"
19 #include "dbinc/partition.h"
20 #include "dbinc/qam.h"
21 #include "dbinc/txn.h"
22 
23 static int __db_s_count __P((DB *));
24 static int __db_wrlock_err __P((ENV *));
25 static int __dbc_del_foreign __P((DBC *));
26 static int __dbc_del_oldskey __P((DB *, DBC *, DBT *, DBT *, DBT *));
27 static int __dbc_del_secondary __P((DBC *));
28 static int __dbc_pget_recno __P((DBC *, DBT *, DBT *, u_int32_t));
29 static inline int __dbc_put_append __P((DBC *,
30 		DBT *, DBT *, u_int32_t *, u_int32_t));
31 static inline int __dbc_put_fixed_len __P((DBC *, DBT *, DBT *));
32 static inline int __dbc_put_partial __P((DBC *,
33 		DBT *, DBT *, DBT *, DBT *, u_int32_t *, u_int32_t));
34 static int __dbc_put_primary __P((DBC *, DBT *, DBT *, u_int32_t));
35 static inline int __dbc_put_resolve_key __P((DBC *,
36 		DBT *, DBT *, u_int32_t *, u_int32_t));
37 static inline int __dbc_put_secondaries __P((DBC *,
38 		DBT *, DBT *, DBT *, int, DBT *, u_int32_t *));
39 
/*
 * CDB_LOCKING_INIT --
 *	Prepare a cursor for a write operation when CDB locking is in use.
 *
 * The expansion is a bare if statement (not a do/while block) and it can
 * return from the calling function: with the result of __db_wrlock_err()
 * when the cursor has neither DBC_WRITECURSOR nor DBC_WRITER set, or with
 * the __lock_get() error when the upgrade fails.  The caller must have an
 * int named "ret" in scope.  Nothing happens unless CDB_LOCKING(env) is
 * true.
 */
#define	CDB_LOCKING_INIT(env, dbc)					\
	/*								\
	 * If we are running CDB, this had better be either a write	\
	 * cursor or an immediate writer.  If it's a regular writer,	\
	 * that means we have an IWRITE lock and we need to upgrade	\
	 * it to a write lock.						\
	 */								\
	if (CDB_LOCKING(env)) {						\
		if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))	\
			return (__db_wrlock_err(env));			\
									\
		if (F_ISSET(dbc, DBC_WRITECURSOR) &&			\
		    (ret = __lock_get(env,				\
		    (dbc)->locker, DB_LOCK_UPGRADE, &(dbc)->lock_dbt,	\
		    DB_LOCK_WRITE, &(dbc)->mylock)) != 0)		\
			return (ret);					\
	}
/*
 * CDB_LOCKING_DONE --
 *	Counterpart of CDB_LOCKING_INIT: a DBC_WRITECURSOR cursor has its
 *	lock downgraded to DB_LOCK_IWRITE.
 *
 * The return value of __lock_downgrade() is discarded.  Unlike
 * CDB_LOCKING_INIT, CDB_LOCKING(env) is not tested here; only the cursor
 * flag is.  The expansion is an unbraced if statement that already ends in
 * a semicolon.
 */
#define	CDB_LOCKING_DONE(env, dbc)					\
	/* Release the upgraded lock. */				\
	if (F_ISSET(dbc, DBC_WRITECURSOR))				\
		(void)__lock_downgrade(					\
		    env, &(dbc)->mylock, DB_LOCK_IWRITE, 0);
62 
/*
 * SET_READ_LOCKING_FLAGS --
 *	Translate per-call isolation flags into cursor (DBC_*) flags.
 *
 * var is first set to 0.  Only when the cursor has neither
 * DBC_READ_COMMITTED nor DBC_READ_UNCOMMITTED already set is var then set
 * from DB_READ_COMMITTED and/or DB_READ_UNCOMMITTED; if both were passed,
 * the DB_READ_UNCOMMITTED assignment comes last and is the one that
 * sticks.  Both DB_* bits are always cleared afterwards.
 *
 * NOTE(review): LF_ISSET/LF_CLR are defined elsewhere; presumably they act
 * on a local variable named "flags" -- confirm.
 */
#define	SET_READ_LOCKING_FLAGS(dbc, var) do {				\
	var = 0;							\
	if (!F_ISSET(dbc, DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED)) {	\
		if (LF_ISSET(DB_READ_COMMITTED))			\
			var = DBC_READ_COMMITTED | DBC_WAS_READ_COMMITTED; \
		if (LF_ISSET(DB_READ_UNCOMMITTED))			\
			var = DBC_READ_UNCOMMITTED;			\
	}								\
	LF_CLR(DB_READ_COMMITTED | DB_READ_UNCOMMITTED);		\
} while (0)
73 
/*
 * __dbc_close --
 *	DBC->close.
 *
 * Takes the cursor, and its off-page duplicate cursor if it has one, off
 * the database handle's active queue, calls the access method's close
 * routine, releases the cursor's lock if one is set, and places the
 * cursor(s) on the handle's free queue.  The cursor memory itself is not
 * freed here.
 *
 * Errors are accumulated in ret: the first non-zero result from any step
 * is the value returned, and later steps are still attempted.
 *
 * PUBLIC: int __dbc_close __P((DBC *));
 */
int
__dbc_close(dbc)
	DBC *dbc;
{
	DB *dbp;
	DBC *opd;
	DBC_INTERNAL *cp;
#ifdef DIAGNOSTIC
	DB_THREAD_INFO *ip;
#endif
	DB_TXN *txn;
	ENV *env;
	int ret, t_ret;

	dbp = dbc->dbp;
	env = dbp->env;
	cp = dbc->internal;
	/* Capture the off-page duplicate cursor before am_close runs. */
	opd = cp->opd;
	ret = 0;

	/*
	 * Remove the cursor(s) from the active queue.  We may be closing two
	 * cursors at once here, a top-level one and a lower-level, off-page
	 * duplicate one.  The access-method specific cursor close routine must
	 * close both of them in a single call.
	 *
	 * !!!
	 * Cursors must be removed from the active queue before calling the
	 * access specific cursor close routine, btree depends on having that
	 * order of operations.
	 */
	MUTEX_LOCK(env, dbp->mutex);

	if (opd != NULL) {
		DB_ASSERT(env, F_ISSET(opd, DBC_ACTIVE));
		F_CLR(opd, DBC_ACTIVE);
		TAILQ_REMOVE(&dbp->active_queue, opd, links);
	}
	DB_ASSERT(env, F_ISSET(dbc, DBC_ACTIVE));
	F_CLR(dbc, DBC_ACTIVE);
	TAILQ_REMOVE(&dbp->active_queue, dbc, links);

	MUTEX_UNLOCK(env, dbp->mutex);

	/* Call the access specific cursor close routine. */
	if ((t_ret =
	    dbc->am_close(dbc, PGNO_INVALID, NULL)) != 0 && ret == 0)
		ret = t_ret;

	/*
	 * Release the lock after calling the access method specific close
	 * routine, a Btree cursor may have had pending deletes.
	 *
	 * Also, be sure not to free anything if mylock.off is INVALID;  in
	 * some cases, such as idup'ed read cursors and secondary update
	 * cursors, a cursor in a CDB environment may not have a lock at all.
	 */
	if (LOCK_ISSET(dbc->mylock)) {
		if ((t_ret = __LPUT(dbc, dbc->mylock)) != 0 && ret == 0)
			ret = t_ret;

		/* For safety's sake, since this is going on the free queue. */
		memset(&dbc->mylock, 0, sizeof(dbc->mylock));
		if (opd != NULL)
			memset(&opd->mylock, 0, sizeof(opd->mylock));
	}

	/*
	 * Remove this cursor's locker ID from its family.
	 * Only done when the cursor both owns its locker ID and is flagged
	 * as part of a family; DBC_FAMILY is cleared afterwards even if the
	 * removal reported an error.
	 */
	if (F_ISSET(dbc, DBC_OWN_LID) && F_ISSET(dbc, DBC_FAMILY)) {
		if ((t_ret = __lock_familyremove(env->lk_handle,
		    dbc->lref)) != 0 && ret == 0)
			ret = t_ret;
		F_CLR(dbc, DBC_FAMILY);
	}
#ifdef DIAGNOSTIC
	/*
	 * Diagnostic builds only: restore the thread's recorded locker to
	 * this locker's prev_locker, then reset prev_locker.
	 */
	if (dbc->locker != NULL) {
		ENV_GET_THREAD_INFO(env, ip);
		if (ip != NULL)
			ip->dbth_locker = dbc->locker->prev_locker;
		dbc->locker->prev_locker = INVALID_ROFF;
	}
#endif

	/*
	 * Drop this cursor from the transaction's cursor count.  The
	 * off-page duplicate cursor, if any, is counted separately below.
	 */
	if ((txn = dbc->txn) != NULL)
		txn->cursors--;

	/* Move the cursor(s) to the free queue. */
	MUTEX_LOCK(env, dbp->mutex);
	if (opd != NULL) {
		if (txn != NULL)
			txn->cursors--;
		TAILQ_INSERT_TAIL(&dbp->free_queue, opd, links);
	}
	TAILQ_INSERT_TAIL(&dbp->free_queue, dbc, links);
	MUTEX_UNLOCK(env, dbp->mutex);

	/*
	 * A TXN_PRIVATE transaction is committed here once its last cursor
	 * has been closed.
	 */
	if (txn != NULL && F_ISSET(txn, TXN_PRIVATE) && txn->cursors == 0 &&
	    (t_ret = __txn_commit(txn, 0)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}
184 
185 /*
186  * __dbc_destroy --
187  *	Destroy the cursor, called after DBC->close.
188  *
189  * PUBLIC: int __dbc_destroy __P((DBC *));
190  */
191 int
__dbc_destroy(dbc)192 __dbc_destroy(dbc)
193 	DBC *dbc;
194 {
195 	DB *dbp;
196 	ENV *env;
197 	int ret, t_ret;
198 
199 	dbp = dbc->dbp;
200 	env = dbp->env;
201 
202 	/* Remove the cursor from the free queue. */
203 	MUTEX_LOCK(env, dbp->mutex);
204 	TAILQ_REMOVE(&dbp->free_queue, dbc, links);
205 	MUTEX_UNLOCK(env, dbp->mutex);
206 
207 	/* Free up allocated memory. */
208 	if (dbc->my_rskey.data != NULL)
209 		__os_free(env, dbc->my_rskey.data);
210 	if (dbc->my_rkey.data != NULL)
211 		__os_free(env, dbc->my_rkey.data);
212 	if (dbc->my_rdata.data != NULL)
213 		__os_free(env, dbc->my_rdata.data);
214 
215 	/* Call the access specific cursor destroy routine. */
216 	ret = dbc->am_destroy == NULL ? 0 : dbc->am_destroy(dbc);
217 
218 	/*
219 	 * Release the lock id for this cursor.
220 	 */
221 	if (LOCKING_ON(env) &&
222 	    F_ISSET(dbc, DBC_OWN_LID) &&
223 	    (t_ret = __lock_id_free(env, dbc->lref)) != 0 && ret == 0)
224 		ret = t_ret;
225 
226 	__os_free(env, dbc);
227 
228 	return (ret);
229 }
230 
231 /*
232  * __dbc_cmp --
233  *	Compare the position of two cursors. Return whether two cursors are
234  *	pointing to the same key/data pair.
235  *
236  * result == 0  if both cursors refer to the same item.
237  * result == 1  otherwise
238  *
239  * PUBLIC: int __dbc_cmp __P((DBC *, DBC *, int *));
240  */
241 int
__dbc_cmp(dbc,other_dbc,result)242 __dbc_cmp(dbc, other_dbc, result)
243 	DBC *dbc, *other_dbc;
244 	int *result;
245 {
246 	DBC *curr_dbc, *curr_odbc;
247 	DBC_INTERNAL *dbc_int, *odbc_int;
248 	ENV *env;
249 	int ret;
250 
251 	env = dbc->env;
252 	ret = 0;
253 
254 #ifdef HAVE_PARTITION
255 	if (DB_IS_PARTITIONED(dbc->dbp)) {
256 		dbc = ((PART_CURSOR *)dbc->internal)->sub_cursor;
257 		other_dbc = ((PART_CURSOR *)other_dbc->internal)->sub_cursor;
258 	}
259 	/* Both cursors must still be valid. */
260 	if (dbc == NULL || other_dbc == NULL) {
261 		__db_errx(env, DB_STR("0692",
262 "Both cursors must be initialized before calling DBC->cmp."));
263 		return (EINVAL);
264 	}
265 
266 	if (dbc->dbp != other_dbc->dbp) {
267 		*result = 1;
268 		return (0);
269 	}
270 #endif
271 
272 #ifdef HAVE_COMPRESSION
273 	if (DB_IS_COMPRESSED(dbc->dbp))
274 		return (__bamc_compress_cmp(dbc, other_dbc, result));
275 #endif
276 
277 	curr_dbc = dbc;
278 	curr_odbc = other_dbc;
279 	dbc_int = dbc->internal;
280 	odbc_int = other_dbc->internal;
281 
282 	/* Both cursors must be on valid positions. */
283 	if (dbc_int->pgno == PGNO_INVALID || odbc_int->pgno == PGNO_INVALID) {
284 		__db_errx(env, DB_STR("0692",
285 "Both cursors must be initialized before calling DBC->cmp."));
286 		return (EINVAL);
287 	}
288 
289 	/*
290 	 * Use a loop since cursors can be nested. Off page duplicate
291 	 * sets can only be nested one level deep, so it is safe to use a
292 	 * while (true) loop.
293 	 */
294 	while (1) {
295 		if (dbc_int->pgno == odbc_int->pgno &&
296 		    dbc_int->indx == odbc_int->indx) {
297 			/*
298 			 * If one cursor is sitting on an off page duplicate
299 			 * set, the other will be pointing to the same set. Be
300 			 * careful, and check  anyway.
301 			 */
302 			if (dbc_int->opd != NULL && odbc_int->opd != NULL) {
303 				curr_dbc = dbc_int->opd;
304 				curr_odbc = odbc_int->opd;
305 				dbc_int = dbc_int->opd->internal;
306 				odbc_int= odbc_int->opd->internal;
307 				continue;
308 			} else if (dbc_int->opd == NULL &&
309 			    odbc_int->opd == NULL)
310 				*result = 0;
311 			else {
312 				__db_errx(env, DB_STR("0694",
313 	    "DBCursor->cmp mismatched off page duplicate cursor pointers."));
314 				return (EINVAL);
315 			}
316 
317 			switch (curr_dbc->dbtype) {
318 			case DB_HASH:
319 				/*
320 				 * Make sure that on-page duplicate data
321 				 * indexes match, and that the deleted
322 				 * flags are consistent.
323 				 */
324 				ret = __hamc_cmp(curr_dbc, curr_odbc, result);
325 				break;
326 			case DB_BTREE:
327 			case DB_RECNO:
328 				/*
329 				 * Check for consisted deleted flags on btree
330 				 * specific cursors.
331 				 */
332 				ret = __bamc_cmp(curr_dbc, curr_odbc, result);
333 				break;
334 			default:
335 				/* NO-OP break out. */
336 				break;
337 			}
338 		} else
339 			*result = 1;
340 		return (ret);
341 	}
342 	/* NOTREACHED. */
343 	return (ret);
344 }
345 
346 /*
347  * __dbc_count --
348  *	Return a count of duplicate data items.
349  *
350  * PUBLIC: int __dbc_count __P((DBC *, db_recno_t *));
351  */
352 int
__dbc_count(dbc,recnop)353 __dbc_count(dbc, recnop)
354 	DBC *dbc;
355 	db_recno_t *recnop;
356 {
357 	ENV *env;
358 	int ret;
359 
360 	env = dbc->env;
361 
362 #ifdef HAVE_PARTITION
363 	if (DB_IS_PARTITIONED(dbc->dbp))
364 		dbc = ((PART_CURSOR *)dbc->internal)->sub_cursor;
365 #endif
366 	/*
367 	 * Cursor Cleanup Note:
368 	 * All of the cursors passed to the underlying access methods by this
369 	 * routine are not duplicated and will not be cleaned up on return.
370 	 * So, pages/locks that the cursor references must be resolved by the
371 	 * underlying functions.
372 	 */
373 	switch (dbc->dbtype) {
374 	case DB_HEAP:
375 	case DB_QUEUE:
376 	case DB_RECNO:
377 		*recnop = 1;
378 		break;
379 	case DB_HASH:
380 		if (dbc->internal->opd == NULL) {
381 			if ((ret = __hamc_count(dbc, recnop)) != 0)
382 				return (ret);
383 			break;
384 		}
385 		/* FALLTHROUGH */
386 	case DB_BTREE:
387 #ifdef HAVE_COMPRESSION
388 		if (DB_IS_COMPRESSED(dbc->dbp))
389 			return (__bamc_compress_count(dbc, recnop));
390 #endif
391 		if ((ret = __bamc_count(dbc, recnop)) != 0)
392 			return (ret);
393 		break;
394 	case DB_UNKNOWN:
395 	default:
396 		return (__db_unknown_type(env, "__dbc_count", dbc->dbtype));
397 	}
398 	return (0);
399 }
400 
401 /*
402  * __dbc_del --
403  *	DBC->del.
404  *
405  * PUBLIC: int __dbc_del __P((DBC *, u_int32_t));
406  */
407 int
__dbc_del(dbc,flags)408 __dbc_del(dbc, flags)
409 	DBC *dbc;
410 	u_int32_t flags;
411 {
412 	DB *dbp;
413 	ENV *env;
414 	int ret;
415 
416 	dbp = dbc->dbp;
417 	env = dbp->env;
418 
419 	CDB_LOCKING_INIT(env, dbc);
420 	F_CLR(dbc, DBC_ERROR);
421 
422 	/*
423 	 * If we're a secondary index, and DB_UPDATE_SECONDARY isn't set
424 	 * (which it only is if we're being called from a primary update),
425 	 * then we need to call through to the primary and delete the item.
426 	 *
427 	 * Note that this will delete the current item;  we don't need to
428 	 * delete it ourselves as well, so we can just goto done.
429 	 */
430 	if (flags != DB_UPDATE_SECONDARY && F_ISSET(dbp, DB_AM_SECONDARY)) {
431 		ret = __dbc_del_secondary(dbc);
432 		goto done;
433 	}
434 
435 	/*
436 	 * If we are a foreign db, go through and check any foreign key
437 	 * constraints first, which will make rolling back changes on an abort
438 	 * simpler.
439 	 */
440 	if (LIST_FIRST(&dbp->f_primaries) != NULL &&
441 	    (ret = __dbc_del_foreign(dbc)) != 0)
442 		goto done;
443 
444 	/*
445 	 * If we are a primary and have secondary indices, go through
446 	 * and delete any secondary keys that point at the current record.
447 	 */
448 	if (DB_IS_PRIMARY(dbp) &&
449 	    (ret = __dbc_del_primary(dbc)) != 0)
450 		goto done;
451 
452 #ifdef HAVE_COMPRESSION
453 	if (DB_IS_COMPRESSED(dbp))
454 		ret = __bamc_compress_del(dbc, flags);
455 	else
456 #endif
457 		ret = __dbc_idel(dbc, flags);
458 
459 done:	CDB_LOCKING_DONE(env, dbc);
460 
461 	if (!DB_RETOK_DBCDEL(ret))
462 		F_SET(dbc, DBC_ERROR);
463 	return (ret);
464 }
465 
466 /*
467  * __dbc_del --
468  *	Implemenation of DBC->del.
469  *
470  * PUBLIC: int __dbc_idel __P((DBC *, u_int32_t));
471  */
472 int
__dbc_idel(dbc,flags)473 __dbc_idel(dbc, flags)
474 	DBC *dbc;
475 	u_int32_t flags;
476 {
477 	DB *dbp;
478 	DBC *opd;
479 	int ret, t_ret;
480 
481 	COMPQUIET(flags, 0);
482 
483 	dbp = dbc->dbp;
484 
485 	/*
486 	 * Cursor Cleanup Note:
487 	 * All of the cursors passed to the underlying access methods by this
488 	 * routine are not duplicated and will not be cleaned up on return.
489 	 * So, pages/locks that the cursor references must be resolved by the
490 	 * underlying functions.
491 	 */
492 
493 	/*
494 	 * Off-page duplicate trees are locked in the primary tree, that is,
495 	 * we acquire a write lock in the primary tree and no locks in the
496 	 * off-page dup tree.  If the del operation is done in an off-page
497 	 * duplicate tree, call the primary cursor's upgrade routine first.
498 	 */
499 	opd = dbc->internal->opd;
500 	if (opd == NULL)
501 		ret = dbc->am_del(dbc, flags);
502 	else if ((ret = dbc->am_writelock(dbc)) == 0)
503 		ret = opd->am_del(opd, flags);
504 
505 	/*
506 	 * If this was an update that is supporting dirty reads
507 	 * then we may have just swapped our read for a write lock
508 	 * which is held by the surviving cursor.  We need
509 	 * to explicitly downgrade this lock.  The closed cursor
510 	 * may only have had a read lock.
511 	 */
512 	if (ret == 0 && F_ISSET(dbp, DB_AM_READ_UNCOMMITTED) &&
513 	    dbc->internal->lock_mode == DB_LOCK_WRITE) {
514 		if ((ret = __TLPUT(dbc, dbc->internal->lock)) == 0)
515 			dbc->internal->lock_mode = DB_LOCK_WWRITE;
516 		if (dbc->internal->page != NULL && (t_ret =
517 		    __memp_shared(dbp->mpf, dbc->internal->page)) != 0 &&
518 		    ret == 0)
519 			ret = t_ret;
520 	}
521 
522 	return (ret);
523 }
524 
525 /*
526  * __dbc_db_stream --
527  *
528  * DBC->db_stream
529  *
530  * PUBLIC: int __dbc_db_stream __P((DBC *, DB_STREAM **, u_int32_t));
531  */
532 int
__dbc_db_stream(dbc,dbsp,flags)533 __dbc_db_stream(dbc, dbsp, flags)
534 	DBC *dbc;
535 	DB_STREAM **dbsp;
536 	u_int32_t flags;
537 {
538 	ENV *env;
539 	int ret;
540 	u_int32_t oflags;
541 
542 	env = dbc->env;
543 	oflags = 0;
544 
545 	if ((ret = __db_fchk(
546 	    env, "DBC->db_stream", flags,
547 	    DB_STREAM_READ | DB_STREAM_WRITE | DB_STREAM_SYNC_WRITE)) != 0)
548 		return (ret);
549 
550 	if (DB_IS_READONLY(dbc->dbp)) {
551 		LF_SET(DB_STREAM_READ);
552 		oflags |= DB_FOP_READONLY;
553 	}
554 	if (LF_ISSET(DB_STREAM_READ) && LF_ISSET(DB_STREAM_WRITE)) {
555 		ret = USR_ERR(env, EINVAL);
556 		__db_errx(env, DB_STR("0750",
557 	    "Error, cannot set both DB_STREAM_WRITE and DB_STREAM_READ."));
558 		goto err;
559 	}
560 
561 	if (flags & DB_STREAM_READ)
562 		oflags |= DB_FOP_READONLY;
563 	else
564 		oflags |= DB_FOP_WRITE;
565 	if (flags & DB_STREAM_SYNC_WRITE)
566 		oflags |= DB_FOP_SYNC_WRITE;
567 
568 	ret = __db_stream_init(dbc, dbsp, oflags);
569 
570 err:	return (ret);
571 }
572 
573 /*
574  * __dbc_get_blob_id --
575  *
576  * Returns the blob id stored in the data record to which the cursor currently
577  * points.  Returns EINVAL if the cursor does not point to a blob record.
578  *
579  * PUBLIC: int __dbc_get_blob_id __P((DBC *, db_seq_t *));
580  */
581 int
__dbc_get_blob_id(dbc,blob_id)582 __dbc_get_blob_id(dbc, blob_id)
583 	DBC *dbc;
584 	db_seq_t *blob_id;
585 {
586 	DBT key, data;
587 	BBLOB bl;
588 	HBLOB hbl;
589 	HEAPBLOBHDR bhdr;
590 	int ret;
591 
592 	if (dbc->dbtype != DB_BTREE &&
593 	    dbc->dbtype != DB_HEAP && dbc->dbtype != DB_HASH) {
594 		return (EINVAL);
595 	}
596 
597 	ret = 0;
598 	memset(&key, 0, sizeof(DBT));
599 	memset(&data, 0, sizeof(DBT));
600 	/* Get the blob database record instead of the blob. */
601 	data.flags |= DB_DBT_BLOB_REC;
602 
603 	/*
604 	 * It would be great if there was a more efficient way to do this, but
605 	 * the complexities of getting a page from a database, especially
606 	 * when taking into account things like partitions and compression,
607 	 * make that more trouble than it is worth.
608 	 */
609 	if ((ret = __dbc_get(dbc, &key, &data, DB_CURRENT)) != 0)
610 		goto err;
611 
612 	switch (dbc->dbtype) {
613 	case DB_BTREE:
614 		if (data.size != BBLOB_SIZE) {
615 			ret = USR_ERR(dbc->env, EINVAL);
616 			goto err;
617 		}
618 		memcpy(&bl, data.data, BBLOB_SIZE);
619 		if (B_TYPE(bl.type) != B_BLOB) {
620 			ret = USR_ERR(dbc->env, EINVAL);
621 			goto err;
622 		}
623 		*blob_id = (db_seq_t)bl.id;
624 		break;
625 	case DB_HEAP:
626 		if (data.size != HEAPBLOBREC_SIZE) {
627 			ret = USR_ERR(dbc->env, EINVAL);
628 			goto err;
629 		}
630 		memcpy(&bhdr, data.data, HEAPBLOBREC_SIZE);
631 		if (!F_ISSET(&bhdr.std_hdr, HEAP_RECBLOB)) {
632 			ret = USR_ERR(dbc->env, EINVAL);
633 			goto err;
634 		}
635 		*blob_id = (db_seq_t)bhdr.id;
636 		break;
637 	case DB_HASH:
638 		if (data.size != HBLOB_SIZE) {
639 			ret = USR_ERR(dbc->env, EINVAL);
640 			goto err;
641 		}
642 		memcpy(&hbl, data.data, HBLOB_SIZE);
643 		if (HPAGE_PTYPE(&hbl) != H_BLOB) {
644 			ret = USR_ERR(dbc->env, EINVAL);
645 			goto err;
646 		}
647 		*blob_id = (db_seq_t)hbl.id;
648 		break;
649 	default:
650 		ret = USR_ERR(dbc->env, EINVAL);
651 		goto err;
652 	}
653 
654 err:	return (ret);
655 }
656 
657 /*
658  * __dbc_get_blob_size --
659  *
660  * Returns the blob file size stored in the data record to which the cursor
661  * currently points.  Returns EINVAL if the cursor does not point to a blob
662  * record.
663  *
664  * PUBLIC: int __dbc_get_blob_size __P((DBC *, off_t *));
665  */
666 int
__dbc_get_blob_size(dbc,size)667 __dbc_get_blob_size(dbc, size)
668 	DBC *dbc;
669 	off_t *size;
670 {
671 	DBT key, data;
672 	ENV *env;
673 	BBLOB bl;
674 	HBLOB hbl;
675 	HEAPBLOBHDR bhdr;
676 	int ret;
677 
678 	if (dbc->dbtype != DB_BTREE &&
679 	    dbc->dbtype != DB_HEAP && dbc->dbtype != DB_HASH) {
680 		return (EINVAL);
681 	}
682 
683 	env = dbc->env;
684 	ret = 0;
685 	memset(&key, 0, sizeof(DBT));
686 	memset(&data, 0, sizeof(DBT));
687 	/* Get the blob database record instead of the blob. */
688 	data.flags |= DB_DBT_BLOB_REC;
689 
690 	/*
691 	 * It would be great if there was a more efficient way to do this, but
692 	 * the complexities of getting a page from a database, especially
693 	 * when taking into account things like partitions and compression,
694 	 * make that more trouble than it is worth.
695 	 */
696 	if ((ret = __dbc_get(dbc, &key, &data, DB_CURRENT)) != 0)
697 		goto err;
698 
699 	switch (dbc->dbtype) {
700 	case DB_BTREE:
701 		if (data.size != BBLOB_SIZE) {
702 			ret = USR_ERR(dbc->env, EINVAL);
703 			goto err;
704 		}
705 		memcpy(&bl, data.data, BBLOB_SIZE);
706 		if (B_TYPE(bl.type) != B_BLOB) {
707 			ret = USR_ERR(dbc->env, EINVAL);
708 			goto err;
709 		}
710 		GET_BLOB_SIZE(env, bl, *size, ret);
711 		break;
712 	case DB_HEAP:
713 		if (data.size != HEAPBLOBREC_SIZE) {
714 			ret = USR_ERR(dbc->env, EINVAL);
715 			goto err;
716 		}
717 		memcpy(&bhdr, data.data, HEAPBLOBREC_SIZE);
718 		if (!F_ISSET(&bhdr.std_hdr, HEAP_RECBLOB)) {
719 			ret = USR_ERR(dbc->env, EINVAL);
720 			goto err;
721 		}
722 		GET_BLOB_SIZE(env, bhdr, *size, ret);
723 		break;
724 	case DB_HASH:
725 		if (data.size != HBLOB_SIZE) {
726 			ret = USR_ERR(dbc->env, EINVAL);
727 			goto err;
728 		}
729 		memcpy(&hbl, data.data, HBLOB_SIZE);
730 		if (HPAGE_PTYPE(&hbl) != H_BLOB) {
731 			ret = USR_ERR(dbc->env, EINVAL);
732 			goto err;
733 		}
734 		GET_BLOB_SIZE(env, hbl, *size, ret);
735 		break;
736 	default:
737 		ret = USR_ERR(dbc->env, EINVAL);
738 		goto err;
739 	}
740 
741 err:	return (ret);
742 }
743 
744 /*
745  * __dbc_set_blob_size --
746  *
747  * Sets the blob file size in the data record to which the cursor
748  * currently points.  Returns EINVAL if the cursor does not point to a blob
749  * record.
750  *
751  * PUBLIC: int __dbc_set_blob_size __P((DBC *, off_t));
752  */
753 int
__dbc_set_blob_size(dbc,size)754 __dbc_set_blob_size(dbc, size)
755 	DBC *dbc;
756 	off_t size;
757 {
758 	DBT key, data;
759 	BBLOB *bl;
760 	HBLOB *hbl;
761 	HEAPBLOBHDR *bhdr;
762 	int ret;
763 
764 	if (dbc->dbtype != DB_BTREE &&
765 	    dbc->dbtype != DB_HEAP && dbc->dbtype != DB_HASH) {
766 		return (EINVAL);
767 	}
768 
769 	ret = 0;
770 	memset(&key, 0, sizeof(DBT));
771 	memset(&data, 0, sizeof(DBT));
772 	/* Get the blob database record instead of the blob. */
773 	data.flags |= DB_DBT_BLOB_REC;
774 
775 	/*
776 	 * It would be great if there was a more efficient way to do this, but
777 	 * the complexities of getting a page from a database, especially
778 	 * when taking into account things like partitions and compression,
779 	 * make that more trouble than it is worth.
780 	 */
781 	if ((ret = __dbc_get(dbc, &key, &data, DB_CURRENT)) != 0)
782 		goto err;
783 
784 	switch (dbc->dbtype) {
785 	case DB_BTREE:
786 		bl = (BBLOB *)data.data;
787 		if (bl == NULL ||
788 		    B_TYPE(bl->type) != B_BLOB || data.size != BBLOB_SIZE) {
789 			ret = USR_ERR(dbc->env, EINVAL);
790 			goto err;
791 		}
792 		SET_BLOB_SIZE(bl, size, BBLOB);
793 		break;
794 	case DB_HEAP:
795 		bhdr = (HEAPBLOBHDR *)data.data;
796 		if (bhdr == NULL ||
797 		    !F_ISSET(&bhdr->std_hdr, HEAP_RECBLOB) ||
798 		    data.size != HEAPBLOBREC_SIZE) {
799 			ret = USR_ERR(dbc->env, EINVAL);
800 			goto err;
801 		}
802 		SET_BLOB_SIZE(bhdr, size, HEAPBLOBHDR);
803 		break;
804 	case DB_HASH:
805 		hbl = data.data;
806 		if (hbl == NULL ||
807 		    HPAGE_PTYPE(hbl) != H_BLOB || data.size != HBLOB_SIZE) {
808 			ret = USR_ERR(dbc->env, EINVAL);
809 			goto err;
810 		}
811 		SET_BLOB_SIZE((HBLOB *)hbl, size, HBLOB);
812 		break;
813 	default:
814 		ret = USR_ERR(dbc->env, EINVAL);
815 		goto err;
816 	}
817 
818 	if ((ret = __dbc_put(dbc, &key, &data, DB_CURRENT)) != 0)
819 		goto err;
820 
821 err:	return (ret);
822 }
823 
#ifdef HAVE_COMPRESSION
/*
 * __dbc_bulk_del --
 *	Bulk del for a cursor.
 *
 *	Only implemented for compressed BTrees. In this file in order to
 *	use the CDB_LOCKING_* macros.
 *
 *	Returns the result of __bamc_compress_bulk_del(), or returns early
 *	from inside CDB_LOCKING_INIT if the CDB write lock cannot be
 *	obtained.
 *
 * PUBLIC: #ifdef HAVE_COMPRESSION
 * PUBLIC: int __dbc_bulk_del __P((DBC *, DBT *, u_int32_t));
 * PUBLIC: #endif
 */
int
__dbc_bulk_del(dbc, key, flags)
	DBC *dbc;
	DBT *key;
	u_int32_t flags;
{
	ENV *env;
	int ret;

	env = dbc->env;

	/* Callers are expected to use this only on compressed databases. */
	DB_ASSERT(env, DB_IS_COMPRESSED(dbc->dbp));

	/* May return directly; relies on the local "ret". */
	CDB_LOCKING_INIT(env, dbc);
	F_CLR(dbc, DBC_ERROR);

	ret = __bamc_compress_bulk_del(dbc, key, flags);

	CDB_LOCKING_DONE(env, dbc);

	/*
	 * NOTE(review): unlike __dbc_del, DBC_ERROR is not set again here
	 * when the delete fails -- confirm that this is intended.
	 */
	return (ret);
}
#endif
859 
860 /*
861  * __dbc_dup --
862  *	Duplicate a cursor
863  *
864  * PUBLIC: int __dbc_dup __P((DBC *, DBC **, u_int32_t));
865  */
866 int
__dbc_dup(dbc_orig,dbcp,flags)867 __dbc_dup(dbc_orig, dbcp, flags)
868 	DBC *dbc_orig;
869 	DBC **dbcp;
870 	u_int32_t flags;
871 {
872 	DBC *dbc_n, *dbc_nopd;
873 	int ret;
874 
875 	dbc_n = dbc_nopd = NULL;
876 
877 	/* Allocate a new cursor and initialize it. */
878 	if ((ret = __dbc_idup(dbc_orig, &dbc_n, flags)) != 0)
879 		goto err;
880 	*dbcp = dbc_n;
881 
882 	/*
883 	 * If the cursor references an off-page duplicate tree, allocate a
884 	 * new cursor for that tree and initialize it.
885 	 */
886 	if (dbc_orig->internal->opd != NULL) {
887 		if ((ret =
888 		   __dbc_idup(dbc_orig->internal->opd, &dbc_nopd, flags)) != 0)
889 			goto err;
890 		dbc_n->internal->opd = dbc_nopd;
891 		dbc_nopd->internal->pdbc = dbc_n;
892 	}
893 	return (0);
894 
895 err:	if (dbc_n != NULL)
896 		(void)__dbc_close(dbc_n);
897 	if (dbc_nopd != NULL)
898 		(void)__dbc_close(dbc_nopd);
899 
900 	return (ret);
901 }
902 
/*
 * __dbc_idup --
 *	Internal version of __dbc_dup.
 *
 * Creates a new cursor on the same database, transaction, type, root and
 * locker as dbc_orig.  With DB_POSITION in flags the original's flags
 * (other than DBC_OWN_LID), position and stream state are copied and the
 * access method's dup routine is run.  Without it, only a DBC_BULK
 * cursor's page number is carried over.
 *
 * Returns 0 and stores the new cursor in *dbcp.  If a step fails after
 * the cursor has been created, the new cursor is closed and *dbcp is not
 * written.
 *
 * PUBLIC: int __dbc_idup __P((DBC *, DBC **, u_int32_t));
 */
int
__dbc_idup(dbc_orig, dbcp, flags)
	DBC *dbc_orig, **dbcp;
	u_int32_t flags;
{
	DB *dbp;
	DBC *dbc_n;
	DBC_INTERNAL *int_n, *int_orig;
	ENV *env;
	int ret;

	dbp = dbc_orig->dbp;
	/*
	 * NOTE(review): the caller's current *dbcp value is handed on to
	 * __db_cursor_int() via &dbc_n; whether it is read there cannot be
	 * seen from this function.
	 */
	dbc_n = *dbcp;
	env = dbp->env;

	/* The new cursor keeps the DBC_OPD bit and is marked DBC_DUPLICATE. */
	if ((ret = __db_cursor_int(dbp, dbc_orig->thread_info,
	    dbc_orig->txn, dbc_orig->dbtype, dbc_orig->internal->root,
	    F_ISSET(dbc_orig, DBC_OPD) | DBC_DUPLICATE,
	    dbc_orig->locker, &dbc_n)) != 0)
		return (ret);

	/* Position the cursor if requested, acquiring the necessary locks. */
	if (LF_ISSET(DB_POSITION)) {
		int_n = dbc_n->internal;
		int_orig = dbc_orig->internal;

		/*
		 * Inherit every flag except DBC_OWN_LID; presumably because
		 * the duplicate was created with the original's locker
		 * rather than one of its own.
		 */
		dbc_n->flags |= dbc_orig->flags & ~DBC_OWN_LID;

		int_n->indx = int_orig->indx;
		int_n->pgno = int_orig->pgno;
		int_n->root = int_orig->root;
		int_n->lock_mode = int_orig->lock_mode;

		int_n->stream_start_pgno = int_orig->stream_start_pgno;
		int_n->stream_off = int_orig->stream_off;
		int_n->stream_curr_pgno = int_orig->stream_curr_pgno;

		/*
		 * Partitioned databases use the partition dup routine
		 * instead of the per-type switch below.
		 */
#ifdef HAVE_PARTITION
		if (DB_IS_PARTITIONED(dbp)) {
			if ((ret = __partc_dup(dbc_orig, dbc_n)) != 0)
				goto err;
		} else
#endif
		switch (dbc_orig->dbtype) {
		case DB_QUEUE:
			if ((ret = __qamc_dup(dbc_orig, dbc_n)) != 0)
				goto err;
			break;
		case DB_BTREE:
		case DB_RECNO:
			if ((ret = __bamc_dup(dbc_orig, dbc_n, flags)) != 0)
				goto err;
			break;
		case DB_HASH:
			if ((ret = __hamc_dup(dbc_orig, dbc_n)) != 0)
				goto err;
			break;
		case DB_HEAP:
			if ((ret = __heapc_dup(dbc_orig, dbc_n)) != 0)
				goto err;
			break;
		case DB_UNKNOWN:
		default:
			ret = __db_unknown_type(env,
			    "__dbc_idup", dbc_orig->dbtype);
			goto err;
		}
	} else if (F_ISSET(dbc_orig, DBC_BULK)) {
		/*
		 * For bulk cursors, remember what page were on, even if we
		 * don't know that the next operation will be nearby.
		 */
		dbc_n->internal->pgno = dbc_orig->internal->pgno;
	}

	/* Copy the locking flags to the new cursor. */
	F_SET(dbc_n, F_ISSET(dbc_orig, DBC_BULK |
	    DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED | DBC_WRITECURSOR));

	/*
	 * If we're in CDB and this isn't an offpage dup cursor, then
	 * we need to get a lock for the duplicated cursor.
	 * A duplicate of a write cursor asks for DB_LOCK_IWRITE, any other
	 * for DB_LOCK_READ.
	 */
	if (CDB_LOCKING(env) && !F_ISSET(dbc_n, DBC_OPD) &&
	    (ret = __lock_get(env, dbc_n->locker, 0,
	    &dbc_n->lock_dbt, F_ISSET(dbc_orig, DBC_WRITECURSOR) ?
	    DB_LOCK_IWRITE : DB_LOCK_READ, &dbc_n->mylock)) != 0)
		goto err;

	/* The duplicate starts out with the original's priority and parent. */
	dbc_n->priority = dbc_orig->priority;
	dbc_n->internal->pdbc = dbc_orig->internal->pdbc;
	*dbcp = dbc_n;
	return (0);

	/* Failure after creation: close the new cursor, ignoring its result. */
err:	(void)__dbc_close(dbc_n);
	return (ret);
}
1006 
1007 /*
1008  * __dbc_newopd --
1009  *	Create a new off-page duplicate cursor.
1010  *
1011  * PUBLIC: int __dbc_newopd __P((DBC *, db_pgno_t, DBC *, DBC **));
1012  */
1013 int
__dbc_newopd(dbc_parent,root,oldopd,dbcp)1014 __dbc_newopd(dbc_parent, root, oldopd, dbcp)
1015 	DBC *dbc_parent;
1016 	db_pgno_t root;
1017 	DBC *oldopd;
1018 	DBC **dbcp;
1019 {
1020 	DB *dbp;
1021 	DBC *opd;
1022 	DBTYPE dbtype;
1023 	int ret;
1024 
1025 	dbp = dbc_parent->dbp;
1026 	dbtype = (dbp->dup_compare == NULL) ? DB_RECNO : DB_BTREE;
1027 
1028 	/*
1029 	 * On failure, we want to default to returning the old off-page dup
1030 	 * cursor, if any;  our caller can't be left with a dangling pointer
1031 	 * to a freed cursor.  On error the only allowable behavior is to
1032 	 * close the cursor (and the old OPD cursor it in turn points to), so
1033 	 * this should be safe.
1034 	 */
1035 	*dbcp = oldopd;
1036 
1037 	if ((ret = __db_cursor_int(dbp, dbc_parent->thread_info,
1038 	    dbc_parent->txn,
1039 	    dbtype, root, DBC_OPD, dbc_parent->locker, &opd)) != 0)
1040 		return (ret);
1041 
1042 	opd->priority = dbc_parent->priority;
1043 	opd->internal->pdbc = dbc_parent;
1044 	*dbcp = opd;
1045 
1046 	/*
1047 	 * Check to see if we already have an off-page dup cursor that we've
1048 	 * passed in.  If we do, close it.  It'd be nice to use it again
1049 	 * if it's a cursor belonging to the right tree, but if we're doing
1050 	 * a cursor-relative operation this might not be safe, so for now
1051 	 * we'll take the easy way out and always close and reopen.
1052 	 *
1053 	 * Note that under no circumstances do we want to close the old
1054 	 * cursor without returning a valid new one;  we don't want to
1055 	 * leave the main cursor in our caller with a non-NULL pointer
1056 	 * to a freed off-page dup cursor.
1057 	 */
1058 	if (oldopd != NULL && (ret = __dbc_close(oldopd)) != 0)
1059 		return (ret);
1060 
1061 	return (0);
1062 }
1063 
1064 /*
1065  * __dbc_get --
1066  *	Get using a cursor.
1067  *
1068  * PUBLIC: int __dbc_get __P((DBC *, DBT *, DBT *, u_int32_t));
1069  */
1070 int
__dbc_get(dbc,key,data,flags)1071 __dbc_get(dbc, key, data, flags)
1072 	DBC *dbc;
1073 	DBT *key, *data;
1074 	u_int32_t flags;
1075 {
1076 	F_CLR(dbc, DBC_ERROR);
1077 #ifdef HAVE_PARTITION
1078 	if (F_ISSET(dbc, DBC_PARTITIONED))
1079 		return (__partc_get(dbc, key, data, flags));
1080 #endif
1081 
1082 #ifdef HAVE_COMPRESSION
1083 	if (DB_IS_COMPRESSED(dbc->dbp))
1084 		return (__bamc_compress_get(dbc, key, data, flags));
1085 #endif
1086 
1087 	return (__dbc_iget(dbc, key, data, flags));
1088 }
1089 
/*
 * __dbc_iget --
 *	Implementation of get using a cursor.
 *
 *	dbc is the caller's cursor, key and data are the caller's DBTs, and
 *	flags is one get operation (DB_SET, DB_NEXT, DB_CURRENT, ...) with
 *	DB_RMW, DB_MULTIPLE and DB_MULTIPLE_KEY optionally OR'd in.  Those
 *	modifier bits are stripped before the operation is compared.
 *
 *	Except for DBC_TRANSIENT and DBC_PARTITIONED cursors, the operation
 *	runs on a duplicate of the cursor, and __dbc_cleanup is called with
 *	both cursors on the way out.
 *
 *	Returns 0 on success, otherwise the failure reported by the access
 *	method, the buffer pool fetch or the cursor cleanup.  If only the key
 *	buffer was too small and the data DBT is DB_DBT_USERMEM, the data
 *	step still runs (with data->ulen temporarily zeroed) and
 *	DB_BUFFER_SMALL is returned.
 *
 * PUBLIC: int __dbc_iget __P((DBC *, DBT *, DBT *, u_int32_t));
 */
int
__dbc_iget(dbc, key, data, flags)
	DBC *dbc;
	DBT *key, *data;
	u_int32_t flags;
{
	DB *dbp;
	DBC *ddbc, *dbc_n, *opd;
	DBC_INTERNAL *cp, *cp_n;
	DB_MPOOLFILE *mpf;
	ENV *env;
	db_pgno_t pgno;
	db_indx_t indx_off;
	u_int32_t multi, orig_ulen, tmp_flags, tmp_read_locking, tmp_rmw;
	u_int8_t type;
	int key_small, ret, t_ret;

	COMPQUIET(orig_ulen, 0);

	/* Remember the caller's key DBT on the cursor. */
	dbc->cur_key = key;
	key_small = 0;

	/*
	 * Cursor Cleanup Note:
	 * All of the cursors passed to the underlying access methods by this
	 * routine are duplicated cursors.  On return, any referenced pages
	 * will be discarded, and, if the cursor is not intended to be used
	 * again, the close function will be called.  So, pages/locks that
	 * the cursor references do not need to be resolved by the underlying
	 * functions.
	 */
	dbp = dbc->dbp;
	env = dbp->env;
	mpf = dbp->mpf;
	dbc_n = NULL;
	opd = NULL;

	PERFMON6(env, db, get, dbp->fname, dbp->dname,
	    dbc->txn == NULL ? 0 : dbc->txn->txnid, key, data, flags);

	/* Clear OR'd in additional bits so we can check for flag equality. */
	tmp_rmw = LF_ISSET(DB_RMW);
	LF_CLR(DB_RMW);

	SET_READ_LOCKING_FLAGS(dbc, tmp_read_locking);

	multi = LF_ISSET(DB_MULTIPLE|DB_MULTIPLE_KEY);
	LF_CLR(DB_MULTIPLE|DB_MULTIPLE_KEY);

	/*
	 * Return a cursor's record number.  It has nothing to do with the
	 * cursor get code except that it was put into the interface.
	 */
	if (flags == DB_GET_RECNO) {
		if (tmp_rmw)
			F_SET(dbc, DBC_RMW);
		F_SET(dbc, tmp_read_locking);
		ret = __bamc_rget(dbc, data);
		if (tmp_rmw)
			F_CLR(dbc, DBC_RMW);
		/* Clear the temp flags, but leave WAS_READ_COMMITTED. */
		F_CLR(dbc, tmp_read_locking & ~DBC_WAS_READ_COMMITTED);
		return (ret);
	}

	if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
		CDB_LOCKING_INIT(env, dbc);

	/* Don't return the key or data if it was passed to us. */
	if (!DB_RETURNS_A_KEY(dbp, flags))
		F_SET(key, DB_DBT_ISSET);
	if (flags == DB_GET_BOTH &&
	    (dbp->dup_compare == NULL || dbp->dup_compare == __dbt_defcmp))
		F_SET(data, DB_DBT_ISSET);

	/*
	 * If we have an off-page duplicates cursor, and the operation applies
	 * to it, perform the operation.  Duplicate the cursor and call the
	 * underlying function.
	 *
	 * Off-page duplicate trees are locked in the primary tree, that is,
	 * we acquire a write lock in the primary tree and no locks in the
	 * off-page dup tree.  If the DB_RMW flag was specified and the get
	 * operation is done in an off-page duplicate tree, call the primary
	 * cursor's upgrade routine first.  We fetch the primary tree's data
	 * page to follow the buffer latching order rules for btrees: latch from
	 * the top of the main tree down, even when also searching OPD trees.
	 * Deadlocks could otherwise occur if we need to fetch the main page
	 * while an OPD page is latched. [#22532]
	 */
	cp = dbc->internal;
	if (cp->opd != NULL &&
	    (flags == DB_CURRENT || flags == DB_GET_BOTHC ||
	    flags == DB_NEXT || flags == DB_NEXT_DUP ||
	    flags == DB_PREV || flags == DB_PREV_DUP)) {
		if (tmp_rmw && (ret = dbc->am_writelock(dbc)) != 0)
			goto err;
		if (cp->page == NULL && (ret = __memp_fget(mpf, &cp->pgno,
		    dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
			goto err;

		/*
		 * A transient cursor operates on its own OPD cursor directly;
		 * otherwise work on a positioned duplicate of it.
		 */
		if (F_ISSET(dbc, DBC_TRANSIENT))
			opd = cp->opd;
		else if ((ret = __dbc_idup(cp->opd, &opd, DB_POSITION)) != 0)
			goto err;

		if ((ret = opd->am_get(opd, key, data, flags, NULL)) == 0)
			goto done;
		/*
		 * Another cursor may have deleted all of the off-page
		 * duplicates, so for operations that are moving a cursor, we
		 * need to skip the empty tree and retry on the parent cursor.
		 */
		if (ret == DB_NOTFOUND &&
		    (flags == DB_PREV || flags == DB_NEXT)) {
			ret = __dbc_close(opd);
			opd = NULL;
			/* In the transient case opd was cp->opd itself. */
			if (F_ISSET(dbc, DBC_TRANSIENT))
				cp->opd = NULL;
		}
		if (ret != 0)
			goto err;
	} else if (cp->opd != NULL && F_ISSET(dbc, DBC_TRANSIENT)) {
		if ((ret = __dbc_close(cp->opd)) != 0)
			goto err;
		cp->opd = NULL;
	}

	/*
	 * Perform an operation on the main cursor.  Duplicate the cursor,
	 * upgrade the lock as required, and call the underlying function.
	 */
	switch (flags) {
	case DB_CURRENT:
	case DB_GET_BOTHC:
	case DB_NEXT:
	case DB_NEXT_DUP:
	case DB_NEXT_NODUP:
	case DB_PREV:
	case DB_PREV_DUP:
	case DB_PREV_NODUP:
		tmp_flags = DB_POSITION;
		break;
	default:
		tmp_flags = 0;
		break;
	}

	/*
	 * If this cursor is going to be closed immediately, we don't
	 * need to take precautions to clean it up on error.
	 */
	if (F_ISSET(dbc, DBC_TRANSIENT | DBC_PARTITIONED))
		dbc_n = dbc;
	else {
		ret = __dbc_idup(dbc, &dbc_n, tmp_flags);

		if (ret != 0)
			goto err;
		COPY_RET_MEM(dbc, dbc_n);
	}

	/* Apply the modifiers that were stripped from flags to dbc_n. */
	if (tmp_rmw)
		F_SET(dbc_n, DBC_RMW);
	F_SET(dbc_n, tmp_read_locking);

	switch (multi) {
	case DB_MULTIPLE:
		F_SET(dbc_n, DBC_MULTIPLE);
		break;
	case DB_MULTIPLE_KEY:
		F_SET(dbc_n, DBC_MULTIPLE_KEY);
		break;
	case DB_MULTIPLE | DB_MULTIPLE_KEY:
		F_SET(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
		break;
	case 0:
	default:
		break;
	}

	/*
	 * pgno is reset before every attempt; a value other than
	 * PGNO_INVALID after am_get is handled below as a reference to an
	 * off-page duplicates tree.
	 */
retry:	pgno = PGNO_INVALID;
	ret = dbc_n->am_get(dbc_n, key, data, flags, &pgno);
	if (tmp_rmw)
		F_CLR(dbc_n, DBC_RMW);
	/*
	 * Clear the temporary locking flags in the new cursor.  The user's
	 * (old) cursor needs to have the WAS_READ_COMMITTED flag because this
	 * is used on the next call on that cursor.
	 */
	F_CLR(dbc_n, tmp_read_locking);
	F_SET(dbc, tmp_read_locking & DBC_WAS_READ_COMMITTED);
	F_CLR(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
	if (ret != 0)
		goto err;

	cp_n = dbc_n->internal;

	/*
	 * We may be referencing a new off-page duplicates tree.  Acquire
	 * a new cursor and call the underlying function.
	 */
	if (pgno != PGNO_INVALID) {
		if ((ret = __dbc_newopd(dbc,
		    pgno, cp_n->opd, &cp_n->opd)) != 0)
			goto err;

		/*
		 * Translate the main-tree operation into the operation used
		 * to position inside the duplicate tree.
		 */
		switch (flags) {
		case DB_FIRST:
		case DB_NEXT:
		case DB_NEXT_NODUP:
		case DB_SET:
		case DB_SET_RECNO:
		case DB_SET_RANGE:
			tmp_flags = DB_FIRST;
			break;
		case DB_LAST:
		case DB_PREV:
		case DB_PREV_NODUP:
			tmp_flags = DB_LAST;
			break;
		case DB_GET_BOTH:
		case DB_GET_BOTHC:
		case DB_GET_BOTH_RANGE:
			tmp_flags = flags;
			break;
		default:
			ret = __db_unknown_flag(env, "__dbc_get", flags);
			goto err;
		}
		ret = cp_n->opd->am_get(cp_n->opd, key, data, tmp_flags, NULL);
		/*
		 * Another cursor may have deleted all of the off-page
		 * duplicates, so for operations that are moving a cursor, we
		 * need to skip the empty tree and retry on the parent cursor.
		 */
		if (ret == DB_NOTFOUND) {
			PERFMON5(env, race, dbc_get,
			    dbp->fname, dbp->dname, ret, tmp_flags, key);

			/*
			 * The retry continues in the same direction; any
			 * operation that does not move the cursor just
			 * returns DB_NOTFOUND.
			 */
			switch (flags) {
			case DB_FIRST:
			case DB_NEXT:
			case DB_NEXT_NODUP:
				flags = DB_NEXT;
				break;
			case DB_LAST:
			case DB_PREV:
			case DB_PREV_NODUP:
				flags = DB_PREV;
				break;
			default:
				goto err;
			}

			ret = __dbc_close(cp_n->opd);
			cp_n->opd = NULL;
			if (ret == 0)
				goto retry;
		}
		if (ret != 0)
			goto err;
	}

done:	/*
	 * Return a key/data item.  The only exception is that we don't return
	 * a key if the user already gave us one, that is, if the DB_SET flag
	 * was set.  The DB_SET flag is necessary.  In a Btree, the user's key
	 * doesn't have to be the same as the key stored the tree, depending on
	 * the magic performed by the comparison function.  As we may not have
	 * done any key-oriented operation here, the page reference may not be
	 * valid.  Fill it in as necessary.  We don't have to worry about any
	 * locks, the cursor must already be holding appropriate locks.
	 *
	 * !!!
	 * If not a Btree and DB_SET_RANGE is set, we shouldn't return a key
	 * either, should we?
	 */
	/* dbc_n is still NULL when we jumped here from the OPD branch. */
	cp_n = dbc_n == NULL ? dbc->internal : dbc_n->internal;
	if (!F_ISSET(key, DB_DBT_ISSET)) {
		if (cp_n->page == NULL && (ret = __memp_fget(mpf, &cp_n->pgno,
		    dbc->thread_info, dbc->txn, 0, &cp_n->page)) != 0)
			goto err;

		if ((ret = __db_ret(dbc, cp_n->page, cp_n->indx, key,
		    &dbc->rkey->data, &dbc->rkey->ulen)) != 0) {
			/*
			 * If the key DBT is too small, we still want to return
			 * the size of the data.  Otherwise applications are
			 * forced to check each one with a separate call.  We
			 * don't want to copy the data, so we set the ulen to
			 * zero before calling __db_ret.
			 */
			if (ret == DB_BUFFER_SMALL &&
			    F_ISSET(data, DB_DBT_USERMEM)) {
				key_small = 1;
				orig_ulen = data->ulen;
				data->ulen = 0;
			} else
				goto err;
		}
	}
	if (multi != 0 && dbc->am_bulk != NULL) {
		/*
		 * Even if fetching from the OPD cursor we need a duplicate
		 * primary cursor if we are going after multiple keys.
		 */
		if (dbc_n == NULL) {
			/*
			 * Non-"_KEY" DB_MULTIPLE doesn't move the main cursor,
			 * so it's safe to just use dbc, unless the cursor
			 * has an open off-page duplicate cursor whose state
			 * might need to be preserved.
			 */
			if ((!(multi & DB_MULTIPLE_KEY) &&
			    dbc->internal->opd == NULL) ||
			    F_ISSET(dbc, DBC_TRANSIENT | DBC_PARTITIONED))
				dbc_n = dbc;
			else {
				if ((ret = __dbc_idup(dbc,
				    &dbc_n, DB_POSITION)) != 0)
					goto err;
				if ((ret = dbc_n->am_get(dbc_n,
				    key, data, DB_CURRENT, &pgno)) != 0)
					goto err;
			}
			cp_n = dbc_n->internal;
		}

		/*
		 * If opd is set then we dupped the opd that we came in with.
		 * When we return we may have a new opd if we went to another
		 * key.
		 */
		if (opd != NULL) {
			DB_ASSERT(env, cp_n->opd == NULL);
			cp_n->opd = opd;
			opd = NULL;
		}

		/*
		 * Bulk get doesn't use __db_retcopy, so data.size won't
		 * get set up unless there is an error.  Assume success
		 * here.  This is the only call to am_bulk, and it avoids
		 * setting it exactly the same everywhere.  If we have an
		 * DB_BUFFER_SMALL error, it'll get overwritten with the
		 * needed value.
		 */
		data->size = data->ulen;
		ret = dbc_n->am_bulk(dbc_n, data, flags | multi);
	} else if (!F_ISSET(data, DB_DBT_ISSET)) {
		/*
		 * Read the data item through the cursor that is positioned on
		 * it: the dup'd OPD cursor if one was used, else the new
		 * cursor's OPD cursor, else the new cursor itself.
		 */
		ddbc = opd != NULL ? opd :
		    cp_n->opd != NULL ? cp_n->opd : dbc_n;
		cp = ddbc->internal;
		if (cp->page == NULL &&
		    (ret = __memp_fget(mpf, &cp->pgno,
			 dbc->thread_info, ddbc->txn, 0, &cp->page)) != 0)
			goto err;

		/*
		 * On P_LBTREE, P_HASH and P_HASH_UNSORTED pages the data item
		 * is read from O_INDX slots past the cursor's index; on other
		 * page types it is read from the cursor's index itself.
		 */
		type = TYPE(cp->page);
		indx_off = ((type == P_LBTREE ||
		    type == P_HASH || type == P_HASH_UNSORTED) ? O_INDX : 0);
		ret = __db_ret(ddbc, cp->page, cp->indx + indx_off,
		    data, &dbc->rdata->data, &dbc->rdata->ulen);
	}

err:	/* Don't pass DB_DBT_ISSET back to application level, error or no. */
	F_CLR(key, DB_DBT_ISSET);
	F_CLR(data, DB_DBT_ISSET);

	/* Cleanup and cursor resolution. */
	if (opd != NULL) {
		/*
		 * To support dirty reads we must reget the write lock
		 * if we have just stepped off a deleted record.
		 * Since the OPD cursor does not know anything
		 * about the referencing page or cursor we need
		 * to peek at the OPD cursor and get the lock here.
		 */
		if (F_ISSET(dbp, DB_AM_READ_UNCOMMITTED) &&
		     F_ISSET((BTREE_CURSOR *)
		     dbc->internal->opd->internal, C_DELETED))
			if ((t_ret =
			    dbc->am_writelock(dbc)) != 0 && ret == 0)
				ret = t_ret;
		if ((t_ret = __dbc_cleanup(
		    dbc->internal->opd, opd, ret)) != 0 && ret == 0)
			ret = t_ret;
	}

	/*
	 * Undo the temporary data->ulen change made when the key buffer was
	 * too small, and report DB_BUFFER_SMALL unless something else failed.
	 */
	if (key_small) {
		data->ulen = orig_ulen;
		if (ret == 0)
			ret = DB_BUFFER_SMALL;
	}

	/*
	 * A cleanup failure replaces a success or DB_BUFFER_SMALL return;
	 * any other error already in ret is kept.
	 */
	if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 &&
	    (ret == 0 || ret == DB_BUFFER_SMALL))
		ret = t_ret;

	if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
		CDB_LOCKING_DONE(env, dbc);
	return (ret);
}
1500 
/*
 * Internal flags shared by the dbc_put functions.  The __dbc_put_* helpers
 * read and update them through their put_statep argument.
 */
#define	DBC_PUT_RMW		0x001	/* Gets done for the put add DB_RMW. */
#define	DBC_PUT_NODEL		0x002	/* No existing item is replaced. */
#define	DBC_PUT_HAVEREC		0x004	/* Old record has been looked for. */
1505 
1506 /*
1507  * __dbc_put_resolve_key --
1508  *	Get the current key and data so that we can correctly update the
1509  *	secondary and foreign databases.
1510  */
1511 static inline int
__dbc_put_resolve_key(dbc,oldkey,olddata,put_statep,flags)1512 __dbc_put_resolve_key(dbc, oldkey, olddata, put_statep, flags)
1513 	DBC *dbc;
1514 	DBT *oldkey, *olddata;
1515 	u_int32_t flags, *put_statep;
1516 {
1517 	int ret, rmw;
1518 
1519 	rmw = FLD_ISSET(*put_statep, DBC_PUT_RMW) ? DB_RMW : 0;
1520 
1521 	DB_ASSERT(dbc->env, flags == DB_CURRENT);
1522 	COMPQUIET(flags, 0);
1523 
1524 	/*
1525 	 * This is safe to do on the cursor we already have;
1526 	 * error or no, it won't move.
1527 	 *
1528 	 * We use DB_RMW for all of these gets because we'll be
1529 	 * writing soon enough in the "normal" put code.  In
1530 	 * transactional databases we'll hold those write locks
1531 	 * even if we close the cursor we're reading with.
1532 	 *
1533 	 * The DB_KEYEMPTY return needs special handling -- if the
1534 	 * cursor is on a deleted key, we return DB_NOTFOUND.
1535 	 */
1536 	memset(oldkey, 0, sizeof(DBT));
1537 	if ((ret = __dbc_get(dbc, oldkey, olddata, rmw | DB_CURRENT)) != 0)
1538 		return (ret == DB_KEYEMPTY ? DB_NOTFOUND : ret);
1539 
1540 	/* Record that we've looked for the old record. */
1541 	FLD_SET(*put_statep, DBC_PUT_HAVEREC);
1542 	return (0);
1543 }
1544 
1545 /*
1546  * __dbc_put_append --
1547  *	Handle an append to a primary.
1548  */
1549 static inline int
__dbc_put_append(dbc,key,data,put_statep,flags)1550 __dbc_put_append(dbc, key, data, put_statep, flags)
1551 	DBC *dbc;
1552 	DBT *key, *data;
1553 	u_int32_t flags, *put_statep;
1554 {
1555 	DB *dbp;
1556 	ENV *env;
1557 	DBC *dbc_n;
1558 	DBT tdata;
1559 	int ret, t_ret;
1560 
1561 	dbp = dbc->dbp;
1562 	env = dbp->env;
1563 	ret = 0;
1564 	dbc_n = NULL;
1565 
1566 	DB_ASSERT(env, flags == DB_APPEND);
1567 	COMPQUIET(flags, 0);
1568 
1569 	/*
1570 	 * With DB_APPEND, we need to do the insert to populate the key value.
1571 	 * So we swap the 'normal' order of updating secondary / verifying
1572 	 * foreign databases and inserting.
1573 	 *
1574 	 * If there is an append callback, the value stored in data->data may
1575 	 * be replaced and then freed.  To avoid passing a freed pointer back
1576 	 * to the user, just operate on a copy of the data DBT.
1577 	 */
1578 	tdata = *data;
1579 
1580 	/*
1581 	 * If this cursor is going to be closed immediately, we don't
1582 	 * need to take precautions to clean it up on error.
1583 	 */
1584 	if (F_ISSET(dbc, DBC_TRANSIENT))
1585 		dbc_n = dbc;
1586 	else if ((ret = __dbc_idup(dbc, &dbc_n, 0)) != 0)
1587 		goto err;
1588 
1589 	/*
1590 	 * Append isn't a normal put operation;  call the appropriate access
1591 	 * method's append function.
1592 	 */
1593 	switch (dbp->type) {
1594 	case DB_HEAP:
1595 		if ((ret = __heap_append(dbc_n, key, &tdata)) != 0)
1596 			goto err;
1597 		break;
1598 	case DB_QUEUE:
1599 		if ((ret = __qam_append(dbc_n, key, &tdata)) != 0)
1600 			goto err;
1601 		break;
1602 	case DB_RECNO:
1603 		if ((ret = __ram_append(dbc_n, key, &tdata)) != 0)
1604 			goto err;
1605 		break;
1606 	default:
1607 		/* The interface should prevent this. */
1608 		DB_ASSERT(env,
1609 		    dbp->type == DB_QUEUE || dbp->type == DB_RECNO);
1610 
1611 		ret = __db_ferr(env, "DBC->put", 0);
1612 		goto err;
1613 	}
1614 
1615 	/*
1616 	 * The append callback, if one exists, may have allocated a new
1617 	 * tdata.data buffer.  If so, free it.
1618 	 */
1619 	FREE_IF_NEEDED(env, &tdata);
1620 
1621 	/*
1622 	 * The key value may have been generated by the above operation, but
1623 	 * not set in the data buffer. Make sure it is there so that secondary
1624 	 * updates can complete.
1625 	 */
1626 	__dbt_userfree(env, key, NULL, NULL);
1627 	if ((ret = __dbt_usercopy(env, key)) != 0)
1628 		goto err;
1629 
1630 	/* An append cannot be replacing an existing item. */
1631 	FLD_SET(*put_statep, DBC_PUT_NODEL);
1632 
1633 err:	if (dbc_n != NULL &&
1634 	    (t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
1635 		ret = t_ret;
1636 	return (ret);
1637 }
1638 
1639 /*
1640  * __dbc_put_partial --
1641  *	Ensure that the data item we are using is complete and correct.
1642  *      Otherwise we could break the secondary constraints.
1643  */
1644 static inline int
__dbc_put_partial(dbc,pkey,data,orig_data,out_data,put_statep,flags)1645 __dbc_put_partial(dbc, pkey, data, orig_data, out_data, put_statep, flags)
1646 	DBC *dbc;
1647 	DBT *pkey, *data, *orig_data, *out_data;
1648 	u_int32_t *put_statep, flags;
1649 {
1650 	DB *dbp;
1651 	DBC *pdbc;
1652 	int ret, rmw, t_ret;
1653 
1654 	dbp = dbc->dbp;
1655 	ret = t_ret = 0;
1656 	rmw = FLD_ISSET(*put_statep, DBC_PUT_RMW) ? DB_RMW : 0;
1657 
1658 	if (!FLD_ISSET(*put_statep, DBC_PUT_HAVEREC) &&
1659 	    !FLD_ISSET(*put_statep, DBC_PUT_NODEL)) {
1660 		/*
1661 		 * We're going to have to search the tree for the
1662 		 * specified key.  Dup a cursor (so we have the same
1663 		 * locking info) and do a c_get.
1664 		 */
1665 		if ((ret = __dbc_idup(dbc, &pdbc, 0)) != 0)
1666 			return (ret);
1667 
1668 		/*
1669 		 * When doing a put with DB_CURRENT, partial data items have
1670 		 * already been resolved.
1671 		 */
1672 		DB_ASSERT(dbp->env, flags != DB_CURRENT);
1673 
1674 		F_SET(pkey, DB_DBT_ISSET);
1675 		ret = __dbc_get(pdbc, pkey, orig_data, rmw | DB_SET);
1676 		if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) {
1677 			FLD_SET(*put_statep, DBC_PUT_NODEL);
1678 			ret = 0;
1679 		}
1680 		if ((t_ret = __dbc_close(pdbc)) != 0)
1681 			ret = t_ret;
1682 		if (ret != 0)
1683 			return (ret);
1684 
1685 		FLD_SET(*put_statep, DBC_PUT_HAVEREC);
1686 	}
1687 
1688 	COMPQUIET(flags, 0);
1689 
1690 	/*
1691 	 * Now build the new datum from orig_data and the partial data
1692 	 * we were given.  It's okay to do this if no record was
1693 	 * returned above: a partial put on an empty record is allowed,
1694 	 * if a little strange.  The data is zero-padded.
1695 	 */
1696 	return (__db_buildpartial(dbp, orig_data, data, out_data));
1697 }
1698 
1699 /*
1700  * __dbc_put_fixed_len --
1701  *      Handle padding for fixed-length records.
1702  */
1703 static inline int
__dbc_put_fixed_len(dbc,data,out_data)1704 __dbc_put_fixed_len(dbc, data, out_data)
1705 	DBC *dbc;
1706 	DBT *data, *out_data;
1707 {
1708 	DB *dbp;
1709 	ENV *env;
1710 	int re_pad, ret;
1711 	u_int32_t re_len, size;
1712 
1713 	dbp = dbc->dbp;
1714 	env = dbp->env;
1715 	ret = 0;
1716 
1717 	/*
1718 	 * Handle fixed-length records.  If the primary database has
1719 	 * fixed-length records, we need to pad out the datum before
1720 	 * we pass it into the callback function;  we always index the
1721 	 * "real" record.
1722 	 */
1723 	if (dbp->type == DB_QUEUE) {
1724 		re_len = ((QUEUE *)dbp->q_internal)->re_len;
1725 		re_pad = ((QUEUE *)dbp->q_internal)->re_pad;
1726 	} else {
1727 		re_len = ((BTREE *)dbp->bt_internal)->re_len;
1728 		re_pad = ((BTREE *)dbp->bt_internal)->re_pad;
1729 	}
1730 
1731 	size = data->size;
1732 	if (size > re_len) {
1733 		ret = __db_rec_toobig(env, size, re_len);
1734 		return (ret);
1735 	} else if (size < re_len) {
1736 		/*
1737 		 * If we're not doing a partial put, copy data->data into
1738 		 * out_data->data, then pad out out_data->data. This overrides
1739 		 * the assignment made above, which is used in the more common
1740 		 * case when padding is not needed.
1741 		 *
1742 		 * If we're doing a partial put, the data we want are already
1743 		 * in out_data.data; we just need to pad.
1744 		 */
1745 		if (F_ISSET(data, DB_DBT_PARTIAL)) {
1746 		       if ((ret = __os_realloc(
1747 			    env, re_len, &out_data->data)) != 0)
1748 				return (ret);
1749 		       /*
1750 			* In the partial case, we have built the item into
1751 			* out_data already using __db_buildpartial. Just need
1752 			* to pad from the end of out_data, not from data->size.
1753 			*/
1754 		       size = out_data->size;
1755 		} else {
1756 			if ((ret = __os_malloc(
1757 			    env, re_len, &out_data->data)) != 0)
1758 				return (ret);
1759 			memcpy(out_data->data, data->data, size);
1760 		}
1761 		memset((u_int8_t *)out_data->data + size, re_pad,
1762 		    re_len - size);
1763 		out_data->size = re_len;
1764 	}
1765 
1766 	return (ret);
1767 }
1768 
1769 /*
1770  * __dbc_put_secondaries --
1771  *	Insert the secondary keys, and validate the foreign key constraints.
1772  */
1773 static inline int
__dbc_put_secondaries(dbc,pkey,data,orig_data,s_count,s_keys_buf,put_statep)1774 __dbc_put_secondaries(dbc,
1775     pkey, data, orig_data, s_count, s_keys_buf, put_statep)
1776 	DBC *dbc;
1777 	DBT *pkey, *data, *orig_data, *s_keys_buf;
1778 	int s_count;
1779 	u_int32_t *put_statep;
1780 {
1781 	DB *dbp, *sdbp;
1782 	DBC *fdbc, *sdbc;
1783 	DBT fdata, oldpkey, *skeyp, temppkey, tempskey, *tskeyp;
1784 	ENV *env;
1785 	int cmp, ret, rmw, t_ret;
1786 	u_int32_t nskey;
1787 
1788 	dbp = dbc->dbp;
1789 	env = dbp->env;
1790 	fdbc = sdbc = NULL;
1791 	sdbp = NULL;
1792 	t_ret = 0;
1793 	rmw = FLD_ISSET(*put_statep, DBC_PUT_RMW) ? DB_RMW : 0;
1794 
1795 	/*
1796 	 * Loop through the secondaries.  (Step 3.)
1797 	 *
1798 	 * Note that __db_s_first and __db_s_next will take care of
1799 	 * thread-locking and refcounting issues.
1800 	 */
1801 	for (ret = __db_s_first(dbp, &sdbp), skeyp = s_keys_buf;
1802 	    sdbp != NULL && ret == 0;
1803 	    ret = __db_s_next(&sdbp, dbc->txn), ++skeyp) {
1804 		DB_ASSERT(env, skeyp - s_keys_buf < s_count);
1805 		/*
1806 		 * Don't process this secondary if the key is immutable and we
1807 		 * know that the old record exists.  This optimization can't be
1808 		 * used if we have not checked for the old record yet.
1809 		 */
1810 		if (FLD_ISSET(*put_statep, DBC_PUT_HAVEREC) &&
1811 		    !FLD_ISSET(*put_statep, DBC_PUT_NODEL) &&
1812 		    FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY))
1813 			continue;
1814 
1815 		/*
1816 		 * Call the callback for this secondary, to get the
1817 		 * appropriate secondary key.
1818 		 */
1819 		if ((ret = sdbp->s_callback(sdbp,
1820 		    pkey, data, skeyp)) != 0) {
1821 			/* Not indexing is equivalent to an empty key set. */
1822 			if (ret == DB_DONOTINDEX) {
1823 				F_SET(skeyp, DB_DBT_MULTIPLE);
1824 				skeyp->size = 0;
1825 				ret = 0;
1826 			} else
1827 				goto err;
1828 		}
1829 
1830 		if (sdbp->s_foreign != NULL &&
1831 		    (ret = __db_cursor_int(sdbp->s_foreign,
1832 		    dbc->thread_info, dbc->txn, sdbp->s_foreign->type,
1833 		    PGNO_INVALID, 0, dbc->locker, &fdbc)) != 0)
1834 			goto err;
1835 
1836 		/*
1837 		 * Mark the secondary key DBT(s) as set -- that is, the
1838 		 * callback returned at least one secondary key.
1839 		 *
1840 		 * Also, if this secondary index is associated with a foreign
1841 		 * database, check that the foreign db contains the key(s) to
1842 		 * maintain referential integrity.  Set flags in fdata to avoid
1843 		 * mem copying, we just need to know existence.  We need to do
1844 		 * this check before setting DB_DBT_ISSET, otherwise __dbc_get
1845 		 * will overwrite the flag values.
1846 		 */
1847 		if (F_ISSET(skeyp, DB_DBT_MULTIPLE)) {
1848 #ifdef DIAGNOSTIC
1849 			__db_check_skeyset(sdbp, skeyp);
1850 #endif
1851 			for (tskeyp = (DBT *)skeyp->data, nskey = skeyp->size;
1852 			     nskey > 0; nskey--, tskeyp++) {
1853 				if (fdbc != NULL) {
1854 					memset(&fdata, 0, sizeof(DBT));
1855 					F_SET(&fdata,
1856 					    DB_DBT_PARTIAL | DB_DBT_USERMEM);
1857 					if ((ret = __dbc_get(
1858 					    fdbc, tskeyp, &fdata,
1859 					    DB_SET | rmw)) == DB_NOTFOUND ||
1860 					    ret == DB_KEYEMPTY) {
1861 						ret = DB_FOREIGN_CONFLICT;
1862 						break;
1863 					}
1864 				}
1865 				F_SET(tskeyp, DB_DBT_ISSET);
1866 			}
1867 			tskeyp = (DBT *)skeyp->data;
1868 			nskey = skeyp->size;
1869 		} else {
1870 			if (fdbc != NULL) {
1871 				memset(&fdata, 0, sizeof(DBT));
1872 				F_SET(&fdata, DB_DBT_PARTIAL | DB_DBT_USERMEM);
1873 				if ((ret = __dbc_get(fdbc, skeyp, &fdata,
1874 				    DB_SET | rmw)) == DB_NOTFOUND ||
1875 				    ret == DB_KEYEMPTY)
1876 					ret = DB_FOREIGN_CONFLICT;
1877 			}
1878 			F_SET(skeyp, DB_DBT_ISSET);
1879 			tskeyp = skeyp;
1880 			nskey = 1;
1881 		}
1882 		if (fdbc != NULL && (t_ret = __dbc_close(fdbc)) != 0 &&
1883 		    ret == 0)
1884 			ret = t_ret;
1885 		fdbc = NULL;
1886 		if (ret != 0)
1887 			goto err;
1888 
1889 		/*
1890 		 * If we have the old record, we can generate and remove any
1891 		 * old secondary key(s) now.  We can also skip the secondary
1892 		 * put if there is no change.
1893 		 */
1894 		if (FLD_ISSET(*put_statep, DBC_PUT_HAVEREC)) {
1895 			if ((ret = __dbc_del_oldskey(sdbp, dbc,
1896 			    skeyp, pkey, orig_data)) == DB_KEYEXIST)
1897 				continue;
1898 			else if (ret != 0)
1899 				goto err;
1900 		}
1901 		if (nskey == 0)
1902 			continue;
1903 
1904 		/*
1905 		 * Open a cursor in this secondary.
1906 		 *
1907 		 * Use the same locker ID as our primary cursor, so that
1908 		 * we're guaranteed that the locks don't conflict (e.g. in CDB
1909 		 * or if we're subdatabases that share and want to lock a
1910 		 * metadata page).
1911 		 */
1912 		if ((ret = __db_cursor_int(sdbp, dbc->thread_info, dbc->txn,
1913 		    sdbp->type, PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0)
1914 			goto err;
1915 
1916 		/*
1917 		 * If we're in CDB, updates will fail since the new cursor
1918 		 * isn't a writer.  However, we hold the WRITE lock in the
1919 		 * primary and will for as long as our new cursor lasts,
1920 		 * and the primary and secondary share a lock file ID,
1921 		 * so it's safe to consider this a WRITER.  The close
1922 		 * routine won't try to put anything because we don't
1923 		 * really have a lock.
1924 		 */
1925 		if (CDB_LOCKING(env)) {
1926 			DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
1927 			F_SET(sdbc, DBC_WRITER);
1928 		}
1929 
1930 		/*
1931 		 * Swap the primary key to the byte order of this secondary, if
1932 		 * necessary.  By doing this now, we can compare directly
1933 		 * against the data already in the secondary without having to
1934 		 * swap it after reading.
1935 		 */
1936 		SWAP_IF_NEEDED(sdbp, pkey);
1937 
1938 		for (; nskey > 0 && ret == 0; nskey--, tskeyp++) {
1939 			/* Skip this key if it is already in the database. */
1940 			if (!F_ISSET(tskeyp, DB_DBT_ISSET))
1941 				continue;
1942 
1943 			/*
1944 			 * There are three cases here--
1945 			 * 1) The secondary supports sorted duplicates.
1946 			 *	If we attempt to put a secondary/primary pair
1947 			 *	that already exists, that's a duplicate
1948 			 *	duplicate, and c_put will return DB_KEYEXIST
1949 			 *	(see __db_duperr).  This will leave us with
1950 			 *	exactly one copy of the secondary/primary pair,
1951 			 *	and this is just right--we'll avoid deleting it
1952 			 *	later, as the old and new secondaries will
1953 			 *	match (since the old secondary is the dup dup
1954 			 *	that's already there).
1955 			 * 2) The secondary supports duplicates, but they're not
1956 			 *	sorted.  We need to avoid putting a duplicate
1957 			 *	duplicate, because the matching old and new
1958 			 *	secondaries will prevent us from deleting
1959 			 *	anything and we'll wind up with two secondary
1960 			 *	records that point to the same primary key.  Do
1961 			 *	a c_get(DB_GET_BOTH);  only do the put if the
1962 			 *	secondary doesn't exist.
1963 			 * 3) The secondary doesn't support duplicates at all.
1964 			 *	In this case, secondary keys must be unique;
1965 			 *	if another primary key already exists for this
1966 			 *	secondary key, we have to either overwrite it
1967 			 *	or not put this one, and in either case we've
1968 			 *	corrupted the secondary index.  Do a
1969 			 *	c_get(DB_SET).  If the secondary/primary pair
1970 			 *	already exists, do nothing;  if the secondary
1971 			 *	exists with a different primary, return an
1972 			 *	error;  and if the secondary does not exist,
1973 			 *	put it.
1974 			 */
1975 			if (!F_ISSET(sdbp, DB_AM_DUP)) {
1976 				/* Case 3. */
1977 				memset(&oldpkey, 0, sizeof(DBT));
1978 				F_SET(&oldpkey, DB_DBT_MALLOC);
1979 				ret = __dbc_get(sdbc,
1980 				    tskeyp, &oldpkey, rmw | DB_SET);
1981 				if (ret == 0) {
1982 					cmp = __dbt_defcmp(sdbp,
1983 					    &oldpkey, pkey, NULL);
1984 					__os_ufree(env, oldpkey.data);
1985 					/*
1986 					 * If the secondary key is unchanged,
1987 					 * skip the put and go on to the next
1988 					 * one.
1989 					 */
1990 					if (cmp == 0)
1991 						continue;
1992 
1993 					ret = USR_ERR(env, EINVAL);
1994 					__db_errx(env, DB_STR("0695",
1995 			    "Put results in a non-unique secondary key in an "
1996 			    "index not configured to support duplicates"));
1997 				}
1998 				if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
1999 					break;
2000 			} else if (!F_ISSET(sdbp, DB_AM_DUPSORT)) {
2001 				/* Case 2. */
2002 				DB_INIT_DBT(tempskey,
2003 				    tskeyp->data, tskeyp->size);
2004 				DB_INIT_DBT(temppkey,
2005 				    pkey->data, pkey->size);
2006 				ret = __dbc_get(sdbc, &tempskey, &temppkey,
2007 				    rmw | DB_GET_BOTH);
2008 				if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
2009 					break;
2010 			}
2011 
2012 			ret = __dbc_put(sdbc, tskeyp, pkey,
2013 			    DB_UPDATE_SECONDARY);
2014 
2015 			/*
2016 			 * We don't know yet whether this was a put-overwrite
2017 			 * that in fact changed nothing.  If it was, we may get
2018 			 * DB_KEYEXIST.  This is not an error.
2019 			 */
2020 			if (ret == DB_KEYEXIST)
2021 				ret = 0;
2022 		}
2023 
2024 		/* Make sure the primary key is back in native byte-order. */
2025 		SWAP_IF_NEEDED(sdbp, pkey);
2026 
2027 		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
2028 			ret = t_ret;
2029 
2030 		if (ret != 0)
2031 			goto err;
2032 
2033 		/*
2034 		 * Mark that we have a key for this secondary so we can check
2035 		 * it later before deleting the old one.  We can't set it
2036 		 * earlier or it would be cleared in the calls above.
2037 		 */
2038 		F_SET(skeyp, DB_DBT_ISSET);
2039 	}
2040 err:	if (sdbp != NULL &&
2041 	    (t_ret = __db_s_done(sdbp, dbc->txn)) != 0 && ret == 0)
2042 		ret = t_ret;
2043 	COMPQUIET(s_count, 0);
2044 	return (ret);
2045 }
2046 
/*
 * __dbc_put_primary --
 *	Maintain the secondary indices for a put on a primary database.
 *	Called from __dbc_put when DB_IS_PRIMARY(dbp) is true.  For every
 *	flag value except DB_APPEND the primary record itself is written by
 *	the caller after this routine returns 0; for DB_APPEND the insert is
 *	delegated to __dbc_put_append here (see Step 1 below).
 *
 *	dbc	cursor on the primary database.
 *	key	primary key; for DB_CURRENT it is replaced locally by the
 *		key resolved from the cursor position.
 *	data	new data item, possibly a DB_DBT_PARTIAL put.
 *	flags	put flag, already normalized by the caller.
 *
 *	Returns 0 on success and a non-zero error value otherwise; for
 *	DB_NOOVERWRITE on an existing key the value is whatever
 *	DBC_ERR(dbc, DB_KEYEXIST) evaluates to.
 */
static int
__dbc_put_primary(dbc, key, data, flags)
	DBC *dbc;
	DBT *key, *data;
	u_int32_t flags;
{
	DB *dbp, *sdbp;
	DBC *dbc_n, *pdbc;
	DBT oldkey, olddata, newdata;
	DBT *all_skeys, *skeyp, *tskeyp;
	ENV *env;
	int ret, t_ret, s_count;
	u_int32_t nskey, put_state, rmw;

	dbp = dbc->dbp;
	env = dbp->env;
	t_ret = 0;
	put_state = 0;
	sdbp = NULL;
	pdbc = dbc_n = NULL;
	all_skeys = NULL;
	memset(&newdata, 0, sizeof(DBT));
	memset(&olddata, 0, sizeof(DBT));

	/*
	 * We do multiple cursor operations in some cases and subsequently
	 * access the data DBT information.  Set DB_DBT_MALLOC so we don't risk
	 * modification of the data between our uses of it.
	 */
	F_SET(&olddata, DB_DBT_MALLOC);

	/*
	 * We have at least one secondary which we may need to update.
	 *
	 * There is a rather vile locking issue here.  Secondary gets
	 * will always involve acquiring a read lock in the secondary,
	 * then acquiring a read lock in the primary.  Ideally, we
	 * would likewise perform puts by updating all the secondaries
	 * first, then doing the actual put in the primary, to avoid
	 * deadlock (since having multiple threads doing secondary
	 * gets and puts simultaneously is probably a common case).
	 *
	 * However, if this put is a put-overwrite--and we have no way to
	 * tell in advance whether it will be--we may need to delete
	 * an outdated secondary key.  In order to find that old
	 * secondary key, we need to get the record we're overwriting,
	 * before we overwrite it.
	 *
	 * (XXX: It would be nice to avoid this extra get, and have the
	 * underlying put routines somehow pass us the old record
	 * since they need to traverse the tree anyway.  I'm saving
	 * this optimization for later, as it's a lot of work, and it
	 * would be hard to fit into this locking paradigm anyway.)
	 *
	 * The simple thing to do would be to go get the old record before
	 * we do anything else.  Unfortunately, though, doing so would
	 * violate our "secondary, then primary" lock acquisition
	 * ordering--even in the common case where no old primary record
	 * exists, we'll still acquire and keep a lock on the page where
	 * we're about to do the primary insert.
	 *
	 * To get around this, we do the following gyrations, which
	 * hopefully solve this problem in the common case:
	 *
	 * 1) If this is a c_put(DB_CURRENT), go ahead and get the
	 *    old record.  We already hold the lock on this page in
	 *    the primary, so no harm done, and we'll need the primary
	 *    key (which we weren't passed in this case) to do any
	 *    secondary puts anyway.
	 *    If this is a put(DB_APPEND), then we need to insert the item,
	 *    so that we can know the key value. So go ahead and insert. In
	 *    the case of a put(DB_APPEND) without secondaries it is
	 *    implemented in the __db_put method as an optimization.
	 *
	 * 2) If we're doing a partial put, we need to perform the
	 *    get on the primary key right away, since we don't have
	 *    the whole datum that the secondary key is based on.
	 *    We may also need to pad out the record if the primary
	 *    has a fixed record length.
	 *
	 * 3) Loop through the secondary indices, putting into each a
	 *    new secondary key that corresponds to the new record.
	 *
	 * 4) If we haven't done so in (1) or (2), get the old primary
	 *    key/data pair.  If one does not exist--the common case--we're
	 *    done with secondary indices, and can go straight on to the
	 *    primary put.
	 *
	 * 5) If we do have an old primary key/data pair, however, we need
	 *    to loop through all the secondaries a second time and delete
	 *    the old secondary in each.
	 */

	/*
	 * Allocate one zeroed DBT per secondary.  Slot i is walked in step
	 * with the i-th secondary in the Step 5 loop and is released in the
	 * cleanup code at the end of this function.
	 */
	s_count = __db_s_count(dbp);
	if ((ret = __os_calloc(env,
	    (u_int)s_count, sizeof(DBT), &all_skeys)) != 0)
		goto err;

	/*
	 * Primary indices can't have duplicates, so the only flags accepted
	 * here are DB_APPEND, DB_CURRENT, DB_KEYFIRST, DB_KEYLAST,
	 * DB_NOOVERWRITE and DB_OVERWRITE_DUP.  Other flags should have
	 * been caught by the checking routine, but add a sprinkling of
	 * paranoia.
	 */
	DB_ASSERT(env, flags == DB_APPEND || flags == DB_CURRENT ||
	      flags == DB_KEYFIRST || flags == DB_KEYLAST ||
	      flags == DB_NOOVERWRITE || flags == DB_OVERWRITE_DUP);

	/*
	 * We'll want to use DB_RMW in a few places, but it's only legal
	 * when locking is on.
	 */
	rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
	if (rmw)
		FLD_SET(put_state, DBC_PUT_RMW);

	/* Resolve the primary key if required (Step 1). */
	if (flags == DB_CURRENT) {
		if ((ret = __dbc_put_resolve_key(dbc,
		    &oldkey, &olddata, &put_state, flags)) != 0)
			goto err;
		/*
		 * From here on "key" is the local oldkey, not the caller's
		 * DBT (presumably filled in by __dbc_put_resolve_key from
		 * the cursor position).
		 */
		key = &oldkey;
	} else if (flags == DB_APPEND) {
		if ((ret = __dbc_put_append(dbc,
		    key, data, &put_state, flags)) != 0)
			goto err;
	}

	/*
	 * PUT_NOOVERWRITE with secondaries is a troublesome case. We need
	 * to check that the insert will work prior to making any changes
	 * to secondaries. Try to work within the locking constraints outlined
	 * above.
	 *
	 * This is DB->put (DB_NOOVERWRITE). DBC->put(DB_NODUPDATA) is not
	 * relevant since it is only valid on DBs that support duplicates,
	 * which primaries with secondaries can't have.
	 */
	if (flags == DB_NOOVERWRITE) {
		/*
		 * Don't bother retrieving the data.  Note that this is a
		 * plain assignment: the DB_DBT_MALLOC flag set on olddata
		 * above is overwritten on this path.
		 */
		F_SET(key, DB_DBT_ISSET);
		olddata.dlen = 0;
		olddata.flags = DB_DBT_PARTIAL | DB_DBT_USERMEM;
		ret = __dbc_get(dbc, key, &olddata, DB_SET);
		if (ret == 0) {
			/* The key is already present: refuse the put. */
			ret = DBC_ERR(dbc, DB_KEYEXIST);
			goto done;
		} else if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
			goto err;
		/*
		 * ret is still DB_NOTFOUND or DB_KEYEMPTY here; it is
		 * reassigned by the __dbc_put_secondaries call below at the
		 * latest.
		 */
	}

	/*
	 * Check for partial puts using DB_DBT_PARTIAL (Step 2).
	 */
	if (F_ISSET(data, DB_DBT_PARTIAL)) {
		if ((ret = __dbc_put_partial(dbc,
		    key, data, &olddata, &newdata, &put_state, flags)) != 0)
			goto err;
	} else {
		/* Shallow copy: newdata.data aliases the caller's buffer. */
		newdata = *data;
	}

	/*
	 * Check for partial puts, with fixed length record databases (Step 2).
	 */
	if ((dbp->type == DB_RECNO && F_ISSET(dbp, DB_AM_FIXEDLEN)) ||
	    (dbp->type == DB_QUEUE)) {
		if ((ret = __dbc_put_fixed_len(dbc, data, &newdata)) != 0)
			goto err;
	}

	/* Validate any foreign databases, and update secondaries. (Step 3). */
	if ((ret = __dbc_put_secondaries(dbc, key, &newdata,
	    &olddata, s_count, all_skeys, &put_state))
	    != 0)
		goto err;
	/*
	 * If we've already got the old primary key/data pair, the secondary
	 * updates are already done.
	 */
	if (FLD_ISSET(put_state, DBC_PUT_HAVEREC))
		goto done;

	/*
	 * If still necessary, go get the old primary key/data.  (Step 4.)
	 *
	 * See the comments in step 2.  This is real familiar.
	 */
	if ((ret = __dbc_idup(dbc, &pdbc, 0)) != 0)
		goto err;
	DB_ASSERT(env, flags != DB_CURRENT);
	F_SET(key, DB_DBT_ISSET);
	ret = __dbc_get(pdbc, key, &olddata, rmw | DB_SET);
	if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) {
		/* No old record, so there is nothing to delete in Step 5. */
		FLD_SET(put_state, DBC_PUT_NODEL);
		ret = 0;
	}
	/* The duplicate cursor is closed whether or not the get succeeded. */
	if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
		ret = t_ret;
	if (ret != 0)
		goto err;

	/*
	 * Check whether we do in fact have an old record we may need to
	 * delete.  (Step 5).
	 */
	if (FLD_ISSET(put_state, DBC_PUT_NODEL))
		goto done;

	for (ret = __db_s_first(dbp, &sdbp), skeyp = all_skeys;
	    sdbp != NULL && ret == 0;
	    ret = __db_s_next(&sdbp, dbc->txn), skeyp++) {
		DB_ASSERT(env, skeyp - all_skeys < s_count);
		/*
		 * Don't process this secondary if the key is immutable.  We
		 * know that the old record exists, so this optimization can
		 * always be used.
		 */
		if (FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY))
			continue;

		/*
		 * DB_KEYEXIST from __dbc_del_oldskey is not treated as an
		 * error; the loop increment then overwrites ret with the
		 * result of __db_s_next.
		 */
		if ((ret = __dbc_del_oldskey(sdbp, dbc,
		    skeyp, key, &olddata)) != 0 && ret != DB_KEYEXIST)
			goto err;
	}
	if (ret != 0)
		goto err;

done:
err:
	/*
	 * dbc_n is never assigned in this function, so __dbc_cleanup is
	 * always called here with a NULL duplicate cursor.
	 */
	if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
		ret = t_ret;

	/* If newdata or olddata were used, free their buffers. */
	if (newdata.data != NULL && newdata.data != data->data)
		__os_free(env, newdata.data);
	if (olddata.data != NULL)
		__os_ufree(env, olddata.data);

	/*
	 * sdbp is non-NULL only if we left the Step 5 loop early; release
	 * that secondary handle.
	 */
	if (sdbp != NULL &&
	    (t_ret = __db_s_done(sdbp, dbc->txn)) != 0 && ret == 0)
		ret = t_ret;

	/*
	 * Release the per-secondary key DBTs.  A DB_DBT_MULTIPLE slot holds
	 * an array of "size" DBTs in "data", each of which is released
	 * before the slot itself.
	 */
	if (all_skeys != NULL) {
		for (skeyp = all_skeys; skeyp - all_skeys < s_count; skeyp++) {
			if (F_ISSET(skeyp, DB_DBT_MULTIPLE)) {
				for (nskey = skeyp->size,
				    tskeyp = (DBT *)skeyp->data;
				    nskey > 0;
				    nskey--, tskeyp++)
					FREE_IF_NEEDED(env, tskeyp);
			}
			FREE_IF_NEEDED(env, skeyp);
		}
		__os_free(env, all_skeys);
	}
	return (ret);
}
2304 
2305 /*
2306  * __dbc_put --
2307  *	Put using a cursor.
2308  *
2309  * PUBLIC: int __dbc_put __P((DBC *, DBT *, DBT *, u_int32_t));
2310  */
2311 int
__dbc_put(dbc,key,data,flags)2312 __dbc_put(dbc, key, data, flags)
2313 	DBC *dbc;
2314 	DBT *key, *data;
2315 	u_int32_t flags;
2316 {
2317 	DB *dbp;
2318 	int ret;
2319 
2320 	dbp = dbc->dbp;
2321 	ret = 0;
2322 	F_CLR(dbc, DBC_ERROR);
2323 
2324 	/*
2325 	 * Putting to secondary indices is forbidden;  when we need to
2326 	 * internally update one, we're called with a private flag,
2327 	 * DB_UPDATE_SECONDARY, which does the right thing but won't return an
2328 	 * error during flag checking.
2329 	 *
2330 	 * As a convenience, many places that want the default DB_KEYLAST
2331 	 * behavior call DBC->put with flags == 0.  Protect lower-level code
2332 	 * here by translating that.
2333 	 *
2334 	 * Lastly, the DB_OVERWRITE_DUP flag is equivalent to DB_KEYLAST unless
2335 	 * there are sorted duplicates.  Limit the number of places that need
2336 	 * to test for it explicitly.
2337 	 */
2338 	if (flags == DB_UPDATE_SECONDARY || flags == 0 ||
2339 	    (flags == DB_OVERWRITE_DUP && !F_ISSET(dbp, DB_AM_DUPSORT)))
2340 		flags = DB_KEYLAST;
2341 
2342 	CDB_LOCKING_INIT(dbc->env, dbc);
2343 
2344 	PERFMON6(env, db, put, dbp->fname, dbp->dname,
2345 	    dbc->txn == NULL ? 0 : dbc->txn->txnid, key, data, flags);
2346 	/*
2347 	 * Check to see if we are a primary and have secondary indices.
2348 	 * If we are not, we save ourselves a good bit of trouble and
2349 	 * just skip to the "normal" put.
2350 	 */
2351 	if (DB_IS_PRIMARY(dbp) &&
2352 	    ((ret = __dbc_put_primary(dbc, key, data, flags)) != 0))
2353 		goto done;
2354 
2355 	/*
2356 	 * If this is an append operation, the insert was done prior to the
2357 	 * secondary updates, so we are finished.
2358 	 */
2359 	if (flags == DB_APPEND)
2360 		goto done;
2361 
2362 #ifdef HAVE_COMPRESSION
2363 	if (DB_IS_COMPRESSED(dbp))
2364 		ret = __bamc_compress_put(dbc, key, data, flags);
2365 	else
2366 #endif
2367 		ret = __dbc_iput(dbc, key, data, flags);
2368 
2369 done:	CDB_LOCKING_DONE(dbc->env, dbc);
2370 
2371 	return (ret);
2372 }
2373 
2374 /*
2375  * __dbc_iput --
2376  *	Implementation of put using a cursor.
2377  *
2378  * PUBLIC: int __dbc_iput __P((DBC *, DBT *, DBT *, u_int32_t));
2379  */
2380 int
__dbc_iput(dbc,key,data,flags)2381 __dbc_iput(dbc, key, data, flags)
2382 	DBC *dbc;
2383 	DBT *key, *data;
2384 	u_int32_t flags;
2385 {
2386 	DBC *dbc_n, *oldopd, *opd;
2387 	db_pgno_t pgno;
2388 	int ret, t_ret;
2389 	u_int32_t tmp_flags;
2390 
2391 	/*
2392 	 * Cursor Cleanup Note:
2393 	 * All of the cursors passed to the underlying access methods by this
2394 	 * routine are duplicated cursors.  On return, any referenced pages
2395 	 * will be discarded, and, if the cursor is not intended to be used
2396 	 * again, the close function will be called.  So, pages/locks that
2397 	 * the cursor references do not need to be resolved by the underlying
2398 	 * functions.
2399 	 */
2400 	dbc_n = NULL;
2401 	ret = t_ret = 0;
2402 
2403 	/*
2404 	 * If we have an off-page duplicates cursor, and the operation applies
2405 	 * to it, perform the operation.  Duplicate the cursor and call the
2406 	 * underlying function.
2407 	 *
2408 	 * Off-page duplicate trees are locked in the primary tree, that is,
2409 	 * we acquire a write lock in the primary tree and no locks in the
2410 	 * off-page dup tree.  If the put operation is done in an off-page
2411 	 * duplicate tree, call the primary cursor's upgrade routine first.
2412 	 */
2413 	if (dbc->internal->opd != NULL &&
2414 	    (flags == DB_AFTER || flags == DB_BEFORE || flags == DB_CURRENT)) {
2415 		/*
2416 		 * A special case for hash off-page duplicates.  Hash doesn't
2417 		 * support (and is documented not to support) put operations
2418 		 * relative to a cursor which references an already deleted
2419 		 * item.  For consistency, apply the same criteria to off-page
2420 		 * duplicates as well.
2421 		 */
2422 		if (dbc->dbtype == DB_HASH && F_ISSET(
2423 		    ((BTREE_CURSOR *)(dbc->internal->opd->internal)),
2424 		    C_DELETED)) {
2425 			ret = DBC_ERR(dbc, DB_NOTFOUND);
2426 			goto err;
2427 		}
2428 
2429 		if ((ret = dbc->am_writelock(dbc)) != 0 ||
2430 		    (ret = __dbc_dup(dbc, &dbc_n, DB_POSITION)) != 0)
2431 			goto err;
2432 		opd = dbc_n->internal->opd;
2433 		if ((ret = opd->am_put(
2434 		    opd, key, data, flags, NULL)) != 0)
2435 			goto err;
2436 		goto done;
2437 	}
2438 
2439 	/*
2440 	 * Perform an operation on the main cursor.  Duplicate the cursor,
2441 	 * and call the underlying function.
2442 	 */
2443 	if (flags == DB_AFTER || flags == DB_BEFORE || flags == DB_CURRENT)
2444 		tmp_flags = DB_POSITION;
2445 	else
2446 		tmp_flags = 0;
2447 
2448 	/*
2449 	 * If this cursor is going to be closed immediately, we don't
2450 	 * need to take precautions to clean it up on error.
2451 	 */
2452 	if (F_ISSET(dbc, DBC_TRANSIENT | DBC_PARTITIONED))
2453 		dbc_n = dbc;
2454 	else if ((ret = __dbc_idup(dbc, &dbc_n, tmp_flags)) != 0)
2455 		goto err;
2456 
2457 	pgno = PGNO_INVALID;
2458 	if ((ret = dbc_n->am_put(dbc_n, key, data, flags, &pgno)) != 0)
2459 		goto err;
2460 
2461 	/*
2462 	 * We may be referencing a new off-page duplicates tree.  Acquire
2463 	 * a new cursor and call the underlying function.
2464 	 */
2465 	if (pgno != PGNO_INVALID) {
2466 		oldopd = dbc_n->internal->opd;
2467 		if ((ret = __dbc_newopd(dbc, pgno, oldopd, &opd)) != 0) {
2468 			dbc_n->internal->opd = opd;
2469 			goto err;
2470 		}
2471 
2472 		dbc_n->internal->opd = opd;
2473 		opd->internal->pdbc = dbc_n;
2474 
2475 		if (flags == DB_NOOVERWRITE)
2476 			flags = DB_KEYLAST;
2477 		if ((ret = opd->am_put(
2478 		    opd, key, data, flags, NULL)) != 0)
2479 			goto err;
2480 	}
2481 
2482 done:
2483 err:	/* Cleanup and cursor resolution. */
2484 	if (dbc_n != NULL && !DB_RETOK_DBCPUT(ret))
2485 		F_SET(dbc_n, DBC_ERROR);
2486 	if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
2487 		ret = t_ret;
2488 	return (ret);
2489 }
2490 
2491 /*
2492  * __dbc_del_oldskey --
2493  *	Delete an old secondary key, if necessary.
2494  *	Returns DB_KEYEXIST if the new and old keys match..
2495  */
2496 static int
__dbc_del_oldskey(sdbp,dbc,skey,pkey,olddata)2497 __dbc_del_oldskey(sdbp, dbc, skey, pkey, olddata)
2498 	DB *sdbp;
2499 	DBC *dbc;
2500 	DBT *skey, *pkey, *olddata;
2501 {
2502 	DB *dbp;
2503 	DBC *sdbc;
2504 	DBT *toldskeyp, *tskeyp;
2505 	DBT oldskey, temppkey, tempskey;
2506 	ENV *env;
2507 	int ret, t_ret;
2508 	u_int32_t i, noldskey, nsame, nskey, rmw;
2509 
2510 	sdbc = NULL;
2511 	dbp = sdbp->s_primary;
2512 	env = dbp->env;
2513 	nsame = 0;
2514 	rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
2515 
2516 	/*
2517 	 * Get the old secondary key.
2518 	 */
2519 	memset(&oldskey, 0, sizeof(DBT));
2520 	if ((ret = sdbp->s_callback(sdbp, pkey, olddata, &oldskey)) != 0) {
2521 		if (ret == DB_DONOTINDEX ||
2522 		    (F_ISSET(&oldskey, DB_DBT_MULTIPLE) && oldskey.size == 0))
2523 			/* There's no old key to delete. */
2524 			ret = 0;
2525 		return (ret);
2526 	}
2527 
2528 	if (F_ISSET(&oldskey, DB_DBT_MULTIPLE)) {
2529 #ifdef DIAGNOSTIC
2530 		__db_check_skeyset(sdbp, &oldskey);
2531 #endif
2532 		toldskeyp = (DBT *)oldskey.data;
2533 		noldskey = oldskey.size;
2534 	} else {
2535 		toldskeyp = &oldskey;
2536 		noldskey = 1;
2537 	}
2538 
2539 	if (F_ISSET(skey, DB_DBT_MULTIPLE)) {
2540 		nskey = skey->size;
2541 		skey = (DBT *)skey->data;
2542 	} else
2543 		nskey = F_ISSET(skey, DB_DBT_ISSET) ? 1 : 0;
2544 
2545 	for (; noldskey > 0 && ret == 0; noldskey--, toldskeyp++) {
2546 		/*
2547 		 * Check whether this old secondary key is also a new key
2548 		 * before we delete it.  Note that bt_compare is (and must be)
2549 		 * set no matter what access method we're in.
2550 		 */
2551 		for (i = 0, tskeyp = skey; i < nskey; i++, tskeyp++)
2552 			if (((BTREE *)sdbp->bt_internal)->bt_compare(sdbp,
2553 			    toldskeyp, tskeyp, NULL) == 0) {
2554 				nsame++;
2555 				F_CLR(tskeyp, DB_DBT_ISSET);
2556 				break;
2557 			}
2558 
2559 		if (i < nskey) {
2560 			FREE_IF_NEEDED(env, toldskeyp);
2561 			continue;
2562 		}
2563 
2564 		if (sdbc == NULL) {
2565 			if ((ret = __db_cursor_int(sdbp,
2566 			    dbc->thread_info, dbc->txn, sdbp->type,
2567 			    PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0)
2568 				goto err;
2569 			if (CDB_LOCKING(env)) {
2570 				DB_ASSERT(env,
2571 				    sdbc->mylock.off == LOCK_INVALID);
2572 				F_SET(sdbc, DBC_WRITER);
2573 			}
2574 		}
2575 
2576 		/*
2577 		 * Don't let c_get(DB_GET_BOTH) stomp on our data.  Use
2578 		 * temporary DBTs instead.
2579 		 */
2580 		SWAP_IF_NEEDED(sdbp, pkey);
2581 		DB_INIT_DBT(temppkey, pkey->data, pkey->size);
2582 		DB_INIT_DBT(tempskey, toldskeyp->data, toldskeyp->size);
2583 		if ((ret = __dbc_get(sdbc,
2584 		    &tempskey, &temppkey, rmw | DB_GET_BOTH)) == 0)
2585 			ret = __dbc_del(sdbc, DB_UPDATE_SECONDARY);
2586 		else if (ret == DB_NOTFOUND)
2587 			ret = __db_secondary_corrupt(dbp);
2588 		SWAP_IF_NEEDED(sdbp, pkey);
2589 		FREE_IF_NEEDED(env, toldskeyp);
2590 	}
2591 
2592 err:	for (; noldskey > 0; noldskey--, toldskeyp++)
2593 		FREE_IF_NEEDED(env, toldskeyp);
2594 	FREE_IF_NEEDED(env, &oldskey);
2595 	if (sdbc != NULL && (t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
2596 		ret = t_ret;
2597 	if (ret == 0 && nsame == nskey)
2598 		return (DB_KEYEXIST);
2599 	return (ret);
2600 }
2601 
2602 /*
2603  * __db_duperr()
2604  *	Error message: we don't currently support sorted duplicate duplicates.
2605  * PUBLIC: int __db_duperr __P((DB *, u_int32_t));
2606  */
2607 int
__db_duperr(dbp,flags)2608 __db_duperr(dbp, flags)
2609 	DB *dbp;
2610 	u_int32_t flags;
2611 {
2612 	/*
2613 	 * If we run into this error while updating a secondary index,
2614 	 * don't yell--there's no clean way to pass DB_NODUPDATA in along
2615 	 * with DB_UPDATE_SECONDARY, but we may run into this problem
2616 	 * in a normal, non-error course of events.
2617 	 *
2618 	 * !!!
2619 	 * If and when we ever permit duplicate duplicates in sorted-dup
2620 	 * databases, we need to either change the secondary index code
2621 	 * to check for dup dups, or we need to maintain the implicit
2622 	 * "DB_NODUPDATA" behavior for databases with DB_AM_SECONDARY set.
2623 	 */
2624 	if (flags != DB_NODUPDATA && !F_ISSET(dbp, DB_AM_SECONDARY))
2625 		__db_errx(dbp->env, DB_STR("0696",
2626 		    "Duplicate data items are not supported with sorted data"));
2627 	return (DB_KEYEXIST);
2628 }
2629 
2630 /*
2631  * __dbc_cleanup --
2632  *	Clean up duplicate cursors.
2633  *
2634  * PUBLIC: int __dbc_cleanup __P((DBC *, DBC *, int));
2635  */
2636 int
__dbc_cleanup(dbc,dbc_n,failed)2637 __dbc_cleanup(dbc, dbc_n, failed)
2638 	DBC *dbc, *dbc_n;
2639 	int failed;
2640 {
2641 	DB *dbp;
2642 	DBC *opd;
2643 	DBC_INTERNAL *internal;
2644 	DB_MPOOLFILE *mpf;
2645 	int ret, t_ret;
2646 
2647 	if (F_ISSET(dbc, DBC_OPD))
2648 		LOCK_CHECK_OFF(dbc->thread_info);
2649 
2650 	dbp = dbc->dbp;
2651 	mpf = dbp->mpf;
2652 	internal = dbc->internal;
2653 	ret = 0;
2654 
2655 	/* Discard any pages we're holding. */
2656 	if (internal->page != NULL) {
2657 		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
2658 		     internal->page, dbc->priority)) != 0 && ret == 0)
2659 			ret = t_ret;
2660 		internal->page = NULL;
2661 	}
2662 	opd = internal->opd;
2663 	if (opd != NULL && opd->internal->page != NULL) {
2664 		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
2665 		    opd->internal->page, dbc->priority)) != 0 && ret == 0)
2666 			ret = t_ret;
2667 		opd->internal->page = NULL;
2668 	}
2669 
2670 	/*
2671 	 * If dbc_n is NULL, there's no internal cursor swapping to be done
2672 	 * and no dbc_n to close--we probably did the entire operation on an
2673 	 * offpage duplicate cursor.  Just return.
2674 	 *
2675 	 * If dbc and dbc_n are the same, we're either inside a DB->{put/get}
2676 	 * operation, and as an optimization we performed the operation on
2677 	 * the main cursor rather than on a duplicated one, or we're in a
2678 	 * bulk get that can't have moved the cursor (DB_MULTIPLE with the
2679 	 * initial c_get operation on an off-page dup cursor).  Just
2680 	 * return--either we know we didn't move the cursor, or we're going
2681 	 * to close it before we return to application code, so we're sure
2682 	 * not to visibly violate the "cursor stays put on error" rule.
2683 	 */
2684 	if (dbc_n == NULL || dbc == dbc_n)
2685 		goto done;
2686 
2687 	if (dbc_n->internal->page != NULL) {
2688 		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
2689 		    dbc_n->internal->page, dbc->priority)) != 0 && ret == 0)
2690 			ret = t_ret;
2691 		dbc_n->internal->page = NULL;
2692 	}
2693 	opd = dbc_n->internal->opd;
2694 	if (opd != NULL && opd->internal->page != NULL) {
2695 		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
2696 		     opd->internal->page, dbc->priority)) != 0 && ret == 0)
2697 			ret = t_ret;
2698 		opd->internal->page = NULL;
2699 	}
2700 
2701 	/*
2702 	 * If we didn't fail before entering this routine or just now when
2703 	 * freeing pages, swap the interesting contents of the old and new
2704 	 * cursors.
2705 	 */
2706 	if (!failed && ret == 0) {
2707 		MUTEX_LOCK(dbp->env, dbp->mutex);
2708 		if (opd != NULL)
2709 			opd->internal->pdbc = dbc;
2710 		if (internal->opd != NULL)
2711 			internal->opd->internal->pdbc = dbc_n;
2712 		dbc->internal = dbc_n->internal;
2713 		dbc_n->internal = internal;
2714 		MUTEX_UNLOCK(dbp->env, dbp->mutex);
2715 	}
2716 
2717 	/*
2718 	 * Close the cursor we don't care about anymore.  The close can fail,
2719 	 * but we only expect DB_LOCK_DEADLOCK failures.  This violates our
2720 	 * "the cursor is unchanged on error" semantics, but since all you can
2721 	 * do with a DB_LOCK_DEADLOCK failure is close the cursor, I believe
2722 	 * that's OK.
2723 	 *
2724 	 * XXX
2725 	 * There's no way to recover from failure to close the old cursor.
2726 	 * All we can do is move to the new position and return an error.
2727 	 *
2728 	 * XXX
2729 	 * We might want to consider adding a flag to the cursor, so that any
2730 	 * subsequent operations other than close just return an error?
2731 	 */
2732 	if ((t_ret = __dbc_close(dbc_n)) != 0 && ret == 0)
2733 		ret = t_ret;
2734 
2735 	/*
2736 	 * If this was an update that is supporting dirty reads
2737 	 * then we may have just swapped our read for a write lock
2738 	 * which is held by the surviving cursor.  We need
2739 	 * to explicitly downgrade this lock.  The closed cursor
2740 	 * may only have had a read lock.
2741 	 */
2742 	if (ret == 0 && failed == 0 && F_ISSET(dbp, DB_AM_READ_UNCOMMITTED) &&
2743 	    dbc->internal->lock_mode == DB_LOCK_WRITE &&
2744 	    (ret = __TLPUT(dbc, dbc->internal->lock)) == 0)
2745 		dbc->internal->lock_mode = DB_LOCK_WWRITE;
2746 
2747 done:
2748 	if (F_ISSET(dbc, DBC_OPD))
2749 		LOCK_CHECK_ON(dbc->thread_info);
2750 
2751 	return (ret);
2752 }
2753 
2754 /*
2755  * __dbc_secondary_get_pp --
2756  *	This wrapper function for DBC->pget() is the DBC->get() function
2757  *	for a secondary index cursor.
2758  *
2759  * PUBLIC: int __dbc_secondary_get_pp __P((DBC *, DBT *, DBT *, u_int32_t));
2760  */
2761 int
__dbc_secondary_get_pp(dbc,skey,data,flags)2762 __dbc_secondary_get_pp(dbc, skey, data, flags)
2763 	DBC *dbc;
2764 	DBT *skey, *data;
2765 	u_int32_t flags;
2766 {
2767 	DB_ASSERT(dbc->env, F_ISSET(dbc->dbp, DB_AM_SECONDARY));
2768 	return (__dbc_pget_pp(dbc, skey, NULL, data, flags));
2769 }
2770 
2771 /*
2772  * __dbc_pget --
2773  *	Get a primary key/data pair through a secondary index.
2774  *
2775  * PUBLIC: int __dbc_pget __P((DBC *, DBT *, DBT *, DBT *, u_int32_t));
2776  */
2777 int
__dbc_pget(dbc,skey,pkey,data,flags)2778 __dbc_pget(dbc, skey, pkey, data, flags)
2779 	DBC *dbc;
2780 	DBT *skey, *pkey, *data;
2781 	u_int32_t flags;
2782 {
2783 	DB *pdbp, *sdbp;
2784 	DBC *dbc_n, *pdbc;
2785 	DBT nullpkey, *save_data;
2786 	u_int32_t save_pkey_flags, tmp_flags, tmp_read_locking, tmp_rmw;
2787 	int pkeymalloc, ret, t_ret;
2788 
2789 	sdbp = dbc->dbp;
2790 	pdbp = sdbp->s_primary;
2791 	dbc_n = NULL;
2792 	save_data = NULL;
2793 	pkeymalloc = t_ret = 0;
2794 
2795 	/*
2796 	 * The challenging part of this function is getting the behavior
2797 	 * right for all the various permutations of DBT flags.  The
2798 	 * next several blocks handle the various cases we need to
2799 	 * deal with specially.
2800 	 */
2801 
2802 	/*
2803 	 * We may be called with a NULL pkey argument, if we've been
2804 	 * wrapped by a 2-DBT get call.  If so, we need to use our
2805 	 * own DBT.
2806 	 */
2807 	if (pkey == NULL) {
2808 		memset(&nullpkey, 0, sizeof(DBT));
2809 		pkey = &nullpkey;
2810 	}
2811 
2812 	/* Clear OR'd in additional bits so we can check for flag equality. */
2813 	tmp_rmw = LF_ISSET(DB_RMW);
2814 	LF_CLR(DB_RMW);
2815 
2816 	SET_READ_LOCKING_FLAGS(dbc, tmp_read_locking);
2817 	/*
2818 	 * DB_GET_RECNO is a special case, because we're interested not in
2819 	 * the primary key/data pair, but rather in the primary's record
2820 	 * number.
2821 	 */
2822 	if (flags == DB_GET_RECNO) {
2823 		if (tmp_rmw)
2824 			F_SET(dbc, DBC_RMW);
2825 		F_SET(dbc, tmp_read_locking);
2826 		ret = __dbc_pget_recno(dbc, pkey, data, flags);
2827 		if (tmp_rmw)
2828 			F_CLR(dbc, DBC_RMW);
2829 		/* Clear the temp flags, but leave WAS_READ_COMMITTED. */
2830 		F_CLR(dbc, tmp_read_locking & ~DBC_WAS_READ_COMMITTED);
2831 		return (ret);
2832 	}
2833 
2834 	/*
2835 	 * If the DBTs we've been passed don't have any of the
2836 	 * user-specified memory management flags set, we want to make sure
2837 	 * we return values using the DBTs dbc->rskey, dbc->rkey, and
2838 	 * dbc->rdata, respectively.
2839 	 *
2840 	 * There are two tricky aspects to this:  first, we need to pass
2841 	 * skey and pkey *in* to the initial c_get on the secondary key,
2842 	 * since either or both may be looked at by it (depending on the
2843 	 * get flag).  Second, we must not use a normal DB->get call
2844 	 * on the secondary, even though that's what we want to accomplish,
2845 	 * because the DB handle may be free-threaded.  Instead,
2846 	 * we open a cursor, then take steps to ensure that we actually use
2847 	 * the rkey/rdata from the *secondary* cursor.
2848 	 *
2849 	 * We accomplish all this by passing in the DBTs we started out
2850 	 * with to the c_get, but swapping the contents of rskey and rkey,
2851 	 * respectively, into rkey and rdata;  __db_ret will treat them like
2852 	 * the normal key/data pair in a c_get call, and will realloc them as
2853 	 * need be (this is "step 1").  Then, for "step 2", we swap back
2854 	 * rskey/rkey/rdata to normal, and do a get on the primary with the
2855 	 * secondary dbc appointed as the owner of the returned-data memory.
2856 	 *
2857 	 * Note that in step 2, we copy the flags field in case we need to
2858 	 * pass down a DB_DBT_PARTIAL or other flag that is compatible with
2859 	 * letting DB do the memory management.
2860 	 */
2861 
2862 	/*
2863 	 * It is correct, though slightly sick, to attempt a partial get of a
2864 	 * primary key.  However, if we do so here, we'll never find the
2865 	 * primary record;  clear the DB_DBT_PARTIAL field of pkey just for the
2866 	 * duration of the next call.
2867 	 */
2868 	save_pkey_flags = pkey->flags;
2869 	F_CLR(pkey, DB_DBT_PARTIAL);
2870 
2871 	/*
2872 	 * Now we can go ahead with the meat of this call.  First, get the
2873 	 * primary key from the secondary index.  (What exactly we get depends
2874 	 * on the flags, but the underlying cursor get will take care of the
2875 	 * dirty work.)  Duplicate the cursor, in case the later get on the
2876 	 * primary fails.
2877 	 */
2878 	switch (flags) {
2879 	case DB_CURRENT:
2880 	case DB_GET_BOTHC:
2881 	case DB_NEXT:
2882 	case DB_NEXT_DUP:
2883 	case DB_NEXT_NODUP:
2884 	case DB_PREV:
2885 	case DB_PREV_DUP:
2886 	case DB_PREV_NODUP:
2887 		tmp_flags = DB_POSITION;
2888 		break;
2889 	default:
2890 		tmp_flags = 0;
2891 		break;
2892 	}
2893 
2894 	if (dbc->internal->opd != NULL ||
2895 	     F_ISSET(dbc, DBC_PARTITIONED | DBC_TRANSIENT)) {
2896 		dbc_n = dbc;
2897 		save_data = dbc_n->rdata;
2898 	} else {
2899 		if ((ret = __dbc_dup(dbc, &dbc_n, tmp_flags)) != 0)
2900 			return (ret);
2901 		F_SET(dbc_n, DBC_TRANSIENT);
2902 	}
2903 	dbc_n->rdata = dbc->rkey;
2904 	dbc_n->rkey = dbc->rskey;
2905 
2906 	if (tmp_rmw)
2907 		F_SET(dbc_n, DBC_RMW);
2908 	F_SET(dbc_n, tmp_read_locking);
2909 
2910 	/*
2911 	 * If we've been handed a primary key, it will be in native byte order,
2912 	 * so we need to swap it before reading from the secondary.
2913 	 */
2914 	if (flags == DB_GET_BOTH || flags == DB_GET_BOTHC ||
2915 	    flags == DB_GET_BOTH_RANGE)
2916 		SWAP_IF_NEEDED(sdbp, pkey);
2917 
2918 retry:	/* Step 1. */
2919 	ret = __dbc_get(dbc_n, skey, pkey, flags);
2920 	/* Restore pkey's flags in case we stomped the PARTIAL flag. */
2921 	pkey->flags = save_pkey_flags;
2922 
2923 	/*
2924 	 * We need to swap the primary key to native byte order if we read it
2925 	 * successfully, or if we swapped it on entry above.  We can't return
2926 	 * with the application's data modified.
2927 	 */
2928 	if (ret == 0 || flags == DB_GET_BOTH || flags == DB_GET_BOTHC ||
2929 	    flags == DB_GET_BOTH_RANGE)
2930 		SWAP_IF_NEEDED(sdbp, pkey);
2931 
2932 	if (ret != 0)
2933 		goto err;
2934 
2935 	/*
2936 	 * Now we're ready for "step 2".  If either or both of pkey and data do
2937 	 * not have memory management flags set--that is, if DB is managing
2938 	 * their memory--we need to swap around the rkey/rdata structures so
2939 	 * that we don't wind up trying to use memory managed by the primary
2940 	 * database cursor, which we'll close before we return.
2941 	 *
2942 	 * !!!
2943 	 * If you're carefully following the bouncing ball, you'll note that in
2944 	 * the DB-managed case, the buffer hanging off of pkey is the same as
2945 	 * dbc->rkey->data.  This is just fine;  we may well realloc and stomp
2946 	 * on it when we return, if we're doing a DB_GET_BOTH and need to
2947 	 * return a different partial or key (depending on the comparison
2948 	 * function), but this is safe.
2949 	 *
2950 	 * !!!
2951 	 * We need to use __db_cursor_int here rather than simply calling
2952 	 * pdbp->cursor, because otherwise, if we're in CDB, we'll allocate a
2953 	 * new locker ID and leave ourselves open to deadlocks.  (Even though
2954 	 * we're only acquiring read locks, we'll still block if there are any
2955 	 * waiters.)
2956 	 */
2957 	if ((ret = __db_cursor_int(pdbp, dbc->thread_info,
2958 	    dbc->txn, pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0)
2959 		goto err;
2960 
2961 	F_SET(pdbc, tmp_read_locking |
2962 	     F_ISSET(dbc, DBC_READ_UNCOMMITTED | DBC_READ_COMMITTED | DBC_RMW));
2963 
2964 	/*
2965 	 * We're about to use pkey a second time.  If DB_DBT_MALLOC is set on
2966 	 * it, we'll leak the memory we allocated the first time.  Thus, set
2967 	 * DB_DBT_REALLOC instead so that we reuse that memory instead of
2968 	 * leaking it.
2969 	 *
2970 	 * Alternatively, if the application is handling copying for pkey, we
2971 	 * need to take a copy now.  The copy will be freed on exit from
2972 	 * __dbc_pget_pp (and we must be coming through there if DB_DBT_USERCOPY
2973 	 * is set).  In the case of DB_GET_BOTH_RANGE, the pkey supplied by
2974 	 * the application has already been copied in but the value may have
2975 	 * changed in the search.  In that case, free the original copy and get
2976 	 * a new one.
2977 	 *
2978 	 * !!!
2979 	 * This assumes that the user must always specify a compatible realloc
2980 	 * function if a malloc function is specified.  I think this is a
2981 	 * reasonable requirement.
2982 	 */
2983 	if (F_ISSET(pkey, DB_DBT_MALLOC)) {
2984 		F_CLR(pkey, DB_DBT_MALLOC);
2985 		F_SET(pkey, DB_DBT_REALLOC);
2986 		pkeymalloc = 1;
2987 	} else if (F_ISSET(pkey, DB_DBT_USERCOPY)) {
2988 		if (flags == DB_GET_BOTH_RANGE)
2989 			__dbt_userfree(sdbp->env, NULL, pkey, NULL);
2990 		if ((ret = __dbt_usercopy(sdbp->env, pkey)) != 0)
2991 			goto err;
2992 	}
2993 
2994 	/*
2995 	 * Do the actual get.  Set DBC_TRANSIENT since we don't care about
2996 	 * preserving the position on error, and it's faster.  SET_RET_MEM so
2997 	 * that the secondary DBC owns any returned-data memory.
2998 	 */
2999 	F_SET(pdbc, DBC_TRANSIENT);
3000 	SET_RET_MEM(pdbc, dbc);
3001 	ret = __dbc_get(pdbc, pkey, data, DB_SET);
3002 	DB_ASSERT(pdbp->env, ret != DB_PAGE_NOTFOUND);
3003 
3004 	/*
3005 	 * If the item wasn't found in the primary, this is a bug; our
3006 	 * secondary has somehow gotten corrupted, and contains elements that
3007 	 * don't correspond to anything in the primary.  Complain.
3008 	 */
3009 
3010 	/* Now close the primary cursor. */
3011 	if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
3012 		ret = t_ret;
3013 
3014 	else if (ret == DB_NOTFOUND) {
3015 		if (!F_ISSET(dbc, DBC_READ_UNCOMMITTED))
3016 			ret = __db_secondary_corrupt(pdbp);
3017 		else switch (flags) {
3018 		case DB_GET_BOTHC:
3019 		case DB_NEXT:
3020 		case DB_NEXT_DUP:
3021 		case DB_NEXT_NODUP:
3022 		case DB_PREV:
3023 		case DB_PREV_DUP:
3024 		case DB_PREV_NODUP:
3025 			PERFMON5(pdbp->env, race, dbc_get,
3026 			    sdbp->fname, sdbp->dname, ret, flags, pkey);
3027 			goto retry;
3028 		default:
3029 			break;
3030 		}
3031 	}
3032 
3033 err:	/* Cleanup and cursor resolution. */
3034 	if (dbc_n == dbc) {
3035 		dbc_n->rkey = dbc_n->rdata;
3036 		dbc_n->rdata = save_data;
3037 	}
3038 	if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
3039 		ret = t_ret;
3040 	if (pkeymalloc) {
3041 		/*
3042 		 * If pkey had a MALLOC flag, we need to restore it; otherwise,
3043 		 * if the user frees the buffer but reuses the DBT without
3044 		 * NULL'ing its data field or changing the flags, we may drop
3045 		 * core.
3046 		 */
3047 		F_CLR(pkey, DB_DBT_REALLOC);
3048 		F_SET(pkey, DB_DBT_MALLOC);
3049 	}
3050 
3051 	return (ret);
3052 }
3053 
3054 /*
3055  * __dbc_pget_recno --
3056  *	Perform a DB_GET_RECNO c_pget on a secondary index.  Returns
3057  * the secondary's record number in the pkey field and the primary's
3058  * in the data field.
3059  */
3060 static int
__dbc_pget_recno(sdbc,pkey,data,flags)3061 __dbc_pget_recno(sdbc, pkey, data, flags)
3062 	DBC *sdbc;
3063 	DBT *pkey, *data;
3064 	u_int32_t flags;
3065 {
3066 	DB *pdbp, *sdbp;
3067 	DBC *pdbc;
3068 	DBT discardme, primary_key;
3069 	ENV *env;
3070 	db_recno_t oob;
3071 	u_int32_t rmw;
3072 	int ret, t_ret;
3073 
3074 	sdbp = sdbc->dbp;
3075 	pdbp = sdbp->s_primary;
3076 	env = sdbp->env;
3077 	pdbc = NULL;
3078 	ret = t_ret = 0;
3079 
3080 	rmw = LF_ISSET(DB_RMW);
3081 
3082 	memset(&discardme, 0, sizeof(DBT));
3083 	F_SET(&discardme, DB_DBT_USERMEM | DB_DBT_PARTIAL);
3084 
3085 	oob = RECNO_OOB;
3086 
3087 	/*
3088 	 * If the primary is an rbtree, we want its record number, whether
3089 	 * or not the secondary is one too.  Fetch the recno into "data".
3090 	 *
3091 	 * If it's not an rbtree, return RECNO_OOB in "data".
3092 	 */
3093 	if (F_ISSET(pdbp, DB_AM_RECNUM)) {
3094 		/*
3095 		 * Get the primary key, so we can find the record number
3096 		 * in the primary. (We're uninterested in the secondary key.)
3097 		 */
3098 		memset(&primary_key, 0, sizeof(DBT));
3099 		F_SET(&primary_key, DB_DBT_MALLOC);
3100 		if ((ret = __dbc_get(sdbc,
3101 		    &discardme, &primary_key, rmw | DB_CURRENT)) != 0)
3102 			return (ret);
3103 
3104 		/*
3105 		 * Open a cursor on the primary, set it to the right record,
3106 		 * and fetch its recno into "data".
3107 		 *
3108 		 * (See __dbc_pget for comments on the use of __db_cursor_int.)
3109 		 *
3110 		 * SET_RET_MEM so that the secondary DBC owns any returned-data
3111 		 * memory.
3112 		 */
3113 		if ((ret = __db_cursor_int(pdbp, sdbc->thread_info, sdbc->txn,
3114 		    pdbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0)
3115 			goto perr;
3116 		SET_RET_MEM(pdbc, sdbc);
3117 		if ((ret = __dbc_get(pdbc,
3118 		    &primary_key, &discardme, rmw | DB_SET)) != 0)
3119 			goto perr;
3120 
3121 		ret = __dbc_get(pdbc, &discardme, data, rmw | DB_GET_RECNO);
3122 
3123 perr:		__os_ufree(env, primary_key.data);
3124 		if (pdbc != NULL &&
3125 		    (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
3126 			ret = t_ret;
3127 		if (ret != 0)
3128 			return (ret);
3129 	} else if ((ret = __db_retcopy(env, data, &oob,
3130 		    sizeof(oob), &sdbc->rkey->data, &sdbc->rkey->ulen)) != 0)
3131 			return (ret);
3132 
3133 	/*
3134 	 * If the secondary is an rbtree, we want its record number, whether
3135 	 * or not the primary is one too.  Fetch the recno into "pkey".
3136 	 *
3137 	 * If it's not an rbtree, return RECNO_OOB in "pkey".
3138 	 */
3139 	if (F_ISSET(sdbp, DB_AM_RECNUM))
3140 		return (__dbc_get(sdbc, &discardme, pkey, flags));
3141 	else
3142 		return (__db_retcopy(env, pkey, &oob,
3143 		    sizeof(oob), &sdbc->rdata->data, &sdbc->rdata->ulen));
3144 }
3145 
3146 /*
3147  * __db_wrlock_err -- do not have a write lock.
3148  */
3149 static int
__db_wrlock_err(env)3150 __db_wrlock_err(env)
3151 	ENV *env;
3152 {
3153 	__db_errx(env, DB_STR("0697", "Write attempted on read-only cursor"));
3154 	return (EPERM);
3155 }
3156 
3157 /*
3158  * __dbc_del_secondary --
3159  *	Perform a delete operation on a secondary index:  call through
3160  *	to the primary and delete the primary record that this record
3161  *	points to.
3162  *
3163  *	Note that deleting the primary record will call c_del on all
3164  *	the secondaries, including this one;  thus, it is not necessary
3165  *	to execute both this function and an actual delete.
3166  */
3167 static int
__dbc_del_secondary(dbc)3168 __dbc_del_secondary(dbc)
3169 	DBC *dbc;
3170 {
3171 	DB *pdbp;
3172 	DBC *pdbc;
3173 	DBT skey, pkey;
3174 	ENV *env;
3175 	int ret, t_ret;
3176 	u_int32_t rmw;
3177 
3178 	pdbp = dbc->dbp->s_primary;
3179 	env = pdbp->env;
3180 	rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
3181 
3182 	/*
3183 	 * Get the current item that we're pointing at.
3184 	 * We don't actually care about the secondary key, just
3185 	 * the primary.
3186 	 */
3187 	memset(&skey, 0, sizeof(DBT));
3188 	memset(&pkey, 0, sizeof(DBT));
3189 	F_SET(&skey, DB_DBT_PARTIAL | DB_DBT_USERMEM);
3190 	if ((ret = __dbc_get(dbc, &skey, &pkey, DB_CURRENT)) != 0)
3191 		return (ret);
3192 
3193 	SWAP_IF_NEEDED(dbc->dbp, &pkey);
3194 	DEBUG_LWRITE(dbc, dbc->txn, "del_secondary", &skey, &pkey, 0);
3195 
3196 	/*
3197 	 * Create a cursor on the primary with our locker ID,
3198 	 * so that when it calls back, we don't conflict.
3199 	 *
3200 	 * We create a cursor explicitly because there's no
3201 	 * way to specify the same locker ID if we're using
3202 	 * locking but not transactions if we use the DB->del
3203 	 * interface.  This shouldn't be any less efficient
3204 	 * anyway.
3205 	 */
3206 	if ((ret = __db_cursor_int(pdbp, dbc->thread_info, dbc->txn,
3207 	    pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0)
3208 		return (ret);
3209 
3210 	/*
3211 	 * See comment in __dbc_put--if we're in CDB,
3212 	 * we already hold the locks we need, and we need to flag
3213 	 * the cursor as a WRITER so we don't run into errors
3214 	 * when we try to delete.
3215 	 */
3216 	if (CDB_LOCKING(env)) {
3217 		DB_ASSERT(env, pdbc->mylock.off == LOCK_INVALID);
3218 		F_SET(pdbc, DBC_WRITER);
3219 	}
3220 
3221 	/*
3222 	 * Set the new cursor to the correct primary key.  Then
3223 	 * delete it.  We don't really care about the datum;
3224 	 * just reuse our skey DBT.
3225 	 *
3226 	 * If the primary get returns DB_NOTFOUND, something is amiss--
3227 	 * every record in the secondary should correspond to some record
3228 	 * in the primary.
3229 	 */
3230 	if ((ret = __dbc_get(pdbc, &pkey, &skey, DB_SET | rmw)) == 0)
3231 		ret = __dbc_del(pdbc, 0);
3232 	else if (ret == DB_NOTFOUND)
3233 		ret = __db_secondary_corrupt(pdbp);
3234 
3235 	if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
3236 		ret = t_ret;
3237 
3238 	return (ret);
3239 }
3240 
3241 /*
3242  * __dbc_del_primary --
3243  *	Perform a delete operation on a primary index.  Loop through
3244  *	all the secondary indices which correspond to this primary
3245  *	database, and delete any secondary keys that point at the current
3246  *	record.
3247  *
3248  * PUBLIC: int __dbc_del_primary __P((DBC *));
3249  */
3250 int
__dbc_del_primary(dbc)3251 __dbc_del_primary(dbc)
3252 	DBC *dbc;
3253 {
3254 	DB *dbp, *sdbp;
3255 	DBC *sdbc;
3256 	DBT *tskeyp;
3257 	DBT data, pkey, skey, temppkey, tempskey;
3258 	ENV *env;
3259 	u_int32_t nskey, rmw;
3260 	int ret, t_ret;
3261 
3262 	dbp = dbc->dbp;
3263 	env = dbp->env;
3264 	sdbp = NULL;
3265 	rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
3266 
3267 	/*
3268 	 * If we're called at all, we have at least one secondary.
3269 	 * (Unfortunately, we can't assert this without grabbing the mutex.)
3270 	 * Get the current record so that we can construct appropriate
3271 	 * secondary keys as needed.
3272 	 */
3273 	memset(&pkey, 0, sizeof(DBT));
3274 	memset(&data, 0, sizeof(DBT));
3275 	if ((ret = __dbc_get(dbc, &pkey, &data, DB_CURRENT)) != 0)
3276 		return (ret);
3277 
3278 	memset(&skey, 0, sizeof(DBT));
3279 	for (ret = __db_s_first(dbp, &sdbp);
3280 	    sdbp != NULL && ret == 0;
3281 	    ret = __db_s_next(&sdbp, dbc->txn)) {
3282 		/*
3283 		 * Get the secondary key for this secondary and the current
3284 		 * item.
3285 		 */
3286 		if ((ret = sdbp->s_callback(sdbp, &pkey, &data, &skey)) != 0) {
3287 			/* Not indexing is equivalent to an empty key set. */
3288 			if (ret == DB_DONOTINDEX) {
3289 				F_SET(&skey, DB_DBT_MULTIPLE);
3290 				skey.size = 0;
3291 			} else /* We had a substantive error.  Bail. */
3292 				goto err;
3293 		}
3294 
3295 #ifdef DIAGNOSTIC
3296 		if (F_ISSET(&skey, DB_DBT_MULTIPLE))
3297 			__db_check_skeyset(sdbp, &skey);
3298 #endif
3299 
3300 		if (F_ISSET(&skey, DB_DBT_MULTIPLE)) {
3301 			tskeyp = (DBT *)skey.data;
3302 			nskey = skey.size;
3303 			if (nskey == 0)
3304 				continue;
3305 		} else {
3306 			tskeyp = &skey;
3307 			nskey = 1;
3308 		}
3309 
3310 		/* Open a secondary cursor. */
3311 		if ((ret = __db_cursor_int(sdbp,
3312 		    dbc->thread_info, dbc->txn, sdbp->type,
3313 		    PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0)
3314 			goto err;
3315 		/* See comment above and in __dbc_put. */
3316 		if (CDB_LOCKING(env)) {
3317 			DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
3318 			F_SET(sdbc, DBC_WRITER);
3319 		}
3320 
3321 		for (; nskey > 0; nskey--, tskeyp++) {
3322 			/*
3323 			 * Set the secondary cursor to the appropriate item.
3324 			 * Delete it.
3325 			 *
3326 			 * We want to use DB_RMW if locking is on; it's only
3327 			 * legal then, though.
3328 			 *
3329 			 * !!!
3330 			 * Don't stomp on any callback-allocated buffer in skey
3331 			 * when we do a c_get(DB_GET_BOTH); use a temp DBT
3332 			 * instead.  Similarly, don't allow pkey to be
3333 			 * invalidated when the cursor is closed.
3334 			 */
3335 			DB_INIT_DBT(tempskey, tskeyp->data, tskeyp->size);
3336 			SWAP_IF_NEEDED(sdbp, &pkey);
3337 			DB_INIT_DBT(temppkey, pkey.data, pkey.size);
3338 			if ((ret = __dbc_get(sdbc, &tempskey, &temppkey,
3339 			    DB_GET_BOTH | rmw)) == 0)
3340 				ret = __dbc_del(sdbc, DB_UPDATE_SECONDARY);
3341 			else if (ret == DB_NOTFOUND)
3342 				ret = __db_secondary_corrupt(dbp);
3343 			SWAP_IF_NEEDED(sdbp, &pkey);
3344 			FREE_IF_NEEDED(env, tskeyp);
3345 		}
3346 
3347 		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
3348 			ret = t_ret;
3349 		if (ret != 0)
3350 			goto err;
3351 
3352 		/*
3353 		 * In the common case where there is a single secondary key, we
3354 		 * will have freed any application-allocated data in skey
3355 		 * already.  In the multiple key case, we need to free it here.
3356 		 * It is safe to do this twice as the macro resets the data
3357 		 * field.
3358 		 */
3359 		FREE_IF_NEEDED(env, &skey);
3360 	}
3361 
3362 err:	if (sdbp != NULL &&
3363 	    (t_ret = __db_s_done(sdbp, dbc->txn)) != 0 && ret == 0)
3364 		ret = t_ret;
3365 	FREE_IF_NEEDED(env, &skey);
3366 	return (ret);
3367 }
3368 
3369 /*
3370  * __dbc_del_foreign --
3371  *	Apply the foreign database constraints for a particular foreign
3372  *	database when an item is being deleted (dbc points at item being deleted
3373  *	in the foreign database.)
3374  *
3375  *      Delete happens in dbp, check for occurrences of key in pdpb.
3376  *      Terminology:
3377  *        Foreign db = Where delete occurs (dbp).
3378  *        Secondary db = Where references to dbp occur (sdbp, a secondary)
3379  *        Primary db = sdbp's primary database, references to dbp are secondary
3380  *                      keys here
3381  *        Foreign Key = Key being deleted in dbp (fkey)
3382  *        Primary Key = Key of the corresponding entry in sdbp's primary (pkey).
3383  */
3384 static int
__dbc_del_foreign(dbc)3385 __dbc_del_foreign(dbc)
3386 	DBC *dbc;
3387 {
3388 	DB_FOREIGN_INFO *f_info;
3389 	DB *dbp, *pdbp, *sdbp;
3390 	DBC *pdbc, *sdbc;
3391 	DBT data, fkey, pkey;
3392 	ENV *env;
3393 	u_int32_t flags, rmw;
3394 	int changed, ret, t_ret;
3395 
3396 	dbp = dbc->dbp;
3397 	env = dbp->env;
3398 
3399 	memset(&fkey, 0, sizeof(DBT));
3400 	memset(&data, 0, sizeof(DBT));
3401 	if ((ret = __dbc_get(dbc, &fkey, &data, DB_CURRENT)) != 0)
3402 		return (ret);
3403 
3404 	LIST_FOREACH(f_info, &(dbp->f_primaries), f_links) {
3405 		sdbp = f_info->dbp;
3406 		pdbp = sdbp->s_primary;
3407 		flags = f_info->flags;
3408 
3409 		rmw = (STD_LOCKING(dbc) &&
3410 		    !LF_ISSET(DB_FOREIGN_ABORT)) ? DB_RMW : 0;
3411 
3412 		/*
3413 		 * Handle CDB locking.  Some of this is copied from
3414 		 * __dbc_del_primary, but a bit more acrobatics are required.
3415 		 * If we're not going to abort, then we need to get a write
3416 		 * cursor.  If CDB_ALLDB is set, then only one write cursor is
3417 		 * allowed and we hold it, so we fudge things and promote the
3418 		 * cursor on the other DBs manually, it won't cause a problem.
3419 		 * If CDB_ALLDB is not set, then we go through the usual route
3420 		 * to make sure we block as necessary.  If there are any open
3421 		 * read cursors on sdbp, the delete or put call later will
3422 		 * block.
3423 		 *
3424 		 * If NULLIFY is set, we'll need a cursor on the primary to
3425 		 * update it with the nullified data.  Because primary and
3426 		 * secondary dbs share a lock file ID in CDB, we open a cursor
3427 		 * on the secondary and then get another writable cursor on the
3428 		 * primary via __db_cursor_int to avoid deadlocking.
3429 		 */
3430 		sdbc = pdbc = NULL;
3431 		if (!LF_ISSET(DB_FOREIGN_ABORT) && CDB_LOCKING(env) &&
3432 		    !F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) {
3433 			ret = __db_cursor(sdbp,
3434 			    dbc->thread_info, dbc->txn, &sdbc, DB_WRITECURSOR);
3435 			if (LF_ISSET(DB_FOREIGN_NULLIFY) && ret == 0) {
3436 				ret = __db_cursor_int(pdbp,
3437 				    dbc->thread_info, dbc->txn, pdbp->type,
3438 				    PGNO_INVALID, 0, dbc->locker, &pdbc);
3439 				F_SET(pdbc, DBC_WRITER);
3440 			}
3441 		} else {
3442 			ret = __db_cursor_int(sdbp, dbc->thread_info, dbc->txn,
3443 			    sdbp->type, PGNO_INVALID, 0, dbc->locker, &sdbc);
3444 			if (LF_ISSET(DB_FOREIGN_NULLIFY) && ret == 0)
3445 				ret = __db_cursor_int(pdbp, dbc->thread_info,
3446 				    dbc->txn, pdbp->type, PGNO_INVALID, 0,
3447 				    dbc->locker, &pdbc);
3448 			}
3449 		if (ret != 0) {
3450 			if (sdbc != NULL)
3451 				(void)__dbc_close(sdbc);
3452 			return (ret);
3453 		}
3454 		if (CDB_LOCKING(env) && F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) {
3455 			DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
3456 			F_SET(sdbc, DBC_WRITER);
3457 			if (LF_ISSET(DB_FOREIGN_NULLIFY) && pdbc != NULL) {
3458 				DB_ASSERT(env,
3459 				    pdbc->mylock.off == LOCK_INVALID);
3460 				F_SET(pdbc, DBC_WRITER);
3461 			}
3462 		}
3463 
3464 		/*
3465 		 * There are three actions possible when a foreign database has
3466 		 * items corresponding to a deleted item:
3467 		 * DB_FOREIGN_ABORT - The delete operation should be aborted.
3468 		 * DB_FOREIGN_CASCADE - All corresponding foreign items should
3469 		 *    be deleted.
3470 		 * DB_FOREIGN_NULLIFY - A callback needs to be made, allowing
3471 		 *    the application to modify the data DBT from the
3472 		 *    associated database.  If the callback makes a
3473 		 *    modification, the updated item needs to replace the
3474 		 *    original item in the foreign db
3475 		 */
3476 		memset(&pkey, 0, sizeof(DBT));
3477 		memset(&data, 0, sizeof(DBT));
3478 		ret = __dbc_pget(sdbc, &fkey, &pkey, &data, DB_SET|rmw);
3479 
3480 		if (ret == DB_NOTFOUND) {
3481 			/* No entry means no constraint */
3482 			ret = __dbc_close(sdbc);
3483 			if (LF_ISSET(DB_FOREIGN_NULLIFY) &&
3484 			    (t_ret = __dbc_close(pdbc)) != 0)
3485 				ret = t_ret;
3486 			if (ret != 0)
3487 				return (ret);
3488 			continue;
3489 		} else if (ret != 0) {
3490 			/* Just return the error code from the pget */
3491 			(void)__dbc_close(sdbc);
3492 			if (LF_ISSET(DB_FOREIGN_NULLIFY))
3493 				(void)__dbc_close(pdbc);
3494 			return (ret);
3495 		} else if (LF_ISSET(DB_FOREIGN_ABORT)) {
3496 			/* If the record exists and ABORT is set, we're done */
3497 			if ((ret = __dbc_close(sdbc)) != 0)
3498 				return (ret);
3499 			return (DB_FOREIGN_CONFLICT);
3500 		}
3501 
3502 		/*
3503 		 * There were matching items in the primary DB, and the action
3504 		 * is either DB_FOREIGN_CASCADE or DB_FOREIGN_NULLIFY.
3505 		 */
3506 		while (ret == 0) {
3507 			if (LF_ISSET(DB_FOREIGN_CASCADE)) {
3508 				/*
3509 				 * Don't use the DB_UPDATE_SECONDARY flag,
3510 				 * since we want the delete to cascade into the
3511 				 * secondary's primary.
3512 				 */
3513 				if ((ret = __dbc_del(sdbc, 0)) != 0) {
3514 					__db_err(env, ret, DB_STR("0698",
3515 	    "Attempt to execute cascading delete in a foreign index failed"));
3516 					break;
3517 				}
3518 			} else if (LF_ISSET(DB_FOREIGN_NULLIFY)) {
3519 				changed = 0;
3520 				if ((ret = f_info->callback(sdbp,
3521 				    &pkey, &data, &fkey, &changed)) != 0) {
3522 					__db_err(env, ret, DB_STR("0699",
3523 				    "Foreign database application callback"));
3524 					break;
3525 				}
3526 
3527 				/*
3528 				 * If the user callback modified the DBT and
3529 				 * a put on the primary failed.
3530 				 */
3531 				if (changed && (ret = __dbc_put(pdbc,
3532 				    &pkey, &data, DB_KEYFIRST)) != 0) {
3533 					__db_err(env, ret, DB_STR("0700",
3534 "Attempt to overwrite item in foreign database with nullified value failed"));
3535 					break;
3536 				}
3537 			}
3538 			/* retrieve the next matching item from the prim. db */
3539 			memset(&pkey, 0, sizeof(DBT));
3540 			memset(&data, 0, sizeof(DBT));
3541 			ret = __dbc_pget(sdbc,
3542 			    &fkey, &pkey, &data, DB_NEXT_DUP|rmw);
3543 		}
3544 
3545 		if (ret == DB_NOTFOUND)
3546 			ret = 0;
3547 		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
3548 			ret = t_ret;
3549 		if (LF_ISSET(DB_FOREIGN_NULLIFY) &&
3550 		    (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
3551 			ret = t_ret;
3552 		if (ret != 0)
3553 			return (ret);
3554 	}
3555 
3556 	return (ret);
3557 }
3558 
3559 /*
3560  * __db_s_first --
3561  *	Get the first secondary, if any are present, from the primary.
3562  *
3563  * PUBLIC: int __db_s_first __P((DB *, DB **));
3564  */
3565 int
__db_s_first(pdbp,sdbpp)3566 __db_s_first(pdbp, sdbpp)
3567 	DB *pdbp, **sdbpp;
3568 {
3569 	DB *sdbp;
3570 
3571 	MUTEX_LOCK(pdbp->env, pdbp->mutex);
3572 	sdbp = LIST_FIRST(&pdbp->s_secondaries);
3573 
3574 	/* See __db_s_next. */
3575 	if (sdbp != NULL)
3576 		sdbp->s_refcnt++;
3577 	MUTEX_UNLOCK(pdbp->env, pdbp->mutex);
3578 
3579 	*sdbpp = sdbp;
3580 
3581 	return (0);
3582 }
3583 
3584 /*
3585  * __db_s_next --
3586  *	Get the next secondary in the list.
3587  *
3588  * PUBLIC: int __db_s_next __P((DB **, DB_TXN *));
3589  */
3590 int
__db_s_next(sdbpp,txn)3591 __db_s_next(sdbpp, txn)
3592 	DB **sdbpp;
3593 	DB_TXN *txn;
3594 {
3595 	DB *sdbp, *pdbp, *closeme;
3596 	ENV *env;
3597 	int ret;
3598 
3599 	/*
3600 	 * Secondary indices are kept in a linked list, s_secondaries,
3601 	 * off each primary DB handle.  If a primary is free-threaded,
3602 	 * this list may only be traversed or modified while the primary's
3603 	 * thread mutex is held.
3604 	 *
3605 	 * The tricky part is that we don't want to hold the thread mutex
3606 	 * across the full set of secondary puts necessary for each primary
3607 	 * put, or we'll wind up essentially single-threading all the puts
3608 	 * to the handle;  the secondary puts will each take about as
3609 	 * long as the primary does, and may require I/O.  So we instead
3610 	 * hold the thread mutex only long enough to follow one link to the
3611 	 * next secondary, and then we release it before performing the
3612 	 * actual secondary put.
3613 	 *
3614 	 * The only danger here is that we might legitimately close a
3615 	 * secondary index in one thread while another thread is performing
3616 	 * a put and trying to update that same secondary index.  To
3617 	 * prevent this from happening, we refcount the secondary handles.
3618 	 * If close is called on a secondary index handle while we're putting
3619 	 * to it, it won't really be closed--the refcount will simply drop,
3620 	 * and we'll be responsible for closing it here.
3621 	 */
3622 	sdbp = *sdbpp;
3623 	pdbp = sdbp->s_primary;
3624 	env = pdbp->env;
3625 	closeme = NULL;
3626 
3627 	MUTEX_LOCK(env, pdbp->mutex);
3628 	DB_ASSERT(env, sdbp->s_refcnt != 0);
3629 	if (--sdbp->s_refcnt == 0) {
3630 		LIST_REMOVE(sdbp, s_links);
3631 		closeme = sdbp;
3632 	}
3633 	sdbp = LIST_NEXT(sdbp, s_links);
3634 	if (sdbp != NULL)
3635 		sdbp->s_refcnt++;
3636 	MUTEX_UNLOCK(env, pdbp->mutex);
3637 
3638 	*sdbpp = sdbp;
3639 
3640 	/*
3641 	 * closeme->close() is a wrapper;  call __db_close explicitly.
3642 	 */
3643 	if (closeme == NULL)
3644 		ret = 0;
3645 	else
3646 		ret = __db_close(closeme, txn, 0);
3647 
3648 	return (ret);
3649 }
3650 
3651 /*
3652  * __db_s_done --
3653  *	Properly decrement the refcount on a secondary database handle we're
3654  *	using, without calling __db_s_next.
3655  *
3656  * PUBLIC: int __db_s_done __P((DB *, DB_TXN *));
3657  */
3658 int
__db_s_done(sdbp,txn)3659 __db_s_done(sdbp, txn)
3660 	DB *sdbp;
3661 	DB_TXN *txn;
3662 {
3663 	DB *pdbp;
3664 	ENV *env;
3665 	int doclose, ret;
3666 
3667 	pdbp = sdbp->s_primary;
3668 	env = pdbp->env;
3669 	doclose = 0;
3670 
3671 	MUTEX_LOCK(env, pdbp->mutex);
3672 	DB_ASSERT(env, sdbp->s_refcnt != 0);
3673 	if (--sdbp->s_refcnt == 0) {
3674 		LIST_REMOVE(sdbp, s_links);
3675 		doclose = 1;
3676 	}
3677 	MUTEX_UNLOCK(env, pdbp->mutex);
3678 
3679 	if (doclose == 0)
3680 		ret = 0;
3681 	else
3682 		ret = __db_close(sdbp, txn, 0);
3683 	return (ret);
3684 }
3685 
3686 /*
3687  * __db_s_count --
3688  *	Count the number of secondaries associated with a given primary.
3689  */
3690 static int
__db_s_count(pdbp)3691 __db_s_count(pdbp)
3692 	DB *pdbp;
3693 {
3694 	DB *sdbp;
3695 	ENV *env;
3696 	int count;
3697 
3698 	env = pdbp->env;
3699 	count = 0;
3700 
3701 	MUTEX_LOCK(env, pdbp->mutex);
3702 	for (sdbp = LIST_FIRST(&pdbp->s_secondaries);
3703 	    sdbp != NULL;
3704 	    sdbp = LIST_NEXT(sdbp, s_links))
3705 		++count;
3706 	MUTEX_UNLOCK(env, pdbp->mutex);
3707 
3708 	return (count);
3709 }
3710 
3711 /*
3712  * __db_buildpartial --
3713  *	Build the record that will result after a partial put is applied to
3714  *	an existing record.
3715  *
3716  *	This should probably be merged with __bam_build, but that requires
3717  *	a little trickery if we plan to keep the overflow-record optimization
3718  *	in that function.
3719  *
3720  * PUBLIC: int __db_buildpartial __P((DB *, DBT *, DBT *, DBT *));
3721  */
3722 int
__db_buildpartial(dbp,oldrec,partial,newrec)3723 __db_buildpartial(dbp, oldrec, partial, newrec)
3724 	DB *dbp;
3725 	DBT *oldrec, *partial, *newrec;
3726 {
3727 	ENV *env;
3728 	u_int32_t len, nbytes;
3729 	u_int8_t *buf;
3730 	int ret;
3731 
3732 	env = dbp->env;
3733 
3734 	DB_ASSERT(env, F_ISSET(partial, DB_DBT_PARTIAL));
3735 
3736 	memset(newrec, 0, sizeof(DBT));
3737 
3738 	nbytes = __db_partsize(oldrec->size, partial);
3739 	newrec->size = nbytes;
3740 
3741 	if ((ret = __os_malloc(env, nbytes, &buf)) != 0)
3742 		return (ret);
3743 	newrec->data = buf;
3744 
3745 	/* Nul or pad out the buffer, for any part that isn't specified. */
3746 	memset(buf,
3747 	    F_ISSET(dbp, DB_AM_FIXEDLEN) ? ((BTREE *)dbp->bt_internal)->re_pad :
3748 	    0, nbytes);
3749 
3750 	/* Copy in any leading data from the original record. */
3751 	memcpy(buf, oldrec->data,
3752 	    partial->doff > oldrec->size ? oldrec->size : partial->doff);
3753 
3754 	/* Copy the data from partial. */
3755 	memcpy(buf + partial->doff, partial->data, partial->size);
3756 
3757 	/* Copy any trailing data from the original record. */
3758 	len = partial->doff + partial->dlen;
3759 	if (oldrec->size > len)
3760 		memcpy(buf + partial->doff + partial->size,
3761 		    (u_int8_t *)oldrec->data + len, oldrec->size - len);
3762 
3763 	return (0);
3764 }
3765 
3766 /*
3767  * __db_partsize --
3768  *	Given the number of bytes in an existing record and a DBT that
3769  *	is about to be partial-put, calculate the size of the record
3770  *	after the put.
3771  *
3772  *	This code is called from __bam_partsize.
3773  *
3774  * PUBLIC: u_int32_t __db_partsize __P((u_int32_t, DBT *));
3775  */
3776 u_int32_t
__db_partsize(nbytes,data)3777 __db_partsize(nbytes, data)
3778 	u_int32_t nbytes;
3779 	DBT *data;
3780 {
3781 
3782 	/*
3783 	 * There are really two cases here:
3784 	 *
3785 	 * Case 1: We are replacing some bytes that do not exist (i.e., they
3786 	 * are past the end of the record).  In this case the number of bytes
3787 	 * we are replacing is irrelevant and all we care about is how many
3788 	 * bytes we are going to add from offset.  So, the new record length
3789 	 * is going to be the size of the new bytes (size) plus wherever those
3790 	 * new bytes begin (doff).
3791 	 *
3792 	 * Case 2: All the bytes we are replacing exist.  Therefore, the new
3793 	 * size is the oldsize (nbytes) minus the bytes we are replacing (dlen)
3794 	 * plus the bytes we are adding (size).
3795 	 */
3796 	if (nbytes < data->doff + data->dlen)		/* Case 1 */
3797 		return (data->doff + data->size);
3798 
3799 	return (nbytes + data->size - data->dlen);	/* Case 2 */
3800 }
3801 
#ifdef DIAGNOSTIC
/*
 * __db_check_skeyset --
 *	Diagnostic-only sanity check on a multi-key result from the
 *	application's secondary callback: assert that no two keys in the
 *	set compare equal under the secondary's btree comparison function.
 *	skeyp->data is treated as an array of skeyp->size DBTs.
 *
 * PUBLIC: #ifdef DIAGNOSTIC
 * PUBLIC: void __db_check_skeyset __P((DB *, DBT *));
 * PUBLIC: #endif
 */
void
__db_check_skeyset(sdbp, skeyp)
	DB *sdbp;
	DBT *skeyp;
{
	DBT *key1, *key2, *set_end;
	ENV *env;

	env = sdbp->env;
	set_end = (DBT *)skeyp->data + skeyp->size;

	/* Compare every key against each key that follows it in the set. */
	key1 = (DBT *)skeyp->data;
	while (key1 < set_end) {
		key2 = key1 + 1;
		while (key2 < set_end) {
			DB_ASSERT(env,
			    ((BTREE *)sdbp->bt_internal)->bt_compare(sdbp,
			    key1, key2, NULL) != 0);
			key2++;
		}
		key1++;
	}
}
#endif
3831 
#ifdef HAVE_ERROR_HISTORY
/*
 * __dbc_diags
 *	Save the context which triggers the "first notice" of an error code;
 *	i.e., its creation. It doesn't touch anything when err == 0.
 *
 *	When a deferred message buffer is available, the error context is
 *	recorded in it, followed (in slice builds, for a slice environment)
 *	by the slice index, and then by the database's file name and
 *	sub-database name.  The err argument is always returned unchanged
 *	so the call can wrap a return value.
 *
 * PUBLIC: int __dbc_diags __P((DBC *, int));
 */
 int
 __dbc_diags(dbc, err)
	DBC *dbc;
	int err;
{
	DB_MSGBUF *mb;

	if (err != 0 && dbc->env != NULL &&
	    (mb = __db_deferred_get()) != NULL) {
		(void)__db_remember_context(dbc->env, mb, err);
#ifdef HAVE_SLICES
		if (dbc->env->slice_container != NULL)
			__db_msgadd(dbc->env, mb, "slice %d: ",
			    dbc->env->slice_index);
#endif
		/*
		 * A NULL file name is shown as "in-mem"; a NULL sub-database
		 * name is shown as the empty string.
		 */
		__db_msgadd(dbc->env, mb, "DB: %s:%s\n",
			dbc->dbp->fname == NULL ? "in-mem" : dbc->dbp->fname,
			dbc->dbp->dname == NULL ? "" : dbc->dbp->dname);
	}
	return (err);
}
#endif
3862