1 /* Copyright (c) 2006, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql/rpl_rli.h"
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/stat.h>
29 #include <algorithm>
30 #include <regex>
31
32 #include "libbinlogevents/include/binlog_event.h"
33 #include "m_ctype.h"
34 #include "mutex_lock.h" // Mutex_lock
35 #include "my_bitmap.h"
36 #include "my_dbug.h"
37 #include "my_dir.h" // MY_STAT
38 #include "my_sqlcommand.h"
39 #include "my_systime.h"
40 #include "my_thread.h"
41 #include "mysql/components/services/log_builtins.h"
42 #include "mysql/components/services/psi_stage_bits.h"
43 #include "mysql/plugin.h"
44 #include "mysql/psi/mysql_cond.h"
45 #include "mysql/psi/mysql_file.h"
46 #include "mysql/psi/psi_base.h"
47 #include "mysql/service_mysql_alloc.h"
48 #include "mysql/service_thd_wait.h"
49 #include "mysql_com.h"
50 #include "mysqld_error.h"
51 #include "sql/auth/auth_acls.h" // SUPER_ACL
52 #include "sql/auth/roles.h" // Roles::Role_activation
53 #include "sql/auth/sql_auth_cache.h"
54 #include "sql/debug_sync.h"
55 #include "sql/derror.h"
56 #include "sql/log_event.h" // Log_event
57 #include "sql/mdl.h"
58 #include "sql/mysqld.h" // sync_relaylog_period ...
59 #include "sql/protocol.h"
60 #include "sql/rpl_info_factory.h" // Rpl_info_factory
61 #include "sql/rpl_info_handler.h"
62 #include "sql/rpl_mi.h" // Master_info
63 #include "sql/rpl_msr.h" // channel_map
64 #include "sql/rpl_reporting.h"
65 #include "sql/rpl_rli_pdb.h" // Slave_worker
66 #include "sql/rpl_slave.h"
67 #include "sql/rpl_trx_boundary_parser.h"
68 #include "sql/sql_base.h" // close_thread_tables
69 #include "sql/sql_error.h"
70 #include "sql/sql_lex.h"
71 #include "sql/sql_list.h"
72 #include "sql/sql_plugin.h"
73 #include "sql/strfunc.h" // strconvert
74 #include "sql/transaction.h" // trans_commit_stmt
75 #include "sql/transaction_info.h"
76 #include "sql/xa.h"
77 #include "sql_string.h"
78 #include "thr_mutex.h"
79
80 class Item;
81
82 using std::max;
83 using std::min;
84
/*
  Names of the relay log info fields.

  Every time a new field is added to the relay log info, this list must be
  updated as well. For now, the array is only used to get the number of
  fields.
*/
const char *info_rli_fields[] = {"number_of_lines",
                                 "group_relay_log_name",
                                 "group_relay_log_pos",
                                 "group_master_log_name",
                                 "group_master_log_pos",
                                 "sql_delay",
                                 "number_of_workers",
                                 "id",
                                 "channel_name",
                                 "privilege_checks_user",
                                 "privilege_checks_hostname",
                                 "require_row_format",
                                 "require_table_primary_key_check"};
103
/**
  Constructs the relay log info object.

  The PSI keys (when HAVE_PSI_INTERFACE is defined), @c param_id and
  @c param_channel are forwarded unchanged to the Rpl_info base class.
  Unless the object is a fake one (@c is_rli_fake is true), the constructor
  also initializes the mutexes and condition variables listed in the body,
  the relay log's pthread objects, and allocates the Checkable_rwlock,
  Sid_map and Gtid_set objects reachable through @c gtid_set.

  @param is_slave_recovery  initial value of is_relay_log_recovery
  @param param_id           identifier handed to Rpl_info
  @param param_channel      channel name handed to Rpl_info; also used to
                            check for a group replication applier channel
  @param is_rli_fake        when true, no synchronization objects and no
                            GTID set are created by this constructor
*/
Relay_log_info::Relay_log_info(bool is_slave_recovery,
#ifdef HAVE_PSI_INTERFACE
                               PSI_mutex_key *param_key_info_run_lock,
                               PSI_mutex_key *param_key_info_data_lock,
                               PSI_mutex_key *param_key_info_sleep_lock,
                               PSI_mutex_key *param_key_info_thd_lock,
                               PSI_mutex_key *param_key_info_data_cond,
                               PSI_mutex_key *param_key_info_start_cond,
                               PSI_mutex_key *param_key_info_stop_cond,
                               PSI_mutex_key *param_key_info_sleep_cond,
#endif
                               uint param_id, const char *param_channel,
                               bool is_rli_fake)
    : Rpl_info("SQL",
#ifdef HAVE_PSI_INTERFACE
               param_key_info_run_lock, param_key_info_data_lock,
               param_key_info_sleep_lock, param_key_info_thd_lock,
               param_key_info_data_cond, param_key_info_start_cond,
               param_key_info_stop_cond, param_key_info_sleep_cond,
#endif
               param_id, param_channel),
      replicate_same_server_id(::replicate_same_server_id),
      relay_log(&sync_relaylog_period, true),
      is_relay_log_recovery(is_slave_recovery),
      save_temporary_tables(nullptr),
      mi(nullptr),
      error_on_rli_init_info(false),
      gtid_timestamps_warning_logged(false),
      transaction_parser(
          Transaction_boundary_parser::TRX_BOUNDARY_PARSER_APPLIER),
      group_relay_log_pos(0),
      event_relay_log_number(0),
      event_relay_log_pos(0),
      event_start_pos(0),
      group_master_log_pos(0),
      gtid_set(nullptr),
      rli_fake(is_rli_fake),
      gtid_retrieved_initialized(false),
      m_privilege_checks_username{""},
      m_privilege_checks_hostname{""},
      m_privilege_checks_user_corrupted{false},
      m_require_row_format(false),
      m_require_table_primary_key_check(PK_CHECK_STREAM),
      is_group_master_log_pos_invalid(false),
      log_space_total(0),
      ignore_log_space_limit(false),
      sql_force_rotate_relay(false),
      last_master_timestamp(0),
      slave_skip_counter(0),
      abort_pos_wait(0),
      until_condition(UNTIL_NONE),
      trans_retries(0),
      retried_trans(0),
      tables_to_lock(nullptr),
      tables_to_lock_count(0),
      rows_query_ev(nullptr),
      last_event_start_time(0),
      deferred_events(nullptr),
      workers(PSI_NOT_INSTRUMENTED),
      workers_array_initialized(false),
      curr_group_assigned_parts(PSI_NOT_INSTRUMENTED),
      curr_group_da(PSI_NOT_INSTRUMENTED),
      curr_group_seen_begin(false),
      mts_end_group_sets_max_dbs(false),
      slave_parallel_workers(0),
      exit_counter(0),
      max_updated_index(0),
      recovery_parallel_workers(0),
      rli_checkpoint_seqno(0),
      checkpoint_group(opt_mts_checkpoint_group),
      recovery_groups_inited(false),
      mts_recovery_group_cnt(0),
      mts_recovery_index(0),
      mts_recovery_group_seen_begin(false),
      mts_group_status(MTS_NOT_IN_GROUP),
      stats_exec_time(0),
      stats_read_time(0),
      least_occupied_workers(PSI_NOT_INSTRUMENTED),
      current_mts_submode(nullptr),
      reported_unsafe_warning(false),
      rli_description_event(nullptr),
      commit_order_mngr(nullptr),
      sql_delay(0),
      sql_delay_end(0),
      m_flags(0),
      row_stmt_start_timestamp(0),
      long_find_row_note_printed(false),
      thd_tx_priority(0),
      is_engine_ha_data_detached(false),
      current_event(nullptr),
      ddl_not_atomic(false) {
  DBUG_TRACE;

#ifdef HAVE_PSI_INTERFACE
  /* Hand the relay-log specific instrumentation keys to the relay log. */
  relay_log.set_psi_keys(key_RELAYLOG_LOCK_index, key_RELAYLOG_LOCK_commit,
                         key_RELAYLOG_LOCK_commit_queue, key_RELAYLOG_LOCK_done,
                         key_RELAYLOG_LOCK_flush_queue, key_RELAYLOG_LOCK_log,
                         key_RELAYLOG_LOCK_log_end_pos, key_RELAYLOG_LOCK_sync,
                         key_RELAYLOG_LOCK_sync_queue, key_RELAYLOG_LOCK_xids,
                         key_RELAYLOG_COND_done, key_RELAYLOG_update_cond,
                         key_RELAYLOG_prep_xids_cond, key_file_relaylog,
                         key_file_relaylog_index, key_file_relaylog_cache,
                         key_file_relaylog_index_cache);
#endif

  /* Start with empty log names. */
  group_relay_log_name[0] = event_relay_log_name[0] = group_master_log_name[0] =
      0;
  ign_master_log_name_end[0] = 0;
  set_timespec_nsec(&last_clock, 0);
  cached_charset_invalidate();
  inited_hash_workers = false;
  commit_timestamps_status = COMMIT_TS_UNKNOWN;

  /*
    Synchronization objects and the GTID set are only created for a
    non-fake object.
  */
  if (!rli_fake) {
    mysql_mutex_init(key_relay_log_info_log_space_lock, &log_space_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond);
    mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond);
    mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_temp_table_LOCK, &mts_temp_table_LOCK,
                     MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_gaq_LOCK, &mts_gaq_LOCK, MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_mts_gaq, &logical_clock_cond);

    relay_log.init_pthread_objects();
    force_flush_postponed_due_to_split_trans = false;

    /*
      The lock, the SID map built on it and the GTID set built on both are
      allocated here; only gtid_set is stored as a member.
    */
    Checkable_rwlock *sid_lock = new Checkable_rwlock(
#if defined(HAVE_PSI_INTERFACE)
        key_rwlock_receiver_sid_lock
#endif
    );
    Sid_map *sid_map = new Sid_map(sid_lock);
    gtid_set = new Gtid_set(sid_map, sid_lock);

    /*
      Group replication applier channel shall not use checksum on its relay
      log files.
    */
    if (channel_map.is_group_replication_channel_name(param_channel, true)) {
      relay_log.relay_log_checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF;
    }
  }
  /* The remaining members are set up for fake and non-fake objects alike. */
  gtid_monitoring_info = new Gtid_monitoring_info();
  do_server_version_split(::server_version, slave_version_split);
  until_option = nullptr;
  rpl_filter = nullptr;
}
255
256 /**
257 The method to invoke at slave threads start
258 */
init_workers(ulong n_workers)259 void Relay_log_info::init_workers(ulong n_workers) {
260 /*
261 Parallel slave parameters initialization is done regardless
262 whether the feature is or going to be active or not.
263 */
264 mts_groups_assigned = mts_events_assigned = pending_jobs = wq_size_waits_cnt =
265 0;
266 mts_wq_excess_cnt = mts_wq_no_underrun_cnt = mts_wq_overfill_cnt = 0;
267 mts_total_wait_overlap = 0;
268 mts_total_wait_worker_avail = 0;
269 mts_last_online_stat = 0;
270
271 workers.reserve(n_workers);
272 workers_array_initialized = true; // set after init
273 }
274
275 /**
276 The method to invoke at slave threads stop
277 */
deinit_workers()278 void Relay_log_info::deinit_workers() { workers.clear(); }
279
~Relay_log_info()280 Relay_log_info::~Relay_log_info() {
281 DBUG_TRACE;
282
283 if (!rli_fake) {
284 if (recovery_groups_inited) bitmap_free(&recovery_groups);
285 delete current_mts_submode;
286
287 if (rpl_filter != nullptr) {
288 /* Remove the channel's replication filter from rpl_channel_filters. */
289 rpl_channel_filters.delete_filter(rpl_filter);
290 rpl_filter = nullptr;
291 }
292
293 if (workers_copy_pfs.size()) {
294 for (int i = static_cast<int>(workers_copy_pfs.size()) - 1; i >= 0; i--)
295 delete workers_copy_pfs[i];
296 workers_copy_pfs.clear();
297 }
298
299 mysql_mutex_destroy(&log_space_lock);
300 mysql_cond_destroy(&log_space_cond);
301 mysql_mutex_destroy(&pending_jobs_lock);
302 mysql_cond_destroy(&pending_jobs_cond);
303 mysql_mutex_destroy(&exit_count_lock);
304 mysql_mutex_destroy(&mts_temp_table_LOCK);
305 mysql_mutex_destroy(&mts_gaq_LOCK);
306 mysql_cond_destroy(&logical_clock_cond);
307 relay_log.cleanup();
308
309 Sid_map *sid_map = gtid_set->get_sid_map();
310 Checkable_rwlock *sid_lock = sid_map->get_sid_lock();
311
312 sid_lock->wrlock();
313 gtid_set->clear();
314 sid_map->clear();
315 delete gtid_set;
316 delete sid_map;
317 sid_lock->unlock();
318 delete sid_lock;
319 }
320
321 if (set_rli_description_event(nullptr)) {
322 #ifndef DBUG_OFF
323 bool set_rli_description_event_failed{false};
324 #endif
325 DBUG_ASSERT(set_rli_description_event_failed);
326 }
327 delete until_option;
328 delete gtid_monitoring_info;
329 }
330
331 /**
332 Method is called when MTS coordinator senses the relay-log name
333 has been changed.
334 It marks each Worker member with this fact to make an action
335 at time it will distribute a terminal event of a group to the Worker.
336
337 Worker receives the new name at the group commiting phase
338 @c Slave_worker::slave_worker_ends_group().
339 */
reset_notified_relay_log_change()340 void Relay_log_info::reset_notified_relay_log_change() {
341 if (!is_parallel_exec()) return;
342 for (Slave_worker **it = workers.begin(); it != workers.end(); ++it) {
343 Slave_worker *w = *it;
344 w->relay_log_change_notified = false;
345 }
346 }
347
348 /**
349 This method is called in mts_checkpoint_routine() to mark that each
350 worker is required to adapt to a new checkpoint data whose coordinates
351 are passed to it through GAQ index.
352
353 Worker notices the new checkpoint value at the group commit to reset
354 the current bitmap and starts using the clean bitmap indexed from zero
355 of being reset rli_checkpoint_seqno.
356
357 New seconds_behind_master timestamp is installed.
358
359 @param shift number of bits to shift by Worker due to the
360 current checkpoint change.
361 @param new_ts new seconds_behind_master timestamp value
362 unless zero. Zero could be due to FD event
363 or fake rotate event.
364 @param update_timestamp if true, this function will update the
365 rli->last_master_timestamp.
366 */
reset_notified_checkpoint(ulong shift,time_t new_ts,bool update_timestamp)367 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
368 bool update_timestamp) {
369 /*
370 If this is not a parallel execution we return immediately.
371 */
372 if (!is_parallel_exec()) return;
373
374 for (Slave_worker **it = workers.begin(); it != workers.end(); ++it) {
375 Slave_worker *w = *it;
376 /*
377 Reseting the notification information in order to force workers to
378 assign jobs with the new updated information.
379 Notice that the bitmap_shifted is accumulated to indicate how many
380 consecutive jobs were successfully processed.
381
382 The worker when assigning a new job will set the value back to
383 zero.
384 */
385 w->checkpoint_notified = false;
386 w->bitmap_shifted = w->bitmap_shifted + shift;
387 /*
388 Zero shift indicates the caller rotates the master binlog.
389 The new name will be passed to W through the group descriptor
390 during the first post-rotation time scheduling.
391 */
392 if (shift == 0) w->master_log_change_notified = false;
393
394 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
395 "worker->bitmap_shifted --> %lu, worker --> %u.",
396 shift, w->bitmap_shifted,
397 static_cast<unsigned>(it - workers.begin())));
398 }
399 /*
400 There should not be a call where (shift == 0 && rli_checkpoint_seqno != 0).
401 Then the new checkpoint sequence is updated by subtracting the number
402 of consecutive jobs that were successfully processed.
403 */
404 DBUG_ASSERT(current_mts_submode->get_type() != MTS_PARALLEL_TYPE_DB_NAME ||
405 !(shift == 0 && rli_checkpoint_seqno != 0));
406 rli_checkpoint_seqno = rli_checkpoint_seqno - shift;
407 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
408 "rli_checkpoint_seqno --> %u.",
409 shift, rli_checkpoint_seqno));
410
411 if (update_timestamp) {
412 mysql_mutex_lock(&data_lock);
413 last_master_timestamp = new_ts;
414 mysql_mutex_unlock(&data_lock);
415 }
416 }
417
418 /**
419 Reset recovery info from Worker info table and
420 mark MTS recovery is completed.
421
422 @return false on success true when @c reset_notified_checkpoint failed.
423 */
mts_finalize_recovery()424 bool Relay_log_info::mts_finalize_recovery() {
425 bool ret = false;
426 uint i;
427 uint repo_type = get_rpl_info_handler()->get_rpl_info_type();
428
429 DBUG_TRACE;
430
431 for (Slave_worker **it = workers.begin(); !ret && it != workers.end(); ++it) {
432 Slave_worker *w = *it;
433 ret = w->reset_recovery_info();
434 DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret = true;);
435 }
436 /*
437 The loop is traversed in the worker index descending order due
438 to specifics of the Worker table repository that does not like
439 even temporary holes. Therefore stale records are deleted
440 from the tail.
441 */
442 DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
443 { DBUG_SET("+d,mts_worker_thread_init_fails"); });
444 for (i = recovery_parallel_workers; i > workers.size() && !ret; i--) {
445 Slave_worker *w =
446 Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
447 /*
448 If an error occurs during the above create_worker call, the newly created
449 worker object gets deleted within the above function call itself and only
450 NULL is returned. Hence the following check has been added to verify
451 that a valid worker object exists.
452 */
453 if (w) {
454 ret = w->remove_info();
455 delete w;
456 } else {
457 ret = true;
458 goto err;
459 }
460 }
461 recovery_parallel_workers = slave_parallel_workers;
462
463 err:
464 return ret;
465 }
466
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)467 static inline int add_relay_log(Relay_log_info *rli, LOG_INFO *linfo) {
468 MY_STAT s;
469 DBUG_TRACE;
470 mysql_mutex_assert_owner(&rli->log_space_lock);
471 if (!mysql_file_stat(key_file_relaylog, linfo->log_file_name, &s, MYF(0))) {
472 LogErr(ERROR_LEVEL, ER_RPL_FAILED_TO_STAT_LOG_IN_INDEX,
473 linfo->log_file_name);
474 return 1;
475 }
476 rli->log_space_total += s.st_size;
477 #ifndef DBUG_OFF
478 char buf[22];
479 DBUG_PRINT("info", ("log_space_total: %s", llstr(rli->log_space_total, buf)));
480 #endif
481 return 0;
482 }
483
count_relay_log_space()484 int Relay_log_info::count_relay_log_space() {
485 LOG_INFO flinfo;
486 DBUG_TRACE;
487 MUTEX_LOCK(lock, &log_space_lock);
488 log_space_total = 0;
489 if (relay_log.find_log_pos(&flinfo, NullS, true)) {
490 LogErr(ERROR_LEVEL, ER_RPL_LOG_NOT_FOUND_WHILE_COUNTING_RELAY_LOG_SPACE);
491 return 1;
492 }
493 do {
494 if (add_relay_log(this, &flinfo)) return 1;
495 } while (!relay_log.find_next_log(&flinfo, true));
496 /*
497 As we have counted everything, including what may have written in a
498 preceding write, we must reset bytes_written, or we may count some space
499 twice.
500 */
501 relay_log.reset_bytes_written();
502 return 0;
503 }
504
reset_group_relay_log_pos(const char ** errmsg)505 bool Relay_log_info::reset_group_relay_log_pos(const char **errmsg) {
506 LOG_INFO linfo;
507
508 mysql_mutex_assert_owner(&data_lock);
509
510 if (relay_log.find_log_pos(&linfo, NullS, true)) {
511 *errmsg = "Could not find first log during relay log initialization";
512 return true;
513 }
514 set_group_relay_log_name(linfo.log_file_name);
515 group_relay_log_pos = BIN_LOG_HEADER_SIZE;
516 return false;
517 }
518
is_group_relay_log_name_invalid(const char ** errmsg)519 bool Relay_log_info::is_group_relay_log_name_invalid(const char **errmsg) {
520 DBUG_TRACE;
521 const char *errmsg_fmt = nullptr;
522 static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
523 LOG_INFO linfo;
524
525 *errmsg = nullptr;
526 if (relay_log.find_log_pos(&linfo, group_relay_log_name, true)) {
527 errmsg_fmt =
528 "Could not find target log file mentioned in "
529 "relay log info in the index file '%s' during "
530 "relay log initialization";
531 sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
532 *errmsg = errmsg_buff;
533 return true;
534 }
535 return false;
536 }
537
538 /**
539 Update the error number, message and timestamp fields. This function is
540 different from va_report() as va_report() also logs the error message in the
541 log apart from updating the error fields.
542
543 SYNOPSIS
544 @param[in] level specifies the level- error, warning or information,
545 @param[in] err_code error number,
546 @param[in] buff_coord error message to be used.
547
548 */
fill_coord_err_buf(loglevel level,int err_code,const char * buff_coord) const549 void Relay_log_info::fill_coord_err_buf(loglevel level, int err_code,
550 const char *buff_coord) const {
551 mysql_mutex_lock(&err_lock);
552
553 if (level == ERROR_LEVEL) {
554 m_last_error.number = err_code;
555 snprintf(m_last_error.message, sizeof(m_last_error.message), "%.*s",
556 MAX_SLAVE_ERRMSG - 1, buff_coord);
557 m_last_error.update_timestamp();
558 }
559
560 mysql_mutex_unlock(&err_lock);
561 }
562
/**
  Waits until the SQL thread reaches (has executed up to) the given
  log name and position, or until the wait times out.

  @param[in] thd       client thread that sent @c SELECT @c MASTER_POS_WAIT,
  @param[in] log_name  log name to wait for; it must end in a purely
                       numeric extension,
  @param[in] log_pos   position to wait for; values below
                       BIN_LOG_HEADER_SIZE are raised to it,
  @param[in] timeout   @c timeout in seconds before giving up waiting;
                       a value that is not positive means waiting without a
                       timeout. @c timeout is double whereas it should be
                       ulong; but this is to catch if the user submitted a
                       negative timeout.

  @retval -2   improper arguments (log_pos<0, missing or non-numeric log
               name extension, log base name different from the current
               group master log base name), or this object not inited,
               or slave not running, or master info changed
               during the function's execution,
               or client thread killed. -2 is translated to NULL by caller,
  @retval -1   timed out
  @retval >=0  number of log events the function had to wait
               before reaching the desired log/position (counted as the
               number of wakeups of the wait that were not timeouts)
*/

int Relay_log_info::wait_for_pos(THD *thd, String *log_name, longlong log_pos,
                                 double timeout) {
  int event_count = 0;
  ulong init_abort_pos_wait;
  int error = 0;
  struct timespec abstime;  // for timeout checking
  PSI_stage_info old_stage;
  DBUG_TRACE;

  if (!inited) return -2;

  DBUG_PRINT("enter", ("log_name: '%s' log_pos: %lu timeout: %lu",
                       log_name->c_ptr_safe(), (ulong)log_pos, (ulong)timeout));

  DEBUG_SYNC(thd, "begin_master_pos_wait");

  /*
    abstime is computed once, before the loop, and reused by every timed
    wait below, so the timeout bounds the whole call rather than each
    individual wait.
    NOTE(review): presumably set_timespec_nsec() yields "now + nsec" --
    confirm against its definition.
  */
  set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
  mysql_mutex_lock(&data_lock);
  thd->ENTER_COND(&data_cond, &data_lock,
                  &stage_waiting_for_the_slave_thread_to_advance_position,
                  &old_stage);
  /*
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait; we just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is because if someone does:
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
  */
  init_abort_pos_wait = abort_pos_wait;

  /*
    We'll need to
    handle all possible log names comparisons (e.g. 999 vs 1000).
    We use ulong for string->number conversion; this is no
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN];  // make a char[] from String

  /* The copy is truncated to what fits in log_name_tmp. */
  strmake(log_name_tmp, log_name->ptr(),
          min<uint32>(log_name->length(), FN_REFLEN - 1));

  const char *p = fn_ext(log_name_tmp);
  char *p_end;
  if (!*p || log_pos < 0) {
    error = -2;  // means improper arguments
    goto err;
  }
  // Convert 0-3 to 4
  log_pos = max(log_pos, static_cast<longlong>(BIN_LOG_HEADER_SIZE));
  /* p points to '.' */
  log_name_extension = strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end == p || *p_end) {
    error = -2;
    goto err;
  }

  /* The "compare and wait" main loop */
  while (!thd->killed && init_abort_pos_wait == abort_pos_wait &&
         slave_running) {
    bool pos_reached;
    int cmp_result = 0;

    DBUG_PRINT("info", ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
                        init_abort_pos_wait, abort_pos_wait));
    DBUG_PRINT("info", ("group_master_log_name: '%s' pos: %lu",
                        group_master_log_name, (ulong)group_master_log_pos));

    /*
      group_master_log_name can be "", if we are just after a fresh
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
      (before we have executed one Rotate event from the master) or
      (rare) if the user is doing a weird slave setup (see next
      paragraph).  If group_master_log_name is "", we assume we don't
      have enough info to do the comparison yet, so we just wait until
      more data. In this case master_log_pos is always 0 except if
      somebody (wrongly) sets this slave to be a slave of itself
      without using --replicate-same-server-id (an unsupported
      configuration which does nothing), then group_master_log_pos
      will grow and group_master_log_name will stay "".
      Also in case the group master log position is invalid (e.g. after
      CHANGE MASTER TO RELAY_LOG_POS ), we will wait till the first event
      is read and the log position is valid again.
    */
    if (*group_master_log_name && !is_group_master_log_pos_invalid) {
      char *basename =
          (group_master_log_name + dirname_length(group_master_log_name));
      /*
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
      */
      const char *q = fn_ext(basename) + 1;
      if (strncmp(basename, log_name_tmp, (int)(q - basename))) {
        error = -2;
        break;
      }
      // Now compare extensions.
      char *q_end;
      ulong group_master_log_name_extension = strtoul(q, &q_end, 10);
      if (group_master_log_name_extension < log_name_extension)
        cmp_result = -1;
      else
        cmp_result =
            (group_master_log_name_extension > log_name_extension) ? 1 : 0;

      /*
        The position is reached when the applied log is the requested one
        and its position is at least log_pos, or when the applied log is
        already a later one.
      */
      pos_reached =
          ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
           cmp_result > 0);
      if (pos_reached || thd->killed) break;
    }

    // wait for master update, with optional timeout.

    DBUG_PRINT("info", ("Waiting for master update"));
    /*
      We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
    thd_wait_begin(thd, THD_WAIT_BINLOG);
    if (timeout > 0) {
      /*
        Note that mysql_cond_timedwait checks for the timeout
        before for the condition; i.e. it returns ETIMEDOUT
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, mysql_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error = mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
    } else
      mysql_cond_wait(&data_cond, &data_lock);
    thd_wait_end(thd);
    DBUG_PRINT("info", ("Got signal of master update or timed out"));
    if (is_timeout(error)) {
#ifndef DBUG_OFF
      /*
        Doing this to generate a stack trace and make debugging
        easier.
      */
      if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0)) DBUG_ASSERT(0);
#endif
      error = -1;
      break;
    }
    /* Any other wait result is discarded and counted as one more wakeup. */
    error = 0;
    event_count++;
    DBUG_PRINT("info", ("Testing if killed or SQL thread not running"));
  }

err:
  mysql_mutex_unlock(&data_lock);
  thd->EXIT_COND(&old_stage);
  DBUG_PRINT("exit",
             ("killed: %d abort: %d slave_running: %d "
              "improper_arguments: %d timed_out: %d",
              thd->killed.load(), (int)(init_abort_pos_wait != abort_pos_wait),
              (int)slave_running, (int)(error == -2), (int)(error == -1)));
  /*
    A kill, a change of abort_pos_wait or a stopped slave takes precedence
    over whatever result was computed above, including a timeout.
  */
  if (thd->killed || init_abort_pos_wait != abort_pos_wait || !slave_running) {
    error = -2;
  }
  return error ? error : event_count;
}
763
wait_for_gtid_set(THD * thd,const char * gtid,double timeout,bool update_THD_status)764 int Relay_log_info::wait_for_gtid_set(THD *thd, const char *gtid,
765 double timeout, bool update_THD_status) {
766 DBUG_TRACE;
767
768 DBUG_PRINT("info", ("Waiting for %s timeout %lf", gtid, timeout));
769
770 Gtid_set wait_gtid_set(global_sid_map);
771 global_sid_lock->rdlock();
772 enum_return_status ret = wait_gtid_set.add_gtid_text(gtid);
773 global_sid_lock->unlock();
774
775 if (ret != RETURN_STATUS_OK) {
776 DBUG_PRINT("exit", ("improper gtid argument"));
777 return -2;
778 }
779
780 return wait_for_gtid_set(thd, &wait_gtid_set, timeout, update_THD_status);
781 }
782
wait_for_gtid_set(THD * thd,String * gtid,double timeout,bool update_THD_status)783 int Relay_log_info::wait_for_gtid_set(THD *thd, String *gtid, double timeout,
784 bool update_THD_status) {
785 DBUG_TRACE;
786 return wait_for_gtid_set(thd, gtid->c_ptr_safe(), timeout, update_THD_status);
787 }
788
/*
  TODO: This is duplicated code that needs to be simplified.
  This will be done while developing all possible sync options.
  See WL#3584's specification.

  /Alfranio
*/
/**
  Waits until every GTID of @c wait_gtid_set is contained in the executed
  GTID set and none of them is still owned, or until the wait times out.

  @param thd                client thread doing the wait
  @param wait_gtid_set      GTID set to wait for; its SID map is asserted to
                            be either null or global_sid_map
  @param timeout            timeout in seconds; a value that is not positive
                            means waiting without a timeout
  @param update_THD_status  when true, the thread enters/exits the
                            "waiting for the slave thread to advance
                            position" stage via ENTER_COND/EXIT_COND

  @retval -2   this object is not inited, or slave not running, or
               abort_pos_wait changed during the function's execution,
               or client thread killed
  @retval -1   timed out
  @retval >=0  number of wakeups of the wait, other than timeouts, that
               occurred before the GTID set was reached
*/
int Relay_log_info::wait_for_gtid_set(THD *thd, const Gtid_set *wait_gtid_set,
                                      double timeout, bool update_THD_status) {
  int event_count = 0;
  ulong init_abort_pos_wait;
  int error = 0;
  struct timespec abstime;  // for timeout checking
  PSI_stage_info old_stage;
  DBUG_TRACE;

  if (!inited) return -2;

  DEBUG_SYNC(thd, "begin_wait_for_gtid_set");

  /*
    abstime is computed once, before the loop, and reused by every timed
    wait below, so the timeout bounds the whole call rather than each
    individual wait.
  */
  set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));

  mysql_mutex_lock(&data_lock);
  if (update_THD_status)
    thd->ENTER_COND(&data_cond, &data_lock,
                    &stage_waiting_for_the_slave_thread_to_advance_position,
                    &old_stage);
  /*
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait; we just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is because if someone does:
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
  */
  init_abort_pos_wait = abort_pos_wait;

  /* The "compare and wait" main loop */
  while (!thd->killed && init_abort_pos_wait == abort_pos_wait &&
         slave_running) {
    DBUG_PRINT("info", ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
                        init_abort_pos_wait, abort_pos_wait));

    // Check whether the awaited GTID set has been reached already.

    DBUG_ASSERT(wait_gtid_set->get_sid_map() == nullptr ||
                wait_gtid_set->get_sid_map() == global_sid_map);

    /* The GTID state is inspected while global_sid_lock is write-locked. */
    global_sid_lock->wrlock();
    const Gtid_set *executed_gtids = gtid_state->get_executed_gtids();
    const Owned_gtids *owned_gtids = gtid_state->get_owned_gtids();

#ifndef DBUG_OFF
    char *wait_gtid_set_buf;
    wait_gtid_set->to_string(&wait_gtid_set_buf);
    DBUG_PRINT("info",
               ("Waiting for '%s'. is_subset: %d and "
                "!is_intersection_nonempty: %d",
                wait_gtid_set_buf, wait_gtid_set->is_subset(executed_gtids),
                !owned_gtids->is_intersection_nonempty(wait_gtid_set)));
    my_free(wait_gtid_set_buf);
    executed_gtids->dbug_print("gtid_executed:");
    owned_gtids->dbug_print("owned_gtids:");
#endif

    /*
      Since commit is performed after log to binary log, we must also
      check if any GTID of wait_gtid_set is not yet committed.
    */
    if (wait_gtid_set->is_subset(executed_gtids) &&
        !owned_gtids->is_intersection_nonempty(wait_gtid_set)) {
      global_sid_lock->unlock();
      break;
    }
    global_sid_lock->unlock();

    // wait for master update, with optional timeout.
    DBUG_PRINT("info", ("Waiting for master update"));

    /*
      We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
    thd_wait_begin(thd, THD_WAIT_BINLOG);
    if (timeout > 0) {
      /*
        Note that mysql_cond_timedwait checks for the timeout
        before for the condition; i.e. it returns ETIMEDOUT
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, mysql_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error = mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
    } else
      mysql_cond_wait(&data_cond, &data_lock);
    thd_wait_end(thd);
    DBUG_PRINT("info", ("Got signal of master update or timed out"));
    if (is_timeout(error)) {
#ifndef DBUG_OFF
      /*
        Doing this to generate a stack trace and make debugging
        easier.
      */
      if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0)) DBUG_ASSERT(0);
#endif
      error = -1;
      break;
    }
    /* Any other wait result is discarded and counted as one more wakeup. */
    error = 0;
    event_count++;
    DBUG_PRINT("info", ("Testing if killed or SQL thread not running"));
  }

  mysql_mutex_unlock(&data_lock);

  if (update_THD_status) thd->EXIT_COND(&old_stage);
  DBUG_PRINT("exit",
             ("killed: %d abort: %d slave_running: %d "
              "improper_arguments: %d timed_out: %d",
              thd->killed.load(), (int)(init_abort_pos_wait != abort_pos_wait),
              (int)slave_running, (int)(error == -2), (int)(error == -1)));
  /*
    A kill, a change of abort_pos_wait or a stopped slave takes precedence
    over whatever result was computed above, including a timeout.
  */
  if (thd->killed || init_abort_pos_wait != abort_pos_wait || !slave_running) {
    error = -2;
  }
  return error ? error : event_count;
}
923
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock,bool force)924 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
925 bool need_data_lock, bool force) {
926 int error = 0;
927 DBUG_TRACE;
928
929 if (need_data_lock)
930 mysql_mutex_lock(&data_lock);
931 else
932 mysql_mutex_assert_owner(&data_lock);
933
934 inc_event_relay_log_pos();
935 group_relay_log_pos = event_relay_log_pos;
936 strmake(group_relay_log_name, event_relay_log_name,
937 sizeof(group_relay_log_name) - 1);
938
939 /*
940 In 4.x we used the event's len to compute the positions here. This is
941 wrong if the event was 3.23/4.0 and has been converted to 5.0, because
942 then the event's len is not what is was in the master's binlog, so this
943 will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
944 replication: Exec_master_log_pos is wrong). Only way to solve this is to
945 have the original offset of the end of the event the relay log. This is
946 what we do in 5.0: log_pos has become "end_log_pos" (because the real use
947 of log_pos in 4.0 was to compute the end_log_pos; so better to store
948 end_log_pos instead of begin_log_pos.
949 If we had not done this fix here, the problem would also have appeared
950 when the slave and master are 5.0 but with different event length (for
951 example the slave is more recent than the master and features the event
952 UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
953 SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
954 value which would lead to badly broken replication.
955 Even the relay_log_pos will be corrupted in this case, because the len is
956 the relay log is not "val".
957 With the end_log_pos solution, we avoid computations involving lengthes.
958 */
959 DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", (long)log_pos,
960 (long)group_master_log_pos));
961
962 if (log_pos > 0) // 3.23 binlogs don't have log_posx
963 group_master_log_pos = log_pos;
964 /*
965 If the master log position was invalidiated by say, "CHANGE MASTER TO
966 RELAY_LOG_POS=N", it is now valid,
967 */
968 if (is_group_master_log_pos_invalid) is_group_master_log_pos_invalid = false;
969
970 /*
971 In MTS mode FD or Rotate event commit their solitary group to
972 Coordinator's info table. Callers make sure that Workers have been
973 executed all assignements.
974 Broadcast to master_pos_wait() waiters should be done after
975 the table is updated.
976 */
977 DBUG_ASSERT(!is_parallel_exec() ||
978 mts_group_status != Relay_log_info::MTS_IN_GROUP);
979 /*
980 We do not force synchronization at this point, except for Rotate event
981 (see Rotate_log_event::do_update_pos), note @c force is false by default,
982 because a non-transactional change is being committed.
983
984 For that reason, the synchronization here is subjected to
985 the option sync_relay_log_info.
986
987 See sql/rpl_rli.h for further information on this behavior.
988 */
989 error = flush_info(force);
990
991 mysql_cond_broadcast(&data_cond);
992 if (need_data_lock) mysql_mutex_unlock(&data_lock);
993 return error;
994 }
995
close_temporary_tables()996 void Relay_log_info::close_temporary_tables() {
997 TABLE *table, *next;
998 int num_closed_temp_tables = 0;
999 DBUG_TRACE;
1000
1001 for (table = save_temporary_tables; table; table = next) {
1002 next = table->next;
1003 /*
1004 Don't ask for disk deletion. For now, anyway they will be deleted when
1005 slave restarts, but it is a better intention to not delete them.
1006 */
1007 DBUG_PRINT("info", ("table: %p", table));
1008 close_temporary(nullptr, table, true, false);
1009 num_closed_temp_tables++;
1010 }
1011 save_temporary_tables = nullptr;
1012 atomic_slave_open_temp_tables -= num_closed_temp_tables;
1013 atomic_channel_open_temp_tables -= num_closed_temp_tables;
1014 }
1015
1016 /**
1017 Purges relay logs. It assumes to have a run lock on rli and that no
1018 slave thread are running.
1019
1020 @param[in] thd connection,
1021 @param[out] errmsg store pointer to an error message.
1022 @param[in] delete_only If true, do not start writing to a new log file.
1023
1024 @retval 0 successfully executed,
1025 @retval 1 otherwise error, where errmsg is set to point to the error message.
1026 */
1027
purge_relay_logs(THD * thd,const char ** errmsg,bool delete_only)1028 int Relay_log_info::purge_relay_logs(THD *thd, const char **errmsg,
1029 bool delete_only) {
1030 int error = 0;
1031 const char *ln;
1032 /* name of the index file if opt_relaylog_index_name is set*/
1033 const char *log_index_name;
1034 /*
1035 Buffer to add channel name suffix when relay-log-index option is
1036 provided
1037 */
1038 char relay_bin_index_channel[FN_REFLEN];
1039
1040 const char *ln_without_channel_name;
1041 /*
1042 Buffer to add channel name suffix when relay-log option is provided.
1043 */
1044 char relay_bin_channel[FN_REFLEN];
1045
1046 char buffer[FN_REFLEN];
1047
1048 mysql_mutex_t *log_lock = relay_log.get_log_lock();
1049
1050 DBUG_TRACE;
1051
1052 /*
1053 Even if inited==0, we still try to empty master_log_* variables. Indeed,
1054 inited==0 does not imply that they already are empty.
1055
1056 It could be that slave's info initialization partly succeeded: for example
1057 if relay-log.info existed but all relay logs have been manually removed,
1058 init_info reads the old relay-log.info and fills rli->master_log_*, then
1059 init_info checks for the existence of the relay log, this fails and
1060 init_info leaves inited to 0.
1061 In that pathological case, master_log_pos* will be properly reinited at
1062 the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1063 purge_relay_logs, will delete bogus *.info files or replace them with
1064 correct files), however if the user does SHOW SLAVE STATUS before START
1065 SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1066 master_log_* for SHOW SLAVE STATUS to display fine in any case.
1067 */
1068 group_master_log_name[0] = 0;
1069 group_master_log_pos = 0;
1070
1071 /*
1072 Following the the relay log purge, the master_log_pos will be in sync
1073 with relay_log_pos, so the flag should be cleared. Refer bug#11766010.
1074 */
1075
1076 is_group_master_log_pos_invalid = false;
1077
1078 if (!inited) {
1079 DBUG_PRINT("info", ("inited == 0"));
1080 if (error_on_rli_init_info ||
1081 /*
1082 mi->reset means that the channel was reset but still exists. Channel
1083 shall have the index and the first relay log file.
1084
1085 Those files shall be remove in a following RESET SLAVE ALL (even when
1086 channel was not inited again).
1087 */
1088 (mi->reset && delete_only)) {
1089 DBUG_ASSERT(relay_log.is_relay_log);
1090 ln_without_channel_name =
1091 relay_log.generate_name(opt_relay_logname, "-relay-bin", buffer);
1092
1093 ln = add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
1094 ln_without_channel_name);
1095 if (opt_relaylog_index_name_supplied) {
1096 char index_file_withoutext[FN_REFLEN];
1097 relay_log.generate_name(opt_relaylog_index_name, "",
1098 index_file_withoutext);
1099
1100 log_index_name = add_channel_to_relay_log_name(
1101 relay_bin_index_channel, FN_REFLEN, index_file_withoutext);
1102 } else
1103 log_index_name = nullptr;
1104
1105 if (relay_log.open_index_file(log_index_name, ln, true)) {
1106 LogErr(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_PURGE_FAILED,
1107 "Failed to open relay log index file:",
1108 relay_log.get_index_fname());
1109 return 1;
1110 }
1111 mysql_mutex_lock(&mi->data_lock);
1112 mysql_mutex_lock(log_lock);
1113 if (relay_log.open_binlog(
1114 ln, nullptr,
1115 (max_relay_log_size ? max_relay_log_size : max_binlog_size), true,
1116 true /*need_lock_index=true*/, true /*need_sid_lock=true*/,
1117 mi->get_mi_description_event())) {
1118 mysql_mutex_unlock(log_lock);
1119 mysql_mutex_unlock(&mi->data_lock);
1120 LogErr(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_PURGE_FAILED,
1121 "Failed to open relay log file:", relay_log.get_log_fname());
1122 return 1;
1123 }
1124 mysql_mutex_unlock(log_lock);
1125 mysql_mutex_unlock(&mi->data_lock);
1126 } else
1127 return 0;
1128 } else {
1129 DBUG_ASSERT(slave_running == 0);
1130 DBUG_ASSERT(mi->slave_running == 0);
1131 }
1132 /* Reset the transaction boundary parser and clear the last GTID queued */
1133 mi->transaction_parser.reset();
1134 mysql_mutex_lock(&mi->data_lock);
1135 mi->clear_gtid_monitoring_info();
1136 mysql_mutex_unlock(&mi->data_lock);
1137
1138 slave_skip_counter = 0;
1139 mysql_mutex_lock(&data_lock);
1140
1141 /**
1142 Clear the retrieved gtid set for this channel.
1143 */
1144 get_sid_lock()->wrlock();
1145 (const_cast<Gtid_set *>(get_gtid_set()))->clear_set_and_sid_map();
1146 get_sid_lock()->unlock();
1147
1148 if (relay_log.reset_logs(thd, delete_only)) {
1149 *errmsg = "Failed during log reset";
1150 error = 1;
1151 goto err;
1152 }
1153
1154 /* Save name of used relay log file */
1155 set_group_relay_log_name(relay_log.get_log_fname());
1156 group_relay_log_pos = BIN_LOG_HEADER_SIZE;
1157 if (!delete_only && count_relay_log_space()) {
1158 *errmsg = "Error counting relay log space";
1159 error = 1;
1160 goto err;
1161 }
1162
1163 if (!inited && error_on_rli_init_info)
1164 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1165 true /*need_lock_log=true*/, true /*need_lock_index=true*/);
1166 err:
1167 #ifndef DBUG_OFF
1168 char buf[22];
1169 #endif
1170 DBUG_PRINT("info", ("log_space_total: %s", llstr(log_space_total, buf)));
1171 mysql_mutex_unlock(&data_lock);
1172 return error;
1173 }
1174
add_channel_to_relay_log_name(char * buff,uint buff_size,const char * base_name)1175 const char *Relay_log_info::add_channel_to_relay_log_name(
1176 char *buff, uint buff_size, const char *base_name) {
1177 char *ptr;
1178 char channel_to_file[FN_REFLEN];
1179 uint errors, length;
1180 uint base_name_len;
1181 uint suffix_buff_size;
1182
1183 DBUG_ASSERT(base_name != nullptr);
1184
1185 base_name_len = strlen(base_name);
1186 suffix_buff_size = buff_size - base_name_len;
1187
1188 ptr = strmake(buff, base_name, buff_size - 1);
1189
1190 if (channel[0]) {
1191 /* adding a "-" */
1192 ptr = strmake(ptr, "-", suffix_buff_size - 1);
1193
1194 /*
1195 Convert the channel name to the file names charset.
1196 Channel name is in system_charset which is UTF8_general_ci
1197 as it was defined as utf8 in the mysql.slaveinfo tables.
1198 */
1199 length = strconvert(system_charset_info, channel, &my_charset_filename,
1200 channel_to_file, NAME_LEN, &errors);
1201 ptr = strmake(ptr, channel_to_file, suffix_buff_size - length - 1);
1202 }
1203
1204 return (const char *)buff;
1205 }
1206
cached_charset_invalidate()1207 void Relay_log_info::cached_charset_invalidate() {
1208 DBUG_TRACE;
1209
1210 /* Full of zeroes means uninitialized. */
1211 memset(cached_charset, 0, sizeof(cached_charset));
1212 }
1213
cached_charset_compare(char * charset) const1214 bool Relay_log_info::cached_charset_compare(char *charset) const {
1215 DBUG_TRACE;
1216
1217 if (memcmp(cached_charset, charset, sizeof(cached_charset))) {
1218 memcpy(const_cast<char *>(cached_charset), charset, sizeof(cached_charset));
1219 return true;
1220 }
1221 return false;
1222 }
1223
stmt_done(my_off_t event_master_log_pos)1224 int Relay_log_info::stmt_done(my_off_t event_master_log_pos) {
1225 clear_flag(IN_STMT);
1226
1227 DBUG_ASSERT(!belongs_to_client());
1228 /* Worker does not execute binlog update position logics */
1229 DBUG_ASSERT(!is_mts_worker(info_thd));
1230
1231 /*
1232 Replication keeps event and group positions to specify the
1233 set of events that were executed.
1234 Event positions are incremented after processing each event
1235 whereas group positions are incremented when an event or a
1236 set of events is processed such as in a transaction and are
1237 committed or rolled back.
1238
1239 A transaction can be ended with a Query Event, i.e. either
1240 commit or rollback, or by a Xid Log Event. Query Event is
1241 used to terminate pseudo-transactions that are executed
1242 against non-transactional engines such as MyIsam. Xid Log
1243 Event denotes though that a set of changes executed
1244 against a transactional engine is about to commit.
1245
1246 Events' positions are incremented at stmt_done(). However,
1247 transactions that are ended with Xid Log Event have their
1248 group position incremented in the do_apply_event() and in
1249 the do_apply_event_work().
1250
1251 Notice that the type of the engine, i.e. where data and
1252 positions are stored, against what events are being applied
1253 are not considered in this logic.
1254
1255 Regarding the code that follows, notice that the executed
1256 group coordinates don't change if the current event is internal
1257 to the group. The same applies to MTS Coordinator when it
1258 handles a Format Descriptor event that appears in the middle
1259 of a group that is about to be assigned.
1260 */
1261 if ((!is_parallel_exec() && is_in_group()) ||
1262 mts_group_status != MTS_NOT_IN_GROUP) {
1263 inc_event_relay_log_pos();
1264 } else {
1265 return inc_group_relay_log_pos(event_master_log_pos,
1266 true /*need_data_lock*/);
1267 }
1268 return 0;
1269 }
1270
cleanup_context(THD * thd,bool error)1271 void Relay_log_info::cleanup_context(THD *thd, bool error) {
1272 DBUG_TRACE;
1273
1274 DBUG_ASSERT(info_thd == thd);
1275 /*
1276 1) Instances of Table_map_log_event, if ::do_apply_event() was called on
1277 them, may have opened tables, which we cannot be sure have been closed
1278 (because maybe the Rows_log_event have not been found or will not be,
1279 because slave SQL thread is stopping, or relay log has a missing tail etc).
1280 So we close all thread's tables. And so the table mappings have to be
1281 cancelled. 2) Rows_log_event::do_apply_event() may even have started
1282 statements or transactions on them, which we need to rollback in case of
1283 error. 3) If finding a Format_description_log_event after a BEGIN, we also
1284 need to rollback before continuing with the next events. 4) so we need this
1285 "context cleanup" function.
1286 */
1287 if (error) {
1288 trans_rollback_stmt(thd); // if a "statement transaction"
1289 trans_rollback(thd); // if a "real transaction"
1290 thd->variables.original_commit_timestamp = UNDEFINED_COMMIT_TIMESTAMP;
1291 }
1292 if (rows_query_ev) {
1293 /*
1294 In order to avoid invalid memory access, THD::reset_query() should be
1295 called before deleting the rows_query event.
1296 */
1297 info_thd->reset_query();
1298 info_thd->reset_query_for_display();
1299 delete rows_query_ev;
1300 rows_query_ev = nullptr;
1301 DBUG_EXECUTE_IF("after_deleting_the_rows_query_ev", {
1302 const char action[] =
1303 "now SIGNAL deleted_rows_query_ev WAIT_FOR go_ahead";
1304 DBUG_ASSERT(!debug_sync_set_action(info_thd, STRING_WITH_LEN(action)));
1305 };);
1306 }
1307 m_table_map.clear_tables();
1308 slave_close_thread_tables(thd);
1309 if (error) {
1310 /*
1311 trans_rollback above does not rollback XA transactions.
1312 It could be done only after necessarily closing tables which dictates
1313 the following placement.
1314 */
1315 XID_STATE *xid_state = thd->get_transaction()->xid_state();
1316 if (!xid_state->has_state(XID_STATE::XA_NOTR)) {
1317 DBUG_ASSERT(
1318 DBUG_EVALUATE_IF("simulate_commit_failure", 1,
1319 xid_state->has_state(XID_STATE::XA_ACTIVE) ||
1320 xid_state->has_state(XID_STATE::XA_IDLE)));
1321
1322 xa_trans_force_rollback(thd);
1323 xid_state->reset();
1324 cleanup_trans_state(thd);
1325 thd->rpl_unflag_detached_engine_ha_data();
1326 }
1327 thd->mdl_context.release_transactional_locks();
1328 }
1329 clear_flag(IN_STMT);
1330 /*
1331 Cleanup for the flags that have been set at do_apply_event.
1332 */
1333 thd->variables.option_bits &= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1334 thd->variables.option_bits &= ~OPTION_RELAXED_UNIQUE_CHECKS;
1335
1336 /*
1337 Reset state related to long_find_row notes in the error log:
1338 - timestamp
1339 - flag that decides whether the slave prints or not
1340 */
1341 reset_row_stmt_start_timestamp();
1342 unset_long_find_row_note_printed();
1343
1344 /*
1345 If the slave applier changed the current transaction isolation level,
1346 it need to be restored to the session default value once having the
1347 current transaction cleared.
1348
1349 We should call "trans_reset_one_shot_chistics()" only if the "error"
1350 flag is "true", because "cleanup_context()" is called at the end of each
1351 set of Table_maps/Rows representing a statement (when the rows event
1352 is tagged with the STMT_END_F) with the "error" flag as "false".
1353
1354 So, without the "if (error)" below, the isolation level might be reset
1355 in the middle of a pure row based transaction.
1356 */
1357 if (error) trans_reset_one_shot_chistics(thd);
1358 }
1359
clear_tables_to_lock()1360 void Relay_log_info::clear_tables_to_lock() {
1361 DBUG_TRACE;
1362 #ifndef DBUG_OFF
1363 /**
1364 When replicating in RBR and MyISAM Merge tables are involved
1365 open_and_lock_tables (called in do_apply_event) appends the
1366 base tables to the list of tables_to_lock. Then these are
1367 removed from the list in close_thread_tables (which is called
1368 before we reach this point).
1369
1370 This assertion just confirms that we get no surprises at this
1371 point.
1372 */
1373 uint i = 0;
1374 for (TABLE_LIST *ptr = tables_to_lock; ptr; ptr = ptr->next_global, i++)
1375 ;
1376 DBUG_ASSERT(i == tables_to_lock_count);
1377 #endif
1378
1379 while (tables_to_lock) {
1380 uchar *to_free = reinterpret_cast<uchar *>(tables_to_lock);
1381 if (tables_to_lock->m_tabledef_valid) {
1382 tables_to_lock->m_tabledef.table_def::~table_def();
1383 tables_to_lock->m_tabledef_valid = false;
1384 }
1385
1386 /*
1387 If blob fields were used during conversion of field values
1388 from the master table into the slave table, then we need to
1389 free the memory used temporarily to store their values before
1390 copying into the slave's table.
1391 */
1392 if (tables_to_lock->m_conv_table) free_blobs(tables_to_lock->m_conv_table);
1393
1394 tables_to_lock = static_cast<RPL_TABLE_LIST *>(tables_to_lock->next_global);
1395 tables_to_lock_count--;
1396 my_free(to_free);
1397 }
1398 DBUG_ASSERT(tables_to_lock == nullptr && tables_to_lock_count == 0);
1399 }
1400
slave_close_thread_tables(THD * thd)1401 void Relay_log_info::slave_close_thread_tables(THD *thd) {
1402 thd->get_stmt_da()->set_overwrite_status(true);
1403 DBUG_TRACE;
1404 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1405 thd->get_stmt_da()->set_overwrite_status(false);
1406
1407 close_thread_tables(thd);
1408 /*
1409 - If transaction rollback was requested due to deadlock
1410 perform it and release metadata locks.
1411 - If inside a multi-statement transaction,
1412 defer the release of metadata locks until the current
1413 transaction is either committed or rolled back. This prevents
1414 other statements from modifying the table for the entire
1415 duration of this transaction. This provides commit ordering
1416 and guarantees serializability across multiple transactions.
1417 - If in autocommit mode, or outside a transactional context,
1418 automatically release metadata locks of the current statement.
1419 */
1420 if (thd->transaction_rollback_request) {
1421 trans_rollback_implicit(thd);
1422 thd->mdl_context.release_transactional_locks();
1423 } else if (!thd->in_multi_stmt_transaction_mode())
1424 thd->mdl_context.release_transactional_locks();
1425 else
1426 thd->mdl_context.release_statement_locks();
1427
1428 clear_tables_to_lock();
1429 }
1430
1431 /**
1432 Execute a SHOW RELAYLOG EVENTS statement.
1433
1434 When multiple replication channels exist on this slave
1435 and no channel name is specified through FOR CHANNEL clause
1436 this function errors out and exits.
1437
1438 @param thd Pointer to THD object for the client thread executing the
1439 statement.
1440
1441 @retval false success
1442 @retval true failure
1443 */
mysql_show_relaylog_events(THD * thd)1444 bool mysql_show_relaylog_events(THD *thd) {
1445 Master_info *mi = nullptr;
1446 List<Item> field_list;
1447 bool res;
1448 DBUG_TRACE;
1449
1450 DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1451
1452 channel_map.wrlock();
1453
1454 if (!thd->lex->mi.for_channel && channel_map.get_num_instances() > 1) {
1455 my_error(ER_SLAVE_MULTIPLE_CHANNELS_CMD, MYF(0));
1456 res = true;
1457 goto err;
1458 }
1459
1460 Log_event::init_show_field_list(&field_list);
1461 if (thd->send_result_metadata(&field_list,
1462 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) {
1463 res = true;
1464 goto err;
1465 }
1466
1467 mi = channel_map.get_mi(thd->lex->mi.channel);
1468
1469 if (!mi && strcmp(thd->lex->mi.channel, channel_map.get_default_channel())) {
1470 my_error(ER_SLAVE_CHANNEL_DOES_NOT_EXIST, MYF(0), thd->lex->mi.channel);
1471 res = true;
1472 goto err;
1473 }
1474
1475 if (mi == nullptr) {
1476 my_error(ER_SLAVE_CONFIGURATION, MYF(0));
1477 res = true;
1478 goto err;
1479 }
1480
1481 res = show_binlog_events(thd, &mi->rli->relay_log);
1482
1483 err:
1484 channel_map.unlock();
1485
1486 return res;
1487 }
1488
rli_init_info(bool skip_received_gtid_set_recovery)1489 int Relay_log_info::rli_init_info(bool skip_received_gtid_set_recovery) {
1490 int error = 0;
1491 enum_return_check check_return = ERROR_CHECKING_REPOSITORY;
1492 const char *msg = nullptr;
1493 DBUG_TRACE;
1494
1495 mysql_mutex_assert_owner(&data_lock);
1496
1497 /*
1498 If Relay_log_info is issued again after a failed init_info(), for
1499 instance because of missing relay log files, it will generate new
1500 files and ignore the previous failure, to avoid that we set
1501 error_on_rli_init_info as true.
1502 This a consequence of the behaviour change, in the past server was
1503 stopped when there were replication initialization errors, now it is
1504 not and so init_info() must be aware of previous failures.
1505 */
1506 if (error_on_rli_init_info) goto err;
1507
1508 if (inited) {
1509 return recovery_parallel_workers ? mts_recovery_groups(this) : 0;
1510 }
1511
1512 slave_skip_counter = 0;
1513 abort_pos_wait = 0;
1514 log_space_limit = relay_log_space_limit;
1515 log_space_total = 0;
1516 tables_to_lock = nullptr;
1517 tables_to_lock_count = 0;
1518
1519 char pattern[FN_REFLEN];
1520 (void)my_realpath(pattern, slave_load_tmpdir, 0);
1521 /*
1522 @TODO:
1523 In MSR, sometimes slave fail with the following error:
1524 Unable to use slave's temporary directory /tmp -
1525 Can't create/write to file
1526 '/tmp/SQL_LOAD-92d1eee0-9de4-11e3-8874-68730ad50fcb' (Errcode: 17 - File
1527 exists), Error_code: 1
1528
1529 */
1530 if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
1531 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS) {
1532 LogErr(ERROR_LEVEL, ER_SLAVE_CANT_USE_TEMPDIR, slave_load_tmpdir);
1533 return 1;
1534 }
1535 unpack_filename(slave_patternload_file, pattern);
1536 slave_patternload_file_size = strlen(slave_patternload_file);
1537
1538 /*
1539 The relay log will now be opened, as a WRITE_CACHE IO_CACHE.
1540 Note that the I/O thread flushes it to disk after writing every
1541 event, in flush_info within the master info.
1542 */
1543 /*
1544 For the maximum log size, we choose max_relay_log_size if it is
1545 non-zero, max_binlog_size otherwise. If later the user does SET
1546 GLOBAL on one of these variables, fix_max_binlog_size and
1547 fix_max_relay_log_size will reconsider the choice (for example
1548 if the user changes max_relay_log_size to zero, we have to
1549 switch to using max_binlog_size for the relay log) and update
1550 relay_log.max_size (and mysql_bin_log.max_size).
1551 */
1552 {
1553 /* Reports an error and returns, if the --relay-log's path
1554 is a directory.*/
1555 if (opt_relay_logname &&
1556 opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR) {
1557 LogErr(ERROR_LEVEL, ER_RPL_RELAY_LOG_NEEDS_FILE_NOT_DIRECTORY,
1558 opt_relay_logname);
1559 return 1;
1560 }
1561
1562 /* Reports an error and returns, if the --relay-log-index's path
1563 is a directory.*/
1564 if (opt_relaylog_index_name &&
1565 opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1] ==
1566 FN_LIBCHAR) {
1567 LogErr(ERROR_LEVEL, ER_RPL_RELAY_LOG_INDEX_NEEDS_FILE_NOT_DIRECTORY,
1568 opt_relaylog_index_name);
1569 return 1;
1570 }
1571
1572 char buf[FN_REFLEN];
1573 /* The base name of the relay log file considering multisource rep */
1574 const char *ln;
1575 /*
1576 relay log name without channel prefix taking into account
1577 --relay-log option.
1578 */
1579 const char *ln_without_channel_name;
1580 static bool name_warning_sent = false;
1581
1582 /*
1583 Buffer to add channel name suffix when relay-log option is provided.
1584 */
1585 char relay_bin_channel[FN_REFLEN];
1586 /*
1587 Buffer to add channel name suffix when relay-log-index option is provided
1588 */
1589 char relay_bin_index_channel[FN_REFLEN];
1590
1591 /* name of the index file if opt_relaylog_index_name is set*/
1592 const char *log_index_name;
1593
1594 ln_without_channel_name =
1595 relay_log.generate_name(opt_relay_logname, "-relay-bin", buf);
1596
1597 ln = add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
1598 ln_without_channel_name);
1599
1600 /* We send the warning only at startup, not after every RESET SLAVE */
1601 if (!opt_relay_logname_supplied && !opt_relaylog_index_name_supplied &&
1602 !name_warning_sent) {
1603 /*
1604 User didn't give us info to name the relay log index file.
1605 Picking `hostname`-relay-bin.index like we do, causes replication to
1606 fail if this slave's hostname is changed later. So, we would like to
1607 instead require a name. But as we don't want to break many existing
1608 setups, we only give warning, not error.
1609 */
1610 LogErr(WARNING_LEVEL, ER_RPL_PLEASE_USE_OPTION_RELAY_LOG,
1611 ln_without_channel_name);
1612 name_warning_sent = true;
1613 }
1614
1615 /*
1616 If relay log index option is set, convert into channel specific
1617 index file. If the opt_relaylog_index has an extension, we strip
1618 it too. This is inconsistent to relay log names.
1619 */
1620 if (opt_relaylog_index_name_supplied) {
1621 char index_file_withoutext[FN_REFLEN];
1622 relay_log.generate_name(opt_relaylog_index_name, "",
1623 index_file_withoutext);
1624
1625 log_index_name = add_channel_to_relay_log_name(
1626 relay_bin_index_channel, FN_REFLEN, index_file_withoutext);
1627 } else
1628 log_index_name = nullptr;
1629
1630 if (relay_log.open_index_file(log_index_name, ln, true)) {
1631 LogErr(ERROR_LEVEL, ER_RPL_OPEN_INDEX_FILE_FAILED);
1632 return 1;
1633 }
1634
1635 if (!gtid_retrieved_initialized) {
1636 /* Store the GTID of a transaction spanned in multiple relay log files */
1637 Gtid_monitoring_info *partial_trx = mi->get_gtid_monitoring_info();
1638 partial_trx->clear();
1639 #ifndef DBUG_OFF
1640 get_sid_lock()->wrlock();
1641 gtid_set->dbug_print("set of GTIDs in relay log before initialization");
1642 get_sid_lock()->unlock();
1643 #endif
1644 /*
1645 In the init_gtid_set below we pass the mi->transaction_parser.
1646 This will be useful to ensure that we only add a GTID to
1647 the Retrieved_Gtid_Set for fully retrieved transactions. Also, it will
1648 be useful to ensure the Retrieved_Gtid_Set behavior when auto
1649 positioning is disabled (we could have transactions spanning multiple
1650 relay log files in this case).
1651 We will skip this initialization if relay_log_recovery is set in order
1652 to save time, as neither the GTIDs nor the transaction_parser state
1653 would be useful when the relay log will be cleaned up later when calling
1654 init_recovery.
1655 */
1656 if (!is_relay_log_recovery && !gtid_retrieved_initialized &&
1657 !skip_received_gtid_set_recovery &&
1658 relay_log.init_gtid_sets(
1659 gtid_set, nullptr, opt_slave_sql_verify_checksum,
1660 true /*true=need lock*/, &mi->transaction_parser, partial_trx)) {
1661 LogErr(ERROR_LEVEL, ER_RPL_CANT_INITIALIZE_GTID_SETS_IN_RLI_INIT_INFO);
1662 return 1;
1663 }
1664 gtid_retrieved_initialized = true;
1665 #ifndef DBUG_OFF
1666 get_sid_lock()->wrlock();
1667 gtid_set->dbug_print("set of GTIDs in relay log after initialization");
1668 get_sid_lock()->unlock();
1669 #endif
1670 }
1671 /*
1672 Configures what object is used by the current log to store processed
1673 gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
1674 correctly compute the set of previous gtids.
1675 */
1676 relay_log.set_previous_gtid_set_relaylog(gtid_set);
1677 /*
1678 note, that if open() fails, we'll still have index file open
1679 but a destructor will take care of that
1680 */
1681
1682 mysql_mutex_t *log_lock = relay_log.get_log_lock();
1683 mysql_mutex_lock(log_lock);
1684
1685 if (relay_log.open_binlog(
1686 ln, nullptr,
1687 (max_relay_log_size ? max_relay_log_size : max_binlog_size), true,
1688 true /*need_lock_index=true*/, true /*need_sid_lock=true*/,
1689 mi->get_mi_description_event())) {
1690 mysql_mutex_unlock(log_lock);
1691 LogErr(ERROR_LEVEL, ER_RPL_CANT_OPEN_LOG_IN_RLI_INIT_INFO);
1692 return 1;
1693 }
1694
1695 mysql_mutex_unlock(log_lock);
1696 }
1697
1698 /*
1699 This checks if the repository was created before and thus there
1700 will be values to be read. Please, do not move this call after
1701 the handler->init_info().
1702 */
1703 if ((check_return = check_info()) == ERROR_CHECKING_REPOSITORY) {
1704 msg = "Error checking relay log repository";
1705 error = 1;
1706 goto err;
1707 }
1708
1709 if (handler->init_info()) {
1710 msg = "Error reading relay log configuration";
1711 error = 1;
1712 goto err;
1713 }
1714
1715 check_return = check_if_info_was_cleared(check_return);
1716
1717 if (check_return & REPOSITORY_EXISTS) {
1718 if (read_info(handler)) {
1719 msg = "Error reading relay log configuration";
1720 error = 1;
1721 goto err;
1722 }
1723
1724 /*
1725 Clone required cleanup must be done only once, thence we only
1726 do it when server is booting.
1727 */
1728 if (clone_startup && get_server_state() == SERVER_BOOTING) {
1729 char *channel_name =
1730 (const_cast<Relay_log_info *>(mi->rli))->get_channel();
1731 bool is_group_replication_channel =
1732 channel_map.is_group_replication_channel_name(channel_name);
1733 if (is_group_replication_channel) {
1734 if (clear_info()) {
1735 msg =
1736 "Error cleaning relay log configuration for group replication "
1737 "after clone";
1738 error = 1;
1739 goto err;
1740 }
1741 if (Rpl_info_factory::reset_workers(this)) {
1742 msg =
1743 "Error cleaning relay log worker configuration for group "
1744 "replication after clone";
1745 error = 1;
1746 goto err;
1747 }
1748 check_return = REPOSITORY_CLEARED;
1749 } else {
1750 if (!is_relay_log_recovery) {
1751 LogErr(WARNING_LEVEL, ER_RPL_RELAY_LOG_RECOVERY_INFO_AFTER_CLONE,
1752 channel_name);
1753 // After a clone if we detect information is present we always invoke
1754 // relay log recovery. Not doing so would probably mean failure at
1755 // initialization due to missing relay log files.
1756 if (init_recovery(mi)) {
1757 msg = "Error on the relay log recovery after a clone operation";
1758 error = 1;
1759 goto err;
1760 }
1761 }
1762 }
1763 }
1764 }
1765
1766 if (check_return == REPOSITORY_DOES_NOT_EXIST || // Hasn't been initialized
1767 check_return == REPOSITORY_CLEARED // Was initialized but was RESET
1768 ) {
1769 /* Init relay log with first entry in the relay index file */
1770 if (reset_group_relay_log_pos(&msg)) {
1771 error = 1;
1772 goto err;
1773 }
1774 group_master_log_name[0] = 0;
1775 group_master_log_pos = 0;
1776 } else {
1777 if (is_relay_log_recovery && init_recovery(mi)) {
1778 error = 1;
1779 goto err;
1780 }
1781
1782 if (is_group_relay_log_name_invalid(&msg)) {
1783 LogErr(ERROR_LEVEL, ER_RPL_MTS_RECOVERY_CANT_OPEN_RELAY_LOG,
1784 group_relay_log_name, std::to_string(group_relay_log_pos).c_str());
1785 error = 1;
1786 goto err;
1787 }
1788 }
1789
1790 inited = true;
1791 error_on_rli_init_info = false;
1792 if (flush_info(true)) {
1793 msg = "Error reading relay log configuration";
1794 error = 1;
1795 goto err;
1796 }
1797
1798 if (count_relay_log_space()) {
1799 msg = "Error counting relay log space";
1800 error = 1;
1801 goto err;
1802 }
1803
1804 /*
1805 In case of MTS the recovery is deferred until the end of
1806 load_mi_and_rli_from_repositories.
1807 */
1808 if (!mi->rli->mts_recovery_group_cnt) is_relay_log_recovery = false;
1809 return error;
1810
1811 err:
1812 handler->end_info();
1813 inited = false;
1814 error_on_rli_init_info = true;
1815 if (msg) LogErr(ERROR_LEVEL, ER_RPL_RLI_INIT_INFO_MSG, msg);
1816 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1817 true /*need_lock_log=true*/, true /*need_lock_index=true*/);
1818 return error;
1819 }
1820
end_info()1821 void Relay_log_info::end_info() {
1822 DBUG_TRACE;
1823
1824 error_on_rli_init_info = false;
1825 if (!inited) return;
1826
1827 handler->end_info();
1828
1829 inited = false;
1830 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1831 true /*need_lock_log=true*/, true /*need_lock_index=true*/);
1832 relay_log.harvest_bytes_written(this, true /*need_log_space_lock=true*/);
1833 /*
1834 Delete the slave's temporary tables from memory.
1835 In the future there will be other actions than this, to ensure persistance
1836 of slave's temp tables after shutdown.
1837 */
1838 close_temporary_tables();
1839 }
1840
flush_current_log()1841 int Relay_log_info::flush_current_log() {
1842 DBUG_TRACE;
1843
1844 /* When we come to this place in code, relay log may or not be initialized; */
1845 if (relay_log.flush()) return 2;
1846
1847 return 0;
1848 }
1849
set_master_info(Master_info * info)1850 void Relay_log_info::set_master_info(Master_info *info) { mi = info; }
1851
1852 /**
  Stores the file and position where the execute-slave thread is in the
  relay log:
1855
1856 - As this is only called by the slave thread or on STOP SLAVE, with the
1857 log_lock grabbed and the slave thread stopped, we don't need to have
1858 a lock here.
  - If there is an active transaction, then we don't update the position
    in the relay log. This is to ensure that we re-execute statements
    if we die in the middle of a transaction that was rolled back.
1862 - As a transaction never spans binary logs, we don't have to handle the
1863 case where we do a relay-log-rotation in the middle of the transaction.
1864 If this would not be the case, we would have to ensure that we
1865 don't delete the relay log file where the transaction started when
1866 we switch to a new relay log file.
1867
1868 @retval 0 ok,
1869 @retval 1 write error, otherwise.
1870 */
1871
1872 /**
1873 Store the file and position where the slave's SQL thread are in the
1874 relay log.
1875
1876 Notes:
1877
1878 - This function should be called either from the slave SQL thread,
1879 or when the slave thread is not running. (It reads the
1880 group_{relay|master}_log_{pos|name} and delay fields in the rli
1881 object. These may only be modified by the slave SQL thread or by
1882 a client thread when the slave SQL thread is not running.)
1883
1884 - If there is an active transaction, then we do not update the
1885 position in the relay log. This is to ensure that we re-execute
1886 statements if we die in the middle of an transaction that was
1887 rolled back.
1888
1889 - As a transaction never spans binary logs, we don't have to handle
1890 the case where we do a relay-log-rotation in the middle of the
1891 transaction. If transactions could span several binlogs, we would
1892 have to ensure that we do not delete the relay log file where the
1893 transaction started before switching to a new relay log file.
1894
1895 - Error can happen if writing to file fails or if flushing the file
1896 fails.
1897
1898 @todo Change the log file information to a binary format to avoid
1899 calling longlong2str.
1900
1901 @return 0 on success, 1 on error.
1902 */
flush_info(const bool force)1903 int Relay_log_info::flush_info(const bool force) {
1904 DBUG_TRACE;
1905
1906 if (!inited) return 0;
1907
1908 /*
1909 We update the sync_period at this point because only here we
1910 now that we are handling a relay log info. This needs to be
1911 update every time we call flush because the option maybe
1912 dinamically set.
1913 */
1914 mysql_mutex_lock(&mts_temp_table_LOCK);
1915 handler->set_sync_period(sync_relayloginfo_period);
1916
1917 if (write_info(handler)) goto err;
1918
1919 if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
1920 goto err;
1921
1922 force_flush_postponed_due_to_split_trans = false;
1923 mysql_mutex_unlock(&mts_temp_table_LOCK);
1924 return 0;
1925
1926 err:
1927 LogErr(ERROR_LEVEL, ER_RPL_ERROR_WRITING_RELAY_LOG_CONFIGURATION);
1928 mysql_mutex_unlock(&mts_temp_table_LOCK);
1929 return 1;
1930 }
1931
/**
  Refine a REPOSITORY_EXISTS check result by looking at the stored data.

  When the repository exists, its first field (the number of lines) and,
  if that number is at least LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_ROW_FORMAT,
  its second field (the relay log name) are read. An empty relay log name
  means the repository was cleared.

  @param previous_result  Result of the earlier repository check.

  @return ERROR_CHECKING_REPOSITORY if reading the repository failed,
          REPOSITORY_CLEARED if the stored relay log name is empty,
          previous_result otherwise.
*/
enum_return_check Relay_log_info::check_if_info_was_cleared(
    const enum_return_check &previous_result) const {
  if (previous_result != REPOSITORY_EXISTS) return previous_result;

  char line_count_field[FN_REFLEN] = {0};

  if (handler->prepare_info_for_read() ||
      !!handler->get_info(line_count_field, sizeof(line_count_field), ""))
    return ERROR_CHECKING_REPOSITORY;

  char *parse_end{nullptr};
  int lines = strtoul(line_count_field, &parse_end, 10);

  /* The field counts as a line number only if it is non-empty and all digits */
  const bool is_line_count =
      line_count_field[0] != '\0' && *parse_end == '\0';

  if (!is_line_count ||
      lines < LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_ROW_FORMAT)
    return previous_result;

  char stored_log_name[FN_REFLEN] = {0};

  if (handler->get_info(stored_log_name, sizeof(stored_log_name), "") ==
      Rpl_info_handler::enum_field_get_status::FAILURE)
    return ERROR_CHECKING_REPOSITORY;

  return stored_log_name[0] == '\0' ? REPOSITORY_CLEARED : previous_result;
}
1959
clear_info()1960 bool Relay_log_info::clear_info() {
1961 this->handler->init_info();
1962
1963 if (this->handler->prepare_info_for_write() ||
1964 this->handler->set_info((int)MAXIMUM_LINES_IN_RELAY_LOG_INFO_FILE) ||
1965 this->handler->set_info(nullptr) || this->handler->set_info(nullptr) ||
1966 this->handler->set_info(nullptr) || this->handler->set_info(nullptr) ||
1967 this->handler->set_info(nullptr) || this->handler->set_info(nullptr) ||
1968 this->handler->set_info(nullptr) ||
1969 this->handler->set_info(this->channel))
1970 return true;
1971
1972 if (this->m_privilege_checks_username.length() != 0) {
1973 if (this->handler->set_info(this->m_privilege_checks_username.c_str()))
1974 return true;
1975 } else {
1976 if (this->handler->set_info(nullptr)) {
1977 return true;
1978 }
1979 }
1980 if (this->m_privilege_checks_hostname.length() != 0) {
1981 if (this->handler->set_info(this->m_privilege_checks_hostname.c_str()))
1982 return true;
1983 } else {
1984 if (this->handler->set_info(nullptr)) return true;
1985 }
1986
1987 if (this->handler->set_info(this->m_require_row_format)) return true;
1988
1989 if (DBUG_EVALUATE_IF("rpl_rli_clear_info_error", true, false) ||
1990 this->handler->set_info((ulong)this->m_require_table_primary_key_check))
1991 return true;
1992
1993 if (this->handler->flush_info(true)) return true;
1994
1995 this->group_relay_log_name[0] = '\0';
1996 this->group_relay_log_pos = 0;
1997 this->group_master_log_name[0] = '\0';
1998 this->group_master_log_pos = 0;
1999 this->sql_delay = 0;
2000 this->recovery_parallel_workers = 0;
2001 this->internal_id = 1;
2002
2003 return false;
2004 }
2005
get_number_info_rli_fields()2006 size_t Relay_log_info::get_number_info_rli_fields() {
2007 return sizeof(info_rli_fields) / sizeof(info_rli_fields[0]);
2008 }
2009
set_nullable_fields(MY_BITMAP * nullable_fields)2010 void Relay_log_info::set_nullable_fields(MY_BITMAP *nullable_fields) {
2011 bitmap_init(nullable_fields, nullptr,
2012 Relay_log_info::get_number_info_rli_fields());
2013 bitmap_set_all(nullable_fields); // All fields may be NULL except for
2014 bitmap_clear_bit(nullable_fields, 0); // NUMBER_OF_LINES and
2015 bitmap_clear_bit(nullable_fields, 8); // CHANNEL_NAME
2016 }
2017
start_sql_delay(time_t delay_end)2018 void Relay_log_info::start_sql_delay(time_t delay_end) {
2019 mysql_mutex_assert_owner(&data_lock);
2020 sql_delay_end = delay_end;
2021 THD_STAGE_INFO(info_thd, stage_sql_thd_waiting_until_delay);
2022 }
2023
/**
  Read the relay log info fields from a repository handler into this
  object.

  Fields are read in the fixed order in which write_info() stores them.
  Which optional fields are read depends on the line count found in the
  first field. Positions, delay and id are read into temporaries and
  copied to the members only after all reads have succeeded; the log
  names, the channel and recovery_parallel_workers are read directly
  into the members.

  @param from  Handler to read the fields from.

  @return false on success; true if a field could not be read, the
          primary key check value is out of range, or the privilege
          checks user could not be set.
*/
bool Relay_log_info::read_info(Rpl_info_handler *from) {
  int lines = 0;
  char *first_non_digit = nullptr;
  ulong temp_group_relay_log_pos = 0;
  ulong temp_group_master_log_pos = 0;
  int temp_sql_delay = 0;
  int temp_internal_id = internal_id;
  int temp_require_row_format = 0;
  ulong temp_require_table_primary_key_check = Relay_log_info::PK_CHECK_STREAM;
  Rpl_info_handler::enum_field_get_status status{
      Rpl_info_handler::enum_field_get_status::FAILURE};

  DBUG_TRACE;

  /*
    Should not read RLI from file in client threads. Client threads
    only use RLI to execute BINLOG statements.

    @todo Uncomment the following assertion. Currently,
    Relay_log_info::init() is called from init_master_info() before
    the THD object Relay_log_info::sql_thd is created. That means we
    cannot call belongs_to_client() since belongs_to_client()
    dereferences Relay_log_info::sql_thd. So we need to refactor
    slightly: the THD object should be created by Relay_log_info
    constructor (or passed to it), so that we are guaranteed that it
    exists at this point. /Sven
  */
  // DBUG_ASSERT(!belongs_to_client());

  /*
    Starting from 5.1.x, relay-log.info has a new format. Now, its
    first line contains the number of lines in the file. By reading
    this number we can determine which version our relay-log.info comes
    from. We can't simply count the lines in the file, since
    versions before 5.1.x could generate files with more lines than
    needed. If the first line doesn't contain a number, or if it
    contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
    then the file is treated like a file from a pre-5.1.x version.
    There is no ambiguity when reading an old relay-log.info: before
    5.1.x, the first line contained the relay log's name, which is
    either empty or has an extension (contains a '.'), so it can't be
    confused with an integer.

    So we just read the first line and try to figure out which
    version this is.
  */

  /*
    The first row is temporarily stored in group_relay_log_name. If
    it is a line count and not a relay log name (new format) it will be
    overwritten by the second row below.
  */
  if (from->prepare_info_for_read() ||
      !!from->get_info(group_relay_log_name, sizeof(group_relay_log_name), ""))
    return true;

  lines = strtoul(group_relay_log_name, &first_non_digit, 10);

  /* New format only if the whole first row is a large enough number */
  if (group_relay_log_name[0] != '\0' && *first_non_digit == '\0' &&
      lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) {
    /* Seems to be new format => read group relay log name */
    status =
        from->get_info(group_relay_log_name, sizeof(group_relay_log_name), "");
    if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
    if (status == Rpl_info_handler::enum_field_get_status::FIELD_VALUE_IS_NULL)
      group_relay_log_name[0] = '\0';
  } else
    DBUG_PRINT("info", ("relay_log_info file is in old format."));

  /* Relay log position; defaults to just past the binlog header */
  status =
      from->get_info(&temp_group_relay_log_pos, (ulong)BIN_LOG_HEADER_SIZE);
  if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;

  /* Master log name; a NULL field is treated as an empty name */
  status =
      from->get_info(group_master_log_name, sizeof(group_master_log_name), "");
  if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
  if (status == Rpl_info_handler::enum_field_get_status::FIELD_VALUE_IS_NULL)
    group_master_log_name[0] = '\0';

  status = from->get_info(&temp_group_master_log_pos, 0UL);
  if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;

  /*
    From here on, each optional field is read only when the line count
    says the repository contains it.

    NOTE(review): these checks use `lines` alone, without the "whole first
    row was numeric" condition tested above. Confirm that an old-format
    first row beginning with digits cannot make them pass unintentionally.
  */
  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) {
    status = from->get_info(&temp_sql_delay, 0);
    if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
  }

  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS) {
    status = from->get_info(&recovery_parallel_workers, 0UL);
    if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
  }

  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID) {
    status = from->get_info(&temp_internal_id, 1);
    if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
  }

  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL) {
    /* the default value is the empty string "" */
    if (!!from->get_info(channel, sizeof(channel), "")) return true;
  }

  /*
   * Here +4 is used to accommodate the string written when the debug point
   * `simulate_priv_check_username_above_limit` is set in a test.
   */
  char temp_privilege_checks_username[PRIV_CHECKS_USERNAME_LENGTH + 4] = {0};
  /* Stays nullptr unless a non-NULL user name is read (or simulated) */
  char *username = nullptr;
  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_PRIV_CHECKS_USERNAME) {
    status = from->get_info(temp_privilege_checks_username,
                            PRIV_CHECKS_USERNAME_LENGTH + 1, nullptr);
    if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
    if (status == Rpl_info_handler::enum_field_get_status::FIELD_VALUE_NOT_NULL)
      username = temp_privilege_checks_username;
  }

  /*
   * Here +4 is used to accommodate the string written when the debug point
   * `simulate_priv_check_hostname_above_limit` is set in a test.
   */
  char temp_privilege_checks_hostname[PRIV_CHECKS_HOSTNAME_LENGTH + 4] = {0};
  /* Stays nullptr unless a non-NULL host name is read (or simulated) */
  char *hostname = nullptr;
  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_PRIV_CHECKS_HOSTNAME) {
    status = from->get_info(temp_privilege_checks_hostname,
                            PRIV_CHECKS_HOSTNAME_LENGTH + 1, nullptr);
    if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
    if (status == Rpl_info_handler::enum_field_get_status::FIELD_VALUE_NOT_NULL)
      hostname = temp_privilege_checks_hostname;
  }

  /*
    When the repository has no require-row-format field, the flag is
    turned on for group replication channels and left off otherwise.
  */
  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_ROW_FORMAT) {
    if (!!from->get_info(&temp_require_row_format, 0)) return true;
  } else {
    if (channel_map.is_group_replication_channel_name(channel))
      temp_require_row_format = 1;
  }
  m_require_row_format = temp_require_row_format;

  if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_TABLE_PRIMARY_KEY_CHECK) {
    if (!!from->get_info(&temp_require_table_primary_key_check, 1)) return true;
  }
  /* Reject values outside the [PK_CHECK_STREAM, PK_CHECK_OFF] range */
  if (temp_require_table_primary_key_check < PK_CHECK_STREAM ||
      temp_require_table_primary_key_check > Relay_log_info::PK_CHECK_OFF)
    return true;
  m_require_table_primary_key_check =
      static_cast<Relay_log_info::enum_require_table_primary_key>(
          temp_require_table_primary_key_check);

  /* All reads succeeded: publish the values held in temporaries */
  group_relay_log_pos = temp_group_relay_log_pos;
  group_master_log_pos = temp_group_master_log_pos;
  sql_delay = (int32)temp_sql_delay;
  internal_id = (uint)temp_internal_id;

  /* Test hook: substitute a user name longer than the allowed limit */
  DBUG_EXECUTE_IF("simulate_priv_check_username_above_limit", {
    strcpy(temp_privilege_checks_username,
           "repli_priv_checks_user_more_than_32");
    username = temp_privilege_checks_username;
  });

  /* Test hook: substitute a host name longer than the allowed limit */
  DBUG_EXECUTE_IF("simulate_priv_check_hostname_above_limit", {
    strcpy(
        temp_privilege_checks_hostname,
        "replication_privilege_checks_hostname_more_than_255_replication_"
        "privilege_checks_hostname_more_than_255_replication_privilege_checks_"
        "hostname_more_than_255_replication_privilege_checks_hostname_more_"
        "than_255_replication_privilege_checks_hostname_more_than255");
    hostname = temp_privilege_checks_hostname;
  });
  enum_priv_checks_status error = set_privilege_checks_user(username, hostname);
  if (!!error) {
    /* Mark the stored user as corrupted and report it as a warning */
    set_privilege_checks_user_corrupted(true);
    report_privilege_check_error(WARNING_LEVEL, error, false, channel, username,
                                 hostname);
    return true;
  }

  return false;
}
2202
set_info_search_keys(Rpl_info_handler * to)2203 bool Relay_log_info::set_info_search_keys(Rpl_info_handler *to) {
2204 DBUG_TRACE;
2205
2206 if (to->set_info(LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL, channel)) return true;
2207
2208 return false;
2209 }
2210
/**
  Write all relay log info fields to a repository handler.

  The fields are written in a fixed order: line count, relay log name
  and position, master log name and position, SQL delay, number of
  recovery workers, internal id, channel name, privilege checks user
  name and host name, require-row-format flag and primary key check
  setting.

  @param to  Handler that receives the fields.

  @return false on success, true as soon as the handler reports an error.
*/
bool Relay_log_info::write_info(Rpl_info_handler *to) {
  DBUG_TRACE;

  /*
    @todo Uncomment the following assertion. See todo in
    Relay_log_info::read_info() for details. /Sven
  */
  // DBUG_ASSERT(!belongs_to_client());

  if (to->prepare_info_for_write() ||
      to->set_info((int)MAXIMUM_LINES_IN_RELAY_LOG_INFO_FILE) ||
      to->set_info(group_relay_log_name) ||
      to->set_info((ulong)group_relay_log_pos) ||
      to->set_info(group_master_log_name) ||
      to->set_info((ulong)group_master_log_pos) ||
      to->set_info((int)sql_delay) || to->set_info(recovery_parallel_workers) ||
      to->set_info((int)internal_id) || to->set_info(channel))
    return true;
  /* Privilege checks user name: written as NULL when it is empty */
  if (m_privilege_checks_username.length()) {
    if (to->set_info(m_privilege_checks_username.c_str())) return true;
  } else {
#ifndef DBUG_OFF
    /* Debug builds only: the test hook passes a null `const char *` instead
       of the nullptr literal. */
    if (DBUG_EVALUATE_IF("simulate_priv_check_user_nullptr_t", true, false)) {
      const char *null_char{nullptr};
      if (to->set_info(null_char)) return true;
    } else
#endif
        /* Regular path (and the only path when DBUG_OFF is defined) */
        if (to->set_info(nullptr)) {
      return true;
    }
  }
  /* Privilege checks host name: written as NULL when it is empty */
  if (m_privilege_checks_hostname.length()) {
    if (to->set_info(m_privilege_checks_hostname.c_str())) return true;
  } else {
#ifndef DBUG_OFF
    /* Debug builds only: the test hook passes a null `const uchar *` with
       length 0 instead of the nullptr literal. */
    if (DBUG_EVALUATE_IF("simulate_priv_check_user_nullptr_t", true, false)) {
      const uchar *null_char{nullptr};
      if (to->set_info(null_char, 0)) return true;
    } else
#endif
        /* Regular path (and the only path when DBUG_OFF is defined) */
        if (to->set_info(nullptr))
      return true;
  }

  if (to->set_info((int)m_require_row_format)) {
    return true; /* purecov: inspected */
  }

  if (to->set_info((ulong)m_require_table_primary_key_check)) {
    return true; /* purecov: inspected */
  }
  return false;
}
2264
2265 /**
2266 The method is run by SQL thread/MTS Coordinator.
2267 It replaces the current FD event with a new one.
2268 A version adaptation routine is invoked for the new FD
2269 to align the slave applier execution context with the master version.
2270
2271 Since FD are shared by Coordinator and Workers in the MTS mode,
2272 deletion of the old FD is done through decrementing its usage counter.
  The destructor runs when the latter drops to zero,
2274 also see @c Slave_worker::set_rli_description_event().
2275 The usage counter of the new FD is incremented.
2276
2277 Although notice that MTS worker runs it, inefficiently (see assert),
2278 once at its destruction time.
2279
2280 @param fe Pointer to be installed into execution context
2281 FormatDescriptor event
2282
2283 @return 1 if an error was encountered, 0 otherwise.
2284 */
2285
int Relay_log_info::set_rli_description_event(
    Format_description_log_event *fe) {
  DBUG_TRACE;
  /* An MTS worker may only call this with a null event */
  DBUG_ASSERT(!info_thd || !is_mts_worker(info_thd) || !fe);

  if (fe) {
    /* Adapt the execution context; non-zero means the version changed */
    ulong fe_version = adapt_to_master_version(fe);

    if (info_thd) {
      /* @see rpl_rli_pdb.h:Slave_worker::set_rli_description_event for a
         detailed explanation on the following code block's logic. */
      if (info_thd->variables.gtid_next.type == AUTOMATIC_GTID ||
          info_thd->variables.gtid_next.type == UNDEFINED_GTID) {
        bool in_active_multi_stmt =
            info_thd->in_active_multi_stmt_transaction();

        if (!is_in_group() && !in_active_multi_stmt) {
          DBUG_PRINT("info",
                     ("Setting gtid_next.type to NOT_YET_DETERMINED_GTID"));
          info_thd->variables.gtid_next.set_not_yet_determined();
        } else if (in_active_multi_stmt) {
          /* Error out; the new event is not installed in this case */
          my_error(ER_VARIABLE_NOT_SETTABLE_IN_TRANSACTION, MYF(0),
                   "gtid_next");
          return 1;
        }
      }

      if (is_parallel_exec() && fe_version > 0) {
        /*
          Prepare for workers' adaption to a new FD version. Workers
          will see notification through scheduling of a first event of
          a new post-new-FD.
        */
        for (Slave_worker **it = workers.begin(); it != workers.end(); ++it)
          (*it)->fd_change_notified = false;
      }
    }
  }
  /* Drop our reference to the currently installed event; delete it when
     this was the last reference. */
  if (rli_description_event &&
      --rli_description_event->atomic_usage_counter == 0)
    delete rli_description_event;
#ifndef DBUG_OFF
  else
    /* It must be MTS mode when the usage counter greater than 1. */
    DBUG_ASSERT(!rli_description_event || is_parallel_exec());
#endif
  /* Install the new event (possibly null) and take a reference on it */
  rli_description_event = fe;
  if (rli_description_event) ++rli_description_event->atomic_usage_counter;

  return 0;
}
2337
/**
  Describes one server feature whose presence depends on the server
  version, together with the actions that adapt a THD when the master
  version crosses the version in which the feature was introduced.
*/
struct st_feature_version {
  /*
    Identifier of the feature.
    The enum must be in the version non-descending top-down order,
    the last item formally corresponds to highest possible server
    version (never reached, thereby no adapting actions here);
    enumeration starts from zero.
  */
  enum {
    WL6292_TIMESTAMP_EXPLICIT_DEFAULT = 0,
    _END_OF_LIST  // always last
  } item;
  /*
    Version where the feature is introduced, as three version
    components.
  */
  uchar version_split[3];
  /*
    Action to perform when according to FormatDescriptor event Master
    is found to be feature-aware while previously it has *not* been.
  */
  void (*upgrade)(THD *);
  /*
    Action to perform when according to FormatDescriptor event Master
    is found to be feature-*un*aware while previously it has been.
  */
  void (*downgrade)(THD *);
};
2364
wl6292_upgrade_func(THD * thd)2365 static void wl6292_upgrade_func(THD *thd) {
2366 thd->variables.explicit_defaults_for_timestamp = false;
2367 if (global_system_variables.explicit_defaults_for_timestamp)
2368 thd->variables.explicit_defaults_for_timestamp = true;
2369
2370 return;
2371 }
2372
wl6292_downgrade_func(THD * thd)2373 static void wl6292_downgrade_func(THD *thd) {
2374 if (global_system_variables.explicit_defaults_for_timestamp)
2375 thd->variables.explicit_defaults_for_timestamp = false;
2376
2377 return;
2378 }
2379
/**
  Table of features that are sensitive to a Master-vs-Slave version
  difference. Entries must be listed in version non-descending order and
  in the same order as the st_feature_version enum; the table ends with
  the _END_OF_LIST sentinel, which has no actions.
*/
static st_feature_version s_features[] = {
    // order is the same as in the enum
    {st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
     {5, 6, 6},
     wl6292_upgrade_func,
     wl6292_downgrade_func},
    // sentinel: version 255.255.255, no upgrade/downgrade actions
    {st_feature_version::_END_OF_LIST, {255, 255, 255}, nullptr, nullptr}};
2391
2392 /**
2393 The method computes the incoming "master"'s FD server version and that
2394 of the currently installed (if ever) rli_description_event, to
2395 invoke more specific method to compare the two and adapt slave applier
2396 execution context to the new incoming master's version.
2397
2398 This method is specifically for STS applier/MTS Coordinator as well as
2399 for a user thread applying binlog events.
2400
2401 @param fdle a pointer to new Format Description event that is being
2402 set up a new execution context.
2403 @return 0 when the versions are equal,
2404 master_version otherwise
2405 */
adapt_to_master_version(Format_description_log_event * fdle)2406 ulong Relay_log_info::adapt_to_master_version(
2407 Format_description_log_event *fdle) {
2408 ulong master_version, current_version, slave_version;
2409
2410 slave_version = version_product(slave_version_split);
2411 /* When rli_description_event is uninitialized yet take the slave's version */
2412 master_version = !fdle ? slave_version : fdle->get_product_version();
2413 current_version = !rli_description_event
2414 ? slave_version
2415 : rli_description_event->get_product_version();
2416 return adapt_to_master_version_updown(master_version, current_version);
2417 }
2418
2419 /**
2420 The method compares two supplied versions and carries out down- or
2421 up- grade customization of execution context of the slave applier
2422 (thd).
2423
2424 The method is invoked in the STS case through
2425 Relay_log_info::adapt_to_master_version() right before a new master
2426 FD is installed into the applier execution context; in the MTS
2427 case it's done by the Worker when it's assigned with a first event
2428 after the latest new FD has been installed.
2429
2430 Comparison of the current (old, existing) and the master (new,
2431 incoming) versions yields adaptive actions.
2432 To explain that, let's denote V_0 as the current, and the master's
2433 one as V_1.
2434 In the downgrade case (V_1 < V_0) a server feature that is undefined
2435 in V_1 but is defined starting from some V_f of [V_1 + 1, V_0] range
2436 (+1 to mean V_1 excluded) are invalidated ("removed" from execution context)
2437 by running so called here downgrade action.
2438 Conversely in the upgrade case a feature defined in [V_0 + 1, V_1] range
2439 is validated ("added" to execution context) by running its upgrade action.
2440 A typical use case showing how adaptive actions are necessary for the slave
2441 applier is when the master version is lesser than the slave's one.
2442 In such case events generated on the "older" master may need to be applied
2443 in their native server context. And such context can be provided by downgrade
2444 actions.
2445 Conversely, when the old master events are run out and a newer master's events
2446 show up for applying, the execution context will be upgraded through
2447 the namesake actions.
2448
2449 Notice that a relay log may have two FD events, one the slave local
2450 and the other from the Master. As there's no concern for the FD
2451 originator this leads to two adapt_to_master_version() calls.
2452 It's not harmful as can be seen from the following example.
2453 Say the currently installed FD's version is
2454 V_m, then at relay-log rotation the following transition takes
2455 place:
2456
2457 V_m -adapt-> V_s -adapt-> V_m.
2458
2459 here and further `m' subscript stands for the master, `s' for the slave.
2460 It's clear that in this case an ineffective V_m -> V_m transition occurs.
2461
2462 At composing downgrade/upgrade actions keep in mind that the slave applier
2463 version transition goes the following route:
2464 The initial version is that of the slave server (V_ss).
2465 It changes to a magic 4.0 at the slave relay log initialization.
2466 In the following course versions are extracted from each FD read out,
2467 regardless of what server generated it. Here is a typical version
2468 transition sequence underscored with annotation:
2469
2470 V_ss -> 4.0 -> V(FD_s^1) -> V(FD_m^2) ---> V(FD_s^3) -> V(FD_m^4) ...
2471
2472 ---------- ----------------- -------- ------------------ ---
2473 bootstrap 1st relay log rotation 2nd log etc
2474
  The upper (^) superscript enumerates Format Description events, V(FD^i) stands
  for a function extracting the version data from the i:th FD.
2477
2478 There won't be any action to execute when info_thd is undefined,
2479 e.g at bootstrap.
2480
2481 @param master_version an upcoming new version
2482 @param current_version the current version
2483 @return 0 when the new version is equal to the current one,
2484 master_version otherwise
2485 */
adapt_to_master_version_updown(ulong master_version,ulong current_version)2486 ulong Relay_log_info::adapt_to_master_version_updown(ulong master_version,
2487 ulong current_version) {
2488 THD *thd = info_thd;
2489 /*
2490 When the SQL thread or MTS Coordinator executes this method
2491 there's a constraint on current_version argument.
2492 */
2493 DBUG_ASSERT(
2494 !thd || thd->rli_fake != nullptr ||
2495 thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER ||
2496 (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
2497 (!rli_description_event ||
2498 current_version == rli_description_event->get_product_version())));
2499
2500 if (master_version == current_version)
2501 return 0;
2502 else if (!thd)
2503 return master_version;
2504
2505 bool downgrade = master_version < current_version;
2506 /*
2507 find item starting from and ending at for which adaptive actions run
2508 for downgrade or upgrade branches.
2509 (todo: convert into bsearch when number of features will grow significantly)
2510 */
2511 long i, i_first = st_feature_version::_END_OF_LIST, i_last = i_first;
2512
2513 for (i = 0; i < st_feature_version::_END_OF_LIST; i++) {
2514 ulong ver_f = version_product(s_features[i].version_split);
2515
2516 if ((downgrade ? master_version : current_version) < ver_f &&
2517 i_first == st_feature_version::_END_OF_LIST)
2518 i_first = i;
2519 if ((downgrade ? current_version : master_version) < ver_f) {
2520 i_last = i;
2521 DBUG_ASSERT(i_last >= i_first);
2522 break;
2523 }
2524 }
2525
2526 /*
2527 actions, executed in version non-descending st_feature_version order
2528 */
2529 for (i = i_first; i < i_last; i++) {
2530 /* Run time check of the st_feature_version items ordering */
2531 DBUG_ASSERT(!i || version_product(s_features[i - 1].version_split) <=
2532 version_product(s_features[i].version_split));
2533
2534 DBUG_ASSERT((downgrade ? master_version : current_version) <
2535 version_product(s_features[i].version_split) &&
2536 (downgrade ? current_version
2537 : master_version >=
2538 version_product(s_features[i].version_split)));
2539
2540 if (downgrade && s_features[i].downgrade) {
2541 s_features[i].downgrade(thd);
2542 } else if (s_features[i].upgrade) {
2543 s_features[i].upgrade(thd);
2544 }
2545 }
2546
2547 return master_version;
2548 }
2549
relay_log_number_to_name(uint number,char name[FN_REFLEN+1])2550 void Relay_log_info::relay_log_number_to_name(uint number,
2551 char name[FN_REFLEN + 1]) {
2552 char *str = nullptr;
2553 char relay_bin_channel[FN_REFLEN + 1];
2554 const char *relay_log_basename_channel = add_channel_to_relay_log_name(
2555 relay_bin_channel, FN_REFLEN + 1, relay_log_basename);
2556
2557 /* str points to closing null of relay log basename channel */
2558 str = strmake(name, relay_log_basename_channel, FN_REFLEN + 1);
2559 *str++ = '.';
2560 sprintf(str, "%06u", number);
2561 }
2562
relay_log_name_to_number(const char * name)2563 uint Relay_log_info::relay_log_name_to_number(const char *name) {
2564 return static_cast<uint>(atoi(fn_ext(name) + 1));
2565 }
2566
is_mts_db_partitioned(Relay_log_info * rli)2567 bool is_mts_db_partitioned(Relay_log_info *rli) {
2568 return (rli->current_mts_submode->get_type() == MTS_PARALLEL_TYPE_DB_NAME);
2569 }
2570
get_for_channel_str(bool upper_case) const2571 const char *Relay_log_info::get_for_channel_str(bool upper_case) const {
2572 if (rli_fake || mi == nullptr)
2573 return "";
2574 else
2575 return mi->get_for_channel_str(upper_case);
2576 }
2577
add_gtid_set(const Gtid_set * gtid_set)2578 enum_return_status Relay_log_info::add_gtid_set(const Gtid_set *gtid_set) {
2579 DBUG_TRACE;
2580
2581 get_sid_lock()->wrlock();
2582 enum_return_status return_status = this->gtid_set->add_gtid_set(gtid_set);
2583 get_sid_lock()->unlock();
2584
2585 return return_status;
2586 }
2587
get_until_log_name()2588 const char *Relay_log_info::get_until_log_name() {
2589 if (until_condition == UNTIL_MASTER_POS ||
2590 until_condition == UNTIL_RELAY_POS) {
2591 DBUG_ASSERT(until_option != nullptr);
2592 return ((Until_position *)until_option)->get_until_log_name();
2593 }
2594 return "";
2595 }
2596
get_until_log_pos()2597 my_off_t Relay_log_info::get_until_log_pos() {
2598 if (until_condition == UNTIL_MASTER_POS ||
2599 until_condition == UNTIL_RELAY_POS) {
2600 DBUG_ASSERT(until_option != nullptr);
2601 return ((Until_position *)until_option)->get_until_log_pos();
2602 }
2603 return 0;
2604 }
2605
init_until_option(THD * thd,const LEX_MASTER_INFO * master_param)2606 int Relay_log_info::init_until_option(THD *thd,
2607 const LEX_MASTER_INFO *master_param) {
2608 DBUG_TRACE;
2609 int ret = 0;
2610 Until_option *option = nullptr;
2611
2612 until_condition = UNTIL_NONE;
2613 clear_until_option();
2614
2615 try {
2616 if (master_param->pos) {
2617 Until_master_position *until_mp = nullptr;
2618
2619 if (master_param->relay_log_pos) return ER_BAD_SLAVE_UNTIL_COND;
2620
2621 option = until_mp = new Until_master_position(this);
2622 until_condition = UNTIL_MASTER_POS;
2623 ret = until_mp->init(master_param->log_file_name, master_param->pos);
2624 } else if (master_param->relay_log_pos) {
2625 Until_relay_position *until_rp = nullptr;
2626
2627 if (master_param->pos) return ER_BAD_SLAVE_UNTIL_COND;
2628
2629 option = until_rp = new Until_relay_position(this);
2630 until_condition = UNTIL_RELAY_POS;
2631 ret = until_rp->init(master_param->relay_log_name,
2632 master_param->relay_log_pos);
2633 } else if (master_param->gtid) {
2634 Until_gtids *until_g = nullptr;
2635
2636 if (LEX_MASTER_INFO::UNTIL_SQL_BEFORE_GTIDS ==
2637 master_param->gtid_until_condition) {
2638 option = until_g = new Until_before_gtids(this);
2639 until_condition = UNTIL_SQL_BEFORE_GTIDS;
2640 } else {
2641 DBUG_ASSERT(LEX_MASTER_INFO::UNTIL_SQL_AFTER_GTIDS ==
2642 master_param->gtid_until_condition);
2643
2644 option = until_g = new Until_after_gtids(this);
2645 until_condition = UNTIL_SQL_AFTER_GTIDS;
2646 if (opt_slave_parallel_workers != 0) {
2647 opt_slave_parallel_workers = 0;
2648 push_warning_printf(
2649 thd, Sql_condition::SL_NOTE, ER_MTS_FEATURE_IS_NOT_SUPPORTED,
2650 ER_THD(thd, ER_MTS_FEATURE_IS_NOT_SUPPORTED), "UNTIL condtion",
2651 "Slave is started in the sequential execution mode.");
2652 }
2653 }
2654 ret = until_g->init(master_param->gtid);
2655 } else if (master_param->until_after_gaps) {
2656 Until_mts_gap *until_mg = nullptr;
2657
2658 option = until_mg = new Until_mts_gap(this);
2659 until_condition = UNTIL_SQL_AFTER_MTS_GAPS;
2660 until_mg->init();
2661 } else if (master_param->view_id) {
2662 Until_view_id *until_vi = nullptr;
2663
2664 option = until_vi = new Until_view_id(this);
2665 until_condition = UNTIL_SQL_VIEW_ID;
2666 ret = until_vi->init(master_param->view_id);
2667 }
2668 } catch (...) {
2669 return ER_OUTOFMEMORY;
2670 }
2671
2672 if (until_condition == UNTIL_MASTER_POS ||
2673 until_condition == UNTIL_RELAY_POS) {
2674 /* Issuing warning then started without --skip-slave-start */
2675 if (!opt_skip_slave_start)
2676 push_warning(thd, Sql_condition::SL_NOTE, ER_MISSING_SKIP_SLAVE,
2677 ER_THD(thd, ER_MISSING_SKIP_SLAVE));
2678 }
2679
2680 mysql_mutex_lock(&data_lock);
2681 until_option = option;
2682 mysql_mutex_unlock(&data_lock);
2683 return ret;
2684 }
2685
detach_engine_ha_data(THD * thd)2686 void Relay_log_info::detach_engine_ha_data(THD *thd) {
2687 is_engine_ha_data_detached = true;
2688 /*
2689 In case of slave thread applier or processing binlog by client,
2690 detach the engine ha_data ("native" engine transaction)
2691 in favor of dynamically created.
2692 */
2693 plugin_foreach(thd, detach_native_trx, MYSQL_STORAGE_ENGINE_PLUGIN, nullptr);
2694 }
2695
reattach_engine_ha_data(THD * thd)2696 void Relay_log_info::reattach_engine_ha_data(THD *thd) {
2697 is_engine_ha_data_detached = false;
2698 /*
2699 In case of slave thread applier or processing binlog by client,
2700 reattach the engine ha_data ("native" engine transaction)
2701 in favor of dynamically created.
2702 */
2703 plugin_foreach(thd, reattach_native_trx, MYSQL_STORAGE_ENGINE_PLUGIN,
2704 nullptr);
2705 }
2706
commit_positions()2707 bool Relay_log_info::commit_positions() {
2708 int error = 0;
2709 char saved_group_master_log_name[FN_REFLEN];
2710 my_off_t saved_group_master_log_pos;
2711 char saved_group_relay_log_name[FN_REFLEN];
2712 my_off_t saved_group_relay_log_pos;
2713
2714 /*
2715 In FILE case Query_log_event::update_pos(), called after commit,
2716 updates the info object.
2717 */
2718 if (!is_transactional()) return false;
2719
2720 mysql_mutex_lock(&data_lock);
2721
2722 /* save the rli positions to restore after flush_info() */
2723 strmake(saved_group_master_log_name, get_group_master_log_name(),
2724 FN_REFLEN - 1);
2725 saved_group_master_log_pos = get_group_master_log_pos();
2726 strmake(saved_group_relay_log_name, get_group_relay_log_name(),
2727 FN_REFLEN - 1);
2728 saved_group_relay_log_pos = get_group_relay_log_pos();
2729
2730 /* Update to new values just for the sake of flush_info */
2731 inc_event_relay_log_pos();
2732 set_group_relay_log_pos(get_event_relay_log_pos());
2733 set_group_relay_log_name(get_event_relay_log_name());
2734 set_group_master_log_pos(current_event->common_header->log_pos);
2735 /* save them too, but now for post_commit() time */
2736 strmake(new_group_master_log_name, get_group_master_log_name(),
2737 FN_REFLEN - 1);
2738 new_group_master_log_pos = get_group_master_log_pos();
2739 strmake(new_group_relay_log_name, get_group_relay_log_name(), FN_REFLEN - 1);
2740 new_group_relay_log_pos = get_group_relay_log_pos();
2741
2742 error = flush_info(true);
2743
2744 /*
2745 Restore the saved ones so they remain actual until the replicated
2746 statement commits.
2747 */
2748 set_group_master_log_name(saved_group_master_log_name);
2749 set_group_master_log_pos(saved_group_master_log_pos);
2750 set_group_relay_log_name(saved_group_relay_log_name);
2751 set_group_relay_log_pos(saved_group_relay_log_pos);
2752
2753 mysql_mutex_unlock(&data_lock);
2754
2755 return error != 0;
2756 }
2757
post_commit(bool on_rollback)2758 void Relay_log_info::post_commit(bool on_rollback) {
2759 THD *thd = info_thd;
2760
2761 if (on_rollback) {
2762 if (thd->owned_gtid_is_empty()) gtid_state->update_on_rollback(thd);
2763 } else {
2764 /*
2765 New executed coordinates prepared in pre_commit() are
2766 finally installed.
2767 */
2768 mysql_mutex_lock(&data_lock);
2769 set_group_master_log_name(new_group_master_log_name);
2770 set_group_master_log_pos(new_group_master_log_pos);
2771 set_group_relay_log_name(new_group_relay_log_name);
2772 set_group_relay_log_pos(new_group_relay_log_pos);
2773 mysql_mutex_unlock(&data_lock);
2774
2775 if (is_transactional()) {
2776 /* DDL's commit has been completed */
2777 DBUG_ASSERT(
2778 !current_event || !is_atomic_ddl_event(current_event) ||
2779 static_cast<Query_log_event *>(current_event)->has_ddl_committed);
2780 /*
2781 Marked as already-committed DDL may have not updated the GTID
2782 executed state, which is the case when slave filters it out.
2783 on slave side with binlog filtering out.
2784 When this is detected now the gtid state will be sorted later,
2785 and the gtid-executed based signaling will be done in the
2786 "last-chance-to-commit" branch of Log_event::do_update_pos().
2787 However in order to enter the branch has_ddl_committed needs false.
2788 */
2789 if (!thd->owned_gtid_is_empty())
2790 static_cast<Query_log_event *>(current_event)->has_ddl_committed =
2791 false;
2792
2793 mysql_mutex_lock(&data_lock);
2794 /* Post-commit cleanup for Relay_log_info::wait_for_pos() */
2795 if (is_group_master_log_pos_invalid)
2796 is_group_master_log_pos_invalid = false;
2797
2798 notify_group_master_log_name_update();
2799 mysql_cond_broadcast(&data_cond);
2800 mysql_mutex_unlock(&data_lock);
2801 } else {
2802 /*
2803 In the non-transactional slave info repository case should the
2804 current event be DDL it would still have to finalize commit.
2805 */
2806 DBUG_ASSERT(
2807 !current_event || !is_atomic_ddl_event(current_event) ||
2808 !static_cast<Query_log_event *>(current_event)->has_ddl_committed);
2809 }
2810 }
2811 }
2812
notify_relay_log_truncated()2813 void Relay_log_info::notify_relay_log_truncated() {
2814 mysql_mutex_lock(&data_lock);
2815 m_relay_log_truncated = true;
2816 mysql_mutex_unlock(&data_lock);
2817 }
2818
clear_relay_log_truncated()2819 void Relay_log_info::clear_relay_log_truncated() {
2820 mysql_mutex_assert_owner(&data_lock);
2821 m_relay_log_truncated = false;
2822 }
2823
is_time_for_mts_checkpoint()2824 bool Relay_log_info::is_time_for_mts_checkpoint() {
2825 if (is_parallel_exec() && opt_mts_checkpoint_period != 0) {
2826 struct timespec curr_clock;
2827 set_timespec_nsec(&curr_clock, 0);
2828 return diff_timespec(&curr_clock, &last_clock) >=
2829 opt_mts_checkpoint_period * 1000000ULL;
2830 }
2831 return false;
2832 }
2833
operator !(Relay_log_info::enum_priv_checks_status status)2834 bool operator!(Relay_log_info::enum_priv_checks_status status) {
2835 return status == Relay_log_info::enum_priv_checks_status::SUCCESS;
2836 }
2837
operator !(Relay_log_info::enum_require_row_status status)2838 bool operator!(Relay_log_info::enum_require_row_status status) {
2839 return status == Relay_log_info::enum_require_row_status::SUCCESS;
2840 }
2841
get_privilege_checks_username() const2842 std::string Relay_log_info::get_privilege_checks_username() const {
2843 return this->m_privilege_checks_username;
2844 }
2845
get_privilege_checks_hostname() const2846 std::string Relay_log_info::get_privilege_checks_hostname() const {
2847 return this->m_privilege_checks_hostname;
2848 }
2849
is_privilege_checks_user_null() const2850 bool Relay_log_info::is_privilege_checks_user_null() const {
2851 DBUG_ASSERT(this->m_privilege_checks_username.length() != 0 ||
2852 (this->m_privilege_checks_username.length() == 0 &&
2853 this->m_privilege_checks_hostname.length() == 0));
2854 return this->m_privilege_checks_username.length() == 0;
2855 }
2856
is_privilege_checks_user_corrupted() const2857 bool Relay_log_info::is_privilege_checks_user_corrupted() const {
2858 return this->m_privilege_checks_user_corrupted;
2859 }
2860
clear_privilege_checks_user()2861 void Relay_log_info::clear_privilege_checks_user() {
2862 DBUG_TRACE;
2863 this->m_privilege_checks_username.clear();
2864 this->m_privilege_checks_hostname.clear();
2865 this->m_privilege_checks_user_corrupted = false;
2866 }
2867
set_privilege_checks_user_corrupted(bool is_corrupted)2868 void Relay_log_info::set_privilege_checks_user_corrupted(bool is_corrupted) {
2869 DBUG_TRACE;
2870 this->m_privilege_checks_user_corrupted = is_corrupted;
2871 }
2872
2873 Relay_log_info::enum_priv_checks_status
set_privilege_checks_user(char const * param_privilege_checks_username,char const * param_privilege_checks_hostname)2874 Relay_log_info::set_privilege_checks_user(
2875 char const *param_privilege_checks_username,
2876 char const *param_privilege_checks_hostname) {
2877 DBUG_TRACE;
2878
2879 enum_priv_checks_status error = this->check_privilege_checks_user(
2880 param_privilege_checks_username, param_privilege_checks_hostname);
2881 if (!!error) return error;
2882
2883 if (param_privilege_checks_username == nullptr) {
2884 this->clear_privilege_checks_user();
2885 return enum_priv_checks_status::SUCCESS;
2886 }
2887
2888 this->m_privilege_checks_user_corrupted = false;
2889 this->m_privilege_checks_username =
2890 static_cast<std::string>(param_privilege_checks_username);
2891
2892 if (param_privilege_checks_hostname != nullptr) {
2893 this->m_privilege_checks_hostname =
2894 static_cast<std::string>(param_privilege_checks_hostname);
2895 std::transform(this->m_privilege_checks_hostname.begin(),
2896 this->m_privilege_checks_hostname.end(),
2897 this->m_privilege_checks_hostname.begin(), ::tolower);
2898 }
2899
2900 return enum_priv_checks_status::SUCCESS;
2901 }
2902
2903 Relay_log_info::enum_priv_checks_status
check_privilege_checks_user()2904 Relay_log_info::check_privilege_checks_user() {
2905 DBUG_TRACE;
2906 DBUG_ASSERT(this->m_privilege_checks_username.length() != 0 ||
2907 (this->m_privilege_checks_username.length() == 0 &&
2908 this->m_privilege_checks_hostname.length() == 0));
2909
2910 return this->check_privilege_checks_user(
2911 this->m_privilege_checks_username.length() != 0
2912 ? this->m_privilege_checks_username.data()
2913 : nullptr,
2914 this->m_privilege_checks_hostname.length() != 0
2915 ? this->m_privilege_checks_hostname.data()
2916 : nullptr);
2917 }
2918
2919 Relay_log_info::enum_priv_checks_status
check_privilege_checks_user(char const * param_privilege_checks_username,char const * param_privilege_checks_hostname)2920 Relay_log_info::check_privilege_checks_user(
2921 char const *param_privilege_checks_username,
2922 char const *param_privilege_checks_hostname) {
2923 DBUG_TRACE;
2924
2925 if (param_privilege_checks_username == nullptr) {
2926 if (param_privilege_checks_hostname != nullptr)
2927 return enum_priv_checks_status::USERNAME_NULL_HOSTNAME_NOT_NULL;
2928 return enum_priv_checks_status::SUCCESS;
2929 }
2930
2931 if (strlen(param_privilege_checks_username) == 0)
2932 return enum_priv_checks_status::USER_ANONYMOUS;
2933
2934 if (strlen(param_privilege_checks_username) > 32)
2935 return enum_priv_checks_status::USERNAME_TOO_LONG;
2936
2937 if (param_privilege_checks_hostname != nullptr &&
2938 strlen(param_privilege_checks_hostname) > 255)
2939 return enum_priv_checks_status::HOSTNAME_TOO_LONG;
2940
2941 if (param_privilege_checks_hostname != nullptr) {
2942 std::regex static const hostname_regex{"^((?![@])[\\x20-\\x7e])+$",
2943 std::regex_constants::ECMAScript};
2944 if (!std::regex_match(param_privilege_checks_hostname, hostname_regex))
2945 return enum_priv_checks_status::HOSTNAME_SYNTAX_ERROR;
2946 }
2947
2948 enum_priv_checks_status error = this->check_applier_acl_user(
2949 param_privilege_checks_username, param_privilege_checks_hostname);
2950 if (!!error) return error;
2951
2952 return enum_priv_checks_status::SUCCESS;
2953 }
2954
check_applier_acl_user(char const * param_privilege_checks_username,char const * param_privilege_checks_hostname)2955 Relay_log_info::enum_priv_checks_status Relay_log_info::check_applier_acl_user(
2956 char const *param_privilege_checks_username,
2957 char const *param_privilege_checks_hostname) {
2958 DBUG_TRACE;
2959 DBUG_ASSERT(param_privilege_checks_username != nullptr &&
2960 strlen(param_privilege_checks_username) != 0);
2961
2962 THD_instance_guard thd{current_thd != nullptr ? current_thd : this->info_thd};
2963 Acl_cache_lock_guard acl_cache_lock{thd, Acl_cache_lock_mode::READ_MODE};
2964 if (!acl_cache_lock.lock()) { // If we're unable to acquire the lock we're
2965 // unable to check if the user exists
2966 return enum_priv_checks_status::USER_DOES_NOT_EXIST; // so, return
2967 // accordingly.
2968 }
2969
2970 ACL_USER *applier_acl_user = find_acl_user(
2971 param_privilege_checks_hostname, param_privilege_checks_username, false);
2972
2973 if (applier_acl_user == nullptr) {
2974 return enum_priv_checks_status::USER_DOES_NOT_EXIST;
2975 }
2976
2977 return enum_priv_checks_status::SUCCESS;
2978 }
2979
2980 std::pair<const char *, const char *>
print_applier_security_context_user_host() const2981 Relay_log_info::print_applier_security_context_user_host() const {
2982 if (this->m_privilege_checks_username.length() != 0) {
2983 return {this->m_privilege_checks_username.data(),
2984 this->m_privilege_checks_hostname.length() == 0
2985 ? "%"
2986 : this->m_privilege_checks_hostname.data()};
2987 } else if (this->info_thd != nullptr &&
2988 this->info_thd->security_context() != nullptr) {
2989 return {this->info_thd->security_context()->user().str != nullptr
2990 ? this->info_thd->security_context()->user().str
2991 : "",
2992 this->info_thd->security_context()->host().str != nullptr
2993 ? this->info_thd->security_context()->host().str
2994 : ""};
2995 }
2996 return {"", ""};
2997 }
2998
report_privilege_check_error(enum loglevel level,enum_priv_checks_status status_code,bool to_client,char const * channel_name_arg,char const * user_name_arg,char const * host_name_arg) const2999 void Relay_log_info::report_privilege_check_error(
3000 enum loglevel level, enum_priv_checks_status status_code, bool to_client,
3001 char const *channel_name_arg, char const *user_name_arg,
3002 char const *host_name_arg) const {
3003 DBUG_TRACE;
3004
3005 char const *channel_name{channel_name_arg != nullptr ? channel_name_arg
3006 : get_channel()};
3007 char const *user_name{user_name_arg};
3008 char const *host_name{host_name_arg};
3009 if (user_name == nullptr) {
3010 std::tie(user_name, host_name) =
3011 this->print_applier_security_context_user_host();
3012 }
3013
3014 switch (status_code) {
3015 case enum_priv_checks_status::SUCCESS: {
3016 DBUG_ASSERT(false);
3017 break;
3018 }
3019 case enum_priv_checks_status::USER_ANONYMOUS: {
3020 if (to_client)
3021 my_error(ER_CLIENT_PRIVILEGE_CHECKS_USER_CANNOT_BE_ANONYMOUS, MYF(0),
3022 channel_name, host_name);
3023 else {
3024 THD_instance_guard thd{current_thd != nullptr ? current_thd
3025 : this->info_thd};
3026 this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3027 ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3028 channel_name);
3029 }
3030 break;
3031 }
3032 case enum_priv_checks_status::USERNAME_TOO_LONG: {
3033 if (to_client)
3034 my_error(ER_WRONG_STRING_LENGTH, MYF(0), user_name, "user name", 32);
3035 else {
3036 THD_instance_guard thd{current_thd != nullptr ? current_thd
3037 : this->info_thd};
3038 this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3039 ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3040 channel_name);
3041 }
3042 break;
3043 }
3044 case enum_priv_checks_status::HOSTNAME_TOO_LONG: {
3045 if (to_client)
3046 my_error(ER_WRONG_STRING_LENGTH, MYF(0), user_name, "host name", 255);
3047 else {
3048 THD_instance_guard thd{current_thd != nullptr ? current_thd
3049 : this->info_thd};
3050 this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3051 ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3052 channel_name);
3053 }
3054 break;
3055 }
3056 case enum_priv_checks_status::HOSTNAME_SYNTAX_ERROR: {
3057 if (to_client)
3058 my_printf_error(ER_UNKNOWN_ERROR, "Malformed hostname (illegal symbol)",
3059 MYF(0));
3060 else {
3061 THD_instance_guard thd{current_thd != nullptr ? current_thd
3062 : this->info_thd};
3063 this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3064 ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3065 channel_name);
3066 }
3067 break;
3068 }
3069 case enum_priv_checks_status::USER_DATA_CORRUPTED:
3070 case enum_priv_checks_status::USERNAME_NULL_HOSTNAME_NOT_NULL: {
3071 if (to_client)
3072 my_error(ER_CLIENT_PRIVILEGE_CHECKS_USER_CORRUPT, MYF(0), channel_name);
3073 else {
3074 THD_instance_guard thd{current_thd != nullptr ? current_thd
3075 : this->info_thd};
3076 this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3077 ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3078 channel_name);
3079 }
3080 break;
3081 }
3082 case enum_priv_checks_status::USER_DOES_NOT_EXIST: {
3083 if (to_client)
3084 my_error(ER_CLIENT_PRIVILEGE_CHECKS_USER_DOES_NOT_EXIST, MYF(0),
3085 channel_name, user_name, host_name);
3086 else {
3087 THD_instance_guard thd{current_thd != nullptr ? current_thd
3088 : this->info_thd};
3089 this->report(
3090 level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_DOES_NOT_EXIST,
3091 ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_DOES_NOT_EXIST),
3092 channel_name, user_name, host_name);
3093 }
3094 break;
3095 }
3096 case enum_priv_checks_status::USER_DOES_NOT_HAVE_PRIVILEGES: {
3097 if (!to_client) {
3098 THD_instance_guard thd{current_thd != nullptr ? current_thd
3099 : this->info_thd};
3100 this->report(
3101 level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_NEEDS_RPL_APPLIER_PRIV,
3102 ER_THD(thd,
3103 ER_WARN_LOG_PRIVILEGE_CHECKS_USER_NEEDS_RPL_APPLIER_PRIV),
3104 channel_name, user_name, host_name);
3105 }
3106 break;
3107 }
3108 case enum_priv_checks_status::LOAD_DATA_EVENT_NOT_ALLOWED: {
3109 if (!to_client) {
3110 THD_instance_guard thd{current_thd != nullptr ? current_thd
3111 : this->info_thd};
3112 this->report(level, ER_FILE_PRIVILEGE_FOR_REPLICATION_CHECKS,
3113 ER_THD(thd, ER_FILE_PRIVILEGE_FOR_REPLICATION_CHECKS),
3114 channel_name);
3115 }
3116 break;
3117 }
3118 }
3119 }
3120
3121 Relay_log_info::enum_priv_checks_status
initialize_security_context(THD * thd)3122 Relay_log_info::initialize_security_context(THD *thd) {
3123 DBUG_TRACE;
3124
3125 if (this->m_privilege_checks_user_corrupted)
3126 return enum_priv_checks_status::USER_DATA_CORRUPTED;
3127
3128 if (this->m_privilege_checks_username.length() != 0) {
3129 if (acl_getroot(thd, &thd->m_main_security_ctx,
3130 this->m_privilege_checks_username.data(),
3131 this->m_privilege_checks_hostname.data(), nullptr,
3132 nullptr)) {
3133 return enum_priv_checks_status::USER_DOES_NOT_EXIST;
3134 }
3135
3136 Roles::Role_activation role_activation{
3137 thd, thd->security_context(),
3138 opt_always_activate_granted_roles == 0 ? role_enum::ROLE_DEFAULT
3139 : role_enum::ROLE_ALL,
3140 nullptr, true};
3141 if (role_activation.activate())
3142 return enum_priv_checks_status::USER_DOES_NOT_HAVE_PRIVILEGES;
3143
3144 bool has_grant{false};
3145 std::tie(has_grant, std::ignore) =
3146 thd->m_main_security_ctx.has_global_grant(
3147 STRING_WITH_LEN("REPLICATION_APPLIER"));
3148 if (!has_grant) {
3149 return enum_priv_checks_status::USER_DOES_NOT_HAVE_PRIVILEGES;
3150 }
3151 } else
3152 thd->security_context()->skip_grants();
3153
3154 return enum_priv_checks_status::SUCCESS;
3155 }
3156
3157 Relay_log_info::enum_priv_checks_status
initialize_applier_security_context()3158 Relay_log_info::initialize_applier_security_context() {
3159 DBUG_TRACE;
3160 return this->initialize_security_context(this->info_thd);
3161 }
3162
is_row_format_required() const3163 bool Relay_log_info::is_row_format_required() const {
3164 return this->m_require_row_format;
3165 }
3166
set_require_row_format(bool require_row)3167 void Relay_log_info::set_require_row_format(bool require_row) {
3168 DBUG_TRACE;
3169 this->m_require_row_format = require_row;
3170 }
3171
3172 Relay_log_info::enum_require_table_primary_key
get_require_table_primary_key_check() const3173 Relay_log_info::get_require_table_primary_key_check() const {
3174 return this->m_require_table_primary_key_check;
3175 }
3176
set_require_table_primary_key_check(Relay_log_info::enum_require_table_primary_key require_pk)3177 void Relay_log_info::set_require_table_primary_key_check(
3178 Relay_log_info::enum_require_table_primary_key require_pk) {
3179 DBUG_TRACE;
3180 this->m_require_table_primary_key_check = require_pk;
3181 }
3182
MDL_lock_guard(THD * target)3183 MDL_lock_guard::MDL_lock_guard(THD *target) : m_target{target} { DBUG_TRACE; }
3184
MDL_lock_guard(THD * target,MDL_key::enum_mdl_namespace namespace_arg,enum_mdl_type mdl_type_arg,bool blocking)3185 MDL_lock_guard::MDL_lock_guard(THD *target,
3186 MDL_key::enum_mdl_namespace namespace_arg,
3187 enum_mdl_type mdl_type_arg, bool blocking)
3188 : m_target{target} {
3189 DBUG_TRACE;
3190 this->lock(namespace_arg, mdl_type_arg, blocking);
3191 }
3192
lock(MDL_key::enum_mdl_namespace namespace_arg,enum_mdl_type mdl_type_arg,bool blocking)3193 bool MDL_lock_guard::lock(MDL_key::enum_mdl_namespace namespace_arg,
3194 enum_mdl_type mdl_type_arg, bool blocking) {
3195 DBUG_TRACE;
3196 if (this->m_target != nullptr &&
3197 !this->m_target->mdl_context.has_locks(namespace_arg)) {
3198 MDL_REQUEST_INIT(&this->m_request, namespace_arg, "", "", mdl_type_arg,
3199 MDL_EXPLICIT);
3200
3201 if (blocking)
3202 this->m_target->mdl_context.acquire_lock(
3203 &this->m_request, this->m_target->variables.lock_wait_timeout);
3204 else
3205 this->m_target->mdl_context.try_acquire_lock(&this->m_request);
3206
3207 return !this->is_locked();
3208 }
3209 return true;
3210 }
3211
~MDL_lock_guard()3212 MDL_lock_guard::~MDL_lock_guard() {
3213 DBUG_TRACE;
3214 if (this->m_request.ticket != nullptr) {
3215 this->m_target->mdl_context.release_lock(this->m_request.ticket);
3216 }
3217 }
3218
is_locked()3219 bool MDL_lock_guard::is_locked() { return this->m_request.ticket != nullptr; }
3220
Applier_security_context_guard(Relay_log_info const * rli,THD const * thd)3221 Applier_security_context_guard::Applier_security_context_guard(
3222 Relay_log_info const *rli, THD const *thd)
3223 : m_target{rli},
3224 m_thd{thd},
3225 m_current{nullptr},
3226 m_previous{nullptr},
3227 m_privilege_checks_none{
3228 this->m_target->get_privilege_checks_username().length() == 0 &&
3229 (this->m_thd->lex == nullptr ||
3230 this->m_thd->lex->binlog_stmt_arg.length == 0 ||
3231 this->m_thd->lex->binlog_stmt_arg.length >= 2048 ||
3232 this->m_thd->lex->binlog_stmt_arg.str == nullptr)} {
3233 DBUG_TRACE;
3234
3235 if (this->m_thd->lex != nullptr &&
3236 this->m_thd->lex->binlog_stmt_arg.length != 0 &&
3237 this->m_thd->lex->binlog_stmt_arg.length < 2048 &&
3238 this->m_thd->lex->binlog_stmt_arg.str != nullptr) {
3239 Security_context *standing_ctx = this->m_thd->security_context();
3240 if (standing_ctx != nullptr &&
3241 (standing_ctx->check_access(SUPER_ACL) ||
3242 standing_ctx->has_global_grant(STRING_WITH_LEN("BINLOG_ADMIN")).first))
3243 this->m_privilege_checks_none = true;
3244 }
3245 if (this->m_privilege_checks_none) return;
3246
3247 if (this->m_target->get_privilege_checks_username().length() != 0) {
3248 std::string user = this->m_target->get_privilege_checks_username();
3249 std::string host = this->m_target->get_privilege_checks_hostname();
3250 LEX_CSTRING username{user.c_str(), user.length()};
3251 LEX_CSTRING hostname{host.c_str(), host.length()};
3252
3253 if (this->m_applier_security_ctx.change_security_context(
3254 const_cast<THD *>(this->m_thd), username, hostname, nullptr,
3255 &this->m_previous)) {
3256 return;
3257 }
3258 }
3259
3260 if (this->m_previous != nullptr) {
3261 Roles::Role_activation role_activation{
3262 const_cast<THD *>(this->m_thd), this->m_thd->security_context(),
3263 opt_always_activate_granted_roles == 0 ? role_enum::ROLE_DEFAULT
3264 : role_enum::ROLE_ALL,
3265 nullptr, true};
3266 if (role_activation.activate()) return;
3267 }
3268
3269 this->m_current = this->m_thd->security_context();
3270 }
3271
~Applier_security_context_guard()3272 Applier_security_context_guard::~Applier_security_context_guard() {
3273 if (this->m_privilege_checks_none) return;
3274
3275 if (this->m_previous != nullptr && this->m_previous != this->m_current)
3276 this->m_applier_security_ctx.restore_security_context(
3277 const_cast<THD *>(this->m_thd), this->m_previous);
3278 }
3279
skip_priv_checks() const3280 bool Applier_security_context_guard::skip_priv_checks() const {
3281 return this->m_privilege_checks_none;
3282 }
3283
has_access(std::initializer_list<ulong> extra_privileges) const3284 bool Applier_security_context_guard::has_access(
3285 std::initializer_list<ulong> extra_privileges) const {
3286 if (this->m_privilege_checks_none) return true;
3287 if (this->m_current == nullptr) return false;
3288
3289 for (auto privilege : extra_privileges)
3290 if (!this->m_current->check_access(privilege, "", true)) return false;
3291
3292 return true;
3293 }
3294
has_access(std::initializer_list<std::string> extra_privileges) const3295 bool Applier_security_context_guard::has_access(
3296 std::initializer_list<std::string> extra_privileges) const {
3297 if (this->m_privilege_checks_none) return true;
3298 if (this->m_current == nullptr) return false;
3299
3300 for (auto privilege : extra_privileges) {
3301 if (!this->m_current->has_global_grant(privilege.data(), privilege.length())
3302 .first)
3303 return false;
3304 }
3305 return true;
3306 }
3307
has_access(std::vector<std::tuple<ulong,TABLE const *,Rows_log_event * >> & extra_privileges) const3308 bool Applier_security_context_guard::has_access(
3309 std::vector<std::tuple<ulong, TABLE const *, Rows_log_event *>>
3310 &extra_privileges) const {
3311 if (this->m_privilege_checks_none) return true;
3312 if (this->m_current == nullptr) return false;
3313
3314 ulong priv{0};
3315 TABLE const *table{nullptr};
3316
3317 if (this->m_thd->variables.binlog_row_image == BINLOG_ROW_IMAGE_FULL) {
3318 for (auto tpl : extra_privileges) {
3319 std::tie(priv, table, std::ignore) = tpl;
3320 if (this->m_current->is_table_blocked(priv, table)) return false;
3321 }
3322 } else {
3323 Rows_log_event *event{nullptr};
3324
3325 for (auto tpl : extra_privileges) {
3326 std::tie(priv, table, event) = tpl;
3327
3328 if (event->get_general_type_code() == binary_log::DELETE_ROWS_EVENT) {
3329 if (this->m_current->is_table_blocked(priv, table)) return false;
3330 } else {
3331 std::vector<std::string> columns;
3332 this->extract_columns_to_check(table, event, columns);
3333 if (!this->m_current->has_column_access(priv, table, columns))
3334 return false;
3335 }
3336 }
3337 }
3338
3339 return true;
3340 }
3341
get_username() const3342 std::string Applier_security_context_guard::get_username() const {
3343 if (this->m_privilege_checks_none || this->m_current == nullptr) return "";
3344 return std::string(this->m_current->user().str,
3345 this->m_current->user().length);
3346 }
3347
get_hostname() const3348 std::string Applier_security_context_guard::get_hostname() const {
3349 if (this->m_privilege_checks_none || this->m_current == nullptr) return "";
3350 return std::string(this->m_current->host().str,
3351 this->m_current->host().length);
3352 }
3353
extract_columns_to_check(TABLE const * table,Rows_log_event * event,std::vector<std::string> & columns) const3354 void Applier_security_context_guard::extract_columns_to_check(
3355 TABLE const *table, Rows_log_event *event,
3356 std::vector<std::string> &columns) const {
3357 MY_BITMAP const *bitmap{nullptr};
3358
3359 if (event->get_general_type_code() == binary_log::WRITE_ROWS_EVENT)
3360 bitmap = event->get_cols();
3361 else if (event->get_general_type_code() == binary_log::UPDATE_ROWS_EVENT)
3362 bitmap = event->get_cols_ai();
3363 else
3364 return;
3365
3366 size_t max = std::min(bitmap->n_bits, table->s->fields);
3367 for (size_t idx = 0; idx != max; ++idx) {
3368 if (bitmap_is_set(bitmap, idx)) {
3369 columns.push_back(table->field[idx]->field_name);
3370 }
3371 }
3372 }
3373