1 /* Copyright (c) 2013, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #ifdef HAVE_REPLICATION
24 #include "rpl_binlog_sender.h"
25 
26 #include "debug_sync.h"              // debug_sync_set_action
27 #include "log.h"                     // sql_print_information
28 #include "log_event.h"               // MAX_MAX_ALLOWED_PACKET
29 #include "rpl_constants.h"           // BINLOG_DUMP_NON_BLOCK
30 #include "rpl_handler.h"             // RUN_HOOK
31 #include "rpl_master.h"              // opt_sporadic_binlog_dump_fail
32 #include "rpl_reporting.h"           // MAX_SLAVE_ERRMSG
33 #include "sql_class.h"               // THD
34 
35 #include "pfs_file_provider.h"
36 #include "mysql/psi/mysql_file.h"
37 
38 #ifndef NDEBUG
39   static uint binlog_dump_count= 0;
40 #endif
41 using binary_log::checksum_crc32;
42 
43 const uint32 Binlog_sender::PACKET_MIN_SIZE= 4096;
44 const uint32 Binlog_sender::PACKET_MAX_SIZE= UINT_MAX32;
45 const ushort Binlog_sender::PACKET_SHRINK_COUNTER_THRESHOLD= 100;
46 const float Binlog_sender::PACKET_GROW_FACTOR= 2.0;
47 const float Binlog_sender::PACKET_SHRINK_FACTOR= 0.5;
48 
49 /**
50   @class Observe_transmission_guard
51 
52   Sentry class to guard the transitions for `Delegate::m_observe_transmission`
53   flag within given contexts.
54 
55  */
56 class Observe_transmission_guard
57 {
58  public:
59   /**
60     Constructor for the class. It will change the value of the `flag` parameter
61     according with the `event_type` and `event_ptr` content. The `flag` will be
62     set to `true` as follows:
63 
64     - The event is an `XID_EVENT`
65     - The event is an `XA_PREPARE_LOG_EVENT`.
66     - The event is a `QUERY_EVENT` with query equal to "XA COMMIT" or "XA ABORT"
67       or "COMMIT".
68     - The event is the first `QUERY_EVENT` after a `GTID_EVENT` and the query is
69       not "BEGIN" --the statement is a DDL, for instance.
70 
71     @param flag            The flag variable to guard
72     @param event_type      The type of the event being processed
73     @param event_ptr       The raw content of the event being processed
74     @param event_len       The size of the raw content of the event being
75                            processed
76     @param checksum_alg    The checksum algorithm being used currently
77     @param prev_event_type The type of the event processed just before the
78                            current one
79    */
Observe_transmission_guard(bool & flag,binary_log::Log_event_type event_type,const char * event_ptr,uint32 event_len,binary_log::enum_binlog_checksum_alg checksum_alg,binary_log::Log_event_type prev_event_type)80   Observe_transmission_guard(bool &flag, binary_log::Log_event_type event_type,
81                              const char *event_ptr, uint32 event_len,
82                              binary_log::enum_binlog_checksum_alg checksum_alg,
83                              binary_log::Log_event_type prev_event_type)
84       : m_saved(flag), m_to_set(flag)
85   {
86     if (my_atomic_load32(&opt_atomic_replication_sender_observe_commit_only))
87     {
88       switch (event_type)
89       {
90         case binary_log::XID_EVENT:
91         case binary_log::XA_PREPARE_LOG_EVENT:
92         {
93           m_to_set= true;
94           break;
95         }
96         case binary_log::QUERY_EVENT:
97         {
98           bool first_event_after_gtid=
99               prev_event_type == binary_log::ANONYMOUS_GTID_LOG_EVENT ||
100               prev_event_type == binary_log::GTID_LOG_EVENT;
101 
102           Format_description_log_event fd_ev(BINLOG_VERSION);
103           fd_ev.common_footer->checksum_alg= checksum_alg;
104           Query_log_event ev(event_ptr, event_len, &fd_ev,
105                              binary_log::QUERY_EVENT);
106           if (first_event_after_gtid)
107             m_to_set= (strcmp("BEGIN", ev.query) != 0);
108           else
109             m_to_set= (strncmp("XA COMMIT", ev.query, 9) == 0) ||
110                       (strncmp("XA ABORT", ev.query, 8) == 0) ||
111                       (strncmp("COMMIT", ev.query, 6) == 0);
112           break;
113         }
114         default:
115         {
116           m_to_set= false;
117           break;
118         }
119       }
120     }
121   }
122 
123   /**
124     Destructor for the sentry class. It will instantiate the guarded flag with
125     the value prior to the creation of this object.
126   */
~Observe_transmission_guard()127   ~Observe_transmission_guard() { m_to_set= m_saved; }
128 
129  private:
130   /** The value of the guarded flag upon this object creation */
131   bool m_saved;
132   /** The flag variable to guard */
133   bool &m_to_set;
134 };
135 
136 /**
137   @class Sender_context_guard
138 
139   Sentry class that guards the Binlog_sender context and, at destruction, will
140   prepare it for the next event to be processed.
141  */
142 class Sender_context_guard
143 {
144  public:
145   /**
146     Class constructor that simply stores, internally, the reference for the
147     `Binlog_sender` to be guarded and the values to be set upon destruction.
148 
149     @param target     The `Binlog_sender` object to be guarded.
150     @param event_type The currently processed event type, to be used for context
151                       of the next event processing round.
152    */
Sender_context_guard(Binlog_sender & target,binary_log::Log_event_type event_type)153   Sender_context_guard(Binlog_sender &target,
154                        binary_log::Log_event_type event_type)
155       : m_target(target), m_event_type(event_type)
156   {
157   }
158 
159   /**
160     Class destructor that will set the proper context of the guarded
161     `Binlog_sender` object.
162    */
~Sender_context_guard()163   virtual ~Sender_context_guard()
164   {
165     m_target.set_prev_event_type(m_event_type);
166   }
167 
168  private:
169   /** The object to be guarded */
170   Binlog_sender &m_target;
171   /** The currently being processed event type */
172   binary_log::Log_event_type m_event_type;
173 };
174 
Binlog_sender(THD * thd,const char * start_file,my_off_t start_pos,Gtid_set * exclude_gtids,uint32 flag)175 Binlog_sender::Binlog_sender(THD *thd, const char *start_file,
176                              my_off_t start_pos,
177                              Gtid_set *exclude_gtids, uint32 flag)
178   : m_thd(thd),
179     m_packet(*thd->get_protocol_classic()->get_packet()),
180     m_start_file(start_file),
181     m_start_pos(start_pos), m_exclude_gtid(exclude_gtids),
182     m_using_gtid_protocol(exclude_gtids != NULL),
183     m_check_previous_gtid_event(exclude_gtids != NULL),
184     m_gtid_clear_fd_created_flag(exclude_gtids == NULL),
185     m_diag_area(false),
186     m_errmsg(NULL), m_errno(0), m_last_file(NULL), m_last_pos(0),
187     m_half_buffer_size_req_counter(0), m_new_shrink_size(PACKET_MIN_SIZE),
188     m_fdle(NULL), m_flag(flag), m_observe_transmission(false),
189     m_transmit_started(false), m_prev_event_type(binary_log::UNKNOWN_EVENT)
190   {}
191 
init()192 void Binlog_sender::init()
193 {
194   DBUG_ENTER("Binlog_sender::init");
195   THD *thd= m_thd;
196 
197   thd->push_diagnostics_area(&m_diag_area);
198   init_heartbeat_period();
199   m_last_event_sent_ts= time(0);
200 
201   mysql_mutex_lock(&thd->LOCK_thd_data);
202   thd->current_linfo= &m_linfo;
203   mysql_mutex_unlock(&thd->LOCK_thd_data);
204 
205   /* Initialize the buffer only once. */
206   m_packet.mem_realloc(PACKET_MIN_SIZE); // size of the buffer
207   m_new_shrink_size= PACKET_MIN_SIZE;
208   DBUG_PRINT("info", ("Initial packet->alloced_length: %zu",
209                       m_packet.alloced_length()));
210 
211   if (!mysql_bin_log.is_open())
212   {
213     set_fatal_error("Binary log is not open");
214     DBUG_VOID_RETURN;
215   }
216 
217   if (DBUG_EVALUATE_IF("simulate_no_server_id", true, server_id == 0))
218   {
219     set_fatal_error("Misconfigured master - master server_id is 0");
220     DBUG_VOID_RETURN;
221   }
222 
223   if (m_using_gtid_protocol)
224   {
225     enum_gtid_mode gtid_mode= get_gtid_mode(GTID_MODE_LOCK_NONE);
226     if (gtid_mode != GTID_MODE_ON)
227     {
228       char buf[MYSQL_ERRMSG_SIZE];
229       sprintf(buf, "The replication sender thread cannot start in "
230               "AUTO_POSITION mode: this server has GTID_MODE = %.192s "
231               "instead of ON.", get_gtid_mode_string(gtid_mode));
232       set_fatal_error(buf);
233       DBUG_VOID_RETURN;
234     }
235   }
236 
237   if (check_start_file())
238     DBUG_VOID_RETURN;
239 
240   sql_print_information("Start binlog_dump to master_thread_id(%u) "
241                         "slave_server(%u), pos(%s, %llu)",
242                         thd->thread_id(), thd->server_id,
243                         m_start_file, m_start_pos);
244 
245   if (RUN_HOOK(binlog_transmit, transmit_start,
246                (thd, m_flag, m_start_file, m_start_pos,
247                 &m_observe_transmission)))
248   {
249     set_unknow_error("Failed to run hook 'transmit_start'");
250     DBUG_VOID_RETURN;
251   }
252   m_transmit_started=true;
253 
254   init_checksum_alg();
255   /*
256     There are two ways to tell the server to not block:
257 
258     - Set the BINLOG_DUMP_NON_BLOCK flag.
259       This is official, documented, not used by any mysql
260       client, but used by some external users.
261 
262     - Set server_id=0.
263       This is unofficial, undocumented, and used by
264       mysqlbinlog -R since the beginning of time.
265 
266     When mysqlbinlog --stop-never is used, it sets a 'fake'
267     server_id that defaults to 1 but can be set to anything
268     else using stop-never-slave-server-id. This has the
269     drawback that if the server_id conflicts with any other
270     running slave, or with any other instance of mysqlbinlog
271     --stop-never, then that other instance will be killed.  It
272     is also an unnecessary burden on the user to have to
273     specify a server_id different from all other server_ids
274     just to avoid conflicts.
275 
276     As of MySQL 5.6.20 and 5.7.5, mysqlbinlog redundantly sets
277     the BINLOG_DUMP_NONBLOCK flag when one or both of the
278     following holds:
279     - the --stop-never option is *not* specified
280 
281     In a far future, this means we can remove the unofficial
282     functionality that server_id=0 implies nonblocking
283     behavior. That will allow mysqlbinlog to use server_id=0
284     always. That has the advantage that mysqlbinlog
285     --stop-never cannot cause any running dump threads to be
286     killed.
287   */
288   m_wait_new_events= !((thd->server_id == 0) ||
289                        ((m_flag & BINLOG_DUMP_NON_BLOCK) != 0));
290   /* Binary event can be vary large. So set it to max allowed packet. */
291   thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
292 
293 #ifndef NDEBUG
294   if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
295     set_unknow_error("Master fails in COM_BINLOG_DUMP because of "
296                      "--sporadic-binlog-dump-fail");
297   m_event_count= 0;
298 #endif
299   DBUG_VOID_RETURN;
300 }
301 
cleanup()302 void Binlog_sender::cleanup()
303 {
304   DBUG_ENTER("Binlog_sender::cleanup");
305 
306   THD *thd= m_thd;
307 
308   if (m_transmit_started)
309     (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, m_flag));
310 
311   mysql_mutex_lock(&thd->LOCK_thd_data);
312   thd->current_linfo= NULL;
313   mysql_mutex_unlock(&thd->LOCK_thd_data);
314 
315   thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet;
316 
317   thd->pop_diagnostics_area();
318   if (has_error())
319     my_message(m_errno, m_errmsg, MYF(0));
320   else
321     my_eof(thd);
322 
323   DBUG_VOID_RETURN;
324 }
325 
run()326 void Binlog_sender::run()
327 {
328   DBUG_ENTER("Binlog_sender::run");
329   File file= -1;
330   IO_CACHE log_cache;
331   my_off_t start_pos= m_start_pos;
332   const char *log_file= m_linfo.log_file_name;
333   bool is_index_file_reopened_on_binlog_disable= false;
334   init();
335 
336   while (!has_error() && !m_thd->killed)
337   {
338     /*
339       Faked rotate event is only required in a few cases(see comment of the
340       function). But even so, a faked rotate event is always sent before sending
341       event log file, even if a rotate log event exists in last binlog and
342       was already sent. The slave then gets an extra rotation and records
343       two Rotate_log_events.
344 
345       The main issue here are some dependencies on mysqlbinlog, that should be
346       solved in the future.
347     */
348     if (unlikely(fake_rotate_event(log_file, start_pos)))
349       break;
350 
351     file= open_binlog_file(&log_cache, log_file, &m_errmsg);
352     if (unlikely(file < 0))
353     {
354       set_fatal_error(m_errmsg);
355       break;
356     }
357 
358     THD_STAGE_INFO(m_thd, stage_sending_binlog_event_to_slave);
359     if (send_binlog(&log_cache, start_pos))
360       break;
361 
362     /* Will go to next file, need to copy log file name */
363     set_last_file(log_file);
364 
365     THD_STAGE_INFO(m_thd,
366                    stage_finished_reading_one_binlog_switching_to_next_binlog);
367     DBUG_EXECUTE_IF("waiting_for_disable_binlog",
368 		    {
369 		    const char act[]= "now "
370 		    "signal dump_thread_reached_wait_point "
371 		    "wait_for continue_dump_thread no_clear_event";
372 		    assert(!debug_sync_set_action(current_thd,
373                                                   STRING_WITH_LEN(act)));
374 		    };);
375     mysql_bin_log.lock_index();
376     if (!mysql_bin_log.is_open())
377     {
378       if (mysql_bin_log.open_index_file(mysql_bin_log.get_index_fname(),
379 					log_file, FALSE))
380       {
381         set_fatal_error("Binary log is not open and failed to open index file "
382                         "to retrieve next file.");
383         mysql_bin_log.unlock_index();
384         break;
385       }
386       is_index_file_reopened_on_binlog_disable= true;
387     }
388     int error= mysql_bin_log.find_next_log(&m_linfo, 0);
389     mysql_bin_log.unlock_index();
390     if (unlikely(error))
391     {
392       DBUG_EXECUTE_IF("waiting_for_disable_binlog",
393 		      {
394 		      const char act[]= "now signal consumed_binlog";
395 		      assert(!debug_sync_set_action(current_thd,
396                                                     STRING_WITH_LEN(act)));
397 		      };);
398       if (is_index_file_reopened_on_binlog_disable)
399         mysql_bin_log.close(LOG_CLOSE_INDEX, true/*need_lock_log=true*/,
400                             true/*need_lock_index=true*/);
401       set_fatal_error("could not find next log");
402       break;
403     }
404 
405     start_pos= BIN_LOG_HEADER_SIZE;
406     end_io_cache(&log_cache);
407     mysql_file_close(file, MYF(MY_WME));
408     file= -1;
409   }
410 
411   THD_STAGE_INFO(m_thd, stage_waiting_to_finalize_termination);
412   char error_text[MAX_SLAVE_ERRMSG];
413 
414   /*
415     If the dump thread was killed because of a duplicate slave UUID we
416     will fail throwing an error to the slave so it will not try to
417     reconnect anymore.
418   */
419   mysql_mutex_lock(&m_thd->LOCK_thd_data);
420   bool was_killed_by_duplicate_slave_id= m_thd->duplicate_slave_id;
421   mysql_mutex_unlock(&m_thd->LOCK_thd_data);
422   if (was_killed_by_duplicate_slave_id)
423     set_fatal_error("A slave with the same server_uuid/server_id as this slave "
424                     "has connected to the master");
425 
426   if (file > 0)
427   {
428     if (is_fatal_error())
429     {
430       /* output events range to error message */
431       my_snprintf(error_text, sizeof(error_text),
432                   "%s; the first event '%s' at %lld, "
433                   "the last event read from '%s' at %lld, "
434                   "the last byte read from '%s' at %lld.",
435                   m_errmsg,
436                   m_start_file, m_start_pos, m_last_file, m_last_pos,
437                   log_file, my_b_tell(&log_cache));
438       set_fatal_error(error_text);
439     }
440 
441     end_io_cache(&log_cache);
442     mysql_file_close(file, MYF(MY_WME));
443   }
444 
445   cleanup();
446   DBUG_VOID_RETURN;
447 }
448 
send_binlog(IO_CACHE * log_cache,my_off_t start_pos)449 my_off_t Binlog_sender::send_binlog(IO_CACHE *log_cache, my_off_t start_pos)
450 {
451   if (unlikely(send_format_description_event(log_cache, start_pos)))
452     return 1;
453 
454   if (start_pos == BIN_LOG_HEADER_SIZE)
455     start_pos= my_b_tell(log_cache);
456 
457   if (m_check_previous_gtid_event)
458   {
459     bool has_prev_gtid_ev;
460     if (has_previous_gtid_log_event(log_cache, &has_prev_gtid_ev))
461       return 1;
462 
463     if (!has_prev_gtid_ev)
464       return 0;
465   }
466 
467   /*
468     Slave is requesting a position which is in the middle of a file,
469     so seek to the correct position.
470   */
471   if (my_b_tell(log_cache) != start_pos)
472     my_b_seek(log_cache, start_pos);
473 
474   while (!m_thd->killed)
475   {
476     my_off_t end_pos;
477 
478     end_pos= get_binlog_end_pos(log_cache);
479     if (end_pos <= 1)
480       return end_pos;
481 
482     if (send_events(log_cache, end_pos))
483       return 1;
484 
485     m_thd->killed= DBUG_EVALUATE_IF("simulate_kill_dump", THD::KILL_CONNECTION,
486                                     m_thd->killed);
487 
488     DBUG_EXECUTE_IF("wait_after_binlog_EOF",
489                     {
490                       const char act[]= "now wait_for signal.rotate_finished no_clear_event";
491                       assert(!debug_sync_set_action(current_thd,
492                                                     STRING_WITH_LEN(act)));
493                     };);
494   }
495   return 1;
496 }
497 
get_binlog_end_pos(IO_CACHE * log_cache)498 inline my_off_t Binlog_sender::get_binlog_end_pos(IO_CACHE *log_cache)
499 {
500   DBUG_ENTER("Binlog_sender::get_binlog_end_pos()");
501   my_off_t log_pos= my_b_tell(log_cache);
502   my_off_t end_pos= 0;
503 
504   do
505   {
506     mysql_bin_log.lock_binlog_end_pos();
507     end_pos= mysql_bin_log.get_binlog_end_pos();
508     mysql_bin_log.unlock_binlog_end_pos();
509 
510     if (unlikely(!mysql_bin_log.is_active(m_linfo.log_file_name)))
511     {
512       end_pos= my_b_filelength(log_cache);
513       if (log_pos == end_pos)
514         DBUG_RETURN(0); // Arrived the end of inactive file
515       else
516         DBUG_RETURN(end_pos);
517     }
518 
519     DBUG_PRINT("info", ("Reading file %s, seek pos %llu, end_pos is %llu",
520                         m_linfo.log_file_name, log_pos, end_pos));
521     DBUG_PRINT("info", ("Active file is %s", mysql_bin_log.get_log_fname()));
522 
523     if (log_pos < end_pos)
524       DBUG_RETURN(end_pos);
525 
526     /* Some data may be in net buffer, it should be flushed before waiting */
527     if (!m_wait_new_events || flush_net())
528       DBUG_RETURN(1);
529 
530     if (unlikely(wait_new_events(log_pos)))
531       DBUG_RETURN(1);
532   } while (unlikely(!m_thd->killed));
533 
534   DBUG_RETURN(1);
535 }
536 
send_events(IO_CACHE * log_cache,my_off_t end_pos)537 int Binlog_sender::send_events(IO_CACHE *log_cache, my_off_t end_pos)
538 {
539   DBUG_ENTER("Binlog_sender::send_events");
540 
541   THD *thd= m_thd;
542   const char *log_file= m_linfo.log_file_name;
543   my_off_t log_pos= my_b_tell(log_cache);
544   my_off_t exclude_group_end_pos= 0;
545   bool in_exclude_group= false;
546 
547   while (likely(log_pos < end_pos))
548   {
549     uchar* event_ptr;
550     uint32 event_len;
551 
552     if (unlikely(thd->killed))
553         DBUG_RETURN(1);
554 
555     if (unlikely(read_event(log_cache, m_event_checksum_alg,
556                             &event_ptr, &event_len)))
557       DBUG_RETURN(1);
558 
559     Log_event_type event_type= (Log_event_type)event_ptr[EVENT_TYPE_OFFSET];
560     if (unlikely(check_event_type(event_type, log_file, log_pos)))
561       DBUG_RETURN(1);
562 
563     Sender_context_guard ctx_guard(*this, event_type);
564     Observe_transmission_guard obs_guard(m_observe_transmission, event_type,
565                                          const_cast<const char*>(
566                                             reinterpret_cast<char*>(event_ptr)),
567                                          event_len, m_event_checksum_alg,
568                                          m_prev_event_type);
569 
570     DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
571                     {
572                       if (event_type == binary_log::XID_EVENT)
573                       {
574                         thd->get_protocol_classic()->flush_net();
575                         const char act[]=
576                           "now "
577                           "wait_for signal.continue";
578                         assert(opt_debug_sync_timeout > 0);
579                         assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
580                       }
581                     });
582 
583     log_pos= my_b_tell(log_cache);
584 
585     if (before_send_hook(log_file, log_pos))
586       DBUG_RETURN(1);
587     /*
588       TODO: Set m_exclude_gtid to NULL if all gtids in m_exclude_gtid has
589       be skipped. and maybe removing the gtid from m_exclude_gtid will make
590       skip_event has better performance.
591     */
592     if (m_exclude_gtid && (in_exclude_group= skip_event(event_ptr, event_len,
593                                                         in_exclude_group)))
594     {
595       /*
596         If we have not send any event from past 'heartbeat_period' time
597         period, then it is time to send a packet before skipping this group.
598        */
599       DBUG_EXECUTE_IF("inject_2sec_sleep_when_skipping_an_event",
600                       {
601                       my_sleep(2000000);
602                       });
603       time_t now= time(0);
604       assert(now >= m_last_event_sent_ts);
605       bool time_for_hb_event= ((ulonglong)(now - m_last_event_sent_ts)
606                           >= (ulonglong)(m_heartbeat_period/1000000000UL));
607       if (time_for_hb_event)
608       {
609         if (unlikely(send_heartbeat_event(log_pos)))
610           DBUG_RETURN(1);
611         exclude_group_end_pos= 0;
612       }
613       else
614       {
615         exclude_group_end_pos= log_pos;
616       }
617       DBUG_PRINT("info", ("Event of type %s is skipped",
618                           Log_event::get_type_str(event_type)));
619     }
620     else
621     {
622       /*
623         A heartbeat is required before sending a event, If some events are
624         skipped. It notifies the slave to increase master_log_pos for
625         excluded events.
626       */
627       if (exclude_group_end_pos)
628       {
629         /* Save a copy of the buffer content. */
630         String tmp;
631         tmp.copy(m_packet);
632         tmp.length(m_packet.length());
633 
634         if (unlikely(send_heartbeat_event(exclude_group_end_pos)))
635           DBUG_RETURN(1);
636         exclude_group_end_pos= 0;
637 
638         /* Restore the copy back. */
639         m_packet.copy(tmp);
640         m_packet.length(tmp.length());
641       }
642 
643       if (unlikely(send_packet()))
644         DBUG_RETURN(1);
645 
646       DBUG_EXECUTE_IF("dump_thread_wait_after_send_write_rows", {
647         if (event_type == binary_log::WRITE_ROWS_EVENT) {
648           thd->get_protocol_classic()->flush_net();
649           static const char act[] =
650               "now "
651               "wait_for signal.continue";
652           assert(opt_debug_sync_timeout > 0);
653           assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
654         }
655       });
656     }
657 
658     if (unlikely(after_send_hook(log_file, in_exclude_group ? log_pos : 0)))
659       DBUG_RETURN(1);
660   }
661 
662   /*
663     A heartbeat is needed before waiting for more events, if some
664     events are skipped. This is needed so that the slave can increase
665     master_log_pos correctly.
666   */
667   if (unlikely(in_exclude_group))
668   {
669     if (send_heartbeat_event(log_pos))
670       DBUG_RETURN(1);
671   }
672   DBUG_RETURN(0);
673 }
674 
675 
check_event_type(Log_event_type type,const char * log_file,my_off_t log_pos)676 bool Binlog_sender::check_event_type(Log_event_type type,
677                                      const char *log_file, my_off_t log_pos)
678 {
679   if (type == binary_log::ANONYMOUS_GTID_LOG_EVENT)
680   {
681     /*
682       Normally, there will not be any anonymous events when
683       auto_position is enabled, since both the master and the slave
684       refuse to connect if the master is not using GTID_MODE=ON.
685       However, if the master changes GTID_MODE after the connection
686       was initialized, or if the slave requests to replicate
687       transactions that appear before the last anonymous event, then
688       this can happen. Then we generate this error to prevent sending
689       anonymous transactions to the slave.
690     */
691     if (m_using_gtid_protocol)
692     {
693       DBUG_EXECUTE_IF("skip_sender_anon_autoposition_error",
694                       {
695                         return false;
696                       };);
697       char buf[MYSQL_ERRMSG_SIZE];
698       sprintf(buf, ER(ER_CANT_REPLICATE_ANONYMOUS_WITH_AUTO_POSITION),
699               log_file, log_pos);
700       set_fatal_error(buf);
701       return true;
702     }
703     /*
704       Normally, there will not be any anonymous events when master has
705       GTID_MODE=ON, since anonymous events are not generated when
706       GTID_MODE=ON.  However, this can happen if the master changes
707       GTID_MODE to ON when the slave has not yet replicated all
708       anonymous transactions.
709     */
710     else if (get_gtid_mode(GTID_MODE_LOCK_NONE) == GTID_MODE_ON)
711     {
712       char buf[MYSQL_ERRMSG_SIZE];
713       sprintf(buf, ER(ER_CANT_REPLICATE_ANONYMOUS_WITH_GTID_MODE_ON),
714               log_file, log_pos);
715       set_fatal_error(buf);
716       return true;
717     }
718   }
719   else if (type == binary_log::GTID_LOG_EVENT)
720   {
721     /*
722       Normally, there will not be any GTID events when master has
723       GTID_MODE=OFF, since GTID events are not generated when
724       GTID_MODE=OFF.  However, this can happen if the master changes
725       GTID_MODE to OFF when the slave has not yet replicated all GTID
726       transactions.
727     */
728     if (get_gtid_mode(GTID_MODE_LOCK_NONE) == GTID_MODE_OFF)
729     {
730       char buf[MYSQL_ERRMSG_SIZE];
731       sprintf(buf, ER(ER_CANT_REPLICATE_GTID_WITH_GTID_MODE_OFF),
732               log_file, log_pos);
733       set_fatal_error(buf);
734       return true;
735     }
736   }
737   return false;
738 }
739 
740 
skip_event(const uchar * event_ptr,uint32 event_len,bool in_exclude_group)741 inline bool Binlog_sender::skip_event(const uchar *event_ptr, uint32 event_len,
742                                       bool in_exclude_group)
743 {
744   DBUG_ENTER("Binlog_sender::skip_event");
745 
746   uint8 event_type= (Log_event_type) event_ptr[LOG_EVENT_OFFSET];
747   switch (event_type)
748   {
749   case binary_log::GTID_LOG_EVENT:
750     {
751       Format_description_log_event fd_ev(BINLOG_VERSION);
752       fd_ev.common_footer->checksum_alg= m_event_checksum_alg;
753       Gtid_log_event gtid_ev((const char *)event_ptr, event_checksum_on() ?
754                              event_len - BINLOG_CHECKSUM_LEN : event_len,
755                              &fd_ev);
756       Gtid gtid;
757       gtid.sidno= gtid_ev.get_sidno(m_exclude_gtid->get_sid_map());
758       gtid.gno= gtid_ev.get_gno();
759       DBUG_RETURN(m_exclude_gtid->contains_gtid(gtid));
760     }
761   case binary_log::ROTATE_EVENT:
762     DBUG_RETURN(false);
763   }
764   DBUG_RETURN(in_exclude_group);
765 }
766 
wait_new_events(my_off_t log_pos)767 int Binlog_sender::wait_new_events(my_off_t log_pos)
768 {
769   int ret= 0;
770   PSI_stage_info old_stage;
771 
772   mysql_bin_log.lock_binlog_end_pos();
773   m_thd->ENTER_COND(mysql_bin_log.get_log_cond(),
774                     mysql_bin_log.get_binlog_end_pos_lock(),
775                     &stage_master_has_sent_all_binlog_to_slave,
776                     &old_stage);
777 
778   if (mysql_bin_log.get_binlog_end_pos() <= log_pos &&
779       mysql_bin_log.is_active(m_linfo.log_file_name))
780   {
781     if (m_heartbeat_period)
782       ret= wait_with_heartbeat(log_pos);
783     else
784       ret= wait_without_heartbeat();
785   }
786 
787   mysql_bin_log.unlock_binlog_end_pos();
788   m_thd->EXIT_COND(&old_stage);
789   return ret;
790 }
791 
wait_with_heartbeat(my_off_t log_pos)792 inline int Binlog_sender::wait_with_heartbeat(my_off_t log_pos)
793 {
794 #ifndef NDEBUG
795   ulong hb_info_counter= 0;
796 #endif
797   struct timespec ts;
798   int ret;
799 
800   do
801   {
802     set_timespec_nsec(&ts, m_heartbeat_period);
803     ret= mysql_bin_log.wait_for_update_bin_log(m_thd, &ts);
804     if (ret != ETIMEDOUT && ret != ETIME)
805       break;
806 
807 #ifndef NDEBUG
808       if (hb_info_counter < 3)
809       {
810         sql_print_information("master sends heartbeat message");
811         hb_info_counter++;
812         if (hb_info_counter == 3)
813           sql_print_information("the rest of heartbeat info skipped ...");
814       }
815 #endif
816       if (send_heartbeat_event(log_pos))
817         return 1;
818   } while (!m_thd->killed);
819 
820   return ret ? 1 : 0;
821 }
822 
wait_without_heartbeat()823 inline int Binlog_sender::wait_without_heartbeat()
824 {
825   return mysql_bin_log.wait_for_update_bin_log(m_thd, NULL);
826 }
827 
init_heartbeat_period()828 void Binlog_sender::init_heartbeat_period()
829 {
830   my_bool null_value;
831   LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
832 
833   /* Protects m_thd->user_vars. */
834   mysql_mutex_lock(&m_thd->LOCK_thd_data);
835 
836   user_var_entry *entry=
837     (user_var_entry*) my_hash_search(&m_thd->user_vars, (uchar*) name.str,
838                                      name.length);
839   m_heartbeat_period= entry ? entry->val_int(&null_value) : 0;
840 
841   mysql_mutex_unlock(&m_thd->LOCK_thd_data);
842 }
843 
/**
  Validate the binary log file and position the dump is asked to start from.

  The start file is taken from m_start_file when it is set. Otherwise, when
  the GTID protocol is used, the slave's GTID set (m_exclude_gtid) is checked
  against the master's GTID state and the start file is looked up with
  find_first_log_not_in_gtid_set(). If neither applies, name_ptr stays NULL
  and find_log_pos() starts from the first file in the index. The index entry
  found is stored in m_linfo.

  Finally m_start_pos is checked to be within
  [BIN_LOG_HEADER_SIZE, size of the start file].

  @return 0 on success; 1 on failure, in which case a fatal error has been
          recorded with set_fatal_error().
*/
int Binlog_sender::check_start_file()
{
  char index_entry_name[FN_REFLEN];
  char *name_ptr= NULL;
  File file;
  IO_CACHE cache;
  std::string errmsg;
  my_off_t size;

  if (m_start_file[0] != '\0')
  {
    mysql_bin_log.make_log_name(index_entry_name, m_start_file);
    name_ptr= index_entry_name;
  }
  else if (m_using_gtid_protocol)
  {
    /*
      In normal scenarios, it is not possible for the Slave to contain
      more GTIDs than the Master with respect to the Master's UUID.
      It can happen, however, if the Master's binary log was truncated
      (e.g. due to a RAID failure) or if the Master's binary log was
      deleted but GTID_PURGED was not set properly. That scenario
      needs to be validated, i.e., it should *always* be the case that
      the Slave's gtid executed set (+ retrieved set) is a subset of the
      Master's gtid executed set with respect to the Master's UUID.
      If it is not, the dump thread is stopped during the handshake
      with the Slave (thus the Slave's I/O thread is stopped with the
      error). Otherwise, it can lead to data inconsistency between Master
      and Slave.
    */
    Sid_map* slave_sid_map= m_exclude_gtid->get_sid_map();
    assert(slave_sid_map);
    /* Every path below releases global_sid_lock before returning. */
    global_sid_lock->wrlock();
    const rpl_sid &server_sid= gtid_state->get_server_sid();
    rpl_sidno subset_sidno= slave_sid_map->sid_to_sidno(server_sid);
    Gtid_set
      gtid_executed_and_owned(gtid_state->get_executed_gtids()->get_sid_map());

    // gtids = executed_gtids plus owned_gtids (both are added to the set)
    if (gtid_executed_and_owned.add_gtid_set(gtid_state->get_executed_gtids())
        != RETURN_STATUS_OK)
    {
      assert(0);
    }
    gtid_state->get_owned_gtids()->get_gtids(gtid_executed_and_owned);

    if (!m_exclude_gtid->is_subset_for_sid(&gtid_executed_and_owned,
                                           gtid_state->get_server_sidno(),
                                           subset_sidno))
    {
      global_sid_lock->unlock();
      set_fatal_error(ER(ER_SLAVE_HAS_MORE_GTIDS_THAN_MASTER));
      return 1;
    }
    /*
      Setting GTID_PURGED (when the GTID_EXECUTED set is empty, i.e., when
      previous_gtids are also empty) makes the binlog rotate. That
      leaves the first binary log with empty previous_gtids and the second
      binary log's previous_gtids with the value of gtid_purged.
      In find_first_log_not_in_gtid_set(), while we search for a binary
      log whose previous_gtid_set is a subset of slave_gtid_executed,
      in this particular case the server will always find the first binary
      log with empty previous_gtids, which is a subset of any given
      slave_gtid_executed. Thus the Master thinks that it found the first
      binary log, which is actually not correct, and is unable to catch
      this error situation. Hence the extra condition below.
      The Slave should know about the Master's purged GTIDs.
      If the Slave's GTID executed + retrieved set does not contain the
      Master's complete purged GTID list, the Slave is requesting (expecting)
      GTIDs which were purged by the Master. We should let the Slave know
      about the situation, i.e., throw an error if the Slave's GTID executed
      set is not a superset of the Master's purged GTID set.
      In the other case, where the user deleted binary logs manually
      (without using the 'PURGE BINARY LOGS' command) but gtid_purged
      was not set by the user, the following condition cannot catch it.
      But that is not a problem because in find_first_log_not_in_gtid_set(),
      while checking for a binary log with a subset previous_gtids, the logic
      will not find one and the error ER_MASTER_HAS_PURGED_REQUIRED_GTIDS
      is thrown from there.
    */
    if (!gtid_state->get_lost_gtids()->is_subset(m_exclude_gtid))
    {
      mysql_bin_log.report_missing_purged_gtids(m_exclude_gtid, errmsg);
      global_sid_lock->unlock();
      set_fatal_error(errmsg.c_str());
      return 1;
    }
    global_sid_lock->unlock();
    Gtid first_gtid= {0, 0};
    if (mysql_bin_log.find_first_log_not_in_gtid_set(index_entry_name,
                                                     m_exclude_gtid,
                                                     &first_gtid,
                                                     errmsg))
    {
      set_fatal_error(errmsg.c_str());
      return 1;
    }
    name_ptr= index_entry_name;
    /*
      find_first_log_not_in_gtid_set() guarantees that the file it found has
      a Previous_gtids_log_event, as do all following binlogs. So the
      variable is set to false, which tells not to check for the event again
      when starting to dump binlogs.
    */
    m_check_previous_gtid_event= false;
    /*
      If we are skipping at least the first transaction of the binlog,
      we must clear the "created" field of the FD event (set it to 0)
      to avoid cleaning up temp tables on slave.
    */
    m_gtid_clear_fd_created_flag= (first_gtid.sidno >= 1 &&
                                   first_gtid.gno >= 1 &&
                                   m_exclude_gtid->contains_gtid(first_gtid));
  }

  /*
    Index entry name is saved into m_linfo. If name_ptr is NULL,
    then starts from the first file in index file.
  */

  if (mysql_bin_log.find_log_pos(&m_linfo, name_ptr, true))
  {
    set_fatal_error("Could not find first log file name in binary log "
                    "index file");
    return 1;
  }

  if (m_start_pos < BIN_LOG_HEADER_SIZE)
  {
    set_fatal_error("Client requested master to start replication "
                    "from position < 4");
    return 1;
  }

  /* Open the start file only to learn its size; it is closed right away. */
  const char *bl_errmsg= NULL;
  if ((file= open_binlog_file(&cache, m_linfo.log_file_name, &bl_errmsg)) < 0)
  {
    set_fatal_error(bl_errmsg);
    return 1;
  }

  size= my_b_filelength(&cache);
  end_io_cache(&cache);
  mysql_file_close(file, MYF(MY_WME));

  if (m_start_pos > size)
  {
    set_fatal_error("Client requested master to start replication from "
                    "position > file size");
    return 1;
  }
  return 0;
}
997 
998 extern TYPELIB binlog_checksum_typelib;
999 
init_checksum_alg()1000 void Binlog_sender::init_checksum_alg()
1001 {
1002   DBUG_ENTER("init_binlog_checksum");
1003 
1004   LEX_STRING name= {C_STRING_WITH_LEN("master_binlog_checksum")};
1005   user_var_entry *entry;
1006 
1007   m_slave_checksum_alg= binary_log::BINLOG_CHECKSUM_ALG_UNDEF;
1008 
1009   /* Protects m_thd->user_vars. */
1010   mysql_mutex_lock(&m_thd->LOCK_thd_data);
1011 
1012   entry= (user_var_entry*) my_hash_search(&m_thd->user_vars,
1013                                           (uchar*) name.str, name.length);
1014   if (entry)
1015   {
1016     m_slave_checksum_alg=
1017       static_cast<enum_binlog_checksum_alg>(find_type((char*) entry->ptr(), &binlog_checksum_typelib, 1) - 1);
1018     assert(m_slave_checksum_alg < binary_log::BINLOG_CHECKSUM_ALG_ENUM_END);
1019   }
1020 
1021   mysql_mutex_unlock(&m_thd->LOCK_thd_data);
1022 
1023   /*
1024     m_event_checksum_alg should be set to the checksum algorithm in
1025     Format_description_log_event. But it is used by fake_rotate_event() which
1026     will be called before reading any Format_description_log_event. In that case,
1027     m_slave_checksum_alg is set as the value of m_event_checksum_alg.
1028   */
1029   m_event_checksum_alg= m_slave_checksum_alg;
1030   DBUG_VOID_RETURN;
1031 }
1032 
fake_rotate_event(const char * next_log_file,my_off_t log_pos)1033 int Binlog_sender::fake_rotate_event(const char *next_log_file,
1034                                      my_off_t log_pos)
1035 {
1036   DBUG_ENTER("fake_rotate_event");
1037   const char* p = next_log_file + dirname_length(next_log_file);
1038   size_t ident_len = strlen(p);
1039   size_t event_len = ident_len + LOG_EVENT_HEADER_LEN + Binary_log_event::ROTATE_HEADER_LEN +
1040     (event_checksum_on() ? BINLOG_CHECKSUM_LEN : 0);
1041 
1042   /* reset transmit packet for the fake rotate event below */
1043   if (reset_transmit_packet(0, event_len))
1044     DBUG_RETURN(1);
1045 
1046   size_t event_offset= m_packet.length();
1047   m_packet.length(event_len + event_offset);
1048   uchar *header= (uchar *)m_packet.ptr() + event_offset;
1049   uchar *rotate_header= header + LOG_EVENT_HEADER_LEN;
1050   /*
1051     'when' (the timestamp) is set to 0 so that slave could distinguish between
1052     real and fake Rotate events (if necessary)
1053   */
1054   int4store(header, 0);
1055   header[EVENT_TYPE_OFFSET] = binary_log::ROTATE_EVENT;
1056   int4store(header + SERVER_ID_OFFSET, server_id);
1057   int4store(header + EVENT_LEN_OFFSET, static_cast<uint32>(event_len));
1058   int4store(header + LOG_POS_OFFSET, 0);
1059   int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
1060 
1061   int8store(rotate_header, log_pos);
1062   memcpy(rotate_header + Binary_log_event::ROTATE_HEADER_LEN, p, ident_len);
1063 
1064   if (event_checksum_on())
1065     calc_event_checksum(header, event_len);
1066 
1067   DBUG_RETURN(send_packet());
1068 }
1069 
calc_event_checksum(uchar * event_ptr,size_t event_len)1070 inline void Binlog_sender::calc_event_checksum(uchar *event_ptr, size_t event_len)
1071 {
1072   ha_checksum crc= checksum_crc32(0L, NULL, 0);
1073   crc= checksum_crc32(crc, event_ptr, event_len - BINLOG_CHECKSUM_LEN);
1074   int4store(event_ptr + event_len - BINLOG_CHECKSUM_LEN, crc);
1075 }
1076 
reset_transmit_packet(ushort flags,size_t event_len)1077 inline int Binlog_sender::reset_transmit_packet(ushort flags, size_t event_len)
1078 {
1079   DBUG_ENTER("Binlog_sender::reset_transmit_packet");
1080   DBUG_PRINT("info", ("event_len: %zu, m_packet->alloced_length: %zu",
1081                       event_len, m_packet.alloced_length()));
1082   assert(m_packet.alloced_length() >= PACKET_MIN_SIZE);
1083 
1084   m_packet.length(0);  // size of the content
1085   m_packet.qs_append('\0'); // Set this as an OK packet
1086 
1087   /* reserve and set default header */
1088   if (m_observe_transmission &&
1089       RUN_HOOK(binlog_transmit, reserve_header, (m_thd, flags, &m_packet)))
1090   {
1091     set_unknow_error("Failed to run hook 'reserve_header'");
1092     DBUG_RETURN(1);
1093   }
1094 
1095   /* Resizes the buffer if needed. */
1096   if (grow_packet(event_len))
1097     DBUG_RETURN(1);
1098 
1099   DBUG_PRINT("info", ("m_packet.alloced_length: %zu (after potential "
1100                       "reallocation)", m_packet.alloced_length()));
1101 
1102   DBUG_RETURN(0);
1103 }
1104 
/**
  Read the Format_description_log_event at the current position of
  log_cache, adjust it where needed and send it to the slave.

  The event's checksum algorithm becomes m_event_checksum_alg and a
  Format_description_log_event built from the event replaces m_fdle. If the
  event that follows is a Start_encryption_log_event, it is read as well
  (but not sent) and used to start decryption on m_fdle.

  @param[in] log_cache  IO_CACHE the events are read from.
  @param[in] start_pos  Position the dump starts from; decides whether
                        log_pos/created are cleared in the event (non-GTID
                        protocol) and whether a heartbeat is sent after a
                        Start_encryption_log_event.

  @return 0 on success, 1 on error.
*/
int Binlog_sender::send_format_description_event(IO_CACHE *log_cache,
                                                 my_off_t start_pos)
{
  DBUG_ENTER("Binlog_sender::send_format_description_event");
  uchar* event_ptr;
  uint32 event_len;

  /* Start from a default description event; it is replaced further down. */
  m_fdle.reset(new Format_description_log_event(4));
  if (m_fdle == NULL)
  {
    set_fatal_error("Out-of-memory");
    DBUG_RETURN(1);
  }

  if (read_event(log_cache, binary_log::BINLOG_CHECKSUM_ALG_OFF, &event_ptr,
                 &event_len))
    DBUG_RETURN(1);

  DBUG_PRINT("info",
             ("Looked for a Format_description_log_event, found event type %s",
              Log_event::get_type_str((Log_event_type) event_ptr[EVENT_TYPE_OFFSET])));

  if (event_ptr[EVENT_TYPE_OFFSET] != binary_log::FORMAT_DESCRIPTION_EVENT)
  {
    set_fatal_error("Could not find format_description_event in binlog file");
    DBUG_RETURN(1);
  }

  /* Note: this only inspects the first byte of the log_pos field. */
  assert(event_ptr[LOG_POS_OFFSET] > 0);
  m_event_checksum_alg=
    Log_event_footer::get_checksum_alg((const char *)event_ptr, event_len);

  assert(m_event_checksum_alg < binary_log::BINLOG_CHECKSUM_ALG_ENUM_END ||
         m_event_checksum_alg == binary_log::BINLOG_CHECKSUM_ALG_UNDEF);

  /* Slave does not support checksum, but binary events include checksum */
  if (m_slave_checksum_alg == binary_log::BINLOG_CHECKSUM_ALG_UNDEF &&
      event_checksum_on())
  {
    set_fatal_error("Slave can not handle replication events with the "
                    "checksum that master is configured to log");

    sql_print_warning("Master is configured to log replication events "
                      "with checksum, but will not send such events to "
                      "slaves that cannot process them");
    DBUG_RETURN(1);
  }

  /* Clear the "binlog in use" flag in the copy that is sent. */
  event_ptr[FLAGS_OFFSET] &= ~LOG_EVENT_BINLOG_IN_USE_F;

  /* Set when log_pos/created are rewritten, so the checksum is redone. */
  bool event_updated= false;
  if (m_using_gtid_protocol)
  {
    if (m_gtid_clear_fd_created_flag)
    {
      /*
        As we are skipping at least the first transaction of the binlog,
        we must clear the "created" field of the FD event (set it to 0)
        to avoid destroying temp tables on slave.
      */
      int4store(event_ptr + LOG_EVENT_MINIMAL_HEADER_LEN + ST_CREATED_OFFSET,
                0);
      event_updated= true;
    }
  }
  else if (start_pos > BIN_LOG_HEADER_SIZE)
  {
    /*
      If we are skipping the beginning of the binlog file based on the position
      asked by the slave, we must clear the log_pos and the created flag of the
      Format_description_log_event to be sent. Mark that this event with
      "log_pos=0", so the slave should not increment master's binlog position
      (rli->group_master_log_pos)
    */
    int4store(event_ptr + LOG_POS_OFFSET, 0);
    /*
      Set the 'created' field to 0 to avoid destroying
      temp tables on slave.
    */
    int4store(event_ptr + LOG_EVENT_MINIMAL_HEADER_LEN + ST_CREATED_OFFSET, 0);
    event_updated= true;
  }

  /* fix the checksum due to latest changes in header */
  if (event_checksum_on() && event_updated)
    calc_event_checksum(event_ptr, event_len);

  /*
    The checksum bytes are left out of the length handed to the
    Format_description_log_event constructor below.
  */
  if (m_event_checksum_alg != binary_log::BINLOG_CHECKSUM_ALG_UNDEF &&
      m_event_checksum_alg != binary_log::BINLOG_CHECKSUM_ALG_OFF)
    event_len-= BINLOG_CHECKSUM_LEN;

  Format_description_log_event *new_fdle= NULL;

  new_fdle= new Format_description_log_event(reinterpret_cast<char*>(event_ptr), event_len, m_fdle.get());

  if (new_fdle == NULL)
  {
    set_fatal_error("Out-of-memory");
    DBUG_RETURN(1);
  }
  m_fdle.reset(new_fdle);

  if (send_packet())
    DBUG_RETURN(1);

  /* Remember where the next event starts so the peek below can be undone. */
  my_off_t binlog_pos_after_fdle= my_b_tell(log_cache);

  char header_buffer[LOG_EVENT_MINIMAL_HEADER_LEN];
  // Let's check if next event is Start encryption event
  // If we go outside the file peek_event_header will also return an error
  if (Log_event::peek_event_header(header_buffer, log_cache))
  {
    my_b_seek(log_cache, binlog_pos_after_fdle);
    DBUG_RETURN(0);
  }

  // peek_event_header actually moves the log_cache->read_pos, thus we need to rewind
  my_b_seek(log_cache, binlog_pos_after_fdle);

  if (static_cast<uchar>(header_buffer[EVENT_TYPE_OFFSET]) == binary_log::START_ENCRYPTION_EVENT)
  {
    event_ptr= NULL;
    my_off_t log_pos= my_b_tell(log_cache);

    if (read_event(log_cache, m_event_checksum_alg, &event_ptr,
                   &event_len))
      DBUG_RETURN(1);

    /* As above: the checksum bytes are not passed to the constructor. */
    if (m_event_checksum_alg != binary_log::BINLOG_CHECKSUM_ALG_UNDEF &&
        m_event_checksum_alg != binary_log::BINLOG_CHECKSUM_ALG_OFF)
      event_len-= BINLOG_CHECKSUM_LEN;

    assert(event_ptr[EVENT_TYPE_OFFSET] == binary_log::START_ENCRYPTION_EVENT);
    Start_encryption_log_event sele(reinterpret_cast<char*>(event_ptr), event_len, m_fdle.get());

    if (!sele.is_valid())
    {
      set_fatal_error("Start encryption log event is invalid");
      DBUG_RETURN(1);
    }

    if (m_fdle->start_decryption(&sele))
    {
      set_fatal_error("Could not decrypt binlog: encryption key error");
      DBUG_RETURN(1);
    }

    if (start_pos <= BIN_LOG_HEADER_SIZE)
    {
      log_pos= my_b_tell(log_cache);
      // We have read start encryption event from master binlog, but we have
      // not sent it to slave. We need to inform slave that master position
      // has advanced.
      if (unlikely(send_heartbeat_event(log_pos)))
         DBUG_RETURN(1);
    }
  }
  DBUG_RETURN(0);
}
1264 
has_previous_gtid_log_event(IO_CACHE * log_cache,bool * found)1265 int Binlog_sender::has_previous_gtid_log_event(IO_CACHE *log_cache,
1266                                                bool *found)
1267 {
1268   uchar buf[LOG_EVENT_HEADER_LEN];
1269   *found= false;
1270 
1271   /* It is possible there is only format_description_log_event in the file. */
1272   if (my_b_tell(log_cache) < my_b_filelength(log_cache))
1273   {
1274     if (my_b_read(log_cache, buf, LOG_EVENT_HEADER_LEN) != 0)
1275     {
1276       set_fatal_error(log_read_error_msg(LOG_READ_IO));
1277       return 1;
1278     }
1279     *found= (buf[EVENT_TYPE_OFFSET] == binary_log::PREVIOUS_GTIDS_LOG_EVENT);
1280   }
1281   return 0;
1282 }
1283 
log_read_error_msg(int error)1284 const char* Binlog_sender::log_read_error_msg(int error)
1285 {
1286   switch (error) {
1287   case LOG_READ_BOGUS:
1288     return "bogus data in log event";
1289   case LOG_READ_TOO_LARGE:
1290     return "log event entry exceeded max_allowed_packet; Increase "
1291       "max_allowed_packet on master";
1292   case LOG_READ_IO:
1293     return "I/O error reading log event";
1294   case LOG_READ_MEM:
1295     return "memory allocation failed reading log event";
1296   case LOG_READ_TRUNC:
1297     return "binlog truncated in the middle of event; consider out of disk space on master";
1298   case LOG_READ_CHECKSUM_FAILURE:
1299     return "event read from binlog did not pass crc check";
1300   case LOG_READ_DECRYPT:
1301     return "Event decryption failure";
1302   default:
1303     return "unknown error reading log event on the master";
1304   }
1305 }
1306 
read_event(IO_CACHE * log_cache,enum_binlog_checksum_alg checksum_alg,uchar ** event_ptr,uint32 * event_len)1307 inline int Binlog_sender::read_event(IO_CACHE *log_cache, enum_binlog_checksum_alg checksum_alg,
1308                                      uchar **event_ptr, uint32 *event_len)
1309 {
1310   DBUG_ENTER("Binlog_sender::read_event");
1311 
1312   size_t event_offset;
1313   char header[LOG_EVENT_MINIMAL_HEADER_LEN];
1314   int error= 0;
1315 #ifndef NDEBUG
1316   const char *packet_buffer= NULL;
1317 #endif
1318 
1319   if ((error= Log_event::peek_event_length(event_len, log_cache, header)))
1320     goto read_error;
1321 
1322   if (reset_transmit_packet(0, *event_len))
1323     DBUG_RETURN(1);
1324 
1325   event_offset= m_packet.length();
1326 #ifndef NDEBUG
1327   packet_buffer= m_packet.ptr();
1328 #endif
1329 
1330   DBUG_EXECUTE_IF("dump_thread_before_read_event",
1331                   {
1332                     const char act[]= "now wait_for signal.continue no_clear_event";
1333                     assert(!debug_sync_set_action(current_thd,
1334                                                   STRING_WITH_LEN(act)));
1335                   };);
1336 
1337   /*
1338     packet is big enough to read the event, since we have reallocated based
1339     on the length stated in the event header.
1340   */
1341   if ((error= Log_event::read_log_event(log_cache, &m_packet, m_fdle.get(), NULL, checksum_alg,
1342                                         NULL, NULL, header)))
1343     goto read_error;
1344 
1345   set_last_pos(my_b_tell(log_cache));
1346 
1347   /*
1348     As we pre-allocate the buffer to store the event at reset_transmit_packet,
1349     the buffer should not be changed while calling read_log_event (unless binlog
1350     encryption is on), even knowing that it might call functions to replace the
1351     buffer by one with the size to fit the event. When encryption is on - the buffer
1352     will be replaced with memory allocated for storing decrypted data.
1353   */
1354   assert(encrypt_binlog || packet_buffer == m_packet.ptr());
1355   *event_ptr= (uchar *)m_packet.ptr() + event_offset;
1356 
1357   DBUG_PRINT("info",
1358              ("Read event %s",
1359               Log_event::get_type_str(Log_event_type
1360                                       ((*event_ptr)[EVENT_TYPE_OFFSET]))));
1361 #ifndef NDEBUG
1362   if (check_event_count())
1363     DBUG_RETURN(1);
1364 #endif
1365   DBUG_RETURN(0);
1366 read_error:
1367   /*
1368     In theory, it should never happen. But RESET MASTER deletes binlog file
1369     directly without checking if there is any dump thread working.
1370   */
1371   error= (error == LOG_READ_EOF) ? LOG_READ_IO : error;
1372   set_fatal_error(log_read_error_msg(error));
1373   DBUG_RETURN(1);
1374 }
1375 
send_heartbeat_event(my_off_t log_pos)1376 int Binlog_sender::send_heartbeat_event(my_off_t log_pos)
1377 {
1378   DBUG_ENTER("send_heartbeat_event");
1379   const char* filename= m_linfo.log_file_name;
1380   const char* p= filename + dirname_length(filename);
1381   size_t ident_len= strlen(p);
1382   size_t event_len= ident_len + LOG_EVENT_HEADER_LEN +
1383     (event_checksum_on() ? BINLOG_CHECKSUM_LEN : 0);
1384 
1385   DBUG_PRINT("info", ("log_file_name %s, log_pos %llu", p, log_pos));
1386 
1387   if (reset_transmit_packet(0, event_len))
1388     DBUG_RETURN(1);
1389 
1390   size_t event_offset= m_packet.length();
1391   m_packet.length(event_len + event_offset);
1392   uchar *header= (uchar *)m_packet.ptr() + event_offset;
1393 
1394   /* Timestamp field */
1395   int4store(header, 0);
1396   header[EVENT_TYPE_OFFSET] = binary_log::HEARTBEAT_LOG_EVENT;
1397   int4store(header + SERVER_ID_OFFSET, server_id);
1398   int4store(header + EVENT_LEN_OFFSET, event_len);
1399   int4store(header + LOG_POS_OFFSET, static_cast<uint32>(log_pos));
1400   int2store(header + FLAGS_OFFSET, 0);
1401   memcpy(header + LOG_EVENT_HEADER_LEN, p, ident_len);
1402 
1403   if (event_checksum_on())
1404     calc_event_checksum(header, event_len);
1405 
1406   DBUG_RETURN(send_packet_and_flush());
1407 }
1408 
flush_net()1409 inline int Binlog_sender::flush_net()
1410 {
1411   if (DBUG_EVALUATE_IF("simulate_flush_error", 1,
1412       m_thd->get_protocol_classic()->flush_net()))
1413   {
1414     set_unknow_error("failed on flush_net()");
1415     return 1;
1416   }
1417   return 0;
1418 }
1419 
send_packet()1420 inline int Binlog_sender::send_packet()
1421 {
1422   DBUG_ENTER("Binlog_sender::send_packet");
1423   DBUG_PRINT("info",
1424              ("Sending event of type %s", Log_event::get_type_str(
1425                 (Log_event_type)m_packet.ptr()[1 + EVENT_TYPE_OFFSET])));
1426   // We should always use the same buffer to guarantee that the reallocation
1427   // logic is not broken.
1428   if (DBUG_EVALUATE_IF("simulate_send_error", true,
1429                        my_net_write(
1430                          m_thd->get_protocol_classic()->get_net(),
1431                          (uchar*) m_packet.ptr(), m_packet.length())))
1432   {
1433     set_unknow_error("Failed on my_net_write()");
1434     DBUG_RETURN(1);
1435   }
1436 
1437   /* Shrink the packet if needed. */
1438   int ret= shrink_packet() ? 1 : 0;
1439   m_last_event_sent_ts= time(0);
1440   DBUG_RETURN(ret);
1441 }
1442 
send_packet_and_flush()1443 inline int Binlog_sender::send_packet_and_flush()
1444 {
1445   return (send_packet() || flush_net());
1446 }
1447 
before_send_hook(const char * log_file,my_off_t log_pos)1448 inline int Binlog_sender::before_send_hook(const char *log_file,
1449                                            my_off_t log_pos)
1450 {
1451   if (m_observe_transmission &&
1452       RUN_HOOK(binlog_transmit, before_send_event,
1453                (m_thd, m_flag, &m_packet, log_file, log_pos)))
1454   {
1455     set_unknow_error("run 'before_send_event' hook failed");
1456     return 1;
1457   }
1458   return 0;
1459 }
1460 
after_send_hook(const char * log_file,my_off_t log_pos)1461 inline int Binlog_sender::after_send_hook(const char *log_file,
1462                                           my_off_t log_pos)
1463 {
1464   if (m_observe_transmission &&
1465       RUN_HOOK(binlog_transmit, after_send_event,
1466                (m_thd, m_flag, &m_packet, log_file, log_pos)))
1467   {
1468     set_unknow_error("Failed to run hook 'after_send_event'");
1469     return 1;
1470   }
1471 
1472   /*
1473     semisync after_send_event hook doesn't return and error when net error
1474     happens.
1475   */
1476   if (m_thd->get_protocol_classic()->get_net()->last_errno != 0)
1477   {
1478     set_unknow_error("Found net error");
1479     return 1;
1480   }
1481   return 0;
1482 }
1483 
1484 #ifndef NDEBUG
1485 extern int max_binlog_dump_events;
1486 
check_event_count()1487 inline int Binlog_sender::check_event_count()
1488 {
1489   if (max_binlog_dump_events != 0 &&
1490       (++m_event_count > max_binlog_dump_events))
1491   {
1492     set_unknow_error("Debugging binlog dump abort");
1493     return 1;
1494   }
1495   return 0;
1496 }
1497 #endif
1498 
1499 
grow_packet(size_t extra_size)1500 inline bool Binlog_sender::grow_packet(size_t extra_size)
1501 {
1502   DBUG_ENTER("Binlog_sender::grow_packet");
1503   size_t cur_buffer_size= m_packet.alloced_length();
1504   size_t cur_buffer_used= m_packet.length();
1505   size_t needed_buffer_size= cur_buffer_used + extra_size;
1506 
1507   if (extra_size > (PACKET_MAX_SIZE - cur_buffer_used))
1508     /*
1509        Not enough memory: requesting packet to be bigger than the max
1510        allowed - PACKET_MAX_SIZE.
1511     */
1512     DBUG_RETURN(true);
1513 
1514   /* Grow the buffer if needed. */
1515   if (needed_buffer_size > cur_buffer_size)
1516   {
1517     size_t new_buffer_size;
1518     new_buffer_size= calc_grow_buffer_size(cur_buffer_size, needed_buffer_size);
1519 
1520     if (!new_buffer_size)
1521       DBUG_RETURN(true);
1522 
1523     if (m_packet.mem_realloc(new_buffer_size))
1524       DBUG_RETURN(true);
1525 
1526     /*
1527      Calculates the new, smaller buffer, size to use the next time
1528      one wants to shrink the buffer.
1529     */
1530     calc_shrink_buffer_size(new_buffer_size);
1531   }
1532 
1533   DBUG_RETURN(false);
1534 }
1535 
shrink_packet()1536 inline bool Binlog_sender::shrink_packet()
1537 {
1538   DBUG_ENTER("Binlog_sender::shrink_packet");
1539   bool res= false;
1540   size_t cur_buffer_size= m_packet.alloced_length();
1541   size_t buffer_used= m_packet.length();
1542 
1543   assert(!(cur_buffer_size < PACKET_MIN_SIZE));
1544 
1545   /*
1546      If the packet is already at the minimum size, just
1547      do nothing. Otherwise, check if we should shrink.
1548    */
1549   if (cur_buffer_size > PACKET_MIN_SIZE)
1550   {
1551     /* increment the counter if we used less than the new shrink size. */
1552     if (buffer_used < m_new_shrink_size)
1553     {
1554       m_half_buffer_size_req_counter++;
1555 
1556       /* Check if we should shrink the buffer. */
1557       if (m_half_buffer_size_req_counter == PACKET_SHRINK_COUNTER_THRESHOLD)
1558       {
1559         /*
1560          The last PACKET_SHRINK_COUNTER_THRESHOLD consecutive packets
1561          required less than half of the current buffer size. Lets shrink
1562          it to not hold more memory than we potentially need.
1563         */
1564         m_packet.shrink(m_new_shrink_size);
1565 
1566         /*
1567            Calculates the new, smaller buffer, size to use the next time
1568            one wants to shrink the buffer.
1569          */
1570         calc_shrink_buffer_size(m_new_shrink_size);
1571 
1572         /* Reset the counter. */
1573         m_half_buffer_size_req_counter= 0;
1574       }
1575     }
1576     else
1577       m_half_buffer_size_req_counter= 0;
1578   }
1579 #ifndef NDEBUG
1580   if (res == false)
1581   {
1582     assert(m_new_shrink_size <= cur_buffer_size);
1583     assert(m_packet.alloced_length() >= PACKET_MIN_SIZE);
1584   }
1585 #endif
1586   DBUG_RETURN(res);
1587 }
1588 
calc_grow_buffer_size(size_t current_size,size_t min_size)1589 inline size_t Binlog_sender::calc_grow_buffer_size(size_t current_size,
1590                                                    size_t min_size)
1591 {
1592   /* Check that a sane minimum buffer size was requested.  */
1593   assert(min_size > PACKET_MIN_SIZE);
1594   if (min_size > PACKET_MAX_SIZE)
1595     return 0;
1596 
1597   /*
1598      Even if this overflows (PACKET_MAX_SIZE == UINT_MAX32) and
1599      new_size wraps around, the min_size will always be returned,
1600      i.e., it is a safety net.
1601 
1602      Also, cap new_size to PACKET_MAX_SIZE (in case
1603      PACKET_MAX_SIZE < UINT_MAX32).
1604    */
1605   size_t new_size= static_cast<size_t>(
1606     std::min(static_cast<double>(PACKET_MAX_SIZE),
1607              static_cast<double>(current_size * PACKET_GROW_FACTOR)));
1608 
1609   new_size= ALIGN_SIZE(std::max(new_size, min_size));
1610 
1611   return new_size;
1612 }
1613 
calc_shrink_buffer_size(size_t current_size)1614 void Binlog_sender::calc_shrink_buffer_size(size_t current_size)
1615 {
1616   size_t new_size= static_cast<size_t>(
1617       std::max(static_cast<double>(PACKET_MIN_SIZE),
1618                static_cast<double>(current_size * PACKET_SHRINK_FACTOR)));
1619 
1620   m_new_shrink_size= ALIGN_SIZE(new_size);
1621 }
1622 #endif // HAVE_REPLICATION
1623