1 /* Copyright (c) 2006, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "rpl_rli.h"
24 
25 #include "my_dir.h"                // MY_STAT
26 #include "log.h"                   // sql_print_error
27 #include "log_event.h"             // Log_event
28 #include "m_string.h"
29 #include "rpl_group_replication.h" // set_group_replication_retrieved_certifi...
30 #include "rpl_info_factory.h"      // Rpl_info_factory
31 #include "rpl_mi.h"                // Master_info
32 #include "rpl_msr.h"               // channel_map
33 #include "rpl_rli_pdb.h"           // Slave_worker
34 #include "sql_base.h"              // close_thread_tables
35 #include "strfunc.h"               // strconvert
36 #include "transaction.h"           // trans_commit_stmt
37 #include "debug_sync.h"
38 #include "pfs_file_provider.h"
39 #include "mysql/psi/mysql_file.h"
40 #include "mutex_lock.h"            // Mutex_lock
41 
42 #include <algorithm>
43 using std::min;
44 using std::max;
45 
/*
  Names of the relay log info fields.

  Every time a new field is added to the relay log info, this list must
  be updated as well. For now, the list is only used to get the number
  of fields.
*/
const char* info_rli_fields[]=
{
  "number_of_lines",
  "group_relay_log_name",
  "group_relay_log_pos",
  "group_master_log_name",
  "group_master_log_pos",
  "sql_delay",
  "number_of_workers",
  "id",
  "channel_name"
};
63 
/**
  Constructs a relay log info object.

  Members receive their initial values in the initializer list; the log
  name buffers are emptied in the body. Mutexes, condition variables and
  the relay log's pthread objects are created only when the object is not
  a "fake" one, which mirrors the condition under which the destructor
  destroys them.

  @param is_slave_recovery  stored in @c is_relay_log_recovery.
  @param param_key_info_*   instrumentation keys forwarded unchanged to the
                            Rpl_info base class; these parameters exist only
                            when HAVE_PSI_INTERFACE is defined.
  @param param_id           forwarded to the Rpl_info base class.
  @param param_channel      forwarded to the Rpl_info base class.
  @param is_rli_fake        stored in @c rli_fake; when true, no
                            synchronization object is initialized here.
*/
Relay_log_info::Relay_log_info(bool is_slave_recovery
#ifdef HAVE_PSI_INTERFACE
                               ,PSI_mutex_key *param_key_info_run_lock,
                               PSI_mutex_key *param_key_info_data_lock,
                               PSI_mutex_key *param_key_info_sleep_lock,
                               PSI_mutex_key *param_key_info_thd_lock,
                               PSI_mutex_key *param_key_info_data_cond,
                               PSI_mutex_key *param_key_info_start_cond,
                               PSI_mutex_key *param_key_info_stop_cond,
                               PSI_mutex_key *param_key_info_sleep_cond
#endif
                               , uint param_id, const char *param_channel,
                               bool is_rli_fake
                              )
   :Rpl_info("SQL"
#ifdef HAVE_PSI_INTERFACE
             ,param_key_info_run_lock, param_key_info_data_lock,
             param_key_info_sleep_lock, param_key_info_thd_lock,
             param_key_info_data_cond, param_key_info_start_cond,
             param_key_info_stop_cond, param_key_info_sleep_cond
#endif
             , param_id, param_channel
            ),
   /* Snapshot of the global ::replicate_same_server_id setting. */
   replicate_same_server_id(::replicate_same_server_id),
   /* cur_log_fd == -1 means "no relay log file opened by this object". */
   cur_log_fd(-1), relay_log(&sync_relaylog_period, SEQ_READ_APPEND),
   is_relay_log_recovery(is_slave_recovery),
   save_temporary_tables(0),
   cur_log_old_open_count(0), error_on_rli_init_info(false),
   group_relay_log_pos(0), event_relay_log_number(0),
   event_relay_log_pos(0), event_start_pos(0),
   m_group_master_log_pos(0),
   gtid_set(global_sid_map, global_sid_lock),
   rli_fake(is_rli_fake),
   gtid_retrieved_initialized(false),
   is_group_master_log_pos_invalid(false),
   log_space_total(0), ignore_log_space_limit(0),
   sql_force_rotate_relay(false),
   last_master_timestamp(0), slave_skip_counter(0),
   abort_pos_wait(0), until_condition(UNTIL_NONE),
   until_log_pos(0),
   until_sql_gtids(global_sid_map),
   until_sql_gtids_first_event(true),
   trans_retries(0), retried_trans(0),
   tables_to_lock(0), tables_to_lock_count(0),
   rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
   workers(PSI_NOT_INSTRUMENTED),
   workers_array_initialized(false),
   curr_group_assigned_parts(PSI_NOT_INSTRUMENTED),
   curr_group_da(PSI_NOT_INSTRUMENTED),
   slave_parallel_workers(0),
   exit_counter(0),
   max_updated_index(0),
   recovery_parallel_workers(0), checkpoint_seqno(0),
   checkpoint_group(opt_mts_checkpoint_group),
   recovery_groups_inited(false), mts_recovery_group_cnt(0),
   mts_recovery_index(0), mts_recovery_group_seen_begin(0),
   mts_group_status(MTS_NOT_IN_GROUP),
   stats_exec_time(0), stats_read_time(0),
   least_occupied_workers(PSI_NOT_INSTRUMENTED),
   current_mts_submode(0),
   reported_unsafe_warning(false), rli_description_event(NULL),
   commit_order_mngr(NULL),
   sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
   long_find_row_note_printed(false),
   thd_tx_priority(0),
   m_ignore_write_set_memory_limit(false),
   m_allow_drop_write_set(false),
   is_engine_ha_data_detached(false)
{
  DBUG_ENTER("Relay_log_info::Relay_log_info");

#ifdef HAVE_PSI_INTERFACE
  /* Hand the relay-log specific instrumentation keys to relay_log. */
  relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
                         key_RELAYLOG_LOCK_commit,
                         key_RELAYLOG_LOCK_commit_queue,
                         key_RELAYLOG_LOCK_done,
                         key_RELAYLOG_LOCK_flush_queue,
                         key_RELAYLOG_LOCK_log,
                         PSI_NOT_INSTRUMENTED, /* Relaylog doesn't support LOCK_binlog_end_pos */
                         key_RELAYLOG_LOCK_sync,
                         key_RELAYLOG_LOCK_sync_queue,
                         key_RELAYLOG_LOCK_xids,
                         key_RELAYLOG_COND_done,
                         key_RELAYLOG_update_cond,
                         key_RELAYLOG_prep_xids_cond,
                         key_file_relaylog,
                         key_file_relaylog_index,
                         key_file_relaylog_cache,
                         key_file_relaylog_index_cache);
#endif

  /* All log name buffers start out as empty strings. */
  group_relay_log_name[0]= event_relay_log_name[0]=
    m_group_master_log_name[0]= 0;
  until_log_name[0]= ign_master_log_name_end[0]= 0;
  set_timespec_nsec(&last_clock, 0);
  /* Zero the private IO_CACHE buffer before it is ever used. */
  memset(&cache_buf, 0, sizeof(cache_buf));
  cached_charset_invalidate();
  inited_hash_workers= FALSE;
  channel_open_temp_tables.atomic_set(0);
  /*
    For applier threads, currently_executing_gtid is set to automatic
    when they are not executing any transaction.
  */
  currently_executing_gtid.set_automatic();

  /*
    Synchronization objects are created for real (non-fake) instances only;
    the destructor destroys them under the very same condition.
  */
  if (!rli_fake)
  {
    mysql_mutex_init(key_relay_log_info_log_space_lock,
                     &log_space_lock, MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond);
    mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond);
    mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
                   MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_temp_table_LOCK, &mts_temp_table_LOCK,
                     MY_MUTEX_INIT_FAST);
    mysql_mutex_init(key_mts_gaq_LOCK, &mts_gaq_LOCK,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_cond_mts_gaq, &logical_clock_cond);

    relay_log.init_pthread_objects();
    /*
      NOTE(review): this flag is assigned only on the non-fake path, so it
      stays uninitialized for fake instances - confirm that those never
      read it.
    */
    force_flush_postponed_due_to_split_trans= false;
  }
  /* Split the global ::server_version into slave_version_split. */
  do_server_version_split(::server_version, slave_version_split);

  DBUG_VOID_RETURN;
}
192 
193 /**
194    The method to invoke at slave threads start
195 */
init_workers(ulong n_workers)196 void Relay_log_info::init_workers(ulong n_workers)
197 {
198   /*
199     Parallel slave parameters initialization is done regardless
200     whether the feature is or going to be active or not.
201   */
202   mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
203   mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
204   mts_total_wait_overlap= 0;
205   mts_total_wait_worker_avail= 0;
206   mts_last_online_stat= 0;
207 
208   workers.reserve(n_workers);
209   workers_array_initialized= true; //set after init
210 }
211 
212 /**
213    The method to invoke at slave threads stop
214 */
deinit_workers()215 void Relay_log_info::deinit_workers()
216 {
217   workers.clear();
218 }
219 
~Relay_log_info()220 Relay_log_info::~Relay_log_info()
221 {
222   DBUG_ENTER("Relay_log_info::~Relay_log_info");
223 
224   if(!rli_fake)
225   {
226     if (recovery_groups_inited)
227       bitmap_free(&recovery_groups);
228     delete current_mts_submode;
229 
230     if(workers_copy_pfs.size())
231     {
232       for (int i= static_cast<int>(workers_copy_pfs.size()) - 1; i >= 0; i--)
233         delete workers_copy_pfs[i];
234       workers_copy_pfs.clear();
235     }
236 
237     mysql_mutex_destroy(&log_space_lock);
238     mysql_cond_destroy(&log_space_cond);
239     mysql_mutex_destroy(&pending_jobs_lock);
240     mysql_cond_destroy(&pending_jobs_cond);
241     mysql_mutex_destroy(&exit_count_lock);
242     mysql_mutex_destroy(&mts_temp_table_LOCK);
243     mysql_mutex_destroy(&mts_gaq_LOCK);
244     mysql_cond_destroy(&logical_clock_cond);
245     relay_log.cleanup();
246   }
247 
248   set_rli_description_event(NULL);
249 
250   DBUG_VOID_RETURN;
251 }
252 
253 /**
254    Method is called when MTS coordinator senses the relay-log name
255    has been changed.
256    It marks each Worker member with this fact to make an action
257    at time it will distribute a terminal event of a group to the Worker.
258 
259    Worker receives the new name at the group commiting phase
260    @c Slave_worker::slave_worker_ends_group().
261 */
reset_notified_relay_log_change()262 void Relay_log_info::reset_notified_relay_log_change()
263 {
264   if (!is_parallel_exec())
265     return;
266   for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
267   {
268     Slave_worker *w= *it;
269     w->relay_log_change_notified= FALSE;
270   }
271 }
272 
273 /**
274    This method is called in mts_checkpoint_routine() to mark that each
275    worker is required to adapt to a new checkpoint data whose coordinates
276    are passed to it through GAQ index.
277 
278    Worker notices the new checkpoint value at the group commit to reset
279    the current bitmap and starts using the clean bitmap indexed from zero
280    of being reset checkpoint_seqno.
281 
282     New seconds_behind_master timestamp is installed.
283 
284    @param shift            number of bits to shift by Worker due to the
285                            current checkpoint change.
286    @param new_ts           new seconds_behind_master timestamp value
287                            unless zero. Zero could be due to FD event
288                            or fake rotate event.
289    @param need_data_lock   False if caller has locked @c data_lock
290    @param update_timestamp if true, this function will update the
291                            rli->last_master_timestamp.
292 */
reset_notified_checkpoint(ulong shift,time_t new_ts,bool need_data_lock,bool update_timestamp)293 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
294                                                bool need_data_lock,
295                                                bool update_timestamp)
296 {
297   /*
298     If this is not a parallel execution we return immediately.
299   */
300   if (!is_parallel_exec())
301     return;
302 
303   for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
304   {
305     Slave_worker *w= *it;
306     /*
307       Reseting the notification information in order to force workers to
308       assign jobs with the new updated information.
309       Notice that the bitmap_shifted is accumulated to indicate how many
310       consecutive jobs were successfully processed.
311 
312       The worker when assigning a new job will set the value back to
313       zero.
314     */
315     w->checkpoint_notified= FALSE;
316     w->bitmap_shifted= w->bitmap_shifted + shift;
317     /*
318       Zero shift indicates the caller rotates the master binlog.
319       The new name will be passed to W through the group descriptor
320       during the first post-rotation time scheduling.
321     */
322     if (shift == 0)
323       w->master_log_change_notified= false;
324 
325     DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
326                        "worker->bitmap_shifted --> %lu, worker --> %u.",
327                        shift, w->bitmap_shifted,
328                        static_cast<unsigned>(it - workers.begin())));
329   }
330   /*
331     There should not be a call where (shift == 0 && checkpoint_seqno != 0).
332     Then the new checkpoint sequence is updated by subtracting the number
333     of consecutive jobs that were successfully processed.
334   */
335   assert(current_mts_submode->get_type() != MTS_PARALLEL_TYPE_DB_NAME ||
336          !(shift == 0 && checkpoint_seqno != 0));
337   checkpoint_seqno= checkpoint_seqno - shift;
338   DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
339              "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
340 
341   if (update_timestamp)
342   {
343     if (need_data_lock)
344       mysql_mutex_lock(&data_lock);
345     else
346       mysql_mutex_assert_owner(&data_lock);
347     last_master_timestamp= new_ts;
348     if (need_data_lock)
349       mysql_mutex_unlock(&data_lock);
350   }
351 }
352 
353 /**
354    Reset recovery info from Worker info table and
355    mark MTS recovery is completed.
356 
357    @return false on success true when @c reset_notified_checkpoint failed.
358 */
mts_finalize_recovery()359 bool Relay_log_info::mts_finalize_recovery()
360 {
361   bool ret= false;
362   uint i;
363   uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
364 
365   DBUG_ENTER("Relay_log_info::mts_finalize_recovery");
366 
367   for (Slave_worker **it= workers.begin(); !ret && it != workers.end(); ++it)
368   {
369     Slave_worker *w= *it;
370     ret= w->reset_recovery_info();
371     DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret= true;);
372   }
373   /*
374     The loop is traversed in the worker index descending order due
375     to specifics of the Worker table repository that does not like
376     even temporary holes. Therefore stale records are deleted
377     from the tail.
378   */
379   DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
380                   {DBUG_SET("+d,mts_worker_thread_init_fails");});
381   for (i= recovery_parallel_workers; i > workers.size() && !ret; i--)
382   {
383     Slave_worker *w=
384       Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
385     /*
386       If an error occurs during the above create_worker call, the newly created
387       worker object gets deleted within the above function call itself and only
388       NULL is returned. Hence the following check has been added to verify
389       that a valid worker object exists.
390     */
391     if (w)
392     {
393       ret= w->remove_info();
394       delete w;
395     }
396     else
397     {
398       ret= true;
399       goto err;
400     }
401   }
402   recovery_parallel_workers= slave_parallel_workers;
403 
404 err:
405   DBUG_RETURN(ret);
406 }
407 
mts_workers_queue_empty() const408 bool Relay_log_info::mts_workers_queue_empty() const
409 {
410   ulong ret= 0;
411 
412   for (Slave_worker * const *it= workers.begin(); ret == 0 && it != workers.end(); ++it)
413   {
414     Slave_worker *worker= *it;
415     mysql_mutex_lock(&worker->jobs_lock);
416     ret+= worker->curr_jobs;
417     mysql_mutex_unlock(&worker->jobs_lock);
418   }
419   return ret == 0;
420 }
421 
422 /* Checks if all in-flight stmts/trx can be safely rolled back */
cannot_safely_rollback() const423 bool Relay_log_info::cannot_safely_rollback() const
424 {
425   if (!is_parallel_exec())
426     return info_thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::SESSION);
427 
428   bool ret= false;
429 
430   for (Slave_worker * const *it= workers.begin(); !ret && it != workers.end(); ++it)
431   {
432     Slave_worker *worker= *it;
433     mysql_mutex_lock(&worker->jobs_lock);
434     ret= worker->info_thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::SESSION);
435     mysql_mutex_unlock(&worker->jobs_lock);
436   }
437   return ret;
438 }
439 
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)440 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
441 {
442   MY_STAT s;
443   DBUG_ENTER("add_relay_log");
444   mysql_mutex_assert_owner(&rli->log_space_lock);
445   if (!mysql_file_stat(key_file_relaylog,
446                        linfo->log_file_name, &s, MYF(0)))
447   {
448     sql_print_error("log %s listed in the index, but failed to stat.",
449                     linfo->log_file_name);
450     DBUG_RETURN(1);
451   }
452   rli->log_space_total += s.st_size;
453 #ifndef NDEBUG
454   char buf[22];
455   DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
456 #endif
457   DBUG_RETURN(0);
458 }
459 
count_relay_log_space()460 int Relay_log_info::count_relay_log_space()
461 {
462   LOG_INFO flinfo;
463   DBUG_ENTER("Relay_log_info::count_relay_log_space");
464   Mutex_lock lock(&log_space_lock);
465   log_space_total= 0;
466   if (relay_log.find_log_pos(&flinfo, NullS, 1))
467   {
468     sql_print_error("Could not find first log while counting relay log space.");
469     DBUG_RETURN(1);
470   }
471   do
472   {
473     if (add_relay_log(this, &flinfo))
474       DBUG_RETURN(1);
475   } while (!relay_log.find_next_log(&flinfo, 1));
476   /*
477      As we have counted everything, including what may have written in a
478      preceding write, we must reset bytes_written, or we may count some space
479      twice.
480   */
481   relay_log.reset_bytes_written();
482   DBUG_RETURN(0);
483 }
484 
485 /**
486    Resets UNTIL condition for Relay_log_info
487  */
488 
clear_until_condition()489 void Relay_log_info::clear_until_condition()
490 {
491   DBUG_ENTER("clear_until_condition");
492 
493   until_condition= Relay_log_info::UNTIL_NONE;
494   until_log_name[0]= 0;
495   until_log_pos= 0;
496   until_sql_gtids.clear();
497   until_sql_gtids_first_event= true;
498   DBUG_VOID_RETURN;
499 }
500 
/**
  Opens and initializes the given relay log. Specifically, it does what
  follows:

  - Installs a default (binlog format 3) Format_description_log_event.
  - Closes the relay log file previously opened by this object, if any.
  - If the requested log is the one that is currently active in relay_log
    (i.e. the one the IO thread writes to), then sets cur_log to point to
    the same IO_CACHE.
  - If not, opens the log as a binary file through cache_buf.
  - If pos is beyond the binlog header, optionally looks for the
    Format_description_log_event describing the log, then seeks to pos.

  @todo check proper initialization of
  group_master_log_name/group_master_log_pos. /alfranio

  @param log[in] Name of relay log file to read from. NULL = First log
  @param pos[in] Position in relay log file
  @param need_data_lock[in] If true, this function will acquire
  data_lock; otherwise the caller should already have acquired it.
  @param errmsg[out] On error, this function will store a pointer to
  an error message here. It is set to 0 on entry.
  @param keep_looking_for_fd[in] If true, this function will
  look for a Format_description_log_event.  We only need this when the
  SQL thread starts and opens an existing relay log and has to execute
  it (possibly from an offset >4); then we need to read the first
  event of the relay log to be able to parse the events we have to
  execute.

  @retval 0 ok,
  @retval 1 error.  In this case, *errmsg is set to point to the error
  message.
*/

int Relay_log_info::init_relay_log_pos(const char* log,
                                       ulonglong pos, bool need_data_lock,
                                       const char** errmsg,
                                       bool keep_looking_for_fd)
{
  DBUG_ENTER("Relay_log_info::init_relay_log_pos");
  DBUG_PRINT("info", ("pos: %lu", (ulong) pos));

  *errmsg=0;
  const char* errmsg_fmt= 0;
  /*
    Static on purpose: *errmsg may be left pointing into this buffer after
    the function has returned. As a consequence the buffer is shared by
    all the calls of this function.
  */
  static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
  mysql_mutex_t *log_lock= relay_log.get_log_lock();

  if (need_data_lock)
    mysql_mutex_lock(&data_lock);
  else
    mysql_mutex_assert_owner(&data_lock);

  /*
    By default the relay log is in binlog format 3 (4.0).
    Even if format is 4, this will work enough to read the first event
    (Format_desc) (remember that format 4 is just lengthened compared to
    format 3; format 3 is a prefix of format 4).
  */
  set_rli_description_event(new Format_description_log_event(3));

  /* Lock order: data_lock first, then the relay log's log lock. */
  mysql_mutex_lock(log_lock);

  /* Close log file and free buffers if it's already open */
  if (cur_log_fd >= 0)
  {
    end_io_cache(&cache_buf);
    mysql_file_close(cur_log_fd, MYF(MY_WME));
    cur_log_fd = -1;
  }

  group_relay_log_pos= event_relay_log_pos= pos;

  /*
    Look up the first log (no log name given) in the index; failing to
    find it is an error.
    Historical note kept from the original code: "Test to see if the
    previous run was with the skip of purging. If yes, we do not purge
    when we restart".
  */
  if (relay_log.find_log_pos(&linfo, NullS, 1))
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }

  /* When a log name was given, reposition linfo on that very log. */
  if (log && relay_log.find_log_pos(&linfo, log, 1))
  {
    errmsg_fmt= "Could not find target log file mentioned in "
                "relay log info in the index file '%s' during "
                "relay log initialization";
    sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
    *errmsg= errmsg_buff;
    goto err;
  }

  /* Group and event coordinates both start at the log that was found. */
  set_group_relay_log_name(linfo.log_file_name);
  set_event_relay_log_name(linfo.log_file_name);

  if (relay_log.is_active(linfo.log_file_name))
  {
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
    my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
    if (check_binlog_magic(cur_log, errmsg))
      goto err;
    cur_log_old_open_count=relay_log.get_open_count();
  }
  else
  {
    /*
      Open the relay log and set cur_log to point at this one
    */
    if ((cur_log_fd=open_binlog_file(&cache_buf,
                                     linfo.log_file_name,errmsg)) < 0)
      goto err;
    cur_log = &cache_buf;
  }
  /*
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
    sure.
  */
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
  {
    Log_event* ev;
    while (keep_looking_for_fd)
    {
      /*
        Read the possible Format_description_log_event; if position
        was 4, no need, it will be read naturally.
      */
      DBUG_PRINT("info",("looking for a Format_description_log_event"));

      /* Never scan past the requested position. */
      if (my_b_tell(cur_log) >= pos)
        break;

      /*
        Because we hold both data_lock and log_lock, we can safely read
        an event
      */
      if (!(ev= Log_event::read_log_event(cur_log, 0,
                                          rli_description_event,
                                          opt_slave_sql_verify_checksum)))
      {
        DBUG_PRINT("info",("could not read event, cur_log->error=%d",
                           cur_log->error));
        if (cur_log->error) /* not EOF */
        {
          *errmsg= "I/O error reading event at position 4";
          goto err;
        }
        /* Plain end of file: stop looking, this is not an error. */
        break;
      }
      else if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT)
      {
        Format_description_log_event *old= rli_description_event;
        DBUG_PRINT("info",("found Format_description_log_event"));
        Format_description_log_event *new_fdev=
          static_cast<Format_description_log_event*>(ev);
        /*
          Carry the crypto data of the current description event over to
          the new one before the new one is installed. The event is not
          deleted in this branch; presumably set_rli_description_event()
          takes it over - verify against that function.
        */
        new_fdev->copy_crypto_data(*old);
        set_rli_description_event(new_fdev);
        /*
          As ev was returned by read_log_event, it has passed is_valid(), so
          my_malloc() in ctor worked, no need to check again.
        */
        /*
          Ok, we found a Format_description event. But it is not sure that this
          describes the whole relay log; indeed, one can have this sequence
          (starting from position 4):
          Format_desc (of slave)
          Previous-GTIDs (of slave IO thread, if GTIDs are enabled)
          Rotate (of master)
          Format_desc (of master)
          So the Format_desc which really describes the rest of the relay log
          can be the 3rd or the 4th event (depending on GTIDs being enabled or
          not, it can't be further than that, because we rotate
          the relay log when we queue a Rotate event from the master).
          But what describes the Rotate is the first Format_desc.
          So what we do is:
          go on searching for Format_description events, until you exceed the
          position (argument 'pos') or until you find an event other than
          Previous-GTIDs, Rotate or Format_desc.
        */
      }
      else if (ev->get_type_code() == binary_log::START_ENCRYPTION_EVENT)
      {
        /* Set up decryption on the current description event. */
        if (rli_description_event->start_decryption(static_cast<Start_encryption_log_event*>(ev)))
        {
          *errmsg= "Unable to set up decryption of binlog.";
          delete ev;
          goto err;
        }
        delete ev;
      }
      else
      {
        DBUG_PRINT("info",("found event of another type=%d",
                           ev->get_type_code()));
        /* Only Rotate and Previous-GTIDs events let the search go on. */
        keep_looking_for_fd=
          (ev->get_type_code() == binary_log::ROTATE_EVENT ||
           ev->get_type_code() == binary_log::PREVIOUS_GTIDS_LOG_EVENT);
        delete ev;
      }
    }
    /* Finally position the cache at the requested offset. */
    my_b_seek(cur_log,(off_t)pos);
#ifndef NDEBUG
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
                        llstr(my_b_tell(cur_log),llbuf1),
                        llstr(get_event_relay_log_pos(),llbuf2)));
  }
#endif

  }

  /* Common exit path: reached both on success and on failure. */
err:
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
  if (!relay_log_purge)
  {
    log_space_limit= 0; // todo: consider to throw a warning at least
  }
  /* Wake up whoever waits on data_cond. */
  mysql_cond_broadcast(&data_cond);

  mysql_mutex_unlock(log_lock);

  if (need_data_lock)
    mysql_mutex_unlock(&data_lock);
  /* An invalid description event is an error, unless one is already set. */
  if (!rli_description_event->is_valid() && !*errmsg)
    *errmsg= "Invalid Format_description log event; could be out of memory";

  DBUG_RETURN ((*errmsg) ? 1 : 0);
}
733 
734 /**
735   Update the error number, message and timestamp fields. This function is
736   different from va_report() as va_report() also logs the error message in the
737   log apart from updating the error fields.
738 
739   SYNOPSIS
740   @param[in]  level          specifies the level- error, warning or information,
741   @param[in]  err_code       error number,
742   @param[in]  buff_coord     error message to be used.
743 
744 */
fill_coord_err_buf(loglevel level,int err_code,const char * buff_coord) const745 void Relay_log_info::fill_coord_err_buf(loglevel level, int err_code,
746                                       const char *buff_coord) const
747 {
748   mysql_mutex_lock(&err_lock);
749 
750   if(level == ERROR_LEVEL)
751   {
752     m_last_error.number = err_code;
753     my_snprintf(m_last_error.message, sizeof(m_last_error.message), "%.*s",
754                 MAX_SLAVE_ERRMSG - 1, buff_coord);
755     m_last_error.update_timestamp();
756   }
757 
758   mysql_mutex_unlock(&err_lock);
759 }
760 
761 /**
762   Waits until the SQL thread reaches (has executed up to) the
763   log/position or timed out.
764 
765   SYNOPSIS
766   @param[in]  thd             client thread that sent @c SELECT @c MASTER_POS_WAIT,
767   @param[in]  log_name        log name to wait for,
768   @param[in]  log_pos         position to wait for,
769   @param[in]  timeout         @c timeout in seconds before giving up waiting.
770                               @c timeout is double whereas it should be ulong; but this is
771                               to catch if the user submitted a negative timeout.
772 
773   @retval  -2   improper arguments (log_pos<0)
774                 or slave not running, or master info changed
775                 during the function's execution,
776                 or client thread killed. -2 is translated to NULL by caller,
777   @retval  -1   timed out
778   @retval  >=0  number of log events the function had to wait
779                 before reaching the desired log/position
780  */
781 
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
                                    longlong log_pos,
                                    double timeout)
{
  /*
    Overview: the caller sleeps on data_cond (protected by data_lock) and,
    after every wake-up, compares the group master log coordinates with the
    requested (log_name, log_pos). The loop ends when the position is
    reached, the wait times out, the session is killed, abort_pos_wait
    changes or slave_running becomes false.
  */
  int event_count = 0;          // wake-ups that were not time-outs
  ulong init_abort_pos_wait;    // abort_pos_wait as seen when the wait began
  int error=0;                  // 0, -1 (timed out) or -2 (bad args/aborted)
  struct timespec abstime; // for timeout checking
  PSI_stage_info old_stage;
  DBUG_ENTER("Relay_log_info::wait_for_pos");

  // Nothing can be waited for if this object has not been initialized.
  if (!inited)
    DBUG_RETURN(-2);

  DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
                      log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));

  DEBUG_SYNC(thd, "begin_master_pos_wait");

  /*
    The deadline is computed once, before the loop, so it covers the whole
    wait and not each individual sleep. It is only consulted when
    timeout > 0 (see the wait below).
  */
  set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
  mysql_mutex_lock(&data_lock);
  thd->ENTER_COND(&data_cond, &data_lock,
                  &stage_waiting_for_the_slave_thread_to_advance_position,
                  &old_stage);
  /*
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait ; We just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is because if someone does:
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
  */
  init_abort_pos_wait= abort_pos_wait;

  /*
    We'll need to
    handle all possible log names comparisons (e.g. 999 vs 1000).
    We use ulong for string->number conversion ; this is no
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN]; //make a char[] from String

  // Copy at most FN_REFLEN-1 bytes; strmake() adds the terminating NUL.
  strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));

  char *p= fn_ext(log_name_tmp);
  char *p_end;
  // A name without an extension or a negative position cannot be waited for.
  if (!*p || log_pos<0)
  {
    error= -2; //means improper arguments
    goto err;
  }
  // Convert 0-3 to 4
  log_pos= max(log_pos, static_cast<longlong>(BIN_LOG_HEADER_SIZE));
  /* p points to '.' */
  log_name_extension= strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end==p || *p_end)
  {
    error= -2;
    goto err;
  }

  /* The "compare and wait" main loop */
  while (!thd->killed &&
         init_abort_pos_wait == abort_pos_wait &&
         slave_running)
  {
    bool pos_reached;
    int cmp_result= 0;   // <0, 0, >0: current log extension vs requested one
    const char * const group_master_log_name= get_group_master_log_name();
    const ulonglong group_master_log_pos= get_group_master_log_pos();

    DBUG_PRINT("info",
               ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
                init_abort_pos_wait, abort_pos_wait));
    DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
                       group_master_log_name, (ulong) group_master_log_pos));

    /*
      group_master_log_name can be "", if we are just after a fresh
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
      (before we have executed one Rotate event from the master) or
      (rare) if the user is doing a weird slave setup (see next
      paragraph).  If group_master_log_name is "", we assume we don't
      have enough info to do the comparison yet, so we just wait until
      more data. In this case master_log_pos is always 0 except if
      somebody (wrongly) sets this slave to be a slave of itself
      without using --replicate-same-server-id (an unsupported
      configuration which does nothing), then group_master_log_pos
      will grow and group_master_log_name will stay "".
      Also in case the group master log position is invalid (e.g. after
      CHANGE MASTER TO RELAY_LOG_POS ), we will wait till the first event
      is read and the log position is valid again.
    */
    if (*group_master_log_name && !is_group_master_log_pos_invalid)
    {
      const char * const basename= (group_master_log_name +
                                    dirname_length(group_master_log_name));
      /*
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
      */
      char *q= (fn_ext(basename)+1);
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
      {
        error= -2;
        break;
      }
      // Now compare extensions.
      char *q_end;
      ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
      if (group_master_log_name_extension < log_name_extension)
        cmp_result= -1 ;
      else
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;

      /*
        Reached when the applier is in a later log file, or in the same file
        at or beyond the requested position.
      */
      pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
                    cmp_result > 0);
      if (pos_reached || thd->killed)
        break;
    }

    //wait for master update, with optional timeout.

    DBUG_PRINT("info",("Waiting for master update"));
    /*
      We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
    thd_wait_begin(thd, THD_WAIT_BINLOG);
    if (timeout > 0)
    {
      /*
        Note that mysql_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, mysql_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
    }
    else
      mysql_cond_wait(&data_cond, &data_lock);  // timeout <= 0: no deadline
    thd_wait_end(thd);
    DBUG_PRINT("info",("Got signal of master update or timed out"));
    if (error == ETIMEDOUT || error == ETIME)
    {
#ifndef NDEBUG
      /*
        Doing this to generate a stack trace and make debugging
        easier.
      */
      if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
        assert(0);
#endif
      error= -1;
      break;
    }
    // Any other wake-up counts as one event waited for; re-check the loop.
    error=0;
    event_count++;
    DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
  }

err:
  // data_lock is released first, then the session leaves the wait stage.
  mysql_mutex_unlock(&data_lock);
  thd->EXIT_COND(&old_stage);
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
improper_arguments: %d  timed_out: %d",
                     thd->killed_errno(),
                     (int) (init_abort_pos_wait != abort_pos_wait),
                     (int) slave_running,
                     (int) (error == -2),
                     (int) (error == -1)));
  /*
    A killed session, a changed abort_pos_wait or a stopped SQL thread
    overrides every other outcome (including a time-out) with -2.
  */
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
      !slave_running)
  {
    error= -2;
  }
  DBUG_RETURN( error ? error : event_count );
}
978 
wait_for_gtid_set(THD * thd,String * gtid,double timeout)979 int Relay_log_info::wait_for_gtid_set(THD* thd, String* gtid,
980                                       double timeout)
981 {
982   DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, String, timeout)");
983 
984   DBUG_PRINT("info", ("Waiting for %s timeout %lf", gtid->c_ptr_safe(),
985              timeout));
986 
987   Gtid_set wait_gtid_set(global_sid_map);
988   global_sid_lock->rdlock();
989 
990   if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
991   {
992     global_sid_lock->unlock();
993     DBUG_PRINT("exit",("improper gtid argument"));
994     DBUG_RETURN(-2);
995 
996   }
997   global_sid_lock->unlock();
998 
999   DBUG_RETURN(wait_for_gtid_set(thd, &wait_gtid_set, timeout));
1000 }
1001 
/*
  TODO: This is duplicated code that needs to be simplified.
  This will be done while developing all possible sync options.
  See WL#3584's specification.

  /Alfranio
*/
int Relay_log_info::wait_for_gtid_set(THD* thd, const Gtid_set* wait_gtid_set,
                                      double timeout)
{
  /*
    Sleeps on data_cond (protected by data_lock) until every GTID of
    wait_gtid_set is in the executed set and none of them is still owned,
    or until the wait times out, the session is killed, abort_pos_wait
    changes or slave_running becomes false.

    Returns -2 if the object is not initialized or the wait was aborted,
    -1 on time-out, otherwise the number of wake-ups that were needed.
  */
  int event_count = 0;          // wake-ups that were not time-outs
  ulong init_abort_pos_wait;    // abort_pos_wait as seen when the wait began
  int error=0;
  struct timespec abstime; // for timeout checking
  PSI_stage_info old_stage;
  DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, gtid_set, timeout)");

  if (!inited)
    DBUG_RETURN(-2);

  DEBUG_SYNC(thd, "begin_wait_for_gtid_set");

  /*
    The deadline is computed once, before the loop, so it covers the whole
    wait. It is only consulted when timeout > 0.
  */
  set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));

  mysql_mutex_lock(&data_lock);
  thd->ENTER_COND(&data_cond, &data_lock,
                  &stage_waiting_for_the_slave_thread_to_advance_position,
                  &old_stage);
  /*
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait ; We just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is because if someone does:
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
  */
  init_abort_pos_wait= abort_pos_wait;

  /* The "compare and wait" main loop */
  while (!thd->killed &&
         init_abort_pos_wait == abort_pos_wait &&
         slave_running)
  {
    DBUG_PRINT("info",
               ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
                init_abort_pos_wait, abort_pos_wait));

    /*
      First check, under global_sid_lock, whether the awaited GTIDs are
      already done; only if they are not do we wait for an update below.
    */

    global_sid_lock->wrlock();
    const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
    const Owned_gtids* owned_gtids= gtid_state->get_owned_gtids();

    /*
      The textual form is only used by the DBUG_PRINT below and is freed
      right after it.
      NOTE(review): the string is built on every iteration, presumably also
      in builds where DBUG_PRINT expands to nothing - confirm.
    */
    char *wait_gtid_set_buf;
    wait_gtid_set->to_string(&wait_gtid_set_buf);
    DBUG_PRINT("info", ("Waiting for '%s'. is_subset: %d and "
                        "!is_intersection_nonempty: %d",
                        wait_gtid_set_buf,
                        wait_gtid_set->is_subset(executed_gtids),
                        !owned_gtids->is_intersection_nonempty(wait_gtid_set)));
    my_free(wait_gtid_set_buf);
    executed_gtids->dbug_print("gtid_executed:");
    owned_gtids->dbug_print("owned_gtids:");

    /*
      Since commit is performed after log to binary log, we must also
      check if any GTID of wait_gtid_set is not yet committed.
    */
    if (wait_gtid_set->is_subset(executed_gtids) &&
        !owned_gtids->is_intersection_nonempty(wait_gtid_set))
    {
      // Done: release global_sid_lock on this path too before leaving.
      global_sid_lock->unlock();
      break;
    }
    global_sid_lock->unlock();

    DBUG_PRINT("info",("Waiting for master update"));

    /*
      We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
    thd_wait_begin(thd, THD_WAIT_BINLOG);
    if (timeout > 0)
    {
      /*
        Note that mysql_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, mysql_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
    }
    else
      mysql_cond_wait(&data_cond, &data_lock);  // timeout <= 0: no deadline
    thd_wait_end(thd);
    DBUG_PRINT("info",("Got signal of master update or timed out"));
    if (error == ETIMEDOUT || error == ETIME)
    {
#ifndef NDEBUG
      /*
        Doing this to generate a stack trace and make debugging
        easier.
      */
      if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
        assert(0);
#endif
      error= -1;
      break;
    }
    // Any other wake-up counts as one event waited for; re-check the loop.
    error=0;
    event_count++;
    DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
  }

  // data_lock is released first, then the session leaves the wait stage.
  mysql_mutex_unlock(&data_lock);
  thd->EXIT_COND(&old_stage);
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
improper_arguments: %d  timed_out: %d",
                     thd->killed_errno(),
                     (int) (init_abort_pos_wait != abort_pos_wait),
                     (int) slave_running,
                     (int) (error == -2),
                     (int) (error == -1)));
  /*
    A killed session, a changed abort_pos_wait or a stopped SQL thread
    overrides every other outcome (including a time-out) with -2.
  */
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
      !slave_running)
  {
    error= -2;
  }
  DBUG_RETURN( error ? error : event_count );
}
1143 
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock)1144 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
1145                                             bool need_data_lock)
1146 {
1147   int error= 0;
1148   DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
1149 
1150   if (need_data_lock)
1151   {
1152     const ulong timeout= info_thd->variables.lock_wait_timeout;
1153 
1154     /*
1155       Acquire protection against global BINLOG lock before rli->data_lock is
1156       locked (otherwise we would also block SHOW SLAVE STATUS).
1157     */
1158     assert(!info_thd->backup_binlog_lock.is_acquired());
1159     DBUG_PRINT("debug", ("Acquiring binlog protection lock"));
1160     mysql_mutex_assert_not_owner(&data_lock);
1161     if (info_thd->backup_binlog_lock.acquire_protection(info_thd, MDL_EXPLICIT,
1162                                                         timeout))
1163       DBUG_RETURN(1);
1164 
1165     mysql_mutex_lock(&data_lock);
1166   }
1167   else
1168   {
1169     mysql_mutex_assert_owner(&data_lock);
1170     assert(info_thd->backup_binlog_lock.is_protection_acquired());
1171   }
1172 
1173   inc_event_relay_log_pos();
1174   group_relay_log_pos= event_relay_log_pos;
1175   strmake(group_relay_log_name,event_relay_log_name,
1176           sizeof(group_relay_log_name)-1);
1177 
1178   notify_group_relay_log_name_update();
1179 
1180   /*
1181     In 4.x we used the event's len to compute the positions here. This is
1182     wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1183     then the event's len is not what is was in the master's binlog, so this
1184     will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1185     replication: Exec_master_log_pos is wrong). Only way to solve this is to
1186     have the original offset of the end of the event the relay log. This is
1187     what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1188     of log_pos in 4.0 was to compute the end_log_pos; so better to store
1189     end_log_pos instead of begin_log_pos.
1190     If we had not done this fix here, the problem would also have appeared
1191     when the slave and master are 5.0 but with different event length (for
1192     example the slave is more recent than the master and features the event
1193     UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1194     SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1195     value which would lead to badly broken replication.
1196     Even the relay_log_pos will be corrupted in this case, because the len is
1197     the relay log is not "val".
1198     With the end_log_pos solution, we avoid computations involving lengthes.
1199   */
1200   DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
1201                       (long) log_pos, (long) get_group_master_log_pos()));
1202 
1203   if (log_pos > 0)  // 3.23 binlogs don't have log_posx
1204     set_group_master_log_pos(log_pos);
1205   /*
1206     If the master log position was invalidiated by say, "CHANGE MASTER TO
1207     RELAY_LOG_POS=N", it is now valid,
1208    */
1209   if (is_group_master_log_pos_invalid)
1210     is_group_master_log_pos_invalid= false;
1211 
1212   /*
1213     In MTS mode FD or Rotate event commit their solitary group to
1214     Coordinator's info table. Callers make sure that Workers have been
1215     executed all assignements.
1216     Broadcast to master_pos_wait() waiters should be done after
1217     the table is updated.
1218   */
1219   assert(!is_parallel_exec() ||
1220          mts_group_status != Relay_log_info::MTS_IN_GROUP);
1221   /*
1222     We do not force synchronization at this point, note the
1223     parameter false, because a non-transactional change is
1224     being committed.
1225 
1226     For that reason, the synchronization here is subjected to
1227     the option sync_relay_log_info.
1228 
1229     See sql/rpl_rli.h for further information on this behavior.
1230   */
1231   error= flush_info(FALSE);
1232 
1233   mysql_cond_broadcast(&data_cond);
1234   if (need_data_lock)
1235   {
1236     mysql_mutex_unlock(&data_lock);
1237 
1238     DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1239     info_thd->backup_binlog_lock.release_protection(info_thd);
1240   }
1241 
1242   DBUG_RETURN(error);
1243 }
1244 
1245 
close_temporary_tables()1246 void Relay_log_info::close_temporary_tables()
1247 {
1248   TABLE *table,*next;
1249   int num_closed_temp_tables= 0;
1250   DBUG_ENTER("Relay_log_info::close_temporary_tables");
1251 
1252   for (table=save_temporary_tables ; table ; table=next)
1253   {
1254     next=table->next;
1255     /*
1256       Don't ask for disk deletion. For now, anyway they will be deleted when
1257       slave restarts, but it is a better intention to not delete them.
1258     */
1259     DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1260     close_temporary(table, 1, 0);
1261     num_closed_temp_tables++;
1262   }
1263   save_temporary_tables= 0;
1264   slave_open_temp_tables.atomic_add(-num_closed_temp_tables);
1265   channel_open_temp_tables.atomic_add(-num_closed_temp_tables);
1266   DBUG_VOID_RETURN;
1267 }
1268 
/**
  Purges relay logs. It assumes to have a run lock on rli and that no
  slave thread are running.

  @param[in]   thd         connection,
  @param[in]   just_reset  if false, @c init_relay_log_pos() is called once
                           the logs have been reset,
  @param[out]  errmsg      receives a pointer to an error message for the
                           failures detected after @c data_lock is taken,
  @param[in]   delete_only forwarded to @c relay_log.reset_logs(); when true
                           the relay log space is not recounted. Together
                           with @c mi->reset it also lets a channel that is
                           not inited go through the purge.

  @retval 0 successfully executed,
  @retval non-zero on failure: 1 for the failures detected in this function,
          or the value returned by @c init_relay_log_pos(). Note that the
          early failure paths (binlog protection lock not acquired, relay
          log index or relay log file not opened) return 1 without assigning
          @c *errmsg.
*/

int Relay_log_info::purge_relay_logs(THD *thd, bool just_reset,
                                     const char** errmsg, bool delete_only)
{
  int error=0;
  const char *ln;
  /* name of the index file if opt_relaylog_index_name is set*/
  const char* log_index_name;
  /*
    Buffer to add channel name suffix when relay-log-index option is
    provided
   */
  char relay_bin_index_channel[FN_REFLEN];

  const char *ln_without_channel_name;
  /*
    Buffer to add channel name suffix when relay-log option is provided.
   */
  char relay_bin_channel[FN_REFLEN];

  char buffer[FN_REFLEN];

  mysql_mutex_t *log_lock= relay_log.get_log_lock();

  DBUG_ENTER("Relay_log_info::purge_relay_logs");
  /* True only if the protection lock was taken here and must be released. */
  bool binlog_prot_acquired= false;

  /*
    Even if inited==0, we still try to empty master_log_* variables. Indeed,
    inited==0 does not imply that they already are empty.

    It could be that slave's info initialization partly succeeded: for example
    if relay-log.info existed but *relay-bin*.* have been manually removed,
    init_info reads the old relay-log.info and fills rli->master_log_*, then
    init_info checks for the existence of the relay log, this fails and
    init_info leaves inited to 0.
    In that pathological case, master_log_pos* will be properly reinited at
    the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
    purge_relay_logs, will delete bogus *.info files or replace them with
    correct files), however if the user does SHOW SLAVE STATUS before START
    SLAVE, he will see old, confusing master_log_*. In other words, we reinit
    master_log_* for SHOW SLAVE STATUS to display fine in any case.
  */

  if (!thd->backup_binlog_lock.is_acquired())
  {
    const ulong timeout= thd->variables.lock_wait_timeout;

    DBUG_PRINT("debug", ("Acquiring binlog protection lock"));
    mysql_mutex_assert_not_owner(&data_lock);
    if (thd->backup_binlog_lock.acquire_protection(thd, MDL_EXPLICIT,
                                                   timeout))
      DBUG_RETURN(1);

    binlog_prot_acquired= true;
  }

  set_group_master_log_name("");
  set_group_master_log_pos(0);

  /*
    Following the relay log purge, the master_log_pos will be in sync
    with relay_log_pos, so the flag should be cleared. Refer bug#11766010.
  */

  is_group_master_log_pos_invalid= false;

  if (!inited)
  {
    DBUG_PRINT("info", ("inited == 0"));
    if (error_on_rli_init_info ||
        /*
          mi->reset means that the channel was reset but still exists. Channel
          shall have the index and the first relay log file.

          Those files shall be removed in a following RESET SLAVE ALL (even
          when channel was not inited again).
        */
        (mi->reset && delete_only))
    {
      /*
        The relay log is not open in this state: rebuild the log and index
        file names (with the channel suffix) and open both, so that the
        purge below has files to work on.
      */
      ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
                                                       "-relay-bin", buffer);

      ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
                                        ln_without_channel_name);
      if (opt_relaylog_index_name)
      {
        char index_file_withoutext[FN_REFLEN];
        relay_log.generate_name(opt_relaylog_index_name,"",
                                index_file_withoutext);

        log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
                                                      FN_REFLEN,
                                                      index_file_withoutext);
      }
      else
        log_index_name= 0;

      if (relay_log.open_index_file(log_index_name, ln, TRUE))
      {
        sql_print_error("Unable to purge relay log files. Failed to open relay "
                        "log index file:%s.", relay_log.get_index_fname());
        if (binlog_prot_acquired)
        {
          DBUG_PRINT("debug", ("Releasing binlog protection lock"));
          thd->backup_binlog_lock.release_protection(thd);
        }
        DBUG_RETURN(1);
      }
      /* mi->data_lock is taken before the log lock, released after it. */
      mysql_mutex_lock(&mi->data_lock);
      mysql_mutex_lock(log_lock);
      if (relay_log.open_binlog(ln, 0,
                                (max_relay_log_size ? max_relay_log_size :
                                 max_binlog_size), true,
                                true/*need_lock_index=true*/,
                                true/*need_sid_lock=true*/,
                                mi->get_mi_description_event()))
      {
        mysql_mutex_unlock(log_lock);
        mysql_mutex_unlock(&mi->data_lock);
        sql_print_error("Unable to purge relay log files. Failed to open relay "
                        "log file:%s.", relay_log.get_log_fname());
        if (binlog_prot_acquired)
        {
          DBUG_PRINT("debug", ("Releasing binlog protection lock"));
          thd->backup_binlog_lock.release_protection(thd);
        }
        DBUG_RETURN(1);
      }
      mysql_mutex_unlock(log_lock);
      mysql_mutex_unlock(&mi->data_lock);
    }
    else
    {
      /* Not inited and nothing to clean up: this counts as success. */
      if (binlog_prot_acquired)
      {
        DBUG_PRINT("debug", ("Releasing binlog protection lock"));
        thd->backup_binlog_lock.release_protection(thd);
      }
      DBUG_RETURN(0);
    }
  }
  else
  {
    assert(slave_running == 0);
    assert(mi->slave_running == 0);
  }
  /* Reset the transaction boundary parser and clear the last GTID queued */
  mi->transaction_parser.reset();
  mi->clear_last_gtid_queued();

  slave_skip_counter= 0;
  /* From here on every exit goes through 'err', which unlocks data_lock. */
  mysql_mutex_lock(&data_lock);

  /*
    we close the relay log fd possibly left open by the slave SQL thread,
    to be able to delete it; the relay log fd possibly left open by the slave
    I/O thread will be closed naturally in reset_logs() by the
    close(LOG_CLOSE_TO_BE_OPENED) call
  */
  if (cur_log_fd >= 0)
  {
    end_io_cache(&cache_buf);
    my_close(cur_log_fd, MYF(MY_WME));
    cur_log_fd= -1;
  }

  /**
    Clear the retrieved gtid set for this channel.
    global_sid_lock->wrlock() is needed.
  */
  global_sid_lock->wrlock();
  (const_cast<Gtid_set *>(get_gtid_set()))->clear();
  global_sid_lock->unlock();

  if (relay_log.reset_logs(thd, delete_only))
  {
    *errmsg = "Failed during log reset";
    error=1;
    goto err;
  }

  /* Save name of used relay log file */
  set_group_relay_log_name(relay_log.get_log_fname());
  set_event_relay_log_name(relay_log.get_log_fname());
  group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
  if (!delete_only && count_relay_log_space())
  {
    *errmsg= "Error counting relay log space";
    error= 1;
    goto err;
  }
  if (!just_reset)
    error= init_relay_log_pos(group_relay_log_name,
                              group_relay_log_pos,
                              false/*need_data_lock=false*/, errmsg, 0);
  /*
    In the error_on_rli_init_info case the relay log was opened above only
    for the purpose of this purge, so it is closed again here.
  */
  if (!inited && error_on_rli_init_info)
    relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
                    true/*need_lock_log=true*/,
                    true/*need_lock_index=true*/);
err:
#ifndef NDEBUG
  char buf[22];
#endif
  DBUG_PRINT("info",("log_space_total: %s",llstr(log_space_total,buf)));
  mysql_mutex_unlock(&data_lock);

  if (binlog_prot_acquired)
  {
      DBUG_PRINT("debug", ("Releasing binlog protection lock"));
      thd->backup_binlog_lock.release_protection(thd);
  }

  DBUG_RETURN(error);
}
1496 
1497 /*
1498    When --relay-bin option is not provided, the names of the
1499    relay log files are host-relay-bin.0000x or
1500    host-relay-bin-CHANNEL.00000x in the case of MSR.
1501    However, if that option is provided, then the names of the
1502    relay log files are <relay-bin-option>.0000x or
1503    <relay-bin-option>-CHANNEL.00000x in the case of MSR.
1504 
1505    The function adds a channel suffix (according to the channel to file name
1506    conventions and conversions) to the relay log file.
1507 
1508    @todo: truncate the log file if length exceeds.
1509 */
1510 
1511 const char*
add_channel_to_relay_log_name(char * buff,uint buff_size,const char * base_name)1512 Relay_log_info::add_channel_to_relay_log_name(char *buff, uint buff_size,
1513                                               const char *base_name)
1514 {
1515   char *ptr;
1516   char channel_to_file[FN_REFLEN];
1517   uint errors, length;
1518   uint base_name_len;
1519   uint suffix_buff_size;
1520 
1521   assert(base_name !=NULL);
1522 
1523   base_name_len= strlen(base_name);
1524   suffix_buff_size= buff_size - base_name_len;
1525 
1526   ptr= strmake(buff, base_name, buff_size-1);
1527 
1528   if (channel[0])
1529   {
1530 
1531    /* adding a "-" */
1532     ptr= strmake(ptr, "-", suffix_buff_size-1);
1533 
1534     /*
1535       Convert the channel name to the file names charset.
1536       Channel name is in system_charset which is UTF8_general_ci
1537       as it was defined as utf8 in the mysql.slaveinfo tables.
1538     */
1539     length= strconvert(system_charset_info, channel, &my_charset_filename,
1540                        channel_to_file, NAME_LEN, &errors);
1541     ptr= strmake(ptr, channel_to_file, suffix_buff_size-length-1);
1542 
1543   }
1544 
1545   return (const char*)buff;
1546 }
1547 
1548 
1549 /**
1550      Checks if condition stated in UNTIL clause of START SLAVE is reached.
1551 
1552      Specifically, it checks if UNTIL condition is reached. Uses caching result
1553      of last comparison of current log file name and target log file name. So
1554      cached value should be invalidated if current log file name changes (see
1555      @c Relay_log_info::notify_... functions).
1556 
     This caching is needed to avoid expensive string comparisons and
     @c strtol() conversions needed for log names comparison. We don't need to
     compare them each time this function is called, we only need to do this
     when current log name changes. If we have @c UNTIL_MASTER_POS condition we
     need to do this only after @c Rotate_log_event::do_apply_event() (which is
     rare, so caching gives real benefit), and if we have @c UNTIL_RELAY_POS
     condition then we should invalidate cached comparison value after
     @c inc_group_relay_log_pos() which is called for each group of events (so
     we have some benefit if we have something like queries that use
     autoincrement or if we have transactions).
1567 
1568      Should be called ONLY if @c until_condition @c != @c UNTIL_NONE !
1569 
1570      @param master_beg_pos    position of the beginning of to be executed event
1571                               (not @c log_pos member of the event that points to
1572                               the beginning of the following event)
1573 
1574      @retval true   condition met or error happened (condition seems to have
1575                     bad log file name),
1576      @retval false  condition not met.
1577 */
1578 
is_until_satisfied(THD * thd,Log_event * ev)1579 bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
1580 {
1581   char error_msg[]= "Slave SQL thread is stopped because UNTIL "
1582                     "condition is bad.";
1583   DBUG_ENTER("Relay_log_info::is_until_satisfied");
1584 
1585   switch (until_condition)
1586   {
1587   case UNTIL_MASTER_POS:
1588   case UNTIL_RELAY_POS:
1589   {
1590     const char *log_name= NULL;
1591     ulonglong log_pos= 0;
1592 
1593     if (until_condition == UNTIL_MASTER_POS)
1594     {
1595       if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
1596         DBUG_RETURN(false);
1597       /*
1598         Rotate events originating from the slave have server_id==0,
1599         and their log_pos is relative to the slave, so in case their
1600         log_pos is greater than the log_pos we are waiting for, they
1601         can cause the slave to stop prematurely. So we ignore such
1602         events.
1603       */
1604       if (ev && ev->server_id == 0)
1605         DBUG_RETURN(false);
1606       log_name= get_group_master_log_name();
1607       if (!ev || is_in_group() || !ev->common_header->log_pos)
1608         log_pos= get_group_master_log_pos();
1609       else
1610         log_pos= ev->common_header->log_pos - ev->common_header->data_written;
1611     }
1612     else
1613     { /* until_condition == UNTIL_RELAY_POS */
1614       log_name= group_relay_log_name;
1615       log_pos= group_relay_log_pos;
1616     }
1617 
1618 #ifndef NDEBUG
1619     {
1620       char buf[32];
1621       DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1622                           get_group_master_log_name(),
1623                           llstr(get_group_master_log_pos(), buf)));
1624       DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1625                           group_relay_log_name, llstr(group_relay_log_pos, buf)));
1626       DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1627                           until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1628                           log_name, llstr(log_pos, buf)));
1629       DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1630                           until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1631                           until_log_name, llstr(until_log_pos, buf)));
1632     }
1633 #endif
1634 
1635     if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1636     {
1637       /*
1638         We have no cached comparison results so we should compare log names
1639         and cache result.
1640         If we are after RESET SLAVE, and the SQL slave thread has not processed
1641         any event yet, it could be that group_master_log_name is "". In that case,
1642         just wait for more events (as there is no sensible comparison to do).
1643       */
1644 
1645       if (*log_name)
1646       {
1647         const char *basename= log_name + dirname_length(log_name);
1648 
1649         const char *q= (const char*)(fn_ext(basename)+1);
1650         if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1651         {
1652           /* Now compare extensions. */
1653           char *q_end;
1654           ulong log_name_extension= strtoul(q, &q_end, 10);
1655           if (log_name_extension < until_log_name_extension)
1656             until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1657           else
1658             until_log_names_cmp_result=
1659               (log_name_extension > until_log_name_extension) ?
1660               UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1661         }
1662         else
1663         {
1664           /* Base names do not match, so we abort */
1665           sql_print_error("%s", error_msg);
1666           DBUG_RETURN(true);
1667         }
1668       }
1669       else
1670         DBUG_RETURN(until_log_pos == 0);
1671     }
1672 
1673     if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1674           log_pos >= until_log_pos) ||
1675          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
1676     {
1677       char buf[22];
1678       sql_print_information("Slave SQL thread stopped because it reached its"
1679                             " UNTIL position %s", llstr(until_pos(), buf));
1680       DBUG_RETURN(true);
1681     }
1682     DBUG_RETURN(false);
1683   }
1684 
1685   case UNTIL_SQL_BEFORE_GTIDS:
1686     /*
1687       We only need to check once if executed_gtids set
1688       contains any of the until_sql_gtids.
1689     */
1690     if (until_sql_gtids_first_event)
1691     {
1692       until_sql_gtids_first_event= false;
1693       global_sid_lock->wrlock();
1694       /* Check if until GTIDs were already applied. */
1695       const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1696       if (until_sql_gtids.is_intersection_nonempty(executed_gtids))
1697       {
1698         char *buffer;
1699         until_sql_gtids.to_string(&buffer);
1700         global_sid_lock->unlock();
1701         sql_print_information("Slave SQL thread stopped because "
1702                               "UNTIL SQL_BEFORE_GTIDS %s is already "
1703                               "applied", buffer);
1704         my_free(buffer);
1705         DBUG_RETURN(true);
1706       }
1707       global_sid_lock->unlock();
1708     }
1709     if (ev != NULL && ev->get_type_code() == binary_log::GTID_LOG_EVENT)
1710     {
1711       Gtid_log_event *gev= (Gtid_log_event *)ev;
1712       global_sid_lock->rdlock();
1713       if (until_sql_gtids.contains_gtid(gev->get_sidno(false), gev->get_gno()))
1714       {
1715         char *buffer;
1716         until_sql_gtids.to_string(&buffer);
1717         global_sid_lock->unlock();
1718         sql_print_information("Slave SQL thread stopped because it reached "
1719                               "UNTIL SQL_BEFORE_GTIDS %s", buffer);
1720         my_free(buffer);
1721         DBUG_RETURN(true);
1722       }
1723       global_sid_lock->unlock();
1724     }
1725     DBUG_RETURN(false);
1726     break;
1727 
1728   case UNTIL_SQL_AFTER_GTIDS:
1729     {
1730       global_sid_lock->wrlock();
1731       const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1732       if (until_sql_gtids.is_subset(executed_gtids))
1733       {
1734         char *buffer;
1735         until_sql_gtids.to_string(&buffer);
1736         global_sid_lock->unlock();
1737         sql_print_information("Slave SQL thread stopped because it reached "
1738                               "UNTIL SQL_AFTER_GTIDS %s", buffer);
1739         my_free(buffer);
1740         DBUG_RETURN(true);
1741       }
1742       global_sid_lock->unlock();
1743       DBUG_RETURN(false);
1744     }
1745     break;
1746 
1747   case UNTIL_SQL_AFTER_MTS_GAPS:
1748   case UNTIL_DONE:
1749     /*
1750       TODO: this condition is actually post-execution or post-scheduling
1751             so the proper place to check it before SQL thread goes
1752             into next_event() where it can wait while the condition
1753             has been satisfied already.
1754             It's deployed here temporarily to be fixed along the regular UNTIL
1755             support for MTS is provided.
1756     */
1757     if (mts_recovery_group_cnt == 0)
1758     {
1759       sql_print_information("Slave SQL thread stopped according to "
1760                             "UNTIL SQL_AFTER_MTS_GAPS as it has "
1761                             "processed all gap transactions left from "
1762                             "the previous slave session.");
1763       until_condition= UNTIL_DONE;
1764       DBUG_RETURN(true);
1765     }
1766     else
1767     {
1768       DBUG_RETURN(false);
1769     }
1770     break;
1771 
1772   case UNTIL_SQL_VIEW_ID:
1773     if (ev != NULL && ev->get_type_code() == binary_log::VIEW_CHANGE_EVENT)
1774     {
1775       View_change_log_event *view_event= (View_change_log_event *)ev;
1776 
1777       if (until_view_id.compare(view_event->get_view_id()) == 0)
1778       {
1779         set_group_replication_retrieved_certification_info(view_event);
1780         until_view_id_found= true;
1781         DBUG_RETURN(false);
1782       }
1783     }
1784 
1785     if (until_view_id_found && ev != NULL && ev->ends_group())
1786     {
1787       until_view_id_commit_found= true;
1788       DBUG_RETURN(false);
1789     }
1790 
1791     if (until_view_id_commit_found && ev == NULL)
1792     {
1793       DBUG_RETURN(true);
1794     }
1795 
1796     DBUG_RETURN(false);
1797     break;
1798 
1799   case UNTIL_NONE:
1800     assert(0);
1801     break;
1802   }
1803 
1804   assert(0);
1805   DBUG_RETURN(false);
1806 }
1807 
cached_charset_invalidate()1808 void Relay_log_info::cached_charset_invalidate()
1809 {
1810   DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1811 
1812   /* Full of zeroes means uninitialized. */
1813   memset(cached_charset, 0, sizeof(cached_charset));
1814   DBUG_VOID_RETURN;
1815 }
1816 
1817 
cached_charset_compare(char * charset) const1818 bool Relay_log_info::cached_charset_compare(char *charset) const
1819 {
1820   DBUG_ENTER("Relay_log_info::cached_charset_compare");
1821 
1822   if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1823   {
1824     memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1825     DBUG_RETURN(1);
1826   }
1827   DBUG_RETURN(0);
1828 }
1829 
1830 
stmt_done(my_off_t event_master_log_pos)1831 int Relay_log_info::stmt_done(my_off_t event_master_log_pos)
1832 {
1833   int error= 0;
1834 
1835   clear_flag(IN_STMT);
1836 
1837   assert(!belongs_to_client());
1838   /* Worker does not execute binlog update position logics */
1839   assert(!is_mts_worker(info_thd));
1840 
1841   /*
1842     Replication keeps event and group positions to specify the
1843     set of events that were executed.
1844     Event positions are incremented after processing each event
1845     whereas group positions are incremented when an event or a
1846     set of events is processed such as in a transaction and are
1847     committed or rolled back.
1848 
1849     A transaction can be ended with a Query Event, i.e. either
1850     commit or rollback, or by a Xid Log Event. Query Event is
1851     used to terminate pseudo-transactions that are executed
1852     against non-transactional engines such as MyIsam. Xid Log
1853     Event denotes though that a set of changes executed
1854     against a transactional engine is about to commit.
1855 
1856     Events' positions are incremented at stmt_done(). However,
1857     transactions that are ended with Xid Log Event have their
1858     group position incremented in the do_apply_event() and in
1859     the do_apply_event_work().
1860 
1861     Notice that the type of the engine, i.e. where data and
1862     positions are stored, against what events are being applied
1863     are not considered in this logic.
1864 
1865     Regarding the code that follows, notice that the executed
1866     group coordinates don't change if the current event is internal
1867     to the group. The same applies to MTS Coordinator when it
1868     handles a Format Descriptor event that appears in the middle
1869     of a group that is about to be assigned.
1870   */
1871   if ((!is_parallel_exec() && is_in_group()) ||
1872       mts_group_status != MTS_NOT_IN_GROUP)
1873   {
1874     inc_event_relay_log_pos();
1875   }
1876   else
1877   {
1878     if (is_parallel_exec())
1879     {
1880 
1881       assert(!is_mts_worker(info_thd));
1882 
1883       /*
1884         Format Description events only can drive MTS execution to this
1885         point. It is a special event group that is handled with
1886         synchronization. For that reason, the checkpoint routine is
1887         called here.
1888       */
1889       error= mts_checkpoint_routine(this, 0, false,
1890                                     true/*need_data_lock=true*/);
1891     }
1892     if (!error)
1893       error= inc_group_relay_log_pos(event_master_log_pos,
1894                                      true/*need_data_lock=true*/);
1895   }
1896 
1897   return error;
1898 }
1899 
1900 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
cleanup_context(THD * thd,bool error)1901 void Relay_log_info::cleanup_context(THD *thd, bool error)
1902 {
1903   DBUG_ENTER("Relay_log_info::cleanup_context");
1904 
1905   assert(info_thd == thd);
1906   /*
1907     1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1908     may have opened tables, which we cannot be sure have been closed (because
1909     maybe the Rows_log_event have not been found or will not be, because slave
1910     SQL thread is stopping, or relay log has a missing tail etc). So we close
1911     all thread's tables. And so the table mappings have to be cancelled.
1912     2) Rows_log_event::do_apply_event() may even have started statements or
1913     transactions on them, which we need to rollback in case of error.
1914     3) If finding a Format_description_log_event after a BEGIN, we also need
1915     to rollback before continuing with the next events.
1916     4) so we need this "context cleanup" function.
1917   */
1918   if (error)
1919   {
1920     trans_rollback_stmt(thd); // if a "statement transaction"
1921     trans_rollback(thd);      // if a "real transaction"
1922   }
1923   if (rows_query_ev)
1924   {
1925     /*
1926       In order to avoid invalid memory access, THD::reset_query() should be
1927       called before deleting the rows_query event.
1928     */
1929     info_thd->reset_query();
1930     info_thd->reset_query_for_display();
1931     delete rows_query_ev;
1932     rows_query_ev= NULL;
1933     DBUG_EXECUTE_IF("after_deleting_the_rows_query_ev",
1934                     {
1935                       const char action[]="now SIGNAL deleted_rows_query_ev WAIT_FOR go_ahead";
1936                       assert(!debug_sync_set_action(info_thd,
1937                                                     STRING_WITH_LEN(action)));
1938                     };);
1939   }
1940   m_table_map.clear_tables();
1941   slave_close_thread_tables(thd);
1942   if (error)
1943   {
1944     /*
1945       trans_rollback above does not rollback XA transactions.
1946       It could be done only after necessarily closing tables which dictates
1947       the following placement.
1948     */
1949     XID_STATE *xid_state= thd->get_transaction()->xid_state();
1950     if (!xid_state->has_state(XID_STATE::XA_NOTR))
1951     {
1952       assert(DBUG_EVALUATE_IF("simulate_commit_failure",1,
1953                               xid_state->has_state(XID_STATE::XA_ACTIVE) ||
1954                               xid_state->has_state(XID_STATE::XA_IDLE)
1955                               ));
1956 
1957       xa_trans_force_rollback(thd);
1958       xid_state->reset();
1959       cleanup_trans_state(thd);
1960       thd->rpl_unflag_detached_engine_ha_data();
1961     }
1962     thd->mdl_context.release_transactional_locks();
1963   }
1964   clear_flag(IN_STMT);
1965   /*
1966     Cleanup for the flags that have been set at do_apply_event.
1967   */
1968   thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1969   thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1970 
1971   /*
1972     Reset state related to long_find_row notes in the error log:
1973     - timestamp
1974     - flag that decides whether the slave prints or not
1975   */
1976   reset_row_stmt_start_timestamp();
1977   unset_long_find_row_note_printed();
1978 
1979   /*
1980     If the slave applier changed the current transaction isolation level,
1981     it need to be restored to the session default value once having the
1982     current transaction cleared.
1983 
1984     We should call "trans_reset_one_shot_chistics()" only if the "error"
1985     flag is "true", because "cleanup_context()" is called at the end of each
1986     set of Table_maps/Rows representing a statement (when the rows event
1987     is tagged with the STMT_END_F) with the "error" flag as "false".
1988 
1989     So, without the "if (error)" below, the isolation level might be reset
1990     in the middle of a pure row based transaction.
1991   */
1992   if (error)
1993     trans_reset_one_shot_chistics(thd);
1994 
1995   DBUG_VOID_RETURN;
1996 }
1997 
clear_tables_to_lock()1998 void Relay_log_info::clear_tables_to_lock()
1999 {
2000   DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
2001 #ifndef NDEBUG
2002   /**
2003     When replicating in RBR and MyISAM Merge tables are involved
2004     open_and_lock_tables (called in do_apply_event) appends the
2005     base tables to the list of tables_to_lock. Then these are
2006     removed from the list in close_thread_tables (which is called
2007     before we reach this point).
2008 
2009     This assertion just confirms that we get no surprises at this
2010     point.
2011    */
2012   uint i=0;
2013   for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
2014   assert(i == tables_to_lock_count);
2015 #endif
2016 
2017   while (tables_to_lock)
2018   {
2019     uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
2020     if (tables_to_lock->m_tabledef_valid)
2021     {
2022       tables_to_lock->m_tabledef.table_def::~table_def();
2023       tables_to_lock->m_tabledef_valid= FALSE;
2024     }
2025 
2026     /*
2027       If blob fields were used during conversion of field values
2028       from the master table into the slave table, then we need to
2029       free the memory used temporarily to store their values before
2030       copying into the slave's table.
2031     */
2032     if (tables_to_lock->m_conv_table)
2033       free_blobs(tables_to_lock->m_conv_table);
2034 
2035     tables_to_lock=
2036       static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
2037     tables_to_lock_count--;
2038     my_free(to_free);
2039   }
2040   assert(tables_to_lock == NULL && tables_to_lock_count == 0);
2041   DBUG_VOID_RETURN;
2042 }
2043 
slave_close_thread_tables(THD * thd)2044 void Relay_log_info::slave_close_thread_tables(THD *thd)
2045 {
2046   thd->get_stmt_da()->set_overwrite_status(true);
2047   DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
2048   thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
2049   thd->get_stmt_da()->set_overwrite_status(false);
2050 
2051   close_thread_tables(thd);
2052   /*
2053     - If transaction rollback was requested due to deadlock
2054     perform it and release metadata locks.
2055     - If inside a multi-statement transaction,
2056     defer the release of metadata locks until the current
2057     transaction is either committed or rolled back. This prevents
2058     other statements from modifying the table for the entire
2059     duration of this transaction.  This provides commit ordering
2060     and guarantees serializability across multiple transactions.
2061     - If in autocommit mode, or outside a transactional context,
2062     automatically release metadata locks of the current statement.
2063   */
2064   if (thd->transaction_rollback_request)
2065   {
2066     trans_rollback_implicit(thd);
2067     thd->mdl_context.release_transactional_locks();
2068   }
2069   else if (! thd->in_multi_stmt_transaction_mode())
2070     thd->mdl_context.release_transactional_locks();
2071   else
2072     thd->mdl_context.release_statement_locks();
2073 
2074   clear_tables_to_lock();
2075   DBUG_VOID_RETURN;
2076 }
2077 
2078 
2079 /**
2080   Execute a SHOW RELAYLOG EVENTS statement.
2081 
2082   When multiple replication channels exist on this slave
2083   and no channel name is specified through FOR CHANNEL clause
2084   this function errors out and exits.
2085 
2086   @param thd Pointer to THD object for the client thread executing the
2087   statement.
2088 
2089   @retval FALSE success
2090   @retval TRUE failure
2091 */
mysql_show_relaylog_events(THD * thd)2092 bool mysql_show_relaylog_events(THD* thd)
2093 {
2094 
2095   Master_info *mi =0;
2096   List<Item> field_list;
2097   bool res;
2098   DBUG_ENTER("mysql_show_relaylog_events");
2099 
2100   assert(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
2101 
2102   channel_map.wrlock();
2103 
2104   if (!thd->lex->mi.for_channel && channel_map.get_num_instances() > 1)
2105   {
2106     my_error(ER_SLAVE_MULTIPLE_CHANNELS_CMD, MYF(0));
2107     res= true;
2108     goto err;
2109   }
2110 
2111   Log_event::init_show_field_list(&field_list);
2112   if (thd->send_result_metadata(&field_list,
2113                                 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2114   {
2115     res= true;
2116     goto err;
2117   }
2118 
2119   mi= channel_map.get_mi(thd->lex->mi.channel);
2120 
2121   if (!mi && strcmp(thd->lex->mi.channel, channel_map.get_default_channel()))
2122   {
2123     my_error(ER_SLAVE_CHANNEL_DOES_NOT_EXIST, MYF(0), thd->lex->mi.channel);
2124     res= true;
2125     goto err;
2126   }
2127 
2128   if (mi == NULL)
2129   {
2130     my_error(ER_SLAVE_CONFIGURATION, MYF(0));
2131     res= true;
2132     goto err;
2133   }
2134 
2135   res= show_binlog_events(thd, &mi->rli->relay_log);
2136 
2137 err:
2138   channel_map.unlock();
2139 
2140   DBUG_RETURN(res);
2141 }
2142 #endif
2143 
2144 
rli_init_info()2145 int Relay_log_info::rli_init_info()
2146 {
2147   int error= 0;
2148   enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
2149   const char *msg= NULL;
2150   /* Store the GTID of a transaction spanned in multiple relay log files */
2151   Gtid gtid_partial_trx= {0, 0};
2152 
2153   DBUG_ENTER("Relay_log_info::rli_init_info");
2154 
2155   mysql_mutex_assert_owner(&data_lock);
2156 
2157   /*
2158     If Relay_log_info is issued again after a failed init_info(), for
2159     instance because of missing relay log files, it will generate new
2160     files and ignore the previous failure, to avoid that we set
2161     error_on_rli_init_info as true.
2162     This a consequence of the behaviour change, in the past server was
2163     stopped when there were replication initialization errors, now it is
2164     not and so init_info() must be aware of previous failures.
2165   */
2166   if (error_on_rli_init_info)
2167     goto err;
2168 
2169   if (inited)
2170   {
2171     /*
2172       We have to reset read position of relay-log-bin as we may have
2173       already been reading from 'hotlog' when the slave was stopped
2174       last time. If this case pos_in_file would be set and we would
2175       get a crash when trying to read the signature for the binary
2176       relay log.
2177 
2178       We only rewind the read position if we are starting the SQL
2179       thread. The handle_slave_sql thread assumes that the read
2180       position is at the beginning of the file, and will read the
2181       "signature" and then fast-forward to the last position read.
2182     */
2183     bool hot_log= FALSE;
2184     /*
2185       my_b_seek does an implicit flush_io_cache, so we need to:
2186 
2187       1. check if this log is active (hot)
2188       2. if it is we keep log_lock until the seek ends, otherwise
2189          release it right away.
2190 
2191       If we did not take log_lock, SQL thread might race with IO
2192       thread for the IO_CACHE mutex.
2193 
2194     */
2195     mysql_mutex_t *log_lock= relay_log.get_log_lock();
2196     mysql_mutex_lock(log_lock);
2197     hot_log= relay_log.is_active(linfo.log_file_name);
2198 
2199     if (!hot_log)
2200       mysql_mutex_unlock(log_lock);
2201 
2202     my_b_seek(cur_log, (my_off_t) 0);
2203 
2204     if (hot_log)
2205       mysql_mutex_unlock(log_lock);
2206     DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(this) : 0);
2207   }
2208 
2209   cur_log_fd = -1;
2210   slave_skip_counter= 0;
2211   abort_pos_wait= 0;
2212   log_space_limit= relay_log_space_limit;
2213   log_space_total= 0;
2214   tables_to_lock= 0;
2215   tables_to_lock_count= 0;
2216 
2217   char pattern[FN_REFLEN];
2218   (void) my_realpath(pattern, slave_load_tmpdir, 0);
2219   /*
2220    @TODO:
2221     In MSR, sometimes slave fail with the following error:
2222     Unable to use slave's temporary directory /tmp -
2223     Can't create/write to file '/tmp/SQL_LOAD-92d1eee0-9de4-11e3-8874-68730ad50fcb'    (Errcode: 17 - File exists), Error_code: 1
2224 
2225    */
2226   if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
2227                 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
2228   {
2229     sql_print_error("Unable to use slave's temporary directory '%s'.",
2230                     slave_load_tmpdir);
2231     DBUG_RETURN(1);
2232   }
2233   unpack_filename(slave_patternload_file, pattern);
2234   slave_patternload_file_size= strlen(slave_patternload_file);
2235 
2236   /*
2237     The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
2238     Note that the I/O thread flushes it to disk after writing every
2239     event, in flush_info within the master info.
2240   */
2241   /*
2242     For the maximum log size, we choose max_relay_log_size if it is
2243     non-zero, max_binlog_size otherwise. If later the user does SET
2244     GLOBAL on one of these variables, fix_max_binlog_size and
2245     fix_max_relay_log_size will reconsider the choice (for example
2246     if the user changes max_relay_log_size to zero, we have to
2247     switch to using max_binlog_size for the relay log) and update
2248     relay_log.max_size (and mysql_bin_log.max_size).
2249   */
2250   {
2251     /* Reports an error and returns, if the --relay-log's path
2252        is a directory.*/
2253     if (opt_relay_logname &&
2254         opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
2255     {
2256       sql_print_error("Path '%s' is a directory name, please specify \
2257 a file name for --relay-log option.", opt_relay_logname);
2258       DBUG_RETURN(1);
2259     }
2260 
2261     /* Reports an error and returns, if the --relay-log-index's path
2262        is a directory.*/
2263     if (opt_relaylog_index_name &&
2264         opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
2265         == FN_LIBCHAR)
2266     {
2267       sql_print_error("Path '%s' is a directory name, please specify \
2268 a file name for --relay-log-index option.", opt_relaylog_index_name);
2269       DBUG_RETURN(1);
2270     }
2271 
2272     char buf[FN_REFLEN];
2273     /* The base name of the relay log file considering multisource rep */
2274     const char *ln;
2275     /*
2276       relay log name without channel prefix taking into account
2277       --relay-log option.
2278     */
2279     const char *ln_without_channel_name;
2280     static bool name_warning_sent= 0;
2281 
2282     /*
2283       Buffer to add channel name suffix when relay-log option is provided.
2284     */
2285     char relay_bin_channel[FN_REFLEN];
2286     /*
2287       Buffer to add channel name suffix when relay-log-index option is provided
2288     */
2289     char relay_bin_index_channel[FN_REFLEN];
2290 
2291     /* name of the index file if opt_relaylog_index_name is set*/
2292     const char* log_index_name;
2293 
2294 
2295     ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
2296                                 "-relay-bin", buf);
2297 
2298     ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
2299                                       ln_without_channel_name);
2300 
2301     /* We send the warning only at startup, not after every RESET SLAVE */
2302     if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
2303     {
2304       /*
2305         User didn't give us info to name the relay log index file.
2306         Picking `hostname`-relay-bin.index like we do, causes replication to
2307         fail if this slave's hostname is changed later. So, we would like to
2308         instead require a name. But as we don't want to break many existing
2309         setups, we only give warning, not error.
2310       */
2311       sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
2312                         " so replication "
2313                         "may break when this MySQL server acts as a "
2314                         "slave and has his hostname changed!! Please "
2315                         "use '--relay-log=%s' to avoid this problem.",
2316                         ln_without_channel_name);
2317       name_warning_sent= 1;
2318     }
2319 
2320     relay_log.is_relay_log= TRUE;
2321 
2322     /*
2323        If relay log index option is set, convert into channel specific
2324        index file. If the opt_relaylog_index has an extension, we strip
2325        it too. This is inconsistent to relay log names.
2326     */
2327     if (opt_relaylog_index_name)
2328     {
2329       char index_file_withoutext[FN_REFLEN];
2330       relay_log.generate_name(opt_relaylog_index_name,"",
2331                               index_file_withoutext);
2332 
2333       log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
2334                                                    FN_REFLEN,
2335                                                    index_file_withoutext);
2336     }
2337     else
2338       log_index_name= 0;
2339 
2340 
2341 
2342     if (relay_log.open_index_file(log_index_name, ln, TRUE))
2343     {
2344       sql_print_error("Failed in open_index_file() called from Relay_log_info::rli_init_info().");
2345       DBUG_RETURN(1);
2346     }
2347 #ifndef NDEBUG
2348     global_sid_lock->wrlock();
2349     gtid_set.dbug_print("set of GTIDs in relay log before initialization");
2350     global_sid_lock->unlock();
2351 #endif
2352     /*
2353       In the init_gtid_set below we pass the mi->transaction_parser.
2354       This will be useful to ensure that we only add a GTID to
2355       the Retrieved_Gtid_Set for fully retrieved transactions. Also, it will
2356       be useful to ensure the Retrieved_Gtid_Set behavior when auto
2357       positioning is disabled (we could have transactions spanning multiple
2358       relay log files in this case).
2359       We will skip this initialization if relay_log_recovery is set in order
2360       to save time, as neither the GTIDs nor the transaction_parser state
2361       would be useful when the relay log will be cleaned up later when calling
2362       init_recovery.
2363     */
2364     if (!is_relay_log_recovery &&
2365         !gtid_retrieved_initialized &&
2366         relay_log.init_gtid_sets(&gtid_set, NULL,
2367                                  opt_slave_sql_verify_checksum,
2368                                  true/*true=need lock*/,
2369                                  &mi->transaction_parser, &gtid_partial_trx))
2370     {
2371       sql_print_error("Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
2372       DBUG_RETURN(1);
2373     }
2374     gtid_retrieved_initialized= true;
2375 #ifndef NDEBUG
2376     global_sid_lock->wrlock();
2377     gtid_set.dbug_print("set of GTIDs in relay log after initialization");
2378     global_sid_lock->unlock();
2379 #endif
2380     if (!gtid_partial_trx.is_empty())
2381     {
2382       /*
2383         The init_gtid_set has found an incomplete transaction in the relay log.
2384         We add this transaction's GTID to the last_gtid_queued so the IO thread
2385         knows which GTID to add to the Retrieved_Gtid_Set when reaching the end
2386         of the incomplete transaction.
2387       */
2388       mi->set_last_gtid_queued(gtid_partial_trx);
2389     }
2390     else
2391     {
2392       mi->clear_last_gtid_queued();
2393     }
2394     /*
2395       Configures what object is used by the current log to store processed
2396       gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
      correctly compute the set of previous gtids.
2398     */
2399     relay_log.set_previous_gtid_set_relaylog(&gtid_set);
2400     /*
2401       note, that if open() fails, we'll still have index file open
2402       but a destructor will take care of that
2403     */
2404 
2405     mysql_mutex_t *log_lock= relay_log.get_log_lock();
2406     mysql_mutex_lock(log_lock);
2407 
2408     if (relay_log.open_binlog(ln, 0,
2409                               (max_relay_log_size ? max_relay_log_size :
2410                                max_binlog_size), true,
2411                               true/*need_lock_index=true*/,
2412                               true/*need_sid_lock=true*/,
2413                               mi->get_mi_description_event()))
2414     {
2415       mysql_mutex_unlock(log_lock);
2416       sql_print_error("Failed in open_log() called from Relay_log_info::rli_init_info().");
2417       DBUG_RETURN(1);
2418     }
2419 
2420     mysql_mutex_unlock(log_lock);
2421 
2422   }
2423 
2424    /*
2425     This checks if the repository was created before and thus there
2426     will be values to be read. Please, do not move this call after
2427     the handler->init_info().
2428   */
2429   if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
2430   {
2431     msg= "Error checking relay log repository";
2432     error= 1;
2433     goto err;
2434   }
2435 
2436   if (handler->init_info())
2437   {
2438     msg= "Error reading relay log configuration";
2439     error= 1;
2440     goto err;
2441   }
2442 
2443   if (check_return == REPOSITORY_DOES_NOT_EXIST)
2444   {
2445     /* Init relay log with first entry in the relay index file */
2446     if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
2447                            false/*need_data_lock=false (lock should be held
2448                                   prior to invoking this function)*/,
2449                            &msg, 0))
2450     {
2451       error= 1;
2452       goto err;
2453     }
2454     set_group_master_log_name("");
2455     set_group_master_log_pos(0);
2456   }
2457   else
2458   {
2459     if (read_info(handler))
2460     {
2461       msg= "Error reading relay log configuration";
2462       error= 1;
2463       goto err;
2464     }
2465 
2466     if (is_relay_log_recovery && init_recovery(mi, &msg))
2467     {
2468       error= 1;
2469       goto err;
2470     }
2471 
2472     if (init_relay_log_pos(group_relay_log_name,
2473                            group_relay_log_pos,
2474                            false/*need_data_lock=false (lock should be held
2475                                   prior to invoking this function)*/,
2476                            &msg, 0))
2477     {
2478       char llbuf[22];
2479       sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s).",
2480                       group_relay_log_name,
2481                       llstr(group_relay_log_pos, llbuf));
2482       error= 1;
2483       goto err;
2484     }
2485 
2486 #ifndef NDEBUG
2487     {
2488       char llbuf1[22], llbuf2[22];
2489       DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
2490                           llstr(my_b_tell(cur_log),llbuf1),
2491                           llstr(event_relay_log_pos,llbuf2)));
2492       assert(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2493       assert((my_b_tell(cur_log) == event_relay_log_pos));
2494     }
2495 #endif
2496   }
2497 
2498   inited= 1;
2499   error_on_rli_init_info= false;
2500   if (flush_info(TRUE))
2501   {
2502     msg= "Error reading relay log configuration";
2503     error= 1;
2504     goto err;
2505   }
2506 
2507   if (count_relay_log_space())
2508   {
2509     msg= "Error counting relay log space";
2510     error= 1;
2511     goto err;
2512   }
2513 
2514   /*
2515     In case of MTS the recovery is deferred until the end of
2516     load_mi_and_rli_from_repositories.
2517   */
2518   if (!mi->rli->mts_recovery_group_cnt)
2519     is_relay_log_recovery= FALSE;
2520   DBUG_RETURN(error);
2521 
2522 err:
2523   handler->end_info();
2524   inited= 0;
2525   error_on_rli_init_info= true;
2526   if (msg)
2527     sql_print_error("%s.", msg);
2528   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2529                   true/*need_lock_log=true*/,
2530                   true/*need_lock_index=true*/);
2531   DBUG_RETURN(error);
2532 }
2533 
end_info()2534 void Relay_log_info::end_info()
2535 {
2536   DBUG_ENTER("Relay_log_info::end_info");
2537 
2538   error_on_rli_init_info= false;
2539   if (!inited)
2540     DBUG_VOID_RETURN;
2541 
2542   handler->end_info();
2543 
2544   if (cur_log_fd >= 0)
2545   {
2546     end_io_cache(&cache_buf);
2547     (void)my_close(cur_log_fd, MYF(MY_WME));
2548     cur_log_fd= -1;
2549   }
2550   inited = 0;
2551   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2552                   true/*need_lock_log=true*/,
2553                   true/*need_lock_index=true*/);
2554   relay_log.harvest_bytes_written(this, true/*need_log_space_lock=true*/);
2555   /*
2556     Delete the slave's temporary tables from memory.
2557     In the future there will be other actions than this, to ensure persistance
2558     of slave's temp tables after shutdown.
2559   */
2560   close_temporary_tables();
2561 
2562   DBUG_VOID_RETURN;
2563 }
2564 
flush_current_log()2565 int Relay_log_info::flush_current_log()
2566 {
2567   DBUG_ENTER("Relay_log_info::flush_current_log");
2568   /*
2569     When we come to this place in code, relay log may or not be initialized;
2570     the caller is responsible for setting 'flush_relay_log_cache' accordingly.
2571   */
2572   IO_CACHE *log_file= relay_log.get_log_file();
2573   if (flush_io_cache(log_file))
2574     DBUG_RETURN(2);
2575 
2576   DBUG_RETURN(0);
2577 }
2578 
set_master_info(Master_info * info)2579 void Relay_log_info::set_master_info(Master_info* info)
2580 {
2581   mi= info;
2582 }
2583 
/**
  Stores the file and position where the execute-slave thread is in the
  relay log:

    - As this is only called by the slave thread or on STOP SLAVE, with the
      log_lock grabbed and the slave thread stopped, we don't need to have
      a lock here.
    - If there is an active transaction, then we don't update the position
      in the relay log.  This is to ensure that we re-execute statements
      if we die in the middle of a transaction that was rolled back.
    - As a transaction never spans binary logs, we don't have to handle the
      case where we do a relay-log-rotation in the middle of the transaction.
      If this would not be the case, we would have to ensure that we
      don't delete the relay log file where the transaction started when
      we switch to a new relay log file.

  @retval  0   ok,
  @retval  1   write error, otherwise.
*/
2603 
2604 /**
2605   Store the file and position where the slave's SQL thread are in the
2606   relay log.
2607 
2608   Notes:
2609 
2610   - This function should be called either from the slave SQL thread,
2611     or when the slave thread is not running.  (It reads the
2612     group_{relay|master}_log_{pos|name} and delay fields in the rli
2613     object.  These may only be modified by the slave SQL thread or by
2614     a client thread when the slave SQL thread is not running.)
2615 
2616   - If there is an active transaction, then we do not update the
2617     position in the relay log.  This is to ensure that we re-execute
2618     statements if we die in the middle of an transaction that was
2619     rolled back.
2620 
2621   - As a transaction never spans binary logs, we don't have to handle
2622     the case where we do a relay-log-rotation in the middle of the
2623     transaction.  If transactions could span several binlogs, we would
2624     have to ensure that we do not delete the relay log file where the
2625     transaction started before switching to a new relay log file.
2626 
2627   - Error can happen if writing to file fails or if flushing the file
2628     fails.
2629 
2630   @param rli The object representing the Relay_log_info.
2631 
2632   @todo Change the log file information to a binary format to avoid
2633   calling longlong2str.
2634 
2635   @return 0 on success, 1 on error.
2636 */
flush_info(const bool force)2637 int Relay_log_info::flush_info(const bool force)
2638 {
2639   DBUG_ENTER("Relay_log_info::flush_info");
2640 
2641   if (!inited)
2642     DBUG_RETURN(0);
2643 
2644   /*
2645     We update the sync_period at this point because only here we
2646     now that we are handling a relay log info. This needs to be
2647     update every time we call flush because the option maybe
2648     dinamically set.
2649   */
2650   mysql_mutex_lock(&mts_temp_table_LOCK);
2651   handler->set_sync_period(sync_relayloginfo_period);
2652 
2653   if (write_info(handler))
2654     goto err;
2655 
2656   if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
2657     goto err;
2658 
2659   force_flush_postponed_due_to_split_trans= false;
2660   mysql_mutex_unlock(&mts_temp_table_LOCK);
2661   DBUG_RETURN(0);
2662 
2663 err:
2664   sql_print_error("Error writing relay log configuration.");
2665   mysql_mutex_unlock(&mts_temp_table_LOCK);
2666   DBUG_RETURN(1);
2667 }
2668 
get_number_info_rli_fields()2669 size_t Relay_log_info::get_number_info_rli_fields()
2670 {
2671   return sizeof(info_rli_fields)/sizeof(info_rli_fields[0]);
2672 }
2673 
read_info(Rpl_info_handler * from)2674 bool Relay_log_info::read_info(Rpl_info_handler *from)
2675 {
2676   int lines= 0;
2677   char *first_non_digit= NULL;
2678   ulong temp_group_relay_log_pos= 0;
2679   char temp_group_master_log_name[FN_REFLEN];
2680   ulong temp_group_master_log_pos= 0;
2681   int temp_sql_delay= 0;
2682   int temp_internal_id= internal_id;
2683 
2684   DBUG_ENTER("Relay_log_info::read_info");
2685 
2686   /*
2687     Should not read RLI from file in client threads. Client threads
2688     only use RLI to execute BINLOG statements.
2689 
2690     @todo Uncomment the following assertion. Currently,
2691     Relay_log_info::init() is called from init_master_info() before
2692     the THD object Relay_log_info::sql_thd is created. That means we
2693     cannot call belongs_to_client() since belongs_to_client()
2694     dereferences Relay_log_info::sql_thd. So we need to refactor
2695     slightly: the THD object should be created by Relay_log_info
2696     constructor (or passed to it), so that we are guaranteed that it
2697     exists at this point. /Sven
2698   */
2699   //assert(!belongs_to_client());
2700 
2701   /*
2702     Starting from 5.1.x, relay-log.info has a new format. Now, its
2703     first line contains the number of lines in the file. By reading
2704     this number we can determine which version our master.info comes
2705     from. We can't simply count the lines in the file, since
2706     versions before 5.1.x could generate files with more lines than
2707     needed. If first line doesn't contain a number, or if it
2708     contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2709     then the file is treated like a file from pre-5.1.x version.
2710     There is no ambiguity when reading an old master.info: before
2711     5.1.x, the first line contained the binlog's name, which is
2712     either empty or has an extension (contains a '.'), so can't be
2713     confused with an integer.
2714 
2715     So we're just reading first line and trying to figure which
2716     version is this.
2717   */
2718 
2719   /*
2720     The first row is temporarily stored in mi->master_log_name, if
2721     it is line count and not binlog name (new format) it will be
2722     overwritten by the second row later.
2723   */
2724   if (from->prepare_info_for_read() ||
2725       from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2726                      (char *) ""))
2727     DBUG_RETURN(TRUE);
2728 
2729   lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2730 
2731   if (group_relay_log_name[0]!='\0' &&
2732       *first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2733   {
2734     /* Seems to be new format => read group relay log name */
2735     if (from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2736                        (char *) ""))
2737       DBUG_RETURN(TRUE);
2738   }
2739   else
2740      DBUG_PRINT("info", ("relay_log_info file is in old format."));
2741 
2742   if (from->get_info(&temp_group_relay_log_pos,
2743                      (ulong) BIN_LOG_HEADER_SIZE) ||
2744       from->get_info(temp_group_master_log_name,
2745                      sizeof(temp_group_master_log_name),
2746                      (char *) "") ||
2747       from->get_info(&temp_group_master_log_pos,
2748                      0UL))
2749     DBUG_RETURN(TRUE);
2750 
2751   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2752   {
2753     if (from->get_info(&temp_sql_delay, 0))
2754       DBUG_RETURN(TRUE);
2755   }
2756 
2757   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2758   {
2759     if (from->get_info(&recovery_parallel_workers, 0UL))
2760       DBUG_RETURN(TRUE);
2761   }
2762 
2763   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2764   {
2765     if (from->get_info(&temp_internal_id, 1))
2766       DBUG_RETURN(TRUE);
2767   }
2768 
2769   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL)
2770   {
2771     /* the default value is empty string"" */
2772     if (from->get_info(channel, sizeof(channel), (char*)""))
2773       DBUG_RETURN(TRUE);
2774   }
2775 
2776   group_relay_log_pos=  temp_group_relay_log_pos;
2777   set_group_master_log_name(temp_group_master_log_name);
2778   set_group_master_log_pos(temp_group_master_log_pos);
2779   sql_delay= (int32) temp_sql_delay;
2780   internal_id= (uint) temp_internal_id;
2781 
2782   assert(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2783          (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
2784   DBUG_RETURN(FALSE);
2785 }
2786 
set_info_search_keys(Rpl_info_handler * to)2787 bool Relay_log_info::set_info_search_keys(Rpl_info_handler *to)
2788 {
2789   DBUG_ENTER("Relay_log_info::set_info_search_keys");
2790 
2791   if (to->set_info(LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL, channel))
2792     DBUG_RETURN(TRUE);
2793 
2794   DBUG_RETURN(FALSE);
2795 }
2796 
2797 
write_info(Rpl_info_handler * to)2798 bool Relay_log_info::write_info(Rpl_info_handler *to)
2799 {
2800   DBUG_ENTER("Relay_log_info::write_info");
2801 
2802   /*
2803     @todo Uncomment the following assertion. See todo in
2804     Relay_log_info::read_info() for details. /Sven
2805   */
2806   //assert(!belongs_to_client());
2807 
2808   if (to->prepare_info_for_write() ||
2809       to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2810       to->set_info(group_relay_log_name) ||
2811       to->set_info((ulong) group_relay_log_pos) ||
2812       to->set_info(get_group_master_log_name()) ||
2813       to->set_info((ulong) get_group_master_log_pos()) ||
2814       to->set_info((int) sql_delay) ||
2815       to->set_info(recovery_parallel_workers) ||
2816       to->set_info((int) internal_id) ||
2817       to->set_info(channel))
2818     DBUG_RETURN(TRUE);
2819 
2820   DBUG_RETURN(FALSE);
2821 }
2822 
2823 /**
2824    The method is run by SQL thread/MTS Coordinator.
2825    It replaces the current FD event with a new one.
2826    A version adaptation routine is invoked for the new FD
2827    to align the slave applier execution context with the master version.
2828 
2829    Since FD are shared by Coordinator and Workers in the MTS mode,
2830    deletion of the old FD is done through decrementing its usage counter.
2831    The destructor runs when the later drops to zero,
2832    also see @c Slave_worker::set_rli_description_event().
2833    The usage counter of the new FD is incremented.
2834 
2835    Although notice that MTS worker runs it, inefficiently (see assert),
2836    once at its destruction time.
2837 
2838    @param  a pointer to be installed into execution context
2839            FormatDescriptor event
2840 */
2841 
set_rli_description_event(Format_description_log_event * fe)2842 void Relay_log_info::set_rli_description_event(Format_description_log_event *fe)
2843 {
2844   DBUG_ENTER("Relay_log_info::set_rli_description_event");
2845   assert(!info_thd || !is_mts_worker(info_thd) || !fe);
2846 
2847   if (fe)
2848   {
2849     ulong fe_version= adapt_to_master_version(fe);
2850 
2851     if (info_thd)
2852     {
2853       // See rpl_rli_pdb.h:Slave_worker::set_rli_description_event.
2854       if (!is_in_group() &&
2855           (info_thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
2856            info_thd->variables.gtid_next.type == UNDEFINED_GROUP))
2857       {
2858         DBUG_PRINT("info", ("Setting gtid_next.type to NOT_YET_DETERMINED_GROUP"));
2859         info_thd->variables.gtid_next.set_not_yet_determined();
2860       }
2861 
2862       if (is_parallel_exec() && fe_version > 0)
2863       {
2864         /*
2865           Prepare for workers' adaption to a new FD version. Workers
2866           will see notification through scheduling of a first event of
2867           a new post-new-FD.
2868         */
2869         for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
2870           (*it)->fd_change_notified= false;
2871       }
2872     }
2873   }
2874   if (rli_description_event &&
2875       rli_description_event->usage_counter.atomic_add(-1) == 1)
2876     delete rli_description_event;
2877 #ifndef NDEBUG
2878   else
2879     /* It must be MTS mode when the usage counter greater than 1. */
2880     assert(!rli_description_event || is_parallel_exec());
2881 #endif
2882   rli_description_event= fe;
2883   if (rli_description_event)
2884     rli_description_event->usage_counter.atomic_add(1);
2885 
2886   DBUG_VOID_RETURN;
2887 }
2888 
2889 struct st_feature_version
2890 {
2891   /*
2892     The enum must be in the version non-descending top-down order,
2893     the last item formally corresponds to highest possible server
2894     version (never reached, thereby no adapting actions here);
2895     enumeration starts from zero.
2896   */
2897   enum
2898   {
2899     WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
2900     _END_OF_LIST // always last
2901   } item;
2902   /*
2903     Version where the feature is introduced.
2904   */
2905   uchar version_split[3];
2906   /*
2907     Action to perform when according to FormatDescriptor event Master
2908     is found to be feature-aware while previously it has *not* been.
2909   */
2910   void (*upgrade) (THD*);
2911   /*
2912     Action to perform when according to FormatDescriptor event Master
2913     is found to be feature-*un*aware while previously it has been.
2914   */
2915   void (*downgrade) (THD*);
2916 };
2917 
wl6292_upgrade_func(THD * thd)2918 void wl6292_upgrade_func(THD *thd)
2919 {
2920   thd->variables.explicit_defaults_for_timestamp= false;
2921   if (global_system_variables.explicit_defaults_for_timestamp)
2922     thd->variables.explicit_defaults_for_timestamp= true;
2923 
2924   return;
2925 }
2926 
wl6292_downgrade_func(THD * thd)2927 void wl6292_downgrade_func(THD *thd)
2928 {
2929   if (global_system_variables.explicit_defaults_for_timestamp)
2930     thd->variables.explicit_defaults_for_timestamp= false;
2931 
2932   return;
2933 }
2934 
2935 /**
2936    Sensitive to Master-vs-Slave version difference features
2937    should be listed in the version non-descending order.
2938 */
2939 static st_feature_version s_features[]=
2940 {
2941   // order is the same as in the enum
2942   { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
2943     {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
2944   { st_feature_version::_END_OF_LIST,
2945     {255, 255, 255}, NULL, NULL }
2946 };
2947 
2948 /**
2949    The method computes the incoming "master"'s FD server version and that
2950    of the currently installed (if ever) rli_description_event, to
2951    invoke more specific method to compare the two and adapt slave applier execution
2952    context to the new incoming master's version.
2953 
2954    This method is specifically for STS applier/MTS Coordinator as well as
2955    for a user thread applying binlog events.
2956 
2957    @param  fdle  a pointer to new Format Description event that is being
2958                  set up a new execution context.
2959    @return 0                when the versions are equal,
2960            master_version   otherwise
2961 */
adapt_to_master_version(Format_description_log_event * fdle)2962 ulong Relay_log_info::adapt_to_master_version(Format_description_log_event *fdle)
2963 {
2964   ulong master_version, current_version, slave_version;
2965 
2966   slave_version= version_product(slave_version_split);
2967   /* When rli_description_event is uninitialized yet take the slave's version */
2968   master_version= !fdle ? slave_version : fdle->get_product_version();
2969   current_version= !rli_description_event ? slave_version :
2970     rli_description_event->get_product_version();
2971 
2972   return adapt_to_master_version_updown(master_version, current_version);
2973 }
2974 
2975 /**
2976   The method compares two supplied versions and carries out down- or
2977   up- grade customization of execution context of the slave applier
2978   (thd).
2979 
2980   The method is invoked in the STS case through
2981   Relay_log_info::adapt_to_master_version() right before a new master
2982   FD is installed into the applier execution context; in the MTS
2983   case it's done by the Worker when it's assigned with a first event
2984   after the latest new FD has been installed.
2985 
2986   Comparison of the current (old, existing) and the master (new,
2987   incoming) versions yields adaptive actions.
2988   To explain that, let's denote V_0 as the current, and the master's
2989   one as V_1.
2990   In the downgrade case (V_1 < V_0) a server feature that is undefined
2991   in V_1 but is defined starting from some V_f of [V_1 + 1, V_0] range
2992   (+1 to mean V_1 excluded) are invalidated ("removed" from execution context)
2993   by running so called here downgrade action.
2994   Conversely in the upgrade case a feature defined in [V_0 + 1, V_1] range
2995   is validated ("added" to execution context) by running its upgrade action.
2996   A typical use case showing how adaptive actions are necessary for the slave
2997   applier is when the master version is lesser than the slave's one.
2998   In such case events generated on the "older" master may need to be applied
2999   in their native server context. And such context can be provided by downgrade
3000   actions.
3001   Conversely, when the old master events are run out and a newer master's events
3002   show up for applying, the execution context will be upgraded through
3003   the namesake actions.
3004 
3005   Notice that a relay log may have two FD events, one the slave local
3006   and the other from the Master. As there's no concern for the FD
3007   originator this leads to two adapt_to_master_version() calls.
3008   It's not harmful as can be seen from the following example.
3009   Say the currently installed FD's version is
3010   V_m, then at relay-log rotation the following transition takes
3011   place:
3012 
3013      V_m  -adapt-> V_s -adapt-> V_m.
3014 
3015   here and further `m' subscript stands for the master, `s' for the slave.
3016   It's clear that in this case an ineffective V_m -> V_m transition occurs.
3017 
3018   At composing downgrade/upgrade actions keep in mind that the slave applier
3019   version transition goes the following route:
3020   The initial version is that of the slave server (V_ss).
3021   It changes to a magic 4.0 at the slave relay log initialization.
3022   In the following course versions are extracted from each FD read out,
3023   regardless of what server generated it. Here is a typical version
3024   transition sequence underscored with annotation:
3025 
3026    V_ss -> 4.0 -> V(FD_s^1) -> V(FD_m^2)   --->   V(FD_s^3) -> V(FD_m^4)  ...
3027 
3028     ----------     -----------------     --------  ------------------     ---
3029      bootstrap       1st relay log       rotation      2nd log            etc
3030 
  The superscript (^) enumerates Format Description events, V(FD^i) stands
  for a function extracting the version data from the i:th FD.
3033 
3034   There won't be any action to execute when info_thd is undefined,
3035   e.g at bootstrap.
3036 
3037   @param  master_version   an upcoming new version
3038   @param  current_version  the current version
3039   @return 0                when the new version is equal to the current one,
3040           master_version   otherwise
3041 */
adapt_to_master_version_updown(ulong master_version,ulong current_version)3042 ulong Relay_log_info::adapt_to_master_version_updown(ulong master_version,
3043                                                      ulong current_version)
3044 {
3045   THD *thd= info_thd;
3046   /*
3047     When the SQL thread or MTS Coordinator executes this method
3048     there's a constraint on current_version argument.
3049   */
3050   assert(!thd ||
3051          thd->rli_fake != NULL ||
3052          thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER ||
3053          (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
3054           (!rli_description_event ||
3055            current_version ==
3056            rli_description_event->get_product_version())));
3057 
3058   if (master_version == current_version)
3059     return 0;
3060   else if (!thd)
3061     return master_version;
3062 
3063   bool downgrade= master_version < current_version;
3064    /*
3065     find item starting from and ending at for which adaptive actions run
3066     for downgrade or upgrade branches.
3067     (todo: convert into bsearch when number of features will grow significantly)
3068   */
3069   long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
3070 
3071   for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
3072   {
3073     ulong ver_f= version_product(s_features[i].version_split);
3074 
3075     if ((downgrade ? master_version : current_version) < ver_f &&
3076         i_first == st_feature_version::_END_OF_LIST)
3077       i_first= i;
3078     if ((downgrade ? current_version : master_version) < ver_f)
3079     {
3080       i_last= i;
3081       assert(i_last >= i_first);
3082       break;
3083     }
3084   }
3085 
3086   /*
3087      actions, executed in version non-descending st_feature_version order
3088   */
3089   for (i= i_first; i < i_last; i++)
3090   {
3091     /* Run time check of the st_feature_version items ordering */
3092     assert(!i ||
3093            version_product(s_features[i - 1].version_split) <=
3094            version_product(s_features[i].version_split));
3095 
3096     assert((downgrade ? master_version : current_version) <
3097            version_product(s_features[i].version_split) &&
3098            (downgrade ? current_version : master_version  >=
3099             version_product(s_features[i].version_split)));
3100 
3101     if (downgrade && s_features[i].downgrade)
3102     {
3103       s_features[i].downgrade(thd);
3104     }
3105     else if (s_features[i].upgrade)
3106     {
3107       s_features[i].upgrade(thd);
3108     }
3109   }
3110 
3111   return master_version;
3112 }
3113 
relay_log_number_to_name(uint number,char name[FN_REFLEN+1])3114 void Relay_log_info::relay_log_number_to_name(uint number,
3115                                               char name[FN_REFLEN+1])
3116 {
3117   char *str= NULL;
3118   char relay_bin_channel[FN_REFLEN+1];
3119   const char *relay_log_basename_channel=
3120     add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN+1,
3121                                   relay_log_basename);
3122 
3123   /* str points to closing null of relay log basename channel */
3124   str= strmake(name, relay_log_basename_channel, FN_REFLEN+1);
3125   *str++= '.';
3126   sprintf(str, "%06u", number);
3127 }
3128 
relay_log_name_to_number(const char * name)3129 uint Relay_log_info::relay_log_name_to_number(const char *name)
3130 {
3131   return static_cast<uint>(atoi(fn_ext(name)+1));
3132 }
3133 
is_mts_db_partitioned(Relay_log_info * rli)3134 bool is_mts_db_partitioned(Relay_log_info * rli)
3135 {
3136   return (rli->current_mts_submode->get_type() ==
3137     MTS_PARALLEL_TYPE_DB_NAME);
3138 }
3139 
get_for_channel_str(bool upper_case) const3140 const char* Relay_log_info::get_for_channel_str(bool upper_case) const
3141 {
3142   if (rli_fake)
3143     return "";
3144   else
3145     return mi->get_for_channel_str(upper_case);
3146 }
3147 
add_gtid_set(const Gtid_set * gtid_set)3148 enum_return_status Relay_log_info::add_gtid_set(const Gtid_set *gtid_set)
3149 {
3150   DBUG_ENTER("Relay_log_info::add_gtid_set(gtid_set)");
3151 
3152   enum_return_status return_status= this->gtid_set.add_gtid_set(gtid_set);
3153 
3154   DBUG_RETURN(return_status);
3155 }
3156 
detach_engine_ha_data(THD * thd)3157 void Relay_log_info::detach_engine_ha_data(THD *thd)
3158 {
3159   is_engine_ha_data_detached= true;
3160     /*
3161       In case of slave thread applier or processing binlog by client,
3162       detach the engine ha_data ("native" engine transaction)
3163       in favor of dynamically created.
3164     */
3165   plugin_foreach(thd, detach_native_trx,
3166                  MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3167 }
3168 
reattach_engine_ha_data(THD * thd)3169 void Relay_log_info::reattach_engine_ha_data(THD *thd)
3170 {
3171   is_engine_ha_data_detached = false;
3172   /*
3173     In case of slave thread applier or processing binlog by client,
3174     reattach the engine ha_data ("native" engine transaction)
3175     in favor of dynamically created.
3176   */
3177   plugin_foreach(thd, reattach_native_trx, MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3178 }
3179 
operator new(size_t request)3180 void* Relay_log_info::operator new(size_t request)
3181 {
3182   void* ptr;
3183   if (posix_memalign(&ptr, __alignof__(Relay_log_info), sizeof(Relay_log_info))) {
3184     throw std::bad_alloc();
3185   }
3186   return ptr;
3187 }
3188 
operator delete(void * ptr)3189 void Relay_log_info::operator delete(void * ptr) {
3190   free(ptr);
3191 }
3192