1 /*-
2 * Copyright (c) 2001, 2020 Oracle and/or its affiliates. All rights reserved.
3 *
4 * See the file LICENSE for license information.
5 *
6 * $Id$
7 */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/blob.h"
13 #include "dbinc/db_page.h"
14 #include "dbinc/db_am.h"
15 #include "dbinc/lock.h"
16 #include "dbinc/mp.h"
17 #include "dbinc/txn.h"
18
19 static int __rep_collect_txn
20 __P((ENV *, DB_LSN *, LSN_COLLECTION *, DELAYED_BLOB_LIST **));
21 static int __rep_remove_delayed_blobs
22 __P((ENV *, db_seq_t, u_int32_t ,DELAYED_BLOB_LIST **));
23 static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *));
24 static int __rep_fire_newmaster __P((ENV *, u_int32_t, int));
25 static int __rep_fire_startupdone __P((ENV *, u_int32_t, int));
26 static int __rep_getnext __P((ENV *, DB_THREAD_INFO *));
27 static int __rep_lsn_cmp __P((const void *, const void *));
28 static int __rep_newfile __P((ENV *, __rep_control_args *, DBT *));
29 static int __rep_process_rec __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
30 DBT *, db_timespec *, DB_LSN *));
31 static int __rep_remfirst __P((ENV *, DB_THREAD_INFO *, DBT *, DBT *));
32 static int __rep_skip_msg __P((ENV *, REP *, int, u_int32_t));
33
34 /* Used to consistently designate which messages ought to be received where. */
35
36 #define MASTER_ONLY(rep, rp) do { \
37 if (!F_ISSET(rep, REP_F_MASTER)) { \
38 RPRINT(env, (env, DB_VERB_REP_MSGS, \
39 "Master record received on client")); \
40 REP_PRINT_MESSAGE(env, \
41 eid, rp, "rep_process_message", 0); \
42 /* Just skip/ignore it. */ \
43 ret = 0; \
44 goto errlock; \
45 } \
46 } while (0)
47
48 #define CLIENT_ONLY(rep, rp) do { \
49 if (!F_ISSET(rep, REP_F_CLIENT)) { \
50 RPRINT(env, (env, DB_VERB_REP_MSGS, \
51 "Client record received on master")); \
52 /* \
53 * Only broadcast DUPMASTER if leases are not \
54 * in effect. If I am an old master, using \
55 * leases and I get a newer message, my leases \
56 * had better all be expired. \
57 */ \
58 if (IS_USING_LEASES(env)) \
59 DB_ASSERT(env, \
60 __rep_lease_check(env, 0) == \
61 DB_REP_LEASE_EXPIRED); \
62 else { \
63 REP_PRINT_MESSAGE(env, \
64 eid, rp, "rep_process_message", 0); \
65 (void)__rep_send_message(env, DB_EID_BROADCAST, \
66 REP_DUPMASTER, NULL, NULL, 0, 0); \
67 } \
68 ret = DB_REP_DUPMASTER; \
69 goto errlock; \
70 } \
71 } while (0)
72
73 /*
74 * If a client is attempting to service a request and its gen is not in
75 * sync with its database state, it cannot service the request. Currently
76 * the only way to know this is with the heavy hammer of knowing (or not)
77 * who the master is. If the master is invalid, force a rerequest.
78 * If we receive an ALIVE, we update both gen and invalidate the
79 * master_id.
80 */
81 #define CLIENT_MASTERCHK do { \
82 if (F_ISSET(rep, REP_F_CLIENT)) { \
83 if (master_id == DB_EID_INVALID) { \
84 STAT(rep->stat.st_client_svc_miss++); \
85 ret = __rep_skip_msg(env, rep, eid, rp->rectype);\
86 goto errlock; \
87 } \
88 } \
89 } while (0)
90
91 /*
92 * If a client is attempting to service a request it does not have,
93 * call rep_skip_msg to skip this message and force a rerequest to the
94 * sender. We don't hold the mutex for the stats and may miscount.
95 */
96 #define CLIENT_REREQ do { \
97 if (F_ISSET(rep, REP_F_CLIENT)) { \
98 STAT(rep->stat.st_client_svc_req++); \
99 if (ret == DB_NOTFOUND) { \
100 STAT(rep->stat.st_client_svc_miss++); \
101 ret = __rep_skip_msg(env, rep, eid, rp->rectype);\
102 } \
103 } \
104 } while (0)
105
106 #define RECOVERING_SKIP do { \
107 if (IS_REP_CLIENT(env) && recovering) { \
108 /* Not holding region mutex, may miscount */ \
109 STAT(rep->stat.st_msgs_recover++); \
110 ret = __rep_skip_msg(env, rep, eid, rp->rectype); \
111 goto errlock; \
112 } \
113 } while (0)
114
115 /*
116 * If we're recovering the log we only want log records that are in the
117 * range we need to recover. Otherwise we can end up storing a huge
118 * number of "new" records, only to truncate the temp database later after
119 * we run recovery. If we are actively delaying a sync-up, we also skip
120 * all incoming log records until the application requests sync-up.
121 */
122 #define RECOVERING_LOG_SKIP do { \
123 if (F_ISSET(rep, REP_F_DELAY) || \
124 rep->master_id == DB_EID_INVALID || \
125 (recovering && \
126 (rep->sync_state != SYNC_LOG || \
127 LOG_COMPARE(&rp->lsn, &rep->last_lsn) >= 0))) { \
128 /* Not holding region mutex, may miscount */ \
129 STAT(rep->stat.st_msgs_recover++); \
130 ret = __rep_skip_msg(env, rep, eid, rp->rectype); \
131 goto errlock; \
132 } \
133 } while (0)
134
135 #define ANYSITE(rep)
136
137 /*
138 * __rep_process_message_pp --
139 *
140 * This routine takes an incoming message and processes it.
141 *
142 * control: contains the control fields from the record
143 * rec: contains the actual record
144 * eid: the environment id of the sender of the message;
145 * ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the
146 * lsn of the maximum permanent or current not permanent log record
147 * (respectively).
148 *
149 * PUBLIC: int __rep_process_message_pp
150 * PUBLIC: __P((DB_ENV *, DBT *, DBT *, int, DB_LSN *));
151 */
152 int
__rep_process_message_pp(dbenv,control,rec,eid,ret_lsnp)153 __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp)
154 DB_ENV *dbenv;
155 DBT *control, *rec;
156 int eid;
157 DB_LSN *ret_lsnp;
158 {
159 ENV *env;
160 DB_THREAD_INFO *ip;
161 int ret;
162
163 env = dbenv->env;
164 ret = 0;
165
166 ENV_REQUIRES_CONFIG_XX(
167 env, rep_handle, "DB_ENV->rep_process_message", DB_INIT_REP);
168
169 if (APP_IS_REPMGR(env)) {
170 __db_errx(env, DB_STR_A("3512",
171 "%s cannot call from Replication Manager application",
172 "%s"), "DB_ENV->rep_process_message:");
173 return (EINVAL);
174 }
175
176 /* Control argument must be non-Null. */
177 if (control == NULL || control->size == 0) {
178 __db_errx(env, DB_STR("3513",
179 "DB_ENV->rep_process_message: control argument must be specified"));
180 return (EINVAL);
181 }
182
183 /*
184 * Make sure site is a master or a client, which implies that
185 * replication has been started.
186 */
187 if (!IS_REP_MASTER(env) && !IS_REP_CLIENT(env)) {
188 __db_errx(env, DB_STR("3514",
189 "Environment not configured as replication master or client"));
190 return (EINVAL);
191 }
192
193 if ((ret = __dbt_usercopy(env, control)) != 0 ||
194 (ret = __dbt_usercopy(env, rec)) != 0) {
195 __dbt_userfree(env, control, rec, NULL);
196 __db_errx(env, DB_STR("3515",
197 "DB_ENV->rep_process_message: error retrieving DBT contents"));
198 return (ret);
199 }
200
201 ENV_ENTER(env, ip);
202 ret = __rep_process_message_int(env, control, rec, eid, ret_lsnp);
203 ENV_LEAVE(env, ip);
204
205 __dbt_userfree(env, control, rec, NULL);
206 return (ret);
207 }
208
209 /*
210 * __rep_process_message_int --
211 *
212 * This routine performs the internal steps to process an incoming message.
213 *
214 * PUBLIC: int __rep_process_message_int
215 * PUBLIC: __P((ENV *, DBT *, DBT *, int, DB_LSN *));
216 */
217 int
__rep_process_message_int(env,control,rec,eid,ret_lsnp)218 __rep_process_message_int(env, control, rec, eid, ret_lsnp)
219 ENV *env;
220 DBT *control, *rec;
221 int eid;
222 DB_LSN *ret_lsnp;
223 {
224 DBT data_dbt;
225 DB_LOG *dblp;
226 DB_LSN last_lsn, lsn;
227 DB_REP *db_rep;
228 DB_THREAD_INFO *ip;
229 LOG *lp;
230 REGENV *renv;
231 REGINFO *infop;
232 REP *rep;
233 __rep_control_args *rp, tmprp;
234 __rep_egen_args egen_arg;
235 size_t len;
236 u_int32_t gen;
237 int cmp, do_sync, lockout, master_id, recovering, ret, t_ret;
238 time_t savetime;
239 u_int8_t buf[__REP_MAXMSG_SIZE];
240
241 ret = 0;
242 do_sync = 0;
243 lockout = 0;
244 db_rep = env->rep_handle;
245 rep = db_rep->region;
246 dblp = env->lg_handle;
247 lp = dblp->reginfo.primary;
248 infop = env->reginfo;
249 renv = infop->primary;
250
251 if ((ret = __rep_control_unmarshal(env, &tmprp,
252 control->data, control->size, NULL)) != 0)
253 return (ret);
254 rp = &tmprp;
255 if (ret_lsnp != NULL)
256 ZERO_LSN(*ret_lsnp);
257
258 ENV_GET_THREAD_INFO(env, ip);
259 REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0);
260 /*
261 * Check the version number for both rep and log. If it is
262 * an old version we support, convert it. Otherwise complain.
263 */
264 if (rp->rep_version < DB_REPVERSION) {
265 if (rp->rep_version < DB_REPVERSION_MIN) {
266 __db_errx(env, DB_STR_A("3516",
267 "unsupported old replication message version %lu, minimum version %d",
268 "%lu %d"), (u_long)rp->rep_version,
269 DB_REPVERSION_MIN);
270
271 return (EINVAL);
272 }
273 VPRINT(env, (env, DB_VERB_REP_MSGS,
274 "Received record %lu with old rep version %lu",
275 (u_long)rp->rectype, (u_long)rp->rep_version));
276 rp->rectype = __rep_msg_from_old(rp->rep_version, rp->rectype);
277 DB_ASSERT(env, rp->rectype != REP_INVALID);
278 /*
279 * We should have a valid new record type for all the old
280 * versions.
281 */
282 VPRINT(env, (env, DB_VERB_REP_MSGS,
283 "Converted to record %lu with old rep version %lu",
284 (u_long)rp->rectype, (u_long)rp->rep_version));
285 } else if (rp->rep_version > DB_REPVERSION) {
286 __db_errx(env, DB_STR_A("3517",
287 "unexpected replication message version %lu, expected %d",
288 "%lu %d"), (u_long)rp->rep_version, DB_REPVERSION);
289 return (EINVAL);
290 }
291
292 if (rp->log_version < DB_LOGVERSION) {
293 if (rp->log_version < DB_LOGVERSION_MIN) {
294 __db_errx(env, DB_STR_A("3518",
295 "unsupported old replication log version %lu, minimum version %d",
296 "%lu %d"), (u_long)rp->log_version,
297 DB_LOGVERSION_MIN);
298 return (EINVAL);
299 }
300 VPRINT(env, (env, DB_VERB_REP_MSGS,
301 "Received record %lu with old log version %lu",
302 (u_long)rp->rectype, (u_long)rp->log_version));
303 } else if (rp->log_version > DB_LOGVERSION) {
304 __db_errx(env, DB_STR_A("3519",
305 "unexpected log record version %lu, expected %d",
306 "%lu %d"), (u_long)rp->log_version, DB_LOGVERSION);
307 return (EINVAL);
308 }
309
310 /*
311 * Acquire the replication lock.
312 */
313 REP_SYSTEM_LOCK(env);
314 if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) {
315 /*
316 * If we're racing with a thread in rep_start, then
317 * just ignore the message and return.
318 */
319 RPRINT(env, (env, DB_VERB_REP_MSGS,
320 "Racing replication msg lockout, ignore message."));
321 /*
322 * Although we're ignoring the message, there are a few
323 * we need to pay a bit of attention to anyway. All of
324 * these cases are mutually exclusive.
325 * 1. If it is a PERM message, we don't want to return 0.
326 * 2. If it is a NEWSITE message let the app know so it can
327 * do whatever it needs for connection purposes.
328 * 3. If it is a c2c request, tell the sender we're not
329 * going to handle it.
330 */
331 if (F_ISSET(rp, REPCTL_PERM))
332 ret = DB_REP_IGNORE;
333 REP_SYSTEM_UNLOCK(env);
334 /*
335 * If this is new site information return DB_REP_NEWSITE so
336 * that the user can use whatever information may have been
337 * sent for connections.
338 */
339 if (rp->rectype == REP_NEWSITE)
340 ret = DB_REP_NEWSITE;
341 /*
342 * If another client has sent a c2c request to us, it may be a
343 * long time before it resends the request (due to its dual data
344 * streams avoidance heuristic); let it know we can't serve the
345 * request just now.
346 */
347 if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rp->rectype)) {
348 STAT(rep->stat.st_client_svc_req++);
349 STAT(rep->stat.st_client_svc_miss++);
350 (void)__rep_send_message(env,
351 eid, REP_REREQUEST, NULL, NULL, 0, 0);
352 }
353 goto out;
354 }
355 rep->msg_th++;
356 gen = rep->gen;
357 master_id = rep->master_id;
358 recovering = IS_REP_RECOVERING(rep);
359 savetime = renv->rep_timestamp;
360
361 STAT(rep->stat.st_msgs_processed++);
362 REP_SYSTEM_UNLOCK(env);
363
364 /*
365 * Check for lease configuration matching. Leases must be
366 * configured all or none. If I am a client and I receive a
367 * message requesting a lease, and I'm not using leases, that
368 * is an error.
369 */
370 if (!IS_USING_LEASES(env) &&
371 (F_ISSET(rp, REPCTL_LEASE) || rp->rectype == REP_LEASE_GRANT)) {
372 __db_errx(env, DB_STR("3520",
373 "Inconsistent lease configuration"));
374 RPRINT(env, (env, DB_VERB_REP_MSGS,
375 "Client received lease message and not using leases"));
376 ret = EINVAL;
377 ret = __env_panic(env, ret);
378 goto errlock;
379 }
380
381 /*
382 * Check for generation number matching. Ignore any old messages
383 * except requests that are indicative of a new client that needs
384 * to get in sync.
385 */
386 if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
387 rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ &&
388 rp->rectype != REP_DUPMASTER && rp->rectype != REP_VOTE1) {
389 /*
390 * We don't hold the rep mutex, and could miscount if we race.
391 */
392 STAT(rep->stat.st_msgs_badgen++);
393 if (F_ISSET(rp, REPCTL_PERM))
394 ret = DB_REP_IGNORE;
395 goto errlock;
396 }
397
398 if (rp->gen > gen) {
399 /*
400 * If I am a master and am out of date with a lower generation
401 * number, I am in bad shape and should downgrade.
402 */
403 if (F_ISSET(rep, REP_F_MASTER)) {
404 STAT(rep->stat.st_dupmasters++);
405 ret = DB_REP_DUPMASTER;
406 /*
407 * Only broadcast DUPMASTER if leases are not
408 * in effect. If I am an old master, using
409 * leases and I get a newer message, my leases
410 * had better all be expired.
411 */
412 if (IS_USING_LEASES(env))
413 DB_ASSERT(env,
414 __rep_lease_check(env, 0) ==
415 DB_REP_LEASE_EXPIRED);
416 else if (rp->rectype != REP_DUPMASTER)
417 (void)__rep_send_message(env,
418 DB_EID_BROADCAST, REP_DUPMASTER,
419 NULL, NULL, 0, 0);
420 goto errlock;
421 }
422
423 /*
424 * I am a client and am out of date. If this is an election,
425 * or a response from the first site I contacted, then I can
426 * accept the generation number and participate in future
427 * elections and communication. Otherwise, I need to hear about
428 * a new master and sync up.
429 *
430 * But do not do any of this if REP_F_HOLD_GEN is set. In
431 * this case we keep the site at its current gen until we
432 * clear this flag.
433 */
434 if ((rp->rectype == REP_ALIVE ||
435 rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) &&
436 !F_ISSET(rep, REP_F_HOLD_GEN)) {
437 REP_SYSTEM_LOCK(env);
438 RPRINT(env, (env, DB_VERB_REP_MSGS,
439 "Updating gen from %lu to %lu",
440 (u_long)gen, (u_long)rp->gen));
441 rep->master_id = DB_EID_INVALID;
442 gen = rp->gen;
443 SET_GEN(gen);
444 /*
445 * Updating of egen will happen when we process the
446 * message below for each message type.
447 */
448 REP_SYSTEM_UNLOCK(env);
449 if (rp->rectype == REP_ALIVE)
450 (void)__rep_send_message(env,
451 DB_EID_BROADCAST, REP_MASTER_REQ, NULL,
452 NULL, 0, 0);
453 } else if (rp->rectype != REP_NEWMASTER) {
454 /*
455 * Ignore this message, retransmit if needed.
456 */
457 if (__rep_check_doreq(env, rep))
458 (void)__rep_send_message(env,
459 DB_EID_BROADCAST, REP_MASTER_REQ,
460 NULL, NULL, 0, 0);
461 goto errlock;
462 }
463 /*
464 * If you get here, then you're a client and either you're
465 * in an election or you have a NEWMASTER or an ALIVE message
466 * whose processing will do the right thing below.
467 */
468 }
469
470 /*
471 * If the sender is part of an established group, so are we now.
472 */
473 if (F_ISSET(rp, REPCTL_GROUP_ESTD)) {
474 REP_SYSTEM_LOCK(env);
475 #ifdef DIAGNOSTIC
476 if (!F_ISSET(rep, REP_F_GROUP_ESTD))
477 RPRINT(env, (env, DB_VERB_REP_MSGS,
478 "I am now part of an established group"));
479 #endif
480 F_SET(rep, REP_F_GROUP_ESTD);
481 REP_SYSTEM_UNLOCK(env);
482 }
483
484 /*
485 * We need to check if we're in recovery and if we are
486 * then we need to ignore any messages except VERIFY*, VOTE*,
487 * NEW* and ALIVE_REQ, or backup related messages: UPDATE*,
488 * PAGE* and FILE*. We need to also accept LOG messages
489 * if we're copying the log for recovery/backup.
490 */
491 switch (rp->rectype) {
492 case REP_ALIVE:
493 /*
494 * Handle even if we're recovering.
495 */
496 ANYSITE(rep);
497 if ((ret = __rep_egen_unmarshal(env, &egen_arg,
498 rec->data, rec->size, NULL)) != 0)
499 return (ret);
500 REP_SYSTEM_LOCK(env);
501 if (egen_arg.egen > rep->egen) {
502 /*
503 * If we're currently working futilely at processing an
504 * obsolete egen, treat it like an egen update, so that
505 * we abort the current rep_elect() call and signal the
506 * application to start a new one.
507 */
508 if (rep->spent_egen == rep->egen)
509 ret = DB_REP_HOLDELECTION;
510
511 RPRINT(env, (env, DB_VERB_REP_MSGS,
512 "Received ALIVE egen of %lu, mine %lu",
513 (u_long)egen_arg.egen, (u_long)rep->egen));
514 __rep_elect_done(env, rep);
515 rep->egen = egen_arg.egen;
516 }
517 REP_SYSTEM_UNLOCK(env);
518 break;
519 case REP_ALIVE_REQ:
520 /*
521 * Handle even if we're recovering.
522 */
523 ANYSITE(rep);
524 LOG_SYSTEM_LOCK(env);
525 lsn = lp->lsn;
526 LOG_SYSTEM_UNLOCK(env);
527 #ifdef CONFIG_TEST
528 /*
529 * Send this first, before the ALIVE message because of the
530 * way the test suite and messaging is done sequentially.
531 * In some sequences it is possible to get into a situation
532 * where the test suite cannot get the later NEWMASTER because
533 * we break out of the messaging loop too early.
534 */
535 if (F_ISSET(rep, REP_F_MASTER))
536 (void)__rep_send_message(env,
537 DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
538 #endif
539 REP_SYSTEM_LOCK(env);
540 egen_arg.egen = rep->egen;
541 REP_SYSTEM_UNLOCK(env);
542 if ((ret = __rep_egen_marshal(env,
543 &egen_arg, buf, __REP_EGEN_SIZE, &len)) != 0)
544 goto errlock;
545 DB_INIT_DBT(data_dbt, buf, len);
546 (void)__rep_send_message(env,
547 eid, REP_ALIVE, &lsn, &data_dbt, 0, 0);
548 break;
549 case REP_ALL_REQ:
550 RECOVERING_SKIP;
551 CLIENT_MASTERCHK;
552 ret = __rep_allreq(env, rp, eid);
553 CLIENT_REREQ;
554 break;
555 case REP_BLOB_ALL_REQ:
556 RECOVERING_SKIP;
557 CLIENT_MASTERCHK;
558 MASTER_UPDATE(env, renv);
559 ret = __rep_blob_allreq(env, eid, rec);
560 CLIENT_REREQ;
561 break;
562 case REP_BLOB_CHUNK:
563 /* Handle even if in recovery. */
564 CLIENT_ONLY(rep, rp);
565 ret = __rep_blob_chunk(env, eid, ip, rec);
566 if (ret == DB_REP_PAGEDONE)
567 ret = 0;
568 break;
569 case REP_BLOB_CHUNK_REQ:
570 RECOVERING_SKIP;
571 CLIENT_MASTERCHK;
572 MASTER_UPDATE(env, renv);
573 ret = __rep_blob_chunk_req(env, eid, rec);
574 CLIENT_REREQ;
575 break;
576 case REP_BLOB_UPDATE:
577 CLIENT_ONLY(rep, rp);
578 ret = __rep_blob_update(env, eid, ip, rec);
579 break;
580 case REP_BLOB_UPDATE_REQ:
581 MASTER_ONLY(rep, rp);
582 MASTER_UPDATE(env, renv);
583 ret = __rep_blob_update_req(env, eid, ip, rec);
584 break;
585 case REP_BULK_LOG:
586 RECOVERING_LOG_SKIP;
587 CLIENT_ONLY(rep, rp);
588 ret = __rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp);
589 break;
590 case REP_BULK_PAGE:
591 /*
592 * Handle even if we're recovering.
593 */
594 CLIENT_ONLY(rep, rp);
595 ret = __rep_bulk_page(env, ip, eid, rp, rec);
596 break;
597 case REP_DUPMASTER:
598 /*
599 * Handle even if we're recovering.
600 */
601 if (F_ISSET(rep, REP_F_MASTER))
602 ret = DB_REP_DUPMASTER;
603 break;
604 #ifdef NOTYET
605 case REP_FILE: /* TODO */
606 CLIENT_ONLY(rep, rp);
607 break;
608 case REP_FILE_REQ:
609 ret = __rep_send_file(env, rec, eid);
610 break;
611 #endif
612 case REP_FILE_FAIL:
613 /*
614 * Handle even if we're recovering.
615 */
616 CLIENT_ONLY(rep, rp);
617 /*
618 * Clean up any internal init that was in progress.
619 */
620 if (eid == rep->master_id) {
621 REP_SYSTEM_LOCK(env);
622 /*
623 * If we're already locking out messages, give up.
624 */
625 if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG))
626 goto errhlk;
627 /*
628 * Lock out other messages to prevent race
629 * conditions.
630 */
631 if ((ret =
632 __rep_lockout_msg(env, rep, 1)) != 0) {
633 goto errhlk;
634 }
635 lockout = 1;
636 /*
637 * Need mtx_clientdb to safely clean up
638 * page database in __rep_init_cleanup().
639 */
640 REP_SYSTEM_UNLOCK(env);
641 MUTEX_LOCK(env, rep->mtx_clientdb);
642 REP_SYSTEM_LOCK(env);
643 /*
644 * Clean up internal init if one was in progress.
645 */
646 if (ISSET_LOCKOUT_BDB(rep)) {
647 RPRINT(env, (env, DB_VERB_REP_MSGS,
648 "FILE_FAIL is cleaning up old internal init"));
649 #ifdef CONFIG_TEST
650 STAT(rep->stat.st_filefail_cleanups++);
651 #endif
652 ret = __rep_init_cleanup(env, rep, DB_FORCE);
653 F_CLR(rep, REP_F_ABBREVIATED);
654 CLR_RECOVERY_SETTINGS(rep);
655 #ifdef HAVE_REPLICATION_THREADS
656 db_rep->abbrev_init = FALSE;
657 #endif
658 }
659 MUTEX_UNLOCK(env, rep->mtx_clientdb);
660 if (ret != 0) {
661 RPRINT(env, (env, DB_VERB_REP_MSGS,
662 "FILE_FAIL error cleaning up internal init: %d", ret));
663 goto errhlk;
664 }
665 FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
666 lockout = 0;
667 /*
668 * Restart internal init, setting UPDATE flag and
669 * zeroing applicable LSNs.
670 */
671 rep->sync_state = SYNC_UPDATE;
672 ZERO_LSN(rep->first_lsn);
673 ZERO_LSN(rep->ckp_lsn);
674 REP_SYSTEM_UNLOCK(env);
675 (void)__rep_send_message(env, eid, REP_UPDATE_REQ,
676 NULL, NULL, 0, 0);
677 }
678 break;
679 case REP_LEASE_GRANT:
680 /*
681 * Handle even if we're recovering.
682 */
683 MASTER_ONLY(rep, rp);
684 ret = __rep_lease_grant(env, rp, rec, eid);
685 break;
686 case REP_LOG:
687 case REP_LOG_MORE:
688 RECOVERING_LOG_SKIP;
689 CLIENT_ONLY(rep, rp);
690 ret = __rep_log(env, ip, rp, rec, eid, savetime, ret_lsnp);
691 break;
692 case REP_LOG_REQ:
693 RECOVERING_SKIP;
694 CLIENT_MASTERCHK;
695 if (F_ISSET(rp, REPCTL_INIT))
696 MASTER_UPDATE(env, renv);
697 ret = __rep_logreq(env, rp, rec, eid);
698 CLIENT_REREQ;
699 break;
700 case REP_NEWSITE:
701 /*
702 * Handle even if we're recovering.
703 */
704 /* We don't hold the rep mutex, and may miscount. */
705 STAT(rep->stat.st_newsites++);
706
707 /* This is a rebroadcast; simply tell the application. */
708 if (F_ISSET(rep, REP_F_MASTER)) {
709 dblp = env->lg_handle;
710 lp = dblp->reginfo.primary;
711 LOG_SYSTEM_LOCK(env);
712 lsn = lp->lsn;
713 LOG_SYSTEM_UNLOCK(env);
714 (void)__rep_send_message(env,
715 eid, REP_NEWMASTER, &lsn, NULL, 0, 0);
716 }
717 ret = DB_REP_NEWSITE;
718 break;
719 case REP_NEWCLIENT:
720 /*
721 * Handle even if we're recovering.
722 */
723 /*
724 * This message was received and should have resulted in the
725 * application entering the machine ID in its machine table.
726 * We respond to this with an ALIVE to send relevant information
727 * to the new client (if we are a master, we'll send a
728 * NEWMASTER, so we only need to send the ALIVE if we're a
729 * client). But first, broadcast the new client's record to
730 * all the clients.
731 */
732 (void)__rep_send_message(env,
733 DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0);
734
735 ret = DB_REP_NEWSITE;
736
737 if (F_ISSET(rep, REP_F_CLIENT)) {
738 REP_SYSTEM_LOCK(env);
739 egen_arg.egen = rep->egen;
740
741 /*
742 * Clean up any previous master remnants by making
743 * master_id invalid and cleaning up any internal
744 * init that was in progress.
745 */
746 if (eid == rep->master_id) {
747 rep->master_id = DB_EID_INVALID;
748
749 /*
750 * Already locking out messages, must be
751 * in sync-up recover or internal init,
752 * give up.
753 */
754 if (FLD_ISSET(rep->lockout_flags,
755 REP_LOCKOUT_MSG))
756 goto errhlk;
757
758 /*
759 * Lock out other messages to prevent race
760 * conditions.
761 */
762 if ((t_ret =
763 __rep_lockout_msg(env, rep, 1)) != 0) {
764 ret = t_ret;
765 goto errhlk;
766 }
767 lockout = 1;
768
769 /*
770 * Need mtx_clientdb to safely clean up
771 * page database in __rep_init_cleanup().
772 */
773 REP_SYSTEM_UNLOCK(env);
774 MUTEX_LOCK(env, rep->mtx_clientdb);
775 REP_SYSTEM_LOCK(env);
776
777 /*
778 * Clean up internal init if one was in
779 * progress.
780 */
781 if (ISSET_LOCKOUT_BDB(rep)) {
782 RPRINT(env, (env, DB_VERB_REP_MSGS,
783 "NEWCLIENT is cleaning up old internal init for invalid master"));
784 t_ret = __rep_init_cleanup(env,
785 rep, DB_FORCE);
786 F_CLR(rep, REP_F_ABBREVIATED);
787 CLR_RECOVERY_SETTINGS(rep);
788 #ifdef HAVE_REPLICATION_THREADS
789 db_rep->abbrev_init = FALSE;
790 #endif
791 }
792 MUTEX_UNLOCK(env, rep->mtx_clientdb);
793 if (t_ret != 0) {
794 ret = t_ret;
795 RPRINT(env, (env, DB_VERB_REP_MSGS,
796 "NEWCLIENT error cleaning up internal init for invalid master: %d", ret));
797 goto errhlk;
798 }
799 FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
800 lockout = 0;
801 }
802 REP_SYSTEM_UNLOCK(env);
803 if ((ret = __rep_egen_marshal(env, &egen_arg,
804 buf, __REP_EGEN_SIZE, &len)) != 0)
805 goto errlock;
806 DB_INIT_DBT(data_dbt, buf, len);
807 (void)__rep_send_message(env, DB_EID_BROADCAST,
808 REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
809 break;
810 }
811 /* FALLTHROUGH */
812 case REP_MASTER_REQ:
813 RECOVERING_SKIP;
814 if (F_ISSET(rep, REP_F_MASTER)) {
815 LOG_SYSTEM_LOCK(env);
816 lsn = lp->lsn;
817 LOG_SYSTEM_UNLOCK(env);
818 (void)__rep_send_message(env,
819 DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
820 if (IS_USING_LEASES(env))
821 (void)__rep_lease_refresh(env);
822 }
823 /*
824 * If there is no master, then we could get into a state
825 * where an old client lost the initial ALIVE message and
826 * is calling an election under an old gen and can
827 * never get to the current gen.
828 */
829 if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) {
830 REP_SYSTEM_LOCK(env);
831 egen_arg.egen = rep->egen;
832 if (eid == rep->master_id)
833 rep->master_id = DB_EID_INVALID;
834 REP_SYSTEM_UNLOCK(env);
835 if ((ret = __rep_egen_marshal(env, &egen_arg,
836 buf, __REP_EGEN_SIZE, &len)) != 0)
837 goto errlock;
838 DB_INIT_DBT(data_dbt, buf, len);
839 (void)__rep_send_message(env, eid,
840 REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
841 }
842 break;
843 case REP_NEWFILE:
844 RECOVERING_LOG_SKIP;
845 CLIENT_ONLY(rep, rp);
846 ret = __rep_apply(env,
847 ip, rp, rec, ret_lsnp, NULL, &last_lsn);
848 if (ret == DB_REP_LOGREADY)
849 ret = __rep_logready(env, rep, savetime, &last_lsn);
850 break;
851 case REP_NEWMASTER:
852 /*
853 * Handle even if we're recovering.
854 */
855 ANYSITE(rep);
856 if (F_ISSET(rep, REP_F_MASTER) &&
857 eid != rep->eid) {
858 /* We don't hold the rep mutex, and may miscount. */
859 STAT(rep->stat.st_dupmasters++);
860 ret = DB_REP_DUPMASTER;
861 if (IS_USING_LEASES(env))
862 DB_ASSERT(env,
863 __rep_lease_check(env, 0) ==
864 DB_REP_LEASE_EXPIRED);
865 else
866 (void)__rep_send_message(env,
867 DB_EID_BROADCAST, REP_DUPMASTER,
868 NULL, NULL, 0, 0);
869 break;
870 }
871 if ((ret =
872 __rep_new_master(env, rp, eid)) == DB_REP_NEWMASTER)
873 ret = __rep_fire_newmaster(env, rp->gen, eid);
874 break;
875 case REP_PAGE:
876 case REP_PAGE_FAIL:
877 case REP_PAGE_MORE:
878 /*
879 * Handle even if we're recovering.
880 */
881 CLIENT_ONLY(rep, rp);
882 ret = __rep_page(env, ip, eid, rp, rec);
883 if (ret == DB_REP_PAGEDONE)
884 ret = 0;
885 break;
886 case REP_PAGE_REQ:
887 RECOVERING_SKIP;
888 CLIENT_MASTERCHK;
889 MASTER_UPDATE(env, renv);
890 ret = __rep_page_req(env, ip, eid, rp, rec);
891 CLIENT_REREQ;
892 break;
893 case REP_REREQUEST:
894 /*
895 * Handle even if we're recovering. Don't do a master
896 * check.
897 */
898 CLIENT_ONLY(rep, rp);
899 /*
900 * Don't hold any mutex, may miscount.
901 */
902 STAT(rep->stat.st_client_rerequests++);
903 ret = __rep_resend_req(env, 1);
904 break;
905 case REP_START_SYNC:
906 RECOVERING_SKIP;
907 MUTEX_LOCK(env, rep->mtx_clientdb);
908 cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
909 /*
910 * The comparison needs to be <= because the LSN in
911 * the message can be the LSN of the first outstanding
912 * txn, which may be the LSN immediately after the
913 * previous commit. The ready_lsn is the LSN of the
914 * next record expected. In that case, the LSNs
915 * could be equal and the client has the commit and
916 * wants to sync. [SR #15338]
917 */
918 if (cmp <= 0) {
919 MUTEX_UNLOCK(env, rep->mtx_clientdb);
920 do_sync = 1;
921 } else {
922 STAT(rep->stat.st_startsync_delayed++);
923 /*
924 * There are cases where keeping the first ckp_lsn
925 * LSN is advantageous and cases where keeping
926 * a later LSN is better. If random, earlier
927 * log records are missing, keeping the later
928 * LSN seems to be better. That is what we'll
929 * do for now.
930 */
931 if (LOG_COMPARE(&rp->lsn, &rep->ckp_lsn) > 0)
932 rep->ckp_lsn = rp->lsn;
933 RPRINT(env, (env, DB_VERB_REP_MSGS,
934 "Delayed START_SYNC memp_sync due to missing records."));
935 RPRINT(env, (env, DB_VERB_REP_MSGS,
936 "ready LSN [%lu][%lu], ckp_lsn [%lu][%lu]",
937 (u_long)lp->ready_lsn.file, (u_long)lp->ready_lsn.offset,
938 (u_long)rep->ckp_lsn.file, (u_long)rep->ckp_lsn.offset));
939 MUTEX_UNLOCK(env, rep->mtx_clientdb);
940 }
941 break;
942 case REP_UPDATE:
943 /*
944 * Handle even if we're recovering.
945 */
946 CLIENT_ONLY(rep, rp);
947 if ((ret = __rep_update_setup(env,
948 eid, rp, rec, savetime, &lsn)) == DB_REP_WOULDROLLBACK &&
949 ret_lsnp != NULL) {
950 /*
951 * Not for a normal internal init. But this could
952 * happen here if we had to ask for an UPDATE message in
953 * order to check for materializing NIMDBs; in other
954 * words, an "abbreviated internal init."
955 */
956 *ret_lsnp = lsn;
957 }
958 break;
959 case REP_UPDATE_REQ:
960 /*
961 * Handle even if we're recovering.
962 */
963 MASTER_ONLY(rep, rp);
964 infop = env->reginfo;
965 renv = infop->primary;
966 MASTER_UPDATE(env, renv);
967 ret = __rep_update_req(env, rp);
968 break;
969 case REP_VERIFY:
970 if (recovering) {
971 MUTEX_LOCK(env, rep->mtx_clientdb);
972 cmp = LOG_COMPARE(&lp->verify_lsn, &rp->lsn);
973 MUTEX_UNLOCK(env, rep->mtx_clientdb);
974 /*
975 * If this is not the verify record I want, skip it.
976 */
977 if (cmp != 0) {
978 ret = __rep_skip_msg(
979 env, rep, eid, rp->rectype);
980 break;
981 }
982 }
983 CLIENT_ONLY(rep, rp);
984 if ((ret = __rep_verify(env, rp, rec, eid, savetime)) ==
985 DB_REP_WOULDROLLBACK && ret_lsnp != NULL)
986 *ret_lsnp = rp->lsn;
987 break;
988 case REP_VERIFY_FAIL:
989 /*
990 * Handle even if we're recovering.
991 */
992 CLIENT_ONLY(rep, rp);
993 ret = __rep_verify_fail(env, rp);
994 break;
995 case REP_VERIFY_REQ:
996 RECOVERING_SKIP;
997 CLIENT_MASTERCHK;
998 ret = __rep_verify_req(env, rp, eid);
999 CLIENT_REREQ;
1000 break;
1001 case REP_VOTE1:
1002 /*
1003 * Handle even if we're recovering.
1004 */
1005 ret = __rep_vote1(env, rp, rec, eid);
1006 break;
1007 case REP_VOTE2:
1008 /*
1009 * Handle even if we're recovering.
1010 */
1011 ret = __rep_vote2(env, rec, eid);
1012 break;
1013 default:
1014 __db_errx(env, DB_STR_A("3521",
1015 "DB_ENV->rep_process_message: unknown replication message: type %lu",
1016 "%lu"), (u_long)rp->rectype);
1017 ret = EINVAL;
1018 break;
1019 }
1020
1021 errlock:
1022 REP_SYSTEM_LOCK(env);
1023 errhlk: if (lockout)
1024 FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
1025 rep->msg_th--;
1026 REP_SYSTEM_UNLOCK(env);
1027 if (do_sync) {
1028 MUTEX_LOCK(env, rep->mtx_ckp);
1029 lsn = rp->lsn;
1030 /*
1031 * This is the REP_START_SYNC sync, and so we permit it to be
1032 * interrupted.
1033 */
1034 ret = __memp_sync(
1035 env, DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &lsn);
1036 MUTEX_UNLOCK(env, rep->mtx_ckp);
1037 RPRINT(env, (env, DB_VERB_REP_MSGS,
1038 "START_SYNC: Completed sync [%lu][%lu]",
1039 (u_long)lsn.file, (u_long)lsn.offset));
1040 }
1041 out:
1042 if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
1043 if (ret_lsnp != NULL)
1044 *ret_lsnp = rp->lsn;
1045 ret = DB_REP_NOTPERM;
1046 }
1047 return (ret);
1048 }
1049
1050 /*
1051 * __rep_apply --
1052 *
1053 * Handle incoming log records on a client, applying when possible and
1054 * entering into the bookkeeping table otherwise. This routine manages
1055 * the state of the incoming message stream -- processing records, via
1056 * __rep_process_rec, when possible and enqueuing in the __db.rep.db
1057 * when necessary. As gaps in the stream are filled in, this is where
1058 * we try to process as much as possible from __db.rep.db to catch up.
1059 *
1060 * PUBLIC: int __rep_apply __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
1061 * PUBLIC: DBT *, DB_LSN *, int *, DB_LSN *));
1062 */
1063 int
__rep_apply(env,ip,rp,rec,ret_lsnp,is_dupp,last_lsnp)1064 __rep_apply(env, ip, rp, rec, ret_lsnp, is_dupp, last_lsnp)
1065 ENV *env;
1066 DB_THREAD_INFO *ip;
1067 __rep_control_args *rp;
1068 DBT *rec;
1069 DB_LSN *ret_lsnp;
1070 int *is_dupp;
1071 DB_LSN *last_lsnp;
1072 {
1073 DB *dbp;
1074 DBT control_dbt, key_dbt;
1075 DBT rec_dbt;
1076 DB_LOG *dblp;
1077 DB_LSN max_lsn, save_lsn;
1078 DB_REP *db_rep;
1079 LOG *lp;
1080 REP *rep;
1081 db_timespec msg_time, max_ts;
1082 u_int32_t gen, rectype;
1083 int cmp, event, master, newfile_seen, ret, set_apply, t_ret;
1084
1085 COMPQUIET(gen, 0);
1086 COMPQUIET(master, DB_EID_INVALID);
1087
1088 db_rep = env->rep_handle;
1089 rep = db_rep->region;
1090 event = ret = set_apply = 0;
1091 memset(&control_dbt, 0, sizeof(control_dbt));
1092 memset(&rec_dbt, 0, sizeof(rec_dbt));
1093 ZERO_LSN(max_lsn);
1094 timespecclear(&max_ts);
1095 timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
1096 cmp = -2; /* OOB value that LOG_COMPARE can't return. */
1097
1098 dblp = env->lg_handle;
1099 MUTEX_LOCK(env, rep->mtx_clientdb);
1100 /*
1101 * Lazily open the temp db. Always set the startup flag to 0
1102 * because it was initialized from rep_start.
1103 */
1104 if (db_rep->rep_db == NULL &&
1105 (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
1106 MUTEX_UNLOCK(env, rep->mtx_clientdb);
1107 goto out;
1108 }
1109 dbp = db_rep->rep_db;
1110 lp = dblp->reginfo.primary;
1111 newfile_seen = 0;
1112 REP_SYSTEM_LOCK(env);
1113 if (rep->sync_state == SYNC_LOG &&
1114 LOG_COMPARE(&lp->ready_lsn, &rep->first_lsn) < 0)
1115 lp->ready_lsn = rep->first_lsn;
1116 cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
1117 /*
1118 * If we are going to skip or process any message other
1119 * than a duplicate, make note of it if we're in an
1120 * election so that the election can rerequest proactively.
1121 */
1122 if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_APPLY) && cmp >= 0)
1123 F_SET(rep, REP_F_SKIPPED_APPLY);
1124
1125 /*
1126 * If we're in the middle of processing a NEWFILE, we've dropped
1127 * the mutex and if this matches it is a duplicate record. We
1128 * do not want this call taking the "matching" code below because
1129 * we may then process later records in the temp db and the
1130 * original NEWFILE may not have the log file ready. It will
1131 * process those temp db items when it completes.
1132 */
1133 if (F_ISSET(rep, REP_F_NEWFILE) && cmp == 0)
1134 cmp = -1;
1135
1136 if (cmp == 0) {
1137 /*
1138 * If we are in an election (i.e. we've sent a vote
1139 * with an LSN in it), then we drop the next record
1140 * we're expecting. When we find a master, we'll
1141 * either go into sync, or if it was an existing
1142 * master, rerequest this one record (later records
1143 * are accumulating in the temp db).
1144 *
1145 * We can simply return here, and rep_process_message
1146 * will set NOTPERM if necessary for this record.
1147 */
1148 if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_APPLY)) {
1149 /*
1150 * We will simply return now. All special return
1151 * processing should be ignored because the special
1152 * values are just initialized. Variables like
1153 * max_lsn are still 0.
1154 */
1155 RPRINT(env, (env, DB_VERB_REP_MISC,
1156 "rep_apply: In election. Ignoring [%lu][%lu]",
1157 (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
1158 REP_SYSTEM_UNLOCK(env);
1159 MUTEX_UNLOCK(env, rep->mtx_clientdb);
1160 goto out;
1161 }
1162 rep->apply_th++;
1163 set_apply = 1;
1164 VPRINT(env, (env, DB_VERB_REP_MISC,
1165 "rep_apply: Set apply_th %d", rep->apply_th));
1166 REP_SYSTEM_UNLOCK(env);
1167 if (rp->rectype == REP_NEWFILE)
1168 newfile_seen = 1;
1169 if ((ret = __rep_process_rec(env, ip,
1170 rp, rec, &max_ts, &max_lsn)) != 0)
1171 goto err;
1172 /*
1173 * If we get the record we are expecting, reset
1174 * the count of records we've received and are applying
1175 * towards the request interval.
1176 */
1177 __os_gettime(env, &lp->rcvd_ts, 1);
1178 ZERO_LSN(lp->max_wait_lsn);
1179
1180 /*
1181 * The __rep_remfirst() and __rep_getnext() functions each open,
1182 * use and then close a cursor on the temp db, each time through
1183 * the loop. Although this may seem excessive, it is necessary
1184 * to avoid locking problems with checkpoints.
1185 */
1186 while (ret == 0 &&
1187 LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
1188 /*
1189 * We just filled in a gap in the log record stream.
1190 * Write subsequent records to the log.
1191 */
1192 gap_check:
1193 if ((ret = __rep_remfirst(env, ip,
1194 &control_dbt, &rec_dbt)) != 0)
1195 goto err;
1196
1197 rp = (__rep_control_args *)control_dbt.data;
1198 timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
1199 rec = &rec_dbt;
1200 if (rp->rectype == REP_NEWFILE)
1201 newfile_seen = 1;
1202 if ((ret = __rep_process_rec(env, ip,
1203 rp, rec, &max_ts, &max_lsn)) != 0)
1204 goto err;
1205
1206 STAT(--rep->stat.st_log_queued);
1207
1208 /*
1209 * Since we just filled a gap in the log stream, and
1210 * we're writing subsequent records to the log, we want
1211 * to use rcvd_ts and wait_ts so that we will
1212 * request the next gap if we end up with a gap and
1213 * not so recent records in the temp db, but not
1214 * request if recent records are in the temp db and
1215 * likely to arrive on its own shortly. We want to
1216 * avoid requesting the record in that case. Also
1217 * reset max_wait_lsn because the next gap is a
1218 * fresh gap.
1219 */
1220 lp->rcvd_ts = lp->last_ts;
1221 lp->wait_ts = rep->request_gap;
1222 if ((ret = __rep_getnext(env, ip)) == DB_NOTFOUND) {
1223 __os_gettime(env, &lp->rcvd_ts, 1);
1224 ret = 0;
1225 break;
1226 } else if (ret != 0)
1227 goto err;
1228 }
1229
1230 /*
1231 * Check if we're at a gap in the table and if so, whether we
1232 * need to ask for any records.
1233 */
1234 if (!IS_ZERO_LSN(lp->waiting_lsn) &&
1235 LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
1236 /*
1237 * We got a record and processed it, but we may
1238 * still be waiting for more records. If we
1239 * filled a gap we keep a count of how many other
1240 * records are in the temp database and if we should
1241 * request the next gap at this time.
1242 */
1243 if (__rep_check_doreq(env, rep) && (ret =
1244 __rep_loggap_req(env, rep, &rp->lsn, 0)) != 0)
1245 goto err;
1246 } else {
1247 lp->wait_ts = rep->request_gap;
1248 ZERO_LSN(lp->max_wait_lsn);
1249 }
1250
1251 } else if (cmp > 0) {
1252 /*
1253 * The LSN is higher than the one we were waiting for.
1254 * This record isn't in sequence; add it to the temporary
1255 * database, update waiting_lsn if necessary, and perform
1256 * calculations to determine if we should issue requests
1257 * for new records.
1258 */
1259 REP_SYSTEM_UNLOCK(env);
1260 memset(&key_dbt, 0, sizeof(key_dbt));
1261 key_dbt.data = rp;
1262 key_dbt.size = sizeof(*rp);
1263 ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
1264 if (ret == 0) {
1265 STAT(rep->stat.st_log_queued++);
1266 __os_gettime(env, &lp->last_ts, 1);
1267 #ifdef HAVE_STATISTICS
1268 rep->stat.st_log_queued_total++;
1269 if (rep->stat.st_log_queued_max <
1270 rep->stat.st_log_queued)
1271 rep->stat.st_log_queued_max =
1272 rep->stat.st_log_queued;
1273 #endif
1274 }
1275
1276 if (ret == DB_KEYEXIST) {
1277 STAT(rep->stat.st_log_duplicated++);
1278 #ifdef CONFIG_TEST
1279 STAT(rep->stat.st_log_futuredup++);
1280 #endif
1281 if (is_dupp != NULL) {
1282 *is_dupp = 1;
1283 /*
1284 * Could get overwritten by max_lsn later,
1285 * but only when returning NOTPERM for a
1286 * REPCTL_PERM record, in which case max_lsn
1287 * is this log record.
1288 */
1289 if (ret_lsnp != NULL)
1290 *ret_lsnp = lp->ready_lsn;
1291 }
1292 ret = 0;
1293 }
1294 if (ret != 0 && ret != ENOMEM)
1295 goto done;
1296
1297 /*
1298 * If we are using in-memory, and got ENOMEM, it is
1299 * not an error. But in that case we want to skip
1300 * comparing the message LSN since we're not storing it.
1301 * However, we do want continue to check if we need to
1302 * send a request for the gap.
1303 */
1304 if (ret == 0 && (IS_ZERO_LSN(lp->waiting_lsn) ||
1305 LOG_COMPARE(&rp->lsn, &lp->waiting_lsn) < 0)) {
1306 /*
1307 * If this is a new gap, then reset the rcvd_ts so
1308 * that an out-of-order record after an idle period
1309 * does not (likely) immediately rerequest.
1310 */
1311 if (IS_ZERO_LSN(lp->waiting_lsn))
1312 __os_gettime(env, &lp->rcvd_ts, 1);
1313 lp->waiting_lsn = rp->lsn;
1314 }
1315
1316 if (__rep_check_doreq(env, rep) &&
1317 (ret = __rep_loggap_req(env, rep, &rp->lsn, 0) != 0))
1318 goto err;
1319
1320 /*
1321 * If this is permanent; let the caller know that we have
1322 * not yet written it to disk, but we've accepted it.
1323 */
1324 if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
1325 max_lsn = rp->lsn;
1326 ret = DB_REP_NOTPERM;
1327 }
1328 goto done;
1329 } else {
1330 STAT(rep->stat.st_log_duplicated++);
1331 REP_SYSTEM_UNLOCK(env);
1332 if (is_dupp != NULL) {
1333 *is_dupp = 1;
1334 /*
1335 * Could get overwritten by max_lsn later.
1336 * But max_lsn is guaranteed <= ready_lsn, so
1337 * it would be a more conservative LSN to return.
1338 */
1339 if (ret_lsnp != NULL)
1340 *ret_lsnp = lp->ready_lsn;
1341 }
1342 LOGCOPY_32(env, &rectype, rec->data);
1343 if (IS_PERM_RECTYPE(rectype))
1344 max_lsn = lp->max_perm_lsn;
1345 /*
1346 * We check REPCTL_LEASE here, because this client may
1347 * have leases configured but the master may not (especially
1348 * in a mixed version group. If the master has leases
1349 * configured, all clients must also.
1350 */
1351 if (IS_USING_LEASES(env) &&
1352 F_ISSET(rp, REPCTL_LEASE) &&
1353 timespecisset(&msg_time)) {
1354 if (timespeccmp(&msg_time, &lp->max_lease_ts, >))
1355 max_ts = msg_time;
1356 else
1357 max_ts = lp->max_lease_ts;
1358 }
1359 goto done;
1360 }
1361
1362 /* Check if we need to go back into the table. */
1363 if (ret == 0 && LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0)
1364 goto gap_check;
1365
1366 done:
1367 err: /*
1368 * In case of a race, to make sure only one thread can get
1369 * DB_REP_LOGREADY, zero out rep->last_lsn to show that we've gotten to
1370 * this point.
1371 */
1372 REP_SYSTEM_LOCK(env);
1373 if (ret == 0 &&
1374 rep->sync_state == SYNC_LOG &&
1375 !IS_ZERO_LSN(rep->last_lsn) &&
1376 LOG_COMPARE(&lp->ready_lsn, &rep->last_lsn) >= 0) {
1377 *last_lsnp = max_lsn;
1378 ZERO_LSN(rep->last_lsn);
1379 ZERO_LSN(max_lsn);
1380 ret = DB_REP_LOGREADY;
1381 }
1382 /*
1383 * Only decrement if we were actually applying log records.
1384 * We do not care if we processed a dup record or put one
1385 * in the temp db.
1386 */
1387 if (set_apply) {
1388 rep->apply_th--;
1389 VPRINT(env, (env, DB_VERB_REP_MISC,
1390 "rep_apply: Decrement apply_th %d [%lu][%lu]",
1391 rep->apply_th, (u_long)lp->ready_lsn.file,
1392 (u_long)lp->ready_lsn.offset));
1393 }
1394
1395 if (ret == 0 && rep->sync_state != SYNC_LOG &&
1396 !IS_ZERO_LSN(max_lsn)) {
1397 if (ret_lsnp != NULL)
1398 *ret_lsnp = max_lsn;
1399 ret = DB_REP_ISPERM;
1400 DB_ASSERT(env, LOG_COMPARE(&max_lsn, &lp->max_perm_lsn) >= 0);
1401 lp->max_perm_lsn = max_lsn;
1402 if ((t_ret = __rep_notify_threads(env, AWAIT_LSN)) != 0)
1403 ret = t_ret;
1404 }
1405
1406 /*
1407 * Start-up is complete when we process (or have already processed) up
1408 * to the end of the replication group's log. In case we miss that
1409 * message, as a back-up, we also recognize start-up completion when we
1410 * actually process a live log record. Having cmp==0 here (with a good
1411 * "ret" value) implies we actually processed the record.
1412 */
1413 if ((ret == 0 || ret == DB_REP_ISPERM) &&
1414 rep->stat.st_startup_complete == 0 &&
1415 rep->sync_state != SYNC_LOG &&
1416 ((cmp <= 0 && F_ISSET(rp, REPCTL_LOG_END)) ||
1417 (cmp == 0 && !F_ISSET(rp, REPCTL_RESEND)))) {
1418 rep->stat.st_startup_complete = 1;
1419 event = 1;
1420 gen = rep->gen;
1421 master = rep->master_id;
1422 }
1423 REP_SYSTEM_UNLOCK(env);
1424 /*
1425 * If we've processed beyond the needed LSN for a pending
1426 * start sync, start it now. We must compare > here
1427 * because ready_lsn is the next record we expect and if
1428 * the last record is a commit, that will dirty pages on
1429 * a client as that txn is applied.
1430 */
1431 if (!IS_ZERO_LSN(rep->ckp_lsn) &&
1432 LOG_COMPARE(&lp->ready_lsn, &rep->ckp_lsn) > 0) {
1433 save_lsn = rep->ckp_lsn;
1434 ZERO_LSN(rep->ckp_lsn);
1435 } else
1436 ZERO_LSN(save_lsn);
1437
1438 /*
1439 * If this is a perm record, we are using leases, update the lease
1440 * grant. We must hold the clientdb mutex. We must not hold
1441 * the region mutex because rep_update_grant will acquire it.
1442 */
1443 if (ret == DB_REP_ISPERM && IS_USING_LEASES(env) &&
1444 timespecisset(&max_ts)) {
1445 if ((t_ret = __rep_update_grant(env, &max_ts)) != 0)
1446 ret = t_ret;
1447 else if (timespeccmp(&max_ts, &lp->max_lease_ts, >))
1448 lp->max_lease_ts = max_ts;
1449 }
1450
1451 MUTEX_UNLOCK(env, rep->mtx_clientdb);
1452 if (!IS_ZERO_LSN(save_lsn)) {
1453 /*
1454 * Now call memp_sync holding only the ckp mutex.
1455 */
1456 MUTEX_LOCK(env, rep->mtx_ckp);
1457 RPRINT(env, (env, DB_VERB_REP_MISC,
1458 "Starting delayed __memp_sync call [%lu][%lu]",
1459 (u_long)save_lsn.file, (u_long)save_lsn.offset));
1460 t_ret = __memp_sync(env,
1461 DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &save_lsn);
1462 MUTEX_UNLOCK(env, rep->mtx_ckp);
1463 }
1464 if (event) {
1465 RPRINT(env, (env, DB_VERB_REP_MISC,
1466 "Start-up is done [%lu][%lu]",
1467 (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
1468
1469 if ((t_ret = __rep_fire_startupdone(env, gen, master)) != 0) {
1470 DB_ASSERT(env, ret == 0 || ret == DB_REP_ISPERM);
1471 /* Failure trumps either of those values. */
1472 ret = t_ret;
1473 goto out;
1474 }
1475 }
1476 if ((ret == 0 || ret == DB_REP_ISPERM) &&
1477 newfile_seen && lp->db_log_autoremove)
1478 __log_autoremove(env);
1479 if (control_dbt.data != NULL)
1480 __os_ufree(env, control_dbt.data);
1481 if (rec_dbt.data != NULL)
1482 __os_ufree(env, rec_dbt.data);
1483
1484 out:
1485 switch (ret) {
1486 case 0:
1487 break;
1488 case DB_REP_ISPERM:
1489 VPRINT(env, (env, DB_VERB_REP_MSGS,
1490 "Returning ISPERM [%lu][%lu], cmp = %d",
1491 (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1492 break;
1493 case DB_REP_LOGREADY:
1494 RPRINT(env, (env, DB_VERB_REP_MSGS,
1495 "Returning LOGREADY up to [%lu][%lu], cmp = %d",
1496 (u_long)last_lsnp->file,
1497 (u_long)last_lsnp->offset, cmp));
1498 break;
1499 case DB_REP_NOTPERM:
1500 if (rep->sync_state != SYNC_LOG &&
1501 !IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL)
1502 *ret_lsnp = max_lsn;
1503
1504 VPRINT(env, (env, DB_VERB_REP_MSGS,
1505 "Returning NOTPERM [%lu][%lu], cmp = %d",
1506 (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1507 break;
1508 default:
1509 RPRINT(env, (env, DB_VERB_REP_MSGS,
1510 "Returning %d [%lu][%lu], cmp = %d", ret,
1511 (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1512 break;
1513 }
1514
1515 return (ret);
1516 }
1517
1518 /*
1519 * __rep_process_txn --
1520 *
1521 * This is the routine that actually applies a transaction's set of updates.
1522 *
1523 * PUBLIC: int __rep_process_txn __P((ENV *, DBT *));
1524 */
1525 int
__rep_process_txn(env,rec)1526 __rep_process_txn(env, rec)
1527 ENV *env;
1528 DBT *rec;
1529 {
1530 DBT data_dbt, *lock_dbt;
1531 DB_LOCKER *locker;
1532 DB_LOCKREQ req, *lvp;
1533 DB_LOGC *logc;
1534 DB_LSN prev_lsn, *lsnp;
1535 DB_REP *db_rep;
1536 DB_THREAD_INFO *ip;
1537 DB_TXNHEAD *txninfo;
1538 DELAYED_BLOB_LIST *dblp, *dummy;
1539 LSN_COLLECTION lc;
1540 REP *rep;
1541 __txn_regop_args *txn_args;
1542 __txn_regop_42_args *txn42_args;
1543 __txn_prepare_args *prep_args;
1544 u_int32_t rectype;
1545 u_int i;
1546 int ret, t_ret;
1547
1548 db_rep = env->rep_handle;
1549 rep = db_rep->region;
1550 logc = NULL;
1551 dblp = dummy = NULL;
1552 txn_args = NULL;
1553 txn42_args = NULL;
1554 prep_args = NULL;
1555 txninfo = NULL;
1556
1557 memset(&data_dbt, 0, sizeof(data_dbt));
1558 if (F_ISSET(env, ENV_THREAD))
1559 F_SET(&data_dbt, DB_DBT_REALLOC);
1560
1561 /*
1562 * There are two phases: First, we have to traverse backwards through
1563 * the log records gathering the list of all LSNs in the transaction.
1564 * Once we have this information, we can loop through and then apply it.
1565 *
1566 * We may be passed a prepare (if we're restoring a prepare on upgrade)
1567 * instead of a commit (the common case). Check which it is and behave
1568 * appropriately.
1569 */
1570 LOGCOPY_32(env, &rectype, rec->data);
1571 memset(&lc, 0, sizeof(lc));
1572 if (rectype == DB___txn_regop) {
1573 /*
1574 * We're the end of a transaction. Make sure this is
1575 * really a commit and not an abort!
1576 */
1577 if ((ret = __txn_regop_read(
1578 env, rec->data, &txn_args)) != 0)
1579 return (ret);
1580 if (txn_args->opcode != TXN_COMMIT) {
1581 __os_free(env, txn_args);
1582 return (0);
1583 }
1584 prev_lsn = txn_args->prev_lsn;
1585 lock_dbt = &txn_args->locks;
1586 } else {
1587 /* We're a prepare. */
1588 DB_ASSERT(env, rectype == DB___txn_prepare);
1589
1590 if ((ret = __txn_prepare_read(
1591 env, rec->data, &prep_args)) != 0)
1592 return (ret);
1593 prev_lsn = prep_args->prev_lsn;
1594 lock_dbt = &prep_args->locks;
1595 }
1596
1597 /* Get locks. */
1598 if ((ret = __lock_id(env, NULL, &locker)) != 0)
1599 goto err1;
1600
1601 /* We are always more important than user transactions. */
1602 locker->priority = DB_LOCK_MAXPRIORITY;
1603
1604 if ((ret =
1605 __lock_get_list(env, locker, 0, DB_LOCK_WRITE, lock_dbt)) != 0)
1606 goto err;
1607
1608 /* Phase 1. Get a list of the LSNs in this transaction, and sort it. */
1609 if ((ret = __rep_collect_txn(env, &prev_lsn, &lc, &dblp)) != 0)
1610 goto err;
1611
1612 /* Deal with any child transactions that had to be delayed. */
1613 while (dblp != NULL) {
1614 if ((ret = __rep_collect_txn(
1615 env, &dblp->lsn, &lc, &dummy)) != 0)
1616 goto err;
1617 DB_ASSERT(env, dummy == NULL);
1618 dummy = dblp;
1619 dblp = dummy->next;
1620 __os_free(env, dummy);
1621 dummy = NULL;
1622 }
1623 qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
1624
1625 /*
1626 * The set of records for a transaction may include dbreg_register
1627 * records. Create a txnlist so that they can keep track of file
1628 * state between records.
1629 */
1630 ENV_GET_THREAD_INFO(env, ip);
1631 if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0)
1632 goto err;
1633 /* Replication uses a transaction only when client mvcc is active. */
1634 if (F_ISSET(env->dbenv, DB_ENV_MULTIVERSION) && (ret = __txn_begin(env,
1635 ip, NULL, &txninfo->txn, DB_TXN_SNAPSHOT | DB_TXN_DISPATCH)) != 0)
1636 goto err;
1637
1638 /* Phase 2: Apply updates. */
1639 if ((ret = __log_cursor(env, &logc)) != 0)
1640 goto err;
1641 for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
1642 if ((ret = __logc_get(logc, lsnp, &data_dbt, DB_SET)) != 0) {
1643 __db_errx(env, DB_STR_A("3522",
1644 "failed to read the log at [%lu][%lu]", "%lu %lu"),
1645 (u_long)lsnp->file, (u_long)lsnp->offset);
1646 goto err;
1647 }
1648 if ((ret = __db_dispatch(env, &env->recover_dtab,
1649 &data_dbt, lsnp, DB_TXN_APPLY, txninfo)) != 0) {
1650 __db_err(env, ret, DB_STR_A("3523",
1651 "transaction %x failed at [%lu][%lu]", "%lu %lu"),
1652 txninfo->txn->txnid,
1653 (u_long)lsnp->file, (u_long)lsnp->offset);
1654 goto err;
1655 }
1656 LOGCOPY_32(env, &rectype, data_dbt.data);
1657 }
1658 if (txninfo->txn != NULL) {
1659 ret = __txn_commit(txninfo->txn, 0);
1660 txninfo->txn = NULL;
1661 if (ret != 0) {
1662 __db_errx(env, DB_STR_A("3715", "%lu %lu",
1663 "rep_process_txn [%lu][%lu] failed to commit"),
1664 (u_long)lc.array[lc.nlsns - 1].file,
1665 (u_long)lc.array[lc.nlsns - 1].offset);
1666 goto err;
1667 }
1668 }
1669
1670 err: memset(&req, 0, sizeof(req));
1671 req.op = DB_LOCK_PUT_ALL;
1672 if ((t_ret =
1673 __lock_vec(env, locker, 0, &req, 1, &lvp)) != 0 && ret == 0)
1674 ret = t_ret;
1675
1676 if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0)
1677 ret = t_ret;
1678
1679 while (dblp != NULL) {
1680 dummy = dblp;
1681 dblp = dummy->next;
1682 __os_free(env, dummy);
1683 }
1684
1685 err1: if (txn_args != NULL)
1686 __os_free(env, txn_args);
1687 if (txn42_args != NULL)
1688 __os_free(env, txn42_args);
1689 if (prep_args != NULL)
1690 __os_free(env, prep_args);
1691 if (lc.array != NULL)
1692 __os_free(env, lc.array);
1693
1694 if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0)
1695 ret = t_ret;
1696
1697 if (txninfo != NULL)
1698 __db_txnlist_end(env, txninfo);
1699
1700 if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
1701 __os_ufree(env, data_dbt.data);
1702
1703 #ifdef HAVE_STATISTICS
1704 if (ret == 0)
1705 /*
1706 * We don't hold the rep mutex, and could miscount if we race.
1707 */
1708 rep->stat.st_txns_applied++;
1709 #endif
1710
1711 return (ret);
1712 }
1713
1714 /*
1715 * __rep_collect_txn
1716 * Recursive function that will let us visit every entry in a transaction
1717 * chain including all child transactions so that we can then apply
1718 * the entire transaction family at once.
1719 */
1720 static int
__rep_collect_txn(env,lsnp,lc,dbl)1721 __rep_collect_txn(env, lsnp, lc, dbl)
1722 ENV *env;
1723 DB_LSN *lsnp;
1724 LSN_COLLECTION *lc;
1725 DELAYED_BLOB_LIST **dbl;
1726 {
1727 __dbreg_register_args *dbregargp;
1728 __txn_child_args *argp;
1729 DB_LOGC *logc;
1730 DB_LSN c_lsn;
1731 DB_REP *db_rep;
1732 DBT data;
1733 db_seq_t blob_file_id;
1734 u_int32_t child, rectype, skip_txnid;
1735 u_int nalloc;
1736 int ret, t_ret, view_partial;
1737 char *name;
1738
1739 memset(&data, 0, sizeof(data));
1740 F_SET(&data, DB_DBT_REALLOC);
1741 skip_txnid = TXN_INVALID;
1742
1743 if ((ret = __log_cursor(env, &logc)) != 0)
1744 return (ret);
1745
1746 /*
1747 * For partial replication we assume a certain sequence of
1748 * log records to detect a database create and skip it if
1749 * desired. We are walking backward through the records of
1750 * a single transaction right now.
1751 *
1752 * A create operation is done inside a BDB-owned child txn.
1753 * Nothing else is done within this BDB-owned child txn.
1754 * The last piece of a create operations is the dbreg_register
1755 * log record that records the opening of the file. That
1756 * log record contains the child txnid in the 'id' field, and
1757 * the file name. At this point we invoke the partial callback
1758 * to determine if this database should be replicated. If it
1759 * should not be replicated, we need to avoid collecting the
1760 * entire child txn referenced in the 'id' field.
1761 *
1762 * So if processing the dbreg_register record finds a database
1763 * to skip, we store the child txnid in 'skip_txnid'. We use
1764 * 'skip_txnid' to avoid processing log records or making
1765 * recursive calls for that txnid.
1766 */
1767 while (!IS_ZERO_LSN(*lsnp) &&
1768 (ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) {
1769 LOGCOPY_32(env, &rectype, data.data);
1770 if (rectype == DB___txn_child) {
1771 if ((ret = __txn_child_read(
1772 env, data.data, &argp)) != 0)
1773 goto err;
1774 c_lsn = argp->c_lsn;
1775 *lsnp = argp->prev_lsn;
1776 child = argp->child;
1777 __os_free(env, argp);
1778
1779 if (child == skip_txnid && *dbl != NULL &&
1780 (*dbl)->child == child)
1781 (*dbl)->lsn = c_lsn;
1782 /*
1783 * If skip_txnid is set, it is the id of the child txnid
1784 * that creates a database we should skip. So, if
1785 * this is that child txn, do not collect it.
1786 */
1787 if (skip_txnid == TXN_INVALID || child != skip_txnid)
1788 ret = __rep_collect_txn(env, &c_lsn, lc, dbl);
1789 } else if (IS_VIEW_SITE(env) &&
1790 rectype == DB___dbreg_register) {
1791 db_rep = env->rep_handle;
1792 /*
1793 * If we are a view see if this is a file creation
1794 * stream. On-disk files have the creating child txn
1795 * in the 'id' field and the name. See if this view
1796 * wants this file.
1797 */
1798 if ((ret = __dbreg_register_read(
1799 env, data.data, &dbregargp)) != 0)
1800 goto err;
1801 child = dbregargp->id;
1802 name = (char *)dbregargp->name.data;
1803 skip_txnid = TXN_INVALID;
1804 if (child != TXN_INVALID &&
1805 (!IS_DB_FILE(name) || IS_BLOB_META(name))) {
1806 /*
1807 * The 'id' has a child txn so it is a create.
1808 */
1809 DB_ASSERT(env, db_rep->partial != NULL);
1810 GET_LO_HI(env, dbregargp->blob_fid_lo,
1811 dbregargp->blob_fid_hi, blob_file_id, ret);
1812 if (ret != 0)
1813 goto err;
1814 if ((ret = __rep_call_partial(env,
1815 name, &view_partial, 0, dbl)) != 0) {
1816 VPRINT(env, (env, DB_VERB_REP_MISC,
1817 "rep_collect_txn: partial cb err %d for %s", ret, name));
1818 __os_free(env, dbregargp);
1819 goto err;
1820 }
1821 /*
1822 * Save the child txnid for when we walk back
1823 * into the txn_child record.
1824 */
1825 if (view_partial == 0) {
1826 skip_txnid = child;
1827 if ((ret =
1828 __rep_remove_delayed_blobs(env,
1829 blob_file_id, child, dbl)) != 0)
1830 goto err;
1831 }
1832 }
1833 __os_free(env, dbregargp);
1834 }
1835 if (rectype != DB___txn_child) {
1836 if (lc->nalloc < lc->nlsns + 1) {
1837 nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
1838 if ((ret = __os_realloc(env,
1839 nalloc * sizeof(DB_LSN), &lc->array)) != 0)
1840 goto err;
1841 lc->nalloc = nalloc;
1842 }
1843 lc->array[lc->nlsns++] = *lsnp;
1844
1845 /*
1846 * Explicitly copy the previous lsn. The record
1847 * starts with a u_int32_t record type, a u_int32_t
1848 * txn id, and then the DB_LSN (prev_lsn) that we
1849 * want. We copy explicitly because we have no idea
1850 * what kind of record this is.
1851 */
1852 LOGCOPY_TOLSN(env, lsnp, (u_int8_t *)data.data +
1853 sizeof(u_int32_t) + sizeof(u_int32_t));
1854 }
1855
1856 if (ret != 0)
1857 goto err;
1858 }
1859 if (ret != 0)
1860 __db_errx(env, DB_STR_A("3524",
1861 "collect failed at: [%lu][%lu]", "%lu %lu"),
1862 (u_long)lsnp->file, (u_long)lsnp->offset);
1863
1864 err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
1865 ret = t_ret;
1866 if (data.data != NULL)
1867 __os_ufree(env, data.data);
1868 return (ret);
1869 }
1870
1871 /*
1872 * __rep_remove_delayed_blobs --
1873 *
1874 * If a blob meta database is opened in the same transaction as the database
1875 * that owns it, then deciding whether it should be replicated or not needs
1876 * to be delayed until after the rest of the transaction is processed. To do
1877 * this, the transaction's information is added to a DELAYED_BLOB_LIST. When
1878 * the owning database is processed, if it is not replicated then remove the
1879 * entry of its blob meta database from the delayed list.
1880 */
1881 static int
__rep_remove_delayed_blobs(env,blob_file_id,child,dbl)1882 __rep_remove_delayed_blobs(env, blob_file_id, child, dbl)
1883 ENV *env;
1884 db_seq_t blob_file_id;
1885 u_int32_t child;
1886 DELAYED_BLOB_LIST **dbl;
1887 {
1888 DELAYED_BLOB_LIST *ent, *next, *prev;
1889
1890 if (*dbl == NULL)
1891 return (0);
1892
1893 /*
1894 * If the child transaction has not been set, then a new entry was just
1895 * added to the list.
1896 */
1897 if ((*dbl)->child == 0) {
1898 (*dbl)->child = child;
1899 return (0);
1900 }
1901
1902 if (blob_file_id == 0)
1903 return (0);
1904
1905 /*
1906 * This blob meta database should not be replicated if its associated
1907 * database is not replicated. Remove it from the delayed
1908 * list so it will not be processed at a later time.
1909 */
1910 for (ent = *dbl; ent != NULL; ent = (DELAYED_BLOB_LIST *)ent->next) {
1911 if (ent->blob_file_id == blob_file_id && ent->child != child) {
1912 next = (DELAYED_BLOB_LIST *)ent->next;
1913 prev = (DELAYED_BLOB_LIST *)ent->prev;
1914 if (ent == *dbl)
1915 *dbl = next;
1916 if (prev != NULL)
1917 prev->next = ent->next;
1918 if (next != NULL)
1919 next->prev = ent->prev;
1920 __os_free(env, ent);
1921 break;
1922 }
1923 }
1924 return (0);
1925 }
1926
1927 /*
1928 * __rep_lsn_cmp --
1929 * qsort-type-compatible wrapper for LOG_COMPARE.
1930 */
1931 static int
__rep_lsn_cmp(lsn1,lsn2)1932 __rep_lsn_cmp(lsn1, lsn2)
1933 const void *lsn1, *lsn2;
1934 {
1935
1936 return (LOG_COMPARE((DB_LSN *)lsn1, (DB_LSN *)lsn2));
1937 }
1938
1939 /*
1940 * __rep_newfile --
1941 * NEWFILE messages have the LSN of the last record in the previous
1942 * log file. When applying a NEWFILE message, make sure we haven't already
1943 * swapped files. Assume caller hold mtx_clientdb.
1944 */
1945 static int
__rep_newfile(env,rp,rec)1946 __rep_newfile(env, rp, rec)
1947 ENV *env;
1948 __rep_control_args *rp;
1949 DBT *rec;
1950 {
1951 DB_LOG *dblp;
1952 DB_LSN tmplsn;
1953 DB_REP *db_rep;
1954 LOG *lp;
1955 REP *rep;
1956 __rep_newfile_args nf_args;
1957 int ret;
1958
1959 dblp = env->lg_handle;
1960 lp = dblp->reginfo.primary;
1961 db_rep = env->rep_handle;
1962 rep = db_rep->region;
1963
1964 /*
1965 * If a newfile is already in progress, just ignore.
1966 */
1967 if (F_ISSET(rep, REP_F_NEWFILE))
1968 return (0);
1969 if (rp->lsn.file + 1 > lp->ready_lsn.file) {
1970 if ((ret = __rep_newfile_unmarshal(env, &nf_args,
1971 rec->data, rec->size, NULL)) != 0)
1972 return (ret);
1973 RPRINT(env, (env, DB_VERB_REP_MISC,
1974 "rep_newfile: File %lu vers %lu",
1975 (u_long)rp->lsn.file + 1, (u_long)nf_args.version));
1976
1977 /*
1978 * We drop the mtx_clientdb mutex during
1979 * the file operation, and then reacquire it when
1980 * we're done. We avoid colliding with new incoming
1981 * log records because lp->ready_lsn is not getting
1982 * updated and there is no real log record at this
1983 * ready_lsn. We avoid colliding with a duplicate
1984 * NEWFILE message by setting an in-progress flag.
1985 */
1986 REP_SYSTEM_LOCK(env);
1987 F_SET(rep, REP_F_NEWFILE);
1988 REP_SYSTEM_UNLOCK(env);
1989 MUTEX_UNLOCK(env, rep->mtx_clientdb);
1990 LOG_SYSTEM_LOCK(env);
1991 ret = __log_newfile(dblp, &tmplsn, 0, nf_args.version);
1992 LOG_SYSTEM_UNLOCK(env);
1993 MUTEX_LOCK(env, rep->mtx_clientdb);
1994 REP_SYSTEM_LOCK(env);
1995 F_CLR(rep, REP_F_NEWFILE);
1996 REP_SYSTEM_UNLOCK(env);
1997 if (ret == 0)
1998 lp->ready_lsn = tmplsn;
1999 return (ret);
2000 } else
2001 /* We've already applied this NEWFILE. Just ignore it. */
2002 return (0);
2003 }
2004
2005 /*
2006 * __rep_do_ckp --
2007 * Perform the memp_sync necessary for this checkpoint without holding the
2008 * REP->mtx_clientdb. Callers of this function must hold REP->mtx_clientdb
2009 * and must not be holding the region mutex.
2010 */
2011 static int
__rep_do_ckp(env,rec,rp)2012 __rep_do_ckp(env, rec, rp)
2013 ENV *env;
2014 DBT *rec;
2015 __rep_control_args *rp;
2016 {
2017 DB_ENV *dbenv;
2018 __txn_ckp_args *ckp_args;
2019 DB_LSN ckp_lsn;
2020 REP *rep;
2021 int ret;
2022
2023 dbenv = env->dbenv;
2024
2025 /* Crack the log record and extract the checkpoint LSN. */
2026 if ((ret = __txn_ckp_read(env, rec->data, &ckp_args)) != 0)
2027 return (ret);
2028 ckp_lsn = ckp_args->ckp_lsn;
2029 __os_free(env, ckp_args);
2030
2031 rep = env->rep_handle->region;
2032
2033 MUTEX_UNLOCK(env, rep->mtx_clientdb);
2034 DB_TEST_WAIT(env, env->test_check);
2035
2036 /*
2037 * Sync the memory pool.
2038 *
2039 * This is the real PERM lock record/ckp. We cannot return ISPERM
2040 * if we haven't truly completed the checkpoint, so we don't allow
2041 * this call to be interrupted.
2042 *
2043 * We may be overlapping our log record with an in-progress startsync
2044 * of this checkpoint; suppress the max_write settings on any running
2045 * cache-flush operation so it completes quickly.
2046 */
2047 (void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 1);
2048 MUTEX_LOCK(env, rep->mtx_ckp);
2049 ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &ckp_lsn);
2050 MUTEX_UNLOCK(env, rep->mtx_ckp);
2051 (void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 0);
2052
2053 /* Update the last_ckp in the txn region. */
2054 if (ret == 0)
2055 ret = __txn_updateckp(env, &rp->lsn);
2056 else {
2057 __db_errx(env, DB_STR_A("3525",
2058 "Error syncing ckp [%lu][%lu]", "%lu %lu"),
2059 (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
2060 ret = __env_panic(env, ret);
2061 }
2062
2063 MUTEX_LOCK(env, rep->mtx_clientdb);
2064 #ifdef HAVE_REPLICATION_THREADS
2065 if (ret == 0) {
2066 REP_SYSTEM_LOCK(env);
2067 if (LOG_COMPARE(&ckp_lsn, &rep->last_ckp_lsn) > 0)
2068 rep->last_ckp_lsn = ckp_lsn;
2069 REP_SYSTEM_UNLOCK(env);
2070 }
2071 #endif
2072 return (ret);
2073 }
2074
2075 /*
2076 * __rep_remfirst --
2077 * Remove the first entry from the __db.rep.db
2078 */
2079 static int
__rep_remfirst(env,ip,cntrl,rec)2080 __rep_remfirst(env, ip, cntrl, rec)
2081 ENV *env;
2082 DB_THREAD_INFO *ip;
2083 DBT *cntrl;
2084 DBT *rec;
2085 {
2086 DB *dbp;
2087 DBC *dbc;
2088 DB_REP *db_rep;
2089 int ret, t_ret;
2090
2091 db_rep = env->rep_handle;
2092 dbp = db_rep->rep_db;
2093 if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
2094 return (ret);
2095
2096 /* The DBTs need to persist through another call. */
2097 F_SET(cntrl, DB_DBT_REALLOC);
2098 F_SET(rec, DB_DBT_REALLOC);
2099 if ((ret = __dbc_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0)
2100 ret = __dbc_del(dbc, 0);
2101 if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
2102 ret = t_ret;
2103
2104 return (ret);
2105 }
2106
2107 /*
2108 * __rep_getnext --
2109 * Get the next record out of the __db.rep.db table.
2110 */
2111 static int
__rep_getnext(env,ip)2112 __rep_getnext(env, ip)
2113 ENV *env;
2114 DB_THREAD_INFO *ip;
2115 {
2116 DB *dbp;
2117 DBC *dbc;
2118 DBT lsn_dbt, nextrec_dbt;
2119 DB_LOG *dblp;
2120 DB_REP *db_rep;
2121 LOG *lp;
2122 __rep_control_args *rp;
2123 int ret, t_ret;
2124
2125 dblp = env->lg_handle;
2126 lp = dblp->reginfo.primary;
2127
2128 db_rep = env->rep_handle;
2129 dbp = db_rep->rep_db;
2130
2131 if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
2132 return (ret);
2133
2134 /*
2135 * Update waiting_lsn. We need to move it
2136 * forward to the LSN of the next record
2137 * in the queue.
2138 *
2139 * If the next item in the database is a log
2140 * record--the common case--we're not
2141 * interested in its contents, just in its LSN.
2142 * Optimize by doing a partial get of the data item.
2143 */
2144 memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
2145 F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
2146 nextrec_dbt.ulen = nextrec_dbt.dlen = 0;
2147
2148 memset(&lsn_dbt, 0, sizeof(lsn_dbt));
2149 ret = __dbc_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST);
2150 if (ret != DB_NOTFOUND && ret != 0)
2151 goto err;
2152
2153 if (ret == DB_NOTFOUND) {
2154 ZERO_LSN(lp->waiting_lsn);
2155 /*
2156 * Whether or not the current record is
2157 * simple, there's no next one, and
2158 * therefore we haven't got anything
2159 * else to do right now. Break out.
2160 */
2161 goto err;
2162 }
2163 rp = (__rep_control_args *)lsn_dbt.data;
2164 lp->waiting_lsn = rp->lsn;
2165
2166 err: if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
2167 ret = t_ret;
2168 return (ret);
2169 }
2170
2171 /*
2172 * __rep_process_rec --
2173 *
2174 * Given a record in 'rp', process it. In the case of a NEWFILE, that means
2175 * potentially switching files. In the case of a checkpoint, it means doing
2176 * the checkpoint, and in other cases, it means simply writing the record into
2177 * the log.
2178 */
2179 static int
__rep_process_rec(env,ip,rp,rec,ret_tsp,ret_lsnp)2180 __rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp)
2181 ENV *env;
2182 DB_THREAD_INFO *ip;
2183 __rep_control_args *rp;
2184 DBT *rec;
2185 db_timespec *ret_tsp;
2186 DB_LSN *ret_lsnp;
2187 {
2188 DB *dbp;
2189 DBT control_dbt, key_dbt, rec_dbt;
2190 DB_LOG *dblp;
2191 DB_REP *db_rep;
2192 DB_LOGC *logc;
2193 LOG *lp;
2194 REP *rep;
2195 DB_LSN lsn;
2196 db_timespec msg_time;
2197 u_int32_t rectype, txnid;
2198 int ret, t_ret;
2199
2200 db_rep = env->rep_handle;
2201 rep = db_rep->region;
2202 dblp = env->lg_handle;
2203 lp = dblp->reginfo.primary;
2204 dbp = db_rep->rep_db;
2205 ret = 0;
2206
2207 memset(&rec_dbt, 0, sizeof(rec_dbt));
2208 if (rp->rectype == REP_NEWFILE) {
2209 if ((ret = __rep_newfile(env, rp, rec)) != 0)
2210 return (ret);
2211
2212 /*
2213 * In SYNC_LOG, in case the end-of-log sync point happens to be
2214 * right at the file boundary, we need to make sure ret_lsnp
2215 * points to a real log record, rather than the "dead space" at
2216 * the end of the file that the NEWFILE msg normally points to.
2217 */
2218 if (rep->sync_state == SYNC_LOG) {
2219 if ((ret = __log_cursor(env, &logc)) != 0)
2220 return (ret);
2221 if ((ret = __logc_get(logc,
2222 &lsn, &rec_dbt, DB_LAST)) == 0)
2223 *ret_lsnp = lsn;
2224 if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
2225 ret = t_ret;
2226 }
2227 return (ret);
2228 }
2229
2230 LOGCOPY_32(env, &rectype, rec->data);
2231 memset(&control_dbt, 0, sizeof(control_dbt));
2232 timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
2233
2234 /*
2235 * We write all records except for checkpoint records here.
2236 * All non-checkpoint records need to appear in the log before
2237 * we take action upon them (i.e., we enforce write-ahead logging).
2238 * However, we can't write the checkpoint record here until the
2239 * data buffers are actually written to disk, else we are creating
2240 * an invalid log -- one that says all data before a certain point
2241 * has been written to disk.
2242 *
2243 * If two threads are both processing the same checkpoint record
2244 * (because, for example, it was resent and the original finally
2245 * arrived), we handle that below by checking for the existence of
2246 * the log record when we add it to the replication database.
2247 *
2248 * Any log records that arrive while we are processing the checkpoint
2249 * are added to the bookkeeping database because ready_lsn is not yet
2250 * updated to point after the checkpoint record.
2251 */
2252 if (rectype != DB___txn_ckp || rep->sync_state == SYNC_LOG) {
2253 if ((ret = __log_rep_put(env, &rp->lsn, rec, 0)) != 0)
2254 return (ret);
2255 STAT(rep->stat.st_log_records++);
2256 if (rep->sync_state == SYNC_LOG) {
2257 *ret_lsnp = rp->lsn;
2258 goto out;
2259 }
2260 }
2261
2262 switch (rectype) {
2263 case DB___dbreg_register:
2264 /*
2265 * DB opens occur in the context of a transaction, so we can
2266 * simply handle them when we process the transaction. Closes,
2267 * checkpoints and other dbreg opcodes are not transaction-
2268 * protected, so we have to handle them here.
2269 *
2270 * It should be unsafe for the master to do a close of a file
2271 * that was opened in an active transaction, so we should be
2272 * guaranteed to get the ordering right.
2273 *
2274 * !!!
2275 * The txn ID is the second 4-byte field of the log record.
2276 * We should really be calling __dbreg_register_read() and
2277 * working from the __dbreg_register_args structure, but this
2278 * is considerably faster and the order of the fields won't
2279 * change.
2280 */
2281 LOGCOPY_32(env, &txnid,
2282 (u_int8_t *)rec->data + sizeof(u_int32_t));
2283 if (txnid == TXN_INVALID)
2284 ret = __db_dispatch(env, &env->recover_dtab,
2285 rec, &rp->lsn, DB_TXN_APPLY, NULL);
2286 break;
2287 case DB___txn_regop:
2288 /*
2289 * If an application is doing app-specific recovery
2290 * and acquires locks while applying a transaction,
2291 * it can deadlock. Any other locks held by this
2292 * thread should have been discarded in the
2293 * __rep_process_txn error path, so if we simply
2294 * retry, we should eventually succeed.
2295 */
2296 do {
2297 ret = 0;
2298 if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
2299 ret = __txn_openfiles(env, ip, NULL, 1);
2300 F_SET(db_rep, DBREP_OPENFILES);
2301 }
2302 if (ret == 0)
2303 ret = __rep_process_txn(env, rec);
2304 } while (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED);
2305
2306 /* Now write/flush the log as appropriate. */
2307 if (ret == 0) {
2308 if (F_ISSET(env->dbenv, DB_ENV_TXN_WRITE_NOSYNC))
2309 ret = __log_rep_write(env);
2310 else if (!F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
2311 ret = __log_flush(env, NULL);
2312 }
2313 if (ret != 0) {
2314 __db_errx(env, DB_STR_A("3526",
2315 "Error processing txn [%lu][%lu]", "%lu %lu"),
2316 (u_long)rp->lsn.file, (u_long)rp->lsn.offset);
2317 ret = __env_panic(env, ret);
2318 }
2319 *ret_lsnp = rp->lsn;
2320 break;
2321 case DB___txn_prepare:
2322 ret = __log_flush(env, NULL);
2323 /*
2324 * Save the biggest prepared LSN we've seen.
2325 */
2326 rep->max_prep_lsn = rp->lsn;
2327 VPRINT(env, (env, DB_VERB_REP_MSGS,
2328 "process_rec: prepare at [%lu][%lu]",
2329 (u_long)rep->max_prep_lsn.file,
2330 (u_long)rep->max_prep_lsn.offset));
2331 break;
2332 case DB___txn_ckp:
2333 /*
2334 * We do not want to hold the REP->mtx_clientdb mutex while
2335 * syncing the mpool, so if we get a checkpoint record we are
2336 * supposed to process, add it to the __db.rep.db, do the
2337 * memp_sync and then go back and process it later, when the
2338 * sync has finished. If this record is already in the table,
2339 * then some other thread will process it, so simply return
2340 * REP_NOTPERM.
2341 */
2342 memset(&key_dbt, 0, sizeof(key_dbt));
2343 key_dbt.data = rp;
2344 key_dbt.size = sizeof(*rp);
2345
2346 /*
2347 * We want to put this record into the tmp DB only if
2348 * it doesn't exist, so use DB_NOOVERWRITE.
2349 */
2350 ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
2351 if (ret == DB_KEYEXIST) {
2352 if (ret_lsnp != NULL)
2353 *ret_lsnp = rp->lsn;
2354 ret = DB_REP_NOTPERM;
2355 }
2356 if (ret != 0)
2357 break;
2358
2359 /*
2360 * Now, do the checkpoint. Regardless of
2361 * whether the checkpoint succeeds or not,
2362 * we need to remove the record we just put
2363 * in the temporary database. If the
2364 * checkpoint failed, return an error. We
2365 * will act like we never received the
2366 * checkpoint.
2367 */
2368 if ((ret = __rep_do_ckp(env, rec, rp)) == 0)
2369 ret = __log_rep_put(env, &rp->lsn, rec,
2370 DB_LOG_CHKPNT);
2371 if ((t_ret = __rep_remfirst(env, ip,
2372 &control_dbt, &rec_dbt)) != 0 && ret == 0)
2373 ret = t_ret;
2374 /*
2375 * If we're successful putting the log record in the
2376 * log, flush it for a checkpoint.
2377 */
2378 if (ret == 0) {
2379 *ret_lsnp = rp->lsn;
2380 ret = __log_flush(env, NULL);
2381 if (ret == 0 && lp->db_log_autoremove)
2382 __log_autoremove(env);
2383 }
2384 break;
2385 default:
2386 break;
2387 }
2388
2389 out:
2390 if (ret == 0 && F_ISSET(rp, REPCTL_PERM))
2391 *ret_lsnp = rp->lsn;
2392 if (IS_USING_LEASES(env) &&
2393 F_ISSET(rp, REPCTL_LEASE))
2394 *ret_tsp = msg_time;
2395 /*
2396 * Set ret_lsnp before flushing the log because if the
2397 * flush fails, we've still written the record to the
2398 * log and the LSN has been entered.
2399 */
2400 if (ret == 0 && F_ISSET(rp, REPCTL_FLUSH))
2401 ret = __log_flush(env, NULL);
2402 if (control_dbt.data != NULL)
2403 __os_ufree(env, control_dbt.data);
2404 if (rec_dbt.data != NULL)
2405 __os_ufree(env, rec_dbt.data);
2406
2407 return (ret);
2408 }
2409
2410 /*
2411 * __rep_resend_req --
2412 * We might have dropped a message, we need to resend our request.
2413 * The request we send is dependent on what recovery state we're in.
2414 * The caller holds no locks.
2415 *
2416 * PUBLIC: int __rep_resend_req __P((ENV *, int));
2417 */
2418 int
__rep_resend_req(env,rereq)2419 __rep_resend_req(env, rereq)
2420 ENV *env;
2421 int rereq;
2422 {
2423 DB_LOG *dblp;
2424 DB_LSN lsn, *lsnp;
2425 DB_REP *db_rep;
2426 LOG *lp;
2427 REP *rep;
2428 int blob_sync, master, ret;
2429 repsync_t sync_state;
2430 u_int32_t gapflags, msgtype, repflags, sendflags;
2431
2432 db_rep = env->rep_handle;
2433 rep = db_rep->region;
2434 dblp = env->lg_handle;
2435 lp = dblp->reginfo.primary;
2436 ret = 0;
2437 lsnp = NULL;
2438 msgtype = REP_INVALID;
2439 sendflags = 0;
2440
2441 repflags = rep->flags;
2442 sync_state = rep->sync_state;
2443 blob_sync = rep->blob_sync;
2444 /*
2445 * If we are delayed we do not rerequest anything.
2446 */
2447 if (FLD_ISSET(repflags, REP_F_DELAY))
2448 return (ret);
2449 gapflags = rereq ? REP_GAP_REREQUEST : 0;
2450
2451 if (sync_state == SYNC_VERIFY) {
2452 MUTEX_LOCK(env, rep->mtx_clientdb);
2453 lsn = lp->verify_lsn;
2454 MUTEX_UNLOCK(env, rep->mtx_clientdb);
2455 if (!IS_ZERO_LSN(lsn)) {
2456 msgtype = REP_VERIFY_REQ;
2457 lsnp = &lsn;
2458 sendflags = DB_REP_REREQUEST;
2459 }
2460 } else if (sync_state == SYNC_UPDATE) {
2461 /*
2462 * UPDATE_REQ only goes to the master.
2463 */
2464 msgtype = REP_UPDATE_REQ;
2465 } else if (sync_state == SYNC_PAGE) {
2466 if (blob_sync == 0) {
2467 REP_SYSTEM_LOCK(env);
2468 ret = __rep_pggap_req(env, rep, NULL, gapflags);
2469 REP_SYSTEM_UNLOCK(env);
2470 } else {
2471 MUTEX_LOCK(env, rep->mtx_clientdb);
2472 REP_SYSTEM_LOCK(env);
2473 ret = __rep_blob_rereq(env, rep, gapflags);
2474 REP_SYSTEM_UNLOCK(env);
2475 MUTEX_UNLOCK(env, rep->mtx_clientdb);
2476 }
2477 } else {
2478 MUTEX_LOCK(env, rep->mtx_clientdb);
2479 ret = __rep_loggap_req(env, rep, NULL, gapflags);
2480 MUTEX_UNLOCK(env, rep->mtx_clientdb);
2481 }
2482
2483 if (msgtype != REP_INVALID) {
2484 master = rep->master_id;
2485 if (master == DB_EID_INVALID)
2486 (void)__rep_send_message(env,
2487 DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
2488 else
2489 (void)__rep_send_message(env,
2490 master, msgtype, lsnp, NULL, 0, sendflags);
2491 }
2492
2493 return (ret);
2494 }
2495
2496 /*
2497 * __rep_check_doreq --
2498 * PUBLIC: int __rep_check_doreq __P((ENV *, REP *));
2499 *
2500 * Check if we need to send another request. If so, compare with
2501 * the request limits the user might have set. This assumes the
2502 * caller holds the REP->mtx_clientdb mutex. Returns 1 if a request
2503 * needs to be made, and 0 if it does not.
2504 */
2505 int
__rep_check_doreq(env,rep)2506 __rep_check_doreq(env, rep)
2507 ENV *env;
2508 REP *rep;
2509 {
2510
2511 DB_LOG *dblp;
2512 LOG *lp;
2513 db_timespec now;
2514 int req;
2515
2516 dblp = env->lg_handle;
2517 lp = dblp->reginfo.primary;
2518 __os_gettime(env, &now, 1);
2519 timespecsub(&now, &lp->rcvd_ts);
2520 req = timespeccmp(&now, &lp->wait_ts, >=);
2521 if (req) {
2522 /*
2523 * Add wait_ts to itself to double it.
2524 */
2525 timespecadd(&lp->wait_ts, &lp->wait_ts);
2526 if (timespeccmp(&lp->wait_ts, &rep->max_gap, >))
2527 lp->wait_ts = rep->max_gap;
2528 __os_gettime(env, &lp->rcvd_ts, 1);
2529 }
2530 return (req);
2531 }
2532
2533 /*
2534 * __rep_skip_msg -
2535 *
2536 * If we're in recovery we want to skip/ignore the message, but
2537 * we also need to see if we need to re-request any retransmissions.
2538 */
2539 static int
__rep_skip_msg(env,rep,eid,rectype)2540 __rep_skip_msg(env, rep, eid, rectype)
2541 ENV *env;
2542 REP *rep;
2543 int eid;
2544 u_int32_t rectype;
2545 {
2546 int do_req, ret;
2547
2548 ret = 0;
2549 /*
2550 * If we have a request message from a client then immediately
2551 * send a REP_REREQUEST back to that client since we're skipping it.
2552 */
2553 if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rectype))
2554 do_req = 1;
2555 else {
2556 /* Check for need to retransmit. */
2557 MUTEX_LOCK(env, rep->mtx_clientdb);
2558 do_req = __rep_check_doreq(env, rep);
2559 MUTEX_UNLOCK(env, rep->mtx_clientdb);
2560 }
2561 /*
2562 * Don't respond to a MASTER_REQ with
2563 * a MASTER_REQ or REREQUEST.
2564 */
2565 if (do_req && rectype != REP_MASTER_REQ) {
2566 /*
2567 * There are three cases:
2568 * 1. If we don't know who the master is, then send MASTER_REQ.
2569 * 2. If the message we're skipping came from the master,
2570 * then we need to rerequest.
2571 * 3. If the message didn't come from a master (i.e. client
2572 * to client), then send a rerequest back to the sender so
2573 * the sender can rerequest it elsewhere, if we are a client.
2574 */
2575 if (rep->master_id == DB_EID_INVALID) /* Case 1. */
2576 (void)__rep_send_message(env,
2577 DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
2578 else if (eid == rep->master_id) { /* Case 2. */
2579 /*
2580 * When we receive log messages in the SYNC_PAGE stage
2581 * and we decide to rerequest, it often means the pages
2582 * we expect have been dropped. Send a rerequest with
2583 * gapflags for better performance.
2584 */
2585 if ((rectype == REP_LOG || rectype == REP_BULK_LOG ||
2586 rectype == REP_LOG_MORE) &&
2587 rep->sync_state == SYNC_PAGE)
2588 ret = __rep_resend_req(env, 1);
2589 else
2590 ret = __rep_resend_req(env, 0);
2591 } else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */
2592 (void)__rep_send_message(env,
2593 eid, REP_REREQUEST, NULL, NULL, 0, 0);
2594 }
2595 return (ret);
2596 }
2597
2598 /*
2599 * __rep_check_missing --
2600 * PUBLIC: int __rep_check_missing __P((ENV *, u_int32_t, DB_LSN *));
2601 *
2602 * Check for and request any missing client information.
2603 */
2604 int
__rep_check_missing(env,gen,master_perm_lsn)2605 __rep_check_missing(env, gen, master_perm_lsn)
2606 ENV *env;
2607 u_int32_t gen;
2608 DB_LSN *master_perm_lsn;
2609 {
2610 DB_LOG *dblp;
2611 DB_LSN *end_lsn;
2612 DB_REP *db_rep;
2613 LOG *lp;
2614 REGINFO *infop;
2615 REP *rep;
2616 __rep_fileinfo_args *curinfo;
2617 int do_req, has_log_gap, has_page_gap, ret;
2618
2619 db_rep = env->rep_handle;
2620 rep = db_rep->region;
2621 dblp = env->lg_handle;
2622 infop = env->reginfo;
2623 has_log_gap = has_page_gap = ret = 0;
2624
2625 MUTEX_LOCK(env, rep->mtx_clientdb);
2626 REP_SYSTEM_LOCK(env);
2627 /*
2628 * Check if we are okay to proceed with this operation. If not,
2629 * do not rerequest anything.
2630 */
2631 if (!F_ISSET(rep, REP_F_CLIENT) || rep->master_id == DB_EID_INVALID ||
2632 gen != rep->gen || FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG)) {
2633 REP_SYSTEM_UNLOCK(env);
2634 MUTEX_UNLOCK(env, rep->mtx_clientdb);
2635 /*
2636 * If this client is out-of-date, ask the master to identify
2637 * itself so that this client will synchronize with the
2638 * master's later generation.
2639 */
2640 if (gen > rep->gen && __rep_check_doreq(env, rep))
2641 (void)__rep_send_message(env,
2642 DB_EID_BROADCAST, REP_MASTER_REQ,
2643 NULL, NULL, 0, 0);
2644 goto out;
2645 }
2646
2647 /*
2648 * Prevent message lockout by counting ourself here.
2649 * Setting rep->msg_th will prevent a major system
2650 * change, such as a role change or running recovery, from
2651 * occurring before sending out any rerequests.
2652 */
2653 rep->msg_th++;
2654 REP_SYSTEM_UNLOCK(env);
2655
2656 /* Check that it is time to request missing information. */
2657 if ((do_req = __rep_check_doreq(env, rep))) {
2658 /* Check for interior or tail page gap. */
2659 REP_SYSTEM_LOCK(env);
2660 if (rep->sync_state == SYNC_PAGE &&
2661 rep->curinfo_off != INVALID_ROFF) {
2662 GET_CURINFO(rep, infop, curinfo);
2663 has_page_gap =
2664 rep->waiting_pg != PGNO_INVALID ||
2665 rep->ready_pg <= curinfo->max_pgno;
2666 }
2667 REP_SYSTEM_UNLOCK(env);
2668 }
2669 /* Check for interior or tail log gap. */
2670 if (do_req && !has_page_gap) {
2671 lp = dblp->reginfo.primary;
2672 /*
2673 * The LOG_COMPARE test is <= because ready_lsn is
2674 * the next LSN we are expecting but we do not have
2675 * it yet. If the needed LSN is at this LSN, it
2676 * means we are missing the last record we need.
2677 */
2678 if (rep->sync_state == SYNC_LOG)
2679 end_lsn = &rep->last_lsn;
2680 else
2681 end_lsn = master_perm_lsn;
2682 has_log_gap = !IS_ZERO_LSN(lp->waiting_lsn) ||
2683 LOG_COMPARE(&lp->ready_lsn, end_lsn) <= 0;
2684 }
2685 MUTEX_UNLOCK(env, rep->mtx_clientdb);
2686 /*
2687 * If it is time to send a request, only do so if we
2688 * have a log gap or a page gap, or we need to resend an
2689 * UPDATE_REQ or VERIFY_REQ, or we are in SYNC_LOG to keep
2690 * requesting to the current known end of the log.
2691 */
2692 do_req = do_req && (has_log_gap || has_page_gap ||
2693 rep->sync_state == SYNC_LOG ||
2694 rep->sync_state == SYNC_UPDATE ||
2695 rep->sync_state == SYNC_VERIFY);
2696 /*
2697 * Determines request type from current replication
2698 * state and resends request. The request may have
2699 * the DB_REP_ANYWHERE flag enabled if appropriate.
2700 */
2701 if (do_req)
2702 ret = __rep_resend_req(env, 0);
2703
2704 REP_SYSTEM_LOCK(env);
2705 rep->msg_th--;
2706 REP_SYSTEM_UNLOCK(env);
2707
2708 out: return (ret);
2709 }
2710
2711 static int
__rep_fire_newmaster(env,gen,master)2712 __rep_fire_newmaster(env, gen, master)
2713 ENV *env;
2714 u_int32_t gen;
2715 int master;
2716 {
2717 DB_REP *db_rep;
2718 REP *rep;
2719
2720 db_rep = env->rep_handle;
2721 rep = db_rep->region;
2722
2723 REP_EVENT_LOCK(env);
2724 /*
2725 * The firing of this event should be idempotent with respect to a
2726 * particular generation number.
2727 */
2728 if (rep->newmaster_event_gen < gen) {
2729 __rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
2730 rep->newmaster_event_gen = gen;
2731 }
2732 REP_EVENT_UNLOCK(env);
2733 return (0);
2734 }
2735
2736 static int
__rep_fire_startupdone(env,gen,master)2737 __rep_fire_startupdone(env, gen, master)
2738 ENV *env;
2739 u_int32_t gen;
2740 int master;
2741 {
2742 DB_REP *db_rep;
2743 REP *rep;
2744
2745 db_rep = env->rep_handle;
2746 rep = db_rep->region;
2747
2748 REP_EVENT_LOCK(env);
2749 /*
2750 * Usually NEWMASTER will already have been fired. But if not, fire
2751 * it here now, to ensure the application receives events in the
2752 * expected order.
2753 */
2754 if (rep->newmaster_event_gen < gen) {
2755 __rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
2756 rep->newmaster_event_gen = gen;
2757 }
2758
2759 /*
2760 * Caller already ensures that it only tries to fire STARTUPDONE once
2761 * per generation. If we did not want to rely on that, we could add a
2762 * simple boolean flag (to the set of data protected by the mtx_event).
2763 * The precise meaning of that flag would be "STARTUPDONE has been fired
2764 * for the generation value stored in `newmaster_event_gen'". Then the
2765 * more accurate test here would be simply to check that flag, and fire
2766 * the event (and set the flag) if it were not already set.
2767 */
2768 if (rep->newmaster_event_gen == gen)
2769 __rep_fire_event(env, DB_EVENT_REP_STARTUPDONE, NULL);
2770 REP_EVENT_UNLOCK(env);
2771 return (0);
2772 }
2773