1 /* Copyright (c) 2013, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #ifndef DEFINED_RPL_BINLOG_SENDER
24 #define DEFINED_RPL_BINLOG_SENDER
25 
26 #ifdef HAVE_REPLICATION
27 #include "my_global.h"
28 #include "binlog.h"           // LOG_INFO
29 #include "binlog_event.h"     // enum_binlog_checksum_alg, Log_event_type
30 #include "m_string.h"
31 #include "mysqld_error.h"     // ER_*
32 #include "sql_error.h"        // Diagnostics_area
33 
34 
35 /**
36   The major logic of dump thread is implemented in this class. It sends
37   required binlog events to clients according to their requests.
38 */
39 class Binlog_sender
40 {
41 public:
42   Binlog_sender(THD *thd, const char *start_file, my_off_t start_pos,
43                 Gtid_set *exclude_gtids, uint32 flag);
44 
~Binlog_sender()45   ~Binlog_sender() {}
46 
47   /**
48     It checks the dump reqest and sends events to the client until it finish
49     all events(for mysqlbinlog) or encounters an error.
50   */
51   void run();
52 
53   /**
54     Sets the value of the previously processed event.
55 
56     @param type The last processed event type.
57    */
set_prev_event_type(binary_log::Log_event_type type)58   inline void set_prev_event_type(binary_log::Log_event_type type)
59   {
60     m_prev_event_type= type;
61   }
62 
63 private:
64   THD *m_thd;
65   String& m_packet;
66 
67   /* Requested start binlog file and position */
68   const char *m_start_file;
69   my_off_t m_start_pos;
70 
71   /*
72     For COM_BINLOG_DUMP_GTID, It may include a GTID set. All events in the set
73     should not be sent to the client.
74   */
75   Gtid_set *m_exclude_gtid;
76   bool m_using_gtid_protocol;
77   bool m_check_previous_gtid_event;
78   bool m_gtid_clear_fd_created_flag;
79 
80   /* The binlog file it is reading */
81   LOG_INFO m_linfo;
82 
83   binary_log::enum_binlog_checksum_alg m_event_checksum_alg;
84   binary_log::enum_binlog_checksum_alg m_slave_checksum_alg;
85   ulonglong m_heartbeat_period;
86   time_t m_last_event_sent_ts;
87   /*
88     For mysqlbinlog(server_id is 0), it will stop immediately without waiting
89     if it already reads all events.
90   */
91   bool m_wait_new_events;
92 
93   Diagnostics_area m_diag_area;
94   char m_errmsg_buf[MYSQL_ERRMSG_SIZE];
95   const char *m_errmsg;
96   int m_errno;
97   /*
98     The position of the event it reads most recently is stored. So it can report
99     the exact position after where an error happens.
100 
101     m_last_file will point to m_info.log_file_name, if it is same to
102     m_info.log_file_name. Otherwise the file name is copied to m_last_file_buf
103     and m_last_file will point to it.
104   */
105   char m_last_file_buf[FN_REFLEN];
106   const char *m_last_file;
107   my_off_t m_last_pos;
108 
109   /*
110     Needed to be able to evaluate if buffer needs to be resized (shrunk).
111   */
112   ushort m_half_buffer_size_req_counter;
113 
114   /*
115    * The size of the buffer next time we shrink it.
116    * This variable is updated once everytime we shrink or grow the buffer.
117    */
118   size_t m_new_shrink_size;
119 
120   /*
121      Max size of the buffer is 4GB (UINT_MAX32). It is UINT_MAX32 since the
122      threshold is set to (@c Log_event::read_log_event):
123 
124        max(max_allowed_packet,
125            opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER)
126 
127      - opt_binlog_rows_event_max_size is defined as an unsigned long,
128        thence in theory row events can be bigger than UINT_MAX32.
129 
130      - max_allowed_packet is set to MAX_MAX_ALLOWED_PACKET which is in
131        turn defined as 1GB (i.e., 1024*1024*1024). (@c Binlog_sender::init()).
132 
133      Therefore, anything bigger than UINT_MAX32 is not loadable into the
134      packet, thus we set the limit to 4GB (which is the value for UINT_MAX32,
135      @c PACKET_MAXIMUM_SIZE).
136 
137    */
138   const static uint32 PACKET_MAX_SIZE;
139 
140   /*
141    * After these consecutive times using less than half of the buffer
142    * the buffer is shrunk.
143    */
144   const static ushort PACKET_SHRINK_COUNTER_THRESHOLD;
145 
146   /**
147    * The minimum size of the buffer.
148    */
149   const static uint32 PACKET_MIN_SIZE;
150 
151   /**
152    * How much to grow the buffer each time we need to accommodate more bytes
153    * than it currently can hold.
154    */
155   const static float PACKET_GROW_FACTOR;
156 
157   /**
158    * The dual of PACKET_GROW_FACTOR. How much to shrink the buffer each time
159    * it is deemed to being underused.
160    */
161   const static float PACKET_SHRINK_FACTOR;
162 
163   uint32 m_flag;
164   /*
165     It is true if any plugin requires to observe the transmission for each event.
166     And HOOKs(reserve_header, before_send and after_send) are called when
167     transmitting each event. Otherwise, it is false and HOOKs are not called.
168   */
169   bool m_observe_transmission;
170 
171   /* It is true if transmit_start hook is called. If the hook is not called
172    * it will be false.
173    */
174   bool m_transmit_started;
175   /**
176     Type of the previously processed event.
177    */
178   binary_log::Log_event_type m_prev_event_type;
179   /*
180     It initializes the context, checks if the dump request is valid and
181     if binlog status is correct.
182   */
183   void init();
184   void cleanup();
185   void init_heartbeat_period();
186   void init_checksum_alg();
187   /** Check if the requested binlog file and position are valid */
188   int check_start_file();
189   /** Transform read error numbers to error messages. */
190   const char* log_read_error_msg(int error);
191 
192   /**
193     It dumps a binlog file. Events are read and sent one by one. If it need
194     to wait for new events, it will wait after already reading all events in
195     the active log file.
196 
197     @param[in] log_cache  IO_CACHE of the binlog will be sent
198     @param[in] start_pos  Position requested by the slave's IO thread.
199                           Only the events after the position are sent.
200 
201     @return It returns 0 if succeeds, otherwise 1 is returned.
202   */
203   my_off_t send_binlog(IO_CACHE *log_cache, my_off_t start_pos);
204 
205   /**
206     It sends some events in a binlog file to the client.
207 
208 
209      @param[in] log_cache  IO_CACHE of the binlog will be sent
210      @param[in] end_pos    Only the events before end_pos are sent
211 
212      @return It returns 0 if succeeds, otherwise 1 is returned.
213   */
214   int send_events(IO_CACHE *log_cache, my_off_t end_pos);
215 
216   /**
217     It gets the end position of the binlog file.
218 
219     @param[in] log_cache  IO_CACHE of the binlog will be checked
220 
221     @return
222       @retval 0  It already arrives the end of the binlog.
223       @retval 1  Failed to get binlog position
224       @retval >1 Succeeded to get binlog end position
225   */
226   my_off_t get_binlog_end_pos(IO_CACHE *log_cache);
227 
228   /**
229      It checks if a binlog file has Previous_gtid_log_event
230 
231      @param[in]  log_cache  IO_CACHE of the binlog will be checked
232      @param[out] found      Found Previous_gtid_log_event or not
233 
234      @return It returns 0 if succeeds, otherwise 1 is returned.
235   */
236   int has_previous_gtid_log_event(IO_CACHE *log_cache, bool *found);
237 
238   /**
239     It sends a faked rotate event which does not exist physically in any
240     binlog to the slave. It contains the name of the binlog we are going to
241     send to the slave.
242 
243     Faked rotate event is required in a few cases, so slave can know which
244     binlog the following events are from.
245 
246   - The binlog file slave requested is Empty. E.g.
247     "CHANGE MASTER TO MASTER_LOG_FILE='', MASTER_LOG_POS=4", etc.
248 
249   - The position slave requested is exactly the end of a binlog file.
250 
251   - Previous binlog file does not include a rotate event.
252     It happens when server is shutdown and restarted.
253 
254   - The previous binary log was GTID-free (does not contain a
255     Previous_gtids_log_event) and the slave is connecting using
256     the GTID protocol.
257 
258     @param[in] packet         The buffer used to store the faked event.
259     @param[in] next_log_file  The name of the binlog file will be sent after
260                               the rotate event.
261     @param[in] log_pos        The start position of the binlog file.
262 
263     @return It returns 0 if succeeds, otherwise 1 is returned.
264   */
265   int fake_rotate_event(const char *next_log_file, my_off_t log_pos);
266 
267   /**
268      When starting to dump a binlog file, Format_description_log_event
269      is read and sent first. If the requested position is after
270      Format_description_log_event, log_pos field in the first
271      Format_description_log_event has to be set to 0. So the slave
272      will not increment its master's binlog position.
273 
274      @param[in] log_cache IO_CACHE of the binlog will be dumpped
275      @param[in] start_pos Position requested by the slave's IO thread.
276                           Only the events after the position are sent.
277 
278      @return It returns 0 if succeeds, otherwise 1 is returned.
279   */
280   int send_format_description_event(IO_CACHE *log, my_off_t start_pos);
281   /**
282      It sends a heartbeat to the client.
283 
284      @param[in] packet   The buffer used to store the event.
285      @param[in] log_pos  The log position that events before it are sent.
286 
287      @return It returns 0 if succeeds, otherwise 1 is returned.
288   */
289   int send_heartbeat_event(my_off_t log_pos);
290 
291   /**
292      It reads a event from binlog file.
293 
294      @param[in] log_cache     IO_CACHE of the binlog file.
295      @param[in] checksum_alg  Checksum algorithm used to check the event.
296      @param[out] event_ptr    The buffer used to store the event.
297      @param[out] event_len    Length of the event.
298 
299      @return It returns 0 if succeeds, otherwise 1 is returned.
300   */
301   int read_event(IO_CACHE *log_cache,
302                  binary_log::enum_binlog_checksum_alg checksum_alg,
303                  uchar **event_ptr, uint32 *event_len);
304   /**
305     Check if it is allowed to send this event type.
306 
307     The following are disallowed:
308     - GTID_MODE=ON and type==ANONYMOUS_GTID_LOG_EVENT
309     - AUTO_POSITION=1 and type==ANONYMOUS_GTID_LOG_EVENT
310     - GTID_MODE=OFF and type==GTID_LOG_EVENT
311 
312     @param type The event type.
313     @param log_file The binary log file (used in error messages).
314     @param log_pos The binary log position (used in error messages).
315 
316     @retval true The event is not allowed. In this case, this function
317     calls set_fatal_error().
318     @retval false The event is allowed.
319   */
320   bool check_event_type(binary_log::Log_event_type type,
321                         const char *log_file, my_off_t log_pos);
322   /**
323     It checks if the event is in m_exclude_gtid.
324 
325     Clients may request to exclude some GTIDs. The events include in the GTID
326     groups will be skipped. We call below events sequence as a goup,
327     Gtid_log_event
328     BEGIN
329     ...
330     COMMIT or ROLLBACK
331 
332     or
333     Gtid_log_event
334     DDL statement
335 
336     @param[in] event_ptr  Buffer of the event
337     @param[in] event_len  Length of the event
338     @param[in] in_exclude_group  If it is in a execude group
339 
340     @return It returns true if it should be skipped, otherwise false is turned.
341   */
342   bool skip_event(const uchar *event_ptr, uint32 event_len,
343                   bool in_exclude_group);
344 
345   void calc_event_checksum(uchar *event_ptr, size_t event_len);
346   int flush_net();
347   int send_packet();
348   int send_packet_and_flush();
349   int before_send_hook(const char *log_file, my_off_t log_pos);
350   int after_send_hook(const char *log_file, my_off_t log_pos);
351   /*
352     Reset the thread transmit packet buffer for event sending.
353 
354     This function reserves the bytes for event transmission, and
355     should be called before storing the event data to the packet buffer.
356 
357     @param[in] flags      The flag used in reset_transmit hook.
358     @param[in] event_len  If the caller already knows the event length, then
359                           it can pass this value so that reset_transmit_packet
360                           already reallocates the buffer if needed. Otherwise,
361                           if event_len is 0, then the caller needs to extend
362                           the buffer itself.
363   */
364   int reset_transmit_packet(ushort flags, size_t event_len= 0);
365 
366   /**
367     It waits until receiving an update_cond signal. It will send heartbeat
368     periodically if m_heartbeat_period is set.
369 
370     @param[in] log_pos  The end position of the last event it already sent.
371     It is required by heartbeat events.
372 
373     @return It returns 0 if succeeds, otherwise 1 is returned.
374   */
375   int wait_new_events(my_off_t log_pos);
376   int wait_with_heartbeat(my_off_t log_pos);
377   int wait_without_heartbeat();
378 
379 #ifndef NDEBUG
380   /* It is used to count the events that have been sent. */
381   int m_event_count;
382   /*
383     It aborts dump thread with an error if m_event_count exceeds
384     max_binlog_dump_events.
385   */
386   int check_event_count();
387 #endif
388 
has_error()389   bool has_error() { return m_errno != 0; }
set_error(int errorno,const char * errmsg)390   inline void set_error(int errorno, const char *errmsg)
391   {
392     my_snprintf(m_errmsg_buf, sizeof(m_errmsg_buf), "%.*s",
393                 MYSQL_ERRMSG_SIZE - 1, errmsg);
394     m_errmsg= m_errmsg_buf;
395     m_errno= errorno;
396   }
397 
set_unknow_error(const char * errmsg)398   inline void set_unknow_error(const char *errmsg)
399   {
400     set_error(ER_UNKNOWN_ERROR, errmsg);
401   }
402 
set_fatal_error(const char * errmsg)403   inline void set_fatal_error(const char *errmsg)
404   {
405     set_error(ER_MASTER_FATAL_ERROR_READING_BINLOG, errmsg);
406   }
407 
is_fatal_error()408   inline bool is_fatal_error()
409   {
410     return m_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG;
411   }
412 
event_checksum_on()413   inline bool event_checksum_on()
414   {
415     return m_event_checksum_alg > binary_log::BINLOG_CHECKSUM_ALG_OFF &&
416       m_event_checksum_alg < binary_log::BINLOG_CHECKSUM_ALG_ENUM_END;
417   }
418 
set_last_pos(my_off_t log_pos)419   inline void set_last_pos(my_off_t log_pos)
420   {
421     m_last_file= m_linfo.log_file_name;
422     m_last_pos= log_pos;
423   }
424 
set_last_file(const char * log_file)425   inline void set_last_file(const char *log_file)
426   {
427     strcpy(m_last_file_buf, log_file);
428     m_last_file= m_last_file_buf;
429   }
430 
431   /**
432    * This function SHALL grow the buffer of the packet if needed.
433    *
434    * If the buffer used for the packet is large enough to accommodate
435    * the requested extra bytes, then this function does not do anything.
436    *
437    * On the other hand, if the requested size is bigger than the available
438    * free bytes in the buffer, the buffer is extended by a constant factor
439    * (@c PACKET_GROW_FACTOR).
440    *
441    * @param packet  The buffer to resize if needed.
442    * @param extra_size  The size in bytes that the caller wants to add to the buffer.
443    * @return true if an error occurred, false otherwise.
444    */
445   bool grow_packet(size_t extra_size);
446 
447   /**
448    * This function SHALL shrink the size of the buffer used.
449    *
450    * If less than half of the buffer was used in the last N
451    * (@c PACKET_SHRINK_COUNTER_THRESHOLD) consecutive times this function
452    * was called, then the buffer gets shrunk by a constant factor
453    * (@c PACKET_SHRINK_FACTOR).
454    *
455    * The buffer is never shrunk less than a minimum size (@c PACKET_MIN_SIZE).
456    *
457    * @param packet  The buffer to shrink.
458    */
459   bool shrink_packet();
460 
461   /**
462    Helper function to recalculate a new size for the growing buffer.
463 
464    @param current_size The baseline (for instance, the current buffer size).
465    @param min_size The resulting buffer size, needs to be at least as large
466                    as this parameter states.
467    @return The new buffer size, or 0 in the case of an error.
468   */
469   size_t calc_grow_buffer_size(size_t current_size, size_t min_size);
470 
471   /**
472    Helper function to recalculate the new size for the m_new_shrink_size.
473 
474    @param current_size The baseline (for instance, the current buffer size).
475   */
476   void calc_shrink_buffer_size(size_t current_size);
477 };
478 
479 #endif // HAVE_REPLICATION
480 #endif // DEFINED_RPL_BINLOG_SENDER
481