1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2005, 2013 Oracle and/or its affiliates.  All rights reserved.
5  *
6  * $Id$
7  */
8 
9 #include "db_config.h"
10 
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/txn.h"
15 
/*
 * Initial number of slots allocated both for the process-local sites array
 * (__repmgr_new_site) and for the shared-region SITEINFO array
 * (__repmgr_share_netaddrs); each is doubled whenever it fills up.
 */
#define	INITIAL_SITES_ALLOCATION	3	     /* Arbitrary guess. */

/*
 * File-local helpers.  get_eid and read_gmdb are presumably defined later in
 * this file (their bodies are not visible in this section).
 */
static int get_eid __P((ENV *, const char *, u_int, int *));
static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *));
static int read_gmdb __P((ENV *, DB_THREAD_INFO *, u_int8_t **, size_t *));
21 
22 /*
23  * Schedules a future attempt to re-establish a connection with the given site.
24  * Usually, we wait the configured retry_wait period.  But if the "immediate"
25  * parameter is given as TRUE, we'll make the wait time 0, and put the request
26  * at the _beginning_ of the retry queue.
27  *
28  * PUBLIC: int __repmgr_schedule_connection_attempt __P((ENV *, int, int));
29  *
30  * !!!
31  * Caller should hold mutex.
32  *
33  * Unless an error occurs, we always attempt to wake the main thread;
34  * __repmgr_bust_connection relies on this behavior.
35  */
36 int
__repmgr_schedule_connection_attempt(env,eid,immediate)37 __repmgr_schedule_connection_attempt(env, eid, immediate)
38 	ENV *env;
39 	int eid;
40 	int immediate;
41 {
42 	DB_REP *db_rep;
43 	REP *rep;
44 	REPMGR_RETRY *retry, *target;
45 	REPMGR_SITE *site;
46 	db_timespec t;
47 	int ret;
48 
49 	db_rep = env->rep_handle;
50 	rep = db_rep->region;
51 	if ((ret = __os_malloc(env, sizeof(*retry), &retry)) != 0)
52 		return (ret);
53 
54 	DB_ASSERT(env, IS_VALID_EID(eid));
55 	site = SITE_FROM_EID(eid);
56 	__os_gettime(env, &t, 1);
57 	if (immediate)
58 		TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries);
59 	else {
60 		TIMESPEC_ADD_DB_TIMEOUT(&t, rep->connection_retry_wait);
61 		/*
62 		 * Insert the new "retry" on the (time-ordered) list in its
63 		 * proper position.  To do so, find the list entry ("target")
64 		 * with a later time; insert the new entry just before that.
65 		 */
66 		TAILQ_FOREACH(target, &db_rep->retries, entries) {
67 			if (timespeccmp(&target->time, &t, >))
68 				break;
69 		}
70 		if (target == NULL)
71 			TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries);
72 		else
73 			TAILQ_INSERT_BEFORE(target, retry, entries);
74 
75 	}
76 	retry->eid = eid;
77 	retry->time = t;
78 
79 	site->state = SITE_PAUSING;
80 	site->ref.retry = retry;
81 
82 	return (__repmgr_wake_main_thread(env));
83 }
84 
85 /*
86  * Determines whether a remote site should be considered a "server" to us as a
87  * "client" (in typical client/server terminology, not to be confused with our
88  * usual use of the term "client" as in the master/client replication role), or
89  * vice versa.
90  *
91  * PUBLIC: int __repmgr_is_server __P((ENV *, REPMGR_SITE *));
92  */
93 int
__repmgr_is_server(env,site)94 __repmgr_is_server(env, site)
95 	ENV *env;
96 	REPMGR_SITE *site;
97 {
98 	DB_REP *db_rep;
99 	int cmp;
100 
101 	db_rep = env->rep_handle;
102 	cmp = __repmgr_addrcmp(&site->net_addr,
103 	    &SITE_FROM_EID(db_rep->self_eid)->net_addr);
104 	DB_ASSERT(env, cmp != 0);
105 
106 	/*
107 	 * The mnemonic here is that a server conventionally has a
108 	 * small well-known port number, while clients typically use a port
109 	 * number from the higher ephemeral range.  So, for the remote site to
110 	 * be considered a server, its address should have compared as lower
111 	 * than ours.
112 	 */
113 	return (cmp == -1);
114 }
115 
116 /*
117  * Compare two network addresses (lexicographically), and return -1, 0, or 1, as
118  * the first is less than, equal to, or greater than the second.
119  */
120 static int
__repmgr_addrcmp(addr1,addr2)121 __repmgr_addrcmp(addr1, addr2)
122 	repmgr_netaddr_t *addr1, *addr2;
123 {
124 	int cmp;
125 
126 	cmp = strcmp(addr1->host, addr2->host);
127 	if (cmp != 0)
128 		return (cmp);
129 
130 	if (addr1->port < addr2->port)
131 		return (-1);
132 	else if (addr1->port > addr2->port)
133 		return (1);
134 	return (0);
135 }
136 
137 /*
138  * Initialize the necessary control structures to begin reading a new input
139  * message.
140  *
141  * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *));
142  */
143 void
__repmgr_reset_for_reading(con)144 __repmgr_reset_for_reading(con)
145 	REPMGR_CONNECTION *con;
146 {
147 	con->reading_phase = SIZES_PHASE;
148 	__repmgr_iovec_init(&con->iovecs);
149 	__repmgr_add_buffer(&con->iovecs,
150 	    con->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE);
151 }
152 
153 /*
154  * Constructs a DB_REPMGR_CONNECTION structure.
155  *
156  * PUBLIC: int __repmgr_new_connection __P((ENV *,
157  * PUBLIC:     REPMGR_CONNECTION **, socket_t, int));
158  */
159 int
__repmgr_new_connection(env,connp,s,state)160 __repmgr_new_connection(env, connp, s, state)
161 	ENV *env;
162 	REPMGR_CONNECTION **connp;
163 	socket_t s;
164 	int state;
165 {
166 	REPMGR_CONNECTION *c;
167 	int ret;
168 
169 	if ((ret = __os_calloc(env, 1, sizeof(REPMGR_CONNECTION), &c)) != 0)
170 		return (ret);
171 	if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
172 		__os_free(env, c);
173 		return (ret);
174 	}
175 	if ((ret = __repmgr_init_waiters(env, &c->response_waiters)) != 0) {
176 		(void)__repmgr_free_cond(&c->drained);
177 		__os_free(env, c);
178 		return (ret);
179 	}
180 
181 	c->fd = s;
182 	c->state = state;
183 	c->type = UNKNOWN_CONN_TYPE;
184 #ifdef DB_WIN32
185 	c->event_object = WSA_INVALID_EVENT;
186 #endif
187 
188 	STAILQ_INIT(&c->outbound_queue);
189 	c->out_queue_length = 0;
190 
191 	__repmgr_reset_for_reading(c);
192 	*connp = c;
193 
194 	return (0);
195 }
196 
197 /*
198  * PUBLIC: int __repmgr_set_keepalive __P((ENV *, REPMGR_CONNECTION *));
199  */
200 int
__repmgr_set_keepalive(env,conn)201 __repmgr_set_keepalive(env, conn)
202 	ENV *env;
203 	REPMGR_CONNECTION *conn;
204 {
205 	int ret, sockopt;
206 
207 	ret = 0;
208 #ifdef SO_KEEPALIVE
209 	sockopt = 1;
210 	if (setsockopt(conn->fd, SOL_SOCKET,
211 	    SO_KEEPALIVE, (sockopt_t)&sockopt, sizeof(sockopt)) != 0) {
212 		ret = net_errno;
213 		__db_err(env, ret, DB_STR("3626",
214 			"can't set KEEPALIVE socket option"));
215 		(void)__repmgr_destroy_conn(env, conn);
216 	}
217 #endif
218 	return (ret);
219 }
220 
/*
 * PUBLIC: int __repmgr_new_site __P((ENV *, REPMGR_SITE**,
 * PUBLIC:     const char *, u_int));
 *
 * Manipulates the process-local copy of the sites list.  So, callers should
 * hold the db_rep->mutex (except for single-threaded, pre-open configuration).
 *
 * Appends a new entry for host:port at index db_rep->site_cnt (which becomes
 * the new site's EID), growing the array first if it is full.  The host
 * string is duplicated, so the caller keeps ownership of "host".  The new
 * entry starts out SITE_IDLE with zeroed flags, membership and config, an
 * empty sub_conns list, and no connections.  On success the new entry is
 * returned through "sitep".
 *
 * NOTE: growing the array moves every existing REPMGR_SITE, so any
 * REPMGR_SITE pointer a caller obtained before this call may be stale after it.
 */
int
__repmgr_new_site(env, sitep, host, port)
	ENV *env;
	REPMGR_SITE **sitep;
	const char *host;
	u_int port;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site, *sites;
	char *p;
	u_int i, new_site_max;
	int ret;

	db_rep = env->rep_handle;
	if (db_rep->site_cnt >= db_rep->site_max) {
		/* Array is full: start at INITIAL_SITES_ALLOCATION, else double. */
		new_site_max = db_rep->site_max == 0 ?
		    INITIAL_SITES_ALLOCATION : db_rep->site_max * 2;
		if ((ret = __os_malloc(env,
		     sizeof(REPMGR_SITE) * new_site_max, &sites)) != 0)
			 return (ret);
		if (db_rep->site_max > 0) {
			/*
			 * For each site in the array, copy the old struct to
			 * the space allocated for the new struct.  But the
			 * sub_conns list header (and one of the conn structs on
			 * the list, if any) contain pointers to the address of
			 * the old list header; so we have to move them
			 * explicitly.  If not for that, we could use a simple
			 * __os_realloc() call.
			 */
			for (i = 0; i < db_rep->site_cnt; i++) {
				sites[i] = db_rep->sites[i];
				/* Discard the stale copied head, then re-link. */
				TAILQ_INIT(&sites[i].sub_conns);
				while (!TAILQ_EMPTY(
				    &db_rep->sites[i].sub_conns)) {
					conn = TAILQ_FIRST(
					    &db_rep->sites[i].sub_conns);
					TAILQ_REMOVE(
					    &db_rep->sites[i].sub_conns,
					    conn, entries);
					TAILQ_INSERT_TAIL(&sites[i].sub_conns,
					    conn, entries);
				}
			}
			__os_free(env, db_rep->sites);
		}
		db_rep->sites = sites;
		db_rep->site_max = new_site_max;
	}
	if ((ret = __os_strdup(env, host, &p)) != 0) {
		/* No harm in leaving the increased site_max intact. */
		return (ret);
	}
	site = &db_rep->sites[db_rep->site_cnt++];

	site->net_addr.host = p;
	/* Ports are stored as 16 bits; the u_int argument is narrowed here. */
	site->net_addr.port = (u_int16_t)port;

	ZERO_LSN(site->max_ack);
	site->ack_policy = 0;
	site->alignment = 0;
	site->flags = 0;
	timespecclear(&site->last_rcvd_timestamp);
	TAILQ_INIT(&site->sub_conns);
	site->connector = NULL;
	site->ref.conn.in = site->ref.conn.out = NULL;
	site->state = SITE_IDLE;

	site->membership = 0;
	site->config = 0;

	*sitep = site;
	return (0);
}
303 
304 /*
305  * PUBLIC: int __repmgr_create_mutex __P((ENV *, mgr_mutex_t **));
306  */
307 int
__repmgr_create_mutex(env,mtxp)308 __repmgr_create_mutex(env, mtxp)
309 	ENV *env;
310 	mgr_mutex_t **mtxp;
311 {
312 	mgr_mutex_t *mtx;
313 	int ret;
314 
315 	if ((ret = __os_malloc(env, sizeof(mgr_mutex_t), &mtx)) == 0 &&
316 	    (ret = __repmgr_create_mutex_pf(mtx)) != 0) {
317 		__os_free(env, mtx);
318 	}
319 	if (ret == 0)
320 		*mtxp = mtx;
321 	return (ret);
322 }
323 
324 /*
325  * PUBLIC: int __repmgr_destroy_mutex __P((ENV *, mgr_mutex_t *));
326  */
327 int
__repmgr_destroy_mutex(env,mtx)328 __repmgr_destroy_mutex(env, mtx)
329 	ENV *env;
330 	mgr_mutex_t *mtx;
331 {
332 	int ret;
333 
334 	ret = __repmgr_destroy_mutex_pf(mtx);
335 	__os_free(env, mtx);
336 	return (ret);
337 }
338 
339 /*
340  * Kind of like a destructor for a repmgr_netaddr_t: cleans up any subordinate
341  * allocated memory pointed to by the addr, though it does not free the struct
342  * itself.
343  *
344  * PUBLIC: void __repmgr_cleanup_netaddr __P((ENV *, repmgr_netaddr_t *));
345  */
346 void
__repmgr_cleanup_netaddr(env,addr)347 __repmgr_cleanup_netaddr(env, addr)
348 	ENV *env;
349 	repmgr_netaddr_t *addr;
350 {
351 	if (addr->host != NULL) {
352 		__os_free(env, addr->host);
353 		addr->host = NULL;
354 	}
355 }
356 
357 /*
358  * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *));
359  */
360 void
__repmgr_iovec_init(v)361 __repmgr_iovec_init(v)
362 	REPMGR_IOVECS *v;
363 {
364 	v->offset = v->count = 0;
365 	v->total_bytes = 0;
366 }
367 
368 /*
369  * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t));
370  *
371  * !!!
372  * There is no checking for overflow of the vectors[5] array.
373  */
374 void
__repmgr_add_buffer(v,address,length)375 __repmgr_add_buffer(v, address, length)
376 	REPMGR_IOVECS *v;
377 	void *address;
378 	size_t length;
379 {
380 	if (length > 0) {
381 		v->vectors[v->count].iov_base = address;
382 		v->vectors[v->count++].iov_len = (u_long)length;
383 		v->total_bytes += length;
384 	}
385 }
386 
387 /*
388  * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *));
389  */
390 void
__repmgr_add_dbt(v,dbt)391 __repmgr_add_dbt(v, dbt)
392 	REPMGR_IOVECS *v;
393 	const DBT *dbt;
394 {
395 	if (dbt->size > 0) {
396 		v->vectors[v->count].iov_base = dbt->data;
397 		v->vectors[v->count++].iov_len = dbt->size;
398 		v->total_bytes += dbt->size;
399 	}
400 }
401 
/*
 * Update a set of iovecs to reflect the number of bytes transferred in an I/O
 * operation, so that the iovecs can be used to continue transferring where we
 * left off.
 *     Returns TRUE if the set of buffers is now fully consumed, FALSE if more
 * remains.
 *
 * The caller must pass a byte_count that is greater than zero and no larger
 * than the number of bytes still outstanding from v->offset onward (a
 * DB_ASSERT in the loop checks this).  Vectors before v->offset were already
 * fully consumed and are not revisited.  A partially consumed vector has its
 * iov_base/iov_len adjusted in place.
 *
 * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t));
 */
int
__repmgr_update_consumed(v, byte_count)
	REPMGR_IOVECS *v;
	size_t byte_count;
{
	db_iovec_t *iov;
	int i;

	/* No loop bound: the else-branch always returns once bytes run out. */
	for (i = v->offset; ; i++) {
		DB_ASSERT(NULL, i < v->count && byte_count > 0);
		iov = &v->vectors[i];
		if (byte_count > iov->iov_len) {
			/*
			 * We've consumed (more than) this vector's worth.
			 * Adjust count and continue.
			 */
			byte_count -= iov->iov_len;
		} else {
			/*
			 * Adjust length of remaining portion of vector.
			 * byte_count can never be greater than iov_len, or we
			 * would not be in this section of the if clause.
			 */
			iov->iov_len -= (u_int32_t)byte_count;
			if (iov->iov_len > 0) {
				/*
				 * Still some left in this vector.  Adjust base
				 * address too, and leave offset pointing here.
				 */
				iov->iov_base = (void *)
				    ((u_int8_t *)iov->iov_base + byte_count);
				v->offset = i;
			} else {
				/*
				 * Consumed exactly to a vector boundary.
				 * Advance to next vector for next time.
				 */
				v->offset = i+1;
			}
			/*
			 * If offset has reached count, the entire thing is
			 * consumed.
			 */
			return (v->offset >= v->count);
		}
	}
}
458 
459 /*
460  * Builds a buffer containing our network address information, suitable for
461  * publishing as cdata via a call to rep_start, and sets up the given DBT to
462  * point to it.  The buffer is dynamically allocated memory, and the caller must
463  * assume responsibility for it.
464  *
465  * PUBLIC: int __repmgr_prepare_my_addr __P((ENV *, DBT *));
466  */
467 int
__repmgr_prepare_my_addr(env,dbt)468 __repmgr_prepare_my_addr(env, dbt)
469 	ENV *env;
470 	DBT *dbt;
471 {
472 	DB_REP *db_rep;
473 	repmgr_netaddr_t addr;
474 	size_t size, hlen;
475 	u_int16_t port_buffer;
476 	u_int8_t *ptr;
477 	int ret;
478 
479 	db_rep = env->rep_handle;
480 	LOCK_MUTEX(db_rep->mutex);
481 	addr = SITE_FROM_EID(db_rep->self_eid)->net_addr;
482 	UNLOCK_MUTEX(db_rep->mutex);
483 	/*
484 	 * The cdata message consists of the 2-byte port number, in network byte
485 	 * order, followed by the null-terminated host name string.
486 	 */
487 	port_buffer = htons(addr.port);
488 	size = sizeof(port_buffer) + (hlen = strlen(addr.host) + 1);
489 	if ((ret = __os_malloc(env, size, &ptr)) != 0)
490 		return (ret);
491 
492 	DB_INIT_DBT(*dbt, ptr, size);
493 
494 	memcpy(ptr, &port_buffer, sizeof(port_buffer));
495 	ptr = &ptr[sizeof(port_buffer)];
496 	memcpy(ptr, addr.host, hlen);
497 
498 	return (0);
499 }
500 
501 /*
502  * !!!
503  * This may only be called after threads have been started, because we don't
504  * know the answer until we have established group membership (e.g., reading the
505  * membership database).  That should be OK, because we only need this
506  * for starting an election, or counting acks after sending a PERM message.
507  *
508  * PUBLIC: int __repmgr_get_nsites __P((ENV *, u_int32_t *));
509  */
510 int
__repmgr_get_nsites(env,nsitesp)511 __repmgr_get_nsites(env, nsitesp)
512 	ENV *env;
513 	u_int32_t *nsitesp;
514 {
515 	DB_REP *db_rep;
516 	u_int32_t nsites;
517 
518 	db_rep = env->rep_handle;
519 
520 	if ((nsites = db_rep->region->config_nsites) == 0) {
521 		__db_errx(env, DB_STR("3672",
522 		    "Nsites unknown before repmgr_start()"));
523 		return (EINVAL);
524 	}
525 	*nsitesp = nsites;
526 	return (0);
527 }
528 
529 /*
530  * PUBLIC: int __repmgr_thread_failure __P((ENV *, int));
531  */
532 int
__repmgr_thread_failure(env,why)533 __repmgr_thread_failure(env, why)
534 	ENV *env;
535 	int why;
536 {
537 	DB_REP *db_rep;
538 
539 	db_rep = env->rep_handle;
540 	LOCK_MUTEX(db_rep->mutex);
541 	(void)__repmgr_stop_threads(env);
542 	UNLOCK_MUTEX(db_rep->mutex);
543 	return (__env_panic(env, why));
544 }
545 
546 /*
547  * Format a printable representation of a site location, suitable for inclusion
548  * in an error message.  The buffer must be at least as big as
549  * MAX_SITE_LOC_STRING.
550  *
551  * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *,
552  * PUBLIC:    REPMGR_CONNECTION *, char *));
553  *
554  * Caller must hold mutex.
555  */
556 char *
__repmgr_format_eid_loc(db_rep,conn,buffer)557 __repmgr_format_eid_loc(db_rep, conn, buffer)
558 	DB_REP *db_rep;
559 	REPMGR_CONNECTION *conn;
560 	char *buffer;
561 {
562 	int eid;
563 
564 	if (conn->type == APP_CONNECTION)
565 		snprintf(buffer,
566 		    MAX_SITE_LOC_STRING, "(application channel)");
567 	else if (conn->type == REP_CONNECTION &&
568 	    IS_VALID_EID(eid = conn->eid))
569 		(void)__repmgr_format_site_loc(SITE_FROM_EID(eid), buffer);
570 	else
571 		snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)");
572 	return (buffer);
573 }
574 
575 /*
576  * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *));
577  */
578 char *
__repmgr_format_site_loc(site,buffer)579 __repmgr_format_site_loc(site, buffer)
580 	REPMGR_SITE *site;
581 	char *buffer;
582 {
583 	return (__repmgr_format_addr_loc(&site->net_addr, buffer));
584 }
585 
586 /*
587  * PUBLIC: char *__repmgr_format_addr_loc __P((repmgr_netaddr_t *, char *));
588  */
589 char *
__repmgr_format_addr_loc(addr,buffer)590 __repmgr_format_addr_loc(addr, buffer)
591 	repmgr_netaddr_t *addr;
592 	char *buffer;
593 {
594 	snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu",
595 	    addr->host, (u_long)addr->port);
596 	return (buffer);
597 }
598 
599 /*
600  * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t));
601  */
602 int
__repmgr_repstart(env,flags)603 __repmgr_repstart(env, flags)
604 	ENV *env;
605 	u_int32_t flags;
606 {
607 	DBT my_addr;
608 	int ret;
609 
610 	/* Include "cdata" in case sending to old-version site. */
611 	if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0)
612 		return (ret);
613 	ret = __rep_start_int(env, &my_addr, flags);
614 	__os_free(env, my_addr.data);
615 	if (ret != 0)
616 		__db_err(env, ret, DB_STR("3673", "rep_start"));
617 	return (ret);
618 }
619 
620 /*
621  * PUBLIC: int __repmgr_become_master __P((ENV *));
622  */
623 int
__repmgr_become_master(env)624 __repmgr_become_master(env)
625 	ENV *env;
626 {
627 	DB_REP *db_rep;
628 	DB_THREAD_INFO *ip;
629 	DB *dbp;
630 	DB_TXN *txn;
631 	REPMGR_SITE *site;
632 	DBT key_dbt, data_dbt;
633 	__repmgr_membership_key_args key;
634 	__repmgr_membership_data_args member_status;
635 	repmgr_netaddr_t addr;
636 	u_int32_t status;
637 	u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
638 	u_int8_t key_buf[MAX_MSG_BUF];
639 	size_t len;
640 	u_int i;
641 	int ret, t_ret;
642 
643 	db_rep = env->rep_handle;
644 	dbp = NULL;
645 	txn = NULL;
646 
647 	/* Examine membership list to see if we have a victim in limbo. */
648 	LOCK_MUTEX(db_rep->mutex);
649 	ZERO_LSN(db_rep->limbo_failure);
650 	ZERO_LSN(db_rep->durable_lsn);
651 	db_rep->limbo_victim = DB_EID_INVALID;
652 	db_rep->limbo_resolution_needed = FALSE;
653 	FOR_EACH_REMOTE_SITE_INDEX(i) {
654 		site = SITE_FROM_EID(i);
655 		if (site->membership == SITE_ADDING ||
656 		    site->membership == SITE_DELETING) {
657 			db_rep->limbo_victim = (int)i;
658 			db_rep->limbo_resolution_needed = TRUE;
659 
660 			/*
661 			 * Since there can never be more than one limbo victim,
662 			 * when we find one we don't have to continue looking
663 			 * for others.
664 			 */
665 			break;
666 		}
667 	}
668 	db_rep->client_intent = FALSE;
669 	UNLOCK_MUTEX(db_rep->mutex);
670 
671 	if ((ret = __repmgr_repstart(env, DB_REP_MASTER)) != 0)
672 		return (ret);
673 
674 	/*
675 	 * Make sure member_version_gen is current so that this master
676 	 * can reject obsolete member lists from other sites.
677 	 */
678 	db_rep->member_version_gen = db_rep->region->gen;
679 
680 	/* If there is already a gmdb, we are finished. */
681 	if (db_rep->have_gmdb)
682 		return (0);
683 
684 	/* There isn't a gmdb.  Create one from the in-memory site list. */
685 	ENV_ENTER(env, ip);
686 	if ((ret = __repmgr_hold_master_role(env, NULL)) != 0)
687 		goto leave;
688 retry:
689 	if ((ret = __repmgr_setup_gmdb_op(env, ip, &txn, DB_CREATE)) != 0)
690 		goto err;
691 
692 	DB_ASSERT(env, txn != NULL);
693 	dbp = db_rep->gmdb;
694 	DB_ASSERT(env, dbp != NULL);
695 
696 	/* Write the meta-data record. */
697 	if ((ret = __repmgr_set_gm_version(env, ip, txn, 1)) != 0)
698 		goto err;
699 
700 	/* Write a record representing each site in the group. */
701 	for (i = 0; i < db_rep->site_cnt; i++) {
702 		LOCK_MUTEX(db_rep->mutex);
703 		site = SITE_FROM_EID(i);
704 		addr = site->net_addr;
705 		status = site->membership;
706 		UNLOCK_MUTEX(db_rep->mutex);
707 		if (status == 0)
708 			continue;
709 		DB_INIT_DBT(key.host, addr.host, strlen(addr.host) + 1);
710 		key.port = addr.port;
711 		ret = __repmgr_membership_key_marshal(env,
712 		    &key, key_buf, sizeof(key_buf), &len);
713 		DB_ASSERT(env, ret == 0);
714 		DB_INIT_DBT(key_dbt, key_buf, len);
715 		member_status.flags = status;
716 		__repmgr_membership_data_marshal(env, &member_status, data_buf);
717 		DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE);
718 		if ((ret = __db_put(dbp, ip, txn, &key_dbt, &data_dbt, 0)) != 0)
719 			goto err;
720 	}
721 
722 err:
723 	if (txn != NULL) {
724 		if ((t_ret = __db_txn_auto_resolve(env, txn, 0, ret)) != 0 &&
725 		    ret == 0)
726 			ret = t_ret;
727 		if ((t_ret = __repmgr_cleanup_gmdb_op(env, TRUE)) != 0 &&
728 		    ret == 0)
729 			ret = t_ret;
730 	}
731 	if (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED)
732 		goto retry;
733 	if ((t_ret = __repmgr_rlse_master_role(env)) != 0 && ret == 0)
734 		ret = t_ret;
735 leave:
736 	ENV_LEAVE(env, ip);
737 	return (ret);
738 }
739 
740 /*
741  * Visits all the connections we know about, performing the desired action.
742  * "err_quit" determines whether we give up, or soldier on, in case of an
743  * error.
744  *
745  * PUBLIC: int __repmgr_each_connection __P((ENV *,
746  * PUBLIC:     CONNECTION_ACTION, void *, int));
747  *
748  * !!!
749  * Caller must hold mutex.
750  */
751 int
__repmgr_each_connection(env,callback,info,err_quit)752 __repmgr_each_connection(env, callback, info, err_quit)
753 	ENV *env;
754 	CONNECTION_ACTION callback;
755 	void *info;
756 	int err_quit;
757 {
758 	DB_REP *db_rep;
759 	REPMGR_CONNECTION *conn, *next;
760 	REPMGR_SITE *site;
761 	int eid, ret, t_ret;
762 
763 #define	HANDLE_ERROR		        \
764 	do {			        \
765 		if (err_quit)	        \
766 			return (t_ret); \
767 		if (ret == 0)	        \
768 			ret = t_ret;    \
769 	} while (0)
770 
771 	db_rep = env->rep_handle;
772 	ret = 0;
773 
774 	/*
775 	 * We might have used TAILQ_FOREACH here, except that in some cases we
776 	 * need to unlink an element along the way.
777 	 */
778 	for (conn = TAILQ_FIRST(&db_rep->connections);
779 	     conn != NULL;
780 	     conn = next) {
781 		next = TAILQ_NEXT(conn, entries);
782 
783 		if ((t_ret = (*callback)(env, conn, info)) != 0)
784 			HANDLE_ERROR;
785 	}
786 
787 	FOR_EACH_REMOTE_SITE_INDEX(eid) {
788 		site = SITE_FROM_EID(eid);
789 
790 		if (site->state == SITE_CONNECTED) {
791 			if ((conn = site->ref.conn.in) != NULL &&
792 			    (t_ret = (*callback)(env, conn, info)) != 0)
793 				HANDLE_ERROR;
794 			if ((conn = site->ref.conn.out) != NULL &&
795 			    (t_ret = (*callback)(env, conn, info)) != 0)
796 				HANDLE_ERROR;
797 		}
798 
799 		for (conn = TAILQ_FIRST(&site->sub_conns);
800 		     conn != NULL;
801 		     conn = next) {
802 			next = TAILQ_NEXT(conn, entries);
803 			if ((t_ret = (*callback)(env, conn, info)) != 0)
804 				HANDLE_ERROR;
805 		}
806 	}
807 
808 	return (0);
809 }
810 
811 /*
812  * Initialize repmgr's portion of the shared region area.  Note that we can't
813  * simply get the REP* address from the env as we usually do, because at the
814  * time of this call it hasn't been linked into there yet.
815  *
816  * This function is only called during creation of the region.  If anything
817  * fails, our caller will panic and remove the region.  So, if we have any
818  * failure, we don't have to clean up any partial allocation.
819  *
820  * PUBLIC: int __repmgr_open __P((ENV *, void *));
821  */
822 int
__repmgr_open(env,rep_)823 __repmgr_open(env, rep_)
824 	ENV *env;
825 	void *rep_;
826 {
827 	DB_REP *db_rep;
828 	REP *rep;
829 	int ret;
830 
831 	db_rep = env->rep_handle;
832 	rep = rep_;
833 
834 	if ((ret = __mutex_alloc(env, MTX_REPMGR, 0, &rep->mtx_repmgr)) != 0)
835 		return (ret);
836 
837 	DB_ASSERT(env, rep->siteinfo_seq == 0 && db_rep->siteinfo_seq == 0);
838 	rep->siteinfo_off = INVALID_ROFF;
839 	rep->siteinfo_seq = 0;
840 	if ((ret = __repmgr_share_netaddrs(env, rep, 0, db_rep->site_cnt)) != 0)
841 		return (ret);
842 
843 	rep->self_eid = db_rep->self_eid;
844 	rep->perm_policy = db_rep->perm_policy;
845 	rep->ack_timeout = db_rep->ack_timeout;
846 	rep->connection_retry_wait = db_rep->connection_retry_wait;
847 	rep->election_retry_wait = db_rep->election_retry_wait;
848 	rep->heartbeat_monitor_timeout = db_rep->heartbeat_monitor_timeout;
849 	rep->heartbeat_frequency = db_rep->heartbeat_frequency;
850 	return (ret);
851 }
852 
853 /*
854  * Join an existing environment, by setting up our local site info structures
855  * from shared network address configuration in the region.
856  *
857  * As __repmgr_open(), note that we can't simply get the REP* address from the
858  * env as we usually do, because at the time of this call it hasn't been linked
859  * into there yet.
860  *
861  * PUBLIC: int __repmgr_join __P((ENV *, void *));
862  */
863 int
__repmgr_join(env,rep_)864 __repmgr_join(env, rep_)
865 	ENV *env;
866 	void *rep_;
867 {
868 	DB_REP *db_rep;
869 	REGINFO *infop;
870 	REP *rep;
871 	SITEINFO *p;
872 	REPMGR_SITE *site, temp;
873 	repmgr_netaddr_t *addrp;
874 	char *host;
875 	u_int i, j;
876 	int ret;
877 
878 	db_rep = env->rep_handle;
879 	infop = env->reginfo;
880 	rep = rep_;
881 	ret = 0;
882 
883 	MUTEX_LOCK(env, rep->mtx_repmgr);
884 
885 	/*
886 	 * Merge local and shared lists of remote sites.  Note that the
887 	 * placement of entries in the shared array must not change.  To
888 	 * accomplish the merge, pull in entries from the shared list, into the
889 	 * proper position, shuffling not-yet-resolved local entries if
890 	 * necessary.  Then add any remaining locally known entries to the
891 	 * shared list.
892 	 */
893 	i = 0;
894 	if (rep->siteinfo_off != INVALID_ROFF) {
895 		p = R_ADDR(infop, rep->siteinfo_off);
896 
897 		/* For each address in the shared list ... */
898 		for (; i < rep->site_cnt; i++) {
899 			host = R_ADDR(infop, p[i].addr.host);
900 
901 			RPRINT(env, (env, DB_VERB_REPMGR_MISC,
902 			    "Site %s:%lu found at EID %u",
903 				host, (u_long)p[i].addr.port, i));
904 			/*
905 			 * Find it in the local list.  Everything before 'i'
906 			 * already matches the shared list, and is therefore in
907 			 * the right place.  So we only need to search starting
908 			 * from 'i'.  When found, local config values will be
909 			 * used because they are assumed to be "fresher".  But
910 			 * membership status is not, since this process hasn't
911 			 * been active (running) yet.
912 			 */
913 			for (j = i; j < db_rep->site_cnt; j++) {
914 				site = &db_rep->sites[j];
915 				addrp = &site->net_addr;
916 				if (strcmp(host, addrp->host) == 0 &&
917 				    p[i].addr.port == addrp->port) {
918 					p[i].config = site->config;
919 					site->membership = p[i].status;
920 					break;
921 				}
922 			}
923 
924 			/*
925 			 * When not found in local list, copy peer values
926 			 * from shared list.
927 			 */
928 			if (j == db_rep->site_cnt) {
929 				if ((ret = __repmgr_new_site(env,
930 				    &site, host, p[i].addr.port)) != 0)
931 					goto unlock;
932 				site->config = p[i].config;
933 				site->membership = p[i].status;
934 			}
935 			DB_ASSERT(env, j < db_rep->site_cnt);
936 
937 			/* Found or added at 'j', but belongs at 'i': swap. */
938 			if (i != j) {
939 				temp = db_rep->sites[j];
940 				db_rep->sites[j] = db_rep->sites[i];
941 				db_rep->sites[i] = temp;
942 				/*
943 				 * If we're moving the entry that self_eid
944 				 * points to, then adjust self_eid to match.
945 				 * For now this is still merely our original,
946 				 * in-process pointer; we have yet to make sure
947 				 * it matches the one from shared memory.
948 				 */
949 				if (db_rep->self_eid == (int)j)
950 					db_rep->self_eid = (int)i;
951 			}
952 		}
953 	}
954 	if ((ret = __repmgr_share_netaddrs(env, rep, i, db_rep->site_cnt)) != 0)
955 		goto unlock;
956 	if (db_rep->self_eid == DB_EID_INVALID)
957 		db_rep->self_eid = rep->self_eid;
958 	else if (rep->self_eid == DB_EID_INVALID)
959 		rep->self_eid = db_rep->self_eid;
960 	else if (db_rep->self_eid != rep->self_eid) {
961 		__db_errx(env, DB_STR("3674",
962     "A mismatching local site address has been set in the environment"));
963 		ret = EINVAL;
964 		goto unlock;
965 	}
966 
967 	db_rep->siteinfo_seq = rep->siteinfo_seq;
968 unlock:
969 	MUTEX_UNLOCK(env, rep->mtx_repmgr);
970 	return (ret);
971 }
972 
973 /*
974  * PUBLIC: int __repmgr_env_refresh __P((ENV *env));
975  */
976 int
__repmgr_env_refresh(env)977 __repmgr_env_refresh(env)
978 	ENV *env;
979 {
980 	DB_REP *db_rep;
981 	REP *rep;
982 	REGINFO *infop;
983 	SITEINFO *shared_array;
984 	u_int i;
985 	int ret;
986 
987 	db_rep = env->rep_handle;
988 	rep = db_rep->region;
989 	infop = env->reginfo;
990 	ret = 0;
991 	COMPQUIET(i, 0);
992 
993 	if (F_ISSET(env, ENV_PRIVATE)) {
994 		ret = __mutex_free(env, &rep->mtx_repmgr);
995 		if (rep->siteinfo_off != INVALID_ROFF) {
996 			shared_array = R_ADDR(infop, rep->siteinfo_off);
997 			for (i = 0; i < db_rep->site_cnt; i++)
998 				__env_alloc_free(infop, R_ADDR(infop,
999 				    shared_array[i].addr.host));
1000 			__env_alloc_free(infop, shared_array);
1001 			rep->siteinfo_off = INVALID_ROFF;
1002 		}
1003 	}
1004 
1005 	return (ret);
1006 }
1007 
/*
 * Copies new remote site information from the indicated private array slots
 * into the shared region.  The corresponding shared array slots do not exist
 * yet; they must be allocated.
 *
 * For each private index in [start, limit), a new shared entry is appended at
 * index rep->site_cnt (that index is the site's EID).  The entry gets a
 * region-allocated copy of the host name plus the site's port, config and
 * membership.  The shared array starts at INITIAL_SITES_ALLOCATION slots and
 * doubles (copy, then free the old array) whenever it is full.  If at least
 * one entry was added, rep->siteinfo_seq is bumped and mirrored into
 * db_rep->siteinfo_seq, even when a later allocation fails.  On failure,
 * entries already appended are kept.
 *
 * PUBLIC: int __repmgr_share_netaddrs __P((ENV *, void *, u_int, u_int));
 *
 * !!! The rep pointer is passed, because it may not yet have been installed
 * into the env handle.
 *
 * !!! Assumes caller holds mtx_repmgr lock.
 */
int
__repmgr_share_netaddrs(env, rep_, start, limit)
	ENV *env;
	void *rep_;
	u_int start, limit;
{
	DB_REP *db_rep;
	REP *rep;
	REGINFO *infop;
	REGENV *renv;
	SITEINFO *orig, *shared_array;
	char *host, *hostbuf;
	size_t sz;
	u_int i, n;
	int eid, ret, touched;

	db_rep = env->rep_handle;
	infop = env->reginfo;
	renv = infop->primary;
	rep = rep_;
	ret = 0;
	touched = FALSE;

	/* Region allocations below are made under the region-env mutex. */
	MUTEX_LOCK(env, renv->mtx_regenv);

	for (i = start; i < limit; i++) {
		if (rep->site_cnt >= rep->site_max) {
			/* Table is full, we need more space. */
			if (rep->siteinfo_off == INVALID_ROFF) {
				n = INITIAL_SITES_ALLOCATION;
				sz = n * sizeof(SITEINFO);
				if ((ret = __env_alloc(infop,
				    sz, &shared_array)) != 0)
					goto out;
			} else {
				n = 2 * rep->site_max;
				sz = n * sizeof(SITEINFO);
				if ((ret = __env_alloc(infop,
				    sz, &shared_array)) != 0)
					goto out;
				orig = R_ADDR(infop, rep->siteinfo_off);
				memcpy(shared_array, orig,
				    sizeof(SITEINFO) * rep->site_cnt);
				__env_alloc_free(infop, orig);
			}
			rep->siteinfo_off = R_OFFSET(infop, shared_array);
			rep->site_max = n;
		} else
			shared_array = R_ADDR(infop, rep->siteinfo_off);

		DB_ASSERT(env, rep->site_cnt < rep->site_max &&
		    rep->siteinfo_off != INVALID_ROFF);

		host = db_rep->sites[i].net_addr.host;
		sz = strlen(host) + 1;
		if ((ret = __env_alloc(infop, sz, &hostbuf)) != 0)
			goto out;
		eid = (int)rep->site_cnt++;
		(void)strcpy(hostbuf, host);
		/* Store region offsets, not raw pointers, in shared memory. */
		shared_array[eid].addr.host = R_OFFSET(infop, hostbuf);
		shared_array[eid].addr.port = db_rep->sites[i].net_addr.port;
		shared_array[eid].config = db_rep->sites[i].config;
		shared_array[eid].status = db_rep->sites[i].membership;
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "EID %d is assigned for site %s:%lu",
			eid, host, (u_long)shared_array[eid].addr.port));
		touched = TRUE;
	}

out:
	/* Advertise the change so other processes can copy in new sites. */
	if (touched)
		db_rep->siteinfo_seq = ++rep->siteinfo_seq;
	MUTEX_UNLOCK(env, renv->mtx_regenv);
	return (ret);
}
1095 
1096 /*
1097  * Copy into our local list any newly added/changed remote site
1098  * configuration information.
1099  *
1100  * !!! Caller must hold db_rep->mutex and mtx_repmgr locks.
1101  *
1102  * PUBLIC: int __repmgr_copy_in_added_sites __P((ENV *));
1103  */
1104 int
__repmgr_copy_in_added_sites(env)1105 __repmgr_copy_in_added_sites(env)
1106 	ENV *env;
1107 {
1108 	DB_REP *db_rep;
1109 	REP *rep;
1110 	REGINFO *infop;
1111 	SITEINFO *base, *p;
1112 	REPMGR_SITE *site;
1113 	char *host;
1114 	int ret;
1115 	u_int i;
1116 
1117 	db_rep = env->rep_handle;
1118 	rep = db_rep->region;
1119 
1120 	if (rep->siteinfo_off == INVALID_ROFF)
1121 		goto out;
1122 
1123 	infop = env->reginfo;
1124 	base = R_ADDR(infop, rep->siteinfo_off);
1125 
1126 	/* Create private array slots for new sites. */
1127 	for (i = db_rep->site_cnt; i < rep->site_cnt; i++) {
1128 		p = &base[i];
1129 		host = R_ADDR(infop, p->addr.host);
1130 		if ((ret = __repmgr_new_site(env,
1131 		    &site, host, p->addr.port)) != 0)
1132 			return (ret);
1133 		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1134 		    "Site %s:%lu found at EID %u",
1135 			host, (u_long)p->addr.port, i));
1136 	}
1137 
1138 	/* Make sure info is up to date for all sites, old and new. */
1139 	for (i = 0; i < db_rep->site_cnt; i++) {
1140 		p = &base[i];
1141 		site = SITE_FROM_EID(i);
1142 		site->config = p->config;
1143 		site->membership = p->status;
1144 	}
1145 
1146 out:
1147 	/*
1148 	 * We always make sure our local list has been brought up to date with
1149 	 * the shared list before adding to the local list (except before env
1150 	 * open of course).  So here there should be nothing on our local list
1151 	 * not yet in shared memory.
1152 	 */
1153 	DB_ASSERT(env, db_rep->site_cnt == rep->site_cnt);
1154 	db_rep->siteinfo_seq = rep->siteinfo_seq;
1155 	return (0);
1156 }
1157 
1158 /*
1159  * Initialize a range of sites newly added to our site list array.  Process each
1160  * array entry in the range from <= x < limit.  Passing from >= limit is
1161  * allowed, and is effectively a no-op.
1162  *
1163  * PUBLIC: int __repmgr_init_new_sites __P((ENV *, int, int));
1164  *
1165  * !!! Assumes caller holds db_rep->mutex.
1166  */
1167 int
__repmgr_init_new_sites(env,from,limit)1168 __repmgr_init_new_sites(env, from, limit)
1169 	ENV *env;
1170 	int from, limit;
1171 {
1172 	DB_REP *db_rep;
1173 	REPMGR_SITE *site;
1174 	int i, ret;
1175 
1176 	db_rep = env->rep_handle;
1177 
1178 	if (db_rep->selector == NULL)
1179 		return (0);
1180 
1181 	DB_ASSERT(env, IS_VALID_EID(from) && IS_VALID_EID(limit) &&
1182 	    from <= limit);
1183 	for (i = from; i < limit; i++) {
1184 		site = SITE_FROM_EID(i);
1185 		if (site->membership == SITE_PRESENT &&
1186 		    (ret = __repmgr_schedule_connection_attempt(env,
1187 		    i, TRUE)) != 0)
1188 			return (ret);
1189 	}
1190 
1191 	return (0);
1192 }
1193 
1194 /*
1195  * PUBLIC: int __repmgr_failchk __P((ENV *));
1196  */
1197 int
__repmgr_failchk(env)1198 __repmgr_failchk(env)
1199 	ENV *env;
1200 {
1201 	DB_ENV *dbenv;
1202 	DB_REP *db_rep;
1203 	REP *rep;
1204 	db_threadid_t unused;
1205 
1206 	dbenv = env->dbenv;
1207 	db_rep = env->rep_handle;
1208 	rep = db_rep->region;
1209 
1210 	DB_THREADID_INIT(unused);
1211 	MUTEX_LOCK(env, rep->mtx_repmgr);
1212 
1213 	/*
1214 	 * Check to see if the main (listener) replication process may have died
1215 	 * without cleaning up the flag.  If so, we only have to clear it, and
1216 	 * another process should then be able to come along and become the
1217 	 * listener.  So in either case we can return success.
1218 	 */
1219 	if (rep->listener != 0 && !dbenv->is_alive(dbenv,
1220 	    rep->listener, unused, DB_MUTEX_PROCESS_ONLY))
1221 		rep->listener = 0;
1222 	MUTEX_UNLOCK(env, rep->mtx_repmgr);
1223 
1224 	return (0);
1225 }
1226 
1227 /*
1228  * PUBLIC: int __repmgr_master_is_known __P((ENV *));
1229  */
1230 int
__repmgr_master_is_known(env)1231 __repmgr_master_is_known(env)
1232 	ENV *env;
1233 {
1234 	DB_REP *db_rep;
1235 	REPMGR_CONNECTION *conn;
1236 	REPMGR_SITE *master;
1237 
1238 	db_rep = env->rep_handle;
1239 
1240 	/*
1241 	 * We are the master, or we know of a master and have a healthy
1242 	 * connection to it.
1243 	 */
1244 	if (db_rep->region->master_id == db_rep->self_eid)
1245 		return (TRUE);
1246 	if ((master = __repmgr_connected_master(env)) == NULL)
1247 		return (FALSE);
1248 	if ((conn = master->ref.conn.in) != NULL &&
1249 	    IS_READY_STATE(conn->state))
1250 		return (TRUE);
1251 	if ((conn = master->ref.conn.out) != NULL &&
1252 	    IS_READY_STATE(conn->state))
1253 		return (TRUE);
1254 	return (FALSE);
1255 }
1256 
1257 /*
1258  * PUBLIC: int __repmgr_stable_lsn __P((ENV *, DB_LSN *));
1259  *
1260  * This function may be called before any of repmgr's threads have
1261  * been started.  This code must not be called before env open.
1262  * Currently that is impossible since its only caller is log_archive
1263  * which itself cannot be called before env_open.
1264  */
1265 int
__repmgr_stable_lsn(env,stable_lsn)1266 __repmgr_stable_lsn(env, stable_lsn)
1267 	ENV *env;
1268 	DB_LSN *stable_lsn;
1269 {
1270 	DB_REP *db_rep;
1271 	REP *rep;
1272 
1273 	db_rep = env->rep_handle;
1274 	rep = db_rep->region;
1275 
1276 	if (rep->min_log_file != 0 && rep->min_log_file < stable_lsn->file) {
1277 		/*
1278 		 * Returning an LSN to be consistent with the rest of the
1279 		 * log archiving processing.  Construct LSN of format
1280 		 * [filenum][0].
1281 		 */
1282 		stable_lsn->file = rep->min_log_file;
1283 		stable_lsn->offset = 0;
1284 	}
1285 	RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1286 	    "Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu]",
1287 	    (u_long)stable_lsn->file, (u_long)stable_lsn->offset));
1288 	return (0);
1289 }
1290 
1291 /*
1292  * PUBLIC: int __repmgr_send_sync_msg __P((ENV *, REPMGR_CONNECTION *,
1293  * PUBLIC:     u_int32_t, u_int8_t *, u_int32_t));
1294  */
1295 int
__repmgr_send_sync_msg(env,conn,type,buf,len)1296 __repmgr_send_sync_msg(env, conn, type, buf, len)
1297 	ENV *env;
1298 	REPMGR_CONNECTION *conn;
1299 	u_int8_t *buf;
1300 	u_int32_t len, type;
1301 {
1302 	REPMGR_IOVECS iovecs;
1303 	__repmgr_msg_hdr_args msg_hdr;
1304 	u_int8_t hdr_buf[__REPMGR_MSG_HDR_SIZE];
1305 	size_t unused;
1306 
1307 	msg_hdr.type = REPMGR_OWN_MSG;
1308 	REPMGR_OWN_BUF_SIZE(msg_hdr) = len;
1309 	REPMGR_OWN_MSG_TYPE(msg_hdr) = type;
1310 	__repmgr_msg_hdr_marshal(env, &msg_hdr, hdr_buf);
1311 
1312 	__repmgr_iovec_init(&iovecs);
1313 	__repmgr_add_buffer(&iovecs, hdr_buf, __REPMGR_MSG_HDR_SIZE);
1314 	if (len > 0)
1315 		__repmgr_add_buffer(&iovecs, buf, len);
1316 
1317 	return (__repmgr_write_iovecs(env, conn, &iovecs, &unused));
1318 }
1319 
1320 /*
1321  * Produce a membership list from the known info currently in memory.
1322  *
1323  * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int8_t **, size_t *));
1324  *
1325  * Caller must hold mutex.
1326  */
1327 int
__repmgr_marshal_member_list(env,bufp,lenp)1328 __repmgr_marshal_member_list(env, bufp, lenp)
1329 	ENV *env;
1330 	u_int8_t **bufp;
1331 	size_t *lenp;
1332 {
1333 	DB_REP *db_rep;
1334 	REP *rep;
1335 	REPMGR_SITE *site;
1336 	__repmgr_membr_vers_args membr_vers;
1337 	__repmgr_site_info_args site_info;
1338 	u_int8_t *buf, *p;
1339 	size_t bufsize, len;
1340 	u_int i;
1341 	int ret;
1342 
1343 	db_rep = env->rep_handle;
1344 	rep = db_rep->region;
1345 
1346 	/* Compute a (generous) upper bound on needed buffer size. */
1347 	bufsize = __REPMGR_MEMBR_VERS_SIZE +
1348 	    db_rep->site_cnt * (__REPMGR_SITE_INFO_SIZE + MAXHOSTNAMELEN + 1);
1349 	if ((ret = __os_malloc(env, bufsize, &buf)) != 0)
1350 		return (ret);
1351 	p = buf;
1352 
1353 	membr_vers.version = db_rep->membership_version;
1354 	membr_vers.gen = rep->gen;
1355 	__repmgr_membr_vers_marshal(env, &membr_vers, p);
1356 	p += __REPMGR_MEMBR_VERS_SIZE;
1357 
1358 	for (i = 0; i < db_rep->site_cnt; i++) {
1359 		site = SITE_FROM_EID(i);
1360 		if (site->membership == 0)
1361 			continue;
1362 
1363 		site_info.host.data = site->net_addr.host;
1364 		site_info.host.size =
1365 		    (u_int32_t)strlen(site->net_addr.host) + 1;
1366 		site_info.port = site->net_addr.port;
1367 		site_info.flags = site->membership;
1368 
1369 		ret = __repmgr_site_info_marshal(env,
1370 		    &site_info, p, (size_t)(&buf[bufsize]-p), &len);
1371 		DB_ASSERT(env, ret == 0);
1372 		p += len;
1373 	}
1374 	len = (size_t)(p - buf);
1375 
1376 	*bufp = buf;
1377 	*lenp = len;
1378 	DB_ASSERT(env, ret == 0);
1379 	return (0);
1380 }
1381 
1382 /*
1383  * Produce a membership list by reading the database.
1384  */
1385 static int
read_gmdb(env,ip,bufp,lenp)1386 read_gmdb(env, ip, bufp, lenp)
1387 	ENV *env;
1388 	DB_THREAD_INFO *ip;
1389 	u_int8_t **bufp;
1390 	size_t *lenp;
1391 {
1392 	DB_TXN *txn;
1393 	DB *dbp;
1394 	DBC *dbc;
1395 	DBT key_dbt, data_dbt;
1396 	__repmgr_membership_key_args key;
1397 	__repmgr_membership_data_args member_status;
1398 	__repmgr_member_metadata_args metadata;
1399 	__repmgr_membr_vers_args membr_vers;
1400 	__repmgr_site_info_args site_info;
1401 	u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
1402 	u_int8_t key_buf[MAX_MSG_BUF];
1403 	u_int8_t metadata_buf[__REPMGR_MEMBER_METADATA_SIZE];
1404 	char *host;
1405 	size_t bufsize, len;
1406 	u_int8_t *buf, *p;
1407 	u_int32_t gen;
1408 	int ret, t_ret;
1409 
1410 	txn = NULL;
1411 	dbp = NULL;
1412 	dbc = NULL;
1413 	buf = NULL;
1414 	COMPQUIET(len, 0);
1415 
1416 	if ((ret = __rep_get_datagen(env, &gen)) != 0)
1417 		return (ret);
1418 	if ((ret = __txn_begin(env, ip, NULL, &txn, DB_IGNORE_LEASE)) != 0)
1419 		goto err;
1420 	if ((ret = __rep_open_sysdb(env, ip, txn, REPMEMBERSHIP, 0, &dbp)) != 0)
1421 		goto err;
1422 	if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
1423 		goto err;
1424 
1425 	memset(&key_dbt, 0, sizeof(key_dbt));
1426 	key_dbt.data = key_buf;
1427 	key_dbt.ulen = sizeof(key_buf);
1428 	F_SET(&key_dbt, DB_DBT_USERMEM);
1429 	memset(&data_dbt, 0, sizeof(data_dbt));
1430 	data_dbt.data = metadata_buf;
1431 	data_dbt.ulen = sizeof(metadata_buf);
1432 	F_SET(&data_dbt, DB_DBT_USERMEM);
1433 
1434 	/* Get metadata record, make sure key looks right. */
1435 	if ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) != 0)
1436 		goto err;
1437 	ret = __repmgr_membership_key_unmarshal(env,
1438 	    &key, key_buf, key_dbt.size, NULL);
1439 	DB_ASSERT(env, ret == 0);
1440 	DB_ASSERT(env, key.host.size == 0);
1441 	DB_ASSERT(env, key.port == 0);
1442 	ret = __repmgr_member_metadata_unmarshal(env,
1443 	    &metadata, metadata_buf, data_dbt.size, NULL);
1444 	DB_ASSERT(env, ret == 0);
1445 	DB_ASSERT(env, metadata.format == REPMGR_GMDB_FMT_VERSION);
1446 	DB_ASSERT(env, metadata.version > 0);
1447 
1448 	bufsize = 1000;		/* Initial guess. */
1449 	if ((ret = __os_malloc(env, bufsize, &buf)) != 0)
1450 		goto err;
1451 	membr_vers.version = metadata.version;
1452 	membr_vers.gen = gen;
1453 	__repmgr_membr_vers_marshal(env, &membr_vers, buf);
1454 	p = &buf[__REPMGR_MEMBR_VERS_SIZE];
1455 
1456 	data_dbt.data = data_buf;
1457 	data_dbt.ulen = sizeof(data_buf);
1458 	while ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) == 0) {
1459 		ret = __repmgr_membership_key_unmarshal(env,
1460 		    &key, key_buf, key_dbt.size, NULL);
1461 		DB_ASSERT(env, ret == 0);
1462 		DB_ASSERT(env, key.host.size <= MAXHOSTNAMELEN + 1 &&
1463 		    key.host.size > 1);
1464 		host = (char*)key.host.data;
1465 		DB_ASSERT(env, host[key.host.size-1] == '\0');
1466 		DB_ASSERT(env, key.port > 0);
1467 
1468 		ret = __repmgr_membership_data_unmarshal(env,
1469 		    &member_status, data_buf, data_dbt.size, NULL);
1470 		DB_ASSERT(env, ret == 0);
1471 		DB_ASSERT(env, member_status.flags != 0);
1472 
1473 		site_info.host = key.host;
1474 		site_info.port = key.port;
1475 		site_info.flags = member_status.flags;
1476 		if ((ret = __repmgr_site_info_marshal(env, &site_info,
1477 		    p, (size_t)(&buf[bufsize]-p), &len)) == ENOMEM) {
1478 			bufsize *= 2;
1479 			len = (size_t)(p - buf);
1480 			if ((ret = __os_realloc(env, bufsize, &buf)) != 0)
1481 				goto err;
1482 			p = &buf[len];
1483 			ret = __repmgr_site_info_marshal(env,
1484 			    &site_info, p, (size_t)(&buf[bufsize]-p), &len);
1485 			DB_ASSERT(env, ret == 0);
1486 		}
1487 		p += len;
1488 	}
1489 	len = (size_t)(p - buf);
1490 	if (ret == DB_NOTFOUND)
1491 		ret = 0;
1492 
1493 err:
1494 	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
1495 		ret = t_ret;
1496 	if (dbp != NULL &&
1497 	    (t_ret = __db_close(dbp, txn, DB_NOSYNC)) != 0 && ret == 0)
1498 		ret = t_ret;
1499 	if (txn != NULL &&
1500 	    (t_ret = __db_txn_auto_resolve(env, txn, 0, ret)) != 0 && ret == 0)
1501 		ret = t_ret;
1502 	if (ret == 0) {
1503 		*bufp = buf;
1504 		*lenp = len;
1505 	} else if (buf != NULL)
1506 		__os_free(env, buf);
1507 	return (ret);
1508 }
1509 
1510 /*
1511  * Refresh our sites array from the given membership list.
1512  *
1513  * PUBLIC: int __repmgr_refresh_membership __P((ENV *,
1514  * PUBLIC:     u_int8_t *, size_t));
1515  */
1516 int
__repmgr_refresh_membership(env,buf,len)1517 __repmgr_refresh_membership(env, buf, len)
1518 	ENV *env;
1519 	u_int8_t *buf;
1520 	size_t len;
1521 {
1522 	DB_REP *db_rep;
1523 	REPMGR_SITE *site;
1524 	__repmgr_membr_vers_args membr_vers;
1525 	__repmgr_site_info_args site_info;
1526 	char *host;
1527 	u_int8_t *p;
1528 	u_int16_t port;
1529 	u_int32_t i, n;
1530 	int eid, ret;
1531 
1532 	db_rep = env->rep_handle;
1533 
1534 	/*
1535 	 * Membership list consists of membr_vers followed by a number of
1536 	 * site_info structs.
1537 	 */
1538 	ret = __repmgr_membr_vers_unmarshal(env, &membr_vers, buf, len, &p);
1539 	DB_ASSERT(env, ret == 0);
1540 
1541 	if (db_rep->repmgr_status == stopped)
1542 		return (0);
1543 	/* Ignore obsolete versions. */
1544 	if (__repmgr_gmdb_version_cmp(env,
1545 	    membr_vers.gen, membr_vers.version) <= 0)
1546 		return (0);
1547 
1548 	LOCK_MUTEX(db_rep->mutex);
1549 
1550 	db_rep->membership_version = membr_vers.version;
1551 	db_rep->member_version_gen = membr_vers.gen;
1552 
1553 	for (i = 0; i < db_rep->site_cnt; i++)
1554 		F_CLR(SITE_FROM_EID(i), SITE_TOUCHED);
1555 
1556 	for (n = 0; p < &buf[len]; ++n) {
1557 		ret = __repmgr_site_info_unmarshal(env,
1558 		    &site_info, p, (size_t)(&buf[len] - p), &p);
1559 		DB_ASSERT(env, ret == 0);
1560 
1561 		host = site_info.host.data;
1562 		DB_ASSERT(env,
1563 		    (u_int8_t*)site_info.host.data + site_info.host.size <= p);
1564 		host[site_info.host.size-1] = '\0';
1565 		port = site_info.port;
1566 
1567 		if ((ret = __repmgr_set_membership(env,
1568 		    host, port, site_info.flags)) != 0)
1569 			goto err;
1570 
1571 		if ((ret = __repmgr_find_site(env, host, port, &eid)) != 0)
1572 			goto err;
1573 		DB_ASSERT(env, IS_VALID_EID(eid));
1574 		F_SET(SITE_FROM_EID(eid), SITE_TOUCHED);
1575 	}
1576 	ret = __rep_set_nsites_int(env, n);
1577 	DB_ASSERT(env, ret == 0);
1578 
1579 	/* Scan "touched" flags so as to notice sites that have been removed. */
1580 	for (i = 0; i < db_rep->site_cnt; i++) {
1581 		site = SITE_FROM_EID(i);
1582 		if (F_ISSET(site, SITE_TOUCHED))
1583 			continue;
1584 		host = site->net_addr.host;
1585 		port = site->net_addr.port;
1586 		if ((ret = __repmgr_set_membership(env, host, port, 0)) != 0)
1587 			goto err;
1588 	}
1589 
1590 err:
1591 	UNLOCK_MUTEX(db_rep->mutex);
1592 	return (ret);
1593 }
1594 
1595 /*
1596  * PUBLIC: int __repmgr_reload_gmdb __P((ENV *));
1597  */
1598 int
__repmgr_reload_gmdb(env)1599 __repmgr_reload_gmdb(env)
1600 	ENV *env;
1601 {
1602 	DB_THREAD_INFO *ip;
1603 	u_int8_t *buf;
1604 	size_t len;
1605 	int ret;
1606 
1607 	ENV_ENTER(env, ip);
1608 	if ((ret = read_gmdb(env, ip, &buf, &len)) == 0) {
1609 		env->rep_handle->have_gmdb = TRUE;
1610 		ret = __repmgr_refresh_membership(env, buf, len);
1611 		__os_free(env, buf);
1612 	}
1613 	ENV_LEAVE(env, ip);
1614 	return (ret);
1615 }
1616 
1617 /*
1618  * Return 1, 0, or -1, as the given gen/version combination is >, =, or < our
1619  * currently known version.
1620  *
1621  * PUBLIC: int __repmgr_gmdb_version_cmp __P((ENV *, u_int32_t, u_int32_t));
1622  */
1623 int
__repmgr_gmdb_version_cmp(env,gen,version)1624 __repmgr_gmdb_version_cmp(env, gen, version)
1625 	ENV *env;
1626 	u_int32_t gen, version;
1627 {
1628 	DB_REP *db_rep;
1629 	u_int32_t g, v;
1630 
1631 	db_rep = env->rep_handle;
1632 	g = db_rep->member_version_gen;
1633 	v = db_rep->membership_version;
1634 
1635 	if (gen == g)
1636 		return (version == v ? 0 :
1637 		    (version < v ? -1 : 1));
1638 	return (gen < g ? -1 : 1);
1639 }
1640 
1641 /*
1642  * PUBLIC: int __repmgr_init_save __P((ENV *, DBT *));
1643  */
1644 int
__repmgr_init_save(env,dbt)1645 __repmgr_init_save(env, dbt)
1646 	ENV *env;
1647 	DBT *dbt;
1648 {
1649 	DB_REP *db_rep;
1650 	u_int8_t *buf;
1651 	size_t len;
1652 	int ret;
1653 
1654 	db_rep = env->rep_handle;
1655 	LOCK_MUTEX(db_rep->mutex);
1656 	if (db_rep->site_cnt == 0) {
1657 		dbt->data = NULL;
1658 		dbt->size = 0;
1659 		ret = 0;
1660 	} else if ((ret = __repmgr_marshal_member_list(env, &buf, &len)) == 0) {
1661 		dbt->data = buf;
1662 		dbt->size = (u_int32_t)len;
1663 	}
1664 	UNLOCK_MUTEX(db_rep->mutex);
1665 
1666 	return (ret);
1667 }
1668 
1669 /*
1670  * PUBLIC: int __repmgr_init_restore __P((ENV *, DBT *));
1671  */
1672 int
__repmgr_init_restore(env,dbt)1673 __repmgr_init_restore(env, dbt)
1674 	ENV *env;
1675 	DBT *dbt;
1676 {
1677 	DB_REP *db_rep;
1678 
1679 	db_rep = env->rep_handle;
1680 	db_rep->restored_list = dbt->data;
1681 	db_rep->restored_list_length = dbt->size;
1682 	return (0);
1683 }
1684 
1685 /*
1686  * Generates an internal request for a deferred operation, to be performed on a
1687  * separate thread (conveniently, a message-processing thread).
1688  *
1689  * PUBLIC: int __repmgr_defer_op __P((ENV *, u_int32_t));
1690  *
1691  * Caller should hold mutex.
1692  */
1693 int
__repmgr_defer_op(env,op)1694 __repmgr_defer_op(env, op)
1695 	ENV *env;
1696 	u_int32_t op;
1697 {
1698 	REPMGR_MESSAGE *msg;
1699 	int ret;
1700 
1701 	/*
1702 	 * Overload REPMGR_MESSAGE to convey the type of operation being
1703 	 * requested.  For now "op" is all we need; plenty of room for expansion
1704 	 * if needed in the future.
1705 	 *
1706 	 * Leave msg->v.gmdb_msg.conn NULL to show no conn to be cleaned up.
1707 	 */
1708 	if ((ret = __os_calloc(env, 1, sizeof(*msg), &msg)) != 0)
1709 		return (ret);
1710 	msg->msg_hdr.type = REPMGR_OWN_MSG;
1711 	REPMGR_OWN_MSG_TYPE(msg->msg_hdr) = op;
1712 	ret = __repmgr_queue_put(env, msg);
1713 	return (ret);
1714 }
1715 
1716 /*
1717  * PUBLIC: void __repmgr_fire_conn_err_event __P((ENV *,
1718  * PUBLIC:     REPMGR_CONNECTION *, int));
1719  */
1720 void
__repmgr_fire_conn_err_event(env,conn,err)1721 __repmgr_fire_conn_err_event(env, conn, err)
1722 	ENV *env;
1723 	REPMGR_CONNECTION *conn;
1724 	int err;
1725 {
1726 	DB_REP *db_rep;
1727 	DB_REPMGR_CONN_ERR info;
1728 
1729 	db_rep = env->rep_handle;
1730 	if (conn->type == REP_CONNECTION && IS_VALID_EID(conn->eid)) {
1731 		__repmgr_print_conn_err(env,
1732 		    &SITE_FROM_EID(conn->eid)->net_addr, err);
1733 		info.eid = conn->eid;
1734 		info.error = err;
1735 		DB_EVENT(env, DB_EVENT_REP_CONNECT_BROKEN, &info);
1736 	}
1737 }
1738 
1739 /*
1740  * PUBLIC: void __repmgr_print_conn_err __P((ENV *, repmgr_netaddr_t *, int));
1741  */
1742 void
__repmgr_print_conn_err(env,netaddr,err)1743 __repmgr_print_conn_err(env, netaddr, err)
1744 	ENV *env;
1745 	repmgr_netaddr_t *netaddr;
1746 	int err;
1747 {
1748 	SITE_STRING_BUFFER site_loc_buf;
1749 	char msgbuf[200];	/* Arbitrary size. */
1750 
1751 	(void)__repmgr_format_addr_loc(netaddr, site_loc_buf);
1752 	/* TCP/IP sockets API convention: 0 indicates "end-of-file". */
1753 	if (err == 0)
1754 		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1755 			"EOF on connection to %s", site_loc_buf));
1756 	else
1757 		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1758 			"`%s' (%d) on connection to %s",
1759 			__os_strerror(err, msgbuf, sizeof(msgbuf)),
1760 			err, site_loc_buf));
1761 }
1762 
1763 /*
1764  * Change role from master to client, but if a GMDB operation is in progress,
1765  * wait for it to finish first.
1766  *
1767  * PUBLIC: int __repmgr_become_client __P((ENV *));
1768  */
1769 int
__repmgr_become_client(env)1770 __repmgr_become_client(env)
1771 	ENV *env;
1772 {
1773 	DB_REP *db_rep;
1774 	int ret;
1775 
1776 	db_rep = env->rep_handle;
1777 	LOCK_MUTEX(db_rep->mutex);
1778 	if ((ret = __repmgr_await_gmdbop(env)) == 0)
1779 		db_rep->client_intent = TRUE;
1780 	UNLOCK_MUTEX(db_rep->mutex);
1781 	return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT) : ret);
1782 }
1783 
1784 /*
1785  * Looks up a site from our local (in-process) list, or returns NULL if not
1786  * found.
1787  *
1788  * PUBLIC: REPMGR_SITE *__repmgr_lookup_site __P((ENV *, const char *, u_int));
1789  */
1790 REPMGR_SITE *
__repmgr_lookup_site(env,host,port)1791 __repmgr_lookup_site(env, host, port)
1792 	ENV *env;
1793 	const char *host;
1794 	u_int port;
1795 {
1796 	DB_REP *db_rep;
1797 	REPMGR_SITE *site;
1798 	u_int i;
1799 
1800 	db_rep = env->rep_handle;
1801 	for (i = 0; i < db_rep->site_cnt; i++) {
1802 		site = &db_rep->sites[i];
1803 
1804 		if (strcmp(site->net_addr.host, host) == 0 &&
1805 		    site->net_addr.port == port)
1806 			return (site);
1807 	}
1808 
1809 	return (NULL);
1810 }
1811 
1812 /*
1813  * Look up a site, or add it if it doesn't already exist.
1814  *
1815  * Caller must hold db_rep mutex and be within ENV_ENTER context, unless this is
1816  * a pre-open call.
1817  *
1818  * PUBLIC: int __repmgr_find_site __P((ENV *, const char *, u_int, int *));
1819  */
1820 int
__repmgr_find_site(env,host,port,eidp)1821 __repmgr_find_site(env, host, port, eidp)
1822 	ENV *env;
1823 	const char *host;
1824 	u_int port;
1825 	int *eidp;
1826 {
1827 	DB_REP *db_rep;
1828 	REP *rep;
1829 	REPMGR_SITE *site;
1830 	int eid, ret;
1831 
1832 	db_rep = env->rep_handle;
1833 	ret = 0;
1834 	if (REP_ON(env)) {
1835 		rep = db_rep->region;
1836 		MUTEX_LOCK(env, rep->mtx_repmgr);
1837 		ret = get_eid(env, host, port, &eid);
1838 		MUTEX_UNLOCK(env, rep->mtx_repmgr);
1839 	} else {
1840 		if ((site = __repmgr_lookup_site(env, host, port)) == NULL &&
1841 		    (ret = __repmgr_new_site(env, &site, host, port)) != 0)
1842 			return (ret);
1843 		eid = EID_FROM_SITE(site);
1844 	}
1845 	if (ret == 0)
1846 		*eidp = eid;
1847 	return (ret);
1848 }
1849 
1850 /*
1851  * Get the EID of the named remote site, even if it means creating a new entry
1852  * in our table if it doesn't already exist.
1853  *
1854  * Caller must hold both db_rep mutex and mtx_repmgr.
1855  */
1856 static int
get_eid(env,host,port,eidp)1857 get_eid(env, host, port, eidp)
1858 	ENV *env;
1859 	const char *host;
1860 	u_int port;
1861 	int *eidp;
1862 {
1863 	DB_REP *db_rep;
1864 	REP *rep;
1865 	REPMGR_SITE *site;
1866 	int eid, ret;
1867 
1868 	db_rep = env->rep_handle;
1869 	rep = db_rep->region;
1870 
1871 	if ((ret = __repmgr_copy_in_added_sites(env)) != 0)
1872 		return (ret);
1873 	if ((site = __repmgr_lookup_site(env, host, port)) == NULL) {
1874 		/*
1875 		 * Store both locally and in shared region.
1876 		 */
1877 		if ((ret = __repmgr_new_site(env, &site, host, port)) != 0)
1878 			return (ret);
1879 
1880 		eid = EID_FROM_SITE(site);
1881 		DB_ASSERT(env, (u_int)eid == db_rep->site_cnt - 1);
1882 		if ((ret = __repmgr_share_netaddrs(env,
1883 		    rep, (u_int)eid, db_rep->site_cnt)) == 0) {
1884 			/* Show that a change was made. */
1885 			db_rep->siteinfo_seq = ++rep->siteinfo_seq;
1886 		} else {
1887 			/*
1888 			 * Rescind the local slot we just added, so that we at
1889 			 * least keep the two lists in sync.
1890 			 */
1891 			db_rep->site_cnt--;
1892 			__repmgr_cleanup_netaddr(env, &site->net_addr);
1893 		}
1894 	} else
1895 		eid = EID_FROM_SITE(site);
1896 	if (ret == 0)
1897 		*eidp = eid;
1898 	return (ret);
1899 }
1900 
1901 /*
1902  * Sets the named remote site's group membership status to the given value,
1903  * creating it first if it doesn't already exist.  Adjusts connections
1904  * accordingly.
1905  *
1906  * PUBLIC: int __repmgr_set_membership __P((ENV *,
1907  * PUBLIC:     const char *, u_int, u_int32_t));
1908  *
1909  * Caller must host db_rep mutex, and be in ENV_ENTER context.
1910  */
1911 int
__repmgr_set_membership(env,host,port,status)1912 __repmgr_set_membership(env, host, port, status)
1913 	ENV *env;
1914 	const char *host;
1915 	u_int port;
1916 	u_int32_t status;
1917 {
1918 	DB_REP *db_rep;
1919 	REP *rep;
1920 	REGINFO *infop;
1921 	REPMGR_SITE *site;
1922 	SITEINFO *sites;
1923 	u_int32_t orig;
1924 	int eid, ret;
1925 
1926 	db_rep = env->rep_handle;
1927 	rep = db_rep->region;
1928 	infop = env->reginfo;
1929 
1930 	COMPQUIET(orig, 0);
1931 	COMPQUIET(site, NULL);
1932 	DB_ASSERT(env, REP_ON(env));
1933 
1934 	MUTEX_LOCK(env, rep->mtx_repmgr);
1935 	if ((ret = get_eid(env, host, port, &eid)) == 0) {
1936 		DB_ASSERT(env, IS_VALID_EID(eid));
1937 		site = SITE_FROM_EID(eid);
1938 		orig = site->membership;
1939 		sites = R_ADDR(infop, rep->siteinfo_off);
1940 
1941 		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1942 		    "set membership for %s:%lu %lu (was %lu)",
1943 		    host, (u_long)port, (u_long)status, (u_long)orig));
1944 		if (status != sites[eid].status) {
1945 			/*
1946 			 * Show that a change is occurring.
1947 			 *
1948 			 * The call to get_eid() might have also bumped the
1949 			 * sequence number, and since this is all happening
1950 			 * within a single critical section it would be possible
1951 			 * to avoid "wasting" a sequence number.  But it's
1952 			 * hardly worth the trouble and mental complexity: the
1953 			 * sequence number counts changes that occur within an
1954 			 * env region lifetime, so there should be plenty.
1955 			 * We'll run out of membership DB version numbers long
1956 			 * before this becomes a problem.
1957 			 */
1958 			db_rep->siteinfo_seq = ++rep->siteinfo_seq;
1959 		}
1960 
1961 		/* Set both private and shared copies of the info. */
1962 		site->membership = status;
1963 		sites[eid].status = status;
1964 	}
1965 	MUTEX_UNLOCK(env, rep->mtx_repmgr);
1966 
1967 	/*
1968 	 * If our notion of the site's membership changed, we may need to create
1969 	 * or kill a connection.
1970 	 */
1971 	if (ret == 0 && db_rep->repmgr_status == running &&
1972 	    SELECTOR_RUNNING(db_rep)) {
1973 
1974 		if (eid == db_rep->self_eid && status != SITE_PRESENT)
1975 			ret = DB_DELETED;
1976 		else if (orig != SITE_PRESENT && status == SITE_PRESENT &&
1977 		    site->state == SITE_IDLE) {
1978 			/*
1979 			 * Here we might have just joined a group, or we might
1980 			 * be an existing site and we've just learned of another
1981 			 * site joining the group.  In the former case, we
1982 			 * certainly want to connect right away; in the later
1983 			 * case it might be better to wait, because the new site
1984 			 * probably isn't quite ready to accept our connection.
1985 			 * But deciding which case we're in here would be messy,
1986 			 * so for now we just keep it simple and always try
1987 			 * connecting immediately.  The resulting connection
1988 			 * failure shouldn't hurt anything, because we'll just
1989 			 * naturally try again later.
1990 			 */
1991 			ret = __repmgr_schedule_connection_attempt(env,
1992 			    eid, TRUE);
1993 			if (eid != db_rep->self_eid)
1994 				DB_EVENT(env, DB_EVENT_REP_SITE_ADDED, &eid);
1995 		} else if (orig != 0 && status == 0)
1996 			DB_EVENT(env, DB_EVENT_REP_SITE_REMOVED, &eid);
1997 
1998 		/*
1999 		 * Callers are responsible for adjusting nsites, even though in
2000 		 * a way it would make sense to do it here.  It's awkward to do
2001 		 * it here at start-up/join time, when we load up starting from
2002 		 * an empty array.  Then we would get rep_set_nsites()
2003 		 * repeatedly, and when leases were in use that would thrash the
2004 		 * lease table adjustment.
2005 		 */
2006 	}
2007 	return (ret);
2008 }
2009 
2010 /*
2011  * PUBLIC: int __repmgr_bcast_parm_refresh __P((ENV *));
2012  */
2013 int
__repmgr_bcast_parm_refresh(env)2014 __repmgr_bcast_parm_refresh(env)
2015 	ENV *env;
2016 {
2017 	DB_REP *db_rep;
2018 	REP *rep;
2019 	__repmgr_parm_refresh_args parms;
2020 	u_int8_t buf[__REPMGR_PARM_REFRESH_SIZE];
2021 	int ret;
2022 
2023 	DB_ASSERT(env, REP_ON(env));
2024 	db_rep = env->rep_handle;
2025 	rep = db_rep->region;
2026 	LOCK_MUTEX(db_rep->mutex);
2027 	parms.ack_policy = (u_int32_t)rep->perm_policy;
2028 	if (rep->priority == 0)
2029 		parms.flags = 0;
2030 	else
2031 		parms.flags = SITE_ELECTABLE;
2032 	__repmgr_parm_refresh_marshal(env, &parms, buf);
2033 	ret = __repmgr_bcast_own_msg(env,
2034 	    REPMGR_PARM_REFRESH, buf, __REPMGR_PARM_REFRESH_SIZE);
2035 	UNLOCK_MUTEX(db_rep->mutex);
2036 	return (ret);
2037 }
2038 
2039 /*
2040  * PUBLIC: int __repmgr_chg_prio __P((ENV *, u_int32_t, u_int32_t));
2041  */
2042 int
__repmgr_chg_prio(env,prev,cur)2043 __repmgr_chg_prio(env, prev, cur)
2044 	ENV *env;
2045 	u_int32_t prev, cur;
2046 {
2047 	if ((prev == 0 && cur != 0) ||
2048 	    (prev != 0 && cur == 0))
2049 		return (__repmgr_bcast_parm_refresh(env));
2050 	return (0);
2051 }
2052 
2053 /*
2054  * PUBLIC: int __repmgr_bcast_own_msg __P((ENV *,
2055  * PUBLIC:     u_int32_t, u_int8_t *, size_t));
2056  *
2057  * Caller must hold mutex.
2058  */
2059 int
__repmgr_bcast_own_msg(env,type,buf,len)2060 __repmgr_bcast_own_msg(env, type, buf, len)
2061 	ENV *env;
2062 	u_int32_t type;
2063 	u_int8_t *buf;
2064 	size_t len;
2065 {
2066 	DB_REP *db_rep;
2067 	REPMGR_CONNECTION *conn;
2068 	REPMGR_SITE *site;
2069 	int ret;
2070 	u_int i;
2071 
2072 	db_rep = env->rep_handle;
2073 	if (!SELECTOR_RUNNING(db_rep))
2074 		return (0);
2075 	FOR_EACH_REMOTE_SITE_INDEX(i) {
2076 		site = SITE_FROM_EID(i);
2077 		if (site->state != SITE_CONNECTED)
2078 			continue;
2079 		if ((conn = site->ref.conn.in) != NULL &&
2080 		    conn->state == CONN_READY &&
2081 		    (ret = __repmgr_send_own_msg(env,
2082 		    conn, type, buf, (u_int32_t)len)) != 0 &&
2083 		    (ret = __repmgr_bust_connection(env, conn)) != 0)
2084 			return (ret);
2085 		if ((conn = site->ref.conn.out) != NULL &&
2086 		    conn->state == CONN_READY &&
2087 		    (ret = __repmgr_send_own_msg(env,
2088 		    conn, type, buf, (u_int32_t)len)) != 0 &&
2089 		    (ret = __repmgr_bust_connection(env, conn)) != 0)
2090 			return (ret);
2091 	}
2092 	return (0);
2093 }
2094