1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include <assert.h>
24 #include <stdlib.h>
25 
26 #include "xcom_common.h"
27 #include "simset.h"
28 #include "xcom_vp.h"
29 #include "task.h"
30 #include "task_debug.h"
31 #include "node_no.h"
32 #include "server_struct.h"
33 #include "xcom_detector.h"
34 #include "site_struct.h"
35 #include "xcom_transport.h"
36 #include "xcom_base.h"
37 #include "node_set.h"
38 #include "app_data.h"
39 #include "pax_msg.h"
40 #include "synode_no.h"
41 #include "site_def.h"
42 #include "node_list.h"
43 #include "x_platform.h"
44 
/* Presumably the detector task's handle; defined in another translation unit
   and not referenced in this file. */
extern task_env *detector;
/* Shutdown flag; detector_task() and alive_task() loop while it is zero. */
extern int	xcom_shutdown;

/* Unused: detection timestamps are kept in site->servers[n]->detected and
   site->detected[] instead of a file-level array. */
/* static double	detected[NSERVERS]; */
49 
50 /* See if node has been suspiciously still for some time */
may_be_dead(detector_state const ds,node_no i,double seconds)51 int	may_be_dead(detector_state const ds, node_no i, double seconds)
52 {
53   /* DBGOUT(FN; NDBG(i,u); NDBG(ds[i] < seconds - 4.0, d)); */
54   return ds[i] < seconds - 4.0;
55 }
56 
/* Zero all NSERVERS detection timestamps held in `ds`. */
void init_detector(detector_state ds)
{
  int	slot = 0;
  while (slot < NSERVERS) {
    ds[slot] = 0.0;
    slot++;
  }
}
64 
/*
  Record that `node` of `site` has just been heard from by stamping
  site->servers[node]->detected with the current task time.
  Does nothing when `site` is NULL or `node` is not below the site's
  node count.
*/
void note_detected(site_def const *site, node_no node)
{
  /* Check for NULL before anything touches `site`; the assert below
     dereferences it. */
  if (!site)
    return;

  /* site->servers's size is NSERVERS. */
  assert(site->nodes.node_list_len <= NSERVERS);

  if (node < site->nodes.node_list_len) {
    site->servers[node]->detected = task_now();
  }
}
76 
/*
  Clear the detection timestamp (set it to 0.0) of server `node` in
  `site`. Does nothing when `site` is NULL or `node` is not below the
  site's node count.
*/
static void reset_detected(site_def const *site, u_int node)
{
  /* Check for NULL before the assert dereferences `site`. */
  if (!site)
    return;

  /* site->servers's size is NSERVERS. */
  assert(site->nodes.node_list_len <= NSERVERS);

  if (node < site->nodes.node_list_len) {
    site->servers[node]->detected = 0.0;
  }
}
86 
/*
  On a configuration change from old_site to new_site, clear the
  detection timestamp of every node that belongs to only one of them:
  - nodes that were removed (only in old_site) are reset in old_site;
  - nodes that were added (only in new_site) are reset in new_site.
  Nothing happens unless both sites are non-NULL.
*/
static void reset_disjunct_servers(site_def const *old_site, site_def const *new_site)
{
  u_int idx;

  if (!old_site || !new_site)
    return;

  /* Removed members: present in old_site, absent from new_site. */
  for (idx = 0; idx < old_site->nodes.node_list_len; idx++) {
    int const still_there =
        node_exists(&old_site->nodes.node_list_val[idx], &new_site->nodes);
    if (!still_there)
      reset_detected(old_site, idx);
  }

  /* Added members: present in new_site, absent from old_site. */
  for (idx = 0; idx < new_site->nodes.node_list_len; idx++) {
    int const was_there =
        node_exists(&new_site->nodes.node_list_val[idx], &old_site->nodes);
    if (!was_there)
      reset_detected(new_site, idx);
  }
}
106 
/*
  Copy each member's server-level detection timestamp
  (site->servers[node]->detected) into the site's own site->detected[]
  array, then set site->detector_updated.
  A NULL site is ignored. Previously the flag was written outside the
  NULL check, which dereferenced NULL.
*/
void update_detected(site_def *site)
{
  u_int node;

  if (!site)
    return;

  /* site->servers's size is NSERVERS. */
  assert(site->nodes.node_list_len <= NSERVERS);
  for (node = 0; node < site->nodes.node_list_len; node++) {
    site->detected[node] = site->servers[node]->detected;
  }
  site->detector_updated = 1;
}
121 
/*
  Decide whether enough members of `site` look alive.
  A member counts as alive if it is this node itself (get_nodeno), or if
  its site->detected timestamp is less than DETECTOR_LIVE_TIMEOUT older
  than the current task time.
  Returns non-zero when a strict majority is alive, or, if ARBITRATOR_HACK
  is enabled, for any two-node site. Returns 0 when get_maxnodes() reports
  no members (presumably including a NULL site; verify get_maxnodes).
  If the detector has not yet filled site->detected, it is refreshed first.
*/
int	enough_live_nodes(site_def const *site)
{
  double	now = task_now();
  node_no const total = get_maxnodes(site);
  node_no const me = get_nodeno(site);
  node_no alive = 0;
  node_no k;

  if (site && !site->detector_updated) {
    update_detected((site_def *)site);
  }

  if (total == 0)
    return 0;

  for (k = 0; k < total; k++) {
    int const counts = (k == me) || (now - site->detected[k] < DETECTOR_LIVE_TIMEOUT);
    if (counts)
      alive++;
  }

  {
    int const majority = alive > total / 2;
    int const pair_override = ARBITRATOR_HACK && (2 == total);
    return total > 0 && (majority || pair_override);
  }
}
147 
148 static void send_my_view(site_def const *site);
149 
150 #define DETECT(site) (i == get_nodeno(site)) || (site->detected[i] + DETECTOR_LIVE_TIMEOUT > task_now())
151 
/*
  Recompute site->global_node_count as the number of set entries in
  site->global_node_set. At most get_maxnodes(site) entries are examined,
  and never more than the set's own length.
*/
static void	update_global_count(site_def *site)
{
	u_int const limit = get_maxnodes(site);
	u_int const len = site->global_node_set.node_set_len;
	u_int idx;
	u_int tally = 0;

	for (idx = 0; idx < limit && idx < len; idx++) {
		if (site->global_node_set.node_set_val[idx])
			tally++;
	}
	site->global_node_count = tally;
}
163 
#if 0
/*
  Compiled out: update_global_node_set() is not referenced anywhere in
  this file. If enabled, it would overwrite site->global_node_set with the
  local detector view (DETECT) for each of the first get_maxnodes(site)
  members.
  NOTE(review): the local `count` is unused.
  TODO: validate this with OHK and then remove.
*/
static void    update_global_node_set(site_def *site)
{
       u_int i;
       u_int nodes = get_maxnodes(site);
       node_no count = 0;

       for (i = 0; i < nodes && i < site->global_node_set.node_set_len; i++) {
               site->global_node_set.node_set_val[i] = DETECT(site);
       }
}
#endif
180 
/*
  Compare the agreed global view (site->global_node_set) with what this
  node's detector currently sees (DETECT) for each of the first
  get_maxnodes(site) members. Sets *notify to 1 if any entry differs;
  *notify is never cleared here.
  Side effect: recounts site->global_node_count from global_node_set.
  The names `i` and `detect` are required by DETECT and the debug output.
*/
static void	check_global_node_set(site_def *site, int *notify)
{
	u_int const nodes = get_maxnodes(site);
	u_int i = 0;

	site->global_node_count = 0;
	while (i < nodes && i < site->global_node_set.node_set_len) {
		int	detect = DETECT(site);
		DBGOUT(FN; NDBG(i,d); NDBG(detect,d); NDBG(site->detected[i],f));
		if (site->global_node_set.node_set_val[i]) {
			site->global_node_count++;
		}
		if (detect != site->global_node_set.node_set_val[i]) {
			*notify = 1;
		}
		DBGOHK(FN; NDBG(i,u); NDBG(*notify, d));
		i++;
	}
}
198 
/*
  Bring site->local_node_set in line with what this node's detector
  currently sees (DETECT) for each of the first get_maxnodes(site)
  members. Sets *notify to 1 if any entry had to change; *notify is never
  cleared here.
*/
static void	check_local_node_set(site_def *site, int *notify)
{
	u_int i;
	u_int nodes = get_maxnodes(site);

	/*
	  Bound the loop by the length of the set that is actually read and
	  written (local_node_set). Bounding it by global_node_set, as before,
	  could index past the end of local_node_set when the two sets differ
	  in length.
	*/
	for (i = 0; i < nodes && i < site->local_node_set.node_set_len; i++) {
		int	detect = DETECT(site);
		if (site->local_node_set.node_set_val[i] !=  detect) {
			site->local_node_set.node_set_val[i] =  detect;
			*notify = 1;
		}
		DBGOHK(FN; NDBG(i,u); NDBG(*notify, d));
	}
}
213 
#if 0
/*
  Compiled out: update_local_node_set() is not referenced anywhere in
  this file. If enabled, it would overwrite site->local_node_set with the
  local detector view (DETECT) for each of the first get_maxnodes(site)
  members.
  NOTE(review): the local `count` is unused, and the loop is bounded by
  global_node_set's length while indexing local_node_set.
  TODO: validate this with OHK and then remove.
*/
static void    update_local_node_set(site_def *site)
{
       u_int i;
       u_int nodes = get_maxnodes(site);
       node_no count = 0;

       for (i = 0; i < nodes && i < site->global_node_set.node_set_len; i++) {
               site->local_node_set.node_set_val[i] = DETECT(site);
       }
}
#endif
230 
leader(site_def const * s)231 static node_no	leader(site_def const *s)
232 {
233 	node_no	leader = 0;
234 	for (leader = 0; leader < get_maxnodes(s); leader++) {
235 		if (!may_be_dead(s->detected, leader, task_now()) &&
236 			is_set(s->global_node_set, leader))
237 			return leader;
238 	}
239 	return 0;
240 }
241 
242 
/* Non-zero iff this node (s->nodeno) is the member leader() picks for s. */
int	iamtheleader(site_def const *s)
{
	node_no const current = leader(s);
	return s->nodeno == current;
}
247 
/* Defined elsewhere; in this file they are used only in detector_task()'s
   debug output. */
extern synode_no executed_msg;
extern synode_no max_synode;

/*
  Site pointers cached by detector_task(); last_x_site remembers the
  executor site seen on the previous iteration. invalidate_detector_sites()
  clears them (presumably before the site_def they point to is freed;
  confirm with callers).
  NOTE(review): last_p_site is reset to 0 but never assigned a site in this
  file, so it appears to be always NULL.
*/
static site_def * last_p_site= 0;
static site_def * last_x_site= 0;
253 
/*
  Drop any cached detector site pointer (last_p_site / last_x_site) that
  refers to `site`, so the detector will not use it again.
*/
void invalidate_detector_sites(site_def *site)
{
  if (site == last_p_site)
    last_p_site = NULL;

  if (site == last_x_site)
    last_x_site = NULL;
}
266 
/*
  Detector task: notify others (and the application) about our current view.

  On every iteration (followed by TASK_DELAY(1.0)) it looks at the executor
  site and, if this node is a member of it:
  - on a change of executor site, clears detection timestamps of nodes that
    were added or removed (reset_disjunct_servers) and forces both
    notifications below;
  - copies server detection times into the site (update_detected);
  - if a global notification is pending (forced, or global_node_set
    disagrees with DETECT) and this node is the leader and a majority looks
    alive, broadcasts its view to the group (send_my_view);
  - if the local view changed, delivers it to the application
    (deliver_view_msg).
  Runs until xcom_shutdown becomes non-zero.
*/
int	detector_task(task_arg arg MY_ATTRIBUTE((unused)))
{
	/* Task environment fields: these keep their values across TASK_DELAY,
	   unlike the loop locals below. */
	DECL_ENV
		int notify;       /* Leader should (re)broadcast the global view. */
		int local_notify; /* Local view must be delivered to the application. */
	END_ENV;

	TASK_BEGIN
	last_p_site = 0;
	last_x_site = 0;
	/* Force an initial broadcast and delivery. */
	ep->notify = 1;
	ep->local_notify = 1;
	DBGOHK(FN; );
	while (!xcom_shutdown) {
		/* p_site is only used in the debug output below. */
		site_def * p_site = (site_def * )get_proposer_site();
		site_def * x_site = (site_def * )get_executor_site();

		if (!p_site)
			p_site = (site_def * )get_site_def();
		DBGOHK(FN; SYCEXP(executed_msg); SYCEXP(max_synode));
		DBGOHK(FN; PTREXP(p_site); NDBG(get_nodeno(p_site), u));
		DBGOHK(FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));

		/* Act only if there is an executor site and this node belongs to it. */
		if (x_site && get_nodeno(x_site) != VOID_NODE_NO) {

			/* Executor site changed: reset timestamps of nodes that joined or left. */
			if (x_site != last_x_site) {
				reset_disjunct_servers(last_x_site, x_site);
			}
			update_detected(x_site);
			/* Remember the new site and force both views to be re-sent. */
			if (x_site != last_x_site) {
				last_x_site = x_site;
				ep->notify = 1;
				ep->local_notify = 1;
			}


			DBGOHK(FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));
			DBGOHK(FN; COPY_AND_FREE_GOUT(dbg_node_set(x_site->global_node_set)));
			DBGOHK(FN; COPY_AND_FREE_GOUT(dbg_node_set(x_site->local_node_set)));
			/* Sets ep->notify if global_node_set disagrees with DETECT(). */
			check_global_node_set(x_site, &ep->notify);
			update_global_count(x_site);
			DBGOHK(FN; NDBG(iamtheleader(x_site), d); NDBG(enough_live_nodes(x_site), d); );
			/* Send xcom message if node has changed state */
			DBGOHK(FN; NDBG(ep->notify,d));
			/* Only the leader broadcasts, and only while a majority looks alive;
			   otherwise ep->notify stays set and is retried next iteration. */
			if (ep->notify && iamtheleader(x_site) && enough_live_nodes(x_site)) {
				ep->notify = 0;
				send_my_view(x_site);
			}
		}

		/* NOTE(review): same condition as the block above, and
		   update_global_count() is recomputed; the two blocks could be merged. */
		if (x_site && get_nodeno(x_site) != VOID_NODE_NO) {
			DBGOHK(FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));
			DBGOHK(FN; COPY_AND_FREE_GOUT(dbg_node_set(x_site->global_node_set)));
			DBGOHK(FN; COPY_AND_FREE_GOUT(dbg_node_set(x_site->local_node_set)));
			update_global_count(x_site);
			/* Sync local_node_set with DETECT(); sets ep->local_notify on change. */
			check_local_node_set(x_site, &ep->local_notify);
			DBGOHK(FN; NDBG(ep->local_notify,d));
			if (ep->local_notify) {
				ep->local_notify = 0;
				deliver_view_msg(x_site); /* To application */
			}
		}
		/* Yield; the loop locals above are not used after this point. */
		TASK_DELAY(1.0);
	}

	FINALLY
	TASK_END;
}
336 
detector_node_set(site_def const * site)337 node_set detector_node_set(site_def const *site)
338 {
339   node_set new_set;
340   new_set.node_set_len = 0;
341   new_set.node_set_val = 0;
342   if (site) {
343     u_int nodes = get_maxnodes(site);
344     alloc_node_set(&new_set, nodes);
345     {
346       u_int i = 0;
347       for (i = 0; i < nodes; i++) {
348         new_set.node_set_val[i] = DETECT(site);
349       }
350     }
351   }
352   return new_set;
353 }
354 
/*
  Broadcast this node's detector view of `site` as a view_msg: an app_data
  whose `present` set comes from detector_node_set(site), handed to
  xcom_send() together with a fresh pax_msg for `site`.
*/
static void send_my_view(site_def const *site)
{
  app_data_ptr view = new_app_data();
  pax_msg *carrier = pax_msg_new(null_synode, site);

  DBGOHK(FN;);
  view->body.c_t = view_msg;
  view->body.app_u_u.present = detector_node_set(site);
  xcom_send(view, carrier);
}
364 
365 
/* {{{ Alive task */

/*
  Send alive messages periodically.

  On every iteration (followed by TASK_DELAY(1.0)), for the site of the
  current message, and only if this node is a member of it:
  - if server_active() for this node is more than 0.5 before now, send
    i_am_alive_op to the whole site (presumably meaning nothing was sent
    recently; confirm server_active semantics);
  - send are_you_alive_op to every other member that may_be_dead() (no
    detection within the last 4.0 time units). The message carries the
    site's group id and an xcom_boot_type node list that holds just that
    member's address.
  Runs until xcom_shutdown becomes non-zero.
*/
int	alive_task(task_arg arg MY_ATTRIBUTE((unused)))
{
	/* Last messages built by this task. They are held in the task
	   environment and released in FINALLY via replace_pax_msg(..., NULL). */
	DECL_ENV
	    pax_msg * i_p;
	pax_msg * you_p;
	END_ENV;
	TASK_BEGIN

	    ep->i_p = ep->you_p = NULL;

	while (!xcom_shutdown) {
		double	sec = task_now();
		synode_no alive_synode = get_current_message();
		site_def const * site = find_site_def(alive_synode);
		if (site && get_nodeno(site) != VOID_NODE_NO) {
			/* Send alive if we have not been active for some time */
			if (server_active(site, get_nodeno(site)) < sec - 0.5) {
				replace_pax_msg(&ep->i_p, pax_msg_new(alive_synode, site));
				ep->i_p->op = i_am_alive_op;
				send_to_all_site(site, ep->i_p, "alive_task");
			}

			/* Ping nodes which seem absent */
			 {
				node_no i;
				for (i = 0; i < get_maxnodes(site); i++) {
					if (i != get_nodeno(site) && may_be_dead(site->detected, i, sec)) {
						/* Build a fresh are_you_alive_op addressed to member i only. */
						replace_pax_msg(&ep->you_p, pax_msg_new(alive_synode, site));
						ep->you_p->op = are_you_alive_op;
						ep->you_p->a = new_app_data();
						ep->you_p->a->app_key.group_id = ep->you_p->a->group_id = get_group_id(site);
						ep->you_p->a->body.c_t = xcom_boot_type;
						init_node_list(1, &site->nodes.node_list_val[i], &ep->you_p->a->body.app_u_u.nodes);
						DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&ep->you_p->a->body.app_u_u.nodes)););
						send_server_msg(site, i, ep->you_p);
					}
				}
			}
		}
		TASK_DELAY(1.0);
	}
	FINALLY
	    replace_pax_msg(&ep->i_p, NULL);
	replace_pax_msg(&ep->you_p, NULL);
	TASK_END;
}
415 
416 
417