1 /*-
2 * Copyright (c) 2005, 2020 Oracle and/or its affiliates. All rights reserved.
3 *
4 * See the file EXAMPLES-LICENSE for license information.
5 *
6 * $Id$
7 */
8
9 /* File: txn_guide_inmemory.c */
10
11 /* We assume an ANSI-compatible compiler */
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <db.h>
16
#ifdef _WIN32
#include <windows.h>
#define PATHD '\\'
extern int getopt(int, char * const *, const char *);
extern char *optarg;

/*
 * Wrap the Windows thread API to make it look POSIXey: every wrapper
 * below evaluates to 0 on success and to -1 on failure.  The attr
 * arguments exist only for call compatibility and are not used.
 */
typedef HANDLE thread_t;
#define thread_create(thrp, attr, func, arg)                              \
    (((*(thrp) = CreateThread(NULL, 0,                                    \
        (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
/*
 * Wait for the thread to end and, when statusp is non-NULL, store its
 * exit code through statusp.  A failed wait is reported as -1 before any
 * attempt is made to read the exit code.
 */
#define thread_join(thr, statusp)                                         \
    ((WaitForSingleObject((thr), INFINITE) != WAIT_OBJECT_0) ? -1 :       \
        (((statusp) == NULL) ? 0 :                                        \
        (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)))

typedef HANDLE mutex_t;
#define mutex_init(m, attr)                                               \
    (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
#define mutex_lock(m)                                                     \
    ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
#define PATHD '/'

/* On POSIX systems the wrappers map directly onto the pthread calls. */
typedef pthread_t thread_t;
#define thread_create(thrp, attr, func, arg)                              \
    pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))

typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr) pthread_mutex_init((m), (attr))
#define mutex_lock(m) pthread_mutex_lock(m)
#define mutex_unlock(m) pthread_mutex_unlock(m)
#endif
54
55 /* Run 5 writers threads at a time. */
56 #define NUMWRITERS 5
57
58 /*
59 * Printing of a thread_t is implementation-specific, so we
60 * create our own thread IDs for reporting purposes.
61 */
62 int global_thread_num;
63 mutex_t thread_num_lock;
64
65 /* Forward declarations */
66 int count_records(DB *, DB_TXN *);
67 int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t);
68 void *writer_thread(void *);
69
70 int
main(void)71 main(void)
72 {
73 /* Initialize our handles */
74 DB *dbp = NULL;
75 DB_ENV *envp = NULL;
76
77 thread_t writer_threads[NUMWRITERS];
78 int i, ret, ret_t;
79 u_int32_t env_flags;
80
81 /* Application name */
82 const char *prog_name = "txn_guide_inmemory";
83
84 /* Create the environment */
85 ret = db_env_create(&envp, 0);
86 if (ret != 0) {
87 fprintf(stderr, "Error creating environment handle: %s\n",
88 db_strerror(ret));
89 goto err;
90 }
91
92 env_flags =
93 DB_CREATE | /* Create the environment if it does not exist */
94 DB_INIT_LOCK | /* Initialize the locking subsystem */
95 DB_INIT_LOG | /* Initialize the logging subsystem */
96 DB_INIT_TXN | /* Initialize the transactional subsystem. This
97 * also turns on logging. */
98 DB_INIT_MPOOL | /* Initialize the memory pool (in-memory cache) */
99 DB_PRIVATE | /* Region files are backed by heap memory. */
100 DB_THREAD; /* Cause the environment to be free-threaded */
101
102 /* Specify in-memory logging */
103 ret = envp->log_set_config(envp, DB_LOG_IN_MEMORY, 1);
104 if (ret != 0) {
105 fprintf(stderr, "Error setting log subsystem to in-memory: %s\n",
106 db_strerror(ret));
107 goto err;
108 }
109
110 /*
111 * Specify the size of the in-memory log buffer.
112 */
113 ret = envp->set_lg_bsize(envp, 10 * 1024 * 1024);
114 if (ret != 0) {
115 fprintf(stderr, "Error increasing the log buffer size: %s\n",
116 db_strerror(ret));
117 goto err;
118 }
119
120 /*
121 * Specify the size of the in-memory cache.
122 */
123 ret = envp->set_cachesize(envp, 0, 10 * 1024 * 1024, 1);
124 if (ret != 0) {
125 fprintf(stderr, "Error increasing the cache size: %s\n",
126 db_strerror(ret));
127 goto err;
128 }
129
130 /*
131 * Indicate that we want db to perform lock detection internally.
132 * Also indicate that the transaction with the fewest number of
133 * write locks will receive the deadlock notification in
134 * the event of a deadlock.
135 */
136 ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE);
137 if (ret != 0) {
138 fprintf(stderr, "Error setting lock detect: %s\n",
139 db_strerror(ret));
140 goto err;
141 }
142
143 /* Now actually open the environment */
144 ret = envp->open(envp, NULL, env_flags, 0);
145 if (ret != 0) {
146 fprintf(stderr, "Error opening environment: %s\n",
147 db_strerror(ret));
148 goto err;
149 }
150
151 /*
152 * If we had utility threads (for running checkpoints or
153 * deadlock detection, for example) we would spawn those
154 * here. However, for a simple example such as this,
155 * that is not required.
156 */
157
158 /* Open the database */
159 ret = open_db(&dbp, prog_name, NULL,
160 envp, DB_DUPSORT);
161 if (ret != 0)
162 goto err;
163
164 /* Initialize a mutex. Used to help provide thread ids. */
165 (void)mutex_init(&thread_num_lock, NULL);
166
167 /* Start the writer threads. */
168 for (i = 0; i < NUMWRITERS; i++)
169 (void)thread_create(
170 &writer_threads[i], NULL, writer_thread, (void *)dbp);
171
172 /* Join the writers */
173 for (i = 0; i < NUMWRITERS; i++)
174 (void)thread_join(writer_threads[i], NULL);
175
176 err:
177 /* Close our database handle, if it was opened. */
178 if (dbp != NULL) {
179 ret_t = dbp->close(dbp, 0);
180 if (ret_t != 0) {
181 fprintf(stderr, "%s database close failed.\n",
182 db_strerror(ret_t));
183 ret = ret_t;
184 }
185 }
186
187 /* Close our environment, if it was opened. */
188 if (envp != NULL) {
189 ret_t = envp->close(envp, 0);
190 if (ret_t != 0) {
191 fprintf(stderr, "environment close failed: %s\n",
192 db_strerror(ret_t));
193 ret = ret_t;
194 }
195 }
196
197 /* Final status message and return. */
198 printf("I'm all done.\n");
199 return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
200 }
201
202 /*
203 * A function that performs a series of writes to a
204 * Berkeley DB database. The information written
205 * to the database is largely nonsensical, but the
206 * mechanism of transactional commit/abort and
207 * deadlock detection is illustrated here.
208 */
209 void *
writer_thread(void * args)210 writer_thread(void *args)
211 {
212 static char *key_strings[] = {
213 "key 1", "key 2", "key 3", "key 4", "key 5",
214 "key 6", "key 7", "key 8", "key 9", "key 10"
215 };
216 DB *dbp;
217 DB_ENV *envp;
218 DBT key, value;
219 DB_TXN *txn;
220 int i, j, payload, ret, thread_num;
221 int retry_count, max_retries = 20; /* Max retry on a deadlock */
222
223 dbp = (DB *)args;
224 envp = dbp->get_env(dbp);
225
226 /* Get the thread number */
227 (void)mutex_lock(&thread_num_lock);
228 global_thread_num++;
229 thread_num = global_thread_num;
230 (void)mutex_unlock(&thread_num_lock);
231
232 /* Initialize the random number generator */
233 srand(thread_num);
234
235 /* Write 50 times and then quit */
236 for (i = 0; i < 50; i++) {
237 retry_count = 0; /* Used for deadlock retries */
238
239 /*
240 * Some think it is bad form to loop with a goto statement, but
241 * we do it anyway because it is the simplest and clearest way
242 * to achieve our abort/retry operation.
243 */
244 retry:
245 /* Begin our transaction. We group multiple writes in
246 * this thread under a single transaction so as to
247 * (1) show that you can atomically perform multiple writes
248 * at a time, and (2) to increase the chances of a
249 * deadlock occurring so that we can observe our
250 * deadlock detection at work.
251 *
252 * Normally we would want to avoid the potential for deadlocks,
253 * so for this workload the correct thing would be to perform our
254 * puts with autocommit. But that would excessively simplify our
255 * example, so we do the "wrong" thing here instead.
256 */
257 ret = envp->txn_begin(envp, NULL, &txn, 0);
258 if (ret != 0) {
259 envp->err(envp, ret, "txn_begin failed");
260 return ((void *)EXIT_FAILURE);
261 }
262 for (j = 0; j < 10; j++) {
263 /* Set up our key and values DBTs */
264 memset(&key, 0, sizeof(DBT));
265 key.data = key_strings[j];
266 key.size = (u_int32_t)strlen(key_strings[j]) + 1;
267
268 memset(&value, 0, sizeof(DBT));
269 payload = rand() + i;
270 value.data = &payload;
271 value.size = sizeof(int);
272
273 /* Perform the database put. */
274 switch (ret = dbp->put(dbp, txn, &key, &value, 0)) {
275 case 0:
276 break;
277
278 /*
279 * Here's where we perform deadlock detection. If
280 * DB_LOCK_DEADLOCK is returned by the put operation,
281 * then this thread has been chosen to break a deadlock.
282 * It must abort its operation, and optionally retry the
283 * put.
284 */
285 case DB_LOCK_DEADLOCK:
286 /*
287 * First thing that we MUST do is abort the
288 * transaction.
289 */
290 (void)txn->abort(txn);
291 /*
292 * Now we decide if we want to retry the operation.
293 * If we have retried less than max_retries,
294 * increment the retry count and goto retry.
295 */
296 if (retry_count < max_retries) {
297 printf("Writer %i: Got DB_LOCK_DEADLOCK.\n",
298 thread_num);
299 printf("Writer %i: Retrying write operation.\n",
300 thread_num);
301 retry_count++;
302 goto retry;
303 }
304 /*
305 * Otherwise, just give up.
306 */
307 printf("Writer %i: ", thread_num);
308 printf("Got DB_LOCK_DEADLOCK and out of retries.\n");
309 printf("Writer %i: Giving up.\n", thread_num);
310 return ((void *)EXIT_FAILURE);
311 /*
312 * If a generic error occurs, we simply abort the
313 * transaction and exit the thread completely.
314 */
315 default:
316 envp->err(envp, ret, "db put failed");
317 ret = txn->abort(txn);
318 if (ret != 0)
319 envp->err(envp, ret, "txn abort failed");
320 return ((void *)EXIT_FAILURE);
321 } /** End case statement **/
322
323 } /** End for loop **/
324
325 /*
326 * print the number of records found in the database.
327 * See count_records() for usage information.
328 */
329 printf("Thread %i. Record count: %i\n", thread_num,
330 count_records(dbp, txn));
331
332 /*
333 * If all goes well, we can commit the transaction and
334 * exit the thread.
335 */
336 ret = txn->commit(txn, 0);
337 if (ret != 0) {
338 envp->err(envp, ret, "txn commit failed");
339 return ((void *)EXIT_FAILURE);
340 }
341 }
342 return ((void *)EXIT_SUCCESS);
343 }
344
345 /*
346 * This simply counts the number of records contained in the
347 * database and returns the result. You can use this function
348 * in three ways:
349 *
350 * First call it with an active txn handle (this is what the
351 * example currently does).
352 *
353 * Secondly, configure the cursor for uncommitted reads.
354 *
355 * Third, call count_records AFTER the writer has committed
356 * its transaction.
357 *
358 * If you do none of these things, the writer thread will
359 * self-deadlock.
360 *
361 * Note that this function exists only for illustrative purposes.
362 * A more straight-forward way to count the number of records in
363 * a database is to use DB->stat() or DB->stat_print().
364 */
365
366 int
count_records(DB * dbp,DB_TXN * txn)367 count_records(DB *dbp, DB_TXN *txn)
368 {
369 DBT key, value;
370 DBC *cursorp;
371 int count, ret;
372
373 cursorp = NULL;
374 count = 0;
375
376 /* Get the cursor */
377 ret = dbp->cursor(dbp, txn, &cursorp, 0);
378 if (ret != 0) {
379 dbp->err(dbp, ret,
380 "count_records: cursor open failed.");
381 goto cursor_err;
382 }
383
384 /* Get the key DBT used for the database read */
385 memset(&key, 0, sizeof(DBT));
386 memset(&value, 0, sizeof(DBT));
387 do {
388 ret = cursorp->get(cursorp, &key, &value, DB_NEXT);
389 switch (ret) {
390 case 0:
391 count++;
392 break;
393 case DB_NOTFOUND:
394 break;
395 default:
396 dbp->err(dbp, ret,
397 "Count records unspecified error");
398 goto cursor_err;
399 }
400 } while (ret == 0);
401
402 cursor_err:
403 if (cursorp != NULL) {
404 ret = cursorp->close(cursorp);
405 if (ret != 0) {
406 dbp->err(dbp, ret,
407 "count_records: cursor close failed.");
408 }
409 }
410
411 return (count);
412 }
413
414 /* Open a Berkeley DB database */
415 int
open_db(DB ** dbpp,const char * progname,const char * file_name,DB_ENV * envp,u_int32_t extra_flags)416 open_db(DB **dbpp, const char *progname, const char *file_name,
417 DB_ENV *envp, u_int32_t extra_flags)
418 {
419 int ret;
420 u_int32_t open_flags;
421 DB *dbp;
422
423 /* Initialize the DB handle */
424 ret = db_create(&dbp, envp, 0);
425 if (ret != 0) {
426 fprintf(stderr, "%s: %s\n", progname,
427 db_strerror(ret));
428 return (EXIT_FAILURE);
429 }
430
431 /* Point to the memory malloc'd by db_create() */
432 *dbpp = dbp;
433
434 if (extra_flags != 0) {
435 ret = dbp->set_flags(dbp, extra_flags);
436 if (ret != 0) {
437 dbp->err(dbp, ret,
438 "open_db: Attempt to set extra flags failed.");
439 return (EXIT_FAILURE);
440 }
441 }
442
443 /* Now open the database */
444 open_flags = DB_CREATE | /* Allow database creation */
445 DB_THREAD |
446 DB_AUTO_COMMIT; /* Allow autocommit */
447
448 ret = dbp->open(dbp, /* Pointer to the database */
449 NULL, /* Txn pointer */
450 file_name, /* File name */
451 NULL, /* Logical db name */
452 DB_BTREE, /* Database type (using btree) */
453 open_flags, /* Open flags */
454 0); /* File mode. Using defaults */
455
456 if (ret != 0) {
457 dbp->err(dbp, ret, "Database open failed");
458 return (EXIT_FAILURE);
459 }
460 return (EXIT_SUCCESS);
461 }
462