1 /* Copyright (c) 2000, 2018, Oracle and/or its affiliates. 2 Copyright (c) 2008, 2019, MariaDB Corporation 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License as published by 6 the Free Software Foundation; version 2 of the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */ 16 17 #include "mariadb.h" 18 #include "sql_priv.h" 19 #include "unireg.h" 20 #include "sql_base.h" 21 #include "sql_parse.h" // check_access 22 #ifdef HAVE_REPLICATION 23 24 #include "rpl_mi.h" 25 #include "rpl_rli.h" 26 #include "sql_repl.h" 27 #include "sql_acl.h" // SUPER_ACL 28 #include "log_event.h" 29 #include "rpl_filter.h" 30 #include <my_dir.h> 31 #include "debug_sync.h" 32 #include "semisync_master.h" 33 #include "semisync_slave.h" 34 35 36 enum enum_gtid_until_state { 37 GTID_UNTIL_NOT_DONE, 38 GTID_UNTIL_STOP_AFTER_STANDALONE, 39 GTID_UNTIL_STOP_AFTER_TRANSACTION 40 }; 41 42 43 int max_binlog_dump_events = 0; // unlimited 44 my_bool opt_sporadic_binlog_dump_fail = 0; 45 #ifndef DBUG_OFF 46 static int binlog_dump_count = 0; 47 #endif 48 49 extern TYPELIB binlog_checksum_typelib; 50 51 52 static int 53 fake_event_header(String* packet, Log_event_type event_type, ulong extra_len, 54 my_bool *do_checksum, ha_checksum *crc, const char** errmsg, 55 enum enum_binlog_checksum_alg checksum_alg_arg, uint32 end_pos) 56 { 57 char header[LOG_EVENT_HEADER_LEN]; 58 ulong event_len; 59 60 *do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && 61 checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; 62 63 /* 64 
'when' (the timestamp) is set to 0 so that slave could distinguish between 65 real and fake Rotate events (if necessary) 66 */ 67 memset(header, 0, 4); 68 header[EVENT_TYPE_OFFSET] = (uchar)event_type; 69 event_len= LOG_EVENT_HEADER_LEN + extra_len + 70 (*do_checksum ? BINLOG_CHECKSUM_LEN : 0); 71 int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); 72 int4store(header + EVENT_LEN_OFFSET, event_len); 73 int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); 74 // TODO: check what problems this may cause and fix them 75 int4store(header + LOG_POS_OFFSET, end_pos); 76 if (packet->append(header, sizeof(header))) 77 { 78 *errmsg= "Failed due to out-of-memory writing event"; 79 return -1; 80 } 81 if (*do_checksum) 82 { 83 *crc= my_checksum(0, (uchar*)header, sizeof(header)); 84 } 85 return 0; 86 } 87 88 89 static int 90 fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg) 91 { 92 if (do_checksum) 93 { 94 char b[BINLOG_CHECKSUM_LEN]; 95 int4store(b, crc); 96 if (packet->append(b, sizeof(b))) 97 { 98 *errmsg= "Failed due to out-of-memory writing event checksum"; 99 return -1; 100 } 101 } 102 return 0; 103 } 104 105 106 static int 107 fake_event_write(NET *net, String *packet, const char **errmsg) 108 { 109 if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) 110 { 111 *errmsg = "failed on my_net_write()"; 112 return -1; 113 } 114 return 0; 115 } 116 117 118 /* 119 Helper structure, used to pass miscellaneous info from mysql_binlog_send() 120 into the helper functions that it calls. 
121 */ 122 struct binlog_send_info { 123 rpl_binlog_state until_binlog_state; 124 slave_connection_state gtid_state; 125 THD *thd; 126 NET *net; 127 String *packet; 128 char *const log_file_name; // ptr/alias to linfo.log_file_name 129 slave_connection_state *until_gtid_state; 130 slave_connection_state until_gtid_state_obj; 131 Format_description_log_event *fdev; 132 int mariadb_slave_capability; 133 enum_gtid_skip_type gtid_skip_group; 134 enum_gtid_until_state gtid_until_group; 135 ushort flags; 136 enum enum_binlog_checksum_alg current_checksum_alg; 137 bool slave_gtid_strict_mode; 138 bool send_fake_gtid_list; 139 bool slave_gtid_ignore_duplicates; 140 bool using_gtid_state; 141 142 int error; 143 const char *errmsg; 144 char error_text[MAX_SLAVE_ERRMSG]; 145 rpl_gtid error_gtid; 146 147 ulonglong heartbeat_period; 148 149 /** start file/pos as requested by slave, for error message */ 150 char start_log_file_name[FN_REFLEN]; 151 my_off_t start_pos; 152 153 /** last pos for error message */ 154 my_off_t last_pos; 155 156 #ifndef DBUG_OFF 157 int left_events; 158 uint dbug_reconnect_counter; 159 ulong hb_info_counter; 160 #endif 161 162 bool clear_initial_log_pos; 163 bool should_stop; 164 size_t dirlen; 165 166 binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, 167 char *lfn) 168 : thd(thd_arg), net(&thd_arg->net), packet(packet_arg), 169 log_file_name(lfn), until_gtid_state(NULL), fdev(NULL), 170 gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE), 171 flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), 172 slave_gtid_strict_mode(false), send_fake_gtid_list(false), 173 slave_gtid_ignore_duplicates(false), 174 error(0), 175 errmsg("Unknown error"), 176 heartbeat_period(0), 177 #ifndef DBUG_OFF 178 left_events(max_binlog_dump_events), 179 dbug_reconnect_counter(0), 180 hb_info_counter(0), 181 #endif 182 clear_initial_log_pos(false), 183 should_stop(false) 184 { 185 error_text[0] = 0; 186 bzero(&error_gtid, 
sizeof(error_gtid)); 187 until_binlog_state.init(); 188 } 189 }; 190 191 // prototype 192 static int reset_transmit_packet(struct binlog_send_info *info, ushort flags, 193 ulong *ev_offset, const char **errmsg); 194 195 /* 196 fake_rotate_event() builds a fake (=which does not exist physically in any 197 binlog) Rotate event, which contains the name of the binlog we are going to 198 send to the slave (because the slave may not know it if it just asked for 199 MASTER_LOG_FILE='', MASTER_LOG_POS=4). 200 < 4.0.14, fake_rotate_event() was called only if the requested pos was 4. 201 After this version we always call it, so that a 3.23.58 slave can rely on 202 it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has 203 zeros in the good positions which, by chance, make it possible for the 3.23 204 slave to detect that this event is unexpected) (this is luck which happens 205 because the master and slave disagree on the size of the header of 206 Log_event). 207 208 Relying on the event length of the Rotate event instead of these 209 well-placed zeros was not possible as Rotate events have a variable-length 210 part. 
211 */ 212 213 static int fake_rotate_event(binlog_send_info *info, ulonglong position, 214 const char** errmsg, enum enum_binlog_checksum_alg checksum_alg_arg) 215 { 216 DBUG_ENTER("fake_rotate_event"); 217 ulong ev_offset; 218 char buf[ROTATE_HEADER_LEN+100]; 219 my_bool do_checksum; 220 int err; 221 char* p = info->log_file_name+dirname_length(info->log_file_name); 222 uint ident_len = (uint) strlen(p); 223 String *packet= info->packet; 224 ha_checksum crc; 225 226 /* reset transmit packet for the fake rotate event below */ 227 if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) 228 DBUG_RETURN(1); 229 230 if ((err= fake_event_header(packet, ROTATE_EVENT, 231 ident_len + ROTATE_HEADER_LEN, &do_checksum, 232 &crc, 233 errmsg, checksum_alg_arg, 0))) 234 { 235 info->error= ER_UNKNOWN_ERROR; 236 DBUG_RETURN(err); 237 } 238 239 int8store(buf+R_POS_OFFSET,position); 240 packet->append(buf, ROTATE_HEADER_LEN); 241 packet->append(p, ident_len); 242 243 if (do_checksum) 244 { 245 crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN); 246 crc= my_checksum(crc, (uchar*)p, ident_len); 247 } 248 249 if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || 250 (err= fake_event_write(info->net, packet, errmsg))) 251 { 252 info->error= ER_UNKNOWN_ERROR; 253 DBUG_RETURN(err); 254 } 255 DBUG_RETURN(0); 256 } 257 258 259 static int fake_gtid_list_event(binlog_send_info *info, 260 Gtid_list_log_event *glev, const char** errmsg, 261 uint32 current_pos) 262 { 263 my_bool do_checksum; 264 int err; 265 ha_checksum crc; 266 char buf[128]; 267 String str(buf, sizeof(buf), system_charset_info); 268 String* packet= info->packet; 269 270 str.length(0); 271 if (glev->to_packet(&str)) 272 { 273 info->error= ER_UNKNOWN_ERROR; 274 *errmsg= "Failed due to out-of-memory writing Gtid_list event"; 275 return -1; 276 } 277 if ((err= fake_event_header(packet, GTID_LIST_EVENT, 278 str.length(), &do_checksum, &crc, 279 errmsg, info->current_checksum_alg, current_pos))) 280 
{ 281 info->error= ER_UNKNOWN_ERROR; 282 return err; 283 } 284 285 packet->append(str); 286 if (do_checksum) 287 { 288 crc= my_checksum(crc, (uchar*)str.ptr(), str.length()); 289 } 290 291 if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || 292 (err= fake_event_write(info->net, packet, errmsg))) 293 { 294 info->error= ER_UNKNOWN_ERROR; 295 return err; 296 } 297 298 return 0; 299 } 300 301 302 /* 303 Reset thread transmit packet buffer for event sending 304 305 This function allocates header bytes for event transmission, and 306 should be called before store the event data to the packet buffer. 307 */ 308 static int reset_transmit_packet(binlog_send_info *info, ushort flags, 309 ulong *ev_offset, const char **errmsg) 310 { 311 int ret= 0; 312 String *packet= &info->thd->packet; 313 314 /* reserve and set default header */ 315 packet->length(0); 316 packet->set("\0", 1, &my_charset_bin); 317 318 if (info->thd->semi_sync_slave) 319 { 320 if (repl_semisync_master.reserve_sync_header(packet)) 321 { 322 info->error= ER_UNKNOWN_ERROR; 323 *errmsg= "Failed to run hook 'reserve_header'"; 324 ret= 1; 325 } 326 } 327 328 *ev_offset= packet->length(); 329 return ret; 330 } 331 332 int get_user_var_int(const char *name, 333 long long int *value, int *null_value) 334 { 335 bool null_val; 336 user_var_entry *entry= 337 (user_var_entry*) my_hash_search(¤t_thd->user_vars, 338 (uchar*) name, strlen(name)); 339 if (!entry) 340 return 1; 341 *value= entry->val_int(&null_val); 342 if (null_value) 343 *null_value= null_val; 344 return 0; 345 } 346 347 inline bool is_semi_sync_slave() 348 { 349 int null_value; 350 long long val= 0; 351 get_user_var_int("rpl_semi_sync_slave", &val, &null_value); 352 return val; 353 } 354 355 static int send_file(THD *thd) 356 { 357 NET* net = &thd->net; 358 int fd = -1, error = 1; 359 size_t bytes; 360 char fname[FN_REFLEN+1]; 361 const char *errmsg = 0; 362 int old_timeout; 363 unsigned long packet_len; 364 uchar buf[IO_SIZE]; // It's safe 
to alloc this 365 DBUG_ENTER("send_file"); 366 367 /* 368 The client might be slow loading the data, give him wait_timeout to do 369 the job 370 */ 371 old_timeout= net->read_timeout; 372 my_net_set_read_timeout(net, thd->variables.net_wait_timeout); 373 374 /* 375 We need net_flush here because the client will not know it needs to send 376 us the file name until it has processed the load event entry 377 */ 378 if (unlikely(net_flush(net))) 379 { 380 read_error: 381 errmsg = "while reading file name"; 382 goto err; 383 } 384 packet_len= my_net_read(net); 385 if (unlikely(packet_len == packet_error)) 386 goto read_error; 387 388 // terminate with \0 for fn_format 389 *((char*)net->read_pos + packet_len) = 0; 390 fn_format(fname, (char*) net->read_pos + 1, "", "", 4); 391 // this is needed to make replicate-ignore-db 392 if (!strcmp(fname,"/dev/null")) 393 goto end; 394 395 if ((fd= mysql_file_open(key_file_send_file, 396 fname, O_RDONLY, MYF(0))) < 0) 397 { 398 errmsg = "on open of file"; 399 goto err; 400 } 401 402 while ((long) (bytes= mysql_file_read(fd, buf, IO_SIZE, MYF(0))) > 0) 403 { 404 if (my_net_write(net, buf, bytes)) 405 { 406 errmsg = "while writing data to client"; 407 goto err; 408 } 409 } 410 411 end: 412 if (my_net_write(net, (uchar*) "", 0) || net_flush(net) || 413 (my_net_read(net) == packet_error)) 414 { 415 errmsg = "while negotiating file transfer close"; 416 goto err; 417 } 418 error = 0; 419 420 err: 421 my_net_set_read_timeout(net, old_timeout); 422 if (fd >= 0) 423 mysql_file_close(fd, MYF(0)); 424 if (errmsg) 425 { 426 sql_print_error("Failed in send_file() %s", errmsg); 427 DBUG_PRINT("error", ("%s", errmsg)); 428 } 429 DBUG_RETURN(error); 430 } 431 432 433 /** 434 Internal to mysql_binlog_send() routine that recalculates checksum for 435 1. FD event (asserted) that needs additional arranment prior sending to slave. 436 2. 
Start_encryption_log_event whose Ignored flag is set 437 TODO DBUG_ASSERT can be removed if this function is used for more general cases 438 */ 439 440 inline void fix_checksum(enum_binlog_checksum_alg checksum_alg, String *packet, 441 ulong ev_offset) 442 { 443 if (checksum_alg == BINLOG_CHECKSUM_ALG_OFF || 444 checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF) 445 return; 446 /* recalculate the crc for this event */ 447 uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET); 448 ha_checksum crc; 449 DBUG_ASSERT((data_len == 450 LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN + 451 BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN) || 452 (data_len == 453 LOG_EVENT_MINIMAL_HEADER_LEN + BINLOG_CRYPTO_SCHEME_LENGTH + 454 BINLOG_KEY_VERSION_LENGTH + BINLOG_NONCE_LENGTH + 455 BINLOG_CHECKSUM_LEN)); 456 crc= my_checksum(0, (uchar *)packet->ptr() + ev_offset, data_len - 457 BINLOG_CHECKSUM_LEN); 458 int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc); 459 } 460 461 462 static user_var_entry * get_binlog_checksum_uservar(THD * thd) 463 { 464 LEX_CSTRING name= { STRING_WITH_LEN("master_binlog_checksum")}; 465 user_var_entry *entry= 466 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, 467 name.length); 468 return entry; 469 } 470 471 /** 472 Function for calling in mysql_binlog_send 473 to check if slave initiated checksum-handshake. 474 475 @param[in] thd THD to access a user variable 476 477 @return TRUE if handshake took place, FALSE otherwise 478 */ 479 480 static bool is_slave_checksum_aware(THD * thd) 481 { 482 DBUG_ENTER("is_slave_checksum_aware"); 483 user_var_entry *entry= get_binlog_checksum_uservar(thd); 484 DBUG_RETURN(entry? true : false); 485 } 486 487 /** 488 Function for calling in mysql_binlog_send 489 to get the value of @@binlog_checksum of the master at 490 time of checksum-handshake. 
491 492 The value tells the master whether to compute or not, and the slave 493 to verify or not the first artificial Rotate event's checksum. 494 495 @param[in] thd THD to access a user variable 496 497 @return value of @@binlog_checksum alg according to 498 @c enum enum_binlog_checksum_alg 499 */ 500 501 static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * thd) 502 { 503 enum enum_binlog_checksum_alg ret; 504 505 DBUG_ENTER("get_binlog_checksum_value_at_connect"); 506 user_var_entry *entry= get_binlog_checksum_uservar(thd); 507 if (!entry) 508 { 509 ret= BINLOG_CHECKSUM_ALG_UNDEF; 510 } 511 else 512 { 513 DBUG_ASSERT(entry->type == STRING_RESULT); 514 String str; 515 uint dummy_errors; 516 str.copy(entry->value, entry->length, &my_charset_bin, &my_charset_bin, 517 &dummy_errors); 518 ret= (enum_binlog_checksum_alg) 519 (find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1); 520 DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg 521 } 522 DBUG_RETURN(ret); 523 } 524 525 /* 526 Adjust the position pointer in the binary log file for all running slaves 527 528 SYNOPSIS 529 adjust_linfo_offsets() 530 purge_offset Number of bytes removed from start of log index file 531 532 NOTES 533 - This is called when doing a PURGE when we delete lines from the 534 index log file 535 536 REQUIREMENTS 537 - Before calling this function, we have to ensure that no threads are 538 using any binary log file before purge_offset.a 539 540 TODO 541 - Inform the slave threads that they should sync the position 542 in the binary log file with Relay_log_info::flush(). 543 Now they sync is done for next read. 
544 */ 545 546 void adjust_linfo_offsets(my_off_t purge_offset) 547 { 548 THD *tmp; 549 550 mysql_mutex_lock(&LOCK_thread_count); 551 I_List_iterator<THD> it(threads); 552 553 while ((tmp=it++)) 554 { 555 LOG_INFO* linfo; 556 if ((linfo = tmp->current_linfo)) 557 { 558 mysql_mutex_lock(&linfo->lock); 559 /* 560 Index file offset can be less that purge offset only if 561 we just started reading the index file. In that case 562 we have nothing to adjust 563 */ 564 if (linfo->index_file_offset < purge_offset) 565 linfo->fatal = (linfo->index_file_offset != 0); 566 else 567 linfo->index_file_offset -= purge_offset; 568 mysql_mutex_unlock(&linfo->lock); 569 } 570 } 571 mysql_mutex_unlock(&LOCK_thread_count); 572 } 573 574 575 bool log_in_use(const char* log_name) 576 { 577 size_t log_name_len = strlen(log_name) + 1; 578 THD *tmp; 579 bool result = 0; 580 581 mysql_mutex_lock(&LOCK_thread_count); 582 I_List_iterator<THD> it(threads); 583 584 while ((tmp=it++)) 585 { 586 LOG_INFO* linfo; 587 if ((linfo = tmp->current_linfo)) 588 { 589 mysql_mutex_lock(&linfo->lock); 590 result = !strncmp(log_name, linfo->log_file_name, log_name_len); 591 mysql_mutex_unlock(&linfo->lock); 592 if (result) 593 break; 594 } 595 } 596 597 mysql_mutex_unlock(&LOCK_thread_count); 598 return result; 599 } 600 601 bool purge_error_message(THD* thd, int res) 602 { 603 uint errcode; 604 605 if ((errcode= purge_log_get_error_code(res)) != 0) 606 { 607 my_message(errcode, ER_THD(thd, errcode), MYF(0)); 608 return TRUE; 609 } 610 my_ok(thd); 611 return FALSE; 612 } 613 614 615 /** 616 Execute a PURGE BINARY LOGS TO <log> command. 617 618 @param thd Pointer to THD object for the client thread executing the 619 statement. 620 621 @param to_log Name of the last log to purge. 
622 623 @retval FALSE success 624 @retval TRUE failure 625 */ 626 bool purge_master_logs(THD* thd, const char* to_log) 627 { 628 char search_file_name[FN_REFLEN]; 629 if (!mysql_bin_log.is_open()) 630 { 631 my_ok(thd); 632 return FALSE; 633 } 634 635 mysql_bin_log.make_log_name(search_file_name, to_log); 636 return purge_error_message(thd, 637 mysql_bin_log.purge_logs(search_file_name, 0, 1, 638 1, NULL)); 639 } 640 641 642 /** 643 Execute a PURGE BINARY LOGS BEFORE <date> command. 644 645 @param thd Pointer to THD object for the client thread executing the 646 statement. 647 648 @param purge_time Date before which logs should be purged. 649 650 @retval FALSE success 651 @retval TRUE failure 652 */ 653 bool purge_master_logs_before_date(THD* thd, time_t purge_time) 654 { 655 if (!mysql_bin_log.is_open()) 656 { 657 my_ok(thd); 658 return 0; 659 } 660 return purge_error_message(thd, 661 mysql_bin_log.purge_logs_before_date(purge_time)); 662 } 663 664 void set_read_error(binlog_send_info *info, int error) 665 { 666 if (error == LOG_READ_EOF) 667 { 668 return; 669 } 670 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 671 switch (error) { 672 case LOG_READ_BOGUS: 673 info->errmsg= "bogus data in log event"; 674 break; 675 case LOG_READ_TOO_LARGE: 676 info->errmsg= "log event entry exceeded max_allowed_packet; " 677 "Increase max_allowed_packet on master"; 678 break; 679 case LOG_READ_IO: 680 info->errmsg= "I/O error reading log event"; 681 break; 682 case LOG_READ_MEM: 683 info->errmsg= "memory allocation failed reading log event"; 684 break; 685 case LOG_READ_TRUNC: 686 info->errmsg= "binlog truncated in the middle of event; " 687 "consider out of disk space on master"; 688 break; 689 case LOG_READ_CHECKSUM_FAILURE: 690 info->errmsg= "event read from binlog did not pass crc check"; 691 break; 692 case LOG_READ_DECRYPT: 693 info->errmsg= "event decryption failure"; 694 break; 695 default: 696 info->errmsg= "unknown error reading log event on the master"; 697 break; 
698 } 699 } 700 701 702 /** 703 An auxiliary function for calling in mysql_binlog_send 704 to initialize the heartbeat timeout in waiting for a binlogged event. 705 706 @param[in] thd THD to access a user variable 707 708 @return heartbeat period an ulonglong of nanoseconds 709 or zero if heartbeat was not demanded by slave 710 */ 711 static ulonglong get_heartbeat_period(THD * thd) 712 { 713 bool null_value; 714 LEX_CSTRING name= { STRING_WITH_LEN("master_heartbeat_period")}; 715 user_var_entry *entry= 716 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, 717 name.length); 718 return entry? entry->val_int(&null_value) : 0; 719 } 720 721 /* 722 Lookup the capabilities of the slave, which it announces by setting a value 723 MARIA_SLAVE_CAPABILITY_XXX in @mariadb_slave_capability. 724 725 Older MariaDB slaves, and other MySQL slaves, do not set 726 @mariadb_slave_capability, corresponding to a capability of 727 MARIA_SLAVE_CAPABILITY_UNKNOWN (0). 728 */ 729 static int 730 get_mariadb_slave_capability(THD *thd) 731 { 732 bool null_value; 733 const LEX_CSTRING name= { STRING_WITH_LEN("mariadb_slave_capability") }; 734 const user_var_entry *entry= 735 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, 736 name.length); 737 return entry ? 738 (int)(entry->val_int(&null_value)) : MARIA_SLAVE_CAPABILITY_UNKNOWN; 739 } 740 741 742 /* 743 Get the value of the @slave_connect_state user variable into the supplied 744 String (this is the GTID connect state requested by the connecting slave). 745 746 Returns false if error (ie. slave did not set the variable and does not 747 want to use GTID to set start position), true if success. 
748 */ 749 static bool 750 get_slave_connect_state(THD *thd, String *out_str) 751 { 752 bool null_value; 753 754 const LEX_CSTRING name= { STRING_WITH_LEN("slave_connect_state") }; 755 user_var_entry *entry= 756 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, 757 name.length); 758 return entry && entry->val_str(&null_value, out_str, 0) && !null_value; 759 } 760 761 762 static bool 763 get_slave_gtid_strict_mode(THD *thd) 764 { 765 bool null_value; 766 767 const LEX_CSTRING name= { STRING_WITH_LEN("slave_gtid_strict_mode") }; 768 user_var_entry *entry= 769 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, 770 name.length); 771 return entry && entry->val_int(&null_value) && !null_value; 772 } 773 774 775 static bool 776 get_slave_gtid_ignore_duplicates(THD *thd) 777 { 778 bool null_value; 779 780 const LEX_CSTRING name= { STRING_WITH_LEN("slave_gtid_ignore_duplicates") }; 781 user_var_entry *entry= 782 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, 783 name.length); 784 return entry && entry->val_int(&null_value) && !null_value; 785 } 786 787 788 /* 789 Get the value of the @slave_until_gtid user variable into the supplied 790 String (this is the GTID position specified for START SLAVE UNTIL 791 master_gtid_pos='xxx'). 792 793 Returns false if error (ie. slave did not set the variable and is not doing 794 START SLAVE UNTIL mater_gtid_pos='xxx'), true if success. 795 */ 796 static bool 797 get_slave_until_gtid(THD *thd, String *out_str) 798 { 799 bool null_value; 800 801 const LEX_CSTRING name= { STRING_WITH_LEN("slave_until_gtid") }; 802 user_var_entry *entry= 803 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, 804 name.length); 805 return entry && entry->val_str(&null_value, out_str, 0) && !null_value; 806 } 807 808 809 /* 810 Function prepares and sends repliation heartbeat event. 
811 812 @param net net object of THD 813 @param packet buffer to store the heartbeat instance 814 @param event_coordinates binlog file name and position of the last 815 real event master sent from binlog 816 817 @note 818 Among three essential pieces of heartbeat data Log_event::when 819 is computed locally. 820 The error to send is serious and should force terminating 821 the dump thread. 822 */ 823 static int send_heartbeat_event(binlog_send_info *info, 824 NET* net, String* packet, 825 const struct event_coordinates *coord, 826 enum enum_binlog_checksum_alg checksum_alg_arg) 827 { 828 DBUG_ENTER("send_heartbeat_event"); 829 830 ulong ev_offset; 831 char sub_header_buf[HB_SUB_HEADER_LEN]; 832 bool sub_header_in_use=false; 833 if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) 834 DBUG_RETURN(1); 835 836 char header[LOG_EVENT_HEADER_LEN]; 837 my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && 838 checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; 839 /* 840 'when' (the timestamp) is set to 0 so that slave could distinguish between 841 real and fake Rotate events (if necessary) 842 */ 843 memset(header, 0, 4); // when 844 845 header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT; 846 847 char* p= coord->file_name + dirname_length(coord->file_name); 848 849 size_t ident_len = strlen(p); 850 size_t event_len = ident_len + LOG_EVENT_HEADER_LEN + 851 (do_checksum ? 
BINLOG_CHECKSUM_LEN : 0); 852 int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); 853 DBUG_EXECUTE_IF("simulate_pos_4G", 854 { 855 const_cast<event_coordinates *>(coord)->pos= (UINT_MAX32 + (ulong)1); 856 DBUG_SET("-d, simulate_pos_4G"); 857 };); 858 if (coord->pos <= UINT_MAX32) 859 { 860 int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos 861 } 862 else 863 { 864 // Set common_header.log_pos=0 to indicate its overflow 865 int4store(header + LOG_POS_OFFSET, 0); 866 sub_header_in_use= true; 867 int8store(sub_header_buf, coord->pos); 868 event_len+= HB_SUB_HEADER_LEN; 869 } 870 871 int4store(header + EVENT_LEN_OFFSET, event_len); 872 int2store(header + FLAGS_OFFSET, 0); 873 874 packet->append(header, sizeof(header)); 875 if (sub_header_in_use) 876 packet->append(sub_header_buf, sizeof(sub_header_buf)); 877 packet->append(p, ident_len); // log_file_name 878 879 if (do_checksum) 880 { 881 char b[BINLOG_CHECKSUM_LEN]; 882 ha_checksum crc= my_checksum(0, (uchar*) header, sizeof(header)); 883 if (sub_header_in_use) 884 crc= my_checksum(crc, (uchar*) sub_header_buf, sizeof(sub_header_buf)); 885 crc= my_checksum(crc, (uchar*) p, ident_len); 886 int4store(b, crc); 887 packet->append(b, sizeof(b)); 888 } 889 890 if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) || 891 net_flush(net)) 892 { 893 info->error= ER_UNKNOWN_ERROR; 894 DBUG_RETURN(-1); 895 } 896 897 DBUG_RETURN(0); 898 } 899 900 901 struct binlog_file_entry 902 { 903 binlog_file_entry *next; 904 char *name; 905 }; 906 907 static binlog_file_entry * 908 get_binlog_list(MEM_ROOT *memroot) 909 { 910 IO_CACHE *index_file; 911 char fname[FN_REFLEN]; 912 size_t length; 913 binlog_file_entry *current_list= NULL, *e; 914 DBUG_ENTER("get_binlog_list"); 915 916 if (!mysql_bin_log.is_open()) 917 { 918 my_error(ER_NO_BINARY_LOGGING, MYF(0)); 919 DBUG_RETURN(NULL); 920 } 921 922 mysql_bin_log.lock_index(); 923 index_file=mysql_bin_log.get_index_file(); 924 reinit_io_cache(index_file, 
READ_CACHE, (my_off_t) 0, 0, 0); 925 926 /* The file ends with EOF or empty line */ 927 while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1) 928 { 929 --length; /* Remove the newline */ 930 if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) || 931 !(e->name= strmake_root(memroot, fname, length))) 932 { 933 mysql_bin_log.unlock_index(); 934 DBUG_RETURN(NULL); 935 } 936 e->next= current_list; 937 current_list= e; 938 } 939 mysql_bin_log.unlock_index(); 940 941 DBUG_RETURN(current_list); 942 } 943 944 945 /* 946 Check if every GTID requested by the slave is contained in this (or a later) 947 binlog file. Return true if so, false if not. 948 949 We do the check with a single scan of the list of GTIDs, avoiding the need 950 to build an in-memory hash or stuff like that. 951 952 We need to check that slave did not request GTID D-S-N1, when the 953 Gtid_list_log_event for this binlog file has D-S-N2 with N2 >= N1. 954 (Because this means that requested GTID is in an earlier binlog). 955 However, if the Gtid_list_log_event indicates that D-S-N1 is the very last 956 GTID for domain D in prior binlog files, then it is ok to start from the 957 very start of this binlog file. This special case is important, as it 958 allows to purge old logs even if some domain is unused for long. 959 960 In addition, we need to check that we do not have a GTID D-S-N3 in the 961 Gtid_list_log_event where D is not present in the requested slave state at 962 all. Since if D is not in requested slave state, it means that slave needs 963 to start at the very first GTID in domain D. 
964 */ 965 static bool 966 contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev) 967 { 968 uint32 i; 969 970 for (i= 0; i < glev->count; ++i) 971 { 972 uint32 gl_domain_id= glev->list[i].domain_id; 973 const rpl_gtid *gtid= st->find(gl_domain_id); 974 if (!gtid) 975 { 976 /* 977 The slave needs to start from the very beginning of this domain, which 978 is in an earlier binlog file. So we need to search back further. 979 */ 980 return false; 981 } 982 if (gtid->server_id == glev->list[i].server_id && 983 gtid->seq_no <= glev->list[i].seq_no) 984 { 985 /* 986 The slave needs to start after gtid, but it is contained in an earlier 987 binlog file. So we need to search back further, unless it was the very 988 last gtid logged for the domain in earlier binlog files. 989 */ 990 if (gtid->seq_no < glev->list[i].seq_no) 991 return false; 992 993 /* 994 The slave requested D-S-N1, which happens to be the last GTID logged 995 in prior binlog files with same domain id D and server id S. 996 997 The Gtid_list is kept sorted on domain_id, with the last GTID in each 998 domain_id group being the last one logged. So if this is the last GTID 999 within the domain_id group, then it is ok to start from the very 1000 beginning of this group, per the special case explained in comment at 1001 the start of this function. If not, then we need to search back further. 1002 */ 1003 if (i+1 < glev->count && gl_domain_id == glev->list[i+1].domain_id) 1004 return false; 1005 } 1006 } 1007 1008 return true; 1009 } 1010 1011 1012 static void 1013 give_error_start_pos_missing_in_binlog(int *err, const char **errormsg, 1014 rpl_gtid *error_gtid) 1015 { 1016 rpl_gtid binlog_gtid; 1017 1018 if (mysql_bin_log.lookup_domain_in_binlog_state(error_gtid->domain_id, 1019 &binlog_gtid) && 1020 binlog_gtid.seq_no >= error_gtid->seq_no) 1021 { 1022 *errormsg= "Requested slave GTID state not found in binlog. 
The slave has " 1023 "probably diverged due to executing erroneous transactions"; 1024 *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2; 1025 } 1026 else 1027 { 1028 *errormsg= "Requested slave GTID state not found in binlog"; 1029 *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG; 1030 } 1031 } 1032 1033 1034 /* 1035 Check the start GTID state requested by the slave against our binlog state. 1036 1037 Give an error if the slave requests something that we do not have in our 1038 binlog. 1039 */ 1040 1041 static int 1042 check_slave_start_position(binlog_send_info *info, const char **errormsg, 1043 rpl_gtid *error_gtid) 1044 { 1045 uint32 i; 1046 int err; 1047 slave_connection_state::entry **delete_list= NULL; 1048 uint32 delete_idx= 0; 1049 slave_connection_state *st= &info->gtid_state; 1050 1051 if (rpl_load_gtid_slave_state(info->thd)) 1052 { 1053 *errormsg= "Failed to load replication slave GTID state"; 1054 err= ER_CANNOT_LOAD_SLAVE_GTID_STATE; 1055 goto end; 1056 } 1057 1058 for (i= 0; i < st->hash.records; ++i) 1059 { 1060 slave_connection_state::entry *slave_gtid_entry= 1061 (slave_connection_state::entry *)my_hash_element(&st->hash, i); 1062 rpl_gtid *slave_gtid= &slave_gtid_entry->gtid; 1063 rpl_gtid master_gtid; 1064 rpl_gtid master_replication_gtid; 1065 rpl_gtid start_gtid; 1066 bool start_at_own_slave_pos= 1067 rpl_global_gtid_slave_state->domain_to_gtid(slave_gtid->domain_id, 1068 &master_replication_gtid) && 1069 slave_gtid->server_id == master_replication_gtid.server_id && 1070 slave_gtid->seq_no == master_replication_gtid.seq_no; 1071 1072 if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, 1073 slave_gtid->server_id, 1074 &master_gtid) && 1075 master_gtid.seq_no >= slave_gtid->seq_no) 1076 { 1077 /* 1078 If connecting slave requests to start at the GTID we last applied when 1079 we were ourselves a slave, then this GTID may not exist in our binlog 1080 (in case of --log-slave-updates=0). 
So set the flag to disable the 1081 error about missing GTID in the binlog in this case. 1082 */ 1083 if (start_at_own_slave_pos) 1084 slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS; 1085 continue; 1086 } 1087 1088 if (!start_at_own_slave_pos) 1089 { 1090 rpl_gtid domain_gtid; 1091 slave_connection_state *until_gtid_state= info->until_gtid_state; 1092 rpl_gtid *until_gtid; 1093 1094 if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id, 1095 &domain_gtid)) 1096 { 1097 /* 1098 We do not have anything in this domain, neither in the binlog nor 1099 in the slave state. So we are probably one master in a multi-master 1100 setup, and this domain is served by a different master. 1101 1102 But set a flag so that if we then ever _do_ happen to encounter 1103 anything in this domain, then we will re-check that the requested 1104 slave position exists, and give the error at that time if not. 1105 */ 1106 slave_gtid_entry->flags|= slave_connection_state::START_ON_EMPTY_DOMAIN; 1107 continue; 1108 } 1109 1110 if (info->slave_gtid_ignore_duplicates && 1111 domain_gtid.seq_no < slave_gtid->seq_no) 1112 { 1113 /* 1114 When --gtid-ignore-duplicates, it is ok for the slave to request 1115 something that we do not have (yet) - they might already have gotten 1116 it through another path in a multi-path replication hierarchy. 1117 */ 1118 continue; 1119 } 1120 1121 if (until_gtid_state && 1122 ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) || 1123 (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id, 1124 until_gtid->server_id, 1125 &master_gtid) && 1126 master_gtid.seq_no >= until_gtid->seq_no))) 1127 { 1128 /* 1129 The slave requested to start from a position that is not (yet) in 1130 our binlog, but it also specified an UNTIL condition that _is_ in 1131 our binlog (or a missing UNTIL, which means stop at the very 1132 beginning). 
So the stop position is before the start position, and 1133 we just delete the entry from the UNTIL hash to mark that this 1134 domain has already reached the UNTIL condition. 1135 */ 1136 if(until_gtid) 1137 until_gtid_state->remove(until_gtid); 1138 continue; 1139 } 1140 1141 *error_gtid= *slave_gtid; 1142 give_error_start_pos_missing_in_binlog(&err, errormsg, error_gtid); 1143 goto end; 1144 } 1145 1146 /* 1147 Ok, so connecting slave asked to start at a GTID that we do not have in 1148 our binlog, but it was in fact the last GTID we applied earlier, when we 1149 were acting as a replication slave. 1150 1151 So this means that we were running as a replication slave without 1152 --log-slave-updates, but now we switched to be a master. It is worth it 1153 to handle this special case, as it allows users to run a simple 1154 master -> slave without --log-slave-updates, and then exchange slave and 1155 master, as long as they make sure the slave is caught up before switching. 1156 */ 1157 1158 /* 1159 First check if we logged something ourselves as a master after being a 1160 slave. This will be seen as a GTID with our own server_id and bigger 1161 seq_no than what is in the slave state. 1162 1163 If we did not log anything ourselves, then start the connecting slave 1164 replicating from the current binlog end position, which in this case 1165 corresponds to our replication slave state and hence what the connecting 1166 slave is requesting. 1167 */ 1168 if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, 1169 global_system_variables.server_id, 1170 &start_gtid) && 1171 start_gtid.seq_no > slave_gtid->seq_no) 1172 { 1173 /* 1174 Start replication within this domain at the first GTID that we logged 1175 ourselves after becoming a master. 1176 1177 Remember that this starting point is in fact a "fake" GTID which may 1178 not exists in the binlog, so that we do not complain about it in 1179 --gtid-strict-mode. 
1180 */ 1181 slave_gtid->server_id= global_system_variables.server_id; 1182 slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS; 1183 } 1184 else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id, 1185 &start_gtid)) 1186 { 1187 slave_gtid->server_id= start_gtid.server_id; 1188 slave_gtid->seq_no= start_gtid.seq_no; 1189 } 1190 else 1191 { 1192 /* 1193 We do not have _anything_ in our own binlog for this domain. Just 1194 delete the entry in the slave connection state, then it will pick up 1195 anything new that arrives. 1196 1197 We just queue up the deletion and do it later, after the loop, so that 1198 we do not mess up the iteration over the hash. 1199 */ 1200 if (!delete_list) 1201 { 1202 if (!(delete_list= (slave_connection_state::entry **) 1203 my_malloc(sizeof(*delete_list) * st->hash.records, MYF(MY_WME)))) 1204 { 1205 *errormsg= "Out of memory while checking slave start position"; 1206 err= ER_OUT_OF_RESOURCES; 1207 goto end; 1208 } 1209 } 1210 delete_list[delete_idx++]= slave_gtid_entry; 1211 } 1212 } 1213 1214 /* Do any delayed deletes from the hash. */ 1215 if (delete_list) 1216 { 1217 for (i= 0; i < delete_idx; ++i) 1218 st->remove(&(delete_list[i]->gtid)); 1219 } 1220 err= 0; 1221 1222 end: 1223 if (delete_list) 1224 my_free(delete_list); 1225 return err; 1226 } 1227 1228 /* 1229 Find the name of the binlog file to start reading for a slave that connects 1230 using GTID state. 1231 1232 Returns the file name in out_name, which must be of size at least FN_REFLEN. 1233 1234 Returns NULL on ok, error message on error. 1235 1236 In case of non-error return, the returned binlog file is guaranteed to 1237 contain the first event to be transmitted to the slave for every domain 1238 present in our binlogs. It is still necessary to skip all GTIDs up to 1239 and including the GTID requested by slave within each domain. 
1240 1241 However, as a special case, if the event to be sent to the slave is the very 1242 first event (within that domain) in the returned binlog, then nothing should 1243 be skipped, so that domain is deleted from the passed in slave connection 1244 state. 1245 1246 This is necessary in case the slave requests a GTID within a replication 1247 domain that has long been inactive. The binlog file containing that GTID may 1248 have been long since purged. However, as long as no GTIDs after that have 1249 been purged, we have the GTID requested by slave in the Gtid_list_log_event 1250 of the latest binlog. So we can start from there, as long as we delete the 1251 corresponding entry in the slave state so we do not wrongly skip any events 1252 that might turn up if that domain becomes active again, vainly looking for 1253 the requested GTID that was already purged. 1254 */ 1255 static const char * 1256 gtid_find_binlog_file(slave_connection_state *state, char *out_name, 1257 slave_connection_state *until_gtid_state) 1258 { 1259 MEM_ROOT memroot; 1260 binlog_file_entry *list; 1261 Gtid_list_log_event *glev= NULL; 1262 const char *errormsg= NULL; 1263 char buf[FN_REFLEN]; 1264 1265 init_alloc_root(&memroot, "gtid_find_binlog_file", 1266 10*(FN_REFLEN+sizeof(binlog_file_entry)), 1267 0, MYF(MY_THREAD_SPECIFIC)); 1268 if (!(list= get_binlog_list(&memroot))) 1269 { 1270 errormsg= "Out of memory while looking for GTID position in binlog"; 1271 goto end; 1272 } 1273 1274 while (list) 1275 { 1276 File file; 1277 IO_CACHE cache; 1278 1279 if (!list->next) 1280 { 1281 /* 1282 It should be safe to read the currently used binlog, as we will only 1283 read the header part that is already written. 1284 1285 But if that does not work on windows, then we will need to cache the 1286 event somewhere in memory I suppose - that could work too. 1287 */ 1288 } 1289 /* 1290 Read the Gtid_list_log_event at the start of the binlog file to 1291 get the binlog state. 
1292 */ 1293 if (normalize_binlog_name(buf, list->name, false)) 1294 { 1295 errormsg= "Failed to determine binlog file name while looking for " 1296 "GTID position in binlog"; 1297 goto end; 1298 } 1299 bzero((char*) &cache, sizeof(cache)); 1300 if (unlikely((file= open_binlog(&cache, buf, &errormsg)) == (File)-1)) 1301 goto end; 1302 errormsg= get_gtid_list_event(&cache, &glev); 1303 end_io_cache(&cache); 1304 mysql_file_close(file, MYF(MY_WME)); 1305 if (unlikely(errormsg)) 1306 goto end; 1307 1308 if (!glev || contains_all_slave_gtid(state, glev)) 1309 { 1310 strmake(out_name, buf, FN_REFLEN); 1311 1312 if (glev) 1313 { 1314 uint32 i; 1315 1316 /* 1317 As a special case, we allow to start from binlog file N if the 1318 requested GTID is the last event (in the corresponding domain) in 1319 binlog file (N-1), but then we need to remove that GTID from the slave 1320 state, rather than skipping events waiting for it to turn up. 1321 1322 If slave is doing START SLAVE UNTIL, check for any UNTIL conditions 1323 that are already included in a previous binlog file. Delete any such 1324 from the UNTIL hash, to mark that such domains have already reached 1325 their UNTIL condition. 1326 */ 1327 for (i= 0; i < glev->count; ++i) 1328 { 1329 const rpl_gtid *gtid= state->find(glev->list[i].domain_id); 1330 if (!gtid) 1331 { 1332 /* 1333 Contains_all_slave_gtid() returns false if there is any domain in 1334 Gtid_list_event which is not in the requested slave position. 1335 1336 We may delete a domain from the slave state inside this loop, but 1337 we only do this when it is the very last GTID logged for that 1338 domain in earlier binlogs, and then we can not encounter it in any 1339 further GTIDs in the Gtid_list. 1340 */ 1341 DBUG_ASSERT(0); 1342 } else if (gtid->server_id == glev->list[i].server_id && 1343 gtid->seq_no == glev->list[i].seq_no) 1344 { 1345 /* 1346 The slave requested to start from the very beginning of this 1347 domain in this binlog file. 
So delete the entry from the state, 1348 we do not need to skip anything. 1349 */ 1350 state->remove(gtid); 1351 } 1352 1353 if (until_gtid_state && 1354 (gtid= until_gtid_state->find(glev->list[i].domain_id)) && 1355 gtid->server_id == glev->list[i].server_id && 1356 gtid->seq_no <= glev->list[i].seq_no) 1357 { 1358 /* 1359 We've already reached the stop position in UNTIL for this domain, 1360 since it is before the start position. 1361 */ 1362 until_gtid_state->remove(gtid); 1363 } 1364 } 1365 } 1366 1367 goto end; 1368 } 1369 delete glev; 1370 glev= NULL; 1371 list= list->next; 1372 } 1373 1374 /* We reached the end without finding anything. */ 1375 errormsg= "Could not find GTID state requested by slave in any binlog " 1376 "files. Probably the slave state is too old and required binlog files " 1377 "have been purged."; 1378 1379 end: 1380 if (glev) 1381 delete glev; 1382 1383 free_root(&memroot, MYF(0)); 1384 return errormsg; 1385 } 1386 1387 1388 /* 1389 Given an old-style binlog position with file name and file offset, find the 1390 corresponding gtid position. If the offset is not at an event boundary, give 1391 an error. 1392 1393 Return NULL on ok, error message string on error. 1394 1395 ToDo: Improve the performance of this by using binlog index files. 1396 */ 1397 static const char * 1398 gtid_state_from_pos(const char *name, uint32 offset, 1399 slave_connection_state *gtid_state) 1400 { 1401 IO_CACHE cache; 1402 File file; 1403 const char *errormsg= NULL; 1404 bool found_gtid_list_event= false; 1405 bool found_format_description_event= false; 1406 bool valid_pos= false; 1407 enum enum_binlog_checksum_alg current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; 1408 int err; 1409 String packet; 1410 Format_description_log_event *fdev= NULL; 1411 1412 if (unlikely(gtid_state->load((const rpl_gtid *)NULL, 0))) 1413 { 1414 errormsg= "Internal error (out of memory?) 
initializing slave state " 1415 "while scanning binlog to find start position"; 1416 return errormsg; 1417 } 1418 1419 if (unlikely((file= open_binlog(&cache, name, &errormsg)) == (File)-1)) 1420 return errormsg; 1421 1422 if (!(fdev= new Format_description_log_event(3))) 1423 { 1424 errormsg= "Out of memory initializing format_description event " 1425 "while scanning binlog to find start position"; 1426 goto end; 1427 } 1428 1429 /* 1430 First we need to find the initial GTID_LIST_EVENT. We need this even 1431 if the offset is at the very start of the binlog file. 1432 1433 But if we do not find any GTID_LIST_EVENT, then this is an old binlog 1434 with no GTID information, so we return empty GTID state. 1435 */ 1436 for (;;) 1437 { 1438 Log_event_type typ; 1439 uint32 cur_pos; 1440 1441 cur_pos= (uint32)my_b_tell(&cache); 1442 if (cur_pos == offset) 1443 valid_pos= true; 1444 if (found_format_description_event && found_gtid_list_event && 1445 cur_pos >= offset) 1446 break; 1447 1448 packet.length(0); 1449 err= Log_event::read_log_event(&cache, &packet, fdev, 1450 opt_master_verify_checksum ? current_checksum_alg 1451 : BINLOG_CHECKSUM_ALG_OFF); 1452 if (unlikely(err)) 1453 { 1454 errormsg= "Could not read binlog while searching for slave start " 1455 "position on master"; 1456 goto end; 1457 } 1458 /* 1459 The cast to uchar is needed to avoid a signed char being converted to a 1460 negative number. 
1461 */ 1462 typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET]; 1463 if (typ == FORMAT_DESCRIPTION_EVENT) 1464 { 1465 Format_description_log_event *tmp; 1466 1467 if (unlikely(found_format_description_event)) 1468 { 1469 errormsg= "Duplicate format description log event found while " 1470 "searching for old-style position in binlog"; 1471 goto end; 1472 } 1473 1474 current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length()); 1475 found_format_description_event= true; 1476 if (unlikely(!(tmp= new Format_description_log_event(packet.ptr(), 1477 packet.length(), 1478 fdev)))) 1479 { 1480 errormsg= "Corrupt Format_description event found or out-of-memory " 1481 "while searching for old-style position in binlog"; 1482 goto end; 1483 } 1484 delete fdev; 1485 fdev= tmp; 1486 } 1487 else if (typ == START_ENCRYPTION_EVENT) 1488 { 1489 uint sele_len = packet.length(); 1490 if (current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) 1491 { 1492 sele_len -= BINLOG_CHECKSUM_LEN; 1493 } 1494 Start_encryption_log_event sele(packet.ptr(), sele_len, fdev); 1495 if (fdev->start_decryption(&sele)) 1496 { 1497 errormsg= "Could not start decryption of binlog."; 1498 goto end; 1499 } 1500 } 1501 else if (unlikely(typ != FORMAT_DESCRIPTION_EVENT && 1502 !found_format_description_event)) 1503 { 1504 errormsg= "Did not find format description log event while searching " 1505 "for old-style position in binlog"; 1506 goto end; 1507 } 1508 else if (typ == ROTATE_EVENT || typ == STOP_EVENT || 1509 typ == BINLOG_CHECKPOINT_EVENT) 1510 continue; /* Continue looking */ 1511 else if (typ == GTID_LIST_EVENT) 1512 { 1513 rpl_gtid *gtid_list; 1514 bool status; 1515 uint32 list_len; 1516 1517 if (unlikely(found_gtid_list_event)) 1518 { 1519 errormsg= "Found duplicate Gtid_list_log_event while scanning binlog " 1520 "to find slave start position"; 1521 goto end; 1522 } 1523 status= Gtid_list_log_event::peek(packet.ptr(), packet.length(), 1524 current_checksum_alg, 1525 >id_list, &list_len, 
fdev); 1526 if (unlikely(status)) 1527 { 1528 errormsg= "Error reading Gtid_list_log_event while searching " 1529 "for old-style position in binlog"; 1530 goto end; 1531 } 1532 err= gtid_state->load(gtid_list, list_len); 1533 my_free(gtid_list); 1534 if (unlikely(err)) 1535 { 1536 errormsg= "Internal error (out of memory?) initialising slave state " 1537 "while scanning binlog to find start position"; 1538 goto end; 1539 } 1540 found_gtid_list_event= true; 1541 } 1542 else if (unlikely(!found_gtid_list_event)) 1543 { 1544 /* We did not find any Gtid_list_log_event, must be old binlog. */ 1545 goto end; 1546 } 1547 else if (typ == GTID_EVENT) 1548 { 1549 rpl_gtid gtid; 1550 uchar flags2; 1551 if (unlikely(Gtid_log_event::peek(packet.ptr(), packet.length(), 1552 current_checksum_alg, >id.domain_id, 1553 >id.server_id, >id.seq_no, &flags2, 1554 fdev))) 1555 { 1556 errormsg= "Corrupt gtid_log_event found while scanning binlog to find " 1557 "initial slave position"; 1558 goto end; 1559 } 1560 if (unlikely(gtid_state->update(>id))) 1561 { 1562 errormsg= "Internal error (out of memory?) updating slave state while " 1563 "scanning binlog to find start position"; 1564 goto end; 1565 } 1566 } 1567 } 1568 1569 if (unlikely(!valid_pos)) 1570 { 1571 errormsg= "Slave requested incorrect position in master binlog. 
" 1572 "Requested position %u in file '%s', but this position does not " 1573 "correspond to the location of any binlog event."; 1574 } 1575 1576 end: 1577 delete fdev; 1578 end_io_cache(&cache); 1579 mysql_file_close(file, MYF(MY_WME)); 1580 1581 return errormsg; 1582 } 1583 1584 1585 int 1586 gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str) 1587 { 1588 slave_connection_state gtid_state; 1589 const char *lookup_name; 1590 char name_buf[FN_REFLEN]; 1591 LOG_INFO linfo; 1592 1593 if (!mysql_bin_log.is_open()) 1594 { 1595 my_error(ER_NO_BINARY_LOGGING, MYF(0)); 1596 return 1; 1597 } 1598 1599 if (in_name && in_name[0]) 1600 { 1601 mysql_bin_log.make_log_name(name_buf, in_name); 1602 lookup_name= name_buf; 1603 } 1604 else 1605 lookup_name= NULL; 1606 linfo.index_file_offset= 0; 1607 if (mysql_bin_log.find_log_pos(&linfo, lookup_name, 1)) 1608 return 1; 1609 1610 if (pos < 4) 1611 pos= 4; 1612 1613 if (gtid_state_from_pos(linfo.log_file_name, pos, >id_state) || 1614 gtid_state.to_string(out_str)) 1615 return 1; 1616 return 0; 1617 } 1618 1619 1620 static bool 1621 is_until_reached(binlog_send_info *info, ulong *ev_offset, 1622 Log_event_type event_type, const char **errmsg, 1623 uint32 current_pos) 1624 { 1625 switch (info->gtid_until_group) 1626 { 1627 case GTID_UNTIL_NOT_DONE: 1628 return false; 1629 case GTID_UNTIL_STOP_AFTER_STANDALONE: 1630 if (Log_event::is_part_of_group(event_type)) 1631 return false; 1632 break; 1633 case GTID_UNTIL_STOP_AFTER_TRANSACTION: 1634 if (event_type != XID_EVENT && 1635 (event_type != QUERY_EVENT || /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ 1636 !Query_log_event::peek_is_commit_rollback 1637 (info->packet->ptr()+*ev_offset, 1638 info->packet->length()-*ev_offset, 1639 info->current_checksum_alg))) 1640 return false; 1641 break; 1642 } 1643 1644 /* 1645 The last event group has been sent, now the START SLAVE UNTIL condition 1646 has been reached. 
1647 1648 Send a last fake Gtid_list_log_event with a flag set to mark that we 1649 stop due to UNTIL condition. 1650 */ 1651 if (reset_transmit_packet(info, info->flags, ev_offset, errmsg)) 1652 return true; 1653 Gtid_list_log_event glev(&info->until_binlog_state, 1654 Gtid_list_log_event::FLAG_UNTIL_REACHED); 1655 if (fake_gtid_list_event(info, &glev, errmsg, current_pos)) 1656 return true; 1657 *errmsg= NULL; 1658 return true; 1659 } 1660 1661 1662 /* 1663 Helper function for mysql_binlog_send() to write an event down the slave 1664 connection. 1665 1666 Returns NULL on success, error message string on error. 1667 */ 1668 static const char * 1669 send_event_to_slave(binlog_send_info *info, Log_event_type event_type, 1670 IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid) 1671 { 1672 my_off_t pos; 1673 String* const packet= info->packet; 1674 size_t len= packet->length(); 1675 int mariadb_slave_capability= info->mariadb_slave_capability; 1676 enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg; 1677 slave_connection_state *gtid_state= &info->gtid_state; 1678 slave_connection_state *until_gtid_state= info->until_gtid_state; 1679 bool need_sync= false; 1680 1681 if (event_type == GTID_LIST_EVENT && 1682 info->using_gtid_state && until_gtid_state) 1683 { 1684 rpl_gtid *gtid_list; 1685 uint32 list_len; 1686 bool err; 1687 1688 if (ev_offset > len || 1689 Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, 1690 current_checksum_alg, 1691 >id_list, &list_len, info->fdev)) 1692 { 1693 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 1694 return "Failed to read Gtid_list_log_event: corrupt binlog"; 1695 } 1696 err= info->until_binlog_state.load(gtid_list, list_len); 1697 my_free(gtid_list); 1698 if (err) 1699 { 1700 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 1701 return "Failed in internal GTID book-keeping: Out of memory"; 1702 } 1703 } 1704 1705 /* Skip GTID event groups until we reach slave position within a 
domain_id. */ 1706 if (event_type == GTID_EVENT && info->using_gtid_state) 1707 { 1708 uchar flags2; 1709 slave_connection_state::entry *gtid_entry; 1710 rpl_gtid *gtid; 1711 1712 if (gtid_state->count() > 0 || until_gtid_state) 1713 { 1714 rpl_gtid event_gtid; 1715 1716 if (ev_offset > len || 1717 Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, 1718 current_checksum_alg, 1719 &event_gtid.domain_id, &event_gtid.server_id, 1720 &event_gtid.seq_no, &flags2, info->fdev)) 1721 { 1722 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 1723 return "Failed to read Gtid_log_event: corrupt binlog"; 1724 } 1725 1726 DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100", 1727 { 1728 rpl_gtid *dbug_gtid; 1729 if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) && 1730 dbug_gtid->seq_no == 100) 1731 { 1732 DBUG_SET("-d,gtid_force_reconnect_at_10_1_100"); 1733 DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100"); 1734 info->error= ER_UNKNOWN_ERROR; 1735 return "DBUG-injected forced reconnect"; 1736 } 1737 }); 1738 1739 if (info->until_binlog_state.update_nolock(&event_gtid, false)) 1740 { 1741 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 1742 return "Failed in internal GTID book-keeping: Out of memory"; 1743 } 1744 1745 if (gtid_state->count() > 0) 1746 { 1747 gtid_entry= gtid_state->find_entry(event_gtid.domain_id); 1748 if (gtid_entry != NULL) 1749 { 1750 gtid= >id_entry->gtid; 1751 if (gtid_entry->flags & slave_connection_state::START_ON_EMPTY_DOMAIN) 1752 { 1753 rpl_gtid master_gtid; 1754 if (!mysql_bin_log.find_in_binlog_state(gtid->domain_id, 1755 gtid->server_id, 1756 &master_gtid) || 1757 master_gtid.seq_no < gtid->seq_no) 1758 { 1759 int err; 1760 const char *errormsg; 1761 *error_gtid= *gtid; 1762 give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid); 1763 info->error= err; 1764 return errormsg; 1765 } 1766 gtid_entry->flags&= ~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN; 1767 } 1768 1769 /* Skip this event group if 
we have not yet reached slave start pos. */ 1770 if (event_gtid.server_id != gtid->server_id || 1771 event_gtid.seq_no <= gtid->seq_no) 1772 info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ? 1773 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); 1774 if (event_gtid.server_id == gtid->server_id && 1775 event_gtid.seq_no >= gtid->seq_no) 1776 { 1777 if (info->slave_gtid_strict_mode && 1778 event_gtid.seq_no > gtid->seq_no && 1779 !(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS)) 1780 { 1781 /* 1782 In strict mode, it is an error if the slave requests to start 1783 in a "hole" in the master's binlog: a GTID that does not 1784 exist, even though both the prior and subsequent seq_no exists 1785 for same domain_id and server_id. 1786 */ 1787 info->error= ER_GTID_START_FROM_BINLOG_HOLE; 1788 *error_gtid= *gtid; 1789 return "The binlog on the master is missing the GTID requested " 1790 "by the slave (even though both a prior and a subsequent " 1791 "sequence number does exist), and GTID strict mode is enabled."; 1792 } 1793 1794 /* 1795 Send a fake Gtid_list event to the slave. 1796 This allows the slave to update its current binlog position 1797 so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work. 1798 The fake event will be sent at the end of this event group. 1799 */ 1800 info->send_fake_gtid_list= true; 1801 1802 /* 1803 Delete this entry if we have reached slave start position (so we 1804 will not skip subsequent events and won't have to look them up 1805 and check). 1806 */ 1807 gtid_state->remove(gtid); 1808 } 1809 } 1810 } 1811 1812 if (until_gtid_state) 1813 { 1814 gtid= until_gtid_state->find(event_gtid.domain_id); 1815 if (gtid == NULL) 1816 { 1817 /* 1818 This domain already reached the START SLAVE UNTIL stop condition, 1819 so skip this event group. 1820 */ 1821 info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? 
1822 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); 1823 } 1824 else if (event_gtid.server_id == gtid->server_id && 1825 event_gtid.seq_no >= gtid->seq_no) 1826 { 1827 /* 1828 We have reached the stop condition. 1829 Delete this domain_id from the hash, so we will skip all further 1830 events in this domain and eventually stop when all domains are 1831 done. 1832 */ 1833 uint64 until_seq_no= gtid->seq_no; 1834 until_gtid_state->remove(gtid); 1835 if (until_gtid_state->count() == 0) 1836 info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ? 1837 GTID_UNTIL_STOP_AFTER_STANDALONE : 1838 GTID_UNTIL_STOP_AFTER_TRANSACTION); 1839 if (event_gtid.seq_no > until_seq_no) 1840 { 1841 /* 1842 The GTID in START SLAVE UNTIL condition is missing in our binlog. 1843 This should normally not happen (user error), but since we can be 1844 sure that we are now beyond the position that the UNTIL condition 1845 should be in, we can just stop now. And we also need to skip this 1846 event group (as it is beyond the UNTIL condition). 1847 */ 1848 info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? 1849 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); 1850 } 1851 } 1852 } 1853 } 1854 } 1855 1856 /* 1857 Skip event group if we have not yet reached the correct slave GTID position. 1858 1859 Note that slave that understands GTID can also tolerate holes, so there is 1860 no need to supply dummy event. 
1861 */ 1862 switch (info->gtid_skip_group) 1863 { 1864 case GTID_SKIP_STANDALONE: 1865 if (!Log_event::is_part_of_group(event_type)) 1866 info->gtid_skip_group= GTID_SKIP_NOT; 1867 return NULL; 1868 case GTID_SKIP_TRANSACTION: 1869 if (event_type == XID_EVENT || 1870 (event_type == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ 1871 Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset, 1872 len - ev_offset, 1873 current_checksum_alg))) 1874 info->gtid_skip_group= GTID_SKIP_NOT; 1875 return NULL; 1876 case GTID_SKIP_NOT: 1877 break; 1878 } 1879 1880 /* Do not send annotate_rows events unless slave requested it. */ 1881 if (event_type == ANNOTATE_ROWS_EVENT && 1882 !(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) 1883 { 1884 if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) 1885 { 1886 /* This slave can tolerate events omitted from the binlog stream. */ 1887 return NULL; 1888 } 1889 else if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_ANNOTATE) 1890 { 1891 /* 1892 The slave did not request ANNOTATE_ROWS_EVENT (it does not need them as 1893 it will not log them in its own binary log). However, it understands the 1894 event and will just ignore it, and it would break if we omitted it, 1895 leaving a hole in the binlog stream. So just send the event as-is. 1896 */ 1897 } 1898 else 1899 { 1900 /* 1901 The slave does not understand ANNOTATE_ROWS_EVENT. 1902 1903 Older MariaDB slaves (and MySQL slaves) will break replication if there 1904 are holes in the binlog stream (they will miscompute the binlog offset 1905 and request the wrong position when reconnecting). 1906 1907 So replace the event with a dummy event of the same size that will be 1908 a no-operation on the slave. 
1909 */ 1910 if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) 1911 { 1912 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 1913 return "Failed to replace row annotate event with dummy: too small event."; 1914 } 1915 } 1916 } 1917 1918 /* 1919 Replace GTID events with old-style BEGIN events for slaves that do not 1920 understand global transaction IDs. For stand-alone events, where there is 1921 no terminating COMMIT query event, omit the GTID event or replace it with 1922 a dummy event, as appropriate. 1923 */ 1924 if (event_type == GTID_EVENT && 1925 mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID) 1926 { 1927 bool need_dummy= 1928 mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES; 1929 bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy, 1930 ev_offset, 1931 current_checksum_alg); 1932 if (err) 1933 { 1934 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 1935 return "Failed to replace GTID event with backwards-compatible event: " 1936 "currupt event."; 1937 } 1938 if (!need_dummy) 1939 return NULL; 1940 } 1941 1942 /* 1943 Do not send binlog checkpoint or gtid list events to a slave that does not 1944 understand it. 1945 */ 1946 if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) && 1947 mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) || 1948 (unlikely(event_type == GTID_LIST_EVENT) && 1949 mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)) 1950 { 1951 if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) 1952 { 1953 /* This slave can tolerate events omitted from the binlog stream. */ 1954 return NULL; 1955 } 1956 else 1957 { 1958 /* 1959 The slave does not understand BINLOG_CHECKPOINT_EVENT. Send a dummy 1960 event instead, with same length so slave does not get confused about 1961 binlog positions. 
1962 */ 1963 if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) 1964 { 1965 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 1966 return "Failed to replace binlog checkpoint or gtid list event with " 1967 "dummy: too small event."; 1968 } 1969 } 1970 } 1971 1972 /* 1973 Skip events with the @@skip_replication flag set, if slave requested 1974 skipping of such events. 1975 */ 1976 if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION) 1977 { 1978 uint16 event_flags= uint2korr(&((*packet)[FLAGS_OFFSET + ev_offset])); 1979 1980 if (event_flags & LOG_EVENT_SKIP_REPLICATION_F) 1981 return NULL; 1982 } 1983 1984 THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave); 1985 1986 pos= my_b_tell(log); 1987 if (repl_semisync_master.update_sync_header(info->thd, 1988 (uchar*) packet->c_ptr_safe(), 1989 info->log_file_name + info->dirlen, 1990 pos, &need_sync)) 1991 { 1992 info->error= ER_UNKNOWN_ERROR; 1993 return "run 'before_send_event' hook failed"; 1994 } 1995 1996 if (my_net_write(info->net, (uchar*) packet->ptr(), len)) 1997 { 1998 info->error= ER_UNKNOWN_ERROR; 1999 return "Failed on my_net_write()"; 2000 } 2001 2002 DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); 2003 if (event_type == LOAD_EVENT) 2004 { 2005 if (send_file(info->thd)) 2006 { 2007 info->error= ER_UNKNOWN_ERROR; 2008 return "failed in send_file()"; 2009 } 2010 } 2011 2012 if (need_sync && repl_semisync_master.flush_net(info->thd, 2013 packet->c_ptr_safe())) 2014 { 2015 info->error= ER_UNKNOWN_ERROR; 2016 return "Failed to run hook 'after_send_event'"; 2017 } 2018 2019 return NULL; /* Success */ 2020 } 2021 2022 static int check_start_offset(binlog_send_info *info, 2023 const char *log_file_name, 2024 my_off_t pos) 2025 { 2026 IO_CACHE log; 2027 File file= -1; 2028 2029 /** check that requested position is inside of file */ 2030 if ((file=open_binlog(&log, log_file_name, &info->errmsg)) < 0) 2031 { 2032 info->error= 
ER_MASTER_FATAL_ERROR_READING_BINLOG; 2033 return 1; 2034 } 2035 2036 if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log)) 2037 { 2038 const char* msg= "Client requested master to start replication from " 2039 "impossible position"; 2040 2041 info->errmsg= NULL; // don't do further modifications of error_text 2042 snprintf(info->error_text, sizeof(info->error_text), 2043 "%s; the first event '%s' at %lld, " 2044 "the last event read from '%s' at %d, " 2045 "the last byte read from '%s' at %d.", 2046 msg, 2047 my_basename(info->start_log_file_name), pos, 2048 my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE, 2049 my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE); 2050 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2051 goto err; 2052 } 2053 2054 err: 2055 end_io_cache(&log); 2056 mysql_file_close(file, MYF(MY_WME)); 2057 return info->error; 2058 } 2059 2060 static int init_binlog_sender(binlog_send_info *info, 2061 LOG_INFO *linfo, 2062 const char *log_ident, 2063 my_off_t *pos) 2064 { 2065 THD *thd= info->thd; 2066 int error; 2067 char str_buf[128]; 2068 String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); 2069 char str_buf2[128]; 2070 String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info); 2071 connect_gtid_state.length(0); 2072 2073 /** save start file/pos that was requested by slave */ 2074 strmake(info->start_log_file_name, log_ident, 2075 sizeof(info->start_log_file_name)); 2076 info->start_pos= *pos; 2077 2078 /** init last pos */ 2079 info->last_pos= *pos; 2080 2081 info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd); 2082 info->mariadb_slave_capability= get_mariadb_slave_capability(thd); 2083 info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); 2084 DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", 2085 info->using_gtid_state= false;); 2086 2087 if (info->using_gtid_state) 2088 { 2089 info->slave_gtid_strict_mode= 
get_slave_gtid_strict_mode(thd); 2090 info->slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd); 2091 if (get_slave_until_gtid(thd, &slave_until_gtid_str)) 2092 info->until_gtid_state= &info->until_gtid_state_obj; 2093 } 2094 2095 DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events", 2096 { 2097 DBUG_SET("-d,binlog_force_reconnect_after_22_events"); 2098 DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events"); 2099 info->dbug_reconnect_counter= 22; 2100 }); 2101 2102 if (global_system_variables.log_warnings > 1) 2103 { 2104 sql_print_information( 2105 "Start binlog_dump to slave_server(%lu), pos(%s, %lu), " 2106 "using_gtid(%d), gtid('%s')", thd->variables.server_id, 2107 log_ident, (ulong)*pos, info->using_gtid_state, 2108 connect_gtid_state.c_ptr_quick()); 2109 } 2110 2111 #ifndef DBUG_OFF 2112 if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) 2113 { 2114 info->errmsg= "Master failed COM_BINLOG_DUMP to test if slave can recover"; 2115 info->error= ER_UNKNOWN_ERROR; 2116 return 1; 2117 } 2118 #endif 2119 2120 if (!mysql_bin_log.is_open()) 2121 { 2122 info->errmsg= "Binary log is not open"; 2123 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2124 return 1; 2125 } 2126 2127 char search_file_name[FN_REFLEN]; 2128 const char *name=search_file_name; 2129 if (info->using_gtid_state) 2130 { 2131 if (info->gtid_state.load(connect_gtid_state.c_ptr_quick(), 2132 connect_gtid_state.length())) 2133 { 2134 info->errmsg= "Out of memory or malformed slave request when obtaining " 2135 "start position from GTID state"; 2136 info->error= ER_UNKNOWN_ERROR; 2137 return 1; 2138 } 2139 if (info->until_gtid_state && 2140 info->until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(), 2141 slave_until_gtid_str.length())) 2142 { 2143 info->errmsg= "Out of memory or malformed slave request when " 2144 "obtaining UNTIL position sent from slave"; 2145 info->error= ER_UNKNOWN_ERROR; 2146 return 1; 2147 } 2148 if (unlikely((error= 
check_slave_start_position(info, &info->errmsg, 2149 &info->error_gtid)))) 2150 { 2151 info->error= error; 2152 return 1; 2153 } 2154 if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state, 2155 search_file_name, 2156 info->until_gtid_state))) 2157 { 2158 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2159 return 1; 2160 } 2161 2162 /* start from beginning of binlog file */ 2163 *pos = 4; 2164 } 2165 else 2166 { 2167 if (log_ident[0]) 2168 mysql_bin_log.make_log_name(search_file_name, log_ident); 2169 else 2170 name=0; // Find first log 2171 } 2172 linfo->index_file_offset= 0; 2173 2174 if (mysql_bin_log.find_log_pos(linfo, name, 1)) 2175 { 2176 info->errmsg= "Could not find first log file name in binary " 2177 "log index file"; 2178 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2179 return 1; 2180 } 2181 2182 // set current pos too 2183 linfo->pos= *pos; 2184 2185 // note: publish that we use file, before we open it 2186 thd->current_linfo= linfo; 2187 2188 if (check_start_offset(info, linfo->log_file_name, *pos)) 2189 return 1; 2190 2191 if (*pos > BIN_LOG_HEADER_SIZE) 2192 { 2193 /* 2194 mark that first format descriptor with "log_pos=0", so the slave 2195 should not increment master's binlog position 2196 (rli->group_master_log_pos) 2197 */ 2198 info->clear_initial_log_pos= true; 2199 } 2200 2201 return 0; 2202 } 2203 2204 /** 2205 * send format descriptor event for one binlog file 2206 */ 2207 static int send_format_descriptor_event(binlog_send_info *info, IO_CACHE *log, 2208 LOG_INFO *linfo, my_off_t start_pos) 2209 { 2210 int error; 2211 ulong ev_offset; 2212 THD *thd= info->thd; 2213 String *packet= info->packet; 2214 Log_event_type event_type; 2215 bool initial_log_pos= info->clear_initial_log_pos; 2216 DBUG_ENTER("send_format_descriptor_event"); 2217 2218 /** 2219 * 1) reset fdev before each log-file 2220 * 2) read first event, should be the format descriptor 2221 * 3) read second event, *might* be start encryption event 2222 * if it's isn't, 
seek back to undo this read 2223 */ 2224 if (info->fdev != NULL) 2225 delete info->fdev; 2226 2227 if (!(info->fdev= new Format_description_log_event(3))) 2228 { 2229 info->errmsg= "Out of memory initializing format_description event"; 2230 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2231 DBUG_RETURN(1); 2232 } 2233 2234 /* reset transmit packet for the event read from binary log file */ 2235 if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) 2236 DBUG_RETURN(1); 2237 2238 /* 2239 Try to find a Format_description_log_event at the beginning of 2240 the binlog 2241 */ 2242 info->last_pos= my_b_tell(log); 2243 error= Log_event::read_log_event(log, packet, info->fdev, 2244 opt_master_verify_checksum 2245 ? info->current_checksum_alg 2246 : BINLOG_CHECKSUM_ALG_OFF); 2247 linfo->pos= my_b_tell(log); 2248 2249 if (unlikely(error)) 2250 { 2251 set_read_error(info, error); 2252 DBUG_RETURN(1); 2253 } 2254 2255 event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]); 2256 2257 /* 2258 The packet has offsets equal to the normal offsets in a 2259 binlog event + ev_offset (the first ev_offset characters are 2260 the header (default \0)). 
2261 */ 2262 DBUG_PRINT("info", 2263 ("Looked for a Format_description_log_event, " 2264 "found event type %d", (int)event_type)); 2265 2266 if (event_type != FORMAT_DESCRIPTION_EVENT) 2267 { 2268 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2269 info->errmsg= "Failed to find format descriptor event in start of binlog"; 2270 sql_print_warning("Failed to find format descriptor event in " 2271 "start of binlog: %s", 2272 info->log_file_name); 2273 DBUG_RETURN(1); 2274 } 2275 2276 info->current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, 2277 packet->length() - ev_offset); 2278 2279 DBUG_ASSERT(info->current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || 2280 info->current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || 2281 info->current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); 2282 2283 if (!is_slave_checksum_aware(thd) && 2284 info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && 2285 info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) 2286 { 2287 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2288 info->errmsg= "Slave can not handle replication events with the " 2289 "checksum that master is configured to log"; 2290 sql_print_warning("Master is configured to log replication events " 2291 "with checksum, but will not send such events to " 2292 "slaves that cannot process them"); 2293 DBUG_RETURN(1); 2294 } 2295 2296 uint ev_len= packet->length() - ev_offset; 2297 if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) 2298 ev_len-= BINLOG_CHECKSUM_LEN; 2299 2300 Format_description_log_event *tmp; 2301 if (!(tmp= new Format_description_log_event(packet->ptr() + ev_offset, 2302 ev_len, info->fdev))) 2303 { 2304 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2305 info->errmsg= "Corrupt Format_description event found " 2306 "or out-of-memory"; 2307 DBUG_RETURN(1); 2308 } 2309 delete info->fdev; 2310 info->fdev= tmp; 2311 2312 (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; 2313 2314 if (initial_log_pos) 2315 { 2316 
info->clear_initial_log_pos= false; 2317 /* 2318 mark that this event with "log_pos=0", so the slave 2319 should not increment master's binlog position 2320 (rli->group_master_log_pos) 2321 */ 2322 int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, (ulong) 0); 2323 2324 /* 2325 if reconnect master sends FD event with `created' as 0 2326 to avoid destroying temp tables. 2327 */ 2328 int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ 2329 ST_CREATED_OFFSET+ev_offset, (ulong) 0); 2330 2331 /* fix the checksum due to latest changes in header */ 2332 fix_checksum(info->current_checksum_alg, packet, ev_offset); 2333 } 2334 else if (info->using_gtid_state) 2335 { 2336 /* 2337 If this event has the field `created' set, then it will cause the 2338 slave to delete all active temporary tables. This must not happen 2339 if the slave received any later GTIDs in a previous connect, as 2340 those GTIDs might have created new temporary tables that are still 2341 needed. 2342 2343 So here, we check if the starting GTID position was already 2344 reached before this format description event. If not, we clear the 2345 `created' flag to preserve temporary tables on the slave. (If the 2346 slave connects at a position past this event, it means that it 2347 already received and handled it in a previous connect). 2348 */ 2349 if (!info->gtid_state.is_pos_reached()) 2350 { 2351 int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ 2352 ST_CREATED_OFFSET+ev_offset, (ulong) 0); 2353 fix_checksum(info->current_checksum_alg, packet, ev_offset); 2354 } 2355 } 2356 2357 /* send it */ 2358 if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length())) 2359 { 2360 info->errmsg= "Failed on my_net_write()"; 2361 info->error= ER_UNKNOWN_ERROR; 2362 DBUG_RETURN(1); 2363 } 2364 2365 /* 2366 Read the following Start_encryption_log_event and send it to slave as 2367 Ignorable_log_event. 
Although Slave doesn't need to know whether master's 2368 binlog is encrypted but it needs to update slave log pos (for mysqlbinlog). 2369 2370 If slave want to encrypt its logs, it should generate its own 2371 random nonce, it should not use the one from the master. 2372 */ 2373 /* reset transmit packet for the event read from binary log file */ 2374 if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) 2375 DBUG_RETURN(1); 2376 info->last_pos= linfo->pos; 2377 error= Log_event::read_log_event(log, packet, info->fdev, 2378 opt_master_verify_checksum 2379 ? info->current_checksum_alg 2380 : BINLOG_CHECKSUM_ALG_OFF); 2381 linfo->pos= my_b_tell(log); 2382 2383 if (unlikely(error)) 2384 { 2385 set_read_error(info, error); 2386 DBUG_RETURN(1); 2387 } 2388 2389 event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET + ev_offset]); 2390 if (event_type == START_ENCRYPTION_EVENT) 2391 { 2392 Start_encryption_log_event *sele= (Start_encryption_log_event *) 2393 Log_event::read_log_event(packet->ptr() + ev_offset, packet->length() 2394 - ev_offset, &info->errmsg, info->fdev, 2395 BINLOG_CHECKSUM_ALG_OFF); 2396 if (!sele) 2397 { 2398 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2399 DBUG_RETURN(1); 2400 } 2401 2402 if (info->fdev->start_decryption(sele)) 2403 { 2404 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2405 info->errmsg= "Could not decrypt binlog: encryption key error"; 2406 delete sele; 2407 DBUG_RETURN(1); 2408 } 2409 /* Make it Ignorable_log_event and send it */ 2410 (*packet)[FLAGS_OFFSET+ev_offset] |= LOG_EVENT_IGNORABLE_F; 2411 if (initial_log_pos) 2412 int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, (ulong) 0); 2413 /* fix the checksum due to latest changes in header */ 2414 fix_checksum(info->current_checksum_alg, packet, ev_offset); 2415 if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length())) 2416 { 2417 info->errmsg= "Failed on my_net_write()"; 2418 info->error= ER_UNKNOWN_ERROR; 2419 
DBUG_RETURN(1); 2420 } 2421 delete sele; 2422 } 2423 else if (start_pos == BIN_LOG_HEADER_SIZE) 2424 { 2425 /* 2426 not Start_encryption_log_event - seek back. But only if 2427 send_one_binlog_file() isn't going to seek anyway 2428 */ 2429 my_b_seek(log, info->last_pos); 2430 linfo->pos= info->last_pos; 2431 } 2432 2433 2434 /** all done */ 2435 DBUG_RETURN(0); 2436 } 2437 2438 static bool should_stop(binlog_send_info *info) 2439 { 2440 return 2441 info->net->error || 2442 info->net->vio == NULL || 2443 info->thd->killed || 2444 info->error != 0 || 2445 info->should_stop; 2446 } 2447 2448 /** 2449 * wait for new events to enter binlog 2450 * this function will send heartbeats while waiting if so configured 2451 */ 2452 static int wait_new_events(binlog_send_info *info, /* in */ 2453 LOG_INFO* linfo, /* in */ 2454 char binlog_end_pos_filename[], /* out */ 2455 my_off_t *end_pos_ptr) /* out */ 2456 { 2457 int ret= 1; 2458 PSI_stage_info old_stage; 2459 2460 mysql_bin_log.lock_binlog_end_pos(); 2461 info->thd->ENTER_COND(mysql_bin_log.get_bin_log_cond(), 2462 mysql_bin_log.get_binlog_end_pos_lock(), 2463 &stage_master_has_sent_all_binlog_to_slave, 2464 &old_stage); 2465 2466 while (!should_stop(info)) 2467 { 2468 *end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename); 2469 if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0) 2470 { 2471 /* there has been a log file switch, we don't need to wait */ 2472 ret= 0; 2473 break; 2474 } 2475 2476 if (linfo->pos < *end_pos_ptr) 2477 { 2478 /* there is data to read, we don't need to wait */ 2479 ret= 0; 2480 break; 2481 } 2482 2483 if (info->heartbeat_period) 2484 { 2485 struct timespec ts; 2486 set_timespec_nsec(ts, info->heartbeat_period); 2487 ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, &ts); 2488 if (ret == ETIMEDOUT || ret == ETIME) 2489 { 2490 struct event_coordinates coord = { linfo->log_file_name, linfo->pos }; 2491 #ifndef DBUG_OFF 2492 const ulong hb_info_counter_limit = 3; 
if (info->hb_info_counter < hb_info_counter_limit)
        {
          /* debug builds only: log the first few heartbeats, then go quiet */
          sql_print_information("master sends heartbeat message %s:%llu",
                                linfo->log_file_name, linfo->pos);
          info->hb_info_counter++;
          if (info->hb_info_counter == hb_info_counter_limit)
            sql_print_information("the rest of heartbeat info skipped ...");
        }
#endif
        /*
          The binlog end position lock is released while the heartbeat is
          sent and re-acquired afterwards.
        */
        mysql_bin_log.unlock_binlog_end_pos();
        ret= send_heartbeat_event(info,
                                  info->net, info->packet, &coord,
                                  info->current_checksum_alg);
        mysql_bin_log.lock_binlog_end_pos();

        if (ret)
        {
          ret= 1; // error
          break;
        }
        /**
         * re-read heartbeat period after each sent
         */
        info->heartbeat_period= get_heartbeat_period(info->thd);
      }
      else if (ret != 0)
      {
        /* the wait failed for a reason other than a timeout */
        ret= 1; // error
        break;
      }
    }
    else
    {
      /* no heartbeat configured: wait without a timeout */
      ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, NULL);
      if (ret != 0 && ret != ETIMEDOUT && ret != ETIME)
      {
        ret= 1; // error
        break;
      }
    }
  }

  /* it releases the lock set in ENTER_COND */
  info->thd->EXIT_COND(&old_stage);
  return ret;
}

/**
 * get end pos of current log file, this function
 * will wait if there is nothing available
 *
 * @param info   dump state; info->should_stop, info->error and
 *               info->errmsg may be set here
 * @param log    IO cache of the binlog file being sent
 * @param linfo  coordinates of the binlog file being sent
 *
 * @return 0     nothing more to send from this file (already at the end
 *               of an inactive file, or the dump should stop)
 *         1     failure in net_flush() or in wait_new_events()
 *         else  position up to which events can be read
 */
static my_off_t get_binlog_end_pos(binlog_send_info *info,
                                   IO_CACHE* log,
                                   LOG_INFO* linfo)
{
  my_off_t log_pos= my_b_tell(log);

  /**
   * get current binlog end pos
   */
  mysql_bin_log.lock_binlog_end_pos();
  char binlog_end_pos_filename[FN_REFLEN];
  my_off_t end_pos= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
  mysql_bin_log.unlock_binlog_end_pos();

  do
  {
    if (strcmp(binlog_end_pos_filename, linfo->log_file_name) != 0)
    {
      /**
       * this file is not active, since it's not written to again,
       * it safe to check file length and use that as end_pos
       */
      end_pos= my_b_filelength(log);

      if (log_pos == end_pos)
        return 0; // already at end of file inactive file
      else
        return end_pos; // return size of inactive file
    }
    else
    {
      /**
       * this is the active file
       */

      if (log_pos < end_pos)
      {
        /**
         * there is data available to read
         */
        return end_pos;
      }

      /**
       * check if we should wait for more data
       */
      if ((info->flags & BINLOG_DUMP_NON_BLOCK) ||
          (info->thd->variables.server_id == 0))
      {
        info->should_stop= true;
        return 0;
      }

      /**
       * flush data before waiting
       */
      if (net_flush(info->net))
      {
        info->errmsg= "failed on net_flush()";
        info->error= ER_UNKNOWN_ERROR;
        return 1;
      }

      /* updates binlog_end_pos_filename and end_pos for the next iteration */
      if (wait_new_events(info, linfo, binlog_end_pos_filename, &end_pos))
        return 1;
    }
  } while (!should_stop(info));

  return 0;
}

/**
 * This function sends events from one binlog file
 * but only up until end_pos
 *
 * @param info     dump state
 * @param log      IO cache of the binlog file; its end_of_file is set
 *                 to end_pos before reading
 * @param linfo    linfo->pos is updated after every event read
 * @param end_pos  position at which to stop reading
 *
 * return 0 - OK
 *        else NOK
 */
static int send_events(binlog_send_info *info, IO_CACHE* log, LOG_INFO* linfo,
                       my_off_t end_pos)
{
  int error;
  ulong ev_offset;

  String *packet= info->packet;
  linfo->pos= my_b_tell(log);
  info->last_pos= my_b_tell(log);

  log->end_of_file= end_pos;
  while (linfo->pos < end_pos)
  {
    if (should_stop(info))
      return 0;

    /* reset the transmit packet for the event read from binary log
       file */
    if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
      return 1;

    /* remember where this event starts, for error reporting */
    info->last_pos= linfo->pos;
    error= Log_event::read_log_event(log, packet, info->fdev,
                       opt_master_verify_checksum ? info->current_checksum_alg
                                                  : BINLOG_CHECKSUM_ALG_OFF);
    linfo->pos= my_b_tell(log);

    if (unlikely(error))
    {
      set_read_error(info, error);
      return 1;
    }

    Log_event_type event_type=
      (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);

#ifndef DBUG_OFF
    /* debug builds only: fail once the injected event countdown reaches 0 */
    if (info->dbug_reconnect_counter > 0)
    {
      --info->dbug_reconnect_counter;
      if (info->dbug_reconnect_counter == 0)
      {
        info->errmsg= "DBUG-injected forced reconnect";
        info->error= ER_UNKNOWN_ERROR;
        return 1;
      }
    }
#endif

#ifdef ENABLED_DEBUG_SYNC
    DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
                    {
                      if (event_type == XID_EVENT)
                      {
                        net_flush(info->net);
                        const char act[]=
                          "now "
                          "wait_for signal.continue";
                        DBUG_ASSERT(debug_sync_service);
                        DBUG_ASSERT(!debug_sync_set_action(
                          info->thd,
                          STRING_WITH_LEN(act)));

                        const char act2[]=
                          "now "
                          "signal signal.continued";
                        DBUG_ASSERT(!debug_sync_set_action(
                          info->thd,
                          STRING_WITH_LEN(act2)));
                      }
                    });
#endif

    /* START_ENCRYPTION_EVENT is never passed to send_event_to_slave() here */
    if (event_type != START_ENCRYPTION_EVENT &&
        ((info->errmsg= send_event_to_slave(info, event_type, log,
                                            ev_offset, &info->error_gtid))))
      return 1;

    if (unlikely(info->send_fake_gtid_list) &&
        info->gtid_skip_group == GTID_SKIP_NOT)
    {
      Gtid_list_log_event glev(&info->until_binlog_state, 0);

      if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg) ||
          fake_gtid_list_event(info, &glev, &info->errmsg, (uint32)my_b_tell(log)))
      {
        info->error= ER_UNKNOWN_ERROR;
        return 1;
      }
      info->send_fake_gtid_list= false;
    }

    if (info->until_gtid_state &&
        is_until_reached(info, &ev_offset, event_type, &info->errmsg,
                         (uint32)my_b_tell(log)))
    {
      /* a non-NULL errmsg means the UNTIL check itself failed */
      if (info->errmsg)
      {
        info->error= ER_UNKNOWN_ERROR;
        return 1;
      }
info->should_stop= true; 2724 return 0; 2725 } 2726 2727 /* Abort server before it sends the XID_EVENT */ 2728 DBUG_EXECUTE_IF("crash_before_send_xid", 2729 { 2730 if (event_type == XID_EVENT) 2731 { 2732 my_sleep(2000000); 2733 DBUG_SUICIDE(); 2734 } 2735 }); 2736 } 2737 2738 return 0; 2739 } 2740 2741 /** 2742 * This function sends one binlog file to slave 2743 * 2744 * return 0 - OK 2745 * 1 - NOK 2746 */ 2747 static int send_one_binlog_file(binlog_send_info *info, 2748 IO_CACHE* log, 2749 LOG_INFO* linfo, 2750 my_off_t start_pos) 2751 { 2752 mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock()); 2753 2754 /* seek to the requested position, to start the requested dump */ 2755 if (start_pos != BIN_LOG_HEADER_SIZE) 2756 { 2757 my_b_seek(log, start_pos); 2758 linfo->pos= start_pos; 2759 } 2760 2761 while (!should_stop(info)) 2762 { 2763 /** 2764 * get end pos of current log file, this function 2765 * will wait if there is nothing available 2766 */ 2767 my_off_t end_pos= get_binlog_end_pos(info, log, linfo); 2768 if (end_pos <= 1) 2769 { 2770 /** end of file or error */ 2771 return (int)end_pos; 2772 } 2773 info->dirlen= dirname_length(info->log_file_name); 2774 /** 2775 * send events from current position up to end_pos 2776 */ 2777 if (send_events(info, log, linfo, end_pos)) 2778 return 1; 2779 } 2780 2781 return 1; 2782 } 2783 2784 void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, 2785 ushort flags) 2786 { 2787 LOG_INFO linfo; 2788 2789 IO_CACHE log; 2790 File file = -1; 2791 String* const packet= &thd->packet; 2792 2793 binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name); 2794 binlog_send_info *info= &infoobj; 2795 bool has_transmit_started= false; 2796 2797 int old_max_allowed_packet= thd->variables.max_allowed_packet; 2798 thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET; 2799 2800 DBUG_ENTER("mysql_binlog_send"); 2801 DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); 2802 2803 bzero((char*) 
&log,sizeof(log)); 2804 2805 if (init_binlog_sender(info, &linfo, log_ident, &pos)) 2806 goto err; 2807 2808 has_transmit_started= true; 2809 2810 /* Check if the dump thread is created by a slave with semisync enabled. */ 2811 thd->semi_sync_slave = is_semi_sync_slave(); 2812 2813 DBUG_ASSERT(pos == linfo.pos); 2814 2815 if (repl_semisync_master.dump_start(thd, linfo.log_file_name, linfo.pos)) 2816 { 2817 info->errmsg= "Failed to run hook 'transmit_start'"; 2818 info->error= ER_UNKNOWN_ERROR; 2819 goto err; 2820 } 2821 2822 /* 2823 heartbeat_period from @master_heartbeat_period user variable 2824 NOTE: this is initialized after transmit_start-hook so that 2825 the hook can affect value of heartbeat period 2826 */ 2827 info->heartbeat_period= get_heartbeat_period(thd); 2828 2829 while (!should_stop(info)) 2830 { 2831 /* 2832 Tell the client about the log name with a fake Rotate event; 2833 this is needed even if we also send a Format_description_log_event 2834 just after, because that event does not contain the binlog's name. 2835 Note that as this Rotate event is sent before 2836 Format_description_log_event, the slave cannot have any info to 2837 understand this event's format, so the header len of 2838 Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter 2839 than other events except FORMAT_DESCRIPTION_EVENT). 2840 Before 4.0.14 we called fake_rotate_event below only if (pos == 2841 BIN_LOG_HEADER_SIZE), because if this is false then the slave 2842 already knows the binlog's name. 2843 Since, we always call fake_rotate_event; if the slave already knew 2844 the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is 2845 useless but does not harm much. 
It is nice for 3.23 (>=.58) slaves 2846 which test Rotate events to see if the master is 4.0 (then they 2847 choose to stop because they can't replicate 4.0); by always calling 2848 fake_rotate_event we are sure that 3.23.58 and newer will detect the 2849 problem as soon as replication starts (BUG#198). 2850 Always calling fake_rotate_event makes sending of normal 2851 (=from-binlog) Rotate events a priori unneeded, but it is not so 2852 simple: the 2 Rotate events are not equivalent, the normal one is 2853 before the Stop event, the fake one is after. If we don't send the 2854 normal one, then the Stop event will be interpreted (by existing 4.0 2855 slaves) as "the master stopped", which is wrong. So for safety, 2856 given that we want minimum modification of 4.0, we send the normal 2857 and fake Rotates. 2858 */ 2859 if (fake_rotate_event(info, pos, &info->errmsg, info->current_checksum_alg)) 2860 { 2861 /* 2862 This error code is not perfect, as fake_rotate_event() does not 2863 read anything from the binlog; if it fails it's because of an 2864 error in my_net_write(), fortunately it will say so in errmsg. 2865 */ 2866 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2867 goto err; 2868 } 2869 2870 if ((file=open_binlog(&log, linfo.log_file_name, &info->errmsg)) < 0) 2871 { 2872 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2873 goto err; 2874 } 2875 2876 if (send_format_descriptor_event(info, &log, &linfo, pos)) 2877 { 2878 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2879 goto err; 2880 } 2881 2882 /* 2883 We want to corrupt the first event that will be sent to the slave. 2884 But we do not want the corruption to happen early, eg. when client does 2885 BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to 2886 set the real DBUG injection here. 
2887 */ 2888 DBUG_EXECUTE_IF("corrupt_read_log_event2_set", 2889 { 2890 DBUG_SET("-d,corrupt_read_log_event2_set"); 2891 DBUG_SET("+d,corrupt_read_log_event2"); 2892 }); 2893 2894 /* 2895 Handle the case of START SLAVE UNTIL with an UNTIL condition already 2896 fulfilled at the start position. 2897 2898 We will send one event, the format_description, and then stop. 2899 */ 2900 if (info->until_gtid_state && info->until_gtid_state->count() == 0) 2901 info->gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE; 2902 2903 THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave); 2904 if (send_one_binlog_file(info, &log, &linfo, pos)) 2905 break; 2906 2907 if (should_stop(info)) 2908 break; 2909 2910 DBUG_EXECUTE_IF("wait_after_binlog_EOF", 2911 { 2912 const char act[]= "now wait_for signal.rotate_finished"; 2913 DBUG_ASSERT(!debug_sync_set_action(current_thd, 2914 STRING_WITH_LEN(act))); 2915 };); 2916 2917 THD_STAGE_INFO(thd, 2918 stage_finished_reading_one_binlog_switching_to_next_binlog); 2919 if (mysql_bin_log.find_next_log(&linfo, 1)) 2920 { 2921 info->errmsg= "could not find next log"; 2922 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2923 break; 2924 } 2925 2926 /** start from start of next file */ 2927 pos= BIN_LOG_HEADER_SIZE; 2928 2929 /** close current cache/file */ 2930 end_io_cache(&log); 2931 mysql_file_close(file, MYF(MY_WME)); 2932 file= -1; 2933 } 2934 2935 err: 2936 THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); 2937 if (has_transmit_started) 2938 { 2939 repl_semisync_master.dump_end(thd); 2940 } 2941 2942 if (info->thd->killed == KILL_SLAVE_SAME_ID) 2943 { 2944 info->errmsg= "A slave with the same server_uuid/server_id as this slave " 2945 "has connected to the master"; 2946 info->error= ER_SLAVE_SAME_ID; 2947 } 2948 2949 const bool binlog_open = my_b_inited(&log); 2950 if (file >= 0) 2951 { 2952 end_io_cache(&log); 2953 mysql_file_close(file, MYF(MY_WME)); 2954 } 2955 2956 thd->reset_current_linfo(); 2957 
thd->variables.max_allowed_packet= old_max_allowed_packet; 2958 delete info->fdev; 2959 2960 if (likely(info->error == 0)) 2961 { 2962 my_eof(thd); 2963 DBUG_VOID_RETURN; 2964 } 2965 2966 if ((info->error == ER_MASTER_FATAL_ERROR_READING_BINLOG || 2967 info->error == ER_SLAVE_SAME_ID) && binlog_open) 2968 { 2969 /* 2970 detailing the fatal error message with coordinates 2971 of the last position read. 2972 */ 2973 my_snprintf(info->error_text, sizeof(info->error_text), 2974 "%s; the first event '%s' at %lld, " 2975 "the last event read from '%s' at %lld, " 2976 "the last byte read from '%s' at %lld.", 2977 info->errmsg, 2978 my_basename(info->start_log_file_name), info->start_pos, 2979 my_basename(info->log_file_name), info->last_pos, 2980 my_basename(info->log_file_name), linfo.pos); 2981 } 2982 else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG) 2983 { 2984 my_snprintf(info->error_text, sizeof(info->error_text), 2985 "Error: connecting slave requested to start from GTID " 2986 "%u-%u-%llu, which is not in the master's binlog", 2987 info->error_gtid.domain_id, 2988 info->error_gtid.server_id, 2989 info->error_gtid.seq_no); 2990 /* Use this error code so slave will know not to try reconnect. */ 2991 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 2992 } 2993 else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2) 2994 { 2995 my_snprintf(info->error_text, sizeof(info->error_text), 2996 "Error: connecting slave requested to start from GTID " 2997 "%u-%u-%llu, which is not in the master's binlog. Since the " 2998 "master's binlog contains GTIDs with higher sequence numbers, " 2999 "it probably means that the slave has diverged due to " 3000 "executing extra erroneous transactions", 3001 info->error_gtid.domain_id, 3002 info->error_gtid.server_id, 3003 info->error_gtid.seq_no); 3004 /* Use this error code so slave will know not to try reconnect. 
*/ 3005 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 3006 } 3007 else if (info->error == ER_GTID_START_FROM_BINLOG_HOLE) 3008 { 3009 my_snprintf(info->error_text, sizeof(info->error_text), 3010 "The binlog on the master is missing the GTID %u-%u-%llu " 3011 "requested by the slave (even though both a prior and a " 3012 "subsequent sequence number does exist), and GTID strict mode " 3013 "is enabled", 3014 info->error_gtid.domain_id, 3015 info->error_gtid.server_id, 3016 info->error_gtid.seq_no); 3017 /* Use this error code so slave will know not to try reconnect. */ 3018 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 3019 } 3020 else if (info->error == ER_CANNOT_LOAD_SLAVE_GTID_STATE) 3021 { 3022 my_snprintf(info->error_text, sizeof(info->error_text), 3023 "Failed to load replication slave GTID state from table %s.%s", 3024 "mysql", rpl_gtid_slave_state_table_name.str); 3025 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; 3026 } 3027 else if (info->errmsg != NULL) 3028 strcpy(info->error_text, info->errmsg); 3029 3030 my_message(info->error, info->error_text, MYF(0)); 3031 3032 DBUG_VOID_RETURN; 3033 } 3034 3035 3036 /** 3037 Execute a START SLAVE statement. 3038 3039 @param thd Pointer to THD object for the client thread executing the 3040 statement. 3041 3042 @param mi Pointer to Master_info object for the slave's IO thread. 3043 3044 @param net_report If true, saves the exit status into thd->stmt_da. 
@retval 0 success
  @retval 1 error
  @retval -1 fatal error
*/

int start_slave(THD* thd , Master_info* mi, bool net_report)
{
  int slave_errno= 0;
  int thread_mask;
  char master_info_file_tmp[FN_REFLEN];
  char relay_log_info_file_tmp[FN_REFLEN];
  DBUG_ENTER("start_slave");

  /* the statement requires SUPER_ACL */
  if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
    DBUG_RETURN(-1);

  /* build the info file names from mi->cmp_connection_name */
  create_logfile_name_with_suffix(master_info_file_tmp,
                                  sizeof(master_info_file_tmp),
                                  master_info_file, 0,
                                  &mi->cmp_connection_name);
  create_logfile_name_with_suffix(relay_log_info_file_tmp,
                                  sizeof(relay_log_info_file_tmp),
                                  relay_log_info_file, 0,
                                  &mi->cmp_connection_name);

  mi->lock_slave_threads();
  if (mi->killed)
  {
    /* connection was deleted while we waited for lock_slave_threads */
    mi->unlock_slave_threads();
    my_error(WARN_NO_MASTER_INFO, MYF(0), (int) mi->connection_name.length,
             mi->connection_name.str);
    DBUG_RETURN(-1);
  }

  // Get a mask of _stopped_ threads
  init_thread_mask(&thread_mask,mi,1 /* inverse */);

  /*
    A GTID position was given with the statement: it is only accepted when
    both threads are stopped, no single thread was selected, and the
    connection uses GTID.
  */
  if (thd->lex->mi.gtid_pos_str.str)
  {
    if (thread_mask != (SLAVE_IO|SLAVE_SQL))
    {
      slave_errno= ER_SLAVE_WAS_RUNNING;
      goto err;
    }
    if (thd->lex->slave_thd_opt)
    {
      slave_errno= ER_BAD_SLAVE_UNTIL_COND;
      goto err;
    }
    if (mi->using_gtid == Master_info::USE_GTID_NO)
    {
      slave_errno= ER_UNTIL_REQUIRES_USING_GTID;
      goto err;
    }
  }

  /*
    Below we will start all stopped threads. But if the user wants to
    start only one thread, do as if the other thread was running (as we
    don't want to touch the other thread), so set the bit to 0 for the
    other thread
  */
  if (thd->lex->slave_thd_opt)
    thread_mask&= thd->lex->slave_thd_opt;
  if (thread_mask) //some threads are stopped, start them
  {
    if (init_master_info(mi,master_info_file_tmp,relay_log_info_file_tmp, 0,
                         thread_mask))
      slave_errno=ER_MASTER_INFO;
    else if (!*mi->host)
    {
      /*
        The message is reported right here, so net_report is cleared to
        avoid reporting the same error again at the end of the function.
      */
      slave_errno= ER_BAD_SLAVE; net_report= 0;
      my_message(slave_errno, "Misconfigured slave: MASTER_HOST was not set; Fix in config file or with CHANGE MASTER TO",
                 MYF(0));
    }
    else
    {
      /*
        If we will start SQL thread we will care about UNTIL options. If
        not and they are specified we will ignore them and warn user
        about this fact.
      */
      if (thread_mask & SLAVE_SQL)
      {
        mysql_mutex_lock(&mi->rli.data_lock);

        if (thd->lex->mi.pos)
        {
          /* a master position and a relay log position are both set */
          if (thd->lex->mi.relay_log_pos)
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
          mi->rli.until_log_pos= thd->lex->mi.pos;
          /*
            We don't check thd->lex->mi.log_file_name for NULL here
            since it is checked in sql_yacc.yy
          */
          strmake_buf(mi->rli.until_log_name, thd->lex->mi.log_file_name);
        }
        else if (thd->lex->mi.relay_log_pos)
        {
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
          strmake_buf(mi->rli.until_log_name, thd->lex->mi.relay_log_name);
        }
        else if (thd->lex->mi.gtid_pos_str.str)
        {
          if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str,
                                          thd->lex->mi.gtid_pos_str.length))
          {
            slave_errno= ER_INCORRECT_GTID_STATE;
            /* data_lock must be released before jumping to the common exit */
            mysql_mutex_unlock(&mi->rli.data_lock);
            goto err;
          }
          mi->rli.until_condition= Relay_log_info::UNTIL_GTID;
        }
        else
          mi->rli.clear_until_condition();

        if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS ||
            mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS)
        {
          /* Preparing members for effective until condition checking */
          const char *p= fn_ext(mi->rli.until_log_name);
          char *p_end;
          if (*p)
          {
            // p points to '.'
            mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
            /*
              p_end points to the first invalid character. If it equals
              to p, no digits were found, error. If it contains '\0' it
              means conversion went ok.
            */
            if (p_end==p || *p_end)
              slave_errno=ER_BAD_SLAVE_UNTIL_COND;
          }
          else
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;

          /* mark the cached result of the UNTIL comparison as "undefined" */
          mi->rli.until_log_names_cmp_result=
            Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
        }

        if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
        {
          /* Issuing warning when started without --skip-slave-start */
          if (!opt_skip_slave_start)
            push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
                         ER_MISSING_SKIP_SLAVE,
                         ER_THD(thd, ER_MISSING_SKIP_SLAVE));
        }

        mysql_mutex_unlock(&mi->rli.data_lock);
      }
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
        push_warning(thd,
                     Sql_condition::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
                     ER_THD(thd, ER_UNTIL_COND_IGNORED));

      /* start the threads only if nothing above recorded an error */
      if (!slave_errno)
        slave_errno = start_slave_threads(thd,
                                          1,
                                          1 /* wait for start */,
                                          mi,
                                          master_info_file_tmp,
                                          relay_log_info_file_tmp,
                                          thread_mask);
    }
  }
  else
  {
    /* no error if all threads are already started, only a warning */
    push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
                 ER_THD(thd, ER_SLAVE_WAS_RUNNING));
  }

err:
  /* common exit: every path that reaches here holds the slave threads lock */
  mi->unlock_slave_threads();
  thd_proc_info(thd, 0);

  if (slave_errno)
  {
    if (net_report)
      my_error(slave_errno, MYF(0),
               (int) mi->connection_name.length,
               mi->connection_name.str);
    /* ER_BAD_SLAVE maps to -1, every other error to 1 */
    DBUG_RETURN(slave_errno == ER_BAD_SLAVE ? -1 : 1);
  }

  DBUG_RETURN(0);
}


/**
  Execute a STOP SLAVE statement.

  @param thd Pointer to THD object for the client thread executing the
  statement.

  @param mi Pointer to Master_info object for the slave's IO thread.

  @param net_report If true, saves the exit status into thd->stmt_da.

  @retval 0 success
  @retval 1 error
  @retval -1 error
*/

int stop_slave(THD* thd, Master_info* mi, bool net_report )
{
  int slave_errno;
  DBUG_ENTER("stop_slave");
  DBUG_PRINT("enter",("Connection: %s", mi->connection_name.str));

  if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
    DBUG_RETURN(-1);
  THD_STAGE_INFO(thd, stage_killing_slave);
  int thread_mask;
  mi->lock_slave_threads();
  /*
    Get a mask of _running_ threads.
    We don't have to test for mi->killed as the thread_mask will take care
    of checking if threads exists
  */
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
  /*
    Below we will stop all running threads.
3275 But if the user wants to stop only one thread, do as if the other thread 3276 was stopped (as we don't wan't to touch the other thread), so set the 3277 bit to 0 for the other thread 3278 */ 3279 if (thd->lex->slave_thd_opt) 3280 thread_mask &= thd->lex->slave_thd_opt; 3281 3282 if (thread_mask) 3283 { 3284 slave_errno= terminate_slave_threads(mi,thread_mask, 0 /* get lock */); 3285 } 3286 else 3287 { 3288 //no error if both threads are already stopped, only a warning 3289 slave_errno= 0; 3290 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING, 3291 ER_THD(thd, ER_SLAVE_WAS_NOT_RUNNING)); 3292 } 3293 3294 mi->unlock_slave_threads(); 3295 3296 if (slave_errno) 3297 { 3298 if (net_report) 3299 my_message(slave_errno, ER_THD(thd, slave_errno), MYF(0)); 3300 DBUG_RETURN(1); 3301 } 3302 3303 DBUG_RETURN(0); 3304 } 3305 3306 3307 /** 3308 Execute a RESET SLAVE statement. 3309 3310 @param thd Pointer to THD object of the client thread executing the 3311 statement. 3312 3313 @param mi Pointer to Master_info object for the slave. 
3314 3315 @retval 0 success 3316 @retval 1 error 3317 */ 3318 int reset_slave(THD *thd, Master_info* mi) 3319 { 3320 MY_STAT stat_area; 3321 char fname[FN_REFLEN]; 3322 int thread_mask= 0, error= 0; 3323 uint sql_errno=ER_UNKNOWN_ERROR; 3324 const char* errmsg= "Unknown error occurred while resetting slave"; 3325 char master_info_file_tmp[FN_REFLEN]; 3326 char relay_log_info_file_tmp[FN_REFLEN]; 3327 DBUG_ENTER("reset_slave"); 3328 3329 mi->lock_slave_threads(); 3330 if (mi->killed) 3331 { 3332 /* connection was deleted while we waited for lock_slave_threads */ 3333 mi->unlock_slave_threads(); 3334 my_error(WARN_NO_MASTER_INFO, MYF(0), (int) mi->connection_name.length, 3335 mi->connection_name.str); 3336 DBUG_RETURN(-1); 3337 } 3338 3339 init_thread_mask(&thread_mask,mi,0 /* not inverse */); 3340 if (thread_mask) // We refuse if any slave thread is running 3341 { 3342 mi->unlock_slave_threads(); 3343 my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, 3344 mi->connection_name.str); 3345 DBUG_RETURN(ER_SLAVE_MUST_STOP); 3346 } 3347 3348 // delete relay logs, clear relay log coordinates 3349 if (unlikely((error= purge_relay_logs(&mi->rli, thd, 3350 1 /* just reset */, 3351 &errmsg)))) 3352 { 3353 sql_errno= ER_RELAY_LOG_FAIL; 3354 goto err; 3355 } 3356 3357 /* Clear master's log coordinates and associated information */ 3358 mi->clear_in_memory_info(thd->lex->reset_slave_info.all); 3359 3360 /* 3361 Reset errors (the idea is that we forget about the 3362 old master). 
3363 */ 3364 mi->clear_error(); 3365 mi->rli.clear_error(); 3366 mi->rli.clear_until_condition(); 3367 mi->rli.clear_sql_delay(); 3368 mi->rli.slave_skip_counter= 0; 3369 3370 // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0 3371 end_master_info(mi); 3372 3373 end_relay_log_info(&mi->rli); 3374 // and delete these two files 3375 create_logfile_name_with_suffix(master_info_file_tmp, 3376 sizeof(master_info_file_tmp), 3377 master_info_file, 0, 3378 &mi->cmp_connection_name); 3379 create_logfile_name_with_suffix(relay_log_info_file_tmp, 3380 sizeof(relay_log_info_file_tmp), 3381 relay_log_info_file, 0, 3382 &mi->cmp_connection_name); 3383 3384 fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32); 3385 if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) && 3386 mysql_file_delete(key_file_master_info, fname, MYF(MY_WME))) 3387 { 3388 error=1; 3389 goto err; 3390 } 3391 else if (global_system_variables.log_warnings > 1) 3392 sql_print_information("Deleted Master_info file '%s'.", fname); 3393 3394 // delete relay_log_info_file 3395 fn_format(fname, relay_log_info_file_tmp, mysql_data_home, "", 4+32); 3396 if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) && 3397 mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME))) 3398 { 3399 error=1; 3400 goto err; 3401 } 3402 else if (global_system_variables.log_warnings > 1) 3403 sql_print_information("Deleted Master_info file '%s'.", fname); 3404 3405 if (rpl_semi_sync_slave_enabled) 3406 repl_semisync_slave.reset_slave(mi); 3407 err: 3408 mi->unlock_slave_threads(); 3409 if (unlikely(error)) 3410 my_error(sql_errno, MYF(0), errmsg); 3411 DBUG_RETURN(error); 3412 } 3413 3414 /* 3415 3416 Kill all Binlog_dump threads which previously talked to the same slave 3417 ("same" means with the same server id). 
Indeed, if the slave stops, if the 3418 Binlog_dump thread is waiting (mysql_cond_wait) for binlog update, then it 3419 will keep existing until a query is written to the binlog. If the master is 3420 idle, then this could last long, and if the slave reconnects, we could have 2 3421 Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the 3422 binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP, 3423 the master kills any existing thread with the slave's server id (if this id 3424 is not zero; it will be true for real slaves, but false for mysqlbinlog when 3425 it sends COM_BINLOG_DUMP to get a remote binlog dump). 3426 3427 SYNOPSIS 3428 kill_zombie_dump_threads() 3429 slave_server_id the slave's server id 3430 */ 3431 3432 void kill_zombie_dump_threads(uint32 slave_server_id) 3433 { 3434 mysql_mutex_lock(&LOCK_thread_count); 3435 I_List_iterator<THD> it(threads); 3436 THD *tmp; 3437 3438 while ((tmp=it++)) 3439 { 3440 if (tmp->get_command() == COM_BINLOG_DUMP && 3441 tmp->variables.server_id == slave_server_id) 3442 { 3443 mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete 3444 break; 3445 } 3446 } 3447 mysql_mutex_unlock(&LOCK_thread_count); 3448 if (tmp) 3449 { 3450 /* 3451 Here we do not call kill_one_thread() as 3452 it will be slow because it will iterate through the list 3453 again. We just to do kill the thread ourselves. 3454 */ 3455 tmp->awake_no_mutex(KILL_SLAVE_SAME_ID); 3456 mysql_mutex_unlock(&tmp->LOCK_thd_kill); 3457 } 3458 } 3459 3460 /** 3461 Get value for a string parameter with error checking 3462 3463 Note that in case of error the original string should not be updated! 
3464 3465 @ret 0 ok 3466 @ret 1 error 3467 */ 3468 3469 static bool get_string_parameter(char *to, const char *from, size_t length, 3470 const char *name, CHARSET_INFO *cs) 3471 { 3472 if (from) // Empty paramaters allowed 3473 { 3474 size_t from_length= strlen(from); 3475 size_t from_numchars= cs->cset->numchars(cs, from, from + from_length); 3476 if (from_numchars > length / cs->mbmaxlen) 3477 { 3478 my_error(ER_WRONG_STRING_LENGTH, MYF(0), from, name, 3479 (int) (length / cs->mbmaxlen)); 3480 return 1; 3481 } 3482 memcpy(to, from, from_length+1); 3483 } 3484 return 0; 3485 } 3486 3487 3488 /** 3489 Execute a CHANGE MASTER statement. 3490 3491 @param thd Pointer to THD object for the client thread executing the 3492 statement. 3493 3494 @param mi Pointer to Master_info object belonging to the slave's IO 3495 thread. 3496 3497 @param master_info_added Out parameter saying if the Master_info *mi was 3498 added to the global list of masters. This is useful in error conditions 3499 to know if caller should free Master_info *mi. 3500 3501 @retval FALSE success 3502 @retval TRUE error 3503 */ 3504 bool change_master(THD* thd, Master_info* mi, bool *master_info_added) 3505 { 3506 int thread_mask; 3507 const char* errmsg= 0; 3508 bool need_relay_log_purge= 1; 3509 bool ret= FALSE; 3510 char saved_host[HOSTNAME_LENGTH + 1]; 3511 uint saved_port; 3512 char saved_log_name[FN_REFLEN]; 3513 Master_info::enum_using_gtid saved_using_gtid; 3514 char master_info_file_tmp[FN_REFLEN]; 3515 char relay_log_info_file_tmp[FN_REFLEN]; 3516 my_off_t saved_log_pos; 3517 LEX_MASTER_INFO* lex_mi= &thd->lex->mi; 3518 DYNAMIC_ARRAY *do_ids, *ignore_ids; 3519 3520 DBUG_ENTER("change_master"); 3521 3522 DBUG_ASSERT(master_info_index); 3523 mysql_mutex_assert_owner(&LOCK_active_mi); 3524 3525 *master_info_added= false; 3526 /* 3527 We need to check if there is an empty master_host. 
Otherwise 3528 change master succeeds, a master.info file is created containing 3529 empty master_host string and when issuing: start slave; an error 3530 is thrown stating that the server is not configured as slave. 3531 (See BUG#28796). 3532 */ 3533 if (lex_mi->host && !*lex_mi->host) 3534 { 3535 my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST"); 3536 DBUG_RETURN(TRUE); 3537 } 3538 if (master_info_index->check_duplicate_master_info(&lex_mi->connection_name, 3539 lex_mi->host, 3540 lex_mi->port)) 3541 DBUG_RETURN(TRUE); 3542 3543 mi->lock_slave_threads(); 3544 if (mi->killed) 3545 { 3546 /* connection was deleted while we waited for lock_slave_threads */ 3547 mi->unlock_slave_threads(); 3548 my_error(WARN_NO_MASTER_INFO, MYF(0), (int) mi->connection_name.length, 3549 mi->connection_name.str); 3550 DBUG_RETURN(TRUE); 3551 } 3552 3553 init_thread_mask(&thread_mask,mi,0 /*not inverse*/); 3554 if (thread_mask) // We refuse if any slave thread is running 3555 { 3556 my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, 3557 mi->connection_name.str); 3558 ret= TRUE; 3559 goto err; 3560 } 3561 3562 THD_STAGE_INFO(thd, stage_changing_master); 3563 3564 create_logfile_name_with_suffix(master_info_file_tmp, 3565 sizeof(master_info_file_tmp), 3566 master_info_file, 0, 3567 &mi->cmp_connection_name); 3568 create_logfile_name_with_suffix(relay_log_info_file_tmp, 3569 sizeof(relay_log_info_file_tmp), 3570 relay_log_info_file, 0, 3571 &mi->cmp_connection_name); 3572 3573 /* if new Master_info doesn't exists, add it */ 3574 if (!master_info_index->get_master_info(&mi->connection_name, 3575 Sql_condition::WARN_LEVEL_NOTE)) 3576 { 3577 if (master_info_index->add_master_info(mi, TRUE)) 3578 { 3579 my_error(ER_MASTER_INFO, MYF(0), 3580 (int) lex_mi->connection_name.length, 3581 lex_mi->connection_name.str); 3582 ret= TRUE; 3583 goto err; 3584 } 3585 *master_info_added= true; 3586 } 3587 if (global_system_variables.log_warnings > 1) 3588 
sql_print_information("Master connection name: '%.*s' " 3589 "Master_info_file: '%s' " 3590 "Relay_info_file: '%s'", 3591 (int) mi->connection_name.length, 3592 mi->connection_name.str, 3593 master_info_file_tmp, relay_log_info_file_tmp); 3594 3595 if (init_master_info(mi, master_info_file_tmp, relay_log_info_file_tmp, 0, 3596 thread_mask)) 3597 { 3598 my_error(ER_MASTER_INFO, MYF(0), 3599 (int) lex_mi->connection_name.length, 3600 lex_mi->connection_name.str); 3601 ret= TRUE; 3602 goto err; 3603 } 3604 3605 /* 3606 Data lock not needed since we have already stopped the running threads, 3607 and we have the hold on the run locks which will keep all threads that 3608 could possibly modify the data structures from running 3609 */ 3610 3611 /* 3612 Before processing the command, save the previous state. 3613 */ 3614 strmake_buf(saved_host, mi->host); 3615 saved_port= mi->port; 3616 strmake_buf(saved_log_name, mi->master_log_name); 3617 saved_log_pos= mi->master_log_pos; 3618 saved_using_gtid= mi->using_gtid; 3619 3620 /* 3621 If the user specified host or port without binlog or position, 3622 reset binlog's name to FIRST and position to 4. 
3623 */ 3624 3625 if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos) 3626 { 3627 mi->master_log_name[0] = 0; 3628 mi->master_log_pos= BIN_LOG_HEADER_SIZE; 3629 } 3630 3631 if (lex_mi->log_file_name) 3632 strmake_buf(mi->master_log_name, lex_mi->log_file_name); 3633 if (lex_mi->pos) 3634 { 3635 mi->master_log_pos= lex_mi->pos; 3636 } 3637 DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); 3638 3639 if (get_string_parameter(mi->host, lex_mi->host, sizeof(mi->host)-1, 3640 "MASTER_HOST", system_charset_info) || 3641 get_string_parameter(mi->user, lex_mi->user, sizeof(mi->user)-1, 3642 "MASTER_USER", system_charset_info) || 3643 get_string_parameter(mi->password, lex_mi->password, 3644 sizeof(mi->password)-1, "MASTER_PASSWORD", 3645 &my_charset_bin)) 3646 { 3647 ret= TRUE; 3648 goto err; 3649 } 3650 3651 if (lex_mi->port) 3652 mi->port = lex_mi->port; 3653 if (lex_mi->connect_retry) 3654 mi->connect_retry = lex_mi->connect_retry; 3655 if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED) 3656 mi->heartbeat_period = lex_mi->heartbeat_period; 3657 else 3658 mi->heartbeat_period= (float) MY_MIN(SLAVE_MAX_HEARTBEAT_PERIOD, 3659 (slave_net_timeout/2.0)); 3660 mi->received_heartbeats= 0; // counter lives until master is CHANGEd 3661 3662 /* 3663 Reset the last time server_id list if the current CHANGE MASTER 3664 is mentioning IGNORE_SERVER_IDS= (...) 3665 */ 3666 if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE) 3667 { 3668 /* Check if the list contains replicate_same_server_id */ 3669 for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i ++) 3670 { 3671 ulong s_id; 3672 get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i); 3673 if (s_id == global_system_variables.server_id && replicate_same_server_id) 3674 { 3675 my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id)); 3676 ret= TRUE; 3677 goto err; 3678 } 3679 } 3680 3681 /* All ok. 
Update the old server ids with the new ones. */ 3682 update_change_master_ids(&lex_mi->repl_ignore_server_ids, 3683 &mi->ignore_server_ids); 3684 } 3685 3686 if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED) 3687 mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE); 3688 3689 if (lex_mi->sql_delay != -1) 3690 mi->rli.set_sql_delay(lex_mi->sql_delay); 3691 3692 if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED) 3693 mi->ssl_verify_server_cert= 3694 (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE); 3695 3696 if (lex_mi->ssl_ca) 3697 strmake_buf(mi->ssl_ca, lex_mi->ssl_ca); 3698 if (lex_mi->ssl_capath) 3699 strmake_buf(mi->ssl_capath, lex_mi->ssl_capath); 3700 if (lex_mi->ssl_cert) 3701 strmake_buf(mi->ssl_cert, lex_mi->ssl_cert); 3702 if (lex_mi->ssl_cipher) 3703 strmake_buf(mi->ssl_cipher, lex_mi->ssl_cipher); 3704 if (lex_mi->ssl_key) 3705 strmake_buf(mi->ssl_key, lex_mi->ssl_key); 3706 if (lex_mi->ssl_crl) 3707 strmake_buf(mi->ssl_crl, lex_mi->ssl_crl); 3708 if (lex_mi->ssl_crlpath) 3709 strmake_buf(mi->ssl_crlpath, lex_mi->ssl_crlpath); 3710 3711 #ifndef HAVE_OPENSSL 3712 if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath || 3713 lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key || 3714 lex_mi->ssl_verify_server_cert || lex_mi->ssl_crl || lex_mi->ssl_crlpath) 3715 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, 3716 ER_SLAVE_IGNORED_SSL_PARAMS, 3717 ER_THD(thd, ER_SLAVE_IGNORED_SSL_PARAMS)); 3718 #endif 3719 3720 if (lex_mi->relay_log_name) 3721 { 3722 need_relay_log_purge= 0; 3723 char relay_log_name[FN_REFLEN]; 3724 mi->rli.relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name); 3725 strmake_buf(mi->rli.group_relay_log_name, relay_log_name); 3726 strmake_buf(mi->rli.event_relay_log_name, relay_log_name); 3727 } 3728 3729 if (lex_mi->relay_log_pos) 3730 { 3731 need_relay_log_purge= 0; 3732 mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos; 3733 } 3734 3735 if 
(lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_SLAVE_POS) 3736 mi->using_gtid= Master_info::USE_GTID_SLAVE_POS; 3737 else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_CURRENT_POS) 3738 mi->using_gtid= Master_info::USE_GTID_CURRENT_POS; 3739 else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_NO || 3740 lex_mi->log_file_name || lex_mi->pos || 3741 lex_mi->relay_log_name || lex_mi->relay_log_pos) 3742 mi->using_gtid= Master_info::USE_GTID_NO; 3743 3744 do_ids= ((lex_mi->repl_do_domain_ids_opt == 3745 LEX_MASTER_INFO::LEX_MI_ENABLE) ? 3746 &lex_mi->repl_do_domain_ids : NULL); 3747 3748 ignore_ids= ((lex_mi->repl_ignore_domain_ids_opt == 3749 LEX_MASTER_INFO::LEX_MI_ENABLE) ? 3750 &lex_mi->repl_ignore_domain_ids : NULL); 3751 3752 /* 3753 Note: mi->using_gtid stores the previous state in case no MASTER_USE_GTID 3754 is specified. 3755 */ 3756 if (mi->domain_id_filter.update_ids(do_ids, ignore_ids, mi->using_gtid)) 3757 { 3758 my_error(ER_MASTER_INFO, MYF(0), 3759 (int) lex_mi->connection_name.length, 3760 lex_mi->connection_name.str); 3761 ret= TRUE; 3762 goto err; 3763 } 3764 3765 /* 3766 If user did specify neither host nor port nor any log name nor any log 3767 pos, i.e. he specified only user/password/master_connect_retry, he probably 3768 wants replication to resume from where it had left, i.e. from the 3769 coordinates of the **SQL** thread (imagine the case where the I/O is ahead 3770 of the SQL; restarting from the coordinates of the I/O would lose some 3771 events which is probably unwanted when you are just doing minor changes 3772 like changing master_connect_retry). 3773 A side-effect is that if only the I/O thread was started, this thread may 3774 restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a 3775 much more unlikely situation than the one we are fixing here). 3776 Note: coordinates of the SQL thread must be read here, before the 3777 'if (need_relay_log_purge)' block which resets them. 
3778 */ 3779 if (!lex_mi->host && !lex_mi->port && 3780 !lex_mi->log_file_name && !lex_mi->pos && 3781 need_relay_log_purge) 3782 { 3783 /* 3784 Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is 3785 not initialized), so we use a MY_MAX(). 3786 What happens to mi->rli.master_log_pos during the initialization stages 3787 of replication is not 100% clear, so we guard against problems using 3788 MY_MAX(). 3789 */ 3790 mi->master_log_pos = MY_MAX(BIN_LOG_HEADER_SIZE, 3791 mi->rli.group_master_log_pos); 3792 strmake_buf(mi->master_log_name, mi->rli.group_master_log_name); 3793 } 3794 3795 /* 3796 Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never 3797 a slave before). 3798 */ 3799 if (flush_master_info(mi, FALSE, FALSE)) 3800 { 3801 my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file"); 3802 ret= TRUE; 3803 goto err; 3804 } 3805 if (need_relay_log_purge) 3806 { 3807 THD_STAGE_INFO(thd, stage_purging_old_relay_logs); 3808 if (purge_relay_logs(&mi->rli, thd, 3809 0 /* not only reset, but also reinit */, 3810 &errmsg)) 3811 { 3812 my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg); 3813 ret= TRUE; 3814 goto err; 3815 } 3816 } 3817 else 3818 { 3819 const char* msg; 3820 /* Relay log is already initialized */ 3821 if (init_relay_log_pos(&mi->rli, 3822 mi->rli.group_relay_log_name, 3823 mi->rli.group_relay_log_pos, 3824 0 /*no data lock*/, 3825 &msg, 0)) 3826 { 3827 my_error(ER_RELAY_LOG_INIT, MYF(0), msg); 3828 ret= TRUE; 3829 goto err; 3830 } 3831 } 3832 /* 3833 Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block, 3834 so restore them to good values. If we left them to ''/0, that would work; 3835 but that would fail in the case of 2 successive CHANGE MASTER (without a 3836 START SLAVE in between): because first one would set the coords in mi to 3837 the good values of those in rli, the set those in rli to ''/0, then 3838 second CHANGE MASTER would set the coords in mi to those of rli, i.e. 
to 3839 ''/0: we have lost all copies of the original good coordinates. 3840 That's why we always save good coords in rli. 3841 */ 3842 mi->rli.group_master_log_pos= mi->master_log_pos; 3843 DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); 3844 strmake_buf(mi->rli.group_master_log_name,mi->master_log_name); 3845 3846 if (!mi->rli.group_master_log_name[0]) // uninitialized case 3847 mi->rli.group_master_log_pos=0; 3848 3849 mysql_mutex_lock(&mi->rli.data_lock); 3850 mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */ 3851 /* Clear the errors, for a clean start */ 3852 mi->rli.clear_error(); 3853 mi->rli.clear_until_condition(); 3854 mi->rli.slave_skip_counter= 0; 3855 3856 sql_print_information("'CHANGE MASTER TO executed'. " 3857 "Previous state master_host='%s', master_port='%u', master_log_file='%s', " 3858 "master_log_pos='%ld'. " 3859 "New state master_host='%s', master_port='%u', master_log_file='%s', " 3860 "master_log_pos='%ld'.", saved_host, saved_port, saved_log_name, 3861 (ulong) saved_log_pos, mi->host, mi->port, mi->master_log_name, 3862 (ulong) mi->master_log_pos); 3863 if (saved_using_gtid != Master_info::USE_GTID_NO || 3864 mi->using_gtid != Master_info::USE_GTID_NO) 3865 sql_print_information("Previous Using_Gtid=%s. New Using_Gtid=%s", 3866 mi->using_gtid_astext(saved_using_gtid), 3867 mi->using_gtid_astext(mi->using_gtid)); 3868 3869 /* 3870 If we don't write new coordinates to disk now, then old will remain in 3871 relay-log.info until START SLAVE is issued; but if mysqld is shutdown 3872 before START SLAVE, then old will remain in relay-log.info, and will be the 3873 in-memory value at restart (thus causing errors, as the old relay log does 3874 not exist anymore). 
3875 */ 3876 if (mi->rli.flush()) 3877 ret= 1; 3878 mysql_cond_broadcast(&mi->data_cond); 3879 mysql_mutex_unlock(&mi->rli.data_lock); 3880 3881 err: 3882 mi->unlock_slave_threads(); 3883 if (ret == FALSE) 3884 my_ok(thd); 3885 else 3886 { 3887 /* 3888 Depending on where CHANGE MASTER failed, the logs may be waiting to be 3889 reopened. This would break future log updates and CHANGE MASTER calls. 3890 `try_fix_log_state()` allows the relay log to fix its state to no longer 3891 expect to be reopened. 3892 */ 3893 mi->rli.relay_log.try_fix_log_state(); 3894 } 3895 DBUG_RETURN(ret); 3896 } 3897 3898 3899 /** 3900 Execute a RESET MASTER statement. 3901 3902 @param thd Pointer to THD object of the client thread executing the 3903 statement. 3904 3905 @retval 0 success 3906 @retval 1 error 3907 */ 3908 int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len, 3909 ulong next_log_number) 3910 { 3911 if (!mysql_bin_log.is_open()) 3912 { 3913 my_message(ER_FLUSH_MASTER_BINLOG_CLOSED, 3914 ER_THD(thd, ER_FLUSH_MASTER_BINLOG_CLOSED), 3915 MYF(ME_BELL+ME_WAITTANG)); 3916 return 1; 3917 } 3918 3919 bool ret= 0; 3920 /* Temporarily disable master semisync before resetting master. */ 3921 repl_semisync_master.before_reset_master(); 3922 ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len, 3923 next_log_number); 3924 repl_semisync_master.after_reset_master(); 3925 return ret; 3926 } 3927 3928 3929 /** 3930 Execute a SHOW BINLOG EVENTS statement. 3931 3932 @param thd Pointer to THD object for the client thread executing the 3933 statement. 3934 3935 @retval FALSE success 3936 @retval TRUE failure 3937 */ 3938 bool mysql_show_binlog_events(THD* thd) 3939 { 3940 Protocol *protocol= thd->protocol; 3941 List<Item> field_list; 3942 char errmsg_buf[MYSYS_ERRMSG_SIZE]; 3943 const char *errmsg = 0; 3944 bool ret = TRUE; 3945 /* 3946 Using checksum validate the correctness of event pos specified in show 3947 binlog events command. 
3948 */ 3949 bool verify_checksum_once= false; 3950 IO_CACHE log; 3951 File file = -1; 3952 MYSQL_BIN_LOG *binary_log= NULL; 3953 int old_max_allowed_packet= thd->variables.max_allowed_packet; 3954 Master_info *mi= 0; 3955 LOG_INFO linfo; 3956 LEX_MASTER_INFO *lex_mi= &thd->lex->mi; 3957 enum enum_binlog_checksum_alg checksum_alg; 3958 my_off_t binlog_size; 3959 MY_STAT s; 3960 3961 DBUG_ENTER("mysql_show_binlog_events"); 3962 3963 Log_event::init_show_field_list(thd, &field_list); 3964 if (protocol->send_result_set_metadata(&field_list, 3965 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) 3966 DBUG_RETURN(TRUE); 3967 3968 DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS || 3969 thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS); 3970 3971 /* select which binary log to use: binlog or relay */ 3972 if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ) 3973 { 3974 binary_log= &mysql_bin_log; 3975 } 3976 else /* showing relay log contents */ 3977 { 3978 if (!lex_mi->connection_name.str) 3979 lex_mi->connection_name= thd->variables.default_master_connection; 3980 if (!(mi= get_master_info(&lex_mi->connection_name, 3981 Sql_condition::WARN_LEVEL_ERROR))) 3982 { 3983 DBUG_RETURN(TRUE); 3984 } 3985 binary_log= &(mi->rli.relay_log); 3986 } 3987 3988 Format_description_log_event *description_event= new 3989 Format_description_log_event(3); /* MySQL 4.0 by default */ 3990 3991 if (binary_log->is_open()) 3992 { 3993 SELECT_LEX_UNIT *unit= &thd->lex->unit; 3994 ha_rows event_count, limit_start, limit_end; 3995 my_off_t pos = MY_MAX(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly 3996 char search_file_name[FN_REFLEN], *name; 3997 const char *log_file_name = lex_mi->log_file_name; 3998 mysql_mutex_t *log_lock = binary_log->get_log_lock(); 3999 Log_event* ev; 4000 4001 if (mi) 4002 { 4003 /* We can unlock the mutex as we have a lock on the file */ 4004 mi->release(); 4005 mi= 0; 4006 } 4007 4008 unit->set_limit(thd->lex->current_select); 4009 limit_start= 
unit->offset_limit_cnt; 4010 limit_end= unit->select_limit_cnt; 4011 4012 name= search_file_name; 4013 if (log_file_name) 4014 binary_log->make_log_name(search_file_name, log_file_name); 4015 else 4016 name=0; // Find first log 4017 4018 linfo.index_file_offset = 0; 4019 4020 if (binary_log->find_log_pos(&linfo, name, 1)) 4021 { 4022 errmsg = "Could not find target log"; 4023 goto err; 4024 } 4025 4026 thd->current_linfo= &linfo; 4027 4028 if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0) 4029 goto err; 4030 4031 my_stat(linfo.log_file_name, &s, MYF(0)); 4032 binlog_size= s.st_size; 4033 if (lex_mi->pos > binlog_size) 4034 { 4035 sprintf(errmsg_buf, "Invalid pos specified. Requested from pos:%llu is " 4036 "greater than actual file size:%lu\n", lex_mi->pos, 4037 (ulong)s.st_size); 4038 errmsg= errmsg_buf; 4039 goto err; 4040 } 4041 4042 /* 4043 to account binlog event header size 4044 */ 4045 thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER; 4046 4047 mysql_mutex_lock(log_lock); 4048 4049 /* 4050 open_binlog() sought to position 4. 4051 Read the first event in case it's a Format_description_log_event, to 4052 know the format. If there's no such event, we are 3.23 or 4.x. This 4053 code, like before, can't read 3.23 binlogs. 4054 Also read the second event, in case it's a Start_encryption_log_event. 4055 This code will fail on a mixed relay log (one which has Format_desc then 4056 Rotate then Format_desc). 
4057 */ 4058 4059 my_off_t scan_pos = BIN_LOG_HEADER_SIZE; 4060 while (scan_pos < pos) 4061 { 4062 ev= Log_event::read_log_event(&log, description_event, 4063 opt_master_verify_checksum); 4064 scan_pos = my_b_tell(&log); 4065 if (ev == NULL || !ev->is_valid()) 4066 { 4067 mysql_mutex_unlock(log_lock); 4068 errmsg = "Wrong offset or I/O error"; 4069 goto err; 4070 } 4071 if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) 4072 { 4073 delete description_event; 4074 description_event= (Format_description_log_event*) ev; 4075 } 4076 else 4077 { 4078 if (ev->get_type_code() == START_ENCRYPTION_EVENT) 4079 { 4080 if (description_event->start_decryption((Start_encryption_log_event*) ev)) 4081 { 4082 delete ev; 4083 mysql_mutex_unlock(log_lock); 4084 errmsg = "Could not initialize decryption of binlog."; 4085 goto err; 4086 } 4087 } 4088 delete ev; 4089 break; 4090 } 4091 } 4092 4093 if (lex_mi->pos > BIN_LOG_HEADER_SIZE) 4094 { 4095 checksum_alg= description_event->checksum_alg; 4096 /* Validate user given position using checksum */ 4097 if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF && 4098 checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) 4099 { 4100 if (!opt_master_verify_checksum) 4101 verify_checksum_once= true; 4102 my_b_seek(&log, pos); 4103 } 4104 else 4105 { 4106 my_off_t cur_pos= my_b_tell(&log); 4107 ulong next_event_len= 0; 4108 uchar buff[IO_SIZE]; 4109 while (cur_pos < pos) 4110 { 4111 my_b_seek(&log, cur_pos + EVENT_LEN_OFFSET); 4112 if (my_b_read(&log, (uchar *)buff, sizeof(next_event_len))) 4113 { 4114 mysql_mutex_unlock(log_lock); 4115 errmsg = "Could not read event_length"; 4116 goto err; 4117 } 4118 next_event_len= uint4korr(buff); 4119 cur_pos= cur_pos + next_event_len; 4120 } 4121 if (cur_pos > pos) 4122 { 4123 mysql_mutex_unlock(log_lock); 4124 errmsg= "Invalid input pos specified please provide valid one."; 4125 goto err; 4126 } 4127 my_b_seek(&log, cur_pos); 4128 } 4129 } 4130 4131 for (event_count = 0; 4132 (ev = Log_event::read_log_event(&log, 4133 
description_event, 4134 (opt_master_verify_checksum || 4135 verify_checksum_once))); ) 4136 { 4137 if (event_count >= limit_start && 4138 ev->net_send(protocol, linfo.log_file_name, pos)) 4139 { 4140 errmsg = "Net error"; 4141 delete ev; 4142 mysql_mutex_unlock(log_lock); 4143 goto err; 4144 } 4145 4146 if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) 4147 { 4148 Format_description_log_event* new_fdle= 4149 (Format_description_log_event*) ev; 4150 new_fdle->copy_crypto_data(description_event); 4151 delete description_event; 4152 description_event= new_fdle; 4153 } 4154 else 4155 { 4156 if (ev->get_type_code() == START_ENCRYPTION_EVENT) 4157 { 4158 if (description_event->start_decryption((Start_encryption_log_event*) ev)) 4159 { 4160 errmsg = "Error starting decryption"; 4161 delete ev; 4162 mysql_mutex_unlock(log_lock); 4163 goto err; 4164 } 4165 } 4166 delete ev; 4167 } 4168 4169 verify_checksum_once= false; 4170 pos = my_b_tell(&log); 4171 4172 if (++event_count >= limit_end) 4173 break; 4174 } 4175 4176 if (unlikely(event_count < limit_end && log.error)) 4177 { 4178 errmsg = "Wrong offset or I/O error"; 4179 mysql_mutex_unlock(log_lock); 4180 goto err; 4181 } 4182 4183 mysql_mutex_unlock(log_lock); 4184 } 4185 else if (mi) 4186 mi->release(); 4187 4188 // Check that linfo is still on the function scope. 
4189 DEBUG_SYNC(thd, "after_show_binlog_events"); 4190 4191 ret= FALSE; 4192 4193 err: 4194 delete description_event; 4195 if (file >= 0) 4196 { 4197 end_io_cache(&log); 4198 mysql_file_close(file, MYF(MY_WME)); 4199 } 4200 4201 if (errmsg) 4202 my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0), 4203 "SHOW BINLOG EVENTS", errmsg); 4204 else 4205 my_eof(thd); 4206 4207 thd->reset_current_linfo(); 4208 thd->variables.max_allowed_packet= old_max_allowed_packet; 4209 DBUG_RETURN(ret); 4210 } 4211 4212 4213 void show_binlog_info_get_fields(THD *thd, List<Item> *field_list) 4214 { 4215 MEM_ROOT *mem_root= thd->mem_root; 4216 field_list->push_back(new (mem_root) 4217 Item_empty_string(thd, "File", FN_REFLEN), 4218 mem_root); 4219 field_list->push_back(new (mem_root) 4220 Item_return_int(thd, "Position", 20, 4221 MYSQL_TYPE_LONGLONG), 4222 mem_root); 4223 field_list->push_back(new (mem_root) 4224 Item_empty_string(thd, "Binlog_Do_DB", 255), 4225 mem_root); 4226 field_list->push_back(new (mem_root) 4227 Item_empty_string(thd, "Binlog_Ignore_DB", 255), 4228 mem_root); 4229 } 4230 4231 4232 /** 4233 Execute a SHOW MASTER STATUS statement. 4234 4235 @param thd Pointer to THD object for the client thread executing the 4236 statement. 
4237 4238 @retval FALSE success 4239 @retval TRUE failure 4240 */ 4241 bool show_binlog_info(THD* thd) 4242 { 4243 Protocol *protocol= thd->protocol; 4244 DBUG_ENTER("show_binlog_info"); 4245 4246 List<Item> field_list; 4247 show_binlog_info_get_fields(thd, &field_list); 4248 4249 if (protocol->send_result_set_metadata(&field_list, 4250 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) 4251 DBUG_RETURN(TRUE); 4252 protocol->prepare_for_resend(); 4253 4254 if (mysql_bin_log.is_open()) 4255 { 4256 LOG_INFO li; 4257 mysql_bin_log.get_current_log(&li); 4258 size_t dir_len = dirname_length(li.log_file_name); 4259 protocol->store(li.log_file_name + dir_len, &my_charset_bin); 4260 protocol->store((ulonglong) li.pos); 4261 protocol->store(binlog_filter->get_do_db()); 4262 protocol->store(binlog_filter->get_ignore_db()); 4263 if (protocol->write()) 4264 DBUG_RETURN(TRUE); 4265 } 4266 my_eof(thd); 4267 DBUG_RETURN(FALSE); 4268 } 4269 4270 4271 void show_binlogs_get_fields(THD *thd, List<Item> *field_list) 4272 { 4273 MEM_ROOT *mem_root= thd->mem_root; 4274 field_list->push_back(new (mem_root) 4275 Item_empty_string(thd, "Log_name", 255), 4276 mem_root); 4277 field_list->push_back(new (mem_root) 4278 Item_return_int(thd, "File_size", 20, 4279 MYSQL_TYPE_LONGLONG), 4280 mem_root); 4281 } 4282 4283 4284 /** 4285 Execute a SHOW BINARY LOGS statement. 4286 4287 @param thd Pointer to THD object for the client thread executing the 4288 statement. 
4289 4290 @retval FALSE success 4291 @retval TRUE failure 4292 */ 4293 bool show_binlogs(THD* thd) 4294 { 4295 IO_CACHE *index_file; 4296 LOG_INFO cur; 4297 File file; 4298 char fname[FN_REFLEN]; 4299 List<Item> field_list; 4300 size_t length; 4301 size_t cur_dir_len; 4302 Protocol *protocol= thd->protocol; 4303 DBUG_ENTER("show_binlogs"); 4304 4305 if (!mysql_bin_log.is_open()) 4306 { 4307 my_error(ER_NO_BINARY_LOGGING, MYF(0)); 4308 DBUG_RETURN(TRUE); 4309 } 4310 4311 show_binlogs_get_fields(thd, &field_list); 4312 4313 if (protocol->send_result_set_metadata(&field_list, 4314 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) 4315 DBUG_RETURN(TRUE); 4316 4317 mysql_mutex_lock(mysql_bin_log.get_log_lock()); 4318 mysql_bin_log.lock_index(); 4319 index_file=mysql_bin_log.get_index_file(); 4320 4321 mysql_bin_log.raw_get_current_log(&cur); // dont take mutex 4322 mysql_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK 4323 4324 cur_dir_len= dirname_length(cur.log_file_name); 4325 4326 reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0); 4327 4328 /* The file ends with EOF or empty line */ 4329 while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1) 4330 { 4331 size_t dir_len; 4332 ulonglong file_length= 0; // Length if open fails 4333 fname[--length] = '\0'; // remove the newline 4334 4335 protocol->prepare_for_resend(); 4336 dir_len= dirname_length(fname); 4337 length-= dir_len; 4338 protocol->store(fname + dir_len, length, &my_charset_bin); 4339 4340 if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length))) 4341 file_length= cur.pos; /* The active log, use the active position */ 4342 else 4343 { 4344 /* this is an old log, open it and find the size */ 4345 if ((file= mysql_file_open(key_file_binlog, 4346 fname, O_RDONLY | O_SHARE | O_BINARY, 4347 MYF(0))) >= 0) 4348 { 4349 file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0)); 4350 mysql_file_close(file, MYF(0)); 4351 } 4352 } 4353 protocol->store(file_length); 
4354 if (protocol->write()) 4355 goto err; 4356 } 4357 if (unlikely(index_file->error == -1)) 4358 goto err; 4359 mysql_bin_log.unlock_index(); 4360 my_eof(thd); 4361 DBUG_RETURN(FALSE); 4362 4363 err: 4364 mysql_bin_log.unlock_index(); 4365 DBUG_RETURN(TRUE); 4366 } 4367 4368 /** 4369 Load data's io cache specific hook to be executed 4370 before a chunk of data is being read into the cache's buffer 4371 The fuction instantianates and writes into the binlog 4372 replication events along LOAD DATA processing. 4373 4374 @param file pointer to io-cache 4375 @retval 0 success 4376 @retval 1 failure 4377 */ 4378 int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count) 4379 { 4380 DBUG_ENTER("log_loaded_block"); 4381 LOAD_FILE_IO_CACHE *lf_info= static_cast<LOAD_FILE_IO_CACHE*>(file); 4382 uint block_len; 4383 /* buffer contains position where we started last read */ 4384 uchar* buffer= (uchar*) my_b_get_buffer_start(file); 4385 uint max_event_size= lf_info->thd->variables.max_allowed_packet; 4386 4387 if (lf_info->thd->is_current_stmt_binlog_format_row()) 4388 goto ret; 4389 if (lf_info->last_pos_in_file != HA_POS_ERROR && 4390 lf_info->last_pos_in_file >= my_b_get_pos_in_file(file)) 4391 goto ret; 4392 4393 for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0; 4394 buffer += MY_MIN(block_len, max_event_size), 4395 block_len -= MY_MIN(block_len, max_event_size)) 4396 { 4397 lf_info->last_pos_in_file= my_b_get_pos_in_file(file); 4398 if (lf_info->wrote_create_file) 4399 { 4400 Append_block_log_event a(lf_info->thd, lf_info->thd->db.str, buffer, 4401 MY_MIN(block_len, max_event_size), 4402 lf_info->log_delayed); 4403 if (mysql_bin_log.write(&a)) 4404 DBUG_RETURN(1); 4405 } 4406 else 4407 { 4408 Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db.str, 4409 buffer, 4410 MY_MIN(block_len, max_event_size), 4411 lf_info->log_delayed); 4412 if (mysql_bin_log.write(&b)) 4413 DBUG_RETURN(1); 4414 lf_info->wrote_create_file= 1; 4415 } 4416 } 4417 
ret: 4418 int res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0; 4419 DBUG_RETURN(res); 4420 } 4421 4422 4423 /** 4424 Initialise the slave replication state from the mysql.gtid_slave_pos table. 4425 4426 This is called each time an SQL thread starts, but the data is only actually 4427 loaded on the first call. 4428 4429 The slave state is the last GTID applied on the slave within each 4430 replication domain. 4431 4432 To avoid row lock contention, there are multiple rows for each domain_id. 4433 The one containing the current slave state is the one with the maximal 4434 sub_id value, within each domain_id. 4435 4436 CREATE TABLE mysql.gtid_slave_pos ( 4437 domain_id INT UNSIGNED NOT NULL, 4438 sub_id BIGINT UNSIGNED NOT NULL, 4439 server_id INT UNSIGNED NOT NULL, 4440 seq_no BIGINT UNSIGNED NOT NULL, 4441 PRIMARY KEY (domain_id, sub_id)) 4442 */ 4443 4444 void 4445 rpl_init_gtid_slave_state() 4446 { 4447 rpl_global_gtid_slave_state= new rpl_slave_state; 4448 } 4449 4450 4451 void 4452 rpl_deinit_gtid_slave_state() 4453 { 4454 delete rpl_global_gtid_slave_state; 4455 } 4456 4457 4458 void 4459 rpl_init_gtid_waiting() 4460 { 4461 rpl_global_gtid_waiting.init(); 4462 } 4463 4464 4465 void 4466 rpl_deinit_gtid_waiting() 4467 { 4468 rpl_global_gtid_waiting.destroy(); 4469 } 4470 4471 4472 /* 4473 Format the current GTID state as a string, for returning the value of 4474 @@global.gtid_slave_pos. 4475 4476 If the flag use_binlog is true, then the contents of the binary log (if 4477 enabled) is merged into the current GTID state (@@global.gtid_current_pos). 
4478 */ 4479 int 4480 rpl_append_gtid_state(String *dest, bool use_binlog) 4481 { 4482 int err; 4483 rpl_gtid *gtid_list= NULL; 4484 uint32 num_gtids= 0; 4485 4486 if (use_binlog && opt_bin_log && 4487 (err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) 4488 return err; 4489 4490 err= rpl_global_gtid_slave_state->tostring(dest, gtid_list, num_gtids); 4491 my_free(gtid_list); 4492 4493 return err; 4494 } 4495 4496 4497 /* 4498 Load the current GTID position into a slave_connection_state, for use when 4499 connecting to a master server with GTID. 4500 4501 If the flag use_binlog is true, then the contents of the binary log (if 4502 enabled) is merged into the current GTID state (master_use_gtid=current_pos). 4503 */ 4504 int 4505 rpl_load_gtid_state(slave_connection_state *state, bool use_binlog) 4506 { 4507 int err; 4508 rpl_gtid *gtid_list= NULL; 4509 uint32 num_gtids= 0; 4510 4511 if (use_binlog && opt_bin_log && 4512 (err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) 4513 return err; 4514 4515 err= state->load(rpl_global_gtid_slave_state, gtid_list, num_gtids); 4516 my_free(gtid_list); 4517 4518 return err; 4519 } 4520 4521 4522 bool 4523 rpl_gtid_pos_check(THD *thd, char *str, size_t len) 4524 { 4525 slave_connection_state tmp_slave_state; 4526 bool gave_conflict_warning= false, gave_missing_warning= false; 4527 4528 /* Check that we can parse the supplied string. */ 4529 if (tmp_slave_state.load(str, len)) 4530 return true; 4531 4532 /* 4533 Check our own binlog for any of our own transactions that are newer 4534 than the GTID state the user is requesting. Any such transactions would 4535 result in an out-of-order binlog, which could break anyone replicating 4536 with us as master. 4537 4538 So give an error if this is found, requesting the user to do a 4539 RESET MASTER (to clean up the binlog) if they really want this. 
4540 */ 4541 if (mysql_bin_log.is_open()) 4542 { 4543 rpl_gtid *binlog_gtid_list= NULL; 4544 uint32 num_binlog_gtids= 0; 4545 uint32 i; 4546 4547 if (mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list, 4548 &num_binlog_gtids)) 4549 { 4550 my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); 4551 return true; 4552 } 4553 for (i= 0; i < num_binlog_gtids; ++i) 4554 { 4555 rpl_gtid *binlog_gtid= &binlog_gtid_list[i]; 4556 rpl_gtid *slave_gtid; 4557 if (binlog_gtid->server_id != global_system_variables.server_id) 4558 continue; 4559 if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id))) 4560 { 4561 if (opt_gtid_strict_mode) 4562 { 4563 my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0), 4564 binlog_gtid->domain_id, binlog_gtid->domain_id, 4565 binlog_gtid->server_id, binlog_gtid->seq_no); 4566 break; 4567 } 4568 else if (!gave_missing_warning) 4569 { 4570 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, 4571 ER_MASTER_GTID_POS_MISSING_DOMAIN, 4572 ER_THD(thd, ER_MASTER_GTID_POS_MISSING_DOMAIN), 4573 binlog_gtid->domain_id, binlog_gtid->domain_id, 4574 binlog_gtid->server_id, binlog_gtid->seq_no); 4575 gave_missing_warning= true; 4576 } 4577 } 4578 else if (slave_gtid->seq_no < binlog_gtid->seq_no) 4579 { 4580 if (opt_gtid_strict_mode) 4581 { 4582 my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0), 4583 slave_gtid->domain_id, slave_gtid->server_id, 4584 slave_gtid->seq_no, binlog_gtid->domain_id, 4585 binlog_gtid->server_id, binlog_gtid->seq_no); 4586 break; 4587 } 4588 else if (!gave_conflict_warning) 4589 { 4590 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, 4591 ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, 4592 ER_THD(thd, ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG), 4593 slave_gtid->domain_id, slave_gtid->server_id, 4594 slave_gtid->seq_no, binlog_gtid->domain_id, 4595 binlog_gtid->server_id, binlog_gtid->seq_no); 4596 gave_conflict_warning= true; 4597 } 4598 } 4599 } 4600 my_free(binlog_gtid_list); 4601 if (i != num_binlog_gtids) 4602 
return true; 4603 } 4604 4605 return false; 4606 } 4607 4608 4609 bool 4610 rpl_gtid_pos_update(THD *thd, char *str, size_t len) 4611 { 4612 if (rpl_global_gtid_slave_state->load(thd, str, len, true, true)) 4613 { 4614 my_error(ER_FAILED_GTID_STATE_INIT, MYF(0)); 4615 return true; 4616 } 4617 else 4618 return false; 4619 } 4620 4621 int compare_log_name(const char *log_1, const char *log_2) { 4622 int res= 1; 4623 const char *ext1_str= strrchr(log_1, '.'); 4624 const char *ext2_str= strrchr(log_2, '.'); 4625 char file_name_1[255], file_name_2[255]; 4626 strmake(file_name_1, log_1, (ext1_str - log_1)); 4627 strmake(file_name_2, log_2, (ext2_str - log_2)); 4628 char *endptr = NULL; 4629 res= strcmp(file_name_1, file_name_2); 4630 if (!res) 4631 { 4632 ulong ext1= strtoul(++ext1_str, &endptr, 10); 4633 ulong ext2= strtoul(++ext2_str, &endptr, 10); 4634 res= (ext1 > ext2 ? 1 : ((ext1 == ext2) ? 0 : -1)); 4635 } 4636 return res; 4637 } 4638 4639 #endif /* HAVE_REPLICATION */ 4640