/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1998, 2013 Oracle and/or its affiliates.  All rights reserved.
 *
 * $Id$
 */
8 
#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/btree.h"
#include "dbinc/hash.h"
#include "dbinc/heap.h"
#include "dbinc/lock.h"
#include "dbinc/mp.h"
#include "dbinc/partition.h"
#include "dbinc/qam.h"
#include "dbinc/txn.h"
21 
22 static int __db_secondary_get __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
23 static int __dbc_set_priority __P((DBC *, DB_CACHE_PRIORITY));
24 static int __dbc_get_priority __P((DBC *, DB_CACHE_PRIORITY* ));
25 
26 /*
27  * __db_cursor_int --
28  *	Internal routine to create a cursor.
29  *
30  * PUBLIC: int __db_cursor_int __P((DB *, DB_THREAD_INFO *,
31  * PUBLIC:     DB_TXN *, DBTYPE, db_pgno_t, int, DB_LOCKER *, DBC **));
32  */
33 int
__db_cursor_int(dbp,ip,txn,dbtype,root,flags,locker,dbcp)34 __db_cursor_int(dbp, ip, txn, dbtype, root, flags, locker, dbcp)
35 	DB *dbp;
36 	DB_THREAD_INFO *ip;
37 	DB_TXN *txn;
38 	DBTYPE dbtype;
39 	db_pgno_t root;
40 	int flags;
41 	DB_LOCKER *locker;
42 	DBC **dbcp;
43 {
44 	DBC *dbc;
45 	DBC_INTERNAL *cp;
46 	DB_LOCKREQ req;
47 	ENV *env;
48 	db_threadid_t tid;
49 	int allocated, envlid, ret;
50 	pid_t pid;
51 
52 	env = dbp->env;
53 	allocated = envlid = 0;
54 
55 	/*
56 	 * If dbcp is non-NULL it is assumed to point to an area to initialize
57 	 * as a cursor.
58 	 *
59 	 * Take one from the free list if it's available.  Take only the
60 	 * right type.  With off page dups we may have different kinds
61 	 * of cursors on the queue for a single database.
62 	 */
63 	MUTEX_LOCK(env, dbp->mutex);
64 
65 #ifndef HAVE_NO_DB_REFCOUNT
66 	/*
67 	 * If this DBP is being logged then refcount the log filename
68 	 * relative to this transaction. We do this here because we have
69 	 * the dbp->mutex which protects the refcount.  We want to avoid
70 	 * calling the function if the transaction handle has a shared parent
71 	 * locker or we are duplicating a cursor.  This includes the case of
72 	 * creating an off page duplicate cursor.
73 	 * If we knew this cursor will not be used in an update, we could avoid
74 	 * this, but we don't have that information.
75 	 */
76 	if (IS_REAL_TXN(txn) &&
77 	    !LF_ISSET(DBC_OPD | DBC_DUPLICATE) &&
78 	    !F_ISSET(dbp, DB_AM_RECOVER) &&
79 	    dbp->log_filename != NULL && !IS_REP_CLIENT(env) &&
80 	    (ret = __txn_record_fname(env, txn, dbp->log_filename)) != 0) {
81 		MUTEX_UNLOCK(env, dbp->mutex);
82 		return (ret);
83 	}
84 
85 #endif
86 
87 	TAILQ_FOREACH(dbc, &dbp->free_queue, links)
88 		if (dbtype == dbc->dbtype) {
89 			TAILQ_REMOVE(&dbp->free_queue, dbc, links);
90 			F_CLR(dbc, ~DBC_OWN_LID);
91 			break;
92 		}
93 	MUTEX_UNLOCK(env, dbp->mutex);
94 
95 	if (dbc == NULL) {
96 		if ((ret = __os_calloc(env, 1, sizeof(DBC), &dbc)) != 0)
97 			return (ret);
98 		allocated = 1;
99 		dbc->flags = 0;
100 
101 		dbc->dbp = dbp;
102 		dbc->dbenv = dbp->dbenv;
103 		dbc->env = dbp->env;
104 
105 		/* Set up locking information. */
106 		if (LOCKING_ON(env)) {
107 			/*
108 			 * If we are not threaded, we share a locker ID among
109 			 * all cursors opened in the environment handle,
110 			 * allocating one if this is the first cursor.
111 			 *
112 			 * This relies on the fact that non-threaded DB handles
113 			 * always have non-threaded environment handles, since
114 			 * we set DB_THREAD on DB handles created with threaded
115 			 * environment handles.
116 			 */
117 			if (!DB_IS_THREADED(dbp)) {
118 				if (env->env_lref == NULL) {
119 					if ((ret = __lock_id(env,
120 					    NULL, &env->env_lref)) != 0)
121 						goto err;
122 				       envlid = 1;
123 				}
124 				dbc->lref = env->env_lref;
125 			}
126 
127 			/*
128 			 * In CDB, secondary indices should share a lock file
129 			 * ID with the primary;  otherwise we're susceptible
130 			 * to deadlocks.  We also use __db_cursor_int rather
131 			 * than __db_cursor to create secondary update cursors
132 			 * in c_put and c_del; these won't acquire a new lock.
133 			 *
134 			 * !!!
135 			 * Since this is in the one-time cursor allocation
136 			 * code, we need to be sure to destroy, not just
137 			 * close, all cursors in the secondary when we
138 			 * associate.
139 			 */
140 			if (CDB_LOCKING(env) &&
141 			    F_ISSET(dbp, DB_AM_SECONDARY))
142 				memcpy(dbc->lock.fileid,
143 				    dbp->s_primary->fileid, DB_FILE_ID_LEN);
144 			else
145 				memcpy(dbc->lock.fileid,
146 				    dbp->fileid, DB_FILE_ID_LEN);
147 
148 			if (CDB_LOCKING(env)) {
149 				if (F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) {
150 					/*
151 					 * If we are doing a single lock per
152 					 * environment, set up the global
153 					 * lock object just like we do to
154 					 * single thread creates.
155 					 */
156 					DB_ASSERT(env, sizeof(db_pgno_t) ==
157 					    sizeof(u_int32_t));
158 					dbc->lock_dbt.size = sizeof(u_int32_t);
159 					dbc->lock_dbt.data = &dbc->lock.pgno;
160 					dbc->lock.pgno = 0;
161 				} else {
162 					dbc->lock_dbt.size = DB_FILE_ID_LEN;
163 					dbc->lock_dbt.data = dbc->lock.fileid;
164 				}
165 			} else {
166 				dbc->lock.type = DB_PAGE_LOCK;
167 				dbc->lock_dbt.size = sizeof(dbc->lock);
168 				dbc->lock_dbt.data = &dbc->lock;
169 			}
170 		}
171 		/* Init the DBC internal structure. */
172 #ifdef HAVE_PARTITION
173 		if (DB_IS_PARTITIONED(dbp)) {
174 			if ((ret = __partc_init(dbc)) != 0)
175 				goto err;
176 		} else
177 #endif
178 		switch (dbtype) {
179 		case DB_BTREE:
180 		case DB_RECNO:
181 			if ((ret = __bamc_init(dbc, dbtype)) != 0)
182 				goto err;
183 			break;
184 		case DB_HASH:
185 			if ((ret = __hamc_init(dbc)) != 0)
186 				goto err;
187 			break;
188 		case DB_HEAP:
189 			if ((ret = __heapc_init(dbc)) != 0)
190 				goto err;
191 			break;
192 		case DB_QUEUE:
193 			if ((ret = __qamc_init(dbc)) != 0)
194 				goto err;
195 			break;
196 		case DB_UNKNOWN:
197 		default:
198 			ret = __db_unknown_type(env, "DB->cursor", dbtype);
199 			goto err;
200 		}
201 
202 		cp = dbc->internal;
203 	}
204 
205 	/* Refresh the DBC structure. */
206 	dbc->dbtype = dbtype;
207 	RESET_RET_MEM(dbc);
208 	dbc->set_priority = __dbc_set_priority;
209 	dbc->get_priority = __dbc_get_priority;
210 	dbc->priority = dbp->priority;
211 	dbc->txn_cursors.tqe_next = NULL;
212 	dbc->txn_cursors.tqe_prev = NULL;
213 
214 	/*
215 	 * If the DB handle is not threaded, there is one locker ID for the
216 	 * whole environment.  There should only one family transaction active
217 	 * as well.  This doesn't apply to CDS group transactions, where the
218 	 * cursor can simply use the transaction's locker directly.
219 	 */
220 	if (!CDB_LOCKING(env) && txn != NULL && F_ISSET(txn, TXN_FAMILY) &&
221 	    (F_ISSET(dbc, DBC_OWN_LID) || dbc->lref == NULL || envlid))  {
222 		if (LOCKING_ON(env)) {
223 			if (dbc->lref == NULL) {
224 				if ((ret =
225 				    __lock_id(env, NULL, &dbc->lref)) != 0)
226 					goto err;
227 				F_SET(dbc, DBC_OWN_LID);
228 			}
229 			if ((ret = __lock_addfamilylocker(env,
230 			    txn->txnid, dbc->lref->id, 1)) != 0)
231 				goto err;
232 		}
233 		F_SET(dbc, DBC_FAMILY);
234 		txn = NULL;
235 	}
236 
237 	if ((dbc->txn = txn) != NULL)
238 		dbc->locker = txn->locker;
239 	else if (LOCKING_ON(env)) {
240 		/*
241 		 * There are certain cases in which we want to create a
242 		 * new cursor with a particular locker ID that is known
243 		 * to be the same as (and thus not conflict with) an
244 		 * open cursor.
245 		 *
246 		 * The most obvious case is cursor duplication;  when we
247 		 * call DBC->dup or __dbc_idup, we want to use the original
248 		 * cursor's locker ID.
249 		 *
250 		 * Another case is when updating secondary indices.  Standard
251 		 * CDB locking would mean that we might block ourself:  we need
252 		 * to open an update cursor in the secondary while an update
253 		 * cursor in the primary is open, and when the secondary and
254 		 * primary are subdatabases or we're using env-wide locking,
255 		 * this is disastrous.
256 		 *
257 		 * In these cases, our caller will pass a nonzero locker
258 		 * ID into this function.  Use this locker ID instead of
259 		 * the default as the locker ID for our new cursor.
260 		 */
261 		if (locker != NULL)
262 			dbc->locker = locker;
263 		else if (LF_ISSET(DB_RECOVER))
264 			dbc->locker = NULL;
265 		else {
266 			if (dbc->lref == NULL) {
267 				if ((ret =
268 				    __lock_id(env, NULL, &dbc->lref)) != 0)
269 					goto err;
270 				F_SET(dbc, DBC_OWN_LID);
271 			}
272 			/*
273 			 * If we are threaded then we need to set the
274 			 * proper thread id into the locker.
275 			 */
276 			if (DB_IS_THREADED(dbp)) {
277 				env->dbenv->thread_id(env->dbenv, &pid, &tid);
278 				__lock_set_thread_id(dbc->lref, pid, tid);
279 			}
280 			dbc->locker = dbc->lref;
281 		}
282 	}
283 
284 	/*
285 	 * These fields change when we are used as a secondary index, so
286 	 * if the DB is a secondary, make sure they're set properly just
287 	 * in case we opened some cursors before we were associated.
288 	 *
289 	 * __dbc_get is used by all access methods, so this should be safe.
290 	 */
291 	if (F_ISSET(dbp, DB_AM_SECONDARY))
292 		dbc->get = dbc->c_get = __dbc_secondary_get_pp;
293 
294 	/*
295 	 * Don't enable bulk for btrees with record numbering, since avoiding
296 	 * a full search avoids taking write locks necessary to maintain
297 	 * consistent numbering.
298 	 */
299 	if (LF_ISSET(DB_CURSOR_BULK) && dbtype == DB_BTREE &&
300 	    !F_ISSET(dbp, DB_AM_RECNUM))
301 		F_SET(dbc, DBC_BULK);
302 	if (LF_ISSET(DB_CURSOR_TRANSIENT))
303 		F_SET(dbc, DBC_TRANSIENT);
304 	if (LF_ISSET(DBC_OPD))
305 		F_SET(dbc, DBC_OPD);
306 	if (F_ISSET(dbp, DB_AM_RECOVER) || LF_ISSET(DB_RECOVER))
307 		F_SET(dbc, DBC_RECOVER);
308 	if (F_ISSET(dbp, DB_AM_COMPENSATE))
309 		F_SET(dbc, DBC_DONTLOCK);
310 	/*
311 	* If this database is exclusive then the cursor
312 	* does not need to get locks.
313 	*/
314 	if (F2_ISSET(dbp, DB2_AM_EXCL)) {
315 		F_SET(dbc, DBC_DONTLOCK);
316 		if (IS_REAL_TXN(txn)&& !LF_ISSET(DBC_OPD | DBC_DUPLICATE)) {
317 			/*
318 			 * Exclusive databases can only have one active
319 			 * transaction at a time since there are no internal
320 			 * locks to prevent one transaction from reading and
321 			 * writing another's uncommitted changes.
322 			 */
323 			if (dbp->cur_txn != NULL && dbp->cur_txn != txn) {
324 			    __db_errx(env, DB_STR("0749",
325 "Exclusive database handles can only have one active transaction at a time."));
326 				ret = EINVAL;
327 				goto err;
328 			}
329 			/* Do not trade a second time. */
330 			if (dbp->cur_txn != txn) {
331 				/* Trade the handle lock to the txn locker. */
332 				memset(&req, 0, sizeof(req));
333 				req.lock = dbp->handle_lock;
334 				req.op = DB_LOCK_TRADE;
335 				if ((ret = __lock_vec(env, txn->locker, 0,
336 				    &req, 1, 0)) != 0)
337 					goto err;
338 				dbp->cur_txn = txn;
339 				dbp->cur_locker = txn->locker;
340 				if ((ret = __txn_lockevent(env, txn, dbp,
341 				    &dbp->handle_lock, dbp->locker)) != 0)
342 					goto err;
343 			}
344 		}
345 	}
346 #ifdef HAVE_REPLICATION
347 	/*
348 	 * If we are replicating from a down rev version then we must
349 	 * use old locking protocols.
350 	 */
351 	if (LOGGING_ON(env) &&
352 	     ((LOG *)env->lg_handle->
353 	     reginfo.primary)->persist.version < DB_LOGVERSION_LATCHING)
354 		F_SET(dbc, DBC_DOWNREV);
355 #endif
356 
357 	/* Refresh the DBC internal structure. */
358 	cp = dbc->internal;
359 	cp->opd = NULL;
360 	cp->pdbc = NULL;
361 
362 	cp->indx = 0;
363 	cp->page = NULL;
364 	cp->pgno = PGNO_INVALID;
365 	cp->root = root;
366 	cp->stream_start_pgno = cp->stream_curr_pgno = PGNO_INVALID;
367 	cp->stream_off = 0;
368 
369 	if (DB_IS_PARTITIONED(dbp)) {
370 		DBC_PART_REFRESH(dbc);
371 	} else switch (dbtype) {
372 	case DB_BTREE:
373 	case DB_RECNO:
374 		if ((ret = __bamc_refresh(dbc)) != 0)
375 			goto err;
376 		break;
377 	case DB_HEAP:
378 		if ((ret = __heapc_refresh(dbc)) != 0)
379 			goto err;
380 		break;
381 	case DB_HASH:
382 	case DB_QUEUE:
383 		break;
384 	case DB_UNKNOWN:
385 	default:
386 		ret = __db_unknown_type(env, "DB->cursor", dbp->type);
387 		goto err;
388 	}
389 
390 	/*
391 	 * The transaction keeps track of how many cursors were opened within
392 	 * it to catch application errors where the cursor isn't closed when
393 	 * the transaction is resolved.
394 	 */
395 	if (txn != NULL)
396 		++txn->cursors;
397 	if (ip != NULL) {
398 		dbc->thread_info = ip;
399 #ifdef DIAGNOSTIC
400 		if (dbc->locker != NULL)
401 			ip->dbth_locker =
402 			    R_OFFSET(&(env->lk_handle->reginfo), dbc->locker);
403 		else
404 			ip->dbth_locker = INVALID_ROFF;
405 #endif
406 	} else if (txn != NULL)
407 		dbc->thread_info = txn->thread_info;
408 	else
409 		ENV_GET_THREAD_INFO(env, dbc->thread_info);
410 
411 	MUTEX_LOCK(env, dbp->mutex);
412 	TAILQ_INSERT_TAIL(&dbp->active_queue, dbc, links);
413 	F_SET(dbc, DBC_ACTIVE);
414 	MUTEX_UNLOCK(env, dbp->mutex);
415 
416 	*dbcp = dbc;
417 	return (0);
418 
419 err:	if (allocated)
420 		__os_free(env, dbc);
421 	return (ret);
422 }
423 
424 /*
425  * __db_put --
426  *	Store a key/data pair.
427  *
428  * PUBLIC: int __db_put __P((DB *,
429  * PUBLIC:      DB_THREAD_INFO *, DB_TXN *, DBT *, DBT *, u_int32_t));
430  */
431 int
__db_put(dbp,ip,txn,key,data,flags)432 __db_put(dbp, ip, txn, key, data, flags)
433 	DB *dbp;
434 	DB_THREAD_INFO *ip;
435 	DB_TXN *txn;
436 	DBT *key, *data;
437 	u_int32_t flags;
438 {
439 	DB_HEAP_RID rid;
440 	DBC *dbc;
441 	DBT tdata, tkey;
442 	ENV *env;
443 	void *bulk_kptr, *bulk_ptr;
444 	db_recno_t recno;
445 	u_int32_t cursor_flags;
446 	int ret, t_ret;
447 
448 	env = dbp->env;
449 
450 	/*
451 	 * See the comment in __db_get() regarding DB_CURSOR_TRANSIENT.
452 	 *
453 	 * Note that the get in the DB_NOOVERWRITE case is safe to do with this
454 	 * flag set;  if it errors in any way other than DB_NOTFOUND, we're
455 	 * going to close the cursor without doing anything else, and if it
456 	 * returns DB_NOTFOUND then it's safe to do a c_put(DB_KEYLAST) even if
457 	 * an access method moved the cursor, since that's not
458 	 * position-dependent.
459 	 */
460 	cursor_flags = DB_WRITELOCK;
461 	if (LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY))
462 		cursor_flags |= DB_CURSOR_BULK;
463 	else
464 		cursor_flags |= DB_CURSOR_TRANSIENT;
465 	if ((ret = __db_cursor(dbp, ip, txn, &dbc, cursor_flags)) != 0)
466 		return (ret);
467 
468 	DEBUG_LWRITE(dbc, txn, "DB->put", key, data, flags);
469 	PERFMON6(env, db, put, dbp->fname,
470 	    dbp->dname, txn == NULL ? 0 : txn->txnid, key, data, flags);
471 
472 	SET_RET_MEM(dbc, dbp);
473 
474 	if (flags == DB_APPEND && !DB_IS_PRIMARY(dbp)) {
475 		/*
476 		 * If there is an append callback, the value stored in
477 		 * data->data may be replaced and then freed.  To avoid
478 		 * passing a freed pointer back to the user, just operate
479 		 * on a copy of the data DBT.
480 		 */
481 		tdata = *data;
482 
483 		/*
484 		 * Append isn't a normal put operation;  call the appropriate
485 		 * access method's append function.
486 		 */
487 		switch (dbp->type) {
488 		case DB_HEAP:
489 			if ((ret = __heap_append(dbc, key, &tdata)) != 0)
490 				goto err;
491 			break;
492 		case DB_QUEUE:
493 			if ((ret = __qam_append(dbc, key, &tdata)) != 0)
494 				goto err;
495 			break;
496 		case DB_RECNO:
497 			if ((ret = __ram_append(dbc, key, &tdata)) != 0)
498 				goto err;
499 			break;
500 		case DB_BTREE:
501 		case DB_HASH:
502 		case DB_UNKNOWN:
503 		default:
504 			/* The interface should prevent this. */
505 			DB_ASSERT(env,
506 			    dbp->type == DB_QUEUE || dbp->type == DB_RECNO);
507 
508 			ret = __db_ferr(env, "DB->put", 0);
509 			goto err;
510 		}
511 
512 		/*
513 		 * The append callback, if one exists, may have allocated
514 		 * a new tdata.data buffer.  If so, free it.
515 		 */
516 		FREE_IF_NEEDED(env, &tdata);
517 
518 		/* No need for a cursor put;  we're done. */
519 #ifdef HAVE_COMPRESSION
520 	} else if (DB_IS_COMPRESSED(dbp) && !F_ISSET(dbp, DB_AM_SECONDARY) &&
521 	    !DB_IS_PRIMARY(dbp) && LIST_FIRST(&dbp->f_primaries) == NULL) {
522 		ret = __dbc_put(dbc, key, data, flags);
523 #endif
524 	} else if (LF_ISSET(DB_MULTIPLE)) {
525 		ret = 0;
526 		memset(&tkey, 0, sizeof(tkey));
527 		if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO) {
528 			tkey.data = &recno;
529 			tkey.size = sizeof(recno);
530 		}
531 		memset(&tdata, 0, sizeof(tdata));
532 		DB_MULTIPLE_INIT(bulk_kptr, key);
533 		DB_MULTIPLE_INIT(bulk_ptr, data);
534 		key->doff = 0;
535 		while (ret == 0) {
536 			if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO)
537 				DB_MULTIPLE_RECNO_NEXT(bulk_kptr, key,
538 				    recno, tdata.data, tdata.size);
539 			else
540 				DB_MULTIPLE_NEXT(bulk_kptr, key,
541 				    tkey.data, tkey.size);
542 			DB_MULTIPLE_NEXT(bulk_ptr, data,
543 			    tdata.data, tdata.size);
544 			if (bulk_kptr == NULL || bulk_ptr == NULL)
545 				break;
546 			if (dbp->type == DB_HEAP) {
547 				memcpy(&rid, tkey.data, sizeof(DB_HEAP_RID));
548 				tkey.data = &rid;
549 			}
550 			ret = __dbc_put(dbc, &tkey, &tdata,
551 			    LF_ISSET(DB_OPFLAGS_MASK));
552 			if (ret == 0)
553 				++key->doff;
554 		}
555 	} else if (LF_ISSET(DB_MULTIPLE_KEY)) {
556 		ret = 0;
557 		memset(&tkey, 0, sizeof(tkey));
558 		if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO) {
559 			tkey.data = &recno;
560 			tkey.size = sizeof(recno);
561 		}
562 		memset(&tdata, 0, sizeof(tdata));
563 		DB_MULTIPLE_INIT(bulk_ptr, key);
564 		while (ret == 0) {
565 			if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO)
566 				DB_MULTIPLE_RECNO_NEXT(bulk_ptr, key, recno,
567 				    tdata.data, tdata.size);
568 			else
569 				DB_MULTIPLE_KEY_NEXT(bulk_ptr, key, tkey.data,
570 				    tkey.size, tdata.data, tdata.size);
571 			if (bulk_ptr == NULL)
572 				break;
573 			if (dbp->type == DB_HEAP) {
574 				memcpy(&rid, tkey.data, sizeof(DB_HEAP_RID));
575 				tkey.data = &rid;
576 			}
577 			ret = __dbc_put(dbc, &tkey, &tdata,
578 			    LF_ISSET(DB_OPFLAGS_MASK));
579 			if (ret == 0)
580 				++key->doff;
581 		}
582 	} else
583 		ret = __dbc_put(dbc, key, data, flags);
584 
585 err:	/* Close the cursor. */
586 	if (!DB_RETOK_DBPUT(ret))
587 		F_SET(dbc, DBC_ERROR);
588 	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
589 		ret = t_ret;
590 
591 	return (ret);
592 }
593 
594 /*
595  * __db_del --
596  *	Delete the items referenced by a key.
597  *
598  * PUBLIC: int __db_del __P((DB *,
599  * PUBLIC:      DB_THREAD_INFO *, DB_TXN *, DBT *, u_int32_t));
600  */
601 int
__db_del(dbp,ip,txn,key,flags)602 __db_del(dbp, ip, txn, key, flags)
603 	DB *dbp;
604 	DB_THREAD_INFO *ip;
605 	DB_TXN *txn;
606 	DBT *key;
607 	u_int32_t flags;
608 {
609 	DB_HEAP_RID rid;
610 	DBC *dbc;
611 	DBT data, tkey;
612 	void *bulk_ptr;
613 	db_recno_t recno;
614 	u_int32_t cursor_flags, f_init, f_next;
615 	int ret, t_ret;
616 
617 	COMPQUIET(bulk_ptr, NULL);
618 	/* Allocate a cursor. */
619 	cursor_flags = DB_WRITELOCK;
620 	if (LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY))
621 		cursor_flags |= DB_CURSOR_BULK;
622 	if ((ret = __db_cursor(dbp, ip, txn, &dbc, cursor_flags)) != 0)
623 		return (ret);
624 
625 	DEBUG_LWRITE(dbc, txn, "DB->del", key, NULL, flags);
626 	PERFMON5(env, db, del,
627 	    dbp->fname, dbp->dname, txn == NULL ? 0 : txn->txnid, key, flags);
628 
629 #ifdef HAVE_COMPRESSION
630 	if (DB_IS_COMPRESSED(dbp) && !F_ISSET(dbp, DB_AM_SECONDARY) &&
631 	    !DB_IS_PRIMARY(dbp) && LIST_FIRST(&dbp->f_primaries) == NULL) {
632 		F_SET(dbc, DBC_TRANSIENT);
633 		ret = __dbc_bulk_del(dbc, key, flags);
634 		goto err;
635 	}
636 #endif
637 
638 	/*
639 	 * Walk a cursor through the key/data pairs, deleting as we go.  Set
640 	 * the DB_DBT_USERMEM flag, as this might be a threaded application
641 	 * and the flags checking will catch us.  We don't actually want the
642 	 * keys or data, set DB_DBT_ISSET.  We rely on __dbc_get to clear
643 	 * this.
644 	 */
645 	memset(&data, 0, sizeof(data));
646 	F_SET(&data, DB_DBT_USERMEM);
647 	tkey = *key;
648 
649 	f_init = LF_ISSET(DB_MULTIPLE_KEY) ? DB_GET_BOTH : DB_SET;
650 	f_next = DB_NEXT_DUP;
651 
652 	/*
653 	 * If locking (and we haven't already acquired CDB locks), set the
654 	 * read-modify-write flag.
655 	 */
656 	if (STD_LOCKING(dbc)) {
657 		f_init |= DB_RMW;
658 		f_next |= DB_RMW;
659 	}
660 
661 	if (LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY)) {
662 		if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO) {
663 			memset(&tkey, 0, sizeof(tkey));
664 			tkey.data = &recno;
665 			tkey.size = sizeof(recno);
666 		}
667 		DB_MULTIPLE_INIT(bulk_ptr, key);
668 		/* We return the number of keys deleted in doff. */
669 		key->doff = 0;
670 bulk_next:	if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO)
671 			DB_MULTIPLE_RECNO_NEXT(bulk_ptr, key,
672 			    recno, data.data, data.size);
673 		else if (LF_ISSET(DB_MULTIPLE))
674 			DB_MULTIPLE_NEXT(bulk_ptr, key, tkey.data, tkey.size);
675 		else
676 			DB_MULTIPLE_KEY_NEXT(bulk_ptr, key,
677 			    tkey.data, tkey.size, data.data, data.size);
678 		if (bulk_ptr == NULL)
679 			goto err;
680 		if (dbp->type == DB_HEAP) {
681 			memcpy(&rid, tkey.data, sizeof(DB_HEAP_RID));
682 			tkey.data = &rid;
683 		}
684 
685 	}
686 
687 	/* We're not interested in the data -- do not return it. */
688 	F_SET(&tkey, DB_DBT_ISSET);
689 	F_SET(&data, DB_DBT_ISSET);
690 
691 	/*
692 	 * Optimize the simple cases.  For all AMs if we don't have secondaries
693 	 * and are not a secondary and we aren't a foreign database and there
694 	 * are no dups then we can avoid a bunch of overhead.  For queue we
695 	 * don't need to fetch the record since we delete by direct calculation
696 	 * from the record number.
697 	 *
698 	 * Hash permits an optimization in DB->del: since on-page duplicates are
699 	 * stored in a single HKEYDATA structure, it's possible to delete an
700 	 * entire set of them at once, and as the HKEYDATA has to be rebuilt
701 	 * and re-put each time it changes, this is much faster than deleting
702 	 * the duplicates one by one.  Thus, if not pointing at an off-page
703 	 * duplicate set, and we're not using secondary indices (in which case
704 	 * we'd have to examine the items one by one anyway), let hash do this
705 	 * "quick delete".
706 	 *
707 	 * !!!
708 	 * Note that this is the only application-executed delete call in
709 	 * Berkeley DB that does not go through the __dbc_del function.
710 	 * If anything other than the delete itself (like a secondary index
711 	 * update) has to happen there in a particular situation, the
712 	 * conditions here should be modified not to use these optimizations.
713 	 * The ordinary AM-independent alternative will work just fine;
714 	 * it'll just be slower.
715 	 */
716 	if (!F_ISSET(dbp, DB_AM_SECONDARY) && !DB_IS_PRIMARY(dbp) &&
717 	    LIST_FIRST(&dbp->f_primaries) == NULL) {
718 #ifdef HAVE_QUEUE
719 		if (dbp->type == DB_QUEUE) {
720 			ret = __qam_delete(dbc, &tkey, flags);
721 			goto next;
722 		}
723 #endif
724 
725 		/* Fetch the first record. */
726 		if ((ret = __dbc_get(dbc, &tkey, &data, f_init)) != 0)
727 			goto err;
728 
729 #ifdef HAVE_HASH
730 		/*
731 		 * Hash "quick delete" removes all on-page duplicates.  We
732 		 * can't do that if deleting specific key/data pairs.
733 		 */
734 		if (dbp->type == DB_HASH && !LF_ISSET(DB_MULTIPLE_KEY)) {
735 			DBC *sdbc;
736 			sdbc = dbc;
737 #ifdef HAVE_PARTITION
738 			if (F_ISSET(dbc, DBC_PARTITIONED))
739 				sdbc =
740 				    ((PART_CURSOR*)dbc->internal)->sub_cursor;
741 #endif
742 			if (sdbc->internal->opd == NULL) {
743 				ret = __ham_quick_delete(sdbc);
744 				goto next;
745 			}
746 		}
747 #endif
748 
749 		if (!F_ISSET(dbp, DB_AM_DUP)) {
750 			ret = dbc->am_del(dbc, 0);
751 			goto next;
752 		}
753 	} else if ((ret = __dbc_get(dbc, &tkey, &data, f_init)) != 0)
754 		goto err;
755 
756 	/* Walk through the set of key/data pairs, deleting as we go. */
757 	for (;;) {
758 		if ((ret = __dbc_del(dbc, flags)) != 0)
759 			break;
760 		/*
761 		 * With DB_MULTIPLE_KEY, the application has specified the
762 		 * exact records they want deleted.  We don't need to walk
763 		 * through a set of duplicates.
764 		 */
765 		if (LF_ISSET(DB_MULTIPLE_KEY))
766 			break;
767 
768 		F_SET(&tkey, DB_DBT_ISSET);
769 		F_SET(&data, DB_DBT_ISSET);
770 		if ((ret = __dbc_get(dbc, &tkey, &data, f_next)) != 0) {
771 			if (ret == DB_NOTFOUND)
772 				ret = 0;
773 			break;
774 		}
775 	}
776 
777 next:	if (ret == 0 && LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY)) {
778 		++key->doff;
779 		goto bulk_next;
780 	}
781 err:	/* Discard the cursor. */
782 	if (!DB_RETOK_DBDEL(ret))
783 		F_SET(dbc, DBC_ERROR);
784 	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
785 		ret = t_ret;
786 
787 	return (ret);
788 }
789 
790 /*
791  * __db_sync --
792  *	Flush the database cache.
793  *
794  * PUBLIC: int __db_sync __P((DB *));
795  */
796 int
__db_sync(dbp)797 __db_sync(dbp)
798 	DB *dbp;
799 {
800 	int ret, t_ret;
801 
802 	ret = 0;
803 
804 	/* If the database was read-only, we're done. */
805 	if (F_ISSET(dbp, DB_AM_RDONLY))
806 		return (0);
807 
808 	/* If it's a Recno tree, write the backing source text file. */
809 	if (dbp->type == DB_RECNO)
810 		ret = __ram_writeback(dbp);
811 
812 	/* If the database was never backed by a database file, we're done. */
813 	if (F_ISSET(dbp, DB_AM_INMEM))
814 		return (ret);
815 #ifdef HAVE_PARTITION
816 	if (DB_IS_PARTITIONED(dbp))
817 		ret = __partition_sync(dbp);
818 	else
819 #endif
820 	if (dbp->type == DB_QUEUE)
821 		ret = __qam_sync(dbp);
822 	else
823 		/* Flush any dirty pages from the cache to the backing file. */
824 		if ((t_ret = __memp_fsync(dbp->mpf)) != 0 && ret == 0)
825 			ret = t_ret;
826 
827 	return (ret);
828 }
829 
830 /*
831  * __db_associate --
832  *	Associate another database as a secondary index to this one.
833  *
834  * PUBLIC: int __db_associate __P((DB *, DB_THREAD_INFO *, DB_TXN *, DB *,
835  * PUBLIC:     int (*)(DB *, const DBT *, const DBT *, DBT *), u_int32_t));
836  */
837 int
__db_associate(dbp,ip,txn,sdbp,callback,flags)838 __db_associate(dbp, ip, txn, sdbp, callback, flags)
839 	DB *dbp, *sdbp;
840 	DB_THREAD_INFO *ip;
841 	DB_TXN *txn;
842 	int (*callback) __P((DB *, const DBT *, const DBT *, DBT *));
843 	u_int32_t flags;
844 {
845 	DBC *pdbc, *sdbc;
846 	DBT key, data, skey, *tskeyp;
847 	ENV *env;
848 	int build, ret, t_ret;
849 	u_int32_t nskey;
850 
851 	env = dbp->env;
852 	pdbc = sdbc = NULL;
853 	ret = 0;
854 
855 	memset(&skey, 0, sizeof(DBT));
856 	nskey = 0;
857 	tskeyp = NULL;
858 
859 	/*
860 	 * Check to see if the secondary is empty -- and thus if we should
861 	 * build it -- before we link it in and risk making it show up in other
862 	 * threads.  Do this first so that the databases remain unassociated on
863 	 * error.
864 	 */
865 	build = 0;
866 	if (LF_ISSET(DB_CREATE)) {
867 		FLD_SET(sdbp->s_assoc_flags, DB_ASSOC_CREATE);
868 
869 		if ((ret = __db_cursor(sdbp, ip, txn, &sdbc, 0)) != 0)
870 			goto err;
871 
872 		/*
873 		 * We don't care about key or data;  we're just doing
874 		 * an existence check.
875 		 */
876 		memset(&key, 0, sizeof(DBT));
877 		memset(&data, 0, sizeof(DBT));
878 		F_SET(&key, DB_DBT_PARTIAL | DB_DBT_USERMEM);
879 		F_SET(&data, DB_DBT_PARTIAL | DB_DBT_USERMEM);
880 		if ((ret = __dbc_get(sdbc, &key, &data,
881 		    (STD_LOCKING(sdbc) ? DB_RMW : 0) |
882 		    DB_FIRST)) == DB_NOTFOUND) {
883 			build = 1;
884 			ret = 0;
885 		}
886 
887 		if (ret != 0)
888 			F_SET(sdbc, DBC_ERROR);
889 		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
890 			ret = t_ret;
891 
892 		/* Reset for later error check. */
893 		sdbc = NULL;
894 
895 		if (ret != 0)
896 			goto err;
897 	}
898 
899 	/*
900 	 * Set up the database handle as a secondary.
901 	 */
902 	sdbp->s_callback = callback;
903 	sdbp->s_primary = dbp;
904 
905 	sdbp->stored_get = sdbp->get;
906 	sdbp->get = __db_secondary_get;
907 
908 	sdbp->stored_close = sdbp->close;
909 	sdbp->close = __db_secondary_close_pp;
910 
911 	F_SET(sdbp, DB_AM_SECONDARY);
912 
913 	if (LF_ISSET(DB_IMMUTABLE_KEY))
914 		FLD_SET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY);
915 
916 	/*
917 	 * Add the secondary to the list on the primary.  Do it here
918 	 * so that we see any updates that occur while we're walking
919 	 * the primary.
920 	 */
921 	MUTEX_LOCK(env, dbp->mutex);
922 
923 	/* See __db_s_next for an explanation of secondary refcounting. */
924 	DB_ASSERT(env, sdbp->s_refcnt == 0);
925 	sdbp->s_refcnt = 1;
926 	LIST_INSERT_HEAD(&dbp->s_secondaries, sdbp, s_links);
927 	MUTEX_UNLOCK(env, dbp->mutex);
928 
929 	if (build) {
930 		/*
931 		 * We loop through the primary, putting each item we
932 		 * find into the new secondary.
933 		 *
934 		 * If we're using CDB, opening these two cursors puts us
935 		 * in a bit of a locking tangle:  CDB locks are done on the
936 		 * primary, so that we stay deadlock-free, but that means
937 		 * that updating the secondary while we have a read cursor
938 		 * open on the primary will self-block.  To get around this,
939 		 * we force the primary cursor to use the same locker ID
940 		 * as the secondary, so they won't conflict.  This should
941 		 * be harmless even if we're not using CDB.
942 		 */
943 		if ((ret = __db_cursor(sdbp, ip, txn, &sdbc,
944 		    CDB_LOCKING(sdbp->env) ? DB_WRITECURSOR : 0)) != 0)
945 			goto err;
946 		if ((ret = __db_cursor_int(dbp, ip,
947 		    txn, dbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0)
948 			goto err;
949 
950 		/* Lock out other threads, now that we have a locker. */
951 		dbp->associate_locker = sdbc->locker;
952 
953 		memset(&key, 0, sizeof(DBT));
954 		memset(&data, 0, sizeof(DBT));
955 		while ((ret = __dbc_get(pdbc, &key, &data, DB_NEXT)) == 0) {
956 			if ((ret = callback(sdbp, &key, &data, &skey)) != 0) {
957 				if (ret == DB_DONOTINDEX)
958 					continue;
959 				goto err;
960 			}
961 			if (F_ISSET(&skey, DB_DBT_MULTIPLE)) {
962 #ifdef DIAGNOSTIC
963 				__db_check_skeyset(sdbp, &skey);
964 #endif
965 				nskey = skey.size;
966 				tskeyp = (DBT *)skey.data;
967 			} else {
968 				nskey = 1;
969 				tskeyp = &skey;
970 			}
971 			SWAP_IF_NEEDED(sdbp, &key);
972 			for (; nskey > 0; nskey--, tskeyp++) {
973 				if ((ret = __dbc_put(sdbc,
974 				    tskeyp, &key, DB_UPDATE_SECONDARY)) != 0)
975 					goto err;
976 				FREE_IF_NEEDED(env, tskeyp);
977 			}
978 			SWAP_IF_NEEDED(sdbp, &key);
979 			FREE_IF_NEEDED(env, &skey);
980 		}
981 		if (ret == DB_NOTFOUND)
982 			ret = 0;
983 	}
984 
985 err:	if (sdbc != NULL && (t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
986 		ret = t_ret;
987 
988 	if (pdbc != NULL && (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
989 		ret = t_ret;
990 
991 	dbp->associate_locker = NULL;
992 
993 	for (; nskey > 0; nskey--, tskeyp++)
994 		FREE_IF_NEEDED(env, tskeyp);
995 	FREE_IF_NEEDED(env, &skey);
996 
997 	return (ret);
998 }
999 
1000 /*
1001  * __db_secondary_get --
1002  *	This wrapper function for DB->pget() is the DB->get() function
1003  *	on a database which has been made into a secondary index.
1004  */
1005 static int
__db_secondary_get(sdbp,txn,skey,data,flags)1006 __db_secondary_get(sdbp, txn, skey, data, flags)
1007 	DB *sdbp;
1008 	DB_TXN *txn;
1009 	DBT *skey, *data;
1010 	u_int32_t flags;
1011 {
1012 	DB_ASSERT(sdbp->env, F_ISSET(sdbp, DB_AM_SECONDARY));
1013 	return (__db_pget_pp(sdbp, txn, skey, NULL, data, flags));
1014 }
1015 
1016 /*
1017  * __db_secondary_close --
1018  *	Wrapper function for DB->close() which we use on secondaries to
1019  *	manage refcounting and make sure we don't close them underneath
1020  *	a primary that is updating.
1021  *
1022  * PUBLIC: int __db_secondary_close __P((DB *, u_int32_t));
1023  */
1024 int
__db_secondary_close(sdbp,flags)1025 __db_secondary_close(sdbp, flags)
1026 	DB *sdbp;
1027 	u_int32_t flags;
1028 {
1029 	DB *primary;
1030 	ENV *env;
1031 	int doclose;
1032 
1033 	/*
1034 	 * If the opening transaction is rolled back then the db handle
1035 	 * will have already been refreshed, we just need to call
1036 	 * __db_close to free the data.
1037 	 */
1038 	if (!F_ISSET(sdbp, DB_AM_OPEN_CALLED)) {
1039 		doclose = 1;
1040 		goto done;
1041 	}
1042 	doclose = 0;
1043 	primary = sdbp->s_primary;
1044 	env = primary->env;
1045 
1046 	MUTEX_LOCK(env, primary->mutex);
1047 	/*
1048 	 * Check the refcount--if it was at 1 when we were called, no
1049 	 * thread is currently updating this secondary through the primary,
1050 	 * so it's safe to close it for real.
1051 	 *
1052 	 * If it's not safe to do the close now, we do nothing;  the
1053 	 * database will actually be closed when the refcount is decremented,
1054 	 * which can happen in either __db_s_next or __db_s_done.
1055 	 */
1056 	DB_ASSERT(env, sdbp->s_refcnt != 0);
1057 	if (--sdbp->s_refcnt == 0) {
1058 		LIST_REMOVE(sdbp, s_links);
1059 		/* We don't want to call close while the mutex is held. */
1060 		doclose = 1;
1061 	}
1062 	MUTEX_UNLOCK(env, primary->mutex);
1063 
1064 	/*
1065 	 * sdbp->close is this function;  call the real one explicitly if
1066 	 * need be.
1067 	 */
1068 done:	return (doclose ? __db_close(sdbp, NULL, flags) : 0);
1069 }
1070 
1071 /*
1072  * __db_associate_foreign --
1073  *	Associate this database (fdbp) as a foreign constraint to another
1074  *	database (pdbp).  That is, dbp's keys appear as foreign key values in
1075  *	pdbp.
1076  *
1077  * PUBLIC: int __db_associate_foreign __P((DB *, DB *,
1078  * PUBLIC:     int (*)(DB *, const DBT *, DBT *, const DBT *, int *),
1079  * PUBLIC:     u_int32_t));
1080  */
1081 int
__db_associate_foreign(fdbp,pdbp,callback,flags)1082 __db_associate_foreign(fdbp, pdbp, callback, flags)
1083 	DB *fdbp, *pdbp;
1084 	int (*callback)(DB *, const DBT *, DBT *, const DBT *, int *);
1085 	u_int32_t flags;
1086 {
1087 	DB_FOREIGN_INFO *f_info;
1088 	ENV *env;
1089 	int ret;
1090 
1091 	env = fdbp->env;
1092 	ret = 0;
1093 
1094 	if ((ret = __os_malloc(env, sizeof(DB_FOREIGN_INFO), &f_info)) != 0) {
1095 		return (ret);
1096 	}
1097 	memset(f_info, 0, sizeof(DB_FOREIGN_INFO));
1098 
1099 	f_info->dbp = pdbp;
1100 	f_info->callback = callback;
1101 
1102 	/*
1103 	 * It might be wise to filter this, but for now the flags only
1104 	 * set the delete action type.
1105 	 */
1106 	FLD_SET(f_info->flags, flags);
1107 
1108 	/*
1109 	 * Add f_info to the foreign database's list of primaries.  That is to
1110 	 * say, fdbp->f_primaries lists all databases for which fdbp is a
1111 	 * foreign constraint.
1112 	 */
1113 	MUTEX_LOCK(env, fdbp->mutex);
1114 	LIST_INSERT_HEAD(&fdbp->f_primaries, f_info, f_links);
1115 	MUTEX_UNLOCK(env, fdbp->mutex);
1116 
1117 	/*
1118 	* Associate fdbp as pdbp's foreign db, for referential integrity
1119 	* checks.  We don't allow the foreign db to be changed, because we
1120 	* currently have no way of removing pdbp from the old foreign db's list
1121 	* of primaries.
1122 	*/
1123 	if (pdbp->s_foreign != NULL)
1124 		return (EINVAL);
1125 	pdbp->s_foreign = fdbp;
1126 
1127 	return (ret);
1128 }
1129 
1130 static int
__dbc_set_priority(dbc,priority)1131 __dbc_set_priority(dbc, priority)
1132 	DBC *dbc;
1133 	DB_CACHE_PRIORITY priority;
1134 {
1135 	dbc->priority = priority;
1136 	return (0);
1137 }
1138 
1139 static int
__dbc_get_priority(dbc,priority)1140 __dbc_get_priority(dbc, priority)
1141 	DBC *dbc;
1142 	DB_CACHE_PRIORITY *priority;
1143 {
1144 	if (dbc->priority == DB_PRIORITY_UNCHANGED)
1145 		return (__memp_get_priority(dbc->dbp->mpf, priority));
1146 	else
1147 		*priority = dbc->priority;
1148 
1149 	return (0);
1150 }
1151