1 /* Copyright (c) 2006, 2017, Oracle and/or its affiliates.
2    Copyright (c) 2010, 2020, MariaDB Corporation.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software Foundation,
15    51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 #include "mariadb.h"
18 #include "sql_priv.h"
19 #include "unireg.h"                             // HAVE_*
20 #include "rpl_mi.h"
21 #include "rpl_rli.h"
22 #include "sql_base.h"                        // close_thread_tables
Slave_reporting_capability(char const * thread_name)23 #include <my_dir.h>    // For MY_STAT
24 #include "sql_repl.h"  // For check_binlog_magic
25 #include "log_event.h" // Format_description_log_event, Log_event,
26                        // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
27                        // PREFIX_SQL_LOAD
28 #include "rpl_utility.h"
29 #include "transaction.h"
30 #include "sql_parse.h"                          // end_trans, ROLLBACK
31 #include "slave.h"
32 #include <mysql/plugin.h>
33 #include <mysql/service_thd_wait.h>
34 #include "lock.h"
35 #include "sql_table.h"
36 
/* Sums the sizes of all relay logs listed in the index; defined below. */
static int count_relay_log_space(Relay_log_info* rli);

/**
   Current replication state (hash of last GTID executed, per replication
   domain).
*/
rpl_slave_state *rpl_global_gtid_slave_state;
/* Object used for MASTER_GTID_WAIT(). */
gtid_waiting rpl_global_gtid_waiting;

/*
  Human-readable state text for an SQL thread that is holding back events
  because of MASTER_DELAY. NOTE(review): presumably used as the thread
  state/stage text elsewhere; not referenced in this part of the file.
*/
const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event";
48 
49 Relay_log_info::Relay_log_info(bool is_slave_recovery)
50   :Slave_reporting_capability("SQL"),
51    replicate_same_server_id(::replicate_same_server_id),
52    info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
53    sync_counter(0), is_relay_log_recovery(is_slave_recovery),
54    save_temporary_tables(0),
55    mi(0), inuse_relaylog_list(0), last_inuse_relaylog(0),
56    cur_log_old_open_count(0), error_on_rli_init_info(false),
57    group_relay_log_pos(0), event_relay_log_pos(0),
58    group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
59    last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
60    abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
61    gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
62    slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
63    until_log_pos(0), retried_trans(0), executed_entries(0),
64    sql_delay(0), sql_delay_end(0),
65    until_relay_log_names_defer(false),
66    m_flags(0)
67 {
68   DBUG_ENTER("Relay_log_info::Relay_log_info");
69 
70   relay_log.is_relay_log= TRUE;
71   relay_log_state.init();
72 #ifdef HAVE_PSI_INTERFACE
73   relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
74                          key_RELAYLOG_COND_relay_log_updated,
75                          key_RELAYLOG_COND_bin_log_updated,
76                          key_file_relaylog,
77                          key_file_relaylog_index,
78                          key_RELAYLOG_COND_queue_busy,
79                          key_LOCK_relaylog_end_pos);
~Slave_reporting_capability()80 #endif
81 
82   group_relay_log_name[0]= event_relay_log_name[0]=
83     group_master_log_name[0]= 0;
84   until_log_name[0]= ign_master_log_name_end[0]= 0;
85   max_relay_log_size= global_system_variables.max_relay_log_size;
86   bzero((char*) &info_file, sizeof(info_file));
87   bzero((char*) &cache_buf, sizeof(cache_buf));
88   mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
89   mysql_mutex_init(key_relay_log_info_data_lock,
90                    &data_lock, MY_MUTEX_INIT_FAST);
91   mysql_mutex_init(key_relay_log_info_log_space_lock,
92                    &log_space_lock, MY_MUTEX_INIT_FAST);
93   mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
94   mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
95   mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
96   mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
97   relay_log.init_pthread_objects();
98   DBUG_VOID_RETURN;
99 }
100 
101 
102 Relay_log_info::~Relay_log_info()
103 {
104   DBUG_ENTER("Relay_log_info::~Relay_log_info");
105 
106   reset_inuse_relaylog();
107   mysql_mutex_destroy(&run_lock);
108   mysql_mutex_destroy(&data_lock);
109   mysql_mutex_destroy(&log_space_lock);
110   mysql_cond_destroy(&data_cond);
111   mysql_cond_destroy(&start_cond);
112   mysql_cond_destroy(&stop_cond);
113   mysql_cond_destroy(&log_space_cond);
114   relay_log.cleanup();
115   DBUG_VOID_RETURN;
116 }
117 
118 
119 /**
120   Read the relay_log.info file.
121 
122   @param info_fname The name of the file to read from.
123   @retval 0 success
124   @retval 1 failure
125 */
126 int Relay_log_info::init(const char* info_fname)
127 {
128   char fname[FN_REFLEN+128];
129   const char* msg = 0;
130   int error = 0;
131   mysql_mutex_t *log_lock;
132   DBUG_ENTER("Relay_log_info::init");
133 
134   if (inited)                       // Set if this function called
135     DBUG_RETURN(0);
136 
137   log_lock= relay_log.get_log_lock();
138   fn_format(fname, info_fname, mysql_data_home, "", 4+32);
139   mysql_mutex_lock(&data_lock);
140   cur_log_fd = -1;
141   slave_skip_counter=0;
142   abort_pos_wait=0;
143   log_space_limit= relay_log_space_limit;
144   log_space_total= 0;
145 
146   if (unlikely(error_on_rli_init_info))
147     goto err;
148 
149   char pattern[FN_REFLEN];
150   (void) my_realpath(pattern, slave_load_tmpdir, 0);
151   if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
152             MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
153   {
154     mysql_mutex_unlock(&data_lock);
155     sql_print_error("Unable to use slave's temporary directory %s",
156                     slave_load_tmpdir);
157     DBUG_RETURN(1);
158   }
159   unpack_filename(slave_patternload_file, pattern);
160   slave_patternload_file_size= strlen(slave_patternload_file);
161 
162   /*
163     The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
164     Note that the I/O thread flushes it to disk after writing every
165     event, in flush_master_info(mi, 1, ?).
166   */
167 
168   {
169     /* Reports an error and returns, if the --relay-log's path
170        is a directory.*/
171     if (opt_relay_logname &&
172         opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
173     {
174       mysql_mutex_unlock(&data_lock);
175       sql_print_error("Path '%s' is a directory name, please specify \
176 a file name for --relay-log option", opt_relay_logname);
177       DBUG_RETURN(1);
178     }
179 
180     /* Reports an error and returns, if the --relay-log-index's path
181        is a directory.*/
182     if (opt_relaylog_index_name &&
183         opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
184         == FN_LIBCHAR)
185     {
186       mysql_mutex_unlock(&data_lock);
187       sql_print_error("Path '%s' is a directory name, please specify \
188 a file name for --relay-log-index option", opt_relaylog_index_name);
189       DBUG_RETURN(1);
190     }
191 
192     char buf[FN_REFLEN];
193     const char *ln;
194     static bool name_warning_sent= 0;
195     ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
196                                      1, buf);
197     /* We send the warning only at startup, not after every RESET SLAVE */
198     if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent &&
199         !opt_bootstrap)
200     {
201       /*
202         User didn't give us info to name the relay log index file.
203         Picking `hostname`-relay-bin.index like we do, causes replication to
204         fail if this slave's hostname is changed later. So, we would like to
205         instead require a name. But as we don't want to break many existing
206         setups, we only give warning, not error.
207       */
208       sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
209                         " so replication "
210                         "may break when this MariaDB server acts as a "
211                         "replica and has its hostname changed. Please "
212                         "use '--log-basename=#' or '--relay-log=%s' to avoid "
213                         "this problem.", ln);
214       name_warning_sent= 1;
215     }
216 
217     /* For multimaster, add connection name to relay log filenames */
218     char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
219     char *buf_relaylog_index_name= opt_relaylog_index_name;
220 
221     create_logfile_name_with_suffix(buf_relay_logname,
222                                     sizeof(buf_relay_logname),
223                                     ln, 1, &mi->cmp_connection_name);
224     ln= buf_relay_logname;
225 
226     if (opt_relaylog_index_name)
227     {
228       buf_relaylog_index_name= buf_relaylog_index_name_buff;
229       create_logfile_name_with_suffix(buf_relaylog_index_name_buff,
230                                       sizeof(buf_relaylog_index_name_buff),
231                                       opt_relaylog_index_name, 0,
232                                       &mi->cmp_connection_name);
233     }
234 
235     /*
236       note, that if open() fails, we'll still have index file open
237       but a destructor will take care of that
238     */
239     mysql_mutex_lock(log_lock);
240     if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
241         relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
242                        (ulong)max_relay_log_size, 1, TRUE))
243     {
244       mysql_mutex_unlock(log_lock);
245       mysql_mutex_unlock(&data_lock);
246       sql_print_error("Failed when trying to open logs for '%s' in Relay_log_info::init(). Error: %M", ln, my_errno);
247       DBUG_RETURN(1);
248     }
249     mysql_mutex_unlock(log_lock);
250   }
251 
252   /* if file does not exist */
253   if (access(fname,F_OK))
254   {
255     /*
256       If someone removed the file from underneath our feet, just close
257       the old descriptor and re-create the old file
258     */
259     if (info_fd >= 0)
260       mysql_file_close(info_fd, MYF(MY_WME));
261     if ((info_fd= mysql_file_open(key_file_relay_log_info,
262                                   fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
263     {
264       sql_print_error("Failed to create a new relay log info file ("
265                       "file '%s', errno %d)", fname, my_errno);
266       msg= current_thd->get_stmt_da()->message();
267       goto err;
268     }
269     if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
270                       MYF(MY_WME)))
271     {
272       sql_print_error("Failed to create a cache on relay log info file '%s'",
273                       fname);
274       msg= current_thd->get_stmt_da()->message();
275       goto err;
276     }
277 
278     /* Init relay log with first entry in the relay index file */
279     if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
280                            &msg, 0))
281     {
282       sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
283       goto err;
284     }
285     group_master_log_name[0]= 0;
286     group_master_log_pos= 0;
287   }
288   else // file exists
289   {
290     if (info_fd >= 0)
291       reinit_io_cache(&info_file, READ_CACHE, 0L,0,0);
292     else
293     {
294       int error=0;
295       if ((info_fd= mysql_file_open(key_file_relay_log_info,
296                                     fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
297       {
298         sql_print_error("\
299 Failed to open the existing relay log info file '%s' (errno %d)",
300                         fname, my_errno);
301         error= 1;
302       }
303       else if (init_io_cache(&info_file, info_fd,
304                              IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
305       {
306         sql_print_error("Failed to create a cache on relay log info file '%s'",
307                         fname);
308         error= 1;
309       }
310       if (unlikely(error))
311       {
312         if (info_fd >= 0)
313           mysql_file_close(info_fd, MYF(0));
314         info_fd= -1;
315         mysql_mutex_lock(log_lock);
316         relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
317         mysql_mutex_unlock(log_lock);
318         mysql_mutex_unlock(&data_lock);
319         DBUG_RETURN(1);
320       }
321     }
322 
323     int relay_log_pos, master_log_pos, lines;
324     char *first_non_digit;
325 
326     /*
327       Starting from MySQL 5.6.x, relay-log.info has a new format.
328       Now, its first line contains the number of lines in the file.
329       By reading this number we can determine which version our master.info
330       comes from. We can't simply count the lines in the file, since
331       versions before 5.6.x could generate files with more lines than
332       needed. If first line doesn't contain a number, or if it
333       contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
334       then the file is treated like a file from pre-5.6.x version.
335       There is no ambiguity when reading an old master.info: before
336       5.6.x, the first line contained the binlog's name, which is
337       either empty or has an extension (contains a '.'), so can't be
338       confused with an integer.
339 
340       So we're just reading first line and trying to figure which
341       version is this.
342     */
343 
344     /*
345       The first row is temporarily stored in mi->master_log_name, if
346       it is line count and not binlog name (new format) it will be
347       overwritten by the second row later.
348     */
349     if (init_strvar_from_file(group_relay_log_name,
350                               sizeof(group_relay_log_name),
351                               &info_file, ""))
352     {
353       msg="Error reading slave log configuration";
354       goto err;
355     }
356 
357     lines= strtoul(group_relay_log_name, &first_non_digit, 10);
358 
359     if (group_relay_log_name[0] != '\0' &&
360         *first_non_digit == '\0' &&
361         lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
362     {
363       DBUG_PRINT("info", ("relay_log_info file is in new format."));
364       /* Seems to be new format => read relay log name from next line */
365       if (init_strvar_from_file(group_relay_log_name,
366                                 sizeof(group_relay_log_name),
367                                 &info_file, ""))
368       {
369         msg="Error reading slave log configuration";
370         goto err;
371       }
372     }
373     else
374       DBUG_PRINT("info", ("relay_log_info file is in old format."));
375 
376     if (init_intvar_from_file(&relay_log_pos,
377                               &info_file, BIN_LOG_HEADER_SIZE) ||
378         init_strvar_from_file(group_master_log_name,
379                               sizeof(group_master_log_name),
380                               &info_file, "") ||
381         init_intvar_from_file(&master_log_pos, &info_file, 0) ||
382         (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
383          init_intvar_from_file(&sql_delay, &info_file, 0)))
384     {
385       msg="Error reading slave log configuration";
386       goto err;
387     }
388 
389     strmake_buf(event_relay_log_name,group_relay_log_name);
390     group_relay_log_pos= event_relay_log_pos= relay_log_pos;
391     group_master_log_pos= master_log_pos;
392 
393     if (is_relay_log_recovery && init_recovery(mi, &msg))
394       goto err;
395 
396     relay_log_state.load(rpl_global_gtid_slave_state);
397     if (init_relay_log_pos(this,
398                            group_relay_log_name,
399                            group_relay_log_pos,
400                            0 /* no data lock*/,
401                            &msg, 0))
402     {
403       sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)",
404                       group_relay_log_name, group_relay_log_pos);
405       goto err;
406     }
407   }
408 
409   DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu",
410                       my_b_tell(cur_log), event_relay_log_pos));
411   DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
412   DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos);
413 
414   /*
415     Now change the cache from READ to WRITE - must do this
416     before Relay_log_info::flush()
417   */
418   reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1);
419   if (unlikely((error= flush())))
420   {
421     msg= "Failed to flush relay log info file";
422     goto err;
423   }
424   if (count_relay_log_space(this))
425   {
426     msg="Error counting relay log space";
427     goto err;
428   }
429   inited= 1;
430   error_on_rli_init_info= false;
431   mysql_mutex_unlock(&data_lock);
432   DBUG_RETURN(0);
433 
434 err:
435   error_on_rli_init_info= true;
436   if (msg)
437     sql_print_error("%s", msg);
438   end_io_cache(&info_file);
439   if (info_fd >= 0)
440     mysql_file_close(info_fd, MYF(0));
441   info_fd= -1;
442   mysql_mutex_lock(log_lock);
443   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
444   mysql_mutex_unlock(log_lock);
445   mysql_mutex_unlock(&data_lock);
446   DBUG_RETURN(1);
447 }
448 
449 
450 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
451 {
452   MY_STAT s;
453   DBUG_ENTER("add_relay_log");
454   if (!mysql_file_stat(key_file_relaylog,
455                        linfo->log_file_name, &s, MYF(0)))
456   {
457     sql_print_error("log %s listed in the index, but failed to stat",
458                     linfo->log_file_name);
459     DBUG_RETURN(1);
460   }
461   my_atomic_add64_explicit((volatile int64*)(&rli->log_space_total),
462                            s.st_size, MY_MEMORY_ORDER_RELAXED);
463   DBUG_PRINT("info",("log_space_total: %llu", rli->log_space_total));
464   DBUG_RETURN(0);
465 }
466 
467 
468 static int count_relay_log_space(Relay_log_info* rli)
469 {
470   LOG_INFO linfo;
471   DBUG_ENTER("count_relay_log_space");
472   my_atomic_store64_explicit((volatile int64*)(&rli->log_space_total), 0,
473                              MY_MEMORY_ORDER_RELAXED);
474   if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
475   {
476     sql_print_error("Could not find first log while counting relay log space");
477     DBUG_RETURN(1);
478   }
479   do
480   {
481     if (add_relay_log(rli,&linfo))
482       DBUG_RETURN(1);
483   } while (!rli->relay_log.find_next_log(&linfo, 1));
484   /*
485      As we have counted everything, including what may have written in a
486      preceding write, we must reset bytes_written, or we may count some space
487      twice.
488   */
489   rli->relay_log.reset_bytes_written();
490   DBUG_RETURN(0);
491 }
492 
493 
494 /*
495    Reset UNTIL condition for Relay_log_info
496 
497    SYNOPSYS
498     clear_until_condition()
499       rli - Relay_log_info structure where UNTIL condition should be reset
500  */
501 
502 void Relay_log_info::clear_until_condition()
503 {
504   DBUG_ENTER("clear_until_condition");
505 
506   until_condition= Relay_log_info::UNTIL_NONE;
507   until_log_name[0]= 0;
508   until_log_pos= 0;
509   until_relay_log_names_defer= false;
510 
511   DBUG_VOID_RETURN;
512 }
513 
514 
515 /*
516   Read the correct format description event for starting to replicate from
517   a given position in a relay log file.
518 */
519 Format_description_log_event *
520 read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
521                                  const char **errmsg)
522 {
523   Log_event *ev;
524   Format_description_log_event *fdev;
525   bool found= false;
526 
527   /*
528     By default the relay log is in binlog format 3 (4.0).
529     Even if format is 4, this will work enough to read the first event
530     (Format_desc) (remember that format 4 is just lenghtened compared to format
531     3; format 3 is a prefix of format 4).
532   */
533   fdev= new Format_description_log_event(3);
534 
535   while (!found)
536   {
537     Log_event_type typ;
538 
539     /*
540       Read the possible Format_description_log_event; if position
541       was 4, no need, it will be read naturally.
542     */
543     DBUG_PRINT("info",("looking for a Format_description_log_event"));
544 
545     if (my_b_tell(cur_log) >= start_pos)
546       break;
547 
548     if (!(ev= Log_event::read_log_event(cur_log, fdev,
549                                         opt_slave_sql_verify_checksum)))
550     {
551       DBUG_PRINT("info",("could not read event, cur_log->error=%d",
552                          cur_log->error));
553       if (cur_log->error) /* not EOF */
554       {
555         *errmsg= "I/O error reading event at position 4";
556         delete fdev;
557         return NULL;
558       }
559       break;
560     }
561     typ= ev->get_type_code();
562     if (typ == FORMAT_DESCRIPTION_EVENT)
563     {
564       Format_description_log_event *old= fdev;
565       DBUG_PRINT("info",("found Format_description_log_event"));
566       fdev= (Format_description_log_event*) ev;
567       fdev->copy_crypto_data(old);
568       delete old;
569 
570       /*
571         As ev was returned by read_log_event, it has passed is_valid(), so
572         my_malloc() in ctor worked, no need to check again.
573       */
574       /*
575         Ok, we found a Format_description event. But it is not sure that this
576         describes the whole relay log; indeed, one can have this sequence
577         (starting from position 4):
578         Format_desc (of slave)
579         Rotate (of master)
580         Format_desc (of master)
581         So the Format_desc which really describes the rest of the relay log
582         is the 3rd event (it can't be further than that, because we rotate
583         the relay log when we queue a Rotate event from the master).
584         But what describes the Rotate is the first Format_desc.
585         So what we do is:
586         go on searching for Format_description events, until you exceed the
587         position (argument 'pos') or until you find another event than Rotate
588         or Format_desc.
589       */
590     }
591     else if (typ == START_ENCRYPTION_EVENT)
592     {
593       if (fdev->start_decryption((Start_encryption_log_event*) ev))
594       {
595         *errmsg= "Unable to set up decryption of binlog.";
596         delete ev;
597         delete fdev;
598         return NULL;
599       }
600       delete ev;
601     }
602     else
603     {
604       DBUG_PRINT("info",("found event of another type=%d", typ));
605       found= (typ != ROTATE_EVENT);
606       delete ev;
607     }
608   }
609   return fdev;
610 }
611 
612 
/*
  Open the given relay log

  SYNOPSIS
    init_relay_log_pos()
    rli                 Relay information (will be initialized)
    log                 Name of relay log file to read from. NULL = First log
    pos                 Position in relay log file
    need_data_lock      Set to 1 if this function should do mutex locks
    errmsg              Store pointer to error message here
    look_for_description_event
                        1 if we should look for such an event. We only need
                        this when the SQL thread starts and opens an existing
                        relay log and has to execute it (possibly from an
                        offset >4); then we need to read the first event of
                        the relay log to be able to parse the events we have
                        to execute.

  DESCRIPTION
  - Close old open relay log files.
  - If we are using the same relay log as the running IO-thread, then set
    rli->cur_log to point to the same IO_CACHE entry.
  - If not, open the 'log' binary file.

  LOCKING
    rli->data_lock is taken first (only when need_data_lock), then the
    relay log's log lock. Both are released before returning, on the
    success path and on every error path (the err: label is shared).

  TODO
    - check proper initialization of group_master_log_name/group_master_log_pos

  RETURN VALUES
    0   ok
    1   error.  errmsg is set to point to the error message
*/

int init_relay_log_pos(Relay_log_info* rli,const char* log,
                       ulonglong pos, bool need_data_lock,
                       const char** errmsg,
                       bool look_for_description_event)
{
  DBUG_ENTER("init_relay_log_pos");
  DBUG_PRINT("info", ("pos: %lu", (ulong) pos));

  *errmsg=0;
  mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();

  if (need_data_lock)
    mysql_mutex_lock(&rli->data_lock);

  /*
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
    is, too, and init_slave() too; these 2 functions allocate a description
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
    thread as that thread is not started by these functions. So we have to free
    the description_event here, in case, so that there is no memory leak in
    running, say, CHANGE MASTER.
  */
  delete rli->relay_log.description_event_for_exec;
  /*
    By default the relay log is in binlog format 3 (4.0).
    Even if format is 4, this will work enough to read the first event
    (Format_desc) (remember that format 4 is just lengthened compared to format
    3; format 3 is a prefix of format 4).
  */
  rli->relay_log.description_event_for_exec= new
    Format_description_log_event(3);

  mysql_mutex_lock(log_lock);

  /* Close log file and free buffers if it's already open */
  if (rli->cur_log_fd >= 0)
  {
    end_io_cache(&rli->cache_buf);
    mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
    rli->cur_log_fd = -1;
  }

  /* A fresh position is never inside a statement or transaction. */
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
  rli->clear_flag(Relay_log_info::IN_STMT);
  rli->clear_flag(Relay_log_info::IN_TRANSACTION);

  /*
    Make sure the index has a first entry. This also leaves rli->linfo at
    that first entry, which is what is used below when 'log' is NULL.
  */
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }

  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
  {
    *errmsg="Could not find target log during relay log initialization";
    goto err;
  }
  strmake_buf(rli->group_relay_log_name,rli->linfo.log_file_name);
  strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name);
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
  {
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
    if (check_binlog_magic(rli->cur_log,errmsg))
      goto err;
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
  }
  else
  {
    /*
      Open the relay log and set rli->cur_log to point at this one
    */
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
                                     rli->linfo.log_file_name,errmsg)) < 0)
      goto err;
    rli->cur_log = &rli->cache_buf;
  }
  /*
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
    sure.
  */
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
  {
    if (look_for_description_event)
    {
      /* Replace the format-3 placeholder with the log's real description. */
      Format_description_log_event *fdev;
      if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg)))
        goto err;
      delete rli->relay_log.description_event_for_exec;
      rli->relay_log.description_event_for_exec= fdev;
    }
    my_b_seek(rli->cur_log,(off_t)pos);
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu",
                        my_b_tell(rli->cur_log), rli->event_relay_log_pos));

  }

  /* Reached on success as well as on error (fall-through). */
err:
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
  if (!relay_log_purge)
    rli->log_space_limit= 0;
  /* Wake up anything waiting on data_cond for a position change. */
  mysql_cond_broadcast(&rli->data_cond);

  mysql_mutex_unlock(log_lock);

  if (need_data_lock)
    mysql_mutex_unlock(&rli->data_lock);
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
    *errmsg= "Invalid Format_description log event; could be out of memory";

  DBUG_PRINT("info", ("Returning %d from init_relay_log_pos", (*errmsg)?1:0));

  DBUG_RETURN ((*errmsg) ? 1 : 0);
}
770 
771 
772 /*
773   Waits until the SQL thread reaches (has executed up to) the
774   log/position or timed out.
775 
776   SYNOPSIS
777     wait_for_pos()
778     thd             client thread that sent SELECT MASTER_POS_WAIT
779     log_name        log name to wait for
780     log_pos         position to wait for
781     timeout         timeout in seconds before giving up waiting
782 
783   NOTES
784     timeout is longlong whereas it should be ulong ; but this is
785     to catch if the user submitted a negative timeout.
786 
787   RETURN VALUES
788     -2          improper arguments (log_pos<0)
789                 or slave not running, or master info changed
790                 during the function's execution,
791                 or client thread killed. -2 is translated to NULL by caller
792     -1          timed out
793     >=0         number of log events the function had to wait
794                 before reaching the desired log/position
795  */
796 
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
                                    longlong log_pos,
                                    longlong timeout)
{
  /*
    Incremented once per wake-up of data_cond in the loop below.
    NOTE(review): this counts wake-ups (signals/broadcasts on data_cond),
    which is not necessarily the same as the number of events executed.
  */
  int event_count = 0;
  ulong init_abort_pos_wait;
  int error=0;
  struct timespec abstime; // for timeout checking
  PSI_stage_info old_stage;
  DBUG_ENTER("Relay_log_info::wait_for_pos");

  /* No relay log state to wait on if this object was never initialized. */
  if (!inited)
    DBUG_RETURN(-2);

  DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
                      log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));

  /*
    Absolute deadline, computed once before waiting. It is only consulted
    when timeout > 0; otherwise the loop waits on data_cond with no deadline.
  */
  set_timespec(abstime,timeout);
  /*
    data_lock is held for the whole compare-and-wait loop (released only
    while blocked inside mysql_cond_(timed)wait). This function has no
    explicit mysql_mutex_unlock(&data_lock); it relies on the EXIT_COND at
    the err: label for that. NOTE(review): confirm EXIT_COND releases the
    mutex registered by ENTER_COND.
  */
  mysql_mutex_lock(&data_lock);
  thd->ENTER_COND(&data_cond, &data_lock,
                  &stage_waiting_for_the_slave_thread_to_advance_position,
                  &old_stage);
  /*
     This function will abort when it notices that some CHANGE MASTER or
     RESET MASTER has changed the master info.
     To catch this, these commands modify abort_pos_wait ; We just monitor
     abort_pos_wait and see if it has changed.
     Why do we have this mechanism instead of simply monitoring slave_running
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
     the SQL thread be stopped?
     This is because if someone does:
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
     the change may happen very quickly and we may not notice that
     slave_running briefly switches between 1/0/1.
  */
  init_abort_pos_wait= abort_pos_wait;

  /*
    We'll need to
    handle all possible log names comparisons (e.g. 999 vs 1000).
    We use ulong for string->number conversion ; this is no
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN]; //make a char[] from String

  /* Names longer than FN_REFLEN-1 bytes are silently truncated here. */
  strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1));

  char *p= fn_ext(log_name_tmp);
  char *p_end;
  /*
    The requested file name must have an extension (fn_ext() found a '.')
    and the requested position must be non-negative.
  */
  if (!*p || log_pos<0)
  {
    error= -2; //means improper arguments
    goto err;
  }
  // Convert 0-3 to 4
  log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE);
  /* p points to '.' */
  log_name_extension= strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end==p || *p_end)
  {
    error= -2;
    goto err;
  }

  /* The "compare and wait" main loop */
  while (!thd->killed &&
         init_abort_pos_wait == abort_pos_wait &&
         slave_running)
  {
    bool pos_reached;
    /*
      <0: the SQL thread's current master log file is older than the
          requested one; 0: same file; >0: newer file.
    */
    int cmp_result= 0;

    DBUG_PRINT("info",
               ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
                init_abort_pos_wait, abort_pos_wait));
    DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
                       group_master_log_name, (ulong) group_master_log_pos));

    /*
      group_master_log_name can be "", if we are just after a fresh
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
      (before we have executed one Rotate event from the master) or
      (rare) if the user is doing a weird slave setup (see next
      paragraph).  If group_master_log_name is "", we assume we don't
      have enough info to do the comparison yet, so we just wait until
      more data. In this case master_log_pos is always 0 except if
      somebody (wrongly) sets this slave to be a slave of itself
      without using --replicate-same-server-id (an unsupported
      configuration which does nothing), then group_master_log_pos
      will grow and group_master_log_name will stay "".
    */
    if (*group_master_log_name)
    {
      char *basename= (group_master_log_name +
                       dirname_length(group_master_log_name));
      /*
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
      */
      char *q= (char*)(fn_ext(basename)+1);
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
      {
        error= -2;
        break;
      }
      /*
        Now compare extensions.
        NOTE(review): q_end is not checked, so a non-numeric extension in
        group_master_log_name is not rejected here (unlike the user input).
      */
      char *q_end;
      ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
      if (group_master_log_name_extension < log_name_extension)
        cmp_result= -1 ;
      else
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;

      /* Reached: same file and at/after log_pos, or already in a later file. */
      pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
                    cmp_result > 0);
      if (pos_reached || thd->killed)
        break;
    }

    //wait for master update, with optional timeout.

    DBUG_PRINT("info",("Waiting for master update"));
    /*
      We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
    thd_wait_begin(thd, THD_WAIT_BINLOG);
    if (timeout > 0)
    {
      /*
        Note that mysql_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, mysql_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
    }
    else
      mysql_cond_wait(&data_cond, &data_lock);
    thd_wait_end(thd);
    DBUG_PRINT("info",("Got signal of master update or timed out"));
    /* Deadline passed: report -1 (timed out) to the caller. */
    if (error == ETIMEDOUT || error == ETIME)
    {
      error= -1;
      break;
    }
    error=0;
    event_count++;
    DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
  }

err:
  thd->EXIT_COND(&old_stage);
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
improper_arguments: %d  timed_out: %d",
                     thd->killed_errno(),
                     (int) (init_abort_pos_wait != abort_pos_wait),
                     (int) slave_running,
                     (int) (error == -2),
                     (int) (error == -1)));
  /*
    Being killed, a CHANGE MASTER/RESET (abort_pos_wait changed) or the SQL
    thread not running overrides any other result with -2.
    NOTE(review): abort_pos_wait and slave_running are read after
    EXIT_COND; if that released data_lock these reads are unlocked.
  */
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
      !slave_running)
  {
    error= -2;
  }
  DBUG_RETURN( error ? error : event_count );
}
977 
978 
/*
  Advance the group relay log position (and, where applicable, the group
  master log position) after an event group has been handled, then
  broadcast data_cond so that waiters on it (e.g. wait_for_pos()) re-check
  the position.

  @param log_pos    end position of the event in the master's binlog (see
                    the long comment at the end of the function). In the
                    non-parallel case a value of 0 leaves
                    group_master_log_pos unchanged.
  @param rgi        group info supplying the event's relay log position
                    and, in parallel mode, the relay/master log names
  @param skip_lock  true: the caller already holds data_lock (asserted);
                    false: data_lock is taken and released here
*/
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
                                             rpl_group_info *rgi,
                                             bool skip_lock)
{
  DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");

  if (skip_lock)
    mysql_mutex_assert_owner(&data_lock);
  else
    mysql_mutex_lock(&data_lock);

  rgi->inc_event_relay_log_pos();
  DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
                      (long) log_pos, (long) group_master_log_pos));
  if (rgi->is_parallel_exec)
  {
    /* In case of parallel replication, do not update the position backwards. */
    int cmp= compare_log_name(group_relay_log_name, rgi->event_relay_log_name);
    if (cmp < 0)
    {
      group_relay_log_pos= rgi->future_event_relay_log_pos;
      strmake_buf(group_relay_log_name, rgi->event_relay_log_name);
    } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
      group_relay_log_pos= rgi->future_event_relay_log_pos;

    /*
      In the parallel case we need to update the master_log_name here, rather
      than in Rotate_log_event::do_update_pos().
      Same rule as above: move to a newer file, or forward within the same
      file, but never backwards.
    */
    cmp= compare_log_name(group_master_log_name, rgi->future_event_master_log_name);
    if (cmp <= 0)
    {
      if (cmp < 0)
      {
        strcpy(group_master_log_name, rgi->future_event_master_log_name);
        group_master_log_pos= log_pos;
      }
      else if (group_master_log_pos < log_pos)
        group_master_log_pos= log_pos;
    }

    /*
      In the parallel case, we only update the Seconds_Behind_Master at the
      end of a transaction. In the non-parallel case, the value is updated as
      soon as an event is read from the relay log; however this would be too
      confusing for the user, seeing the slave reported as up-to-date when
      potentially thousands of events are still queued up for worker threads
      waiting for execution.
    */
    if (rgi->last_master_timestamp &&
        rgi->last_master_timestamp > last_master_timestamp)
      last_master_timestamp= rgi->last_master_timestamp;
  }
  else
  {
    /* Non-parallel case. */
    group_relay_log_pos= event_relay_log_pos;
    strmake_buf(group_relay_log_name, event_relay_log_name);
    notify_group_relay_log_name_update();
    if (log_pos) // not 3.23 binlogs (no log_pos there) and not Stop_log_event
      group_master_log_pos= log_pos;
  }

  /*
    If the slave does not support transactions and replicates a transaction,
    users should not trust group_master_log_pos (which they can display with
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
    group_master_log_pos the slave relies on log_pos stored in the master's
    binlog, but if we are in a master's transaction these positions are always
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
    not advance as it should on the non-transactional slave (it advances by
    big leaps, whereas it should advance by small leaps).
  */
  /*
    In 4.x we used the event's len to compute the positions here. This is
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
    then the event's len is not what it was in the master's binlog, so this
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
    have the original offset of the end of the event in the relay log. This is
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
    end_log_pos instead of begin_log_pos.
    If we had not done this fix here, the problem would also have appeared
    when the slave and master are 5.0 but with different event length (for
    example the slave is more recent than the master and features the event
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
    value which would lead to badly broken replication.
    Even the relay_log_pos will be corrupted in this case, because the event
    length in the relay log is not the real (original) one.
    With the end_log_pos solution, we avoid computations involving lengths.
  */
  mysql_cond_broadcast(&data_cond);
  if (!skip_lock)
    mysql_mutex_unlock(&data_lock);
  DBUG_VOID_RETURN;
}
1077 
1078 
1079 void Relay_log_info::close_temporary_tables()
1080 {
1081   DBUG_ENTER("Relay_log_info::close_temporary_tables");
1082 
1083   TMP_TABLE_SHARE *share;
1084   TABLE *table;
1085 
1086   if (!save_temporary_tables)
1087   {
1088     /* There are no temporary tables. */
1089     DBUG_VOID_RETURN;
1090   }
1091 
1092   while ((share= save_temporary_tables->pop_front()))
1093   {
1094     /*
1095       Iterate over the list of tables for this TABLE_SHARE and close them.
1096     */
1097     while ((table= share->all_tmp_tables.pop_front()))
1098     {
1099       DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'",
1100                               table->s->db.str, table->s->table_name.str));
1101 
1102       /* Reset in_use as the table may have been created by another thd */
1103       table->in_use= 0;
1104       /*
1105         Lets not free TABLE_SHARE here as there could be multiple TABLEs opened
1106         for the same table (TABLE_SHARE).
1107       */
1108       closefrm(table);
1109       my_free(table);
1110     }
1111 
1112     /*
1113       Don't ask for disk deletion. For now, anyway they will be deleted when
1114       slave restarts, but it is a better intention to not delete them.
1115     */
1116 
1117     free_table_share(share);
1118     my_free(share);
1119   }
1120 
1121   /* By now, there mustn't be any elements left in the list. */
1122   DBUG_ASSERT(save_temporary_tables->is_empty());
1123 
1124   my_free(save_temporary_tables);
1125   save_temporary_tables= NULL;
1126   slave_open_temp_tables= 0;
1127 
1128   DBUG_VOID_RETURN;
1129 }
1130 
1131 /*
1132   purge_relay_logs()
1133 
1134   @param rli		Relay log information
1135   @param thd		thread id. May be zero during startup
1136 
1137   NOTES
1138     Assumes to have a run lock on rli and that no slave thread are running.
1139 */
1140 
1141 int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
1142                      const char** errmsg)
1143 {
1144   int error=0;
1145   const char *ln;
1146   char name_buf[FN_REFLEN];
1147   DBUG_ENTER("purge_relay_logs");
1148 
1149   /*
1150     Even if rli->inited==0, we still try to empty rli->master_log_* variables.
1151     Indeed, rli->inited==0 does not imply that they already are empty.
1152     It could be that slave's info initialization partly succeeded :
1153     for example if relay-log.info existed but *relay-bin*.*
1154     have been manually removed, Relay_log_info::init() reads the old
1155     relay-log.info and fills rli->master_log_*, then Relay_log_info::init()
1156     checks for the existence of the relay log, this fails and
1157     Relay_log_info::init() leaves rli->inited to 0.
1158     In that pathological case, rli->master_log_pos* will be properly reinited
1159     at the next START SLAVE (as RESET SLAVE or CHANGE
1160     MASTER, the callers of purge_relay_logs, will delete bogus *.info files
1161     or replace them with correct files), however if the user does SHOW SLAVE
1162     STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
1163     In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
1164     to display fine in any case.
1165   */
1166 
1167   rli->group_master_log_name[0]= 0;
1168   rli->group_master_log_pos= 0;
1169 
1170   if (!rli->inited)
1171   {
1172     DBUG_PRINT("info", ("rli->inited == 0"));
1173     if (rli->error_on_rli_init_info)
1174     {
1175       ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
1176                                        1, name_buf);
1177 
1178       if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1179       {
1180         sql_print_error("Unable to purge relay log files. Failed to open relay "
1181                         "log index file:%s.", rli->relay_log.get_index_fname());
1182         DBUG_RETURN(1);
1183       }
1184       mysql_mutex_lock(rli->relay_log.get_log_lock());
1185       if (rli->relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
1186                              (ulong)(rli->max_relay_log_size ? rli->max_relay_log_size :
1187                               max_binlog_size), 1, TRUE))
1188       {
1189         sql_print_error("Unable to purge relay log files. Failed to open relay "
1190                         "log file:%s.", rli->relay_log.get_log_fname());
1191         mysql_mutex_unlock(rli->relay_log.get_log_lock());
1192         DBUG_RETURN(1);
1193       }
1194       mysql_mutex_unlock(rli->relay_log.get_log_lock());
1195     }
1196     else
1197       DBUG_RETURN(0);
1198   }
1199   else
1200   {
1201     DBUG_ASSERT(rli->slave_running == 0);
1202     DBUG_ASSERT(rli->mi->slave_running == 0);
1203   }
1204   mysql_mutex_lock(&rli->data_lock);
1205 
1206   /*
1207     we close the relay log fd possibly left open by the slave SQL thread,
1208     to be able to delete it; the relay log fd possibly left open by the slave
1209     I/O thread will be closed naturally in reset_logs() by the
1210     close(LOG_CLOSE_TO_BE_OPENED) call
1211   */
1212   if (rli->cur_log_fd >= 0)
1213   {
1214     end_io_cache(&rli->cache_buf);
1215     mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
1216     rli->cur_log_fd= -1;
1217   }
1218 
1219   if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0, 0))
1220   {
1221     *errmsg = "Failed during log reset";
1222     error=1;
1223     goto err;
1224   }
1225   rli->relay_log_state.load(rpl_global_gtid_slave_state);
1226   if (!just_reset)
1227   {
1228     /* Save name of used relay log file */
1229     strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
1230     strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
1231     rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1232     my_atomic_store64_explicit((volatile int64*)(&rli->log_space_total), 0,
1233                                MY_MEMORY_ORDER_RELAXED);
1234     if (count_relay_log_space(rli))
1235     {
1236       *errmsg= "Error counting relay log space";
1237       error=1;
1238       goto err;
1239     }
1240     error= init_relay_log_pos(rli, rli->group_relay_log_name,
1241                               rli->group_relay_log_pos,
1242                               0 /* do not need data lock */, errmsg, 0);
1243   }
1244   else
1245   {
1246     /* Ensure relay log names are not used */
1247     rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0;
1248   }
1249 
1250   if (!rli->inited && rli->error_on_rli_init_info)
1251   {
1252     mysql_mutex_lock(rli->relay_log.get_log_lock());
1253     rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
1254     mysql_mutex_unlock(rli->relay_log.get_log_lock());
1255   }
1256 err:
1257   DBUG_PRINT("info",("log_space_total: %llu",rli->log_space_total));
1258   mysql_mutex_unlock(&rli->data_lock);
1259   DBUG_RETURN(error);
1260 }
1261 
1262 
1263 /*
1264      Check if condition stated in UNTIL clause of START SLAVE is reached.
1265    SYNOPSYS
1266      Relay_log_info::is_until_satisfied()
1267      master_beg_pos    position of the beginning of to be executed event
1268                        (not log_pos member of the event that points to the
1269                         beginning of the following event)
1270 
1271 
1272    DESCRIPTION
1273      Checks if UNTIL condition is reached. Uses caching result of last
1274      comparison of current log file name and target log file name. So cached
1275      value should be invalidated if current log file name changes
1276      (see Relay_log_info::notify_... functions).
1277 
1278      This caching is needed to avoid of expensive string comparisons and
1279      strtol() conversions needed for log names comparison. We don't need to
1280      compare them each time this function is called, we only need to do this
1281      when current log name changes. If we have UNTIL_MASTER_POS condition we
1282      need to do this only after Rotate_log_event::do_apply_event() (which is
1283      rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
1284      condition then we should invalidate cached comarison value after
1285      inc_group_relay_log_pos() which called for each group of events (so we
1286      have some benefit if we have something like queries that use
1287      autoincrement or if we have transactions).
1288 
1289      Should be called ONLY if until_condition != UNTIL_NONE !
1290 
1291      In the parallel execution mode and UNTIL_MASTER_POS the file name is
1292      presented by future_event_master_log_name which may be ahead of
1293      group_master_log_name. Log_event::log_pos does relate to it nevertheless
1294      so the pair comprises a correct binlog coordinate.
1295      Internal group events and events that have zero log_pos also
1296      produce the zero for the local log_pos which may not lead to the
1297      function falsely return true.
1298      In UNTIL_RELAY_POS the original caching and notification are simplified
1299      to straightforward files comparison when the current event can't be
1300      a part of an event group.
1301 
1302    RETURN VALUE
1303      true - condition met or error happened (condition seems to have
1304             bad log file name)
1305      false - condition not met
1306 */
1307 
1308 bool Relay_log_info::is_until_satisfied(Log_event *ev)
1309 {
1310   const char *log_name;
1311   ulonglong log_pos;
1312   /* Prevents stopping within transaction; needed solely for Relay UNTIL. */
1313   bool in_trans= false;
1314 
1315   DBUG_ENTER("Relay_log_info::is_until_satisfied");
1316 
1317   if (until_condition == UNTIL_MASTER_POS)
1318   {
1319     log_name= (mi->using_parallel() ? future_event_master_log_name
1320                                     : group_master_log_name);
1321     log_pos= (get_flag(Relay_log_info::IN_TRANSACTION) || !ev || !ev->log_pos) ?
1322       (mi->using_parallel() ? 0 : group_master_log_pos) :
1323       ev->log_pos - ev->data_written;
1324   }
1325   else
1326   {
1327     DBUG_ASSERT(until_condition == UNTIL_RELAY_POS);
1328     if (!mi->using_parallel())
1329     {
1330       log_name= group_relay_log_name;
1331       log_pos= group_relay_log_pos;
1332     }
1333     else
1334     {
1335       log_name= event_relay_log_name;
1336       log_pos=  event_relay_log_pos;
1337       in_trans= get_flag(Relay_log_info::IN_TRANSACTION);
1338       /*
1339         until_log_names_cmp_result is set to UNKNOWN either
1340         -  by a non-group event *and* only when it is in the middle of a group
1341         -  or by a group event when the preceding group made the above
1342            non-group event to defer the resetting.
1343       */
1344       if ((ev && !Log_event::is_group_event(ev->get_type_code())))
1345       {
1346         if (in_trans)
1347         {
1348           until_relay_log_names_defer= true;
1349         }
1350         else
1351         {
1352           until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
1353           until_relay_log_names_defer= false;
1354         }
1355       }
1356       else if (!in_trans && until_relay_log_names_defer)
1357       {
1358         until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
1359         until_relay_log_names_defer= false;
1360       }
1361     }
1362   }
1363 
1364   DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%llu",
1365                       group_master_log_name, group_master_log_pos));
1366   DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%llu",
1367                       group_relay_log_name, group_relay_log_pos));
1368   DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%llu",
1369                       until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1370                       log_name, log_pos));
1371   DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%llu",
1372                       until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1373                       until_log_name, until_log_pos));
1374 
1375   if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1376   {
1377     /*
1378       We have no cached comparison results so we should compare log names
1379       and cache result.
1380       If we are after RESET SLAVE, and the SQL slave thread has not processed
1381       any event yet, it could be that group_master_log_name is "". In that case,
1382       just wait for more events (as there is no sensible comparison to do).
1383     */
1384 
1385     if (*log_name)
1386     {
1387       const char *basename= log_name + dirname_length(log_name);
1388 
1389       const char *q= (const char*)(fn_ext(basename)+1);
1390       if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1391       {
1392         /* Now compare extensions. */
1393         char *q_end;
1394         ulong log_name_extension= strtoul(q, &q_end, 10);
1395         if (log_name_extension < until_log_name_extension)
1396           until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1397         else
1398           until_log_names_cmp_result=
1399             (log_name_extension > until_log_name_extension) ?
1400             UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1401       }
1402       else
1403       {
1404         /* Probably error so we aborting */
1405         sql_print_error("Slave SQL thread is stopped because UNTIL "
1406                         "condition is bad.");
1407         DBUG_RETURN(TRUE);
1408       }
1409     }
1410     else
1411       DBUG_RETURN(until_log_pos == 0);
1412   }
1413 
1414   DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1415                 (log_pos >= until_log_pos && !in_trans)) ||
1416                until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
1417 }
1418 
1419 
1420 bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
1421                                rpl_group_info *rgi)
1422 {
1423   int error= 0;
1424   DBUG_ENTER("Relay_log_info::stmt_done");
1425 
1426   DBUG_ASSERT(!belongs_to_client());
1427   DBUG_ASSERT(rgi->rli == this);
1428   /*
1429     If in a transaction, and if the slave supports transactions, just
1430     inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
1431     (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
1432     BEGIN/COMMIT, not with SET AUTOCOMMIT= .
1433 
1434     We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
1435     is also used for single row transactions.
1436 
1437     CAUTION: opt_using_transactions means innodb || bdb ; suppose the
1438     master supports InnoDB and BDB, but the slave supports only BDB,
1439     problems will arise: - suppose an InnoDB table is created on the
1440     master, - then it will be MyISAM on the slave - but as
1441     opt_using_transactions is true, the slave will believe he is
1442     transactional with the MyISAM table. And problems will come when
1443     one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
1444     resume at BEGIN whereas there has not been any rollback).  This is
1445     the problem of using opt_using_transactions instead of a finer
1446     "does the slave support _transactional handler used on the
1447     master_".
1448 
1449     More generally, we'll have problems when a query mixes a
1450     transactional handler and MyISAM and STOP SLAVE is issued in the
1451     middle of the "transaction". START SLAVE will resume at BEGIN
1452     while the MyISAM table has already been updated.
1453   */
1454   if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
1455       opt_using_transactions)
1456     rgi->inc_event_relay_log_pos();
1457   else
1458   {
1459     inc_group_relay_log_pos(event_master_log_pos, rgi);
1460     if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi))
1461     {
1462       report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
1463              "Failed to update GTID state in %s.%s, slave state may become "
1464              "inconsistent: %d: %s",
1465              "mysql", rpl_gtid_slave_state_table_name.str,
1466              thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message());
1467       /*
1468         At this point we are not in a transaction (for example after DDL),
1469         so we can not roll back. Anyway, normally updates to the slave
1470         state table should not fail, and if they do, at least we made the
1471         DBA aware of the problem in the error log.
1472       */
1473     }
1474     DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
1475     if (mi->using_gtid == Master_info::USE_GTID_NO)
1476     {
1477       if (rgi->is_parallel_exec)
1478         mysql_mutex_lock(&data_lock);
1479       if (flush())
1480         error= 1;
1481       if (rgi->is_parallel_exec)
1482         mysql_mutex_unlock(&data_lock);
1483     }
1484     DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
1485   }
1486   DBUG_RETURN(error);
1487 }
1488 
1489 
1490 int
1491 Relay_log_info::alloc_inuse_relaylog(const char *name)
1492 {
1493   inuse_relaylog *ir;
1494   uint32 gtid_count;
1495   rpl_gtid *gtid_list;
1496 
1497   if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
1498   {
1499     my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
1500     return 1;
1501   }
1502   gtid_count= relay_log_state.count();
1503   if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count,
1504                                          MYF(MY_WME))))
1505   {
1506     my_free(ir);
1507     my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
1508     return 1;
1509   }
1510   if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
1511   {
1512     my_free(gtid_list);
1513     my_free(ir);
1514     DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
1515     my_error(ER_OUT_OF_RESOURCES, MYF(0));
1516     return 1;
1517   }
1518   ir->rli= this;
1519   strmake_buf(ir->name, name);
1520   ir->relay_log_state= gtid_list;
1521   ir->relay_log_state_count= gtid_count;
1522 
1523   if (!inuse_relaylog_list)
1524     inuse_relaylog_list= ir;
1525   else
1526   {
1527     last_inuse_relaylog->completed= true;
1528     last_inuse_relaylog->next= ir;
1529   }
1530   last_inuse_relaylog= ir;
1531 
1532   return 0;
1533 }
1534 
1535 
1536 void
1537 Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
1538 {
1539   my_free(ir->relay_log_state);
1540   my_free(ir);
1541 }
1542 
1543 
1544 void
1545 Relay_log_info::reset_inuse_relaylog()
1546 {
1547   inuse_relaylog *cur= inuse_relaylog_list;
1548   while (cur)
1549   {
1550     DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
1551     inuse_relaylog *next= cur->next;
1552     free_inuse_relaylog(cur);
1553     cur= next;
1554   }
1555   inuse_relaylog_list= last_inuse_relaylog= NULL;
1556 }
1557 
1558 
1559 int
1560 Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
1561 {
1562   int res= 0;
1563   while (count)
1564   {
1565     if (relay_log_state.update_nolock(gtid_list, false))
1566       res= 1;
1567     ++gtid_list;
1568     --count;
1569   }
1570   return res;
1571 }
1572 
1573 
1574 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
/*
  One row read from a mysql.gtid_slave_pos* table by
  scan_one_gtid_slave_pos_table():
    sub_id - value of the row's field[1]
    gtid   - domain_id (field[0]), server_id (field[2]), seq_no (field[3])
    hton   - storage engine of the table the row came from, as returned by
             table->s->db_type()
*/
struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
1576 
/*
  Read all rows of the table mysql.<tablename>, which must pass
  gtid_check_rpl_slave_state_table().

  Every row is appended to ARRAY as a gtid_pos_element. HASH, looked up by
  domain_id, keeps one my_malloc()'ed element per domain holding the row
  with the highest sub_id seen so far (existing entries are updated in
  place). NOTE(review): the hash entries are presumably freed by the hash's
  free function - confirm in the caller.

  @param thd        thread used to open, read and close the table
  @param hash       per-domain "latest row" hash, updated in place
  @param array      receives a copy of every row read
  @param tablename  table name within the mysql schema
  @param out_hton   set to table->s->db_type() whenever the table was
                    opened, even if a later step fails

  @return 0 on success, non-zero on error. Errors from the row loop are
          raised here (my_error()/print_error()); for open/check failures
          the callee is presumably responsible for raising the error.
*/
static int
scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
                              LEX_CSTRING *tablename, void **out_hton)
{
  TABLE_LIST tlist;
  TABLE *UNINIT_VAR(table);
  /* Cleanup flags consulted at the end: label, in reverse order of setup. */
  bool table_opened= false;
  bool table_scanned= false;
  struct gtid_pos_element tmp_entry, *entry;
  int err= 0;

  thd->reset_for_next_command();
  tlist.init_one_table(&MYSQL_SCHEMA_NAME, tablename, NULL, TL_READ);
  if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
    goto end;
  table_opened= true;
  table= tlist.table;

  if ((err= gtid_check_rpl_slave_state_table(table)))
    goto end;

  bitmap_set_all(table->read_set);
  if (unlikely(err= table->file->ha_rnd_init_with_error(1)))
    goto end;

  table_scanned= true;
  /* Full table scan; HA_ERR_END_OF_FILE terminates the loop normally. */
  for (;;)
  {
    uint32 domain_id, server_id;
    uint64 sub_id, seq_no;
    uchar *rec;

    if ((err= table->file->ha_rnd_next(table->record[0])))
    {
      if (err == HA_ERR_END_OF_FILE)
        break;
      else
      {
        table->file->print_error(err, MYF(0));
        goto end;
      }
    }
    domain_id= (uint32)table->field[0]->val_int();
    sub_id= (ulonglong)table->field[1]->val_int();
    server_id= (uint32)table->field[2]->val_int();
    seq_no= (ulonglong)table->field[3]->val_int();
    DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu",
                        (unsigned)domain_id, (unsigned)server_id,
                        (ulong)seq_no, (ulong)sub_id));

    /* Every row goes into ARRAY, regardless of its sub_id. */
    tmp_entry.sub_id= sub_id;
    tmp_entry.gtid.domain_id= domain_id;
    tmp_entry.gtid.server_id= server_id;
    tmp_entry.gtid.seq_no= seq_no;
    tmp_entry.hton= table->s->db_type();
    if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
    {
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      goto end;
    }

    /* HASH keeps, per domain, only the row with the highest sub_id. */
    if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0)))
    {
      entry= (struct gtid_pos_element *)rec;
      if (entry->sub_id >= sub_id)
        continue;
      entry->sub_id= sub_id;
      DBUG_ASSERT(entry->gtid.domain_id == domain_id);
      entry->gtid.server_id= server_id;
      entry->gtid.seq_no= seq_no;
      entry->hton= table->s->db_type();
    }
    else
    {
      if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry),
                                                        MYF(MY_WME))))
      {
        my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
        err= 1;
        goto end;
      }
      entry->sub_id= sub_id;
      entry->gtid.domain_id= domain_id;
      entry->gtid.server_id= server_id;
      entry->gtid.seq_no= seq_no;
      entry->hton= table->s->db_type();
      if ((err= my_hash_insert(hash, (uchar *)entry)))
      {
        my_free(entry);
        my_error(ER_OUT_OF_RESOURCES, MYF(0));
        goto end;
      }
    }
  }
  err= 0;                                       /* Clear HA_ERR_END_OF_FILE */

end:
  if (table_scanned)
  {
    /*
      End the scan and commit both the statement (FALSE) and the
      transaction (TRUE) opened for the read.
    */
    table->file->ha_index_or_rnd_end();
    ha_commit_trans(thd, FALSE);
    ha_commit_trans(thd, TRUE);
  }
  if (table_opened)
  {
    /* Report the engine before the table is closed. */
    *out_hton= table->s->db_type();
    close_thread_tables(thd);
    thd->mdl_context.release_transactional_locks();
  }
  return err;
}
1688 
1689 
1690 /*
1691   Look for all tables mysql.gtid_slave_pos*. Read all rows from each such
1692   table found into ARRAY. For each domain id, put the row with highest sub_id
1693   into HASH.
1694 */
1695 static int
1696 scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *),
1697                               void *cb_data)
1698 {
1699   char path[FN_REFLEN];
1700   MY_DIR *dirp;
1701 
1702   thd->reset_for_next_command();
1703   if (lock_schema_name(thd, MYSQL_SCHEMA_NAME.str))
1704     return 1;
1705 
1706   build_table_filename(path, sizeof(path) - 1, MYSQL_SCHEMA_NAME.str, "", "", 0);
1707   if (!(dirp= my_dir(path, MYF(MY_DONT_SORT))))
1708   {
1709     my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno);
1710     close_thread_tables(thd);
1711     thd->release_transactional_locks();
1712     return 1;
1713   }
1714   else
1715   {
1716     size_t i;
1717     Dynamic_array<LEX_CSTRING*> files(dirp->number_of_files);
1718     Discovered_table_list tl(thd, &files);
1719     int err;
1720 
1721     err= ha_discover_table_names(thd, &MYSQL_SCHEMA_NAME, dirp, &tl, false);
1722     my_dirend(dirp);
1723     close_thread_tables(thd);
1724     thd->release_transactional_locks();
1725     if (err)
1726       return err;
1727 
1728     for (i = 0; i < files.elements(); ++i)
1729     {
1730       if (strncmp(files.at(i)->str,
1731                   rpl_gtid_slave_state_table_name.str,
1732                   rpl_gtid_slave_state_table_name.length) == 0)
1733       {
1734         if ((err= (*cb)(thd, files.at(i), cb_data)))
1735           return err;
1736       }
1737     }
1738   }
1739 
1740   return 0;
1741 }
1742 
1743 
1744 struct load_gtid_state_cb_data {
1745   HASH *hash;
1746   DYNAMIC_ARRAY *array;
1747   struct rpl_slave_state::gtid_pos_table *table_list;
1748   struct rpl_slave_state::gtid_pos_table *default_entry;
1749 };
1750 
1751 static int
1752 process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton,
1753                        struct load_gtid_state_cb_data *data)
1754 {
1755   struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr;
1756   bool is_default=
1757     (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0);
1758 
1759   /*
1760     Ignore tables with duplicate storage engine, with a warning.
1761     Prefer the default mysql.gtid_slave_pos over another table
1762     mysql.gtid_slave_posXXX with the same storage engine.
1763   */
1764   next_ptr= &data->table_list;
1765   entry= data->table_list;
1766   while (entry)
1767   {
1768     if (entry->table_hton == hton)
1769     {
1770       static const char *warning_msg= "Ignoring redundant table mysql.%s "
1771         "since mysql.%s has the same storage engine";
1772       if (!is_default)
1773       {
1774         /* Ignore the redundant table. */
1775         sql_print_warning(warning_msg, table_name->str, entry->table_name.str);
1776         return 0;
1777       }
1778       else
1779       {
1780         sql_print_warning(warning_msg, entry->table_name.str, table_name->str);
1781         /* Delete the redundant table, and proceed to add this one instead. */
1782         *next_ptr= entry->next;
1783         my_free(entry);
1784         break;
1785       }
1786     }
1787     next_ptr= &entry->next;
1788     entry= entry->next;
1789   }
1790 
1791   p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name,
1792       hton, rpl_slave_state::GTID_POS_AVAILABLE);
1793   if (!p)
1794     return 1;
1795   p->next= data->table_list;
1796   data->table_list= p;
1797   if (is_default)
1798     data->default_entry= p;
1799   return 0;
1800 }
1801 
1802 
1803 /*
1804   Put tables corresponding to @@gtid_pos_auto_engines at the end of the list,
1805   marked to be auto-created if needed.
1806 */
1807 static int
1808 gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr)
1809 {
1810   plugin_ref *auto_engines;
1811   int err= 0;
1812   mysql_mutex_lock(&LOCK_global_system_variables);
1813   for (auto_engines= opt_gtid_pos_auto_plugins;
1814        !err && auto_engines && *auto_engines;
1815        ++auto_engines)
1816   {
1817     void *hton= plugin_hton(*auto_engines);
1818     char buf[FN_REFLEN+1];
1819     LEX_CSTRING table_name;
1820     char *p;
1821     rpl_slave_state::gtid_pos_table *entry, **next_ptr;
1822 
1823     /* See if this engine is already in the list. */
1824     next_ptr= list_ptr;
1825     entry= *list_ptr;
1826     while (entry)
1827     {
1828       if (entry->table_hton == hton)
1829         break;
1830       next_ptr= &entry->next;
1831       entry= entry->next;
1832     }
1833     if (entry)
1834       continue;
1835 
1836     /* Add an auto-create entry for this engine at end of list. */
1837     p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN);
1838     p= strmake(p, "_", FN_REFLEN - (p - buf));
1839     p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf));
1840     table_name.str= buf;
1841     table_name.length= p - buf;
1842     table_case_convert(const_cast<char*>(table_name.str),
1843                        static_cast<uint>(table_name.length));
1844     entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table
1845       (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE);
1846     if (!entry)
1847     {
1848       err= 1;
1849       break;
1850     }
1851     *next_ptr= entry;
1852   }
1853   mysql_mutex_unlock(&LOCK_global_system_variables);
1854   return err;
1855 }
1856 
1857 
1858 static int
1859 load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
1860 {
1861   int err;
1862   load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
1863   void *hton;
1864 
1865   if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
1866                                           table_name, &hton)))
1867     return err;
1868   return process_gtid_pos_table(thd, table_name, hton, data);
1869 }
1870 
1871 
1872 int
1873 rpl_load_gtid_slave_state(THD *thd)
1874 {
1875   bool array_inited= false;
1876   struct gtid_pos_element tmp_entry, *entry;
1877   HASH hash;
1878   DYNAMIC_ARRAY array;
1879   int err= 0;
1880   uint32 i;
1881   load_gtid_state_cb_data cb_data;
1882   DBUG_ENTER("rpl_load_gtid_slave_state");
1883 
1884   mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1885   bool loaded= rpl_global_gtid_slave_state->loaded;
1886   mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1887   if (loaded)
1888     DBUG_RETURN(0);
1889 
1890   cb_data.table_list= NULL;
1891   cb_data.default_entry= NULL;
1892   my_hash_init(&hash, &my_charset_bin, 32,
1893                offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
1894                sizeof(uint32), NULL, my_free, HASH_UNIQUE);
1895   if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0))))
1896     goto end;
1897   array_inited= true;
1898 
1899   cb_data.hash = &hash;
1900   cb_data.array = &array;
1901   if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data)))
1902     goto end;
1903 
1904   if (!cb_data.default_entry)
1905   {
1906     /*
1907       If the mysql.gtid_slave_pos table does not exist, but at least one other
1908       table is available, arbitrarily pick the first in the list to use as
1909       default.
1910     */
1911     cb_data.default_entry= cb_data.table_list;
1912   }
1913   if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
1914     goto end;
1915 
1916   mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1917   if (rpl_global_gtid_slave_state->loaded)
1918   {
1919     mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1920     goto end;
1921   }
1922 
1923   if (!cb_data.table_list)
1924   {
1925     my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
1926              rpl_gtid_slave_state_table_name.str);
1927     mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1928     err= 1;
1929     goto end;
1930   }
1931 
1932   for (i= 0; i < array.elements; ++i)
1933   {
1934     get_dynamic(&array, (uchar *)&tmp_entry, i);
1935     if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id,
1936                                                   tmp_entry.gtid.server_id,
1937                                                   tmp_entry.sub_id,
1938                                                   tmp_entry.gtid.seq_no,
1939                                                   tmp_entry.hton,
1940                                                   NULL)))
1941     {
1942       mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1943       my_error(ER_OUT_OF_RESOURCES, MYF(0));
1944       goto end;
1945     }
1946   }
1947 
1948   for (i= 0; i < hash.records; ++i)
1949   {
1950     entry= (struct gtid_pos_element *)my_hash_element(&hash, i);
1951     if (opt_bin_log &&
1952         mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
1953                                                     entry->gtid.seq_no))
1954     {
1955       mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1956       my_error(ER_OUT_OF_RESOURCES, MYF(0));
1957       goto end;
1958     }
1959   }
1960 
1961   rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
1962                                                         cb_data.default_entry);
1963   cb_data.table_list= NULL;
1964   rpl_global_gtid_slave_state->loaded= true;
1965   mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1966 
1967 end:
1968   if (array_inited)
1969     delete_dynamic(&array);
1970   my_hash_free(&hash);
1971   if (cb_data.table_list)
1972     rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
1973   DBUG_RETURN(err);
1974 }
1975 
1976 
1977 static int
1978 find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
1979 {
1980   load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
1981   TABLE_LIST tlist;
1982   TABLE *table= NULL;
1983   int err;
1984 
1985   thd->reset_for_next_command();
1986   tlist.init_one_table(&MYSQL_SCHEMA_NAME, table_name, NULL, TL_READ);
1987   if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
1988     goto end;
1989   table= tlist.table;
1990 
1991   if ((err= gtid_check_rpl_slave_state_table(table)))
1992     goto end;
1993   err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data);
1994 
1995 end:
1996   if (table)
1997   {
1998     ha_commit_trans(thd, FALSE);
1999     ha_commit_trans(thd, TRUE);
2000     close_thread_tables(thd);
2001     thd->release_transactional_locks();
2002   }
2003 
2004   return err;
2005 }
2006 
2007 
2008 /*
2009   Re-compute the list of available mysql.gtid_slave_posXXX tables.
2010 
2011   This is done at START SLAVE to pick up any newly created tables without
2012   requiring server restart.
2013 */
2014 int
2015 find_gtid_slave_pos_tables(THD *thd)
2016 {
2017   int err= 0;
2018   load_gtid_state_cb_data cb_data;
2019   uint num_running;
2020 
2021   mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2022   bool loaded= rpl_global_gtid_slave_state->loaded;
2023   mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2024   if (!loaded)
2025     return 0;
2026 
2027   cb_data.table_list= NULL;
2028   cb_data.default_entry= NULL;
2029   if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data)))
2030     goto end;
2031 
2032   if (!cb_data.table_list)
2033   {
2034     my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
2035              rpl_gtid_slave_state_table_name.str);
2036     err= 1;
2037     goto end;
2038   }
2039   if (!cb_data.default_entry)
2040   {
2041     /*
2042       If the mysql.gtid_slave_pos table does not exist, but at least one other
2043       table is available, arbitrarily pick the first in the list to use as
2044       default.
2045     */
2046     cb_data.default_entry= cb_data.table_list;
2047   }
2048   if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
2049     goto end;
2050 
2051   mysql_mutex_lock(&LOCK_active_mi);
2052   num_running= any_slave_sql_running(true);
2053   mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2054   if (num_running <= 1)
2055   {
2056     /*
2057       If no slave is running now, the count will be 1, since this SQL thread
2058       which is starting is included in the count. In this case, we can safely
2059       replace the list, no-one can be trying to read it without lock.
2060     */
2061     DBUG_ASSERT(num_running == 1);
2062     rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
2063                                                           cb_data.default_entry);
2064     cb_data.table_list= NULL;
2065   }
2066   else
2067   {
2068     /*
2069       If there are SQL threads running, we cannot safely remove the old list.
2070       However we can add new entries, and warn about any tables that
2071       disappeared, but may still be visible to running SQL threads.
2072     */
2073     rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr;
2074 
2075     old_entry= (rpl_slave_state::gtid_pos_table *)
2076       rpl_global_gtid_slave_state->gtid_pos_tables;
2077     while (old_entry)
2078     {
2079       new_entry= cb_data.table_list;
2080       while (new_entry)
2081       {
2082         if (new_entry->table_hton == old_entry->table_hton)
2083           break;
2084         new_entry= new_entry->next;
2085       }
2086       if (!new_entry)
2087         sql_print_warning("The table mysql.%s was removed. "
2088                           "This change will not take full effect "
2089                           "until all SQL threads have been restarted",
2090                           old_entry->table_name.str);
2091       old_entry= old_entry->next;
2092     }
2093     next_ptr_ptr= &cb_data.table_list;
2094     new_entry= cb_data.table_list;
2095     while (new_entry)
2096     {
2097       /* Check if we already have a table with this storage engine. */
2098       old_entry= (rpl_slave_state::gtid_pos_table *)
2099         rpl_global_gtid_slave_state->gtid_pos_tables;
2100       while (old_entry)
2101       {
2102         if (new_entry->table_hton == old_entry->table_hton)
2103           break;
2104         old_entry= old_entry->next;
2105       }
2106       if (old_entry)
2107       {
2108         /* This new_entry is already available in the list. */
2109         next_ptr_ptr= &new_entry->next;
2110         new_entry= new_entry->next;
2111       }
2112       else
2113       {
2114         /* Move this new_entry to the list. */
2115         rpl_slave_state::gtid_pos_table *next= new_entry->next;
2116         rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry);
2117         *next_ptr_ptr= next;
2118         new_entry= next;
2119       }
2120     }
2121   }
2122   mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2123   mysql_mutex_unlock(&LOCK_active_mi);
2124 
2125 end:
2126   if (cb_data.table_list)
2127     rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
2128   return err;
2129 }
2130 
2131 
2132 void
2133 rpl_group_info::reinit(Relay_log_info *rli)
2134 {
2135   this->rli= rli;
2136   tables_to_lock= NULL;
2137   tables_to_lock_count= 0;
2138   trans_retries= 0;
2139   last_event_start_time= 0;
2140   gtid_sub_id= 0;
2141   commit_id= 0;
2142   gtid_pending= false;
2143   worker_error= 0;
2144   row_stmt_start_timestamp= 0;
2145   long_find_row_note_printed= false;
2146   did_mark_start_commit= false;
2147   gtid_ev_flags2= 0;
2148   pending_gtid_delete_list= NULL;
2149   last_master_timestamp = 0;
2150   gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
2151   speculation= SPECULATE_NO;
2152   commit_orderer.reinit();
2153 }
2154 
2155 rpl_group_info::rpl_group_info(Relay_log_info *rli)
2156   : thd(0), wait_commit_sub_id(0),
2157     wait_commit_group_info(0), parallel_entry(0),
2158     deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
2159 {
2160   reinit(rli);
2161   bzero(&current_gtid, sizeof(current_gtid));
2162   mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
2163                    MY_MUTEX_INIT_FAST);
2164   mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
2165 }
2166 
2167 
2168 rpl_group_info::~rpl_group_info()
2169 {
2170   free_annotate_event();
2171   delete deferred_events;
2172   mysql_mutex_destroy(&sleep_lock);
2173   mysql_cond_destroy(&sleep_cond);
2174 }
2175 
2176 
2177 int
2178 event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
2179 {
2180   uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id);
2181   if (!sub_id)
2182   {
2183     /* Out of memory caused hash insertion to fail. */
2184     return 1;
2185   }
2186   rgi->gtid_sub_id= sub_id;
2187   rgi->current_gtid.domain_id= gev->domain_id;
2188   rgi->current_gtid.server_id= gev->server_id;
2189   rgi->current_gtid.seq_no= gev->seq_no;
2190   rgi->commit_id= gev->commit_id;
2191   rgi->gtid_pending= true;
2192   return 0;
2193 }
2194 
2195 
2196 void
2197 delete_or_keep_event_post_apply(rpl_group_info *rgi,
2198                                 Log_event_type typ, Log_event *ev)
2199 {
2200   /*
2201     ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
2202     thread-safe for parallel replication.
2203   */
2204 
2205   switch (typ) {
2206   case FORMAT_DESCRIPTION_EVENT:
2207     /*
2208       Format_description_log_event should not be deleted because it
2209       will be used to read info about the relay log's format;
2210       it will be deleted when the SQL thread does not need it,
2211       i.e. when this thread terminates.
2212     */
2213     break;
2214   case ANNOTATE_ROWS_EVENT:
2215     /*
2216       Annotate_rows event should not be deleted because after it has
2217       been applied, thd->query points to the string inside this event.
2218       The thd->query will be used to generate new Annotate_rows event
2219       during applying the subsequent Rows events.
2220     */
2221     rgi->set_annotate_event((Annotate_rows_log_event*) ev);
2222     break;
2223   case DELETE_ROWS_EVENT_V1:
2224   case UPDATE_ROWS_EVENT_V1:
2225   case WRITE_ROWS_EVENT_V1:
2226   case DELETE_ROWS_EVENT:
2227   case UPDATE_ROWS_EVENT:
2228   case WRITE_ROWS_EVENT:
2229   case WRITE_ROWS_COMPRESSED_EVENT:
2230   case DELETE_ROWS_COMPRESSED_EVENT:
2231   case UPDATE_ROWS_COMPRESSED_EVENT:
2232   case WRITE_ROWS_COMPRESSED_EVENT_V1:
2233   case UPDATE_ROWS_COMPRESSED_EVENT_V1:
2234   case DELETE_ROWS_COMPRESSED_EVENT_V1:
2235     /*
2236       After the last Rows event has been applied, the saved Annotate_rows
2237       event (if any) is not needed anymore and can be deleted.
2238     */
2239     if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
2240       rgi->free_annotate_event();
2241     /* fall through */
2242   default:
2243     DBUG_PRINT("info", ("Deleting the event after it has been executed"));
2244     if (!rgi->is_deferred_event(ev))
2245       delete ev;
2246     break;
2247   }
2248 }
2249 
2250 
/*
  Clean up per-event-group execution context on THD (which must be this
  group's thd) after the group finishes or is aborted.

  If ERROR is set:
   - the statement and real transactions are rolled back, the pending GTID
     update is cancelled, and saved gtid_slave_pos deletions are put back;
   - after the tables are closed, transactional locks are released, the
     IN_STMT/IN_TRANSACTION flags of rli are cleared (for the SQL driver
     thread only), and the domain owner is released when
     --gtid-ignore-duplicates is in use.
  In every case, the table map is cleared, thread tables are closed, the
  per-event option bits are reset, and the long_find_row note state is reset.
*/
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
  DBUG_ENTER("rpl_group_info::cleanup_context");
  DBUG_PRINT("enter", ("error: %d", (int) error));

  DBUG_ASSERT(this->thd == thd);
  /*
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
    may have opened tables, which we cannot be sure have been closed (because
    maybe the Rows_log_event have not been found or will not be, because slave
    SQL thread is stopping, or relay log has a missing tail etc). So we close
    all thread's tables. And so the table mappings have to be cancelled.
    2) Rows_log_event::do_apply_event() may even have started statements or
    transactions on them, which we need to rollback in case of error.
    3) If finding a Format_description_log_event after a BEGIN, we also need
    to rollback before continuing with the next events.
    4) so we need this "context cleanup" function.
  */
  if (unlikely(error))
  {
    trans_rollback_stmt(thd); // if a "statement transaction"
    /* trans_rollback() also resets OPTION_GTID_BEGIN */
    trans_rollback(thd);      // if a "real transaction"
    /*
      Now that we have rolled back the transaction, make sure we do not
      erroneously update the GTID position.
    */
    gtid_pending= false;

    /*
      Rollback will have undone any deletions of old rows we might have made
      in mysql.gtid_slave_pos. Put those rows back on the list to be deleted.
    */
    pending_gtid_deletes_put_back();
  }
  /* Cancel the table mappings, then close the tables they referred to. */
  m_table_map.clear_tables();
  slave_close_thread_tables(thd);

  if (unlikely(error))
  {
    thd->release_transactional_locks();

    if (thd == rli->sql_driver_thd)
    {
      /*
        Reset flags. This is needed to handle incident events and errors in
        the relay log noticed by the sql driver thread.
      */
      rli->clear_flag(Relay_log_info::IN_STMT);
      rli->clear_flag(Relay_log_info::IN_TRANSACTION);
    }

    /*
      Ensure we always release the domain for others to process, when using
      --gtid-ignore-duplicates.
    */
    if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
      rpl_global_gtid_slave_state->release_domain_owner(this);
  }

  /*
    Cleanup for the flags that have been set at do_apply_event.
  */
  thd->variables.option_bits&= ~(OPTION_NO_FOREIGN_KEY_CHECKS |
                                 OPTION_RELAXED_UNIQUE_CHECKS |
                                 OPTION_NO_CHECK_CONSTRAINT_CHECKS);

  /*
    Reset state related to long_find_row notes in the error log:
    - timestamp
    - flag that decides whether the slave prints or not
  */
  reset_row_stmt_start_timestamp();
  unset_long_find_row_note_printed();

  /* Debug-only fault injection: delay cleanup for GTIDs in domain 100. */
  DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x", {
      if (current_gtid.domain_id == 100)
        my_sleep(50000);
    };);

  DBUG_VOID_RETURN;
}
2333 
2334 
2335 void rpl_group_info::clear_tables_to_lock()
2336 {
2337   DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
2338 #ifndef DBUG_OFF
2339   /**
2340     When replicating in RBR and MyISAM Merge tables are involved
2341     open_and_lock_tables (called in do_apply_event) appends the
2342     base tables to the list of tables_to_lock. Then these are
2343     removed from the list in close_thread_tables (which is called
2344     before we reach this point).
2345 
2346     This assertion just confirms that we get no surprises at this
2347     point.
2348    */
2349   uint i=0;
2350   for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
2351   DBUG_ASSERT(i == tables_to_lock_count);
2352 #endif
2353 
2354   while (tables_to_lock)
2355   {
2356     uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
2357     if (tables_to_lock->m_tabledef_valid)
2358     {
2359       tables_to_lock->m_tabledef.table_def::~table_def();
2360       tables_to_lock->m_tabledef_valid= FALSE;
2361     }
2362 
2363     /*
2364       If blob fields were used during conversion of field values
2365       from the master table into the slave table, then we need to
2366       free the memory used temporarily to store their values before
2367       copying into the slave's table.
2368     */
2369     if (tables_to_lock->m_conv_table)
2370       free_blobs(tables_to_lock->m_conv_table);
2371 
2372     tables_to_lock=
2373       static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
2374     tables_to_lock_count--;
2375     my_free(to_free);
2376   }
2377   DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
2378   DBUG_VOID_RETURN;
2379 }
2380 
2381 
2382 void rpl_group_info::slave_close_thread_tables(THD *thd)
2383 {
2384   DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
2385   thd->get_stmt_da()->set_overwrite_status(true);
2386   thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
2387   thd->get_stmt_da()->set_overwrite_status(false);
2388 
2389   close_thread_tables(thd);
2390   /*
2391     - If transaction rollback was requested due to deadlock
2392     perform it and release metadata locks.
2393     - If inside a multi-statement transaction,
2394     defer the release of metadata locks until the current
2395     transaction is either committed or rolled back. This prevents
2396     other statements from modifying the table for the entire
2397     duration of this transaction.  This provides commit ordering
2398     and guarantees serializability across multiple transactions.
2399     - If in autocommit mode, or outside a transactional context,
2400     automatically release metadata locks of the current statement.
2401   */
2402   if (thd->transaction_rollback_request)
2403   {
2404     trans_rollback_implicit(thd);
2405     thd->release_transactional_locks();
2406   }
2407   else if (! thd->in_multi_stmt_transaction_mode())
2408     thd->release_transactional_locks();
2409   else
2410     thd->mdl_context.release_statement_locks();
2411 
2412   clear_tables_to_lock();
2413   DBUG_VOID_RETURN;
2414 }
2415 
2416 
2417 
2418 static void
2419 mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco,
2420                         rpl_group_info *rgi)
2421 {
2422   group_commit_orderer *tmp;
2423   uint64 count= ++e->count_committing_event_groups;
2424   /* Signal any following GCO whose wait_count has been reached now. */
2425   tmp= gco;
2426   while ((tmp= tmp->next_gco))
2427   {
2428     uint64 wait_count= tmp->wait_count;
2429     if (wait_count > count)
2430       break;
2431     mysql_cond_broadcast(&tmp->COND_group_commit_orderer);
2432   }
2433 }
2434 
2435 
2436 void
2437 rpl_group_info::mark_start_commit_no_lock()
2438 {
2439   if (did_mark_start_commit)
2440     return;
2441   did_mark_start_commit= true;
2442   mark_start_commit_inner(parallel_entry, gco, this);
2443 }
2444 
2445 
2446 void
2447 rpl_group_info::mark_start_commit()
2448 {
2449   rpl_parallel_entry *e;
2450 
2451   if (did_mark_start_commit)
2452     return;
2453   did_mark_start_commit= true;
2454 
2455   e= this->parallel_entry;
2456   mysql_mutex_lock(&e->LOCK_parallel_entry);
2457   mark_start_commit_inner(e, gco, this);
2458   mysql_mutex_unlock(&e->LOCK_parallel_entry);
2459 }
2460 
2461 
2462 /*
2463   Format the current GTID as a string suitable for printing in error messages.
2464 
2465   The string is stored in a buffer inside rpl_group_info, so remains valid
2466   until next call to gtid_info() or until destruction of rpl_group_info.
2467 
2468   If no GTID is available, then NULL is returned.
2469 */
2470 char *
2471 rpl_group_info::gtid_info()
2472 {
2473   if (!gtid_sub_id || !current_gtid.seq_no)
2474     return NULL;
2475   my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu",
2476               current_gtid.domain_id, current_gtid.server_id,
2477               current_gtid.seq_no);
2478   return gtid_info_buf;
2479 }
2480 
2481 
2482 /*
2483   Undo the effect of a prior mark_start_commit().
2484 
2485   This is only used for retrying a transaction in parallel replication, after
2486   we have encountered a deadlock or other temporary error.
2487 
2488   When we get such a deadlock, it means that the current group of transactions
2489   did not yet all start committing (else they would not have deadlocked). So
2490   we will not yet have woken up anything in the next group, our rgi->gco is
2491   still live, and we can simply decrement the counter (to be incremented again
2492   later, when the retry succeeds and reaches the commit step).
2493 */
2494 void
2495 rpl_group_info::unmark_start_commit()
2496 {
2497   rpl_parallel_entry *e;
2498 
2499   if (!did_mark_start_commit)
2500     return;
2501   did_mark_start_commit= false;
2502 
2503   e= this->parallel_entry;
2504   mysql_mutex_lock(&e->LOCK_parallel_entry);
2505   --e->count_committing_event_groups;
2506   mysql_mutex_unlock(&e->LOCK_parallel_entry);
2507 }
2508 
2509 
2510 /*
2511   When record_gtid() has deleted any old rows from the table
2512   mysql.gtid_slave_pos as part of a replicated transaction, save the list of
2513   rows deleted here.
2514 
2515   If later the transaction fails (eg. optimistic parallel replication), the
2516   deletes will be undone when the transaction is rolled back. Then we can
2517   put back the list of rows into the rpl_global_gtid_slave_state, so that
2518   we can re-do the deletes and avoid accumulating old rows in the table.
2519 */
2520 void
2521 rpl_group_info::pending_gtid_deletes_save(uint32 domain_id,
2522                                           rpl_slave_state::list_element *list)
2523 {
2524   /*
2525     We should never get to a state where we try to save a new pending list of
2526     gtid deletes while we still have an old one. But make sure we handle it
2527     anyway just in case, so we avoid leaving stray entries in the
2528     mysql.gtid_slave_pos table.
2529   */
2530   DBUG_ASSERT(!pending_gtid_delete_list);
2531   if (unlikely(pending_gtid_delete_list))
2532     pending_gtid_deletes_put_back();
2533 
2534   pending_gtid_delete_list= list;
2535   pending_gtid_delete_list_domain= domain_id;
2536 }
2537 
2538 
2539 /*
2540   Take the list recorded by pending_gtid_deletes_save() and put it back into
2541   rpl_global_gtid_slave_state. This is needed if deletion of the rows was
2542   rolled back due to transaction failure.
2543 */
2544 void
2545 rpl_group_info::pending_gtid_deletes_put_back()
2546 {
2547   if (pending_gtid_delete_list)
2548   {
2549     rpl_global_gtid_slave_state->put_back_list(pending_gtid_delete_list_domain,
2550                                                pending_gtid_delete_list);
2551     pending_gtid_delete_list= NULL;
2552   }
2553 }
2554 
2555 
2556 /*
2557   Free the list recorded by pending_gtid_deletes_save(). Done when the deletes
2558   in the list have been permanently committed.
2559 */
2560 void
2561 rpl_group_info::pending_gtid_deletes_clear()
2562 {
2563   pending_gtid_deletes_free(pending_gtid_delete_list);
2564   pending_gtid_delete_list= NULL;
2565 }
2566 
2567 
2568 void
2569 rpl_group_info::pending_gtid_deletes_free(rpl_slave_state::list_element *list)
2570 {
2571   rpl_slave_state::list_element *next;
2572 
2573   while (list)
2574   {
2575     next= list->next;
2576     my_free(list);
2577     list= next;
2578   }
2579 }
2580 
2581 
2582 rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
2583   : rpl_filter(filter)
2584 {
2585   cached_charset_invalidate();
2586 }
2587 
2588 
2589 void rpl_sql_thread_info::cached_charset_invalidate()
2590 {
2591   DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
2592 
2593   /* Full of zeroes means uninitialized. */
2594   bzero(cached_charset, sizeof(cached_charset));
2595   DBUG_VOID_RETURN;
2596 }
2597 
2598 
2599 bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
2600 {
2601   DBUG_ENTER("rpl_group_info::cached_charset_compare");
2602 
2603   if (memcmp(cached_charset, charset, sizeof(cached_charset)))
2604   {
2605     memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
2606     DBUG_RETURN(1);
2607   }
2608   DBUG_RETURN(0);
2609 }
2610 
2611 
2612 /**
2613   Store the file and position where the slave's SQL thread are in the
2614   relay log.
2615 
2616   Notes:
2617 
2618   - This function should be called either from the slave SQL thread,
2619     or when the slave thread is not running.  (It reads the
2620     group_{relay|master}_log_{pos|name} and delay fields in the rli
2621     object.  These may only be modified by the slave SQL thread or by
2622     a client thread when the slave SQL thread is not running.)
2623 
2624   - If there is an active transaction, then we do not update the
2625     position in the relay log.  This is to ensure that we re-execute
2626     statements if we die in the middle of an transaction that was
2627     rolled back.
2628 
2629   - As a transaction never spans binary logs, we don't have to handle
2630     the case where we do a relay-log-rotation in the middle of the
2631     transaction.  If transactions could span several binlogs, we would
2632     have to ensure that we do not delete the relay log file where the
2633     transaction started before switching to a new relay log file.
2634 
2635   - Error can happen if writing to file fails or if flushing the file
2636     fails.
2637 
2638   @param rli The object representing the Relay_log_info.
2639 
2640   @todo Change the log file information to a binary format to avoid
2641   calling longlong2str.
2642 
2643   @return 0 on success, 1 on error.
2644 */
2645 bool Relay_log_info::flush()
2646 {
2647   bool error=0;
2648 
2649   DBUG_ENTER("Relay_log_info::flush()");
2650 
2651   IO_CACHE *file = &info_file;
2652   // 2*file name, 2*long long, 2*unsigned long, 6*'\n'
2653   char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos;
2654   my_b_seek(file, 0L);
2655   pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10);
2656   *pos++='\n';
2657   pos=strmov(pos, group_relay_log_name);
2658   *pos++='\n';
2659   pos=longlong10_to_str(group_relay_log_pos, pos, 10);
2660   *pos++='\n';
2661   pos=strmov(pos, group_master_log_name);
2662   *pos++='\n';
2663   pos=longlong10_to_str(group_master_log_pos, pos, 10);
2664   *pos++='\n';
2665   pos= longlong10_to_str(sql_delay, pos, 10);
2666   *pos++= '\n';
2667   if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)))
2668     error=1;
2669   if (flush_io_cache(file))
2670     error=1;
2671   if (sync_relayloginfo_period &&
2672       !error &&
2673       ++sync_counter >= sync_relayloginfo_period)
2674   {
2675     if (my_sync(info_fd, MYF(MY_WME)))
2676       error=1;
2677     sync_counter= 0;
2678   }
2679   /*
2680     Flushing the relay log is done by the slave I/O thread
2681     or by the user on STOP SLAVE.
2682    */
2683   DBUG_RETURN(error);
2684 }
2685 
2686 #endif
2687