/*-
 * Copyright (c) 1996, 2020 Oracle and/or its affiliates.  All rights reserved.
 *
 * See the file LICENSE for license information.
 *
 * $Id$
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/btree.h"
#include "dbinc/mp.h"

static int __bam_opd_cursor __P((DB *, DBC *, db_pgno_t, u_int32_t, u_int32_t));
static int __bam_ca_delete_func
    __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
static int __ram_ca_delete_func
    __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
static int __bam_ca_di_func
    __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
static int __bam_ca_dup_func
    __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
static int __bam_ca_undodup_func
    __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
static int __bam_ca_rsplit_func
    __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
static int __bam_ca_split_func
    __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
static int __bam_ca_undosplit_func
    __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));

/*
 * Cursor adjustments are logged if they are for subtransactions.  This is
 * because it's possible for a subtransaction to adjust cursors which will
 * still be active after the subtransaction aborts, and so which must be
 * restored to their previous locations.  Cursors that can be both affected
 * by our cursor adjustments and active after our transaction aborts can
 * only be found in our parent transaction -- cursors in other transactions,
 * including other child transactions of our parent, must have conflicting
 * locker IDs, and so cannot be affected by adjustments in this transaction.
 */

/*
 * __bam_ca_delete_func --
 *	Callback function for walking cursors to update them due to a delete.
 */
static int
__bam_ca_delete_func(dbc, my_dbc, countp, pgno, indx, args)
	DBC *dbc, *my_dbc;
	u_int32_t *countp;
	db_pgno_t pgno;
	u_int32_t indx;
	void *args;
{
	BTREE_CURSOR *cp;
	u_int32_t del;

	COMPQUIET(my_dbc, NULL);
	del = *(u_int32_t *)args;

	cp = (BTREE_CURSOR *)dbc->internal;
	if (cp->pgno == pgno && cp->indx == indx &&
	    !MVCC_SKIP_CURADJ(dbc, pgno)) {
		/*
		 * [#8032] This assert is checking for possible race
		 * conditions where we hold a cursor position without
		 * a lock.  Unfortunately, there are paths in the
		 * Btree code that do not satisfy these conditions.
		 * None of them are known to be a problem, but this
		 * assert should be re-activated when the Btree stack
		 * code is re-written.
		DB_ASSERT(env, !STD_LOCKING(dbc) ||
		    cp->lock_mode != DB_LOCK_NG);
		 */
		if (del) {
			F_SET(cp, C_DELETED);
			/*
			 * If we're deleting the item, we can't
			 * keep a streaming offset cached.
			 */
			cp->stream_start_pgno = PGNO_INVALID;
		} else
			F_CLR(cp, C_DELETED);

#ifdef HAVE_COMPRESSION
		/*
		 * We also set the C_COMPRESS_MODIFIED flag, which
		 * prompts the compression code to look for its
		 * current entry again if it needs to.
		 *
		 * The flag isn't cleared, because the compression
		 * code still needs to do that even for an entry that
		 * becomes undeleted.
		 *
		 * This flag also needs to be set if an entry is
		 * updated, but since the compression code always
		 * deletes before an update, setting it here is
		 * sufficient.
		 */
		F_SET(cp, C_COMPRESS_MODIFIED);
#endif

		++(*countp);
	}
	return (0);
}

/*
 * __bam_ca_delete --
 *	Update the cursors when items are deleted and when already deleted
 *	items are overwritten.  Return the number of relevant cursors found.
 *
 * PUBLIC: int __bam_ca_delete __P((DB *,
 * PUBLIC:     db_pgno_t, u_int32_t, int, u_int32_t *));
 */
int
__bam_ca_delete(dbp, pgno, indx, del, countp)
	DB *dbp;
	db_pgno_t pgno;
	u_int32_t indx;
	int del;
	u_int32_t *countp;
{
	int ret;
	u_int32_t count;

	/*
	 * Adjust the cursors.  We have the page write locked, so the
	 * only other cursors that can be pointing at a page are
	 * those in the same thread of control.  Unfortunately, we don't
	 * know that they're using the same DB handle, so traverse
	 * all matching DB handles in the same ENV, then all cursors
	 * on each matching DB handle.
	 *
	 * Each cursor is single-threaded, so we only need to lock the
	 * list of DBs and then the list of cursors in each DB.
	 */
	if ((ret = __db_walk_cursors(dbp, NULL,
	    __bam_ca_delete_func, &count, pgno, indx, &del)) != 0)
		return (ret);

	if (countp != NULL)
		*countp = count;
	return (0);
}

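/*
 * __ram_ca_delete_func --
 *	Callback function for walking cursors: count a cursor open in the
 *	tree rooted at the given page, returning EEXIST once one is found.
 */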
static int
__ram_ca_delete_func(dbc, my_dbc, countp, root_pgno, indx, args)
	DBC *dbc, *my_dbc;
	u_int32_t *countp;
	db_pgno_t root_pgno;
	u_int32_t indx;
	void *args;
{
	COMPQUIET(indx, 0);
	COMPQUIET(my_dbc, NULL);
	COMPQUIET(args, NULL);

	if (dbc->internal->root == root_pgno &&
	    !MVCC_SKIP_CURADJ(dbc, root_pgno)) {
		(*countp)++;
		return (EEXIST);
	}
	return (0);
}

/*
 * __ram_ca_delete --
 *	Report whether any relevant cursors were found.
 *
 * PUBLIC: int __ram_ca_delete __P((DB *, db_pgno_t, u_int32_t *));
 */
int
__ram_ca_delete(dbp, root_pgno, foundp)
	DB *dbp;
	db_pgno_t root_pgno;
	u_int32_t *foundp;
{
	int ret;

	if ((ret = __db_walk_cursors(dbp, NULL, __ram_ca_delete_func,
	    foundp, root_pgno, 0, NULL)) != 0 && ret != EEXIST)
		return (ret);

	return (0);
}

struct __bam_ca_di_args {
	int adjust;
	DB_TXN *my_txn;
};

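/*
 * __bam_ca_di_func --
 *	Callback function for walking cursors: adjust the index of any cursor
 *	positioned on the given page at or after the given index, and note
 *	whether a cursor belonging to another transaction was adjusted.
 */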
static int
__bam_ca_di_func(dbc, my_dbc, foundp, pgno, indx, vargs)
	DBC *dbc, *my_dbc;
	u_int32_t *foundp;
	db_pgno_t pgno;
	u_int32_t indx;
	void *vargs;
{
	DBC_INTERNAL *cp;
	struct __bam_ca_di_args *args;

	if (dbc->dbtype == DB_RECNO)
		return (0);

	cp = dbc->internal;
	args = vargs;
	if (cp->pgno == pgno && cp->indx >= indx &&
	    (dbc == my_dbc || !MVCC_SKIP_CURADJ(dbc, pgno))) {
		/* Cursor indices should never be negative. */
		DB_ASSERT(dbc->dbp->env, cp->indx != 0 || args->adjust > 0);
		/* [#8032]
		DB_ASSERT(env, !STD_LOCKING(dbc) ||
		    cp->lock_mode != DB_LOCK_NG);
		*/
		cp->indx += args->adjust;
		if (args->my_txn != NULL && args->my_txn != dbc->txn)
			*foundp = 1;
	}
	return (0);
}

/*
 * __bam_ca_di --
 *	Adjust the cursors during a delete or insert.
 *
 * PUBLIC: int __bam_ca_di __P((DBC *, db_pgno_t, u_int32_t, int));
 */
int
__bam_ca_di(my_dbc, pgno, indx, adjust)
	DBC *my_dbc;
	db_pgno_t pgno;
	u_int32_t indx;
	int adjust;
{
	DB *dbp;
	DB_LSN lsn;
	int ret;
	u_int32_t found;
	struct __bam_ca_di_args args;

	dbp = my_dbc->dbp;
	args.adjust = adjust;
	args.my_txn = IS_SUBTRANSACTION(my_dbc->txn) ? my_dbc->txn : NULL;

	/*
	 * Adjust the cursors.  See the comment in __bam_ca_delete().
	 */
	if ((ret = __db_walk_cursors(dbp, my_dbc, __bam_ca_di_func,
	    &found, pgno, indx, &args)) != 0)
		return (ret);

	if (found != 0 && DBC_LOGGING(my_dbc)) {
		if ((ret = __bam_curadj_log(dbp, my_dbc->txn, &lsn, 0,
		    DB_CA_DI, pgno, 0, 0, (u_int32_t)adjust, indx, 0)) != 0)
			return (ret);
	}

	return (0);
}

/*
 * __bam_opd_cursor -- create a new opd cursor.
 */
static int
__bam_opd_cursor(dbp, dbc, first, tpgno, ti)
	DB *dbp;
	DBC *dbc;
	db_pgno_t tpgno;
	u_int32_t first, ti;
{
	BTREE_CURSOR *cp, *orig_cp;
	DBC *dbc_nopd;
	int ret;

	orig_cp = (BTREE_CURSOR *)dbc->internal;
	dbc_nopd = NULL;

	/*
	 * Allocate a new cursor and create the stack.  If duplicates
	 * are sorted, we've just created an off-page duplicate Btree.
	 * If duplicates aren't sorted, we've just created a Recno tree.
	 *
	 * Note that in order to get here at all, there shouldn't be
	 * an old off-page dup cursor--to augment the checking dbc_newopd
	 * will do, assert this.
	 */
	DB_ASSERT(dbp->env, orig_cp->opd == NULL);
	if ((ret = __dbc_newopd(dbc, tpgno, orig_cp->opd, &dbc_nopd)) != 0)
		return (ret);

	cp = (BTREE_CURSOR *)dbc_nopd->internal;
	cp->pgno = tpgno;
	cp->indx = ti;

	if (dbp->dup_compare == NULL) {
		/*
		 * Converting to off-page Recno trees is tricky.  The
		 * record number for the cursor is the index + 1 (to
		 * convert to 1-based record numbers).
		 */
		cp->recno = ti + 1;
	}

	/*
	 * Transfer the deleted flag from the top-level cursor to the
	 * created one.
	 */
	if (F_ISSET(orig_cp, C_DELETED)) {
		F_SET(cp, C_DELETED);
		F_CLR(orig_cp, C_DELETED);
	}

	/* Stack the cursors and reset the initial cursor's index. */
	orig_cp->opd = dbc_nopd;
	orig_cp->indx = first;
	return (0);
}

struct __bam_ca_dup_args {
	db_pgno_t tpgno;
	db_indx_t first, ti;
	DB_TXN *my_txn;
};

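/*
 * __bam_ca_dup_func --
 *	Callback function for walking cursors: stack an off-page duplicate
 *	cursor onto any cursor positioned on the item that was moved to a
 *	duplicate page.
 */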
static int
__bam_ca_dup_func(dbc, my_dbc, foundp, fpgno, fi, vargs)
	DBC *dbc;
	DBC *my_dbc;
	u_int32_t *foundp;
	db_pgno_t fpgno;
	u_int32_t fi;
	void *vargs;
{
	BTREE_CURSOR *orig_cp;
	DB *dbp;
	int ret;
	struct __bam_ca_dup_args *args;

	COMPQUIET(my_dbc, NULL);

	/*
	 * Since we rescan the list, check whether this cursor
	 * has already been converted.
	 */
	orig_cp = (BTREE_CURSOR *)dbc->internal;
	if (orig_cp->opd != NULL)
		return (0);

	/* Find cursors pointing to this record. */
	if (orig_cp->pgno != fpgno || orig_cp->indx != fi ||
	    MVCC_SKIP_CURADJ(dbc, fpgno))
		return (0);

	dbp = dbc->dbp;
	args = vargs;

	MUTEX_UNLOCK(dbp->env, dbp->mutex);

	if ((ret = __bam_opd_cursor(dbp,
	    dbc, args->first, args->tpgno, args->ti)) != 0) {
		MUTEX_LOCK(dbp->env, dbp->mutex);
		return (ret);
	}
	if (args->my_txn != NULL && args->my_txn != dbc->txn)
		*foundp = 1;
	/* We released the mutex to get a cursor, start over. */
	return (DB_LOCK_NOTGRANTED);
}

/*
 * __bam_ca_dup --
 *	Adjust the cursors when moving items from a leaf page to a duplicates
 *	page.
 *
 * PUBLIC: int __bam_ca_dup __P((DBC *,
 * PUBLIC:    u_int32_t, db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
 */
int
__bam_ca_dup(my_dbc, first, fpgno, fi, tpgno, ti)
	DBC *my_dbc;
	db_pgno_t fpgno, tpgno;
	u_int32_t first, fi, ti;
{
	DB *dbp;
	DB_LSN lsn;
	int ret, t_ret;
	u_int32_t found;
	struct __bam_ca_dup_args args;

	dbp = my_dbc->dbp;

	args.first = first;
	args.tpgno = tpgno;
	args.ti = ti;
	args.my_txn = IS_SUBTRANSACTION(my_dbc->txn) ? my_dbc->txn : NULL;

	if ((ret = __db_walk_cursors(dbp,
	    my_dbc, __bam_ca_dup_func, &found, fpgno, fi, &args)) != 0)
		return (ret);

	if (found != 0 && DBC_LOGGING(my_dbc)) {
		if ((t_ret = __bam_curadj_log(dbp, my_dbc->txn,
		    &lsn, 0, DB_CA_DUP, fpgno, tpgno, 0, first, fi, ti)) != 0 &&
		    ret == 0)
			ret = t_ret;
	}

	return (ret);
}

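/*
 * __bam_ca_undodup_func --
 *	Callback function for walking cursors: close the off-page duplicate
 *	cursor stacked on a cursor that refers to the moved item and restore
 *	the cursor's leaf-page index.
 */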
static int
__bam_ca_undodup_func(dbc, my_dbc, countp, fpgno, fi, vargs)
	DBC *dbc;
	DBC *my_dbc;
	u_int32_t *countp;
	db_pgno_t fpgno;
	u_int32_t fi;
	void *vargs;
{
	BTREE_CURSOR *orig_cp;
	DB *dbp;
	int ret;
	struct __bam_ca_dup_args *args;

	COMPQUIET(my_dbc, NULL);
	COMPQUIET(countp, NULL);

	orig_cp = (BTREE_CURSOR *)dbc->internal;
	dbp = dbc->dbp;
	args = vargs;
	/*
	 * A note on the orig_cp->opd != NULL requirement here:
	 * it's possible that there's a cursor that refers to
	 * the same duplicate set, but which has no opd cursor,
	 * because it refers to a different item and we took
	 * care of it while processing a previous record.
	 */
	if (orig_cp->pgno != fpgno ||
	    orig_cp->indx != args->first ||
	    orig_cp->opd == NULL || ((BTREE_CURSOR *)
	    orig_cp->opd->internal)->indx != args->ti ||
	    MVCC_SKIP_CURADJ(dbc, fpgno))
		return (0);
	MUTEX_UNLOCK(dbp->env, dbp->mutex);
	if ((ret = __dbc_close(orig_cp->opd)) != 0) {
		MUTEX_LOCK(dbp->env, dbp->mutex);
		return (ret);
	}
	orig_cp->opd = NULL;
	orig_cp->indx = fi;
	/*
	 * We released the mutex to free a cursor,
	 * start over.
	 */
	return (DB_LOCK_NOTGRANTED);
}

/*
 * __bam_ca_undodup --
 *	Adjust the cursors when returning items to a leaf page
 *	from a duplicate page.
 *	Called only during undo processing.
 *
 * PUBLIC: int __bam_ca_undodup __P((DB *,
 * PUBLIC:    u_int32_t, db_pgno_t, u_int32_t, u_int32_t));
 */
int
__bam_ca_undodup(dbp, first, fpgno, fi, ti)
	DB *dbp;
	db_pgno_t fpgno;
	u_int32_t first, fi, ti;
{
	u_int32_t count;
	struct __bam_ca_dup_args args;

	args.first = first;
	args.ti = ti;
	return (__db_walk_cursors(dbp, NULL,
	    __bam_ca_undodup_func, &count, fpgno, fi, &args));

}

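/*
 * __bam_ca_rsplit_func --
 *	Callback function for walking cursors: repoint any cursor positioned
 *	on the page removed by a reverse split to the page that replaces it.
 */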
static int
__bam_ca_rsplit_func(dbc, my_dbc, foundp, fpgno, indx, args)
	DBC *dbc;
	DBC *my_dbc;
	u_int32_t *foundp;
	db_pgno_t fpgno;
	u_int32_t indx;
	void *args;
{
	db_pgno_t tpgno;

	COMPQUIET(indx, 0);

	if (dbc->dbtype == DB_RECNO)
		return (0);

	tpgno = *(db_pgno_t *)args;
	if (dbc->internal->pgno == fpgno &&
	    !MVCC_SKIP_CURADJ(dbc, fpgno)) {
		dbc->internal->pgno = tpgno;
		/* [#8032]
		DB_ASSERT(env, !STD_LOCKING(dbc) ||
		    dbc->internal->lock_mode != DB_LOCK_NG);
		*/
		if (IS_SUBTRANSACTION(my_dbc->txn) && dbc->txn != my_dbc->txn)
			*foundp = 1;
	}
	return (0);
}

/*
 * __bam_ca_rsplit --
 *	Adjust the cursors when doing reverse splits.
 *
 * PUBLIC: int __bam_ca_rsplit __P((DBC *, db_pgno_t, db_pgno_t));
 */
int
__bam_ca_rsplit(my_dbc, fpgno, tpgno)
	DBC* my_dbc;
	db_pgno_t fpgno, tpgno;
{
	DB *dbp;
	DB_LSN lsn;
	int ret;
	u_int32_t found;

	dbp = my_dbc->dbp;

	if ((ret = __db_walk_cursors(dbp, my_dbc,
	    __bam_ca_rsplit_func, &found, fpgno, 0, &tpgno)) != 0)
		return (ret);

	if (found != 0 && DBC_LOGGING(my_dbc)) {
		if ((ret = __bam_curadj_log(dbp, my_dbc->txn,
		    &lsn, 0, DB_CA_RSPLIT, fpgno, tpgno, 0, 0, 0, 0)) != 0)
			return (ret);
	}
	return (0);
}

struct __bam_ca_split_args {
	db_pgno_t lpgno, rpgno;
	int cleft;
	DB_TXN *my_txn;
};

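/*
 * __bam_ca_split_func --
 *	Callback function for walking cursors: move a cursor positioned on the
 *	split page to the left or right page, adjusting its index when it
 *	lands on the right page.
 */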
static int
__bam_ca_split_func(dbc, my_dbc, foundp, ppgno, split_indx, vargs)
	DBC *dbc;
	DBC *my_dbc;
	u_int32_t *foundp;
	db_pgno_t ppgno;
	u_int32_t split_indx;
	void *vargs;
{
	DBC_INTERNAL *cp;
	struct __bam_ca_split_args *args;

	COMPQUIET(my_dbc, NULL);

	if (dbc->dbtype == DB_RECNO)
		return (0);
	cp = dbc->internal;
	args = vargs;
	if (cp->pgno == ppgno &&
	    !MVCC_SKIP_CURADJ(dbc, ppgno)) {
		/* [#8032]
		DB_ASSERT(env, !STD_LOCKING(dbc) ||
		    cp->lock_mode != DB_LOCK_NG);
		*/
		if (args->my_txn != NULL && args->my_txn != dbc->txn)
			*foundp = 1;
		if (cp->indx < split_indx) {
			if (args->cleft)
				cp->pgno = args->lpgno;
		} else {
			cp->pgno = args->rpgno;
			cp->indx -= split_indx;
		}
	}
	return (0);
}

/*
 * __bam_ca_split --
 *	Adjust the cursors when splitting a page.
 *
 * PUBLIC: int __bam_ca_split __P((DBC *,
 * PUBLIC:    db_pgno_t, db_pgno_t, db_pgno_t, u_int32_t, int));
 */
int
__bam_ca_split(my_dbc, ppgno, lpgno, rpgno, split_indx, cleft)
	DBC *my_dbc;
	db_pgno_t ppgno, lpgno, rpgno;
	u_int32_t split_indx;
	int cleft;
{
	DB *dbp;
	DB_LSN lsn;
	int ret;
	u_int32_t found;
	struct __bam_ca_split_args args;

	dbp = my_dbc->dbp;

	/*
	 * If splitting the page that a cursor was on, the cursor has to be
	 * adjusted to point to the same record as before the split.  Most
	 * of the time we don't adjust pointers to the left page, because
	 * we're going to copy its contents back over the original page.  If
	 * the cursor is on the right page, it is decremented by the number of
	 * records split to the left page.
	 */
	args.lpgno = lpgno;
	args.rpgno = rpgno;
	args.cleft = cleft;
	args.my_txn = IS_SUBTRANSACTION(my_dbc->txn) ? my_dbc->txn : NULL;
	if ((ret = __db_walk_cursors(dbp, my_dbc,
	    __bam_ca_split_func, &found, ppgno, split_indx, &args)) != 0)
		return (ret);

	if (found != 0 && DBC_LOGGING(my_dbc)) {
		if ((ret = __bam_curadj_log(dbp,
		    my_dbc->txn, &lsn, 0, DB_CA_SPLIT, ppgno, rpgno,
		    cleft ? lpgno : PGNO_INVALID, 0, split_indx, 0)) != 0)
			return (ret);
	}

	return (0);
}

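/*
 * __bam_ca_undosplit_func --
 *	Callback function for walking cursors: move a cursor from the left or
 *	right split page back to the original page, bumping its index back up
 *	when the cursor was on the right page.
 */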
static int
__bam_ca_undosplit_func(dbc, my_dbc, foundp, frompgno, split_indx, vargs)
	DBC *dbc;
	DBC *my_dbc;
	u_int32_t *foundp;
	db_pgno_t frompgno;
	u_int32_t split_indx;
	void *vargs;
{
	DBC_INTERNAL *cp;
	struct __bam_ca_split_args *args;

	COMPQUIET(my_dbc, NULL);
	COMPQUIET(foundp, NULL);

	if (dbc->dbtype == DB_RECNO)
		return (0);
	cp = dbc->internal;
	args = vargs;
	if (cp->pgno == args->rpgno &&
	    !MVCC_SKIP_CURADJ(dbc, args->rpgno)) {
		cp->pgno = frompgno;
		cp->indx += split_indx;
	} else if (cp->pgno == args->lpgno &&
	    !MVCC_SKIP_CURADJ(dbc, args->lpgno))
		cp->pgno = frompgno;

	return (0);
}

/*
 * __bam_ca_undosplit --
 *	Adjust the cursors when undoing a split of a page.
 *	If we grew a level we will execute this for both the
 *	left and the right pages.
 *	Called only during undo processing.
 *
 * PUBLIC: int __bam_ca_undosplit __P((DB *,
 * PUBLIC:    db_pgno_t, db_pgno_t, db_pgno_t, u_int32_t));
 */
int
__bam_ca_undosplit(dbp, frompgno, topgno, lpgno, split_indx)
	DB *dbp;
	db_pgno_t frompgno, topgno, lpgno;
	u_int32_t split_indx;
{
	u_int32_t count;
	struct __bam_ca_split_args args;

	/*
	 * When backing out a split, we move the cursor back
	 * to the original offset and bump it by the split_indx.
	 */
	args.lpgno = lpgno;
	args.rpgno = topgno;
	return (__db_walk_cursors(dbp, NULL,
	    __bam_ca_undosplit_func, &count, frompgno, split_indx, &args));
}