1 /* Copyright (c) 2006, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "rpl_rli.h"
24 
25 #include "my_dir.h"                // MY_STAT
26 #include "log.h"                   // sql_print_error
27 #include "log_event.h"             // Log_event
28 #include "m_string.h"
29 #include "rpl_group_replication.h" // set_group_replication_retrieved_certifi...
30 #include "rpl_info_factory.h"      // Rpl_info_factory
31 #include "rpl_mi.h"                // Master_info
32 #include "rpl_msr.h"               // channel_map
33 #include "rpl_rli_pdb.h"           // Slave_worker
34 #include "sql_base.h"              // close_thread_tables
35 #include "strfunc.h"               // strconvert
36 #include "transaction.h"           // trans_commit_stmt
37 #include "debug_sync.h"
38 #include "pfs_file_provider.h"
39 #include "mysql/psi/mysql_file.h"
40 #include "mutex_lock.h"            // Mutex_lock
41 
42 #include <algorithm>
43 using std::min;
44 using std::max;
45 
/*
  Whenever a new field is added to the relay log info, update the
  list that follows. For now, the list is only used to obtain the
  number of fields.
*/
/* Field names of the relay log info, one string per field, in order. */
const char *info_rli_fields[] = {
  "number_of_lines",
  "group_relay_log_name", "group_relay_log_pos",
  "group_master_log_name", "group_master_log_pos",
  "sql_delay",
  "number_of_workers",
  "id",
  "channel_name"
};
63 
/**
  Constructs a Relay_log_info with every member in a known initial state.

  @param is_slave_recovery  stored as is_relay_log_recovery.
  @param param_key_info_*   PSI instrumentation keys, only present when
                            HAVE_PSI_INTERFACE is defined; forwarded
                            unchanged to the Rpl_info base constructor.
  @param param_id           forwarded to the Rpl_info base constructor.
  @param param_channel      forwarded to the Rpl_info base constructor.
  @param is_rli_fake        stored as rli_fake. When true, none of the
                            mutexes/condition variables owned by this class
                            are initialized and relay_log's pthread objects
                            are not set up.
*/
Relay_log_info::Relay_log_info(bool is_slave_recovery
#ifdef HAVE_PSI_INTERFACE
                               ,PSI_mutex_key *param_key_info_run_lock,
                               PSI_mutex_key *param_key_info_data_lock,
                               PSI_mutex_key *param_key_info_sleep_lock,
                               PSI_mutex_key *param_key_info_thd_lock,
                               PSI_mutex_key *param_key_info_data_cond,
                               PSI_mutex_key *param_key_info_start_cond,
                               PSI_mutex_key *param_key_info_stop_cond,
                               PSI_mutex_key *param_key_info_sleep_cond
#endif
                               , uint param_id, const char *param_channel,
                               bool is_rli_fake
                              )
   :Rpl_info("SQL"
#ifdef HAVE_PSI_INTERFACE
             ,param_key_info_run_lock, param_key_info_data_lock,
             param_key_info_sleep_lock, param_key_info_thd_lock,
             param_key_info_data_cond, param_key_info_start_cond,
             param_key_info_stop_cond, param_key_info_sleep_cond
#endif
             , param_id, param_channel
            ),
   /* Snapshot of the global option of the same name. */
   replicate_same_server_id(::replicate_same_server_id),
   /* cur_log_fd == -1 means "no relay log file opened by this object". */
   cur_log_fd(-1), relay_log(&sync_relaylog_period, SEQ_READ_APPEND),
   is_relay_log_recovery(is_slave_recovery),
   save_temporary_tables(0),
   cur_log_old_open_count(0), error_on_rli_init_info(false),
   group_relay_log_pos(0), event_relay_log_number(0),
   event_relay_log_pos(0), event_start_pos(0),
   group_master_log_pos(0),
   gtid_set(global_sid_map, global_sid_lock),
   rli_fake(is_rli_fake),
   gtid_retrieved_initialized(false),
   is_group_master_log_pos_invalid(false),
   log_space_total(0), ignore_log_space_limit(0),
   sql_force_rotate_relay(false),
   last_master_timestamp(0), slave_skip_counter(0),
   abort_pos_wait(0), until_condition(UNTIL_NONE),
   until_log_pos(0),
   until_sql_gtids(global_sid_map),
   until_sql_gtids_first_event(true),
   trans_retries(0), retried_trans(0),
   tables_to_lock(0), tables_to_lock_count(0),
   rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
   workers(PSI_NOT_INSTRUMENTED),
   workers_array_initialized(false),
   curr_group_assigned_parts(PSI_NOT_INSTRUMENTED),
   curr_group_da(PSI_NOT_INSTRUMENTED),
   slave_parallel_workers(0),
   exit_counter(0),
   max_updated_index(0),
   recovery_parallel_workers(0), checkpoint_seqno(0),
   /* Snapshot of the global option opt_mts_checkpoint_group. */
   checkpoint_group(opt_mts_checkpoint_group),
   recovery_groups_inited(false), mts_recovery_group_cnt(0),
   mts_recovery_index(0), mts_recovery_group_seen_begin(0),
   mts_group_status(MTS_NOT_IN_GROUP),
   stats_exec_time(0), stats_read_time(0),
   least_occupied_workers(PSI_NOT_INSTRUMENTED),
   current_mts_submode(0),
   reported_unsafe_warning(false), rli_description_event(NULL),
   commit_order_mngr(NULL),
   sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
   long_find_row_note_printed(false),
   thd_tx_priority(0),
   m_ignore_write_set_memory_limit(false),
   m_allow_drop_write_set(false),
   is_engine_ha_data_detached(false)
{
  DBUG_ENTER("Relay_log_info::Relay_log_info");

#ifdef HAVE_PSI_INTERFACE
  /* Hand the relay-log specific instrumentation keys to relay_log. */
  relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
                         key_RELAYLOG_LOCK_commit,
                         key_RELAYLOG_LOCK_commit_queue,
                         key_RELAYLOG_LOCK_done,
                         key_RELAYLOG_LOCK_flush_queue,
                         key_RELAYLOG_LOCK_log,
                         PSI_NOT_INSTRUMENTED, /* Relaylog doesn't support LOCK_binlog_end_pos */
                         key_RELAYLOG_LOCK_sync,
                         key_RELAYLOG_LOCK_sync_queue,
                         key_RELAYLOG_LOCK_xids,
                         key_RELAYLOG_COND_done,
                         key_RELAYLOG_update_cond,
                         key_RELAYLOG_prep_xids_cond,
                         key_file_relaylog,
                         key_file_relaylog_index,
                         key_file_relaylog_cache,
                         key_file_relaylog_index_cache);
#endif

  /* Start with all file-name buffers holding the empty string. */
  group_relay_log_name[0]= event_relay_log_name[0]=
    group_master_log_name[0]= 0;
  until_log_name[0]= ign_master_log_name_end[0]= 0;
  set_timespec_nsec(&last_clock, 0);
  /* Zero the IO_CACHE so it is in a defined state before first use. */
  memset(&cache_buf, 0, sizeof(cache_buf));
  cached_charset_invalidate();
  inited_hash_workers= FALSE;
  channel_open_temp_tables.atomic_set(0);
  /*
    For applier threads, currently_executing_gtid is set to automatic
    when they are not executing any transaction.
  */
  currently_executing_gtid.set_automatic();

  /*
    Synchronization objects exist only on a non-fake instance; the
    destructor applies the same rli_fake test before destroying them.
  */
  if (!rli_fake)
  {
    mysql_mutex_init(key_relay_log_info_log_space_lock,
                     &log_space_lock, MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond);
    mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond);
    mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
                   MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_temp_table_LOCK, &mts_temp_table_LOCK,
                     MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_gaq_LOCK, &mts_gaq_LOCK,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_mts_gaq, &logical_clock_cond);

    relay_log.init_pthread_objects();
    /*
      NOTE(review): this flag is assigned only on the non-fake path, so a
      fake instance leaves it unset here -- confirm that no code reads it
      on a fake Relay_log_info.
    */
    force_flush_postponed_due_to_split_trans= false;
  }
  /* Parse this server's own version string into slave_version_split. */
  do_server_version_split(::server_version, slave_version_split);

  DBUG_VOID_RETURN;
}
192 
193 /**
194    The method to invoke at slave threads start
195 */
init_workers(ulong n_workers)196 void Relay_log_info::init_workers(ulong n_workers)
197 {
198   /*
199     Parallel slave parameters initialization is done regardless
200     whether the feature is or going to be active or not.
201   */
202   mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
203   mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
204   mts_total_wait_overlap= 0;
205   mts_total_wait_worker_avail= 0;
206   mts_last_online_stat= 0;
207 
208   workers.reserve(n_workers);
209   workers_array_initialized= true; //set after init
210 }
211 
212 /**
213    The method to invoke at slave threads stop
214 */
void Relay_log_info::deinit_workers()
{
  /*
    Empties the workers array. Whether the Slave_worker objects that the
    stored pointers refer to are released elsewhere is not visible from
    here -- NOTE(review): confirm against the callers.
  */
  workers.clear();
}
219 
~Relay_log_info()220 Relay_log_info::~Relay_log_info()
221 {
222   DBUG_ENTER("Relay_log_info::~Relay_log_info");
223 
224   if(!rli_fake)
225   {
226     if (recovery_groups_inited)
227       bitmap_free(&recovery_groups);
228     delete current_mts_submode;
229 
230     if(workers_copy_pfs.size())
231     {
232       for (int i= static_cast<int>(workers_copy_pfs.size()) - 1; i >= 0; i--)
233         delete workers_copy_pfs[i];
234       workers_copy_pfs.clear();
235     }
236 
237     mysql_mutex_destroy(&log_space_lock);
238     mysql_cond_destroy(&log_space_cond);
239     mysql_mutex_destroy(&pending_jobs_lock);
240     mysql_cond_destroy(&pending_jobs_cond);
241     mysql_mutex_destroy(&exit_count_lock);
242     mysql_mutex_destroy(&mts_temp_table_LOCK);
243     mysql_mutex_destroy(&mts_gaq_LOCK);
244     mysql_cond_destroy(&logical_clock_cond);
245     relay_log.cleanup();
246   }
247 
248   set_rli_description_event(NULL);
249 
250   DBUG_VOID_RETURN;
251 }
252 
/**
   Method is called when the MTS coordinator senses that the relay-log
   name has been changed.
   It marks each Worker member with this fact, so that an action is taken
   at the time a terminal event of a group is distributed to the Worker.

   Worker receives the new name at the group committing phase
   @c Slave_worker::slave_worker_ends_group().
*/
reset_notified_relay_log_change()262 void Relay_log_info::reset_notified_relay_log_change()
263 {
264   if (!is_parallel_exec())
265     return;
266   for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
267   {
268     Slave_worker *w= *it;
269     w->relay_log_change_notified= FALSE;
270   }
271 }
272 
273 /**
274    This method is called in mts_checkpoint_routine() to mark that each
275    worker is required to adapt to a new checkpoint data whose coordinates
276    are passed to it through GAQ index.
277 
278    Worker notices the new checkpoint value at the group commit to reset
279    the current bitmap and starts using the clean bitmap indexed from zero
280    of being reset checkpoint_seqno.
281 
282     New seconds_behind_master timestamp is installed.
283 
284    @param shift            number of bits to shift by Worker due to the
285                            current checkpoint change.
286    @param new_ts           new seconds_behind_master timestamp value
287                            unless zero. Zero could be due to FD event
288                            or fake rotate event.
289    @param need_data_lock   False if caller has locked @c data_lock
290    @param update_timestamp if true, this function will update the
291                            rli->last_master_timestamp.
292 */
reset_notified_checkpoint(ulong shift,time_t new_ts,bool need_data_lock,bool update_timestamp)293 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
294                                                bool need_data_lock,
295                                                bool update_timestamp)
296 {
297   /*
298     If this is not a parallel execution we return immediately.
299   */
300   if (!is_parallel_exec())
301     return;
302 
303   for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
304   {
305     Slave_worker *w= *it;
306     /*
307       Reseting the notification information in order to force workers to
308       assign jobs with the new updated information.
309       Notice that the bitmap_shifted is accumulated to indicate how many
310       consecutive jobs were successfully processed.
311 
312       The worker when assigning a new job will set the value back to
313       zero.
314     */
315     w->checkpoint_notified= FALSE;
316     w->bitmap_shifted= w->bitmap_shifted + shift;
317     /*
318       Zero shift indicates the caller rotates the master binlog.
319       The new name will be passed to W through the group descriptor
320       during the first post-rotation time scheduling.
321     */
322     if (shift == 0)
323       w->master_log_change_notified= false;
324 
325     DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
326                        "worker->bitmap_shifted --> %lu, worker --> %u.",
327                        shift, w->bitmap_shifted,
328                        static_cast<unsigned>(it - workers.begin())));
329   }
330   /*
331     There should not be a call where (shift == 0 && checkpoint_seqno != 0).
332     Then the new checkpoint sequence is updated by subtracting the number
333     of consecutive jobs that were successfully processed.
334   */
335   assert(current_mts_submode->get_type() != MTS_PARALLEL_TYPE_DB_NAME ||
336          !(shift == 0 && checkpoint_seqno != 0));
337   checkpoint_seqno= checkpoint_seqno - shift;
338   DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
339              "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
340 
341   if (update_timestamp)
342   {
343     if (need_data_lock)
344       mysql_mutex_lock(&data_lock);
345     else
346       mysql_mutex_assert_owner(&data_lock);
347     last_master_timestamp= new_ts;
348     if (need_data_lock)
349       mysql_mutex_unlock(&data_lock);
350   }
351 }
352 
/**
   Reset recovery info from Worker info table and
   mark MTS recovery is completed.

   @return false on success, true when resetting the recovery info of a
           Worker, creating a Worker object for a stale record, or
           removing a stale Worker record failed.
*/
mts_finalize_recovery()359 bool Relay_log_info::mts_finalize_recovery()
360 {
361   bool ret= false;
362   uint i;
363   uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
364 
365   DBUG_ENTER("Relay_log_info::mts_finalize_recovery");
366 
367   for (Slave_worker **it= workers.begin(); !ret && it != workers.end(); ++it)
368   {
369     Slave_worker *w= *it;
370     ret= w->reset_recovery_info();
371     DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret= true;);
372   }
373   /*
374     The loop is traversed in the worker index descending order due
375     to specifics of the Worker table repository that does not like
376     even temporary holes. Therefore stale records are deleted
377     from the tail.
378   */
379   DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
380                   {DBUG_SET("+d,mts_worker_thread_init_fails");});
381   for (i= recovery_parallel_workers; i > workers.size() && !ret; i--)
382   {
383     Slave_worker *w=
384       Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
385     /*
386       If an error occurs during the above create_worker call, the newly created
387       worker object gets deleted within the above function call itself and only
388       NULL is returned. Hence the following check has been added to verify
389       that a valid worker object exists.
390     */
391     if (w)
392     {
393       ret= w->remove_info();
394       delete w;
395     }
396     else
397     {
398       ret= true;
399       goto err;
400     }
401   }
402   recovery_parallel_workers= slave_parallel_workers;
403 
404 err:
405   DBUG_RETURN(ret);
406 }
407 
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)408 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
409 {
410   MY_STAT s;
411   DBUG_ENTER("add_relay_log");
412   mysql_mutex_assert_owner(&rli->log_space_lock);
413   if (!mysql_file_stat(key_file_relaylog,
414                        linfo->log_file_name, &s, MYF(0)))
415   {
416     sql_print_error("log %s listed in the index, but failed to stat.",
417                     linfo->log_file_name);
418     DBUG_RETURN(1);
419   }
420   rli->log_space_total += s.st_size;
421 #ifndef NDEBUG
422   char buf[22];
423   DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
424 #endif
425   DBUG_RETURN(0);
426 }
427 
count_relay_log_space()428 int Relay_log_info::count_relay_log_space()
429 {
430   LOG_INFO flinfo;
431   DBUG_ENTER("Relay_log_info::count_relay_log_space");
432   Mutex_lock lock(&log_space_lock);
433   log_space_total= 0;
434   if (relay_log.find_log_pos(&flinfo, NullS, 1))
435   {
436     sql_print_error("Could not find first log while counting relay log space.");
437     DBUG_RETURN(1);
438   }
439   do
440   {
441     if (add_relay_log(this, &flinfo))
442       DBUG_RETURN(1);
443   } while (!relay_log.find_next_log(&flinfo, 1));
444   /*
445      As we have counted everything, including what may have written in a
446      preceding write, we must reset bytes_written, or we may count some space
447      twice.
448   */
449   relay_log.reset_bytes_written();
450   DBUG_RETURN(0);
451 }
452 
453 /**
454    Resets UNTIL condition for Relay_log_info
455  */
456 
clear_until_condition()457 void Relay_log_info::clear_until_condition()
458 {
459   DBUG_ENTER("clear_until_condition");
460 
461   until_condition= Relay_log_info::UNTIL_NONE;
462   until_log_name[0]= 0;
463   until_log_pos= 0;
464   until_sql_gtids.clear();
465   until_sql_gtids_first_event= true;
466   DBUG_VOID_RETURN;
467 }
468 
/**
  Opens and initializes the given relay log. Specifically, it does what
  follows:

  - Closes old open relay log files.
  - If we are using the same relay log as the running IO-thread, then sets
    rli->cur_log to point to the same IO_CACHE entry.
  - If not, opens the 'log' binary file.

  @todo check proper initialization of
  group_master_log_name/group_master_log_pos. /alfranio

  @param rli[in] Relay information (will be initialized)
  @param log[in] Name of relay log file to read from. NULL = First log
  @param pos[in] Position in relay log file
  @param need_data_lock[in] If true, this function will acquire
  data_lock; otherwise the caller should already have
  acquired it.
  @param errmsg[out] On error, this function will store a pointer to
  an error message here
  @param keep_looking_for_fd[in] If true, this function will
  look for a Format_description_log_event.  We only need this when the
  SQL thread starts and opens an existing relay log and has to execute
  it (possibly from an offset >4); then we need to read the first
  event of the relay log to be able to parse the events we have to
  execute.

  @retval 0 ok,
  @retval 1 error.  In this case, *errmsg is set to point to the error
  message.
*/
499 
int Relay_log_info::init_relay_log_pos(const char* log,
                                       ulonglong pos, bool need_data_lock,
                                       const char** errmsg,
                                       bool keep_looking_for_fd)
{
  DBUG_ENTER("Relay_log_info::init_relay_log_pos");
  DBUG_PRINT("info", ("pos: %lu", (ulong) pos));

  /* No error so far; the return value is derived from *errmsg at the end. */
  *errmsg=0;
  const char* errmsg_fmt= 0;
  /*
    Static because *errmsg may be left pointing into this buffer after the
    function returns.
    NOTE(review): being static, the buffer is shared by every caller of
    this function -- confirm that concurrent callers cannot overwrite a
    message that is still in use.
  */
  static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
  mysql_mutex_t *log_lock= relay_log.get_log_lock();

  if (need_data_lock)
    mysql_mutex_lock(&data_lock);
  else
    mysql_mutex_assert_owner(&data_lock);

  /*
    By default the relay log is in binlog format 3 (4.0).
    Even if format is 4, this will work enough to read the first event
    (Format_desc) (remember that format 4 is just lengthened compared to
    format 3; format 3 is a prefix of format 4).
  */
  set_rli_description_event(new Format_description_log_event(3));

  /* Lock order in this function: data_lock first, then the log lock. */
  mysql_mutex_lock(log_lock);

  /* Close log file and free buffers if it's already open */
  if (cur_log_fd >= 0)
  {
    end_io_cache(&cache_buf);
    mysql_file_close(cur_log_fd, MYF(MY_WME));
    cur_log_fd = -1;
  }

  group_relay_log_pos= event_relay_log_pos= pos;

  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
  if (relay_log.find_log_pos(&linfo, NullS, 1))
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }

  /*
    With an explicit log name, position linfo on that log; with log == NULL
    linfo keeps referring to the first log found above.
  */
  if (log && relay_log.find_log_pos(&linfo, log, 1))
  {
    errmsg_fmt= "Could not find target log file mentioned in "
                "relay log info in the index file '%s' during "
                "relay log initialization";
    sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
    *errmsg= errmsg_buff;
    goto err;
  }

  /* Group and event coordinates both start at the selected log file. */
  set_group_relay_log_name(linfo.log_file_name);
  set_event_relay_log_name(linfo.log_file_name);

  if (relay_log.is_active(linfo.log_file_name))
  {
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
    my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
    if (check_binlog_magic(cur_log, errmsg))
      goto err;
    cur_log_old_open_count=relay_log.get_open_count();
  }
  else
  {
    /*
      Open the relay log and set cur_log to point at this one
    */
    if ((cur_log_fd=open_binlog_file(&cache_buf,
                                     linfo.log_file_name,errmsg)) < 0)
      goto err;
    cur_log = &cache_buf;
  }
  /*
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
    sure.
  */
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
  {
    Log_event* ev;
    while (keep_looking_for_fd)
    {
      /*
        Read the possible Format_description_log_event; if position
        was 4, no need, it will be read naturally.
      */
      DBUG_PRINT("info",("looking for a Format_description_log_event"));

      /* Never read past the requested start position. */
      if (my_b_tell(cur_log) >= pos)
        break;

      /*
        Because we hold both data_lock and log_lock, we can safely read an
        event
      */
      if (!(ev= Log_event::read_log_event(cur_log, 0,
                                          rli_description_event,
                                          opt_slave_sql_verify_checksum)))
      {
        DBUG_PRINT("info",("could not read event, cur_log->error=%d",
                           cur_log->error));
        if (cur_log->error) /* not EOF */
        {
          *errmsg= "I/O error reading event at position 4";
          goto err;
        }
        break;
      }
      else if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT)
      {
        DBUG_PRINT("info",("found Format_description_log_event"));
        /*
          ev is handed over here and is deliberately not deleted in this
          branch, unlike events of other types below.
        */
        set_rli_description_event((Format_description_log_event *)ev);
        /*
          As ev was returned by read_log_event, it has passed is_valid(), so
          my_malloc() in ctor worked, no need to check again.
        */
        /*
          Ok, we found a Format_description event. But it is not sure that this
          describes the whole relay log; indeed, one can have this sequence
          (starting from position 4):
          Format_desc (of slave)
          Previous-GTIDs (of slave IO thread, if GTIDs are enabled)
          Rotate (of master)
          Format_desc (of master)
          So the Format_desc which really describes the rest of the relay log
          can be the 3rd or the 4th event (depending on GTIDs being enabled or
          not, it can't be further than that, because we rotate
          the relay log when we queue a Rotate event from the master).
          But what describes the Rotate is the first Format_desc.
          So what we do is:
          go on searching for Format_description events, until you exceed the
          position (argument 'pos') or until you find an event other than
          Previous-GTIDs, Rotate or Format_desc.
        */
      }
      else
      {
        DBUG_PRINT("info",("found event of another type=%d",
                           ev->get_type_code()));
        /* Only Rotate and Previous-GTIDs events keep the search going. */
        keep_looking_for_fd=
          (ev->get_type_code() == binary_log::ROTATE_EVENT ||
           ev->get_type_code() == binary_log::PREVIOUS_GTIDS_LOG_EVENT);
        delete ev;
      }
    }
    /* Whatever was read above, continue from the requested position. */
    my_b_seek(cur_log,(off_t)pos);
#ifndef NDEBUG
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
                        llstr(my_b_tell(cur_log),llbuf1),
                        llstr(get_event_relay_log_pos(),llbuf2)));
  }
#endif

  }

  /* Common exit path: reached on success as well as on error. */
err:
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
  if (!relay_log_purge)
  {
    log_space_limit= 0; // todo: consider to throw a warning at least
  }
  mysql_cond_broadcast(&data_cond);

  mysql_mutex_unlock(log_lock);

  /* data_lock is released only if it was acquired by this function. */
  if (need_data_lock)
    mysql_mutex_unlock(&data_lock);
  /* An invalid description event is an error, unless one is already set. */
  if (!rli_description_event->is_valid() && !*errmsg)
    *errmsg= "Invalid Format_description log event; could be out of memory";

  DBUG_RETURN ((*errmsg) ? 1 : 0);
}
687 
688 /**
689   Update the error number, message and timestamp fields. This function is
690   different from va_report() as va_report() also logs the error message in the
691   log apart from updating the error fields.
692 
693   SYNOPSIS
694   @param[in]  level          specifies the level- error, warning or information,
695   @param[in]  err_code       error number,
696   @param[in]  buff_coord     error message to be used.
697 
698 */
fill_coord_err_buf(loglevel level,int err_code,const char * buff_coord) const699 void Relay_log_info::fill_coord_err_buf(loglevel level, int err_code,
700                                       const char *buff_coord) const
701 {
702   mysql_mutex_lock(&err_lock);
703 
704   if(level == ERROR_LEVEL)
705   {
706     m_last_error.number = err_code;
707     my_snprintf(m_last_error.message, sizeof(m_last_error.message), "%.*s",
708                 MAX_SLAVE_ERRMSG - 1, buff_coord);
709     m_last_error.update_timestamp();
710   }
711 
712   mysql_mutex_unlock(&err_lock);
713 }
714 
715 /**
716   Waits until the SQL thread reaches (has executed up to) the
717   log/position or timed out.
718 
719   SYNOPSIS
720   @param[in]  thd             client thread that sent @c SELECT @c MASTER_POS_WAIT,
721   @param[in]  log_name        log name to wait for,
722   @param[in]  log_pos         position to wait for,
723   @param[in]  timeout         @c timeout in seconds before giving up waiting.
724                               @c timeout is double whereas it should be ulong; but this is
725                               to catch if the user submitted a negative timeout.
726 
727   @retval  -2   improper arguments (log_pos<0)
728                 or slave not running, or master info changed
729                 during the function's execution,
730                 or client thread killed. -2 is translated to NULL by caller,
731   @retval  -1   timed out
732   @retval  >=0  number of log events the function had to wait
733                 before reaching the desired log/position
734  */
735 
wait_for_pos(THD * thd,String * log_name,longlong log_pos,double timeout)736 int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
737                                     longlong log_pos,
738                                     double timeout)
739 {
740   int event_count = 0;
741   ulong init_abort_pos_wait;
742   int error=0;
743   struct timespec abstime; // for timeout checking
744   PSI_stage_info old_stage;
745   DBUG_ENTER("Relay_log_info::wait_for_pos");
746 
747   if (!inited)
748     DBUG_RETURN(-2);
749 
750   DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
751                       log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));
752 
753   DEBUG_SYNC(thd, "begin_master_pos_wait");
754 
755   set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
756   mysql_mutex_lock(&data_lock);
757   thd->ENTER_COND(&data_cond, &data_lock,
758                   &stage_waiting_for_the_slave_thread_to_advance_position,
759                   &old_stage);
760   /*
761      This function will abort when it notices that some CHANGE MASTER or
762      RESET MASTER has changed the master info.
763      To catch this, these commands modify abort_pos_wait ; We just monitor
764      abort_pos_wait and see if it has changed.
765      Why do we have this mechanism instead of simply monitoring slave_running
766      in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
767      the SQL thread be stopped?
768      This is becasue if someones does:
769      STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
770      the change may happen very quickly and we may not notice that
771      slave_running briefly switches between 1/0/1.
772   */
773   init_abort_pos_wait= abort_pos_wait;
774 
775   /*
776     We'll need to
777     handle all possible log names comparisons (e.g. 999 vs 1000).
778     We use ulong for string->number conversion ; this is no
779     stronger limitation than in find_uniq_filename in sql/log.cc
780   */
781   ulong log_name_extension;
782   char log_name_tmp[FN_REFLEN]; //make a char[] from String
783 
784   strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));
785 
786   char *p= fn_ext(log_name_tmp);
787   char *p_end;
788   if (!*p || log_pos<0)
789   {
790     error= -2; //means improper arguments
791     goto err;
792   }
793   // Convert 0-3 to 4
794   log_pos= max(log_pos, static_cast<longlong>(BIN_LOG_HEADER_SIZE));
795   /* p points to '.' */
796   log_name_extension= strtoul(++p, &p_end, 10);
797   /*
798     p_end points to the first invalid character.
799     If it equals to p, no digits were found, error.
800     If it contains '\0' it means conversion went ok.
801   */
802   if (p_end==p || *p_end)
803   {
804     error= -2;
805     goto err;
806   }
807 
808   /* The "compare and wait" main loop */
809   while (!thd->killed &&
810          init_abort_pos_wait == abort_pos_wait &&
811          slave_running)
812   {
813     bool pos_reached;
814     int cmp_result= 0;
815 
816     DBUG_PRINT("info",
817                ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
818                 init_abort_pos_wait, abort_pos_wait));
819     DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
820                        group_master_log_name, (ulong) group_master_log_pos));
821 
822     /*
823       group_master_log_name can be "", if we are just after a fresh
824       replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
825       (before we have executed one Rotate event from the master) or
826       (rare) if the user is doing a weird slave setup (see next
827       paragraph).  If group_master_log_name is "", we assume we don't
828       have enough info to do the comparison yet, so we just wait until
829       more data. In this case master_log_pos is always 0 except if
830       somebody (wrongly) sets this slave to be a slave of itself
831       without using --replicate-same-server-id (an unsupported
832       configuration which does nothing), then group_master_log_pos
833       will grow and group_master_log_name will stay "".
834       Also in case the group master log position is invalid (e.g. after
835       CHANGE MASTER TO RELAY_LOG_POS ), we will wait till the first event
836       is read and the log position is valid again.
837     */
838     if (*group_master_log_name && !is_group_master_log_pos_invalid)
839     {
840       char *basename= (group_master_log_name +
841                        dirname_length(group_master_log_name));
842       /*
843         First compare the parts before the extension.
844         Find the dot in the master's log basename,
845         and protect against user's input error :
846         if the names do not match up to '.' included, return error
847       */
848       char *q= (fn_ext(basename)+1);
849       if (strncmp(basename, log_name_tmp, (int)(q-basename)))
850       {
851         error= -2;
852         break;
853       }
854       // Now compare extensions.
855       char *q_end;
856       ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
857       if (group_master_log_name_extension < log_name_extension)
858         cmp_result= -1 ;
859       else
860         cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
861 
862       pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
863                     cmp_result > 0);
864       if (pos_reached || thd->killed)
865         break;
866     }
867 
868     //wait for master update, with optional timeout.
869 
870     DBUG_PRINT("info",("Waiting for master update"));
871     /*
872       We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
873       will wake us up.
874     */
875     thd_wait_begin(thd, THD_WAIT_BINLOG);
876     if (timeout > 0)
877     {
878       /*
879         Note that mysql_cond_timedwait checks for the timeout
880         before for the condition ; i.e. it returns ETIMEDOUT
881         if the system time equals or exceeds the time specified by abstime
882         before the condition variable is signaled or broadcast, _or_ if
883         the absolute time specified by abstime has already passed at the time
884         of the call.
885         For that reason, mysql_cond_timedwait will do the "timeoutting" job
886         even if its condition is always immediately signaled (case of a loaded
887         master).
888       */
889       error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
890     }
891     else
892       mysql_cond_wait(&data_cond, &data_lock);
893     thd_wait_end(thd);
894     DBUG_PRINT("info",("Got signal of master update or timed out"));
895     if (error == ETIMEDOUT || error == ETIME)
896     {
897 #ifndef NDEBUG
898       /*
899         Doing this to generate a stack trace and make debugging
900         easier.
901       */
902       if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
903         assert(0);
904 #endif
905       error= -1;
906       break;
907     }
908     error=0;
909     event_count++;
910     DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
911   }
912 
913 err:
914   mysql_mutex_unlock(&data_lock);
915   thd->EXIT_COND(&old_stage);
916   DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
917 improper_arguments: %d  timed_out: %d",
918                      thd->killed_errno(),
919                      (int) (init_abort_pos_wait != abort_pos_wait),
920                      (int) slave_running,
921                      (int) (error == -2),
922                      (int) (error == -1)));
923   if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
924       !slave_running)
925   {
926     error= -2;
927   }
928   DBUG_RETURN( error ? error : event_count );
929 }
930 
wait_for_gtid_set(THD * thd,String * gtid,double timeout)931 int Relay_log_info::wait_for_gtid_set(THD* thd, String* gtid,
932                                       double timeout)
933 {
934   DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, String, timeout)");
935 
936   DBUG_PRINT("info", ("Waiting for %s timeout %lf", gtid->c_ptr_safe(),
937              timeout));
938 
939   Gtid_set wait_gtid_set(global_sid_map);
940   global_sid_lock->rdlock();
941 
942   if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
943   {
944     global_sid_lock->unlock();
945     DBUG_PRINT("exit",("improper gtid argument"));
946     DBUG_RETURN(-2);
947 
948   }
949   global_sid_lock->unlock();
950 
951   DBUG_RETURN(wait_for_gtid_set(thd, &wait_gtid_set, timeout));
952 }
953 
/*
  TODO: This is duplicated code that needs to be simplified.
  This will be done while developing all possible sync options.
  See WL#3584's specification.

  /Alfranio
*/
wait_for_gtid_set(THD * thd,const Gtid_set * wait_gtid_set,double timeout)961 int Relay_log_info::wait_for_gtid_set(THD* thd, const Gtid_set* wait_gtid_set,
962                                       double timeout)
963 {
964   int event_count = 0;
965   ulong init_abort_pos_wait;
966   int error=0;
967   struct timespec abstime; // for timeout checking
968   PSI_stage_info old_stage;
969   DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, gtid_set, timeout)");
970 
971   if (!inited)
972     DBUG_RETURN(-2);
973 
974   DEBUG_SYNC(thd, "begin_wait_for_gtid_set");
975 
976   set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
977 
978   mysql_mutex_lock(&data_lock);
979   thd->ENTER_COND(&data_cond, &data_lock,
980                   &stage_waiting_for_the_slave_thread_to_advance_position,
981                   &old_stage);
982   /*
983      This function will abort when it notices that some CHANGE MASTER or
984      RESET MASTER has changed the master info.
985      To catch this, these commands modify abort_pos_wait ; We just monitor
986      abort_pos_wait and see if it has changed.
987      Why do we have this mechanism instead of simply monitoring slave_running
988      in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
989      the SQL thread be stopped?
990      This is becasue if someones does:
991      STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
992      the change may happen very quickly and we may not notice that
993      slave_running briefly switches between 1/0/1.
994   */
995   init_abort_pos_wait= abort_pos_wait;
996 
997   /* The "compare and wait" main loop */
998   while (!thd->killed &&
999          init_abort_pos_wait == abort_pos_wait &&
1000          slave_running)
1001   {
1002     DBUG_PRINT("info",
1003                ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
1004                 init_abort_pos_wait, abort_pos_wait));
1005 
1006     //wait for master update, with optional timeout.
1007 
1008     global_sid_lock->wrlock();
1009     const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1010     const Owned_gtids* owned_gtids= gtid_state->get_owned_gtids();
1011 
1012     char *wait_gtid_set_buf;
1013     wait_gtid_set->to_string(&wait_gtid_set_buf);
1014     DBUG_PRINT("info", ("Waiting for '%s'. is_subset: %d and "
1015                         "!is_intersection_nonempty: %d",
1016                         wait_gtid_set_buf,
1017                         wait_gtid_set->is_subset(executed_gtids),
1018                         !owned_gtids->is_intersection_nonempty(wait_gtid_set)));
1019     my_free(wait_gtid_set_buf);
1020     executed_gtids->dbug_print("gtid_executed:");
1021     owned_gtids->dbug_print("owned_gtids:");
1022 
1023     /*
1024       Since commit is performed after log to binary log, we must also
1025       check if any GTID of wait_gtid_set is not yet committed.
1026     */
1027     if (wait_gtid_set->is_subset(executed_gtids) &&
1028         !owned_gtids->is_intersection_nonempty(wait_gtid_set))
1029     {
1030       global_sid_lock->unlock();
1031       break;
1032     }
1033     global_sid_lock->unlock();
1034 
1035     DBUG_PRINT("info",("Waiting for master update"));
1036 
1037     /*
1038       We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
1039       will wake us up.
1040     */
1041     thd_wait_begin(thd, THD_WAIT_BINLOG);
1042     if (timeout > 0)
1043     {
1044       /*
1045         Note that mysql_cond_timedwait checks for the timeout
1046         before for the condition ; i.e. it returns ETIMEDOUT
1047         if the system time equals or exceeds the time specified by abstime
1048         before the condition variable is signaled or broadcast, _or_ if
1049         the absolute time specified by abstime has already passed at the time
1050         of the call.
1051         For that reason, mysql_cond_timedwait will do the "timeoutting" job
1052         even if its condition is always immediately signaled (case of a loaded
1053         master).
1054       */
1055       error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
1056     }
1057     else
1058       mysql_cond_wait(&data_cond, &data_lock);
1059     thd_wait_end(thd);
1060     DBUG_PRINT("info",("Got signal of master update or timed out"));
1061     if (error == ETIMEDOUT || error == ETIME)
1062     {
1063 #ifndef NDEBUG
1064       /*
1065         Doing this to generate a stack trace and make debugging
1066         easier.
1067       */
1068       if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
1069         assert(0);
1070 #endif
1071       error= -1;
1072       break;
1073     }
1074     error=0;
1075     event_count++;
1076     DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
1077   }
1078 
1079   mysql_mutex_unlock(&data_lock);
1080   thd->EXIT_COND(&old_stage);
1081   DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
1082 improper_arguments: %d  timed_out: %d",
1083                      thd->killed_errno(),
1084                      (int) (init_abort_pos_wait != abort_pos_wait),
1085                      (int) slave_running,
1086                      (int) (error == -2),
1087                      (int) (error == -1)));
1088   if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
1089       !slave_running)
1090   {
1091     error= -2;
1092   }
1093   DBUG_RETURN( error ? error : event_count );
1094 }
1095 
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock)1096 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
1097                                             bool need_data_lock)
1098 {
1099   int error= 0;
1100   DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
1101 
1102   if (need_data_lock)
1103     mysql_mutex_lock(&data_lock);
1104   else
1105     mysql_mutex_assert_owner(&data_lock);
1106 
1107   inc_event_relay_log_pos();
1108   group_relay_log_pos= event_relay_log_pos;
1109   strmake(group_relay_log_name,event_relay_log_name,
1110           sizeof(group_relay_log_name)-1);
1111 
1112   notify_group_relay_log_name_update();
1113 
1114   /*
1115     In 4.x we used the event's len to compute the positions here. This is
1116     wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1117     then the event's len is not what is was in the master's binlog, so this
1118     will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1119     replication: Exec_master_log_pos is wrong). Only way to solve this is to
1120     have the original offset of the end of the event the relay log. This is
1121     what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1122     of log_pos in 4.0 was to compute the end_log_pos; so better to store
1123     end_log_pos instead of begin_log_pos.
1124     If we had not done this fix here, the problem would also have appeared
1125     when the slave and master are 5.0 but with different event length (for
1126     example the slave is more recent than the master and features the event
1127     UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1128     SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1129     value which would lead to badly broken replication.
1130     Even the relay_log_pos will be corrupted in this case, because the len is
1131     the relay log is not "val".
1132     With the end_log_pos solution, we avoid computations involving lengthes.
1133   */
1134   DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
1135                       (long) log_pos, (long) group_master_log_pos));
1136 
1137   if (log_pos > 0)  // 3.23 binlogs don't have log_posx
1138     group_master_log_pos= log_pos;
1139   /*
1140     If the master log position was invalidiated by say, "CHANGE MASTER TO
1141     RELAY_LOG_POS=N", it is now valid,
1142    */
1143   if (is_group_master_log_pos_invalid)
1144     is_group_master_log_pos_invalid= false;
1145 
1146   /*
1147     In MTS mode FD or Rotate event commit their solitary group to
1148     Coordinator's info table. Callers make sure that Workers have been
1149     executed all assignements.
1150     Broadcast to master_pos_wait() waiters should be done after
1151     the table is updated.
1152   */
1153   assert(!is_parallel_exec() ||
1154          mts_group_status != Relay_log_info::MTS_IN_GROUP);
1155   /*
1156     We do not force synchronization at this point, note the
1157     parameter false, because a non-transactional change is
1158     being committed.
1159 
1160     For that reason, the synchronization here is subjected to
1161     the option sync_relay_log_info.
1162 
1163     See sql/rpl_rli.h for further information on this behavior.
1164   */
1165   error= flush_info(FALSE);
1166 
1167   mysql_cond_broadcast(&data_cond);
1168   if (need_data_lock)
1169     mysql_mutex_unlock(&data_lock);
1170   DBUG_RETURN(error);
1171 }
1172 
1173 
close_temporary_tables()1174 void Relay_log_info::close_temporary_tables()
1175 {
1176   TABLE *table,*next;
1177   int num_closed_temp_tables= 0;
1178   DBUG_ENTER("Relay_log_info::close_temporary_tables");
1179 
1180   for (table=save_temporary_tables ; table ; table=next)
1181   {
1182     next=table->next;
1183     /*
1184       Don't ask for disk deletion. For now, anyway they will be deleted when
1185       slave restarts, but it is a better intention to not delete them.
1186     */
1187     DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1188     close_temporary(table, 1, 0);
1189     num_closed_temp_tables++;
1190   }
1191   save_temporary_tables= 0;
1192   slave_open_temp_tables.atomic_add(-num_closed_temp_tables);
1193   channel_open_temp_tables.atomic_add(-num_closed_temp_tables);
1194   DBUG_VOID_RETURN;
1195 }
1196 
1197 /**
1198   Purges relay logs. It assumes to have a run lock on rli and that no
1199   slave thread are running.
1200 
1201   @param[in]   THD         connection,
1202   @param[in]   just_reset  if false, it tells that logs should be purged
1203                            and @c init_relay_log_pos() should be called,
1204   @errmsg[out] errmsg      store pointer to an error message.
1205 
1206   @retval 0 successfuly executed,
1207   @retval 1 otherwise error, where errmsg is set to point to the error message.
1208 */
1209 
purge_relay_logs(THD * thd,bool just_reset,const char ** errmsg,bool delete_only)1210 int Relay_log_info::purge_relay_logs(THD *thd, bool just_reset,
1211                                      const char** errmsg, bool delete_only)
1212 {
1213   int error=0;
1214   const char *ln;
1215   /* name of the index file if opt_relaylog_index_name is set*/
1216   const char* log_index_name;
1217   /*
1218     Buffer to add channel name suffix when relay-log-index option is
1219     provided
1220    */
1221   char relay_bin_index_channel[FN_REFLEN];
1222 
1223   const char *ln_without_channel_name;
1224   /*
1225     Buffer to add channel name suffix when relay-log option is provided.
1226    */
1227   char relay_bin_channel[FN_REFLEN];
1228 
1229   char buffer[FN_REFLEN];
1230 
1231   mysql_mutex_t *log_lock= relay_log.get_log_lock();
1232 
1233   DBUG_ENTER("Relay_log_info::purge_relay_logs");
1234 
1235   /*
1236     Even if inited==0, we still try to empty master_log_* variables. Indeed,
1237     inited==0 does not imply that they already are empty.
1238 
1239     It could be that slave's info initialization partly succeeded: for example
1240     if relay-log.info existed but *relay-bin*.* have been manually removed,
1241     init_info reads the old relay-log.info and fills rli->master_log_*, then
1242     init_info checks for the existence of the relay log, this fails and
1243     init_info leaves inited to 0.
1244     In that pathological case, master_log_pos* will be properly reinited at
1245     the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1246     purge_relay_logs, will delete bogus *.info files or replace them with
1247     correct files), however if the user does SHOW SLAVE STATUS before START
1248     SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1249     master_log_* for SHOW SLAVE STATUS to display fine in any case.
1250   */
1251   group_master_log_name[0]= 0;
1252   group_master_log_pos= 0;
1253 
1254   /*
1255     Following the the relay log purge, the master_log_pos will be in sync
1256     with relay_log_pos, so the flag should be cleared. Refer bug#11766010.
1257   */
1258 
1259   is_group_master_log_pos_invalid= false;
1260 
1261   if (!inited)
1262   {
1263     DBUG_PRINT("info", ("inited == 0"));
1264     if (error_on_rli_init_info ||
1265         /*
1266           mi->reset means that the channel was reset but still exists. Channel
1267           shall have the index and the first relay log file.
1268 
1269           Those files shall be remove in a following RESET SLAVE ALL (even when
1270           channel was not inited again).
1271         */
1272         (mi->reset && delete_only))
1273     {
1274       ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
1275                                                        "-relay-bin", buffer);
1276 
1277       ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
1278                                         ln_without_channel_name);
1279       if (opt_relaylog_index_name)
1280       {
1281         char index_file_withoutext[FN_REFLEN];
1282         relay_log.generate_name(opt_relaylog_index_name,"",
1283                                 index_file_withoutext);
1284 
1285         log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
1286                                                       FN_REFLEN,
1287                                                       index_file_withoutext);
1288       }
1289       else
1290         log_index_name= 0;
1291 
1292       if (relay_log.open_index_file(log_index_name, ln, TRUE))
1293       {
1294         sql_print_error("Unable to purge relay log files. Failed to open relay "
1295                         "log index file:%s.", relay_log.get_index_fname());
1296         DBUG_RETURN(1);
1297       }
1298       mysql_mutex_lock(&mi->data_lock);
1299       mysql_mutex_lock(log_lock);
1300       if (relay_log.open_binlog(ln, 0,
1301                                 (max_relay_log_size ? max_relay_log_size :
1302                                  max_binlog_size), true,
1303                                 true/*need_lock_index=true*/,
1304                                 true/*need_sid_lock=true*/,
1305                                 mi->get_mi_description_event()))
1306       {
1307         mysql_mutex_unlock(log_lock);
1308         mysql_mutex_unlock(&mi->data_lock);
1309         sql_print_error("Unable to purge relay log files. Failed to open relay "
1310                         "log file:%s.", relay_log.get_log_fname());
1311         DBUG_RETURN(1);
1312       }
1313       mysql_mutex_unlock(log_lock);
1314       mysql_mutex_unlock(&mi->data_lock);
1315     }
1316     else
1317       DBUG_RETURN(0);
1318   }
1319   else
1320   {
1321     assert(slave_running == 0);
1322     assert(mi->slave_running == 0);
1323   }
1324   /* Reset the transaction boundary parser and clear the last GTID queued */
1325   mi->transaction_parser.reset();
1326   mi->clear_last_gtid_queued();
1327 
1328   slave_skip_counter= 0;
1329   mysql_mutex_lock(&data_lock);
1330 
1331   /*
1332     we close the relay log fd possibly left open by the slave SQL thread,
1333     to be able to delete it; the relay log fd possibly left open by the slave
1334     I/O thread will be closed naturally in reset_logs() by the
1335     close(LOG_CLOSE_TO_BE_OPENED) call
1336   */
1337   if (cur_log_fd >= 0)
1338   {
1339     end_io_cache(&cache_buf);
1340     my_close(cur_log_fd, MYF(MY_WME));
1341     cur_log_fd= -1;
1342   }
1343 
1344   /**
1345     Clear the retrieved gtid set for this channel.
1346     global_sid_lock->wrlock() is needed.
1347   */
1348   global_sid_lock->wrlock();
1349   (const_cast<Gtid_set *>(get_gtid_set()))->clear();
1350   global_sid_lock->unlock();
1351 
1352   if (relay_log.reset_logs(thd, delete_only))
1353   {
1354     *errmsg = "Failed during log reset";
1355     error=1;
1356     goto err;
1357   }
1358 
1359   /* Save name of used relay log file */
1360   set_group_relay_log_name(relay_log.get_log_fname());
1361   set_event_relay_log_name(relay_log.get_log_fname());
1362   group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1363   if (!delete_only && count_relay_log_space())
1364   {
1365     *errmsg= "Error counting relay log space";
1366     error= 1;
1367     goto err;
1368   }
1369   if (!just_reset)
1370     error= init_relay_log_pos(group_relay_log_name,
1371                               group_relay_log_pos,
1372                               false/*need_data_lock=false*/, errmsg, 0);
1373   if (!inited && error_on_rli_init_info)
1374     relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1375                     true/*need_lock_log=true*/,
1376                     true/*need_lock_index=true*/);
1377 err:
1378 #ifndef NDEBUG
1379   char buf[22];
1380 #endif
1381   DBUG_PRINT("info",("log_space_total: %s",llstr(log_space_total,buf)));
1382   mysql_mutex_unlock(&data_lock);
1383   DBUG_RETURN(error);
1384 }
1385 
1386 /*
1387    When --relay-bin option is not provided, the names of the
1388    relay log files are host-relay-bin.0000x or
1389    host-relay-bin-CHANNEL.00000x in the case of MSR.
1390    However, if that option is provided, then the names of the
1391    relay log files are <relay-bin-option>.0000x or
1392    <relay-bin-option>-CHANNEL.00000x in the case of MSR.
1393 
1394    The function adds a channel suffix (according to the channel to file name
1395    conventions and conversions) to the relay log file.
1396 
1397    @todo: truncate the log file if length exceeds.
1398 */
1399 
1400 const char*
add_channel_to_relay_log_name(char * buff,uint buff_size,const char * base_name)1401 Relay_log_info::add_channel_to_relay_log_name(char *buff, uint buff_size,
1402                                               const char *base_name)
1403 {
1404   char *ptr;
1405   char channel_to_file[FN_REFLEN];
1406   uint errors, length;
1407   uint base_name_len;
1408   uint suffix_buff_size;
1409 
1410   assert(base_name !=NULL);
1411 
1412   base_name_len= strlen(base_name);
1413   suffix_buff_size= buff_size - base_name_len;
1414 
1415   ptr= strmake(buff, base_name, buff_size-1);
1416 
1417   if (channel[0])
1418   {
1419 
1420    /* adding a "-" */
1421     ptr= strmake(ptr, "-", suffix_buff_size-1);
1422 
1423     /*
1424       Convert the channel name to the file names charset.
1425       Channel name is in system_charset which is UTF8_general_ci
1426       as it was defined as utf8 in the mysql.slaveinfo tables.
1427     */
1428     length= strconvert(system_charset_info, channel, &my_charset_filename,
1429                        channel_to_file, NAME_LEN, &errors);
1430     ptr= strmake(ptr, channel_to_file, suffix_buff_size-length-1);
1431 
1432   }
1433 
1434   return (const char*)buff;
1435 }
1436 
1437 
1438 /**
1439      Checks if condition stated in UNTIL clause of START SLAVE is reached.
1440 
1441      Specifically, it checks if UNTIL condition is reached. Uses caching result
1442      of last comparison of current log file name and target log file name. So
1443      cached value should be invalidated if current log file name changes (see
1444      @c Relay_log_info::notify_... functions).
1445 
     This caching is needed to avoid expensive string comparisons and
     @c strtoul() conversions needed for log names comparison. We don't need to
     compare them each time this function is called, we only need to do this
     when current log name changes. If we have @c UNTIL_MASTER_POS condition we
     need to do this only after @c Rotate_log_event::do_apply_event() (which is
     rare, so caching gives real benefit), and if we have @c UNTIL_RELAY_POS
     condition then we should invalidate cached comparison value after
     @c inc_group_relay_log_pos() which is called for each group of events (so
     we have some benefit if we have something like queries that use
     autoincrement or if we have transactions).
1456 
1457      Should be called ONLY if @c until_condition @c != @c UNTIL_NONE !
1458 
1459      @param master_beg_pos    position of the beginning of to be executed event
1460                               (not @c log_pos member of the event that points to
1461                               the beginning of the following event)
1462 
1463      @retval true   condition met or error happened (condition seems to have
1464                     bad log file name),
1465      @retval false  condition not met.
1466 */
1467 
is_until_satisfied(THD * thd,Log_event * ev)1468 bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
1469 {
1470   char error_msg[]= "Slave SQL thread is stopped because UNTIL "
1471                     "condition is bad.";
1472   DBUG_ENTER("Relay_log_info::is_until_satisfied");
1473 
1474   switch (until_condition)
1475   {
1476   case UNTIL_MASTER_POS:
1477   case UNTIL_RELAY_POS:
1478   {
1479     const char *log_name= NULL;
1480     ulonglong log_pos= 0;
1481 
1482     if (until_condition == UNTIL_MASTER_POS)
1483     {
1484       if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
1485         DBUG_RETURN(false);
1486       /*
1487         Rotate events originating from the slave have server_id==0,
1488         and their log_pos is relative to the slave, so in case their
1489         log_pos is greater than the log_pos we are waiting for, they
1490         can cause the slave to stop prematurely. So we ignore such
1491         events.
1492       */
1493       if (ev && ev->server_id == 0)
1494         DBUG_RETURN(false);
1495       log_name= group_master_log_name;
1496       if (!ev || is_in_group() || !ev->common_header->log_pos)
1497         log_pos= group_master_log_pos;
1498       else
1499         log_pos= ev->common_header->log_pos - ev->common_header->data_written;
1500     }
1501     else
1502     { /* until_condition == UNTIL_RELAY_POS */
1503       log_name= group_relay_log_name;
1504       log_pos= group_relay_log_pos;
1505     }
1506 
1507 #ifndef NDEBUG
1508     {
1509       char buf[32];
1510       DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1511                           group_master_log_name, llstr(group_master_log_pos, buf)));
1512       DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1513                           group_relay_log_name, llstr(group_relay_log_pos, buf)));
1514       DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1515                           until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1516                           log_name, llstr(log_pos, buf)));
1517       DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1518                           until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1519                           until_log_name, llstr(until_log_pos, buf)));
1520     }
1521 #endif
1522 
1523     if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1524     {
1525       /*
1526         We have no cached comparison results so we should compare log names
1527         and cache result.
1528         If we are after RESET SLAVE, and the SQL slave thread has not processed
1529         any event yet, it could be that group_master_log_name is "". In that case,
1530         just wait for more events (as there is no sensible comparison to do).
1531       */
1532 
1533       if (*log_name)
1534       {
1535         const char *basename= log_name + dirname_length(log_name);
1536 
1537         const char *q= (const char*)(fn_ext(basename)+1);
1538         if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1539         {
1540           /* Now compare extensions. */
1541           char *q_end;
1542           ulong log_name_extension= strtoul(q, &q_end, 10);
1543           if (log_name_extension < until_log_name_extension)
1544             until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1545           else
1546             until_log_names_cmp_result=
1547               (log_name_extension > until_log_name_extension) ?
1548               UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1549         }
1550         else
1551         {
1552           /* Base names do not match, so we abort */
1553           sql_print_error("%s", error_msg);
1554           DBUG_RETURN(true);
1555         }
1556       }
1557       else
1558         DBUG_RETURN(until_log_pos == 0);
1559     }
1560 
1561     if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1562           log_pos >= until_log_pos) ||
1563          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
1564     {
1565       char buf[22];
1566       sql_print_information("Slave SQL thread stopped because it reached its"
1567                             " UNTIL position %s", llstr(until_pos(), buf));
1568       DBUG_RETURN(true);
1569     }
1570     DBUG_RETURN(false);
1571   }
1572 
1573   case UNTIL_SQL_BEFORE_GTIDS:
1574     /*
1575       We only need to check once if executed_gtids set
1576       contains any of the until_sql_gtids.
1577     */
1578     if (until_sql_gtids_first_event)
1579     {
1580       until_sql_gtids_first_event= false;
1581       global_sid_lock->wrlock();
1582       /* Check if until GTIDs were already applied. */
1583       const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1584       if (until_sql_gtids.is_intersection_nonempty(executed_gtids))
1585       {
1586         char *buffer;
1587         until_sql_gtids.to_string(&buffer);
1588         global_sid_lock->unlock();
1589         sql_print_information("Slave SQL thread stopped because "
1590                               "UNTIL SQL_BEFORE_GTIDS %s is already "
1591                               "applied", buffer);
1592         my_free(buffer);
1593         DBUG_RETURN(true);
1594       }
1595       global_sid_lock->unlock();
1596     }
1597     if (ev != NULL && ev->get_type_code() == binary_log::GTID_LOG_EVENT)
1598     {
1599       Gtid_log_event *gev= (Gtid_log_event *)ev;
1600       global_sid_lock->rdlock();
1601       if (until_sql_gtids.contains_gtid(gev->get_sidno(false), gev->get_gno()))
1602       {
1603         char *buffer;
1604         until_sql_gtids.to_string(&buffer);
1605         global_sid_lock->unlock();
1606         sql_print_information("Slave SQL thread stopped because it reached "
1607                               "UNTIL SQL_BEFORE_GTIDS %s", buffer);
1608         my_free(buffer);
1609         DBUG_RETURN(true);
1610       }
1611       global_sid_lock->unlock();
1612     }
1613     DBUG_RETURN(false);
1614     break;
1615 
1616   case UNTIL_SQL_AFTER_GTIDS:
1617     {
1618       global_sid_lock->wrlock();
1619       const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1620       if (until_sql_gtids.is_subset(executed_gtids))
1621       {
1622         char *buffer;
1623         until_sql_gtids.to_string(&buffer);
1624         global_sid_lock->unlock();
1625         sql_print_information("Slave SQL thread stopped because it reached "
1626                               "UNTIL SQL_AFTER_GTIDS %s", buffer);
1627         my_free(buffer);
1628         DBUG_RETURN(true);
1629       }
1630       global_sid_lock->unlock();
1631       DBUG_RETURN(false);
1632     }
1633     break;
1634 
1635   case UNTIL_SQL_AFTER_MTS_GAPS:
1636   case UNTIL_DONE:
1637     /*
1638       TODO: this condition is actually post-execution or post-scheduling
1639             so the proper place to check it before SQL thread goes
1640             into next_event() where it can wait while the condition
1641             has been satisfied already.
1642             It's deployed here temporarily to be fixed along the regular UNTIL
1643             support for MTS is provided.
1644     */
1645     if (mts_recovery_group_cnt == 0)
1646     {
1647       sql_print_information("Slave SQL thread stopped according to "
1648                             "UNTIL SQL_AFTER_MTS_GAPS as it has "
1649                             "processed all gap transactions left from "
1650                             "the previous slave session.");
1651       until_condition= UNTIL_DONE;
1652       DBUG_RETURN(true);
1653     }
1654     else
1655     {
1656       DBUG_RETURN(false);
1657     }
1658     break;
1659 
1660   case UNTIL_SQL_VIEW_ID:
1661     if (ev != NULL && ev->get_type_code() == binary_log::VIEW_CHANGE_EVENT)
1662     {
1663       View_change_log_event *view_event= (View_change_log_event *)ev;
1664 
1665       if (until_view_id.compare(view_event->get_view_id()) == 0)
1666       {
1667         set_group_replication_retrieved_certification_info(view_event);
1668         until_view_id_found= true;
1669         DBUG_RETURN(false);
1670       }
1671     }
1672 
1673     if (until_view_id_found && ev != NULL && ev->ends_group())
1674     {
1675       until_view_id_commit_found= true;
1676       DBUG_RETURN(false);
1677     }
1678 
1679     if (until_view_id_commit_found && ev == NULL)
1680     {
1681       DBUG_RETURN(true);
1682     }
1683 
1684     DBUG_RETURN(false);
1685     break;
1686 
1687   case UNTIL_NONE:
1688     assert(0);
1689     break;
1690   }
1691 
1692   assert(0);
1693   DBUG_RETURN(false);
1694 }
1695 
cached_charset_invalidate()1696 void Relay_log_info::cached_charset_invalidate()
1697 {
1698   DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1699 
1700   /* Full of zeroes means uninitialized. */
1701   memset(cached_charset, 0, sizeof(cached_charset));
1702   DBUG_VOID_RETURN;
1703 }
1704 
1705 
cached_charset_compare(char * charset) const1706 bool Relay_log_info::cached_charset_compare(char *charset) const
1707 {
1708   DBUG_ENTER("Relay_log_info::cached_charset_compare");
1709 
1710   if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1711   {
1712     memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1713     DBUG_RETURN(1);
1714   }
1715   DBUG_RETURN(0);
1716 }
1717 
1718 
stmt_done(my_off_t event_master_log_pos)1719 int Relay_log_info::stmt_done(my_off_t event_master_log_pos)
1720 {
1721   int error= 0;
1722 
1723   clear_flag(IN_STMT);
1724 
1725   assert(!belongs_to_client());
1726   /* Worker does not execute binlog update position logics */
1727   assert(!is_mts_worker(info_thd));
1728 
1729   /*
1730     Replication keeps event and group positions to specify the
1731     set of events that were executed.
1732     Event positions are incremented after processing each event
1733     whereas group positions are incremented when an event or a
1734     set of events is processed such as in a transaction and are
1735     committed or rolled back.
1736 
1737     A transaction can be ended with a Query Event, i.e. either
1738     commit or rollback, or by a Xid Log Event. Query Event is
1739     used to terminate pseudo-transactions that are executed
1740     against non-transactional engines such as MyIsam. Xid Log
1741     Event denotes though that a set of changes executed
1742     against a transactional engine is about to commit.
1743 
1744     Events' positions are incremented at stmt_done(). However,
1745     transactions that are ended with Xid Log Event have their
1746     group position incremented in the do_apply_event() and in
1747     the do_apply_event_work().
1748 
1749     Notice that the type of the engine, i.e. where data and
1750     positions are stored, against what events are being applied
1751     are not considered in this logic.
1752 
1753     Regarding the code that follows, notice that the executed
1754     group coordinates don't change if the current event is internal
1755     to the group. The same applies to MTS Coordinator when it
1756     handles a Format Descriptor event that appears in the middle
1757     of a group that is about to be assigned.
1758   */
1759   if ((!is_parallel_exec() && is_in_group()) ||
1760       mts_group_status != MTS_NOT_IN_GROUP)
1761   {
1762     inc_event_relay_log_pos();
1763   }
1764   else
1765   {
1766     if (is_parallel_exec())
1767     {
1768 
1769       assert(!is_mts_worker(info_thd));
1770 
1771       /*
1772         Format Description events only can drive MTS execution to this
1773         point. It is a special event group that is handled with
1774         synchronization. For that reason, the checkpoint routine is
1775         called here.
1776       */
1777       error= mts_checkpoint_routine(this, 0, false,
1778                                     true/*need_data_lock=true*/);
1779     }
1780     if (!error)
1781       error= inc_group_relay_log_pos(event_master_log_pos,
1782                                      true/*need_data_lock=true*/);
1783   }
1784 
1785   return error;
1786 }
1787 
1788 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
cleanup_context(THD * thd,bool error)1789 void Relay_log_info::cleanup_context(THD *thd, bool error)
1790 {
1791   DBUG_ENTER("Relay_log_info::cleanup_context");
1792 
1793   assert(info_thd == thd);
1794   /*
1795     1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1796     may have opened tables, which we cannot be sure have been closed (because
1797     maybe the Rows_log_event have not been found or will not be, because slave
1798     SQL thread is stopping, or relay log has a missing tail etc). So we close
1799     all thread's tables. And so the table mappings have to be cancelled.
1800     2) Rows_log_event::do_apply_event() may even have started statements or
1801     transactions on them, which we need to rollback in case of error.
1802     3) If finding a Format_description_log_event after a BEGIN, we also need
1803     to rollback before continuing with the next events.
1804     4) so we need this "context cleanup" function.
1805   */
1806   if (error)
1807   {
1808     trans_rollback_stmt(thd); // if a "statement transaction"
1809     trans_rollback(thd);      // if a "real transaction"
1810   }
1811   if (rows_query_ev)
1812   {
1813     /*
1814       In order to avoid invalid memory access, THD::reset_query() should be
1815       called before deleting the rows_query event.
1816     */
1817     info_thd->reset_query();
1818     info_thd->reset_query_for_display();
1819     delete rows_query_ev;
1820     rows_query_ev= NULL;
1821     DBUG_EXECUTE_IF("after_deleting_the_rows_query_ev",
1822                     {
1823                       const char action[]="now SIGNAL deleted_rows_query_ev WAIT_FOR go_ahead";
1824                       assert(!debug_sync_set_action(info_thd,
1825                                                     STRING_WITH_LEN(action)));
1826                     };);
1827   }
1828   m_table_map.clear_tables();
1829   slave_close_thread_tables(thd);
1830   if (error)
1831   {
1832     /*
1833       trans_rollback above does not rollback XA transactions.
1834       It could be done only after necessarily closing tables which dictates
1835       the following placement.
1836     */
1837     XID_STATE *xid_state= thd->get_transaction()->xid_state();
1838     if (!xid_state->has_state(XID_STATE::XA_NOTR))
1839     {
1840       assert(DBUG_EVALUATE_IF("simulate_commit_failure",1,
1841                               xid_state->has_state(XID_STATE::XA_ACTIVE) ||
1842                               xid_state->has_state(XID_STATE::XA_IDLE)
1843                               ));
1844 
1845       xa_trans_force_rollback(thd);
1846       xid_state->reset();
1847       cleanup_trans_state(thd);
1848       thd->rpl_unflag_detached_engine_ha_data();
1849     }
1850     thd->mdl_context.release_transactional_locks();
1851   }
1852   clear_flag(IN_STMT);
1853   /*
1854     Cleanup for the flags that have been set at do_apply_event.
1855   */
1856   thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1857   thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1858 
1859   /*
1860     Reset state related to long_find_row notes in the error log:
1861     - timestamp
1862     - flag that decides whether the slave prints or not
1863   */
1864   reset_row_stmt_start_timestamp();
1865   unset_long_find_row_note_printed();
1866 
1867   /*
1868     If the slave applier changed the current transaction isolation level,
1869     it need to be restored to the session default value once having the
1870     current transaction cleared.
1871 
1872     We should call "trans_reset_one_shot_chistics()" only if the "error"
1873     flag is "true", because "cleanup_context()" is called at the end of each
1874     set of Table_maps/Rows representing a statement (when the rows event
1875     is tagged with the STMT_END_F) with the "error" flag as "false".
1876 
1877     So, without the "if (error)" below, the isolation level might be reset
1878     in the middle of a pure row based transaction.
1879   */
1880   if (error)
1881     trans_reset_one_shot_chistics(thd);
1882 
1883   DBUG_VOID_RETURN;
1884 }
1885 
clear_tables_to_lock()1886 void Relay_log_info::clear_tables_to_lock()
1887 {
1888   DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
1889 #ifndef NDEBUG
1890   /**
1891     When replicating in RBR and MyISAM Merge tables are involved
1892     open_and_lock_tables (called in do_apply_event) appends the
1893     base tables to the list of tables_to_lock. Then these are
1894     removed from the list in close_thread_tables (which is called
1895     before we reach this point).
1896 
1897     This assertion just confirms that we get no surprises at this
1898     point.
1899    */
1900   uint i=0;
1901   for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
1902   assert(i == tables_to_lock_count);
1903 #endif
1904 
1905   while (tables_to_lock)
1906   {
1907     uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
1908     if (tables_to_lock->m_tabledef_valid)
1909     {
1910       tables_to_lock->m_tabledef.table_def::~table_def();
1911       tables_to_lock->m_tabledef_valid= FALSE;
1912     }
1913 
1914     /*
1915       If blob fields were used during conversion of field values
1916       from the master table into the slave table, then we need to
1917       free the memory used temporarily to store their values before
1918       copying into the slave's table.
1919     */
1920     if (tables_to_lock->m_conv_table)
1921       free_blobs(tables_to_lock->m_conv_table);
1922 
1923     tables_to_lock=
1924       static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1925     tables_to_lock_count--;
1926     my_free(to_free);
1927   }
1928   assert(tables_to_lock == NULL && tables_to_lock_count == 0);
1929   DBUG_VOID_RETURN;
1930 }
1931 
slave_close_thread_tables(THD * thd)1932 void Relay_log_info::slave_close_thread_tables(THD *thd)
1933 {
1934   thd->get_stmt_da()->set_overwrite_status(true);
1935   DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
1936   thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1937   thd->get_stmt_da()->set_overwrite_status(false);
1938 
1939   close_thread_tables(thd);
1940   /*
1941     - If transaction rollback was requested due to deadlock
1942     perform it and release metadata locks.
1943     - If inside a multi-statement transaction,
1944     defer the release of metadata locks until the current
1945     transaction is either committed or rolled back. This prevents
1946     other statements from modifying the table for the entire
1947     duration of this transaction.  This provides commit ordering
1948     and guarantees serializability across multiple transactions.
1949     - If in autocommit mode, or outside a transactional context,
1950     automatically release metadata locks of the current statement.
1951   */
1952   if (thd->transaction_rollback_request)
1953   {
1954     trans_rollback_implicit(thd);
1955     thd->mdl_context.release_transactional_locks();
1956   }
1957   else if (! thd->in_multi_stmt_transaction_mode())
1958     thd->mdl_context.release_transactional_locks();
1959   else
1960     thd->mdl_context.release_statement_locks();
1961 
1962   clear_tables_to_lock();
1963   DBUG_VOID_RETURN;
1964 }
1965 
1966 
1967 /**
1968   Execute a SHOW RELAYLOG EVENTS statement.
1969 
1970   When multiple replication channels exist on this slave
1971   and no channel name is specified through FOR CHANNEL clause
1972   this function errors out and exits.
1973 
1974   @param thd Pointer to THD object for the client thread executing the
1975   statement.
1976 
1977   @retval FALSE success
1978   @retval TRUE failure
1979 */
mysql_show_relaylog_events(THD * thd)1980 bool mysql_show_relaylog_events(THD* thd)
1981 {
1982 
1983   Master_info *mi =0;
1984   List<Item> field_list;
1985   bool res;
1986   DBUG_ENTER("mysql_show_relaylog_events");
1987 
1988   assert(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1989 
1990   channel_map.wrlock();
1991 
1992   if (!thd->lex->mi.for_channel && channel_map.get_num_instances() > 1)
1993   {
1994     my_error(ER_SLAVE_MULTIPLE_CHANNELS_CMD, MYF(0));
1995     res= true;
1996     goto err;
1997   }
1998 
1999   Log_event::init_show_field_list(&field_list);
2000   if (thd->send_result_metadata(&field_list,
2001                                 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2002   {
2003     res= true;
2004     goto err;
2005   }
2006 
2007   mi= channel_map.get_mi(thd->lex->mi.channel);
2008 
2009   if (!mi && strcmp(thd->lex->mi.channel, channel_map.get_default_channel()))
2010   {
2011     my_error(ER_SLAVE_CHANNEL_DOES_NOT_EXIST, MYF(0), thd->lex->mi.channel);
2012     res= true;
2013     goto err;
2014   }
2015 
2016   if (mi == NULL)
2017   {
2018     my_error(ER_SLAVE_CONFIGURATION, MYF(0));
2019     res= true;
2020     goto err;
2021   }
2022 
2023   res= show_binlog_events(thd, &mi->rli->relay_log);
2024 
2025 err:
2026   channel_map.unlock();
2027 
2028   DBUG_RETURN(res);
2029 }
2030 #endif
2031 
2032 
rli_init_info()2033 int Relay_log_info::rli_init_info()
2034 {
2035   int error= 0;
2036   enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
2037   const char *msg= NULL;
2038   /* Store the GTID of a transaction spanned in multiple relay log files */
2039   Gtid gtid_partial_trx= {0, 0};
2040 
2041   DBUG_ENTER("Relay_log_info::rli_init_info");
2042 
2043   mysql_mutex_assert_owner(&data_lock);
2044 
2045   /*
2046     If Relay_log_info is issued again after a failed init_info(), for
2047     instance because of missing relay log files, it will generate new
2048     files and ignore the previous failure, to avoid that we set
2049     error_on_rli_init_info as true.
2050     This a consequence of the behaviour change, in the past server was
2051     stopped when there were replication initialization errors, now it is
2052     not and so init_info() must be aware of previous failures.
2053   */
2054   if (error_on_rli_init_info)
2055     goto err;
2056 
2057   if (inited)
2058   {
2059     /*
2060       We have to reset read position of relay-log-bin as we may have
2061       already been reading from 'hotlog' when the slave was stopped
2062       last time. If this case pos_in_file would be set and we would
2063       get a crash when trying to read the signature for the binary
2064       relay log.
2065 
2066       We only rewind the read position if we are starting the SQL
2067       thread. The handle_slave_sql thread assumes that the read
2068       position is at the beginning of the file, and will read the
2069       "signature" and then fast-forward to the last position read.
2070     */
2071     bool hot_log= FALSE;
2072     /*
2073       my_b_seek does an implicit flush_io_cache, so we need to:
2074 
2075       1. check if this log is active (hot)
2076       2. if it is we keep log_lock until the seek ends, otherwise
2077          release it right away.
2078 
2079       If we did not take log_lock, SQL thread might race with IO
2080       thread for the IO_CACHE mutex.
2081 
2082     */
2083     mysql_mutex_t *log_lock= relay_log.get_log_lock();
2084     mysql_mutex_lock(log_lock);
2085     hot_log= relay_log.is_active(linfo.log_file_name);
2086 
2087     if (!hot_log)
2088       mysql_mutex_unlock(log_lock);
2089 
2090     my_b_seek(cur_log, (my_off_t) 0);
2091 
2092     if (hot_log)
2093       mysql_mutex_unlock(log_lock);
2094     DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(this) : 0);
2095   }
2096 
2097   cur_log_fd = -1;
2098   slave_skip_counter= 0;
2099   abort_pos_wait= 0;
2100   log_space_limit= relay_log_space_limit;
2101   log_space_total= 0;
2102   tables_to_lock= 0;
2103   tables_to_lock_count= 0;
2104 
2105   char pattern[FN_REFLEN];
2106   (void) my_realpath(pattern, slave_load_tmpdir, 0);
2107   /*
2108    @TODO:
2109     In MSR, sometimes slave fail with the following error:
2110     Unable to use slave's temporary directory /tmp -
2111     Can't create/write to file '/tmp/SQL_LOAD-92d1eee0-9de4-11e3-8874-68730ad50fcb'    (Errcode: 17 - File exists), Error_code: 1
2112 
2113    */
2114   if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
2115                 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
2116   {
2117     sql_print_error("Unable to use slave's temporary directory '%s'.",
2118                     slave_load_tmpdir);
2119     DBUG_RETURN(1);
2120   }
2121   unpack_filename(slave_patternload_file, pattern);
2122   slave_patternload_file_size= strlen(slave_patternload_file);
2123 
2124   /*
2125     The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
2126     Note that the I/O thread flushes it to disk after writing every
2127     event, in flush_info within the master info.
2128   */
2129   /*
2130     For the maximum log size, we choose max_relay_log_size if it is
2131     non-zero, max_binlog_size otherwise. If later the user does SET
2132     GLOBAL on one of these variables, fix_max_binlog_size and
2133     fix_max_relay_log_size will reconsider the choice (for example
2134     if the user changes max_relay_log_size to zero, we have to
2135     switch to using max_binlog_size for the relay log) and update
2136     relay_log.max_size (and mysql_bin_log.max_size).
2137   */
2138   {
2139     /* Reports an error and returns, if the --relay-log's path
2140        is a directory.*/
2141     if (opt_relay_logname &&
2142         opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
2143     {
2144       sql_print_error("Path '%s' is a directory name, please specify \
2145 a file name for --relay-log option.", opt_relay_logname);
2146       DBUG_RETURN(1);
2147     }
2148 
2149     /* Reports an error and returns, if the --relay-log-index's path
2150        is a directory.*/
2151     if (opt_relaylog_index_name &&
2152         opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
2153         == FN_LIBCHAR)
2154     {
2155       sql_print_error("Path '%s' is a directory name, please specify \
2156 a file name for --relay-log-index option.", opt_relaylog_index_name);
2157       DBUG_RETURN(1);
2158     }
2159 
2160     char buf[FN_REFLEN];
2161     /* The base name of the relay log file considering multisource rep */
2162     const char *ln;
2163     /*
2164       relay log name without channel prefix taking into account
2165       --relay-log option.
2166     */
2167     const char *ln_without_channel_name;
2168     static bool name_warning_sent= 0;
2169 
2170     /*
2171       Buffer to add channel name suffix when relay-log option is provided.
2172     */
2173     char relay_bin_channel[FN_REFLEN];
2174     /*
2175       Buffer to add channel name suffix when relay-log-index option is provided
2176     */
2177     char relay_bin_index_channel[FN_REFLEN];
2178 
2179     /* name of the index file if opt_relaylog_index_name is set*/
2180     const char* log_index_name;
2181 
2182 
2183     ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
2184                                 "-relay-bin", buf);
2185 
2186     ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
2187                                       ln_without_channel_name);
2188 
2189     /* We send the warning only at startup, not after every RESET SLAVE */
2190     if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
2191     {
2192       /*
2193         User didn't give us info to name the relay log index file.
2194         Picking `hostname`-relay-bin.index like we do, causes replication to
2195         fail if this slave's hostname is changed later. So, we would like to
2196         instead require a name. But as we don't want to break many existing
2197         setups, we only give warning, not error.
2198       */
2199       sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
2200                         " so replication "
2201                         "may break when this MySQL server acts as a "
2202                         "slave and has his hostname changed!! Please "
2203                         "use '--relay-log=%s' to avoid this problem.",
2204                         ln_without_channel_name);
2205       name_warning_sent= 1;
2206     }
2207 
2208     relay_log.is_relay_log= TRUE;
2209 
2210     /*
2211        If relay log index option is set, convert into channel specific
2212        index file. If the opt_relaylog_index has an extension, we strip
2213        it too. This is inconsistent to relay log names.
2214     */
2215     if (opt_relaylog_index_name)
2216     {
2217       char index_file_withoutext[FN_REFLEN];
2218       relay_log.generate_name(opt_relaylog_index_name,"",
2219                               index_file_withoutext);
2220 
2221       log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
2222                                                    FN_REFLEN,
2223                                                    index_file_withoutext);
2224     }
2225     else
2226       log_index_name= 0;
2227 
2228 
2229 
2230     if (relay_log.open_index_file(log_index_name, ln, TRUE))
2231     {
2232       sql_print_error("Failed in open_index_file() called from Relay_log_info::rli_init_info().");
2233       DBUG_RETURN(1);
2234     }
2235 #ifndef NDEBUG
2236     global_sid_lock->wrlock();
2237     gtid_set.dbug_print("set of GTIDs in relay log before initialization");
2238     global_sid_lock->unlock();
2239 #endif
2240     /*
2241       In the init_gtid_set below we pass the mi->transaction_parser.
2242       This will be useful to ensure that we only add a GTID to
2243       the Retrieved_Gtid_Set for fully retrieved transactions. Also, it will
2244       be useful to ensure the Retrieved_Gtid_Set behavior when auto
2245       positioning is disabled (we could have transactions spanning multiple
2246       relay log files in this case).
2247       We will skip this initialization if relay_log_recovery is set in order
2248       to save time, as neither the GTIDs nor the transaction_parser state
2249       would be useful when the relay log will be cleaned up later when calling
2250       init_recovery.
2251     */
2252     if (!is_relay_log_recovery &&
2253         !gtid_retrieved_initialized &&
2254         relay_log.init_gtid_sets(&gtid_set, NULL,
2255                                  opt_slave_sql_verify_checksum,
2256                                  true/*true=need lock*/,
2257                                  &mi->transaction_parser, &gtid_partial_trx))
2258     {
2259       sql_print_error("Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
2260       DBUG_RETURN(1);
2261     }
2262     gtid_retrieved_initialized= true;
2263 #ifndef NDEBUG
2264     global_sid_lock->wrlock();
2265     gtid_set.dbug_print("set of GTIDs in relay log after initialization");
2266     global_sid_lock->unlock();
2267 #endif
2268     if (!gtid_partial_trx.is_empty())
2269     {
2270       /*
2271         The init_gtid_set has found an incomplete transaction in the relay log.
2272         We add this transaction's GTID to the last_gtid_queued so the IO thread
2273         knows which GTID to add to the Retrieved_Gtid_Set when reaching the end
2274         of the incomplete transaction.
2275       */
2276       mi->set_last_gtid_queued(gtid_partial_trx);
2277     }
2278     else
2279     {
2280       mi->clear_last_gtid_queued();
2281     }
2282     /*
2283       Configures what object is used by the current log to store processed
2284       gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
2285       corretly compute the set of previous gtids.
2286     */
2287     relay_log.set_previous_gtid_set_relaylog(&gtid_set);
2288     /*
2289       note, that if open() fails, we'll still have index file open
2290       but a destructor will take care of that
2291     */
2292 
2293     mysql_mutex_t *log_lock= relay_log.get_log_lock();
2294     mysql_mutex_lock(log_lock);
2295 
2296     if (relay_log.open_binlog(ln, 0,
2297                               (max_relay_log_size ? max_relay_log_size :
2298                                max_binlog_size), true,
2299                               true/*need_lock_index=true*/,
2300                               true/*need_sid_lock=true*/,
2301                               mi->get_mi_description_event()))
2302     {
2303       mysql_mutex_unlock(log_lock);
2304       sql_print_error("Failed in open_log() called from Relay_log_info::rli_init_info().");
2305       DBUG_RETURN(1);
2306     }
2307 
2308     mysql_mutex_unlock(log_lock);
2309 
2310   }
2311 
2312    /*
2313     This checks if the repository was created before and thus there
2314     will be values to be read. Please, do not move this call after
2315     the handler->init_info().
2316   */
2317   if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
2318   {
2319     msg= "Error checking relay log repository";
2320     error= 1;
2321     goto err;
2322   }
2323 
2324   if (handler->init_info())
2325   {
2326     msg= "Error reading relay log configuration";
2327     error= 1;
2328     goto err;
2329   }
2330 
2331   if (check_return == REPOSITORY_DOES_NOT_EXIST)
2332   {
2333     /* Init relay log with first entry in the relay index file */
2334     if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
2335                            false/*need_data_lock=false (lock should be held
2336                                   prior to invoking this function)*/,
2337                            &msg, 0))
2338     {
2339       error= 1;
2340       goto err;
2341     }
2342     group_master_log_name[0]= 0;
2343     group_master_log_pos= 0;
2344   }
2345   else
2346   {
2347     if (read_info(handler))
2348     {
2349       msg= "Error reading relay log configuration";
2350       error= 1;
2351       goto err;
2352     }
2353 
2354     if (is_relay_log_recovery && init_recovery(mi, &msg))
2355     {
2356       error= 1;
2357       goto err;
2358     }
2359 
2360     if (init_relay_log_pos(group_relay_log_name,
2361                            group_relay_log_pos,
2362                            false/*need_data_lock=false (lock should be held
2363                                   prior to invoking this function)*/,
2364                            &msg, 0))
2365     {
2366       char llbuf[22];
2367       sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s).",
2368                       group_relay_log_name,
2369                       llstr(group_relay_log_pos, llbuf));
2370       error= 1;
2371       goto err;
2372     }
2373 
2374 #ifndef NDEBUG
2375     {
2376       char llbuf1[22], llbuf2[22];
2377       DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
2378                           llstr(my_b_tell(cur_log),llbuf1),
2379                           llstr(event_relay_log_pos,llbuf2)));
2380       assert(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2381       assert((my_b_tell(cur_log) == event_relay_log_pos));
2382     }
2383 #endif
2384   }
2385 
2386   inited= 1;
2387   error_on_rli_init_info= false;
2388   if (flush_info(TRUE))
2389   {
2390     msg= "Error reading relay log configuration";
2391     error= 1;
2392     goto err;
2393   }
2394 
2395   if (count_relay_log_space())
2396   {
2397     msg= "Error counting relay log space";
2398     error= 1;
2399     goto err;
2400   }
2401 
2402   /*
2403     In case of MTS the recovery is deferred until the end of
2404     load_mi_and_rli_from_repositories.
2405   */
2406   if (!mi->rli->mts_recovery_group_cnt)
2407     is_relay_log_recovery= FALSE;
2408   DBUG_RETURN(error);
2409 
2410 err:
2411   handler->end_info();
2412   inited= 0;
2413   error_on_rli_init_info= true;
2414   if (msg)
2415     sql_print_error("%s.", msg);
2416   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2417                   true/*need_lock_log=true*/,
2418                   true/*need_lock_index=true*/);
2419   DBUG_RETURN(error);
2420 }
2421 
end_info()2422 void Relay_log_info::end_info()
2423 {
2424   DBUG_ENTER("Relay_log_info::end_info");
2425 
2426   error_on_rli_init_info= false;
2427   if (!inited)
2428     DBUG_VOID_RETURN;
2429 
2430   handler->end_info();
2431 
2432   if (cur_log_fd >= 0)
2433   {
2434     end_io_cache(&cache_buf);
2435     (void)my_close(cur_log_fd, MYF(MY_WME));
2436     cur_log_fd= -1;
2437   }
2438   inited = 0;
2439   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2440                   true/*need_lock_log=true*/,
2441                   true/*need_lock_index=true*/);
2442   relay_log.harvest_bytes_written(this, true/*need_log_space_lock=true*/);
2443   /*
2444     Delete the slave's temporary tables from memory.
2445     In the future there will be other actions than this, to ensure persistance
2446     of slave's temp tables after shutdown.
2447   */
2448   close_temporary_tables();
2449 
2450   DBUG_VOID_RETURN;
2451 }
2452 
flush_current_log()2453 int Relay_log_info::flush_current_log()
2454 {
2455   DBUG_ENTER("Relay_log_info::flush_current_log");
2456   /*
2457     When we come to this place in code, relay log may or not be initialized;
2458     the caller is responsible for setting 'flush_relay_log_cache' accordingly.
2459   */
2460   IO_CACHE *log_file= relay_log.get_log_file();
2461   if (flush_io_cache(log_file))
2462     DBUG_RETURN(2);
2463 
2464   DBUG_RETURN(0);
2465 }
2466 
set_master_info(Master_info * info)2467 void Relay_log_info::set_master_info(Master_info* info)
2468 {
2469   mi= info;
2470 }
2471 
2472 /**
2473   Stores the file and position where the execute-slave thread are in the
2474   relay log:
2475 
2476     - As this is only called by the slave thread or on STOP SLAVE, with the
2477       log_lock grabbed and the slave thread stopped, we don't need to have
2478       a lock here.
2479     - If there is an active transaction, then we don't update the position
2480       in the relay log.  This is to ensure that we re-execute statements
2481       if we die in the middle of an transaction that was rolled back.
2482     - As a transaction never spans binary logs, we don't have to handle the
2483       case where we do a relay-log-rotation in the middle of the transaction.
2484       If this would not be the case, we would have to ensure that we
2485       don't delete the relay log file where the transaction started when
2486       we switch to a new relay log file.
2487 
2488   @retval  0   ok,
2489   @retval  1   write error, otherwise.
2490 */
2491 
2492 /**
  Store the file and position where the slave's SQL thread is in the
  relay log.
2495 
2496   Notes:
2497 
2498   - This function should be called either from the slave SQL thread,
2499     or when the slave thread is not running.  (It reads the
2500     group_{relay|master}_log_{pos|name} and delay fields in the rli
2501     object.  These may only be modified by the slave SQL thread or by
2502     a client thread when the slave SQL thread is not running.)
2503 
2504   - If there is an active transaction, then we do not update the
2505     position in the relay log.  This is to ensure that we re-execute
2506     statements if we die in the middle of an transaction that was
2507     rolled back.
2508 
2509   - As a transaction never spans binary logs, we don't have to handle
2510     the case where we do a relay-log-rotation in the middle of the
2511     transaction.  If transactions could span several binlogs, we would
2512     have to ensure that we do not delete the relay log file where the
2513     transaction started before switching to a new relay log file.
2514 
2515   - Error can happen if writing to file fails or if flushing the file
2516     fails.
2517 
  @param force Passed on to the repository handler's flush_info(); a flush
               that was postponed due to a split transaction is also forced.
2519 
2520   @todo Change the log file information to a binary format to avoid
2521   calling longlong2str.
2522 
2523   @return 0 on success, 1 on error.
2524 */
flush_info(const bool force)2525 int Relay_log_info::flush_info(const bool force)
2526 {
2527   DBUG_ENTER("Relay_log_info::flush_info");
2528 
2529   if (!inited)
2530     DBUG_RETURN(0);
2531 
2532   /*
2533     We update the sync_period at this point because only here we
2534     now that we are handling a relay log info. This needs to be
2535     update every time we call flush because the option maybe
2536     dinamically set.
2537   */
2538   mysql_mutex_lock(&mts_temp_table_LOCK);
2539   handler->set_sync_period(sync_relayloginfo_period);
2540 
2541   if (write_info(handler))
2542     goto err;
2543 
2544   if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
2545     goto err;
2546 
2547   force_flush_postponed_due_to_split_trans= false;
2548   mysql_mutex_unlock(&mts_temp_table_LOCK);
2549   DBUG_RETURN(0);
2550 
2551 err:
2552   sql_print_error("Error writing relay log configuration.");
2553   mysql_mutex_unlock(&mts_temp_table_LOCK);
2554   DBUG_RETURN(1);
2555 }
2556 
get_number_info_rli_fields()2557 size_t Relay_log_info::get_number_info_rli_fields()
2558 {
2559   return sizeof(info_rli_fields)/sizeof(info_rli_fields[0]);
2560 }
2561 
read_info(Rpl_info_handler * from)2562 bool Relay_log_info::read_info(Rpl_info_handler *from)
2563 {
2564   int lines= 0;
2565   char *first_non_digit= NULL;
2566   ulong temp_group_relay_log_pos= 0;
2567   ulong temp_group_master_log_pos= 0;
2568   int temp_sql_delay= 0;
2569   int temp_internal_id= internal_id;
2570 
2571   DBUG_ENTER("Relay_log_info::read_info");
2572 
2573   /*
2574     Should not read RLI from file in client threads. Client threads
2575     only use RLI to execute BINLOG statements.
2576 
2577     @todo Uncomment the following assertion. Currently,
2578     Relay_log_info::init() is called from init_master_info() before
2579     the THD object Relay_log_info::sql_thd is created. That means we
2580     cannot call belongs_to_client() since belongs_to_client()
2581     dereferences Relay_log_info::sql_thd. So we need to refactor
2582     slightly: the THD object should be created by Relay_log_info
2583     constructor (or passed to it), so that we are guaranteed that it
2584     exists at this point. /Sven
2585   */
2586   //assert(!belongs_to_client());
2587 
2588   /*
2589     Starting from 5.1.x, relay-log.info has a new format. Now, its
2590     first line contains the number of lines in the file. By reading
2591     this number we can determine which version our master.info comes
2592     from. We can't simply count the lines in the file, since
2593     versions before 5.1.x could generate files with more lines than
2594     needed. If first line doesn't contain a number, or if it
2595     contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2596     then the file is treated like a file from pre-5.1.x version.
2597     There is no ambiguity when reading an old master.info: before
2598     5.1.x, the first line contained the binlog's name, which is
2599     either empty or has an extension (contains a '.'), so can't be
2600     confused with an integer.
2601 
2602     So we're just reading first line and trying to figure which
2603     version is this.
2604   */
2605 
2606   /*
2607     The first row is temporarily stored in mi->master_log_name, if
2608     it is line count and not binlog name (new format) it will be
2609     overwritten by the second row later.
2610   */
2611   if (from->prepare_info_for_read() ||
2612       from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2613                      (char *) ""))
2614     DBUG_RETURN(TRUE);
2615 
2616   lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2617 
2618   if (group_relay_log_name[0]!='\0' &&
2619       *first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2620   {
2621     /* Seems to be new format => read group relay log name */
2622     if (from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2623                        (char *) ""))
2624       DBUG_RETURN(TRUE);
2625   }
2626   else
2627      DBUG_PRINT("info", ("relay_log_info file is in old format."));
2628 
2629   if (from->get_info(&temp_group_relay_log_pos,
2630                      (ulong) BIN_LOG_HEADER_SIZE) ||
2631       from->get_info(group_master_log_name,
2632                      sizeof(group_relay_log_name),
2633                      (char *) "") ||
2634       from->get_info(&temp_group_master_log_pos,
2635                      0UL))
2636     DBUG_RETURN(TRUE);
2637 
2638   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2639   {
2640     if (from->get_info(&temp_sql_delay, 0))
2641       DBUG_RETURN(TRUE);
2642   }
2643 
2644   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2645   {
2646     if (from->get_info(&recovery_parallel_workers, 0UL))
2647       DBUG_RETURN(TRUE);
2648   }
2649 
2650   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2651   {
2652     if (from->get_info(&temp_internal_id, 1))
2653       DBUG_RETURN(TRUE);
2654   }
2655 
2656   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL)
2657   {
2658     /* the default value is empty string"" */
2659     if (from->get_info(channel, sizeof(channel), (char*)""))
2660       DBUG_RETURN(TRUE);
2661   }
2662 
2663   group_relay_log_pos=  temp_group_relay_log_pos;
2664   group_master_log_pos= temp_group_master_log_pos;
2665   sql_delay= (int32) temp_sql_delay;
2666   internal_id= (uint) temp_internal_id;
2667 
2668   assert(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2669          (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
2670   DBUG_RETURN(FALSE);
2671 }
2672 
set_info_search_keys(Rpl_info_handler * to)2673 bool Relay_log_info::set_info_search_keys(Rpl_info_handler *to)
2674 {
2675   DBUG_ENTER("Relay_log_info::set_info_search_keys");
2676 
2677   if (to->set_info(LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL, channel))
2678     DBUG_RETURN(TRUE);
2679 
2680   DBUG_RETURN(FALSE);
2681 }
2682 
2683 
write_info(Rpl_info_handler * to)2684 bool Relay_log_info::write_info(Rpl_info_handler *to)
2685 {
2686   DBUG_ENTER("Relay_log_info::write_info");
2687 
2688   /*
2689     @todo Uncomment the following assertion. See todo in
2690     Relay_log_info::read_info() for details. /Sven
2691   */
2692   //assert(!belongs_to_client());
2693 
2694   if (to->prepare_info_for_write() ||
2695       to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2696       to->set_info(group_relay_log_name) ||
2697       to->set_info((ulong) group_relay_log_pos) ||
2698       to->set_info(group_master_log_name) ||
2699       to->set_info((ulong) group_master_log_pos) ||
2700       to->set_info((int) sql_delay) ||
2701       to->set_info(recovery_parallel_workers) ||
2702       to->set_info((int) internal_id) ||
2703       to->set_info(channel))
2704     DBUG_RETURN(TRUE);
2705 
2706   DBUG_RETURN(FALSE);
2707 }
2708 
2709 /**
2710    The method is run by SQL thread/MTS Coordinator.
2711    It replaces the current FD event with a new one.
2712    A version adaptation routine is invoked for the new FD
2713    to align the slave applier execution context with the master version.
2714 
2715    Since FD are shared by Coordinator and Workers in the MTS mode,
2716    deletion of the old FD is done through decrementing its usage counter.
2717    The destructor runs when the later drops to zero,
2718    also see @c Slave_worker::set_rli_description_event().
2719    The usage counter of the new FD is incremented.
2720 
2721    Although notice that MTS worker runs it, inefficiently (see assert),
2722    once at its destruction time.
2723 
   @param  fe  a pointer to the Format Description event to be installed
               into the execution context
2726 */
2727 
void Relay_log_info::set_rli_description_event(Format_description_log_event *fe)
{
  DBUG_ENTER("Relay_log_info::set_rli_description_event");
  /*
    Debug check: when info_thd belongs to an MTS worker, only a NULL
    event may be passed to this method.
  */
  assert(!info_thd || !is_mts_worker(info_thd) || !fe);

  if (fe)
  {
    /*
      Adapt the execution context to the version carried by fe. The result
      is 0 when that version equals the current one, the new version
      otherwise.
    */
    ulong fe_version= adapt_to_master_version(fe);

    if (info_thd)
    {
      // See rpl_rli_pdb.h:Slave_worker::set_rli_description_event.
      if (!is_in_group() &&
          (info_thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
           info_thd->variables.gtid_next.type == UNDEFINED_GROUP))
      {
        DBUG_PRINT("info", ("Setting gtid_next.type to NOT_YET_DETERMINED_GROUP"));
        info_thd->variables.gtid_next.set_not_yet_determined();
      }

      /* Only a real version change in parallel mode concerns the workers. */
      if (is_parallel_exec() && fe_version > 0)
      {
        /*
          Prepare for workers' adaption to a new FD version. Workers
          will see notification through scheduling of a first event of
          a new post-new-FD.
        */
        for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
          (*it)->fd_change_notified= false;
      }
    }
  }
  /*
    Release this object's reference to the previously installed event and
    delete it when atomic_add(-1) yields 1.
    NOTE(review): presumably atomic_add() returns the value held before the
    addition, so 1 means this was the last reference - confirm.
  */
  if (rli_description_event &&
      rli_description_event->usage_counter.atomic_add(-1) == 1)
    delete rli_description_event;
#ifndef NDEBUG
  else
    /* It must be MTS mode when the usage counter greater than 1. */
    assert(!rli_description_event || is_parallel_exec());
#endif
  /* Install the new event (possibly NULL) and take a reference on it. */
  rli_description_event= fe;
  if (rli_description_event)
    rli_description_event->usage_counter.atomic_add(1);

  DBUG_VOID_RETURN;
}
2774 
/*
  Describes one server feature whose availability depends on the server
  version: the version that introduced it and the actions that adjust a
  THD when the applied events switch between a feature-aware and a
  feature-unaware version.
*/
struct st_feature_version
{
  /*
    The enum items must be listed top-down in non-descending version
    order. The last item formally corresponds to the highest possible
    server version (never reached, hence no adapting actions for it).
    Enumeration starts from zero.
  */
  enum
  {
    WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
    _END_OF_LIST // always last
  } item;
  /*
    Version in which the feature was introduced, stored as three
    separate bytes.
  */
  uchar version_split[3];
  /*
    Action to perform when, according to the Format Description event,
    the master is found to be feature-aware while previously it was *not*.
  */
  void (*upgrade) (THD*);
  /*
    Action to perform when, according to the Format Description event,
    the master is found to be feature-*un*aware while previously it was
    aware.
  */
  void (*downgrade) (THD*);
};
2803 
wl6292_upgrade_func(THD * thd)2804 void wl6292_upgrade_func(THD *thd)
2805 {
2806   thd->variables.explicit_defaults_for_timestamp= false;
2807   if (global_system_variables.explicit_defaults_for_timestamp)
2808     thd->variables.explicit_defaults_for_timestamp= true;
2809 
2810   return;
2811 }
2812 
wl6292_downgrade_func(THD * thd)2813 void wl6292_downgrade_func(THD *thd)
2814 {
2815   if (global_system_variables.explicit_defaults_for_timestamp)
2816     thd->variables.explicit_defaults_for_timestamp= false;
2817 
2818   return;
2819 }
2820 
2821 /**
2822    Sensitive to Master-vs-Slave version difference features
2823    should be listed in the version non-descending order.
2824 */
static st_feature_version s_features[]=
{
  // order is the same as in the enum
  // WL6292: introduced in version 5.6.6, with both actions defined
  { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
    {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
  // terminating entry: highest representable version, no actions
  { st_feature_version::_END_OF_LIST,
    {255, 255, 255}, NULL, NULL }
};
2833 
2834 /**
2835    The method computes the incoming "master"'s FD server version and that
2836    of the currently installed (if ever) rli_description_event, to
2837    invoke more specific method to compare the two and adapt slave applier execution
2838    context to the new incoming master's version.
2839 
2840    This method is specifically for STS applier/MTS Coordinator as well as
2841    for a user thread applying binlog events.
2842 
2843    @param  fdle  a pointer to new Format Description event that is being
2844                  set up a new execution context.
2845    @return 0                when the versions are equal,
2846            master_version   otherwise
2847 */
adapt_to_master_version(Format_description_log_event * fdle)2848 ulong Relay_log_info::adapt_to_master_version(Format_description_log_event *fdle)
2849 {
2850   ulong master_version, current_version, slave_version;
2851 
2852   slave_version= version_product(slave_version_split);
2853   /* When rli_description_event is uninitialized yet take the slave's version */
2854   master_version= !fdle ? slave_version : fdle->get_product_version();
2855   current_version= !rli_description_event ? slave_version :
2856     rli_description_event->get_product_version();
2857 
2858   return adapt_to_master_version_updown(master_version, current_version);
2859 }
2860 
2861 /**
2862   The method compares two supplied versions and carries out down- or
2863   up- grade customization of execution context of the slave applier
2864   (thd).
2865 
2866   The method is invoked in the STS case through
2867   Relay_log_info::adapt_to_master_version() right before a new master
2868   FD is installed into the applier execution context; in the MTS
2869   case it's done by the Worker when it's assigned with a first event
2870   after the latest new FD has been installed.
2871 
2872   Comparison of the current (old, existing) and the master (new,
2873   incoming) versions yields adaptive actions.
2874   To explain that, let's denote V_0 as the current, and the master's
2875   one as V_1.
2876   In the downgrade case (V_1 < V_0) a server feature that is undefined
2877   in V_1 but is defined starting from some V_f of [V_1 + 1, V_0] range
2878   (+1 to mean V_1 excluded) are invalidated ("removed" from execution context)
2879   by running so called here downgrade action.
2880   Conversely in the upgrade case a feature defined in [V_0 + 1, V_1] range
2881   is validated ("added" to execution context) by running its upgrade action.
2882   A typical use case showing how adaptive actions are necessary for the slave
2883   applier is when the master version is lesser than the slave's one.
2884   In such case events generated on the "older" master may need to be applied
2885   in their native server context. And such context can be provided by downgrade
2886   actions.
2887   Conversely, when the old master events are run out and a newer master's events
2888   show up for applying, the execution context will be upgraded through
2889   the namesake actions.
2890 
2891   Notice that a relay log may have two FD events, one the slave local
2892   and the other from the Master. As there's no concern for the FD
2893   originator this leads to two adapt_to_master_version() calls.
2894   It's not harmful as can be seen from the following example.
2895   Say the currently installed FD's version is
2896   V_m, then at relay-log rotation the following transition takes
2897   place:
2898 
2899      V_m  -adapt-> V_s -adapt-> V_m.
2900 
2901   here and further `m' subscript stands for the master, `s' for the slave.
2902   It's clear that in this case an ineffective V_m -> V_m transition occurs.
2903 
2904   At composing downgrade/upgrade actions keep in mind that the slave applier
2905   version transition goes the following route:
2906   The initial version is that of the slave server (V_ss).
2907   It changes to a magic 4.0 at the slave relay log initialization.
2908   In the following course versions are extracted from each FD read out,
2909   regardless of what server generated it. Here is a typical version
2910   transition sequence underscored with annotation:
2911 
2912    V_ss -> 4.0 -> V(FD_s^1) -> V(FD_m^2)   --->   V(FD_s^3) -> V(FD_m^4)  ...
2913 
2914     ----------     -----------------     --------  ------------------     ---
2915      bootstrap       1st relay log       rotation      2nd log            etc
2916 
  The upper (^) superscript enumerates Format Description events, V(FD^i) stands
  for a function extracting the version data from the i:th FD.
2919 
2920   There won't be any action to execute when info_thd is undefined,
2921   e.g at bootstrap.
2922 
2923   @param  master_version   an upcoming new version
2924   @param  current_version  the current version
2925   @return 0                when the new version is equal to the current one,
2926           master_version   otherwise
2927 */
adapt_to_master_version_updown(ulong master_version,ulong current_version)2928 ulong Relay_log_info::adapt_to_master_version_updown(ulong master_version,
2929                                                      ulong current_version)
2930 {
2931   THD *thd= info_thd;
2932   /*
2933     When the SQL thread or MTS Coordinator executes this method
2934     there's a constraint on current_version argument.
2935   */
2936   assert(!thd ||
2937          thd->rli_fake != NULL ||
2938          thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER ||
2939          (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
2940           (!rli_description_event ||
2941            current_version ==
2942            rli_description_event->get_product_version())));
2943 
2944   if (master_version == current_version)
2945     return 0;
2946   else if (!thd)
2947     return master_version;
2948 
2949   bool downgrade= master_version < current_version;
2950    /*
2951     find item starting from and ending at for which adaptive actions run
2952     for downgrade or upgrade branches.
2953     (todo: convert into bsearch when number of features will grow significantly)
2954   */
2955   long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
2956 
2957   for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
2958   {
2959     ulong ver_f= version_product(s_features[i].version_split);
2960 
2961     if ((downgrade ? master_version : current_version) < ver_f &&
2962         i_first == st_feature_version::_END_OF_LIST)
2963       i_first= i;
2964     if ((downgrade ? current_version : master_version) < ver_f)
2965     {
2966       i_last= i;
2967       assert(i_last >= i_first);
2968       break;
2969     }
2970   }
2971 
2972   /*
2973      actions, executed in version non-descending st_feature_version order
2974   */
2975   for (i= i_first; i < i_last; i++)
2976   {
2977     /* Run time check of the st_feature_version items ordering */
2978     assert(!i ||
2979            version_product(s_features[i - 1].version_split) <=
2980            version_product(s_features[i].version_split));
2981 
2982     assert((downgrade ? master_version : current_version) <
2983            version_product(s_features[i].version_split) &&
2984            (downgrade ? current_version : master_version  >=
2985             version_product(s_features[i].version_split)));
2986 
2987     if (downgrade && s_features[i].downgrade)
2988     {
2989       s_features[i].downgrade(thd);
2990     }
2991     else if (s_features[i].upgrade)
2992     {
2993       s_features[i].upgrade(thd);
2994     }
2995   }
2996 
2997   return master_version;
2998 }
2999 
relay_log_number_to_name(uint number,char name[FN_REFLEN+1])3000 void Relay_log_info::relay_log_number_to_name(uint number,
3001                                               char name[FN_REFLEN+1])
3002 {
3003   char *str= NULL;
3004   char relay_bin_channel[FN_REFLEN+1];
3005   const char *relay_log_basename_channel=
3006     add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN+1,
3007                                   relay_log_basename);
3008 
3009   /* str points to closing null of relay log basename channel */
3010   str= strmake(name, relay_log_basename_channel, FN_REFLEN+1);
3011   *str++= '.';
3012   sprintf(str, "%06u", number);
3013 }
3014 
relay_log_name_to_number(const char * name)3015 uint Relay_log_info::relay_log_name_to_number(const char *name)
3016 {
3017   return static_cast<uint>(atoi(fn_ext(name)+1));
3018 }
3019 
is_mts_db_partitioned(Relay_log_info * rli)3020 bool is_mts_db_partitioned(Relay_log_info * rli)
3021 {
3022   return (rli->current_mts_submode->get_type() ==
3023     MTS_PARALLEL_TYPE_DB_NAME);
3024 }
3025 
get_for_channel_str(bool upper_case) const3026 const char* Relay_log_info::get_for_channel_str(bool upper_case) const
3027 {
3028   if (rli_fake)
3029     return "";
3030   else
3031     return mi->get_for_channel_str(upper_case);
3032 }
3033 
add_gtid_set(const Gtid_set * gtid_set)3034 enum_return_status Relay_log_info::add_gtid_set(const Gtid_set *gtid_set)
3035 {
3036   DBUG_ENTER("Relay_log_info::add_gtid_set(gtid_set)");
3037 
3038   enum_return_status return_status= this->gtid_set.add_gtid_set(gtid_set);
3039 
3040   DBUG_RETURN(return_status);
3041 }
3042 
detach_engine_ha_data(THD * thd)3043 void Relay_log_info::detach_engine_ha_data(THD *thd)
3044 {
3045   is_engine_ha_data_detached= true;
3046     /*
3047       In case of slave thread applier or processing binlog by client,
3048       detach the engine ha_data ("native" engine transaction)
3049       in favor of dynamically created.
3050     */
3051   plugin_foreach(thd, detach_native_trx,
3052                  MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3053 }
3054 
reattach_engine_ha_data(THD * thd)3055 void Relay_log_info::reattach_engine_ha_data(THD *thd)
3056 {
3057   is_engine_ha_data_detached = false;
3058   /*
3059     In case of slave thread applier or processing binlog by client,
3060     reattach the engine ha_data ("native" engine transaction)
3061     in favor of dynamically created.
3062   */
3063   plugin_foreach(thd, reattach_native_trx, MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3064 }
3065