1 /* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include <assert.h>
24 #include <stdlib.h>
25
26 #include "xcom/app_data.h"
27 #include "xcom/xcom_profile.h"
28 #ifndef XCOM_STANDALONE
29 #include "my_compiler.h"
30 #endif
31 #include "xcom/node_list.h"
32 #include "xcom/node_no.h"
33 #include "xcom/node_set.h"
34 #include "xcom/pax_msg.h"
35 #include "xcom/server_struct.h"
36 #include "xcom/simset.h"
37 #include "xcom/site_def.h"
38 #include "xcom/site_struct.h"
39 #include "xcom/sock_probe.h"
40 #include "xcom/synode_no.h"
41 #include "xcom/task.h"
42 #include "xcom/task_debug.h"
43 #include "xcom/x_platform.h"
44 #include "xcom/xcom_base.h"
45 #include "xcom/xcom_common.h"
46 #include "xcom/xcom_detector.h"
47 #include "xcom/xcom_interface.h"
48 #include "xcom/xcom_transport.h"
49 #include "xdr_gen/xcom_vp.h"
50
51 extern task_env *detector;
52 extern int xcom_shutdown;
53 extern linkage detector_wait;
/* Seconds of silence after which may_be_dead() reports a node as suspect. */
#define MAX_SILENT 4.0

/*
  DETECT(site, i): nonzero if node i is this node (get_nodeno(site)), or if
  site->detected[i] + DETECTOR_LIVE_TIMEOUT is later than task_now(), i.e.
  node i was detected less than DETECTOR_LIVE_TIMEOUT seconds ago.

  The whole expansion and the arguments are parenthesised so the macro is
  safe inside larger expressions such as !DETECT(site, i). Both arguments
  are evaluated more than once; do not pass expressions with side effects.
*/
#define DETECT(site, i)         \
  ((i) == get_nodeno(site) ||   \
   (site)->detected[i] + DETECTOR_LIVE_TIMEOUT > task_now())
59
60 /* static double detected[NSERVERS]; */
61
62 /* See if node has been suspiciously still for some time */
may_be_dead(detector_state const ds,node_no i,double seconds)63 int may_be_dead(detector_state const ds, node_no i, double seconds) {
64 /* IFDBG(D_DETECT, FN; NDBG(i,u); NDBG(ds[i] < seconds - 2.0, d)); */
65 return ds[i] < seconds - MAX_SILENT;
66 }
67
/* Set all NSERVERS entries of the detector state `ds` to 0.0. */
void init_detector(detector_state ds) {
  int slot = NSERVERS;
  while (slot > 0) {
    slot--;
    ds[slot] = 0.0;
  }
}
74
/*
  Record that node `node` of `site` has been heard from.

  For a valid node, DETECT(site, node) is evaluated first. Then
  server_detected() is called on the node's server (presumably refreshing
  its detection timestamp).

  The size assertion is only evaluated once `site` is known to be non-NULL.
  Previously it dereferenced `site` before the NULL check.

  @param site  site definition; may be NULL.
  @param node  index of the node that was heard from.
  @return DETECT(site, node) for a node within range; 1 when site is NULL
          or node is out of range.
*/
int note_detected(site_def const *site, node_no node) {
  int retval = 1;

  if (site) {
    /* site->servers's size is NSERVERS. */
    assert(site->nodes.node_list_len <= NSERVERS);
    if (node < site->nodes.node_list_len) {
      retval = DETECT(site, node);
      server_detected(site->servers[node]);
    }
  }
  return retval;
}
88
89 /**
90 * @brief States if a server is still physically connected to another server.
91 * This will test the connection state to that node.
92 *
93 * @param site site definition that contains the server collections.
94 * @param node node index that we want to test for connectivity
95 *
96 * @return 1 if the server is connected. 0 otherwise.
97 */
is_server_connected(struct site_def const * site,node_no node)98 int is_server_connected(struct site_def const *site, node_no node) {
99 int retval = 0;
100
101 if (site) {
102 if (get_nodeno(site) == node) { // Me to myself... i'm always connected
103 retval = 1;
104 } else if (node < site->nodes.node_list_len) {
105 retval = is_connected(&site->servers[node]->con);
106 }
107 }
108
109 return retval;
110 }
111
/*
  Clear the detection timestamp (set it to 0.0) of node `node`'s server in
  `site`.

  Does nothing when site is NULL or node is out of range. The size assertion
  now runs only after the NULL check; previously it dereferenced `site`
  unconditionally.
*/
static void reset_detected(site_def const *site, u_int node) {
  IFDBG(D_DETECT, FN; PTREXP(site); NDBG(node, d););
  if (site) {
    /* site->servers's size is NSERVERS. */
    assert(site->nodes.node_list_len <= NSERVERS);
    if (node < site->nodes.node_list_len) {
      site->servers[node]->detected = 0.0;
    }
  }
}
120
reset_disjunct_servers(struct site_def const * old_site,struct site_def const * new_site)121 void reset_disjunct_servers(struct site_def const *old_site,
122 struct site_def const *new_site) {
123 u_int node;
124
125 if (old_site && new_site) {
126 /* Reset nodes which are not in new site (removed) */
127 for (node = 0; node < old_site->nodes.node_list_len; node++) {
128 if (!node_exists(&old_site->nodes.node_list_val[node], &new_site->nodes))
129 reset_detected(old_site, node);
130 }
131 }
132 }
133
/*
  Debug helper: for every node of `site`, log the site's detection snapshot
  next to the server's own detection time.
*/
static void dbg_detected(site_def *site) {
  u_int node;

  if (!site) return;

  for (node = 0; node < site->nodes.node_list_len; node++) {
    IFDBG(D_DETECT, FN; NDBG(node, d); NDBG(site->detected[node], f);
          NDBG(site->servers[node]->detected, f));
  }
}
144
/*
  Refresh the per-site detection snapshot.

  Copies each server's `detected` value into site->detected[], sets
  site->detector_updated, and dumps the snapshot via dbg_detected() if any
  entry changed. Does nothing when site is NULL.
*/
void update_detected(site_def *site) {
  u_int node;
  bool_t changed = FALSE;

  if (site == NULL) return;

  for (node = 0; node < site->nodes.node_list_len; node++) {
    IFDBG(D_DETECT, FN; NDBG(node, d); NDBG(site->detected[node], f);
          NDBG(site->servers[node]->detected, f));
    changed = changed ||
              (site->detected[node] != site->servers[node]->detected);
    site->detected[node] = site->servers[node]->detected;
  }
  site->detector_updated = 1;

  if (changed) dbg_detected(site);
}
163
/*
  Decide whether this node sees a majority of the configuration as live.

  Refreshes site->detected[] via update_detected(). It then counts nodes
  that are either this node or were detected less than
  DETECTOR_LIVE_TIMEOUT seconds before task_now().

  @return nonzero if strictly more than half of the get_maxnodes(site)
          nodes are live, or the ARBITRATOR_HACK two-node case applies;
          0 otherwise, including for an empty configuration.
*/
int enough_live_nodes(site_def *site) {
  double const now = task_now();
  node_no const maxnodes = get_maxnodes(site);
  node_no const self = get_nodeno(site);
  node_no live = 0;
  node_no k;

  update_detected(site);

  if (maxnodes == 0) return 0;

  for (k = maxnodes; k-- > 0;) {
    if (k == self || now - site->detected[k] < DETECTOR_LIVE_TIMEOUT) live++;
  }

  /* maxnodes is non-zero from here on (early return above). */
#ifdef NODE_0_IS_ARBITRATOR
  return live > maxnodes / 2 ||
         (ARBITRATOR_HACK && (self == 0) && (2 == maxnodes));
#else
  return live > maxnodes / 2 || (ARBITRATOR_HACK && (2 == maxnodes));
#endif
}
190
191 static void send_my_view(site_def const *site);
192
update_global_count(site_def * site)193 static void update_global_count(site_def *site) {
194 u_int i;
195 u_int nodes = get_maxnodes(site);
196
197 site->global_node_count = 0;
198 for (i = 0; i < nodes && i < site->global_node_set.node_set_len; i++) {
199 if (site->global_node_set.node_set_val[i]) site->global_node_count++;
200 }
201 }
202
/*
  Compare the agreed global view with this node's detector.

  Recounts site->global_node_count from global_node_set. It sets *notify to
  1 if any entry of global_node_set differs from DETECT(site, i); *notify is
  never cleared here.
*/
static void check_global_node_set(site_def *site, int *notify) {
  u_int const nodes = get_maxnodes(site);
  u_int i = 0;

  site->global_node_count = 0;
  while (i < nodes && i < site->global_node_set.node_set_len) {
    int const detect = DETECT(site, i);
    IFDBG(
        D_DETECT, if (i == 0) {
          FN;
          NDBG(task_now(), f);
        });
    IFDBG(D_DETECT, FN; NDBG(i, d); NDBG(detect, d);
          NDBG(site->detected[i], f));
    if (site->global_node_set.node_set_val[i]) ++site->global_node_count;
    /* Global view disagrees with what this node currently detects. */
    if (site->global_node_set.node_set_val[i] != detect) *notify = 1;
    IFDBG(D_DETECT, FN; NDBG(i, u); NDBG(*notify, d));
    ++i;
  }
}
224
/*
  Bring the local (application-facing) view in line with the detector.

  For each node index i, sets local_node_set[i] to DETECT(site, i) and
  raises *notify (sets it to 1) whenever an entry changes. *notify is never
  cleared here.

  The loop is bounded by local_node_set's own length because that is the
  array being read and written. Bounding only by global_node_set's length
  could write past the end of local_node_set. The global bound is kept so
  behaviour is unchanged when both sets have the same length.
*/
static void check_local_node_set(site_def *site, int *notify) {
  u_int i;
  u_int nodes = get_maxnodes(site);

  for (i = 0; i < nodes && i < site->global_node_set.node_set_len &&
              i < site->local_node_set.node_set_len;
       i++) {
    int detect = DETECT(site, i);
    if (site->local_node_set.node_set_val[i] != detect) {
      site->local_node_set.node_set_val[i] = detect;
      *notify = 1;
    }
    IFDBG(D_DETECT, FN; NDBG(i, u); NDBG(*notify, d));
  }
}
238
leader(site_def const * s)239 static node_no leader(site_def const *s) {
240 if (s) {
241 node_no leader = 0;
242 for (leader = 0; leader < get_maxnodes(s); leader++) {
243 if (!may_be_dead(s->detected, leader, task_now()) &&
244 is_set(s->global_node_set, leader))
245 return leader;
246 }
247 }
248 return 0;
249 }
250
/* Nonzero iff `s` is non-NULL and leader(s) is this node (s->nodeno). */
int iamtheleader(site_def const *s) {
  return s ? leader(s) == s->nodeno : 0;
}
257
258 extern synode_no executed_msg;
259 extern synode_no max_synode;
260
261 static site_def *last_p_site = 0;
262 static site_def *last_x_site = 0;
263
invalidate_detector_sites(site_def * site)264 void invalidate_detector_sites(site_def *site) {
265 if (last_p_site == site) {
266 last_p_site = NULL;
267 }
268
269 if (last_x_site == site) {
270 last_x_site = NULL;
271 }
272 }
273
/*
  Detector task: notify others (and the application) about our current view.

  Runs until xcom_shutdown is set, waking on detector_wait or after at most
  1.0 seconds (TIMED_TASK_WAIT). On each pass, for the executor site
  (get_executor_site_rw()) when it exists and has a valid node number:
  - if the executor site changed since the previous pass, clears detection
    state of nodes absent from the new site (reset_disjunct_servers) and
    forces both notifications;
  - refreshes x_site->detected[] (update_detected);
  - compares global_node_set with DETECT() (check_global_node_set). If they
    differ and this node is the leader with enough live nodes, it broadcasts
    its view (send_my_view);
  - refreshes local_node_set (check_local_node_set). If it changed, it
    delivers the view to the application (deliver_view_msg).

  Per-task state (the notify flags) lives in the ep environment declared
  with DECL_ENV. The body uses the task.h coroutine macros, so statement
  order must be preserved.
*/
int detector_task(task_arg arg MY_ATTRIBUTE((unused))) {
  DECL_ENV
  int notify;       /* pending global-view broadcast */
  int local_notify; /* pending delivery of local view to the application */
  END_ENV;

  TASK_BEGIN
  last_p_site = 0;
  last_x_site = 0;
  /* Force an initial broadcast/delivery. */
  ep->notify = 1;
  ep->local_notify = 1;
  IFDBG(D_DETECT, FN;);
  while (!xcom_shutdown) {
    {
      site_def *x_site = get_executor_site_rw();
#if TASK_DBUG_ON
      /* p_site is only referenced by the debug output below. */
      site_def const *p_site = get_proposer_site();
      if (!p_site) p_site = get_site_def();
#endif

      IFDBG(D_DETECT, FN; SYCEXP(executed_msg); SYCEXP(max_synode));
      IFDBG(D_DETECT, FN; PTREXP(p_site); NDBG(get_nodeno(p_site), u));
      IFDBG(D_DETECT, FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));

      if (x_site && get_nodeno(x_site) != VOID_NODE_NO) {
        /* New configuration: clear detection state of removed nodes. */
        if (x_site != last_x_site) {
          reset_disjunct_servers(last_x_site, x_site);
        }
        update_detected(x_site);
        if (x_site != last_x_site) {
          last_x_site = x_site;
          ep->notify = 1;
          ep->local_notify = 1;
        }

        IFDBG(D_DETECT, FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));
        IFDBG(D_DETECT, FN;
              COPY_AND_FREE_GOUT(dbg_node_set(x_site->global_node_set)));
        IFDBG(D_DETECT, FN;
              COPY_AND_FREE_GOUT(dbg_node_set(x_site->local_node_set)));
        check_global_node_set(x_site, &ep->notify);
        update_global_count(x_site);
        IFDBG(D_DETECT, FN; NDBG(iamtheleader(x_site), d);
              NDBG(enough_live_nodes(x_site), d););
        /* Send xcom message if node has changed state */
        IFDBG(D_DETECT, FN; NDBG(ep->notify, d));
        /* Only the leader, and only with a live majority, broadcasts. */
        if (ep->notify && iamtheleader(x_site) && enough_live_nodes(x_site)) {
          ep->notify = 0;
          send_my_view(x_site);
        }
      }

      if (x_site && get_nodeno(x_site) != VOID_NODE_NO) {
        IFDBG(D_DETECT, FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));
        IFDBG(D_DETECT, FN;
              COPY_AND_FREE_GOUT(dbg_node_set(x_site->global_node_set)));
        IFDBG(D_DETECT, FN;
              COPY_AND_FREE_GOUT(dbg_node_set(x_site->local_node_set)));
        update_global_count(x_site);
        check_local_node_set(x_site, &ep->local_notify);
        IFDBG(D_DETECT, FN; NDBG(ep->local_notify, d));
        if (ep->local_notify) {
          ep->local_notify = 0;
          deliver_view_msg(x_site); /* To application */
        }
      }
    }
    TIMED_TASK_WAIT(&detector_wait, 1.0);
  }

  FINALLY
  IFDBG(D_BUG, FN; STRLIT(" shutdown "));
  TASK_END;
}
349
detector_node_set(site_def const * site)350 node_set detector_node_set(site_def const *site) {
351 node_set new_set;
352 new_set.node_set_len = 0;
353 new_set.node_set_val = 0;
354 if (site) {
355 u_int nodes = get_maxnodes(site);
356 alloc_node_set(&new_set, nodes);
357 {
358 u_int i = 0;
359 for (i = 0; i < nodes; i++) {
360 new_set.node_set_val[i] = DETECT(site, i);
361 }
362 }
363 }
364 return new_set;
365 }
366
/*
  Broadcast this node's detector view of `site` as a view_msg. The payload
  is built by detector_node_set() and handed to xcom_send() together with a
  fresh pax_msg (presumably xcom_send takes ownership of both — verify).
*/
static void send_my_view(site_def const *site) {
  app_data_ptr view = new_app_data();
  pax_msg *carrier = pax_msg_new(null_synode, site);
  IFDBG(D_DETECT, FN;);
  view->body.c_t = view_msg;
  view->body.app_u_u.present = detector_node_set(site);
  xcom_send(view, carrier);
}
375
send_global_view()376 void send_global_view() {
377 site_def const *x_site = get_executor_site();
378 if (iamtheleader(x_site)) {
379 send_my_view(x_site);
380 }
381 }
382
383 /* Alive task */
384
385 /*
386 If a new configuration has been forced, the node's number assigned during
387 the reconfiguration may be invalid. Specifically, this situation may happen
388 when the network is down.
389
390 This code is used to try to fix that by calling the xcom_find_node_index.
391 */
validate_update_configuration(site_def const * site,synode_no alive_synode)392 static void validate_update_configuration(site_def const *site,
393 synode_no alive_synode) {
394 if (site && get_nodeno(site) == VOID_NODE_NO) {
395 site_def *site_rw = find_site_def_rw(alive_synode);
396 site_rw->nodeno = xcom_find_node_index(&site_rw->nodes);
397 }
398 }
399
/* Send alive messages periodically */
#ifdef TASK_EVENT_TRACE
/* Iteration counter: dump_task_events() runs on every 10th pass. */
static unsigned int dump = 0;
#endif

/*
  Alive task: runs until xcom_shutdown is set, pausing 1.0 seconds between
  passes (TASK_DELAY). On each pass, for the site definition in effect for
  get_current_message():
  - repairs an invalid node number (validate_update_configuration);
  - if server_active() for this node is older than 0.5 seconds (presumably
    its last send time), broadcasts an i_am_alive_op to the whole site;
  - for every other node that may_be_dead() reports (silent for more than
    MAX_SILENT seconds), sends an are_you_alive_op whose app_data carries
    that node's address as an xcom_boot_type node list.

  The last messages built are kept in ep->i_p / ep->you_p (replaced via
  replace_pax_msg) and released in FINALLY. The body uses the task.h
  coroutine macros, so statement order must be preserved.
*/
int alive_task(task_arg arg MY_ATTRIBUTE((unused))) {
  DECL_ENV
  pax_msg *i_p;   /* last i_am_alive_op message */
  pax_msg *you_p; /* last are_you_alive_op message */
  END_ENV;
  TASK_BEGIN

  ep->i_p = ep->you_p = NULL;

  while (!xcom_shutdown) {
    {
      double sec = task_now();
      synode_no alive_synode = get_current_message();
      site_def const *site = find_site_def(alive_synode);

      /*
        If a new configuration has been forced, the site's configuration may be
        invalid. Specifically, this function is called to verify if the site's
        node number is valid and fix it if this is not valid.
      */
      validate_update_configuration(site, alive_synode);

      if (site && get_nodeno(site) != VOID_NODE_NO) {
        /* Send alive if we have not been active for some time */
        if (server_active(site, get_nodeno(site)) < sec - 0.5) {
          replace_pax_msg(&ep->i_p, pax_msg_new(alive_synode, site));
          ep->i_p->op = i_am_alive_op;
          send_to_all_site(site, ep->i_p, "alive_task");
        }

        /* Ping nodes which seem absent */
        {
          node_no i;
          for (i = 0; i < get_maxnodes(site); i++) {
            if (i != get_nodeno(site) && may_be_dead(site->detected, i, sec)) {
              replace_pax_msg(&ep->you_p, pax_msg_new(alive_synode, site));
              ep->you_p->op = are_you_alive_op;

              /* Payload identifies the node being pinged. */
              ep->you_p->a = new_app_data();
              ep->you_p->a->app_key.group_id = ep->you_p->a->group_id =
                  get_group_id(site);
              ep->you_p->a->body.c_t = xcom_boot_type;
              init_node_list(1, &site->nodes.node_list_val[i],
                             &ep->you_p->a->body.app_u_u.nodes);

              IFDBG(D_DETECT, FN; COPY_AND_FREE_GOUT(
                                      dbg_list(&ep->you_p->a->body.app_u_u.nodes)););

              send_server_msg(site, i, ep->you_p);
            }
          }
        }
      }
    }
    TASK_DELAY(1.0);
#ifdef TASK_EVENT_TRACE
    if (dump++ % 10 == 0) dump_task_events();
#endif
  }
  FINALLY
  IFDBG(D_BUG, FN; STRLIT(" shutdown "));
  replace_pax_msg(&ep->i_p, NULL);
  replace_pax_msg(&ep->you_p, NULL);
  TASK_END;
}
470