1 /*-
2  * Copyright (c) 1996, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file LICENSE for license information.
5  *
6  * $Id$
7  */
8 
9 #include "db_config.h"
10 
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/db_verify.h"
14 #include "dbinc/btree.h"
15 #ifdef HAVE_HASH
16 #include "dbinc/hash.h"
17 #endif
18 #include "dbinc/lock.h"
19 #include "dbinc/mp.h"
20 #include "dbinc/partition.h"
21 #include "dbinc/txn.h"
22 #ifdef HAVE_PARTITION
23 
24 static int __part_rr __P((DB *, DB_THREAD_INFO *, DB_TXN *,
25 	       const char *, const char *, const char *, u_int32_t));
26 static int __partc_close __P((DBC *, db_pgno_t, int *));
27 static int __partc_del __P((DBC*, u_int32_t));
28 static int __partc_destroy __P((DBC*));
29 static int __partc_get_pp __P((DBC*, DBT *, DBT *, u_int32_t));
30 static int __partc_put __P((DBC*, DBT *, DBT *, u_int32_t, db_pgno_t *));
31 static int __partc_writelock __P((DBC*));
32 static int __partition_chk_meta __P((DB *,
33 		DB_THREAD_INFO *, DB_TXN *, u_int32_t));
34 static int __partition_setup_keys __P((DBC *,
35 		DB_PARTITION *, u_int32_t, u_int32_t));
36 static int __part_key_cmp __P((const void *, const void *));
37 static inline void __part_search __P((DB *,
38 		DB_PARTITION *, DBT *, u_int32_t *));
39 
/*
 * Message for a failed partition allocation.  The format takes a single
 * int (%d) byte count, so callers cast their size to int.
 */
#define	ALLOC_ERR DB_STR_A("0764","Partition failed to allocate %d bytes","%d")

/*
 * GET_PART_CURSOR --
 *	Allocate a cursor on partition part_id and copy the flags of the
 * partitioned cursor dbc to it.
 * Not passed:
 *	DBC_PARTITIONED -- the subcursors are not.
 *	DBC_OWN_LID -- the arg dbc owns the lock id.
 *	DBC_WRITECURSOR DBC_WRITER -- CDS locking happens on
 *				the whole DB, not the partition.
 *
 * The expansion uses three names from the invoking scope: a DB_PARTITION
 * pointer "part", an int "ret" and a label "err".  If __db_cursor_int
 * fails, ret holds its return value and control transfers to err.
 * The dbc argument is evaluated more than once.
 */
#define	GET_PART_CURSOR(dbc, new_dbc, part_id) do {			     \
	DB *__part_dbp;							     \
	__part_dbp = part->handles[part_id];				     \
	if ((ret = __db_cursor_int(__part_dbp,				     \
	     (dbc)->thread_info, (dbc)->txn, __part_dbp->type,		     \
	     PGNO_INVALID, 0, (dbc)->locker, &(new_dbc))) != 0)		     \
		goto err;						     \
	(new_dbc)->flags = (dbc)->flags &				     \
	    ~(DBC_PARTITIONED|DBC_OWN_LID|DBC_WRITECURSOR|DBC_WRITER);	     \
} while (0)
60 
61 /*
62  * Search for the correct partition.
63  */
__part_search(dbp,part,key,part_idp)64 static inline void __part_search(dbp, part, key, part_idp)
65 	DB *dbp;
66 	DB_PARTITION *part;
67 	DBT *key;
68 	u_int32_t *part_idp;
69 {
70 	db_indx_t base, indx, limit;
71 	int cmp;
72 	int (*func) __P((DB *, const DBT *, const DBT *, size_t *));
73 	size_t pos, pos_h, pos_l;
74 
75 	DB_ASSERT(dbp->env, part->nparts != 0);
76 	COMPQUIET(cmp, 0);
77 	COMPQUIET(indx, 0);
78 
79 	pos_h = 0;
80 	pos_l = 0;
81 	func = ((BTREE *)dbp->bt_internal)->bt_compare;
82 	DB_BINARY_SEARCH_FOR(base, limit, part->nparts, O_INDX) {
83 		pos = pos_l > pos_h ? pos_h : pos_l;
84 		DB_BINARY_SEARCH_INCR(indx, base, limit, O_INDX);
85 		cmp = func(dbp, key, &part->keys[indx], &pos);
86 		if (cmp == 0)
87 			break;
88 		if (cmp > 0) {
89 			DB_BINARY_SEARCH_SHIFT_BASE(indx, base, limit, O_INDX);
90 			pos_l = pos;
91 		} else
92 			pos_h = pos;
93 	}
94 	if (cmp == 0)
95 		*part_idp = indx;
96 	else if ((*part_idp = base) != 0)
97 		(*part_idp)--;
98 }
99 
100 /*
101  * __partition_init --
102  *	Initialize the partition structure.
103  * Called when the meta data page is read in during database open or
104  * when partition keys or a callback are set.
105  *
106  * PUBLIC: int __partition_init __P((DB *, u_int32_t));
107  */
108 int
__partition_init(dbp,flags)109 __partition_init(dbp, flags)
110 	DB *dbp;
111 	u_int32_t flags;
112 {
113 	DB_PARTITION *part;
114 	int ret;
115 
116 	if ((part = dbp->p_internal) != NULL) {
117 		if ((LF_ISSET(DBMETA_PART_RANGE) &&
118 		    F_ISSET(part, PART_CALLBACK)) ||
119 		    (LF_ISSET(DBMETA_PART_CALLBACK) &&
120 		    F_ISSET(part, PART_RANGE))) {
121 			__db_errx(dbp->env, DB_STR("0645",
122 			    "Cannot specify callback and range keys."));
123 			return (EINVAL);
124 		}
125 	} else if ((ret = __os_calloc(dbp->env, 1, sizeof(*part), &part)) != 0)
126 		return (ret);
127 
128 	if (LF_ISSET(DBMETA_PART_RANGE))
129 		F_SET(part, PART_RANGE);
130 	if (LF_ISSET(DBMETA_PART_CALLBACK))
131 		F_SET(part, PART_CALLBACK);
132 	dbp->p_internal = part;
133 	/* Set up AM-specific methods that do not require an open. */
134 	dbp->db_am_rename = __part_rename;
135 	dbp->db_am_remove = __part_remove;
136 	return (0);
137 }
138 /*
139  * __partition_set --
140  *	Set the partitioning keys or callback function.
141  * This routine must be called prior to creating the database.
142  * PUBLIC: int __partition_set __P((DB *, u_int32_t, DBT *,
143  * PUBLIC:	u_int32_t (*callback)(DB *, DBT *key)));
144  */
145 
146 int
__partition_set(dbp,parts,keys,callback)147 __partition_set(dbp, parts, keys, callback)
148 	DB *dbp;
149 	u_int32_t parts;
150 	DBT *keys;
151 	u_int32_t (*callback)(DB *, DBT *key);
152 {
153 	DB_PARTITION *part;
154 	ENV *env;
155 	u_int32_t i;
156 	int ret, t_ret;
157 
158 	DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_partition");
159 	env = dbp->dbenv->env;
160 
161 	if (parts < 2) {
162 		__db_errx(env, DB_STR("0646",
163 		    "Must specify at least 2 partitions."));
164 		return (EINVAL);
165 	} else if (parts > PART_MAXIMUM) {
166 		__db_errx(env, DB_STR_A("0772",
167 		    "Must not specify more than %u partitions.", "%u"),
168 		    (unsigned int)PART_MAXIMUM);
169 		return (EINVAL);
170 	}
171 
172 	if (keys == NULL && callback == NULL) {
173 		__db_errx(env, DB_STR("0647",
174 		    "Must specify either keys or a callback."));
175 		return (EINVAL);
176 	}
177 	if (keys != NULL && callback != NULL) {
178 bad:		__db_errx(env, DB_STR("0648",
179 		    "May not specify both keys and a callback."));
180 		return (EINVAL);
181 	}
182 
183 	if ((ret = __partition_init(dbp,
184 	    keys != NULL ?
185 	    DBMETA_PART_RANGE : DBMETA_PART_CALLBACK)) != 0)
186 		return (ret);
187 	part = dbp->p_internal;
188 
189 	if ((part->keys != NULL && callback != NULL) ||
190 	    (part->callback != NULL && keys != NULL))
191 		goto bad;
192 
193 	/*
194 	 * Free a key array that was allocated by an earlier set_partition call.
195 	 */
196 	if (part->keys != NULL) {
197 		for (i = 0; i < part->nparts - 1; i++) {
198 			/*
199 			 * Always free all entries in the key array and return
200 			 * the first error code.
201 			 */
202 			if ((t_ret = __db_dbt_clone_free(dbp->env,
203 			    &part->keys[i])) != 0 && ret == 0)
204 				ret = t_ret;
205 		}
206 		__os_free(dbp->env, part->keys);
207 		part->keys = NULL;
208 	}
209 
210 	if (ret != 0)
211 		return (ret);
212 
213 	part->nparts = parts;
214 	part->callback = callback;
215 
216 	/*
217 	 * Take a copy of the users key array otherwise we cannot be sure
218 	 * that the memory will still be valid when the database is opened.
219 	 */
220 	if (keys != NULL) {
221 		if ((ret = __os_calloc(dbp->env,
222 		    part->nparts - 1, sizeof(DBT), &part->keys)) != 0)
223 			goto err;
224 
225 		for (i = 0, parts = 0; i < part->nparts - 1; i++, parts++)
226 			if ((ret = __db_dbt_clone(dbp->env,
227 			    &part->keys[i], &keys[i])) != 0)
228 				goto err;
229 	}
230 
231 err:	if (ret != 0 && part->keys != NULL) {
232 		/*
233 		 * Always free those entries cloned successfully in the key
234 		 * array and the one which fails in __db_dbt_clone, and
235 		 * return the first error code. As ret != 0 here, so it is
236 		 * safe to ignore any error from __db_dbt_clone_free.
237 		 */
238 		for (i = 0; i < parts; i++)
239 			(void)__db_dbt_clone_free(dbp->env, &part->keys[i]);
240 		if (parts < part->nparts - 1 && part->keys[parts].data != NULL)
241 			__os_free(dbp->env, part->keys[parts].data);
242 		__os_free(dbp->env, part->keys);
243 		part->keys = NULL;
244 	}
245 	return (ret);
246 }
247 
248 /*
249  * __partition_set_dirs --
250  *	Set the directories for creating the partition databases.
251  * They must be in the environment.
252  * PUBLIC: int __partition_set_dirs __P((DB *, const char **));
253  */
254 int
__partition_set_dirs(dbp,dirp)255 __partition_set_dirs(dbp, dirp)
256 	DB *dbp;
257 	const char **dirp;
258 {
259 	DB_ENV *dbenv;
260 	DB_PARTITION *part;
261 	ENV *env;
262 	u_int32_t ndirs, slen;
263 	int i, ret;
264 	const char **dir;
265 	char *cp, **part_dirs, **pd;
266 
267 	DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_partition_dirs");
268 	dbenv = dbp->dbenv;
269 	env = dbp->env;
270 
271 	ndirs = 1;
272 	slen = 0;
273 	for (dir = dirp; *dir != NULL; dir++) {
274 		if (F_ISSET(env, ENV_DBLOCAL))
275 			slen += (u_int32_t)strlen(*dir) + 1;
276 		ndirs++;
277 	}
278 
279 	slen += sizeof(char *) * ndirs;
280 	if ((ret = __os_malloc(env, slen,  &part_dirs)) != 0)
281 		return (EINVAL);
282 	memset(part_dirs, 0, slen);
283 
284 	cp = (char *) part_dirs + (sizeof(char *) * ndirs);
285 	pd = part_dirs;
286 	for (dir = dirp; *dir != NULL; dir++, pd++) {
287 		if (F_ISSET(env, ENV_DBLOCAL)) {
288 			(void)strcpy(cp, *dir);
289 			*pd = cp;
290 			cp += strlen(*dir) + 1;
291 			continue;
292 		}
293 		for (i = 0; i < dbenv->data_next; i++)
294 			if (strcmp(*dir, dbenv->db_data_dir[i]) == 0)
295 				break;
296 		if (i == dbenv->data_next) {
297 			__db_errx(dbp->env, DB_STR_A("0649",
298 			    "Directory not in environment list %s",
299 			    "%s"), *dir);
300 			__os_free(env, part_dirs);
301 			return (EINVAL);
302 		}
303 		*pd = dbenv->db_data_dir[i];
304 	}
305 
306 	if ((part = dbp->p_internal) == NULL) {
307 		if ((ret = __partition_init(dbp, 0)) != 0)
308 			return (ret);
309 		part = dbp->p_internal;
310 	}
311 
312 	part->dirs = (const char **)part_dirs;
313 
314 	return (0);
315 }
316 
317 /*
318  * __partition_extent_names --
319  *	Generate a list of partition extent file names.
320  * PUBLIC: int __partition_extent_names __P((DB *, const char *, char ***));
321  */
322 int
__partition_extent_names(dbp,fname,namelistp)323 __partition_extent_names(dbp, fname, namelistp)
324 	DB *dbp;
325 	const char *fname;
326 	char ***namelistp;
327 {
328 	DB_PARTITION *part;
329 	ENV *env;
330 	char *name, *sp, **cp, *freep;
331 	const char *np;
332 	u_int32_t part_id, namelen, len;
333 	int ret;
334 
335 	env = dbp->env;
336 	part = (DB_PARTITION*)dbp->p_internal;
337 	*namelistp = NULL;
338 
339 	namelen = strlen(fname) + PART_LEN + 1;
340 	len = part->nparts * (namelen + sizeof(char*)) + sizeof(char*);
341 
342 	if ((ret = __os_malloc(env, namelen, &name)) != 0)
343 		goto err;
344 	if ((ret = __os_malloc(env, len, namelistp)) != 0)
345 		goto err;
346 
347 	sp = name;
348 	np = __db_rpath(fname);
349 	if (np == NULL)
350 		np = fname;
351 	else {
352 		np++;
353 		(void)strncpy(name, fname, (size_t)(np - fname));
354 		sp = name + (np - fname);
355 	}
356 
357 	cp = *namelistp;
358 	freep = (char*)(cp + part->nparts + 1);
359 	for (part_id = 0; part_id < part->nparts; part_id++) {
360 		(void)sprintf(sp, PART_NAME, np, part_id);
361 		*cp++ = freep;
362 		(void)strcpy(freep, name);
363 		freep += namelen;
364 	}
365 	*cp = NULL;
366 
367 err:	if (name != NULL)
368 		__os_free(env, name);
369 	return (ret);
370 }
371 
372 /*
373  * __partition_open --
374  *	Open/create a partitioned database.
375  * PUBLIC: int __partition_open __P((DB *, DB_THREAD_INFO *,
376  * PUBLIC:	 DB_TXN *, const char *, DBTYPE, u_int32_t, int, int));
377  */
378 int
__partition_open(dbp,ip,txn,fname,type,flags,mode,do_open)379 __partition_open(dbp, ip, txn, fname, type, flags, mode, do_open)
380 	DB *dbp;
381 	DB_THREAD_INFO *ip;
382 	DB_TXN *txn;
383 	const char *fname;
384 	DBTYPE type;
385 	u_int32_t flags;
386 	int mode, do_open;
387 {
388 	DB *part_db;
389 	DB_PARTITION *part;
390 	DBC *dbc;
391 	ENV *env;
392 	u_int32_t part_id;
393 	int ret;
394 	char *name, *sp;
395 	const char **dirp, *np;
396 
397 	part = dbp->p_internal;
398 	env = dbp->dbenv->env;
399 	name = NULL;
400 
401 	if ((ret = __partition_chk_meta(dbp, ip, txn, flags)) != 0 && do_open)
402 		goto err;
403 
404 	if (part->nparts > PART_MAXIMUM) {
405 		__db_errx(env, DB_STR_A("0789",
406 	    "The number of partitions %u exceeds the maximum %u.", "%u %u"),
407 		    part->nparts, (unsigned int)PART_MAXIMUM);
408 		ret = USR_ERR(env, EINVAL);
409 		goto err;
410 	}
411 
412 	if ((ret = __os_calloc(env,
413 	     part->nparts, sizeof(*part->handles), &part->handles)) != 0) {
414 		__db_errx(env, ALLOC_ERR,
415 		    (int)(part->nparts * sizeof(*part->handles)));
416 		goto err;
417 	}
418 
419 	DB_ASSERT(env, fname != NULL);
420 	if ((ret = __os_malloc(env,
421 	     strlen(fname) + PART_LEN + 1, &name)) != 0) {
422 		__db_errx(env, ALLOC_ERR,
423 		    (int)(strlen(fname) + PART_LEN + 1));
424 		goto err;
425 	}
426 
427 	sp = name;
428 	np = __db_rpath(fname);
429 	if (np == NULL)
430 		np = fname;
431 	else {
432 		np++;
433 		(void)strncpy(name, fname, (size_t)(np - fname));
434 		sp = name + (np - fname);
435 	}
436 
437 	if (F_ISSET(dbp, DB_AM_RECOVER))
438 		goto done;
439 	dirp = part->dirs;
440 	for (part_id = 0; part_id < part->nparts; part_id++) {
441 		if ((ret = __db_create_internal(
442 		    &part->handles[part_id], dbp->env, 0)) != 0)
443 			goto err;
444 
445 		part_db = part->handles[part_id];
446 		part_db->flags = F_ISSET(dbp,
447 		    ~(DB_AM_CREATED | DB_AM_CREATED_MSTR | DB_AM_OPEN_CALLED));
448 		F_SET(part_db, DB_AM_PARTDB);
449 		__db_copy_config(dbp, part_db, part->nparts);
450 
451 		/* These need to be copied for partitions, but not slices. */
452 		part_db->app_private = dbp->app_private;
453 		part_db->adj_fileid = dbp->adj_fileid;
454 
455 		(void)sprintf(sp, PART_NAME, np, part_id);
456 		if (do_open) {
457 			/*
458 			 * Cycle through the directory names passed in,
459 			 * if any.
460 			 */
461 			if (dirp != NULL &&
462 			    (part_db->dirname = *dirp++) == NULL) {
463 				part_db->dirname = *(dirp = part->dirs);
464 				dirp++;
465 			}
466 			if ((ret = __db_open(part_db, ip, txn,
467 			    name, NULL, type, flags, mode, PGNO_BASE_MD)) != 0)
468 				goto err;
469 		} else if ((ret = __os_strdup(env, name, &part_db->fname)) != 0)
470 			goto err;
471 	}
472 
473 	/* Get rid of the cursor used to open the db; it is the wrong type. */
474 done:	while ((dbc = TAILQ_FIRST(&dbp->free_queue)) != NULL)
475 		if ((ret = __dbc_destroy(dbc)) != 0)
476 			break;
477 
478 	if (0) {
479 err:		(void)__partition_close(dbp, txn, 0);
480 	}
481 	if (name != NULL)
482 		__os_free(env, name);
483 	return (ret);
484 }
485 
486 /*
487  * __partition_chk_meta --
488  * Check for a consistent meta data page and parameters when opening a
489  * partitioned database.
490  */
491 static int
__partition_chk_meta(dbp,ip,txn,flags)492 __partition_chk_meta(dbp, ip, txn, flags)
493 	DB *dbp;
494 	DB_THREAD_INFO *ip;
495 	DB_TXN *txn;
496 	u_int32_t flags;
497 {
498 	DBMETA *meta;
499 	DB_PARTITION *part;
500 	DBC *dbc;
501 	DB_LOCK metalock;
502 	DB_MPOOLFILE *mpf;
503 	ENV *env;
504 	db_pgno_t base_pgno;
505 	int ret, set_keys, t_ret;
506 	u_int32_t pgsize;
507 
508 	dbc = NULL;
509 	meta = NULL;
510 	LOCK_INIT(metalock);
511 	part = dbp->p_internal;
512 	mpf = dbp->mpf;
513 	env = dbp->env;
514 	ret = 0;
515 	set_keys = 0;
516 
517 	/*
518 	 * Just to fix the lint warning.
519 	 * The real value will be set later, and we will
520 	 * only use the value after being set properly.
521 	 */
522 	pgsize = dbp->pgsize;
523 
524 	/* Get a cursor on the main db.  */
525 	dbp->p_internal = NULL;
526 	if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
527 		goto err;
528 
529 	/* Get the metadata page. */
530 	base_pgno = PGNO_BASE_MD;
531 	if ((ret =
532 	    __db_lget(dbc, 0, base_pgno, DB_LOCK_READ, 0, &metalock)) != 0)
533 		goto err;
534 	if ((ret = __memp_fget(mpf, &base_pgno, ip, dbc->txn, 0, &meta)) != 0)
535 		goto err;
536 
537 	if (meta->magic != DB_HASHMAGIC &&
538 	    (meta->magic != DB_BTREEMAGIC || F_ISSET(meta, BTM_RECNO))) {
539 		ret = USR_ERR(env, EINVAL);
540 		__db_errx(env, DB_STR("0650",
541 	    "Partitioning may only specified on BTREE and HASH databases."));
542 		goto err;
543 	}
544 	if (!FLD_ISSET(meta->metaflags,
545 	    DBMETA_PART_RANGE | DBMETA_PART_CALLBACK)) {
546 		ret = USR_ERR(env, EINVAL);
547 		__db_errx(env, DB_STR("0651",
548 		    "Partitioning specified on a non-partitioned database."));
549 		goto err;
550 	}
551 
552 	if ((F_ISSET(part, PART_RANGE) &&
553 	    FLD_ISSET(meta->metaflags, DBMETA_PART_CALLBACK)) ||
554 	    (F_ISSET(part, PART_CALLBACK) &&
555 	    FLD_ISSET(meta->metaflags, DBMETA_PART_RANGE))) {
556 		ret = USR_ERR(env, EINVAL);
557 		__db_errx(env, DB_STR("0652",
558 		    "Incompatible partitioning specified."));
559 		goto err;
560 	}
561 
562 	if (FLD_ISSET(meta->metaflags, DBMETA_PART_CALLBACK) &&
563 	     part->callback == NULL && !IS_RECOVERING(env) &&
564 	     !F_ISSET(dbp, DB_AM_RECOVER) && !LF_ISSET(DB_RDWRMASTER)) {
565 		ret = USR_ERR(env, EINVAL);
566 		__db_errx(env, DB_STR("0653",
567 		    "Partition callback not specified."));
568 		goto err;
569 	}
570 
571 	if (F_ISSET(dbp, DB_AM_RECNUM)) {
572 		ret = USR_ERR(env, EINVAL);
573 		__db_errx(env, DB_STR("0654",
574 	    "Record numbers are not supported in partitioned databases."));
575 		goto err;
576 	}
577 
578 	if (part->nparts == 0) {
579 		if (meta->nparts == 0) {
580 			ret = USR_ERR(env, EINVAL);
581 			__db_errx(env, DB_STR("0655",
582 			    "Zero paritions specified."));
583 			goto err;
584 		} else
585 			part->nparts = meta->nparts;
586 	} else if (meta->nparts != 0 && part->nparts != meta->nparts) {
587 		ret = USR_ERR(env, EINVAL);
588 		__db_errx(env, DB_STR("0656",
589 		    "Number of partitions does not match."));
590 		goto err;
591 	}
592 	/*
593 	 * There is no limit on the number of partitions, but I cannot imagine a real
594 	 * database having more than 10000.
595 	 */
596 	if (meta->nparts > 10000) {
597 		ret = USR_ERR(env, EINVAL);
598 		__db_errx(env, DB_STR_A("5553",
599 			"Too many partitions %lu", "%lu"), meta->nparts);
600 		goto err;
601 	}
602 
603 	if (meta->magic == DB_HASHMAGIC) {
604 		if (!F_ISSET(part, PART_CALLBACK)) {
605 			ret = USR_ERR(env, EINVAL);
606 			__db_errx(env, DB_STR("0657",
607 		    "Hash database must specify a partition callback."));
608 		}
609 	} else if (meta->magic != DB_BTREEMAGIC) {
610 		ret = USR_ERR(env, EINVAL);
611 		__db_errx(env, DB_STR("0658",
612 		    "Partitioning only supported on BTREE and HASH."));
613 	} else {
614 		set_keys = 1;
615 		pgsize = meta->pagesize;
616 	}
617 
618 err:	/* Put the metadata page back. */
619 	if (meta != NULL && (t_ret = __memp_fput(mpf,
620 	    ip, meta, dbc->priority)) != 0 && ret == 0)
621 		ret = t_ret;
622 	if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
623 		ret = t_ret;
624 
625 	/*
626 	 * We can only call __partition_setup_keys after putting
627 	 * the meta page and releasing the meta lock, or self-deadlock
628 	 * will occur.
629 	 */
630 	if (ret == 0 && set_keys && (t_ret =
631 	    __partition_setup_keys(dbc, part, pgsize, flags)) != 0)
632 		ret = t_ret;
633 
634 	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
635 		ret = t_ret;
636 
637 	dbp->p_internal = part;
638 	return (ret);
639 }
640 
641 /*
642  * Support for sorting keys.  Keys must be sorted using the btree
643  * compare function so if we call qsort in __partition_setup_keys
644  * we use this structure to pass the DBP and compare function.
645  */
646 struct key_sort {
647 	DB *dbp;
648 	DBT *key;
649 	int (*compare) __P((DB *, const DBT *, const DBT *, size_t *));
650 };
651 
__part_key_cmp(a,b)652 static int __part_key_cmp(a, b)
653 	const void *a, *b;
654 {
655 	const struct key_sort *ka, *kb;
656 
657 	ka = a;
658 	kb = b;
659 	return (ka->compare(ka->dbp, ka->key, kb->key, NULL));
660 }
661 /*
662  * __partition_setup_keys --
663  *	Get the partition keys into memory, or put them to disk if we
664  * are creating a partitioned database.
665  */
666 static int
__partition_setup_keys(dbc,part,pgsize,flags)667 __partition_setup_keys(dbc, part, pgsize, flags)
668 	DBC *dbc;
669 	DB_PARTITION *part;
670 	u_int32_t flags, pgsize;
671 {
672 	BTREE *t;
673 	DB *dbp;
674 	DBT data, key, *keys, *kp, *okp;
675 	ENV *env;
676 	db_pgno_t last_pgno;
677 	u_int32_t cgetflags, i, j;
678 	size_t dsize;
679 	struct key_sort *ks;
680 	int have_keys, ret, t_ret;
681 	int (*compare) __P((DB *, const DBT *, const DBT *, size_t *));
682 
683 	memset(&data, 0, sizeof(data));
684 	memset(&key, 0, sizeof(key));
685 	ks = NULL;
686 
687 	dbp = dbc->dbp;
688 	env = dbp->env;
689 
690 	/* Need to just read the main database. */
691 	dbp->p_internal = NULL;
692 	have_keys = 0;
693 	dsize = 0;
694 
695 	keys = part->keys;
696 
697 	/* First verify that things what we expect. */
698 	if ((ret = __dbc_get(dbc, &key, &data, DB_FIRST)) != 0) {
699 		if (ret != DB_NOTFOUND)
700 			goto err;
701 		if (F_ISSET(part, PART_CALLBACK)) {
702 			ret = 0;
703 			goto done;
704 		}
705 		if (!LF_ISSET(DB_CREATE) && !F_ISSET(dbp, DB_AM_RECOVER) &&
706 		    !LF_ISSET(DB_RDWRMASTER)) {
707 			ret = USR_ERR(env, EINVAL);
708 			__db_errx(env, DB_STR("0659", "No range keys found."));
709 			goto err;
710 		}
711 	} else {
712 		if (F_ISSET(part, PART_CALLBACK)) {
713 			ret = USR_ERR(env, EINVAL);
714 			__db_errx(env, DB_STR("0660",
715 			    "Keys found and callback set."));
716 			goto err;
717 		}
718 		if (key.size != 0) {
719 			ret = USR_ERR(env, EINVAL);
720 			__db_errx(env, DB_STR("0661",
721 			    "Partition key 0 is not empty."));
722 			goto err;
723 		}
724 		have_keys = 1;
725 	}
726 
727 	if (LF_ISSET(DB_CREATE) && have_keys == 0) {
728 		/*
729 		 * Insert the keys into the master database.  We will also
730 		 * compute the total size of the keys for later use.
731 		 */
732 		for (i = 0; i < part->nparts - 1; i++) {
733 			if ((ret = __db_put(dbp, dbc->thread_info,
734 			    dbc->txn, &part->keys[i], &data, 0)) != 0)
735 				    goto err;
736 			dsize += part->keys[i].size;
737 		}
738 
739 		/*
740 		 * Insert the "0" pointer.  All records less than the first
741 		 * given key go into this partition.  We must use the default
742 		 * compare to insert this key, otherwise it might not be first.
743 		 */
744 		t = dbc->dbp->bt_internal;
745 		compare = t->bt_compare;
746 		t->bt_compare = __dbt_defcmp;
747 		memset(&key, 0, sizeof(key));
748 		ret = __db_put(dbp, dbc->thread_info, dbc->txn, &key, &data, 0);
749 		t->bt_compare = compare;
750 		if (ret != 0)
751 		    goto err;
752 	}
753 done:	if (F_ISSET(part, PART_RANGE)) {
754 		/*
755 		 * If we just did the insert, we have known the total size of
756 		 * the keys. Otherwise, the keys must have been in the database,
757 		 * and we can calculate the size by checking the last pgno of
758 		 * the corresponding mpoolfile.
759 		 *
760 		 * We make the size aligned at 1024 for performance.
761 		 */
762 		if (dsize == 0) {
763 			ret = __memp_get_last_pgno(dbp->mpf, &last_pgno);
764 			if (ret != 0)
765 				goto err;
766 			if (last_pgno > 1)
767 				last_pgno--;
768 			dsize = last_pgno * pgsize;
769 		}
770 		dsize = DB_ALIGN(dsize, 1024);
771 
772 		if ((ret = __os_malloc(env,
773 		    dsize + (sizeof(DBT) * part->nparts),
774 		    &part->data)) != 0) {
775 			__db_errx(env, ALLOC_ERR, (int)dsize);
776 			goto err;
777 		}
778 		memset(part->data, 0,
779 		    dsize + (sizeof(DBT) * part->nparts));
780 
781 		kp = okp = (DBT *)
782 		    ((u_int8_t *)part->data + dsize);
783 		memset(&key, 0, sizeof(key));
784 		memset(&data, 0, sizeof(data));
785 		data.flags = DB_DBT_USERMEM;
786 		j = 0;
787 		cgetflags = DB_FIRST;
788 		while ((ret = __dbc_get(dbc, &key, &data, cgetflags)) == 0) {
789 			 /* It is an error if we get more keys than expect. */
790 			if ((u_int32_t)(kp - okp) > part->nparts) {
791 				ret = USR_ERR(env, EINVAL);
792 				goto err;
793 			}
794 			kp->size = key.size;
795 			kp->data = (u_int8_t *)part->data + j;
796 			/* It is an error if the keys overflow the space. */
797 			if (j + kp->size > dsize) {
798 				ret = USR_ERR(env, EINVAL);
799 				goto err;
800 			}
801 			memcpy(kp->data, key.data, kp->size);
802 			j += kp->size;
803 			cgetflags = DB_NEXT;
804 			kp++;
805 		}
806 
807 		/*
808 		 * We should get part->nparts keys back, otherwise it means
809 		 * the passed-in keys are not valid.
810 		 */
811 		if (ret == DB_NOTFOUND && (u_int32_t)(kp - okp) == part->nparts)
812 			ret = 0;
813 
814 		if (ret == 0) {
815 			/*
816 			 * They passed in keys, they must match.
817 			 */
818 			compare = NULL;
819 			if (have_keys == 1 && keys != NULL) {
820 				t = dbc->dbp->bt_internal;
821 				compare = t->bt_compare;
822 				if ((ret = __os_malloc(env, (part->nparts - 1)
823 				     * sizeof(struct key_sort), &ks)) != 0)
824 					goto err;
825 				for (j = 0; j < part->nparts - 1; j++) {
826 					ks[j].dbp = dbc->dbp;
827 					ks[j].compare = compare;
828 					ks[j].key = &keys[j];
829 				}
830 
831 				qsort(ks, (size_t)part->nparts - 1,
832 				    sizeof(struct key_sort), __part_key_cmp);
833 			}
834 			part->keys = (DBT *)
835 			    ((u_int8_t *)part->data + dsize);
836 			F_SET(part, PART_KEYS_SETUP);
837 			j = 0;
838 			for (kp = part->keys;
839 			    kp < &part->keys[part->nparts]; kp++, j++) {
840 				if (have_keys == 1 && keys != NULL && j != 0 &&
841 				    compare(dbc->dbp, ks[j - 1].key,
842 				    kp, NULL) != 0) {
843 					if (kp->data == NULL &&
844 					    F_ISSET(dbp, DB_AM_RECOVER))
845 						goto err;
846 					ret = USR_ERR(env, EINVAL);
847 					__db_errx(env, DB_STR_A("0662",
848 					    "Partition key %d does not match",
849 					    "%d"), j);
850 					goto err;
851 				}
852 			}
853 		}
854 	}
855 	if (ret == DB_NOTFOUND && F_ISSET(dbp, DB_AM_RECOVER))
856 		ret = 0;
857 
858 err:	dbp->p_internal = part;
859 	if (ks != NULL)
860 		__os_free(env, ks);
861 
862 	/*
863 	 * We only free the original copy of the key array when
864 	 * the keys have been setup properly, otherwise we let
865 	 * the close function to free the memory.
866 	 */
867 	if (keys != NULL && F_ISSET(part, PART_KEYS_SETUP)) {
868 		for (i = 0; i < part->nparts - 1; i++)
869 			/*
870 			 * Always free all entries in the key array and return
871 			 * the first error code.
872 			 */
873 			if ((t_ret = __db_dbt_clone_free(env,
874 			    &keys[i])) != 0 && ret == 0)
875 				ret = t_ret;
876 		__os_free(env, keys);
877 	}
878 
879 	return (ret);
880 }
881 
882 /*
883  * __partition_get_callback --
884  *	Get the partition callback function.
885  * PUBLIC: int __partition_get_callback __P((DB *,
886  * PUBLIC:	 u_int32_t *, u_int32_t (**callback)(DB *, DBT *key)));
887  */
888 int
__partition_get_callback(dbp,parts,callback)889 __partition_get_callback(dbp, parts, callback)
890 	DB *dbp;
891 	u_int32_t *parts;
892 	u_int32_t (**callback)(DB *, DBT *key);
893 {
894 	DB_PARTITION *part;
895 
896 	part = dbp->p_internal;
897 	/* Only return populated results if partitioned using callbacks. */
898 	if (part != NULL && !F_ISSET(part, PART_CALLBACK))
899 		part = NULL;
900 	if (parts != NULL)
901 		*parts = (part != NULL ? part->nparts : 0);
902 	if (callback != NULL)
903 		*callback = (part != NULL ? part->callback : NULL);
904 
905 	return (0);
906 }
907 
908 /*
909  * __partition_get_keys --
910  *	Get partition keys.
911  * PUBLIC: int __partition_get_keys __P((DB *, u_int32_t *, DBT **));
912  */
913 int
__partition_get_keys(dbp,parts,keys)914 __partition_get_keys(dbp, parts, keys)
915 	DB *dbp;
916 	u_int32_t *parts;
917 	DBT **keys;
918 {
919 	DB_PARTITION *part;
920 
921 	part = dbp->p_internal;
922 	/* Only return populated results if partitioned using ranges. */
923 	if (part != NULL && !F_ISSET(part, PART_RANGE))
924 		part = NULL;
925 	if (parts != NULL)
926 		*parts = (part != NULL ? part->nparts : 0);
927 	if (keys != NULL)
928 		*keys = (part != NULL ? &part->keys[1] : NULL);
929 
930 	return (0);
931 }
932 
933 /*
934  * __partition_get_dirs --
935  *	Get partition dirs.
936  * PUBLIC: int __partition_get_dirs __P((DB *, const char ***));
937  */
938 int
__partition_get_dirs(dbp,dirpp)939 __partition_get_dirs(dbp, dirpp)
940 	DB *dbp;
941 	const char ***dirpp;
942 {
943 	DB_PARTITION *part;
944 	ENV *env;
945 	u_int32_t i;
946 	int ret;
947 
948 	env = dbp->env;
949 	if ((part = dbp->p_internal) == NULL) {
950 		*dirpp = NULL;
951 		return (0);
952 	}
953 	if (!F_ISSET(dbp, DB_AM_OPEN_CALLED)) {
954 		*dirpp = part->dirs;
955 		return (0);
956 	}
957 
958 	/*
959 	 * We build a list once when asked.  The original directory list,
960 	 * if any, was discarded at open time.
961 	 */
962 	if ((*dirpp = part->dirs) != NULL)
963 		return (0);
964 
965 	if ((ret = __os_calloc(env,
966 	    sizeof(char *), part->nparts + 1, (void *) &part->dirs)) != 0)
967 		return (ret);
968 
969 	for (i = 0; i < part->nparts; i++)
970 		part->dirs[i] = part->handles[i]->dirname;
971 
972 	*dirpp = part->dirs;
973 	return (0);
974 }
975 
976 /*
977  * __partc_init --
978  *	Initialize the access private portion of a cursor
979  *
980  * PUBLIC: int __partc_init __P((DBC *));
981  */
982 int
__partc_init(dbc)983 __partc_init(dbc)
984 	DBC *dbc;
985 {
986 	ENV *env;
987 	int ret;
988 
989 	env = dbc->env;
990 
991 	/* Allocate/initialize the internal structure. */
992 	if (dbc->internal == NULL && (ret =
993 	    __os_calloc(env, 1, sizeof(PART_CURSOR), &dbc->internal)) != 0)
994 		return (ret);
995 
996 	/* Initialize methods. */
997 	dbc->close = dbc->c_close = __dbc_close_pp;
998 	dbc->cmp = __dbc_cmp_pp;
999 	dbc->count = dbc->c_count = __dbc_count_pp;
1000 	dbc->del = dbc->c_del = __dbc_del_pp;
1001 	dbc->dup = dbc->c_dup = __dbc_dup_pp;
1002 	dbc->get = dbc->c_get = __partc_get_pp;
1003 	dbc->pget = dbc->c_pget = __dbc_pget_pp;
1004 	dbc->put = dbc->c_put = __dbc_put_pp;
1005 	dbc->am_bulk = NULL;
1006 	dbc->am_close = __partc_close;
1007 	dbc->am_del = __partc_del;
1008 	dbc->am_destroy = __partc_destroy;
1009 	dbc->am_get = NULL;
1010 	dbc->am_put = __partc_put;
1011 	dbc->am_writelock = __partc_writelock;
1012 
1013 	/* We avoid swapping partition cursors since we swap the sub cursors */
1014 	F_SET(dbc, DBC_PARTITIONED);
1015 
1016 	return (0);
1017 }
1018 /*
1019  * __partc_get_pp --
1020  *	cursor get opeartion on a partitioned database.
1021  */
1022 static int
__partc_get_pp(dbc,key,data,flags)1023 __partc_get_pp(dbc, key, data, flags)
1024 	DBC *dbc;
1025 	DBT *key, *data;
1026 	u_int32_t flags;
1027 {
1028 	DB *dbp;
1029 	DB_THREAD_INFO *ip;
1030 	ENV *env;
1031 	int ignore_lease, ret;
1032 
1033 	dbp = dbc->dbp;
1034 	env = dbp->env;
1035 
1036 	ignore_lease = LF_ISSET(DB_IGNORE_LEASE) ? 1 : 0;
1037 	LF_CLR(DB_IGNORE_LEASE);
1038 	if ((ret = __dbc_get_arg(dbc, key, data, flags)) != 0)
1039 		return (ret);
1040 
1041 	ENV_ENTER(env, ip);
1042 
1043 	DEBUG_LREAD(dbc, dbc->txn, "DBcursor->get",
1044 	    flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
1045 
1046 	ret = __partc_get(dbc, key, data, flags);
1047 	/*
1048 	 * Check for master leases.
1049 	 */
1050 	if (ret == 0 &&
1051 	    IS_REP_MASTER(env) && IS_USING_LEASES(env) && !ignore_lease)
1052 		ret = __rep_lease_check(env, 1);
1053 
1054 	ENV_LEAVE(env, ip);
1055 	__dbt_userfree(env, key, NULL, data);
1056 	return (ret);
1057 }
1058 /*
1059  * __partition_get --
1060  *	cursor get operation on a partitioned database.
1061  *
1062  * PUBLIC: int __partc_get __P((DBC*, DBT *, DBT *, u_int32_t));
1063  */
1064 int
__partc_get(dbc,key,data,flags)1065 __partc_get(dbc, key, data, flags)
1066 	DBC *dbc;
1067 	DBT *key, *data;
1068 	u_int32_t flags;
1069 {
1070 	DB *dbp;
1071 	DBC *orig_dbc, *new_dbc;
1072 	DB_PARTITION *part;
1073 	PART_CURSOR *cp;
1074 	u_int32_t multi, part_id;
1075 	int ret, retry, search;
1076 
1077 	dbp = dbc->dbp;
1078 	cp = (PART_CURSOR*)dbc->internal;
1079 	orig_dbc = cp->sub_cursor;
1080 	part = dbp->p_internal;
1081 
1082 	new_dbc = NULL;
1083 	retry = search = 0;
1084 	part_id = cp->part_id;
1085 	multi = flags & ~DB_OPFLAGS_MASK;
1086 
1087 	switch (flags & DB_OPFLAGS_MASK) {
1088 	case DB_CURRENT:
1089 		break;
1090 	case DB_FIRST:
1091 		part_id = 0;
1092 		retry = 1;
1093 		break;
1094 	case DB_GET_BOTH:
1095 	case DB_GET_BOTHC:
1096 	case DB_GET_BOTH_RANGE:
1097 		search = 1;
1098 		break;
1099 	case DB_SET_RANGE:
1100 		search = 1;
1101 		retry = 1;
1102 		break;
1103 	case DB_LAST:
1104 		part_id = part->nparts - 1;
1105 		retry = 1;
1106 		break;
1107 	case DB_NEXT:
1108 	case DB_NEXT_NODUP:
1109 		if (orig_dbc == NULL)
1110 			part_id = 0;
1111 		else
1112 			part_id = cp->part_id;
1113 		retry = 1;
1114 		break;
1115 	case DB_NEXT_DUP:
1116 		break;
1117 	case DB_PREV:
1118 	case DB_PREV_NODUP:
1119 		if (orig_dbc == NULL)
1120 			part_id = part->nparts - 1;
1121 		else
1122 			part_id = cp->part_id;
1123 		retry = 1;
1124 		break;
1125 	case DB_PREV_DUP:
1126 		break;
1127 	case DB_SET:
1128 		search = 1;
1129 		break;
1130 	default:
1131 		return (__db_unknown_flag(dbp->env, "__partc_get", flags));
1132 	}
1133 
1134 	/*
1135 	 * If we need to find the partition to start on, then
1136 	 * do a binary search of the in memory partition table.
1137 	 */
1138 	if (search == 1 && F_ISSET(part, PART_CALLBACK))
1139 		part_id = part->callback(dbp, key) % part->nparts;
1140 	else if (search == 1)
1141 		__part_search(dbp, part, key, &part_id);
1142 
1143 	/* Get a new cursor if necessary */
1144 	if (orig_dbc == NULL || cp->part_id != part_id) {
1145 		GET_PART_CURSOR(dbc, new_dbc, part_id);
1146 	} else
1147 		new_dbc = orig_dbc;
1148 
1149 	while ((ret = __dbc_get(new_dbc,
1150 	    key, data, flags)) == DB_NOTFOUND && retry == 1) {
1151 		switch (flags & DB_OPFLAGS_MASK) {
1152 		case DB_FIRST:
1153 		case DB_NEXT:
1154 		case DB_NEXT_NODUP:
1155 		case DB_SET_RANGE:
1156 			if (++part_id < part->nparts) {
1157 				flags = DB_FIRST | multi;
1158 				break;
1159 			}
1160 			goto err;
1161 		case DB_LAST:
1162 		case DB_PREV:
1163 		case DB_PREV_NODUP:
1164 			if (part_id-- > 0) {
1165 				flags = DB_LAST | multi;
1166 				break;
1167 			}
1168 			goto err;
1169 		default:
1170 			goto err;
1171 		}
1172 
1173 		if (new_dbc != orig_dbc && (ret = __dbc_close(new_dbc)) != 0)
1174 			goto err;
1175 		GET_PART_CURSOR(dbc, new_dbc, part_id);
1176 	}
1177 
1178 	if (ret != 0)
1179 		goto err;
1180 
1181 	/* Success: swap original and new cursors. */
1182 	if (new_dbc != orig_dbc) {
1183 		if (orig_dbc != NULL) {
1184 			cp->sub_cursor = NULL;
1185 			if ((ret = __dbc_close(orig_dbc)) != 0)
1186 				goto err;
1187 		}
1188 		cp->sub_cursor = new_dbc;
1189 		cp->part_id = part_id;
1190 	}
1191 
1192 	return (0);
1193 
1194 err:	if (new_dbc != NULL && new_dbc != orig_dbc)
1195 		(void)__dbc_close(new_dbc);
1196 	return (ret);
1197 }
1198 
1199 /*
1200  * __partc_put --
1201  *	cursor put opeartion on a partitioned cursor.
1202  *
1203  */
1204 static int
__partc_put(dbc,key,data,flags,pgnop)1205 __partc_put(dbc, key, data, flags, pgnop)
1206 	DBC *dbc;
1207 	DBT *key, *data;
1208 	u_int32_t flags;
1209 	db_pgno_t *pgnop;
1210 {
1211 	DB *dbp;
1212 	DB_PARTITION *part;
1213 	DBC *new_dbc;
1214 	PART_CURSOR *cp;
1215 	u_int32_t part_id;
1216 	int ret;
1217 
1218 	dbp = dbc->dbp;
1219 	cp = (PART_CURSOR*)dbc->internal;
1220 	part_id = cp->part_id;
1221 	part = dbp->p_internal;
1222 	*pgnop = PGNO_INVALID;
1223 
1224 	switch (flags) {
1225 	case DB_KEYFIRST:
1226 	case DB_KEYLAST:
1227 	case DB_NODUPDATA:
1228 	case DB_NOOVERWRITE:
1229 	case DB_OVERWRITE_DUP:
1230 		if (F_ISSET(part, PART_CALLBACK)) {
1231 			part_id = part->callback(dbp, key) % part->nparts;
1232 			break;
1233 		}
1234 		__part_search(dbp, part, key, &part_id);
1235 		break;
1236 	default:
1237 		break;
1238 	}
1239 
1240 	if ((new_dbc = cp->sub_cursor) == NULL || cp->part_id != part_id) {
1241 		if ((ret = __db_cursor_int(part->handles[part_id],
1242 		    dbc->thread_info, dbc->txn, part->handles[part_id]->type,
1243 		    PGNO_INVALID, 0, dbc->locker, &new_dbc)) != 0)
1244 			goto err;
1245 	}
1246 
1247 	if (F_ISSET(dbc, DBC_WRITER | DBC_WRITECURSOR))
1248 		F_SET(new_dbc, DBC_WRITER);
1249 	if ((ret = __dbc_put(new_dbc, key, data, flags)) != 0)
1250 		goto err;
1251 
1252 	if (new_dbc != cp->sub_cursor) {
1253 		if (cp->sub_cursor != NULL) {
1254 			if ((ret = __dbc_close(cp->sub_cursor)) != 0)
1255 				goto err;
1256 			cp->sub_cursor = NULL;
1257 		}
1258 		cp->sub_cursor = new_dbc;
1259 		cp->part_id = part_id;
1260 	}
1261 
1262 	return (0);
1263 
1264 err:	if (new_dbc != NULL && cp->sub_cursor != new_dbc)
1265 		(void)__dbc_close(new_dbc);
1266 	return (ret);
1267 }
1268 
1269 /*
1270  * __partc_del
1271  *	Delete interface to partitioned cursors.
1272  *
1273  */
1274 static int
__partc_del(dbc,flags)1275 __partc_del(dbc, flags)
1276 	DBC *dbc;
1277 	u_int32_t flags;
1278 {
1279 	PART_CURSOR *cp;
1280 	cp = (PART_CURSOR*)dbc->internal;
1281 
1282 	if (F_ISSET(dbc, DBC_WRITER | DBC_WRITECURSOR))
1283 		F_SET(cp->sub_cursor, DBC_WRITER);
1284 	return (__dbc_del(cp->sub_cursor, flags));
1285 }
1286 
1287 /*
1288  * __partc_writelock
1289  *	Writelock interface to partitioned cursors.
1290  *
1291  */
1292 static int
__partc_writelock(dbc)1293 __partc_writelock(dbc)
1294 	DBC *dbc;
1295 {
1296 	PART_CURSOR *cp;
1297 	cp = (PART_CURSOR*)dbc->internal;
1298 
1299 	return (cp->sub_cursor->am_writelock(cp->sub_cursor));
1300 }
1301 
1302 /*
1303  * __partc_close
1304  *	Close interface to partitioned cursors.
1305  *
1306  */
1307 static int
__partc_close(dbc,root_pgno,rmroot)1308 __partc_close(dbc, root_pgno, rmroot)
1309 	DBC *dbc;
1310 	db_pgno_t root_pgno;
1311 	int *rmroot;
1312 {
1313 	PART_CURSOR *cp;
1314 	int ret;
1315 
1316 	COMPQUIET(root_pgno, 0);
1317 	COMPQUIET(rmroot, NULL);
1318 
1319 	cp = (PART_CURSOR*)dbc->internal;
1320 
1321 	if (cp->sub_cursor == NULL)
1322 		return (0);
1323 	ret = __dbc_close(cp->sub_cursor);
1324 	cp->sub_cursor = NULL;
1325 	return (ret);
1326 }
1327 
1328 /*
1329  * __partc_destroy --
1330  *	Destroy a single cursor.
1331  */
1332 static int
__partc_destroy(dbc)1333 __partc_destroy(dbc)
1334 	DBC *dbc;
1335 {
1336 	PART_CURSOR *cp;
1337 	ENV *env;
1338 
1339 	cp = (PART_CURSOR *)dbc->internal;
1340 	env = dbc->env;
1341 
1342 	/* Discard the structure. Don't recurse. */
1343 	__os_free(env, cp);
1344 
1345 	return (0);
1346 }
1347 
1348 /*
1349  * __partition_close
1350  *	Close a partitioned database.
1351  *
1352  * PUBLIC: int __partition_close __P((DB *, DB_TXN *, u_int32_t));
1353  */
1354 int
__partition_close(dbp,txn,flags)1355 __partition_close(dbp, txn, flags)
1356 	DB *dbp;
1357 	DB_TXN *txn;
1358 	u_int32_t flags;
1359 {
1360 	DB **pdbp;
1361 	DB_PARTITION *part;
1362 	ENV *env;
1363 	u_int32_t i;
1364 	int ret, t_ret;
1365 
1366 	if ((part = dbp->p_internal) == NULL)
1367 		return (0);
1368 
1369 	env = dbp->env;
1370 	ret = 0;
1371 
1372 	if ((pdbp = part->handles) != NULL) {
1373 		for (i = 0; i < part->nparts; i++, pdbp++)
1374 			if (*pdbp != NULL && (t_ret =
1375 			    __db_close(*pdbp, txn, flags)) != 0 && ret == 0)
1376 				ret = t_ret;
1377 		__os_free(env, part->handles);
1378 	}
1379 	if (!F_ISSET(part, PART_KEYS_SETUP) && part->keys != NULL) {
1380 		for (i = 0; i < part->nparts - 1; i++) {
1381 			if (part->keys[i].data != NULL && (t_ret =
1382 			    __db_dbt_clone_free(env, &part->keys[i])) != 0 &&
1383 			    ret == 0)
1384 				ret = t_ret;
1385 		}
1386 		__os_free(env, part->keys);
1387 	}
1388 	if (part->dirs != NULL)
1389 		__os_free(env, (char **)part->dirs);
1390 	if (part->data != NULL)
1391 		__os_free(env, (char **)part->data);
1392 	__os_free(env, part);
1393 	dbp->p_internal = NULL;
1394 
1395 	return (ret);
1396 }
1397 
1398 /*
1399  * __partition_sync
1400  *	Sync a partitioned database.
1401  *
1402  * PUBLIC: int __partition_sync __P((DB *));
1403  */
1404 int
__partition_sync(dbp)1405 __partition_sync(dbp)
1406 	DB *dbp;
1407 {
1408 	DB **pdbp;
1409 	DB_PARTITION *part;
1410 	u_int32_t i;
1411 	int ret, t_ret;
1412 
1413 	ret = 0;
1414 	part = dbp->p_internal;
1415 
1416 	if ((pdbp = part->handles) != NULL) {
1417 		for (i = 0; i < part->nparts; i++, pdbp++)
1418 			if (*pdbp != NULL &&
1419 			    F_ISSET(*pdbp, DB_AM_OPEN_CALLED) && (t_ret =
1420 			    __memp_fsync((*pdbp)->mpf)) != 0 && ret == 0)
1421 				ret = t_ret;
1422 	}
1423 	if ((t_ret = __memp_fsync(dbp->mpf)) != 0 && ret == 0)
1424 		ret = t_ret;
1425 
1426 	return (ret);
1427 }
1428 
1429 /*
1430  * __partition_stat
1431  *	Stat a partitioned database.
1432  *
1433  * PUBLIC: int __partition_stat __P((DBC *, void *, u_int32_t));
1434  */
1435 int
__partition_stat(dbc,spp,flags)1436 __partition_stat(dbc, spp, flags)
1437 	DBC *dbc;
1438 	void *spp;
1439 	u_int32_t flags;
1440 {
1441 	DB *dbp, **pdbp;
1442 	DB_BTREE_STAT *fsp, *bsp;
1443 #ifdef HAVE_HASH
1444 	DB_HASH_STAT *hfsp, *hsp;
1445 #endif
1446 	DB_PARTITION *part;
1447 	DBC *new_dbc;
1448 	ENV *env;
1449 	u_int32_t i;
1450 	int ret;
1451 
1452 	dbp = dbc->dbp;
1453 	part = dbp->p_internal;
1454 	env = dbp->env;
1455 	fsp = NULL;
1456 #ifdef HAVE_HASH
1457 	hfsp = NULL;
1458 #endif
1459 
1460 	pdbp = part->handles;
1461 	for (i = 0; i < part->nparts; i++, pdbp++) {
1462 		if ((ret = __db_cursor_int(*pdbp, dbc->thread_info, dbc->txn,
1463 		    (*pdbp)->type, PGNO_INVALID,
1464 		    0, dbc->locker, &new_dbc)) != 0)
1465 			goto err;
1466 		switch (new_dbc->dbtype) {
1467 		case DB_BTREE:
1468 			if ((ret = __bam_stat(new_dbc, &bsp, flags)) != 0)
1469 				goto err;
1470 			if (fsp == NULL) {
1471 				fsp = bsp;
1472 				*(DB_BTREE_STAT **)spp = fsp;
1473 			} else {
1474 				fsp->bt_nkeys += bsp->bt_nkeys;
1475 				fsp->bt_ndata += bsp->bt_ndata;
1476 				fsp->bt_pagecnt += bsp->bt_pagecnt;
1477 				if (fsp->bt_levels < bsp->bt_levels)
1478 					fsp->bt_levels = bsp->bt_levels;
1479 				fsp->bt_int_pg += bsp->bt_int_pg;
1480 				fsp->bt_leaf_pg += bsp->bt_leaf_pg;
1481 				fsp->bt_dup_pg += bsp->bt_dup_pg;
1482 				fsp->bt_over_pg += bsp->bt_over_pg;
1483 				fsp->bt_free += bsp->bt_free;
1484 				fsp->bt_int_pgfree += bsp->bt_int_pgfree;
1485 				fsp->bt_leaf_pgfree += bsp->bt_leaf_pgfree;
1486 				fsp->bt_dup_pgfree += bsp->bt_dup_pgfree;
1487 				fsp->bt_over_pgfree += bsp->bt_over_pgfree;
1488 				__os_ufree(env, bsp);
1489 			}
1490 			break;
1491 #ifdef HAVE_HASH
1492 		case DB_HASH:
1493 			if ((ret = __ham_stat(new_dbc, &hsp, flags)) != 0)
1494 				goto err;
1495 			if (hfsp == NULL) {
1496 				hfsp = hsp;
1497 				*(DB_HASH_STAT **)spp = hfsp;
1498 			} else {
1499 				hfsp->hash_nkeys += hsp->hash_nkeys;
1500 				hfsp->hash_ndata += hsp->hash_ndata;
1501 				hfsp->hash_pagecnt += hsp->hash_pagecnt;
1502 				hfsp->hash_ffactor += hsp->hash_ffactor;
1503 				hfsp->hash_buckets += hsp->hash_buckets;
1504 				hfsp->hash_free += hsp->hash_free;
1505 				hfsp->hash_bfree += hsp->hash_bfree;
1506 				hfsp->hash_bigpages += hsp->hash_bigpages;
1507 				hfsp->hash_big_bfree += hsp->hash_big_bfree;
1508 				hfsp->hash_overflows += hsp->hash_overflows;
1509 				hfsp->hash_ovfl_free += hsp->hash_ovfl_free;
1510 				hfsp->hash_dup += hsp->hash_dup;
1511 				hfsp->hash_dup_free += hsp->hash_dup_free;
1512 				__os_ufree(env, hsp);
1513 			}
1514 			break;
1515 #endif
1516 		default:
1517 			break;
1518 		}
1519 		if ((ret = __dbc_close(new_dbc)) != 0)
1520 			goto err;
1521 	}
1522 	return (0);
1523 
1524 err:
1525 	if (fsp != NULL)
1526 		__os_ufree(env, fsp);
1527 	*(DB_BTREE_STAT **)spp = NULL;
1528 	return (ret);
1529 }
1530 
1531 /*
1532  * __part_truncate --
1533  *	Truncate a database.
1534  *
1535  * PUBLIC: int __part_truncate __P((DBC *, u_int32_t *));
1536  */
1537 int
__part_truncate(dbc,countp)1538 __part_truncate(dbc, countp)
1539 	DBC *dbc;
1540 	u_int32_t *countp;
1541 {
1542 	DB *dbp, **pdbp;
1543 	DB_PARTITION *part;
1544 	DBC *new_dbc;
1545 	u_int32_t count, i;
1546 	int ret, t_ret;
1547 
1548 	dbp = dbc->dbp;
1549 	part = dbp->p_internal;
1550 	pdbp = part->handles;
1551 	ret = 0;
1552 
1553 	if (countp != NULL)
1554 		*countp = 0;
1555 	for (i = 0; ret == 0 && i < part->nparts; i++, pdbp++) {
1556 		if ((ret = __db_cursor_int(*pdbp, dbc->thread_info, dbc->txn,
1557 		    (*pdbp)->type, PGNO_INVALID,
1558 		    0, dbc->locker, &new_dbc)) != 0)
1559 			break;
1560 		switch (dbp->type) {
1561 		case DB_BTREE:
1562 		case DB_RECNO:
1563 			ret = __bam_truncate(new_dbc, &count);
1564 			break;
1565 		case DB_HASH:
1566 #ifdef HAVE_HASH
1567 			ret = __ham_truncate(new_dbc, &count);
1568 			break;
1569 #endif
1570 		case DB_QUEUE:
1571 		case DB_UNKNOWN:
1572 		default:
1573 			ret = __db_unknown_type(dbp->env,
1574 			    "DB->truncate", dbp->type);
1575 			count = 0;
1576 			break;
1577 		}
1578 		if ((t_ret = __dbc_close(new_dbc)) != 0 && ret == 0)
1579 			ret = t_ret;
1580 		if (countp != NULL)
1581 			*countp += count;
1582 	}
1583 
1584 	return (ret);
1585 }
1586 /*
1587  * __part_compact -- compact a partitioned database.
1588  *
1589  * PUBLIC: int __part_compact __P((DB *, DB_THREAD_INFO *, DB_TXN *,
1590  * PUBLIC:     DBT *, DBT *, DB_COMPACT *, u_int32_t, DBT *));
1591  */
1592 int
__part_compact(dbp,ip,txn,start,stop,c_data,flags,end)1593 __part_compact(dbp, ip, txn, start, stop, c_data, flags, end)
1594 	DB *dbp;
1595 	DB_THREAD_INFO *ip;
1596 	DB_TXN *txn;
1597 	DBT *start, *stop;
1598 	DB_COMPACT *c_data;
1599 	u_int32_t flags;
1600 	DBT *end;
1601 {
1602 	DB **pdbp;
1603 	DB_PARTITION *part;
1604 	u_int32_t i;
1605 	int ret;
1606 
1607 	part = dbp->p_internal;
1608 	pdbp = part->handles;
1609 	ret = 0;
1610 
1611 	for (i = 0; ret == 0 && i < part->nparts; i++, pdbp++) {
1612 		switch (dbp->type) {
1613 		case DB_HASH:
1614 		case DB_BTREE:
1615 		case DB_RECNO:
1616 			ret = __db_compact_int(*pdbp,
1617 			     ip, txn, start, stop, c_data, flags, end);
1618 			break;
1619 
1620 		default:
1621 			ret = __dbh_am_chk(dbp, DB_OK_BTREE);
1622 			break;
1623 		}
1624 	}
1625 	return (ret);
1626 }
1627 
1628 /*
1629  * __part_lsn_reset --
1630  *	reset the lsns on each partition.
1631  *
1632  * PUBLIC: int __part_lsn_reset __P((DB *, DB_THREAD_INFO *));
1633  */
1634 int
__part_lsn_reset(dbp,ip)1635 __part_lsn_reset(dbp, ip)
1636 	DB *dbp;
1637 	DB_THREAD_INFO *ip;
1638 {
1639 	DB **pdbp;
1640 	DB_PARTITION *part;
1641 	u_int32_t i;
1642 	int ret;
1643 
1644 	part = dbp->p_internal;
1645 	pdbp = part->handles;
1646 	ret = 0;
1647 
1648 	for (i = 0; ret == 0 && i < part->nparts; i++, pdbp++)
1649 		ret = __db_lsn_reset((*pdbp)->mpf, ip);
1650 
1651 	return (ret);
1652 }
1653 
1654 /*
1655  * __part_fileid_reset --
1656  *	reset the fileid on each partition.
1657  *
1658  * PUBLIC: int __part_fileid_reset
1659  * PUBLIC:	 __P((ENV *, DB_THREAD_INFO *, const char *, u_int32_t, int));
1660  */
1661 int
__part_fileid_reset(env,ip,fname,nparts,encrypted)1662 __part_fileid_reset(env, ip, fname, nparts, encrypted)
1663 	ENV *env;
1664 	DB_THREAD_INFO *ip;
1665 	const char *fname;
1666 	u_int32_t nparts;
1667 	int encrypted;
1668 {
1669 	int ret;
1670 	u_int32_t part_id;
1671 	char *name, *sp;
1672 	const char *np;
1673 
1674 	if ((ret = __os_malloc(env,
1675 	     strlen(fname) + PART_LEN + 1, &name)) != 0) {
1676 		__db_errx(env, ALLOC_ERR,
1677 		    (int)(strlen(fname) + PART_LEN + 1));
1678 		return (ret);
1679 	}
1680 
1681 	sp = name;
1682 	np = __db_rpath(fname);
1683 	if (np == NULL)
1684 		np = fname;
1685 	else {
1686 		np++;
1687 		(void)strncpy(name, fname, (size_t)(np - fname));
1688 		sp = name + (np - fname);
1689 	}
1690 
1691 	for (part_id = 0; ret == 0 && part_id < nparts; part_id++) {
1692 		(void)sprintf(sp, PART_NAME, np, part_id);
1693 		ret = __env_fileid_reset(env, ip, sp, encrypted);
1694 	}
1695 
1696 	__os_free(env, name);
1697 	return (ret);
1698 }
1699 
1700 /*
1701  * __part_key_range --
1702  *	Return proportion of keys relative to given key.
1703  *
1704  * PUBLIC: int __part_key_range __P((DBC *, DBT *, DB_KEY_RANGE *, u_int32_t));
1705  */
1706 int
__part_key_range(dbc,dbt,kp,flags)1707 __part_key_range(dbc, dbt, kp, flags)
1708 	DBC *dbc;
1709 	DBT *dbt;
1710 	DB_KEY_RANGE *kp;
1711 	u_int32_t flags;
1712 {
1713 	BTREE_CURSOR *cp;
1714 	DBC *new_dbc;
1715 	DB_PARTITION *part;
1716 	PAGE *h;
1717 	u_int32_t id, part_id;
1718 	u_int32_t elems, empty, less_elems, my_elems, greater_elems;
1719 	u_int32_t levels, max_levels, my_levels;
1720 	db_pgno_t root_pgno;
1721 	int ret;
1722 	double total_elems;
1723 
1724 	COMPQUIET(flags, 0);
1725 
1726 	part = dbc->dbp->p_internal;
1727 
1728 	/*
1729 	 * First we find the key range for the partition that contains the
1730 	 * key.  Then we scale based on estimates of the other partitions.
1731 	 */
1732 	if (F_ISSET(part, PART_CALLBACK))
1733 		part_id = part->callback(dbc->dbp, dbt) % part->nparts;
1734 	else
1735 		__part_search(dbc->dbp, part, dbt, &part_id);
1736 	GET_PART_CURSOR(dbc, new_dbc, part_id);
1737 
1738 	if ((ret = __bam_key_range(new_dbc, dbt, kp, flags)) != 0)
1739 		goto err;
1740 
1741 	cp = (BTREE_CURSOR *)new_dbc->internal;
1742 
1743 	root_pgno = BAM_ROOT_PGNO(new_dbc);
1744 	if ((ret = __memp_fget(new_dbc->dbp->mpf, &root_pgno,
1745 	     new_dbc->thread_info, new_dbc->txn, 0, &h)) != 0)
1746 		goto c_err;
1747 
1748 	my_elems = NUM_ENT(h);
1749 	my_levels = LEVEL(h);
1750 	max_levels = my_levels;
1751 
1752 	if ((ret = __memp_fput(new_dbc->dbp->mpf,
1753 	     new_dbc->thread_info, h, new_dbc->priority)) != 0)
1754 		goto c_err;
1755 
1756 	if ((ret = __dbc_close(new_dbc)) != 0)
1757 		goto err;
1758 	/*
1759 	 * We have the range within one subtree.  Now estimate
1760 	 * what part of the whole range that subtree is.  Figure
1761 	 * out how many levels each part has and how many entries
1762 	 * in the level below the root.
1763 	 */
1764 	empty = less_elems = greater_elems = 0;
1765 	for (id = 0; id < part->nparts; id++) {
1766 		if (id == part_id) {
1767 			empty = 0;
1768 			continue;
1769 		}
1770 		GET_PART_CURSOR(dbc, new_dbc, id);
1771 		cp = (BTREE_CURSOR *)new_dbc->internal;
1772 		if ((ret = __memp_fget(new_dbc->dbp->mpf, &cp->root,
1773 		     new_dbc->thread_info, new_dbc->txn, 0, &h)) != 0)
1774 			goto c_err;
1775 
1776 		elems = NUM_ENT(h);
1777 		levels = LEVEL(h);
1778 		if (levels == 1)
1779 			elems /= 2;
1780 
1781 		if ((ret = __memp_fput(new_dbc->dbp->mpf,
1782 		     new_dbc->thread_info, h, new_dbc->priority)) != 0)
1783 			goto c_err;
1784 
1785 		if ((ret = __dbc_close(new_dbc)) != 0)
1786 			goto err;
1787 
1788 		/* If the tree is empty, ignore it. */
1789 		if (elems == 0) {
1790 			empty++;
1791 			continue;
1792 		}
1793 
1794 		/*
1795 		 * If a tree has fewer levels than the max just count
1796 		 * it as a single element in the higher level.
1797 		 */
1798 		if (id < part_id) {
1799 			if (levels > max_levels) {
1800 				max_levels = levels;
1801 				less_elems = id + elems - empty;
1802 			} else if (levels < max_levels)
1803 				less_elems++;
1804 			else
1805 				less_elems += elems;
1806 		} else {
1807 			if (levels > max_levels) {
1808 				max_levels = levels;
1809 				greater_elems = (id - part_id) + elems - empty;
1810 			} else if (levels < max_levels)
1811 				greater_elems++;
1812 			else
1813 				greater_elems += elems;
1814 		}
1815 
1816 	}
1817 
1818 	if (my_levels < max_levels) {
1819 		/*
1820 		 * The subtree containing the key is not the tallest one.
1821 		 * Reduce its share by the number of records at the highest
1822 		 * level.  Scale the greater and lesser components up
1823 		 * by  the number of records on either side of this
1824 		 * subtree.
1825 		 */
1826 		total_elems = 1 + greater_elems + less_elems;
1827 		kp->equal /= total_elems;
1828 		kp->less /= total_elems;
1829 		kp->less += less_elems/total_elems;
1830 		kp->greater /= total_elems;
1831 		kp->greater += greater_elems/total_elems;
1832 	} else if (my_levels == max_levels) {
1833 		/*
1834 		 * The key is in one of the tallest subtrees.  We will
1835 		 * scale the values by the ratio of the records at the
1836 		 * top of this stubtree to the number of records at the
1837 		 * highest level.
1838 		 */
1839 		total_elems = greater_elems + less_elems;
1840 		if (total_elems != 0) {
1841 			/*
1842 			 * First scale down by the fraction of elements
1843 			 * in this subtree.
1844 			 */
1845 			total_elems += my_elems;
1846 			kp->equal *= my_elems;
1847 			kp->equal /= total_elems;
1848 			kp->less *= my_elems;
1849 			kp->less /= total_elems;
1850 			kp->greater *= my_elems;
1851 			kp->greater /= total_elems;
1852 			/*
1853 			 * Proportionally add weight from the subtrees to the
1854 			 * left and right of this one.
1855 			 */
1856 			kp->less += less_elems / total_elems;
1857 			kp->greater += greater_elems / total_elems;
1858 		}
1859 	}
1860 
1861 	if (0) {
1862 c_err:		(void)__dbc_close(new_dbc);
1863 	}
1864 
1865 err:	return (ret);
1866 }
1867 
1868 /*
1869  * __part_remove --
1870  *	Remove method for a partitioned database.
1871  *
1872  * PUBLIC: int __part_remove __P((DB *, DB_THREAD_INFO *,
1873  * PUBLIC:      DB_TXN *, const char *, const char *, u_int32_t));
1874  */
1875 int
__part_remove(dbp,ip,txn,name,subdb,flags)1876 __part_remove(dbp, ip, txn, name, subdb, flags)
1877 	DB *dbp;
1878 	DB_THREAD_INFO *ip;
1879 	DB_TXN *txn;
1880 	const char *name, *subdb;
1881 	u_int32_t flags;
1882 {
1883 	return (__part_rr(dbp, ip, txn, name, subdb, NULL, flags));
1884 }
1885 
1886 /*
1887  * __part_rename --
1888  *	Rename method for a partitioned database.
1889  *
1890  * PUBLIC: int __part_rename __P((DB *, DB_THREAD_INFO *,
1891  * PUBLIC:         DB_TXN *, const char *, const char *, const char *));
1892  */
1893 int
__part_rename(dbp,ip,txn,name,subdb,newname)1894 __part_rename(dbp, ip, txn, name, subdb, newname)
1895 	DB *dbp;
1896 	DB_THREAD_INFO *ip;
1897 	DB_TXN *txn;
1898 	const char *name, *subdb, *newname;
1899 {
1900 	return (__part_rr(dbp, ip, txn, name, subdb, newname, 0));
1901 }
1902 
1903 /*
1904  * __part_rr --
1905  *	Remove/Rename method for a partitioned database.
1906  */
1907 static int
__part_rr(dbp,ip,txn,name,subdb,newname,flags)1908 __part_rr(dbp, ip, txn, name, subdb, newname, flags)
1909 	DB *dbp;
1910 	DB_THREAD_INFO *ip;
1911 	DB_TXN *txn;
1912 	const char *name, *subdb, *newname;
1913 	u_int32_t flags;
1914 {
1915 	DB **pdbp, *ptmpdbp, *tmpdbp;
1916 	DB_PARTITION *part;
1917 	ENV *env;
1918 	u_int32_t i;
1919 	int ret, t_ret;
1920 	char *np;
1921 
1922 	env = dbp->env;
1923 	ret = 0;
1924 
1925 	if (subdb != NULL && name != NULL) {
1926 		__db_errx(env, DB_STR("0663",
1927 	    "A partitioned database can not be in a multiple databases file"));
1928 		return (EINVAL);
1929 	}
1930 	ENV_GET_THREAD_INFO(env, ip);
1931 
1932 	/*
1933 	 * Since rename no longer opens the database, we have
1934 	 * to do it here.
1935 	 */
1936 	if ((ret = __db_create_internal(&tmpdbp, env, 0)) != 0)
1937 		return (ret);
1938 
1939 	/*
1940 	 * We need to make sure we don't self-deadlock, so give
1941 	 * this dbp the same locker as the incoming one.
1942 	 */
1943 	tmpdbp->locker = dbp->locker;
1944 	if ((ret = __db_open(tmpdbp, ip, txn, name, NULL, dbp->type,
1945 	    DB_RDWRMASTER | DB_RDONLY, 0, PGNO_BASE_MD)) != 0)
1946 		goto err;
1947 
1948 	part = tmpdbp->p_internal;
1949 	pdbp = part->handles;
1950 	COMPQUIET(np, NULL);
1951 	if (newname != NULL && (ret = __os_malloc(env,
1952 	     strlen(newname) + PART_LEN + 1, &np)) != 0) {
1953 		__db_errx(env, ALLOC_ERR,
1954 		    (int)(strlen(newname) + PART_LEN + 1));
1955 		goto err;
1956 	}
1957 	for (i = 0; i < part->nparts; i++, pdbp++) {
1958 		if ((ret = __db_create_internal(&ptmpdbp, env, 0)) != 0)
1959 			break;
1960 		ptmpdbp->locker = (*pdbp)->locker;
1961 		if (newname == NULL)
1962 			ret = __db_remove_int(ptmpdbp,
1963 			     ip, txn, (*pdbp)->fname, NULL, flags);
1964 		else {
1965 			DB_ASSERT(env, np != NULL);
1966 			(void)sprintf(np, PART_NAME, newname, i);
1967 			ret = __db_rename_int(ptmpdbp,
1968 			     ip, txn, (*pdbp)->fname, NULL, np, flags);
1969 		}
1970 		ptmpdbp->locker = NULL;
1971 		(void)__db_close(ptmpdbp, NULL, DB_NOSYNC);
1972 		if (ret != 0)
1973 			break;
1974 	}
1975 
1976 	if (newname != NULL)
1977 		__os_free(env, np);
1978 
1979 	if (!F_ISSET(dbp, DB_AM_OPEN_CALLED)) {
1980 err:
1981 		/* We need to remove the lock event we associated with this. */
1982 		if (txn != NULL)
1983 			__txn_remlock(env, txn, NULL, tmpdbp->locker);
1984 
1985 		/*
1986 		 * Since we copied the locker ID from the dbp, we'd better not
1987 		 * free it here.
1988 		 */
1989 		tmpdbp->locker = NULL;
1990 
1991 		if ((t_ret = __db_close(tmpdbp,
1992 		    txn, DB_NOSYNC)) != 0 && ret == 0)
1993 			ret = t_ret;
1994 	}
1995 	return (ret);
1996 }
1997 
1998 /*
1999  * __partc_dup --
2000  *	Duplicate a cursor on a partitioned database.
2001  *
2002  * PUBLIC: int __partc_dup __P((DBC *, DBC *));
2003  */
2004 int
__partc_dup(dbc_orig,dbc_n)2005 __partc_dup(dbc_orig, dbc_n)
2006 	DBC *dbc_orig;
2007 	DBC *dbc_n;
2008 {
2009 	PART_CURSOR *orig, *new;
2010 
2011 	orig = (PART_CURSOR *)dbc_orig->internal;
2012 	new = (PART_CURSOR *)dbc_n->internal;
2013 
2014 	/*
2015 	 * A cursor on a partitioned database contains the identifier
2016 	 * of the underlying database and a regular cursor that points
2017 	 * to the underlying database.  Copy both pieces.
2018 	 */
2019 	new->part_id = orig->part_id;
2020 
2021 	return (__dbc_dup(orig->sub_cursor, &new->sub_cursor, DB_POSITION));
2022 }
2023 #ifdef HAVE_VERIFY
2024 /*
2025  * __part_verify --
2026  *	Verify a partitioned database.
2027  *
2028  * PUBLIC: int __part_verify __P((DB *, VRFY_DBINFO *, const char *,
2029  * PUBLIC:     void *, int (*)(void *, const void *), u_int32_t));
2030  */
2031 int
__part_verify(dbp,vdp,fname,handle,callback,flags)2032 __part_verify(dbp, vdp, fname, handle, callback, flags)
2033 	DB *dbp;
2034 	VRFY_DBINFO *vdp;
2035 	const char *fname;
2036 	void *handle;
2037 	int (*callback) __P((void *, const void *));
2038 	u_int32_t flags;
2039 {
2040 	BINTERNAL *lp, *rp;
2041 	DB **pdbp;
2042 	DB_PARTITION *part;
2043 	DBC *dbc;
2044 	DBT *key;
2045 	ENV *env;
2046 	DB_THREAD_INFO *ip;
2047 	u_int32_t i;
2048 	int ret, t_ret;
2049 
2050 	env = dbp->env;
2051 	lp = rp = NULL;
2052 	dbc = NULL;
2053 	ip = vdp->thread_info;
2054 
2055 	if (dbp->type == DB_BTREE) {
2056 		if ((ret = __bam_open(dbp, ip,
2057 		    NULL, fname, PGNO_BASE_MD, flags)) != 0)
2058 			goto err;
2059 	}
2060 #ifdef HAVE_HASH
2061 	else if (dbp->type == DB_HASH) {
2062 		if ((ret = __ham_open(dbp, ip,
2063 	    	    NULL, fname, PGNO_BASE_MD, flags)) != 0)
2064 			goto err;
2065 	}
2066 #endif
2067 	/*
2068 	 * Only the BTree and Hash access methods are supported for
2069 	 * partitioned databases.
2070 	 */
2071 	else {
2072 		__db_errx(env, DB_STR_A("5540",
2073 			"%s: Invalid database type for a partitioned database."
2074 			, "%s"), fname);
2075 		return (DB_VERIFY_BAD);
2076 	}
2077 
2078 	/*
2079 	 * Initalize partition db handles and get the names. Set DB_RDWRMASTER
2080 	 * because we may not have the partition callback, but we can still
2081 	 * look at the structure of the tree.
2082 	 */
2083 	if ((ret = __partition_open(dbp,
2084 	    ip, NULL, fname, dbp->type, flags | DB_RDWRMASTER, 0, 0)) != 0)
2085 		goto err;
2086 	part = dbp->p_internal;
2087 
2088 	if (LF_ISSET(DB_SALVAGE)) {
2089 		/* If we are being aggressive we don't want to dump the keys. */
2090 		if (LF_ISSET(DB_AGGRESSIVE))
2091 			dbp->p_internal = NULL;
2092 		ret = __db_prheader(dbp,
2093 		    NULL, 0, 0, handle, callback, vdp, PGNO_BASE_MD);
2094 		dbp->p_internal = part;
2095 		if (ret != 0)
2096 			goto err;
2097 	}
2098 
2099 	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
2100 		goto err;
2101 
2102 	pdbp = part->handles;
2103 	for (i = 0; i < part->nparts; i++, pdbp++) {
2104 		if (!F_ISSET(part, PART_RANGE) || part->keys == NULL)
2105 			goto vrfy;
2106 		if (lp != NULL)
2107 			__os_free(env, lp);
2108 		lp = rp;
2109 		rp = NULL;
2110 		if (i + 1 <  part->nparts) {
2111 			key = &part->keys[i + 1];
2112 			if ((ret = __os_malloc(env,
2113 			    BINTERNAL_SIZE(key->size), &rp)) != 0)
2114 				goto err;
2115 			rp->len = key->size;
2116 			memcpy(rp->data, key->data, key->size);
2117 			B_TSET(rp->type, B_KEYDATA);
2118 		}
2119 vrfy:   if ((t_ret = __db_verify(*pdbp, ip, (*pdbp)->fname,
2120 	      NULL, handle, callback,
2121 	      lp, rp, flags | DB_VERIFY_PARTITION)) != 0 && ret == 0) {
2122 	        ret = t_ret;
2123             if (ret == ENOENT)
2124                 break;
2125 	    }
2126 	}
2127 
2128 err:	if (lp != NULL)
2129 		__os_free(env, lp);
2130 	if (rp != NULL)
2131 		__os_free(env, rp);
2132 	return (ret);
2133 }
2134 #endif
2135 
2136 #ifdef CONFIG_TEST
2137 /*
2138  * __part_testdocopy -- copy all partitions for testing purposes.
2139  *
2140  * PUBLIC: int __part_testdocopy __P((DB *, const char *));
2141  */
2142 int
__part_testdocopy(dbp,name)2143 __part_testdocopy(dbp, name)
2144 	DB *dbp;
2145 	const char *name;
2146 {
2147 	DB **pdbp;
2148 	DB_PARTITION *part;
2149 	u_int32_t i;
2150 	int ret;
2151 
2152 	if ((ret = __db_testdocopy(dbp->env, name)) != 0)
2153 		return (ret);
2154 
2155 	part = dbp->p_internal;
2156 	pdbp = part->handles;
2157 	for (i = 0; i < part->nparts; i++, pdbp++)
2158 		if ((ret = __db_testdocopy(dbp->env, (*pdbp)->fname)) != 0)
2159 			return (ret);
2160 
2161 	return (0);
2162 }
2163 #endif
2164 #else
2165 /*
2166  * __db_nopartition --
2167  *	Error when a Berkeley DB build doesn't include partitioning.
2168  *
2169  * PUBLIC: int __db_no_partition __P((ENV *));
2170  */
2171 int
__db_no_partition(env)2172 __db_no_partition(env)
2173 	ENV *env;
2174 {
2175 	__db_errx(env, DB_STR("0664",
2176     "library build did not include support for the database partitioning"));
2177 	return (DB_OPNOTSUP);
2178 }
2179 /*
2180  * __partition_set --
2181  *	Set the partitioning keys or callback function.
2182  * This routine must be called prior to creating the database.
2183  * PUBLIC: int __partition_set __P((DB *, u_int32_t, DBT *,
2184  * PUBLIC:	u_int32_t (*callback)(DB *, DBT *key)));
2185  */
2186 
2187 int
__partition_set(dbp,parts,keys,callback)2188 __partition_set(dbp, parts, keys, callback)
2189 	DB *dbp;
2190 	u_int32_t parts;
2191 	DBT *keys;
2192 	u_int32_t (*callback)(DB *, DBT *key);
2193 {
2194 	COMPQUIET(parts, 0);
2195 	COMPQUIET(keys, NULL);
2196 	COMPQUIET(callback, NULL);
2197 
2198 	return (__db_no_partition(dbp->env));
2199 }
2200 
2201 /*
2202  * __partition_get_callback --
2203  *	Set the partition callback function.  This routine must be called
2204  * prior to opening a partition database that requires a function.
2205  * PUBLIC: int __partition_get_callback __P((DB *,
2206  * PUBLIC:	 u_int32_t *, u_int32_t (**callback)(DB *, DBT *key)));
2207  */
2208 int
__partition_get_callback(dbp,parts,callback)2209 __partition_get_callback(dbp, parts, callback)
2210 	DB *dbp;
2211 	u_int32_t *parts;
2212 	u_int32_t (**callback)(DB *, DBT *key);
2213 {
2214 	COMPQUIET(parts, NULL);
2215 	COMPQUIET(callback, NULL);
2216 
2217 	return (__db_no_partition(dbp->env));
2218 }
2219 
2220 /*
2221  * __partition_get_dirs --
2222  *	Get partition dirs.
2223  * PUBLIC: int __partition_get_dirs __P((DB *, const char ***));
2224  */
2225 int
__partition_get_dirs(dbp,dirpp)2226 __partition_get_dirs(dbp, dirpp)
2227 	DB *dbp;
2228 	const char ***dirpp;
2229 {
2230 	COMPQUIET(dirpp, NULL);
2231 	return (__db_no_partition(dbp->env));
2232 }
2233 
2234 /*
2235  * __partition_get_keys --
2236  *	Get partition keys.
2237  * PUBLIC: int __partition_get_keys __P((DB *, u_int32_t *, DBT **));
2238  */
2239 int
__partition_get_keys(dbp,parts,keys)2240 __partition_get_keys(dbp, parts, keys)
2241 	DB *dbp;
2242 	u_int32_t *parts;
2243 	DBT **keys;
2244 {
2245 	COMPQUIET(parts, NULL);
2246 	COMPQUIET(keys, NULL);
2247 
2248 	return (__db_no_partition(dbp->env));
2249 }
2250 /*
2251  * __partition_init --
2252  *	Initialize the partition structure.
2253  * Called when the meta data page is read in during database open or
2254  * when partition keys or a callback are set.
2255  *
2256  * PUBLIC: int __partition_init __P((DB *, u_int32_t));
2257  */
2258 int
__partition_init(dbp,flags)2259 __partition_init(dbp, flags)
2260 	DB *dbp;
2261 	u_int32_t flags;
2262 {
2263 	COMPQUIET(flags, 0);
2264 
2265 	return (__db_no_partition(dbp->env));
2266 }
2267 /*
2268  * __part_fileid_reset --
2269  *	reset the fileid on each partition.
2270  *
2271  * PUBLIC: int __part_fileid_reset
2272  * PUBLIC:	 __P((ENV *, DB_THREAD_INFO *, const char *, u_int32_t, int));
2273  */
2274 int
__part_fileid_reset(env,ip,fname,nparts,encrypted)2275 __part_fileid_reset(env, ip, fname, nparts, encrypted)
2276 	ENV *env;
2277 	DB_THREAD_INFO *ip;
2278 	const char *fname;
2279 	u_int32_t nparts;
2280 	int encrypted;
2281 {
2282 	COMPQUIET(ip, NULL);
2283 	COMPQUIET(fname, NULL);
2284 	COMPQUIET(nparts, 0);
2285 	COMPQUIET(encrypted, 0);
2286 
2287 	return (__db_no_partition(env));
2288 }
2289 /*
2290  * __partition_set_dirs --
2291  *	Set the directories for creating the partition databases.
2292  * They must be in the environment.
2293  * PUBLIC: int __partition_set_dirs __P((DB *, const char **));
2294  */
2295 int
__partition_set_dirs(dbp,dirp)2296 __partition_set_dirs(dbp, dirp)
2297 	DB *dbp;
2298 	const char **dirp;
2299 {
2300 	COMPQUIET(dirp, NULL);
2301 
2302 	return (__db_no_partition(dbp->env));
2303 }
2304 #endif
2305