1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2006, 2013 Oracle and/or its affiliates. All rights reserved.
5 *
6 * $Id$
7 */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12
/*
 * Signature of the timer-driven actions chosen by __repmgr_next_timeout():
 * either __repmgr_send_heartbeat or __repmgr_call_election.  Each takes the
 * environment and returns 0 or an error.
 */
typedef int (*HEARTBEAT_ACTION) __P((ENV *));

/* Forward declarations of this file's static helpers. */
static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static void check_min_log_file __P((ENV *));
static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
static int prepare_input __P((ENV *, REPMGR_CONNECTION *));
static int process_own_msg __P((ENV *, REPMGR_CONNECTION *));
static int process_parameters __P((ENV *,
    REPMGR_CONNECTION *, char *, u_int, u_int32_t, int, u_int32_t));
static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
static int record_permlsn __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_call_election __P((ENV *));
static int __repmgr_connector_main __P((ENV *, REPMGR_RUNNABLE *));
static void *__repmgr_connector_thread __P((void *));
static int __repmgr_next_timeout __P((ENV *,
    db_timespec *, HEARTBEAT_ACTION *));
static int __repmgr_retry_connections __P((ENV *));
static int __repmgr_send_heartbeat __P((ENV *));
static int __repmgr_try_one __P((ENV *, int));
static int resolve_collision __P((ENV *, REPMGR_SITE *, REPMGR_CONNECTION *));
static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
35
/*
 * Rejects any message on "conn" other than a REPMGR_HANDSHAKE: logs an error
 * naming the message type and connection state, and then makes the
 * *enclosing* function return DB_REP_UNAVAIL.  Therefore it may only be used
 * inside functions returning int.  Both arguments are parenthesized so that
 * any pointer expression may be passed.
 */
#define	ONLY_HANDSHAKE(env, conn) do {					\
	if ((conn)->msg_type != REPMGR_HANDSHAKE) {			\
		__db_errx((env), DB_STR_A("3613",			\
		    "unexpected msg type %d in state %d", "%d %d"),	\
		    (int)(conn)->msg_type, (conn)->state);		\
		return (DB_REP_UNAVAIL);				\
	}								\
} while (0)
44
45 /*
46 * PUBLIC: void *__repmgr_select_thread __P((void *));
47 */
48 void *
__repmgr_select_thread(argsp)49 __repmgr_select_thread(argsp)
50 void *argsp;
51 {
52 REPMGR_RUNNABLE *args;
53 ENV *env;
54 int ret;
55
56 args = argsp;
57 env = args->env;
58
59 if ((ret = __repmgr_select_loop(env)) != 0) {
60 __db_err(env, ret, DB_STR("3614", "select loop failed"));
61 (void)__repmgr_thread_failure(env, ret);
62 }
63 return (NULL);
64 }
65
66 /*
67 * PUBLIC: int __repmgr_bow_out __P((ENV *));
68 */
69 int
__repmgr_bow_out(env)70 __repmgr_bow_out(env)
71 ENV *env;
72 {
73 DB_REP *db_rep;
74 int ret;
75
76 db_rep = env->rep_handle;
77 LOCK_MUTEX(db_rep->mutex);
78 ret = __repmgr_stop_threads(env);
79 UNLOCK_MUTEX(db_rep->mutex);
80 DB_EVENT(env, DB_EVENT_REP_LOCAL_SITE_REMOVED, NULL);
81 return (ret);
82 }
83
/*
 * Accepts one pending incoming connection on db_rep->listen_fd.
 *
 * The new connection is created in CONN_NEGOTIATE state, has keepalive and
 * non-blocking I/O set, gets eid -1 (not yet associated with any site), and
 * is appended to db_rep->connections.
 *
 * Returns 0 on success.  Also returns 0, doing nothing, when accept() fails
 * with one of the errors listed below as innocuous.  Otherwise returns the
 * error.  If a setup step after accept() fails, the socket (or the
 * connection wrapping it) is closed/destroyed before returning.
 *
 * PUBLIC: int __repmgr_accept __P((ENV *));
 */
int
__repmgr_accept(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	ACCEPT_ADDR siaddr;
	socklen_t addrlen;
	socket_t s;
	int ret;

	db_rep = env->rep_handle;
	addrlen = sizeof(siaddr);
	if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr,
	    &addrlen)) == -1) {
		/*
		 * Some errors are innocuous and so should be ignored.  MSDN
		 * Library documents the Windows ones; the Unix ones are
		 * advocated in Stevens' UNPv1, section 16.6; and Linux
		 * Application Development, p. 416.
		 */
		switch (ret = net_errno) {
#ifdef DB_WIN32
		case WSAECONNRESET:
		case WSAEWOULDBLOCK:
#else
		case EINTR:
		case EWOULDBLOCK:
		case ECONNABORTED:
		case ENETDOWN:
#ifdef EPROTO
		case EPROTO:
#endif
		case ENOPROTOOPT:
		case EHOSTDOWN:
#ifdef ENONET
		case ENONET:
#endif
		case EHOSTUNREACH:
		case EOPNOTSUPP:
		case ENETUNREACH:
#endif
			VPRINT(env, (env, DB_VERB_REPMGR_MISC,
			    "accept error %d considered innocuous", ret));
			return (0);
		default:
			__db_err(env, ret, DB_STR("3615", "accept error"));
			return (ret);
		}
	}
	RPRINT(env, (env, DB_VERB_REPMGR_MISC, "accepted a new connection"));

	/* Until the conn struct exists, the raw socket is closed by hand. */
	if ((ret =
	    __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE)) != 0) {
		(void)closesocket(s);
		return (ret);
	}
	if ((ret = __repmgr_set_keepalive(env, conn)) != 0) {
		(void)__repmgr_destroy_conn(env, conn);
		return (ret);
	}
	if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
		__db_err(env, ret, DB_STR("3616",
		    "can't set nonblock after accept"));
		(void)__repmgr_destroy_conn(env, conn);
		return (ret);
	}

	/*
	 * We don't yet know which site this connection is coming from.  So for
	 * now, put it on the "orphans" list; we'll move it to the appropriate
	 * site struct later when we discover who we're talking with, and what
	 * type of connection it is.
	 */
	conn->eid = -1;
	TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);
	/* The connections list holds a counted reference to conn. */
	conn->ref_count++;

	return (0);
}
167
168 /*
169 * Computes how long we should wait for input, in other words how long until we
170 * have to wake up and do something. Returns TRUE if timeout is set; FALSE if
171 * there is nothing to wait for.
172 *
173 * Note that the resulting timeout could be zero; but it can't be negative.
174 *
175 * PUBLIC: int __repmgr_compute_timeout __P((ENV *, db_timespec *));
176 */
177 int
__repmgr_compute_timeout(env,timeout)178 __repmgr_compute_timeout(env, timeout)
179 ENV *env;
180 db_timespec *timeout;
181 {
182 DB_REP *db_rep;
183 REPMGR_RETRY *retry;
184 db_timespec now, t;
185 int have_timeout;
186
187 db_rep = env->rep_handle;
188
189 /*
190 * There are two factors to consider: are heartbeats in use? and, do we
191 * have any sites with broken connections that we ought to retry?
192 */
193 have_timeout = __repmgr_next_timeout(env, &t, NULL);
194
195 /* List items are in order, so we only have to examine the first one. */
196 if (!TAILQ_EMPTY(&db_rep->retries)) {
197 retry = TAILQ_FIRST(&db_rep->retries);
198 if (have_timeout) {
199 /* Choose earliest timeout deadline. */
200 t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
201 } else {
202 t = retry->time;
203 have_timeout = TRUE;
204 }
205 }
206
207 if (have_timeout) {
208 __os_gettime(env, &now, 1);
209 if (timespeccmp(&now, &t, >=))
210 timespecclear(timeout);
211 else {
212 *timeout = t;
213 timespecsub(timeout, &now);
214 }
215 }
216
217 return (have_timeout);
218 }
219
220 /*
221 * Figures out the next heartbeat-related thing to be done, and when it should
222 * be done. The code is factored this way because this computation needs to be
223 * done both before each select() call, and after (when we're checking for timer
224 * expiration).
225 */
226 static int
__repmgr_next_timeout(env,deadline,action)227 __repmgr_next_timeout(env, deadline, action)
228 ENV *env;
229 db_timespec *deadline;
230 HEARTBEAT_ACTION *action;
231 {
232 DB_REP *db_rep;
233 REP *rep;
234 HEARTBEAT_ACTION my_action;
235 REPMGR_CONNECTION *conn;
236 REPMGR_SITE *master;
237 db_timespec t;
238 u_int32_t version;
239
240 db_rep = env->rep_handle;
241 rep = db_rep->region;
242
243 if (rep->master_id == db_rep->self_eid &&
244 rep->heartbeat_frequency > 0) {
245 t = db_rep->last_bcast;
246 TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_frequency);
247 my_action = __repmgr_send_heartbeat;
248 } else if ((master = __repmgr_connected_master(env)) != NULL &&
249 !IS_SUBORDINATE(db_rep) &&
250 rep->heartbeat_monitor_timeout > 0) {
251 version = 0;
252 if ((conn = master->ref.conn.in) != NULL &&
253 IS_READY_STATE(conn->state))
254 version = conn->version;
255 if ((conn = master->ref.conn.out) != NULL &&
256 IS_READY_STATE(conn->state) &&
257 conn->version > version)
258 version = conn->version;
259 if (version >= HEARTBEAT_MIN_VERSION) {
260 /*
261 * If we have a working connection to a heartbeat-aware
262 * master, let's monitor it. Otherwise there's really
263 * nothing we can do.
264 */
265 t = master->last_rcvd_timestamp;
266 TIMESPEC_ADD_DB_TIMEOUT(&t,
267 rep->heartbeat_monitor_timeout);
268 my_action = __repmgr_call_election;
269 } else
270 return (FALSE);
271 } else
272 return (FALSE);
273
274 *deadline = t;
275 if (action != NULL)
276 *action = my_action;
277 return (TRUE);
278 }
279
280 /*
281 * Sends a heartbeat message.
282 *
283 * repmgr also uses the heartbeat facility to manage rerequests. We
284 * send the master's current generation and max_perm_lsn with the heartbeat
285 * message to help a client determine whether it has all master transactions.
286 * When a client receives a heartbeat message, it also checks whether it
287 * needs to rerequest anything. Note that heartbeats must be enabled for
288 * this rerequest processing to occur.
289 */
290 static int
__repmgr_send_heartbeat(env)291 __repmgr_send_heartbeat(env)
292 ENV *env;
293 {
294 DB_REP *db_rep;
295 REP *rep;
296 DBT control, rec;
297 __repmgr_permlsn_args permlsn;
298 u_int8_t buf[__REPMGR_PERMLSN_SIZE];
299 u_int unused1, unused2;
300 int ret, unused3;
301
302 db_rep = env->rep_handle;
303 rep = db_rep->region;
304
305 permlsn.generation = rep->gen;
306 if ((ret = __rep_get_maxpermlsn(env, &permlsn.lsn)) != 0)
307 return (ret);
308 __repmgr_permlsn_marshal(env, &permlsn, buf);
309 control.data = buf;
310 control.size = __REPMGR_PERMLSN_SIZE;
311
312 DB_INIT_DBT(rec, NULL, 0);
313 return (__repmgr_send_broadcast(env,
314 REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3));
315 }
316
317 /*
318 * PUBLIC: REPMGR_SITE *__repmgr_connected_master __P((ENV *));
319 */
320 REPMGR_SITE *
__repmgr_connected_master(env)321 __repmgr_connected_master(env)
322 ENV *env;
323 {
324 DB_REP *db_rep;
325 REPMGR_SITE *master;
326 int master_id;
327
328 db_rep = env->rep_handle;
329 master_id = db_rep->region->master_id;
330
331 if (!IS_KNOWN_REMOTE_SITE(master_id))
332 return (NULL);
333 master = SITE_FROM_EID(master_id);
334 if (master->state == SITE_CONNECTED)
335 return (master);
336 return (NULL);
337 }
338
339 static int
__repmgr_call_election(env)340 __repmgr_call_election(env)
341 ENV *env;
342 {
343 REPMGR_CONNECTION *conn;
344 REPMGR_SITE *master;
345 int ret;
346
347 master = __repmgr_connected_master(env);
348 if (master == NULL)
349 return (0);
350 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
351 "heartbeat monitor timeout expired"));
352 STAT(env->rep_handle->region->mstat.st_connection_drop++);
353 if ((conn = master->ref.conn.in) != NULL &&
354 (ret = __repmgr_bust_connection(env, conn)) != 0)
355 return (ret);
356 if ((conn = master->ref.conn.out) != NULL &&
357 (ret = __repmgr_bust_connection(env, conn)) != 0)
358 return (ret);
359 return (0);
360 }
361
362 /*
363 * PUBLIC: int __repmgr_check_timeouts __P((ENV *));
364 *
365 * !!!
366 * Assumes caller holds the mutex.
367 */
368 int
__repmgr_check_timeouts(env)369 __repmgr_check_timeouts(env)
370 ENV *env;
371 {
372 db_timespec when, now;
373 HEARTBEAT_ACTION action;
374 int ret;
375
376 /*
377 * Figure out the next heartbeat-related thing to be done. Then, if
378 * it's time to do it, do so.
379 */
380 if (__repmgr_next_timeout(env, &when, &action)) {
381 __os_gettime(env, &now, 1);
382 if (timespeccmp(&when, &now, <=) &&
383 (ret = (*action)(env)) != 0)
384 return (ret);
385 }
386
387 return (__repmgr_retry_connections(env));
388 }
389
390 /*
391 * Initiates connection attempts for any sites on the idle list whose retry
392 * times have expired.
393 */
394 static int
__repmgr_retry_connections(env)395 __repmgr_retry_connections(env)
396 ENV *env;
397 {
398 DB_REP *db_rep;
399 REPMGR_SITE *site;
400 REPMGR_RETRY *retry;
401 db_timespec now;
402 int eid, ret;
403
404 db_rep = env->rep_handle;
405 __os_gettime(env, &now, 1);
406
407 while (!TAILQ_EMPTY(&db_rep->retries)) {
408 retry = TAILQ_FIRST(&db_rep->retries);
409 if (timespeccmp(&retry->time, &now, >=))
410 break; /* since items are in time order */
411
412 TAILQ_REMOVE(&db_rep->retries, retry, entries);
413
414 eid = retry->eid;
415 __os_free(env, retry);
416 DB_ASSERT(env, IS_VALID_EID(eid));
417 site = SITE_FROM_EID(eid);
418 DB_ASSERT(env, site->state == SITE_PAUSING);
419
420 if (site->membership == SITE_PRESENT) {
421 if ((ret = __repmgr_try_one(env, eid)) != 0)
422 return (ret);
423 } else
424 site->state = SITE_IDLE;
425 }
426 return (0);
427 }
428
429 /*
430 * PUBLIC: int __repmgr_first_try_connections __P((ENV *));
431 *
432 * !!!
433 * Assumes caller holds the mutex.
434 */
435 int
__repmgr_first_try_connections(env)436 __repmgr_first_try_connections(env)
437 ENV *env;
438 {
439 DB_REP *db_rep;
440 REPMGR_SITE *site;
441 int eid, ret;
442
443 db_rep = env->rep_handle;
444 FOR_EACH_REMOTE_SITE_INDEX(eid) {
445 site = SITE_FROM_EID(eid);
446 /*
447 * Normally all sites would be IDLE here. But if a user thread
448 * triggered an auto-start in a subordinate process, our send()
449 * function may have found new sites when it sync'ed site
450 * addresses, and that action causes connection attempts to be
451 * scheduled (resulting in PAUSING state here, or conceivably
452 * even CONNECTING or CONNECTED).
453 */
454 if (site->state == SITE_IDLE &&
455 site->membership == SITE_PRESENT &&
456 (ret = __repmgr_try_one(env, eid)) != 0)
457 return (ret);
458 }
459 return (0);
460 }
461
462 /*
463 * Starts a thread to open a connection to the site at the given EID.
464 */
465 static int
__repmgr_try_one(env,eid)466 __repmgr_try_one(env, eid)
467 ENV *env;
468 int eid;
469 {
470 DB_REP *db_rep;
471 REPMGR_SITE *site;
472 REPMGR_RUNNABLE *th;
473 int ret;
474
475 db_rep = env->rep_handle;
476 DB_ASSERT(env, IS_VALID_EID(eid));
477 site = SITE_FROM_EID(eid);
478 th = site->connector;
479 if (th == NULL) {
480 if ((ret = __os_malloc(env, sizeof(REPMGR_RUNNABLE), &th)) != 0)
481 return (ret);
482 site->connector = th;
483 } else if (th->finished) {
484 if ((ret = __repmgr_thread_join(th)) != 0)
485 return (ret);
486 } else {
487 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
488 "eid %lu previous connector thread still running; will retry",
489 (u_long)eid));
490 return (__repmgr_schedule_connection_attempt(env,
491 eid, FALSE));
492 }
493
494 site->state = SITE_CONNECTING;
495
496 th->run = __repmgr_connector_thread;
497 th->args.eid = eid;
498 if ((ret = __repmgr_thread_start(env, th)) != 0) {
499 __os_free(env, th);
500 site->connector = NULL;
501 }
502 return (ret);
503 }
504
505 static void *
__repmgr_connector_thread(argsp)506 __repmgr_connector_thread(argsp)
507 void *argsp;
508 {
509 REPMGR_RUNNABLE *th;
510 ENV *env;
511 int ret;
512
513 th = argsp;
514 env = th->env;
515
516 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
517 "starting connector thread, eid %u", th->args.eid));
518 if ((ret = __repmgr_connector_main(env, th)) != 0) {
519 __db_err(env, ret, DB_STR("3617", "connector thread failed"));
520 (void)__repmgr_thread_failure(env, ret);
521 }
522 RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connector thread is exiting"));
523
524 th->finished = TRUE;
525 return (NULL);
526 }
527
528 static int
__repmgr_connector_main(env,th)529 __repmgr_connector_main(env, th)
530 ENV *env;
531 REPMGR_RUNNABLE *th;
532 {
533 DB_REP *db_rep;
534 REPMGR_SITE *site;
535 REPMGR_CONNECTION *conn;
536 DB_REPMGR_CONN_ERR info;
537 repmgr_netaddr_t netaddr;
538 SITE_STRING_BUFFER site_string;
539 int err, ret, t_ret;
540
541 db_rep = env->rep_handle;
542 ret = 0;
543
544 LOCK_MUTEX(db_rep->mutex);
545 DB_ASSERT(env, IS_VALID_EID(th->args.eid));
546 site = SITE_FROM_EID(th->args.eid);
547 if (site->state != SITE_CONNECTING && db_rep->repmgr_status == stopped)
548 goto unlock;
549
550 /*
551 * Drop the mutex during operations that could block. During those
552 * times, the site struct could move (if we had to grow the sites
553 * array), but host wouldn't.
554 *
555 * Also, during those times we might receive an incoming connection from
556 * the site, which would change its state. So, check state each time we
557 * reacquire the mutex, and quit if the state of the world changed while
558 * we were away.
559 */
560 netaddr = site->net_addr;
561 RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connecting to %s",
562 __repmgr_format_site_loc(site, site_string)));
563 UNLOCK_MUTEX(db_rep->mutex);
564
565 if ((ret = __repmgr_connect(env, &netaddr, &conn, &err)) == 0) {
566 DB_EVENT(env, DB_EVENT_REP_CONNECT_ESTD, &th->args.eid);
567 LOCK_MUTEX(db_rep->mutex);
568 if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
569 __db_err(env, ret, DB_STR("3618",
570 "set_nonblock in connnect thread"));
571 goto cleanup;
572 }
573 conn->type = REP_CONNECTION;
574 site = SITE_FROM_EID(th->args.eid);
575 if (site->state != SITE_CONNECTING ||
576 db_rep->repmgr_status == stopped)
577 goto cleanup;
578
579 conn->eid = th->args.eid;
580 site = SITE_FROM_EID(th->args.eid);
581 site->ref.conn.out = conn;
582 site->state = SITE_CONNECTED;
583 __os_gettime(env, &site->last_rcvd_timestamp, 1);
584 ret = __repmgr_wake_main_thread(env);
585 } else if (ret == DB_REP_UNAVAIL) {
586 /* Retryable error while trying to connect: retry later. */
587 info.eid = th->args.eid;
588 info.error = err;
589 DB_EVENT(env, DB_EVENT_REP_CONNECT_TRY_FAILED, &info);
590 STAT(db_rep->region->mstat.st_connect_fail++);
591
592 LOCK_MUTEX(db_rep->mutex);
593 site = SITE_FROM_EID(th->args.eid);
594 if (site->state != SITE_CONNECTING ||
595 db_rep->repmgr_status == stopped) {
596 ret = 0;
597 goto unlock;
598 }
599 ret = __repmgr_schedule_connection_attempt(env,
600 th->args.eid, FALSE);
601 } else
602 goto out;
603
604 if (0) {
605 cleanup:
606 if ((t_ret = __repmgr_destroy_conn(env, conn)) != 0 &&
607 ret == 0)
608 ret = t_ret;
609 }
610
611 unlock:
612 UNLOCK_MUTEX(db_rep->mutex);
613 out:
614 return (ret);
615 }
616
617 /*
618 * PUBLIC: int __repmgr_send_v1_handshake __P((ENV *,
619 * PUBLIC: REPMGR_CONNECTION *, void *, size_t));
620 */
621 int
__repmgr_send_v1_handshake(env,conn,buf,len)622 __repmgr_send_v1_handshake(env, conn, buf, len)
623 ENV *env;
624 REPMGR_CONNECTION *conn;
625 void *buf;
626 size_t len;
627 {
628 DB_REP *db_rep;
629 REP *rep;
630 repmgr_netaddr_t *my_addr;
631 DB_REPMGR_V1_HANDSHAKE buffer;
632 DBT cntrl, rec;
633
634 db_rep = env->rep_handle;
635 rep = db_rep->region;
636 my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
637
638 /*
639 * We're about to send from a structure that has padding holes in it.
640 * Initializing it keeps Valgrind happy, plus we really shouldn't be
641 * sending out random garbage anyway (pro forma privacy issue).
642 */
643 memset(&buffer, 0, sizeof(buffer));
644 buffer.version = 1;
645 buffer.priority = htonl(rep->priority);
646 buffer.port = my_addr->port;
647 cntrl.data = &buffer;
648 cntrl.size = sizeof(buffer);
649
650 rec.data = buf;
651 rec.size = (u_int32_t)len;
652
653 /*
654 * It would of course be disastrous to block the select() thread, so
655 * pass the "maxblock" argument as 0. Fortunately blocking should
656 * never be necessary here, because the hand-shake is always the first
657 * thing we send. Which is a good thing, because it would be almost as
658 * disastrous if we allowed ourselves to drop a handshake.
659 */
660 return (__repmgr_send_one(env,
661 conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0));
662 }
663
/*
 * Reads whatever input is available on "conn".  When the current read
 * phase completes, it hands off to prepare_input() (after the header,
 * SIZES_PHASE) or dispatch_msgin() (after the body).
 *
 * Returns:
 * - 0 if the read would block;
 * - DB_REP_UNAVAIL on EOF or any unexpected read error, after firing a
 *   connection-error event and counting a connection drop;
 * - otherwise, the result of prepare_input()/dispatch_msgin().
 *
 * PUBLIC: int __repmgr_read_from_site __P((ENV *, REPMGR_CONNECTION *));
 *
 * !!!
 * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here.
 */
int
__repmgr_read_from_site(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	int ret;

	db_rep = env->rep_handle;

	/*
	 * Loop, just in case we get EINTR and need to restart the I/O.  (All
	 * other branches return.)
	 */
	for (;;) {
		switch ((ret = __repmgr_read_conn(conn))) {
#ifndef DB_WIN32
		case EINTR:
			continue;
#endif

#if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
		case DB_REPMGR_EAGAIN:
#endif
		case WOULDBLOCK:
			return (0);

		case DB_REP_UNAVAIL:
			/* Error 0 is understood to mean EOF. */
			__repmgr_fire_conn_err_event(env, conn, 0);
			STAT(env->rep_handle->
			    region->mstat.st_connection_drop++);
			return (DB_REP_UNAVAIL);

		case 0:
			/*
			 * A completed phase counts as traffic from the site;
			 * __repmgr_next_timeout() monitors the master from
			 * this timestamp.
			 */
			if (IS_VALID_EID(conn->eid)) {
				site = SITE_FROM_EID(conn->eid);
				__os_gettime(env,
				    &site->last_rcvd_timestamp, 1);
			}
			return (conn->reading_phase == SIZES_PHASE ?
			    prepare_input(env, conn) :
			    dispatch_msgin(env, conn));

		default:
#ifdef EBADF
			DB_ASSERT(env, ret != EBADF);
#endif
			__repmgr_fire_conn_err_event(env, conn, ret);
			STAT(db_rep->region->mstat.st_connection_drop++);
			return (DB_REP_UNAVAIL);
		}
	}
}
725
726 /*
727 * Reads in the current input phase, as defined by the connection's IOVECS
728 * struct.
729 *
730 * Returns DB_REP_UNAVAIL for EOF.
731 *
732 * Makes no assumption about synchronization: it's up to the caller to hold
733 * mutex if necessary.
734 *
735 * PUBLIC: int __repmgr_read_conn __P((REPMGR_CONNECTION *));
736 */
737 int
__repmgr_read_conn(conn)738 __repmgr_read_conn(conn)
739 REPMGR_CONNECTION *conn;
740 {
741 size_t nr;
742 int ret;
743
744 /*
745 * Keep reading pieces as long as we're making some progress, or until
746 * we complete the current read phase as defined in iovecs.
747 */
748 for (;;) {
749 if ((ret = __repmgr_readv(conn->fd,
750 &conn->iovecs.vectors[conn->iovecs.offset],
751 conn->iovecs.count - conn->iovecs.offset, &nr)) != 0)
752 return (ret);
753
754 if (nr == 0)
755 return (DB_REP_UNAVAIL);
756
757 if (__repmgr_update_consumed(&conn->iovecs, nr)) {
758 /* We've fully read as much as we wanted. */
759 return (0);
760 }
761 }
762 }
763
764 /*
765 * Having finished reading the 9-byte message header, figure out what kind of
766 * message we're about to receive, and prepare input buffers accordingly. The
767 * header includes enough information for us to figure out how much buffer space
768 * we need to allocate (though in some cases we need to do a bit of computation
769 * to arrive at the answer).
770 *
771 * Caller must hold mutex.
772 */
773 static int
prepare_input(env,conn)774 prepare_input(env, conn)
775 ENV *env;
776 REPMGR_CONNECTION *conn;
777 {
778 #define MEM_ALIGN sizeof(double)
779 DBT *dbt;
780 __repmgr_msg_hdr_args msg_hdr;
781 REPMGR_RESPONSE *resp;
782 u_int32_t control_size, rec_size, size;
783 size_t memsize, control_offset, rec_offset;
784 void *membase;
785 int ret, skip;
786
787 DB_ASSERT(env, conn->reading_phase == SIZES_PHASE);
788
789 /*
790 * We can only get here after having read the full 9 bytes that we
791 * expect, so this can't fail.
792 */
793 ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
794 conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
795 DB_ASSERT(env, ret == 0);
796
797 __repmgr_iovec_init(&conn->iovecs);
798 skip = FALSE;
799
800 switch ((conn->msg_type = msg_hdr.type)) {
801 case REPMGR_HEARTBEAT:
802 /*
803 * The underlying byte-receiving mechanism will already have
804 * noted the fact that we got some traffic on this connection,
805 * which is all that is needed to monitor the heartbeat. But
806 * we also put the heartbeat message on the message queue so
807 * that it will perform rerequest processing.
808 */
809 case REPMGR_REP_MESSAGE:
810 env->rep_handle->seen_repmsg = TRUE;
811 control_size = REP_MSG_CONTROL_SIZE(msg_hdr);
812 rec_size = REP_MSG_REC_SIZE(msg_hdr);
813 if (control_size == 0) {
814 if (conn->msg_type == REPMGR_HEARTBEAT) {
815 /*
816 * Got an old-style heartbeat without payload,
817 * nothing to do.
818 */
819 skip = TRUE;
820 break;
821 } else {
822 __db_errx(env, DB_STR("3619",
823 "illegal size for rep msg"));
824 return (DB_REP_UNAVAIL);
825 }
826 }
827 /*
828 * Allocate a block of memory large enough to hold a
829 * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT
830 * data areas that it points to. Start by calculating
831 * the total memory needed.
832 */
833 memsize = DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
834 control_offset = memsize;
835 memsize += control_size;
836 if (rec_size > 0) {
837 memsize = DB_ALIGN(memsize, MEM_ALIGN);
838 rec_offset = memsize;
839 memsize += rec_size;
840 } else
841 COMPQUIET(rec_offset, 0);
842 if ((ret = __os_malloc(env, memsize, &membase)) != 0)
843 return (ret);
844 conn->input.rep_message = membase;
845 conn->input.rep_message->msg_hdr = msg_hdr;
846 conn->input.rep_message->v.repmsg.originating_eid = conn->eid;
847
848 DB_INIT_DBT(conn->input.rep_message->v.repmsg.control,
849 (u_int8_t*)membase + control_offset, control_size);
850 __repmgr_add_dbt(&conn->iovecs,
851 &conn->input.rep_message->v.repmsg.control);
852
853 if (rec_size > 0) {
854 DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
855 (rec_size > 0 ?
856 (u_int8_t*)membase + rec_offset : NULL),
857 rec_size);
858 __repmgr_add_dbt(&conn->iovecs,
859 &conn->input.rep_message->v.repmsg.rec);
860 } else
861 DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
862 NULL, 0);
863 break;
864
865 case REPMGR_APP_MESSAGE:
866 /*
867 * We need a buffer big enough to hold the REPMGR_MESSAGE struct
868 * and the data that we expect to receive on the wire. We must
869 * extend the struct size for the variable-length DBT array at
870 * the end.
871 */
872 size = DB_ALIGN((size_t)(sizeof(REPMGR_MESSAGE) +
873 APP_MSG_SEGMENT_COUNT(msg_hdr) * sizeof(DBT)),
874 MEM_ALIGN);
875 memsize = size + APP_MSG_BUFFER_SIZE(msg_hdr);
876 if ((ret = __os_malloc(env, memsize, &membase)) != 0)
877 return (ret);
878 conn->input.rep_message = membase;
879 conn->input.rep_message->msg_hdr = msg_hdr;
880 conn->input.rep_message->v.appmsg.conn = conn;
881
882 DB_INIT_DBT(conn->input.rep_message->v.appmsg.buf,
883 (u_int8_t*)membase + size,
884 APP_MSG_BUFFER_SIZE(msg_hdr));
885 __repmgr_add_dbt(&conn->iovecs,
886 &conn->input.rep_message->v.appmsg.buf);
887 break;
888
889 case REPMGR_OWN_MSG:
890 size = sizeof(REPMGR_MESSAGE) + REPMGR_OWN_BUF_SIZE(msg_hdr);
891 if ((ret = __os_malloc(env, size, &membase)) != 0)
892 return (ret);
893 conn->input.rep_message = membase;
894 conn->input.rep_message->msg_hdr = msg_hdr;
895
896 /*
897 * Save "conn" pointer in case this turns out to be a one-shot
898 * request. If it isn't, it won't matter.
899 */
900 /*
901 * An OWN msg that arrives in PARAMETERS state has bypassed the
902 * final handshake, implying that this connection is to be used
903 * for a one-shot GMDB request.
904 */
905 if (REPMGR_OWN_BUF_SIZE(msg_hdr) == 0) {
906 __db_errx(env, DB_STR_A("3680",
907 "invalid own buf size %lu in prepare_input", "%lu"),
908 (u_long)REPMGR_OWN_BUF_SIZE(msg_hdr));
909 return (DB_REP_UNAVAIL);
910 }
911 DB_INIT_DBT(conn->input.rep_message->v.gmdb_msg.request,
912 (u_int8_t*)membase + sizeof(REPMGR_MESSAGE),
913 REPMGR_OWN_BUF_SIZE(msg_hdr));
914 __repmgr_add_dbt(&conn->iovecs,
915 &conn->input.rep_message->v.gmdb_msg.request);
916 break;
917
918 case REPMGR_APP_RESPONSE:
919 size = APP_RESP_BUFFER_SIZE(msg_hdr);
920 conn->cur_resp = APP_RESP_TAG(msg_hdr);
921 if (conn->cur_resp >= conn->aresp) {
922 __db_errx(env, DB_STR_A("3681",
923 "invalid cur resp %lu in prepare_input", "%lu"),
924 (u_long)conn->cur_resp);
925 return (DB_REP_UNAVAIL);
926 }
927 resp = &conn->responses[conn->cur_resp];
928 DB_ASSERT(env, F_ISSET(resp, RESP_IN_USE));
929
930 dbt = &resp->dbt;
931
932 /*
933 * Prepare to read message body into either the user-supplied
934 * buffer, or one we allocate here.
935 */
936 ret = 0;
937 if (!F_ISSET(resp, RESP_THREAD_WAITING)) {
938 /* Caller already timed out; allocate dummy buffer. */
939 if (size > 0) {
940 memset(dbt, 0, sizeof(*dbt));
941 ret = __os_malloc(env, size, &dbt->data);
942 F_SET(resp, RESP_DUMMY_BUF);
943 } else
944 F_CLR(resp, RESP_IN_USE);
945 } else if (F_ISSET(dbt, DB_DBT_MALLOC))
946 ret = __os_umalloc(env, size, &dbt->data);
947 else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
948 if (dbt->data == NULL || dbt->size < size)
949 ret = __os_urealloc(env, size, &dbt->data);
950 } else if (F_ISSET(dbt, DB_DBT_USERMEM)) {
951 /* Recipient should have checked size limit. */
952 DB_ASSERT(env, size <= dbt->ulen);
953 }
954 dbt->size = size;
955 if (ret != 0)
956 return (ret);
957
958 if (size > 0) {
959 __repmgr_add_dbt(&conn->iovecs, dbt);
960 F_SET(resp, RESP_READING);
961 } else {
962 skip = TRUE;
963 if (F_ISSET(resp, RESP_THREAD_WAITING)) {
964 F_SET(resp, RESP_COMPLETE);
965 if ((ret = __repmgr_wake_waiters(env,
966 &conn->response_waiters)) != 0)
967 return (ret);
968 }
969 }
970 break;
971
972 case REPMGR_RESP_ERROR:
973 DB_ASSERT(env, RESP_ERROR_TAG(msg_hdr) < conn->aresp &&
974 conn->responses != NULL);
975 resp = &conn->responses[RESP_ERROR_TAG(msg_hdr)];
976 DB_ASSERT(env, !F_ISSET(resp, RESP_READING));
977 if (F_ISSET(resp, RESP_THREAD_WAITING)) {
978 F_SET(resp, RESP_COMPLETE);
979
980 /*
981 * DB errors are always negative, but we only send
982 * unsigned values on the wire.
983 */
984 resp->ret = -((int)RESP_ERROR_CODE(msg_hdr));
985 if ((ret = __repmgr_wake_waiters(env,
986 &conn->response_waiters)) != 0)
987 return (ret);
988 } else
989 F_CLR(resp, RESP_IN_USE);
990 skip = TRUE;
991 break;
992
993 case REPMGR_HANDSHAKE:
994 case REPMGR_PERMLSN:
995 if ((ret = __repmgr_prepare_simple_input(env,
996 conn, &msg_hdr)) != 0)
997 return (ret);
998 break;
999
1000 default:
1001 __db_errx(env, DB_STR_A("3676",
1002 "unexpected msg type %lu in prepare_input", "%lu"),
1003 (u_long)conn->msg_type);
1004 return (DB_REP_UNAVAIL);
1005 }
1006
1007 if (skip) {
1008 /*
1009 * We can skip the DATA_PHASE, because the current message type
1010 * only has a header, no following data.
1011 */
1012 __repmgr_reset_for_reading(conn);
1013 } else
1014 conn->reading_phase = DATA_PHASE;
1015
1016 return (0);
1017 }
1018
1019 /*
1020 * PUBLIC: int __repmgr_prepare_simple_input __P((ENV *,
1021 * PUBLIC: REPMGR_CONNECTION *, __repmgr_msg_hdr_args *));
1022 */
1023 int
__repmgr_prepare_simple_input(env,conn,msg_hdr)1024 __repmgr_prepare_simple_input(env, conn, msg_hdr)
1025 ENV *env;
1026 REPMGR_CONNECTION *conn;
1027 __repmgr_msg_hdr_args *msg_hdr;
1028 {
1029 DBT *dbt;
1030 u_int32_t control_size, rec_size;
1031 int ret;
1032
1033 control_size = REP_MSG_CONTROL_SIZE(*msg_hdr);
1034 rec_size = REP_MSG_REC_SIZE(*msg_hdr);
1035
1036 dbt = &conn->input.repmgr_msg.cntrl;
1037 if ((dbt->size = control_size) > 0) {
1038 if ((ret = __os_malloc(env,
1039 dbt->size, &dbt->data)) != 0)
1040 return (ret);
1041 __repmgr_add_dbt(&conn->iovecs, dbt);
1042 }
1043
1044 dbt = &conn->input.repmgr_msg.rec;
1045 if ((dbt->size = rec_size) > 0) {
1046 if ((ret = __os_malloc(env,
1047 dbt->size, &dbt->data)) != 0) {
1048 dbt = &conn->input.repmgr_msg.cntrl;
1049 if (dbt->size > 0)
1050 __os_free(env, dbt->data);
1051 return (ret);
1052 }
1053 __repmgr_add_dbt(&conn->iovecs, dbt);
1054 }
1055 return (0);
1056 }
1057
1058 /*
1059 * Processes an incoming message, depending on our current state.
1060 *
1061 * Caller must hold mutex.
1062 */
1063 static int
dispatch_msgin(env,conn)1064 dispatch_msgin(env, conn)
1065 ENV *env;
1066 REPMGR_CONNECTION *conn;
1067 {
1068 DB_REP *db_rep;
1069 REPMGR_SITE *site;
1070 REPMGR_RUNNABLE *th;
1071 REPMGR_RESPONSE *resp;
1072 DBT *dbt;
1073 char *hostname;
1074 int eid, ret;
1075
1076 DB_ASSERT(env, conn->reading_phase == DATA_PHASE);
1077 db_rep = env->rep_handle;
1078
1079 switch (conn->state) {
1080 case CONN_CONNECTED:
1081 /*
1082 * In this state, we know we're working with an outgoing
1083 * connection. We've sent a version proposal, and now expect
1084 * the response (which could be a dumb old V1 handshake).
1085 */
1086 ONLY_HANDSHAKE(env, conn);
1087
1088 /*
1089 * Here is a good opportunity to clean up this site's connector
1090 * thread, because we generally come through here after making
1091 * an outgoing connection, yet we're out of the main loop, so we
1092 * don't hit this often.
1093 */
1094 eid = conn->eid;
1095 DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
1096 site = SITE_FROM_EID(eid);
1097 th = site->connector;
1098 if (th != NULL && th->finished) {
1099 if ((ret = __repmgr_thread_join(th)) != 0)
1100 return (ret);
1101 __os_free(env, th);
1102 site->connector = NULL;
1103 }
1104
1105 if ((ret = read_version_response(env, conn)) != 0)
1106 return (ret);
1107 break;
1108
1109 case CONN_NEGOTIATE:
1110 /*
1111 * Since we're in this state, we know we're working with an
1112 * incoming connection, and this is the first message we've
1113 * received. So it must be a version negotiation proposal (or a
1114 * legacy V1 handshake). (We'll verify this of course.)
1115 */
1116 ONLY_HANDSHAKE(env, conn);
1117 if ((ret = send_version_response(env, conn)) != 0)
1118 return (ret);
1119 break;
1120
1121 case CONN_PARAMETERS:
1122 /*
1123 * We've previously agreed on a (>1) version, so we expect
1124 * either the other side's parameters handshake, or possibly a
1125 * GMDB request on a one-shot, dedicated connection.
1126 */
1127 switch (conn->msg_type) {
1128 case REPMGR_HANDSHAKE:
1129 dbt = &conn->input.repmgr_msg.rec;
1130 hostname = dbt->data;
1131 hostname[dbt->size-1] = '\0';
1132 if ((ret = accept_handshake(env, conn, hostname)) != 0)
1133 return (ret);
1134 conn->state = CONN_READY;
1135 break;
1136 case REPMGR_OWN_MSG:
1137 /*
1138 * GM change requests arrive in their own dedicated
1139 * connections, and when they're served the entire
1140 * connection isn't needed any more. So the message
1141 * processing thread will do the entire job of serving
1142 * the request and finishing off the connection; so we
1143 * don't have to read it any more. Note that normally
1144 * whenever we remove a connection from our list we
1145 * decrement the reference count; but we also increment
1146 * it whenever we pass a reference over to the message
1147 * processing threads' queue. So in this case it's a
1148 * wash.
1149 */
1150 conn->input.rep_message->v.gmdb_msg.conn = conn;
1151 TAILQ_REMOVE(&db_rep->connections, conn, entries);
1152 if ((ret = __repmgr_queue_put(env,
1153 conn->input.rep_message)) != 0)
1154 return (ret);
1155 break;
1156
1157 default:
1158 __db_errx(env, DB_STR_A("3620",
1159 "unexpected msg type %d in PARAMETERS state", "%d"),
1160 (int)conn->msg_type);
1161 return (DB_REP_UNAVAIL);
1162 }
1163
1164 break;
1165
1166 case CONN_READY:
1167 case CONN_CONGESTED:
1168 /*
1169 * We have a complete message, so process it. Acks and
1170 * handshakes get processed here, in line. Regular rep messages
1171 * get posted to a queue, to be handled by a thread from the
1172 * message thread pool.
1173 */
1174 switch (conn->msg_type) {
1175 case REPMGR_PERMLSN:
1176 if ((ret = record_permlsn(env, conn)) != 0)
1177 return (ret);
1178 break;
1179
1180 case REPMGR_HEARTBEAT:
1181 case REPMGR_APP_MESSAGE:
1182 case REPMGR_REP_MESSAGE:
1183 if ((ret = __repmgr_queue_put(env,
1184 conn->input.rep_message)) != 0)
1185 return (ret);
1186 /*
1187 * The queue has taken over responsibility for the
1188 * rep_message buffer, and will free it later.
1189 */
1190 if (conn->msg_type == REPMGR_APP_MESSAGE)
1191 conn->ref_count++;
1192 break;
1193
1194 case REPMGR_OWN_MSG:
1195 /*
1196 * Since we're in one of the "ready" states we know this
1197 * isn't a one-shot request, so we are not giving
1198 * ownership of this connection over to the message
1199 * thread queue; we're going to keep reading on it
1200 * ourselves. The message thread that processes this
1201 * request has no need for a connection anyway, since
1202 * there is no response that needs to be returned.
1203 */
1204 conn->input.rep_message->v.gmdb_msg.conn = NULL;
1205 if ((ret = process_own_msg(env, conn)) != 0)
1206 return (ret);
1207 break;
1208
1209 case REPMGR_APP_RESPONSE:
1210 DB_ASSERT(env, conn->cur_resp < conn->aresp &&
1211 conn->responses != NULL);
1212 resp = &conn->responses[conn->cur_resp];
1213 DB_ASSERT(env, F_ISSET(resp, RESP_READING));
1214 F_CLR(resp, RESP_READING);
1215 if (F_ISSET(resp, RESP_THREAD_WAITING)) {
1216 F_SET(resp, RESP_COMPLETE);
1217 if ((ret = __repmgr_wake_waiters(env,
1218 &conn->response_waiters)) != 0)
1219 return (ret);
1220 } else {
1221 /*
1222 * If the calling thread is no longer with us,
1223 * yet we're reading, it can only mean we're
1224 * reading into a dummy buffer, so free it now.
1225 */
1226 DB_ASSERT(env, F_ISSET(resp, RESP_DUMMY_BUF));
1227 __os_free(env, resp->dbt.data);
1228 F_CLR(resp, RESP_IN_USE);
1229 }
1230 break;
1231
1232 case REPMGR_RESP_ERROR:
1233 default:
1234 __db_errx(env, DB_STR_A("3621",
1235 "unexpected msg type rcvd in ready state: %d",
1236 "%d"), (int)conn->msg_type);
1237 return (DB_REP_UNAVAIL);
1238 }
1239 break;
1240
1241 case CONN_DEFUNCT:
1242 break;
1243
1244 default:
1245 DB_ASSERT(env, FALSE);
1246 }
1247
1248 switch (conn->msg_type) {
1249 case REPMGR_HANDSHAKE:
1250 case REPMGR_PERMLSN:
1251 dbt = &conn->input.repmgr_msg.cntrl;
1252 if (dbt->size > 0)
1253 __os_free(env, dbt->data);
1254 dbt = &conn->input.repmgr_msg.rec;
1255 if (dbt->size > 0)
1256 __os_free(env, dbt->data);
1257 break;
1258 default:
1259 /*
1260 * Some messages in REPMGR_OWN_MSG format are also handled
1261 */
1262 break;
1263 }
1264 __repmgr_reset_for_reading(conn);
1265 return (0);
1266 }
1267
1268 /*
1269 * Process one of repmgr's "own" message types, and one that occurs on a regular
1270 * (not one-shot) connection.
1271 */
1272 static int
process_own_msg(env,conn)1273 process_own_msg(env, conn)
1274 ENV *env;
1275 REPMGR_CONNECTION *conn;
1276 {
1277 DB_REP *db_rep;
1278 DBT *dbt;
1279 REPMGR_SITE *site;
1280 REPMGR_MESSAGE *msg;
1281 __repmgr_connect_reject_args reject;
1282 __repmgr_parm_refresh_args parms;
1283 int ret;
1284
1285 ret = 0;
1286 /*
1287 * Set "msg" to point to the message struct. If we do all necessary
1288 * processing here now, leave it set so that it can be freed. On the
1289 * other hand, if we pass it off to the message queue for later
1290 * processing by a message thread, we want to avoid freeing the memory
1291 * here, so clear the pointer in such a case.
1292 */
1293 switch (REPMGR_OWN_MSG_TYPE((msg = conn->input.rep_message)->msg_hdr)) {
1294 case REPMGR_CONNECT_REJECT:
1295 dbt = &msg->v.gmdb_msg.request;
1296 if ((ret = __repmgr_connect_reject_unmarshal(env,
1297 &reject, dbt->data, dbt->size, NULL)) != 0)
1298 return (DB_REP_UNAVAIL);
1299
1300 /*
1301 * If we're being rejected by someone who has more up-to-date
1302 * membership information than we do, it means we have been
1303 * removed from the group. If we've just gotten started, we can
1304 * make one attempt at automatically rejoining; otherwise we bow
1305 * out gracefully.
1306 */
1307 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1308 "got rejection msg citing version %lu/%lu",
1309 (u_long)reject.gen, (u_long)reject.version));
1310
1311 if (__repmgr_gmdb_version_cmp(env,
1312 reject.gen, reject.version) > 0) {
1313 if (env->rep_handle->seen_repmsg)
1314 ret = DB_DELETED;
1315 else if ((ret = __repmgr_defer_op(env,
1316 REPMGR_REJOIN)) == 0)
1317 ret = DB_REP_UNAVAIL;
1318 } else
1319 ret = DB_REP_UNAVAIL;
1320 DB_ASSERT(env, ret != 0);
1321 return (ret);
1322
1323 case REPMGR_SHARING:
1324 if ((ret = __repmgr_queue_put(env, msg)) != 0)
1325 return (ret);
1326 /* Show that we no longer own this memory. */
1327 msg = NULL;
1328 break;
1329
1330 case REPMGR_PARM_REFRESH:
1331 dbt = &conn->input.rep_message->v.gmdb_msg.request;
1332 if ((ret = __repmgr_parm_refresh_unmarshal(env,
1333 &parms, dbt->data, dbt->size, NULL)) != 0)
1334 return (DB_REP_UNAVAIL);
1335 db_rep = env->rep_handle;
1336 DB_ASSERT(env, conn->type == REP_CONNECTION &&
1337 IS_KNOWN_REMOTE_SITE(conn->eid));
1338 site = SITE_FROM_EID(conn->eid);
1339 site->ack_policy = (int)parms.ack_policy;
1340 if (F_ISSET(&parms, ELECTABLE_SITE))
1341 F_SET(site, SITE_ELECTABLE);
1342 else
1343 F_CLR(site, SITE_ELECTABLE);
1344 F_SET(site, SITE_HAS_PRIO);
1345 break;
1346
1347 case REPMGR_GM_FAILURE:
1348 case REPMGR_GM_FORWARD:
1349 case REPMGR_JOIN_REQUEST:
1350 case REPMGR_JOIN_SUCCESS:
1351 case REPMGR_REMOVE_REQUEST:
1352 case REPMGR_RESOLVE_LIMBO:
1353 default:
1354 __db_errx(env, DB_STR_A("3677",
1355 "unexpected msg type %lu in process_own_msg", "%lu"),
1356 (u_long)REPMGR_OWN_MSG_TYPE(msg->msg_hdr));
1357 return (DB_REP_UNAVAIL);
1358 }
1359 /*
1360 * If we haven't given ownership of the msg buffer to another thread,
1361 * free it now.
1362 */
1363 if (msg != NULL)
1364 __os_free(env, msg);
1365 return (ret);
1366 }
1367
1368 /*
1369 * Examine and verify the incoming version proposal message, and send an
1370 * appropriate response.
1371 */
1372 static int
send_version_response(env,conn)1373 send_version_response(env, conn)
1374 ENV *env;
1375 REPMGR_CONNECTION *conn;
1376 {
1377 DB_REP *db_rep;
1378 __repmgr_version_proposal_args versions;
1379 __repmgr_version_confirmation_args conf;
1380 repmgr_netaddr_t *my_addr;
1381 char *hostname;
1382 u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1];
1383 DBT vi;
1384 int ret;
1385
1386 db_rep = env->rep_handle;
1387 my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
1388
1389 if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
1390 return (ret);
1391 if (vi.size == 0) {
1392 /* No version info, so we must be talking to a v1 site. */
1393 hostname = conn->input.repmgr_msg.rec.data;
1394 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1395 return (ret);
1396 if ((ret = __repmgr_send_v1_handshake(env,
1397 conn, my_addr->host, strlen(my_addr->host) + 1)) != 0)
1398 return (ret);
1399 conn->state = CONN_READY;
1400 } else {
1401 if ((ret = __repmgr_version_proposal_unmarshal(env,
1402 &versions, vi.data, vi.size, NULL)) != 0)
1403 return (DB_REP_UNAVAIL);
1404
1405 if (DB_REPMGR_VERSION >= versions.min &&
1406 DB_REPMGR_VERSION <= versions.max)
1407 conf.version = DB_REPMGR_VERSION;
1408 else if (versions.max >= DB_REPMGR_MIN_VERSION &&
1409 versions.max <= DB_REPMGR_VERSION)
1410 conf.version = versions.max;
1411 else {
1412 /*
1413 * User must have wired up a combination of versions
1414 * exceeding what we said we'd support.
1415 */
1416 __db_errx(env, DB_STR_A("3622",
1417 "No available version between %lu and %lu",
1418 "%lu %lu"), (u_long)versions.min,
1419 (u_long)versions.max);
1420 return (DB_REP_UNAVAIL);
1421 }
1422 conn->version = conf.version;
1423
1424 __repmgr_version_confirmation_marshal(env, &conf, buf);
1425 buf[__REPMGR_VERSION_CONFIRMATION_SIZE] = '\0';
1426 DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
1427 if ((ret = __repmgr_send_handshake(env,
1428 conn, buf, sizeof(buf), 0)) != 0)
1429 return (ret);
1430
1431 conn->state = CONN_PARAMETERS;
1432 }
1433 return (ret);
1434 }
1435
1436 /*
1437 * Sends a version-aware handshake to the remote site, only after we've verified
1438 * that it is indeed version-aware. We can send either v2 or v3 handshake,
1439 * depending on the connection's version.
1440 *
1441 * PUBLIC: int __repmgr_send_handshake __P((ENV *,
1442 * PUBLIC: REPMGR_CONNECTION *, void *, size_t, u_int32_t));
1443 */
1444 int
__repmgr_send_handshake(env,conn,opt,optlen,flags)1445 __repmgr_send_handshake(env, conn, opt, optlen, flags)
1446 ENV *env;
1447 REPMGR_CONNECTION *conn;
1448 void *opt;
1449 size_t optlen;
1450 u_int32_t flags;
1451 {
1452 DB_REP *db_rep;
1453 REP *rep;
1454 DBT cntrl, rec;
1455 __repmgr_handshake_args hs;
1456 __repmgr_v2handshake_args v2hs;
1457 __repmgr_v3handshake_args v3hs;
1458 repmgr_netaddr_t *my_addr;
1459 size_t hostname_len, rec_len;
1460 void *buf;
1461 u_int8_t *p;
1462 u_int32_t cntrl_len;
1463 int ret;
1464
1465 db_rep = env->rep_handle;
1466 rep = db_rep->region;
1467 my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
1468
1469 /*
1470 * The cntrl part has various parameters (varies by version). The rec
1471 * part has the host name, followed by whatever optional extra data was
1472 * passed to us.
1473 *
1474 * Version awareness was introduced with protocol version 2 (so version
1475 * 1 is handled elsewhere).
1476 */
1477 switch (conn->version) {
1478 case 2:
1479 cntrl_len = __REPMGR_V2HANDSHAKE_SIZE;
1480 break;
1481 case 3:
1482 cntrl_len = __REPMGR_V3HANDSHAKE_SIZE;
1483 break;
1484 case 4:
1485 cntrl_len = __REPMGR_HANDSHAKE_SIZE;
1486 break;
1487 default:
1488 __db_errx(env, DB_STR_A("3678",
1489 "unexpected conn version %lu in send_handshake", "%lu"),
1490 (u_long)conn->version);
1491 return (DB_REP_UNAVAIL);
1492 }
1493 hostname_len = strlen(my_addr->host);
1494 rec_len = hostname_len + 1 +
1495 (opt == NULL ? 0 : optlen);
1496
1497 if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0)
1498 return (ret);
1499
1500 cntrl.data = p = buf;
1501 switch (conn->version) {
1502 case 2:
1503 /* Not allowed to use multi-process feature in v2 group. */
1504 DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
1505 v2hs.port = my_addr->port;
1506 v2hs.priority = rep->priority;
1507 __repmgr_v2handshake_marshal(env, &v2hs, p);
1508 break;
1509 case 3:
1510 v3hs.port = my_addr->port;
1511 v3hs.priority = rep->priority;
1512 v3hs.flags = flags;
1513 __repmgr_v3handshake_marshal(env, &v3hs, p);
1514 break;
1515 case 4:
1516 hs.port = my_addr->port;
1517 hs.alignment = MEM_ALIGN;
1518 hs.ack_policy = (u_int32_t)rep->perm_policy;
1519 hs.flags = flags;
1520 if (rep->priority > 0)
1521 F_SET(&hs, ELECTABLE_SITE);
1522 __repmgr_handshake_marshal(env, &hs, p);
1523 break;
1524 default:
1525 DB_ASSERT(env, FALSE);
1526 break;
1527 }
1528 cntrl.size = cntrl_len;
1529
1530 p = rec.data = &p[cntrl_len];
1531 (void)strcpy((char*)p, my_addr->host);
1532 p += hostname_len + 1;
1533 if (opt != NULL) {
1534 memcpy(p, opt, optlen);
1535 p += optlen;
1536 }
1537 rec.size = (u_int32_t)(p - (u_int8_t*)rec.data);
1538
1539 /* Never block on select thread: pass maxblock as 0. */
1540 ret = __repmgr_send_one(env,
1541 conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0);
1542 __os_free(env, buf);
1543 return (ret);
1544 }
1545
1546 static int
read_version_response(env,conn)1547 read_version_response(env, conn)
1548 ENV *env;
1549 REPMGR_CONNECTION *conn;
1550 {
1551 DB_REP *db_rep;
1552 __repmgr_version_confirmation_args conf;
1553 DBT vi;
1554 char *hostname;
1555 u_int32_t flags;
1556 int ret;
1557
1558 db_rep = env->rep_handle;
1559
1560 if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
1561 return (ret);
1562 hostname = conn->input.repmgr_msg.rec.data;
1563 if (vi.size == 0) {
1564 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1565 return (ret);
1566 } else {
1567 if ((ret = __repmgr_version_confirmation_unmarshal(env,
1568 &conf, vi.data, vi.size, NULL)) != 0)
1569 return (DB_REP_UNAVAIL);
1570 if (conf.version >= DB_REPMGR_MIN_VERSION &&
1571 conf.version <= DB_REPMGR_VERSION)
1572 conn->version = conf.version;
1573 else {
1574 /*
1575 * Remote site "confirmed" a version outside of the
1576 * range we proposed. It should never do that.
1577 */
1578 __db_errx(env, DB_STR_A("3623",
1579 "Can't support confirmed version %lu", "%lu"),
1580 (u_long)conf.version);
1581 return (DB_REP_UNAVAIL);
1582 }
1583
1584 if ((ret = accept_handshake(env, conn, hostname)) != 0)
1585 return (ret);
1586 flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0;
1587 if ((ret = __repmgr_send_handshake(env,
1588 conn, NULL, 0, flags)) != 0)
1589 return (ret);
1590 }
1591 conn->state = CONN_READY;
1592 return (ret);
1593 }
1594
1595 /*
1596 * Examine the rec part of a handshake message to see if it has any version
1597 * information in it. This is the magic that lets us allows version-aware sites
1598 * to exchange information, and yet avoids tripping up v1 sites, which don't
1599 * know how to look for it.
1600 *
1601 * PUBLIC: int __repmgr_find_version_info __P((ENV *,
1602 * PUBLIC: REPMGR_CONNECTION *, DBT *));
1603 */
1604 int
__repmgr_find_version_info(env,conn,vi)1605 __repmgr_find_version_info(env, conn, vi)
1606 ENV *env;
1607 REPMGR_CONNECTION *conn;
1608 DBT *vi;
1609 {
1610 DBT *dbt;
1611 char *hostname;
1612 u_int32_t hostname_len;
1613
1614 dbt = &conn->input.repmgr_msg.rec;
1615 if (dbt->size == 0) {
1616 __db_errx(env, DB_STR("3624",
1617 "handshake is missing rec part"));
1618 return (DB_REP_UNAVAIL);
1619 }
1620 hostname = dbt->data;
1621 hostname[dbt->size-1] = '\0';
1622 hostname_len = (u_int32_t)strlen(hostname);
1623 if (hostname_len + 1 == dbt->size) {
1624 /*
1625 * The rec DBT held only the host name. This is a simple legacy
1626 * V1 handshake; it contains no version information.
1627 */
1628 vi->size = 0;
1629 } else {
1630 /*
1631 * There's more data than just the host name. The remainder is
1632 * available to be treated as a normal byte buffer (and read in
1633 * by one of the unmarshal functions). Note that the remaining
1634 * length should not include the padding byte that we have
1635 * already clobbered.
1636 */
1637 vi->data = &((u_int8_t *)dbt->data)[hostname_len + 1];
1638 vi->size = (dbt->size - (hostname_len+1)) - 1;
1639 }
1640 return (0);
1641 }
1642
1643 static int
accept_handshake(env,conn,hostname)1644 accept_handshake(env, conn, hostname)
1645 ENV *env;
1646 REPMGR_CONNECTION *conn;
1647 char *hostname;
1648 {
1649 __repmgr_handshake_args hs;
1650 __repmgr_v2handshake_args hs2;
1651 __repmgr_v3handshake_args hs3;
1652 u_int port;
1653 u_int32_t ack, flags;
1654 int electable;
1655
1656 switch (conn->version) {
1657 case 2:
1658 if (__repmgr_v2handshake_unmarshal(env, &hs2,
1659 conn->input.repmgr_msg.cntrl.data,
1660 conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1661 return (DB_REP_UNAVAIL);
1662 port = hs2.port;
1663 electable = hs2.priority > 0;
1664 ack = flags = 0;
1665 break;
1666 case 3:
1667 if (__repmgr_v3handshake_unmarshal(env, &hs3,
1668 conn->input.repmgr_msg.cntrl.data,
1669 conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1670 return (DB_REP_UNAVAIL);
1671 port = hs3.port;
1672 electable = hs3.priority > 0;
1673 flags = hs3.flags;
1674 ack = 0;
1675 break;
1676 case 4:
1677 if (__repmgr_handshake_unmarshal(env, &hs,
1678 conn->input.repmgr_msg.cntrl.data,
1679 conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1680 return (DB_REP_UNAVAIL);
1681 port = hs.port;
1682 electable = F_ISSET(&hs, ELECTABLE_SITE);
1683 flags = hs.flags;
1684 ack = hs.ack_policy;
1685 break;
1686 default:
1687 __db_errx(env, DB_STR_A("3679",
1688 "unexpected conn version %lu in accept_handshake", "%lu"),
1689 (u_long)conn->version);
1690 return (DB_REP_UNAVAIL);
1691 }
1692
1693 return (process_parameters(env,
1694 conn, hostname, port, ack, electable, flags));
1695 }
1696
1697 static int
accept_v1_handshake(env,conn,hostname)1698 accept_v1_handshake(env, conn, hostname)
1699 ENV *env;
1700 REPMGR_CONNECTION *conn;
1701 char *hostname;
1702 {
1703 DB_REPMGR_V1_HANDSHAKE *handshake;
1704 u_int32_t prio;
1705 int electable;
1706
1707 handshake = conn->input.repmgr_msg.cntrl.data;
1708 if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) ||
1709 handshake->version != 1) {
1710 __db_errx(env, DB_STR("3625", "malformed V1 handshake"));
1711 return (DB_REP_UNAVAIL);
1712 }
1713
1714 conn->version = 1;
1715 prio = ntohl(handshake->priority);
1716 electable = prio > 0;
1717 return (process_parameters(env,
1718 conn, hostname, handshake->port, 0, electable, 0));
1719 }
1720
/*
 * Apply the parameters received in a remote site's handshake to the
 * connection and to the corresponding site record.
 *
 * host/port: the remote site's address as it reported it.
 * ack:       its ack policy (callers pass 0 for protocol versions that
 *            don't carry one).
 * electable: non-zero if the remote site reported itself electable.
 * flags:     handshake flags (APP_CHANNEL_CONNECTION, REPMGR_SUBORDINATE).
 *
 * For an incoming connection this classifies it (APP_CONNECTION vs.
 * REP_CONNECTION), moves a REP_CONNECTION off db_rep->connections onto the
 * matching site, and rejects (with a REPMGR_CONNECT_REJECT message) sites
 * that are unknown or whose membership is not SITE_PRESENT.  May wake the
 * election thread when no master is known.
 *
 * Returns 0; DB_REP_UNAVAIL if the connection was rejected; or an error
 * from a called function.
 *
 * Caller must hold mutex.
 */
static int
process_parameters(env, conn, host, port, ack, electable, flags)
	ENV *env;
	REPMGR_CONNECTION *conn;
	char *host;
	u_int port;
	int electable;
	u_int32_t ack, flags;
{
	DB_REP *db_rep;
	REPMGR_RETRY *retry;
	REPMGR_SITE *site;
	__repmgr_connect_reject_args reject;
	u_int8_t reject_buf[__REPMGR_CONNECT_REJECT_SIZE];
	int eid, ret;

	db_rep = env->rep_handle;

	/* Connection state can be used to discern incoming versus outgoing. */
	if (conn->state == CONN_CONNECTED) {
		/*
		 * Since we initiated this as an outgoing connection, we
		 * obviously already know the host, port and site. We just need
		 * the other site's electability flag (which we'll grab below,
		 * after the big "else" clause).
		 */
		DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
		site = SITE_FROM_EID(conn->eid);
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "handshake from connection to %s:%lu EID %u",
		    site->net_addr.host,
		    (u_long)site->net_addr.port, conn->eid));
	} else {
		DB_ASSERT(env, conn->state == CONN_NEGOTIATE ||
		    conn->state == CONN_PARAMETERS);
		/*
		 * Incoming connection: until now we haven't known what kind of
		 * connection we're dealing with (and in the case of a
		 * REP_CONNECTION, what its EID is); so it must be on the
		 * "orphans" list. But now that we've received the parameters
		 * we'll be able to figure all that out.
		 */
		if (LF_ISSET(APP_CHANNEL_CONNECTION)) {
			/*
			 * Application channel connections are not tied to a
			 * site here; nothing else to record.
			 */
			conn->type = APP_CONNECTION;
			return (0);
		} else
			conn->type = REP_CONNECTION;

		/*
		 * Now that we've been given the host and port, use them to find
		 * the site.
		 */
		if ((site = __repmgr_lookup_site(env, host, port)) != NULL &&
		    site->membership == SITE_PRESENT) {
			/*
			 * Leaving the orphans list drops the reference that
			 * list held on the connection.
			 */
			TAILQ_REMOVE(&db_rep->connections, conn, entries);
			conn->ref_count--;

			eid = EID_FROM_SITE(site);
			if (LF_ISSET(REPMGR_SUBORDINATE)) {
				/*
				 * Accept it, as a supplementary source of
				 * input, but nothing else.
				 */
				TAILQ_INSERT_TAIL(&site->sub_conns,
				    conn, entries);
				conn->eid = eid;
			} else {
				DB_EVENT(env,
				    DB_EVENT_REP_CONNECT_ESTD, &eid);
				switch (site->state) {
				case SITE_PAUSING:
					/*
					 * A retry was pending for this site;
					 * it is no longer needed, so unlink
					 * and free it.
					 */
					RPRINT(env, (env, DB_VERB_REPMGR_MISC,
				  "handshake from paused site %s:%u EID %u",
					    host, port, eid));
					retry = site->ref.retry;
					TAILQ_REMOVE(&db_rep->retries,
					    retry, entries);
					__os_free(env, retry);
					break;
				case SITE_CONNECTED:
					/*
					 * We got an incoming connection for a
					 * site we were already connected to; at
					 * least we thought we were.
					 */
					RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		     "connection from %s:%u EID %u while already connected",
					    host, port, eid));
					if ((ret = resolve_collision(env,
					    site, conn)) != 0)
						return (ret);
					break;
				case SITE_CONNECTING:
					RPRINT(env, (env, DB_VERB_REPMGR_MISC,
				  "handshake from connecting site %s:%u EID %u",
					    host, port, eid));
					/*
					 * Connector thread will give up when it
					 * sees this site's state change, so we
					 * don't have to do anything else here.
					 */
					break;
				default:
					DB_ASSERT(env, FALSE);
				}
				/*
				 * This connection now becomes the site's
				 * incoming connection; record when we last
				 * heard from the site.
				 */
				conn->eid = eid;
				site->state = SITE_CONNECTED;
				site->ref.conn.in = conn;
				__os_gettime(env,
				    &site->last_rcvd_timestamp, 1);
			}
		} else {
			/*
			 * Unknown site, or one whose membership is not
			 * SITE_PRESENT: tell it our membership version so it
			 * can decide whether it has been removed from the
			 * group (see process_own_msg's CONNECT_REJECT case).
			 */
			RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		   "rejecting connection from unknown or provisional site %s:%u",
			    host, port));
			reject.version = db_rep->membership_version;
			reject.gen = db_rep->member_version_gen;
			__repmgr_connect_reject_marshal(env,
			    &reject, reject_buf);

			if ((ret = __repmgr_send_own_msg(env, conn,
			    REPMGR_CONNECT_REJECT, reject_buf,
			    __REPMGR_CONNECT_REJECT_SIZE)) != 0)
				return (ret);

			/*
			 * Since we haven't set conn->eid, bust_connection will
			 * not schedule a retry for this "failure", which is
			 * exactly what we want.
			 */
			return (DB_REP_UNAVAIL);
		}
	}

	/* Record the remote site's electability and ack policy. */
	if (electable)
		F_SET(site, SITE_ELECTABLE);
	else
		F_CLR(site, SITE_ELECTABLE);
	F_SET(site, SITE_HAS_PRIO);
	site->ack_policy = (int)ack;

	/*
	 * If we're moping around wishing we knew who the master was, then
	 * getting in touch with another site might finally provide sufficient
	 * connectivity to find out.
	 */
	if (!IS_SUBORDINATE(db_rep) && /* us */
	    !__repmgr_master_is_known(env) &&
	    !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "handshake with no known master to wake election thread"));
		db_rep->new_connection = TRUE;
		if ((ret = __repmgr_signal(&db_rep->check_election)) != 0)
			return (ret);
	}

	return (0);
}
1880
1881 static int
resolve_collision(env,site,conn)1882 resolve_collision(env, site, conn)
1883 ENV *env;
1884 REPMGR_SITE *site;
1885 REPMGR_CONNECTION *conn;
1886 {
1887 int ret;
1888
1889 /*
1890 * No need for site-oriented recovery, since we now have a replacement
1891 * connection; so skip bust_connection() and call disable_conn()
1892 * directly.
1893 *
1894 * If we already had an incoming connection, this new one always
1895 * replaces it. Whether it also/alternatively replaces an outgoing
1896 * connection depends on whether we're client or server (so as to avoid
1897 * connection collisions resulting in no remaining connections). (If
1898 * it's an older version that doesn't know about our collision
1899 * resolution protocol, it will behave like a client.)
1900 */
1901 if (site->ref.conn.in != NULL) {
1902 ret = __repmgr_disable_connection(env, site->ref.conn.in);
1903 site->ref.conn.in = NULL;
1904 if (ret != 0)
1905 return (ret);
1906 }
1907 if (site->ref.conn.out != NULL &&
1908 conn->version >= CONN_COLLISION_VERSION &&
1909 __repmgr_is_server(env, site)) {
1910 ret = __repmgr_disable_connection(env, site->ref.conn.out);
1911 site->ref.conn.out = NULL;
1912 if (ret != 0)
1913 return (ret);
1914 }
1915 return (0);
1916 }
1917
1918 static int
record_permlsn(env,conn)1919 record_permlsn(env, conn)
1920 ENV *env;
1921 REPMGR_CONNECTION *conn;
1922 {
1923 DB_REP *db_rep;
1924 REPMGR_SITE *site;
1925 __repmgr_permlsn_args *ackp, ack;
1926 SITE_STRING_BUFFER location;
1927 u_int32_t gen;
1928 int ret;
1929 u_int do_log_check;
1930
1931 db_rep = env->rep_handle;
1932 do_log_check = 0;
1933
1934 if (conn->version == 0 ||
1935 !IS_READY_STATE(conn->state) || !IS_VALID_EID(conn->eid)) {
1936 __db_errx(env, DB_STR("3682",
1937 "unexpected connection info in record_permlsn"));
1938 return (DB_REP_UNAVAIL);
1939 }
1940 site = SITE_FROM_EID(conn->eid);
1941
1942 /*
1943 * Extract the LSN. Save it only if it is an improvement over what the
1944 * site has already ack'ed.
1945 */
1946 if (conn->version == 1) {
1947 ackp = conn->input.repmgr_msg.cntrl.data;
1948 if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) ||
1949 conn->input.repmgr_msg.rec.size != 0) {
1950 __db_errx(env, DB_STR("3627", "bad ack msg size"));
1951 return (DB_REP_UNAVAIL);
1952 }
1953 } else {
1954 ackp = &ack;
1955 if ((ret = __repmgr_permlsn_unmarshal(env, ackp,
1956 conn->input.repmgr_msg.cntrl.data,
1957 conn->input.repmgr_msg.cntrl.size, NULL)) != 0)
1958 return (DB_REP_UNAVAIL);
1959 }
1960
1961 /* Ignore stale acks. */
1962 gen = db_rep->region->gen;
1963 if (ackp->generation < gen) {
1964 VPRINT(env, (env, DB_VERB_REPMGR_MISC,
1965 "ignoring stale ack (%lu<%lu), from %s",
1966 (u_long)ackp->generation, (u_long)gen,
1967 __repmgr_format_site_loc(site, location)));
1968 return (0);
1969 }
1970 VPRINT(env, (env, DB_VERB_REPMGR_MISC,
1971 "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file,
1972 (u_long)ackp->lsn.offset, (u_long)ackp->generation,
1973 __repmgr_format_site_loc(site, location)));
1974
1975 if (ackp->generation == gen &&
1976 LOG_COMPARE(&ackp->lsn, &site->max_ack) == 1) {
1977 /*
1978 * If file number for this site changed, check lowest log
1979 * file needed after recording new permlsn for this site.
1980 */
1981 if (ackp->lsn.file > site->max_ack.file)
1982 do_log_check = 1;
1983 memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
1984 if (do_log_check)
1985 check_min_log_file(env);
1986 if ((ret = __repmgr_wake_waiters(env,
1987 &db_rep->ack_waiters)) != 0)
1988 return (ret);
1989 }
1990 return (0);
1991 }
1992
1993 /*
1994 * Maintains lowest log file still needed by the repgroup. This is stored
1995 * in shared rep region so that it is accessible to repmgr subordinate
1996 * processes that may not themselves have connections to other sites
1997 * (e.g. a separate db_archive process.)
1998 */
1999 static void
check_min_log_file(env)2000 check_min_log_file(env)
2001 ENV *env;
2002 {
2003 DB_REP *db_rep;
2004 REP *rep;
2005 REPMGR_CONNECTION *conn;
2006 REPMGR_SITE *site;
2007 u_int32_t min_log;
2008 int eid;
2009
2010 db_rep = env->rep_handle;
2011 rep = db_rep->region;
2012 min_log = 0;
2013
2014 /*
2015 * Record the lowest log file number from all connected sites. If this
2016 * is a client, ignore the master because the master does not maintain
2017 * nor send out its repmgr perm LSN in this way. Consider connections
2018 * so that we don't allow a site that has been down a long time to
2019 * indefinitely prevent log archiving.
2020 */
2021 FOR_EACH_REMOTE_SITE_INDEX(eid) {
2022 if (eid == rep->master_id)
2023 continue;
2024 site = SITE_FROM_EID(eid);
2025 if (site->state == SITE_CONNECTED &&
2026 (((conn = site->ref.conn.in) != NULL &&
2027 conn->state == CONN_READY) ||
2028 ((conn = site->ref.conn.out) != NULL &&
2029 conn->state == CONN_READY)) &&
2030 !IS_ZERO_LSN(site->max_ack) &&
2031 (min_log == 0 || site->max_ack.file < min_log))
2032 min_log = site->max_ack.file;
2033 }
2034 /*
2035 * During normal operation min_log should increase over time, but it
2036 * is possible if a site returns after being disconnected for a while
2037 * that min_log could decrease.
2038 */
2039 if (min_log != 0 && min_log != rep->min_log_file)
2040 rep->min_log_file = min_log;
2041 }
2042
2043 /*
2044 * PUBLIC: int __repmgr_write_some __P((ENV *, REPMGR_CONNECTION *));
2045 */
2046 int
__repmgr_write_some(env,conn)2047 __repmgr_write_some(env, conn)
2048 ENV *env;
2049 REPMGR_CONNECTION *conn;
2050 {
2051 QUEUED_OUTPUT *output;
2052 REPMGR_FLAT *msg;
2053 int bytes, ret;
2054
2055 while (!STAILQ_EMPTY(&conn->outbound_queue)) {
2056 output = STAILQ_FIRST(&conn->outbound_queue);
2057 msg = output->msg;
2058 if ((bytes = sendsocket(conn->fd, &msg->data[output->offset],
2059 msg->length - output->offset, 0)) == SOCKET_ERROR) {
2060 switch (ret = net_errno) {
2061 case WOULDBLOCK:
2062 #if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
2063 case DB_REPMGR_EAGAIN:
2064 #endif
2065 return (0);
2066 default:
2067 __repmgr_fire_conn_err_event(env, conn, ret);
2068 STAT(env->rep_handle->
2069 region->mstat.st_connection_drop++);
2070 return (DB_REP_UNAVAIL);
2071 }
2072 }
2073
2074 if ((output->offset += (size_t)bytes) >= msg->length) {
2075 STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
2076 __os_free(env, output);
2077 conn->out_queue_length--;
2078 if (--msg->ref_count <= 0)
2079 __os_free(env, msg);
2080
2081 /*
2082 * We've achieved enough movement to free up at least
2083 * one space in the outgoing queue. Wake any message
2084 * threads that may be waiting for space. Leave
2085 * CONGESTED state so that when the queue reaches the
2086 * high-water mark again, the filling thread will be
2087 * allowed to try waiting again.
2088 */
2089 conn->state = CONN_READY;
2090 if ((ret = __repmgr_signal(&conn->drained)) != 0)
2091 return (ret);
2092 }
2093 }
2094
2095 return (0);
2096 }
2097