1 /* Copyright (c) 2006, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/rpl_rli.h"
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/stat.h>
29 #include <algorithm>
30 #include <regex>
31 
32 #include "libbinlogevents/include/binlog_event.h"
33 #include "m_ctype.h"
34 #include "mutex_lock.h"  // Mutex_lock
35 #include "my_bitmap.h"
36 #include "my_dbug.h"
37 #include "my_dir.h"  // MY_STAT
38 #include "my_sqlcommand.h"
39 #include "my_systime.h"
40 #include "my_thread.h"
41 #include "mysql/components/services/log_builtins.h"
42 #include "mysql/components/services/psi_stage_bits.h"
43 #include "mysql/plugin.h"
44 #include "mysql/psi/mysql_cond.h"
45 #include "mysql/psi/mysql_file.h"
46 #include "mysql/psi/psi_base.h"
47 #include "mysql/service_mysql_alloc.h"
48 #include "mysql/service_thd_wait.h"
49 #include "mysql_com.h"
50 #include "mysqld_error.h"
51 #include "sql/auth/auth_acls.h"  // SUPER_ACL
52 #include "sql/auth/roles.h"      // Roles::Role_activation
53 #include "sql/auth/sql_auth_cache.h"
54 #include "sql/debug_sync.h"
55 #include "sql/derror.h"
56 #include "sql/log_event.h"  // Log_event
57 #include "sql/mdl.h"
58 #include "sql/mysqld.h"  // sync_relaylog_period ...
59 #include "sql/protocol.h"
60 #include "sql/rpl_info_factory.h"  // Rpl_info_factory
61 #include "sql/rpl_info_handler.h"
62 #include "sql/rpl_mi.h"   // Master_info
63 #include "sql/rpl_msr.h"  // channel_map
64 #include "sql/rpl_reporting.h"
65 #include "sql/rpl_rli_pdb.h"  // Slave_worker
66 #include "sql/rpl_slave.h"
67 #include "sql/rpl_trx_boundary_parser.h"
68 #include "sql/sql_base.h"  // close_thread_tables
69 #include "sql/sql_error.h"
70 #include "sql/sql_lex.h"
71 #include "sql/sql_list.h"
72 #include "sql/sql_plugin.h"
73 #include "sql/strfunc.h"      // strconvert
74 #include "sql/transaction.h"  // trans_commit_stmt
75 #include "sql/transaction_info.h"
76 #include "sql/xa.h"
77 #include "sql_string.h"
78 #include "thr_mutex.h"
79 
80 class Item;
81 
82 using std::max;
83 using std::min;
84 
/*
  Names of the fields of the relay log info, one entry per field.
  Whenever a new field is added to the relay log info, append its name
  here as well. At present the array only serves to compute the number of
  fields (its element count).
*/
const char *info_rli_fields[] = {
    "number_of_lines",       "group_relay_log_name",
    "group_relay_log_pos",   "group_master_log_name",
    "group_master_log_pos",  "sql_delay",
    "number_of_workers",     "id",
    "channel_name",          "privilege_checks_user",
    "privilege_checks_hostname", "require_row_format",
    "require_table_primary_key_check"};
103 
/**
  Creates the applier (SQL thread) metadata object of one channel.

  @param is_slave_recovery       stored in is_relay_log_recovery.
  @param param_key_info_run_lock ... @p param_key_info_sleep_cond
                                 PSI keys (only present when
                                 HAVE_PSI_INTERFACE is defined), forwarded
                                 unchanged to the Rpl_info base class.
  @param param_id                forwarded to Rpl_info.
  @param param_channel           forwarded to Rpl_info; also used to detect
                                 the group replication applier channel.
  @param is_rli_fake             stored in rli_fake. When true, the log-space,
                                 pending-jobs, MTS and GAQ synchronization
                                 objects, the relay log pthread objects and
                                 the retrieved GTID set are NOT created.
*/
Relay_log_info::Relay_log_info(bool is_slave_recovery,
#ifdef HAVE_PSI_INTERFACE
                               PSI_mutex_key *param_key_info_run_lock,
                               PSI_mutex_key *param_key_info_data_lock,
                               PSI_mutex_key *param_key_info_sleep_lock,
                               PSI_mutex_key *param_key_info_thd_lock,
                               PSI_mutex_key *param_key_info_data_cond,
                               PSI_mutex_key *param_key_info_start_cond,
                               PSI_mutex_key *param_key_info_stop_cond,
                               PSI_mutex_key *param_key_info_sleep_cond,
#endif
                               uint param_id, const char *param_channel,
                               bool is_rli_fake)
    : Rpl_info("SQL",
#ifdef HAVE_PSI_INTERFACE
               param_key_info_run_lock, param_key_info_data_lock,
               param_key_info_sleep_lock, param_key_info_thd_lock,
               param_key_info_data_cond, param_key_info_start_cond,
               param_key_info_stop_cond, param_key_info_sleep_cond,
#endif
               param_id, param_channel),
      replicate_same_server_id(::replicate_same_server_id),
      relay_log(&sync_relaylog_period, true),
      is_relay_log_recovery(is_slave_recovery),
      save_temporary_tables(nullptr),
      mi(nullptr),
      error_on_rli_init_info(false),
      gtid_timestamps_warning_logged(false),
      transaction_parser(
          Transaction_boundary_parser::TRX_BOUNDARY_PARSER_APPLIER),
      group_relay_log_pos(0),
      event_relay_log_number(0),
      event_relay_log_pos(0),
      event_start_pos(0),
      group_master_log_pos(0),
      gtid_set(nullptr),
      rli_fake(is_rli_fake),
      gtid_retrieved_initialized(false),
      m_privilege_checks_username{""},
      m_privilege_checks_hostname{""},
      m_privilege_checks_user_corrupted{false},
      m_require_row_format(false),
      m_require_table_primary_key_check(PK_CHECK_STREAM),
      is_group_master_log_pos_invalid(false),
      log_space_total(0),
      ignore_log_space_limit(false),
      sql_force_rotate_relay(false),
      last_master_timestamp(0),
      slave_skip_counter(0),
      abort_pos_wait(0),
      until_condition(UNTIL_NONE),
      trans_retries(0),
      retried_trans(0),
      tables_to_lock(nullptr),
      tables_to_lock_count(0),
      rows_query_ev(nullptr),
      last_event_start_time(0),
      deferred_events(nullptr),
      workers(PSI_NOT_INSTRUMENTED),
      workers_array_initialized(false),
      curr_group_assigned_parts(PSI_NOT_INSTRUMENTED),
      curr_group_da(PSI_NOT_INSTRUMENTED),
      curr_group_seen_begin(false),
      mts_end_group_sets_max_dbs(false),
      slave_parallel_workers(0),
      exit_counter(0),
      max_updated_index(0),
      recovery_parallel_workers(0),
      rli_checkpoint_seqno(0),
      checkpoint_group(opt_mts_checkpoint_group),
      recovery_groups_inited(false),
      mts_recovery_group_cnt(0),
      mts_recovery_index(0),
      mts_recovery_group_seen_begin(false),
      mts_group_status(MTS_NOT_IN_GROUP),
      stats_exec_time(0),
      stats_read_time(0),
      least_occupied_workers(PSI_NOT_INSTRUMENTED),
      current_mts_submode(nullptr),
      reported_unsafe_warning(false),
      rli_description_event(nullptr),
      commit_order_mngr(nullptr),
      sql_delay(0),
      sql_delay_end(0),
      m_flags(0),
      row_stmt_start_timestamp(0),
      long_find_row_note_printed(false),
      thd_tx_priority(0),
      is_engine_ha_data_detached(false),
      current_event(nullptr),
      ddl_not_atomic(false) {
  DBUG_TRACE;

#ifdef HAVE_PSI_INTERFACE
  // Hand the relay log the instrumentation keys for its locks, condition
  // variables and files.
  relay_log.set_psi_keys(key_RELAYLOG_LOCK_index, key_RELAYLOG_LOCK_commit,
                         key_RELAYLOG_LOCK_commit_queue, key_RELAYLOG_LOCK_done,
                         key_RELAYLOG_LOCK_flush_queue, key_RELAYLOG_LOCK_log,
                         key_RELAYLOG_LOCK_log_end_pos, key_RELAYLOG_LOCK_sync,
                         key_RELAYLOG_LOCK_sync_queue, key_RELAYLOG_LOCK_xids,
                         key_RELAYLOG_COND_done, key_RELAYLOG_update_cond,
                         key_RELAYLOG_prep_xids_cond, key_file_relaylog,
                         key_file_relaylog_index, key_file_relaylog_cache,
                         key_file_relaylog_index_cache);
#endif

  // Start with empty relay/master log names.
  group_relay_log_name[0] = event_relay_log_name[0] = group_master_log_name[0] =
      0;
  ign_master_log_name_end[0] = 0;
  set_timespec_nsec(&last_clock, 0);
  cached_charset_invalidate();
  inited_hash_workers = false;
  commit_timestamps_status = COMMIT_TS_UNKNOWN;

  // Everything in this branch is only created for a real (non-fake)
  // applier; the destructor destroys it under the same !rli_fake condition.
  if (!rli_fake) {
    mysql_mutex_init(key_relay_log_info_log_space_lock, &log_space_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond);
    mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond);
    mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_temp_table_LOCK, &mts_temp_table_LOCK,
                     MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_gaq_LOCK, &mts_gaq_LOCK, MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_mts_gaq, &logical_clock_cond);

    relay_log.init_pthread_objects();
    force_flush_postponed_due_to_split_trans = false;

    // The retrieved GTID set gets its own sid lock and sid map, allocated
    // here and deleted explicitly in the destructor.
    Checkable_rwlock *sid_lock = new Checkable_rwlock(
#if defined(HAVE_PSI_INTERFACE)
        key_rwlock_receiver_sid_lock
#endif
    );
    Sid_map *sid_map = new Sid_map(sid_lock);
    gtid_set = new Gtid_set(sid_map, sid_lock);

    /*
      Group replication applier channel shall not use checksum on its relay
      log files.
    */
    if (channel_map.is_group_replication_channel_name(param_channel, true)) {
      relay_log.relay_log_checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF;
    }
  }
  // Allocated for fake and real instances alike.
  gtid_monitoring_info = new Gtid_monitoring_info();
  // Split ::server_version into its components in slave_version_split.
  do_server_version_split(::server_version, slave_version_split);
  until_option = nullptr;
  rpl_filter = nullptr;
}
255 
256 /**
257    The method to invoke at slave threads start
258 */
init_workers(ulong n_workers)259 void Relay_log_info::init_workers(ulong n_workers) {
260   /*
261     Parallel slave parameters initialization is done regardless
262     whether the feature is or going to be active or not.
263   */
264   mts_groups_assigned = mts_events_assigned = pending_jobs = wq_size_waits_cnt =
265       0;
266   mts_wq_excess_cnt = mts_wq_no_underrun_cnt = mts_wq_overfill_cnt = 0;
267   mts_total_wait_overlap = 0;
268   mts_total_wait_worker_avail = 0;
269   mts_last_online_stat = 0;
270 
271   workers.reserve(n_workers);
272   workers_array_initialized = true;  // set after init
273 }
274 
275 /**
276    The method to invoke at slave threads stop
277 */
deinit_workers()278 void Relay_log_info::deinit_workers() { workers.clear(); }
279 
~Relay_log_info()280 Relay_log_info::~Relay_log_info() {
281   DBUG_TRACE;
282 
283   if (!rli_fake) {
284     if (recovery_groups_inited) bitmap_free(&recovery_groups);
285     delete current_mts_submode;
286 
287     if (rpl_filter != nullptr) {
288       /* Remove the channel's replication filter from rpl_channel_filters. */
289       rpl_channel_filters.delete_filter(rpl_filter);
290       rpl_filter = nullptr;
291     }
292 
293     if (workers_copy_pfs.size()) {
294       for (int i = static_cast<int>(workers_copy_pfs.size()) - 1; i >= 0; i--)
295         delete workers_copy_pfs[i];
296       workers_copy_pfs.clear();
297     }
298 
299     mysql_mutex_destroy(&log_space_lock);
300     mysql_cond_destroy(&log_space_cond);
301     mysql_mutex_destroy(&pending_jobs_lock);
302     mysql_cond_destroy(&pending_jobs_cond);
303     mysql_mutex_destroy(&exit_count_lock);
304     mysql_mutex_destroy(&mts_temp_table_LOCK);
305     mysql_mutex_destroy(&mts_gaq_LOCK);
306     mysql_cond_destroy(&logical_clock_cond);
307     relay_log.cleanup();
308 
309     Sid_map *sid_map = gtid_set->get_sid_map();
310     Checkable_rwlock *sid_lock = sid_map->get_sid_lock();
311 
312     sid_lock->wrlock();
313     gtid_set->clear();
314     sid_map->clear();
315     delete gtid_set;
316     delete sid_map;
317     sid_lock->unlock();
318     delete sid_lock;
319   }
320 
321   if (set_rli_description_event(nullptr)) {
322 #ifndef DBUG_OFF
323     bool set_rli_description_event_failed{false};
324 #endif
325     DBUG_ASSERT(set_rli_description_event_failed);
326   }
327   delete until_option;
328   delete gtid_monitoring_info;
329 }
330 
331 /**
332    Method is called when MTS coordinator senses the relay-log name
333    has been changed.
334    It marks each Worker member with this fact to make an action
335    at time it will distribute a terminal event of a group to the Worker.
336 
337    Worker receives the new name at the group commiting phase
338    @c Slave_worker::slave_worker_ends_group().
339 */
reset_notified_relay_log_change()340 void Relay_log_info::reset_notified_relay_log_change() {
341   if (!is_parallel_exec()) return;
342   for (Slave_worker **it = workers.begin(); it != workers.end(); ++it) {
343     Slave_worker *w = *it;
344     w->relay_log_change_notified = false;
345   }
346 }
347 
348 /**
349    This method is called in mts_checkpoint_routine() to mark that each
350    worker is required to adapt to a new checkpoint data whose coordinates
351    are passed to it through GAQ index.
352 
353    Worker notices the new checkpoint value at the group commit to reset
354    the current bitmap and starts using the clean bitmap indexed from zero
355    of being reset rli_checkpoint_seqno.
356 
357     New seconds_behind_master timestamp is installed.
358 
359    @param shift            number of bits to shift by Worker due to the
360                            current checkpoint change.
361    @param new_ts           new seconds_behind_master timestamp value
362                            unless zero. Zero could be due to FD event
363                            or fake rotate event.
364    @param update_timestamp if true, this function will update the
365                            rli->last_master_timestamp.
366 */
reset_notified_checkpoint(ulong shift,time_t new_ts,bool update_timestamp)367 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
368                                                bool update_timestamp) {
369   /*
370     If this is not a parallel execution we return immediately.
371   */
372   if (!is_parallel_exec()) return;
373 
374   for (Slave_worker **it = workers.begin(); it != workers.end(); ++it) {
375     Slave_worker *w = *it;
376     /*
377       Reseting the notification information in order to force workers to
378       assign jobs with the new updated information.
379       Notice that the bitmap_shifted is accumulated to indicate how many
380       consecutive jobs were successfully processed.
381 
382       The worker when assigning a new job will set the value back to
383       zero.
384     */
385     w->checkpoint_notified = false;
386     w->bitmap_shifted = w->bitmap_shifted + shift;
387     /*
388       Zero shift indicates the caller rotates the master binlog.
389       The new name will be passed to W through the group descriptor
390       during the first post-rotation time scheduling.
391     */
392     if (shift == 0) w->master_log_change_notified = false;
393 
394     DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
395                        "worker->bitmap_shifted --> %lu, worker --> %u.",
396                        shift, w->bitmap_shifted,
397                        static_cast<unsigned>(it - workers.begin())));
398   }
399   /*
400     There should not be a call where (shift == 0 && rli_checkpoint_seqno != 0).
401     Then the new checkpoint sequence is updated by subtracting the number
402     of consecutive jobs that were successfully processed.
403   */
404   DBUG_ASSERT(current_mts_submode->get_type() != MTS_PARALLEL_TYPE_DB_NAME ||
405               !(shift == 0 && rli_checkpoint_seqno != 0));
406   rli_checkpoint_seqno = rli_checkpoint_seqno - shift;
407   DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
408                      "rli_checkpoint_seqno --> %u.",
409                      shift, rli_checkpoint_seqno));
410 
411   if (update_timestamp) {
412     mysql_mutex_lock(&data_lock);
413     last_master_timestamp = new_ts;
414     mysql_mutex_unlock(&data_lock);
415   }
416 }
417 
418 /**
419    Reset recovery info from Worker info table and
420    mark MTS recovery is completed.
421 
422    @return false on success true when @c reset_notified_checkpoint failed.
423 */
mts_finalize_recovery()424 bool Relay_log_info::mts_finalize_recovery() {
425   bool ret = false;
426   uint i;
427   uint repo_type = get_rpl_info_handler()->get_rpl_info_type();
428 
429   DBUG_TRACE;
430 
431   for (Slave_worker **it = workers.begin(); !ret && it != workers.end(); ++it) {
432     Slave_worker *w = *it;
433     ret = w->reset_recovery_info();
434     DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret = true;);
435   }
436   /*
437     The loop is traversed in the worker index descending order due
438     to specifics of the Worker table repository that does not like
439     even temporary holes. Therefore stale records are deleted
440     from the tail.
441   */
442   DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
443                   { DBUG_SET("+d,mts_worker_thread_init_fails"); });
444   for (i = recovery_parallel_workers; i > workers.size() && !ret; i--) {
445     Slave_worker *w =
446         Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
447     /*
448       If an error occurs during the above create_worker call, the newly created
449       worker object gets deleted within the above function call itself and only
450       NULL is returned. Hence the following check has been added to verify
451       that a valid worker object exists.
452     */
453     if (w) {
454       ret = w->remove_info();
455       delete w;
456     } else {
457       ret = true;
458       goto err;
459     }
460   }
461   recovery_parallel_workers = slave_parallel_workers;
462 
463 err:
464   return ret;
465 }
466 
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)467 static inline int add_relay_log(Relay_log_info *rli, LOG_INFO *linfo) {
468   MY_STAT s;
469   DBUG_TRACE;
470   mysql_mutex_assert_owner(&rli->log_space_lock);
471   if (!mysql_file_stat(key_file_relaylog, linfo->log_file_name, &s, MYF(0))) {
472     LogErr(ERROR_LEVEL, ER_RPL_FAILED_TO_STAT_LOG_IN_INDEX,
473            linfo->log_file_name);
474     return 1;
475   }
476   rli->log_space_total += s.st_size;
477 #ifndef DBUG_OFF
478   char buf[22];
479   DBUG_PRINT("info", ("log_space_total: %s", llstr(rli->log_space_total, buf)));
480 #endif
481   return 0;
482 }
483 
count_relay_log_space()484 int Relay_log_info::count_relay_log_space() {
485   LOG_INFO flinfo;
486   DBUG_TRACE;
487   MUTEX_LOCK(lock, &log_space_lock);
488   log_space_total = 0;
489   if (relay_log.find_log_pos(&flinfo, NullS, true)) {
490     LogErr(ERROR_LEVEL, ER_RPL_LOG_NOT_FOUND_WHILE_COUNTING_RELAY_LOG_SPACE);
491     return 1;
492   }
493   do {
494     if (add_relay_log(this, &flinfo)) return 1;
495   } while (!relay_log.find_next_log(&flinfo, true));
496   /*
497      As we have counted everything, including what may have written in a
498      preceding write, we must reset bytes_written, or we may count some space
499      twice.
500   */
501   relay_log.reset_bytes_written();
502   return 0;
503 }
504 
reset_group_relay_log_pos(const char ** errmsg)505 bool Relay_log_info::reset_group_relay_log_pos(const char **errmsg) {
506   LOG_INFO linfo;
507 
508   mysql_mutex_assert_owner(&data_lock);
509 
510   if (relay_log.find_log_pos(&linfo, NullS, true)) {
511     *errmsg = "Could not find first log during relay log initialization";
512     return true;
513   }
514   set_group_relay_log_name(linfo.log_file_name);
515   group_relay_log_pos = BIN_LOG_HEADER_SIZE;
516   return false;
517 }
518 
is_group_relay_log_name_invalid(const char ** errmsg)519 bool Relay_log_info::is_group_relay_log_name_invalid(const char **errmsg) {
520   DBUG_TRACE;
521   const char *errmsg_fmt = nullptr;
522   static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
523   LOG_INFO linfo;
524 
525   *errmsg = nullptr;
526   if (relay_log.find_log_pos(&linfo, group_relay_log_name, true)) {
527     errmsg_fmt =
528         "Could not find target log file mentioned in "
529         "relay log info in the index file '%s' during "
530         "relay log initialization";
531     sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
532     *errmsg = errmsg_buff;
533     return true;
534   }
535   return false;
536 }
537 
538 /**
539   Update the error number, message and timestamp fields. This function is
540   different from va_report() as va_report() also logs the error message in the
541   log apart from updating the error fields.
542 
543   SYNOPSIS
544   @param[in]  level          specifies the level- error, warning or information,
545   @param[in]  err_code       error number,
546   @param[in]  buff_coord     error message to be used.
547 
548 */
fill_coord_err_buf(loglevel level,int err_code,const char * buff_coord) const549 void Relay_log_info::fill_coord_err_buf(loglevel level, int err_code,
550                                         const char *buff_coord) const {
551   mysql_mutex_lock(&err_lock);
552 
553   if (level == ERROR_LEVEL) {
554     m_last_error.number = err_code;
555     snprintf(m_last_error.message, sizeof(m_last_error.message), "%.*s",
556              MAX_SLAVE_ERRMSG - 1, buff_coord);
557     m_last_error.update_timestamp();
558   }
559 
560   mysql_mutex_unlock(&err_lock);
561 }
562 
/**
  Waits until the SQL thread reaches (has executed up to) the
  log/position or timed out.

  SYNOPSIS
  @param[in]  thd             client thread that sent @c SELECT @c
                              MASTER_POS_WAIT,
  @param[in]  log_name        log name to wait for; it must have a purely
                              numeric extension,
  @param[in]  log_pos         position to wait for (values below
                              BIN_LOG_HEADER_SIZE are raised to it),
  @param[in]  timeout         @c timeout in seconds before giving up waiting.
                              @c timeout is double whereas it should be
                              ulong, but this is to catch if the user
                              submitted a negative timeout. A value <= 0
                              waits without a time limit.

  @retval  -2   improper arguments (log_pos<0, a log name without a numeric
                extension, or a log name whose base name does not match the
                one being applied) or slave not running, or master info
                changed during the function's execution, or client thread
                killed. -2 is translated to NULL by caller,
  @retval  -1   timed out
  @retval  >=0  number of times the function woke up on data_cond before
                reaching the desired log/position
 */

int Relay_log_info::wait_for_pos(THD *thd, String *log_name, longlong log_pos,
                                 double timeout) {
  int event_count = 0;
  ulong init_abort_pos_wait;
  int error = 0;
  struct timespec abstime;  // for timeout checking
  PSI_stage_info old_stage;
  DBUG_TRACE;

  if (!inited) return -2;

  DBUG_PRINT("enter", ("log_name: '%s'  log_pos: %lu  timeout: %lu",
                       log_name->c_ptr_safe(), (ulong)log_pos, (ulong)timeout));

  DEBUG_SYNC(thd, "begin_master_pos_wait");

  // The absolute deadline is computed once and reused by every timed wait
  // in the loop below, so repeated wakeups do not extend the total wait.
  set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
  mysql_mutex_lock(&data_lock);
  thd->ENTER_COND(&data_cond, &data_lock,
                  &stage_waiting_for_the_slave_thread_to_advance_position,
                  &old_stage);
  /*
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait; we just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is because if someone does:
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
  */
  init_abort_pos_wait = abort_pos_wait;

  /*
    We'll need to
    handle all possible log names comparisons (e.g. 999 vs 1000).
    We use ulong for string->number conversion; this is no
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN];  // make a char[] from String

  strmake(log_name_tmp, log_name->ptr(),
          min<uint32>(log_name->length(), FN_REFLEN - 1));

  const char *p = fn_ext(log_name_tmp);
  char *p_end;
  if (!*p || log_pos < 0) {
    error = -2;  // means improper arguments
    goto err;
  }
  // Convert 0-3 to 4
  log_pos = max(log_pos, static_cast<longlong>(BIN_LOG_HEADER_SIZE));
  /* p points to '.' */
  log_name_extension = strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end == p || *p_end) {
    error = -2;
    goto err;
  }

  /* The "compare and wait" main loop */
  while (!thd->killed && init_abort_pos_wait == abort_pos_wait &&
         slave_running) {
    bool pos_reached;
    int cmp_result = 0;

    DBUG_PRINT("info", ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
                        init_abort_pos_wait, abort_pos_wait));
    DBUG_PRINT("info", ("group_master_log_name: '%s'  pos: %lu",
                        group_master_log_name, (ulong)group_master_log_pos));

    /*
      group_master_log_name can be "", if we are just after a fresh
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
      (before we have executed one Rotate event from the master) or
      (rare) if the user is doing a weird slave setup (see next
      paragraph).  If group_master_log_name is "", we assume we don't
      have enough info to do the comparison yet, so we just wait until
      more data. In this case master_log_pos is always 0 except if
      somebody (wrongly) sets this slave to be a slave of itself
      without using --replicate-same-server-id (an unsupported
      configuration which does nothing), then group_master_log_pos
      will grow and group_master_log_name will stay "".
      Also in case the group master log position is invalid (e.g. after
      CHANGE MASTER TO RELAY_LOG_POS ), we will wait till the first event
      is read and the log position is valid again.
    */
    if (*group_master_log_name && !is_group_master_log_pos_invalid) {
      char *basename =
          (group_master_log_name + dirname_length(group_master_log_name));
      /*
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
      */
      const char *q = fn_ext(basename) + 1;
      if (strncmp(basename, log_name_tmp, (int)(q - basename))) {
        error = -2;
        break;
      }
      // Now compare extensions.
      char *q_end;
      ulong group_master_log_name_extension = strtoul(q, &q_end, 10);
      if (group_master_log_name_extension < log_name_extension)
        cmp_result = -1;
      else
        cmp_result =
            (group_master_log_name_extension > log_name_extension) ? 1 : 0;

      // Reached when the applier is past the target file, or in the same
      // file at or beyond the target position.
      pos_reached =
          ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
           cmp_result > 0);
      if (pos_reached || thd->killed) break;
    }

    // wait for master update, with optional timeout.

    DBUG_PRINT("info", ("Waiting for master update"));
    /*
      We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
    thd_wait_begin(thd, THD_WAIT_BINLOG);
    if (timeout > 0) {
      /*
        Note that mysql_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, mysql_cond_timedwait will do the timeout
        job even if its condition is always immediately signaled (case of a
        loaded master).
      */
      error = mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
    } else
      mysql_cond_wait(&data_cond, &data_lock);
    thd_wait_end(thd);
    DBUG_PRINT("info", ("Got signal of master update or timed out"));
    if (is_timeout(error)) {
#ifndef DBUG_OFF
      /*
        Doing this to generate a stack trace and make debugging
        easier.
      */
      if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0)) DBUG_ASSERT(0);
#endif
      error = -1;
      break;
    }
    error = 0;
    // Counts wakeups, not individual events applied.
    event_count++;
    DBUG_PRINT("info", ("Testing if killed or SQL thread not running"));
  }

err:
  mysql_mutex_unlock(&data_lock);
  thd->EXIT_COND(&old_stage);
  DBUG_PRINT("exit",
             ("killed: %d  abort: %d  slave_running: %d "
              "improper_arguments: %d  timed_out: %d",
              thd->killed.load(), (int)(init_abort_pos_wait != abort_pos_wait),
              (int)slave_running, (int)(error == -2), (int)(error == -1)));
  // An interrupted wait is always reported as -2, overriding any result
  // computed above.
  if (thd->killed || init_abort_pos_wait != abort_pos_wait || !slave_running) {
    error = -2;
  }
  return error ? error : event_count;
}
763 
wait_for_gtid_set(THD * thd,const char * gtid,double timeout,bool update_THD_status)764 int Relay_log_info::wait_for_gtid_set(THD *thd, const char *gtid,
765                                       double timeout, bool update_THD_status) {
766   DBUG_TRACE;
767 
768   DBUG_PRINT("info", ("Waiting for %s timeout %lf", gtid, timeout));
769 
770   Gtid_set wait_gtid_set(global_sid_map);
771   global_sid_lock->rdlock();
772   enum_return_status ret = wait_gtid_set.add_gtid_text(gtid);
773   global_sid_lock->unlock();
774 
775   if (ret != RETURN_STATUS_OK) {
776     DBUG_PRINT("exit", ("improper gtid argument"));
777     return -2;
778   }
779 
780   return wait_for_gtid_set(thd, &wait_gtid_set, timeout, update_THD_status);
781 }
782 
wait_for_gtid_set(THD * thd,String * gtid,double timeout,bool update_THD_status)783 int Relay_log_info::wait_for_gtid_set(THD *thd, String *gtid, double timeout,
784                                       bool update_THD_status) {
785   DBUG_TRACE;
786   return wait_for_gtid_set(thd, gtid->c_ptr_safe(), timeout, update_THD_status);
787 }
788 
/*
  TODO: This code duplicates the wait loop of the position-based wait above
  and needs to be simplified.
  This will be done while developing all possible sync options.
  See WL#3584's specification.

  /Alfranio
*/
/**
  Blocks until every GTID in `wait_gtid_set` is contained in the server's
  executed GTIDs and none of them is still owned (i.e. logged but not yet
  committed), or until the wait is aborted.

  @param thd               Session that waits.
  @param wait_gtid_set     GTIDs to wait for; its sid map must be nullptr or
                           global_sid_map (asserted).
  @param timeout           Seconds to wait; a value <= 0 waits with no
                           timeout (plain mysql_cond_wait).
  @param update_THD_status If true, the session's stage is set to
                           stage_waiting_for_the_slave_thread_to_advance_position
                           while waiting (ENTER_COND/EXIT_COND).

  @retval -2 Relay_log_info not inited, or, when the function exits, the
             session is killed, abort_pos_wait changed (CHANGE MASTER /
             RESET) or the SQL thread is not running. Note that this check
             also applies when the set was already satisfied.
  @retval -1 The timed wait expired.
  @retval >=0 Number of condition-variable wake-ups observed before the
             set was satisfied (0 if it was already satisfied on entry).
*/
int Relay_log_info::wait_for_gtid_set(THD *thd, const Gtid_set *wait_gtid_set,
                                      double timeout, bool update_THD_status) {
  int event_count = 0;
  ulong init_abort_pos_wait;
  int error = 0;
  struct timespec abstime;  // for timeout checking
  PSI_stage_info old_stage;
  DBUG_TRACE;

  if (!inited) return -2;

  DEBUG_SYNC(thd, "begin_wait_for_gtid_set");

  // Absolute deadline, computed once so repeated wake-ups do not extend it.
  set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));

  mysql_mutex_lock(&data_lock);
  if (update_THD_status)
    thd->ENTER_COND(&data_cond, &data_lock,
                    &stage_waiting_for_the_slave_thread_to_advance_position,
                    &old_stage);
  /*
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait; we just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is because if someone does:
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
  */
  init_abort_pos_wait = abort_pos_wait;

  /* The "compare and wait" main loop */
  while (!thd->killed && init_abort_pos_wait == abort_pos_wait &&
         slave_running) {
    DBUG_PRINT("info", ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
                        init_abort_pos_wait, abort_pos_wait));

    // wait for master update, with optional timeout.

    DBUG_ASSERT(wait_gtid_set->get_sid_map() == nullptr ||
                wait_gtid_set->get_sid_map() == global_sid_map);

    // Executed and owned GTIDs are inspected under global_sid_lock.
    global_sid_lock->wrlock();
    const Gtid_set *executed_gtids = gtid_state->get_executed_gtids();
    const Owned_gtids *owned_gtids = gtid_state->get_owned_gtids();

#ifndef DBUG_OFF
    char *wait_gtid_set_buf;
    wait_gtid_set->to_string(&wait_gtid_set_buf);
    DBUG_PRINT("info",
               ("Waiting for '%s'. is_subset: %d and "
                "!is_intersection_nonempty: %d",
                wait_gtid_set_buf, wait_gtid_set->is_subset(executed_gtids),
                !owned_gtids->is_intersection_nonempty(wait_gtid_set)));
    my_free(wait_gtid_set_buf);
    executed_gtids->dbug_print("gtid_executed:");
    owned_gtids->dbug_print("owned_gtids:");
#endif

    /*
      Since commit is performed after log to binary log, we must also
      check if any GTID of wait_gtid_set is not yet committed.
    */
    if (wait_gtid_set->is_subset(executed_gtids) &&
        !owned_gtids->is_intersection_nonempty(wait_gtid_set)) {
      global_sid_lock->unlock();
      break;
    }
    global_sid_lock->unlock();

    DBUG_PRINT("info", ("Waiting for master update"));

    /*
      We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
    thd_wait_begin(thd, THD_WAIT_BINLOG);
    if (timeout > 0) {
      /*
        Note that mysql_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, mysql_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error = mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
    } else
      mysql_cond_wait(&data_cond, &data_lock);
    thd_wait_end(thd);
    DBUG_PRINT("info", ("Got signal of master update or timed out"));
    if (is_timeout(error)) {
#ifndef DBUG_OFF
      /*
        Doing this to generate a stack trace and make debugging
        easier.
      */
      if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0)) DBUG_ASSERT(0);
#endif
      error = -1;
      break;
    }
    error = 0;
    // Counts wake-ups of data_cond, not applied events.
    event_count++;
    DBUG_PRINT("info", ("Testing if killed or SQL thread not running"));
  }

  mysql_mutex_unlock(&data_lock);

  if (update_THD_status) thd->EXIT_COND(&old_stage);
  DBUG_PRINT("exit",
             ("killed: %d  abort: %d  slave_running: %d "
              "improper_arguments: %d  timed_out: %d",
              thd->killed.load(), (int)(init_abort_pos_wait != abort_pos_wait),
              (int)slave_running, (int)(error == -2), (int)(error == -1)));
  // NOTE(review): abort_pos_wait and slave_running are read here after
  // data_lock was released; confirm this unlocked read is intended.
  if (thd->killed || init_abort_pos_wait != abort_pos_wait || !slave_running) {
    error = -2;
  }
  return error ? error : event_count;
}
923 
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock,bool force)924 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
925                                             bool need_data_lock, bool force) {
926   int error = 0;
927   DBUG_TRACE;
928 
929   if (need_data_lock)
930     mysql_mutex_lock(&data_lock);
931   else
932     mysql_mutex_assert_owner(&data_lock);
933 
934   inc_event_relay_log_pos();
935   group_relay_log_pos = event_relay_log_pos;
936   strmake(group_relay_log_name, event_relay_log_name,
937           sizeof(group_relay_log_name) - 1);
938 
939   /*
940     In 4.x we used the event's len to compute the positions here. This is
941     wrong if the event was 3.23/4.0 and has been converted to 5.0, because
942     then the event's len is not what is was in the master's binlog, so this
943     will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
944     replication: Exec_master_log_pos is wrong). Only way to solve this is to
945     have the original offset of the end of the event the relay log. This is
946     what we do in 5.0: log_pos has become "end_log_pos" (because the real use
947     of log_pos in 4.0 was to compute the end_log_pos; so better to store
948     end_log_pos instead of begin_log_pos.
949     If we had not done this fix here, the problem would also have appeared
950     when the slave and master are 5.0 but with different event length (for
951     example the slave is more recent than the master and features the event
952     UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
953     SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
954     value which would lead to badly broken replication.
955     Even the relay_log_pos will be corrupted in this case, because the len is
956     the relay log is not "val".
957     With the end_log_pos solution, we avoid computations involving lengthes.
958   */
959   DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu", (long)log_pos,
960                       (long)group_master_log_pos));
961 
962   if (log_pos > 0)  // 3.23 binlogs don't have log_posx
963     group_master_log_pos = log_pos;
964   /*
965     If the master log position was invalidiated by say, "CHANGE MASTER TO
966     RELAY_LOG_POS=N", it is now valid,
967    */
968   if (is_group_master_log_pos_invalid) is_group_master_log_pos_invalid = false;
969 
970   /*
971     In MTS mode FD or Rotate event commit their solitary group to
972     Coordinator's info table. Callers make sure that Workers have been
973     executed all assignements.
974     Broadcast to master_pos_wait() waiters should be done after
975     the table is updated.
976   */
977   DBUG_ASSERT(!is_parallel_exec() ||
978               mts_group_status != Relay_log_info::MTS_IN_GROUP);
979   /*
980     We do not force synchronization at this point, except for Rotate event
981     (see Rotate_log_event::do_update_pos), note @c force is false by default,
982     because a non-transactional change is being committed.
983 
984     For that reason, the synchronization here is subjected to
985     the option sync_relay_log_info.
986 
987     See sql/rpl_rli.h for further information on this behavior.
988   */
989   error = flush_info(force);
990 
991   mysql_cond_broadcast(&data_cond);
992   if (need_data_lock) mysql_mutex_unlock(&data_lock);
993   return error;
994 }
995 
close_temporary_tables()996 void Relay_log_info::close_temporary_tables() {
997   TABLE *table, *next;
998   int num_closed_temp_tables = 0;
999   DBUG_TRACE;
1000 
1001   for (table = save_temporary_tables; table; table = next) {
1002     next = table->next;
1003     /*
1004       Don't ask for disk deletion. For now, anyway they will be deleted when
1005       slave restarts, but it is a better intention to not delete them.
1006     */
1007     DBUG_PRINT("info", ("table: %p", table));
1008     close_temporary(nullptr, table, true, false);
1009     num_closed_temp_tables++;
1010   }
1011   save_temporary_tables = nullptr;
1012   atomic_slave_open_temp_tables -= num_closed_temp_tables;
1013   atomic_channel_open_temp_tables -= num_closed_temp_tables;
1014 }
1015 
1016 /**
1017   Purges relay logs. It assumes to have a run lock on rli and that no
1018   slave thread are running.
1019 
1020   @param[in]   thd         connection,
1021   @param[out]  errmsg      store pointer to an error message.
1022   @param[in]   delete_only If true, do not start writing to a new log file.
1023 
1024   @retval 0 successfully executed,
1025   @retval 1 otherwise error, where errmsg is set to point to the error message.
1026 */
1027 
purge_relay_logs(THD * thd,const char ** errmsg,bool delete_only)1028 int Relay_log_info::purge_relay_logs(THD *thd, const char **errmsg,
1029                                      bool delete_only) {
1030   int error = 0;
1031   const char *ln;
1032   /* name of the index file if opt_relaylog_index_name is set*/
1033   const char *log_index_name;
1034   /*
1035     Buffer to add channel name suffix when relay-log-index option is
1036     provided
1037    */
1038   char relay_bin_index_channel[FN_REFLEN];
1039 
1040   const char *ln_without_channel_name;
1041   /*
1042     Buffer to add channel name suffix when relay-log option is provided.
1043    */
1044   char relay_bin_channel[FN_REFLEN];
1045 
1046   char buffer[FN_REFLEN];
1047 
1048   mysql_mutex_t *log_lock = relay_log.get_log_lock();
1049 
1050   DBUG_TRACE;
1051 
1052   /*
1053     Even if inited==0, we still try to empty master_log_* variables. Indeed,
1054     inited==0 does not imply that they already are empty.
1055 
1056     It could be that slave's info initialization partly succeeded: for example
1057     if relay-log.info existed but all relay logs have been manually removed,
1058     init_info reads the old relay-log.info and fills rli->master_log_*, then
1059     init_info checks for the existence of the relay log, this fails and
1060     init_info leaves inited to 0.
1061     In that pathological case, master_log_pos* will be properly reinited at
1062     the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1063     purge_relay_logs, will delete bogus *.info files or replace them with
1064     correct files), however if the user does SHOW SLAVE STATUS before START
1065     SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1066     master_log_* for SHOW SLAVE STATUS to display fine in any case.
1067   */
1068   group_master_log_name[0] = 0;
1069   group_master_log_pos = 0;
1070 
1071   /*
1072     Following the the relay log purge, the master_log_pos will be in sync
1073     with relay_log_pos, so the flag should be cleared. Refer bug#11766010.
1074   */
1075 
1076   is_group_master_log_pos_invalid = false;
1077 
1078   if (!inited) {
1079     DBUG_PRINT("info", ("inited == 0"));
1080     if (error_on_rli_init_info ||
1081         /*
1082           mi->reset means that the channel was reset but still exists. Channel
1083           shall have the index and the first relay log file.
1084 
1085           Those files shall be remove in a following RESET SLAVE ALL (even when
1086           channel was not inited again).
1087         */
1088         (mi->reset && delete_only)) {
1089       DBUG_ASSERT(relay_log.is_relay_log);
1090       ln_without_channel_name =
1091           relay_log.generate_name(opt_relay_logname, "-relay-bin", buffer);
1092 
1093       ln = add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
1094                                          ln_without_channel_name);
1095       if (opt_relaylog_index_name_supplied) {
1096         char index_file_withoutext[FN_REFLEN];
1097         relay_log.generate_name(opt_relaylog_index_name, "",
1098                                 index_file_withoutext);
1099 
1100         log_index_name = add_channel_to_relay_log_name(
1101             relay_bin_index_channel, FN_REFLEN, index_file_withoutext);
1102       } else
1103         log_index_name = nullptr;
1104 
1105       if (relay_log.open_index_file(log_index_name, ln, true)) {
1106         LogErr(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_PURGE_FAILED,
1107                "Failed to open relay log index file:",
1108                relay_log.get_index_fname());
1109         return 1;
1110       }
1111       mysql_mutex_lock(&mi->data_lock);
1112       mysql_mutex_lock(log_lock);
1113       if (relay_log.open_binlog(
1114               ln, nullptr,
1115               (max_relay_log_size ? max_relay_log_size : max_binlog_size), true,
1116               true /*need_lock_index=true*/, true /*need_sid_lock=true*/,
1117               mi->get_mi_description_event())) {
1118         mysql_mutex_unlock(log_lock);
1119         mysql_mutex_unlock(&mi->data_lock);
1120         LogErr(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_PURGE_FAILED,
1121                "Failed to open relay log file:", relay_log.get_log_fname());
1122         return 1;
1123       }
1124       mysql_mutex_unlock(log_lock);
1125       mysql_mutex_unlock(&mi->data_lock);
1126     } else
1127       return 0;
1128   } else {
1129     DBUG_ASSERT(slave_running == 0);
1130     DBUG_ASSERT(mi->slave_running == 0);
1131   }
1132   /* Reset the transaction boundary parser and clear the last GTID queued */
1133   mi->transaction_parser.reset();
1134   mysql_mutex_lock(&mi->data_lock);
1135   mi->clear_gtid_monitoring_info();
1136   mysql_mutex_unlock(&mi->data_lock);
1137 
1138   slave_skip_counter = 0;
1139   mysql_mutex_lock(&data_lock);
1140 
1141   /**
1142     Clear the retrieved gtid set for this channel.
1143   */
1144   get_sid_lock()->wrlock();
1145   (const_cast<Gtid_set *>(get_gtid_set()))->clear_set_and_sid_map();
1146   get_sid_lock()->unlock();
1147 
1148   if (relay_log.reset_logs(thd, delete_only)) {
1149     *errmsg = "Failed during log reset";
1150     error = 1;
1151     goto err;
1152   }
1153 
1154   /* Save name of used relay log file */
1155   set_group_relay_log_name(relay_log.get_log_fname());
1156   group_relay_log_pos = BIN_LOG_HEADER_SIZE;
1157   if (!delete_only && count_relay_log_space()) {
1158     *errmsg = "Error counting relay log space";
1159     error = 1;
1160     goto err;
1161   }
1162 
1163   if (!inited && error_on_rli_init_info)
1164     relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1165                     true /*need_lock_log=true*/, true /*need_lock_index=true*/);
1166 err:
1167 #ifndef DBUG_OFF
1168   char buf[22];
1169 #endif
1170   DBUG_PRINT("info", ("log_space_total: %s", llstr(log_space_total, buf)));
1171   mysql_mutex_unlock(&data_lock);
1172   return error;
1173 }
1174 
add_channel_to_relay_log_name(char * buff,uint buff_size,const char * base_name)1175 const char *Relay_log_info::add_channel_to_relay_log_name(
1176     char *buff, uint buff_size, const char *base_name) {
1177   char *ptr;
1178   char channel_to_file[FN_REFLEN];
1179   uint errors, length;
1180   uint base_name_len;
1181   uint suffix_buff_size;
1182 
1183   DBUG_ASSERT(base_name != nullptr);
1184 
1185   base_name_len = strlen(base_name);
1186   suffix_buff_size = buff_size - base_name_len;
1187 
1188   ptr = strmake(buff, base_name, buff_size - 1);
1189 
1190   if (channel[0]) {
1191     /* adding a "-" */
1192     ptr = strmake(ptr, "-", suffix_buff_size - 1);
1193 
1194     /*
1195       Convert the channel name to the file names charset.
1196       Channel name is in system_charset which is UTF8_general_ci
1197       as it was defined as utf8 in the mysql.slaveinfo tables.
1198     */
1199     length = strconvert(system_charset_info, channel, &my_charset_filename,
1200                         channel_to_file, NAME_LEN, &errors);
1201     ptr = strmake(ptr, channel_to_file, suffix_buff_size - length - 1);
1202   }
1203 
1204   return (const char *)buff;
1205 }
1206 
cached_charset_invalidate()1207 void Relay_log_info::cached_charset_invalidate() {
1208   DBUG_TRACE;
1209 
1210   /* Full of zeroes means uninitialized. */
1211   memset(cached_charset, 0, sizeof(cached_charset));
1212 }
1213 
cached_charset_compare(char * charset) const1214 bool Relay_log_info::cached_charset_compare(char *charset) const {
1215   DBUG_TRACE;
1216 
1217   if (memcmp(cached_charset, charset, sizeof(cached_charset))) {
1218     memcpy(const_cast<char *>(cached_charset), charset, sizeof(cached_charset));
1219     return true;
1220   }
1221   return false;
1222 }
1223 
stmt_done(my_off_t event_master_log_pos)1224 int Relay_log_info::stmt_done(my_off_t event_master_log_pos) {
1225   clear_flag(IN_STMT);
1226 
1227   DBUG_ASSERT(!belongs_to_client());
1228   /* Worker does not execute binlog update position logics */
1229   DBUG_ASSERT(!is_mts_worker(info_thd));
1230 
1231   /*
1232     Replication keeps event and group positions to specify the
1233     set of events that were executed.
1234     Event positions are incremented after processing each event
1235     whereas group positions are incremented when an event or a
1236     set of events is processed such as in a transaction and are
1237     committed or rolled back.
1238 
1239     A transaction can be ended with a Query Event, i.e. either
1240     commit or rollback, or by a Xid Log Event. Query Event is
1241     used to terminate pseudo-transactions that are executed
1242     against non-transactional engines such as MyIsam. Xid Log
1243     Event denotes though that a set of changes executed
1244     against a transactional engine is about to commit.
1245 
1246     Events' positions are incremented at stmt_done(). However,
1247     transactions that are ended with Xid Log Event have their
1248     group position incremented in the do_apply_event() and in
1249     the do_apply_event_work().
1250 
1251     Notice that the type of the engine, i.e. where data and
1252     positions are stored, against what events are being applied
1253     are not considered in this logic.
1254 
1255     Regarding the code that follows, notice that the executed
1256     group coordinates don't change if the current event is internal
1257     to the group. The same applies to MTS Coordinator when it
1258     handles a Format Descriptor event that appears in the middle
1259     of a group that is about to be assigned.
1260   */
1261   if ((!is_parallel_exec() && is_in_group()) ||
1262       mts_group_status != MTS_NOT_IN_GROUP) {
1263     inc_event_relay_log_pos();
1264   } else {
1265     return inc_group_relay_log_pos(event_master_log_pos,
1266                                    true /*need_data_lock*/);
1267   }
1268   return 0;
1269 }
1270 
/**
  Resets the applier's per-statement/per-transaction context.

  With `error` true, it rolls back the statement and regular transactions,
  resets original_commit_timestamp, force-rolls back any XA transaction
  that is not in XA_NOTR state, releases transactional MDL locks and resets
  one-shot transaction characteristics.

  Regardless of `error`, it also:
  - resets the query text and deletes the pending rows_query_ev;
  - clears the table map and closes the thread's tables;
  - clears IN_STMT and the FOREIGN_KEY_CHECKS / RELAXED_UNIQUE_CHECKS option
    bits;
  - resets the long_find_row note state.

  @param thd    Applier session; must be info_thd (asserted).
  @param error  True when cleaning up after a failure.
*/
void Relay_log_info::cleanup_context(THD *thd, bool error) {
  DBUG_TRACE;

  DBUG_ASSERT(info_thd == thd);
  /*
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on
    them, may have opened tables, which we cannot be sure have been closed
    (because maybe the Rows_log_event have not been found or will not be,
    because slave SQL thread is stopping, or relay log has a missing tail etc).
    So we close all thread's tables. And so the table mappings have to be
    cancelled. 2) Rows_log_event::do_apply_event() may even have started
    statements or transactions on them, which we need to rollback in case of
    error. 3) If finding a Format_description_log_event after a BEGIN, we also
    need to rollback before continuing with the next events. 4) so we need this
    "context cleanup" function.
  */
  if (error) {
    trans_rollback_stmt(thd);  // if a "statement transaction"
    trans_rollback(thd);       // if a "real transaction"
    thd->variables.original_commit_timestamp = UNDEFINED_COMMIT_TIMESTAMP;
  }
  if (rows_query_ev) {
    /*
      In order to avoid invalid memory access, THD::reset_query() should be
      called before deleting the rows_query event.
    */
    info_thd->reset_query();
    info_thd->reset_query_for_display();
    delete rows_query_ev;
    rows_query_ev = nullptr;
    DBUG_EXECUTE_IF("after_deleting_the_rows_query_ev", {
      const char action[] =
          "now SIGNAL deleted_rows_query_ev WAIT_FOR go_ahead";
      DBUG_ASSERT(!debug_sync_set_action(info_thd, STRING_WITH_LEN(action)));
    };);
  }
  m_table_map.clear_tables();
  slave_close_thread_tables(thd);
  if (error) {
    /*
      trans_rollback above does not rollback XA transactions.
      It could be done only after necessarily closing tables which dictates
      the following placement.
    */
    XID_STATE *xid_state = thd->get_transaction()->xid_state();
    if (!xid_state->has_state(XID_STATE::XA_NOTR)) {
      DBUG_ASSERT(
          DBUG_EVALUATE_IF("simulate_commit_failure", 1,
                           xid_state->has_state(XID_STATE::XA_ACTIVE) ||
                               xid_state->has_state(XID_STATE::XA_IDLE)));

      xa_trans_force_rollback(thd);
      xid_state->reset();
      cleanup_trans_state(thd);
      thd->rpl_unflag_detached_engine_ha_data();
    }
    thd->mdl_context.release_transactional_locks();
  }
  clear_flag(IN_STMT);
  /*
    Cleanup for the flags that have been set at do_apply_event.
  */
  thd->variables.option_bits &= ~OPTION_NO_FOREIGN_KEY_CHECKS;
  thd->variables.option_bits &= ~OPTION_RELAXED_UNIQUE_CHECKS;

  /*
    Reset state related to long_find_row notes in the error log:
    - timestamp
    - flag that decides whether the slave prints or not
  */
  reset_row_stmt_start_timestamp();
  unset_long_find_row_note_printed();

  /*
    If the slave applier changed the current transaction isolation level,
    it needs to be restored to the session default value once the current
    transaction has been cleared.

    We should call "trans_reset_one_shot_chistics()" only if the "error"
    flag is "true", because "cleanup_context()" is called at the end of each
    set of Table_maps/Rows representing a statement (when the rows event
    is tagged with the STMT_END_F) with the "error" flag as "false".

    So, without the "if (error)" below, the isolation level might be reset
    in the middle of a pure row based transaction.
  */
  if (error) trans_reset_one_shot_chistics(thd);
}
1359 
clear_tables_to_lock()1360 void Relay_log_info::clear_tables_to_lock() {
1361   DBUG_TRACE;
1362 #ifndef DBUG_OFF
1363   /**
1364     When replicating in RBR and MyISAM Merge tables are involved
1365     open_and_lock_tables (called in do_apply_event) appends the
1366     base tables to the list of tables_to_lock. Then these are
1367     removed from the list in close_thread_tables (which is called
1368     before we reach this point).
1369 
1370     This assertion just confirms that we get no surprises at this
1371     point.
1372    */
1373   uint i = 0;
1374   for (TABLE_LIST *ptr = tables_to_lock; ptr; ptr = ptr->next_global, i++)
1375     ;
1376   DBUG_ASSERT(i == tables_to_lock_count);
1377 #endif
1378 
1379   while (tables_to_lock) {
1380     uchar *to_free = reinterpret_cast<uchar *>(tables_to_lock);
1381     if (tables_to_lock->m_tabledef_valid) {
1382       tables_to_lock->m_tabledef.table_def::~table_def();
1383       tables_to_lock->m_tabledef_valid = false;
1384     }
1385 
1386     /*
1387       If blob fields were used during conversion of field values
1388       from the master table into the slave table, then we need to
1389       free the memory used temporarily to store their values before
1390       copying into the slave's table.
1391     */
1392     if (tables_to_lock->m_conv_table) free_blobs(tables_to_lock->m_conv_table);
1393 
1394     tables_to_lock = static_cast<RPL_TABLE_LIST *>(tables_to_lock->next_global);
1395     tables_to_lock_count--;
1396     my_free(to_free);
1397   }
1398   DBUG_ASSERT(tables_to_lock == nullptr && tables_to_lock_count == 0);
1399 }
1400 
slave_close_thread_tables(THD * thd)1401 void Relay_log_info::slave_close_thread_tables(THD *thd) {
1402   thd->get_stmt_da()->set_overwrite_status(true);
1403   DBUG_TRACE;
1404   thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1405   thd->get_stmt_da()->set_overwrite_status(false);
1406 
1407   close_thread_tables(thd);
1408   /*
1409     - If transaction rollback was requested due to deadlock
1410     perform it and release metadata locks.
1411     - If inside a multi-statement transaction,
1412     defer the release of metadata locks until the current
1413     transaction is either committed or rolled back. This prevents
1414     other statements from modifying the table for the entire
1415     duration of this transaction.  This provides commit ordering
1416     and guarantees serializability across multiple transactions.
1417     - If in autocommit mode, or outside a transactional context,
1418     automatically release metadata locks of the current statement.
1419   */
1420   if (thd->transaction_rollback_request) {
1421     trans_rollback_implicit(thd);
1422     thd->mdl_context.release_transactional_locks();
1423   } else if (!thd->in_multi_stmt_transaction_mode())
1424     thd->mdl_context.release_transactional_locks();
1425   else
1426     thd->mdl_context.release_statement_locks();
1427 
1428   clear_tables_to_lock();
1429 }
1430 
1431 /**
1432   Execute a SHOW RELAYLOG EVENTS statement.
1433 
1434   When multiple replication channels exist on this slave
1435   and no channel name is specified through FOR CHANNEL clause
1436   this function errors out and exits.
1437 
1438   @param thd Pointer to THD object for the client thread executing the
1439   statement.
1440 
1441   @retval false success
1442   @retval true failure
1443 */
mysql_show_relaylog_events(THD * thd)1444 bool mysql_show_relaylog_events(THD *thd) {
1445   Master_info *mi = nullptr;
1446   List<Item> field_list;
1447   bool res;
1448   DBUG_TRACE;
1449 
1450   DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1451 
1452   channel_map.wrlock();
1453 
1454   if (!thd->lex->mi.for_channel && channel_map.get_num_instances() > 1) {
1455     my_error(ER_SLAVE_MULTIPLE_CHANNELS_CMD, MYF(0));
1456     res = true;
1457     goto err;
1458   }
1459 
1460   Log_event::init_show_field_list(&field_list);
1461   if (thd->send_result_metadata(&field_list,
1462                                 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) {
1463     res = true;
1464     goto err;
1465   }
1466 
1467   mi = channel_map.get_mi(thd->lex->mi.channel);
1468 
1469   if (!mi && strcmp(thd->lex->mi.channel, channel_map.get_default_channel())) {
1470     my_error(ER_SLAVE_CHANNEL_DOES_NOT_EXIST, MYF(0), thd->lex->mi.channel);
1471     res = true;
1472     goto err;
1473   }
1474 
1475   if (mi == nullptr) {
1476     my_error(ER_SLAVE_CONFIGURATION, MYF(0));
1477     res = true;
1478     goto err;
1479   }
1480 
1481   res = show_binlog_events(thd, &mi->rli->relay_log);
1482 
1483 err:
1484   channel_map.unlock();
1485 
1486   return res;
1487 }
1488 
rli_init_info(bool skip_received_gtid_set_recovery)1489 int Relay_log_info::rli_init_info(bool skip_received_gtid_set_recovery) {
1490   int error = 0;
1491   enum_return_check check_return = ERROR_CHECKING_REPOSITORY;
1492   const char *msg = nullptr;
1493   DBUG_TRACE;
1494 
1495   mysql_mutex_assert_owner(&data_lock);
1496 
1497   /*
1498     If Relay_log_info is issued again after a failed init_info(), for
1499     instance because of missing relay log files, it will generate new
1500     files and ignore the previous failure, to avoid that we set
1501     error_on_rli_init_info as true.
1502     This a consequence of the behaviour change, in the past server was
1503     stopped when there were replication initialization errors, now it is
1504     not and so init_info() must be aware of previous failures.
1505   */
1506   if (error_on_rli_init_info) goto err;
1507 
1508   if (inited) {
1509     return recovery_parallel_workers ? mts_recovery_groups(this) : 0;
1510   }
1511 
1512   slave_skip_counter = 0;
1513   abort_pos_wait = 0;
1514   log_space_limit = relay_log_space_limit;
1515   log_space_total = 0;
1516   tables_to_lock = nullptr;
1517   tables_to_lock_count = 0;
1518 
1519   char pattern[FN_REFLEN];
1520   (void)my_realpath(pattern, slave_load_tmpdir, 0);
1521   /*
1522    @TODO:
1523     In MSR, sometimes slave fail with the following error:
1524     Unable to use slave's temporary directory /tmp -
1525     Can't create/write to file
1526    '/tmp/SQL_LOAD-92d1eee0-9de4-11e3-8874-68730ad50fcb'    (Errcode: 17 - File
1527    exists), Error_code: 1
1528 
1529    */
1530   if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
1531                 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS) {
1532     LogErr(ERROR_LEVEL, ER_SLAVE_CANT_USE_TEMPDIR, slave_load_tmpdir);
1533     return 1;
1534   }
1535   unpack_filename(slave_patternload_file, pattern);
1536   slave_patternload_file_size = strlen(slave_patternload_file);
1537 
1538   /*
1539     The relay log will now be opened, as a WRITE_CACHE IO_CACHE.
1540     Note that the I/O thread flushes it to disk after writing every
1541     event, in flush_info within the master info.
1542   */
1543   /*
1544     For the maximum log size, we choose max_relay_log_size if it is
1545     non-zero, max_binlog_size otherwise. If later the user does SET
1546     GLOBAL on one of these variables, fix_max_binlog_size and
1547     fix_max_relay_log_size will reconsider the choice (for example
1548     if the user changes max_relay_log_size to zero, we have to
1549     switch to using max_binlog_size for the relay log) and update
1550     relay_log.max_size (and mysql_bin_log.max_size).
1551   */
1552   {
1553     /* Reports an error and returns, if the --relay-log's path
1554        is a directory.*/
1555     if (opt_relay_logname &&
1556         opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR) {
1557       LogErr(ERROR_LEVEL, ER_RPL_RELAY_LOG_NEEDS_FILE_NOT_DIRECTORY,
1558              opt_relay_logname);
1559       return 1;
1560     }
1561 
1562     /* Reports an error and returns, if the --relay-log-index's path
1563        is a directory.*/
1564     if (opt_relaylog_index_name &&
1565         opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1] ==
1566             FN_LIBCHAR) {
1567       LogErr(ERROR_LEVEL, ER_RPL_RELAY_LOG_INDEX_NEEDS_FILE_NOT_DIRECTORY,
1568              opt_relaylog_index_name);
1569       return 1;
1570     }
1571 
1572     char buf[FN_REFLEN];
1573     /* The base name of the relay log file considering multisource rep */
1574     const char *ln;
1575     /*
1576       relay log name without channel prefix taking into account
1577       --relay-log option.
1578     */
1579     const char *ln_without_channel_name;
1580     static bool name_warning_sent = false;
1581 
1582     /*
1583       Buffer to add channel name suffix when relay-log option is provided.
1584     */
1585     char relay_bin_channel[FN_REFLEN];
1586     /*
1587       Buffer to add channel name suffix when relay-log-index option is provided
1588     */
1589     char relay_bin_index_channel[FN_REFLEN];
1590 
1591     /* name of the index file if opt_relaylog_index_name is set*/
1592     const char *log_index_name;
1593 
1594     ln_without_channel_name =
1595         relay_log.generate_name(opt_relay_logname, "-relay-bin", buf);
1596 
1597     ln = add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
1598                                        ln_without_channel_name);
1599 
1600     /* We send the warning only at startup, not after every RESET SLAVE */
1601     if (!opt_relay_logname_supplied && !opt_relaylog_index_name_supplied &&
1602         !name_warning_sent) {
1603       /*
1604         User didn't give us info to name the relay log index file.
1605         Picking `hostname`-relay-bin.index like we do, causes replication to
1606         fail if this slave's hostname is changed later. So, we would like to
1607         instead require a name. But as we don't want to break many existing
1608         setups, we only give warning, not error.
1609       */
1610       LogErr(WARNING_LEVEL, ER_RPL_PLEASE_USE_OPTION_RELAY_LOG,
1611              ln_without_channel_name);
1612       name_warning_sent = true;
1613     }
1614 
1615     /*
1616        If relay log index option is set, convert into channel specific
1617        index file. If the opt_relaylog_index has an extension, we strip
1618        it too. This is inconsistent to relay log names.
1619     */
1620     if (opt_relaylog_index_name_supplied) {
1621       char index_file_withoutext[FN_REFLEN];
1622       relay_log.generate_name(opt_relaylog_index_name, "",
1623                               index_file_withoutext);
1624 
1625       log_index_name = add_channel_to_relay_log_name(
1626           relay_bin_index_channel, FN_REFLEN, index_file_withoutext);
1627     } else
1628       log_index_name = nullptr;
1629 
1630     if (relay_log.open_index_file(log_index_name, ln, true)) {
1631       LogErr(ERROR_LEVEL, ER_RPL_OPEN_INDEX_FILE_FAILED);
1632       return 1;
1633     }
1634 
1635     if (!gtid_retrieved_initialized) {
1636       /* Store the GTID of a transaction spanned in multiple relay log files */
1637       Gtid_monitoring_info *partial_trx = mi->get_gtid_monitoring_info();
1638       partial_trx->clear();
1639 #ifndef DBUG_OFF
1640       get_sid_lock()->wrlock();
1641       gtid_set->dbug_print("set of GTIDs in relay log before initialization");
1642       get_sid_lock()->unlock();
1643 #endif
1644       /*
1645         In the init_gtid_set below we pass the mi->transaction_parser.
1646         This will be useful to ensure that we only add a GTID to
1647         the Retrieved_Gtid_Set for fully retrieved transactions. Also, it will
1648         be useful to ensure the Retrieved_Gtid_Set behavior when auto
1649         positioning is disabled (we could have transactions spanning multiple
1650         relay log files in this case).
1651         We will skip this initialization if relay_log_recovery is set in order
1652         to save time, as neither the GTIDs nor the transaction_parser state
1653         would be useful when the relay log will be cleaned up later when calling
1654         init_recovery.
1655       */
1656       if (!is_relay_log_recovery && !gtid_retrieved_initialized &&
1657           !skip_received_gtid_set_recovery &&
1658           relay_log.init_gtid_sets(
1659               gtid_set, nullptr, opt_slave_sql_verify_checksum,
1660               true /*true=need lock*/, &mi->transaction_parser, partial_trx)) {
1661         LogErr(ERROR_LEVEL, ER_RPL_CANT_INITIALIZE_GTID_SETS_IN_RLI_INIT_INFO);
1662         return 1;
1663       }
1664       gtid_retrieved_initialized = true;
1665 #ifndef DBUG_OFF
1666       get_sid_lock()->wrlock();
1667       gtid_set->dbug_print("set of GTIDs in relay log after initialization");
1668       get_sid_lock()->unlock();
1669 #endif
1670     }
1671     /*
1672       Configures what object is used by the current log to store processed
1673       gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
1674       correctly compute the set of previous gtids.
1675     */
1676     relay_log.set_previous_gtid_set_relaylog(gtid_set);
1677     /*
1678       note, that if open() fails, we'll still have index file open
1679       but a destructor will take care of that
1680     */
1681 
1682     mysql_mutex_t *log_lock = relay_log.get_log_lock();
1683     mysql_mutex_lock(log_lock);
1684 
1685     if (relay_log.open_binlog(
1686             ln, nullptr,
1687             (max_relay_log_size ? max_relay_log_size : max_binlog_size), true,
1688             true /*need_lock_index=true*/, true /*need_sid_lock=true*/,
1689             mi->get_mi_description_event())) {
1690       mysql_mutex_unlock(log_lock);
1691       LogErr(ERROR_LEVEL, ER_RPL_CANT_OPEN_LOG_IN_RLI_INIT_INFO);
1692       return 1;
1693     }
1694 
1695     mysql_mutex_unlock(log_lock);
1696   }
1697 
1698   /*
1699    This checks if the repository was created before and thus there
1700    will be values to be read. Please, do not move this call after
1701    the handler->init_info().
1702  */
1703   if ((check_return = check_info()) == ERROR_CHECKING_REPOSITORY) {
1704     msg = "Error checking relay log repository";
1705     error = 1;
1706     goto err;
1707   }
1708 
1709   if (handler->init_info()) {
1710     msg = "Error reading relay log configuration";
1711     error = 1;
1712     goto err;
1713   }
1714 
1715   check_return = check_if_info_was_cleared(check_return);
1716 
1717   if (check_return & REPOSITORY_EXISTS) {
1718     if (read_info(handler)) {
1719       msg = "Error reading relay log configuration";
1720       error = 1;
1721       goto err;
1722     }
1723 
1724     /*
1725       Clone required cleanup must be done only once, thence we only
1726       do it when server is booting.
1727     */
1728     if (clone_startup && get_server_state() == SERVER_BOOTING) {
1729       char *channel_name =
1730           (const_cast<Relay_log_info *>(mi->rli))->get_channel();
1731       bool is_group_replication_channel =
1732           channel_map.is_group_replication_channel_name(channel_name);
1733       if (is_group_replication_channel) {
1734         if (clear_info()) {
1735           msg =
1736               "Error cleaning relay log configuration for group replication "
1737               "after clone";
1738           error = 1;
1739           goto err;
1740         }
1741         if (Rpl_info_factory::reset_workers(this)) {
1742           msg =
1743               "Error cleaning relay log worker configuration for group "
1744               "replication after clone";
1745           error = 1;
1746           goto err;
1747         }
1748         check_return = REPOSITORY_CLEARED;
1749       } else {
1750         if (!is_relay_log_recovery) {
1751           LogErr(WARNING_LEVEL, ER_RPL_RELAY_LOG_RECOVERY_INFO_AFTER_CLONE,
1752                  channel_name);
1753           // After a clone if we detect information is present we always invoke
1754           // relay log recovery. Not doing so would probably mean failure at
1755           // initialization due to missing relay log files.
1756           if (init_recovery(mi)) {
1757             msg = "Error on the relay log recovery after a clone operation";
1758             error = 1;
1759             goto err;
1760           }
1761         }
1762       }
1763     }
1764   }
1765 
1766   if (check_return == REPOSITORY_DOES_NOT_EXIST ||  // Hasn't been initialized
1767       check_return == REPOSITORY_CLEARED  // Was initialized but was RESET
1768   ) {
1769     /* Init relay log with first entry in the relay index file */
1770     if (reset_group_relay_log_pos(&msg)) {
1771       error = 1;
1772       goto err;
1773     }
1774     group_master_log_name[0] = 0;
1775     group_master_log_pos = 0;
1776   } else {
1777     if (is_relay_log_recovery && init_recovery(mi)) {
1778       error = 1;
1779       goto err;
1780     }
1781 
1782     if (is_group_relay_log_name_invalid(&msg)) {
1783       LogErr(ERROR_LEVEL, ER_RPL_MTS_RECOVERY_CANT_OPEN_RELAY_LOG,
1784              group_relay_log_name, std::to_string(group_relay_log_pos).c_str());
1785       error = 1;
1786       goto err;
1787     }
1788   }
1789 
1790   inited = true;
1791   error_on_rli_init_info = false;
1792   if (flush_info(true)) {
1793     msg = "Error reading relay log configuration";
1794     error = 1;
1795     goto err;
1796   }
1797 
1798   if (count_relay_log_space()) {
1799     msg = "Error counting relay log space";
1800     error = 1;
1801     goto err;
1802   }
1803 
1804   /*
1805     In case of MTS the recovery is deferred until the end of
1806     load_mi_and_rli_from_repositories.
1807   */
1808   if (!mi->rli->mts_recovery_group_cnt) is_relay_log_recovery = false;
1809   return error;
1810 
1811 err:
1812   handler->end_info();
1813   inited = false;
1814   error_on_rli_init_info = true;
1815   if (msg) LogErr(ERROR_LEVEL, ER_RPL_RLI_INIT_INFO_MSG, msg);
1816   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1817                   true /*need_lock_log=true*/, true /*need_lock_index=true*/);
1818   return error;
1819 }
1820 
end_info()1821 void Relay_log_info::end_info() {
1822   DBUG_TRACE;
1823 
1824   error_on_rli_init_info = false;
1825   if (!inited) return;
1826 
1827   handler->end_info();
1828 
1829   inited = false;
1830   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1831                   true /*need_lock_log=true*/, true /*need_lock_index=true*/);
1832   relay_log.harvest_bytes_written(this, true /*need_log_space_lock=true*/);
1833   /*
1834     Delete the slave's temporary tables from memory.
1835     In the future there will be other actions than this, to ensure persistance
1836     of slave's temp tables after shutdown.
1837   */
1838   close_temporary_tables();
1839 }
1840 
flush_current_log()1841 int Relay_log_info::flush_current_log() {
1842   DBUG_TRACE;
1843 
1844   /* When we come to this place in code, relay log may or not be initialized; */
1845   if (relay_log.flush()) return 2;
1846 
1847   return 0;
1848 }
1849 
set_master_info(Master_info * info)1850 void Relay_log_info::set_master_info(Master_info *info) { mi = info; }
1851 
1852 /**
  Stores the file and position where the slave SQL thread is in the
1854   relay log:
1855 
1856     - As this is only called by the slave thread or on STOP SLAVE, with the
1857       log_lock grabbed and the slave thread stopped, we don't need to have
1858       a lock here.
1859     - If there is an active transaction, then we don't update the position
1860       in the relay log.  This is to ensure that we re-execute statements
      if we die in the middle of a transaction that was rolled back.
1862     - As a transaction never spans binary logs, we don't have to handle the
1863       case where we do a relay-log-rotation in the middle of the transaction.
1864       If this would not be the case, we would have to ensure that we
1865       don't delete the relay log file where the transaction started when
1866       we switch to a new relay log file.
1867 
1868   @retval  0   ok,
1869   @retval  1   write error, otherwise.
1870 */
1871 
1872 /**
  Store the file and position where the slave's SQL thread is in the
1874   relay log.
1875 
1876   Notes:
1877 
1878   - This function should be called either from the slave SQL thread,
1879     or when the slave thread is not running.  (It reads the
1880     group_{relay|master}_log_{pos|name} and delay fields in the rli
1881     object.  These may only be modified by the slave SQL thread or by
1882     a client thread when the slave SQL thread is not running.)
1883 
1884   - If there is an active transaction, then we do not update the
1885     position in the relay log.  This is to ensure that we re-execute
    statements if we die in the middle of a transaction that was
1887     rolled back.
1888 
1889   - As a transaction never spans binary logs, we don't have to handle
1890     the case where we do a relay-log-rotation in the middle of the
1891     transaction.  If transactions could span several binlogs, we would
1892     have to ensure that we do not delete the relay log file where the
1893     transaction started before switching to a new relay log file.
1894 
1895   - Error can happen if writing to file fails or if flushing the file
1896     fails.
1897 
1898   @todo Change the log file information to a binary format to avoid
1899   calling longlong2str.
1900 
1901   @return 0 on success, 1 on error.
1902 */
flush_info(const bool force)1903 int Relay_log_info::flush_info(const bool force) {
1904   DBUG_TRACE;
1905 
1906   if (!inited) return 0;
1907 
1908   /*
1909     We update the sync_period at this point because only here we
1910     now that we are handling a relay log info. This needs to be
1911     update every time we call flush because the option maybe
1912     dinamically set.
1913   */
1914   mysql_mutex_lock(&mts_temp_table_LOCK);
1915   handler->set_sync_period(sync_relayloginfo_period);
1916 
1917   if (write_info(handler)) goto err;
1918 
1919   if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
1920     goto err;
1921 
1922   force_flush_postponed_due_to_split_trans = false;
1923   mysql_mutex_unlock(&mts_temp_table_LOCK);
1924   return 0;
1925 
1926 err:
1927   LogErr(ERROR_LEVEL, ER_RPL_ERROR_WRITING_RELAY_LOG_CONFIGURATION);
1928   mysql_mutex_unlock(&mts_temp_table_LOCK);
1929   return 1;
1930 }
1931 
check_if_info_was_cleared(const enum_return_check & previous_result) const1932 enum_return_check Relay_log_info::check_if_info_was_cleared(
1933     const enum_return_check &previous_result) const {
1934   enum_return_check result = previous_result;
1935 
1936   if (result == REPOSITORY_EXISTS) {
1937     char number_of_lines[FN_REFLEN] = {0};
1938 
1939     if (this->handler->prepare_info_for_read() ||
1940         !!this->handler->get_info(number_of_lines, sizeof(number_of_lines), ""))
1941       return ERROR_CHECKING_REPOSITORY;
1942 
1943     char *first_non_digit{nullptr};
1944     int lines = strtoul(number_of_lines, &first_non_digit, 10);
1945 
1946     if (number_of_lines[0] != '\0' && *first_non_digit == '\0' &&
1947         lines >= LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_ROW_FORMAT) {
1948       char log_name[FN_REFLEN] = {0};
1949 
1950       if (this->handler->get_info(log_name, sizeof(log_name), "") ==
1951           Rpl_info_handler::enum_field_get_status::FAILURE)
1952         return ERROR_CHECKING_REPOSITORY;
1953 
1954       if (log_name[0] == '\0') return REPOSITORY_CLEARED;
1955     }
1956   }
1957   return result;
1958 }
1959 
clear_info()1960 bool Relay_log_info::clear_info() {
1961   this->handler->init_info();
1962 
1963   if (this->handler->prepare_info_for_write() ||
1964       this->handler->set_info((int)MAXIMUM_LINES_IN_RELAY_LOG_INFO_FILE) ||
1965       this->handler->set_info(nullptr) || this->handler->set_info(nullptr) ||
1966       this->handler->set_info(nullptr) || this->handler->set_info(nullptr) ||
1967       this->handler->set_info(nullptr) || this->handler->set_info(nullptr) ||
1968       this->handler->set_info(nullptr) ||
1969       this->handler->set_info(this->channel))
1970     return true;
1971 
1972   if (this->m_privilege_checks_username.length() != 0) {
1973     if (this->handler->set_info(this->m_privilege_checks_username.c_str()))
1974       return true;
1975   } else {
1976     if (this->handler->set_info(nullptr)) {
1977       return true;
1978     }
1979   }
1980   if (this->m_privilege_checks_hostname.length() != 0) {
1981     if (this->handler->set_info(this->m_privilege_checks_hostname.c_str()))
1982       return true;
1983   } else {
1984     if (this->handler->set_info(nullptr)) return true;
1985   }
1986 
1987   if (this->handler->set_info(this->m_require_row_format)) return true;
1988 
1989   if (DBUG_EVALUATE_IF("rpl_rli_clear_info_error", true, false) ||
1990       this->handler->set_info((ulong)this->m_require_table_primary_key_check))
1991     return true;
1992 
1993   if (this->handler->flush_info(true)) return true;
1994 
1995   this->group_relay_log_name[0] = '\0';
1996   this->group_relay_log_pos = 0;
1997   this->group_master_log_name[0] = '\0';
1998   this->group_master_log_pos = 0;
1999   this->sql_delay = 0;
2000   this->recovery_parallel_workers = 0;
2001   this->internal_id = 1;
2002 
2003   return false;
2004 }
2005 
get_number_info_rli_fields()2006 size_t Relay_log_info::get_number_info_rli_fields() {
2007   return sizeof(info_rli_fields) / sizeof(info_rli_fields[0]);
2008 }
2009 
set_nullable_fields(MY_BITMAP * nullable_fields)2010 void Relay_log_info::set_nullable_fields(MY_BITMAP *nullable_fields) {
2011   bitmap_init(nullable_fields, nullptr,
2012               Relay_log_info::get_number_info_rli_fields());
2013   bitmap_set_all(nullable_fields);       // All fields may be NULL except for
2014   bitmap_clear_bit(nullable_fields, 0);  // NUMBER_OF_LINES and
2015   bitmap_clear_bit(nullable_fields, 8);  // CHANNEL_NAME
2016 }
2017 
start_sql_delay(time_t delay_end)2018 void Relay_log_info::start_sql_delay(time_t delay_end) {
2019   mysql_mutex_assert_owner(&data_lock);
2020   sql_delay_end = delay_end;
2021   THD_STAGE_INFO(info_thd, stage_sql_thd_waiting_until_delay);
2022 }
2023 
read_info(Rpl_info_handler * from)2024 bool Relay_log_info::read_info(Rpl_info_handler *from) {
2025   int lines = 0;
2026   char *first_non_digit = nullptr;
2027   ulong temp_group_relay_log_pos = 0;
2028   ulong temp_group_master_log_pos = 0;
2029   int temp_sql_delay = 0;
2030   int temp_internal_id = internal_id;
2031   int temp_require_row_format = 0;
2032   ulong temp_require_table_primary_key_check = Relay_log_info::PK_CHECK_STREAM;
2033   Rpl_info_handler::enum_field_get_status status{
2034       Rpl_info_handler::enum_field_get_status::FAILURE};
2035 
2036   DBUG_TRACE;
2037 
2038   /*
2039     Should not read RLI from file in client threads. Client threads
2040     only use RLI to execute BINLOG statements.
2041 
2042     @todo Uncomment the following assertion. Currently,
2043     Relay_log_info::init() is called from init_master_info() before
2044     the THD object Relay_log_info::sql_thd is created. That means we
2045     cannot call belongs_to_client() since belongs_to_client()
2046     dereferences Relay_log_info::sql_thd. So we need to refactor
2047     slightly: the THD object should be created by Relay_log_info
2048     constructor (or passed to it), so that we are guaranteed that it
2049     exists at this point. /Sven
2050   */
2051   // DBUG_ASSERT(!belongs_to_client());
2052 
2053   /*
2054     Starting from 5.1.x, relay-log.info has a new format. Now, its
2055     first line contains the number of lines in the file. By reading
2056     this number we can determine which version our master.info comes
2057     from. We can't simply count the lines in the file, since
2058     versions before 5.1.x could generate files with more lines than
2059     needed. If first line doesn't contain a number, or if it
2060     contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2061     then the file is treated like a file from pre-5.1.x version.
2062     There is no ambiguity when reading an old master.info: before
2063     5.1.x, the first line contained the binlog's name, which is
2064     either empty or has an extension (contains a '.'), so can't be
2065     confused with an integer.
2066 
2067     So we're just reading first line and trying to figure which
2068     version is this.
2069   */
2070 
2071   /*
2072     The first row is temporarily stored in mi->master_log_name, if
2073     it is line count and not binlog name (new format) it will be
2074     overwritten by the second row later.
2075   */
2076   if (from->prepare_info_for_read() ||
2077       !!from->get_info(group_relay_log_name, sizeof(group_relay_log_name), ""))
2078     return true;
2079 
2080   lines = strtoul(group_relay_log_name, &first_non_digit, 10);
2081 
2082   if (group_relay_log_name[0] != '\0' && *first_non_digit == '\0' &&
2083       lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) {
2084     /* Seems to be new format => read group relay log name */
2085     status =
2086         from->get_info(group_relay_log_name, sizeof(group_relay_log_name), "");
2087     if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
2088     if (status == Rpl_info_handler::enum_field_get_status::FIELD_VALUE_IS_NULL)
2089       group_relay_log_name[0] = '\0';
2090   } else
2091     DBUG_PRINT("info", ("relay_log_info file is in old format."));
2092 
2093   status =
2094       from->get_info(&temp_group_relay_log_pos, (ulong)BIN_LOG_HEADER_SIZE);
2095   if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
2096 
2097   status =
2098       from->get_info(group_master_log_name, sizeof(group_master_log_name), "");
2099   if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
2100   if (status == Rpl_info_handler::enum_field_get_status::FIELD_VALUE_IS_NULL)
2101     group_master_log_name[0] = '\0';
2102 
2103   status = from->get_info(&temp_group_master_log_pos, 0UL);
2104   if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
2105 
2106   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) {
2107     status = from->get_info(&temp_sql_delay, 0);
2108     if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
2109   }
2110 
2111   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS) {
2112     status = from->get_info(&recovery_parallel_workers, 0UL);
2113     if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
2114   }
2115 
2116   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID) {
2117     status = from->get_info(&temp_internal_id, 1);
2118     if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
2119   }
2120 
2121   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL) {
2122     /* the default value is empty string"" */
2123     if (!!from->get_info(channel, sizeof(channel), "")) return true;
2124   }
2125 
2126   /*
2127    * Here +4 is used to accomodate string if debug point
2128    * `simulate_priv_check_username_above_limit` is set in test.
2129    */
2130   char temp_privilege_checks_username[PRIV_CHECKS_USERNAME_LENGTH + 4] = {0};
2131   char *username = nullptr;
2132   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_PRIV_CHECKS_USERNAME) {
2133     status = from->get_info(temp_privilege_checks_username,
2134                             PRIV_CHECKS_USERNAME_LENGTH + 1, nullptr);
2135     if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
2136     if (status == Rpl_info_handler::enum_field_get_status::FIELD_VALUE_NOT_NULL)
2137       username = temp_privilege_checks_username;
2138   }
2139 
2140   /*
2141    * Here +4 is used to accomodate string if debug point
2142    * `simulate_priv_check_hostname_above_limit` is set in test.
2143    */
2144   char temp_privilege_checks_hostname[PRIV_CHECKS_HOSTNAME_LENGTH + 4] = {0};
2145   char *hostname = nullptr;
2146   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_PRIV_CHECKS_HOSTNAME) {
2147     status = from->get_info(temp_privilege_checks_hostname,
2148                             PRIV_CHECKS_HOSTNAME_LENGTH + 1, nullptr);
2149     if (status == Rpl_info_handler::enum_field_get_status::FAILURE) return true;
2150     if (status == Rpl_info_handler::enum_field_get_status::FIELD_VALUE_NOT_NULL)
2151       hostname = temp_privilege_checks_hostname;
2152   }
2153 
2154   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_ROW_FORMAT) {
2155     if (!!from->get_info(&temp_require_row_format, 0)) return true;
2156   } else {
2157     if (channel_map.is_group_replication_channel_name(channel))
2158       temp_require_row_format = 1;
2159   }
2160   m_require_row_format = temp_require_row_format;
2161 
2162   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_TABLE_PRIMARY_KEY_CHECK) {
2163     if (!!from->get_info(&temp_require_table_primary_key_check, 1)) return true;
2164   }
2165   if (temp_require_table_primary_key_check < PK_CHECK_STREAM ||
2166       temp_require_table_primary_key_check > Relay_log_info::PK_CHECK_OFF)
2167     return true;
2168   m_require_table_primary_key_check =
2169       static_cast<Relay_log_info::enum_require_table_primary_key>(
2170           temp_require_table_primary_key_check);
2171 
2172   group_relay_log_pos = temp_group_relay_log_pos;
2173   group_master_log_pos = temp_group_master_log_pos;
2174   sql_delay = (int32)temp_sql_delay;
2175   internal_id = (uint)temp_internal_id;
2176 
2177   DBUG_EXECUTE_IF("simulate_priv_check_username_above_limit", {
2178     strcpy(temp_privilege_checks_username,
2179            "repli_priv_checks_user_more_than_32");
2180     username = temp_privilege_checks_username;
2181   });
2182 
2183   DBUG_EXECUTE_IF("simulate_priv_check_hostname_above_limit", {
2184     strcpy(
2185         temp_privilege_checks_hostname,
2186         "replication_privilege_checks_hostname_more_than_255_replication_"
2187         "privilege_checks_hostname_more_than_255_replication_privilege_checks_"
2188         "hostname_more_than_255_replication_privilege_checks_hostname_more_"
2189         "than_255_replication_privilege_checks_hostname_more_than255");
2190     hostname = temp_privilege_checks_hostname;
2191   });
2192   enum_priv_checks_status error = set_privilege_checks_user(username, hostname);
2193   if (!!error) {
2194     set_privilege_checks_user_corrupted(true);
2195     report_privilege_check_error(WARNING_LEVEL, error, false, channel, username,
2196                                  hostname);
2197     return true;
2198   }
2199 
2200   return false;
2201 }
2202 
set_info_search_keys(Rpl_info_handler * to)2203 bool Relay_log_info::set_info_search_keys(Rpl_info_handler *to) {
2204   DBUG_TRACE;
2205 
2206   if (to->set_info(LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL, channel)) return true;
2207 
2208   return false;
2209 }
2210 
write_info(Rpl_info_handler * to)2211 bool Relay_log_info::write_info(Rpl_info_handler *to) {
2212   DBUG_TRACE;
2213 
2214   /*
2215     @todo Uncomment the following assertion. See todo in
2216     Relay_log_info::read_info() for details. /Sven
2217   */
2218   // DBUG_ASSERT(!belongs_to_client());
2219 
2220   if (to->prepare_info_for_write() ||
2221       to->set_info((int)MAXIMUM_LINES_IN_RELAY_LOG_INFO_FILE) ||
2222       to->set_info(group_relay_log_name) ||
2223       to->set_info((ulong)group_relay_log_pos) ||
2224       to->set_info(group_master_log_name) ||
2225       to->set_info((ulong)group_master_log_pos) ||
2226       to->set_info((int)sql_delay) || to->set_info(recovery_parallel_workers) ||
2227       to->set_info((int)internal_id) || to->set_info(channel))
2228     return true;
2229   if (m_privilege_checks_username.length()) {
2230     if (to->set_info(m_privilege_checks_username.c_str())) return true;
2231   } else {
2232 #ifndef DBUG_OFF
2233     if (DBUG_EVALUATE_IF("simulate_priv_check_user_nullptr_t", true, false)) {
2234       const char *null_char{nullptr};
2235       if (to->set_info(null_char)) return true;
2236     } else
2237 #endif
2238         if (to->set_info(nullptr)) {
2239       return true;
2240     }
2241   }
2242   if (m_privilege_checks_hostname.length()) {
2243     if (to->set_info(m_privilege_checks_hostname.c_str())) return true;
2244   } else {
2245 #ifndef DBUG_OFF
2246     if (DBUG_EVALUATE_IF("simulate_priv_check_user_nullptr_t", true, false)) {
2247       const uchar *null_char{nullptr};
2248       if (to->set_info(null_char, 0)) return true;
2249     } else
2250 #endif
2251         if (to->set_info(nullptr))
2252       return true;
2253   }
2254 
2255   if (to->set_info((int)m_require_row_format)) {
2256     return true; /* purecov: inspected */
2257   }
2258 
2259   if (to->set_info((ulong)m_require_table_primary_key_check)) {
2260     return true; /* purecov: inspected */
2261   }
2262   return false;
2263 }
2264 
2265 /**
2266    The method is run by SQL thread/MTS Coordinator.
2267    It replaces the current FD event with a new one.
2268    A version adaptation routine is invoked for the new FD
2269    to align the slave applier execution context with the master version.
2270 
2271    Since FD are shared by Coordinator and Workers in the MTS mode,
2272    deletion of the old FD is done through decrementing its usage counter.
2273    The destructor runs when the later drops to zero,
2274    also see @c Slave_worker::set_rli_description_event().
2275    The usage counter of the new FD is incremented.
2276 
2277    Although notice that MTS worker runs it, inefficiently (see assert),
2278    once at its destruction time.
2279 
2280    @param  fe Pointer to be installed into execution context
2281            FormatDescriptor event
2282 
2283    @return 1 if an error was encountered, 0 otherwise.
2284 */
2285 
set_rli_description_event(Format_description_log_event * fe)2286 int Relay_log_info::set_rli_description_event(
2287     Format_description_log_event *fe) {
2288   DBUG_TRACE;
2289   DBUG_ASSERT(!info_thd || !is_mts_worker(info_thd) || !fe);
2290 
2291   if (fe) {
2292     ulong fe_version = adapt_to_master_version(fe);
2293 
2294     if (info_thd) {
2295       /* @see rpl_rli_pdb.h:Slave_worker::set_rli_description_event for a
2296          detailed explanation on the following code block's logic. */
2297       if (info_thd->variables.gtid_next.type == AUTOMATIC_GTID ||
2298           info_thd->variables.gtid_next.type == UNDEFINED_GTID) {
2299         bool in_active_multi_stmt =
2300             info_thd->in_active_multi_stmt_transaction();
2301 
2302         if (!is_in_group() && !in_active_multi_stmt) {
2303           DBUG_PRINT("info",
2304                      ("Setting gtid_next.type to NOT_YET_DETERMINED_GTID"));
2305           info_thd->variables.gtid_next.set_not_yet_determined();
2306         } else if (in_active_multi_stmt) {
2307           my_error(ER_VARIABLE_NOT_SETTABLE_IN_TRANSACTION, MYF(0),
2308                    "gtid_next");
2309           return 1;
2310         }
2311       }
2312 
2313       if (is_parallel_exec() && fe_version > 0) {
2314         /*
2315           Prepare for workers' adaption to a new FD version. Workers
2316           will see notification through scheduling of a first event of
2317           a new post-new-FD.
2318         */
2319         for (Slave_worker **it = workers.begin(); it != workers.end(); ++it)
2320           (*it)->fd_change_notified = false;
2321       }
2322     }
2323   }
2324   if (rli_description_event &&
2325       --rli_description_event->atomic_usage_counter == 0)
2326     delete rli_description_event;
2327 #ifndef DBUG_OFF
2328   else
2329     /* It must be MTS mode when the usage counter greater than 1. */
2330     DBUG_ASSERT(!rli_description_event || is_parallel_exec());
2331 #endif
2332   rli_description_event = fe;
2333   if (rli_description_event) ++rli_description_event->atomic_usage_counter;
2334 
2335   return 0;
2336 }
2337 
/*
  Describes one server feature whose availability depends on the server
  version. It pairs the feature with the actions that adapt the applier THD
  when the version carried by a Format_description event crosses the version
  that introduced the feature. Instances live in s_features[].
*/
struct st_feature_version {
  /*
    The enum must be in the version non-descending top-down order,
    the last item formally corresponds to highest possible server
    version (never reached, thereby no adapting actions here);
    enumeration starts from zero.
    The enum value is also the feature's index into s_features[].
  */
  enum {
    WL6292_TIMESTAMP_EXPLICIT_DEFAULT = 0,
    _END_OF_LIST  // always last
  } item;
  /*
    Version where the feature is introduced, as split components
    (e.g. {5, 6, 6} for 5.6.6).
  */
  uchar version_split[3];
  /*
    Action to perform when according to FormatDescriptor event Master
    is found to be feature-aware while previously it has *not* been.
    May be nullptr when no action is needed.
  */
  void (*upgrade)(THD *);
  /*
    Action to perform when according to FormatDescriptor event Master
    is found to be feature-*un*aware while previously it has been.
    May be nullptr when no action is needed.
  */
  void (*downgrade)(THD *);
};
2364 
wl6292_upgrade_func(THD * thd)2365 static void wl6292_upgrade_func(THD *thd) {
2366   thd->variables.explicit_defaults_for_timestamp = false;
2367   if (global_system_variables.explicit_defaults_for_timestamp)
2368     thd->variables.explicit_defaults_for_timestamp = true;
2369 
2370   return;
2371 }
2372 
wl6292_downgrade_func(THD * thd)2373 static void wl6292_downgrade_func(THD *thd) {
2374   if (global_system_variables.explicit_defaults_for_timestamp)
2375     thd->variables.explicit_defaults_for_timestamp = false;
2376 
2377   return;
2378 }
2379 
2380 /**
2381    Sensitive to Master-vs-Slave version difference features
2382    should be listed in the version non-descending order.
2383 */
2384 static st_feature_version s_features[] = {
2385     // order is the same as in the enum
2386     {st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
2387      {5, 6, 6},
2388      wl6292_upgrade_func,
2389      wl6292_downgrade_func},
2390     {st_feature_version::_END_OF_LIST, {255, 255, 255}, nullptr, nullptr}};
2391 
2392 /**
2393    The method computes the incoming "master"'s FD server version and that
2394    of the currently installed (if ever) rli_description_event, to
2395    invoke more specific method to compare the two and adapt slave applier
2396    execution context to the new incoming master's version.
2397 
2398    This method is specifically for STS applier/MTS Coordinator as well as
2399    for a user thread applying binlog events.
2400 
2401    @param  fdle  a pointer to new Format Description event that is being
2402                  set up a new execution context.
2403    @return 0                when the versions are equal,
2404            master_version   otherwise
2405 */
adapt_to_master_version(Format_description_log_event * fdle)2406 ulong Relay_log_info::adapt_to_master_version(
2407     Format_description_log_event *fdle) {
2408   ulong master_version, current_version, slave_version;
2409 
2410   slave_version = version_product(slave_version_split);
2411   /* When rli_description_event is uninitialized yet take the slave's version */
2412   master_version = !fdle ? slave_version : fdle->get_product_version();
2413   current_version = !rli_description_event
2414                         ? slave_version
2415                         : rli_description_event->get_product_version();
2416   return adapt_to_master_version_updown(master_version, current_version);
2417 }
2418 
2419 /**
2420   The method compares two supplied versions and carries out down- or
2421   up- grade customization of execution context of the slave applier
2422   (thd).
2423 
2424   The method is invoked in the STS case through
2425   Relay_log_info::adapt_to_master_version() right before a new master
2426   FD is installed into the applier execution context; in the MTS
2427   case it's done by the Worker when it's assigned with a first event
2428   after the latest new FD has been installed.
2429 
2430   Comparison of the current (old, existing) and the master (new,
2431   incoming) versions yields adaptive actions.
2432   To explain that, let's denote V_0 as the current, and the master's
2433   one as V_1.
  In the downgrade case (V_1 < V_0) server features that are undefined
  in V_1 but are defined starting from some V_f of the [V_1 + 1, V_0] range
  (+1 to mean V_1 excluded) are invalidated ("removed" from the execution
  context) by running their so-called downgrade action.
2438   Conversely in the upgrade case a feature defined in [V_0 + 1, V_1] range
2439   is validated ("added" to execution context) by running its upgrade action.
2440   A typical use case showing how adaptive actions are necessary for the slave
2441   applier is when the master version is lesser than the slave's one.
2442   In such case events generated on the "older" master may need to be applied
2443   in their native server context. And such context can be provided by downgrade
2444   actions.
2445   Conversely, when the old master events are run out and a newer master's events
2446   show up for applying, the execution context will be upgraded through
2447   the namesake actions.
2448 
2449   Notice that a relay log may have two FD events, one the slave local
2450   and the other from the Master. As there's no concern for the FD
2451   originator this leads to two adapt_to_master_version() calls.
2452   It's not harmful as can be seen from the following example.
2453   Say the currently installed FD's version is
2454   V_m, then at relay-log rotation the following transition takes
2455   place:
2456 
2457      V_m  -adapt-> V_s -adapt-> V_m.
2458 
2459   here and further `m' subscript stands for the master, `s' for the slave.
2460   It's clear that in this case an ineffective V_m -> V_m transition occurs.
2461 
2462   At composing downgrade/upgrade actions keep in mind that the slave applier
2463   version transition goes the following route:
2464   The initial version is that of the slave server (V_ss).
2465   It changes to a magic 4.0 at the slave relay log initialization.
2466   In the following course versions are extracted from each FD read out,
2467   regardless of what server generated it. Here is a typical version
2468   transition sequence underscored with annotation:
2469 
2470    V_ss -> 4.0 -> V(FD_s^1) -> V(FD_m^2)   --->   V(FD_s^3) -> V(FD_m^4)  ...
2471 
2472     ----------     -----------------     --------  ------------------     ---
2473      bootstrap       1st relay log       rotation      2nd log            etc
2474 
  The upper (^) subscript enumerates Format Description events; V(FD^i) stands
  for a function extracting the version data from the i:th FD.
2477 
2478   There won't be any action to execute when info_thd is undefined,
2479   e.g at bootstrap.
2480 
2481   @param  master_version   an upcoming new version
2482   @param  current_version  the current version
2483   @return 0                when the new version is equal to the current one,
2484           master_version   otherwise
2485 */
adapt_to_master_version_updown(ulong master_version,ulong current_version)2486 ulong Relay_log_info::adapt_to_master_version_updown(ulong master_version,
2487                                                      ulong current_version) {
2488   THD *thd = info_thd;
2489   /*
2490     When the SQL thread or MTS Coordinator executes this method
2491     there's a constraint on current_version argument.
2492   */
2493   DBUG_ASSERT(
2494       !thd || thd->rli_fake != nullptr ||
2495       thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER ||
2496       (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
2497        (!rli_description_event ||
2498         current_version == rli_description_event->get_product_version())));
2499 
2500   if (master_version == current_version)
2501     return 0;
2502   else if (!thd)
2503     return master_version;
2504 
2505   bool downgrade = master_version < current_version;
2506   /*
2507    find item starting from and ending at for which adaptive actions run
2508    for downgrade or upgrade branches.
2509    (todo: convert into bsearch when number of features will grow significantly)
2510  */
2511   long i, i_first = st_feature_version::_END_OF_LIST, i_last = i_first;
2512 
2513   for (i = 0; i < st_feature_version::_END_OF_LIST; i++) {
2514     ulong ver_f = version_product(s_features[i].version_split);
2515 
2516     if ((downgrade ? master_version : current_version) < ver_f &&
2517         i_first == st_feature_version::_END_OF_LIST)
2518       i_first = i;
2519     if ((downgrade ? current_version : master_version) < ver_f) {
2520       i_last = i;
2521       DBUG_ASSERT(i_last >= i_first);
2522       break;
2523     }
2524   }
2525 
2526   /*
2527      actions, executed in version non-descending st_feature_version order
2528   */
2529   for (i = i_first; i < i_last; i++) {
2530     /* Run time check of the st_feature_version items ordering */
2531     DBUG_ASSERT(!i || version_product(s_features[i - 1].version_split) <=
2532                           version_product(s_features[i].version_split));
2533 
2534     DBUG_ASSERT((downgrade ? master_version : current_version) <
2535                     version_product(s_features[i].version_split) &&
2536                 (downgrade ? current_version
2537                            : master_version >=
2538                                  version_product(s_features[i].version_split)));
2539 
2540     if (downgrade && s_features[i].downgrade) {
2541       s_features[i].downgrade(thd);
2542     } else if (s_features[i].upgrade) {
2543       s_features[i].upgrade(thd);
2544     }
2545   }
2546 
2547   return master_version;
2548 }
2549 
relay_log_number_to_name(uint number,char name[FN_REFLEN+1])2550 void Relay_log_info::relay_log_number_to_name(uint number,
2551                                               char name[FN_REFLEN + 1]) {
2552   char *str = nullptr;
2553   char relay_bin_channel[FN_REFLEN + 1];
2554   const char *relay_log_basename_channel = add_channel_to_relay_log_name(
2555       relay_bin_channel, FN_REFLEN + 1, relay_log_basename);
2556 
2557   /* str points to closing null of relay log basename channel */
2558   str = strmake(name, relay_log_basename_channel, FN_REFLEN + 1);
2559   *str++ = '.';
2560   sprintf(str, "%06u", number);
2561 }
2562 
relay_log_name_to_number(const char * name)2563 uint Relay_log_info::relay_log_name_to_number(const char *name) {
2564   return static_cast<uint>(atoi(fn_ext(name) + 1));
2565 }
2566 
is_mts_db_partitioned(Relay_log_info * rli)2567 bool is_mts_db_partitioned(Relay_log_info *rli) {
2568   return (rli->current_mts_submode->get_type() == MTS_PARALLEL_TYPE_DB_NAME);
2569 }
2570 
get_for_channel_str(bool upper_case) const2571 const char *Relay_log_info::get_for_channel_str(bool upper_case) const {
2572   if (rli_fake || mi == nullptr)
2573     return "";
2574   else
2575     return mi->get_for_channel_str(upper_case);
2576 }
2577 
add_gtid_set(const Gtid_set * gtid_set)2578 enum_return_status Relay_log_info::add_gtid_set(const Gtid_set *gtid_set) {
2579   DBUG_TRACE;
2580 
2581   get_sid_lock()->wrlock();
2582   enum_return_status return_status = this->gtid_set->add_gtid_set(gtid_set);
2583   get_sid_lock()->unlock();
2584 
2585   return return_status;
2586 }
2587 
get_until_log_name()2588 const char *Relay_log_info::get_until_log_name() {
2589   if (until_condition == UNTIL_MASTER_POS ||
2590       until_condition == UNTIL_RELAY_POS) {
2591     DBUG_ASSERT(until_option != nullptr);
2592     return ((Until_position *)until_option)->get_until_log_name();
2593   }
2594   return "";
2595 }
2596 
get_until_log_pos()2597 my_off_t Relay_log_info::get_until_log_pos() {
2598   if (until_condition == UNTIL_MASTER_POS ||
2599       until_condition == UNTIL_RELAY_POS) {
2600     DBUG_ASSERT(until_option != nullptr);
2601     return ((Until_position *)until_option)->get_until_log_pos();
2602   }
2603   return 0;
2604 }
2605 
init_until_option(THD * thd,const LEX_MASTER_INFO * master_param)2606 int Relay_log_info::init_until_option(THD *thd,
2607                                       const LEX_MASTER_INFO *master_param) {
2608   DBUG_TRACE;
2609   int ret = 0;
2610   Until_option *option = nullptr;
2611 
2612   until_condition = UNTIL_NONE;
2613   clear_until_option();
2614 
2615   try {
2616     if (master_param->pos) {
2617       Until_master_position *until_mp = nullptr;
2618 
2619       if (master_param->relay_log_pos) return ER_BAD_SLAVE_UNTIL_COND;
2620 
2621       option = until_mp = new Until_master_position(this);
2622       until_condition = UNTIL_MASTER_POS;
2623       ret = until_mp->init(master_param->log_file_name, master_param->pos);
2624     } else if (master_param->relay_log_pos) {
2625       Until_relay_position *until_rp = nullptr;
2626 
2627       if (master_param->pos) return ER_BAD_SLAVE_UNTIL_COND;
2628 
2629       option = until_rp = new Until_relay_position(this);
2630       until_condition = UNTIL_RELAY_POS;
2631       ret = until_rp->init(master_param->relay_log_name,
2632                            master_param->relay_log_pos);
2633     } else if (master_param->gtid) {
2634       Until_gtids *until_g = nullptr;
2635 
2636       if (LEX_MASTER_INFO::UNTIL_SQL_BEFORE_GTIDS ==
2637           master_param->gtid_until_condition) {
2638         option = until_g = new Until_before_gtids(this);
2639         until_condition = UNTIL_SQL_BEFORE_GTIDS;
2640       } else {
2641         DBUG_ASSERT(LEX_MASTER_INFO::UNTIL_SQL_AFTER_GTIDS ==
2642                     master_param->gtid_until_condition);
2643 
2644         option = until_g = new Until_after_gtids(this);
2645         until_condition = UNTIL_SQL_AFTER_GTIDS;
2646         if (opt_slave_parallel_workers != 0) {
2647           opt_slave_parallel_workers = 0;
2648           push_warning_printf(
2649               thd, Sql_condition::SL_NOTE, ER_MTS_FEATURE_IS_NOT_SUPPORTED,
2650               ER_THD(thd, ER_MTS_FEATURE_IS_NOT_SUPPORTED), "UNTIL condtion",
2651               "Slave is started in the sequential execution mode.");
2652         }
2653       }
2654       ret = until_g->init(master_param->gtid);
2655     } else if (master_param->until_after_gaps) {
2656       Until_mts_gap *until_mg = nullptr;
2657 
2658       option = until_mg = new Until_mts_gap(this);
2659       until_condition = UNTIL_SQL_AFTER_MTS_GAPS;
2660       until_mg->init();
2661     } else if (master_param->view_id) {
2662       Until_view_id *until_vi = nullptr;
2663 
2664       option = until_vi = new Until_view_id(this);
2665       until_condition = UNTIL_SQL_VIEW_ID;
2666       ret = until_vi->init(master_param->view_id);
2667     }
2668   } catch (...) {
2669     return ER_OUTOFMEMORY;
2670   }
2671 
2672   if (until_condition == UNTIL_MASTER_POS ||
2673       until_condition == UNTIL_RELAY_POS) {
2674     /* Issuing warning then started without --skip-slave-start */
2675     if (!opt_skip_slave_start)
2676       push_warning(thd, Sql_condition::SL_NOTE, ER_MISSING_SKIP_SLAVE,
2677                    ER_THD(thd, ER_MISSING_SKIP_SLAVE));
2678   }
2679 
2680   mysql_mutex_lock(&data_lock);
2681   until_option = option;
2682   mysql_mutex_unlock(&data_lock);
2683   return ret;
2684 }
2685 
detach_engine_ha_data(THD * thd)2686 void Relay_log_info::detach_engine_ha_data(THD *thd) {
2687   is_engine_ha_data_detached = true;
2688   /*
2689     In case of slave thread applier or processing binlog by client,
2690     detach the engine ha_data ("native" engine transaction)
2691     in favor of dynamically created.
2692   */
2693   plugin_foreach(thd, detach_native_trx, MYSQL_STORAGE_ENGINE_PLUGIN, nullptr);
2694 }
2695 
reattach_engine_ha_data(THD * thd)2696 void Relay_log_info::reattach_engine_ha_data(THD *thd) {
2697   is_engine_ha_data_detached = false;
2698   /*
2699     In case of slave thread applier or processing binlog by client,
2700     reattach the engine ha_data ("native" engine transaction)
2701     in favor of dynamically created.
2702   */
2703   plugin_foreach(thd, reattach_native_trx, MYSQL_STORAGE_ENGINE_PLUGIN,
2704                  nullptr);
2705 }
2706 
commit_positions()2707 bool Relay_log_info::commit_positions() {
2708   int error = 0;
2709   char saved_group_master_log_name[FN_REFLEN];
2710   my_off_t saved_group_master_log_pos;
2711   char saved_group_relay_log_name[FN_REFLEN];
2712   my_off_t saved_group_relay_log_pos;
2713 
2714   /*
2715     In FILE case Query_log_event::update_pos(), called after commit,
2716     updates the info object.
2717   */
2718   if (!is_transactional()) return false;
2719 
2720   mysql_mutex_lock(&data_lock);
2721 
2722   /* save the rli positions to restore after flush_info() */
2723   strmake(saved_group_master_log_name, get_group_master_log_name(),
2724           FN_REFLEN - 1);
2725   saved_group_master_log_pos = get_group_master_log_pos();
2726   strmake(saved_group_relay_log_name, get_group_relay_log_name(),
2727           FN_REFLEN - 1);
2728   saved_group_relay_log_pos = get_group_relay_log_pos();
2729 
2730   /* Update to new values just for the sake of flush_info */
2731   inc_event_relay_log_pos();
2732   set_group_relay_log_pos(get_event_relay_log_pos());
2733   set_group_relay_log_name(get_event_relay_log_name());
2734   set_group_master_log_pos(current_event->common_header->log_pos);
2735   /* save them too, but now for post_commit() time */
2736   strmake(new_group_master_log_name, get_group_master_log_name(),
2737           FN_REFLEN - 1);
2738   new_group_master_log_pos = get_group_master_log_pos();
2739   strmake(new_group_relay_log_name, get_group_relay_log_name(), FN_REFLEN - 1);
2740   new_group_relay_log_pos = get_group_relay_log_pos();
2741 
2742   error = flush_info(true);
2743 
2744   /*
2745     Restore the saved ones so they remain actual until the replicated
2746     statement commits.
2747   */
2748   set_group_master_log_name(saved_group_master_log_name);
2749   set_group_master_log_pos(saved_group_master_log_pos);
2750   set_group_relay_log_name(saved_group_relay_log_name);
2751   set_group_relay_log_pos(saved_group_relay_log_pos);
2752 
2753   mysql_mutex_unlock(&data_lock);
2754 
2755   return error != 0;
2756 }
2757 
post_commit(bool on_rollback)2758 void Relay_log_info::post_commit(bool on_rollback) {
2759   THD *thd = info_thd;
2760 
2761   if (on_rollback) {
2762     if (thd->owned_gtid_is_empty()) gtid_state->update_on_rollback(thd);
2763   } else {
2764     /*
2765       New executed coordinates prepared in pre_commit() are
2766       finally installed.
2767     */
2768     mysql_mutex_lock(&data_lock);
2769     set_group_master_log_name(new_group_master_log_name);
2770     set_group_master_log_pos(new_group_master_log_pos);
2771     set_group_relay_log_name(new_group_relay_log_name);
2772     set_group_relay_log_pos(new_group_relay_log_pos);
2773     mysql_mutex_unlock(&data_lock);
2774 
2775     if (is_transactional()) {
2776       /* DDL's commit has been completed */
2777       DBUG_ASSERT(
2778           !current_event || !is_atomic_ddl_event(current_event) ||
2779           static_cast<Query_log_event *>(current_event)->has_ddl_committed);
2780       /*
2781         Marked as already-committed DDL may have not updated the GTID
2782         executed state, which is the case when slave filters it out.
2783         on slave side with binlog filtering out.
2784         When this is detected now the gtid state will be sorted later,
2785         and the gtid-executed based signaling will be done in the
2786         "last-chance-to-commit" branch of Log_event::do_update_pos().
2787         However in order to enter the branch has_ddl_committed needs false.
2788       */
2789       if (!thd->owned_gtid_is_empty())
2790         static_cast<Query_log_event *>(current_event)->has_ddl_committed =
2791             false;
2792 
2793       mysql_mutex_lock(&data_lock);
2794       /* Post-commit cleanup for Relay_log_info::wait_for_pos() */
2795       if (is_group_master_log_pos_invalid)
2796         is_group_master_log_pos_invalid = false;
2797 
2798       notify_group_master_log_name_update();
2799       mysql_cond_broadcast(&data_cond);
2800       mysql_mutex_unlock(&data_lock);
2801     } else {
2802       /*
2803         In the non-transactional slave info repository case should the
2804         current event be DDL it would still have to finalize commit.
2805       */
2806       DBUG_ASSERT(
2807           !current_event || !is_atomic_ddl_event(current_event) ||
2808           !static_cast<Query_log_event *>(current_event)->has_ddl_committed);
2809     }
2810   }
2811 }
2812 
notify_relay_log_truncated()2813 void Relay_log_info::notify_relay_log_truncated() {
2814   mysql_mutex_lock(&data_lock);
2815   m_relay_log_truncated = true;
2816   mysql_mutex_unlock(&data_lock);
2817 }
2818 
clear_relay_log_truncated()2819 void Relay_log_info::clear_relay_log_truncated() {
2820   mysql_mutex_assert_owner(&data_lock);
2821   m_relay_log_truncated = false;
2822 }
2823 
is_time_for_mts_checkpoint()2824 bool Relay_log_info::is_time_for_mts_checkpoint() {
2825   if (is_parallel_exec() && opt_mts_checkpoint_period != 0) {
2826     struct timespec curr_clock;
2827     set_timespec_nsec(&curr_clock, 0);
2828     return diff_timespec(&curr_clock, &last_clock) >=
2829            opt_mts_checkpoint_period * 1000000ULL;
2830   }
2831   return false;
2832 }
2833 
operator !(Relay_log_info::enum_priv_checks_status status)2834 bool operator!(Relay_log_info::enum_priv_checks_status status) {
2835   return status == Relay_log_info::enum_priv_checks_status::SUCCESS;
2836 }
2837 
operator !(Relay_log_info::enum_require_row_status status)2838 bool operator!(Relay_log_info::enum_require_row_status status) {
2839   return status == Relay_log_info::enum_require_row_status::SUCCESS;
2840 }
2841 
get_privilege_checks_username() const2842 std::string Relay_log_info::get_privilege_checks_username() const {
2843   return this->m_privilege_checks_username;
2844 }
2845 
get_privilege_checks_hostname() const2846 std::string Relay_log_info::get_privilege_checks_hostname() const {
2847   return this->m_privilege_checks_hostname;
2848 }
2849 
is_privilege_checks_user_null() const2850 bool Relay_log_info::is_privilege_checks_user_null() const {
2851   DBUG_ASSERT(this->m_privilege_checks_username.length() != 0 ||
2852               (this->m_privilege_checks_username.length() == 0 &&
2853                this->m_privilege_checks_hostname.length() == 0));
2854   return this->m_privilege_checks_username.length() == 0;
2855 }
2856 
is_privilege_checks_user_corrupted() const2857 bool Relay_log_info::is_privilege_checks_user_corrupted() const {
2858   return this->m_privilege_checks_user_corrupted;
2859 }
2860 
clear_privilege_checks_user()2861 void Relay_log_info::clear_privilege_checks_user() {
2862   DBUG_TRACE;
2863   this->m_privilege_checks_username.clear();
2864   this->m_privilege_checks_hostname.clear();
2865   this->m_privilege_checks_user_corrupted = false;
2866 }
2867 
set_privilege_checks_user_corrupted(bool is_corrupted)2868 void Relay_log_info::set_privilege_checks_user_corrupted(bool is_corrupted) {
2869   DBUG_TRACE;
2870   this->m_privilege_checks_user_corrupted = is_corrupted;
2871 }
2872 
2873 Relay_log_info::enum_priv_checks_status
set_privilege_checks_user(char const * param_privilege_checks_username,char const * param_privilege_checks_hostname)2874 Relay_log_info::set_privilege_checks_user(
2875     char const *param_privilege_checks_username,
2876     char const *param_privilege_checks_hostname) {
2877   DBUG_TRACE;
2878 
2879   enum_priv_checks_status error = this->check_privilege_checks_user(
2880       param_privilege_checks_username, param_privilege_checks_hostname);
2881   if (!!error) return error;
2882 
2883   if (param_privilege_checks_username == nullptr) {
2884     this->clear_privilege_checks_user();
2885     return enum_priv_checks_status::SUCCESS;
2886   }
2887 
2888   this->m_privilege_checks_user_corrupted = false;
2889   this->m_privilege_checks_username =
2890       static_cast<std::string>(param_privilege_checks_username);
2891 
2892   if (param_privilege_checks_hostname != nullptr) {
2893     this->m_privilege_checks_hostname =
2894         static_cast<std::string>(param_privilege_checks_hostname);
2895     std::transform(this->m_privilege_checks_hostname.begin(),
2896                    this->m_privilege_checks_hostname.end(),
2897                    this->m_privilege_checks_hostname.begin(), ::tolower);
2898   }
2899 
2900   return enum_priv_checks_status::SUCCESS;
2901 }
2902 
2903 Relay_log_info::enum_priv_checks_status
check_privilege_checks_user()2904 Relay_log_info::check_privilege_checks_user() {
2905   DBUG_TRACE;
2906   DBUG_ASSERT(this->m_privilege_checks_username.length() != 0 ||
2907               (this->m_privilege_checks_username.length() == 0 &&
2908                this->m_privilege_checks_hostname.length() == 0));
2909 
2910   return this->check_privilege_checks_user(
2911       this->m_privilege_checks_username.length() != 0
2912           ? this->m_privilege_checks_username.data()
2913           : nullptr,
2914       this->m_privilege_checks_hostname.length() != 0
2915           ? this->m_privilege_checks_hostname.data()
2916           : nullptr);
2917 }
2918 
2919 Relay_log_info::enum_priv_checks_status
check_privilege_checks_user(char const * param_privilege_checks_username,char const * param_privilege_checks_hostname)2920 Relay_log_info::check_privilege_checks_user(
2921     char const *param_privilege_checks_username,
2922     char const *param_privilege_checks_hostname) {
2923   DBUG_TRACE;
2924 
2925   if (param_privilege_checks_username == nullptr) {
2926     if (param_privilege_checks_hostname != nullptr)
2927       return enum_priv_checks_status::USERNAME_NULL_HOSTNAME_NOT_NULL;
2928     return enum_priv_checks_status::SUCCESS;
2929   }
2930 
2931   if (strlen(param_privilege_checks_username) == 0)
2932     return enum_priv_checks_status::USER_ANONYMOUS;
2933 
2934   if (strlen(param_privilege_checks_username) > 32)
2935     return enum_priv_checks_status::USERNAME_TOO_LONG;
2936 
2937   if (param_privilege_checks_hostname != nullptr &&
2938       strlen(param_privilege_checks_hostname) > 255)
2939     return enum_priv_checks_status::HOSTNAME_TOO_LONG;
2940 
2941   if (param_privilege_checks_hostname != nullptr) {
2942     std::regex static const hostname_regex{"^((?![@])[\\x20-\\x7e])+$",
2943                                            std::regex_constants::ECMAScript};
2944     if (!std::regex_match(param_privilege_checks_hostname, hostname_regex))
2945       return enum_priv_checks_status::HOSTNAME_SYNTAX_ERROR;
2946   }
2947 
2948   enum_priv_checks_status error = this->check_applier_acl_user(
2949       param_privilege_checks_username, param_privilege_checks_hostname);
2950   if (!!error) return error;
2951 
2952   return enum_priv_checks_status::SUCCESS;
2953 }
2954 
check_applier_acl_user(char const * param_privilege_checks_username,char const * param_privilege_checks_hostname)2955 Relay_log_info::enum_priv_checks_status Relay_log_info::check_applier_acl_user(
2956     char const *param_privilege_checks_username,
2957     char const *param_privilege_checks_hostname) {
2958   DBUG_TRACE;
2959   DBUG_ASSERT(param_privilege_checks_username != nullptr &&
2960               strlen(param_privilege_checks_username) != 0);
2961 
2962   THD_instance_guard thd{current_thd != nullptr ? current_thd : this->info_thd};
2963   Acl_cache_lock_guard acl_cache_lock{thd, Acl_cache_lock_mode::READ_MODE};
2964   if (!acl_cache_lock.lock()) {  // If we're unable to acquire the lock we're
2965                                  // unable to check if the user exists
2966     return enum_priv_checks_status::USER_DOES_NOT_EXIST;  // so, return
2967                                                           // accordingly.
2968   }
2969 
2970   ACL_USER *applier_acl_user = find_acl_user(
2971       param_privilege_checks_hostname, param_privilege_checks_username, false);
2972 
2973   if (applier_acl_user == nullptr) {
2974     return enum_priv_checks_status::USER_DOES_NOT_EXIST;
2975   }
2976 
2977   return enum_priv_checks_status::SUCCESS;
2978 }
2979 
2980 std::pair<const char *, const char *>
print_applier_security_context_user_host() const2981 Relay_log_info::print_applier_security_context_user_host() const {
2982   if (this->m_privilege_checks_username.length() != 0) {
2983     return {this->m_privilege_checks_username.data(),
2984             this->m_privilege_checks_hostname.length() == 0
2985                 ? "%"
2986                 : this->m_privilege_checks_hostname.data()};
2987   } else if (this->info_thd != nullptr &&
2988              this->info_thd->security_context() != nullptr) {
2989     return {this->info_thd->security_context()->user().str != nullptr
2990                 ? this->info_thd->security_context()->user().str
2991                 : "",
2992             this->info_thd->security_context()->host().str != nullptr
2993                 ? this->info_thd->security_context()->host().str
2994                 : ""};
2995   }
2996   return {"", ""};
2997 }
2998 
report_privilege_check_error(enum loglevel level,enum_priv_checks_status status_code,bool to_client,char const * channel_name_arg,char const * user_name_arg,char const * host_name_arg) const2999 void Relay_log_info::report_privilege_check_error(
3000     enum loglevel level, enum_priv_checks_status status_code, bool to_client,
3001     char const *channel_name_arg, char const *user_name_arg,
3002     char const *host_name_arg) const {
3003   DBUG_TRACE;
3004 
3005   char const *channel_name{channel_name_arg != nullptr ? channel_name_arg
3006                                                        : get_channel()};
3007   char const *user_name{user_name_arg};
3008   char const *host_name{host_name_arg};
3009   if (user_name == nullptr) {
3010     std::tie(user_name, host_name) =
3011         this->print_applier_security_context_user_host();
3012   }
3013 
3014   switch (status_code) {
3015     case enum_priv_checks_status::SUCCESS: {
3016       DBUG_ASSERT(false);
3017       break;
3018     }
3019     case enum_priv_checks_status::USER_ANONYMOUS: {
3020       if (to_client)
3021         my_error(ER_CLIENT_PRIVILEGE_CHECKS_USER_CANNOT_BE_ANONYMOUS, MYF(0),
3022                  channel_name, host_name);
3023       else {
3024         THD_instance_guard thd{current_thd != nullptr ? current_thd
3025                                                       : this->info_thd};
3026         this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3027                      ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3028                      channel_name);
3029       }
3030       break;
3031     }
3032     case enum_priv_checks_status::USERNAME_TOO_LONG: {
3033       if (to_client)
3034         my_error(ER_WRONG_STRING_LENGTH, MYF(0), user_name, "user name", 32);
3035       else {
3036         THD_instance_guard thd{current_thd != nullptr ? current_thd
3037                                                       : this->info_thd};
3038         this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3039                      ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3040                      channel_name);
3041       }
3042       break;
3043     }
3044     case enum_priv_checks_status::HOSTNAME_TOO_LONG: {
3045       if (to_client)
3046         my_error(ER_WRONG_STRING_LENGTH, MYF(0), user_name, "host name", 255);
3047       else {
3048         THD_instance_guard thd{current_thd != nullptr ? current_thd
3049                                                       : this->info_thd};
3050         this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3051                      ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3052                      channel_name);
3053       }
3054       break;
3055     }
3056     case enum_priv_checks_status::HOSTNAME_SYNTAX_ERROR: {
3057       if (to_client)
3058         my_printf_error(ER_UNKNOWN_ERROR, "Malformed hostname (illegal symbol)",
3059                         MYF(0));
3060       else {
3061         THD_instance_guard thd{current_thd != nullptr ? current_thd
3062                                                       : this->info_thd};
3063         this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3064                      ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3065                      channel_name);
3066       }
3067       break;
3068     }
3069     case enum_priv_checks_status::USER_DATA_CORRUPTED:
3070     case enum_priv_checks_status::USERNAME_NULL_HOSTNAME_NOT_NULL: {
3071       if (to_client)
3072         my_error(ER_CLIENT_PRIVILEGE_CHECKS_USER_CORRUPT, MYF(0), channel_name);
3073       else {
3074         THD_instance_guard thd{current_thd != nullptr ? current_thd
3075                                                       : this->info_thd};
3076         this->report(level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT,
3077                      ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_CORRUPT),
3078                      channel_name);
3079       }
3080       break;
3081     }
3082     case enum_priv_checks_status::USER_DOES_NOT_EXIST: {
3083       if (to_client)
3084         my_error(ER_CLIENT_PRIVILEGE_CHECKS_USER_DOES_NOT_EXIST, MYF(0),
3085                  channel_name, user_name, host_name);
3086       else {
3087         THD_instance_guard thd{current_thd != nullptr ? current_thd
3088                                                       : this->info_thd};
3089         this->report(
3090             level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_DOES_NOT_EXIST,
3091             ER_THD(thd, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_DOES_NOT_EXIST),
3092             channel_name, user_name, host_name);
3093       }
3094       break;
3095     }
3096     case enum_priv_checks_status::USER_DOES_NOT_HAVE_PRIVILEGES: {
3097       if (!to_client) {
3098         THD_instance_guard thd{current_thd != nullptr ? current_thd
3099                                                       : this->info_thd};
3100         this->report(
3101             level, ER_WARN_LOG_PRIVILEGE_CHECKS_USER_NEEDS_RPL_APPLIER_PRIV,
3102             ER_THD(thd,
3103                    ER_WARN_LOG_PRIVILEGE_CHECKS_USER_NEEDS_RPL_APPLIER_PRIV),
3104             channel_name, user_name, host_name);
3105       }
3106       break;
3107     }
3108     case enum_priv_checks_status::LOAD_DATA_EVENT_NOT_ALLOWED: {
3109       if (!to_client) {
3110         THD_instance_guard thd{current_thd != nullptr ? current_thd
3111                                                       : this->info_thd};
3112         this->report(level, ER_FILE_PRIVILEGE_FOR_REPLICATION_CHECKS,
3113                      ER_THD(thd, ER_FILE_PRIVILEGE_FOR_REPLICATION_CHECKS),
3114                      channel_name);
3115       }
3116       break;
3117     }
3118   }
3119 }
3120 
3121 Relay_log_info::enum_priv_checks_status
initialize_security_context(THD * thd)3122 Relay_log_info::initialize_security_context(THD *thd) {
3123   DBUG_TRACE;
3124 
3125   if (this->m_privilege_checks_user_corrupted)
3126     return enum_priv_checks_status::USER_DATA_CORRUPTED;
3127 
3128   if (this->m_privilege_checks_username.length() != 0) {
3129     if (acl_getroot(thd, &thd->m_main_security_ctx,
3130                     this->m_privilege_checks_username.data(),
3131                     this->m_privilege_checks_hostname.data(), nullptr,
3132                     nullptr)) {
3133       return enum_priv_checks_status::USER_DOES_NOT_EXIST;
3134     }
3135 
3136     Roles::Role_activation role_activation{
3137         thd, thd->security_context(),
3138         opt_always_activate_granted_roles == 0 ? role_enum::ROLE_DEFAULT
3139                                                : role_enum::ROLE_ALL,
3140         nullptr, true};
3141     if (role_activation.activate())
3142       return enum_priv_checks_status::USER_DOES_NOT_HAVE_PRIVILEGES;
3143 
3144     bool has_grant{false};
3145     std::tie(has_grant, std::ignore) =
3146         thd->m_main_security_ctx.has_global_grant(
3147             STRING_WITH_LEN("REPLICATION_APPLIER"));
3148     if (!has_grant) {
3149       return enum_priv_checks_status::USER_DOES_NOT_HAVE_PRIVILEGES;
3150     }
3151   } else
3152     thd->security_context()->skip_grants();
3153 
3154   return enum_priv_checks_status::SUCCESS;
3155 }
3156 
3157 Relay_log_info::enum_priv_checks_status
initialize_applier_security_context()3158 Relay_log_info::initialize_applier_security_context() {
3159   DBUG_TRACE;
3160   return this->initialize_security_context(this->info_thd);
3161 }
3162 
is_row_format_required() const3163 bool Relay_log_info::is_row_format_required() const {
3164   return this->m_require_row_format;
3165 }
3166 
set_require_row_format(bool require_row)3167 void Relay_log_info::set_require_row_format(bool require_row) {
3168   DBUG_TRACE;
3169   this->m_require_row_format = require_row;
3170 }
3171 
3172 Relay_log_info::enum_require_table_primary_key
get_require_table_primary_key_check() const3173 Relay_log_info::get_require_table_primary_key_check() const {
3174   return this->m_require_table_primary_key_check;
3175 }
3176 
set_require_table_primary_key_check(Relay_log_info::enum_require_table_primary_key require_pk)3177 void Relay_log_info::set_require_table_primary_key_check(
3178     Relay_log_info::enum_require_table_primary_key require_pk) {
3179   DBUG_TRACE;
3180   this->m_require_table_primary_key_check = require_pk;
3181 }
3182 
MDL_lock_guard(THD * target)3183 MDL_lock_guard::MDL_lock_guard(THD *target) : m_target{target} { DBUG_TRACE; }
3184 
MDL_lock_guard(THD * target,MDL_key::enum_mdl_namespace namespace_arg,enum_mdl_type mdl_type_arg,bool blocking)3185 MDL_lock_guard::MDL_lock_guard(THD *target,
3186                                MDL_key::enum_mdl_namespace namespace_arg,
3187                                enum_mdl_type mdl_type_arg, bool blocking)
3188     : m_target{target} {
3189   DBUG_TRACE;
3190   this->lock(namespace_arg, mdl_type_arg, blocking);
3191 }
3192 
lock(MDL_key::enum_mdl_namespace namespace_arg,enum_mdl_type mdl_type_arg,bool blocking)3193 bool MDL_lock_guard::lock(MDL_key::enum_mdl_namespace namespace_arg,
3194                           enum_mdl_type mdl_type_arg, bool blocking) {
3195   DBUG_TRACE;
3196   if (this->m_target != nullptr &&
3197       !this->m_target->mdl_context.has_locks(namespace_arg)) {
3198     MDL_REQUEST_INIT(&this->m_request, namespace_arg, "", "", mdl_type_arg,
3199                      MDL_EXPLICIT);
3200 
3201     if (blocking)
3202       this->m_target->mdl_context.acquire_lock(
3203           &this->m_request, this->m_target->variables.lock_wait_timeout);
3204     else
3205       this->m_target->mdl_context.try_acquire_lock(&this->m_request);
3206 
3207     return !this->is_locked();
3208   }
3209   return true;
3210 }
3211 
~MDL_lock_guard()3212 MDL_lock_guard::~MDL_lock_guard() {
3213   DBUG_TRACE;
3214   if (this->m_request.ticket != nullptr) {
3215     this->m_target->mdl_context.release_lock(this->m_request.ticket);
3216   }
3217 }
3218 
is_locked()3219 bool MDL_lock_guard::is_locked() { return this->m_request.ticket != nullptr; }
3220 
Applier_security_context_guard(Relay_log_info const * rli,THD const * thd)3221 Applier_security_context_guard::Applier_security_context_guard(
3222     Relay_log_info const *rli, THD const *thd)
3223     : m_target{rli},
3224       m_thd{thd},
3225       m_current{nullptr},
3226       m_previous{nullptr},
3227       m_privilege_checks_none{
3228           this->m_target->get_privilege_checks_username().length() == 0 &&
3229           (this->m_thd->lex == nullptr ||
3230            this->m_thd->lex->binlog_stmt_arg.length == 0 ||
3231            this->m_thd->lex->binlog_stmt_arg.length >= 2048 ||
3232            this->m_thd->lex->binlog_stmt_arg.str == nullptr)} {
3233   DBUG_TRACE;
3234 
3235   if (this->m_thd->lex != nullptr &&
3236       this->m_thd->lex->binlog_stmt_arg.length != 0 &&
3237       this->m_thd->lex->binlog_stmt_arg.length < 2048 &&
3238       this->m_thd->lex->binlog_stmt_arg.str != nullptr) {
3239     Security_context *standing_ctx = this->m_thd->security_context();
3240     if (standing_ctx != nullptr &&
3241         (standing_ctx->check_access(SUPER_ACL) ||
3242          standing_ctx->has_global_grant(STRING_WITH_LEN("BINLOG_ADMIN")).first))
3243       this->m_privilege_checks_none = true;
3244   }
3245   if (this->m_privilege_checks_none) return;
3246 
3247   if (this->m_target->get_privilege_checks_username().length() != 0) {
3248     std::string user = this->m_target->get_privilege_checks_username();
3249     std::string host = this->m_target->get_privilege_checks_hostname();
3250     LEX_CSTRING username{user.c_str(), user.length()};
3251     LEX_CSTRING hostname{host.c_str(), host.length()};
3252 
3253     if (this->m_applier_security_ctx.change_security_context(
3254             const_cast<THD *>(this->m_thd), username, hostname, nullptr,
3255             &this->m_previous)) {
3256       return;
3257     }
3258   }
3259 
3260   if (this->m_previous != nullptr) {
3261     Roles::Role_activation role_activation{
3262         const_cast<THD *>(this->m_thd), this->m_thd->security_context(),
3263         opt_always_activate_granted_roles == 0 ? role_enum::ROLE_DEFAULT
3264                                                : role_enum::ROLE_ALL,
3265         nullptr, true};
3266     if (role_activation.activate()) return;
3267   }
3268 
3269   this->m_current = this->m_thd->security_context();
3270 }
3271 
~Applier_security_context_guard()3272 Applier_security_context_guard::~Applier_security_context_guard() {
3273   if (this->m_privilege_checks_none) return;
3274 
3275   if (this->m_previous != nullptr && this->m_previous != this->m_current)
3276     this->m_applier_security_ctx.restore_security_context(
3277         const_cast<THD *>(this->m_thd), this->m_previous);
3278 }
3279 
skip_priv_checks() const3280 bool Applier_security_context_guard::skip_priv_checks() const {
3281   return this->m_privilege_checks_none;
3282 }
3283 
has_access(std::initializer_list<ulong> extra_privileges) const3284 bool Applier_security_context_guard::has_access(
3285     std::initializer_list<ulong> extra_privileges) const {
3286   if (this->m_privilege_checks_none) return true;
3287   if (this->m_current == nullptr) return false;
3288 
3289   for (auto privilege : extra_privileges)
3290     if (!this->m_current->check_access(privilege, "", true)) return false;
3291 
3292   return true;
3293 }
3294 
has_access(std::initializer_list<std::string> extra_privileges) const3295 bool Applier_security_context_guard::has_access(
3296     std::initializer_list<std::string> extra_privileges) const {
3297   if (this->m_privilege_checks_none) return true;
3298   if (this->m_current == nullptr) return false;
3299 
3300   for (auto privilege : extra_privileges) {
3301     if (!this->m_current->has_global_grant(privilege.data(), privilege.length())
3302              .first)
3303       return false;
3304   }
3305   return true;
3306 }
3307 
has_access(std::vector<std::tuple<ulong,TABLE const *,Rows_log_event * >> & extra_privileges) const3308 bool Applier_security_context_guard::has_access(
3309     std::vector<std::tuple<ulong, TABLE const *, Rows_log_event *>>
3310         &extra_privileges) const {
3311   if (this->m_privilege_checks_none) return true;
3312   if (this->m_current == nullptr) return false;
3313 
3314   ulong priv{0};
3315   TABLE const *table{nullptr};
3316 
3317   if (this->m_thd->variables.binlog_row_image == BINLOG_ROW_IMAGE_FULL) {
3318     for (auto tpl : extra_privileges) {
3319       std::tie(priv, table, std::ignore) = tpl;
3320       if (this->m_current->is_table_blocked(priv, table)) return false;
3321     }
3322   } else {
3323     Rows_log_event *event{nullptr};
3324 
3325     for (auto tpl : extra_privileges) {
3326       std::tie(priv, table, event) = tpl;
3327 
3328       if (event->get_general_type_code() == binary_log::DELETE_ROWS_EVENT) {
3329         if (this->m_current->is_table_blocked(priv, table)) return false;
3330       } else {
3331         std::vector<std::string> columns;
3332         this->extract_columns_to_check(table, event, columns);
3333         if (!this->m_current->has_column_access(priv, table, columns))
3334           return false;
3335       }
3336     }
3337   }
3338 
3339   return true;
3340 }
3341 
get_username() const3342 std::string Applier_security_context_guard::get_username() const {
3343   if (this->m_privilege_checks_none || this->m_current == nullptr) return "";
3344   return std::string(this->m_current->user().str,
3345                      this->m_current->user().length);
3346 }
3347 
get_hostname() const3348 std::string Applier_security_context_guard::get_hostname() const {
3349   if (this->m_privilege_checks_none || this->m_current == nullptr) return "";
3350   return std::string(this->m_current->host().str,
3351                      this->m_current->host().length);
3352 }
3353 
extract_columns_to_check(TABLE const * table,Rows_log_event * event,std::vector<std::string> & columns) const3354 void Applier_security_context_guard::extract_columns_to_check(
3355     TABLE const *table, Rows_log_event *event,
3356     std::vector<std::string> &columns) const {
3357   MY_BITMAP const *bitmap{nullptr};
3358 
3359   if (event->get_general_type_code() == binary_log::WRITE_ROWS_EVENT)
3360     bitmap = event->get_cols();
3361   else if (event->get_general_type_code() == binary_log::UPDATE_ROWS_EVENT)
3362     bitmap = event->get_cols_ai();
3363   else
3364     return;
3365 
3366   size_t max = std::min(bitmap->n_bits, table->s->fields);
3367   for (size_t idx = 0; idx != max; ++idx) {
3368     if (bitmap_is_set(bitmap, idx)) {
3369       columns.push_back(table->field[idx]->field_name);
3370     }
3371   }
3372 }
3373