1 /* Copyright (c) 2006, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "sql_priv.h"
24 #include "unireg.h"                             // HAVE_*
25 #include "rpl_mi.h"
26 #include "rpl_rli.h"
27 #include "sql_base.h"                        // close_thread_tables
28 #include <my_dir.h>    // For MY_STAT
29 #include "log_event.h" // Format_description_log_event, Log_event,
30                        // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
31                        // PREFIX_SQL_LOAD
32 #include "rpl_slave.h"
33 #include "rpl_utility.h"
34 #include "transaction.h"
35 #include "sql_parse.h"                          // end_trans, ROLLBACK
36 #include "rpl_slave.h"
37 #include "rpl_rli_pdb.h"
38 #include "rpl_info_factory.h"
39 #include <mysql/plugin.h>
40 #include <mysql/service_thd_wait.h>
41 
42 using std::min;
43 using std::max;
44 
/*
  Names of the fields kept in the relay log info repository, listed in
  storage order. Whenever a new field is added to the relay log info,
  append its name here too. At present the array only serves to obtain
  the number of fields (its element count).
*/
const char* info_rli_fields[]=
{
  "number_of_lines",         /* 0 */
  "group_relay_log_name",    /* 1 */
  "group_relay_log_pos",     /* 2 */
  "group_master_log_name",   /* 3 */
  "group_master_log_pos",    /* 4 */
  "sql_delay",               /* 5 */
  "number_of_workers",       /* 6 */
  "id"                       /* 7 */
};
61 
/**
  Constructs the SQL-thread relay log info object.

  Fields receive their initial values in the member-initializer list. The
  body then:
  - registers the relay log's PSI instrumentation keys (only when
    HAVE_PSI_INTERFACE is defined),
  - clears the stored relay/master/until log names,
  - initializes the log-space, pending-jobs and exit-count mutexes and
    condition variables, and the temporary-tables rwlock.

  @param is_slave_recovery  stored in is_relay_log_recovery.
  @param param_key_info_*   PSI keys forwarded unchanged to the Rpl_info
                            base (these parameters exist only with
                            HAVE_PSI_INTERFACE).
  @param param_id           identifier forwarded to the Rpl_info base.
*/
Relay_log_info::Relay_log_info(bool is_slave_recovery
#ifdef HAVE_PSI_INTERFACE
                               ,PSI_mutex_key *param_key_info_run_lock,
                               PSI_mutex_key *param_key_info_data_lock,
                               PSI_mutex_key *param_key_info_sleep_lock,
                               PSI_mutex_key *param_key_info_data_cond,
                               PSI_mutex_key *param_key_info_start_cond,
                               PSI_mutex_key *param_key_info_stop_cond,
                               PSI_mutex_key *param_key_info_sleep_cond
#endif
                               , uint param_id
                              )
   :Rpl_info("SQL"
#ifdef HAVE_PSI_INTERFACE
             ,param_key_info_run_lock, param_key_info_data_lock,
             param_key_info_sleep_lock,
             param_key_info_data_cond, param_key_info_start_cond,
             param_key_info_stop_cond, param_key_info_sleep_cond
#endif
             , param_id
            ),
   /* Copies the server-wide setting into this object. */
   replicate_same_server_id(::replicate_same_server_id),
   /* -1: no relay log file is opened by this object yet. */
   cur_log_fd(-1), relay_log(&sync_relaylog_period),
   is_relay_log_recovery(is_slave_recovery),
   save_temporary_tables(0),
   cur_log_old_open_count(0), error_on_rli_init_info(false),
   group_relay_log_pos(0), event_relay_log_pos(0),
   group_master_log_pos(0),
   gtid_set(global_sid_map, global_sid_lock),
   log_space_total(0), ignore_log_space_limit(0),
   sql_force_rotate_relay(false),
   last_master_timestamp(0), slave_skip_counter(0),
   abort_pos_wait(0), until_condition(UNTIL_NONE),
   until_log_pos(0),
   until_sql_gtids(global_sid_map),
   until_sql_gtids_first_event(true),
   retried_trans(0),
   tables_to_lock(0), tables_to_lock_count(0),
   rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
   /* Multi-threaded slave (MTS) bookkeeping starts empty. */
   slave_parallel_workers(0),
   exit_counter(0),
   max_updated_index(0),
   recovery_parallel_workers(0), checkpoint_seqno(0),
   checkpoint_group(opt_mts_checkpoint_group),
   recovery_groups_inited(false), mts_recovery_group_cnt(0),
   mts_recovery_index(0), mts_recovery_group_seen_begin(0),
   mts_group_status(MTS_NOT_IN_GROUP), reported_unsafe_warning(false),
   rli_description_event(NULL),
   sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
   long_find_row_note_printed(false)
{
  DBUG_ENTER("Relay_log_info::Relay_log_info");

#ifdef HAVE_PSI_INTERFACE
  /* Instrumentation keys for the relay log's internal locks/conds/files. */
  relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
                         key_RELAYLOG_LOCK_commit,
                         key_RELAYLOG_LOCK_commit_queue,
                         key_RELAYLOG_LOCK_done,
                         key_RELAYLOG_LOCK_flush_queue,
                         key_RELAYLOG_LOCK_log,
                         key_RELAYLOG_LOCK_sync,
                         key_RELAYLOG_LOCK_sync_queue,
                         key_RELAYLOG_LOCK_xids,
                         key_RELAYLOG_COND_done,
                         key_RELAYLOG_update_cond,
                         key_RELAYLOG_prep_xids_cond,
                         key_file_relaylog,
                         key_file_relaylog_index);
#endif

  /* All stored file names start out empty. */
  group_relay_log_name[0]= event_relay_log_name[0]=
    group_master_log_name[0]= 0;
  until_log_name[0]= ign_master_log_name_end[0]= 0;
  set_timespec_nsec(last_clock, 0);
  /* The private read cache is unused until a relay log file is opened. */
  memset(&cache_buf, 0, sizeof(cache_buf));
  cached_charset_invalidate();

  /* Synchronization objects; each is destroyed in ~Relay_log_info(). */
  mysql_mutex_init(key_relay_log_info_log_space_lock,
                   &log_space_lock, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
  mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
                   MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
  mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
                   MY_MUTEX_INIT_FAST);
  my_atomic_rwlock_init(&slave_open_temp_tables_lock);

  relay_log.init_pthread_objects();
  /* Split this server's version string into slave_version_split. */
  do_server_version_split(::server_version, slave_version_split);
  last_retrieved_gtid.clear();
  force_flush_postponed_due_to_split_trans= false;
  DBUG_VOID_RETURN;
}
155 
156 /**
157    The method to invoke at slave threads start
158 */
init_workers(ulong n_workers)159 void Relay_log_info::init_workers(ulong n_workers)
160 {
161   /*
162     Parallel slave parameters initialization is done regardless
163     whether the feature is or going to be active or not.
164   */
165   mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
166   mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
167   mts_last_online_stat= 0;
168   my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
169 }
170 
171 /**
172    The method to invoke at slave threads stop
173 */
deinit_workers()174 void Relay_log_info::deinit_workers()
175 {
176   delete_dynamic(&workers);
177 }
178 
~Relay_log_info()179 Relay_log_info::~Relay_log_info()
180 {
181   DBUG_ENTER("Relay_log_info::~Relay_log_info");
182 
183   if (recovery_groups_inited)
184     bitmap_free(&recovery_groups);
185   mysql_mutex_destroy(&log_space_lock);
186   mysql_cond_destroy(&log_space_cond);
187   mysql_mutex_destroy(&pending_jobs_lock);
188   mysql_cond_destroy(&pending_jobs_cond);
189   mysql_mutex_destroy(&exit_count_lock);
190   my_atomic_rwlock_destroy(&slave_open_temp_tables_lock);
191   relay_log.cleanup();
192   set_rli_description_event(NULL);
193   last_retrieved_gtid.clear();
194 
195   DBUG_VOID_RETURN;
196 }
197 
198 /**
199    Method is called when MTS coordinator senses the relay-log name
200    has been changed.
201    It marks each Worker member with this fact to make an action
202    at time it will distribute a terminal event of a group to the Worker.
203 
204    Worker receives the new name at the group commiting phase
205    @c Slave_worker::slave_worker_ends_group().
206 */
reset_notified_relay_log_change()207 void Relay_log_info::reset_notified_relay_log_change()
208 {
209   if (!is_parallel_exec())
210     return;
211   for (uint i= 0; i < workers.elements; i++)
212   {
213     Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
214     w->relay_log_change_notified= FALSE;
215   }
216 }
217 
218 /**
219    This method is called in mts_checkpoint_routine() to mark that each
220    worker is required to adapt to a new checkpoint data whose coordinates
221    are passed to it through GAQ index.
222 
223    Worker notices the new checkpoint value at the group commit to reset
224    the current bitmap and starts using the clean bitmap indexed from zero
225    of being reset checkpoint_seqno.
226 
227     New seconds_behind_master timestamp is installed.
228 
229    @param shift          number of bits to shift by Worker due to the
230                          current checkpoint change.
231    @param new_ts         new seconds_behind_master timestamp value
232                          unless zero. Zero could be due to FD event
233                          or fake rotate event.
234    @param need_data_lock False if caller has locked @c data_lock
235 */
reset_notified_checkpoint(ulong shift,time_t new_ts,bool need_data_lock)236 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
237                                                bool need_data_lock)
238 {
239   /*
240     If this is not a parallel execution we return immediately.
241   */
242   if (!is_parallel_exec())
243     return;
244 
245   for (uint i= 0; i < workers.elements; i++)
246   {
247     Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
248     /*
249       Reseting the notification information in order to force workers to
250       assign jobs with the new updated information.
251       Notice that the bitmap_shifted is accumulated to indicate how many
252       consecutive jobs were successfully processed.
253 
254       The worker when assigning a new job will set the value back to
255       zero.
256     */
257     w->checkpoint_notified= FALSE;
258     w->bitmap_shifted= w->bitmap_shifted + shift;
259     /*
260       Zero shift indicates the caller rotates the master binlog.
261       The new name will be passed to W through the group descriptor
262       during the first post-rotation time scheduling.
263     */
264     if (shift == 0)
265       w->master_log_change_notified= false;
266 
267     DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
268                "worker->bitmap_shifted --> %lu, worker --> %u.",
269                shift, w->bitmap_shifted, i));
270   }
271   /*
272     There should not be a call where (shift == 0 && checkpoint_seqno != 0).
273 
274     Then the new checkpoint sequence is updated by subtracting the number
275     of consecutive jobs that were successfully processed.
276   */
277   DBUG_ASSERT(!(shift == 0 && checkpoint_seqno != 0));
278   checkpoint_seqno= checkpoint_seqno - shift;
279   DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
280              "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
281 
282   if (new_ts)
283   {
284     if (need_data_lock)
285       mysql_mutex_lock(&data_lock);
286     else
287       mysql_mutex_assert_owner(&data_lock);
288     last_master_timestamp= new_ts;
289     if (need_data_lock)
290       mysql_mutex_unlock(&data_lock);
291   }
292 }
293 
294 /**
295    Reset recovery info from Worker info table and
296    mark MTS recovery is completed.
297 
298    @return false on success true when @c reset_notified_checkpoint failed.
299 */
mts_finalize_recovery()300 bool Relay_log_info::mts_finalize_recovery()
301 {
302   bool ret= false;
303   uint i;
304   uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
305 
306   DBUG_ENTER("Relay_log_info::mts_finalize_recovery");
307 
308   for (i= 0; !ret && i < workers.elements; i++)
309   {
310     Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
311     ret= w->reset_recovery_info();
312     DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret= true;);
313   }
314   /*
315     The loop is traversed in the worker index descending order due
316     to specifics of the Worker table repository that does not like
317     even temporary holes. Therefore stale records are deleted
318     from the tail.
319   */
320   DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
321                   {DBUG_SET("+d,mts_worker_thread_init_fails");});
322 
323   for (i= recovery_parallel_workers; i > workers.elements && !ret; i--)
324   {
325     Slave_worker *w=
326       Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
327     /*
328       If an error occurs during the above create_worker call, the newly created
329       worker object gets deleted within the above function call itself and only
330       NULL is returned. Hence the following check has been added to verify
331       that a valid worker object exists.
332     */
333     if (w)
334     {
335       ret= w->remove_info();
336       delete w;
337     }
338     else
339     {
340       ret= true;
341       goto err;
342     }
343   }
344   recovery_parallel_workers= slave_parallel_workers;
345 
346 err:
347   DBUG_RETURN(ret);
348 }
349 
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)350 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
351 {
352   MY_STAT s;
353   DBUG_ENTER("add_relay_log");
354   mysql_mutex_assert_owner(&rli->log_space_lock);
355   if (!mysql_file_stat(key_file_relaylog,
356                        linfo->log_file_name, &s, MYF(0)))
357   {
358     sql_print_error("log %s listed in the index, but failed to stat.",
359                     linfo->log_file_name);
360     DBUG_RETURN(1);
361   }
362   rli->log_space_total += s.st_size;
363 #ifndef DBUG_OFF
364   char buf[22];
365   DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
366 #endif
367   DBUG_RETURN(0);
368 }
369 
count_relay_log_space()370 int Relay_log_info::count_relay_log_space()
371 {
372   LOG_INFO flinfo;
373   DBUG_ENTER("Relay_log_info::count_relay_log_space");
374   mysql_mutex_lock(&log_space_lock);
375   log_space_total= 0;
376   if (relay_log.find_log_pos(&flinfo, NullS, 1))
377   {
378     sql_print_error("Could not find first log while counting relay log space.");
379     mysql_mutex_unlock(&log_space_lock);
380     DBUG_RETURN(1);
381   }
382   do
383   {
384     if (add_relay_log(this, &flinfo))
385     {
386       mysql_mutex_unlock(&log_space_lock);
387       DBUG_RETURN(1);
388     }
389   } while (!relay_log.find_next_log(&flinfo, 1));
390   /*
391      As we have counted everything, including what may have written in a
392      preceding write, we must reset bytes_written, or we may count some space
393      twice.
394   */
395   relay_log.reset_bytes_written();
396   mysql_mutex_unlock(&log_space_lock);
397   DBUG_RETURN(0);
398 }
399 
400 /**
401    Resets UNTIL condition for Relay_log_info
402  */
403 
clear_until_condition()404 void Relay_log_info::clear_until_condition()
405 {
406   DBUG_ENTER("clear_until_condition");
407 
408   until_condition= Relay_log_info::UNTIL_NONE;
409   until_log_name[0]= 0;
410   until_log_pos= 0;
411   until_sql_gtids.clear();
412   until_sql_gtids_first_event= true;
413   DBUG_VOID_RETURN;
414 }
415 
/**
  Opens and initializes the given relay log for reading. Specifically:

  - Closes a relay log file this object previously opened, if any.
  - If the target is the relay log the IO thread is currently writing,
    points cur_log at the relay log's own IO_CACHE.
  - Otherwise opens the file into this object's private cache_buf.
  - Sets group_relay_log_pos and event_relay_log_pos to @p pos and seeks
    there (positions <= BIN_LOG_HEADER_SIZE leave the read position at the
    header end).

  @todo check proper initialization of
  group_master_log_name/group_master_log_pos. /alfranio

  @param log[in]    Name of the relay log file to read from. NULL = first
                    log listed in the index.
  @param pos[in]    Position in the relay log file.
  @param need_data_lock[in] If true, this function acquires this object's
                    data_lock itself; otherwise the caller must already
                    hold it.
  @param errmsg[out] On error, set to point to an error message (possibly
                    a function-static buffer).
  @param keep_looking_for_fd[in] If true, scan the events between the
                    file header and @p pos for a
                    Format_description_log_event. This is only needed when
                    the SQL thread starts on an existing relay log at an
                    offset > 4: the FD event must be read first so that the
                    following events can be parsed.

  Side effects: if relay_log_purge is off, log_space_limit is set to 0,
  and data_cond is broadcast on every exit path.

  @retval 0 ok,
  @retval 1 error.  In this case, *errmsg is set to point to the error
  message.
*/

int Relay_log_info::init_relay_log_pos(const char* log,
                                       ulonglong pos, bool need_data_lock,
                                       const char** errmsg,
                                       bool keep_looking_for_fd)
{
  DBUG_ENTER("Relay_log_info::init_relay_log_pos");
  DBUG_PRINT("info", ("pos: %lu", (ulong) pos));

  *errmsg=0;
  const char* errmsg_fmt= 0;
  /*
    NOTE(review): function-static buffer; *errmsg may point into it, so
    concurrent callers would share it.
  */
  static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
  mysql_mutex_t *log_lock= relay_log.get_log_lock();

  /* Lock order: data_lock first, then the relay log's log_lock. */
  if (need_data_lock)
    mysql_mutex_lock(&data_lock);
  else
    mysql_mutex_assert_owner(&data_lock);

  /*
    By default the relay log is in binlog format 3 (4.0).
    Even if format is 4, this will work enough to read the first event
    (Format_desc) (remember that format 4 is just lengthened compared to
    format 3; format 3 is a prefix of format 4).
  */
  set_rli_description_event(new Format_description_log_event(3));

  mysql_mutex_lock(log_lock);

  /* Close log file and free buffers if it's already open */
  if (cur_log_fd >= 0)
  {
    end_io_cache(&cache_buf);
    mysql_file_close(cur_log_fd, MYF(MY_WME));
    cur_log_fd = -1;
  }

  group_relay_log_pos= event_relay_log_pos= pos;

  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
  if (relay_log.find_log_pos(&linfo, NullS, 1))
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }

  /* With an explicit name, position linfo on that log instead. */
  if (log && relay_log.find_log_pos(&linfo, log, 1))
  {
    errmsg_fmt= "Could not find target log file mentioned in "
                "relay log info in the index file '%s' during "
                "relay log initialization";
    sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
    *errmsg= errmsg_buff;
    goto err;
  }

  strmake(group_relay_log_name, linfo.log_file_name,
          sizeof(group_relay_log_name) - 1);
  strmake(event_relay_log_name, linfo.log_file_name,
          sizeof(event_relay_log_name) - 1);

  if (relay_log.is_active(linfo.log_file_name))
  {
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
    my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
    if (check_binlog_magic(cur_log, errmsg))
      goto err;
    /* Remembered so a later reopen of the active log can be noticed. */
    cur_log_old_open_count=relay_log.get_open_count();
  }
  else
  {
    /*
      Open the relay log and set cur_log to point at this one
    */
    if ((cur_log_fd=open_binlog_file(&cache_buf,
                                     linfo.log_file_name,errmsg)) < 0)
      goto err;
    cur_log = &cache_buf;
  }
  /*
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
    sure.
  */
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
  {
    Log_event* ev;
    while (keep_looking_for_fd)
    {
      /*
        Read the possible Format_description_log_event; if position
        was 4, no need, it will be read naturally.
      */
      DBUG_PRINT("info",("looking for a Format_description_log_event"));

      if (my_b_tell(cur_log) >= pos)
        break;

      /*
        Because we hold data_lock and log_lock, we can safely read an
        event
      */
      if (!(ev= Log_event::read_log_event(cur_log, 0,
                                          rli_description_event,
                                          opt_slave_sql_verify_checksum)))
      {
        DBUG_PRINT("info",("could not read event, cur_log->error=%d",
                           cur_log->error));
        /*
          NOTE(review): the message names position 4, although the failed
          read may be at a later offset inside this loop.
        */
        if (cur_log->error) /* not EOF */
        {
          *errmsg= "I/O error reading event at position 4";
          goto err;
        }
        break;
      }
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
      {
        DBUG_PRINT("info",("found Format_description_log_event"));
        /* ev is handed over here and is therefore not deleted below. */
        set_rli_description_event((Format_description_log_event *)ev);
        /*
          As ev was returned by read_log_event, it has passed is_valid(), so
          my_malloc() in ctor worked, no need to check again.
        */
        /*
          Ok, we found a Format_description event. But it is not sure that this
          describes the whole relay log; indeed, one can have this sequence
          (starting from position 4):
          Format_desc (of slave)
          Previous-GTIDs (of slave IO thread, if GTIDs are enabled)
          Rotate (of master)
          Format_desc (of master)
          So the Format_desc which really describes the rest of the relay log
          can be the 3rd or the 4th event (depending on GTIDs being enabled or
          not, it can't be further than that, because we rotate
          the relay log when we queue a Rotate event from the master).
          But what describes the Rotate is the first Format_desc.
          So what we do is:
          go on searching for Format_description events, until you exceed the
          position (argument 'pos') or until you find an event other than
          Previous-GTIDs, Rotate or Format_desc.
        */
      }
      else
      {
        DBUG_PRINT("info",("found event of another type=%d",
                           ev->get_type_code()));
        /* Only Rotate / Previous-GTIDs may precede the describing FD. */
        keep_looking_for_fd=
          (ev->get_type_code() == ROTATE_EVENT ||
           ev->get_type_code() == PREVIOUS_GTIDS_LOG_EVENT);
        delete ev;
      }
    }
    my_b_seek(cur_log,(off_t)pos);
#ifndef DBUG_OFF
  {
    char llbuf1[22], llbuf2[22];
    DBUG_PRINT("info", ("my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
                        llstr(my_b_tell(cur_log),llbuf1),
                        llstr(get_event_relay_log_pos(),llbuf2)));
  }
#endif

  }

err:
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
  if (!relay_log_purge)
  {
    log_space_limit= 0; // todo: consider to throw a warning at least
  }
  mysql_cond_broadcast(&data_cond);

  /* Release in reverse acquisition order. */
  mysql_mutex_unlock(log_lock);

  if (need_data_lock)
    mysql_mutex_unlock(&data_lock);
  /*
    NOTE(review): rli_description_event is read here after data_lock may
    already have been released.
  */
  if (!rli_description_event->is_valid() && !*errmsg)
    *errmsg= "Invalid Format_description log event; could be out of memory";

  DBUG_RETURN ((*errmsg) ? 1 : 0);
}
636 
637 /**
638   Waits until the SQL thread reaches (has executed up to) the
639   log/position or timed out.
640 
641   SYNOPSIS
642   @param[in]  thd             client thread that sent @c SELECT @c MASTER_POS_WAIT,
643   @param[in]  log_name        log name to wait for,
644   @param[in]  log_pos         position to wait for,
645   @param[in]  timeout         @c timeout in seconds before giving up waiting.
646                               @c timeout is double whereas it should be ulong; but this is
647                               to catch if the user submitted a negative timeout.
648 
649   @retval  -2   improper arguments (log_pos<0)
650                 or slave not running, or master info changed
651                 during the function's execution,
652                 or client thread killed. -2 is translated to NULL by caller,
653   @retval  -1   timed out
654   @retval  >=0  number of log events the function had to wait
655                 before reaching the desired log/position
656  */
657 
wait_for_pos(THD * thd,String * log_name,longlong log_pos,double timeout)658 int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
659                                     longlong log_pos,
660                                     double timeout)
661 {
662   int event_count = 0;
663   ulong init_abort_pos_wait;
664   int error=0;
665   struct timespec abstime; // for timeout checking
666   PSI_stage_info old_stage;
667   DBUG_ENTER("Relay_log_info::wait_for_pos");
668 
669   if (!inited)
670     DBUG_RETURN(-2);
671 
672   DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
673                       log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));
674 
675   set_timespec_nsec(abstime, (ulonglong) timeout * 1000000000ULL);
676   mysql_mutex_lock(&data_lock);
677   thd->ENTER_COND(&data_cond, &data_lock,
678                   &stage_waiting_for_the_slave_thread_to_advance_position,
679                   &old_stage);
680   /*
681      This function will abort when it notices that some CHANGE MASTER or
682      RESET MASTER has changed the master info.
683      To catch this, these commands modify abort_pos_wait ; We just monitor
684      abort_pos_wait and see if it has changed.
685      Why do we have this mechanism instead of simply monitoring slave_running
686      in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
687      the SQL thread be stopped?
688      This is becasue if someones does:
689      STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
690      the change may happen very quickly and we may not notice that
691      slave_running briefly switches between 1/0/1.
692   */
693   init_abort_pos_wait= abort_pos_wait;
694 
695   /*
696     We'll need to
697     handle all possible log names comparisons (e.g. 999 vs 1000).
698     We use ulong for string->number conversion ; this is no
699     stronger limitation than in find_uniq_filename in sql/log.cc
700   */
701   ulong log_name_extension;
702   char log_name_tmp[FN_REFLEN]; //make a char[] from String
703 
704   strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));
705 
706   char *p= fn_ext(log_name_tmp);
707   char *p_end;
708   if (!*p || log_pos<0)
709   {
710     error= -2; //means improper arguments
711     goto err;
712   }
713   // Convert 0-3 to 4
714   log_pos= max<ulong>(log_pos, BIN_LOG_HEADER_SIZE);
715   /* p points to '.' */
716   log_name_extension= strtoul(++p, &p_end, 10);
717   /*
718     p_end points to the first invalid character.
719     If it equals to p, no digits were found, error.
720     If it contains '\0' it means conversion went ok.
721   */
722   if (p_end==p || *p_end)
723   {
724     error= -2;
725     goto err;
726   }
727 
728   /* The "compare and wait" main loop */
729   while (!thd->killed &&
730          init_abort_pos_wait == abort_pos_wait &&
731          slave_running)
732   {
733     bool pos_reached;
734     int cmp_result= 0;
735 
736     DBUG_PRINT("info",
737                ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
738                 init_abort_pos_wait, abort_pos_wait));
739     DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
740                        group_master_log_name, (ulong) group_master_log_pos));
741 
742     /*
743       group_master_log_name can be "", if we are just after a fresh
744       replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
745       (before we have executed one Rotate event from the master) or
746       (rare) if the user is doing a weird slave setup (see next
747       paragraph).  If group_master_log_name is "", we assume we don't
748       have enough info to do the comparison yet, so we just wait until
749       more data. In this case master_log_pos is always 0 except if
750       somebody (wrongly) sets this slave to be a slave of itself
751       without using --replicate-same-server-id (an unsupported
752       configuration which does nothing), then group_master_log_pos
753       will grow and group_master_log_name will stay "".
754     */
755     if (*group_master_log_name)
756     {
757       char *basename= (group_master_log_name +
758                        dirname_length(group_master_log_name));
759       /*
760         First compare the parts before the extension.
761         Find the dot in the master's log basename,
762         and protect against user's input error :
763         if the names do not match up to '.' included, return error
764       */
765       char *q= (char*)(fn_ext(basename)+1);
766       if (strncmp(basename, log_name_tmp, (int)(q-basename)))
767       {
768         error= -2;
769         break;
770       }
771       // Now compare extensions.
772       char *q_end;
773       ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
774       if (group_master_log_name_extension < log_name_extension)
775         cmp_result= -1 ;
776       else
777         cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
778 
779       pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
780                     cmp_result > 0);
781       if (pos_reached || thd->killed)
782         break;
783     }
784 
785     //wait for master update, with optional timeout.
786 
787     DBUG_PRINT("info",("Waiting for master update"));
788     /*
789       We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
790       will wake us up.
791     */
792     thd_wait_begin(thd, THD_WAIT_BINLOG);
793     if (timeout > 0)
794     {
795       /*
796         Note that mysql_cond_timedwait checks for the timeout
797         before for the condition ; i.e. it returns ETIMEDOUT
798         if the system time equals or exceeds the time specified by abstime
799         before the condition variable is signaled or broadcast, _or_ if
800         the absolute time specified by abstime has already passed at the time
801         of the call.
802         For that reason, mysql_cond_timedwait will do the "timeoutting" job
803         even if its condition is always immediately signaled (case of a loaded
804         master).
805       */
806       error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
807     }
808     else
809       mysql_cond_wait(&data_cond, &data_lock);
810     thd_wait_end(thd);
811     DBUG_PRINT("info",("Got signal of master update or timed out"));
812     if (error == ETIMEDOUT || error == ETIME)
813     {
814 #ifndef DBUG_OFF
815       /*
816         Doing this to generate a stack trace and make debugging
817         easier.
818       */
819       if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
820         DBUG_ASSERT(0);
821 #endif
822       error= -1;
823       break;
824     }
825     error=0;
826     event_count++;
827     DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
828   }
829 
830 err:
831   thd->EXIT_COND(&old_stage);
832   DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
833 improper_arguments: %d  timed_out: %d",
834                      thd->killed_errno(),
835                      (int) (init_abort_pos_wait != abort_pos_wait),
836                      (int) slave_running,
837                      (int) (error == -2),
838                      (int) (error == -1)));
839   if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
840       !slave_running)
841   {
842     error= -2;
843   }
844   DBUG_RETURN( error ? error : event_count );
845 }
846 
847 /*
848   TODO: This is a duplicated code that needs to be simplified.
849   This will be done while developing all possible sync options.
850   See WL#3584's specification.
851 
852   /Alfranio
853 */
wait_for_gtid_set(THD * thd,String * gtid,double timeout)854 int Relay_log_info::wait_for_gtid_set(THD* thd, String* gtid,
855                                       double timeout)
856 {
857   int event_count = 0;
858   ulong init_abort_pos_wait;
859   int error=0;
860   struct timespec abstime; // for timeout checking
861   PSI_stage_info old_stage;
862   DBUG_ENTER("Relay_log_info::wait_for_gtid_set");
863 
864   if (!inited)
865     DBUG_RETURN(-2);
866 
867   DBUG_PRINT("info", ("Waiting for %s timeout %f", gtid->c_ptr_safe(),
868              timeout));
869 
870   set_timespec_nsec(abstime, (ulonglong) timeout * 1000000000ULL);
871   mysql_mutex_lock(&data_lock);
872   thd->ENTER_COND(&data_cond, &data_lock,
873                   &stage_waiting_for_the_slave_thread_to_advance_position,
874                   &old_stage);
875   /*
876      This function will abort when it notices that some CHANGE MASTER or
877      RESET MASTER has changed the master info.
878      To catch this, these commands modify abort_pos_wait ; We just monitor
879      abort_pos_wait and see if it has changed.
880      Why do we have this mechanism instead of simply monitoring slave_running
881      in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
882      the SQL thread be stopped?
883      This is becasue if someones does:
884      STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
885      the change may happen very quickly and we may not notice that
886      slave_running briefly switches between 1/0/1.
887   */
888   init_abort_pos_wait= abort_pos_wait;
889   Gtid_set wait_gtid_set(global_sid_map);
890   global_sid_lock->rdlock();
891   if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
892   {
893     global_sid_lock->unlock();
894     goto err;
895   }
896   global_sid_lock->unlock();
897 
898   /* The "compare and wait" main loop */
899   while (!thd->killed &&
900          init_abort_pos_wait == abort_pos_wait &&
901          slave_running)
902   {
903     DBUG_PRINT("info",
904                ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
905                 init_abort_pos_wait, abort_pos_wait));
906 
907     //wait for master update, with optional timeout.
908 
909     global_sid_lock->wrlock();
910     const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
911     const Owned_gtids* owned_gtids= gtid_state->get_owned_gtids();
912 
913     DBUG_PRINT("info", ("Waiting for '%s'. is_subset: %d and "
914                         "!is_intersection_nonempty: %d",
915       gtid->c_ptr_safe(), wait_gtid_set.is_subset(logged_gtids),
916       !owned_gtids->is_intersection_nonempty(&wait_gtid_set)));
917     logged_gtids->dbug_print("gtid_executed:");
918     owned_gtids->dbug_print("owned_gtids:");
919 
920     /*
921       Since commit is performed after log to binary log, we must also
922       check if any GTID of wait_gtid_set is not yet committed.
923     */
924     if (wait_gtid_set.is_subset(logged_gtids) &&
925         !owned_gtids->is_intersection_nonempty(&wait_gtid_set))
926     {
927       global_sid_lock->unlock();
928       break;
929     }
930     global_sid_lock->unlock();
931 
932     DBUG_PRINT("info",("Waiting for master update"));
933 
934     /*
935       We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
936       will wake us up.
937     */
938     thd_wait_begin(thd, THD_WAIT_BINLOG);
939     if (timeout > 0)
940     {
941       /*
942         Note that mysql_cond_timedwait checks for the timeout
943         before for the condition ; i.e. it returns ETIMEDOUT
944         if the system time equals or exceeds the time specified by abstime
945         before the condition variable is signaled or broadcast, _or_ if
946         the absolute time specified by abstime has already passed at the time
947         of the call.
948         For that reason, mysql_cond_timedwait will do the "timeoutting" job
949         even if its condition is always immediately signaled (case of a loaded
950         master).
951       */
952       error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
953     }
954     else
955       mysql_cond_wait(&data_cond, &data_lock);
956     thd_wait_end(thd);
957     DBUG_PRINT("info",("Got signal of master update or timed out"));
958     if (error == ETIMEDOUT || error == ETIME)
959     {
960 #ifndef DBUG_OFF
961       /*
962         Doing this to generate a stack trace and make debugging
963         easier.
964       */
965       if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
966         DBUG_ASSERT(0);
967 #endif
968       error= -1;
969       break;
970     }
971     error=0;
972     event_count++;
973     DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
974   }
975 
976 err:
977   thd->EXIT_COND(&old_stage);
978   DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
979 improper_arguments: %d  timed_out: %d",
980                      thd->killed_errno(),
981                      (int) (init_abort_pos_wait != abort_pos_wait),
982                      (int) slave_running,
983                      (int) (error == -2),
984                      (int) (error == -1)));
985   if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
986       !slave_running)
987   {
988     error= -2;
989   }
990   DBUG_RETURN( error ? error : event_count );
991 }
992 
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock)993 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
994                                             bool need_data_lock)
995 {
996   int error= 0;
997   DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
998 
999   if (need_data_lock)
1000     mysql_mutex_lock(&data_lock);
1001   else
1002     mysql_mutex_assert_owner(&data_lock);
1003 
1004   inc_event_relay_log_pos();
1005   group_relay_log_pos= event_relay_log_pos;
1006   strmake(group_relay_log_name,event_relay_log_name,
1007           sizeof(group_relay_log_name)-1);
1008 
1009   notify_group_relay_log_name_update();
1010 
1011   /*
1012     In 4.x we used the event's len to compute the positions here. This is
1013     wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1014     then the event's len is not what is was in the master's binlog, so this
1015     will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1016     replication: Exec_master_log_pos is wrong). Only way to solve this is to
1017     have the original offset of the end of the event the relay log. This is
1018     what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1019     of log_pos in 4.0 was to compute the end_log_pos; so better to store
1020     end_log_pos instead of begin_log_pos.
1021     If we had not done this fix here, the problem would also have appeared
1022     when the slave and master are 5.0 but with different event length (for
1023     example the slave is more recent than the master and features the event
1024     UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1025     SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1026     value which would lead to badly broken replication.
1027     Even the relay_log_pos will be corrupted in this case, because the len is
1028     the relay log is not "val".
1029     With the end_log_pos solution, we avoid computations involving lengthes.
1030   */
1031   DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
1032                       (long) log_pos, (long) group_master_log_pos));
1033 
1034   if (log_pos > 0)  // 3.23 binlogs don't have log_posx
1035     group_master_log_pos= log_pos;
1036 
1037   /*
1038     In MTS mode FD or Rotate event commit their solitary group to
1039     Coordinator's info table. Callers make sure that Workers have been
1040     executed all assignements.
1041     Broadcast to master_pos_wait() waiters should be done after
1042     the table is updated.
1043   */
1044   DBUG_ASSERT(!is_parallel_exec() ||
1045               mts_group_status != Relay_log_info::MTS_IN_GROUP);
1046   /*
1047     We do not force synchronization at this point, note the
1048     parameter false, because a non-transactional change is
1049     being committed.
1050 
1051     For that reason, the synchronization here is subjected to
1052     the option sync_relay_log_info.
1053 
1054     See sql/rpl_rli.h for further information on this behavior.
1055   */
1056   error= flush_info(FALSE);
1057 
1058   mysql_cond_broadcast(&data_cond);
1059   if (need_data_lock)
1060     mysql_mutex_unlock(&data_lock);
1061   DBUG_RETURN(error);
1062 }
1063 
1064 
close_temporary_tables()1065 void Relay_log_info::close_temporary_tables()
1066 {
1067   TABLE *table,*next;
1068   DBUG_ENTER("Relay_log_info::close_temporary_tables");
1069 
1070   for (table=save_temporary_tables ; table ; table=next)
1071   {
1072     next=table->next;
1073     /*
1074       Don't ask for disk deletion. For now, anyway they will be deleted when
1075       slave restarts, but it is a better intention to not delete them.
1076     */
1077     DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1078     close_temporary(table, 1, 0);
1079   }
1080   save_temporary_tables= 0;
1081   slave_open_temp_tables= 0;
1082   DBUG_VOID_RETURN;
1083 }
1084 
1085 /**
1086   Purges relay logs. It assumes to have a run lock on rli and that no
1087   slave thread are running.
1088 
1089   @param[in]   THD         connection,
1090   @param[in]   just_reset  if false, it tells that logs should be purged
1091                            and @c init_relay_log_pos() should be called,
1092   @errmsg[out] errmsg      store pointer to an error message.
1093 
1094   @retval 0 successfuly executed,
1095   @retval 1 otherwise error, where errmsg is set to point to the error message.
1096 */
1097 
purge_relay_logs(THD * thd,bool just_reset,const char ** errmsg)1098 int Relay_log_info::purge_relay_logs(THD *thd, bool just_reset,
1099                                      const char** errmsg)
1100 {
1101   int error=0;
1102   const char *ln;
1103   char name_buf[FN_REFLEN];
1104   DBUG_ENTER("Relay_log_info::purge_relay_logs");
1105 
1106   /*
1107     Even if inited==0, we still try to empty master_log_* variables. Indeed,
1108     inited==0 does not imply that they already are empty.
1109 
1110     It could be that slave's info initialization partly succeeded: for example
1111     if relay-log.info existed but *relay-bin*.* have been manually removed,
1112     init_info reads the old relay-log.info and fills rli->master_log_*, then
1113     init_info checks for the existence of the relay log, this fails and
1114     init_info leaves inited to 0.
1115     In that pathological case, master_log_pos* will be properly reinited at
1116     the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1117     purge_relay_logs, will delete bogus *.info files or replace them with
1118     correct files), however if the user does SHOW SLAVE STATUS before START
1119     SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1120     master_log_* for SHOW SLAVE STATUS to display fine in any case.
1121   */
1122   group_master_log_name[0]= 0;
1123   group_master_log_pos= 0;
1124 
1125   if (!inited)
1126   {
1127     DBUG_PRINT("info", ("inited == 0"));
1128     if (error_on_rli_init_info)
1129     {
1130       ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
1131 				  1, name_buf);
1132       if (relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1133       {
1134 	sql_print_error("Unable to purge relay log files. Failed to open relay "
1135 			"log index file:%s.", relay_log.get_index_fname());
1136 
1137 	DBUG_RETURN(1);
1138       }
1139       mysql_mutex_lock(&mi->data_lock);
1140       if (relay_log.open_binlog(ln, 0, SEQ_READ_APPEND,
1141 				(max_relay_log_size ? max_relay_log_size :
1142 				 max_binlog_size), true,
1143 				true/*need_lock_log=true*/,
1144 				true/*need_lock_index=true*/,
1145 				true/*need_sid_lock=true*/,
1146 				mi->get_mi_description_event()))
1147       {
1148 	sql_print_error("Unable to purge relay log files. Failed to open relay "
1149 			"log file:%s.", relay_log.get_log_fname());
1150         mysql_mutex_unlock(&mi->data_lock);
1151 	DBUG_RETURN(1);
1152       }
1153       mysql_mutex_unlock(&mi->data_lock);
1154     }
1155     else
1156       DBUG_RETURN(0);
1157   }
1158   else
1159   {
1160     DBUG_ASSERT(slave_running == 0);
1161     DBUG_ASSERT(mi->slave_running == 0);
1162   }
1163 
1164   slave_skip_counter= 0;
1165   mysql_mutex_lock(&data_lock);
1166 
1167   /*
1168     we close the relay log fd possibly left open by the slave SQL thread,
1169     to be able to delete it; the relay log fd possibly left open by the slave
1170     I/O thread will be closed naturally in reset_logs() by the
1171     close(LOG_CLOSE_TO_BE_OPENED) call
1172   */
1173   if (cur_log_fd >= 0)
1174   {
1175     end_io_cache(&cache_buf);
1176     my_close(cur_log_fd, MYF(MY_WME));
1177     cur_log_fd= -1;
1178   }
1179 
1180   if (relay_log.reset_logs(thd))
1181   {
1182     *errmsg = "Failed during log reset";
1183     error=1;
1184     goto err;
1185   }
1186   /* Save name of used relay log file */
1187   strmake(group_relay_log_name, relay_log.get_log_fname(),
1188           sizeof(group_relay_log_name)-1);
1189   strmake(event_relay_log_name, relay_log.get_log_fname(),
1190           sizeof(event_relay_log_name)-1);
1191   group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1192   if (count_relay_log_space())
1193   {
1194     *errmsg= "Error counting relay log space";
1195     error= 1;
1196     goto err;
1197   }
1198   if (!just_reset)
1199     error= init_relay_log_pos(group_relay_log_name,
1200                               group_relay_log_pos,
1201                               false/*need_data_lock=false*/, errmsg, 0);
1202 
1203   if (!inited && error_on_rli_init_info)
1204     relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1205                     true/*need_lock_log=true*/,
1206                     true/*need_lock_index=true*/);
1207 
1208 err:
1209 #ifndef DBUG_OFF
1210   char buf[22];
1211 #endif
1212   DBUG_PRINT("info",("log_space_total: %s",llstr(log_space_total,buf)));
1213   mysql_mutex_unlock(&data_lock);
1214   DBUG_RETURN(error);
1215 }
1216 
1217 
1218 /**
1219      Checks if condition stated in UNTIL clause of START SLAVE is reached.
1220 
1221      Specifically, it checks if UNTIL condition is reached. Uses caching result
1222      of last comparison of current log file name and target log file name. So
1223      cached value should be invalidated if current log file name changes (see
1224      @c Relay_log_info::notify_... functions).
1225 
1226      This caching is needed to avoid of expensive string comparisons and
1227      @c strtol() conversions needed for log names comparison. We don't need to
1228      compare them each time this function is called, we only need to do this
1229      when current log name changes. If we have @c UNTIL_MASTER_POS condition we
1230      need to do this only after @c Rotate_log_event::do_apply_event() (which is
1231      rare, so caching gives real benifit), and if we have @c UNTIL_RELAY_POS
1232      condition then we should invalidate cached comarison value after
1233      @c inc_group_relay_log_pos() which called for each group of events (so we
1234      have some benefit if we have something like queries that use
1235      autoincrement or if we have transactions).
1236 
1237      Should be called ONLY if @c until_condition @c != @c UNTIL_NONE !
1238 
1239      @param master_beg_pos    position of the beginning of to be executed event
1240                               (not @c log_pos member of the event that points to
1241                               the beginning of the following event)
1242 
1243      @retval true   condition met or error happened (condition seems to have
1244                     bad log file name),
1245      @retval false  condition not met.
1246 */
1247 
is_until_satisfied(THD * thd,Log_event * ev)1248 bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
1249 {
1250   char error_msg[]= "Slave SQL thread is stopped because UNTIL "
1251                     "condition is bad.";
1252   DBUG_ENTER("Relay_log_info::is_until_satisfied");
1253 
1254   switch (until_condition)
1255   {
1256   case UNTIL_MASTER_POS:
1257   case UNTIL_RELAY_POS:
1258   {
1259     const char *log_name= NULL;
1260     ulonglong log_pos= 0;
1261 
1262     if (until_condition == UNTIL_MASTER_POS)
1263     {
1264       if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
1265         DBUG_RETURN(false);
1266       log_name= group_master_log_name;
1267       log_pos= (!ev || is_in_group() || !ev->log_pos) ?
1268         group_master_log_pos : ev->log_pos - ev->data_written;
1269     }
1270     else
1271     { /* until_condition == UNTIL_RELAY_POS */
1272       log_name= group_relay_log_name;
1273       log_pos= group_relay_log_pos;
1274     }
1275 
1276 #ifndef DBUG_OFF
1277     {
1278       char buf[32];
1279       DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1280                           group_master_log_name, llstr(group_master_log_pos, buf)));
1281       DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1282                           group_relay_log_name, llstr(group_relay_log_pos, buf)));
1283       DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1284                           until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1285                           log_name, llstr(log_pos, buf)));
1286       DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1287                           until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1288                           until_log_name, llstr(until_log_pos, buf)));
1289     }
1290 #endif
1291 
1292     if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1293     {
1294       /*
1295         We have no cached comparison results so we should compare log names
1296         and cache result.
1297         If we are after RESET SLAVE, and the SQL slave thread has not processed
1298         any event yet, it could be that group_master_log_name is "". In that case,
1299         just wait for more events (as there is no sensible comparison to do).
1300       */
1301 
1302       if (*log_name)
1303       {
1304         const char *basename= log_name + dirname_length(log_name);
1305 
1306         const char *q= (const char*)(fn_ext(basename)+1);
1307         if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1308         {
1309           /* Now compare extensions. */
1310           char *q_end;
1311           ulong log_name_extension= strtoul(q, &q_end, 10);
1312           if (log_name_extension < until_log_name_extension)
1313             until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1314           else
1315             until_log_names_cmp_result=
1316               (log_name_extension > until_log_name_extension) ?
1317               UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1318         }
1319         else
1320         {
1321           /* Base names do not match, so we abort */
1322           sql_print_error("%s", error_msg);
1323           DBUG_RETURN(true);
1324         }
1325       }
1326       else
1327         DBUG_RETURN(until_log_pos == 0);
1328     }
1329 
1330     if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1331           log_pos >= until_log_pos) ||
1332          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
1333     {
1334       char buf[22];
1335       sql_print_information("Slave SQL thread stopped because it reached its"
1336                             " UNTIL position %s", llstr(until_pos(), buf));
1337       DBUG_RETURN(true);
1338     }
1339     DBUG_RETURN(false);
1340   }
1341 
1342   case UNTIL_SQL_BEFORE_GTIDS:
1343     // We only need to check once if logged_gtids set contains any of the until_sql_gtids.
1344     if (until_sql_gtids_first_event)
1345     {
1346       until_sql_gtids_first_event= false;
1347       global_sid_lock->wrlock();
1348       /* Check if until GTIDs were already applied. */
1349       const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
1350       if (until_sql_gtids.is_intersection_nonempty(logged_gtids))
1351       {
1352         char *buffer= until_sql_gtids.to_string();
1353         global_sid_lock->unlock();
1354         sql_print_information("Slave SQL thread stopped because "
1355                               "UNTIL SQL_BEFORE_GTIDS %s is already "
1356                               "applied", buffer);
1357         my_free(buffer);
1358         DBUG_RETURN(true);
1359       }
1360       global_sid_lock->unlock();
1361     }
1362     if (ev != NULL && ev->get_type_code() == GTID_LOG_EVENT)
1363     {
1364       Gtid_log_event *gev= (Gtid_log_event *)ev;
1365       global_sid_lock->rdlock();
1366       if (until_sql_gtids.contains_gtid(gev->get_sidno(false), gev->get_gno()))
1367       {
1368         char *buffer= until_sql_gtids.to_string();
1369         global_sid_lock->unlock();
1370         sql_print_information("Slave SQL thread stopped because it reached "
1371                               "UNTIL SQL_BEFORE_GTIDS %s", buffer);
1372         my_free(buffer);
1373         DBUG_RETURN(true);
1374       }
1375       global_sid_lock->unlock();
1376     }
1377     DBUG_RETURN(false);
1378     break;
1379 
1380   case UNTIL_SQL_AFTER_GTIDS:
1381     {
1382       global_sid_lock->wrlock();
1383       const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
1384       if (until_sql_gtids.is_subset(logged_gtids))
1385       {
1386         char *buffer= until_sql_gtids.to_string();
1387         global_sid_lock->unlock();
1388         sql_print_information("Slave SQL thread stopped because it reached "
1389                               "UNTIL SQL_AFTER_GTIDS %s", buffer);
1390         my_free(buffer);
1391         DBUG_RETURN(true);
1392       }
1393       global_sid_lock->unlock();
1394       DBUG_RETURN(false);
1395     }
1396     break;
1397 
1398   case UNTIL_SQL_AFTER_MTS_GAPS:
1399   case UNTIL_DONE:
1400     /*
1401       TODO: this condition is actually post-execution or post-scheduling
1402             so the proper place to check it before SQL thread goes
1403             into next_event() where it can wait while the condition
1404             has been satisfied already.
1405             It's deployed here temporarily to be fixed along the regular UNTIL
1406             support for MTS is provided.
1407     */
1408     if (mts_recovery_group_cnt == 0)
1409     {
1410       sql_print_information("Slave SQL thread stopped according to "
1411                             "UNTIL SQL_AFTER_MTS_GAPS as it has "
1412                             "processed all gap transactions left from "
1413                             "the previous slave session.");
1414       until_condition= UNTIL_DONE;
1415       DBUG_RETURN(true);
1416     }
1417     else
1418     {
1419       DBUG_RETURN(false);
1420     }
1421     break;
1422 
1423   case UNTIL_NONE:
1424     DBUG_ASSERT(0);
1425     break;
1426   }
1427 
1428   DBUG_ASSERT(0);
1429   DBUG_RETURN(false);
1430 }
1431 
cached_charset_invalidate()1432 void Relay_log_info::cached_charset_invalidate()
1433 {
1434   DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1435 
1436   /* Full of zeroes means uninitialized. */
1437   memset(cached_charset, 0, sizeof(cached_charset));
1438   DBUG_VOID_RETURN;
1439 }
1440 
1441 
cached_charset_compare(char * charset) const1442 bool Relay_log_info::cached_charset_compare(char *charset) const
1443 {
1444   DBUG_ENTER("Relay_log_info::cached_charset_compare");
1445 
1446   if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1447   {
1448     memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1449     DBUG_RETURN(1);
1450   }
1451   DBUG_RETURN(0);
1452 }
1453 
1454 
stmt_done(my_off_t event_master_log_pos)1455 int Relay_log_info::stmt_done(my_off_t event_master_log_pos)
1456 {
1457   int error= 0;
1458 
1459   clear_flag(IN_STMT);
1460 
1461   DBUG_ASSERT(!belongs_to_client());
1462   /* Worker does not execute binlog update position logics */
1463   DBUG_ASSERT(!is_mts_worker(info_thd));
1464 
1465   /*
1466     Replication keeps event and group positions to specify the
1467     set of events that were executed.
1468     Event positions are incremented after processing each event
1469     whereas group positions are incremented when an event or a
1470     set of events is processed such as in a transaction and are
1471     committed or rolled back.
1472 
1473     A transaction can be ended with a Query Event, i.e. either
1474     commit or rollback, or by a Xid Log Event. Query Event is
1475     used to terminate pseudo-transactions that are executed
1476     against non-transactional engines such as MyIsam. Xid Log
1477     Event denotes though that a set of changes executed
1478     against a transactional engine is about to commit.
1479 
1480     Events' positions are incremented at stmt_done(). However,
1481     transactions that are ended with Xid Log Event have their
1482     group position incremented in the do_apply_event() and in
1483     the do_apply_event_work().
1484 
1485     Notice that the type of the engine, i.e. where data and
1486     positions are stored, against what events are being applied
1487     are not considered in this logic.
1488 
1489     Regarding the code that follows, notice that the executed
1490     group coordinates don't change if the current event is internal
1491     to the group. The same applies to MTS Coordinator when it
1492     handles a Format Descriptor event that appears in the middle
1493     of a group that is about to be assigned.
1494   */
1495   if ((!is_parallel_exec() && is_in_group()) ||
1496       mts_group_status != MTS_NOT_IN_GROUP)
1497   {
1498     inc_event_relay_log_pos();
1499   }
1500   else
1501   {
1502     if (is_parallel_exec())
1503     {
1504 
1505       DBUG_ASSERT(!is_mts_worker(info_thd));
1506 
1507       /*
1508         Format Description events only can drive MTS execution to this
1509         point. It is a special event group that is handled with
1510         synchronization. For that reason, the checkpoint routine is
1511         called here.
1512       */
1513       error= mts_checkpoint_routine(this, 0, false,
1514                                     true/*need_data_lock=true*/);
1515     }
1516     if (!error)
1517       error= inc_group_relay_log_pos(event_master_log_pos,
1518                                      true/*need_data_lock=true*/);
1519   }
1520 
1521   return error;
1522 }
1523 
1524 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
cleanup_context(THD * thd,bool error)1525 void Relay_log_info::cleanup_context(THD *thd, bool error)
1526 {
1527   DBUG_ENTER("Relay_log_info::cleanup_context");
1528 
1529   DBUG_ASSERT(info_thd == thd);
1530   /*
1531     1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1532     may have opened tables, which we cannot be sure have been closed (because
1533     maybe the Rows_log_event have not been found or will not be, because slave
1534     SQL thread is stopping, or relay log has a missing tail etc). So we close
1535     all thread's tables. And so the table mappings have to be cancelled.
1536     2) Rows_log_event::do_apply_event() may even have started statements or
1537     transactions on them, which we need to rollback in case of error.
1538     3) If finding a Format_description_log_event after a BEGIN, we also need
1539     to rollback before continuing with the next events.
1540     4) so we need this "context cleanup" function.
1541   */
1542   if (error)
1543   {
1544     trans_rollback_stmt(thd); // if a "statement transaction"
1545     trans_rollback(thd);      // if a "real transaction"
1546   }
1547   if (rows_query_ev)
1548   {
1549     info_thd->reset_query_for_display();
1550     delete rows_query_ev;
1551     rows_query_ev= NULL;
1552     info_thd->set_query(NULL, 0);
1553   }
1554   m_table_map.clear_tables();
1555   slave_close_thread_tables(thd);
1556   if (error)
1557     thd->mdl_context.release_transactional_locks();
1558   clear_flag(IN_STMT);
1559   /*
1560     Cleanup for the flags that have been set at do_apply_event.
1561   */
1562   thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1563   thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1564 
1565   /*
1566     Reset state related to long_find_row notes in the error log:
1567     - timestamp
1568     - flag that decides whether the slave prints or not
1569   */
1570   reset_row_stmt_start_timestamp();
1571   unset_long_find_row_note_printed();
1572 
1573   DBUG_VOID_RETURN;
1574 }
1575 
clear_tables_to_lock()1576 void Relay_log_info::clear_tables_to_lock()
1577 {
1578   DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
1579 #ifndef DBUG_OFF
1580   /**
1581     When replicating in RBR and MyISAM Merge tables are involved
1582     open_and_lock_tables (called in do_apply_event) appends the
1583     base tables to the list of tables_to_lock. Then these are
1584     removed from the list in close_thread_tables (which is called
1585     before we reach this point).
1586 
1587     This assertion just confirms that we get no surprises at this
1588     point.
1589    */
1590   uint i=0;
1591   for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
1592   DBUG_ASSERT(i == tables_to_lock_count);
1593 #endif
1594 
1595   while (tables_to_lock)
1596   {
1597     uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
1598     if (tables_to_lock->m_tabledef_valid)
1599     {
1600       tables_to_lock->m_tabledef.table_def::~table_def();
1601       tables_to_lock->m_tabledef_valid= FALSE;
1602     }
1603 
1604     /*
1605       If blob fields were used during conversion of field values
1606       from the master table into the slave table, then we need to
1607       free the memory used temporarily to store their values before
1608       copying into the slave's table.
1609     */
1610     if (tables_to_lock->m_conv_table)
1611       free_blobs(tables_to_lock->m_conv_table);
1612 
1613     tables_to_lock=
1614       static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1615     tables_to_lock_count--;
1616     my_free(to_free);
1617   }
1618   DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
1619   DBUG_VOID_RETURN;
1620 }
1621 
slave_close_thread_tables(THD * thd)1622 void Relay_log_info::slave_close_thread_tables(THD *thd)
1623 {
1624   thd->get_stmt_da()->set_overwrite_status(true);
1625   DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
1626   thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1627   thd->get_stmt_da()->set_overwrite_status(false);
1628 
1629   close_thread_tables(thd);
1630   /*
1631     - If transaction rollback was requested due to deadlock
1632     perform it and release metadata locks.
1633     - If inside a multi-statement transaction,
1634     defer the release of metadata locks until the current
1635     transaction is either committed or rolled back. This prevents
1636     other statements from modifying the table for the entire
1637     duration of this transaction.  This provides commit ordering
1638     and guarantees serializability across multiple transactions.
1639     - If in autocommit mode, or outside a transactional context,
1640     automatically release metadata locks of the current statement.
1641   */
1642   if (thd->transaction_rollback_request)
1643   {
1644     trans_rollback_implicit(thd);
1645     thd->mdl_context.release_transactional_locks();
1646   }
1647   else if (! thd->in_multi_stmt_transaction_mode())
1648     thd->mdl_context.release_transactional_locks();
1649   else
1650     thd->mdl_context.release_statement_locks();
1651 
1652   clear_tables_to_lock();
1653   DBUG_VOID_RETURN;
1654 }
1655 /**
1656   Execute a SHOW RELAYLOG EVENTS statement.
1657 
1658   @param thd Pointer to THD object for the client thread executing the
1659   statement.
1660 
1661   @retval FALSE success
1662   @retval TRUE failure
1663 */
mysql_show_relaylog_events(THD * thd)1664 bool mysql_show_relaylog_events(THD* thd)
1665 {
1666   Protocol *protocol= thd->protocol;
1667   List<Item> field_list;
1668   DBUG_ENTER("mysql_show_relaylog_events");
1669 
1670   DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1671 
1672   Log_event::init_show_field_list(&field_list);
1673   if (protocol->send_result_set_metadata(&field_list,
1674                             Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1675     DBUG_RETURN(TRUE);
1676 
1677   if (active_mi == NULL)
1678   {
1679     my_error(ER_SLAVE_CONFIGURATION, MYF(0));
1680     DBUG_RETURN(true);
1681   }
1682 
1683   DBUG_RETURN(show_binlog_events(thd, &active_mi->rli->relay_log));
1684 }
1685 
1686 #endif
1687 
/**
  Initialize the relay log info: open the relay log (index and current
  file), load or create the relay log info repository and position the
  reader on the relay log.

  Must be called with data_lock held (asserted below).

  - If a previous call failed (error_on_rli_init_info is set), nothing is
    re-initialized and the err path is taken.
  - If the object is already initialized, only the read position of
    cur_log is rewound to 0 (keeping log_lock during the seek when the log
    is the active one). When recovery_parallel_workers is non-zero, the
    result of mts_recovery_groups() is returned.
  - Otherwise the relay log is opened and the repository is checked. The
    repository is then either read and used to position the reader, or,
    when it does not exist, the reader is positioned on the first relay
    log in the index. Finally the repository is flushed and the relay log
    space is counted.

  Failures that reach the err label end the handler, reset inited, set
  error_on_rli_init_info, log msg (if any) and close the relay log. Several
  early failures instead return 1 directly without that cleanup.

  @retval 0        success (see the NOTE on the error_on_rli_init_info path)
  @retval non-zero failure (1), or the result of mts_recovery_groups()
*/
int Relay_log_info::rli_init_info()
{
  int error= 0;
  enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
  const char *msg= NULL;

  DBUG_ENTER("Relay_log_info::rli_init_info");

  mysql_mutex_assert_owner(&data_lock);

  /*
    If Relay_log_info is issued again after a failed init_info(), for
    instance because of missing relay log files, it will generate new
    files and ignore the previous failure, to avoid that we set
    error_on_rli_init_info as true.
    This a consequence of the behaviour change, in the past server was
    stopped when there were replication initialization errors, now it is
    not and so init_info() must be aware of previous failures.
  */
  /*
    NOTE(review): error is still 0 here, so this path returns 0 even though
    nothing is initialized; callers presumably detect the failure through
    `inited` -- confirm before relying on the return value.
  */
  if (error_on_rli_init_info)
    goto err;

  if (inited)
  {
    /*
      We have to reset read position of relay-log-bin as we may have
      already been reading from 'hotlog' when the slave was stopped
      last time. If this case pos_in_file would be set and we would
      get a crash when trying to read the signature for the binary
      relay log.

      We only rewind the read position if we are starting the SQL
      thread. The handle_slave_sql thread assumes that the read
      position is at the beginning of the file, and will read the
      "signature" and then fast-forward to the last position read.
    */
    bool hot_log= FALSE;
    /*
      my_b_seek does an implicit flush_io_cache, so we need to:

      1. check if this log is active (hot)
      2. if it is we keep log_lock until the seek ends, otherwise
         release it right away.

      If we did not take log_lock, SQL thread might race with IO
      thread for the IO_CACHE mutex.

    */
    mysql_mutex_t *log_lock= relay_log.get_log_lock();
    mysql_mutex_lock(log_lock);
    hot_log= relay_log.is_active(linfo.log_file_name);

    if (!hot_log)
      mysql_mutex_unlock(log_lock);

    my_b_seek(cur_log, (my_off_t) 0);

    if (hot_log)
      mysql_mutex_unlock(log_lock);
    DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(this) : 0);
  }

  /* First-time initialization: reset the reader and space accounting. */
  cur_log_fd = -1;
  slave_skip_counter= 0;
  abort_pos_wait= 0;
  log_space_limit= relay_log_space_limit;
  log_space_total= 0;
  tables_to_lock= 0;
  tables_to_lock_count= 0;

  /* Build slave_patternload_file from slave_load_tmpdir + PREFIX_SQL_LOAD. */
  char pattern[FN_REFLEN];
  (void) my_realpath(pattern, slave_load_tmpdir, 0);
  if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
                MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
  {
    sql_print_error("Unable to use slave's temporary directory '%s'.",
                    slave_load_tmpdir);
    DBUG_RETURN(1);
  }
  unpack_filename(slave_patternload_file, pattern);
  slave_patternload_file_size= strlen(slave_patternload_file);

  /*
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
    Note that the I/O thread flushes it to disk after writing every
    event, in flush_info within the master info.
  */
  /*
    For the maximum log size, we choose max_relay_log_size if it is
    non-zero, max_binlog_size otherwise. If later the user does SET
    GLOBAL on one of these variables, fix_max_binlog_size and
    fix_max_relay_log_size will reconsider the choice (for example
    if the user changes max_relay_log_size to zero, we have to
    switch to using max_binlog_size for the relay log) and update
    relay_log.max_size (and mysql_bin_log.max_size).
  */
  {
    /* Reports an error and returns, if the --relay-log's path
       is a directory.*/
    if (opt_relay_logname &&
        opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
    {
      sql_print_error("Path '%s' is a directory name, please specify \
a file name for --relay-log option.", opt_relay_logname);
      DBUG_RETURN(1);
    }

    /* Reports an error and returns, if the --relay-log-index's path
       is a directory.*/
    if (opt_relaylog_index_name &&
        opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
        == FN_LIBCHAR)
    {
      sql_print_error("Path '%s' is a directory name, please specify \
a file name for --relay-log-index option.", opt_relaylog_index_name);
      DBUG_RETURN(1);
    }

    char buf[FN_REFLEN];
    const char *ln;
    static bool name_warning_sent= 0;
    ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
                                1, buf);
    /* We send the warning only at startup, not after every RESET SLAVE */
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
    {
      /*
        User didn't give us info to name the relay log index file.
        Picking `hostname`-relay-bin.index like we do, causes replication to
        fail if this slave's hostname is changed later. So, we would like to
        instead require a name. But as we don't want to break many existing
        setups, we only give warning, not error.
      */
      sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
                        " so replication "
                        "may break when this MySQL server acts as a "
                        "slave and has his hostname changed!! Please "
                        "use '--relay-log=%s' to avoid this problem.", ln);
      name_warning_sent= 1;
    }

    relay_log.is_relay_log= TRUE;

    if (relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
    {
      sql_print_error("Failed in open_index_file() called from Relay_log_info::rli_init_info().");
      DBUG_RETURN(1);
    }
#ifndef DBUG_OFF
    global_sid_lock->wrlock();
    gtid_set.dbug_print("set of GTIDs in relay log before initialization");
    global_sid_lock->unlock();
#endif
    /*
      Below init_gtid_sets() function will parse the available relay logs and
      set I/O retrieved gtid event in gtid_state object. We dont need to find
      last_retrieved_gtid_event if relay_log_recovery=1 (retrieved set will
      be cleared off in that case).
    */
    /*
      NOTE(review): init_gtid_sets() is only called when current_thd is
      NULL (presumably server startup -- confirm). On its failure this
      returns 1 directly, bypassing the err label, so
      error_on_rli_init_info is not set here.
    */
    if (!current_thd &&
        relay_log.init_gtid_sets(&gtid_set, NULL,
                                 is_relay_log_recovery ? NULL : get_last_retrieved_gtid(),
                                 opt_slave_sql_verify_checksum,
                                 true/*true=need lock*/))
    {
      sql_print_error("Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
      DBUG_RETURN(1);
    }
#ifndef DBUG_OFF
    global_sid_lock->wrlock();
    gtid_set.dbug_print("set of GTIDs in relay log after initialization");
    global_sid_lock->unlock();
#endif
    /*
      Configures what object is used by the current log to store processed
      gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
      corretly compute the set of previous gtids.
    */
    relay_log.set_previous_gtid_set(&gtid_set);
    /*
      note, that if open() fails, we'll still have index file open
      but a destructor will take care of that
    */
    if (relay_log.open_binlog(ln, 0, SEQ_READ_APPEND,
                              (max_relay_log_size ? max_relay_log_size :
                               max_binlog_size), true,
                              true/*need_lock_log=true*/,
                              true/*need_lock_index=true*/,
                              true/*need_sid_lock=true*/,
                              mi->get_mi_description_event()))
    {
      sql_print_error("Failed in open_log() called from Relay_log_info::rli_init_info().");
      DBUG_RETURN(1);
    }
  }

   /*
    This checks if the repository was created before and thus there
    will be values to be read. Please, do not move this call after
    the handler->init_info().
  */
  if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
  {
    msg= "Error checking relay log repository";
    error= 1;
    goto err;
  }

  if (handler->init_info())
  {
    msg= "Error reading relay log configuration";
    error= 1;
    goto err;
  }

  if (check_return == REPOSITORY_DOES_NOT_EXIST)
  {
    /* Init relay log with first entry in the relay index file */
    if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
                           false/*need_data_lock=false (lock should be held
                                  prior to invoking this function)*/,
                           &msg, 0))
    {
      error= 1;
      goto err;
    }
    group_master_log_name[0]= 0;
    group_master_log_pos= 0;
  }
  else
  {
    /* Repository exists: load it, optionally recover, then reposition. */
    if (read_info(handler))
    {
      msg= "Error reading relay log configuration";
      error= 1;
      goto err;
    }

    if (is_relay_log_recovery && init_recovery(mi, &msg))
    {
      error= 1;
      goto err;
    }

    if (init_relay_log_pos(group_relay_log_name,
                           group_relay_log_pos,
                           false/*need_data_lock=false (lock should be held
                                  prior to invoking this function)*/,
                           &msg, 0))
    {
      char llbuf[22];
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s).",
                      group_relay_log_name,
                      llstr(group_relay_log_pos, llbuf));
      error= 1;
      goto err;
    }

#ifndef DBUG_OFF
    {
      char llbuf1[22], llbuf2[22];
      DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
                          llstr(my_b_tell(cur_log),llbuf1),
                          llstr(event_relay_log_pos,llbuf2)));
      DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
      DBUG_ASSERT((my_b_tell(cur_log) == event_relay_log_pos));
    }
#endif
  }

  inited= 1;
  error_on_rli_init_info= false;
  /*
    NOTE(review): this failure comes from writing/flushing the repository,
    but the message says "reading"; flush_info() itself also logs
    "Error writing relay log configuration.".
  */
  if (flush_info(TRUE))
  {
    msg= "Error reading relay log configuration";
    error= 1;
    goto err;
  }

  if (count_relay_log_space())
  {
    msg= "Error counting relay log space";
    error= 1;
    goto err;
  }

  /*
    In case of MTS the recovery is deferred until the end of global_init_info.
  */
  if (!mi->rli->mts_recovery_group_cnt)
    is_relay_log_recovery= FALSE;
  DBUG_RETURN(error);

err:
  /* Common failure cleanup; remembers the failure for later calls. */
  handler->end_info();
  inited= 0;
  error_on_rli_init_info= true;
  if (msg)
    sql_print_error("%s.", msg);
  relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
                  true/*need_lock_log=true*/,
                  true/*need_lock_index=true*/);
  DBUG_RETURN(error);
}
1992 
end_info()1993 void Relay_log_info::end_info()
1994 {
1995   DBUG_ENTER("Relay_log_info::end_info");
1996 
1997   error_on_rli_init_info= false;
1998   if (!inited)
1999     DBUG_VOID_RETURN;
2000 
2001   handler->end_info();
2002 
2003   if (cur_log_fd >= 0)
2004   {
2005     end_io_cache(&cache_buf);
2006     (void)my_close(cur_log_fd, MYF(MY_WME));
2007     cur_log_fd= -1;
2008   }
2009   inited = 0;
2010   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2011                   true/*need_lock_log=true*/,
2012                   true/*need_lock_index=true*/);
2013   relay_log.harvest_bytes_written(this, true/*need_log_space_lock=true*/);
2014   /*
2015     Delete the slave's temporary tables from memory.
2016     In the future there will be other actions than this, to ensure persistance
2017     of slave's temp tables after shutdown.
2018   */
2019   close_temporary_tables();
2020 
2021   DBUG_VOID_RETURN;
2022 }
2023 
flush_current_log()2024 int Relay_log_info::flush_current_log()
2025 {
2026   DBUG_ENTER("Relay_log_info::flush_current_log");
2027   /*
2028     When we come to this place in code, relay log may or not be initialized;
2029     the caller is responsible for setting 'flush_relay_log_cache' accordingly.
2030   */
2031   IO_CACHE *log_file= relay_log.get_log_file();
2032   if (flush_io_cache(log_file))
2033     DBUG_RETURN(2);
2034 
2035   DBUG_RETURN(0);
2036 }
2037 
set_master_info(Master_info * info)2038 void Relay_log_info::set_master_info(Master_info* info)
2039 {
2040   mi= info;
2041 }
2042 
/**
  Stores the file and position where the slave's SQL thread is in the
  relay log:

    - As this is only called by the slave thread or on STOP SLAVE, with the
      log_lock grabbed and the slave thread stopped, we don't need to have
      a lock here.
    - If there is an active transaction, then we don't update the position
      in the relay log.  This is to ensure that we re-execute statements
      if we die in the middle of a transaction that was rolled back.
    - As a transaction never spans binary logs, we don't have to handle the
      case where we do a relay-log-rotation in the middle of the transaction.
      If this were not the case, we would have to ensure that we
      don't delete the relay log file where the transaction started when
      we switch to a new relay log file.

  @retval  0   ok,
  @retval  1   write error, otherwise.
*/
2062 
2063 /**
2064   Store the file and position where the slave's SQL thread are in the
2065   relay log.
2066 
2067   Notes:
2068 
2069   - This function should be called either from the slave SQL thread,
2070     or when the slave thread is not running.  (It reads the
2071     group_{relay|master}_log_{pos|name} and delay fields in the rli
2072     object.  These may only be modified by the slave SQL thread or by
2073     a client thread when the slave SQL thread is not running.)
2074 
2075   - If there is an active transaction, then we do not update the
2076     position in the relay log.  This is to ensure that we re-execute
2077     statements if we die in the middle of an transaction that was
2078     rolled back.
2079 
2080   - As a transaction never spans binary logs, we don't have to handle
2081     the case where we do a relay-log-rotation in the middle of the
2082     transaction.  If transactions could span several binlogs, we would
2083     have to ensure that we do not delete the relay log file where the
2084     transaction started before switching to a new relay log file.
2085 
2086   - Error can happen if writing to file fails or if flushing the file
2087     fails.
2088 
2089   @param rli The object representing the Relay_log_info.
2090 
2091   @todo Change the log file information to a binary format to avoid
2092   calling longlong2str.
2093 
2094   @return 0 on success, 1 on error.
2095 */
flush_info(const bool force)2096 int Relay_log_info::flush_info(const bool force)
2097 {
2098   DBUG_ENTER("Relay_log_info::flush_info");
2099 
2100   if (!inited)
2101     DBUG_RETURN(0);
2102 
2103   /*
2104     We update the sync_period at this point because only here we
2105     now that we are handling a relay log info. This needs to be
2106     update every time we call flush because the option maybe
2107     dinamically set.
2108   */
2109   handler->set_sync_period(sync_relayloginfo_period);
2110 
2111   if (write_info(handler))
2112     goto err;
2113 
2114   if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
2115     goto err;
2116 
2117   force_flush_postponed_due_to_split_trans= false;
2118   DBUG_RETURN(0);
2119 
2120 err:
2121   sql_print_error("Error writing relay log configuration.");
2122   DBUG_RETURN(1);
2123 }
2124 
get_number_info_rli_fields()2125 size_t Relay_log_info::get_number_info_rli_fields()
2126 {
2127   return sizeof(info_rli_fields)/sizeof(info_rli_fields[0]);
2128 }
2129 
read_info(Rpl_info_handler * from)2130 bool Relay_log_info::read_info(Rpl_info_handler *from)
2131 {
2132   int lines= 0;
2133   char *first_non_digit= NULL;
2134   ulong temp_group_relay_log_pos= 0;
2135   ulong temp_group_master_log_pos= 0;
2136   int temp_sql_delay= 0;
2137   int temp_internal_id= internal_id;
2138 
2139   DBUG_ENTER("Relay_log_info::read_info");
2140 
2141   /*
2142     Should not read RLI from file in client threads. Client threads
2143     only use RLI to execute BINLOG statements.
2144 
2145     @todo Uncomment the following assertion. Currently,
2146     Relay_log_info::init() is called from init_master_info() before
2147     the THD object Relay_log_info::sql_thd is created. That means we
2148     cannot call belongs_to_client() since belongs_to_client()
2149     dereferences Relay_log_info::sql_thd. So we need to refactor
2150     slightly: the THD object should be created by Relay_log_info
2151     constructor (or passed to it), so that we are guaranteed that it
2152     exists at this point. /Sven
2153   */
2154   //DBUG_ASSERT(!belongs_to_client());
2155 
2156   /*
2157     Starting from 5.1.x, relay-log.info has a new format. Now, its
2158     first line contains the number of lines in the file. By reading
2159     this number we can determine which version our master.info comes
2160     from. We can't simply count the lines in the file, since
2161     versions before 5.1.x could generate files with more lines than
2162     needed. If first line doesn't contain a number, or if it
2163     contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2164     then the file is treated like a file from pre-5.1.x version.
2165     There is no ambiguity when reading an old master.info: before
2166     5.1.x, the first line contained the binlog's name, which is
2167     either empty or has an extension (contains a '.'), so can't be
2168     confused with an integer.
2169 
2170     So we're just reading first line and trying to figure which
2171     version is this.
2172   */
2173 
2174   /*
2175     The first row is temporarily stored in mi->master_log_name, if
2176     it is line count and not binlog name (new format) it will be
2177     overwritten by the second row later.
2178   */
2179   if (from->prepare_info_for_read() ||
2180       from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
2181                      (char *) ""))
2182     DBUG_RETURN(TRUE);
2183 
2184   lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2185 
2186   if (group_relay_log_name[0]!='\0' &&
2187       *first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2188   {
2189     /* Seems to be new format => read group relay log name */
2190     if (from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
2191                        (char *) ""))
2192       DBUG_RETURN(TRUE);
2193   }
2194   else
2195      DBUG_PRINT("info", ("relay_log_info file is in old format."));
2196 
2197   if (from->get_info((ulong *) &temp_group_relay_log_pos,
2198                      (ulong) BIN_LOG_HEADER_SIZE) ||
2199       from->get_info(group_master_log_name,
2200                      (size_t) sizeof(group_relay_log_name),
2201                      (char *) "") ||
2202       from->get_info((ulong *) &temp_group_master_log_pos,
2203                      (ulong) 0))
2204     DBUG_RETURN(TRUE);
2205 
2206   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2207   {
2208     if (from->get_info((int *) &temp_sql_delay, (int) 0))
2209       DBUG_RETURN(TRUE);
2210   }
2211 
2212   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2213   {
2214     if (from->get_info(&recovery_parallel_workers,(ulong) 0))
2215       DBUG_RETURN(TRUE);
2216   }
2217 
2218   if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2219   {
2220     if (from->get_info(&temp_internal_id, (int) 1))
2221       DBUG_RETURN(TRUE);
2222   }
2223 
2224   group_relay_log_pos=  temp_group_relay_log_pos;
2225   group_master_log_pos= temp_group_master_log_pos;
2226   sql_delay= (int32) temp_sql_delay;
2227   internal_id= (uint) temp_internal_id;
2228 
2229   DBUG_ASSERT(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2230              (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
2231   DBUG_RETURN(FALSE);
2232 }
2233 
write_info(Rpl_info_handler * to)2234 bool Relay_log_info::write_info(Rpl_info_handler *to)
2235 {
2236   DBUG_ENTER("Relay_log_info::write_info");
2237 
2238   /*
2239     @todo Uncomment the following assertion. See todo in
2240     Relay_log_info::read_info() for details. /Sven
2241   */
2242   //DBUG_ASSERT(!belongs_to_client());
2243 
2244   if (to->prepare_info_for_write() ||
2245       to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2246       to->set_info(group_relay_log_name) ||
2247       to->set_info((ulong) group_relay_log_pos) ||
2248       to->set_info(group_master_log_name) ||
2249       to->set_info((ulong) group_master_log_pos) ||
2250       to->set_info((int) sql_delay) ||
2251       to->set_info(recovery_parallel_workers) ||
2252       to->set_info((int) internal_id))
2253     DBUG_RETURN(TRUE);
2254 
2255   DBUG_RETURN(FALSE);
2256 }
2257 
2258 /**
2259    Delete the existing event and set a new one. This class is
2260    responsible for freeing the event, the caller should not do that.
2261    When a new FD is from the master adaptation routine is invoked
2262    to align the slave applier execution context with the master version.
2263 
2264    The method is run by SQL thread/MTS Coordinator.
2265    Although notice that MTS worker runs it, inefficiently (see assert),
2266    once at its destruction time.
2267    todo: fix Slave_worker and Relay_log_info inheritance relation.
2268 
2269    @param  a pointer to be installed into execution context
2270            FormatDescriptor event
2271 */
2272 
set_rli_description_event(Format_description_log_event * fe)2273 void Relay_log_info::set_rli_description_event(Format_description_log_event *fe)
2274 {
2275   DBUG_ASSERT(!info_thd || !is_mts_worker(info_thd) || !fe);
2276 
2277   if (fe)
2278   {
2279     adapt_to_master_version(fe);
2280     if (info_thd && is_parallel_exec())
2281     {
2282       for (uint i= 0; i < workers.elements; i++)
2283       {
2284         Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
2285         mysql_mutex_lock(&w->jobs_lock);
2286         if (w->running_status == Slave_worker::RUNNING)
2287           w->set_rli_description_event(fe);
2288         mysql_mutex_unlock(&w->jobs_lock);
2289       }
2290     }
2291   }
2292   delete rli_description_event;
2293   rli_description_event= fe;
2294 }
2295 
2296 struct st_feature_version
2297 {
2298   /*
2299     The enum must be in the version non-descending top-down order,
2300     the last item formally corresponds to highest possible server
2301     version (never reached, thereby no adapting actions here);
2302     enumeration starts from zero.
2303   */
2304   enum
2305   {
2306     WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
2307     _END_OF_LIST // always last
2308   } item;
2309   /*
2310     Version where the feature is introduced.
2311   */
2312   uchar version_split[3];
2313   /*
2314     Action to perform when according to FormatDescriptor event Master
2315     is found to be feature-aware while previously it has *not* been.
2316   */
2317   void (*upgrade) (THD*);
2318   /*
2319     Action to perform when according to FormatDescriptor event Master
2320     is found to be feature-*un*aware while previously it has been.
2321   */
2322   void (*downgrade) (THD*);
2323 };
2324 
wl6292_upgrade_func(THD * thd)2325 void wl6292_upgrade_func(THD *thd)
2326 {
2327   thd->variables.explicit_defaults_for_timestamp= false;
2328   if (global_system_variables.explicit_defaults_for_timestamp)
2329     thd->variables.explicit_defaults_for_timestamp= true;
2330 
2331   return;
2332 }
2333 
wl6292_downgrade_func(THD * thd)2334 void wl6292_downgrade_func(THD *thd)
2335 {
2336   if (global_system_variables.explicit_defaults_for_timestamp)
2337     thd->variables.explicit_defaults_for_timestamp= false;
2338 
2339   return;
2340 }
2341 
2342 /**
2343    Sensitive to Master-vs-Slave version difference features
2344    should be listed in the version non-descending order.
2345 */
2346 static st_feature_version s_features[]=
2347 {
2348   // order is the same as in the enum
2349   { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
2350     {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
2351   { st_feature_version::_END_OF_LIST,
2352     {255, 255, 255}, NULL, NULL }
2353 };
2354 
2355 /**
2356    The method lists rules of adaptation for the slave applier
2357    to specific master versions.
2358    It's executed right before a new master FD is set for
2359    slave appliers execution context.
2360    Comparison of the old and new version yields the adaptive
2361    actions direction.
2362    Current execution FD's version, V_0, is compared with the new being set up
2363    FD (the arg), let's call it V_1.
2364    In the case of downgrade features that are defined in [V_0, V_1-1] range
2365    (V_1 excluded) are "removed" by running the downgrade actions.
2366    In the upgrade case the featured defined in [V_0 + 1, V_1] range are
2367    "added" by running the upgrade actions.
2368 
2369    Notice, that due to relay log may have two FD events, one the slave local
2370    and the other from the Master. That can lead to extra
2371    adapt_to_master_version() calls and in case Slave and Master are of different
2372    versions the extra two calls should compensate each other.
2373 
2374    Also, at composing downgrade/upgrade actions keep in mind that
2375    at initialization Slave sets up FD of version 4.0 and then transits to
2376    the current server version. At transition all upgrading actions in
2377    the range of [4.0..current] are run.
2378 
2379    @param fdle  a pointer to new Format Description event that is being set
2380                 up for execution context.
2381 */
adapt_to_master_version(Format_description_log_event * fdle)2382 void Relay_log_info::adapt_to_master_version(Format_description_log_event *fdle)
2383 {
2384   THD *thd=info_thd;
2385   ulong master_version, current_version;
2386   int changed= !fdle || ! rli_description_event ? 0 :
2387     (master_version= fdle->get_version_product()) -
2388     (current_version= rli_description_event->get_version_product());
2389 
2390   /* When the last version is not changed nothing to adapt for */
2391   if (!changed)
2392     return;
2393 
2394   /*
2395     find item starting from and ending at for which adaptive actions run
2396     for downgrade or upgrade branches.
2397     (todo: convert into bsearch when number of features will grow significantly)
2398   */
2399   bool downgrade= changed < 0;
2400   long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
2401 
2402   for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
2403   {
2404     ulong ver_f= version_product(s_features[i].version_split);
2405 
2406     if ((downgrade ? master_version : current_version) < ver_f &&
2407         i_first == st_feature_version::_END_OF_LIST)
2408       i_first= i;
2409     if ((downgrade ? current_version : master_version) < ver_f)
2410     {
2411       i_last= i;
2412       DBUG_ASSERT(i_last >= i_first);
2413       break;
2414     }
2415   }
2416 
2417   /*
2418      actions, executed in version non-descending st_feature_version order
2419   */
2420   for (i= i_first; i < i_last; i++)
2421   {
2422     /* Run time check of the st_feature_version items ordering */
2423     DBUG_ASSERT(!i ||
2424                 version_product(s_features[i - 1].version_split) <=
2425                 version_product(s_features[i].version_split));
2426 
2427     DBUG_ASSERT((downgrade ? master_version : current_version) <
2428                 version_product(s_features[i].version_split) &&
2429                 (downgrade ? current_version : master_version  >=
2430                  version_product(s_features[i].version_split)));
2431 
2432     if (downgrade && s_features[i].downgrade)
2433     {
2434       s_features[i].downgrade(thd);
2435     }
2436     else if (s_features[i].upgrade)
2437     {
2438       s_features[i].upgrade(thd);
2439     }
2440   }
2441 }
2442