1 /*-
2 * Copyright (c) 1996, 2020 Oracle and/or its affiliates. All rights reserved.
3 *
4 * See the file LICENSE for license information.
5 *
6 * $Id$
7 */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/db_verify.h"
14 #include "dbinc/btree.h"
15 #ifdef HAVE_HASH
16 #include "dbinc/hash.h"
17 #endif
18 #include "dbinc/lock.h"
19 #include "dbinc/mp.h"
20 #include "dbinc/partition.h"
21 #include "dbinc/txn.h"
22 #ifdef HAVE_PARTITION
23
/*
 * Prototypes for the file-local (static) routines used by the partition
 * access method.
 */
static int __part_rr __P((DB *, DB_THREAD_INFO *, DB_TXN *,
    const char *, const char *, const char *, u_int32_t));
static int __partc_close __P((DBC *, db_pgno_t, int *));
static int __partc_del __P((DBC*, u_int32_t));
static int __partc_destroy __P((DBC*));
static int __partc_get_pp __P((DBC*, DBT *, DBT *, u_int32_t));
static int __partc_put __P((DBC*, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int __partc_writelock __P((DBC*));
static int __partition_chk_meta __P((DB *,
    DB_THREAD_INFO *, DB_TXN *, u_int32_t));
static int __partition_setup_keys __P((DBC *,
    DB_PARTITION *, u_int32_t, u_int32_t));
static int __part_key_cmp __P((const void *, const void *));
static inline void __part_search __P((DB *,
    DB_PARTITION *, DBT *, u_int32_t *));
39
/* Message format reported when one of the partition allocations fails. */
#define	ALLOC_ERR	DB_STR_A("0764","Partition failed to allocate %d bytes","%d")

/*
 * Allocate a partition cursor and copy flags to the partition cursor.
 * Not passed:
 *	DBC_PARTITIONED -- the subcursors are not.
 *	DBC_OWN_LID -- the arg dbc owns the lock id.
 *	DBC_WRITECURSOR DBC_WRITER -- CDS locking happens on
 *				the whole DB, not the partition.
 *
 * The macro is not self-contained.  The expansion site must provide:
 *	part -- the DB_PARTITION whose handles[] array is indexed by part_id,
 *	ret  -- a variable that receives the __db_cursor_int return value,
 *	err  -- a label that is jumped to when the cursor cannot be created.
 */
#define	GET_PART_CURSOR(dbc, new_dbc, part_id) do {			\
	DB *__part_dbp;							\
	__part_dbp = part->handles[part_id];				\
	if ((ret = __db_cursor_int(__part_dbp,				\
	     (dbc)->thread_info, (dbc)->txn, __part_dbp->type,		\
	     PGNO_INVALID, 0, (dbc)->locker, &new_dbc)) != 0)		\
		goto err;						\
	(new_dbc)->flags = (dbc)->flags &				\
	    ~(DBC_PARTITIONED|DBC_OWN_LID|DBC_WRITECURSOR|DBC_WRITER);	\
} while (0)
60
61 /*
62 * Search for the correct partition.
63 */
__part_search(dbp,part,key,part_idp)64 static inline void __part_search(dbp, part, key, part_idp)
65 DB *dbp;
66 DB_PARTITION *part;
67 DBT *key;
68 u_int32_t *part_idp;
69 {
70 db_indx_t base, indx, limit;
71 int cmp;
72 int (*func) __P((DB *, const DBT *, const DBT *, size_t *));
73 size_t pos, pos_h, pos_l;
74
75 DB_ASSERT(dbp->env, part->nparts != 0);
76 COMPQUIET(cmp, 0);
77 COMPQUIET(indx, 0);
78
79 pos_h = 0;
80 pos_l = 0;
81 func = ((BTREE *)dbp->bt_internal)->bt_compare;
82 DB_BINARY_SEARCH_FOR(base, limit, part->nparts, O_INDX) {
83 pos = pos_l > pos_h ? pos_h : pos_l;
84 DB_BINARY_SEARCH_INCR(indx, base, limit, O_INDX);
85 cmp = func(dbp, key, &part->keys[indx], &pos);
86 if (cmp == 0)
87 break;
88 if (cmp > 0) {
89 DB_BINARY_SEARCH_SHIFT_BASE(indx, base, limit, O_INDX);
90 pos_l = pos;
91 } else
92 pos_h = pos;
93 }
94 if (cmp == 0)
95 *part_idp = indx;
96 else if ((*part_idp = base) != 0)
97 (*part_idp)--;
98 }
99
100 /*
101 * __partition_init --
102 * Initialize the partition structure.
103 * Called when the meta data page is read in during database open or
104 * when partition keys or a callback are set.
105 *
106 * PUBLIC: int __partition_init __P((DB *, u_int32_t));
107 */
108 int
__partition_init(dbp,flags)109 __partition_init(dbp, flags)
110 DB *dbp;
111 u_int32_t flags;
112 {
113 DB_PARTITION *part;
114 int ret;
115
116 if ((part = dbp->p_internal) != NULL) {
117 if ((LF_ISSET(DBMETA_PART_RANGE) &&
118 F_ISSET(part, PART_CALLBACK)) ||
119 (LF_ISSET(DBMETA_PART_CALLBACK) &&
120 F_ISSET(part, PART_RANGE))) {
121 __db_errx(dbp->env, DB_STR("0645",
122 "Cannot specify callback and range keys."));
123 return (EINVAL);
124 }
125 } else if ((ret = __os_calloc(dbp->env, 1, sizeof(*part), &part)) != 0)
126 return (ret);
127
128 if (LF_ISSET(DBMETA_PART_RANGE))
129 F_SET(part, PART_RANGE);
130 if (LF_ISSET(DBMETA_PART_CALLBACK))
131 F_SET(part, PART_CALLBACK);
132 dbp->p_internal = part;
133 /* Set up AM-specific methods that do not require an open. */
134 dbp->db_am_rename = __part_rename;
135 dbp->db_am_remove = __part_remove;
136 return (0);
137 }
138 /*
139 * __partition_set --
140 * Set the partitioning keys or callback function.
141 * This routine must be called prior to creating the database.
142 * PUBLIC: int __partition_set __P((DB *, u_int32_t, DBT *,
143 * PUBLIC: u_int32_t (*callback)(DB *, DBT *key)));
144 */
145
146 int
__partition_set(dbp,parts,keys,callback)147 __partition_set(dbp, parts, keys, callback)
148 DB *dbp;
149 u_int32_t parts;
150 DBT *keys;
151 u_int32_t (*callback)(DB *, DBT *key);
152 {
153 DB_PARTITION *part;
154 ENV *env;
155 u_int32_t i;
156 int ret, t_ret;
157
158 DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_partition");
159 env = dbp->dbenv->env;
160
161 if (parts < 2) {
162 __db_errx(env, DB_STR("0646",
163 "Must specify at least 2 partitions."));
164 return (EINVAL);
165 } else if (parts > PART_MAXIMUM) {
166 __db_errx(env, DB_STR_A("0772",
167 "Must not specify more than %u partitions.", "%u"),
168 (unsigned int)PART_MAXIMUM);
169 return (EINVAL);
170 }
171
172 if (keys == NULL && callback == NULL) {
173 __db_errx(env, DB_STR("0647",
174 "Must specify either keys or a callback."));
175 return (EINVAL);
176 }
177 if (keys != NULL && callback != NULL) {
178 bad: __db_errx(env, DB_STR("0648",
179 "May not specify both keys and a callback."));
180 return (EINVAL);
181 }
182
183 if ((ret = __partition_init(dbp,
184 keys != NULL ?
185 DBMETA_PART_RANGE : DBMETA_PART_CALLBACK)) != 0)
186 return (ret);
187 part = dbp->p_internal;
188
189 if ((part->keys != NULL && callback != NULL) ||
190 (part->callback != NULL && keys != NULL))
191 goto bad;
192
193 /*
194 * Free a key array that was allocated by an earlier set_partition call.
195 */
196 if (part->keys != NULL) {
197 for (i = 0; i < part->nparts - 1; i++) {
198 /*
199 * Always free all entries in the key array and return
200 * the first error code.
201 */
202 if ((t_ret = __db_dbt_clone_free(dbp->env,
203 &part->keys[i])) != 0 && ret == 0)
204 ret = t_ret;
205 }
206 __os_free(dbp->env, part->keys);
207 part->keys = NULL;
208 }
209
210 if (ret != 0)
211 return (ret);
212
213 part->nparts = parts;
214 part->callback = callback;
215
216 /*
217 * Take a copy of the users key array otherwise we cannot be sure
218 * that the memory will still be valid when the database is opened.
219 */
220 if (keys != NULL) {
221 if ((ret = __os_calloc(dbp->env,
222 part->nparts - 1, sizeof(DBT), &part->keys)) != 0)
223 goto err;
224
225 for (i = 0, parts = 0; i < part->nparts - 1; i++, parts++)
226 if ((ret = __db_dbt_clone(dbp->env,
227 &part->keys[i], &keys[i])) != 0)
228 goto err;
229 }
230
231 err: if (ret != 0 && part->keys != NULL) {
232 /*
233 * Always free those entries cloned successfully in the key
234 * array and the one which fails in __db_dbt_clone, and
235 * return the first error code. As ret != 0 here, so it is
236 * safe to ignore any error from __db_dbt_clone_free.
237 */
238 for (i = 0; i < parts; i++)
239 (void)__db_dbt_clone_free(dbp->env, &part->keys[i]);
240 if (parts < part->nparts - 1 && part->keys[parts].data != NULL)
241 __os_free(dbp->env, part->keys[parts].data);
242 __os_free(dbp->env, part->keys);
243 part->keys = NULL;
244 }
245 return (ret);
246 }
247
248 /*
249 * __partition_set_dirs --
250 * Set the directories for creating the partition databases.
251 * They must be in the environment.
252 * PUBLIC: int __partition_set_dirs __P((DB *, const char **));
253 */
254 int
__partition_set_dirs(dbp,dirp)255 __partition_set_dirs(dbp, dirp)
256 DB *dbp;
257 const char **dirp;
258 {
259 DB_ENV *dbenv;
260 DB_PARTITION *part;
261 ENV *env;
262 u_int32_t ndirs, slen;
263 int i, ret;
264 const char **dir;
265 char *cp, **part_dirs, **pd;
266
267 DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_partition_dirs");
268 dbenv = dbp->dbenv;
269 env = dbp->env;
270
271 ndirs = 1;
272 slen = 0;
273 for (dir = dirp; *dir != NULL; dir++) {
274 if (F_ISSET(env, ENV_DBLOCAL))
275 slen += (u_int32_t)strlen(*dir) + 1;
276 ndirs++;
277 }
278
279 slen += sizeof(char *) * ndirs;
280 if ((ret = __os_malloc(env, slen, &part_dirs)) != 0)
281 return (EINVAL);
282 memset(part_dirs, 0, slen);
283
284 cp = (char *) part_dirs + (sizeof(char *) * ndirs);
285 pd = part_dirs;
286 for (dir = dirp; *dir != NULL; dir++, pd++) {
287 if (F_ISSET(env, ENV_DBLOCAL)) {
288 (void)strcpy(cp, *dir);
289 *pd = cp;
290 cp += strlen(*dir) + 1;
291 continue;
292 }
293 for (i = 0; i < dbenv->data_next; i++)
294 if (strcmp(*dir, dbenv->db_data_dir[i]) == 0)
295 break;
296 if (i == dbenv->data_next) {
297 __db_errx(dbp->env, DB_STR_A("0649",
298 "Directory not in environment list %s",
299 "%s"), *dir);
300 __os_free(env, part_dirs);
301 return (EINVAL);
302 }
303 *pd = dbenv->db_data_dir[i];
304 }
305
306 if ((part = dbp->p_internal) == NULL) {
307 if ((ret = __partition_init(dbp, 0)) != 0)
308 return (ret);
309 part = dbp->p_internal;
310 }
311
312 part->dirs = (const char **)part_dirs;
313
314 return (0);
315 }
316
317 /*
318 * __partition_extent_names --
319 * Generate a list of partition extent file names.
320 * PUBLIC: int __partition_extent_names __P((DB *, const char *, char ***));
321 */
322 int
__partition_extent_names(dbp,fname,namelistp)323 __partition_extent_names(dbp, fname, namelistp)
324 DB *dbp;
325 const char *fname;
326 char ***namelistp;
327 {
328 DB_PARTITION *part;
329 ENV *env;
330 char *name, *sp, **cp, *freep;
331 const char *np;
332 u_int32_t part_id, namelen, len;
333 int ret;
334
335 env = dbp->env;
336 part = (DB_PARTITION*)dbp->p_internal;
337 *namelistp = NULL;
338
339 namelen = strlen(fname) + PART_LEN + 1;
340 len = part->nparts * (namelen + sizeof(char*)) + sizeof(char*);
341
342 if ((ret = __os_malloc(env, namelen, &name)) != 0)
343 goto err;
344 if ((ret = __os_malloc(env, len, namelistp)) != 0)
345 goto err;
346
347 sp = name;
348 np = __db_rpath(fname);
349 if (np == NULL)
350 np = fname;
351 else {
352 np++;
353 (void)strncpy(name, fname, (size_t)(np - fname));
354 sp = name + (np - fname);
355 }
356
357 cp = *namelistp;
358 freep = (char*)(cp + part->nparts + 1);
359 for (part_id = 0; part_id < part->nparts; part_id++) {
360 (void)sprintf(sp, PART_NAME, np, part_id);
361 *cp++ = freep;
362 (void)strcpy(freep, name);
363 freep += namelen;
364 }
365 *cp = NULL;
366
367 err: if (name != NULL)
368 __os_free(env, name);
369 return (ret);
370 }
371
/*
 * __partition_open --
 *	Open/create a partitioned database.
 *
 *	Checks the master database's meta data, allocates the array of
 *	per-partition handles and, unless the handle is marked
 *	DB_AM_RECOVER, creates one DB handle per partition.  With do_open
 *	set each partition is opened; otherwise only its file name is
 *	recorded in the handle.  On the error path the partition state is
 *	torn down with __partition_close.
 *
 * PUBLIC: int __partition_open __P((DB *, DB_THREAD_INFO *,
 * PUBLIC:	DB_TXN *, const char *, DBTYPE, u_int32_t, int, int));
 */
int
__partition_open(dbp, ip, txn, fname, type, flags, mode, do_open)
	DB *dbp;
	DB_THREAD_INFO *ip;
	DB_TXN *txn;
	const char *fname;
	DBTYPE type;
	u_int32_t flags;
	int mode, do_open;
{
	DB *part_db;
	DB_PARTITION *part;
	DBC *dbc;
	ENV *env;
	u_int32_t part_id;
	int ret;
	char *name, *sp;
	const char **dirp, *np;

	part = dbp->p_internal;
	env = dbp->dbenv->env;
	name = NULL;

	/* A meta data mismatch is only fatal when we are really opening. */
	if ((ret = __partition_chk_meta(dbp, ip, txn, flags)) != 0 && do_open)
		goto err;

	if (part->nparts > PART_MAXIMUM) {
		__db_errx(env, DB_STR_A("0789",
		    "The number of partitions %u exceeds the maximum %u.", "%u %u"),
		    part->nparts, (unsigned int)PART_MAXIMUM);
		ret = USR_ERR(env, EINVAL);
		goto err;
	}

	/* One (initially NULL) handle slot per partition. */
	if ((ret = __os_calloc(env,
	    part->nparts, sizeof(*part->handles), &part->handles)) != 0) {
		__db_errx(env, ALLOC_ERR,
		    (int)(part->nparts * sizeof(*part->handles)));
		goto err;
	}

	DB_ASSERT(env, fname != NULL);
	if ((ret = __os_malloc(env,
	    strlen(fname) + PART_LEN + 1, &name)) != 0) {
		__db_errx(env, ALLOC_ERR,
		    (int)(strlen(fname) + PART_LEN + 1));
		goto err;
	}

	/*
	 * Copy fname's directory prefix (if any) into the buffer once; sp is
	 * where each partition's file name is formatted below.
	 */
	sp = name;
	np = __db_rpath(fname);
	if (np == NULL)
		np = fname;
	else {
		np++;
		(void)strncpy(name, fname, (size_t)(np - fname));
		sp = name + (np - fname);
	}

	/* Handles marked DB_AM_RECOVER do not get partition handles here. */
	if (F_ISSET(dbp, DB_AM_RECOVER))
		goto done;
	dirp = part->dirs;
	for (part_id = 0; part_id < part->nparts; part_id++) {
		if ((ret = __db_create_internal(
		    &part->handles[part_id], dbp->env, 0)) != 0)
			goto err;

		/*
		 * The partition inherits the master's flags, except for the
		 * created/open-called state, and is marked as a partition.
		 */
		part_db = part->handles[part_id];
		part_db->flags = F_ISSET(dbp,
		    ~(DB_AM_CREATED | DB_AM_CREATED_MSTR | DB_AM_OPEN_CALLED));
		F_SET(part_db, DB_AM_PARTDB);
		__db_copy_config(dbp, part_db, part->nparts);

		/* These need to be copied for partitions, but not slices. */
		part_db->app_private = dbp->app_private;
		part_db->adj_fileid = dbp->adj_fileid;

		(void)sprintf(sp, PART_NAME, np, part_id);
		if (do_open) {
			/*
			 * Cycle through the directory names passed in,
			 * if any.  Hitting the NULL terminator wraps back
			 * to the first entry of the list.
			 */
			if (dirp != NULL &&
			    (part_db->dirname = *dirp++) == NULL) {
				part_db->dirname = *(dirp = part->dirs);
				dirp++;
			}
			if ((ret = __db_open(part_db, ip, txn,
			    name, NULL, type, flags, mode, PGNO_BASE_MD)) != 0)
				goto err;
		} else if ((ret = __os_strdup(env, name, &part_db->fname)) != 0)
			goto err;
	}

	/* Get rid of the cursor used to open the db; it is the wrong type. */
done:	while ((dbc = TAILQ_FIRST(&dbp->free_queue)) != NULL)
		if ((ret = __dbc_destroy(dbc)) != 0)
			break;

	/* Only reachable through "goto err"; the normal path skips it. */
	if (0) {
err:		(void)__partition_close(dbp, txn, 0);
	}
	if (name != NULL)
		__os_free(env, name);
	return (ret);
}
485
486 /*
487 * __partition_chk_meta --
488 * Check for a consistent meta data page and parameters when opening a
489 * partitioned database.
490 */
491 static int
__partition_chk_meta(dbp,ip,txn,flags)492 __partition_chk_meta(dbp, ip, txn, flags)
493 DB *dbp;
494 DB_THREAD_INFO *ip;
495 DB_TXN *txn;
496 u_int32_t flags;
497 {
498 DBMETA *meta;
499 DB_PARTITION *part;
500 DBC *dbc;
501 DB_LOCK metalock;
502 DB_MPOOLFILE *mpf;
503 ENV *env;
504 db_pgno_t base_pgno;
505 int ret, set_keys, t_ret;
506 u_int32_t pgsize;
507
508 dbc = NULL;
509 meta = NULL;
510 LOCK_INIT(metalock);
511 part = dbp->p_internal;
512 mpf = dbp->mpf;
513 env = dbp->env;
514 ret = 0;
515 set_keys = 0;
516
517 /*
518 * Just to fix the lint warning.
519 * The real value will be set later, and we will
520 * only use the value after being set properly.
521 */
522 pgsize = dbp->pgsize;
523
524 /* Get a cursor on the main db. */
525 dbp->p_internal = NULL;
526 if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
527 goto err;
528
529 /* Get the metadata page. */
530 base_pgno = PGNO_BASE_MD;
531 if ((ret =
532 __db_lget(dbc, 0, base_pgno, DB_LOCK_READ, 0, &metalock)) != 0)
533 goto err;
534 if ((ret = __memp_fget(mpf, &base_pgno, ip, dbc->txn, 0, &meta)) != 0)
535 goto err;
536
537 if (meta->magic != DB_HASHMAGIC &&
538 (meta->magic != DB_BTREEMAGIC || F_ISSET(meta, BTM_RECNO))) {
539 ret = USR_ERR(env, EINVAL);
540 __db_errx(env, DB_STR("0650",
541 "Partitioning may only specified on BTREE and HASH databases."));
542 goto err;
543 }
544 if (!FLD_ISSET(meta->metaflags,
545 DBMETA_PART_RANGE | DBMETA_PART_CALLBACK)) {
546 ret = USR_ERR(env, EINVAL);
547 __db_errx(env, DB_STR("0651",
548 "Partitioning specified on a non-partitioned database."));
549 goto err;
550 }
551
552 if ((F_ISSET(part, PART_RANGE) &&
553 FLD_ISSET(meta->metaflags, DBMETA_PART_CALLBACK)) ||
554 (F_ISSET(part, PART_CALLBACK) &&
555 FLD_ISSET(meta->metaflags, DBMETA_PART_RANGE))) {
556 ret = USR_ERR(env, EINVAL);
557 __db_errx(env, DB_STR("0652",
558 "Incompatible partitioning specified."));
559 goto err;
560 }
561
562 if (FLD_ISSET(meta->metaflags, DBMETA_PART_CALLBACK) &&
563 part->callback == NULL && !IS_RECOVERING(env) &&
564 !F_ISSET(dbp, DB_AM_RECOVER) && !LF_ISSET(DB_RDWRMASTER)) {
565 ret = USR_ERR(env, EINVAL);
566 __db_errx(env, DB_STR("0653",
567 "Partition callback not specified."));
568 goto err;
569 }
570
571 if (F_ISSET(dbp, DB_AM_RECNUM)) {
572 ret = USR_ERR(env, EINVAL);
573 __db_errx(env, DB_STR("0654",
574 "Record numbers are not supported in partitioned databases."));
575 goto err;
576 }
577
578 if (part->nparts == 0) {
579 if (meta->nparts == 0) {
580 ret = USR_ERR(env, EINVAL);
581 __db_errx(env, DB_STR("0655",
582 "Zero paritions specified."));
583 goto err;
584 } else
585 part->nparts = meta->nparts;
586 } else if (meta->nparts != 0 && part->nparts != meta->nparts) {
587 ret = USR_ERR(env, EINVAL);
588 __db_errx(env, DB_STR("0656",
589 "Number of partitions does not match."));
590 goto err;
591 }
592 /*
593 * There is no limit on the number of partitions, but I cannot imagine a real
594 * database having more than 10000.
595 */
596 if (meta->nparts > 10000) {
597 ret = USR_ERR(env, EINVAL);
598 __db_errx(env, DB_STR_A("5553",
599 "Too many partitions %lu", "%lu"), meta->nparts);
600 goto err;
601 }
602
603 if (meta->magic == DB_HASHMAGIC) {
604 if (!F_ISSET(part, PART_CALLBACK)) {
605 ret = USR_ERR(env, EINVAL);
606 __db_errx(env, DB_STR("0657",
607 "Hash database must specify a partition callback."));
608 }
609 } else if (meta->magic != DB_BTREEMAGIC) {
610 ret = USR_ERR(env, EINVAL);
611 __db_errx(env, DB_STR("0658",
612 "Partitioning only supported on BTREE and HASH."));
613 } else {
614 set_keys = 1;
615 pgsize = meta->pagesize;
616 }
617
618 err: /* Put the metadata page back. */
619 if (meta != NULL && (t_ret = __memp_fput(mpf,
620 ip, meta, dbc->priority)) != 0 && ret == 0)
621 ret = t_ret;
622 if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
623 ret = t_ret;
624
625 /*
626 * We can only call __partition_setup_keys after putting
627 * the meta page and releasing the meta lock, or self-deadlock
628 * will occur.
629 */
630 if (ret == 0 && set_keys && (t_ret =
631 __partition_setup_keys(dbc, part, pgsize, flags)) != 0)
632 ret = t_ret;
633
634 if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
635 ret = t_ret;
636
637 dbp->p_internal = part;
638 return (ret);
639 }
640
641 /*
642 * Support for sorting keys. Keys must be sorted using the btree
643 * compare function so if we call qsort in __partition_setup_keys
644 * we use this structure to pass the DBP and compare function.
645 */
646 struct key_sort {
647 DB *dbp;
648 DBT *key;
649 int (*compare) __P((DB *, const DBT *, const DBT *, size_t *));
650 };
651
__part_key_cmp(a,b)652 static int __part_key_cmp(a, b)
653 const void *a, *b;
654 {
655 const struct key_sort *ka, *kb;
656
657 ka = a;
658 kb = b;
659 return (ka->compare(ka->dbp, ka->key, kb->key, NULL));
660 }
661 /*
662 * __partition_setup_keys --
663 * Get the partition keys into memory, or put them to disk if we
664 * are creating a partitioned database.
665 */
666 static int
__partition_setup_keys(dbc,part,pgsize,flags)667 __partition_setup_keys(dbc, part, pgsize, flags)
668 DBC *dbc;
669 DB_PARTITION *part;
670 u_int32_t flags, pgsize;
671 {
672 BTREE *t;
673 DB *dbp;
674 DBT data, key, *keys, *kp, *okp;
675 ENV *env;
676 db_pgno_t last_pgno;
677 u_int32_t cgetflags, i, j;
678 size_t dsize;
679 struct key_sort *ks;
680 int have_keys, ret, t_ret;
681 int (*compare) __P((DB *, const DBT *, const DBT *, size_t *));
682
683 memset(&data, 0, sizeof(data));
684 memset(&key, 0, sizeof(key));
685 ks = NULL;
686
687 dbp = dbc->dbp;
688 env = dbp->env;
689
690 /* Need to just read the main database. */
691 dbp->p_internal = NULL;
692 have_keys = 0;
693 dsize = 0;
694
695 keys = part->keys;
696
697 /* First verify that things what we expect. */
698 if ((ret = __dbc_get(dbc, &key, &data, DB_FIRST)) != 0) {
699 if (ret != DB_NOTFOUND)
700 goto err;
701 if (F_ISSET(part, PART_CALLBACK)) {
702 ret = 0;
703 goto done;
704 }
705 if (!LF_ISSET(DB_CREATE) && !F_ISSET(dbp, DB_AM_RECOVER) &&
706 !LF_ISSET(DB_RDWRMASTER)) {
707 ret = USR_ERR(env, EINVAL);
708 __db_errx(env, DB_STR("0659", "No range keys found."));
709 goto err;
710 }
711 } else {
712 if (F_ISSET(part, PART_CALLBACK)) {
713 ret = USR_ERR(env, EINVAL);
714 __db_errx(env, DB_STR("0660",
715 "Keys found and callback set."));
716 goto err;
717 }
718 if (key.size != 0) {
719 ret = USR_ERR(env, EINVAL);
720 __db_errx(env, DB_STR("0661",
721 "Partition key 0 is not empty."));
722 goto err;
723 }
724 have_keys = 1;
725 }
726
727 if (LF_ISSET(DB_CREATE) && have_keys == 0) {
728 /*
729 * Insert the keys into the master database. We will also
730 * compute the total size of the keys for later use.
731 */
732 for (i = 0; i < part->nparts - 1; i++) {
733 if ((ret = __db_put(dbp, dbc->thread_info,
734 dbc->txn, &part->keys[i], &data, 0)) != 0)
735 goto err;
736 dsize += part->keys[i].size;
737 }
738
739 /*
740 * Insert the "0" pointer. All records less than the first
741 * given key go into this partition. We must use the default
742 * compare to insert this key, otherwise it might not be first.
743 */
744 t = dbc->dbp->bt_internal;
745 compare = t->bt_compare;
746 t->bt_compare = __dbt_defcmp;
747 memset(&key, 0, sizeof(key));
748 ret = __db_put(dbp, dbc->thread_info, dbc->txn, &key, &data, 0);
749 t->bt_compare = compare;
750 if (ret != 0)
751 goto err;
752 }
753 done: if (F_ISSET(part, PART_RANGE)) {
754 /*
755 * If we just did the insert, we have known the total size of
756 * the keys. Otherwise, the keys must have been in the database,
757 * and we can calculate the size by checking the last pgno of
758 * the corresponding mpoolfile.
759 *
760 * We make the size aligned at 1024 for performance.
761 */
762 if (dsize == 0) {
763 ret = __memp_get_last_pgno(dbp->mpf, &last_pgno);
764 if (ret != 0)
765 goto err;
766 if (last_pgno > 1)
767 last_pgno--;
768 dsize = last_pgno * pgsize;
769 }
770 dsize = DB_ALIGN(dsize, 1024);
771
772 if ((ret = __os_malloc(env,
773 dsize + (sizeof(DBT) * part->nparts),
774 &part->data)) != 0) {
775 __db_errx(env, ALLOC_ERR, (int)dsize);
776 goto err;
777 }
778 memset(part->data, 0,
779 dsize + (sizeof(DBT) * part->nparts));
780
781 kp = okp = (DBT *)
782 ((u_int8_t *)part->data + dsize);
783 memset(&key, 0, sizeof(key));
784 memset(&data, 0, sizeof(data));
785 data.flags = DB_DBT_USERMEM;
786 j = 0;
787 cgetflags = DB_FIRST;
788 while ((ret = __dbc_get(dbc, &key, &data, cgetflags)) == 0) {
789 /* It is an error if we get more keys than expect. */
790 if ((u_int32_t)(kp - okp) > part->nparts) {
791 ret = USR_ERR(env, EINVAL);
792 goto err;
793 }
794 kp->size = key.size;
795 kp->data = (u_int8_t *)part->data + j;
796 /* It is an error if the keys overflow the space. */
797 if (j + kp->size > dsize) {
798 ret = USR_ERR(env, EINVAL);
799 goto err;
800 }
801 memcpy(kp->data, key.data, kp->size);
802 j += kp->size;
803 cgetflags = DB_NEXT;
804 kp++;
805 }
806
807 /*
808 * We should get part->nparts keys back, otherwise it means
809 * the passed-in keys are not valid.
810 */
811 if (ret == DB_NOTFOUND && (u_int32_t)(kp - okp) == part->nparts)
812 ret = 0;
813
814 if (ret == 0) {
815 /*
816 * They passed in keys, they must match.
817 */
818 compare = NULL;
819 if (have_keys == 1 && keys != NULL) {
820 t = dbc->dbp->bt_internal;
821 compare = t->bt_compare;
822 if ((ret = __os_malloc(env, (part->nparts - 1)
823 * sizeof(struct key_sort), &ks)) != 0)
824 goto err;
825 for (j = 0; j < part->nparts - 1; j++) {
826 ks[j].dbp = dbc->dbp;
827 ks[j].compare = compare;
828 ks[j].key = &keys[j];
829 }
830
831 qsort(ks, (size_t)part->nparts - 1,
832 sizeof(struct key_sort), __part_key_cmp);
833 }
834 part->keys = (DBT *)
835 ((u_int8_t *)part->data + dsize);
836 F_SET(part, PART_KEYS_SETUP);
837 j = 0;
838 for (kp = part->keys;
839 kp < &part->keys[part->nparts]; kp++, j++) {
840 if (have_keys == 1 && keys != NULL && j != 0 &&
841 compare(dbc->dbp, ks[j - 1].key,
842 kp, NULL) != 0) {
843 if (kp->data == NULL &&
844 F_ISSET(dbp, DB_AM_RECOVER))
845 goto err;
846 ret = USR_ERR(env, EINVAL);
847 __db_errx(env, DB_STR_A("0662",
848 "Partition key %d does not match",
849 "%d"), j);
850 goto err;
851 }
852 }
853 }
854 }
855 if (ret == DB_NOTFOUND && F_ISSET(dbp, DB_AM_RECOVER))
856 ret = 0;
857
858 err: dbp->p_internal = part;
859 if (ks != NULL)
860 __os_free(env, ks);
861
862 /*
863 * We only free the original copy of the key array when
864 * the keys have been setup properly, otherwise we let
865 * the close function to free the memory.
866 */
867 if (keys != NULL && F_ISSET(part, PART_KEYS_SETUP)) {
868 for (i = 0; i < part->nparts - 1; i++)
869 /*
870 * Always free all entries in the key array and return
871 * the first error code.
872 */
873 if ((t_ret = __db_dbt_clone_free(env,
874 &keys[i])) != 0 && ret == 0)
875 ret = t_ret;
876 __os_free(env, keys);
877 }
878
879 return (ret);
880 }
881
882 /*
883 * __partition_get_callback --
884 * Get the partition callback function.
885 * PUBLIC: int __partition_get_callback __P((DB *,
886 * PUBLIC: u_int32_t *, u_int32_t (**callback)(DB *, DBT *key)));
887 */
888 int
__partition_get_callback(dbp,parts,callback)889 __partition_get_callback(dbp, parts, callback)
890 DB *dbp;
891 u_int32_t *parts;
892 u_int32_t (**callback)(DB *, DBT *key);
893 {
894 DB_PARTITION *part;
895
896 part = dbp->p_internal;
897 /* Only return populated results if partitioned using callbacks. */
898 if (part != NULL && !F_ISSET(part, PART_CALLBACK))
899 part = NULL;
900 if (parts != NULL)
901 *parts = (part != NULL ? part->nparts : 0);
902 if (callback != NULL)
903 *callback = (part != NULL ? part->callback : NULL);
904
905 return (0);
906 }
907
908 /*
909 * __partition_get_keys --
910 * Get partition keys.
911 * PUBLIC: int __partition_get_keys __P((DB *, u_int32_t *, DBT **));
912 */
913 int
__partition_get_keys(dbp,parts,keys)914 __partition_get_keys(dbp, parts, keys)
915 DB *dbp;
916 u_int32_t *parts;
917 DBT **keys;
918 {
919 DB_PARTITION *part;
920
921 part = dbp->p_internal;
922 /* Only return populated results if partitioned using ranges. */
923 if (part != NULL && !F_ISSET(part, PART_RANGE))
924 part = NULL;
925 if (parts != NULL)
926 *parts = (part != NULL ? part->nparts : 0);
927 if (keys != NULL)
928 *keys = (part != NULL ? &part->keys[1] : NULL);
929
930 return (0);
931 }
932
933 /*
934 * __partition_get_dirs --
935 * Get partition dirs.
936 * PUBLIC: int __partition_get_dirs __P((DB *, const char ***));
937 */
938 int
__partition_get_dirs(dbp,dirpp)939 __partition_get_dirs(dbp, dirpp)
940 DB *dbp;
941 const char ***dirpp;
942 {
943 DB_PARTITION *part;
944 ENV *env;
945 u_int32_t i;
946 int ret;
947
948 env = dbp->env;
949 if ((part = dbp->p_internal) == NULL) {
950 *dirpp = NULL;
951 return (0);
952 }
953 if (!F_ISSET(dbp, DB_AM_OPEN_CALLED)) {
954 *dirpp = part->dirs;
955 return (0);
956 }
957
958 /*
959 * We build a list once when asked. The original directory list,
960 * if any, was discarded at open time.
961 */
962 if ((*dirpp = part->dirs) != NULL)
963 return (0);
964
965 if ((ret = __os_calloc(env,
966 sizeof(char *), part->nparts + 1, (void *) &part->dirs)) != 0)
967 return (ret);
968
969 for (i = 0; i < part->nparts; i++)
970 part->dirs[i] = part->handles[i]->dirname;
971
972 *dirpp = part->dirs;
973 return (0);
974 }
975
976 /*
977 * __partc_init --
978 * Initialize the access private portion of a cursor
979 *
980 * PUBLIC: int __partc_init __P((DBC *));
981 */
982 int
__partc_init(dbc)983 __partc_init(dbc)
984 DBC *dbc;
985 {
986 ENV *env;
987 int ret;
988
989 env = dbc->env;
990
991 /* Allocate/initialize the internal structure. */
992 if (dbc->internal == NULL && (ret =
993 __os_calloc(env, 1, sizeof(PART_CURSOR), &dbc->internal)) != 0)
994 return (ret);
995
996 /* Initialize methods. */
997 dbc->close = dbc->c_close = __dbc_close_pp;
998 dbc->cmp = __dbc_cmp_pp;
999 dbc->count = dbc->c_count = __dbc_count_pp;
1000 dbc->del = dbc->c_del = __dbc_del_pp;
1001 dbc->dup = dbc->c_dup = __dbc_dup_pp;
1002 dbc->get = dbc->c_get = __partc_get_pp;
1003 dbc->pget = dbc->c_pget = __dbc_pget_pp;
1004 dbc->put = dbc->c_put = __dbc_put_pp;
1005 dbc->am_bulk = NULL;
1006 dbc->am_close = __partc_close;
1007 dbc->am_del = __partc_del;
1008 dbc->am_destroy = __partc_destroy;
1009 dbc->am_get = NULL;
1010 dbc->am_put = __partc_put;
1011 dbc->am_writelock = __partc_writelock;
1012
1013 /* We avoid swapping partition cursors since we swap the sub cursors */
1014 F_SET(dbc, DBC_PARTITIONED);
1015
1016 return (0);
1017 }
1018 /*
1019 * __partc_get_pp --
1020 * cursor get opeartion on a partitioned database.
1021 */
1022 static int
__partc_get_pp(dbc,key,data,flags)1023 __partc_get_pp(dbc, key, data, flags)
1024 DBC *dbc;
1025 DBT *key, *data;
1026 u_int32_t flags;
1027 {
1028 DB *dbp;
1029 DB_THREAD_INFO *ip;
1030 ENV *env;
1031 int ignore_lease, ret;
1032
1033 dbp = dbc->dbp;
1034 env = dbp->env;
1035
1036 ignore_lease = LF_ISSET(DB_IGNORE_LEASE) ? 1 : 0;
1037 LF_CLR(DB_IGNORE_LEASE);
1038 if ((ret = __dbc_get_arg(dbc, key, data, flags)) != 0)
1039 return (ret);
1040
1041 ENV_ENTER(env, ip);
1042
1043 DEBUG_LREAD(dbc, dbc->txn, "DBcursor->get",
1044 flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
1045
1046 ret = __partc_get(dbc, key, data, flags);
1047 /*
1048 * Check for master leases.
1049 */
1050 if (ret == 0 &&
1051 IS_REP_MASTER(env) && IS_USING_LEASES(env) && !ignore_lease)
1052 ret = __rep_lease_check(env, 1);
1053
1054 ENV_LEAVE(env, ip);
1055 __dbt_userfree(env, key, NULL, data);
1056 return (ret);
1057 }
1058 /*
1059 * __partition_get --
1060 * cursor get operation on a partitioned database.
1061 *
1062 * PUBLIC: int __partc_get __P((DBC*, DBT *, DBT *, u_int32_t));
1063 */
1064 int
__partc_get(dbc,key,data,flags)1065 __partc_get(dbc, key, data, flags)
1066 DBC *dbc;
1067 DBT *key, *data;
1068 u_int32_t flags;
1069 {
1070 DB *dbp;
1071 DBC *orig_dbc, *new_dbc;
1072 DB_PARTITION *part;
1073 PART_CURSOR *cp;
1074 u_int32_t multi, part_id;
1075 int ret, retry, search;
1076
1077 dbp = dbc->dbp;
1078 cp = (PART_CURSOR*)dbc->internal;
1079 orig_dbc = cp->sub_cursor;
1080 part = dbp->p_internal;
1081
1082 new_dbc = NULL;
1083 retry = search = 0;
1084 part_id = cp->part_id;
1085 multi = flags & ~DB_OPFLAGS_MASK;
1086
1087 switch (flags & DB_OPFLAGS_MASK) {
1088 case DB_CURRENT:
1089 break;
1090 case DB_FIRST:
1091 part_id = 0;
1092 retry = 1;
1093 break;
1094 case DB_GET_BOTH:
1095 case DB_GET_BOTHC:
1096 case DB_GET_BOTH_RANGE:
1097 search = 1;
1098 break;
1099 case DB_SET_RANGE:
1100 search = 1;
1101 retry = 1;
1102 break;
1103 case DB_LAST:
1104 part_id = part->nparts - 1;
1105 retry = 1;
1106 break;
1107 case DB_NEXT:
1108 case DB_NEXT_NODUP:
1109 if (orig_dbc == NULL)
1110 part_id = 0;
1111 else
1112 part_id = cp->part_id;
1113 retry = 1;
1114 break;
1115 case DB_NEXT_DUP:
1116 break;
1117 case DB_PREV:
1118 case DB_PREV_NODUP:
1119 if (orig_dbc == NULL)
1120 part_id = part->nparts - 1;
1121 else
1122 part_id = cp->part_id;
1123 retry = 1;
1124 break;
1125 case DB_PREV_DUP:
1126 break;
1127 case DB_SET:
1128 search = 1;
1129 break;
1130 default:
1131 return (__db_unknown_flag(dbp->env, "__partc_get", flags));
1132 }
1133
1134 /*
1135 * If we need to find the partition to start on, then
1136 * do a binary search of the in memory partition table.
1137 */
1138 if (search == 1 && F_ISSET(part, PART_CALLBACK))
1139 part_id = part->callback(dbp, key) % part->nparts;
1140 else if (search == 1)
1141 __part_search(dbp, part, key, &part_id);
1142
1143 /* Get a new cursor if necessary */
1144 if (orig_dbc == NULL || cp->part_id != part_id) {
1145 GET_PART_CURSOR(dbc, new_dbc, part_id);
1146 } else
1147 new_dbc = orig_dbc;
1148
1149 while ((ret = __dbc_get(new_dbc,
1150 key, data, flags)) == DB_NOTFOUND && retry == 1) {
1151 switch (flags & DB_OPFLAGS_MASK) {
1152 case DB_FIRST:
1153 case DB_NEXT:
1154 case DB_NEXT_NODUP:
1155 case DB_SET_RANGE:
1156 if (++part_id < part->nparts) {
1157 flags = DB_FIRST | multi;
1158 break;
1159 }
1160 goto err;
1161 case DB_LAST:
1162 case DB_PREV:
1163 case DB_PREV_NODUP:
1164 if (part_id-- > 0) {
1165 flags = DB_LAST | multi;
1166 break;
1167 }
1168 goto err;
1169 default:
1170 goto err;
1171 }
1172
1173 if (new_dbc != orig_dbc && (ret = __dbc_close(new_dbc)) != 0)
1174 goto err;
1175 GET_PART_CURSOR(dbc, new_dbc, part_id);
1176 }
1177
1178 if (ret != 0)
1179 goto err;
1180
1181 /* Success: swap original and new cursors. */
1182 if (new_dbc != orig_dbc) {
1183 if (orig_dbc != NULL) {
1184 cp->sub_cursor = NULL;
1185 if ((ret = __dbc_close(orig_dbc)) != 0)
1186 goto err;
1187 }
1188 cp->sub_cursor = new_dbc;
1189 cp->part_id = part_id;
1190 }
1191
1192 return (0);
1193
1194 err: if (new_dbc != NULL && new_dbc != orig_dbc)
1195 (void)__dbc_close(new_dbc);
1196 return (ret);
1197 }
1198
1199 /*
1200 * __partc_put --
1201 * cursor put opeartion on a partitioned cursor.
1202 *
1203 */
1204 static int
__partc_put(dbc,key,data,flags,pgnop)1205 __partc_put(dbc, key, data, flags, pgnop)
1206 DBC *dbc;
1207 DBT *key, *data;
1208 u_int32_t flags;
1209 db_pgno_t *pgnop;
1210 {
1211 DB *dbp;
1212 DB_PARTITION *part;
1213 DBC *new_dbc;
1214 PART_CURSOR *cp;
1215 u_int32_t part_id;
1216 int ret;
1217
1218 dbp = dbc->dbp;
1219 cp = (PART_CURSOR*)dbc->internal;
1220 part_id = cp->part_id;
1221 part = dbp->p_internal;
1222 *pgnop = PGNO_INVALID;
1223
1224 switch (flags) {
1225 case DB_KEYFIRST:
1226 case DB_KEYLAST:
1227 case DB_NODUPDATA:
1228 case DB_NOOVERWRITE:
1229 case DB_OVERWRITE_DUP:
1230 if (F_ISSET(part, PART_CALLBACK)) {
1231 part_id = part->callback(dbp, key) % part->nparts;
1232 break;
1233 }
1234 __part_search(dbp, part, key, &part_id);
1235 break;
1236 default:
1237 break;
1238 }
1239
1240 if ((new_dbc = cp->sub_cursor) == NULL || cp->part_id != part_id) {
1241 if ((ret = __db_cursor_int(part->handles[part_id],
1242 dbc->thread_info, dbc->txn, part->handles[part_id]->type,
1243 PGNO_INVALID, 0, dbc->locker, &new_dbc)) != 0)
1244 goto err;
1245 }
1246
1247 if (F_ISSET(dbc, DBC_WRITER | DBC_WRITECURSOR))
1248 F_SET(new_dbc, DBC_WRITER);
1249 if ((ret = __dbc_put(new_dbc, key, data, flags)) != 0)
1250 goto err;
1251
1252 if (new_dbc != cp->sub_cursor) {
1253 if (cp->sub_cursor != NULL) {
1254 if ((ret = __dbc_close(cp->sub_cursor)) != 0)
1255 goto err;
1256 cp->sub_cursor = NULL;
1257 }
1258 cp->sub_cursor = new_dbc;
1259 cp->part_id = part_id;
1260 }
1261
1262 return (0);
1263
1264 err: if (new_dbc != NULL && cp->sub_cursor != new_dbc)
1265 (void)__dbc_close(new_dbc);
1266 return (ret);
1267 }
1268
1269 /*
1270 * __partc_del
1271 * Delete interface to partitioned cursors.
1272 *
1273 */
1274 static int
__partc_del(dbc,flags)1275 __partc_del(dbc, flags)
1276 DBC *dbc;
1277 u_int32_t flags;
1278 {
1279 PART_CURSOR *cp;
1280 cp = (PART_CURSOR*)dbc->internal;
1281
1282 if (F_ISSET(dbc, DBC_WRITER | DBC_WRITECURSOR))
1283 F_SET(cp->sub_cursor, DBC_WRITER);
1284 return (__dbc_del(cp->sub_cursor, flags));
1285 }
1286
1287 /*
1288 * __partc_writelock
1289 * Writelock interface to partitioned cursors.
1290 *
1291 */
1292 static int
__partc_writelock(dbc)1293 __partc_writelock(dbc)
1294 DBC *dbc;
1295 {
1296 PART_CURSOR *cp;
1297 cp = (PART_CURSOR*)dbc->internal;
1298
1299 return (cp->sub_cursor->am_writelock(cp->sub_cursor));
1300 }
1301
1302 /*
1303 * __partc_close
1304 * Close interface to partitioned cursors.
1305 *
1306 */
1307 static int
__partc_close(dbc,root_pgno,rmroot)1308 __partc_close(dbc, root_pgno, rmroot)
1309 DBC *dbc;
1310 db_pgno_t root_pgno;
1311 int *rmroot;
1312 {
1313 PART_CURSOR *cp;
1314 int ret;
1315
1316 COMPQUIET(root_pgno, 0);
1317 COMPQUIET(rmroot, NULL);
1318
1319 cp = (PART_CURSOR*)dbc->internal;
1320
1321 if (cp->sub_cursor == NULL)
1322 return (0);
1323 ret = __dbc_close(cp->sub_cursor);
1324 cp->sub_cursor = NULL;
1325 return (ret);
1326 }
1327
1328 /*
1329 * __partc_destroy --
1330 * Destroy a single cursor.
1331 */
1332 static int
__partc_destroy(dbc)1333 __partc_destroy(dbc)
1334 DBC *dbc;
1335 {
1336 PART_CURSOR *cp;
1337 ENV *env;
1338
1339 cp = (PART_CURSOR *)dbc->internal;
1340 env = dbc->env;
1341
1342 /* Discard the structure. Don't recurse. */
1343 __os_free(env, cp);
1344
1345 return (0);
1346 }
1347
1348 /*
1349 * __partition_close
1350 * Close a partitioned database.
1351 *
1352 * PUBLIC: int __partition_close __P((DB *, DB_TXN *, u_int32_t));
1353 */
1354 int
__partition_close(dbp,txn,flags)1355 __partition_close(dbp, txn, flags)
1356 DB *dbp;
1357 DB_TXN *txn;
1358 u_int32_t flags;
1359 {
1360 DB **pdbp;
1361 DB_PARTITION *part;
1362 ENV *env;
1363 u_int32_t i;
1364 int ret, t_ret;
1365
1366 if ((part = dbp->p_internal) == NULL)
1367 return (0);
1368
1369 env = dbp->env;
1370 ret = 0;
1371
1372 if ((pdbp = part->handles) != NULL) {
1373 for (i = 0; i < part->nparts; i++, pdbp++)
1374 if (*pdbp != NULL && (t_ret =
1375 __db_close(*pdbp, txn, flags)) != 0 && ret == 0)
1376 ret = t_ret;
1377 __os_free(env, part->handles);
1378 }
1379 if (!F_ISSET(part, PART_KEYS_SETUP) && part->keys != NULL) {
1380 for (i = 0; i < part->nparts - 1; i++) {
1381 if (part->keys[i].data != NULL && (t_ret =
1382 __db_dbt_clone_free(env, &part->keys[i])) != 0 &&
1383 ret == 0)
1384 ret = t_ret;
1385 }
1386 __os_free(env, part->keys);
1387 }
1388 if (part->dirs != NULL)
1389 __os_free(env, (char **)part->dirs);
1390 if (part->data != NULL)
1391 __os_free(env, (char **)part->data);
1392 __os_free(env, part);
1393 dbp->p_internal = NULL;
1394
1395 return (ret);
1396 }
1397
1398 /*
1399 * __partition_sync
1400 * Sync a partitioned database.
1401 *
1402 * PUBLIC: int __partition_sync __P((DB *));
1403 */
1404 int
__partition_sync(dbp)1405 __partition_sync(dbp)
1406 DB *dbp;
1407 {
1408 DB **pdbp;
1409 DB_PARTITION *part;
1410 u_int32_t i;
1411 int ret, t_ret;
1412
1413 ret = 0;
1414 part = dbp->p_internal;
1415
1416 if ((pdbp = part->handles) != NULL) {
1417 for (i = 0; i < part->nparts; i++, pdbp++)
1418 if (*pdbp != NULL &&
1419 F_ISSET(*pdbp, DB_AM_OPEN_CALLED) && (t_ret =
1420 __memp_fsync((*pdbp)->mpf)) != 0 && ret == 0)
1421 ret = t_ret;
1422 }
1423 if ((t_ret = __memp_fsync(dbp->mpf)) != 0 && ret == 0)
1424 ret = t_ret;
1425
1426 return (ret);
1427 }
1428
1429 /*
1430 * __partition_stat
1431 * Stat a partitioned database.
1432 *
1433 * PUBLIC: int __partition_stat __P((DBC *, void *, u_int32_t));
1434 */
1435 int
__partition_stat(dbc,spp,flags)1436 __partition_stat(dbc, spp, flags)
1437 DBC *dbc;
1438 void *spp;
1439 u_int32_t flags;
1440 {
1441 DB *dbp, **pdbp;
1442 DB_BTREE_STAT *fsp, *bsp;
1443 #ifdef HAVE_HASH
1444 DB_HASH_STAT *hfsp, *hsp;
1445 #endif
1446 DB_PARTITION *part;
1447 DBC *new_dbc;
1448 ENV *env;
1449 u_int32_t i;
1450 int ret;
1451
1452 dbp = dbc->dbp;
1453 part = dbp->p_internal;
1454 env = dbp->env;
1455 fsp = NULL;
1456 #ifdef HAVE_HASH
1457 hfsp = NULL;
1458 #endif
1459
1460 pdbp = part->handles;
1461 for (i = 0; i < part->nparts; i++, pdbp++) {
1462 if ((ret = __db_cursor_int(*pdbp, dbc->thread_info, dbc->txn,
1463 (*pdbp)->type, PGNO_INVALID,
1464 0, dbc->locker, &new_dbc)) != 0)
1465 goto err;
1466 switch (new_dbc->dbtype) {
1467 case DB_BTREE:
1468 if ((ret = __bam_stat(new_dbc, &bsp, flags)) != 0)
1469 goto err;
1470 if (fsp == NULL) {
1471 fsp = bsp;
1472 *(DB_BTREE_STAT **)spp = fsp;
1473 } else {
1474 fsp->bt_nkeys += bsp->bt_nkeys;
1475 fsp->bt_ndata += bsp->bt_ndata;
1476 fsp->bt_pagecnt += bsp->bt_pagecnt;
1477 if (fsp->bt_levels < bsp->bt_levels)
1478 fsp->bt_levels = bsp->bt_levels;
1479 fsp->bt_int_pg += bsp->bt_int_pg;
1480 fsp->bt_leaf_pg += bsp->bt_leaf_pg;
1481 fsp->bt_dup_pg += bsp->bt_dup_pg;
1482 fsp->bt_over_pg += bsp->bt_over_pg;
1483 fsp->bt_free += bsp->bt_free;
1484 fsp->bt_int_pgfree += bsp->bt_int_pgfree;
1485 fsp->bt_leaf_pgfree += bsp->bt_leaf_pgfree;
1486 fsp->bt_dup_pgfree += bsp->bt_dup_pgfree;
1487 fsp->bt_over_pgfree += bsp->bt_over_pgfree;
1488 __os_ufree(env, bsp);
1489 }
1490 break;
1491 #ifdef HAVE_HASH
1492 case DB_HASH:
1493 if ((ret = __ham_stat(new_dbc, &hsp, flags)) != 0)
1494 goto err;
1495 if (hfsp == NULL) {
1496 hfsp = hsp;
1497 *(DB_HASH_STAT **)spp = hfsp;
1498 } else {
1499 hfsp->hash_nkeys += hsp->hash_nkeys;
1500 hfsp->hash_ndata += hsp->hash_ndata;
1501 hfsp->hash_pagecnt += hsp->hash_pagecnt;
1502 hfsp->hash_ffactor += hsp->hash_ffactor;
1503 hfsp->hash_buckets += hsp->hash_buckets;
1504 hfsp->hash_free += hsp->hash_free;
1505 hfsp->hash_bfree += hsp->hash_bfree;
1506 hfsp->hash_bigpages += hsp->hash_bigpages;
1507 hfsp->hash_big_bfree += hsp->hash_big_bfree;
1508 hfsp->hash_overflows += hsp->hash_overflows;
1509 hfsp->hash_ovfl_free += hsp->hash_ovfl_free;
1510 hfsp->hash_dup += hsp->hash_dup;
1511 hfsp->hash_dup_free += hsp->hash_dup_free;
1512 __os_ufree(env, hsp);
1513 }
1514 break;
1515 #endif
1516 default:
1517 break;
1518 }
1519 if ((ret = __dbc_close(new_dbc)) != 0)
1520 goto err;
1521 }
1522 return (0);
1523
1524 err:
1525 if (fsp != NULL)
1526 __os_ufree(env, fsp);
1527 *(DB_BTREE_STAT **)spp = NULL;
1528 return (ret);
1529 }
1530
1531 /*
1532 * __part_truncate --
1533 * Truncate a database.
1534 *
1535 * PUBLIC: int __part_truncate __P((DBC *, u_int32_t *));
1536 */
1537 int
__part_truncate(dbc,countp)1538 __part_truncate(dbc, countp)
1539 DBC *dbc;
1540 u_int32_t *countp;
1541 {
1542 DB *dbp, **pdbp;
1543 DB_PARTITION *part;
1544 DBC *new_dbc;
1545 u_int32_t count, i;
1546 int ret, t_ret;
1547
1548 dbp = dbc->dbp;
1549 part = dbp->p_internal;
1550 pdbp = part->handles;
1551 ret = 0;
1552
1553 if (countp != NULL)
1554 *countp = 0;
1555 for (i = 0; ret == 0 && i < part->nparts; i++, pdbp++) {
1556 if ((ret = __db_cursor_int(*pdbp, dbc->thread_info, dbc->txn,
1557 (*pdbp)->type, PGNO_INVALID,
1558 0, dbc->locker, &new_dbc)) != 0)
1559 break;
1560 switch (dbp->type) {
1561 case DB_BTREE:
1562 case DB_RECNO:
1563 ret = __bam_truncate(new_dbc, &count);
1564 break;
1565 case DB_HASH:
1566 #ifdef HAVE_HASH
1567 ret = __ham_truncate(new_dbc, &count);
1568 break;
1569 #endif
1570 case DB_QUEUE:
1571 case DB_UNKNOWN:
1572 default:
1573 ret = __db_unknown_type(dbp->env,
1574 "DB->truncate", dbp->type);
1575 count = 0;
1576 break;
1577 }
1578 if ((t_ret = __dbc_close(new_dbc)) != 0 && ret == 0)
1579 ret = t_ret;
1580 if (countp != NULL)
1581 *countp += count;
1582 }
1583
1584 return (ret);
1585 }
1586 /*
1587 * __part_compact -- compact a partitioned database.
1588 *
1589 * PUBLIC: int __part_compact __P((DB *, DB_THREAD_INFO *, DB_TXN *,
1590 * PUBLIC: DBT *, DBT *, DB_COMPACT *, u_int32_t, DBT *));
1591 */
1592 int
__part_compact(dbp,ip,txn,start,stop,c_data,flags,end)1593 __part_compact(dbp, ip, txn, start, stop, c_data, flags, end)
1594 DB *dbp;
1595 DB_THREAD_INFO *ip;
1596 DB_TXN *txn;
1597 DBT *start, *stop;
1598 DB_COMPACT *c_data;
1599 u_int32_t flags;
1600 DBT *end;
1601 {
1602 DB **pdbp;
1603 DB_PARTITION *part;
1604 u_int32_t i;
1605 int ret;
1606
1607 part = dbp->p_internal;
1608 pdbp = part->handles;
1609 ret = 0;
1610
1611 for (i = 0; ret == 0 && i < part->nparts; i++, pdbp++) {
1612 switch (dbp->type) {
1613 case DB_HASH:
1614 case DB_BTREE:
1615 case DB_RECNO:
1616 ret = __db_compact_int(*pdbp,
1617 ip, txn, start, stop, c_data, flags, end);
1618 break;
1619
1620 default:
1621 ret = __dbh_am_chk(dbp, DB_OK_BTREE);
1622 break;
1623 }
1624 }
1625 return (ret);
1626 }
1627
1628 /*
1629 * __part_lsn_reset --
1630 * reset the lsns on each partition.
1631 *
1632 * PUBLIC: int __part_lsn_reset __P((DB *, DB_THREAD_INFO *));
1633 */
1634 int
__part_lsn_reset(dbp,ip)1635 __part_lsn_reset(dbp, ip)
1636 DB *dbp;
1637 DB_THREAD_INFO *ip;
1638 {
1639 DB **pdbp;
1640 DB_PARTITION *part;
1641 u_int32_t i;
1642 int ret;
1643
1644 part = dbp->p_internal;
1645 pdbp = part->handles;
1646 ret = 0;
1647
1648 for (i = 0; ret == 0 && i < part->nparts; i++, pdbp++)
1649 ret = __db_lsn_reset((*pdbp)->mpf, ip);
1650
1651 return (ret);
1652 }
1653
1654 /*
1655 * __part_fileid_reset --
1656 * reset the fileid on each partition.
1657 *
1658 * PUBLIC: int __part_fileid_reset
1659 * PUBLIC: __P((ENV *, DB_THREAD_INFO *, const char *, u_int32_t, int));
1660 */
1661 int
__part_fileid_reset(env,ip,fname,nparts,encrypted)1662 __part_fileid_reset(env, ip, fname, nparts, encrypted)
1663 ENV *env;
1664 DB_THREAD_INFO *ip;
1665 const char *fname;
1666 u_int32_t nparts;
1667 int encrypted;
1668 {
1669 int ret;
1670 u_int32_t part_id;
1671 char *name, *sp;
1672 const char *np;
1673
1674 if ((ret = __os_malloc(env,
1675 strlen(fname) + PART_LEN + 1, &name)) != 0) {
1676 __db_errx(env, ALLOC_ERR,
1677 (int)(strlen(fname) + PART_LEN + 1));
1678 return (ret);
1679 }
1680
1681 sp = name;
1682 np = __db_rpath(fname);
1683 if (np == NULL)
1684 np = fname;
1685 else {
1686 np++;
1687 (void)strncpy(name, fname, (size_t)(np - fname));
1688 sp = name + (np - fname);
1689 }
1690
1691 for (part_id = 0; ret == 0 && part_id < nparts; part_id++) {
1692 (void)sprintf(sp, PART_NAME, np, part_id);
1693 ret = __env_fileid_reset(env, ip, sp, encrypted);
1694 }
1695
1696 __os_free(env, name);
1697 return (ret);
1698 }
1699
1700 /*
1701 * __part_key_range --
1702 * Return proportion of keys relative to given key.
1703 *
1704 * PUBLIC: int __part_key_range __P((DBC *, DBT *, DB_KEY_RANGE *, u_int32_t));
1705 */
1706 int
__part_key_range(dbc,dbt,kp,flags)1707 __part_key_range(dbc, dbt, kp, flags)
1708 DBC *dbc;
1709 DBT *dbt;
1710 DB_KEY_RANGE *kp;
1711 u_int32_t flags;
1712 {
1713 BTREE_CURSOR *cp;
1714 DBC *new_dbc;
1715 DB_PARTITION *part;
1716 PAGE *h;
1717 u_int32_t id, part_id;
1718 u_int32_t elems, empty, less_elems, my_elems, greater_elems;
1719 u_int32_t levels, max_levels, my_levels;
1720 db_pgno_t root_pgno;
1721 int ret;
1722 double total_elems;
1723
1724 COMPQUIET(flags, 0);
1725
1726 part = dbc->dbp->p_internal;
1727
1728 /*
1729 * First we find the key range for the partition that contains the
1730 * key. Then we scale based on estimates of the other partitions.
1731 */
1732 if (F_ISSET(part, PART_CALLBACK))
1733 part_id = part->callback(dbc->dbp, dbt) % part->nparts;
1734 else
1735 __part_search(dbc->dbp, part, dbt, &part_id);
1736 GET_PART_CURSOR(dbc, new_dbc, part_id);
1737
1738 if ((ret = __bam_key_range(new_dbc, dbt, kp, flags)) != 0)
1739 goto err;
1740
1741 cp = (BTREE_CURSOR *)new_dbc->internal;
1742
1743 root_pgno = BAM_ROOT_PGNO(new_dbc);
1744 if ((ret = __memp_fget(new_dbc->dbp->mpf, &root_pgno,
1745 new_dbc->thread_info, new_dbc->txn, 0, &h)) != 0)
1746 goto c_err;
1747
1748 my_elems = NUM_ENT(h);
1749 my_levels = LEVEL(h);
1750 max_levels = my_levels;
1751
1752 if ((ret = __memp_fput(new_dbc->dbp->mpf,
1753 new_dbc->thread_info, h, new_dbc->priority)) != 0)
1754 goto c_err;
1755
1756 if ((ret = __dbc_close(new_dbc)) != 0)
1757 goto err;
1758 /*
1759 * We have the range within one subtree. Now estimate
1760 * what part of the whole range that subtree is. Figure
1761 * out how many levels each part has and how many entries
1762 * in the level below the root.
1763 */
1764 empty = less_elems = greater_elems = 0;
1765 for (id = 0; id < part->nparts; id++) {
1766 if (id == part_id) {
1767 empty = 0;
1768 continue;
1769 }
1770 GET_PART_CURSOR(dbc, new_dbc, id);
1771 cp = (BTREE_CURSOR *)new_dbc->internal;
1772 if ((ret = __memp_fget(new_dbc->dbp->mpf, &cp->root,
1773 new_dbc->thread_info, new_dbc->txn, 0, &h)) != 0)
1774 goto c_err;
1775
1776 elems = NUM_ENT(h);
1777 levels = LEVEL(h);
1778 if (levels == 1)
1779 elems /= 2;
1780
1781 if ((ret = __memp_fput(new_dbc->dbp->mpf,
1782 new_dbc->thread_info, h, new_dbc->priority)) != 0)
1783 goto c_err;
1784
1785 if ((ret = __dbc_close(new_dbc)) != 0)
1786 goto err;
1787
1788 /* If the tree is empty, ignore it. */
1789 if (elems == 0) {
1790 empty++;
1791 continue;
1792 }
1793
1794 /*
1795 * If a tree has fewer levels than the max just count
1796 * it as a single element in the higher level.
1797 */
1798 if (id < part_id) {
1799 if (levels > max_levels) {
1800 max_levels = levels;
1801 less_elems = id + elems - empty;
1802 } else if (levels < max_levels)
1803 less_elems++;
1804 else
1805 less_elems += elems;
1806 } else {
1807 if (levels > max_levels) {
1808 max_levels = levels;
1809 greater_elems = (id - part_id) + elems - empty;
1810 } else if (levels < max_levels)
1811 greater_elems++;
1812 else
1813 greater_elems += elems;
1814 }
1815
1816 }
1817
1818 if (my_levels < max_levels) {
1819 /*
1820 * The subtree containing the key is not the tallest one.
1821 * Reduce its share by the number of records at the highest
1822 * level. Scale the greater and lesser components up
1823 * by the number of records on either side of this
1824 * subtree.
1825 */
1826 total_elems = 1 + greater_elems + less_elems;
1827 kp->equal /= total_elems;
1828 kp->less /= total_elems;
1829 kp->less += less_elems/total_elems;
1830 kp->greater /= total_elems;
1831 kp->greater += greater_elems/total_elems;
1832 } else if (my_levels == max_levels) {
1833 /*
1834 * The key is in one of the tallest subtrees. We will
1835 * scale the values by the ratio of the records at the
1836 * top of this stubtree to the number of records at the
1837 * highest level.
1838 */
1839 total_elems = greater_elems + less_elems;
1840 if (total_elems != 0) {
1841 /*
1842 * First scale down by the fraction of elements
1843 * in this subtree.
1844 */
1845 total_elems += my_elems;
1846 kp->equal *= my_elems;
1847 kp->equal /= total_elems;
1848 kp->less *= my_elems;
1849 kp->less /= total_elems;
1850 kp->greater *= my_elems;
1851 kp->greater /= total_elems;
1852 /*
1853 * Proportionally add weight from the subtrees to the
1854 * left and right of this one.
1855 */
1856 kp->less += less_elems / total_elems;
1857 kp->greater += greater_elems / total_elems;
1858 }
1859 }
1860
1861 if (0) {
1862 c_err: (void)__dbc_close(new_dbc);
1863 }
1864
1865 err: return (ret);
1866 }
1867
1868 /*
1869 * __part_remove --
1870 * Remove method for a partitioned database.
1871 *
1872 * PUBLIC: int __part_remove __P((DB *, DB_THREAD_INFO *,
1873 * PUBLIC: DB_TXN *, const char *, const char *, u_int32_t));
1874 */
1875 int
__part_remove(dbp,ip,txn,name,subdb,flags)1876 __part_remove(dbp, ip, txn, name, subdb, flags)
1877 DB *dbp;
1878 DB_THREAD_INFO *ip;
1879 DB_TXN *txn;
1880 const char *name, *subdb;
1881 u_int32_t flags;
1882 {
1883 return (__part_rr(dbp, ip, txn, name, subdb, NULL, flags));
1884 }
1885
1886 /*
1887 * __part_rename --
1888 * Rename method for a partitioned database.
1889 *
1890 * PUBLIC: int __part_rename __P((DB *, DB_THREAD_INFO *,
1891 * PUBLIC: DB_TXN *, const char *, const char *, const char *));
1892 */
1893 int
__part_rename(dbp,ip,txn,name,subdb,newname)1894 __part_rename(dbp, ip, txn, name, subdb, newname)
1895 DB *dbp;
1896 DB_THREAD_INFO *ip;
1897 DB_TXN *txn;
1898 const char *name, *subdb, *newname;
1899 {
1900 return (__part_rr(dbp, ip, txn, name, subdb, newname, 0));
1901 }
1902
/*
 * __part_rr --
 *	Remove/Rename method for a partitioned database.
 *
 * Opens the named database on a temporary handle to obtain its partition
 * handles, then removes (newname == NULL) or renames (newname != NULL)
 * the file behind each partition, using a fresh handle per partition.
 * The new partition names are formed with PART_NAME from newname and the
 * partition number.
 *
 * Naming both a file and a subdatabase is rejected with EINVAL.
 * Otherwise returns 0 on success or the first error encountered; the
 * partition loop stops at the first failure.
 */
static int
__part_rr(dbp, ip, txn, name, subdb, newname, flags)
	DB *dbp;
	DB_THREAD_INFO *ip;
	DB_TXN *txn;
	const char *name, *subdb, *newname;
	u_int32_t flags;
{
	DB **pdbp, *ptmpdbp, *tmpdbp;
	DB_PARTITION *part;
	ENV *env;
	u_int32_t i;
	int ret, t_ret;
	char *np;

	env = dbp->env;
	ret = 0;

	if (subdb != NULL && name != NULL) {
		__db_errx(env, DB_STR("0663",
	    "A partitioned database can not be in a multiple databases file"));
		return (EINVAL);
	}
	ENV_GET_THREAD_INFO(env, ip);

	/*
	 * Since rename no longer opens the database, we have
	 * to do it here.
	 */
	if ((ret = __db_create_internal(&tmpdbp, env, 0)) != 0)
		return (ret);

	/*
	 * We need to make sure we don't self-deadlock, so give
	 * this dbp the same locker as the incoming one.
	 */
	tmpdbp->locker = dbp->locker;
	if ((ret = __db_open(tmpdbp, ip, txn, name, NULL, dbp->type,
	    DB_RDWRMASTER | DB_RDONLY, 0, PGNO_BASE_MD)) != 0)
		goto err;

	part = tmpdbp->p_internal;
	pdbp = part->handles;
	COMPQUIET(np, NULL);
	/* A scratch buffer for the new partition names; rename only. */
	if (newname != NULL && (ret = __os_malloc(env,
	    strlen(newname) + PART_LEN + 1, &np)) != 0) {
		__db_errx(env, ALLOC_ERR,
		    (int)(strlen(newname) + PART_LEN + 1));
		goto err;
	}
	for (i = 0; i < part->nparts; i++, pdbp++) {
		if ((ret = __db_create_internal(&ptmpdbp, env, 0)) != 0)
			break;
		/* Borrow the partition's locker for the operation. */
		ptmpdbp->locker = (*pdbp)->locker;
		if (newname == NULL)
			ret = __db_remove_int(ptmpdbp,
			    ip, txn, (*pdbp)->fname, NULL, flags);
		else {
			DB_ASSERT(env, np != NULL);
			(void)sprintf(np, PART_NAME, newname, i);
			ret = __db_rename_int(ptmpdbp,
			    ip, txn, (*pdbp)->fname, NULL, np, flags);
		}
		/*
		 * The locker is cleared before the close because it is only
		 * borrowed; the result of the close is deliberately ignored.
		 */
		ptmpdbp->locker = NULL;
		(void)__db_close(ptmpdbp, NULL, DB_NOSYNC);
		if (ret != 0)
			break;
	}

	if (newname != NULL)
		__os_free(env, np);

	/*
	 * The err label sits inside this conditional: error paths always
	 * run the cleanup below, the normal path only when dbp has not
	 * been opened.
	 * NOTE(review): when DB_AM_OPEN_CALLED is set on dbp, the normal
	 * path leaves tmpdbp unclosed in this function -- confirm that is
	 * intended.
	 */
	if (!F_ISSET(dbp, DB_AM_OPEN_CALLED)) {
err:
		/* We need to remove the lock event we associated with this. */
		if (txn != NULL)
			__txn_remlock(env, txn, NULL, tmpdbp->locker);

		/*
		 * Since we copied the locker ID from the dbp, we'd better not
		 * free it here.
		 */
		tmpdbp->locker = NULL;

		if ((t_ret = __db_close(tmpdbp,
		    txn, DB_NOSYNC)) != 0 && ret == 0)
			ret = t_ret;
	}
	return (ret);
}
1997
1998 /*
1999 * __partc_dup --
2000 * Duplicate a cursor on a partitioned database.
2001 *
2002 * PUBLIC: int __partc_dup __P((DBC *, DBC *));
2003 */
2004 int
__partc_dup(dbc_orig,dbc_n)2005 __partc_dup(dbc_orig, dbc_n)
2006 DBC *dbc_orig;
2007 DBC *dbc_n;
2008 {
2009 PART_CURSOR *orig, *new;
2010
2011 orig = (PART_CURSOR *)dbc_orig->internal;
2012 new = (PART_CURSOR *)dbc_n->internal;
2013
2014 /*
2015 * A cursor on a partitioned database contains the identifier
2016 * of the underlying database and a regular cursor that points
2017 * to the underlying database. Copy both pieces.
2018 */
2019 new->part_id = orig->part_id;
2020
2021 return (__dbc_dup(orig->sub_cursor, &new->sub_cursor, DB_POSITION));
2022 }
2023 #ifdef HAVE_VERIFY
2024 /*
2025 * __part_verify --
2026 * Verify a partitioned database.
2027 *
2028 * PUBLIC: int __part_verify __P((DB *, VRFY_DBINFO *, const char *,
2029 * PUBLIC: void *, int (*)(void *, const void *), u_int32_t));
2030 */
2031 int
__part_verify(dbp,vdp,fname,handle,callback,flags)2032 __part_verify(dbp, vdp, fname, handle, callback, flags)
2033 DB *dbp;
2034 VRFY_DBINFO *vdp;
2035 const char *fname;
2036 void *handle;
2037 int (*callback) __P((void *, const void *));
2038 u_int32_t flags;
2039 {
2040 BINTERNAL *lp, *rp;
2041 DB **pdbp;
2042 DB_PARTITION *part;
2043 DBC *dbc;
2044 DBT *key;
2045 ENV *env;
2046 DB_THREAD_INFO *ip;
2047 u_int32_t i;
2048 int ret, t_ret;
2049
2050 env = dbp->env;
2051 lp = rp = NULL;
2052 dbc = NULL;
2053 ip = vdp->thread_info;
2054
2055 if (dbp->type == DB_BTREE) {
2056 if ((ret = __bam_open(dbp, ip,
2057 NULL, fname, PGNO_BASE_MD, flags)) != 0)
2058 goto err;
2059 }
2060 #ifdef HAVE_HASH
2061 else if (dbp->type == DB_HASH) {
2062 if ((ret = __ham_open(dbp, ip,
2063 NULL, fname, PGNO_BASE_MD, flags)) != 0)
2064 goto err;
2065 }
2066 #endif
2067 /*
2068 * Only the BTree and Hash access methods are supported for
2069 * partitioned databases.
2070 */
2071 else {
2072 __db_errx(env, DB_STR_A("5540",
2073 "%s: Invalid database type for a partitioned database."
2074 , "%s"), fname);
2075 return (DB_VERIFY_BAD);
2076 }
2077
2078 /*
2079 * Initalize partition db handles and get the names. Set DB_RDWRMASTER
2080 * because we may not have the partition callback, but we can still
2081 * look at the structure of the tree.
2082 */
2083 if ((ret = __partition_open(dbp,
2084 ip, NULL, fname, dbp->type, flags | DB_RDWRMASTER, 0, 0)) != 0)
2085 goto err;
2086 part = dbp->p_internal;
2087
2088 if (LF_ISSET(DB_SALVAGE)) {
2089 /* If we are being aggressive we don't want to dump the keys. */
2090 if (LF_ISSET(DB_AGGRESSIVE))
2091 dbp->p_internal = NULL;
2092 ret = __db_prheader(dbp,
2093 NULL, 0, 0, handle, callback, vdp, PGNO_BASE_MD);
2094 dbp->p_internal = part;
2095 if (ret != 0)
2096 goto err;
2097 }
2098
2099 if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
2100 goto err;
2101
2102 pdbp = part->handles;
2103 for (i = 0; i < part->nparts; i++, pdbp++) {
2104 if (!F_ISSET(part, PART_RANGE) || part->keys == NULL)
2105 goto vrfy;
2106 if (lp != NULL)
2107 __os_free(env, lp);
2108 lp = rp;
2109 rp = NULL;
2110 if (i + 1 < part->nparts) {
2111 key = &part->keys[i + 1];
2112 if ((ret = __os_malloc(env,
2113 BINTERNAL_SIZE(key->size), &rp)) != 0)
2114 goto err;
2115 rp->len = key->size;
2116 memcpy(rp->data, key->data, key->size);
2117 B_TSET(rp->type, B_KEYDATA);
2118 }
2119 vrfy: if ((t_ret = __db_verify(*pdbp, ip, (*pdbp)->fname,
2120 NULL, handle, callback,
2121 lp, rp, flags | DB_VERIFY_PARTITION)) != 0 && ret == 0) {
2122 ret = t_ret;
2123 if (ret == ENOENT)
2124 break;
2125 }
2126 }
2127
2128 err: if (lp != NULL)
2129 __os_free(env, lp);
2130 if (rp != NULL)
2131 __os_free(env, rp);
2132 return (ret);
2133 }
2134 #endif
2135
2136 #ifdef CONFIG_TEST
2137 /*
2138 * __part_testdocopy -- copy all partitions for testing purposes.
2139 *
2140 * PUBLIC: int __part_testdocopy __P((DB *, const char *));
2141 */
2142 int
__part_testdocopy(dbp,name)2143 __part_testdocopy(dbp, name)
2144 DB *dbp;
2145 const char *name;
2146 {
2147 DB **pdbp;
2148 DB_PARTITION *part;
2149 u_int32_t i;
2150 int ret;
2151
2152 if ((ret = __db_testdocopy(dbp->env, name)) != 0)
2153 return (ret);
2154
2155 part = dbp->p_internal;
2156 pdbp = part->handles;
2157 for (i = 0; i < part->nparts; i++, pdbp++)
2158 if ((ret = __db_testdocopy(dbp->env, (*pdbp)->fname)) != 0)
2159 return (ret);
2160
2161 return (0);
2162 }
2163 #endif
2164 #else
2165 /*
2166 * __db_nopartition --
2167 * Error when a Berkeley DB build doesn't include partitioning.
2168 *
2169 * PUBLIC: int __db_no_partition __P((ENV *));
2170 */
2171 int
__db_no_partition(env)2172 __db_no_partition(env)
2173 ENV *env;
2174 {
2175 __db_errx(env, DB_STR("0664",
2176 "library build did not include support for the database partitioning"));
2177 return (DB_OPNOTSUP);
2178 }
2179 /*
2180 * __partition_set --
2181 * Set the partitioning keys or callback function.
2182 * This routine must be called prior to creating the database.
2183 * PUBLIC: int __partition_set __P((DB *, u_int32_t, DBT *,
2184 * PUBLIC: u_int32_t (*callback)(DB *, DBT *key)));
2185 */
2186
2187 int
__partition_set(dbp,parts,keys,callback)2188 __partition_set(dbp, parts, keys, callback)
2189 DB *dbp;
2190 u_int32_t parts;
2191 DBT *keys;
2192 u_int32_t (*callback)(DB *, DBT *key);
2193 {
2194 COMPQUIET(parts, 0);
2195 COMPQUIET(keys, NULL);
2196 COMPQUIET(callback, NULL);
2197
2198 return (__db_no_partition(dbp->env));
2199 }
2200
2201 /*
2202 * __partition_get_callback --
2203 * Set the partition callback function. This routine must be called
2204 * prior to opening a partition database that requires a function.
2205 * PUBLIC: int __partition_get_callback __P((DB *,
2206 * PUBLIC: u_int32_t *, u_int32_t (**callback)(DB *, DBT *key)));
2207 */
2208 int
__partition_get_callback(dbp,parts,callback)2209 __partition_get_callback(dbp, parts, callback)
2210 DB *dbp;
2211 u_int32_t *parts;
2212 u_int32_t (**callback)(DB *, DBT *key);
2213 {
2214 COMPQUIET(parts, NULL);
2215 COMPQUIET(callback, NULL);
2216
2217 return (__db_no_partition(dbp->env));
2218 }
2219
2220 /*
2221 * __partition_get_dirs --
2222 * Get partition dirs.
2223 * PUBLIC: int __partition_get_dirs __P((DB *, const char ***));
2224 */
2225 int
__partition_get_dirs(dbp,dirpp)2226 __partition_get_dirs(dbp, dirpp)
2227 DB *dbp;
2228 const char ***dirpp;
2229 {
2230 COMPQUIET(dirpp, NULL);
2231 return (__db_no_partition(dbp->env));
2232 }
2233
2234 /*
2235 * __partition_get_keys --
2236 * Get partition keys.
2237 * PUBLIC: int __partition_get_keys __P((DB *, u_int32_t *, DBT **));
2238 */
2239 int
__partition_get_keys(dbp,parts,keys)2240 __partition_get_keys(dbp, parts, keys)
2241 DB *dbp;
2242 u_int32_t *parts;
2243 DBT **keys;
2244 {
2245 COMPQUIET(parts, NULL);
2246 COMPQUIET(keys, NULL);
2247
2248 return (__db_no_partition(dbp->env));
2249 }
2250 /*
2251 * __partition_init --
2252 * Initialize the partition structure.
2253 * Called when the meta data page is read in during database open or
2254 * when partition keys or a callback are set.
2255 *
2256 * PUBLIC: int __partition_init __P((DB *, u_int32_t));
2257 */
2258 int
__partition_init(dbp,flags)2259 __partition_init(dbp, flags)
2260 DB *dbp;
2261 u_int32_t flags;
2262 {
2263 COMPQUIET(flags, 0);
2264
2265 return (__db_no_partition(dbp->env));
2266 }
2267 /*
2268 * __part_fileid_reset --
2269 * reset the fileid on each partition.
2270 *
2271 * PUBLIC: int __part_fileid_reset
2272 * PUBLIC: __P((ENV *, DB_THREAD_INFO *, const char *, u_int32_t, int));
2273 */
2274 int
__part_fileid_reset(env,ip,fname,nparts,encrypted)2275 __part_fileid_reset(env, ip, fname, nparts, encrypted)
2276 ENV *env;
2277 DB_THREAD_INFO *ip;
2278 const char *fname;
2279 u_int32_t nparts;
2280 int encrypted;
2281 {
2282 COMPQUIET(ip, NULL);
2283 COMPQUIET(fname, NULL);
2284 COMPQUIET(nparts, 0);
2285 COMPQUIET(encrypted, 0);
2286
2287 return (__db_no_partition(env));
2288 }
2289 /*
2290 * __partition_set_dirs --
2291 * Set the directories for creating the partition databases.
2292 * They must be in the environment.
2293 * PUBLIC: int __partition_set_dirs __P((DB *, const char **));
2294 */
2295 int
__partition_set_dirs(dbp,dirp)2296 __partition_set_dirs(dbp, dirp)
2297 DB *dbp;
2298 const char **dirp;
2299 {
2300 COMPQUIET(dirp, NULL);
2301
2302 return (__db_no_partition(dbp->env));
2303 }
2304 #endif
2305