1 /*-
2  * Copyright (c) 2006, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file EXAMPLES-LICENSE for license information.
5  *
6  * $Id$
7  */
8 
9 #include <errno.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <time.h>
13 
14 #include <db.h>
15 
16 #include "rep_common.h"
17 
/* Cache size handed to DB_ENV->set_cachesize() by env_init(): 10 * 1024 * 1024. */
#define	CACHESIZE	(10 * 1024 * 1024)
/* Name of the stock quote database opened by doloop(). */
#define	DATABASE	"quote.db"
/* NOTE(review): not referenced by any code visible in this file. */
#define	SLEEPTIME	3

/*
 * Definition of thread-specific data key for PERM_FAILED structure
 * stored in thread local storage.
 *
 * The key is only defined here; the threads in this file store their
 * own permfail_t under it with thread_setspecific() and read it back
 * with thread_getspecific().
 * NOTE(review): the key is not created in this file -- presumably the
 * main program does that before any thread uses it; confirm.
 */
#ifdef _WIN32
/* Windows style. */
DWORD permfail_key;
#else
/* Posix style. */
pthread_key_t permfail_key;
#endif

/* Forward declaration of the file-local helper used by doloop(). */
static int print_stocks __P((DB *));
35 
36 /*
37  * Perform command line parsing and common replication setup for the repmgr
38  * and base replication example programs.
39  */
40 int
common_rep_setup(dbenv,argc,argv,setup_info)41 common_rep_setup(dbenv, argc, argv, setup_info)
42 	DB_ENV *dbenv;
43 	int argc;
44 	char *argv[];
45 	SETUP_DATA *setup_info;
46 {
47 	repsite_t site;
48 	extern char *optarg;
49 	char ch, *last_colon, *portstr;
50 	int ack_policy, got_self, is_repmgr, maxsites, priority, ret;
51 
52 	got_self = is_repmgr = maxsites = ret = 0;
53 	site.peer = site.creator = 0;
54 
55 	priority = 100;
56 	ack_policy = DB_REPMGR_ACKS_QUORUM;
57 	setup_info->role = UNKNOWN;
58 	if (strncmp(setup_info->progname, "ex_rep_mgr", 10) == 0)
59 		is_repmgr = 1;
60 
61 	/*
62 	 * Replication setup calls that are only needed if a command
63 	 * line option is specified are made within this while/switch
64 	 * statement.  Replication setup calls that should be made
65 	 * whether or not a command line option is specified are after
66 	 * this while/switch statement.
67 	 */
68 	while ((ch = getopt(argc, argv, "a:bCh:L:l:Mn:p:R:r:v")) != EOF) {
69 		switch (ch) {
70 		case 'a':
71 			if (!is_repmgr)
72 				usage(is_repmgr, setup_info->progname);
73 			if (strncmp(optarg, "all", 3) == 0)
74 				ack_policy = DB_REPMGR_ACKS_ALL;
75 			else if (strncmp(optarg, "quorum", 6) != 0)
76 				usage(is_repmgr, setup_info->progname);
77 			break;
78 		case 'b':
79 			/*
80 			 * Configure bulk transfer to send groups of records
81 			 * to clients in a single network transfer.  This is
82 			 * useful for master sites and clients participating
83 			 * in client-to-client synchronization.
84 			 */
85 			if ((ret = dbenv->rep_set_config(dbenv,
86 			    DB_REP_CONF_BULK, 1)) != 0) {
87 				dbenv->err(dbenv, ret,
88 				    "Could not configure bulk transfer.\n");
89 				goto err;
90 			}
91 			break;
92 		case 'C':
93 			setup_info->role = CLIENT;
94 			break;
95 		case 'h':
96 			setup_info->home = optarg;
97 			break;
98 		case 'L':
99 			if (!is_repmgr)
100 				usage(is_repmgr, setup_info->progname);
101 			setup_info->self.creator = 1; /* FALLTHROUGH */
102 		case 'l':
103 			setup_info->self.host = optarg;
104 			/*
105 			 * The final colon in host:port string is the
106 			 * boundary between the host and the port portions
107 			 * of the string.
108 			 */
109 			if ((last_colon = strrchr(optarg, ':')) == NULL ) {
110 				fprintf(stderr,
111 				    "Bad local host specification.\n");
112 				goto err;
113 			}
114 			/*
115 			 * Separate the host and port portions of the
116 			 * string for further processing.
117 			 */
118 			portstr = last_colon + 1;
119 			*last_colon = '\0';
120 			setup_info->self.port = (unsigned short)atoi(portstr);
121 			setup_info->self.peer = 0;
122 			got_self = 1;
123 			break;
124 		case 'M':
125 			setup_info->role = MASTER;
126 			break;
127 		case 'n':
128 			if (is_repmgr)
129 				usage(is_repmgr, setup_info->progname);
130 			setup_info->nsites = atoi(optarg);
131 			/*
132 			 * For repmgr, using group membership, we cannot
133 			 * set this any more. For base replication, nsites
134 			 * is simply passed back to main for use in its
135 			 * communications and election processing.
136 			 */
137 			if (setup_info->nsites > 0 &&
138 			    (ret = dbenv->rep_set_nsites(dbenv,
139 			    setup_info->nsites)) != 0) {
140 				dbenv->err(dbenv, ret,
141 				    "Could not set nsites.\n");
142 				goto err;
143 			}
144 			break;
145 		case 'p':
146 			priority = atoi(optarg);
147 			break;
148 		case 'R':
149 			if (!is_repmgr)
150 				usage(is_repmgr, setup_info->progname);
151 			site.peer = 1; /* FALLTHROUGH */
152 		case 'r':
153 			site.host = optarg;
154 			/*
155 			 * The final colon in host:port string is the
156 			 * boundary between the host and the port portions
157 			 * of the string.
158 			 */
159 			if ((last_colon = strrchr(site.host, ':')) == NULL ) {
160 				fprintf(stderr,
161 				    "Bad remote host specification.\n");
162 				goto err;
163 			}
164 			/*
165 			 * Separate the host and port portions of the
166 			 * string for further processing.
167 			 */
168 			portstr = last_colon + 1;
169 			*last_colon = '\0';
170 			site.port = (unsigned short)atoi(portstr);
171 			if (setup_info->site_list == NULL ||
172 			    setup_info->remotesites >= maxsites) {
173 				maxsites = maxsites == 0 ? 10 : 2 * maxsites;
174 				if ((setup_info->site_list =
175 				    realloc(setup_info->site_list,
176 				    maxsites * sizeof(repsite_t))) == NULL) {
177 					fprintf(stderr, "System error %s\n",
178 					    strerror(errno));
179 					goto err;
180 				}
181 			}
182 			(setup_info->site_list)[(setup_info->remotesites)++] =
183 				site;
184 			site.peer = 0;
185 			break;
186 		case 'v':
187 			if ((ret = dbenv->set_verbose(dbenv,
188 			    DB_VERB_REPLICATION, 1)) != 0)
189 				goto err;
190 			break;
191 		case '?':
192 		default:
193 			usage(is_repmgr, setup_info->progname);
194 		}
195 	}
196 
197 	/* Error check command line. */
198 	if (!got_self || setup_info->home == NULL)
199 		usage(is_repmgr, setup_info->progname);
200 	if (!is_repmgr && setup_info->role == UNKNOWN) {
201 		fprintf(stderr, "Must specify -M or -C.\n");
202 		goto err;
203 	}
204 
205 	/*
206 	 * Set replication group election priority for this environment.
207 	 * An election first selects the site with the most recent log
208 	 * records as the new master.  If multiple sites have the most
209 	 * recent log records, the site with the highest priority value
210 	 * is selected as master.
211 	 */
212 	if ((ret = dbenv->rep_set_priority(dbenv, priority)) != 0) {
213 		dbenv->err(dbenv, ret, "Could not set priority.\n");
214 		goto err;
215 	}
216 
217 	/*
218 	 * For repmgr, set the policy that determines how master and client
219 	 * sites handle acknowledgement of replication messages needed for
220 	 * permanent records.  The default policy of "quorum" requires only
221 	 * a quorum of electable peers sufficient to ensure a permanent
222 	 * record remains durable if an election is held.  The "all" option
223 	 * requires all clients to acknowledge a permanent replication
224 	 * message instead.
225 	 */
226 	if (is_repmgr &&
227 	    (ret = dbenv->repmgr_set_ack_policy(dbenv, ack_policy)) != 0) {
228 		dbenv->err(dbenv, ret, "Could not set ack policy.\n");
229 		goto err;
230 	}
231 
232 	/*
233 	 * Set the threshold for the minimum and maximum time the client
234 	 * waits before requesting retransmission of a missing message.
235 	 * Base these values on the performance and load characteristics
236 	 * of the master and client host platforms as well as the round
237 	 * trip message time.
238 	 */
239 	if ((ret = dbenv->rep_set_request(dbenv, 20000, 500000)) != 0) {
240 		dbenv->err(dbenv, ret,
241 		    "Could not set client_retransmission defaults.\n");
242 		goto err;
243 	}
244 
245 	/*
246 	 * Configure deadlock detection to ensure that any deadlocks
247 	 * are broken by having one of the conflicting lock requests
248 	 * rejected. DB_LOCK_DEFAULT uses the lock policy specified
249 	 * at environment creation time or DB_LOCK_RANDOM if none was
250 	 * specified.
251 	 */
252 	if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) {
253 		dbenv->err(dbenv, ret,
254 		    "Could not configure deadlock detection.\n");
255 		goto err;
256 	}
257 
258 	/* The following base replication features may also be useful to your
259 	 * application. See Berkeley DB documentation for more details.
260 	 *   - Master leases: Provide stricter consistency for data reads
261 	 *     on a master site.
262 	 *   - Timeouts: Customize the amount of time Berkeley DB waits
263 	 *     for such things as an election to be concluded or a master
264 	 *     lease to be granted.
265 	 *   - Delayed client synchronization: Manage the master site's
266 	 *     resources by spreading out resource-intensive client
267 	 *     synchronizations.
268 	 *   - Blocked client operations: Return immediately with an error
269 	 *     instead of waiting indefinitely if a client operation is
270 	 *     blocked by an ongoing client synchronization.
271 	 */
272 
273 err:
274 	return (ret);
275 }
276 
277 static int
print_stocks(dbp)278 print_stocks(dbp)
279 	DB *dbp;
280 {
281 	DBC *dbc;
282 	DBT key, data;
283 #define	MAXKEYSIZE	10
284 #define	MAXDATASIZE	20
285 	char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1];
286 	int ret, t_ret;
287 	u_int32_t keysize, datasize;
288 
289 	if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0) {
290 		dbp->err(dbp, ret, "can't open cursor");
291 		return (ret);
292 	}
293 
294 	memset(&key, 0, sizeof(key));
295 	memset(&data, 0, sizeof(data));
296 
297 	printf("\tSymbol\tPrice\n");
298 	printf("\t======\t=====\n");
299 
300 	for (ret = dbc->get(dbc, &key, &data, DB_FIRST);
301 	    ret == 0;
302 	    ret = dbc->get(dbc, &key, &data, DB_NEXT)) {
303 		keysize = key.size > MAXKEYSIZE ? MAXKEYSIZE : key.size;
304 		memcpy(keybuf, key.data, keysize);
305 		keybuf[keysize] = '\0';
306 
307 		datasize = data.size >= MAXDATASIZE ? MAXDATASIZE : data.size;
308 		memcpy(databuf, data.data, datasize);
309 		databuf[datasize] = '\0';
310 
311 		printf("\t%s\t%s\n", keybuf, databuf);
312 	}
313 	printf("\n");
314 	fflush(stdout);
315 
316 	if ((t_ret = dbc->close(dbc)) != 0 && ret == 0)
317 		ret = t_ret;
318 
319 	switch (ret) {
320 	case 0:
321 	case DB_NOTFOUND:
322 	case DB_LOCK_DEADLOCK:
323 		return (0);
324 	default:
325 		return (ret);
326 	}
327 }
328 
329 /* Start checkpoint and log archive support threads. */
330 int
start_support_threads(dbenv,sup_args,ckp_thr,lga_thr)331 start_support_threads(dbenv, sup_args, ckp_thr, lga_thr)
332 	DB_ENV *dbenv;
333 	supthr_args *sup_args;
334 	thread_t *ckp_thr;
335 	thread_t *lga_thr;
336 {
337 	int ret;
338 
339 	ret = 0;
340 	if ((ret = thread_create(ckp_thr, NULL, checkpoint_thread,
341 	    sup_args)) != 0) {
342 		dbenv->errx(dbenv, "can't create checkpoint thread");
343 		goto err;
344 	}
345 	if ((ret = thread_create(lga_thr, NULL, log_archive_thread,
346 	    sup_args)) != 0)
347 		dbenv->errx(dbenv, "can't create log archive thread");
348 err:
349 	return (ret);
350 
351 }
352 
353 /* Wait for checkpoint and log archive support threads to finish. */
354 int
finish_support_threads(ckp_thr,lga_thr)355 finish_support_threads(ckp_thr, lga_thr)
356 	thread_t *ckp_thr;
357 	thread_t *lga_thr;
358 {
359 	void *ctstatus, *ltstatus;
360 	int ret;
361 
362 	ret = 0;
363 	if (thread_join(*lga_thr, &ltstatus) ||
364 	    thread_join(*ckp_thr, &ctstatus)) {
365 		ret = -1;
366 		goto err;
367 	}
368 	if ((uintptr_t)ltstatus != EXIT_SUCCESS ||
369 	    (uintptr_t)ctstatus != EXIT_SUCCESS)
370 		ret = -1;
371 err:
372 	return (ret);
373 }
374 
375 #define	BUFSIZE 1024
376 
377 int
doloop(dbenv,shared_data)378 doloop(dbenv, shared_data)
379 	DB_ENV *dbenv;
380 	SHARED_DATA *shared_data;
381 {
382 	DB *dbp;
383 	DBT key, data;
384 	permfail_t *pfinfo;
385 	char buf[BUFSIZE], *first, *price;
386 	u_int32_t flags;
387 	int ret;
388 
389 	dbp = NULL;
390 	pfinfo = NULL;
391 	ret = 0;
392 	memset(&key, 0, sizeof(key));
393 	memset(&data, 0, sizeof(data));
394 
395 	/* Allocate put/commit thread's PERM_FAILED structure. */
396 	if (shared_data->is_repmgr) {
397 		if ((pfinfo = malloc(sizeof(permfail_t))) == NULL)
398 			goto err;
399 		if ((ret = thread_setspecific(permfail_key, pfinfo)) != 0)
400 			goto err;
401 		pfinfo->thread_name = "PutCommit";
402 		pfinfo->flag = 0;
403 	}
404 
405 	for (;;) {
406 		printf("QUOTESERVER%s> ",
407 		    shared_data->is_master ? "" : " (read-only)");
408 		fflush(stdout);
409 
410 		if (fgets(buf, sizeof(buf), stdin) == NULL)
411 			break;
412 
413 #define	DELIM " \t\n"
414 		if ((first = strtok(&buf[0], DELIM)) == NULL) {
415 			/* Blank input line. */
416 			price = NULL;
417 		} else if ((price = strtok(NULL, DELIM)) == NULL) {
418 			/* Just one input token. */
419 			if (strncmp(buf, "exit", 4) == 0 ||
420 			    strncmp(buf, "quit", 4) == 0) {
421 				/*
422 				 * This makes the checkpoint and log
423 				 * archive threads stop.
424 				 */
425 				shared_data->app_finished = 1;
426 				break;
427 			}
428 			dbenv->errx(dbenv, "Format: TICKER VALUE");
429 			continue;
430 		} else {
431 			/* Normal two-token input line. */
432 			if (first != NULL && !shared_data->is_master) {
433 				dbenv->errx(dbenv, "Can't update at client");
434 				continue;
435 			}
436 		}
437 
438 		if (dbp == NULL) {
439 			if ((ret = db_create(&dbp, dbenv, 0)) != 0)
440 				return (ret);
441 
442 			flags = DB_AUTO_COMMIT;
443 			/*
444 			 * Open database with DB_CREATE only if this is
445 			 * a master database.  A client database uses
446 			 * polling to attempt to open the database without
447 			 * DB_CREATE until it is successful.
448 			 *
449 			 * This DB_CREATE polling logic can be simplified
450 			 * under some circumstances.  For example, if the
451 			 * application can be sure a database is already
452 			 * there, it would never need to open it with
453 			 * DB_CREATE.
454 			 */
455 			if (shared_data->is_master)
456 				flags |= DB_CREATE;
457 			if ((ret = dbp->open(dbp,
458 			    NULL, DATABASE, NULL, DB_BTREE, flags, 0)) != 0) {
459 				if (ret == ENOENT) {
460 					printf(
461 					  "No stock database yet available.\n");
462 					if ((ret = dbp->close(dbp, 0)) != 0) {
463 						dbenv->err(dbenv, ret,
464 						    "DB->close");
465 						goto err;
466 					}
467 					dbp = NULL;
468 					continue;
469 				}
470 				if (ret == DB_REP_HANDLE_DEAD ||
471 				    ret == DB_LOCK_DEADLOCK) {
472 					dbenv->err(dbenv, ret,
473 					    "please retry the operation");
474 					dbp->close(dbp, DB_NOSYNC);
475 					dbp = NULL;
476 					continue;
477 				}
478 				dbenv->err(dbenv, ret, "DB->open");
479 				goto err;
480 			}
481 			/* Check this thread's PERM_FAILED indicator. */
482 			if (shared_data->is_repmgr) {
483 				pfinfo = (permfail_t *)thread_getspecific(
484 				    permfail_key);
485 				if (pfinfo->flag)
486 					printf(
487 					    "%s Thread: dbopen not durable.\n",
488 					    pfinfo->thread_name);
489 				pfinfo->flag = 0;
490 			}
491 		}
492 
493 		if (first == NULL) {
494 			/*
495 			 * If this is a client in the middle of
496 			 * synchronizing with the master, the client data
497 			 * is possibly stale and won't be displayed until
498 			 * client synchronization is finished.  It is also
499 			 * possible to display the stale data if this is
500 			 * acceptable to the application.
501 			 */
502 			if (shared_data->in_client_sync)
503 				printf(
504 "Cannot read data during client synchronization - please try again.\n");
505 			else
506 				switch ((ret = print_stocks(dbp))) {
507 				case 0:
508 					break;
509 				case DB_REP_HANDLE_DEAD:
510 					(void)dbp->close(dbp, DB_NOSYNC);
511 					dbp = NULL;
512 					break;
513 				default:
514 					dbp->err(dbp, ret,
515 					    "Error traversing data");
516 					goto err;
517 				}
518 		} else {
519 			key.data = first;
520 			key.size = (u_int32_t)strlen(first);
521 
522 			data.data = price;
523 			data.size = (u_int32_t)strlen(price);
524 
525 			if ((ret = dbp->put(dbp,
526 				 NULL, &key, &data, DB_AUTO_COMMIT)) != 0) {
527 				dbp->err(dbp, ret, "DB->put");
528 				goto err;
529 			}
530 			/* Check this thread's PERM_FAILED indicator. */
531 			if (shared_data->is_repmgr) {
532 				pfinfo = (permfail_t *)thread_getspecific(
533 				    permfail_key);
534 				if (pfinfo->flag)
535 					printf(
536 		    "%s Thread: put %s %s not durable.\n",
537 					    pfinfo->thread_name, first, price);
538 				pfinfo->flag = 0;
539 			}
540 		}
541 	}
542 
543 err:	if (dbp != NULL)
544 		(void)dbp->close(dbp, DB_NOSYNC);
545 	if (pfinfo != NULL)
546 		free(pfinfo);
547 	return (ret);
548 }
549 
550 int
create_env(progname,dbenvp)551 create_env(progname, dbenvp)
552 	const char *progname;
553 	DB_ENV **dbenvp;
554 {
555 	DB_ENV *dbenv;
556 	int ret;
557 
558 	if ((ret = db_env_create(&dbenv, 0)) != 0) {
559 		fprintf(stderr, "can't create env handle: %s\n",
560 		    db_strerror(ret));
561 		return (ret);
562 	}
563 
564 	dbenv->set_errfile(dbenv, stderr);
565 	dbenv->set_errpfx(dbenv, progname);
566 
567 	*dbenvp = dbenv;
568 	return (0);
569 }
570 
571 /* Open and configure an environment. */
572 int
env_init(dbenv,home)573 env_init(dbenv, home)
574 	DB_ENV *dbenv;
575 	const char *home;
576 {
577 	u_int32_t flags;
578 	int ret;
579 
580 	(void)dbenv->set_cachesize(dbenv, 0, CACHESIZE, 0);
581 	(void)dbenv->set_flags(dbenv, DB_TXN_NOSYNC, 1);
582 
583 	flags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL |
584 	    DB_INIT_REP | DB_INIT_TXN | DB_RECOVER | DB_THREAD;
585 	if ((ret = dbenv->open(dbenv, home, flags, 0)) != 0)
586 		dbenv->err(dbenv, ret, "can't open environment");
587 	return (ret);
588 }
589 
590 /*
591  * In this application, we specify all communication via the command line.  In
592  * a real application, we would expect that information about the other sites
593  * in the system would be maintained in some sort of configuration file.  The
594  * critical part of this interface is that we assume at startup that we can
595  * find out
596  *	1) what host/port we wish to listen on for connections,
597  *	2) a (possibly empty) list of other sites we should attempt to connect
598  *	to; and
599  *	3) what our Berkeley DB home environment is.
600  *
601  * These pieces of information are expressed by the following flags.
602  * -a all|quorum (optional; repmgr only, a stands for ack policy)
603  * -b (optional, b stands for bulk)
604  * -C or -M start up as client or master (optional for repmgr, required
605  *      for base example)
606  * -h home directory (required)
607  * -l host:port (required for base example;
608  *      required for repmgr unless -L is specified; l stands for local)
609  * -L host:port (optional; repmgr only, local site will be the group creator)
610  * -n nsites (optional; base example only, number of sites in replication group;
611  *      defaults to 0 in which case we try to dynamically compute the
612  *      number of sites in the replication group)
613  * -p priority (optional: defaults to 100)
614  * -r host:port (optional; r stands for remote; any number of these may be
615  *	specified)
616  * -R host:port (optional; repmgr only, remote peer)
617  * -v (optional; v stands for verbose)
618  */
void
usage(is_repmgr, progname)
	const int is_repmgr;
	const char *progname;
{
	/* Common prefix, then the synopsis matching the example program. */
	fprintf(stderr, "usage: %s ", progname);
	if (!is_repmgr)
		/* Base replication example. */
		fprintf(stderr, "-CM -h home -l host:port[-r host:port]%s",
		    "[-b][-n nsites][-p priority][-v]\n");
	else
		/* Replication manager example. */
		fprintf(stderr, "[-CM]-h home -l|-L host:port %s%s\n",
		    "[-r host:port][-R host:port][-a all|quorum]",
		    "[-b][-p priority][-v]");

	/* This function never returns to its caller. */
	exit(EXIT_FAILURE);
}
634 
635 /*
636  * This is a very simple thread that performs checkpoints at a fixed
637  * time interval.  For a master site, the time interval is one minute
638  * plus the duration of the checkpoint_delay timeout (30 seconds by
639  * default.)  For a client site, the time interval is one minute.
640  */
641 void *
checkpoint_thread(args)642 checkpoint_thread(args)
643 	void *args;
644 {
645 	DB_ENV *dbenv;
646 	SHARED_DATA *shared;
647 	permfail_t *pfinfo;
648 	supthr_args *ca;
649 	int i, ret;
650 
651 	ca = (supthr_args *)args;
652 	dbenv = ca->dbenv;
653 	shared = ca->shared;
654 	pfinfo = NULL;
655 
656 	/* Allocate checkpoint thread's PERM_FAILED structure. */
657 	if (shared->is_repmgr) {
658 		if ((pfinfo = malloc(sizeof(permfail_t))) == NULL)
659 			return ((void *)EXIT_FAILURE);
660 		if ((ret = thread_setspecific(permfail_key, pfinfo)) != 0)
661 			return ((void *)EXIT_FAILURE);
662 		pfinfo->thread_name = "Checkpoint";
663 		pfinfo->flag = 0;
664 	}
665 
666 	for (;;) {
667 		/*
668 		 * Wait for one minute, polling once per second to see if
669 		 * application has finished.  When application has finished,
670 		 * terminate this thread.
671 		 */
672 		for (i = 0; i < 60; i++) {
673 			sleep(1);
674 			if (shared->app_finished == 1) {
675 				if (pfinfo != NULL)
676 					free(pfinfo);
677 				return ((void *)EXIT_SUCCESS);
678 			}
679 		}
680 
681 		/* Perform a checkpoint. */
682 		if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0)) != 0) {
683 			dbenv->err(dbenv, ret,
684 			    "Could not perform checkpoint.\n");
685 			if (pfinfo != NULL)
686 				free(pfinfo);
687 			return ((void *)EXIT_FAILURE);
688 		}
689 		/* Check this thread's PERM_FAILED indicator. */
690 		if (shared->is_repmgr) {
691 			pfinfo = (permfail_t *)thread_getspecific(
692 			    permfail_key);
693 			if (pfinfo->flag)
694 				printf("%s Thread: checkpoint not durable.\n",
695 				    pfinfo->thread_name);
696 			pfinfo->flag = 0;
697 		}
698 	}
699 }
700 
701 /*
702  * This is a simple log archive thread.  Once per minute, it removes all but
703  * the most recent 3 logs that are safe to remove according to a call to
704  * DB_ENV->log_archive().
705  *
706  * Log cleanup is needed to conserve disk space, but aggressive log cleanup
707  * can cause more frequent client initializations if a client lags too far
708  * behind the current master.  This can happen in the event of a slow client,
709  * a network partition, or a new master that has not kept as many logs as the
710  * previous master.
711  *
712  * The approach in this routine balances the need to mitigate against a
713  * lagging client by keeping a few more of the most recent unneeded logs
714  * with the need to conserve disk space by regularly cleaning up log files.
715  * Use of automatic log removal (DB_ENV->log_set_config() DB_LOG_AUTO_REMOVE
716  * flag) is not recommended for replication due to the risk of frequent
717  * client initializations.
718  */
719 void *
log_archive_thread(args)720 log_archive_thread(args)
721 	void *args;
722 {
723 	DB_ENV *dbenv;
724 	SHARED_DATA *shared;
725 	char **begin, **list;
726 	supthr_args *la;
727 	int i, listlen, logs_to_keep, minlog, ret;
728 
729 	la = (supthr_args *)args;
730 	dbenv = la->dbenv;
731 	shared = la->shared;
732 	logs_to_keep = 3;
733 
734 	for (;;) {
735 		/*
736 		 * Wait for one minute, polling once per second to see if
737 		 * application has finished.  When application has finished,
738 		 * terminate this thread.
739 		 */
740 		for (i = 0; i < 60; i++) {
741 			sleep(1);
742 			if (shared->app_finished == 1)
743 				return ((void *)EXIT_SUCCESS);
744 		}
745 
746 		/* Get the list of unneeded log files. */
747 		if ((ret = dbenv->log_archive(dbenv, &list, DB_ARCH_ABS))
748 		    != 0) {
749 			dbenv->err(dbenv, ret,
750 			    "Could not get log archive list.");
751 			return ((void *)EXIT_FAILURE);
752 		}
753 		if (list != NULL) {
754 			listlen = 0;
755 			/* Get the number of logs in the list. */
756 			for (begin = list; *begin != NULL; begin++, listlen++);
757 			/*
758 			 * Remove all but the logs_to_keep most recent
759 			 * unneeded log files.
760 			 */
761 			minlog = listlen - logs_to_keep;
762 			for (begin = list, i= 0; i < minlog; list++, i++) {
763 				if ((ret = unlink(*list)) != 0) {
764 					dbenv->err(dbenv, ret,
765 					    "logclean: remove %s", *list);
766 					dbenv->errx(dbenv,
767 					    "logclean: Error remove %s", *list);
768 					free(begin);
769 					return ((void *)EXIT_FAILURE);
770 				}
771 			}
772 			free(begin);
773 		}
774 	}
775 }
776