1 /* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include <assert.h>
24 #include <stdlib.h>
25 
26 #include "xcom/app_data.h"
27 #include "xcom/xcom_profile.h"
28 #ifndef XCOM_STANDALONE
29 #include "my_compiler.h"
30 #endif
31 #include "xcom/node_list.h"
32 #include "xcom/node_no.h"
33 #include "xcom/node_set.h"
34 #include "xcom/pax_msg.h"
35 #include "xcom/server_struct.h"
36 #include "xcom/simset.h"
37 #include "xcom/site_def.h"
38 #include "xcom/site_struct.h"
39 #include "xcom/sock_probe.h"
40 #include "xcom/synode_no.h"
41 #include "xcom/task.h"
42 #include "xcom/task_debug.h"
43 #include "xcom/x_platform.h"
44 #include "xcom/xcom_base.h"
45 #include "xcom/xcom_common.h"
46 #include "xcom/xcom_detector.h"
47 #include "xcom/xcom_interface.h"
48 #include "xcom/xcom_transport.h"
49 #include "xdr_gen/xcom_vp.h"
50 
/* Task handle defined in another translation unit (presumably the detector
   task itself); not referenced by the code in this file. */
extern task_env *detector;
/* Non-zero once XCom is shutting down; detector_task and alive_task keep
   looping while it is zero. */
extern int xcom_shutdown;
/* Linkage that detector_task waits on between iterations
   (TIMED_TASK_WAIT(&detector_wait, 1.0)); presumably other code uses it to
   wake the detector early. */
extern linkage detector_wait;
/* A node whose last-detected timestamp is more than MAX_SILENT older than
   the current time (presumably seconds) "may be dead"; see may_be_dead(). */
#define MAX_SILENT 4.0
55 
/*
  DETECT(site, i) is non-zero when node i of site is this node
  (get_nodeno(site)), or when site->detected[i] lies within
  DETECTOR_LIVE_TIMEOUT of task_now().

  The whole expansion and every argument are parenthesized so the macro
  behaves like a single expression inside larger ones (e.g. !DETECT(s, i)).
  Both arguments are evaluated more than once: pass side-effect-free
  expressions only.
*/
#define DETECT(site, i)                   \
  (((i) == get_nodeno(site)) ||           \
   ((site)->detected[(i)] + DETECTOR_LIVE_TIMEOUT > task_now()))
59 
60 /* static double	detected[NSERVERS]; */
61 
62 /* See if node has been suspiciously still for some time */
may_be_dead(detector_state const ds,node_no i,double seconds)63 int may_be_dead(detector_state const ds, node_no i, double seconds) {
64   /* IFDBG(D_DETECT, FN; NDBG(i,u); NDBG(ds[i] < seconds - 2.0, d)); */
65   return ds[i] < seconds - MAX_SILENT;
66 }
67 
init_detector(detector_state ds)68 void init_detector(detector_state ds) {
69   int i = 0;
70   for (i = 0; i < NSERVERS; i++) {
71     ds[i] = 0.0;
72   }
73 }
74 
note_detected(site_def const * site,node_no node)75 int note_detected(site_def const *site, node_no node) {
76   int retval = 1;
77   /* IFDBG(D_DETECT, FN; NDBG(node,d);); */
78 
79   /* site->servers's size is NSERVERS. */
80   assert(site->nodes.node_list_len <= NSERVERS);
81 
82   if (site && node < site->nodes.node_list_len) {
83     retval = DETECT(site, node);
84     server_detected(site->servers[node]);
85   }
86   return retval;
87 }
88 
89 /**
90  * @brief States if a server is still physically connected to another server.
91  * This will test the connection state to that node.
92  *
93  * @param site site definition that contains the server collections.
94  * @param node node index that we want to test for connectivity
95  *
96  * @return 1 if the server is connected. 0 otherwise.
97  */
is_server_connected(struct site_def const * site,node_no node)98 int is_server_connected(struct site_def const *site, node_no node) {
99   int retval = 0;
100 
101   if (site) {
102     if (get_nodeno(site) == node) {  // Me to myself... i'm always connected
103       retval = 1;
104     } else if (node < site->nodes.node_list_len) {
105       retval = is_connected(&site->servers[node]->con);
106     }
107   }
108 
109   return retval;
110 }
111 
reset_detected(site_def const * site,u_int node)112 static void reset_detected(site_def const *site, u_int node) {
113   IFDBG(D_DETECT, FN; PTREXP(site); NDBG(node, d););
114   /* site->servers's size is NSERVERS. */
115   assert(site->nodes.node_list_len <= NSERVERS);
116   if (site && node < site->nodes.node_list_len) {
117     site->servers[node]->detected = 0.0;
118   }
119 }
120 
reset_disjunct_servers(struct site_def const * old_site,struct site_def const * new_site)121 void reset_disjunct_servers(struct site_def const *old_site,
122                             struct site_def const *new_site) {
123   u_int node;
124 
125   if (old_site && new_site) {
126     /* Reset nodes which are not in new site (removed) */
127     for (node = 0; node < old_site->nodes.node_list_len; node++) {
128       if (!node_exists(&old_site->nodes.node_list_val[node], &new_site->nodes))
129         reset_detected(old_site, node);
130     }
131   }
132 }
133 
dbg_detected(site_def * site)134 static void dbg_detected(site_def *site) {
135   u_int node;
136 
137   if (site) {
138     for (node = 0; node < site->nodes.node_list_len; node++) {
139       IFDBG(D_DETECT, FN; NDBG(node, d); NDBG(site->detected[node], f);
140             NDBG(site->servers[node]->detected, f));
141     }
142   }
143 }
144 
update_detected(site_def * site)145 void update_detected(site_def *site) {
146   u_int node;
147 
148   if (site) {
149     bool_t changed = FALSE;
150     for (node = 0; node < site->nodes.node_list_len; node++) {
151       IFDBG(D_DETECT, FN; NDBG(node, d); NDBG(site->detected[node], f);
152             NDBG(site->servers[node]->detected, f));
153       if (site->detected[node] != site->servers[node]->detected) changed = TRUE;
154       site->detected[node] = site->servers[node]->detected;
155     }
156     site->detector_updated = 1;
157 
158     if (changed) {
159       dbg_detected(site);
160     }
161   }
162 }
163 
enough_live_nodes(site_def * site)164 int enough_live_nodes(site_def *site) {
165   node_no i = 0;
166   double t = task_now();
167   node_no n = 0;
168   node_no maxnodes = get_maxnodes(site);
169   node_no self = get_nodeno(site);
170 
171   update_detected(site);
172 
173   /* IFDBG(D_DETECT, FN; NDBG(maxnodes,d); );*/
174   if (maxnodes == 0) return 0;
175   for (i = 0; i < maxnodes; i++) {
176     if (i == self || t - site->detected[i] < DETECTOR_LIVE_TIMEOUT) {
177       n++;
178     }
179   }
180 /* IFDBG(D_DETECT, FN; NDBG(maxnodes,d); NDBG(n,d);); */
181 #ifdef NODE_0_IS_ARBITRATOR
182   return maxnodes > 0 &&
183          (n > maxnodes / 2 ||
184           (ARBITRATOR_HACK && (get_nodeno(site) == 0) && (2 == maxnodes)));
185 #else
186   return maxnodes > 0 &&
187          (n > maxnodes / 2 || (ARBITRATOR_HACK && (2 == maxnodes)));
188 #endif
189 }
190 
191 static void send_my_view(site_def const *site);
192 
update_global_count(site_def * site)193 static void update_global_count(site_def *site) {
194   u_int i;
195   u_int nodes = get_maxnodes(site);
196 
197   site->global_node_count = 0;
198   for (i = 0; i < nodes && i < site->global_node_set.node_set_len; i++) {
199     if (site->global_node_set.node_set_val[i]) site->global_node_count++;
200   }
201 }
202 
check_global_node_set(site_def * site,int * notify)203 static void check_global_node_set(site_def *site, int *notify) {
204   u_int i;
205   u_int nodes = get_maxnodes(site);
206 
207   site->global_node_count = 0;
208   for (i = 0; i < nodes && i < site->global_node_set.node_set_len; i++) {
209     int detect = DETECT(site, i);
210     IFDBG(
211         D_DETECT, if (i == 0) {
212           FN;
213           NDBG(task_now(), f);
214         });
215     IFDBG(D_DETECT, FN; NDBG(i, d); NDBG(detect, d);
216           NDBG(site->detected[i], f));
217     if (site->global_node_set.node_set_val[i]) site->global_node_count++;
218     if (site->global_node_set.node_set_val[i] != detect) {
219       *notify = 1;
220     }
221     IFDBG(D_DETECT, FN; NDBG(i, u); NDBG(*notify, d));
222   }
223 }
224 
check_local_node_set(site_def * site,int * notify)225 static void check_local_node_set(site_def *site, int *notify) {
226   u_int i;
227   u_int nodes = get_maxnodes(site);
228 
229   for (i = 0; i < nodes && i < site->global_node_set.node_set_len; i++) {
230     int detect = DETECT(site, i);
231     if (site->local_node_set.node_set_val[i] != detect) {
232       site->local_node_set.node_set_val[i] = detect;
233       *notify = 1;
234     }
235     IFDBG(D_DETECT, FN; NDBG(i, u); NDBG(*notify, d));
236   }
237 }
238 
leader(site_def const * s)239 static node_no leader(site_def const *s) {
240   if (s) {
241     node_no leader = 0;
242     for (leader = 0; leader < get_maxnodes(s); leader++) {
243       if (!may_be_dead(s->detected, leader, task_now()) &&
244           is_set(s->global_node_set, leader))
245         return leader;
246     }
247   }
248   return 0;
249 }
250 
iamtheleader(site_def const * s)251 int iamtheleader(site_def const *s) {
252   if (!s)
253     return 0;
254   else
255     return leader(s) == s->nodeno;
256 }
257 
/* Defined elsewhere; in this file only printed by detector_task's debug
   tracing. */
extern synode_no executed_msg;
extern synode_no max_synode;

/* Presumably meant to cache the proposer site; in the code visible here it
   is only reset to NULL (at detector_task start and by
   invalidate_detector_sites()). */
static site_def *last_p_site = 0;
/* Executor site seen by detector_task in its previous iteration; a change
   triggers reset_disjunct_servers() and forced notifications. Cleared by
   invalidate_detector_sites() when that site is going away. */
static site_def *last_x_site = 0;
263 
invalidate_detector_sites(site_def * site)264 void invalidate_detector_sites(site_def *site) {
265   if (last_p_site == site) {
266     last_p_site = NULL;
267   }
268 
269   if (last_x_site == site) {
270     last_x_site = NULL;
271   }
272 }
273 
/*
  Detector task: notify others (and the application) about our current view.

  Loops until xcom_shutdown is set. Each iteration:
  - If the executor site is known and this node has a valid node number in
    it, refresh site->detected from the per-server timestamps
    (update_detected). When the executor site differs from the previous
    iteration's, first reset the timestamps of servers that are no longer
    members (reset_disjunct_servers) and force both notifications.
  - Global view: check_global_node_set() sets notify when the global node
    set disagrees with DETECT(). If this node is the leader and enough nodes
    are live, the view is sent with send_my_view(). notify is only cleared
    when the view is actually sent, so the send is retried on later
    iterations.
  - Local view: check_local_node_set() updates local_node_set; if it changed,
    the view is delivered to the application with deliver_view_msg().
  - Then waits on detector_wait with a 1.0 timeout (presumably seconds).

  Written with the task.h coroutine macros: notify and local_notify live in
  the task environment (ep), presumably so they keep their values across
  the wait.
*/
int detector_task(task_arg arg MY_ATTRIBUTE((unused))) {
  DECL_ENV
  int notify;
  int local_notify;
  END_ENV;

  TASK_BEGIN
  last_p_site = 0;
  last_x_site = 0;
  /* Start with both notifications pending so the first view is published. */
  ep->notify = 1;
  ep->local_notify = 1;
  IFDBG(D_DETECT, FN;);
  while (!xcom_shutdown) {
    {
      site_def *x_site = get_executor_site_rw();
      /* p_site is only needed by the debug tracing below. */
#if TASK_DBUG_ON
      site_def const *p_site = get_proposer_site();
      if (!p_site) p_site = get_site_def();
#endif

      IFDBG(D_DETECT, FN; SYCEXP(executed_msg); SYCEXP(max_synode));
      IFDBG(D_DETECT, FN; PTREXP(p_site); NDBG(get_nodeno(p_site), u));
      IFDBG(D_DETECT, FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));

      /* Global view: tell the group if the leader's view differs. */
      if (x_site && get_nodeno(x_site) != VOID_NODE_NO) {
        /* Configuration changed: forget timestamps of removed servers. */
        if (x_site != last_x_site) {
          reset_disjunct_servers(last_x_site, x_site);
        }
        update_detected(x_site);
        if (x_site != last_x_site) {
          last_x_site = x_site;
          ep->notify = 1;
          ep->local_notify = 1;
        }

        IFDBG(D_DETECT, FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));
        IFDBG(D_DETECT, FN;
              COPY_AND_FREE_GOUT(dbg_node_set(x_site->global_node_set)));
        IFDBG(D_DETECT, FN;
              COPY_AND_FREE_GOUT(dbg_node_set(x_site->local_node_set)));
        check_global_node_set(x_site, &ep->notify);
        /* NOTE(review): check_global_node_set() already recomputed
           global_node_count; this call recomputes the same value. */
        update_global_count(x_site);
        /* NOTE(review): in debug builds enough_live_nodes() here also calls
           update_detected() as a side effect. */
        IFDBG(D_DETECT, FN; NDBG(iamtheleader(x_site), d);
              NDBG(enough_live_nodes(x_site), d););
        /* Send xcom message if node has changed state */
        IFDBG(D_DETECT, FN; NDBG(ep->notify, d));
        if (ep->notify && iamtheleader(x_site) && enough_live_nodes(x_site)) {
          ep->notify = 0;
          send_my_view(x_site);
        }
      }

      /* Local view: tell the application if our own view changed. */
      if (x_site && get_nodeno(x_site) != VOID_NODE_NO) {
        IFDBG(D_DETECT, FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));
        IFDBG(D_DETECT, FN;
              COPY_AND_FREE_GOUT(dbg_node_set(x_site->global_node_set)));
        IFDBG(D_DETECT, FN;
              COPY_AND_FREE_GOUT(dbg_node_set(x_site->local_node_set)));
        /* NOTE(review): x_site is unchanged since the block above, so this
           recount appears redundant. */
        update_global_count(x_site);
        check_local_node_set(x_site, &ep->local_notify);
        IFDBG(D_DETECT, FN; NDBG(ep->local_notify, d));
        if (ep->local_notify) {
          ep->local_notify = 0;
          deliver_view_msg(x_site); /* To application */
        }
      }
    }
    /* Sleep until woken via detector_wait or the 1.0 timeout expires. */
    TIMED_TASK_WAIT(&detector_wait, 1.0);
  }

  FINALLY
  IFDBG(D_BUG, FN; STRLIT(" shutdown "));
  TASK_END;
}
349 
detector_node_set(site_def const * site)350 node_set detector_node_set(site_def const *site) {
351   node_set new_set;
352   new_set.node_set_len = 0;
353   new_set.node_set_val = 0;
354   if (site) {
355     u_int nodes = get_maxnodes(site);
356     alloc_node_set(&new_set, nodes);
357     {
358       u_int i = 0;
359       for (i = 0; i < nodes; i++) {
360         new_set.node_set_val[i] = DETECT(site, i);
361       }
362     }
363   }
364   return new_set;
365 }
366 
send_my_view(site_def const * site)367 static void send_my_view(site_def const *site) {
368   app_data_ptr a = new_app_data();
369   pax_msg *msg = pax_msg_new(null_synode, site);
370   IFDBG(D_DETECT, FN;);
371   a->body.c_t = view_msg;
372   a->body.app_u_u.present = detector_node_set(site);
373   xcom_send(a, msg);
374 }
375 
send_global_view()376 void send_global_view() {
377   site_def const *x_site = get_executor_site();
378   if (iamtheleader(x_site)) {
379     send_my_view(x_site);
380   }
381 }
382 
383 /* Alive task */
384 
385 /*
386    If a new configuration has been forced, the node's number assigned during
387    the reconfiguration may be invalid. Specifically, this situation may happen
388    when the network is down.
389 
390    This code is used to try to fix that by calling the xcom_find_node_index.
391  */
validate_update_configuration(site_def const * site,synode_no alive_synode)392 static void validate_update_configuration(site_def const *site,
393                                           synode_no alive_synode) {
394   if (site && get_nodeno(site) == VOID_NODE_NO) {
395     site_def *site_rw = find_site_def_rw(alive_synode);
396     site_rw->nodeno = xcom_find_node_index(&site_rw->nodes);
397   }
398 }
399 
/* Send alive messages periodically */
#ifdef TASK_EVENT_TRACE
/* Iteration counter; the task event trace is dumped when dump % 10 == 0. */
static unsigned int dump = 0;
#endif

/*
  Alive task. Loops until xcom_shutdown is set, pausing TASK_DELAY(1.0)
  between rounds. Each round:
  - looks up the site for the current message (get_current_message /
    find_site_def) and tries to repair an invalid node number
    (validate_update_configuration);
  - if this node has a valid node number and server_active() for it
    (presumably the time of this node's last outgoing activity) is older
    than now - 0.5, sends an i_am_alive_op to all nodes of the site;
  - for every other node that may_be_dead() flags, sends that node an
    are_you_alive_op. The message carries an xcom_boot_type app_data whose
    node list holds that node's address.

  The outgoing messages are kept in the task environment (i_p, you_p) and
  swapped on each send via replace_pax_msg(). FINALLY releases them by
  replacing them with NULL.
*/
int alive_task(task_arg arg MY_ATTRIBUTE((unused))) {
  DECL_ENV
  pax_msg *i_p;
  pax_msg *you_p;
  END_ENV;
  TASK_BEGIN

  ep->i_p = ep->you_p = NULL;

  while (!xcom_shutdown) {
    {
      double sec = task_now();
      synode_no alive_synode = get_current_message();
      site_def const *site = find_site_def(alive_synode);

      /*
        If a new configuration has been forced, the site's configuration may be
        invalid. Specifically, this function is called to verify if the site's
        node number is valid and fix it if this is not valid.
      */
      validate_update_configuration(site, alive_synode);

      if (site && get_nodeno(site) != VOID_NODE_NO) {
        /* Send alive if we have not been active for some time */
        if (server_active(site, get_nodeno(site)) < sec - 0.5) {
          replace_pax_msg(&ep->i_p, pax_msg_new(alive_synode, site));
          ep->i_p->op = i_am_alive_op;
          send_to_all_site(site, ep->i_p, "alive_task");
        }

        /* Ping nodes which seem absent */
        {
          node_no i;
          for (i = 0; i < get_maxnodes(site); i++) {
            if (i != get_nodeno(site) && may_be_dead(site->detected, i, sec)) {
              replace_pax_msg(&ep->you_p, pax_msg_new(alive_synode, site));
              ep->you_p->op = are_you_alive_op;

              /* Payload: a one-element node list naming the pinged node. */
              ep->you_p->a = new_app_data();
              ep->you_p->a->app_key.group_id = ep->you_p->a->group_id =
                  get_group_id(site);
              ep->you_p->a->body.c_t = xcom_boot_type;
              init_node_list(1, &site->nodes.node_list_val[i],
                             &ep->you_p->a->body.app_u_u.nodes);

              IFDBG(D_DETECT, FN; COPY_AND_FREE_GOUT(
                        dbg_list(&ep->you_p->a->body.app_u_u.nodes)););

              send_server_msg(site, i, ep->you_p);
            }
          }
        }
      }
    }
    TASK_DELAY(1.0);
#ifdef TASK_EVENT_TRACE
    if (dump++ % 10 == 0) dump_task_events();
#endif
  }
  FINALLY
  IFDBG(D_BUG, FN; STRLIT(" shutdown "));
  replace_pax_msg(&ep->i_p, NULL);
  replace_pax_msg(&ep->you_p, NULL);
  TASK_END;
}
470