/* Copyright (c) 2013, 2021, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software Foundation,
   51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */

#ifndef DEFINED_RPL_BINLOG_SENDER
#define DEFINED_RPL_BINLOG_SENDER

#ifdef HAVE_REPLICATION
#include "my_global.h"
#include "binlog.h"           // LOG_INFO
#include "binlog_event.h"     // enum_binlog_checksum_alg, Log_event_type
#include "m_string.h"
#include "mysqld_error.h"     // ER_*
#include "sql_error.h"        // Diagnostics_area


/**
  The major logic of dump thread is implemented in this class. It sends
  required binlog events to clients according to their requests.
*/
class Binlog_sender
{
public:
  Binlog_sender(THD *thd, const char *start_file, my_off_t start_pos,
                Gtid_set *exclude_gtids, uint32 flag);

  ~Binlog_sender() {}

  /**
    It checks the dump request and sends events to the client until it
    finishes all events (for mysqlbinlog) or encounters an error.
  */
  void run();

  /**
    Sets the value of the previously processed event.

    @param type  The last processed event type.
  */
  inline void set_prev_event_type(binary_log::Log_event_type type)
  {
    m_prev_event_type= type;
  }

private:
  THD *m_thd;
  String& m_packet;

  /* Requested start binlog file and position */
  const char *m_start_file;
  my_off_t m_start_pos;

  /*
    For COM_BINLOG_DUMP_GTID, it may include a GTID set. All events in the
    set should not be sent to the client.
  */
  Gtid_set *m_exclude_gtid;
  bool m_using_gtid_protocol;
  bool m_check_previous_gtid_event;
  bool m_gtid_clear_fd_created_flag;

  /* The binlog file it is reading */
  LOG_INFO m_linfo;

  binary_log::enum_binlog_checksum_alg m_event_checksum_alg;
  binary_log::enum_binlog_checksum_alg m_slave_checksum_alg;
  ulonglong m_heartbeat_period;
  time_t m_last_event_sent_ts;
  /*
    For mysqlbinlog (server_id is 0), it will stop immediately without
    waiting if it already reads all events.
  */
  bool m_wait_new_events;

  Diagnostics_area m_diag_area;
  char m_errmsg_buf[MYSQL_ERRMSG_SIZE];
  const char *m_errmsg;
  int m_errno;
  /*
    The position of the event it reads most recently is stored. So it can
    report the exact position after where an error happens.

    m_last_file will point to m_linfo.log_file_name, if it is the same as
    m_linfo.log_file_name. Otherwise the file name is copied to
    m_last_file_buf and m_last_file will point to it.
  */
  char m_last_file_buf[FN_REFLEN];
  const char *m_last_file;
  my_off_t m_last_pos;

  /*
    Needed to be able to evaluate if buffer needs to be resized (shrunk).
  */
  ushort m_half_buffer_size_req_counter;

  /*
   * The size of the buffer next time we shrink it.
   * This variable is updated once every time we shrink or grow the buffer.
   */
  size_t m_new_shrink_size;

  /*
     Max size of the buffer is 4GB (UINT_MAX32). It is UINT_MAX32 since the
     threshold is set to (@c Log_event::read_log_event):

       max(max_allowed_packet,
           opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER)

     - opt_binlog_rows_event_max_size is defined as an unsigned long,
       thence in theory row events can be bigger than UINT_MAX32.

     - max_allowed_packet is set to MAX_MAX_ALLOWED_PACKET which is in
       turn defined as 1GB (i.e., 1024*1024*1024). (@c Binlog_sender::init()).

     Therefore, anything bigger than UINT_MAX32 is not loadable into the
     packet, thus we set the limit to 4GB (which is the value for UINT_MAX32,
     @c PACKET_MAXIMUM_SIZE).

   */
  const static uint32 PACKET_MAX_SIZE;

  /*
   * After these consecutive times using less than half of the buffer
   * the buffer is shrunk.
   */
  const static ushort PACKET_SHRINK_COUNTER_THRESHOLD;

  /**
   * The minimum size of the buffer.
   */
  const static uint32 PACKET_MIN_SIZE;

  /**
   * How much to grow the buffer each time we need to accommodate more bytes
   * than it currently can hold.
   */
  const static float PACKET_GROW_FACTOR;

  /**
   * The dual of PACKET_GROW_FACTOR. How much to shrink the buffer each time
   * it is deemed to be underused.
   */
  const static float PACKET_SHRINK_FACTOR;

  uint32 m_flag;
  /*
    It is true if any plugin requires to observe the transmission for each
    event. And HOOKs (reserve_header, before_send and after_send) are called
    when transmitting each event. Otherwise, it is false and HOOKs are not
    called.
  */
  bool m_observe_transmission;

  /* It is true if transmit_start hook is called. If the hook is not called
   * it will be false.
   */
  bool m_transmit_started;
  /**
    Type of the previously processed event.
  */
  binary_log::Log_event_type m_prev_event_type;
  /*
    It initializes the context, checks if the dump request is valid and
    if binlog status is correct.
  */
  void init();
  void cleanup();
  void init_heartbeat_period();
  void init_checksum_alg();
  /** Check if the requested binlog file and position are valid */
  int check_start_file();
  /** Transform read error numbers to error messages. */
  const char* log_read_error_msg(int error);

  /**
    It dumps a binlog file. Events are read and sent one by one. If it needs
    to wait for new events, it will wait after already reading all events in
    the active log file.

    @param[in] log_cache  IO_CACHE of the binlog will be sent
    @param[in] start_pos  Position requested by the slave's IO thread.
                          Only the events after the position are sent.

    @return It returns 0 if succeeds, otherwise 1 is returned.
            NOTE(review): the return type is my_off_t although the contract
            above describes a 0/1 status -- confirm against the definition.
   */
  my_off_t send_binlog(IO_CACHE *log_cache, my_off_t start_pos);

  /**
    It sends some events in a binlog file to the client.

    @param[in] log_cache  IO_CACHE of the binlog will be sent
    @param[in] end_pos    Only the events before end_pos are sent

    @return It returns 0 if succeeds, otherwise 1 is returned.
  */
  int send_events(IO_CACHE *log_cache, my_off_t end_pos);

  /**
    It gets the end position of the binlog file.

    @param[in] log_cache  IO_CACHE of the binlog will be checked

    @return
      @retval 0  It already arrives the end of the binlog.
      @retval 1  Failed to get binlog position
      @retval >1 Succeeded to get binlog end position
  */
  my_off_t get_binlog_end_pos(IO_CACHE *log_cache);

  /**
     It checks if a binlog file has Previous_gtid_log_event

     @param[in]  log_cache  IO_CACHE of the binlog will be checked
     @param[out] found      Found Previous_gtid_log_event or not

     @return It returns 0 if succeeds, otherwise 1 is returned.
  */
  int has_previous_gtid_log_event(IO_CACHE *log_cache, bool *found);

  /**
    It sends a faked rotate event which does not exist physically in any
    binlog to the slave. It contains the name of the binlog we are going to
    send to the slave.

    Faked rotate event is required in a few cases, so slave can know which
    binlog the following events are from.

  - The binlog file slave requested is Empty. E.g.
    "CHANGE MASTER TO MASTER_LOG_FILE='', MASTER_LOG_POS=4", etc.

  - The position slave requested is exactly the end of a binlog file.

  - Previous binlog file does not include a rotate event.
    It happens when server is shutdown and restarted.

  - The previous binary log was GTID-free (does not contain a
    Previous_gtids_log_event) and the slave is connecting using
    the GTID protocol.

    @param[in] next_log_file  The name of the binlog file will be sent after
                              the rotate event.
    @param[in] log_pos        The start position of the binlog file.

    @return It returns 0 if succeeds, otherwise 1 is returned.
  */
  int fake_rotate_event(const char *next_log_file, my_off_t log_pos);

  /**
     When starting to dump a binlog file, Format_description_log_event
     is read and sent first. If the requested position is after
     Format_description_log_event, log_pos field in the first
     Format_description_log_event has to be set to 0. So the slave
     will not increment its master's binlog position.

     @param[in] log        IO_CACHE of the binlog will be dumped
     @param[in] start_pos  Position requested by the slave's IO thread.
                           Only the events after the position are sent.

     @return It returns 0 if succeeds, otherwise 1 is returned.
  */
  int send_format_description_event(IO_CACHE *log, my_off_t start_pos);
  /**
     It sends a heartbeat to the client.

     @param[in] log_pos  The log position that events before it are sent.

     @return It returns 0 if succeeds, otherwise 1 is returned.
  */
  int send_heartbeat_event(my_off_t log_pos);

  /**
     It reads an event from binlog file.

     @param[in]  log_cache     IO_CACHE of the binlog file.
     @param[in]  checksum_alg  Checksum algorithm used to check the event.
     @param[out] event_ptr     The buffer used to store the event.
     @param[out] event_len     Length of the event.

     @return It returns 0 if succeeds, otherwise 1 is returned.
  */
  int read_event(IO_CACHE *log_cache,
                 binary_log::enum_binlog_checksum_alg checksum_alg,
                 uchar **event_ptr, uint32 *event_len);
  /**
    Check if it is allowed to send this event type.

    The following are disallowed:
    - GTID_MODE=ON and type==ANONYMOUS_GTID_LOG_EVENT
    - AUTO_POSITION=1 and type==ANONYMOUS_GTID_LOG_EVENT
    - GTID_MODE=OFF and type==GTID_LOG_EVENT

    @param type      The event type.
    @param log_file  The binary log file (used in error messages).
    @param log_pos   The binary log position (used in error messages).

    @retval true   The event is not allowed. In this case, this function
                   calls set_fatal_error().
    @retval false  The event is allowed.
  */
  bool check_event_type(binary_log::Log_event_type type,
                        const char *log_file, my_off_t log_pos);
  /**
    It checks if the event is in m_exclude_gtid.

    Clients may request to exclude some GTIDs. The events included in the
    GTID groups will be skipped. We call the below event sequences a group:
      Gtid_log_event
      BEGIN
      ...
      COMMIT or ROLLBACK

    or
      Gtid_log_event
      DDL statement

    @param[in] event_ptr         Buffer of the event
    @param[in] event_len         Length of the event
    @param[in] in_exclude_group  If it is in an exclude group

    @return It returns true if it should be skipped, otherwise false is
            returned.
  */
  bool skip_event(const uchar *event_ptr, uint32 event_len,
                  bool in_exclude_group);

  void calc_event_checksum(uchar *event_ptr, size_t event_len);
  int flush_net();
  int send_packet();
  int send_packet_and_flush();
  int before_send_hook(const char *log_file, my_off_t log_pos);
  int after_send_hook(const char *log_file, my_off_t log_pos);
  /**
    Reset the thread transmit packet buffer for event sending.

    This function reserves the bytes for event transmission, and
    should be called before storing the event data to the packet buffer.

    @param[in] flags      The flag used in reset_transmit hook.
    @param[in] event_len  If the caller already knows the event length, then
                          it can pass this value so that
                          reset_transmit_packet already reallocates the
                          buffer if needed. Otherwise, if event_len is 0,
                          then the caller needs to extend the buffer itself.
  */
  int reset_transmit_packet(ushort flags, size_t event_len= 0);

  /**
    It waits until receiving an update_cond signal. It will send heartbeat
    periodically if m_heartbeat_period is set.

    @param[in] log_pos  The end position of the last event it already sent.
                        It is required by heartbeat events.

    @return It returns 0 if succeeds, otherwise 1 is returned.
  */
  int wait_new_events(my_off_t log_pos);
  int wait_with_heartbeat(my_off_t log_pos);
  int wait_without_heartbeat();

#ifndef NDEBUG
  /* It is used to count the events that have been sent. */
  int m_event_count;
  /*
    It aborts dump thread with an error if m_event_count exceeds
    max_binlog_dump_events.
  */
  int check_event_count();
#endif

  /** @return true if an error has been recorded (m_errno is non-zero). */
  bool has_error() const { return m_errno != 0; }

  /**
    Record an error.

    The message is copied (at most MYSQL_ERRMSG_SIZE - 1 characters) into
    m_errmsg_buf, so the caller's buffer does not need to outlive the call.

    @param errorno  Error number stored in m_errno.
    @param errmsg   Error message to copy.
  */
  inline void set_error(int errorno, const char *errmsg)
  {
    my_snprintf(m_errmsg_buf, sizeof(m_errmsg_buf), "%.*s",
                MYSQL_ERRMSG_SIZE - 1, errmsg);
    m_errmsg= m_errmsg_buf;
    m_errno= errorno;
  }

  /** Record @p errmsg with error number ER_UNKNOWN_ERROR. */
  inline void set_unknow_error(const char *errmsg)
  {
    set_error(ER_UNKNOWN_ERROR, errmsg);
  }

  /** Record @p errmsg with ER_MASTER_FATAL_ERROR_READING_BINLOG. */
  inline void set_fatal_error(const char *errmsg)
  {
    set_error(ER_MASTER_FATAL_ERROR_READING_BINLOG, errmsg);
  }

  /**
    @return true if the recorded error number is
            ER_MASTER_FATAL_ERROR_READING_BINLOG.
  */
  inline bool is_fatal_error() const
  {
    return m_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG;
  }

  /**
    @return true if m_event_checksum_alg lies strictly between
            BINLOG_CHECKSUM_ALG_OFF and BINLOG_CHECKSUM_ALG_ENUM_END.
  */
  inline bool event_checksum_on() const
  {
    return m_event_checksum_alg > binary_log::BINLOG_CHECKSUM_ALG_OFF &&
      m_event_checksum_alg < binary_log::BINLOG_CHECKSUM_ALG_ENUM_END;
  }

  /**
    Remember @p log_pos as the last position and make m_last_file refer to
    the file name held in m_linfo.

    @param log_pos  Position to store in m_last_pos.
  */
  inline void set_last_pos(my_off_t log_pos)
  {
    m_last_file= m_linfo.log_file_name;
    m_last_pos= log_pos;
  }

  /**
    Copy @p log_file into m_last_file_buf and make m_last_file refer to the
    copy.

    The copy is bounded by the size of m_last_file_buf: a name that does not
    fit is truncated instead of being written past the end of the buffer.

    @param log_file  NUL-terminated binlog file name.
  */
  inline void set_last_file(const char *log_file)
  {
    /* Size-bounded copy, same pattern as set_error(); replaces strcpy(). */
    my_snprintf(m_last_file_buf, sizeof(m_last_file_buf), "%s", log_file);
    m_last_file= m_last_file_buf;
  }

  /**
   * This function SHALL grow the buffer of the packet if needed.
   *
   * If the buffer used for the packet is large enough to accommodate
   * the requested extra bytes, then this function does not do anything.
   *
   * On the other hand, if the requested size is bigger than the available
   * free bytes in the buffer, the buffer is extended by a constant factor
   * (@c PACKET_GROW_FACTOR).
   *
   * @param extra_size  The size in bytes that the caller wants to add to
   *                    the buffer.
   * @return true if an error occurred, false otherwise.
   */
  bool grow_packet(size_t extra_size);

  /**
   * This function SHALL shrink the size of the buffer used.
   *
   * If less than half of the buffer was used in the last N
   * (@c PACKET_SHRINK_COUNTER_THRESHOLD) consecutive times this function
   * was called, then the buffer gets shrunk by a constant factor
   * (@c PACKET_SHRINK_FACTOR).
   *
   * The buffer is never shrunk less than a minimum size
   * (@c PACKET_MIN_SIZE).
   *
   * @return NOTE(review): the meaning of the bool result is not documented
   *         here; presumably true on error, like grow_packet() -- confirm.
   */
  bool shrink_packet();

  /**
   Helper function to recalculate a new size for the growing buffer.

   @param current_size  The baseline (for instance, the current buffer size).
   @param min_size      The resulting buffer size, needs to be at least as
                        large as this parameter states.
   @return The new buffer size, or 0 in the case of an error.
  */
  size_t calc_grow_buffer_size(size_t current_size, size_t min_size);

  /**
   Helper function to recalculate the new size for the m_new_shrink_size.

   @param current_size  The baseline (for instance, the current buffer size).
  */
  void calc_shrink_buffer_size(size_t current_size);
};

#endif // HAVE_REPLICATION
#endif // DEFINED_RPL_BINLOG_SENDER