1 /*-
2  * Copyright (c) 2016, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file LICENSE for license information.
5  *
6  * $Id$
7  */
8 
9 #include "db_config.h"
10 
11 #ifdef HAVE_SLICES
12 
13 #include "db_int.h"
14 #include "dbinc/db_page.h"
15 #include "dbinc/btree.h"
16 #include "dbinc/crypto.h"
17 #include "dbinc/fop.h"
18 #include "dbinc/hash.h"
19 #include "dbinc/heap.h"
20 #include "dbinc/lock.h"
21 #include "dbinc/mp.h"
22 #include "dbinc/qam.h"
23 #include "dbinc/slice.h"
24 #include "dbinc/txn.h"
25 
26 /* This limits the bytes displayed for DBTs in verbose & diagnostic messages. */
27 #define	DB_VERB_SLICE_PRINTLEN	30
28 
29 /*
30  * __db_slice_open_pp --
31  *	DB->open pre/post processing for sliced db.
32  *
33  * PUBLIC: int __db_slice_open_pp __P((DB *, DB_TXN *,
34  * PUBLIC:     const char *, const char *, DBTYPE, u_int32_t, int));
35  */
36 int
__db_slice_open_pp(dbp,txn,fname,dname,type,flags,mode)37 __db_slice_open_pp(dbp, txn, fname, dname, type, flags, mode)
38 	DB *dbp;
39 	DB_TXN *txn;
40 	const char *fname, *dname;
41 	DBTYPE type;
42 	u_int32_t flags;
43 	int mode;
44 {
45 	DB_THREAD_INFO *ip;
46 	ENV *env;
47 	int ret, t_ret, txn_local;
48 #ifdef HAVE_SLICED_REPLICATION
49 	int handle_check;
50 #endif
51 	/*
52 	 * Use the normal open for sub-databases, in-memory databases or
53 	 * non-sliced databases.
54 	 */
55 	if (!LF_ISSET(DB_SLICED) || dname != NULL || fname == NULL)
56 		return (__db_open_pp(dbp,
57 		    txn, fname, dname, type, flags, mode));
58 
59 	txn_local = 0;
60 	env = dbp->env;
61 	ENV_ENTER(env, ip);
62 
63 	/*
64 	 * Save the flags.  We do this here because we don't pass all of the
65 	 * flags down into the actual DB->open method call, we strip
66 	 * DB_AUTO_COMMIT at this layer.
67 	 */
68 	dbp->open_flags = flags;
69 
70 	/* Save the current DB handle flags for refresh. */
71 	dbp->orig_flags = dbp->flags;
72 
73 #ifdef HAVE_SLICED_REPLICATION
74 	/* Reminder: this needs to be looked at for sliced replication. */
75 
76 	/* Check for replication block. */
77 	handle_check = IS_ENV_REPLICATED(env);
78 	if (handle_check &&
79 	    (ret = __db_rep_enter(dbp, 1, 0, IS_REAL_TXN(txn))) != 0) {
80 		handle_check = 0;
81 		goto err;
82 	}
83 
84 	/*
85 	 * A replication client can't create a database, but it's convenient to
86 	 * allow a repmgr application to specify DB_CREATE anyway.  Thus for
87 	 * such an application the meaning of DB_CREATE becomes "create it if
88 	 * I'm a master, and otherwise ignore the flag".  A repmgr application
89 	 * running as master can't be sure that it won't spontaneously become a
90 	 * client, so there's a race condition.
91 	 */
92 	if (IS_REP_CLIENT(env) && !F_ISSET(dbp, DB_AM_NOT_DURABLE))
93 		LF_CLR(DB_CREATE);
94 #endif
95 
96 	/*
97 	 * Create local transaction as necessary, check for consistent
98 	 * transaction usage.
99 	 */
100 	if (txn == NULL || IS_ENV_AUTO_COMMIT(env, txn, flags)) {
101 		if ((ret = __db_txn_auto_init(env, ip, &txn)) != 0)
102 			goto err;
103 		txn_local = 1;
104 	} else if (txn != NULL && !TXN_ON(env) &&
105 	    (!CDB_LOCKING(env) || !F_ISSET(txn, TXN_FAMILY))) {
106 		ret = __db_not_txn_env(env);
107 		goto err;
108 	}
109 	LF_CLR(DB_AUTO_COMMIT);
110 
111 	/*
112 	 * We check arguments after possibly creating a local transaction,
113 	 * which is unusual -- the reason is some flags are illegal if any
114 	 * kind of transaction is in effect.
115 	 */
116 	if ((ret = __db_open_arg(dbp, txn, fname, NULL, type, flags)) == 0 &&
117 	    (ret =
118 		__db_slice_open(dbp, ip, txn, fname, type, flags, mode)) != 0)
119 		goto err;
120 
121 	if (txn_local && (t_ret = __db_txn_auto_resolve(env,
122 	    txn, F_ISSET(dbp, DB_AM_CREATED), ret)) && ret == 0)
123 		ret = t_ret;
124 
125 err:
126 #ifdef HAVE_SLICED_REPLICATION
127 	/* Release replication block. */
128 	if (handle_check && (t_ret = __env_db_rep_exit(env)) != 0 && ret == 0)
129 		ret = t_ret;
130 #endif
131 
132 	ENV_LEAVE(env, ip);
133 	return (ret);
134 }
135 
136 /*
137  * __db_slice_alloc --
138  *	Allocate, create, and clone the db handles of a container's db slices;
139  *	do nothing if they are already allocated.
140  *
141  *	This also verifies the container db's slice-relevant metadata when
142  *	opening an existing database, or adds it when creating the db.
143  *
144  *	The slices' databases are not opened here, but in __db_open_pp().
145  *
146  * PUBLIC: int __db_slice_alloc __P((DB *, DB_THREAD_INFO *, DB_TXN *));
147  */
148 int
__db_slice_alloc(dbp,ip,txn)149 __db_slice_alloc(dbp, ip, txn)
150 	DB *dbp;
151 	DB_THREAD_INFO *ip;
152 	DB_TXN *txn;
153 {
154 	DB_ENV *dbenv, *slice;
155 	DB *sl_dbp;
156 	ENV *env;
157 	int i, ret;
158 
159 	env = dbp->env;
160 	dbenv = env->dbenv;
161 	DB_ASSERT(env, dbenv->slice_cnt != 0);
162 	if (dbp->db_slices != NULL)
163 		return (0);
164 
165 	/* Create a NULL terminated array of slice databases. */
166 	if ((ret = __os_calloc(env,
167 	    dbenv->slice_cnt + 1, sizeof(DB *), &dbp->db_slices)) != 0)
168 		return (ret);
169 
170 	/* Verify or create the slice metadata. */
171 	if ((ret = __db_slice_metachk(dbp, ip, txn)) != 0)
172 		goto err;
173 
174 	for (i = -1; (slice = __slice_iterate(dbenv, &i)) != NULL; ) {
175 		if ((ret = db_create(&dbp->db_slices[i], slice, 0)) != 0) {
176 			__db_err(env, ret,
177 			    "create of database %s slice %d", dbp->fname, i);
178 			goto err;
179 		}
180 		sl_dbp = dbp->db_slices[i];
181 		sl_dbp->db_container = dbp;
182 
183 		/*
184 		 * Copy configuration from dbp: settings, etc. As with DB_ENV,
185 		 * these are sorted by the name of the DB->set_xxx() function.
186 		 */
187 		if ((ret = __db_slice_configure(dbp, sl_dbp)) != 0) {
188 			__db_err(env, ret,
189 			    "configure of \"%s\" slice %d", dbp->fname, i);
190 			goto err;
191 		}
192 	}
193 
194 	return (0);
195 
196 err:
197 	(void)__db_slice_free(dbp, DB_NOSYNC);
198 	return (ret);
199 }
200 
201 /*
202  * __db_slice_free --
203  *	Free all the db handles underneath a container's db.
204  *
205  * PUBLIC: int __db_slice_free __P((DB *, u_int32_t));
206  */
207 int
__db_slice_free(dbp,flags)208 __db_slice_free(dbp, flags)
209 	DB *dbp;
210 	u_int32_t flags;
211 {
212 	DB *sl_dbp;
213 	db_slice_t  i, slice_cnt;
214 	int ret, t_ret;
215 
216 	ret = 0;
217 	if (dbp->db_slices != NULL) {
218 		slice_cnt = dbp->dbenv->slice_cnt;
219 		for (i = 0; i != slice_cnt; i++)  {
220 			sl_dbp = dbp->db_slices[i];
221 			if (sl_dbp != NULL && (t_ret =
222 			     __db_close_pp(sl_dbp, flags)) != 0 && ret == 0)
223 				ret = t_ret;
224 		}
225 		__os_free(dbp->env, dbp->db_slices);
226 		dbp->db_slices = NULL;
227 	}
228 	return (ret);
229 }
230 
231 /*
232  * __db_slice_configure --
233  *	Share the setting of a container db with one of its slices.
234  *
235  * PUBLIC: int __db_slice_configure __P((const DB *, DB *));
236  */
237 int
__db_slice_configure(container,slice)238 __db_slice_configure(container, slice)
239 	const DB *container;
240 	DB *slice;
241 {
242 	int ret;
243 
244 	ret = 0;
245 	DB_ASSERT(container->env, container->dbenv->slice_cnt != 0);
246 
247 	/* Copy the customizable values inherited from the container. */
248 	__db_copy_config(container, slice, 1);
249 
250 #ifdef HAVE_HEAP
251 	if (container->type == DB_HEAP) {
252 		((HEAP *)slice->heap_internal)->gbytes =
253 		    ((HEAP *)container->heap_internal)->gbytes;
254 		((HEAP *)slice->heap_internal)->bytes =
255 		    ((HEAP *)container->heap_internal)->bytes;
256 		((HEAP *)slice->heap_internal)->region_size =
257 		    ((HEAP *)container->heap_internal)->region_size;
258 	}
259 #endif
260 
261 	return (ret);
262 }
263 
264 /*
265  * __db_slice_default_callback  -
266  *      Default slice specification DBT constructor: use the whole key.
267  *
268  * PUBLIC: int __db_slice_default_callback
269  * PUBLIC:     __P((const DB *, const DBT *key, DBT *));
270  */
271 int
__db_slice_default_callback(dbp,key,slice)272 __db_slice_default_callback(dbp, key, slice)
273 	const DB *dbp;
274 	const DBT *key;
275 	DBT *slice;
276 {
277 	slice->data = key->data;
278 	slice->size = key->size;
279 	COMPQUIET(dbp, NULL);
280 	return (0);
281 }
282 
283 /*
284  * __db_slice_metadata
285  *	Fetch or insert a single key-value pair (of string values).
286  *
287  *	The 'expect' DBT is either inserted (if the db is still being created
288  *	or the operation is an insert) or compared to the value actually
289  *	present.
290  *
291  * PUBLIC: int __db_slice_metadata __P((DB *,
292  * PUBLIC:     DB_THREAD_INFO *, DB_TXN *, const char *, DBT *, int));
293  */
294 int
__db_slice_metadata(dbp,ip,txn,name,expect,insert)295 __db_slice_metadata(dbp, ip, txn, name, expect, insert)
296 	DB *dbp;
297 	DB_THREAD_INFO *ip;
298 	DB_TXN *txn;
299 	const char *name;
300 	DBT *expect;
301 	int insert;
302 {
303 	DBT key, actual;
304 	ENV *env;
305 	int ret;
306 	char actual_buf[DB_MAXPATHLEN];
307 
308 	env = dbp->env;
309 
310 	DB_INIT_DBT(key, name, strlen(name));
311 	if (F_ISSET(dbp, DB_AM_CREATED) || insert != 0) {
312 		if ((ret = __db_put(dbp, ip, txn, &key, expect, 0)) != 0)
313 			__db_err(env, ret,
314 			    "Database %s could not insert slice metadata(%s)",
315 			    dbp->fname, name);
316 	} else {
317 		DB_INIT_DBT_USERMEM(actual, actual_buf, sizeof(actual_buf));
318 		if ((ret = __db_get(dbp, ip, txn, &key, &actual, 0)) != 0) {
319 			ret = USR_ERR(env, DB_SLICE_CORRUPT);
320 			__db_err(env, ret, DB_STR_A("0787",
321 			    "Database %s has no metadata \"%s\"", "%s %s"),
322 			    dbp->fname, name);
323 		}
324 		else if (__dbt_defcmp(dbp, &actual, expect, NULL) != 0) {
325 			/*
326 			 * The value isn't exactly what was expected. Usually
327 			 * that says db has corrupt metadata, but if this is the
328 			 * "version", an upgrade could be done. If that is ever
329 			 * needed this could copy the actual value back into the
330 			 * the passed-in DBT, for the caller to do as it wishes.
331 			 */
332 			ret = USR_ERR(env, DB_SLICE_CORRUPT);
333 		}
334 	}
335 	return (ret);
336 }
337 
338 /*
339  * __db_slice_fileid_metachk --
340  *	Verify or insert the fileid metadata for a slice.
341  *
342  * PUBLIC: int __db_slice_fileid_metachk
343  * PUBLIC:     __P((DB *, DB_THREAD_INFO *, DB_TXN *, db_slice_t, int));
344  */
345 int
__db_slice_fileid_metachk(dbp,ip,txn,id,insert)346 __db_slice_fileid_metachk(dbp, ip, txn, id, insert)
347 	DB *dbp;
348 	DB_THREAD_INFO *ip;
349 	DB_TXN *txn;
350 	db_slice_t id;
351 	int insert;
352 {
353 	DBT value;
354 	char fileid_name[sizeof(DB_SLICE_METADATA_FILEID_FMT)];
355 	int ret;
356 
357 	snprintf(fileid_name, sizeof(fileid_name),
358 	    DB_SLICE_METADATA_FILEID_FMT, id);
359 	DB_INIT_DBT_USERMEM(value, dbp->db_slices[id]->fileid, DB_FILE_ID_LEN);
360 	value.size = DB_FILE_ID_LEN;
361 	if ((ret = __db_slice_metadata(
362 	    dbp, ip, txn, fileid_name, &value, insert)) != 0)
363 		__db_errx(dbp->env, DB_STR_A("0788",
364 		    "Sliced database %s has bad metadata for %s", "%s %s"),
365 		    dbp->fname, fileid_name);
366 	return (ret);
367 }
368 
369 /*
370  * __db_slice_metachk --
371  *	Verify or insert the version and slice count metadata of a container db.
372  *
373  *	The container DB needs to have certain metadata.
374  *   #records	key		value
375  *	1	version		slice metadata version number as string
376  *	1	count		slice count as a string
377  *	#slices	fileid#%03d	the fileid of that slice's section
378  *
379  *	The version and count are checked here, if the file has been opened.
380  *	The fileid metadata is checked later, after each slice is opened.
381  *
382  *	If a slice is missing or corrupt, return DB_SLICE_CORRUPT.
383  *
384  * PUBLIC: int __db_slice_metachk __P((DB *, DB_THREAD_INFO *, DB_TXN *));
385  */
386 int
__db_slice_metachk(dbp,ip,txn)387 __db_slice_metachk(dbp, ip, txn)
388 	DB *dbp;
389 	DB_THREAD_INFO *ip;
390 	DB_TXN *txn;
391 {
392 	DBT value;
393 	char value_buf[DB_MAXPATHLEN];
394 	int ret;
395 
396 	/*
397 	 * Skip the metadata checks for db_verify, etc., which don't really open
398 	 * the database. Db_verify does set DB_AM_OPEN_CALLED,
399 	 * so use fname == NULL.
400 	 */
401 	if (dbp->fname == NULL)
402 		return (0);
403 
404 	DB_INIT_DBT_USERMEM(value, value_buf, sizeof(value_buf));
405 
406 	/* Make sure that the version number is not too high, or low. */
407 	value.size = (u_int32_t)snprintf(value.data,
408 	    value.ulen, "%u", DB_SLICE_METADATA_VERSION);
409 	if ((ret = __db_slice_metadata(dbp, ip,
410 	    txn, "version", &value, 0)) != 0)
411 		goto err;
412 
413 	/* Make sure that the slice count matches the environment. */
414 	value.size = (u_int32_t)
415 	    snprintf(value.data, value.ulen, "%u", dbp->dbenv->slice_cnt);
416 	ret = __db_slice_metadata(dbp, ip, txn, "count", &value, 0);
417 
418 err:
419 	return (ret);
420 }
421 
422 /*
423  * __db_slice_open --
424  *	Finish opening up a sliced database by creating and opening its slices.
425  *
426  *	The container DB itself has already been opened.
427  *
428  *	Opens the relative filename in each of the slices' databases. Each
429  *	takes places in its own environment and transaction.
430  *	If a slice is missing or corrupt, return DB_SLICE_CORRUPT.
431  *
432  * PUBLIC: int __db_slice_open __P((DB *, DB_THREAD_INFO *,
433  * PUBLIC:      DB_TXN *, const char *, DBTYPE, u_int32_t, int));
434  */
435 int
__db_slice_open(dbp,ip,txn,fname,type,flags,mode)436 __db_slice_open(dbp, ip, txn, fname, type, flags, mode)
437 	DB *dbp;
438 	DB_THREAD_INFO *ip;
439 	DB_TXN *txn;
440 	const char *fname;
441 	DBTYPE type;
442 	u_int32_t flags;
443 	int mode;
444 {
445 	DB_ENV *dbenv;
446 	DB_THREAD_INFO *slice_ip;
447 	DB_TXN *slice_txn;
448 	ENV *env;
449 	int ret;
450 	u_int32_t slice_flags;
451 	db_slice_t i;
452 	const char *mesg;
453 
454 	env = dbp->env;
455 	dbenv = env->dbenv;
456 	mesg = NULL;
457 	slice_txn = NULL;
458 
459 	if (!SLICES_ON(env))
460 		return (0);
461 
462 	if (fname == NULL)
463 		mesg = "in-memory";
464 	else if (dbp->dname != NULL)
465 		mesg = "sub";
466 	else if (dbp->type != DB_BTREE && dbp->type != DB_HASH)
467 		mesg = __db_dbtype_to_string(dbp->type);
468 	if (mesg != NULL) {
469 		ret = USR_ERR(env, EINVAL);
470 		__db_err(env, ret, "%s databases cannot support slices", mesg);
471 		return (ret);
472 	}
473 	if (dbp->slice_callback == NULL)
474 		dbp->slice_callback = __db_slice_default_callback;
475 
476 	/* Get the flags of the container. */
477 	if ((ret = __db_get_flags(dbp, &slice_flags)) != 0)
478 		return (ret);
479 
480 	/* Allocate the db_slices array, create and 'clone' its db handles. */
481 	if ((ret = __db_slice_alloc(dbp, ip, txn)) != 0)
482 		return (ret);
483 
484 	/*
485 	 * Now open each slice, without DB_SLICED so that their DML calls
486 	 * have non-sliced behavior.
487 	 */
488 	LF_CLR(DB_SLICED);
489 	for (i = 0; i != dbenv->slice_cnt; i++) {
490 		if ((ret = __db_set_flags(
491 		    dbp->db_slices[i], slice_flags)) != 0)
492 			goto err;
493 		if ((ret = __txn_slice_begin(txn, &slice_txn, i)) != 0) {
494 			__db_err(env, ret,
495 			    "txn->begin for db \"%s\" slice %d failed",
496 			    fname, i);
497 			goto err;
498 		}
499 		ENV_ENTER(dbp->db_slices[i]->env, slice_ip);
500 		dbp->db_slices[i]->open_flags = flags;
501 		if ((ret = __db_open(dbp->db_slices[i], slice_ip, slice_txn,
502 		    fname, NULL, type, flags, mode, PGNO_BASE_MD)) != 0) {
503 			__db_err(env, ret,
504 			    "open of database %s slice %d failed", fname, i);
505 			ENV_LEAVE(dbp->db_slices[i]->env, slice_ip);
506 			goto err;
507 		}
508 		ENV_LEAVE(dbp->db_slices[i]->env, slice_ip);
509 		ret = __db_slice_fileid_metachk(dbp, ip, txn, i, 0);
510 		if (ret != 0)
511 			goto err;
512 	}
513 
514 	/* Replace functions which have special handling when db is sliced. */
515 	dbp->close = __db_slice_close_pp;
516 	dbp->del = __db_slice_del_pp;
517 	dbp->exists = __db_slice_exists_pp;
518 	dbp->get = __db_slice_get_pp;
519 	dbp->get_slices = __db_slice_get_slices;
520 	dbp->put = __db_slice_put_pp;
521 	dbp->pget = __db_slice_pget_pp;
522 	dbp->slice_lookup = __db_slice_lookup_pp;
523 	dbp->sync = __db_slice_sync_pp;
524 	/* Replace these with the generic "not supported" error function. */
525 	dbp->join =
526 	    (int (*) __P((DB *, DBC **, DBC **, u_int32_t)))__db_slice_notsup;
527 	dbp->key_range = (int (*) __P((DB *,
528 	    DB_TXN *, DBT *, DB_KEY_RANGE *, u_int32_t)))__db_slice_notsup;
529 	dbp->set_lk_exclusive = (int (*) __P((DB *, int)))__db_slice_notsup;
530 	dbp->set_partition = (int (*) __P ((DB *, u_int32_t, DBT *,
531 	    u_int32_t (*)(DB *, DBT *key))))__db_slice_notsup;
532 
533 	return (0);
534 
535 err:
536 	(void)__db_slice_free(dbp, DB_NOSYNC);
537 	return (USR_ERR(env, DB_SLICE_CORRUPT));
538 }
539 
540 /*
541  * __db_slice_close_pp --
542  *	DB->close pre/post processing for an actually sliced db.
543  *
544  * PUBLIC: int __db_slice_close_pp __P((DB *, u_int32_t));
545  */
546 int
__db_slice_close_pp(dbp,flags)547 __db_slice_close_pp(dbp, flags)
548 	DB *dbp;
549 	u_int32_t flags;
550 {
551 	int ret, t_ret;
552 
553 	ret = __db_slice_free(dbp, flags);
554 	if ((t_ret = __db_close_pp(dbp, flags)) != 0 && ret == 0)
555 		ret = t_ret;
556 	return (ret);
557 }
558 
559 /*
560  * __db_slice_iterate --
561  *	Return each non-null slice of a sliced database.
562  *
563  *	The first call of 'foreach' loop starts with *pos == -1.
564  *
565  *	Returns:
566  *		the next non-NULL slice, or NULL when all have been seen. Once
567  *		it returns NULL it continues to do so on subsequent calls.
568  *
569  *	Side Effect:
570  *		*pos is set to the position in the slice array of the
571  *		returned environment.
572  *
573  * PUBLIC: DB *__db_slice_iterate __P((DB *, int *));
574  */
575 DB *
__db_slice_iterate(dbp,pos)576 __db_slice_iterate(dbp, pos)
577 	DB *dbp;
578 	int *pos;
579 {
580 	DB *sl_dbp;
581 	DB_ENV *dbenv;
582 	ENV *env;
583 	db_slice_t i;
584 
585 	env = dbp->env;
586 	dbenv = env->dbenv;
587 	sl_dbp = NULL;
588 	if (!SLICES_ON(env))
589 		return (NULL);
590 	i = (db_slice_t)(1 + *pos);
591 	DB_ASSERT(env, i <= dbenv->slice_cnt);
592 	while (i < dbenv->slice_cnt && (sl_dbp = dbp->db_slices[i]) == NULL)
593 		i++;
594 
595 	*pos = (int)i;
596 	/* This returns a good DB *, or the NULL if we've seen the last. */
597 	return (sl_dbp);
598 }
599 
600 /*
601  * __db_slice_sync_pp --
602  *	DB->sync pre/post processing for an actually sliced db.
603  *
604  * PUBLIC: int __db_slice_sync_pp __P((DB *, u_int32_t));
605  */
606 int
__db_slice_sync_pp(dbp,flags)607 __db_slice_sync_pp(dbp, flags)
608 	DB *dbp;
609 	u_int32_t flags;
610 {
611 	db_slice_t i;
612 	int ret;
613 
614 	ret = __db_sync_pp(dbp, flags);
615 	for (i = 0; ret == 0 && i != dbp->dbenv->slice_cnt; i++)
616 		ret = __db_sync_pp(dbp->db_slices[i], flags);
617 	return (ret);
618 }
619 
620 /*
621  * __db_slice_map --
622  *	Given a container's DB * and a slice DBT, return the corresponding
623  *	slice number.
624  *
625  * PUBLIC: int __db_slice_map __P((DB *, const DBT *, db_slice_t *));
626  */
627 int
__db_slice_map(dbp,slice,slice_indexp)628 __db_slice_map(dbp, slice, slice_indexp)
629 	DB *dbp;
630 	const DBT *slice;
631 	db_slice_t *slice_indexp;
632 {
633 	ENV *env;
634 	db_slice_t hash;
635 
636 	env = dbp->env;
637 
638 	if (dbp->db_slices == NULL)
639 		return (__db_not_sliced(dbp));
640 	hash = (db_slice_t)__ham_func5(NULL, slice->data, slice->size);
641 	*slice_indexp = hash % env->dbenv->slice_cnt;
642 	return (0);
643 }
644 
645 /*
646  * __db_slice_lookup_pp --
647  *	DB->slice_lookup API call
648  *
649  *	Map a key to its slice, return its DB *.
650  *
651  * PUBLIC: int __db_slice_lookup_pp __P((DB *, const DBT *, DB **, u_int32_t));
652  */
653 int
__db_slice_lookup_pp(dbp,key,sl_dbpp,flags)654 __db_slice_lookup_pp(dbp, key, sl_dbpp, flags)
655 	DB *dbp;
656 	const DBT *key;
657 	DB **sl_dbpp;
658 	u_int32_t flags;
659 {
660 	DBT slice;
661 	int ret;
662 	db_slice_t id;
663 
664 	if ((ret = __dbt_usercopy(dbp->env, (DBT *)key)) != 0 ||
665 	    (ret = __db_fchk(dbp->env, "DB->slice_lookup", flags, 0)) != 0)
666 		return (ret);
667 
668 	if ((ret = __db_slice_build(dbp, key, &slice)) != 0 ||
669 	    (ret = __db_slice_map(dbp, &slice, &id)) != 0)
670 		*sl_dbpp = NULL;
671 	else
672 		*sl_dbpp = dbp->db_slices[id];
673 
674 	FREE_IF_NEEDED(dbp->env, &slice);
675 	__dbt_userfree(dbp->env, (DBT *)key, NULL, NULL);
676 	return (ret);
677 }
678 
679 /*
680  * __db_slice_build --
681  *	Invoke the major key callback function for the database.
682  *
683  * PUBLIC: int __db_slice_build __P((const DB *, const DBT *, DBT *));
684  */
685 int
__db_slice_build(dbp,key,slice)686 __db_slice_build(dbp, key, slice)
687 	const DB *dbp;
688 	const DBT *key;
689 	DBT *slice;
690 {
691 	int ret;
692 
693 	memset(slice, 0, sizeof(DBT));
694 	if ((ret = dbp->slice_callback(dbp, key, slice)) != 0) {
695 		(void)USR_ERR(dbp->env, ret);
696 		__db_err(dbp->env, ret,
697 		    "Sliced database callback for %s failed", dbp->fname);
698 		return (ret);
699 	}
700 	return (0);
701 }
702 
703 /*
704  * __db_slice_activate --
705  *	Prepare to access a slice of a container's sliced database, creating the
706  *	required transaction as needed.
707  *
708  *	The DB and DB_TXN parameters belong to the containing environment.
709  *	The returned DB and DB_TXN values belong to a slice's environment.
710  *
711  *	If it needs to begin a transaction, this enters both the container's
712  *	environment (here) and the slice's environment (in __txn_slice_begin).
713  *
714  * PUBLIC: int __db_slice_activate
715  * PUBLIC:     __P((DB *, DB_TXN *, const DBT *, DB **, DB_TXN **));
716  */
717 int
__db_slice_activate(dbp,txn,sl_dbt,sl_dbpp,sl_txnp)718 __db_slice_activate(dbp, txn, sl_dbt, sl_dbpp, sl_txnp)
719 	DB *dbp;
720 	DB_TXN *txn;
721 	const DBT *sl_dbt;
722 	DB **sl_dbpp;
723 	DB_TXN **sl_txnp;
724 {
725 	DB *sl_dbp;
726 	DB_THREAD_INFO *ip;
727 	DB_TXN *sl_txn;
728 	ENV *sl_env;
729 	int ret;
730 	char *txnmsg;
731 	db_slice_t slice_index;
732 
733 	*sl_dbpp = NULL;
734 	*sl_txnp = NULL;
735 
736 	if ((ret = __db_slice_map(dbp, sl_dbt, &slice_index)) != 0)
737 		return (ret);
738 
739 	sl_dbp = dbp->db_slices[slice_index];
740 	sl_env = sl_dbp->env;
741 	if (txn == NULL) {
742 		txnmsg = "implicit";
743 		sl_txn = NULL;
744 	} else if (txn->txn_slices == NULL) {
745 		txnmsg = "new";
746 		ENV_ENTER(dbp->env, ip);
747 		txn->thread_info = ip;
748 		ret = __txn_slice_begin(txn, &sl_txn, slice_index);
749 		ENV_LEAVE(dbp->env, ip);
750 	} else if ((sl_txn = txn->txn_slices[slice_index]) == NULL) {
751 		/*
752 		 * If txn_slices has been allocated, then there already is a
753 		 * subordinate transaction for this container's txn. If it
754 		 *  *is not* for this slice, then it is for another one,
755 		 * which we don't support for DML.
756 		 */
757 		ret = __txn_multislice(txn);
758 		txnmsg = "denied second txn";
759 	}
760 	else
761 		txnmsg = "existing";
762 
763 	if (FLD_ISSET(sl_env->dbenv->verbose, DB_VERB_SLICE)) {
764 		char hexbuf[DB_TOHEX_BUFSIZE(DB_VERB_SLICE_PRINTLEN)];
765 		u_int32_t printlen;
766 
767 		if ((printlen = sl_dbt->size) > DB_VERB_SLICE_PRINTLEN)
768 			printlen = DB_VERB_SLICE_PRINTLEN;
769 		__db_msg(sl_env, "activate %s slice %d %s txns %08x:%08x",
770 		    __db_tohex(sl_dbt->data, printlen, hexbuf), slice_index,
771 		    txnmsg, txn == NULL ?  0 : txn->txnid,
772 		    sl_txn == NULL ? 0 : sl_txn->txnid);
773 	}
774 
775 	*sl_dbpp = sl_dbp;
776 	*sl_txnp = sl_txn;
777 
778 	return (ret);
779 }
780 
781 /*
782  * __db_slice_get_pp --
783  *	DB->get pre/post processing when the major key builder is used.
784  *
785  *	Find which slice this fetch accesses, and direct the call to that
786  *	db handle.
787  *
788  * PUBLIC: int __db_slice_get_pp __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
789  */
790 int
__db_slice_get_pp(dbp,txn,key,data,flags)791 __db_slice_get_pp(dbp, txn, key, data, flags)
792 	DB *dbp;
793 	DB_TXN *txn;
794 	DBT *key, *data;
795 	u_int32_t flags;
796 {
797 	DB *sl_dbp;
798 	DBT slice;
799 	DB_TXN *sl_txn;
800 	int ret;
801 
802 	if ((ret = __dbt_usercopy(dbp->env, key)) != 0)
803 		return (ret);
804 
805 	if ((ret = __db_slice_build(dbp, key, &slice)) != 0)
806 		goto err;
807 	if ((ret = __db_slice_activate(dbp,
808 	    txn, &slice, &sl_dbp, &sl_txn)) != 0)
809 		goto err;
810 	__dbt_userfree(dbp->env, key, NULL, NULL);
811 	ret = __db_get_pp(sl_dbp, sl_txn, key, data, flags);
812 
813 	if (0)
814 err:		__dbt_userfree(dbp->env, key, NULL, NULL);
815 	FREE_IF_NEEDED(dbp->env, &slice);
816 	return (ret);
817 }
818 
819 /*
820  * __db_slice_exists_pp --
821  *	Sliced version of DB->exists.
822  *
823  *	Find which slice this fetch accesses; call exists() on that handle.
824  *
825  * PUBLIC: int __db_slice_exists_pp __P((DB *, DB_TXN *, DBT *, u_int32_t));
826  */
827 int
__db_slice_exists_pp(dbp,txn,key,flags)828 __db_slice_exists_pp(dbp, txn, key, flags)
829 	DB *dbp;
830 	DB_TXN *txn;
831 	DBT *key;
832 	u_int32_t flags;
833 {
834 	DB *sl_dbp;
835 	DBT slice;
836 	DB_TXN *sl_txn;
837 	int ret;
838 
839 	if ((ret = __dbt_usercopy(dbp->env, key)) != 0)
840 		return (ret);
841 	if ((ret = __db_slice_build(dbp, key, &slice)) != 0)
842 		goto err;
843 	if ((ret = __db_slice_activate(dbp,
844 	    txn, &slice, &sl_dbp, &sl_txn)) != 0)
845 		goto err;
846 
847 	__dbt_userfree(dbp->env, key, NULL, NULL);
848 	ret = __db_exists(sl_dbp, sl_txn, key, flags);
849 
850 	if (0)
851 err:		__dbt_userfree(dbp->env, key, NULL, NULL);
852 	FREE_IF_NEEDED(dbp->env, &slice);
853 	return (ret);
854 }
855 
856 /*
857  * __db_slice_pget_pp --
858  *	Sliced version DB->pget()
859  *
860  *	This needs to search all slices. Since there is no cross-slice
861  *	transaction support, we ignore any txn passed in and use NULL local
862  *	txns. It does not start at the same slice each time (e.g. slice 0), but
863  *	starts at a random slice; this distributes the workload.
864  *
865  * PUBLIC: int __db_slice_pget_pp
866  * PUBLIC:     __P((DB *, DB_TXN *, DBT *, DBT *, DBT *, u_int32_t));
867  */
868 int
__db_slice_pget_pp(dbp,txn,skey,pkey,data,flags)869 __db_slice_pget_pp(dbp, txn, skey, pkey, data, flags)
870 	DB *dbp;
871 	DB_TXN *txn;
872 	DBT *skey, *pkey, *data;
873 	u_int32_t flags;
874 {
875 	DB *sl_dbp;
876 	db_slice_t count, i, offset;
877 	int ret;
878 
879 	ret = 0;
880 	if (dbp->db_slices == NULL)
881 		return (__db_not_sliced(dbp));
882 	/*
883 	 * Try to pget from each slice in succession.  If any pget() succeeds,
884 	 * or it returns an error besides DB_NOTFOUND, stop right away.
885 	 */
886 	count = dbp->dbenv->slice_cnt;
887 	offset = __os_random() % count;
888 	for (i = 0; i != count; i++) {
889 		sl_dbp = dbp->db_slices[(i + offset) % count];
890 		if ((ret = __db_pget_pp(sl_dbp,
891 		    NULL, skey, pkey, data, flags)) != DB_NOTFOUND)
892 			break;
893 	}
894 	COMPQUIET(txn, NULL);
895 	return (ret);
896 }
897 
898 /*
899  * __db_slice_put_pp --
900  *	Sliced version of DB->put().
901  *
902  *	Find which slice this fetch accesses; call put() on that handle.
903  *
904  * PUBLIC: int __db_slice_put_pp __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
905  */
906 int
__db_slice_put_pp(dbp,txn,key,data,flags)907 __db_slice_put_pp(dbp, txn, key, data, flags)
908 	DB *dbp;
909 	DB_TXN *txn;
910 	DBT *key, *data;
911 	u_int32_t flags;
912 {
913 	DB *sl_dbp;
914 	DBT slice;
915 	DB_TXN *sl_txn;
916 	int ret;
917 
918 	if ((ret = __dbt_usercopy(dbp->env, key)) != 0)
919 		return (ret);
920 
921 	if ((ret = __db_slice_build(dbp, key, &slice)) != 0)
922 		goto err;
923 	if ((ret = __db_slice_activate(dbp,
924 	    txn, &slice, &sl_dbp, &sl_txn)) != 0)
925 		goto err;
926 	__dbt_userfree(dbp->env, key, NULL, NULL);
927 	ret = __db_put_pp(sl_dbp, sl_txn, key, data, flags);
928 
929 	if (0)
930 err:		__dbt_userfree(dbp->env, key, NULL, NULL);
931 	FREE_IF_NEEDED(dbp->env, &slice);
932 	return (ret);
933 }
934 
935 /*
936  * __db_slice_del_pp --
937  *	Sliced version of DB->del().
938  *
939  *	Find which slice this fetch accesses; call del() on that handle.
940  *
941  * PUBLIC: int __db_slice_del_pp __P((DB *, DB_TXN *, DBT *, u_int32_t));
942  */
943 int
__db_slice_del_pp(dbp,txn,key,flags)944 __db_slice_del_pp(dbp, txn, key, flags)
945 	DB *dbp;
946 	DB_TXN *txn;
947 	DBT *key;
948 	u_int32_t flags;
949 {
950 	DB *sl_dbp;
951 	DBT slice;
952 	DB_TXN *sl_txn;
953 	int ret;
954 
955 	if ((ret = __dbt_usercopy(dbp->env, key)) != 0)
956 		return (ret);
957 
958 	if ((ret = __db_slice_build(dbp, key, &slice)) != 0)
959 		goto err;
960 	if ((ret = __db_slice_activate(dbp,
961 	    txn, &slice, &sl_dbp, &sl_txn)) != 0)
962 		goto err;
963 	__dbt_userfree(dbp->env, key, NULL, NULL);
964 	ret = __db_del_pp(sl_dbp, sl_txn, key, flags);
965 
966 	if (0)
967 err:		__dbt_userfree(dbp->env, key, NULL, NULL);
968 	FREE_IF_NEEDED(dbp->env, &slice);
969 	return (ret);
970 }
971 
972 /*
973  * __db_slice_secondary_get_pp --
974  *	Sliced version __db_secondary_get(), i.e., DB->get() for a secondary DB.
975  *
976  *	This needs to search all slices. Since there is no cross-slice
977  *	transaction support, we ignore any txn passed in and use NULL local
978  *	txns. Like __db_slice_pget_pp(), it does not start at the same slice
979  *	each time (e.g. slice 0), but starts at a random slice.
980  *
981  * PUBLIC: int __db_slice_secondary_get_pp
982  * PUBLIC:     __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
983  */
984 int
__db_slice_secondary_get_pp(sdbp,txn,skey,data,flags)985 __db_slice_secondary_get_pp(sdbp, txn, skey, data, flags)
986 	DB *sdbp;
987 	DB_TXN *txn;
988 	DBT *skey, *data;
989 	u_int32_t flags;
990 {
991 	DB *sl_dbp;
992 	ENV *env;
993 	db_slice_t count, i, offset;
994 	int ret;
995 
996 	ret = 0;
997 	env = sdbp->env;
998 	count = env->dbenv->slice_cnt;
999 	DB_ASSERT(env, count != 0);
1000 	/*
1001 	 * Try to get from each slice. If any get() succeeds, or one returns an
1002 	 * error besides DB_NOTFOUND, stop right away. Start at a random slice.
1003 	 */
1004 	offset = __os_random() % count;
1005 	for (i = 0; i != count; i++) {
1006 		sl_dbp = sdbp->db_slices[(i + offset) % count];
1007 		DB_ASSERT(env, F_ISSET(sl_dbp, DB_AM_SECONDARY));
1008 		if ((ret = __db_secondary_get(sl_dbp,
1009 		    NULL, skey, data, flags)) != DB_NOTFOUND)
1010 			break;
1011 	}
1012 	COMPQUIET(txn, NULL);
1013 	return (ret);
1014 }
1015 
1016 /*
1017  * __dbc_slice_init --
1018  *	Finish initializing a container's sliced cursor.
1019  *
1020  *	Change some of the API functions to the sliced cursor equivalents.
1021  *	The internal access method functions of a sliced cursor must not be
1022  *	used; their pointers are set to return an error if they are called.
1023  *
1024  * PUBLIC: int __dbc_slice_init __P((DBC *));
1025  */
1026 int
__dbc_slice_init(dbc)1027 __dbc_slice_init(dbc)
1028 	DBC *dbc;
1029 {
1030 	DB_ASSERT(dbc->env, FLD_ISSET(dbc->dbp->open_flags, DB_SLICED));
1031 
1032 	dbc->del = __dbc_slice_del_pp;
1033 	dbc->get = __dbc_slice_get_pp;
1034 	dbc->pget = __dbc_slice_pget_pp;
1035 	dbc->put = __dbc_slice_put_pp;
1036 
1037 	return (0);
1038 }
1039 
1040 /*
1041  * __dbc_slice_close --
1042  *	Close any open cursors on the slices before closing the
1043  *	top cursor.
1044  *
1045  * PUBLIC: int __dbc_slice_close __P((DBC *));
1046  */
1047 int
__dbc_slice_close(dbc)1048 __dbc_slice_close(dbc)
1049 	DBC *dbc;
1050 {
1051 	int ret;
1052 
1053 	ret = 0;
1054 	if (dbc->dbc_slices[0] != NULL) {
1055 		ret = __dbc_close_pp(dbc->dbc_slices[0]);
1056 		dbc->dbc_slices[0] = NULL;
1057 	}
1058 	return (ret);
1059 }
1060 
1061 /*
1062  * __dbc_slice_activate --
1063  *	Prepare to access a slice of a sliced container's cursor, creating the
1064  *	required sub-environment's cursor as needed.
1065  *
1066  *	The DBC parameter belongs to the containing environment.
1067  *	The returned DBC values belong to a slice's environment.
1068  *
1069  *	This sometimes enters the slice's environment (when beginning a
1070  *	transaction there); it *does not* enter the container's environment.
1071  *
1072  *	If the cursor command in 'flags' is DB_FIRST or DB_LAST, then this
1073  *	changes	the container's DBC->get to iterate through all the slices.
1074  *	More details about that TBD.
1075  *
1076  *
1077  * PUBLIC: int __dbc_slice_activate
1078  * PUBLIC:     __P((DBC *, const DBT *, DBC **, u_int32_t));
1079  */
1080 int
__dbc_slice_activate(dbc,key,sl_dbcp,flags)1081 __dbc_slice_activate(dbc, key, sl_dbcp, flags)
1082 	DBC *dbc;
1083 	const DBT *key;
1084 	DBC **sl_dbcp;
1085 	u_int32_t flags;
1086 {
1087 	DB *dbp, *sl_dbp;
1088 	DB_TXN *sl_txn;
1089 	DBT slice;
1090 	int ret;
1091 	db_slice_t slice_index;
1092 
1093 	*sl_dbcp = NULL;
1094 	dbp = dbc->dbp;
1095 
1096 	if ((ret = __db_slice_build(dbp, key, &slice)) != 0)
1097 		return (ret);
1098 
1099 	if (dbc->dbc_slices[0] == NULL) {
1100 		if ((ret = __db_slice_activate(dbp,
1101 		    dbc->txn, &slice, &sl_dbp, &sl_txn)) != 0)
1102 			goto err;
1103 		if ((ret = __db_cursor_pp(sl_dbp, sl_txn,
1104 		    &dbc->dbc_slices[0], dbc->open_flags & ~DB_SLICED)) != 0)
1105 			goto err;
1106 	} else if ((ret = __db_slice_map(dbp, &slice, &slice_index)) != 0)
1107 		goto err;
1108 	else if (dbc->dbc_slices[0]->dbp->db_slice_index != slice_index) {
1109 		ret = __txn_multislice(dbc->txn);
1110 		goto err;
1111 	}
1112 
1113 	*sl_dbcp = dbc->dbc_slices[0];
1114 
1115 err:	FREE_IF_NEEDED(dbp->env, &slice);
1116 	COMPQUIET(flags, 0);
1117 	return (ret);
1118 }
1119 
1120 /*
1121  * __dbc_slice_get_pp --
1122  *	DBC->get pre/post processing for sliced cursors.
1123  *
1124  * PUBLIC: int __dbc_slice_get_pp __P((DBC *, DBT *, DBT *, u_int32_t));
1125  */
1126 int
__dbc_slice_get_pp(dbc,key,data,flags)1127 __dbc_slice_get_pp(dbc, key, data, flags)
1128 	DBC *dbc;
1129 	DBT *key, *data;
1130 	u_int32_t flags;
1131 {
1132 	DBC *sl_dbc;
1133 	int init_get_all, op, ret;
1134 
1135 	init_get_all = 0;
1136 	op = flags & DB_OPFLAGS_MASK;
1137 	switch  (op) {
1138 	case DB_NEXT:
1139 	case DB_NEXT_NODUP:
1140 		if (dbc->dbc_slices[0] != NULL || key->size != 0)
1141 			break;
1142 		/* Fall through to initialize all-slice scan */
1143 	case DB_FIRST:
1144 		init_get_all = 1;
1145 		break;
1146 	case DB_PREV:
1147 	case DB_PREV_NODUP:
1148 		if (dbc->dbc_slices[0] != NULL || key->size != 0)
1149 			break;
1150 		/* Fall through to initialize all-slice backwards scan */
1151 	case DB_LAST:
1152 		init_get_all = 1;
1153 		break;
1154 	default:
1155 		break;
1156 	}
1157 
1158 	if ((ret = __dbt_usercopy(dbc->env, key)) != 0)
1159 		return (ret);
1160 
1161 	ret = __dbc_slice_activate(dbc, key, &sl_dbc, flags);
1162 	if (ret != 0) {
1163 		__dbt_userfree(dbc->env, key, NULL, NULL);
1164 		return (ret);
1165 	}
1166 	if (init_get_all) {
1167 		if (key->size != 0) {
1168 			ret = USR_ERR(dbc->env, EINVAL);
1169 			__db_err(dbc->env, ret,
1170 			    "sliced DB_FIRST/DB_LAST with key (size %u)",
1171 			    key->size);
1172 			return (ret);
1173 		}
1174 		/* Since key->size is 0, the first slice was activated above. */
1175 		dbc->dbc_curslice = 0;
1176 		dbc->get = __dbc_slice_get_all_pp;
1177 		if (op == DB_FIRST)
1178 			flags = (flags & ~DB_OPFLAGS_MASK) | DB_NEXT;
1179 		else if (op == DB_LAST)
1180 			flags = (flags & ~DB_OPFLAGS_MASK) | DB_PREV;
1181 
1182 		/* Invoke the changed 'get' function that was set just above. */
1183 		ret = __dbc_slice_get_all_pp(dbc, key, data, flags);
1184 	} else
1185 		ret = __dbc_get_pp(sl_dbc, key, data, flags);
1186 
1187 	return (ret);
1188 }
1189 
1190 /*
1191  * __dbc_slice_fetch_all --
1192  *	Help DBC->get/pget to iterate over multiple slices, opening and closing
1193  *	cursors as needed.
1194  *
1195  *	The secondary key parameter specifies which function to call
1196  *		skey == NULL	DBC->get()
1197  *		skey != NULL	DBC->pget()
1198  *
1199  *	There is no guarantee of cross-slice consistency.
1200  *
1201  * PUBLIC: int __dbc_slice_fetch_all __P((DBC *,
1202  * PUBLIC:     DBT *, DBT *, DBT *, u_int32_t));
1203  */
1204 int
__dbc_slice_fetch_all(dbc,skey,key,data,flags)1205 __dbc_slice_fetch_all(dbc, skey, key, data, flags)
1206 	DBC *dbc;
1207 	DBT *skey, *key, *data;
1208 	u_int32_t flags;
1209 {
1210 	DB *dbp;
1211 	DBC *sl_dbc;
1212 	DB_TXN *sl_txn;
1213 	db_slice_t slice_cnt;
1214 	int multi_slice_err, ret;
1215 
1216 	dbp = dbc->dbp;
1217 	slice_cnt = dbp->dbenv->slice_cnt;
1218 	multi_slice_err = 0;
1219 	/*
1220 	 * If the current slice is too high, the caller has continued fetching
1221 	 * after the previous call returned DB_NOTFOUND.
1222 	 */
1223 	if (dbc->dbc_curslice >= slice_cnt)
1224 		return (DBC_ERR(dbc, DB_NOTFOUND));
1225 
1226 	for (;;) {
1227 		sl_dbc = dbc->dbc_slices[0];
1228 		if (skey == NULL)
1229 			ret = __dbc_get_pp(sl_dbc, key, data, flags);
1230 		else
1231 			ret = __dbc_pget_pp(sl_dbc, skey, key, data, flags);
1232 
1233 		/* On success or a real error, we're done here. */
1234 		if (ret != DB_NOTFOUND)
1235 			break;
1236 
1237 		/*
1238 		 * If a transaction exists and it is not private, then
1239 		 * the txn is accessing multiple slices, and should return
1240 		 * an error.  However, wait to return the error until
1241 		 * after closing the cursor.
1242 		 */
1243 		sl_txn = dbc->txn;
1244 		if (sl_txn != NULL && !F_ISSET(sl_txn, TXN_PRIVATE))
1245 			multi_slice_err = 1;
1246 
1247 		if ((ret = __dbc_close_pp(sl_dbc)) != 0)
1248 			break;
1249 		dbc->dbc_slices[0] = NULL;
1250 
1251 		if (multi_slice_err) {
1252 			ret = __txn_multislice(sl_txn);
1253 			break;
1254 		}
1255 
1256 		if (++dbc->dbc_curslice >= slice_cnt) {
1257 			ret = DBC_ERR(dbc, DB_NOTFOUND);
1258 			break;
1259 		}
1260 
1261 		if ((ret = __db_cursor_pp(dbp->db_slices[dbc->dbc_curslice],
1262 		    NULL, &dbc->dbc_slices[0],
1263 		    dbc->open_flags & ~DB_SLICED)) != 0)
1264 			break;
1265 	}
1266 
1267 	return (ret);
1268 }
1269 
1270 /*
1271  * __dbc_slice_get_all_pp --
1272  *	DBC->get pre/post processing for a DB_FIRST or DB_LAST sliced cursor.
1273  *
1274  *	This goes from one slice to the next, when DB_NOTFOUND. Scan slice 0
1275  *	first, even when moving backwards through the slice (e.g., DB_PREV).
1276  *	There is no guarantee of cross-slice consistency.
1277  *
1278  * PUBLIC: int __dbc_slice_get_all_pp __P((DBC *, DBT *, DBT *, u_int32_t));
1279  */
1280 int
__dbc_slice_get_all_pp(dbc,key,data,flags)1281 __dbc_slice_get_all_pp(dbc, key, data, flags)
1282 	DBC *dbc;
1283 	DBT *key, *data;
1284 	u_int32_t flags;
1285 {
1286 	int ret;
1287 
1288 	switch (flags & DB_OPFLAGS_MASK) {
1289 	case DB_NEXT:
1290 	case DB_PREV:
1291 		ret = __dbc_slice_fetch_all(dbc, NULL, key, data, flags);
1292 		break;
1293 	default:
1294 		dbc->get = __dbc_slice_get_pp;
1295 		ret = __dbc_slice_get_pp(dbc, key, data, flags);
1296 	}
1297 
1298 	return (ret);
1299 }
1300 
1301 /*
1302  * __dbc_slice_pget_pp --
1303  *	DBC->pget processing for sliced cursors.
1304  *
1305  *	This has to open and close cursors in each slice, until it find one or
1306  *	it sees a real error -- DB_NOTFOUND is not an error here.
1307  *
1308  * PUBLIC: int __dbc_slice_pget_pp __P((DBC *, DBT *, DBT *, DBT *, u_int32_t));
1309  */
1310 int
__dbc_slice_pget_pp(dbc,skey,pkey,data,flags)1311 __dbc_slice_pget_pp(dbc, skey, pkey, data, flags)
1312 	DBC *dbc;
1313 	DBT *skey, *pkey, *data;
1314 	u_int32_t flags;
1315 {
1316 	int ret;
1317 
1318 	ret = __dbc_slice_fetch_all(dbc, skey, pkey, data, flags);
1319 	return (ret);
1320 }
1321 
1322 /*
1323  * __dbc_slice_put_pp --
1324  *	DBC->put pre/post processing for sliced cursors.
1325  *
1326  * PUBLIC: int __dbc_slice_put_pp __P((DBC *, DBT *, DBT *, u_int32_t));
1327  */
1328 int
__dbc_slice_put_pp(dbc,key,data,flags)1329 __dbc_slice_put_pp(dbc, key, data, flags)
1330 	DBC *dbc;
1331 	DBT *key, *data;
1332 	u_int32_t flags;
1333 {
1334 	DBC *sl_dbc;
1335 	int ret;
1336 
1337 	if ((ret = __dbt_usercopy(dbc->env, key)) != 0)
1338 		return (ret);
1339 
1340 	ret = __dbc_slice_activate(dbc, key, &sl_dbc, flags);
1341 	__dbt_userfree(dbc->env, key, NULL, NULL);
1342 	if (ret == 0)
1343 		ret = __dbc_put_pp(sl_dbc, key, data, flags);
1344 
1345 	return (ret);
1346 }
1347 
1348 /*
1349  * __dbc_slice_del_pp --
1350  *	DBC->del pre/post processing for sliced cursors.
1351  *
1352  *	This just forwards the cursor delete to the current cursor.
1353  *
1354  * PUBLIC: int __dbc_slice_del_pp __P((DBC *, u_int32_t));
1355  */
1356 int
__dbc_slice_del_pp(dbc,flags)1357 __dbc_slice_del_pp(dbc, flags)
1358 	DBC *dbc;
1359 	u_int32_t flags;
1360 {
1361 	DBC *sl_dbc;
1362 
1363 	/* It is an error to do a cursor delete before the first get. */
1364 	if ((sl_dbc = dbc->dbc_slices[0]) == NULL)
1365 		return (DBC_ERR(dbc, EINVAL));
1366 
1367 	return (__dbc_del_pp(sl_dbc, flags));
1368 }
1369 
1370 /*
1371  * __db_slice_remove --
1372  *	Extra __env_dbremove() steps for a sliced database that are done before
1373  *	removing the container's database.
1374  *
1375  *	Returns:
1376  *		DB_SLICE_CORRUPT if a slice's remove fails.
1377  *
1378  * PUBLIC: int __db_slice_remove
1379  * PUBLIC:     __P((DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t));
1380  */
1381 int
__db_slice_remove(dbenv,txn,name,subdb,flags)1382 __db_slice_remove(dbenv, txn, name, subdb, flags)
1383 	DB_ENV *dbenv;
1384 	DB_TXN *txn;
1385 	const char *name;
1386 	const char *subdb;
1387 	u_int32_t flags;
1388 {
1389 	DB_ENV *slice;
1390 	DB_TXN *sl_txn;
1391 	ENV *env;
1392 	int i, ret, t_ret;
1393 	u_int32_t metaflags;
1394 
1395 	/* Slices do not handle sub-databases. */
1396 	if (subdb != NULL)
1397 		return (0);
1398 
1399 	env = dbenv->env;
1400 	/* This function is a nop if the db is not sliced. */
1401 	if ((ret = __db_get_metaflags(env, name, &metaflags)) != 0)
1402 		return (ret);
1403 	if (!FLD_ISSET(metaflags, DBMETA_SLICED))
1404 		return (0);
1405 	/* Return an error if removing a sliced db from a non-sliced env. */
1406 	if (!SLICES_ON(env))
1407 		return (__env_not_sliced(env));
1408 
1409 	for (i = -1; (slice = __slice_iterate(dbenv, &i)) != NULL; ) {
1410 		if ((t_ret =
1411 		    __txn_slice_begin(txn, &sl_txn, (db_slice_t)i)) != 0 ||
1412 		    (t_ret = __env_dbremove_pp(slice,
1413 		    sl_txn, name, subdb, flags)) != 0) {
1414 			/*
1415 			 * Until cross slice DDL operations are atomic, any
1416 			 * missing files do not return an error code.
1417 			 */
1418 			if (t_ret == ENOENT)
1419 				continue;
1420 			__db_err(env, t_ret, "dbremove #%d %s", i, name);
1421 			/*
1422 			 * Suppress missing files in slice directories;
1423 			 * cross-slice DDL isn't atomic.
1424 			 */
1425 			if (ret == 0)
1426 				ret = USR_ERR(env, DB_SLICE_CORRUPT);
1427 		}
1428 	}
1429 	return (ret);
1430 }
1431 
1432 /*
1433  * __db_slice_associate --
1434  *	Extra associate steps for a sliced database, after doing the container.
1435  *
1436  *	This requires a cross-slice txn; filling the secondaries (if DB_CREATE)
1437  *	is not atomic.
1438  *
1439  *	Returns:
1440  *		DB_SLICE_CORRUPT if a slice's associate fails.
1441  *
1442  * PUBLIC: int __db_slice_associate __P((DB *, DB_TXN *, DB *,
1443  * PUBLIC:     int (*)(DB *, const DBT *, const DBT *, DBT *), u_int32_t));
1444  */
1445 int
__db_slice_associate(dbp,txn,sdbp,callback,flags)1446 __db_slice_associate(dbp, txn, sdbp, callback, flags)
1447 	DB *dbp;
1448 	DB_TXN *txn;
1449 	DB *sdbp;
1450 	int (*callback) __P((DB *, const DBT *, const DBT *, DBT *));
1451 	u_int32_t flags;
1452 {
1453 	DB *sl_dbp;
1454 	DB_TXN *sl_txn;
1455 	int i, ret, t_ret;
1456 
1457 	ret = 0;
1458 	for (i = -1; (sl_dbp = __db_slice_iterate(dbp, &i)) != NULL; ) {
1459 		if ((t_ret =
1460 		    __txn_slice_begin(txn, &sl_txn, (db_slice_t)i)) != 0 ||
1461 		    (t_ret = __db_associate_pp(sl_dbp,
1462 			sl_txn, sdbp->db_slices[i], callback, flags)) != 0) {
1463 			__db_err(dbp->env,
1464 			    t_ret, "db_associate #%d %s", i, dbp->fname);
1465 			if (ret == 0)
1466 				ret = t_ret;
1467 		}
1468 	}
1469 	sdbp->get = __db_slice_secondary_get_pp;
1470 	return (ret);
1471 }
1472 
1473 /*
1474  * __db_slice_compact --
1475  *	Extra compact steps for a sliced database, after doing the container.
1476  *
1477  *	Returns:
1478  *		DB_SLICE_CORRUPT if a slice's compact fails.
1479  *
1480  * PUBLIC: int __db_slice_compact __P((DB *,
1481  * PUBLIC:     DB_TXN *, DBT *, DBT *, DB_COMPACT *, u_int32_t, DBT *));
1482  */
1483 int
__db_slice_compact(dbp,txn,start,stop,c_data,flags,end)1484 __db_slice_compact(dbp, txn, start, stop, c_data, flags, end)
1485 	DB *dbp;
1486 	DB_TXN *txn;
1487 	DBT *start, *stop;
1488 	DB_COMPACT *c_data;
1489 	u_int32_t flags;
1490 	DBT *end;
1491 {
1492 	DB *sl_dbp;
1493 	DB_TXN *sl_txn;
1494 	int i, ret, t_ret;
1495 
1496 	ret = 0;
1497 	/* There is nothing extra to do if the database is not sliced.  */
1498 	if (!FLD_ISSET(dbp->open_flags, DB_SLICED))
1499 		return (0);
1500 
1501 	for (i = -1; (sl_dbp = __db_slice_iterate(dbp, &i)) != NULL; ) {
1502 		if ((t_ret =
1503 		    __txn_slice_begin(txn, &sl_txn, (db_slice_t)i)) != 0 ||
1504 		    (t_ret = __db_compact_pp(sl_dbp,
1505 			sl_txn, start, stop, c_data, flags, end)) != 0) {
1506 			__db_err(dbp->env,
1507 			    t_ret, "db_compact #%d %s", i, dbp->fname);
1508 			if (ret == 0)
1509 				ret = t_ret;
1510 		}
1511 	}
1512 	return (ret);
1513 }
1514 
1515 /*
1516  * __db_slice_rename --
1517  *	Extra __env_dbrename steps for a sliced database that are done before
1518  *	renaming the container.
1519  *
1520  *	Returns:
1521  *		DB_SLICE_CORRUPT if a slice's rename fails.
1522  *
1523  * PUBLIC: int __db_slice_rename __P((DB *,
1524  * PUBLIC:     DB_TXN *, const char *, const char *, const char *, u_int32_t));
1525  */
1526 int
__db_slice_rename(dbp,txn,name,subdb,newname,flags)1527 __db_slice_rename(dbp, txn, name, subdb, newname, flags)
1528 	DB *dbp;
1529 	DB_TXN *txn;
1530 	const char *name;
1531 	const char *subdb;
1532 	const char *newname;
1533 	u_int32_t flags;
1534 {
1535 	DB_ENV *dbenv, *slice;
1536 	DB_TXN *sl_txn;
1537 	ENV *env;
1538 	int i, ret, t_ret;
1539 	u_int32_t metaflags;
1540 
1541 	/* Slices do not handle sub-databases. */
1542 	if (subdb != NULL)
1543 		return (0);
1544 
1545 	env = dbp->env;
1546 	dbenv = dbp->dbenv;
1547 	if ((ret = __db_get_metaflags(env, name, &metaflags)) != 0 &&
1548 	    ret != ENOENT)
1549 		return (ret);
1550 	if (!FLD_ISSET(metaflags, DBMETA_SLICED))
1551 		return (0);
1552 	/* Return an error if renaming a sliced db from a non-sliced env. */
1553 	if (!SLICES_ON(env))
1554 		return (__env_not_sliced(env));
1555 
1556 	for (i = -1; (slice = __slice_iterate(dbenv, &i)) != NULL; )
1557 	{
1558 		if ((t_ret =
1559 		    __txn_slice_begin(txn, &sl_txn, (db_slice_t)i)) != 0 ||
1560 		    (t_ret = __env_dbrename_pp(slice,
1561 		    sl_txn, name, subdb, newname, flags)) != 0) {
1562 			/*
1563 			 * Until cross slice DDL operations are atomic, any
1564 			 * missing files do not return an error code.
1565 			 */
1566 			if (t_ret == ENOENT)
1567 				continue;
1568 			__db_err(env, t_ret, DB_STR_A("0784",
1569 			    "dbrename #%d %s->%s", "%d %s %s"),
1570 			    i, name, newname);
1571 			if (ret == 0)
1572 				ret = USR_ERR(env, DB_SLICE_CORRUPT);
1573 		}
1574 	}
1575 	return (ret);
1576 }
1577 
1578 /*
1579  * __db_slice_truncate --
1580  *	Extra truncate steps for a sliced database, after doing the container.
1581  *
1582  *	Returns:
1583  *		DB_SLICE_CORRUPT if a slice's truncate fails.
1584  *
1585  * PUBLIC: int __db_slice_truncate __P((DB *,
1586  * PUBLIC:     DB_TXN *, u_int32_t *, u_int32_t));
1587  */
1588 int
__db_slice_truncate(dbp,txn,countp,flags)1589 __db_slice_truncate(dbp, txn, countp, flags)
1590 	DB *dbp;
1591 	DB_TXN *txn;
1592 	u_int32_t *countp;
1593 	u_int32_t flags;
1594 {
1595 	ENV *env;
1596 	DB_TXN *sl_txn;
1597 	db_slice_t i;
1598 	int ret, t_ret;
1599 	u_int32_t slice_records;
1600 
1601 	env = dbp->env;
1602 	ret = 0;
1603 	if (countp != NULL)
1604 		*countp = 0;
1605 	/* There is nothing extra to do if the database is not sliced.  */
1606 	if (!FLD_ISSET(dbp->open_flags, DB_SLICED))
1607 		return (0);
1608 
1609 	for (i = 0; i != env->dbenv->slice_cnt; i++) {
1610 		slice_records = 0;
1611 		if ((t_ret = __txn_slice_begin(txn, &sl_txn, i)) != 0 ||
1612 		    (t_ret = __db_truncate_pp(dbp->db_slices[i],
1613 		    sl_txn, &slice_records, flags)) != 0) {
1614 			if (FLD_ISSET(env->dbenv->verbose, DB_VERB_SLICE))
1615 				__db_err(env,  t_ret,
1616 				    "db_slice_truncate #%d %s", i, dbp->fname);
1617 			if (ret == 0)
1618 				ret = t_ret;
1619 		}
1620 		if (countp != NULL)
1621 			*countp += slice_records;
1622 	}
1623 	return (ret);
1624 }
1625 
1626 /*
1627  * __db_slice_process --
1628  *	Extra DB->upgrade/convert processing for a possibly sliced database.
1629  *
1630  *	The database has not been opened, so we need to create the slices'
1631  *	handles, and free them when we're done.
1632  *
1633  *	Returns:
1634  *		DB_SLICE_CORRUPT if a slice cannot be found.
1635  *
1636  * PUBLIC: int __db_slice_process __P((DB *, const char *, u_int32_t,
1637  * PUBLIC:     int (*)(DB *, const char *, u_int32_t), const char *));
1638  */
1639 int
__db_slice_process(dbp,fname,flags,pfunc,msgpfx)1640 __db_slice_process(dbp, fname, flags, pfunc, msgpfx)
1641 	DB *dbp;
1642 	const char *fname;
1643 	u_int32_t flags;
1644 	int (*pfunc)(DB *, const char *, u_int32_t);
1645 	const char *msgpfx;
1646 {
1647 	ENV *env;
1648 	db_slice_t i;
1649 	int ret, t_ret;
1650 	u_int32_t metaflags;
1651 
1652 	env = dbp->env;
1653 	/*
1654 	 * Common DDL checks for sliced databases:
1655 	 * Nothing to do if not sliced, it is an error to attempt sliced
1656 	 */
1657 	if ((ret = __db_get_metaflags(env, fname, &metaflags)) != 0 &&
1658 	    ret != ENOENT)
1659 		return (ret);
1660 	if (!FLD_ISSET(metaflags, DBMETA_SLICED))
1661 		return (0);
1662 	if (!SLICES_ON(env))
1663 		return (__env_not_sliced(env));
1664 
1665 	/*
1666 	 * Upgrading a non-sliced db does not require opening the database, but
1667 	 * the sliced version does,
1668 	 */
1669 	if (!F_ISSET(dbp, DB_AM_OPEN_CALLED) &&
1670 	    (ret = __db_slice_alloc(dbp, NULL, NULL)) != 0)
1671 		return (ret);
1672 
1673 	for (i = 0; i != env->dbenv->slice_cnt; i++) {
1674 		if ((t_ret = pfunc(dbp->db_slices[i], fname, flags)) != 0) {
1675 			__db_err(env, t_ret, DB_STR_A("0785",
1676 			    "%s failed for slice #%u: '%s'", "%s %u %s"),
1677 			    msgpfx, i, fname);
1678 			if (ret == 0)
1679 				ret = USR_ERR(env, DB_SLICE_CORRUPT);
1680 		}
1681 	}
1682 
1683 	/* No flush needed: each upgrade has already __os_fsync()'d the file. */
1684 	if ((t_ret = __db_slice_free(dbp, DB_NOSYNC)) != 0 && ret == 0)
1685 		ret = t_ret;
1686 
1687 	return (ret);
1688 }
1689 
1690 /*
1691  * __dbc_slice_dump_get --
1692  *	Help __db_dump() to retrieve every key/value pair of all the slices.
1693  *
1694  *	There is no attempt to provide cross-slice consistency. It is similar
1695  *	to __dbc_slice_fetch_all without secondary index support.
1696  *
1697  * PUBLIC: int __dbc_slice_dump_get __P((DBC *, DBT *, DBT *, u_int32_t));
1698  */
1699 int
__dbc_slice_dump_get(dbc,key,data,flags)1700 __dbc_slice_dump_get(dbc, key, data, flags)
1701 	DBC *dbc;
1702 	DBT *key, *data;
1703 	u_int32_t flags;
1704 {
1705 	DB *dbp;
1706 	DBC *sl_dbc;
1707 	int ret;
1708 
1709 	dbp = dbc->dbp;
1710 
1711 	/*
1712 	 * If the current slice is too high, the caller has continued fetching
1713 	 * after the previous call returned DB_NOTFOUND.
1714 	 */
1715 	if (dbc->dbc_curslice >= dbp->dbenv->slice_cnt)
1716 		return (DBC_ERR(dbc, DB_NOTFOUND));
1717 
1718 	for (;;) {
1719 		if (dbc->dbc_slices[0] == NULL && (ret =
1720 		    __db_cursor_pp(dbp->db_slices[dbc->dbc_curslice], NULL,
1721 		    &dbc->dbc_slices[0], dbc->open_flags & ~DB_SLICED)) != 0)
1722 			break;
1723 		sl_dbc = dbc->dbc_slices[0];
1724 
1725 		ret = __dbc_get_pp(sl_dbc, key, data, flags);
1726 
1727 		/* On success or a real error, we're done here. */
1728 		if (ret != DB_NOTFOUND)
1729 			break;
1730 
1731 		if ((ret = __dbc_close_pp(sl_dbc)) != 0)
1732 			break;
1733 		dbc->dbc_slices[0] = NULL;
1734 
1735 		if (++dbc->dbc_curslice >= dbp->dbenv->slice_cnt) {
1736 			ret = DBC_ERR(dbc, DB_NOTFOUND);
1737 			break;
1738 		}
1739 	}
1740 
1741 	return (ret);
1742 }
1743 
1744 /*
1745  * __db_slice_verify --
1746  *	Extra DB->verify processing for a possibly sliced database.
1747  *
1748  *	The database has not been opened, so we need to create the slices'
1749  *	handles, and free them when we're done, like __db_slice_verify.
1750  *
1751  *	Returns:
1752  *		DB_SLICE_CORRUPT if a slice cannot be found.
1753  *
1754  * PUBLIC: int __db_slice_verify __P((DB *, const char *,
1755  * PUBLIC:     const char *, void *, int (*)(void *, const void *), u_int32_t));
1756  */
1757 int
__db_slice_verify(dbp,fname,dname,handle,callback,flags)1758 __db_slice_verify(dbp, fname, dname, handle, callback, flags)
1759 	DB *dbp;
1760 	const char *fname;
1761 	const char *dname;
1762 	void *handle;
1763 	int (*callback) __P((void *, const void *));
1764 	u_int32_t flags;
1765 {
1766 	ENV *env;
1767 	db_slice_t i;
1768 	int ret, t_ret;
1769 	u_int32_t metaflags;
1770 
1771 	/* Slices do not handle sub-databases. */
1772 	if (dname != NULL)
1773 		return (0);
1774 
1775 	env = dbp->env;
1776 	/*
1777 	 * Common DDL checks for sliced databases:
1778 	 * Nothing to do if not sliced, it is an error to attempt sliced
1779 	 */
1780 	if ((ret = __db_get_metaflags(env, fname, &metaflags)) != 0 &&
1781 	    ret != ENOENT)
1782 		return (ret);
1783 	if (!FLD_ISSET(metaflags, DBMETA_SLICED))
1784 		return (0);
1785 	if (!SLICES_ON(env))
1786 		return (__env_not_sliced(env));
1787 
1788 	if ((ret = __db_slice_alloc(dbp, NULL, NULL)) != 0)
1789 		goto err;
1790 	for (i = 0; i != env->dbenv->slice_cnt; i++) {
1791 		if ((t_ret = __db_verify_internal(dbp->db_slices[i],
1792 		    fname, dname, handle, callback, flags)) != 0) {
1793 			__db_err(env, t_ret, DB_STR_A("0786",
1794 			    "db_verify #%u %s", "%d %s"), i, fname);
1795 			if (ret == 0)
1796 				ret = USR_ERR(env, DB_SLICE_CORRUPT);
1797 		}
1798 	}
1799 
1800 	/* Verify closed the dbs but doesn't free the db_slices array. */
1801 	if ((t_ret = __db_slice_free(dbp, DB_NOSYNC)) != 0 && ret == 0)
1802 		ret = t_ret;
1803 err:
1804 	return (ret);
1805 }
1806 
1807 #endif
1808