1 /* Copyright (c) 2000, 2018, Oracle and/or its affiliates.
2    Copyright (c) 2008, 2019, MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */
16 
17 #include "mariadb.h"
18 #include "sql_priv.h"
19 #include "unireg.h"
20 #include "sql_base.h"
21 #include "sql_parse.h"                          // check_access
22 #ifdef HAVE_REPLICATION
23 
24 #include "rpl_mi.h"
25 #include "rpl_rli.h"
26 #include "sql_repl.h"
27 #include "sql_acl.h"                            // SUPER_ACL
28 #include "log_event.h"
29 #include "rpl_filter.h"
30 #include <my_dir.h>
31 #include "debug_sync.h"
32 #include "semisync_master.h"
33 #include "semisync_slave.h"
34 
35 
36 enum enum_gtid_until_state {
37   GTID_UNTIL_NOT_DONE,
38   GTID_UNTIL_STOP_AFTER_STANDALONE,
39   GTID_UNTIL_STOP_AFTER_TRANSACTION
40 };
41 
42 
43 int max_binlog_dump_events = 0; // unlimited
44 my_bool opt_sporadic_binlog_dump_fail = 0;
45 #ifndef DBUG_OFF
46 static int binlog_dump_count = 0;
47 #endif
48 
49 extern TYPELIB binlog_checksum_typelib;
50 
51 
52 static int
53 fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
54                   my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
55                   enum enum_binlog_checksum_alg checksum_alg_arg, uint32 end_pos)
56 {
57   char header[LOG_EVENT_HEADER_LEN];
58   ulong event_len;
59 
60   *do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
61     checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
62 
63   /*
64     'when' (the timestamp) is set to 0 so that slave could distinguish between
65     real and fake Rotate events (if necessary)
66   */
67   memset(header, 0, 4);
68   header[EVENT_TYPE_OFFSET] = (uchar)event_type;
69   event_len=  LOG_EVENT_HEADER_LEN + extra_len +
70     (*do_checksum ? BINLOG_CHECKSUM_LEN : 0);
71   int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
72   int4store(header + EVENT_LEN_OFFSET, event_len);
73   int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
74   // TODO: check what problems this may cause and fix them
75   int4store(header + LOG_POS_OFFSET, end_pos);
76   if (packet->append(header, sizeof(header)))
77   {
78     *errmsg= "Failed due to out-of-memory writing event";
79     return -1;
80   }
81   if (*do_checksum)
82   {
83     *crc= my_checksum(0, (uchar*)header, sizeof(header));
84   }
85   return 0;
86 }
87 
88 
89 static int
90 fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg)
91 {
92   if (do_checksum)
93   {
94     char b[BINLOG_CHECKSUM_LEN];
95     int4store(b, crc);
96     if (packet->append(b, sizeof(b)))
97     {
98       *errmsg= "Failed due to out-of-memory writing event checksum";
99       return -1;
100     }
101   }
102   return 0;
103 }
104 
105 
106 static int
107 fake_event_write(NET *net, String *packet, const char **errmsg)
108 {
109   if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
110   {
111     *errmsg = "failed on my_net_write()";
112     return -1;
113   }
114   return 0;
115 }
116 
117 
118 /*
119   Helper structure, used to pass miscellaneous info from mysql_binlog_send()
120   into the helper functions that it calls.
121 */
122 struct binlog_send_info {
123   rpl_binlog_state until_binlog_state;
124   slave_connection_state gtid_state;
125   THD *thd;
126   NET *net;
127   String *packet;
128   char *const log_file_name; // ptr/alias to linfo.log_file_name
129   slave_connection_state *until_gtid_state;
130   slave_connection_state until_gtid_state_obj;
131   Format_description_log_event *fdev;
132   int mariadb_slave_capability;
133   enum_gtid_skip_type gtid_skip_group;
134   enum_gtid_until_state gtid_until_group;
135   ushort flags;
136   enum enum_binlog_checksum_alg current_checksum_alg;
137   bool slave_gtid_strict_mode;
138   bool send_fake_gtid_list;
139   bool slave_gtid_ignore_duplicates;
140   bool using_gtid_state;
141 
142   int error;
143   const char *errmsg;
144   char error_text[MAX_SLAVE_ERRMSG];
145   rpl_gtid error_gtid;
146 
147   ulonglong heartbeat_period;
148 
149   /** start file/pos as requested by slave, for error message */
150   char start_log_file_name[FN_REFLEN];
151   my_off_t start_pos;
152 
153   /** last pos for error message */
154   my_off_t last_pos;
155 
156 #ifndef DBUG_OFF
157   int left_events;
158   uint dbug_reconnect_counter;
159   ulong hb_info_counter;
160 #endif
161 
162   bool clear_initial_log_pos;
163   bool should_stop;
164   size_t dirlen;
165 
166   binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
167                    char *lfn)
168     : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
169       log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
170       gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
171       flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
172       slave_gtid_strict_mode(false), send_fake_gtid_list(false),
173       slave_gtid_ignore_duplicates(false),
174       error(0),
175       errmsg("Unknown error"),
176       heartbeat_period(0),
177 #ifndef DBUG_OFF
178       left_events(max_binlog_dump_events),
179       dbug_reconnect_counter(0),
180       hb_info_counter(0),
181 #endif
182       clear_initial_log_pos(false),
183       should_stop(false)
184   {
185     error_text[0] = 0;
186     bzero(&error_gtid, sizeof(error_gtid));
187     until_binlog_state.init();
188   }
189 };
190 
191 // prototype
192 static int reset_transmit_packet(struct binlog_send_info *info, ushort flags,
193                                  ulong *ev_offset, const char **errmsg);
194 
195 /*
196     fake_rotate_event() builds a fake (=which does not exist physically in any
197     binlog) Rotate event, which contains the name of the binlog we are going to
198     send to the slave (because the slave may not know it if it just asked for
199     MASTER_LOG_FILE='', MASTER_LOG_POS=4).
200     < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
201     After this version we always call it, so that a 3.23.58 slave can rely on
202     it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
203     zeros in the good positions which, by chance, make it possible for the 3.23
204     slave to detect that this event is unexpected) (this is luck which happens
205     because the master and slave disagree on the size of the header of
206     Log_event).
207 
208     Relying on the event length of the Rotate event instead of these
209     well-placed zeros was not possible as Rotate events have a variable-length
210     part.
211 */
212 
213 static int fake_rotate_event(binlog_send_info *info, ulonglong position,
214                              const char** errmsg, enum enum_binlog_checksum_alg checksum_alg_arg)
215 {
216   DBUG_ENTER("fake_rotate_event");
217   ulong ev_offset;
218   char buf[ROTATE_HEADER_LEN+100];
219   my_bool do_checksum;
220   int err;
221   char* p = info->log_file_name+dirname_length(info->log_file_name);
222   uint ident_len = (uint) strlen(p);
223   String *packet= info->packet;
224   ha_checksum crc;
225 
226   /* reset transmit packet for the fake rotate event below */
227   if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
228     DBUG_RETURN(1);
229 
230   if ((err= fake_event_header(packet, ROTATE_EVENT,
231                               ident_len + ROTATE_HEADER_LEN, &do_checksum,
232                               &crc,
233                               errmsg, checksum_alg_arg, 0)))
234   {
235     info->error= ER_UNKNOWN_ERROR;
236     DBUG_RETURN(err);
237   }
238 
239   int8store(buf+R_POS_OFFSET,position);
240   packet->append(buf, ROTATE_HEADER_LEN);
241   packet->append(p, ident_len);
242 
243   if (do_checksum)
244   {
245     crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
246     crc= my_checksum(crc, (uchar*)p, ident_len);
247   }
248 
249   if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
250       (err= fake_event_write(info->net, packet, errmsg)))
251   {
252     info->error= ER_UNKNOWN_ERROR;
253     DBUG_RETURN(err);
254   }
255   DBUG_RETURN(0);
256 }
257 
258 
259 static int fake_gtid_list_event(binlog_send_info *info,
260                                 Gtid_list_log_event *glev, const char** errmsg,
261                                 uint32 current_pos)
262 {
263   my_bool do_checksum;
264   int err;
265   ha_checksum crc;
266   char buf[128];
267   String str(buf, sizeof(buf), system_charset_info);
268   String* packet= info->packet;
269 
270   str.length(0);
271   if (glev->to_packet(&str))
272   {
273     info->error= ER_UNKNOWN_ERROR;
274     *errmsg= "Failed due to out-of-memory writing Gtid_list event";
275     return -1;
276   }
277   if ((err= fake_event_header(packet, GTID_LIST_EVENT,
278                               str.length(), &do_checksum, &crc,
279                               errmsg, info->current_checksum_alg, current_pos)))
280   {
281     info->error= ER_UNKNOWN_ERROR;
282     return err;
283   }
284 
285   packet->append(str);
286   if (do_checksum)
287   {
288     crc= my_checksum(crc, (uchar*)str.ptr(), str.length());
289   }
290 
291   if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
292       (err= fake_event_write(info->net, packet, errmsg)))
293   {
294     info->error= ER_UNKNOWN_ERROR;
295     return err;
296   }
297 
298   return 0;
299 }
300 
301 
302 /*
303   Reset thread transmit packet buffer for event sending
304 
305   This function allocates header bytes for event transmission, and
306   should be called before store the event data to the packet buffer.
307 */
308 static int reset_transmit_packet(binlog_send_info *info, ushort flags,
309                                  ulong *ev_offset, const char **errmsg)
310 {
311   int ret= 0;
312   String *packet= &info->thd->packet;
313 
314   /* reserve and set default header */
315   packet->length(0);
316   packet->set("\0", 1, &my_charset_bin);
317 
318   if (info->thd->semi_sync_slave)
319   {
320     if (repl_semisync_master.reserve_sync_header(packet))
321     {
322       info->error= ER_UNKNOWN_ERROR;
323       *errmsg= "Failed to run hook 'reserve_header'";
324       ret= 1;
325     }
326   }
327 
328   *ev_offset= packet->length();
329   return ret;
330 }
331 
332 int get_user_var_int(const char *name,
333                      long long int *value, int *null_value)
334 {
335   bool null_val;
336   user_var_entry *entry=
337     (user_var_entry*) my_hash_search(&current_thd->user_vars,
338                                   (uchar*) name, strlen(name));
339   if (!entry)
340     return 1;
341   *value= entry->val_int(&null_val);
342   if (null_value)
343     *null_value= null_val;
344   return 0;
345 }
346 
347 inline bool is_semi_sync_slave()
348 {
349   int null_value;
350   long long val= 0;
351   get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
352   return val;
353 }
354 
355 static int send_file(THD *thd)
356 {
357   NET* net = &thd->net;
358   int fd = -1, error = 1;
359   size_t bytes;
360   char fname[FN_REFLEN+1];
361   const char *errmsg = 0;
362   int old_timeout;
363   unsigned long packet_len;
364   uchar buf[IO_SIZE];				// It's safe to alloc this
365   DBUG_ENTER("send_file");
366 
367   /*
368     The client might be slow loading the data, give him wait_timeout to do
369     the job
370   */
371   old_timeout= net->read_timeout;
372   my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
373 
374   /*
375     We need net_flush here because the client will not know it needs to send
376     us the file name until it has processed the load event entry
377   */
378   if (unlikely(net_flush(net)))
379   {
380   read_error:
381     errmsg = "while reading file name";
382     goto err;
383   }
384   packet_len= my_net_read(net);
385   if (unlikely(packet_len == packet_error))
386     goto read_error;
387 
388   // terminate with \0 for fn_format
389   *((char*)net->read_pos +  packet_len) = 0;
390   fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
391   // this is needed to make replicate-ignore-db
392   if (!strcmp(fname,"/dev/null"))
393     goto end;
394 
395   if ((fd= mysql_file_open(key_file_send_file,
396                            fname, O_RDONLY, MYF(0))) < 0)
397   {
398     errmsg = "on open of file";
399     goto err;
400   }
401 
402   while ((long) (bytes= mysql_file_read(fd, buf, IO_SIZE, MYF(0))) > 0)
403   {
404     if (my_net_write(net, buf, bytes))
405     {
406       errmsg = "while writing data to client";
407       goto err;
408     }
409   }
410 
411  end:
412   if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
413       (my_net_read(net) == packet_error))
414   {
415     errmsg = "while negotiating file transfer close";
416     goto err;
417   }
418   error = 0;
419 
420  err:
421   my_net_set_read_timeout(net, old_timeout);
422   if (fd >= 0)
423     mysql_file_close(fd, MYF(0));
424   if (errmsg)
425   {
426     sql_print_error("Failed in send_file() %s", errmsg);
427     DBUG_PRINT("error", ("%s", errmsg));
428   }
429   DBUG_RETURN(error);
430 }
431 
432 
433 /**
434    Internal to mysql_binlog_send() routine that recalculates checksum for
435    1. FD event (asserted) that needs additional arranment prior sending to slave.
436    2. Start_encryption_log_event whose Ignored flag is set
437 TODO DBUG_ASSERT can be removed if this function is used for more general cases
438 */
439 
440 inline void fix_checksum(enum_binlog_checksum_alg checksum_alg, String *packet,
441                          ulong ev_offset)
442 {
443   if (checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
444       checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
445     return;
446   /* recalculate the crc for this event */
447   uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
448   ha_checksum crc;
449   DBUG_ASSERT((data_len ==
450               LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
451               BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN) ||
452               (data_len ==
453               LOG_EVENT_MINIMAL_HEADER_LEN + BINLOG_CRYPTO_SCHEME_LENGTH +
454               BINLOG_KEY_VERSION_LENGTH + BINLOG_NONCE_LENGTH +
455               BINLOG_CHECKSUM_LEN));
456   crc= my_checksum(0, (uchar *)packet->ptr() + ev_offset, data_len -
457                    BINLOG_CHECKSUM_LEN);
458   int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc);
459 }
460 
461 
462 static user_var_entry * get_binlog_checksum_uservar(THD * thd)
463 {
464   LEX_CSTRING name=  { STRING_WITH_LEN("master_binlog_checksum")};
465   user_var_entry *entry=
466     (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
467                                   name.length);
468   return entry;
469 }
470 
471 /**
472   Function for calling in mysql_binlog_send
473   to check if slave initiated checksum-handshake.
474 
475   @param[in]    thd  THD to access a user variable
476 
477   @return        TRUE if handshake took place, FALSE otherwise
478 */
479 
480 static bool is_slave_checksum_aware(THD * thd)
481 {
482   DBUG_ENTER("is_slave_checksum_aware");
483   user_var_entry *entry= get_binlog_checksum_uservar(thd);
484   DBUG_RETURN(entry? true  : false);
485 }
486 
487 /**
488   Function for calling in mysql_binlog_send
489   to get the value of @@binlog_checksum of the master at
490   time of checksum-handshake.
491 
492   The value tells the master whether to compute or not, and the slave
493   to verify or not the first artificial Rotate event's checksum.
494 
495   @param[in]    thd  THD to access a user variable
496 
497   @return       value of @@binlog_checksum alg according to
498                 @c enum enum_binlog_checksum_alg
499 */
500 
501 static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * thd)
502 {
503   enum enum_binlog_checksum_alg ret;
504 
505   DBUG_ENTER("get_binlog_checksum_value_at_connect");
506   user_var_entry *entry= get_binlog_checksum_uservar(thd);
507   if (!entry)
508   {
509     ret= BINLOG_CHECKSUM_ALG_UNDEF;
510   }
511   else
512   {
513     DBUG_ASSERT(entry->type == STRING_RESULT);
514     String str;
515     uint dummy_errors;
516     str.copy(entry->value, entry->length, &my_charset_bin, &my_charset_bin,
517              &dummy_errors);
518     ret= (enum_binlog_checksum_alg)
519       (find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1);
520     DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg
521   }
522   DBUG_RETURN(ret);
523 }
524 
525 /*
526   Adjust the position pointer in the binary log file for all running slaves
527 
528   SYNOPSIS
529     adjust_linfo_offsets()
530     purge_offset	Number of bytes removed from start of log index file
531 
532   NOTES
533     - This is called when doing a PURGE when we delete lines from the
534       index log file
535 
536   REQUIREMENTS
537     - Before calling this function, we have to ensure that no threads are
538       using any binary log file before purge_offset.a
539 
540   TODO
541     - Inform the slave threads that they should sync the position
542       in the binary log file with Relay_log_info::flush().
543       Now they sync is done for next read.
544 */
545 
546 void adjust_linfo_offsets(my_off_t purge_offset)
547 {
548   THD *tmp;
549 
550   mysql_mutex_lock(&LOCK_thread_count);
551   I_List_iterator<THD> it(threads);
552 
553   while ((tmp=it++))
554   {
555     LOG_INFO* linfo;
556     if ((linfo = tmp->current_linfo))
557     {
558       mysql_mutex_lock(&linfo->lock);
559       /*
560 	Index file offset can be less that purge offset only if
561 	we just started reading the index file. In that case
562 	we have nothing to adjust
563       */
564       if (linfo->index_file_offset < purge_offset)
565 	linfo->fatal = (linfo->index_file_offset != 0);
566       else
567 	linfo->index_file_offset -= purge_offset;
568       mysql_mutex_unlock(&linfo->lock);
569     }
570   }
571   mysql_mutex_unlock(&LOCK_thread_count);
572 }
573 
574 
575 bool log_in_use(const char* log_name)
576 {
577   size_t log_name_len = strlen(log_name) + 1;
578   THD *tmp;
579   bool result = 0;
580 
581   mysql_mutex_lock(&LOCK_thread_count);
582   I_List_iterator<THD> it(threads);
583 
584   while ((tmp=it++))
585   {
586     LOG_INFO* linfo;
587     if ((linfo = tmp->current_linfo))
588     {
589       mysql_mutex_lock(&linfo->lock);
590       result = !strncmp(log_name, linfo->log_file_name, log_name_len);
591       mysql_mutex_unlock(&linfo->lock);
592       if (result)
593 	break;
594     }
595   }
596 
597   mysql_mutex_unlock(&LOCK_thread_count);
598   return result;
599 }
600 
601 bool purge_error_message(THD* thd, int res)
602 {
603   uint errcode;
604 
605   if ((errcode= purge_log_get_error_code(res)) != 0)
606   {
607     my_message(errcode, ER_THD(thd, errcode), MYF(0));
608     return TRUE;
609   }
610   my_ok(thd);
611   return FALSE;
612 }
613 
614 
615 /**
616   Execute a PURGE BINARY LOGS TO <log> command.
617 
618   @param thd Pointer to THD object for the client thread executing the
619   statement.
620 
621   @param to_log Name of the last log to purge.
622 
623   @retval FALSE success
624   @retval TRUE failure
625 */
626 bool purge_master_logs(THD* thd, const char* to_log)
627 {
628   char search_file_name[FN_REFLEN];
629   if (!mysql_bin_log.is_open())
630   {
631     my_ok(thd);
632     return FALSE;
633   }
634 
635   mysql_bin_log.make_log_name(search_file_name, to_log);
636   return purge_error_message(thd,
637 			     mysql_bin_log.purge_logs(search_file_name, 0, 1,
638 						      1, NULL));
639 }
640 
641 
642 /**
643   Execute a PURGE BINARY LOGS BEFORE <date> command.
644 
645   @param thd Pointer to THD object for the client thread executing the
646   statement.
647 
648   @param purge_time Date before which logs should be purged.
649 
650   @retval FALSE success
651   @retval TRUE failure
652 */
653 bool purge_master_logs_before_date(THD* thd, time_t purge_time)
654 {
655   if (!mysql_bin_log.is_open())
656   {
657     my_ok(thd);
658     return 0;
659   }
660   return purge_error_message(thd,
661                              mysql_bin_log.purge_logs_before_date(purge_time));
662 }
663 
664 void set_read_error(binlog_send_info *info, int error)
665 {
666   if (error == LOG_READ_EOF)
667   {
668     return;
669   }
670   info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
671   switch (error) {
672   case LOG_READ_BOGUS:
673     info->errmsg= "bogus data in log event";
674     break;
675   case LOG_READ_TOO_LARGE:
676     info->errmsg= "log event entry exceeded max_allowed_packet; "
677         "Increase max_allowed_packet on master";
678     break;
679   case LOG_READ_IO:
680     info->errmsg= "I/O error reading log event";
681     break;
682   case LOG_READ_MEM:
683     info->errmsg= "memory allocation failed reading log event";
684     break;
685   case LOG_READ_TRUNC:
686     info->errmsg= "binlog truncated in the middle of event; "
687         "consider out of disk space on master";
688     break;
689   case LOG_READ_CHECKSUM_FAILURE:
690     info->errmsg= "event read from binlog did not pass crc check";
691     break;
692   case LOG_READ_DECRYPT:
693     info->errmsg= "event decryption failure";
694     break;
695   default:
696     info->errmsg= "unknown error reading log event on the master";
697     break;
698   }
699 }
700 
701 
702 /**
703   An auxiliary function for calling in mysql_binlog_send
704   to initialize the heartbeat timeout in waiting for a binlogged event.
705 
706   @param[in]    thd  THD to access a user variable
707 
708   @return        heartbeat period an ulonglong of nanoseconds
709                  or zero if heartbeat was not demanded by slave
710 */
711 static ulonglong get_heartbeat_period(THD * thd)
712 {
713   bool null_value;
714   LEX_CSTRING name=  { STRING_WITH_LEN("master_heartbeat_period")};
715   user_var_entry *entry=
716     (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
717                                   name.length);
718   return entry? entry->val_int(&null_value) : 0;
719 }
720 
721 /*
722   Lookup the capabilities of the slave, which it announces by setting a value
723   MARIA_SLAVE_CAPABILITY_XXX in @mariadb_slave_capability.
724 
725   Older MariaDB slaves, and other MySQL slaves, do not set
726   @mariadb_slave_capability, corresponding to a capability of
727   MARIA_SLAVE_CAPABILITY_UNKNOWN (0).
728 */
729 static int
730 get_mariadb_slave_capability(THD *thd)
731 {
732   bool null_value;
733   const LEX_CSTRING name= { STRING_WITH_LEN("mariadb_slave_capability") };
734   const user_var_entry *entry=
735     (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
736                                   name.length);
737   return entry ?
738     (int)(entry->val_int(&null_value)) : MARIA_SLAVE_CAPABILITY_UNKNOWN;
739 }
740 
741 
742 /*
743   Get the value of the @slave_connect_state user variable into the supplied
744   String (this is the GTID connect state requested by the connecting slave).
745 
746   Returns false if error (ie. slave did not set the variable and does not
747   want to use GTID to set start position), true if success.
748 */
749 static bool
750 get_slave_connect_state(THD *thd, String *out_str)
751 {
752   bool null_value;
753 
754   const LEX_CSTRING name= { STRING_WITH_LEN("slave_connect_state") };
755   user_var_entry *entry=
756     (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
757                                   name.length);
758   return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
759 }
760 
761 
762 static bool
763 get_slave_gtid_strict_mode(THD *thd)
764 {
765   bool null_value;
766 
767   const LEX_CSTRING name= { STRING_WITH_LEN("slave_gtid_strict_mode") };
768   user_var_entry *entry=
769     (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
770                                   name.length);
771   return entry && entry->val_int(&null_value) && !null_value;
772 }
773 
774 
775 static bool
776 get_slave_gtid_ignore_duplicates(THD *thd)
777 {
778   bool null_value;
779 
780   const LEX_CSTRING name= { STRING_WITH_LEN("slave_gtid_ignore_duplicates") };
781   user_var_entry *entry=
782     (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
783                                      name.length);
784   return entry && entry->val_int(&null_value) && !null_value;
785 }
786 
787 
788 /*
789   Get the value of the @slave_until_gtid user variable into the supplied
790   String (this is the GTID position specified for START SLAVE UNTIL
791   master_gtid_pos='xxx').
792 
793   Returns false if error (ie. slave did not set the variable and is not doing
794   START SLAVE UNTIL mater_gtid_pos='xxx'), true if success.
795 */
796 static bool
797 get_slave_until_gtid(THD *thd, String *out_str)
798 {
799   bool null_value;
800 
801   const LEX_CSTRING name= { STRING_WITH_LEN("slave_until_gtid") };
802   user_var_entry *entry=
803     (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
804                                   name.length);
805   return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
806 }
807 
808 
809 /*
810   Function prepares and sends repliation heartbeat event.
811 
812   @param net                net object of THD
813   @param packet             buffer to store the heartbeat instance
814   @param event_coordinates  binlog file name and position of the last
815                             real event master sent from binlog
816 
817   @note
818     Among three essential pieces of heartbeat data Log_event::when
819     is computed locally.
820     The  error to send is serious and should force terminating
821     the dump thread.
822 */
823 static int send_heartbeat_event(binlog_send_info *info,
824                                 NET* net, String* packet,
825                                 const struct event_coordinates *coord,
826                                 enum enum_binlog_checksum_alg checksum_alg_arg)
827 {
828   DBUG_ENTER("send_heartbeat_event");
829 
830   ulong ev_offset;
831   char sub_header_buf[HB_SUB_HEADER_LEN];
832   bool sub_header_in_use=false;
833   if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
834     DBUG_RETURN(1);
835 
836   char header[LOG_EVENT_HEADER_LEN];
837   my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
838     checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
839   /*
840     'when' (the timestamp) is set to 0 so that slave could distinguish between
841     real and fake Rotate events (if necessary)
842   */
843   memset(header, 0, 4);  // when
844 
845   header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
846 
847   char* p= coord->file_name + dirname_length(coord->file_name);
848 
849   size_t ident_len = strlen(p);
850   size_t event_len = ident_len + LOG_EVENT_HEADER_LEN +
851     (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
852   int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
853   DBUG_EXECUTE_IF("simulate_pos_4G",
854   {
855     const_cast<event_coordinates *>(coord)->pos= (UINT_MAX32 + (ulong)1);
856     DBUG_SET("-d, simulate_pos_4G");
857   };);
858   if (coord->pos <= UINT_MAX32)
859   {
860     int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
861   }
862   else
863   {
864     // Set common_header.log_pos=0 to indicate its overflow
865     int4store(header + LOG_POS_OFFSET, 0);
866     sub_header_in_use= true;
867     int8store(sub_header_buf, coord->pos);
868     event_len+= HB_SUB_HEADER_LEN;
869   }
870 
871   int4store(header + EVENT_LEN_OFFSET, event_len);
872   int2store(header + FLAGS_OFFSET, 0);
873 
874   packet->append(header, sizeof(header));
875   if (sub_header_in_use)
876     packet->append(sub_header_buf, sizeof(sub_header_buf));
877   packet->append(p, ident_len);                    // log_file_name
878 
879   if (do_checksum)
880   {
881     char b[BINLOG_CHECKSUM_LEN];
882     ha_checksum crc= my_checksum(0, (uchar*) header, sizeof(header));
883     if (sub_header_in_use)
884       crc= my_checksum(crc, (uchar*) sub_header_buf, sizeof(sub_header_buf));
885     crc= my_checksum(crc, (uchar*) p, ident_len);
886     int4store(b, crc);
887     packet->append(b, sizeof(b));
888   }
889 
890   if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
891       net_flush(net))
892   {
893     info->error= ER_UNKNOWN_ERROR;
894     DBUG_RETURN(-1);
895   }
896 
897   DBUG_RETURN(0);
898 }
899 
900 
901 struct binlog_file_entry
902 {
903   binlog_file_entry *next;
904   char *name;
905 };
906 
907 static binlog_file_entry *
908 get_binlog_list(MEM_ROOT *memroot)
909 {
910   IO_CACHE *index_file;
911   char fname[FN_REFLEN];
912   size_t length;
913   binlog_file_entry *current_list= NULL, *e;
914   DBUG_ENTER("get_binlog_list");
915 
916   if (!mysql_bin_log.is_open())
917   {
918     my_error(ER_NO_BINARY_LOGGING, MYF(0));
919     DBUG_RETURN(NULL);
920   }
921 
922   mysql_bin_log.lock_index();
923   index_file=mysql_bin_log.get_index_file();
924   reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
925 
926   /* The file ends with EOF or empty line */
927   while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
928   {
929     --length;                                   /* Remove the newline */
930     if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) ||
931         !(e->name= strmake_root(memroot, fname, length)))
932     {
933       mysql_bin_log.unlock_index();
934       DBUG_RETURN(NULL);
935     }
936     e->next= current_list;
937     current_list= e;
938   }
939   mysql_bin_log.unlock_index();
940 
941   DBUG_RETURN(current_list);
942 }
943 
944 
945 /*
946   Check if every GTID requested by the slave is contained in this (or a later)
947   binlog file. Return true if so, false if not.
948 
949   We do the check with a single scan of the list of GTIDs, avoiding the need
950   to build an in-memory hash or stuff like that.
951 
952   We need to check that slave did not request GTID D-S-N1, when the
953   Gtid_list_log_event for this binlog file has D-S-N2 with N2 >= N1.
954   (Because this means that requested GTID is in an earlier binlog).
955   However, if the Gtid_list_log_event indicates that D-S-N1 is the very last
956   GTID for domain D in prior binlog files, then it is ok to start from the
957   very start of this binlog file. This special case is important, as it
958   allows to purge old logs even if some domain is unused for long.
959 
960   In addition, we need to check that we do not have a GTID D-S-N3 in the
961   Gtid_list_log_event where D is not present in the requested slave state at
962   all. Since if D is not in requested slave state, it means that slave needs
963   to start at the very first GTID in domain D.
964 */
965 static bool
966 contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
967 {
968   uint32 i;
969 
970   for (i= 0; i < glev->count; ++i)
971   {
972     uint32 gl_domain_id= glev->list[i].domain_id;
973     const rpl_gtid *gtid= st->find(gl_domain_id);
974     if (!gtid)
975     {
976       /*
977         The slave needs to start from the very beginning of this domain, which
978         is in an earlier binlog file. So we need to search back further.
979       */
980       return false;
981     }
982     if (gtid->server_id == glev->list[i].server_id &&
983         gtid->seq_no <= glev->list[i].seq_no)
984     {
985       /*
986         The slave needs to start after gtid, but it is contained in an earlier
987         binlog file. So we need to search back further, unless it was the very
988         last gtid logged for the domain in earlier binlog files.
989       */
990       if (gtid->seq_no < glev->list[i].seq_no)
991         return false;
992 
993       /*
994         The slave requested D-S-N1, which happens to be the last GTID logged
995         in prior binlog files with same domain id D and server id S.
996 
997         The Gtid_list is kept sorted on domain_id, with the last GTID in each
998         domain_id group being the last one logged. So if this is the last GTID
999         within the domain_id group, then it is ok to start from the very
1000         beginning of this group, per the special case explained in comment at
1001         the start of this function. If not, then we need to search back further.
1002       */
1003       if (i+1 < glev->count && gl_domain_id == glev->list[i+1].domain_id)
1004         return false;
1005     }
1006   }
1007 
1008   return true;
1009 }
1010 
1011 
1012 static void
1013 give_error_start_pos_missing_in_binlog(int *err, const char **errormsg,
1014                                        rpl_gtid *error_gtid)
1015 {
1016   rpl_gtid binlog_gtid;
1017 
1018   if (mysql_bin_log.lookup_domain_in_binlog_state(error_gtid->domain_id,
1019                                                   &binlog_gtid) &&
1020       binlog_gtid.seq_no >= error_gtid->seq_no)
1021   {
1022     *errormsg= "Requested slave GTID state not found in binlog. The slave has "
1023       "probably diverged due to executing erroneous transactions";
1024     *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2;
1025   }
1026   else
1027   {
1028     *errormsg= "Requested slave GTID state not found in binlog";
1029     *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
1030   }
1031 }
1032 
1033 
1034 /*
1035   Check the start GTID state requested by the slave against our binlog state.
1036 
1037   Give an error if the slave requests something that we do not have in our
1038   binlog.
1039 */
1040 
1041 static int
1042 check_slave_start_position(binlog_send_info *info, const char **errormsg,
1043                            rpl_gtid *error_gtid)
1044 {
1045   uint32 i;
1046   int err;
1047   slave_connection_state::entry **delete_list= NULL;
1048   uint32 delete_idx= 0;
1049   slave_connection_state *st= &info->gtid_state;
1050 
1051   if (rpl_load_gtid_slave_state(info->thd))
1052   {
1053     *errormsg= "Failed to load replication slave GTID state";
1054     err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
1055     goto end;
1056   }
1057 
1058   for (i= 0; i < st->hash.records; ++i)
1059   {
1060     slave_connection_state::entry *slave_gtid_entry=
1061       (slave_connection_state::entry *)my_hash_element(&st->hash, i);
1062     rpl_gtid *slave_gtid= &slave_gtid_entry->gtid;
1063     rpl_gtid master_gtid;
1064     rpl_gtid master_replication_gtid;
1065     rpl_gtid start_gtid;
1066     bool start_at_own_slave_pos=
1067       rpl_global_gtid_slave_state->domain_to_gtid(slave_gtid->domain_id,
1068                                                   &master_replication_gtid) &&
1069       slave_gtid->server_id == master_replication_gtid.server_id &&
1070       slave_gtid->seq_no == master_replication_gtid.seq_no;
1071 
1072     if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
1073                                            slave_gtid->server_id,
1074                                            &master_gtid) &&
1075         master_gtid.seq_no >= slave_gtid->seq_no)
1076     {
1077       /*
1078         If connecting slave requests to start at the GTID we last applied when
1079         we were ourselves a slave, then this GTID may not exist in our binlog
1080         (in case of --log-slave-updates=0). So set the flag to disable the
1081         error about missing GTID in the binlog in this case.
1082       */
1083       if (start_at_own_slave_pos)
1084         slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS;
1085       continue;
1086     }
1087 
1088     if (!start_at_own_slave_pos)
1089     {
1090       rpl_gtid domain_gtid;
1091       slave_connection_state *until_gtid_state= info->until_gtid_state;
1092       rpl_gtid *until_gtid;
1093 
1094       if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
1095                                                        &domain_gtid))
1096       {
1097         /*
1098           We do not have anything in this domain, neither in the binlog nor
1099           in the slave state. So we are probably one master in a multi-master
1100           setup, and this domain is served by a different master.
1101 
1102           But set a flag so that if we then ever _do_ happen to encounter
1103           anything in this domain, then we will re-check that the requested
1104           slave position exists, and give the error at that time if not.
1105         */
1106         slave_gtid_entry->flags|= slave_connection_state::START_ON_EMPTY_DOMAIN;
1107         continue;
1108       }
1109 
1110       if (info->slave_gtid_ignore_duplicates &&
1111           domain_gtid.seq_no < slave_gtid->seq_no)
1112       {
1113         /*
1114           When --gtid-ignore-duplicates, it is ok for the slave to request
1115           something that we do not have (yet) - they might already have gotten
1116           it through another path in a multi-path replication hierarchy.
1117         */
1118         continue;
1119       }
1120 
1121       if (until_gtid_state &&
1122           ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
1123             (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
1124                                                 until_gtid->server_id,
1125                                                 &master_gtid) &&
1126              master_gtid.seq_no >= until_gtid->seq_no)))
1127       {
1128         /*
1129           The slave requested to start from a position that is not (yet) in
1130           our binlog, but it also specified an UNTIL condition that _is_ in
1131           our binlog (or a missing UNTIL, which means stop at the very
1132           beginning). So the stop position is before the start position, and
1133           we just delete the entry from the UNTIL hash to mark that this
1134           domain has already reached the UNTIL condition.
1135         */
1136         if(until_gtid)
1137           until_gtid_state->remove(until_gtid);
1138         continue;
1139       }
1140 
1141       *error_gtid= *slave_gtid;
1142       give_error_start_pos_missing_in_binlog(&err, errormsg, error_gtid);
1143       goto end;
1144     }
1145 
1146     /*
1147       Ok, so connecting slave asked to start at a GTID that we do not have in
1148       our binlog, but it was in fact the last GTID we applied earlier, when we
1149       were acting as a replication slave.
1150 
1151       So this means that we were running as a replication slave without
1152       --log-slave-updates, but now we switched to be a master. It is worth it
1153       to handle this special case, as it allows users to run a simple
1154       master -> slave without --log-slave-updates, and then exchange slave and
1155       master, as long as they make sure the slave is caught up before switching.
1156     */
1157 
1158     /*
1159       First check if we logged something ourselves as a master after being a
1160       slave. This will be seen as a GTID with our own server_id and bigger
1161       seq_no than what is in the slave state.
1162 
1163       If we did not log anything ourselves, then start the connecting slave
1164       replicating from the current binlog end position, which in this case
1165       corresponds to our replication slave state and hence what the connecting
1166       slave is requesting.
1167     */
1168     if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
1169                                            global_system_variables.server_id,
1170                                            &start_gtid) &&
1171         start_gtid.seq_no > slave_gtid->seq_no)
1172     {
1173       /*
1174         Start replication within this domain at the first GTID that we logged
1175         ourselves after becoming a master.
1176 
1177         Remember that this starting point is in fact a "fake" GTID which may
1178         not exists in the binlog, so that we do not complain about it in
1179         --gtid-strict-mode.
1180       */
1181       slave_gtid->server_id= global_system_variables.server_id;
1182       slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS;
1183     }
1184     else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
1185                                                          &start_gtid))
1186     {
1187       slave_gtid->server_id= start_gtid.server_id;
1188       slave_gtid->seq_no= start_gtid.seq_no;
1189     }
1190     else
1191     {
1192       /*
1193         We do not have _anything_ in our own binlog for this domain.  Just
1194         delete the entry in the slave connection state, then it will pick up
1195         anything new that arrives.
1196 
1197         We just queue up the deletion and do it later, after the loop, so that
1198         we do not mess up the iteration over the hash.
1199       */
1200       if (!delete_list)
1201       {
1202         if (!(delete_list= (slave_connection_state::entry **)
1203               my_malloc(sizeof(*delete_list) * st->hash.records, MYF(MY_WME))))
1204         {
1205           *errormsg= "Out of memory while checking slave start position";
1206           err= ER_OUT_OF_RESOURCES;
1207           goto end;
1208         }
1209       }
1210       delete_list[delete_idx++]= slave_gtid_entry;
1211     }
1212   }
1213 
1214   /* Do any delayed deletes from the hash. */
1215   if (delete_list)
1216   {
1217     for (i= 0; i < delete_idx; ++i)
1218       st->remove(&(delete_list[i]->gtid));
1219   }
1220   err= 0;
1221 
1222 end:
1223   if (delete_list)
1224     my_free(delete_list);
1225   return err;
1226 }
1227 
1228 /*
1229   Find the name of the binlog file to start reading for a slave that connects
1230   using GTID state.
1231 
1232   Returns the file name in out_name, which must be of size at least FN_REFLEN.
1233 
1234   Returns NULL on ok, error message on error.
1235 
1236   In case of non-error return, the returned binlog file is guaranteed to
1237   contain the first event to be transmitted to the slave for every domain
1238   present in our binlogs. It is still necessary to skip all GTIDs up to
1239   and including the GTID requested by slave within each domain.
1240 
1241   However, as a special case, if the event to be sent to the slave is the very
1242   first event (within that domain) in the returned binlog, then nothing should
1243   be skipped, so that domain is deleted from the passed in slave connection
1244   state.
1245 
1246   This is necessary in case the slave requests a GTID within a replication
1247   domain that has long been inactive. The binlog file containing that GTID may
1248   have been long since purged. However, as long as no GTIDs after that have
1249   been purged, we have the GTID requested by slave in the Gtid_list_log_event
1250   of the latest binlog. So we can start from there, as long as we delete the
1251   corresponding entry in the slave state so we do not wrongly skip any events
1252   that might turn up if that domain becomes active again, vainly looking for
1253   the requested GTID that was already purged.
1254 */
1255 static const char *
1256 gtid_find_binlog_file(slave_connection_state *state, char *out_name,
1257                       slave_connection_state *until_gtid_state)
1258 {
1259   MEM_ROOT memroot;
1260   binlog_file_entry *list;
1261   Gtid_list_log_event *glev= NULL;
1262   const char *errormsg= NULL;
1263   char buf[FN_REFLEN];
1264 
1265   init_alloc_root(&memroot, "gtid_find_binlog_file",
1266                   10*(FN_REFLEN+sizeof(binlog_file_entry)),
1267                   0, MYF(MY_THREAD_SPECIFIC));
1268   if (!(list= get_binlog_list(&memroot)))
1269   {
1270     errormsg= "Out of memory while looking for GTID position in binlog";
1271     goto end;
1272   }
1273 
1274   while (list)
1275   {
1276     File file;
1277     IO_CACHE cache;
1278 
1279     if (!list->next)
1280     {
1281       /*
1282         It should be safe to read the currently used binlog, as we will only
1283         read the header part that is already written.
1284 
1285         But if that does not work on windows, then we will need to cache the
1286         event somewhere in memory I suppose - that could work too.
1287       */
1288     }
1289     /*
1290       Read the Gtid_list_log_event at the start of the binlog file to
1291       get the binlog state.
1292     */
1293     if (normalize_binlog_name(buf, list->name, false))
1294     {
1295       errormsg= "Failed to determine binlog file name while looking for "
1296         "GTID position in binlog";
1297       goto end;
1298     }
1299     bzero((char*) &cache, sizeof(cache));
1300     if (unlikely((file= open_binlog(&cache, buf, &errormsg)) == (File)-1))
1301       goto end;
1302     errormsg= get_gtid_list_event(&cache, &glev);
1303     end_io_cache(&cache);
1304     mysql_file_close(file, MYF(MY_WME));
1305     if (unlikely(errormsg))
1306       goto end;
1307 
1308     if (!glev || contains_all_slave_gtid(state, glev))
1309     {
1310       strmake(out_name, buf, FN_REFLEN);
1311 
1312       if (glev)
1313       {
1314         uint32 i;
1315 
1316         /*
1317           As a special case, we allow to start from binlog file N if the
1318           requested GTID is the last event (in the corresponding domain) in
1319           binlog file (N-1), but then we need to remove that GTID from the slave
1320           state, rather than skipping events waiting for it to turn up.
1321 
1322           If slave is doing START SLAVE UNTIL, check for any UNTIL conditions
1323           that are already included in a previous binlog file. Delete any such
1324           from the UNTIL hash, to mark that such domains have already reached
1325           their UNTIL condition.
1326         */
1327         for (i= 0; i < glev->count; ++i)
1328         {
1329           const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
1330           if (!gtid)
1331           {
1332             /*
1333               Contains_all_slave_gtid() returns false if there is any domain in
1334               Gtid_list_event which is not in the requested slave position.
1335 
1336               We may delete a domain from the slave state inside this loop, but
1337               we only do this when it is the very last GTID logged for that
1338               domain in earlier binlogs, and then we can not encounter it in any
1339               further GTIDs in the Gtid_list.
1340             */
1341             DBUG_ASSERT(0);
1342           } else if (gtid->server_id == glev->list[i].server_id &&
1343                      gtid->seq_no == glev->list[i].seq_no)
1344           {
1345             /*
1346               The slave requested to start from the very beginning of this
1347               domain in this binlog file. So delete the entry from the state,
1348               we do not need to skip anything.
1349             */
1350             state->remove(gtid);
1351           }
1352 
1353           if (until_gtid_state &&
1354               (gtid= until_gtid_state->find(glev->list[i].domain_id)) &&
1355               gtid->server_id == glev->list[i].server_id &&
1356               gtid->seq_no <= glev->list[i].seq_no)
1357           {
1358             /*
1359               We've already reached the stop position in UNTIL for this domain,
1360               since it is before the start position.
1361             */
1362             until_gtid_state->remove(gtid);
1363           }
1364         }
1365       }
1366 
1367       goto end;
1368     }
1369     delete glev;
1370     glev= NULL;
1371     list= list->next;
1372   }
1373 
1374   /* We reached the end without finding anything. */
1375   errormsg= "Could not find GTID state requested by slave in any binlog "
1376     "files. Probably the slave state is too old and required binlog files "
1377     "have been purged.";
1378 
1379 end:
1380   if (glev)
1381     delete glev;
1382 
1383   free_root(&memroot, MYF(0));
1384   return errormsg;
1385 }
1386 
1387 
1388 /*
1389   Given an old-style binlog position with file name and file offset, find the
1390   corresponding gtid position. If the offset is not at an event boundary, give
1391   an error.
1392 
1393   Return NULL on ok, error message string on error.
1394 
1395   ToDo: Improve the performance of this by using binlog index files.
1396 */
1397 static const char *
1398 gtid_state_from_pos(const char *name, uint32 offset,
1399                     slave_connection_state *gtid_state)
1400 {
1401   IO_CACHE cache;
1402   File file;
1403   const char *errormsg= NULL;
1404   bool found_gtid_list_event= false;
1405   bool found_format_description_event= false;
1406   bool valid_pos= false;
1407   enum enum_binlog_checksum_alg current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
1408   int err;
1409   String packet;
1410   Format_description_log_event *fdev= NULL;
1411 
1412   if (unlikely(gtid_state->load((const rpl_gtid *)NULL, 0)))
1413   {
1414     errormsg= "Internal error (out of memory?) initializing slave state "
1415       "while scanning binlog to find start position";
1416     return errormsg;
1417   }
1418 
1419   if (unlikely((file= open_binlog(&cache, name, &errormsg)) == (File)-1))
1420     return errormsg;
1421 
1422   if (!(fdev= new Format_description_log_event(3)))
1423   {
1424     errormsg= "Out of memory initializing format_description event "
1425       "while scanning binlog to find start position";
1426     goto end;
1427   }
1428 
1429   /*
1430     First we need to find the initial GTID_LIST_EVENT. We need this even
1431     if the offset is at the very start of the binlog file.
1432 
1433     But if we do not find any GTID_LIST_EVENT, then this is an old binlog
1434     with no GTID information, so we return empty GTID state.
1435   */
1436   for (;;)
1437   {
1438     Log_event_type typ;
1439     uint32 cur_pos;
1440 
1441     cur_pos= (uint32)my_b_tell(&cache);
1442     if (cur_pos == offset)
1443       valid_pos= true;
1444     if (found_format_description_event && found_gtid_list_event &&
1445         cur_pos >= offset)
1446       break;
1447 
1448     packet.length(0);
1449     err= Log_event::read_log_event(&cache, &packet, fdev,
1450                          opt_master_verify_checksum ? current_checksum_alg
1451                                                     : BINLOG_CHECKSUM_ALG_OFF);
1452     if (unlikely(err))
1453     {
1454       errormsg= "Could not read binlog while searching for slave start "
1455         "position on master";
1456       goto end;
1457     }
1458     /*
1459       The cast to uchar is needed to avoid a signed char being converted to a
1460       negative number.
1461     */
1462     typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
1463     if (typ == FORMAT_DESCRIPTION_EVENT)
1464     {
1465       Format_description_log_event *tmp;
1466 
1467       if (unlikely(found_format_description_event))
1468       {
1469         errormsg= "Duplicate format description log event found while "
1470           "searching for old-style position in binlog";
1471         goto end;
1472       }
1473 
1474       current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
1475       found_format_description_event= true;
1476       if (unlikely(!(tmp= new Format_description_log_event(packet.ptr(),
1477                                                            packet.length(),
1478                                                            fdev))))
1479       {
1480         errormsg= "Corrupt Format_description event found or out-of-memory "
1481           "while searching for old-style position in binlog";
1482         goto end;
1483       }
1484       delete fdev;
1485       fdev= tmp;
1486     }
1487     else if (typ == START_ENCRYPTION_EVENT)
1488     {
1489       uint sele_len = packet.length();
1490       if (current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
1491       {
1492         sele_len -= BINLOG_CHECKSUM_LEN;
1493       }
1494       Start_encryption_log_event sele(packet.ptr(), sele_len, fdev);
1495       if (fdev->start_decryption(&sele))
1496       {
1497         errormsg= "Could not start decryption of binlog.";
1498         goto end;
1499       }
1500     }
1501     else if (unlikely(typ != FORMAT_DESCRIPTION_EVENT &&
1502                       !found_format_description_event))
1503     {
1504       errormsg= "Did not find format description log event while searching "
1505         "for old-style position in binlog";
1506       goto end;
1507     }
1508     else if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
1509              typ == BINLOG_CHECKPOINT_EVENT)
1510       continue;                                 /* Continue looking */
1511     else if (typ == GTID_LIST_EVENT)
1512     {
1513       rpl_gtid *gtid_list;
1514       bool status;
1515       uint32 list_len;
1516 
1517       if (unlikely(found_gtid_list_event))
1518       {
1519         errormsg= "Found duplicate Gtid_list_log_event while scanning binlog "
1520           "to find slave start position";
1521         goto end;
1522       }
1523       status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
1524                                         current_checksum_alg,
1525                                         &gtid_list, &list_len, fdev);
1526       if (unlikely(status))
1527       {
1528         errormsg= "Error reading Gtid_list_log_event while searching "
1529           "for old-style position in binlog";
1530         goto end;
1531       }
1532       err= gtid_state->load(gtid_list, list_len);
1533       my_free(gtid_list);
1534       if (unlikely(err))
1535       {
1536         errormsg= "Internal error (out of memory?) initialising slave state "
1537           "while scanning binlog to find start position";
1538         goto end;
1539       }
1540       found_gtid_list_event= true;
1541     }
1542     else if (unlikely(!found_gtid_list_event))
1543     {
1544       /* We did not find any Gtid_list_log_event, must be old binlog. */
1545       goto end;
1546     }
1547     else if (typ == GTID_EVENT)
1548     {
1549       rpl_gtid gtid;
1550       uchar flags2;
1551       if (unlikely(Gtid_log_event::peek(packet.ptr(), packet.length(),
1552                                         current_checksum_alg, &gtid.domain_id,
1553                                         &gtid.server_id, &gtid.seq_no, &flags2,
1554                                         fdev)))
1555       {
1556         errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
1557           "initial slave position";
1558         goto end;
1559       }
1560       if (unlikely(gtid_state->update(&gtid)))
1561       {
1562         errormsg= "Internal error (out of memory?) updating slave state while "
1563           "scanning binlog to find start position";
1564         goto end;
1565       }
1566     }
1567   }
1568 
1569   if (unlikely(!valid_pos))
1570   {
1571     errormsg= "Slave requested incorrect position in master binlog. "
1572       "Requested position %u in file '%s', but this position does not "
1573       "correspond to the location of any binlog event.";
1574   }
1575 
1576 end:
1577   delete fdev;
1578   end_io_cache(&cache);
1579   mysql_file_close(file, MYF(MY_WME));
1580 
1581   return errormsg;
1582 }
1583 
1584 
1585 int
1586 gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
1587 {
1588   slave_connection_state gtid_state;
1589   const char *lookup_name;
1590   char name_buf[FN_REFLEN];
1591   LOG_INFO linfo;
1592 
1593   if (!mysql_bin_log.is_open())
1594   {
1595     my_error(ER_NO_BINARY_LOGGING, MYF(0));
1596     return 1;
1597   }
1598 
1599   if (in_name && in_name[0])
1600   {
1601     mysql_bin_log.make_log_name(name_buf, in_name);
1602     lookup_name= name_buf;
1603   }
1604   else
1605     lookup_name= NULL;
1606   linfo.index_file_offset= 0;
1607   if (mysql_bin_log.find_log_pos(&linfo, lookup_name, 1))
1608     return 1;
1609 
1610   if (pos < 4)
1611     pos= 4;
1612 
1613   if (gtid_state_from_pos(linfo.log_file_name, pos, &gtid_state) ||
1614       gtid_state.to_string(out_str))
1615     return 1;
1616   return 0;
1617 }
1618 
1619 
1620 static bool
1621 is_until_reached(binlog_send_info *info, ulong *ev_offset,
1622                  Log_event_type event_type, const char **errmsg,
1623                  uint32 current_pos)
1624 {
1625   switch (info->gtid_until_group)
1626   {
1627   case GTID_UNTIL_NOT_DONE:
1628     return false;
1629   case GTID_UNTIL_STOP_AFTER_STANDALONE:
1630     if (Log_event::is_part_of_group(event_type))
1631       return false;
1632     break;
1633   case GTID_UNTIL_STOP_AFTER_TRANSACTION:
1634     if (event_type != XID_EVENT &&
1635         (event_type != QUERY_EVENT ||    /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
1636          !Query_log_event::peek_is_commit_rollback
1637                (info->packet->ptr()+*ev_offset,
1638                 info->packet->length()-*ev_offset,
1639                 info->current_checksum_alg)))
1640       return false;
1641     break;
1642   }
1643 
1644   /*
1645     The last event group has been sent, now the START SLAVE UNTIL condition
1646     has been reached.
1647 
1648     Send a last fake Gtid_list_log_event with a flag set to mark that we
1649     stop due to UNTIL condition.
1650   */
1651   if (reset_transmit_packet(info, info->flags, ev_offset, errmsg))
1652     return true;
1653   Gtid_list_log_event glev(&info->until_binlog_state,
1654                            Gtid_list_log_event::FLAG_UNTIL_REACHED);
1655   if (fake_gtid_list_event(info, &glev, errmsg, current_pos))
1656     return true;
1657   *errmsg= NULL;
1658   return true;
1659 }
1660 
1661 
1662 /*
1663   Helper function for mysql_binlog_send() to write an event down the slave
1664   connection.
1665 
1666   Returns NULL on success, error message string on error.
1667 */
1668 static const char *
1669 send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
1670                     IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid)
1671 {
1672   my_off_t pos;
1673   String* const packet= info->packet;
1674   size_t len= packet->length();
1675   int mariadb_slave_capability= info->mariadb_slave_capability;
1676   enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
1677   slave_connection_state *gtid_state= &info->gtid_state;
1678   slave_connection_state *until_gtid_state= info->until_gtid_state;
1679   bool need_sync= false;
1680 
1681   if (event_type == GTID_LIST_EVENT &&
1682       info->using_gtid_state && until_gtid_state)
1683   {
1684     rpl_gtid *gtid_list;
1685     uint32 list_len;
1686     bool err;
1687 
1688     if (ev_offset > len ||
1689         Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
1690                                   current_checksum_alg,
1691                                   &gtid_list, &list_len, info->fdev))
1692     {
1693       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1694       return "Failed to read Gtid_list_log_event: corrupt binlog";
1695     }
1696     err= info->until_binlog_state.load(gtid_list, list_len);
1697     my_free(gtid_list);
1698     if (err)
1699     {
1700       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1701       return "Failed in internal GTID book-keeping: Out of memory";
1702     }
1703   }
1704 
1705   /* Skip GTID event groups until we reach slave position within a domain_id. */
1706   if (event_type == GTID_EVENT && info->using_gtid_state)
1707   {
1708     uchar flags2;
1709     slave_connection_state::entry *gtid_entry;
1710     rpl_gtid *gtid;
1711 
1712     if (gtid_state->count() > 0 || until_gtid_state)
1713     {
1714       rpl_gtid event_gtid;
1715 
1716       if (ev_offset > len ||
1717           Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
1718                                current_checksum_alg,
1719                                &event_gtid.domain_id, &event_gtid.server_id,
1720                                &event_gtid.seq_no, &flags2, info->fdev))
1721       {
1722         info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1723         return "Failed to read Gtid_log_event: corrupt binlog";
1724       }
1725 
1726       DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
1727         {
1728           rpl_gtid *dbug_gtid;
1729           if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) &&
1730               dbug_gtid->seq_no == 100)
1731           {
1732             DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
1733             DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100");
1734             info->error= ER_UNKNOWN_ERROR;
1735             return "DBUG-injected forced reconnect";
1736           }
1737         });
1738 
1739       if (info->until_binlog_state.update_nolock(&event_gtid, false))
1740       {
1741         info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1742         return "Failed in internal GTID book-keeping: Out of memory";
1743       }
1744 
1745       if (gtid_state->count() > 0)
1746       {
1747         gtid_entry= gtid_state->find_entry(event_gtid.domain_id);
1748         if (gtid_entry != NULL)
1749         {
1750           gtid= &gtid_entry->gtid;
1751           if (gtid_entry->flags & slave_connection_state::START_ON_EMPTY_DOMAIN)
1752           {
1753             rpl_gtid master_gtid;
1754             if (!mysql_bin_log.find_in_binlog_state(gtid->domain_id,
1755                                                     gtid->server_id,
1756                                                     &master_gtid) ||
1757                 master_gtid.seq_no < gtid->seq_no)
1758             {
1759               int err;
1760               const char *errormsg;
1761               *error_gtid= *gtid;
1762               give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid);
1763               info->error= err;
1764               return errormsg;
1765             }
1766             gtid_entry->flags&= ~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN;
1767           }
1768 
1769           /* Skip this event group if we have not yet reached slave start pos. */
1770           if (event_gtid.server_id != gtid->server_id ||
1771               event_gtid.seq_no <= gtid->seq_no)
1772             info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
1773                                 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
1774           if (event_gtid.server_id == gtid->server_id &&
1775               event_gtid.seq_no >= gtid->seq_no)
1776           {
1777             if (info->slave_gtid_strict_mode &&
1778                 event_gtid.seq_no > gtid->seq_no &&
1779                 !(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS))
1780             {
1781               /*
1782                 In strict mode, it is an error if the slave requests to start
1783                 in a "hole" in the master's binlog: a GTID that does not
1784                 exist, even though both the prior and subsequent seq_no exists
1785                 for same domain_id and server_id.
1786               */
1787               info->error= ER_GTID_START_FROM_BINLOG_HOLE;
1788               *error_gtid= *gtid;
1789               return "The binlog on the master is missing the GTID requested "
1790                 "by the slave (even though both a prior and a subsequent "
1791                 "sequence number does exist), and GTID strict mode is enabled.";
1792             }
1793 
1794             /*
1795               Send a fake Gtid_list event to the slave.
1796               This allows the slave to update its current binlog position
1797               so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work.
1798               The fake event will be sent at the end of this event group.
1799             */
1800             info->send_fake_gtid_list= true;
1801 
1802             /*
1803               Delete this entry if we have reached slave start position (so we
1804               will not skip subsequent events and won't have to look them up
1805               and check).
1806             */
1807             gtid_state->remove(gtid);
1808           }
1809         }
1810       }
1811 
1812       if (until_gtid_state)
1813       {
1814         gtid= until_gtid_state->find(event_gtid.domain_id);
1815         if (gtid == NULL)
1816         {
1817           /*
1818             This domain already reached the START SLAVE UNTIL stop condition,
1819             so skip this event group.
1820           */
1821           info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
1822                               GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
1823         }
1824         else if (event_gtid.server_id == gtid->server_id &&
1825                  event_gtid.seq_no >= gtid->seq_no)
1826         {
1827           /*
1828             We have reached the stop condition.
1829             Delete this domain_id from the hash, so we will skip all further
1830             events in this domain and eventually stop when all domains are
1831             done.
1832           */
1833           uint64 until_seq_no= gtid->seq_no;
1834           until_gtid_state->remove(gtid);
1835           if (until_gtid_state->count() == 0)
1836             info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
1837                                      GTID_UNTIL_STOP_AFTER_STANDALONE :
1838                                      GTID_UNTIL_STOP_AFTER_TRANSACTION);
1839           if (event_gtid.seq_no > until_seq_no)
1840           {
1841             /*
1842               The GTID in START SLAVE UNTIL condition is missing in our binlog.
1843               This should normally not happen (user error), but since we can be
1844               sure that we are now beyond the position that the UNTIL condition
1845               should be in, we can just stop now. And we also need to skip this
1846               event group (as it is beyond the UNTIL condition).
1847             */
1848             info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
1849                                 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
1850           }
1851         }
1852       }
1853     }
1854   }
1855 
1856   /*
1857     Skip event group if we have not yet reached the correct slave GTID position.
1858 
1859     Note that slave that understands GTID can also tolerate holes, so there is
1860     no need to supply dummy event.
1861   */
1862   switch (info->gtid_skip_group)
1863   {
1864   case GTID_SKIP_STANDALONE:
1865     if (!Log_event::is_part_of_group(event_type))
1866       info->gtid_skip_group= GTID_SKIP_NOT;
1867     return NULL;
1868   case GTID_SKIP_TRANSACTION:
1869     if (event_type == XID_EVENT ||
1870         (event_type == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
1871          Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
1872                                                   len - ev_offset,
1873                                                   current_checksum_alg)))
1874       info->gtid_skip_group= GTID_SKIP_NOT;
1875     return NULL;
1876   case GTID_SKIP_NOT:
1877     break;
1878   }
1879 
1880   /* Do not send annotate_rows events unless slave requested it. */
1881   if (event_type == ANNOTATE_ROWS_EVENT &&
1882       !(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
1883   {
1884     if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
1885     {
1886       /* This slave can tolerate events omitted from the binlog stream. */
1887       return NULL;
1888     }
1889     else if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_ANNOTATE)
1890     {
1891       /*
1892         The slave did not request ANNOTATE_ROWS_EVENT (it does not need them as
1893         it will not log them in its own binary log). However, it understands the
1894         event and will just ignore it, and it would break if we omitted it,
1895         leaving a hole in the binlog stream. So just send the event as-is.
1896       */
1897     }
1898     else
1899     {
1900       /*
1901         The slave does not understand ANNOTATE_ROWS_EVENT.
1902 
1903         Older MariaDB slaves (and MySQL slaves) will break replication if there
1904         are holes in the binlog stream (they will miscompute the binlog offset
1905         and request the wrong position when reconnecting).
1906 
1907         So replace the event with a dummy event of the same size that will be
1908         a no-operation on the slave.
1909       */
1910       if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
1911       {
1912         info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1913         return "Failed to replace row annotate event with dummy: too small event.";
1914       }
1915     }
1916   }
1917 
1918   /*
1919     Replace GTID events with old-style BEGIN events for slaves that do not
1920     understand global transaction IDs. For stand-alone events, where there is
1921     no terminating COMMIT query event, omit the GTID event or replace it with
1922     a dummy event, as appropriate.
1923   */
1924   if (event_type == GTID_EVENT &&
1925       mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)
1926   {
1927     bool need_dummy=
1928       mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES;
1929     bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy,
1930                                                     ev_offset,
1931                                                     current_checksum_alg);
1932     if (err)
1933     {
1934       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1935       return "Failed to replace GTID event with backwards-compatible event: "
1936              "currupt event.";
1937     }
1938     if (!need_dummy)
1939       return NULL;
1940   }
1941 
1942   /*
1943     Do not send binlog checkpoint or gtid list events to a slave that does not
1944     understand it.
1945   */
1946   if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
1947        mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) ||
1948       (unlikely(event_type == GTID_LIST_EVENT) &&
1949        mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID))
1950   {
1951     if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
1952     {
1953       /* This slave can tolerate events omitted from the binlog stream. */
1954       return NULL;
1955     }
1956     else
1957     {
1958       /*
1959         The slave does not understand BINLOG_CHECKPOINT_EVENT. Send a dummy
1960         event instead, with same length so slave does not get confused about
1961         binlog positions.
1962       */
1963       if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
1964       {
1965         info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1966         return "Failed to replace binlog checkpoint or gtid list event with "
1967                "dummy: too small event.";
1968       }
1969     }
1970   }
1971 
1972   /*
1973     Skip events with the @@skip_replication flag set, if slave requested
1974     skipping of such events.
1975   */
1976   if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION)
1977   {
1978     uint16 event_flags= uint2korr(&((*packet)[FLAGS_OFFSET + ev_offset]));
1979 
1980     if (event_flags & LOG_EVENT_SKIP_REPLICATION_F)
1981       return NULL;
1982   }
1983 
1984   THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
1985 
1986   pos= my_b_tell(log);
1987   if (repl_semisync_master.update_sync_header(info->thd,
1988                                               (uchar*) packet->c_ptr_safe(),
1989                                               info->log_file_name + info->dirlen,
1990                                               pos, &need_sync))
1991   {
1992     info->error= ER_UNKNOWN_ERROR;
1993     return "run 'before_send_event' hook failed";
1994   }
1995 
1996   if (my_net_write(info->net, (uchar*) packet->ptr(), len))
1997   {
1998     info->error= ER_UNKNOWN_ERROR;
1999     return "Failed on my_net_write()";
2000   }
2001 
2002   DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
2003   if (event_type == LOAD_EVENT)
2004   {
2005     if (send_file(info->thd))
2006     {
2007       info->error= ER_UNKNOWN_ERROR;
2008       return "failed in send_file()";
2009     }
2010   }
2011 
2012   if (need_sync && repl_semisync_master.flush_net(info->thd,
2013                                                   packet->c_ptr_safe()))
2014   {
2015     info->error= ER_UNKNOWN_ERROR;
2016     return "Failed to run hook 'after_send_event'";
2017   }
2018 
2019   return NULL;    /* Success */
2020 }
2021 
2022 static int check_start_offset(binlog_send_info *info,
2023                               const char *log_file_name,
2024                               my_off_t pos)
2025 {
2026   IO_CACHE log;
2027   File file= -1;
2028 
2029   /** check that requested position is inside of file */
2030   if ((file=open_binlog(&log, log_file_name, &info->errmsg)) < 0)
2031   {
2032     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2033     return 1;
2034   }
2035 
2036   if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
2037   {
2038     const char* msg= "Client requested master to start replication from "
2039         "impossible position";
2040 
2041     info->errmsg= NULL; // don't do further modifications of error_text
2042     snprintf(info->error_text, sizeof(info->error_text),
2043              "%s; the first event '%s' at %lld, "
2044              "the last event read from '%s' at %d, "
2045              "the last byte read from '%s' at %d.",
2046              msg,
2047              my_basename(info->start_log_file_name), pos,
2048              my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE,
2049              my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE);
2050     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2051     goto err;
2052   }
2053 
2054 err:
2055   end_io_cache(&log);
2056   mysql_file_close(file, MYF(MY_WME));
2057   return info->error;
2058 }
2059 
2060 static int init_binlog_sender(binlog_send_info *info,
2061                               LOG_INFO *linfo,
2062                               const char *log_ident,
2063                               my_off_t *pos)
2064 {
2065   THD *thd= info->thd;
2066   int error;
2067   char str_buf[128];
2068   String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
2069   char str_buf2[128];
2070   String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
2071   connect_gtid_state.length(0);
2072 
2073   /** save start file/pos that was requested by slave */
2074   strmake(info->start_log_file_name, log_ident,
2075           sizeof(info->start_log_file_name));
2076   info->start_pos= *pos;
2077 
2078   /** init last pos */
2079   info->last_pos= *pos;
2080 
2081   info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
2082   info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
2083   info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
2084   DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
2085                   info->using_gtid_state= false;);
2086 
2087   if (info->using_gtid_state)
2088   {
2089     info->slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
2090     info->slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
2091     if (get_slave_until_gtid(thd, &slave_until_gtid_str))
2092       info->until_gtid_state= &info->until_gtid_state_obj;
2093   }
2094 
2095   DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
2096     {
2097       DBUG_SET("-d,binlog_force_reconnect_after_22_events");
2098       DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events");
2099       info->dbug_reconnect_counter= 22;
2100     });
2101 
2102   if (global_system_variables.log_warnings > 1)
2103   {
2104     sql_print_information(
2105         "Start binlog_dump to slave_server(%lu), pos(%s, %lu), "
2106         "using_gtid(%d), gtid('%s')", thd->variables.server_id,
2107         log_ident, (ulong)*pos, info->using_gtid_state,
2108         connect_gtid_state.c_ptr_quick());
2109   }
2110 
2111 #ifndef DBUG_OFF
2112   if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
2113   {
2114     info->errmsg= "Master failed COM_BINLOG_DUMP to test if slave can recover";
2115     info->error= ER_UNKNOWN_ERROR;
2116     return 1;
2117   }
2118 #endif
2119 
2120   if (!mysql_bin_log.is_open())
2121   {
2122     info->errmsg= "Binary log is not open";
2123     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2124     return 1;
2125   }
2126 
2127   char search_file_name[FN_REFLEN];
2128   const char *name=search_file_name;
2129   if (info->using_gtid_state)
2130   {
2131     if (info->gtid_state.load(connect_gtid_state.c_ptr_quick(),
2132                              connect_gtid_state.length()))
2133     {
2134       info->errmsg= "Out of memory or malformed slave request when obtaining "
2135           "start position from GTID state";
2136       info->error= ER_UNKNOWN_ERROR;
2137       return 1;
2138     }
2139     if (info->until_gtid_state &&
2140         info->until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
2141                                     slave_until_gtid_str.length()))
2142     {
2143       info->errmsg= "Out of memory or malformed slave request when "
2144           "obtaining UNTIL position sent from slave";
2145       info->error= ER_UNKNOWN_ERROR;
2146       return 1;
2147     }
2148     if (unlikely((error= check_slave_start_position(info, &info->errmsg,
2149                                                     &info->error_gtid))))
2150     {
2151       info->error= error;
2152       return 1;
2153     }
2154     if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state,
2155                                              search_file_name,
2156                                              info->until_gtid_state)))
2157     {
2158       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2159       return 1;
2160     }
2161 
2162     /* start from beginning of binlog file */
2163     *pos = 4;
2164   }
2165   else
2166   {
2167     if (log_ident[0])
2168       mysql_bin_log.make_log_name(search_file_name, log_ident);
2169     else
2170       name=0; // Find first log
2171   }
2172   linfo->index_file_offset= 0;
2173 
2174   if (mysql_bin_log.find_log_pos(linfo, name, 1))
2175   {
2176     info->errmsg= "Could not find first log file name in binary "
2177         "log index file";
2178     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2179     return 1;
2180   }
2181 
2182   // set current pos too
2183   linfo->pos= *pos;
2184 
2185   // note: publish that we use file, before we open it
2186   thd->current_linfo= linfo;
2187 
2188   if (check_start_offset(info, linfo->log_file_name, *pos))
2189     return 1;
2190 
2191   if (*pos > BIN_LOG_HEADER_SIZE)
2192   {
2193     /*
2194       mark that first format descriptor with "log_pos=0", so the slave
2195       should not increment master's binlog position
2196       (rli->group_master_log_pos)
2197     */
2198     info->clear_initial_log_pos= true;
2199   }
2200 
2201   return 0;
2202 }
2203 
2204 /**
2205  * send format descriptor event for one binlog file
2206  */
2207 static int send_format_descriptor_event(binlog_send_info *info, IO_CACHE *log,
2208                                         LOG_INFO *linfo, my_off_t start_pos)
2209 {
2210   int error;
2211   ulong ev_offset;
2212   THD *thd= info->thd;
2213   String *packet= info->packet;
2214   Log_event_type event_type;
2215   bool initial_log_pos= info->clear_initial_log_pos;
2216   DBUG_ENTER("send_format_descriptor_event");
2217 
2218   /**
2219    * 1) reset fdev before each log-file
2220    * 2) read first event, should be the format descriptor
2221    * 3) read second event, *might* be start encryption event
2222    *    if it's isn't, seek back to undo this read
2223    */
2224   if (info->fdev != NULL)
2225     delete info->fdev;
2226 
2227   if (!(info->fdev= new Format_description_log_event(3)))
2228   {
2229     info->errmsg= "Out of memory initializing format_description event";
2230     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2231     DBUG_RETURN(1);
2232   }
2233 
2234   /* reset transmit packet for the event read from binary log file */
2235   if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
2236     DBUG_RETURN(1);
2237 
2238   /*
2239     Try to find a Format_description_log_event at the beginning of
2240     the binlog
2241   */
2242   info->last_pos= my_b_tell(log);
2243   error= Log_event::read_log_event(log, packet, info->fdev,
2244                                    opt_master_verify_checksum
2245                                    ? info->current_checksum_alg
2246                                    : BINLOG_CHECKSUM_ALG_OFF);
2247   linfo->pos= my_b_tell(log);
2248 
2249   if (unlikely(error))
2250   {
2251     set_read_error(info, error);
2252     DBUG_RETURN(1);
2253   }
2254 
2255   event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
2256 
2257   /*
2258     The packet has offsets equal to the normal offsets in a
2259     binlog event + ev_offset (the first ev_offset characters are
2260     the header (default \0)).
2261   */
2262   DBUG_PRINT("info",
2263              ("Looked for a Format_description_log_event, "
2264               "found event type %d", (int)event_type));
2265 
2266   if (event_type != FORMAT_DESCRIPTION_EVENT)
2267   {
2268     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2269     info->errmsg= "Failed to find format descriptor event in start of binlog";
2270     sql_print_warning("Failed to find format descriptor event in "
2271                       "start of binlog: %s",
2272                       info->log_file_name);
2273     DBUG_RETURN(1);
2274   }
2275 
2276   info->current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
2277                                                packet->length() - ev_offset);
2278 
2279   DBUG_ASSERT(info->current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
2280               info->current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
2281               info->current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
2282 
2283   if (!is_slave_checksum_aware(thd) &&
2284       info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
2285       info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
2286   {
2287     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2288     info->errmsg= "Slave can not handle replication events with the "
2289         "checksum that master is configured to log";
2290     sql_print_warning("Master is configured to log replication events "
2291                       "with checksum, but will not send such events to "
2292                       "slaves that cannot process them");
2293     DBUG_RETURN(1);
2294   }
2295 
2296   uint ev_len= packet->length() - ev_offset;
2297   if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
2298     ev_len-= BINLOG_CHECKSUM_LEN;
2299 
2300   Format_description_log_event *tmp;
2301   if (!(tmp= new Format_description_log_event(packet->ptr() + ev_offset,
2302                                               ev_len, info->fdev)))
2303   {
2304     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2305     info->errmsg= "Corrupt Format_description event found "
2306         "or out-of-memory";
2307     DBUG_RETURN(1);
2308   }
2309   delete info->fdev;
2310   info->fdev= tmp;
2311 
2312   (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
2313 
2314   if (initial_log_pos)
2315   {
2316     info->clear_initial_log_pos= false;
2317     /*
2318       mark that this event with "log_pos=0", so the slave
2319       should not increment master's binlog position
2320       (rli->group_master_log_pos)
2321     */
2322     int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, (ulong) 0);
2323 
2324     /*
2325       if reconnect master sends FD event with `created' as 0
2326       to avoid destroying temp tables.
2327     */
2328     int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
2329               ST_CREATED_OFFSET+ev_offset, (ulong) 0);
2330 
2331     /* fix the checksum due to latest changes in header */
2332     fix_checksum(info->current_checksum_alg, packet, ev_offset);
2333   }
2334   else if (info->using_gtid_state)
2335   {
2336     /*
2337       If this event has the field `created' set, then it will cause the
2338       slave to delete all active temporary tables. This must not happen
2339       if the slave received any later GTIDs in a previous connect, as
2340       those GTIDs might have created new temporary tables that are still
2341       needed.
2342 
2343       So here, we check if the starting GTID position was already
2344       reached before this format description event. If not, we clear the
2345       `created' flag to preserve temporary tables on the slave. (If the
2346       slave connects at a position past this event, it means that it
2347       already received and handled it in a previous connect).
2348     */
2349     if (!info->gtid_state.is_pos_reached())
2350     {
2351       int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
2352                 ST_CREATED_OFFSET+ev_offset, (ulong) 0);
2353       fix_checksum(info->current_checksum_alg, packet, ev_offset);
2354     }
2355   }
2356 
2357   /* send it */
2358   if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length()))
2359   {
2360     info->errmsg= "Failed on my_net_write()";
2361     info->error= ER_UNKNOWN_ERROR;
2362     DBUG_RETURN(1);
2363   }
2364 
2365   /*
2366     Read the following Start_encryption_log_event and send it to slave as
2367     Ignorable_log_event. Although Slave doesn't need to know whether master's
2368     binlog is encrypted but it needs to update slave log pos (for mysqlbinlog).
2369 
2370     If slave want to encrypt its logs, it should generate its own
2371     random nonce, it should not use the one from the master.
2372   */
2373   /* reset transmit packet for the event read from binary log file */
2374   if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
2375     DBUG_RETURN(1);
2376   info->last_pos= linfo->pos;
2377   error= Log_event::read_log_event(log, packet, info->fdev,
2378                                    opt_master_verify_checksum
2379                                    ? info->current_checksum_alg
2380                                    : BINLOG_CHECKSUM_ALG_OFF);
2381   linfo->pos= my_b_tell(log);
2382 
2383   if (unlikely(error))
2384   {
2385     set_read_error(info, error);
2386     DBUG_RETURN(1);
2387   }
2388 
2389   event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET + ev_offset]);
2390   if (event_type == START_ENCRYPTION_EVENT)
2391   {
2392     Start_encryption_log_event *sele= (Start_encryption_log_event *)
2393       Log_event::read_log_event(packet->ptr() + ev_offset, packet->length()
2394                                 - ev_offset, &info->errmsg, info->fdev,
2395                                 BINLOG_CHECKSUM_ALG_OFF);
2396     if (!sele)
2397     {
2398       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2399       DBUG_RETURN(1);
2400     }
2401 
2402     if (info->fdev->start_decryption(sele))
2403     {
2404       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2405       info->errmsg= "Could not decrypt binlog: encryption key error";
2406       delete sele;
2407       DBUG_RETURN(1);
2408     }
2409     /* Make it Ignorable_log_event and send it */
2410     (*packet)[FLAGS_OFFSET+ev_offset] |= LOG_EVENT_IGNORABLE_F;
2411     if (initial_log_pos)
2412       int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, (ulong) 0);
2413     /* fix the checksum due to latest changes in header */
2414     fix_checksum(info->current_checksum_alg, packet, ev_offset);
2415     if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length()))
2416     {
2417       info->errmsg= "Failed on my_net_write()";
2418       info->error= ER_UNKNOWN_ERROR;
2419       DBUG_RETURN(1);
2420     }
2421     delete sele;
2422   }
2423   else if (start_pos == BIN_LOG_HEADER_SIZE)
2424   {
2425     /*
2426       not Start_encryption_log_event - seek back. But only if
2427       send_one_binlog_file() isn't going to seek anyway
2428     */
2429     my_b_seek(log, info->last_pos);
2430     linfo->pos= info->last_pos;
2431   }
2432 
2433 
2434   /** all done */
2435   DBUG_RETURN(0);
2436 }
2437 
2438 static bool should_stop(binlog_send_info *info)
2439 {
2440   return
2441       info->net->error ||
2442       info->net->vio == NULL ||
2443       info->thd->killed ||
2444       info->error != 0 ||
2445       info->should_stop;
2446 }
2447 
2448 /**
2449  * wait for new events to enter binlog
2450  * this function will send heartbeats while waiting if so configured
2451  */
2452 static int wait_new_events(binlog_send_info *info,         /* in */
2453                            LOG_INFO* linfo,                /* in */
2454                            char binlog_end_pos_filename[], /* out */
2455                            my_off_t *end_pos_ptr)          /* out */
2456 {
2457   int ret= 1;
2458   PSI_stage_info old_stage;
2459 
2460   mysql_bin_log.lock_binlog_end_pos();
2461   info->thd->ENTER_COND(mysql_bin_log.get_bin_log_cond(),
2462                         mysql_bin_log.get_binlog_end_pos_lock(),
2463                         &stage_master_has_sent_all_binlog_to_slave,
2464                         &old_stage);
2465 
2466   while (!should_stop(info))
2467   {
2468     *end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
2469     if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0)
2470     {
2471       /* there has been a log file switch, we don't need to wait */
2472       ret= 0;
2473       break;
2474     }
2475 
2476     if (linfo->pos < *end_pos_ptr)
2477     {
2478       /* there is data to read, we don't need to wait */
2479       ret= 0;
2480       break;
2481     }
2482 
2483     if (info->heartbeat_period)
2484     {
2485       struct timespec ts;
2486       set_timespec_nsec(ts, info->heartbeat_period);
2487       ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, &ts);
2488       if (ret == ETIMEDOUT || ret == ETIME)
2489       {
2490         struct event_coordinates coord = { linfo->log_file_name, linfo->pos };
2491 #ifndef DBUG_OFF
2492         const ulong hb_info_counter_limit = 3;
2493         if (info->hb_info_counter < hb_info_counter_limit)
2494         {
2495           sql_print_information("master sends heartbeat message %s:%llu",
2496                                 linfo->log_file_name, linfo->pos);
2497           info->hb_info_counter++;
2498           if (info->hb_info_counter == hb_info_counter_limit)
2499             sql_print_information("the rest of heartbeat info skipped ...");
2500         }
2501 #endif
2502         mysql_bin_log.unlock_binlog_end_pos();
2503         ret= send_heartbeat_event(info,
2504                                   info->net, info->packet, &coord,
2505                                   info->current_checksum_alg);
2506         mysql_bin_log.lock_binlog_end_pos();
2507 
2508         if (ret)
2509         {
2510           ret= 1; // error
2511           break;
2512         }
2513         /**
2514          * re-read heartbeat period after each sent
2515          */
2516         info->heartbeat_period= get_heartbeat_period(info->thd);
2517       }
2518       else if (ret != 0)
2519       {
2520         ret= 1; // error
2521         break;
2522       }
2523     }
2524     else
2525     {
2526       ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, NULL);
2527       if (ret != 0 && ret != ETIMEDOUT && ret != ETIME)
2528       {
2529         ret= 1; // error
2530         break;
2531       }
2532     }
2533   }
2534 
2535   /* it releases the lock set in ENTER_COND */
2536   info->thd->EXIT_COND(&old_stage);
2537   return ret;
2538 }
2539 
2540 /**
2541  * get end pos of current log file, this function
2542  * will wait if there is nothing available
2543  */
2544 static my_off_t get_binlog_end_pos(binlog_send_info *info,
2545                                    IO_CACHE* log,
2546                                    LOG_INFO* linfo)
2547 {
2548   my_off_t log_pos= my_b_tell(log);
2549 
2550   /**
2551    * get current binlog end pos
2552    */
2553   mysql_bin_log.lock_binlog_end_pos();
2554   char binlog_end_pos_filename[FN_REFLEN];
2555   my_off_t end_pos= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
2556   mysql_bin_log.unlock_binlog_end_pos();
2557 
2558   do
2559   {
2560     if (strcmp(binlog_end_pos_filename, linfo->log_file_name) != 0)
2561     {
2562       /**
2563        * this file is not active, since it's not written to again,
2564        * it safe to check file length and use that as end_pos
2565        */
2566       end_pos= my_b_filelength(log);
2567 
2568       if (log_pos == end_pos)
2569         return 0;        // already at end of file inactive file
2570       else
2571         return end_pos;  // return size of inactive file
2572     }
2573     else
2574     {
2575       /**
2576        * this is the active file
2577        */
2578 
2579       if (log_pos < end_pos)
2580       {
2581         /**
2582          * there is data available to read
2583          */
2584         return end_pos;
2585       }
2586 
2587       /**
2588        * check if we should wait for more data
2589        */
2590       if ((info->flags & BINLOG_DUMP_NON_BLOCK) ||
2591           (info->thd->variables.server_id == 0))
2592       {
2593         info->should_stop= true;
2594         return 0;
2595       }
2596 
2597       /**
2598        * flush data before waiting
2599        */
2600       if (net_flush(info->net))
2601       {
2602         info->errmsg= "failed on net_flush()";
2603         info->error= ER_UNKNOWN_ERROR;
2604         return 1;
2605       }
2606 
2607       if (wait_new_events(info, linfo, binlog_end_pos_filename, &end_pos))
2608         return 1;
2609     }
2610   } while (!should_stop(info));
2611 
2612   return 0;
2613 }
2614 
2615 /**
2616  * This function sends events from one binlog file
2617  * but only up until end_pos
2618  *
2619  * return 0 - OK
2620  *        else NOK
2621  */
2622 static int send_events(binlog_send_info *info, IO_CACHE* log, LOG_INFO* linfo,
2623                        my_off_t end_pos)
2624 {
2625   int error;
2626   ulong ev_offset;
2627 
2628   String *packet= info->packet;
2629   linfo->pos= my_b_tell(log);
2630   info->last_pos= my_b_tell(log);
2631 
2632   log->end_of_file= end_pos;
2633   while (linfo->pos < end_pos)
2634   {
2635     if (should_stop(info))
2636       return 0;
2637 
2638     /* reset the transmit packet for the event read from binary log
2639        file */
2640     if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
2641       return 1;
2642 
2643     info->last_pos= linfo->pos;
2644     error= Log_event::read_log_event(log, packet, info->fdev,
2645                        opt_master_verify_checksum ? info->current_checksum_alg
2646                                                   : BINLOG_CHECKSUM_ALG_OFF);
2647     linfo->pos= my_b_tell(log);
2648 
2649     if (unlikely(error))
2650     {
2651       set_read_error(info, error);
2652       return 1;
2653     }
2654 
2655     Log_event_type event_type=
2656         (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
2657 
2658 #ifndef DBUG_OFF
2659     if (info->dbug_reconnect_counter > 0)
2660     {
2661       --info->dbug_reconnect_counter;
2662       if (info->dbug_reconnect_counter == 0)
2663       {
2664         info->errmsg= "DBUG-injected forced reconnect";
2665         info->error= ER_UNKNOWN_ERROR;
2666         return 1;
2667       }
2668     }
2669 #endif
2670 
2671 #ifdef ENABLED_DEBUG_SYNC
2672     DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
2673                     {
2674                       if (event_type == XID_EVENT)
2675                       {
2676                         net_flush(info->net);
2677                         const char act[]=
2678                             "now "
2679                             "wait_for signal.continue";
2680                         DBUG_ASSERT(debug_sync_service);
2681                         DBUG_ASSERT(!debug_sync_set_action(
2682                             info->thd,
2683                             STRING_WITH_LEN(act)));
2684 
2685                         const char act2[]=
2686                             "now "
2687                             "signal signal.continued";
2688                         DBUG_ASSERT(!debug_sync_set_action(
2689                             info->thd,
2690                             STRING_WITH_LEN(act2)));
2691                       }
2692                     });
2693 #endif
2694 
2695     if (event_type != START_ENCRYPTION_EVENT &&
2696         ((info->errmsg= send_event_to_slave(info, event_type, log,
2697                                            ev_offset, &info->error_gtid))))
2698       return 1;
2699 
2700     if (unlikely(info->send_fake_gtid_list) &&
2701         info->gtid_skip_group == GTID_SKIP_NOT)
2702     {
2703       Gtid_list_log_event glev(&info->until_binlog_state, 0);
2704 
2705       if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg) ||
2706           fake_gtid_list_event(info, &glev, &info->errmsg, (uint32)my_b_tell(log)))
2707       {
2708         info->error= ER_UNKNOWN_ERROR;
2709         return 1;
2710       }
2711       info->send_fake_gtid_list= false;
2712     }
2713 
2714     if (info->until_gtid_state &&
2715         is_until_reached(info, &ev_offset, event_type, &info->errmsg,
2716                          (uint32)my_b_tell(log)))
2717     {
2718       if (info->errmsg)
2719       {
2720         info->error= ER_UNKNOWN_ERROR;
2721         return 1;
2722       }
2723       info->should_stop= true;
2724       return 0;
2725     }
2726 
2727     /* Abort server before it sends the XID_EVENT */
2728     DBUG_EXECUTE_IF("crash_before_send_xid",
2729                     {
2730                       if (event_type == XID_EVENT)
2731                       {
2732                         my_sleep(2000000);
2733                         DBUG_SUICIDE();
2734                       }
2735                     });
2736   }
2737 
2738   return 0;
2739 }
2740 
2741 /**
2742  * This function sends one binlog file to slave
2743  *
2744  * return 0 - OK
2745  *        1 - NOK
2746  */
2747 static int send_one_binlog_file(binlog_send_info *info,
2748                                 IO_CACHE* log,
2749                                 LOG_INFO* linfo,
2750                                 my_off_t start_pos)
2751 {
2752   mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock());
2753 
2754   /* seek to the requested position, to start the requested dump */
2755   if (start_pos != BIN_LOG_HEADER_SIZE)
2756   {
2757     my_b_seek(log, start_pos);
2758     linfo->pos= start_pos;
2759   }
2760 
2761   while (!should_stop(info))
2762   {
2763     /**
2764      * get end pos of current log file, this function
2765      * will wait if there is nothing available
2766      */
2767     my_off_t end_pos= get_binlog_end_pos(info, log, linfo);
2768     if (end_pos <= 1)
2769     {
2770       /** end of file or error */
2771       return (int)end_pos;
2772     }
2773     info->dirlen= dirname_length(info->log_file_name);
2774     /**
2775      * send events from current position up to end_pos
2776      */
2777     if (send_events(info, log, linfo, end_pos))
2778       return 1;
2779   }
2780 
2781   return 1;
2782 }
2783 
2784 void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
2785                        ushort flags)
2786 {
2787   LOG_INFO linfo;
2788 
2789   IO_CACHE log;
2790   File file = -1;
2791   String* const packet= &thd->packet;
2792 
2793   binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
2794   binlog_send_info *info= &infoobj;
2795   bool has_transmit_started= false;
2796 
2797   int old_max_allowed_packet= thd->variables.max_allowed_packet;
2798   thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
2799 
2800   DBUG_ENTER("mysql_binlog_send");
2801   DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
2802 
2803   bzero((char*) &log,sizeof(log));
2804 
2805   if (init_binlog_sender(info, &linfo, log_ident, &pos))
2806     goto err;
2807 
2808   has_transmit_started= true;
2809 
2810   /* Check if the dump thread is created by a slave with semisync enabled. */
2811   thd->semi_sync_slave = is_semi_sync_slave();
2812 
2813   DBUG_ASSERT(pos == linfo.pos);
2814 
2815   if (repl_semisync_master.dump_start(thd, linfo.log_file_name, linfo.pos))
2816   {
2817     info->errmsg= "Failed to run hook 'transmit_start'";
2818     info->error= ER_UNKNOWN_ERROR;
2819     goto err;
2820   }
2821 
2822   /*
2823     heartbeat_period from @master_heartbeat_period user variable
2824     NOTE: this is initialized after transmit_start-hook so that
2825     the hook can affect value of heartbeat period
2826   */
2827   info->heartbeat_period= get_heartbeat_period(thd);
2828 
2829   while (!should_stop(info))
2830   {
2831     /*
2832       Tell the client about the log name with a fake Rotate event;
2833       this is needed even if we also send a Format_description_log_event
2834       just after, because that event does not contain the binlog's name.
2835       Note that as this Rotate event is sent before
2836       Format_description_log_event, the slave cannot have any info to
2837       understand this event's format, so the header len of
2838       Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
2839       than other events except FORMAT_DESCRIPTION_EVENT).
2840       Before 4.0.14 we called fake_rotate_event below only if (pos ==
2841       BIN_LOG_HEADER_SIZE), because if this is false then the slave
2842       already knows the binlog's name.
2843       Since, we always call fake_rotate_event; if the slave already knew
2844       the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
2845       useless but does not harm much. It is nice for 3.23 (>=.58) slaves
2846       which test Rotate events to see if the master is 4.0 (then they
2847       choose to stop because they can't replicate 4.0); by always calling
2848       fake_rotate_event we are sure that 3.23.58 and newer will detect the
2849       problem as soon as replication starts (BUG#198).
2850       Always calling fake_rotate_event makes sending of normal
2851       (=from-binlog) Rotate events a priori unneeded, but it is not so
2852       simple: the 2 Rotate events are not equivalent, the normal one is
2853       before the Stop event, the fake one is after. If we don't send the
2854       normal one, then the Stop event will be interpreted (by existing 4.0
2855       slaves) as "the master stopped", which is wrong. So for safety,
2856       given that we want minimum modification of 4.0, we send the normal
2857       and fake Rotates.
2858     */
2859     if (fake_rotate_event(info, pos, &info->errmsg, info->current_checksum_alg))
2860     {
2861       /*
2862         This error code is not perfect, as fake_rotate_event() does not
2863         read anything from the binlog; if it fails it's because of an
2864         error in my_net_write(), fortunately it will say so in errmsg.
2865       */
2866       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2867       goto err;
2868     }
2869 
2870     if ((file=open_binlog(&log, linfo.log_file_name, &info->errmsg)) < 0)
2871     {
2872       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2873       goto err;
2874     }
2875 
2876     if (send_format_descriptor_event(info, &log, &linfo, pos))
2877     {
2878       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2879       goto err;
2880     }
2881 
2882     /*
2883       We want to corrupt the first event that will be sent to the slave.
2884       But we do not want the corruption to happen early, eg. when client does
2885       BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to
2886       set the real DBUG injection here.
2887     */
2888     DBUG_EXECUTE_IF("corrupt_read_log_event2_set",
2889                     {
2890                       DBUG_SET("-d,corrupt_read_log_event2_set");
2891                       DBUG_SET("+d,corrupt_read_log_event2");
2892                     });
2893 
2894     /*
2895       Handle the case of START SLAVE UNTIL with an UNTIL condition already
2896       fulfilled at the start position.
2897 
2898       We will send one event, the format_description, and then stop.
2899     */
2900     if (info->until_gtid_state && info->until_gtid_state->count() == 0)
2901       info->gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
2902 
2903     THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
2904     if (send_one_binlog_file(info, &log, &linfo, pos))
2905       break;
2906 
2907     if (should_stop(info))
2908       break;
2909 
2910     DBUG_EXECUTE_IF("wait_after_binlog_EOF",
2911                     {
2912                       const char act[]= "now wait_for signal.rotate_finished";
2913                       DBUG_ASSERT(!debug_sync_set_action(current_thd,
2914                                                          STRING_WITH_LEN(act)));
2915                     };);
2916 
2917     THD_STAGE_INFO(thd,
2918                    stage_finished_reading_one_binlog_switching_to_next_binlog);
2919     if (mysql_bin_log.find_next_log(&linfo, 1))
2920     {
2921       info->errmsg= "could not find next log";
2922       info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2923       break;
2924     }
2925 
2926     /** start from start of next file */
2927     pos= BIN_LOG_HEADER_SIZE;
2928 
2929     /** close current cache/file */
2930     end_io_cache(&log);
2931     mysql_file_close(file, MYF(MY_WME));
2932     file= -1;
2933   }
2934 
2935 err:
2936   THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
2937   if (has_transmit_started)
2938   {
2939     repl_semisync_master.dump_end(thd);
2940   }
2941 
2942   if (info->thd->killed == KILL_SLAVE_SAME_ID)
2943   {
2944     info->errmsg= "A slave with the same server_uuid/server_id as this slave "
2945                   "has connected to the master";
2946     info->error= ER_SLAVE_SAME_ID;
2947   }
2948 
2949   const bool binlog_open = my_b_inited(&log);
2950   if (file >= 0)
2951   {
2952     end_io_cache(&log);
2953     mysql_file_close(file, MYF(MY_WME));
2954   }
2955 
2956   thd->reset_current_linfo();
2957   thd->variables.max_allowed_packet= old_max_allowed_packet;
2958   delete info->fdev;
2959 
2960   if (likely(info->error == 0))
2961   {
2962     my_eof(thd);
2963     DBUG_VOID_RETURN;
2964   }
2965 
2966   if ((info->error == ER_MASTER_FATAL_ERROR_READING_BINLOG ||
2967        info->error == ER_SLAVE_SAME_ID) && binlog_open)
2968   {
2969     /*
2970        detailing the fatal error message with coordinates
2971        of the last position read.
2972     */
2973     my_snprintf(info->error_text, sizeof(info->error_text),
2974                 "%s; the first event '%s' at %lld, "
2975                 "the last event read from '%s' at %lld, "
2976                 "the last byte read from '%s' at %lld.",
2977                 info->errmsg,
2978                 my_basename(info->start_log_file_name), info->start_pos,
2979                 my_basename(info->log_file_name), info->last_pos,
2980                 my_basename(info->log_file_name), linfo.pos);
2981   }
2982   else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
2983   {
2984     my_snprintf(info->error_text, sizeof(info->error_text),
2985                 "Error: connecting slave requested to start from GTID "
2986                 "%u-%u-%llu, which is not in the master's binlog",
2987                 info->error_gtid.domain_id,
2988                 info->error_gtid.server_id,
2989                 info->error_gtid.seq_no);
2990     /* Use this error code so slave will know not to try reconnect. */
2991     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2992   }
2993   else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2)
2994   {
2995     my_snprintf(info->error_text, sizeof(info->error_text),
2996                 "Error: connecting slave requested to start from GTID "
2997                 "%u-%u-%llu, which is not in the master's binlog. Since the "
2998                 "master's binlog contains GTIDs with higher sequence numbers, "
2999                 "it probably means that the slave has diverged due to "
3000                 "executing extra erroneous transactions",
3001                 info->error_gtid.domain_id,
3002                 info->error_gtid.server_id,
3003                 info->error_gtid.seq_no);
3004     /* Use this error code so slave will know not to try reconnect. */
3005     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
3006   }
3007   else if (info->error == ER_GTID_START_FROM_BINLOG_HOLE)
3008   {
3009     my_snprintf(info->error_text, sizeof(info->error_text),
3010                 "The binlog on the master is missing the GTID %u-%u-%llu "
3011                 "requested by the slave (even though both a prior and a "
3012                 "subsequent sequence number does exist), and GTID strict mode "
3013                 "is enabled",
3014                 info->error_gtid.domain_id,
3015                 info->error_gtid.server_id,
3016                 info->error_gtid.seq_no);
3017     /* Use this error code so slave will know not to try reconnect. */
3018     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
3019   }
3020   else if (info->error == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
3021   {
3022     my_snprintf(info->error_text, sizeof(info->error_text),
3023                 "Failed to load replication slave GTID state from table %s.%s",
3024                 "mysql", rpl_gtid_slave_state_table_name.str);
3025     info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
3026   }
3027   else if (info->errmsg != NULL)
3028     strcpy(info->error_text, info->errmsg);
3029 
3030   my_message(info->error, info->error_text, MYF(0));
3031 
3032   DBUG_VOID_RETURN;
3033 }
3034 
3035 
3036 /**
3037   Execute a START SLAVE statement.
3038 
3039   @param thd Pointer to THD object for the client thread executing the
3040   statement.
3041 
3042   @param mi Pointer to Master_info object for the slave's IO thread.
3043 
3044   @param net_report If true, saves the exit status into thd->stmt_da.
3045 
3046   @retval 0 success
3047   @retval 1 error
3048   @retval -1 fatal error
3049 */
3050 
3051 int start_slave(THD* thd , Master_info* mi,  bool net_report)
3052 {
3053   int slave_errno= 0;
3054   int thread_mask;
3055   char master_info_file_tmp[FN_REFLEN];
3056   char relay_log_info_file_tmp[FN_REFLEN];
3057   DBUG_ENTER("start_slave");
3058 
3059   if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
3060     DBUG_RETURN(-1);
3061 
3062   create_logfile_name_with_suffix(master_info_file_tmp,
3063                                   sizeof(master_info_file_tmp),
3064                                   master_info_file, 0,
3065                                   &mi->cmp_connection_name);
3066   create_logfile_name_with_suffix(relay_log_info_file_tmp,
3067                                   sizeof(relay_log_info_file_tmp),
3068                                   relay_log_info_file, 0,
3069                                   &mi->cmp_connection_name);
3070 
3071   mi->lock_slave_threads();
3072   if (mi->killed)
3073   {
3074     /* connection was deleted while we waited for lock_slave_threads */
3075     mi->unlock_slave_threads();
3076     my_error(WARN_NO_MASTER_INFO, MYF(0), (int) mi->connection_name.length,
3077              mi->connection_name.str);
3078     DBUG_RETURN(-1);
3079   }
3080 
3081   // Get a mask of _stopped_ threads
3082   init_thread_mask(&thread_mask,mi,1 /* inverse */);
3083 
3084   if (thd->lex->mi.gtid_pos_str.str)
3085   {
3086     if (thread_mask != (SLAVE_IO|SLAVE_SQL))
3087     {
3088       slave_errno= ER_SLAVE_WAS_RUNNING;
3089       goto err;
3090     }
3091     if (thd->lex->slave_thd_opt)
3092     {
3093       slave_errno= ER_BAD_SLAVE_UNTIL_COND;
3094       goto err;
3095     }
3096     if (mi->using_gtid == Master_info::USE_GTID_NO)
3097     {
3098       slave_errno= ER_UNTIL_REQUIRES_USING_GTID;
3099       goto err;
3100     }
3101   }
3102 
3103   /*
3104     Below we will start all stopped threads.  But if the user wants to
3105     start only one thread, do as if the other thread was running (as we
3106     don't wan't to touch the other thread), so set the bit to 0 for the
3107     other thread
3108   */
3109   if (thd->lex->slave_thd_opt)
3110     thread_mask&= thd->lex->slave_thd_opt;
3111   if (thread_mask) //some threads are stopped, start them
3112   {
3113     if (init_master_info(mi,master_info_file_tmp,relay_log_info_file_tmp, 0,
3114 			 thread_mask))
3115       slave_errno=ER_MASTER_INFO;
3116     else if (!*mi->host)
3117     {
3118       slave_errno= ER_BAD_SLAVE; net_report= 0;
3119       my_message(slave_errno, "Misconfigured slave: MASTER_HOST was not set; Fix in config file or with CHANGE MASTER TO",
3120                  MYF(0));
3121     }
3122     else
3123     {
3124       /*
3125         If we will start SQL thread we will care about UNTIL options If
3126         not and they are specified we will ignore them and warn user
3127         about this fact.
3128       */
3129       if (thread_mask & SLAVE_SQL)
3130       {
3131         mysql_mutex_lock(&mi->rli.data_lock);
3132 
3133         if (thd->lex->mi.pos)
3134         {
3135           if (thd->lex->mi.relay_log_pos)
3136             slave_errno=ER_BAD_SLAVE_UNTIL_COND;
3137           mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
3138           mi->rli.until_log_pos= thd->lex->mi.pos;
3139           /*
3140              We don't check thd->lex->mi.log_file_name for NULL here
3141              since it is checked in sql_yacc.yy
3142           */
3143           strmake_buf(mi->rli.until_log_name, thd->lex->mi.log_file_name);
3144         }
3145         else if (thd->lex->mi.relay_log_pos)
3146         {
3147           mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
3148           mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
3149           strmake_buf(mi->rli.until_log_name, thd->lex->mi.relay_log_name);
3150         }
3151         else if (thd->lex->mi.gtid_pos_str.str)
3152         {
3153           if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str,
3154                                           thd->lex->mi.gtid_pos_str.length))
3155           {
3156             slave_errno= ER_INCORRECT_GTID_STATE;
3157             mysql_mutex_unlock(&mi->rli.data_lock);
3158             goto err;
3159           }
3160           mi->rli.until_condition= Relay_log_info::UNTIL_GTID;
3161         }
3162         else
3163           mi->rli.clear_until_condition();
3164 
3165         if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS ||
3166             mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS)
3167         {
3168           /* Preparing members for effective until condition checking */
3169           const char *p= fn_ext(mi->rli.until_log_name);
3170           char *p_end;
3171           if (*p)
3172           {
3173             //p points to '.'
3174             mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
3175             /*
3176               p_end points to the first invalid character. If it equals
3177               to p, no digits were found, error. If it contains '\0' it
3178               means  conversion went ok.
3179             */
3180             if (p_end==p || *p_end)
3181               slave_errno=ER_BAD_SLAVE_UNTIL_COND;
3182           }
3183           else
3184             slave_errno=ER_BAD_SLAVE_UNTIL_COND;
3185 
3186           /* mark the cached result of the UNTIL comparison as "undefined" */
3187           mi->rli.until_log_names_cmp_result=
3188             Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
3189         }
3190 
3191         if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
3192         {
3193           /* Issuing warning then started without --skip-slave-start */
3194           if (!opt_skip_slave_start)
3195             push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
3196                          ER_MISSING_SKIP_SLAVE,
3197                          ER_THD(thd, ER_MISSING_SKIP_SLAVE));
3198         }
3199 
3200         mysql_mutex_unlock(&mi->rli.data_lock);
3201       }
3202       else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
3203         push_warning(thd,
3204                      Sql_condition::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
3205                      ER_THD(thd, ER_UNTIL_COND_IGNORED));
3206 
3207       if (!slave_errno)
3208         slave_errno = start_slave_threads(thd,
3209                                           1,
3210                                           1 /* wait for start */,
3211                                           mi,
3212                                           master_info_file_tmp,
3213                                           relay_log_info_file_tmp,
3214                                           thread_mask);
3215     }
3216   }
3217   else
3218   {
3219     /* no error if all threads are already started, only a warning */
3220     push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
3221                  ER_THD(thd, ER_SLAVE_WAS_RUNNING));
3222   }
3223 
3224 err:
3225   mi->unlock_slave_threads();
3226   thd_proc_info(thd, 0);
3227 
3228   if (slave_errno)
3229   {
3230     if (net_report)
3231       my_error(slave_errno, MYF(0),
3232                (int) mi->connection_name.length,
3233                mi->connection_name.str);
3234     DBUG_RETURN(slave_errno == ER_BAD_SLAVE ? -1 : 1);
3235   }
3236 
3237   DBUG_RETURN(0);
3238 }
3239 
3240 
3241 /**
3242   Execute a STOP SLAVE statement.
3243 
3244   @param thd Pointer to THD object for the client thread executing the
3245   statement.
3246 
3247   @param mi Pointer to Master_info object for the slave's IO thread.
3248 
3249   @param net_report If true, saves the exit status into thd->stmt_da.
3250 
3251   @retval 0 success
3252   @retval 1 error
3253   @retval -1 error
3254 */
3255 
3256 int stop_slave(THD* thd, Master_info* mi, bool net_report )
3257 {
3258   int slave_errno;
3259   DBUG_ENTER("stop_slave");
3260   DBUG_PRINT("enter",("Connection: %s", mi->connection_name.str));
3261 
3262   if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
3263     DBUG_RETURN(-1);
3264   THD_STAGE_INFO(thd, stage_killing_slave);
3265   int thread_mask;
3266   mi->lock_slave_threads();
3267   /*
3268     Get a mask of _running_ threads.
3269     We don't have to test for mi->killed as the thread_mask will take care
3270     of checking if threads exists
3271   */
3272   init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
3273   /*
3274     Below we will stop all running threads.
3275     But if the user wants to stop only one thread, do as if the other thread
3276     was stopped (as we don't wan't to touch the other thread), so set the
3277     bit to 0 for the other thread
3278   */
3279   if (thd->lex->slave_thd_opt)
3280     thread_mask &= thd->lex->slave_thd_opt;
3281 
3282   if (thread_mask)
3283   {
3284     slave_errno= terminate_slave_threads(mi,thread_mask, 0 /* get lock */);
3285   }
3286   else
3287   {
3288     //no error if both threads are already stopped, only a warning
3289     slave_errno= 0;
3290     push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
3291                  ER_THD(thd, ER_SLAVE_WAS_NOT_RUNNING));
3292   }
3293 
3294   mi->unlock_slave_threads();
3295 
3296   if (slave_errno)
3297   {
3298     if (net_report)
3299       my_message(slave_errno, ER_THD(thd, slave_errno), MYF(0));
3300     DBUG_RETURN(1);
3301   }
3302 
3303   DBUG_RETURN(0);
3304 }
3305 
3306 
3307 /**
3308   Execute a RESET SLAVE statement.
3309 
3310   @param thd Pointer to THD object of the client thread executing the
3311   statement.
3312 
3313   @param mi Pointer to Master_info object for the slave.
3314 
3315   @retval 0 success
3316   @retval 1 error
3317 */
3318 int reset_slave(THD *thd, Master_info* mi)
3319 {
3320   MY_STAT stat_area;
3321   char fname[FN_REFLEN];
3322   int thread_mask= 0, error= 0;
3323   uint sql_errno=ER_UNKNOWN_ERROR;
3324   const char* errmsg= "Unknown error occurred while resetting slave";
3325   char master_info_file_tmp[FN_REFLEN];
3326   char relay_log_info_file_tmp[FN_REFLEN];
3327   DBUG_ENTER("reset_slave");
3328 
3329   mi->lock_slave_threads();
3330   if (mi->killed)
3331   {
3332     /* connection was deleted while we waited for lock_slave_threads */
3333     mi->unlock_slave_threads();
3334     my_error(WARN_NO_MASTER_INFO, MYF(0), (int) mi->connection_name.length,
3335              mi->connection_name.str);
3336     DBUG_RETURN(-1);
3337   }
3338 
3339   init_thread_mask(&thread_mask,mi,0 /* not inverse */);
3340   if (thread_mask) // We refuse if any slave thread is running
3341   {
3342     mi->unlock_slave_threads();
3343     my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
3344              mi->connection_name.str);
3345     DBUG_RETURN(ER_SLAVE_MUST_STOP);
3346   }
3347 
3348   // delete relay logs, clear relay log coordinates
3349   if (unlikely((error= purge_relay_logs(&mi->rli, thd,
3350 			       1 /* just reset */,
3351                                         &errmsg))))
3352   {
3353     sql_errno= ER_RELAY_LOG_FAIL;
3354     goto err;
3355   }
3356 
3357   /* Clear master's log coordinates and associated information */
3358   mi->clear_in_memory_info(thd->lex->reset_slave_info.all);
3359 
3360   /*
3361      Reset errors (the idea is that we forget about the
3362      old master).
3363   */
3364   mi->clear_error();
3365   mi->rli.clear_error();
3366   mi->rli.clear_until_condition();
3367   mi->rli.clear_sql_delay();
3368   mi->rli.slave_skip_counter= 0;
3369 
3370   // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
3371   end_master_info(mi);
3372 
3373   end_relay_log_info(&mi->rli);
3374   // and delete these two files
3375   create_logfile_name_with_suffix(master_info_file_tmp,
3376                                   sizeof(master_info_file_tmp),
3377                                   master_info_file, 0,
3378                                   &mi->cmp_connection_name);
3379   create_logfile_name_with_suffix(relay_log_info_file_tmp,
3380                                   sizeof(relay_log_info_file_tmp),
3381                                   relay_log_info_file, 0,
3382                                   &mi->cmp_connection_name);
3383 
3384   fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32);
3385   if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) &&
3386       mysql_file_delete(key_file_master_info, fname, MYF(MY_WME)))
3387   {
3388     error=1;
3389     goto err;
3390   }
3391   else if (global_system_variables.log_warnings > 1)
3392     sql_print_information("Deleted Master_info file '%s'.", fname);
3393 
3394   // delete relay_log_info_file
3395   fn_format(fname, relay_log_info_file_tmp, mysql_data_home, "", 4+32);
3396   if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) &&
3397       mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME)))
3398   {
3399     error=1;
3400     goto err;
3401   }
3402   else if (global_system_variables.log_warnings > 1)
3403     sql_print_information("Deleted Master_info file '%s'.", fname);
3404 
3405   if (rpl_semi_sync_slave_enabled)
3406     repl_semisync_slave.reset_slave(mi);
3407 err:
3408   mi->unlock_slave_threads();
3409   if (unlikely(error))
3410     my_error(sql_errno, MYF(0), errmsg);
3411   DBUG_RETURN(error);
3412 }
3413 
3414 /*
3415 
3416   Kill all Binlog_dump threads which previously talked to the same slave
3417   ("same" means with the same server id). Indeed, if the slave stops, if the
3418   Binlog_dump thread is waiting (mysql_cond_wait) for binlog update, then it
3419   will keep existing until a query is written to the binlog. If the master is
3420   idle, then this could last long, and if the slave reconnects, we could have 2
3421   Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
3422   binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
3423   the master kills any existing thread with the slave's server id (if this id
3424   is not zero; it will be true for real slaves, but false for mysqlbinlog when
3425   it sends COM_BINLOG_DUMP to get a remote binlog dump).
3426 
3427   SYNOPSIS
3428     kill_zombie_dump_threads()
3429     slave_server_id     the slave's server id
3430 */
3431 
3432 void kill_zombie_dump_threads(uint32 slave_server_id)
3433 {
3434   mysql_mutex_lock(&LOCK_thread_count);
3435   I_List_iterator<THD> it(threads);
3436   THD *tmp;
3437 
3438   while ((tmp=it++))
3439   {
3440     if (tmp->get_command() == COM_BINLOG_DUMP &&
3441        tmp->variables.server_id == slave_server_id)
3442     {
3443       mysql_mutex_lock(&tmp->LOCK_thd_kill);    // Lock from delete
3444       break;
3445     }
3446   }
3447   mysql_mutex_unlock(&LOCK_thread_count);
3448   if (tmp)
3449   {
3450     /*
3451       Here we do not call kill_one_thread() as
3452       it will be slow because it will iterate through the list
3453       again. We just to do kill the thread ourselves.
3454     */
3455     tmp->awake_no_mutex(KILL_SLAVE_SAME_ID);
3456     mysql_mutex_unlock(&tmp->LOCK_thd_kill);
3457   }
3458 }
3459 
3460 /**
3461    Get value for a string parameter with error checking
3462 
3463    Note that in case of error the original string should not be updated!
3464 
3465    @ret 0 ok
3466    @ret 1 error
3467 */
3468 
3469 static bool get_string_parameter(char *to, const char *from, size_t length,
3470                                  const char *name, CHARSET_INFO *cs)
3471 {
3472   if (from)                                     // Empty paramaters allowed
3473   {
3474     size_t from_length= strlen(from);
3475     size_t from_numchars= cs->cset->numchars(cs, from, from + from_length);
3476     if (from_numchars > length / cs->mbmaxlen)
3477     {
3478       my_error(ER_WRONG_STRING_LENGTH, MYF(0), from, name,
3479                (int) (length / cs->mbmaxlen));
3480       return 1;
3481     }
3482     memcpy(to, from, from_length+1);
3483   }
3484   return 0;
3485 }
3486 
3487 
3488 /**
3489   Execute a CHANGE MASTER statement.
3490 
3491   @param thd Pointer to THD object for the client thread executing the
3492   statement.
3493 
3494   @param mi Pointer to Master_info object belonging to the slave's IO
3495   thread.
3496 
3497   @param master_info_added Out parameter saying if the Master_info *mi was
3498   added to the global list of masters. This is useful in error conditions
3499   to know if caller should free Master_info *mi.
3500 
3501   @retval FALSE success
3502   @retval TRUE error
3503 */
3504 bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
3505 {
3506   int thread_mask;
3507   const char* errmsg= 0;
3508   bool need_relay_log_purge= 1;
3509   bool ret= FALSE;
3510   char saved_host[HOSTNAME_LENGTH + 1];
3511   uint saved_port;
3512   char saved_log_name[FN_REFLEN];
3513   Master_info::enum_using_gtid saved_using_gtid;
3514   char master_info_file_tmp[FN_REFLEN];
3515   char relay_log_info_file_tmp[FN_REFLEN];
3516   my_off_t saved_log_pos;
3517   LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
3518   DYNAMIC_ARRAY *do_ids, *ignore_ids;
3519 
3520   DBUG_ENTER("change_master");
3521 
3522   DBUG_ASSERT(master_info_index);
3523   mysql_mutex_assert_owner(&LOCK_active_mi);
3524 
3525   *master_info_added= false;
3526   /*
3527     We need to check if there is an empty master_host. Otherwise
3528     change master succeeds, a master.info file is created containing
3529     empty master_host string and when issuing: start slave; an error
3530     is thrown stating that the server is not configured as slave.
3531     (See BUG#28796).
3532   */
3533   if (lex_mi->host && !*lex_mi->host)
3534   {
3535     my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
3536     DBUG_RETURN(TRUE);
3537   }
3538   if (master_info_index->check_duplicate_master_info(&lex_mi->connection_name,
3539                                                      lex_mi->host,
3540                                                      lex_mi->port))
3541     DBUG_RETURN(TRUE);
3542 
3543   mi->lock_slave_threads();
3544   if (mi->killed)
3545   {
3546     /* connection was deleted while we waited for lock_slave_threads */
3547     mi->unlock_slave_threads();
3548     my_error(WARN_NO_MASTER_INFO, MYF(0), (int) mi->connection_name.length,
3549              mi->connection_name.str);
3550     DBUG_RETURN(TRUE);
3551   }
3552 
3553   init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
3554   if (thread_mask) // We refuse if any slave thread is running
3555   {
3556     my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
3557              mi->connection_name.str);
3558     ret= TRUE;
3559     goto err;
3560   }
3561 
3562   THD_STAGE_INFO(thd, stage_changing_master);
3563 
3564   create_logfile_name_with_suffix(master_info_file_tmp,
3565                                   sizeof(master_info_file_tmp),
3566                                   master_info_file, 0,
3567                                   &mi->cmp_connection_name);
3568   create_logfile_name_with_suffix(relay_log_info_file_tmp,
3569                                   sizeof(relay_log_info_file_tmp),
3570                                   relay_log_info_file, 0,
3571                                   &mi->cmp_connection_name);
3572 
3573   /* if new Master_info doesn't exists, add it */
3574   if (!master_info_index->get_master_info(&mi->connection_name,
3575                                           Sql_condition::WARN_LEVEL_NOTE))
3576   {
3577     if (master_info_index->add_master_info(mi, TRUE))
3578     {
3579       my_error(ER_MASTER_INFO, MYF(0),
3580                (int) lex_mi->connection_name.length,
3581                lex_mi->connection_name.str);
3582       ret= TRUE;
3583       goto err;
3584     }
3585     *master_info_added= true;
3586   }
3587   if (global_system_variables.log_warnings > 1)
3588     sql_print_information("Master connection name: '%.*s'  "
3589                           "Master_info_file: '%s'  "
3590                           "Relay_info_file: '%s'",
3591                           (int) mi->connection_name.length,
3592                           mi->connection_name.str,
3593                           master_info_file_tmp, relay_log_info_file_tmp);
3594 
3595   if (init_master_info(mi, master_info_file_tmp, relay_log_info_file_tmp, 0,
3596 		       thread_mask))
3597   {
3598     my_error(ER_MASTER_INFO, MYF(0),
3599              (int) lex_mi->connection_name.length,
3600              lex_mi->connection_name.str);
3601     ret= TRUE;
3602     goto err;
3603   }
3604 
3605   /*
3606     Data lock not needed since we have already stopped the running threads,
3607     and we have the hold on the run locks which will keep all threads that
3608     could possibly modify the data structures from running
3609   */
3610 
3611   /*
3612     Before processing the command, save the previous state.
3613   */
3614   strmake_buf(saved_host, mi->host);
3615   saved_port= mi->port;
3616   strmake_buf(saved_log_name, mi->master_log_name);
3617   saved_log_pos= mi->master_log_pos;
3618   saved_using_gtid= mi->using_gtid;
3619 
3620   /*
3621     If the user specified host or port without binlog or position,
3622     reset binlog's name to FIRST and position to 4.
3623   */
3624 
3625   if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
3626   {
3627     mi->master_log_name[0] = 0;
3628     mi->master_log_pos= BIN_LOG_HEADER_SIZE;
3629   }
3630 
3631   if (lex_mi->log_file_name)
3632     strmake_buf(mi->master_log_name, lex_mi->log_file_name);
3633   if (lex_mi->pos)
3634   {
3635     mi->master_log_pos= lex_mi->pos;
3636   }
3637   DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3638 
3639   if (get_string_parameter(mi->host, lex_mi->host, sizeof(mi->host)-1,
3640                            "MASTER_HOST", system_charset_info) ||
3641       get_string_parameter(mi->user, lex_mi->user, sizeof(mi->user)-1,
3642                            "MASTER_USER", system_charset_info) ||
3643       get_string_parameter(mi->password, lex_mi->password,
3644                            sizeof(mi->password)-1, "MASTER_PASSWORD",
3645                            &my_charset_bin))
3646   {
3647     ret= TRUE;
3648     goto err;
3649   }
3650 
3651   if (lex_mi->port)
3652     mi->port = lex_mi->port;
3653   if (lex_mi->connect_retry)
3654     mi->connect_retry = lex_mi->connect_retry;
3655   if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
3656     mi->heartbeat_period = lex_mi->heartbeat_period;
3657   else
3658     mi->heartbeat_period= (float) MY_MIN(SLAVE_MAX_HEARTBEAT_PERIOD,
3659                                       (slave_net_timeout/2.0));
3660   mi->received_heartbeats= 0; // counter lives until master is CHANGEd
3661 
3662   /*
3663     Reset the last time server_id list if the current CHANGE MASTER
3664     is mentioning IGNORE_SERVER_IDS= (...)
3665   */
3666   if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
3667   {
3668     /* Check if the list contains replicate_same_server_id */
3669     for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i ++)
3670     {
3671       ulong s_id;
3672       get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
3673       if (s_id == global_system_variables.server_id && replicate_same_server_id)
3674       {
3675         my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id));
3676         ret= TRUE;
3677         goto err;
3678       }
3679     }
3680 
3681     /* All ok. Update the old server ids with the new ones. */
3682     update_change_master_ids(&lex_mi->repl_ignore_server_ids,
3683                              &mi->ignore_server_ids);
3684   }
3685 
3686   if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
3687     mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
3688 
3689   if (lex_mi->sql_delay != -1)
3690     mi->rli.set_sql_delay(lex_mi->sql_delay);
3691 
3692   if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
3693     mi->ssl_verify_server_cert=
3694       (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
3695 
3696   if (lex_mi->ssl_ca)
3697     strmake_buf(mi->ssl_ca, lex_mi->ssl_ca);
3698   if (lex_mi->ssl_capath)
3699     strmake_buf(mi->ssl_capath, lex_mi->ssl_capath);
3700   if (lex_mi->ssl_cert)
3701     strmake_buf(mi->ssl_cert, lex_mi->ssl_cert);
3702   if (lex_mi->ssl_cipher)
3703     strmake_buf(mi->ssl_cipher, lex_mi->ssl_cipher);
3704   if (lex_mi->ssl_key)
3705     strmake_buf(mi->ssl_key, lex_mi->ssl_key);
3706   if (lex_mi->ssl_crl)
3707     strmake_buf(mi->ssl_crl, lex_mi->ssl_crl);
3708   if (lex_mi->ssl_crlpath)
3709     strmake_buf(mi->ssl_crlpath, lex_mi->ssl_crlpath);
3710 
3711 #ifndef HAVE_OPENSSL
3712   if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
3713       lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
3714       lex_mi->ssl_verify_server_cert || lex_mi->ssl_crl || lex_mi->ssl_crlpath)
3715     push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
3716                  ER_SLAVE_IGNORED_SSL_PARAMS,
3717                  ER_THD(thd, ER_SLAVE_IGNORED_SSL_PARAMS));
3718 #endif
3719 
3720   if (lex_mi->relay_log_name)
3721   {
3722     need_relay_log_purge= 0;
3723     char relay_log_name[FN_REFLEN];
3724     mi->rli.relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name);
3725     strmake_buf(mi->rli.group_relay_log_name, relay_log_name);
3726     strmake_buf(mi->rli.event_relay_log_name, relay_log_name);
3727   }
3728 
3729   if (lex_mi->relay_log_pos)
3730   {
3731     need_relay_log_purge= 0;
3732     mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
3733   }
3734 
3735   if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_SLAVE_POS)
3736     mi->using_gtid= Master_info::USE_GTID_SLAVE_POS;
3737   else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_CURRENT_POS)
3738     mi->using_gtid= Master_info::USE_GTID_CURRENT_POS;
3739   else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_NO ||
3740            lex_mi->log_file_name || lex_mi->pos ||
3741            lex_mi->relay_log_name || lex_mi->relay_log_pos)
3742     mi->using_gtid= Master_info::USE_GTID_NO;
3743 
3744   do_ids= ((lex_mi->repl_do_domain_ids_opt ==
3745             LEX_MASTER_INFO::LEX_MI_ENABLE) ?
3746            &lex_mi->repl_do_domain_ids : NULL);
3747 
3748   ignore_ids= ((lex_mi->repl_ignore_domain_ids_opt ==
3749                 LEX_MASTER_INFO::LEX_MI_ENABLE) ?
3750                &lex_mi->repl_ignore_domain_ids : NULL);
3751 
3752   /*
3753     Note: mi->using_gtid stores the previous state in case no MASTER_USE_GTID
3754     is specified.
3755   */
3756   if (mi->domain_id_filter.update_ids(do_ids, ignore_ids, mi->using_gtid))
3757   {
3758     my_error(ER_MASTER_INFO, MYF(0),
3759              (int) lex_mi->connection_name.length,
3760              lex_mi->connection_name.str);
3761     ret= TRUE;
3762     goto err;
3763   }
3764 
3765   /*
3766     If user did specify neither host nor port nor any log name nor any log
3767     pos, i.e. he specified only user/password/master_connect_retry, he probably
3768     wants replication to resume from where it had left, i.e. from the
3769     coordinates of the **SQL** thread (imagine the case where the I/O is ahead
3770     of the SQL; restarting from the coordinates of the I/O would lose some
3771     events which is probably unwanted when you are just doing minor changes
3772     like changing master_connect_retry).
3773     A side-effect is that if only the I/O thread was started, this thread may
3774     restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
3775     much more unlikely situation than the one we are fixing here).
3776     Note: coordinates of the SQL thread must be read here, before the
3777     'if (need_relay_log_purge)' block which resets them.
3778   */
3779   if (!lex_mi->host && !lex_mi->port &&
3780       !lex_mi->log_file_name && !lex_mi->pos &&
3781       need_relay_log_purge)
3782    {
3783      /*
3784        Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
3785        not initialized), so we use a MY_MAX().
3786        What happens to mi->rli.master_log_pos during the initialization stages
3787        of replication is not 100% clear, so we guard against problems using
3788        MY_MAX().
3789       */
3790      mi->master_log_pos = MY_MAX(BIN_LOG_HEADER_SIZE,
3791 			      mi->rli.group_master_log_pos);
3792      strmake_buf(mi->master_log_name, mi->rli.group_master_log_name);
3793   }
3794 
3795   /*
3796     Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
3797     a slave before).
3798   */
3799   if (flush_master_info(mi, FALSE, FALSE))
3800   {
3801     my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
3802     ret= TRUE;
3803     goto err;
3804   }
3805   if (need_relay_log_purge)
3806   {
3807     THD_STAGE_INFO(thd, stage_purging_old_relay_logs);
3808     if (purge_relay_logs(&mi->rli, thd,
3809 			 0 /* not only reset, but also reinit */,
3810 			 &errmsg))
3811     {
3812       my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
3813       ret= TRUE;
3814       goto err;
3815     }
3816   }
3817   else
3818   {
3819     const char* msg;
3820     /* Relay log is already initialized */
3821     if (init_relay_log_pos(&mi->rli,
3822 			   mi->rli.group_relay_log_name,
3823 			   mi->rli.group_relay_log_pos,
3824 			   0 /*no data lock*/,
3825 			   &msg, 0))
3826     {
3827       my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
3828       ret= TRUE;
3829       goto err;
3830     }
3831   }
3832   /*
3833     Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
3834     so restore them to good values. If we left them to ''/0, that would work;
3835     but that would fail in the case of 2 successive CHANGE MASTER (without a
3836     START SLAVE in between): because first one would set the coords in mi to
3837     the good values of those in rli, the set those in rli to ''/0, then
3838     second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
3839     ''/0: we have lost all copies of the original good coordinates.
3840     That's why we always save good coords in rli.
3841   */
3842   mi->rli.group_master_log_pos= mi->master_log_pos;
3843   DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3844   strmake_buf(mi->rli.group_master_log_name,mi->master_log_name);
3845 
3846   if (!mi->rli.group_master_log_name[0]) // uninitialized case
3847     mi->rli.group_master_log_pos=0;
3848 
3849   mysql_mutex_lock(&mi->rli.data_lock);
3850   mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
3851   /* Clear the errors, for a clean start */
3852   mi->rli.clear_error();
3853   mi->rli.clear_until_condition();
3854   mi->rli.slave_skip_counter= 0;
3855 
3856   sql_print_information("'CHANGE MASTER TO executed'. "
3857     "Previous state master_host='%s', master_port='%u', master_log_file='%s', "
3858     "master_log_pos='%ld'. "
3859     "New state master_host='%s', master_port='%u', master_log_file='%s', "
3860     "master_log_pos='%ld'.", saved_host, saved_port, saved_log_name,
3861     (ulong) saved_log_pos, mi->host, mi->port, mi->master_log_name,
3862     (ulong) mi->master_log_pos);
3863   if (saved_using_gtid != Master_info::USE_GTID_NO ||
3864       mi->using_gtid != Master_info::USE_GTID_NO)
3865     sql_print_information("Previous Using_Gtid=%s. New Using_Gtid=%s",
3866                           mi->using_gtid_astext(saved_using_gtid),
3867                           mi->using_gtid_astext(mi->using_gtid));
3868 
3869   /*
3870     If we don't write new coordinates to disk now, then old will remain in
3871     relay-log.info until START SLAVE is issued; but if mysqld is shutdown
3872     before START SLAVE, then old will remain in relay-log.info, and will be the
3873     in-memory value at restart (thus causing errors, as the old relay log does
3874     not exist anymore).
3875   */
3876   if (mi->rli.flush())
3877     ret= 1;
3878   mysql_cond_broadcast(&mi->data_cond);
3879   mysql_mutex_unlock(&mi->rli.data_lock);
3880 
3881 err:
3882   mi->unlock_slave_threads();
3883   if (ret == FALSE)
3884     my_ok(thd);
3885   else
3886   {
3887     /*
3888       Depending on where CHANGE MASTER failed, the logs may be waiting to be
3889       reopened. This would break future log updates and CHANGE MASTER calls.
3890       `try_fix_log_state()` allows the relay log to fix its state to no longer
3891       expect to be reopened.
3892     */
3893     mi->rli.relay_log.try_fix_log_state();
3894   }
3895   DBUG_RETURN(ret);
3896 }
3897 
3898 
3899 /**
3900   Execute a RESET MASTER statement.
3901 
3902   @param thd Pointer to THD object of the client thread executing the
3903   statement.
3904 
3905   @retval 0 success
3906   @retval 1 error
3907 */
3908 int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
3909                  ulong next_log_number)
3910 {
3911   if (!mysql_bin_log.is_open())
3912   {
3913     my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
3914                ER_THD(thd, ER_FLUSH_MASTER_BINLOG_CLOSED),
3915                MYF(ME_BELL+ME_WAITTANG));
3916     return 1;
3917   }
3918 
3919   bool ret= 0;
3920   /* Temporarily disable master semisync before resetting master. */
3921   repl_semisync_master.before_reset_master();
3922   ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
3923                                 next_log_number);
3924   repl_semisync_master.after_reset_master();
3925   return ret;
3926 }
3927 
3928 
3929 /**
3930   Execute a SHOW BINLOG EVENTS statement.
3931 
3932   @param thd Pointer to THD object for the client thread executing the
3933   statement.
3934 
3935   @retval FALSE success
3936   @retval TRUE failure
3937 */
3938 bool mysql_show_binlog_events(THD* thd)
3939 {
3940   Protocol *protocol= thd->protocol;
3941   List<Item> field_list;
3942   char errmsg_buf[MYSYS_ERRMSG_SIZE];
3943   const char *errmsg = 0;
3944   bool ret = TRUE;
3945   /*
3946      Using checksum validate the correctness of event pos specified in show
3947      binlog events command.
3948   */
3949   bool verify_checksum_once= false;
3950   IO_CACHE log;
3951   File file = -1;
3952   MYSQL_BIN_LOG *binary_log= NULL;
3953   int old_max_allowed_packet= thd->variables.max_allowed_packet;
3954   Master_info *mi= 0;
3955   LOG_INFO linfo;
3956   LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
3957   enum enum_binlog_checksum_alg checksum_alg;
3958   my_off_t binlog_size;
3959   MY_STAT s;
3960 
3961   DBUG_ENTER("mysql_show_binlog_events");
3962 
3963   Log_event::init_show_field_list(thd, &field_list);
3964   if (protocol->send_result_set_metadata(&field_list,
3965                             Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
3966     DBUG_RETURN(TRUE);
3967 
3968   DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
3969               thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
3970 
3971   /* select which binary log to use: binlog or relay */
3972   if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS )
3973   {
3974     binary_log= &mysql_bin_log;
3975   }
3976   else  /* showing relay log contents */
3977   {
3978     if (!lex_mi->connection_name.str)
3979       lex_mi->connection_name= thd->variables.default_master_connection;
3980     if (!(mi= get_master_info(&lex_mi->connection_name,
3981                               Sql_condition::WARN_LEVEL_ERROR)))
3982     {
3983       DBUG_RETURN(TRUE);
3984     }
3985     binary_log= &(mi->rli.relay_log);
3986   }
3987 
3988   Format_description_log_event *description_event= new
3989     Format_description_log_event(3); /* MySQL 4.0 by default */
3990 
3991   if (binary_log->is_open())
3992   {
3993     SELECT_LEX_UNIT *unit= &thd->lex->unit;
3994     ha_rows event_count, limit_start, limit_end;
3995     my_off_t pos = MY_MAX(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
3996     char search_file_name[FN_REFLEN], *name;
3997     const char *log_file_name = lex_mi->log_file_name;
3998     mysql_mutex_t *log_lock = binary_log->get_log_lock();
3999     Log_event* ev;
4000 
4001     if (mi)
4002     {
4003       /* We can unlock the mutex as we have a lock on the file */
4004       mi->release();
4005       mi= 0;
4006     }
4007 
4008     unit->set_limit(thd->lex->current_select);
4009     limit_start= unit->offset_limit_cnt;
4010     limit_end= unit->select_limit_cnt;
4011 
4012     name= search_file_name;
4013     if (log_file_name)
4014       binary_log->make_log_name(search_file_name, log_file_name);
4015     else
4016       name=0;					// Find first log
4017 
4018     linfo.index_file_offset = 0;
4019 
4020     if (binary_log->find_log_pos(&linfo, name, 1))
4021     {
4022       errmsg = "Could not find target log";
4023       goto err;
4024     }
4025 
4026     thd->current_linfo= &linfo;
4027 
4028     if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
4029       goto err;
4030 
4031     my_stat(linfo.log_file_name, &s, MYF(0));
4032     binlog_size= s.st_size;
4033     if (lex_mi->pos > binlog_size)
4034     {
4035       sprintf(errmsg_buf, "Invalid pos specified. Requested from pos:%llu is "
4036               "greater than actual file size:%lu\n", lex_mi->pos,
4037               (ulong)s.st_size);
4038       errmsg= errmsg_buf;
4039       goto err;
4040     }
4041 
4042     /*
4043       to account binlog event header size
4044     */
4045     thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
4046 
4047     mysql_mutex_lock(log_lock);
4048 
4049     /*
4050       open_binlog() sought to position 4.
4051       Read the first event in case it's a Format_description_log_event, to
4052       know the format. If there's no such event, we are 3.23 or 4.x. This
4053       code, like before, can't read 3.23 binlogs.
4054       Also read the second event, in case it's a Start_encryption_log_event.
4055       This code will fail on a mixed relay log (one which has Format_desc then
4056       Rotate then Format_desc).
4057     */
4058 
4059     my_off_t scan_pos = BIN_LOG_HEADER_SIZE;
4060     while (scan_pos < pos)
4061     {
4062       ev= Log_event::read_log_event(&log, description_event,
4063                                     opt_master_verify_checksum);
4064       scan_pos = my_b_tell(&log);
4065       if (ev == NULL || !ev->is_valid())
4066       {
4067         mysql_mutex_unlock(log_lock);
4068         errmsg = "Wrong offset or I/O error";
4069         goto err;
4070       }
4071       if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
4072       {
4073         delete description_event;
4074         description_event= (Format_description_log_event*) ev;
4075       }
4076       else
4077       {
4078         if (ev->get_type_code() == START_ENCRYPTION_EVENT)
4079         {
4080           if (description_event->start_decryption((Start_encryption_log_event*) ev))
4081           {
4082             delete ev;
4083             mysql_mutex_unlock(log_lock);
4084             errmsg = "Could not initialize decryption of binlog.";
4085             goto err;
4086           }
4087         }
4088         delete ev;
4089         break;
4090       }
4091     }
4092 
4093     if (lex_mi->pos > BIN_LOG_HEADER_SIZE)
4094     {
4095       checksum_alg= description_event->checksum_alg;
4096       /* Validate user given position using checksum */
4097       if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
4098           checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
4099       {
4100         if (!opt_master_verify_checksum)
4101           verify_checksum_once= true;
4102         my_b_seek(&log, pos);
4103       }
4104       else
4105       {
4106         my_off_t cur_pos= my_b_tell(&log);
4107         ulong next_event_len= 0;
4108         uchar buff[IO_SIZE];
4109         while (cur_pos < pos)
4110         {
4111           my_b_seek(&log, cur_pos + EVENT_LEN_OFFSET);
4112           if (my_b_read(&log, (uchar *)buff, sizeof(next_event_len)))
4113           {
4114             mysql_mutex_unlock(log_lock);
4115             errmsg = "Could not read event_length";
4116             goto err;
4117           }
4118           next_event_len= uint4korr(buff);
4119           cur_pos= cur_pos + next_event_len;
4120         }
4121         if (cur_pos > pos)
4122         {
4123           mysql_mutex_unlock(log_lock);
4124           errmsg= "Invalid input pos specified please provide valid one.";
4125           goto err;
4126         }
4127         my_b_seek(&log, cur_pos);
4128       }
4129     }
4130 
4131     for (event_count = 0;
4132          (ev = Log_event::read_log_event(&log,
4133                                          description_event,
4134                                          (opt_master_verify_checksum ||
4135                                           verify_checksum_once))); )
4136     {
4137       if (event_count >= limit_start &&
4138           ev->net_send(protocol, linfo.log_file_name, pos))
4139       {
4140         errmsg = "Net error";
4141         delete ev;
4142         mysql_mutex_unlock(log_lock);
4143         goto err;
4144       }
4145 
4146       if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
4147       {
4148         Format_description_log_event* new_fdle=
4149           (Format_description_log_event*) ev;
4150         new_fdle->copy_crypto_data(description_event);
4151         delete description_event;
4152         description_event= new_fdle;
4153       }
4154       else
4155       {
4156         if (ev->get_type_code() == START_ENCRYPTION_EVENT)
4157         {
4158           if (description_event->start_decryption((Start_encryption_log_event*) ev))
4159           {
4160             errmsg = "Error starting decryption";
4161             delete ev;
4162             mysql_mutex_unlock(log_lock);
4163             goto err;
4164           }
4165         }
4166         delete ev;
4167       }
4168 
4169       verify_checksum_once= false;
4170       pos = my_b_tell(&log);
4171 
4172       if (++event_count >= limit_end)
4173         break;
4174     }
4175 
4176     if (unlikely(event_count < limit_end && log.error))
4177     {
4178       errmsg = "Wrong offset or I/O error";
4179       mysql_mutex_unlock(log_lock);
4180       goto err;
4181     }
4182 
4183     mysql_mutex_unlock(log_lock);
4184   }
4185   else if (mi)
4186     mi->release();
4187 
4188   // Check that linfo is still on the function scope.
4189   DEBUG_SYNC(thd, "after_show_binlog_events");
4190 
4191   ret= FALSE;
4192 
4193 err:
4194   delete description_event;
4195   if (file >= 0)
4196   {
4197     end_io_cache(&log);
4198     mysql_file_close(file, MYF(MY_WME));
4199   }
4200 
4201   if (errmsg)
4202     my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
4203              "SHOW BINLOG EVENTS", errmsg);
4204   else
4205     my_eof(thd);
4206 
4207   thd->reset_current_linfo();
4208   thd->variables.max_allowed_packet= old_max_allowed_packet;
4209   DBUG_RETURN(ret);
4210 }
4211 
4212 
4213 void show_binlog_info_get_fields(THD *thd, List<Item> *field_list)
4214 {
4215   MEM_ROOT *mem_root= thd->mem_root;
4216   field_list->push_back(new (mem_root)
4217                         Item_empty_string(thd, "File", FN_REFLEN),
4218                         mem_root);
4219   field_list->push_back(new (mem_root)
4220                         Item_return_int(thd, "Position", 20,
4221                                         MYSQL_TYPE_LONGLONG),
4222                         mem_root);
4223   field_list->push_back(new (mem_root)
4224                         Item_empty_string(thd, "Binlog_Do_DB", 255),
4225                         mem_root);
4226   field_list->push_back(new (mem_root)
4227                         Item_empty_string(thd, "Binlog_Ignore_DB", 255),
4228                         mem_root);
4229 }
4230 
4231 
4232 /**
4233   Execute a SHOW MASTER STATUS statement.
4234 
4235   @param thd Pointer to THD object for the client thread executing the
4236   statement.
4237 
4238   @retval FALSE success
4239   @retval TRUE failure
4240 */
4241 bool show_binlog_info(THD* thd)
4242 {
4243   Protocol *protocol= thd->protocol;
4244   DBUG_ENTER("show_binlog_info");
4245 
4246   List<Item> field_list;
4247   show_binlog_info_get_fields(thd, &field_list);
4248 
4249   if (protocol->send_result_set_metadata(&field_list,
4250                             Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
4251     DBUG_RETURN(TRUE);
4252   protocol->prepare_for_resend();
4253 
4254   if (mysql_bin_log.is_open())
4255   {
4256     LOG_INFO li;
4257     mysql_bin_log.get_current_log(&li);
4258     size_t dir_len = dirname_length(li.log_file_name);
4259     protocol->store(li.log_file_name + dir_len, &my_charset_bin);
4260     protocol->store((ulonglong) li.pos);
4261     protocol->store(binlog_filter->get_do_db());
4262     protocol->store(binlog_filter->get_ignore_db());
4263     if (protocol->write())
4264       DBUG_RETURN(TRUE);
4265   }
4266   my_eof(thd);
4267   DBUG_RETURN(FALSE);
4268 }
4269 
4270 
4271 void show_binlogs_get_fields(THD *thd, List<Item> *field_list)
4272 {
4273   MEM_ROOT *mem_root= thd->mem_root;
4274   field_list->push_back(new (mem_root)
4275                         Item_empty_string(thd, "Log_name", 255),
4276                         mem_root);
4277   field_list->push_back(new (mem_root)
4278                         Item_return_int(thd, "File_size", 20,
4279                                         MYSQL_TYPE_LONGLONG),
4280                         mem_root);
4281 }
4282 
4283 
4284 /**
4285   Execute a SHOW BINARY LOGS statement.
4286 
4287   @param thd Pointer to THD object for the client thread executing the
4288   statement.
4289 
4290   @retval FALSE success
4291   @retval TRUE failure
4292 */
4293 bool show_binlogs(THD* thd)
4294 {
4295   IO_CACHE *index_file;
4296   LOG_INFO cur;
4297   File file;
4298   char fname[FN_REFLEN];
4299   List<Item> field_list;
4300   size_t length;
4301   size_t cur_dir_len;
4302   Protocol *protocol= thd->protocol;
4303   DBUG_ENTER("show_binlogs");
4304 
4305   if (!mysql_bin_log.is_open())
4306   {
4307     my_error(ER_NO_BINARY_LOGGING, MYF(0));
4308     DBUG_RETURN(TRUE);
4309   }
4310 
4311   show_binlogs_get_fields(thd, &field_list);
4312 
4313   if (protocol->send_result_set_metadata(&field_list,
4314                             Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
4315     DBUG_RETURN(TRUE);
4316 
4317   mysql_mutex_lock(mysql_bin_log.get_log_lock());
4318   mysql_bin_log.lock_index();
4319   index_file=mysql_bin_log.get_index_file();
4320 
4321   mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
4322   mysql_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
4323 
4324   cur_dir_len= dirname_length(cur.log_file_name);
4325 
4326   reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
4327 
4328   /* The file ends with EOF or empty line */
4329   while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
4330   {
4331     size_t dir_len;
4332     ulonglong file_length= 0;                   // Length if open fails
4333     fname[--length] = '\0';                     // remove the newline
4334 
4335     protocol->prepare_for_resend();
4336     dir_len= dirname_length(fname);
4337     length-= dir_len;
4338     protocol->store(fname + dir_len, length, &my_charset_bin);
4339 
4340     if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
4341       file_length= cur.pos;  /* The active log, use the active position */
4342     else
4343     {
4344       /* this is an old log, open it and find the size */
4345       if ((file= mysql_file_open(key_file_binlog,
4346                                  fname, O_RDONLY | O_SHARE | O_BINARY,
4347                                  MYF(0))) >= 0)
4348       {
4349         file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
4350         mysql_file_close(file, MYF(0));
4351       }
4352     }
4353     protocol->store(file_length);
4354     if (protocol->write())
4355       goto err;
4356   }
4357   if (unlikely(index_file->error == -1))
4358     goto err;
4359   mysql_bin_log.unlock_index();
4360   my_eof(thd);
4361   DBUG_RETURN(FALSE);
4362 
4363 err:
4364   mysql_bin_log.unlock_index();
4365   DBUG_RETURN(TRUE);
4366 }
4367 
4368 /**
4369    Load data's io cache specific hook to be executed
4370    before a chunk of data is being read into the cache's buffer
4371    The fuction instantianates and writes into the binlog
4372    replication events along LOAD DATA processing.
4373 
4374    @param file  pointer to io-cache
4375    @retval 0 success
4376    @retval 1 failure
4377 */
4378 int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
4379 {
4380   DBUG_ENTER("log_loaded_block");
4381   LOAD_FILE_IO_CACHE *lf_info= static_cast<LOAD_FILE_IO_CACHE*>(file);
4382   uint block_len;
4383   /* buffer contains position where we started last read */
4384   uchar* buffer= (uchar*) my_b_get_buffer_start(file);
4385   uint max_event_size= lf_info->thd->variables.max_allowed_packet;
4386 
4387   if (lf_info->thd->is_current_stmt_binlog_format_row())
4388     goto ret;
4389   if (lf_info->last_pos_in_file != HA_POS_ERROR &&
4390       lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
4391     goto ret;
4392 
4393   for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
4394        buffer += MY_MIN(block_len, max_event_size),
4395        block_len -= MY_MIN(block_len, max_event_size))
4396   {
4397     lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
4398     if (lf_info->wrote_create_file)
4399     {
4400       Append_block_log_event a(lf_info->thd, lf_info->thd->db.str, buffer,
4401                                MY_MIN(block_len, max_event_size),
4402                                lf_info->log_delayed);
4403       if (mysql_bin_log.write(&a))
4404         DBUG_RETURN(1);
4405     }
4406     else
4407     {
4408       Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db.str,
4409                                    buffer,
4410                                    MY_MIN(block_len, max_event_size),
4411                                    lf_info->log_delayed);
4412       if (mysql_bin_log.write(&b))
4413         DBUG_RETURN(1);
4414       lf_info->wrote_create_file= 1;
4415     }
4416   }
4417 ret:
4418   int res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0;
4419   DBUG_RETURN(res);
4420 }
4421 
4422 
4423 /**
4424    Initialise the slave replication state from the mysql.gtid_slave_pos table.
4425 
4426    This is called each time an SQL thread starts, but the data is only actually
4427    loaded on the first call.
4428 
4429    The slave state is the last GTID applied on the slave within each
4430    replication domain.
4431 
4432    To avoid row lock contention, there are multiple rows for each domain_id.
4433    The one containing the current slave state is the one with the maximal
4434    sub_id value, within each domain_id.
4435 
4436     CREATE TABLE mysql.gtid_slave_pos (
4437       domain_id INT UNSIGNED NOT NULL,
4438       sub_id BIGINT UNSIGNED NOT NULL,
4439       server_id INT UNSIGNED NOT NULL,
4440       seq_no BIGINT UNSIGNED NOT NULL,
4441       PRIMARY KEY (domain_id, sub_id))
4442 */
4443 
4444 void
4445 rpl_init_gtid_slave_state()
4446 {
4447   rpl_global_gtid_slave_state= new rpl_slave_state;
4448 }
4449 
4450 
4451 void
4452 rpl_deinit_gtid_slave_state()
4453 {
4454   delete rpl_global_gtid_slave_state;
4455 }
4456 
4457 
4458 void
4459 rpl_init_gtid_waiting()
4460 {
4461   rpl_global_gtid_waiting.init();
4462 }
4463 
4464 
4465 void
4466 rpl_deinit_gtid_waiting()
4467 {
4468   rpl_global_gtid_waiting.destroy();
4469 }
4470 
4471 
4472 /*
4473   Format the current GTID state as a string, for returning the value of
4474   @@global.gtid_slave_pos.
4475 
4476   If the flag use_binlog is true, then the contents of the binary log (if
4477   enabled) is merged into the current GTID state (@@global.gtid_current_pos).
4478 */
4479 int
4480 rpl_append_gtid_state(String *dest, bool use_binlog)
4481 {
4482   int err;
4483   rpl_gtid *gtid_list= NULL;
4484   uint32 num_gtids= 0;
4485 
4486   if (use_binlog && opt_bin_log &&
4487       (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
4488     return err;
4489 
4490   err= rpl_global_gtid_slave_state->tostring(dest, gtid_list, num_gtids);
4491   my_free(gtid_list);
4492 
4493   return err;
4494 }
4495 
4496 
4497 /*
4498   Load the current GTID position into a slave_connection_state, for use when
4499   connecting to a master server with GTID.
4500 
4501   If the flag use_binlog is true, then the contents of the binary log (if
4502   enabled) is merged into the current GTID state (master_use_gtid=current_pos).
4503 */
4504 int
4505 rpl_load_gtid_state(slave_connection_state *state, bool use_binlog)
4506 {
4507   int err;
4508   rpl_gtid *gtid_list= NULL;
4509   uint32 num_gtids= 0;
4510 
4511   if (use_binlog && opt_bin_log &&
4512       (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
4513     return err;
4514 
4515   err= state->load(rpl_global_gtid_slave_state, gtid_list, num_gtids);
4516   my_free(gtid_list);
4517 
4518   return err;
4519 }
4520 
4521 
4522 bool
4523 rpl_gtid_pos_check(THD *thd, char *str, size_t len)
4524 {
4525   slave_connection_state tmp_slave_state;
4526   bool gave_conflict_warning= false, gave_missing_warning= false;
4527 
4528   /* Check that we can parse the supplied string. */
4529   if (tmp_slave_state.load(str, len))
4530     return true;
4531 
4532   /*
4533     Check our own binlog for any of our own transactions that are newer
4534     than the GTID state the user is requesting. Any such transactions would
4535     result in an out-of-order binlog, which could break anyone replicating
4536     with us as master.
4537 
4538     So give an error if this is found, requesting the user to do a
4539     RESET MASTER (to clean up the binlog) if they really want this.
4540   */
4541   if (mysql_bin_log.is_open())
4542   {
4543     rpl_gtid *binlog_gtid_list= NULL;
4544     uint32 num_binlog_gtids= 0;
4545     uint32 i;
4546 
4547     if (mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list,
4548                                                 &num_binlog_gtids))
4549     {
4550       my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
4551       return true;
4552     }
4553     for (i= 0; i < num_binlog_gtids; ++i)
4554     {
4555       rpl_gtid *binlog_gtid= &binlog_gtid_list[i];
4556       rpl_gtid *slave_gtid;
4557       if (binlog_gtid->server_id != global_system_variables.server_id)
4558         continue;
4559       if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id)))
4560       {
4561         if (opt_gtid_strict_mode)
4562         {
4563           my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0),
4564                    binlog_gtid->domain_id, binlog_gtid->domain_id,
4565                    binlog_gtid->server_id, binlog_gtid->seq_no);
4566           break;
4567         }
4568         else if (!gave_missing_warning)
4569         {
4570           push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4571                               ER_MASTER_GTID_POS_MISSING_DOMAIN,
4572                               ER_THD(thd, ER_MASTER_GTID_POS_MISSING_DOMAIN),
4573                               binlog_gtid->domain_id, binlog_gtid->domain_id,
4574                               binlog_gtid->server_id, binlog_gtid->seq_no);
4575           gave_missing_warning= true;
4576         }
4577       }
4578       else if (slave_gtid->seq_no < binlog_gtid->seq_no)
4579       {
4580         if (opt_gtid_strict_mode)
4581         {
4582           my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0),
4583                    slave_gtid->domain_id, slave_gtid->server_id,
4584                    slave_gtid->seq_no, binlog_gtid->domain_id,
4585                    binlog_gtid->server_id, binlog_gtid->seq_no);
4586           break;
4587         }
4588         else if (!gave_conflict_warning)
4589         {
4590           push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4591                               ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG,
4592                               ER_THD(thd, ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG),
4593                               slave_gtid->domain_id, slave_gtid->server_id,
4594                               slave_gtid->seq_no, binlog_gtid->domain_id,
4595                               binlog_gtid->server_id, binlog_gtid->seq_no);
4596           gave_conflict_warning= true;
4597         }
4598       }
4599     }
4600     my_free(binlog_gtid_list);
4601     if (i != num_binlog_gtids)
4602       return true;
4603   }
4604 
4605   return false;
4606 }
4607 
4608 
4609 bool
4610 rpl_gtid_pos_update(THD *thd, char *str, size_t len)
4611 {
4612   if (rpl_global_gtid_slave_state->load(thd, str, len, true, true))
4613   {
4614     my_error(ER_FAILED_GTID_STATE_INIT, MYF(0));
4615     return true;
4616   }
4617   else
4618     return false;
4619 }
4620 
4621 int compare_log_name(const char *log_1, const char *log_2) {
4622   int res= 1;
4623   const char *ext1_str= strrchr(log_1, '.');
4624   const char *ext2_str= strrchr(log_2, '.');
4625   char file_name_1[255], file_name_2[255];
4626   strmake(file_name_1, log_1, (ext1_str - log_1));
4627   strmake(file_name_2, log_2, (ext2_str - log_2));
4628   char *endptr = NULL;
4629   res= strcmp(file_name_1, file_name_2);
4630   if (!res)
4631   {
4632     ulong ext1= strtoul(++ext1_str, &endptr, 10);
4633     ulong ext2= strtoul(++ext2_str, &endptr, 10);
4634     res= (ext1 > ext2 ? 1 : ((ext1 == ext2) ? 0 : -1));
4635   }
4636   return res;
4637 }
4638 
4639 #endif /* HAVE_REPLICATION */
4640