1 /* Copyright (c) 2010, 2019, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23
24 #include "sql_priv.h"
25 #include "unireg.h"
26 #include "sql_parse.h" // check_access
27 #include "global_threads.h"
28 #include "mutex_lock.h" // Mutex_lock
29 #ifdef HAVE_REPLICATION
30
31 #include "sql_acl.h" // SUPER_ACL
32 #include "log_event.h"
33 #include "rpl_filter.h"
34 #include <my_dir.h>
35 #include "rpl_handler.h"
36 #include "rpl_master.h"
37 #include "debug_sync.h"
38
/* Seeds the debug-only left_events counter of mysql_binlog_send(). */
int max_binlog_dump_events = 0; // unlimited
/*
  When set, debug builds of mysql_binlog_send() fail every second dump
  request (see binlog_dump_count below).
*/
my_bool opt_sporadic_binlog_dump_fail = 0;

#ifndef DBUG_OFF
/*
  Incremented for each dump request evaluated while
  opt_sporadic_binlog_dump_fail is set; odd values make the request fail.
*/
static int binlog_dump_count = 0;
#endif

/* Size argument handed to my_hash_init() when slave_list is created. */
#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
/*
  Registered slaves (SLAVE_INFO records) keyed by their 4-byte server_id.
  Accessed under LOCK_slave_list.
*/
HASH slave_list;
extern TYPELIB binlog_checksum_typelib;
50
51
/*
  Read one length-prefixed string from the packet cursor `p` into the
  fixed-size character array `obj`, then advance `p` past it.

  The first byte at `p` is the string length. The macro is only usable
  inside a function that provides the locals `p_end` (end of the packet),
  `si` (heap record to release on a malformed packet), `errmsg`, and an
  `err` label:

    - if no byte is left for the length, ER_MALFORMED_PACKET is raised,
      `si` is freed and the enclosing function returns 1;
    - if the string runs past `p_end` or would not fit in `obj` together
      with its terminator, `errmsg` is set to `msg` and control jumps to
      `err`.
*/
#define get_object(p, obj, msg) \
{\
  uint len; \
  if (p >= p_end) \
  { \
    my_error(ER_MALFORMED_PACKET, MYF(0)); \
    my_free(si); \
    return 1; \
  } \
  len= (uint)*p++; \
  if (p + len > p_end || len >= sizeof(obj)) \
  {\
    errmsg= msg;\
    goto err; \
  }\
  strmake(obj,(char*) p,len); \
  p+= len; \
}\
70
71 extern "C" uint32
slave_list_key(SLAVE_INFO * si,size_t * len,my_bool not_used MY_ATTRIBUTE ((unused)))72 *slave_list_key(SLAVE_INFO* si, size_t *len,
73 my_bool not_used MY_ATTRIBUTE((unused)))
74 {
75 *len = 4;
76 return &si->server_id;
77 }
78
slave_info_free(void * s)79 extern "C" void slave_info_free(void *s)
80 {
81 my_free(s);
82 }
83
#ifdef HAVE_PSI_INTERFACE
/* Instrumentation key under which LOCK_slave_list is registered. */
static PSI_mutex_key key_LOCK_slave_list;

/* Instrumented mutexes owned by the slave registry. */
static PSI_mutex_info all_slave_list_mutexes[]=
{
  { &key_LOCK_slave_list, "LOCK_slave_list", PSI_FLAG_GLOBAL}
};

/**
  Register every mutex listed in all_slave_list_mutexes under the "sql"
  category.
*/
static void init_all_slave_list_mutexes(void)
{
  const int mutex_count= array_elements(all_slave_list_mutexes);

  mysql_mutex_register("sql", all_slave_list_mutexes, mutex_count);
}
#endif /* HAVE_PSI_INTERFACE */
100
init_slave_list()101 void init_slave_list()
102 {
103 #ifdef HAVE_PSI_INTERFACE
104 init_all_slave_list_mutexes();
105 #endif
106
107 my_hash_init(&slave_list, system_charset_info, SLAVE_LIST_CHUNK, 0, 0,
108 (my_hash_get_key) slave_list_key,
109 (my_hash_free_key) slave_info_free, 0);
110 mysql_mutex_init(key_LOCK_slave_list, &LOCK_slave_list, MY_MUTEX_INIT_FAST);
111 }
112
end_slave_list()113 void end_slave_list()
114 {
115 /* No protection by a mutex needed as we are only called at shutdown */
116 if (my_hash_inited(&slave_list))
117 {
118 my_hash_free(&slave_list);
119 mysql_mutex_destroy(&LOCK_slave_list);
120 }
121 }
122
123 /**
124 Register slave in 'slave_list' hash table.
125
126 @return
127 0 ok
128 @return
129 1 Error. Error message sent to client
130 */
131
register_slave(THD * thd,uchar * packet,uint packet_length)132 int register_slave(THD* thd, uchar* packet, uint packet_length)
133 {
134 int res;
135 SLAVE_INFO *si;
136 uchar *p= packet, *p_end= packet + packet_length;
137 const char *errmsg= "Wrong parameters to function register_slave";
138
139 if (check_access(thd, REPL_SLAVE_ACL, any_db, NULL, NULL, 0, 0))
140 return 1;
141 if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
142 goto err2;
143
144 /* 4 bytes for the server id */
145 if (p + 4 > p_end)
146 {
147 my_error(ER_MALFORMED_PACKET, MYF(0));
148 my_free(si);
149 return 1;
150 }
151
152 thd->server_id= si->server_id= uint4korr(p);
153 p+= 4;
154 get_object(p,si->host, "Failed to register slave: too long 'report-host'");
155 get_object(p,si->user, "Failed to register slave: too long 'report-user'");
156 get_object(p,si->password, "Failed to register slave; too long 'report-password'");
157 if (p+10 > p_end)
158 goto err;
159 si->port= uint2korr(p);
160 p += 2;
161 /*
162 We need to by pass the bytes used in the fake rpl_recovery_rank
163 variable. It was removed in patch for BUG#13963. But this would
164 make a server with that patch unable to connect to an old master.
165 See: BUG#49259
166 */
167 p += 4;
168 if (!(si->master_id= uint4korr(p)))
169 si->master_id= server_id;
170 si->thd= thd;
171
172 mysql_mutex_lock(&LOCK_slave_list);
173 unregister_slave(thd, false, false/*need_lock_slave_list=false*/);
174 res= my_hash_insert(&slave_list, (uchar*) si);
175 mysql_mutex_unlock(&LOCK_slave_list);
176 return res;
177
178 err:
179 my_free(si);
180 my_message(ER_UNKNOWN_ERROR, errmsg, MYF(0)); /* purecov: inspected */
181 err2:
182 return 1;
183 }
184
unregister_slave(THD * thd,bool only_mine,bool need_lock_slave_list)185 void unregister_slave(THD* thd, bool only_mine, bool need_lock_slave_list)
186 {
187 if (thd->server_id)
188 {
189 if (need_lock_slave_list)
190 mysql_mutex_lock(&LOCK_slave_list);
191 else
192 mysql_mutex_assert_owner(&LOCK_slave_list);
193
194 SLAVE_INFO* old_si;
195 if ((old_si = (SLAVE_INFO*)my_hash_search(&slave_list,
196 (uchar*)&thd->server_id, 4)) &&
197 (!only_mine || old_si->thd == thd))
198 my_hash_delete(&slave_list, (uchar*)old_si);
199
200 if (need_lock_slave_list)
201 mysql_mutex_unlock(&LOCK_slave_list);
202 }
203 }
204
205
206 /**
207 Execute a SHOW SLAVE HOSTS statement.
208
209 @param thd Pointer to THD object for the client thread executing the
210 statement.
211
212 @retval FALSE success
213 @retval TRUE failure
214 */
show_slave_hosts(THD * thd)215 bool show_slave_hosts(THD* thd)
216 {
217 List<Item> field_list;
218 Protocol *protocol= thd->protocol;
219 DBUG_ENTER("show_slave_hosts");
220
221 field_list.push_back(new Item_return_int("Server_id", 10,
222 MYSQL_TYPE_LONG));
223 field_list.push_back(new Item_empty_string("Host", 20));
224 if (opt_show_slave_auth_info)
225 {
226 field_list.push_back(new Item_empty_string("User",20));
227 field_list.push_back(new Item_empty_string("Password",20));
228 }
229 field_list.push_back(new Item_return_int("Port", 7, MYSQL_TYPE_LONG));
230 field_list.push_back(new Item_return_int("Master_id", 10,
231 MYSQL_TYPE_LONG));
232 field_list.push_back(new Item_empty_string("Slave_UUID", UUID_LENGTH));
233
234 if (protocol->send_result_set_metadata(&field_list,
235 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
236 DBUG_RETURN(TRUE);
237
238 mysql_mutex_lock(&LOCK_slave_list);
239
240 for (uint i = 0; i < slave_list.records; ++i)
241 {
242 SLAVE_INFO* si = (SLAVE_INFO*) my_hash_element(&slave_list, i);
243 protocol->prepare_for_resend();
244 protocol->store((uint32) si->server_id);
245 protocol->store(si->host, &my_charset_bin);
246 if (opt_show_slave_auth_info)
247 {
248 protocol->store(si->user, &my_charset_bin);
249 protocol->store(si->password, &my_charset_bin);
250 }
251 protocol->store((uint32) si->port);
252 protocol->store((uint32) si->master_id);
253
254 /* get slave's UUID */
255 String slave_uuid;
256 if (get_slave_uuid(si->thd, &slave_uuid))
257 protocol->store(slave_uuid.c_ptr_safe(), &my_charset_bin);
258 if (protocol->write())
259 {
260 mysql_mutex_unlock(&LOCK_slave_list);
261 DBUG_RETURN(TRUE);
262 }
263 }
264 mysql_mutex_unlock(&LOCK_slave_list);
265 my_eof(thd);
266 DBUG_RETURN(FALSE);
267 }
268
269
270 /**
271 Internal to mysql_binlog_send() routine that recalculates checksum for
272 a FD event (asserted) that needs additional arranment prior sending to slave.
273 */
fix_checksum(String * packet,ulong ev_offset)274 inline void fix_checksum(String *packet, ulong ev_offset)
275 {
276 /* recalculate the crc for this event */
277 uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
278 ha_checksum crc= my_checksum(0L, NULL, 0);
279 DBUG_ASSERT(data_len ==
280 LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
281 BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN);
282 crc= my_checksum(crc, (uchar *)packet->ptr() + ev_offset, data_len -
283 BINLOG_CHECKSUM_LEN);
284 int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc);
285 }
286
287
get_binlog_checksum_uservar(THD * thd)288 static user_var_entry * get_binlog_checksum_uservar(THD * thd)
289 {
290 LEX_STRING name= { C_STRING_WITH_LEN("master_binlog_checksum")};
291 user_var_entry *entry=
292 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
293 name.length);
294 return entry;
295 }
296
297 /**
298 Function for calling in mysql_binlog_send
299 to check if slave initiated checksum-handshake.
300
301 @param[in] thd THD to access a user variable
302
303 @return TRUE if handshake took place, FALSE otherwise
304 */
305
is_slave_checksum_aware(THD * thd)306 static bool is_slave_checksum_aware(THD * thd)
307 {
308 DBUG_ENTER("is_slave_checksum_aware");
309 user_var_entry *entry= get_binlog_checksum_uservar(thd);
310 DBUG_RETURN(entry? true : false);
311 }
312
313 /**
314 Function for calling in mysql_binlog_send
315 to get the value of @@binlog_checksum of the master at
316 time of checksum-handshake.
317
318 The value tells the master whether to compute or not, and the slave
319 to verify or not the first artificial Rotate event's checksum.
320
321 @param[in] thd THD to access a user variable
322
323 @return value of @@binlog_checksum alg according to
324 @c enum enum_binlog_checksum_alg
325 */
326
get_binlog_checksum_value_at_connect(THD * thd)327 static uint8 get_binlog_checksum_value_at_connect(THD * thd)
328 {
329 uint8 ret;
330
331 DBUG_ENTER("get_binlog_checksum_value_at_connect");
332 user_var_entry *entry= get_binlog_checksum_uservar(thd);
333 if (!entry)
334 {
335 ret= BINLOG_CHECKSUM_ALG_UNDEF;
336 }
337 else
338 {
339 DBUG_ASSERT(entry->type() == STRING_RESULT);
340 String str;
341 uint dummy_errors;
342 str.copy(entry->ptr(), entry->length(), &my_charset_bin, &my_charset_bin,
343 &dummy_errors);
344 ret= (uint8) find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1;
345 DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg
346 }
347 DBUG_RETURN(ret);
348 }
349
350 /*
351 fake_rotate_event() builds a fake (=which does not exist physically in any
352 binlog) Rotate event, which contains the name of the binlog we are going to
353 send to the slave (because the slave may not know it if it just asked for
354 MASTER_LOG_FILE='', MASTER_LOG_POS=4).
355 < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
356 After this version we always call it, so that a 3.23.58 slave can rely on
357 it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
358 zeros in the good positions which, by chance, make it possible for the 3.23
359 slave to detect that this event is unexpected) (this is luck which happens
360 because the master and slave disagree on the size of the header of
361 Log_event).
362
363 Relying on the event length of the Rotate event instead of these
364 well-placed zeros was not possible as Rotate events have a variable-length
365 part.
366 */
367
fake_rotate_event(NET * net,String * packet,char * log_file_name,ulonglong position,const char ** errmsg,uint8 checksum_alg_arg)368 static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
369 ulonglong position, const char** errmsg,
370 uint8 checksum_alg_arg)
371 {
372 DBUG_ENTER("fake_rotate_event");
373 char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
374
375 /*
376 this Rotate is to be sent with checksum if and only if
377 slave's get_master_version_and_clock time handshake value
378 of master's @@global.binlog_checksum was TRUE
379 */
380
381 my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
382 checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
383
384 /*
385 'when' (the timestamp) is set to 0 so that slave could distinguish between
386 real and fake Rotate events (if necessary)
387 */
388 memset(header, 0, 4);
389 header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
390
391 char* p = log_file_name+dirname_length(log_file_name);
392 uint ident_len = (uint) strlen(p);
393 ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
394 (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
395 int4store(header + SERVER_ID_OFFSET, server_id);
396 int4store(header + EVENT_LEN_OFFSET, event_len);
397 int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
398
399 // TODO: check what problems this may cause and fix them
400 int4store(header + LOG_POS_OFFSET, 0);
401
402 packet->append(header, sizeof(header));
403 int8store(buf+R_POS_OFFSET,position);
404 packet->append(buf, ROTATE_HEADER_LEN);
405 packet->append(p, ident_len);
406
407 if (do_checksum)
408 {
409 char b[BINLOG_CHECKSUM_LEN];
410 ha_checksum crc= my_checksum(0L, NULL, 0);
411 crc= my_checksum(crc, (uchar*)header, sizeof(header));
412 crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
413 crc= my_checksum(crc, (uchar*)p, ident_len);
414 int4store(b, crc);
415 packet->append(b, sizeof(b));
416 }
417
418 if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
419 {
420 *errmsg = "failed on my_net_write()";
421 DBUG_RETURN(-1);
422 }
423 DBUG_RETURN(0);
424 }
425
426 /*
427 Reset thread transmit packet buffer for event sending
428
429 This function allocates header bytes for event transmission, and
430 should be called before store the event data to the packet buffer.
431 */
reset_transmit_packet(THD * thd,ushort flags,ulong * ev_offset,const char ** errmsg,bool observe_transmission)432 static int reset_transmit_packet(THD *thd, ushort flags,
433 ulong *ev_offset, const char **errmsg,
434 bool observe_transmission)
435 {
436 int ret= 0;
437 String *packet= &thd->packet;
438
439 /* reserve and set default header */
440 packet->length(0);
441 packet->set("\0", 1, &my_charset_bin);
442
443 if (observe_transmission &&
444 RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
445 {
446 *errmsg= "Failed to run hook 'reserve_header'";
447 my_errno= ER_UNKNOWN_ERROR;
448 ret= 1;
449 }
450 *ev_offset= packet->length();
451 DBUG_PRINT("info", ("rpl_master.cc:reset_transmit_packet returns %d", ret));
452 return ret;
453 }
454
send_file(THD * thd)455 static int send_file(THD *thd)
456 {
457 NET* net = &thd->net;
458 int fd = -1, error = 1;
459 size_t bytes;
460 char fname[FN_REFLEN+1];
461 const char *errmsg = 0;
462 int old_timeout;
463 unsigned long packet_len;
464 uchar buf[IO_SIZE]; // It's safe to alloc this
465 DBUG_ENTER("send_file");
466
467 /*
468 The client might be slow loading the data, give him wait_timeout to do
469 the job
470 */
471 old_timeout= net->read_timeout;
472 my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
473
474 /*
475 We need net_flush here because the client will not know it needs to send
476 us the file name until it has processed the load event entry
477 */
478 if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
479 {
480 errmsg = "while reading file name";
481 goto err;
482 }
483
484 // terminate with \0 for fn_format
485 *((char*)net->read_pos + packet_len) = 0;
486 fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
487 // this is needed to make replicate-ignore-db
488 if (!strcmp(fname,"/dev/null"))
489 goto end;
490
491 if ((fd= mysql_file_open(key_file_send_file,
492 fname, O_RDONLY, MYF(0))) < 0)
493 {
494 errmsg = "on open of file";
495 goto err;
496 }
497
498 while ((long) (bytes= mysql_file_read(fd, buf, IO_SIZE, MYF(0))) > 0)
499 {
500 if (my_net_write(net, buf, bytes))
501 {
502 errmsg = "while writing data to client";
503 goto err;
504 }
505 }
506
507 end:
508 if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
509 (my_net_read(net) == packet_error))
510 {
511 errmsg = "while negotiating file transfer close";
512 goto err;
513 }
514 error = 0;
515
516 err:
517 my_net_set_read_timeout(net, old_timeout);
518 if (fd >= 0)
519 mysql_file_close(fd, MYF(0));
520 if (errmsg)
521 {
522 sql_print_error("Failed in send_file() %s", errmsg);
523 DBUG_PRINT("error", ("%s", errmsg));
524 }
525 DBUG_RETURN(error);
526 }
527
528
test_for_non_eof_log_read_errors(int error,const char ** errmsg)529 int test_for_non_eof_log_read_errors(int error, const char **errmsg)
530 {
531 if (error == LOG_READ_EOF)
532 return 0;
533 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
534 switch (error) {
535 case LOG_READ_BOGUS:
536 *errmsg = "bogus data in log event";
537 break;
538 case LOG_READ_TOO_LARGE:
539 *errmsg = "log event entry exceeded max_allowed_packet; \
540 Increase max_allowed_packet on master";
541 break;
542 case LOG_READ_IO:
543 *errmsg = "I/O error reading log event";
544 break;
545 case LOG_READ_MEM:
546 *errmsg = "memory allocation failed reading log event";
547 break;
548 case LOG_READ_TRUNC:
549 *errmsg = "binlog truncated in the middle of event; consider out of disk space on master";
550 break;
551 case LOG_READ_CHECKSUM_FAILURE:
552 *errmsg = "event read from binlog did not pass crc check";
553 break;
554 default:
555 *errmsg = "unknown error reading log event on the master";
556 break;
557 }
558 return error;
559 }
560
561
562 /**
563 An auxiliary function for calling in mysql_binlog_send
564 to initialize the heartbeat timeout in waiting for a binlogged event.
565
566 @param[in] thd THD to access a user variable
567
568 @return heartbeat period an ulonglong of nanoseconds
569 or zero if heartbeat was not demanded by slave
570 */
get_heartbeat_period(THD * thd)571 static ulonglong get_heartbeat_period(THD * thd)
572 {
573 my_bool null_value;
574 LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
575 user_var_entry *entry=
576 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
577 name.length);
578 return entry? entry->val_int(&null_value) : 0;
579 }
580
581 /*
582 Function prepares and sends repliation heartbeat event.
583
584 @param net net object of THD
585 @param packet buffer to store the heartbeat instance
586 @param event_coordinates binlog file name and position of the last
587 real event master sent from binlog
588
589 @note
590 Among three essential pieces of heartbeat data Log_event::when
591 is computed locally.
592 The error to send is serious and should force terminating
593 the dump thread.
594 */
send_heartbeat_event(NET * net,String * packet,const struct event_coordinates * coord,uint8 checksum_alg_arg)595 static int send_heartbeat_event(NET* net, String* packet,
596 const struct event_coordinates *coord,
597 uint8 checksum_alg_arg)
598 {
599 DBUG_ENTER("send_heartbeat_event");
600 char header[LOG_EVENT_HEADER_LEN];
601 my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
602 checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
603 /*
604 'when' (the timestamp) is set to 0 so that slave could distinguish between
605 real and fake Rotate events (if necessary)
606 */
607 memset(header, 0, 4); // when
608
609 header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
610
611 char* p= coord->file_name + dirname_length(coord->file_name);
612
613 uint ident_len = strlen(p);
614 ulong event_len = ident_len + LOG_EVENT_HEADER_LEN +
615 (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
616 int4store(header + SERVER_ID_OFFSET, server_id);
617 int4store(header + EVENT_LEN_OFFSET, event_len);
618 int2store(header + FLAGS_OFFSET, 0);
619
620 int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
621
622 packet->append(header, sizeof(header));
623 packet->append(p, ident_len); // log_file_name
624
625 if (do_checksum)
626 {
627 char b[BINLOG_CHECKSUM_LEN];
628 ha_checksum crc= my_checksum(0L, NULL, 0);
629 crc= my_checksum(crc, (uchar*) header, sizeof(header));
630 crc= my_checksum(crc, (uchar*) p, ident_len);
631 int4store(b, crc);
632 packet->append(b, sizeof(b));
633 }
634
635 if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
636 net_flush(net))
637 {
638 DBUG_RETURN(-1);
639 }
640 DBUG_RETURN(0);
641 }
642
643
644 /**
645 Reset and send a heartbeat event to the slave. This function is *only* used
646 to send heartbeat events which carry binary log position of the *last
647 skipped transaction*. Since thd->packet is used to send events to the
648 slave and the packet currently holds an event, packet state is stored
649 first. A heartbeat event is sent to the slave and the state is restored
650 later. Note that, the caller has to send the last skipped coordinates
651 to this function.
652
653 @param[in] net This master-slave network handler
654 @param[in] packet packet that is to be sent to the slave.
655 @param[in] last_skip_coord coordinates for last skipped transaction
656 @param[in] ev_offset event offset
657 @param[in] checksum_alg_arg checksum alg used
658 @param[in,out] errmsg generated error message
659
660 @retval 0 ok
661 @retval -1 error
662 */
send_last_skip_group_heartbeat(THD * thd,NET * net,String * packet,const struct event_coordinates * last_skip_coord,ulong * ev_offset,uint8 checksum_alg_arg,const char ** errmsg,bool observe_transmission)663 static int send_last_skip_group_heartbeat(THD *thd, NET* net, String *packet,
664 const struct event_coordinates *last_skip_coord,
665 ulong *ev_offset,
666 uint8 checksum_alg_arg, const char **errmsg,
667 bool observe_transmission)
668 {
669 DBUG_ENTER("send_last_skip_group_heartbeat");
670 String save_packet;
671 int save_offset= *ev_offset;
672
673 /* Save the current read packet */
674 save_packet.swap(*packet);
675
676 if (reset_transmit_packet(thd, 0, ev_offset, errmsg, observe_transmission))
677 DBUG_RETURN(-1);
678
679 /* Send heart beat event to the slave to update slave threads coordinates */
680 if (send_heartbeat_event(net, packet, last_skip_coord, checksum_alg_arg))
681 {
682 *errmsg= "Failed on my_net_write()";
683 my_errno= ER_UNKNOWN_ERROR;
684 DBUG_RETURN(-1);
685 }
686
687 /* Restore the packet and event offset */
688 packet->swap(save_packet);
689 *ev_offset= save_offset;
690
691 DBUG_PRINT("info", ("rpl_master.cc:send_last_skip_group_heartbeat returns 0"));
692 DBUG_RETURN(0);
693 }
694
695
/**
  If there are less than BYTES bytes left to read in the packet,
  report error.

  Like all macros of this family, it expects the enclosing function to
  provide the locals packet_position (read cursor) and packet_bytes_todo
  (bytes left), and a label error_malformed_packet to jump to.
*/
#define CHECK_PACKET_SIZE(BYTES)                                        \
  do {                                                                  \
    if (packet_bytes_todo < BYTES)                                      \
      goto error_malformed_packet;                                      \
  } while (0)

/**
  Auxiliary macro used to define READ_INT and READ_STRING.

  Check that there are at least BYTES more bytes to read, then read
  the bytes by evaluating the given DECODE statement, then advance the
  reading position and decrease the number of bytes left.
*/
#define READ(DECODE, BYTES)                                             \
  do {                                                                  \
    CHECK_PACKET_SIZE(BYTES);                                           \
    DECODE;                                                             \
    packet_position+= BYTES;                                            \
    packet_bytes_todo-= BYTES;                                          \
  } while (0)

/**
  Check that there are at least BYTES more bytes to read, then advance
  the reading position past them without decoding anything.
*/
#define SKIP(BYTES) READ((void)(0), BYTES)

/**
  Check that there are at least BYTES more bytes to read, then read
  the bytes and decode them into the given integer VAR, then advance
  the reading position.

  BYTES is pasted into the decoder name (uint<BYTES>korr), so it must be
  a literal for which such a decoder exists.
*/
#define READ_INT(VAR, BYTES)                                            \
  READ(VAR= uint ## BYTES ## korr(packet_position), BYTES)

/**
  Check that there are at least BYTES more bytes to read and that
  BYTES+1 is not greater than BUFFER_SIZE, then read the bytes into
  the given variable VAR, then advance the reading position.

  VAR is NUL-terminated after the copy, which is why one extra byte of
  room is required.
*/
#define READ_STRING(VAR, BYTES, BUFFER_SIZE)                            \
  do {                                                                  \
    if (BUFFER_SIZE <= BYTES)                                           \
      goto error_malformed_packet;                                      \
    READ(memcpy(VAR, packet_position, BYTES), BYTES);                   \
    VAR[BYTES]= '\0';                                                   \
  } while (0)
743
744
com_binlog_dump(THD * thd,char * packet,uint packet_length)745 bool com_binlog_dump(THD *thd, char *packet, uint packet_length)
746 {
747 DBUG_ENTER("com_binlog_dump");
748 ulong pos;
749 ushort flags= 0;
750 const uchar* packet_position= (uchar *) packet;
751 uint packet_bytes_todo= packet_length;
752
753 status_var_increment(thd->status_var.com_other);
754 thd->enable_slow_log= opt_log_slow_admin_statements;
755 if (check_global_access(thd, REPL_SLAVE_ACL))
756 DBUG_RETURN(false);
757
758 /*
759 4 bytes is too little, but changing the protocol would break
760 compatibility. This has been fixed in the new protocol. @see
761 com_binlog_dump_gtid().
762 */
763 READ_INT(pos, 4);
764 READ_INT(flags, 2);
765 READ_INT(thd->server_id, 4);
766
767 DBUG_PRINT("info", ("pos=%lu flags=%d server_id=%d", pos, flags, thd->server_id));
768
769 kill_zombie_dump_threads(thd);
770
771 general_log_print(thd, thd->get_command(), "Log: '%s' Pos: %ld",
772 packet + 10, (long) pos);
773 mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, NULL, flags);
774
775 unregister_slave(thd, true, true/*need_lock_slave_list=true*/);
776 /* fake COM_QUIT -- if we get here, the thread needs to terminate */
777 DBUG_RETURN(true);
778
779 error_malformed_packet:
780 my_error(ER_MALFORMED_PACKET, MYF(0));
781 DBUG_RETURN(true);
782 }
783
784
com_binlog_dump_gtid(THD * thd,char * packet,uint packet_length)785 bool com_binlog_dump_gtid(THD *thd, char *packet, uint packet_length)
786 {
787 DBUG_ENTER("com_binlog_dump_gtid");
788 /*
789 Before going GA, we need to make this protocol extensible without
790 breaking compatitibilty. /Alfranio.
791 */
792 ushort flags= 0;
793 uint32 data_size= 0;
794 uint64 pos= 0;
795 char name[FN_REFLEN + 1];
796 uint32 name_size= 0;
797 char* gtid_string= NULL;
798 const uchar* packet_position= (uchar *) packet;
799 uint packet_bytes_todo= packet_length;
800 Sid_map sid_map(NULL/*no sid_lock because this is a completely local object*/);
801 Gtid_set slave_gtid_executed(&sid_map);
802
803 status_var_increment(thd->status_var.com_other);
804 thd->enable_slow_log= opt_log_slow_admin_statements;
805 if (check_global_access(thd, REPL_SLAVE_ACL))
806 DBUG_RETURN(false);
807
808 READ_INT(flags, 2);
809 READ_INT(thd->server_id, 4);
810 READ_INT(name_size, 4);
811 READ_STRING(name, name_size, sizeof(name));
812 READ_INT(pos, 8);
813 DBUG_PRINT("info", ("pos=%llu flags=%d server_id=%d", pos, flags, thd->server_id));
814 READ_INT(data_size, 4);
815 CHECK_PACKET_SIZE(data_size);
816 if (slave_gtid_executed.add_gtid_encoding(packet_position, data_size) !=
817 RETURN_STATUS_OK)
818 DBUG_RETURN(true);
819 gtid_string= slave_gtid_executed.to_string();
820 DBUG_PRINT("info", ("Slave %d requested to read %s at position %llu gtid set "
821 "'%s'.", thd->server_id, name, pos, gtid_string));
822
823 kill_zombie_dump_threads(thd);
824 general_log_print(thd, thd->get_command(), "Log: '%s' Pos: %llu GTIDs: '%s'",
825 name, pos, gtid_string);
826 my_free(gtid_string);
827 mysql_binlog_send(thd, name, (my_off_t) pos, &slave_gtid_executed, flags);
828
829 unregister_slave(thd, true, true/*need_lock_slave_list=true*/);
830 /* fake COM_QUIT -- if we get here, the thread needs to terminate */
831 DBUG_RETURN(true);
832
833 error_malformed_packet:
834 my_error(ER_MALFORMED_PACKET, MYF(0));
835 DBUG_RETURN(true);
836 }
837
838
mysql_binlog_send(THD * thd,char * log_ident,my_off_t pos,const Gtid_set * slave_gtid_executed,int flags)839 void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
840 const Gtid_set* slave_gtid_executed, int flags)
841 {
842 /**
843 @todo: Clean up loop so that, among other things, we only have one
844 call to send_file(). This is WL#5721.
845 */
846 #define GOTO_ERR \
847 do { \
848 DBUG_PRINT("info", ("mysql_binlog_send fails; goto err from line %d", \
849 __LINE__)); \
850 goto err; \
851 } while (0)
852 LOG_INFO linfo;
853 char *log_file_name = linfo.log_file_name;
854 char search_file_name[FN_REFLEN], *name;
855
856 bool is_index_file_reopened_on_binlog_disable= false;
857 ulong ev_offset;
858 bool using_gtid_protocol= slave_gtid_executed != NULL;
859 bool searching_first_gtid= using_gtid_protocol;
860 bool skip_group= false;
861 bool binlog_has_previous_gtids_log_event= false;
862 bool gtid_event_logged= false;
863 bool has_transmit_started= false;
864 Sid_map *sid_map= slave_gtid_executed ? slave_gtid_executed->get_sid_map() : NULL;
865
866 IO_CACHE log;
867 File file = -1;
868 String* packet = &thd->packet;
869 time_t last_event_sent_ts= time(0);
870 bool time_for_hb_event= false;
871 int error= 0;
872 const char *errmsg = "Unknown error";
873 char error_text[MAX_SLAVE_ERRMSG]= {0}; // to be send to slave via my_message()
874 NET* net = &thd->net;
875 mysql_mutex_t *log_lock;
876 mysql_cond_t *log_cond;
877 uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
878 Format_description_log_event fdle(BINLOG_VERSION), *p_fdle= &fdle;
879 Gtid first_gtid;
880
881 #ifndef DBUG_OFF
882 int left_events = max_binlog_dump_events;
883 #endif
884 int old_max_allowed_packet= thd->variables.max_allowed_packet;
  /*
    When an error is encountered, the dump thread sends
    ER_MASTER_FATAL_ERROR_READING_BINLOG to the slave instead of the real
    error that happened on the master. So a temporary Diagnostics_area is
    set on thd: the low-level error is always recorded in the temporary
    Diagnostics_area and is ignored. The original Diagnostics_area will be
    restored at the end of this function, and
    ER_MASTER_FATAL_ERROR_READING_BINLOG will be set in the original
    Diagnostics_area.
  */
894 Diagnostics_area temp_da;
895 Diagnostics_area *saved_da= thd->get_stmt_da();
896 thd->set_stmt_da(&temp_da);
897 bool was_killed_by_duplicate_slave_id= false;
898
899 DBUG_ENTER("mysql_binlog_send");
900 DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
901
902 memset(&log, 0, sizeof(log));
903 /*
904 heartbeat_period from @master_heartbeat_period user variable
905 */
906 ulonglong heartbeat_period= get_heartbeat_period(thd);
907 struct timespec heartbeat_buf;
908 struct timespec *heartbeat_ts= NULL;
909 const LOG_POS_COORD start_coord= { log_ident, pos },
910 *p_start_coord= &start_coord;
911 LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE },
912 *p_coord= &coord_buf;
913
914 /*
915 We use the following variables to send a HB event
916 when groups are skipped during a GTID protocol.
917 */
918 /* Flag to check if last transaction was skipped */
919 bool last_skip_group= skip_group;
920 /* File name where last skip group is present */
921 char last_skip_log_name[FN_REFLEN+1];
922 /* Coordinates of the last skip group */
923 LOG_POS_COORD last_skip_coord_buf= {last_skip_log_name, BIN_LOG_HEADER_SIZE},
924 *p_last_skip_coord= &last_skip_coord_buf;
925 bool observe_transmission= false;
926
927 if (heartbeat_period != LL(0))
928 {
929 heartbeat_ts= &heartbeat_buf;
930 set_timespec_nsec(*heartbeat_ts, 0);
931 }
932
933 #ifndef DBUG_OFF
934 if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
935 {
936 errmsg = "Master fails in COM_BINLOG_DUMP because of --opt-sporadic-binlog-dump-fail";
937 my_errno= ER_UNKNOWN_ERROR;
938 GOTO_ERR;
939 }
940 #endif
941
942 if (!mysql_bin_log.is_open())
943 {
944 errmsg = "Binary log is not open";
945 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
946 GOTO_ERR;
947 }
948 if (!server_id_supplied)
949 {
950 errmsg = "Misconfigured master - server_id was not set";
951 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
952 GOTO_ERR;
953 }
954
955 name= search_file_name;
956 if (log_ident[0])
957 mysql_bin_log.make_log_name(search_file_name, log_ident);
958 else
959 {
960 if (using_gtid_protocol)
961 {
962 /*
963 In normal scenarios, it is not possible for the Slave to
964 contain more GTIDs than the Master with respect to the Master's
965 UUID. But it can happen if the Master's binary log
966 is truncated (due to raid failure) or the Master's binary log is
967 deleted but GTID_PURGED was not set properly. That scenario
968 needs to be validated, i.e., it should *always* be the case that
969 Slave's gtid executed set (+retrieved set) is a subset of
970 Master's gtid executed set with respect to the Master's UUID.
971 If that is not the case, the dump thread is stopped during the handshake
972 with the Slave (thus the Slave's I/O thread will be stopped with the
973 error). Otherwise, it could lead to data inconsistency between Master
974 and Slave.
975 */
976 Sid_map* slave_sid_map= slave_gtid_executed->get_sid_map();
977 DBUG_ASSERT(slave_sid_map);
978 global_sid_lock->wrlock();
979 const rpl_sid &server_sid= gtid_state->get_server_sid();
980 rpl_sidno subset_sidno= slave_sid_map->sid_to_sidno(server_sid);
981 if (!slave_gtid_executed->is_subset_for_sid(gtid_state->get_logged_gtids(),
982 gtid_state->get_server_sidno(),
983 subset_sidno))
984 {
985 errmsg= ER(ER_SLAVE_HAS_MORE_GTIDS_THAN_MASTER);
986 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
987 global_sid_lock->unlock();
988 GOTO_ERR;
989 }
990 /*
991 Setting GTID_PURGED (when GTID_EXECUTED set is empty i.e., when
992 previous_gtids are also empty) will make binlog rotate. That
993 leaves first binary log with empty previous_gtids and second
994 binary log's previous_gtids with the value of gtid_purged.
995 In find_first_log_not_in_gtid_set() while we search for a binary
996 log whose previous_gtid_set is subset of slave_gtid_executed,
997 in this particular case, server will always find the first binary
998 log with empty previous_gtids which is subset of any given
999 slave_gtid_executed. Thus the Master thinks that it found the first
1000 binary log, which is actually not correct, and it is unable to catch
1001 this error situation. Hence the extra if condition below
1002 checks for this situation. Slave should know about Master's purged GTIDs.
1003 If Slave's GTID executed + retrieved set does not contain Master's
1004 complete purged GTID list, that means Slave is requesting(expecting)
1005 GTIDs which were purged by Master. We should let Slave know about the
1006 situation. i.e., throw error if slave's GTID executed set is not
1007 a superset of Master's purged GTID set.
1008 The other case, where user deleted binary logs manually
1009 (without using 'PURGE BINARY LOGS' command) but gtid_purged
1010 is not set by the user, the following if condition cannot catch it.
1011 But that is not a problem because in find_first_log_not_in_gtid_set()
1012 while checking for subset previous_gtids binary log, the logic
1013 will not find one and an error ER_MASTER_HAS_PURGED_REQUIRED_GTIDS
1014 is thrown from there.
1015 */
1016 if (!gtid_state->get_lost_gtids()->is_subset(slave_gtid_executed))
1017 {
1018 mysql_bin_log.report_missing_purged_gtids(slave_gtid_executed, &errmsg);
1019 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1020 global_sid_lock->unlock();
1021 GOTO_ERR;
1022 }
1023 global_sid_lock->unlock();
1024 first_gtid.clear();
1025 if (mysql_bin_log.find_first_log_not_in_gtid_set(name,
1026 slave_gtid_executed,
1027 &first_gtid,
1028 &errmsg))
1029 {
1030 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1031 GOTO_ERR;
1032 }
1033 }
1034 else
1035 name= 0; // Find first log
1036 }
1037
1038 linfo.index_file_offset= 0;
1039
1040 if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1041 {
1042 errmsg = "Could not find first log file name in binary log index file";
1043 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1044 GOTO_ERR;
1045 }
1046
1047 mysql_mutex_lock(&LOCK_thread_count);
1048 thd->current_linfo = &linfo;
1049 mysql_mutex_unlock(&LOCK_thread_count);
1050
1051 if ((file=open_binlog_file(&log, log_file_name, &errmsg)) < 0)
1052 {
1053 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1054 GOTO_ERR;
1055 }
1056 if (pos < BIN_LOG_HEADER_SIZE)
1057 {
1058 errmsg= "Client requested master to start replication from position < 4";
1059 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1060 GOTO_ERR;
1061 }
1062 if (pos > my_b_filelength(&log))
1063 {
1064 errmsg= "Client requested master to start replication from position > file size";
1065 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1066 GOTO_ERR;
1067 }
1068
1069 if (log_warnings > 1)
1070 sql_print_information("Start binlog_dump to master_thread_id(%lu) slave_server(%u), pos(%s, %lu)",
1071 thd->thread_id, thd->server_id, log_ident, (ulong)pos);
1072 if (RUN_HOOK(binlog_transmit, transmit_start,
1073 (thd, flags, log_ident, pos, &observe_transmission)))
1074 {
1075 errmsg= "Failed to run hook 'transmit_start'";
1076 my_errno= ER_UNKNOWN_ERROR;
1077 GOTO_ERR;
1078 }
1079 has_transmit_started= true;
1080 /* reset transmit packet for the fake rotate event below */
1081 if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1082 observe_transmission))
1083 GOTO_ERR;
1084
1085 /*
1086 Tell the client about the log name with a fake Rotate event;
1087 this is needed even if we also send a Format_description_log_event
1088 just after, because that event does not contain the binlog's name.
1089 Note that as this Rotate event is sent before
1090 Format_description_log_event, the slave cannot have any info to
1091 understand this event's format, so the header len of
1092 Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
1093 than other events except FORMAT_DESCRIPTION_EVENT).
1094 Before 4.0.14 we called fake_rotate_event below only if (pos ==
1095 BIN_LOG_HEADER_SIZE), because if this is false then the slave
1096 already knows the binlog's name.
1097 Since then, we always call fake_rotate_event; if the slave already knew
1098 the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
1099 useless but does not harm much. It is nice for 3.23 (>=.58) slaves
1100 which test Rotate events to see if the master is 4.0 (then they
1101 choose to stop because they can't replicate 4.0); by always calling
1102 fake_rotate_event we are sure that 3.23.58 and newer will detect the
1103 problem as soon as replication starts (BUG#198).
1104 Always calling fake_rotate_event makes sending of normal
1105 (=from-binlog) Rotate events a priori unneeded, but it is not so
1106 simple: the 2 Rotate events are not equivalent, the normal one is
1107 before the Stop event, the fake one is after. If we don't send the
1108 normal one, then the Stop event will be interpreted (by existing 4.0
1109 slaves) as "the master stopped", which is wrong. So for safety,
1110 given that we want minimum modification of 4.0, we send the normal
1111 and fake Rotates.
1112 */
1113 if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
1114 get_binlog_checksum_value_at_connect(current_thd)))
1115 {
1116 /*
1117 This error code is not perfect, as fake_rotate_event() does not
1118 read anything from the binlog; if it fails it's because of an
1119 error in my_net_write(), fortunately it will say so in errmsg.
1120 */
1121 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1122 GOTO_ERR;
1123 }
1124
1125 /*
1126 Raise the limit to MAX_MAX_ALLOWED_PACKET, since a binlog event can
1127 become larger (by up to MAX_LOG_EVENT_HEADER_LEN) than the
1128 corresponding packet (query) sent from client to master.
1129 */
1130 thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
1131
1132 /*
1133 We can set log_lock now, it does not move (it's a member of
1134 mysql_bin_log, and it's already inited, and it will be destroyed
1135 only at shutdown).
1136 */
1137 p_coord->pos= pos; // the first hb matches the slave's last seen value
1138 log_lock= mysql_bin_log.get_log_lock();
1139 log_cond= mysql_bin_log.get_log_cond();
1140 if (pos > BIN_LOG_HEADER_SIZE)
1141 {
1142 /* reset transmit packet for the event read from binary log
1143 file */
1144 if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1145 observe_transmission))
1146 GOTO_ERR;
1147
1148 /*
1149 Try to find a Format_description_log_event at the beginning of
1150 the binlog
1151 */
1152 if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0)))
1153 {
1154 DBUG_PRINT("info", ("read_log_event returned 0 on line %d", __LINE__));
1155 /*
1156 The packet has offsets equal to the normal offsets in a
1157 binlog event + ev_offset (the first ev_offset characters are
1158 the header (default \0)).
1159 */
1160 DBUG_PRINT("info",
1161 ("Looked for a Format_description_log_event, found event type %s",
1162 Log_event::get_type_str((Log_event_type)(*packet)[EVENT_TYPE_OFFSET + ev_offset])));
1163 if ((*packet)[EVENT_TYPE_OFFSET + ev_offset] == FORMAT_DESCRIPTION_EVENT)
1164 {
1165 current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
1166 packet->length() - ev_offset);
1167 DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
1168 current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
1169 current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
1170 if (!is_slave_checksum_aware(thd) &&
1171 current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1172 current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1173 {
1174 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1175 errmsg= "Slave can not handle replication events with the checksum "
1176 "that master is configured to log";
1177 sql_print_warning("Master is configured to log replication events "
1178 "with checksum, but will not send such events to "
1179 "slaves that cannot process them");
1180 GOTO_ERR;
1181 }
1182 (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1183 /*
1184 mark that this event with "log_pos=0", so the slave
1185 should not increment master's binlog position
1186 (rli->group_master_log_pos)
1187 */
1188 int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
1189 /*
1190 if reconnect master sends FD event with `created' as 0
1191 to avoid destroying temp tables.
1192 */
1193 int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
1194 ST_CREATED_OFFSET+ev_offset, (ulong) 0);
1195
1196 /* fix the checksum due to latest changes in header */
1197 if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1198 current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1199 fix_checksum(packet, ev_offset);
1200
1201 /* send it */
1202 if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
1203 {
1204 errmsg = "Failed on my_net_write()";
1205 my_errno= ER_UNKNOWN_ERROR;
1206 GOTO_ERR;
1207 }
1208
1209 /*
1210 No need to save this event. We are only doing simple reads
1211 (no real parsing of the events) so we don't need it. And so
1212 we don't need the artificial Format_description_log_event of
1213 3.23&4.x.
1214 */
1215 }
1216 }
1217 else
1218 {
1219 if (test_for_non_eof_log_read_errors(error, &errmsg))
1220 GOTO_ERR;
1221 /*
1222 It's EOF, nothing to do, go on reading next events, the
1223 Format_description_log_event will be found naturally if it is written.
1224 */
1225 }
1226 } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
1227 else
1228 {
1229 /* The Format_description_log_event event will be found naturally. */
1230 }
1231
1232 /* seek to the requested position, to start the requested dump */
1233 my_b_seek(&log, pos); // Seek will done on next read
1234
1235 while (!net->error && net->vio != 0 && !thd->killed)
1236 {
1237 Log_event_type event_type= UNKNOWN_EVENT;
1238 bool goto_next_binlog= false;
1239
1240 /* reset the transmit packet for the event read from binary log
1241 file */
1242 if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1243 observe_transmission))
1244 GOTO_ERR;
1245 DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
1246 {
1247 const char act[]= "now wait_for signal.rotate_finished no_clear_event";
1248 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1249 STRING_WITH_LEN(act)));
1250 };);
1251 bool is_active_binlog= false;
1252 while (!thd->killed &&
1253 !(error= Log_event::read_log_event(&log, packet, log_lock,
1254 current_checksum_alg,
1255 log_file_name,
1256 &is_active_binlog)))
1257 {
1258 DBUG_EXECUTE_IF("simulate_dump_thread_kill",
1259 {
1260 thd->killed= THD::KILL_CONNECTION;
1261 });
1262 DBUG_EXECUTE_IF("hold_dump_thread_inside_inner_loop",
1263 {
1264 const char act[]= "now "
1265 "signal signal_inside_inner_loop "
1266 "wait_for signal_continue";
1267 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1268 STRING_WITH_LEN(act)));
1269 DBUG_ASSERT(thd->killed);
1270 };);
1271 time_t created;
1272 DBUG_PRINT("info", ("read_log_event returned 0 on line %d", __LINE__));
1273 #ifndef DBUG_OFF
1274 if (max_binlog_dump_events && !left_events--)
1275 {
1276 net_flush(net);
1277 errmsg = "Debugging binlog dump abort";
1278 my_errno= ER_UNKNOWN_ERROR;
1279 GOTO_ERR;
1280 }
1281 #endif
1282 /*
1283 log's filename does not change while it's active
1284 */
1285 p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
1286
1287 event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
1288 DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
1289 {
1290 if (event_type == XID_EVENT)
1291 {
1292 net_flush(net);
1293 const char act[]=
1294 "now "
1295 "wait_for signal.continue";
1296 DBUG_ASSERT(opt_debug_sync_timeout > 0);
1297 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1298 STRING_WITH_LEN(act)));
1299 }
1300 });
1301
1302 switch (event_type)
1303 {
1304 case FORMAT_DESCRIPTION_EVENT:
1305 skip_group= false;
1306 current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
1307 packet->length() - ev_offset);
1308 DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
1309 current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
1310 current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
1311 if (!is_slave_checksum_aware(thd) &&
1312 current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1313 current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1314 {
1315 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1316 errmsg= "Slave can not handle replication events with the checksum "
1317 "that master is configured to log";
1318 sql_print_warning("Master is configured to log replication events "
1319 "with checksum, but will not send such events to "
1320 "slaves that cannot process them");
1321 GOTO_ERR;
1322 }
1323 (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1324
1325 created= uint4korr(packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
1326 ST_CREATED_OFFSET+ev_offset);
1327
1328 if (using_gtid_protocol && created > 0)
1329 {
1330 if (first_gtid.sidno >= 1 && first_gtid.gno >= 1 &&
1331 slave_gtid_executed->contains_gtid(first_gtid.sidno,
1332 first_gtid.gno))
1333 {
1334 /*
1335 As we are skipping at least the first transaction of the binlog,
1336 we must clear the "created" field of the FD event (set it to 0)
1337 to avoid cleaning up temp tables on slave.
1338 */
1339 DBUG_PRINT("info",("setting 'created' to 0 before sending FD event"));
1340 int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
1341 ST_CREATED_OFFSET+ev_offset, (ulong) 0);
1342
1343 /* Fix the checksum due to latest changes in header */
1344 if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1345 current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1346 fix_checksum(packet, ev_offset);
1347
1348 first_gtid.clear();
1349 }
1350 }
1351 /*
1352 Fixes the information on the checksum algorithm when a new
1353 format description is read. Notice that this only necessary
1354 when we need to filter out some transactions which were
1355 already processed.
1356 */
1357 p_fdle->checksum_alg= current_checksum_alg;
1358 break;
1359
1360 case ANONYMOUS_GTID_LOG_EVENT:
1361 /* do nothing */
1362 break;
1363 case GTID_LOG_EVENT:
1364 if (gtid_mode == 0)
1365 {
1366 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1367 errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1368 GOTO_ERR;
1369 }
1370 if (using_gtid_protocol)
1371 {
1372 /*
1373 The current implementation checks if the GTID was not processed
1374 by the slave. This means that every time a GTID is read, one needs
1375 to check if it was already processed by the slave. If this is the
1376 case, the group is not sent. Otherwise, it must be sent.
1377
1378 I think we can do better than that. /Alfranio.
1379 */
1380 ulonglong checksum_size=
1381 ((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1382 p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
1383 BINLOG_CHECKSUM_LEN + ev_offset : ev_offset);
1384 /**
1385 @todo: use a local sid_map to avoid the lookup in the
1386 global one here /Sven
1387 */
1388 Gtid_log_event gtid_ev(packet->ptr() + ev_offset,
1389 packet->length() - checksum_size,
1390 p_fdle);
1391 skip_group= slave_gtid_executed->contains_gtid(gtid_ev.get_sidno(sid_map),
1392 gtid_ev.get_gno());
1393 searching_first_gtid= skip_group;
1394 DBUG_PRINT("info", ("Dumping GTID sidno(%d) gno(%lld) skip group(%d) "
1395 "searching gtid(%d).",
1396 gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(),
1397 skip_group, searching_first_gtid));
1398 gtid_event_logged= true;
1399 }
1400 break;
1401
1402 case STOP_EVENT:
1403 case INCIDENT_EVENT:
1404 skip_group= searching_first_gtid;
1405 break;
1406
1407 case PREVIOUS_GTIDS_LOG_EVENT:
1408 binlog_has_previous_gtids_log_event= true;
1409 if (gtid_mode == 0)
1410 {
1411 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1412 errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1413 GOTO_ERR;
1414 }
1415 /* FALLTHROUGH */
1416 case ROTATE_EVENT:
1417 skip_group= false;
1418 break;
1419
1420 default:
1421 if (!binlog_has_previous_gtids_log_event && using_gtid_protocol)
1422 /*
1423 If we come here, it means we are seeing a 'normal' DML/DDL
1424 event (e.g. query_log_event) without having seen any
1425 Previous_gtid_log_event. That means we are in an old
1426 binlog (no previous_gtids_log_event). When using the GTID
1427 protocol, that means we must skip the entire binary log
1428 and jump to the next one.
1429 */
1430 goto_next_binlog= true;
1431
1432 if (!gtid_event_logged && using_gtid_protocol)
1433 {
1434 /*
1435 Skip groups in the binlogs which don't have any gtid event
1436 logged before them. When opt_gtid_deployment_step is ON, the server
1437 doesn't generate GTID and so no gtid_event is logged before binlog
1438 events. But when opt_gtid_deployment_step is OFF, the server starts
1439 writing gtid_events in the middle of active binlog. When slave
1440 connects with gtid_protocol, master needs to skip binlog events
1441 which don't have corresponding gtid_event.
1442 */
1443 skip_group= true;
1444 }
1445 break;
1446 }
1447
1448 if (goto_next_binlog)
1449 // stop reading from this binlog
1450 break;
1451
1452 DBUG_PRINT("info", ("EVENT_TYPE %d SEARCHING %d SKIP_GROUP %d file %s pos %lld\n",
1453 event_type, searching_first_gtid, skip_group, log_file_name,
1454 my_b_tell(&log)));
1455 pos = my_b_tell(&log);
1456 if (observe_transmission &&
1457 RUN_HOOK(binlog_transmit, before_send_event,
1458 (thd, flags, packet, log_file_name, pos)))
1459 {
1460 my_errno= ER_UNKNOWN_ERROR;
1461 errmsg= "run 'before_send_event' hook failed";
1462 GOTO_ERR;
1463 }
1464
1465 /* The present event was skipped, so store the event coordinates */
1466 if (skip_group)
1467 {
1468 p_last_skip_coord->pos= p_coord->pos;
1469 strcpy(p_last_skip_coord->file_name, p_coord->file_name);
1470 /*
1471 If we have not sent any event during the past 'heartbeat_period' time
1472 period, then it is time to send a packet before skipping this group.
1473 */
1474 DBUG_EXECUTE_IF("inject_2sec_sleep_when_skipping_an_event",
1475 {
1476 my_sleep(2000000);
1477 });
1478 time_t now= time(0);
1479 DBUG_ASSERT(now >= last_event_sent_ts);
1480 time_for_hb_event= ((ulonglong)(now - last_event_sent_ts) >=
1481 (ulonglong)(heartbeat_period/1000000000UL));
1482 }
1483
1484 if ((!skip_group && last_skip_group
1485 && event_type != FORMAT_DESCRIPTION_EVENT) || time_for_hb_event)
1486 {
1487 /*
1488 Dump thread is ready to send its first transaction after
1489 one or more skipped transactions, or the dump thread did not
1490 send any event during the past 'heartbeat_period' time frame
1491 (busy skipping gtid groups). Send a heartbeat event
1492 to update slave IO thread coordinates before that happens.
1493
1494 Notice that for a new binary log file, FORMAT_DESCRIPTION_EVENT
1495 is the first event to be sent to the slave. In this case, there is
1496 no need to send a HB event (which might have coordinates
1497 of previous binlog file).
1498 */
1499
1500 if (send_last_skip_group_heartbeat(thd, net, packet, p_last_skip_coord,
1501 &ev_offset, current_checksum_alg,
1502 &errmsg, observe_transmission))
1503 {
1504 GOTO_ERR;
1505 }
1506 last_event_sent_ts= time(0);
1507 last_skip_group= time_for_hb_event= false;
1508 }
1509 else
1510 {
1511 last_skip_group= skip_group;
1512 }
1513
1514 DBUG_EXECUTE_IF("master_xid_trigger",
1515 {
1516 Log_event_type event_type= (Log_event_type)
1517 (*packet)[EVENT_TYPE_OFFSET + ev_offset];
1518 if (event_type == XID_EVENT)
1519 {
1520 const char act[]= "now signal master_xid_reached wait_for resume";
1521 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1522 STRING_WITH_LEN(act)));
1523 }});
1524
1525 if (skip_group == false)
1526 {
1527 if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
1528 {
1529 errmsg = "Failed on my_net_write()";
1530 my_errno= ER_UNKNOWN_ERROR;
1531 GOTO_ERR;
1532 }
1533 last_event_sent_ts= time(0);
1534 }
1535
1536 DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
1537 {
1538 if (event_type == XID_EVENT)
1539 {
1540 net_flush(net);
1541 }
1542 });
1543
1544 DBUG_PRINT("info", ("log event code %d", event_type));
1545 if (skip_group == false && event_type == LOAD_EVENT)
1546 {
1547 if (send_file(thd))
1548 {
1549 errmsg = "failed in send_file()";
1550 my_errno= ER_UNKNOWN_ERROR;
1551 GOTO_ERR;
1552 }
1553 }
1554
1555 if (observe_transmission &&
1556 RUN_HOOK(binlog_transmit, after_send_event,
1557 (thd, flags, packet, log_file_name, skip_group ? pos : 0)))
1558 {
1559 errmsg= "Failed to run hook 'after_send_event'";
1560 my_errno= ER_UNKNOWN_ERROR;
1561 GOTO_ERR;
1562 }
1563
1564 /* reset transmit packet for next loop */
1565 if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1566 observe_transmission))
1567 GOTO_ERR;
1568 }
1569
1570 /* If the above while is killed due to thd->killed flag and not
1571 due to read_log_event error, then do nothing.*/
1572 if (thd->killed)
1573 goto end;
1574 DBUG_EXECUTE_IF("wait_after_binlog_EOF",
1575 {
1576 const char act[]= "now wait_for signal.rotate_finished no_clear_event";
1577 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1578 STRING_WITH_LEN(act)));
1579 };);
1580
1581 /*
1582 TODO: now that we are logging the offset, check to make sure
1583 the recorded offset and the actual match.
1584 Guilhem 2003-06: this is not true if this master is a slave
1585 <4.0.15 running with --log-slave-updates, because then log_pos may
1586 be the offset in the-master-of-this-master's binlog.
1587 */
1588 if (test_for_non_eof_log_read_errors(error, &errmsg))
1589 GOTO_ERR;
1590
1591 if (!is_active_binlog)
1592 goto_next_binlog= true;
1593
1594 if (!goto_next_binlog)
1595 {
1596 /*
1597 Block until there is more data in the log
1598 */
1599 if (net_flush(net))
1600 {
1601 errmsg = "failed on net_flush()";
1602 my_errno= ER_UNKNOWN_ERROR;
1603 GOTO_ERR;
1604 }
1605
1606 /*
1607 We may have missed the update broadcast from the log
1608 that has just happened, let's try to catch it if it did.
1609 If we did not miss anything, we just wait for other threads
1610 to signal us.
1611 */
1612 {
1613 log.error=0;
1614 bool read_packet = 0;
1615
1616 #ifndef DBUG_OFF
1617 if (max_binlog_dump_events && !left_events--)
1618 {
1619 errmsg = "Debugging binlog dump abort";
1620 my_errno= ER_UNKNOWN_ERROR;
1621 GOTO_ERR;
1622 }
1623 #endif
1624
1625 /* reset the transmit packet for the event read from binary log
1626 file */
1627 if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1628 observe_transmission))
1629 GOTO_ERR;
1630
1631 /*
1632 No one will update the log while we are reading
1633 now, but we'll be quick and just read one record
1634
1635 TODO:
1636 Add an counter that is incremented for each time we update the
1637 binary log. We can avoid the following read if the counter
1638 has not been updated since last read.
1639 */
1640
1641 mysql_mutex_lock(log_lock);
1642 switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
1643 current_checksum_alg)) {
1644 case 0:
1645 DBUG_PRINT("info", ("read_log_event returned 0 on line %d",
1646 __LINE__));
1647 /* we read successfully, so we'll need to send it to the slave */
1648 mysql_mutex_unlock(log_lock);
1649 read_packet = 1;
1650 p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
1651 event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
1652 DBUG_ASSERT(event_type != FORMAT_DESCRIPTION_EVENT);
1653 break;
1654
1655 case LOG_READ_EOF:
1656 {
1657 int ret;
1658 ulong signal_cnt;
1659 DBUG_PRINT("wait",("waiting for data in binary log"));
1660 /*
1661 There are two ways to tell the server to not block:
1662
1663 - Set the BINLOG_DUMP_NON_BLOCK flag.
1664 This is official, documented, not used by any mysql
1665 client, but used by some external users.
1666
1667 - Set server_id=0.
1668 This is unofficial, undocumented, and used by
1669 mysqlbinlog -R since the beginning of time.
1670
1671 When mysqlbinlog --stop-never is used, it sets a 'fake'
1672 server_id that defaults to 1 but can be set to anything
1673 else using stop-never-slave-server-id. This has the
1674 drawback that if the server_id conflicts with any other
1675 running slave, or with any other instance of mysqlbinlog
1676 --stop-never, then that other instance will be killed. It
1677 is also an unnecessary burden on the user to have to
1678 specify a server_id different from all other server_ids
1679 just to avoid conflicts.
1680
1681 As of MySQL 5.6.20 and 5.7.5, mysqlbinlog redundantly sets
1682 the BINLOG_DUMP_NON_BLOCK flag when the following
1683 condition holds:
1684 - the --stop-never option is *not* specified
1685
1686 In a far future, this means we can remove the unofficial
1687 functionality that server_id=0 implies nonblocking
1688 behavior. That will allow mysqlbinlog to use server_id=0
1689 always. That has the advantage that mysqlbinlog
1690 --stop-never cannot cause any running dump threads to be
1691 killed.
1692 */
1693 if (thd->server_id == 0 || ((flags & BINLOG_DUMP_NON_BLOCK) != 0))
1694 {
1695 DBUG_PRINT("info", ("stopping dump thread because server_id==0 or the BINLOG_DUMP_NON_BLOCK flag is set: server_id=%u flags=%d", thd->server_id, flags));
1696 mysql_mutex_unlock(log_lock);
1697 DBUG_EXECUTE_IF("inject_hb_event_on_mysqlbinlog_dump_thread",
1698 {
1699 /*
1700 Send one HB event (with anything in it, content is irrelevant).
1701 We just want to check that mysqlbinlog will be able to ignore it.
1702
1703 Suicide on failure, since if it happens the entire purpose of the
1704 test is compromised.
1705 */
1706 if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1707 observe_transmission) ||
1708 send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
1709 DBUG_SUICIDE();
1710 });
1711 goto end;
1712 }
1713
1714 #ifndef DBUG_OFF
1715 ulong hb_info_counter= 0;
1716 #endif
1717 PSI_stage_info old_stage;
1718 signal_cnt= mysql_bin_log.signal_cnt;
1719
1720 do
1721 {
1722 if (heartbeat_period != 0)
1723 {
1724 DBUG_ASSERT(heartbeat_ts);
1725 set_timespec_nsec(*heartbeat_ts, heartbeat_period);
1726 }
1727 thd->ENTER_COND(log_cond, log_lock,
1728 &stage_master_has_sent_all_binlog_to_slave,
1729 &old_stage);
1730 /*
1731 When using GTIDs, if the dump thread has reached the end of the
1732 binary log and the last transaction is skipped,
1733 send one heartbeat event even when the heartbeat is off.
1734 If the heartbeat is on, it is better to send a heartbeat
1735 event as the time_out of certain functions (Ex: master_pos_wait()
1736 or semi sync ack timeout) might be less than heartbeat period.
1737 */
1738 if (skip_group)
1739 {
1740 /*
1741 TODO: Number of HB events sent from here can be reduced
1742 by checking whether it is time to send a HB event or not.
1743 (i.e., using the flag time_for_hb_event)
1744 */
1745 if (send_last_skip_group_heartbeat(thd, net, packet,
1746 p_coord, &ev_offset,
1747 current_checksum_alg, &errmsg,
1748 observe_transmission))
1749 {
1750 thd->EXIT_COND(&old_stage);
1751 GOTO_ERR;
1752 }
1753 last_skip_group= false; /*A HB for this pos has been sent. */
1754 }
1755
1756 ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
1757 DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
1758 if (ret == ETIMEDOUT || ret == ETIME)
1759 {
1760 #ifndef DBUG_OFF
1761 if (hb_info_counter < 3)
1762 {
1763 sql_print_information("master sends heartbeat message");
1764 hb_info_counter++;
1765 if (hb_info_counter == 3)
1766 sql_print_information("the rest of heartbeat info skipped ...");
1767 }
1768 #endif
1769 /* reset transmit packet for the heartbeat event */
1770 if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
1771 observe_transmission))
1772 {
1773 thd->EXIT_COND(&old_stage);
1774 GOTO_ERR;
1775 }
1776 if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
1777 {
1778 errmsg = "Failed on my_net_write()";
1779 my_errno= ER_UNKNOWN_ERROR;
1780 thd->EXIT_COND(&old_stage);
1781 GOTO_ERR;
1782 }
1783 }
1784 else
1785 {
1786 DBUG_PRINT("wait",("binary log received update or a broadcast signal caught"));
1787 }
1788 } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
1789 thd->EXIT_COND(&old_stage);
1790 }
1791 break;
1792
1793 default:
1794 mysql_mutex_unlock(log_lock);
1795 test_for_non_eof_log_read_errors(error, &errmsg);
1796 GOTO_ERR;
1797 }
1798
1799 if (read_packet)
1800 {
1801 switch (event_type)
1802 {
1803 case ANONYMOUS_GTID_LOG_EVENT:
1804 /* do nothing */
1805 break;
1806 case GTID_LOG_EVENT:
1807 if (gtid_mode == 0)
1808 {
1809 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1810 errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1811 GOTO_ERR;
1812 }
1813 if (using_gtid_protocol)
1814 {
1815 ulonglong checksum_size=
1816 ((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1817 p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
1818 BINLOG_CHECKSUM_LEN + ev_offset : ev_offset);
1819 Gtid_log_event gtid_ev(packet->ptr() + ev_offset,
1820 packet->length() - checksum_size,
1821 p_fdle);
1822 skip_group=
1823 slave_gtid_executed->contains_gtid(gtid_ev.get_sidno(sid_map),
1824 gtid_ev.get_gno());
1825 searching_first_gtid= skip_group;
1826 DBUG_PRINT("info", ("Dumping GTID sidno(%d) gno(%lld) "
1827 "skip group(%d) searching gtid(%d).",
1828 gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(),
1829 skip_group, searching_first_gtid));
1830 gtid_event_logged= true;
1831 }
1832 break;
1833
1834 case STOP_EVENT:
1835 case INCIDENT_EVENT:
1836 skip_group= searching_first_gtid;
1837 break;
1838
1839 case PREVIOUS_GTIDS_LOG_EVENT:
1840 binlog_has_previous_gtids_log_event= true;
1841 if (gtid_mode == 0)
1842 {
1843 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1844 errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1845 GOTO_ERR;
1846 }
1847 /* FALLTHROUGH */
1848 case ROTATE_EVENT:
1849 skip_group= false;
1850 break;
1851 default:
1852 if (!binlog_has_previous_gtids_log_event && using_gtid_protocol)
1853 /*
1854 If we come here, it means we are seeing a 'normal' DML/DDL
1855 event (e.g. query_log_event) without having seen any
1856 Previous_gtid_log_event. That means we are in an old
1857 binlog (no previous_gtids_log_event). When using the GTID
1858 protocol, that means we must skip the entire binary log
1859 and jump to the next one.
1860 */
1861 goto_next_binlog= true;
1862 if (!gtid_event_logged && using_gtid_protocol)
1863 {
1864 /*
1865 Skip groups in the binlogs which don't have any gtid event
1866 logged before them.
1867 */
1868 skip_group= true;
1869 }
1870
1871 break;
1872 }
1873
1874 /* The present event was skipped in a GTID protocol, store the coordinates */
1875 if (skip_group)
1876 {
1877 p_last_skip_coord->pos= p_coord->pos;
1878 strcpy(p_last_skip_coord->file_name, p_coord->file_name);
1879 }
1880
1881 if (!skip_group && !goto_next_binlog)
1882 {
1883 /* If the last group was skipped, send a HB event */
1884 if (last_skip_group &&
1885 send_last_skip_group_heartbeat(thd, net, packet,
1886 p_last_skip_coord, &ev_offset,
1887 current_checksum_alg, &errmsg,
1888 observe_transmission))
1889 {
1890 GOTO_ERR;
1891 }
1892
1893 THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
1894 pos = my_b_tell(&log);
1895 if (observe_transmission &&
1896 RUN_HOOK(binlog_transmit, before_send_event,
1897 (thd, flags, packet, log_file_name, pos)))
1898 {
1899 my_errno= ER_UNKNOWN_ERROR;
1900 errmsg= "run 'before_send_event' hook failed";
1901 GOTO_ERR;
1902 }
1903
1904 if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
1905 {
1906 errmsg = "Failed on my_net_write()";
1907 my_errno= ER_UNKNOWN_ERROR;
1908 GOTO_ERR;
1909 }
1910 last_event_sent_ts= time(0);
1911
1912 if (event_type == LOAD_EVENT)
1913 {
1914 if (send_file(thd))
1915 {
1916 errmsg = "failed in send_file()";
1917 my_errno= ER_UNKNOWN_ERROR;
1918 GOTO_ERR;
1919 }
1920 }
1921 }
1922
1923 if(!goto_next_binlog)
1924 {
1925 if (observe_transmission &&
1926 RUN_HOOK(binlog_transmit, after_send_event,
1927 (thd, flags, packet, log_file_name,
1928 skip_group ? pos : 0)))
1929 {
1930 my_errno= ER_UNKNOWN_ERROR;
1931 errmsg= "Failed to run hook 'after_send_event'";
1932 GOTO_ERR;
1933 }
1934 }
1935
1936 /* Save the skip group for next iteration */
1937 last_skip_group= skip_group;
1938
1939 }
1940
1941 log.error=0;
1942 }
1943 }
1944
1945 if (goto_next_binlog)
1946 {
1947 DBUG_EXECUTE_IF("waiting_for_disable_binlog",
1948 {
1949 const char act[]= "now "
1950 "signal dump_thread_reached_wait_point "
1951 "wait_for continue_dump_thread no_clear_event";
1952 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1953 STRING_WITH_LEN(act)));
1954 };);
1955
1956 // clear flag because we open a new binlog
1957 binlog_has_previous_gtids_log_event= false;
1958
1959 THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog);
1960 /*
1961 When moving to next binlog, always check if binlog is enabled or not.
1962 There can be critical errors like for example out of memory scenarios
1963 which prevent mysqld server from writing to binlog. In those cases
1964 server refers to binlog_error_action variable and takes the appropriate
1965 action. If users choose to ignore the error then binary log will be
1966 disabled and server will continue to do its work. In such cases dump
1967 thread which is trying to move to the next log will fail as binlog index
1968 file and binlog file are already closed and their corresponding caches
1969 are also cleared.
1970
1971 Hence first check if binary log is enabled or not. If enabled look for
1972 the next binary log in the index file. If it is disabled open the index
1973 file once again and check if there any more binary logs that needs to be
1974 sent. Keep reading binary log files until find_next_log returns empty.
1975 If there is an error during open index file or we sent all binary logs
1976 then ER_MASTER_FATAL_ERROR_READING_BINLOG is raised.
1977 */
1978 mysql_bin_log.lock_index();
1979 if (!mysql_bin_log.is_open())
1980 {
1981 if (mysql_bin_log.open_index_file(mysql_bin_log.get_index_fname(),
1982 linfo.log_file_name,FALSE))
1983 {
1984 errmsg = "Binary log is not open and failed to open index file to "
1985 "retrieve next file.";
1986 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1987 mysql_bin_log.unlock_index();
1988 GOTO_ERR;
1989 }
1990 is_index_file_reopened_on_binlog_disable= true;
1991 }
1992 if (mysql_bin_log.find_next_log(&linfo, 0))
1993 {
1994 DBUG_EXECUTE_IF("waiting_for_disable_binlog",
1995 {
1996 const char act[]= "now signal consumed_binlog";
1997 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1998 STRING_WITH_LEN(act)));
1999 };);
2000 errmsg = "could not find next log";
2001 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2002 mysql_bin_log.unlock_index();
2003 GOTO_ERR;
2004 }
2005 mysql_bin_log.unlock_index();
2006
2007 end_io_cache(&log);
2008 mysql_file_close(file, MYF(MY_WME));
2009
2010 /* reset transmit packet for the possible fake rotate event */
2011 if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
2012 observe_transmission))
2013 GOTO_ERR;
2014
2015 /*
2016 Call fake_rotate_event() in case the previous log (the one which
2017 we have just finished reading) did not contain a Rotate event.
2018 There are at least two cases when this can happen:
2019
2020 - The previous binary log was the last one before the master was
2021 shutdown and restarted.
2022
2023 - The previous binary log was GTID-free (did not contain a
2024 Previous_gtids_log_event) and the slave is connecting using
2025 the GTID protocol.
2026
2027 This way we tell the slave about the new log's name and
2028 position. If the binlog is 5.0 or later, the next event we
2029 are going to read and send is Format_description_log_event.
2030 */
2031 if ((file=open_binlog_file(&log, log_file_name, &errmsg)) < 0 ||
2032 fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
2033 &errmsg, current_checksum_alg))
2034 {
2035 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2036 GOTO_ERR;
2037 }
2038
2039 p_coord->file_name= log_file_name; // reset to the next
2040 }
2041 }
2042
2043 end:
2044 /*
2045 If the dump thread was killed because of a duplicate slave UUID we
2046 will fail throwing an error to the slave so it will not try to
2047 reconnect anymore.
2048 */
2049 mysql_mutex_lock(&thd->LOCK_thd_data);
2050 was_killed_by_duplicate_slave_id= thd->duplicate_slave_id;
2051 mysql_mutex_unlock(&thd->LOCK_thd_data);
2052 if (was_killed_by_duplicate_slave_id)
2053 {
2054 errmsg= "A slave with the same server_uuid/server_id as this slave "
2055 "has connected to the master";
2056 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2057 goto err;
2058 }
2059 thd->set_stmt_da(saved_da);
2060 end_io_cache(&log);
2061 mysql_file_close(file, MYF(MY_WME));
2062
2063 if (has_transmit_started)
2064 (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
2065 my_eof(thd);
2066 THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
2067 mysql_mutex_lock(&LOCK_thread_count);
2068 thd->current_linfo = 0;
2069 mysql_mutex_unlock(&LOCK_thread_count);
2070 thd->variables.max_allowed_packet= old_max_allowed_packet;
2071 DBUG_VOID_RETURN;
2072
2073 err:
2074 THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
2075 if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log))
2076 {
2077 /*
2078 detailing the fatal error message with coordinates
2079 of the last position read.
2080 */
2081 my_snprintf(error_text, sizeof(error_text),
2082 "%s; the first event '%s' at %lld, "
2083 "the last event read from '%s' at %lld, "
2084 "the last byte read from '%s' at %lld.",
2085 errmsg,
2086 p_start_coord->file_name, p_start_coord->pos,
2087 p_coord->file_name, p_coord->pos,
2088 log_file_name, my_b_tell(&log));
2089 }
2090 else
2091 {
2092 strncpy(error_text, errmsg, sizeof(error_text));
2093 error_text[sizeof(error_text) - 1]= '\0';
2094 }
2095 end_io_cache(&log);
2096 if (is_index_file_reopened_on_binlog_disable)
2097 mysql_bin_log.close(LOG_CLOSE_INDEX, true/*need_lock_log=true*/,
2098 true/*need_lock_index=true*/);
2099 if (has_transmit_started)
2100 (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
2101 /*
2102 Exclude iteration through thread list
2103 this is needed for purge_logs() - it will iterate through
2104 thread list and update thd->current_linfo->index_file_offset
2105 this mutex will make sure that it never tried to update our linfo
2106 after we return from this stack frame
2107 */
2108 mysql_mutex_lock(&LOCK_thread_count);
2109 thd->current_linfo = 0;
2110 mysql_mutex_unlock(&LOCK_thread_count);
2111 if (file >= 0)
2112 mysql_file_close(file, MYF(MY_WME));
2113 thd->variables.max_allowed_packet= old_max_allowed_packet;
2114
2115 thd->set_stmt_da(saved_da);
2116 my_message(my_errno, error_text, MYF(0));
2117 DBUG_VOID_RETURN;
2118 }
2119
2120
2121 /**
2122 An auxiliary function extracts slave UUID.
2123
2124 @param[in] thd THD to access a user variable
2125 @param[out] value String to return UUID value.
2126
2127 @return if success value is returned else NULL is returned.
2128 */
get_slave_uuid(THD * thd,String * value)2129 String *get_slave_uuid(THD *thd, String *value)
2130 {
2131 uchar name[]= "slave_uuid";
2132
2133 if (value == NULL)
2134 return NULL;
2135 Mutex_lock lock_guard(&thd->LOCK_thd_data);
2136 user_var_entry *entry=
2137 (user_var_entry*) my_hash_search(&thd->user_vars, name, sizeof(name)-1);
2138 if (entry && entry->length() > 0)
2139 {
2140 value->copy(entry->ptr(), entry->length(), NULL);
2141 return value;
2142 }
2143
2144 return NULL;
2145 }
2146
2147 /*
2148
2149 Kill all Binlog_dump threads which previously talked to the same slave
2150 ("same" means with the same UUID(for slave versions >= 5.6) or same server id
2151 (for slave versions < 5.6). Indeed, if the slave stops, if the
2152 Binlog_dump thread is waiting (mysql_cond_wait) for binlog update, then it
2153 will keep existing until a query is written to the binlog. If the master is
2154 idle, then this could last long, and if the slave reconnects, we could have 2
2155 Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
2156 binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
2157 the master kills any existing thread with the slave's UUID/server id (if this id is
2158 not zero; it will be true for real slaves, but false for mysqlbinlog when it
2159 sends COM_BINLOG_DUMP to get a remote binlog dump).
2160
2161 SYNOPSIS
2162 kill_zombie_dump_threads()
2163 @param thd newly connected dump thread object
2164
2165 */
2166
kill_zombie_dump_threads(THD * thd)2167 void kill_zombie_dump_threads(THD *thd)
2168 {
2169 String slave_uuid;
2170 get_slave_uuid(thd, &slave_uuid);
2171 if (slave_uuid.length() == 0 && thd->server_id == 0)
2172 return;
2173
2174 mysql_mutex_lock(&LOCK_thread_count);
2175 THD *tmp= NULL;
2176 Thread_iterator it= global_thread_list_begin();
2177 Thread_iterator end= global_thread_list_end();
2178 bool is_zombie_thread= false;
2179 for (; it != end; ++it)
2180 {
2181 if ((*it) != thd && ((*it)->get_command() == COM_BINLOG_DUMP ||
2182 (*it)->get_command() == COM_BINLOG_DUMP_GTID))
2183 {
2184 String tmp_uuid;
2185 get_slave_uuid((*it), &tmp_uuid);
2186 if (slave_uuid.length())
2187 {
2188 is_zombie_thread= (tmp_uuid.length() &&
2189 !strncmp(slave_uuid.c_ptr(),
2190 tmp_uuid.c_ptr(), UUID_LENGTH));
2191 }
2192 else
2193 {
2194 /*
2195 Check if it is a 5.5 slave's dump thread i.e., server_id should be
2196 same && dump thread should not contain 'UUID'.
2197 */
2198 is_zombie_thread= (((*it)->server_id == thd->server_id) &&
2199 !tmp_uuid.length());
2200 }
2201 if (is_zombie_thread)
2202 {
2203 tmp= *it;
2204 mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
2205 break;
2206 }
2207 }
2208 }
2209 mysql_mutex_unlock(&LOCK_thread_count);
2210 if (tmp)
2211 {
2212 /*
2213 Here we do not call kill_one_thread() as
2214 it will be slow because it will iterate through the list
2215 again. We just to do kill the thread ourselves.
2216 */
2217 if (log_warnings > 1)
2218 {
2219 if (slave_uuid.length())
2220 {
2221 sql_print_information("While initializing dump thread for slave with "
2222 "UUID <%s>, found a zombie dump thread with the "
2223 "same UUID. Master is killing the zombie dump "
2224 "thread(%lu).", slave_uuid.c_ptr(),
2225 tmp->thread_id);
2226 }
2227 else
2228 {
2229 sql_print_information("While initializing dump thread for slave with "
2230 "server_id <%u>, found a zombie dump thread with the "
2231 "same server_id. Master is killing the zombie dump "
2232 "thread(%lu).", thd->server_id,
2233 tmp->thread_id);
2234 }
2235 }
2236 tmp->duplicate_slave_id= true;
2237 tmp->awake(THD::KILL_QUERY);
2238 mysql_mutex_unlock(&tmp->LOCK_thd_data);
2239 }
2240 }
2241
2242 /**
2243 Execute a RESET MASTER statement.
2244
2245 @param thd Pointer to THD object of the client thread executing the
2246 statement.
2247
2248 @retval 0 success
2249 @retval 1 error
2250 */
reset_master(THD * thd)2251 int reset_master(THD* thd)
2252 {
2253 if (!mysql_bin_log.is_open())
2254 {
2255 my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
2256 ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
2257 return 1;
2258 }
2259
2260 if (mysql_bin_log.reset_logs(thd))
2261 return 1;
2262 (void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
2263 return 0;
2264 }
2265
2266
2267 /**
2268 Execute a SHOW MASTER STATUS statement.
2269
2270 @param thd Pointer to THD object for the client thread executing the
2271 statement.
2272
2273 @retval FALSE success
2274 @retval TRUE failure
2275 */
show_master_status(THD * thd)2276 bool show_master_status(THD* thd)
2277 {
2278 Protocol *protocol= thd->protocol;
2279 char* gtid_set_buffer= NULL;
2280 int gtid_set_size= 0;
2281 List<Item> field_list;
2282
2283 DBUG_ENTER("show_binlog_info");
2284
2285 global_sid_lock->wrlock();
2286 const Gtid_set* gtid_set= gtid_state->get_logged_gtids();
2287 if ((gtid_set_size= gtid_set->to_string(>id_set_buffer)) < 0)
2288 {
2289 global_sid_lock->unlock();
2290 my_eof(thd);
2291 my_free(gtid_set_buffer);
2292 DBUG_RETURN(true);
2293 }
2294 global_sid_lock->unlock();
2295
2296 field_list.push_back(new Item_empty_string("File", FN_REFLEN));
2297 field_list.push_back(new Item_return_int("Position",20,
2298 MYSQL_TYPE_LONGLONG));
2299 field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
2300 field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
2301 field_list.push_back(new Item_empty_string("Executed_Gtid_Set",
2302 gtid_set_size));
2303
2304 if (protocol->send_result_set_metadata(&field_list,
2305 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2306 {
2307 my_free(gtid_set_buffer);
2308 DBUG_RETURN(true);
2309 }
2310 protocol->prepare_for_resend();
2311
2312 if (mysql_bin_log.is_open())
2313 {
2314 LOG_INFO li;
2315 mysql_bin_log.get_current_log(&li);
2316 int dir_len = dirname_length(li.log_file_name);
2317 protocol->store(li.log_file_name + dir_len, &my_charset_bin);
2318 protocol->store((ulonglong) li.pos);
2319 protocol->store(binlog_filter->get_do_db());
2320 protocol->store(binlog_filter->get_ignore_db());
2321 protocol->store(gtid_set_buffer, &my_charset_bin);
2322 if (protocol->write())
2323 {
2324 my_free(gtid_set_buffer);
2325 DBUG_RETURN(true);
2326 }
2327 }
2328 my_eof(thd);
2329 my_free(gtid_set_buffer);
2330 DBUG_RETURN(false);
2331 }
2332
2333
2334 /**
2335 Execute a SHOW BINARY LOGS statement.
2336
2337 @param thd Pointer to THD object for the client thread executing the
2338 statement.
2339
2340 @retval FALSE success
2341 @retval TRUE failure
2342 */
show_binlogs(THD * thd)2343 bool show_binlogs(THD* thd)
2344 {
2345 IO_CACHE *index_file;
2346 LOG_INFO cur;
2347 File file;
2348 char fname[FN_REFLEN];
2349 List<Item> field_list;
2350 uint length;
2351 int cur_dir_len;
2352 Protocol *protocol= thd->protocol;
2353 DBUG_ENTER("show_binlogs");
2354
2355 if (!mysql_bin_log.is_open())
2356 {
2357 my_error(ER_NO_BINARY_LOGGING, MYF(0));
2358 DBUG_RETURN(TRUE);
2359 }
2360
2361 field_list.push_back(new Item_empty_string("Log_name", 255));
2362 field_list.push_back(new Item_return_int("File_size", 20,
2363 MYSQL_TYPE_LONGLONG));
2364 if (protocol->send_result_set_metadata(&field_list,
2365 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2366 DBUG_RETURN(TRUE);
2367
2368 mysql_mutex_lock(mysql_bin_log.get_log_lock());
2369 DEBUG_SYNC(thd, "show_binlogs_after_lock_log_before_lock_index");
2370 mysql_bin_log.lock_index();
2371 index_file=mysql_bin_log.get_index_file();
2372
2373 mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
2374 mysql_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
2375
2376 cur_dir_len= dirname_length(cur.log_file_name);
2377
2378 reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
2379
2380 /* The file ends with EOF or empty line */
2381 while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
2382 {
2383 int dir_len;
2384 ulonglong file_length= 0; // Length if open fails
2385 fname[--length] = '\0'; // remove the newline
2386
2387 protocol->prepare_for_resend();
2388 dir_len= dirname_length(fname);
2389 length-= dir_len;
2390 protocol->store(fname + dir_len, length, &my_charset_bin);
2391
2392 if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
2393 file_length= cur.pos; /* The active log, use the active position */
2394 else
2395 {
2396 /* this is an old log, open it and find the size */
2397 if ((file= mysql_file_open(key_file_binlog,
2398 fname, O_RDONLY | O_SHARE | O_BINARY,
2399 MYF(0))) >= 0)
2400 {
2401 file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
2402 mysql_file_close(file, MYF(0));
2403 }
2404 }
2405 protocol->store(file_length);
2406 if (protocol->write())
2407 {
2408 DBUG_PRINT("info", ("stopping dump thread because protocol->write failed at line %d", __LINE__));
2409 goto err;
2410 }
2411 }
2412 if(index_file->error == -1)
2413 goto err;
2414 mysql_bin_log.unlock_index();
2415 my_eof(thd);
2416 DBUG_RETURN(FALSE);
2417
2418 err:
2419 mysql_bin_log.unlock_index();
2420 DBUG_RETURN(TRUE);
2421 }
2422
2423 #endif /* HAVE_REPLICATION */
2424