1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2005, 2013 Oracle and/or its affiliates. All rights reserved.
5 *
6 * $Id$
7 */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/btree.h"
14 #include "dbinc/txn.h"
15
/*
 * Number of slots allocated the first time a site array (process-local or
 * shared) is created; after that the arrays grow by doubling.
 */
#define INITIAL_SITES_ALLOCATION 3 /* Arbitrary guess. */

/* Forward declarations of functions private to this file. */
static int get_eid __P((ENV *, const char *, u_int, int *));
static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *));
static int read_gmdb __P((ENV *, DB_THREAD_INFO *, u_int8_t **, size_t *));
21
22 /*
23 * Schedules a future attempt to re-establish a connection with the given site.
24 * Usually, we wait the configured retry_wait period. But if the "immediate"
25 * parameter is given as TRUE, we'll make the wait time 0, and put the request
26 * at the _beginning_ of the retry queue.
27 *
28 * PUBLIC: int __repmgr_schedule_connection_attempt __P((ENV *, int, int));
29 *
30 * !!!
31 * Caller should hold mutex.
32 *
33 * Unless an error occurs, we always attempt to wake the main thread;
34 * __repmgr_bust_connection relies on this behavior.
35 */
36 int
__repmgr_schedule_connection_attempt(env,eid,immediate)37 __repmgr_schedule_connection_attempt(env, eid, immediate)
38 ENV *env;
39 int eid;
40 int immediate;
41 {
42 DB_REP *db_rep;
43 REP *rep;
44 REPMGR_RETRY *retry, *target;
45 REPMGR_SITE *site;
46 db_timespec t;
47 int ret;
48
49 db_rep = env->rep_handle;
50 rep = db_rep->region;
51 if ((ret = __os_malloc(env, sizeof(*retry), &retry)) != 0)
52 return (ret);
53
54 DB_ASSERT(env, IS_VALID_EID(eid));
55 site = SITE_FROM_EID(eid);
56 __os_gettime(env, &t, 1);
57 if (immediate)
58 TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries);
59 else {
60 TIMESPEC_ADD_DB_TIMEOUT(&t, rep->connection_retry_wait);
61 /*
62 * Insert the new "retry" on the (time-ordered) list in its
63 * proper position. To do so, find the list entry ("target")
64 * with a later time; insert the new entry just before that.
65 */
66 TAILQ_FOREACH(target, &db_rep->retries, entries) {
67 if (timespeccmp(&target->time, &t, >))
68 break;
69 }
70 if (target == NULL)
71 TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries);
72 else
73 TAILQ_INSERT_BEFORE(target, retry, entries);
74
75 }
76 retry->eid = eid;
77 retry->time = t;
78
79 site->state = SITE_PAUSING;
80 site->ref.retry = retry;
81
82 return (__repmgr_wake_main_thread(env));
83 }
84
85 /*
86 * Determines whether a remote site should be considered a "server" to us as a
87 * "client" (in typical client/server terminology, not to be confused with our
88 * usual use of the term "client" as in the master/client replication role), or
89 * vice versa.
90 *
91 * PUBLIC: int __repmgr_is_server __P((ENV *, REPMGR_SITE *));
92 */
93 int
__repmgr_is_server(env,site)94 __repmgr_is_server(env, site)
95 ENV *env;
96 REPMGR_SITE *site;
97 {
98 DB_REP *db_rep;
99 int cmp;
100
101 db_rep = env->rep_handle;
102 cmp = __repmgr_addrcmp(&site->net_addr,
103 &SITE_FROM_EID(db_rep->self_eid)->net_addr);
104 DB_ASSERT(env, cmp != 0);
105
106 /*
107 * The mnemonic here is that a server conventionally has a
108 * small well-known port number, while clients typically use a port
109 * number from the higher ephemeral range. So, for the remote site to
110 * be considered a server, its address should have compared as lower
111 * than ours.
112 */
113 return (cmp == -1);
114 }
115
116 /*
117 * Compare two network addresses (lexicographically), and return -1, 0, or 1, as
118 * the first is less than, equal to, or greater than the second.
119 */
120 static int
__repmgr_addrcmp(addr1,addr2)121 __repmgr_addrcmp(addr1, addr2)
122 repmgr_netaddr_t *addr1, *addr2;
123 {
124 int cmp;
125
126 cmp = strcmp(addr1->host, addr2->host);
127 if (cmp != 0)
128 return (cmp);
129
130 if (addr1->port < addr2->port)
131 return (-1);
132 else if (addr1->port > addr2->port)
133 return (1);
134 return (0);
135 }
136
137 /*
138 * Initialize the necessary control structures to begin reading a new input
139 * message.
140 *
141 * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *));
142 */
143 void
__repmgr_reset_for_reading(con)144 __repmgr_reset_for_reading(con)
145 REPMGR_CONNECTION *con;
146 {
147 con->reading_phase = SIZES_PHASE;
148 __repmgr_iovec_init(&con->iovecs);
149 __repmgr_add_buffer(&con->iovecs,
150 con->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE);
151 }
152
153 /*
154 * Constructs a DB_REPMGR_CONNECTION structure.
155 *
156 * PUBLIC: int __repmgr_new_connection __P((ENV *,
157 * PUBLIC: REPMGR_CONNECTION **, socket_t, int));
158 */
159 int
__repmgr_new_connection(env,connp,s,state)160 __repmgr_new_connection(env, connp, s, state)
161 ENV *env;
162 REPMGR_CONNECTION **connp;
163 socket_t s;
164 int state;
165 {
166 REPMGR_CONNECTION *c;
167 int ret;
168
169 if ((ret = __os_calloc(env, 1, sizeof(REPMGR_CONNECTION), &c)) != 0)
170 return (ret);
171 if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
172 __os_free(env, c);
173 return (ret);
174 }
175 if ((ret = __repmgr_init_waiters(env, &c->response_waiters)) != 0) {
176 (void)__repmgr_free_cond(&c->drained);
177 __os_free(env, c);
178 return (ret);
179 }
180
181 c->fd = s;
182 c->state = state;
183 c->type = UNKNOWN_CONN_TYPE;
184 #ifdef DB_WIN32
185 c->event_object = WSA_INVALID_EVENT;
186 #endif
187
188 STAILQ_INIT(&c->outbound_queue);
189 c->out_queue_length = 0;
190
191 __repmgr_reset_for_reading(c);
192 *connp = c;
193
194 return (0);
195 }
196
197 /*
198 * PUBLIC: int __repmgr_set_keepalive __P((ENV *, REPMGR_CONNECTION *));
199 */
200 int
__repmgr_set_keepalive(env,conn)201 __repmgr_set_keepalive(env, conn)
202 ENV *env;
203 REPMGR_CONNECTION *conn;
204 {
205 int ret, sockopt;
206
207 ret = 0;
208 #ifdef SO_KEEPALIVE
209 sockopt = 1;
210 if (setsockopt(conn->fd, SOL_SOCKET,
211 SO_KEEPALIVE, (sockopt_t)&sockopt, sizeof(sockopt)) != 0) {
212 ret = net_errno;
213 __db_err(env, ret, DB_STR("3626",
214 "can't set KEEPALIVE socket option"));
215 (void)__repmgr_destroy_conn(env, conn);
216 }
217 #endif
218 return (ret);
219 }
220
/*
 * Appends a new entry for host:port to the process-local sites array, growing
 * the array first if it is full, and returns a pointer to the freshly
 * initialized entry through sitep.
 *
 * PUBLIC: int __repmgr_new_site __P((ENV *, REPMGR_SITE**,
 * PUBLIC:     const char *, u_int));
 *
 * Manipulates the process-local copy of the sites list.  So, callers should
 * hold the db_rep->mutex (except for single-threaded, pre-open configuration).
 *
 * Returns 0 on success, or the error from __os_malloc()/__os_strdup().
 * Growing the array moves every existing entry to a new allocation and frees
 * the old one, so REPMGR_SITE pointers obtained before the call must not be
 * used afterwards.
 */
int
__repmgr_new_site(env, sitep, host, port)
	ENV *env;
	REPMGR_SITE **sitep;
	const char *host;
	u_int port;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site, *sites;
	char *p;
	u_int i, new_site_max;
	int ret;

	db_rep = env->rep_handle;
	if (db_rep->site_cnt >= db_rep->site_max) {
		/* Array is full (or not yet allocated): double its capacity. */
		new_site_max = db_rep->site_max == 0 ?
		    INITIAL_SITES_ALLOCATION : db_rep->site_max * 2;
		if ((ret = __os_malloc(env,
		    sizeof(REPMGR_SITE) * new_site_max, &sites)) != 0)
			return (ret);
		if (db_rep->site_max > 0) {
			/*
			 * For each site in the array, copy the old struct to
			 * the space allocated for the new struct.  But the
			 * sub_conns list header (and one of the conn structs on
			 * the list, if any) contain pointers to the address of
			 * the old list header; so we have to move them
			 * explicitly.  If not for that, we could use a simple
			 * __os_realloc() call.
			 */
			for (i = 0; i < db_rep->site_cnt; i++) {
				sites[i] = db_rep->sites[i];
				/*
				 * Re-home the list: start the new header
				 * empty, then move the connections across one
				 * at a time, preserving their order.
				 */
				TAILQ_INIT(&sites[i].sub_conns);
				while (!TAILQ_EMPTY(
				    &db_rep->sites[i].sub_conns)) {
					conn = TAILQ_FIRST(
					    &db_rep->sites[i].sub_conns);
					TAILQ_REMOVE(
					    &db_rep->sites[i].sub_conns,
					    conn, entries);
					TAILQ_INSERT_TAIL(&sites[i].sub_conns,
					    conn, entries);
				}
			}
			__os_free(env, db_rep->sites);
		}
		db_rep->sites = sites;
		db_rep->site_max = new_site_max;
	}
	/* The site entry owns a private copy of the host name. */
	if ((ret = __os_strdup(env, host, &p)) != 0) {
		/* No harm in leaving the increased site_max intact. */
		return (ret);
	}
	/* Claim the next free slot; its index is the new site's position. */
	site = &db_rep->sites[db_rep->site_cnt++];

	site->net_addr.host = p;
	/* Only the low 16 bits of the port are kept. */
	site->net_addr.port = (u_int16_t)port;

	/* Everything else starts out cleared/idle. */
	ZERO_LSN(site->max_ack);
	site->ack_policy = 0;
	site->alignment = 0;
	site->flags = 0;
	timespecclear(&site->last_rcvd_timestamp);
	TAILQ_INIT(&site->sub_conns);
	site->connector = NULL;
	site->ref.conn.in = site->ref.conn.out = NULL;
	site->state = SITE_IDLE;

	site->membership = 0;
	site->config = 0;

	*sitep = site;
	return (0);
}
303
304 /*
305 * PUBLIC: int __repmgr_create_mutex __P((ENV *, mgr_mutex_t **));
306 */
307 int
__repmgr_create_mutex(env,mtxp)308 __repmgr_create_mutex(env, mtxp)
309 ENV *env;
310 mgr_mutex_t **mtxp;
311 {
312 mgr_mutex_t *mtx;
313 int ret;
314
315 if ((ret = __os_malloc(env, sizeof(mgr_mutex_t), &mtx)) == 0 &&
316 (ret = __repmgr_create_mutex_pf(mtx)) != 0) {
317 __os_free(env, mtx);
318 }
319 if (ret == 0)
320 *mtxp = mtx;
321 return (ret);
322 }
323
324 /*
325 * PUBLIC: int __repmgr_destroy_mutex __P((ENV *, mgr_mutex_t *));
326 */
327 int
__repmgr_destroy_mutex(env,mtx)328 __repmgr_destroy_mutex(env, mtx)
329 ENV *env;
330 mgr_mutex_t *mtx;
331 {
332 int ret;
333
334 ret = __repmgr_destroy_mutex_pf(mtx);
335 __os_free(env, mtx);
336 return (ret);
337 }
338
339 /*
340 * Kind of like a destructor for a repmgr_netaddr_t: cleans up any subordinate
341 * allocated memory pointed to by the addr, though it does not free the struct
342 * itself.
343 *
344 * PUBLIC: void __repmgr_cleanup_netaddr __P((ENV *, repmgr_netaddr_t *));
345 */
346 void
__repmgr_cleanup_netaddr(env,addr)347 __repmgr_cleanup_netaddr(env, addr)
348 ENV *env;
349 repmgr_netaddr_t *addr;
350 {
351 if (addr->host != NULL) {
352 __os_free(env, addr->host);
353 addr->host = NULL;
354 }
355 }
356
357 /*
358 * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *));
359 */
360 void
__repmgr_iovec_init(v)361 __repmgr_iovec_init(v)
362 REPMGR_IOVECS *v;
363 {
364 v->offset = v->count = 0;
365 v->total_bytes = 0;
366 }
367
368 /*
369 * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t));
370 *
371 * !!!
372 * There is no checking for overflow of the vectors[5] array.
373 */
374 void
__repmgr_add_buffer(v,address,length)375 __repmgr_add_buffer(v, address, length)
376 REPMGR_IOVECS *v;
377 void *address;
378 size_t length;
379 {
380 if (length > 0) {
381 v->vectors[v->count].iov_base = address;
382 v->vectors[v->count++].iov_len = (u_long)length;
383 v->total_bytes += length;
384 }
385 }
386
387 /*
388 * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *));
389 */
390 void
__repmgr_add_dbt(v,dbt)391 __repmgr_add_dbt(v, dbt)
392 REPMGR_IOVECS *v;
393 const DBT *dbt;
394 {
395 if (dbt->size > 0) {
396 v->vectors[v->count].iov_base = dbt->data;
397 v->vectors[v->count++].iov_len = dbt->size;
398 v->total_bytes += dbt->size;
399 }
400 }
401
402 /*
403 * Update a set of iovecs to reflect the number of bytes transferred in an I/O
404 * operation, so that the iovecs can be used to continue transferring where we
405 * left off.
406 * Returns TRUE if the set of buffers is now fully consumed, FALSE if more
407 * remains.
408 *
409 * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t));
410 */
411 int
__repmgr_update_consumed(v,byte_count)412 __repmgr_update_consumed(v, byte_count)
413 REPMGR_IOVECS *v;
414 size_t byte_count;
415 {
416 db_iovec_t *iov;
417 int i;
418
419 for (i = v->offset; ; i++) {
420 DB_ASSERT(NULL, i < v->count && byte_count > 0);
421 iov = &v->vectors[i];
422 if (byte_count > iov->iov_len) {
423 /*
424 * We've consumed (more than) this vector's worth.
425 * Adjust count and continue.
426 */
427 byte_count -= iov->iov_len;
428 } else {
429 /*
430 * Adjust length of remaining portion of vector.
431 * byte_count can never be greater than iov_len, or we
432 * would not be in this section of the if clause.
433 */
434 iov->iov_len -= (u_int32_t)byte_count;
435 if (iov->iov_len > 0) {
436 /*
437 * Still some left in this vector. Adjust base
438 * address too, and leave offset pointing here.
439 */
440 iov->iov_base = (void *)
441 ((u_int8_t *)iov->iov_base + byte_count);
442 v->offset = i;
443 } else {
444 /*
445 * Consumed exactly to a vector boundary.
446 * Advance to next vector for next time.
447 */
448 v->offset = i+1;
449 }
450 /*
451 * If offset has reached count, the entire thing is
452 * consumed.
453 */
454 return (v->offset >= v->count);
455 }
456 }
457 }
458
459 /*
460 * Builds a buffer containing our network address information, suitable for
461 * publishing as cdata via a call to rep_start, and sets up the given DBT to
462 * point to it. The buffer is dynamically allocated memory, and the caller must
463 * assume responsibility for it.
464 *
465 * PUBLIC: int __repmgr_prepare_my_addr __P((ENV *, DBT *));
466 */
467 int
__repmgr_prepare_my_addr(env,dbt)468 __repmgr_prepare_my_addr(env, dbt)
469 ENV *env;
470 DBT *dbt;
471 {
472 DB_REP *db_rep;
473 repmgr_netaddr_t addr;
474 size_t size, hlen;
475 u_int16_t port_buffer;
476 u_int8_t *ptr;
477 int ret;
478
479 db_rep = env->rep_handle;
480 LOCK_MUTEX(db_rep->mutex);
481 addr = SITE_FROM_EID(db_rep->self_eid)->net_addr;
482 UNLOCK_MUTEX(db_rep->mutex);
483 /*
484 * The cdata message consists of the 2-byte port number, in network byte
485 * order, followed by the null-terminated host name string.
486 */
487 port_buffer = htons(addr.port);
488 size = sizeof(port_buffer) + (hlen = strlen(addr.host) + 1);
489 if ((ret = __os_malloc(env, size, &ptr)) != 0)
490 return (ret);
491
492 DB_INIT_DBT(*dbt, ptr, size);
493
494 memcpy(ptr, &port_buffer, sizeof(port_buffer));
495 ptr = &ptr[sizeof(port_buffer)];
496 memcpy(ptr, addr.host, hlen);
497
498 return (0);
499 }
500
501 /*
502 * !!!
503 * This may only be called after threads have been started, because we don't
504 * know the answer until we have established group membership (e.g., reading the
505 * membership database). That should be OK, because we only need this
506 * for starting an election, or counting acks after sending a PERM message.
507 *
508 * PUBLIC: int __repmgr_get_nsites __P((ENV *, u_int32_t *));
509 */
510 int
__repmgr_get_nsites(env,nsitesp)511 __repmgr_get_nsites(env, nsitesp)
512 ENV *env;
513 u_int32_t *nsitesp;
514 {
515 DB_REP *db_rep;
516 u_int32_t nsites;
517
518 db_rep = env->rep_handle;
519
520 if ((nsites = db_rep->region->config_nsites) == 0) {
521 __db_errx(env, DB_STR("3672",
522 "Nsites unknown before repmgr_start()"));
523 return (EINVAL);
524 }
525 *nsitesp = nsites;
526 return (0);
527 }
528
529 /*
530 * PUBLIC: int __repmgr_thread_failure __P((ENV *, int));
531 */
532 int
__repmgr_thread_failure(env,why)533 __repmgr_thread_failure(env, why)
534 ENV *env;
535 int why;
536 {
537 DB_REP *db_rep;
538
539 db_rep = env->rep_handle;
540 LOCK_MUTEX(db_rep->mutex);
541 (void)__repmgr_stop_threads(env);
542 UNLOCK_MUTEX(db_rep->mutex);
543 return (__env_panic(env, why));
544 }
545
546 /*
547 * Format a printable representation of a site location, suitable for inclusion
548 * in an error message. The buffer must be at least as big as
549 * MAX_SITE_LOC_STRING.
550 *
551 * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *,
552 * PUBLIC: REPMGR_CONNECTION *, char *));
553 *
554 * Caller must hold mutex.
555 */
556 char *
__repmgr_format_eid_loc(db_rep,conn,buffer)557 __repmgr_format_eid_loc(db_rep, conn, buffer)
558 DB_REP *db_rep;
559 REPMGR_CONNECTION *conn;
560 char *buffer;
561 {
562 int eid;
563
564 if (conn->type == APP_CONNECTION)
565 snprintf(buffer,
566 MAX_SITE_LOC_STRING, "(application channel)");
567 else if (conn->type == REP_CONNECTION &&
568 IS_VALID_EID(eid = conn->eid))
569 (void)__repmgr_format_site_loc(SITE_FROM_EID(eid), buffer);
570 else
571 snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)");
572 return (buffer);
573 }
574
575 /*
576 * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *));
577 */
578 char *
__repmgr_format_site_loc(site,buffer)579 __repmgr_format_site_loc(site, buffer)
580 REPMGR_SITE *site;
581 char *buffer;
582 {
583 return (__repmgr_format_addr_loc(&site->net_addr, buffer));
584 }
585
586 /*
587 * PUBLIC: char *__repmgr_format_addr_loc __P((repmgr_netaddr_t *, char *));
588 */
589 char *
__repmgr_format_addr_loc(addr,buffer)590 __repmgr_format_addr_loc(addr, buffer)
591 repmgr_netaddr_t *addr;
592 char *buffer;
593 {
594 snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu",
595 addr->host, (u_long)addr->port);
596 return (buffer);
597 }
598
599 /*
600 * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t));
601 */
602 int
__repmgr_repstart(env,flags)603 __repmgr_repstart(env, flags)
604 ENV *env;
605 u_int32_t flags;
606 {
607 DBT my_addr;
608 int ret;
609
610 /* Include "cdata" in case sending to old-version site. */
611 if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0)
612 return (ret);
613 ret = __rep_start_int(env, &my_addr, flags);
614 __os_free(env, my_addr.data);
615 if (ret != 0)
616 __db_err(env, ret, DB_STR("3673", "rep_start"));
617 return (ret);
618 }
619
/*
 * Takes on the master role: resets the limbo-tracking state from the
 * in-memory membership list, starts replication as master, and, if no group
 * membership database (gmdb) exists yet, creates one from the in-memory site
 * list.
 *
 * Returns 0 on success, otherwise the first error encountered.
 *
 * PUBLIC: int __repmgr_become_master __P((ENV *));
 */
int
__repmgr_become_master(env)
	ENV *env;
{
	DB_REP *db_rep;
	DB_THREAD_INFO *ip;
	DB *dbp;
	DB_TXN *txn;
	REPMGR_SITE *site;
	DBT key_dbt, data_dbt;
	__repmgr_membership_key_args key;
	__repmgr_membership_data_args member_status;
	repmgr_netaddr_t addr;
	u_int32_t status;
	u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
	u_int8_t key_buf[MAX_MSG_BUF];
	size_t len;
	u_int i;
	int ret, t_ret;

	db_rep = env->rep_handle;
	dbp = NULL;
	txn = NULL;

	/* Examine membership list to see if we have a victim in limbo. */
	LOCK_MUTEX(db_rep->mutex);
	ZERO_LSN(db_rep->limbo_failure);
	ZERO_LSN(db_rep->durable_lsn);
	db_rep->limbo_victim = DB_EID_INVALID;
	db_rep->limbo_resolution_needed = FALSE;
	FOR_EACH_REMOTE_SITE_INDEX(i) {
		site = SITE_FROM_EID(i);
		/* A site still being added or deleted is "in limbo". */
		if (site->membership == SITE_ADDING ||
		    site->membership == SITE_DELETING) {
			db_rep->limbo_victim = (int)i;
			db_rep->limbo_resolution_needed = TRUE;

			/*
			 * Since there can never be more than one limbo victim,
			 * when we find one we don't have to continue looking
			 * for others.
			 */
			break;
		}
	}
	db_rep->client_intent = FALSE;
	UNLOCK_MUTEX(db_rep->mutex);

	if ((ret = __repmgr_repstart(env, DB_REP_MASTER)) != 0)
		return (ret);

	/*
	 * Make sure member_version_gen is current so that this master
	 * can reject obsolete member lists from other sites.
	 */
	db_rep->member_version_gen = db_rep->region->gen;

	/* If there is already a gmdb, we are finished. */
	if (db_rep->have_gmdb)
		return (0);

	/* There isn't a gmdb.  Create one from the in-memory site list. */
	ENV_ENTER(env, ip);
	if ((ret = __repmgr_hold_master_role(env, NULL)) != 0)
		goto leave;
retry:
	/*
	 * NOTE(review): on a pass reached via "goto retry", txn still holds
	 * the pointer from the previous pass unless this call overwrites it
	 * -- confirm that __repmgr_setup_gmdb_op sets *txn on failure.
	 */
	if ((ret = __repmgr_setup_gmdb_op(env, ip, &txn, DB_CREATE)) != 0)
		goto err;

	DB_ASSERT(env, txn != NULL);
	dbp = db_rep->gmdb;
	DB_ASSERT(env, dbp != NULL);

	/* Write the meta-data record. */
	if ((ret = __repmgr_set_gm_version(env, ip, txn, 1)) != 0)
		goto err;

	/* Write a record representing each site in the group. */
	for (i = 0; i < db_rep->site_cnt; i++) {
		/* Copy the site's address and status while holding the mutex. */
		LOCK_MUTEX(db_rep->mutex);
		site = SITE_FROM_EID(i);
		addr = site->net_addr;
		status = site->membership;
		UNLOCK_MUTEX(db_rep->mutex);
		/* Sites with a zero membership status get no record. */
		if (status == 0)
			continue;
		/* Key: marshaled host (with its null terminator) and port. */
		DB_INIT_DBT(key.host, addr.host, strlen(addr.host) + 1);
		key.port = addr.port;
		ret = __repmgr_membership_key_marshal(env,
		    &key, key_buf, sizeof(key_buf), &len);
		DB_ASSERT(env, ret == 0);
		DB_INIT_DBT(key_dbt, key_buf, len);
		/* Data: the marshaled membership status. */
		member_status.flags = status;
		__repmgr_membership_data_marshal(env, &member_status, data_buf);
		DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE);
		if ((ret = __db_put(dbp, ip, txn, &key_dbt, &data_dbt, 0)) != 0)
			goto err;
	}

err:
	/*
	 * Reached on success as well as on failure: resolve the transaction
	 * according to ret and clean up the gmdb operation, keeping the first
	 * error seen.
	 */
	if (txn != NULL) {
		if ((t_ret = __db_txn_auto_resolve(env, txn, 0, ret)) != 0 &&
		    ret == 0)
			ret = t_ret;
		if ((t_ret = __repmgr_cleanup_gmdb_op(env, TRUE)) != 0 &&
		    ret == 0)
			ret = t_ret;
	}
	/* Lock conflicts are retried from the top of the gmdb operation. */
	if (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED)
		goto retry;
	if ((t_ret = __repmgr_rlse_master_role(env)) != 0 && ret == 0)
		ret = t_ret;
leave:
	ENV_LEAVE(env, ip);
	return (ret);
}
739
740 /*
741 * Visits all the connections we know about, performing the desired action.
742 * "err_quit" determines whether we give up, or soldier on, in case of an
743 * error.
744 *
745 * PUBLIC: int __repmgr_each_connection __P((ENV *,
746 * PUBLIC: CONNECTION_ACTION, void *, int));
747 *
748 * !!!
749 * Caller must hold mutex.
750 */
751 int
__repmgr_each_connection(env,callback,info,err_quit)752 __repmgr_each_connection(env, callback, info, err_quit)
753 ENV *env;
754 CONNECTION_ACTION callback;
755 void *info;
756 int err_quit;
757 {
758 DB_REP *db_rep;
759 REPMGR_CONNECTION *conn, *next;
760 REPMGR_SITE *site;
761 int eid, ret, t_ret;
762
763 #define HANDLE_ERROR \
764 do { \
765 if (err_quit) \
766 return (t_ret); \
767 if (ret == 0) \
768 ret = t_ret; \
769 } while (0)
770
771 db_rep = env->rep_handle;
772 ret = 0;
773
774 /*
775 * We might have used TAILQ_FOREACH here, except that in some cases we
776 * need to unlink an element along the way.
777 */
778 for (conn = TAILQ_FIRST(&db_rep->connections);
779 conn != NULL;
780 conn = next) {
781 next = TAILQ_NEXT(conn, entries);
782
783 if ((t_ret = (*callback)(env, conn, info)) != 0)
784 HANDLE_ERROR;
785 }
786
787 FOR_EACH_REMOTE_SITE_INDEX(eid) {
788 site = SITE_FROM_EID(eid);
789
790 if (site->state == SITE_CONNECTED) {
791 if ((conn = site->ref.conn.in) != NULL &&
792 (t_ret = (*callback)(env, conn, info)) != 0)
793 HANDLE_ERROR;
794 if ((conn = site->ref.conn.out) != NULL &&
795 (t_ret = (*callback)(env, conn, info)) != 0)
796 HANDLE_ERROR;
797 }
798
799 for (conn = TAILQ_FIRST(&site->sub_conns);
800 conn != NULL;
801 conn = next) {
802 next = TAILQ_NEXT(conn, entries);
803 if ((t_ret = (*callback)(env, conn, info)) != 0)
804 HANDLE_ERROR;
805 }
806 }
807
808 return (0);
809 }
810
811 /*
812 * Initialize repmgr's portion of the shared region area. Note that we can't
813 * simply get the REP* address from the env as we usually do, because at the
814 * time of this call it hasn't been linked into there yet.
815 *
816 * This function is only called during creation of the region. If anything
817 * fails, our caller will panic and remove the region. So, if we have any
818 * failure, we don't have to clean up any partial allocation.
819 *
820 * PUBLIC: int __repmgr_open __P((ENV *, void *));
821 */
822 int
__repmgr_open(env,rep_)823 __repmgr_open(env, rep_)
824 ENV *env;
825 void *rep_;
826 {
827 DB_REP *db_rep;
828 REP *rep;
829 int ret;
830
831 db_rep = env->rep_handle;
832 rep = rep_;
833
834 if ((ret = __mutex_alloc(env, MTX_REPMGR, 0, &rep->mtx_repmgr)) != 0)
835 return (ret);
836
837 DB_ASSERT(env, rep->siteinfo_seq == 0 && db_rep->siteinfo_seq == 0);
838 rep->siteinfo_off = INVALID_ROFF;
839 rep->siteinfo_seq = 0;
840 if ((ret = __repmgr_share_netaddrs(env, rep, 0, db_rep->site_cnt)) != 0)
841 return (ret);
842
843 rep->self_eid = db_rep->self_eid;
844 rep->perm_policy = db_rep->perm_policy;
845 rep->ack_timeout = db_rep->ack_timeout;
846 rep->connection_retry_wait = db_rep->connection_retry_wait;
847 rep->election_retry_wait = db_rep->election_retry_wait;
848 rep->heartbeat_monitor_timeout = db_rep->heartbeat_monitor_timeout;
849 rep->heartbeat_frequency = db_rep->heartbeat_frequency;
850 return (ret);
851 }
852
/*
 * Join an existing environment, by setting up our local site info structures
 * from shared network address configuration in the region.
 *
 * As __repmgr_open(), note that we can't simply get the REP* address from the
 * env as we usually do, because at the time of this call it hasn't been linked
 * into there yet.
 *
 * The whole operation runs with rep->mtx_repmgr held.  Returns 0 on success,
 * EINVAL if the locally configured "self" site disagrees with the one
 * recorded in the region, or an error from adding/sharing site entries.
 *
 * PUBLIC: int __repmgr_join __P((ENV *, void *));
 */
int
__repmgr_join(env, rep_)
	ENV *env;
	void *rep_;
{
	DB_REP *db_rep;
	REGINFO *infop;
	REP *rep;
	SITEINFO *p;
	REPMGR_SITE *site, temp;
	repmgr_netaddr_t *addrp;
	char *host;
	u_int i, j;
	int ret;

	db_rep = env->rep_handle;
	infop = env->reginfo;
	rep = rep_;
	ret = 0;

	MUTEX_LOCK(env, rep->mtx_repmgr);

	/*
	 * Merge local and shared lists of remote sites.  Note that the
	 * placement of entries in the shared array must not change.  To
	 * accomplish the merge, pull in entries from the shared list, into the
	 * proper position, shuffling not-yet-resolved local entries if
	 * necessary.  Then add any remaining locally known entries to the
	 * shared list.
	 */
	i = 0;
	if (rep->siteinfo_off != INVALID_ROFF) {
		p = R_ADDR(infop, rep->siteinfo_off);

		/* For each address in the shared list ... */
		for (; i < rep->site_cnt; i++) {
			/* The shared entry stores the host as a region offset. */
			host = R_ADDR(infop, p[i].addr.host);

			RPRINT(env, (env, DB_VERB_REPMGR_MISC,
			    "Site %s:%lu found at EID %u",
				host, (u_long)p[i].addr.port, i));
			/*
			 * Find it in the local list.  Everything before 'i'
			 * already matches the shared list, and is therefore in
			 * the right place.  So we only need to search starting
			 * from 'i'.  When found, local config values will be
			 * used because they are assumed to be "fresher".  But
			 * membership status is not, since this process hasn't
			 * been active (running) yet.
			 */
			for (j = i; j < db_rep->site_cnt; j++) {
				site = &db_rep->sites[j];
				addrp = &site->net_addr;
				if (strcmp(host, addrp->host) == 0 &&
				    p[i].addr.port == addrp->port) {
					p[i].config = site->config;
					site->membership = p[i].status;
					break;
				}
			}

			/*
			 * When not found in local list, copy peer values
			 * from shared list.
			 */
			if (j == db_rep->site_cnt) {
				/*
				 * The new entry lands at index 'j', the old
				 * site_cnt, which the call then increments.
				 */
				if ((ret = __repmgr_new_site(env,
				    &site, host, p[i].addr.port)) != 0)
					goto unlock;
				site->config = p[i].config;
				site->membership = p[i].status;
			}
			DB_ASSERT(env, j < db_rep->site_cnt);

			/* Found or added at 'j', but belongs at 'i': swap. */
			if (i != j) {
				temp = db_rep->sites[j];
				db_rep->sites[j] = db_rep->sites[i];
				db_rep->sites[i] = temp;
				/*
				 * If we're moving the entry that self_eid
				 * points to, then adjust self_eid to match.
				 * For now this is still merely our original,
				 * in-process pointer; we have yet to make sure
				 * it matches the one from shared memory.
				 */
				if (db_rep->self_eid == (int)j)
					db_rep->self_eid = (int)i;
			}
		}
	}
	/* Local entries from index 'i' on are not yet in the shared list. */
	if ((ret = __repmgr_share_netaddrs(env, rep, i, db_rep->site_cnt)) != 0)
		goto unlock;
	/*
	 * Reconcile the notion of "self": whichever side has it set supplies
	 * it to the other; if both have it set, they must agree.
	 */
	if (db_rep->self_eid == DB_EID_INVALID)
		db_rep->self_eid = rep->self_eid;
	else if (rep->self_eid == DB_EID_INVALID)
		rep->self_eid = db_rep->self_eid;
	else if (db_rep->self_eid != rep->self_eid) {
		__db_errx(env, DB_STR("3674",
    "A mismatching local site address has been set in the environment"));
		ret = EINVAL;
		goto unlock;
	}

	/* Record that our local list now reflects this shared sequence. */
	db_rep->siteinfo_seq = rep->siteinfo_seq;
unlock:
	MUTEX_UNLOCK(env, rep->mtx_repmgr);
	return (ret);
}
972
973 /*
974 * PUBLIC: int __repmgr_env_refresh __P((ENV *env));
975 */
976 int
__repmgr_env_refresh(env)977 __repmgr_env_refresh(env)
978 ENV *env;
979 {
980 DB_REP *db_rep;
981 REP *rep;
982 REGINFO *infop;
983 SITEINFO *shared_array;
984 u_int i;
985 int ret;
986
987 db_rep = env->rep_handle;
988 rep = db_rep->region;
989 infop = env->reginfo;
990 ret = 0;
991 COMPQUIET(i, 0);
992
993 if (F_ISSET(env, ENV_PRIVATE)) {
994 ret = __mutex_free(env, &rep->mtx_repmgr);
995 if (rep->siteinfo_off != INVALID_ROFF) {
996 shared_array = R_ADDR(infop, rep->siteinfo_off);
997 for (i = 0; i < db_rep->site_cnt; i++)
998 __env_alloc_free(infop, R_ADDR(infop,
999 shared_array[i].addr.host));
1000 __env_alloc_free(infop, shared_array);
1001 rep->siteinfo_off = INVALID_ROFF;
1002 }
1003 }
1004
1005 return (ret);
1006 }
1007
/*
 * Copies new remote site information from the indicated private array slots
 * into the shared region.  The corresponding shared array slots do not exist
 * yet; they must be allocated.
 *
 * Private slots start <= x < limit are appended, in order, to the shared
 * array, which is grown (by doubling) as needed.  The region's mtx_regenv is
 * held for the duration.  If at least one entry was added, the shared
 * siteinfo sequence number is incremented and copied to the local handle.
 *
 * Returns 0 on success, or the error from a failed region allocation; entries
 * added before the failure remain in the shared array.
 *
 * PUBLIC: int __repmgr_share_netaddrs __P((ENV *, void *, u_int, u_int));
 *
 * !!! The rep pointer is passed, because it may not yet have been installed
 * into the env handle.
 *
 * !!! Assumes caller holds mtx_repmgr lock.
 */
int
__repmgr_share_netaddrs(env, rep_, start, limit)
	ENV *env;
	void *rep_;
	u_int start, limit;
{
	DB_REP *db_rep;
	REP *rep;
	REGINFO *infop;
	REGENV *renv;
	SITEINFO *orig, *shared_array;
	char *host, *hostbuf;
	size_t sz;
	u_int i, n;
	int eid, ret, touched;

	db_rep = env->rep_handle;
	infop = env->reginfo;
	renv = infop->primary;
	rep = rep_;
	ret = 0;
	touched = FALSE;

	MUTEX_LOCK(env, renv->mtx_regenv);

	for (i = start; i < limit; i++) {
		if (rep->site_cnt >= rep->site_max) {
			/* Table is full, we need more space. */
			if (rep->siteinfo_off == INVALID_ROFF) {
				/* First allocation of the shared array. */
				n = INITIAL_SITES_ALLOCATION;
				sz = n * sizeof(SITEINFO);
				if ((ret = __env_alloc(infop,
				    sz, &shared_array)) != 0)
					goto out;
			} else {
				/*
				 * Double the capacity: allocate the larger
				 * array, copy the existing entries over, and
				 * release the old array.
				 */
				n = 2 * rep->site_max;
				sz = n * sizeof(SITEINFO);
				if ((ret = __env_alloc(infop,
				    sz, &shared_array)) != 0)
					goto out;
				orig = R_ADDR(infop, rep->siteinfo_off);
				memcpy(shared_array, orig,
				    sizeof(SITEINFO) * rep->site_cnt);
				__env_alloc_free(infop, orig);
			}
			/* The region records the array as an offset. */
			rep->siteinfo_off = R_OFFSET(infop, shared_array);
			rep->site_max = n;
		} else
			shared_array = R_ADDR(infop, rep->siteinfo_off);

		DB_ASSERT(env, rep->site_cnt < rep->site_max &&
		    rep->siteinfo_off != INVALID_ROFF);

		/* Copy the host name into region memory. */
		host = db_rep->sites[i].net_addr.host;
		sz = strlen(host) + 1;
		if ((ret = __env_alloc(infop, sz, &hostbuf)) != 0)
			goto out;
		/*
		 * The slot is claimed only once the host buffer has been
		 * allocated; its index in the shared array is the site's EID.
		 */
		eid = (int)rep->site_cnt++;
		(void)strcpy(hostbuf, host);
		shared_array[eid].addr.host = R_OFFSET(infop, hostbuf);
		shared_array[eid].addr.port = db_rep->sites[i].net_addr.port;
		shared_array[eid].config = db_rep->sites[i].config;
		shared_array[eid].status = db_rep->sites[i].membership;
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "EID %d is assigned for site %s:%lu",
			eid, host, (u_long)shared_array[eid].addr.port));
		touched = TRUE;
	}

out:
	/* Bump the sequence number if the shared list actually changed. */
	if (touched)
		db_rep->siteinfo_seq = ++rep->siteinfo_seq;
	MUTEX_UNLOCK(env, renv->mtx_regenv);
	return (ret);
}
1095
1096 /*
1097 * Copy into our local list any newly added/changed remote site
1098 * configuration information.
1099 *
1100 * !!! Caller must hold db_rep->mutex and mtx_repmgr locks.
1101 *
1102 * PUBLIC: int __repmgr_copy_in_added_sites __P((ENV *));
1103 */
1104 int
__repmgr_copy_in_added_sites(env)1105 __repmgr_copy_in_added_sites(env)
1106 ENV *env;
1107 {
1108 DB_REP *db_rep;
1109 REP *rep;
1110 REGINFO *infop;
1111 SITEINFO *base, *p;
1112 REPMGR_SITE *site;
1113 char *host;
1114 int ret;
1115 u_int i;
1116
1117 db_rep = env->rep_handle;
1118 rep = db_rep->region;
1119
1120 if (rep->siteinfo_off == INVALID_ROFF)
1121 goto out;
1122
1123 infop = env->reginfo;
1124 base = R_ADDR(infop, rep->siteinfo_off);
1125
1126 /* Create private array slots for new sites. */
1127 for (i = db_rep->site_cnt; i < rep->site_cnt; i++) {
1128 p = &base[i];
1129 host = R_ADDR(infop, p->addr.host);
1130 if ((ret = __repmgr_new_site(env,
1131 &site, host, p->addr.port)) != 0)
1132 return (ret);
1133 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1134 "Site %s:%lu found at EID %u",
1135 host, (u_long)p->addr.port, i));
1136 }
1137
1138 /* Make sure info is up to date for all sites, old and new. */
1139 for (i = 0; i < db_rep->site_cnt; i++) {
1140 p = &base[i];
1141 site = SITE_FROM_EID(i);
1142 site->config = p->config;
1143 site->membership = p->status;
1144 }
1145
1146 out:
1147 /*
1148 * We always make sure our local list has been brought up to date with
1149 * the shared list before adding to the local list (except before env
1150 * open of course). So here there should be nothing on our local list
1151 * not yet in shared memory.
1152 */
1153 DB_ASSERT(env, db_rep->site_cnt == rep->site_cnt);
1154 db_rep->siteinfo_seq = rep->siteinfo_seq;
1155 return (0);
1156 }
1157
1158 /*
1159 * Initialize a range of sites newly added to our site list array. Process each
1160 * array entry in the range from <= x < limit. Passing from >= limit is
1161 * allowed, and is effectively a no-op.
1162 *
1163 * PUBLIC: int __repmgr_init_new_sites __P((ENV *, int, int));
1164 *
1165 * !!! Assumes caller holds db_rep->mutex.
1166 */
1167 int
__repmgr_init_new_sites(env,from,limit)1168 __repmgr_init_new_sites(env, from, limit)
1169 ENV *env;
1170 int from, limit;
1171 {
1172 DB_REP *db_rep;
1173 REPMGR_SITE *site;
1174 int i, ret;
1175
1176 db_rep = env->rep_handle;
1177
1178 if (db_rep->selector == NULL)
1179 return (0);
1180
1181 DB_ASSERT(env, IS_VALID_EID(from) && IS_VALID_EID(limit) &&
1182 from <= limit);
1183 for (i = from; i < limit; i++) {
1184 site = SITE_FROM_EID(i);
1185 if (site->membership == SITE_PRESENT &&
1186 (ret = __repmgr_schedule_connection_attempt(env,
1187 i, TRUE)) != 0)
1188 return (ret);
1189 }
1190
1191 return (0);
1192 }
1193
1194 /*
1195 * PUBLIC: int __repmgr_failchk __P((ENV *));
1196 */
1197 int
__repmgr_failchk(env)1198 __repmgr_failchk(env)
1199 ENV *env;
1200 {
1201 DB_ENV *dbenv;
1202 DB_REP *db_rep;
1203 REP *rep;
1204 db_threadid_t unused;
1205
1206 dbenv = env->dbenv;
1207 db_rep = env->rep_handle;
1208 rep = db_rep->region;
1209
1210 DB_THREADID_INIT(unused);
1211 MUTEX_LOCK(env, rep->mtx_repmgr);
1212
1213 /*
1214 * Check to see if the main (listener) replication process may have died
1215 * without cleaning up the flag. If so, we only have to clear it, and
1216 * another process should then be able to come along and become the
1217 * listener. So in either case we can return success.
1218 */
1219 if (rep->listener != 0 && !dbenv->is_alive(dbenv,
1220 rep->listener, unused, DB_MUTEX_PROCESS_ONLY))
1221 rep->listener = 0;
1222 MUTEX_UNLOCK(env, rep->mtx_repmgr);
1223
1224 return (0);
1225 }
1226
1227 /*
1228 * PUBLIC: int __repmgr_master_is_known __P((ENV *));
1229 */
1230 int
__repmgr_master_is_known(env)1231 __repmgr_master_is_known(env)
1232 ENV *env;
1233 {
1234 DB_REP *db_rep;
1235 REPMGR_CONNECTION *conn;
1236 REPMGR_SITE *master;
1237
1238 db_rep = env->rep_handle;
1239
1240 /*
1241 * We are the master, or we know of a master and have a healthy
1242 * connection to it.
1243 */
1244 if (db_rep->region->master_id == db_rep->self_eid)
1245 return (TRUE);
1246 if ((master = __repmgr_connected_master(env)) == NULL)
1247 return (FALSE);
1248 if ((conn = master->ref.conn.in) != NULL &&
1249 IS_READY_STATE(conn->state))
1250 return (TRUE);
1251 if ((conn = master->ref.conn.out) != NULL &&
1252 IS_READY_STATE(conn->state))
1253 return (TRUE);
1254 return (FALSE);
1255 }
1256
1257 /*
1258 * PUBLIC: int __repmgr_stable_lsn __P((ENV *, DB_LSN *));
1259 *
1260 * This function may be called before any of repmgr's threads have
1261 * been started. This code must not be called before env open.
1262 * Currently that is impossible since its only caller is log_archive
1263 * which itself cannot be called before env_open.
1264 */
1265 int
__repmgr_stable_lsn(env,stable_lsn)1266 __repmgr_stable_lsn(env, stable_lsn)
1267 ENV *env;
1268 DB_LSN *stable_lsn;
1269 {
1270 DB_REP *db_rep;
1271 REP *rep;
1272
1273 db_rep = env->rep_handle;
1274 rep = db_rep->region;
1275
1276 if (rep->min_log_file != 0 && rep->min_log_file < stable_lsn->file) {
1277 /*
1278 * Returning an LSN to be consistent with the rest of the
1279 * log archiving processing. Construct LSN of format
1280 * [filenum][0].
1281 */
1282 stable_lsn->file = rep->min_log_file;
1283 stable_lsn->offset = 0;
1284 }
1285 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1286 "Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu]",
1287 (u_long)stable_lsn->file, (u_long)stable_lsn->offset));
1288 return (0);
1289 }
1290
1291 /*
1292 * PUBLIC: int __repmgr_send_sync_msg __P((ENV *, REPMGR_CONNECTION *,
1293 * PUBLIC: u_int32_t, u_int8_t *, u_int32_t));
1294 */
1295 int
__repmgr_send_sync_msg(env,conn,type,buf,len)1296 __repmgr_send_sync_msg(env, conn, type, buf, len)
1297 ENV *env;
1298 REPMGR_CONNECTION *conn;
1299 u_int8_t *buf;
1300 u_int32_t len, type;
1301 {
1302 REPMGR_IOVECS iovecs;
1303 __repmgr_msg_hdr_args msg_hdr;
1304 u_int8_t hdr_buf[__REPMGR_MSG_HDR_SIZE];
1305 size_t unused;
1306
1307 msg_hdr.type = REPMGR_OWN_MSG;
1308 REPMGR_OWN_BUF_SIZE(msg_hdr) = len;
1309 REPMGR_OWN_MSG_TYPE(msg_hdr) = type;
1310 __repmgr_msg_hdr_marshal(env, &msg_hdr, hdr_buf);
1311
1312 __repmgr_iovec_init(&iovecs);
1313 __repmgr_add_buffer(&iovecs, hdr_buf, __REPMGR_MSG_HDR_SIZE);
1314 if (len > 0)
1315 __repmgr_add_buffer(&iovecs, buf, len);
1316
1317 return (__repmgr_write_iovecs(env, conn, &iovecs, &unused));
1318 }
1319
1320 /*
1321 * Produce a membership list from the known info currently in memory.
1322 *
1323 * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int8_t **, size_t *));
1324 *
1325 * Caller must hold mutex.
1326 */
1327 int
__repmgr_marshal_member_list(env,bufp,lenp)1328 __repmgr_marshal_member_list(env, bufp, lenp)
1329 ENV *env;
1330 u_int8_t **bufp;
1331 size_t *lenp;
1332 {
1333 DB_REP *db_rep;
1334 REP *rep;
1335 REPMGR_SITE *site;
1336 __repmgr_membr_vers_args membr_vers;
1337 __repmgr_site_info_args site_info;
1338 u_int8_t *buf, *p;
1339 size_t bufsize, len;
1340 u_int i;
1341 int ret;
1342
1343 db_rep = env->rep_handle;
1344 rep = db_rep->region;
1345
1346 /* Compute a (generous) upper bound on needed buffer size. */
1347 bufsize = __REPMGR_MEMBR_VERS_SIZE +
1348 db_rep->site_cnt * (__REPMGR_SITE_INFO_SIZE + MAXHOSTNAMELEN + 1);
1349 if ((ret = __os_malloc(env, bufsize, &buf)) != 0)
1350 return (ret);
1351 p = buf;
1352
1353 membr_vers.version = db_rep->membership_version;
1354 membr_vers.gen = rep->gen;
1355 __repmgr_membr_vers_marshal(env, &membr_vers, p);
1356 p += __REPMGR_MEMBR_VERS_SIZE;
1357
1358 for (i = 0; i < db_rep->site_cnt; i++) {
1359 site = SITE_FROM_EID(i);
1360 if (site->membership == 0)
1361 continue;
1362
1363 site_info.host.data = site->net_addr.host;
1364 site_info.host.size =
1365 (u_int32_t)strlen(site->net_addr.host) + 1;
1366 site_info.port = site->net_addr.port;
1367 site_info.flags = site->membership;
1368
1369 ret = __repmgr_site_info_marshal(env,
1370 &site_info, p, (size_t)(&buf[bufsize]-p), &len);
1371 DB_ASSERT(env, ret == 0);
1372 p += len;
1373 }
1374 len = (size_t)(p - buf);
1375
1376 *bufp = buf;
1377 *lenp = len;
1378 DB_ASSERT(env, ret == 0);
1379 return (0);
1380 }
1381
1382 /*
1383 * Produce a membership list by reading the database.
1384 */
1385 static int
read_gmdb(env,ip,bufp,lenp)1386 read_gmdb(env, ip, bufp, lenp)
1387 ENV *env;
1388 DB_THREAD_INFO *ip;
1389 u_int8_t **bufp;
1390 size_t *lenp;
1391 {
1392 DB_TXN *txn;
1393 DB *dbp;
1394 DBC *dbc;
1395 DBT key_dbt, data_dbt;
1396 __repmgr_membership_key_args key;
1397 __repmgr_membership_data_args member_status;
1398 __repmgr_member_metadata_args metadata;
1399 __repmgr_membr_vers_args membr_vers;
1400 __repmgr_site_info_args site_info;
1401 u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
1402 u_int8_t key_buf[MAX_MSG_BUF];
1403 u_int8_t metadata_buf[__REPMGR_MEMBER_METADATA_SIZE];
1404 char *host;
1405 size_t bufsize, len;
1406 u_int8_t *buf, *p;
1407 u_int32_t gen;
1408 int ret, t_ret;
1409
1410 txn = NULL;
1411 dbp = NULL;
1412 dbc = NULL;
1413 buf = NULL;
1414 COMPQUIET(len, 0);
1415
1416 if ((ret = __rep_get_datagen(env, &gen)) != 0)
1417 return (ret);
1418 if ((ret = __txn_begin(env, ip, NULL, &txn, DB_IGNORE_LEASE)) != 0)
1419 goto err;
1420 if ((ret = __rep_open_sysdb(env, ip, txn, REPMEMBERSHIP, 0, &dbp)) != 0)
1421 goto err;
1422 if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
1423 goto err;
1424
1425 memset(&key_dbt, 0, sizeof(key_dbt));
1426 key_dbt.data = key_buf;
1427 key_dbt.ulen = sizeof(key_buf);
1428 F_SET(&key_dbt, DB_DBT_USERMEM);
1429 memset(&data_dbt, 0, sizeof(data_dbt));
1430 data_dbt.data = metadata_buf;
1431 data_dbt.ulen = sizeof(metadata_buf);
1432 F_SET(&data_dbt, DB_DBT_USERMEM);
1433
1434 /* Get metadata record, make sure key looks right. */
1435 if ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) != 0)
1436 goto err;
1437 ret = __repmgr_membership_key_unmarshal(env,
1438 &key, key_buf, key_dbt.size, NULL);
1439 DB_ASSERT(env, ret == 0);
1440 DB_ASSERT(env, key.host.size == 0);
1441 DB_ASSERT(env, key.port == 0);
1442 ret = __repmgr_member_metadata_unmarshal(env,
1443 &metadata, metadata_buf, data_dbt.size, NULL);
1444 DB_ASSERT(env, ret == 0);
1445 DB_ASSERT(env, metadata.format == REPMGR_GMDB_FMT_VERSION);
1446 DB_ASSERT(env, metadata.version > 0);
1447
1448 bufsize = 1000; /* Initial guess. */
1449 if ((ret = __os_malloc(env, bufsize, &buf)) != 0)
1450 goto err;
1451 membr_vers.version = metadata.version;
1452 membr_vers.gen = gen;
1453 __repmgr_membr_vers_marshal(env, &membr_vers, buf);
1454 p = &buf[__REPMGR_MEMBR_VERS_SIZE];
1455
1456 data_dbt.data = data_buf;
1457 data_dbt.ulen = sizeof(data_buf);
1458 while ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) == 0) {
1459 ret = __repmgr_membership_key_unmarshal(env,
1460 &key, key_buf, key_dbt.size, NULL);
1461 DB_ASSERT(env, ret == 0);
1462 DB_ASSERT(env, key.host.size <= MAXHOSTNAMELEN + 1 &&
1463 key.host.size > 1);
1464 host = (char*)key.host.data;
1465 DB_ASSERT(env, host[key.host.size-1] == '\0');
1466 DB_ASSERT(env, key.port > 0);
1467
1468 ret = __repmgr_membership_data_unmarshal(env,
1469 &member_status, data_buf, data_dbt.size, NULL);
1470 DB_ASSERT(env, ret == 0);
1471 DB_ASSERT(env, member_status.flags != 0);
1472
1473 site_info.host = key.host;
1474 site_info.port = key.port;
1475 site_info.flags = member_status.flags;
1476 if ((ret = __repmgr_site_info_marshal(env, &site_info,
1477 p, (size_t)(&buf[bufsize]-p), &len)) == ENOMEM) {
1478 bufsize *= 2;
1479 len = (size_t)(p - buf);
1480 if ((ret = __os_realloc(env, bufsize, &buf)) != 0)
1481 goto err;
1482 p = &buf[len];
1483 ret = __repmgr_site_info_marshal(env,
1484 &site_info, p, (size_t)(&buf[bufsize]-p), &len);
1485 DB_ASSERT(env, ret == 0);
1486 }
1487 p += len;
1488 }
1489 len = (size_t)(p - buf);
1490 if (ret == DB_NOTFOUND)
1491 ret = 0;
1492
1493 err:
1494 if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
1495 ret = t_ret;
1496 if (dbp != NULL &&
1497 (t_ret = __db_close(dbp, txn, DB_NOSYNC)) != 0 && ret == 0)
1498 ret = t_ret;
1499 if (txn != NULL &&
1500 (t_ret = __db_txn_auto_resolve(env, txn, 0, ret)) != 0 && ret == 0)
1501 ret = t_ret;
1502 if (ret == 0) {
1503 *bufp = buf;
1504 *lenp = len;
1505 } else if (buf != NULL)
1506 __os_free(env, buf);
1507 return (ret);
1508 }
1509
1510 /*
1511 * Refresh our sites array from the given membership list.
1512 *
1513 * PUBLIC: int __repmgr_refresh_membership __P((ENV *,
1514 * PUBLIC: u_int8_t *, size_t));
1515 */
1516 int
__repmgr_refresh_membership(env,buf,len)1517 __repmgr_refresh_membership(env, buf, len)
1518 ENV *env;
1519 u_int8_t *buf;
1520 size_t len;
1521 {
1522 DB_REP *db_rep;
1523 REPMGR_SITE *site;
1524 __repmgr_membr_vers_args membr_vers;
1525 __repmgr_site_info_args site_info;
1526 char *host;
1527 u_int8_t *p;
1528 u_int16_t port;
1529 u_int32_t i, n;
1530 int eid, ret;
1531
1532 db_rep = env->rep_handle;
1533
1534 /*
1535 * Membership list consists of membr_vers followed by a number of
1536 * site_info structs.
1537 */
1538 ret = __repmgr_membr_vers_unmarshal(env, &membr_vers, buf, len, &p);
1539 DB_ASSERT(env, ret == 0);
1540
1541 if (db_rep->repmgr_status == stopped)
1542 return (0);
1543 /* Ignore obsolete versions. */
1544 if (__repmgr_gmdb_version_cmp(env,
1545 membr_vers.gen, membr_vers.version) <= 0)
1546 return (0);
1547
1548 LOCK_MUTEX(db_rep->mutex);
1549
1550 db_rep->membership_version = membr_vers.version;
1551 db_rep->member_version_gen = membr_vers.gen;
1552
1553 for (i = 0; i < db_rep->site_cnt; i++)
1554 F_CLR(SITE_FROM_EID(i), SITE_TOUCHED);
1555
1556 for (n = 0; p < &buf[len]; ++n) {
1557 ret = __repmgr_site_info_unmarshal(env,
1558 &site_info, p, (size_t)(&buf[len] - p), &p);
1559 DB_ASSERT(env, ret == 0);
1560
1561 host = site_info.host.data;
1562 DB_ASSERT(env,
1563 (u_int8_t*)site_info.host.data + site_info.host.size <= p);
1564 host[site_info.host.size-1] = '\0';
1565 port = site_info.port;
1566
1567 if ((ret = __repmgr_set_membership(env,
1568 host, port, site_info.flags)) != 0)
1569 goto err;
1570
1571 if ((ret = __repmgr_find_site(env, host, port, &eid)) != 0)
1572 goto err;
1573 DB_ASSERT(env, IS_VALID_EID(eid));
1574 F_SET(SITE_FROM_EID(eid), SITE_TOUCHED);
1575 }
1576 ret = __rep_set_nsites_int(env, n);
1577 DB_ASSERT(env, ret == 0);
1578
1579 /* Scan "touched" flags so as to notice sites that have been removed. */
1580 for (i = 0; i < db_rep->site_cnt; i++) {
1581 site = SITE_FROM_EID(i);
1582 if (F_ISSET(site, SITE_TOUCHED))
1583 continue;
1584 host = site->net_addr.host;
1585 port = site->net_addr.port;
1586 if ((ret = __repmgr_set_membership(env, host, port, 0)) != 0)
1587 goto err;
1588 }
1589
1590 err:
1591 UNLOCK_MUTEX(db_rep->mutex);
1592 return (ret);
1593 }
1594
1595 /*
1596 * PUBLIC: int __repmgr_reload_gmdb __P((ENV *));
1597 */
1598 int
__repmgr_reload_gmdb(env)1599 __repmgr_reload_gmdb(env)
1600 ENV *env;
1601 {
1602 DB_THREAD_INFO *ip;
1603 u_int8_t *buf;
1604 size_t len;
1605 int ret;
1606
1607 ENV_ENTER(env, ip);
1608 if ((ret = read_gmdb(env, ip, &buf, &len)) == 0) {
1609 env->rep_handle->have_gmdb = TRUE;
1610 ret = __repmgr_refresh_membership(env, buf, len);
1611 __os_free(env, buf);
1612 }
1613 ENV_LEAVE(env, ip);
1614 return (ret);
1615 }
1616
1617 /*
1618 * Return 1, 0, or -1, as the given gen/version combination is >, =, or < our
1619 * currently known version.
1620 *
1621 * PUBLIC: int __repmgr_gmdb_version_cmp __P((ENV *, u_int32_t, u_int32_t));
1622 */
1623 int
__repmgr_gmdb_version_cmp(env,gen,version)1624 __repmgr_gmdb_version_cmp(env, gen, version)
1625 ENV *env;
1626 u_int32_t gen, version;
1627 {
1628 DB_REP *db_rep;
1629 u_int32_t g, v;
1630
1631 db_rep = env->rep_handle;
1632 g = db_rep->member_version_gen;
1633 v = db_rep->membership_version;
1634
1635 if (gen == g)
1636 return (version == v ? 0 :
1637 (version < v ? -1 : 1));
1638 return (gen < g ? -1 : 1);
1639 }
1640
1641 /*
1642 * PUBLIC: int __repmgr_init_save __P((ENV *, DBT *));
1643 */
1644 int
__repmgr_init_save(env,dbt)1645 __repmgr_init_save(env, dbt)
1646 ENV *env;
1647 DBT *dbt;
1648 {
1649 DB_REP *db_rep;
1650 u_int8_t *buf;
1651 size_t len;
1652 int ret;
1653
1654 db_rep = env->rep_handle;
1655 LOCK_MUTEX(db_rep->mutex);
1656 if (db_rep->site_cnt == 0) {
1657 dbt->data = NULL;
1658 dbt->size = 0;
1659 ret = 0;
1660 } else if ((ret = __repmgr_marshal_member_list(env, &buf, &len)) == 0) {
1661 dbt->data = buf;
1662 dbt->size = (u_int32_t)len;
1663 }
1664 UNLOCK_MUTEX(db_rep->mutex);
1665
1666 return (ret);
1667 }
1668
1669 /*
1670 * PUBLIC: int __repmgr_init_restore __P((ENV *, DBT *));
1671 */
1672 int
__repmgr_init_restore(env,dbt)1673 __repmgr_init_restore(env, dbt)
1674 ENV *env;
1675 DBT *dbt;
1676 {
1677 DB_REP *db_rep;
1678
1679 db_rep = env->rep_handle;
1680 db_rep->restored_list = dbt->data;
1681 db_rep->restored_list_length = dbt->size;
1682 return (0);
1683 }
1684
1685 /*
1686 * Generates an internal request for a deferred operation, to be performed on a
1687 * separate thread (conveniently, a message-processing thread).
1688 *
1689 * PUBLIC: int __repmgr_defer_op __P((ENV *, u_int32_t));
1690 *
1691 * Caller should hold mutex.
1692 */
1693 int
__repmgr_defer_op(env,op)1694 __repmgr_defer_op(env, op)
1695 ENV *env;
1696 u_int32_t op;
1697 {
1698 REPMGR_MESSAGE *msg;
1699 int ret;
1700
1701 /*
1702 * Overload REPMGR_MESSAGE to convey the type of operation being
1703 * requested. For now "op" is all we need; plenty of room for expansion
1704 * if needed in the future.
1705 *
1706 * Leave msg->v.gmdb_msg.conn NULL to show no conn to be cleaned up.
1707 */
1708 if ((ret = __os_calloc(env, 1, sizeof(*msg), &msg)) != 0)
1709 return (ret);
1710 msg->msg_hdr.type = REPMGR_OWN_MSG;
1711 REPMGR_OWN_MSG_TYPE(msg->msg_hdr) = op;
1712 ret = __repmgr_queue_put(env, msg);
1713 return (ret);
1714 }
1715
1716 /*
1717 * PUBLIC: void __repmgr_fire_conn_err_event __P((ENV *,
1718 * PUBLIC: REPMGR_CONNECTION *, int));
1719 */
1720 void
__repmgr_fire_conn_err_event(env,conn,err)1721 __repmgr_fire_conn_err_event(env, conn, err)
1722 ENV *env;
1723 REPMGR_CONNECTION *conn;
1724 int err;
1725 {
1726 DB_REP *db_rep;
1727 DB_REPMGR_CONN_ERR info;
1728
1729 db_rep = env->rep_handle;
1730 if (conn->type == REP_CONNECTION && IS_VALID_EID(conn->eid)) {
1731 __repmgr_print_conn_err(env,
1732 &SITE_FROM_EID(conn->eid)->net_addr, err);
1733 info.eid = conn->eid;
1734 info.error = err;
1735 DB_EVENT(env, DB_EVENT_REP_CONNECT_BROKEN, &info);
1736 }
1737 }
1738
1739 /*
1740 * PUBLIC: void __repmgr_print_conn_err __P((ENV *, repmgr_netaddr_t *, int));
1741 */
1742 void
__repmgr_print_conn_err(env,netaddr,err)1743 __repmgr_print_conn_err(env, netaddr, err)
1744 ENV *env;
1745 repmgr_netaddr_t *netaddr;
1746 int err;
1747 {
1748 SITE_STRING_BUFFER site_loc_buf;
1749 char msgbuf[200]; /* Arbitrary size. */
1750
1751 (void)__repmgr_format_addr_loc(netaddr, site_loc_buf);
1752 /* TCP/IP sockets API convention: 0 indicates "end-of-file". */
1753 if (err == 0)
1754 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1755 "EOF on connection to %s", site_loc_buf));
1756 else
1757 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1758 "`%s' (%d) on connection to %s",
1759 __os_strerror(err, msgbuf, sizeof(msgbuf)),
1760 err, site_loc_buf));
1761 }
1762
1763 /*
1764 * Change role from master to client, but if a GMDB operation is in progress,
1765 * wait for it to finish first.
1766 *
1767 * PUBLIC: int __repmgr_become_client __P((ENV *));
1768 */
1769 int
__repmgr_become_client(env)1770 __repmgr_become_client(env)
1771 ENV *env;
1772 {
1773 DB_REP *db_rep;
1774 int ret;
1775
1776 db_rep = env->rep_handle;
1777 LOCK_MUTEX(db_rep->mutex);
1778 if ((ret = __repmgr_await_gmdbop(env)) == 0)
1779 db_rep->client_intent = TRUE;
1780 UNLOCK_MUTEX(db_rep->mutex);
1781 return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT) : ret);
1782 }
1783
1784 /*
1785 * Looks up a site from our local (in-process) list, or returns NULL if not
1786 * found.
1787 *
1788 * PUBLIC: REPMGR_SITE *__repmgr_lookup_site __P((ENV *, const char *, u_int));
1789 */
1790 REPMGR_SITE *
__repmgr_lookup_site(env,host,port)1791 __repmgr_lookup_site(env, host, port)
1792 ENV *env;
1793 const char *host;
1794 u_int port;
1795 {
1796 DB_REP *db_rep;
1797 REPMGR_SITE *site;
1798 u_int i;
1799
1800 db_rep = env->rep_handle;
1801 for (i = 0; i < db_rep->site_cnt; i++) {
1802 site = &db_rep->sites[i];
1803
1804 if (strcmp(site->net_addr.host, host) == 0 &&
1805 site->net_addr.port == port)
1806 return (site);
1807 }
1808
1809 return (NULL);
1810 }
1811
1812 /*
1813 * Look up a site, or add it if it doesn't already exist.
1814 *
1815 * Caller must hold db_rep mutex and be within ENV_ENTER context, unless this is
1816 * a pre-open call.
1817 *
1818 * PUBLIC: int __repmgr_find_site __P((ENV *, const char *, u_int, int *));
1819 */
1820 int
__repmgr_find_site(env,host,port,eidp)1821 __repmgr_find_site(env, host, port, eidp)
1822 ENV *env;
1823 const char *host;
1824 u_int port;
1825 int *eidp;
1826 {
1827 DB_REP *db_rep;
1828 REP *rep;
1829 REPMGR_SITE *site;
1830 int eid, ret;
1831
1832 db_rep = env->rep_handle;
1833 ret = 0;
1834 if (REP_ON(env)) {
1835 rep = db_rep->region;
1836 MUTEX_LOCK(env, rep->mtx_repmgr);
1837 ret = get_eid(env, host, port, &eid);
1838 MUTEX_UNLOCK(env, rep->mtx_repmgr);
1839 } else {
1840 if ((site = __repmgr_lookup_site(env, host, port)) == NULL &&
1841 (ret = __repmgr_new_site(env, &site, host, port)) != 0)
1842 return (ret);
1843 eid = EID_FROM_SITE(site);
1844 }
1845 if (ret == 0)
1846 *eidp = eid;
1847 return (ret);
1848 }
1849
1850 /*
1851 * Get the EID of the named remote site, even if it means creating a new entry
1852 * in our table if it doesn't already exist.
1853 *
1854 * Caller must hold both db_rep mutex and mtx_repmgr.
1855 */
1856 static int
get_eid(env,host,port,eidp)1857 get_eid(env, host, port, eidp)
1858 ENV *env;
1859 const char *host;
1860 u_int port;
1861 int *eidp;
1862 {
1863 DB_REP *db_rep;
1864 REP *rep;
1865 REPMGR_SITE *site;
1866 int eid, ret;
1867
1868 db_rep = env->rep_handle;
1869 rep = db_rep->region;
1870
1871 if ((ret = __repmgr_copy_in_added_sites(env)) != 0)
1872 return (ret);
1873 if ((site = __repmgr_lookup_site(env, host, port)) == NULL) {
1874 /*
1875 * Store both locally and in shared region.
1876 */
1877 if ((ret = __repmgr_new_site(env, &site, host, port)) != 0)
1878 return (ret);
1879
1880 eid = EID_FROM_SITE(site);
1881 DB_ASSERT(env, (u_int)eid == db_rep->site_cnt - 1);
1882 if ((ret = __repmgr_share_netaddrs(env,
1883 rep, (u_int)eid, db_rep->site_cnt)) == 0) {
1884 /* Show that a change was made. */
1885 db_rep->siteinfo_seq = ++rep->siteinfo_seq;
1886 } else {
1887 /*
1888 * Rescind the local slot we just added, so that we at
1889 * least keep the two lists in sync.
1890 */
1891 db_rep->site_cnt--;
1892 __repmgr_cleanup_netaddr(env, &site->net_addr);
1893 }
1894 } else
1895 eid = EID_FROM_SITE(site);
1896 if (ret == 0)
1897 *eidp = eid;
1898 return (ret);
1899 }
1900
1901 /*
1902 * Sets the named remote site's group membership status to the given value,
1903 * creating it first if it doesn't already exist. Adjusts connections
1904 * accordingly.
1905 *
1906 * PUBLIC: int __repmgr_set_membership __P((ENV *,
1907 * PUBLIC: const char *, u_int, u_int32_t));
1908 *
1909 * Caller must host db_rep mutex, and be in ENV_ENTER context.
1910 */
1911 int
__repmgr_set_membership(env,host,port,status)1912 __repmgr_set_membership(env, host, port, status)
1913 ENV *env;
1914 const char *host;
1915 u_int port;
1916 u_int32_t status;
1917 {
1918 DB_REP *db_rep;
1919 REP *rep;
1920 REGINFO *infop;
1921 REPMGR_SITE *site;
1922 SITEINFO *sites;
1923 u_int32_t orig;
1924 int eid, ret;
1925
1926 db_rep = env->rep_handle;
1927 rep = db_rep->region;
1928 infop = env->reginfo;
1929
1930 COMPQUIET(orig, 0);
1931 COMPQUIET(site, NULL);
1932 DB_ASSERT(env, REP_ON(env));
1933
1934 MUTEX_LOCK(env, rep->mtx_repmgr);
1935 if ((ret = get_eid(env, host, port, &eid)) == 0) {
1936 DB_ASSERT(env, IS_VALID_EID(eid));
1937 site = SITE_FROM_EID(eid);
1938 orig = site->membership;
1939 sites = R_ADDR(infop, rep->siteinfo_off);
1940
1941 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1942 "set membership for %s:%lu %lu (was %lu)",
1943 host, (u_long)port, (u_long)status, (u_long)orig));
1944 if (status != sites[eid].status) {
1945 /*
1946 * Show that a change is occurring.
1947 *
1948 * The call to get_eid() might have also bumped the
1949 * sequence number, and since this is all happening
1950 * within a single critical section it would be possible
1951 * to avoid "wasting" a sequence number. But it's
1952 * hardly worth the trouble and mental complexity: the
1953 * sequence number counts changes that occur within an
1954 * env region lifetime, so there should be plenty.
1955 * We'll run out of membership DB version numbers long
1956 * before this becomes a problem.
1957 */
1958 db_rep->siteinfo_seq = ++rep->siteinfo_seq;
1959 }
1960
1961 /* Set both private and shared copies of the info. */
1962 site->membership = status;
1963 sites[eid].status = status;
1964 }
1965 MUTEX_UNLOCK(env, rep->mtx_repmgr);
1966
1967 /*
1968 * If our notion of the site's membership changed, we may need to create
1969 * or kill a connection.
1970 */
1971 if (ret == 0 && db_rep->repmgr_status == running &&
1972 SELECTOR_RUNNING(db_rep)) {
1973
1974 if (eid == db_rep->self_eid && status != SITE_PRESENT)
1975 ret = DB_DELETED;
1976 else if (orig != SITE_PRESENT && status == SITE_PRESENT &&
1977 site->state == SITE_IDLE) {
1978 /*
1979 * Here we might have just joined a group, or we might
1980 * be an existing site and we've just learned of another
1981 * site joining the group. In the former case, we
1982 * certainly want to connect right away; in the later
1983 * case it might be better to wait, because the new site
1984 * probably isn't quite ready to accept our connection.
1985 * But deciding which case we're in here would be messy,
1986 * so for now we just keep it simple and always try
1987 * connecting immediately. The resulting connection
1988 * failure shouldn't hurt anything, because we'll just
1989 * naturally try again later.
1990 */
1991 ret = __repmgr_schedule_connection_attempt(env,
1992 eid, TRUE);
1993 if (eid != db_rep->self_eid)
1994 DB_EVENT(env, DB_EVENT_REP_SITE_ADDED, &eid);
1995 } else if (orig != 0 && status == 0)
1996 DB_EVENT(env, DB_EVENT_REP_SITE_REMOVED, &eid);
1997
1998 /*
1999 * Callers are responsible for adjusting nsites, even though in
2000 * a way it would make sense to do it here. It's awkward to do
2001 * it here at start-up/join time, when we load up starting from
2002 * an empty array. Then we would get rep_set_nsites()
2003 * repeatedly, and when leases were in use that would thrash the
2004 * lease table adjustment.
2005 */
2006 }
2007 return (ret);
2008 }
2009
2010 /*
2011 * PUBLIC: int __repmgr_bcast_parm_refresh __P((ENV *));
2012 */
2013 int
__repmgr_bcast_parm_refresh(env)2014 __repmgr_bcast_parm_refresh(env)
2015 ENV *env;
2016 {
2017 DB_REP *db_rep;
2018 REP *rep;
2019 __repmgr_parm_refresh_args parms;
2020 u_int8_t buf[__REPMGR_PARM_REFRESH_SIZE];
2021 int ret;
2022
2023 DB_ASSERT(env, REP_ON(env));
2024 db_rep = env->rep_handle;
2025 rep = db_rep->region;
2026 LOCK_MUTEX(db_rep->mutex);
2027 parms.ack_policy = (u_int32_t)rep->perm_policy;
2028 if (rep->priority == 0)
2029 parms.flags = 0;
2030 else
2031 parms.flags = SITE_ELECTABLE;
2032 __repmgr_parm_refresh_marshal(env, &parms, buf);
2033 ret = __repmgr_bcast_own_msg(env,
2034 REPMGR_PARM_REFRESH, buf, __REPMGR_PARM_REFRESH_SIZE);
2035 UNLOCK_MUTEX(db_rep->mutex);
2036 return (ret);
2037 }
2038
2039 /*
2040 * PUBLIC: int __repmgr_chg_prio __P((ENV *, u_int32_t, u_int32_t));
2041 */
2042 int
__repmgr_chg_prio(env,prev,cur)2043 __repmgr_chg_prio(env, prev, cur)
2044 ENV *env;
2045 u_int32_t prev, cur;
2046 {
2047 if ((prev == 0 && cur != 0) ||
2048 (prev != 0 && cur == 0))
2049 return (__repmgr_bcast_parm_refresh(env));
2050 return (0);
2051 }
2052
2053 /*
2054 * PUBLIC: int __repmgr_bcast_own_msg __P((ENV *,
2055 * PUBLIC: u_int32_t, u_int8_t *, size_t));
2056 *
2057 * Caller must hold mutex.
2058 */
2059 int
__repmgr_bcast_own_msg(env,type,buf,len)2060 __repmgr_bcast_own_msg(env, type, buf, len)
2061 ENV *env;
2062 u_int32_t type;
2063 u_int8_t *buf;
2064 size_t len;
2065 {
2066 DB_REP *db_rep;
2067 REPMGR_CONNECTION *conn;
2068 REPMGR_SITE *site;
2069 int ret;
2070 u_int i;
2071
2072 db_rep = env->rep_handle;
2073 if (!SELECTOR_RUNNING(db_rep))
2074 return (0);
2075 FOR_EACH_REMOTE_SITE_INDEX(i) {
2076 site = SITE_FROM_EID(i);
2077 if (site->state != SITE_CONNECTED)
2078 continue;
2079 if ((conn = site->ref.conn.in) != NULL &&
2080 conn->state == CONN_READY &&
2081 (ret = __repmgr_send_own_msg(env,
2082 conn, type, buf, (u_int32_t)len)) != 0 &&
2083 (ret = __repmgr_bust_connection(env, conn)) != 0)
2084 return (ret);
2085 if ((conn = site->ref.conn.out) != NULL &&
2086 conn->state == CONN_READY &&
2087 (ret = __repmgr_send_own_msg(env,
2088 conn, type, buf, (u_int32_t)len)) != 0 &&
2089 (ret = __repmgr_bust_connection(env, conn)) != 0)
2090 return (ret);
2091 }
2092 return (0);
2093 }
2094