1 /* Copyright (c) 2010, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 
24 #include "sql_priv.h"
25 #include "unireg.h"
26 #include "sql_parse.h"                          // check_access
27 #include "global_threads.h"
28 #include "mutex_lock.h"                         // Mutex_lock
29 #ifdef HAVE_REPLICATION
30 
31 #include "sql_acl.h"                            // SUPER_ACL
32 #include "log_event.h"
33 #include "rpl_filter.h"
34 #include <my_dir.h>
35 #include "rpl_handler.h"
36 #include "rpl_master.h"
37 #include "debug_sync.h"
38 
/*
  Copied into mysql_binlog_send()'s debug-only 'left_events' counter;
  0 means unlimited.
*/
int max_binlog_dump_events = 0; // unlimited
/*
  Option flag, off (0) by default.
  NOTE(review): its consumer is not visible in this part of the file.
*/
my_bool opt_sporadic_binlog_dump_fail = 0;

#ifndef DBUG_OFF
/* Counter that only exists in debug builds (compiled out under DBUG_OFF). */
static int binlog_dump_count = 0;
#endif

/* Sizing argument handed to my_hash_init() when slave_list is created. */
#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
/*
  Hash of registered SLAVE_INFO entries keyed by their 4-byte server_id.
  Readers and writers in this file take LOCK_slave_list around accesses.
*/
HASH slave_list;
/* Defined elsewhere; used here to look up a checksum algorithm by name. */
extern TYPELIB binlog_checksum_typelib;
50 
51 
/*
  Read one length-prefixed string from the packet into the fixed-size
  buffer 'obj' and advance 'p' past it.

  The macro relies on names supplied by the enclosing function:
  'p_end' (one past the last packet byte), 'si' (freed on the
  malformed-packet path), 'errmsg' and the label 'err'.

  - If there is no length byte left, ER_MALFORMED_PACKET is raised, 'si'
    is freed and the enclosing function returns 1.
  - If the string would run past 'p_end', or its length is not smaller
    than sizeof(obj), 'errmsg' is set to 'msg' and control jumps to 'err'.
  - Otherwise 'len' bytes are copied into 'obj' with strmake().

  Note that 'obj' must be an array: its capacity is taken with sizeof.
*/
#define get_object(p, obj, msg) \
{\
  uint len; \
  if (p >= p_end) \
  { \
    my_error(ER_MALFORMED_PACKET, MYF(0)); \
    my_free(si); \
    return 1; \
  } \
  len= (uint)*p++;  \
  if (p + len > p_end || len >= sizeof(obj)) \
  {\
    errmsg= msg;\
    goto err; \
  }\
  strmake(obj,(char*) p,len); \
  p+= len; \
}\
70 
71 extern "C" uint32
slave_list_key(SLAVE_INFO * si,size_t * len,my_bool not_used MY_ATTRIBUTE ((unused)))72 *slave_list_key(SLAVE_INFO* si, size_t *len,
73 		my_bool not_used MY_ATTRIBUTE((unused)))
74 {
75   *len = 4;
76   return &si->server_id;
77 }
78 
slave_info_free(void * s)79 extern "C" void slave_info_free(void *s)
80 {
81   my_free(s);
82 }
83 
#ifdef HAVE_PSI_INTERFACE
/* Instrumentation key for LOCK_slave_list. */
static PSI_mutex_key key_LOCK_slave_list;

/* Mutexes of this module that are registered with the "sql" category. */
static PSI_mutex_info all_slave_list_mutexes[]=
{
  { &key_LOCK_slave_list, "LOCK_slave_list", PSI_FLAG_GLOBAL }
};

/**
  Register every mutex listed in all_slave_list_mutexes under the
  "sql" category.
*/
static void init_all_slave_list_mutexes(void)
{
  const int mutex_count= array_elements(all_slave_list_mutexes);

  mysql_mutex_register("sql", all_slave_list_mutexes, mutex_count);
}
#endif /* HAVE_PSI_INTERFACE */
100 
init_slave_list()101 void init_slave_list()
102 {
103 #ifdef HAVE_PSI_INTERFACE
104   init_all_slave_list_mutexes();
105 #endif
106 
107   my_hash_init(&slave_list, system_charset_info, SLAVE_LIST_CHUNK, 0, 0,
108                (my_hash_get_key) slave_list_key,
109                (my_hash_free_key) slave_info_free, 0);
110   mysql_mutex_init(key_LOCK_slave_list, &LOCK_slave_list, MY_MUTEX_INIT_FAST);
111 }
112 
end_slave_list()113 void end_slave_list()
114 {
115   /* No protection by a mutex needed as we are only called at shutdown */
116   if (my_hash_inited(&slave_list))
117   {
118     my_hash_free(&slave_list);
119     mysql_mutex_destroy(&LOCK_slave_list);
120   }
121 }
122 
123 /**
124   Register slave in 'slave_list' hash table.
125 
126   @return
127     0	ok
128   @return
129     1	Error.   Error message sent to client
130 */
131 
register_slave(THD * thd,uchar * packet,uint packet_length)132 int register_slave(THD* thd, uchar* packet, uint packet_length)
133 {
134   int res;
135   SLAVE_INFO *si;
136   uchar *p= packet, *p_end= packet + packet_length;
137   const char *errmsg= "Wrong parameters to function register_slave";
138 
139   if (check_access(thd, REPL_SLAVE_ACL, any_db, NULL, NULL, 0, 0))
140     return 1;
141   if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
142     goto err2;
143 
144   /* 4 bytes for the server id */
145   if (p + 4 > p_end)
146   {
147     my_error(ER_MALFORMED_PACKET, MYF(0));
148     my_free(si);
149     return 1;
150   }
151 
152   thd->server_id= si->server_id= uint4korr(p);
153   p+= 4;
154   get_object(p,si->host, "Failed to register slave: too long 'report-host'");
155   get_object(p,si->user, "Failed to register slave: too long 'report-user'");
156   get_object(p,si->password, "Failed to register slave; too long 'report-password'");
157   if (p+10 > p_end)
158     goto err;
159   si->port= uint2korr(p);
160   p += 2;
161   /*
162      We need to by pass the bytes used in the fake rpl_recovery_rank
163      variable. It was removed in patch for BUG#13963. But this would
164      make a server with that patch unable to connect to an old master.
165      See: BUG#49259
166   */
167   p += 4;
168   if (!(si->master_id= uint4korr(p)))
169     si->master_id= server_id;
170   si->thd= thd;
171 
172   mysql_mutex_lock(&LOCK_slave_list);
173   unregister_slave(thd, false, false/*need_lock_slave_list=false*/);
174   res= my_hash_insert(&slave_list, (uchar*) si);
175   mysql_mutex_unlock(&LOCK_slave_list);
176   return res;
177 
178 err:
179   my_free(si);
180   my_message(ER_UNKNOWN_ERROR, errmsg, MYF(0)); /* purecov: inspected */
181 err2:
182   return 1;
183 }
184 
unregister_slave(THD * thd,bool only_mine,bool need_lock_slave_list)185 void unregister_slave(THD* thd, bool only_mine, bool need_lock_slave_list)
186 {
187   if (thd->server_id)
188   {
189     if (need_lock_slave_list)
190       mysql_mutex_lock(&LOCK_slave_list);
191     else
192       mysql_mutex_assert_owner(&LOCK_slave_list);
193 
194     SLAVE_INFO* old_si;
195     if ((old_si = (SLAVE_INFO*)my_hash_search(&slave_list,
196                                               (uchar*)&thd->server_id, 4)) &&
197 	(!only_mine || old_si->thd == thd))
198     my_hash_delete(&slave_list, (uchar*)old_si);
199 
200     if (need_lock_slave_list)
201       mysql_mutex_unlock(&LOCK_slave_list);
202   }
203 }
204 
205 
206 /**
207   Execute a SHOW SLAVE HOSTS statement.
208 
209   @param thd Pointer to THD object for the client thread executing the
210   statement.
211 
212   @retval FALSE success
213   @retval TRUE failure
214 */
show_slave_hosts(THD * thd)215 bool show_slave_hosts(THD* thd)
216 {
217   List<Item> field_list;
218   Protocol *protocol= thd->protocol;
219   DBUG_ENTER("show_slave_hosts");
220 
221   field_list.push_back(new Item_return_int("Server_id", 10,
222 					   MYSQL_TYPE_LONG));
223   field_list.push_back(new Item_empty_string("Host", 20));
224   if (opt_show_slave_auth_info)
225   {
226     field_list.push_back(new Item_empty_string("User",20));
227     field_list.push_back(new Item_empty_string("Password",20));
228   }
229   field_list.push_back(new Item_return_int("Port", 7, MYSQL_TYPE_LONG));
230   field_list.push_back(new Item_return_int("Master_id", 10,
231 					   MYSQL_TYPE_LONG));
232   field_list.push_back(new Item_empty_string("Slave_UUID", UUID_LENGTH));
233 
234   if (protocol->send_result_set_metadata(&field_list,
235                             Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
236     DBUG_RETURN(TRUE);
237 
238   mysql_mutex_lock(&LOCK_slave_list);
239 
240   for (uint i = 0; i < slave_list.records; ++i)
241   {
242     SLAVE_INFO* si = (SLAVE_INFO*) my_hash_element(&slave_list, i);
243     protocol->prepare_for_resend();
244     protocol->store((uint32) si->server_id);
245     protocol->store(si->host, &my_charset_bin);
246     if (opt_show_slave_auth_info)
247     {
248       protocol->store(si->user, &my_charset_bin);
249       protocol->store(si->password, &my_charset_bin);
250     }
251     protocol->store((uint32) si->port);
252     protocol->store((uint32) si->master_id);
253 
254     /* get slave's UUID */
255     String slave_uuid;
256     if (get_slave_uuid(si->thd, &slave_uuid))
257       protocol->store(slave_uuid.c_ptr_safe(), &my_charset_bin);
258     if (protocol->write())
259     {
260       mysql_mutex_unlock(&LOCK_slave_list);
261       DBUG_RETURN(TRUE);
262     }
263   }
264   mysql_mutex_unlock(&LOCK_slave_list);
265   my_eof(thd);
266   DBUG_RETURN(FALSE);
267 }
268 
269 
270 /**
271    Internal to mysql_binlog_send() routine that recalculates checksum for
272    a FD event (asserted) that needs additional arranment prior sending to slave.
273 */
fix_checksum(String * packet,ulong ev_offset)274 inline void fix_checksum(String *packet, ulong ev_offset)
275 {
276   /* recalculate the crc for this event */
277   uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
278   ha_checksum crc= my_checksum(0L, NULL, 0);
279   DBUG_ASSERT(data_len ==
280               LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
281               BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN);
282   crc= my_checksum(crc, (uchar *)packet->ptr() + ev_offset, data_len -
283                    BINLOG_CHECKSUM_LEN);
284   int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc);
285 }
286 
287 
get_binlog_checksum_uservar(THD * thd)288 static user_var_entry * get_binlog_checksum_uservar(THD * thd)
289 {
290   LEX_STRING name=  { C_STRING_WITH_LEN("master_binlog_checksum")};
291   user_var_entry *entry=
292     (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
293                                   name.length);
294   return entry;
295 }
296 
297 /**
298   Function for calling in mysql_binlog_send
299   to check if slave initiated checksum-handshake.
300 
301   @param[in]    thd  THD to access a user variable
302 
303   @return        TRUE if handshake took place, FALSE otherwise
304 */
305 
is_slave_checksum_aware(THD * thd)306 static bool is_slave_checksum_aware(THD * thd)
307 {
308   DBUG_ENTER("is_slave_checksum_aware");
309   user_var_entry *entry= get_binlog_checksum_uservar(thd);
310   DBUG_RETURN(entry? true  : false);
311 }
312 
313 /**
314   Function for calling in mysql_binlog_send
315   to get the value of @@binlog_checksum of the master at
316   time of checksum-handshake.
317 
318   The value tells the master whether to compute or not, and the slave
319   to verify or not the first artificial Rotate event's checksum.
320 
321   @param[in]    thd  THD to access a user variable
322 
323   @return       value of @@binlog_checksum alg according to
324                 @c enum enum_binlog_checksum_alg
325 */
326 
get_binlog_checksum_value_at_connect(THD * thd)327 static uint8 get_binlog_checksum_value_at_connect(THD * thd)
328 {
329   uint8 ret;
330 
331   DBUG_ENTER("get_binlog_checksum_value_at_connect");
332   user_var_entry *entry= get_binlog_checksum_uservar(thd);
333   if (!entry)
334   {
335     ret= BINLOG_CHECKSUM_ALG_UNDEF;
336   }
337   else
338   {
339     DBUG_ASSERT(entry->type() == STRING_RESULT);
340     String str;
341     uint dummy_errors;
342     str.copy(entry->ptr(), entry->length(), &my_charset_bin, &my_charset_bin,
343              &dummy_errors);
344     ret= (uint8) find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1;
345     DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg
346   }
347   DBUG_RETURN(ret);
348 }
349 
/*
    fake_rotate_event() builds a fake (=which does not exist physically in any
    binlog) Rotate event, which contains the name of the binlog we are going to
    send to the slave (because the slave may not know it if it just asked for
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
    After this version we always call it, so that a 3.23.58 slave can rely on
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
    zeros in the good positions which, by chance, make it possible for the 3.23
    slave to detect that this event is unexpected) (this is luck which happens
    because the master and slave disagree on the size of the header of
    Log_event).

    Relying on the event length of the Rotate event instead of these
    well-placed zeros was not possible as Rotate events have a variable-length
    part.

    @param net               connection the event is written to
    @param packet            buffer the event is appended to; its whole
                             content is then written to 'net'
    @param log_file_name     binlog file name; only the part after the
                             directory component is put into the event
    @param position          position stored in the Rotate event body
    @param[out] errmsg       set to a static message if the write fails
    @param checksum_alg_arg  checksum algorithm; a CRC is appended unless it
                             is BINLOG_CHECKSUM_ALG_OFF or
                             BINLOG_CHECKSUM_ALG_UNDEF

    @return 0 on success, -1 if my_net_write() fails
*/

static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
                             ulonglong position, const char** errmsg,
                             uint8 checksum_alg_arg)
{
  DBUG_ENTER("fake_rotate_event");
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];

  /*
    this Rotate is to be sent with checksum if and only if
    slave's get_master_version_and_clock time handshake value
    of master's @@global.binlog_checksum was TRUE
  */

  my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
    checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;

  /*
    'when' (the timestamp) is set to 0 so that slave could distinguish between
    real and fake Rotate events (if necessary)
  */
  memset(header, 0, 4);
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;

  /* p points at the file name without its directory part */
  char* p = log_file_name+dirname_length(log_file_name);
  uint ident_len = (uint) strlen(p);
  /* total length: common header + Rotate header + name (+ checksum) */
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
    (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
  int4store(header + SERVER_ID_OFFSET, server_id);
  int4store(header + EVENT_LEN_OFFSET, event_len);
  int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);

  // TODO: check what problems this may cause and fix them
  int4store(header + LOG_POS_OFFSET, 0);

  /* Assemble the event: header, Rotate header (position), file name */
  packet->append(header, sizeof(header));
  int8store(buf+R_POS_OFFSET,position);
  packet->append(buf, ROTATE_HEADER_LEN);
  packet->append(p, ident_len);

  if (do_checksum)
  {
    /* The CRC covers exactly the three parts appended above, in order */
    char b[BINLOG_CHECKSUM_LEN];
    ha_checksum crc= my_checksum(0L, NULL, 0);
    crc= my_checksum(crc, (uchar*)header, sizeof(header));
    crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
    crc= my_checksum(crc, (uchar*)p, ident_len);
    int4store(b, crc);
    packet->append(b, sizeof(b));
  }

  /* The whole packet is sent, including anything already in it */
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
  {
    *errmsg = "failed on my_net_write()";
    DBUG_RETURN(-1);
  }
  DBUG_RETURN(0);
}
425 
426 /*
427   Reset thread transmit packet buffer for event sending
428 
429   This function allocates header bytes for event transmission, and
430   should be called before store the event data to the packet buffer.
431 */
reset_transmit_packet(THD * thd,ushort flags,ulong * ev_offset,const char ** errmsg,bool observe_transmission)432 static int reset_transmit_packet(THD *thd, ushort flags,
433                                  ulong *ev_offset, const char **errmsg,
434                                  bool observe_transmission)
435 {
436   int ret= 0;
437   String *packet= &thd->packet;
438 
439   /* reserve and set default header */
440   packet->length(0);
441   packet->set("\0", 1, &my_charset_bin);
442 
443   if (observe_transmission &&
444       RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
445   {
446     *errmsg= "Failed to run hook 'reserve_header'";
447     my_errno= ER_UNKNOWN_ERROR;
448     ret= 1;
449   }
450   *ev_offset= packet->length();
451   DBUG_PRINT("info", ("rpl_master.cc:reset_transmit_packet returns %d", ret));
452   return ret;
453 }
454 
send_file(THD * thd)455 static int send_file(THD *thd)
456 {
457   NET* net = &thd->net;
458   int fd = -1, error = 1;
459   size_t bytes;
460   char fname[FN_REFLEN+1];
461   const char *errmsg = 0;
462   int old_timeout;
463   unsigned long packet_len;
464   uchar buf[IO_SIZE];				// It's safe to alloc this
465   DBUG_ENTER("send_file");
466 
467   /*
468     The client might be slow loading the data, give him wait_timeout to do
469     the job
470   */
471   old_timeout= net->read_timeout;
472   my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
473 
474   /*
475     We need net_flush here because the client will not know it needs to send
476     us the file name until it has processed the load event entry
477   */
478   if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
479   {
480     errmsg = "while reading file name";
481     goto err;
482   }
483 
484   // terminate with \0 for fn_format
485   *((char*)net->read_pos +  packet_len) = 0;
486   fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
487   // this is needed to make replicate-ignore-db
488   if (!strcmp(fname,"/dev/null"))
489     goto end;
490 
491   if ((fd= mysql_file_open(key_file_send_file,
492                            fname, O_RDONLY, MYF(0))) < 0)
493   {
494     errmsg = "on open of file";
495     goto err;
496   }
497 
498   while ((long) (bytes= mysql_file_read(fd, buf, IO_SIZE, MYF(0))) > 0)
499   {
500     if (my_net_write(net, buf, bytes))
501     {
502       errmsg = "while writing data to client";
503       goto err;
504     }
505   }
506 
507  end:
508   if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
509       (my_net_read(net) == packet_error))
510   {
511     errmsg = "while negotiating file transfer close";
512     goto err;
513   }
514   error = 0;
515 
516  err:
517   my_net_set_read_timeout(net, old_timeout);
518   if (fd >= 0)
519     mysql_file_close(fd, MYF(0));
520   if (errmsg)
521   {
522     sql_print_error("Failed in send_file() %s", errmsg);
523     DBUG_PRINT("error", ("%s", errmsg));
524   }
525   DBUG_RETURN(error);
526 }
527 
528 
test_for_non_eof_log_read_errors(int error,const char ** errmsg)529 int test_for_non_eof_log_read_errors(int error, const char **errmsg)
530 {
531   if (error == LOG_READ_EOF)
532     return 0;
533   my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
534   switch (error) {
535   case LOG_READ_BOGUS:
536     *errmsg = "bogus data in log event";
537     break;
538   case LOG_READ_TOO_LARGE:
539     *errmsg = "log event entry exceeded max_allowed_packet; \
540 Increase max_allowed_packet on master";
541     break;
542   case LOG_READ_IO:
543     *errmsg = "I/O error reading log event";
544     break;
545   case LOG_READ_MEM:
546     *errmsg = "memory allocation failed reading log event";
547     break;
548   case LOG_READ_TRUNC:
549     *errmsg = "binlog truncated in the middle of event; consider out of disk space on master";
550     break;
551   case LOG_READ_CHECKSUM_FAILURE:
552     *errmsg = "event read from binlog did not pass crc check";
553     break;
554   default:
555     *errmsg = "unknown error reading log event on the master";
556     break;
557   }
558   return error;
559 }
560 
561 
562 /**
563   An auxiliary function for calling in mysql_binlog_send
564   to initialize the heartbeat timeout in waiting for a binlogged event.
565 
566   @param[in]    thd  THD to access a user variable
567 
568   @return        heartbeat period an ulonglong of nanoseconds
569                  or zero if heartbeat was not demanded by slave
570 */
get_heartbeat_period(THD * thd)571 static ulonglong get_heartbeat_period(THD * thd)
572 {
573   my_bool null_value;
574   LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
575   user_var_entry *entry=
576     (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
577                                   name.length);
578   return entry? entry->val_int(&null_value) : 0;
579 }
580 
581 /*
582   Function prepares and sends repliation heartbeat event.
583 
584   @param net                net object of THD
585   @param packet             buffer to store the heartbeat instance
586   @param event_coordinates  binlog file name and position of the last
587                             real event master sent from binlog
588 
589   @note
590     Among three essential pieces of heartbeat data Log_event::when
591     is computed locally.
592     The  error to send is serious and should force terminating
593     the dump thread.
594 */
send_heartbeat_event(NET * net,String * packet,const struct event_coordinates * coord,uint8 checksum_alg_arg)595 static int send_heartbeat_event(NET* net, String* packet,
596                                 const struct event_coordinates *coord,
597                                 uint8 checksum_alg_arg)
598 {
599   DBUG_ENTER("send_heartbeat_event");
600   char header[LOG_EVENT_HEADER_LEN];
601   my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
602     checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
603   /*
604     'when' (the timestamp) is set to 0 so that slave could distinguish between
605     real and fake Rotate events (if necessary)
606   */
607   memset(header, 0, 4);  // when
608 
609   header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
610 
611   char* p= coord->file_name + dirname_length(coord->file_name);
612 
613   uint ident_len = strlen(p);
614   ulong event_len = ident_len + LOG_EVENT_HEADER_LEN +
615     (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
616   int4store(header + SERVER_ID_OFFSET, server_id);
617   int4store(header + EVENT_LEN_OFFSET, event_len);
618   int2store(header + FLAGS_OFFSET, 0);
619 
620   int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
621 
622   packet->append(header, sizeof(header));
623   packet->append(p, ident_len);             // log_file_name
624 
625   if (do_checksum)
626   {
627     char b[BINLOG_CHECKSUM_LEN];
628     ha_checksum crc= my_checksum(0L, NULL, 0);
629     crc= my_checksum(crc, (uchar*) header, sizeof(header));
630     crc= my_checksum(crc, (uchar*) p, ident_len);
631     int4store(b, crc);
632     packet->append(b, sizeof(b));
633   }
634 
635   if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
636       net_flush(net))
637   {
638     DBUG_RETURN(-1);
639   }
640   DBUG_RETURN(0);
641 }
642 
643 
644 /**
645    Reset and send a heartbeat event to the slave. This function is *only* used
646    to send heartbeat events which carry binary log position of the *last
647    skipped transaction*. Since thd->packet is used to send events to the
648    slave and the packet currently holds an event, packet state is stored
649    first. A heartbeat event is sent to the slave and the state is restored
650    later. Note that, the caller has to send the last skipped coordinates
651    to this function.
652 
653    @param[in]      net               This master-slave network handler
654    @param[in]      packet            packet that is to be sent to the slave.
655    @param[in]      last_skip_coord   coordinates for last skipped transaction
656    @param[in]      ev_offset         event offset
657    @param[in]      checksum_alg_arg  checksum alg used
658    @param[in,out]  errmsg            generated error message
659 
660    @retval           0                  ok
661    @retval          -1                  error
662 */
send_last_skip_group_heartbeat(THD * thd,NET * net,String * packet,const struct event_coordinates * last_skip_coord,ulong * ev_offset,uint8 checksum_alg_arg,const char ** errmsg,bool observe_transmission)663 static int send_last_skip_group_heartbeat(THD *thd, NET* net, String *packet,
664                                           const struct event_coordinates *last_skip_coord,
665                                           ulong *ev_offset,
666                                           uint8 checksum_alg_arg, const char **errmsg,
667                                           bool observe_transmission)
668 {
669   DBUG_ENTER("send_last_skip_group_heartbeat");
670   String save_packet;
671   int save_offset= *ev_offset;
672 
673  /* Save the current read packet */
674   save_packet.swap(*packet);
675 
676   if (reset_transmit_packet(thd, 0, ev_offset, errmsg, observe_transmission))
677     DBUG_RETURN(-1);
678 
679   /* Send heart beat event to the slave to update slave  threads coordinates */
680   if (send_heartbeat_event(net, packet, last_skip_coord, checksum_alg_arg))
681   {
682     *errmsg= "Failed on my_net_write()";
683     my_errno= ER_UNKNOWN_ERROR;
684     DBUG_RETURN(-1);
685   }
686 
687   /* Restore the packet and event offset */
688   packet->swap(save_packet);
689   *ev_offset= save_offset;
690 
691   DBUG_PRINT("info", ("rpl_master.cc:send_last_skip_group_heartbeat returns 0"));
692   DBUG_RETURN(0);
693 }
694 
695 
/*
  The macros below decode a packet sequentially. They expect the
  enclosing function to provide 'packet_position' (current read
  pointer), 'packet_bytes_todo' (bytes left) and a label
  'error_malformed_packet' that is jumped to on any size violation.
  Arguments are parenthesized so that expressions can be passed safely.
*/

/**
  If there are less than BYTES bytes left to read in the packet,
  report error.
*/
#define CHECK_PACKET_SIZE(BYTES)                                        \
  do {                                                                  \
    if (packet_bytes_todo < (BYTES))                                    \
      goto error_malformed_packet;                                      \
  } while (0)

/**
  Auxiliary macro used to define READ_INT and READ_STRING.

  Check that there are at least BYTES more bytes to read, then read
  the bytes using the given DECODER, then advance the reading
  position.
*/
#define READ(DECODE, BYTES)                                             \
  do {                                                                  \
    CHECK_PACKET_SIZE(BYTES);                                           \
    DECODE;                                                             \
    packet_position+= (BYTES);                                          \
    packet_bytes_todo-= (BYTES);                                        \
  } while (0)

/** Skip BYTES bytes of the packet after checking that they exist. */
#define SKIP(BYTES) READ((void)(0), BYTES)

/**
  Check that there are at least BYTES more bytes to read, then read
  the bytes and decode them into the given integer VAR, then advance
  the reading position.

  BYTES must be a literal number: it is pasted into the name of the
  decoding function (uint<BYTES>korr).
*/
#define READ_INT(VAR, BYTES)                                            \
  READ((VAR)= uint ## BYTES ## korr(packet_position), BYTES)

/**
  Check that there are at least BYTES more bytes to read and that
  BYTES+1 is not greater than BUFFER_SIZE, then read the bytes into
  the given variable VAR, then advance the reading position.
  VAR is terminated with a '\0' after the copied bytes.
*/
#define READ_STRING(VAR, BYTES, BUFFER_SIZE)                            \
  do {                                                                  \
    if ((BUFFER_SIZE) <= (BYTES))                                       \
      goto error_malformed_packet;                                      \
    READ(memcpy((VAR), packet_position, (BYTES)), BYTES);               \
    (VAR)[(BYTES)]= '\0';                                               \
  } while (0)
743 
744 
/**
  Handle a binlog dump request in the position-based packet format.

  The packet is decoded as: 4-byte position, 2-byte flags, 4-byte server
  id; the log file name is taken from offset 10 of the packet. After the
  access check and decoding, older dump threads are handled through
  kill_zombie_dump_threads(), the request is written to the general log
  and mysql_binlog_send() is run. Afterwards the slave registration made
  by this thread is removed.

  @param thd            connection requesting the dump
  @param packet         request payload
  @param packet_length  number of bytes in 'packet'

  @return false if the REPL_SLAVE_ACL access check fails; true otherwise,
          i.e. after the dump ended or when the packet was malformed
          (ER_MALFORMED_PACKET is raised in the latter case)
*/
bool com_binlog_dump(THD *thd, char *packet, uint packet_length)
{
  DBUG_ENTER("com_binlog_dump");
  ulong pos;
  ushort flags= 0;
  /* Both names below are used implicitly by the READ_* macros */
  const uchar* packet_position= (uchar *) packet;
  uint packet_bytes_todo= packet_length;

  status_var_increment(thd->status_var.com_other);
  thd->enable_slow_log= opt_log_slow_admin_statements;
  if (check_global_access(thd, REPL_SLAVE_ACL))
    DBUG_RETURN(false);

  /*
    4 bytes is too little, but changing the protocol would break
    compatibility.  This has been fixed in the new protocol. @see
    com_binlog_dump_gtid().
  */
  /* Each READ_INT jumps to error_malformed_packet if the packet is short */
  READ_INT(pos, 4);
  READ_INT(flags, 2);
  READ_INT(thd->server_id, 4);

  DBUG_PRINT("info", ("pos=%lu flags=%d server_id=%d", pos, flags, thd->server_id));

  kill_zombie_dump_threads(thd);

  /* packet + 10 skips the 4 + 2 + 4 bytes decoded above: the log name */
  general_log_print(thd, thd->get_command(), "Log: '%s'  Pos: %ld",
                    packet + 10, (long) pos);
  mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, NULL, flags);

  unregister_slave(thd, true, true/*need_lock_slave_list=true*/);
  /*  fake COM_QUIT -- if we get here, the thread needs to terminate */
  DBUG_RETURN(true);

error_malformed_packet:
  my_error(ER_MALFORMED_PACKET, MYF(0));
  DBUG_RETURN(true);
}
783 
784 
/**
  Handle a binlog dump request in the GTID-based packet format.

  The packet is decoded as: 2-byte flags, 4-byte server id, 4-byte name
  length, the log file name, 8-byte position, 4-byte size of the encoded
  GTID set and the encoded set itself. The decoded set is passed to
  mysql_binlog_send() as the slave's executed GTIDs. Afterwards the slave
  registration made by this thread is removed.

  @param thd            connection requesting the dump
  @param packet         request payload
  @param packet_length  number of bytes in 'packet'

  @return false if the REPL_SLAVE_ACL access check fails; true otherwise,
          i.e. after the dump ended, when the GTID set could not be
          decoded, or when the packet was malformed (ER_MALFORMED_PACKET
          is raised in the latter case)
*/
bool com_binlog_dump_gtid(THD *thd, char *packet, uint packet_length)
{
  DBUG_ENTER("com_binlog_dump_gtid");
  /*
    Before going GA, we need to make this protocol extensible without
    breaking compatitibilty. /Alfranio.
  */
  ushort flags= 0;
  uint32 data_size= 0;
  uint64 pos= 0;
  char name[FN_REFLEN + 1];
  uint32 name_size= 0;
  char* gtid_string= NULL;
  /* Both names below are used implicitly by the READ_* macros */
  const uchar* packet_position= (uchar *) packet;
  uint packet_bytes_todo= packet_length;
  Sid_map sid_map(NULL/*no sid_lock because this is a completely local object*/);
  Gtid_set slave_gtid_executed(&sid_map);

  status_var_increment(thd->status_var.com_other);
  thd->enable_slow_log= opt_log_slow_admin_statements;
  if (check_global_access(thd, REPL_SLAVE_ACL))
    DBUG_RETURN(false);

  /* Each READ_* jumps to error_malformed_packet if the packet is short */
  READ_INT(flags, 2);
  READ_INT(thd->server_id, 4);
  READ_INT(name_size, 4);
  /* Also rejects names that would not fit in 'name' with a terminator */
  READ_STRING(name, name_size, sizeof(name));
  READ_INT(pos, 8);
  DBUG_PRINT("info", ("pos=%llu flags=%d server_id=%d", pos, flags, thd->server_id));
  READ_INT(data_size, 4);
  /* The encoded GTID set must be fully contained in the packet */
  CHECK_PACKET_SIZE(data_size);
  if (slave_gtid_executed.add_gtid_encoding(packet_position, data_size) !=
      RETURN_STATUS_OK)
    DBUG_RETURN(true);
  /* Text form of the set, used for tracing and the general log only */
  gtid_string= slave_gtid_executed.to_string();
  DBUG_PRINT("info", ("Slave %d requested to read %s at position %llu gtid set "
                      "'%s'.", thd->server_id, name, pos, gtid_string));

  kill_zombie_dump_threads(thd);
  general_log_print(thd, thd->get_command(), "Log: '%s' Pos: %llu GTIDs: '%s'",
                    name, pos, gtid_string);
  my_free(gtid_string);
  mysql_binlog_send(thd, name, (my_off_t) pos, &slave_gtid_executed, flags);

  unregister_slave(thd, true, true/*need_lock_slave_list=true*/);
  /*  fake COM_QUIT -- if we get here, the thread needs to terminate */
  DBUG_RETURN(true);

error_malformed_packet:
  my_error(ER_MALFORMED_PACKET, MYF(0));
  DBUG_RETURN(true);
}
837 
838 
mysql_binlog_send(THD * thd,char * log_ident,my_off_t pos,const Gtid_set * slave_gtid_executed,int flags)839 void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
840                        const Gtid_set* slave_gtid_executed, int flags)
841 {
842   /**
843     @todo: Clean up loop so that, among other things, we only have one
844     call to send_file(). This is WL#5721.
845   */
846 #define GOTO_ERR                                                        \
847   do {                                                                  \
848     DBUG_PRINT("info", ("mysql_binlog_send fails; goto err from line %d", \
849                         __LINE__));                                     \
850     goto err;                                                           \
851   } while (0)
852   LOG_INFO linfo;
853   char *log_file_name = linfo.log_file_name;
854   char search_file_name[FN_REFLEN], *name;
855 
856   bool is_index_file_reopened_on_binlog_disable= false;
857   ulong ev_offset;
858   bool using_gtid_protocol= slave_gtid_executed != NULL;
859   bool searching_first_gtid= using_gtid_protocol;
860   bool skip_group= false;
861   bool binlog_has_previous_gtids_log_event= false;
862   bool gtid_event_logged= false;
863   bool has_transmit_started= false;
864   Sid_map *sid_map= slave_gtid_executed ? slave_gtid_executed->get_sid_map() : NULL;
865 
866   IO_CACHE log;
867   File file = -1;
868   String* packet = &thd->packet;
869   time_t last_event_sent_ts= time(0);
870   bool time_for_hb_event= false;
871   int error= 0;
872   const char *errmsg = "Unknown error";
873   char error_text[MAX_SLAVE_ERRMSG]= {0}; // to be send to slave via my_message()
874   NET* net = &thd->net;
875   mysql_mutex_t *log_lock;
876   mysql_cond_t *log_cond;
877   uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
878   Format_description_log_event fdle(BINLOG_VERSION), *p_fdle= &fdle;
879   Gtid first_gtid;
880 
881 #ifndef DBUG_OFF
882   int left_events = max_binlog_dump_events;
883 #endif
884   int old_max_allowed_packet= thd->variables.max_allowed_packet;
885   /*
886     The dump thread reports ER_MASTER_FATAL_ERROR_READING_BINLOG to the slave
887     instead of the real error that happened on the master.
888     So set a temporary Diagnostics_area to thd. The low level error is always
889     set into the temporary Diagnostics_area and is ignored. The original
890     Diagnostics_area will be restored at the end of this function.
891     ER_MASTER_FATAL_ERROR_READING_BINLOG will be set to the original
892     Diagnostics_area.
893   */
894   Diagnostics_area temp_da;
895   Diagnostics_area *saved_da= thd->get_stmt_da();
896   thd->set_stmt_da(&temp_da);
897   bool was_killed_by_duplicate_slave_id= false;
898 
899   DBUG_ENTER("mysql_binlog_send");
900   DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
901 
902   memset(&log, 0, sizeof(log));
903   /*
904      heartbeat_period from @master_heartbeat_period user variable
905   */
906   ulonglong heartbeat_period= get_heartbeat_period(thd);
907   struct timespec heartbeat_buf;
908   struct timespec *heartbeat_ts= NULL;
909   const LOG_POS_COORD start_coord= { log_ident, pos },
910     *p_start_coord= &start_coord;
911   LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE },
912     *p_coord= &coord_buf;
913 
914   /*
915     We use the following variables to send a HB event
916     when groups are skipped during a GTID protocol.
917   */
918   /* Flag to check if last transaction was skipped */
919   bool last_skip_group= skip_group;
920   /* File name where last skip group is present */
921   char last_skip_log_name[FN_REFLEN+1];
922   /* Coordinates of the last skip group */
923   LOG_POS_COORD last_skip_coord_buf= {last_skip_log_name, BIN_LOG_HEADER_SIZE},
924                 *p_last_skip_coord= &last_skip_coord_buf;
925   bool observe_transmission= false;
926 
927   if (heartbeat_period != LL(0))
928   {
929     heartbeat_ts= &heartbeat_buf;
930     set_timespec_nsec(*heartbeat_ts, 0);
931   }
932 
933 #ifndef DBUG_OFF
934   if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
935   {
936     errmsg = "Master fails in COM_BINLOG_DUMP because of --opt-sporadic-binlog-dump-fail";
937     my_errno= ER_UNKNOWN_ERROR;
938     GOTO_ERR;
939   }
940 #endif
941 
942   if (!mysql_bin_log.is_open())
943   {
944     errmsg = "Binary log is not open";
945     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
946     GOTO_ERR;
947   }
948   if (!server_id_supplied)
949   {
950     errmsg = "Misconfigured master - server_id was not set";
951     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
952     GOTO_ERR;
953   }
954 
955   name= search_file_name;
956   if (log_ident[0])
957     mysql_bin_log.make_log_name(search_file_name, log_ident);
958   else
959   {
960     if (using_gtid_protocol)
961     {
962       /*
963         In normal scenarios, it is not possible that Slave will
964         contain more gtids than Master with respect to Master's
965         UUID. But it could be possible if Master's binary log
966         is truncated (due to raid failure) or Master's binary log is
967         deleted but GTID_PURGED was not set properly. That scenario
968         needs to be validated, i.e., it should *always* be the case that
969         Slave's gtid executed set (+retrieved set) is a subset of
970         Master's gtid executed set with respect to Master's UUID.
971         If that is not the case, dump thread will be stopped during the
972         handshake with Slave (thus the Slave's I/O thread will be stopped
973         with the error). Otherwise, it can lead to data inconsistency
974         between Master and Slave.
975       */
976       Sid_map* slave_sid_map= slave_gtid_executed->get_sid_map();
977       DBUG_ASSERT(slave_sid_map);
978       global_sid_lock->wrlock();
979       const rpl_sid &server_sid= gtid_state->get_server_sid();
980       rpl_sidno subset_sidno= slave_sid_map->sid_to_sidno(server_sid);
981       if (!slave_gtid_executed->is_subset_for_sid(gtid_state->get_logged_gtids(),
982                                                   gtid_state->get_server_sidno(),
983                                                   subset_sidno))
984       {
985         errmsg= ER(ER_SLAVE_HAS_MORE_GTIDS_THAN_MASTER);
986         my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
987         global_sid_lock->unlock();
988         GOTO_ERR;
989       }
990       /*
991         Setting GTID_PURGED (when GTID_EXECUTED set is empty i.e., when
992         previous_gtids are also empty) will make binlog rotate. That
993         leaves first binary log with empty previous_gtids and second
994         binary log's previous_gtids with the value of gtid_purged.
995         In find_first_log_not_in_gtid_set() while we search for a binary
996         log whose previous_gtid_set is subset of slave_gtid_executed,
997         in this particular case, server will always find the first binary
998         log with empty previous_gtids which is subset of any given
999         slave_gtid_executed. Thus Master thinks that it found the first
1000         binary log which is actually not correct and unable to catch
1001         this error situation. Hence adding below extra if condition
1002         to check the situation. Slave should know about Master's purged GTIDs.
1003         If Slave's GTID executed + retrieved set does not contain Master's
1004         complete purged GTID list, that means Slave is requesting(expecting)
1005         GTIDs which were purged by Master. We should let Slave know about the
1006         situation. i.e., throw error if slave's GTID executed set is not
1007         a superset of Master's purged GTID set.
1008         The other case, where user deleted binary logs manually
1009         (without using 'PURGE BINARY LOGS' command) but gtid_purged
1010         is not set by the user, the following if condition cannot catch it.
1011         But that is not a problem because in find_first_log_not_in_gtid_set()
1012         while checking for subset previous_gtids binary log, the logic
1013         will not find one and an error ER_MASTER_HAS_PURGED_REQUIRED_GTIDS
1014         is thrown from there.
1015       */
1016       if (!gtid_state->get_lost_gtids()->is_subset(slave_gtid_executed))
1017       {
1018         mysql_bin_log.report_missing_purged_gtids(slave_gtid_executed, &errmsg);
1019         my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1020         global_sid_lock->unlock();
1021         GOTO_ERR;
1022       }
1023       global_sid_lock->unlock();
1024       first_gtid.clear();
1025       if (mysql_bin_log.find_first_log_not_in_gtid_set(name,
1026                                                        slave_gtid_executed,
1027                                                        &first_gtid,
1028                                                        &errmsg))
1029       {
1030          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1031          GOTO_ERR;
1032       }
1033     }
1034     else
1035       name= 0;					// Find first log
1036   }
1037 
1038   linfo.index_file_offset= 0;
1039 
1040   if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1041   {
1042     errmsg = "Could not find first log file name in binary log index file";
1043     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1044     GOTO_ERR;
1045   }
1046 
1047   mysql_mutex_lock(&LOCK_thread_count);
1048   thd->current_linfo = &linfo;
1049   mysql_mutex_unlock(&LOCK_thread_count);
1050 
1051   if ((file=open_binlog_file(&log, log_file_name, &errmsg)) < 0)
1052   {
1053     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1054     GOTO_ERR;
1055   }
1056   if (pos < BIN_LOG_HEADER_SIZE)
1057   {
1058     errmsg= "Client requested master to start replication from position < 4";
1059     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1060     GOTO_ERR;
1061   }
1062   if (pos > my_b_filelength(&log))
1063   {
1064     errmsg= "Client requested master to start replication from position > file size";
1065     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1066     GOTO_ERR;
1067   }
1068 
1069   if (log_warnings > 1)
1070     sql_print_information("Start binlog_dump to master_thread_id(%lu) slave_server(%u), pos(%s, %lu)",
1071                           thd->thread_id, thd->server_id, log_ident, (ulong)pos);
1072   if (RUN_HOOK(binlog_transmit, transmit_start,
1073                (thd, flags, log_ident, pos, &observe_transmission)))
1074   {
1075     errmsg= "Failed to run hook 'transmit_start'";
1076     my_errno= ER_UNKNOWN_ERROR;
1077     GOTO_ERR;
1078   }
1079   has_transmit_started= true;
1080   /* reset transmit packet for the fake rotate event below */
1081   if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1082                             observe_transmission))
1083     GOTO_ERR;
1084 
1085   /*
1086     Tell the client about the log name with a fake Rotate event;
1087     this is needed even if we also send a Format_description_log_event
1088     just after, because that event does not contain the binlog's name.
1089     Note that as this Rotate event is sent before
1090     Format_description_log_event, the slave cannot have any info to
1091     understand this event's format, so the header len of
1092     Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
1093     than other events except FORMAT_DESCRIPTION_EVENT).
1094     Before 4.0.14 we called fake_rotate_event below only if (pos ==
1095     BIN_LOG_HEADER_SIZE), because if this is false then the slave
1096     already knows the binlog's name.
1097     Since, we always call fake_rotate_event; if the slave already knew
1098     the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
1099     useless but does not harm much. It is nice for 3.23 (>=.58) slaves
1100     which test Rotate events to see if the master is 4.0 (then they
1101     choose to stop because they can't replicate 4.0); by always calling
1102     fake_rotate_event we are sure that 3.23.58 and newer will detect the
1103     problem as soon as replication starts (BUG#198).
1104     Always calling fake_rotate_event makes sending of normal
1105     (=from-binlog) Rotate events a priori unneeded, but it is not so
1106     simple: the 2 Rotate events are not equivalent, the normal one is
1107     before the Stop event, the fake one is after. If we don't send the
1108     normal one, then the Stop event will be interpreted (by existing 4.0
1109     slaves) as "the master stopped", which is wrong. So for safety,
1110     given that we want minimum modification of 4.0, we send the normal
1111     and fake Rotates.
1112   */
1113   if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
1114       get_binlog_checksum_value_at_connect(current_thd)))
1115   {
1116     /*
1117        This error code is not perfect, as fake_rotate_event() does not
1118        read anything from the binlog; if it fails it's because of an
1119        error in my_net_write(), fortunately it will say so in errmsg.
1120     */
1121     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1122     GOTO_ERR;
1123   }
1124 
1125   /*
1126     Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
1127     this larger than the corresponding packet (query) sent
1128     from client to master.
1129   */
1130   thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
1131 
1132   /*
1133     We can set log_lock now, it does not move (it's a member of
1134     mysql_bin_log, and it's already inited, and it will be destroyed
1135     only at shutdown).
1136   */
1137   p_coord->pos= pos; // the first hb matches the slave's last seen value
1138   log_lock= mysql_bin_log.get_log_lock();
1139   log_cond= mysql_bin_log.get_log_cond();
1140   if (pos > BIN_LOG_HEADER_SIZE)
1141   {
1142     /* reset transmit packet for the event read from binary log
1143        file */
1144     if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1145                               observe_transmission))
1146       GOTO_ERR;
1147 
1148      /*
1149        Try to find a Format_description_log_event at the beginning of
1150        the binlog
1151      */
1152     if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0)))
1153     {
1154       DBUG_PRINT("info", ("read_log_event returned 0 on line %d", __LINE__));
1155       /*
1156         The packet has offsets equal to the normal offsets in a
1157         binlog event + ev_offset (the first ev_offset characters are
1158         the header (default \0)).
1159       */
1160       DBUG_PRINT("info",
1161                  ("Looked for a Format_description_log_event, found event type %s",
1162                   Log_event::get_type_str((Log_event_type)(*packet)[EVENT_TYPE_OFFSET + ev_offset])));
1163       if ((*packet)[EVENT_TYPE_OFFSET + ev_offset] == FORMAT_DESCRIPTION_EVENT)
1164       {
1165         current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
1166                                                packet->length() - ev_offset);
1167         DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
1168                     current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
1169                     current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
1170         if (!is_slave_checksum_aware(thd) &&
1171             current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1172             current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1173         {
1174           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1175           errmsg= "Slave can not handle replication events with the checksum "
1176             "that master is configured to log";
1177           sql_print_warning("Master is configured to log replication events "
1178                             "with checksum, but will not send such events to "
1179                             "slaves that cannot process them");
1180           GOTO_ERR;
1181         }
1182         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1183         /*
1184           mark that this event with "log_pos=0", so the slave
1185           should not increment master's binlog position
1186           (rli->group_master_log_pos)
1187         */
1188         int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
1189         /*
1190           if reconnect master sends FD event with `created' as 0
1191           to avoid destroying temp tables.
1192         */
1193         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
1194                   ST_CREATED_OFFSET+ev_offset, (ulong) 0);
1195 
1196         /* fix the checksum due to latest changes in header */
1197         if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1198             current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1199           fix_checksum(packet, ev_offset);
1200 
1201         /* send it */
1202         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
1203         {
1204           errmsg = "Failed on my_net_write()";
1205           my_errno= ER_UNKNOWN_ERROR;
1206           GOTO_ERR;
1207         }
1208 
1209         /*
1210           No need to save this event. We are only doing simple reads
1211           (no real parsing of the events) so we don't need it. And so
1212           we don't need the artificial Format_description_log_event of
1213           3.23&4.x.
1214         */
1215       }
1216     }
1217     else
1218     {
1219       if (test_for_non_eof_log_read_errors(error, &errmsg))
1220         GOTO_ERR;
1221       /*
1222         It's EOF, nothing to do, go on reading next events, the
1223         Format_description_log_event will be found naturally if it is written.
1224       */
1225     }
1226   } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
1227   else
1228   {
1229     /* The Format_description_log_event event will be found naturally. */
1230   }
1231 
1232   /* seek to the requested position, to start the requested dump */
1233   my_b_seek(&log, pos);			// Seek will done on next read
1234 
1235   while (!net->error && net->vio != 0 && !thd->killed)
1236   {
1237     Log_event_type event_type= UNKNOWN_EVENT;
1238     bool goto_next_binlog= false;
1239 
1240     /* reset the transmit packet for the event read from binary log
1241        file */
1242     if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1243                               observe_transmission))
1244       GOTO_ERR;
1245     DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
1246                     {
1247                       const char act[]= "now wait_for signal.rotate_finished no_clear_event";
1248                       DBUG_ASSERT(!debug_sync_set_action(current_thd,
1249                                                          STRING_WITH_LEN(act)));
1250                     };);
1251     bool is_active_binlog= false;
1252     while (!thd->killed &&
1253            !(error= Log_event::read_log_event(&log, packet, log_lock,
1254                                               current_checksum_alg,
1255                                               log_file_name,
1256                                               &is_active_binlog)))
1257     {
1258       DBUG_EXECUTE_IF("simulate_dump_thread_kill",
1259                       {
1260                         thd->killed= THD::KILL_CONNECTION;
1261                       });
1262       DBUG_EXECUTE_IF("hold_dump_thread_inside_inner_loop",
1263                     {
1264                       const char act[]= "now "
1265                                         "signal signal_inside_inner_loop "
1266                                         "wait_for signal_continue";
1267                       DBUG_ASSERT(!debug_sync_set_action(current_thd,
1268                                                          STRING_WITH_LEN(act)));
1269                       DBUG_ASSERT(thd->killed);
1270                     };);
1271       time_t created;
1272       DBUG_PRINT("info", ("read_log_event returned 0 on line %d", __LINE__));
1273 #ifndef DBUG_OFF
1274       if (max_binlog_dump_events && !left_events--)
1275       {
1276         net_flush(net);
1277         errmsg = "Debugging binlog dump abort";
1278         my_errno= ER_UNKNOWN_ERROR;
1279         GOTO_ERR;
1280       }
1281 #endif
1282       /*
1283         log's filename does not change while it's active
1284       */
1285       p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
1286 
1287       event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
1288       DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
1289                       {
1290                         if (event_type == XID_EVENT)
1291                         {
1292                           net_flush(net);
1293                           const char act[]=
1294                             "now "
1295                             "wait_for signal.continue";
1296                           DBUG_ASSERT(opt_debug_sync_timeout > 0);
1297                           DBUG_ASSERT(!debug_sync_set_action(current_thd,
1298                                                              STRING_WITH_LEN(act)));
1299                         }
1300                       });
1301 
1302       switch (event_type)
1303       {
1304       case FORMAT_DESCRIPTION_EVENT:
1305         skip_group= false;
1306         current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
1307                                                packet->length() - ev_offset);
1308         DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
1309                     current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
1310                     current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
1311         if (!is_slave_checksum_aware(thd) &&
1312             current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1313             current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1314         {
1315           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1316           errmsg= "Slave can not handle replication events with the checksum "
1317             "that master is configured to log";
1318           sql_print_warning("Master is configured to log replication events "
1319                             "with checksum, but will not send such events to "
1320                             "slaves that cannot process them");
1321           GOTO_ERR;
1322         }
1323         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1324 
1325         created= uint4korr(packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
1326                            ST_CREATED_OFFSET+ev_offset);
1327 
1328         if (using_gtid_protocol && created > 0)
1329         {
1330           if (first_gtid.sidno >= 1 && first_gtid.gno >= 1 &&
1331               slave_gtid_executed->contains_gtid(first_gtid.sidno,
1332                                                  first_gtid.gno))
1333           {
1334             /*
1335               As we are skipping at least the first transaction of the binlog,
1336               we must clear the "created" field of the FD event (set it to 0)
1337               to avoid cleaning up temp tables on slave.
1338             */
1339             DBUG_PRINT("info",("setting 'created' to 0 before sending FD event"));
1340             int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
1341                       ST_CREATED_OFFSET+ev_offset, (ulong) 0);
1342 
1343             /* Fix the checksum due to latest changes in header */
1344             if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1345                 current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1346               fix_checksum(packet, ev_offset);
1347 
1348             first_gtid.clear();
1349           }
1350         }
1351         /*
1352           Fixes the information on the checksum algorithm when a new
1353           format description is read. Notice that this only necessary
1354           when we need to filter out some transactions which were
1355           already processed.
1356         */
1357         p_fdle->checksum_alg= current_checksum_alg;
1358         break;
1359 
1360       case ANONYMOUS_GTID_LOG_EVENT:
1361         /* do nothing */
1362         break;
1363       case GTID_LOG_EVENT:
1364         if (gtid_mode == 0)
1365         {
1366           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1367           errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1368           GOTO_ERR;
1369         }
1370         if (using_gtid_protocol)
1371         {
1372           /*
1373             The current implementation checks if the GTID was not processed
1374             by the slave. This means that everytime a GTID is read, one needs
1375             to check if it was already processed by the slave. If this is the
1376             case, the group is not sent. Otherwise, it must be sent.
1377 
1378             I think we can do better than that. /Alfranio.
1379           */
1380           ulonglong checksum_size=
1381             ((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1382               p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
1383              BINLOG_CHECKSUM_LEN + ev_offset : ev_offset);
1384           /**
1385             @todo: use a local sid_map to avoid the lookup in the
1386             global one here /Sven
1387           */
1388           Gtid_log_event gtid_ev(packet->ptr() + ev_offset,
1389                                  packet->length() - checksum_size,
1390                                  p_fdle);
1391           skip_group= slave_gtid_executed->contains_gtid(gtid_ev.get_sidno(sid_map),
1392                                                      gtid_ev.get_gno());
1393           searching_first_gtid= skip_group;
1394           DBUG_PRINT("info", ("Dumping GTID sidno(%d) gno(%lld) skip group(%d) "
1395                               "searching gtid(%d).",
1396                               gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(),
1397                               skip_group, searching_first_gtid));
1398           gtid_event_logged= true;
1399         }
1400         break;
1401 
1402       case STOP_EVENT:
1403       case INCIDENT_EVENT:
1404         skip_group= searching_first_gtid;
1405         break;
1406 
1407       case PREVIOUS_GTIDS_LOG_EVENT:
1408         binlog_has_previous_gtids_log_event= true;
1409         if (gtid_mode == 0)
1410         {
1411           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1412           errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1413           GOTO_ERR;
1414         }
1415         /* FALLTHROUGH */
1416       case ROTATE_EVENT:
1417         skip_group= false;
1418         break;
1419 
1420       default:
1421         if (!binlog_has_previous_gtids_log_event && using_gtid_protocol)
1422           /*
1423             If we come here, it means we are seeing a 'normal' DML/DDL
1424             event (e.g. query_log_event) without having seen any
1425             Previous_gtid_log_event. That means we are in an old
1426             binlog (no previous_gtids_log_event). When using the GTID
1427             protocol, that means we must skip the entire binary log
1428             and jump to the next one.
1429           */
1430           goto_next_binlog= true;
1431 
1432         if (!gtid_event_logged && using_gtid_protocol)
1433         {
1434           /*
1435              Skip groups in the binlogs which don't have any gtid event
1436              logged before them. When opt_gtid_deployment_step is ON, the server
1437              doesn't generate GTID and so no gtid_event is logged before binlog
1438              events. But when opt_gtid_deployment_step is OFF, the server starts
1439              writing gtid_events in the middle of active binlog. When slave
1440              connects with gtid_protocol, master needs to skip binlog events
1441              which don't have corresponding gtid_event.
1442           */
1443           skip_group= true;
1444         }
1445         break;
1446       }
1447 
1448       if (goto_next_binlog)
1449         // stop reading from this binlog
1450         break;
1451 
1452       DBUG_PRINT("info", ("EVENT_TYPE %d SEARCHING %d SKIP_GROUP %d file %s pos %lld\n",
1453                  event_type, searching_first_gtid, skip_group, log_file_name,
1454                  my_b_tell(&log)));
1455       pos = my_b_tell(&log);
1456       if (observe_transmission &&
1457           RUN_HOOK(binlog_transmit, before_send_event,
1458                    (thd, flags, packet, log_file_name, pos)))
1459       {
1460         my_errno= ER_UNKNOWN_ERROR;
1461         errmsg= "run 'before_send_event' hook failed";
1462         GOTO_ERR;
1463       }
1464 
1465       /* The present event was skipped, so store the event coordinates */
1466       if (skip_group)
1467       {
1468         p_last_skip_coord->pos= p_coord->pos;
1469         strcpy(p_last_skip_coord->file_name, p_coord->file_name);
1470         /*
1471           If we have not sent any event during the past 'heartbeat_period',
1472           then it is time to send a packet before skipping this group.
1473          */
1474         DBUG_EXECUTE_IF("inject_2sec_sleep_when_skipping_an_event",
1475                         {
1476                           my_sleep(2000000);
1477                         });
1478         time_t now= time(0);
1479         DBUG_ASSERT(now >= last_event_sent_ts);
1480         time_for_hb_event= ((ulonglong)(now - last_event_sent_ts) >=
1481                             (ulonglong)(heartbeat_period/1000000000UL));
1482       }
1483 
1484       if ((!skip_group && last_skip_group
1485            && event_type != FORMAT_DESCRIPTION_EVENT) || time_for_hb_event)
1486       {
1487         /*
1488           Dump thread is ready to send its first transaction after
1489           one or more skipped transactions, or dump thread did not
1490           send any event during the past 'heartbeat_period' time frame
1491           (busy skipping gtid groups). Send a heartbeat event
1492           to update slave IO thread coordinates before that happens.
1493 
1494           Notice that for a new binary log file, FORMAT_DESCRIPTION_EVENT
1495           is the first event to be sent to the slave. In this case, it is
1496           no need to send a HB event (which might have coordinates
1497           of previous binlog file).
1498         */
1499 
1500         if (send_last_skip_group_heartbeat(thd, net, packet, p_last_skip_coord,
1501                                            &ev_offset, current_checksum_alg,
1502                                            &errmsg, observe_transmission))
1503         {
1504           GOTO_ERR;
1505         }
1506         last_event_sent_ts= time(0);
1507         last_skip_group= time_for_hb_event= false;
1508       }
1509       else
1510       {
1511         last_skip_group= skip_group;
1512       }
1513 
1514       DBUG_EXECUTE_IF("master_xid_trigger",
1515       {
1516         Log_event_type event_type= (Log_event_type)
1517                                       (*packet)[EVENT_TYPE_OFFSET + ev_offset];
1518         if (event_type == XID_EVENT)
1519         {
1520           const char act[]= "now signal master_xid_reached wait_for resume";
1521           DBUG_ASSERT(!debug_sync_set_action(current_thd,
1522                                              STRING_WITH_LEN(act)));
1523       }});
1524 
1525       if (skip_group == false)
1526       {
1527         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
1528         {
1529           errmsg = "Failed on my_net_write()";
1530           my_errno= ER_UNKNOWN_ERROR;
1531           GOTO_ERR;
1532         }
1533         last_event_sent_ts= time(0);
1534       }
1535 
1536       DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
1537                       {
1538                         if (event_type == XID_EVENT)
1539                         {
1540                           net_flush(net);
1541                         }
1542                       });
1543 
1544       DBUG_PRINT("info", ("log event code %d", event_type));
1545       if (skip_group == false && event_type == LOAD_EVENT)
1546       {
1547 	if (send_file(thd))
1548 	{
1549           errmsg = "failed in send_file()";
1550           my_errno= ER_UNKNOWN_ERROR;
1551           GOTO_ERR;
1552 	}
1553       }
1554 
1555       if (observe_transmission &&
1556           RUN_HOOK(binlog_transmit, after_send_event,
1557                    (thd, flags, packet, log_file_name, skip_group ? pos : 0)))
1558       {
1559         errmsg= "Failed to run hook 'after_send_event'";
1560         my_errno= ER_UNKNOWN_ERROR;
1561         GOTO_ERR;
1562       }
1563 
1564       /* reset transmit packet for next loop */
1565       if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1566                                 observe_transmission))
1567         GOTO_ERR;
1568     }
1569 
1570     /* If the above while is killed due to thd->killed flag and not
1571       due to read_log_event error, then do nothing.*/
1572     if (thd->killed)
1573       goto end;
1574     DBUG_EXECUTE_IF("wait_after_binlog_EOF",
1575                     {
1576                       const char act[]= "now wait_for signal.rotate_finished no_clear_event";
1577                       DBUG_ASSERT(!debug_sync_set_action(current_thd,
1578                                                          STRING_WITH_LEN(act)));
1579                     };);
1580 
1581     /*
1582       TODO: now that we are logging the offset, check to make sure
1583       the recorded offset and the actual match.
1584       Guilhem 2003-06: this is not true if this master is a slave
1585       <4.0.15 running with --log-slave-updates, because then log_pos may
1586       be the offset in the-master-of-this-master's binlog.
1587     */
1588     if (test_for_non_eof_log_read_errors(error, &errmsg))
1589       GOTO_ERR;
1590 
1591     if (!is_active_binlog)
1592       goto_next_binlog= true;
1593 
1594     if (!goto_next_binlog)
1595     {
1596       /*
1597         Block until there is more data in the log
1598       */
1599       if (net_flush(net))
1600       {
1601         errmsg = "failed on net_flush()";
1602         my_errno= ER_UNKNOWN_ERROR;
1603         GOTO_ERR;
1604       }
1605 
1606       /*
1607 	We may have missed the update broadcast from the log
1608 	that has just happened, let's try to catch it if it did.
1609 	If we did not miss anything, we just wait for other threads
1610 	to signal us.
1611       */
1612       {
1613 	log.error=0;
1614 	bool read_packet = 0;
1615 
1616 #ifndef DBUG_OFF
1617         if (max_binlog_dump_events && !left_events--)
1618         {
1619           errmsg = "Debugging binlog dump abort";
1620           my_errno= ER_UNKNOWN_ERROR;
1621           GOTO_ERR;
1622         }
1623 #endif
1624 
1625         /* reset the transmit packet for the event read from binary log
1626            file */
1627         if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1628                                   observe_transmission))
1629           GOTO_ERR;
1630 
1631 	/*
1632 	  No one will update the log while we are reading
1633 	  now, but we'll be quick and just read one record
1634 
1635 	  TODO:
1636           Add an counter that is incremented for each time we update the
1637           binary log.  We can avoid the following read if the counter
1638           has not been updated since last read.
1639 	*/
1640 
1641         mysql_mutex_lock(log_lock);
1642         switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
1643                                                  current_checksum_alg)) {
1644 	case 0:
1645           DBUG_PRINT("info", ("read_log_event returned 0 on line %d",
1646                               __LINE__));
1647 	  /* we read successfully, so we'll need to send it to the slave */
1648           mysql_mutex_unlock(log_lock);
1649 	  read_packet = 1;
1650           p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
1651           event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
1652           DBUG_ASSERT(event_type != FORMAT_DESCRIPTION_EVENT);
1653 	  break;
1654 
1655 	case LOG_READ_EOF:
1656         {
1657           int ret;
1658           ulong signal_cnt;
1659 	  DBUG_PRINT("wait",("waiting for data in binary log"));
1660           /*
1661             There are two ways to tell the server to not block:
1662 
1663             - Set the BINLOG_DUMP_NON_BLOCK flag.
1664               This is official, documented, not used by any mysql
1665               client, but used by some external users.
1666 
1667             - Set server_id=0.
1668               This is unofficial, undocumented, and used by
1669               mysqlbinlog -R since the beginning of time.
1670 
1671             When mysqlbinlog --stop-never is used, it sets a 'fake'
1672             server_id that defaults to 1 but can be set to anything
1673             else using stop-never-slave-server-id. This has the
1674             drawback that if the server_id conflicts with any other
1675             running slave, or with any other instance of mysqlbinlog
1676             --stop-never, then that other instance will be killed.  It
1677             is also an unnecessary burden on the user to have to
1678             specify a server_id different from all other server_ids
1679             just to avoid conflicts.
1680 
1681             As of MySQL 5.6.20 and 5.7.5, mysqlbinlog redundantly sets
1682             the BINLOG_DUMP_NONBLOCK flag when one or both of the
1683             following holds:
1684             - the --stop-never option is *not* specified
1685 
1686             In a far future, this means we can remove the unofficial
1687             functionality that server_id=0 implies nonblocking
1688             behavior. That will allow mysqlbinlog to use server_id=0
1689             always. That has the advantage that mysqlbinlog
1690             --stop-never cannot cause any running dump threads to be
1691             killed.
1692           */
1693           if (thd->server_id == 0 || ((flags & BINLOG_DUMP_NON_BLOCK) != 0))
1694 	  {
1695             DBUG_PRINT("info", ("stopping dump thread because server_id==0 or the BINLOG_DUMP_NON_BLOCK flag is set: server_id=%u flags=%d", thd->server_id, flags));
1696             mysql_mutex_unlock(log_lock);
1697             DBUG_EXECUTE_IF("inject_hb_event_on_mysqlbinlog_dump_thread",
1698             {
1699               /*
1700                 Send one HB event (with anything in it, content is irrelevant).
1701                 We just want to check that mysqlbinlog will be able to ignore it.
1702 
1703                 Suicide on failure, since if it happens the entire purpose of the
1704                 test is comprimised.
1705                */
1706               if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1707                                         observe_transmission) ||
1708                   send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
1709                 DBUG_SUICIDE();
1710             });
1711 	    goto end;
1712 	  }
1713 
1714 #ifndef DBUG_OFF
1715           ulong hb_info_counter= 0;
1716 #endif
1717           PSI_stage_info old_stage;
1718           signal_cnt= mysql_bin_log.signal_cnt;
1719 
1720           do
1721           {
1722             if (heartbeat_period != 0)
1723             {
1724               DBUG_ASSERT(heartbeat_ts);
1725               set_timespec_nsec(*heartbeat_ts, heartbeat_period);
1726             }
1727             thd->ENTER_COND(log_cond, log_lock,
1728                             &stage_master_has_sent_all_binlog_to_slave,
1729                             &old_stage);
1730             /*
1731               When using GTIDs, if the dump thread has reached the end of the
1732               binary log and the last transaction is skipped,
1733               send one heartbeat event even when the heartbeat  is off.
1734               If the heartbeat is on, it is better to send a heartbeat
1735               event as the time_out of certain functions (Ex: master_pos_wait()
1736               or semi sync ack timeout) might be less than heartbeat period.
1737             */
1738             if (skip_group)
1739             {
1740               /*
1741                 TODO: Number of HB events sent from here can be reduced
1742                by checking whehter it is time to send a HB event or not.
1743                (i.e., using the flag time_for_hb_event)
1744               */
1745               if (send_last_skip_group_heartbeat(thd, net, packet,
1746                                                  p_coord, &ev_offset,
1747                                                  current_checksum_alg, &errmsg,
1748                                                  observe_transmission))
1749               {
1750                 thd->EXIT_COND(&old_stage);
1751                 GOTO_ERR;
1752               }
1753               last_skip_group= false; /*A HB for this pos has been sent. */
1754             }
1755 
1756             ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
1757             DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
1758             if (ret == ETIMEDOUT || ret == ETIME)
1759             {
1760 #ifndef DBUG_OFF
1761               if (hb_info_counter < 3)
1762               {
1763                 sql_print_information("master sends heartbeat message");
1764                 hb_info_counter++;
1765                 if (hb_info_counter == 3)
1766                   sql_print_information("the rest of heartbeat info skipped ...");
1767               }
1768 #endif
1769               /* reset transmit packet for the heartbeat event */
1770               if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1771                                         observe_transmission))
1772               {
1773                 thd->EXIT_COND(&old_stage);
1774                 GOTO_ERR;
1775               }
1776               if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
1777               {
1778                 errmsg = "Failed on my_net_write()";
1779                 my_errno= ER_UNKNOWN_ERROR;
1780                 thd->EXIT_COND(&old_stage);
1781                 GOTO_ERR;
1782               }
1783             }
1784             else
1785             {
1786               DBUG_PRINT("wait",("binary log received update or a broadcast signal caught"));
1787             }
1788           } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
1789           thd->EXIT_COND(&old_stage);
1790         }
1791         break;
1792 
1793         default:
1794           mysql_mutex_unlock(log_lock);
1795           test_for_non_eof_log_read_errors(error, &errmsg);
1796           GOTO_ERR;
1797         }
1798 
1799         if (read_packet)
1800         {
1801           switch (event_type)
1802           {
1803           case ANONYMOUS_GTID_LOG_EVENT:
1804             /* do nothing */
1805             break;
1806           case GTID_LOG_EVENT:
1807             if (gtid_mode == 0)
1808             {
1809               my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1810               errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1811               GOTO_ERR;
1812             }
1813             if (using_gtid_protocol)
1814             {
1815               ulonglong checksum_size=
1816                 ((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1817                   p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
1818                  BINLOG_CHECKSUM_LEN + ev_offset : ev_offset);
1819               Gtid_log_event gtid_ev(packet->ptr() + ev_offset,
1820                                      packet->length() - checksum_size,
1821                                      p_fdle);
1822               skip_group=
1823                 slave_gtid_executed->contains_gtid(gtid_ev.get_sidno(sid_map),
1824                                                gtid_ev.get_gno());
1825               searching_first_gtid= skip_group;
1826               DBUG_PRINT("info", ("Dumping GTID sidno(%d) gno(%lld) "
1827                                   "skip group(%d) searching gtid(%d).",
1828                                   gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(),
1829                                   skip_group, searching_first_gtid));
1830               gtid_event_logged= true;
1831             }
1832             break;
1833 
1834           case STOP_EVENT:
1835           case INCIDENT_EVENT:
1836             skip_group= searching_first_gtid;
1837             break;
1838 
1839           case PREVIOUS_GTIDS_LOG_EVENT:
1840             binlog_has_previous_gtids_log_event= true;
1841             if (gtid_mode == 0)
1842             {
1843               my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1844               errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1845               GOTO_ERR;
1846             }
1847             /* FALLTHROUGH */
1848           case ROTATE_EVENT:
1849             skip_group= false;
1850             break;
1851           default:
1852             if (!binlog_has_previous_gtids_log_event && using_gtid_protocol)
1853               /*
1854                 If we come here, it means we are seeing a 'normal' DML/DDL
1855                 event (e.g. query_log_event) without having seen any
1856                 Previous_gtid_log_event. That means we are in an old
1857                 binlog (no previous_gtids_log_event). When using the GTID
1858                 protocol, that means we must skip the entire binary log
1859                 and jump to the next one.
1860               */
1861               goto_next_binlog= true;
1862             if (!gtid_event_logged && using_gtid_protocol)
1863             {
1864               /*
1865                  Skip groups in the binlogs which don't have any gtid event
1866                  logged before them.
1867               */
1868               skip_group= true;
1869             }
1870 
1871             break;
1872           }
1873 
1874           /* The present event was skipped in a GTID protocol, store the coordinates */
1875           if (skip_group)
1876           {
1877             p_last_skip_coord->pos= p_coord->pos;
1878             strcpy(p_last_skip_coord->file_name, p_coord->file_name);
1879           }
1880 
1881           if (!skip_group && !goto_next_binlog)
1882           {
1883             /* If the last group was skipped, send a HB event */
1884             if (last_skip_group &&
1885                 send_last_skip_group_heartbeat(thd, net, packet,
1886                                                p_last_skip_coord, &ev_offset,
1887                                                current_checksum_alg, &errmsg,
1888                                                observe_transmission))
1889             {
1890               GOTO_ERR;
1891             }
1892 
1893             THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
1894             pos = my_b_tell(&log);
1895             if (observe_transmission &&
1896                 RUN_HOOK(binlog_transmit, before_send_event,
1897                          (thd, flags, packet, log_file_name, pos)))
1898             {
1899               my_errno= ER_UNKNOWN_ERROR;
1900               errmsg= "run 'before_send_event' hook failed";
1901               GOTO_ERR;
1902             }
1903 
1904             if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
1905             {
1906              errmsg = "Failed on my_net_write()";
1907              my_errno= ER_UNKNOWN_ERROR;
1908              GOTO_ERR;
1909             }
1910             last_event_sent_ts= time(0);
1911 
1912             if (event_type == LOAD_EVENT)
1913             {
1914               if (send_file(thd))
1915               {
1916                 errmsg = "failed in send_file()";
1917                 my_errno= ER_UNKNOWN_ERROR;
1918                 GOTO_ERR;
1919               }
1920             }
1921           }
1922 
1923           if(!goto_next_binlog)
1924           {
1925             if (observe_transmission &&
1926                 RUN_HOOK(binlog_transmit, after_send_event,
1927                          (thd, flags, packet, log_file_name,
1928                           skip_group ? pos : 0)))
1929             {
1930               my_errno= ER_UNKNOWN_ERROR;
1931               errmsg= "Failed to run hook 'after_send_event'";
1932               GOTO_ERR;
1933             }
1934           }
1935 
1936           /* Save the skip group for next iteration */
1937           last_skip_group= skip_group;
1938 
1939         }
1940 
1941         log.error=0;
1942       }
1943     }
1944 
1945     if (goto_next_binlog)
1946     {
1947       DBUG_EXECUTE_IF("waiting_for_disable_binlog",
1948                       {
1949                       const char act[]= "now "
1950                       "signal dump_thread_reached_wait_point "
1951                       "wait_for continue_dump_thread no_clear_event";
1952                       DBUG_ASSERT(!debug_sync_set_action(current_thd,
1953                                                          STRING_WITH_LEN(act)));
1954                       };);
1955 
1956       // clear flag because we open a new binlog
1957       binlog_has_previous_gtids_log_event= false;
1958 
1959       THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog);
1960       /*
1961         When moving to next binlog, always check if binlog is enabled or not.
1962         There can be critical errors like for example out of memory scenarios
1963         which prevent mysqld server from writing to binlog. In those cases
1964         server refers to binlog_error_action variable and takes the appropriate
1965         action. If users choose to ignore the error then binary log will be
1966         disabled and server will continue to do its work. In such cases dump
1967         thread which is trying to move to the next log will fail as binlog index
1968         file and binlog file are already closed and their corresponding caches
1969         are also cleared.
1970 
1971         Hence first check if binary log is enabled or not. If enabled look for
1972         the next binary log in the index file. If it is disabled open the index
1973         file once again and check if there any more binary logs that needs to be
1974         sent. Keep reading binary log files until find_next_log returns empty.
1975         If there is an error during open index file or we sent all binary logs
1976         then ER_MASTER_FATAL_ERROR_READING_BINLOG is raised.
1977       */
1978       mysql_bin_log.lock_index();
1979       if (!mysql_bin_log.is_open())
1980       {
1981         if (mysql_bin_log.open_index_file(mysql_bin_log.get_index_fname(),
1982                                           linfo.log_file_name,FALSE))
1983         {
1984           errmsg = "Binary log is not open and failed to open index file to "
1985             "retrieve next file.";
1986           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1987           mysql_bin_log.unlock_index();
1988           GOTO_ERR;
1989         }
1990         is_index_file_reopened_on_binlog_disable= true;
1991       }
1992       if (mysql_bin_log.find_next_log(&linfo, 0))
1993       {
1994         DBUG_EXECUTE_IF("waiting_for_disable_binlog",
1995                         {
1996                         const char act[]= "now signal consumed_binlog";
1997                         DBUG_ASSERT(!debug_sync_set_action(current_thd,
1998                                                            STRING_WITH_LEN(act)));
1999                         };);
2000         errmsg = "could not find next log";
2001         my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2002         mysql_bin_log.unlock_index();
2003         GOTO_ERR;
2004       }
2005       mysql_bin_log.unlock_index();
2006 
2007       end_io_cache(&log);
2008       mysql_file_close(file, MYF(MY_WME));
2009 
2010       /* reset transmit packet for the possible fake rotate event */
2011       if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
2012                                 observe_transmission))
2013         GOTO_ERR;
2014 
2015       /*
2016         Call fake_rotate_event() in case the previous log (the one which
2017         we have just finished reading) did not contain a Rotate event.
2018         There are at least two cases when this can happen:
2019 
2020         - The previous binary log was the last one before the master was
2021           shutdown and restarted.
2022 
2023         - The previous binary log was GTID-free (did not contain a
2024           Previous_gtids_log_event) and the slave is connecting using
2025           the GTID protocol.
2026 
2027         This way we tell the slave about the new log's name and
2028         position.  If the binlog is 5.0 or later, the next event we
2029         are going to read and send is Format_description_log_event.
2030       */
2031       if ((file=open_binlog_file(&log, log_file_name, &errmsg)) < 0 ||
2032           fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
2033                             &errmsg, current_checksum_alg))
2034       {
2035         my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2036         GOTO_ERR;
2037       }
2038 
2039       p_coord->file_name= log_file_name; // reset to the next
2040     }
2041   }
2042 
2043 end:
2044   /*
2045     If the dump thread was killed because of a duplicate slave UUID we
2046     will fail throwing an error to the slave so it will not try to
2047     reconnect anymore.
2048   */
2049   mysql_mutex_lock(&thd->LOCK_thd_data);
2050   was_killed_by_duplicate_slave_id= thd->duplicate_slave_id;
2051   mysql_mutex_unlock(&thd->LOCK_thd_data);
2052   if (was_killed_by_duplicate_slave_id)
2053   {
2054     errmsg= "A slave with the same server_uuid/server_id as this slave "
2055             "has connected to the master";
2056     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2057     goto err;
2058   }
2059   thd->set_stmt_da(saved_da);
2060   end_io_cache(&log);
2061   mysql_file_close(file, MYF(MY_WME));
2062 
2063   if (has_transmit_started)
2064     (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
2065   my_eof(thd);
2066   THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
2067   mysql_mutex_lock(&LOCK_thread_count);
2068   thd->current_linfo = 0;
2069   mysql_mutex_unlock(&LOCK_thread_count);
2070   thd->variables.max_allowed_packet= old_max_allowed_packet;
2071   DBUG_VOID_RETURN;
2072 
2073 err:
2074   THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
2075   if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log))
2076   {
2077     /*
2078        detailing the fatal error message with coordinates
2079        of the last position read.
2080     */
2081     my_snprintf(error_text, sizeof(error_text),
2082                 "%s; the first event '%s' at %lld, "
2083                 "the last event read from '%s' at %lld, "
2084                 "the last byte read from '%s' at %lld.",
2085                 errmsg,
2086                 p_start_coord->file_name, p_start_coord->pos,
2087                 p_coord->file_name, p_coord->pos,
2088                 log_file_name, my_b_tell(&log));
2089   }
2090   else
2091   {
2092     strncpy(error_text, errmsg, sizeof(error_text));
2093     error_text[sizeof(error_text) - 1]= '\0';
2094   }
2095   end_io_cache(&log);
2096   if (is_index_file_reopened_on_binlog_disable)
2097     mysql_bin_log.close(LOG_CLOSE_INDEX, true/*need_lock_log=true*/,
2098 			true/*need_lock_index=true*/);
2099   if (has_transmit_started)
2100     (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
2101   /*
2102     Exclude  iteration through thread list
2103     this is needed for purge_logs() - it will iterate through
2104     thread list and update thd->current_linfo->index_file_offset
2105     this mutex will make sure that it never tried to update our linfo
2106     after we return from this stack frame
2107   */
2108   mysql_mutex_lock(&LOCK_thread_count);
2109   thd->current_linfo = 0;
2110   mysql_mutex_unlock(&LOCK_thread_count);
2111   if (file >= 0)
2112     mysql_file_close(file, MYF(MY_WME));
2113   thd->variables.max_allowed_packet= old_max_allowed_packet;
2114 
2115   thd->set_stmt_da(saved_da);
2116   my_message(my_errno, error_text, MYF(0));
2117   DBUG_VOID_RETURN;
2118 }
2119 
2120 
/**
  Auxiliary function that extracts the slave UUID from the "slave_uuid"
  user variable of a session.

  @param[in]    thd    THD whose user variables are searched.
  @param[out]   value  String that receives a copy of the UUID value.

  @return       value on success; NULL if value is NULL or if the user
                variable does not exist or is empty.
*/
get_slave_uuid(THD * thd,String * value)2129 String *get_slave_uuid(THD *thd, String *value)
2130 {
2131   uchar name[]= "slave_uuid";
2132 
2133   if (value == NULL)
2134     return NULL;
2135   Mutex_lock lock_guard(&thd->LOCK_thd_data);
2136   user_var_entry *entry=
2137     (user_var_entry*) my_hash_search(&thd->user_vars, name, sizeof(name)-1);
2138   if (entry && entry->length() > 0)
2139   {
2140     value->copy(entry->ptr(), entry->length(), NULL);
2141     return value;
2142   }
2143 
2144   return NULL;
2145 }
2146 
2147 /*
2148 
  Kill all Binlog_dump threads which previously talked to the same slave
  ("same" means with the same UUID (for slave versions >= 5.6) or the same
  server id (for slave versions < 5.6)). Indeed, if the slave stops while the
  Binlog_dump thread is waiting (mysql_cond_wait) for a binlog update, then
  that thread will keep existing until a query is written to the binlog. If
  the master is idle, then this could last long, and if the slave reconnects,
  we could have 2 Binlog_dump threads in SHOW PROCESSLIST, until a query is
  written to the binlog. To avoid this, when the slave reconnects and sends
  COM_BINLOG_DUMP, the master kills any existing thread with the slave's
  UUID/server id (if this id is not zero; it will be true for real slaves, but
  false for mysqlbinlog when it sends COM_BINLOG_DUMP to get a remote binlog
  dump).
2160 
2161   SYNOPSIS
2162     kill_zombie_dump_threads()
2163     @param thd newly connected dump thread object
2164 
2165 */
2166 
kill_zombie_dump_threads(THD * thd)2167 void kill_zombie_dump_threads(THD *thd)
2168 {
2169   String slave_uuid;
2170   get_slave_uuid(thd, &slave_uuid);
2171   if (slave_uuid.length() == 0 && thd->server_id == 0)
2172     return;
2173 
2174   mysql_mutex_lock(&LOCK_thread_count);
2175   THD *tmp= NULL;
2176   Thread_iterator it= global_thread_list_begin();
2177   Thread_iterator end= global_thread_list_end();
2178   bool is_zombie_thread= false;
2179   for (; it != end; ++it)
2180   {
2181     if ((*it) != thd && ((*it)->get_command() == COM_BINLOG_DUMP ||
2182                          (*it)->get_command() == COM_BINLOG_DUMP_GTID))
2183     {
2184       String tmp_uuid;
2185       get_slave_uuid((*it), &tmp_uuid);
2186       if (slave_uuid.length())
2187       {
2188         is_zombie_thread= (tmp_uuid.length() &&
2189                            !strncmp(slave_uuid.c_ptr(),
2190                                     tmp_uuid.c_ptr(), UUID_LENGTH));
2191       }
2192       else
2193       {
2194         /*
2195           Check if it is a 5.5 slave's dump thread i.e., server_id should be
2196           same && dump thread should not contain 'UUID'.
2197         */
2198         is_zombie_thread= (((*it)->server_id == thd->server_id) &&
2199                            !tmp_uuid.length());
2200       }
2201       if (is_zombie_thread)
2202       {
2203         tmp= *it;
2204         mysql_mutex_lock(&tmp->LOCK_thd_data);	// Lock from delete
2205         break;
2206       }
2207     }
2208   }
2209   mysql_mutex_unlock(&LOCK_thread_count);
2210   if (tmp)
2211   {
2212     /*
2213       Here we do not call kill_one_thread() as
2214       it will be slow because it will iterate through the list
2215       again. We just to do kill the thread ourselves.
2216     */
2217     if (log_warnings > 1)
2218     {
2219       if (slave_uuid.length())
2220       {
2221         sql_print_information("While initializing dump thread for slave with "
2222                               "UUID <%s>, found a zombie dump thread with the "
2223                               "same UUID. Master is killing the zombie dump "
2224                               "thread(%lu).", slave_uuid.c_ptr(),
2225                               tmp->thread_id);
2226       }
2227       else
2228       {
2229         sql_print_information("While initializing dump thread for slave with "
2230                               "server_id <%u>, found a zombie dump thread with the "
2231                               "same server_id. Master is killing the zombie dump "
2232                               "thread(%lu).", thd->server_id,
2233                               tmp->thread_id);
2234       }
2235     }
2236     tmp->duplicate_slave_id= true;
2237     tmp->awake(THD::KILL_QUERY);
2238     mysql_mutex_unlock(&tmp->LOCK_thd_data);
2239   }
2240 }
2241 
2242 /**
2243   Execute a RESET MASTER statement.
2244 
2245   @param thd Pointer to THD object of the client thread executing the
2246   statement.
2247 
2248   @retval 0 success
2249   @retval 1 error
2250 */
reset_master(THD * thd)2251 int reset_master(THD* thd)
2252 {
2253   if (!mysql_bin_log.is_open())
2254   {
2255     my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
2256                ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
2257     return 1;
2258   }
2259 
2260   if (mysql_bin_log.reset_logs(thd))
2261     return 1;
2262   (void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
2263   return 0;
2264 }
2265 
2266 
2267 /**
2268   Execute a SHOW MASTER STATUS statement.
2269 
2270   @param thd Pointer to THD object for the client thread executing the
2271   statement.
2272 
2273   @retval FALSE success
2274   @retval TRUE failure
2275 */
show_master_status(THD * thd)2276 bool show_master_status(THD* thd)
2277 {
2278   Protocol *protocol= thd->protocol;
2279   char* gtid_set_buffer= NULL;
2280   int gtid_set_size= 0;
2281   List<Item> field_list;
2282 
2283   DBUG_ENTER("show_binlog_info");
2284 
2285   global_sid_lock->wrlock();
2286   const Gtid_set* gtid_set= gtid_state->get_logged_gtids();
2287   if ((gtid_set_size= gtid_set->to_string(&gtid_set_buffer)) < 0)
2288   {
2289     global_sid_lock->unlock();
2290     my_eof(thd);
2291     my_free(gtid_set_buffer);
2292     DBUG_RETURN(true);
2293   }
2294   global_sid_lock->unlock();
2295 
2296   field_list.push_back(new Item_empty_string("File", FN_REFLEN));
2297   field_list.push_back(new Item_return_int("Position",20,
2298 					   MYSQL_TYPE_LONGLONG));
2299   field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
2300   field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
2301   field_list.push_back(new Item_empty_string("Executed_Gtid_Set",
2302                                              gtid_set_size));
2303 
2304   if (protocol->send_result_set_metadata(&field_list,
2305                             Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2306   {
2307     my_free(gtid_set_buffer);
2308     DBUG_RETURN(true);
2309   }
2310   protocol->prepare_for_resend();
2311 
2312   if (mysql_bin_log.is_open())
2313   {
2314     LOG_INFO li;
2315     mysql_bin_log.get_current_log(&li);
2316     int dir_len = dirname_length(li.log_file_name);
2317     protocol->store(li.log_file_name + dir_len, &my_charset_bin);
2318     protocol->store((ulonglong) li.pos);
2319     protocol->store(binlog_filter->get_do_db());
2320     protocol->store(binlog_filter->get_ignore_db());
2321     protocol->store(gtid_set_buffer, &my_charset_bin);
2322     if (protocol->write())
2323     {
2324       my_free(gtid_set_buffer);
2325       DBUG_RETURN(true);
2326     }
2327   }
2328   my_eof(thd);
2329   my_free(gtid_set_buffer);
2330   DBUG_RETURN(false);
2331 }
2332 
2333 
2334 /**
2335   Execute a SHOW BINARY LOGS statement.
2336 
2337   @param thd Pointer to THD object for the client thread executing the
2338   statement.
2339 
2340   @retval FALSE success
2341   @retval TRUE failure
2342 */
show_binlogs(THD * thd)2343 bool show_binlogs(THD* thd)
2344 {
2345   IO_CACHE *index_file;
2346   LOG_INFO cur;
2347   File file;
2348   char fname[FN_REFLEN];
2349   List<Item> field_list;
2350   uint length;
2351   int cur_dir_len;
2352   Protocol *protocol= thd->protocol;
2353   DBUG_ENTER("show_binlogs");
2354 
2355   if (!mysql_bin_log.is_open())
2356   {
2357     my_error(ER_NO_BINARY_LOGGING, MYF(0));
2358     DBUG_RETURN(TRUE);
2359   }
2360 
2361   field_list.push_back(new Item_empty_string("Log_name", 255));
2362   field_list.push_back(new Item_return_int("File_size", 20,
2363                                            MYSQL_TYPE_LONGLONG));
2364   if (protocol->send_result_set_metadata(&field_list,
2365                             Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2366     DBUG_RETURN(TRUE);
2367 
2368   mysql_mutex_lock(mysql_bin_log.get_log_lock());
2369   DEBUG_SYNC(thd, "show_binlogs_after_lock_log_before_lock_index");
2370   mysql_bin_log.lock_index();
2371   index_file=mysql_bin_log.get_index_file();
2372 
2373   mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
2374   mysql_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
2375 
2376   cur_dir_len= dirname_length(cur.log_file_name);
2377 
2378   reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
2379 
2380   /* The file ends with EOF or empty line */
2381   while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
2382   {
2383     int dir_len;
2384     ulonglong file_length= 0;                   // Length if open fails
2385     fname[--length] = '\0';                     // remove the newline
2386 
2387     protocol->prepare_for_resend();
2388     dir_len= dirname_length(fname);
2389     length-= dir_len;
2390     protocol->store(fname + dir_len, length, &my_charset_bin);
2391 
2392     if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
2393       file_length= cur.pos;  /* The active log, use the active position */
2394     else
2395     {
2396       /* this is an old log, open it and find the size */
2397       if ((file= mysql_file_open(key_file_binlog,
2398                                  fname, O_RDONLY | O_SHARE | O_BINARY,
2399                                  MYF(0))) >= 0)
2400       {
2401         file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
2402         mysql_file_close(file, MYF(0));
2403       }
2404     }
2405     protocol->store(file_length);
2406     if (protocol->write())
2407     {
2408       DBUG_PRINT("info", ("stopping dump thread because protocol->write failed at line %d", __LINE__));
2409       goto err;
2410     }
2411   }
2412   if(index_file->error == -1)
2413     goto err;
2414   mysql_bin_log.unlock_index();
2415   my_eof(thd);
2416   DBUG_RETURN(FALSE);
2417 
2418 err:
2419   mysql_bin_log.unlock_index();
2420   DBUG_RETURN(TRUE);
2421 }
2422 
2423 #endif /* HAVE_REPLICATION */
2424