1 /*-
2 * Copyright (c) 2000, 2020 Oracle and/or its affiliates. All rights reserved.
3 *
4 * See the file LICENSE for license information.
5 *
6 * $Id$
7 */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/fop.h"
15 #include "dbinc/hash.h"
16 #include "dbinc/heap.h"
17 #include "dbinc/lock.h"
18 #include "dbinc/mp.h"
19 #include "dbinc/partition.h"
20 #include "dbinc/qam.h"
21 #include "dbinc/txn.h"
22
23 static int __db_s_count __P((DB *));
24 static int __db_wrlock_err __P((ENV *));
25 static int __dbc_del_foreign __P((DBC *));
26 static int __dbc_del_oldskey __P((DB *, DBC *, DBT *, DBT *, DBT *));
27 static int __dbc_del_secondary __P((DBC *));
28 static int __dbc_pget_recno __P((DBC *, DBT *, DBT *, u_int32_t));
29 static inline int __dbc_put_append __P((DBC *,
30 DBT *, DBT *, u_int32_t *, u_int32_t));
31 static inline int __dbc_put_fixed_len __P((DBC *, DBT *, DBT *));
32 static inline int __dbc_put_partial __P((DBC *,
33 DBT *, DBT *, DBT *, DBT *, u_int32_t *, u_int32_t));
34 static int __dbc_put_primary __P((DBC *, DBT *, DBT *, u_int32_t));
35 static inline int __dbc_put_resolve_key __P((DBC *,
36 DBT *, DBT *, u_int32_t *, u_int32_t));
37 static inline int __dbc_put_secondaries __P((DBC *,
38 DBT *, DBT *, DBT *, int, DBT *, u_int32_t *));
39
/*
 * CDB_LOCKING_INIT --
 *	Prepare a cursor for an update when running CDB.
 *
 * If we are running CDB, this had better be either a write cursor or an
 * immediate writer; anything else makes the enclosing function return the
 * result of __db_wrlock_err().  A write cursor holds an IWRITE lock, which
 * is upgraded to a write lock here; if the upgrade fails, the enclosing
 * function returns that error.
 *
 * The expansion assigns to a variable named "ret", which must be in scope
 * at the point of use, and it may execute a return statement.
 *
 * The body is wrapped in do/while (0) so the macro expands to exactly one
 * statement and can be used safely as the body of an if/else.
 */
#define	CDB_LOCKING_INIT(env, dbc) do {					\
	if (CDB_LOCKING(env)) {						\
		if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))	\
			return (__db_wrlock_err(env));			\
									\
		if (F_ISSET(dbc, DBC_WRITECURSOR) &&			\
		    (ret = __lock_get(env,				\
		    (dbc)->locker, DB_LOCK_UPGRADE, &(dbc)->lock_dbt,	\
		    DB_LOCK_WRITE, &(dbc)->mylock)) != 0)		\
			return (ret);					\
	}								\
} while (0)
/*
 * CDB_LOCKING_DONE --
 *	Release the upgraded lock: a write cursor's lock is downgraded back
 *	to DB_LOCK_IWRITE.  The result of the downgrade is ignored.
 *
 * The body is wrapped in do/while (0), and carries no trailing semicolon
 * of its own, so the macro expands to exactly one statement and can be
 * used safely as the body of an if/else.
 */
#define	CDB_LOCKING_DONE(env, dbc) do {					\
	if (F_ISSET(dbc, DBC_WRITECURSOR))				\
		(void)__lock_downgrade(					\
		    env, &(dbc)->mylock, DB_LOCK_IWRITE, 0);		\
} while (0)
62
/*
 * SET_READ_LOCKING_FLAGS --
 *	Work out which temporary isolation flags a single call should add
 *	to the cursor.
 *
 * var is set to 0 when the cursor already has DBC_READ_COMMITTED or
 * DBC_READ_UNCOMMITTED set.  Otherwise it receives the cursor flags that
 * match the DB_READ_COMMITTED / DB_READ_UNCOMMITTED bits tested with
 * LF_ISSET; DB_READ_UNCOMMITTED takes precedence when both are present.
 * Both DB_READ_* bits are always removed with LF_CLR afterwards.
 */
#define	SET_READ_LOCKING_FLAGS(dbc, var) do {				\
	var = 0;							\
	if (!F_ISSET(dbc, DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED)) {	\
		if (LF_ISSET(DB_READ_UNCOMMITTED))			\
			var = DBC_READ_UNCOMMITTED;			\
		else if (LF_ISSET(DB_READ_COMMITTED))			\
			var = DBC_READ_COMMITTED |			\
			    DBC_WAS_READ_COMMITTED;			\
	}								\
	LF_CLR(DB_READ_COMMITTED | DB_READ_UNCOMMITTED);		\
} while (0)
73
74 /*
75 * __dbc_close --
76 * DBC->close.
77 *
78 * PUBLIC: int __dbc_close __P((DBC *));
79 */
80 int
__dbc_close(dbc)81 __dbc_close(dbc)
82 DBC *dbc;
83 {
84 DB *dbp;
85 DBC *opd;
86 DBC_INTERNAL *cp;
87 #ifdef DIAGNOSTIC
88 DB_THREAD_INFO *ip;
89 #endif
90 DB_TXN *txn;
91 ENV *env;
92 int ret, t_ret;
93
94 dbp = dbc->dbp;
95 env = dbp->env;
96 cp = dbc->internal;
97 opd = cp->opd;
98 ret = 0;
99
100 /*
101 * Remove the cursor(s) from the active queue. We may be closing two
102 * cursors at once here, a top-level one and a lower-level, off-page
103 * duplicate one. The access-method specific cursor close routine must
104 * close both of them in a single call.
105 *
106 * !!!
107 * Cursors must be removed from the active queue before calling the
108 * access specific cursor close routine, btree depends on having that
109 * order of operations.
110 */
111 MUTEX_LOCK(env, dbp->mutex);
112
113 if (opd != NULL) {
114 DB_ASSERT(env, F_ISSET(opd, DBC_ACTIVE));
115 F_CLR(opd, DBC_ACTIVE);
116 TAILQ_REMOVE(&dbp->active_queue, opd, links);
117 }
118 DB_ASSERT(env, F_ISSET(dbc, DBC_ACTIVE));
119 F_CLR(dbc, DBC_ACTIVE);
120 TAILQ_REMOVE(&dbp->active_queue, dbc, links);
121
122 MUTEX_UNLOCK(env, dbp->mutex);
123
124 /* Call the access specific cursor close routine. */
125 if ((t_ret =
126 dbc->am_close(dbc, PGNO_INVALID, NULL)) != 0 && ret == 0)
127 ret = t_ret;
128
129 /*
130 * Release the lock after calling the access method specific close
131 * routine, a Btree cursor may have had pending deletes.
132 *
133 * Also, be sure not to free anything if mylock.off is INVALID; in
134 * some cases, such as idup'ed read cursors and secondary update
135 * cursors, a cursor in a CDB environment may not have a lock at all.
136 */
137 if (LOCK_ISSET(dbc->mylock)) {
138 if ((t_ret = __LPUT(dbc, dbc->mylock)) != 0 && ret == 0)
139 ret = t_ret;
140
141 /* For safety's sake, since this is going on the free queue. */
142 memset(&dbc->mylock, 0, sizeof(dbc->mylock));
143 if (opd != NULL)
144 memset(&opd->mylock, 0, sizeof(opd->mylock));
145 }
146
147 /*
148 * Remove this cursor's locker ID from its family.
149 */
150 if (F_ISSET(dbc, DBC_OWN_LID) && F_ISSET(dbc, DBC_FAMILY)) {
151 if ((t_ret = __lock_familyremove(env->lk_handle,
152 dbc->lref)) != 0 && ret == 0)
153 ret = t_ret;
154 F_CLR(dbc, DBC_FAMILY);
155 }
156 #ifdef DIAGNOSTIC
157 if (dbc->locker != NULL) {
158 ENV_GET_THREAD_INFO(env, ip);
159 if (ip != NULL)
160 ip->dbth_locker = dbc->locker->prev_locker;
161 dbc->locker->prev_locker = INVALID_ROFF;
162 }
163 #endif
164
165 if ((txn = dbc->txn) != NULL)
166 txn->cursors--;
167
168 /* Move the cursor(s) to the free queue. */
169 MUTEX_LOCK(env, dbp->mutex);
170 if (opd != NULL) {
171 if (txn != NULL)
172 txn->cursors--;
173 TAILQ_INSERT_TAIL(&dbp->free_queue, opd, links);
174 }
175 TAILQ_INSERT_TAIL(&dbp->free_queue, dbc, links);
176 MUTEX_UNLOCK(env, dbp->mutex);
177
178 if (txn != NULL && F_ISSET(txn, TXN_PRIVATE) && txn->cursors == 0 &&
179 (t_ret = __txn_commit(txn, 0)) != 0 && ret == 0)
180 ret = t_ret;
181
182 return (ret);
183 }
184
185 /*
186 * __dbc_destroy --
187 * Destroy the cursor, called after DBC->close.
188 *
189 * PUBLIC: int __dbc_destroy __P((DBC *));
190 */
191 int
__dbc_destroy(dbc)192 __dbc_destroy(dbc)
193 DBC *dbc;
194 {
195 DB *dbp;
196 ENV *env;
197 int ret, t_ret;
198
199 dbp = dbc->dbp;
200 env = dbp->env;
201
202 /* Remove the cursor from the free queue. */
203 MUTEX_LOCK(env, dbp->mutex);
204 TAILQ_REMOVE(&dbp->free_queue, dbc, links);
205 MUTEX_UNLOCK(env, dbp->mutex);
206
207 /* Free up allocated memory. */
208 if (dbc->my_rskey.data != NULL)
209 __os_free(env, dbc->my_rskey.data);
210 if (dbc->my_rkey.data != NULL)
211 __os_free(env, dbc->my_rkey.data);
212 if (dbc->my_rdata.data != NULL)
213 __os_free(env, dbc->my_rdata.data);
214
215 /* Call the access specific cursor destroy routine. */
216 ret = dbc->am_destroy == NULL ? 0 : dbc->am_destroy(dbc);
217
218 /*
219 * Release the lock id for this cursor.
220 */
221 if (LOCKING_ON(env) &&
222 F_ISSET(dbc, DBC_OWN_LID) &&
223 (t_ret = __lock_id_free(env, dbc->lref)) != 0 && ret == 0)
224 ret = t_ret;
225
226 __os_free(env, dbc);
227
228 return (ret);
229 }
230
231 /*
232 * __dbc_cmp --
233 * Compare the position of two cursors. Return whether two cursors are
234 * pointing to the same key/data pair.
235 *
236 * result == 0 if both cursors refer to the same item.
237 * result == 1 otherwise
238 *
239 * PUBLIC: int __dbc_cmp __P((DBC *, DBC *, int *));
240 */
241 int
__dbc_cmp(dbc,other_dbc,result)242 __dbc_cmp(dbc, other_dbc, result)
243 DBC *dbc, *other_dbc;
244 int *result;
245 {
246 DBC *curr_dbc, *curr_odbc;
247 DBC_INTERNAL *dbc_int, *odbc_int;
248 ENV *env;
249 int ret;
250
251 env = dbc->env;
252 ret = 0;
253
254 #ifdef HAVE_PARTITION
255 if (DB_IS_PARTITIONED(dbc->dbp)) {
256 dbc = ((PART_CURSOR *)dbc->internal)->sub_cursor;
257 other_dbc = ((PART_CURSOR *)other_dbc->internal)->sub_cursor;
258 }
259 /* Both cursors must still be valid. */
260 if (dbc == NULL || other_dbc == NULL) {
261 __db_errx(env, DB_STR("0692",
262 "Both cursors must be initialized before calling DBC->cmp."));
263 return (EINVAL);
264 }
265
266 if (dbc->dbp != other_dbc->dbp) {
267 *result = 1;
268 return (0);
269 }
270 #endif
271
272 #ifdef HAVE_COMPRESSION
273 if (DB_IS_COMPRESSED(dbc->dbp))
274 return (__bamc_compress_cmp(dbc, other_dbc, result));
275 #endif
276
277 curr_dbc = dbc;
278 curr_odbc = other_dbc;
279 dbc_int = dbc->internal;
280 odbc_int = other_dbc->internal;
281
282 /* Both cursors must be on valid positions. */
283 if (dbc_int->pgno == PGNO_INVALID || odbc_int->pgno == PGNO_INVALID) {
284 __db_errx(env, DB_STR("0692",
285 "Both cursors must be initialized before calling DBC->cmp."));
286 return (EINVAL);
287 }
288
289 /*
290 * Use a loop since cursors can be nested. Off page duplicate
291 * sets can only be nested one level deep, so it is safe to use a
292 * while (true) loop.
293 */
294 while (1) {
295 if (dbc_int->pgno == odbc_int->pgno &&
296 dbc_int->indx == odbc_int->indx) {
297 /*
298 * If one cursor is sitting on an off page duplicate
299 * set, the other will be pointing to the same set. Be
300 * careful, and check anyway.
301 */
302 if (dbc_int->opd != NULL && odbc_int->opd != NULL) {
303 curr_dbc = dbc_int->opd;
304 curr_odbc = odbc_int->opd;
305 dbc_int = dbc_int->opd->internal;
306 odbc_int= odbc_int->opd->internal;
307 continue;
308 } else if (dbc_int->opd == NULL &&
309 odbc_int->opd == NULL)
310 *result = 0;
311 else {
312 __db_errx(env, DB_STR("0694",
313 "DBCursor->cmp mismatched off page duplicate cursor pointers."));
314 return (EINVAL);
315 }
316
317 switch (curr_dbc->dbtype) {
318 case DB_HASH:
319 /*
320 * Make sure that on-page duplicate data
321 * indexes match, and that the deleted
322 * flags are consistent.
323 */
324 ret = __hamc_cmp(curr_dbc, curr_odbc, result);
325 break;
326 case DB_BTREE:
327 case DB_RECNO:
328 /*
329 * Check for consisted deleted flags on btree
330 * specific cursors.
331 */
332 ret = __bamc_cmp(curr_dbc, curr_odbc, result);
333 break;
334 default:
335 /* NO-OP break out. */
336 break;
337 }
338 } else
339 *result = 1;
340 return (ret);
341 }
342 /* NOTREACHED. */
343 return (ret);
344 }
345
346 /*
347 * __dbc_count --
348 * Return a count of duplicate data items.
349 *
350 * PUBLIC: int __dbc_count __P((DBC *, db_recno_t *));
351 */
352 int
__dbc_count(dbc,recnop)353 __dbc_count(dbc, recnop)
354 DBC *dbc;
355 db_recno_t *recnop;
356 {
357 ENV *env;
358 int ret;
359
360 env = dbc->env;
361
362 #ifdef HAVE_PARTITION
363 if (DB_IS_PARTITIONED(dbc->dbp))
364 dbc = ((PART_CURSOR *)dbc->internal)->sub_cursor;
365 #endif
366 /*
367 * Cursor Cleanup Note:
368 * All of the cursors passed to the underlying access methods by this
369 * routine are not duplicated and will not be cleaned up on return.
370 * So, pages/locks that the cursor references must be resolved by the
371 * underlying functions.
372 */
373 switch (dbc->dbtype) {
374 case DB_HEAP:
375 case DB_QUEUE:
376 case DB_RECNO:
377 *recnop = 1;
378 break;
379 case DB_HASH:
380 if (dbc->internal->opd == NULL) {
381 if ((ret = __hamc_count(dbc, recnop)) != 0)
382 return (ret);
383 break;
384 }
385 /* FALLTHROUGH */
386 case DB_BTREE:
387 #ifdef HAVE_COMPRESSION
388 if (DB_IS_COMPRESSED(dbc->dbp))
389 return (__bamc_compress_count(dbc, recnop));
390 #endif
391 if ((ret = __bamc_count(dbc, recnop)) != 0)
392 return (ret);
393 break;
394 case DB_UNKNOWN:
395 default:
396 return (__db_unknown_type(env, "__dbc_count", dbc->dbtype));
397 }
398 return (0);
399 }
400
401 /*
402 * __dbc_del --
403 * DBC->del.
404 *
405 * PUBLIC: int __dbc_del __P((DBC *, u_int32_t));
406 */
407 int
__dbc_del(dbc,flags)408 __dbc_del(dbc, flags)
409 DBC *dbc;
410 u_int32_t flags;
411 {
412 DB *dbp;
413 ENV *env;
414 int ret;
415
416 dbp = dbc->dbp;
417 env = dbp->env;
418
419 CDB_LOCKING_INIT(env, dbc);
420 F_CLR(dbc, DBC_ERROR);
421
422 /*
423 * If we're a secondary index, and DB_UPDATE_SECONDARY isn't set
424 * (which it only is if we're being called from a primary update),
425 * then we need to call through to the primary and delete the item.
426 *
427 * Note that this will delete the current item; we don't need to
428 * delete it ourselves as well, so we can just goto done.
429 */
430 if (flags != DB_UPDATE_SECONDARY && F_ISSET(dbp, DB_AM_SECONDARY)) {
431 ret = __dbc_del_secondary(dbc);
432 goto done;
433 }
434
435 /*
436 * If we are a foreign db, go through and check any foreign key
437 * constraints first, which will make rolling back changes on an abort
438 * simpler.
439 */
440 if (LIST_FIRST(&dbp->f_primaries) != NULL &&
441 (ret = __dbc_del_foreign(dbc)) != 0)
442 goto done;
443
444 /*
445 * If we are a primary and have secondary indices, go through
446 * and delete any secondary keys that point at the current record.
447 */
448 if (DB_IS_PRIMARY(dbp) &&
449 (ret = __dbc_del_primary(dbc)) != 0)
450 goto done;
451
452 #ifdef HAVE_COMPRESSION
453 if (DB_IS_COMPRESSED(dbp))
454 ret = __bamc_compress_del(dbc, flags);
455 else
456 #endif
457 ret = __dbc_idel(dbc, flags);
458
459 done: CDB_LOCKING_DONE(env, dbc);
460
461 if (!DB_RETOK_DBCDEL(ret))
462 F_SET(dbc, DBC_ERROR);
463 return (ret);
464 }
465
466 /*
467 * __dbc_del --
468 * Implemenation of DBC->del.
469 *
470 * PUBLIC: int __dbc_idel __P((DBC *, u_int32_t));
471 */
472 int
__dbc_idel(dbc,flags)473 __dbc_idel(dbc, flags)
474 DBC *dbc;
475 u_int32_t flags;
476 {
477 DB *dbp;
478 DBC *opd;
479 int ret, t_ret;
480
481 COMPQUIET(flags, 0);
482
483 dbp = dbc->dbp;
484
485 /*
486 * Cursor Cleanup Note:
487 * All of the cursors passed to the underlying access methods by this
488 * routine are not duplicated and will not be cleaned up on return.
489 * So, pages/locks that the cursor references must be resolved by the
490 * underlying functions.
491 */
492
493 /*
494 * Off-page duplicate trees are locked in the primary tree, that is,
495 * we acquire a write lock in the primary tree and no locks in the
496 * off-page dup tree. If the del operation is done in an off-page
497 * duplicate tree, call the primary cursor's upgrade routine first.
498 */
499 opd = dbc->internal->opd;
500 if (opd == NULL)
501 ret = dbc->am_del(dbc, flags);
502 else if ((ret = dbc->am_writelock(dbc)) == 0)
503 ret = opd->am_del(opd, flags);
504
505 /*
506 * If this was an update that is supporting dirty reads
507 * then we may have just swapped our read for a write lock
508 * which is held by the surviving cursor. We need
509 * to explicitly downgrade this lock. The closed cursor
510 * may only have had a read lock.
511 */
512 if (ret == 0 && F_ISSET(dbp, DB_AM_READ_UNCOMMITTED) &&
513 dbc->internal->lock_mode == DB_LOCK_WRITE) {
514 if ((ret = __TLPUT(dbc, dbc->internal->lock)) == 0)
515 dbc->internal->lock_mode = DB_LOCK_WWRITE;
516 if (dbc->internal->page != NULL && (t_ret =
517 __memp_shared(dbp->mpf, dbc->internal->page)) != 0 &&
518 ret == 0)
519 ret = t_ret;
520 }
521
522 return (ret);
523 }
524
525 /*
526 * __dbc_db_stream --
527 *
528 * DBC->db_stream
529 *
530 * PUBLIC: int __dbc_db_stream __P((DBC *, DB_STREAM **, u_int32_t));
531 */
532 int
__dbc_db_stream(dbc,dbsp,flags)533 __dbc_db_stream(dbc, dbsp, flags)
534 DBC *dbc;
535 DB_STREAM **dbsp;
536 u_int32_t flags;
537 {
538 ENV *env;
539 int ret;
540 u_int32_t oflags;
541
542 env = dbc->env;
543 oflags = 0;
544
545 if ((ret = __db_fchk(
546 env, "DBC->db_stream", flags,
547 DB_STREAM_READ | DB_STREAM_WRITE | DB_STREAM_SYNC_WRITE)) != 0)
548 return (ret);
549
550 if (DB_IS_READONLY(dbc->dbp)) {
551 LF_SET(DB_STREAM_READ);
552 oflags |= DB_FOP_READONLY;
553 }
554 if (LF_ISSET(DB_STREAM_READ) && LF_ISSET(DB_STREAM_WRITE)) {
555 ret = USR_ERR(env, EINVAL);
556 __db_errx(env, DB_STR("0750",
557 "Error, cannot set both DB_STREAM_WRITE and DB_STREAM_READ."));
558 goto err;
559 }
560
561 if (flags & DB_STREAM_READ)
562 oflags |= DB_FOP_READONLY;
563 else
564 oflags |= DB_FOP_WRITE;
565 if (flags & DB_STREAM_SYNC_WRITE)
566 oflags |= DB_FOP_SYNC_WRITE;
567
568 ret = __db_stream_init(dbc, dbsp, oflags);
569
570 err: return (ret);
571 }
572
573 /*
574 * __dbc_get_blob_id --
575 *
576 * Returns the blob id stored in the data record to which the cursor currently
577 * points. Returns EINVAL if the cursor does not point to a blob record.
578 *
579 * PUBLIC: int __dbc_get_blob_id __P((DBC *, db_seq_t *));
580 */
581 int
__dbc_get_blob_id(dbc,blob_id)582 __dbc_get_blob_id(dbc, blob_id)
583 DBC *dbc;
584 db_seq_t *blob_id;
585 {
586 DBT key, data;
587 BBLOB bl;
588 HBLOB hbl;
589 HEAPBLOBHDR bhdr;
590 int ret;
591
592 if (dbc->dbtype != DB_BTREE &&
593 dbc->dbtype != DB_HEAP && dbc->dbtype != DB_HASH) {
594 return (EINVAL);
595 }
596
597 ret = 0;
598 memset(&key, 0, sizeof(DBT));
599 memset(&data, 0, sizeof(DBT));
600 /* Get the blob database record instead of the blob. */
601 data.flags |= DB_DBT_BLOB_REC;
602
603 /*
604 * It would be great if there was a more efficient way to do this, but
605 * the complexities of getting a page from a database, especially
606 * when taking into account things like partitions and compression,
607 * make that more trouble than it is worth.
608 */
609 if ((ret = __dbc_get(dbc, &key, &data, DB_CURRENT)) != 0)
610 goto err;
611
612 switch (dbc->dbtype) {
613 case DB_BTREE:
614 if (data.size != BBLOB_SIZE) {
615 ret = USR_ERR(dbc->env, EINVAL);
616 goto err;
617 }
618 memcpy(&bl, data.data, BBLOB_SIZE);
619 if (B_TYPE(bl.type) != B_BLOB) {
620 ret = USR_ERR(dbc->env, EINVAL);
621 goto err;
622 }
623 *blob_id = (db_seq_t)bl.id;
624 break;
625 case DB_HEAP:
626 if (data.size != HEAPBLOBREC_SIZE) {
627 ret = USR_ERR(dbc->env, EINVAL);
628 goto err;
629 }
630 memcpy(&bhdr, data.data, HEAPBLOBREC_SIZE);
631 if (!F_ISSET(&bhdr.std_hdr, HEAP_RECBLOB)) {
632 ret = USR_ERR(dbc->env, EINVAL);
633 goto err;
634 }
635 *blob_id = (db_seq_t)bhdr.id;
636 break;
637 case DB_HASH:
638 if (data.size != HBLOB_SIZE) {
639 ret = USR_ERR(dbc->env, EINVAL);
640 goto err;
641 }
642 memcpy(&hbl, data.data, HBLOB_SIZE);
643 if (HPAGE_PTYPE(&hbl) != H_BLOB) {
644 ret = USR_ERR(dbc->env, EINVAL);
645 goto err;
646 }
647 *blob_id = (db_seq_t)hbl.id;
648 break;
649 default:
650 ret = USR_ERR(dbc->env, EINVAL);
651 goto err;
652 }
653
654 err: return (ret);
655 }
656
657 /*
658 * __dbc_get_blob_size --
659 *
660 * Returns the blob file size stored in the data record to which the cursor
661 * currently points. Returns EINVAL if the cursor does not point to a blob
662 * record.
663 *
664 * PUBLIC: int __dbc_get_blob_size __P((DBC *, off_t *));
665 */
666 int
__dbc_get_blob_size(dbc,size)667 __dbc_get_blob_size(dbc, size)
668 DBC *dbc;
669 off_t *size;
670 {
671 DBT key, data;
672 ENV *env;
673 BBLOB bl;
674 HBLOB hbl;
675 HEAPBLOBHDR bhdr;
676 int ret;
677
678 if (dbc->dbtype != DB_BTREE &&
679 dbc->dbtype != DB_HEAP && dbc->dbtype != DB_HASH) {
680 return (EINVAL);
681 }
682
683 env = dbc->env;
684 ret = 0;
685 memset(&key, 0, sizeof(DBT));
686 memset(&data, 0, sizeof(DBT));
687 /* Get the blob database record instead of the blob. */
688 data.flags |= DB_DBT_BLOB_REC;
689
690 /*
691 * It would be great if there was a more efficient way to do this, but
692 * the complexities of getting a page from a database, especially
693 * when taking into account things like partitions and compression,
694 * make that more trouble than it is worth.
695 */
696 if ((ret = __dbc_get(dbc, &key, &data, DB_CURRENT)) != 0)
697 goto err;
698
699 switch (dbc->dbtype) {
700 case DB_BTREE:
701 if (data.size != BBLOB_SIZE) {
702 ret = USR_ERR(dbc->env, EINVAL);
703 goto err;
704 }
705 memcpy(&bl, data.data, BBLOB_SIZE);
706 if (B_TYPE(bl.type) != B_BLOB) {
707 ret = USR_ERR(dbc->env, EINVAL);
708 goto err;
709 }
710 GET_BLOB_SIZE(env, bl, *size, ret);
711 break;
712 case DB_HEAP:
713 if (data.size != HEAPBLOBREC_SIZE) {
714 ret = USR_ERR(dbc->env, EINVAL);
715 goto err;
716 }
717 memcpy(&bhdr, data.data, HEAPBLOBREC_SIZE);
718 if (!F_ISSET(&bhdr.std_hdr, HEAP_RECBLOB)) {
719 ret = USR_ERR(dbc->env, EINVAL);
720 goto err;
721 }
722 GET_BLOB_SIZE(env, bhdr, *size, ret);
723 break;
724 case DB_HASH:
725 if (data.size != HBLOB_SIZE) {
726 ret = USR_ERR(dbc->env, EINVAL);
727 goto err;
728 }
729 memcpy(&hbl, data.data, HBLOB_SIZE);
730 if (HPAGE_PTYPE(&hbl) != H_BLOB) {
731 ret = USR_ERR(dbc->env, EINVAL);
732 goto err;
733 }
734 GET_BLOB_SIZE(env, hbl, *size, ret);
735 break;
736 default:
737 ret = USR_ERR(dbc->env, EINVAL);
738 goto err;
739 }
740
741 err: return (ret);
742 }
743
744 /*
745 * __dbc_set_blob_size --
746 *
747 * Sets the blob file size in the data record to which the cursor
748 * currently points. Returns EINVAL if the cursor does not point to a blob
749 * record.
750 *
751 * PUBLIC: int __dbc_set_blob_size __P((DBC *, off_t));
752 */
753 int
__dbc_set_blob_size(dbc,size)754 __dbc_set_blob_size(dbc, size)
755 DBC *dbc;
756 off_t size;
757 {
758 DBT key, data;
759 BBLOB *bl;
760 HBLOB *hbl;
761 HEAPBLOBHDR *bhdr;
762 int ret;
763
764 if (dbc->dbtype != DB_BTREE &&
765 dbc->dbtype != DB_HEAP && dbc->dbtype != DB_HASH) {
766 return (EINVAL);
767 }
768
769 ret = 0;
770 memset(&key, 0, sizeof(DBT));
771 memset(&data, 0, sizeof(DBT));
772 /* Get the blob database record instead of the blob. */
773 data.flags |= DB_DBT_BLOB_REC;
774
775 /*
776 * It would be great if there was a more efficient way to do this, but
777 * the complexities of getting a page from a database, especially
778 * when taking into account things like partitions and compression,
779 * make that more trouble than it is worth.
780 */
781 if ((ret = __dbc_get(dbc, &key, &data, DB_CURRENT)) != 0)
782 goto err;
783
784 switch (dbc->dbtype) {
785 case DB_BTREE:
786 bl = (BBLOB *)data.data;
787 if (bl == NULL ||
788 B_TYPE(bl->type) != B_BLOB || data.size != BBLOB_SIZE) {
789 ret = USR_ERR(dbc->env, EINVAL);
790 goto err;
791 }
792 SET_BLOB_SIZE(bl, size, BBLOB);
793 break;
794 case DB_HEAP:
795 bhdr = (HEAPBLOBHDR *)data.data;
796 if (bhdr == NULL ||
797 !F_ISSET(&bhdr->std_hdr, HEAP_RECBLOB) ||
798 data.size != HEAPBLOBREC_SIZE) {
799 ret = USR_ERR(dbc->env, EINVAL);
800 goto err;
801 }
802 SET_BLOB_SIZE(bhdr, size, HEAPBLOBHDR);
803 break;
804 case DB_HASH:
805 hbl = data.data;
806 if (hbl == NULL ||
807 HPAGE_PTYPE(hbl) != H_BLOB || data.size != HBLOB_SIZE) {
808 ret = USR_ERR(dbc->env, EINVAL);
809 goto err;
810 }
811 SET_BLOB_SIZE((HBLOB *)hbl, size, HBLOB);
812 break;
813 default:
814 ret = USR_ERR(dbc->env, EINVAL);
815 goto err;
816 }
817
818 if ((ret = __dbc_put(dbc, &key, &data, DB_CURRENT)) != 0)
819 goto err;
820
821 err: return (ret);
822 }
823
#ifdef HAVE_COMPRESSION
/*
 * __dbc_bulk_del --
 *	Bulk del for a cursor.
 *
 *	Only implemented for compressed BTrees; the work is done by
 *	__bamc_compress_bulk_del(), whose result is returned.  This routine
 *	lives in this file so that it can use the CDB_LOCKING_* macros.
 *
 * PUBLIC: #ifdef HAVE_COMPRESSION
 * PUBLIC: int __dbc_bulk_del __P((DBC *, DBT *, u_int32_t));
 * PUBLIC: #endif
 */
int
__dbc_bulk_del(dbc, key, flags)
	DBC *dbc;
	DBT *key;
	u_int32_t flags;
{
	int ret;

	DB_ASSERT(dbc->env, DB_IS_COMPRESSED(dbc->dbp));

	/* May return from this function if CDB locking cannot be set up. */
	CDB_LOCKING_INIT(dbc->env, dbc);
	F_CLR(dbc, DBC_ERROR);

	ret = __bamc_compress_bulk_del(dbc, key, flags);

	CDB_LOCKING_DONE(dbc->env, dbc);

	return (ret);
}
#endif
859
860 /*
861 * __dbc_dup --
862 * Duplicate a cursor
863 *
864 * PUBLIC: int __dbc_dup __P((DBC *, DBC **, u_int32_t));
865 */
866 int
__dbc_dup(dbc_orig,dbcp,flags)867 __dbc_dup(dbc_orig, dbcp, flags)
868 DBC *dbc_orig;
869 DBC **dbcp;
870 u_int32_t flags;
871 {
872 DBC *dbc_n, *dbc_nopd;
873 int ret;
874
875 dbc_n = dbc_nopd = NULL;
876
877 /* Allocate a new cursor and initialize it. */
878 if ((ret = __dbc_idup(dbc_orig, &dbc_n, flags)) != 0)
879 goto err;
880 *dbcp = dbc_n;
881
882 /*
883 * If the cursor references an off-page duplicate tree, allocate a
884 * new cursor for that tree and initialize it.
885 */
886 if (dbc_orig->internal->opd != NULL) {
887 if ((ret =
888 __dbc_idup(dbc_orig->internal->opd, &dbc_nopd, flags)) != 0)
889 goto err;
890 dbc_n->internal->opd = dbc_nopd;
891 dbc_nopd->internal->pdbc = dbc_n;
892 }
893 return (0);
894
895 err: if (dbc_n != NULL)
896 (void)__dbc_close(dbc_n);
897 if (dbc_nopd != NULL)
898 (void)__dbc_close(dbc_nopd);
899
900 return (ret);
901 }
902
903 /*
904 * __dbc_idup --
905 * Internal version of __dbc_dup.
906 *
907 * PUBLIC: int __dbc_idup __P((DBC *, DBC **, u_int32_t));
908 */
909 int
__dbc_idup(dbc_orig,dbcp,flags)910 __dbc_idup(dbc_orig, dbcp, flags)
911 DBC *dbc_orig, **dbcp;
912 u_int32_t flags;
913 {
914 DB *dbp;
915 DBC *dbc_n;
916 DBC_INTERNAL *int_n, *int_orig;
917 ENV *env;
918 int ret;
919
920 dbp = dbc_orig->dbp;
921 dbc_n = *dbcp;
922 env = dbp->env;
923
924 if ((ret = __db_cursor_int(dbp, dbc_orig->thread_info,
925 dbc_orig->txn, dbc_orig->dbtype, dbc_orig->internal->root,
926 F_ISSET(dbc_orig, DBC_OPD) | DBC_DUPLICATE,
927 dbc_orig->locker, &dbc_n)) != 0)
928 return (ret);
929
930 /* Position the cursor if requested, acquiring the necessary locks. */
931 if (LF_ISSET(DB_POSITION)) {
932 int_n = dbc_n->internal;
933 int_orig = dbc_orig->internal;
934
935 dbc_n->flags |= dbc_orig->flags & ~DBC_OWN_LID;
936
937 int_n->indx = int_orig->indx;
938 int_n->pgno = int_orig->pgno;
939 int_n->root = int_orig->root;
940 int_n->lock_mode = int_orig->lock_mode;
941
942 int_n->stream_start_pgno = int_orig->stream_start_pgno;
943 int_n->stream_off = int_orig->stream_off;
944 int_n->stream_curr_pgno = int_orig->stream_curr_pgno;
945
946 #ifdef HAVE_PARTITION
947 if (DB_IS_PARTITIONED(dbp)) {
948 if ((ret = __partc_dup(dbc_orig, dbc_n)) != 0)
949 goto err;
950 } else
951 #endif
952 switch (dbc_orig->dbtype) {
953 case DB_QUEUE:
954 if ((ret = __qamc_dup(dbc_orig, dbc_n)) != 0)
955 goto err;
956 break;
957 case DB_BTREE:
958 case DB_RECNO:
959 if ((ret = __bamc_dup(dbc_orig, dbc_n, flags)) != 0)
960 goto err;
961 break;
962 case DB_HASH:
963 if ((ret = __hamc_dup(dbc_orig, dbc_n)) != 0)
964 goto err;
965 break;
966 case DB_HEAP:
967 if ((ret = __heapc_dup(dbc_orig, dbc_n)) != 0)
968 goto err;
969 break;
970 case DB_UNKNOWN:
971 default:
972 ret = __db_unknown_type(env,
973 "__dbc_idup", dbc_orig->dbtype);
974 goto err;
975 }
976 } else if (F_ISSET(dbc_orig, DBC_BULK)) {
977 /*
978 * For bulk cursors, remember what page were on, even if we
979 * don't know that the next operation will be nearby.
980 */
981 dbc_n->internal->pgno = dbc_orig->internal->pgno;
982 }
983
984 /* Copy the locking flags to the new cursor. */
985 F_SET(dbc_n, F_ISSET(dbc_orig, DBC_BULK |
986 DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED | DBC_WRITECURSOR));
987
988 /*
989 * If we're in CDB and this isn't an offpage dup cursor, then
990 * we need to get a lock for the duplicated cursor.
991 */
992 if (CDB_LOCKING(env) && !F_ISSET(dbc_n, DBC_OPD) &&
993 (ret = __lock_get(env, dbc_n->locker, 0,
994 &dbc_n->lock_dbt, F_ISSET(dbc_orig, DBC_WRITECURSOR) ?
995 DB_LOCK_IWRITE : DB_LOCK_READ, &dbc_n->mylock)) != 0)
996 goto err;
997
998 dbc_n->priority = dbc_orig->priority;
999 dbc_n->internal->pdbc = dbc_orig->internal->pdbc;
1000 *dbcp = dbc_n;
1001 return (0);
1002
1003 err: (void)__dbc_close(dbc_n);
1004 return (ret);
1005 }
1006
1007 /*
1008 * __dbc_newopd --
1009 * Create a new off-page duplicate cursor.
1010 *
1011 * PUBLIC: int __dbc_newopd __P((DBC *, db_pgno_t, DBC *, DBC **));
1012 */
1013 int
__dbc_newopd(dbc_parent,root,oldopd,dbcp)1014 __dbc_newopd(dbc_parent, root, oldopd, dbcp)
1015 DBC *dbc_parent;
1016 db_pgno_t root;
1017 DBC *oldopd;
1018 DBC **dbcp;
1019 {
1020 DB *dbp;
1021 DBC *opd;
1022 DBTYPE dbtype;
1023 int ret;
1024
1025 dbp = dbc_parent->dbp;
1026 dbtype = (dbp->dup_compare == NULL) ? DB_RECNO : DB_BTREE;
1027
1028 /*
1029 * On failure, we want to default to returning the old off-page dup
1030 * cursor, if any; our caller can't be left with a dangling pointer
1031 * to a freed cursor. On error the only allowable behavior is to
1032 * close the cursor (and the old OPD cursor it in turn points to), so
1033 * this should be safe.
1034 */
1035 *dbcp = oldopd;
1036
1037 if ((ret = __db_cursor_int(dbp, dbc_parent->thread_info,
1038 dbc_parent->txn,
1039 dbtype, root, DBC_OPD, dbc_parent->locker, &opd)) != 0)
1040 return (ret);
1041
1042 opd->priority = dbc_parent->priority;
1043 opd->internal->pdbc = dbc_parent;
1044 *dbcp = opd;
1045
1046 /*
1047 * Check to see if we already have an off-page dup cursor that we've
1048 * passed in. If we do, close it. It'd be nice to use it again
1049 * if it's a cursor belonging to the right tree, but if we're doing
1050 * a cursor-relative operation this might not be safe, so for now
1051 * we'll take the easy way out and always close and reopen.
1052 *
1053 * Note that under no circumstances do we want to close the old
1054 * cursor without returning a valid new one; we don't want to
1055 * leave the main cursor in our caller with a non-NULL pointer
1056 * to a freed off-page dup cursor.
1057 */
1058 if (oldopd != NULL && (ret = __dbc_close(oldopd)) != 0)
1059 return (ret);
1060
1061 return (0);
1062 }
1063
1064 /*
1065 * __dbc_get --
1066 * Get using a cursor.
1067 *
1068 * PUBLIC: int __dbc_get __P((DBC *, DBT *, DBT *, u_int32_t));
1069 */
1070 int
__dbc_get(dbc,key,data,flags)1071 __dbc_get(dbc, key, data, flags)
1072 DBC *dbc;
1073 DBT *key, *data;
1074 u_int32_t flags;
1075 {
1076 F_CLR(dbc, DBC_ERROR);
1077 #ifdef HAVE_PARTITION
1078 if (F_ISSET(dbc, DBC_PARTITIONED))
1079 return (__partc_get(dbc, key, data, flags));
1080 #endif
1081
1082 #ifdef HAVE_COMPRESSION
1083 if (DB_IS_COMPRESSED(dbc->dbp))
1084 return (__bamc_compress_get(dbc, key, data, flags));
1085 #endif
1086
1087 return (__dbc_iget(dbc, key, data, flags));
1088 }
1089
1090 /*
1091 * __dbc_iget --
1092 * Implementation of get using a cursor.
1093 *
1094 * PUBLIC: int __dbc_iget __P((DBC *, DBT *, DBT *, u_int32_t));
1095 */
1096 int
__dbc_iget(dbc,key,data,flags)1097 __dbc_iget(dbc, key, data, flags)
1098 DBC *dbc;
1099 DBT *key, *data;
1100 u_int32_t flags;
1101 {
1102 DB *dbp;
1103 DBC *ddbc, *dbc_n, *opd;
1104 DBC_INTERNAL *cp, *cp_n;
1105 DB_MPOOLFILE *mpf;
1106 ENV *env;
1107 db_pgno_t pgno;
1108 db_indx_t indx_off;
1109 u_int32_t multi, orig_ulen, tmp_flags, tmp_read_locking, tmp_rmw;
1110 u_int8_t type;
1111 int key_small, ret, t_ret;
1112
1113 COMPQUIET(orig_ulen, 0);
1114
1115 dbc->cur_key = key;
1116 key_small = 0;
1117
1118 /*
1119 * Cursor Cleanup Note:
1120 * All of the cursors passed to the underlying access methods by this
1121 * routine are duplicated cursors. On return, any referenced pages
1122 * will be discarded, and, if the cursor is not intended to be used
1123 * again, the close function will be called. So, pages/locks that
1124 * the cursor references do not need to be resolved by the underlying
1125 * functions.
1126 */
1127 dbp = dbc->dbp;
1128 env = dbp->env;
1129 mpf = dbp->mpf;
1130 dbc_n = NULL;
1131 opd = NULL;
1132
1133 PERFMON6(env, db, get, dbp->fname, dbp->dname,
1134 dbc->txn == NULL ? 0 : dbc->txn->txnid, key, data, flags);
1135
1136 /* Clear OR'd in additional bits so we can check for flag equality. */
1137 tmp_rmw = LF_ISSET(DB_RMW);
1138 LF_CLR(DB_RMW);
1139
1140 SET_READ_LOCKING_FLAGS(dbc, tmp_read_locking);
1141
1142 multi = LF_ISSET(DB_MULTIPLE|DB_MULTIPLE_KEY);
1143 LF_CLR(DB_MULTIPLE|DB_MULTIPLE_KEY);
1144
1145 /*
1146 * Return a cursor's record number. It has nothing to do with the
1147 * cursor get code except that it was put into the interface.
1148 */
1149 if (flags == DB_GET_RECNO) {
1150 if (tmp_rmw)
1151 F_SET(dbc, DBC_RMW);
1152 F_SET(dbc, tmp_read_locking);
1153 ret = __bamc_rget(dbc, data);
1154 if (tmp_rmw)
1155 F_CLR(dbc, DBC_RMW);
1156 /* Clear the temp flags, but leave WAS_READ_COMMITTED. */
1157 F_CLR(dbc, tmp_read_locking & ~DBC_WAS_READ_COMMITTED);
1158 return (ret);
1159 }
1160
1161 if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
1162 CDB_LOCKING_INIT(env, dbc);
1163
1164 /* Don't return the key or data if it was passed to us. */
1165 if (!DB_RETURNS_A_KEY(dbp, flags))
1166 F_SET(key, DB_DBT_ISSET);
1167 if (flags == DB_GET_BOTH &&
1168 (dbp->dup_compare == NULL || dbp->dup_compare == __dbt_defcmp))
1169 F_SET(data, DB_DBT_ISSET);
1170
1171 /*
1172 * If we have an off-page duplicates cursor, and the operation applies
1173 * to it, perform the operation. Duplicate the cursor and call the
1174 * underlying function.
1175 *
1176 * Off-page duplicate trees are locked in the primary tree, that is,
1177 * we acquire a write lock in the primary tree and no locks in the
1178 * off-page dup tree. If the DB_RMW flag was specified and the get
1179 * operation is done in an off-page duplicate tree, call the primary
1180 * cursor's upgrade routine first. We fetch the primary tree's data
1181 * page to follow the buffer latching order rules for btrees: latch from
1182 * the top of the main tree down, even when also searching OPD trees.
1183 * Deadlocks could otherwise occur if we need to fetch the main page
1184 * while an OPD page is latched. [#22532]
1185 */
1186 cp = dbc->internal;
1187 if (cp->opd != NULL &&
1188 (flags == DB_CURRENT || flags == DB_GET_BOTHC ||
1189 flags == DB_NEXT || flags == DB_NEXT_DUP ||
1190 flags == DB_PREV || flags == DB_PREV_DUP)) {
1191 if (tmp_rmw && (ret = dbc->am_writelock(dbc)) != 0)
1192 goto err;
1193 if (cp->page == NULL && (ret = __memp_fget(mpf, &cp->pgno,
1194 dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
1195 goto err;
1196
1197 if (F_ISSET(dbc, DBC_TRANSIENT))
1198 opd = cp->opd;
1199 else if ((ret = __dbc_idup(cp->opd, &opd, DB_POSITION)) != 0)
1200 goto err;
1201
1202 if ((ret = opd->am_get(opd, key, data, flags, NULL)) == 0)
1203 goto done;
1204 /*
1205 * Another cursor may have deleted all of the off-page
1206 * duplicates, so for operations that are moving a cursor, we
1207 * need to skip the empty tree and retry on the parent cursor.
1208 */
1209 if (ret == DB_NOTFOUND &&
1210 (flags == DB_PREV || flags == DB_NEXT)) {
1211 ret = __dbc_close(opd);
1212 opd = NULL;
1213 if (F_ISSET(dbc, DBC_TRANSIENT))
1214 cp->opd = NULL;
1215 }
1216 if (ret != 0)
1217 goto err;
1218 } else if (cp->opd != NULL && F_ISSET(dbc, DBC_TRANSIENT)) {
1219 if ((ret = __dbc_close(cp->opd)) != 0)
1220 goto err;
1221 cp->opd = NULL;
1222 }
1223
1224 /*
1225 * Perform an operation on the main cursor. Duplicate the cursor,
1226 * upgrade the lock as required, and call the underlying function.
1227 */
1228 switch (flags) {
1229 case DB_CURRENT:
1230 case DB_GET_BOTHC:
1231 case DB_NEXT:
1232 case DB_NEXT_DUP:
1233 case DB_NEXT_NODUP:
1234 case DB_PREV:
1235 case DB_PREV_DUP:
1236 case DB_PREV_NODUP:
1237 tmp_flags = DB_POSITION;
1238 break;
1239 default:
1240 tmp_flags = 0;
1241 break;
1242 }
1243
1244 /*
1245 * If this cursor is going to be closed immediately, we don't
1246 * need to take precautions to clean it up on error.
1247 */
1248 if (F_ISSET(dbc, DBC_TRANSIENT | DBC_PARTITIONED))
1249 dbc_n = dbc;
1250 else {
1251 ret = __dbc_idup(dbc, &dbc_n, tmp_flags);
1252
1253 if (ret != 0)
1254 goto err;
1255 COPY_RET_MEM(dbc, dbc_n);
1256 }
1257
1258 if (tmp_rmw)
1259 F_SET(dbc_n, DBC_RMW);
1260 F_SET(dbc_n, tmp_read_locking);
1261
1262 switch (multi) {
1263 case DB_MULTIPLE:
1264 F_SET(dbc_n, DBC_MULTIPLE);
1265 break;
1266 case DB_MULTIPLE_KEY:
1267 F_SET(dbc_n, DBC_MULTIPLE_KEY);
1268 break;
1269 case DB_MULTIPLE | DB_MULTIPLE_KEY:
1270 F_SET(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
1271 break;
1272 case 0:
1273 default:
1274 break;
1275 }
1276
1277 retry: pgno = PGNO_INVALID;
1278 ret = dbc_n->am_get(dbc_n, key, data, flags, &pgno);
1279 if (tmp_rmw)
1280 F_CLR(dbc_n, DBC_RMW);
1281 /*
1282 * Clear the temporary locking flags in the new cursor. The user's
1283 * (old) cursor needs to have the WAS_READ_COMMITTED flag because this
1284 * is used on the next call on that cursor.
1285 */
1286 F_CLR(dbc_n, tmp_read_locking);
1287 F_SET(dbc, tmp_read_locking & DBC_WAS_READ_COMMITTED);
1288 F_CLR(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
1289 if (ret != 0)
1290 goto err;
1291
1292 cp_n = dbc_n->internal;
1293
1294 /*
1295 * We may be referencing a new off-page duplicates tree. Acquire
1296 * a new cursor and call the underlying function.
1297 */
1298 if (pgno != PGNO_INVALID) {
1299 if ((ret = __dbc_newopd(dbc,
1300 pgno, cp_n->opd, &cp_n->opd)) != 0)
1301 goto err;
1302
1303 switch (flags) {
1304 case DB_FIRST:
1305 case DB_NEXT:
1306 case DB_NEXT_NODUP:
1307 case DB_SET:
1308 case DB_SET_RECNO:
1309 case DB_SET_RANGE:
1310 tmp_flags = DB_FIRST;
1311 break;
1312 case DB_LAST:
1313 case DB_PREV:
1314 case DB_PREV_NODUP:
1315 tmp_flags = DB_LAST;
1316 break;
1317 case DB_GET_BOTH:
1318 case DB_GET_BOTHC:
1319 case DB_GET_BOTH_RANGE:
1320 tmp_flags = flags;
1321 break;
1322 default:
1323 ret = __db_unknown_flag(env, "__dbc_get", flags);
1324 goto err;
1325 }
1326 ret = cp_n->opd->am_get(cp_n->opd, key, data, tmp_flags, NULL);
1327 /*
1328 * Another cursor may have deleted all of the off-page
1329 * duplicates, so for operations that are moving a cursor, we
1330 * need to skip the empty tree and retry on the parent cursor.
1331 */
1332 if (ret == DB_NOTFOUND) {
1333 PERFMON5(env, race, dbc_get,
1334 dbp->fname, dbp->dname, ret, tmp_flags, key);
1335
1336 switch (flags) {
1337 case DB_FIRST:
1338 case DB_NEXT:
1339 case DB_NEXT_NODUP:
1340 flags = DB_NEXT;
1341 break;
1342 case DB_LAST:
1343 case DB_PREV:
1344 case DB_PREV_NODUP:
1345 flags = DB_PREV;
1346 break;
1347 default:
1348 goto err;
1349 }
1350
1351 ret = __dbc_close(cp_n->opd);
1352 cp_n->opd = NULL;
1353 if (ret == 0)
1354 goto retry;
1355 }
1356 if (ret != 0)
1357 goto err;
1358 }
1359
1360 done: /*
1361 * Return a key/data item. The only exception is that we don't return
1362 * a key if the user already gave us one, that is, if the DB_SET flag
1363 * was set. The DB_SET flag is necessary. In a Btree, the user's key
1364 * doesn't have to be the same as the key stored the tree, depending on
1365 * the magic performed by the comparison function. As we may not have
1366 * done any key-oriented operation here, the page reference may not be
1367 * valid. Fill it in as necessary. We don't have to worry about any
1368 * locks, the cursor must already be holding appropriate locks.
1369 *
1370 * !!!
1371 * If not a Btree and DB_SET_RANGE is set, we shouldn't return a key
1372 * either, should we?
1373 */
1374 cp_n = dbc_n == NULL ? dbc->internal : dbc_n->internal;
1375 if (!F_ISSET(key, DB_DBT_ISSET)) {
1376 if (cp_n->page == NULL && (ret = __memp_fget(mpf, &cp_n->pgno,
1377 dbc->thread_info, dbc->txn, 0, &cp_n->page)) != 0)
1378 goto err;
1379
1380 if ((ret = __db_ret(dbc, cp_n->page, cp_n->indx, key,
1381 &dbc->rkey->data, &dbc->rkey->ulen)) != 0) {
1382 /*
1383 * If the key DBT is too small, we still want to return
1384 * the size of the data. Otherwise applications are
1385 * forced to check each one with a separate call. We
1386 * don't want to copy the data, so we set the ulen to
1387 * zero before calling __db_ret.
1388 */
1389 if (ret == DB_BUFFER_SMALL &&
1390 F_ISSET(data, DB_DBT_USERMEM)) {
1391 key_small = 1;
1392 orig_ulen = data->ulen;
1393 data->ulen = 0;
1394 } else
1395 goto err;
1396 }
1397 }
1398 if (multi != 0 && dbc->am_bulk != NULL) {
1399 /*
1400 * Even if fetching from the OPD cursor we need a duplicate
1401 * primary cursor if we are going after multiple keys.
1402 */
1403 if (dbc_n == NULL) {
1404 /*
1405 * Non-"_KEY" DB_MULTIPLE doesn't move the main cursor,
1406 * so it's safe to just use dbc, unless the cursor
1407 * has an open off-page duplicate cursor whose state
1408 * might need to be preserved.
1409 */
1410 if ((!(multi & DB_MULTIPLE_KEY) &&
1411 dbc->internal->opd == NULL) ||
1412 F_ISSET(dbc, DBC_TRANSIENT | DBC_PARTITIONED))
1413 dbc_n = dbc;
1414 else {
1415 if ((ret = __dbc_idup(dbc,
1416 &dbc_n, DB_POSITION)) != 0)
1417 goto err;
1418 if ((ret = dbc_n->am_get(dbc_n,
1419 key, data, DB_CURRENT, &pgno)) != 0)
1420 goto err;
1421 }
1422 cp_n = dbc_n->internal;
1423 }
1424
1425 /*
1426 * If opd is set then we dupped the opd that we came in with.
1427 * When we return we may have a new opd if we went to another
1428 * key.
1429 */
1430 if (opd != NULL) {
1431 DB_ASSERT(env, cp_n->opd == NULL);
1432 cp_n->opd = opd;
1433 opd = NULL;
1434 }
1435
1436 /*
1437 * Bulk get doesn't use __db_retcopy, so data.size won't
1438 * get set up unless there is an error. Assume success
1439 * here. This is the only call to am_bulk, and it avoids
1440 * setting it exactly the same everywhere. If we have an
1441 * DB_BUFFER_SMALL error, it'll get overwritten with the
1442 * needed value.
1443 */
1444 data->size = data->ulen;
1445 ret = dbc_n->am_bulk(dbc_n, data, flags | multi);
1446 } else if (!F_ISSET(data, DB_DBT_ISSET)) {
1447 ddbc = opd != NULL ? opd :
1448 cp_n->opd != NULL ? cp_n->opd : dbc_n;
1449 cp = ddbc->internal;
1450 if (cp->page == NULL &&
1451 (ret = __memp_fget(mpf, &cp->pgno,
1452 dbc->thread_info, ddbc->txn, 0, &cp->page)) != 0)
1453 goto err;
1454
1455 type = TYPE(cp->page);
1456 indx_off = ((type == P_LBTREE ||
1457 type == P_HASH || type == P_HASH_UNSORTED) ? O_INDX : 0);
1458 ret = __db_ret(ddbc, cp->page, cp->indx + indx_off,
1459 data, &dbc->rdata->data, &dbc->rdata->ulen);
1460 }
1461
1462 err: /* Don't pass DB_DBT_ISSET back to application level, error or no. */
1463 F_CLR(key, DB_DBT_ISSET);
1464 F_CLR(data, DB_DBT_ISSET);
1465
1466 /* Cleanup and cursor resolution. */
1467 if (opd != NULL) {
1468 /*
1469 * To support dirty reads we must reget the write lock
1470 * if we have just stepped off a deleted record.
1471 * Since the OPD cursor does not know anything
1472 * about the referencing page or cursor we need
1473 * to peek at the OPD cursor and get the lock here.
1474 */
1475 if (F_ISSET(dbp, DB_AM_READ_UNCOMMITTED) &&
1476 F_ISSET((BTREE_CURSOR *)
1477 dbc->internal->opd->internal, C_DELETED))
1478 if ((t_ret =
1479 dbc->am_writelock(dbc)) != 0 && ret == 0)
1480 ret = t_ret;
1481 if ((t_ret = __dbc_cleanup(
1482 dbc->internal->opd, opd, ret)) != 0 && ret == 0)
1483 ret = t_ret;
1484 }
1485
1486 if (key_small) {
1487 data->ulen = orig_ulen;
1488 if (ret == 0)
1489 ret = DB_BUFFER_SMALL;
1490 }
1491
1492 if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 &&
1493 (ret == 0 || ret == DB_BUFFER_SMALL))
1494 ret = t_ret;
1495
1496 if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
1497 CDB_LOCKING_DONE(env, dbc);
1498 return (ret);
1499 }
1500
/*
 * State bits carried in the u_int32_t "put state" word that the
 * __dbc_put_* helpers pass to one another.
 */
#define	DBC_PUT_RMW		0x001	/* Add DB_RMW to internal gets. */
#define	DBC_PUT_NODEL		0x002	/* No existing record is replaced. */
#define	DBC_PUT_HAVEREC		0x004	/* Old record was already looked up. */
1505
1506 /*
1507 * __dbc_put_resolve_key --
1508 * Get the current key and data so that we can correctly update the
1509 * secondary and foreign databases.
1510 */
1511 static inline int
__dbc_put_resolve_key(dbc,oldkey,olddata,put_statep,flags)1512 __dbc_put_resolve_key(dbc, oldkey, olddata, put_statep, flags)
1513 DBC *dbc;
1514 DBT *oldkey, *olddata;
1515 u_int32_t flags, *put_statep;
1516 {
1517 int ret, rmw;
1518
1519 rmw = FLD_ISSET(*put_statep, DBC_PUT_RMW) ? DB_RMW : 0;
1520
1521 DB_ASSERT(dbc->env, flags == DB_CURRENT);
1522 COMPQUIET(flags, 0);
1523
1524 /*
1525 * This is safe to do on the cursor we already have;
1526 * error or no, it won't move.
1527 *
1528 * We use DB_RMW for all of these gets because we'll be
1529 * writing soon enough in the "normal" put code. In
1530 * transactional databases we'll hold those write locks
1531 * even if we close the cursor we're reading with.
1532 *
1533 * The DB_KEYEMPTY return needs special handling -- if the
1534 * cursor is on a deleted key, we return DB_NOTFOUND.
1535 */
1536 memset(oldkey, 0, sizeof(DBT));
1537 if ((ret = __dbc_get(dbc, oldkey, olddata, rmw | DB_CURRENT)) != 0)
1538 return (ret == DB_KEYEMPTY ? DB_NOTFOUND : ret);
1539
1540 /* Record that we've looked for the old record. */
1541 FLD_SET(*put_statep, DBC_PUT_HAVEREC);
1542 return (0);
1543 }
1544
1545 /*
1546 * __dbc_put_append --
1547 * Handle an append to a primary.
1548 */
1549 static inline int
__dbc_put_append(dbc,key,data,put_statep,flags)1550 __dbc_put_append(dbc, key, data, put_statep, flags)
1551 DBC *dbc;
1552 DBT *key, *data;
1553 u_int32_t flags, *put_statep;
1554 {
1555 DB *dbp;
1556 ENV *env;
1557 DBC *dbc_n;
1558 DBT tdata;
1559 int ret, t_ret;
1560
1561 dbp = dbc->dbp;
1562 env = dbp->env;
1563 ret = 0;
1564 dbc_n = NULL;
1565
1566 DB_ASSERT(env, flags == DB_APPEND);
1567 COMPQUIET(flags, 0);
1568
1569 /*
1570 * With DB_APPEND, we need to do the insert to populate the key value.
1571 * So we swap the 'normal' order of updating secondary / verifying
1572 * foreign databases and inserting.
1573 *
1574 * If there is an append callback, the value stored in data->data may
1575 * be replaced and then freed. To avoid passing a freed pointer back
1576 * to the user, just operate on a copy of the data DBT.
1577 */
1578 tdata = *data;
1579
1580 /*
1581 * If this cursor is going to be closed immediately, we don't
1582 * need to take precautions to clean it up on error.
1583 */
1584 if (F_ISSET(dbc, DBC_TRANSIENT))
1585 dbc_n = dbc;
1586 else if ((ret = __dbc_idup(dbc, &dbc_n, 0)) != 0)
1587 goto err;
1588
1589 /*
1590 * Append isn't a normal put operation; call the appropriate access
1591 * method's append function.
1592 */
1593 switch (dbp->type) {
1594 case DB_HEAP:
1595 if ((ret = __heap_append(dbc_n, key, &tdata)) != 0)
1596 goto err;
1597 break;
1598 case DB_QUEUE:
1599 if ((ret = __qam_append(dbc_n, key, &tdata)) != 0)
1600 goto err;
1601 break;
1602 case DB_RECNO:
1603 if ((ret = __ram_append(dbc_n, key, &tdata)) != 0)
1604 goto err;
1605 break;
1606 default:
1607 /* The interface should prevent this. */
1608 DB_ASSERT(env,
1609 dbp->type == DB_QUEUE || dbp->type == DB_RECNO);
1610
1611 ret = __db_ferr(env, "DBC->put", 0);
1612 goto err;
1613 }
1614
1615 /*
1616 * The append callback, if one exists, may have allocated a new
1617 * tdata.data buffer. If so, free it.
1618 */
1619 FREE_IF_NEEDED(env, &tdata);
1620
1621 /*
1622 * The key value may have been generated by the above operation, but
1623 * not set in the data buffer. Make sure it is there so that secondary
1624 * updates can complete.
1625 */
1626 __dbt_userfree(env, key, NULL, NULL);
1627 if ((ret = __dbt_usercopy(env, key)) != 0)
1628 goto err;
1629
1630 /* An append cannot be replacing an existing item. */
1631 FLD_SET(*put_statep, DBC_PUT_NODEL);
1632
1633 err: if (dbc_n != NULL &&
1634 (t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
1635 ret = t_ret;
1636 return (ret);
1637 }
1638
1639 /*
1640 * __dbc_put_partial --
1641 * Ensure that the data item we are using is complete and correct.
1642 * Otherwise we could break the secondary constraints.
1643 */
1644 static inline int
__dbc_put_partial(dbc,pkey,data,orig_data,out_data,put_statep,flags)1645 __dbc_put_partial(dbc, pkey, data, orig_data, out_data, put_statep, flags)
1646 DBC *dbc;
1647 DBT *pkey, *data, *orig_data, *out_data;
1648 u_int32_t *put_statep, flags;
1649 {
1650 DB *dbp;
1651 DBC *pdbc;
1652 int ret, rmw, t_ret;
1653
1654 dbp = dbc->dbp;
1655 ret = t_ret = 0;
1656 rmw = FLD_ISSET(*put_statep, DBC_PUT_RMW) ? DB_RMW : 0;
1657
1658 if (!FLD_ISSET(*put_statep, DBC_PUT_HAVEREC) &&
1659 !FLD_ISSET(*put_statep, DBC_PUT_NODEL)) {
1660 /*
1661 * We're going to have to search the tree for the
1662 * specified key. Dup a cursor (so we have the same
1663 * locking info) and do a c_get.
1664 */
1665 if ((ret = __dbc_idup(dbc, &pdbc, 0)) != 0)
1666 return (ret);
1667
1668 /*
1669 * When doing a put with DB_CURRENT, partial data items have
1670 * already been resolved.
1671 */
1672 DB_ASSERT(dbp->env, flags != DB_CURRENT);
1673
1674 F_SET(pkey, DB_DBT_ISSET);
1675 ret = __dbc_get(pdbc, pkey, orig_data, rmw | DB_SET);
1676 if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) {
1677 FLD_SET(*put_statep, DBC_PUT_NODEL);
1678 ret = 0;
1679 }
1680 if ((t_ret = __dbc_close(pdbc)) != 0)
1681 ret = t_ret;
1682 if (ret != 0)
1683 return (ret);
1684
1685 FLD_SET(*put_statep, DBC_PUT_HAVEREC);
1686 }
1687
1688 COMPQUIET(flags, 0);
1689
1690 /*
1691 * Now build the new datum from orig_data and the partial data
1692 * we were given. It's okay to do this if no record was
1693 * returned above: a partial put on an empty record is allowed,
1694 * if a little strange. The data is zero-padded.
1695 */
1696 return (__db_buildpartial(dbp, orig_data, data, out_data));
1697 }
1698
1699 /*
1700 * __dbc_put_fixed_len --
1701 * Handle padding for fixed-length records.
1702 */
1703 static inline int
__dbc_put_fixed_len(dbc,data,out_data)1704 __dbc_put_fixed_len(dbc, data, out_data)
1705 DBC *dbc;
1706 DBT *data, *out_data;
1707 {
1708 DB *dbp;
1709 ENV *env;
1710 int re_pad, ret;
1711 u_int32_t re_len, size;
1712
1713 dbp = dbc->dbp;
1714 env = dbp->env;
1715 ret = 0;
1716
1717 /*
1718 * Handle fixed-length records. If the primary database has
1719 * fixed-length records, we need to pad out the datum before
1720 * we pass it into the callback function; we always index the
1721 * "real" record.
1722 */
1723 if (dbp->type == DB_QUEUE) {
1724 re_len = ((QUEUE *)dbp->q_internal)->re_len;
1725 re_pad = ((QUEUE *)dbp->q_internal)->re_pad;
1726 } else {
1727 re_len = ((BTREE *)dbp->bt_internal)->re_len;
1728 re_pad = ((BTREE *)dbp->bt_internal)->re_pad;
1729 }
1730
1731 size = data->size;
1732 if (size > re_len) {
1733 ret = __db_rec_toobig(env, size, re_len);
1734 return (ret);
1735 } else if (size < re_len) {
1736 /*
1737 * If we're not doing a partial put, copy data->data into
1738 * out_data->data, then pad out out_data->data. This overrides
1739 * the assignment made above, which is used in the more common
1740 * case when padding is not needed.
1741 *
1742 * If we're doing a partial put, the data we want are already
1743 * in out_data.data; we just need to pad.
1744 */
1745 if (F_ISSET(data, DB_DBT_PARTIAL)) {
1746 if ((ret = __os_realloc(
1747 env, re_len, &out_data->data)) != 0)
1748 return (ret);
1749 /*
1750 * In the partial case, we have built the item into
1751 * out_data already using __db_buildpartial. Just need
1752 * to pad from the end of out_data, not from data->size.
1753 */
1754 size = out_data->size;
1755 } else {
1756 if ((ret = __os_malloc(
1757 env, re_len, &out_data->data)) != 0)
1758 return (ret);
1759 memcpy(out_data->data, data->data, size);
1760 }
1761 memset((u_int8_t *)out_data->data + size, re_pad,
1762 re_len - size);
1763 out_data->size = re_len;
1764 }
1765
1766 return (ret);
1767 }
1768
1769 /*
1770 * __dbc_put_secondaries --
1771 * Insert the secondary keys, and validate the foreign key constraints.
1772 */
1773 static inline int
__dbc_put_secondaries(dbc,pkey,data,orig_data,s_count,s_keys_buf,put_statep)1774 __dbc_put_secondaries(dbc,
1775 pkey, data, orig_data, s_count, s_keys_buf, put_statep)
1776 DBC *dbc;
1777 DBT *pkey, *data, *orig_data, *s_keys_buf;
1778 int s_count;
1779 u_int32_t *put_statep;
1780 {
1781 DB *dbp, *sdbp;
1782 DBC *fdbc, *sdbc;
1783 DBT fdata, oldpkey, *skeyp, temppkey, tempskey, *tskeyp;
1784 ENV *env;
1785 int cmp, ret, rmw, t_ret;
1786 u_int32_t nskey;
1787
1788 dbp = dbc->dbp;
1789 env = dbp->env;
1790 fdbc = sdbc = NULL;
1791 sdbp = NULL;
1792 t_ret = 0;
1793 rmw = FLD_ISSET(*put_statep, DBC_PUT_RMW) ? DB_RMW : 0;
1794
1795 /*
1796 * Loop through the secondaries. (Step 3.)
1797 *
1798 * Note that __db_s_first and __db_s_next will take care of
1799 * thread-locking and refcounting issues.
1800 */
1801 for (ret = __db_s_first(dbp, &sdbp), skeyp = s_keys_buf;
1802 sdbp != NULL && ret == 0;
1803 ret = __db_s_next(&sdbp, dbc->txn), ++skeyp) {
1804 DB_ASSERT(env, skeyp - s_keys_buf < s_count);
1805 /*
1806 * Don't process this secondary if the key is immutable and we
1807 * know that the old record exists. This optimization can't be
1808 * used if we have not checked for the old record yet.
1809 */
1810 if (FLD_ISSET(*put_statep, DBC_PUT_HAVEREC) &&
1811 !FLD_ISSET(*put_statep, DBC_PUT_NODEL) &&
1812 FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY))
1813 continue;
1814
1815 /*
1816 * Call the callback for this secondary, to get the
1817 * appropriate secondary key.
1818 */
1819 if ((ret = sdbp->s_callback(sdbp,
1820 pkey, data, skeyp)) != 0) {
1821 /* Not indexing is equivalent to an empty key set. */
1822 if (ret == DB_DONOTINDEX) {
1823 F_SET(skeyp, DB_DBT_MULTIPLE);
1824 skeyp->size = 0;
1825 ret = 0;
1826 } else
1827 goto err;
1828 }
1829
1830 if (sdbp->s_foreign != NULL &&
1831 (ret = __db_cursor_int(sdbp->s_foreign,
1832 dbc->thread_info, dbc->txn, sdbp->s_foreign->type,
1833 PGNO_INVALID, 0, dbc->locker, &fdbc)) != 0)
1834 goto err;
1835
1836 /*
1837 * Mark the secondary key DBT(s) as set -- that is, the
1838 * callback returned at least one secondary key.
1839 *
1840 * Also, if this secondary index is associated with a foreign
1841 * database, check that the foreign db contains the key(s) to
1842 * maintain referential integrity. Set flags in fdata to avoid
1843 * mem copying, we just need to know existence. We need to do
1844 * this check before setting DB_DBT_ISSET, otherwise __dbc_get
1845 * will overwrite the flag values.
1846 */
1847 if (F_ISSET(skeyp, DB_DBT_MULTIPLE)) {
1848 #ifdef DIAGNOSTIC
1849 __db_check_skeyset(sdbp, skeyp);
1850 #endif
1851 for (tskeyp = (DBT *)skeyp->data, nskey = skeyp->size;
1852 nskey > 0; nskey--, tskeyp++) {
1853 if (fdbc != NULL) {
1854 memset(&fdata, 0, sizeof(DBT));
1855 F_SET(&fdata,
1856 DB_DBT_PARTIAL | DB_DBT_USERMEM);
1857 if ((ret = __dbc_get(
1858 fdbc, tskeyp, &fdata,
1859 DB_SET | rmw)) == DB_NOTFOUND ||
1860 ret == DB_KEYEMPTY) {
1861 ret = DB_FOREIGN_CONFLICT;
1862 break;
1863 }
1864 }
1865 F_SET(tskeyp, DB_DBT_ISSET);
1866 }
1867 tskeyp = (DBT *)skeyp->data;
1868 nskey = skeyp->size;
1869 } else {
1870 if (fdbc != NULL) {
1871 memset(&fdata, 0, sizeof(DBT));
1872 F_SET(&fdata, DB_DBT_PARTIAL | DB_DBT_USERMEM);
1873 if ((ret = __dbc_get(fdbc, skeyp, &fdata,
1874 DB_SET | rmw)) == DB_NOTFOUND ||
1875 ret == DB_KEYEMPTY)
1876 ret = DB_FOREIGN_CONFLICT;
1877 }
1878 F_SET(skeyp, DB_DBT_ISSET);
1879 tskeyp = skeyp;
1880 nskey = 1;
1881 }
1882 if (fdbc != NULL && (t_ret = __dbc_close(fdbc)) != 0 &&
1883 ret == 0)
1884 ret = t_ret;
1885 fdbc = NULL;
1886 if (ret != 0)
1887 goto err;
1888
1889 /*
1890 * If we have the old record, we can generate and remove any
1891 * old secondary key(s) now. We can also skip the secondary
1892 * put if there is no change.
1893 */
1894 if (FLD_ISSET(*put_statep, DBC_PUT_HAVEREC)) {
1895 if ((ret = __dbc_del_oldskey(sdbp, dbc,
1896 skeyp, pkey, orig_data)) == DB_KEYEXIST)
1897 continue;
1898 else if (ret != 0)
1899 goto err;
1900 }
1901 if (nskey == 0)
1902 continue;
1903
1904 /*
1905 * Open a cursor in this secondary.
1906 *
1907 * Use the same locker ID as our primary cursor, so that
1908 * we're guaranteed that the locks don't conflict (e.g. in CDB
1909 * or if we're subdatabases that share and want to lock a
1910 * metadata page).
1911 */
1912 if ((ret = __db_cursor_int(sdbp, dbc->thread_info, dbc->txn,
1913 sdbp->type, PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0)
1914 goto err;
1915
1916 /*
1917 * If we're in CDB, updates will fail since the new cursor
1918 * isn't a writer. However, we hold the WRITE lock in the
1919 * primary and will for as long as our new cursor lasts,
1920 * and the primary and secondary share a lock file ID,
1921 * so it's safe to consider this a WRITER. The close
1922 * routine won't try to put anything because we don't
1923 * really have a lock.
1924 */
1925 if (CDB_LOCKING(env)) {
1926 DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
1927 F_SET(sdbc, DBC_WRITER);
1928 }
1929
1930 /*
1931 * Swap the primary key to the byte order of this secondary, if
1932 * necessary. By doing this now, we can compare directly
1933 * against the data already in the secondary without having to
1934 * swap it after reading.
1935 */
1936 SWAP_IF_NEEDED(sdbp, pkey);
1937
1938 for (; nskey > 0 && ret == 0; nskey--, tskeyp++) {
1939 /* Skip this key if it is already in the database. */
1940 if (!F_ISSET(tskeyp, DB_DBT_ISSET))
1941 continue;
1942
1943 /*
1944 * There are three cases here--
1945 * 1) The secondary supports sorted duplicates.
1946 * If we attempt to put a secondary/primary pair
1947 * that already exists, that's a duplicate
1948 * duplicate, and c_put will return DB_KEYEXIST
1949 * (see __db_duperr). This will leave us with
1950 * exactly one copy of the secondary/primary pair,
1951 * and this is just right--we'll avoid deleting it
1952 * later, as the old and new secondaries will
1953 * match (since the old secondary is the dup dup
1954 * that's already there).
1955 * 2) The secondary supports duplicates, but they're not
1956 * sorted. We need to avoid putting a duplicate
1957 * duplicate, because the matching old and new
1958 * secondaries will prevent us from deleting
1959 * anything and we'll wind up with two secondary
1960 * records that point to the same primary key. Do
1961 * a c_get(DB_GET_BOTH); only do the put if the
1962 * secondary doesn't exist.
1963 * 3) The secondary doesn't support duplicates at all.
1964 * In this case, secondary keys must be unique;
1965 * if another primary key already exists for this
1966 * secondary key, we have to either overwrite it
1967 * or not put this one, and in either case we've
1968 * corrupted the secondary index. Do a
1969 * c_get(DB_SET). If the secondary/primary pair
1970 * already exists, do nothing; if the secondary
1971 * exists with a different primary, return an
1972 * error; and if the secondary does not exist,
1973 * put it.
1974 */
1975 if (!F_ISSET(sdbp, DB_AM_DUP)) {
1976 /* Case 3. */
1977 memset(&oldpkey, 0, sizeof(DBT));
1978 F_SET(&oldpkey, DB_DBT_MALLOC);
1979 ret = __dbc_get(sdbc,
1980 tskeyp, &oldpkey, rmw | DB_SET);
1981 if (ret == 0) {
1982 cmp = __dbt_defcmp(sdbp,
1983 &oldpkey, pkey, NULL);
1984 __os_ufree(env, oldpkey.data);
1985 /*
1986 * If the secondary key is unchanged,
1987 * skip the put and go on to the next
1988 * one.
1989 */
1990 if (cmp == 0)
1991 continue;
1992
1993 ret = USR_ERR(env, EINVAL);
1994 __db_errx(env, DB_STR("0695",
1995 "Put results in a non-unique secondary key in an "
1996 "index not configured to support duplicates"));
1997 }
1998 if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
1999 break;
2000 } else if (!F_ISSET(sdbp, DB_AM_DUPSORT)) {
2001 /* Case 2. */
2002 DB_INIT_DBT(tempskey,
2003 tskeyp->data, tskeyp->size);
2004 DB_INIT_DBT(temppkey,
2005 pkey->data, pkey->size);
2006 ret = __dbc_get(sdbc, &tempskey, &temppkey,
2007 rmw | DB_GET_BOTH);
2008 if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
2009 break;
2010 }
2011
2012 ret = __dbc_put(sdbc, tskeyp, pkey,
2013 DB_UPDATE_SECONDARY);
2014
2015 /*
2016 * We don't know yet whether this was a put-overwrite
2017 * that in fact changed nothing. If it was, we may get
2018 * DB_KEYEXIST. This is not an error.
2019 */
2020 if (ret == DB_KEYEXIST)
2021 ret = 0;
2022 }
2023
2024 /* Make sure the primary key is back in native byte-order. */
2025 SWAP_IF_NEEDED(sdbp, pkey);
2026
2027 if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
2028 ret = t_ret;
2029
2030 if (ret != 0)
2031 goto err;
2032
2033 /*
2034 * Mark that we have a key for this secondary so we can check
2035 * it later before deleting the old one. We can't set it
2036 * earlier or it would be cleared in the calls above.
2037 */
2038 F_SET(skeyp, DB_DBT_ISSET);
2039 }
2040 err: if (sdbp != NULL &&
2041 (t_ret = __db_s_done(sdbp, dbc->txn)) != 0 && ret == 0)
2042 ret = t_ret;
2043 COMPQUIET(s_count, 0);
2044 return (ret);
2045 }
2046
2047 static int
__dbc_put_primary(dbc,key,data,flags)2048 __dbc_put_primary(dbc, key, data, flags)
2049 DBC *dbc;
2050 DBT *key, *data;
2051 u_int32_t flags;
2052 {
2053 DB *dbp, *sdbp;
2054 DBC *dbc_n, *pdbc;
2055 DBT oldkey, olddata, newdata;
2056 DBT *all_skeys, *skeyp, *tskeyp;
2057 ENV *env;
2058 int ret, t_ret, s_count;
2059 u_int32_t nskey, put_state, rmw;
2060
2061 dbp = dbc->dbp;
2062 env = dbp->env;
2063 t_ret = 0;
2064 put_state = 0;
2065 sdbp = NULL;
2066 pdbc = dbc_n = NULL;
2067 all_skeys = NULL;
2068 memset(&newdata, 0, sizeof(DBT));
2069 memset(&olddata, 0, sizeof(DBT));
2070
2071 /*
2072 * We do multiple cursor operations in some cases and subsequently
2073 * access the data DBT information. Set DB_DBT_MALLOC so we don't risk
2074 * modification of the data between our uses of it.
2075 */
2076 F_SET(&olddata, DB_DBT_MALLOC);
2077
2078 /*
2079 * We have at least one secondary which we may need to update.
2080 *
2081 * There is a rather vile locking issue here. Secondary gets
2082 * will always involve acquiring a read lock in the secondary,
2083 * then acquiring a read lock in the primary. Ideally, we
2084 * would likewise perform puts by updating all the secondaries
2085 * first, then doing the actual put in the primary, to avoid
2086 * deadlock (since having multiple threads doing secondary
2087 * gets and puts simultaneously is probably a common case).
2088 *
2089 * However, if this put is a put-overwrite--and we have no way to
2090 * tell in advance whether it will be--we may need to delete
2091 * an outdated secondary key. In order to find that old
2092 * secondary key, we need to get the record we're overwriting,
2093 * before we overwrite it.
2094 *
2095 * (XXX: It would be nice to avoid this extra get, and have the
2096 * underlying put routines somehow pass us the old record
2097 * since they need to traverse the tree anyway. I'm saving
2098 * this optimization for later, as it's a lot of work, and it
2099 * would be hard to fit into this locking paradigm anyway.)
2100 *
2101 * The simple thing to do would be to go get the old record before
2102 * we do anything else. Unfortunately, though, doing so would
2103 * violate our "secondary, then primary" lock acquisition
2104 * ordering--even in the common case where no old primary record
2105 * exists, we'll still acquire and keep a lock on the page where
2106 * we're about to do the primary insert.
2107 *
2108 * To get around this, we do the following gyrations, which
2109 * hopefully solve this problem in the common case:
2110 *
2111 * 1) If this is a c_put(DB_CURRENT), go ahead and get the
2112 * old record. We already hold the lock on this page in
2113 * the primary, so no harm done, and we'll need the primary
2114 * key (which we weren't passed in this case) to do any
2115 * secondary puts anyway.
2116 * If this is a put(DB_APPEND), then we need to insert the item,
2117 * so that we can know the key value. So go ahead and insert. In
2118 * the case of a put(DB_APPEND) without secondaries it is
2119 * implemented in the __db_put method as an optimization.
2120 *
2121 * 2) If we're doing a partial put, we need to perform the
2122 * get on the primary key right away, since we don't have
2123 * the whole datum that the secondary key is based on.
2124 * We may also need to pad out the record if the primary
2125 * has a fixed record length.
2126 *
2127 * 3) Loop through the secondary indices, putting into each a
2128 * new secondary key that corresponds to the new record.
2129 *
2130 * 4) If we haven't done so in (1) or (2), get the old primary
2131 * key/data pair. If one does not exist--the common case--we're
2132 * done with secondary indices, and can go straight on to the
2133 * primary put.
2134 *
2135 * 5) If we do have an old primary key/data pair, however, we need
2136 * to loop through all the secondaries a second time and delete
2137 * the old secondary in each.
2138 */
2139 s_count = __db_s_count(dbp);
2140 if ((ret = __os_calloc(env,
2141 (u_int)s_count, sizeof(DBT), &all_skeys)) != 0)
2142 goto err;
2143
2144 /*
2145 * Primary indices can't have duplicates, so only DB_APPEND,
2146 * DB_CURRENT, DB_KEYFIRST, and DB_KEYLAST make any sense. Other flags
2147 * should have been caught by the checking routine, but
2148 * add a sprinkling of paranoia.
2149 */
2150 DB_ASSERT(env, flags == DB_APPEND || flags == DB_CURRENT ||
2151 flags == DB_KEYFIRST || flags == DB_KEYLAST ||
2152 flags == DB_NOOVERWRITE || flags == DB_OVERWRITE_DUP);
2153
2154 /*
2155 * We'll want to use DB_RMW in a few places, but it's only legal
2156 * when locking is on.
2157 */
2158 rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
2159 if (rmw)
2160 FLD_SET(put_state, DBC_PUT_RMW);
2161
2162 /* Resolve the primary key if required (Step 1). */
2163 if (flags == DB_CURRENT) {
2164 if ((ret = __dbc_put_resolve_key(dbc,
2165 &oldkey, &olddata, &put_state, flags)) != 0)
2166 goto err;
2167 key = &oldkey;
2168 } else if (flags == DB_APPEND) {
2169 if ((ret = __dbc_put_append(dbc,
2170 key, data, &put_state, flags)) != 0)
2171 goto err;
2172 }
2173
2174 /*
2175 * PUT_NOOVERWRITE with secondaries is a troublesome case. We need
2176 * to check that the insert will work prior to making any changes
2177 * to secondaries. Try to work within the locking constraints outlined
2178 * above.
2179 *
2180 * This is DB->put (DB_NOOVERWRITE). DBC->put(DB_NODUPDATA) is not
2181 * relevant since it is only valid on DBs that support duplicates,
2182 * which primaries with secondaries can't have.
2183 */
2184 if (flags == DB_NOOVERWRITE) {
2185 /* Don't bother retrieving the data. */
2186 F_SET(key, DB_DBT_ISSET);
2187 olddata.dlen = 0;
2188 olddata.flags = DB_DBT_PARTIAL | DB_DBT_USERMEM;
2189 ret = __dbc_get(dbc, key, &olddata, DB_SET);
2190 if (ret == 0) {
2191 ret = DBC_ERR(dbc, DB_KEYEXIST);
2192 goto done;
2193 } else if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
2194 goto err;
2195 }
2196
2197 /*
2198 * Check for partial puts using DB_DBT_PARTIAL (Step 2).
2199 */
2200 if (F_ISSET(data, DB_DBT_PARTIAL)) {
2201 if ((ret = __dbc_put_partial(dbc,
2202 key, data, &olddata, &newdata, &put_state, flags)) != 0)
2203 goto err;
2204 } else {
2205 newdata = *data;
2206 }
2207
2208 /*
2209 * Check for partial puts, with fixed length record databases (Step 2).
2210 */
2211 if ((dbp->type == DB_RECNO && F_ISSET(dbp, DB_AM_FIXEDLEN)) ||
2212 (dbp->type == DB_QUEUE)) {
2213 if ((ret = __dbc_put_fixed_len(dbc, data, &newdata)) != 0)
2214 goto err;
2215 }
2216
2217 /* Validate any foreign databases, and update secondaries. (Step 3). */
2218 if ((ret = __dbc_put_secondaries(dbc, key, &newdata,
2219 &olddata, s_count, all_skeys, &put_state))
2220 != 0)
2221 goto err;
2222 /*
2223 * If we've already got the old primary key/data pair, the secondary
2224 * updates are already done.
2225 */
2226 if (FLD_ISSET(put_state, DBC_PUT_HAVEREC))
2227 goto done;
2228
2229 /*
2230 * If still necessary, go get the old primary key/data. (Step 4.)
2231 *
2232 * See the comments in step 2. This is real familiar.
2233 */
2234 if ((ret = __dbc_idup(dbc, &pdbc, 0)) != 0)
2235 goto err;
2236 DB_ASSERT(env, flags != DB_CURRENT);
2237 F_SET(key, DB_DBT_ISSET);
2238 ret = __dbc_get(pdbc, key, &olddata, rmw | DB_SET);
2239 if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) {
2240 FLD_SET(put_state, DBC_PUT_NODEL);
2241 ret = 0;
2242 }
2243 if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
2244 ret = t_ret;
2245 if (ret != 0)
2246 goto err;
2247
2248 /*
2249 * Check whether we do in fact have an old record we may need to
2250 * delete. (Step 5).
2251 */
2252 if (FLD_ISSET(put_state, DBC_PUT_NODEL))
2253 goto done;
2254
2255 for (ret = __db_s_first(dbp, &sdbp), skeyp = all_skeys;
2256 sdbp != NULL && ret == 0;
2257 ret = __db_s_next(&sdbp, dbc->txn), skeyp++) {
2258 DB_ASSERT(env, skeyp - all_skeys < s_count);
2259 /*
2260 * Don't process this secondary if the key is immutable. We
2261 * know that the old record exists, so this optimization can
2262 * always be used.
2263 */
2264 if (FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY))
2265 continue;
2266
2267 if ((ret = __dbc_del_oldskey(sdbp, dbc,
2268 skeyp, key, &olddata)) != 0 && ret != DB_KEYEXIST)
2269 goto err;
2270 }
2271 if (ret != 0)
2272 goto err;
2273
2274 done:
2275 err:
2276 if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
2277 ret = t_ret;
2278
2279 /* If newdata or olddata were used, free their buffers. */
2280 if (newdata.data != NULL && newdata.data != data->data)
2281 __os_free(env, newdata.data);
2282 if (olddata.data != NULL)
2283 __os_ufree(env, olddata.data);
2284
2285 if (sdbp != NULL &&
2286 (t_ret = __db_s_done(sdbp, dbc->txn)) != 0 && ret == 0)
2287 ret = t_ret;
2288
2289 if (all_skeys != NULL) {
2290 for (skeyp = all_skeys; skeyp - all_skeys < s_count; skeyp++) {
2291 if (F_ISSET(skeyp, DB_DBT_MULTIPLE)) {
2292 for (nskey = skeyp->size,
2293 tskeyp = (DBT *)skeyp->data;
2294 nskey > 0;
2295 nskey--, tskeyp++)
2296 FREE_IF_NEEDED(env, tskeyp);
2297 }
2298 FREE_IF_NEEDED(env, skeyp);
2299 }
2300 __os_free(env, all_skeys);
2301 }
2302 return (ret);
2303 }
2304
2305 /*
2306 * __dbc_put --
2307 * Put using a cursor.
2308 *
2309 * PUBLIC: int __dbc_put __P((DBC *, DBT *, DBT *, u_int32_t));
2310 */
2311 int
__dbc_put(dbc,key,data,flags)2312 __dbc_put(dbc, key, data, flags)
2313 DBC *dbc;
2314 DBT *key, *data;
2315 u_int32_t flags;
2316 {
2317 DB *dbp;
2318 int ret;
2319
2320 dbp = dbc->dbp;
2321 ret = 0;
2322 F_CLR(dbc, DBC_ERROR);
2323
2324 /*
2325 * Putting to secondary indices is forbidden; when we need to
2326 * internally update one, we're called with a private flag,
2327 * DB_UPDATE_SECONDARY, which does the right thing but won't return an
2328 * error during flag checking.
2329 *
2330 * As a convenience, many places that want the default DB_KEYLAST
2331 * behavior call DBC->put with flags == 0. Protect lower-level code
2332 * here by translating that.
2333 *
2334 * Lastly, the DB_OVERWRITE_DUP flag is equivalent to DB_KEYLAST unless
2335 * there are sorted duplicates. Limit the number of places that need
2336 * to test for it explicitly.
2337 */
2338 if (flags == DB_UPDATE_SECONDARY || flags == 0 ||
2339 (flags == DB_OVERWRITE_DUP && !F_ISSET(dbp, DB_AM_DUPSORT)))
2340 flags = DB_KEYLAST;
2341
2342 CDB_LOCKING_INIT(dbc->env, dbc);
2343
2344 PERFMON6(env, db, put, dbp->fname, dbp->dname,
2345 dbc->txn == NULL ? 0 : dbc->txn->txnid, key, data, flags);
2346 /*
2347 * Check to see if we are a primary and have secondary indices.
2348 * If we are not, we save ourselves a good bit of trouble and
2349 * just skip to the "normal" put.
2350 */
2351 if (DB_IS_PRIMARY(dbp) &&
2352 ((ret = __dbc_put_primary(dbc, key, data, flags)) != 0))
2353 goto done;
2354
2355 /*
2356 * If this is an append operation, the insert was done prior to the
2357 * secondary updates, so we are finished.
2358 */
2359 if (flags == DB_APPEND)
2360 goto done;
2361
2362 #ifdef HAVE_COMPRESSION
2363 if (DB_IS_COMPRESSED(dbp))
2364 ret = __bamc_compress_put(dbc, key, data, flags);
2365 else
2366 #endif
2367 ret = __dbc_iput(dbc, key, data, flags);
2368
2369 done: CDB_LOCKING_DONE(dbc->env, dbc);
2370
2371 return (ret);
2372 }
2373
2374 /*
2375 * __dbc_iput --
2376 * Implementation of put using a cursor.
2377 *
2378 * PUBLIC: int __dbc_iput __P((DBC *, DBT *, DBT *, u_int32_t));
2379 */
2380 int
__dbc_iput(dbc,key,data,flags)2381 __dbc_iput(dbc, key, data, flags)
2382 DBC *dbc;
2383 DBT *key, *data;
2384 u_int32_t flags;
2385 {
2386 DBC *dbc_n, *oldopd, *opd;
2387 db_pgno_t pgno;
2388 int ret, t_ret;
2389 u_int32_t tmp_flags;
2390
2391 /*
2392 * Cursor Cleanup Note:
2393 * All of the cursors passed to the underlying access methods by this
2394 * routine are duplicated cursors. On return, any referenced pages
2395 * will be discarded, and, if the cursor is not intended to be used
2396 * again, the close function will be called. So, pages/locks that
2397 * the cursor references do not need to be resolved by the underlying
2398 * functions.
2399 */
2400 dbc_n = NULL;
2401 ret = t_ret = 0;
2402
2403 /*
2404 * If we have an off-page duplicates cursor, and the operation applies
2405 * to it, perform the operation. Duplicate the cursor and call the
2406 * underlying function.
2407 *
2408 * Off-page duplicate trees are locked in the primary tree, that is,
2409 * we acquire a write lock in the primary tree and no locks in the
2410 * off-page dup tree. If the put operation is done in an off-page
2411 * duplicate tree, call the primary cursor's upgrade routine first.
2412 */
2413 if (dbc->internal->opd != NULL &&
2414 (flags == DB_AFTER || flags == DB_BEFORE || flags == DB_CURRENT)) {
2415 /*
2416 * A special case for hash off-page duplicates. Hash doesn't
2417 * support (and is documented not to support) put operations
2418 * relative to a cursor which references an already deleted
2419 * item. For consistency, apply the same criteria to off-page
2420 * duplicates as well.
2421 */
2422 if (dbc->dbtype == DB_HASH && F_ISSET(
2423 ((BTREE_CURSOR *)(dbc->internal->opd->internal)),
2424 C_DELETED)) {
2425 ret = DBC_ERR(dbc, DB_NOTFOUND);
2426 goto err;
2427 }
2428
2429 if ((ret = dbc->am_writelock(dbc)) != 0 ||
2430 (ret = __dbc_dup(dbc, &dbc_n, DB_POSITION)) != 0)
2431 goto err;
2432 opd = dbc_n->internal->opd;
2433 if ((ret = opd->am_put(
2434 opd, key, data, flags, NULL)) != 0)
2435 goto err;
2436 goto done;
2437 }
2438
2439 /*
2440 * Perform an operation on the main cursor. Duplicate the cursor,
2441 * and call the underlying function.
2442 */
2443 if (flags == DB_AFTER || flags == DB_BEFORE || flags == DB_CURRENT)
2444 tmp_flags = DB_POSITION;
2445 else
2446 tmp_flags = 0;
2447
2448 /*
2449 * If this cursor is going to be closed immediately, we don't
2450 * need to take precautions to clean it up on error.
2451 */
2452 if (F_ISSET(dbc, DBC_TRANSIENT | DBC_PARTITIONED))
2453 dbc_n = dbc;
2454 else if ((ret = __dbc_idup(dbc, &dbc_n, tmp_flags)) != 0)
2455 goto err;
2456
2457 pgno = PGNO_INVALID;
2458 if ((ret = dbc_n->am_put(dbc_n, key, data, flags, &pgno)) != 0)
2459 goto err;
2460
2461 /*
2462 * We may be referencing a new off-page duplicates tree. Acquire
2463 * a new cursor and call the underlying function.
2464 */
2465 if (pgno != PGNO_INVALID) {
2466 oldopd = dbc_n->internal->opd;
2467 if ((ret = __dbc_newopd(dbc, pgno, oldopd, &opd)) != 0) {
2468 dbc_n->internal->opd = opd;
2469 goto err;
2470 }
2471
2472 dbc_n->internal->opd = opd;
2473 opd->internal->pdbc = dbc_n;
2474
2475 if (flags == DB_NOOVERWRITE)
2476 flags = DB_KEYLAST;
2477 if ((ret = opd->am_put(
2478 opd, key, data, flags, NULL)) != 0)
2479 goto err;
2480 }
2481
2482 done:
2483 err: /* Cleanup and cursor resolution. */
2484 if (dbc_n != NULL && !DB_RETOK_DBCPUT(ret))
2485 F_SET(dbc_n, DBC_ERROR);
2486 if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
2487 ret = t_ret;
2488 return (ret);
2489 }
2490
2491 /*
2492 * __dbc_del_oldskey --
2493 * Delete an old secondary key, if necessary.
2494 * Returns DB_KEYEXIST if the new and old keys match..
2495 */
2496 static int
__dbc_del_oldskey(sdbp,dbc,skey,pkey,olddata)2497 __dbc_del_oldskey(sdbp, dbc, skey, pkey, olddata)
2498 DB *sdbp;
2499 DBC *dbc;
2500 DBT *skey, *pkey, *olddata;
2501 {
2502 DB *dbp;
2503 DBC *sdbc;
2504 DBT *toldskeyp, *tskeyp;
2505 DBT oldskey, temppkey, tempskey;
2506 ENV *env;
2507 int ret, t_ret;
2508 u_int32_t i, noldskey, nsame, nskey, rmw;
2509
2510 sdbc = NULL;
2511 dbp = sdbp->s_primary;
2512 env = dbp->env;
2513 nsame = 0;
2514 rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
2515
2516 /*
2517 * Get the old secondary key.
2518 */
2519 memset(&oldskey, 0, sizeof(DBT));
2520 if ((ret = sdbp->s_callback(sdbp, pkey, olddata, &oldskey)) != 0) {
2521 if (ret == DB_DONOTINDEX ||
2522 (F_ISSET(&oldskey, DB_DBT_MULTIPLE) && oldskey.size == 0))
2523 /* There's no old key to delete. */
2524 ret = 0;
2525 return (ret);
2526 }
2527
2528 if (F_ISSET(&oldskey, DB_DBT_MULTIPLE)) {
2529 #ifdef DIAGNOSTIC
2530 __db_check_skeyset(sdbp, &oldskey);
2531 #endif
2532 toldskeyp = (DBT *)oldskey.data;
2533 noldskey = oldskey.size;
2534 } else {
2535 toldskeyp = &oldskey;
2536 noldskey = 1;
2537 }
2538
2539 if (F_ISSET(skey, DB_DBT_MULTIPLE)) {
2540 nskey = skey->size;
2541 skey = (DBT *)skey->data;
2542 } else
2543 nskey = F_ISSET(skey, DB_DBT_ISSET) ? 1 : 0;
2544
2545 for (; noldskey > 0 && ret == 0; noldskey--, toldskeyp++) {
2546 /*
2547 * Check whether this old secondary key is also a new key
2548 * before we delete it. Note that bt_compare is (and must be)
2549 * set no matter what access method we're in.
2550 */
2551 for (i = 0, tskeyp = skey; i < nskey; i++, tskeyp++)
2552 if (((BTREE *)sdbp->bt_internal)->bt_compare(sdbp,
2553 toldskeyp, tskeyp, NULL) == 0) {
2554 nsame++;
2555 F_CLR(tskeyp, DB_DBT_ISSET);
2556 break;
2557 }
2558
2559 if (i < nskey) {
2560 FREE_IF_NEEDED(env, toldskeyp);
2561 continue;
2562 }
2563
2564 if (sdbc == NULL) {
2565 if ((ret = __db_cursor_int(sdbp,
2566 dbc->thread_info, dbc->txn, sdbp->type,
2567 PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0)
2568 goto err;
2569 if (CDB_LOCKING(env)) {
2570 DB_ASSERT(env,
2571 sdbc->mylock.off == LOCK_INVALID);
2572 F_SET(sdbc, DBC_WRITER);
2573 }
2574 }
2575
2576 /*
2577 * Don't let c_get(DB_GET_BOTH) stomp on our data. Use
2578 * temporary DBTs instead.
2579 */
2580 SWAP_IF_NEEDED(sdbp, pkey);
2581 DB_INIT_DBT(temppkey, pkey->data, pkey->size);
2582 DB_INIT_DBT(tempskey, toldskeyp->data, toldskeyp->size);
2583 if ((ret = __dbc_get(sdbc,
2584 &tempskey, &temppkey, rmw | DB_GET_BOTH)) == 0)
2585 ret = __dbc_del(sdbc, DB_UPDATE_SECONDARY);
2586 else if (ret == DB_NOTFOUND)
2587 ret = __db_secondary_corrupt(dbp);
2588 SWAP_IF_NEEDED(sdbp, pkey);
2589 FREE_IF_NEEDED(env, toldskeyp);
2590 }
2591
2592 err: for (; noldskey > 0; noldskey--, toldskeyp++)
2593 FREE_IF_NEEDED(env, toldskeyp);
2594 FREE_IF_NEEDED(env, &oldskey);
2595 if (sdbc != NULL && (t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
2596 ret = t_ret;
2597 if (ret == 0 && nsame == nskey)
2598 return (DB_KEYEXIST);
2599 return (ret);
2600 }
2601
2602 /*
2603 * __db_duperr()
2604 * Error message: we don't currently support sorted duplicate duplicates.
2605 * PUBLIC: int __db_duperr __P((DB *, u_int32_t));
2606 */
2607 int
__db_duperr(dbp,flags)2608 __db_duperr(dbp, flags)
2609 DB *dbp;
2610 u_int32_t flags;
2611 {
2612 /*
2613 * If we run into this error while updating a secondary index,
2614 * don't yell--there's no clean way to pass DB_NODUPDATA in along
2615 * with DB_UPDATE_SECONDARY, but we may run into this problem
2616 * in a normal, non-error course of events.
2617 *
2618 * !!!
2619 * If and when we ever permit duplicate duplicates in sorted-dup
2620 * databases, we need to either change the secondary index code
2621 * to check for dup dups, or we need to maintain the implicit
2622 * "DB_NODUPDATA" behavior for databases with DB_AM_SECONDARY set.
2623 */
2624 if (flags != DB_NODUPDATA && !F_ISSET(dbp, DB_AM_SECONDARY))
2625 __db_errx(dbp->env, DB_STR("0696",
2626 "Duplicate data items are not supported with sorted data"));
2627 return (DB_KEYEXIST);
2628 }
2629
2630 /*
2631 * __dbc_cleanup --
2632 * Clean up duplicate cursors.
2633 *
2634 * PUBLIC: int __dbc_cleanup __P((DBC *, DBC *, int));
2635 */
2636 int
__dbc_cleanup(dbc,dbc_n,failed)2637 __dbc_cleanup(dbc, dbc_n, failed)
2638 DBC *dbc, *dbc_n;
2639 int failed;
2640 {
2641 DB *dbp;
2642 DBC *opd;
2643 DBC_INTERNAL *internal;
2644 DB_MPOOLFILE *mpf;
2645 int ret, t_ret;
2646
2647 if (F_ISSET(dbc, DBC_OPD))
2648 LOCK_CHECK_OFF(dbc->thread_info);
2649
2650 dbp = dbc->dbp;
2651 mpf = dbp->mpf;
2652 internal = dbc->internal;
2653 ret = 0;
2654
2655 /* Discard any pages we're holding. */
2656 if (internal->page != NULL) {
2657 if ((t_ret = __memp_fput(mpf, dbc->thread_info,
2658 internal->page, dbc->priority)) != 0 && ret == 0)
2659 ret = t_ret;
2660 internal->page = NULL;
2661 }
2662 opd = internal->opd;
2663 if (opd != NULL && opd->internal->page != NULL) {
2664 if ((t_ret = __memp_fput(mpf, dbc->thread_info,
2665 opd->internal->page, dbc->priority)) != 0 && ret == 0)
2666 ret = t_ret;
2667 opd->internal->page = NULL;
2668 }
2669
2670 /*
2671 * If dbc_n is NULL, there's no internal cursor swapping to be done
2672 * and no dbc_n to close--we probably did the entire operation on an
2673 * offpage duplicate cursor. Just return.
2674 *
2675 * If dbc and dbc_n are the same, we're either inside a DB->{put/get}
2676 * operation, and as an optimization we performed the operation on
2677 * the main cursor rather than on a duplicated one, or we're in a
2678 * bulk get that can't have moved the cursor (DB_MULTIPLE with the
2679 * initial c_get operation on an off-page dup cursor). Just
2680 * return--either we know we didn't move the cursor, or we're going
2681 * to close it before we return to application code, so we're sure
2682 * not to visibly violate the "cursor stays put on error" rule.
2683 */
2684 if (dbc_n == NULL || dbc == dbc_n)
2685 goto done;
2686
2687 if (dbc_n->internal->page != NULL) {
2688 if ((t_ret = __memp_fput(mpf, dbc->thread_info,
2689 dbc_n->internal->page, dbc->priority)) != 0 && ret == 0)
2690 ret = t_ret;
2691 dbc_n->internal->page = NULL;
2692 }
2693 opd = dbc_n->internal->opd;
2694 if (opd != NULL && opd->internal->page != NULL) {
2695 if ((t_ret = __memp_fput(mpf, dbc->thread_info,
2696 opd->internal->page, dbc->priority)) != 0 && ret == 0)
2697 ret = t_ret;
2698 opd->internal->page = NULL;
2699 }
2700
2701 /*
2702 * If we didn't fail before entering this routine or just now when
2703 * freeing pages, swap the interesting contents of the old and new
2704 * cursors.
2705 */
2706 if (!failed && ret == 0) {
2707 MUTEX_LOCK(dbp->env, dbp->mutex);
2708 if (opd != NULL)
2709 opd->internal->pdbc = dbc;
2710 if (internal->opd != NULL)
2711 internal->opd->internal->pdbc = dbc_n;
2712 dbc->internal = dbc_n->internal;
2713 dbc_n->internal = internal;
2714 MUTEX_UNLOCK(dbp->env, dbp->mutex);
2715 }
2716
2717 /*
2718 * Close the cursor we don't care about anymore. The close can fail,
2719 * but we only expect DB_LOCK_DEADLOCK failures. This violates our
2720 * "the cursor is unchanged on error" semantics, but since all you can
2721 * do with a DB_LOCK_DEADLOCK failure is close the cursor, I believe
2722 * that's OK.
2723 *
2724 * XXX
2725 * There's no way to recover from failure to close the old cursor.
2726 * All we can do is move to the new position and return an error.
2727 *
2728 * XXX
2729 * We might want to consider adding a flag to the cursor, so that any
2730 * subsequent operations other than close just return an error?
2731 */
2732 if ((t_ret = __dbc_close(dbc_n)) != 0 && ret == 0)
2733 ret = t_ret;
2734
2735 /*
2736 * If this was an update that is supporting dirty reads
2737 * then we may have just swapped our read for a write lock
2738 * which is held by the surviving cursor. We need
2739 * to explicitly downgrade this lock. The closed cursor
2740 * may only have had a read lock.
2741 */
2742 if (ret == 0 && failed == 0 && F_ISSET(dbp, DB_AM_READ_UNCOMMITTED) &&
2743 dbc->internal->lock_mode == DB_LOCK_WRITE &&
2744 (ret = __TLPUT(dbc, dbc->internal->lock)) == 0)
2745 dbc->internal->lock_mode = DB_LOCK_WWRITE;
2746
2747 done:
2748 if (F_ISSET(dbc, DBC_OPD))
2749 LOCK_CHECK_ON(dbc->thread_info);
2750
2751 return (ret);
2752 }
2753
2754 /*
2755 * __dbc_secondary_get_pp --
2756 * This wrapper function for DBC->pget() is the DBC->get() function
2757 * for a secondary index cursor.
2758 *
2759 * PUBLIC: int __dbc_secondary_get_pp __P((DBC *, DBT *, DBT *, u_int32_t));
2760 */
2761 int
__dbc_secondary_get_pp(dbc,skey,data,flags)2762 __dbc_secondary_get_pp(dbc, skey, data, flags)
2763 DBC *dbc;
2764 DBT *skey, *data;
2765 u_int32_t flags;
2766 {
2767 DB_ASSERT(dbc->env, F_ISSET(dbc->dbp, DB_AM_SECONDARY));
2768 return (__dbc_pget_pp(dbc, skey, NULL, data, flags));
2769 }
2770
2771 /*
2772 * __dbc_pget --
2773 * Get a primary key/data pair through a secondary index.
2774 *
2775 * PUBLIC: int __dbc_pget __P((DBC *, DBT *, DBT *, DBT *, u_int32_t));
2776 */
2777 int
__dbc_pget(dbc,skey,pkey,data,flags)2778 __dbc_pget(dbc, skey, pkey, data, flags)
2779 DBC *dbc;
2780 DBT *skey, *pkey, *data;
2781 u_int32_t flags;
2782 {
2783 DB *pdbp, *sdbp;
2784 DBC *dbc_n, *pdbc;
2785 DBT nullpkey, *save_data;
2786 u_int32_t save_pkey_flags, tmp_flags, tmp_read_locking, tmp_rmw;
2787 int pkeymalloc, ret, t_ret;
2788
2789 sdbp = dbc->dbp;
2790 pdbp = sdbp->s_primary;
2791 dbc_n = NULL;
2792 save_data = NULL;
2793 pkeymalloc = t_ret = 0;
2794
2795 /*
2796 * The challenging part of this function is getting the behavior
2797 * right for all the various permutations of DBT flags. The
2798 * next several blocks handle the various cases we need to
2799 * deal with specially.
2800 */
2801
2802 /*
2803 * We may be called with a NULL pkey argument, if we've been
2804 * wrapped by a 2-DBT get call. If so, we need to use our
2805 * own DBT.
2806 */
2807 if (pkey == NULL) {
2808 memset(&nullpkey, 0, sizeof(DBT));
2809 pkey = &nullpkey;
2810 }
2811
2812 /* Clear OR'd in additional bits so we can check for flag equality. */
2813 tmp_rmw = LF_ISSET(DB_RMW);
2814 LF_CLR(DB_RMW);
2815
2816 SET_READ_LOCKING_FLAGS(dbc, tmp_read_locking);
2817 /*
2818 * DB_GET_RECNO is a special case, because we're interested not in
2819 * the primary key/data pair, but rather in the primary's record
2820 * number.
2821 */
2822 if (flags == DB_GET_RECNO) {
2823 if (tmp_rmw)
2824 F_SET(dbc, DBC_RMW);
2825 F_SET(dbc, tmp_read_locking);
2826 ret = __dbc_pget_recno(dbc, pkey, data, flags);
2827 if (tmp_rmw)
2828 F_CLR(dbc, DBC_RMW);
2829 /* Clear the temp flags, but leave WAS_READ_COMMITTED. */
2830 F_CLR(dbc, tmp_read_locking & ~DBC_WAS_READ_COMMITTED);
2831 return (ret);
2832 }
2833
2834 /*
2835 * If the DBTs we've been passed don't have any of the
2836 * user-specified memory management flags set, we want to make sure
2837 * we return values using the DBTs dbc->rskey, dbc->rkey, and
2838 * dbc->rdata, respectively.
2839 *
2840 * There are two tricky aspects to this: first, we need to pass
2841 * skey and pkey *in* to the initial c_get on the secondary key,
2842 * since either or both may be looked at by it (depending on the
2843 * get flag). Second, we must not use a normal DB->get call
2844 * on the secondary, even though that's what we want to accomplish,
2845 * because the DB handle may be free-threaded. Instead,
2846 * we open a cursor, then take steps to ensure that we actually use
2847 * the rkey/rdata from the *secondary* cursor.
2848 *
2849 * We accomplish all this by passing in the DBTs we started out
2850 * with to the c_get, but swapping the contents of rskey and rkey,
2851 * respectively, into rkey and rdata; __db_ret will treat them like
2852 * the normal key/data pair in a c_get call, and will realloc them as
2853 * need be (this is "step 1"). Then, for "step 2", we swap back
2854 * rskey/rkey/rdata to normal, and do a get on the primary with the
2855 * secondary dbc appointed as the owner of the returned-data memory.
2856 *
2857 * Note that in step 2, we copy the flags field in case we need to
2858 * pass down a DB_DBT_PARTIAL or other flag that is compatible with
2859 * letting DB do the memory management.
2860 */
2861
2862 /*
2863 * It is correct, though slightly sick, to attempt a partial get of a
2864 * primary key. However, if we do so here, we'll never find the
2865 * primary record; clear the DB_DBT_PARTIAL field of pkey just for the
2866 * duration of the next call.
2867 */
2868 save_pkey_flags = pkey->flags;
2869 F_CLR(pkey, DB_DBT_PARTIAL);
2870
2871 /*
2872 * Now we can go ahead with the meat of this call. First, get the
2873 * primary key from the secondary index. (What exactly we get depends
2874 * on the flags, but the underlying cursor get will take care of the
2875 * dirty work.) Duplicate the cursor, in case the later get on the
2876 * primary fails.
2877 */
2878 switch (flags) {
2879 case DB_CURRENT:
2880 case DB_GET_BOTHC:
2881 case DB_NEXT:
2882 case DB_NEXT_DUP:
2883 case DB_NEXT_NODUP:
2884 case DB_PREV:
2885 case DB_PREV_DUP:
2886 case DB_PREV_NODUP:
2887 tmp_flags = DB_POSITION;
2888 break;
2889 default:
2890 tmp_flags = 0;
2891 break;
2892 }
2893
2894 if (dbc->internal->opd != NULL ||
2895 F_ISSET(dbc, DBC_PARTITIONED | DBC_TRANSIENT)) {
2896 dbc_n = dbc;
2897 save_data = dbc_n->rdata;
2898 } else {
2899 if ((ret = __dbc_dup(dbc, &dbc_n, tmp_flags)) != 0)
2900 return (ret);
2901 F_SET(dbc_n, DBC_TRANSIENT);
2902 }
2903 dbc_n->rdata = dbc->rkey;
2904 dbc_n->rkey = dbc->rskey;
2905
2906 if (tmp_rmw)
2907 F_SET(dbc_n, DBC_RMW);
2908 F_SET(dbc_n, tmp_read_locking);
2909
2910 /*
2911 * If we've been handed a primary key, it will be in native byte order,
2912 * so we need to swap it before reading from the secondary.
2913 */
2914 if (flags == DB_GET_BOTH || flags == DB_GET_BOTHC ||
2915 flags == DB_GET_BOTH_RANGE)
2916 SWAP_IF_NEEDED(sdbp, pkey);
2917
2918 retry: /* Step 1. */
2919 ret = __dbc_get(dbc_n, skey, pkey, flags);
2920 /* Restore pkey's flags in case we stomped the PARTIAL flag. */
2921 pkey->flags = save_pkey_flags;
2922
2923 /*
2924 * We need to swap the primary key to native byte order if we read it
2925 * successfully, or if we swapped it on entry above. We can't return
2926 * with the application's data modified.
2927 */
2928 if (ret == 0 || flags == DB_GET_BOTH || flags == DB_GET_BOTHC ||
2929 flags == DB_GET_BOTH_RANGE)
2930 SWAP_IF_NEEDED(sdbp, pkey);
2931
2932 if (ret != 0)
2933 goto err;
2934
2935 /*
2936 * Now we're ready for "step 2". If either or both of pkey and data do
2937 * not have memory management flags set--that is, if DB is managing
2938 * their memory--we need to swap around the rkey/rdata structures so
2939 * that we don't wind up trying to use memory managed by the primary
2940 * database cursor, which we'll close before we return.
2941 *
2942 * !!!
2943 * If you're carefully following the bouncing ball, you'll note that in
2944 * the DB-managed case, the buffer hanging off of pkey is the same as
2945 * dbc->rkey->data. This is just fine; we may well realloc and stomp
2946 * on it when we return, if we're doing a DB_GET_BOTH and need to
2947 * return a different partial or key (depending on the comparison
2948 * function), but this is safe.
2949 *
2950 * !!!
2951 * We need to use __db_cursor_int here rather than simply calling
2952 * pdbp->cursor, because otherwise, if we're in CDB, we'll allocate a
2953 * new locker ID and leave ourselves open to deadlocks. (Even though
2954 * we're only acquiring read locks, we'll still block if there are any
2955 * waiters.)
2956 */
2957 if ((ret = __db_cursor_int(pdbp, dbc->thread_info,
2958 dbc->txn, pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0)
2959 goto err;
2960
2961 F_SET(pdbc, tmp_read_locking |
2962 F_ISSET(dbc, DBC_READ_UNCOMMITTED | DBC_READ_COMMITTED | DBC_RMW));
2963
2964 /*
2965 * We're about to use pkey a second time. If DB_DBT_MALLOC is set on
2966 * it, we'll leak the memory we allocated the first time. Thus, set
2967 * DB_DBT_REALLOC instead so that we reuse that memory instead of
2968 * leaking it.
2969 *
2970 * Alternatively, if the application is handling copying for pkey, we
2971 * need to take a copy now. The copy will be freed on exit from
2972 * __dbc_pget_pp (and we must be coming through there if DB_DBT_USERCOPY
2973 * is set). In the case of DB_GET_BOTH_RANGE, the pkey supplied by
2974 * the application has already been copied in but the value may have
2975 * changed in the search. In that case, free the original copy and get
2976 * a new one.
2977 *
2978 * !!!
2979 * This assumes that the user must always specify a compatible realloc
2980 * function if a malloc function is specified. I think this is a
2981 * reasonable requirement.
2982 */
2983 if (F_ISSET(pkey, DB_DBT_MALLOC)) {
2984 F_CLR(pkey, DB_DBT_MALLOC);
2985 F_SET(pkey, DB_DBT_REALLOC);
2986 pkeymalloc = 1;
2987 } else if (F_ISSET(pkey, DB_DBT_USERCOPY)) {
2988 if (flags == DB_GET_BOTH_RANGE)
2989 __dbt_userfree(sdbp->env, NULL, pkey, NULL);
2990 if ((ret = __dbt_usercopy(sdbp->env, pkey)) != 0)
2991 goto err;
2992 }
2993
2994 /*
2995 * Do the actual get. Set DBC_TRANSIENT since we don't care about
2996 * preserving the position on error, and it's faster. SET_RET_MEM so
2997 * that the secondary DBC owns any returned-data memory.
2998 */
2999 F_SET(pdbc, DBC_TRANSIENT);
3000 SET_RET_MEM(pdbc, dbc);
3001 ret = __dbc_get(pdbc, pkey, data, DB_SET);
3002 DB_ASSERT(pdbp->env, ret != DB_PAGE_NOTFOUND);
3003
3004 /*
3005 * If the item wasn't found in the primary, this is a bug; our
3006 * secondary has somehow gotten corrupted, and contains elements that
3007 * don't correspond to anything in the primary. Complain.
3008 */
3009
3010 /* Now close the primary cursor. */
3011 if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
3012 ret = t_ret;
3013
3014 else if (ret == DB_NOTFOUND) {
3015 if (!F_ISSET(dbc, DBC_READ_UNCOMMITTED))
3016 ret = __db_secondary_corrupt(pdbp);
3017 else switch (flags) {
3018 case DB_GET_BOTHC:
3019 case DB_NEXT:
3020 case DB_NEXT_DUP:
3021 case DB_NEXT_NODUP:
3022 case DB_PREV:
3023 case DB_PREV_DUP:
3024 case DB_PREV_NODUP:
3025 PERFMON5(pdbp->env, race, dbc_get,
3026 sdbp->fname, sdbp->dname, ret, flags, pkey);
3027 goto retry;
3028 default:
3029 break;
3030 }
3031 }
3032
3033 err: /* Cleanup and cursor resolution. */
3034 if (dbc_n == dbc) {
3035 dbc_n->rkey = dbc_n->rdata;
3036 dbc_n->rdata = save_data;
3037 }
3038 if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
3039 ret = t_ret;
3040 if (pkeymalloc) {
3041 /*
3042 * If pkey had a MALLOC flag, we need to restore it; otherwise,
3043 * if the user frees the buffer but reuses the DBT without
3044 * NULL'ing its data field or changing the flags, we may drop
3045 * core.
3046 */
3047 F_CLR(pkey, DB_DBT_REALLOC);
3048 F_SET(pkey, DB_DBT_MALLOC);
3049 }
3050
3051 return (ret);
3052 }
3053
3054 /*
3055 * __dbc_pget_recno --
3056 * Perform a DB_GET_RECNO c_pget on a secondary index. Returns
3057 * the secondary's record number in the pkey field and the primary's
3058 * in the data field.
3059 */
3060 static int
__dbc_pget_recno(sdbc,pkey,data,flags)3061 __dbc_pget_recno(sdbc, pkey, data, flags)
3062 DBC *sdbc;
3063 DBT *pkey, *data;
3064 u_int32_t flags;
3065 {
3066 DB *pdbp, *sdbp;
3067 DBC *pdbc;
3068 DBT discardme, primary_key;
3069 ENV *env;
3070 db_recno_t oob;
3071 u_int32_t rmw;
3072 int ret, t_ret;
3073
3074 sdbp = sdbc->dbp;
3075 pdbp = sdbp->s_primary;
3076 env = sdbp->env;
3077 pdbc = NULL;
3078 ret = t_ret = 0;
3079
3080 rmw = LF_ISSET(DB_RMW);
3081
3082 memset(&discardme, 0, sizeof(DBT));
3083 F_SET(&discardme, DB_DBT_USERMEM | DB_DBT_PARTIAL);
3084
3085 oob = RECNO_OOB;
3086
3087 /*
3088 * If the primary is an rbtree, we want its record number, whether
3089 * or not the secondary is one too. Fetch the recno into "data".
3090 *
3091 * If it's not an rbtree, return RECNO_OOB in "data".
3092 */
3093 if (F_ISSET(pdbp, DB_AM_RECNUM)) {
3094 /*
3095 * Get the primary key, so we can find the record number
3096 * in the primary. (We're uninterested in the secondary key.)
3097 */
3098 memset(&primary_key, 0, sizeof(DBT));
3099 F_SET(&primary_key, DB_DBT_MALLOC);
3100 if ((ret = __dbc_get(sdbc,
3101 &discardme, &primary_key, rmw | DB_CURRENT)) != 0)
3102 return (ret);
3103
3104 /*
3105 * Open a cursor on the primary, set it to the right record,
3106 * and fetch its recno into "data".
3107 *
3108 * (See __dbc_pget for comments on the use of __db_cursor_int.)
3109 *
3110 * SET_RET_MEM so that the secondary DBC owns any returned-data
3111 * memory.
3112 */
3113 if ((ret = __db_cursor_int(pdbp, sdbc->thread_info, sdbc->txn,
3114 pdbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0)
3115 goto perr;
3116 SET_RET_MEM(pdbc, sdbc);
3117 if ((ret = __dbc_get(pdbc,
3118 &primary_key, &discardme, rmw | DB_SET)) != 0)
3119 goto perr;
3120
3121 ret = __dbc_get(pdbc, &discardme, data, rmw | DB_GET_RECNO);
3122
3123 perr: __os_ufree(env, primary_key.data);
3124 if (pdbc != NULL &&
3125 (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
3126 ret = t_ret;
3127 if (ret != 0)
3128 return (ret);
3129 } else if ((ret = __db_retcopy(env, data, &oob,
3130 sizeof(oob), &sdbc->rkey->data, &sdbc->rkey->ulen)) != 0)
3131 return (ret);
3132
3133 /*
3134 * If the secondary is an rbtree, we want its record number, whether
3135 * or not the primary is one too. Fetch the recno into "pkey".
3136 *
3137 * If it's not an rbtree, return RECNO_OOB in "pkey".
3138 */
3139 if (F_ISSET(sdbp, DB_AM_RECNUM))
3140 return (__dbc_get(sdbc, &discardme, pkey, flags));
3141 else
3142 return (__db_retcopy(env, pkey, &oob,
3143 sizeof(oob), &sdbc->rdata->data, &sdbc->rdata->ulen));
3144 }
3145
3146 /*
3147 * __db_wrlock_err -- do not have a write lock.
3148 */
3149 static int
__db_wrlock_err(env)3150 __db_wrlock_err(env)
3151 ENV *env;
3152 {
3153 __db_errx(env, DB_STR("0697", "Write attempted on read-only cursor"));
3154 return (EPERM);
3155 }
3156
3157 /*
3158 * __dbc_del_secondary --
3159 * Perform a delete operation on a secondary index: call through
3160 * to the primary and delete the primary record that this record
3161 * points to.
3162 *
3163 * Note that deleting the primary record will call c_del on all
3164 * the secondaries, including this one; thus, it is not necessary
3165 * to execute both this function and an actual delete.
3166 */
3167 static int
__dbc_del_secondary(dbc)3168 __dbc_del_secondary(dbc)
3169 DBC *dbc;
3170 {
3171 DB *pdbp;
3172 DBC *pdbc;
3173 DBT skey, pkey;
3174 ENV *env;
3175 int ret, t_ret;
3176 u_int32_t rmw;
3177
3178 pdbp = dbc->dbp->s_primary;
3179 env = pdbp->env;
3180 rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
3181
3182 /*
3183 * Get the current item that we're pointing at.
3184 * We don't actually care about the secondary key, just
3185 * the primary.
3186 */
3187 memset(&skey, 0, sizeof(DBT));
3188 memset(&pkey, 0, sizeof(DBT));
3189 F_SET(&skey, DB_DBT_PARTIAL | DB_DBT_USERMEM);
3190 if ((ret = __dbc_get(dbc, &skey, &pkey, DB_CURRENT)) != 0)
3191 return (ret);
3192
3193 SWAP_IF_NEEDED(dbc->dbp, &pkey);
3194 DEBUG_LWRITE(dbc, dbc->txn, "del_secondary", &skey, &pkey, 0);
3195
3196 /*
3197 * Create a cursor on the primary with our locker ID,
3198 * so that when it calls back, we don't conflict.
3199 *
3200 * We create a cursor explicitly because there's no
3201 * way to specify the same locker ID if we're using
3202 * locking but not transactions if we use the DB->del
3203 * interface. This shouldn't be any less efficient
3204 * anyway.
3205 */
3206 if ((ret = __db_cursor_int(pdbp, dbc->thread_info, dbc->txn,
3207 pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0)
3208 return (ret);
3209
3210 /*
3211 * See comment in __dbc_put--if we're in CDB,
3212 * we already hold the locks we need, and we need to flag
3213 * the cursor as a WRITER so we don't run into errors
3214 * when we try to delete.
3215 */
3216 if (CDB_LOCKING(env)) {
3217 DB_ASSERT(env, pdbc->mylock.off == LOCK_INVALID);
3218 F_SET(pdbc, DBC_WRITER);
3219 }
3220
3221 /*
3222 * Set the new cursor to the correct primary key. Then
3223 * delete it. We don't really care about the datum;
3224 * just reuse our skey DBT.
3225 *
3226 * If the primary get returns DB_NOTFOUND, something is amiss--
3227 * every record in the secondary should correspond to some record
3228 * in the primary.
3229 */
3230 if ((ret = __dbc_get(pdbc, &pkey, &skey, DB_SET | rmw)) == 0)
3231 ret = __dbc_del(pdbc, 0);
3232 else if (ret == DB_NOTFOUND)
3233 ret = __db_secondary_corrupt(pdbp);
3234
3235 if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
3236 ret = t_ret;
3237
3238 return (ret);
3239 }
3240
3241 /*
3242 * __dbc_del_primary --
3243 * Perform a delete operation on a primary index. Loop through
3244 * all the secondary indices which correspond to this primary
3245 * database, and delete any secondary keys that point at the current
3246 * record.
3247 *
3248 * PUBLIC: int __dbc_del_primary __P((DBC *));
3249 */
3250 int
__dbc_del_primary(dbc)3251 __dbc_del_primary(dbc)
3252 DBC *dbc;
3253 {
3254 DB *dbp, *sdbp;
3255 DBC *sdbc;
3256 DBT *tskeyp;
3257 DBT data, pkey, skey, temppkey, tempskey;
3258 ENV *env;
3259 u_int32_t nskey, rmw;
3260 int ret, t_ret;
3261
3262 dbp = dbc->dbp;
3263 env = dbp->env;
3264 sdbp = NULL;
3265 rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
3266
3267 /*
3268 * If we're called at all, we have at least one secondary.
3269 * (Unfortunately, we can't assert this without grabbing the mutex.)
3270 * Get the current record so that we can construct appropriate
3271 * secondary keys as needed.
3272 */
3273 memset(&pkey, 0, sizeof(DBT));
3274 memset(&data, 0, sizeof(DBT));
3275 if ((ret = __dbc_get(dbc, &pkey, &data, DB_CURRENT)) != 0)
3276 return (ret);
3277
3278 memset(&skey, 0, sizeof(DBT));
3279 for (ret = __db_s_first(dbp, &sdbp);
3280 sdbp != NULL && ret == 0;
3281 ret = __db_s_next(&sdbp, dbc->txn)) {
3282 /*
3283 * Get the secondary key for this secondary and the current
3284 * item.
3285 */
3286 if ((ret = sdbp->s_callback(sdbp, &pkey, &data, &skey)) != 0) {
3287 /* Not indexing is equivalent to an empty key set. */
3288 if (ret == DB_DONOTINDEX) {
3289 F_SET(&skey, DB_DBT_MULTIPLE);
3290 skey.size = 0;
3291 } else /* We had a substantive error. Bail. */
3292 goto err;
3293 }
3294
3295 #ifdef DIAGNOSTIC
3296 if (F_ISSET(&skey, DB_DBT_MULTIPLE))
3297 __db_check_skeyset(sdbp, &skey);
3298 #endif
3299
3300 if (F_ISSET(&skey, DB_DBT_MULTIPLE)) {
3301 tskeyp = (DBT *)skey.data;
3302 nskey = skey.size;
3303 if (nskey == 0)
3304 continue;
3305 } else {
3306 tskeyp = &skey;
3307 nskey = 1;
3308 }
3309
3310 /* Open a secondary cursor. */
3311 if ((ret = __db_cursor_int(sdbp,
3312 dbc->thread_info, dbc->txn, sdbp->type,
3313 PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0)
3314 goto err;
3315 /* See comment above and in __dbc_put. */
3316 if (CDB_LOCKING(env)) {
3317 DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
3318 F_SET(sdbc, DBC_WRITER);
3319 }
3320
3321 for (; nskey > 0; nskey--, tskeyp++) {
3322 /*
3323 * Set the secondary cursor to the appropriate item.
3324 * Delete it.
3325 *
3326 * We want to use DB_RMW if locking is on; it's only
3327 * legal then, though.
3328 *
3329 * !!!
3330 * Don't stomp on any callback-allocated buffer in skey
3331 * when we do a c_get(DB_GET_BOTH); use a temp DBT
3332 * instead. Similarly, don't allow pkey to be
3333 * invalidated when the cursor is closed.
3334 */
3335 DB_INIT_DBT(tempskey, tskeyp->data, tskeyp->size);
3336 SWAP_IF_NEEDED(sdbp, &pkey);
3337 DB_INIT_DBT(temppkey, pkey.data, pkey.size);
3338 if ((ret = __dbc_get(sdbc, &tempskey, &temppkey,
3339 DB_GET_BOTH | rmw)) == 0)
3340 ret = __dbc_del(sdbc, DB_UPDATE_SECONDARY);
3341 else if (ret == DB_NOTFOUND)
3342 ret = __db_secondary_corrupt(dbp);
3343 SWAP_IF_NEEDED(sdbp, &pkey);
3344 FREE_IF_NEEDED(env, tskeyp);
3345 }
3346
3347 if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
3348 ret = t_ret;
3349 if (ret != 0)
3350 goto err;
3351
3352 /*
3353 * In the common case where there is a single secondary key, we
3354 * will have freed any application-allocated data in skey
3355 * already. In the multiple key case, we need to free it here.
3356 * It is safe to do this twice as the macro resets the data
3357 * field.
3358 */
3359 FREE_IF_NEEDED(env, &skey);
3360 }
3361
3362 err: if (sdbp != NULL &&
3363 (t_ret = __db_s_done(sdbp, dbc->txn)) != 0 && ret == 0)
3364 ret = t_ret;
3365 FREE_IF_NEEDED(env, &skey);
3366 return (ret);
3367 }
3368
3369 /*
3370 * __dbc_del_foreign --
3371 * Apply the foreign database constraints for a particular foreign
3372 * database when an item is being deleted (dbc points at item being deleted
3373 * in the foreign database.)
3374 *
3375 * Delete happens in dbp, check for occurrences of key in pdpb.
3376 * Terminology:
3377 * Foreign db = Where delete occurs (dbp).
3378 * Secondary db = Where references to dbp occur (sdbp, a secondary)
3379 * Primary db = sdbp's primary database, references to dbp are secondary
3380 * keys here
3381 * Foreign Key = Key being deleted in dbp (fkey)
3382 * Primary Key = Key of the corresponding entry in sdbp's primary (pkey).
3383 */
3384 static int
__dbc_del_foreign(dbc)3385 __dbc_del_foreign(dbc)
3386 DBC *dbc;
3387 {
3388 DB_FOREIGN_INFO *f_info;
3389 DB *dbp, *pdbp, *sdbp;
3390 DBC *pdbc, *sdbc;
3391 DBT data, fkey, pkey;
3392 ENV *env;
3393 u_int32_t flags, rmw;
3394 int changed, ret, t_ret;
3395
3396 dbp = dbc->dbp;
3397 env = dbp->env;
3398
3399 memset(&fkey, 0, sizeof(DBT));
3400 memset(&data, 0, sizeof(DBT));
3401 if ((ret = __dbc_get(dbc, &fkey, &data, DB_CURRENT)) != 0)
3402 return (ret);
3403
3404 LIST_FOREACH(f_info, &(dbp->f_primaries), f_links) {
3405 sdbp = f_info->dbp;
3406 pdbp = sdbp->s_primary;
3407 flags = f_info->flags;
3408
3409 rmw = (STD_LOCKING(dbc) &&
3410 !LF_ISSET(DB_FOREIGN_ABORT)) ? DB_RMW : 0;
3411
3412 /*
3413 * Handle CDB locking. Some of this is copied from
3414 * __dbc_del_primary, but a bit more acrobatics are required.
3415 * If we're not going to abort, then we need to get a write
3416 * cursor. If CDB_ALLDB is set, then only one write cursor is
3417 * allowed and we hold it, so we fudge things and promote the
3418 * cursor on the other DBs manually, it won't cause a problem.
3419 * If CDB_ALLDB is not set, then we go through the usual route
3420 * to make sure we block as necessary. If there are any open
3421 * read cursors on sdbp, the delete or put call later will
3422 * block.
3423 *
3424 * If NULLIFY is set, we'll need a cursor on the primary to
3425 * update it with the nullified data. Because primary and
3426 * secondary dbs share a lock file ID in CDB, we open a cursor
3427 * on the secondary and then get another writable cursor on the
3428 * primary via __db_cursor_int to avoid deadlocking.
3429 */
3430 sdbc = pdbc = NULL;
3431 if (!LF_ISSET(DB_FOREIGN_ABORT) && CDB_LOCKING(env) &&
3432 !F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) {
3433 ret = __db_cursor(sdbp,
3434 dbc->thread_info, dbc->txn, &sdbc, DB_WRITECURSOR);
3435 if (LF_ISSET(DB_FOREIGN_NULLIFY) && ret == 0) {
3436 ret = __db_cursor_int(pdbp,
3437 dbc->thread_info, dbc->txn, pdbp->type,
3438 PGNO_INVALID, 0, dbc->locker, &pdbc);
3439 F_SET(pdbc, DBC_WRITER);
3440 }
3441 } else {
3442 ret = __db_cursor_int(sdbp, dbc->thread_info, dbc->txn,
3443 sdbp->type, PGNO_INVALID, 0, dbc->locker, &sdbc);
3444 if (LF_ISSET(DB_FOREIGN_NULLIFY) && ret == 0)
3445 ret = __db_cursor_int(pdbp, dbc->thread_info,
3446 dbc->txn, pdbp->type, PGNO_INVALID, 0,
3447 dbc->locker, &pdbc);
3448 }
3449 if (ret != 0) {
3450 if (sdbc != NULL)
3451 (void)__dbc_close(sdbc);
3452 return (ret);
3453 }
3454 if (CDB_LOCKING(env) && F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) {
3455 DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
3456 F_SET(sdbc, DBC_WRITER);
3457 if (LF_ISSET(DB_FOREIGN_NULLIFY) && pdbc != NULL) {
3458 DB_ASSERT(env,
3459 pdbc->mylock.off == LOCK_INVALID);
3460 F_SET(pdbc, DBC_WRITER);
3461 }
3462 }
3463
3464 /*
3465 * There are three actions possible when a foreign database has
3466 * items corresponding to a deleted item:
3467 * DB_FOREIGN_ABORT - The delete operation should be aborted.
3468 * DB_FOREIGN_CASCADE - All corresponding foreign items should
3469 * be deleted.
3470 * DB_FOREIGN_NULLIFY - A callback needs to be made, allowing
3471 * the application to modify the data DBT from the
3472 * associated database. If the callback makes a
3473 * modification, the updated item needs to replace the
3474 * original item in the foreign db
3475 */
3476 memset(&pkey, 0, sizeof(DBT));
3477 memset(&data, 0, sizeof(DBT));
3478 ret = __dbc_pget(sdbc, &fkey, &pkey, &data, DB_SET|rmw);
3479
3480 if (ret == DB_NOTFOUND) {
3481 /* No entry means no constraint */
3482 ret = __dbc_close(sdbc);
3483 if (LF_ISSET(DB_FOREIGN_NULLIFY) &&
3484 (t_ret = __dbc_close(pdbc)) != 0)
3485 ret = t_ret;
3486 if (ret != 0)
3487 return (ret);
3488 continue;
3489 } else if (ret != 0) {
3490 /* Just return the error code from the pget */
3491 (void)__dbc_close(sdbc);
3492 if (LF_ISSET(DB_FOREIGN_NULLIFY))
3493 (void)__dbc_close(pdbc);
3494 return (ret);
3495 } else if (LF_ISSET(DB_FOREIGN_ABORT)) {
3496 /* If the record exists and ABORT is set, we're done */
3497 if ((ret = __dbc_close(sdbc)) != 0)
3498 return (ret);
3499 return (DB_FOREIGN_CONFLICT);
3500 }
3501
3502 /*
3503 * There were matching items in the primary DB, and the action
3504 * is either DB_FOREIGN_CASCADE or DB_FOREIGN_NULLIFY.
3505 */
3506 while (ret == 0) {
3507 if (LF_ISSET(DB_FOREIGN_CASCADE)) {
3508 /*
3509 * Don't use the DB_UPDATE_SECONDARY flag,
3510 * since we want the delete to cascade into the
3511 * secondary's primary.
3512 */
3513 if ((ret = __dbc_del(sdbc, 0)) != 0) {
3514 __db_err(env, ret, DB_STR("0698",
3515 "Attempt to execute cascading delete in a foreign index failed"));
3516 break;
3517 }
3518 } else if (LF_ISSET(DB_FOREIGN_NULLIFY)) {
3519 changed = 0;
3520 if ((ret = f_info->callback(sdbp,
3521 &pkey, &data, &fkey, &changed)) != 0) {
3522 __db_err(env, ret, DB_STR("0699",
3523 "Foreign database application callback"));
3524 break;
3525 }
3526
3527 /*
3528 * If the user callback modified the DBT and
3529 * a put on the primary failed.
3530 */
3531 if (changed && (ret = __dbc_put(pdbc,
3532 &pkey, &data, DB_KEYFIRST)) != 0) {
3533 __db_err(env, ret, DB_STR("0700",
3534 "Attempt to overwrite item in foreign database with nullified value failed"));
3535 break;
3536 }
3537 }
3538 /* retrieve the next matching item from the prim. db */
3539 memset(&pkey, 0, sizeof(DBT));
3540 memset(&data, 0, sizeof(DBT));
3541 ret = __dbc_pget(sdbc,
3542 &fkey, &pkey, &data, DB_NEXT_DUP|rmw);
3543 }
3544
3545 if (ret == DB_NOTFOUND)
3546 ret = 0;
3547 if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
3548 ret = t_ret;
3549 if (LF_ISSET(DB_FOREIGN_NULLIFY) &&
3550 (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
3551 ret = t_ret;
3552 if (ret != 0)
3553 return (ret);
3554 }
3555
3556 return (ret);
3557 }
3558
3559 /*
3560 * __db_s_first --
3561 * Get the first secondary, if any are present, from the primary.
3562 *
3563 * PUBLIC: int __db_s_first __P((DB *, DB **));
3564 */
3565 int
__db_s_first(pdbp,sdbpp)3566 __db_s_first(pdbp, sdbpp)
3567 DB *pdbp, **sdbpp;
3568 {
3569 DB *sdbp;
3570
3571 MUTEX_LOCK(pdbp->env, pdbp->mutex);
3572 sdbp = LIST_FIRST(&pdbp->s_secondaries);
3573
3574 /* See __db_s_next. */
3575 if (sdbp != NULL)
3576 sdbp->s_refcnt++;
3577 MUTEX_UNLOCK(pdbp->env, pdbp->mutex);
3578
3579 *sdbpp = sdbp;
3580
3581 return (0);
3582 }
3583
3584 /*
3585 * __db_s_next --
3586 * Get the next secondary in the list.
3587 *
3588 * PUBLIC: int __db_s_next __P((DB **, DB_TXN *));
3589 */
3590 int
__db_s_next(sdbpp,txn)3591 __db_s_next(sdbpp, txn)
3592 DB **sdbpp;
3593 DB_TXN *txn;
3594 {
3595 DB *sdbp, *pdbp, *closeme;
3596 ENV *env;
3597 int ret;
3598
3599 /*
3600 * Secondary indices are kept in a linked list, s_secondaries,
3601 * off each primary DB handle. If a primary is free-threaded,
3602 * this list may only be traversed or modified while the primary's
3603 * thread mutex is held.
3604 *
3605 * The tricky part is that we don't want to hold the thread mutex
3606 * across the full set of secondary puts necessary for each primary
3607 * put, or we'll wind up essentially single-threading all the puts
3608 * to the handle; the secondary puts will each take about as
3609 * long as the primary does, and may require I/O. So we instead
3610 * hold the thread mutex only long enough to follow one link to the
3611 * next secondary, and then we release it before performing the
3612 * actual secondary put.
3613 *
3614 * The only danger here is that we might legitimately close a
3615 * secondary index in one thread while another thread is performing
3616 * a put and trying to update that same secondary index. To
3617 * prevent this from happening, we refcount the secondary handles.
3618 * If close is called on a secondary index handle while we're putting
3619 * to it, it won't really be closed--the refcount will simply drop,
3620 * and we'll be responsible for closing it here.
3621 */
3622 sdbp = *sdbpp;
3623 pdbp = sdbp->s_primary;
3624 env = pdbp->env;
3625 closeme = NULL;
3626
3627 MUTEX_LOCK(env, pdbp->mutex);
3628 DB_ASSERT(env, sdbp->s_refcnt != 0);
3629 if (--sdbp->s_refcnt == 0) {
3630 LIST_REMOVE(sdbp, s_links);
3631 closeme = sdbp;
3632 }
3633 sdbp = LIST_NEXT(sdbp, s_links);
3634 if (sdbp != NULL)
3635 sdbp->s_refcnt++;
3636 MUTEX_UNLOCK(env, pdbp->mutex);
3637
3638 *sdbpp = sdbp;
3639
3640 /*
3641 * closeme->close() is a wrapper; call __db_close explicitly.
3642 */
3643 if (closeme == NULL)
3644 ret = 0;
3645 else
3646 ret = __db_close(closeme, txn, 0);
3647
3648 return (ret);
3649 }
3650
3651 /*
3652 * __db_s_done --
3653 * Properly decrement the refcount on a secondary database handle we're
3654 * using, without calling __db_s_next.
3655 *
3656 * PUBLIC: int __db_s_done __P((DB *, DB_TXN *));
3657 */
3658 int
__db_s_done(sdbp,txn)3659 __db_s_done(sdbp, txn)
3660 DB *sdbp;
3661 DB_TXN *txn;
3662 {
3663 DB *pdbp;
3664 ENV *env;
3665 int doclose, ret;
3666
3667 pdbp = sdbp->s_primary;
3668 env = pdbp->env;
3669 doclose = 0;
3670
3671 MUTEX_LOCK(env, pdbp->mutex);
3672 DB_ASSERT(env, sdbp->s_refcnt != 0);
3673 if (--sdbp->s_refcnt == 0) {
3674 LIST_REMOVE(sdbp, s_links);
3675 doclose = 1;
3676 }
3677 MUTEX_UNLOCK(env, pdbp->mutex);
3678
3679 if (doclose == 0)
3680 ret = 0;
3681 else
3682 ret = __db_close(sdbp, txn, 0);
3683 return (ret);
3684 }
3685
3686 /*
3687 * __db_s_count --
3688 * Count the number of secondaries associated with a given primary.
3689 */
3690 static int
__db_s_count(pdbp)3691 __db_s_count(pdbp)
3692 DB *pdbp;
3693 {
3694 DB *sdbp;
3695 ENV *env;
3696 int count;
3697
3698 env = pdbp->env;
3699 count = 0;
3700
3701 MUTEX_LOCK(env, pdbp->mutex);
3702 for (sdbp = LIST_FIRST(&pdbp->s_secondaries);
3703 sdbp != NULL;
3704 sdbp = LIST_NEXT(sdbp, s_links))
3705 ++count;
3706 MUTEX_UNLOCK(env, pdbp->mutex);
3707
3708 return (count);
3709 }
3710
3711 /*
3712 * __db_buildpartial --
3713 * Build the record that will result after a partial put is applied to
3714 * an existing record.
3715 *
3716 * This should probably be merged with __bam_build, but that requires
3717 * a little trickery if we plan to keep the overflow-record optimization
3718 * in that function.
3719 *
3720 * PUBLIC: int __db_buildpartial __P((DB *, DBT *, DBT *, DBT *));
3721 */
3722 int
__db_buildpartial(dbp,oldrec,partial,newrec)3723 __db_buildpartial(dbp, oldrec, partial, newrec)
3724 DB *dbp;
3725 DBT *oldrec, *partial, *newrec;
3726 {
3727 ENV *env;
3728 u_int32_t len, nbytes;
3729 u_int8_t *buf;
3730 int ret;
3731
3732 env = dbp->env;
3733
3734 DB_ASSERT(env, F_ISSET(partial, DB_DBT_PARTIAL));
3735
3736 memset(newrec, 0, sizeof(DBT));
3737
3738 nbytes = __db_partsize(oldrec->size, partial);
3739 newrec->size = nbytes;
3740
3741 if ((ret = __os_malloc(env, nbytes, &buf)) != 0)
3742 return (ret);
3743 newrec->data = buf;
3744
3745 /* Nul or pad out the buffer, for any part that isn't specified. */
3746 memset(buf,
3747 F_ISSET(dbp, DB_AM_FIXEDLEN) ? ((BTREE *)dbp->bt_internal)->re_pad :
3748 0, nbytes);
3749
3750 /* Copy in any leading data from the original record. */
3751 memcpy(buf, oldrec->data,
3752 partial->doff > oldrec->size ? oldrec->size : partial->doff);
3753
3754 /* Copy the data from partial. */
3755 memcpy(buf + partial->doff, partial->data, partial->size);
3756
3757 /* Copy any trailing data from the original record. */
3758 len = partial->doff + partial->dlen;
3759 if (oldrec->size > len)
3760 memcpy(buf + partial->doff + partial->size,
3761 (u_int8_t *)oldrec->data + len, oldrec->size - len);
3762
3763 return (0);
3764 }
3765
3766 /*
3767 * __db_partsize --
3768 * Given the number of bytes in an existing record and a DBT that
3769 * is about to be partial-put, calculate the size of the record
3770 * after the put.
3771 *
3772 * This code is called from __bam_partsize.
3773 *
3774 * PUBLIC: u_int32_t __db_partsize __P((u_int32_t, DBT *));
3775 */
3776 u_int32_t
__db_partsize(nbytes,data)3777 __db_partsize(nbytes, data)
3778 u_int32_t nbytes;
3779 DBT *data;
3780 {
3781
3782 /*
3783 * There are really two cases here:
3784 *
3785 * Case 1: We are replacing some bytes that do not exist (i.e., they
3786 * are past the end of the record). In this case the number of bytes
3787 * we are replacing is irrelevant and all we care about is how many
3788 * bytes we are going to add from offset. So, the new record length
3789 * is going to be the size of the new bytes (size) plus wherever those
3790 * new bytes begin (doff).
3791 *
3792 * Case 2: All the bytes we are replacing exist. Therefore, the new
3793 * size is the oldsize (nbytes) minus the bytes we are replacing (dlen)
3794 * plus the bytes we are adding (size).
3795 */
3796 if (nbytes < data->doff + data->dlen) /* Case 1 */
3797 return (data->doff + data->size);
3798
3799 return (nbytes + data->size - data->dlen); /* Case 2 */
3800 }
3801
#ifdef DIAGNOSTIC
/*
 * __db_check_skeyset --
 *	Diagnostic-only sanity check on a multiple-key result from a
 *	secondary callback: asserts that no two keys in the set compare
 *	equal under the secondary's btree comparison function.
 *
 * PUBLIC: #ifdef DIAGNOSTIC
 * PUBLIC: void __db_check_skeyset __P((DB *, DBT *));
 * PUBLIC: #endif
 */
void
__db_check_skeyset(sdbp, skeyp)
	DB *sdbp;
	DBT *skeyp;
{
	DBT *keys;
	ENV *env;
	u_int32_t i, j, nkeys;

	env = sdbp->env;

	/* skeyp->data is an array of skeyp->size DBTs. */
	keys = (DBT *)skeyp->data;
	nkeys = skeyp->size;

	/* Compare every unordered pair once. */
	for (i = 0; i < nkeys; i++)
		for (j = i + 1; j < nkeys; j++)
			DB_ASSERT(env,
			    ((BTREE *)sdbp->bt_internal)->bt_compare(sdbp,
			    &keys[i], &keys[j], NULL) != 0);
}
#endif
3831
#ifdef HAVE_ERROR_HISTORY
/*
 * __dbc_diags
 *	Save the context which triggers the "first notice" of an error code;
 *	i.e., its creation.  It doesn't touch anything when err == 0.
 *
 *	The saved message identifies the database as "file:subdatabase",
 *	using "in-mem" when there is no file name and an empty string when
 *	there is no sub-database name.  Always returns err unchanged so the
 *	call can wrap an error expression.
 *
 * PUBLIC: int __dbc_diags __P((DBC *, int));
 */
int
__dbc_diags(dbc, err)
	DBC *dbc;
	int err;
{
	DB_MSGBUF *mb;

	if (err != 0 && dbc->env != NULL &&
	    (mb = __db_deferred_get()) != NULL) {
		(void)__db_remember_context(dbc->env, mb, err);
#ifdef HAVE_SLICES
		if (dbc->env->slice_container != NULL)
			__db_msgadd(dbc->env, mb, "slice %d: ",
			    dbc->env->slice_index);
#endif
		/* The second %s is the sub-database name: print dname. */
		__db_msgadd(dbc->env, mb, "DB: %s:%s\n",
		    dbc->dbp->fname == NULL ? "in-mem" : dbc->dbp->fname,
		    dbc->dbp->dname == NULL ? "" : dbc->dbp->dname);
	}
	return (err);
}
#endif
3862