1 /*-
2  * Copyright (c) 2001, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file LICENSE for license information.
5  *
6  * $Id$
7  */
8 
9 #include "db_config.h"
10 
11 #include "db_int.h"
12 #include "dbinc/blob.h"
13 #include "dbinc/db_page.h"
14 #include "dbinc/db_am.h"
15 #include "dbinc/lock.h"
16 #include "dbinc/mp.h"
17 #include "dbinc/txn.h"
18 
19 static int __rep_collect_txn
20     __P((ENV *, DB_LSN *, LSN_COLLECTION *, DELAYED_BLOB_LIST **));
21 static int __rep_remove_delayed_blobs
22     __P((ENV *, db_seq_t, u_int32_t ,DELAYED_BLOB_LIST **));
23 static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *));
24 static int __rep_fire_newmaster __P((ENV *, u_int32_t, int));
25 static int __rep_fire_startupdone __P((ENV *, u_int32_t, int));
26 static int __rep_getnext __P((ENV *, DB_THREAD_INFO *));
27 static int __rep_lsn_cmp __P((const void *, const void *));
28 static int __rep_newfile __P((ENV *, __rep_control_args *, DBT *));
29 static int __rep_process_rec __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
30     DBT *, db_timespec *, DB_LSN *));
31 static int __rep_remfirst __P((ENV *, DB_THREAD_INFO *, DBT *, DBT *));
32 static int __rep_skip_msg __P((ENV *, REP *, int, u_int32_t));
33 
34 /* Used to consistently designate which messages ought to be received where. */
35 
36 #define	MASTER_ONLY(rep, rp) do {					\
37 	if (!F_ISSET(rep, REP_F_MASTER)) {				\
38 		RPRINT(env, (env, DB_VERB_REP_MSGS,			\
39 		    "Master record received on client"));		\
40 		REP_PRINT_MESSAGE(env,					\
41 		    eid, rp, "rep_process_message", 0);			\
42 		/* Just skip/ignore it. */				\
43 		ret = 0;						\
44 		goto errlock;						\
45 	}								\
46 } while (0)
47 
48 #define	CLIENT_ONLY(rep, rp) do {					\
49 	if (!F_ISSET(rep, REP_F_CLIENT)) {				\
50 		RPRINT(env, (env, DB_VERB_REP_MSGS,			\
51 		    "Client record received on master"));		\
52 		/*							\
53 		 * Only broadcast DUPMASTER if leases are not		\
54 		 * in effect.  If I am an old master, using		\
55 		 * leases and I get a newer message, my leases		\
56 		 * had better all be expired.				\
57 		 */							\
58 		if (IS_USING_LEASES(env))				\
59 			DB_ASSERT(env,					\
60 			    __rep_lease_check(env, 0) ==		\
61 			    DB_REP_LEASE_EXPIRED);			\
62 		else {							\
63 			REP_PRINT_MESSAGE(env,				\
64 			    eid, rp, "rep_process_message", 0);		\
65 			(void)__rep_send_message(env, DB_EID_BROADCAST, \
66 			    REP_DUPMASTER, NULL, NULL, 0, 0);		\
67 		}							\
68 		ret = DB_REP_DUPMASTER;					\
69 		goto errlock;						\
70 	}								\
71 } while (0)
72 
73 /*
74  * If a client is attempting to service a request and its gen is not in
75  * sync with its database state, it cannot service the request.  Currently
76  * the only way to know this is with the heavy hammer of knowing (or not)
77  * who the master is.  If the master is invalid, force a rerequest.
78  * If we receive an ALIVE, we update both gen and invalidate the
79  * master_id.
80  */
81 #define	CLIENT_MASTERCHK do {						\
82 	if (F_ISSET(rep, REP_F_CLIENT)) {				\
83 		if (master_id == DB_EID_INVALID) {			\
84 			STAT(rep->stat.st_client_svc_miss++);		\
85 			ret = __rep_skip_msg(env, rep, eid, rp->rectype);\
86 			goto errlock;					\
87 		}							\
88 	}								\
89 } while (0)
90 
91 /*
92  * If a client is attempting to service a request it does not have,
93  * call rep_skip_msg to skip this message and force a rerequest to the
94  * sender.  We don't hold the mutex for the stats and may miscount.
95  */
96 #define	CLIENT_REREQ do {						\
97 	if (F_ISSET(rep, REP_F_CLIENT)) {				\
98 		STAT(rep->stat.st_client_svc_req++);			\
99 		if (ret == DB_NOTFOUND) {				\
100 			STAT(rep->stat.st_client_svc_miss++);		\
101 			ret = __rep_skip_msg(env, rep, eid, rp->rectype);\
102 		}							\
103 	}								\
104 } while (0)
105 
106 #define	RECOVERING_SKIP do {						\
107 	if (IS_REP_CLIENT(env) && recovering) {			\
108 		/* Not holding region mutex, may miscount */		\
109 		STAT(rep->stat.st_msgs_recover++);			\
110 		ret = __rep_skip_msg(env, rep, eid, rp->rectype);	\
111 		goto errlock;						\
112 	}								\
113 } while (0)
114 
115 /*
116  * If we're recovering the log we only want log records that are in the
117  * range we need to recover.  Otherwise we can end up storing a huge
118  * number of "new" records, only to truncate the temp database later after
119  * we run recovery.  If we are actively delaying a sync-up, we also skip
120  * all incoming log records until the application requests sync-up.
121  */
122 #define	RECOVERING_LOG_SKIP do {					\
123 	if (F_ISSET(rep, REP_F_DELAY) ||				\
124 	    rep->master_id == DB_EID_INVALID ||				\
125 	    (recovering &&						\
126 	    (rep->sync_state != SYNC_LOG ||				\
127 	     LOG_COMPARE(&rp->lsn, &rep->last_lsn) >= 0))) {		\
128 		/* Not holding region mutex, may miscount */		\
129 		STAT(rep->stat.st_msgs_recover++);			\
130 		ret = __rep_skip_msg(env, rep, eid, rp->rectype);	\
131 		goto errlock;						\
132 	}								\
133 } while (0)
134 
135 #define	ANYSITE(rep)
136 
137 /*
138  * __rep_process_message_pp --
139  *
140  * This routine takes an incoming message and processes it.
141  *
142  * control: contains the control fields from the record
143  * rec: contains the actual record
144  * eid: the environment id of the sender of the message;
145  * ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the
146  *	lsn of the maximum permanent or current not permanent log record
147  *	(respectively).
148  *
149  * PUBLIC: int __rep_process_message_pp
150  * PUBLIC:      __P((DB_ENV *, DBT *, DBT *, int, DB_LSN *));
151  */
152 int
__rep_process_message_pp(dbenv,control,rec,eid,ret_lsnp)153 __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp)
154 	DB_ENV *dbenv;
155 	DBT *control, *rec;
156 	int eid;
157 	DB_LSN *ret_lsnp;
158 {
159 	ENV *env;
160 	DB_THREAD_INFO *ip;
161 	int ret;
162 
163 	env = dbenv->env;
164 	ret = 0;
165 
166 	ENV_REQUIRES_CONFIG_XX(
167 	    env, rep_handle, "DB_ENV->rep_process_message", DB_INIT_REP);
168 
169 	if (APP_IS_REPMGR(env)) {
170 		__db_errx(env, DB_STR_A("3512",
171 		    "%s cannot call from Replication Manager application",
172 		    "%s"), "DB_ENV->rep_process_message:");
173 		return (EINVAL);
174 	}
175 
176 	/* Control argument must be non-Null. */
177 	if (control == NULL || control->size == 0) {
178 		__db_errx(env, DB_STR("3513",
179 	"DB_ENV->rep_process_message: control argument must be specified"));
180 		return (EINVAL);
181 	}
182 
183 	/*
184 	 * Make sure site is a master or a client, which implies that
185 	 * replication has been started.
186 	 */
187 	if (!IS_REP_MASTER(env) && !IS_REP_CLIENT(env)) {
188 		__db_errx(env, DB_STR("3514",
189 	"Environment not configured as replication master or client"));
190 		return (EINVAL);
191 	}
192 
193 	if ((ret = __dbt_usercopy(env, control)) != 0 ||
194 	    (ret = __dbt_usercopy(env, rec)) != 0) {
195 		__dbt_userfree(env, control, rec, NULL);
196 		__db_errx(env, DB_STR("3515",
197 	"DB_ENV->rep_process_message: error retrieving DBT contents"));
198 		return (ret);
199 	}
200 
201 	ENV_ENTER(env, ip);
202 	ret = __rep_process_message_int(env, control, rec, eid, ret_lsnp);
203 	ENV_LEAVE(env, ip);
204 
205 	__dbt_userfree(env, control, rec, NULL);
206 	return (ret);
207 }
208 
209 /*
210  * __rep_process_message_int --
211  *
212  * This routine performs the internal steps to process an incoming message.
213  *
214  * PUBLIC: int __rep_process_message_int
215  * PUBLIC:     __P((ENV *, DBT *, DBT *, int, DB_LSN *));
216  */
217 int
__rep_process_message_int(env,control,rec,eid,ret_lsnp)218 __rep_process_message_int(env, control, rec, eid, ret_lsnp)
219 	ENV *env;
220 	DBT *control, *rec;
221 	int eid;
222 	DB_LSN *ret_lsnp;
223 {
224 	DBT data_dbt;
225 	DB_LOG *dblp;
226 	DB_LSN last_lsn, lsn;
227 	DB_REP *db_rep;
228 	DB_THREAD_INFO *ip;
229 	LOG *lp;
230 	REGENV *renv;
231 	REGINFO *infop;
232 	REP *rep;
233 	__rep_control_args *rp, tmprp;
234 	__rep_egen_args egen_arg;
235 	size_t len;
236 	u_int32_t gen;
237 	int cmp, do_sync, lockout, master_id, recovering, ret, t_ret;
238 	time_t savetime;
239 	u_int8_t buf[__REP_MAXMSG_SIZE];
240 
241 	ret = 0;
242 	do_sync = 0;
243 	lockout = 0;
244 	db_rep = env->rep_handle;
245 	rep = db_rep->region;
246 	dblp = env->lg_handle;
247 	lp = dblp->reginfo.primary;
248 	infop = env->reginfo;
249 	renv = infop->primary;
250 
251 	if ((ret = __rep_control_unmarshal(env, &tmprp,
252 	    control->data, control->size, NULL)) != 0)
253 		return (ret);
254 	rp = &tmprp;
255 	if (ret_lsnp != NULL)
256 		ZERO_LSN(*ret_lsnp);
257 
258 	ENV_GET_THREAD_INFO(env, ip);
259 	REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0);
260 	/*
261 	 * Check the version number for both rep and log.  If it is
262 	 * an old version we support, convert it.  Otherwise complain.
263 	 */
264 	if (rp->rep_version < DB_REPVERSION) {
265 		if (rp->rep_version < DB_REPVERSION_MIN) {
266 			__db_errx(env, DB_STR_A("3516",
267  "unsupported old replication message version %lu, minimum version %d",
268 			    "%lu %d"), (u_long)rp->rep_version,
269 			    DB_REPVERSION_MIN);
270 
271 			return (EINVAL);
272 		}
273 		VPRINT(env, (env, DB_VERB_REP_MSGS,
274 		    "Received record %lu with old rep version %lu",
275 		    (u_long)rp->rectype, (u_long)rp->rep_version));
276 		rp->rectype = __rep_msg_from_old(rp->rep_version, rp->rectype);
277 		DB_ASSERT(env, rp->rectype != REP_INVALID);
278 		/*
279 		 * We should have a valid new record type for all the old
280 		 * versions.
281 		 */
282 		VPRINT(env, (env, DB_VERB_REP_MSGS,
283 		    "Converted to record %lu with old rep version %lu",
284 		    (u_long)rp->rectype, (u_long)rp->rep_version));
285 	} else if (rp->rep_version > DB_REPVERSION) {
286 		__db_errx(env, DB_STR_A("3517",
287 		    "unexpected replication message version %lu, expected %d",
288 		    "%lu %d"), (u_long)rp->rep_version, DB_REPVERSION);
289 		return (EINVAL);
290 	}
291 
292 	if (rp->log_version < DB_LOGVERSION) {
293 		if (rp->log_version < DB_LOGVERSION_MIN) {
294 			__db_errx(env, DB_STR_A("3518",
295  "unsupported old replication log version %lu, minimum version %d",
296 			    "%lu %d"), (u_long)rp->log_version,
297 			    DB_LOGVERSION_MIN);
298 			return (EINVAL);
299 		}
300 		VPRINT(env, (env, DB_VERB_REP_MSGS,
301 		    "Received record %lu with old log version %lu",
302 		    (u_long)rp->rectype, (u_long)rp->log_version));
303 	} else if (rp->log_version > DB_LOGVERSION) {
304 		__db_errx(env, DB_STR_A("3519",
305 		    "unexpected log record version %lu, expected %d",
306 		    "%lu %d"), (u_long)rp->log_version, DB_LOGVERSION);
307 		return (EINVAL);
308 	}
309 
310 	/*
311 	 * Acquire the replication lock.
312 	 */
313 	REP_SYSTEM_LOCK(env);
314 	if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) {
315 		/*
316 		 * If we're racing with a thread in rep_start, then
317 		 * just ignore the message and return.
318 		 */
319 		RPRINT(env, (env, DB_VERB_REP_MSGS,
320 		    "Racing replication msg lockout, ignore message."));
321 		/*
322 		 * Although we're ignoring the message, there are a few
323 		 * we need to pay a bit of attention to anyway.  All of
324 		 * these cases are mutually exclusive.
325 		 * 1. If it is a PERM message, we don't want to return 0.
326 		 * 2. If it is a NEWSITE message let the app know so it can
327 		 * do whatever it needs for connection purposes.
328 		 * 3. If it is a c2c request, tell the sender we're not
329 		 * going to handle it.
330 		 */
331 		if (F_ISSET(rp, REPCTL_PERM))
332 			ret = DB_REP_IGNORE;
333 		REP_SYSTEM_UNLOCK(env);
334 		/*
335 		 * If this is new site information return DB_REP_NEWSITE so
336 		 * that the user can use whatever information may have been
337 		 * sent for connections.
338 		 */
339 		if (rp->rectype == REP_NEWSITE)
340 			ret = DB_REP_NEWSITE;
341 		/*
342 		 * If another client has sent a c2c request to us, it may be a
343 		 * long time before it resends the request (due to its dual data
344 		 * streams avoidance heuristic); let it know we can't serve the
345 		 * request just now.
346 		 */
347 		if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rp->rectype)) {
348 			STAT(rep->stat.st_client_svc_req++);
349 			STAT(rep->stat.st_client_svc_miss++);
350 			(void)__rep_send_message(env,
351 			    eid, REP_REREQUEST, NULL, NULL, 0, 0);
352 		}
353 		goto out;
354 	}
355 	rep->msg_th++;
356 	gen = rep->gen;
357 	master_id = rep->master_id;
358 	recovering = IS_REP_RECOVERING(rep);
359 	savetime = renv->rep_timestamp;
360 
361 	STAT(rep->stat.st_msgs_processed++);
362 	REP_SYSTEM_UNLOCK(env);
363 
364 	/*
365 	 * Check for lease configuration matching.  Leases must be
366 	 * configured all or none.  If I am a client and I receive a
367 	 * message requesting a lease, and I'm not using leases, that
368 	 * is an error.
369 	 */
370 	if (!IS_USING_LEASES(env) &&
371 	    (F_ISSET(rp, REPCTL_LEASE) || rp->rectype == REP_LEASE_GRANT)) {
372 		__db_errx(env, DB_STR("3520",
373 		    "Inconsistent lease configuration"));
374 		RPRINT(env, (env, DB_VERB_REP_MSGS,
375 		    "Client received lease message and not using leases"));
376 		ret = EINVAL;
377 		ret = __env_panic(env, ret);
378 		goto errlock;
379 	}
380 
381 	/*
382 	 * Check for generation number matching.  Ignore any old messages
383 	 * except requests that are indicative of a new client that needs
384 	 * to get in sync.
385 	 */
386 	if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
387 	    rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ &&
388 	    rp->rectype != REP_DUPMASTER && rp->rectype != REP_VOTE1) {
389 		/*
390 		 * We don't hold the rep mutex, and could miscount if we race.
391 		 */
392 		STAT(rep->stat.st_msgs_badgen++);
393 		if (F_ISSET(rp, REPCTL_PERM))
394 			ret = DB_REP_IGNORE;
395 		goto errlock;
396 	}
397 
398 	if (rp->gen > gen) {
399 		/*
400 		 * If I am a master and am out of date with a lower generation
401 		 * number, I am in bad shape and should downgrade.
402 		 */
403 		if (F_ISSET(rep, REP_F_MASTER)) {
404 			STAT(rep->stat.st_dupmasters++);
405 			ret = DB_REP_DUPMASTER;
406 			/*
407 			 * Only broadcast DUPMASTER if leases are not
408 			 * in effect.  If I am an old master, using
409 			 * leases and I get a newer message, my leases
410 			 * had better all be expired.
411 			 */
412 			if (IS_USING_LEASES(env))
413 				DB_ASSERT(env,
414 				    __rep_lease_check(env, 0) ==
415 				    DB_REP_LEASE_EXPIRED);
416 			else if (rp->rectype != REP_DUPMASTER)
417 				(void)__rep_send_message(env,
418 				    DB_EID_BROADCAST, REP_DUPMASTER,
419 				    NULL, NULL, 0, 0);
420 			goto errlock;
421 		}
422 
423 		/*
424 		 * I am a client and am out of date.  If this is an election,
425 		 * or a response from the first site I contacted, then I can
426 		 * accept the generation number and participate in future
427 		 * elections and communication. Otherwise, I need to hear about
428 		 * a new master and sync up.
429 		 *
430 		 * But do not do any of this if REP_F_HOLD_GEN is set.  In
431 		 * this case we keep the site at its current gen until we
432 		 * clear this flag.
433 		 */
434 		if ((rp->rectype == REP_ALIVE ||
435 		    rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) &&
436 		    !F_ISSET(rep, REP_F_HOLD_GEN)) {
437 			REP_SYSTEM_LOCK(env);
438 			RPRINT(env, (env, DB_VERB_REP_MSGS,
439 			    "Updating gen from %lu to %lu",
440 			    (u_long)gen, (u_long)rp->gen));
441 			rep->master_id = DB_EID_INVALID;
442 			gen = rp->gen;
443 			SET_GEN(gen);
444 			/*
445 			 * Updating of egen will happen when we process the
446 			 * message below for each message type.
447 			 */
448 			REP_SYSTEM_UNLOCK(env);
449 			if (rp->rectype == REP_ALIVE)
450 				(void)__rep_send_message(env,
451 				    DB_EID_BROADCAST, REP_MASTER_REQ, NULL,
452 				    NULL, 0, 0);
453 		} else if (rp->rectype != REP_NEWMASTER) {
454 			/*
455 			 * Ignore this message, retransmit if needed.
456 			 */
457 			if (__rep_check_doreq(env, rep))
458 				(void)__rep_send_message(env,
459 				    DB_EID_BROADCAST, REP_MASTER_REQ,
460 				    NULL, NULL, 0, 0);
461 			goto errlock;
462 		}
463 		/*
464 		 * If you get here, then you're a client and either you're
465 		 * in an election or you have a NEWMASTER or an ALIVE message
466 		 * whose processing will do the right thing below.
467 		 */
468 	}
469 
470 	/*
471 	 * If the sender is part of an established group, so are we now.
472 	 */
473 	if (F_ISSET(rp, REPCTL_GROUP_ESTD)) {
474 		REP_SYSTEM_LOCK(env);
475 #ifdef	DIAGNOSTIC
476 		if (!F_ISSET(rep, REP_F_GROUP_ESTD))
477 			RPRINT(env, (env, DB_VERB_REP_MSGS,
478 			    "I am now part of an established group"));
479 #endif
480 		F_SET(rep, REP_F_GROUP_ESTD);
481 		REP_SYSTEM_UNLOCK(env);
482 	}
483 
484 	/*
485 	 * We need to check if we're in recovery and if we are
486 	 * then we need to ignore any messages except VERIFY*, VOTE*,
487 	 * NEW* and ALIVE_REQ, or backup related messages: UPDATE*,
488 	 * PAGE* and FILE*.  We need to also accept LOG messages
489 	 * if we're copying the log for recovery/backup.
490 	 */
491 	switch (rp->rectype) {
492 	case REP_ALIVE:
493 		/*
494 		 * Handle even if we're recovering.
495 		 */
496 		ANYSITE(rep);
497 		if ((ret = __rep_egen_unmarshal(env, &egen_arg,
498 		    rec->data, rec->size, NULL)) != 0)
499 			return (ret);
500 		REP_SYSTEM_LOCK(env);
501 		if (egen_arg.egen > rep->egen) {
502 			/*
503 			 * If we're currently working futilely at processing an
504 			 * obsolete egen, treat it like an egen update, so that
505 			 * we abort the current rep_elect() call and signal the
506 			 * application to start a new one.
507 			 */
508 			if (rep->spent_egen == rep->egen)
509 				ret = DB_REP_HOLDELECTION;
510 
511 			RPRINT(env, (env, DB_VERB_REP_MSGS,
512 			    "Received ALIVE egen of %lu, mine %lu",
513 			    (u_long)egen_arg.egen, (u_long)rep->egen));
514 			__rep_elect_done(env, rep);
515 			rep->egen = egen_arg.egen;
516 		}
517 		REP_SYSTEM_UNLOCK(env);
518 		break;
519 	case REP_ALIVE_REQ:
520 		/*
521 		 * Handle even if we're recovering.
522 		 */
523 		ANYSITE(rep);
524 		LOG_SYSTEM_LOCK(env);
525 		lsn = lp->lsn;
526 		LOG_SYSTEM_UNLOCK(env);
527 #ifdef	CONFIG_TEST
528 		/*
529 		 * Send this first, before the ALIVE message because of the
530 		 * way the test suite and messaging is done sequentially.
531 		 * In some sequences it is possible to get into a situation
532 		 * where the test suite cannot get the later NEWMASTER because
533 		 * we break out of the messaging loop too early.
534 		 */
535 		if (F_ISSET(rep, REP_F_MASTER))
536 			(void)__rep_send_message(env,
537 			    DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
538 #endif
539 		REP_SYSTEM_LOCK(env);
540 		egen_arg.egen = rep->egen;
541 		REP_SYSTEM_UNLOCK(env);
542 		if ((ret = __rep_egen_marshal(env,
543 		    &egen_arg, buf, __REP_EGEN_SIZE, &len)) != 0)
544 			goto errlock;
545 		DB_INIT_DBT(data_dbt, buf, len);
546 		(void)__rep_send_message(env,
547 		    eid, REP_ALIVE, &lsn, &data_dbt, 0, 0);
548 		break;
549 	case REP_ALL_REQ:
550 		RECOVERING_SKIP;
551 		CLIENT_MASTERCHK;
552 		ret = __rep_allreq(env, rp, eid);
553 		CLIENT_REREQ;
554 		break;
555 	case REP_BLOB_ALL_REQ:
556 		RECOVERING_SKIP;
557 		CLIENT_MASTERCHK;
558 		MASTER_UPDATE(env, renv);
559 		ret = __rep_blob_allreq(env, eid, rec);
560 		CLIENT_REREQ;
561 		break;
562 	case REP_BLOB_CHUNK:
563 		/* Handle even if in recovery. */
564 		CLIENT_ONLY(rep, rp);
565 		ret = __rep_blob_chunk(env, eid, ip, rec);
566 		if (ret == DB_REP_PAGEDONE)
567 			ret = 0;
568 		break;
569 	case REP_BLOB_CHUNK_REQ:
570 		RECOVERING_SKIP;
571 		CLIENT_MASTERCHK;
572 		MASTER_UPDATE(env, renv);
573 		ret = __rep_blob_chunk_req(env, eid, rec);
574 		CLIENT_REREQ;
575 		break;
576 	case REP_BLOB_UPDATE:
577 		CLIENT_ONLY(rep, rp);
578 		ret = __rep_blob_update(env, eid, ip, rec);
579 		break;
580 	case REP_BLOB_UPDATE_REQ:
581 		MASTER_ONLY(rep, rp);
582 		MASTER_UPDATE(env, renv);
583 		ret = __rep_blob_update_req(env, eid, ip, rec);
584 		break;
585 	case REP_BULK_LOG:
586 		RECOVERING_LOG_SKIP;
587 		CLIENT_ONLY(rep, rp);
588 		ret = __rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp);
589 		break;
590 	case REP_BULK_PAGE:
591 		/*
592 		 * Handle even if we're recovering.
593 		 */
594 		CLIENT_ONLY(rep, rp);
595 		ret = __rep_bulk_page(env, ip, eid, rp, rec);
596 		break;
597 	case REP_DUPMASTER:
598 		/*
599 		 * Handle even if we're recovering.
600 		 */
601 		if (F_ISSET(rep, REP_F_MASTER))
602 			ret = DB_REP_DUPMASTER;
603 		break;
604 #ifdef NOTYET
605 	case REP_FILE: /* TODO */
606 		CLIENT_ONLY(rep, rp);
607 		break;
608 	case REP_FILE_REQ:
609 		ret = __rep_send_file(env, rec, eid);
610 		break;
611 #endif
612 	case REP_FILE_FAIL:
613 		/*
614 		 * Handle even if we're recovering.
615 		 */
616 		CLIENT_ONLY(rep, rp);
617 		/*
618 		 * Clean up any internal init that was in progress.
619 		 */
620 		if (eid == rep->master_id) {
621 			REP_SYSTEM_LOCK(env);
622 			/*
623 			 * If we're already locking out messages, give up.
624 			 */
625 			if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG))
626 				goto errhlk;
627 			/*
628 			 * Lock out other messages to prevent race
629 			 * conditions.
630 			 */
631 			if ((ret =
632 			    __rep_lockout_msg(env, rep, 1)) != 0) {
633 				goto errhlk;
634 			}
635 			lockout = 1;
636 			/*
637 			 * Need mtx_clientdb to safely clean up
638 			 * page database in __rep_init_cleanup().
639 			 */
640 			REP_SYSTEM_UNLOCK(env);
641 			MUTEX_LOCK(env, rep->mtx_clientdb);
642 			REP_SYSTEM_LOCK(env);
643 			/*
644 			 * Clean up internal init if one was in progress.
645 			 */
646 			if (ISSET_LOCKOUT_BDB(rep)) {
647 				RPRINT(env, (env, DB_VERB_REP_MSGS,
648     "FILE_FAIL is cleaning up old internal init"));
649 #ifdef	CONFIG_TEST
650 				STAT(rep->stat.st_filefail_cleanups++);
651 #endif
652 				ret = __rep_init_cleanup(env, rep, DB_FORCE);
653 				F_CLR(rep, REP_F_ABBREVIATED);
654 				CLR_RECOVERY_SETTINGS(rep);
655 #ifdef HAVE_REPLICATION_THREADS
656 				db_rep->abbrev_init = FALSE;
657 #endif
658 			}
659 			MUTEX_UNLOCK(env, rep->mtx_clientdb);
660 			if (ret != 0) {
661 				RPRINT(env, (env, DB_VERB_REP_MSGS,
662     "FILE_FAIL error cleaning up internal init: %d", ret));
663 				goto errhlk;
664 			}
665 			FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
666 			lockout = 0;
667 			/*
668 			 * Restart internal init, setting UPDATE flag and
669 			 * zeroing applicable LSNs.
670 			 */
671 			rep->sync_state = SYNC_UPDATE;
672 			ZERO_LSN(rep->first_lsn);
673 			ZERO_LSN(rep->ckp_lsn);
674 			REP_SYSTEM_UNLOCK(env);
675 			(void)__rep_send_message(env, eid, REP_UPDATE_REQ,
676 			    NULL, NULL, 0, 0);
677 		}
678 		break;
679 	case REP_LEASE_GRANT:
680 		/*
681 		 * Handle even if we're recovering.
682 		 */
683 		MASTER_ONLY(rep, rp);
684 		ret = __rep_lease_grant(env, rp, rec, eid);
685 		break;
686 	case REP_LOG:
687 	case REP_LOG_MORE:
688 		RECOVERING_LOG_SKIP;
689 		CLIENT_ONLY(rep, rp);
690 		ret = __rep_log(env, ip, rp, rec, eid, savetime, ret_lsnp);
691 		break;
692 	case REP_LOG_REQ:
693 		RECOVERING_SKIP;
694 		CLIENT_MASTERCHK;
695 		if (F_ISSET(rp, REPCTL_INIT))
696 			MASTER_UPDATE(env, renv);
697 		ret = __rep_logreq(env, rp, rec, eid);
698 		CLIENT_REREQ;
699 		break;
700 	case REP_NEWSITE:
701 		/*
702 		 * Handle even if we're recovering.
703 		 */
704 		/* We don't hold the rep mutex, and may miscount. */
705 		STAT(rep->stat.st_newsites++);
706 
707 		/* This is a rebroadcast; simply tell the application. */
708 		if (F_ISSET(rep, REP_F_MASTER)) {
709 			dblp = env->lg_handle;
710 			lp = dblp->reginfo.primary;
711 			LOG_SYSTEM_LOCK(env);
712 			lsn = lp->lsn;
713 			LOG_SYSTEM_UNLOCK(env);
714 			(void)__rep_send_message(env,
715 			    eid, REP_NEWMASTER, &lsn, NULL, 0, 0);
716 		}
717 		ret = DB_REP_NEWSITE;
718 		break;
719 	case REP_NEWCLIENT:
720 		/*
721 		 * Handle even if we're recovering.
722 		 */
723 		/*
724 		 * This message was received and should have resulted in the
725 		 * application entering the machine ID in its machine table.
726 		 * We respond to this with an ALIVE to send relevant information
727 		 * to the new client (if we are a master, we'll send a
728 		 * NEWMASTER, so we only need to send the ALIVE if we're a
729 		 * client).  But first, broadcast the new client's record to
730 		 * all the clients.
731 		 */
732 		(void)__rep_send_message(env,
733 		    DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0);
734 
735 		ret = DB_REP_NEWSITE;
736 
737 		if (F_ISSET(rep, REP_F_CLIENT)) {
738 			REP_SYSTEM_LOCK(env);
739 			egen_arg.egen = rep->egen;
740 
741 			/*
742 			 * Clean up any previous master remnants by making
743 			 * master_id invalid and cleaning up any internal
744 			 * init that was in progress.
745 			 */
746 			if (eid == rep->master_id) {
747 				rep->master_id = DB_EID_INVALID;
748 
749 				/*
750 				 * Already locking out messages, must be
751 				 * in sync-up recover or internal init,
752 				 * give up.
753 				 */
754 				if (FLD_ISSET(rep->lockout_flags,
755 				    REP_LOCKOUT_MSG))
756 					goto errhlk;
757 
758 				/*
759 				 * Lock out other messages to prevent race
760 				 * conditions.
761 				 */
762 				if ((t_ret =
763 				    __rep_lockout_msg(env, rep, 1)) != 0) {
764 					ret = t_ret;
765 					goto errhlk;
766 				}
767 				lockout = 1;
768 
769 				/*
770 				 * Need mtx_clientdb to safely clean up
771 				 * page database in __rep_init_cleanup().
772 				 */
773 				REP_SYSTEM_UNLOCK(env);
774 				MUTEX_LOCK(env, rep->mtx_clientdb);
775 				REP_SYSTEM_LOCK(env);
776 
777 				/*
778 				 * Clean up internal init if one was in
779 				 * progress.
780 				 */
781 				if (ISSET_LOCKOUT_BDB(rep)) {
782 					RPRINT(env, (env, DB_VERB_REP_MSGS,
783     "NEWCLIENT is cleaning up old internal init for invalid master"));
784 					t_ret = __rep_init_cleanup(env,
785 					    rep, DB_FORCE);
786 					F_CLR(rep, REP_F_ABBREVIATED);
787 					CLR_RECOVERY_SETTINGS(rep);
788 #ifdef HAVE_REPLICATION_THREADS
789 					db_rep->abbrev_init = FALSE;
790 #endif
791 				}
792 				MUTEX_UNLOCK(env, rep->mtx_clientdb);
793 				if (t_ret != 0) {
794 					ret = t_ret;
795 					RPRINT(env, (env, DB_VERB_REP_MSGS,
796     "NEWCLIENT error cleaning up internal init for invalid master: %d", ret));
797 					goto errhlk;
798 				}
799 				FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
800 				lockout = 0;
801 			}
802 			REP_SYSTEM_UNLOCK(env);
803 			if ((ret = __rep_egen_marshal(env, &egen_arg,
804 			    buf, __REP_EGEN_SIZE, &len)) != 0)
805 				goto errlock;
806 			DB_INIT_DBT(data_dbt, buf, len);
807 			(void)__rep_send_message(env, DB_EID_BROADCAST,
808 			    REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
809 			break;
810 		}
811 		/* FALLTHROUGH */
812 	case REP_MASTER_REQ:
813 		RECOVERING_SKIP;
814 		if (F_ISSET(rep, REP_F_MASTER)) {
815 			LOG_SYSTEM_LOCK(env);
816 			lsn = lp->lsn;
817 			LOG_SYSTEM_UNLOCK(env);
818 			(void)__rep_send_message(env,
819 			    DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
820 			if (IS_USING_LEASES(env))
821 				(void)__rep_lease_refresh(env);
822 		}
823 		/*
824 		 * If there is no master, then we could get into a state
825 		 * where an old client lost the initial ALIVE message and
826 		 * is calling an election under an old gen and can
827 		 * never get to the current gen.
828 		 */
829 		if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) {
830 			REP_SYSTEM_LOCK(env);
831 			egen_arg.egen = rep->egen;
832 			if (eid == rep->master_id)
833 				rep->master_id = DB_EID_INVALID;
834 			REP_SYSTEM_UNLOCK(env);
835 			if ((ret = __rep_egen_marshal(env, &egen_arg,
836 			    buf, __REP_EGEN_SIZE, &len)) != 0)
837 				goto errlock;
838 			DB_INIT_DBT(data_dbt, buf, len);
839 			(void)__rep_send_message(env, eid,
840 			    REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
841 		}
842 		break;
843 	case REP_NEWFILE:
844 		RECOVERING_LOG_SKIP;
845 		CLIENT_ONLY(rep, rp);
846 		ret = __rep_apply(env,
847 		     ip, rp, rec, ret_lsnp, NULL, &last_lsn);
848 		if (ret == DB_REP_LOGREADY)
849 			ret = __rep_logready(env, rep, savetime, &last_lsn);
850 		break;
851 	case REP_NEWMASTER:
852 		/*
853 		 * Handle even if we're recovering.
854 		 */
855 		ANYSITE(rep);
856 		if (F_ISSET(rep, REP_F_MASTER) &&
857 		    eid != rep->eid) {
858 			/* We don't hold the rep mutex, and may miscount. */
859 			STAT(rep->stat.st_dupmasters++);
860 			ret = DB_REP_DUPMASTER;
861 			if (IS_USING_LEASES(env))
862 				DB_ASSERT(env,
863 				    __rep_lease_check(env, 0) ==
864 				    DB_REP_LEASE_EXPIRED);
865 			else
866 				(void)__rep_send_message(env,
867 				    DB_EID_BROADCAST, REP_DUPMASTER,
868 				    NULL, NULL, 0, 0);
869 			break;
870 		}
871 		if ((ret =
872 		    __rep_new_master(env, rp, eid)) == DB_REP_NEWMASTER)
873 			ret = __rep_fire_newmaster(env, rp->gen, eid);
874 		break;
875 	case REP_PAGE:
876 	case REP_PAGE_FAIL:
877 	case REP_PAGE_MORE:
878 		/*
879 		 * Handle even if we're recovering.
880 		 */
881 		CLIENT_ONLY(rep, rp);
882 		ret = __rep_page(env, ip, eid, rp, rec);
883 		if (ret == DB_REP_PAGEDONE)
884 			ret = 0;
885 		break;
886 	case REP_PAGE_REQ:
887 		RECOVERING_SKIP;
888 		CLIENT_MASTERCHK;
889 		MASTER_UPDATE(env, renv);
890 		ret = __rep_page_req(env, ip, eid, rp, rec);
891 		CLIENT_REREQ;
892 		break;
893 	case REP_REREQUEST:
894 		/*
895 		 * Handle even if we're recovering.  Don't do a master
896 		 * check.
897 		 */
898 		CLIENT_ONLY(rep, rp);
899 		/*
900 		 * Don't hold any mutex, may miscount.
901 		 */
902 		STAT(rep->stat.st_client_rerequests++);
903 		ret = __rep_resend_req(env, 1);
904 		break;
905 	case REP_START_SYNC:
906 		RECOVERING_SKIP;
907 		MUTEX_LOCK(env, rep->mtx_clientdb);
908 		cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
909 		/*
910 		 * The comparison needs to be <= because the LSN in
911 		 * the message can be the LSN of the first outstanding
912 		 * txn, which may be the LSN immediately after the
913 		 * previous commit.  The ready_lsn is the LSN of the
914 		 * next record expected.  In that case, the LSNs
915 		 * could be equal and the client has the commit and
916 		 * wants to sync. [SR #15338]
917 		 */
918 		if (cmp <= 0) {
919 			MUTEX_UNLOCK(env, rep->mtx_clientdb);
920 			do_sync = 1;
921 		} else {
922 			STAT(rep->stat.st_startsync_delayed++);
923 			/*
924 			 * There are cases where keeping the first ckp_lsn
925 			 * LSN is advantageous and cases where keeping
926 			 * a later LSN is better.  If random, earlier
927 			 * log records are missing, keeping the later
928 			 * LSN seems to be better.  That is what we'll
929 			 * do for now.
930 			 */
931 			if (LOG_COMPARE(&rp->lsn, &rep->ckp_lsn) > 0)
932 				rep->ckp_lsn = rp->lsn;
933 			RPRINT(env, (env, DB_VERB_REP_MSGS,
934     "Delayed START_SYNC memp_sync due to missing records."));
935 			RPRINT(env, (env, DB_VERB_REP_MSGS,
936     "ready LSN [%lu][%lu], ckp_lsn [%lu][%lu]",
937 		    (u_long)lp->ready_lsn.file, (u_long)lp->ready_lsn.offset,
938 		    (u_long)rep->ckp_lsn.file, (u_long)rep->ckp_lsn.offset));
939 			MUTEX_UNLOCK(env, rep->mtx_clientdb);
940 		}
941 		break;
942 	case REP_UPDATE:
943 		/*
944 		 * Handle even if we're recovering.
945 		 */
946 		CLIENT_ONLY(rep, rp);
947 		if ((ret = __rep_update_setup(env,
948 		    eid, rp, rec, savetime, &lsn)) == DB_REP_WOULDROLLBACK &&
949 		    ret_lsnp != NULL) {
950 			/*
951 			 * Not for a normal internal init.  But this could
952 			 * happen here if we had to ask for an UPDATE message in
953 			 * order to check for materializing NIMDBs; in other
954 			 * words, an "abbreviated internal init."
955 			 */
956 			*ret_lsnp = lsn;
957 		}
958 		break;
959 	case REP_UPDATE_REQ:
960 		/*
961 		 * Handle even if we're recovering.
962 		 */
963 		MASTER_ONLY(rep, rp);
964 		infop = env->reginfo;
965 		renv = infop->primary;
966 		MASTER_UPDATE(env, renv);
967 		ret = __rep_update_req(env, rp);
968 		break;
969 	case REP_VERIFY:
970 		if (recovering) {
971 			MUTEX_LOCK(env, rep->mtx_clientdb);
972 			cmp = LOG_COMPARE(&lp->verify_lsn, &rp->lsn);
973 			MUTEX_UNLOCK(env, rep->mtx_clientdb);
974 			/*
975 			 * If this is not the verify record I want, skip it.
976 			 */
977 			if (cmp != 0) {
978 				ret = __rep_skip_msg(
979 				    env, rep, eid, rp->rectype);
980 				break;
981 			}
982 		}
983 		CLIENT_ONLY(rep, rp);
984 		if ((ret = __rep_verify(env, rp, rec, eid, savetime)) ==
985 		    DB_REP_WOULDROLLBACK && ret_lsnp != NULL)
986 			*ret_lsnp = rp->lsn;
987 		break;
988 	case REP_VERIFY_FAIL:
989 		/*
990 		 * Handle even if we're recovering.
991 		 */
992 		CLIENT_ONLY(rep, rp);
993 		ret = __rep_verify_fail(env, rp);
994 		break;
995 	case REP_VERIFY_REQ:
996 		RECOVERING_SKIP;
997 		CLIENT_MASTERCHK;
998 		ret = __rep_verify_req(env, rp, eid);
999 		CLIENT_REREQ;
1000 		break;
1001 	case REP_VOTE1:
1002 		/*
1003 		 * Handle even if we're recovering.
1004 		 */
1005 		ret = __rep_vote1(env, rp, rec, eid);
1006 		break;
1007 	case REP_VOTE2:
1008 		/*
1009 		 * Handle even if we're recovering.
1010 		 */
1011 		ret = __rep_vote2(env, rec, eid);
1012 		break;
1013 	default:
1014 		__db_errx(env, DB_STR_A("3521",
1015 	"DB_ENV->rep_process_message: unknown replication message: type %lu",
1016 		   "%lu"), (u_long)rp->rectype);
1017 		ret = EINVAL;
1018 		break;
1019 	}
1020 
1021 errlock:
1022 	REP_SYSTEM_LOCK(env);
1023 errhlk:	if (lockout)
1024 		FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
1025 	rep->msg_th--;
1026 	REP_SYSTEM_UNLOCK(env);
1027 	if (do_sync) {
1028 		MUTEX_LOCK(env, rep->mtx_ckp);
1029 		lsn = rp->lsn;
1030 		/*
1031 		 * This is the REP_START_SYNC sync, and so we permit it to be
1032 		 * interrupted.
1033 		 */
1034 		ret = __memp_sync(
1035 		    env, DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &lsn);
1036 		MUTEX_UNLOCK(env, rep->mtx_ckp);
1037 		RPRINT(env, (env, DB_VERB_REP_MSGS,
1038 		    "START_SYNC: Completed sync [%lu][%lu]",
1039 		    (u_long)lsn.file, (u_long)lsn.offset));
1040 	}
1041 out:
1042 	if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
1043 		if (ret_lsnp != NULL)
1044 			*ret_lsnp = rp->lsn;
1045 		ret = DB_REP_NOTPERM;
1046 	}
1047 	return (ret);
1048 }
1049 
1050 /*
1051  * __rep_apply --
1052  *
1053  * Handle incoming log records on a client, applying when possible and
1054  * entering into the bookkeeping table otherwise.  This routine manages
1055  * the state of the incoming message stream -- processing records, via
1056  * __rep_process_rec, when possible and enqueuing in the __db.rep.db
1057  * when necessary.  As gaps in the stream are filled in, this is where
1058  * we try to process as much as possible from __db.rep.db to catch up.
1059  *
1060  * PUBLIC: int __rep_apply __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
1061  * PUBLIC:     DBT *, DB_LSN *, int *, DB_LSN *));
1062  */
1063 int
__rep_apply(env,ip,rp,rec,ret_lsnp,is_dupp,last_lsnp)1064 __rep_apply(env, ip, rp, rec, ret_lsnp, is_dupp, last_lsnp)
1065 	ENV *env;
1066 	DB_THREAD_INFO *ip;
1067 	__rep_control_args *rp;
1068 	DBT *rec;
1069 	DB_LSN *ret_lsnp;
1070 	int *is_dupp;
1071 	DB_LSN *last_lsnp;
1072 {
1073 	DB *dbp;
1074 	DBT control_dbt, key_dbt;
1075 	DBT rec_dbt;
1076 	DB_LOG *dblp;
1077 	DB_LSN max_lsn, save_lsn;
1078 	DB_REP *db_rep;
1079 	LOG *lp;
1080 	REP *rep;
1081 	db_timespec msg_time, max_ts;
1082 	u_int32_t gen, rectype;
1083 	int cmp, event, master, newfile_seen, ret, set_apply, t_ret;
1084 
1085 	COMPQUIET(gen, 0);
1086 	COMPQUIET(master, DB_EID_INVALID);
1087 
1088 	db_rep = env->rep_handle;
1089 	rep = db_rep->region;
1090 	event = ret = set_apply = 0;
1091 	memset(&control_dbt, 0, sizeof(control_dbt));
1092 	memset(&rec_dbt, 0, sizeof(rec_dbt));
1093 	ZERO_LSN(max_lsn);
1094 	timespecclear(&max_ts);
1095 	timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
1096 	cmp = -2;		/* OOB value that LOG_COMPARE can't return. */
1097 
1098 	dblp = env->lg_handle;
1099 	MUTEX_LOCK(env, rep->mtx_clientdb);
1100 	/*
1101 	 * Lazily open the temp db.  Always set the startup flag to 0
1102 	 * because it was initialized from rep_start.
1103 	 */
1104 	if (db_rep->rep_db == NULL &&
1105 	    (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
1106 		MUTEX_UNLOCK(env, rep->mtx_clientdb);
1107 		goto out;
1108 	}
1109 	dbp = db_rep->rep_db;
1110 	lp = dblp->reginfo.primary;
1111 	newfile_seen = 0;
1112 	REP_SYSTEM_LOCK(env);
1113 	if (rep->sync_state == SYNC_LOG &&
1114 	    LOG_COMPARE(&lp->ready_lsn, &rep->first_lsn) < 0)
1115 		lp->ready_lsn = rep->first_lsn;
1116 	cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
1117 	/*
1118 	 * If we are going to skip or process any message other
1119 	 * than a duplicate, make note of it if we're in an
1120 	 * election so that the election can rerequest proactively.
1121 	 */
1122 	if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_APPLY) && cmp >= 0)
1123 		F_SET(rep, REP_F_SKIPPED_APPLY);
1124 
1125 	/*
1126 	 * If we're in the middle of processing a NEWFILE, we've dropped
1127 	 * the mutex and if this matches it is a duplicate record.  We
1128 	 * do not want this call taking the "matching" code below because
1129 	 * we may then process later records in the temp db and the
1130 	 * original NEWFILE may not have the log file ready.  It will
1131 	 * process those temp db items when it completes.
1132 	 */
1133 	if (F_ISSET(rep, REP_F_NEWFILE) && cmp == 0)
1134 		cmp = -1;
1135 
1136 	if (cmp == 0) {
1137 		/*
1138 		 * If we are in an election (i.e. we've sent a vote
1139 		 * with an LSN in it), then we drop the next record
1140 		 * we're expecting.  When we find a master, we'll
1141 		 * either go into sync, or if it was an existing
1142 		 * master, rerequest this one record (later records
1143 		 * are accumulating in the temp db).
1144 		 *
1145 		 * We can simply return here, and rep_process_message
1146 		 * will set NOTPERM if necessary for this record.
1147 		 */
1148 		if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_APPLY)) {
1149 			/*
1150 			 * We will simply return now.  All special return
1151 			 * processing should be ignored because the special
1152 			 * values are just initialized.  Variables like
1153 			 * max_lsn are still 0.
1154 			 */
1155 			RPRINT(env, (env, DB_VERB_REP_MISC,
1156 			    "rep_apply: In election. Ignoring [%lu][%lu]",
1157 			    (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
1158 			REP_SYSTEM_UNLOCK(env);
1159 			MUTEX_UNLOCK(env, rep->mtx_clientdb);
1160 			goto out;
1161 		}
1162 		rep->apply_th++;
1163 		set_apply = 1;
1164 		VPRINT(env, (env, DB_VERB_REP_MISC,
1165 		    "rep_apply: Set apply_th %d", rep->apply_th));
1166 		REP_SYSTEM_UNLOCK(env);
1167 		if (rp->rectype == REP_NEWFILE)
1168 			newfile_seen = 1;
1169 		if ((ret = __rep_process_rec(env, ip,
1170 		    rp, rec, &max_ts, &max_lsn)) != 0)
1171 			goto err;
1172 		/*
1173 		 * If we get the record we are expecting, reset
1174 		 * the count of records we've received and are applying
1175 		 * towards the request interval.
1176 		 */
1177 		__os_gettime(env, &lp->rcvd_ts, 1);
1178 		ZERO_LSN(lp->max_wait_lsn);
1179 
1180 		/*
1181 		 * The __rep_remfirst() and __rep_getnext() functions each open,
1182 		 * use and then close a cursor on the temp db, each time through
1183 		 * the loop.  Although this may seem excessive, it is necessary
1184 		 * to avoid locking problems with checkpoints.
1185 		 */
1186 		while (ret == 0 &&
1187 		    LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
1188 			/*
1189 			 * We just filled in a gap in the log record stream.
1190 			 * Write subsequent records to the log.
1191 			 */
1192 gap_check:
1193 			if ((ret = __rep_remfirst(env, ip,
1194 			     &control_dbt, &rec_dbt)) != 0)
1195 				goto err;
1196 
1197 			rp = (__rep_control_args *)control_dbt.data;
1198 			timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
1199 			rec = &rec_dbt;
1200 			if (rp->rectype == REP_NEWFILE)
1201 				newfile_seen = 1;
1202 			if ((ret = __rep_process_rec(env, ip,
1203 			    rp, rec, &max_ts, &max_lsn)) != 0)
1204 				goto err;
1205 
1206 			STAT(--rep->stat.st_log_queued);
1207 
1208 			/*
1209 			 * Since we just filled a gap in the log stream, and
1210 			 * we're writing subsequent records to the log, we want
1211 			 * to use rcvd_ts and wait_ts so that we will
1212 			 * request the next gap if we end up with a gap and
1213 			 * not so recent records in the temp db, but not
1214 			 * request if recent records are in the temp db and
1215 			 * likely to arrive on its own shortly.  We want to
1216 			 * avoid requesting the record in that case.  Also
1217 			 * reset max_wait_lsn because the next gap is a
1218 			 * fresh gap.
1219 			 */
1220 			lp->rcvd_ts = lp->last_ts;
1221 			lp->wait_ts = rep->request_gap;
1222 			if ((ret = __rep_getnext(env, ip)) == DB_NOTFOUND) {
1223 				__os_gettime(env, &lp->rcvd_ts, 1);
1224 				ret = 0;
1225 				break;
1226 			} else if (ret != 0)
1227 				goto err;
1228 		}
1229 
1230 		/*
1231 		 * Check if we're at a gap in the table and if so, whether we
1232 		 * need to ask for any records.
1233 		 */
1234 		if (!IS_ZERO_LSN(lp->waiting_lsn) &&
1235 		    LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
1236 			/*
1237 			 * We got a record and processed it, but we may
1238 			 * still be waiting for more records.  If we
1239 			 * filled a gap we keep a count of how many other
1240 			 * records are in the temp database and if we should
1241 			 * request the next gap at this time.
1242 			 */
1243 			if (__rep_check_doreq(env, rep) && (ret =
1244 			    __rep_loggap_req(env, rep, &rp->lsn, 0)) != 0)
1245 				goto err;
1246 		} else {
1247 			lp->wait_ts = rep->request_gap;
1248 			ZERO_LSN(lp->max_wait_lsn);
1249 		}
1250 
1251 	} else if (cmp > 0) {
1252 		/*
1253 		 * The LSN is higher than the one we were waiting for.
1254 		 * This record isn't in sequence; add it to the temporary
1255 		 * database, update waiting_lsn if necessary, and perform
1256 		 * calculations to determine if we should issue requests
1257 		 * for new records.
1258 		 */
1259 		REP_SYSTEM_UNLOCK(env);
1260 		memset(&key_dbt, 0, sizeof(key_dbt));
1261 		key_dbt.data = rp;
1262 		key_dbt.size = sizeof(*rp);
1263 		ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
1264 		if (ret == 0) {
1265 			STAT(rep->stat.st_log_queued++);
1266 			__os_gettime(env, &lp->last_ts, 1);
1267 #ifdef HAVE_STATISTICS
1268 			rep->stat.st_log_queued_total++;
1269 			if (rep->stat.st_log_queued_max <
1270 			    rep->stat.st_log_queued)
1271 				rep->stat.st_log_queued_max =
1272 				    rep->stat.st_log_queued;
1273 #endif
1274 		}
1275 
1276 		if (ret == DB_KEYEXIST) {
1277 			STAT(rep->stat.st_log_duplicated++);
1278 #ifdef	CONFIG_TEST
1279 			STAT(rep->stat.st_log_futuredup++);
1280 #endif
1281 			if (is_dupp != NULL) {
1282 				*is_dupp = 1;
1283 				/*
1284 				 * Could get overwritten by max_lsn later,
1285 				 * but only when returning NOTPERM for a
1286 				 * REPCTL_PERM record, in which case max_lsn
1287 				 * is this log record.
1288 				 */
1289 				if (ret_lsnp != NULL)
1290 					*ret_lsnp = lp->ready_lsn;
1291 			}
1292 			ret = 0;
1293 		}
1294 		if (ret != 0 && ret != ENOMEM)
1295 			goto done;
1296 
1297 		/*
1298 		 * If we are using in-memory, and got ENOMEM, it is
1299 		 * not an error.  But in that case we want to skip
1300 		 * comparing the message LSN since we're not storing it.
1301 		 * However, we do want continue to check if we need to
1302 		 * send a request for the gap.
1303 		 */
1304 		if (ret == 0 && (IS_ZERO_LSN(lp->waiting_lsn) ||
1305 		    LOG_COMPARE(&rp->lsn, &lp->waiting_lsn) < 0)) {
1306 			/*
1307 			 * If this is a new gap, then reset the rcvd_ts so
1308 			 * that an out-of-order record after an idle period
1309 			 * does not (likely) immediately rerequest.
1310 			 */
1311 			if (IS_ZERO_LSN(lp->waiting_lsn))
1312 				__os_gettime(env, &lp->rcvd_ts, 1);
1313 			lp->waiting_lsn = rp->lsn;
1314 		}
1315 
1316 		if (__rep_check_doreq(env, rep) &&
1317 		    (ret = __rep_loggap_req(env, rep, &rp->lsn, 0) != 0))
1318 			goto err;
1319 
1320 		/*
1321 		 * If this is permanent; let the caller know that we have
1322 		 * not yet written it to disk, but we've accepted it.
1323 		 */
1324 		if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
1325 			max_lsn = rp->lsn;
1326 			ret = DB_REP_NOTPERM;
1327 		}
1328 		goto done;
1329 	} else {
1330 		STAT(rep->stat.st_log_duplicated++);
1331 		REP_SYSTEM_UNLOCK(env);
1332 		if (is_dupp != NULL) {
1333 			*is_dupp = 1;
1334 			/*
1335 			 * Could get overwritten by max_lsn later.
1336 			 * But max_lsn is guaranteed <= ready_lsn, so
1337 			 * it would be a more conservative LSN to return.
1338 			 */
1339 			if (ret_lsnp != NULL)
1340 				*ret_lsnp = lp->ready_lsn;
1341 		}
1342 		LOGCOPY_32(env, &rectype, rec->data);
1343 		if (IS_PERM_RECTYPE(rectype))
1344 			max_lsn = lp->max_perm_lsn;
1345 		/*
1346 		 * We check REPCTL_LEASE here, because this client may
1347 		 * have leases configured but the master may not (especially
1348 		 * in a mixed version group.  If the master has leases
1349 		 * configured, all clients must also.
1350 		 */
1351 		if (IS_USING_LEASES(env) &&
1352 		    F_ISSET(rp, REPCTL_LEASE) &&
1353 		    timespecisset(&msg_time)) {
1354 			if (timespeccmp(&msg_time, &lp->max_lease_ts, >))
1355 				max_ts = msg_time;
1356 			else
1357 				max_ts = lp->max_lease_ts;
1358 		}
1359 		goto done;
1360 	}
1361 
1362 	/* Check if we need to go back into the table. */
1363 	if (ret == 0 && LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0)
1364 		goto gap_check;
1365 
1366 done:
1367 err:	/*
1368 	 * In case of a race, to make sure only one thread can get
1369 	 * DB_REP_LOGREADY, zero out rep->last_lsn to show that we've gotten to
1370 	 * this point.
1371 	 */
1372 	REP_SYSTEM_LOCK(env);
1373 	if (ret == 0 &&
1374 	    rep->sync_state == SYNC_LOG &&
1375 	    !IS_ZERO_LSN(rep->last_lsn) &&
1376 	    LOG_COMPARE(&lp->ready_lsn, &rep->last_lsn) >= 0) {
1377 		*last_lsnp = max_lsn;
1378 		ZERO_LSN(rep->last_lsn);
1379 		ZERO_LSN(max_lsn);
1380 		ret = DB_REP_LOGREADY;
1381 	}
1382 	/*
1383 	 * Only decrement if we were actually applying log records.
1384 	 * We do not care if we processed a dup record or put one
1385 	 * in the temp db.
1386 	 */
1387 	if (set_apply) {
1388 		rep->apply_th--;
1389 		VPRINT(env, (env, DB_VERB_REP_MISC,
1390 		    "rep_apply: Decrement apply_th %d [%lu][%lu]",
1391 		    rep->apply_th, (u_long)lp->ready_lsn.file,
1392 		    (u_long)lp->ready_lsn.offset));
1393 	}
1394 
1395 	if (ret == 0 && rep->sync_state != SYNC_LOG &&
1396 	    !IS_ZERO_LSN(max_lsn)) {
1397 		if (ret_lsnp != NULL)
1398 			*ret_lsnp = max_lsn;
1399 		ret = DB_REP_ISPERM;
1400 		DB_ASSERT(env, LOG_COMPARE(&max_lsn, &lp->max_perm_lsn) >= 0);
1401 		lp->max_perm_lsn = max_lsn;
1402 		if ((t_ret = __rep_notify_threads(env, AWAIT_LSN)) != 0)
1403 			ret = t_ret;
1404 	}
1405 
1406 	/*
1407 	 * Start-up is complete when we process (or have already processed) up
1408 	 * to the end of the replication group's log.  In case we miss that
1409 	 * message, as a back-up, we also recognize start-up completion when we
1410 	 * actually process a live log record.  Having cmp==0 here (with a good
1411 	 * "ret" value) implies we actually processed the record.
1412 	 */
1413 	if ((ret == 0 || ret == DB_REP_ISPERM) &&
1414 	    rep->stat.st_startup_complete == 0 &&
1415 	    rep->sync_state != SYNC_LOG &&
1416 	    ((cmp <= 0 && F_ISSET(rp, REPCTL_LOG_END)) ||
1417 	    (cmp == 0 && !F_ISSET(rp, REPCTL_RESEND)))) {
1418 		rep->stat.st_startup_complete = 1;
1419 		event = 1;
1420 		gen = rep->gen;
1421 		master = rep->master_id;
1422 	}
1423 	REP_SYSTEM_UNLOCK(env);
1424 	/*
1425 	 * If we've processed beyond the needed LSN for a pending
1426 	 * start sync, start it now.  We must compare > here
1427 	 * because ready_lsn is the next record we expect and if
1428 	 * the last record is a commit, that will dirty pages on
1429 	 * a client as that txn is applied.
1430 	 */
1431 	if (!IS_ZERO_LSN(rep->ckp_lsn) &&
1432 	    LOG_COMPARE(&lp->ready_lsn, &rep->ckp_lsn) > 0) {
1433 		save_lsn = rep->ckp_lsn;
1434 		ZERO_LSN(rep->ckp_lsn);
1435 	} else
1436 		ZERO_LSN(save_lsn);
1437 
1438 	/*
1439 	 * If this is a perm record, we are using leases, update the lease
1440 	 * grant.  We must hold the clientdb mutex.  We must not hold
1441 	 * the region mutex because rep_update_grant will acquire it.
1442 	 */
1443 	if (ret == DB_REP_ISPERM && IS_USING_LEASES(env) &&
1444 	    timespecisset(&max_ts)) {
1445 		if ((t_ret = __rep_update_grant(env, &max_ts)) != 0)
1446 			ret = t_ret;
1447 		else if (timespeccmp(&max_ts, &lp->max_lease_ts, >))
1448 			lp->max_lease_ts = max_ts;
1449 	}
1450 
1451 	MUTEX_UNLOCK(env, rep->mtx_clientdb);
1452 	if (!IS_ZERO_LSN(save_lsn)) {
1453 		/*
1454 		 * Now call memp_sync holding only the ckp mutex.
1455 		 */
1456 		MUTEX_LOCK(env, rep->mtx_ckp);
1457 		RPRINT(env, (env, DB_VERB_REP_MISC,
1458 		    "Starting delayed __memp_sync call [%lu][%lu]",
1459 		    (u_long)save_lsn.file, (u_long)save_lsn.offset));
1460 		t_ret = __memp_sync(env,
1461 		    DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &save_lsn);
1462 		MUTEX_UNLOCK(env, rep->mtx_ckp);
1463 	}
1464 	if (event) {
1465 		RPRINT(env, (env, DB_VERB_REP_MISC,
1466 		    "Start-up is done [%lu][%lu]",
1467 		    (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
1468 
1469 		if ((t_ret = __rep_fire_startupdone(env, gen, master)) != 0) {
1470 			DB_ASSERT(env, ret == 0 || ret == DB_REP_ISPERM);
1471 			/* Failure trumps either of those values. */
1472 			ret = t_ret;
1473 			goto out;
1474 		}
1475 	}
1476 	if ((ret == 0 || ret == DB_REP_ISPERM) &&
1477 	    newfile_seen && lp->db_log_autoremove)
1478 		__log_autoremove(env);
1479 	if (control_dbt.data != NULL)
1480 		__os_ufree(env, control_dbt.data);
1481 	if (rec_dbt.data != NULL)
1482 		__os_ufree(env, rec_dbt.data);
1483 
1484 out:
1485 	switch (ret) {
1486 	case 0:
1487 		break;
1488 	case DB_REP_ISPERM:
1489 		VPRINT(env, (env, DB_VERB_REP_MSGS,
1490 		    "Returning ISPERM [%lu][%lu], cmp = %d",
1491 		    (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1492 		break;
1493 	case DB_REP_LOGREADY:
1494 		RPRINT(env, (env, DB_VERB_REP_MSGS,
1495 		    "Returning LOGREADY up to [%lu][%lu], cmp = %d",
1496 		    (u_long)last_lsnp->file,
1497 		    (u_long)last_lsnp->offset, cmp));
1498 		break;
1499 	case DB_REP_NOTPERM:
1500 		if (rep->sync_state != SYNC_LOG &&
1501 		    !IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL)
1502 			*ret_lsnp = max_lsn;
1503 
1504 		VPRINT(env, (env, DB_VERB_REP_MSGS,
1505 		    "Returning NOTPERM [%lu][%lu], cmp = %d",
1506 		    (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1507 		break;
1508 	default:
1509 		RPRINT(env, (env, DB_VERB_REP_MSGS,
1510 		    "Returning %d [%lu][%lu], cmp = %d", ret,
1511 		    (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1512 		break;
1513 	}
1514 
1515 	return (ret);
1516 }
1517 
1518 /*
1519  * __rep_process_txn --
1520  *
1521  * This is the routine that actually applies a transaction's set of updates.
1522  *
1523  * PUBLIC: int __rep_process_txn __P((ENV *, DBT *));
1524  */
1525 int
__rep_process_txn(env,rec)1526 __rep_process_txn(env, rec)
1527 	ENV *env;
1528 	DBT *rec;
1529 {
1530 	DBT data_dbt, *lock_dbt;
1531 	DB_LOCKER *locker;
1532 	DB_LOCKREQ req, *lvp;
1533 	DB_LOGC *logc;
1534 	DB_LSN prev_lsn, *lsnp;
1535 	DB_REP *db_rep;
1536 	DB_THREAD_INFO *ip;
1537 	DB_TXNHEAD *txninfo;
1538 	DELAYED_BLOB_LIST *dblp, *dummy;
1539 	LSN_COLLECTION lc;
1540 	REP *rep;
1541 	__txn_regop_args *txn_args;
1542 	__txn_regop_42_args *txn42_args;
1543 	__txn_prepare_args *prep_args;
1544 	u_int32_t rectype;
1545 	u_int i;
1546 	int ret, t_ret;
1547 
1548 	db_rep = env->rep_handle;
1549 	rep = db_rep->region;
1550 	logc = NULL;
1551 	dblp = dummy = NULL;
1552 	txn_args = NULL;
1553 	txn42_args = NULL;
1554 	prep_args = NULL;
1555 	txninfo = NULL;
1556 
1557 	memset(&data_dbt, 0, sizeof(data_dbt));
1558 	if (F_ISSET(env, ENV_THREAD))
1559 		F_SET(&data_dbt, DB_DBT_REALLOC);
1560 
1561 	/*
1562 	 * There are two phases:  First, we have to traverse backwards through
1563 	 * the log records gathering the list of all LSNs in the transaction.
1564 	 * Once we have this information, we can loop through and then apply it.
1565 	 *
1566 	 * We may be passed a prepare (if we're restoring a prepare on upgrade)
1567 	 * instead of a commit (the common case).  Check which it is and behave
1568 	 * appropriately.
1569 	 */
1570 	LOGCOPY_32(env, &rectype, rec->data);
1571 	memset(&lc, 0, sizeof(lc));
1572 	if (rectype == DB___txn_regop) {
1573 		/*
1574 		 * We're the end of a transaction.  Make sure this is
1575 		 * really a commit and not an abort!
1576 		 */
1577 		if ((ret = __txn_regop_read(
1578 		    env, rec->data, &txn_args)) != 0)
1579 			return (ret);
1580 		if (txn_args->opcode != TXN_COMMIT) {
1581 			__os_free(env, txn_args);
1582 			return (0);
1583 		}
1584 		prev_lsn = txn_args->prev_lsn;
1585 		lock_dbt = &txn_args->locks;
1586 	} else {
1587 		/* We're a prepare. */
1588 		DB_ASSERT(env, rectype == DB___txn_prepare);
1589 
1590 		if ((ret = __txn_prepare_read(
1591 		    env, rec->data, &prep_args)) != 0)
1592 			return (ret);
1593 		prev_lsn = prep_args->prev_lsn;
1594 		lock_dbt = &prep_args->locks;
1595 	}
1596 
1597 	/* Get locks. */
1598 	if ((ret = __lock_id(env, NULL, &locker)) != 0)
1599 		goto err1;
1600 
1601 	/* We are always more important than user transactions. */
1602 	locker->priority = DB_LOCK_MAXPRIORITY;
1603 
1604 	if ((ret =
1605 	    __lock_get_list(env, locker, 0, DB_LOCK_WRITE, lock_dbt)) != 0)
1606 		goto err;
1607 
1608 	/* Phase 1.  Get a list of the LSNs in this transaction, and sort it. */
1609 	if ((ret = __rep_collect_txn(env, &prev_lsn, &lc, &dblp)) != 0)
1610 		goto err;
1611 
1612 	/* Deal with any child transactions that had to be delayed. */
1613 	while (dblp != NULL) {
1614 		if ((ret = __rep_collect_txn(
1615 		    env, &dblp->lsn, &lc, &dummy)) != 0)
1616 			goto err;
1617 		DB_ASSERT(env, dummy == NULL);
1618 		dummy = dblp;
1619 		dblp = dummy->next;
1620 		__os_free(env, dummy);
1621 		dummy = NULL;
1622 	}
1623 	qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
1624 
1625 	/*
1626 	 * The set of records for a transaction may include dbreg_register
1627 	 * records.  Create a txnlist so that they can keep track of file
1628 	 * state between records.
1629 	 */
1630 	ENV_GET_THREAD_INFO(env, ip);
1631 	if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0)
1632 		goto err;
1633 	/* Replication uses a transaction only when client mvcc is active. */
1634 	if (F_ISSET(env->dbenv, DB_ENV_MULTIVERSION) && (ret = __txn_begin(env,
1635 	    ip, NULL, &txninfo->txn, DB_TXN_SNAPSHOT | DB_TXN_DISPATCH)) != 0)
1636 		goto err;
1637 
1638 	/* Phase 2: Apply updates. */
1639 	if ((ret = __log_cursor(env, &logc)) != 0)
1640 		goto err;
1641 	for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
1642 		if ((ret = __logc_get(logc, lsnp, &data_dbt, DB_SET)) != 0) {
1643 			__db_errx(env, DB_STR_A("3522",
1644 			    "failed to read the log at [%lu][%lu]", "%lu %lu"),
1645 			    (u_long)lsnp->file, (u_long)lsnp->offset);
1646 			goto err;
1647 		}
1648 		if ((ret = __db_dispatch(env, &env->recover_dtab,
1649 		    &data_dbt, lsnp, DB_TXN_APPLY, txninfo)) != 0) {
1650 			__db_err(env, ret, DB_STR_A("3523",
1651 			    "transaction %x failed at [%lu][%lu]", "%lu %lu"),
1652 			    txninfo->txn->txnid,
1653 			    (u_long)lsnp->file, (u_long)lsnp->offset);
1654 			goto err;
1655 		}
1656 		LOGCOPY_32(env, &rectype, data_dbt.data);
1657 	}
1658 	if (txninfo->txn != NULL) {
1659 		ret = __txn_commit(txninfo->txn, 0);
1660 		txninfo->txn = NULL;
1661 		if (ret != 0) {
1662 			__db_errx(env, DB_STR_A("3715", "%lu %lu",
1663 			    "rep_process_txn [%lu][%lu] failed to commit"),
1664 			    (u_long)lc.array[lc.nlsns - 1].file,
1665 			    (u_long)lc.array[lc.nlsns - 1].offset);
1666 			goto err;
1667 		}
1668 	}
1669 
1670 err:	memset(&req, 0, sizeof(req));
1671 	req.op = DB_LOCK_PUT_ALL;
1672 	if ((t_ret =
1673 	     __lock_vec(env, locker, 0, &req, 1, &lvp)) != 0 && ret == 0)
1674 		ret = t_ret;
1675 
1676 	if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0)
1677 		ret = t_ret;
1678 
1679 	while (dblp != NULL) {
1680 		dummy = dblp;
1681 		dblp = dummy->next;
1682 		__os_free(env, dummy);
1683 	}
1684 
1685 err1:	if (txn_args != NULL)
1686 		__os_free(env, txn_args);
1687 	if (txn42_args != NULL)
1688 		__os_free(env, txn42_args);
1689 	if (prep_args != NULL)
1690 		__os_free(env, prep_args);
1691 	if (lc.array != NULL)
1692 		__os_free(env, lc.array);
1693 
1694 	if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0)
1695 		ret = t_ret;
1696 
1697 	if (txninfo != NULL)
1698 		__db_txnlist_end(env, txninfo);
1699 
1700 	if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
1701 		__os_ufree(env, data_dbt.data);
1702 
1703 #ifdef HAVE_STATISTICS
1704 	if (ret == 0)
1705 		/*
1706 		 * We don't hold the rep mutex, and could miscount if we race.
1707 		 */
1708 		rep->stat.st_txns_applied++;
1709 #endif
1710 
1711 	return (ret);
1712 }
1713 
1714 /*
1715  * __rep_collect_txn
1716  *	Recursive function that will let us visit every entry in a transaction
1717  *	chain including all child transactions so that we can then apply
1718  *	the entire transaction family at once.
1719  */
1720 static int
__rep_collect_txn(env,lsnp,lc,dbl)1721 __rep_collect_txn(env, lsnp, lc, dbl)
1722 	ENV *env;
1723 	DB_LSN *lsnp;
1724 	LSN_COLLECTION *lc;
1725 	DELAYED_BLOB_LIST **dbl;
1726 {
1727 	__dbreg_register_args *dbregargp;
1728 	__txn_child_args *argp;
1729 	DB_LOGC *logc;
1730 	DB_LSN c_lsn;
1731 	DB_REP *db_rep;
1732 	DBT data;
1733 	db_seq_t blob_file_id;
1734 	u_int32_t child, rectype, skip_txnid;
1735 	u_int nalloc;
1736 	int ret, t_ret, view_partial;
1737 	char *name;
1738 
1739 	memset(&data, 0, sizeof(data));
1740 	F_SET(&data, DB_DBT_REALLOC);
1741 	skip_txnid = TXN_INVALID;
1742 
1743 	if ((ret = __log_cursor(env, &logc)) != 0)
1744 		return (ret);
1745 
1746 	/*
1747 	 * For partial replication we assume a certain sequence of
1748 	 * log records to detect a database create and skip it if
1749 	 * desired.  We are walking backward through the records of
1750 	 * a single transaction right now.
1751 	 *
1752 	 * A create operation is done inside a BDB-owned child txn.
1753 	 * Nothing else is done within this BDB-owned child txn.
1754 	 * The last piece of a create operations is the dbreg_register
1755 	 * log record that records the opening of the file.  That
1756 	 * log record contains the child txnid in the 'id' field, and
1757 	 * the file name.  At this point we invoke the partial callback
1758 	 * to determine if this database should be replicated.  If it
1759 	 * should not be replicated, we need to avoid collecting the
1760 	 * entire child txn referenced in the 'id' field.
1761 	 *
1762 	 * So if processing the dbreg_register record finds a database
1763 	 * to skip, we store the child txnid in 'skip_txnid'.  We use
1764 	 * 'skip_txnid' to avoid processing log records or making
1765 	 * recursive calls for that txnid.
1766 	 */
1767 	while (!IS_ZERO_LSN(*lsnp) &&
1768 	    (ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) {
1769 		LOGCOPY_32(env, &rectype, data.data);
1770 		if (rectype == DB___txn_child) {
1771 			if ((ret = __txn_child_read(
1772 			    env, data.data, &argp)) != 0)
1773 				goto err;
1774 			c_lsn = argp->c_lsn;
1775 			*lsnp = argp->prev_lsn;
1776 			child = argp->child;
1777 			__os_free(env, argp);
1778 
1779 			if (child == skip_txnid && *dbl != NULL &&
1780 			    (*dbl)->child == child)
1781 				(*dbl)->lsn = c_lsn;
1782 			/*
1783 			 * If skip_txnid is set, it is the id of the child txnid
1784 			 * that creates a database we should skip.  So, if
1785 			 * this is that child txn, do not collect it.
1786 			 */
1787 			if (skip_txnid == TXN_INVALID || child != skip_txnid)
1788 				ret = __rep_collect_txn(env, &c_lsn, lc, dbl);
1789 		} else if (IS_VIEW_SITE(env) &&
1790 		    rectype == DB___dbreg_register) {
1791 			db_rep = env->rep_handle;
1792 			/*
1793 			 * If we are a view see if this is a file creation
1794 			 * stream.  On-disk files have the creating child txn
1795 			 * in the 'id' field and the name.  See if this view
1796 			 * wants this file.
1797 			 */
1798 			if ((ret = __dbreg_register_read(
1799 			    env, data.data, &dbregargp)) != 0)
1800 				goto err;
1801 			child = dbregargp->id;
1802 			name = (char *)dbregargp->name.data;
1803 			skip_txnid = TXN_INVALID;
1804 			if (child != TXN_INVALID &&
1805 			    (!IS_DB_FILE(name) || IS_BLOB_META(name))) {
1806 				/*
1807 				 * The 'id' has a child txn so it is a create.
1808 				 */
1809 				DB_ASSERT(env, db_rep->partial != NULL);
1810 				GET_LO_HI(env, dbregargp->blob_fid_lo,
1811 				    dbregargp->blob_fid_hi, blob_file_id, ret);
1812 				if (ret != 0)
1813 					goto err;
1814 				if ((ret = __rep_call_partial(env,
1815 				    name, &view_partial, 0, dbl)) != 0) {
1816 					VPRINT(env, (env, DB_VERB_REP_MISC,
1817 		    "rep_collect_txn: partial cb err %d for %s", ret, name));
1818 					__os_free(env, dbregargp);
1819 					goto err;
1820 				}
1821 				/*
1822 				 * Save the child txnid for when we walk back
1823 				 * into the txn_child record.
1824 				 */
1825 				if (view_partial == 0) {
1826 					skip_txnid = child;
1827 					if ((ret =
1828 					    __rep_remove_delayed_blobs(env,
1829 					    blob_file_id, child, dbl)) != 0)
1830 						goto err;
1831 				}
1832 			}
1833 			__os_free(env, dbregargp);
1834 		}
1835 		if (rectype != DB___txn_child) {
1836 			if (lc->nalloc < lc->nlsns + 1) {
1837 				nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
1838 				if ((ret = __os_realloc(env,
1839 				    nalloc * sizeof(DB_LSN), &lc->array)) != 0)
1840 					goto err;
1841 				lc->nalloc = nalloc;
1842 			}
1843 			lc->array[lc->nlsns++] = *lsnp;
1844 
1845 			/*
1846 			 * Explicitly copy the previous lsn.  The record
1847 			 * starts with a u_int32_t record type, a u_int32_t
1848 			 * txn id, and then the DB_LSN (prev_lsn) that we
1849 			 * want.  We copy explicitly because we have no idea
1850 			 * what kind of record this is.
1851 			 */
1852 			LOGCOPY_TOLSN(env, lsnp, (u_int8_t *)data.data +
1853 			    sizeof(u_int32_t) + sizeof(u_int32_t));
1854 		}
1855 
1856 		if (ret != 0)
1857 			goto err;
1858 	}
1859 	if (ret != 0)
1860 		__db_errx(env, DB_STR_A("3524",
1861 		    "collect failed at: [%lu][%lu]", "%lu %lu"),
1862 		    (u_long)lsnp->file, (u_long)lsnp->offset);
1863 
1864 err:	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
1865 		ret = t_ret;
1866 	if (data.data != NULL)
1867 		__os_ufree(env, data.data);
1868 	return (ret);
1869 }
1870 
1871 /*
1872  * __rep_remove_delayed_blobs --
1873  *
1874  * If a blob meta database is opened in the same transaction as the database
1875  * that owns it, then deciding whether it should be replicated or not needs
1876  * to be delayed until after the rest of the transaction is processed.  To do
1877  * this, the transaction's information is added to a DELAYED_BLOB_LIST.  When
1878  * the owning database is processed, if it is not replicated then remove the
1879  * entry of its blob meta database from the delayed list.
1880  */
1881 static int
__rep_remove_delayed_blobs(env,blob_file_id,child,dbl)1882 __rep_remove_delayed_blobs(env, blob_file_id, child, dbl)
1883 	ENV *env;
1884 	db_seq_t blob_file_id;
1885 	u_int32_t child;
1886 	DELAYED_BLOB_LIST **dbl;
1887 {
1888 	DELAYED_BLOB_LIST *ent, *next, *prev;
1889 
1890 	if (*dbl == NULL)
1891 		return (0);
1892 
1893 	/*
1894 	 * If the child transaction has not been set, then a new entry was just
1895 	 * added to the list.
1896 	 */
1897 	if ((*dbl)->child == 0) {
1898 		(*dbl)->child = child;
1899 		return (0);
1900 	}
1901 
1902 	if (blob_file_id == 0)
1903 		return (0);
1904 
1905 	/*
1906 	 * This blob meta database should not be replicated if its associated
1907 	 * database is not replicated.  Remove it from the delayed
1908 	 * list so it will not be processed at a later time.
1909 	 */
1910 	for (ent = *dbl; ent != NULL; ent = (DELAYED_BLOB_LIST *)ent->next) {
1911 		if (ent->blob_file_id == blob_file_id && ent->child != child) {
1912 			next = (DELAYED_BLOB_LIST *)ent->next;
1913 			prev = (DELAYED_BLOB_LIST *)ent->prev;
1914 			if (ent == *dbl)
1915 				*dbl = next;
1916 			if (prev != NULL)
1917 				prev->next = ent->next;
1918 			if (next != NULL)
1919 				next->prev = ent->prev;
1920 			__os_free(env, ent);
1921 			break;
1922 		}
1923 	}
1924 	return (0);
1925 }
1926 
1927 /*
1928  * __rep_lsn_cmp --
1929  *	qsort-type-compatible wrapper for LOG_COMPARE.
1930  */
1931 static int
__rep_lsn_cmp(lsn1,lsn2)1932 __rep_lsn_cmp(lsn1, lsn2)
1933 	const void *lsn1, *lsn2;
1934 {
1935 
1936 	return (LOG_COMPARE((DB_LSN *)lsn1, (DB_LSN *)lsn2));
1937 }
1938 
1939 /*
1940  * __rep_newfile --
1941  *	NEWFILE messages have the LSN of the last record in the previous
1942  * log file.  When applying a NEWFILE message, make sure we haven't already
1943  * swapped files.  Assume caller hold mtx_clientdb.
1944  */
1945 static int
__rep_newfile(env,rp,rec)1946 __rep_newfile(env, rp, rec)
1947 	ENV *env;
1948 	__rep_control_args *rp;
1949 	DBT *rec;
1950 {
1951 	DB_LOG *dblp;
1952 	DB_LSN tmplsn;
1953 	DB_REP *db_rep;
1954 	LOG *lp;
1955 	REP *rep;
1956 	__rep_newfile_args nf_args;
1957 	int ret;
1958 
1959 	dblp = env->lg_handle;
1960 	lp = dblp->reginfo.primary;
1961 	db_rep = env->rep_handle;
1962 	rep = db_rep->region;
1963 
1964 	/*
1965 	 * If a newfile is already in progress, just ignore.
1966 	 */
1967 	if (F_ISSET(rep, REP_F_NEWFILE))
1968 		return (0);
1969 	if (rp->lsn.file + 1 > lp->ready_lsn.file) {
1970 		if ((ret = __rep_newfile_unmarshal(env, &nf_args,
1971 		    rec->data, rec->size, NULL)) != 0)
1972 			return (ret);
1973 		RPRINT(env, (env, DB_VERB_REP_MISC,
1974 		    "rep_newfile: File %lu vers %lu",
1975 		    (u_long)rp->lsn.file + 1, (u_long)nf_args.version));
1976 
1977 		/*
1978 		 * We drop the mtx_clientdb mutex during
1979 		 * the file operation, and then reacquire it when
1980 		 * we're done.  We avoid colliding with new incoming
1981 		 * log records because lp->ready_lsn is not getting
1982 		 * updated and there is no real log record at this
1983 		 * ready_lsn.  We avoid colliding with a duplicate
1984 		 * NEWFILE message by setting an in-progress flag.
1985 		 */
1986 		REP_SYSTEM_LOCK(env);
1987 		F_SET(rep, REP_F_NEWFILE);
1988 		REP_SYSTEM_UNLOCK(env);
1989 		MUTEX_UNLOCK(env, rep->mtx_clientdb);
1990 		LOG_SYSTEM_LOCK(env);
1991 		ret = __log_newfile(dblp, &tmplsn, 0, nf_args.version);
1992 		LOG_SYSTEM_UNLOCK(env);
1993 		MUTEX_LOCK(env, rep->mtx_clientdb);
1994 		REP_SYSTEM_LOCK(env);
1995 		F_CLR(rep, REP_F_NEWFILE);
1996 		REP_SYSTEM_UNLOCK(env);
1997 		if (ret == 0)
1998 			lp->ready_lsn = tmplsn;
1999 		return (ret);
2000 	} else
2001 		/* We've already applied this NEWFILE.  Just ignore it. */
2002 		return (0);
2003 }
2004 
2005 /*
2006  * __rep_do_ckp --
2007  * Perform the memp_sync necessary for this checkpoint without holding the
2008  * REP->mtx_clientdb.  Callers of this function must hold REP->mtx_clientdb
2009  * and must not be holding the region mutex.
2010  */
2011 static int
__rep_do_ckp(env,rec,rp)2012 __rep_do_ckp(env, rec, rp)
2013 	ENV *env;
2014 	DBT *rec;
2015 	__rep_control_args *rp;
2016 {
2017 	DB_ENV *dbenv;
2018 	__txn_ckp_args *ckp_args;
2019 	DB_LSN ckp_lsn;
2020 	REP *rep;
2021 	int ret;
2022 
2023 	dbenv = env->dbenv;
2024 
2025 	/* Crack the log record and extract the checkpoint LSN. */
2026 	if ((ret = __txn_ckp_read(env, rec->data, &ckp_args)) != 0)
2027 		return (ret);
2028 	ckp_lsn = ckp_args->ckp_lsn;
2029 	__os_free(env, ckp_args);
2030 
2031 	rep = env->rep_handle->region;
2032 
2033 	MUTEX_UNLOCK(env, rep->mtx_clientdb);
2034 	DB_TEST_WAIT(env, env->test_check);
2035 
2036 	/*
2037 	 * Sync the memory pool.
2038 	 *
2039 	 * This is the real PERM lock record/ckp.  We cannot return ISPERM
2040 	 * if we haven't truly completed the checkpoint, so we don't allow
2041 	 * this call to be interrupted.
2042 	 *
2043 	 * We may be overlapping our log record with an in-progress startsync
2044 	 * of this checkpoint; suppress the max_write settings on any running
2045 	 * cache-flush operation so it completes quickly.
2046 	 */
2047 	(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 1);
2048 	MUTEX_LOCK(env, rep->mtx_ckp);
2049 	ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &ckp_lsn);
2050 	MUTEX_UNLOCK(env, rep->mtx_ckp);
2051 	(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 0);
2052 
2053 	/* Update the last_ckp in the txn region. */
2054 	if (ret == 0)
2055 		ret = __txn_updateckp(env, &rp->lsn);
2056 	else {
2057 		__db_errx(env, DB_STR_A("3525",
2058 		    "Error syncing ckp [%lu][%lu]", "%lu %lu"),
2059 		    (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
2060 		ret = __env_panic(env, ret);
2061 	}
2062 
2063 	MUTEX_LOCK(env, rep->mtx_clientdb);
2064 #ifdef HAVE_REPLICATION_THREADS
2065 	if (ret == 0) {
2066 		REP_SYSTEM_LOCK(env);
2067 		if (LOG_COMPARE(&ckp_lsn, &rep->last_ckp_lsn) > 0)
2068 			rep->last_ckp_lsn = ckp_lsn;
2069 		REP_SYSTEM_UNLOCK(env);
2070 	}
2071 #endif
2072 	return (ret);
2073 }
2074 
2075 /*
2076  * __rep_remfirst --
2077  * Remove the first entry from the __db.rep.db
2078  */
2079 static int
__rep_remfirst(env,ip,cntrl,rec)2080 __rep_remfirst(env, ip, cntrl, rec)
2081 	ENV *env;
2082 	DB_THREAD_INFO *ip;
2083 	DBT *cntrl;
2084 	DBT *rec;
2085 {
2086 	DB *dbp;
2087 	DBC *dbc;
2088 	DB_REP *db_rep;
2089 	int ret, t_ret;
2090 
2091 	db_rep = env->rep_handle;
2092 	dbp = db_rep->rep_db;
2093 	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
2094 		return (ret);
2095 
2096 	/* The DBTs need to persist through another call. */
2097 	F_SET(cntrl, DB_DBT_REALLOC);
2098 	F_SET(rec, DB_DBT_REALLOC);
2099 	if ((ret = __dbc_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0)
2100 		ret = __dbc_del(dbc, 0);
2101 	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
2102 		ret = t_ret;
2103 
2104 	return (ret);
2105 }
2106 
2107 /*
2108  * __rep_getnext --
2109  * Get the next record out of the __db.rep.db table.
2110  */
2111 static int
__rep_getnext(env,ip)2112 __rep_getnext(env, ip)
2113 	ENV *env;
2114 	DB_THREAD_INFO *ip;
2115 {
2116 	DB *dbp;
2117 	DBC *dbc;
2118 	DBT lsn_dbt, nextrec_dbt;
2119 	DB_LOG *dblp;
2120 	DB_REP *db_rep;
2121 	LOG *lp;
2122 	__rep_control_args *rp;
2123 	int ret, t_ret;
2124 
2125 	dblp = env->lg_handle;
2126 	lp = dblp->reginfo.primary;
2127 
2128 	db_rep = env->rep_handle;
2129 	dbp = db_rep->rep_db;
2130 
2131 	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
2132 		return (ret);
2133 
2134 	/*
2135 	 * Update waiting_lsn.  We need to move it
2136 	 * forward to the LSN of the next record
2137 	 * in the queue.
2138 	 *
2139 	 * If the next item in the database is a log
2140 	 * record--the common case--we're not
2141 	 * interested in its contents, just in its LSN.
2142 	 * Optimize by doing a partial get of the data item.
2143 	 */
2144 	memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
2145 	F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
2146 	nextrec_dbt.ulen = nextrec_dbt.dlen = 0;
2147 
2148 	memset(&lsn_dbt, 0, sizeof(lsn_dbt));
2149 	ret = __dbc_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST);
2150 	if (ret != DB_NOTFOUND && ret != 0)
2151 		goto err;
2152 
2153 	if (ret == DB_NOTFOUND) {
2154 		ZERO_LSN(lp->waiting_lsn);
2155 		/*
2156 		 * Whether or not the current record is
2157 		 * simple, there's no next one, and
2158 		 * therefore we haven't got anything
2159 		 * else to do right now.  Break out.
2160 		 */
2161 		goto err;
2162 	}
2163 	rp = (__rep_control_args *)lsn_dbt.data;
2164 	lp->waiting_lsn = rp->lsn;
2165 
2166 err:	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
2167 		ret = t_ret;
2168 	return (ret);
2169 }
2170 
2171 /*
2172  * __rep_process_rec --
2173  *
2174  * Given a record in 'rp', process it.  In the case of a NEWFILE, that means
2175  * potentially switching files.  In the case of a checkpoint, it means doing
2176  * the checkpoint, and in other cases, it means simply writing the record into
2177  * the log.
2178  */
2179 static int
__rep_process_rec(env,ip,rp,rec,ret_tsp,ret_lsnp)2180 __rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp)
2181 	ENV *env;
2182 	DB_THREAD_INFO *ip;
2183 	__rep_control_args *rp;
2184 	DBT *rec;
2185 	db_timespec *ret_tsp;
2186 	DB_LSN *ret_lsnp;
2187 {
2188 	DB *dbp;
2189 	DBT control_dbt, key_dbt, rec_dbt;
2190 	DB_LOG *dblp;
2191 	DB_REP *db_rep;
2192 	DB_LOGC *logc;
2193 	LOG *lp;
2194 	REP *rep;
2195 	DB_LSN lsn;
2196 	db_timespec msg_time;
2197 	u_int32_t rectype, txnid;
2198 	int ret, t_ret;
2199 
2200 	db_rep = env->rep_handle;
2201 	rep = db_rep->region;
2202 	dblp = env->lg_handle;
2203 	lp = dblp->reginfo.primary;
2204 	dbp = db_rep->rep_db;
2205 	ret = 0;
2206 
2207 	memset(&rec_dbt, 0, sizeof(rec_dbt));
2208 	if (rp->rectype == REP_NEWFILE) {
2209 		if ((ret = __rep_newfile(env, rp, rec)) != 0)
2210 			return (ret);
2211 
2212 		/*
2213 		 * In SYNC_LOG, in case the end-of-log sync point happens to be
2214 		 * right at the file boundary, we need to make sure ret_lsnp
2215 		 * points to a real log record, rather than the "dead space" at
2216 		 * the end of the file that the NEWFILE msg normally points to.
2217 		 */
2218 		if (rep->sync_state == SYNC_LOG) {
2219 			if ((ret = __log_cursor(env, &logc)) != 0)
2220 				return (ret);
2221 			if ((ret = __logc_get(logc,
2222 			    &lsn, &rec_dbt, DB_LAST)) == 0)
2223 				*ret_lsnp = lsn;
2224 			if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
2225 				ret = t_ret;
2226 		}
2227 		return (ret);
2228 	}
2229 
2230 	LOGCOPY_32(env, &rectype, rec->data);
2231 	memset(&control_dbt, 0, sizeof(control_dbt));
2232 	timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
2233 
2234 	/*
2235 	 * We write all records except for checkpoint records here.
2236 	 * All non-checkpoint records need to appear in the log before
2237 	 * we take action upon them (i.e., we enforce write-ahead logging).
2238 	 * However, we can't write the checkpoint record here until the
2239 	 * data buffers are actually written to disk, else we are creating
2240 	 * an invalid log -- one that says all data before a certain point
2241 	 * has been written to disk.
2242 	 *
2243 	 * If two threads are both processing the same checkpoint record
2244 	 * (because, for example, it was resent and the original finally
2245 	 * arrived), we handle that below by checking for the existence of
2246 	 * the log record when we add it to the replication database.
2247 	 *
2248 	 * Any log records that arrive while we are processing the checkpoint
2249 	 * are added to the bookkeeping database because ready_lsn is not yet
2250 	 * updated to point after the checkpoint record.
2251 	 */
2252 	if (rectype != DB___txn_ckp || rep->sync_state == SYNC_LOG) {
2253 		if ((ret = __log_rep_put(env, &rp->lsn, rec, 0)) != 0)
2254 			return (ret);
2255 		STAT(rep->stat.st_log_records++);
2256 		if (rep->sync_state == SYNC_LOG) {
2257 			*ret_lsnp = rp->lsn;
2258 			goto out;
2259 		}
2260 	}
2261 
2262 	switch (rectype) {
2263 	case DB___dbreg_register:
2264 		/*
2265 		 * DB opens occur in the context of a transaction, so we can
2266 		 * simply handle them when we process the transaction.  Closes,
2267 		 * checkpoints and other dbreg opcodes are not transaction-
2268 		 * protected, so we have to handle them here.
2269 		 *
2270 		 * It should be unsafe for the master to do a close of a file
2271 		 * that was opened in an active transaction, so we should be
2272 		 * guaranteed to get the ordering right.
2273 		 *
2274 		 * !!!
2275 		 * The txn ID is the second 4-byte field of the log record.
2276 		 * We should really be calling __dbreg_register_read() and
2277 		 * working from the __dbreg_register_args structure, but this
2278 		 * is considerably faster and the order of the fields won't
2279 		 * change.
2280 		 */
2281 		LOGCOPY_32(env, &txnid,
2282 		    (u_int8_t *)rec->data + sizeof(u_int32_t));
2283 		if (txnid == TXN_INVALID)
2284 			ret = __db_dispatch(env, &env->recover_dtab,
2285 			    rec, &rp->lsn, DB_TXN_APPLY, NULL);
2286 		break;
2287 	case DB___txn_regop:
2288 		/*
2289 		 * If an application is doing app-specific recovery
2290 		 * and acquires locks while applying a transaction,
2291 		 * it can deadlock.  Any other locks held by this
2292 		 * thread should have been discarded in the
2293 		 * __rep_process_txn error path, so if we simply
2294 		 * retry, we should eventually succeed.
2295 		 */
2296 		do {
2297 			ret = 0;
2298 			if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
2299 				ret = __txn_openfiles(env, ip, NULL, 1);
2300 				F_SET(db_rep, DBREP_OPENFILES);
2301 			}
2302 			if (ret == 0)
2303 				ret = __rep_process_txn(env, rec);
2304 		} while (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED);
2305 
2306 		/* Now write/flush the log as appropriate. */
2307 		if (ret == 0) {
2308 			if (F_ISSET(env->dbenv, DB_ENV_TXN_WRITE_NOSYNC))
2309 				ret = __log_rep_write(env);
2310 			else if (!F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
2311 				ret = __log_flush(env, NULL);
2312 		}
2313 		if (ret != 0) {
2314 			__db_errx(env, DB_STR_A("3526",
2315 			    "Error processing txn [%lu][%lu]", "%lu %lu"),
2316 			    (u_long)rp->lsn.file, (u_long)rp->lsn.offset);
2317 			ret = __env_panic(env, ret);
2318 		}
2319 		*ret_lsnp = rp->lsn;
2320 		break;
2321 	case DB___txn_prepare:
2322 		ret = __log_flush(env, NULL);
2323 		/*
2324 		 * Save the biggest prepared LSN we've seen.
2325 		 */
2326 		rep->max_prep_lsn = rp->lsn;
2327 		VPRINT(env, (env, DB_VERB_REP_MSGS,
2328 		    "process_rec: prepare at [%lu][%lu]",
2329 		    (u_long)rep->max_prep_lsn.file,
2330 		    (u_long)rep->max_prep_lsn.offset));
2331 		break;
2332 	case DB___txn_ckp:
2333 		/*
2334 		 * We do not want to hold the REP->mtx_clientdb mutex while
2335 		 * syncing the mpool, so if we get a checkpoint record we are
2336 		 * supposed to process, add it to the __db.rep.db, do the
2337 		 * memp_sync and then go back and process it later, when the
2338 		 * sync has finished.  If this record is already in the table,
2339 		 * then some other thread will process it, so simply return
2340 		 * REP_NOTPERM.
2341 		 */
2342 		memset(&key_dbt, 0, sizeof(key_dbt));
2343 		key_dbt.data = rp;
2344 		key_dbt.size = sizeof(*rp);
2345 
2346 		/*
2347 		 * We want to put this record into the tmp DB only if
2348 		 * it doesn't exist, so use DB_NOOVERWRITE.
2349 		 */
2350 		ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
2351 		if (ret == DB_KEYEXIST) {
2352 			if (ret_lsnp != NULL)
2353 				*ret_lsnp = rp->lsn;
2354 			ret = DB_REP_NOTPERM;
2355 		}
2356 		if (ret != 0)
2357 			break;
2358 
2359 		/*
2360 		 * Now, do the checkpoint.  Regardless of
2361 		 * whether the checkpoint succeeds or not,
2362 		 * we need to remove the record we just put
2363 		 * in the temporary database.  If the
2364 		 * checkpoint failed, return an error.  We
2365 		 * will act like we never received the
2366 		 * checkpoint.
2367 		 */
2368 		if ((ret = __rep_do_ckp(env, rec, rp)) == 0)
2369 			ret = __log_rep_put(env, &rp->lsn, rec,
2370 			    DB_LOG_CHKPNT);
2371 		if ((t_ret = __rep_remfirst(env, ip,
2372 		    &control_dbt, &rec_dbt)) != 0 && ret == 0)
2373 			ret = t_ret;
2374 		/*
2375 		 * If we're successful putting the log record in the
2376 		 * log, flush it for a checkpoint.
2377 		 */
2378 		if (ret == 0) {
2379 			*ret_lsnp = rp->lsn;
2380 			ret = __log_flush(env, NULL);
2381 			if (ret == 0 && lp->db_log_autoremove)
2382 				__log_autoremove(env);
2383 		}
2384 		break;
2385 	default:
2386 		break;
2387 	}
2388 
2389 out:
2390 	if (ret == 0 && F_ISSET(rp, REPCTL_PERM))
2391 		*ret_lsnp = rp->lsn;
2392 	if (IS_USING_LEASES(env) &&
2393 	    F_ISSET(rp, REPCTL_LEASE))
2394 		*ret_tsp = msg_time;
2395 	/*
2396 	 * Set ret_lsnp before flushing the log because if the
2397 	 * flush fails, we've still written the record to the
2398 	 * log and the LSN has been entered.
2399 	 */
2400 	if (ret == 0 && F_ISSET(rp, REPCTL_FLUSH))
2401 		ret = __log_flush(env, NULL);
2402 	if (control_dbt.data != NULL)
2403 		__os_ufree(env, control_dbt.data);
2404 	if (rec_dbt.data != NULL)
2405 		__os_ufree(env, rec_dbt.data);
2406 
2407 	return (ret);
2408 }
2409 
2410 /*
2411  * __rep_resend_req --
2412  *	We might have dropped a message, we need to resend our request.
2413  *	The request we send is dependent on what recovery state we're in.
2414  *	The caller holds no locks.
2415  *
2416  * PUBLIC: int __rep_resend_req __P((ENV *, int));
2417  */
2418 int
__rep_resend_req(env,rereq)2419 __rep_resend_req(env, rereq)
2420 	ENV *env;
2421 	int rereq;
2422 {
2423 	DB_LOG *dblp;
2424 	DB_LSN lsn, *lsnp;
2425 	DB_REP *db_rep;
2426 	LOG *lp;
2427 	REP *rep;
2428 	int blob_sync, master, ret;
2429 	repsync_t sync_state;
2430 	u_int32_t gapflags, msgtype, repflags, sendflags;
2431 
2432 	db_rep = env->rep_handle;
2433 	rep = db_rep->region;
2434 	dblp = env->lg_handle;
2435 	lp = dblp->reginfo.primary;
2436 	ret = 0;
2437 	lsnp = NULL;
2438 	msgtype = REP_INVALID;
2439 	sendflags = 0;
2440 
2441 	repflags = rep->flags;
2442 	sync_state = rep->sync_state;
2443 	blob_sync = rep->blob_sync;
2444 	/*
2445 	 * If we are delayed we do not rerequest anything.
2446 	 */
2447 	if (FLD_ISSET(repflags, REP_F_DELAY))
2448 		return (ret);
2449 	gapflags = rereq ? REP_GAP_REREQUEST : 0;
2450 
2451 	if (sync_state == SYNC_VERIFY) {
2452 		MUTEX_LOCK(env, rep->mtx_clientdb);
2453 		lsn = lp->verify_lsn;
2454 		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2455 		if (!IS_ZERO_LSN(lsn)) {
2456 			msgtype = REP_VERIFY_REQ;
2457 			lsnp = &lsn;
2458 			sendflags = DB_REP_REREQUEST;
2459 		}
2460 	} else if (sync_state == SYNC_UPDATE) {
2461 		/*
2462 		 * UPDATE_REQ only goes to the master.
2463 		 */
2464 		msgtype = REP_UPDATE_REQ;
2465 	} else if (sync_state == SYNC_PAGE) {
2466 		if (blob_sync == 0) {
2467 			REP_SYSTEM_LOCK(env);
2468 			ret = __rep_pggap_req(env, rep, NULL, gapflags);
2469 			REP_SYSTEM_UNLOCK(env);
2470 		} else {
2471 			MUTEX_LOCK(env, rep->mtx_clientdb);
2472 			REP_SYSTEM_LOCK(env);
2473 			ret = __rep_blob_rereq(env, rep, gapflags);
2474 			REP_SYSTEM_UNLOCK(env);
2475 			MUTEX_UNLOCK(env, rep->mtx_clientdb);
2476 		}
2477 	} else {
2478 		MUTEX_LOCK(env, rep->mtx_clientdb);
2479 		ret = __rep_loggap_req(env, rep, NULL, gapflags);
2480 		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2481 	}
2482 
2483 	if (msgtype != REP_INVALID) {
2484 		master = rep->master_id;
2485 		if (master == DB_EID_INVALID)
2486 			(void)__rep_send_message(env,
2487 			    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
2488 		else
2489 			(void)__rep_send_message(env,
2490 			    master, msgtype, lsnp, NULL, 0, sendflags);
2491 	}
2492 
2493 	return (ret);
2494 }
2495 
2496 /*
2497  * __rep_check_doreq --
2498  * PUBLIC: int __rep_check_doreq __P((ENV *, REP *));
2499  *
2500  * Check if we need to send another request.  If so, compare with
2501  * the request limits the user might have set.  This assumes the
2502  * caller holds the REP->mtx_clientdb mutex.  Returns 1 if a request
2503  * needs to be made, and 0 if it does not.
2504  */
2505 int
__rep_check_doreq(env,rep)2506 __rep_check_doreq(env, rep)
2507 	ENV *env;
2508 	REP *rep;
2509 {
2510 
2511 	DB_LOG *dblp;
2512 	LOG *lp;
2513 	db_timespec now;
2514 	int req;
2515 
2516 	dblp = env->lg_handle;
2517 	lp = dblp->reginfo.primary;
2518 	__os_gettime(env, &now, 1);
2519 	timespecsub(&now, &lp->rcvd_ts);
2520 	req = timespeccmp(&now, &lp->wait_ts, >=);
2521 	if (req) {
2522 		/*
2523 		 * Add wait_ts to itself to double it.
2524 		 */
2525 		timespecadd(&lp->wait_ts, &lp->wait_ts);
2526 		if (timespeccmp(&lp->wait_ts, &rep->max_gap, >))
2527 			lp->wait_ts = rep->max_gap;
2528 		__os_gettime(env, &lp->rcvd_ts, 1);
2529 	}
2530 	return (req);
2531 }
2532 
2533 /*
2534  * __rep_skip_msg -
2535  *
2536  *	If we're in recovery we want to skip/ignore the message, but
2537  *	we also need to see if we need to re-request any retransmissions.
2538  */
2539 static int
__rep_skip_msg(env,rep,eid,rectype)2540 __rep_skip_msg(env, rep, eid, rectype)
2541 	ENV *env;
2542 	REP *rep;
2543 	int eid;
2544 	u_int32_t rectype;
2545 {
2546 	int do_req, ret;
2547 
2548 	ret = 0;
2549 	/*
2550 	 * If we have a request message from a client then immediately
2551 	 * send a REP_REREQUEST back to that client since we're skipping it.
2552 	 */
2553 	if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rectype))
2554 		do_req = 1;
2555 	else {
2556 		/* Check for need to retransmit. */
2557 		MUTEX_LOCK(env, rep->mtx_clientdb);
2558 		do_req = __rep_check_doreq(env, rep);
2559 		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2560 	}
2561 	/*
2562 	 * Don't respond to a MASTER_REQ with
2563 	 * a MASTER_REQ or REREQUEST.
2564 	 */
2565 	if (do_req && rectype != REP_MASTER_REQ) {
2566 		/*
2567 		 * There are three cases:
2568 		 * 1.  If we don't know who the master is, then send MASTER_REQ.
2569 		 * 2.  If the message we're skipping came from the master,
2570 		 * then we need to rerequest.
2571 		 * 3.  If the message didn't come from a master (i.e. client
2572 		 * to client), then send a rerequest back to the sender so
2573 		 * the sender can rerequest it elsewhere, if we are a client.
2574 		 */
2575 		if (rep->master_id == DB_EID_INVALID)	/* Case 1. */
2576 			(void)__rep_send_message(env,
2577 			    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
2578 		else if (eid == rep->master_id)	{	/* Case 2. */
2579 			/*
2580 			 * When we receive log messages in the SYNC_PAGE stage
2581 			 * and we decide to rerequest, it often means the pages
2582 			 * we expect have been dropped.  Send a rerequest with
2583 			 * gapflags for better performance.
2584 			 */
2585 			if ((rectype == REP_LOG || rectype == REP_BULK_LOG ||
2586 			    rectype == REP_LOG_MORE) &&
2587 			    rep->sync_state == SYNC_PAGE)
2588 				ret = __rep_resend_req(env, 1);
2589 			else
2590 				ret = __rep_resend_req(env, 0);
2591 		} else if (F_ISSET(rep, REP_F_CLIENT))	/* Case 3. */
2592 			(void)__rep_send_message(env,
2593 			    eid, REP_REREQUEST, NULL, NULL, 0, 0);
2594 	}
2595 	return (ret);
2596 }
2597 
2598 /*
2599  * __rep_check_missing --
2600  * PUBLIC: int __rep_check_missing __P((ENV *, u_int32_t, DB_LSN *));
2601  *
2602  * Check for and request any missing client information.
2603  */
2604 int
__rep_check_missing(env,gen,master_perm_lsn)2605 __rep_check_missing(env, gen, master_perm_lsn)
2606 	ENV *env;
2607 	u_int32_t gen;
2608 	DB_LSN *master_perm_lsn;
2609 {
2610 	DB_LOG *dblp;
2611 	DB_LSN *end_lsn;
2612 	DB_REP *db_rep;
2613 	LOG *lp;
2614 	REGINFO *infop;
2615 	REP *rep;
2616 	__rep_fileinfo_args *curinfo;
2617 	int do_req, has_log_gap, has_page_gap, ret;
2618 
2619 	db_rep = env->rep_handle;
2620 	rep = db_rep->region;
2621 	dblp = env->lg_handle;
2622 	infop = env->reginfo;
2623 	has_log_gap = has_page_gap = ret = 0;
2624 
2625 	MUTEX_LOCK(env, rep->mtx_clientdb);
2626 	REP_SYSTEM_LOCK(env);
2627 	/*
2628 	 * Check if we are okay to proceed with this operation.  If not,
2629 	 * do not rerequest anything.
2630 	 */
2631 	if (!F_ISSET(rep, REP_F_CLIENT) || rep->master_id == DB_EID_INVALID ||
2632 	    gen != rep->gen || FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) {
2633 		REP_SYSTEM_UNLOCK(env);
2634 		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2635 		/*
2636 		 * If this client is out-of-date, ask the master to identify
2637 		 * itself so that this client will synchronize with the
2638 		 * master's later generation.
2639 		 */
2640 		if (gen > rep->gen && __rep_check_doreq(env, rep))
2641 			(void)__rep_send_message(env,
2642 			    DB_EID_BROADCAST, REP_MASTER_REQ,
2643 			    NULL, NULL, 0, 0);
2644 		goto out;
2645 	}
2646 
2647 	/*
2648 	 * Prevent message lockout by counting ourself here.
2649 	 * Setting rep->msg_th will prevent a major system
2650 	 * change, such as a role change or running recovery, from
2651 	 * occurring before sending out any rerequests.
2652 	 */
2653 	rep->msg_th++;
2654 	REP_SYSTEM_UNLOCK(env);
2655 
2656 	/* Check that it is time to request missing information. */
2657 	if ((do_req = __rep_check_doreq(env, rep))) {
2658 		/* Check for interior or tail page gap. */
2659 		REP_SYSTEM_LOCK(env);
2660 		if (rep->sync_state == SYNC_PAGE &&
2661 		    rep->curinfo_off != INVALID_ROFF) {
2662 			GET_CURINFO(rep, infop, curinfo);
2663 			has_page_gap =
2664 			    rep->waiting_pg != PGNO_INVALID ||
2665 			    rep->ready_pg <= curinfo->max_pgno;
2666 		}
2667 		REP_SYSTEM_UNLOCK(env);
2668 	}
2669 	/* Check for interior or tail log gap. */
2670 	if (do_req && !has_page_gap) {
2671 		lp = dblp->reginfo.primary;
2672 		/*
2673 		 * The LOG_COMPARE test is <= because ready_lsn is
2674 		 * the next LSN we are expecting but we do not have
2675 		 * it yet.  If the needed LSN is at this LSN, it
2676 		 * means we are missing the last record we need.
2677 		 */
2678 		if (rep->sync_state == SYNC_LOG)
2679 			end_lsn = &rep->last_lsn;
2680 		else
2681 			end_lsn = master_perm_lsn;
2682 		has_log_gap = !IS_ZERO_LSN(lp->waiting_lsn) ||
2683 		    LOG_COMPARE(&lp->ready_lsn, end_lsn) <= 0;
2684 	}
2685 	MUTEX_UNLOCK(env, rep->mtx_clientdb);
2686 	/*
2687 	 * If it is time to send a request, only do so if we
2688 	 * have a log gap or a page gap, or we need to resend an
2689 	 * UPDATE_REQ or VERIFY_REQ, or we are in SYNC_LOG to keep
2690 	 * requesting to the current known end of the log.
2691 	 */
2692 	do_req = do_req && (has_log_gap || has_page_gap ||
2693 	    rep->sync_state == SYNC_LOG ||
2694 	    rep->sync_state == SYNC_UPDATE ||
2695 	    rep->sync_state == SYNC_VERIFY);
2696 	/*
2697 	 * Determines request type from current replication
2698 	 * state and resends request.  The request may have
2699 	 * the DB_REP_ANYWHERE flag enabled if appropriate.
2700 	 */
2701 	if (do_req)
2702 		ret = __rep_resend_req(env, 0);
2703 
2704 	REP_SYSTEM_LOCK(env);
2705 	rep->msg_th--;
2706 	REP_SYSTEM_UNLOCK(env);
2707 
2708 out:	return (ret);
2709 }
2710 
2711 static int
__rep_fire_newmaster(env,gen,master)2712 __rep_fire_newmaster(env, gen, master)
2713 	ENV *env;
2714 	u_int32_t gen;
2715 	int master;
2716 {
2717 	DB_REP *db_rep;
2718 	REP *rep;
2719 
2720 	db_rep = env->rep_handle;
2721 	rep = db_rep->region;
2722 
2723 	REP_EVENT_LOCK(env);
2724 	/*
2725 	 * The firing of this event should be idempotent with respect to a
2726 	 * particular generation number.
2727 	 */
2728 	if (rep->newmaster_event_gen < gen) {
2729 		__rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
2730 		rep->newmaster_event_gen = gen;
2731 	}
2732 	REP_EVENT_UNLOCK(env);
2733 	return (0);
2734 }
2735 
2736 static int
__rep_fire_startupdone(env,gen,master)2737 __rep_fire_startupdone(env, gen, master)
2738 	ENV *env;
2739 	u_int32_t gen;
2740 	int master;
2741 {
2742 	DB_REP *db_rep;
2743 	REP *rep;
2744 
2745 	db_rep = env->rep_handle;
2746 	rep = db_rep->region;
2747 
2748 	REP_EVENT_LOCK(env);
2749 	/*
2750 	 * Usually NEWMASTER will already have been fired.  But if not, fire
2751 	 * it here now, to ensure the application receives events in the
2752 	 * expected order.
2753 	 */
2754 	if (rep->newmaster_event_gen < gen) {
2755 		__rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
2756 		rep->newmaster_event_gen = gen;
2757 	}
2758 
2759 	/*
2760 	 * Caller already ensures that it only tries to fire STARTUPDONE once
2761 	 * per generation.  If we did not want to rely on that, we could add a
2762 	 * simple boolean flag (to the set of data protected by the mtx_event).
2763 	 * The precise meaning of that flag would be "STARTUPDONE has been fired
2764 	 * for the generation value stored in `newmaster_event_gen'".  Then the
2765 	 * more accurate test here would be simply to check that flag, and fire
2766 	 * the event (and set the flag) if it were not already set.
2767 	 */
2768 	if (rep->newmaster_event_gen == gen)
2769 		__rep_fire_event(env, DB_EVENT_REP_STARTUPDONE, NULL);
2770 	REP_EVENT_UNLOCK(env);
2771 	return (0);
2772 }
2773