1 /* Copyright (c) 2006, 2017, Oracle and/or its affiliates. 2 Copyright (c) 2010, 2020, MariaDB Corporation. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License as published by 6 the Free Software Foundation; version 2 of the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software Foundation, 15 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ 16 17 #include "mariadb.h" 18 #include "sql_priv.h" 19 #include "unireg.h" // HAVE_* 20 #include "rpl_mi.h" 21 #include "rpl_rli.h" 22 #include "sql_base.h" // close_thread_tables 23 #include <my_dir.h> // For MY_STAT 24 #include "sql_repl.h" // For check_binlog_magic 25 #include "log_event.h" // Format_description_log_event, Log_event, 26 // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT, 27 // PREFIX_SQL_LOAD 28 #include "rpl_utility.h" 29 #include "transaction.h" 30 #include "sql_parse.h" // end_trans, ROLLBACK 31 #include "slave.h" 32 #include <mysql/plugin.h> 33 #include <mysql/service_thd_wait.h> 34 #include "lock.h" 35 #include "sql_table.h" 36 37 static int count_relay_log_space(Relay_log_info* rli); 38 39 /** 40 Current replication state (hash of last GTID executed, per replication 41 domain). 42 */ 43 rpl_slave_state *rpl_global_gtid_slave_state; 44 /* Object used for MASTER_GTID_WAIT(). */ 45 gtid_waiting rpl_global_gtid_waiting; 46 47 const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event"; 48 49 Relay_log_info::Relay_log_info(bool is_slave_recovery) 50 :Slave_reporting_capability("SQL"), 51 replicate_same_server_id(::replicate_same_server_id), 52 info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), 53 sync_counter(0), is_relay_log_recovery(is_slave_recovery), 54 save_temporary_tables(0), 55 mi(0), inuse_relaylog_list(0), last_inuse_relaylog(0), 56 cur_log_old_open_count(0), error_on_rli_init_info(false), 57 group_relay_log_pos(0), event_relay_log_pos(0), 58 group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), 59 last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), 60 abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), 61 gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0), 62 slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE), 63 until_log_pos(0), retried_trans(0), executed_entries(0), 64 sql_delay(0), sql_delay_end(0), EXTRA_DEBUG_fprintf(...)65 until_relay_log_names_defer(false), 66 m_flags(0) 67 { 68 DBUG_ENTER("Relay_log_info::Relay_log_info"); 69 70 relay_log.is_relay_log= TRUE; 71 relay_log_state.init(); 72 #ifdef HAVE_PSI_INTERFACE 73 relay_log.set_psi_keys(key_RELAYLOG_LOCK_index, 74 key_RELAYLOG_COND_relay_log_updated, 75 key_RELAYLOG_COND_bin_log_updated, 76 key_file_relaylog, 77 key_file_relaylog_index, 78 key_RELAYLOG_COND_queue_busy, 79 key_LOCK_relaylog_end_pos); 80 #endif 81 82 group_relay_log_name[0]= event_relay_log_name[0]= 83 group_master_log_name[0]= 0; 84 until_log_name[0]= ign_master_log_name_end[0]= 0; 85 max_relay_log_size= global_system_variables.max_relay_log_size; 86 bzero((char*) &info_file, sizeof(info_file)); 87 bzero((char*) &cache_buf, sizeof(cache_buf)); 88 mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); 89 mysql_mutex_init(key_relay_log_info_data_lock, 90 &data_lock, MY_MUTEX_INIT_FAST); 91 mysql_mutex_init(key_relay_log_info_log_space_lock, 92 &log_space_lock, MY_MUTEX_INIT_FAST); 93 mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL); 94 mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL); 95 mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL); 96 mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL); 97 relay_log.init_pthread_objects(); 98 DBUG_VOID_RETURN; 99 } 100 101 102 Relay_log_info::~Relay_log_info() 103 { 104 DBUG_ENTER("Relay_log_info::~Relay_log_info"); 105 106 reset_inuse_relaylog(); 107 mysql_mutex_destroy(&run_lock); 108 mysql_mutex_destroy(&data_lock); 109 mysql_mutex_destroy(&log_space_lock); 110 mysql_cond_destroy(&data_cond); 111 mysql_cond_destroy(&start_cond); 112 mysql_cond_destroy(&stop_cond); 113 mysql_cond_destroy(&log_space_cond); 114 relay_log.cleanup(); 115 DBUG_VOID_RETURN; 116 } 117 118 119 /** 120 Read the relay_log.info file. 121 122 @param info_fname The name of the file to read from. 123 @retval 0 success 124 @retval 1 failure 125 */ 126 int Relay_log_info::init(const char* info_fname) 127 { 128 char fname[FN_REFLEN+128]; 129 const char* msg = 0; 130 int error = 0; 131 mysql_mutex_t *log_lock; 132 DBUG_ENTER("Relay_log_info::init"); 133 134 if (inited) // Set if this function called 135 DBUG_RETURN(0); 136 my_net_init(NET * net,Vio * vio,void * thd,uint my_flags)137 log_lock= relay_log.get_log_lock(); 138 fn_format(fname, info_fname, mysql_data_home, "", 4+32); 139 mysql_mutex_lock(&data_lock); 140 cur_log_fd = -1; 141 slave_skip_counter=0; 142 abort_pos_wait=0; 143 log_space_limit= relay_log_space_limit; 144 log_space_total= 0; 145 146 if (unlikely(error_on_rli_init_info)) 147 goto err; 148 149 char pattern[FN_REFLEN]; 150 (void) my_realpath(pattern, slave_load_tmpdir, 0); 151 if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "", 152 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS) 153 { 154 mysql_mutex_unlock(&data_lock); 155 sql_print_error("Unable to use slave's temporary directory %s", 156 slave_load_tmpdir); 157 DBUG_RETURN(1); 158 } 159 unpack_filename(slave_patternload_file, pattern); 160 slave_patternload_file_size= strlen(slave_patternload_file); 161 162 /* 163 The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE. 164 Note that the I/O thread flushes it to disk after writing every 165 event, in flush_master_info(mi, 1, ?). 166 */ 167 168 { 169 /* Reports an error and returns, if the --relay-log's path 170 is a directory.*/ 171 if (opt_relay_logname && 172 opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR) 173 { 174 mysql_mutex_unlock(&data_lock); 175 sql_print_error("Path '%s' is a directory name, please specify \ 176 a file name for --relay-log option", opt_relay_logname); net_allocate_new_packet(NET * net,void * thd,uint my_flags)177 DBUG_RETURN(1); 178 } 179 180 /* Reports an error and returns, if the --relay-log-index's path 181 is a directory.*/ 182 if (opt_relaylog_index_name && 183 opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1] 184 == FN_LIBCHAR) 185 { 186 mysql_mutex_unlock(&data_lock); 187 sql_print_error("Path '%s' is a directory name, please specify \ 188 a file name for --relay-log-index option", opt_relaylog_index_name); 189 DBUG_RETURN(1); net_end(NET * net)190 } 191 192 char buf[FN_REFLEN]; 193 const char *ln; 194 static bool name_warning_sent= 0; 195 ln= relay_log.generate_name(opt_relay_logname, "-relay-bin", 196 1, buf); 197 /* We send the warning only at startup, not after every RESET SLAVE */ 198 if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent && 199 !opt_bootstrap) 200 { net_realloc(NET * net,size_t length)201 /* 202 User didn't give us info to name the relay log index file. 203 Picking `hostname`-relay-bin.index like we do, causes replication to 204 fail if this slave's hostname is changed later. So, we would like to 205 instead require a name. But as we don't want to break many existing 206 setups, we only give warning, not error. 207 */ 208 sql_print_warning("Neither --relay-log nor --relay-log-index were used;" 209 " so replication " 210 "may break when this MariaDB server acts as a " 211 "replica and has its hostname changed. Please " 212 "use '--log-basename=#' or '--relay-log=%s' to avoid " 213 "this problem.", ln); 214 name_warning_sent= 1; 215 } 216 217 /* For multimaster, add connection name to relay log filenames */ 218 char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN]; 219 char *buf_relaylog_index_name= opt_relaylog_index_name; 220 221 create_logfile_name_with_suffix(buf_relay_logname, 222 sizeof(buf_relay_logname), 223 ln, 1, &mi->cmp_connection_name); 224 ln= buf_relay_logname; 225 226 if (opt_relaylog_index_name) 227 { 228 buf_relaylog_index_name= buf_relaylog_index_name_buff; 229 create_logfile_name_with_suffix(buf_relaylog_index_name_buff, 230 sizeof(buf_relaylog_index_name_buff), 231 opt_relaylog_index_name, 0, 232 &mi->cmp_connection_name); 233 } 234 235 /* 236 note, that if open() fails, we'll still have index file open 237 but a destructor will take care of that 238 */ 239 mysql_mutex_lock(log_lock); 240 if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) || 241 relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND, 242 (ulong)max_relay_log_size, 1, TRUE)) 243 { 244 mysql_mutex_unlock(log_lock); 245 mysql_mutex_unlock(&data_lock); 246 sql_print_error("Failed when trying to open logs for '%s' in Relay_log_info::init(). Error: %M", ln, my_errno); 247 DBUG_RETURN(1); 248 } 249 mysql_mutex_unlock(log_lock); 250 } 251 252 /* if file does not exist */ 253 if (access(fname,F_OK)) 254 { 255 /* 256 If someone removed the file from underneath our feet, just close 257 the old descriptor and re-create the old file net_data_is_ready(my_socket sd)258 */ 259 if (info_fd >= 0) 260 mysql_file_close(info_fd, MYF(MY_WME)); 261 if ((info_fd= mysql_file_open(key_file_relay_log_info, 262 fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0) 263 { 264 sql_print_error("Failed to create a new relay log info file (" 265 "file '%s', errno %d)", fname, my_errno); 266 msg= current_thd->get_stmt_da()->message(); 267 goto err; 268 } 269 if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, 270 MYF(MY_WME))) 271 { 272 sql_print_error("Failed to create a cache on relay log info file '%s'", 273 fname); 274 msg= current_thd->get_stmt_da()->message(); 275 goto err; 276 } 277 278 /* Init relay log with first entry in the relay index file */ 279 if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */, 280 &msg, 0)) 281 { 282 sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)"); 283 goto err; 284 } 285 group_master_log_name[0]= 0; 286 group_master_log_pos= 0; 287 } 288 else // file exists 289 { 290 if (info_fd >= 0) 291 reinit_io_cache(&info_file, READ_CACHE, 0L,0,0); 292 else 293 { 294 int error=0; 295 if ((info_fd= mysql_file_open(key_file_relay_log_info, 296 fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0) 297 { 298 sql_print_error("\ 299 Failed to open the existing relay log info file '%s' (errno %d)", 300 fname, my_errno); 301 error= 1; 302 } 303 else if (init_io_cache(&info_file, info_fd, 304 IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME))) 305 { 306 sql_print_error("Failed to create a cache on relay log info file '%s'", 307 fname); 308 error= 1; 309 } 310 if (unlikely(error)) 311 { 312 if (info_fd >= 0) 313 mysql_file_close(info_fd, MYF(0)); 314 info_fd= -1; 315 mysql_mutex_lock(log_lock); 316 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); 317 mysql_mutex_unlock(log_lock); 318 mysql_mutex_unlock(&data_lock); 319 DBUG_RETURN(1); net_clear(NET * net,my_bool clear_buffer)320 } 321 } 322 323 int relay_log_pos, master_log_pos, lines; 324 char *first_non_digit; 325 326 /* 327 Starting from MySQL 5.6.x, relay-log.info has a new format. 328 Now, its first line contains the number of lines in the file. 329 By reading this number we can determine which version our master.info 330 comes from. We can't simply count the lines in the file, since 331 versions before 5.6.x could generate files with more lines than 332 needed. If first line doesn't contain a number, or if it 333 contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY, 334 then the file is treated like a file from pre-5.6.x version. 335 There is no ambiguity when reading an old master.info: before 336 5.6.x, the first line contained the binlog's name, which is 337 either empty or has an extension (contains a '.'), so can't be 338 confused with an integer. 339 340 So we're just reading first line and trying to figure which 341 version is this. 342 */ 343 344 /* 345 The first row is temporarily stored in mi->master_log_name, if 346 it is line count and not binlog name (new format) it will be 347 overwritten by the second row later. 348 */ 349 if (init_strvar_from_file(group_relay_log_name, 350 sizeof(group_relay_log_name), 351 &info_file, "")) 352 { 353 msg="Error reading slave log configuration"; 354 goto err; 355 } 356 357 lines= strtoul(group_relay_log_name, &first_non_digit, 10); 358 359 if (group_relay_log_name[0] != '\0' && 360 *first_non_digit == '\0' && 361 lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) 362 { 363 DBUG_PRINT("info", ("relay_log_info file is in new format.")); 364 /* Seems to be new format => read relay log name from next line */ 365 if (init_strvar_from_file(group_relay_log_name, 366 sizeof(group_relay_log_name), 367 &info_file, "")) 368 { 369 msg="Error reading slave log configuration"; 370 goto err; 371 } 372 } 373 else 374 DBUG_PRINT("info", ("relay_log_info file is in old format.")); 375 376 if (init_intvar_from_file(&relay_log_pos, 377 &info_file, BIN_LOG_HEADER_SIZE) || net_flush(NET * net)378 init_strvar_from_file(group_master_log_name, 379 sizeof(group_master_log_name), 380 &info_file, "") || 381 init_intvar_from_file(&master_log_pos, &info_file, 0) || 382 (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY && 383 init_intvar_from_file(&sql_delay, &info_file, 0))) 384 { 385 msg="Error reading slave log configuration"; 386 goto err; 387 } 388 389 strmake_buf(event_relay_log_name,group_relay_log_name); 390 group_relay_log_pos= event_relay_log_pos= relay_log_pos; 391 group_master_log_pos= master_log_pos; 392 393 if (is_relay_log_recovery && init_recovery(mi, &msg)) 394 goto err; 395 396 relay_log_state.load(rpl_global_gtid_slave_state); 397 if (init_relay_log_pos(this, 398 group_relay_log_name, 399 group_relay_log_pos, 400 0 /* no data lock*/, 401 &msg, 0)) 402 { 403 sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)", 404 group_relay_log_name, group_relay_log_pos); 405 goto err; 406 } 407 } my_net_write(NET * net,const uchar * packet,size_t len)408 409 DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu", 410 my_b_tell(cur_log), event_relay_log_pos)); 411 DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE); 412 DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos); 413 414 /* 415 Now change the cache from READ to WRITE - must do this 416 before Relay_log_info::flush() 417 */ 418 reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1); 419 if (unlikely((error= flush()))) 420 { 421 msg= "Failed to flush relay log info file"; 422 goto err; 423 } 424 if (count_relay_log_space(this)) 425 { 426 msg="Error counting relay log space"; 427 goto err; 428 } 429 inited= 1; 430 error_on_rli_init_info= false; 431 mysql_mutex_unlock(&data_lock); 432 DBUG_RETURN(0); 433 434 err: 435 error_on_rli_init_info= true; 436 if (msg) 437 sql_print_error("%s", msg); 438 end_io_cache(&info_file); 439 if (info_fd >= 0) 440 mysql_file_close(info_fd, MYF(0)); 441 info_fd= -1; 442 mysql_mutex_lock(log_lock); 443 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); 444 mysql_mutex_unlock(log_lock); 445 mysql_mutex_unlock(&data_lock); 446 DBUG_RETURN(1); 447 } 448 449 450 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo) 451 { 452 MY_STAT s; 453 DBUG_ENTER("add_relay_log"); 454 if (!mysql_file_stat(key_file_relaylog, 455 linfo->log_file_name, &s, MYF(0))) 456 { 457 sql_print_error("log %s listed in the index, but failed to stat", 458 linfo->log_file_name); 459 DBUG_RETURN(1); 460 } 461 my_atomic_add64_explicit((volatile int64*)(&rli->log_space_total), 462 s.st_size, MY_MEMORY_ORDER_RELAXED); 463 DBUG_PRINT("info",("log_space_total: %llu", rli->log_space_total)); 464 DBUG_RETURN(0); 465 } 466 467 468 static int count_relay_log_space(Relay_log_info* rli) 469 { 470 LOG_INFO linfo; 471 DBUG_ENTER("count_relay_log_space"); 472 my_atomic_store64_explicit((volatile int64*)(&rli->log_space_total), 0, 473 MY_MEMORY_ORDER_RELAXED); 474 if (rli->relay_log.find_log_pos(&linfo, NullS, 1)) 475 { 476 sql_print_error("Could not find first log while counting relay log space"); 477 DBUG_RETURN(1); 478 } 479 do 480 { 481 if (add_relay_log(rli,&linfo)) net_write_command(NET * net,uchar command,const uchar * header,size_t head_len,const uchar * packet,size_t len)482 DBUG_RETURN(1); 483 } while (!rli->relay_log.find_next_log(&linfo, 1)); 484 /* 485 As we have counted everything, including what may have written in a 486 preceding write, we must reset bytes_written, or we may count some space 487 twice. 488 */ 489 rli->relay_log.reset_bytes_written(); 490 DBUG_RETURN(0); 491 } 492 493 494 /* 495 Reset UNTIL condition for Relay_log_info 496 497 SYNOPSYS 498 clear_until_condition() 499 rli - Relay_log_info structure where UNTIL condition should be reset 500 */ 501 502 void Relay_log_info::clear_until_condition() 503 { 504 DBUG_ENTER("clear_until_condition"); 505 506 until_condition= Relay_log_info::UNTIL_NONE; 507 until_log_name[0]= 0; 508 until_log_pos= 0; 509 until_relay_log_names_defer= false; 510 511 DBUG_VOID_RETURN; 512 } 513 514 515 /* 516 Read the correct format description event for starting to replicate from 517 a given position in a relay log file. 518 */ 519 Format_description_log_event * 520 read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, 521 const char **errmsg) 522 { 523 Log_event *ev; 524 Format_description_log_event *fdev; 525 bool found= false; 526 527 /* 528 By default the relay log is in binlog format 3 (4.0). 529 Even if format is 4, this will work enough to read the first event 530 (Format_desc) (remember that format 4 is just lenghtened compared to format 531 3; format 3 is a prefix of format 4). 532 */ 533 fdev= new Format_description_log_event(3); 534 535 while (!found) 536 { 537 Log_event_type typ; 538 539 /* 540 Read the possible Format_description_log_event; if position 541 was 4, no need, it will be read naturally. 542 */ 543 DBUG_PRINT("info",("looking for a Format_description_log_event")); 544 545 if (my_b_tell(cur_log) >= start_pos) 546 break; 547 548 if (!(ev= Log_event::read_log_event(cur_log, fdev, 549 opt_slave_sql_verify_checksum))) 550 { 551 DBUG_PRINT("info",("could not read event, cur_log->error=%d", 552 cur_log->error)); 553 if (cur_log->error) /* not EOF */ 554 { 555 *errmsg= "I/O error reading event at position 4"; 556 delete fdev; 557 return NULL; 558 } 559 break; 560 } 561 typ= ev->get_type_code(); 562 if (typ == FORMAT_DESCRIPTION_EVENT) 563 { 564 Format_description_log_event *old= fdev; 565 DBUG_PRINT("info",("found Format_description_log_event")); 566 fdev= (Format_description_log_event*) ev; net_write_buff(NET * net,const uchar * packet,size_t len)567 fdev->copy_crypto_data(old); 568 delete old; 569 570 /* 571 As ev was returned by read_log_event, it has passed is_valid(), so 572 my_malloc() in ctor worked, no need to check again. 573 */ 574 /* 575 Ok, we found a Format_description event. But it is not sure that this 576 describes the whole relay log; indeed, one can have this sequence 577 (starting from position 4): 578 Format_desc (of slave) 579 Rotate (of master) 580 Format_desc (of master) 581 So the Format_desc which really describes the rest of the relay log 582 is the 3rd event (it can't be further than that, because we rotate 583 the relay log when we queue a Rotate event from the master). 584 But what describes the Rotate is the first Format_desc. 585 So what we do is: 586 go on searching for Format_description events, until you exceed the 587 position (argument 'pos') or until you find another event than Rotate 588 or Format_desc. 589 */ 590 } 591 else if (typ == START_ENCRYPTION_EVENT) 592 { 593 if (fdev->start_decryption((Start_encryption_log_event*) ev)) 594 { 595 *errmsg= "Unable to set up decryption of binlog."; 596 delete ev; 597 delete fdev; 598 return NULL; 599 } 600 delete ev; 601 } 602 else 603 { 604 DBUG_PRINT("info",("found event of another type=%d", typ)); 605 found= (typ != ROTATE_EVENT); 606 delete ev; 607 } 608 } 609 return fdev; 610 } 611 612 613 /* 614 Open the given relay log 615 616 SYNOPSIS 617 init_relay_log_pos() 618 rli Relay information (will be initialized) 619 log Name of relay log file to read from. NULL = First log 620 pos Position in relay log file 621 need_data_lock Set to 1 if this functions should do mutex locks 622 errmsg Store pointer to error message here 623 look_for_description_event 624 1 if we should look for such an event. We only need 625 this when the SQL thread starts and opens an existing net_real_write(NET * net,const uchar * packet,size_t len)626 relay log and has to execute it (possibly from an 627 offset >4); then we need to read the first event of 628 the relay log to be able to parse the events we have 629 to execute. 630 631 DESCRIPTION 632 - Close old open relay log files. 633 - If we are using the same relay log as the running IO-thread, then set 634 rli->cur_log to point to the same IO_CACHE entry. 635 - If not, open the 'log' binary file. 636 637 TODO 638 - check proper initialization of group_master_log_name/group_master_log_pos 639 640 RETURN VALUES 641 0 ok 642 1 error. errmsg is set to point to the error message 643 */ 644 645 int init_relay_log_pos(Relay_log_info* rli,const char* log, 646 ulonglong pos, bool need_data_lock, 647 const char** errmsg, 648 bool look_for_description_event) 649 { 650 DBUG_ENTER("init_relay_log_pos"); 651 DBUG_PRINT("info", ("pos: %lu", (ulong) pos)); 652 653 *errmsg=0; 654 mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); 655 656 if (need_data_lock) 657 mysql_mutex_lock(&rli->data_lock); 658 659 /* 660 Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER 661 is, too, and init_slave() too; these 2 functions allocate a description 662 event in init_relay_log_pos, which is not freed by the terminating SQL slave 663 thread as that thread is not started by these functions. So we have to free 664 the description_event here, in case, so that there is no memory leak in 665 running, say, CHANGE MASTER. 666 */ 667 delete rli->relay_log.description_event_for_exec; 668 /* 669 By default the relay log is in binlog format 3 (4.0). 670 Even if format is 4, this will work enough to read the first event 671 (Format_desc) (remember that format 4 is just lenghtened compared to format 672 3; format 3 is a prefix of format 4). 673 */ 674 rli->relay_log.description_event_for_exec= new 675 Format_description_log_event(3); 676 677 mysql_mutex_lock(log_lock); 678 679 /* Close log file and free buffers if it's already open */ 680 if (rli->cur_log_fd >= 0) 681 { 682 end_io_cache(&rli->cache_buf); 683 mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); 684 rli->cur_log_fd = -1; 685 } 686 687 rli->group_relay_log_pos = rli->event_relay_log_pos = pos; 688 rli->clear_flag(Relay_log_info::IN_STMT); 689 rli->clear_flag(Relay_log_info::IN_TRANSACTION); 690 691 /* 692 Test to see if the previous run was with the skip of purging 693 If yes, we do not purge when we restart 694 */ 695 if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1)) 696 { 697 *errmsg="Could not find first log during relay log initialization"; 698 goto err; 699 } 700 701 if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1)) 702 { 703 *errmsg="Could not find target log during relay log initialization"; 704 goto err; 705 } 706 strmake_buf(rli->group_relay_log_name,rli->linfo.log_file_name); 707 strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name); 708 if (rli->relay_log.is_active(rli->linfo.log_file_name)) 709 { 710 /* 711 The IO thread is using this log file. 712 In this case, we will use the same IO_CACHE pointer to 713 read data as the IO thread is using to write data. 714 */ 715 my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0); 716 if (check_binlog_magic(rli->cur_log,errmsg)) 717 goto err; 718 rli->cur_log_old_open_count=rli->relay_log.get_open_count(); 719 } 720 else 721 { 722 /* 723 Open the relay log and set rli->cur_log to point at this one 724 */ 725 if ((rli->cur_log_fd=open_binlog(&rli->cache_buf, 726 rli->linfo.log_file_name,errmsg)) < 0) 727 goto err; 728 rli->cur_log = &rli->cache_buf; 729 } 730 /* 731 In all cases, check_binlog_magic() has been called so we're at offset 4 for 732 sure. 733 */ 734 if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */ 735 { 736 if (look_for_description_event) 737 { 738 Format_description_log_event *fdev; 739 if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg))) 740 goto err; 741 delete rli->relay_log.description_event_for_exec; 742 rli->relay_log.description_event_for_exec= fdev; 743 } 744 my_b_seek(rli->cur_log,(off_t)pos); 745 DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu", 746 my_b_tell(rli->cur_log), rli->event_relay_log_pos)); 747 748 } 749 750 err: 751 /* 752 If we don't purge, we can't honour relay_log_space_limit ; 753 silently discard it 754 */ 755 if (!relay_log_purge) 756 rli->log_space_limit= 0; 757 mysql_cond_broadcast(&rli->data_cond); 758 759 mysql_mutex_unlock(log_lock); 760 761 if (need_data_lock) 762 mysql_mutex_unlock(&rli->data_lock); 763 if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg) 764 *errmsg= "Invalid Format_description log event; could be out of memory"; 765 766 DBUG_PRINT("info", ("Returning %d from init_relay_log_pos", (*errmsg)?1:0)); 767 768 DBUG_RETURN ((*errmsg) ? 1 : 0); 769 } net_safe_read(NET * net,uchar * buff,size_t length,thr_alarm_t * alarmed)770 771 772 /* 773 Waits until the SQL thread reaches (has executed up to) the 774 log/position or timed out. 775 776 SYNOPSIS 777 wait_for_pos() 778 thd client thread that sent SELECT MASTER_POS_WAIT 779 log_name log name to wait for 780 log_pos position to wait for 781 timeout timeout in seconds before giving up waiting 782 783 NOTES 784 timeout is longlong whereas it should be ulong ; but this is 785 to catch if the user submitted a negative timeout. 786 787 RETURN VALUES 788 -2 improper arguments (log_pos<0) 789 or slave not running, or master info changed 790 during the function's execution, 791 or client thread killed. -2 is translated to NULL by caller 792 -1 timed out 793 >=0 number of log events the function had to wait 794 before reaching the desired log/position 795 */ 796 797 int Relay_log_info::wait_for_pos(THD* thd, String* log_name, 798 longlong log_pos, 799 longlong timeout) 800 { 801 int event_count = 0; 802 ulong init_abort_pos_wait; 803 int error=0; 804 struct timespec abstime; // for timeout checking 805 PSI_stage_info old_stage; 806 DBUG_ENTER("Relay_log_info::wait_for_pos"); 807 808 if (!inited) 809 DBUG_RETURN(-2); 810 811 DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu", 812 log_name->c_ptr(), (ulong) log_pos, (ulong) timeout)); 813 814 set_timespec(abstime,timeout); 815 mysql_mutex_lock(&data_lock); 816 thd->ENTER_COND(&data_cond, &data_lock, 817 &stage_waiting_for_the_slave_thread_to_advance_position, 818 &old_stage); 819 /* 820 This function will abort when it notices that some CHANGE MASTER or 821 RESET MASTER has changed the master info. 822 To catch this, these commands modify abort_pos_wait ; We just monitor 823 abort_pos_wait and see if it has changed. 824 Why do we have this mechanism instead of simply monitoring slave_running 825 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that 826 the SQL thread be stopped? 827 This is becasue if someones does: 828 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE; 829 the change may happen very quickly and we may not notice that 830 slave_running briefly switches between 1/0/1. 831 */ 832 init_abort_pos_wait= abort_pos_wait; 833 834 /* 835 We'll need to 836 handle all possible log names comparisons (e.g. 999 vs 1000). 837 We use ulong for string->number conversion ; this is no 838 stronger limitation than in find_uniq_filename in sql/log.cc 839 */ 840 ulong log_name_extension; 841 char log_name_tmp[FN_REFLEN]; //make a char[] from String 842 843 strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1)); 844 845 char *p= fn_ext(log_name_tmp); 846 char *p_end; 847 if (!*p || log_pos<0) 848 { 849 error= -2; //means improper arguments 850 goto err; 851 } 852 // Convert 0-3 to 4 853 log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE); 854 /* p points to '.' */ 855 log_name_extension= strtoul(++p, &p_end, 10); 856 /* 857 p_end points to the first invalid character. 858 If it equals to p, no digits were found, error. 859 If it contains '\0' it means conversion went ok. 860 */ 861 if (p_end==p || *p_end) 862 { 863 error= -2; 864 goto err; 865 } 866 867 /* The "compare and wait" main loop */ 868 while (!thd->killed && 869 init_abort_pos_wait == abort_pos_wait && 870 slave_running) 871 { 872 bool pos_reached; 873 int cmp_result= 0; 874 875 DBUG_PRINT("info", 876 ("init_abort_pos_wait: %ld abort_pos_wait: %ld", 877 init_abort_pos_wait, abort_pos_wait)); 878 DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu", 879 group_master_log_name, (ulong) group_master_log_pos)); 880 881 /* 882 group_master_log_name can be "", if we are just after a fresh 883 replication start or after a CHANGE MASTER TO MASTER_HOST/PORT 884 (before we have executed one Rotate event from the master) or 885 (rare) if the user is doing a weird slave setup (see next 886 paragraph). If group_master_log_name is "", we assume we don't 887 have enough info to do the comparison yet, so we just wait until 888 more data. In this case master_log_pos is always 0 except if 889 somebody (wrongly) sets this slave to be a slave of itself 890 without using --replicate-same-server-id (an unsupported 891 configuration which does nothing), then group_master_log_pos 892 will grow and group_master_log_name will stay "". 893 */ 894 if (*group_master_log_name) 895 { 896 char *basename= (group_master_log_name + 897 dirname_length(group_master_log_name)); 898 /* 899 First compare the parts before the extension. 900 Find the dot in the master's log basename, 901 and protect against user's input error : 902 if the names do not match up to '.' included, return error 903 */ 904 char *q= (char*)(fn_ext(basename)+1); 905 if (strncmp(basename, log_name_tmp, (int)(q-basename))) 906 { 907 error= -2; 908 break; 909 } 910 // Now compare extensions. 911 char *q_end; 912 ulong group_master_log_name_extension= strtoul(q, &q_end, 10); 913 if (group_master_log_name_extension < log_name_extension) 914 cmp_result= -1 ; 915 else 916 cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ; 917 918 pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) || 919 cmp_result > 0); 920 if (pos_reached || thd->killed) 921 break; 922 } 923 924 //wait for master update, with optional timeout. 925 926 DBUG_PRINT("info",("Waiting for master update")); 927 /* 928 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it 929 will wake us up. 930 */ 931 thd_wait_begin(thd, THD_WAIT_BINLOG); 932 if (timeout > 0) 933 { 934 /* 935 Note that mysql_cond_timedwait checks for the timeout 936 before for the condition ; i.e. it returns ETIMEDOUT 937 if the system time equals or exceeds the time specified by abstime 938 before the condition variable is signaled or broadcast, _or_ if 939 the absolute time specified by abstime has already passed at the time 940 of the call. 941 For that reason, mysql_cond_timedwait will do the "timeoutting" job 942 even if its condition is always immediately signaled (case of a loaded 943 master). 944 */ 945 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime); 946 } 947 else 948 mysql_cond_wait(&data_cond, &data_lock); 949 thd_wait_end(thd); 950 DBUG_PRINT("info",("Got signal of master update or timed out")); 951 if (error == ETIMEDOUT || error == ETIME) 952 { 953 error= -1; 954 break; 955 } 956 error=0; 957 event_count++; 958 DBUG_PRINT("info",("Testing if killed or SQL thread not running")); 959 } 960 961 err: 962 thd->EXIT_COND(&old_stage); 963 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \ 964 improper_arguments: %d timed_out: %d", 965 thd->killed_errno(), 966 (int) (init_abort_pos_wait != abort_pos_wait), 967 (int) slave_running, 968 (int) (error == -2), 969 (int) (error == -1))); 970 if (thd->killed || init_abort_pos_wait != abort_pos_wait || 971 !slave_running) 972 { 973 error= -2; 974 } 975 DBUG_RETURN( error ? error : event_count ); 976 } 977 978 979 void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, 980 rpl_group_info *rgi, 981 bool skip_lock) 982 { 983 DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos"); 984 985 if (skip_lock) 986 mysql_mutex_assert_owner(&data_lock); 987 else 988 mysql_mutex_lock(&data_lock); 989 990 rgi->inc_event_relay_log_pos(); 991 DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", 992 (long) log_pos, (long) group_master_log_pos)); 993 if (rgi->is_parallel_exec) 994 { 995 /* In case of parallel replication, do not update the position backwards. */ 996 int cmp= compare_log_name(group_relay_log_name, rgi->event_relay_log_name); 997 if (cmp < 0) 998 { 999 group_relay_log_pos= rgi->future_event_relay_log_pos; 1000 strmake_buf(group_relay_log_name, rgi->event_relay_log_name); 1001 } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos) 1002 group_relay_log_pos= rgi->future_event_relay_log_pos; 1003 1004 /* 1005 In the parallel case we need to update the master_log_name here, rather 1006 than in Rotate_log_event::do_update_pos(). 1007 */ 1008 cmp= compare_log_name(group_master_log_name, rgi->future_event_master_log_name); 1009 if (cmp <= 0) 1010 { 1011 if (cmp < 0) 1012 { 1013 strcpy(group_master_log_name, rgi->future_event_master_log_name); 1014 group_master_log_pos= log_pos; 1015 } 1016 else if (group_master_log_pos < log_pos) 1017 group_master_log_pos= log_pos; 1018 } 1019 1020 /* 1021 In the parallel case, we only update the Seconds_Behind_Master at the 1022 end of a transaction. In the non-parallel case, the value is updated as 1023 soon as an event is read from the relay log; however this would be too 1024 confusing for the user, seeing the slave reported as up-to-date when 1025 potentially thousands of events are still queued up for worker threads 1026 waiting for execution. 1027 */ 1028 if (rgi->last_master_timestamp && 1029 rgi->last_master_timestamp > last_master_timestamp) 1030 last_master_timestamp= rgi->last_master_timestamp; 1031 } 1032 else 1033 { 1034 /* Non-parallel case. */ 1035 group_relay_log_pos= event_relay_log_pos; 1036 strmake_buf(group_relay_log_name, event_relay_log_name); 1037 notify_group_relay_log_name_update(); 1038 if (log_pos) // not 3.23 binlogs (no log_pos there) and not Stop_log_event 1039 group_master_log_pos= log_pos; 1040 } 1041 1042 /* 1043 If the slave does not support transactions and replicates a transaction, 1044 users should not trust group_master_log_pos (which they can display with 1045 SHOW SLAVE STATUS or read from relay-log.info), because to compute 1046 group_master_log_pos the slave relies on log_pos stored in the master's 1047 binlog, but if we are in a master's transaction these positions are always 1048 the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does 1049 not advance as it should on the non-transactional slave (it advances by 1050 big leaps, whereas it should advance by small leaps). 1051 */ 1052 /* 1053 In 4.x we used the event's len to compute the positions here. This is 1054 wrong if the event was 3.23/4.0 and has been converted to 5.0, because 1055 then the event's len is not what is was in the master's binlog, so this 1056 will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0 1057 replication: Exec_master_log_pos is wrong). Only way to solve this is to 1058 have the original offset of the end of the event the relay log. This is 1059 what we do in 5.0: log_pos has become "end_log_pos" (because the real use 1060 of log_pos in 4.0 was to compute the end_log_pos; so better to store 1061 end_log_pos instead of begin_log_pos. 1062 If we had not done this fix here, the problem would also have appeared 1063 when the slave and master are 5.0 but with different event length (for 1064 example the slave is more recent than the master and features the event 1065 UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in 1066 SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this 1067 value which would lead to badly broken replication. 1068 Even the relay_log_pos will be corrupted in this case, because the len is 1069 the relay log is not "val". 1070 With the end_log_pos solution, we avoid computations involving lengthes. 1071 */ 1072 mysql_cond_broadcast(&data_cond); 1073 if (!skip_lock) 1074 mysql_mutex_unlock(&data_lock); 1075 DBUG_VOID_RETURN; 1076 } 1077 1078 1079 void Relay_log_info::close_temporary_tables() 1080 { 1081 DBUG_ENTER("Relay_log_info::close_temporary_tables"); 1082 1083 TMP_TABLE_SHARE *share; 1084 TABLE *table; 1085 1086 if (!save_temporary_tables) 1087 { 1088 /* There are no temporary tables. */ 1089 DBUG_VOID_RETURN; 1090 } 1091 1092 while ((share= save_temporary_tables->pop_front())) 1093 { 1094 /* 1095 Iterate over the list of tables for this TABLE_SHARE and close them. 1096 */ 1097 while ((table= share->all_tmp_tables.pop_front())) 1098 { 1099 DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'", 1100 table->s->db.str, table->s->table_name.str)); 1101 1102 /* Reset in_use as the table may have been created by another thd */ 1103 table->in_use= 0; 1104 /* 1105 Lets not free TABLE_SHARE here as there could be multiple TABLEs opened 1106 for the same table (TABLE_SHARE). 1107 */ 1108 closefrm(table); 1109 my_free(table); 1110 } 1111 1112 /* 1113 Don't ask for disk deletion. For now, anyway they will be deleted when 1114 slave restarts, but it is a better intention to not delete them. 1115 */ 1116 1117 free_table_share(share); 1118 my_free(share); 1119 } 1120 1121 /* By now, there mustn't be any elements left in the list. */ 1122 DBUG_ASSERT(save_temporary_tables->is_empty()); 1123 1124 my_free(save_temporary_tables); 1125 save_temporary_tables= NULL; 1126 slave_open_temp_tables= 0; 1127 1128 DBUG_VOID_RETURN; 1129 } 1130 1131 /* 1132 purge_relay_logs() 1133 1134 @param rli Relay log information 1135 @param thd thread id. May be zero during startup 1136 1137 NOTES 1138 Assumes to have a run lock on rli and that no slave thread are running. 1139 */ 1140 1141 int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, 1142 const char** errmsg) 1143 { 1144 int error=0; 1145 const char *ln; 1146 char name_buf[FN_REFLEN]; 1147 DBUG_ENTER("purge_relay_logs"); 1148 1149 /* 1150 Even if rli->inited==0, we still try to empty rli->master_log_* variables. 1151 Indeed, rli->inited==0 does not imply that they already are empty. 1152 It could be that slave's info initialization partly succeeded : 1153 for example if relay-log.info existed but *relay-bin*.* 1154 have been manually removed, Relay_log_info::init() reads the old 1155 relay-log.info and fills rli->master_log_*, then Relay_log_info::init() 1156 checks for the existence of the relay log, this fails and 1157 Relay_log_info::init() leaves rli->inited to 0. 1158 In that pathological case, rli->master_log_pos* will be properly reinited 1159 at the next START SLAVE (as RESET SLAVE or CHANGE 1160 MASTER, the callers of purge_relay_logs, will delete bogus *.info files 1161 or replace them with correct files), however if the user does SHOW SLAVE 1162 STATUS before START SLAVE, he will see old, confusing rli->master_log_*. 1163 In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS 1164 to display fine in any case. 1165 */ 1166 1167 rli->group_master_log_name[0]= 0; 1168 rli->group_master_log_pos= 0; 1169 1170 if (!rli->inited) 1171 { 1172 DBUG_PRINT("info", ("rli->inited == 0")); 1173 if (rli->error_on_rli_init_info) 1174 { 1175 ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin", 1176 1, name_buf); 1177 1178 if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE)) 1179 { 1180 sql_print_error("Unable to purge relay log files. Failed to open relay " 1181 "log index file:%s.", rli->relay_log.get_index_fname()); 1182 DBUG_RETURN(1); 1183 } 1184 mysql_mutex_lock(rli->relay_log.get_log_lock()); 1185 if (rli->relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND, 1186 (ulong)(rli->max_relay_log_size ? rli->max_relay_log_size : 1187 max_binlog_size), 1, TRUE)) 1188 { 1189 sql_print_error("Unable to purge relay log files. Failed to open relay " 1190 "log file:%s.", rli->relay_log.get_log_fname()); 1191 mysql_mutex_unlock(rli->relay_log.get_log_lock()); 1192 DBUG_RETURN(1); 1193 } 1194 mysql_mutex_unlock(rli->relay_log.get_log_lock()); 1195 } 1196 else 1197 DBUG_RETURN(0); 1198 } 1199 else 1200 { 1201 DBUG_ASSERT(rli->slave_running == 0); 1202 DBUG_ASSERT(rli->mi->slave_running == 0); 1203 } 1204 mysql_mutex_lock(&rli->data_lock); 1205 1206 /* 1207 we close the relay log fd possibly left open by the slave SQL thread, 1208 to be able to delete it; the relay log fd possibly left open by the slave 1209 I/O thread will be closed naturally in reset_logs() by the 1210 close(LOG_CLOSE_TO_BE_OPENED) call 1211 */ my_net_read(NET * net)1212 if (rli->cur_log_fd >= 0) 1213 { 1214 end_io_cache(&rli->cache_buf); 1215 mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); 1216 rli->cur_log_fd= -1; 1217 } 1218 1219 if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0, 0)) 1220 { 1221 *errmsg = "Failed during log reset"; 1222 error=1; 1223 goto err; 1224 } 1225 rli->relay_log_state.load(rpl_global_gtid_slave_state); 1226 if (!just_reset) 1227 { 1228 /* Save name of used relay log file */ 1229 strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); 1230 strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); 1231 rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; 1232 my_atomic_store64_explicit((volatile int64*)(&rli->log_space_total), 0, 1233 MY_MEMORY_ORDER_RELAXED); 1234 if (count_relay_log_space(rli)) 1235 { 1236 *errmsg= "Error counting relay log space"; my_net_read_packet(NET * net,my_bool read_from_server)1237 error=1; 1238 goto err; 1239 } 1240 error= init_relay_log_pos(rli, rli->group_relay_log_name, 1241 rli->group_relay_log_pos, 1242 0 /* do not need data lock */, errmsg, 0); 1243 } 1244 else my_net_read_packet_reallen(NET * net,my_bool read_from_server,ulong * reallen)1245 { 1246 /* Ensure relay log names are not used */ 1247 rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0; 1248 } 1249 1250 if (!rli->inited && rli->error_on_rli_init_info) 1251 { 1252 mysql_mutex_lock(rli->relay_log.get_log_lock()); 1253 rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); 1254 mysql_mutex_unlock(rli->relay_log.get_log_lock()); 1255 } 1256 err: 1257 DBUG_PRINT("info",("log_space_total: %llu",rli->log_space_total)); 1258 mysql_mutex_unlock(&rli->data_lock); 1259 DBUG_RETURN(error); 1260 } 1261 1262 1263 /* 1264 Check if condition stated in UNTIL clause of START SLAVE is reached. 1265 SYNOPSYS 1266 Relay_log_info::is_until_satisfied() 1267 master_beg_pos position of the beginning of to be executed event 1268 (not log_pos member of the event that points to the 1269 beginning of the following event) 1270 1271 1272 DESCRIPTION 1273 Checks if UNTIL condition is reached. Uses caching result of last 1274 comparison of current log file name and target log file name. So cached 1275 value should be invalidated if current log file name changes 1276 (see Relay_log_info::notify_... functions). 1277 1278 This caching is needed to avoid of expensive string comparisons and 1279 strtol() conversions needed for log names comparison. We don't need to 1280 compare them each time this function is called, we only need to do this 1281 when current log name changes. If we have UNTIL_MASTER_POS condition we 1282 need to do this only after Rotate_log_event::do_apply_event() (which is 1283 rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS 1284 condition then we should invalidate cached comarison value after 1285 inc_group_relay_log_pos() which called for each group of events (so we 1286 have some benefit if we have something like queries that use 1287 autoincrement or if we have transactions). 1288 1289 Should be called ONLY if until_condition != UNTIL_NONE ! 1290 1291 In the parallel execution mode and UNTIL_MASTER_POS the file name is 1292 presented by future_event_master_log_name which may be ahead of 1293 group_master_log_name. Log_event::log_pos does relate to it nevertheless 1294 so the pair comprises a correct binlog coordinate. 1295 Internal group events and events that have zero log_pos also 1296 produce the zero for the local log_pos which may not lead to the 1297 function falsely return true. 1298 In UNTIL_RELAY_POS the original caching and notification are simplified 1299 to straightforward files comparison when the current event can't be 1300 a part of an event group. 1301 1302 RETURN VALUE 1303 true - condition met or error happened (condition seems to have 1304 bad log file name) 1305 false - condition not met 1306 */ 1307 1308 bool Relay_log_info::is_until_satisfied(Log_event *ev) 1309 { 1310 const char *log_name; 1311 ulonglong log_pos; 1312 /* Prevents stopping within transaction; needed solely for Relay UNTIL. */ 1313 bool in_trans= false; 1314 1315 DBUG_ENTER("Relay_log_info::is_until_satisfied"); 1316 1317 if (until_condition == UNTIL_MASTER_POS) 1318 { 1319 log_name= (mi->using_parallel() ? future_event_master_log_name 1320 : group_master_log_name); 1321 log_pos= (get_flag(Relay_log_info::IN_TRANSACTION) || !ev || !ev->log_pos) ? 1322 (mi->using_parallel() ? 0 : group_master_log_pos) : 1323 ev->log_pos - ev->data_written; 1324 } 1325 else 1326 { 1327 DBUG_ASSERT(until_condition == UNTIL_RELAY_POS); 1328 if (!mi->using_parallel()) 1329 { 1330 log_name= group_relay_log_name; 1331 log_pos= group_relay_log_pos; 1332 } 1333 else 1334 { 1335 log_name= event_relay_log_name; 1336 log_pos= event_relay_log_pos; 1337 in_trans= get_flag(Relay_log_info::IN_TRANSACTION); 1338 /* 1339 until_log_names_cmp_result is set to UNKNOWN either 1340 - by a non-group event *and* only when it is in the middle of a group 1341 - or by a group event when the preceding group made the above 1342 non-group event to defer the resetting. 1343 */ 1344 if ((ev && !Log_event::is_group_event(ev->get_type_code()))) 1345 { 1346 if (in_trans) 1347 { 1348 until_relay_log_names_defer= true; 1349 } 1350 else 1351 { 1352 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN; 1353 until_relay_log_names_defer= false; 1354 } 1355 } 1356 else if (!in_trans && until_relay_log_names_defer) 1357 { 1358 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN; 1359 until_relay_log_names_defer= false; 1360 } 1361 } 1362 } 1363 1364 DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%llu", 1365 group_master_log_name, group_master_log_pos)); 1366 DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%llu", 1367 group_relay_log_name, group_relay_log_pos)); 1368 DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%llu", 1369 until_condition == UNTIL_MASTER_POS ? "master" : "relay", 1370 log_name, log_pos)); 1371 DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%llu", 1372 until_condition == UNTIL_MASTER_POS ? "master" : "relay", 1373 until_log_name, until_log_pos)); 1374 1375 if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN) 1376 { 1377 /* 1378 We have no cached comparison results so we should compare log names 1379 and cache result. 1380 If we are after RESET SLAVE, and the SQL slave thread has not processed 1381 any event yet, it could be that group_master_log_name is "". In that case, 1382 just wait for more events (as there is no sensible comparison to do). 1383 */ 1384 1385 if (*log_name) 1386 { 1387 const char *basename= log_name + dirname_length(log_name); 1388 1389 const char *q= (const char*)(fn_ext(basename)+1); 1390 if (strncmp(basename, until_log_name, (int)(q-basename)) == 0) 1391 { 1392 /* Now compare extensions. */ 1393 char *q_end; 1394 ulong log_name_extension= strtoul(q, &q_end, 10); 1395 if (log_name_extension < until_log_name_extension) my_net_set_read_timeout(NET * net,uint timeout)1396 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS; 1397 else 1398 until_log_names_cmp_result= 1399 (log_name_extension > until_log_name_extension) ? 1400 UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ; 1401 } 1402 else 1403 { 1404 /* Probably error so we aborting */ 1405 sql_print_error("Slave SQL thread is stopped because UNTIL " 1406 "condition is bad."); 1407 DBUG_RETURN(TRUE); 1408 } 1409 } my_net_set_write_timeout(NET * net,uint timeout)1410 else 1411 DBUG_RETURN(until_log_pos == 0); 1412 } 1413 1414 DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL && 1415 (log_pos >= until_log_pos && !in_trans)) || 1416 until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER)); 1417 } 1418 1419 1420 bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd, 1421 rpl_group_info *rgi) 1422 { 1423 int error= 0; 1424 DBUG_ENTER("Relay_log_info::stmt_done"); 1425 1426 DBUG_ASSERT(!belongs_to_client()); 1427 DBUG_ASSERT(rgi->rli == this); 1428 /* 1429 If in a transaction, and if the slave supports transactions, just 1430 inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN 1431 (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with 1432 BEGIN/COMMIT, not with SET AUTOCOMMIT= . 1433 1434 We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN 1435 is also used for single row transactions. 1436 1437 CAUTION: opt_using_transactions means innodb || bdb ; suppose the 1438 master supports InnoDB and BDB, but the slave supports only BDB, 1439 problems will arise: - suppose an InnoDB table is created on the 1440 master, - then it will be MyISAM on the slave - but as 1441 opt_using_transactions is true, the slave will believe he is 1442 transactional with the MyISAM table. And problems will come when 1443 one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will 1444 resume at BEGIN whereas there has not been any rollback). This is 1445 the problem of using opt_using_transactions instead of a finer 1446 "does the slave support _transactional handler used on the 1447 master_". 1448 1449 More generally, we'll have problems when a query mixes a 1450 transactional handler and MyISAM and STOP SLAVE is issued in the 1451 middle of the "transaction". START SLAVE will resume at BEGIN 1452 while the MyISAM table has already been updated. 1453 */ 1454 if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && 1455 opt_using_transactions) 1456 rgi->inc_event_relay_log_pos(); 1457 else 1458 { 1459 inc_group_relay_log_pos(event_master_log_pos, rgi); 1460 if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi)) 1461 { 1462 report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(), 1463 "Failed to update GTID state in %s.%s, slave state may become " 1464 "inconsistent: %d: %s", 1465 "mysql", rpl_gtid_slave_state_table_name.str, 1466 thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message()); 1467 /* 1468 At this point we are not in a transaction (for example after DDL), 1469 so we can not roll back. Anyway, normally updates to the slave 1470 state table should not fail, and if they do, at least we made the 1471 DBA aware of the problem in the error log. 1472 */ 1473 } 1474 DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE();); 1475 if (mi->using_gtid == Master_info::USE_GTID_NO) 1476 { 1477 if (rgi->is_parallel_exec) 1478 mysql_mutex_lock(&data_lock); 1479 if (flush()) 1480 error= 1; 1481 if (rgi->is_parallel_exec) 1482 mysql_mutex_unlock(&data_lock); 1483 } 1484 DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); 1485 } 1486 DBUG_RETURN(error); 1487 } 1488 1489 1490 int 1491 Relay_log_info::alloc_inuse_relaylog(const char *name) 1492 { 1493 inuse_relaylog *ir; 1494 uint32 gtid_count; 1495 rpl_gtid *gtid_list; 1496 1497 if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) 1498 { 1499 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); 1500 return 1; 1501 } 1502 gtid_count= relay_log_state.count(); 1503 if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count, 1504 MYF(MY_WME)))) 1505 { 1506 my_free(ir); 1507 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count); 1508 return 1; 1509 } 1510 if (relay_log_state.get_gtid_list(gtid_list, gtid_count)) 1511 { 1512 my_free(gtid_list); 1513 my_free(ir); 1514 DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */); 1515 my_error(ER_OUT_OF_RESOURCES, MYF(0)); 1516 return 1; 1517 } 1518 ir->rli= this; 1519 strmake_buf(ir->name, name); 1520 ir->relay_log_state= gtid_list; 1521 ir->relay_log_state_count= gtid_count; 1522 1523 if (!inuse_relaylog_list) 1524 inuse_relaylog_list= ir; 1525 else 1526 { 1527 last_inuse_relaylog->completed= true; 1528 last_inuse_relaylog->next= ir; 1529 } 1530 last_inuse_relaylog= ir; 1531 1532 return 0; 1533 } 1534 1535 1536 void 1537 Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir) 1538 { 1539 my_free(ir->relay_log_state); 1540 my_free(ir); 1541 } 1542 1543 1544 void 1545 Relay_log_info::reset_inuse_relaylog() 1546 { 1547 inuse_relaylog *cur= inuse_relaylog_list; 1548 while (cur) 1549 { 1550 DBUG_ASSERT(cur->queued_count == cur->dequeued_count); 1551 inuse_relaylog *next= cur->next; 1552 free_inuse_relaylog(cur); 1553 cur= next; 1554 } 1555 inuse_relaylog_list= last_inuse_relaylog= NULL; 1556 } 1557 1558 1559 int 1560 Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count) 1561 { 1562 int res= 0; 1563 while (count) 1564 { 1565 if (relay_log_state.update_nolock(gtid_list, false)) 1566 res= 1; 1567 ++gtid_list; 1568 --count; 1569 } 1570 return res; 1571 } 1572 1573 1574 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) 1575 struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; }; 1576 1577 static int 1578 scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array, 1579 LEX_CSTRING *tablename, void **out_hton) 1580 { 1581 TABLE_LIST tlist; 1582 TABLE *UNINIT_VAR(table); 1583 bool table_opened= false; 1584 bool table_scanned= false; 1585 struct gtid_pos_element tmp_entry, *entry; 1586 int err= 0; 1587 1588 thd->reset_for_next_command(); 1589 tlist.init_one_table(&MYSQL_SCHEMA_NAME, tablename, NULL, TL_READ); 1590 if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) 1591 goto end; 1592 table_opened= true; 1593 table= tlist.table; 1594 1595 if ((err= gtid_check_rpl_slave_state_table(table))) 1596 goto end; 1597 1598 bitmap_set_all(table->read_set); 1599 if (unlikely(err= table->file->ha_rnd_init_with_error(1))) 1600 goto end; 1601 1602 table_scanned= true; 1603 for (;;) 1604 { 1605 uint32 domain_id, server_id; 1606 uint64 sub_id, seq_no; 1607 uchar *rec; 1608 1609 if ((err= table->file->ha_rnd_next(table->record[0]))) 1610 { 1611 if (err == HA_ERR_END_OF_FILE) 1612 break; 1613 else 1614 { 1615 table->file->print_error(err, MYF(0)); 1616 goto end; 1617 } 1618 } 1619 domain_id= (uint32)table->field[0]->val_int(); 1620 sub_id= (ulonglong)table->field[1]->val_int(); 1621 server_id= (uint32)table->field[2]->val_int(); 1622 seq_no= (ulonglong)table->field[3]->val_int(); 1623 DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu", 1624 (unsigned)domain_id, (unsigned)server_id, 1625 (ulong)seq_no, (ulong)sub_id)); 1626 1627 tmp_entry.sub_id= sub_id; 1628 tmp_entry.gtid.domain_id= domain_id; 1629 tmp_entry.gtid.server_id= server_id; 1630 tmp_entry.gtid.seq_no= seq_no; 1631 tmp_entry.hton= table->s->db_type(); 1632 if ((err= insert_dynamic(array, (uchar *)&tmp_entry))) 1633 { 1634 my_error(ER_OUT_OF_RESOURCES, MYF(0)); 1635 goto end; 1636 } 1637 1638 if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0))) 1639 { 1640 entry= (struct gtid_pos_element *)rec; 1641 if (entry->sub_id >= sub_id) 1642 continue; 1643 entry->sub_id= sub_id; 1644 DBUG_ASSERT(entry->gtid.domain_id == domain_id); 1645 entry->gtid.server_id= server_id; 1646 entry->gtid.seq_no= seq_no; 1647 entry->hton= table->s->db_type(); 1648 } 1649 else 1650 { 1651 if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry), 1652 MYF(MY_WME)))) 1653 { 1654 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry)); 1655 err= 1; 1656 goto end; 1657 } 1658 entry->sub_id= sub_id; 1659 entry->gtid.domain_id= domain_id; 1660 entry->gtid.server_id= server_id; 1661 entry->gtid.seq_no= seq_no; 1662 entry->hton= table->s->db_type(); 1663 if ((err= my_hash_insert(hash, (uchar *)entry))) 1664 { 1665 my_free(entry); 1666 my_error(ER_OUT_OF_RESOURCES, MYF(0)); 1667 goto end; 1668 } 1669 } 1670 } 1671 err= 0; /* Clear HA_ERR_END_OF_FILE */ 1672 1673 end: 1674 if (table_scanned) 1675 { 1676 table->file->ha_index_or_rnd_end(); 1677 ha_commit_trans(thd, FALSE); 1678 ha_commit_trans(thd, TRUE); 1679 } 1680 if (table_opened) 1681 { 1682 *out_hton= table->s->db_type(); 1683 close_thread_tables(thd); 1684 thd->mdl_context.release_transactional_locks(); 1685 } 1686 return err; 1687 } 1688 1689 1690 /* 1691 Look for all tables mysql.gtid_slave_pos*. Read all rows from each such 1692 table found into ARRAY. For each domain id, put the row with highest sub_id 1693 into HASH. 1694 */ 1695 static int 1696 scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *), 1697 void *cb_data) 1698 { 1699 char path[FN_REFLEN]; 1700 MY_DIR *dirp; 1701 1702 thd->reset_for_next_command(); 1703 if (lock_schema_name(thd, MYSQL_SCHEMA_NAME.str)) 1704 return 1; 1705 1706 build_table_filename(path, sizeof(path) - 1, MYSQL_SCHEMA_NAME.str, "", "", 0); 1707 if (!(dirp= my_dir(path, MYF(MY_DONT_SORT)))) 1708 { 1709 my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno); 1710 close_thread_tables(thd); 1711 thd->release_transactional_locks(); 1712 return 1; 1713 } 1714 else 1715 { 1716 size_t i; 1717 Dynamic_array<LEX_CSTRING*> files(dirp->number_of_files); 1718 Discovered_table_list tl(thd, &files); 1719 int err; 1720 1721 err= ha_discover_table_names(thd, &MYSQL_SCHEMA_NAME, dirp, &tl, false); 1722 my_dirend(dirp); 1723 close_thread_tables(thd); 1724 thd->release_transactional_locks(); 1725 if (err) 1726 return err; 1727 1728 for (i = 0; i < files.elements(); ++i) 1729 { 1730 if (strncmp(files.at(i)->str, 1731 rpl_gtid_slave_state_table_name.str, 1732 rpl_gtid_slave_state_table_name.length) == 0) 1733 { 1734 if ((err= (*cb)(thd, files.at(i), cb_data))) 1735 return err; 1736 } 1737 } 1738 } 1739 1740 return 0; 1741 } 1742 1743 1744 struct load_gtid_state_cb_data { 1745 HASH *hash; 1746 DYNAMIC_ARRAY *array; 1747 struct rpl_slave_state::gtid_pos_table *table_list; 1748 struct rpl_slave_state::gtid_pos_table *default_entry; 1749 }; 1750 1751 static int 1752 process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton, 1753 struct load_gtid_state_cb_data *data) 1754 { 1755 struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr; 1756 bool is_default= 1757 (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0); 1758 1759 /* 1760 Ignore tables with duplicate storage engine, with a warning. 1761 Prefer the default mysql.gtid_slave_pos over another table 1762 mysql.gtid_slave_posXXX with the same storage engine. 1763 */ 1764 next_ptr= &data->table_list; 1765 entry= data->table_list; 1766 while (entry) 1767 { 1768 if (entry->table_hton == hton) 1769 { 1770 static const char *warning_msg= "Ignoring redundant table mysql.%s " 1771 "since mysql.%s has the same storage engine"; 1772 if (!is_default) 1773 { 1774 /* Ignore the redundant table. */ 1775 sql_print_warning(warning_msg, table_name->str, entry->table_name.str); 1776 return 0; 1777 } 1778 else 1779 { 1780 sql_print_warning(warning_msg, entry->table_name.str, table_name->str); 1781 /* Delete the redundant table, and proceed to add this one instead. */ 1782 *next_ptr= entry->next; 1783 my_free(entry); 1784 break; 1785 } 1786 } 1787 next_ptr= &entry->next; 1788 entry= entry->next; 1789 } 1790 1791 p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, 1792 hton, rpl_slave_state::GTID_POS_AVAILABLE); 1793 if (!p) 1794 return 1; 1795 p->next= data->table_list; 1796 data->table_list= p; 1797 if (is_default) 1798 data->default_entry= p; 1799 return 0; 1800 } 1801 1802 1803 /* 1804 Put tables corresponding to @@gtid_pos_auto_engines at the end of the list, 1805 marked to be auto-created if needed. 1806 */ 1807 static int 1808 gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr) 1809 { 1810 plugin_ref *auto_engines; 1811 int err= 0; 1812 mysql_mutex_lock(&LOCK_global_system_variables); 1813 for (auto_engines= opt_gtid_pos_auto_plugins; 1814 !err && auto_engines && *auto_engines; 1815 ++auto_engines) 1816 { 1817 void *hton= plugin_hton(*auto_engines); 1818 char buf[FN_REFLEN+1]; 1819 LEX_CSTRING table_name; 1820 char *p; 1821 rpl_slave_state::gtid_pos_table *entry, **next_ptr; 1822 1823 /* See if this engine is already in the list. */ 1824 next_ptr= list_ptr; 1825 entry= *list_ptr; 1826 while (entry) 1827 { 1828 if (entry->table_hton == hton) 1829 break; 1830 next_ptr= &entry->next; 1831 entry= entry->next; 1832 } 1833 if (entry) 1834 continue; 1835 1836 /* Add an auto-create entry for this engine at end of list. */ 1837 p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN); 1838 p= strmake(p, "_", FN_REFLEN - (p - buf)); 1839 p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf)); 1840 table_name.str= buf; 1841 table_name.length= p - buf; 1842 table_case_convert(const_cast<char*>(table_name.str), 1843 static_cast<uint>(table_name.length)); 1844 entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table 1845 (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE); 1846 if (!entry) 1847 { 1848 err= 1; 1849 break; 1850 } 1851 *next_ptr= entry; 1852 } 1853 mysql_mutex_unlock(&LOCK_global_system_variables); 1854 return err; 1855 } 1856 1857 1858 static int 1859 load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg) 1860 { 1861 int err; 1862 load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); 1863 void *hton; 1864 1865 if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array, 1866 table_name, &hton))) 1867 return err; 1868 return process_gtid_pos_table(thd, table_name, hton, data); 1869 } 1870 1871 1872 int 1873 rpl_load_gtid_slave_state(THD *thd) 1874 { 1875 bool array_inited= false; 1876 struct gtid_pos_element tmp_entry, *entry; 1877 HASH hash; 1878 DYNAMIC_ARRAY array; 1879 int err= 0; 1880 uint32 i; 1881 load_gtid_state_cb_data cb_data; 1882 DBUG_ENTER("rpl_load_gtid_slave_state"); 1883 1884 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); 1885 bool loaded= rpl_global_gtid_slave_state->loaded; 1886 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); 1887 if (loaded) 1888 DBUG_RETURN(0); 1889 1890 cb_data.table_list= NULL; 1891 cb_data.default_entry= NULL; 1892 my_hash_init(&hash, &my_charset_bin, 32, 1893 offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id), 1894 sizeof(uint32), NULL, my_free, HASH_UNIQUE); 1895 if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0)))) 1896 goto end; 1897 array_inited= true; 1898 1899 cb_data.hash = &hash; 1900 cb_data.array = &array; 1901 if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data))) 1902 goto end; 1903 1904 if (!cb_data.default_entry) 1905 { 1906 /* 1907 If the mysql.gtid_slave_pos table does not exist, but at least one other 1908 table is available, arbitrarily pick the first in the list to use as 1909 default. 1910 */ 1911 cb_data.default_entry= cb_data.table_list; 1912 } 1913 if ((err= gtid_pos_auto_create_tables(&cb_data.table_list))) 1914 goto end; 1915 1916 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); 1917 if (rpl_global_gtid_slave_state->loaded) 1918 { 1919 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); 1920 goto end; 1921 } 1922 1923 if (!cb_data.table_list) 1924 { 1925 my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql", 1926 rpl_gtid_slave_state_table_name.str); 1927 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); 1928 err= 1; 1929 goto end; 1930 } 1931 1932 for (i= 0; i < array.elements; ++i) 1933 { 1934 get_dynamic(&array, (uchar *)&tmp_entry, i); 1935 if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id, 1936 tmp_entry.gtid.server_id, 1937 tmp_entry.sub_id, 1938 tmp_entry.gtid.seq_no, 1939 tmp_entry.hton, 1940 NULL))) 1941 { 1942 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); 1943 my_error(ER_OUT_OF_RESOURCES, MYF(0)); 1944 goto end; 1945 } 1946 } 1947 1948 for (i= 0; i < hash.records; ++i) 1949 { 1950 entry= (struct gtid_pos_element *)my_hash_element(&hash, i); 1951 if (opt_bin_log && 1952 mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id, 1953 entry->gtid.seq_no)) 1954 { 1955 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); 1956 my_error(ER_OUT_OF_RESOURCES, MYF(0)); 1957 goto end; 1958 } 1959 } 1960 1961 rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, 1962 cb_data.default_entry); 1963 cb_data.table_list= NULL; 1964 rpl_global_gtid_slave_state->loaded= true; 1965 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); 1966 1967 end: 1968 if (array_inited) 1969 delete_dynamic(&array); 1970 my_hash_free(&hash); 1971 if (cb_data.table_list) 1972 rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list); 1973 DBUG_RETURN(err); 1974 } 1975 1976 1977 static int 1978 find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg) 1979 { 1980 load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); 1981 TABLE_LIST tlist; 1982 TABLE *table= NULL; 1983 int err; 1984 1985 thd->reset_for_next_command(); 1986 tlist.init_one_table(&MYSQL_SCHEMA_NAME, table_name, NULL, TL_READ); 1987 if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) 1988 goto end; 1989 table= tlist.table; 1990 1991 if ((err= gtid_check_rpl_slave_state_table(table))) 1992 goto end; 1993 err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data); 1994 1995 end: 1996 if (table) 1997 { 1998 ha_commit_trans(thd, FALSE); 1999 ha_commit_trans(thd, TRUE); 2000 close_thread_tables(thd); 2001 thd->release_transactional_locks(); 2002 } 2003 2004 return err; 2005 } 2006 2007 2008 /* 2009 Re-compute the list of available mysql.gtid_slave_posXXX tables. 2010 2011 This is done at START SLAVE to pick up any newly created tables without 2012 requiring server restart. 2013 */ 2014 int 2015 find_gtid_slave_pos_tables(THD *thd) 2016 { 2017 int err= 0; 2018 load_gtid_state_cb_data cb_data; 2019 uint num_running; 2020 2021 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); 2022 bool loaded= rpl_global_gtid_slave_state->loaded; 2023 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); 2024 if (!loaded) 2025 return 0; 2026 2027 cb_data.table_list= NULL; 2028 cb_data.default_entry= NULL; 2029 if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data))) 2030 goto end; 2031 2032 if (!cb_data.table_list) 2033 { 2034 my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql", 2035 rpl_gtid_slave_state_table_name.str); 2036 err= 1; 2037 goto end; 2038 } 2039 if (!cb_data.default_entry) 2040 { 2041 /* 2042 If the mysql.gtid_slave_pos table does not exist, but at least one other 2043 table is available, arbitrarily pick the first in the list to use as 2044 default. 2045 */ 2046 cb_data.default_entry= cb_data.table_list; 2047 } 2048 if ((err= gtid_pos_auto_create_tables(&cb_data.table_list))) 2049 goto end; 2050 2051 mysql_mutex_lock(&LOCK_active_mi); 2052 num_running= any_slave_sql_running(true); 2053 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); 2054 if (num_running <= 1) 2055 { 2056 /* 2057 If no slave is running now, the count will be 1, since this SQL thread 2058 which is starting is included in the count. In this case, we can safely 2059 replace the list, no-one can be trying to read it without lock. 2060 */ 2061 DBUG_ASSERT(num_running == 1); 2062 rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, 2063 cb_data.default_entry); 2064 cb_data.table_list= NULL; 2065 } 2066 else 2067 { 2068 /* 2069 If there are SQL threads running, we cannot safely remove the old list. 2070 However we can add new entries, and warn about any tables that 2071 disappeared, but may still be visible to running SQL threads. 2072 */ 2073 rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr; 2074 2075 old_entry= (rpl_slave_state::gtid_pos_table *) 2076 rpl_global_gtid_slave_state->gtid_pos_tables; 2077 while (old_entry) 2078 { 2079 new_entry= cb_data.table_list; 2080 while (new_entry) 2081 { 2082 if (new_entry->table_hton == old_entry->table_hton) 2083 break; 2084 new_entry= new_entry->next; 2085 } 2086 if (!new_entry) 2087 sql_print_warning("The table mysql.%s was removed. " 2088 "This change will not take full effect " 2089 "until all SQL threads have been restarted", 2090 old_entry->table_name.str); 2091 old_entry= old_entry->next; 2092 } 2093 next_ptr_ptr= &cb_data.table_list; 2094 new_entry= cb_data.table_list; 2095 while (new_entry) 2096 { 2097 /* Check if we already have a table with this storage engine. */ 2098 old_entry= (rpl_slave_state::gtid_pos_table *) 2099 rpl_global_gtid_slave_state->gtid_pos_tables; 2100 while (old_entry) 2101 { 2102 if (new_entry->table_hton == old_entry->table_hton) 2103 break; 2104 old_entry= old_entry->next; 2105 } 2106 if (old_entry) 2107 { 2108 /* This new_entry is already available in the list. */ 2109 next_ptr_ptr= &new_entry->next; 2110 new_entry= new_entry->next; 2111 } 2112 else 2113 { 2114 /* Move this new_entry to the list. */ 2115 rpl_slave_state::gtid_pos_table *next= new_entry->next; 2116 rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry); 2117 *next_ptr_ptr= next; 2118 new_entry= next; 2119 } 2120 } 2121 } 2122 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); 2123 mysql_mutex_unlock(&LOCK_active_mi); 2124 2125 end: 2126 if (cb_data.table_list) 2127 rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list); 2128 return err; 2129 } 2130 2131 2132 void 2133 rpl_group_info::reinit(Relay_log_info *rli) 2134 { 2135 this->rli= rli; 2136 tables_to_lock= NULL; 2137 tables_to_lock_count= 0; 2138 trans_retries= 0; 2139 last_event_start_time= 0; 2140 gtid_sub_id= 0; 2141 commit_id= 0; 2142 gtid_pending= false; 2143 worker_error= 0; 2144 row_stmt_start_timestamp= 0; 2145 long_find_row_note_printed= false; 2146 did_mark_start_commit= false; 2147 gtid_ev_flags2= 0; 2148 pending_gtid_delete_list= NULL; 2149 last_master_timestamp = 0; 2150 gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; 2151 speculation= SPECULATE_NO; 2152 commit_orderer.reinit(); 2153 } 2154 2155 rpl_group_info::rpl_group_info(Relay_log_info *rli) 2156 : thd(0), wait_commit_sub_id(0), 2157 wait_commit_group_info(0), parallel_entry(0), 2158 deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) 2159 { 2160 reinit(rli); 2161 bzero(¤t_gtid, sizeof(current_gtid)); 2162 mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, 2163 MY_MUTEX_INIT_FAST); 2164 mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL); 2165 } 2166 2167 2168 rpl_group_info::~rpl_group_info() 2169 { 2170 free_annotate_event(); 2171 delete deferred_events; 2172 mysql_mutex_destroy(&sleep_lock); 2173 mysql_cond_destroy(&sleep_cond); 2174 } 2175 2176 2177 int 2178 event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) 2179 { 2180 uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id); 2181 if (!sub_id) 2182 { 2183 /* Out of memory caused hash insertion to fail. */ 2184 return 1; 2185 } 2186 rgi->gtid_sub_id= sub_id; 2187 rgi->current_gtid.domain_id= gev->domain_id; 2188 rgi->current_gtid.server_id= gev->server_id; 2189 rgi->current_gtid.seq_no= gev->seq_no; 2190 rgi->commit_id= gev->commit_id; 2191 rgi->gtid_pending= true; 2192 return 0; 2193 } 2194 2195 2196 void 2197 delete_or_keep_event_post_apply(rpl_group_info *rgi, 2198 Log_event_type typ, Log_event *ev) 2199 { 2200 /* 2201 ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be 2202 thread-safe for parallel replication. 2203 */ 2204 2205 switch (typ) { 2206 case FORMAT_DESCRIPTION_EVENT: 2207 /* 2208 Format_description_log_event should not be deleted because it 2209 will be used to read info about the relay log's format; 2210 it will be deleted when the SQL thread does not need it, 2211 i.e. when this thread terminates. 2212 */ 2213 break; 2214 case ANNOTATE_ROWS_EVENT: 2215 /* 2216 Annotate_rows event should not be deleted because after it has 2217 been applied, thd->query points to the string inside this event. 2218 The thd->query will be used to generate new Annotate_rows event 2219 during applying the subsequent Rows events. 2220 */ 2221 rgi->set_annotate_event((Annotate_rows_log_event*) ev); 2222 break; 2223 case DELETE_ROWS_EVENT_V1: 2224 case UPDATE_ROWS_EVENT_V1: 2225 case WRITE_ROWS_EVENT_V1: 2226 case DELETE_ROWS_EVENT: 2227 case UPDATE_ROWS_EVENT: 2228 case WRITE_ROWS_EVENT: 2229 case WRITE_ROWS_COMPRESSED_EVENT: 2230 case DELETE_ROWS_COMPRESSED_EVENT: 2231 case UPDATE_ROWS_COMPRESSED_EVENT: 2232 case WRITE_ROWS_COMPRESSED_EVENT_V1: 2233 case UPDATE_ROWS_COMPRESSED_EVENT_V1: 2234 case DELETE_ROWS_COMPRESSED_EVENT_V1: 2235 /* 2236 After the last Rows event has been applied, the saved Annotate_rows 2237 event (if any) is not needed anymore and can be deleted. 2238 */ 2239 if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) 2240 rgi->free_annotate_event(); 2241 /* fall through */ 2242 default: 2243 DBUG_PRINT("info", ("Deleting the event after it has been executed")); 2244 if (!rgi->is_deferred_event(ev)) 2245 delete ev; 2246 break; 2247 } 2248 } 2249 2250 2251 void rpl_group_info::cleanup_context(THD *thd, bool error) 2252 { 2253 DBUG_ENTER("rpl_group_info::cleanup_context"); 2254 DBUG_PRINT("enter", ("error: %d", (int) error)); 2255 2256 DBUG_ASSERT(this->thd == thd); 2257 /* 2258 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, 2259 may have opened tables, which we cannot be sure have been closed (because 2260 maybe the Rows_log_event have not been found or will not be, because slave 2261 SQL thread is stopping, or relay log has a missing tail etc). So we close 2262 all thread's tables. And so the table mappings have to be cancelled. 2263 2) Rows_log_event::do_apply_event() may even have started statements or 2264 transactions on them, which we need to rollback in case of error. 2265 3) If finding a Format_description_log_event after a BEGIN, we also need 2266 to rollback before continuing with the next events. 2267 4) so we need this "context cleanup" function. 2268 */ 2269 if (unlikely(error)) 2270 { 2271 trans_rollback_stmt(thd); // if a "statement transaction" 2272 /* trans_rollback() also resets OPTION_GTID_BEGIN */ 2273 trans_rollback(thd); // if a "real transaction" 2274 /* 2275 Now that we have rolled back the transaction, make sure we do not 2276 erroneously update the GTID position. 2277 */ 2278 gtid_pending= false; 2279 2280 /* 2281 Rollback will have undone any deletions of old rows we might have made 2282 in mysql.gtid_slave_pos. Put those rows back on the list to be deleted. 2283 */ 2284 pending_gtid_deletes_put_back(); 2285 } 2286 m_table_map.clear_tables(); 2287 slave_close_thread_tables(thd); 2288 2289 if (unlikely(error)) 2290 { 2291 thd->release_transactional_locks(); 2292 2293 if (thd == rli->sql_driver_thd) 2294 { 2295 /* 2296 Reset flags. This is needed to handle incident events and errors in 2297 the relay log noticed by the sql driver thread. 2298 */ 2299 rli->clear_flag(Relay_log_info::IN_STMT); 2300 rli->clear_flag(Relay_log_info::IN_TRANSACTION); 2301 } 2302 2303 /* 2304 Ensure we always release the domain for others to process, when using 2305 --gtid-ignore-duplicates. 2306 */ 2307 if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL) 2308 rpl_global_gtid_slave_state->release_domain_owner(this); 2309 } 2310 2311 /* 2312 Cleanup for the flags that have been set at do_apply_event. 2313 */ 2314 thd->variables.option_bits&= ~(OPTION_NO_FOREIGN_KEY_CHECKS | 2315 OPTION_RELAXED_UNIQUE_CHECKS | 2316 OPTION_NO_CHECK_CONSTRAINT_CHECKS); 2317 2318 /* 2319 Reset state related to long_find_row notes in the error log: 2320 - timestamp 2321 - flag that decides whether the slave prints or not 2322 */ 2323 reset_row_stmt_start_timestamp(); 2324 unset_long_find_row_note_printed(); 2325 2326 DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x", { 2327 if (current_gtid.domain_id == 100) 2328 my_sleep(50000); 2329 };); 2330 2331 DBUG_VOID_RETURN; 2332 } 2333 2334 2335 void rpl_group_info::clear_tables_to_lock() 2336 { 2337 DBUG_ENTER("rpl_group_info::clear_tables_to_lock()"); 2338 #ifndef DBUG_OFF 2339 /** 2340 When replicating in RBR and MyISAM Merge tables are involved 2341 open_and_lock_tables (called in do_apply_event) appends the 2342 base tables to the list of tables_to_lock. Then these are 2343 removed from the list in close_thread_tables (which is called 2344 before we reach this point). 2345 2346 This assertion just confirms that we get no surprises at this 2347 point. 2348 */ 2349 uint i=0; 2350 for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ; 2351 DBUG_ASSERT(i == tables_to_lock_count); 2352 #endif 2353 2354 while (tables_to_lock) 2355 { 2356 uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock); 2357 if (tables_to_lock->m_tabledef_valid) 2358 { 2359 tables_to_lock->m_tabledef.table_def::~table_def(); 2360 tables_to_lock->m_tabledef_valid= FALSE; 2361 } 2362 2363 /* 2364 If blob fields were used during conversion of field values 2365 from the master table into the slave table, then we need to 2366 free the memory used temporarily to store their values before 2367 copying into the slave's table. 2368 */ 2369 if (tables_to_lock->m_conv_table) 2370 free_blobs(tables_to_lock->m_conv_table); 2371 2372 tables_to_lock= 2373 static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global); 2374 tables_to_lock_count--; 2375 my_free(to_free); 2376 } 2377 DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0); 2378 DBUG_VOID_RETURN; 2379 } 2380 2381 2382 void rpl_group_info::slave_close_thread_tables(THD *thd) 2383 { 2384 DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)"); 2385 thd->get_stmt_da()->set_overwrite_status(true); 2386 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); 2387 thd->get_stmt_da()->set_overwrite_status(false); 2388 2389 close_thread_tables(thd); 2390 /* 2391 - If transaction rollback was requested due to deadlock 2392 perform it and release metadata locks. 2393 - If inside a multi-statement transaction, 2394 defer the release of metadata locks until the current 2395 transaction is either committed or rolled back. This prevents 2396 other statements from modifying the table for the entire 2397 duration of this transaction. This provides commit ordering 2398 and guarantees serializability across multiple transactions. 2399 - If in autocommit mode, or outside a transactional context, 2400 automatically release metadata locks of the current statement. 2401 */ 2402 if (thd->transaction_rollback_request) 2403 { 2404 trans_rollback_implicit(thd); 2405 thd->release_transactional_locks(); 2406 } 2407 else if (! thd->in_multi_stmt_transaction_mode()) 2408 thd->release_transactional_locks(); 2409 else 2410 thd->mdl_context.release_statement_locks(); 2411 2412 clear_tables_to_lock(); 2413 DBUG_VOID_RETURN; 2414 } 2415 2416 2417 2418 static void 2419 mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco, 2420 rpl_group_info *rgi) 2421 { 2422 group_commit_orderer *tmp; 2423 uint64 count= ++e->count_committing_event_groups; 2424 /* Signal any following GCO whose wait_count has been reached now. */ 2425 tmp= gco; 2426 while ((tmp= tmp->next_gco)) 2427 { 2428 uint64 wait_count= tmp->wait_count; 2429 if (wait_count > count) 2430 break; 2431 mysql_cond_broadcast(&tmp->COND_group_commit_orderer); 2432 } 2433 } 2434 2435 2436 void 2437 rpl_group_info::mark_start_commit_no_lock() 2438 { 2439 if (did_mark_start_commit) 2440 return; 2441 did_mark_start_commit= true; 2442 mark_start_commit_inner(parallel_entry, gco, this); 2443 } 2444 2445 2446 void 2447 rpl_group_info::mark_start_commit() 2448 { 2449 rpl_parallel_entry *e; 2450 2451 if (did_mark_start_commit) 2452 return; 2453 did_mark_start_commit= true; 2454 2455 e= this->parallel_entry; 2456 mysql_mutex_lock(&e->LOCK_parallel_entry); 2457 mark_start_commit_inner(e, gco, this); 2458 mysql_mutex_unlock(&e->LOCK_parallel_entry); 2459 } 2460 2461 2462 /* 2463 Format the current GTID as a string suitable for printing in error messages. 2464 2465 The string is stored in a buffer inside rpl_group_info, so remains valid 2466 until next call to gtid_info() or until destruction of rpl_group_info. 2467 2468 If no GTID is available, then NULL is returned. 2469 */ 2470 char * 2471 rpl_group_info::gtid_info() 2472 { 2473 if (!gtid_sub_id || !current_gtid.seq_no) 2474 return NULL; 2475 my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu", 2476 current_gtid.domain_id, current_gtid.server_id, 2477 current_gtid.seq_no); 2478 return gtid_info_buf; 2479 } 2480 2481 2482 /* 2483 Undo the effect of a prior mark_start_commit(). 2484 2485 This is only used for retrying a transaction in parallel replication, after 2486 we have encountered a deadlock or other temporary error. 2487 2488 When we get such a deadlock, it means that the current group of transactions 2489 did not yet all start committing (else they would not have deadlocked). So 2490 we will not yet have woken up anything in the next group, our rgi->gco is 2491 still live, and we can simply decrement the counter (to be incremented again 2492 later, when the retry succeeds and reaches the commit step). 2493 */ 2494 void 2495 rpl_group_info::unmark_start_commit() 2496 { 2497 rpl_parallel_entry *e; 2498 2499 if (!did_mark_start_commit) 2500 return; 2501 did_mark_start_commit= false; 2502 2503 e= this->parallel_entry; 2504 mysql_mutex_lock(&e->LOCK_parallel_entry); 2505 --e->count_committing_event_groups; 2506 mysql_mutex_unlock(&e->LOCK_parallel_entry); 2507 } 2508 2509 2510 /* 2511 When record_gtid() has deleted any old rows from the table 2512 mysql.gtid_slave_pos as part of a replicated transaction, save the list of 2513 rows deleted here. 2514 2515 If later the transaction fails (eg. optimistic parallel replication), the 2516 deletes will be undone when the transaction is rolled back. Then we can 2517 put back the list of rows into the rpl_global_gtid_slave_state, so that 2518 we can re-do the deletes and avoid accumulating old rows in the table. 2519 */ 2520 void 2521 rpl_group_info::pending_gtid_deletes_save(uint32 domain_id, 2522 rpl_slave_state::list_element *list) 2523 { 2524 /* 2525 We should never get to a state where we try to save a new pending list of 2526 gtid deletes while we still have an old one. But make sure we handle it 2527 anyway just in case, so we avoid leaving stray entries in the 2528 mysql.gtid_slave_pos table. 2529 */ 2530 DBUG_ASSERT(!pending_gtid_delete_list); 2531 if (unlikely(pending_gtid_delete_list)) 2532 pending_gtid_deletes_put_back(); 2533 2534 pending_gtid_delete_list= list; 2535 pending_gtid_delete_list_domain= domain_id; 2536 } 2537 2538 2539 /* 2540 Take the list recorded by pending_gtid_deletes_save() and put it back into 2541 rpl_global_gtid_slave_state. This is needed if deletion of the rows was 2542 rolled back due to transaction failure. 2543 */ 2544 void 2545 rpl_group_info::pending_gtid_deletes_put_back() 2546 { 2547 if (pending_gtid_delete_list) 2548 { 2549 rpl_global_gtid_slave_state->put_back_list(pending_gtid_delete_list_domain, 2550 pending_gtid_delete_list); 2551 pending_gtid_delete_list= NULL; 2552 } 2553 } 2554 2555 2556 /* 2557 Free the list recorded by pending_gtid_deletes_save(). Done when the deletes 2558 in the list have been permanently committed. 2559 */ 2560 void 2561 rpl_group_info::pending_gtid_deletes_clear() 2562 { 2563 pending_gtid_deletes_free(pending_gtid_delete_list); 2564 pending_gtid_delete_list= NULL; 2565 } 2566 2567 2568 void 2569 rpl_group_info::pending_gtid_deletes_free(rpl_slave_state::list_element *list) 2570 { 2571 rpl_slave_state::list_element *next; 2572 2573 while (list) 2574 { 2575 next= list->next; 2576 my_free(list); 2577 list= next; 2578 } 2579 } 2580 2581 2582 rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter) 2583 : rpl_filter(filter) 2584 { 2585 cached_charset_invalidate(); 2586 } 2587 2588 2589 void rpl_sql_thread_info::cached_charset_invalidate() 2590 { 2591 DBUG_ENTER("rpl_group_info::cached_charset_invalidate"); 2592 2593 /* Full of zeroes means uninitialized. */ 2594 bzero(cached_charset, sizeof(cached_charset)); 2595 DBUG_VOID_RETURN; 2596 } 2597 2598 2599 bool rpl_sql_thread_info::cached_charset_compare(char *charset) const 2600 { 2601 DBUG_ENTER("rpl_group_info::cached_charset_compare"); 2602 2603 if (memcmp(cached_charset, charset, sizeof(cached_charset))) 2604 { 2605 memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset)); 2606 DBUG_RETURN(1); 2607 } 2608 DBUG_RETURN(0); 2609 } 2610 2611 2612 /** 2613 Store the file and position where the slave's SQL thread are in the 2614 relay log. 2615 2616 Notes: 2617 2618 - This function should be called either from the slave SQL thread, 2619 or when the slave thread is not running. (It reads the 2620 group_{relay|master}_log_{pos|name} and delay fields in the rli 2621 object. These may only be modified by the slave SQL thread or by 2622 a client thread when the slave SQL thread is not running.) 2623 2624 - If there is an active transaction, then we do not update the 2625 position in the relay log. This is to ensure that we re-execute 2626 statements if we die in the middle of an transaction that was 2627 rolled back. 2628 2629 - As a transaction never spans binary logs, we don't have to handle 2630 the case where we do a relay-log-rotation in the middle of the 2631 transaction. If transactions could span several binlogs, we would 2632 have to ensure that we do not delete the relay log file where the 2633 transaction started before switching to a new relay log file. 2634 2635 - Error can happen if writing to file fails or if flushing the file 2636 fails. 2637 2638 @param rli The object representing the Relay_log_info. 2639 2640 @todo Change the log file information to a binary format to avoid 2641 calling longlong2str. 2642 2643 @return 0 on success, 1 on error. 2644 */ 2645 bool Relay_log_info::flush() 2646 { 2647 bool error=0; 2648 2649 DBUG_ENTER("Relay_log_info::flush()"); 2650 2651 IO_CACHE *file = &info_file; 2652 // 2*file name, 2*long long, 2*unsigned long, 6*'\n' 2653 char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos; 2654 my_b_seek(file, 0L); 2655 pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10); 2656 *pos++='\n'; 2657 pos=strmov(pos, group_relay_log_name); 2658 *pos++='\n'; 2659 pos=longlong10_to_str(group_relay_log_pos, pos, 10); 2660 *pos++='\n'; 2661 pos=strmov(pos, group_master_log_name); 2662 *pos++='\n'; 2663 pos=longlong10_to_str(group_master_log_pos, pos, 10); 2664 *pos++='\n'; 2665 pos= longlong10_to_str(sql_delay, pos, 10); 2666 *pos++= '\n'; 2667 if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff))) 2668 error=1; 2669 if (flush_io_cache(file)) 2670 error=1; 2671 if (sync_relayloginfo_period && 2672 !error && 2673 ++sync_counter >= sync_relayloginfo_period) 2674 { 2675 if (my_sync(info_fd, MYF(MY_WME))) 2676 error=1; 2677 sync_counter= 0; 2678 } 2679 /* 2680 Flushing the relay log is done by the slave I/O thread 2681 or by the user on STOP SLAVE. 2682 */ 2683 DBUG_RETURN(error); 2684 } 2685 2686 #endif 2687