1 /* Copyright 2008-2021 Codership Oy <http://www.codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 #ifndef WSREP_MYSQLD_H
17 #define WSREP_MYSQLD_H
18 
19 #include <wsrep.h>
20 
21 #ifdef WITH_WSREP
22 extern bool WSREP_ON_;
23 extern bool WSREP_PROVIDER_EXISTS_;
24 
25 #include <mysql/plugin.h>
26 #include "mysql/service_wsrep.h"
27 
28 #include <my_global.h>
29 #include <my_pthread.h>
30 #include "log.h"
31 #include "mysqld.h"
32 
33 typedef struct st_mysql_show_var SHOW_VAR;
34 #include <sql_priv.h>
35 #include "mdl.h"
36 #include "sql_table.h"
37 #include "wsrep_mysqld_c.h"
38 
39 #include "wsrep/provider.hpp"
40 #include "wsrep/streaming_context.hpp"
41 #include "wsrep_api.h"
42 #include <vector>
43 #include "wsrep_server_state.h"
44 
45 #define WSREP_UNDEFINED_TRX_ID ULONGLONG_MAX
46 
47 class set_var;
48 class THD;
49 
/* Values for the wsrep consistency check mode.
   The enumerators carry the values 0, 1 and 2, in declaration order. */
enum wsrep_consistency_check_mode
{
  NO_CONSISTENCY_CHECK= 0,
  CONSISTENCY_CHECK_DECLARED= 1,
  CONSISTENCY_CHECK_RUNNING= 2
};
55 
56 // Global wsrep parameters
57 
58 // MySQL wsrep options
59 extern const char* wsrep_provider;
60 extern const char* wsrep_provider_options;
61 extern const char* wsrep_cluster_name;
62 extern const char* wsrep_cluster_address;
63 extern const char* wsrep_node_name;
64 extern const char* wsrep_node_address;
65 extern const char* wsrep_node_incoming_address;
66 extern const char* wsrep_data_home_dir;
67 extern const char* wsrep_dbug_option;
68 extern long        wsrep_slave_threads;
69 extern int         wsrep_slave_count_change;
70 extern ulong       wsrep_debug;
71 extern my_bool     wsrep_convert_LOCK_to_trx;
72 extern ulong       wsrep_retry_autocommit;
73 extern my_bool     wsrep_auto_increment_control;
74 extern my_bool     wsrep_drupal_282555_workaround;
75 extern my_bool     wsrep_incremental_data_collection;
76 extern const char* wsrep_start_position;
77 extern ulong       wsrep_max_ws_size;
78 extern ulong       wsrep_max_ws_rows;
79 extern const char* wsrep_notify_cmd;
80 extern my_bool     wsrep_certify_nonPK;
81 extern long int    wsrep_protocol_version;
82 extern ulong       wsrep_forced_binlog_format;
83 extern my_bool     wsrep_desync;
84 extern ulong       wsrep_reject_queries;
85 extern my_bool     wsrep_recovery;
86 extern my_bool     wsrep_replicate_myisam;
87 extern my_bool     wsrep_log_conflicts;
88 extern ulong       wsrep_mysql_replication_bundle;
89 extern my_bool     wsrep_load_data_splitting;
90 extern my_bool     wsrep_restart_slave;
91 extern my_bool     wsrep_restart_slave_activated;
92 extern my_bool     wsrep_slave_FK_checks;
93 extern my_bool     wsrep_slave_UK_checks;
94 extern ulong       wsrep_trx_fragment_unit;
95 extern ulong       wsrep_SR_store_type;
96 extern uint        wsrep_ignore_apply_errors;
97 extern ulong       wsrep_running_threads;
98 extern ulong       wsrep_running_applier_threads;
99 extern ulong       wsrep_running_rollbacker_threads;
100 extern bool        wsrep_new_cluster;
101 extern bool        wsrep_gtid_mode;
102 extern uint32      wsrep_gtid_domain_id;
103 
/* Query rejection policies (presumably the values taken by
   wsrep_reject_queries -- confirm against the variable's definition). */
enum enum_wsrep_reject_types
{
  /* nothing rejected */
  WSREP_REJECT_NONE= 0,
  /* reject all queries, with UNKNOWN_COMMAND error */
  WSREP_REJECT_ALL= 1,
  /* kill existing connections and reject all queries */
  WSREP_REJECT_ALL_KILL= 2
};
109 
/* Selectable OSU methods.
   The enumerators carry the values 0, 1 and 2, in declaration order. */
enum enum_wsrep_OSU_method
{
  WSREP_OSU_TOI= 0,
  WSREP_OSU_RSU= 1,
  WSREP_OSU_NONE= 2
};
115 
/* Bit flags for sync wait masks (see the `mask` parameter of
   wsrep_must_sync_wait() and wsrep_sync_wait()). Each BEFORE_* flag
   occupies its own bit; WSREP_SYNC_WAIT_MAX has all four bits set. */
enum enum_wsrep_sync_wait
{
  WSREP_SYNC_WAIT_NONE= 0,
  /* select, begin */
  WSREP_SYNC_WAIT_BEFORE_READ= 1 << 0,
  WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE= 1 << 1,
  WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE= 1 << 2,
  WSREP_SYNC_WAIT_BEFORE_SHOW= 1 << 3,
  /* union of every flag above (0xF) */
  WSREP_SYNC_WAIT_MAX= WSREP_SYNC_WAIT_BEFORE_READ |
                       WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE |
                       WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE |
                       WSREP_SYNC_WAIT_BEFORE_SHOW
};
125 
/* Bit flags selecting which apply errors are ignored (presumably the bits
   of wsrep_ignore_apply_errors -- confirm). Each flag occupies its own bit;
   WSREP_IGNORE_ERRORS_MAX has all three bits set. */
enum enum_wsrep_ignore_apply_error
{
  WSREP_IGNORE_ERRORS_NONE= 0,
  WSREP_IGNORE_ERRORS_ON_RECONCILING_DDL= 1 << 0,
  WSREP_IGNORE_ERRORS_ON_RECONCILING_DML= 1 << 1,
  WSREP_IGNORE_ERRORS_ON_DDL= 1 << 2,
  /* union of every flag above (0x7) */
  WSREP_IGNORE_ERRORS_MAX= WSREP_IGNORE_ERRORS_ON_RECONCILING_DDL |
                           WSREP_IGNORE_ERRORS_ON_RECONCILING_DML |
                           WSREP_IGNORE_ERRORS_ON_DDL
};
133 
134 // Streaming Replication
135 #define WSREP_FRAG_BYTES      0
136 #define WSREP_FRAG_ROWS       1
137 #define WSREP_FRAG_STATEMENTS 2
138 
139 #define WSREP_SR_STORE_NONE   0
140 #define WSREP_SR_STORE_TABLE  1
141 
142 extern const char *wsrep_fragment_units[];
143 extern const char *wsrep_SR_store_types[];
144 
145 // MySQL status variables
146 extern my_bool     wsrep_connected;
147 extern my_bool     wsrep_ready;
148 extern const char* wsrep_cluster_state_uuid;
149 extern long long   wsrep_cluster_conf_id;
150 extern const char* wsrep_cluster_status;
151 extern long        wsrep_cluster_size;
152 extern long        wsrep_local_index;
153 extern long long   wsrep_local_bf_aborts;
154 extern const char* wsrep_provider_name;
155 extern const char* wsrep_provider_version;
156 extern const char* wsrep_provider_vendor;
157 extern char*       wsrep_provider_capabilities;
158 extern char*       wsrep_cluster_capabilities;
159 
160 int  wsrep_show_status(THD *thd, SHOW_VAR *var, void *buff,
161                        system_status_var *status_var, enum_var_type scope);
162 int  wsrep_show_ready(THD *thd, SHOW_VAR *var, char *buff);
163 void wsrep_free_status(THD *thd);
164 void wsrep_update_cluster_state_uuid(const char* str);
165 
/* Filters out the --wsrep-new-cluster option from argv[].
 * Should be called at the very beginning of main(). */
168 void wsrep_filter_new_cluster (int* argc, char* argv[]);
169 
170 int  wsrep_init();
171 void wsrep_deinit(bool free_options);
172 
173 /* Initialize wsrep thread LOCKs and CONDs */
174 void wsrep_thr_init();
175 /* Destroy wsrep thread LOCKs and CONDs */
176 void wsrep_thr_deinit();
177 
178 void wsrep_recover();
179 bool wsrep_before_SE(); // initialize wsrep before storage
180                         // engines (true) or after (false)
181 /* wsrep initialization sequence at startup
182  * @param before wsrep_before_SE() value */
183 void wsrep_init_startup(bool before);
184 
185 /* Recover streaming transactions from fragment storage */
186 void wsrep_recover_sr_from_storage(THD *);
187 
188 // Other wsrep global variables
189 extern my_bool     wsrep_inited; // whether wsrep is initialized ?
190 
191 extern "C" void wsrep_fire_rollbacker(THD *thd);
192 extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd);
193 extern "C" time_t wsrep_thd_query_start(THD *thd);
194 extern void wsrep_close_client_connections(my_bool wait_to_end,
195                                            THD *except_caller_thd= NULL);
196 extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
197 extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
198 
199 extern int  wsrep_wait_committing_connections_close(int wait_time);
200 extern void wsrep_close_applier(THD *thd);
201 extern void wsrep_wait_appliers_close(THD *thd);
202 extern void wsrep_close_applier_threads(int count);
203 
204 
205 /* new defines */
206 extern void wsrep_stop_replication(THD *thd);
207 extern bool wsrep_start_replication(const char *wsrep_cluster_address);
208 extern void wsrep_shutdown_replication();
209 extern bool wsrep_must_sync_wait (THD* thd, uint mask= WSREP_SYNC_WAIT_BEFORE_READ);
210 extern bool wsrep_sync_wait (THD* thd, uint mask= WSREP_SYNC_WAIT_BEFORE_READ);
211 extern enum wsrep::provider::status
212 wsrep_sync_wait_upto (THD* thd, wsrep_gtid_t* upto, int timeout);
213 extern int  wsrep_check_opts();
214 extern void wsrep_prepend_PATH (const char* path);
215 extern bool wsrep_append_fk_parent_table(THD* thd, TABLE_LIST* table, wsrep::key_array* keys);
216 extern bool wsrep_reload_ssl();
217 
218 /* Other global variables */
219 extern wsrep_seqno_t wsrep_locked_seqno;
220 #define WSREP_ON unlikely(WSREP_ON_)
221 
/* Predicates telling whether wsrep applies to a given THD.

   Each predicate comes in two flavours:
     - XXX(thd)        tolerates a NULL thd and evaluates to false for it;
     - XXX_NNULL(thd)  dereferences thd unconditionally. Use it when the thd
                       pointer is guaranteed to be non-null to avoid
                       compiler warnings (GCC 6 and later).

   The thd argument is parenthesized in every expansion so that any pointer
   expression (e.g. &obj or a conditional expression) can be passed safely.
   thd may be evaluated more than once, so it must not have side effects. */

/* WSREP_PROVIDER_EXISTS_ is set and thd->variables.wsrep_on is set. */
#define WSREP_NNULL(thd) \
  (WSREP_PROVIDER_EXISTS_ && (thd)->variables.wsrep_on)

#define WSREP(thd) \
  ((thd) && WSREP_NNULL(thd))

/* As above, and additionally thd->wsrep_client_thread is set. */
#define WSREP_CLIENT_NNULL(thd) \
  (WSREP_NNULL(thd) && (thd)->wsrep_client_thread)

#define WSREP_CLIENT(thd) \
    (WSREP(thd) && (thd)->wsrep_client_thread)

/* As WSREP/WSREP_NNULL, and additionally wsrep_emulate_bin_log is set. */
#define WSREP_EMULATE_BINLOG_NNULL(thd) \
  (WSREP_NNULL(thd) && wsrep_emulate_bin_log)

#define WSREP_EMULATE_BINLOG(thd) \
  (WSREP(thd) && wsrep_emulate_bin_log)

/* Yields wsrep_forced_binlog_format unless it equals BINLOG_FORMAT_UNSPEC,
   in which case my_format is used. */
#define WSREP_BINLOG_FORMAT(my_format)                         \
   ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ?     \
   wsrep_forced_binlog_format : (my_format))
246 
/* A wrapper function for MySQL log functions. The call will prefix
   the log message with WSREP and forward the result buffer to fun. */
void WSREP_LOG(void (*fun)(const char* fmt, ...), const char* fmt, ...);

/* Logging front-ends: each forwards its printf-style arguments to WSREP_LOG
   with the matching sql_print_* function.

   WSREP_DEBUG logs (at information level) only when wsrep_debug is non-zero.
   It is wrapped in do { } while (0) so that it behaves as one statement;
   with a bare `if` expansion, a following `else` at the call site would
   silently attach to the macro's hidden `if`. */
#define WSREP_DEBUG(...)                                                \
    do {                                                                \
      if (wsrep_debug) WSREP_LOG(sql_print_information, ##__VA_ARGS__); \
    } while (0)
#define WSREP_INFO(...)  WSREP_LOG(sql_print_information, ##__VA_ARGS__)
#define WSREP_WARN(...)  WSREP_LOG(sql_print_warning,     ##__VA_ARGS__)
#define WSREP_ERROR(...) WSREP_LOG(sql_print_error,       ##__VA_ARGS__)
/* Error-level message whose format string is prefixed with "UNKNOWN: ";
   fmt must be a string literal (it is concatenated at compile time). */
#define WSREP_UNKNOWN(fmt, ...) WSREP_ERROR("UNKNOWN: " fmt, ##__VA_ARGS__)
257 
/* Logs one information-level record describing thd: the role label, then
   thread id, client mode, client state, transaction state (printed under
   the label "conflict"), transaction seqno and the query text.

   NOTE(review): the expansion already ends with ';'. An invocation written
   with its own trailing ';' therefore yields an extra empty statement, so
   the macro must not be the sole unbraced body of an `if` that has an
   `else`. */
#define WSREP_LOG_CONFLICT_THD(thd, role)                               \
  WSREP_INFO("%s: \n "                                                  \
             "  THD: %lu, mode: %s, state: %s, conflict: %s, seqno: %lld\n " \
             "  SQL: %s",                                               \
             role,                                                      \
             thd_get_thread_id(thd),                                    \
             wsrep_thd_client_mode_str(thd),                            \
             wsrep_thd_client_state_str(thd),                           \
             wsrep_thd_transaction_state_str(thd),                      \
             wsrep_thd_trx_seqno(thd),                                  \
             wsrep_thd_query(thd)                                       \
            );

/* When wsrep_debug or wsrep_log_conflicts is set, logs a cluster conflict:
   first the cause ("high priority abort" if bf_abort is true, otherwise
   "certification failure"), then the details of bf_thd ("Winning thread")
   and victim_thd ("Victim thread"), each skipped when the pointer is null,
   and finally the file and line of the invocation site.

   NOTE(review): this expands to a bare if-block rather than a
   do { } while (0) statement; do not place it directly before an `else`. */
#define WSREP_LOG_CONFLICT(bf_thd, victim_thd, bf_abort)                \
  if (wsrep_debug || wsrep_log_conflicts)                               \
  {                                                                     \
    WSREP_INFO("cluster conflict due to %s for threads:",               \
               (bf_abort) ? "high priority abort" : "certification failure" \
              );                                                        \
    if (bf_thd)     WSREP_LOG_CONFLICT_THD(bf_thd, "Winning thread");   \
    if (victim_thd) WSREP_LOG_CONFLICT_THD(victim_thd, "Victim thread"); \
    WSREP_INFO("context: %s:%d", __FILE__, __LINE__); \
  }
281 
282 #define WSREP_PROVIDER_EXISTS (WSREP_PROVIDER_EXISTS_)
283 
wsrep_cluster_address_exists()284 static inline bool wsrep_cluster_address_exists()
285 {
286   if (mysqld_server_started)
287     mysql_mutex_assert_owner(&LOCK_global_system_variables);
288   return wsrep_cluster_address && wsrep_cluster_address[0];
289 }
290 
291 extern my_bool wsrep_ready_get();
292 extern void wsrep_ready_wait();
293 
294 class Ha_trx_info;
295 struct THD_TRANS;
296 
297 extern mysql_mutex_t LOCK_wsrep_ready;
298 extern mysql_cond_t  COND_wsrep_ready;
299 extern mysql_mutex_t LOCK_wsrep_sst;
300 extern mysql_cond_t  COND_wsrep_sst;
301 extern mysql_mutex_t LOCK_wsrep_sst_init;
302 extern mysql_cond_t  COND_wsrep_sst_init;
303 extern int wsrep_replaying;
304 extern mysql_mutex_t LOCK_wsrep_replaying;
305 extern mysql_cond_t  COND_wsrep_replaying;
306 extern mysql_mutex_t LOCK_wsrep_slave_threads;
307 extern mysql_cond_t  COND_wsrep_slave_threads;
308 extern mysql_mutex_t LOCK_wsrep_cluster_config;
309 extern mysql_mutex_t LOCK_wsrep_desync;
310 extern mysql_mutex_t LOCK_wsrep_SR_pool;
311 extern mysql_mutex_t LOCK_wsrep_SR_store;
312 extern mysql_mutex_t LOCK_wsrep_config_state;
313 extern mysql_mutex_t LOCK_wsrep_group_commit;
314 extern mysql_mutex_t LOCK_wsrep_joiner_monitor;
315 extern mysql_mutex_t LOCK_wsrep_donor_monitor;
316 extern mysql_cond_t  COND_wsrep_joiner_monitor;
317 extern mysql_cond_t  COND_wsrep_donor_monitor;
318 
319 extern my_bool       wsrep_emulate_bin_log;
320 extern int           wsrep_to_isolation;
321 #ifdef GTID_SUPPORT
322 extern rpl_sidno     wsrep_sidno;
323 #endif /* GTID_SUPPORT */
324 extern my_bool       wsrep_preordered_opt;
325 
#ifdef HAVE_PSI_INTERFACE

/* Instrumentation keys for wsrep objects, declared only when the PSI
   interface is compiled in. Mutex keys are named key_LOCK_*, condition
   variable keys key_COND_*; the key type matches the kind of object. */

extern PSI_cond_key  key_COND_wsrep_thd;

extern PSI_mutex_key key_LOCK_wsrep_ready;
/* A condition key: declared as PSI_cond_key like every other key_COND_*. */
extern PSI_cond_key  key_COND_wsrep_ready;
extern PSI_mutex_key key_LOCK_wsrep_sst;
extern PSI_cond_key  key_COND_wsrep_sst;
extern PSI_mutex_key key_LOCK_wsrep_sst_init;
extern PSI_cond_key  key_COND_wsrep_sst_init;
extern PSI_mutex_key key_LOCK_wsrep_sst_thread;
extern PSI_cond_key  key_COND_wsrep_sst_thread;
extern PSI_mutex_key key_LOCK_wsrep_replaying;
extern PSI_cond_key  key_COND_wsrep_replaying;
extern PSI_mutex_key key_LOCK_wsrep_slave_threads;
extern PSI_cond_key  key_COND_wsrep_slave_threads;
extern PSI_mutex_key key_LOCK_wsrep_cluster_config;
extern PSI_mutex_key key_LOCK_wsrep_desync;
extern PSI_mutex_key key_LOCK_wsrep_SR_pool;
extern PSI_mutex_key key_LOCK_wsrep_SR_store;
extern PSI_mutex_key key_LOCK_wsrep_global_seqno;
extern PSI_mutex_key key_LOCK_wsrep_thd_queue;
extern PSI_cond_key  key_COND_wsrep_thd_queue;
extern PSI_mutex_key key_LOCK_wsrep_joiner_monitor;
extern PSI_mutex_key key_LOCK_wsrep_donor_monitor;

/* File instrumentation key. */
extern PSI_file_key key_file_wsrep_gra_log;

/* Thread instrumentation keys. */
extern PSI_thread_key key_wsrep_sst_joiner;
extern PSI_thread_key key_wsrep_sst_donor;
extern PSI_thread_key key_wsrep_rollbacker;
extern PSI_thread_key key_wsrep_applier;
extern PSI_thread_key key_wsrep_sst_joiner_monitor;
extern PSI_thread_key key_wsrep_sst_donor_monitor;
#endif /* HAVE_PSI_INTERFACE */
361 
362 
363 struct TABLE_LIST;
364 class Alter_info;
365 int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
366                              const TABLE_LIST* table_list,
367                              Alter_info* alter_info= NULL, wsrep::key_array *fk_tables=NULL);
368 
369 void wsrep_to_isolation_end(THD *thd);
370 
371 bool wsrep_append_SR_keys(THD *thd);
372 int wsrep_to_buf_helper(
373   THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len);
374 int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
375 int wsrep_create_event_query(THD *thd, uchar** buf, size_t* buf_len);
376 
377 void wsrep_init_sidno(const wsrep_uuid_t&);
378 bool wsrep_node_is_donor();
379 bool wsrep_node_is_synced();
380 
381 void wsrep_init_SR();
382 void wsrep_verify_SE_checkpoint(const wsrep_uuid_t& uuid, wsrep_seqno_t seqno);
383 int wsrep_replay_from_SR_store(THD*, const wsrep_trx_meta_t&);
384 
385 class Log_event;
386 int wsrep_ignored_error_code(Log_event* ev, int error);
387 int wsrep_must_ignore_error(THD* thd);
388 
389 bool wsrep_replicate_GTID(THD* thd);
390 
391 typedef struct wsrep_key_arr
392 {
393     wsrep_key_t* keys;
394     size_t       keys_len;
395 } wsrep_key_arr_t;
396 bool wsrep_prepare_keys_for_isolation(THD*              thd,
397                                       const char*       db,
398                                       const char*       table,
399                                       const TABLE_LIST* table_list,
400                                       wsrep_key_arr_t*  ka);
401 void wsrep_keys_free(wsrep_key_arr_t* key_arr);
402 
403 extern void
404 wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
405                           MDL_ticket *ticket,
406                           const MDL_key *key);
407 
/* Kind of wsrep thread, as stored in Wsrep_thd_args.
   Numbering starts at 1; no enumerator has the value 0. */
enum wsrep_thread_type
{
  WSREP_APPLIER_THREAD= 1,
  WSREP_ROLLBACKER_THREAD= 2
};
412 
413 typedef void (*wsrep_thd_processor_fun)(THD*, void *);
414 class Wsrep_thd_args
415 {
416  public:
Wsrep_thd_args(wsrep_thd_processor_fun fun,wsrep_thread_type thread_type,pthread_t thread_id)417  Wsrep_thd_args(wsrep_thd_processor_fun fun,
418                 wsrep_thread_type thread_type,
419                 pthread_t thread_id)
420    :
421   fun_ (fun),
422   thread_type_ (thread_type),
423   thread_id_ (thread_id)
424   { }
425 
fun()426   wsrep_thd_processor_fun fun() { return fun_; }
thread_id()427   pthread_t* thread_id() {return &thread_id_; }
thread_type()428   enum wsrep_thread_type thread_type() {return thread_type_;}
429 
430  private:
431 
432   Wsrep_thd_args(const Wsrep_thd_args&);
433   Wsrep_thd_args& operator=(const Wsrep_thd_args&);
434 
435   wsrep_thd_processor_fun fun_;
436   enum wsrep_thread_type  thread_type_;
437   pthread_t thread_id_;
438 };
439 
440 void* start_wsrep_THD(void*);
441 
442 void wsrep_close_threads(THD *thd);
443 bool wsrep_is_show_query(enum enum_sql_command command);
444 void wsrep_replay_transaction(THD *thd);
445 bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
446                              TABLE_LIST* src_table,
447                              HA_CREATE_INFO *create_info);
448 bool wsrep_node_is_donor();
449 bool wsrep_node_is_synced();
450 
451 /**
452  * Check if the wsrep provider (ie the Galera library) is capable of
453  * doing streaming replication.
454  * @return true if SR capable
455  */
456 bool wsrep_provider_is_SR_capable();
457 
458 /**
459  * Initialize WSREP server instance.
460  *
461  * @return Zero on success, non-zero on error.
462  */
463 int wsrep_init_server();
464 
465 /**
466  * Initialize WSREP globals. This should be done after server initialization
467  * is complete and the server has joined to the cluster.
468  *
469  */
470 void wsrep_init_globals();
471 
472 /**
473  * Deinit and release WSREP resources.
474  */
475 void wsrep_deinit_server();
476 
477 /**
478  * Convert streaming fragment unit (WSREP_FRAG_BYTES, WSREP_FRAG_ROWS...)
479  * to corresponding wsrep-lib fragment_unit
480  */
481 enum wsrep::streaming_context::fragment_unit wsrep_fragment_unit(ulong unit);
482 
483 wsrep::key wsrep_prepare_key_for_toi(const char* db, const char* table,
484                                      enum wsrep::key::type type);
485 
486 #else /* !WITH_WSREP */
487 
/* These macros are needed to compile MariaDB without WSREP support
 * (e.g. embedded). The predicates collapse to constant false/0 and the
 * initialization hooks expand to no-ops. */

#define WSREP_ON false
#define WSREP(T)  (0)
#define WSREP_NNULL(T) (0)
#define WSREP_EMULATE_BINLOG(thd) (0)
#define WSREP_EMULATE_BINLOG_NNULL(thd) (0)
/* The argument is parenthesized so that the cast applies to the whole
   expression, not just to its first operand. */
#define WSREP_BINLOG_FORMAT(my_format) ((ulong)(my_format))
#define WSREP_PROVIDER_EXISTS (0)
#define wsrep_emulate_bin_log (0)
#define wsrep_to_isolation (0)
#define wsrep_before_SE() (0)
#define wsrep_init_startup(X)
#define wsrep_check_opts() (0)
#define wsrep_thr_init() do {} while(0)
#define wsrep_thr_deinit() do {} while(0)
#define wsrep_init_globals() do {} while(0)
#define wsrep_create_appliers(X) do {} while(0)
#define wsrep_cluster_address_exists() (false)
508 
509 #endif /* WITH_WSREP */
510 
511 #endif /* WSREP_MYSQLD_H */
512