1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include <assert.h>
24 #include <stdlib.h>
25
26 #include "xcom_common.h"
27 #include "simset.h"
28 #include "xcom_vp.h"
29 #include "task.h"
30 #include "task_debug.h"
31 #include "node_no.h"
32 #include "server_struct.h"
33 #include "xcom_detector.h"
34 #include "site_struct.h"
35 #include "xcom_transport.h"
36 #include "xcom_base.h"
37 #include "node_set.h"
38 #include "app_data.h"
39 #include "pax_msg.h"
40 #include "synode_no.h"
41 #include "site_def.h"
42 #include "node_list.h"
43 #include "x_platform.h"
44
45 extern task_env *detector;
46 extern int xcom_shutdown;
47
48 /* static double detected[NSERVERS]; */
49
50 /* See if node has been suspiciously still for some time */
may_be_dead(detector_state const ds,node_no i,double seconds)51 int may_be_dead(detector_state const ds, node_no i, double seconds)
52 {
53 /* DBGOUT(FN; NDBG(i,u); NDBG(ds[i] < seconds - 4.0, d)); */
54 return ds[i] < seconds - 4.0;
55 }
56
/* Reset every one of the NSERVERS timestamps in ds to 0.0. */
void init_detector(detector_state ds)
{
  int slot = NSERVERS;
  while (slot-- > 0)
    ds[slot] = 0.0;
}
64
/*
  Stamp node `node` of `site` with the current task time: stores task_now()
  in site->servers[node]->detected.

  A NULL site, or a node number outside site->nodes, is silently ignored.
  The NULL check must come before the assert, because the assert
  dereferences site.
*/
void note_detected(site_def const *site, node_no node)
{
  if (site == NULL)
    return;

  /* site->servers's size is NSERVERS. */
  assert(site->nodes.node_list_len <= NSERVERS);

  if (node < site->nodes.node_list_len) {
    site->servers[node]->detected = task_now();
  }
}
76
/*
  Clear the detection timestamp of node `node` of `site`: sets
  site->servers[node]->detected to 0.0.

  A NULL site, or a node number outside site->nodes, is silently ignored.
  The NULL check must come before the assert, because the assert
  dereferences site.
*/
static void reset_detected(site_def const *site, u_int node)
{
  if (site == NULL)
    return;

  /* site->servers's size is NSERVERS. */
  assert(site->nodes.node_list_len <= NSERVERS);

  if (node < site->nodes.node_list_len) {
    site->servers[node]->detected = 0.0;
  }
}
86
/*
  Clear detection timestamps for nodes that belong to only one of the two
  sites:
    - nodes present in old_site but not in new_site (removed), and
    - nodes present in new_site but not in old_site (added).
  Nodes present in both are left alone.  Nothing happens unless both
  sites are non-NULL.

  (detector_task calls this when the executor site changes.)
*/
static void reset_disjunct_servers(site_def const *old_site, site_def const *new_site)
{
  u_int idx;

  if (old_site == NULL || new_site == NULL)
    return;

  /* Removed members: listed in old_site only. */
  for (idx = 0; idx < old_site->nodes.node_list_len; ++idx) {
    if (!node_exists(&old_site->nodes.node_list_val[idx], &new_site->nodes)) {
      reset_detected(old_site, idx);
    }
  }

  /* Added members: listed in new_site only. */
  for (idx = 0; idx < new_site->nodes.node_list_len; ++idx) {
    if (!node_exists(&new_site->nodes.node_list_val[idx], &old_site->nodes)) {
      reset_detected(new_site, idx);
    }
  }
}
106
/*
  Copy each server's `detected` timestamp into the site's own detected[]
  array, one entry per node in site->nodes.  Then mark the site as
  refreshed by setting site->detector_updated.

  A NULL site is ignored.  Previously the detector_updated store happened
  outside the NULL check and dereferenced NULL.
*/
void update_detected(site_def *site)
{
  u_int node;

  if (site == NULL)
    return;

  /* site->servers's size is NSERVERS. */
  assert(site->nodes.node_list_len <= NSERVERS);
  for (node = 0; node < site->nodes.node_list_len; node++) {
    site->detected[node] = site->servers[node]->detected;
  }
  site->detector_updated = 1;
}
121
/*
  Decide whether `site` has enough live nodes to act.  detector_task
  checks this before broadcasting its view.

  If site->detector_updated is not set, site->detected[] is first
  refreshed with update_detected().  A node counts as live if it is this
  node (get_nodeno(site)), or if its detected timestamp is less than
  DETECTOR_LIVE_TIMEOUT old.

  Returns 1 in either of these cases:
    - more than half of the get_maxnodes(site) nodes are live;
    - ARBITRATOR_HACK is enabled and the site has exactly two nodes.
  Returns 0 otherwise, including when the site has no nodes.
*/
int enough_live_nodes(site_def const *site)
{
  double const now = task_now();
  node_no const total = get_maxnodes(site);
  node_no const me = get_nodeno(site);
  node_no alive = 0;
  node_no k;

  if (site != NULL && !site->detector_updated) {
    /* update_detected() writes into the site, hence the const cast. */
    update_detected((site_def *)site);
  }

  if (total == 0)
    return 0;

  for (k = 0; k < total; k++) {
    if (k == me || now - site->detected[k] < DETECTOR_LIVE_TIMEOUT)
      alive++;
  }

  if (alive > total / 2)
    return 1;
  return (ARBITRATOR_HACK && (2 == total)) ? 1 : 0;
}
147
148 static void send_my_view(site_def const *site);
149
150 #define DETECT(site) (i == get_nodeno(site)) || (site->detected[i] + DETECTOR_LIVE_TIMEOUT > task_now())
151
/*
  Recompute site->global_node_count as the number of non-zero entries in
  site->global_node_set.  Only the first get_maxnodes(site) entries are
  examined, and never more than the set's node_set_len.
*/
static void update_global_count(site_def *site)
{
  u_int const limit = get_maxnodes(site);
  u_int idx = 0;

  site->global_node_count = 0;
  while (idx < limit && idx < site->global_node_set.node_set_len) {
    if (site->global_node_set.node_set_val[idx])
      site->global_node_count++;
    idx++;
  }
}
163
#if 0
/*
  This code seems to be dead.
  TODO: validate this with OHK and then remove.

  NOTE(review): disabled with #if 0, so it is never compiled.  If
  re-enabled, it would overwrite the first
  min(get_maxnodes(site), global_node_set.node_set_len) entries of
  site->global_node_set with DETECT(site).  DETECT reads the loop index
  `i`.  The local `count` is unused and would trigger an unused-variable
  warning.
*/
static void update_global_node_set(site_def *site)
{
  u_int i;
  u_int nodes = get_maxnodes(site);
  node_no count = 0;

  for (i = 0; i < nodes && i < site->global_node_set.node_set_len; i++) {
    site->global_node_set.node_set_val[i] = DETECT(site);
  }
}
#endif
180
/*
  Compare the global node set with the local detector's view.

  Recounts site->global_node_count from site->global_node_set, and sets
  *notify to 1 if any examined entry differs from DETECT(site).  *notify
  is never cleared here.  Only the first get_maxnodes(site) entries are
  examined, bounded by global_node_set.node_set_len.

  The loop index must be named `i`, because DETECT() reads it.
*/
static void check_global_node_set(site_def *site, int *notify)
{
  u_int const nodes = get_maxnodes(site);
  u_int i;

  site->global_node_count = 0;
  for (i = 0; i < nodes && i < site->global_node_set.node_set_len; ++i) {
    int detect = DETECT(site);
    DBGOUT(FN; NDBG(i,d); NDBG(detect,d); NDBG(site->detected[i],f));
    if (site->global_node_set.node_set_val[i] != 0) {
      site->global_node_count++;
    }
    if (detect != site->global_node_set.node_set_val[i]) {
      *notify = 1;
    }
    DBGOHK(FN; NDBG(i,u); NDBG(*notify, d));
  }
}
198
/*
  Bring site->local_node_set in line with the local detector's view.
  Every examined entry that differs from DETECT(site) is overwritten, and
  *notify is set to 1.  *notify is never cleared here.

  Only the first get_maxnodes(site) entries are examined.  Because entries
  of local_node_set are written, the loop is bounded by the lengths of
  both the global and the local node set.  This avoids writing past the
  end of local_node_set if it is shorter than global_node_set.

  The loop index must be named `i`, because DETECT() reads it.
*/
static void check_local_node_set(site_def *site, int *notify)
{
  u_int i;
  u_int nodes = get_maxnodes(site);

  for (i = 0;
       i < nodes &&
       i < site->global_node_set.node_set_len &&
       i < site->local_node_set.node_set_len;
       i++) {
    int detect = DETECT(site);
    if (site->local_node_set.node_set_val[i] != detect) {
      site->local_node_set.node_set_val[i] = detect;
      *notify = 1;
    }
    DBGOHK(FN; NDBG(i,u); NDBG(*notify, d));
  }
}
213
#if 0
/*
  This code seems to be dead.
  TODO: validate this with OHK and then remove.

  NOTE(review): disabled with #if 0, so it is never compiled.  If
  re-enabled, it would overwrite entries of site->local_node_set with
  DETECT(site), but the loop is bounded by global_node_set.node_set_len,
  not by local_node_set's own length.  That bound should be revisited
  before any reuse.  The local `count` is unused.
*/
static void update_local_node_set(site_def *site)
{
  u_int i;
  u_int nodes = get_maxnodes(site);
  node_no count = 0;

  for (i = 0; i < nodes && i < site->global_node_set.node_set_len; i++) {
    site->local_node_set.node_set_val[i] = DETECT(site);
  }
}
#endif
230
leader(site_def const * s)231 static node_no leader(site_def const *s)
232 {
233 node_no leader = 0;
234 for (leader = 0; leader < get_maxnodes(s); leader++) {
235 if (!may_be_dead(s->detected, leader, task_now()) &&
236 is_set(s->global_node_set, leader))
237 return leader;
238 }
239 return 0;
240 }
241
242
/* Non-zero iff leader(s) selects this node (s->nodeno). */
int iamtheleader(site_def const *s)
{
  node_no const chosen = leader(s);
  return s->nodeno == chosen;
}
247
248 extern synode_no executed_msg;
249 extern synode_no max_synode;
250
251 static site_def * last_p_site= 0;
252 static site_def * last_x_site= 0;
253
/*
  Drop any cached detector reference to `site`.  last_p_site and
  last_x_site are each reset to NULL if they point at it.  Presumably
  called before `site` is released; confirm with callers.
*/
void invalidate_detector_sites(site_def *site)
{
  last_p_site = (last_p_site == site) ? NULL : last_p_site;
  last_x_site = (last_x_site == site) ? NULL : last_x_site;
}
266
/*
  Notify others about our current view.

  Task body, written with the task.h coroutine macros.  Until xcom_shutdown
  is set, it does the following roughly once per TASK_DELAY(1.0) for the
  current executor site (when that site exists and this node is a member):
    1. If the executor site differs from the one seen last time, clear the
       timestamps of added/removed nodes (reset_disjunct_servers), remember
       the new site, and force both notifications.
    2. Refresh the site's detected[] array (update_detected).
    3. Check the global node set against the local detector view.  If a
       notification is pending, this node is the leader, and enough nodes
       are live, broadcast the local view with send_my_view().
    4. Sync the local node set.  If it changed, hand a view message to
       the application via deliver_view_msg().
*/
int detector_task(task_arg arg MY_ATTRIBUTE((unused)))
{
  DECL_ENV
    int notify;       /* pending "send my view to the group" request */
    int local_notify; /* pending "deliver view to the application" request */
  END_ENV;

  TASK_BEGIN
  last_p_site = 0;
  last_x_site = 0;
  /* Start with both notifications pending so the first pass reports a view. */
  ep->notify = 1;
  ep->local_notify = 1;
  DBGOHK(FN; );
  while (!xcom_shutdown) {
    /* Recomputed every pass.  Plain locals, not kept in the task environment. */
    site_def * p_site = (site_def * )get_proposer_site();
    site_def * x_site = (site_def * )get_executor_site();

    /* NOTE(review): p_site is only used in the debug output below. */
    if (!p_site)
      p_site = (site_def * )get_site_def();
    DBGOHK(FN; SYCEXP(executed_msg); SYCEXP(max_synode));
    DBGOHK(FN; PTREXP(p_site); NDBG(get_nodeno(p_site), u));
    DBGOHK(FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));

    if (x_site && get_nodeno(x_site) != VOID_NODE_NO) {

      /* Configuration changed: forget timestamps of nodes that came or went. */
      if (x_site != last_x_site) {
        reset_disjunct_servers(last_x_site, x_site);
      }
      update_detected(x_site);
      if (x_site != last_x_site) {
        last_x_site = x_site;
        ep->notify = 1;
        ep->local_notify = 1;
      }


      DBGOHK(FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));
      DBGOHK(FN; COPY_AND_FREE_GOUT(dbg_node_set(x_site->global_node_set)));
      DBGOHK(FN; COPY_AND_FREE_GOUT(dbg_node_set(x_site->local_node_set)));
      /* Sets ep->notify if the global set disagrees with what we detect. */
      check_global_node_set(x_site, &ep->notify);
      /* NOTE(review): check_global_node_set already recomputed global_node_count. */
      update_global_count(x_site);
      DBGOHK(FN; NDBG(iamtheleader(x_site), d); NDBG(enough_live_nodes(x_site), d); );
      /* Send xcom message if node has changed state */
      DBGOHK(FN; NDBG(ep->notify,d));
      /* Only the leader broadcasts, and only with a majority live.  Otherwise the request stays pending. */
      if (ep->notify && iamtheleader(x_site) && enough_live_nodes(x_site)) {
        ep->notify = 0;
        send_my_view(x_site);
      }
    }

    /* NOTE(review): same condition as the block above. */
    if (x_site && get_nodeno(x_site) != VOID_NODE_NO) {
      DBGOHK(FN; PTREXP(x_site); NDBG(get_nodeno(x_site), u));
      DBGOHK(FN; COPY_AND_FREE_GOUT(dbg_node_set(x_site->global_node_set)));
      DBGOHK(FN; COPY_AND_FREE_GOUT(dbg_node_set(x_site->local_node_set)));
      update_global_count(x_site);
      /* Rewrites local_node_set entries and sets ep->local_notify on any change. */
      check_local_node_set(x_site, &ep->local_notify);
      DBGOHK(FN; NDBG(ep->local_notify,d));
      if (ep->local_notify) {
        ep->local_notify = 0;
        deliver_view_msg(x_site); /* To application */
      }
    }
    TASK_DELAY(1.0);
  }

  FINALLY
  TASK_END;
}
336
detector_node_set(site_def const * site)337 node_set detector_node_set(site_def const *site)
338 {
339 node_set new_set;
340 new_set.node_set_len = 0;
341 new_set.node_set_val = 0;
342 if (site) {
343 u_int nodes = get_maxnodes(site);
344 alloc_node_set(&new_set, nodes);
345 {
346 u_int i = 0;
347 for (i = 0; i < nodes; i++) {
348 new_set.node_set_val[i] = DETECT(site);
349 }
350 }
351 }
352 return new_set;
353 }
354
/*
  Broadcast this node's detector view of `site`.  Wraps
  detector_node_set(site) in a view_msg app_data, attaches it to a fresh
  pax_msg (synode null_synode), and hands both to xcom_send().
*/
static void send_my_view(site_def const *site)
{
  app_data_ptr view = new_app_data();
  pax_msg *carrier = pax_msg_new(null_synode, site);

  DBGOHK(FN;);
  view->body.c_t = view_msg;
  view->body.app_u_u.present = detector_node_set(site);
  xcom_send(view, carrier);
}
364
365
/* {{{ Alive task */

/*
  Send alive messages periodically.

  Task body, written with the task.h coroutine macros.  Until xcom_shutdown
  is set, roughly once per TASK_DELAY(1.0) it looks up the site governing
  get_current_message().  When this node is a member of that site, it:
    - broadcasts an i_am_alive_op message to the whole site when
      server_active() for this node is more than 0.5 s behind the current
      task time.  This presumably means we have not sent anything
      recently; confirm server_active()'s return value.
    - sends an are_you_alive_op message to every other node that
      may_be_dead() flags.  The message carries that node's address in
      an xcom_boot_type node list.
  The two outgoing messages are held in the task environment (ep->i_p,
  ep->you_p).  They are released through replace_pax_msg(..., NULL) in
  FINALLY.
*/
int alive_task(task_arg arg MY_ATTRIBUTE((unused)))
{
  DECL_ENV
    pax_msg * i_p;   /* last "I am alive" broadcast */
    pax_msg * you_p; /* last "are you alive?" probe */
  END_ENV;
  TASK_BEGIN

  ep->i_p = ep->you_p = NULL;

  while (!xcom_shutdown) {
    double sec = task_now();
    synode_no alive_synode = get_current_message();
    site_def const * site = find_site_def(alive_synode);
    if (site && get_nodeno(site) != VOID_NODE_NO) {
      /* Send alive if we have not been active for some time */
      if (server_active(site, get_nodeno(site)) < sec - 0.5) {
        replace_pax_msg(&ep->i_p, pax_msg_new(alive_synode, site));
        ep->i_p->op = i_am_alive_op;
        send_to_all_site(site, ep->i_p, "alive_task");
      }

      /* Ping nodes which seem absent */
      {
        node_no i;
        for (i = 0; i < get_maxnodes(site); i++) {
          if (i != get_nodeno(site) && may_be_dead(site->detected, i, sec)) {
            /* A fresh probe is built for each silent node; ep->you_p is replaced every time. */
            replace_pax_msg(&ep->you_p, pax_msg_new(alive_synode, site));
            ep->you_p->op = are_you_alive_op;
            ep->you_p->a = new_app_data();
            ep->you_p->a->app_key.group_id = ep->you_p->a->group_id = get_group_id(site);
            ep->you_p->a->body.c_t = xcom_boot_type;
            /* Payload: a one-element node list holding the probed node's address. */
            init_node_list(1, &site->nodes.node_list_val[i], &ep->you_p->a->body.app_u_u.nodes);
            DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&ep->you_p->a->body.app_u_u.nodes)););
            send_server_msg(site, i, ep->you_p);
          }
        }
      }
    }
    TASK_DELAY(1.0);
  }
  FINALLY
  replace_pax_msg(&ep->i_p, NULL);
  replace_pax_msg(&ep->you_p, NULL);
  TASK_END;
}
415
416
417