1 /* Copyright (c) 2013, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #ifdef HAVE_REPLICATION
24 #include "rpl_binlog_sender.h"
25
26 #include "debug_sync.h" // debug_sync_set_action
27 #include "log.h" // sql_print_information
28 #include "log_event.h" // MAX_MAX_ALLOWED_PACKET
29 #include "rpl_constants.h" // BINLOG_DUMP_NON_BLOCK
30 #include "rpl_handler.h" // RUN_HOOK
31 #include "rpl_master.h" // opt_sporadic_binlog_dump_fail
32 #include "rpl_reporting.h" // MAX_SLAVE_ERRMSG
33 #include "sql_class.h" // THD
34
35 #include "pfs_file_provider.h"
36 #include "mysql/psi/mysql_file.h"
37
38 #ifndef NDEBUG
39 static uint binlog_dump_count= 0;
40 #endif
41 using binary_log::checksum_crc32;
42
43 const uint32 Binlog_sender::PACKET_MIN_SIZE= 4096;
44 const uint32 Binlog_sender::PACKET_MAX_SIZE= UINT_MAX32;
45 const ushort Binlog_sender::PACKET_SHRINK_COUNTER_THRESHOLD= 100;
46 const float Binlog_sender::PACKET_GROW_FACTOR= 2.0;
47 const float Binlog_sender::PACKET_SHRINK_FACTOR= 0.5;
48
49 /**
50 @class Observe_transmission_guard
51
52 Sentry class to guard the transitions for `Delegate::m_observe_transmission`
53 flag within given contexts.
54
55 */
56 class Observe_transmission_guard
57 {
58 public:
59 /**
60 Constructor for the class. It will change the value of the `flag` parameter
61 according with the `event_type` and `event_ptr` content. The `flag` will be
62 set to `true` as follows:
63
64 - The event is an `XID_EVENT`
65 - The event is an `XA_PREPARE_LOG_EVENT`.
66 - The event is a `QUERY_EVENT` with query equal to "XA COMMIT" or "XA ABORT"
67 or "COMMIT".
68 - The event is the first `QUERY_EVENT` after a `GTID_EVENT` and the query is
69 not "BEGIN" --the statement is a DDL, for instance.
70
71 @param flag The flag variable to guard
72 @param event_type The type of the event being processed
73 @param event_ptr The raw content of the event being processed
74 @param event_len The size of the raw content of the event being
75 processed
76 @param checksum_alg The checksum algorithm being used currently
77 @param prev_event_type The type of the event processed just before the
78 current one
79 */
Observe_transmission_guard(bool & flag,binary_log::Log_event_type event_type,const char * event_ptr,uint32 event_len,binary_log::enum_binlog_checksum_alg checksum_alg,binary_log::Log_event_type prev_event_type)80 Observe_transmission_guard(bool &flag, binary_log::Log_event_type event_type,
81 const char *event_ptr, uint32 event_len,
82 binary_log::enum_binlog_checksum_alg checksum_alg,
83 binary_log::Log_event_type prev_event_type)
84 : m_saved(flag), m_to_set(flag)
85 {
86 if (my_atomic_load32(&opt_atomic_replication_sender_observe_commit_only))
87 {
88 switch (event_type)
89 {
90 case binary_log::XID_EVENT:
91 case binary_log::XA_PREPARE_LOG_EVENT:
92 {
93 m_to_set= true;
94 break;
95 }
96 case binary_log::QUERY_EVENT:
97 {
98 bool first_event_after_gtid=
99 prev_event_type == binary_log::ANONYMOUS_GTID_LOG_EVENT ||
100 prev_event_type == binary_log::GTID_LOG_EVENT;
101
102 Format_description_log_event fd_ev(BINLOG_VERSION);
103 fd_ev.common_footer->checksum_alg= checksum_alg;
104 Query_log_event ev(event_ptr, event_len, &fd_ev,
105 binary_log::QUERY_EVENT);
106 if (first_event_after_gtid)
107 m_to_set= (strcmp("BEGIN", ev.query) != 0);
108 else
109 m_to_set= (strncmp("XA COMMIT", ev.query, 9) == 0) ||
110 (strncmp("XA ABORT", ev.query, 8) == 0) ||
111 (strncmp("COMMIT", ev.query, 6) == 0);
112 break;
113 }
114 default:
115 {
116 m_to_set= false;
117 break;
118 }
119 }
120 }
121 }
122
123 /**
124 Destructor for the sentry class. It will instantiate the guarded flag with
125 the value prior to the creation of this object.
126 */
~Observe_transmission_guard()127 ~Observe_transmission_guard() { m_to_set= m_saved; }
128
129 private:
130 /** The value of the guarded flag upon this object creation */
131 bool m_saved;
132 /** The flag variable to guard */
133 bool &m_to_set;
134 };
135
136 /**
137 @class Sender_context_guard
138
139 Sentry class that guards the Binlog_sender context and, at destruction, will
140 prepare it for the next event to be processed.
141 */
142 class Sender_context_guard
143 {
144 public:
145 /**
146 Class constructor that simply stores, internally, the reference for the
147 `Binlog_sender` to be guarded and the values to be set upon destruction.
148
149 @param target The `Binlog_sender` object to be guarded.
150 @param event_type The currently processed event type, to be used for context
151 of the next event processing round.
152 */
Sender_context_guard(Binlog_sender & target,binary_log::Log_event_type event_type)153 Sender_context_guard(Binlog_sender &target,
154 binary_log::Log_event_type event_type)
155 : m_target(target), m_event_type(event_type)
156 {
157 }
158
159 /**
160 Class destructor that will set the proper context of the guarded
161 `Binlog_sender` object.
162 */
~Sender_context_guard()163 virtual ~Sender_context_guard()
164 {
165 m_target.set_prev_event_type(m_event_type);
166 }
167
168 private:
169 /** The object to be guarded */
170 Binlog_sender &m_target;
171 /** The currently being processed event type */
172 binary_log::Log_event_type m_event_type;
173 };
174
Binlog_sender(THD * thd,const char * start_file,my_off_t start_pos,Gtid_set * exclude_gtids,uint32 flag)175 Binlog_sender::Binlog_sender(THD *thd, const char *start_file,
176 my_off_t start_pos,
177 Gtid_set *exclude_gtids, uint32 flag)
178 : m_thd(thd),
179 m_packet(*thd->get_protocol_classic()->get_packet()),
180 m_start_file(start_file),
181 m_start_pos(start_pos), m_exclude_gtid(exclude_gtids),
182 m_using_gtid_protocol(exclude_gtids != NULL),
183 m_check_previous_gtid_event(exclude_gtids != NULL),
184 m_gtid_clear_fd_created_flag(exclude_gtids == NULL),
185 m_diag_area(false),
186 m_errmsg(NULL), m_errno(0), m_last_file(NULL), m_last_pos(0),
187 m_half_buffer_size_req_counter(0), m_new_shrink_size(PACKET_MIN_SIZE),
188 m_fdle(NULL), m_flag(flag), m_observe_transmission(false),
189 m_transmit_started(false), m_prev_event_type(binary_log::UNKNOWN_EVENT)
190 {}
191
init()192 void Binlog_sender::init()
193 {
194 DBUG_ENTER("Binlog_sender::init");
195 THD *thd= m_thd;
196
197 thd->push_diagnostics_area(&m_diag_area);
198 init_heartbeat_period();
199 m_last_event_sent_ts= time(0);
200
201 mysql_mutex_lock(&thd->LOCK_thd_data);
202 thd->current_linfo= &m_linfo;
203 mysql_mutex_unlock(&thd->LOCK_thd_data);
204
205 /* Initialize the buffer only once. */
206 m_packet.mem_realloc(PACKET_MIN_SIZE); // size of the buffer
207 m_new_shrink_size= PACKET_MIN_SIZE;
208 DBUG_PRINT("info", ("Initial packet->alloced_length: %zu",
209 m_packet.alloced_length()));
210
211 if (!mysql_bin_log.is_open())
212 {
213 set_fatal_error("Binary log is not open");
214 DBUG_VOID_RETURN;
215 }
216
217 if (DBUG_EVALUATE_IF("simulate_no_server_id", true, server_id == 0))
218 {
219 set_fatal_error("Misconfigured master - master server_id is 0");
220 DBUG_VOID_RETURN;
221 }
222
223 if (m_using_gtid_protocol)
224 {
225 enum_gtid_mode gtid_mode= get_gtid_mode(GTID_MODE_LOCK_NONE);
226 if (gtid_mode != GTID_MODE_ON)
227 {
228 char buf[MYSQL_ERRMSG_SIZE];
229 sprintf(buf, "The replication sender thread cannot start in "
230 "AUTO_POSITION mode: this server has GTID_MODE = %.192s "
231 "instead of ON.", get_gtid_mode_string(gtid_mode));
232 set_fatal_error(buf);
233 DBUG_VOID_RETURN;
234 }
235 }
236
237 if (check_start_file())
238 DBUG_VOID_RETURN;
239
240 sql_print_information("Start binlog_dump to master_thread_id(%u) "
241 "slave_server(%u), pos(%s, %llu)",
242 thd->thread_id(), thd->server_id,
243 m_start_file, m_start_pos);
244
245 if (RUN_HOOK(binlog_transmit, transmit_start,
246 (thd, m_flag, m_start_file, m_start_pos,
247 &m_observe_transmission)))
248 {
249 set_unknow_error("Failed to run hook 'transmit_start'");
250 DBUG_VOID_RETURN;
251 }
252 m_transmit_started=true;
253
254 init_checksum_alg();
255 /*
256 There are two ways to tell the server to not block:
257
258 - Set the BINLOG_DUMP_NON_BLOCK flag.
259 This is official, documented, not used by any mysql
260 client, but used by some external users.
261
262 - Set server_id=0.
263 This is unofficial, undocumented, and used by
264 mysqlbinlog -R since the beginning of time.
265
266 When mysqlbinlog --stop-never is used, it sets a 'fake'
267 server_id that defaults to 1 but can be set to anything
268 else using stop-never-slave-server-id. This has the
269 drawback that if the server_id conflicts with any other
270 running slave, or with any other instance of mysqlbinlog
271 --stop-never, then that other instance will be killed. It
272 is also an unnecessary burden on the user to have to
273 specify a server_id different from all other server_ids
274 just to avoid conflicts.
275
276 As of MySQL 5.6.20 and 5.7.5, mysqlbinlog redundantly sets
277 the BINLOG_DUMP_NONBLOCK flag when one or both of the
278 following holds:
279 - the --stop-never option is *not* specified
280
281 In a far future, this means we can remove the unofficial
282 functionality that server_id=0 implies nonblocking
283 behavior. That will allow mysqlbinlog to use server_id=0
284 always. That has the advantage that mysqlbinlog
285 --stop-never cannot cause any running dump threads to be
286 killed.
287 */
288 m_wait_new_events= !((thd->server_id == 0) ||
289 ((m_flag & BINLOG_DUMP_NON_BLOCK) != 0));
290 /* Binary event can be vary large. So set it to max allowed packet. */
291 thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
292
293 #ifndef NDEBUG
294 if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
295 set_unknow_error("Master fails in COM_BINLOG_DUMP because of "
296 "--sporadic-binlog-dump-fail");
297 m_event_count= 0;
298 #endif
299 DBUG_VOID_RETURN;
300 }
301
cleanup()302 void Binlog_sender::cleanup()
303 {
304 DBUG_ENTER("Binlog_sender::cleanup");
305
306 THD *thd= m_thd;
307
308 if (m_transmit_started)
309 (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, m_flag));
310
311 mysql_mutex_lock(&thd->LOCK_thd_data);
312 thd->current_linfo= NULL;
313 mysql_mutex_unlock(&thd->LOCK_thd_data);
314
315 thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet;
316
317 thd->pop_diagnostics_area();
318 if (has_error())
319 my_message(m_errno, m_errmsg, MYF(0));
320 else
321 my_eof(thd);
322
323 DBUG_VOID_RETURN;
324 }
325
run()326 void Binlog_sender::run()
327 {
328 DBUG_ENTER("Binlog_sender::run");
329 File file= -1;
330 IO_CACHE log_cache;
331 my_off_t start_pos= m_start_pos;
332 const char *log_file= m_linfo.log_file_name;
333 bool is_index_file_reopened_on_binlog_disable= false;
334 init();
335
336 while (!has_error() && !m_thd->killed)
337 {
338 /*
339 Faked rotate event is only required in a few cases(see comment of the
340 function). But even so, a faked rotate event is always sent before sending
341 event log file, even if a rotate log event exists in last binlog and
342 was already sent. The slave then gets an extra rotation and records
343 two Rotate_log_events.
344
345 The main issue here are some dependencies on mysqlbinlog, that should be
346 solved in the future.
347 */
348 if (unlikely(fake_rotate_event(log_file, start_pos)))
349 break;
350
351 file= open_binlog_file(&log_cache, log_file, &m_errmsg);
352 if (unlikely(file < 0))
353 {
354 set_fatal_error(m_errmsg);
355 break;
356 }
357
358 THD_STAGE_INFO(m_thd, stage_sending_binlog_event_to_slave);
359 if (send_binlog(&log_cache, start_pos))
360 break;
361
362 /* Will go to next file, need to copy log file name */
363 set_last_file(log_file);
364
365 THD_STAGE_INFO(m_thd,
366 stage_finished_reading_one_binlog_switching_to_next_binlog);
367 DBUG_EXECUTE_IF("waiting_for_disable_binlog",
368 {
369 const char act[]= "now "
370 "signal dump_thread_reached_wait_point "
371 "wait_for continue_dump_thread no_clear_event";
372 assert(!debug_sync_set_action(current_thd,
373 STRING_WITH_LEN(act)));
374 };);
375 mysql_bin_log.lock_index();
376 if (!mysql_bin_log.is_open())
377 {
378 if (mysql_bin_log.open_index_file(mysql_bin_log.get_index_fname(),
379 log_file, FALSE))
380 {
381 set_fatal_error("Binary log is not open and failed to open index file "
382 "to retrieve next file.");
383 mysql_bin_log.unlock_index();
384 break;
385 }
386 is_index_file_reopened_on_binlog_disable= true;
387 }
388 int error= mysql_bin_log.find_next_log(&m_linfo, 0);
389 mysql_bin_log.unlock_index();
390 if (unlikely(error))
391 {
392 DBUG_EXECUTE_IF("waiting_for_disable_binlog",
393 {
394 const char act[]= "now signal consumed_binlog";
395 assert(!debug_sync_set_action(current_thd,
396 STRING_WITH_LEN(act)));
397 };);
398 if (is_index_file_reopened_on_binlog_disable)
399 mysql_bin_log.close(LOG_CLOSE_INDEX, true/*need_lock_log=true*/,
400 true/*need_lock_index=true*/);
401 set_fatal_error("could not find next log");
402 break;
403 }
404
405 start_pos= BIN_LOG_HEADER_SIZE;
406 end_io_cache(&log_cache);
407 mysql_file_close(file, MYF(MY_WME));
408 file= -1;
409 }
410
411 THD_STAGE_INFO(m_thd, stage_waiting_to_finalize_termination);
412 char error_text[MAX_SLAVE_ERRMSG];
413
414 /*
415 If the dump thread was killed because of a duplicate slave UUID we
416 will fail throwing an error to the slave so it will not try to
417 reconnect anymore.
418 */
419 mysql_mutex_lock(&m_thd->LOCK_thd_data);
420 bool was_killed_by_duplicate_slave_id= m_thd->duplicate_slave_id;
421 mysql_mutex_unlock(&m_thd->LOCK_thd_data);
422 if (was_killed_by_duplicate_slave_id)
423 set_fatal_error("A slave with the same server_uuid/server_id as this slave "
424 "has connected to the master");
425
426 if (file > 0)
427 {
428 if (is_fatal_error())
429 {
430 /* output events range to error message */
431 my_snprintf(error_text, sizeof(error_text),
432 "%s; the first event '%s' at %lld, "
433 "the last event read from '%s' at %lld, "
434 "the last byte read from '%s' at %lld.",
435 m_errmsg,
436 m_start_file, m_start_pos, m_last_file, m_last_pos,
437 log_file, my_b_tell(&log_cache));
438 set_fatal_error(error_text);
439 }
440
441 end_io_cache(&log_cache);
442 mysql_file_close(file, MYF(MY_WME));
443 }
444
445 cleanup();
446 DBUG_VOID_RETURN;
447 }
448
send_binlog(IO_CACHE * log_cache,my_off_t start_pos)449 my_off_t Binlog_sender::send_binlog(IO_CACHE *log_cache, my_off_t start_pos)
450 {
451 if (unlikely(send_format_description_event(log_cache, start_pos)))
452 return 1;
453
454 if (start_pos == BIN_LOG_HEADER_SIZE)
455 start_pos= my_b_tell(log_cache);
456
457 if (m_check_previous_gtid_event)
458 {
459 bool has_prev_gtid_ev;
460 if (has_previous_gtid_log_event(log_cache, &has_prev_gtid_ev))
461 return 1;
462
463 if (!has_prev_gtid_ev)
464 return 0;
465 }
466
467 /*
468 Slave is requesting a position which is in the middle of a file,
469 so seek to the correct position.
470 */
471 if (my_b_tell(log_cache) != start_pos)
472 my_b_seek(log_cache, start_pos);
473
474 while (!m_thd->killed)
475 {
476 my_off_t end_pos;
477
478 end_pos= get_binlog_end_pos(log_cache);
479 if (end_pos <= 1)
480 return end_pos;
481
482 if (send_events(log_cache, end_pos))
483 return 1;
484
485 m_thd->killed= DBUG_EVALUATE_IF("simulate_kill_dump", THD::KILL_CONNECTION,
486 m_thd->killed);
487
488 DBUG_EXECUTE_IF("wait_after_binlog_EOF",
489 {
490 const char act[]= "now wait_for signal.rotate_finished no_clear_event";
491 assert(!debug_sync_set_action(current_thd,
492 STRING_WITH_LEN(act)));
493 };);
494 }
495 return 1;
496 }
497
get_binlog_end_pos(IO_CACHE * log_cache)498 inline my_off_t Binlog_sender::get_binlog_end_pos(IO_CACHE *log_cache)
499 {
500 DBUG_ENTER("Binlog_sender::get_binlog_end_pos()");
501 my_off_t log_pos= my_b_tell(log_cache);
502 my_off_t end_pos= 0;
503
504 do
505 {
506 mysql_bin_log.lock_binlog_end_pos();
507 end_pos= mysql_bin_log.get_binlog_end_pos();
508 mysql_bin_log.unlock_binlog_end_pos();
509
510 if (unlikely(!mysql_bin_log.is_active(m_linfo.log_file_name)))
511 {
512 end_pos= my_b_filelength(log_cache);
513 if (log_pos == end_pos)
514 DBUG_RETURN(0); // Arrived the end of inactive file
515 else
516 DBUG_RETURN(end_pos);
517 }
518
519 DBUG_PRINT("info", ("Reading file %s, seek pos %llu, end_pos is %llu",
520 m_linfo.log_file_name, log_pos, end_pos));
521 DBUG_PRINT("info", ("Active file is %s", mysql_bin_log.get_log_fname()));
522
523 if (log_pos < end_pos)
524 DBUG_RETURN(end_pos);
525
526 /* Some data may be in net buffer, it should be flushed before waiting */
527 if (!m_wait_new_events || flush_net())
528 DBUG_RETURN(1);
529
530 if (unlikely(wait_new_events(log_pos)))
531 DBUG_RETURN(1);
532 } while (unlikely(!m_thd->killed));
533
534 DBUG_RETURN(1);
535 }
536
/**
  Send the events of the current binlog file from the current read position
  up to @p end_pos.

  For each event, in order:
  - read it and check its type against the GTID setup (check_event_type());
  - run the before_send hook;
  - skip it if it belongs to a transaction in m_exclude_gtid, otherwise
    send it;
  - run the after_send hook.

  While a group is skipped, heartbeats carrying the current position are
  sent (at most about once per heartbeat period) so the slave can advance
  master_log_pos. A pending heartbeat is also sent before the first event
  sent after a skipped group, and at the end if the last event was skipped.

  @param log_cache  Cache of the binlog file being sent.
  @param end_pos    Position up to which events may be read.

  @retval 0  all events before @p end_pos were processed
  @retval 1  a step failed or the thread was killed
*/
int Binlog_sender::send_events(IO_CACHE *log_cache, my_off_t end_pos)
{
  DBUG_ENTER("Binlog_sender::send_events");

  THD *thd= m_thd;
  const char *log_file= m_linfo.log_file_name;
  my_off_t log_pos= my_b_tell(log_cache);
  /* Position still to be reported by a heartbeat after skipping; 0 = none. */
  my_off_t exclude_group_end_pos= 0;
  /* Whether the last event read belongs to a group that is being skipped. */
  bool in_exclude_group= false;

  while (likely(log_pos < end_pos))
  {
    uchar* event_ptr;
    uint32 event_len;

    if (unlikely(thd->killed))
      DBUG_RETURN(1);

    if (unlikely(read_event(log_cache, m_event_checksum_alg,
                            &event_ptr, &event_len)))
      DBUG_RETURN(1);

    Log_event_type event_type= (Log_event_type)event_ptr[EVENT_TYPE_OFFSET];
    if (unlikely(check_event_type(event_type, log_file, log_pos)))
      DBUG_RETURN(1);

    /*
      Both guards live until the end of this iteration:
      - ctx_guard stores event_type as the previous event type seen by the
        next iteration;
      - obs_guard may override m_observe_transmission for this event only
        and restores it afterwards.
    */
    Sender_context_guard ctx_guard(*this, event_type);
    Observe_transmission_guard obs_guard(m_observe_transmission, event_type,
                                         const_cast<const char*>(
                                           reinterpret_cast<char*>(event_ptr)),
                                         event_len, m_event_checksum_alg,
                                         m_prev_event_type);

    DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
                    {
                      if (event_type == binary_log::XID_EVENT)
                      {
                        thd->get_protocol_classic()->flush_net();
                        const char act[]=
                          "now "
                          "wait_for signal.continue";
                        assert(opt_debug_sync_timeout > 0);
                        assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
                      }
                    });

    /* Read position after this event, presumably where the next one starts. */
    log_pos= my_b_tell(log_cache);

    if (before_send_hook(log_file, log_pos))
      DBUG_RETURN(1);
    /*
      TODO: Set m_exclude_gtid to NULL if all gtids in m_exclude_gtid has
      be skipped. and maybe removing the gtid from m_exclude_gtid will make
      skip_event has better performance.
    */
    /*
      in_exclude_group is only updated when m_exclude_gtid is set; without
      it, it stays false and every event is sent.
    */
    if (m_exclude_gtid && (in_exclude_group= skip_event(event_ptr, event_len,
                                                        in_exclude_group)))
    {
      /*
        If we have not send any event from past 'heartbeat_period' time
        period, then it is time to send a packet before skipping this group.
      */
      DBUG_EXECUTE_IF("inject_2sec_sleep_when_skipping_an_event",
                      {
                        my_sleep(2000000);
                      });
      time_t now= time(0);
      assert(now >= m_last_event_sent_ts);
      /*
        m_heartbeat_period is in nanoseconds and is truncated to whole
        seconds here. NOTE(review): with a period of 0 (or under 1s) this
        condition is always true, so a heartbeat is sent for every skipped
        event.
      */
      bool time_for_hb_event= ((ulonglong)(now - m_last_event_sent_ts)
                               >= (ulonglong)(m_heartbeat_period/1000000000UL));
      if (time_for_hb_event)
      {
        if (unlikely(send_heartbeat_event(log_pos)))
          DBUG_RETURN(1);
        exclude_group_end_pos= 0;
      }
      else
      {
        /* Remember the position; it is reported before the next sent event. */
        exclude_group_end_pos= log_pos;
      }
      DBUG_PRINT("info", ("Event of type %s is skipped",
                          Log_event::get_type_str(event_type)));
    }
    else
    {
      /*
        A heartbeat is required before sending a event, If some events are
        skipped. It notifies the slave to increase master_log_pos for
        excluded events.
      */
      if (exclude_group_end_pos)
      {
        /*
          Save a copy of the buffer content. m_packet already holds the
          event to send, and the heartbeat is presumably built in the same
          buffer.
        */
        String tmp;
        tmp.copy(m_packet);
        tmp.length(m_packet.length());

        if (unlikely(send_heartbeat_event(exclude_group_end_pos)))
          DBUG_RETURN(1);
        exclude_group_end_pos= 0;

        /* Restore the copy back. */
        m_packet.copy(tmp);
        m_packet.length(tmp.length());
      }

      if (unlikely(send_packet()))
        DBUG_RETURN(1);

      DBUG_EXECUTE_IF("dump_thread_wait_after_send_write_rows", {
        if (event_type == binary_log::WRITE_ROWS_EVENT) {
          thd->get_protocol_classic()->flush_net();
          static const char act[] =
              "now "
              "wait_for signal.continue";
          assert(opt_debug_sync_timeout > 0);
          assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
        }
      });
    }

    /* The hook receives the position only for skipped events (0 otherwise). */
    if (unlikely(after_send_hook(log_file, in_exclude_group ? log_pos : 0)))
      DBUG_RETURN(1);
  }

  /*
    A heartbeat is needed before waiting for more events, if some
    events are skipped. This is needed so that the slave can increase
    master_log_pos correctly.
  */
  if (unlikely(in_exclude_group))
  {
    if (send_heartbeat_event(log_pos))
      DBUG_RETURN(1);
  }
  DBUG_RETURN(0);
}
674
675
check_event_type(Log_event_type type,const char * log_file,my_off_t log_pos)676 bool Binlog_sender::check_event_type(Log_event_type type,
677 const char *log_file, my_off_t log_pos)
678 {
679 if (type == binary_log::ANONYMOUS_GTID_LOG_EVENT)
680 {
681 /*
682 Normally, there will not be any anonymous events when
683 auto_position is enabled, since both the master and the slave
684 refuse to connect if the master is not using GTID_MODE=ON.
685 However, if the master changes GTID_MODE after the connection
686 was initialized, or if the slave requests to replicate
687 transactions that appear before the last anonymous event, then
688 this can happen. Then we generate this error to prevent sending
689 anonymous transactions to the slave.
690 */
691 if (m_using_gtid_protocol)
692 {
693 DBUG_EXECUTE_IF("skip_sender_anon_autoposition_error",
694 {
695 return false;
696 };);
697 char buf[MYSQL_ERRMSG_SIZE];
698 sprintf(buf, ER(ER_CANT_REPLICATE_ANONYMOUS_WITH_AUTO_POSITION),
699 log_file, log_pos);
700 set_fatal_error(buf);
701 return true;
702 }
703 /*
704 Normally, there will not be any anonymous events when master has
705 GTID_MODE=ON, since anonymous events are not generated when
706 GTID_MODE=ON. However, this can happen if the master changes
707 GTID_MODE to ON when the slave has not yet replicated all
708 anonymous transactions.
709 */
710 else if (get_gtid_mode(GTID_MODE_LOCK_NONE) == GTID_MODE_ON)
711 {
712 char buf[MYSQL_ERRMSG_SIZE];
713 sprintf(buf, ER(ER_CANT_REPLICATE_ANONYMOUS_WITH_GTID_MODE_ON),
714 log_file, log_pos);
715 set_fatal_error(buf);
716 return true;
717 }
718 }
719 else if (type == binary_log::GTID_LOG_EVENT)
720 {
721 /*
722 Normally, there will not be any GTID events when master has
723 GTID_MODE=OFF, since GTID events are not generated when
724 GTID_MODE=OFF. However, this can happen if the master changes
725 GTID_MODE to OFF when the slave has not yet replicated all GTID
726 transactions.
727 */
728 if (get_gtid_mode(GTID_MODE_LOCK_NONE) == GTID_MODE_OFF)
729 {
730 char buf[MYSQL_ERRMSG_SIZE];
731 sprintf(buf, ER(ER_CANT_REPLICATE_GTID_WITH_GTID_MODE_OFF),
732 log_file, log_pos);
733 set_fatal_error(buf);
734 return true;
735 }
736 }
737 return false;
738 }
739
740
skip_event(const uchar * event_ptr,uint32 event_len,bool in_exclude_group)741 inline bool Binlog_sender::skip_event(const uchar *event_ptr, uint32 event_len,
742 bool in_exclude_group)
743 {
744 DBUG_ENTER("Binlog_sender::skip_event");
745
746 uint8 event_type= (Log_event_type) event_ptr[LOG_EVENT_OFFSET];
747 switch (event_type)
748 {
749 case binary_log::GTID_LOG_EVENT:
750 {
751 Format_description_log_event fd_ev(BINLOG_VERSION);
752 fd_ev.common_footer->checksum_alg= m_event_checksum_alg;
753 Gtid_log_event gtid_ev((const char *)event_ptr, event_checksum_on() ?
754 event_len - BINLOG_CHECKSUM_LEN : event_len,
755 &fd_ev);
756 Gtid gtid;
757 gtid.sidno= gtid_ev.get_sidno(m_exclude_gtid->get_sid_map());
758 gtid.gno= gtid_ev.get_gno();
759 DBUG_RETURN(m_exclude_gtid->contains_gtid(gtid));
760 }
761 case binary_log::ROTATE_EVENT:
762 DBUG_RETURN(false);
763 }
764 DBUG_RETURN(in_exclude_group);
765 }
766
wait_new_events(my_off_t log_pos)767 int Binlog_sender::wait_new_events(my_off_t log_pos)
768 {
769 int ret= 0;
770 PSI_stage_info old_stage;
771
772 mysql_bin_log.lock_binlog_end_pos();
773 m_thd->ENTER_COND(mysql_bin_log.get_log_cond(),
774 mysql_bin_log.get_binlog_end_pos_lock(),
775 &stage_master_has_sent_all_binlog_to_slave,
776 &old_stage);
777
778 if (mysql_bin_log.get_binlog_end_pos() <= log_pos &&
779 mysql_bin_log.is_active(m_linfo.log_file_name))
780 {
781 if (m_heartbeat_period)
782 ret= wait_with_heartbeat(log_pos);
783 else
784 ret= wait_without_heartbeat();
785 }
786
787 mysql_bin_log.unlock_binlog_end_pos();
788 m_thd->EXIT_COND(&old_stage);
789 return ret;
790 }
791
wait_with_heartbeat(my_off_t log_pos)792 inline int Binlog_sender::wait_with_heartbeat(my_off_t log_pos)
793 {
794 #ifndef NDEBUG
795 ulong hb_info_counter= 0;
796 #endif
797 struct timespec ts;
798 int ret;
799
800 do
801 {
802 set_timespec_nsec(&ts, m_heartbeat_period);
803 ret= mysql_bin_log.wait_for_update_bin_log(m_thd, &ts);
804 if (ret != ETIMEDOUT && ret != ETIME)
805 break;
806
807 #ifndef NDEBUG
808 if (hb_info_counter < 3)
809 {
810 sql_print_information("master sends heartbeat message");
811 hb_info_counter++;
812 if (hb_info_counter == 3)
813 sql_print_information("the rest of heartbeat info skipped ...");
814 }
815 #endif
816 if (send_heartbeat_event(log_pos))
817 return 1;
818 } while (!m_thd->killed);
819
820 return ret ? 1 : 0;
821 }
822
wait_without_heartbeat()823 inline int Binlog_sender::wait_without_heartbeat()
824 {
825 return mysql_bin_log.wait_for_update_bin_log(m_thd, NULL);
826 }
827
init_heartbeat_period()828 void Binlog_sender::init_heartbeat_period()
829 {
830 my_bool null_value;
831 LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
832
833 /* Protects m_thd->user_vars. */
834 mysql_mutex_lock(&m_thd->LOCK_thd_data);
835
836 user_var_entry *entry=
837 (user_var_entry*) my_hash_search(&m_thd->user_vars, (uchar*) name.str,
838 name.length);
839 m_heartbeat_period= entry ? entry->val_int(&null_value) : 0;
840
841 mysql_mutex_unlock(&m_thd->LOCK_thd_data);
842 }
843
check_start_file()844 int Binlog_sender::check_start_file()
845 {
846 char index_entry_name[FN_REFLEN];
847 char *name_ptr= NULL;
848 File file;
849 IO_CACHE cache;
850 std::string errmsg;
851 my_off_t size;
852
853 if (m_start_file[0] != '\0')
854 {
855 mysql_bin_log.make_log_name(index_entry_name, m_start_file);
856 name_ptr= index_entry_name;
857 }
858 else if (m_using_gtid_protocol)
859 {
860 /*
861 In normal scenarios, it is not possible that Slave will
862 contain more gtids than Master with resepctive to Master's
863 UUID. But it could be possible case if Master's binary log
864 is truncated(due to raid failure) or Master's binary log is
865 deleted but GTID_PURGED was not set properly. That scenario
866 needs to be validated, i.e., it should *always* be the case that
867 Slave's gtid executed set (+retrieved set) is a subset of
868 Master's gtid executed set with respective to Master's UUID.
869 If it happens, dump thread will be stopped during the handshake
870 with Slave (thus the Slave's I/O thread will be stopped with the
871 error. Otherwise, it can lead to data inconsistency between Master
872 and Slave.
873 */
874 Sid_map* slave_sid_map= m_exclude_gtid->get_sid_map();
875 assert(slave_sid_map);
876 global_sid_lock->wrlock();
877 const rpl_sid &server_sid= gtid_state->get_server_sid();
878 rpl_sidno subset_sidno= slave_sid_map->sid_to_sidno(server_sid);
879 Gtid_set
880 gtid_executed_and_owned(gtid_state->get_executed_gtids()->get_sid_map());
881
882 // gtids = executed_gtids & owned_gtids
883 if (gtid_executed_and_owned.add_gtid_set(gtid_state->get_executed_gtids())
884 != RETURN_STATUS_OK)
885 {
886 assert(0);
887 }
888 gtid_state->get_owned_gtids()->get_gtids(gtid_executed_and_owned);
889
890 if (!m_exclude_gtid->is_subset_for_sid(>id_executed_and_owned,
891 gtid_state->get_server_sidno(),
892 subset_sidno))
893 {
894 global_sid_lock->unlock();
895 set_fatal_error(ER(ER_SLAVE_HAS_MORE_GTIDS_THAN_MASTER));
896 return 1;
897 }
898 /*
899 Setting GTID_PURGED (when GTID_EXECUTED set is empty i.e., when
900 previous_gtids are also empty) will make binlog rotate. That
901 leaves first binary log with empty previous_gtids and second
902 binary log's previous_gtids with the value of gtid_purged.
903 In find_first_log_not_in_gtid_set() while we search for a binary
904 log whose previous_gtid_set is subset of slave_gtid_executed,
905 in this particular case, server will always find the first binary
906 log with empty previous_gtids which is subset of any given
907 slave_gtid_executed. Thus Master thinks that it found the first
908 binary log which is actually not correct and unable to catch
909 this error situation. Hence adding below extra if condition
910 to check the situation. Slave should know about Master's purged GTIDs.
911 If Slave's GTID executed + retrieved set does not contain Master's
912 complete purged GTID list, that means Slave is requesting(expecting)
913 GTIDs which were purged by Master. We should let Slave know about the
914 situation. i.e., throw error if slave's GTID executed set is not
915 a superset of Master's purged GTID set.
916 The other case, where user deleted binary logs manually
917 (without using 'PURGE BINARY LOGS' command) but gtid_purged
918 is not set by the user, the following if condition cannot catch it.
919 But that is not a problem because in find_first_log_not_in_gtid_set()
920 while checking for subset previous_gtids binary log, the logic
921 will not find one and an error ER_MASTER_HAS_PURGED_REQUIRED_GTIDS
922 is thrown from there.
923 */
924 if (!gtid_state->get_lost_gtids()->is_subset(m_exclude_gtid))
925 {
926 mysql_bin_log.report_missing_purged_gtids(m_exclude_gtid, errmsg);
927 global_sid_lock->unlock();
928 set_fatal_error(errmsg.c_str());
929 return 1;
930 }
931 global_sid_lock->unlock();
932 Gtid first_gtid= {0, 0};
933 if (mysql_bin_log.find_first_log_not_in_gtid_set(index_entry_name,
934 m_exclude_gtid,
935 &first_gtid,
936 errmsg))
937 {
938 set_fatal_error(errmsg.c_str());
939 return 1;
940 }
941 name_ptr= index_entry_name;
942 /*
943 find_first_log_not_in_gtid_set() guarantees the file it found has
944 Previous_gtids_log_event as all following binlogs. So the variable is
945 set to false which tells not to check the event again when starting to
946 dump binglogs.
947 */
948 m_check_previous_gtid_event= false;
949 /*
950 If we are skipping at least the first transaction of the binlog,
951 we must clear the "created" field of the FD event (set it to 0)
952 to avoid cleaning up temp tables on slave.
953 */
954 m_gtid_clear_fd_created_flag= (first_gtid.sidno >= 1 &&
955 first_gtid.gno >= 1 &&
956 m_exclude_gtid->contains_gtid(first_gtid));
957 }
958
959 /*
960 Index entry name is saved into m_linfo. If name_ptr is NULL,
961 then starts from the first file in index file.
962 */
963
964 if (mysql_bin_log.find_log_pos(&m_linfo, name_ptr, true))
965 {
966 set_fatal_error("Could not find first log file name in binary log "
967 "index file");
968 return 1;
969 }
970
971 if (m_start_pos < BIN_LOG_HEADER_SIZE)
972 {
973 set_fatal_error("Client requested master to start replication "
974 "from position < 4");
975 return 1;
976 }
977
978 const char *bl_errmsg= NULL;
979 if ((file= open_binlog_file(&cache, m_linfo.log_file_name, &bl_errmsg)) < 0)
980 {
981 set_fatal_error(bl_errmsg);
982 return 1;
983 }
984
985 size= my_b_filelength(&cache);
986 end_io_cache(&cache);
987 mysql_file_close(file, MYF(MY_WME));
988
989 if (m_start_pos > size)
990 {
991 set_fatal_error("Client requested master to start replication from "
992 "position > file size");
993 return 1;
994 }
995 return 0;
996 }
997
998 extern TYPELIB binlog_checksum_typelib;
999
init_checksum_alg()1000 void Binlog_sender::init_checksum_alg()
1001 {
1002 DBUG_ENTER("init_binlog_checksum");
1003
1004 LEX_STRING name= {C_STRING_WITH_LEN("master_binlog_checksum")};
1005 user_var_entry *entry;
1006
1007 m_slave_checksum_alg= binary_log::BINLOG_CHECKSUM_ALG_UNDEF;
1008
1009 /* Protects m_thd->user_vars. */
1010 mysql_mutex_lock(&m_thd->LOCK_thd_data);
1011
1012 entry= (user_var_entry*) my_hash_search(&m_thd->user_vars,
1013 (uchar*) name.str, name.length);
1014 if (entry)
1015 {
1016 m_slave_checksum_alg=
1017 static_cast<enum_binlog_checksum_alg>(find_type((char*) entry->ptr(), &binlog_checksum_typelib, 1) - 1);
1018 assert(m_slave_checksum_alg < binary_log::BINLOG_CHECKSUM_ALG_ENUM_END);
1019 }
1020
1021 mysql_mutex_unlock(&m_thd->LOCK_thd_data);
1022
1023 /*
1024 m_event_checksum_alg should be set to the checksum algorithm in
1025 Format_description_log_event. But it is used by fake_rotate_event() which
1026 will be called before reading any Format_description_log_event. In that case,
1027 m_slave_checksum_alg is set as the value of m_event_checksum_alg.
1028 */
1029 m_event_checksum_alg= m_slave_checksum_alg;
1030 DBUG_VOID_RETURN;
1031 }
1032
fake_rotate_event(const char * next_log_file,my_off_t log_pos)1033 int Binlog_sender::fake_rotate_event(const char *next_log_file,
1034 my_off_t log_pos)
1035 {
1036 DBUG_ENTER("fake_rotate_event");
1037 const char* p = next_log_file + dirname_length(next_log_file);
1038 size_t ident_len = strlen(p);
1039 size_t event_len = ident_len + LOG_EVENT_HEADER_LEN + Binary_log_event::ROTATE_HEADER_LEN +
1040 (event_checksum_on() ? BINLOG_CHECKSUM_LEN : 0);
1041
1042 /* reset transmit packet for the fake rotate event below */
1043 if (reset_transmit_packet(0, event_len))
1044 DBUG_RETURN(1);
1045
1046 size_t event_offset= m_packet.length();
1047 m_packet.length(event_len + event_offset);
1048 uchar *header= (uchar *)m_packet.ptr() + event_offset;
1049 uchar *rotate_header= header + LOG_EVENT_HEADER_LEN;
1050 /*
1051 'when' (the timestamp) is set to 0 so that slave could distinguish between
1052 real and fake Rotate events (if necessary)
1053 */
1054 int4store(header, 0);
1055 header[EVENT_TYPE_OFFSET] = binary_log::ROTATE_EVENT;
1056 int4store(header + SERVER_ID_OFFSET, server_id);
1057 int4store(header + EVENT_LEN_OFFSET, static_cast<uint32>(event_len));
1058 int4store(header + LOG_POS_OFFSET, 0);
1059 int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
1060
1061 int8store(rotate_header, log_pos);
1062 memcpy(rotate_header + Binary_log_event::ROTATE_HEADER_LEN, p, ident_len);
1063
1064 if (event_checksum_on())
1065 calc_event_checksum(header, event_len);
1066
1067 DBUG_RETURN(send_packet());
1068 }
1069
calc_event_checksum(uchar * event_ptr,size_t event_len)1070 inline void Binlog_sender::calc_event_checksum(uchar *event_ptr, size_t event_len)
1071 {
1072 ha_checksum crc= checksum_crc32(0L, NULL, 0);
1073 crc= checksum_crc32(crc, event_ptr, event_len - BINLOG_CHECKSUM_LEN);
1074 int4store(event_ptr + event_len - BINLOG_CHECKSUM_LEN, crc);
1075 }
1076
reset_transmit_packet(ushort flags,size_t event_len)1077 inline int Binlog_sender::reset_transmit_packet(ushort flags, size_t event_len)
1078 {
1079 DBUG_ENTER("Binlog_sender::reset_transmit_packet");
1080 DBUG_PRINT("info", ("event_len: %zu, m_packet->alloced_length: %zu",
1081 event_len, m_packet.alloced_length()));
1082 assert(m_packet.alloced_length() >= PACKET_MIN_SIZE);
1083
1084 m_packet.length(0); // size of the content
1085 m_packet.qs_append('\0'); // Set this as an OK packet
1086
1087 /* reserve and set default header */
1088 if (m_observe_transmission &&
1089 RUN_HOOK(binlog_transmit, reserve_header, (m_thd, flags, &m_packet)))
1090 {
1091 set_unknow_error("Failed to run hook 'reserve_header'");
1092 DBUG_RETURN(1);
1093 }
1094
1095 /* Resizes the buffer if needed. */
1096 if (grow_packet(event_len))
1097 DBUG_RETURN(1);
1098
1099 DBUG_PRINT("info", ("m_packet.alloced_length: %zu (after potential "
1100 "reallocation)", m_packet.alloced_length()));
1101
1102 DBUG_RETURN(0);
1103 }
1104
send_format_description_event(IO_CACHE * log_cache,my_off_t start_pos)1105 int Binlog_sender::send_format_description_event(IO_CACHE *log_cache,
1106 my_off_t start_pos)
1107 {
1108 DBUG_ENTER("Binlog_sender::send_format_description_event");
1109 uchar* event_ptr;
1110 uint32 event_len;
1111
1112 m_fdle.reset(new Format_description_log_event(4));
1113 if (m_fdle == NULL)
1114 {
1115 set_fatal_error("Out-of-memory");
1116 DBUG_RETURN(1);
1117 }
1118
1119 if (read_event(log_cache, binary_log::BINLOG_CHECKSUM_ALG_OFF, &event_ptr,
1120 &event_len))
1121 DBUG_RETURN(1);
1122
1123 DBUG_PRINT("info",
1124 ("Looked for a Format_description_log_event, found event type %s",
1125 Log_event::get_type_str((Log_event_type) event_ptr[EVENT_TYPE_OFFSET])));
1126
1127 if (event_ptr[EVENT_TYPE_OFFSET] != binary_log::FORMAT_DESCRIPTION_EVENT)
1128 {
1129 set_fatal_error("Could not find format_description_event in binlog file");
1130 DBUG_RETURN(1);
1131 }
1132
1133 assert(event_ptr[LOG_POS_OFFSET] > 0);
1134 m_event_checksum_alg=
1135 Log_event_footer::get_checksum_alg((const char *)event_ptr, event_len);
1136
1137 assert(m_event_checksum_alg < binary_log::BINLOG_CHECKSUM_ALG_ENUM_END ||
1138 m_event_checksum_alg == binary_log::BINLOG_CHECKSUM_ALG_UNDEF);
1139
1140 /* Slave does not support checksum, but binary events include checksum */
1141 if (m_slave_checksum_alg == binary_log::BINLOG_CHECKSUM_ALG_UNDEF &&
1142 event_checksum_on())
1143 {
1144 set_fatal_error("Slave can not handle replication events with the "
1145 "checksum that master is configured to log");
1146
1147 sql_print_warning("Master is configured to log replication events "
1148 "with checksum, but will not send such events to "
1149 "slaves that cannot process them");
1150 DBUG_RETURN(1);
1151 }
1152
1153 event_ptr[FLAGS_OFFSET] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1154
1155 bool event_updated= false;
1156 if (m_using_gtid_protocol)
1157 {
1158 if (m_gtid_clear_fd_created_flag)
1159 {
1160 /*
1161 As we are skipping at least the first transaction of the binlog,
1162 we must clear the "created" field of the FD event (set it to 0)
1163 to avoid destroying temp tables on slave.
1164 */
1165 int4store(event_ptr + LOG_EVENT_MINIMAL_HEADER_LEN + ST_CREATED_OFFSET,
1166 0);
1167 event_updated= true;
1168 }
1169 }
1170 else if (start_pos > BIN_LOG_HEADER_SIZE)
1171 {
1172 /*
1173 If we are skipping the beginning of the binlog file based on the position
1174 asked by the slave, we must clear the log_pos and the created flag of the
1175 Format_description_log_event to be sent. Mark that this event with
1176 "log_pos=0", so the slave should not increment master's binlog position
1177 (rli->group_master_log_pos)
1178 */
1179 int4store(event_ptr + LOG_POS_OFFSET, 0);
1180 /*
1181 Set the 'created' field to 0 to avoid destroying
1182 temp tables on slave.
1183 */
1184 int4store(event_ptr + LOG_EVENT_MINIMAL_HEADER_LEN + ST_CREATED_OFFSET, 0);
1185 event_updated= true;
1186 }
1187
1188 /* fix the checksum due to latest changes in header */
1189 if (event_checksum_on() && event_updated)
1190 calc_event_checksum(event_ptr, event_len);
1191
1192 if (m_event_checksum_alg != binary_log::BINLOG_CHECKSUM_ALG_UNDEF &&
1193 m_event_checksum_alg != binary_log::BINLOG_CHECKSUM_ALG_OFF)
1194 event_len-= BINLOG_CHECKSUM_LEN;
1195
1196 Format_description_log_event *new_fdle= NULL;
1197
1198 new_fdle= new Format_description_log_event(reinterpret_cast<char*>(event_ptr), event_len, m_fdle.get());
1199
1200 if (new_fdle == NULL)
1201 {
1202 set_fatal_error("Out-of-memory");
1203 DBUG_RETURN(1);
1204 }
1205 m_fdle.reset(new_fdle);
1206
1207 if (send_packet())
1208 DBUG_RETURN(1);
1209
1210 my_off_t binlog_pos_after_fdle= my_b_tell(log_cache);
1211
1212 char header_buffer[LOG_EVENT_MINIMAL_HEADER_LEN];
1213 // Let's check if next event is Start encryption event
1214 // If we go outside the file peek_event_header will also return an error
1215 if (Log_event::peek_event_header(header_buffer, log_cache))
1216 {
1217 my_b_seek(log_cache, binlog_pos_after_fdle);
1218 DBUG_RETURN(0);
1219 }
1220
1221 // peek_event_header actually moves the log_cache->read_pos, thus we need to rewind
1222 my_b_seek(log_cache, binlog_pos_after_fdle);
1223
1224 if (static_cast<uchar>(header_buffer[EVENT_TYPE_OFFSET]) == binary_log::START_ENCRYPTION_EVENT)
1225 {
1226 event_ptr= NULL;
1227 my_off_t log_pos= my_b_tell(log_cache);
1228
1229 if (read_event(log_cache, m_event_checksum_alg, &event_ptr,
1230 &event_len))
1231 DBUG_RETURN(1);
1232
1233 if (m_event_checksum_alg != binary_log::BINLOG_CHECKSUM_ALG_UNDEF &&
1234 m_event_checksum_alg != binary_log::BINLOG_CHECKSUM_ALG_OFF)
1235 event_len-= BINLOG_CHECKSUM_LEN;
1236
1237 assert(event_ptr[EVENT_TYPE_OFFSET] == binary_log::START_ENCRYPTION_EVENT);
1238 Start_encryption_log_event sele(reinterpret_cast<char*>(event_ptr), event_len, m_fdle.get());
1239
1240 if (!sele.is_valid())
1241 {
1242 set_fatal_error("Start encryption log event is invalid");
1243 DBUG_RETURN(1);
1244 }
1245
1246 if (m_fdle->start_decryption(&sele))
1247 {
1248 set_fatal_error("Could not decrypt binlog: encryption key error");
1249 DBUG_RETURN(1);
1250 }
1251
1252 if (start_pos <= BIN_LOG_HEADER_SIZE)
1253 {
1254 log_pos= my_b_tell(log_cache);
1255 // We have read start encryption event from master binlog, but we have
1256 // not sent it to slave. We need to inform slave that master position
1257 // has advanced.
1258 if (unlikely(send_heartbeat_event(log_pos)))
1259 DBUG_RETURN(1);
1260 }
1261 }
1262 DBUG_RETURN(0);
1263 }
1264
has_previous_gtid_log_event(IO_CACHE * log_cache,bool * found)1265 int Binlog_sender::has_previous_gtid_log_event(IO_CACHE *log_cache,
1266 bool *found)
1267 {
1268 uchar buf[LOG_EVENT_HEADER_LEN];
1269 *found= false;
1270
1271 /* It is possible there is only format_description_log_event in the file. */
1272 if (my_b_tell(log_cache) < my_b_filelength(log_cache))
1273 {
1274 if (my_b_read(log_cache, buf, LOG_EVENT_HEADER_LEN) != 0)
1275 {
1276 set_fatal_error(log_read_error_msg(LOG_READ_IO));
1277 return 1;
1278 }
1279 *found= (buf[EVENT_TYPE_OFFSET] == binary_log::PREVIOUS_GTIDS_LOG_EVENT);
1280 }
1281 return 0;
1282 }
1283
log_read_error_msg(int error)1284 const char* Binlog_sender::log_read_error_msg(int error)
1285 {
1286 switch (error) {
1287 case LOG_READ_BOGUS:
1288 return "bogus data in log event";
1289 case LOG_READ_TOO_LARGE:
1290 return "log event entry exceeded max_allowed_packet; Increase "
1291 "max_allowed_packet on master";
1292 case LOG_READ_IO:
1293 return "I/O error reading log event";
1294 case LOG_READ_MEM:
1295 return "memory allocation failed reading log event";
1296 case LOG_READ_TRUNC:
1297 return "binlog truncated in the middle of event; consider out of disk space on master";
1298 case LOG_READ_CHECKSUM_FAILURE:
1299 return "event read from binlog did not pass crc check";
1300 case LOG_READ_DECRYPT:
1301 return "Event decryption failure";
1302 default:
1303 return "unknown error reading log event on the master";
1304 }
1305 }
1306
read_event(IO_CACHE * log_cache,enum_binlog_checksum_alg checksum_alg,uchar ** event_ptr,uint32 * event_len)1307 inline int Binlog_sender::read_event(IO_CACHE *log_cache, enum_binlog_checksum_alg checksum_alg,
1308 uchar **event_ptr, uint32 *event_len)
1309 {
1310 DBUG_ENTER("Binlog_sender::read_event");
1311
1312 size_t event_offset;
1313 char header[LOG_EVENT_MINIMAL_HEADER_LEN];
1314 int error= 0;
1315 #ifndef NDEBUG
1316 const char *packet_buffer= NULL;
1317 #endif
1318
1319 if ((error= Log_event::peek_event_length(event_len, log_cache, header)))
1320 goto read_error;
1321
1322 if (reset_transmit_packet(0, *event_len))
1323 DBUG_RETURN(1);
1324
1325 event_offset= m_packet.length();
1326 #ifndef NDEBUG
1327 packet_buffer= m_packet.ptr();
1328 #endif
1329
1330 DBUG_EXECUTE_IF("dump_thread_before_read_event",
1331 {
1332 const char act[]= "now wait_for signal.continue no_clear_event";
1333 assert(!debug_sync_set_action(current_thd,
1334 STRING_WITH_LEN(act)));
1335 };);
1336
1337 /*
1338 packet is big enough to read the event, since we have reallocated based
1339 on the length stated in the event header.
1340 */
1341 if ((error= Log_event::read_log_event(log_cache, &m_packet, m_fdle.get(), NULL, checksum_alg,
1342 NULL, NULL, header)))
1343 goto read_error;
1344
1345 set_last_pos(my_b_tell(log_cache));
1346
1347 /*
1348 As we pre-allocate the buffer to store the event at reset_transmit_packet,
1349 the buffer should not be changed while calling read_log_event (unless binlog
1350 encryption is on), even knowing that it might call functions to replace the
1351 buffer by one with the size to fit the event. When encryption is on - the buffer
1352 will be replaced with memory allocated for storing decrypted data.
1353 */
1354 assert(encrypt_binlog || packet_buffer == m_packet.ptr());
1355 *event_ptr= (uchar *)m_packet.ptr() + event_offset;
1356
1357 DBUG_PRINT("info",
1358 ("Read event %s",
1359 Log_event::get_type_str(Log_event_type
1360 ((*event_ptr)[EVENT_TYPE_OFFSET]))));
1361 #ifndef NDEBUG
1362 if (check_event_count())
1363 DBUG_RETURN(1);
1364 #endif
1365 DBUG_RETURN(0);
1366 read_error:
1367 /*
1368 In theory, it should never happen. But RESET MASTER deletes binlog file
1369 directly without checking if there is any dump thread working.
1370 */
1371 error= (error == LOG_READ_EOF) ? LOG_READ_IO : error;
1372 set_fatal_error(log_read_error_msg(error));
1373 DBUG_RETURN(1);
1374 }
1375
send_heartbeat_event(my_off_t log_pos)1376 int Binlog_sender::send_heartbeat_event(my_off_t log_pos)
1377 {
1378 DBUG_ENTER("send_heartbeat_event");
1379 const char* filename= m_linfo.log_file_name;
1380 const char* p= filename + dirname_length(filename);
1381 size_t ident_len= strlen(p);
1382 size_t event_len= ident_len + LOG_EVENT_HEADER_LEN +
1383 (event_checksum_on() ? BINLOG_CHECKSUM_LEN : 0);
1384
1385 DBUG_PRINT("info", ("log_file_name %s, log_pos %llu", p, log_pos));
1386
1387 if (reset_transmit_packet(0, event_len))
1388 DBUG_RETURN(1);
1389
1390 size_t event_offset= m_packet.length();
1391 m_packet.length(event_len + event_offset);
1392 uchar *header= (uchar *)m_packet.ptr() + event_offset;
1393
1394 /* Timestamp field */
1395 int4store(header, 0);
1396 header[EVENT_TYPE_OFFSET] = binary_log::HEARTBEAT_LOG_EVENT;
1397 int4store(header + SERVER_ID_OFFSET, server_id);
1398 int4store(header + EVENT_LEN_OFFSET, event_len);
1399 int4store(header + LOG_POS_OFFSET, static_cast<uint32>(log_pos));
1400 int2store(header + FLAGS_OFFSET, 0);
1401 memcpy(header + LOG_EVENT_HEADER_LEN, p, ident_len);
1402
1403 if (event_checksum_on())
1404 calc_event_checksum(header, event_len);
1405
1406 DBUG_RETURN(send_packet_and_flush());
1407 }
1408
flush_net()1409 inline int Binlog_sender::flush_net()
1410 {
1411 if (DBUG_EVALUATE_IF("simulate_flush_error", 1,
1412 m_thd->get_protocol_classic()->flush_net()))
1413 {
1414 set_unknow_error("failed on flush_net()");
1415 return 1;
1416 }
1417 return 0;
1418 }
1419
send_packet()1420 inline int Binlog_sender::send_packet()
1421 {
1422 DBUG_ENTER("Binlog_sender::send_packet");
1423 DBUG_PRINT("info",
1424 ("Sending event of type %s", Log_event::get_type_str(
1425 (Log_event_type)m_packet.ptr()[1 + EVENT_TYPE_OFFSET])));
1426 // We should always use the same buffer to guarantee that the reallocation
1427 // logic is not broken.
1428 if (DBUG_EVALUATE_IF("simulate_send_error", true,
1429 my_net_write(
1430 m_thd->get_protocol_classic()->get_net(),
1431 (uchar*) m_packet.ptr(), m_packet.length())))
1432 {
1433 set_unknow_error("Failed on my_net_write()");
1434 DBUG_RETURN(1);
1435 }
1436
1437 /* Shrink the packet if needed. */
1438 int ret= shrink_packet() ? 1 : 0;
1439 m_last_event_sent_ts= time(0);
1440 DBUG_RETURN(ret);
1441 }
1442
send_packet_and_flush()1443 inline int Binlog_sender::send_packet_and_flush()
1444 {
1445 return (send_packet() || flush_net());
1446 }
1447
before_send_hook(const char * log_file,my_off_t log_pos)1448 inline int Binlog_sender::before_send_hook(const char *log_file,
1449 my_off_t log_pos)
1450 {
1451 if (m_observe_transmission &&
1452 RUN_HOOK(binlog_transmit, before_send_event,
1453 (m_thd, m_flag, &m_packet, log_file, log_pos)))
1454 {
1455 set_unknow_error("run 'before_send_event' hook failed");
1456 return 1;
1457 }
1458 return 0;
1459 }
1460
after_send_hook(const char * log_file,my_off_t log_pos)1461 inline int Binlog_sender::after_send_hook(const char *log_file,
1462 my_off_t log_pos)
1463 {
1464 if (m_observe_transmission &&
1465 RUN_HOOK(binlog_transmit, after_send_event,
1466 (m_thd, m_flag, &m_packet, log_file, log_pos)))
1467 {
1468 set_unknow_error("Failed to run hook 'after_send_event'");
1469 return 1;
1470 }
1471
1472 /*
1473 semisync after_send_event hook doesn't return and error when net error
1474 happens.
1475 */
1476 if (m_thd->get_protocol_classic()->get_net()->last_errno != 0)
1477 {
1478 set_unknow_error("Found net error");
1479 return 1;
1480 }
1481 return 0;
1482 }
1483
1484 #ifndef NDEBUG
1485 extern int max_binlog_dump_events;
1486
check_event_count()1487 inline int Binlog_sender::check_event_count()
1488 {
1489 if (max_binlog_dump_events != 0 &&
1490 (++m_event_count > max_binlog_dump_events))
1491 {
1492 set_unknow_error("Debugging binlog dump abort");
1493 return 1;
1494 }
1495 return 0;
1496 }
1497 #endif
1498
1499
grow_packet(size_t extra_size)1500 inline bool Binlog_sender::grow_packet(size_t extra_size)
1501 {
1502 DBUG_ENTER("Binlog_sender::grow_packet");
1503 size_t cur_buffer_size= m_packet.alloced_length();
1504 size_t cur_buffer_used= m_packet.length();
1505 size_t needed_buffer_size= cur_buffer_used + extra_size;
1506
1507 if (extra_size > (PACKET_MAX_SIZE - cur_buffer_used))
1508 /*
1509 Not enough memory: requesting packet to be bigger than the max
1510 allowed - PACKET_MAX_SIZE.
1511 */
1512 DBUG_RETURN(true);
1513
1514 /* Grow the buffer if needed. */
1515 if (needed_buffer_size > cur_buffer_size)
1516 {
1517 size_t new_buffer_size;
1518 new_buffer_size= calc_grow_buffer_size(cur_buffer_size, needed_buffer_size);
1519
1520 if (!new_buffer_size)
1521 DBUG_RETURN(true);
1522
1523 if (m_packet.mem_realloc(new_buffer_size))
1524 DBUG_RETURN(true);
1525
1526 /*
1527 Calculates the new, smaller buffer, size to use the next time
1528 one wants to shrink the buffer.
1529 */
1530 calc_shrink_buffer_size(new_buffer_size);
1531 }
1532
1533 DBUG_RETURN(false);
1534 }
1535
shrink_packet()1536 inline bool Binlog_sender::shrink_packet()
1537 {
1538 DBUG_ENTER("Binlog_sender::shrink_packet");
1539 bool res= false;
1540 size_t cur_buffer_size= m_packet.alloced_length();
1541 size_t buffer_used= m_packet.length();
1542
1543 assert(!(cur_buffer_size < PACKET_MIN_SIZE));
1544
1545 /*
1546 If the packet is already at the minimum size, just
1547 do nothing. Otherwise, check if we should shrink.
1548 */
1549 if (cur_buffer_size > PACKET_MIN_SIZE)
1550 {
1551 /* increment the counter if we used less than the new shrink size. */
1552 if (buffer_used < m_new_shrink_size)
1553 {
1554 m_half_buffer_size_req_counter++;
1555
1556 /* Check if we should shrink the buffer. */
1557 if (m_half_buffer_size_req_counter == PACKET_SHRINK_COUNTER_THRESHOLD)
1558 {
1559 /*
1560 The last PACKET_SHRINK_COUNTER_THRESHOLD consecutive packets
1561 required less than half of the current buffer size. Lets shrink
1562 it to not hold more memory than we potentially need.
1563 */
1564 m_packet.shrink(m_new_shrink_size);
1565
1566 /*
1567 Calculates the new, smaller buffer, size to use the next time
1568 one wants to shrink the buffer.
1569 */
1570 calc_shrink_buffer_size(m_new_shrink_size);
1571
1572 /* Reset the counter. */
1573 m_half_buffer_size_req_counter= 0;
1574 }
1575 }
1576 else
1577 m_half_buffer_size_req_counter= 0;
1578 }
1579 #ifndef NDEBUG
1580 if (res == false)
1581 {
1582 assert(m_new_shrink_size <= cur_buffer_size);
1583 assert(m_packet.alloced_length() >= PACKET_MIN_SIZE);
1584 }
1585 #endif
1586 DBUG_RETURN(res);
1587 }
1588
calc_grow_buffer_size(size_t current_size,size_t min_size)1589 inline size_t Binlog_sender::calc_grow_buffer_size(size_t current_size,
1590 size_t min_size)
1591 {
1592 /* Check that a sane minimum buffer size was requested. */
1593 assert(min_size > PACKET_MIN_SIZE);
1594 if (min_size > PACKET_MAX_SIZE)
1595 return 0;
1596
1597 /*
1598 Even if this overflows (PACKET_MAX_SIZE == UINT_MAX32) and
1599 new_size wraps around, the min_size will always be returned,
1600 i.e., it is a safety net.
1601
1602 Also, cap new_size to PACKET_MAX_SIZE (in case
1603 PACKET_MAX_SIZE < UINT_MAX32).
1604 */
1605 size_t new_size= static_cast<size_t>(
1606 std::min(static_cast<double>(PACKET_MAX_SIZE),
1607 static_cast<double>(current_size * PACKET_GROW_FACTOR)));
1608
1609 new_size= ALIGN_SIZE(std::max(new_size, min_size));
1610
1611 return new_size;
1612 }
1613
calc_shrink_buffer_size(size_t current_size)1614 void Binlog_sender::calc_shrink_buffer_size(size_t current_size)
1615 {
1616 size_t new_size= static_cast<size_t>(
1617 std::max(static_cast<double>(PACKET_MIN_SIZE),
1618 static_cast<double>(current_size * PACKET_SHRINK_FACTOR)));
1619
1620 m_new_shrink_size= ALIGN_SIZE(new_size);
1621 }
1622 #endif // HAVE_REPLICATION
1623