1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1998, 2013 Oracle and/or its affiliates. All rights reserved.
5 *
6 * $Id$
7 */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/hash.h"
15 #include "dbinc/heap.h"
16 #include "dbinc/lock.h"
17 #include "dbinc/mp.h"
18 #include "dbinc/partition.h"
19 #include "dbinc/qam.h"
20 #include "dbinc/txn.h"
21
22 static int __db_secondary_get __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
23 static int __dbc_set_priority __P((DBC *, DB_CACHE_PRIORITY));
24 static int __dbc_get_priority __P((DBC *, DB_CACHE_PRIORITY* ));
25
26 /*
27 * __db_cursor_int --
28 * Internal routine to create a cursor.
29 *
30 * PUBLIC: int __db_cursor_int __P((DB *, DB_THREAD_INFO *,
31 * PUBLIC: DB_TXN *, DBTYPE, db_pgno_t, int, DB_LOCKER *, DBC **));
32 */
int
__db_cursor_int(dbp, ip, txn, dbtype, root, flags, locker, dbcp)
	DB *dbp;
	DB_THREAD_INFO *ip;
	DB_TXN *txn;
	DBTYPE dbtype;
	db_pgno_t root;
	int flags;
	DB_LOCKER *locker;
	DBC **dbcp;
{
	DBC *dbc;
	DBC_INTERNAL *cp;
	DB_LOCKREQ req;
	ENV *env;
	db_threadid_t tid;
	int allocated, envlid, ret;
	pid_t pid;

	env = dbp->env;
	allocated = envlid = 0;

	/*
	 * If dbcp is non-NULL it is assumed to point to an area to initialize
	 * as a cursor.
	 *
	 * Take one from the free list if it's available.  Take only the
	 * right type.  With off page dups we may have different kinds
	 * of cursors on the queue for a single database.
	 */
	MUTEX_LOCK(env, dbp->mutex);

#ifndef HAVE_NO_DB_REFCOUNT
	/*
	 * If this DBP is being logged then refcount the log filename
	 * relative to this transaction. We do this here because we have
	 * the dbp->mutex which protects the refcount. We want to avoid
	 * calling the function if the transaction handle has a shared parent
	 * locker or we are duplicating a cursor. This includes the case of
	 * creating an off page duplicate cursor.
	 * If we knew this cursor will not be used in an update, we could avoid
	 * this, but we don't have that information.
	 */
	if (IS_REAL_TXN(txn) &&
	    !LF_ISSET(DBC_OPD | DBC_DUPLICATE) &&
	    !F_ISSET(dbp, DB_AM_RECOVER) &&
	    dbp->log_filename != NULL && !IS_REP_CLIENT(env) &&
	    (ret = __txn_record_fname(env, txn, dbp->log_filename)) != 0) {
		MUTEX_UNLOCK(env, dbp->mutex);
		return (ret);
	}

#endif

	TAILQ_FOREACH(dbc, &dbp->free_queue, links)
		if (dbtype == dbc->dbtype) {
			TAILQ_REMOVE(&dbp->free_queue, dbc, links);
			/*
			 * Clear every flag on the recycled cursor except
			 * DBC_OWN_LID, so we remember whether this cursor
			 * still owns a locker ID it must eventually free.
			 */
			F_CLR(dbc, ~DBC_OWN_LID);
			break;
		}
	MUTEX_UNLOCK(env, dbp->mutex);

	if (dbc == NULL) {
		if ((ret = __os_calloc(env, 1, sizeof(DBC), &dbc)) != 0)
			return (ret);
		allocated = 1;
		dbc->flags = 0;

		dbc->dbp = dbp;
		dbc->dbenv = dbp->dbenv;
		dbc->env = dbp->env;

		/* Set up locking information. */
		if (LOCKING_ON(env)) {
			/*
			 * If we are not threaded, we share a locker ID among
			 * all cursors opened in the environment handle,
			 * allocating one if this is the first cursor.
			 *
			 * This relies on the fact that non-threaded DB handles
			 * always have non-threaded environment handles, since
			 * we set DB_THREAD on DB handles created with threaded
			 * environment handles.
			 */
			if (!DB_IS_THREADED(dbp)) {
				if (env->env_lref == NULL) {
					if ((ret = __lock_id(env,
					    NULL, &env->env_lref)) != 0)
						goto err;
					/*
					 * Remember that we allocated the
					 * env-wide locker on this call; the
					 * family-transaction code below
					 * needs to know.
					 */
					envlid = 1;
				}
				dbc->lref = env->env_lref;
			}

			/*
			 * In CDB, secondary indices should share a lock file
			 * ID with the primary; otherwise we're susceptible
			 * to deadlocks.  We also use __db_cursor_int rather
			 * than __db_cursor to create secondary update cursors
			 * in c_put and c_del; these won't acquire a new lock.
			 *
			 * !!!
			 * Since this is in the one-time cursor allocation
			 * code, we need to be sure to destroy, not just
			 * close, all cursors in the secondary when we
			 * associate.
			 */
			if (CDB_LOCKING(env) &&
			    F_ISSET(dbp, DB_AM_SECONDARY))
				memcpy(dbc->lock.fileid,
				    dbp->s_primary->fileid, DB_FILE_ID_LEN);
			else
				memcpy(dbc->lock.fileid,
				    dbp->fileid, DB_FILE_ID_LEN);

			if (CDB_LOCKING(env)) {
				if (F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) {
					/*
					 * If we are doing a single lock per
					 * environment, set up the global
					 * lock object just like we do to
					 * single thread creates.
					 */
					DB_ASSERT(env, sizeof(db_pgno_t) ==
					    sizeof(u_int32_t));
					dbc->lock_dbt.size = sizeof(u_int32_t);
					dbc->lock_dbt.data = &dbc->lock.pgno;
					dbc->lock.pgno = 0;
				} else {
					dbc->lock_dbt.size = DB_FILE_ID_LEN;
					dbc->lock_dbt.data = dbc->lock.fileid;
				}
			} else {
				/* Standard locking: lock by page. */
				dbc->lock.type = DB_PAGE_LOCK;
				dbc->lock_dbt.size = sizeof(dbc->lock);
				dbc->lock_dbt.data = &dbc->lock;
			}
		}
		/* Init the DBC internal structure. */
#ifdef HAVE_PARTITION
		if (DB_IS_PARTITIONED(dbp)) {
			if ((ret = __partc_init(dbc)) != 0)
				goto err;
		} else
#endif
		switch (dbtype) {
		case DB_BTREE:
		case DB_RECNO:
			if ((ret = __bamc_init(dbc, dbtype)) != 0)
				goto err;
			break;
		case DB_HASH:
			if ((ret = __hamc_init(dbc)) != 0)
				goto err;
			break;
		case DB_HEAP:
			if ((ret = __heapc_init(dbc)) != 0)
				goto err;
			break;
		case DB_QUEUE:
			if ((ret = __qamc_init(dbc)) != 0)
				goto err;
			break;
		case DB_UNKNOWN:
		default:
			ret = __db_unknown_type(env, "DB->cursor", dbtype);
			goto err;
		}

		cp = dbc->internal;
	}

	/* Refresh the DBC structure. */
	dbc->dbtype = dbtype;
	RESET_RET_MEM(dbc);
	dbc->set_priority = __dbc_set_priority;
	dbc->get_priority = __dbc_get_priority;
	dbc->priority = dbp->priority;
	dbc->txn_cursors.tqe_next = NULL;
	dbc->txn_cursors.tqe_prev = NULL;

	/*
	 * If the DB handle is not threaded, there is one locker ID for the
	 * whole environment.  There should only one family transaction active
	 * as well.  This doesn't apply to CDS group transactions, where the
	 * cursor can simply use the transaction's locker directly.
	 */
	if (!CDB_LOCKING(env) && txn != NULL && F_ISSET(txn, TXN_FAMILY) &&
	    (F_ISSET(dbc, DBC_OWN_LID) || dbc->lref == NULL || envlid)) {
		if (LOCKING_ON(env)) {
			if (dbc->lref == NULL) {
				if ((ret =
				    __lock_id(env, NULL, &dbc->lref)) != 0)
					goto err;
				F_SET(dbc, DBC_OWN_LID);
			}
			/* Attach the cursor's locker to the family txn. */
			if ((ret = __lock_addfamilylocker(env,
			    txn->txnid, dbc->lref->id, 1)) != 0)
				goto err;
		}
		F_SET(dbc, DBC_FAMILY);
		/*
		 * NULL the txn so the cursor does not also join the
		 * transaction's cursor list below.
		 */
		txn = NULL;
	}

	if ((dbc->txn = txn) != NULL)
		dbc->locker = txn->locker;
	else if (LOCKING_ON(env)) {
		/*
		 * There are certain cases in which we want to create a
		 * new cursor with a particular locker ID that is known
		 * to be the same as (and thus not conflict with) an
		 * open cursor.
		 *
		 * The most obvious case is cursor duplication; when we
		 * call DBC->dup or __dbc_idup, we want to use the original
		 * cursor's locker ID.
		 *
		 * Another case is when updating secondary indices.  Standard
		 * CDB locking would mean that we might block ourself: we need
		 * to open an update cursor in the secondary while an update
		 * cursor in the primary is open, and when the secondary and
		 * primary are subdatabases or we're using env-wide locking,
		 * this is disastrous.
		 *
		 * In these cases, our caller will pass a nonzero locker
		 * ID into this function.  Use this locker ID instead of
		 * the default as the locker ID for our new cursor.
		 */
		if (locker != NULL)
			dbc->locker = locker;
		else if (LF_ISSET(DB_RECOVER))
			dbc->locker = NULL;
		else {
			if (dbc->lref == NULL) {
				if ((ret =
				    __lock_id(env, NULL, &dbc->lref)) != 0)
					goto err;
				F_SET(dbc, DBC_OWN_LID);
			}
			/*
			 * If we are threaded then we need to set the
			 * proper thread id into the locker.
			 */
			if (DB_IS_THREADED(dbp)) {
				env->dbenv->thread_id(env->dbenv, &pid, &tid);
				__lock_set_thread_id(dbc->lref, pid, tid);
			}
			dbc->locker = dbc->lref;
		}
	}

	/*
	 * These fields change when we are used as a secondary index, so
	 * if the DB is a secondary, make sure they're set properly just
	 * in case we opened some cursors before we were associated.
	 *
	 * __dbc_get is used by all access methods, so this should be safe.
	 */
	if (F_ISSET(dbp, DB_AM_SECONDARY))
		dbc->get = dbc->c_get = __dbc_secondary_get_pp;

	/*
	 * Don't enable bulk for btrees with record numbering, since avoiding
	 * a full search avoids taking write locks necessary to maintain
	 * consistent numbering.
	 */
	if (LF_ISSET(DB_CURSOR_BULK) && dbtype == DB_BTREE &&
	    !F_ISSET(dbp, DB_AM_RECNUM))
		F_SET(dbc, DBC_BULK);
	if (LF_ISSET(DB_CURSOR_TRANSIENT))
		F_SET(dbc, DBC_TRANSIENT);
	if (LF_ISSET(DBC_OPD))
		F_SET(dbc, DBC_OPD);
	if (F_ISSET(dbp, DB_AM_RECOVER) || LF_ISSET(DB_RECOVER))
		F_SET(dbc, DBC_RECOVER);
	if (F_ISSET(dbp, DB_AM_COMPENSATE))
		F_SET(dbc, DBC_DONTLOCK);
	/*
	 * If this database is exclusive then the cursor
	 * does not need to get locks.
	 */
	if (F2_ISSET(dbp, DB2_AM_EXCL)) {
		F_SET(dbc, DBC_DONTLOCK);
		if (IS_REAL_TXN(txn)&& !LF_ISSET(DBC_OPD | DBC_DUPLICATE)) {
			/*
			 * Exclusive databases can only have one active
			 * transaction at a time since there are no internal
			 * locks to prevent one transaction from reading and
			 * writing another's uncommitted changes.
			 */
			if (dbp->cur_txn != NULL && dbp->cur_txn != txn) {
				__db_errx(env, DB_STR("0749",
				    "Exclusive database handles can only have one active transaction at a time."));
				ret = EINVAL;
				goto err;
			}
			/* Do not trade a second time. */
			if (dbp->cur_txn != txn) {
				/* Trade the handle lock to the txn locker. */
				memset(&req, 0, sizeof(req));
				req.lock = dbp->handle_lock;
				req.op = DB_LOCK_TRADE;
				if ((ret = __lock_vec(env, txn->locker, 0,
				    &req, 1, 0)) != 0)
					goto err;
				dbp->cur_txn = txn;
				dbp->cur_locker = txn->locker;
				if ((ret = __txn_lockevent(env, txn, dbp,
				    &dbp->handle_lock, dbp->locker)) != 0)
					goto err;
			}
		}
	}
#ifdef HAVE_REPLICATION
	/*
	 * If we are replicating from a down rev version then we must
	 * use old locking protocols.
	 */
	if (LOGGING_ON(env) &&
	    ((LOG *)env->lg_handle->
	    reginfo.primary)->persist.version < DB_LOGVERSION_LATCHING)
		F_SET(dbc, DBC_DOWNREV);
#endif

	/* Refresh the DBC internal structure. */
	cp = dbc->internal;
	cp->opd = NULL;
	cp->pdbc = NULL;

	cp->indx = 0;
	cp->page = NULL;
	cp->pgno = PGNO_INVALID;
	cp->root = root;
	cp->stream_start_pgno = cp->stream_curr_pgno = PGNO_INVALID;
	cp->stream_off = 0;

	if (DB_IS_PARTITIONED(dbp)) {
		DBC_PART_REFRESH(dbc);
	} else switch (dbtype) {
	case DB_BTREE:
	case DB_RECNO:
		if ((ret = __bamc_refresh(dbc)) != 0)
			goto err;
		break;
	case DB_HEAP:
		if ((ret = __heapc_refresh(dbc)) != 0)
			goto err;
		break;
	case DB_HASH:
	case DB_QUEUE:
		break;
	case DB_UNKNOWN:
	default:
		ret = __db_unknown_type(env, "DB->cursor", dbp->type);
		goto err;
	}

	/*
	 * The transaction keeps track of how many cursors were opened within
	 * it to catch application errors where the cursor isn't closed when
	 * the transaction is resolved.
	 */
	if (txn != NULL)
		++txn->cursors;
	if (ip != NULL) {
		dbc->thread_info = ip;
#ifdef DIAGNOSTIC
		if (dbc->locker != NULL)
			ip->dbth_locker =
			    R_OFFSET(&(env->lk_handle->reginfo), dbc->locker);
		else
			ip->dbth_locker = INVALID_ROFF;
#endif
	} else if (txn != NULL)
		dbc->thread_info = txn->thread_info;
	else
		ENV_GET_THREAD_INFO(env, dbc->thread_info);

	/* Put the cursor on the database's active queue. */
	MUTEX_LOCK(env, dbp->mutex);
	TAILQ_INSERT_TAIL(&dbp->active_queue, dbc, links);
	F_SET(dbc, DBC_ACTIVE);
	MUTEX_UNLOCK(env, dbp->mutex);

	*dbcp = dbc;
	return (0);

	/*
	 * Error: the cursor was never put on any queue, so if we
	 * allocated it on this call, just free the memory.
	 */
err:	if (allocated)
		__os_free(env, dbc);
	return (ret);
}
423
424 /*
425 * __db_put --
426 * Store a key/data pair.
427 *
428 * PUBLIC: int __db_put __P((DB *,
429 * PUBLIC: DB_THREAD_INFO *, DB_TXN *, DBT *, DBT *, u_int32_t));
430 */
int
__db_put(dbp, ip, txn, key, data, flags)
	DB *dbp;
	DB_THREAD_INFO *ip;
	DB_TXN *txn;
	DBT *key, *data;
	u_int32_t flags;
{
	DB_HEAP_RID rid;
	DBC *dbc;
	DBT tdata, tkey;
	ENV *env;
	void *bulk_kptr, *bulk_ptr;
	db_recno_t recno;
	u_int32_t cursor_flags;
	int ret, t_ret;

	env = dbp->env;

	/*
	 * See the comment in __db_get() regarding DB_CURSOR_TRANSIENT.
	 *
	 * Note that the get in the DB_NOOVERWRITE case is safe to do with this
	 * flag set; if it errors in any way other than DB_NOTFOUND, we're
	 * going to close the cursor without doing anything else, and if it
	 * returns DB_NOTFOUND then it's safe to do a c_put(DB_KEYLAST) even if
	 * an access method moved the cursor, since that's not
	 * position-dependent.
	 */
	cursor_flags = DB_WRITELOCK;
	if (LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY))
		cursor_flags |= DB_CURSOR_BULK;
	else
		cursor_flags |= DB_CURSOR_TRANSIENT;
	if ((ret = __db_cursor(dbp, ip, txn, &dbc, cursor_flags)) != 0)
		return (ret);

	DEBUG_LWRITE(dbc, txn, "DB->put", key, data, flags);
	PERFMON6(env, db, put, dbp->fname,
	    dbp->dname, txn == NULL ? 0 : txn->txnid, key, data, flags);

	SET_RET_MEM(dbc, dbp);

	if (flags == DB_APPEND && !DB_IS_PRIMARY(dbp)) {
		/*
		 * If there is an append callback, the value stored in
		 * data->data may be replaced and then freed.  To avoid
		 * passing a freed pointer back to the user, just operate
		 * on a copy of the data DBT.
		 */
		tdata = *data;

		/*
		 * Append isn't a normal put operation; call the appropriate
		 * access method's append function.
		 */
		switch (dbp->type) {
		case DB_HEAP:
			if ((ret = __heap_append(dbc, key, &tdata)) != 0)
				goto err;
			break;
		case DB_QUEUE:
			if ((ret = __qam_append(dbc, key, &tdata)) != 0)
				goto err;
			break;
		case DB_RECNO:
			if ((ret = __ram_append(dbc, key, &tdata)) != 0)
				goto err;
			break;
		case DB_BTREE:
		case DB_HASH:
		case DB_UNKNOWN:
		default:
			/* The interface should prevent this. */
			DB_ASSERT(env,
			    dbp->type == DB_QUEUE || dbp->type == DB_RECNO);

			ret = __db_ferr(env, "DB->put", 0);
			goto err;
		}

		/*
		 * The append callback, if one exists, may have allocated
		 * a new tdata.data buffer.  If so, free it.
		 */
		FREE_IF_NEEDED(env, &tdata);

		/* No need for a cursor put; we're done. */
#ifdef HAVE_COMPRESSION
	} else if (DB_IS_COMPRESSED(dbp) && !F_ISSET(dbp, DB_AM_SECONDARY) &&
	    !DB_IS_PRIMARY(dbp) && LIST_FIRST(&dbp->f_primaries) == NULL) {
		/* Compression handles its own bulk logic in __dbc_put. */
		ret = __dbc_put(dbc, key, data, flags);
#endif
	} else if (LF_ISSET(DB_MULTIPLE)) {
		/* Bulk put: parallel key and data buffers. */
		ret = 0;
		memset(&tkey, 0, sizeof(tkey));
		if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO) {
			tkey.data = &recno;
			tkey.size = sizeof(recno);
		}
		memset(&tdata, 0, sizeof(tdata));
		DB_MULTIPLE_INIT(bulk_kptr, key);
		DB_MULTIPLE_INIT(bulk_ptr, data);
		/* We return the number of records put in key->doff. */
		key->doff = 0;
		while (ret == 0) {
			if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO)
				DB_MULTIPLE_RECNO_NEXT(bulk_kptr, key,
				    recno, tdata.data, tdata.size);
			else
				DB_MULTIPLE_NEXT(bulk_kptr, key,
				    tkey.data, tkey.size);
			DB_MULTIPLE_NEXT(bulk_ptr, data,
			    tdata.data, tdata.size);
			if (bulk_kptr == NULL || bulk_ptr == NULL)
				break;
			if (dbp->type == DB_HEAP) {
				/*
				 * Copy the RID into an aligned local;
				 * tkey.data may point into the (possibly
				 * unaligned) bulk buffer.
				 */
				memcpy(&rid, tkey.data, sizeof(DB_HEAP_RID));
				tkey.data = &rid;
			}
			ret = __dbc_put(dbc, &tkey, &tdata,
			    LF_ISSET(DB_OPFLAGS_MASK));
			if (ret == 0)
				++key->doff;
		}
	} else if (LF_ISSET(DB_MULTIPLE_KEY)) {
		/* Bulk put: single buffer of key/data pairs. */
		ret = 0;
		memset(&tkey, 0, sizeof(tkey));
		if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO) {
			tkey.data = &recno;
			tkey.size = sizeof(recno);
		}
		memset(&tdata, 0, sizeof(tdata));
		DB_MULTIPLE_INIT(bulk_ptr, key);
		while (ret == 0) {
			if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO)
				DB_MULTIPLE_RECNO_NEXT(bulk_ptr, key, recno,
				    tdata.data, tdata.size);
			else
				DB_MULTIPLE_KEY_NEXT(bulk_ptr, key, tkey.data,
				    tkey.size, tdata.data, tdata.size);
			if (bulk_ptr == NULL)
				break;
			if (dbp->type == DB_HEAP) {
				/* See the DB_MULTIPLE case above. */
				memcpy(&rid, tkey.data, sizeof(DB_HEAP_RID));
				tkey.data = &rid;
			}
			ret = __dbc_put(dbc, &tkey, &tdata,
			    LF_ISSET(DB_OPFLAGS_MASK));
			if (ret == 0)
				++key->doff;
		}
	} else
		/* The common, single-pair case. */
		ret = __dbc_put(dbc, key, data, flags);

err:	/* Close the cursor. */
	if (!DB_RETOK_DBPUT(ret))
		F_SET(dbc, DBC_ERROR);
	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}
593
594 /*
595 * __db_del --
596 * Delete the items referenced by a key.
597 *
598 * PUBLIC: int __db_del __P((DB *,
599 * PUBLIC: DB_THREAD_INFO *, DB_TXN *, DBT *, u_int32_t));
600 */
int
__db_del(dbp, ip, txn, key, flags)
	DB *dbp;
	DB_THREAD_INFO *ip;
	DB_TXN *txn;
	DBT *key;
	u_int32_t flags;
{
	DB_HEAP_RID rid;
	DBC *dbc;
	DBT data, tkey;
	void *bulk_ptr;
	db_recno_t recno;
	u_int32_t cursor_flags, f_init, f_next;
	int ret, t_ret;

	COMPQUIET(bulk_ptr, NULL);
	/* Allocate a cursor. */
	cursor_flags = DB_WRITELOCK;
	if (LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY))
		cursor_flags |= DB_CURSOR_BULK;
	if ((ret = __db_cursor(dbp, ip, txn, &dbc, cursor_flags)) != 0)
		return (ret);

	DEBUG_LWRITE(dbc, txn, "DB->del", key, NULL, flags);
	PERFMON5(env, db, del,
	    dbp->fname, dbp->dname, txn == NULL ? 0 : txn->txnid, key, flags);

#ifdef HAVE_COMPRESSION
	if (DB_IS_COMPRESSED(dbp) && !F_ISSET(dbp, DB_AM_SECONDARY) &&
	    !DB_IS_PRIMARY(dbp) && LIST_FIRST(&dbp->f_primaries) == NULL) {
		/* Compression handles the whole delete itself. */
		F_SET(dbc, DBC_TRANSIENT);
		ret = __dbc_bulk_del(dbc, key, flags);
		goto err;
	}
#endif

	/*
	 * Walk a cursor through the key/data pairs, deleting as we go.  Set
	 * the DB_DBT_USERMEM flag, as this might be a threaded application
	 * and the flags checking will catch us.  We don't actually want the
	 * keys or data, set DB_DBT_ISSET.  We rely on __dbc_get to clear
	 * this.
	 */
	memset(&data, 0, sizeof(data));
	F_SET(&data, DB_DBT_USERMEM);
	tkey = *key;

	f_init = LF_ISSET(DB_MULTIPLE_KEY) ? DB_GET_BOTH : DB_SET;
	f_next = DB_NEXT_DUP;

	/*
	 * If locking (and we haven't already acquired CDB locks), set the
	 * read-modify-write flag.
	 */
	if (STD_LOCKING(dbc)) {
		f_init |= DB_RMW;
		f_next |= DB_RMW;
	}

	if (LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY)) {
		if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO) {
			memset(&tkey, 0, sizeof(tkey));
			tkey.data = &recno;
			tkey.size = sizeof(recno);
		}
		DB_MULTIPLE_INIT(bulk_ptr, key);
		/* We return the number of keys deleted in doff. */
		key->doff = 0;
		/*
		 * Fetch the next key (and, for DB_MULTIPLE_KEY, data)
		 * from the bulk buffer; we loop back here from the
		 * "next" label below after each key is processed.
		 */
bulk_next:	if (dbp->type == DB_QUEUE || dbp->type == DB_RECNO)
			DB_MULTIPLE_RECNO_NEXT(bulk_ptr, key,
			    recno, data.data, data.size);
		else if (LF_ISSET(DB_MULTIPLE))
			DB_MULTIPLE_NEXT(bulk_ptr, key, tkey.data, tkey.size);
		else
			DB_MULTIPLE_KEY_NEXT(bulk_ptr, key,
			    tkey.data, tkey.size, data.data, data.size);
		if (bulk_ptr == NULL)
			goto err;
		if (dbp->type == DB_HEAP) {
			/*
			 * Copy the RID into an aligned local; tkey.data
			 * may point into the bulk buffer.
			 */
			memcpy(&rid, tkey.data, sizeof(DB_HEAP_RID));
			tkey.data = &rid;
		}

	}

	/* We're not interested in the data -- do not return it. */
	F_SET(&tkey, DB_DBT_ISSET);
	F_SET(&data, DB_DBT_ISSET);

	/*
	 * Optimize the simple cases.  For all AMs if we don't have secondaries
	 * and are not a secondary and we aren't a foreign database and there
	 * are no dups then we can avoid a bunch of overhead.  For queue we
	 * don't need to fetch the record since we delete by direct calculation
	 * from the record number.
	 *
	 * Hash permits an optimization in DB->del: since on-page duplicates are
	 * stored in a single HKEYDATA structure, it's possible to delete an
	 * entire set of them at once, and as the HKEYDATA has to be rebuilt
	 * and re-put each time it changes, this is much faster than deleting
	 * the duplicates one by one.  Thus, if not pointing at an off-page
	 * duplicate set, and we're not using secondary indices (in which case
	 * we'd have to examine the items one by one anyway), let hash do this
	 * "quick delete".
	 *
	 * !!!
	 * Note that this is the only application-executed delete call in
	 * Berkeley DB that does not go through the __dbc_del function.
	 * If anything other than the delete itself (like a secondary index
	 * update) has to happen there in a particular situation, the
	 * conditions here should be modified not to use these optimizations.
	 * The ordinary AM-independent alternative will work just fine;
	 * it'll just be slower.
	 */
	if (!F_ISSET(dbp, DB_AM_SECONDARY) && !DB_IS_PRIMARY(dbp) &&
	    LIST_FIRST(&dbp->f_primaries) == NULL) {
#ifdef HAVE_QUEUE
		if (dbp->type == DB_QUEUE) {
			ret = __qam_delete(dbc, &tkey, flags);
			goto next;
		}
#endif

		/* Fetch the first record. */
		if ((ret = __dbc_get(dbc, &tkey, &data, f_init)) != 0)
			goto err;

#ifdef HAVE_HASH
		/*
		 * Hash "quick delete" removes all on-page duplicates.  We
		 * can't do that if deleting specific key/data pairs.
		 */
		if (dbp->type == DB_HASH && !LF_ISSET(DB_MULTIPLE_KEY)) {
			DBC *sdbc;
			sdbc = dbc;
#ifdef HAVE_PARTITION
			/* Partitioned hash: work on the sub-cursor. */
			if (F_ISSET(dbc, DBC_PARTITIONED))
				sdbc =
				    ((PART_CURSOR*)dbc->internal)->sub_cursor;
#endif
			if (sdbc->internal->opd == NULL) {
				ret = __ham_quick_delete(sdbc);
				goto next;
			}
		}
#endif

		if (!F_ISSET(dbp, DB_AM_DUP)) {
			/* No dups: a single AM-level delete suffices. */
			ret = dbc->am_del(dbc, 0);
			goto next;
		}
	} else if ((ret = __dbc_get(dbc, &tkey, &data, f_init)) != 0)
		goto err;

	/* Walk through the set of key/data pairs, deleting as we go. */
	for (;;) {
		if ((ret = __dbc_del(dbc, flags)) != 0)
			break;
		/*
		 * With DB_MULTIPLE_KEY, the application has specified the
		 * exact records they want deleted.  We don't need to walk
		 * through a set of duplicates.
		 */
		if (LF_ISSET(DB_MULTIPLE_KEY))
			break;

		F_SET(&tkey, DB_DBT_ISSET);
		F_SET(&data, DB_DBT_ISSET);
		if ((ret = __dbc_get(dbc, &tkey, &data, f_next)) != 0) {
			if (ret == DB_NOTFOUND)
				ret = 0;
			break;
		}
	}

	/* In bulk mode, count the key deleted and go get the next one. */
next:	if (ret == 0 && LF_ISSET(DB_MULTIPLE | DB_MULTIPLE_KEY)) {
		++key->doff;
		goto bulk_next;
	}
err:	/* Discard the cursor. */
	if (!DB_RETOK_DBDEL(ret))
		F_SET(dbc, DBC_ERROR);
	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}
789
790 /*
791 * __db_sync --
792 * Flush the database cache.
793 *
794 * PUBLIC: int __db_sync __P((DB *));
795 */
796 int
__db_sync(dbp)797 __db_sync(dbp)
798 DB *dbp;
799 {
800 int ret, t_ret;
801
802 ret = 0;
803
804 /* If the database was read-only, we're done. */
805 if (F_ISSET(dbp, DB_AM_RDONLY))
806 return (0);
807
808 /* If it's a Recno tree, write the backing source text file. */
809 if (dbp->type == DB_RECNO)
810 ret = __ram_writeback(dbp);
811
812 /* If the database was never backed by a database file, we're done. */
813 if (F_ISSET(dbp, DB_AM_INMEM))
814 return (ret);
815 #ifdef HAVE_PARTITION
816 if (DB_IS_PARTITIONED(dbp))
817 ret = __partition_sync(dbp);
818 else
819 #endif
820 if (dbp->type == DB_QUEUE)
821 ret = __qam_sync(dbp);
822 else
823 /* Flush any dirty pages from the cache to the backing file. */
824 if ((t_ret = __memp_fsync(dbp->mpf)) != 0 && ret == 0)
825 ret = t_ret;
826
827 return (ret);
828 }
829
830 /*
831 * __db_associate --
832 * Associate another database as a secondary index to this one.
833 *
834 * PUBLIC: int __db_associate __P((DB *, DB_THREAD_INFO *, DB_TXN *, DB *,
835 * PUBLIC: int (*)(DB *, const DBT *, const DBT *, DBT *), u_int32_t));
836 */
int
__db_associate(dbp, ip, txn, sdbp, callback, flags)
	DB *dbp, *sdbp;
	DB_THREAD_INFO *ip;
	DB_TXN *txn;
	int (*callback) __P((DB *, const DBT *, const DBT *, DBT *));
	u_int32_t flags;
{
	DBC *pdbc, *sdbc;
	DBT key, data, skey, *tskeyp;
	ENV *env;
	int build, ret, t_ret;
	u_int32_t nskey;

	env = dbp->env;
	pdbc = sdbc = NULL;
	ret = 0;

	memset(&skey, 0, sizeof(DBT));
	nskey = 0;
	tskeyp = NULL;

	/*
	 * Check to see if the secondary is empty -- and thus if we should
	 * build it -- before we link it in and risk making it show up in other
	 * threads.  Do this first so that the databases remain unassociated on
	 * error.
	 */
	build = 0;
	if (LF_ISSET(DB_CREATE)) {
		FLD_SET(sdbp->s_assoc_flags, DB_ASSOC_CREATE);

		if ((ret = __db_cursor(sdbp, ip, txn, &sdbc, 0)) != 0)
			goto err;

		/*
		 * We don't care about key or data; we're just doing
		 * an existence check.
		 */
		memset(&key, 0, sizeof(DBT));
		memset(&data, 0, sizeof(DBT));
		F_SET(&key, DB_DBT_PARTIAL | DB_DBT_USERMEM);
		F_SET(&data, DB_DBT_PARTIAL | DB_DBT_USERMEM);
		if ((ret = __dbc_get(sdbc, &key, &data,
		    (STD_LOCKING(sdbc) ? DB_RMW : 0) |
		    DB_FIRST)) == DB_NOTFOUND) {
			/* Empty secondary: we will populate it below. */
			build = 1;
			ret = 0;
		}

		if (ret != 0)
			F_SET(sdbc, DBC_ERROR);
		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
			ret = t_ret;

		/* Reset for later error check. */
		sdbc = NULL;

		if (ret != 0)
			goto err;
	}

	/*
	 * Set up the database handle as a secondary.
	 */
	sdbp->s_callback = callback;
	sdbp->s_primary = dbp;

	/* Route DB->get through the secondary wrapper; keep the original. */
	sdbp->stored_get = sdbp->get;
	sdbp->get = __db_secondary_get;

	sdbp->stored_close = sdbp->close;
	sdbp->close = __db_secondary_close_pp;

	F_SET(sdbp, DB_AM_SECONDARY);

	if (LF_ISSET(DB_IMMUTABLE_KEY))
		FLD_SET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY);

	/*
	 * Add the secondary to the list on the primary.  Do it here
	 * so that we see any updates that occur while we're walking
	 * the primary.
	 */
	MUTEX_LOCK(env, dbp->mutex);

	/* See __db_s_next for an explanation of secondary refcounting. */
	DB_ASSERT(env, sdbp->s_refcnt == 0);
	sdbp->s_refcnt = 1;
	LIST_INSERT_HEAD(&dbp->s_secondaries, sdbp, s_links);
	MUTEX_UNLOCK(env, dbp->mutex);

	if (build) {
		/*
		 * We loop through the primary, putting each item we
		 * find into the new secondary.
		 *
		 * If we're using CDB, opening these two cursors puts us
		 * in a bit of a locking tangle:  CDB locks are done on the
		 * primary, so that we stay deadlock-free, but that means
		 * that updating the secondary while we have a read cursor
		 * open on the primary will self-block.  To get around this,
		 * we force the primary cursor to use the same locker ID
		 * as the secondary, so they won't conflict.  This should
		 * be harmless even if we're not using CDB.
		 */
		if ((ret = __db_cursor(sdbp, ip, txn, &sdbc,
		    CDB_LOCKING(sdbp->env) ? DB_WRITECURSOR : 0)) != 0)
			goto err;
		if ((ret = __db_cursor_int(dbp, ip,
		    txn, dbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0)
			goto err;

		/* Lock out other threads, now that we have a locker. */
		dbp->associate_locker = sdbc->locker;

		memset(&key, 0, sizeof(DBT));
		memset(&data, 0, sizeof(DBT));
		while ((ret = __dbc_get(pdbc, &key, &data, DB_NEXT)) == 0) {
			if ((ret = callback(sdbp, &key, &data, &skey)) != 0) {
				/* DB_DONOTINDEX: skip this primary record. */
				if (ret == DB_DONOTINDEX)
					continue;
				goto err;
			}
			if (F_ISSET(&skey, DB_DBT_MULTIPLE)) {
#ifdef DIAGNOSTIC
				__db_check_skeyset(sdbp, &skey);
#endif
				/* Callback returned an array of keys. */
				nskey = skey.size;
				tskeyp = (DBT *)skey.data;
			} else {
				nskey = 1;
				tskeyp = &skey;
			}
			SWAP_IF_NEEDED(sdbp, &key);
			for (; nskey > 0; nskey--, tskeyp++) {
				if ((ret = __dbc_put(sdbc,
				    tskeyp, &key, DB_UPDATE_SECONDARY)) != 0)
					goto err;
				FREE_IF_NEEDED(env, tskeyp);
			}
			SWAP_IF_NEEDED(sdbp, &key);
			FREE_IF_NEEDED(env, &skey);
		}
		if (ret == DB_NOTFOUND)
			ret = 0;
	}

err:	if (sdbc != NULL && (t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
		ret = t_ret;

	if (pdbc != NULL && (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
		ret = t_ret;

	dbp->associate_locker = NULL;

	/* Free any secondary keys left over from an error exit. */
	for (; nskey > 0; nskey--, tskeyp++)
		FREE_IF_NEEDED(env, tskeyp);
	FREE_IF_NEEDED(env, &skey);

	return (ret);
}
999
1000 /*
1001 * __db_secondary_get --
1002 * This wrapper function for DB->pget() is the DB->get() function
1003 * on a database which has been made into a secondary index.
1004 */
1005 static int
__db_secondary_get(sdbp,txn,skey,data,flags)1006 __db_secondary_get(sdbp, txn, skey, data, flags)
1007 DB *sdbp;
1008 DB_TXN *txn;
1009 DBT *skey, *data;
1010 u_int32_t flags;
1011 {
1012 DB_ASSERT(sdbp->env, F_ISSET(sdbp, DB_AM_SECONDARY));
1013 return (__db_pget_pp(sdbp, txn, skey, NULL, data, flags));
1014 }
1015
1016 /*
1017 * __db_secondary_close --
1018 * Wrapper function for DB->close() which we use on secondaries to
1019 * manage refcounting and make sure we don't close them underneath
1020 * a primary that is updating.
1021 *
1022 * PUBLIC: int __db_secondary_close __P((DB *, u_int32_t));
1023 */
1024 int
__db_secondary_close(sdbp,flags)1025 __db_secondary_close(sdbp, flags)
1026 DB *sdbp;
1027 u_int32_t flags;
1028 {
1029 DB *primary;
1030 ENV *env;
1031 int doclose;
1032
1033 /*
1034 * If the opening transaction is rolled back then the db handle
1035 * will have already been refreshed, we just need to call
1036 * __db_close to free the data.
1037 */
1038 if (!F_ISSET(sdbp, DB_AM_OPEN_CALLED)) {
1039 doclose = 1;
1040 goto done;
1041 }
1042 doclose = 0;
1043 primary = sdbp->s_primary;
1044 env = primary->env;
1045
1046 MUTEX_LOCK(env, primary->mutex);
1047 /*
1048 * Check the refcount--if it was at 1 when we were called, no
1049 * thread is currently updating this secondary through the primary,
1050 * so it's safe to close it for real.
1051 *
1052 * If it's not safe to do the close now, we do nothing; the
1053 * database will actually be closed when the refcount is decremented,
1054 * which can happen in either __db_s_next or __db_s_done.
1055 */
1056 DB_ASSERT(env, sdbp->s_refcnt != 0);
1057 if (--sdbp->s_refcnt == 0) {
1058 LIST_REMOVE(sdbp, s_links);
1059 /* We don't want to call close while the mutex is held. */
1060 doclose = 1;
1061 }
1062 MUTEX_UNLOCK(env, primary->mutex);
1063
1064 /*
1065 * sdbp->close is this function; call the real one explicitly if
1066 * need be.
1067 */
1068 done: return (doclose ? __db_close(sdbp, NULL, flags) : 0);
1069 }
1070
1071 /*
1072 * __db_associate_foreign --
1073 * Associate this database (fdbp) as a foreign constraint to another
1074 * database (pdbp). That is, dbp's keys appear as foreign key values in
1075 * pdbp.
1076 *
1077 * PUBLIC: int __db_associate_foreign __P((DB *, DB *,
1078 * PUBLIC: int (*)(DB *, const DBT *, DBT *, const DBT *, int *),
1079 * PUBLIC: u_int32_t));
1080 */
1081 int
__db_associate_foreign(fdbp,pdbp,callback,flags)1082 __db_associate_foreign(fdbp, pdbp, callback, flags)
1083 DB *fdbp, *pdbp;
1084 int (*callback)(DB *, const DBT *, DBT *, const DBT *, int *);
1085 u_int32_t flags;
1086 {
1087 DB_FOREIGN_INFO *f_info;
1088 ENV *env;
1089 int ret;
1090
1091 env = fdbp->env;
1092 ret = 0;
1093
1094 if ((ret = __os_malloc(env, sizeof(DB_FOREIGN_INFO), &f_info)) != 0) {
1095 return (ret);
1096 }
1097 memset(f_info, 0, sizeof(DB_FOREIGN_INFO));
1098
1099 f_info->dbp = pdbp;
1100 f_info->callback = callback;
1101
1102 /*
1103 * It might be wise to filter this, but for now the flags only
1104 * set the delete action type.
1105 */
1106 FLD_SET(f_info->flags, flags);
1107
1108 /*
1109 * Add f_info to the foreign database's list of primaries. That is to
1110 * say, fdbp->f_primaries lists all databases for which fdbp is a
1111 * foreign constraint.
1112 */
1113 MUTEX_LOCK(env, fdbp->mutex);
1114 LIST_INSERT_HEAD(&fdbp->f_primaries, f_info, f_links);
1115 MUTEX_UNLOCK(env, fdbp->mutex);
1116
1117 /*
1118 * Associate fdbp as pdbp's foreign db, for referential integrity
1119 * checks. We don't allow the foreign db to be changed, because we
1120 * currently have no way of removing pdbp from the old foreign db's list
1121 * of primaries.
1122 */
1123 if (pdbp->s_foreign != NULL)
1124 return (EINVAL);
1125 pdbp->s_foreign = fdbp;
1126
1127 return (ret);
1128 }
1129
1130 static int
__dbc_set_priority(dbc,priority)1131 __dbc_set_priority(dbc, priority)
1132 DBC *dbc;
1133 DB_CACHE_PRIORITY priority;
1134 {
1135 dbc->priority = priority;
1136 return (0);
1137 }
1138
1139 static int
__dbc_get_priority(dbc,priority)1140 __dbc_get_priority(dbc, priority)
1141 DBC *dbc;
1142 DB_CACHE_PRIORITY *priority;
1143 {
1144 if (dbc->priority == DB_PRIORITY_UNCHANGED)
1145 return (__memp_get_priority(dbc->dbp->mpf, priority));
1146 else
1147 *priority = dbc->priority;
1148
1149 return (0);
1150 }
1151