1 /*-
2  * Copyright (c) 2005, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file EXAMPLES-LICENSE for license information.
5  *
6  * $Id$
7  */
8 
9 /* File: txn_guide_inmemory.c */
10 
11 /* We assume an ANSI-compatible compiler */
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <db.h>
16 
#ifdef _WIN32
#include <windows.h>
#define	PATHD '\\'
extern int getopt(int, char * const *, const char *);
extern char *optarg;

/*
 * Wrap the Windows thread API to make it look POSIXey.
 *
 * Every wrapper evaluates to 0 on success and to a non-zero value on
 * failure, mirroring the pthread calls used on other platforms.
 */
typedef HANDLE thread_t;
#define	thread_create(thrp, attr, func, arg)                               \
    (((*(thrp) = CreateThread(NULL, 0,                                     \
	(LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
/*
 * A failed wait must be reported as an error.  Only once the wait has
 * succeeded is the exit code fetched, and only if the caller asked for it.
 */
#define	thread_join(thr, statusp)                                          \
    ((WaitForSingleObject((thr), INFINITE) != WAIT_OBJECT_0) ? -1 :        \
    (((statusp) == NULL) ? 0 :                                             \
    (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)))

typedef HANDLE mutex_t;
#define	mutex_init(m, attr)                                                \
    (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
#define	mutex_lock(m)                                                      \
    ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
#define	mutex_unlock(m)         (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
#define	PATHD '/'

/* On POSIX systems the wrappers map directly onto the pthread calls. */
typedef pthread_t thread_t;
#define	thread_create(thrp, attr, func, arg)                               \
    pthread_create((thrp), (attr), (func), (arg))
#define	thread_join(thr, statusp) pthread_join((thr), (statusp))

typedef pthread_mutex_t mutex_t;
#define	mutex_init(m, attr)     pthread_mutex_init((m), (attr))
#define	mutex_lock(m)           pthread_mutex_lock(m)
#define	mutex_unlock(m)         pthread_mutex_unlock(m)
#endif
54 
/* Run 5 writer threads at a time. */
56 #define	NUMWRITERS 5
57 
58 /*
59  * Printing of a thread_t is implementation-specific, so we
60  * create our own thread IDs for reporting purposes.
61  */
62 int global_thread_num;
63 mutex_t thread_num_lock;
64 
65 /* Forward declarations */
66 int count_records(DB *, DB_TXN *);
67 int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t);
68 void *writer_thread(void *);
69 
70 int
main(void)71 main(void)
72 {
73     /* Initialize our handles */
74     DB *dbp = NULL;
75     DB_ENV *envp = NULL;
76 
77     thread_t writer_threads[NUMWRITERS];
78     int i, ret, ret_t;
79     u_int32_t env_flags;
80 
81     /* Application name */
82     const char *prog_name = "txn_guide_inmemory";
83 
84     /* Create the environment */
85     ret = db_env_create(&envp, 0);
86     if (ret != 0) {
87 	fprintf(stderr, "Error creating environment handle: %s\n",
88 	    db_strerror(ret));
89 	goto err;
90     }
91 
92     env_flags =
93       DB_CREATE     |  /* Create the environment if it does not exist */
94       DB_INIT_LOCK  |  /* Initialize the locking subsystem */
95       DB_INIT_LOG   |  /* Initialize the logging subsystem */
96       DB_INIT_TXN   |  /* Initialize the transactional subsystem. This
97 			* also turns on logging. */
98       DB_INIT_MPOOL |  /* Initialize the memory pool (in-memory cache) */
99       DB_PRIVATE    |  /* Region files are backed by heap memory.  */
100       DB_THREAD;       /* Cause the environment to be free-threaded */
101 
102     /* Specify in-memory logging */
103     ret = envp->log_set_config(envp, DB_LOG_IN_MEMORY, 1);
104     if (ret != 0) {
105 	fprintf(stderr, "Error setting log subsystem to in-memory: %s\n",
106 	    db_strerror(ret));
107 	goto err;
108     }
109 
110     /*
111      * Specify the size of the in-memory log buffer.
112      */
113     ret = envp->set_lg_bsize(envp, 10 * 1024 * 1024);
114     if (ret != 0) {
115 	fprintf(stderr, "Error increasing the log buffer size: %s\n",
116 	    db_strerror(ret));
117 	goto err;
118     }
119 
120     /*
121      * Specify the size of the in-memory cache.
122      */
123     ret = envp->set_cachesize(envp, 0, 10 * 1024 * 1024, 1);
124     if (ret != 0) {
125 	fprintf(stderr, "Error increasing the cache size: %s\n",
126 	    db_strerror(ret));
127 	goto err;
128     }
129 
130      /*
131      * Indicate that we want db to perform lock detection internally.
132      * Also indicate that the transaction with the fewest number of
133      * write locks will receive the deadlock notification in
134      * the event of a deadlock.
135      */
136     ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE);
137     if (ret != 0) {
138 	fprintf(stderr, "Error setting lock detect: %s\n",
139 	    db_strerror(ret));
140 	goto err;
141     }
142 
143     /* Now actually open the environment */
144     ret = envp->open(envp, NULL, env_flags, 0);
145     if (ret != 0) {
146 	fprintf(stderr, "Error opening environment: %s\n",
147 	    db_strerror(ret));
148 	goto err;
149     }
150 
151     /*
152      * If we had utility threads (for running checkpoints or
153      * deadlock detection, for example) we would spawn those
154      * here. However, for a simple example such as this,
155      * that is not required.
156      */
157 
158     /* Open the database */
159     ret = open_db(&dbp, prog_name, NULL,
160       envp, DB_DUPSORT);
161     if (ret != 0)
162 	goto err;
163 
164     /* Initialize a mutex. Used to help provide thread ids. */
165     (void)mutex_init(&thread_num_lock, NULL);
166 
167     /* Start the writer threads. */
168     for (i = 0; i < NUMWRITERS; i++)
169 	(void)thread_create(
170 	    &writer_threads[i], NULL, writer_thread, (void *)dbp);
171 
172     /* Join the writers */
173     for (i = 0; i < NUMWRITERS; i++)
174 	(void)thread_join(writer_threads[i], NULL);
175 
176 err:
177     /* Close our database handle, if it was opened. */
178     if (dbp != NULL) {
179 	ret_t = dbp->close(dbp, 0);
180 	if (ret_t != 0) {
181 	    fprintf(stderr, "%s database close failed.\n",
182 		db_strerror(ret_t));
183 	    ret = ret_t;
184 	}
185     }
186 
187     /* Close our environment, if it was opened. */
188     if (envp != NULL) {
189 	ret_t = envp->close(envp, 0);
190 	if (ret_t != 0) {
191 	    fprintf(stderr, "environment close failed: %s\n",
192 		db_strerror(ret_t));
193 		ret = ret_t;
194 	}
195     }
196 
197     /* Final status message and return. */
198     printf("I'm all done.\n");
199     return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
200 }
201 
202 /*
203  * A function that performs a series of writes to a
204  * Berkeley DB database. The information written
205  * to the database is largely nonsensical, but the
206  * mechanism of transactional commit/abort and
207  * deadlock detection is illustrated here.
208  */
209 void *
writer_thread(void * args)210 writer_thread(void *args)
211 {
212     static char *key_strings[] = {
213 	"key 1", "key 2", "key 3", "key 4", "key 5",
214 	"key 6", "key 7", "key 8", "key 9", "key 10"
215     };
216     DB *dbp;
217     DB_ENV *envp;
218     DBT key, value;
219     DB_TXN *txn;
220     int i, j, payload, ret, thread_num;
221     int retry_count, max_retries = 20;   /* Max retry on a deadlock */
222 
223     dbp = (DB *)args;
224     envp = dbp->get_env(dbp);
225 
226     /* Get the thread number */
227     (void)mutex_lock(&thread_num_lock);
228     global_thread_num++;
229     thread_num = global_thread_num;
230     (void)mutex_unlock(&thread_num_lock);
231 
232     /* Initialize the random number generator */
233     srand(thread_num);
234 
235     /* Write 50 times and then quit */
236     for (i = 0; i < 50; i++) {
237 	retry_count = 0; /* Used for deadlock retries */
238 
239 	/*
240 	 * Some think it is bad form to loop with a goto statement, but
241 	 * we do it anyway because it is the simplest and clearest way
242 	 * to achieve our abort/retry operation.
243 	 */
244 retry:
245 	/* Begin our transaction. We group multiple writes in
246 	 * this thread under a single transaction so as to
247 	 * (1) show that you can atomically perform multiple writes
248 	 * at a time, and (2) to increase the chances of a
249 	 * deadlock occurring so that we can observe our
250 	 * deadlock detection at work.
251 	 *
252 	 * Normally we would want to avoid the potential for deadlocks,
253 	 * so for this workload the correct thing would be to perform our
254 	 * puts with autocommit. But that would excessively simplify our
255 	 * example, so we do the "wrong" thing here instead.
256 	 */
257 	ret = envp->txn_begin(envp, NULL, &txn, 0);
258 	if (ret != 0) {
259 	    envp->err(envp, ret, "txn_begin failed");
260 	    return ((void *)EXIT_FAILURE);
261 	}
262 	for (j = 0; j < 10; j++) {
263 	    /* Set up our key and values DBTs */
264 	    memset(&key, 0, sizeof(DBT));
265 	    key.data = key_strings[j];
266 	    key.size = (u_int32_t)strlen(key_strings[j]) + 1;
267 
268 	    memset(&value, 0, sizeof(DBT));
269 	    payload = rand() + i;
270 	    value.data = &payload;
271 	    value.size = sizeof(int);
272 
273 	    /* Perform the database put. */
274 	    switch (ret = dbp->put(dbp, txn, &key, &value, 0)) {
275 		case 0:
276 		    break;
277 
278 		/*
279 		 * Here's where we perform deadlock detection. If
280 		 * DB_LOCK_DEADLOCK is returned by the put operation,
281 		 * then this thread has been chosen to break a deadlock.
282 		 * It must abort its operation, and optionally retry the
283 		 * put.
284 		 */
285 		case DB_LOCK_DEADLOCK:
286 		    /*
287 		     * First thing that we MUST do is abort the
288 		     * transaction.
289 		     */
290 		    (void)txn->abort(txn);
291 		    /*
292 		     * Now we decide if we want to retry the operation.
293 		     * If we have retried less than max_retries,
294 		     * increment the retry count and goto retry.
295 		     */
296 		    if (retry_count < max_retries) {
297 			printf("Writer %i: Got DB_LOCK_DEADLOCK.\n",
298 			    thread_num);
299 			printf("Writer %i: Retrying write operation.\n",
300 			    thread_num);
301 			retry_count++;
302 			goto retry;
303 		    }
304 		    /*
305 		     * Otherwise, just give up.
306 		     */
307 		    printf("Writer %i: ", thread_num);
308 		    printf("Got DB_LOCK_DEADLOCK and out of retries.\n");
309 		    printf("Writer %i: Giving up.\n", thread_num);
310 		    return ((void *)EXIT_FAILURE);
311 		/*
312 		 * If a generic error occurs, we simply abort the
313 		 * transaction and exit the thread completely.
314 		 */
315 		default:
316 		    envp->err(envp, ret, "db put failed");
317 		    ret = txn->abort(txn);
318 		    if (ret != 0)
319 			envp->err(envp, ret, "txn abort failed");
320 		    return ((void *)EXIT_FAILURE);
321 	     } /** End case statement **/
322 
323 	}   /** End for loop **/
324 
325 	/*
326 	 * print the number of records found in the database.
327 	 * See count_records() for usage information.
328 	 */
329 	printf("Thread %i. Record count: %i\n", thread_num,
330 	    count_records(dbp, txn));
331 
332 	/*
333 	 * If all goes well, we can commit the transaction and
334 	 * exit the thread.
335 	 */
336 	ret = txn->commit(txn, 0);
337 	if (ret != 0) {
338 	    envp->err(envp, ret, "txn commit failed");
339 	    return ((void *)EXIT_FAILURE);
340 	}
341     }
342     return ((void *)EXIT_SUCCESS);
343 }
344 
345 /*
346  * This simply counts the number of records contained in the
347  * database and returns the result. You can use this function
348  * in three ways:
349  *
350  * First call it with an active txn handle (this is what the
351  *  example currently does).
352  *
353  * Secondly, configure the cursor for uncommitted reads.
354  *
355  * Third, call count_records AFTER the writer has committed
356  *    its transaction.
357  *
358  * If you do none of these things, the writer thread will
359  * self-deadlock.
360  *
361  * Note that this function exists only for illustrative purposes.
362  * A more straight-forward way to count the number of records in
363  * a database is to use DB->stat() or DB->stat_print().
364  */
365 
366 int
count_records(DB * dbp,DB_TXN * txn)367 count_records(DB *dbp, DB_TXN *txn)
368 {
369     DBT key, value;
370     DBC *cursorp;
371     int count, ret;
372 
373     cursorp = NULL;
374     count = 0;
375 
376     /* Get the cursor */
377     ret = dbp->cursor(dbp, txn, &cursorp, 0);
378     if (ret != 0) {
379 	dbp->err(dbp, ret,
380 	  "count_records: cursor open failed.");
381 	goto cursor_err;
382     }
383 
384     /* Get the key DBT used for the database read */
385     memset(&key, 0, sizeof(DBT));
386     memset(&value, 0, sizeof(DBT));
387     do {
388 	ret = cursorp->get(cursorp, &key, &value, DB_NEXT);
389 	switch (ret) {
390 	    case 0:
391 		count++;
392 		break;
393 	    case DB_NOTFOUND:
394 		break;
395 	    default:
396 		dbp->err(dbp, ret,
397 		    "Count records unspecified error");
398 		goto cursor_err;
399 	}
400     } while (ret == 0);
401 
402 cursor_err:
403     if (cursorp != NULL) {
404 	ret = cursorp->close(cursorp);
405 	if (ret != 0) {
406 	    dbp->err(dbp, ret,
407 		"count_records: cursor close failed.");
408 	}
409     }
410 
411     return (count);
412 }
413 
414 /* Open a Berkeley DB database */
415 int
open_db(DB ** dbpp,const char * progname,const char * file_name,DB_ENV * envp,u_int32_t extra_flags)416 open_db(DB **dbpp, const char *progname, const char *file_name,
417   DB_ENV *envp, u_int32_t extra_flags)
418 {
419     int ret;
420     u_int32_t open_flags;
421     DB *dbp;
422 
423     /* Initialize the DB handle */
424     ret = db_create(&dbp, envp, 0);
425     if (ret != 0) {
426 	fprintf(stderr, "%s: %s\n", progname,
427 		db_strerror(ret));
428 	return (EXIT_FAILURE);
429     }
430 
431     /* Point to the memory malloc'd by db_create() */
432     *dbpp = dbp;
433 
434     if (extra_flags != 0) {
435 	ret = dbp->set_flags(dbp, extra_flags);
436 	if (ret != 0) {
437 	    dbp->err(dbp, ret,
438 		"open_db: Attempt to set extra flags failed.");
439 	    return (EXIT_FAILURE);
440 	}
441     }
442 
443     /* Now open the database */
444     open_flags = DB_CREATE        | /* Allow database creation */
445 		 DB_THREAD        |
446 		 DB_AUTO_COMMIT;    /* Allow autocommit */
447 
448     ret = dbp->open(dbp,        /* Pointer to the database */
449 		    NULL,       /* Txn pointer */
450 		    file_name,  /* File name */
451 		    NULL,       /* Logical db name */
452 		    DB_BTREE,   /* Database type (using btree) */
453 		    open_flags, /* Open flags */
454 		    0);         /* File mode. Using defaults */
455 
456     if (ret != 0) {
457 	dbp->err(dbp, ret, "Database  open failed");
458 	return (EXIT_FAILURE);
459     }
460     return (EXIT_SUCCESS);
461 }
462