1 /* Copyright (c) 2012, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #ifndef XCOM_BASE_H
24 #define XCOM_BASE_H
25 
26 #ifdef __cplusplus
27 extern "C" {
28 #endif
29 
30 #include "xcom_os_layer.h"
31 #include "xdr_utils.h"
32 #include "task_debug.h"
33 #include "x_platform.h"
34 
35 
/* Feature switch; this header always defines it to 1. */
#define XCOM_THREAD_DEBUG 1

/*
  Unsigned integer type of at least 64 bits.
  NOTE(review): presumably used to count synodes - no use is visible in this
  header, confirm against the callers.
*/
typedef unsigned long long synode_cnt;

/* Forwards its argument unchanged to set_executed_msg(). */
#define SET_EXECUTED_MSG(x) set_executed_msg(x)
43 
44 /* {{{ Constants */
45 
46 
#ifdef XCOM_TRANSACTIONS
/*
  Only compiled when XCOM_TRANSACTIONS is defined.  The first two members of
  the trans_id are initialized to 0xffffffff.
  NOTE(review): presumably the largest possible transaction id - the layout of
  trans_id is not visible in this header, confirm.
*/
static trans_id const last_trans = {0xffffffff, 0xffffffff};
#endif
53 
54 /* }}} */
55 
56 extern int ARBITRATOR_HACK;
57 extern task_arg null_arg;
58 
59 gpointer
60 xcom_thread_main(gpointer cp);
61 
62 synode_no
63 vp_count_to_synode(u_long high, u_long low, node_no nodeid,
64                    uint32_t groupid);
65 
66 synode_no
67 incr_synode(synode_no synode);
68 
69 synode_no
70 decr_synode(synode_no synode);
71 
72 void broadcast_recover_end();
73 char *dbg_pax_msg(pax_msg const *p);
74 pax_msg *dispatch_op(site_def const * site, pax_msg *p, linkage *reply_queue);
75 synode_no set_executed_msg(synode_no msgno);
76 void request_values(synode_no find, synode_no end);
77 void set_group(uint32_t id);
78 void check_tasks();
79 int	xcom_booted();
80 int	iamthegreatest(site_def const *s);
81 void xcom_send(app_data_ptr a, pax_msg *msg);
82 void deliver_view_msg(site_def const *site);
83 int	reply_handler_task(task_arg arg);
84 int	acceptor_learner_task(task_arg arg);
85 synode_no get_max_synode();
86 void xcom_thread_deinit();
87 int	taskmain(xcom_port listen_port);
88 void xcom_thread_init();
89 site_def *install_node_group(app_data_ptr a);
90 int	xcom_taskmain(xcom_port listen_port);
91 int	xcom_taskmain2(xcom_port listen_port);
92 void set_max_synode(synode_no synode);
93 synode_no set_current_message(synode_no msgno);
94 
95 void xcom_send_data(uint32_t size, char *data);
96 
97 bool_t must_force_recover();
98 channel *get_prop_input_queue();
99 int	is_real_recover(app_data_ptr a);
100 
101 void init_xcom_base();
102 void set_force_recover(bool_t const x);
103 void add_to_cache(app_data_ptr a, synode_no synode);
104 uint32_t new_id();
105 synode_no get_boot_key();
106 site_def const * get_executor_site();
107 site_def const *get_proposer_site();
108 synode_no get_current_message();
109 void	start_run_tasks();
110 
111 
/*
  The three macros below expect a variable named `ep` to be in scope at the
  point of expansion.
*/

/* If ep->client_msg is set, pass its address to msg_link_delete(). */
#define RESET_CLIENT_MSG                 \
  if (ep->client_msg) {                  \
    msg_link_delete(&ep->client_msg);    \
  }

/* Shorthand for the `a` member reached through ep->client_msg->p. */
#define APP ep->client_msg->p->a

/* Shorthand for the `a` member reached through ep->p->learner.msg. */
#define XAPP ep->p->learner.msg->a
117 
/*
  EXPECT(x, msg): loop until a client message whose APP->body.c_t equals x has
  been obtained.

  Each iteration invokes CHANNEL_GET on prop_input_queue to fill
  ep->client_msg.  A message with a different c_t is handed to
  deliver_to_app() with delivery_failure and then released through
  RESET_CLIENT_MSG.  A matching message gets the text `msg " <x>"` (x
  stringified) written into p->v, has its start_type set to BOOT, is linked
  into ep->boot_list, and ends the loop.

  msg must be a string literal, since it is concatenated with " %s".
  Requires `ep` to be in scope at the point of expansion.
*/
#define EXPECT(x, msg)                                                  \
  for (;;) {                                                            \
    CHANNEL_GET(&prop_input_queue, &ep->client_msg, msg_link);          \
    MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_app_data(APP)););                \
    if (APP->body.c_t == (x)) {                                         \
      sprintf(ep->client_msg->p->v, msg " %s", #x);                     \
      ep->client_msg->p->start_type = BOOT;                             \
      link_into(&ep->client_msg->l, &ep->boot_list);                    \
      break;                                                            \
    }                                                                   \
    deliver_to_app(NULL, APP, delivery_failure);                        \
    RESET_CLIENT_MSG;                                                   \
  }
133 
134 
/* One tenth of CACHED (integer division); CACHED is defined elsewhere. */
#define FIND_MAX (CACHED / 10)
136 
/*
  X-macro lists.  Each entry is wrapped in X(...), so the same list can be
  expanded in different ways by redefining X before using the list.
*/
#define x_state_list \
  X(x_start)         \
  X(x_boot)          \
  X(x_recover)       \
  X(x_run)           \
  X(x_done)

#define x_actions      \
  X(xa_wait)           \
  X(xa_poll)           \
  X(xa_init)           \
  X(xa_u_boot)         \
  X(xa_add)            \
  X(xa_net_boot)       \
  X(xa_force_config)   \
  X(xa_snapshot)       \
  X(xa_need_snapshot)  \
  X(xa_complete)       \
  X(xa_terminate)      \
  X(xa_exit)

/* Expand every list entry to an enumerator followed by a comma. */
#define X(a) a,

/* States, numbered from 0 in list order (x_start == 0 ... x_done == 4). */
typedef enum xcom_state { x_state_list } xcom_state;

/* Actions, numbered from 0 in list order (xa_wait == 0 ... xa_exit == 11). */
typedef enum xcom_actions { x_actions } xcom_actions;

#undef X

/*
  Name tables, defined elsewhere.
  NOTE(review): presumably indexed by the enumerators above - XCOM_FSM indexes
  xcom_state_name with the value returned by xcom_fsm().
*/
extern const char *xcom_state_name[];

extern const char *xcom_actions_name[];
154 
155 struct add_args {
156 	char *addr;
157 	xcom_port port;
158 	node_list *nl;
159 };
160 typedef struct add_args add_args;
161 
162 void xcom_add_node(char *addr, xcom_port port, node_list *nl);
163 
164 xcom_state xcom_fsm(xcom_actions action, task_arg fsmargs);
165 void site_post_install_action(site_def *site);
166 
167 void site_install_action(site_def *site, cargo_type operation);
168 void send_client_add_node(char *srv, xcom_port port, node_list *nl);
169 void send_client_remove_node(char *srv, xcom_port port, node_list *nl);
170 
171 typedef void (*xcom_data_receiver)(synode_no message_id, node_set nodes, u_int size, char *data);
172 void set_xcom_data_receiver(xcom_data_receiver x);
173 
174 typedef void (*xcom_local_view_receiver)(synode_no message_id, node_set nodes);
175 void set_xcom_local_view_receiver(xcom_local_view_receiver x);
176 
177 typedef void (*xcom_global_view_receiver)(synode_no config_id, synode_no message_id, node_set nodes);
178 void set_xcom_global_view_receiver(xcom_global_view_receiver x);
179 
180 void set_xcom_logger(xcom_logger l);
181 
182 typedef void (*app_snap_handler)(blob *gcs_snap);
183 void set_app_snap_handler(app_snap_handler x);
184 
185 typedef synode_no (*app_snap_getter)(blob *gcs_snap);
186 void set_app_snap_getter(app_snap_getter x);
187 
188 typedef int (*port_matcher)(xcom_port if_port);
189 void set_port_matcher(port_matcher x);
190 port_matcher get_port_matcher();
191 
192 typedef void (*xcom_state_change_cb)(int status);
193 void set_xcom_run_cb(xcom_state_change_cb x);
194 void set_xcom_terminate_cb(xcom_state_change_cb x);
195 void set_xcom_exit_cb(xcom_state_change_cb x);
196 void set_xcom_expel_cb(xcom_state_change_cb x);
197 
198 app_data_ptr init_config_with_group(app_data *a, node_list *nl, cargo_type type,
199                                     uint32_t group_id);
200 
201 /*
202  Registers a callback that is called right after
203  the accept routine returns.
204  */
205 typedef int (*xcom_socket_accept_cb)(int fd, site_def const *xcom_config);
206 int set_xcom_socket_accept_cb(xcom_socket_accept_cb x);
207 
208 connection_descriptor *xcom_open_client_connection(char *server,
209                                                    xcom_port port);
210 int xcom_close_client_connection(connection_descriptor* connection);
211 
212 int	xcom_client_disable_arbitrator(connection_descriptor* fd);
213 int	xcom_client_enable_arbitrator(connection_descriptor* fd);
214 int xcom_client_add_node(connection_descriptor* fd, node_list *nl,
215                          uint32_t group_id);
216 int xcom_client_boot(connection_descriptor* fd, node_list *nl,
217                      uint32_t group_id);
218 int xcom_client_force_add_node(connection_descriptor* fd, node_list *nl,
219                                uint32_t group_id);
220 int xcom_client_force_config(connection_descriptor*  fd, node_list *nl,
221                              uint32_t group_id);
222 int xcom_client_force_remove_node(connection_descriptor*  fd, node_list *nl,
223                                   uint32_t group_id);
224 int xcom_client_remove_node(connection_descriptor* fd, node_list *nl,
225                             uint32_t group_id);
226 int64_t xcom_client_send_data(uint32_t size, char *data,
227                               connection_descriptor* fd);
228 int	xcom_client_terminate_and_exit(connection_descriptor* fd);
229 int	xcom_client_set_cache_limit(connection_descriptor *fd, uint64_t cache_limit);
230 
231 /**
232   Copies app data @c source into @c target and checks if the copy
233   succeeded. Sets *target to NULL if the copy fails.
234 
235   @param[in, out] target The pax_msg to which the app_data will be copied.
236   @param source The app data that will be copied.
237   @retval TRUE if the copy was successful.
238   @retval FALSE if the copy failed, in which case *target is set to NULL;
239           a failed copy means that there was an error allocating memory for
240           the copy.
241 */
242 bool_t safe_app_data_copy(pax_msg **target, app_data_ptr source);
243 
/**
  Writes the system error text for error number @c nr into @c buf.

  The text is truncated if it does not fit; whenever @c buf is non-NULL and
  @c len is non-zero, @c buf holds a NUL-terminated string on return (empty
  if no text could be produced).

  @param[out] buf Destination buffer; may be NULL, in which case nothing is
                  written.
  @param len      Size of @c buf in bytes; if 0, nothing is written.
  @param nr       Error number, e.g. a value of errno.
  @return @c buf, unchanged.
*/
static inline char *strerr_msg(char *buf, size_t len, int nr)
{
  /* Nothing can be stored: do not hand a NULL or empty buffer to libc. */
  if (buf == NULL || len == 0) {
    return buf;
  }
#if defined (WIN32) || defined (WIN64)
  if (strerror_s(buf, len, nr) != 0) {
    buf[0] = '\0'; /* Keep the result a valid (empty) string on failure. */
  }
#else
  if (snprintf(buf, len, "%s", strerror(nr)) < 0) {
    buf[0] = '\0'; /* Keep the result a valid (empty) string on failure. */
  }
#endif
  return buf;
}
253 
254 #define XCOM_COMMS_ERROR 1
255 #define XCOM_COMMS_OTHER 2
256 #define XCOM_COMMS_OK 0
257 void set_xcom_comms_cb(xcom_state_change_cb x);
258 
259 synode_no get_delivered_msg();
260 
/*
  XCOM_FSM(action, arg): calls xcom_fsm(action, arg) as a single statement.

  When WITH_LOG_DEBUG is defined, the returned state is also used as an index
  into xcom_state_name, and the call site (seconds(), __FILE__, __LINE__) and
  the resulting name are passed to G_TRACE and G_DEBUG.
*/
#ifdef WITH_LOG_DEBUG
#define XCOM_FSM(action, arg)                                   \
  do {                                                          \
    const char *s = xcom_state_name[xcom_fsm(action, arg)];     \
    G_TRACE("%f %s:%d", seconds(), __FILE__, __LINE__);         \
    G_DEBUG("new state %s", s);                                 \
  } while (0)
#else
#define XCOM_FSM(action, arg) \
  do {                        \
    xcom_fsm(action, arg);    \
  } while (0)
#endif
266 
267 #ifdef __cplusplus
268 }
269 #endif
270 
271 #endif
272 
273