1 /* Copyright (c) 2012, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #ifndef XCOM_BASE_H
24 #define XCOM_BASE_H
25 
26 #include <stddef.h>
27 #ifdef __APPLE__
28 #include <stdio.h>
29 #endif
30 
31 #ifndef _WIN32
32 #include <netdb.h>
33 #endif
34 
35 #include "xcom/site_struct.h"
36 #include "xcom/task_arg.h"
37 #include "xcom/task_debug.h"
38 #include "xcom/x_platform.h"
39 #include "xcom/xcom_cache.h"
40 #include "xcom/xcom_input_request.h"
41 #include "xcom/xcom_os_layer.h"
42 #include "xcom/xdr_utils.h"
43 
44 #define XCOM_THREAD_DEBUG 1
45 
46 typedef unsigned long long synode_cnt;
47 
/**
 * Statement-like wrapper around set_executed_msg().
 *
 * Runs the IFDBG(D_NONE, FN) debug hook and then calls set_executed_msg(x).
 * The synode_no returned by set_executed_msg() is discarded. The body is
 * wrapped in do { } while (0) so the macro behaves as one statement and must
 * be followed by a semicolon.
 *
 * @param x the synode_no passed on to set_executed_msg()
 */
#define SET_EXECUTED_MSG(x) \
  do {                      \
    IFDBG(D_NONE, FN);      \
    set_executed_msg(x);    \
  } while (0)
53 
54 /* Constants */
55 
56 #ifdef XCOM_TRANSACTIONS
57 static trans_id const last_trans = {0xffffffff, 0xffffffff};
58 
59 #endif
60 
61 extern int ARBITRATOR_HACK;
62 extern task_arg null_arg;
63 
64 void *xcom_thread_main(void *cp);
65 
66 synode_no incr_synode(synode_no synode);
67 
68 synode_no decr_synode(synode_no synode);
69 
70 char *dbg_pax_msg(pax_msg const *p);
71 pax_msg *dispatch_op(site_def const *site, pax_msg *p, linkage *reply_queue);
72 synode_no set_executed_msg(synode_no msgno);
73 void request_values(synode_no find, synode_no end);
74 void set_group(uint32_t id);
75 void check_tasks();
76 int xcom_booted();
77 int iamthegreatest(site_def const *s);
78 void xcom_send(app_data_ptr a, pax_msg *msg);
79 int reply_handler_task(task_arg arg);
80 int acceptor_learner_task(task_arg arg);
81 synode_no get_max_synode();
82 void xcom_thread_deinit();
83 int taskmain(xcom_port listen_port);
84 void xcom_thread_init();
85 site_def *install_node_group(app_data_ptr a);
86 int xcom_taskmain2(xcom_port listen_port);
87 void set_max_synode(synode_no synode);
88 synode_no set_current_message(synode_no msgno);
89 
90 int is_real_recover(app_data_ptr a);
91 
92 void init_xcom_base();
93 uint32_t new_id();
94 synode_no get_boot_key();
95 site_def const *get_executor_site();
96 site_def *get_executor_site_rw();
97 site_def const *get_proposer_site();
98 synode_no get_current_message();
99 void start_run_tasks();
100 
101 int is_node_v4_reachable(char *node_address);
102 int is_node_v4_reachable_with_info(struct addrinfo *retrieved_addr_info);
103 int are_we_allowed_to_upgrade_to_v6(app_data_ptr a);
104 struct addrinfo *does_node_have_v4_address(struct addrinfo *retrieved);
105 
106 /**
107  * @brief Process incoming are_you_alive (i.e.: ping) messages and act
108  * accordingly
109  *
110  * GCS/XCom has a full mesh of connections between all nodes. A connects to B
111  * and B connects back to A.
112  *
113  * If we cut out B with, for instance, a firewall, we have the A->B connection
114  * silently dead, but we have the B->A connection alive. Since we only do
115  * monitoring on one half of the connection (the incoming messages), we will
116  * consider that B is alive, although we can't contact it. In the same way, B
117  * will consider that A is dead, since it does not receive any message from it.
118  *
 * We must be able to break the outgoing connection if we detect that something
 * is wrong, in order to make the bi-directional connection state consistent and
 * report that node as unreachable. We do this when we start receiving pings
 * from a node that we consider alive: after some pings, we kill the outgoing
 * connection, thus restoring a consistent state.
124  *
125  * Breaking this connection should only occur if the node has already booted,
126  * meaning that the whole joining process is complete and the node is up and
127  * running. This is due to the fact that we receive pings as part of the
128  * process of joining a group.
129  *
130  * @param site current site definitions
 * @param pm a possible ping message
132  * @param has_client_already_booted check if this node has already booted
133  * @param current_time current XCom time
134  *
135  * @return int 1 if the node connection is closed. 0, otherwise.
136  */
137 int pre_process_incoming_ping(site_def const *site, pax_msg const *pm,
138                               int has_client_already_booted,
139                               double current_time);
140 
/**
 * Drops the client message currently held by `ep`, if there is one.
 *
 * Expects a pointer named `ep` with a `client_msg` member to be in scope at
 * the point of use. When ep->client_msg is non-null it is handed to
 * msg_link_delete(). Wrapped in do { } while (0) so that the macro is a single
 * statement and is safe inside an unbraced if/else; use it with a trailing
 * semicolon.
 */
#define RESET_CLIENT_MSG                \
  do {                                  \
    if (ep->client_msg) {               \
      msg_link_delete(&ep->client_msg); \
    }                                   \
  } while (0)
145 
146 #define APP ep->client_msg->p->a
147 
148 #define FIND_MAX (MIN_LENGTH / 10)
149 
/*
 * PLP: shorthand for the manager_message_payload_u member of the payload of
 * the `msg` pointer that is in scope at the point of use.
 */
#define PLP msg->payload.manager_message_payload_u
/*
 * Set type and object pointer.
 *
 * Assigns `quark` to the in-scope variable `q` and PLP.<ptr> to the in-scope
 * variable `object`. Wrapped in do { } while (0) so both assignments stay
 * together when the macro is the body of an unbraced if/else/loop; use it
 * with a trailing semicolon.
 */
#define SET_REP_TYPE(quark, ptr) \
  do {                           \
    q = (quark);                 \
    object = PLP.ptr;            \
  } while (0)
155 
/**
 * X-macro list of the actions accepted by xcom_fsm().
 *
 * The list expands to a comma-separated sequence of X(<action>) items, so its
 * meaning is decided by the definition of X at the point of expansion. Below,
 * X(a) expands to the bare identifier, which turns the list into the
 * enumerators of enum xcom_actions (x_fsm_wait == 0, then consecutive
 * values). X is undefined again afterwards so that other code can expand the
 * same list differently.
 * NOTE(review): xcom_actions_name[] is presumably generated from this same
 * list with a stringifying X -- confirm before reordering or inserting
 * entries anywhere but at the end.
 */
#define x_actions          \
  X(x_fsm_wait),           \
  X(x_fsm_poll),           \
  X(x_fsm_init),           \
  X(x_fsm_u_boot),         \
  X(x_fsm_add),            \
  X(x_fsm_net_boot),       \
  X(x_fsm_force_config),   \
  X(x_fsm_snapshot),       \
  X(x_fsm_local_snapshot), \
  X(x_fsm_snapshot_wait),  \
  X(x_fsm_need_snapshot),  \
  X(x_fsm_complete),       \
  X(x_fsm_terminate),      \
  X(x_fsm_exit),           \
  X(x_fsm_timeout)
#define X(a) a

enum xcom_actions { x_actions };
typedef enum xcom_actions xcom_actions;
#undef X
167 
168 extern const char *xcom_actions_name[];
169 
170 struct add_args {
171   char *addr;
172   xcom_port port;
173   node_list *nl;
174 };
175 typedef struct add_args add_args;
176 
177 synode_no xcom_get_last_removed_from_cache();
178 
179 char const *xcom_fsm(xcom_actions action, task_arg fsmargs);
180 void site_post_install_action(site_def *site);
181 
182 void site_install_action(site_def *site, cargo_type operation);
183 void send_client_add_node(char *srv, xcom_port port, node_list *nl);
184 void send_client_remove_node(char *srv, xcom_port port, node_list *nl);
185 
186 typedef void (*xcom_full_data_receiver)(site_def const *site, pax_machine *pma,
187                                         app_data_ptr app,
188                                         delivery_status app_status);
189 void set_xcom_full_data_receiver(xcom_full_data_receiver x);
190 
191 typedef void (*xcom_full_local_view_receiver)(site_def const *site,
192                                               node_set nodes);
193 void set_xcom_full_local_view_receiver(xcom_full_local_view_receiver x);
194 
195 typedef void (*xcom_full_global_view_receiver)(site_def const *site,
196                                                synode_no message_id,
197                                                node_set nodes);
198 void set_xcom_full_global_view_receiver(xcom_full_global_view_receiver x);
199 
200 typedef void (*xcom_data_receiver)(synode_no message_id, node_set nodes,
201                                    u_int size, synode_no last_removed,
202                                    char *data);
203 void set_xcom_data_receiver(xcom_data_receiver x);
204 
205 typedef void (*xcom_local_view_receiver)(synode_no message_id, node_set nodes);
206 void set_xcom_local_view_receiver(xcom_local_view_receiver x);
207 
208 typedef void (*xcom_global_view_receiver)(synode_no config_id,
209                                           synode_no message_id, node_set nodes,
210                                           xcom_event_horizon);
211 void set_xcom_global_view_receiver(xcom_global_view_receiver x);
212 
213 typedef void (*xcom_config_receiver)(app_data *a);
214 
215 void set_xcom_config_receiver(xcom_config_receiver x);
216 
217 void set_xcom_logger(xcom_logger x);
218 void set_xcom_debugger(xcom_debugger x);
219 void set_xcom_debugger_check(xcom_debugger_check x);
220 
221 typedef void (*app_snap_handler)(blob *gcs_snap, synode_no log_start,
222                                  synode_no log_end);
223 void set_app_snap_handler(app_snap_handler x);
224 
225 typedef synode_no (*app_snap_getter)(blob *gcs_snap);
226 void set_app_snap_getter(app_snap_getter x);
227 
228 typedef void (*xcom_state_change_cb)(int status);
229 void set_xcom_run_cb(xcom_state_change_cb x);
230 void set_xcom_terminate_cb(xcom_state_change_cb x);
231 void set_xcom_exit_cb(xcom_state_change_cb x);
232 void set_xcom_expel_cb(xcom_state_change_cb x);
233 
234 typedef int (*should_exit_getter)();
235 void set_should_exit_getter(should_exit_getter x);
236 
237 typedef void (*xcom_recovery_cb)();
238 
239 void set_xcom_recovery_init_cb(xcom_recovery_cb x);
240 
241 void set_xcom_recovery_restart_cb(xcom_recovery_cb x);
242 
243 void set_xcom_recovery_begin_cb(xcom_recovery_cb x);
244 
245 void set_xcom_recovery_end_cb(xcom_recovery_cb x);
246 
247 app_data_ptr init_config_with_group(app_data *a, node_list *nl, cargo_type type,
248                                     uint32_t group_id);
249 app_data_ptr init_set_event_horizon_msg(app_data *a, uint32_t group_id,
250                                         xcom_event_horizon event_horizon);
251 app_data_ptr init_set_cache_size_msg(app_data *a, uint64_t cache_limit);
252 app_data_ptr init_get_event_horizon_msg(app_data *a, uint32_t group_id);
253 app_data_ptr init_app_msg(app_data *a, char *payload, u_int payload_size);
254 app_data_ptr init_terminate_command(app_data *a);
255 
256 void terminate_and_exit();
257 
258 /* Hook the logic to pop from the input channel. */
259 typedef xcom_input_request_ptr (*xcom_input_try_pop_cb)(void);
260 void set_xcom_input_try_pop_cb(xcom_input_try_pop_cb pop);
261 /* Create a connection to the input channel's signalling socket. */
262 bool_t xcom_input_new_signal_connection(char const *address, xcom_port port);
263 /* Signal that the input channel has commands. */
264 bool_t xcom_input_signal(void);
265 /* Destroy the connection to the input channel's signalling socket. */
266 void xcom_input_free_signal_connection(void);
267 
268 /*
269  Registers a callback that is called right after
270  the accept routine returns.
271  */
272 typedef int (*xcom_socket_accept_cb)(int fd, site_def const *config);
273 int set_xcom_socket_accept_cb(xcom_socket_accept_cb x);
274 
275 connection_descriptor *xcom_open_client_connection(char const *server,
276                                                    xcom_port port);
277 int xcom_close_client_connection(connection_descriptor *connection);
278 
279 int xcom_client_disable_arbitrator(connection_descriptor *fd);
280 int xcom_client_enable_arbitrator(connection_descriptor *fd);
281 int xcom_client_add_node(connection_descriptor *fd, node_list *nl,
282                          uint32_t group_id);
283 int xcom_client_boot(connection_descriptor *fd, node_list *nl,
284                      uint32_t group_id);
285 int xcom_client_force_add_node(connection_descriptor *fd, node_list *nl,
286                                uint32_t group_id);
287 int xcom_client_force_config(connection_descriptor *fd, node_list *nl,
288                              uint32_t group_id);
289 int xcom_client_force_remove_node(connection_descriptor *fd, node_list *nl,
290                                   uint32_t group_id);
291 int xcom_client_remove_node(connection_descriptor *fd, node_list *nl,
292                             uint32_t group_id);
293 int64_t xcom_client_send_die(connection_descriptor *fd);
294 int64_t xcom_client_send_data(uint32_t size, char *data,
295                               connection_descriptor *fd);
296 xcom_event_horizon xcom_get_minimum_event_horizon();
297 xcom_event_horizon xcom_get_maximum_event_horizon();
298 int xcom_client_get_event_horizon(connection_descriptor *fd, uint32_t group_id,
299                                   xcom_event_horizon *event_horizon);
300 int xcom_client_set_event_horizon(connection_descriptor *fd, uint32_t group_id,
301                                   xcom_event_horizon event_horizon);
302 int xcom_client_terminate_and_exit(connection_descriptor *fd);
303 int xcom_client_set_cache_limit(connection_descriptor *fd,
304                                 uint64_t cache_limit);
305 int xcom_client_get_synode_app_data(connection_descriptor *const fd,
306                                     uint32_t group_id,
307                                     synode_no_array *const synodes,
308                                     synode_app_data_array *const reply);
309 int xcom_client_convert_into_local_server(connection_descriptor *const fd);
310 int64_t xcom_send_client_app_data(connection_descriptor *fd, app_data_ptr a,
311                                   int force);
312 
313 /**
314   Copies app data @c source into @c target and checks if the copy
315   succeeded. Sets *target to NULL if the copy fails.
316 
317   @param[in, out] target The pax_msg to which the app_data will be copied.
318   @param source The app data that will be copied.
319   @retval TRUE if the copy was successful.
320   @retval FALSE if the copy failed, in which case *target is set to NULL;
321           a failed copy means that there was an error allocating memory for
322           the copy.
323 */
324 bool_t safe_app_data_copy(pax_msg **target, app_data_ptr source);
325 
326 /**
327  * Initializes the message @c msg to go through a 3-phase, regular Paxos.
328  * Executed by Proposers.
329  *
330  * @param site XCom configuration
331  * @param p Paxos instance
332  * @param msg Message to send
333  * @param msgno Synode where @c msg will be proposed
334  * @param msg_type The type of the message, e.g. normal or no_op
335  */
336 void prepare_push_3p(site_def const *site, pax_machine *p, pax_msg *msg,
337                      synode_no msgno, pax_msg_type msg_type);
338 /**
339  * Initializes the message @c p as a Prepare message, as in the message for
340  * Phase 1 (a) of the Paxos protocol.
341  * Executed by Proposers.
342  *
343  * @param p The message to send
344  */
345 void init_prepare_msg(pax_msg *p);
346 /**
347  * Initializes the message @c p as a Prepare message for a no-op, as in the
348  * message for Phase 1 (a) of the Paxos protocol.
349  * Executed by Proposers.
350  *
351  * @param p The no-op message to send
352  * @retval created paxos message of type no_op
353  */
354 pax_msg *create_noop(pax_msg *p);
355 /**
356  * Process the incoming Prepare message from a Proposer, as in the message for
357  * Phase 1 (a) of the Paxos protocol.
358  * Executed by Acceptors.
359  *
360  * @param p Paxos instance
361  * @param pm Incoming Prepare message
 * @param synode Synode of the Paxos instance/Prepare message
363  * @retval pax_msg* the reply to send to the Proposer (as in the Phase 1 (b)
364  * message of the Paxos protocol) if the Acceptor accepts the Prepare
365  * @retval NULL otherwise
366  */
367 pax_msg *handle_simple_prepare(pax_machine *p, pax_msg *pm, synode_no synode);
368 /**
369  * Process the incoming acknowledge from an Acceptor to a sent Prepare, as in
370  * the message for Phase 1 (b) of the Paxos protocol.
371  * Executed by Proposers.
372  *
373  * @param site XCom configuration
374  * @param p Paxos instance
375  * @param m Incoming message
376  * @retval TRUE if a majority of Acceptors replied to the Proposer's Prepare
377  * @retval FALSE otherwise
378  */
379 bool_t handle_simple_ack_prepare(site_def const *site, pax_machine *p,
380                                  pax_msg *m);
381 /**
382  * Initializes the proposer's message to go through a 2-phase Paxos on the
383  * proposer's reserved ballot (0,_).
384  * Executed by Proposers.
385  *
386  * @param site XCom configuration
387  * @param p Paxos instance
388  */
389 void prepare_push_2p(site_def const *site, pax_machine *p);
390 /**
391  * Initializes the message @c p as an Accept message, as in the message for
392  * Phase 2 (a) of the Paxos protocol.
393  * Executed by Proposers.
394  *
395  * @param p The message to send
396  */
397 void init_propose_msg(pax_msg *p);
398 /**
399  * Process the incoming Accept from a Proposer, as in the message for
400  * Phase 2 (a) of the Paxos protocol.
401  * Executed by Acceptors.
402  *
403  * @param p Paxos instance
404  * @param m Incoming Accept message
405  * @param synode Synode of the Paxos instance/Accept message
406  * @retval pax_msg* the reply to send to the Proposer (as in the Phase 2 (b)
407  * message of the Paxos protocol) if the Acceptor accepts the Accept
408  * @retval NULL otherwise
409  */
410 pax_msg *handle_simple_accept(pax_machine *p, pax_msg *m, synode_no synode);
411 /**
412  * Process the incoming acknowledge from an Acceptor to a sent Accept, as in
413  * the message for Phase 2 (b) of the Paxos protocol. Executed by Proposers.
414  *
415  * @param site XCom configuration
416  * @param p Paxos instance
417  * @param m Incoming message
 * @retval pax_msg* the Learn message to send to Learners if a majority of
419  * Acceptors replied to the Proposer's Accept
420  * @retval NULL otherwise
421  */
422 pax_msg *handle_simple_ack_accept(site_def const *site, pax_machine *p,
423                                   pax_msg *m);
424 /**
425  * Process the incoming tiny, i.e. without the learned value, Learn message.
426  * Executed by Learners.
427  *
428  * @param site XCom configuration
429  * @param pm Paxos instance
430  * @param p Incoming message
431  */
432 void handle_tiny_learn(site_def const *site, pax_machine *pm, pax_msg *p);
433 /**
434  * Process the incoming Learn message.
435  * Executed by Learners.
436  *
437  * @param site XCom configuration
438  * @param p Paxos instance
439  * @param m Incoming message
440  */
441 void handle_learn(site_def const *site, pax_machine *p, pax_msg *m);
442 /**
443  * @retval 1 if the value for the Paxos instance @c *p has been learned
444  * @retval 0 otherwise
445  */
446 int pm_finished(pax_machine *p);
447 /** @return true if we should process the incoming need_boot_op message passed
448  * in parameter p. */
449 bool_t should_handle_need_boot(site_def const *site, pax_msg *p);
450 /**
451  * Initializes the message @c p as a need_boot_op message.
452  *
453  * @param p The message to send
454  * @param identity The unique incarnation identifier of this XCom instance
455  */
456 void init_need_boot_op(pax_msg *p, node_address *identity);
457 
/**
 * Writes the textual description of error number @c nr into @c buf.
 *
 * On Windows the text comes from strerror_s(); elsewhere from strerror(),
 * copied with snprintf() so the result is truncated to fit and always
 * NUL-terminated. If @c buf is NULL or @c len is 0 nothing is written.
 *
 * NOTE(review): strerror() is not required to be thread-safe; the returned
 * string is copied immediately, but concurrent callers may still race inside
 * the C library.
 *
 * @param buf destination buffer
 * @param len size of @c buf in bytes
 * @param nr error number (an errno value)
 * @return @c buf, to allow use directly as a function argument
 */
static inline char *strerr_msg(char *buf, size_t len, int nr) {
  /* Nothing can be stored: avoid handing an invalid buffer to the C library. */
  if (buf == NULL || len == 0) return buf;
#if defined(_WIN32)
  /* Leave a well-defined (empty) string behind if the lookup fails. */
  if (strerror_s(buf, len, nr) != 0) buf[0] = '\0';
#else
  snprintf(buf, len, "%s", strerror(nr));
#endif
  return buf;
}
466 
467 #define XCOM_COMMS_ERROR 1
468 #define XCOM_COMMS_OTHER 2
469 #define XCOM_COMMS_OK 0
470 void set_xcom_comms_cb(xcom_state_change_cb x);
471 
472 extern "C" synode_no get_delivered_msg();
473 void set_max_synode_from_unified_boot(synode_no unified_boot_synode);
474 void send_x_fsm_complete();
475 synode_no get_default_start(app_data_ptr a);
476 synode_no get_last_delivered_msg();
477 void set_log_end(gcs_snapshot *gcs);
478 
/**
 * Feeds @c action with argument @c arg to xcom_fsm() and logs the outcome.
 *
 * Calls xcom_fsm(action, arg), then emits a G_TRACE line with the current
 * seconds() value and the call site (__FILE__, __LINE__), and a G_DEBUG line
 * with the string returned by xcom_fsm(). Use with a trailing semicolon.
 *
 * The temporary has a deliberately unusual name: a short name such as `s`
 * would capture any `s` that appears inside the caller's @c arg expression.
 */
#define XCOM_FSM(action, arg)                                  \
  do {                                                         \
    const char *xcom_fsm_new_state_ = xcom_fsm(action, arg);   \
    G_TRACE("%f %s:%d", seconds(), __FILE__, __LINE__);        \
    G_DEBUG("new state %s", xcom_fsm_new_state_);              \
  } while (0)
485 
486 int pm_finished(pax_machine *p);
487 
488 #endif
489