1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2006, 2013 Oracle and/or its affiliates.  All rights reserved.
5  *
6  * $Id$
7  */
8 
9 #include "db_config.h"
10 
11 #include "db_int.h"
12 
/*
 * Signature shared by the heartbeat-related actions chosen by
 * __repmgr_next_timeout (__repmgr_send_heartbeat, __repmgr_call_election).
 * __repmgr_check_timeouts invokes the chosen action once its deadline has
 * passed.  Each action takes the environment and returns 0 or an error code.
 */
typedef int (*HEARTBEAT_ACTION) __P((ENV *));

/* Forward declarations of this file's static (file-local) functions. */
static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static void check_min_log_file __P((ENV *));
static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
static int prepare_input __P((ENV *, REPMGR_CONNECTION *));
static int process_own_msg __P((ENV *, REPMGR_CONNECTION *));
static int process_parameters __P((ENV *,
    REPMGR_CONNECTION *, char *, u_int, u_int32_t, int, u_int32_t));
static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
static int record_permlsn __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_call_election __P((ENV *));
static int __repmgr_connector_main __P((ENV *, REPMGR_RUNNABLE *));
static void *__repmgr_connector_thread __P((void *));
static int __repmgr_next_timeout __P((ENV *,
    db_timespec *, HEARTBEAT_ACTION *));
static int __repmgr_retry_connections __P((ENV *));
static int __repmgr_send_heartbeat __P((ENV *));
static int __repmgr_try_one __P((ENV *, int));
static int resolve_collision __P((ENV *, REPMGR_SITE *, REPMGR_CONNECTION *));
static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
35 
/*
 * Rejects any message other than a handshake.  If the connection's current
 * message type is not REPMGR_HANDSHAKE, logs the type and connection state
 * and makes the *enclosing* function return DB_REP_UNAVAIL.  It is therefore
 * usable only inside functions returning int.
 *
 * The arguments are parenthesized at every use so that an expression, not
 * just a plain identifier, can be passed safely.
 */
#define	ONLY_HANDSHAKE(env, conn) do {				     \
	if ((conn)->msg_type != REPMGR_HANDSHAKE) {		     \
		__db_errx((env), DB_STR_A("3613",		     \
		    "unexpected msg type %d in state %d", "%d %d"),  \
		    (int)(conn)->msg_type, (conn)->state);	     \
		return (DB_REP_UNAVAIL);			     \
	}							     \
} while (0)
44 
45 /*
46  * PUBLIC: void *__repmgr_select_thread __P((void *));
47  */
48 void *
__repmgr_select_thread(argsp)49 __repmgr_select_thread(argsp)
50 	void *argsp;
51 {
52 	REPMGR_RUNNABLE *args;
53 	ENV *env;
54 	int ret;
55 
56 	args = argsp;
57 	env = args->env;
58 
59 	if ((ret = __repmgr_select_loop(env))  != 0) {
60 		__db_err(env, ret, DB_STR("3614", "select loop failed"));
61 		(void)__repmgr_thread_failure(env, ret);
62 	}
63 	return (NULL);
64 }
65 
66 /*
67  * PUBLIC: int __repmgr_bow_out __P((ENV *));
68  */
69 int
__repmgr_bow_out(env)70 __repmgr_bow_out(env)
71 	ENV *env;
72 {
73 	DB_REP *db_rep;
74 	int ret;
75 
76 	db_rep = env->rep_handle;
77 	LOCK_MUTEX(db_rep->mutex);
78 	ret = __repmgr_stop_threads(env);
79 	UNLOCK_MUTEX(db_rep->mutex);
80 	DB_EVENT(env, DB_EVENT_REP_LOCAL_SITE_REMOVED, NULL);
81 	return (ret);
82 }
83 
84 /*
85  * PUBLIC: int __repmgr_accept __P((ENV *));
86  */
87 int
__repmgr_accept(env)88 __repmgr_accept(env)
89 	ENV *env;
90 {
91 	DB_REP *db_rep;
92 	REPMGR_CONNECTION *conn;
93 	ACCEPT_ADDR siaddr;
94 	socklen_t addrlen;
95 	socket_t s;
96 	int ret;
97 
98 	db_rep = env->rep_handle;
99 	addrlen = sizeof(siaddr);
100 	if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr,
101 	    &addrlen)) == -1) {
102 		/*
103 		 * Some errors are innocuous and so should be ignored.  MSDN
104 		 * Library documents the Windows ones; the Unix ones are
105 		 * advocated in Stevens' UNPv1, section 16.6; and Linux
106 		 * Application Development, p. 416.
107 		 */
108 		switch (ret = net_errno) {
109 #ifdef DB_WIN32
110 		case WSAECONNRESET:
111 		case WSAEWOULDBLOCK:
112 #else
113 		case EINTR:
114 		case EWOULDBLOCK:
115 		case ECONNABORTED:
116 		case ENETDOWN:
117 #ifdef EPROTO
118 		case EPROTO:
119 #endif
120 		case ENOPROTOOPT:
121 		case EHOSTDOWN:
122 #ifdef ENONET
123 		case ENONET:
124 #endif
125 		case EHOSTUNREACH:
126 		case EOPNOTSUPP:
127 		case ENETUNREACH:
128 #endif
129 			VPRINT(env, (env, DB_VERB_REPMGR_MISC,
130 			    "accept error %d considered innocuous", ret));
131 			return (0);
132 		default:
133 			__db_err(env, ret, DB_STR("3615", "accept error"));
134 			return (ret);
135 		}
136 	}
137 	RPRINT(env, (env, DB_VERB_REPMGR_MISC, "accepted a new connection"));
138 
139 	if ((ret =
140 	    __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE)) != 0) {
141 		(void)closesocket(s);
142 		return (ret);
143 	}
144 	if ((ret = __repmgr_set_keepalive(env, conn)) != 0) {
145 		(void)__repmgr_destroy_conn(env, conn);
146 		return (ret);
147 	}
148 	if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
149 		__db_err(env, ret, DB_STR("3616",
150 		    "can't set nonblock after accept"));
151 		(void)__repmgr_destroy_conn(env, conn);
152 		return (ret);
153 	}
154 
155 	/*
156 	 * We don't yet know which site this connection is coming from.  So for
157 	 * now, put it on the "orphans" list; we'll move it to the appropriate
158 	 * site struct later when we discover who we're talking with, and what
159 	 * type of connection it is.
160 	 */
161 	conn->eid = -1;
162 	TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);
163 	conn->ref_count++;
164 
165 	return (0);
166 }
167 
168 /*
169  * Computes how long we should wait for input, in other words how long until we
170  * have to wake up and do something.  Returns TRUE if timeout is set; FALSE if
171  * there is nothing to wait for.
172  *
173  * Note that the resulting timeout could be zero; but it can't be negative.
174  *
175  * PUBLIC: int __repmgr_compute_timeout __P((ENV *, db_timespec *));
176  */
177 int
__repmgr_compute_timeout(env,timeout)178 __repmgr_compute_timeout(env, timeout)
179 	ENV *env;
180 	db_timespec *timeout;
181 {
182 	DB_REP *db_rep;
183 	REPMGR_RETRY *retry;
184 	db_timespec now, t;
185 	int have_timeout;
186 
187 	db_rep = env->rep_handle;
188 
189 	/*
190 	 * There are two factors to consider: are heartbeats in use?  and, do we
191 	 * have any sites with broken connections that we ought to retry?
192 	 */
193 	have_timeout = __repmgr_next_timeout(env, &t, NULL);
194 
195 	/* List items are in order, so we only have to examine the first one. */
196 	if (!TAILQ_EMPTY(&db_rep->retries)) {
197 		retry = TAILQ_FIRST(&db_rep->retries);
198 		if (have_timeout) {
199 			/* Choose earliest timeout deadline. */
200 			t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
201 		} else {
202 			t = retry->time;
203 			have_timeout = TRUE;
204 		}
205 	}
206 
207 	if (have_timeout) {
208 		__os_gettime(env, &now, 1);
209 		if (timespeccmp(&now, &t, >=))
210 			timespecclear(timeout);
211 		else {
212 			*timeout = t;
213 			timespecsub(timeout, &now);
214 		}
215 	}
216 
217 	return (have_timeout);
218 }
219 
220 /*
221  * Figures out the next heartbeat-related thing to be done, and when it should
222  * be done.  The code is factored this way because this computation needs to be
223  * done both before each select() call, and after (when we're checking for timer
224  * expiration).
225  */
226 static int
__repmgr_next_timeout(env,deadline,action)227 __repmgr_next_timeout(env, deadline, action)
228 	ENV *env;
229 	db_timespec *deadline;
230 	HEARTBEAT_ACTION *action;
231 {
232 	DB_REP *db_rep;
233 	REP *rep;
234 	HEARTBEAT_ACTION my_action;
235 	REPMGR_CONNECTION *conn;
236 	REPMGR_SITE *master;
237 	db_timespec t;
238 	u_int32_t version;
239 
240 	db_rep = env->rep_handle;
241 	rep = db_rep->region;
242 
243 	if (rep->master_id == db_rep->self_eid &&
244 	    rep->heartbeat_frequency > 0) {
245 		t = db_rep->last_bcast;
246 		TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_frequency);
247 		my_action = __repmgr_send_heartbeat;
248 	} else if ((master = __repmgr_connected_master(env)) != NULL &&
249 	    !IS_SUBORDINATE(db_rep) &&
250 	    rep->heartbeat_monitor_timeout > 0) {
251 		version = 0;
252 		if ((conn = master->ref.conn.in) != NULL &&
253 		    IS_READY_STATE(conn->state))
254 			version = conn->version;
255 		if ((conn = master->ref.conn.out) != NULL &&
256 		    IS_READY_STATE(conn->state) &&
257 		    conn->version > version)
258 			version = conn->version;
259 		if (version >= HEARTBEAT_MIN_VERSION) {
260 			/*
261 			 * If we have a working connection to a heartbeat-aware
262 			 * master, let's monitor it.  Otherwise there's really
263 			 * nothing we can do.
264 			 */
265 			t = master->last_rcvd_timestamp;
266 			TIMESPEC_ADD_DB_TIMEOUT(&t,
267 			    rep->heartbeat_monitor_timeout);
268 			my_action = __repmgr_call_election;
269 		} else
270 			return (FALSE);
271 	} else
272 		return (FALSE);
273 
274 	*deadline = t;
275 	if (action != NULL)
276 		*action = my_action;
277 	return (TRUE);
278 }
279 
280 /*
281  * Sends a heartbeat message.
282  *
283  * repmgr also uses the heartbeat facility to manage rerequests.  We
284  * send the master's current generation and max_perm_lsn with the heartbeat
285  * message to help a client determine whether it has all master transactions.
286  * When a client receives a heartbeat message, it also checks whether it
287  * needs to rerequest anything.  Note that heartbeats must be enabled for
288  * this rerequest processing to occur.
289  */
290 static int
__repmgr_send_heartbeat(env)291 __repmgr_send_heartbeat(env)
292 	ENV *env;
293 {
294 	DB_REP *db_rep;
295 	REP *rep;
296 	DBT control, rec;
297 	__repmgr_permlsn_args permlsn;
298 	u_int8_t buf[__REPMGR_PERMLSN_SIZE];
299 	u_int unused1, unused2;
300 	int ret, unused3;
301 
302 	db_rep = env->rep_handle;
303 	rep = db_rep->region;
304 
305 	permlsn.generation = rep->gen;
306 	if ((ret = __rep_get_maxpermlsn(env, &permlsn.lsn)) != 0)
307 		return (ret);
308 	__repmgr_permlsn_marshal(env, &permlsn, buf);
309 	control.data = buf;
310 	control.size = __REPMGR_PERMLSN_SIZE;
311 
312 	DB_INIT_DBT(rec, NULL, 0);
313 	return (__repmgr_send_broadcast(env,
314 	    REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3));
315 }
316 
317 /*
318  * PUBLIC: REPMGR_SITE *__repmgr_connected_master __P((ENV *));
319  */
320 REPMGR_SITE *
__repmgr_connected_master(env)321 __repmgr_connected_master(env)
322 	ENV *env;
323 {
324 	DB_REP *db_rep;
325 	REPMGR_SITE *master;
326 	int master_id;
327 
328 	db_rep = env->rep_handle;
329 	master_id = db_rep->region->master_id;
330 
331 	if (!IS_KNOWN_REMOTE_SITE(master_id))
332 		return (NULL);
333 	master = SITE_FROM_EID(master_id);
334 	if (master->state == SITE_CONNECTED)
335 		return (master);
336 	return (NULL);
337 }
338 
339 static int
__repmgr_call_election(env)340 __repmgr_call_election(env)
341 	ENV *env;
342 {
343 	REPMGR_CONNECTION *conn;
344 	REPMGR_SITE *master;
345 	int ret;
346 
347 	master = __repmgr_connected_master(env);
348 	if (master == NULL)
349 		return (0);
350 	RPRINT(env, (env, DB_VERB_REPMGR_MISC,
351 	    "heartbeat monitor timeout expired"));
352 	STAT(env->rep_handle->region->mstat.st_connection_drop++);
353 	if ((conn = master->ref.conn.in) != NULL &&
354 	    (ret = __repmgr_bust_connection(env, conn)) != 0)
355 		return (ret);
356 	if ((conn = master->ref.conn.out) != NULL &&
357 	    (ret = __repmgr_bust_connection(env, conn)) != 0)
358 		return (ret);
359 	return (0);
360 }
361 
362 /*
363  * PUBLIC: int __repmgr_check_timeouts __P((ENV *));
364  *
365  * !!!
366  * Assumes caller holds the mutex.
367  */
368 int
__repmgr_check_timeouts(env)369 __repmgr_check_timeouts(env)
370 	ENV *env;
371 {
372 	db_timespec when, now;
373 	HEARTBEAT_ACTION action;
374 	int ret;
375 
376 	/*
377 	 * Figure out the next heartbeat-related thing to be done.  Then, if
378 	 * it's time to do it, do so.
379 	 */
380 	if (__repmgr_next_timeout(env, &when, &action)) {
381 		__os_gettime(env, &now, 1);
382 		if (timespeccmp(&when, &now, <=) &&
383 		    (ret = (*action)(env)) != 0)
384 			return (ret);
385 	}
386 
387 	return (__repmgr_retry_connections(env));
388 }
389 
390 /*
391  * Initiates connection attempts for any sites on the idle list whose retry
392  * times have expired.
393  */
394 static int
__repmgr_retry_connections(env)395 __repmgr_retry_connections(env)
396 	ENV *env;
397 {
398 	DB_REP *db_rep;
399 	REPMGR_SITE *site;
400 	REPMGR_RETRY *retry;
401 	db_timespec now;
402 	int eid, ret;
403 
404 	db_rep = env->rep_handle;
405 	__os_gettime(env, &now, 1);
406 
407 	while (!TAILQ_EMPTY(&db_rep->retries)) {
408 		retry = TAILQ_FIRST(&db_rep->retries);
409 		if (timespeccmp(&retry->time, &now, >=))
410 			break;	/* since items are in time order */
411 
412 		TAILQ_REMOVE(&db_rep->retries, retry, entries);
413 
414 		eid = retry->eid;
415 		__os_free(env, retry);
416 		DB_ASSERT(env, IS_VALID_EID(eid));
417 		site = SITE_FROM_EID(eid);
418 		DB_ASSERT(env, site->state == SITE_PAUSING);
419 
420 		if (site->membership == SITE_PRESENT) {
421 			if ((ret = __repmgr_try_one(env, eid)) != 0)
422 				return (ret);
423 		} else
424 			site->state = SITE_IDLE;
425 	}
426 	return (0);
427 }
428 
429 /*
430  * PUBLIC: int __repmgr_first_try_connections __P((ENV *));
431  *
432  * !!!
433  * Assumes caller holds the mutex.
434  */
435 int
__repmgr_first_try_connections(env)436 __repmgr_first_try_connections(env)
437 	ENV *env;
438 {
439 	DB_REP *db_rep;
440 	REPMGR_SITE *site;
441 	int eid, ret;
442 
443 	db_rep = env->rep_handle;
444 	FOR_EACH_REMOTE_SITE_INDEX(eid) {
445 		site = SITE_FROM_EID(eid);
446 		/*
447 		 * Normally all sites would be IDLE here.  But if a user thread
448 		 * triggered an auto-start in a subordinate process, our send()
449 		 * function may have found new sites when it sync'ed site
450 		 * addresses, and that action causes connection attempts to be
451 		 * scheduled (resulting in PAUSING state here, or conceivably
452 		 * even CONNECTING or CONNECTED).
453 		 */
454 		if (site->state == SITE_IDLE &&
455 		    site->membership == SITE_PRESENT &&
456 		    (ret = __repmgr_try_one(env, eid)) != 0)
457 			return (ret);
458 	}
459 	return (0);
460 }
461 
462 /*
463  * Starts a thread to open a connection to the site at the given EID.
464  */
465 static int
__repmgr_try_one(env,eid)466 __repmgr_try_one(env, eid)
467 	ENV *env;
468 	int eid;
469 {
470 	DB_REP *db_rep;
471 	REPMGR_SITE *site;
472 	REPMGR_RUNNABLE *th;
473 	int ret;
474 
475 	db_rep = env->rep_handle;
476 	DB_ASSERT(env, IS_VALID_EID(eid));
477 	site = SITE_FROM_EID(eid);
478 	th = site->connector;
479 	if (th == NULL) {
480 		if ((ret = __os_malloc(env, sizeof(REPMGR_RUNNABLE), &th)) != 0)
481 			return (ret);
482 		site->connector = th;
483 	} else if (th->finished) {
484 		if ((ret = __repmgr_thread_join(th)) != 0)
485 			return (ret);
486 	} else {
487 		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
488 		  "eid %lu previous connector thread still running; will retry",
489 		    (u_long)eid));
490 		return (__repmgr_schedule_connection_attempt(env,
491 			eid, FALSE));
492 	}
493 
494 	site->state = SITE_CONNECTING;
495 
496 	th->run = __repmgr_connector_thread;
497 	th->args.eid = eid;
498 	if ((ret = __repmgr_thread_start(env, th)) != 0) {
499 		__os_free(env, th);
500 		site->connector = NULL;
501 	}
502 	return (ret);
503 }
504 
505 static void *
__repmgr_connector_thread(argsp)506 __repmgr_connector_thread(argsp)
507 	void *argsp;
508 {
509 	REPMGR_RUNNABLE *th;
510 	ENV *env;
511 	int ret;
512 
513 	th = argsp;
514 	env = th->env;
515 
516 	RPRINT(env, (env, DB_VERB_REPMGR_MISC,
517 	    "starting connector thread, eid %u", th->args.eid));
518 	if ((ret = __repmgr_connector_main(env, th)) != 0) {
519 		__db_err(env, ret, DB_STR("3617", "connector thread failed"));
520 		(void)__repmgr_thread_failure(env, ret);
521 	}
522 	RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connector thread is exiting"));
523 
524 	th->finished = TRUE;
525 	return (NULL);
526 }
527 
528 static int
__repmgr_connector_main(env,th)529 __repmgr_connector_main(env, th)
530 	ENV *env;
531 	REPMGR_RUNNABLE *th;
532 {
533 	DB_REP *db_rep;
534 	REPMGR_SITE *site;
535 	REPMGR_CONNECTION *conn;
536 	DB_REPMGR_CONN_ERR info;
537 	repmgr_netaddr_t netaddr;
538 	SITE_STRING_BUFFER site_string;
539 	int err, ret, t_ret;
540 
541 	db_rep = env->rep_handle;
542 	ret = 0;
543 
544 	LOCK_MUTEX(db_rep->mutex);
545 	DB_ASSERT(env, IS_VALID_EID(th->args.eid));
546 	site = SITE_FROM_EID(th->args.eid);
547 	if (site->state != SITE_CONNECTING && db_rep->repmgr_status == stopped)
548 		goto unlock;
549 
550 	/*
551 	 * Drop the mutex during operations that could block.  During those
552 	 * times, the site struct could move (if we had to grow the sites
553 	 * array), but host wouldn't.
554 	 *
555 	 * Also, during those times we might receive an incoming connection from
556 	 * the site, which would change its state.  So, check state each time we
557 	 * reacquire the mutex, and quit if the state of the world changed while
558 	 * we were away.
559 	 */
560 	netaddr = site->net_addr;
561 	RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connecting to %s",
562 		__repmgr_format_site_loc(site, site_string)));
563 	UNLOCK_MUTEX(db_rep->mutex);
564 
565 	if ((ret = __repmgr_connect(env, &netaddr, &conn, &err)) == 0) {
566 		DB_EVENT(env,  DB_EVENT_REP_CONNECT_ESTD, &th->args.eid);
567 		LOCK_MUTEX(db_rep->mutex);
568 		if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
569 			__db_err(env, ret, DB_STR("3618",
570 			    "set_nonblock in connnect thread"));
571 			goto cleanup;
572 		}
573 		conn->type = REP_CONNECTION;
574 		site = SITE_FROM_EID(th->args.eid);
575 		if (site->state != SITE_CONNECTING ||
576 		    db_rep->repmgr_status == stopped)
577 			goto cleanup;
578 
579 		conn->eid = th->args.eid;
580 		site = SITE_FROM_EID(th->args.eid);
581 		site->ref.conn.out = conn;
582 		site->state = SITE_CONNECTED;
583 		__os_gettime(env, &site->last_rcvd_timestamp, 1);
584 		ret = __repmgr_wake_main_thread(env);
585 	} else if (ret == DB_REP_UNAVAIL) {
586 		/* Retryable error while trying to connect: retry later. */
587 		info.eid = th->args.eid;
588 		info.error = err;
589 		DB_EVENT(env, DB_EVENT_REP_CONNECT_TRY_FAILED, &info);
590 		STAT(db_rep->region->mstat.st_connect_fail++);
591 
592 		LOCK_MUTEX(db_rep->mutex);
593 		site = SITE_FROM_EID(th->args.eid);
594 		if (site->state != SITE_CONNECTING ||
595 		    db_rep->repmgr_status == stopped) {
596 			ret = 0;
597 			goto unlock;
598 		}
599 		ret = __repmgr_schedule_connection_attempt(env,
600 		    th->args.eid, FALSE);
601 	} else
602 		goto out;
603 
604 	if (0) {
605 cleanup:
606 		if ((t_ret = __repmgr_destroy_conn(env, conn)) != 0 &&
607 		    ret == 0)
608 			ret = t_ret;
609 	}
610 
611 unlock:
612 	UNLOCK_MUTEX(db_rep->mutex);
613 out:
614 	return (ret);
615 }
616 
617 /*
618  * PUBLIC: int __repmgr_send_v1_handshake __P((ENV *,
619  * PUBLIC:     REPMGR_CONNECTION *, void *, size_t));
620  */
621 int
__repmgr_send_v1_handshake(env,conn,buf,len)622 __repmgr_send_v1_handshake(env, conn, buf, len)
623 	ENV *env;
624 	REPMGR_CONNECTION *conn;
625 	void *buf;
626 	size_t len;
627 {
628 	DB_REP *db_rep;
629 	REP *rep;
630 	repmgr_netaddr_t *my_addr;
631 	DB_REPMGR_V1_HANDSHAKE buffer;
632 	DBT cntrl, rec;
633 
634 	db_rep = env->rep_handle;
635 	rep = db_rep->region;
636 	my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
637 
638 	/*
639 	 * We're about to send from a structure that has padding holes in it.
640 	 * Initializing it keeps Valgrind happy, plus we really shouldn't be
641 	 * sending out random garbage anyway (pro forma privacy issue).
642 	 */
643 	memset(&buffer, 0, sizeof(buffer));
644 	buffer.version = 1;
645 	buffer.priority = htonl(rep->priority);
646 	buffer.port = my_addr->port;
647 	cntrl.data = &buffer;
648 	cntrl.size = sizeof(buffer);
649 
650 	rec.data = buf;
651 	rec.size = (u_int32_t)len;
652 
653 	/*
654 	 * It would of course be disastrous to block the select() thread, so
655 	 * pass the "maxblock" argument as 0.  Fortunately blocking should
656 	 * never be necessary here, because the hand-shake is always the first
657 	 * thing we send.  Which is a good thing, because it would be almost as
658 	 * disastrous if we allowed ourselves to drop a handshake.
659 	 */
660 	return (__repmgr_send_one(env,
661 	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0));
662 }
663 
664 /*
665  * PUBLIC: int __repmgr_read_from_site __P((ENV *, REPMGR_CONNECTION *));
666  *
667  * !!!
668  * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here.
669  */
670 int
__repmgr_read_from_site(env,conn)671 __repmgr_read_from_site(env, conn)
672 	ENV *env;
673 	REPMGR_CONNECTION *conn;
674 {
675 	DB_REP *db_rep;
676 	REPMGR_SITE *site;
677 	int ret;
678 
679 	db_rep = env->rep_handle;
680 
681 	/*
682 	 * Loop, just in case we get EINTR and need to restart the I/O.  (All
683 	 * other branches return.)
684 	 */
685 	for (;;) {
686 		switch ((ret = __repmgr_read_conn(conn))) {
687 #ifndef DB_WIN32
688 		case EINTR:
689 			continue;
690 #endif
691 
692 #if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
693 		case DB_REPMGR_EAGAIN:
694 #endif
695 		case WOULDBLOCK:
696 			return (0);
697 
698 		case DB_REP_UNAVAIL:
699 			/* Error 0 is understood to mean EOF. */
700 			__repmgr_fire_conn_err_event(env, conn, 0);
701 			STAT(env->rep_handle->
702 			    region->mstat.st_connection_drop++);
703 			return (DB_REP_UNAVAIL);
704 
705 		case 0:
706 			if (IS_VALID_EID(conn->eid)) {
707 				site = SITE_FROM_EID(conn->eid);
708 				__os_gettime(env,
709 				    &site->last_rcvd_timestamp, 1);
710 			}
711 			return (conn->reading_phase == SIZES_PHASE ?
712 			    prepare_input(env, conn) :
713 			    dispatch_msgin(env, conn));
714 
715 		default:
716 #ifdef EBADF
717 			DB_ASSERT(env, ret != EBADF);
718 #endif
719 			__repmgr_fire_conn_err_event(env, conn, ret);
720 			STAT(db_rep->region->mstat.st_connection_drop++);
721 			return (DB_REP_UNAVAIL);
722 		}
723 	}
724 }
725 
726 /*
727  * Reads in the current input phase, as defined by the connection's IOVECS
728  * struct.
729  *
730  * Returns DB_REP_UNAVAIL for EOF.
731  *
732  * Makes no assumption about synchronization: it's up to the caller to hold
733  * mutex if necessary.
734  *
735  * PUBLIC: int __repmgr_read_conn __P((REPMGR_CONNECTION *));
736  */
737 int
__repmgr_read_conn(conn)738 __repmgr_read_conn(conn)
739 	REPMGR_CONNECTION *conn;
740 {
741 	size_t nr;
742 	int ret;
743 
744 	/*
745 	 * Keep reading pieces as long as we're making some progress, or until
746 	 * we complete the current read phase as defined in iovecs.
747 	 */
748 	for (;;) {
749 		if ((ret = __repmgr_readv(conn->fd,
750 		    &conn->iovecs.vectors[conn->iovecs.offset],
751 		    conn->iovecs.count - conn->iovecs.offset, &nr)) != 0)
752 			return (ret);
753 
754 		if (nr == 0)
755 			return (DB_REP_UNAVAIL);
756 
757 		if (__repmgr_update_consumed(&conn->iovecs, nr)) {
758 			/* We've fully read as much as we wanted. */
759 			return (0);
760 		}
761 	}
762 }
763 
764 /*
765  * Having finished reading the 9-byte message header, figure out what kind of
766  * message we're about to receive, and prepare input buffers accordingly.  The
767  * header includes enough information for us to figure out how much buffer space
768  * we need to allocate (though in some cases we need to do a bit of computation
769  * to arrive at the answer).
770  *
771  * Caller must hold mutex.
772  */
773 static int
prepare_input(env,conn)774 prepare_input(env, conn)
775 	ENV *env;
776 	REPMGR_CONNECTION *conn;
777 {
778 #define	MEM_ALIGN sizeof(double)
779 	DBT *dbt;
780 	__repmgr_msg_hdr_args msg_hdr;
781 	REPMGR_RESPONSE *resp;
782 	u_int32_t control_size, rec_size, size;
783 	size_t memsize, control_offset, rec_offset;
784 	void *membase;
785 	int ret, skip;
786 
787 	DB_ASSERT(env, conn->reading_phase == SIZES_PHASE);
788 
789 	/*
790 	 * We can only get here after having read the full 9 bytes that we
791 	 * expect, so this can't fail.
792 	 */
793 	ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
794 	    conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
795 	DB_ASSERT(env, ret == 0);
796 
797 	__repmgr_iovec_init(&conn->iovecs);
798 	skip = FALSE;
799 
800 	switch ((conn->msg_type = msg_hdr.type)) {
801 	case REPMGR_HEARTBEAT:
802 		/*
803 		 * The underlying byte-receiving mechanism will already have
804 		 * noted the fact that we got some traffic on this connection,
805 		 * which is all that is needed to monitor the heartbeat.  But
806 		 * we also put the heartbeat message on the message queue so
807 		 * that it will perform rerequest processing.
808 		 */
809 	case REPMGR_REP_MESSAGE:
810 		env->rep_handle->seen_repmsg = TRUE;
811 		control_size = REP_MSG_CONTROL_SIZE(msg_hdr);
812 		rec_size = REP_MSG_REC_SIZE(msg_hdr);
813 		if (control_size == 0) {
814 			if (conn->msg_type == REPMGR_HEARTBEAT) {
815 				/*
816 				 * Got an old-style heartbeat without payload,
817 				 * nothing to do.
818 				 */
819 				skip = TRUE;
820 				break;
821 			} else {
822 				__db_errx(env, DB_STR("3619",
823 				    "illegal size for rep msg"));
824 				return (DB_REP_UNAVAIL);
825 			}
826 		}
827 		/*
828 		 * Allocate a block of memory large enough to hold a
829 		 * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT
830 		 * data areas that it points to.  Start by calculating
831 		 * the total memory needed.
832 		 */
833 		memsize = DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
834 		control_offset = memsize;
835 		memsize += control_size;
836 		if (rec_size > 0) {
837 			memsize = DB_ALIGN(memsize, MEM_ALIGN);
838 			rec_offset = memsize;
839 			memsize += rec_size;
840 		} else
841 			COMPQUIET(rec_offset, 0);
842 		if ((ret = __os_malloc(env, memsize, &membase)) != 0)
843 			return (ret);
844 		conn->input.rep_message = membase;
845 		conn->input.rep_message->msg_hdr = msg_hdr;
846 		conn->input.rep_message->v.repmsg.originating_eid = conn->eid;
847 
848 		DB_INIT_DBT(conn->input.rep_message->v.repmsg.control,
849 		    (u_int8_t*)membase + control_offset, control_size);
850 		__repmgr_add_dbt(&conn->iovecs,
851 		    &conn->input.rep_message->v.repmsg.control);
852 
853 		if (rec_size > 0) {
854 			DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
855 			    (rec_size > 0 ?
856 				(u_int8_t*)membase + rec_offset : NULL),
857 			    rec_size);
858 			__repmgr_add_dbt(&conn->iovecs,
859 			    &conn->input.rep_message->v.repmsg.rec);
860 		} else
861 			DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
862 			    NULL, 0);
863 		break;
864 
865 	case REPMGR_APP_MESSAGE:
866 		/*
867 		 * We need a buffer big enough to hold the REPMGR_MESSAGE struct
868 		 * and the data that we expect to receive on the wire.  We must
869 		 * extend the struct size for the variable-length DBT array at
870 		 * the end.
871 		 */
872 		size = DB_ALIGN((size_t)(sizeof(REPMGR_MESSAGE) +
873 		    APP_MSG_SEGMENT_COUNT(msg_hdr) * sizeof(DBT)),
874 		    MEM_ALIGN);
875 		memsize = size + APP_MSG_BUFFER_SIZE(msg_hdr);
876 		if ((ret = __os_malloc(env, memsize, &membase)) != 0)
877 			return (ret);
878 		conn->input.rep_message = membase;
879 		conn->input.rep_message->msg_hdr = msg_hdr;
880 		conn->input.rep_message->v.appmsg.conn = conn;
881 
882 		DB_INIT_DBT(conn->input.rep_message->v.appmsg.buf,
883 		    (u_int8_t*)membase + size,
884 		    APP_MSG_BUFFER_SIZE(msg_hdr));
885 		__repmgr_add_dbt(&conn->iovecs,
886 		    &conn->input.rep_message->v.appmsg.buf);
887 		break;
888 
889 	case REPMGR_OWN_MSG:
890 		size = sizeof(REPMGR_MESSAGE) + REPMGR_OWN_BUF_SIZE(msg_hdr);
891 		if ((ret = __os_malloc(env, size, &membase)) != 0)
892 			return (ret);
893 		conn->input.rep_message = membase;
894 		conn->input.rep_message->msg_hdr = msg_hdr;
895 
896 		/*
897 		 * Save "conn" pointer in case this turns out to be a one-shot
898 		 * request.  If it isn't, it won't matter.
899 		 */
900 		/*
901 		 * An OWN msg that arrives in PARAMETERS state has bypassed the
902 		 * final handshake, implying that this connection is to be used
903 		 * for a one-shot GMDB request.
904 		 */
905 		if (REPMGR_OWN_BUF_SIZE(msg_hdr) == 0) {
906 			__db_errx(env, DB_STR_A("3680",
907 			    "invalid own buf size %lu in prepare_input", "%lu"),
908 			    (u_long)REPMGR_OWN_BUF_SIZE(msg_hdr));
909 			return (DB_REP_UNAVAIL);
910 		}
911 		DB_INIT_DBT(conn->input.rep_message->v.gmdb_msg.request,
912 		    (u_int8_t*)membase + sizeof(REPMGR_MESSAGE),
913 		    REPMGR_OWN_BUF_SIZE(msg_hdr));
914 		__repmgr_add_dbt(&conn->iovecs,
915 		    &conn->input.rep_message->v.gmdb_msg.request);
916 		break;
917 
918 	case REPMGR_APP_RESPONSE:
919 		size = APP_RESP_BUFFER_SIZE(msg_hdr);
920 		conn->cur_resp = APP_RESP_TAG(msg_hdr);
921 		if (conn->cur_resp >= conn->aresp) {
922 			__db_errx(env, DB_STR_A("3681",
923 			    "invalid cur resp %lu in prepare_input", "%lu"),
924 			    (u_long)conn->cur_resp);
925 			return (DB_REP_UNAVAIL);
926 		}
927 		resp = &conn->responses[conn->cur_resp];
928 		DB_ASSERT(env, F_ISSET(resp, RESP_IN_USE));
929 
930 		dbt = &resp->dbt;
931 
932 		/*
933 		 * Prepare to read message body into either the user-supplied
934 		 * buffer, or one we allocate here.
935 		 */
936 		ret = 0;
937 		if (!F_ISSET(resp, RESP_THREAD_WAITING)) {
938 			/* Caller already timed out; allocate dummy buffer. */
939 			if (size > 0) {
940 				memset(dbt, 0, sizeof(*dbt));
941 				ret = __os_malloc(env, size, &dbt->data);
942 				F_SET(resp, RESP_DUMMY_BUF);
943 			} else
944 				F_CLR(resp, RESP_IN_USE);
945 		} else if (F_ISSET(dbt, DB_DBT_MALLOC))
946 			ret = __os_umalloc(env, size, &dbt->data);
947 		else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
948 			if (dbt->data == NULL || dbt->size < size)
949 				ret = __os_urealloc(env, size, &dbt->data);
950 		} else if (F_ISSET(dbt, DB_DBT_USERMEM)) {
951 			/* Recipient should have checked size limit. */
952 			DB_ASSERT(env, size <= dbt->ulen);
953 		}
954 		dbt->size = size;
955 		if (ret != 0)
956 			return (ret);
957 
958 		if (size > 0) {
959 			__repmgr_add_dbt(&conn->iovecs, dbt);
960 			F_SET(resp, RESP_READING);
961 		} else {
962 			skip = TRUE;
963 			if (F_ISSET(resp, RESP_THREAD_WAITING)) {
964 				F_SET(resp, RESP_COMPLETE);
965 				if ((ret = __repmgr_wake_waiters(env,
966 				    &conn->response_waiters)) != 0)
967 					return (ret);
968 			}
969 		}
970 		break;
971 
972 	case REPMGR_RESP_ERROR:
973 		DB_ASSERT(env, RESP_ERROR_TAG(msg_hdr) < conn->aresp &&
974 		    conn->responses != NULL);
975 		resp = &conn->responses[RESP_ERROR_TAG(msg_hdr)];
976 		DB_ASSERT(env, !F_ISSET(resp, RESP_READING));
977 		if (F_ISSET(resp, RESP_THREAD_WAITING)) {
978 			F_SET(resp, RESP_COMPLETE);
979 
980 			/*
981 			 * DB errors are always negative, but we only send
982 			 * unsigned values on the wire.
983 			 */
984 			resp->ret = -((int)RESP_ERROR_CODE(msg_hdr));
985 			if ((ret = __repmgr_wake_waiters(env,
986 			    &conn->response_waiters)) != 0)
987 				return (ret);
988 		} else
989 			F_CLR(resp, RESP_IN_USE);
990 		skip = TRUE;
991 		break;
992 
993 	case REPMGR_HANDSHAKE:
994 	case REPMGR_PERMLSN:
995 		if ((ret = __repmgr_prepare_simple_input(env,
996 		    conn, &msg_hdr)) != 0)
997 			return (ret);
998 		break;
999 
1000 	default:
1001 		__db_errx(env, DB_STR_A("3676",
1002 		    "unexpected msg type %lu in prepare_input", "%lu"),
1003 		    (u_long)conn->msg_type);
1004 		return (DB_REP_UNAVAIL);
1005 	}
1006 
1007 	if (skip) {
1008 		/*
1009 		 * We can skip the DATA_PHASE, because the current message type
1010 		 * only has a header, no following data.
1011 		 */
1012 		__repmgr_reset_for_reading(conn);
1013 	} else
1014 		conn->reading_phase = DATA_PHASE;
1015 
1016 	return (0);
1017 }
1018 
1019 /*
1020  * PUBLIC: int __repmgr_prepare_simple_input __P((ENV *,
1021  * PUBLIC:     REPMGR_CONNECTION *, __repmgr_msg_hdr_args *));
1022  */
1023 int
__repmgr_prepare_simple_input(env,conn,msg_hdr)1024 __repmgr_prepare_simple_input(env, conn, msg_hdr)
1025 	ENV *env;
1026 	REPMGR_CONNECTION *conn;
1027 	__repmgr_msg_hdr_args *msg_hdr;
1028 {
1029 	DBT *dbt;
1030 	u_int32_t control_size, rec_size;
1031 	int ret;
1032 
1033 	control_size = REP_MSG_CONTROL_SIZE(*msg_hdr);
1034 	rec_size = REP_MSG_REC_SIZE(*msg_hdr);
1035 
1036 	dbt = &conn->input.repmgr_msg.cntrl;
1037 	if ((dbt->size = control_size) > 0) {
1038 		if ((ret = __os_malloc(env,
1039 		    dbt->size, &dbt->data)) != 0)
1040 			return (ret);
1041 		__repmgr_add_dbt(&conn->iovecs, dbt);
1042 	}
1043 
1044 	dbt = &conn->input.repmgr_msg.rec;
1045 	if ((dbt->size = rec_size) > 0) {
1046 		if ((ret = __os_malloc(env,
1047 		    dbt->size, &dbt->data)) != 0) {
1048 			dbt = &conn->input.repmgr_msg.cntrl;
1049 			if (dbt->size > 0)
1050 				__os_free(env, dbt->data);
1051 			return (ret);
1052 		}
1053 		__repmgr_add_dbt(&conn->iovecs, dbt);
1054 	}
1055 	return (0);
1056 }
1057 
1058 /*
1059  * Processes an incoming message, depending on our current state.
1060  *
1061  * Caller must hold mutex.
1062  */
1063 static int
dispatch_msgin(env,conn)1064 dispatch_msgin(env, conn)
1065 	ENV *env;
1066 	REPMGR_CONNECTION *conn;
1067 {
1068 	DB_REP *db_rep;
1069 	REPMGR_SITE *site;
1070 	REPMGR_RUNNABLE *th;
1071 	REPMGR_RESPONSE *resp;
1072 	DBT *dbt;
1073 	char *hostname;
1074 	int eid, ret;
1075 
1076 	DB_ASSERT(env, conn->reading_phase == DATA_PHASE);
1077 	db_rep = env->rep_handle;
1078 
1079 	switch (conn->state) {
1080 	case CONN_CONNECTED:
1081 		/*
1082 		 * In this state, we know we're working with an outgoing
1083 		 * connection.  We've sent a version proposal, and now expect
1084 		 * the response (which could be a dumb old V1 handshake).
1085 		 */
1086 		ONLY_HANDSHAKE(env, conn);
1087 
1088 		/*
1089 		 * Here is a good opportunity to clean up this site's connector
1090 		 * thread, because we generally come through here after making
1091 		 * an outgoing connection, yet we're out of the main loop, so we
1092 		 * don't hit this often.
1093 		 */
1094 		eid = conn->eid;
1095 		DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
1096 		site = SITE_FROM_EID(eid);
1097 		th = site->connector;
1098 		if (th != NULL && th->finished) {
1099 			if ((ret = __repmgr_thread_join(th)) != 0)
1100 				return (ret);
1101 			__os_free(env, th);
1102 			site->connector = NULL;
1103 		}
1104 
1105 		if ((ret = read_version_response(env, conn)) != 0)
1106 			return (ret);
1107 		break;
1108 
1109 	case CONN_NEGOTIATE:
1110 		/*
1111 		 * Since we're in this state, we know we're working with an
1112 		 * incoming connection, and this is the first message we've
1113 		 * received.  So it must be a version negotiation proposal (or a
1114 		 * legacy V1 handshake).  (We'll verify this of course.)
1115 		 */
1116 		ONLY_HANDSHAKE(env, conn);
1117 		if ((ret = send_version_response(env, conn)) != 0)
1118 			return (ret);
1119 		break;
1120 
1121 	case CONN_PARAMETERS:
1122 		/*
1123 		 * We've previously agreed on a (>1) version, so we expect
1124 		 * either the other side's parameters handshake, or possibly a
1125 		 * GMDB request on a one-shot, dedicated connection.
1126 		 */
1127 		switch (conn->msg_type) {
1128 		case REPMGR_HANDSHAKE:
1129 			dbt = &conn->input.repmgr_msg.rec;
1130 			hostname = dbt->data;
1131 			hostname[dbt->size-1] = '\0';
1132 			if ((ret = accept_handshake(env, conn, hostname)) != 0)
1133 				return (ret);
1134 			conn->state = CONN_READY;
1135 			break;
1136 		case REPMGR_OWN_MSG:
1137 			/*
1138 			 * GM change requests arrive in their own dedicated
1139 			 * connections, and when they're served the entire
1140 			 * connection isn't needed any more.  So the message
1141 			 * processing thread will do the entire job of serving
1142 			 * the request and finishing off the connection; so we
1143 			 * don't have to read it any more.  Note that normally
1144 			 * whenever we remove a connection from our list we
1145 			 * decrement the reference count; but we also increment
1146 			 * it whenever we pass a reference over to the message
1147 			 * processing threads' queue.  So in this case it's a
1148 			 * wash.
1149 			 */
1150 			conn->input.rep_message->v.gmdb_msg.conn = conn;
1151 			TAILQ_REMOVE(&db_rep->connections, conn, entries);
1152 			if ((ret = __repmgr_queue_put(env,
1153 			    conn->input.rep_message)) != 0)
1154 				return (ret);
1155 			break;
1156 
1157 		default:
1158 			__db_errx(env, DB_STR_A("3620",
1159 			    "unexpected msg type %d in PARAMETERS state", "%d"),
1160 			    (int)conn->msg_type);
1161 			return (DB_REP_UNAVAIL);
1162 		}
1163 
1164 		break;
1165 
1166 	case CONN_READY:
1167 	case CONN_CONGESTED:
1168 		/*
1169 		 * We have a complete message, so process it.  Acks and
1170 		 * handshakes get processed here, in line.  Regular rep messages
1171 		 * get posted to a queue, to be handled by a thread from the
1172 		 * message thread pool.
1173 		 */
1174 		switch (conn->msg_type) {
1175 		case REPMGR_PERMLSN:
1176 			if ((ret = record_permlsn(env, conn)) != 0)
1177 				return (ret);
1178 			break;
1179 
1180 		case REPMGR_HEARTBEAT:
1181 		case REPMGR_APP_MESSAGE:
1182 		case REPMGR_REP_MESSAGE:
1183 			if ((ret = __repmgr_queue_put(env,
1184 			    conn->input.rep_message)) != 0)
1185 				return (ret);
1186 			/*
1187 			 * The queue has taken over responsibility for the
1188 			 * rep_message buffer, and will free it later.
1189 			 */
1190 			if (conn->msg_type == REPMGR_APP_MESSAGE)
1191 				conn->ref_count++;
1192 			break;
1193 
1194 		case REPMGR_OWN_MSG:
1195 			/*
1196 			 * Since we're in one of the "ready" states we know this
1197 			 * isn't a one-shot request, so we are not giving
1198 			 * ownership of this connection over to the message
1199 			 * thread queue; we're going to keep reading on it
1200 			 * ourselves.  The message thread that processes this
1201 			 * request has no need for a connection anyway, since
1202 			 * there is no response that needs to be returned.
1203 			 */
1204 			conn->input.rep_message->v.gmdb_msg.conn = NULL;
1205 			if ((ret = process_own_msg(env, conn)) != 0)
1206 				return (ret);
1207 			break;
1208 
1209 		case REPMGR_APP_RESPONSE:
1210 			DB_ASSERT(env, conn->cur_resp < conn->aresp &&
1211 			    conn->responses != NULL);
1212 			resp = &conn->responses[conn->cur_resp];
1213 			DB_ASSERT(env, F_ISSET(resp, RESP_READING));
1214 			F_CLR(resp, RESP_READING);
1215 			if (F_ISSET(resp, RESP_THREAD_WAITING)) {
1216 				F_SET(resp, RESP_COMPLETE);
1217 				if ((ret = __repmgr_wake_waiters(env,
1218 				    &conn->response_waiters)) != 0)
1219 					return (ret);
1220 			} else {
1221 				/*
1222 				 * If the calling thread is no longer with us,
1223 				 * yet we're reading, it can only mean we're
1224 				 * reading into a dummy buffer, so free it now.
1225 				 */
1226 				DB_ASSERT(env, F_ISSET(resp, RESP_DUMMY_BUF));
1227 				__os_free(env, resp->dbt.data);
1228 				F_CLR(resp, RESP_IN_USE);
1229 			}
1230 			break;
1231 
1232 		case REPMGR_RESP_ERROR:
1233 		default:
1234 			__db_errx(env, DB_STR_A("3621",
1235 			    "unexpected msg type rcvd in ready state: %d",
1236 			    "%d"), (int)conn->msg_type);
1237 			return (DB_REP_UNAVAIL);
1238 		}
1239 		break;
1240 
1241 	case CONN_DEFUNCT:
1242 		break;
1243 
1244 	default:
1245 		DB_ASSERT(env, FALSE);
1246 	}
1247 
1248 	switch (conn->msg_type) {
1249 	case REPMGR_HANDSHAKE:
1250 	case REPMGR_PERMLSN:
1251 		dbt = &conn->input.repmgr_msg.cntrl;
1252 		if (dbt->size > 0)
1253 			__os_free(env, dbt->data);
1254 		dbt = &conn->input.repmgr_msg.rec;
1255 		if (dbt->size > 0)
1256 			__os_free(env, dbt->data);
1257 		break;
1258 	default:
1259 		/*
1260 		 * Some messages in REPMGR_OWN_MSG format are also handled
1261 		 */
1262 		break;
1263 	}
1264 	__repmgr_reset_for_reading(conn);
1265 	return (0);
1266 }
1267 
1268 /*
1269  * Process one of repmgr's "own" message types, and one that occurs on a regular
1270  * (not one-shot) connection.
1271  */
1272 static int
process_own_msg(env,conn)1273 process_own_msg(env, conn)
1274 	ENV *env;
1275 	REPMGR_CONNECTION *conn;
1276 {
1277 	DB_REP *db_rep;
1278 	DBT *dbt;
1279 	REPMGR_SITE *site;
1280 	REPMGR_MESSAGE *msg;
1281 	__repmgr_connect_reject_args reject;
1282 	__repmgr_parm_refresh_args parms;
1283 	int ret;
1284 
1285 	ret = 0;
1286 	/*
1287 	 * Set "msg" to point to the message struct.  If we do all necessary
1288 	 * processing here now, leave it set so that it can be freed.  On the
1289 	 * other hand, if we pass it off to the message queue for later
1290 	 * processing by a message thread, we want to avoid freeing the memory
1291 	 * here, so clear the pointer in such a case.
1292 	 */
1293 	switch (REPMGR_OWN_MSG_TYPE((msg = conn->input.rep_message)->msg_hdr)) {
1294 	case REPMGR_CONNECT_REJECT:
1295 		dbt = &msg->v.gmdb_msg.request;
1296 		if ((ret = __repmgr_connect_reject_unmarshal(env,
1297 		    &reject, dbt->data, dbt->size, NULL)) != 0)
1298 			return (DB_REP_UNAVAIL);
1299 
1300 		/*
1301 		 * If we're being rejected by someone who has more up-to-date
1302 		 * membership information than we do, it means we have been
1303 		 * removed from the group.  If we've just gotten started, we can
1304 		 * make one attempt at automatically rejoining; otherwise we bow
1305 		 * out gracefully.
1306 		 */
1307 		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1308 			"got rejection msg citing version %lu/%lu",
1309 			(u_long)reject.gen, (u_long)reject.version));
1310 
1311 		if (__repmgr_gmdb_version_cmp(env,
1312 		    reject.gen, reject.version) > 0) {
1313 			if (env->rep_handle->seen_repmsg)
1314 				ret = DB_DELETED;
1315 			else if ((ret = __repmgr_defer_op(env,
1316 			    REPMGR_REJOIN)) == 0)
1317 				ret = DB_REP_UNAVAIL;
1318 		} else
1319 			ret = DB_REP_UNAVAIL;
1320 		DB_ASSERT(env, ret != 0);
1321 		return (ret);
1322 
1323 	case REPMGR_SHARING:
1324 		if ((ret = __repmgr_queue_put(env, msg)) != 0)
1325 			return (ret);
1326 		/* Show that we no longer own this memory. */
1327 		msg = NULL;
1328 		break;
1329 
1330 	case REPMGR_PARM_REFRESH:
1331 		dbt = &conn->input.rep_message->v.gmdb_msg.request;
1332 		if ((ret = __repmgr_parm_refresh_unmarshal(env,
1333 		    &parms, dbt->data, dbt->size, NULL)) != 0)
1334 			return (DB_REP_UNAVAIL);
1335 		db_rep = env->rep_handle;
1336 		DB_ASSERT(env, conn->type == REP_CONNECTION &&
1337 		    IS_KNOWN_REMOTE_SITE(conn->eid));
1338 		site = SITE_FROM_EID(conn->eid);
1339 		site->ack_policy = (int)parms.ack_policy;
1340 		if (F_ISSET(&parms, ELECTABLE_SITE))
1341 			F_SET(site, SITE_ELECTABLE);
1342 		else
1343 			F_CLR(site, SITE_ELECTABLE);
1344 		F_SET(site, SITE_HAS_PRIO);
1345 		break;
1346 
1347 	case REPMGR_GM_FAILURE:
1348 	case REPMGR_GM_FORWARD:
1349 	case REPMGR_JOIN_REQUEST:
1350 	case REPMGR_JOIN_SUCCESS:
1351 	case REPMGR_REMOVE_REQUEST:
1352 	case REPMGR_RESOLVE_LIMBO:
1353 	default:
1354 		__db_errx(env, DB_STR_A("3677",
1355 		    "unexpected msg type %lu in process_own_msg", "%lu"),
1356 		    (u_long)REPMGR_OWN_MSG_TYPE(msg->msg_hdr));
1357 		return (DB_REP_UNAVAIL);
1358 	}
1359 	/*
1360 	 * If we haven't given ownership of the msg buffer to another thread,
1361 	 * free it now.
1362 	 */
1363 	if (msg != NULL)
1364 		__os_free(env, msg);
1365 	return (ret);
1366 }
1367 
1368 /*
1369  * Examine and verify the incoming version proposal message, and send an
1370  * appropriate response.
1371  */
1372 static int
send_version_response(env,conn)1373 send_version_response(env, conn)
1374 	ENV *env;
1375 	REPMGR_CONNECTION *conn;
1376 {
1377 	DB_REP *db_rep;
1378 	__repmgr_version_proposal_args versions;
1379 	__repmgr_version_confirmation_args conf;
1380 	repmgr_netaddr_t *my_addr;
1381 	char *hostname;
1382 	u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1];
1383 	DBT vi;
1384 	int ret;
1385 
1386 	db_rep = env->rep_handle;
1387 	my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
1388 
1389 	if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
1390 		return (ret);
1391 	if (vi.size == 0) {
1392 		/* No version info, so we must be talking to a v1 site. */
1393 		hostname = conn->input.repmgr_msg.rec.data;
1394 		if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1395 			return (ret);
1396 		if ((ret = __repmgr_send_v1_handshake(env,
1397 		    conn, my_addr->host, strlen(my_addr->host) + 1)) != 0)
1398 			return (ret);
1399 		conn->state = CONN_READY;
1400 	} else {
1401 		if ((ret = __repmgr_version_proposal_unmarshal(env,
1402 		    &versions, vi.data, vi.size, NULL)) != 0)
1403 			return (DB_REP_UNAVAIL);
1404 
1405 		if (DB_REPMGR_VERSION >= versions.min &&
1406 		    DB_REPMGR_VERSION <= versions.max)
1407 			conf.version = DB_REPMGR_VERSION;
1408 		else if (versions.max >= DB_REPMGR_MIN_VERSION &&
1409 		    versions.max <= DB_REPMGR_VERSION)
1410 			conf.version = versions.max;
1411 		else {
1412 			/*
1413 			 * User must have wired up a combination of versions
1414 			 * exceeding what we said we'd support.
1415 			 */
1416 			__db_errx(env, DB_STR_A("3622",
1417 			    "No available version between %lu and %lu",
1418 			    "%lu %lu"), (u_long)versions.min,
1419 			    (u_long)versions.max);
1420 			return (DB_REP_UNAVAIL);
1421 		}
1422 		conn->version = conf.version;
1423 
1424 		__repmgr_version_confirmation_marshal(env, &conf, buf);
1425 		buf[__REPMGR_VERSION_CONFIRMATION_SIZE] = '\0';
1426 		DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
1427 		if ((ret = __repmgr_send_handshake(env,
1428 		     conn, buf, sizeof(buf), 0)) != 0)
1429 			return (ret);
1430 
1431 		conn->state = CONN_PARAMETERS;
1432 	}
1433 	return (ret);
1434 }
1435 
1436 /*
1437  * Sends a version-aware handshake to the remote site, only after we've verified
1438  * that it is indeed version-aware.  We can send either v2 or v3 handshake,
1439  * depending on the connection's version.
1440  *
1441  * PUBLIC: int __repmgr_send_handshake __P((ENV *,
1442  * PUBLIC:     REPMGR_CONNECTION *, void *, size_t, u_int32_t));
1443  */
1444 int
__repmgr_send_handshake(env,conn,opt,optlen,flags)1445 __repmgr_send_handshake(env, conn, opt, optlen, flags)
1446 	ENV *env;
1447 	REPMGR_CONNECTION *conn;
1448 	void *opt;
1449 	size_t optlen;
1450 	u_int32_t flags;
1451 {
1452 	DB_REP *db_rep;
1453 	REP *rep;
1454 	DBT cntrl, rec;
1455 	__repmgr_handshake_args hs;
1456 	__repmgr_v2handshake_args v2hs;
1457 	__repmgr_v3handshake_args v3hs;
1458 	repmgr_netaddr_t *my_addr;
1459 	size_t hostname_len, rec_len;
1460 	void *buf;
1461 	u_int8_t *p;
1462 	u_int32_t cntrl_len;
1463 	int ret;
1464 
1465 	db_rep = env->rep_handle;
1466 	rep = db_rep->region;
1467 	my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
1468 
1469 	/*
1470 	 * The cntrl part has various parameters (varies by version).  The rec
1471 	 * part has the host name, followed by whatever optional extra data was
1472 	 * passed to us.
1473 	 *
1474 	 * Version awareness was introduced with protocol version 2 (so version
1475 	 * 1 is handled elsewhere).
1476 	 */
1477 	switch (conn->version) {
1478 	case 2:
1479 		cntrl_len = __REPMGR_V2HANDSHAKE_SIZE;
1480 		break;
1481 	case 3:
1482 		cntrl_len = __REPMGR_V3HANDSHAKE_SIZE;
1483 		break;
1484 	case 4:
1485 		cntrl_len = __REPMGR_HANDSHAKE_SIZE;
1486 		break;
1487 	default:
1488 		__db_errx(env, DB_STR_A("3678",
1489 		    "unexpected conn version %lu in send_handshake", "%lu"),
1490 		    (u_long)conn->version);
1491 		return (DB_REP_UNAVAIL);
1492 	}
1493 	hostname_len = strlen(my_addr->host);
1494 	rec_len = hostname_len + 1 +
1495 	    (opt == NULL ? 0 : optlen);
1496 
1497 	if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0)
1498 		return (ret);
1499 
1500 	cntrl.data = p = buf;
1501 	switch (conn->version) {
1502 	case 2:
1503 		/* Not allowed to use multi-process feature in v2 group. */
1504 		DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
1505 		v2hs.port = my_addr->port;
1506 		v2hs.priority = rep->priority;
1507 		__repmgr_v2handshake_marshal(env, &v2hs, p);
1508 		break;
1509 	case 3:
1510 		v3hs.port = my_addr->port;
1511 		v3hs.priority = rep->priority;
1512 		v3hs.flags = flags;
1513 		__repmgr_v3handshake_marshal(env, &v3hs, p);
1514 		break;
1515 	case 4:
1516 		hs.port = my_addr->port;
1517 		hs.alignment = MEM_ALIGN;
1518 		hs.ack_policy = (u_int32_t)rep->perm_policy;
1519 		hs.flags = flags;
1520 		if (rep->priority > 0)
1521 			F_SET(&hs, ELECTABLE_SITE);
1522 		__repmgr_handshake_marshal(env, &hs, p);
1523 		break;
1524 	default:
1525 		DB_ASSERT(env, FALSE);
1526 		break;
1527 	}
1528 	cntrl.size = cntrl_len;
1529 
1530 	p = rec.data = &p[cntrl_len];
1531 	(void)strcpy((char*)p, my_addr->host);
1532 	p += hostname_len + 1;
1533 	if (opt != NULL) {
1534 		memcpy(p, opt, optlen);
1535 		p += optlen;
1536 	}
1537 	rec.size = (u_int32_t)(p - (u_int8_t*)rec.data);
1538 
1539 	/* Never block on select thread: pass maxblock as 0. */
1540 	ret = __repmgr_send_one(env,
1541 	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0);
1542 	__os_free(env, buf);
1543 	return (ret);
1544 }
1545 
1546 static int
read_version_response(env,conn)1547 read_version_response(env, conn)
1548 	ENV *env;
1549 	REPMGR_CONNECTION *conn;
1550 {
1551 	DB_REP *db_rep;
1552 	__repmgr_version_confirmation_args conf;
1553 	DBT vi;
1554 	char *hostname;
1555 	u_int32_t flags;
1556 	int ret;
1557 
1558 	db_rep = env->rep_handle;
1559 
1560 	if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
1561 		return (ret);
1562 	hostname = conn->input.repmgr_msg.rec.data;
1563 	if (vi.size == 0) {
1564 		if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1565 			return (ret);
1566 	} else {
1567 		if ((ret = __repmgr_version_confirmation_unmarshal(env,
1568 		    &conf, vi.data, vi.size, NULL)) != 0)
1569 			return (DB_REP_UNAVAIL);
1570 		if (conf.version >= DB_REPMGR_MIN_VERSION &&
1571 		    conf.version <= DB_REPMGR_VERSION)
1572 			conn->version = conf.version;
1573 		else {
1574 			/*
1575 			 * Remote site "confirmed" a version outside of the
1576 			 * range we proposed.  It should never do that.
1577 			 */
1578 			__db_errx(env, DB_STR_A("3623",
1579 			    "Can't support confirmed version %lu", "%lu"),
1580 			    (u_long)conf.version);
1581 			return (DB_REP_UNAVAIL);
1582 		}
1583 
1584 		if ((ret = accept_handshake(env, conn, hostname)) != 0)
1585 			return (ret);
1586 		flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0;
1587 		if ((ret = __repmgr_send_handshake(env,
1588 		    conn, NULL, 0, flags)) != 0)
1589 			return (ret);
1590 	}
1591 	conn->state = CONN_READY;
1592 	return (ret);
1593 }
1594 
1595 /*
1596  * Examine the rec part of a handshake message to see if it has any version
1597  * information in it.  This is the magic that lets us allows version-aware sites
1598  * to exchange information, and yet avoids tripping up v1 sites, which don't
1599  * know how to look for it.
1600  *
1601  * PUBLIC: int __repmgr_find_version_info __P((ENV *,
1602  * PUBLIC:     REPMGR_CONNECTION *, DBT *));
1603  */
1604 int
__repmgr_find_version_info(env,conn,vi)1605 __repmgr_find_version_info(env, conn, vi)
1606 	ENV *env;
1607 	REPMGR_CONNECTION *conn;
1608 	DBT *vi;
1609 {
1610 	DBT *dbt;
1611 	char *hostname;
1612 	u_int32_t hostname_len;
1613 
1614 	dbt = &conn->input.repmgr_msg.rec;
1615 	if (dbt->size == 0) {
1616 		__db_errx(env, DB_STR("3624",
1617 		    "handshake is missing rec part"));
1618 		return (DB_REP_UNAVAIL);
1619 	}
1620 	hostname = dbt->data;
1621 	hostname[dbt->size-1] = '\0';
1622 	hostname_len = (u_int32_t)strlen(hostname);
1623 	if (hostname_len + 1 == dbt->size) {
1624 		/*
1625 		 * The rec DBT held only the host name.  This is a simple legacy
1626 		 * V1 handshake; it contains no version information.
1627 		 */
1628 		vi->size = 0;
1629 	} else {
1630 		/*
1631 		 * There's more data than just the host name.  The remainder is
1632 		 * available to be treated as a normal byte buffer (and read in
1633 		 * by one of the unmarshal functions).  Note that the remaining
1634 		 * length should not include the padding byte that we have
1635 		 * already clobbered.
1636 		 */
1637 		vi->data = &((u_int8_t *)dbt->data)[hostname_len + 1];
1638 		vi->size = (dbt->size - (hostname_len+1)) - 1;
1639 	}
1640 	return (0);
1641 }
1642 
1643 static int
accept_handshake(env,conn,hostname)1644 accept_handshake(env, conn, hostname)
1645 	ENV *env;
1646 	REPMGR_CONNECTION *conn;
1647 	char *hostname;
1648 {
1649 	__repmgr_handshake_args hs;
1650 	__repmgr_v2handshake_args hs2;
1651 	__repmgr_v3handshake_args hs3;
1652 	u_int port;
1653 	u_int32_t ack, flags;
1654 	int electable;
1655 
1656 	switch (conn->version) {
1657 	case 2:
1658 		if (__repmgr_v2handshake_unmarshal(env, &hs2,
1659 		    conn->input.repmgr_msg.cntrl.data,
1660 		    conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1661 			return (DB_REP_UNAVAIL);
1662 		port = hs2.port;
1663 		electable = hs2.priority > 0;
1664 		ack = flags = 0;
1665 		break;
1666 	case 3:
1667 		if (__repmgr_v3handshake_unmarshal(env, &hs3,
1668 		   conn->input.repmgr_msg.cntrl.data,
1669 		   conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1670 			return (DB_REP_UNAVAIL);
1671 		port = hs3.port;
1672 		electable = hs3.priority > 0;
1673 		flags = hs3.flags;
1674 		ack = 0;
1675 		break;
1676 	case 4:
1677 		if (__repmgr_handshake_unmarshal(env, &hs,
1678 		   conn->input.repmgr_msg.cntrl.data,
1679 		   conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1680 			return (DB_REP_UNAVAIL);
1681 		port = hs.port;
1682 		electable = F_ISSET(&hs, ELECTABLE_SITE);
1683 		flags = hs.flags;
1684 		ack = hs.ack_policy;
1685 		break;
1686 	default:
1687 		__db_errx(env, DB_STR_A("3679",
1688 		    "unexpected conn version %lu in accept_handshake", "%lu"),
1689 		    (u_long)conn->version);
1690 		return (DB_REP_UNAVAIL);
1691 	}
1692 
1693 	return (process_parameters(env,
1694 	    conn, hostname, port, ack, electable, flags));
1695 }
1696 
1697 static int
accept_v1_handshake(env,conn,hostname)1698 accept_v1_handshake(env, conn, hostname)
1699 	ENV *env;
1700 	REPMGR_CONNECTION *conn;
1701 	char *hostname;
1702 {
1703 	DB_REPMGR_V1_HANDSHAKE *handshake;
1704 	u_int32_t prio;
1705 	int electable;
1706 
1707 	handshake = conn->input.repmgr_msg.cntrl.data;
1708 	if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) ||
1709 	    handshake->version != 1) {
1710 		__db_errx(env, DB_STR("3625", "malformed V1 handshake"));
1711 		return (DB_REP_UNAVAIL);
1712 	}
1713 
1714 	conn->version = 1;
1715 	prio = ntohl(handshake->priority);
1716 	electable = prio > 0;
1717 	return (process_parameters(env,
1718 	    conn, hostname, handshake->port, 0, electable, 0));
1719 }
1720 
1721 /* Caller must hold mutex. */
1722 static int
process_parameters(env,conn,host,port,ack,electable,flags)1723 process_parameters(env, conn, host, port, ack, electable, flags)
1724 	ENV *env;
1725 	REPMGR_CONNECTION *conn;
1726 	char *host;
1727 	u_int port;
1728 	int electable;
1729 	u_int32_t ack, flags;
1730 {
1731 	DB_REP *db_rep;
1732 	REPMGR_RETRY *retry;
1733 	REPMGR_SITE *site;
1734 	__repmgr_connect_reject_args reject;
1735 	u_int8_t reject_buf[__REPMGR_CONNECT_REJECT_SIZE];
1736 	int eid, ret;
1737 
1738 	db_rep = env->rep_handle;
1739 
1740 	/* Connection state can be used to discern incoming versus outgoing. */
1741 	if (conn->state == CONN_CONNECTED) {
1742 		/*
1743 		 * Since we initiated this as an outgoing connection, we
1744 		 * obviously already know the host, port and site.  We just need
1745 		 * the other site's electability flag (which we'll grab below,
1746 		 * after the big "else" clause).
1747 		 */
1748 		DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
1749 		site = SITE_FROM_EID(conn->eid);
1750 		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1751 		    "handshake from connection to %s:%lu EID %u",
1752 		    site->net_addr.host,
1753 		    (u_long)site->net_addr.port, conn->eid));
1754 	} else {
1755 		DB_ASSERT(env, conn->state == CONN_NEGOTIATE ||
1756 		    conn->state == CONN_PARAMETERS);
1757 		/*
1758 		 * Incoming connection: until now we haven't known what kind of
1759 		 * connection we're dealing with (and in the case of a
1760 		 * REP_CONNECTION, what its EID is); so it must be on the
1761 		 * "orphans" list.  But now that we've received the parameters
1762 		 * we'll be able to figure all that out.
1763 		 */
1764 		if (LF_ISSET(APP_CHANNEL_CONNECTION)) {
1765 			conn->type = APP_CONNECTION;
1766 			return (0);
1767 		} else
1768 			conn->type = REP_CONNECTION;
1769 
1770 		/*
1771 		 * Now that we've been given the host and port, use them to find
1772 		 * the site.
1773 		 */
1774 		if ((site = __repmgr_lookup_site(env, host, port)) != NULL &&
1775 		    site->membership == SITE_PRESENT) {
1776 			TAILQ_REMOVE(&db_rep->connections, conn, entries);
1777 			conn->ref_count--;
1778 
1779 			eid = EID_FROM_SITE(site);
1780 			if (LF_ISSET(REPMGR_SUBORDINATE)) {
1781 				/*
1782 				 * Accept it, as a supplementary source of
1783 				 * input, but nothing else.
1784 				 */
1785 				TAILQ_INSERT_TAIL(&site->sub_conns,
1786 				    conn, entries);
1787 				conn->eid = eid;
1788 			} else {
1789 				DB_EVENT(env,
1790 				    DB_EVENT_REP_CONNECT_ESTD, &eid);
1791 				switch (site->state) {
1792 				case SITE_PAUSING:
1793 					RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1794 				      "handshake from paused site %s:%u EID %u",
1795 					    host, port, eid));
1796 					retry = site->ref.retry;
1797 					TAILQ_REMOVE(&db_rep->retries,
1798 					    retry, entries);
1799 					__os_free(env, retry);
1800 					break;
1801 				case SITE_CONNECTED:
1802 					/*
1803 					 * We got an incoming connection for a
1804 					 * site we were already connected to; at
1805 					 * least we thought we were.
1806 					 */
1807 					RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1808 			 "connection from %s:%u EID %u while already connected",
1809 					    host, port, eid));
1810 					if ((ret = resolve_collision(env,
1811 					    site, conn)) != 0)
1812 						return (ret);
1813 					break;
1814 				case SITE_CONNECTING:
1815 					RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1816 				  "handshake from connecting site %s:%u EID %u",
1817 					    host, port, eid));
1818 					/*
1819 					 * Connector thread will give up when it
1820 					 * sees this site's state change, so we
1821 					 * don't have to do anything else here.
1822 					 */
1823 					break;
1824 				default:
1825 					DB_ASSERT(env, FALSE);
1826 				}
1827 				conn->eid = eid;
1828 				site->state = SITE_CONNECTED;
1829 				site->ref.conn.in = conn;
1830 				__os_gettime(env,
1831 				    &site->last_rcvd_timestamp, 1);
1832 			}
1833 		} else {
1834 			RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1835 		  "rejecting connection from unknown or provisional site %s:%u",
1836 			    host, port));
1837 			reject.version = db_rep->membership_version;
1838 			reject.gen = db_rep->member_version_gen;
1839 			__repmgr_connect_reject_marshal(env,
1840 			    &reject, reject_buf);
1841 
1842 			if ((ret = __repmgr_send_own_msg(env, conn,
1843 			    REPMGR_CONNECT_REJECT, reject_buf,
1844 			    __REPMGR_CONNECT_REJECT_SIZE)) != 0)
1845 				return (ret);
1846 
1847 			/*
1848 			 * Since we haven't set conn->eid, bust_connection will
1849 			 * not schedule a retry for this "failure", which is
1850 			 * exactly what we want.
1851 			 */
1852 			return (DB_REP_UNAVAIL);
1853 		}
1854 	}
1855 
1856 	if (electable)
1857 		F_SET(site, SITE_ELECTABLE);
1858 	else
1859 		F_CLR(site, SITE_ELECTABLE);
1860 	F_SET(site, SITE_HAS_PRIO);
1861 	site->ack_policy = (int)ack;
1862 
1863 	/*
1864 	 * If we're moping around wishing we knew who the master was, then
1865 	 * getting in touch with another site might finally provide sufficient
1866 	 * connectivity to find out.
1867 	 */
1868 	if (!IS_SUBORDINATE(db_rep) && /* us */
1869 	    !__repmgr_master_is_known(env) &&
1870 	    !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */
1871 		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1872 		    "handshake with no known master to wake election thread"));
1873 		db_rep->new_connection = TRUE;
1874 		if ((ret = __repmgr_signal(&db_rep->check_election)) != 0)
1875 			return (ret);
1876 	}
1877 
1878 	return (0);
1879 }
1880 
1881 static int
resolve_collision(env,site,conn)1882 resolve_collision(env, site, conn)
1883 	ENV *env;
1884 	REPMGR_SITE *site;
1885 	REPMGR_CONNECTION *conn;
1886 {
1887 	int ret;
1888 
1889 	/*
1890 	 * No need for site-oriented recovery, since we now have a replacement
1891 	 * connection; so skip bust_connection() and call disable_conn()
1892 	 * directly.
1893 	 *
1894 	 * If we already had an incoming connection, this new one always
1895 	 * replaces it.  Whether it also/alternatively replaces an outgoing
1896 	 * connection depends on whether we're client or server (so as to avoid
1897 	 * connection collisions resulting in no remaining connections).  (If
1898 	 * it's an older version that doesn't know about our collision
1899 	 * resolution protocol, it will behave like a client.)
1900 	 */
1901 	if (site->ref.conn.in != NULL) {
1902 		ret = __repmgr_disable_connection(env, site->ref.conn.in);
1903 		site->ref.conn.in = NULL;
1904 		if (ret != 0)
1905 			return (ret);
1906 	}
1907 	if (site->ref.conn.out != NULL &&
1908 	    conn->version >= CONN_COLLISION_VERSION &&
1909 	    __repmgr_is_server(env, site)) {
1910 		ret = __repmgr_disable_connection(env, site->ref.conn.out);
1911 		site->ref.conn.out = NULL;
1912 		if (ret != 0)
1913 			return (ret);
1914 	}
1915 	return (0);
1916 }
1917 
1918 static int
record_permlsn(env,conn)1919 record_permlsn(env, conn)
1920 	ENV *env;
1921 	REPMGR_CONNECTION *conn;
1922 {
1923 	DB_REP *db_rep;
1924 	REPMGR_SITE *site;
1925 	__repmgr_permlsn_args *ackp, ack;
1926 	SITE_STRING_BUFFER location;
1927 	u_int32_t gen;
1928 	int ret;
1929 	u_int do_log_check;
1930 
1931 	db_rep = env->rep_handle;
1932 	do_log_check = 0;
1933 
1934 	if (conn->version == 0 ||
1935 	    !IS_READY_STATE(conn->state) || !IS_VALID_EID(conn->eid)) {
1936 		__db_errx(env, DB_STR("3682",
1937 		    "unexpected connection info in record_permlsn"));
1938 		return (DB_REP_UNAVAIL);
1939 	}
1940 	site = SITE_FROM_EID(conn->eid);
1941 
1942 	/*
1943 	 * Extract the LSN.  Save it only if it is an improvement over what the
1944 	 * site has already ack'ed.
1945 	 */
1946 	if (conn->version == 1) {
1947 		ackp = conn->input.repmgr_msg.cntrl.data;
1948 		if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) ||
1949 		    conn->input.repmgr_msg.rec.size != 0) {
1950 			__db_errx(env, DB_STR("3627", "bad ack msg size"));
1951 			return (DB_REP_UNAVAIL);
1952 		}
1953 	} else {
1954 		ackp = &ack;
1955 		if ((ret = __repmgr_permlsn_unmarshal(env, ackp,
1956 			 conn->input.repmgr_msg.cntrl.data,
1957 			 conn->input.repmgr_msg.cntrl.size, NULL)) != 0)
1958 			return (DB_REP_UNAVAIL);
1959 	}
1960 
1961 	/* Ignore stale acks. */
1962 	gen = db_rep->region->gen;
1963 	if (ackp->generation < gen) {
1964 		VPRINT(env, (env, DB_VERB_REPMGR_MISC,
1965 		    "ignoring stale ack (%lu<%lu), from %s",
1966 		     (u_long)ackp->generation, (u_long)gen,
1967 		     __repmgr_format_site_loc(site, location)));
1968 		return (0);
1969 	}
1970 	VPRINT(env, (env, DB_VERB_REPMGR_MISC,
1971 	    "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file,
1972 	    (u_long)ackp->lsn.offset, (u_long)ackp->generation,
1973 	    __repmgr_format_site_loc(site, location)));
1974 
1975 	if (ackp->generation == gen &&
1976 	    LOG_COMPARE(&ackp->lsn, &site->max_ack) == 1) {
1977 		/*
1978 		 * If file number for this site changed, check lowest log
1979 		 * file needed after recording new permlsn for this site.
1980 		 */
1981 		if (ackp->lsn.file > site->max_ack.file)
1982 			do_log_check = 1;
1983 		memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
1984 		if (do_log_check)
1985 			check_min_log_file(env);
1986 		if ((ret = __repmgr_wake_waiters(env,
1987 		    &db_rep->ack_waiters)) != 0)
1988 			return (ret);
1989 	}
1990 	return (0);
1991 }
1992 
1993 /*
1994  * Maintains lowest log file still needed by the repgroup.  This is stored
1995  * in shared rep region so that it is accessible to repmgr subordinate
1996  * processes that may not themselves have connections to other sites
1997  * (e.g. a separate db_archive process.)
1998  */
1999 static void
check_min_log_file(env)2000 check_min_log_file(env)
2001 	ENV *env;
2002 {
2003 	DB_REP *db_rep;
2004 	REP *rep;
2005 	REPMGR_CONNECTION *conn;
2006 	REPMGR_SITE *site;
2007 	u_int32_t min_log;
2008 	int eid;
2009 
2010 	db_rep = env->rep_handle;
2011 	rep = db_rep->region;
2012 	min_log = 0;
2013 
2014 	/*
2015 	 * Record the lowest log file number from all connected sites.  If this
2016 	 * is a client, ignore the master because the master does not maintain
2017 	 * nor send out its repmgr perm LSN in this way.  Consider connections
2018 	 * so that we don't allow a site that has been down a long time to
2019 	 * indefinitely prevent log archiving.
2020 	 */
2021 	FOR_EACH_REMOTE_SITE_INDEX(eid) {
2022 		if (eid == rep->master_id)
2023 			continue;
2024 		site = SITE_FROM_EID(eid);
2025 		if (site->state == SITE_CONNECTED &&
2026 		    (((conn = site->ref.conn.in) != NULL &&
2027 		    conn->state == CONN_READY) ||
2028 		    ((conn = site->ref.conn.out) != NULL &&
2029 		    conn->state == CONN_READY)) &&
2030 		    !IS_ZERO_LSN(site->max_ack) &&
2031 		    (min_log == 0 || site->max_ack.file < min_log))
2032 			min_log = site->max_ack.file;
2033 	}
2034 	/*
2035 	 * During normal operation min_log should increase over time, but it
2036 	 * is possible if a site returns after being disconnected for a while
2037 	 * that min_log could decrease.
2038 	 */
2039 	if (min_log != 0 && min_log != rep->min_log_file)
2040 		rep->min_log_file = min_log;
2041 }
2042 
2043 /*
2044  * PUBLIC: int __repmgr_write_some __P((ENV *, REPMGR_CONNECTION *));
2045  */
2046 int
__repmgr_write_some(env,conn)2047 __repmgr_write_some(env, conn)
2048 	ENV *env;
2049 	REPMGR_CONNECTION *conn;
2050 {
2051 	QUEUED_OUTPUT *output;
2052 	REPMGR_FLAT *msg;
2053 	int bytes, ret;
2054 
2055 	while (!STAILQ_EMPTY(&conn->outbound_queue)) {
2056 		output = STAILQ_FIRST(&conn->outbound_queue);
2057 		msg = output->msg;
2058 		if ((bytes = sendsocket(conn->fd, &msg->data[output->offset],
2059 		    msg->length - output->offset, 0)) == SOCKET_ERROR) {
2060 			switch (ret = net_errno) {
2061 			case WOULDBLOCK:
2062 #if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
2063 			case DB_REPMGR_EAGAIN:
2064 #endif
2065 				return (0);
2066 			default:
2067 				__repmgr_fire_conn_err_event(env, conn, ret);
2068 				STAT(env->rep_handle->
2069 				    region->mstat.st_connection_drop++);
2070 				return (DB_REP_UNAVAIL);
2071 			}
2072 		}
2073 
2074 		if ((output->offset += (size_t)bytes) >= msg->length) {
2075 			STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
2076 			__os_free(env, output);
2077 			conn->out_queue_length--;
2078 			if (--msg->ref_count <= 0)
2079 				__os_free(env, msg);
2080 
2081 			/*
2082 			 * We've achieved enough movement to free up at least
2083 			 * one space in the outgoing queue.  Wake any message
2084 			 * threads that may be waiting for space.  Leave
2085 			 * CONGESTED state so that when the queue reaches the
2086 			 * high-water mark again, the filling thread will be
2087 			 * allowed to try waiting again.
2088 			 */
2089 			conn->state = CONN_READY;
2090 			if ((ret = __repmgr_signal(&conn->drained)) != 0)
2091 				return (ret);
2092 		}
2093 	}
2094 
2095 	return (0);
2096 }
2097