1 /*-
2  * Copyright (c) 1998, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file LICENSE for license information.
5  *
6  * $Id$
7  */
8 
9 #include "db_config.h"
10 
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/hash.h"
15 #include "dbinc/heap.h"
16 #include "dbinc/lock.h"
17 #include "dbinc/mp.h"
18 #include "dbinc/partition.h"
19 #include "dbinc/qam.h"
20 #include "dbinc/txn.h"
21 
22 static int __dbc_set_priority __P((DBC *, DB_CACHE_PRIORITY));
23 static int __dbc_get_priority __P((DBC *, DB_CACHE_PRIORITY* ));
24 
25 /*
26  * __db_cursor_int --
27  *	Internal routine to create a cursor.
28  *
29  * PUBLIC: int __db_cursor_int __P((DB *, DB_THREAD_INFO *,
30  * PUBLIC:     DB_TXN *, DBTYPE, db_pgno_t, int, DB_LOCKER *, DBC **));
31  */
32 int
__db_cursor_int(dbp,ip,txn,dbtype,root,flags,locker,dbcp)33 __db_cursor_int(dbp, ip, txn, dbtype, root, flags, locker, dbcp)
34 	DB *dbp;
35 	DB_THREAD_INFO *ip;
36 	DB_TXN *txn;
37 	DBTYPE dbtype;
38 	db_pgno_t root;
39 	int flags;
40 	DB_LOCKER *locker;
41 	DBC **dbcp;
42 {
43 	DBC *dbc;
44 	DBC_INTERNAL *cp;
45 	DB_LOCKREQ req;
46 	ENV *env;
47 	db_threadid_t tid;
48 	int allocated, envlid, ret;
49 	pid_t pid;
50 
51 	env = dbp->env;
52 	allocated = envlid = 0;
53 
54 	/*
55 	 * If dbcp is non-NULL it is assumed to point to an area to initialize
56 	 * as a cursor.
57 	 *
58 	 * Take one from the free list if it's available.  Take only the
59 	 * right type.  With off page dups we may have different kinds
60 	 * of cursors on the queue for a single database.
61 	 */
62 	MUTEX_LOCK(env, dbp->mutex);
63 
64 #ifndef HAVE_NO_DB_REFCOUNT
65 	/*
66 	 * If this DBP is being logged then refcount the log filename
67 	 * relative to this transaction. We do this here because we have
68 	 * the dbp->mutex which protects the refcount.  We want to avoid
69 	 * calling the function if the transaction handle has a shared parent
70 	 * locker or we are duplicating a cursor.  This includes the case of
71 	 * creating an off page duplicate cursor.
72 	 * If we knew this cursor will not be used in an update, we could avoid
73 	 * this, but we don't have that information.
74 	 */
75 	if (IS_REAL_TXN(txn) &&
76 	    !LF_ISSET(DBC_OPD | DBC_DUPLICATE) &&
77 	    !F_ISSET(dbp, DB_AM_RECOVER) &&
78 	    dbp->log_filename != NULL && !IS_REP_CLIENT(env) &&
79 	    (ret = __txn_record_fname(env, txn, dbp->log_filename)) != 0) {
80 		MUTEX_UNLOCK(env, dbp->mutex);
81 		return (ret);
82 	}
83 
84 #endif
85 
86 	TAILQ_FOREACH(dbc, &dbp->free_queue, links)
87 		if (dbtype == dbc->dbtype) {
88 			TAILQ_REMOVE(&dbp->free_queue, dbc, links);
89 			F_CLR(dbc, ~DBC_OWN_LID);
90 			break;
91 		}
92 	MUTEX_UNLOCK(env, dbp->mutex);
93 
94 	if (dbc == NULL) {
95 		if ((ret = __os_calloc(env, 1, sizeof(DBC), &dbc)) != 0)
96 			return (ret);
97 		allocated = 1;
98 		dbc->flags = 0;
99 
100 		dbc->dbp = dbp;
101 		dbc->dbenv = dbp->dbenv;
102 		dbc->env = dbp->env;
103 
104 		/* Set up locking information. */
105 		if (LOCKING_ON(env)) {
106 			/*
107 			 * If we are not threaded, we share a locker ID among
108 			 * all cursors opened in the environment handle,
109 			 * allocating one if this is the first cursor.
110 			 *
111 			 * This relies on the fact that non-threaded DB handles
112 			 * always have non-threaded environment handles, since
113 			 * we set DB_THREAD on DB handles created with threaded
114 			 * environment handles.
115 			 */
116 			if (!DB_IS_THREADED(dbp)) {
117 				if (env->env_lref == NULL) {
118 					if ((ret = __lock_id(env,
119 					    NULL, &env->env_lref)) != 0)
120 						goto err;
121 				       envlid = 1;
122 				}
123 				dbc->lref = env->env_lref;
124 			}
125 
126 			/*
127 			 * In CDB, secondary indices should share a lock file
128 			 * ID with the primary;  otherwise we're susceptible
129 			 * to deadlocks.  We also use __db_cursor_int rather
130 			 * than __db_cursor to create secondary update cursors
131 			 * in c_put and c_del; these won't acquire a new lock.
132 			 *
133 			 * !!!
134 			 * Since this is in the one-time cursor allocation
135 			 * code, we need to be sure to destroy, not just
136 			 * close, all cursors in the secondary when we
137 			 * associate.
138 			 */
139 			if (CDB_LOCKING(env) &&
140 			    F_ISSET(dbp, DB_AM_SECONDARY))
141 				memcpy(dbc->lock.fileid,
142 				    dbp->s_primary->fileid, DB_FILE_ID_LEN);
143 			else
144 				memcpy(dbc->lock.fileid,
145 				    dbp->fileid, DB_FILE_ID_LEN);
146 
147 			if (CDB_LOCKING(env)) {
148 				if (F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) {
149 					/*
150 					 * If we are doing a single lock per
151 					 * environment, set up the global
152 					 * lock object just like we do to
153 					 * single thread creates.
154 					 */
155 					DB_ASSERT(env, sizeof(db_pgno_t) ==
156 					    sizeof(u_int32_t));
157 					dbc->lock_dbt.size = sizeof(u_int32_t);
158 					dbc->lock_dbt.data = &dbc->lock.pgno;
159 					dbc->lock.pgno = 0;
160 				} else {
161 					dbc->lock_dbt.size = DB_FILE_ID_LEN;
162 					dbc->lock_dbt.data = dbc->lock.fileid;
163 				}
164 			} else {
165 				dbc->lock.type = DB_PAGE_LOCK;
166 				dbc->lock_dbt.size = sizeof(dbc->lock);
167 				dbc->lock_dbt.data = &dbc->lock;
168 			}
169 		}
170 		/* Init the DBC internal structure. */
171 #ifdef HAVE_PARTITION
172 		if (DB_IS_PARTITIONED(dbp)) {
173 			if ((ret = __partc_init(dbc)) != 0)
174 				goto err;
175 		} else
176 #endif
177 		switch (dbtype) {
178 		case DB_BTREE:
179 		case DB_RECNO:
180 			if ((ret = __bamc_init(dbc, dbtype)) != 0)
181 				goto err;
182 			break;
183 		case DB_HASH:
184 			if ((ret = __hamc_init(dbc)) != 0)
185 				goto err;
186 			break;
187 		case DB_HEAP:
188 			if ((ret = __heapc_init(dbc)) != 0)
189 				goto err;
190 			break;
191 		case DB_QUEUE:
192 			if ((ret = __qamc_init(dbc)) != 0)
193 				goto err;
194 			break;
195 		case DB_UNKNOWN:
196 		default:
197 			ret = __db_unknown_type(env, "DB->cursor", dbtype);
198 			goto err;
199 		}
200 
201 		cp = dbc->internal;
202 	}
203 
204 	/* Refresh the DBC structure. */
205 	dbc->dbtype = dbtype;
206 	RESET_RET_MEM(dbc);
207 	dbc->db_stream = __dbc_db_stream;
208 	dbc->set_priority = __dbc_set_priority;
209 	dbc->get_priority = __dbc_get_priority;
210 	dbc->priority = dbp->priority;
211 	dbc->txn_cursors.tqe_next = NULL;
212 	dbc->txn_cursors.tqe_prev = NULL;
213 
214 	/*
215 	 * If the DB handle is not threaded, there is one locker ID for the
216 	 * whole environment.  There should only one family transaction active
217 	 * as well.  This doesn't apply to CDS group transactions, where the
218 	 * cursor can simply use the transaction's locker directly.
219 	 */
220 	if (!CDB_LOCKING(env) && txn != NULL && F_ISSET(txn, TXN_FAMILY) &&
221 	    (F_ISSET(dbc, DBC_OWN_LID) || dbc->lref == NULL || envlid))  {
222 		if (LOCKING_ON(env)) {
223 			if (dbc->lref == NULL) {
224 				if ((ret =
225 				    __lock_id(env, NULL, &dbc->lref)) != 0)
226 					goto err;
227 				F_SET(dbc, DBC_OWN_LID);
228 			}
229 			if ((ret = __lock_addfamilylocker(env,
230 			    txn->txnid, dbc->lref->id, 1)) != 0)
231 				goto err;
232 		}
233 		F_SET(dbc, DBC_FAMILY);
234 		txn = NULL;
235 	}
236 
237 	if ((dbc->txn = txn) != NULL)
238 		dbc->locker = txn->locker;
239 	else if (LOCKING_ON(env)) {
240 		/*
241 		 * There are certain cases in which we want to create a
242 		 * new cursor with a particular locker ID that is known
243 		 * to be the same as (and thus not conflict with) an
244 		 * open cursor.
245 		 *
246 		 * The most obvious case is cursor duplication;  when we
247 		 * call DBC->dup or __dbc_idup, we want to use the original
248 		 * cursor's locker ID.
249 		 *
250 		 * Another case is when updating secondary indices.  Standard
251 		 * CDB locking would mean that we might block ourself:  we need
252 		 * to open an update cursor in the secondary while an update
253 		 * cursor in the primary is open, and when the secondary and
254 		 * primary are subdatabases or we're using env-wide locking,
255 		 * this is disastrous.
256 		 *
257 		 * In these cases, our caller will pass a nonzero locker
258 		 * ID into this function.  Use this locker ID instead of
259 		 * the default as the locker ID for our new cursor.
260 		 */
261 		if (locker != NULL)
262 			dbc->locker = locker;
263 		else if (LF_ISSET(DB_RECOVER))
264 			dbc->locker = NULL;
265 		else {
266 			if (dbc->lref == NULL) {
267 				if ((ret =
268 				    __lock_id(env, NULL, &dbc->lref)) != 0)
269 					goto err;
270 				F_SET(dbc, DBC_OWN_LID);
271 			}
272 			/*
273 			 * If we are threaded then we need to set the
274 			 * proper thread id into the locker.
275 			 */
276 			if (DB_IS_THREADED(dbp)) {
277 				env->dbenv->thread_id(env->dbenv, &pid, &tid);
278 				__lock_set_thread_id(dbc->lref, pid, tid);
279 			}
280 			dbc->locker = dbc->lref;
281 		}
282 	}
283 
284 	/*
285 	 * These fields change when we are used as a secondary index, so
286 	 * if the DB is a secondary, make sure they're set properly just
287 	 * in case we opened some cursors before we were associated.
288 	 *
289 	 * __dbc_get is used by all access methods, so this should be safe.
290 	 */
291 	if (F_ISSET(dbp, DB_AM_SECONDARY))
292 		dbc->get = dbc->c_get = __dbc_secondary_get_pp;
293 
294 	/*
295 	 * Don't enable bulk for btrees with record numbering, since avoiding
296 	 * a full search avoids taking write locks necessary to maintain
297 	 * consistent numbering.
298 	 */
299 	if (LF_ISSET(DB_CURSOR_BULK) && dbtype == DB_BTREE &&
300 	    !F_ISSET(dbp, DB_AM_RECNUM))
301 		F_SET(dbc, DBC_BULK);
302 	if (LF_ISSET(DB_CURSOR_TRANSIENT))
303 		F_SET(dbc, DBC_TRANSIENT);
304 	if (LF_ISSET(DBC_OPD))
305 		F_SET(dbc, DBC_OPD);
306 	if (F_ISSET(dbp, DB_AM_RECOVER) || LF_ISSET(DB_RECOVER))
307 		F_SET(dbc, DBC_RECOVER);
308 	if (F_ISSET(dbp, DB_AM_COMPENSATE))
309 		F_SET(dbc, DBC_DONTLOCK);
310 	/*
311 	* If this database is exclusive then the cursor
312 	* does not need to get locks.
313 	*/
314 	if (F2_ISSET(dbp, DB2_AM_EXCL)) {
315 		F_SET(dbc, DBC_DONTLOCK);
316 		if (IS_REAL_TXN(txn)&& !LF_ISSET(DBC_OPD | DBC_DUPLICATE)) {
317 			/*
318 			 * Exclusive databases can only have one active
319 			 * transaction at a time since there are no internal
320 			 * locks to prevent one transaction from reading and
321 			 * writing another's uncommitted changes.
322 			 */
323 			if (dbp->cur_txn != NULL && dbp->cur_txn != txn) {
324 				ret = USR_ERR(env, EINVAL);
325 			    __db_errx(env, DB_STR("0749",
326 "Exclusive database handles can only have one active transaction at a time."));
327 				goto err;
328 			}
329 			/* Do not trade a second time. */
330 			if (dbp->cur_txn != txn) {
331 				/* Trade the handle lock to the txn locker. */
332 				memset(&req, 0, sizeof(req));
333 				req.lock = dbp->handle_lock;
334 				req.op = DB_LOCK_TRADE;
335 				if ((ret = __lock_vec(env, txn->locker, 0,
336 				    &req, 1, 0)) != 0)
337 					goto err;
338 				dbp->cur_txn = txn;
339 				dbp->cur_locker = txn->locker;
340 				if ((ret = __txn_lockevent(env, txn, dbp,
341 				    &dbp->handle_lock, dbp->locker)) != 0)
342 					goto err;
343 			}
344 		}
345 	}
346 #ifdef HAVE_REPLICATION
347 	/*
348 	 * If we are replicating from a down rev version then we must
349 	 * use old locking protocols.
350 	 */
351 	if (LOGGING_ON(env) &&
352 	     ((LOG *)env->lg_handle->
353 	     reginfo.primary)->persist.version < DB_LOGVERSION_LATCHING)
354 		F_SET(dbc, DBC_DOWNREV);
355 #endif
356 
357 	/* Refresh the DBC internal structure. */
358 	cp = dbc->internal;
359 	cp->opd = NULL;
360 	cp->pdbc = NULL;
361 
362 	cp->indx = 0;
363 	cp->page = NULL;
364 	cp->pgno = PGNO_INVALID;
365 	cp->root = root;
366 	cp->stream_start_pgno = cp->stream_curr_pgno = PGNO_INVALID;
367 	cp->stream_off = 0;
368 
369 	if (DB_IS_PARTITIONED(dbp)) {
370 		DBC_PART_REFRESH(dbc);
371 	} else switch (dbtype) {
372 	case DB_BTREE:
373 	case DB_RECNO:
374 		if ((ret = __bamc_refresh(dbc)) != 0)
375 			goto err;
376 		break;
377 	case DB_HEAP:
378 		if ((ret = __heapc_refresh(dbc)) != 0)
379 			goto err;
380 		break;
381 	case DB_HASH:
382 	case DB_QUEUE:
383 		break;
384 	case DB_UNKNOWN:
385 	default:
386 		ret = __db_unknown_type(env, "DB->cursor", dbp->type);
387 		goto err;
388 	}
389 
390 	/*
391 	 * The transaction keeps track of how many cursors were opened within
392 	 * it to catch application errors where the cursor isn't closed when
393 	 * the transaction is resolved.
394 	 */
395 	if (txn != NULL)
396 		++txn->cursors;
397 	if (ip != NULL) {
398 		dbc->thread_info = ip;
399 #ifdef DIAGNOSTIC
400 		if (dbc->locker != NULL) {
401 			dbc->locker->prev_locker = ip->dbth_locker;
402 			ip->dbth_locker =
403 			    R_OFFSET(&(env->lk_handle->reginfo), dbc->locker);
404 		} else
405 			ip->dbth_locker = INVALID_ROFF;
406 #endif
407 	} else if (txn != NULL)
408 		dbc->thread_info = txn->thread_info;
409 	else
410 		ENV_GET_THREAD_INFO(env, dbc->thread_info);
411 
412 	MUTEX_LOCK(env, dbp->mutex);
413 	TAILQ_INSERT_TAIL(&dbp->active_queue, dbc, links);
414 	F_SET(dbc, DBC_ACTIVE);
415 	MUTEX_UNLOCK(env, dbp->mutex);
416 
417 	*dbcp = dbc;
418 	return (0);
419 
420 err:	if (allocated)
421 		__os_free(env, dbc);
422 	return (ret);
423 }
424 
425 /*
426  * __db_put --
427  *	Store a key/data pair.
428  *
429  * PUBLIC: int __db_put __P((DB *,
430  * PUBLIC:      DB_THREAD_INFO *, DB_TXN *, DBT *, DBT *, u_int32_t));
431  */
432 int
__db_put(dbp,ip,txn,key,data,flags)433 __db_put(dbp, ip, txn, key, data, flags)
434 	DB *dbp;
435 	DB_THREAD_INFO *ip;
436 	DB_TXN *txn;
437 	DBT *key, *data;
438 	u_int32_t flags;
439 {
440 	DB_HEAP_RID rid;
441 	DBC *dbc;
442 	DBT tdata, tkey;
443 	ENV *env;
444 	void *bulk_kptr, *bulk_ptr;
445 	db_recno_t recno;
446 	u_int32_t cursor_flags;
447 	int ret, t_ret;
448 
449 	env = dbp->env;
450 
451 	/*
452 	 * See the comment in __db_get() regarding DB_CURSOR_TRANSIENT.
453 	 *
454 	 * Note that the get in the DB_NOOVERWRITE case is safe to do with this
455 	 * flag set;  if it errors in any way other than DB_NOTFOUND, we're
456 	 * going to close the cursor without doing anything else, and if it
457 	 * returns DB_NOTFOUND then it's safe to do a c_put(DB_KEYLAST) even if
458 	 * an access method moved the cursor, since that's not
459 	 * position-dependent.
460 	 */
461 	cursor_flags = DB_WRITELOCK;
462 	if (LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY))
463 		cursor_flags |= DB_CURSOR_BULK;
464 	else
465 		cursor_flags |= DB_CURSOR_TRANSIENT;
466 	if ((ret = __db_cursor(dbp, ip, txn, &dbc, cursor_flags)) != 0)
467 		return (ret);
468 
469 	DEBUG_LWRITE(dbc, txn, "DB->put", key, data, flags);
470 	PERFMON6(env, db, put, dbp->fname,
471 	    dbp->dname, txn == NULL ? 0 : txn->txnid, key, data, flags);
472 
473 	SET_RET_MEM(dbc, dbp);
474 
475 	if (flags == DB_APPEND && !DB_IS_PRIMARY(dbp)) {
476 		/*
477 		 * If there is an append callback, the value stored in
478 		 * data->data may be replaced and then freed.  To avoid
479 		 * passing a freed pointer back to the user, just operate
480 		 * on a copy of the data DBT.
481 		 */
482 		tdata = *data;
483 
484 		/*
485 		 * Append isn't a normal put operation;  call the appropriate
486 		 * access method's append function.
487 		 */
488 		switch (dbp->type) {
489 		case DB_HEAP:
490 			if ((ret = __heap_append(dbc, key, &tdata)) != 0)
491 				goto err;
492 			break;
493 		case DB_QUEUE:
494 			if ((ret = __qam_append(dbc, key, &tdata)) != 0)
495 				goto err;
496 			break;
497 		case DB_RECNO:
498 			if ((ret = __ram_append(dbc, key, &tdata)) != 0)
499 				goto err;
500 			break;
501 		case DB_BTREE:
502 		case DB_HASH:
503 		case DB_UNKNOWN:
504 		default:
505 			/* The interface should prevent this. */
506 			DB_ASSERT(env,
507 			    dbp->type == DB_QUEUE || dbp->type == DB_RECNO);
508 
509 			ret = __db_ferr(env, "DB->put", 0);
510 			goto err;
511 		}
512 
513 		/*
514 		 * The append callback, if one exists, may have allocated
515 		 * a new tdata.data buffer.  If so, free it.
516 		 */
517 		FREE_IF_NEEDED(env, &tdata);
518 
519 		/* No need for a cursor put;  we're done. */
520 #ifdef HAVE_COMPRESSION
521 	} else if (DB_IS_COMPRESSED(dbp) && !F_ISSET(dbp, DB_AM_SECONDARY) &&
522 	    !DB_IS_PRIMARY(dbp) && LIST_FIRST(&dbp->f_primaries) == NULL) {
523 		ret = __dbc_put(dbc, key, data, flags);
524 #endif
525 	} else if (LF_ISSET(DB_MULTIPLE)) {
526 		ret = 0;
527 		memset(&tkey, 0, sizeof(tkey));
528 		if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO) {
529 			tkey.data = &recno;
530 			tkey.size = sizeof(recno);
531 		}
532 		memset(&tdata, 0, sizeof(tdata));
533 		DB_MULTIPLE_INIT(bulk_kptr, key);
534 		DB_MULTIPLE_INIT(bulk_ptr, data);
535 		key->doff = 0;
536 		while (ret == 0) {
537 			if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO)
538 				DB_MULTIPLE_RECNO_NEXT(bulk_kptr, key,
539 				    recno, tdata.data, tdata.size);
540 			else
541 				DB_MULTIPLE_NEXT(bulk_kptr, key,
542 				    tkey.data, tkey.size);
543 			DB_MULTIPLE_NEXT(bulk_ptr, data,
544 			    tdata.data, tdata.size);
545 			if (bulk_kptr == NULL || bulk_ptr == NULL)
546 				break;
547 			if (dbp->type == DB_HEAP) {
548 				memcpy(&rid, tkey.data, sizeof(DB_HEAP_RID));
549 				tkey.data = &rid;
550 			}
551 			ret = __dbc_put(dbc, &tkey, &tdata,
552 			    LF_ISSET(DB_OPFLAGS_MASK));
553 			if (ret == 0)
554 				++key->doff;
555 		}
556 	} else if (LF_ISSET(DB_MULTIPLE_KEY)) {
557 		ret = 0;
558 		memset(&tkey, 0, sizeof(tkey));
559 		if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO) {
560 			tkey.data = &recno;
561 			tkey.size = sizeof(recno);
562 		}
563 		memset(&tdata, 0, sizeof(tdata));
564 		DB_MULTIPLE_INIT(bulk_ptr, key);
565 		while (ret == 0) {
566 			if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO)
567 				DB_MULTIPLE_RECNO_NEXT(bulk_ptr, key, recno,
568 				    tdata.data, tdata.size);
569 			else
570 				DB_MULTIPLE_KEY_NEXT(bulk_ptr, key, tkey.data,
571 				    tkey.size, tdata.data, tdata.size);
572 			if (bulk_ptr == NULL)
573 				break;
574 			if (dbp->type == DB_HEAP) {
575 				memcpy(&rid, tkey.data, sizeof(DB_HEAP_RID));
576 				tkey.data = &rid;
577 			}
578 			ret = __dbc_put(dbc, &tkey, &tdata,
579 			    LF_ISSET(DB_OPFLAGS_MASK));
580 			if (ret == 0)
581 				++key->doff;
582 		}
583 	} else
584 		ret = __dbc_put(dbc, key, data, flags);
585 
586 err:	/* Close the cursor. */
587 	if (!DB_RETOK_DBPUT(ret))
588 		F_SET(dbc, DBC_ERROR);
589 	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
590 		ret = t_ret;
591 
592 	return (ret);
593 }
594 
595 /*
596  * __db_del --
597  *	Delete the items referenced by a key.
598  *
599  * PUBLIC: int __db_del __P((DB *,
600  * PUBLIC:      DB_THREAD_INFO *, DB_TXN *, DBT *, u_int32_t));
601  */
602 int
__db_del(dbp,ip,txn,key,flags)603 __db_del(dbp, ip, txn, key, flags)
604 	DB *dbp;
605 	DB_THREAD_INFO *ip;
606 	DB_TXN *txn;
607 	DBT *key;
608 	u_int32_t flags;
609 {
610 	DB_HEAP_RID rid;
611 	DBC *dbc;
612 	DBT data, tkey;
613 	void *bulk_ptr;
614 	db_recno_t recno;
615 	u_int32_t cursor_flags, f_init, f_next;
616 	int ret, t_ret;
617 
618 	COMPQUIET(bulk_ptr, NULL);
619 	/* Allocate a cursor. */
620 	cursor_flags = DB_WRITELOCK;
621 	if (LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY))
622 		cursor_flags |= DB_CURSOR_BULK;
623 	if ((ret = __db_cursor(dbp, ip, txn, &dbc, cursor_flags)) != 0)
624 		return (ret);
625 
626 	DEBUG_LWRITE(dbc, txn, "DB->del", key, NULL, flags);
627 	PERFMON5(env, db, del,
628 	    dbp->fname, dbp->dname, txn == NULL ? 0 : txn->txnid, key, flags);
629 
630 #ifdef HAVE_COMPRESSION
631 	if (DB_IS_COMPRESSED(dbp) && !F_ISSET(dbp, DB_AM_SECONDARY) &&
632 	    !DB_IS_PRIMARY(dbp) && LIST_FIRST(&dbp->f_primaries) == NULL) {
633 		F_SET(dbc, DBC_TRANSIENT);
634 		ret = __dbc_bulk_del(dbc, key, flags);
635 		goto err;
636 	}
637 #endif
638 
639 	/*
640 	 * Walk a cursor through the key/data pairs, deleting as we go.  Set
641 	 * the DB_DBT_USERMEM flag, as this might be a threaded application
642 	 * and the flags checking will catch us.  We don't actually want the
643 	 * keys or data, set DB_DBT_ISSET.  We rely on __dbc_get to clear
644 	 * this.
645 	 */
646 	memset(&data, 0, sizeof(data));
647 	F_SET(&data, DB_DBT_USERMEM);
648 	tkey = *key;
649 
650 	f_init = LF_ISSET(DB_MULTIPLE_KEY) ? DB_GET_BOTH : DB_SET;
651 	f_next = DB_NEXT_DUP;
652 
653 	/*
654 	 * If locking (and we haven't already acquired CDB locks), set the
655 	 * read-modify-write flag.
656 	 */
657 	if (STD_LOCKING(dbc)) {
658 		f_init |= DB_RMW;
659 		f_next |= DB_RMW;
660 	}
661 
662 	if (LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY)) {
663 		if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO) {
664 			memset(&tkey, 0, sizeof(tkey));
665 			tkey.data = &recno;
666 			tkey.size = sizeof(recno);
667 		}
668 		DB_MULTIPLE_INIT(bulk_ptr, key);
669 		/* We return the number of keys deleted in doff. */
670 		key->doff = 0;
671 bulk_next:	if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO)
672 			DB_MULTIPLE_RECNO_NEXT(bulk_ptr, key,
673 			    recno, data.data, data.size);
674 		else if (LF_ISSET(DB_MULTIPLE))
675 			DB_MULTIPLE_NEXT(bulk_ptr, key, tkey.data, tkey.size);
676 		else
677 			DB_MULTIPLE_KEY_NEXT(bulk_ptr, key,
678 			    tkey.data, tkey.size, data.data, data.size);
679 		if (bulk_ptr == NULL)
680 			goto err;
681 		if (dbp->type == DB_HEAP) {
682 			memcpy(&rid, tkey.data, sizeof(DB_HEAP_RID));
683 			tkey.data = &rid;
684 		}
685 
686 	}
687 
688 	/* We're not interested in the data -- do not return it. */
689 	F_SET(&tkey, DB_DBT_ISSET);
690 	F_SET(&data, DB_DBT_ISSET);
691 
692 	/*
693 	 * Optimize the simple cases.  For all AMs if we don't have secondaries
694 	 * and are not a secondary and we aren't a foreign database and there
695 	 * are no dups then we can avoid a bunch of overhead.  For queue we
696 	 * don't need to fetch the record since we delete by direct calculation
697 	 * from the record number.
698 	 *
699 	 * Hash permits an optimization in DB->del: since on-page duplicates are
700 	 * stored in a single HKEYDATA structure, it's possible to delete an
701 	 * entire set of them at once, and as the HKEYDATA has to be rebuilt
702 	 * and re-put each time it changes, this is much faster than deleting
703 	 * the duplicates one by one.  Thus, if not pointing at an off-page
704 	 * duplicate set, and we're not using secondary indices (in which case
705 	 * we'd have to examine the items one by one anyway), let hash do this
706 	 * "quick delete".
707 	 *
708 	 * !!!
709 	 * Note that this is the only application-executed delete call in
710 	 * Berkeley DB that does not go through the __dbc_del function.
711 	 * If anything other than the delete itself (like a secondary index
712 	 * update) has to happen there in a particular situation, the
713 	 * conditions here should be modified not to use these optimizations.
714 	 * The ordinary AM-independent alternative will work just fine;
715 	 * it'll just be slower.
716 	 */
717 	if (!F_ISSET(dbp, DB_AM_SECONDARY) && !DB_IS_PRIMARY(dbp) &&
718 	    LIST_FIRST(&dbp->f_primaries) == NULL) {
719 #ifdef HAVE_QUEUE
720 		if (dbp->type == DB_QUEUE) {
721 			ret = __qam_delete(dbc, &tkey, flags);
722 			goto next;
723 		}
724 #endif
725 
726 		/* Fetch the first record. */
727 		if ((ret = __dbc_get(dbc, &tkey, &data, f_init)) != 0)
728 			goto err;
729 
730 #ifdef HAVE_HASH
731 		/*
732 		 * Hash "quick delete" removes all on-page duplicates.  We
733 		 * can't do that if deleting specific key/data pairs.
734 		 */
735 		if (dbp->type == DB_HASH && !LF_ISSET(DB_MULTIPLE_KEY)) {
736 			DBC *sdbc;
737 			sdbc = dbc;
738 #ifdef HAVE_PARTITION
739 			if (F_ISSET(dbc, DBC_PARTITIONED))
740 				sdbc =
741 				    ((PART_CURSOR*)dbc->internal)->sub_cursor;
742 #endif
743 			if (sdbc->internal->opd == NULL) {
744 				ret = __ham_quick_delete(sdbc);
745 				goto next;
746 			}
747 		}
748 #endif
749 
750 		if (!F_ISSET(dbp, DB_AM_DUP)) {
751 			ret = dbc->am_del(dbc, 0);
752 			goto next;
753 		}
754 	} else if ((ret = __dbc_get(dbc, &tkey, &data, f_init)) != 0)
755 		goto err;
756 
757 	/* Walk through the set of key/data pairs, deleting as we go. */
758 	for (;;) {
759 		if ((ret = __dbc_del(dbc, flags)) != 0)
760 			break;
761 		/*
762 		 * With DB_MULTIPLE_KEY, the application has specified the
763 		 * exact records they want deleted.  We don't need to walk
764 		 * through a set of duplicates.
765 		 */
766 		if (LF_ISSET(DB_MULTIPLE_KEY))
767 			break;
768 
769 		F_SET(&tkey, DB_DBT_ISSET);
770 		F_SET(&data, DB_DBT_ISSET);
771 		if ((ret = __dbc_get(dbc, &tkey, &data, f_next)) != 0) {
772 			if (ret == DB_NOTFOUND)
773 				ret = 0;
774 			break;
775 		}
776 	}
777 
778 next:	if (ret == 0 && LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY)) {
779 		++key->doff;
780 		goto bulk_next;
781 	}
782 err:	/* Discard the cursor. */
783 	if (!DB_RETOK_DBDEL(ret))
784 		F_SET(dbc, DBC_ERROR);
785 	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
786 		ret = t_ret;
787 
788 	return (ret);
789 }
790 
791 /*
792  * __db_sync --
793  *	Flush the database cache.
794  *
795  * PUBLIC: int __db_sync __P((DB *));
796  */
797 int
__db_sync(dbp)798 __db_sync(dbp)
799 	DB *dbp;
800 {
801 	int ret, t_ret;
802 
803 	ret = 0;
804 
805 	/* If the database was read-only, we're done. */
806 	if (F_ISSET(dbp, DB_AM_RDONLY))
807 		return (0);
808 
809 	/* If it's a Recno tree, write the backing source text file. */
810 	if (dbp->type == DB_RECNO)
811 		ret = __ram_writeback(dbp);
812 
813 	/* If the database was never backed by a database file, we're done. */
814 	if (F_ISSET(dbp, DB_AM_INMEM))
815 		return (ret);
816 #ifdef HAVE_PARTITION
817 	if (DB_IS_PARTITIONED(dbp))
818 		ret = __partition_sync(dbp);
819 	else
820 #endif
821 
822 	/*
823 	 * No need to sync the top level external file database, since it is
824 	 * only opened when creating a new external file database, and is
825 	 * immediately closed after the external file directory id is obtained
826 		 * from it.
827 	 */
828 	if (dbp->blob_meta_db != NULL) {
829 		if ((t_ret = __db_sync(dbp->blob_meta_db)) != 0 && ret == 0)
830 			ret = t_ret;
831 	}
832 	if (dbp->type == DB_QUEUE)
833 		ret = __qam_sync(dbp);
834 	else
835 		/* Flush any dirty pages from the cache to the backing file. */
836 		if ((t_ret = __memp_fsync(dbp->mpf)) != 0 && ret == 0)
837 			ret = t_ret;
838 
839 	return (ret);
840 }
841 
842 /*
843  * __db_associate --
844  *	Associate another database as a secondary index to this one.
845  *
846  * PUBLIC: int __db_associate __P((DB *, DB_THREAD_INFO *, DB_TXN *, DB *,
847  * PUBLIC:     int (*)(DB *, const DBT *, const DBT *, DBT *), u_int32_t));
848  */
849 int
__db_associate(dbp,ip,txn,sdbp,callback,flags)850 __db_associate(dbp, ip, txn, sdbp, callback, flags)
851 	DB *dbp, *sdbp;
852 	DB_THREAD_INFO *ip;
853 	DB_TXN *txn;
854 	int (*callback) __P((DB *, const DBT *, const DBT *, DBT *));
855 	u_int32_t flags;
856 {
857 	DBC *pdbc, *sdbc;
858 	DBT key, data, skey, *tskeyp;
859 	ENV *env;
860 	int build, ret, t_ret;
861 	u_int32_t nskey;
862 
863 	env = dbp->env;
864 	pdbc = sdbc = NULL;
865 	ret = 0;
866 
867 	memset(&skey, 0, sizeof(DBT));
868 	nskey = 0;
869 	tskeyp = NULL;
870 
871 	/*
872 	 * Check to see if the secondary is empty -- and thus if we should
873 	 * build it -- before we link it in and risk making it show up in other
874 	 * threads.  Do this first so that the databases remain unassociated on
875 	 * error.
876 	 */
877 	build = 0;
878 	if (LF_ISSET(DB_CREATE)) {
879 		FLD_SET(sdbp->s_assoc_flags, DB_ASSOC_CREATE);
880 
881 		if ((ret = __db_cursor(sdbp, ip, txn, &sdbc, 0)) != 0)
882 			goto err;
883 
884 		/*
885 		 * We don't care about key or data;  we're just doing
886 		 * an existence check.
887 		 */
888 		memset(&key, 0, sizeof(DBT));
889 		memset(&data, 0, sizeof(DBT));
890 		F_SET(&key, DB_DBT_PARTIAL | DB_DBT_USERMEM);
891 		F_SET(&data, DB_DBT_PARTIAL | DB_DBT_USERMEM);
892 		if ((ret = __dbc_get(sdbc, &key, &data,
893 		    (STD_LOCKING(sdbc) ? DB_RMW : 0) |
894 		    DB_FIRST)) == DB_NOTFOUND) {
895 			build = 1;
896 			ret = 0;
897 		}
898 
899 		if (ret != 0)
900 			F_SET(sdbc, DBC_ERROR);
901 		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
902 			ret = t_ret;
903 
904 		/* Reset for later error check. */
905 		sdbc = NULL;
906 
907 		if (ret != 0)
908 			goto err;
909 	}
910 
911 	/*
912 	 * Set up the database handle as a secondary.
913 	 */
914 	sdbp->s_callback = callback;
915 	sdbp->s_primary = dbp;
916 
917 	sdbp->stored_get = sdbp->get;
918 	sdbp->get = __db_secondary_get;
919 
920 	sdbp->stored_close = sdbp->close;
921 	sdbp->close = __db_secondary_close_pp;
922 
923 	F_SET(sdbp, DB_AM_SECONDARY);
924 
925 	if (LF_ISSET(DB_IMMUTABLE_KEY))
926 		FLD_SET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY);
927 
928 	/*
929 	 * Add the secondary to the list on the primary.  Do it here
930 	 * so that we see any updates that occur while we're walking
931 	 * the primary.
932 	 */
933 	MUTEX_LOCK(env, dbp->mutex);
934 
935 	/* See __db_s_next for an explanation of secondary refcounting. */
936 	DB_ASSERT(env, sdbp->s_refcnt == 0);
937 	sdbp->s_refcnt = 1;
938 	LIST_INSERT_HEAD(&dbp->s_secondaries, sdbp, s_links);
939 	MUTEX_UNLOCK(env, dbp->mutex);
940 
941 	if (build) {
942 		/*
943 		 * We loop through the primary, putting each item we
944 		 * find into the new secondary.
945 		 *
946 		 * If we're using CDB, opening these two cursors puts us
947 		 * in a bit of a locking tangle:  CDB locks are done on the
948 		 * primary, so that we stay deadlock-free, but that means
949 		 * that updating the secondary while we have a read cursor
950 		 * open on the primary will self-block.  To get around this,
951 		 * we force the primary cursor to use the same locker ID
952 		 * as the secondary, so they won't conflict.  This should
953 		 * be harmless even if we're not using CDB.
954 		 */
955 		if ((ret = __db_cursor(sdbp, ip, txn, &sdbc,
956 		    CDB_LOCKING(sdbp->env) ? DB_WRITECURSOR : 0)) != 0)
957 			goto err;
958 		if ((ret = __db_cursor_int(dbp, ip,
959 		    txn, dbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0)
960 			goto err;
961 
962 		/* Lock out other threads, now that we have a locker. */
963 		dbp->associate_locker = sdbc->locker;
964 
965 		memset(&key, 0, sizeof(DBT));
966 		memset(&data, 0, sizeof(DBT));
967 		while ((ret = __dbc_get(pdbc, &key, &data, DB_NEXT)) == 0) {
968 			if ((ret = callback(sdbp, &key, &data, &skey)) != 0) {
969 				if (ret == DB_DONOTINDEX)
970 					continue;
971 				goto err;
972 			}
973 			if (F_ISSET(&skey, DB_DBT_MULTIPLE)) {
974 #ifdef DIAGNOSTIC
975 				__db_check_skeyset(sdbp, &skey);
976 #endif
977 				nskey = skey.size;
978 				tskeyp = (DBT *)skey.data;
979 			} else {
980 				nskey = 1;
981 				tskeyp = &skey;
982 			}
983 			SWAP_IF_NEEDED(sdbp, &key);
984 			for (; nskey > 0; nskey--, tskeyp++) {
985 				if ((ret = __dbc_put(sdbc,
986 				    tskeyp, &key, DB_UPDATE_SECONDARY)) != 0)
987 					goto err;
988 				FREE_IF_NEEDED(env, tskeyp);
989 			}
990 			SWAP_IF_NEEDED(sdbp, &key);
991 			FREE_IF_NEEDED(env, &skey);
992 		}
993 		if (ret == DB_NOTFOUND)
994 			ret = 0;
995 	}
996 
997 err:	if (sdbc != NULL && (t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
998 		ret = t_ret;
999 
1000 	if (pdbc != NULL && (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
1001 		ret = t_ret;
1002 
1003 	dbp->associate_locker = NULL;
1004 
1005 	for (; nskey > 0; nskey--, tskeyp++)
1006 		FREE_IF_NEEDED(env, tskeyp);
1007 	FREE_IF_NEEDED(env, &skey);
1008 
1009 	return (ret);
1010 }
1011 
1012 /*
1013  * __db_secondary_get --
1014  *	This wrapper function for DB->pget() is the DB->get() function
1015  *	on a database which has been made into a secondary index.
1016  *
1017  * PUBLIC: int __db_secondary_get
1018  * PUBLIC:     __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
1019  */
1020 int
__db_secondary_get(sdbp,txn,skey,data,flags)1021 __db_secondary_get(sdbp, txn, skey, data, flags)
1022 	DB *sdbp;
1023 	DB_TXN *txn;
1024 	DBT *skey, *data;
1025 	u_int32_t flags;
1026 {
1027 	DB_ASSERT(sdbp->env, F_ISSET(sdbp, DB_AM_SECONDARY));
1028 	return (__db_pget_pp(sdbp, txn, skey, NULL, data, flags));
1029 }
1030 
1031 /*
1032  * __db_secondary_close --
1033  *	Wrapper function for DB->close() which we use on secondaries to
1034  *	manage refcounting and make sure we don't close them underneath
1035  *	a primary that is updating.
1036  *
1037  * PUBLIC: int __db_secondary_close __P((DB *, u_int32_t));
1038  */
1039 int
__db_secondary_close(sdbp,flags)1040 __db_secondary_close(sdbp, flags)
1041 	DB *sdbp;
1042 	u_int32_t flags;
1043 {
1044 	DB *primary;
1045 	ENV *env;
1046 	int doclose;
1047 
1048 	/*
1049 	 * If the opening transaction is rolled back then the db handle
1050 	 * will have already been refreshed, we just need to call
1051 	 * __db_close to free the data.
1052 	 */
1053 	if (!F_ISSET(sdbp, DB_AM_OPEN_CALLED)) {
1054 		doclose = 1;
1055 		goto done;
1056 	}
1057 	doclose = 0;
1058 	primary = sdbp->s_primary;
1059 	env = primary->env;
1060 
1061 	MUTEX_LOCK(env, primary->mutex);
1062 	/*
1063 	 * Check the refcount--if it was at 1 when we were called, no
1064 	 * thread is currently updating this secondary through the primary,
1065 	 * so it's safe to close it for real.
1066 	 *
1067 	 * If it's not safe to do the close now, we do nothing;  the
1068 	 * database will actually be closed when the refcount is decremented,
1069 	 * which can happen in either __db_s_next or __db_s_done.
1070 	 */
1071 	DB_ASSERT(env, sdbp->s_refcnt != 0);
1072 	if (--sdbp->s_refcnt == 0) {
1073 		LIST_REMOVE(sdbp, s_links);
1074 		/* We don't want to call close while the mutex is held. */
1075 		doclose = 1;
1076 	}
1077 	MUTEX_UNLOCK(env, primary->mutex);
1078 
1079 	/*
1080 	 * sdbp->close is this function;  call the real one explicitly if
1081 	 * need be.
1082 	 */
1083 done:	return (doclose ? __db_close(sdbp, NULL, flags) : 0);
1084 }
1085 
1086 /*
1087  * __db_associate_foreign --
1088  *	Associate this database (fdbp) as a foreign constraint to another
1089  *	database (pdbp).  That is, dbp's keys appear as foreign key values in
1090  *	pdbp.
1091  *
1092  * PUBLIC: int __db_associate_foreign __P((DB *, DB *,
1093  * PUBLIC:     int (*)(DB *, const DBT *, DBT *, const DBT *, int *),
1094  * PUBLIC:     u_int32_t));
1095  */
1096 int
__db_associate_foreign(fdbp,pdbp,callback,flags)1097 __db_associate_foreign(fdbp, pdbp, callback, flags)
1098 	DB *fdbp, *pdbp;
1099 	int (*callback)(DB *, const DBT *, DBT *, const DBT *, int *);
1100 	u_int32_t flags;
1101 {
1102 	DB_FOREIGN_INFO *f_info;
1103 	ENV *env;
1104 	int ret;
1105 
1106 	env = fdbp->env;
1107 	ret = 0;
1108 
1109 	if ((ret = __os_malloc(env, sizeof(DB_FOREIGN_INFO), &f_info)) != 0) {
1110 		return (ret);
1111 	}
1112 	memset(f_info, 0, sizeof(DB_FOREIGN_INFO));
1113 
1114 	f_info->dbp = pdbp;
1115 	f_info->callback = callback;
1116 
1117 	/*
1118 	 * It might be wise to filter this, but for now the flags only
1119 	 * set the delete action type.
1120 	 */
1121 	FLD_SET(f_info->flags, flags);
1122 
1123 	/*
1124 	 * Add f_info to the foreign database's list of primaries.  That is to
1125 	 * say, fdbp->f_primaries lists all databases for which fdbp is a
1126 	 * foreign constraint.
1127 	 */
1128 	MUTEX_LOCK(env, fdbp->mutex);
1129 	LIST_INSERT_HEAD(&fdbp->f_primaries, f_info, f_links);
1130 	MUTEX_UNLOCK(env, fdbp->mutex);
1131 
1132 	/*
1133 	* Associate fdbp as pdbp's foreign db, for referential integrity
1134 	* checks.  We don't allow the foreign db to be changed, because we
1135 	* currently have no way of removing pdbp from the old foreign db's list
1136 	* of primaries.
1137 	*/
1138 	if (pdbp->s_foreign != NULL)
1139 		return (EINVAL);
1140 	pdbp->s_foreign = fdbp;
1141 
1142 	return (ret);
1143 }
1144 
1145 static int
__dbc_set_priority(dbc,priority)1146 __dbc_set_priority(dbc, priority)
1147 	DBC *dbc;
1148 	DB_CACHE_PRIORITY priority;
1149 {
1150 	dbc->priority = priority;
1151 	return (0);
1152 }
1153 
1154 static int
__dbc_get_priority(dbc,priority)1155 __dbc_get_priority(dbc, priority)
1156 	DBC *dbc;
1157 	DB_CACHE_PRIORITY *priority;
1158 {
1159 	if (dbc->priority == DB_PRIORITY_UNCHANGED)
1160 		return (__memp_get_priority(dbc->dbp->mpf, priority));
1161 	else
1162 		*priority = dbc->priority;
1163 
1164 	return (0);
1165 }
1166