1 /*-
2  * Copyright (c) 2001, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file EXAMPLES-LICENSE for license information.
5  *
6  * ex_bulk --
7  *   This sample program shows how the DB_MULTIPLE_*() macros allow you to
8  *   retrieve or update multiple records within single calls of DBC->get(),
9  *   DB->put(), and DB->del().
10  *   It also shows how to:
11  *	 a) override the default btree key comparison function, so that records
12  *		are ordered as natural integers, rather than byte streams,
13  *	 b) store two databases in a single Berkeley DB file, and
14  *	 c) construct a secondary "index" database for quick record access via a
15  *		component of the data field.
16  *
17  * Source File:
18  *	ex_bulk.c
19  *
20  * Environment directory:
21  *	EX_BULK_DIR: configured as a Transactional Data Store
22  *
23  * Database file:
24  *	ex_bulk.db (with multiple sub-database support)
25  *
26  *	subdbs:	primary:
27  *		    key:  an 'int'-sized integer, which starts at 0 and is
28  *			  incremented for each successive record.
29  *			  Range: 0 to the number of records added (option: -nN)
30  *		    data: a struct bulk_record: containing an id and a string.
31  *
32  *		secondary:	(only if the -S option was specified)
33  *		    key:  a copy of the first byte of primary.data's string.
34  *			  Since that field contains only the characters found
35  *			  in DataString[], the secondary's key is always one of
36  *			  the characters of DataString[]. This allows the
37  *			  example to select a random character from DataString[]
38  *			  as a key to use when scanning through the secondary
39  *			  index.
40  *			  Range: 0..9a..f
41  *		    data: a copy of primary.key, as always is the case for a
42  * 			  secondary index; it points to a primary record.
43  *
44  *   This program has three main modes of operation:
45  *
46  *   Insert		(this is the default mode)
47  *		Erase and create the environment and main database;
48  *		Add key/data pairs (as described above) with bulk puts.
49  *	 	If the -S option is specified, then it also creates a secondary
50  *		index, using get_first_str() to create a 1-byte key.
51  *
52  *   Read		(command line option: -R)
53  *		Read batches of records from the primary database. By running
54  *		the program first in `insert' mode, and then this mode you can
55  *		see how fast simple bulk retrievals can be. Read mode ignores
56  *		the secondary database, if present.
57  *
58  *   Delete		(command line option: -D)
59  *	 	This mode performs all the steps of Insert mode, and then does
60  *		bulk deletes of a randomly selected subset of the records: those
61  *		with keys less than or equal to a certain value. Without -S,
62  *		the maximum key is between 0 and the number of records inserted.
63  *		With -S, the maximum key is selected from DataString[].
64  *
65  *   Each phase displays statistics of its elapsed time and throughput,
66  *   in records and bulk batches per second.
67  *
68  * Options:							default value
69  *	-bN  Set the number of records for each bulk batch	[100]
70  *	-cN  Set the Berkeley DB mpool cache size		[1000*pagesize]
71  *	-dN  Number of additional duplicate values per key	[0]
72  *	-iN  Number of bulk get calls in Read mode		[1000000]
73  *	-nN  Number of keys to insert, or the highest key to  	[1000000]
74  *	     read or delete
75  *	-pN  A power-of-two pagesize from 512 to 65536		[65536]
76  *	-v   Print each value stored into or obtained from 	[not set]
77  *	     the DB_MULTIPLE* DBTs.
78  *	-D   Bulk delete mode: first insert -nN records, then	[not set]
79  *	     delete a random subset of them
80  *	-R   Perform bulk read: perform -iN bulk get requests 	[not set]
81  *	     starting from a random key from 0 to -nN
82  *	-S   Associate a secondary 'index' database with the	[not set]
83  *	     primary. This association connects the databases together, so
84  *	     that an insert into the primary also constructs and inserts a
85  *	     record into the secondary; deleting a record from one deletes
86  *	     the corresponding record of the other.
87  *		When -S is set, the -dN option may not be specified.
88  *	     This is a requirement of secondary databases, which store a copy
89  *	     of the key of the primary as a logical pointer to the primary's
90  *	     records. If the primary allowed duplicates then there would no
91  *	     longer be a one-to-one correspondence between records in the
92  *	     primary and the secondary.
93  *
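 * Example invocations (illustrative):
 *	ex_bulk			insert 1,000,000 records with bulk puts
 *	ex_bulk -R		bulk-read batches from the database created above
 *	ex_bulk -D -S		insert, then bulk-delete through a secondary index
 *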
94  * $Id$
95  */
96 
97 #include <sys/types.h>
98 #include <errno.h>
99 #include <assert.h>
100 #include <stdlib.h>
101 #include <string.h>
102 #include <time.h>
103 
104 #define	BULKDIR			"EX_BULK_DIR"	/* Environment directory name */
105 #define	DATABASE_FILE		"ex_bulk.db"	/* Database file name */
106 #define	DB_MAXIMUM_PAGESIZE	(64 * 1024)	/* Maximum database page size */
107 #define	PRIMARY_NAME		"primary"	/* Primary sub-database */
108 #define	SECONDARY_NAME		"secondary"	/* Secondary sub-database */
109 
110 #define	DATALEN		20			/* The primary value's length */
111 #define	STRINGLEN	(DATALEN - sizeof(int))	/* The length of its string */
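
/*
 * With a 4-byte int, STRINGLEN is 16, matching the number of characters in
 * DataString[] below; rotate_string() relies on that correspondence.
 */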
112 
113 /*
114  * The bulk_record structure describes the value stored in the primary database.
115  *
116  * The 'id' indicates which 'duplicate' this record is. The first value of each
117  * key has an id of 0. If duplicates are specified with the -dN option, there
118  * will be N additional records, each successive one with an incremented id.
119  * The 'str' value is derived from the characters in DataString[], below.
120  *
121  * For example, ex_bulk without any options generates:
122  *	key	value
123  *	0	{ 0, 0123456789abcdef }
124  *	1	{ 0, 123456789abcdef0 }
125  *	2	{ 0, 23456789abcdef01 }
126  *	...
127  *	15	{ 0, f0123456789abcde }
128  *	16	{ 0, 0123456789abcdef }
129  *	17	{ 0, 123456789abcdef0 }
130  *	...
131  *
132  * ex_bulk -S (creating a secondary 'index' database) creates:
133  *	primary db				secondary db
134  *	key	value				key	value
135  *	0	{ 0, 0123456789abcdef }		'0'	0
136  *	1	{ 0, 123456789abcdef0 }		'1'	1
137  *	2	{ 0, 23456789abcdef01 }		'2'	2
138  *	3	{ 0, 3456789abcdef012 }		'3'	3
139  *
140  * ex_bulk -d2 generates these records:
141  *	key	value
142  *	0	{ 0, 0123456789abcdef }
143  *	0	{ 1, 0123456789abcdef }
144  *	0	{ 2, 0123456789abcdef }
145  *	1	{ 0, 123456789abcdef0 }
146  *	1	{ 1, 123456789abcdef0 }
147  *	1	{ 2, 123456789abcdef0 }
148  *	2	{ 0, 23456789abcdef01 }
149  *	...
150  *
151  */
152 typedef struct bulk_record {
153 	int	id;
154 	char	str[STRINGLEN];
155 } bulk_record_t;
156 
157 /* The values put into the key's string value are constructed from this. */
158 const char	DataString[] = "0123456789abcdef";
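
/*
 * Each record's str field is DataString rotated left by (key % STRINGLEN)
 * characters; see rotate_string() below.
 */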
159 
160 
161 #ifdef _WIN32
162 #include <sys/timeb.h>
163 #include <winsock2.h>
164 extern int getopt(int, char * const *, const char *);
165 
166 /*
167  * Timer support --
168  * This implements a POSIX interface to Windows' millisecond resolution timers.
169  */
170 int
171 gettimeofday(struct timeval *tv, struct timezone *tz)
172 {
173 	struct _timeb now;
174 	_ftime(&now);
175 	tv->tv_sec = (long)now.time;
176 	tv->tv_usec = now.millitm * 1000;
177 	return (0);
178 }
179 
180 /*
181  * Remove any environment directory remaining from a prior execution of this
182  * example program: the Windows version.
183  */
184 #define CLEANUP_CMD "rmdir " BULKDIR " /q/s"
185 
186 #else
187 
188 #include <sys/time.h>
189 #include <unistd.h>
190 
191 /* Remove any environment directory remaining from a prior execution of this
192  * example program: the POSIX/Solaris/Linux/Unix version.
193  */
194 #define CLEANUP_CMD "rm -rf " BULKDIR
195 
196 #endif
197 
198 #include <db.h>
199 
200 int	bulk_dbt_init(DBT *, int);
201 int	bulk_delete(DB_ENV *, DB *, int, int, int, long long *, int *, int);
202 int	bulk_delete_sec(DB_ENV *, DB *, int, int, int, long long *, int *, int);
203 int	bulk_insert(DB_ENV *, DB *, int, int, int, long long *, int *, int);
204 int	bulk_get(DB_ENV *, DB *, int, int, int, int, long long *, int);
205 int	compare_int(DB *, const DBT *, const DBT *, size_t *);
206 int	db_init(DB_ENV *, DB **, DB**, int, int, int);
207 DB_ENV	*env_init(const char *, const char *, u_int);
208 int	get_first_str(DB *, const DBT *, const DBT *, DBT *);
209 int	rotate_string(const char *, char *, int);
210 void	timer_start(struct timeval *start);
211 double	timer_end(const struct timeval *start);
212 int	main(int, char *[]);
213 void	usage(void);
214 
215 /* This program name is used in error messages. */
216 const char	*Progname = "ex_bulk";
217 
218 
219 /*
220  * timer_start --
221  *	Remember the start of an interval, e.g., the beginning of a major
222  *	loop of bulk operations. At the end of the interval, use timer_end().
223  */
224 void
225 timer_start(struct timeval *timer)
226 {
227 	(void)gettimeofday(timer, NULL);
228 }
229 
230 /*
231  * timer_end --
232  *	Return the length of an interval whose start was remembered by
233  *	calling timer_start().
234  *
235  *	Returns:
236  *		The elapsed time, in floating point seconds.
237  *		It never returns 0, even when the interval was smaller
238  *		than the precision of the timer, which is easily possible
239  *		on systems with millisecond resolution. This means that the
240  *		caller does not need to guard against divide by zero errors,
241  *		as when computing the average number of records per second.
242  */
243 double
244 timer_end(const struct timeval *start)
245 {
246 	struct timeval now;
247 	double elapsed;
248 
249 	(void)gettimeofday(&now, NULL);
250 	elapsed = (now.tv_sec - start->tv_sec) +
251 	    ((double)now.tv_usec - start->tv_usec) / 1000000;
252 	/* Return a minimum duration of 1 microsecond. */
253 	if (elapsed <= 0.0)
254 		elapsed = 0.000001;
255 	return (elapsed);
256 }
257 
258 
259 /*
260  * main --
261  *	The main function decides which mode to run, opens the environment and
262  *	database(s), runs the requested modes while collecting execution
263  *	statistics, prints the resulting times, and cleanly closes down.
264  */
265 int
266 main(argc, argv)
267 	int argc;
268 	char *argv[];
269 {
270 	extern char *optarg;	/* From getopt(): the current option's value */
271 	extern int optind;	/* From getopt(): the # of argv's processed */
272 	DB *dbp;		/* Handle of the main database to be updated */
273 	DB *secdbp;		/* Handle of secondary database, if -S */
274 	DB_ENV *dbenv;		/* Handle of the database environment */
275 	struct timeval start;	/* Stores the time when an interval started */
276 	double duration;	/* Length of the current interval, in seconds */
277 	u_int cache;		/* -c<N> option: environment cachesize */
278 	u_int pagesize;		/* -p<N> option: database pagesize */
279 	int batchsize;		/* -b<N> option: #items per bulk get/put/del */
280 	int bulkgets;		/* -i<N> option: the number of bulk get calls
281 				   to make during read mode. */
282 	int bulkchanges;	/* the number of bulk put or delete calls
283 				   needed by insert or delete mode */
284 	int ch;			/* The current command line option char */
285 	int deletemode;		/* -D option: if set, perform bulk deletes */
286 	int dups;		/* -d<N> option: if set, keys have > 1 value */
287 	int numkeys;		/* -n<N> option: the number of records to
288 				   insert, or the highest one to get|delete */
289 	int deletepairs;	/* For the -DS case: whether the bulk delete
290 				   through the secondary database shall specify
291 				   keys or key+data pairs in its DBT. */
292 	int readmode;		/* -R option: Perform reads when set. */
293 	long long resultcount;	/* The number of records processed */
294 	int ret;		/* Return code from the main API calls */
295 	int ret2;		/* This second return code avoids overwriting
296 				   the main return code 'ret' when closing
297 				   down, so that the first error seen is
298 				   always propagated to the exit status. */
299 	int secondaryflag;	/* -S option: maintain a secondary index. */
300 	int verbose;		/* -v option: Print each key or key/data pair */
301 	char *desc;		/* For delete mode: whether the item counts are
302 				    for key groups or records. */
303 
304 	/*
305 	 * Initialize the environment and database handles so that the cleanup
306 	 * code can determine whether they need to be closed.
307 	 */
308 	dbenv = NULL;
309 	dbp = secdbp = NULL;
310 
311 	/* Set the defaults for the command line options. */
312 	bulkgets = 1000000;
313 	numkeys = 1000000;
314 	batchsize = 100;
315 	deletemode = dups = readmode = secondaryflag = verbose = 0;
316 	pagesize = 65536;
317 	cache = 1000 * pagesize;
318 
319 	while ((ch = getopt(argc, argv, "b:c:d:i:n:p:vDRS")) != EOF)
320 		switch (ch) {
321 		case 'b':
322 			if ((batchsize = atoi(optarg)) <= 0)
323 				usage();
324 			break;
325 		case 'c':
326 			if ((cache = (u_int)atoi(optarg)) <= 0)
327 				usage();
328 			break;
329 		case 'd':
330 			if ((dups = atoi(optarg)) <= 0)
331 				usage();
332 			break;
333 		case 'i':
334 			if ((bulkgets = atoi(optarg)) <= 0)
335 				usage();
336 			break;
337 		case 'n':
338 			if ((numkeys = atoi(optarg)) <= 0)
339 				usage();
340 			break;
341 		case 'p':
342 			if ((pagesize = (u_int)atoi(optarg)) < 512)
343 				usage();
344 			break;
345 		case 'v':
346 			verbose = 1;
347 			break;
348 		case 'D':
349 			deletemode = 1;
350 			break;
351 		case 'R':
352 			readmode = 1;
353 			break;
354 		case 'S':
355 			secondaryflag = 1;
356 			break;
357 		default:
358 			usage();
359 		}
360 
361 	if (secondaryflag && dups) {
362 		fprintf(stderr,
363 		    "%s: the -S and -d%d options are not compatible:\n\t"
364 		    "Primary databases must be unique.\n", Progname, dups);
365 		exit(1);
366 	}
367 
368 	/* Read mode uses the existing environment and database(s); the
369 	 * modification modes remove any existing files.
370 	 */
371 	if (!readmode) {
372 		system(CLEANUP_CMD);
373 		system("mkdir " BULKDIR);
374 	}
375 
376 	/* Create the environment handle, configure it, and open it. */
377 	if ((dbenv = env_init(BULKDIR, Progname, cache)) == NULL)
378 		return (-1);
379 
380 	/* Create the database handle(s), configure and open them. */
381 	if ((ret = db_init(dbenv,
382 	    &dbp, &secdbp, dups, secondaryflag, pagesize)) != 0)
383 		return (1);
384 
385 	/* The read and delete modes use random numbers; seed the generator. */
386 	srand((int)time(NULL));
387 
388 	if (readmode) {
389 		timer_start(&start);
390 		/*
391 		 * Repeat getting one bulk batch of records from a randomly
392 		 * selected key. Set resultcount to the number of records fetched.
393 		 */
394 		if ((ret = bulk_get(dbenv, dbp, numkeys,
395 		    dups, batchsize, bulkgets, &resultcount, verbose)) != 0)
396 			goto cleanup;
397 		duration = timer_end(&start);
398 		printf(
399 		    "[STAT] Read %lld records using %d batches in %.2f seconds"
400 		    ": %.0f records/sec\n",
401 		    resultcount, bulkgets, duration,
402 		    resultcount / duration);
403 	} else {
404 		timer_start(&start);
405 		/*
406 		 * Insert 'numkeys' records, with multiple values for each
407 		 * key if 'dups' is set. The resultcount will be the number of
408 		 * records inserted, unless an error occurs. The number of bulk
409 		 * puts that were required is returned in 'bulkchanges'.
410 		 */
411 		if ((ret = bulk_insert(dbenv, dbp, numkeys, dups, batchsize,
412 		     &resultcount, &bulkchanges, verbose)) != 0)
413 			goto cleanup;
414 		duration = timer_end(&start);
415 		printf("[STAT] Insert %lld records using %d batches",
416 		    resultcount, bulkchanges);
417 		printf(" in %.2f seconds: ", duration);
418 		printf("%.0f records/second\n", resultcount / duration);
419 
420 		if (deletemode) {
421 			if (secondaryflag) {
422 				/*
423 				 * Delete some of the data 'through' the
424 				 * secondary database. Randomly set deletepairs
425 				 * to delete either individual records specified
426 				 * by key-value pairs, or sets of records
427 				 * sharing the same key.
428 				 */
429 				deletepairs = rand() % 2;
430 				timer_start(&start);
431 				if ((ret = bulk_delete_sec(dbenv,
432 				    secdbp, numkeys, deletepairs, batchsize,
433 				    &resultcount, &bulkchanges, verbose)) != 0)
434 					goto cleanup;
435 				duration = timer_end(&start);
436 				desc = deletepairs ? "records" : "key groups";
437 				printf(
438 				    "[STAT] Delete %lld %s using %d batches "
439 				    "in %.2f seconds: %.0f %s/second\n",
440 				    resultcount, desc, bulkchanges, duration,
441 				    resultcount / duration, desc);
442 			} else {
443 				/*
444 				 * Delete a randomly selected subset of records
445 				 * from the primary database.
446 				 */
447 				timer_start(&start);
448 				if ((ret = bulk_delete(dbenv,
449 				    dbp, numkeys, dups, batchsize,
450 				    &resultcount, &bulkchanges, verbose)) != 0)
451 					goto cleanup;
452 				duration = timer_end(&start);
453 				desc = dups ? "records" : "key groups";
454 				printf(
455 				    "[STAT] Delete %lld %s using %d batches "
456 				    "in %.2f seconds: %.0f %s/second\n",
457 				    resultcount, desc, bulkchanges, duration,
458 				    resultcount / duration, desc);
459 			}
460 		}
461 	}
462 
463 	/* Close any non-null handles: they have been opened. */
464 cleanup:
465 	/*
466 	 * First check whether the secondary database needs to be closed, then
467 	 * the primary, lastly the environment.
468 	 * If this was in read mode, then there's no need to flush the database
469 	 * to stable storage, hence the DB_NOSYNC; it saves just a little
470 	 * time, avoiding an unnecessary scan over the Berkeley DB cache.
471 	 */
472 	if (secdbp != NULL &&
473 	    (ret2 = secdbp->close(secdbp, readmode ? DB_NOSYNC : 0)) != 0) {
474 		dbenv->err(dbenv, ret2, "DB(secondary)->close");
475 		/*
476 		 * Remember that there was a failure, but don't overwrite any
477 		 * prior error.
478 		 */
479 		if (ret == 0)
480 			ret = ret2;
481 	}
482 
483 	if (dbp != NULL &&
484 	    (ret2 = dbp->close(dbp, readmode ? DB_NOSYNC : 0)) != 0) {
485 		dbenv->err(dbenv, ret2, "DB(primary)->close");
486 		if (ret == 0)
487 			ret = ret2;
488 	}
489 	if (dbenv != NULL &&
490 	    (ret2 = dbenv->close(dbenv, 0)) != 0) {
491 		fprintf(stderr, "DB_ENV->close: %s", db_strerror(ret2));
492 		if (ret == 0)
493 			ret = ret2;
494 	}
495 
496 	/* Return 0 from main if all succeeded; otherwise return 1. */
497 	return (ret == 0 ? 0 : 1);
498 }
499 
500 /*
501  * bulk_dbt_init --
502  *	Initialize a bulk dbt suitable for a number of key/data pairs.
503  *	The bulk buffer size is calculated to be more than enough to
504  *	handle the largest bulk request that this program generates.
505  *
506  *	The buffer must be a multiple of 1024 bytes, at least as large as the
507  *	page size of the underlying database, and aligned for unsigned integer
508  *	(u_int32_t) access.
509  *
510  */
511 int
512 bulk_dbt_init(bulk, itemcount)
513 	DBT *bulk;
514 	int itemcount;
515 {
516 	memset(bulk, 0, sizeof(DBT));
517 	/*
518 	 * Allow each key/value pair to use twice the size of the data to be
519 	 * inserted. This gives space for the bookkeeping fields required by
520 	 * DB_MULTIPLE_WRITE_NEXT() and DB_MULTIPLE_KEY_WRITE_NEXT(). A simple
521 	 * one megabyte buffer is suitable for most applications.
522 	 */
523 	bulk->ulen = (u_int32_t)itemcount * 2 * (sizeof(u_int32_t) + DATALEN);
524 	/* Round the size up to be a multiple of 1024. */
525 	bulk->ulen += 1024 - (bulk->ulen % 1024);
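	/*
	 * For example, with the default batch size of 100 and DATALEN of 20
	 * (and assuming a 4-byte u_int32_t), the lines above compute
	 * 100 * 2 * (4 + 20) = 4800 bytes, rounded up here to 5120; the
	 * check below then raises that to the 64KB floor.
	 */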
526 	/*
527 	 * In order to use a bulk DBT for get() calls, it must be at least as
528 	 * large as the database's pages. Make sure that it is at least 64KB, the
529 	 * maximum database page size.
530 	 */
531 	if (bulk->ulen < DB_MAXIMUM_PAGESIZE)
532 		bulk->ulen = DB_MAXIMUM_PAGESIZE;
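	/*
	 * DB_DBT_USERMEM tells Berkeley DB that the application owns this
	 * buffer and that its size is given by ulen; DB_DBT_BULK marks the
	 * DBT as a bulk buffer.
	 */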
533 	bulk->flags = DB_DBT_USERMEM | DB_DBT_BULK;
534 	if ((bulk->data = malloc(bulk->ulen)) == NULL) {
535 		printf("bulk_dbt_init: malloc(%u) failed: %s",
536 			(unsigned)bulk->ulen, strerror(errno));
537 		return (errno);
538 	}
539 	memset(bulk->data, 0, bulk->ulen);
540 	return (0);
541 }
542 
543 /*
544  * bulk_insert --
545  *	Insert numkeys * (dups + 1) records into the database, using bulk DBTs.
546  *	If the secondary database has been created and associated, automatically
547  *	insert the corresponding records into the secondary.
548  */
549 int
550 bulk_insert(dbenv, dbp, numkeys, dups, batchsize, itemcountp, batchcountp, verbose)
551 	DB_ENV *dbenv;		/* Environment handle */
552 	DB *dbp;		/* Database handle, open to the primary db. */
553 	int numkeys;		/* The number of distinct keys. */
554 	int dups;		/* The number of additional records per key. */
555 	int batchsize;		/* Store this many items in the bulk DBT. */
556 	long long *itemcountp;	/* out: the number of items added to the DBT. */
557 	int *batchcountp;	/* out: the number of DB->put() calls done. */
558 	int verbose;		/* If set, print each record as it is inserted. */
559 {
560 	DBT key;		/* The bulk key parameter to DB->put(). */
561 	DBT data;		/* The bulk data parameter to DB->put(). */
562 	u_int32_t flag;		/* For DB->put: DB_MULTIPLE | DB_MULTIPLE_KEY */
563 	DB_TXN *txnp;
564 	bulk_record_t data_val;	/* The data value being put into a bulk dbt. */
565 	int batchcount;		/* The number of batches performed. */
566 	int pendingitems;	/* The number of items in the current batch. */
567 	int totalitems;		/* The total number of items put into batches. */
568 	int prikey;		/* The current primary key: 0..numkeys-1. */
569 	int ret;		/* Berkeley DB API return code. */
570 	void *poskey;		/* The current position in key bulk DBT. */
571 	void *posdata;		/* The current position in data bulk DBT. */
572 
573 	txnp = NULL;
574 	batchcount = pendingitems = totalitems = ret = 0;
575 	poskey = posdata = NULL;
576 	memset(&data_val, 0, DATALEN);
577 
578 	/* Initialize the bulk dbts to support at least batchsize items. */
579 	if ((ret = bulk_dbt_init(&key, batchsize)) != 0)
580 		return (ret);
581 	if ((ret = bulk_dbt_init(&data, batchsize)) != 0) {
582 		free(key.data);
583 		return (ret);
584 	}
585 
586 	/*
587 	 * Bulk insert with either DB_MULTIPLE in two buffers or
588 	 * DB_MULTIPLE_KEY in a single buffer. With DB_MULTIPLE, all keys are
589 	 * constructed in the key DBT, and all data is constructed in the data
590 	 * DBT. With DB_MULTIPLE_KEY, all key/data pairs are constructed in the
591 	 * key DBT. We use DB_MULTIPLE mode when there are duplicate records.
592 	 */
593 	flag = dups ? DB_MULTIPLE : DB_MULTIPLE_KEY;
594 	DB_MULTIPLE_WRITE_INIT(poskey, &key);
595 	if (dups)
596 		DB_MULTIPLE_WRITE_INIT(posdata, &data);
597 	for (prikey = 0; prikey < numkeys; prikey++) {
598 		/*
599 		 * Add one key,value pair to the bulk DBT for each record to
600 		 * insert. The id is incremented for each duplicate; the str
601 		 * value doesn't change.
602 		 */
603 		data_val.id = 0;
604 		rotate_string(DataString, data_val.str, prikey);
605 		do {
606 			/* Append this key and value to the bulk buffer(s),
607 			 * using the macro that matches the DB_MULTIPLE or
608 			 * DB_MULTIPLE_KEY choice made above. */
609 			if (dups) {
610 				DB_MULTIPLE_WRITE_NEXT(poskey, &key,
611 				    &prikey, sizeof(prikey));
612 				assert(poskey != NULL);
613 				DB_MULTIPLE_WRITE_NEXT(posdata, &data,
614 				    &data_val, DATALEN);
615 				assert(posdata != NULL);
616 			} else {
617 				DB_MULTIPLE_KEY_WRITE_NEXT(poskey, &key,
618 				    &prikey, sizeof(prikey), &data_val, DATALEN);
619 				assert(poskey != NULL);
620 			}
621 			/* Keep track of the number of items in the bulk DBT. */
622 			pendingitems++;
623 			if (verbose)
624 				printf("Insert key: %d, \t"
625 				    "data: (id %d, str %.*s)\n", prikey,
626 				    data_val.id, DATALEN, data_val.str);
627 		} while (data_val.id++ < dups);
628 
629 		/*
630 		 * When the desired batch size has been reached (or slightly
631 		 * exceeded, by the duplicate count), then perform a
632 		 * transaction-protected DB->put().
633 		 */
634 		if (pendingitems >= batchsize) {
635 			if ((ret =
636 			    dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0) {
637 				dbenv->err(dbenv,
638 				    ret, "bulk_insert: txn_begin");
639 				goto err_cleanup;
640 			}
641 
642 			/* Insert the batch. */
643 			if ((ret =
644 			    dbp->put(dbp, txnp, &key, &data, flag)) != 0) {
645 				dbp->err(dbp, ret, "Bulk DB->put");
646 				goto err_cleanup;
647 			}
648 
649 			/* Update statistics: batches, records. */
650 			batchcount++;
651 			totalitems += pendingitems;
652 
653 			/* Prepare for the next bulk batch. */
654 			pendingitems = 0;
655 			DB_MULTIPLE_WRITE_INIT(poskey, &key);
656 			if (dups)
657 				DB_MULTIPLE_WRITE_INIT(posdata, &data);
658 
659 			/* Commit the inserted batch of records. */
660 			ret = txnp->commit(txnp, 0);
661 			/*
662 			 * The transaction handle must not be referenced after
663 			 * a commit (or abort); null out the handle before
664 			 * checking the return code.
665 			 */
666 			txnp = NULL;
667 			if (ret != 0) {
668 				dbenv->err(dbenv, ret, "bulk_insert: commit");
669 				goto err_cleanup;
670 			}
671 		}
672 	}
673 
674 	/*
675 	 * Insert any remaining items stored in the bulk DBT which haven't
676 	 * been inserted.
677 	 */
678 	if (pendingitems != 0) {
679 		/* As above: begin transaction, put, commit transaction. */
680 		if ((ret = dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0) {
681 			dbenv->err(dbenv, ret, "bulk_insert: final txn_begin");
682 			goto err_cleanup;
683 		}
684 		if ((ret = dbp->put(dbp, txnp, &key, &data, flag)) != 0) {
685 			dbp->err(dbp, ret, "Bulk DB->put");
686 			goto err_cleanup;
687 		}
688 		batchcount++;
689 		totalitems += pendingitems;
690 		ret = txnp->commit(txnp, 0);
691 		txnp = NULL;
692 		if (ret != 0) {
693 			dbenv->err(dbenv, ret, "bulk_insert: final commit");
694 			goto err_cleanup;
695 		}
696 	}
697 
698 	/*
699 	 * Tell the caller how many records were inserted and how many bulk
700 	 * API calls were needed.
701 	 */
702 	*itemcountp = totalitems;
703 	*batchcountp = batchcount;
704 
705 	/* Clean up any locally acquired resources. */
706 err_cleanup:
707 	if (txnp != NULL)
708 		(void)txnp->abort(txnp);
709 	free(key.data);
710 	free(data.data);
711 	return (ret);
712 }
713 
714 /*
715  * bulk_delete --
716  *	Bulk delete a subset of records from the primary database, starting at
717  *	the first one, and ending at a randomly selected key.
718  */
719 int
720 bulk_delete(dbenv, dbp, numkeys, dups, batchsize, itemcountp, batchcountp, verbose)
721 	DB_ENV *dbenv;		/* Environment handle */
722 	DB *dbp;		/* Database handle, open to the primary db. */
723 	int numkeys;		/* The number of distinct keys. */
724 	int dups;		/* The number of additional records per key. */
725 	int batchsize;		/* Approximate number of items per DB->del() */
726 	long long *itemcountp;	/* out: the number of items added to the DBT. */
727 	int *batchcountp;	/* out: the number of DB->del() calls done. */
728 	int verbose;		/* If set, print each key to be deleted. */
729 {
730 	DBT bulk;		/* DBT containing keys or key/value pairs */
731 	bulk_record_t value;	/* This is the value of key/value pairs. */
732 	u_int32_t flag;		/* Flags to DB->del(): DB_MULTIPLE[_KEY] */
733 	int batchcount;		/* The number of (bulk) DB->del() calls made. */
734 	int pendingitems;	/* The number of items written into the DBT,
735 				   but not yet passed to DB->del(). */
736 	int totalitems;		/* Total number of items written into the DBT. */
737 	int prikey;		/* The primary database's key. */
738 	int highest;		/* The highest, or only, key value to delete. */
739 	int ret;		/* Berkeley DB API return code. */
740 	void *writepos;		/* Location of the next write into bulk. */
741 
742 	/* Suppress "warning: <var> not used" messages in many compilers. */
743 	dbenv = NULL;
744 	batchcount = pendingitems = totalitems = ret = 0;
745 	memset(&value, 0, DATALEN);
746 
747 	/* Select the highest key to be deleted. */
748 	highest = rand() % numkeys;
749 	printf("Bulk delete of %s <= %d\n",
750 		dups ? "records" : "keys", highest);
751 
752 	/* Initialize the bulk dbt to support at least batchsize items. */
753 	if ((ret = bulk_dbt_init(&bulk, batchsize)) != 0)
		return (ret);
754 
755 	/*
756 	 * Prepare the bulk DBT for DB_MULTIPLE_WRITE_NEXT() or
757 	 * DB_MULTIPLE_KEY_WRITE_NEXT(); the initialization is the same.
758 	 */
759 	DB_MULTIPLE_WRITE_INIT(writepos, &bulk);
760 
761 	/*
762 	 * If there are no duplicate data values, then we can completely
763 	 * specify records by the (unique) key, and so use the single-value
764 	 * version DB_MULTIPLE_WRITE_NEXT(). If there are duplicates we use the
765 	 * dual-value version, which specifies the exact key+value pairs that we
766 	 * expect to find, since they were just inserted by bulk_insert().
767 	 */
768 	flag = dups ? DB_MULTIPLE_KEY : DB_MULTIPLE;
769 	for (prikey = 0; prikey <= highest; prikey++) {
770 		if (dups) {
771 			/*
772 			 * Add one key,value pair to the bulk DBT for each
773 			 * record which was inserted. The id is incremented for
774 			 * each duplicate. The str value doesn't change.
775 			 */
776 			rotate_string(DataString, value.str, prikey);
777 			for (value.id = 0; value.id <= dups; value.id++) {
778 				DB_MULTIPLE_KEY_WRITE_NEXT(writepos, &bulk,
779 				    &prikey, sizeof(prikey), &value, DATALEN);
780 				if (writepos == NULL) {
781 					dbp->errx(dbp,
782 					    "bulk_delete: Duplicate DBT "
783 					    "overflow: key %d duplicate #%d",
784 					    prikey, value.id);
785 					ret = ENOMEM;
786 					goto err;
787 				}
788 				pendingitems++;
789 				if (verbose)
790 					printf("Delete key: %d, \t"
791 					    "data: (id %d, str %.*s)\n", prikey,
792 					    value.id, DATALEN, value.str);
793 			}
794 		} else {
795 			/*
796 			 * The non-duplicates case. Each addition to the bulk dbt
797 			 * is a single item: the next (incremented) primary key.
798 			 */
799 			DB_MULTIPLE_WRITE_NEXT(writepos,
800 			    &bulk, &prikey, sizeof(prikey));
801 			if (writepos == NULL) {
802 				dbp->errx(dbp,
803 				    "bulk_delete: DBT overflow @ key %d", prikey);
804 				ret = ENOMEM;
805 				goto err;
806 			}
807 			pendingitems++;
808 			if (verbose)
809 				printf("Delete key: %d\n", prikey);
810 		}
811 
812 		/*
813 		 * If we have specified enough records in the bulk DBT, now
814 		 * delete them, update the delete statistics, and reset the
815 		 * bulk DBT for the next batch.
816 		 */
817 		if (pendingitems >= batchsize) {
818 			switch (ret = dbp->del(dbp, NULL, &bulk, flag)) {
819 			case 0:
820 				batchcount++;
821 				totalitems += pendingitems;
822 				pendingitems = 0;
823 				DB_MULTIPLE_WRITE_INIT(writepos, &bulk);
824 				break;
825 			default:
826 				dbp->err(dbp, ret, "delete");
827 				goto err;
828 			}
829 		}
830 	}
831 
832 	/* Finish any accumulated items that have not been deleted. */
833 	if (pendingitems != 0) {
834 		switch (ret = dbp->del(dbp, NULL, &bulk, flag)) {
835 		case 0:
836 			batchcount++;
837 			totalitems += pendingitems;
838 			break;
839 		default:
840 			dbp->err(dbp, ret, "final delete");
841 			goto err;
842 		}
843 	}
844 
845 	*itemcountp = totalitems;
846 	*batchcountp = batchcount;
847 
848 err:
849 	free(bulk.data);
850 	return (ret);
851 }
852 
853 /*
854  * bulk_delete_sec --
855  *	Delete records from both databases, accessing them through the
856  *	secondary database.
857  *
858  *	The range of records to delete extends from the lowest key up through
859  *	a randomly selected character from DataString[], the source of the
860  *	secondary database's keys.
861  *
862  *	The 'pairs' parameter *only* determines *how* those records are
863  *	specified, whether by a small number of keys or a much larger number
864  *	of key/value pairs. In this example the same records are deleted in
865  *	either case, since the key/value case takes care to specify all values
866  *	which were inserted by bulk_insert().
867  */
868 int
869 bulk_delete_sec(dbenv, dbp,
870     numkeys, pairs, batchsize, itemcountp, batchcountp, verbose)
871 	DB_ENV *dbenv;		/* Environment handle */
872 	DB *dbp;		/* Database handle open to the secondary db. */
873 	int numkeys;		/* The number of distinct keys. */
874 	int pairs;		/* Delete by keys, or key+data pairs? */
875 	int batchsize;		/* Approximate number of items per DB->del() */
876 	long long *itemcountp;	/* out: the number of items added to the DBT. */
877 	int *batchcountp;	/* out: the number of DB->del() calls done. */
878 	int verbose;		/* If set, print each key to be deleted. */
879 {
880 	DBT bulk;		/* The bulk DBT. */
881 	DB_TXN *txnp;		/* Each DB->del() uses its own transaction. */
882 	u_int32_t flag;		/* DB->del() param: which kind of bulk DBT. */
883 	int deletes;		/* Counts the number of DB->del() calls. */
884 	int highest;		/* The highest secondary key to delete */
885 	int i;			/* loop iterator scanning over secondary keys. */
886 	int totalitems;		/* Count of keys or key/value pairs. */
887 	int j;			/* loop iterator for computing primary key */
888 	int pendingitems;	/* The number of items written into the DBT,
889 				   but not yet passed to DB->del(). */
890 	int prikey;		/* Add this primary key to bulk, if pairs on. */
891 	int ret;		/* Berkeley DB API return code. */
892 	void *writepos;		/* Location of the next write into bulk. */
893 	char seckey;		/* The secondary key to add to the bulk DBT.*/
894 
895 	/*
896 	 * Initialize variables: there is no transaction yet, no keys or
897 	 * key/value pairs added, no deletes done, no error yet.
898 	 */
899 	txnp = NULL;
900 	deletes = pendingitems = totalitems = ret = 0;
901 
902 	/*
903 	 * Initialize the DB->del() flag to match the style of bulk DBT to be
904 	 * constructed: DB_MULTIPLE_KEY if pairs is set (key/data pairs);
905 	 * otherwise DB_MULTIPLE (keys only).
906 	 */
907 	flag = pairs ? DB_MULTIPLE_KEY : DB_MULTIPLE;
908 
909 	/* Select the highest key to delete at random. */
910 	highest = rand() % STRINGLEN;
911 	printf("Bulk delete through secondary of %s <= '%c'\n",
912 		pairs ? "records" : "keys", DataString[highest]);
913 
914 	/* Initialize the bulk dbt to support at least batchsize items. */
915 	if ((ret = bulk_dbt_init(&bulk, batchsize)) != 0)
		return (ret);
916 	DB_MULTIPLE_WRITE_INIT(writepos, &bulk);
917 
918 	/*
919 	 * Bulk delete all records whose secondary key is at or before the
920 	 * randomly selected character in DataString[].
921 	 * If DB_MULTIPLE, construct the key DBT with DB_MULTIPLE_WRITE_NEXT()
922 	 * and the keys alone. If DB_MULTIPLE_KEY, construct the DBT with
923 	 * DB_MULTIPLE_KEY_WRITE_NEXT() and all key/data pairs belonging to
924 	 * that set of keys.
926 	 */
927 	flag |= pairs ? DB_MULTIPLE_KEY : DB_MULTIPLE;
928 	for (i = 0; i <= highest; i++) {
929 		seckey = DataString[i];
930 		if (pairs) {
931 			/*
932 			 * When specifying the exact key-value pairs to delete,
933 			 * recompute the same pairs as were inserted; use the
934 			 * two-item version of the multiple write macro.
935 			 */
936 			j = 0;
937 			do {
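				/*
				 * Primary keys whose data string begins with
				 * DataString[i] are exactly those equal to i
				 * modulo STRINGLEN; rebuild each one here.
				 */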
938 				prikey = j * STRINGLEN + i;
939 				DB_MULTIPLE_KEY_WRITE_NEXT(writepos, &bulk,
940 				    &seckey, sizeof(seckey),
941 				    &prikey, sizeof(prikey));
942 				if (writepos == NULL) {
943 					/*
944 					 * The DBT should have been sized to
945 					 * prevent it from overflowing,
946 					 * but check, just in case.
947 					 */
948 					dbp->errx(dbp,
949 					    "Bulk buffer overflow trying to add"
950 					    " ('%c', %d) batch %d",
951 					    seckey, prikey, deletes);
952 					ret = ENOMEM;
953 					goto err;
954 				}
955 				pendingitems++;
956 				if (verbose)
957 					printf("Delete secondary record: %c,\t"
958 					    "data: %d\n", seckey, prikey);
959 			} while (++j < (int)(numkeys / STRINGLEN));
960 		} else {
961 			DB_MULTIPLE_WRITE_NEXT(writepos,
962 			    &bulk, &seckey, sizeof(seckey));
963 			assert(writepos != NULL);
964 			pendingitems++;
965 			if (verbose)
966 				printf("Delete key: %c\n", seckey);
967 		}
968 
969 		/* If this batch is large enough, delete its records. */
970 		if (pendingitems >= batchsize) {
971 			if ((ret =
972 			    dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0) {
973 				dbenv->err(dbenv, ret, "txn_begin");
974 				goto err;
975 			}
976 			switch (ret = dbp->del(dbp, txnp, &bulk, flag)) {
977 			case 0:
978 				deletes++;
979 				totalitems += pendingitems;
980 				pendingitems = 0;
981 				DB_MULTIPLE_WRITE_INIT(writepos, &bulk);
982 				break;
983 			default:
984 				dbp->err(dbp, ret, "bulk_delete_sec DB->del");
985 				goto err;
986 			}
987 			ret = txnp->commit(txnp, 0);
988 			txnp = NULL;
989 			if (ret != 0) {
990 				dbenv->err(dbenv,
991 				    ret, "bulk_delete_sec txn_commit");
992 				goto err;
993 			}
994 		}
995 	}
996 
997 	/* Delete the final batch of records. */
998 	if (pendingitems != 0) {
999 		if ((ret = dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0) {
1000 			dbenv->err(dbenv, ret, "txn_begin");
1001 			goto err;
1002 		}
1003 		switch (ret = dbp->del(dbp, txnp, &bulk, flag)) {
1004 		case 0:
1005 			deletes++;
1006 			totalitems += pendingitems;
1007 			break;
1008 		default:
1009 			dbp->err(dbp, ret, "final bulk DB->del");
1010 			goto err;
1011 		}
1012 		ret = txnp->commit(txnp, 0);
1013 		txnp = NULL;
1014 		if (ret != 0) {
1015 			dbenv->err(dbenv, ret, "final txn_commit");
1016 			goto err;
1017 		}
1018 	}
1019 
1020 	*itemcountp = totalitems;
1021 	*batchcountp = deletes;
1022 
1023 err:	if (txnp != NULL)
1024 		(void)txnp->abort(txnp);
1025 	free(bulk.data);
1026 	return (ret);
1027 }
1028 
1029 /*
1030  * bulk_get --
1031  *	Repeatedly fetch one bulk batch of records starting from random keys.
1032  *	This shows how to use both the one-value DB_MULTIPLE_NEXT() and the
1033  *	two-value DB_MULTIPLE_KEY_NEXT() macros. When there are duplicates we
1034  *	get a relatively small number of records with DB_MULTIPLE_NEXT(): just
1035  *	the duplicate data values for a single random key. When there are no
1036  *	duplicates, we get as many key/data pairs as fit in the bulk buffer,
1037  *	starting at the random key, with DB_MULTIPLE_KEY_NEXT(). That allows
1038  *	the DBC->get() to return both parts of each key/data pair.
1039  *
1040  *	Returns:
1041  *	0 on success, or
1042  *	a Berkeley DB error code
1043  */
1044 int
1045 bulk_get(dbenv, dbp, numkeys, dups, batchsize, iterations, countp, verbose)
1046 	DB_ENV *dbenv;		/* Handle for the opened environment. */
1047 	DB *dbp;		/* Handle for the opened primary database. */
1048 	int numkeys;		/* The number of distinct keys. */
1049 	int dups;		/* The number of duplicate values per key. */
1050 	int batchsize;		/* Maximum number of items per DB->get(). */
1051 	int iterations;		/* The number of random gets to perform. */
1052 	long long *countp;	/* Returns the number of records retrieved */
1053 	int verbose;		/* Print the data in each record. */
1054 {
1055 	DBC *dbcp;		/* DBC->get() 'seeks' to key, fetches records. */
1056 	DBT key;		/* The starting point of DBC->get(). Not bulk. */
1057 	DBT bulk;		/* Bulk DBT which receives many data values. */
1058 	DB_TXN *txnp;		/* The cursor's transaction. */
1059 	bulk_record_t *record;	/* The record extracted from the bulk DBT. */
1060 	u_int32_t flags;	/* DB_MULTIPLE* flags for DBC->get(). */
1061 	long long count;	/* Count the number of records retrieved. */
1062 	int i;			/* The main loop iterator. */
1063 	int randkey;		/* The random key for the current batch. */
1064 	int ret;		/* Berkeley DB API return code. */
1065 	void *readpos;		/* The current location in the return bulk DBT. */
1066 	void *retdata;		/* The data value extracted from bulk. */
1067 	void *retkey;		/* The key value extracted from bulk. */
1068 	u_int32_t retdlen;	/* The length of retdata's value. */
1069 	u_int32_t retklen;	/* The length of retkey's value. */
1070 
1071 	/*
1072 	 * Null out the pointers which point to resources to be released
1073 	 * when this function returns.
1074 	 */
1075 	dbcp = NULL;
1076 	txnp = NULL;
1077 
1078 	/* Initialize the count of the records retrieved and the error code. */
1079 	count = ret = 0;
1080 
1081 	/*
1082 	 * Prepare the key DBT to pass the value in 'randkey' to the API. This
1083 	 * does not yet set key.data and key.size to refer to randkey. Those
1084 	 * fields need to be re-set before each DBC->get() call, which
1085 	 * updates them to denote the key which was retrieved.
1086 	 */
1087 	memset(&key, 0, sizeof(key));
1088 
1089 	/* Initialize the bulk dbt to support at least batchsize items. */
1090 	if ((ret = bulk_dbt_init(&bulk, batchsize)) != 0)
		return (ret);
1091 
1092 	/*
1093 	 * Determine the flags to pass to DBC->get(). DB_SET positions the
1094 	 * cursor exactly at the key specified in the key parameter.
1095 	 */
1096 	flags = DB_SET;
1097 	flags |= dups ? DB_MULTIPLE: DB_MULTIPLE_KEY;
1098 	for (i = 0; i < iterations; i++) {
1099 		if ((ret =
1100 		    dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0)
1101 			goto err_cleanup;
1102 		if ((ret = dbp->cursor(dbp, txnp, &dbcp, 0)) != 0)
1103 			goto err_cleanup;
1104 
1105 		/*
1106 		 * Get the random key for the start of this batch and set up
1107 		 * the key to use it for the DBC->get(). This needs to be done
1108 		 * each time through this loop, because DBC->get() overwrites
1109 		 * key.data and key.size to the item actually found.
1110 		 */
1111 		randkey = rand() % numkeys;
1112 		key.data = &randkey;
1113 		key.size = sizeof(randkey);
1114 		if (verbose)
1115 			printf("Getting a batch starting at %d\n", randkey);
1116 
1117 		/*
1118 		 * If there are duplicates in the database, retrieve with
1119 		 * DB_MULTIPLE and use DB_MULTIPLE_NEXT() to iterate over the
1120 		 * data values of the random key returned in the bulk DBT.
1121 		 * Otherwise retrieve with DB_MULTIPLE_KEY and use
1122 		 * DB_MULTIPLE_KEY_NEXT() to iterate over key/data pairs,
1123 		 * starting at the random key, for as many records as fit in
1124 		 * the bulk buffer.
1125 		 */
1126 		if ((ret = dbcp->get(dbcp, &key, &bulk, flags)) != 0)
1127 			goto err_cleanup;
1128 
1129 		/* Prepare to extract items from the bulk dbt. */
1130 		DB_MULTIPLE_INIT(readpos, &bulk);
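		/*
		 * Walk the returned bulk buffer: each *_NEXT macro extracts
		 * one item, and the loops stop once the macros report that
		 * the buffer is exhausted (NULL pointers).
		 */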
1131 		if (flags & DB_MULTIPLE)
1132 			while (readpos != NULL) {
1133 				DB_MULTIPLE_NEXT(readpos,
1134 				    &bulk, retdata, retdlen);
1135 				if (retdata) {
1136 					count++;
1137 					record = (bulk_record_t *)retdata;
1138 					if (verbose)
1139 						printf(
1140 						    "Retrieve key: %d, \tdata: "
1141 						    "%d:%.*s\n",
1142 						    *(int *)key.data,
1143 						    record->id,
1144 						    DATALEN, record->str);
1145 				}
1146 			}
1147 		else
1148 			while (readpos != NULL) {
1149 				DB_MULTIPLE_KEY_NEXT(readpos,
1150 				    &bulk, retkey, retklen, retdata, retdlen);
1151 				if (retkey) {
1152 					count++;
1153 					record = (bulk_record_t *)retdata;
1154 					if (verbose)
1155 						printf(
1156 						    "Retrieve key: %d, \tdata: "
1157 						    "%d:%.*s\n",
1158 						    *((int *) retkey),
1159 						    record->id,
1160 						    DATALEN, record->str);
1161 				}
1162 			}
1163 
1164 		ret = dbcp->close(dbcp);
1165 		dbcp = NULL;
1166 		if (ret != 0)
1167 			goto err_cleanup;
1168 
1169 		ret = txnp->commit(txnp, 0);
1170 		txnp = NULL;
1171 		if (ret != 0)
1172 			goto err_cleanup;
1173 	}
1174 
1175 	/* Tell the caller how many records were retrieved. */
1176 	*countp = count;
1177 
1178 	/* Release any active resources obtained by this function: the cursor,
1179 	 * transaction, and bulk dbt buffer.
1180 	 */
1181 err_cleanup:
1182 	if (dbcp != NULL)
1183 		(void)dbcp->close(dbcp);
1184 	if (txnp != NULL)
1185 		(void)txnp->abort(txnp);
1186 	if (ret != 0)
1187 		dbp->err(dbp, ret, "bulk get");
1188 	if (bulk.data != NULL)
1189 		free(bulk.data);
1190 	return (ret);
1191 }
1192 
1193 /*
1194  * compare_int --
1195  *	Compare two values as the natural integers of this CPU.
1196  */
1197 int
1198 compare_int(dbp, a, b, locp)
1199 	DB *dbp;
1200 	const DBT *a, *b;
1201 	size_t *locp;
1202 {
1203 	int ai, bi;
1204 
1205 	/* Suppress "warning: <var> not used" messages in many compilers. */
1206 	dbp = NULL;
1207 	locp = NULL;
1208 
1209 	/*
1210 	 * Returns:
1211 	 *	< 0 if a < b
1212 	 *	= 0 if a = b
1213 	 *	> 0 if a > b
1214 	 */
1215 	memcpy(&ai, a->data, sizeof(int));
1216 	memcpy(&bi, b->data, sizeof(int));
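	/*
	 * Subtraction is safe here because this example's keys are small,
	 * non-negative integers; a general-purpose comparator would need to
	 * guard against signed integer overflow.
	 */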
1217 	return (ai - bi);
1218 }
1219 
1220 /*
1221  * db_init --
1222  *	Create, configure, and open a database handle for the primary database
1223  *	inside the file DATABASE_FILE (ex_bulk.db).  If the secondary database
1224  *	flag was specified on the command line, create and associate the
1225  *	secondary database as well.
1226  *
1227  * Returns:
1228  *	0 on success
1229  *	a Berkeley DB error code on failure, after printing a relevant message
1230  *
1231  */
1232 int
1233 db_init(dbenv, dbpp, sdbpp, dups, secondaryflag, pagesize)
1234 	DB_ENV *dbenv;		/* The already-opened database environment */
1235 	DB **dbpp;		/* 'out': return the primary db handle. */
1236 	DB **sdbpp;		/* 'out': return the secondary db handle. */
1237 	int dups;		/* #duplicate per key in primary database. */
1238 	int secondaryflag;	/* If true, create the secondary database. */
1239 	int pagesize;		/* Create the database with this page size. */
1240 {
1241 	DB *dbp;		/* Local handle for the primary database. */
1242 	DB *sdbp;		/* Local handle for the secondary database. */
1243 	DB_TXN *txnp; 		/* Open/create the databases with this txn. */
1244 	int ret;		/* Berkeley DB API return code. */
1245 	char *dbname;		/* Database name: "primary" or "secondary". */
1246 
1247 	/*
1248 	 * Clear these Berkeley DB handles so that, if an error occurs, the
1249 	 * cleanup code knows whether they need to be closed or aborted.
1250 	 */
1251 	dbp = NULL;
1252 	sdbp = NULL;
1253 	txnp = NULL;
1254 
1255 	/* Allocate and initialize the handle for the primary database. */
1256 	if ((ret = db_create(&dbp, dbenv, 0)) != 0) {
1257 		dbenv->err(dbenv, ret, "db_create main database handle");
1258 		return (ret);
1259 	}
1260 
1261 	/*
1262 	 * Configure the error handling for the database, similarly to what was
1263 	 * done for the environment handle. Here we continue to set the error
1264 	 * file to stderr; however, the prefix is set to the sub-database
1265 	 * name rather than the program name as was done for the DB_ENV.
1266 	 */
1267 	dbp->set_errfile(dbp, stderr);
1268 	dbname = PRIMARY_NAME;
1269 	dbp->set_errpfx(dbp, dbname);
1270 
1271 	/*
1272 	 * By setting the btree comparison function, the records in the primary
1273 	 * database will be stored and retrieved in numerical order. Without
1274 	 * this the keys will be sorted as byte streams. On little-endian CPUs,
1275 	 * the values returned would not be in numeric order.
1276 	 */
1277 	if ((ret = dbp->set_bt_compare(dbp, compare_int)) != 0) {
1278 		dbp->err(dbp, ret, "set_bt_compare");
1279 		goto err_cleanup;
1280 	}
1281 	/*
1282 	 * Set the size of the Berkeley DB pages to use. This is a tuning
1283 	 * parameter; it does not limit the length of keys or values.
1284 	 */
1285 	if ((ret = dbp->set_pagesize(dbp, pagesize)) != 0) {
1286 		dbp->err(dbp, ret, "set_pagesize(%d)", pagesize);
1287 		goto err_cleanup;
1288 	}
1289 	/*
1290 	 * Permits duplicates if duplicates were requested. Without this it is
1291 	 * not permitted to have two key-value pairs with the same key.
1292 	 */
1293 	if (dups && (ret = dbp->set_flags(dbp, DB_DUP)) != 0) {
1294 		dbp->err(dbp, ret, "set_flags(DB_DUP)");
1295 		goto err_cleanup;
1296 	}
1297 
1298 	/* Begin the transaction to use for creating the database(s). */
1299 	if ((ret = dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0)
1300 		goto err_cleanup;
1301 	/*
1302 	 * This DB->open() call creates the database file in the file system,
1303 	 * creates a sub-database dbname ("primary") inside that file, and
1304 	 * opens that sub-database into the handle 'dbp'.
1305 	 */
1306 	if ((ret = dbp->open(dbp, txnp,
1307 	    DATABASE_FILE, dbname, DB_BTREE, DB_CREATE , 0664)) != 0) {
1308 		dbp->err(dbp, ret, "DB->open(%s)", dbname);
1309 		goto err_cleanup;
1310 	}
1311 	*dbpp = dbp;
1312 
1313 	if (secondaryflag) {
1314 		/*
1315 		 * Create a handle for a secondary database. Berkeley DB will
1316 		 * maintain it as a secondary index of the primary database,
1317 		 * using the first character of the primary's data value as the
1318 		 * key.  After the secondary is configured and opened, the
1319 		 * DB->associate() further down ties the databases together.
1320 		 */
1321 		if ((ret = db_create(&sdbp, dbenv, 0)) != 0) {
1322 			dbenv->err(dbenv, ret, "secondary db_create");
1323 			goto err_cleanup;
1324 		}
1325 		dbname = SECONDARY_NAME;
1326 		sdbp->set_errpfx(sdbp, dbname);
1327 		/*
1328 		 * Enable support for sorted duplicate data values in the
1329 		 * secondary database.  There are many key+value pairs in the
1330 		 * primary database, by default 1 million.  Since the secondary
1331 		 * key is only 1 character, not nearly enough to distinguish
1332 		 * that many records, duplicate support is needed. Sorted
1333 		 * duplicates are faster than unsorted (DB_DUP) duplicates.
1334 		 */
1335 		if ((ret = sdbp->set_flags(sdbp, DB_DUPSORT)) != 0) {
1336 			sdbp->err(sdbp, ret, "set_flags(DB_DUPSORT)");
1337 			goto err_cleanup;
1338 		}
1339 		if ((ret = sdbp->open(sdbp, txnp,
1340 		     DATABASE_FILE, dbname, DB_BTREE, DB_CREATE, 0664)) != 0) {
1341 			sdbp->err(sdbp,
1342 			    ret, "%s: secondary open", DATABASE_FILE);
1343 			goto err_cleanup;
1344 		}
1345 
1346 		/*
1347 		 * The associate call connects the secondary database to the
1348 		 * primary database, indicating that when a key+value pair is
1349 		 * added to the primary, the function get_first_str() is to be
1350 		 * called to generate the key for the pair to be added to the
1351 		 * secondary. The data portion of the pair will be the primary's
1352 		 * key. This allows the secondary to 'point' to the primary.
1353 		 */
1354 		if ((ret =
1355 		    dbp->associate(dbp, txnp, sdbp, get_first_str, 0)) != 0) {
1356 			dbp->err(dbp, ret, "associate");
1357 			goto err_cleanup;
1358 		}
1359 	}
1360 	*sdbpp = sdbp;
1361 
1362 	/*
1363 	 * Commit the transaction which opens and possibly creates the on-disk
1364 	 * database file.
1365 	 */
1366 	ret = txnp->commit(txnp, 0);
1367 	txnp = NULL;
1368 	if (ret != 0)
1369 		goto err_cleanup;
1370 
1371 	return (0);
1372 
1373 	/* This label is used by any error in this function which requires
1374 	 * releasing any locally acquired resources, such as allocated
1375 	 * database handles or active transactions.
1376 	 */
1377 err_cleanup:
1378 	if (txnp != NULL)
1379 		(void)txnp->abort(txnp);
1380 	if (sdbp != NULL)
1381 		(void)sdbp->close(sdbp, 0);
1382 	if (dbp != NULL)
1383 		(void)dbp->close(dbp, 0);
1384 	return (ret);
1385 }
1386 
1387 /*
1388  * env_init --
1389  *	Create the environment handle, then configure and open it for the
1390  *	Transactional Data Store (TDS).
1391  *
1392  */
1393 DB_ENV *
1394 env_init(home, progname, cachesize)
1395 	const char *home;
1396 	const char *progname;
1397 	u_int cachesize;
1398 {
1399 	DB_ENV *dbenv;
1400 	int ret;
1401 
1402 	/* Allocate and initialize an empty environment handle. */
1403 	if ((ret = db_env_create(&dbenv, 0)) != 0) {
1404 		fprintf(stderr, "%s: db_env_create: %s\n",
		    progname, db_strerror(ret));
1405 		return (NULL);
1406 	}
1407 
1408 	/*
1409 	 * Send error messages to the standard error stream. You could also
1410 	 * open an application-specific log file to use here.
1411 	 */
1412 	dbenv->set_errfile(dbenv, stderr);
1413 
1414 	/* Include the name of the program before each error message. */
1415 	dbenv->set_errpfx(dbenv, progname);
1416 
1417 	/* Set the size of the cache which holds database pages. */
1418 	if ((ret = dbenv->set_cachesize(dbenv, 0, cachesize, 0)) != 0) {
1419 		dbenv->err(dbenv, ret, "DB_ENV->set_cachesize(%u)", cachesize);
1420 		return (NULL);
1421 	}
1422 
1423 	/*
1424 	 * Open the now-configured environment handle, creating the support
1425 	 * files required by the Berkeley DB Transactional Data Store and
1426 	 * setting up the in-memory data structures of the dbenv for DB access.
1427 	 */
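	/*
	 * DB_CREATE creates any missing support files; DB_INIT_MPOOL,
	 * DB_INIT_TXN, and DB_INIT_LOCK enable the cache, transaction, and
	 * locking subsystems used by this example.
	 */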
1428 	if ((ret = dbenv->open(dbenv, home, DB_CREATE | DB_INIT_MPOOL |
1429 	    DB_INIT_TXN | DB_INIT_LOCK, 0)) != 0) {
1430 		dbenv->err(dbenv, ret, "DB_ENV->open(%s, TDS)", home);
1431 		(void)dbenv->close(dbenv, 0);
1432 		return (NULL);
1433 	}
1434 	return (dbenv);
1435 }
1436 
1437 /*
1438  * get_first_str --
1439  *	Construct the key of this example's secondary index.
1440  *	The secondary's key is the first character of the string field
1441  *	in the primary database's value DBT.
1442  */
1443 int
1444 get_first_str(sdbp, key, data, skey)
1445 	DB *sdbp;
1446 	const DBT *key;
1447 	const DBT *data;
1448 	DBT *skey;
1449 {
1450 	/* Suppress "warning: <var> not used" messages in many compilers. */
1451 	sdbp = NULL;
1452 	key = NULL;
1453 
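	/*
	 * Point the secondary key directly at the first byte of the primary
	 * record's string. No memory is allocated here, so the
	 * DB_DBT_APPMALLOC flag is not needed.
	 */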
1454 	memset(skey, 0, sizeof(DBT));
1455 	skey->data = ((bulk_record_t *)(data->data))->str;
1456 	skey->size = sizeof(char);
1457 	return (0);
1458 }
1459 
1460 /*
1461  * rotate_string --
1462  *	Perform a left-circular-shift while copying a STRINGLEN-sized char array.
1463  *	The first character in the destination is the 'offset'th character
1464  *	of the source; the second is from 'offset'+1, etc.
1465  *		src  = "abcde", offset = 3 results in -->
1466  *		dest = "deabc"
1467  *	The source and destination are required to be STRINGLEN bytes long.
1468  *	No null character is appended.
1469  */
1470 int
1471 rotate_string(source, dest, offset)
1472 	const char *source;
1473 	char *dest;
1474 	int offset;
1475 {
1476 	unsigned i;
1477 
1478 	for (i = 0; i < STRINGLEN; i++)
1479 		dest[i] = source[(offset + i) % STRINGLEN];
1480 	return (0);
1481 }
1482 /*
1483  * usage --
1484  *	Describe the command line options.
1485  */
1486 void
1487 usage()
1488 {
1489 	(void)fprintf(stderr,
1490 	    "Usage: %s \n"
1491 	    "    -bN	batch size: number of records per bulk operation [100]\n"
1492 	    "    -cN	cachesize [1000 * pagesize]\n"
1493 	    "    -dN	duplicates: number of additional values for each key [0]\n"
1494 	    "    -iN	number of bulk get calls in read mode [1000000]\n"
1495 	    "    -nN	number of keys [1000000]\n"
1496 	    "    -pN	set pagesize: a power of 2 from 512 to 65536 [65536]\n"
1497 	    "    -v	verbose output\n"
1498 	    "    -D	set mode to perform bulk deletes\n"
1499 	    "    -R	set mode to perform bulk reads\n"
1500 	    "    -S	create and maintain a secondary index database\n",
1501 			Progname);
1502 	exit(EXIT_FAILURE);
1503 }
1504