1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2001, 2013 Oracle and/or its affiliates.  All rights reserved.
5  *
6  * $Id$
7  */
8 
9 /*
10  * In this application, we specify all communication via the command line.  In
11  * a real application, we would expect that information about the other sites
12  * in the system would be maintained in some sort of configuration file.  The
13  * critical part of this interface is that we assume at startup that we can
14  * find out
15  * 	1) what our Berkeley DB home environment is,
16  * 	2) what host/port we wish to listen on for connections; and
17  * 	3) an optional list of other sites we should attempt to connect to.
18  *
19  * These pieces of information are expressed by the following flags.
20  * -h home (required; h stands for home directory)
21  * -l host:port (required unless -L is specified; l stands for local)
22  * -L host:port (optional, L means group creator)
23  * -C or -M (optional; start up as client or master)
24  * -r host:port (optional; r stands for remote; any number of these may be
25  *	specified)
26  * -R host:port (optional; R stands for remote peer; only one of these may
27  *      be specified)
28  * -a all|quorum (optional; a stands for ack policy)
29  * -b (optional; b stands for bulk)
30  * -p priority (optional; defaults to 100)
31  * -v (optional; v stands for verbose)
32  */
33 
#include <iostream>
#include <string>
#include <sstream>
#include <vector>

#include <db_cxx.h>
#include "StlRepConfigInfo.h"
#include "dbstl_map.h"
41 
42 using std::cout;
43 using std::cin;
44 using std::cerr;
45 using std::endl;
46 using std::flush;
47 using std::istream;
48 using std::istringstream;
49 using std::string;
50 using std::getline;
51 using namespace dbstl;
// Cache size, in bytes, passed to set_cachesize() in init() (10 MB).
#define	CACHESIZE	(10 * 1024 * 1024)
// Name of the Btree database file that holds the stock quotes.
#define	DATABASE	"quote.db"

// Program name used as the Berkeley DB error prefix and in usage().
const char *progname = "exstl_repquote";
56 
#include <errno.h>
#ifdef _WIN32
// Windows build: map the POSIX facilities used by this example onto their
// Win32 equivalents.
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#define	snprintf		_snprintf
// Win32 Sleep() takes milliseconds; callers of sleep() pass seconds.
#define	sleep(s)		Sleep(1000 * (s))

// getopt() and its globals are only declared here.  NOTE(review): they are
// presumably supplied by a getopt implementation linked into the Windows
// build -- confirm.
extern "C" {
extern int getopt(int, char * const *, const char *);
extern char *optarg;
extern int optind;
}

// Minimal thread abstraction for the checkpoint and log archive threads.
// thread_create() and thread_join() evaluate to 0 on success and nonzero on
// failure; the attr argument is ignored on Windows.
typedef HANDLE thread_t;
typedef DWORD thread_exit_status_t;
#define	thread_create(thrp, attr, func, arg)				   \
    (((*(thrp) = CreateThread(NULL, 0,					   \
	(LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
#define	thread_join(thr, statusp)					   \
    ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&		   \
    GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
#else /* !_WIN32 */
#include <pthread.h>

// POSIX build: thin wrappers over pthread_create() and pthread_join(),
// which return 0 on success and an error number on failure.
typedef pthread_t thread_t;
typedef void* thread_exit_status_t;
#define	thread_create(thrp, attr, func, arg)				   \
    pthread_create((thrp), (attr), (func), (arg))
#define	thread_join(thr, statusp) pthread_join((thr), (statusp))
#endif
87 
/*
 * Application state handed to Berkeley DB through the environment's
 * app_private pointer, so that the event callback and the support threads
 * can reach it.
 */
struct APP_DATA {
	// Set once the user quits; the support threads poll it to exit.
	bool app_finished;
	// True while a client is still synchronizing with the master.
	bool in_client_sync;
	// True while this site is the replication master.
	bool is_master;
	// When true, replication events are logged.
	bool verbose;
};

static void log(const char *msg);
void *checkpoint_thread(void *args);
void *log_archive_thread(void *args);
99 
// Interactive stock-quote server built on a replicated Berkeley DB
// environment.  Quotes live in a Btree database accessed through a dbstl
// map of C-string symbol -> C-string price; updates are only accepted while
// this site is the master (see doloop()).
class RepQuoteExample
{
public:
	// dbstl map view of the quote database (char * keys and data).
	typedef db_map<char *, char *, ElementHolder<char *> > str_map_t;
	RepQuoteExample();
	// Configure and open the environment, start the support threads
	// and start the replication manager.
	void init(RepConfigInfo* config);
	// Read commands from stdin until "quit"/"exit" or end of input.
	void doloop();
	// Join the support threads and flush the log.
	int terminate();

	// Replication event handler registered with set_event_notify().
	static void event_callback(DbEnv * dbenv, u_int32_t which, void *info);

private:
	// Copying is disabled: these are declared but not defined.
	RepQuoteExample(const RepQuoteExample &);
	void operator = (const RepQuoteExample &);

	// internal data members.
	// app_data: state shared with callbacks/threads via app_private.
	// app_config: configuration passed to init(); not owned.
	// cur_env: environment allocated by the constructor.
	// dbp: quote database handle; NULL until open_db() succeeds.
	// strmap: dbstl map view over dbp.
	// ckp_thr/lga_thr: checkpoint and log archive threads.
	APP_DATA		app_data;
	RepConfigInfo   *app_config;
	DbEnv		   *cur_env;
	Db *dbp;
	str_map_t *strmap;
	thread_t ckp_thr;
	thread_t lga_thr;

	// private methods.
	void print_stocks();
	void prompt();
	bool open_db(bool creating);
	// Drop the map view, then close the dbstl-registered database
	// through dbstl::close_db() and forget the handle.
	void close_db(){
		delete strmap;
		strmap = NULL;
		dbstl::close_db(dbp);
		dbp = NULL;
	}
	static void close_db(Db *&);// Close an unregistered Db handle.
};
137 
open_db(bool creating)138 bool RepQuoteExample::open_db(bool creating)
139 {
140 	int ret;
141 
142 	if (dbp)
143 		return true;
144 
145 	dbp = new Db(cur_env, DB_CXX_NO_EXCEPTIONS);
146 
147 	u_int32_t flags = DB_AUTO_COMMIT | DB_THREAD;
148 	if (creating)
149 		flags |= DB_CREATE;
150 
151 	ret = dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0);
152 	switch (ret) {
153 	case 0:
154 		register_db(dbp);
155 		if (strmap)
156 			delete strmap;
157 		strmap = new str_map_t(dbp, cur_env);
158 		return (true);
159 	case DB_LOCK_DEADLOCK: // Fall through
160 	case DB_REP_HANDLE_DEAD:
161 		log("\nFailed to open stock db.");
162 		break;
163 	default:
164 		if (ret == DB_REP_LOCKOUT)
165 			break; // Fall through
166 		else if (ret == ENOENT && !creating)
167 			log("\nStock DB does not yet exist\n");
168 		else {
169 			DbException ex(ret);
170 			throw ex;
171 		}
172 	} // switch
173 
174 	// (All retryable errors fall through to here.)
175 	//
176 	log("\nPlease retry the operation");
177 	close_db(dbp);
178 	return (false);
179 }
180 
close_db(Db * & dbp)181 void RepQuoteExample::close_db(Db *&dbp)
182 {
183 	if (dbp) {
184 		try {
185 			dbp->close(0);
186 			delete dbp;
187 			dbp = 0;
188 		} catch (...) {
189 			delete dbp;
190 			dbp = 0;
191 			throw;
192 		}
193 	}
194 
195 }
196 
RepQuoteExample()197 RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(NULL) {
198 	app_data.app_finished = 0;
199 	app_data.in_client_sync = 0;
200 	app_data.is_master = 0; // assume I start out as client
201 	cur_env = new DbEnv(DB_CXX_NO_EXCEPTIONS);
202 	strmap = NULL;
203 	dbp = NULL;
204 }
205 
init(RepConfigInfo * config)206 void RepQuoteExample::init(RepConfigInfo *config) {
207 	app_config = config;
208 	DbSite *dbsite;
209 	int i;
210 
211 	cur_env->set_app_private(&app_data);
212 	cur_env->set_errfile(stderr);
213 	cur_env->set_errpfx(progname);
214 	cur_env->set_event_notify(event_callback);
215 
216 	// Configure bulk transfer to send groups of records to clients
217 	// in a single network transfer.  This is useful for master sites
218 	// and clients participating in client-to-client synchronization.
219 	//
220 	if (app_config->bulk)
221 		cur_env->rep_set_config(DB_REP_CONF_BULK, 1);
222 
223 	// Turn on debugging and informational output if requested.
224 	if (app_config->verbose) {
225 		cur_env->set_verbose(DB_VERB_REPLICATION, 1);
226 		app_data.verbose = 1;
227 	}
228 
229 	// Set replication group election priority for this environment.
230 	// An election first selects the site with the most recent log
231 	// records as the new master.  If multiple sites have the most
232 	// recent log records, the site with the highest priority value
233 	// is selected as master.
234 	//
235 	cur_env->rep_set_priority(app_config->priority);
236 
237 	// Set the policy that determines how master and client sites
238 	// handle acknowledgement of replication messages needed for
239 	// permanent records.  The default policy of "quorum" requires only
240 	// a quorum of electable peers sufficient to ensure a permanent
241 	// record remains durable if an election is held.  The "all" option
242 	// requires all clients to acknowledge a permanent replication
243 	// message instead.
244 	//
245 	cur_env->repmgr_set_ack_policy(app_config->ack_policy);
246 
247 	// Set the threshold for the minimum and maximum time the client
248 	// waits before requesting retransmission of a missing message.
249 	// Base these values on the performance and load characteristics
250 	// of the master and client host platforms as well as the round
251 	// trip message time.
252 	//
253 	cur_env->rep_set_request(20000, 500000);
254 
255 	// Configure deadlock detection to ensure that any deadlocks
256 	// are broken by having one of the conflicting lock requests
257 	// rejected. DB_LOCK_DEFAULT uses the lock policy specified
258 	// at environment creation time or DB_LOCK_RANDOM if none was
259 	// specified.
260 	//
261 	cur_env->set_lk_detect(DB_LOCK_DEFAULT);
262 
263 	// The following base replication features may also be useful to your
264 	// application. See Berkeley DB documentation for more details.
265 	//   - Master leases: Provide stricter consistency for data reads
266 	//     on a master site.
267 	//   - Timeouts: Customize the amount of time Berkeley DB waits
268 	//     for such things as an election to be concluded or a master
269 	//     lease to be granted.
270 	//   - Delayed client synchronization: Manage the master site's
271 	//     resources by spreading out resource-intensive client
272 	//     synchronizations.
273 	//   - Blocked client operations: Return immediately with an error
274 	//     instead of waiting indefinitely if a client operation is
275 	//     blocked by an ongoing client synchronization.
276 
277 	cur_env->repmgr_site(app_config->this_host.host,
278 	    app_config->this_host.port, &dbsite, 0);
279 	dbsite->set_config(DB_LOCAL_SITE, 1);
280 	if(app_config->this_host.creator)
281 		dbsite->set_config(DB_GROUP_CREATOR, 1);
282 	dbsite->close();
283 
284 	i = 1;
285 	for ( REP_HOST_INFO *cur = app_config->other_hosts;
286 		cur != NULL && i <= app_config->nrsites;
287 		cur = cur->next, i++) {
288 		cur_env->repmgr_site(cur->host, cur->port, &dbsite, 0);
289 		dbsite->set_config(DB_BOOTSTRAP_HELPER, 1);
290 		if(cur->peer)
291 			dbsite->set_config(DB_REPMGR_PEER, 1);
292 		dbsite->close();
293 	}
294 
295 	// Configure heartbeat timeouts so that repmgr monitors the
296 	// health of the TCP connection.  Master sites broadcast a heartbeat
297 	// at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
298 	// Client sites wait for message activity the length of the
299 	// DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
300 	// connection to the master is lost.  The DB_REP_HEARTBEAT_MONITOR
301 	// timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
302 	//
303 	cur_env->rep_set_timeout(DB_REP_HEARTBEAT_SEND, 5000000);
304 	cur_env->rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 10000000);
305 
306 	// The following repmgr features may also be useful to your
307 	// application.  See Berkeley DB documentation for more details.
308 	//  - Two-site strict majority rule - In a two-site replication
309 	//    group, require both sites to be available to elect a new
310 	//    master.
311 	//  - Timeouts - Customize the amount of time repmgr waits
312 	//    for such things as waiting for acknowledgements or attempting
313 	//    to reconnect to other sites.
314 	//  - Site list - return a list of sites currently known to repmgr.
315 
316 	// We can now open our environment, although we're not ready to
317 	// begin replicating.  However, we want to have a dbenv around
318 	// so that we can send it into any of our message handlers.
319 	cur_env->set_cachesize(0, CACHESIZE, 0);
320 	cur_env->set_flags(DB_TXN_NOSYNC, 1);
321 
322 	cur_env->open(app_config->home, DB_CREATE | DB_RECOVER |
323 	    DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
324 	    DB_INIT_MPOOL | DB_INIT_TXN, 0);
325 
326 	// Start checkpoint and log archive support threads.
327 	(void)thread_create(&ckp_thr, NULL, checkpoint_thread, cur_env);
328 	(void)thread_create(&lga_thr, NULL, log_archive_thread, cur_env);
329 
330 	dbstl::register_db_env(cur_env);
331 	cur_env->repmgr_start(3, app_config->start_policy);
332 }
333 
terminate()334 int RepQuoteExample::terminate() {
335 	try {
336 		// Wait for checkpoint and log archive threads to finish.
337 		// Windows does not allow NULL pointer for exit code variable.
338 		thread_exit_status_t exstat;
339 
340 		(void)thread_join(lga_thr, &exstat);
341 		(void)thread_join(ckp_thr, &exstat);
342 
343 		// We have used the DB_TXN_NOSYNC environment flag for
344 		// improved performance without the usual sacrifice of
345 		// transactional durability, as discussed in the
346 		// "Transactional guarantees" page of the Reference
347 		// Guide: if one replication site crashes, we can
348 		// expect the data to exist at another site.  However,
349 		// in case we shut down all sites gracefully, we push
350 		// out the end of the log here so that the most
351 		// recent transactions don't mysteriously disappear.
352 		cur_env->log_flush(NULL);
353 	} catch (DbException dbe) {
354 		cout << "\nerror closing environment: " << dbe.what() << endl;
355 	}
356 	return 0;
357 }
358 
prompt()359 void RepQuoteExample::prompt() {
360 	cout << "QUOTESERVER";
361 	if (!app_data.is_master)
362 		cout << "(read-only)";
363 	cout << "> " << flush;
364 }
365 
log(const char * msg)366 void log(const char *msg) {
367 	cerr << msg << endl;
368 }
369 
370 // Simple command-line user interface:
371 //  - enter "<stock symbol> <price>" to insert or update a record in the
372 //	database;
373 //  - just press Return (i.e., blank input line) to print out the contents of
374 //	the database;
375 //  - enter "quit" or "exit" to quit.
376 //
doloop()377 void RepQuoteExample::doloop() {
378 	string input;
379 
380 	while (prompt(), getline(cin, input)) {
381 		istringstream is(input);
382 		string token1, token2;
383 
384 		// Read 0, 1 or 2 tokens from the input.
385 		//
386 		int count = 0;
387 		if (is >> token1) {
388 			count++;
389 			if (is >> token2)
390 			count++;
391 		}
392 
393 		if (count == 1) {
394 			if (token1 == "exit" || token1 == "quit") {
395 				app_data.app_finished = 1;
396 				break;
397 			} else {
398 				log("\nFormat: <stock> <price>\n");
399 				continue;
400 			}
401 		}
402 
403 		// Here we know count is either 0 or 2, so we're about to try a
404 		// DB operation.
405 		//
406 		// Open database with DB_CREATE only if this is a master
407 		// database.  A client database uses polling to attempt
408 		// to open the database without DB_CREATE until it is
409 		// successful.
410 		//
411 		// This DB_CREATE polling logic can be simplified under
412 		// some circumstances.  For example, if the application can
413 		// be sure a database is already there, it would never need
414 		// to open it with DB_CREATE.
415 		//
416 		if (!open_db(app_data.is_master))
417 			continue;
418 
419 		try {
420 			if (count == 0)
421 				if (app_data.in_client_sync)
422 					log(
423     "Cannot read data during client initialization - please try again.");
424 				else
425 					print_stocks();
426 			else if (!app_data.is_master)
427 				log("\nCan't update at client\n");
428 			else {
429 				char *symbol = new char[token1.length() + 1];
430 				strcpy(symbol, token1.c_str());
431 				char *price = new char[token2.length() + 1];
432 				strcpy(price, token2.c_str());
433 				begin_txn(0, cur_env);
434 				strmap->insert(make_pair(symbol, price));
435 				commit_txn(cur_env);
436 				delete symbol;
437 				delete price;
438 			}
439 		} catch (DbDeadlockException e) {
440 			log("\nplease retry the operation\n");
441 			close_db();
442 		} catch (DbRepHandleDeadException e) {
443 			log("\nplease retry the operation\n");
444 			close_db();
445 		} catch (DbException e) {
446 			if (e.get_errno() == DB_REP_LOCKOUT) {
447 			log("\nplease retry the operation\n");
448 			close_db();
449 			} else
450 			throw;
451 		}
452 
453 	}
454 
455 	close_db();
456 }
457 
event_callback(DbEnv * dbenv,u_int32_t which,void * info)458 void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
459 {
460 	APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
461 
462 	info = NULL;		/* Currently unused. */
463 
464 	switch (which) {
465 	case DB_EVENT_REP_MASTER:
466 		app->in_client_sync = 0;
467 		app->is_master = 1;
468 		app->verbose = 0;
469 		break;
470 
471 	case DB_EVENT_REP_CLIENT:
472 		app->is_master = 0;
473 		app->in_client_sync = 1;
474 		break;
475 
476 	case DB_EVENT_REP_STARTUPDONE:
477 		app->in_client_sync = 0;
478 		break;
479 
480 	case DB_EVENT_REP_NEWMASTER:
481 		app->in_client_sync = 1;
482 		break;
483 
484 	case DB_EVENT_REP_PERM_FAILED:
485 		// Did not get enough acks to guarantee transaction
486 		// durability based on the configured ack policy.  This
487 		// transaction will be flushed to the master site's
488 		// local disk storage for durability.
489 		//
490 		if(app->verbose)
491 			log(
492     "EVENT: Insufficient acknowledgements to guarantee transaction durability.");
493 		break;
494 
495 	case DB_EVENT_PANIC:
496 		if(app->verbose)
497 			log("EVENT: receive panic event");
498 		break;
499 
500 	case DB_EVENT_REP_CONNECT_BROKEN:
501 		if(app->verbose)
502 			log("EVENT: connection is broken");
503 		break;
504 
505 	case DB_EVENT_REP_DUPMASTER:
506 		if(app->verbose)
507 			log("EVENT: duplicate master");
508 		break;
509 
510 	case DB_EVENT_REP_ELECTED:
511 		if(app->verbose)
512 			log("EVENT: election in replication group");
513 		break;
514 
515 	case DB_EVENT_REP_CONNECT_ESTD:
516 		if(app->verbose)
517 			log("EVENT: establish connection");
518 		break;
519 
520 	case DB_EVENT_REP_CONNECT_TRY_FAILED:
521 		if(app->verbose)
522 			log("EVENT: fail to try connection");
523 		break;
524 
525 	case DB_EVENT_REP_INIT_DONE:
526 		if(app->verbose)
527 			log("EVENT: finish initialization");
528 		break;
529 
530 	case DB_EVENT_REP_LOCAL_SITE_REMOVED:
531 		if(app->verbose)
532 			log("EVENT: remove local site");
533 		break;
534 
535 	case DB_EVENT_REP_SITE_ADDED:
536 		if(app->verbose)
537 			log("EVENT: add site");
538 		break;
539 
540 	case DB_EVENT_REP_SITE_REMOVED:
541 		if(app->verbose)
542 			log("EVENT: remove site removed");
543 		break;
544 
545 	default:
546 		dbenv->errx("\nignoring event %d", which);
547 	}
548 }
549 
print_stocks()550 void RepQuoteExample::print_stocks() {
551 #define	MAXKEYSIZE	10
552 #define	MAXDATASIZE	20
553 
554 	cout << "\tSymbol\tPrice" << endl
555 		<< "\t======\t=====" << endl;
556 	str_map_t::iterator itr;
557 	if (strmap == NULL)
558 		strmap = new str_map_t(dbp, cur_env);
559 	begin_txn(0, cur_env);
560 	for (itr = strmap->begin(); itr != strmap->end(); ++itr)
561 		cout<<"\t"<<itr->first<<"\t"<<itr->second<<endl;
562 	commit_txn(cur_env);
563 	cout << endl << flush;
564 }
565 
usage()566 static void usage() {
567 	cerr << "usage: " << progname << endl
568 	    << " -h home -l|-L host:port"
569 	    << " [-C|M] [-r host:port] [-R host:port]" <<endl
570 	    << " [-a all|quorum] [-b] [-p priority] [-v]" << endl;
571 
572 	cerr << "\t -h home (required; h stands for home directory)" << endl
573 	    << "\t -l host:port (required unless -L is specified;"
574 	    << " l stands for local)" << endl
575 	    << "\t -L host:port (optional, L means group creator)" << endl
576 	    << "\t -C or -M (optional; start up as client or master)" << endl
577 	    << "\t -r host:port (optional; r stands for remote; any "
578 	    << "number of these" << endl
579 	    << "\t               may be specified)" << endl
580 	    << "\t -R host:port (optional; R stands for remote peer; only "
581 	    << "one of" << endl
582 	    << "\t               these may be specified)" << endl
583 	    << "\t -a all|quorum (optional; a stands for ack policy)" << endl
584 	    << "\t -b (optional; b stands for bulk)" << endl
585 	    << "\t -p priority (optional; defaults to 100)" << endl
586 	    << "\t -v (optional; v stands for verbose)" << endl;
587 
588 	exit(EXIT_FAILURE);
589 }
590 
main(int argc,char ** argv)591 int main(int argc, char **argv) {
592 	RepConfigInfo config;
593 	char ch, *portstr, *tmphost;
594 	int tmpport;
595 	bool tmppeer;
596 
597 	// Extract the command line parameters
598 	while ((ch = getopt(argc, argv, "a:bCh:L:l:Mp:R:r:v")) != EOF) {
599 		tmppeer = false;
600 		switch (ch) {
601 		case 'a':
602 			if (strncmp(optarg, "all", 3) == 0)
603 				config.ack_policy = DB_REPMGR_ACKS_ALL;
604 			else if (strncmp(optarg, "quorum", 6) != 0)
605 				usage();
606 			break;
607 		case 'b':
608 			config.bulk = true;
609 			break;
610 		case 'C':
611 			config.start_policy = DB_REP_CLIENT;
612 			break;
613 		case 'h':
614 			config.home = optarg;
615 			break;
616 		case 'L':
617 			config.this_host.creator = true;
618 		case 'l':
619 			config.this_host.host = strtok(optarg, ":");
620 			if ((portstr = strtok(NULL, ":")) == NULL) {
621 				cerr << "\nBad host specification." << endl;
622 				usage();
623 			}
624 			config.this_host.port = (unsigned short)atoi(portstr);
625 			config.got_listen_address = true;
626 			break;
627 		case 'M':
628 			config.start_policy = DB_REP_MASTER;
629 			break;
630 		case 'p':
631 			config.priority = atoi(optarg);
632 			break;
633 		case 'R':
634 			tmppeer = true; // FALLTHROUGH
635 		case 'r':
636 			tmphost = strtok(optarg, ":");
637 			if ((portstr = strtok(NULL, ":")) == NULL) {
638 				cerr << "Bad host specification." << endl;
639 				usage();
640 			}
641 			tmpport = (unsigned short)atoi(portstr);
642 
643 			config.addOtherHost(tmphost, tmpport, tmppeer);
644 			break;
645 		case 'v':
646 			config.verbose = true;
647 			break;
648 		case '?':
649 		default:
650 			usage();
651 		}
652 	}
653 
654 	// Error check command line.
655 	if ((!config.got_listen_address) || config.home == NULL)
656 		usage();
657 
658 	RepQuoteExample runner;
659 	try {
660 		runner.init(&config);
661 		runner.doloop();
662 	} catch (DbException dbe) {
663 		cerr << "\nCaught an exception during initialization or"
664 			<< " processing: " << dbe.what() << endl;
665 	}
666 	runner.terminate();
667 	return 0;
668 }
669 
670 // This is a very simple thread that performs checkpoints at a fixed
671 // time interval.  For a master site, the time interval is one minute
672 // plus the duration of the checkpoint_delay timeout (30 seconds by
673 // default.)  For a client site, the time interval is one minute.
674 //
checkpoint_thread(void * args)675 void *checkpoint_thread(void *args)
676 {
677 	DbEnv *env;
678 	APP_DATA *app;
679 	int i, ret;
680 
681 	env = (DbEnv *)args;
682 	app = (APP_DATA *)env->get_app_private();
683 
684 	for (;;) {
685 		// Wait for one minute, polling once per second to see if
686 		// application has finished.  When application has finished,
687 		// terminate this thread.
688 		//
689 		for (i = 0; i < 60; i++) {
690 			sleep(1);
691 			if (app->app_finished == 1)
692 				return ((void *)EXIT_SUCCESS);
693 		}
694 
695 		// Perform a checkpoint.
696 		if ((ret = env->txn_checkpoint(0, 0, 0)) != 0) {
697 			env->err(ret, "Could not perform checkpoint.\n");
698 			return ((void *)EXIT_FAILURE);
699 		}
700 	}
701 }
702 
703 // This is a simple log archive thread.  Once per minute, it removes all but
704 // the most recent 3 logs that are safe to remove according to a call to
705 // DBENV->log_archive().
706 //
707 // Log cleanup is needed to conserve disk space, but aggressive log cleanup
708 // can cause more frequent client initializations if a client lags too far
709 // behind the current master.  This can happen in the event of a slow client,
710 // a network partition, or a new master that has not kept as many logs as the
711 // previous master.
712 //
713 // The approach in this routine balances the need to mitigate against a
714 // lagging client by keeping a few more of the most recent unneeded logs
715 // with the need to conserve disk space by regularly cleaning up log files.
716 // Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
717 // flag) is not recommended for replication due to the risk of frequent
718 // client initializations.
719 //
log_archive_thread(void * args)720 void *log_archive_thread(void *args)
721 {
722 	DbEnv *env;
723 	APP_DATA *app;
724 	char **begin, **list;
725 	int i, listlen, logs_to_keep, minlog, ret;
726 
727 	env = (DbEnv *)args;
728 	app = (APP_DATA *)env->get_app_private();
729 	logs_to_keep = 3;
730 
731 	for (;;) {
732 		// Wait for one minute, polling once per second to see if
733 		// application has finished.  When application has finished,
734 		// terminate this thread.
735 		//
736 		for (i = 0; i < 60; i++) {
737 			sleep(1);
738 			if (app->app_finished == 1)
739 				return ((void *)EXIT_SUCCESS);
740 		}
741 
742 		// Get the list of unneeded log files.
743 		if ((ret = env->log_archive(&list, DB_ARCH_ABS)) != 0) {
744 			env->err(ret, "Could not get log archive list.");
745 			return ((void *)EXIT_FAILURE);
746 		}
747 		if (list != NULL) {
748 			listlen = 0;
749 			// Get the number of logs in the list.
750 			for (begin = list; *begin != NULL; begin++, listlen++);
751 			// Remove all but the logs_to_keep most recent
752 			// unneeded log files.
753 			//
754 			minlog = listlen - logs_to_keep;
755 			for (begin = list, i= 0; i < minlog; list++, i++) {
756 				if ((ret = unlink(*list)) != 0) {
757 					env->err(ret,
758 					    "logclean: remove %s", *list);
759 					env->errx(
760 					    "logclean: Error remove %s", *list);
761 					free(begin);
762 					return ((void *)EXIT_FAILURE);
763 				}
764 			}
765 			free(begin);
766 		}
767 	}
768 }
769 
770