1 /* Copyright (c) 2006, 2017, Oracle and/or its affiliates.
2 Copyright (c) 2010, 2020, MariaDB Corporation.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software Foundation,
15 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #include "mariadb.h"
18 #include "sql_priv.h"
19 #include "unireg.h" // HAVE_*
20 #include "rpl_mi.h"
21 #include "rpl_rli.h"
22 #include "sql_base.h" // close_thread_tables
Slave_reporting_capability(char const * thread_name)23 #include <my_dir.h> // For MY_STAT
24 #include "sql_repl.h" // For check_binlog_magic
25 #include "log_event.h" // Format_description_log_event, Log_event,
26 // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
27 // PREFIX_SQL_LOAD
28 #include "rpl_utility.h"
29 #include "transaction.h"
30 #include "sql_parse.h" // end_trans, ROLLBACK
31 #include "slave.h"
32 #include <mysql/plugin.h>
33 #include <mysql/service_thd_wait.h>
34 #include "lock.h"
35 #include "sql_table.h"
36
37 static int count_relay_log_space(Relay_log_info* rli);
38
39 /**
40 Current replication state (hash of last GTID executed, per replication
41 domain).
42 */
43 rpl_slave_state *rpl_global_gtid_slave_state;
44 /* Object used for MASTER_GTID_WAIT(). */
45 gtid_waiting rpl_global_gtid_waiting;
46
47 const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event";
48
49 Relay_log_info::Relay_log_info(bool is_slave_recovery)
50 :Slave_reporting_capability("SQL"),
51 replicate_same_server_id(::replicate_same_server_id),
52 info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
53 sync_counter(0), is_relay_log_recovery(is_slave_recovery),
54 save_temporary_tables(0),
55 mi(0), inuse_relaylog_list(0), last_inuse_relaylog(0),
56 cur_log_old_open_count(0), error_on_rli_init_info(false),
57 group_relay_log_pos(0), event_relay_log_pos(0),
58 group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
59 last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
60 abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
61 gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
62 slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
63 until_log_pos(0), retried_trans(0), executed_entries(0),
64 sql_delay(0), sql_delay_end(0),
65 until_relay_log_names_defer(false),
66 m_flags(0)
67 {
68 DBUG_ENTER("Relay_log_info::Relay_log_info");
69
70 relay_log.is_relay_log= TRUE;
71 relay_log_state.init();
72 #ifdef HAVE_PSI_INTERFACE
73 relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
74 key_RELAYLOG_COND_relay_log_updated,
75 key_RELAYLOG_COND_bin_log_updated,
76 key_file_relaylog,
77 key_file_relaylog_index,
78 key_RELAYLOG_COND_queue_busy,
79 key_LOCK_relaylog_end_pos);
~Slave_reporting_capability()80 #endif
81
82 group_relay_log_name[0]= event_relay_log_name[0]=
83 group_master_log_name[0]= 0;
84 until_log_name[0]= ign_master_log_name_end[0]= 0;
85 max_relay_log_size= global_system_variables.max_relay_log_size;
86 bzero((char*) &info_file, sizeof(info_file));
87 bzero((char*) &cache_buf, sizeof(cache_buf));
88 mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
89 mysql_mutex_init(key_relay_log_info_data_lock,
90 &data_lock, MY_MUTEX_INIT_FAST);
91 mysql_mutex_init(key_relay_log_info_log_space_lock,
92 &log_space_lock, MY_MUTEX_INIT_FAST);
93 mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
94 mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
95 mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
96 mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
97 relay_log.init_pthread_objects();
98 DBUG_VOID_RETURN;
99 }
100
101
102 Relay_log_info::~Relay_log_info()
103 {
104 DBUG_ENTER("Relay_log_info::~Relay_log_info");
105
106 reset_inuse_relaylog();
107 mysql_mutex_destroy(&run_lock);
108 mysql_mutex_destroy(&data_lock);
109 mysql_mutex_destroy(&log_space_lock);
110 mysql_cond_destroy(&data_cond);
111 mysql_cond_destroy(&start_cond);
112 mysql_cond_destroy(&stop_cond);
113 mysql_cond_destroy(&log_space_cond);
114 relay_log.cleanup();
115 DBUG_VOID_RETURN;
116 }
117
118
119 /**
120 Read the relay_log.info file.
121
122 @param info_fname The name of the file to read from.
123 @retval 0 success
124 @retval 1 failure
125 */
126 int Relay_log_info::init(const char* info_fname)
127 {
128 char fname[FN_REFLEN+128];
129 const char* msg = 0;
130 int error = 0;
131 mysql_mutex_t *log_lock;
132 DBUG_ENTER("Relay_log_info::init");
133
134 if (inited) // Set if this function called
135 DBUG_RETURN(0);
136
137 log_lock= relay_log.get_log_lock();
138 fn_format(fname, info_fname, mysql_data_home, "", 4+32);
139 mysql_mutex_lock(&data_lock);
140 cur_log_fd = -1;
141 slave_skip_counter=0;
142 abort_pos_wait=0;
143 log_space_limit= relay_log_space_limit;
144 log_space_total= 0;
145
146 if (unlikely(error_on_rli_init_info))
147 goto err;
148
149 char pattern[FN_REFLEN];
150 (void) my_realpath(pattern, slave_load_tmpdir, 0);
151 if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
152 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
153 {
154 mysql_mutex_unlock(&data_lock);
155 sql_print_error("Unable to use slave's temporary directory %s",
156 slave_load_tmpdir);
157 DBUG_RETURN(1);
158 }
159 unpack_filename(slave_patternload_file, pattern);
160 slave_patternload_file_size= strlen(slave_patternload_file);
161
162 /*
163 The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
164 Note that the I/O thread flushes it to disk after writing every
165 event, in flush_master_info(mi, 1, ?).
166 */
167
168 {
169 /* Reports an error and returns, if the --relay-log's path
170 is a directory.*/
171 if (opt_relay_logname &&
172 opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
173 {
174 mysql_mutex_unlock(&data_lock);
175 sql_print_error("Path '%s' is a directory name, please specify \
176 a file name for --relay-log option", opt_relay_logname);
177 DBUG_RETURN(1);
178 }
179
180 /* Reports an error and returns, if the --relay-log-index's path
181 is a directory.*/
182 if (opt_relaylog_index_name &&
183 opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
184 == FN_LIBCHAR)
185 {
186 mysql_mutex_unlock(&data_lock);
187 sql_print_error("Path '%s' is a directory name, please specify \
188 a file name for --relay-log-index option", opt_relaylog_index_name);
189 DBUG_RETURN(1);
190 }
191
192 char buf[FN_REFLEN];
193 const char *ln;
194 static bool name_warning_sent= 0;
195 ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
196 1, buf);
197 /* We send the warning only at startup, not after every RESET SLAVE */
198 if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent &&
199 !opt_bootstrap)
200 {
201 /*
202 User didn't give us info to name the relay log index file.
203 Picking `hostname`-relay-bin.index like we do, causes replication to
204 fail if this slave's hostname is changed later. So, we would like to
205 instead require a name. But as we don't want to break many existing
206 setups, we only give warning, not error.
207 */
208 sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
209 " so replication "
210 "may break when this MariaDB server acts as a "
211 "replica and has its hostname changed. Please "
212 "use '--log-basename=#' or '--relay-log=%s' to avoid "
213 "this problem.", ln);
214 name_warning_sent= 1;
215 }
216
217 /* For multimaster, add connection name to relay log filenames */
218 char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
219 char *buf_relaylog_index_name= opt_relaylog_index_name;
220
221 create_logfile_name_with_suffix(buf_relay_logname,
222 sizeof(buf_relay_logname),
223 ln, 1, &mi->cmp_connection_name);
224 ln= buf_relay_logname;
225
226 if (opt_relaylog_index_name)
227 {
228 buf_relaylog_index_name= buf_relaylog_index_name_buff;
229 create_logfile_name_with_suffix(buf_relaylog_index_name_buff,
230 sizeof(buf_relaylog_index_name_buff),
231 opt_relaylog_index_name, 0,
232 &mi->cmp_connection_name);
233 }
234
235 /*
236 note, that if open() fails, we'll still have index file open
237 but a destructor will take care of that
238 */
239 mysql_mutex_lock(log_lock);
240 if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
241 relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
242 (ulong)max_relay_log_size, 1, TRUE))
243 {
244 mysql_mutex_unlock(log_lock);
245 mysql_mutex_unlock(&data_lock);
246 sql_print_error("Failed when trying to open logs for '%s' in Relay_log_info::init(). Error: %M", ln, my_errno);
247 DBUG_RETURN(1);
248 }
249 mysql_mutex_unlock(log_lock);
250 }
251
252 /* if file does not exist */
253 if (access(fname,F_OK))
254 {
255 /*
256 If someone removed the file from underneath our feet, just close
257 the old descriptor and re-create the old file
258 */
259 if (info_fd >= 0)
260 mysql_file_close(info_fd, MYF(MY_WME));
261 if ((info_fd= mysql_file_open(key_file_relay_log_info,
262 fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
263 {
264 sql_print_error("Failed to create a new relay log info file ("
265 "file '%s', errno %d)", fname, my_errno);
266 msg= current_thd->get_stmt_da()->message();
267 goto err;
268 }
269 if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
270 MYF(MY_WME)))
271 {
272 sql_print_error("Failed to create a cache on relay log info file '%s'",
273 fname);
274 msg= current_thd->get_stmt_da()->message();
275 goto err;
276 }
277
278 /* Init relay log with first entry in the relay index file */
279 if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
280 &msg, 0))
281 {
282 sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
283 goto err;
284 }
285 group_master_log_name[0]= 0;
286 group_master_log_pos= 0;
287 }
288 else // file exists
289 {
290 if (info_fd >= 0)
291 reinit_io_cache(&info_file, READ_CACHE, 0L,0,0);
292 else
293 {
294 int error=0;
295 if ((info_fd= mysql_file_open(key_file_relay_log_info,
296 fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
297 {
298 sql_print_error("\
299 Failed to open the existing relay log info file '%s' (errno %d)",
300 fname, my_errno);
301 error= 1;
302 }
303 else if (init_io_cache(&info_file, info_fd,
304 IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
305 {
306 sql_print_error("Failed to create a cache on relay log info file '%s'",
307 fname);
308 error= 1;
309 }
310 if (unlikely(error))
311 {
312 if (info_fd >= 0)
313 mysql_file_close(info_fd, MYF(0));
314 info_fd= -1;
315 mysql_mutex_lock(log_lock);
316 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
317 mysql_mutex_unlock(log_lock);
318 mysql_mutex_unlock(&data_lock);
319 DBUG_RETURN(1);
320 }
321 }
322
323 int relay_log_pos, master_log_pos, lines;
324 char *first_non_digit;
325
326 /*
327 Starting from MySQL 5.6.x, relay-log.info has a new format.
328 Now, its first line contains the number of lines in the file.
329 By reading this number we can determine which version our master.info
330 comes from. We can't simply count the lines in the file, since
331 versions before 5.6.x could generate files with more lines than
332 needed. If first line doesn't contain a number, or if it
333 contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
334 then the file is treated like a file from pre-5.6.x version.
335 There is no ambiguity when reading an old master.info: before
336 5.6.x, the first line contained the binlog's name, which is
337 either empty or has an extension (contains a '.'), so can't be
338 confused with an integer.
339
340 So we're just reading first line and trying to figure which
341 version is this.
342 */
343
344 /*
345 The first row is temporarily stored in mi->master_log_name, if
346 it is line count and not binlog name (new format) it will be
347 overwritten by the second row later.
348 */
349 if (init_strvar_from_file(group_relay_log_name,
350 sizeof(group_relay_log_name),
351 &info_file, ""))
352 {
353 msg="Error reading slave log configuration";
354 goto err;
355 }
356
357 lines= strtoul(group_relay_log_name, &first_non_digit, 10);
358
359 if (group_relay_log_name[0] != '\0' &&
360 *first_non_digit == '\0' &&
361 lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
362 {
363 DBUG_PRINT("info", ("relay_log_info file is in new format."));
364 /* Seems to be new format => read relay log name from next line */
365 if (init_strvar_from_file(group_relay_log_name,
366 sizeof(group_relay_log_name),
367 &info_file, ""))
368 {
369 msg="Error reading slave log configuration";
370 goto err;
371 }
372 }
373 else
374 DBUG_PRINT("info", ("relay_log_info file is in old format."));
375
376 if (init_intvar_from_file(&relay_log_pos,
377 &info_file, BIN_LOG_HEADER_SIZE) ||
378 init_strvar_from_file(group_master_log_name,
379 sizeof(group_master_log_name),
380 &info_file, "") ||
381 init_intvar_from_file(&master_log_pos, &info_file, 0) ||
382 (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
383 init_intvar_from_file(&sql_delay, &info_file, 0)))
384 {
385 msg="Error reading slave log configuration";
386 goto err;
387 }
388
389 strmake_buf(event_relay_log_name,group_relay_log_name);
390 group_relay_log_pos= event_relay_log_pos= relay_log_pos;
391 group_master_log_pos= master_log_pos;
392
393 if (is_relay_log_recovery && init_recovery(mi, &msg))
394 goto err;
395
396 relay_log_state.load(rpl_global_gtid_slave_state);
397 if (init_relay_log_pos(this,
398 group_relay_log_name,
399 group_relay_log_pos,
400 0 /* no data lock*/,
401 &msg, 0))
402 {
403 sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)",
404 group_relay_log_name, group_relay_log_pos);
405 goto err;
406 }
407 }
408
409 DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu",
410 my_b_tell(cur_log), event_relay_log_pos));
411 DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
412 DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos);
413
414 /*
415 Now change the cache from READ to WRITE - must do this
416 before Relay_log_info::flush()
417 */
418 reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1);
419 if (unlikely((error= flush())))
420 {
421 msg= "Failed to flush relay log info file";
422 goto err;
423 }
424 if (count_relay_log_space(this))
425 {
426 msg="Error counting relay log space";
427 goto err;
428 }
429 inited= 1;
430 error_on_rli_init_info= false;
431 mysql_mutex_unlock(&data_lock);
432 DBUG_RETURN(0);
433
434 err:
435 error_on_rli_init_info= true;
436 if (msg)
437 sql_print_error("%s", msg);
438 end_io_cache(&info_file);
439 if (info_fd >= 0)
440 mysql_file_close(info_fd, MYF(0));
441 info_fd= -1;
442 mysql_mutex_lock(log_lock);
443 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
444 mysql_mutex_unlock(log_lock);
445 mysql_mutex_unlock(&data_lock);
446 DBUG_RETURN(1);
447 }
448
449
450 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
451 {
452 MY_STAT s;
453 DBUG_ENTER("add_relay_log");
454 if (!mysql_file_stat(key_file_relaylog,
455 linfo->log_file_name, &s, MYF(0)))
456 {
457 sql_print_error("log %s listed in the index, but failed to stat",
458 linfo->log_file_name);
459 DBUG_RETURN(1);
460 }
461 my_atomic_add64_explicit((volatile int64*)(&rli->log_space_total),
462 s.st_size, MY_MEMORY_ORDER_RELAXED);
463 DBUG_PRINT("info",("log_space_total: %llu", rli->log_space_total));
464 DBUG_RETURN(0);
465 }
466
467
468 static int count_relay_log_space(Relay_log_info* rli)
469 {
470 LOG_INFO linfo;
471 DBUG_ENTER("count_relay_log_space");
472 my_atomic_store64_explicit((volatile int64*)(&rli->log_space_total), 0,
473 MY_MEMORY_ORDER_RELAXED);
474 if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
475 {
476 sql_print_error("Could not find first log while counting relay log space");
477 DBUG_RETURN(1);
478 }
479 do
480 {
481 if (add_relay_log(rli,&linfo))
482 DBUG_RETURN(1);
483 } while (!rli->relay_log.find_next_log(&linfo, 1));
484 /*
485 As we have counted everything, including what may have written in a
486 preceding write, we must reset bytes_written, or we may count some space
487 twice.
488 */
489 rli->relay_log.reset_bytes_written();
490 DBUG_RETURN(0);
491 }
492
493
494 /*
495 Reset UNTIL condition for Relay_log_info
496
497 SYNOPSYS
498 clear_until_condition()
499 rli - Relay_log_info structure where UNTIL condition should be reset
500 */
501
502 void Relay_log_info::clear_until_condition()
503 {
504 DBUG_ENTER("clear_until_condition");
505
506 until_condition= Relay_log_info::UNTIL_NONE;
507 until_log_name[0]= 0;
508 until_log_pos= 0;
509 until_relay_log_names_defer= false;
510
511 DBUG_VOID_RETURN;
512 }
513
514
515 /*
516 Read the correct format description event for starting to replicate from
517 a given position in a relay log file.
518 */
519 Format_description_log_event *
520 read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
521 const char **errmsg)
522 {
523 Log_event *ev;
524 Format_description_log_event *fdev;
525 bool found= false;
526
527 /*
528 By default the relay log is in binlog format 3 (4.0).
529 Even if format is 4, this will work enough to read the first event
530 (Format_desc) (remember that format 4 is just lenghtened compared to format
531 3; format 3 is a prefix of format 4).
532 */
533 fdev= new Format_description_log_event(3);
534
535 while (!found)
536 {
537 Log_event_type typ;
538
539 /*
540 Read the possible Format_description_log_event; if position
541 was 4, no need, it will be read naturally.
542 */
543 DBUG_PRINT("info",("looking for a Format_description_log_event"));
544
545 if (my_b_tell(cur_log) >= start_pos)
546 break;
547
548 if (!(ev= Log_event::read_log_event(cur_log, fdev,
549 opt_slave_sql_verify_checksum)))
550 {
551 DBUG_PRINT("info",("could not read event, cur_log->error=%d",
552 cur_log->error));
553 if (cur_log->error) /* not EOF */
554 {
555 *errmsg= "I/O error reading event at position 4";
556 delete fdev;
557 return NULL;
558 }
559 break;
560 }
561 typ= ev->get_type_code();
562 if (typ == FORMAT_DESCRIPTION_EVENT)
563 {
564 Format_description_log_event *old= fdev;
565 DBUG_PRINT("info",("found Format_description_log_event"));
566 fdev= (Format_description_log_event*) ev;
567 fdev->copy_crypto_data(old);
568 delete old;
569
570 /*
571 As ev was returned by read_log_event, it has passed is_valid(), so
572 my_malloc() in ctor worked, no need to check again.
573 */
574 /*
575 Ok, we found a Format_description event. But it is not sure that this
576 describes the whole relay log; indeed, one can have this sequence
577 (starting from position 4):
578 Format_desc (of slave)
579 Rotate (of master)
580 Format_desc (of master)
581 So the Format_desc which really describes the rest of the relay log
582 is the 3rd event (it can't be further than that, because we rotate
583 the relay log when we queue a Rotate event from the master).
584 But what describes the Rotate is the first Format_desc.
585 So what we do is:
586 go on searching for Format_description events, until you exceed the
587 position (argument 'pos') or until you find another event than Rotate
588 or Format_desc.
589 */
590 }
591 else if (typ == START_ENCRYPTION_EVENT)
592 {
593 if (fdev->start_decryption((Start_encryption_log_event*) ev))
594 {
595 *errmsg= "Unable to set up decryption of binlog.";
596 delete ev;
597 delete fdev;
598 return NULL;
599 }
600 delete ev;
601 }
602 else
603 {
604 DBUG_PRINT("info",("found event of another type=%d", typ));
605 found= (typ != ROTATE_EVENT);
606 delete ev;
607 }
608 }
609 return fdev;
610 }
611
612
613 /*
614 Open the given relay log
615
616 SYNOPSIS
617 init_relay_log_pos()
618 rli Relay information (will be initialized)
619 log Name of relay log file to read from. NULL = First log
620 pos Position in relay log file
621 need_data_lock Set to 1 if this functions should do mutex locks
622 errmsg Store pointer to error message here
623 look_for_description_event
624 1 if we should look for such an event. We only need
625 this when the SQL thread starts and opens an existing
626 relay log and has to execute it (possibly from an
627 offset >4); then we need to read the first event of
628 the relay log to be able to parse the events we have
629 to execute.
630
631 DESCRIPTION
632 - Close old open relay log files.
633 - If we are using the same relay log as the running IO-thread, then set
634 rli->cur_log to point to the same IO_CACHE entry.
635 - If not, open the 'log' binary file.
636
637 TODO
638 - check proper initialization of group_master_log_name/group_master_log_pos
639
640 RETURN VALUES
641 0 ok
642 1 error. errmsg is set to point to the error message
643 */
644
645 int init_relay_log_pos(Relay_log_info* rli,const char* log,
646 ulonglong pos, bool need_data_lock,
647 const char** errmsg,
648 bool look_for_description_event)
649 {
650 DBUG_ENTER("init_relay_log_pos");
651 DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
652
653 *errmsg=0;
654 mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
655
656 if (need_data_lock)
657 mysql_mutex_lock(&rli->data_lock);
658
659 /*
660 Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
661 is, too, and init_slave() too; these 2 functions allocate a description
662 event in init_relay_log_pos, which is not freed by the terminating SQL slave
663 thread as that thread is not started by these functions. So we have to free
664 the description_event here, in case, so that there is no memory leak in
665 running, say, CHANGE MASTER.
666 */
667 delete rli->relay_log.description_event_for_exec;
668 /*
669 By default the relay log is in binlog format 3 (4.0).
670 Even if format is 4, this will work enough to read the first event
671 (Format_desc) (remember that format 4 is just lenghtened compared to format
672 3; format 3 is a prefix of format 4).
673 */
674 rli->relay_log.description_event_for_exec= new
675 Format_description_log_event(3);
676
677 mysql_mutex_lock(log_lock);
678
679 /* Close log file and free buffers if it's already open */
680 if (rli->cur_log_fd >= 0)
681 {
682 end_io_cache(&rli->cache_buf);
683 mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
684 rli->cur_log_fd = -1;
685 }
686
687 rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
688 rli->clear_flag(Relay_log_info::IN_STMT);
689 rli->clear_flag(Relay_log_info::IN_TRANSACTION);
690
691 /*
692 Test to see if the previous run was with the skip of purging
693 If yes, we do not purge when we restart
694 */
695 if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
696 {
697 *errmsg="Could not find first log during relay log initialization";
698 goto err;
699 }
700
701 if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
702 {
703 *errmsg="Could not find target log during relay log initialization";
704 goto err;
705 }
706 strmake_buf(rli->group_relay_log_name,rli->linfo.log_file_name);
707 strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name);
708 if (rli->relay_log.is_active(rli->linfo.log_file_name))
709 {
710 /*
711 The IO thread is using this log file.
712 In this case, we will use the same IO_CACHE pointer to
713 read data as the IO thread is using to write data.
714 */
715 my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
716 if (check_binlog_magic(rli->cur_log,errmsg))
717 goto err;
718 rli->cur_log_old_open_count=rli->relay_log.get_open_count();
719 }
720 else
721 {
722 /*
723 Open the relay log and set rli->cur_log to point at this one
724 */
725 if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
726 rli->linfo.log_file_name,errmsg)) < 0)
727 goto err;
728 rli->cur_log = &rli->cache_buf;
729 }
730 /*
731 In all cases, check_binlog_magic() has been called so we're at offset 4 for
732 sure.
733 */
734 if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
735 {
736 if (look_for_description_event)
737 {
738 Format_description_log_event *fdev;
739 if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg)))
740 goto err;
741 delete rli->relay_log.description_event_for_exec;
742 rli->relay_log.description_event_for_exec= fdev;
743 }
744 my_b_seek(rli->cur_log,(off_t)pos);
745 DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu",
746 my_b_tell(rli->cur_log), rli->event_relay_log_pos));
747
748 }
749
750 err:
751 /*
752 If we don't purge, we can't honour relay_log_space_limit ;
753 silently discard it
754 */
755 if (!relay_log_purge)
756 rli->log_space_limit= 0;
757 mysql_cond_broadcast(&rli->data_cond);
758
759 mysql_mutex_unlock(log_lock);
760
761 if (need_data_lock)
762 mysql_mutex_unlock(&rli->data_lock);
763 if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
764 *errmsg= "Invalid Format_description log event; could be out of memory";
765
766 DBUG_PRINT("info", ("Returning %d from init_relay_log_pos", (*errmsg)?1:0));
767
768 DBUG_RETURN ((*errmsg) ? 1 : 0);
769 }
770
771
772 /*
773 Waits until the SQL thread reaches (has executed up to) the
774 log/position or timed out.
775
776 SYNOPSIS
777 wait_for_pos()
778 thd client thread that sent SELECT MASTER_POS_WAIT
779 log_name log name to wait for
780 log_pos position to wait for
781 timeout timeout in seconds before giving up waiting
782
783 NOTES
784 timeout is longlong whereas it should be ulong ; but this is
785 to catch if the user submitted a negative timeout.
786
787 RETURN VALUES
788 -2 improper arguments (log_pos<0)
789 or slave not running, or master info changed
790 during the function's execution,
791 or client thread killed. -2 is translated to NULL by caller
792 -1 timed out
793 >=0 number of log events the function had to wait
794 before reaching the desired log/position
795 */
796
797 int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
798 longlong log_pos,
799 longlong timeout)
800 {
801 int event_count = 0;
802 ulong init_abort_pos_wait;
803 int error=0;
804 struct timespec abstime; // for timeout checking
805 PSI_stage_info old_stage;
806 DBUG_ENTER("Relay_log_info::wait_for_pos");
807
808 if (!inited)
809 DBUG_RETURN(-2);
810
811 DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
812 log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
813
814 set_timespec(abstime,timeout);
815 mysql_mutex_lock(&data_lock);
816 thd->ENTER_COND(&data_cond, &data_lock,
817 &stage_waiting_for_the_slave_thread_to_advance_position,
818 &old_stage);
819 /*
820 This function will abort when it notices that some CHANGE MASTER or
821 RESET MASTER has changed the master info.
822 To catch this, these commands modify abort_pos_wait ; We just monitor
823 abort_pos_wait and see if it has changed.
824 Why do we have this mechanism instead of simply monitoring slave_running
825 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
826 the SQL thread be stopped?
827 This is becasue if someones does:
828 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
829 the change may happen very quickly and we may not notice that
830 slave_running briefly switches between 1/0/1.
831 */
832 init_abort_pos_wait= abort_pos_wait;
833
834 /*
835 We'll need to
836 handle all possible log names comparisons (e.g. 999 vs 1000).
837 We use ulong for string->number conversion ; this is no
838 stronger limitation than in find_uniq_filename in sql/log.cc
839 */
840 ulong log_name_extension;
841 char log_name_tmp[FN_REFLEN]; //make a char[] from String
842
843 strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1));
844
845 char *p= fn_ext(log_name_tmp);
846 char *p_end;
847 if (!*p || log_pos<0)
848 {
849 error= -2; //means improper arguments
850 goto err;
851 }
852 // Convert 0-3 to 4
853 log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE);
854 /* p points to '.' */
855 log_name_extension= strtoul(++p, &p_end, 10);
856 /*
857 p_end points to the first invalid character.
858 If it equals to p, no digits were found, error.
859 If it contains '\0' it means conversion went ok.
860 */
861 if (p_end==p || *p_end)
862 {
863 error= -2;
864 goto err;
865 }
866
867 /* The "compare and wait" main loop */
868 while (!thd->killed &&
869 init_abort_pos_wait == abort_pos_wait &&
870 slave_running)
871 {
872 bool pos_reached;
873 int cmp_result= 0;
874
875 DBUG_PRINT("info",
876 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
877 init_abort_pos_wait, abort_pos_wait));
878 DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
879 group_master_log_name, (ulong) group_master_log_pos));
880
881 /*
882 group_master_log_name can be "", if we are just after a fresh
883 replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
884 (before we have executed one Rotate event from the master) or
885 (rare) if the user is doing a weird slave setup (see next
886 paragraph). If group_master_log_name is "", we assume we don't
887 have enough info to do the comparison yet, so we just wait until
888 more data. In this case master_log_pos is always 0 except if
889 somebody (wrongly) sets this slave to be a slave of itself
890 without using --replicate-same-server-id (an unsupported
891 configuration which does nothing), then group_master_log_pos
892 will grow and group_master_log_name will stay "".
893 */
894 if (*group_master_log_name)
895 {
896 char *basename= (group_master_log_name +
897 dirname_length(group_master_log_name));
898 /*
899 First compare the parts before the extension.
900 Find the dot in the master's log basename,
901 and protect against user's input error :
902 if the names do not match up to '.' included, return error
903 */
904 char *q= (char*)(fn_ext(basename)+1);
905 if (strncmp(basename, log_name_tmp, (int)(q-basename)))
906 {
907 error= -2;
908 break;
909 }
910 // Now compare extensions.
911 char *q_end;
912 ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
913 if (group_master_log_name_extension < log_name_extension)
914 cmp_result= -1 ;
915 else
916 cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
917
918 pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
919 cmp_result > 0);
920 if (pos_reached || thd->killed)
921 break;
922 }
923
924 //wait for master update, with optional timeout.
925
926 DBUG_PRINT("info",("Waiting for master update"));
927 /*
928 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
929 will wake us up.
930 */
931 thd_wait_begin(thd, THD_WAIT_BINLOG);
932 if (timeout > 0)
933 {
934 /*
935 Note that mysql_cond_timedwait checks for the timeout
936 before for the condition ; i.e. it returns ETIMEDOUT
937 if the system time equals or exceeds the time specified by abstime
938 before the condition variable is signaled or broadcast, _or_ if
939 the absolute time specified by abstime has already passed at the time
940 of the call.
941 For that reason, mysql_cond_timedwait will do the "timeoutting" job
942 even if its condition is always immediately signaled (case of a loaded
943 master).
944 */
945 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
946 }
947 else
948 mysql_cond_wait(&data_cond, &data_lock);
949 thd_wait_end(thd);
950 DBUG_PRINT("info",("Got signal of master update or timed out"));
951 if (error == ETIMEDOUT || error == ETIME)
952 {
953 error= -1;
954 break;
955 }
956 error=0;
957 event_count++;
958 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
959 }
960
961 err:
962 thd->EXIT_COND(&old_stage);
963 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
964 improper_arguments: %d timed_out: %d",
965 thd->killed_errno(),
966 (int) (init_abort_pos_wait != abort_pos_wait),
967 (int) slave_running,
968 (int) (error == -2),
969 (int) (error == -1)));
970 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
971 !slave_running)
972 {
973 error= -2;
974 }
975 DBUG_RETURN( error ? error : event_count );
976 }
977
978
979 void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
980 rpl_group_info *rgi,
981 bool skip_lock)
982 {
983 DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
984
985 if (skip_lock)
986 mysql_mutex_assert_owner(&data_lock);
987 else
988 mysql_mutex_lock(&data_lock);
989
990 rgi->inc_event_relay_log_pos();
991 DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
992 (long) log_pos, (long) group_master_log_pos));
993 if (rgi->is_parallel_exec)
994 {
995 /* In case of parallel replication, do not update the position backwards. */
996 int cmp= compare_log_name(group_relay_log_name, rgi->event_relay_log_name);
997 if (cmp < 0)
998 {
999 group_relay_log_pos= rgi->future_event_relay_log_pos;
1000 strmake_buf(group_relay_log_name, rgi->event_relay_log_name);
1001 } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
1002 group_relay_log_pos= rgi->future_event_relay_log_pos;
1003
1004 /*
1005 In the parallel case we need to update the master_log_name here, rather
1006 than in Rotate_log_event::do_update_pos().
1007 */
1008 cmp= compare_log_name(group_master_log_name, rgi->future_event_master_log_name);
1009 if (cmp <= 0)
1010 {
1011 if (cmp < 0)
1012 {
1013 strcpy(group_master_log_name, rgi->future_event_master_log_name);
1014 group_master_log_pos= log_pos;
1015 }
1016 else if (group_master_log_pos < log_pos)
1017 group_master_log_pos= log_pos;
1018 }
1019
1020 /*
1021 In the parallel case, we only update the Seconds_Behind_Master at the
1022 end of a transaction. In the non-parallel case, the value is updated as
1023 soon as an event is read from the relay log; however this would be too
1024 confusing for the user, seeing the slave reported as up-to-date when
1025 potentially thousands of events are still queued up for worker threads
1026 waiting for execution.
1027 */
1028 if (rgi->last_master_timestamp &&
1029 rgi->last_master_timestamp > last_master_timestamp)
1030 last_master_timestamp= rgi->last_master_timestamp;
1031 }
1032 else
1033 {
1034 /* Non-parallel case. */
1035 group_relay_log_pos= event_relay_log_pos;
1036 strmake_buf(group_relay_log_name, event_relay_log_name);
1037 notify_group_relay_log_name_update();
1038 if (log_pos) // not 3.23 binlogs (no log_pos there) and not Stop_log_event
1039 group_master_log_pos= log_pos;
1040 }
1041
1042 /*
1043 If the slave does not support transactions and replicates a transaction,
1044 users should not trust group_master_log_pos (which they can display with
1045 SHOW SLAVE STATUS or read from relay-log.info), because to compute
1046 group_master_log_pos the slave relies on log_pos stored in the master's
1047 binlog, but if we are in a master's transaction these positions are always
1048 the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
1049 not advance as it should on the non-transactional slave (it advances by
1050 big leaps, whereas it should advance by small leaps).
1051 */
1052 /*
1053 In 4.x we used the event's len to compute the positions here. This is
1054 wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1055 then the event's len is not what is was in the master's binlog, so this
1056 will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1057 replication: Exec_master_log_pos is wrong). Only way to solve this is to
1058 have the original offset of the end of the event the relay log. This is
1059 what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1060 of log_pos in 4.0 was to compute the end_log_pos; so better to store
1061 end_log_pos instead of begin_log_pos.
1062 If we had not done this fix here, the problem would also have appeared
1063 when the slave and master are 5.0 but with different event length (for
1064 example the slave is more recent than the master and features the event
1065 UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1066 SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1067 value which would lead to badly broken replication.
1068 Even the relay_log_pos will be corrupted in this case, because the len is
1069 the relay log is not "val".
1070 With the end_log_pos solution, we avoid computations involving lengthes.
1071 */
1072 mysql_cond_broadcast(&data_cond);
1073 if (!skip_lock)
1074 mysql_mutex_unlock(&data_lock);
1075 DBUG_VOID_RETURN;
1076 }
1077
1078
1079 void Relay_log_info::close_temporary_tables()
1080 {
1081 DBUG_ENTER("Relay_log_info::close_temporary_tables");
1082
1083 TMP_TABLE_SHARE *share;
1084 TABLE *table;
1085
1086 if (!save_temporary_tables)
1087 {
1088 /* There are no temporary tables. */
1089 DBUG_VOID_RETURN;
1090 }
1091
1092 while ((share= save_temporary_tables->pop_front()))
1093 {
1094 /*
1095 Iterate over the list of tables for this TABLE_SHARE and close them.
1096 */
1097 while ((table= share->all_tmp_tables.pop_front()))
1098 {
1099 DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'",
1100 table->s->db.str, table->s->table_name.str));
1101
1102 /* Reset in_use as the table may have been created by another thd */
1103 table->in_use= 0;
1104 /*
1105 Lets not free TABLE_SHARE here as there could be multiple TABLEs opened
1106 for the same table (TABLE_SHARE).
1107 */
1108 closefrm(table);
1109 my_free(table);
1110 }
1111
1112 /*
1113 Don't ask for disk deletion. For now, anyway they will be deleted when
1114 slave restarts, but it is a better intention to not delete them.
1115 */
1116
1117 free_table_share(share);
1118 my_free(share);
1119 }
1120
1121 /* By now, there mustn't be any elements left in the list. */
1122 DBUG_ASSERT(save_temporary_tables->is_empty());
1123
1124 my_free(save_temporary_tables);
1125 save_temporary_tables= NULL;
1126 slave_open_temp_tables= 0;
1127
1128 DBUG_VOID_RETURN;
1129 }
1130
1131 /*
1132 purge_relay_logs()
1133
1134 @param rli Relay log information
1135 @param thd thread id. May be zero during startup
1136
1137 NOTES
1138 Assumes to have a run lock on rli and that no slave thread are running.
1139 */
1140
1141 int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
1142 const char** errmsg)
1143 {
1144 int error=0;
1145 const char *ln;
1146 char name_buf[FN_REFLEN];
1147 DBUG_ENTER("purge_relay_logs");
1148
1149 /*
1150 Even if rli->inited==0, we still try to empty rli->master_log_* variables.
1151 Indeed, rli->inited==0 does not imply that they already are empty.
1152 It could be that slave's info initialization partly succeeded :
1153 for example if relay-log.info existed but *relay-bin*.*
1154 have been manually removed, Relay_log_info::init() reads the old
1155 relay-log.info and fills rli->master_log_*, then Relay_log_info::init()
1156 checks for the existence of the relay log, this fails and
1157 Relay_log_info::init() leaves rli->inited to 0.
1158 In that pathological case, rli->master_log_pos* will be properly reinited
1159 at the next START SLAVE (as RESET SLAVE or CHANGE
1160 MASTER, the callers of purge_relay_logs, will delete bogus *.info files
1161 or replace them with correct files), however if the user does SHOW SLAVE
1162 STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
1163 In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
1164 to display fine in any case.
1165 */
1166
1167 rli->group_master_log_name[0]= 0;
1168 rli->group_master_log_pos= 0;
1169
1170 if (!rli->inited)
1171 {
1172 DBUG_PRINT("info", ("rli->inited == 0"));
1173 if (rli->error_on_rli_init_info)
1174 {
1175 ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
1176 1, name_buf);
1177
1178 if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1179 {
1180 sql_print_error("Unable to purge relay log files. Failed to open relay "
1181 "log index file:%s.", rli->relay_log.get_index_fname());
1182 DBUG_RETURN(1);
1183 }
1184 mysql_mutex_lock(rli->relay_log.get_log_lock());
1185 if (rli->relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
1186 (ulong)(rli->max_relay_log_size ? rli->max_relay_log_size :
1187 max_binlog_size), 1, TRUE))
1188 {
1189 sql_print_error("Unable to purge relay log files. Failed to open relay "
1190 "log file:%s.", rli->relay_log.get_log_fname());
1191 mysql_mutex_unlock(rli->relay_log.get_log_lock());
1192 DBUG_RETURN(1);
1193 }
1194 mysql_mutex_unlock(rli->relay_log.get_log_lock());
1195 }
1196 else
1197 DBUG_RETURN(0);
1198 }
1199 else
1200 {
1201 DBUG_ASSERT(rli->slave_running == 0);
1202 DBUG_ASSERT(rli->mi->slave_running == 0);
1203 }
1204 mysql_mutex_lock(&rli->data_lock);
1205
1206 /*
1207 we close the relay log fd possibly left open by the slave SQL thread,
1208 to be able to delete it; the relay log fd possibly left open by the slave
1209 I/O thread will be closed naturally in reset_logs() by the
1210 close(LOG_CLOSE_TO_BE_OPENED) call
1211 */
1212 if (rli->cur_log_fd >= 0)
1213 {
1214 end_io_cache(&rli->cache_buf);
1215 mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
1216 rli->cur_log_fd= -1;
1217 }
1218
1219 if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0, 0))
1220 {
1221 *errmsg = "Failed during log reset";
1222 error=1;
1223 goto err;
1224 }
1225 rli->relay_log_state.load(rpl_global_gtid_slave_state);
1226 if (!just_reset)
1227 {
1228 /* Save name of used relay log file */
1229 strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
1230 strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
1231 rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1232 my_atomic_store64_explicit((volatile int64*)(&rli->log_space_total), 0,
1233 MY_MEMORY_ORDER_RELAXED);
1234 if (count_relay_log_space(rli))
1235 {
1236 *errmsg= "Error counting relay log space";
1237 error=1;
1238 goto err;
1239 }
1240 error= init_relay_log_pos(rli, rli->group_relay_log_name,
1241 rli->group_relay_log_pos,
1242 0 /* do not need data lock */, errmsg, 0);
1243 }
1244 else
1245 {
1246 /* Ensure relay log names are not used */
1247 rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0;
1248 }
1249
1250 if (!rli->inited && rli->error_on_rli_init_info)
1251 {
1252 mysql_mutex_lock(rli->relay_log.get_log_lock());
1253 rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
1254 mysql_mutex_unlock(rli->relay_log.get_log_lock());
1255 }
1256 err:
1257 DBUG_PRINT("info",("log_space_total: %llu",rli->log_space_total));
1258 mysql_mutex_unlock(&rli->data_lock);
1259 DBUG_RETURN(error);
1260 }
1261
1262
1263 /*
1264 Check if condition stated in UNTIL clause of START SLAVE is reached.
1265 SYNOPSYS
1266 Relay_log_info::is_until_satisfied()
1267 master_beg_pos position of the beginning of to be executed event
1268 (not log_pos member of the event that points to the
1269 beginning of the following event)
1270
1271
1272 DESCRIPTION
1273 Checks if UNTIL condition is reached. Uses caching result of last
1274 comparison of current log file name and target log file name. So cached
1275 value should be invalidated if current log file name changes
1276 (see Relay_log_info::notify_... functions).
1277
1278 This caching is needed to avoid of expensive string comparisons and
1279 strtol() conversions needed for log names comparison. We don't need to
1280 compare them each time this function is called, we only need to do this
1281 when current log name changes. If we have UNTIL_MASTER_POS condition we
1282 need to do this only after Rotate_log_event::do_apply_event() (which is
1283 rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
1284 condition then we should invalidate cached comarison value after
1285 inc_group_relay_log_pos() which called for each group of events (so we
1286 have some benefit if we have something like queries that use
1287 autoincrement or if we have transactions).
1288
1289 Should be called ONLY if until_condition != UNTIL_NONE !
1290
1291 In the parallel execution mode and UNTIL_MASTER_POS the file name is
1292 presented by future_event_master_log_name which may be ahead of
1293 group_master_log_name. Log_event::log_pos does relate to it nevertheless
1294 so the pair comprises a correct binlog coordinate.
1295 Internal group events and events that have zero log_pos also
1296 produce the zero for the local log_pos which may not lead to the
1297 function falsely return true.
1298 In UNTIL_RELAY_POS the original caching and notification are simplified
1299 to straightforward files comparison when the current event can't be
1300 a part of an event group.
1301
1302 RETURN VALUE
1303 true - condition met or error happened (condition seems to have
1304 bad log file name)
1305 false - condition not met
1306 */
1307
1308 bool Relay_log_info::is_until_satisfied(Log_event *ev)
1309 {
1310 const char *log_name;
1311 ulonglong log_pos;
1312 /* Prevents stopping within transaction; needed solely for Relay UNTIL. */
1313 bool in_trans= false;
1314
1315 DBUG_ENTER("Relay_log_info::is_until_satisfied");
1316
1317 if (until_condition == UNTIL_MASTER_POS)
1318 {
1319 log_name= (mi->using_parallel() ? future_event_master_log_name
1320 : group_master_log_name);
1321 log_pos= (get_flag(Relay_log_info::IN_TRANSACTION) || !ev || !ev->log_pos) ?
1322 (mi->using_parallel() ? 0 : group_master_log_pos) :
1323 ev->log_pos - ev->data_written;
1324 }
1325 else
1326 {
1327 DBUG_ASSERT(until_condition == UNTIL_RELAY_POS);
1328 if (!mi->using_parallel())
1329 {
1330 log_name= group_relay_log_name;
1331 log_pos= group_relay_log_pos;
1332 }
1333 else
1334 {
1335 log_name= event_relay_log_name;
1336 log_pos= event_relay_log_pos;
1337 in_trans= get_flag(Relay_log_info::IN_TRANSACTION);
1338 /*
1339 until_log_names_cmp_result is set to UNKNOWN either
1340 - by a non-group event *and* only when it is in the middle of a group
1341 - or by a group event when the preceding group made the above
1342 non-group event to defer the resetting.
1343 */
1344 if ((ev && !Log_event::is_group_event(ev->get_type_code())))
1345 {
1346 if (in_trans)
1347 {
1348 until_relay_log_names_defer= true;
1349 }
1350 else
1351 {
1352 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
1353 until_relay_log_names_defer= false;
1354 }
1355 }
1356 else if (!in_trans && until_relay_log_names_defer)
1357 {
1358 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
1359 until_relay_log_names_defer= false;
1360 }
1361 }
1362 }
1363
1364 DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%llu",
1365 group_master_log_name, group_master_log_pos));
1366 DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%llu",
1367 group_relay_log_name, group_relay_log_pos));
1368 DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%llu",
1369 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1370 log_name, log_pos));
1371 DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%llu",
1372 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1373 until_log_name, until_log_pos));
1374
1375 if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1376 {
1377 /*
1378 We have no cached comparison results so we should compare log names
1379 and cache result.
1380 If we are after RESET SLAVE, and the SQL slave thread has not processed
1381 any event yet, it could be that group_master_log_name is "". In that case,
1382 just wait for more events (as there is no sensible comparison to do).
1383 */
1384
1385 if (*log_name)
1386 {
1387 const char *basename= log_name + dirname_length(log_name);
1388
1389 const char *q= (const char*)(fn_ext(basename)+1);
1390 if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1391 {
1392 /* Now compare extensions. */
1393 char *q_end;
1394 ulong log_name_extension= strtoul(q, &q_end, 10);
1395 if (log_name_extension < until_log_name_extension)
1396 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1397 else
1398 until_log_names_cmp_result=
1399 (log_name_extension > until_log_name_extension) ?
1400 UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1401 }
1402 else
1403 {
1404 /* Probably error so we aborting */
1405 sql_print_error("Slave SQL thread is stopped because UNTIL "
1406 "condition is bad.");
1407 DBUG_RETURN(TRUE);
1408 }
1409 }
1410 else
1411 DBUG_RETURN(until_log_pos == 0);
1412 }
1413
1414 DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1415 (log_pos >= until_log_pos && !in_trans)) ||
1416 until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
1417 }
1418
1419
1420 bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
1421 rpl_group_info *rgi)
1422 {
1423 int error= 0;
1424 DBUG_ENTER("Relay_log_info::stmt_done");
1425
1426 DBUG_ASSERT(!belongs_to_client());
1427 DBUG_ASSERT(rgi->rli == this);
1428 /*
1429 If in a transaction, and if the slave supports transactions, just
1430 inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
1431 (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
1432 BEGIN/COMMIT, not with SET AUTOCOMMIT= .
1433
1434 We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
1435 is also used for single row transactions.
1436
1437 CAUTION: opt_using_transactions means innodb || bdb ; suppose the
1438 master supports InnoDB and BDB, but the slave supports only BDB,
1439 problems will arise: - suppose an InnoDB table is created on the
1440 master, - then it will be MyISAM on the slave - but as
1441 opt_using_transactions is true, the slave will believe he is
1442 transactional with the MyISAM table. And problems will come when
1443 one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
1444 resume at BEGIN whereas there has not been any rollback). This is
1445 the problem of using opt_using_transactions instead of a finer
1446 "does the slave support _transactional handler used on the
1447 master_".
1448
1449 More generally, we'll have problems when a query mixes a
1450 transactional handler and MyISAM and STOP SLAVE is issued in the
1451 middle of the "transaction". START SLAVE will resume at BEGIN
1452 while the MyISAM table has already been updated.
1453 */
1454 if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
1455 opt_using_transactions)
1456 rgi->inc_event_relay_log_pos();
1457 else
1458 {
1459 inc_group_relay_log_pos(event_master_log_pos, rgi);
1460 if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi))
1461 {
1462 report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
1463 "Failed to update GTID state in %s.%s, slave state may become "
1464 "inconsistent: %d: %s",
1465 "mysql", rpl_gtid_slave_state_table_name.str,
1466 thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message());
1467 /*
1468 At this point we are not in a transaction (for example after DDL),
1469 so we can not roll back. Anyway, normally updates to the slave
1470 state table should not fail, and if they do, at least we made the
1471 DBA aware of the problem in the error log.
1472 */
1473 }
1474 DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
1475 if (mi->using_gtid == Master_info::USE_GTID_NO)
1476 {
1477 if (rgi->is_parallel_exec)
1478 mysql_mutex_lock(&data_lock);
1479 if (flush())
1480 error= 1;
1481 if (rgi->is_parallel_exec)
1482 mysql_mutex_unlock(&data_lock);
1483 }
1484 DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
1485 }
1486 DBUG_RETURN(error);
1487 }
1488
1489
1490 int
1491 Relay_log_info::alloc_inuse_relaylog(const char *name)
1492 {
1493 inuse_relaylog *ir;
1494 uint32 gtid_count;
1495 rpl_gtid *gtid_list;
1496
1497 if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
1498 {
1499 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
1500 return 1;
1501 }
1502 gtid_count= relay_log_state.count();
1503 if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count,
1504 MYF(MY_WME))))
1505 {
1506 my_free(ir);
1507 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
1508 return 1;
1509 }
1510 if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
1511 {
1512 my_free(gtid_list);
1513 my_free(ir);
1514 DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
1515 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1516 return 1;
1517 }
1518 ir->rli= this;
1519 strmake_buf(ir->name, name);
1520 ir->relay_log_state= gtid_list;
1521 ir->relay_log_state_count= gtid_count;
1522
1523 if (!inuse_relaylog_list)
1524 inuse_relaylog_list= ir;
1525 else
1526 {
1527 last_inuse_relaylog->completed= true;
1528 last_inuse_relaylog->next= ir;
1529 }
1530 last_inuse_relaylog= ir;
1531
1532 return 0;
1533 }
1534
1535
1536 void
1537 Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
1538 {
1539 my_free(ir->relay_log_state);
1540 my_free(ir);
1541 }
1542
1543
1544 void
1545 Relay_log_info::reset_inuse_relaylog()
1546 {
1547 inuse_relaylog *cur= inuse_relaylog_list;
1548 while (cur)
1549 {
1550 DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
1551 inuse_relaylog *next= cur->next;
1552 free_inuse_relaylog(cur);
1553 cur= next;
1554 }
1555 inuse_relaylog_list= last_inuse_relaylog= NULL;
1556 }
1557
1558
1559 int
1560 Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
1561 {
1562 int res= 0;
1563 while (count)
1564 {
1565 if (relay_log_state.update_nolock(gtid_list, false))
1566 res= 1;
1567 ++gtid_list;
1568 --count;
1569 }
1570 return res;
1571 }
1572
1573
1574 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1575 struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
1576
1577 static int
1578 scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
1579 LEX_CSTRING *tablename, void **out_hton)
1580 {
1581 TABLE_LIST tlist;
1582 TABLE *UNINIT_VAR(table);
1583 bool table_opened= false;
1584 bool table_scanned= false;
1585 struct gtid_pos_element tmp_entry, *entry;
1586 int err= 0;
1587
1588 thd->reset_for_next_command();
1589 tlist.init_one_table(&MYSQL_SCHEMA_NAME, tablename, NULL, TL_READ);
1590 if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
1591 goto end;
1592 table_opened= true;
1593 table= tlist.table;
1594
1595 if ((err= gtid_check_rpl_slave_state_table(table)))
1596 goto end;
1597
1598 bitmap_set_all(table->read_set);
1599 if (unlikely(err= table->file->ha_rnd_init_with_error(1)))
1600 goto end;
1601
1602 table_scanned= true;
1603 for (;;)
1604 {
1605 uint32 domain_id, server_id;
1606 uint64 sub_id, seq_no;
1607 uchar *rec;
1608
1609 if ((err= table->file->ha_rnd_next(table->record[0])))
1610 {
1611 if (err == HA_ERR_END_OF_FILE)
1612 break;
1613 else
1614 {
1615 table->file->print_error(err, MYF(0));
1616 goto end;
1617 }
1618 }
1619 domain_id= (uint32)table->field[0]->val_int();
1620 sub_id= (ulonglong)table->field[1]->val_int();
1621 server_id= (uint32)table->field[2]->val_int();
1622 seq_no= (ulonglong)table->field[3]->val_int();
1623 DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu",
1624 (unsigned)domain_id, (unsigned)server_id,
1625 (ulong)seq_no, (ulong)sub_id));
1626
1627 tmp_entry.sub_id= sub_id;
1628 tmp_entry.gtid.domain_id= domain_id;
1629 tmp_entry.gtid.server_id= server_id;
1630 tmp_entry.gtid.seq_no= seq_no;
1631 tmp_entry.hton= table->s->db_type();
1632 if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
1633 {
1634 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1635 goto end;
1636 }
1637
1638 if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0)))
1639 {
1640 entry= (struct gtid_pos_element *)rec;
1641 if (entry->sub_id >= sub_id)
1642 continue;
1643 entry->sub_id= sub_id;
1644 DBUG_ASSERT(entry->gtid.domain_id == domain_id);
1645 entry->gtid.server_id= server_id;
1646 entry->gtid.seq_no= seq_no;
1647 entry->hton= table->s->db_type();
1648 }
1649 else
1650 {
1651 if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry),
1652 MYF(MY_WME))))
1653 {
1654 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
1655 err= 1;
1656 goto end;
1657 }
1658 entry->sub_id= sub_id;
1659 entry->gtid.domain_id= domain_id;
1660 entry->gtid.server_id= server_id;
1661 entry->gtid.seq_no= seq_no;
1662 entry->hton= table->s->db_type();
1663 if ((err= my_hash_insert(hash, (uchar *)entry)))
1664 {
1665 my_free(entry);
1666 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1667 goto end;
1668 }
1669 }
1670 }
1671 err= 0; /* Clear HA_ERR_END_OF_FILE */
1672
1673 end:
1674 if (table_scanned)
1675 {
1676 table->file->ha_index_or_rnd_end();
1677 ha_commit_trans(thd, FALSE);
1678 ha_commit_trans(thd, TRUE);
1679 }
1680 if (table_opened)
1681 {
1682 *out_hton= table->s->db_type();
1683 close_thread_tables(thd);
1684 thd->mdl_context.release_transactional_locks();
1685 }
1686 return err;
1687 }
1688
1689
1690 /*
1691 Look for all tables mysql.gtid_slave_pos*. Read all rows from each such
1692 table found into ARRAY. For each domain id, put the row with highest sub_id
1693 into HASH.
1694 */
1695 static int
1696 scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *),
1697 void *cb_data)
1698 {
1699 char path[FN_REFLEN];
1700 MY_DIR *dirp;
1701
1702 thd->reset_for_next_command();
1703 if (lock_schema_name(thd, MYSQL_SCHEMA_NAME.str))
1704 return 1;
1705
1706 build_table_filename(path, sizeof(path) - 1, MYSQL_SCHEMA_NAME.str, "", "", 0);
1707 if (!(dirp= my_dir(path, MYF(MY_DONT_SORT))))
1708 {
1709 my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno);
1710 close_thread_tables(thd);
1711 thd->release_transactional_locks();
1712 return 1;
1713 }
1714 else
1715 {
1716 size_t i;
1717 Dynamic_array<LEX_CSTRING*> files(dirp->number_of_files);
1718 Discovered_table_list tl(thd, &files);
1719 int err;
1720
1721 err= ha_discover_table_names(thd, &MYSQL_SCHEMA_NAME, dirp, &tl, false);
1722 my_dirend(dirp);
1723 close_thread_tables(thd);
1724 thd->release_transactional_locks();
1725 if (err)
1726 return err;
1727
1728 for (i = 0; i < files.elements(); ++i)
1729 {
1730 if (strncmp(files.at(i)->str,
1731 rpl_gtid_slave_state_table_name.str,
1732 rpl_gtid_slave_state_table_name.length) == 0)
1733 {
1734 if ((err= (*cb)(thd, files.at(i), cb_data)))
1735 return err;
1736 }
1737 }
1738 }
1739
1740 return 0;
1741 }
1742
1743
1744 struct load_gtid_state_cb_data {
1745 HASH *hash;
1746 DYNAMIC_ARRAY *array;
1747 struct rpl_slave_state::gtid_pos_table *table_list;
1748 struct rpl_slave_state::gtid_pos_table *default_entry;
1749 };
1750
1751 static int
1752 process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton,
1753 struct load_gtid_state_cb_data *data)
1754 {
1755 struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr;
1756 bool is_default=
1757 (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0);
1758
1759 /*
1760 Ignore tables with duplicate storage engine, with a warning.
1761 Prefer the default mysql.gtid_slave_pos over another table
1762 mysql.gtid_slave_posXXX with the same storage engine.
1763 */
1764 next_ptr= &data->table_list;
1765 entry= data->table_list;
1766 while (entry)
1767 {
1768 if (entry->table_hton == hton)
1769 {
1770 static const char *warning_msg= "Ignoring redundant table mysql.%s "
1771 "since mysql.%s has the same storage engine";
1772 if (!is_default)
1773 {
1774 /* Ignore the redundant table. */
1775 sql_print_warning(warning_msg, table_name->str, entry->table_name.str);
1776 return 0;
1777 }
1778 else
1779 {
1780 sql_print_warning(warning_msg, entry->table_name.str, table_name->str);
1781 /* Delete the redundant table, and proceed to add this one instead. */
1782 *next_ptr= entry->next;
1783 my_free(entry);
1784 break;
1785 }
1786 }
1787 next_ptr= &entry->next;
1788 entry= entry->next;
1789 }
1790
1791 p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name,
1792 hton, rpl_slave_state::GTID_POS_AVAILABLE);
1793 if (!p)
1794 return 1;
1795 p->next= data->table_list;
1796 data->table_list= p;
1797 if (is_default)
1798 data->default_entry= p;
1799 return 0;
1800 }
1801
1802
1803 /*
1804 Put tables corresponding to @@gtid_pos_auto_engines at the end of the list,
1805 marked to be auto-created if needed.
1806 */
1807 static int
1808 gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr)
1809 {
1810 plugin_ref *auto_engines;
1811 int err= 0;
1812 mysql_mutex_lock(&LOCK_global_system_variables);
1813 for (auto_engines= opt_gtid_pos_auto_plugins;
1814 !err && auto_engines && *auto_engines;
1815 ++auto_engines)
1816 {
1817 void *hton= plugin_hton(*auto_engines);
1818 char buf[FN_REFLEN+1];
1819 LEX_CSTRING table_name;
1820 char *p;
1821 rpl_slave_state::gtid_pos_table *entry, **next_ptr;
1822
1823 /* See if this engine is already in the list. */
1824 next_ptr= list_ptr;
1825 entry= *list_ptr;
1826 while (entry)
1827 {
1828 if (entry->table_hton == hton)
1829 break;
1830 next_ptr= &entry->next;
1831 entry= entry->next;
1832 }
1833 if (entry)
1834 continue;
1835
1836 /* Add an auto-create entry for this engine at end of list. */
1837 p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN);
1838 p= strmake(p, "_", FN_REFLEN - (p - buf));
1839 p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf));
1840 table_name.str= buf;
1841 table_name.length= p - buf;
1842 table_case_convert(const_cast<char*>(table_name.str),
1843 static_cast<uint>(table_name.length));
1844 entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table
1845 (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE);
1846 if (!entry)
1847 {
1848 err= 1;
1849 break;
1850 }
1851 *next_ptr= entry;
1852 }
1853 mysql_mutex_unlock(&LOCK_global_system_variables);
1854 return err;
1855 }
1856
1857
1858 static int
1859 load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
1860 {
1861 int err;
1862 load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
1863 void *hton;
1864
1865 if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
1866 table_name, &hton)))
1867 return err;
1868 return process_gtid_pos_table(thd, table_name, hton, data);
1869 }
1870
1871
1872 int
1873 rpl_load_gtid_slave_state(THD *thd)
1874 {
1875 bool array_inited= false;
1876 struct gtid_pos_element tmp_entry, *entry;
1877 HASH hash;
1878 DYNAMIC_ARRAY array;
1879 int err= 0;
1880 uint32 i;
1881 load_gtid_state_cb_data cb_data;
1882 DBUG_ENTER("rpl_load_gtid_slave_state");
1883
1884 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1885 bool loaded= rpl_global_gtid_slave_state->loaded;
1886 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1887 if (loaded)
1888 DBUG_RETURN(0);
1889
1890 cb_data.table_list= NULL;
1891 cb_data.default_entry= NULL;
1892 my_hash_init(&hash, &my_charset_bin, 32,
1893 offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
1894 sizeof(uint32), NULL, my_free, HASH_UNIQUE);
1895 if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0))))
1896 goto end;
1897 array_inited= true;
1898
1899 cb_data.hash = &hash;
1900 cb_data.array = &array;
1901 if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data)))
1902 goto end;
1903
1904 if (!cb_data.default_entry)
1905 {
1906 /*
1907 If the mysql.gtid_slave_pos table does not exist, but at least one other
1908 table is available, arbitrarily pick the first in the list to use as
1909 default.
1910 */
1911 cb_data.default_entry= cb_data.table_list;
1912 }
1913 if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
1914 goto end;
1915
1916 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1917 if (rpl_global_gtid_slave_state->loaded)
1918 {
1919 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1920 goto end;
1921 }
1922
1923 if (!cb_data.table_list)
1924 {
1925 my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
1926 rpl_gtid_slave_state_table_name.str);
1927 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1928 err= 1;
1929 goto end;
1930 }
1931
1932 for (i= 0; i < array.elements; ++i)
1933 {
1934 get_dynamic(&array, (uchar *)&tmp_entry, i);
1935 if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id,
1936 tmp_entry.gtid.server_id,
1937 tmp_entry.sub_id,
1938 tmp_entry.gtid.seq_no,
1939 tmp_entry.hton,
1940 NULL)))
1941 {
1942 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1943 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1944 goto end;
1945 }
1946 }
1947
1948 for (i= 0; i < hash.records; ++i)
1949 {
1950 entry= (struct gtid_pos_element *)my_hash_element(&hash, i);
1951 if (opt_bin_log &&
1952 mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
1953 entry->gtid.seq_no))
1954 {
1955 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1956 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1957 goto end;
1958 }
1959 }
1960
1961 rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
1962 cb_data.default_entry);
1963 cb_data.table_list= NULL;
1964 rpl_global_gtid_slave_state->loaded= true;
1965 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1966
1967 end:
1968 if (array_inited)
1969 delete_dynamic(&array);
1970 my_hash_free(&hash);
1971 if (cb_data.table_list)
1972 rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
1973 DBUG_RETURN(err);
1974 }
1975
1976
1977 static int
1978 find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
1979 {
1980 load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
1981 TABLE_LIST tlist;
1982 TABLE *table= NULL;
1983 int err;
1984
1985 thd->reset_for_next_command();
1986 tlist.init_one_table(&MYSQL_SCHEMA_NAME, table_name, NULL, TL_READ);
1987 if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
1988 goto end;
1989 table= tlist.table;
1990
1991 if ((err= gtid_check_rpl_slave_state_table(table)))
1992 goto end;
1993 err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data);
1994
1995 end:
1996 if (table)
1997 {
1998 ha_commit_trans(thd, FALSE);
1999 ha_commit_trans(thd, TRUE);
2000 close_thread_tables(thd);
2001 thd->release_transactional_locks();
2002 }
2003
2004 return err;
2005 }
2006
2007
2008 /*
2009 Re-compute the list of available mysql.gtid_slave_posXXX tables.
2010
2011 This is done at START SLAVE to pick up any newly created tables without
2012 requiring server restart.
2013 */
2014 int
2015 find_gtid_slave_pos_tables(THD *thd)
2016 {
2017 int err= 0;
2018 load_gtid_state_cb_data cb_data;
2019 uint num_running;
2020
2021 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2022 bool loaded= rpl_global_gtid_slave_state->loaded;
2023 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2024 if (!loaded)
2025 return 0;
2026
2027 cb_data.table_list= NULL;
2028 cb_data.default_entry= NULL;
2029 if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data)))
2030 goto end;
2031
2032 if (!cb_data.table_list)
2033 {
2034 my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
2035 rpl_gtid_slave_state_table_name.str);
2036 err= 1;
2037 goto end;
2038 }
2039 if (!cb_data.default_entry)
2040 {
2041 /*
2042 If the mysql.gtid_slave_pos table does not exist, but at least one other
2043 table is available, arbitrarily pick the first in the list to use as
2044 default.
2045 */
2046 cb_data.default_entry= cb_data.table_list;
2047 }
2048 if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
2049 goto end;
2050
2051 mysql_mutex_lock(&LOCK_active_mi);
2052 num_running= any_slave_sql_running(true);
2053 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2054 if (num_running <= 1)
2055 {
2056 /*
2057 If no slave is running now, the count will be 1, since this SQL thread
2058 which is starting is included in the count. In this case, we can safely
2059 replace the list, no-one can be trying to read it without lock.
2060 */
2061 DBUG_ASSERT(num_running == 1);
2062 rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
2063 cb_data.default_entry);
2064 cb_data.table_list= NULL;
2065 }
2066 else
2067 {
2068 /*
2069 If there are SQL threads running, we cannot safely remove the old list.
2070 However we can add new entries, and warn about any tables that
2071 disappeared, but may still be visible to running SQL threads.
2072 */
2073 rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr;
2074
2075 old_entry= (rpl_slave_state::gtid_pos_table *)
2076 rpl_global_gtid_slave_state->gtid_pos_tables;
2077 while (old_entry)
2078 {
2079 new_entry= cb_data.table_list;
2080 while (new_entry)
2081 {
2082 if (new_entry->table_hton == old_entry->table_hton)
2083 break;
2084 new_entry= new_entry->next;
2085 }
2086 if (!new_entry)
2087 sql_print_warning("The table mysql.%s was removed. "
2088 "This change will not take full effect "
2089 "until all SQL threads have been restarted",
2090 old_entry->table_name.str);
2091 old_entry= old_entry->next;
2092 }
2093 next_ptr_ptr= &cb_data.table_list;
2094 new_entry= cb_data.table_list;
2095 while (new_entry)
2096 {
2097 /* Check if we already have a table with this storage engine. */
2098 old_entry= (rpl_slave_state::gtid_pos_table *)
2099 rpl_global_gtid_slave_state->gtid_pos_tables;
2100 while (old_entry)
2101 {
2102 if (new_entry->table_hton == old_entry->table_hton)
2103 break;
2104 old_entry= old_entry->next;
2105 }
2106 if (old_entry)
2107 {
2108 /* This new_entry is already available in the list. */
2109 next_ptr_ptr= &new_entry->next;
2110 new_entry= new_entry->next;
2111 }
2112 else
2113 {
2114 /* Move this new_entry to the list. */
2115 rpl_slave_state::gtid_pos_table *next= new_entry->next;
2116 rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry);
2117 *next_ptr_ptr= next;
2118 new_entry= next;
2119 }
2120 }
2121 }
2122 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2123 mysql_mutex_unlock(&LOCK_active_mi);
2124
2125 end:
2126 if (cb_data.table_list)
2127 rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
2128 return err;
2129 }
2130
2131
2132 void
2133 rpl_group_info::reinit(Relay_log_info *rli)
2134 {
2135 this->rli= rli;
2136 tables_to_lock= NULL;
2137 tables_to_lock_count= 0;
2138 trans_retries= 0;
2139 last_event_start_time= 0;
2140 gtid_sub_id= 0;
2141 commit_id= 0;
2142 gtid_pending= false;
2143 worker_error= 0;
2144 row_stmt_start_timestamp= 0;
2145 long_find_row_note_printed= false;
2146 did_mark_start_commit= false;
2147 gtid_ev_flags2= 0;
2148 pending_gtid_delete_list= NULL;
2149 last_master_timestamp = 0;
2150 gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
2151 speculation= SPECULATE_NO;
2152 commit_orderer.reinit();
2153 }
2154
2155 rpl_group_info::rpl_group_info(Relay_log_info *rli)
2156 : thd(0), wait_commit_sub_id(0),
2157 wait_commit_group_info(0), parallel_entry(0),
2158 deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
2159 {
2160 reinit(rli);
2161 bzero(¤t_gtid, sizeof(current_gtid));
2162 mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
2163 MY_MUTEX_INIT_FAST);
2164 mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
2165 }
2166
2167
2168 rpl_group_info::~rpl_group_info()
2169 {
2170 free_annotate_event();
2171 delete deferred_events;
2172 mysql_mutex_destroy(&sleep_lock);
2173 mysql_cond_destroy(&sleep_cond);
2174 }
2175
2176
2177 int
2178 event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
2179 {
2180 uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id);
2181 if (!sub_id)
2182 {
2183 /* Out of memory caused hash insertion to fail. */
2184 return 1;
2185 }
2186 rgi->gtid_sub_id= sub_id;
2187 rgi->current_gtid.domain_id= gev->domain_id;
2188 rgi->current_gtid.server_id= gev->server_id;
2189 rgi->current_gtid.seq_no= gev->seq_no;
2190 rgi->commit_id= gev->commit_id;
2191 rgi->gtid_pending= true;
2192 return 0;
2193 }
2194
2195
2196 void
2197 delete_or_keep_event_post_apply(rpl_group_info *rgi,
2198 Log_event_type typ, Log_event *ev)
2199 {
2200 /*
2201 ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
2202 thread-safe for parallel replication.
2203 */
2204
2205 switch (typ) {
2206 case FORMAT_DESCRIPTION_EVENT:
2207 /*
2208 Format_description_log_event should not be deleted because it
2209 will be used to read info about the relay log's format;
2210 it will be deleted when the SQL thread does not need it,
2211 i.e. when this thread terminates.
2212 */
2213 break;
2214 case ANNOTATE_ROWS_EVENT:
2215 /*
2216 Annotate_rows event should not be deleted because after it has
2217 been applied, thd->query points to the string inside this event.
2218 The thd->query will be used to generate new Annotate_rows event
2219 during applying the subsequent Rows events.
2220 */
2221 rgi->set_annotate_event((Annotate_rows_log_event*) ev);
2222 break;
2223 case DELETE_ROWS_EVENT_V1:
2224 case UPDATE_ROWS_EVENT_V1:
2225 case WRITE_ROWS_EVENT_V1:
2226 case DELETE_ROWS_EVENT:
2227 case UPDATE_ROWS_EVENT:
2228 case WRITE_ROWS_EVENT:
2229 case WRITE_ROWS_COMPRESSED_EVENT:
2230 case DELETE_ROWS_COMPRESSED_EVENT:
2231 case UPDATE_ROWS_COMPRESSED_EVENT:
2232 case WRITE_ROWS_COMPRESSED_EVENT_V1:
2233 case UPDATE_ROWS_COMPRESSED_EVENT_V1:
2234 case DELETE_ROWS_COMPRESSED_EVENT_V1:
2235 /*
2236 After the last Rows event has been applied, the saved Annotate_rows
2237 event (if any) is not needed anymore and can be deleted.
2238 */
2239 if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
2240 rgi->free_annotate_event();
2241 /* fall through */
2242 default:
2243 DBUG_PRINT("info", ("Deleting the event after it has been executed"));
2244 if (!rgi->is_deferred_event(ev))
2245 delete ev;
2246 break;
2247 }
2248 }
2249
2250
2251 void rpl_group_info::cleanup_context(THD *thd, bool error)
2252 {
2253 DBUG_ENTER("rpl_group_info::cleanup_context");
2254 DBUG_PRINT("enter", ("error: %d", (int) error));
2255
2256 DBUG_ASSERT(this->thd == thd);
2257 /*
2258 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
2259 may have opened tables, which we cannot be sure have been closed (because
2260 maybe the Rows_log_event have not been found or will not be, because slave
2261 SQL thread is stopping, or relay log has a missing tail etc). So we close
2262 all thread's tables. And so the table mappings have to be cancelled.
2263 2) Rows_log_event::do_apply_event() may even have started statements or
2264 transactions on them, which we need to rollback in case of error.
2265 3) If finding a Format_description_log_event after a BEGIN, we also need
2266 to rollback before continuing with the next events.
2267 4) so we need this "context cleanup" function.
2268 */
2269 if (unlikely(error))
2270 {
2271 trans_rollback_stmt(thd); // if a "statement transaction"
2272 /* trans_rollback() also resets OPTION_GTID_BEGIN */
2273 trans_rollback(thd); // if a "real transaction"
2274 /*
2275 Now that we have rolled back the transaction, make sure we do not
2276 erroneously update the GTID position.
2277 */
2278 gtid_pending= false;
2279
2280 /*
2281 Rollback will have undone any deletions of old rows we might have made
2282 in mysql.gtid_slave_pos. Put those rows back on the list to be deleted.
2283 */
2284 pending_gtid_deletes_put_back();
2285 }
2286 m_table_map.clear_tables();
2287 slave_close_thread_tables(thd);
2288
2289 if (unlikely(error))
2290 {
2291 thd->release_transactional_locks();
2292
2293 if (thd == rli->sql_driver_thd)
2294 {
2295 /*
2296 Reset flags. This is needed to handle incident events and errors in
2297 the relay log noticed by the sql driver thread.
2298 */
2299 rli->clear_flag(Relay_log_info::IN_STMT);
2300 rli->clear_flag(Relay_log_info::IN_TRANSACTION);
2301 }
2302
2303 /*
2304 Ensure we always release the domain for others to process, when using
2305 --gtid-ignore-duplicates.
2306 */
2307 if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
2308 rpl_global_gtid_slave_state->release_domain_owner(this);
2309 }
2310
2311 /*
2312 Cleanup for the flags that have been set at do_apply_event.
2313 */
2314 thd->variables.option_bits&= ~(OPTION_NO_FOREIGN_KEY_CHECKS |
2315 OPTION_RELAXED_UNIQUE_CHECKS |
2316 OPTION_NO_CHECK_CONSTRAINT_CHECKS);
2317
2318 /*
2319 Reset state related to long_find_row notes in the error log:
2320 - timestamp
2321 - flag that decides whether the slave prints or not
2322 */
2323 reset_row_stmt_start_timestamp();
2324 unset_long_find_row_note_printed();
2325
2326 DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x", {
2327 if (current_gtid.domain_id == 100)
2328 my_sleep(50000);
2329 };);
2330
2331 DBUG_VOID_RETURN;
2332 }
2333
2334
2335 void rpl_group_info::clear_tables_to_lock()
2336 {
2337 DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
2338 #ifndef DBUG_OFF
2339 /**
2340 When replicating in RBR and MyISAM Merge tables are involved
2341 open_and_lock_tables (called in do_apply_event) appends the
2342 base tables to the list of tables_to_lock. Then these are
2343 removed from the list in close_thread_tables (which is called
2344 before we reach this point).
2345
2346 This assertion just confirms that we get no surprises at this
2347 point.
2348 */
2349 uint i=0;
2350 for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
2351 DBUG_ASSERT(i == tables_to_lock_count);
2352 #endif
2353
2354 while (tables_to_lock)
2355 {
2356 uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
2357 if (tables_to_lock->m_tabledef_valid)
2358 {
2359 tables_to_lock->m_tabledef.table_def::~table_def();
2360 tables_to_lock->m_tabledef_valid= FALSE;
2361 }
2362
2363 /*
2364 If blob fields were used during conversion of field values
2365 from the master table into the slave table, then we need to
2366 free the memory used temporarily to store their values before
2367 copying into the slave's table.
2368 */
2369 if (tables_to_lock->m_conv_table)
2370 free_blobs(tables_to_lock->m_conv_table);
2371
2372 tables_to_lock=
2373 static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
2374 tables_to_lock_count--;
2375 my_free(to_free);
2376 }
2377 DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
2378 DBUG_VOID_RETURN;
2379 }
2380
2381
2382 void rpl_group_info::slave_close_thread_tables(THD *thd)
2383 {
2384 DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
2385 thd->get_stmt_da()->set_overwrite_status(true);
2386 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
2387 thd->get_stmt_da()->set_overwrite_status(false);
2388
2389 close_thread_tables(thd);
2390 /*
2391 - If transaction rollback was requested due to deadlock
2392 perform it and release metadata locks.
2393 - If inside a multi-statement transaction,
2394 defer the release of metadata locks until the current
2395 transaction is either committed or rolled back. This prevents
2396 other statements from modifying the table for the entire
2397 duration of this transaction. This provides commit ordering
2398 and guarantees serializability across multiple transactions.
2399 - If in autocommit mode, or outside a transactional context,
2400 automatically release metadata locks of the current statement.
2401 */
2402 if (thd->transaction_rollback_request)
2403 {
2404 trans_rollback_implicit(thd);
2405 thd->release_transactional_locks();
2406 }
2407 else if (! thd->in_multi_stmt_transaction_mode())
2408 thd->release_transactional_locks();
2409 else
2410 thd->mdl_context.release_statement_locks();
2411
2412 clear_tables_to_lock();
2413 DBUG_VOID_RETURN;
2414 }
2415
2416
2417
2418 static void
2419 mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco,
2420 rpl_group_info *rgi)
2421 {
2422 group_commit_orderer *tmp;
2423 uint64 count= ++e->count_committing_event_groups;
2424 /* Signal any following GCO whose wait_count has been reached now. */
2425 tmp= gco;
2426 while ((tmp= tmp->next_gco))
2427 {
2428 uint64 wait_count= tmp->wait_count;
2429 if (wait_count > count)
2430 break;
2431 mysql_cond_broadcast(&tmp->COND_group_commit_orderer);
2432 }
2433 }
2434
2435
2436 void
2437 rpl_group_info::mark_start_commit_no_lock()
2438 {
2439 if (did_mark_start_commit)
2440 return;
2441 did_mark_start_commit= true;
2442 mark_start_commit_inner(parallel_entry, gco, this);
2443 }
2444
2445
2446 void
2447 rpl_group_info::mark_start_commit()
2448 {
2449 rpl_parallel_entry *e;
2450
2451 if (did_mark_start_commit)
2452 return;
2453 did_mark_start_commit= true;
2454
2455 e= this->parallel_entry;
2456 mysql_mutex_lock(&e->LOCK_parallel_entry);
2457 mark_start_commit_inner(e, gco, this);
2458 mysql_mutex_unlock(&e->LOCK_parallel_entry);
2459 }
2460
2461
2462 /*
2463 Format the current GTID as a string suitable for printing in error messages.
2464
2465 The string is stored in a buffer inside rpl_group_info, so remains valid
2466 until next call to gtid_info() or until destruction of rpl_group_info.
2467
2468 If no GTID is available, then NULL is returned.
2469 */
2470 char *
2471 rpl_group_info::gtid_info()
2472 {
2473 if (!gtid_sub_id || !current_gtid.seq_no)
2474 return NULL;
2475 my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu",
2476 current_gtid.domain_id, current_gtid.server_id,
2477 current_gtid.seq_no);
2478 return gtid_info_buf;
2479 }
2480
2481
2482 /*
2483 Undo the effect of a prior mark_start_commit().
2484
2485 This is only used for retrying a transaction in parallel replication, after
2486 we have encountered a deadlock or other temporary error.
2487
2488 When we get such a deadlock, it means that the current group of transactions
2489 did not yet all start committing (else they would not have deadlocked). So
2490 we will not yet have woken up anything in the next group, our rgi->gco is
2491 still live, and we can simply decrement the counter (to be incremented again
2492 later, when the retry succeeds and reaches the commit step).
2493 */
2494 void
2495 rpl_group_info::unmark_start_commit()
2496 {
2497 rpl_parallel_entry *e;
2498
2499 if (!did_mark_start_commit)
2500 return;
2501 did_mark_start_commit= false;
2502
2503 e= this->parallel_entry;
2504 mysql_mutex_lock(&e->LOCK_parallel_entry);
2505 --e->count_committing_event_groups;
2506 mysql_mutex_unlock(&e->LOCK_parallel_entry);
2507 }
2508
2509
2510 /*
2511 When record_gtid() has deleted any old rows from the table
2512 mysql.gtid_slave_pos as part of a replicated transaction, save the list of
2513 rows deleted here.
2514
2515 If later the transaction fails (eg. optimistic parallel replication), the
2516 deletes will be undone when the transaction is rolled back. Then we can
2517 put back the list of rows into the rpl_global_gtid_slave_state, so that
2518 we can re-do the deletes and avoid accumulating old rows in the table.
2519 */
2520 void
2521 rpl_group_info::pending_gtid_deletes_save(uint32 domain_id,
2522 rpl_slave_state::list_element *list)
2523 {
2524 /*
2525 We should never get to a state where we try to save a new pending list of
2526 gtid deletes while we still have an old one. But make sure we handle it
2527 anyway just in case, so we avoid leaving stray entries in the
2528 mysql.gtid_slave_pos table.
2529 */
2530 DBUG_ASSERT(!pending_gtid_delete_list);
2531 if (unlikely(pending_gtid_delete_list))
2532 pending_gtid_deletes_put_back();
2533
2534 pending_gtid_delete_list= list;
2535 pending_gtid_delete_list_domain= domain_id;
2536 }
2537
2538
2539 /*
2540 Take the list recorded by pending_gtid_deletes_save() and put it back into
2541 rpl_global_gtid_slave_state. This is needed if deletion of the rows was
2542 rolled back due to transaction failure.
2543 */
2544 void
2545 rpl_group_info::pending_gtid_deletes_put_back()
2546 {
2547 if (pending_gtid_delete_list)
2548 {
2549 rpl_global_gtid_slave_state->put_back_list(pending_gtid_delete_list_domain,
2550 pending_gtid_delete_list);
2551 pending_gtid_delete_list= NULL;
2552 }
2553 }
2554
2555
2556 /*
2557 Free the list recorded by pending_gtid_deletes_save(). Done when the deletes
2558 in the list have been permanently committed.
2559 */
2560 void
2561 rpl_group_info::pending_gtid_deletes_clear()
2562 {
2563 pending_gtid_deletes_free(pending_gtid_delete_list);
2564 pending_gtid_delete_list= NULL;
2565 }
2566
2567
2568 void
2569 rpl_group_info::pending_gtid_deletes_free(rpl_slave_state::list_element *list)
2570 {
2571 rpl_slave_state::list_element *next;
2572
2573 while (list)
2574 {
2575 next= list->next;
2576 my_free(list);
2577 list= next;
2578 }
2579 }
2580
2581
2582 rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
2583 : rpl_filter(filter)
2584 {
2585 cached_charset_invalidate();
2586 }
2587
2588
2589 void rpl_sql_thread_info::cached_charset_invalidate()
2590 {
2591 DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
2592
2593 /* Full of zeroes means uninitialized. */
2594 bzero(cached_charset, sizeof(cached_charset));
2595 DBUG_VOID_RETURN;
2596 }
2597
2598
2599 bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
2600 {
2601 DBUG_ENTER("rpl_group_info::cached_charset_compare");
2602
2603 if (memcmp(cached_charset, charset, sizeof(cached_charset)))
2604 {
2605 memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
2606 DBUG_RETURN(1);
2607 }
2608 DBUG_RETURN(0);
2609 }
2610
2611
2612 /**
2613 Store the file and position where the slave's SQL thread are in the
2614 relay log.
2615
2616 Notes:
2617
2618 - This function should be called either from the slave SQL thread,
2619 or when the slave thread is not running. (It reads the
2620 group_{relay|master}_log_{pos|name} and delay fields in the rli
2621 object. These may only be modified by the slave SQL thread or by
2622 a client thread when the slave SQL thread is not running.)
2623
2624 - If there is an active transaction, then we do not update the
2625 position in the relay log. This is to ensure that we re-execute
2626 statements if we die in the middle of an transaction that was
2627 rolled back.
2628
2629 - As a transaction never spans binary logs, we don't have to handle
2630 the case where we do a relay-log-rotation in the middle of the
2631 transaction. If transactions could span several binlogs, we would
2632 have to ensure that we do not delete the relay log file where the
2633 transaction started before switching to a new relay log file.
2634
2635 - Error can happen if writing to file fails or if flushing the file
2636 fails.
2637
2638 @param rli The object representing the Relay_log_info.
2639
2640 @todo Change the log file information to a binary format to avoid
2641 calling longlong2str.
2642
2643 @return 0 on success, 1 on error.
2644 */
2645 bool Relay_log_info::flush()
2646 {
2647 bool error=0;
2648
2649 DBUG_ENTER("Relay_log_info::flush()");
2650
2651 IO_CACHE *file = &info_file;
2652 // 2*file name, 2*long long, 2*unsigned long, 6*'\n'
2653 char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos;
2654 my_b_seek(file, 0L);
2655 pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10);
2656 *pos++='\n';
2657 pos=strmov(pos, group_relay_log_name);
2658 *pos++='\n';
2659 pos=longlong10_to_str(group_relay_log_pos, pos, 10);
2660 *pos++='\n';
2661 pos=strmov(pos, group_master_log_name);
2662 *pos++='\n';
2663 pos=longlong10_to_str(group_master_log_pos, pos, 10);
2664 *pos++='\n';
2665 pos= longlong10_to_str(sql_delay, pos, 10);
2666 *pos++= '\n';
2667 if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)))
2668 error=1;
2669 if (flush_io_cache(file))
2670 error=1;
2671 if (sync_relayloginfo_period &&
2672 !error &&
2673 ++sync_counter >= sync_relayloginfo_period)
2674 {
2675 if (my_sync(info_fd, MYF(MY_WME)))
2676 error=1;
2677 sync_counter= 0;
2678 }
2679 /*
2680 Flushing the relay log is done by the slave I/O thread
2681 or by the user on STOP SLAVE.
2682 */
2683 DBUG_RETURN(error);
2684 }
2685
2686 #endif
2687