1 /* Copyright (c) 2006, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "rpl_rli.h"
24 
25 #include "my_dir.h"                // MY_STAT
26 #include "log.h"                   // sql_print_error
27 #include "log_event.h"             // Log_event
28 #include "m_string.h"
29 #include "rpl_group_replication.h" // set_group_replication_retrieved_certifi...
30 #include "rpl_info_factory.h"      // Rpl_info_factory
31 #include "rpl_mi.h"                // Master_info
32 #include "rpl_msr.h"               // channel_map
33 #include "rpl_rli_pdb.h"           // Slave_worker
34 #include "sql_base.h"              // close_thread_tables
35 #include "strfunc.h"               // strconvert
36 #include "transaction.h"           // trans_commit_stmt
37 #include "debug_sync.h"
38 #include "pfs_file_provider.h"
39 #include "mysql/psi/mysql_file.h"
40 #include "mutex_lock.h"            // Mutex_lock
41 
42 #include <algorithm>
43 using std::min;
44 using std::max;
45 
/*
  Names of the fields that make up the relay log info.

  Please, every time you add a new field to the relay log info, update
  the array that follows. For now, this array is only used to get the
  number of fields.
*/
const char* info_rli_fields[]=
{
  "number_of_lines",
  "group_relay_log_name",
  "group_relay_log_pos",
  "group_master_log_name",
  "group_master_log_pos",
  "sql_delay",
  "number_of_workers",
  "id",
  "channel_name"
};
63 
/**
   Constructs a Relay_log_info.

   The base class Rpl_info is constructed with the type string "SQL",
   the instrumentation keys (only when HAVE_PSI_INTERFACE is defined),
   the id and the channel name. All remaining members are given their
   initial values; unless the object is a "fake" one, the mutexes and
   condition variables owned by this class are initialized as well.

   @param is_slave_recovery  stored in @c is_relay_log_recovery
   @param param_key_info_*   instrumentation keys forwarded unchanged to
                             Rpl_info (present only when
                             HAVE_PSI_INTERFACE is defined)
   @param param_id           forwarded to Rpl_info
   @param param_channel      forwarded to Rpl_info
   @param is_rli_fake        stored in @c rli_fake; when true, the
                             synchronization objects below are NOT
                             initialized (the destructor skips their
                             destruction under the same flag)
*/
Relay_log_info::Relay_log_info(bool is_slave_recovery
#ifdef HAVE_PSI_INTERFACE
                               ,PSI_mutex_key *param_key_info_run_lock,
                               PSI_mutex_key *param_key_info_data_lock,
                               PSI_mutex_key *param_key_info_sleep_lock,
                               PSI_mutex_key *param_key_info_thd_lock,
                               PSI_mutex_key *param_key_info_data_cond,
                               PSI_mutex_key *param_key_info_start_cond,
                               PSI_mutex_key *param_key_info_stop_cond,
                               PSI_mutex_key *param_key_info_sleep_cond
#endif
                               , uint param_id, const char *param_channel,
                               bool is_rli_fake
                              )
   :Rpl_info("SQL"
#ifdef HAVE_PSI_INTERFACE
             ,param_key_info_run_lock, param_key_info_data_lock,
             param_key_info_sleep_lock, param_key_info_thd_lock,
             param_key_info_data_cond, param_key_info_start_cond,
             param_key_info_stop_cond, param_key_info_sleep_cond
#endif
             , param_id, param_channel
            ),
   replicate_same_server_id(::replicate_same_server_id),
   cur_log_fd(-1), relay_log(&sync_relaylog_period, SEQ_READ_APPEND),
   is_relay_log_recovery(is_slave_recovery),
   save_temporary_tables(0),
   cur_log_old_open_count(0), error_on_rli_init_info(false),
   group_relay_log_pos(0), event_relay_log_number(0),
   event_relay_log_pos(0), event_start_pos(0),
   group_master_log_pos(0),
   gtid_set(global_sid_map, global_sid_lock),
   rli_fake(is_rli_fake),
   gtid_retrieved_initialized(false),
   is_group_master_log_pos_invalid(false),
   log_space_total(0), ignore_log_space_limit(0),
   sql_force_rotate_relay(false),
   last_master_timestamp(0), slave_skip_counter(0),
   abort_pos_wait(0), until_condition(UNTIL_NONE),
   until_log_pos(0),
   until_sql_gtids(global_sid_map),
   until_sql_gtids_first_event(true),
   trans_retries(0), retried_trans(0),
   tables_to_lock(0), tables_to_lock_count(0),
   rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
   workers(PSI_NOT_INSTRUMENTED),
   workers_array_initialized(false),
   curr_group_assigned_parts(PSI_NOT_INSTRUMENTED),
   curr_group_da(PSI_NOT_INSTRUMENTED),
   slave_parallel_workers(0),
   exit_counter(0),
   max_updated_index(0),
   recovery_parallel_workers(0), checkpoint_seqno(0),
   checkpoint_group(opt_mts_checkpoint_group),
   recovery_groups_inited(false), mts_recovery_group_cnt(0),
   mts_recovery_index(0), mts_recovery_group_seen_begin(0),
   mts_group_status(MTS_NOT_IN_GROUP),
   stats_exec_time(0), stats_read_time(0),
   least_occupied_workers(PSI_NOT_INSTRUMENTED),
   current_mts_submode(0),
   reported_unsafe_warning(false), rli_description_event(NULL),
   commit_order_mngr(NULL),
   sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
   long_find_row_note_printed(false),
   thd_tx_priority(0),
   is_engine_ha_data_detached(false)
{
  DBUG_ENTER("Relay_log_info::Relay_log_info");

#ifdef HAVE_PSI_INTERFACE
  /* Hand the relay-log specific instrumentation keys to the relay log. */
  relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
                         key_RELAYLOG_LOCK_commit,
                         key_RELAYLOG_LOCK_commit_queue,
                         key_RELAYLOG_LOCK_done,
                         key_RELAYLOG_LOCK_flush_queue,
                         key_RELAYLOG_LOCK_log,
                         PSI_NOT_INSTRUMENTED, /* Relaylog doesn't support LOCK_binlog_end_pos */
                         key_RELAYLOG_LOCK_sync,
                         key_RELAYLOG_LOCK_sync_queue,
                         key_RELAYLOG_LOCK_xids,
                         key_RELAYLOG_COND_done,
                         key_RELAYLOG_update_cond,
                         key_RELAYLOG_prep_xids_cond,
                         key_file_relaylog,
                         key_file_relaylog_index,
                         key_file_relaylog_cache,
                         key_file_relaylog_index_cache);
#endif

  /* All log-name buffers start out as empty strings. */
  group_relay_log_name[0]= event_relay_log_name[0]=
    group_master_log_name[0]= 0;
  until_log_name[0]= ign_master_log_name_end[0]= 0;
  set_timespec_nsec(&last_clock, 0);
  memset(&cache_buf, 0, sizeof(cache_buf));
  cached_charset_invalidate();
  inited_hash_workers= FALSE;
  channel_open_temp_tables.atomic_set(0);
  /*
    For applier threads, currently_executing_gtid is set to automatic
    when they are not executing any transaction.
  */
  currently_executing_gtid.set_automatic();

  /*
    A fake Relay_log_info gets none of the synchronization objects
    below; the destructor mirrors this by destroying them only when
    rli_fake is false.
  */
  if (!rli_fake)
  {
    mysql_mutex_init(key_relay_log_info_log_space_lock,
                     &log_space_lock, MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond);
    mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond);
    mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
                   MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_temp_table_LOCK, &mts_temp_table_LOCK,
                     MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_gaq_LOCK, &mts_gaq_LOCK,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_mts_gaq, &logical_clock_cond);

    relay_log.init_pthread_objects();
    /*
      NOTE(review): this flag is assigned only on the non-fake path and
      does not appear in the initializer list above; confirm that a
      fake Relay_log_info never reads it.
    */
    force_flush_postponed_due_to_split_trans= false;
  }
  /* Split the server version string into slave_version_split. */
  do_server_version_split(::server_version, slave_version_split);

  DBUG_VOID_RETURN;
}
190 
191 /**
192    The method to invoke at slave threads start
193 */
init_workers(ulong n_workers)194 void Relay_log_info::init_workers(ulong n_workers)
195 {
196   /*
197     Parallel slave parameters initialization is done regardless
198     whether the feature is or going to be active or not.
199   */
200   mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
201   mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
202   mts_total_wait_overlap= 0;
203   mts_total_wait_worker_avail= 0;
204   mts_last_online_stat= 0;
205 
206   workers.reserve(n_workers);
207   workers_array_initialized= true; //set after init
208 }
209 
210 /**
211    The method to invoke at slave threads stop
212 */
deinit_workers()213 void Relay_log_info::deinit_workers()
214 {
215   workers.clear();
216 }
217 
~Relay_log_info()218 Relay_log_info::~Relay_log_info()
219 {
220   DBUG_ENTER("Relay_log_info::~Relay_log_info");
221 
222   if(!rli_fake)
223   {
224     if (recovery_groups_inited)
225       bitmap_free(&recovery_groups);
226     delete current_mts_submode;
227 
228     if(workers_copy_pfs.size())
229     {
230       for (int i= static_cast<int>(workers_copy_pfs.size()) - 1; i >= 0; i--)
231         delete workers_copy_pfs[i];
232       workers_copy_pfs.clear();
233     }
234 
235     mysql_mutex_destroy(&log_space_lock);
236     mysql_cond_destroy(&log_space_cond);
237     mysql_mutex_destroy(&pending_jobs_lock);
238     mysql_cond_destroy(&pending_jobs_cond);
239     mysql_mutex_destroy(&exit_count_lock);
240     mysql_mutex_destroy(&mts_temp_table_LOCK);
241     mysql_mutex_destroy(&mts_gaq_LOCK);
242     mysql_cond_destroy(&logical_clock_cond);
243     relay_log.cleanup();
244   }
245 
246   set_rli_description_event(NULL);
247 
248   DBUG_VOID_RETURN;
249 }
250 
251 /**
252    Method is called when MTS coordinator senses the relay-log name
253    has been changed.
254    It marks each Worker member with this fact to make an action
255    at time it will distribute a terminal event of a group to the Worker.
256 
257    Worker receives the new name at the group commiting phase
258    @c Slave_worker::slave_worker_ends_group().
259 */
reset_notified_relay_log_change()260 void Relay_log_info::reset_notified_relay_log_change()
261 {
262   if (!is_parallel_exec())
263     return;
264   for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
265   {
266     Slave_worker *w= *it;
267     w->relay_log_change_notified= FALSE;
268   }
269 }
270 
271 /**
272    This method is called in mts_checkpoint_routine() to mark that each
273    worker is required to adapt to a new checkpoint data whose coordinates
274    are passed to it through GAQ index.
275 
276    Worker notices the new checkpoint value at the group commit to reset
277    the current bitmap and starts using the clean bitmap indexed from zero
278    of being reset checkpoint_seqno.
279 
280     New seconds_behind_master timestamp is installed.
281 
282    @param shift            number of bits to shift by Worker due to the
283                            current checkpoint change.
284    @param new_ts           new seconds_behind_master timestamp value
285                            unless zero. Zero could be due to FD event
286                            or fake rotate event.
287    @param need_data_lock   False if caller has locked @c data_lock
288    @param update_timestamp if true, this function will update the
289                            rli->last_master_timestamp.
290 */
reset_notified_checkpoint(ulong shift,time_t new_ts,bool need_data_lock,bool update_timestamp)291 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
292                                                bool need_data_lock,
293                                                bool update_timestamp)
294 {
295   /*
296     If this is not a parallel execution we return immediately.
297   */
298   if (!is_parallel_exec())
299     return;
300 
301   for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
302   {
303     Slave_worker *w= *it;
304     /*
305       Reseting the notification information in order to force workers to
306       assign jobs with the new updated information.
307       Notice that the bitmap_shifted is accumulated to indicate how many
308       consecutive jobs were successfully processed.
309 
310       The worker when assigning a new job will set the value back to
311       zero.
312     */
313     w->checkpoint_notified= FALSE;
314     w->bitmap_shifted= w->bitmap_shifted + shift;
315     /*
316       Zero shift indicates the caller rotates the master binlog.
317       The new name will be passed to W through the group descriptor
318       during the first post-rotation time scheduling.
319     */
320     if (shift == 0)
321       w->master_log_change_notified= false;
322 
323     DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
324                        "worker->bitmap_shifted --> %lu, worker --> %u.",
325                        shift, w->bitmap_shifted,
326                        static_cast<unsigned>(it - workers.begin())));
327   }
328   /*
329     There should not be a call where (shift == 0 && checkpoint_seqno != 0).
330     Then the new checkpoint sequence is updated by subtracting the number
331     of consecutive jobs that were successfully processed.
332   */
333   DBUG_ASSERT(current_mts_submode->get_type() != MTS_PARALLEL_TYPE_DB_NAME ||
334               !(shift == 0 && checkpoint_seqno != 0));
335   checkpoint_seqno= checkpoint_seqno - shift;
336   DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
337              "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
338 
339   if (update_timestamp)
340   {
341     if (need_data_lock)
342       mysql_mutex_lock(&data_lock);
343     else
344       mysql_mutex_assert_owner(&data_lock);
345     last_master_timestamp= new_ts;
346     if (need_data_lock)
347       mysql_mutex_unlock(&data_lock);
348   }
349 }
350 
351 /**
352    Reset recovery info from Worker info table and
353    mark MTS recovery is completed.
354 
355    @return false on success true when @c reset_notified_checkpoint failed.
356 */
mts_finalize_recovery()357 bool Relay_log_info::mts_finalize_recovery()
358 {
359   bool ret= false;
360   uint i;
361   uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
362 
363   DBUG_ENTER("Relay_log_info::mts_finalize_recovery");
364 
365   for (Slave_worker **it= workers.begin(); !ret && it != workers.end(); ++it)
366   {
367     Slave_worker *w= *it;
368     ret= w->reset_recovery_info();
369     DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret= true;);
370   }
371   /*
372     The loop is traversed in the worker index descending order due
373     to specifics of the Worker table repository that does not like
374     even temporary holes. Therefore stale records are deleted
375     from the tail.
376   */
377   DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
378                   {DBUG_SET("+d,mts_worker_thread_init_fails");});
379   for (i= recovery_parallel_workers; i > workers.size() && !ret; i--)
380   {
381     Slave_worker *w=
382       Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
383     /*
384       If an error occurs during the above create_worker call, the newly created
385       worker object gets deleted within the above function call itself and only
386       NULL is returned. Hence the following check has been added to verify
387       that a valid worker object exists.
388     */
389     if (w)
390     {
391       ret= w->remove_info();
392       delete w;
393     }
394     else
395     {
396       ret= true;
397       goto err;
398     }
399   }
400   recovery_parallel_workers= slave_parallel_workers;
401 
402 err:
403   DBUG_RETURN(ret);
404 }
405 
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)406 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
407 {
408   MY_STAT s;
409   DBUG_ENTER("add_relay_log");
410   mysql_mutex_assert_owner(&rli->log_space_lock);
411   if (!mysql_file_stat(key_file_relaylog,
412                        linfo->log_file_name, &s, MYF(0)))
413   {
414     sql_print_error("log %s listed in the index, but failed to stat.",
415                     linfo->log_file_name);
416     DBUG_RETURN(1);
417   }
418   rli->log_space_total += s.st_size;
419 #ifndef DBUG_OFF
420   char buf[22];
421   DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
422 #endif
423   DBUG_RETURN(0);
424 }
425 
count_relay_log_space()426 int Relay_log_info::count_relay_log_space()
427 {
428   LOG_INFO flinfo;
429   DBUG_ENTER("Relay_log_info::count_relay_log_space");
430   Mutex_lock lock(&log_space_lock);
431   log_space_total= 0;
432   if (relay_log.find_log_pos(&flinfo, NullS, 1))
433   {
434     sql_print_error("Could not find first log while counting relay log space.");
435     DBUG_RETURN(1);
436   }
437   do
438   {
439     if (add_relay_log(this, &flinfo))
440       DBUG_RETURN(1);
441   } while (!relay_log.find_next_log(&flinfo, 1));
442   /*
443      As we have counted everything, including what may have written in a
444      preceding write, we must reset bytes_written, or we may count some space
445      twice.
446   */
447   relay_log.reset_bytes_written();
448   DBUG_RETURN(0);
449 }
450 
451 /**
452    Resets UNTIL condition for Relay_log_info
453  */
454 
clear_until_condition()455 void Relay_log_info::clear_until_condition()
456 {
457   DBUG_ENTER("clear_until_condition");
458 
459   until_condition= Relay_log_info::UNTIL_NONE;
460   until_log_name[0]= 0;
461   until_log_pos= 0;
462   until_sql_gtids.clear();
463   until_sql_gtids_first_event= true;
464   DBUG_VOID_RETURN;
465 }
466 
/**
  Opens and initializes the given relay log. Specifically, it does what
  follows:

  - Closes the old open relay log file, if any.
  - If we are using the same relay log as the running IO-thread, then sets
    cur_log to point to the same IO_CACHE entry.
  - If not, opens the 'log' binary file.

  @todo check proper initialization of
  group_master_log_name/group_master_log_pos. /alfranio

  @param log[in] Name of relay log file to read from. NULL = First log
  @param pos[in] Position in relay log file
  @param need_data_lock[in] If true, this function will acquire
  data_lock; otherwise the caller should already have acquired it.
  @param errmsg[out] On error, this function will store a pointer to
  an error message here
  @param keep_looking_for_fd[in] If true, this function will
  look for a Format_description_log_event.  We only need this when the
  SQL thread starts and opens an existing relay log and has to execute
  it (possibly from an offset >4); then we need to read the first
  event of the relay log to be able to parse the events we have to
  execute.

  @retval 0 ok,
  @retval 1 error.  In this case, *errmsg is set to point to the error
  message.
*/

int Relay_log_info::init_relay_log_pos(const char* log,
                                       ulonglong pos, bool need_data_lock,
                                       const char** errmsg,
                                       bool keep_looking_for_fd)
{
  DBUG_ENTER("Relay_log_info::init_relay_log_pos");
  DBUG_PRINT("info", ("pos: %lu", (ulong) pos));

  *errmsg=0;
  const char* errmsg_fmt= 0;
  /*
    Static storage: *errmsg may be set to point into this buffer, so
    the text has to stay valid after this function returns.
  */
  static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
  mysql_mutex_t *log_lock= relay_log.get_log_lock();

  if (need_data_lock)
    mysql_mutex_lock(&data_lock);
  else
    mysql_mutex_assert_owner(&data_lock);

  /*
    By default the relay log is in binlog format 3 (4.0).
    Even if format is 4, this will work enough to read the first event
    (Format_desc) (remember that format 4 is just lengthened compared to
    format 3; format 3 is a prefix of format 4).
  */
  set_rli_description_event(new Format_description_log_event(3));

  mysql_mutex_lock(log_lock);

  /* Close log file and free buffers if it's already open */
  if (cur_log_fd >= 0)
  {
    end_io_cache(&cache_buf);
    mysql_file_close(cur_log_fd, MYF(MY_WME));
    cur_log_fd = -1;
  }

  group_relay_log_pos= event_relay_log_pos= pos;

  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
  if (relay_log.find_log_pos(&linfo, NullS, 1))
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }

  /* A NULL 'log' keeps linfo at the first log found above. */
  if (log && relay_log.find_log_pos(&linfo, log, 1))
  {
    errmsg_fmt= "Could not find target log file mentioned in "
                "relay log info in the index file '%s' during "
                "relay log initialization";
    sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
    *errmsg= errmsg_buff;
    goto err;
  }

  set_group_relay_log_name(linfo.log_file_name);
  set_event_relay_log_name(linfo.log_file_name);

  if (relay_log.is_active(linfo.log_file_name))
  {
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
    my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
    if (check_binlog_magic(cur_log, errmsg))
      goto err;
    cur_log_old_open_count=relay_log.get_open_count();
  }
  else
  {
    /*
      Open the relay log and set cur_log to point at this one
    */
    if ((cur_log_fd=open_binlog_file(&cache_buf,
                                     linfo.log_file_name,errmsg)) < 0)
      goto err;
    cur_log = &cache_buf;
  }
  /*
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
    sure.
  */
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
  {
    Log_event* ev;
    while (keep_looking_for_fd)
    {
      /*
        Read the possible Format_description_log_event; if position
        was 4, no need, it will be read naturally.
      */
      DBUG_PRINT("info",("looking for a Format_description_log_event"));

      /* Never read past the requested start position. */
      if (my_b_tell(cur_log) >= pos)
        break;

      /*
        Because we hold data_lock and log_lock, we can safely read an
        event
      */
      if (!(ev= Log_event::read_log_event(cur_log, 0,
                                          rli_description_event,
                                          opt_slave_sql_verify_checksum)))
      {
        DBUG_PRINT("info",("could not read event, cur_log->error=%d",
                           cur_log->error));
        if (cur_log->error) /* not EOF */
        {
          *errmsg= "I/O error reading event at position 4";
          goto err;
        }
        break;
      }
      else if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT)
      {
        DBUG_PRINT("info",("found Format_description_log_event"));
        /* ev is handed over here; unlike the branch below, no delete. */
        set_rli_description_event((Format_description_log_event *)ev);
        /*
          As ev was returned by read_log_event, it has passed is_valid(), so
          my_malloc() in ctor worked, no need to check again.
        */
        /*
          Ok, we found a Format_description event. But it is not sure that this
          describes the whole relay log; indeed, one can have this sequence
          (starting from position 4):
          Format_desc (of slave)
          Previous-GTIDs (of slave IO thread, if GTIDs are enabled)
          Rotate (of master)
          Format_desc (of master)
          So the Format_desc which really describes the rest of the relay log
          can be the 3rd or the 4th event (depending on GTIDs being enabled or
          not, it can't be further than that, because we rotate
          the relay log when we queue a Rotate event from the master).
          But what describes the Rotate is the first Format_desc.
          So what we do is:
          go on searching for Format_description events, until you exceed the
          position (argument 'pos') or until you find an event other than
          Previous-GTIDs, Rotate or Format_desc.
        */
      }
      else
      {
        DBUG_PRINT("info",("found event of another type=%d",
                           ev->get_type_code()));
        /* Only Rotate and Previous-GTIDs events keep the search going. */
        keep_looking_for_fd=
          (ev->get_type_code() == binary_log::ROTATE_EVENT ||
           ev->get_type_code() == binary_log::PREVIOUS_GTIDS_LOG_EVENT);
        delete ev;
      }
    }
    /* Position the cache at the requested offset. */
    my_b_seek(cur_log,(off_t)pos);
#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
                        llstr(my_b_tell(cur_log),llbuf1),
                        llstr(get_event_relay_log_pos(),llbuf2)));
  }
#endif

  }

  /*
    Common exit path: it is reached through 'goto err' on failure and
    by falling through on success. Whether an error happened is told
    apart by *errmsg being set.
  */
err:
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
  if (!relay_log_purge)
  {
    log_space_limit= 0; // todo: consider to throw a warning at least
  }
  mysql_cond_broadcast(&data_cond);

  mysql_mutex_unlock(log_lock);

  if (need_data_lock)
    mysql_mutex_unlock(&data_lock);
  /* Do not overwrite an error message that was already set above. */
  if (!rli_description_event->is_valid() && !*errmsg)
    *errmsg= "Invalid Format_description log event; could be out of memory";

  DBUG_RETURN ((*errmsg) ? 1 : 0);
}
685 
686 /**
687   Update the error number, message and timestamp fields. This function is
688   different from va_report() as va_report() also logs the error message in the
689   log apart from updating the error fields.
690 
691   SYNOPSIS
692   @param[in]  level          specifies the level- error, warning or information,
693   @param[in]  err_code       error number,
694   @param[in]  buff_coord     error message to be used.
695 
696 */
fill_coord_err_buf(loglevel level,int err_code,const char * buff_coord) const697 void Relay_log_info::fill_coord_err_buf(loglevel level, int err_code,
698                                       const char *buff_coord) const
699 {
700   mysql_mutex_lock(&err_lock);
701 
702   if(level == ERROR_LEVEL)
703   {
704     m_last_error.number = err_code;
705     my_snprintf(m_last_error.message, sizeof(m_last_error.message), "%.*s",
706                 MAX_SLAVE_ERRMSG - 1, buff_coord);
707     m_last_error.update_timestamp();
708   }
709 
710   mysql_mutex_unlock(&err_lock);
711 }
712 
713 /**
714   Waits until the SQL thread reaches (has executed up to) the
715   log/position or timed out.
716 
717   SYNOPSIS
718   @param[in]  thd             client thread that sent @c SELECT @c MASTER_POS_WAIT,
719   @param[in]  log_name        log name to wait for,
720   @param[in]  log_pos         position to wait for,
721   @param[in]  timeout         @c timeout in seconds before giving up waiting.
722                               @c timeout is double whereas it should be ulong; but this is
723                               to catch if the user submitted a negative timeout.
724 
725   @retval  -2   improper arguments (log_pos<0)
726                 or slave not running, or master info changed
727                 during the function's execution,
728                 or client thread killed. -2 is translated to NULL by caller,
729   @retval  -1   timed out
730   @retval  >=0  number of log events the function had to wait
731                 before reaching the desired log/position
732  */
733 
wait_for_pos(THD * thd,String * log_name,longlong log_pos,double timeout)734 int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
735                                     longlong log_pos,
736                                     double timeout)
737 {
738   int event_count = 0;
739   ulong init_abort_pos_wait;
740   int error=0;
741   struct timespec abstime; // for timeout checking
742   PSI_stage_info old_stage;
743   DBUG_ENTER("Relay_log_info::wait_for_pos");
744 
745   if (!inited)
746     DBUG_RETURN(-2);
747 
748   DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
749                       log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));
750 
751   DEBUG_SYNC(thd, "begin_master_pos_wait");
752 
753   set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
754   mysql_mutex_lock(&data_lock);
755   thd->ENTER_COND(&data_cond, &data_lock,
756                   &stage_waiting_for_the_slave_thread_to_advance_position,
757                   &old_stage);
758   /*
759      This function will abort when it notices that some CHANGE MASTER or
760      RESET MASTER has changed the master info.
761      To catch this, these commands modify abort_pos_wait ; We just monitor
762      abort_pos_wait and see if it has changed.
763      Why do we have this mechanism instead of simply monitoring slave_running
764      in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
765      the SQL thread be stopped?
766      This is becasue if someones does:
767      STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
768      the change may happen very quickly and we may not notice that
769      slave_running briefly switches between 1/0/1.
770   */
771   init_abort_pos_wait= abort_pos_wait;
772 
773   /*
774     We'll need to
775     handle all possible log names comparisons (e.g. 999 vs 1000).
776     We use ulong for string->number conversion ; this is no
777     stronger limitation than in find_uniq_filename in sql/log.cc
778   */
779   ulong log_name_extension;
780   char log_name_tmp[FN_REFLEN]; //make a char[] from String
781 
782   strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));
783 
784   char *p= fn_ext(log_name_tmp);
785   char *p_end;
786   if (!*p || log_pos<0)
787   {
788     error= -2; //means improper arguments
789     goto err;
790   }
791   // Convert 0-3 to 4
792   log_pos= max(log_pos, static_cast<longlong>(BIN_LOG_HEADER_SIZE));
793   /* p points to '.' */
794   log_name_extension= strtoul(++p, &p_end, 10);
795   /*
796     p_end points to the first invalid character.
797     If it equals to p, no digits were found, error.
798     If it contains '\0' it means conversion went ok.
799   */
800   if (p_end==p || *p_end)
801   {
802     error= -2;
803     goto err;
804   }
805 
806   /* The "compare and wait" main loop */
807   while (!thd->killed &&
808          init_abort_pos_wait == abort_pos_wait &&
809          slave_running)
810   {
811     bool pos_reached;
812     int cmp_result= 0;
813 
814     DBUG_PRINT("info",
815                ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
816                 init_abort_pos_wait, abort_pos_wait));
817     DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
818                        group_master_log_name, (ulong) group_master_log_pos));
819 
820     /*
821       group_master_log_name can be "", if we are just after a fresh
822       replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
823       (before we have executed one Rotate event from the master) or
824       (rare) if the user is doing a weird slave setup (see next
825       paragraph).  If group_master_log_name is "", we assume we don't
826       have enough info to do the comparison yet, so we just wait until
827       more data. In this case master_log_pos is always 0 except if
828       somebody (wrongly) sets this slave to be a slave of itself
829       without using --replicate-same-server-id (an unsupported
830       configuration which does nothing), then group_master_log_pos
831       will grow and group_master_log_name will stay "".
832       Also in case the group master log position is invalid (e.g. after
833       CHANGE MASTER TO RELAY_LOG_POS ), we will wait till the first event
834       is read and the log position is valid again.
835     */
836     if (*group_master_log_name && !is_group_master_log_pos_invalid)
837     {
838       char *basename= (group_master_log_name +
839                        dirname_length(group_master_log_name));
840       /*
841         First compare the parts before the extension.
842         Find the dot in the master's log basename,
843         and protect against user's input error :
844         if the names do not match up to '.' included, return error
845       */
846       char *q= (fn_ext(basename)+1);
847       if (strncmp(basename, log_name_tmp, (int)(q-basename)))
848       {
849         error= -2;
850         break;
851       }
852       // Now compare extensions.
853       char *q_end;
854       ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
855       if (group_master_log_name_extension < log_name_extension)
856         cmp_result= -1 ;
857       else
858         cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
859 
860       pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
861                     cmp_result > 0);
862       if (pos_reached || thd->killed)
863         break;
864     }
865 
866     //wait for master update, with optional timeout.
867 
868     DBUG_PRINT("info",("Waiting for master update"));
869     /*
870       We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
871       will wake us up.
872     */
873     thd_wait_begin(thd, THD_WAIT_BINLOG);
874     if (timeout > 0)
875     {
876       /*
877         Note that mysql_cond_timedwait checks for the timeout
878         before for the condition ; i.e. it returns ETIMEDOUT
879         if the system time equals or exceeds the time specified by abstime
880         before the condition variable is signaled or broadcast, _or_ if
881         the absolute time specified by abstime has already passed at the time
882         of the call.
883         For that reason, mysql_cond_timedwait will do the "timeoutting" job
884         even if its condition is always immediately signaled (case of a loaded
885         master).
886       */
887       error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
888     }
889     else
890       mysql_cond_wait(&data_cond, &data_lock);
891     thd_wait_end(thd);
892     DBUG_PRINT("info",("Got signal of master update or timed out"));
893     if (error == ETIMEDOUT || error == ETIME)
894     {
895 #ifndef DBUG_OFF
896       /*
897         Doing this to generate a stack trace and make debugging
898         easier.
899       */
900       if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
901         DBUG_ASSERT(0);
902 #endif
903       error= -1;
904       break;
905     }
906     error=0;
907     event_count++;
908     DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
909   }
910 
911 err:
912   mysql_mutex_unlock(&data_lock);
913   thd->EXIT_COND(&old_stage);
914   DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
915 improper_arguments: %d  timed_out: %d",
916                      thd->killed_errno(),
917                      (int) (init_abort_pos_wait != abort_pos_wait),
918                      (int) slave_running,
919                      (int) (error == -2),
920                      (int) (error == -1)));
921   if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
922       !slave_running)
923   {
924     error= -2;
925   }
926   DBUG_RETURN( error ? error : event_count );
927 }
928 
wait_for_gtid_set(THD * thd,String * gtid,double timeout)929 int Relay_log_info::wait_for_gtid_set(THD* thd, String* gtid,
930                                       double timeout)
931 {
932   DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, String, timeout)");
933 
934   DBUG_PRINT("info", ("Waiting for %s timeout %lf", gtid->c_ptr_safe(),
935              timeout));
936 
937   Gtid_set wait_gtid_set(global_sid_map);
938   global_sid_lock->rdlock();
939 
940   if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
941   {
942     global_sid_lock->unlock();
943     DBUG_PRINT("exit",("improper gtid argument"));
944     DBUG_RETURN(-2);
945 
946   }
947   global_sid_lock->unlock();
948 
949   DBUG_RETURN(wait_for_gtid_set(thd, &wait_gtid_set, timeout));
950 }
951 
/*
  TODO: This is duplicated code that needs to be simplified.
  This will be done while developing all possible sync options.
  See WL#3584's specification.

  /Alfranio
*/
wait_for_gtid_set(THD * thd,const Gtid_set * wait_gtid_set,double timeout)959 int Relay_log_info::wait_for_gtid_set(THD* thd, const Gtid_set* wait_gtid_set,
960                                       double timeout)
961 {
962   int event_count = 0;
963   ulong init_abort_pos_wait;
964   int error=0;
965   struct timespec abstime; // for timeout checking
966   PSI_stage_info old_stage;
967   DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, gtid_set, timeout)");
968 
969   if (!inited)
970     DBUG_RETURN(-2);
971 
972   DEBUG_SYNC(thd, "begin_wait_for_gtid_set");
973 
974   set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
975 
976   mysql_mutex_lock(&data_lock);
977   thd->ENTER_COND(&data_cond, &data_lock,
978                   &stage_waiting_for_the_slave_thread_to_advance_position,
979                   &old_stage);
980   /*
981      This function will abort when it notices that some CHANGE MASTER or
982      RESET MASTER has changed the master info.
983      To catch this, these commands modify abort_pos_wait ; We just monitor
984      abort_pos_wait and see if it has changed.
985      Why do we have this mechanism instead of simply monitoring slave_running
986      in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
987      the SQL thread be stopped?
988      This is becasue if someones does:
989      STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
990      the change may happen very quickly and we may not notice that
991      slave_running briefly switches between 1/0/1.
992   */
993   init_abort_pos_wait= abort_pos_wait;
994 
995   /* The "compare and wait" main loop */
996   while (!thd->killed &&
997          init_abort_pos_wait == abort_pos_wait &&
998          slave_running)
999   {
1000     DBUG_PRINT("info",
1001                ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
1002                 init_abort_pos_wait, abort_pos_wait));
1003 
1004     //wait for master update, with optional timeout.
1005 
1006     global_sid_lock->wrlock();
1007     const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1008     const Owned_gtids* owned_gtids= gtid_state->get_owned_gtids();
1009 
1010     char *wait_gtid_set_buf;
1011     wait_gtid_set->to_string(&wait_gtid_set_buf);
1012     DBUG_PRINT("info", ("Waiting for '%s'. is_subset: %d and "
1013                         "!is_intersection_nonempty: %d",
1014                         wait_gtid_set_buf,
1015                         wait_gtid_set->is_subset(executed_gtids),
1016                         !owned_gtids->is_intersection_nonempty(wait_gtid_set)));
1017     my_free(wait_gtid_set_buf);
1018     executed_gtids->dbug_print("gtid_executed:");
1019     owned_gtids->dbug_print("owned_gtids:");
1020 
1021     /*
1022       Since commit is performed after log to binary log, we must also
1023       check if any GTID of wait_gtid_set is not yet committed.
1024     */
1025     if (wait_gtid_set->is_subset(executed_gtids) &&
1026         !owned_gtids->is_intersection_nonempty(wait_gtid_set))
1027     {
1028       global_sid_lock->unlock();
1029       break;
1030     }
1031     global_sid_lock->unlock();
1032 
1033     DBUG_PRINT("info",("Waiting for master update"));
1034 
1035     /*
1036       We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
1037       will wake us up.
1038     */
1039     thd_wait_begin(thd, THD_WAIT_BINLOG);
1040     if (timeout > 0)
1041     {
1042       /*
1043         Note that mysql_cond_timedwait checks for the timeout
1044         before for the condition ; i.e. it returns ETIMEDOUT
1045         if the system time equals or exceeds the time specified by abstime
1046         before the condition variable is signaled or broadcast, _or_ if
1047         the absolute time specified by abstime has already passed at the time
1048         of the call.
1049         For that reason, mysql_cond_timedwait will do the "timeoutting" job
1050         even if its condition is always immediately signaled (case of a loaded
1051         master).
1052       */
1053       error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
1054     }
1055     else
1056       mysql_cond_wait(&data_cond, &data_lock);
1057     thd_wait_end(thd);
1058     DBUG_PRINT("info",("Got signal of master update or timed out"));
1059     if (error == ETIMEDOUT || error == ETIME)
1060     {
1061 #ifndef DBUG_OFF
1062       /*
1063         Doing this to generate a stack trace and make debugging
1064         easier.
1065       */
1066       if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
1067         DBUG_ASSERT(0);
1068 #endif
1069       error= -1;
1070       break;
1071     }
1072     error=0;
1073     event_count++;
1074     DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
1075   }
1076 
1077   mysql_mutex_unlock(&data_lock);
1078   thd->EXIT_COND(&old_stage);
1079   DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
1080 improper_arguments: %d  timed_out: %d",
1081                      thd->killed_errno(),
1082                      (int) (init_abort_pos_wait != abort_pos_wait),
1083                      (int) slave_running,
1084                      (int) (error == -2),
1085                      (int) (error == -1)));
1086   if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
1087       !slave_running)
1088   {
1089     error= -2;
1090   }
1091   DBUG_RETURN( error ? error : event_count );
1092 }
1093 
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock)1094 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
1095                                             bool need_data_lock)
1096 {
1097   int error= 0;
1098   DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
1099 
1100   if (need_data_lock)
1101     mysql_mutex_lock(&data_lock);
1102   else
1103     mysql_mutex_assert_owner(&data_lock);
1104 
1105   inc_event_relay_log_pos();
1106   group_relay_log_pos= event_relay_log_pos;
1107   strmake(group_relay_log_name,event_relay_log_name,
1108           sizeof(group_relay_log_name)-1);
1109 
1110   notify_group_relay_log_name_update();
1111 
1112   /*
1113     In 4.x we used the event's len to compute the positions here. This is
1114     wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1115     then the event's len is not what is was in the master's binlog, so this
1116     will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1117     replication: Exec_master_log_pos is wrong). Only way to solve this is to
1118     have the original offset of the end of the event the relay log. This is
1119     what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1120     of log_pos in 4.0 was to compute the end_log_pos; so better to store
1121     end_log_pos instead of begin_log_pos.
1122     If we had not done this fix here, the problem would also have appeared
1123     when the slave and master are 5.0 but with different event length (for
1124     example the slave is more recent than the master and features the event
1125     UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1126     SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1127     value which would lead to badly broken replication.
1128     Even the relay_log_pos will be corrupted in this case, because the len is
1129     the relay log is not "val".
1130     With the end_log_pos solution, we avoid computations involving lengthes.
1131   */
1132   DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
1133                       (long) log_pos, (long) group_master_log_pos));
1134 
1135   if (log_pos > 0)  // 3.23 binlogs don't have log_posx
1136     group_master_log_pos= log_pos;
1137   /*
1138     If the master log position was invalidiated by say, "CHANGE MASTER TO
1139     RELAY_LOG_POS=N", it is now valid,
1140    */
1141   if (is_group_master_log_pos_invalid)
1142     is_group_master_log_pos_invalid= false;
1143 
1144   /*
1145     In MTS mode FD or Rotate event commit their solitary group to
1146     Coordinator's info table. Callers make sure that Workers have been
1147     executed all assignements.
1148     Broadcast to master_pos_wait() waiters should be done after
1149     the table is updated.
1150   */
1151   DBUG_ASSERT(!is_parallel_exec() ||
1152               mts_group_status != Relay_log_info::MTS_IN_GROUP);
1153   /*
1154     We do not force synchronization at this point, note the
1155     parameter false, because a non-transactional change is
1156     being committed.
1157 
1158     For that reason, the synchronization here is subjected to
1159     the option sync_relay_log_info.
1160 
1161     See sql/rpl_rli.h for further information on this behavior.
1162   */
1163   error= flush_info(FALSE);
1164 
1165   mysql_cond_broadcast(&data_cond);
1166   if (need_data_lock)
1167     mysql_mutex_unlock(&data_lock);
1168   DBUG_RETURN(error);
1169 }
1170 
1171 
close_temporary_tables()1172 void Relay_log_info::close_temporary_tables()
1173 {
1174   TABLE *table,*next;
1175   int num_closed_temp_tables= 0;
1176   DBUG_ENTER("Relay_log_info::close_temporary_tables");
1177 
1178   for (table=save_temporary_tables ; table ; table=next)
1179   {
1180     next=table->next;
1181     /*
1182       Don't ask for disk deletion. For now, anyway they will be deleted when
1183       slave restarts, but it is a better intention to not delete them.
1184     */
1185     DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1186     close_temporary(table, 1, 0);
1187     num_closed_temp_tables++;
1188   }
1189   save_temporary_tables= 0;
1190   slave_open_temp_tables.atomic_add(-num_closed_temp_tables);
1191   channel_open_temp_tables.atomic_add(-num_closed_temp_tables);
1192   DBUG_VOID_RETURN;
1193 }
1194 
1195 /**
1196   Purges relay logs. It assumes to have a run lock on rli and that no
1197   slave thread are running.
1198 
1199   @param[in]   THD         connection,
1200   @param[in]   just_reset  if false, it tells that logs should be purged
1201                            and @c init_relay_log_pos() should be called,
1202   @errmsg[out] errmsg      store pointer to an error message.
1203 
1204   @retval 0 successfuly executed,
1205   @retval 1 otherwise error, where errmsg is set to point to the error message.
1206 */
1207 
purge_relay_logs(THD * thd,bool just_reset,const char ** errmsg,bool delete_only)1208 int Relay_log_info::purge_relay_logs(THD *thd, bool just_reset,
1209                                      const char** errmsg, bool delete_only)
1210 {
1211   int error=0;
1212   const char *ln;
1213   /* name of the index file if opt_relaylog_index_name is set*/
1214   const char* log_index_name;
1215   /*
1216     Buffer to add channel name suffix when relay-log-index option is
1217     provided
1218    */
1219   char relay_bin_index_channel[FN_REFLEN];
1220 
1221   const char *ln_without_channel_name;
1222   /*
1223     Buffer to add channel name suffix when relay-log option is provided.
1224    */
1225   char relay_bin_channel[FN_REFLEN];
1226 
1227   char buffer[FN_REFLEN];
1228 
1229   mysql_mutex_t *log_lock= relay_log.get_log_lock();
1230 
1231   DBUG_ENTER("Relay_log_info::purge_relay_logs");
1232 
1233   /*
1234     Even if inited==0, we still try to empty master_log_* variables. Indeed,
1235     inited==0 does not imply that they already are empty.
1236 
1237     It could be that slave's info initialization partly succeeded: for example
1238     if relay-log.info existed but *relay-bin*.* have been manually removed,
1239     init_info reads the old relay-log.info and fills rli->master_log_*, then
1240     init_info checks for the existence of the relay log, this fails and
1241     init_info leaves inited to 0.
1242     In that pathological case, master_log_pos* will be properly reinited at
1243     the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1244     purge_relay_logs, will delete bogus *.info files or replace them with
1245     correct files), however if the user does SHOW SLAVE STATUS before START
1246     SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1247     master_log_* for SHOW SLAVE STATUS to display fine in any case.
1248   */
1249   group_master_log_name[0]= 0;
1250   group_master_log_pos= 0;
1251 
1252   /*
1253     Following the the relay log purge, the master_log_pos will be in sync
1254     with relay_log_pos, so the flag should be cleared. Refer bug#11766010.
1255   */
1256 
1257   is_group_master_log_pos_invalid= false;
1258 
1259   if (!inited)
1260   {
1261     DBUG_PRINT("info", ("inited == 0"));
1262     if (error_on_rli_init_info ||
1263         /*
1264           mi->reset means that the channel was reset but still exists. Channel
1265           shall have the index and the first relay log file.
1266 
1267           Those files shall be remove in a following RESET SLAVE ALL (even when
1268           channel was not inited again).
1269         */
1270         (mi->reset && delete_only))
1271     {
1272       ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
1273                                                        "-relay-bin", buffer);
1274 
1275       ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
1276                                         ln_without_channel_name);
1277       if (opt_relaylog_index_name)
1278       {
1279         char index_file_withoutext[FN_REFLEN];
1280         relay_log.generate_name(opt_relaylog_index_name,"",
1281                                 index_file_withoutext);
1282 
1283         log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
1284                                                       FN_REFLEN,
1285                                                       index_file_withoutext);
1286       }
1287       else
1288         log_index_name= 0;
1289 
1290       if (relay_log.open_index_file(log_index_name, ln, TRUE))
1291       {
1292         sql_print_error("Unable to purge relay log files. Failed to open relay "
1293                         "log index file:%s.", relay_log.get_index_fname());
1294         DBUG_RETURN(1);
1295       }
1296       mysql_mutex_lock(&mi->data_lock);
1297       mysql_mutex_lock(log_lock);
1298       if (relay_log.open_binlog(ln, 0,
1299                                 (max_relay_log_size ? max_relay_log_size :
1300                                  max_binlog_size), true,
1301                                 true/*need_lock_index=true*/,
1302                                 true/*need_sid_lock=true*/,
1303                                 mi->get_mi_description_event()))
1304       {
1305         mysql_mutex_unlock(log_lock);
1306         mysql_mutex_unlock(&mi->data_lock);
1307         sql_print_error("Unable to purge relay log files. Failed to open relay "
1308                         "log file:%s.", relay_log.get_log_fname());
1309         DBUG_RETURN(1);
1310       }
1311       mysql_mutex_unlock(log_lock);
1312       mysql_mutex_unlock(&mi->data_lock);
1313     }
1314     else
1315       DBUG_RETURN(0);
1316   }
1317   else
1318   {
1319     DBUG_ASSERT(slave_running == 0);
1320     DBUG_ASSERT(mi->slave_running == 0);
1321   }
1322   /* Reset the transaction boundary parser and clear the last GTID queued */
1323   mi->transaction_parser.reset();
1324   mi->clear_last_gtid_queued();
1325 
1326   slave_skip_counter= 0;
1327   mysql_mutex_lock(&data_lock);
1328 
1329   /*
1330     we close the relay log fd possibly left open by the slave SQL thread,
1331     to be able to delete it; the relay log fd possibly left open by the slave
1332     I/O thread will be closed naturally in reset_logs() by the
1333     close(LOG_CLOSE_TO_BE_OPENED) call
1334   */
1335   if (cur_log_fd >= 0)
1336   {
1337     end_io_cache(&cache_buf);
1338     my_close(cur_log_fd, MYF(MY_WME));
1339     cur_log_fd= -1;
1340   }
1341 
1342   /**
1343     Clear the retrieved gtid set for this channel.
1344     global_sid_lock->wrlock() is needed.
1345   */
1346   global_sid_lock->wrlock();
1347   (const_cast<Gtid_set *>(get_gtid_set()))->clear();
1348   global_sid_lock->unlock();
1349 
1350   if (relay_log.reset_logs(thd, delete_only))
1351   {
1352     *errmsg = "Failed during log reset";
1353     error=1;
1354     goto err;
1355   }
1356 
1357   /* Save name of used relay log file */
1358   set_group_relay_log_name(relay_log.get_log_fname());
1359   set_event_relay_log_name(relay_log.get_log_fname());
1360   group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1361   if (!delete_only && count_relay_log_space())
1362   {
1363     *errmsg= "Error counting relay log space";
1364     error= 1;
1365     goto err;
1366   }
1367   if (!just_reset)
1368     error= init_relay_log_pos(group_relay_log_name,
1369                               group_relay_log_pos,
1370                               false/*need_data_lock=false*/, errmsg, 0);
1371   if (!inited && error_on_rli_init_info)
1372     relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1373                     true/*need_lock_log=true*/,
1374                     true/*need_lock_index=true*/);
1375 err:
1376 #ifndef DBUG_OFF
1377   char buf[22];
1378 #endif
1379   DBUG_PRINT("info",("log_space_total: %s",llstr(log_space_total,buf)));
1380   mysql_mutex_unlock(&data_lock);
1381   DBUG_RETURN(error);
1382 }
1383 
1384 /*
1385    When --relay-bin option is not provided, the names of the
1386    relay log files are host-relay-bin.0000x or
1387    host-relay-bin-CHANNEL.00000x in the case of MSR.
1388    However, if that option is provided, then the names of the
1389    relay log files are <relay-bin-option>.0000x or
1390    <relay-bin-option>-CHANNEL.00000x in the case of MSR.
1391 
1392    The function adds a channel suffix (according to the channel to file name
1393    conventions and conversions) to the relay log file.
1394 
1395    @todo: truncate the log file if length exceeds.
1396 */
1397 
1398 const char*
add_channel_to_relay_log_name(char * buff,uint buff_size,const char * base_name)1399 Relay_log_info::add_channel_to_relay_log_name(char *buff, uint buff_size,
1400                                               const char *base_name)
1401 {
1402   char *ptr;
1403   char channel_to_file[FN_REFLEN];
1404   uint errors, length;
1405   uint base_name_len;
1406   uint suffix_buff_size;
1407 
1408   DBUG_ASSERT(base_name !=NULL);
1409 
1410   base_name_len= strlen(base_name);
1411   suffix_buff_size= buff_size - base_name_len;
1412 
1413   ptr= strmake(buff, base_name, buff_size-1);
1414 
1415   if (channel[0])
1416   {
1417 
1418    /* adding a "-" */
1419     ptr= strmake(ptr, "-", suffix_buff_size-1);
1420 
1421     /*
1422       Convert the channel name to the file names charset.
1423       Channel name is in system_charset which is UTF8_general_ci
1424       as it was defined as utf8 in the mysql.slaveinfo tables.
1425     */
1426     length= strconvert(system_charset_info, channel, &my_charset_filename,
1427                        channel_to_file, NAME_LEN, &errors);
1428     ptr= strmake(ptr, channel_to_file, suffix_buff_size-length-1);
1429 
1430   }
1431 
1432   return (const char*)buff;
1433 }
1434 
1435 
/**
     Checks if condition stated in UNTIL clause of START SLAVE is reached.

     Specifically, it checks if UNTIL condition is reached. Uses the cached
     result of the last comparison of the current log file name and the
     target log file name. So the cached value should be invalidated if the
     current log file name changes (see @c Relay_log_info::notify_...
     functions).

     This caching is needed to avoid the expensive string comparisons and
     @c strtoul() conversions needed for log names comparison. We don't need
     to compare them each time this function is called, we only need to do
     this when the current log name changes. If we have @c UNTIL_MASTER_POS
     condition we need to do this only after
     @c Rotate_log_event::do_apply_event() (which is rare, so caching gives
     real benefit), and if we have @c UNTIL_RELAY_POS condition then we
     should invalidate the cached comparison value after
     @c inc_group_relay_log_pos() which is called for each group of events
     (so we have some benefit if we have something like queries that use
     autoincrement or if we have transactions).

     Should be called ONLY if @c until_condition @c != @c UNTIL_NONE !

     @param thd  thread handle
     @param ev   event about to be executed, may be NULL. For
                 @c UNTIL_MASTER_POS the position of the beginning of this
                 event is used (not the @c log_pos member of the event,
                 which points to the beginning of the following event)

     @retval true   condition met or error happened (condition seems to have
                    bad log file name),
     @retval false  condition not met.
*/
1465 
is_until_satisfied(THD * thd,Log_event * ev)1466 bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
1467 {
1468   char error_msg[]= "Slave SQL thread is stopped because UNTIL "
1469                     "condition is bad.";
1470   DBUG_ENTER("Relay_log_info::is_until_satisfied");
1471 
1472   switch (until_condition)
1473   {
1474   case UNTIL_MASTER_POS:
1475   case UNTIL_RELAY_POS:
1476   {
1477     const char *log_name= NULL;
1478     ulonglong log_pos= 0;
1479 
1480     if (until_condition == UNTIL_MASTER_POS)
1481     {
1482       if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
1483         DBUG_RETURN(false);
1484       /*
1485         Rotate events originating from the slave have server_id==0,
1486         and their log_pos is relative to the slave, so in case their
1487         log_pos is greater than the log_pos we are waiting for, they
1488         can cause the slave to stop prematurely. So we ignore such
1489         events.
1490       */
1491       if (ev && ev->server_id == 0)
1492         DBUG_RETURN(false);
1493       log_name= group_master_log_name;
1494       if (!ev || is_in_group() || !ev->common_header->log_pos)
1495         log_pos= group_master_log_pos;
1496       else
1497         log_pos= ev->common_header->log_pos - ev->common_header->data_written;
1498     }
1499     else
1500     { /* until_condition == UNTIL_RELAY_POS */
1501       log_name= group_relay_log_name;
1502       log_pos= group_relay_log_pos;
1503     }
1504 
1505 #ifndef DBUG_OFF
1506     {
1507       char buf[32];
1508       DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1509                           group_master_log_name, llstr(group_master_log_pos, buf)));
1510       DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1511                           group_relay_log_name, llstr(group_relay_log_pos, buf)));
1512       DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1513                           until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1514                           log_name, llstr(log_pos, buf)));
1515       DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1516                           until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1517                           until_log_name, llstr(until_log_pos, buf)));
1518     }
1519 #endif
1520 
1521     if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1522     {
1523       /*
1524         We have no cached comparison results so we should compare log names
1525         and cache result.
1526         If we are after RESET SLAVE, and the SQL slave thread has not processed
1527         any event yet, it could be that group_master_log_name is "". In that case,
1528         just wait for more events (as there is no sensible comparison to do).
1529       */
1530 
1531       if (*log_name)
1532       {
1533         const char *basename= log_name + dirname_length(log_name);
1534 
1535         const char *q= (const char*)(fn_ext(basename)+1);
1536         if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1537         {
1538           /* Now compare extensions. */
1539           char *q_end;
1540           ulong log_name_extension= strtoul(q, &q_end, 10);
1541           if (log_name_extension < until_log_name_extension)
1542             until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1543           else
1544             until_log_names_cmp_result=
1545               (log_name_extension > until_log_name_extension) ?
1546               UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1547         }
1548         else
1549         {
1550           /* Base names do not match, so we abort */
1551           sql_print_error("%s", error_msg);
1552           DBUG_RETURN(true);
1553         }
1554       }
1555       else
1556         DBUG_RETURN(until_log_pos == 0);
1557     }
1558 
1559     if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1560           log_pos >= until_log_pos) ||
1561          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
1562     {
1563       char buf[22];
1564       sql_print_information("Slave SQL thread stopped because it reached its"
1565                             " UNTIL position %s", llstr(until_pos(), buf));
1566       DBUG_RETURN(true);
1567     }
1568     DBUG_RETURN(false);
1569   }
1570 
1571   case UNTIL_SQL_BEFORE_GTIDS:
1572     /*
1573       We only need to check once if executed_gtids set
1574       contains any of the until_sql_gtids.
1575     */
1576     if (until_sql_gtids_first_event)
1577     {
1578       until_sql_gtids_first_event= false;
1579       global_sid_lock->wrlock();
1580       /* Check if until GTIDs were already applied. */
1581       const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1582       if (until_sql_gtids.is_intersection_nonempty(executed_gtids))
1583       {
1584         char *buffer;
1585         until_sql_gtids.to_string(&buffer);
1586         global_sid_lock->unlock();
1587         sql_print_information("Slave SQL thread stopped because "
1588                               "UNTIL SQL_BEFORE_GTIDS %s is already "
1589                               "applied", buffer);
1590         my_free(buffer);
1591         DBUG_RETURN(true);
1592       }
1593       global_sid_lock->unlock();
1594     }
1595     if (ev != NULL && ev->get_type_code() == binary_log::GTID_LOG_EVENT)
1596     {
1597       Gtid_log_event *gev= (Gtid_log_event *)ev;
1598       global_sid_lock->rdlock();
1599       if (until_sql_gtids.contains_gtid(gev->get_sidno(false), gev->get_gno()))
1600       {
1601         char *buffer;
1602         until_sql_gtids.to_string(&buffer);
1603         global_sid_lock->unlock();
1604         sql_print_information("Slave SQL thread stopped because it reached "
1605                               "UNTIL SQL_BEFORE_GTIDS %s", buffer);
1606         my_free(buffer);
1607         DBUG_RETURN(true);
1608       }
1609       global_sid_lock->unlock();
1610     }
1611     DBUG_RETURN(false);
1612     break;
1613 
1614   case UNTIL_SQL_AFTER_GTIDS:
1615     {
1616       global_sid_lock->wrlock();
1617       const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1618       if (until_sql_gtids.is_subset(executed_gtids))
1619       {
1620         char *buffer;
1621         until_sql_gtids.to_string(&buffer);
1622         global_sid_lock->unlock();
1623         sql_print_information("Slave SQL thread stopped because it reached "
1624                               "UNTIL SQL_AFTER_GTIDS %s", buffer);
1625         my_free(buffer);
1626         DBUG_RETURN(true);
1627       }
1628       global_sid_lock->unlock();
1629       DBUG_RETURN(false);
1630     }
1631     break;
1632 
1633   case UNTIL_SQL_AFTER_MTS_GAPS:
1634   case UNTIL_DONE:
1635     /*
1636       TODO: this condition is actually post-execution or post-scheduling
1637             so the proper place to check it before SQL thread goes
1638             into next_event() where it can wait while the condition
1639             has been satisfied already.
1640             It's deployed here temporarily to be fixed along the regular UNTIL
1641             support for MTS is provided.
1642     */
1643     if (mts_recovery_group_cnt == 0)
1644     {
1645       sql_print_information("Slave SQL thread stopped according to "
1646                             "UNTIL SQL_AFTER_MTS_GAPS as it has "
1647                             "processed all gap transactions left from "
1648                             "the previous slave session.");
1649       until_condition= UNTIL_DONE;
1650       DBUG_RETURN(true);
1651     }
1652     else
1653     {
1654       DBUG_RETURN(false);
1655     }
1656     break;
1657 
1658   case UNTIL_SQL_VIEW_ID:
1659     if (ev != NULL && ev->get_type_code() == binary_log::VIEW_CHANGE_EVENT)
1660     {
1661       View_change_log_event *view_event= (View_change_log_event *)ev;
1662 
1663       if (until_view_id.compare(view_event->get_view_id()) == 0)
1664       {
1665         set_group_replication_retrieved_certification_info(view_event);
1666         until_view_id_found= true;
1667         DBUG_RETURN(false);
1668       }
1669     }
1670 
1671     if (until_view_id_found && ev != NULL && ev->ends_group())
1672     {
1673       until_view_id_commit_found= true;
1674       DBUG_RETURN(false);
1675     }
1676 
1677     if (until_view_id_commit_found && ev == NULL)
1678     {
1679       DBUG_RETURN(true);
1680     }
1681 
1682     DBUG_RETURN(false);
1683     break;
1684 
1685   case UNTIL_NONE:
1686     DBUG_ASSERT(0);
1687     break;
1688   }
1689 
1690   DBUG_ASSERT(0);
1691   DBUG_RETURN(false);
1692 }
1693 
cached_charset_invalidate()1694 void Relay_log_info::cached_charset_invalidate()
1695 {
1696   DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1697 
1698   /* Full of zeroes means uninitialized. */
1699   memset(cached_charset, 0, sizeof(cached_charset));
1700   DBUG_VOID_RETURN;
1701 }
1702 
1703 
cached_charset_compare(char * charset) const1704 bool Relay_log_info::cached_charset_compare(char *charset) const
1705 {
1706   DBUG_ENTER("Relay_log_info::cached_charset_compare");
1707 
1708   if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1709   {
1710     memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1711     DBUG_RETURN(1);
1712   }
1713   DBUG_RETURN(0);
1714 }
1715 
1716 
stmt_done(my_off_t event_master_log_pos)1717 int Relay_log_info::stmt_done(my_off_t event_master_log_pos)
1718 {
1719   int error= 0;
1720 
1721   clear_flag(IN_STMT);
1722 
1723   DBUG_ASSERT(!belongs_to_client());
1724   /* Worker does not execute binlog update position logics */
1725   DBUG_ASSERT(!is_mts_worker(info_thd));
1726 
1727   /*
1728     Replication keeps event and group positions to specify the
1729     set of events that were executed.
1730     Event positions are incremented after processing each event
1731     whereas group positions are incremented when an event or a
1732     set of events is processed such as in a transaction and are
1733     committed or rolled back.
1734 
1735     A transaction can be ended with a Query Event, i.e. either
1736     commit or rollback, or by a Xid Log Event. Query Event is
1737     used to terminate pseudo-transactions that are executed
1738     against non-transactional engines such as MyIsam. Xid Log
1739     Event denotes though that a set of changes executed
1740     against a transactional engine is about to commit.
1741 
1742     Events' positions are incremented at stmt_done(). However,
1743     transactions that are ended with Xid Log Event have their
1744     group position incremented in the do_apply_event() and in
1745     the do_apply_event_work().
1746 
1747     Notice that the type of the engine, i.e. where data and
1748     positions are stored, against what events are being applied
1749     are not considered in this logic.
1750 
1751     Regarding the code that follows, notice that the executed
1752     group coordinates don't change if the current event is internal
1753     to the group. The same applies to MTS Coordinator when it
1754     handles a Format Descriptor event that appears in the middle
1755     of a group that is about to be assigned.
1756   */
1757   if ((!is_parallel_exec() && is_in_group()) ||
1758       mts_group_status != MTS_NOT_IN_GROUP)
1759   {
1760     inc_event_relay_log_pos();
1761   }
1762   else
1763   {
1764     if (is_parallel_exec())
1765     {
1766 
1767       DBUG_ASSERT(!is_mts_worker(info_thd));
1768 
1769       /*
1770         Format Description events only can drive MTS execution to this
1771         point. It is a special event group that is handled with
1772         synchronization. For that reason, the checkpoint routine is
1773         called here.
1774       */
1775       error= mts_checkpoint_routine(this, 0, false,
1776                                     true/*need_data_lock=true*/);
1777     }
1778     if (!error)
1779       error= inc_group_relay_log_pos(event_master_log_pos,
1780                                      true/*need_data_lock=true*/);
1781   }
1782 
1783   return error;
1784 }
1785 
1786 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
cleanup_context(THD * thd,bool error)1787 void Relay_log_info::cleanup_context(THD *thd, bool error)
1788 {
1789   DBUG_ENTER("Relay_log_info::cleanup_context");
1790 
1791   DBUG_ASSERT(info_thd == thd);
1792   /*
1793     1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1794     may have opened tables, which we cannot be sure have been closed (because
1795     maybe the Rows_log_event have not been found or will not be, because slave
1796     SQL thread is stopping, or relay log has a missing tail etc). So we close
1797     all thread's tables. And so the table mappings have to be cancelled.
1798     2) Rows_log_event::do_apply_event() may even have started statements or
1799     transactions on them, which we need to rollback in case of error.
1800     3) If finding a Format_description_log_event after a BEGIN, we also need
1801     to rollback before continuing with the next events.
1802     4) so we need this "context cleanup" function.
1803   */
1804   if (error)
1805   {
1806     trans_rollback_stmt(thd); // if a "statement transaction"
1807     trans_rollback(thd);      // if a "real transaction"
1808   }
1809   if (rows_query_ev)
1810   {
1811     /*
1812       In order to avoid invalid memory access, THD::reset_query() should be
1813       called before deleting the rows_query event.
1814     */
1815     info_thd->reset_query();
1816     info_thd->reset_query_for_display();
1817     delete rows_query_ev;
1818     rows_query_ev= NULL;
1819     DBUG_EXECUTE_IF("after_deleting_the_rows_query_ev",
1820                     {
1821                       const char action[]="now SIGNAL deleted_rows_query_ev WAIT_FOR go_ahead";
1822                       DBUG_ASSERT(!debug_sync_set_action(info_thd,
1823                                                        STRING_WITH_LEN(action)));
1824                     };);
1825   }
1826   m_table_map.clear_tables();
1827   slave_close_thread_tables(thd);
1828   if (error)
1829   {
1830     /*
1831       trans_rollback above does not rollback XA transactions.
1832       It could be done only after necessarily closing tables which dictates
1833       the following placement.
1834     */
1835     XID_STATE *xid_state= thd->get_transaction()->xid_state();
1836     if (!xid_state->has_state(XID_STATE::XA_NOTR))
1837     {
1838       DBUG_ASSERT(DBUG_EVALUATE_IF("simulate_commit_failure",1,
1839                                    xid_state->has_state(XID_STATE::XA_ACTIVE) ||
1840                                    xid_state->has_state(XID_STATE::XA_IDLE)
1841                                    ));
1842 
1843       xa_trans_force_rollback(thd);
1844       xid_state->reset();
1845       cleanup_trans_state(thd);
1846       thd->rpl_unflag_detached_engine_ha_data();
1847     }
1848     thd->mdl_context.release_transactional_locks();
1849   }
1850   clear_flag(IN_STMT);
1851   /*
1852     Cleanup for the flags that have been set at do_apply_event.
1853   */
1854   thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1855   thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1856 
1857   /*
1858     Reset state related to long_find_row notes in the error log:
1859     - timestamp
1860     - flag that decides whether the slave prints or not
1861   */
1862   reset_row_stmt_start_timestamp();
1863   unset_long_find_row_note_printed();
1864 
1865   /*
1866     If the slave applier changed the current transaction isolation level,
1867     it need to be restored to the session default value once having the
1868     current transaction cleared.
1869 
1870     We should call "trans_reset_one_shot_chistics()" only if the "error"
1871     flag is "true", because "cleanup_context()" is called at the end of each
1872     set of Table_maps/Rows representing a statement (when the rows event
1873     is tagged with the STMT_END_F) with the "error" flag as "false".
1874 
1875     So, without the "if (error)" below, the isolation level might be reset
1876     in the middle of a pure row based transaction.
1877   */
1878   if (error)
1879     trans_reset_one_shot_chistics(thd);
1880 
1881   DBUG_VOID_RETURN;
1882 }
1883 
clear_tables_to_lock()1884 void Relay_log_info::clear_tables_to_lock()
1885 {
1886   DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
1887 #ifndef DBUG_OFF
1888   /**
1889     When replicating in RBR and MyISAM Merge tables are involved
1890     open_and_lock_tables (called in do_apply_event) appends the
1891     base tables to the list of tables_to_lock. Then these are
1892     removed from the list in close_thread_tables (which is called
1893     before we reach this point).
1894 
1895     This assertion just confirms that we get no surprises at this
1896     point.
1897    */
1898   uint i=0;
1899   for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
1900   DBUG_ASSERT(i == tables_to_lock_count);
1901 #endif
1902 
1903   while (tables_to_lock)
1904   {
1905     uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
1906     if (tables_to_lock->m_tabledef_valid)
1907     {
1908       tables_to_lock->m_tabledef.table_def::~table_def();
1909       tables_to_lock->m_tabledef_valid= FALSE;
1910     }
1911 
1912     /*
1913       If blob fields were used during conversion of field values
1914       from the master table into the slave table, then we need to
1915       free the memory used temporarily to store their values before
1916       copying into the slave's table.
1917     */
1918     if (tables_to_lock->m_conv_table)
1919       free_blobs(tables_to_lock->m_conv_table);
1920 
1921     tables_to_lock=
1922       static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1923     tables_to_lock_count--;
1924     my_free(to_free);
1925   }
1926   DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
1927   DBUG_VOID_RETURN;
1928 }
1929 
slave_close_thread_tables(THD * thd)1930 void Relay_log_info::slave_close_thread_tables(THD *thd)
1931 {
1932   thd->get_stmt_da()->set_overwrite_status(true);
1933   DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
1934   thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1935   thd->get_stmt_da()->set_overwrite_status(false);
1936 
1937   close_thread_tables(thd);
1938   /*
1939     - If transaction rollback was requested due to deadlock
1940     perform it and release metadata locks.
1941     - If inside a multi-statement transaction,
1942     defer the release of metadata locks until the current
1943     transaction is either committed or rolled back. This prevents
1944     other statements from modifying the table for the entire
1945     duration of this transaction.  This provides commit ordering
1946     and guarantees serializability across multiple transactions.
1947     - If in autocommit mode, or outside a transactional context,
1948     automatically release metadata locks of the current statement.
1949   */
1950   if (thd->transaction_rollback_request)
1951   {
1952     trans_rollback_implicit(thd);
1953     thd->mdl_context.release_transactional_locks();
1954   }
1955   else if (! thd->in_multi_stmt_transaction_mode())
1956     thd->mdl_context.release_transactional_locks();
1957   else
1958     thd->mdl_context.release_statement_locks();
1959 
1960   clear_tables_to_lock();
1961   DBUG_VOID_RETURN;
1962 }
1963 
1964 
1965 /**
1966   Execute a SHOW RELAYLOG EVENTS statement.
1967 
1968   When multiple replication channels exist on this slave
1969   and no channel name is specified through FOR CHANNEL clause
1970   this function errors out and exits.
1971 
1972   @param thd Pointer to THD object for the client thread executing the
1973   statement.
1974 
1975   @retval FALSE success
1976   @retval TRUE failure
1977 */
mysql_show_relaylog_events(THD * thd)1978 bool mysql_show_relaylog_events(THD* thd)
1979 {
1980 
1981   Master_info *mi =0;
1982   List<Item> field_list;
1983   bool res;
1984   DBUG_ENTER("mysql_show_relaylog_events");
1985 
1986   DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1987 
1988   channel_map.wrlock();
1989 
1990   if (!thd->lex->mi.for_channel && channel_map.get_num_instances() > 1)
1991   {
1992     my_error(ER_SLAVE_MULTIPLE_CHANNELS_CMD, MYF(0));
1993     res= true;
1994     goto err;
1995   }
1996 
1997   Log_event::init_show_field_list(&field_list);
1998   if (thd->send_result_metadata(&field_list,
1999                                 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2000   {
2001     res= true;
2002     goto err;
2003   }
2004 
2005   mi= channel_map.get_mi(thd->lex->mi.channel);
2006 
2007   if (!mi && strcmp(thd->lex->mi.channel, channel_map.get_default_channel()))
2008   {
2009     my_error(ER_SLAVE_CHANNEL_DOES_NOT_EXIST, MYF(0), thd->lex->mi.channel);
2010     res= true;
2011     goto err;
2012   }
2013 
2014   if (mi == NULL)
2015   {
2016     my_error(ER_SLAVE_CONFIGURATION, MYF(0));
2017     res= true;
2018     goto err;
2019   }
2020 
2021   res= show_binlog_events(thd, &mi->rli->relay_log);
2022 
2023 err:
2024   channel_map.unlock();
2025 
2026   DBUG_RETURN(res);
2027 }
2028 #endif
2029 
2030 
rli_init_info()2031 int Relay_log_info::rli_init_info()
2032 {
2033   int error= 0;
2034   enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
2035   const char *msg= NULL;
2036   /* Store the GTID of a transaction spanned in multiple relay log files */
2037   Gtid gtid_partial_trx= {0, 0};
2038 
2039   DBUG_ENTER("Relay_log_info::rli_init_info");
2040 
2041   mysql_mutex_assert_owner(&data_lock);
2042 
2043   /*
2044     If Relay_log_info is issued again after a failed init_info(), for
2045     instance because of missing relay log files, it will generate new
2046     files and ignore the previous failure, to avoid that we set
2047     error_on_rli_init_info as true.
2048     This a consequence of the behaviour change, in the past server was
2049     stopped when there were replication initialization errors, now it is
2050     not and so init_info() must be aware of previous failures.
2051   */
2052   if (error_on_rli_init_info)
2053     goto err;
2054 
2055   if (inited)
2056   {
2057     /*
2058       We have to reset read position of relay-log-bin as we may have
2059       already been reading from 'hotlog' when the slave was stopped
2060       last time. If this case pos_in_file would be set and we would
2061       get a crash when trying to read the signature for the binary
2062       relay log.
2063 
2064       We only rewind the read position if we are starting the SQL
2065       thread. The handle_slave_sql thread assumes that the read
2066       position is at the beginning of the file, and will read the
2067       "signature" and then fast-forward to the last position read.
2068     */
2069     bool hot_log= FALSE;
2070     /*
2071       my_b_seek does an implicit flush_io_cache, so we need to:
2072 
2073       1. check if this log is active (hot)
2074       2. if it is we keep log_lock until the seek ends, otherwise
2075          release it right away.
2076 
2077       If we did not take log_lock, SQL thread might race with IO
2078       thread for the IO_CACHE mutex.
2079 
2080     */
2081     mysql_mutex_t *log_lock= relay_log.get_log_lock();
2082     mysql_mutex_lock(log_lock);
2083     hot_log= relay_log.is_active(linfo.log_file_name);
2084 
2085     if (!hot_log)
2086       mysql_mutex_unlock(log_lock);
2087 
2088     my_b_seek(cur_log, (my_off_t) 0);
2089 
2090     if (hot_log)
2091       mysql_mutex_unlock(log_lock);
2092     DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(this) : 0);
2093   }
2094 
2095   cur_log_fd = -1;
2096   slave_skip_counter= 0;
2097   abort_pos_wait= 0;
2098   log_space_limit= relay_log_space_limit;
2099   log_space_total= 0;
2100   tables_to_lock= 0;
2101   tables_to_lock_count= 0;
2102 
2103   char pattern[FN_REFLEN];
2104   (void) my_realpath(pattern, slave_load_tmpdir, 0);
2105   /*
2106    @TODO:
2107     In MSR, sometimes slave fail with the following error:
2108     Unable to use slave's temporary directory /tmp -
2109     Can't create/write to file '/tmp/SQL_LOAD-92d1eee0-9de4-11e3-8874-68730ad50fcb'    (Errcode: 17 - File exists), Error_code: 1
2110 
2111    */
2112   if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
2113                 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
2114   {
2115     sql_print_error("Unable to use slave's temporary directory '%s'.",
2116                     slave_load_tmpdir);
2117     DBUG_RETURN(1);
2118   }
2119   unpack_filename(slave_patternload_file, pattern);
2120   slave_patternload_file_size= strlen(slave_patternload_file);
2121 
2122   /*
2123     The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
2124     Note that the I/O thread flushes it to disk after writing every
2125     event, in flush_info within the master info.
2126   */
2127   /*
2128     For the maximum log size, we choose max_relay_log_size if it is
2129     non-zero, max_binlog_size otherwise. If later the user does SET
2130     GLOBAL on one of these variables, fix_max_binlog_size and
2131     fix_max_relay_log_size will reconsider the choice (for example
2132     if the user changes max_relay_log_size to zero, we have to
2133     switch to using max_binlog_size for the relay log) and update
2134     relay_log.max_size (and mysql_bin_log.max_size).
2135   */
2136   {
2137     /* Reports an error and returns, if the --relay-log's path
2138        is a directory.*/
2139     if (opt_relay_logname &&
2140         opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
2141     {
2142       sql_print_error("Path '%s' is a directory name, please specify \
2143 a file name for --relay-log option.", opt_relay_logname);
2144       DBUG_RETURN(1);
2145     }
2146 
2147     /* Reports an error and returns, if the --relay-log-index's path
2148        is a directory.*/
2149     if (opt_relaylog_index_name &&
2150         opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
2151         == FN_LIBCHAR)
2152     {
2153       sql_print_error("Path '%s' is a directory name, please specify \
2154 a file name for --relay-log-index option.", opt_relaylog_index_name);
2155       DBUG_RETURN(1);
2156     }
2157 
2158     char buf[FN_REFLEN];
2159     /* The base name of the relay log file considering multisource rep */
2160     const char *ln;
2161     /*
2162       relay log name without channel prefix taking into account
2163       --relay-log option.
2164     */
2165     const char *ln_without_channel_name;
2166     static bool name_warning_sent= 0;
2167 
2168     /*
2169       Buffer to add channel name suffix when relay-log option is provided.
2170     */
2171     char relay_bin_channel[FN_REFLEN];
2172     /*
2173       Buffer to add channel name suffix when relay-log-index option is provided
2174     */
2175     char relay_bin_index_channel[FN_REFLEN];
2176 
2177     /* name of the index file if opt_relaylog_index_name is set*/
2178     const char* log_index_name;
2179 
2180 
2181     ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
2182                                 "-relay-bin", buf);
2183 
2184     ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
2185                                       ln_without_channel_name);
2186 
2187     /* We send the warning only at startup, not after every RESET SLAVE */
2188     if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
2189     {
2190       /*
2191         User didn't give us info to name the relay log index file.
2192         Picking `hostname`-relay-bin.index like we do, causes replication to
2193         fail if this slave's hostname is changed later. So, we would like to
2194         instead require a name. But as we don't want to break many existing
2195         setups, we only give warning, not error.
2196       */
2197       sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
2198                         " so replication "
2199                         "may break when this MySQL server acts as a "
2200                         "slave and has his hostname changed!! Please "
2201                         "use '--relay-log=%s' to avoid this problem.",
2202                         ln_without_channel_name);
2203       name_warning_sent= 1;
2204     }
2205 
2206     relay_log.is_relay_log= TRUE;
2207 
2208     /*
2209        If relay log index option is set, convert into channel specific
2210        index file. If the opt_relaylog_index has an extension, we strip
2211        it too. This is inconsistent to relay log names.
2212     */
2213     if (opt_relaylog_index_name)
2214     {
2215       char index_file_withoutext[FN_REFLEN];
2216       relay_log.generate_name(opt_relaylog_index_name,"",
2217                               index_file_withoutext);
2218 
2219       log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
2220                                                    FN_REFLEN,
2221                                                    index_file_withoutext);
2222     }
2223     else
2224       log_index_name= 0;
2225 
2226 
2227 
2228     if (relay_log.open_index_file(log_index_name, ln, TRUE))
2229     {
2230       sql_print_error("Failed in open_index_file() called from Relay_log_info::rli_init_info().");
2231       DBUG_RETURN(1);
2232     }
2233 #ifndef DBUG_OFF
2234     global_sid_lock->wrlock();
2235     gtid_set.dbug_print("set of GTIDs in relay log before initialization");
2236     global_sid_lock->unlock();
2237 #endif
2238     /*
2239       In the init_gtid_set below we pass the mi->transaction_parser.
2240       This will be useful to ensure that we only add a GTID to
2241       the Retrieved_Gtid_Set for fully retrieved transactions. Also, it will
2242       be useful to ensure the Retrieved_Gtid_Set behavior when auto
2243       positioning is disabled (we could have transactions spanning multiple
2244       relay log files in this case).
2245       We will skip this initialization if relay_log_recovery is set in order
2246       to save time, as neither the GTIDs nor the transaction_parser state
2247       would be useful when the relay log will be cleaned up later when calling
2248       init_recovery.
2249     */
2250     if (!is_relay_log_recovery &&
2251         !gtid_retrieved_initialized &&
2252         relay_log.init_gtid_sets(&gtid_set, NULL,
2253                                  opt_slave_sql_verify_checksum,
2254                                  true/*true=need lock*/,
2255                                  &mi->transaction_parser, &gtid_partial_trx))
2256     {
2257       sql_print_error("Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
2258       DBUG_RETURN(1);
2259     }
2260     gtid_retrieved_initialized= true;
2261 #ifndef DBUG_OFF
2262     global_sid_lock->wrlock();
2263     gtid_set.dbug_print("set of GTIDs in relay log after initialization");
2264     global_sid_lock->unlock();
2265 #endif
2266     if (!gtid_partial_trx.is_empty())
2267     {
2268       /*
2269         The init_gtid_set has found an incomplete transaction in the relay log.
2270         We add this transaction's GTID to the last_gtid_queued so the IO thread
2271         knows which GTID to add to the Retrieved_Gtid_Set when reaching the end
2272         of the incomplete transaction.
2273       */
2274       mi->set_last_gtid_queued(gtid_partial_trx);
2275     }
2276     else
2277     {
2278       mi->clear_last_gtid_queued();
2279     }
2280     /*
2281       Configures what object is used by the current log to store processed
2282       gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
2283       corretly compute the set of previous gtids.
2284     */
2285     relay_log.set_previous_gtid_set_relaylog(&gtid_set);
2286     /*
2287       note, that if open() fails, we'll still have index file open
2288       but a destructor will take care of that
2289     */
2290 
2291     mysql_mutex_t *log_lock= relay_log.get_log_lock();
2292     mysql_mutex_lock(log_lock);
2293 
2294     if (relay_log.open_binlog(ln, 0,
2295                               (max_relay_log_size ? max_relay_log_size :
2296                                max_binlog_size), true,
2297                               true/*need_lock_index=true*/,
2298                               true/*need_sid_lock=true*/,
2299                               mi->get_mi_description_event()))
2300     {
2301       mysql_mutex_unlock(log_lock);
2302       sql_print_error("Failed in open_log() called from Relay_log_info::rli_init_info().");
2303       DBUG_RETURN(1);
2304     }
2305 
2306     mysql_mutex_unlock(log_lock);
2307 
2308   }
2309 
2310    /*
2311     This checks if the repository was created before and thus there
2312     will be values to be read. Please, do not move this call after
2313     the handler->init_info().
2314   */
2315   if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
2316   {
2317     msg= "Error checking relay log repository";
2318     error= 1;
2319     goto err;
2320   }
2321 
2322   if (handler->init_info())
2323   {
2324     msg= "Error reading relay log configuration";
2325     error= 1;
2326     goto err;
2327   }
2328 
2329   if (check_return == REPOSITORY_DOES_NOT_EXIST)
2330   {
2331     /* Init relay log with first entry in the relay index file */
2332     if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
2333                            false/*need_data_lock=false (lock should be held
2334                                   prior to invoking this function)*/,
2335                            &msg, 0))
2336     {
2337       error= 1;
2338       goto err;
2339     }
2340     group_master_log_name[0]= 0;
2341     group_master_log_pos= 0;
2342   }
2343   else
2344   {
2345     if (read_info(handler))
2346     {
2347       msg= "Error reading relay log configuration";
2348       error= 1;
2349       goto err;
2350     }
2351 
2352     if (is_relay_log_recovery && init_recovery(mi, &msg))
2353     {
2354       error= 1;
2355       goto err;
2356     }
2357 
2358     if (init_relay_log_pos(group_relay_log_name,
2359                            group_relay_log_pos,
2360                            false/*need_data_lock=false (lock should be held
2361                                   prior to invoking this function)*/,
2362                            &msg, 0))
2363     {
2364       char llbuf[22];
2365       sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s).",
2366                       group_relay_log_name,
2367                       llstr(group_relay_log_pos, llbuf));
2368       error= 1;
2369       goto err;
2370     }
2371 
2372 #ifndef DBUG_OFF
2373     {
2374       char llbuf1[22], llbuf2[22];
2375       DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
2376                           llstr(my_b_tell(cur_log),llbuf1),
2377                           llstr(event_relay_log_pos,llbuf2)));
2378       DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2379       DBUG_ASSERT((my_b_tell(cur_log) == event_relay_log_pos));
2380     }
2381 #endif
2382   }
2383 
2384   inited= 1;
2385   error_on_rli_init_info= false;
2386   if (flush_info(TRUE))
2387   {
2388     msg= "Error reading relay log configuration";
2389     error= 1;
2390     goto err;
2391   }
2392 
2393   if (count_relay_log_space())
2394   {
2395     msg= "Error counting relay log space";
2396     error= 1;
2397     goto err;
2398   }
2399 
2400   /*
2401     In case of MTS the recovery is deferred until the end of
2402     load_mi_and_rli_from_repositories.
2403   */
2404   if (!mi->rli->mts_recovery_group_cnt)
2405     is_relay_log_recovery= FALSE;
2406   DBUG_RETURN(error);
2407 
2408 err:
2409   handler->end_info();
2410   inited= 0;
2411   error_on_rli_init_info= true;
2412   if (msg)
2413     sql_print_error("%s.", msg);
2414   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2415                   true/*need_lock_log=true*/,
2416                   true/*need_lock_index=true*/);
2417   DBUG_RETURN(error);
2418 }
2419 
end_info()2420 void Relay_log_info::end_info()
2421 {
2422   DBUG_ENTER("Relay_log_info::end_info");
2423 
2424   error_on_rli_init_info= false;
2425   if (!inited)
2426     DBUG_VOID_RETURN;
2427 
2428   handler->end_info();
2429 
2430   if (cur_log_fd >= 0)
2431   {
2432     end_io_cache(&cache_buf);
2433     (void)my_close(cur_log_fd, MYF(MY_WME));
2434     cur_log_fd= -1;
2435   }
2436   inited = 0;
2437   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2438                   true/*need_lock_log=true*/,
2439                   true/*need_lock_index=true*/);
2440   relay_log.harvest_bytes_written(this, true/*need_log_space_lock=true*/);
2441   /*
2442     Delete the slave's temporary tables from memory.
2443     In the future there will be other actions than this, to ensure persistance
2444     of slave's temp tables after shutdown.
2445   */
2446   close_temporary_tables();
2447 
2448   DBUG_VOID_RETURN;
2449 }
2450 
flush_current_log()2451 int Relay_log_info::flush_current_log()
2452 {
2453   DBUG_ENTER("Relay_log_info::flush_current_log");
2454   /*
2455     When we come to this place in code, relay log may or not be initialized;
2456     the caller is responsible for setting 'flush_relay_log_cache' accordingly.
2457   */
2458   IO_CACHE *log_file= relay_log.get_log_file();
2459   if (flush_io_cache(log_file))
2460     DBUG_RETURN(2);
2461 
2462   DBUG_RETURN(0);
2463 }
2464 
set_master_info(Master_info * info)2465 void Relay_log_info::set_master_info(Master_info* info)
2466 {
2467   mi= info;
2468 }
2469 
/**
  Stores the file and position where the slave's execution thread is in
  the relay log:

    - As this is only called by the slave thread or on STOP SLAVE, with the
      log_lock grabbed and the slave thread stopped, we don't need to have
      a lock here.
    - If there is an active transaction, then we don't update the position
      in the relay log.  This is to ensure that we re-execute statements
      if we die in the middle of a transaction that was rolled back.
    - As a transaction never spans binary logs, we don't have to handle the
      case where we do a relay-log-rotation in the middle of the transaction.
      If this were not the case, we would have to ensure that we
      don't delete the relay log file where the transaction started when
      we switch to a new relay log file.

  @retval  0   ok,
  @retval  1   on write error.
*/
2489 
2490 /**
2491   Store the file and position where the slave's SQL thread are in the
2492   relay log.
2493 
2494   Notes:
2495 
2496   - This function should be called either from the slave SQL thread,
2497     or when the slave thread is not running.  (It reads the
2498     group_{relay|master}_log_{pos|name} and delay fields in the rli
2499     object.  These may only be modified by the slave SQL thread or by
2500     a client thread when the slave SQL thread is not running.)
2501 
2502   - If there is an active transaction, then we do not update the
2503     position in the relay log.  This is to ensure that we re-execute
2504     statements if we die in the middle of an transaction that was
2505     rolled back.
2506 
2507   - As a transaction never spans binary logs, we don't have to handle
2508     the case where we do a relay-log-rotation in the middle of the
2509     transaction.  If transactions could span several binlogs, we would
2510     have to ensure that we do not delete the relay log file where the
2511     transaction started before switching to a new relay log file.
2512 
2513   - Error can happen if writing to file fails or if flushing the file
2514     fails.
2515 
2516   @param rli The object representing the Relay_log_info.
2517 
2518   @todo Change the log file information to a binary format to avoid
2519   calling longlong2str.
2520 
2521   @return 0 on success, 1 on error.
2522 */
flush_info(const bool force)2523 int Relay_log_info::flush_info(const bool force)
2524 {
2525   DBUG_ENTER("Relay_log_info::flush_info");
2526 
2527   if (!inited)
2528     DBUG_RETURN(0);
2529 
2530   /*
2531     We update the sync_period at this point because only here we
2532     now that we are handling a relay log info. This needs to be
2533     update every time we call flush because the option maybe
2534     dinamically set.
2535   */
2536   mysql_mutex_lock(&mts_temp_table_LOCK);
2537   handler->set_sync_period(sync_relayloginfo_period);
2538 
2539   if (write_info(handler))
2540     goto err;
2541 
2542   if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
2543     goto err;
2544 
2545   force_flush_postponed_due_to_split_trans= false;
2546   mysql_mutex_unlock(&mts_temp_table_LOCK);
2547   DBUG_RETURN(0);
2548 
2549 err:
2550   sql_print_error("Error writing relay log configuration.");
2551   mysql_mutex_unlock(&mts_temp_table_LOCK);
2552   DBUG_RETURN(1);
2553 }
2554 
get_number_info_rli_fields()2555 size_t Relay_log_info::get_number_info_rli_fields()
2556 {
2557   return sizeof(info_rli_fields)/sizeof(info_rli_fields[0]);
2558 }
2559 
read_info(Rpl_info_handler * from)2560 bool Relay_log_info::read_info(Rpl_info_handler *from)
2561 {
2562   int lines= 0;
2563   char *first_non_digit= NULL;
2564   ulong temp_group_relay_log_pos= 0;
2565   ulong temp_group_master_log_pos= 0;
2566   int temp_sql_delay= 0;
2567   int temp_internal_id= internal_id;
2568 
2569   DBUG_ENTER("Relay_log_info::read_info");
2570 
2571   /*
2572     Should not read RLI from file in client threads. Client threads
2573     only use RLI to execute BINLOG statements.
2574 
2575     @todo Uncomment the following assertion. Currently,
2576     Relay_log_info::init() is called from init_master_info() before
2577     the THD object Relay_log_info::sql_thd is created. That means we
2578     cannot call belongs_to_client() since belongs_to_client()
2579     dereferences Relay_log_info::sql_thd. So we need to refactor
2580     slightly: the THD object should be created by Relay_log_info
2581     constructor (or passed to it), so that we are guaranteed that it
2582     exists at this point. /Sven
2583   */
2584   //DBUG_ASSERT(!belongs_to_client());
2585 
2586   /*
2587     Starting from 5.1.x, relay-log.info has a new format. Now, its
2588     first line contains the number of lines in the file. By reading
2589     this number we can determine which version our master.info comes
2590     from. We can't simply count the lines in the file, since
2591     versions before 5.1.x could generate files with more lines than
2592     needed. If first line doesn't contain a number, or if it
2593     contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2594     then the file is treated like a file from pre-5.1.x version.
2595     There is no ambiguity when reading an old master.info: before
2596     5.1.x, the first line contained the binlog's name, which is
2597     either empty or has an extension (contains a '.'), so can't be
2598     confused with an integer.
2599 
2600     So we're just reading first line and trying to figure which
2601     version is this.
2602   */
2603 
2604   /*
2605     The first row is temporarily stored in mi->master_log_name, if
2606     it is line count and not binlog name (new format) it will be
2607     overwritten by the second row later.
2608   */
2609   if (from->prepare_info_for_read() ||
2610       from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2611                      (char *) ""))
2612     DBUG_RETURN(TRUE);
2613 
2614   lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2615 
2616   if (group_relay_log_name[0]!='\0' &&
2617       *first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2618   {
2619     /* Seems to be new format => read group relay log name */
2620     if (from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2621                        (char *) ""))
2622       DBUG_RETURN(TRUE);
2623   }
2624   else
2625      DBUG_PRINT("info", ("relay_log_info file is in old format."));
2626 
2627   if (from->get_info(&temp_group_relay_log_pos,
2628                      (ulong) BIN_LOG_HEADER_SIZE) ||
2629       from->get_info(group_master_log_name,
2630                      sizeof(group_relay_log_name),
2631                      (char *) "") ||
2632       from->get_info(&temp_group_master_log_pos,
2633                      0UL))
2634     DBUG_RETURN(TRUE);
2635 
2636   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2637   {
2638     if (from->get_info(&temp_sql_delay, 0))
2639       DBUG_RETURN(TRUE);
2640   }
2641 
2642   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2643   {
2644     if (from->get_info(&recovery_parallel_workers, 0UL))
2645       DBUG_RETURN(TRUE);
2646   }
2647 
2648   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2649   {
2650     if (from->get_info(&temp_internal_id, 1))
2651       DBUG_RETURN(TRUE);
2652   }
2653 
2654   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL)
2655   {
2656     /* the default value is empty string"" */
2657     if (from->get_info(channel, sizeof(channel), (char*)""))
2658       DBUG_RETURN(TRUE);
2659   }
2660 
2661   group_relay_log_pos=  temp_group_relay_log_pos;
2662   group_master_log_pos= temp_group_master_log_pos;
2663   sql_delay= (int32) temp_sql_delay;
2664   internal_id= (uint) temp_internal_id;
2665 
2666   DBUG_ASSERT(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2667              (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
2668   DBUG_RETURN(FALSE);
2669 }
2670 
set_info_search_keys(Rpl_info_handler * to)2671 bool Relay_log_info::set_info_search_keys(Rpl_info_handler *to)
2672 {
2673   DBUG_ENTER("Relay_log_info::set_info_search_keys");
2674 
2675   if (to->set_info(LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL, channel))
2676     DBUG_RETURN(TRUE);
2677 
2678   DBUG_RETURN(FALSE);
2679 }
2680 
2681 
write_info(Rpl_info_handler * to)2682 bool Relay_log_info::write_info(Rpl_info_handler *to)
2683 {
2684   DBUG_ENTER("Relay_log_info::write_info");
2685 
2686   /*
2687     @todo Uncomment the following assertion. See todo in
2688     Relay_log_info::read_info() for details. /Sven
2689   */
2690   //DBUG_ASSERT(!belongs_to_client());
2691 
2692   if (to->prepare_info_for_write() ||
2693       to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2694       to->set_info(group_relay_log_name) ||
2695       to->set_info((ulong) group_relay_log_pos) ||
2696       to->set_info(group_master_log_name) ||
2697       to->set_info((ulong) group_master_log_pos) ||
2698       to->set_info((int) sql_delay) ||
2699       to->set_info(recovery_parallel_workers) ||
2700       to->set_info((int) internal_id) ||
2701       to->set_info(channel))
2702     DBUG_RETURN(TRUE);
2703 
2704   DBUG_RETURN(FALSE);
2705 }
2706 
2707 /**
2708    The method is run by SQL thread/MTS Coordinator.
2709    It replaces the current FD event with a new one.
2710    A version adaptation routine is invoked for the new FD
2711    to align the slave applier execution context with the master version.
2712 
2713    Since FD are shared by Coordinator and Workers in the MTS mode,
2714    deletion of the old FD is done through decrementing its usage counter.
2715    The destructor runs when the later drops to zero,
2716    also see @c Slave_worker::set_rli_description_event().
2717    The usage counter of the new FD is incremented.
2718 
2719    Although notice that MTS worker runs it, inefficiently (see assert),
2720    once at its destruction time.
2721 
2722    @param  a pointer to be installed into execution context
2723            FormatDescriptor event
2724 */
2725 
set_rli_description_event(Format_description_log_event * fe)2726 void Relay_log_info::set_rli_description_event(Format_description_log_event *fe)
2727 {
2728   DBUG_ENTER("Relay_log_info::set_rli_description_event");
2729   DBUG_ASSERT(!info_thd || !is_mts_worker(info_thd) || !fe);
2730 
2731   if (fe)
2732   {
2733     ulong fe_version= adapt_to_master_version(fe);
2734 
2735     if (info_thd)
2736     {
2737       // See rpl_rli_pdb.h:Slave_worker::set_rli_description_event.
2738       if (!is_in_group() &&
2739           (info_thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
2740            info_thd->variables.gtid_next.type == UNDEFINED_GROUP))
2741       {
2742         DBUG_PRINT("info", ("Setting gtid_next.type to NOT_YET_DETERMINED_GROUP"));
2743         info_thd->variables.gtid_next.set_not_yet_determined();
2744       }
2745 
2746       if (is_parallel_exec() && fe_version > 0)
2747       {
2748         /*
2749           Prepare for workers' adaption to a new FD version. Workers
2750           will see notification through scheduling of a first event of
2751           a new post-new-FD.
2752         */
2753         for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
2754           (*it)->fd_change_notified= false;
2755       }
2756     }
2757   }
2758   if (rli_description_event &&
2759       rli_description_event->usage_counter.atomic_add(-1) == 1)
2760     delete rli_description_event;
2761 #ifndef DBUG_OFF
2762   else
2763     /* It must be MTS mode when the usage counter greater than 1. */
2764     DBUG_ASSERT(!rli_description_event || is_parallel_exec());
2765 #endif
2766   rli_description_event= fe;
2767   if (rli_description_event)
2768     rli_description_event->usage_counter.atomic_add(1);
2769 
2770   DBUG_VOID_RETURN;
2771 }
2772 
/*
  One entry of the table of version-sensitive features: the feature's
  identifier, the version that introduced it, and the two actions that
  adapt_to_master_version_updown() runs on a THD when the version moves
  across that point.
*/
struct st_feature_version
{
  /*
    The enum must be in the version non-descending top-down order,
    the last item formally corresponds to highest possible server
    version (never reached, thereby no adapting actions here);
    enumeration starts from zero.
  */
  enum
  {
    WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
    _END_OF_LIST // always last
  } item;
  /*
    Version where the feature is introduced, as three numbers (this is
    the argument passed to version_product()).
  */
  uchar version_split[3];
  /*
    Action to perform when according to FormatDescriptor event Master
    is found to be feature-aware while previously it has *not* been.
    May be NULL, in which case no action is run.
  */
  void (*upgrade) (THD*);
  /*
    Action to perform when according to FormatDescriptor event Master
    is found to be feature-*un*aware while previously it has been.
    May be NULL.
  */
  void (*downgrade) (THD*);
};
2801 
wl6292_upgrade_func(THD * thd)2802 void wl6292_upgrade_func(THD *thd)
2803 {
2804   thd->variables.explicit_defaults_for_timestamp= false;
2805   if (global_system_variables.explicit_defaults_for_timestamp)
2806     thd->variables.explicit_defaults_for_timestamp= true;
2807 
2808   return;
2809 }
2810 
wl6292_downgrade_func(THD * thd)2811 void wl6292_downgrade_func(THD *thd)
2812 {
2813   if (global_system_variables.explicit_defaults_for_timestamp)
2814     thd->variables.explicit_defaults_for_timestamp= false;
2815 
2816   return;
2817 }
2818 
/**
   Features that are sensitive to a Master-vs-Slave version difference.
   They must be listed in the version non-descending order, and in the
   same order as the enum in st_feature_version, because
   adapt_to_master_version_updown() indexes this array by enum value.
   The last entry is a sentinel without actions.
*/
static st_feature_version s_features[]=
{
  // order is the same as in the enum
  { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
    {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
  // sentinel: highest possible version, no actions
  { st_feature_version::_END_OF_LIST,
    {255, 255, 255}, NULL, NULL }
};
2831 
2832 /**
2833    The method computes the incoming "master"'s FD server version and that
2834    of the currently installed (if ever) rli_description_event, to
2835    invoke more specific method to compare the two and adapt slave applier execution
2836    context to the new incoming master's version.
2837 
2838    This method is specifically for STS applier/MTS Coordinator as well as
2839    for a user thread applying binlog events.
2840 
2841    @param  fdle  a pointer to new Format Description event that is being
2842                  set up a new execution context.
2843    @return 0                when the versions are equal,
2844            master_version   otherwise
2845 */
adapt_to_master_version(Format_description_log_event * fdle)2846 ulong Relay_log_info::adapt_to_master_version(Format_description_log_event *fdle)
2847 {
2848   ulong master_version, current_version, slave_version;
2849 
2850   slave_version= version_product(slave_version_split);
2851   /* When rli_description_event is uninitialized yet take the slave's version */
2852   master_version= !fdle ? slave_version : fdle->get_product_version();
2853   current_version= !rli_description_event ? slave_version :
2854     rli_description_event->get_product_version();
2855 
2856   return adapt_to_master_version_updown(master_version, current_version);
2857 }
2858 
2859 /**
2860   The method compares two supplied versions and carries out down- or
2861   up- grade customization of execution context of the slave applier
2862   (thd).
2863 
2864   The method is invoked in the STS case through
2865   Relay_log_info::adapt_to_master_version() right before a new master
2866   FD is installed into the applier execution context; in the MTS
2867   case it's done by the Worker when it's assigned with a first event
2868   after the latest new FD has been installed.
2869 
2870   Comparison of the current (old, existing) and the master (new,
2871   incoming) versions yields adaptive actions.
2872   To explain that, let's denote V_0 as the current, and the master's
2873   one as V_1.
2874   In the downgrade case (V_1 < V_0) a server feature that is undefined
2875   in V_1 but is defined starting from some V_f of [V_1 + 1, V_0] range
2876   (+1 to mean V_1 excluded) are invalidated ("removed" from execution context)
2877   by running so called here downgrade action.
2878   Conversely in the upgrade case a feature defined in [V_0 + 1, V_1] range
2879   is validated ("added" to execution context) by running its upgrade action.
2880   A typical use case showing how adaptive actions are necessary for the slave
2881   applier is when the master version is lesser than the slave's one.
2882   In such case events generated on the "older" master may need to be applied
2883   in their native server context. And such context can be provided by downgrade
2884   actions.
2885   Conversely, when the old master events are run out and a newer master's events
2886   show up for applying, the execution context will be upgraded through
2887   the namesake actions.
2888 
2889   Notice that a relay log may have two FD events, one the slave local
2890   and the other from the Master. As there's no concern for the FD
2891   originator this leads to two adapt_to_master_version() calls.
2892   It's not harmful as can be seen from the following example.
2893   Say the currently installed FD's version is
2894   V_m, then at relay-log rotation the following transition takes
2895   place:
2896 
2897      V_m  -adapt-> V_s -adapt-> V_m.
2898 
2899   here and further `m' subscript stands for the master, `s' for the slave.
2900   It's clear that in this case an ineffective V_m -> V_m transition occurs.
2901 
2902   At composing downgrade/upgrade actions keep in mind that the slave applier
2903   version transition goes the following route:
2904   The initial version is that of the slave server (V_ss).
2905   It changes to a magic 4.0 at the slave relay log initialization.
2906   In the following course versions are extracted from each FD read out,
2907   regardless of what server generated it. Here is a typical version
2908   transition sequence underscored with annotation:
2909 
2910    V_ss -> 4.0 -> V(FD_s^1) -> V(FD_m^2)   --->   V(FD_s^3) -> V(FD_m^4)  ...
2911 
2912     ----------     -----------------     --------  ------------------     ---
2913      bootstrap       1st relay log       rotation      2nd log            etc
2914 
2915   The upper (^) subscipt enumerates Format Description events, V(FD^i) stands
2916   for a function extrating the version data from the i:th FD.
2917 
2918   There won't be any action to execute when info_thd is undefined,
2919   e.g at bootstrap.
2920 
2921   @param  master_version   an upcoming new version
2922   @param  current_version  the current version
2923   @return 0                when the new version is equal to the current one,
2924           master_version   otherwise
2925 */
adapt_to_master_version_updown(ulong master_version,ulong current_version)2926 ulong Relay_log_info::adapt_to_master_version_updown(ulong master_version,
2927                                                      ulong current_version)
2928 {
2929   THD *thd= info_thd;
2930   /*
2931     When the SQL thread or MTS Coordinator executes this method
2932     there's a constraint on current_version argument.
2933   */
2934   DBUG_ASSERT(!thd ||
2935               thd->rli_fake != NULL ||
2936               thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER ||
2937               (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
2938                (!rli_description_event ||
2939                 current_version ==
2940                 rli_description_event->get_product_version())));
2941 
2942   if (master_version == current_version)
2943     return 0;
2944   else if (!thd)
2945     return master_version;
2946 
2947   bool downgrade= master_version < current_version;
2948    /*
2949     find item starting from and ending at for which adaptive actions run
2950     for downgrade or upgrade branches.
2951     (todo: convert into bsearch when number of features will grow significantly)
2952   */
2953   long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
2954 
2955   for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
2956   {
2957     ulong ver_f= version_product(s_features[i].version_split);
2958 
2959     if ((downgrade ? master_version : current_version) < ver_f &&
2960         i_first == st_feature_version::_END_OF_LIST)
2961       i_first= i;
2962     if ((downgrade ? current_version : master_version) < ver_f)
2963     {
2964       i_last= i;
2965       DBUG_ASSERT(i_last >= i_first);
2966       break;
2967     }
2968   }
2969 
2970   /*
2971      actions, executed in version non-descending st_feature_version order
2972   */
2973   for (i= i_first; i < i_last; i++)
2974   {
2975     /* Run time check of the st_feature_version items ordering */
2976     DBUG_ASSERT(!i ||
2977                 version_product(s_features[i - 1].version_split) <=
2978                 version_product(s_features[i].version_split));
2979 
2980     DBUG_ASSERT((downgrade ? master_version : current_version) <
2981                 version_product(s_features[i].version_split) &&
2982                 (downgrade ? current_version : master_version  >=
2983                  version_product(s_features[i].version_split)));
2984 
2985     if (downgrade && s_features[i].downgrade)
2986     {
2987       s_features[i].downgrade(thd);
2988     }
2989     else if (s_features[i].upgrade)
2990     {
2991       s_features[i].upgrade(thd);
2992     }
2993   }
2994 
2995   return master_version;
2996 }
2997 
relay_log_number_to_name(uint number,char name[FN_REFLEN+1])2998 void Relay_log_info::relay_log_number_to_name(uint number,
2999                                               char name[FN_REFLEN+1])
3000 {
3001   char *str= NULL;
3002   char relay_bin_channel[FN_REFLEN+1];
3003   const char *relay_log_basename_channel=
3004     add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN+1,
3005                                   relay_log_basename);
3006 
3007   /* str points to closing null of relay log basename channel */
3008   str= strmake(name, relay_log_basename_channel, FN_REFLEN+1);
3009   *str++= '.';
3010   sprintf(str, "%06u", number);
3011 }
3012 
relay_log_name_to_number(const char * name)3013 uint Relay_log_info::relay_log_name_to_number(const char *name)
3014 {
3015   return static_cast<uint>(atoi(fn_ext(name)+1));
3016 }
3017 
is_mts_db_partitioned(Relay_log_info * rli)3018 bool is_mts_db_partitioned(Relay_log_info * rli)
3019 {
3020   return (rli->current_mts_submode->get_type() ==
3021     MTS_PARALLEL_TYPE_DB_NAME);
3022 }
3023 
get_for_channel_str(bool upper_case) const3024 const char* Relay_log_info::get_for_channel_str(bool upper_case) const
3025 {
3026   if (rli_fake)
3027     return "";
3028   else
3029     return mi->get_for_channel_str(upper_case);
3030 }
3031 
add_gtid_set(const Gtid_set * gtid_set)3032 enum_return_status Relay_log_info::add_gtid_set(const Gtid_set *gtid_set)
3033 {
3034   DBUG_ENTER("Relay_log_info::add_gtid_set(gtid_set)");
3035 
3036   enum_return_status return_status= this->gtid_set.add_gtid_set(gtid_set);
3037 
3038   DBUG_RETURN(return_status);
3039 }
3040 
detach_engine_ha_data(THD * thd)3041 void Relay_log_info::detach_engine_ha_data(THD *thd)
3042 {
3043   is_engine_ha_data_detached= true;
3044     /*
3045       In case of slave thread applier or processing binlog by client,
3046       detach the engine ha_data ("native" engine transaction)
3047       in favor of dynamically created.
3048     */
3049   plugin_foreach(thd, detach_native_trx,
3050                  MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3051 }
3052 
reattach_engine_ha_data(THD * thd)3053 void Relay_log_info::reattach_engine_ha_data(THD *thd)
3054 {
3055   is_engine_ha_data_detached = false;
3056   /*
3057     In case of slave thread applier or processing binlog by client,
3058     reattach the engine ha_data ("native" engine transaction)
3059     in favor of dynamically created.
3060   */
3061   plugin_foreach(thd, reattach_native_trx, MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3062 }
3063