1 /* Copyright (c) 2012, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #ifndef XCOM_BASE_H
24 #define XCOM_BASE_H
25
26 #ifdef __cplusplus
27 extern "C" {
28 #endif
29
30 #include "xcom_os_layer.h"
31 #include "xdr_utils.h"
32 #include "task_debug.h"
33 #include "x_platform.h"
34
35
36 #define XCOM_THREAD_DEBUG 1
37
38 typedef unsigned long long synode_cnt;
39
40
41
42 #define SET_EXECUTED_MSG(x) set_executed_msg(x)
43
44 /* {{{ Constants */
45
46
47 #ifdef XCOM_TRANSACTIONS
48 static trans_id const last_trans = {
49 0xffffffff, 0xffffffff};
50
51
52 #endif
53
54 /* }}} */
55
56 extern int ARBITRATOR_HACK;
57 extern task_arg null_arg;
58
59 gpointer
60 xcom_thread_main(gpointer cp);
61
62 synode_no
63 vp_count_to_synode(u_long high, u_long low, node_no nodeid,
64 uint32_t groupid);
65
66 synode_no
67 incr_synode(synode_no synode);
68
69 synode_no
70 decr_synode(synode_no synode);
71
72 void broadcast_recover_end();
73 char *dbg_pax_msg(pax_msg const *p);
74 pax_msg *dispatch_op(site_def const * site, pax_msg *p, linkage *reply_queue);
75 synode_no set_executed_msg(synode_no msgno);
76 void request_values(synode_no find, synode_no end);
77 void set_group(uint32_t id);
78 void check_tasks();
79 int xcom_booted();
80 int iamthegreatest(site_def const *s);
81 void xcom_send(app_data_ptr a, pax_msg *msg);
82 void deliver_view_msg(site_def const *site);
83 int reply_handler_task(task_arg arg);
84 int acceptor_learner_task(task_arg arg);
85 synode_no get_max_synode();
86 void xcom_thread_deinit();
87 int taskmain(xcom_port listen_port);
88 void xcom_thread_init();
89 site_def *install_node_group(app_data_ptr a);
90 int xcom_taskmain(xcom_port listen_port);
91 int xcom_taskmain2(xcom_port listen_port);
92 void set_max_synode(synode_no synode);
93 synode_no set_current_message(synode_no msgno);
94
95 void xcom_send_data(uint32_t size, char *data);
96
97 bool_t must_force_recover();
98 channel *get_prop_input_queue();
99 int is_real_recover(app_data_ptr a);
100
101 void init_xcom_base();
102 void set_force_recover(bool_t const x);
103 void add_to_cache(app_data_ptr a, synode_no synode);
104 uint32_t new_id();
105 synode_no get_boot_key();
106 site_def const * get_executor_site();
107 site_def const *get_proposer_site();
108 synode_no get_current_message();
109 void start_run_tasks();
110
111
112 #define RESET_CLIENT_MSG if(ep->client_msg){msg_link_delete(&ep->client_msg);}
113
114 #define APP ep->client_msg->p->a
115
116 #define XAPP ep->p->learner.msg->a
117
118 #define EXPECT(x,msg) \
119 while(1){ \
120 CHANNEL_GET(&prop_input_queue, &ep->client_msg, msg_link); \
121 MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_app_data(APP));); \
122 if(APP->body.c_t != (x)){ \
123 deliver_to_app(NULL, APP, delivery_failure); \
124 RESET_CLIENT_MSG; \
125 }else{ \
126 sprintf(ep->client_msg->p->v,msg" %s",#x); \
127 /*link_into(&(msg_link_new(ep->client_msg->p)->l), &ep->boot_list);*/ \
128 ep->client_msg->p->start_type = BOOT; \
129 link_into(&ep->client_msg->l, &ep->boot_list); \
130 break; \
131 } \
132 }
133
134
135 #define FIND_MAX (CACHED/10)
136
137 #define x_state_list X(x_start) X(x_boot) X(x_recover) X(x_run) X(x_done)
138 #define x_actions X(xa_wait) X(xa_poll) X(xa_init) X(xa_u_boot) X(xa_add) X(xa_net_boot) X(xa_force_config) X(xa_snapshot) X(xa_need_snapshot) X(xa_complete) X(xa_terminate) X(xa_exit)
139 #define X(a) a,
140 enum xcom_state {
141 x_state_list
142 };
143 typedef enum xcom_state xcom_state;
144
145 enum xcom_actions {
146 x_actions
147 };
148 typedef enum xcom_actions xcom_actions;
149 #undef X
150
151 extern const char *xcom_state_name[];
152
153 extern const char *xcom_actions_name[];
154
155 struct add_args {
156 char *addr;
157 xcom_port port;
158 node_list *nl;
159 };
160 typedef struct add_args add_args;
161
162 void xcom_add_node(char *addr, xcom_port port, node_list *nl);
163
164 xcom_state xcom_fsm(xcom_actions action, task_arg fsmargs);
165 void site_post_install_action(site_def *site);
166
167 void site_install_action(site_def *site, cargo_type operation);
168 void send_client_add_node(char *srv, xcom_port port, node_list *nl);
169 void send_client_remove_node(char *srv, xcom_port port, node_list *nl);
170
171 typedef void (*xcom_data_receiver)(synode_no message_id, node_set nodes, u_int size, char *data);
172 void set_xcom_data_receiver(xcom_data_receiver x);
173
174 typedef void (*xcom_local_view_receiver)(synode_no message_id, node_set nodes);
175 void set_xcom_local_view_receiver(xcom_local_view_receiver x);
176
177 typedef void (*xcom_global_view_receiver)(synode_no config_id, synode_no message_id, node_set nodes);
178 void set_xcom_global_view_receiver(xcom_global_view_receiver x);
179
180 void set_xcom_logger(xcom_logger l);
181
182 typedef void (*app_snap_handler)(blob *gcs_snap);
183 void set_app_snap_handler(app_snap_handler x);
184
185 typedef synode_no (*app_snap_getter)(blob *gcs_snap);
186 void set_app_snap_getter(app_snap_getter x);
187
188 typedef int (*port_matcher)(xcom_port if_port);
189 void set_port_matcher(port_matcher x);
190 port_matcher get_port_matcher();
191
192 typedef void (*xcom_state_change_cb)(int status);
193 void set_xcom_run_cb(xcom_state_change_cb x);
194 void set_xcom_terminate_cb(xcom_state_change_cb x);
195 void set_xcom_exit_cb(xcom_state_change_cb x);
196 void set_xcom_expel_cb(xcom_state_change_cb x);
197
198 app_data_ptr init_config_with_group(app_data *a, node_list *nl, cargo_type type,
199 uint32_t group_id);
200
201 /*
202 Registers a callback that is called right after
203 the accept routine returns.
204 */
205 typedef int (*xcom_socket_accept_cb)(int fd, site_def const *xcom_config);
206 int set_xcom_socket_accept_cb(xcom_socket_accept_cb x);
207
208 connection_descriptor *xcom_open_client_connection(char *server,
209 xcom_port port);
210 int xcom_close_client_connection(connection_descriptor* connection);
211
212 int xcom_client_disable_arbitrator(connection_descriptor* fd);
213 int xcom_client_enable_arbitrator(connection_descriptor* fd);
214 int xcom_client_add_node(connection_descriptor* fd, node_list *nl,
215 uint32_t group_id);
216 int xcom_client_boot(connection_descriptor* fd, node_list *nl,
217 uint32_t group_id);
218 int xcom_client_force_add_node(connection_descriptor* fd, node_list *nl,
219 uint32_t group_id);
220 int xcom_client_force_config(connection_descriptor* fd, node_list *nl,
221 uint32_t group_id);
222 int xcom_client_force_remove_node(connection_descriptor* fd, node_list *nl,
223 uint32_t group_id);
224 int xcom_client_remove_node(connection_descriptor* fd, node_list *nl,
225 uint32_t group_id);
226 int64_t xcom_client_send_data(uint32_t size, char *data,
227 connection_descriptor* fd);
228 int xcom_client_terminate_and_exit(connection_descriptor* fd);
229 int xcom_client_set_cache_limit(connection_descriptor *fd, uint64_t cache_limit);
230
231 /**
232 Copies app data @c source into @c target and checks if the copy
233 succeeded. Sets *target to NULL if the copy fails.
234
235 @param[in, out] target The pax_msg to which the app_data will be copied.
236 @param source The app data that will be copied.
237 @retval TRUE if the copy was successful.
238 @retval FALSE if the copy failed, in which case *target is set to NULL;
239 a failed copy means that there was an error allocating memory for
240 the copy.
241 */
242 bool_t safe_app_data_copy(pax_msg **target, app_data_ptr source);
243
strerr_msg(char * buf,size_t len,int nr)244 static inline char *strerr_msg(char *buf, size_t len, int nr)
245 {
246 #if defined (WIN32) || defined (WIN64)
247 strerror_s(buf, len, nr);
248 #else
249 snprintf(buf, len, "%s", strerror(nr));
250 #endif
251 return buf;
252 }
253
254 #define XCOM_COMMS_ERROR 1
255 #define XCOM_COMMS_OTHER 2
256 #define XCOM_COMMS_OK 0
257 void set_xcom_comms_cb(xcom_state_change_cb x);
258
259 synode_no get_delivered_msg();
260
261 #ifdef WITH_LOG_DEBUG
262 #define XCOM_FSM(action, arg) do {const char *s = xcom_state_name[xcom_fsm(action,arg)]; G_TRACE("%f %s:%d", seconds(), __FILE__, __LINE__); G_DEBUG("new state %s",s);} while(0)
263 #else
264 #define XCOM_FSM(action, arg) do {xcom_fsm(action,arg);} while(0)
265 #endif
266
267 #ifdef __cplusplus
268 }
269 #endif
270
271 #endif
272
273