1 /* Copyright (c) 2006, 2019, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "sql_priv.h"
24 #include "unireg.h" // HAVE_*
25 #include "rpl_mi.h"
26 #include "rpl_rli.h"
27 #include "sql_base.h" // close_thread_tables
28 #include <my_dir.h> // For MY_STAT
29 #include "log_event.h" // Format_description_log_event, Log_event,
30 // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
31 // PREFIX_SQL_LOAD
32 #include "rpl_slave.h"
33 #include "rpl_utility.h"
34 #include "transaction.h"
35 #include "sql_parse.h" // end_trans, ROLLBACK
36 #include "rpl_slave.h"
37 #include "rpl_rli_pdb.h"
38 #include "rpl_info_factory.h"
39 #include <mysql/plugin.h>
40 #include <mysql/service_thd_wait.h>
41
42 using std::min;
43 using std::max;
44
/*
  Names of the fields kept in the relay log info repository.
  Whenever a new field is added to the relay log info, append its name
  here too. At the moment the array is only used to obtain the number of
  fields.
*/
const char* info_rli_fields[]=
{
  "number_of_lines", "group_relay_log_name", "group_relay_log_pos",
  "group_master_log_name", "group_master_log_pos", "sql_delay",
  "number_of_workers", "id"
};
61
Relay_log_info(bool is_slave_recovery,PSI_mutex_key * param_key_info_run_lock,PSI_mutex_key * param_key_info_data_lock,PSI_mutex_key * param_key_info_sleep_lock,PSI_mutex_key * param_key_info_data_cond,PSI_mutex_key * param_key_info_start_cond,PSI_mutex_key * param_key_info_stop_cond,PSI_mutex_key * param_key_info_sleep_cond,uint param_id)62 Relay_log_info::Relay_log_info(bool is_slave_recovery
63 #ifdef HAVE_PSI_INTERFACE
64 ,PSI_mutex_key *param_key_info_run_lock,
65 PSI_mutex_key *param_key_info_data_lock,
66 PSI_mutex_key *param_key_info_sleep_lock,
67 PSI_mutex_key *param_key_info_data_cond,
68 PSI_mutex_key *param_key_info_start_cond,
69 PSI_mutex_key *param_key_info_stop_cond,
70 PSI_mutex_key *param_key_info_sleep_cond
71 #endif
72 , uint param_id
73 )
74 :Rpl_info("SQL"
75 #ifdef HAVE_PSI_INTERFACE
76 ,param_key_info_run_lock, param_key_info_data_lock,
77 param_key_info_sleep_lock,
78 param_key_info_data_cond, param_key_info_start_cond,
79 param_key_info_stop_cond, param_key_info_sleep_cond
80 #endif
81 , param_id
82 ),
83 replicate_same_server_id(::replicate_same_server_id),
84 cur_log_fd(-1), relay_log(&sync_relaylog_period),
85 is_relay_log_recovery(is_slave_recovery),
86 save_temporary_tables(0),
87 cur_log_old_open_count(0), error_on_rli_init_info(false),
88 group_relay_log_pos(0), event_relay_log_pos(0),
89 m_group_master_log_pos(0),
90 gtid_set(global_sid_map, global_sid_lock),
91 log_space_total(0), ignore_log_space_limit(0),
92 sql_force_rotate_relay(false),
93 last_master_timestamp(0), slave_skip_counter(0),
94 abort_pos_wait(0), until_condition(UNTIL_NONE),
95 until_log_pos(0),
96 until_sql_gtids(global_sid_map),
97 until_sql_gtids_first_event(true),
98 retried_trans(0),
99 tables_to_lock(0), tables_to_lock_count(0),
100 rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
101 slave_parallel_workers(0),
102 exit_counter(0),
103 max_updated_index(0),
104 recovery_parallel_workers(0), checkpoint_seqno(0),
105 checkpoint_group(opt_mts_checkpoint_group),
106 recovery_groups_inited(false), mts_recovery_group_cnt(0),
107 mts_recovery_index(0), mts_recovery_group_seen_begin(0),
108 mts_group_status(MTS_NOT_IN_GROUP), reported_unsafe_warning(false),
109 rli_description_event(NULL),
110 sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
111 long_find_row_note_printed(false)
112 {
113 DBUG_ENTER("Relay_log_info::Relay_log_info");
114
115 #ifdef HAVE_PSI_INTERFACE
116 relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
117 key_RELAYLOG_LOCK_commit,
118 key_RELAYLOG_LOCK_commit_queue,
119 key_RELAYLOG_LOCK_done,
120 key_RELAYLOG_LOCK_flush_queue,
121 key_RELAYLOG_LOCK_log,
122 key_RELAYLOG_LOCK_sync,
123 key_RELAYLOG_LOCK_sync_queue,
124 key_RELAYLOG_LOCK_xids,
125 key_RELAYLOG_COND_done,
126 key_RELAYLOG_update_cond,
127 key_RELAYLOG_prep_xids_cond,
128 key_file_relaylog,
129 key_file_relaylog_index);
130 #endif
131
132 group_relay_log_name[0]= event_relay_log_name[0]=
133 m_group_master_log_name[0]= 0;
134 until_log_name[0]= ign_master_log_name_end[0]= 0;
135 set_timespec_nsec(last_clock, 0);
136 memset(&cache_buf, 0, sizeof(cache_buf));
137 cached_charset_invalidate();
138
139 mysql_mutex_init(key_relay_log_info_log_space_lock,
140 &log_space_lock, MY_MUTEX_INIT_FAST);
141 mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
142 mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
143 MY_MUTEX_INIT_FAST);
144 mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
145 mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
146 MY_MUTEX_INIT_FAST);
147 my_atomic_rwlock_init(&slave_open_temp_tables_lock);
148
149 relay_log.init_pthread_objects();
150 do_server_version_split(::server_version, slave_version_split);
151 last_retrieved_gtid.clear();
152 force_flush_postponed_due_to_split_trans= false;
153 DBUG_VOID_RETURN;
154 }
155
156 /**
157 The method to invoke at slave threads start
158 */
init_workers(ulong n_workers)159 void Relay_log_info::init_workers(ulong n_workers)
160 {
161 /*
162 Parallel slave parameters initialization is done regardless
163 whether the feature is or going to be active or not.
164 */
165 mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
166 mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
167 mts_last_online_stat= 0;
168 my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
169 }
170
171 /**
172 The method to invoke at slave threads stop
173 */
deinit_workers()174 void Relay_log_info::deinit_workers()
175 {
176 delete_dynamic(&workers);
177 }
178
~Relay_log_info()179 Relay_log_info::~Relay_log_info()
180 {
181 DBUG_ENTER("Relay_log_info::~Relay_log_info");
182
183 if (recovery_groups_inited)
184 bitmap_free(&recovery_groups);
185 mysql_mutex_destroy(&log_space_lock);
186 mysql_cond_destroy(&log_space_cond);
187 mysql_mutex_destroy(&pending_jobs_lock);
188 mysql_cond_destroy(&pending_jobs_cond);
189 mysql_mutex_destroy(&exit_count_lock);
190 my_atomic_rwlock_destroy(&slave_open_temp_tables_lock);
191 relay_log.cleanup();
192 set_rli_description_event(NULL);
193 last_retrieved_gtid.clear();
194
195 DBUG_VOID_RETURN;
196 }
197
198 /**
199 Method is called when MTS coordinator senses the relay-log name
200 has been changed.
201 It marks each Worker member with this fact to make an action
202 at time it will distribute a terminal event of a group to the Worker.
203
204 Worker receives the new name at the group commiting phase
205 @c Slave_worker::slave_worker_ends_group().
206 */
reset_notified_relay_log_change()207 void Relay_log_info::reset_notified_relay_log_change()
208 {
209 if (!is_parallel_exec())
210 return;
211 for (uint i= 0; i < workers.elements; i++)
212 {
213 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
214 w->relay_log_change_notified= FALSE;
215 }
216 }
217
218 /**
219 This method is called in mts_checkpoint_routine() to mark that each
220 worker is required to adapt to a new checkpoint data whose coordinates
221 are passed to it through GAQ index.
222
223 Worker notices the new checkpoint value at the group commit to reset
224 the current bitmap and starts using the clean bitmap indexed from zero
225 of being reset checkpoint_seqno.
226
227 New seconds_behind_master timestamp is installed.
228
229 @param shift number of bits to shift by Worker due to the
230 current checkpoint change.
231 @param new_ts new seconds_behind_master timestamp value
232 unless NULL. NULL could be due to FD event
233 or fake rotate event.
234 @param need_data_lock False if caller has locked @c data_lock
235 */
reset_notified_checkpoint(ulong shift,const time_t * const new_ts,bool need_data_lock)236 void Relay_log_info::reset_notified_checkpoint(ulong shift,
237 const time_t *const new_ts,
238 bool need_data_lock)
239 {
240 /*
241 If this is not a parallel execution we return immediately.
242 */
243 if (!is_parallel_exec())
244 return;
245
246 for (uint i= 0; i < workers.elements; i++)
247 {
248 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
249 /*
250 Reseting the notification information in order to force workers to
251 assign jobs with the new updated information.
252 Notice that the bitmap_shifted is accumulated to indicate how many
253 consecutive jobs were successfully processed.
254
255 The worker when assigning a new job will set the value back to
256 zero.
257 */
258 w->checkpoint_notified= FALSE;
259 w->bitmap_shifted= w->bitmap_shifted + shift;
260 /*
261 Zero shift indicates the caller rotates the master binlog.
262 The new name will be passed to W through the group descriptor
263 during the first post-rotation time scheduling.
264 */
265 if (shift == 0)
266 w->master_log_change_notified= false;
267
268 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
269 "worker->bitmap_shifted --> %lu, worker --> %u.",
270 shift, w->bitmap_shifted, i));
271 }
272 /*
273 There should not be a call where (shift == 0 && checkpoint_seqno != 0).
274
275 Then the new checkpoint sequence is updated by subtracting the number
276 of consecutive jobs that were successfully processed.
277 */
278 DBUG_ASSERT(!(shift == 0 && checkpoint_seqno != 0));
279 checkpoint_seqno= checkpoint_seqno - shift;
280 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
281 "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
282
283 if (new_ts)
284 {
285 if (need_data_lock)
286 mysql_mutex_lock(&data_lock);
287 else
288 mysql_mutex_assert_owner(&data_lock);
289 last_master_timestamp= *new_ts;
290 if (need_data_lock)
291 mysql_mutex_unlock(&data_lock);
292 }
293 }
294
295 /**
296 Reset recovery info from Worker info table and
297 mark MTS recovery is completed.
298
299 @return false on success true when @c reset_notified_checkpoint failed.
300 */
mts_finalize_recovery()301 bool Relay_log_info::mts_finalize_recovery()
302 {
303 bool ret= false;
304 uint i;
305 uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
306
307 DBUG_ENTER("Relay_log_info::mts_finalize_recovery");
308
309 for (i= 0; !ret && i < workers.elements; i++)
310 {
311 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
312 ret= w->reset_recovery_info();
313 DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret= true;);
314 }
315 /*
316 The loop is traversed in the worker index descending order due
317 to specifics of the Worker table repository that does not like
318 even temporary holes. Therefore stale records are deleted
319 from the tail.
320 */
321 DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
322 {DBUG_SET("+d,mts_worker_thread_init_fails");});
323
324 for (i= recovery_parallel_workers; i > workers.elements && !ret; i--)
325 {
326 Slave_worker *w=
327 Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
328 /*
329 If an error occurs during the above create_worker call, the newly created
330 worker object gets deleted within the above function call itself and only
331 NULL is returned. Hence the following check has been added to verify
332 that a valid worker object exists.
333 */
334 if (w)
335 {
336 ret= w->remove_info();
337 delete w;
338 }
339 else
340 {
341 ret= true;
342 goto err;
343 }
344 }
345 recovery_parallel_workers= slave_parallel_workers;
346
347 err:
348 DBUG_RETURN(ret);
349 }
350
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)351 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
352 {
353 MY_STAT s;
354 DBUG_ENTER("add_relay_log");
355 mysql_mutex_assert_owner(&rli->log_space_lock);
356 if (!mysql_file_stat(key_file_relaylog,
357 linfo->log_file_name, &s, MYF(0)))
358 {
359 sql_print_error("log %s listed in the index, but failed to stat.",
360 linfo->log_file_name);
361 DBUG_RETURN(1);
362 }
363 rli->log_space_total += s.st_size;
364 #ifndef DBUG_OFF
365 char buf[22];
366 DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
367 #endif
368 DBUG_RETURN(0);
369 }
370
count_relay_log_space()371 int Relay_log_info::count_relay_log_space()
372 {
373 LOG_INFO flinfo;
374 DBUG_ENTER("Relay_log_info::count_relay_log_space");
375 mysql_mutex_lock(&log_space_lock);
376 log_space_total= 0;
377 if (relay_log.find_log_pos(&flinfo, NullS, 1))
378 {
379 sql_print_error("Could not find first log while counting relay log space.");
380 mysql_mutex_unlock(&log_space_lock);
381 DBUG_RETURN(1);
382 }
383 do
384 {
385 if (add_relay_log(this, &flinfo))
386 {
387 mysql_mutex_unlock(&log_space_lock);
388 DBUG_RETURN(1);
389 }
390 } while (!relay_log.find_next_log(&flinfo, 1));
391 /*
392 As we have counted everything, including what may have written in a
393 preceding write, we must reset bytes_written, or we may count some space
394 twice.
395 */
396 relay_log.reset_bytes_written();
397 mysql_mutex_unlock(&log_space_lock);
398 DBUG_RETURN(0);
399 }
400
401 /**
402 Resets UNTIL condition for Relay_log_info
403 */
404
clear_until_condition()405 void Relay_log_info::clear_until_condition()
406 {
407 DBUG_ENTER("clear_until_condition");
408
409 until_condition= Relay_log_info::UNTIL_NONE;
410 until_log_name[0]= 0;
411 until_log_pos= 0;
412 until_sql_gtids.clear();
413 until_sql_gtids_first_event= true;
414 DBUG_VOID_RETURN;
415 }
416
417 /**
418 Opens and intialize the given relay log. Specifically, it does what follows:
419
420 - Closes old open relay log files.
421 - If we are using the same relay log as the running IO-thread, then sets.
422 rli->cur_log to point to the same IO_CACHE entry.
423 - If not, opens the 'log' binary file.
424
425 @todo check proper initialization of
426 group_master_log_name/group_master_log_pos. /alfranio
427
428 @param rli[in] Relay information (will be initialized)
429 @param log[in] Name of relay log file to read from. NULL = First log
430 @param pos[in] Position in relay log file
431 @param need_data_lock[in] If true, this function will acquire the
432 relay_log.data_lock(); otherwise the caller should already have
433 acquired it.
434 @param errmsg[out] On error, this function will store a pointer to
435 an error message here
436 @param keep_looking_for_fd[in] If true, this function will
437 look for a Format_description_log_event. We only need this when the
438 SQL thread starts and opens an existing relay log and has to execute
439 it (possibly from an offset >4); then we need to read the first
440 event of the relay log to be able to parse the events we have to
441 execute.
442
443 @retval 0 ok,
444 @retval 1 error. In this case, *errmsg is set to point to the error
445 message.
446 */
447
init_relay_log_pos(const char * log,ulonglong pos,bool need_data_lock,const char ** errmsg,bool keep_looking_for_fd)448 int Relay_log_info::init_relay_log_pos(const char* log,
449 ulonglong pos, bool need_data_lock,
450 const char** errmsg,
451 bool keep_looking_for_fd)
452 {
453 DBUG_ENTER("Relay_log_info::init_relay_log_pos");
454 DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
455
456 *errmsg=0;
457 const char* errmsg_fmt= 0;
458 static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
459 mysql_mutex_t *log_lock= relay_log.get_log_lock();
460
461 if (need_data_lock)
462 mysql_mutex_lock(&data_lock);
463 else
464 mysql_mutex_assert_owner(&data_lock);
465
466 /*
467 By default the relay log is in binlog format 3 (4.0).
468 Even if format is 4, this will work enough to read the first event
469 (Format_desc) (remember that format 4 is just lenghtened compared to format
470 3; format 3 is a prefix of format 4).
471 */
472 set_rli_description_event(new Format_description_log_event(3));
473
474 mysql_mutex_lock(log_lock);
475
476 /* Close log file and free buffers if it's already open */
477 if (cur_log_fd >= 0)
478 {
479 end_io_cache(&cache_buf);
480 mysql_file_close(cur_log_fd, MYF(MY_WME));
481 cur_log_fd = -1;
482 }
483
484 group_relay_log_pos= event_relay_log_pos= pos;
485
486 /*
487 Test to see if the previous run was with the skip of purging
488 If yes, we do not purge when we restart
489 */
490 if (relay_log.find_log_pos(&linfo, NullS, 1))
491 {
492 *errmsg="Could not find first log during relay log initialization";
493 goto err;
494 }
495
496 if (log && relay_log.find_log_pos(&linfo, log, 1))
497 {
498 errmsg_fmt= "Could not find target log file mentioned in "
499 "relay log info in the index file '%s' during "
500 "relay log initialization";
501 sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
502 *errmsg= errmsg_buff;
503 goto err;
504 }
505
506 strmake(group_relay_log_name, linfo.log_file_name,
507 sizeof(group_relay_log_name) - 1);
508 strmake(event_relay_log_name, linfo.log_file_name,
509 sizeof(event_relay_log_name) - 1);
510
511 if (relay_log.is_active(linfo.log_file_name))
512 {
513 /*
514 The IO thread is using this log file.
515 In this case, we will use the same IO_CACHE pointer to
516 read data as the IO thread is using to write data.
517 */
518 my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
519 if (check_binlog_magic(cur_log, errmsg))
520 goto err;
521 cur_log_old_open_count=relay_log.get_open_count();
522 }
523 else
524 {
525 /*
526 Open the relay log and set cur_log to point at this one
527 */
528 if ((cur_log_fd=open_binlog_file(&cache_buf,
529 linfo.log_file_name,errmsg)) < 0)
530 goto err;
531 cur_log = &cache_buf;
532 }
533 /*
534 In all cases, check_binlog_magic() has been called so we're at offset 4 for
535 sure.
536 */
537 if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
538 {
539 Log_event* ev;
540 while (keep_looking_for_fd)
541 {
542 /*
543 Read the possible Format_description_log_event; if position
544 was 4, no need, it will be read naturally.
545 */
546 DBUG_PRINT("info",("looking for a Format_description_log_event"));
547
548 if (my_b_tell(cur_log) >= pos)
549 break;
550
551 /*
552 Because of we have data_lock and log_lock, we can safely read an
553 event
554 */
555 if (!(ev= Log_event::read_log_event(cur_log, 0,
556 rli_description_event,
557 opt_slave_sql_verify_checksum)))
558 {
559 DBUG_PRINT("info",("could not read event, cur_log->error=%d",
560 cur_log->error));
561 if (cur_log->error) /* not EOF */
562 {
563 *errmsg= "I/O error reading event at position 4";
564 goto err;
565 }
566 break;
567 }
568 else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
569 {
570 DBUG_PRINT("info",("found Format_description_log_event"));
571 set_rli_description_event((Format_description_log_event *)ev);
572 /*
573 As ev was returned by read_log_event, it has passed is_valid(), so
574 my_malloc() in ctor worked, no need to check again.
575 */
576 /*
577 Ok, we found a Format_description event. But it is not sure that this
578 describes the whole relay log; indeed, one can have this sequence
579 (starting from position 4):
580 Format_desc (of slave)
581 Previous-GTIDs (of slave IO thread, if GTIDs are enabled)
582 Rotate (of master)
583 Format_desc (of master)
584 So the Format_desc which really describes the rest of the relay log
585 can be the 3rd or the 4th event (depending on GTIDs being enabled or
586 not, it can't be further than that, because we rotate
587 the relay log when we queue a Rotate event from the master).
588 But what describes the Rotate is the first Format_desc.
589 So what we do is:
590 go on searching for Format_description events, until you exceed the
591 position (argument 'pos') or until you find an event other than
592 Previous-GTIDs, Rotate or Format_desc.
593 */
594 }
595 else
596 {
597 DBUG_PRINT("info",("found event of another type=%d",
598 ev->get_type_code()));
599 keep_looking_for_fd=
600 (ev->get_type_code() == ROTATE_EVENT ||
601 ev->get_type_code() == PREVIOUS_GTIDS_LOG_EVENT);
602 delete ev;
603 }
604 }
605 my_b_seek(cur_log,(off_t)pos);
606 #ifndef DBUG_OFF
607 {
608 char llbuf1[22], llbuf2[22];
609 DBUG_PRINT("info", ("my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
610 llstr(my_b_tell(cur_log),llbuf1),
611 llstr(get_event_relay_log_pos(),llbuf2)));
612 }
613 #endif
614
615 }
616
617 err:
618 /*
619 If we don't purge, we can't honour relay_log_space_limit ;
620 silently discard it
621 */
622 if (!relay_log_purge)
623 {
624 log_space_limit= 0; // todo: consider to throw a warning at least
625 }
626 mysql_cond_broadcast(&data_cond);
627
628 mysql_mutex_unlock(log_lock);
629
630 if (need_data_lock)
631 mysql_mutex_unlock(&data_lock);
632 if (!rli_description_event->is_valid() && !*errmsg)
633 *errmsg= "Invalid Format_description log event; could be out of memory";
634
635 DBUG_RETURN ((*errmsg) ? 1 : 0);
636 }
637
638 /**
639 Waits until the SQL thread reaches (has executed up to) the
640 log/position or timed out.
641
642 SYNOPSIS
643 @param[in] thd client thread that sent @c SELECT @c MASTER_POS_WAIT,
644 @param[in] log_name log name to wait for,
645 @param[in] log_pos position to wait for,
646 @param[in] timeout @c timeout in seconds before giving up waiting.
647 @c timeout is double whereas it should be ulong; but this is
648 to catch if the user submitted a negative timeout.
649
650 @retval -2 improper arguments (log_pos<0)
651 or slave not running, or master info changed
652 during the function's execution,
653 or client thread killed. -2 is translated to NULL by caller,
654 @retval -1 timed out
655 @retval >=0 number of log events the function had to wait
656 before reaching the desired log/position
657 */
658
wait_for_pos(THD * thd,String * log_name,longlong log_pos,double timeout)659 int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
660 longlong log_pos,
661 double timeout)
662 {
663 int event_count = 0;
664 ulong init_abort_pos_wait;
665 int error=0;
666 struct timespec abstime; // for timeout checking
667 PSI_stage_info old_stage;
668 DBUG_ENTER("Relay_log_info::wait_for_pos");
669
670 if (!inited)
671 DBUG_RETURN(-2);
672
673 DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
674 log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));
675
676 set_timespec_nsec(abstime, (ulonglong) timeout * 1000000000ULL);
677 mysql_mutex_lock(&data_lock);
678 thd->ENTER_COND(&data_cond, &data_lock,
679 &stage_waiting_for_the_slave_thread_to_advance_position,
680 &old_stage);
681 /*
682 This function will abort when it notices that some CHANGE MASTER or
683 RESET MASTER has changed the master info.
684 To catch this, these commands modify abort_pos_wait ; We just monitor
685 abort_pos_wait and see if it has changed.
686 Why do we have this mechanism instead of simply monitoring slave_running
687 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
688 the SQL thread be stopped?
689 This is becasue if someones does:
690 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
691 the change may happen very quickly and we may not notice that
692 slave_running briefly switches between 1/0/1.
693 */
694 init_abort_pos_wait= abort_pos_wait;
695
696 /*
697 We'll need to
698 handle all possible log names comparisons (e.g. 999 vs 1000).
699 We use ulong for string->number conversion ; this is no
700 stronger limitation than in find_uniq_filename in sql/log.cc
701 */
702 ulong log_name_extension;
703 char log_name_tmp[FN_REFLEN]; //make a char[] from String
704
705 strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));
706
707 char *p= fn_ext(log_name_tmp);
708 char *p_end;
709 if (!*p || log_pos<0)
710 {
711 error= -2; //means improper arguments
712 goto err;
713 }
714 // Convert 0-3 to 4
715 log_pos= max<ulong>(log_pos, BIN_LOG_HEADER_SIZE);
716 /* p points to '.' */
717 log_name_extension= strtoul(++p, &p_end, 10);
718 /*
719 p_end points to the first invalid character.
720 If it equals to p, no digits were found, error.
721 If it contains '\0' it means conversion went ok.
722 */
723 if (p_end==p || *p_end)
724 {
725 error= -2;
726 goto err;
727 }
728
729 /* The "compare and wait" main loop */
730 while (!thd->killed &&
731 init_abort_pos_wait == abort_pos_wait &&
732 slave_running)
733 {
734 bool pos_reached;
735 int cmp_result= 0;
736 const char * const group_master_log_name= get_group_master_log_name();
737 const ulonglong group_master_log_pos= get_group_master_log_pos();
738
739 DBUG_PRINT("info",
740 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
741 init_abort_pos_wait, abort_pos_wait));
742 DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
743 group_master_log_name, (ulong) group_master_log_pos));
744
745 /*
746 group_master_log_name can be "", if we are just after a fresh
747 replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
748 (before we have executed one Rotate event from the master) or
749 (rare) if the user is doing a weird slave setup (see next
750 paragraph). If group_master_log_name is "", we assume we don't
751 have enough info to do the comparison yet, so we just wait until
752 more data. In this case master_log_pos is always 0 except if
753 somebody (wrongly) sets this slave to be a slave of itself
754 without using --replicate-same-server-id (an unsupported
755 configuration which does nothing), then group_master_log_pos
756 will grow and group_master_log_name will stay "".
757 */
758 if (*group_master_log_name)
759 {
760 const char * const basename= (group_master_log_name +
761 dirname_length(group_master_log_name));
762 /*
763 First compare the parts before the extension.
764 Find the dot in the master's log basename,
765 and protect against user's input error :
766 if the names do not match up to '.' included, return error
767 */
768 char *q= (char*)(fn_ext(basename)+1);
769 if (strncmp(basename, log_name_tmp, (int)(q-basename)))
770 {
771 error= -2;
772 break;
773 }
774 // Now compare extensions.
775 char *q_end;
776 ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
777 if (group_master_log_name_extension < log_name_extension)
778 cmp_result= -1 ;
779 else
780 cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
781
782 pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
783 cmp_result > 0);
784 if (pos_reached || thd->killed)
785 break;
786 }
787
788 //wait for master update, with optional timeout.
789
790 DBUG_PRINT("info",("Waiting for master update"));
791 /*
792 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
793 will wake us up.
794 */
795 thd_wait_begin(thd, THD_WAIT_BINLOG);
796 if (timeout > 0)
797 {
798 /*
799 Note that mysql_cond_timedwait checks for the timeout
800 before for the condition ; i.e. it returns ETIMEDOUT
801 if the system time equals or exceeds the time specified by abstime
802 before the condition variable is signaled or broadcast, _or_ if
803 the absolute time specified by abstime has already passed at the time
804 of the call.
805 For that reason, mysql_cond_timedwait will do the "timeoutting" job
806 even if its condition is always immediately signaled (case of a loaded
807 master).
808 */
809 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
810 }
811 else
812 mysql_cond_wait(&data_cond, &data_lock);
813 thd_wait_end(thd);
814 DBUG_PRINT("info",("Got signal of master update or timed out"));
815 if (error == ETIMEDOUT || error == ETIME)
816 {
817 #ifndef DBUG_OFF
818 /*
819 Doing this to generate a stack trace and make debugging
820 easier.
821 */
822 if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
823 DBUG_ASSERT(0);
824 #endif
825 error= -1;
826 break;
827 }
828 error=0;
829 event_count++;
830 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
831 }
832
833 err:
834 thd->EXIT_COND(&old_stage);
835 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
836 improper_arguments: %d timed_out: %d",
837 thd->killed_errno(),
838 (int) (init_abort_pos_wait != abort_pos_wait),
839 (int) slave_running,
840 (int) (error == -2),
841 (int) (error == -1)));
842 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
843 !slave_running)
844 {
845 error= -2;
846 }
847 DBUG_RETURN( error ? error : event_count );
848 }
849
850 /*
851 TODO: This is a duplicated code that needs to be simplified.
852 This will be done while developing all possible sync options.
853 See WL#3584's specification.
854
855 /Alfranio
856 */
wait_for_gtid_set(THD * thd,String * gtid,double timeout)857 int Relay_log_info::wait_for_gtid_set(THD* thd, String* gtid,
858 double timeout)
859 {
860 int event_count = 0;
861 ulong init_abort_pos_wait;
862 int error=0;
863 struct timespec abstime; // for timeout checking
864 PSI_stage_info old_stage;
865 DBUG_ENTER("Relay_log_info::wait_for_gtid_set");
866
867 if (!inited)
868 DBUG_RETURN(-2);
869
870 DBUG_PRINT("info", ("Waiting for %s timeout %f", gtid->c_ptr_safe(),
871 timeout));
872
873 set_timespec_nsec(abstime, (ulonglong) timeout * 1000000000ULL);
874 mysql_mutex_lock(&data_lock);
875 thd->ENTER_COND(&data_cond, &data_lock,
876 &stage_waiting_for_the_slave_thread_to_advance_position,
877 &old_stage);
878 /*
879 This function will abort when it notices that some CHANGE MASTER or
880 RESET MASTER has changed the master info.
881 To catch this, these commands modify abort_pos_wait ; We just monitor
882 abort_pos_wait and see if it has changed.
883 Why do we have this mechanism instead of simply monitoring slave_running
884 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
885 the SQL thread be stopped?
886 This is becasue if someones does:
887 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
888 the change may happen very quickly and we may not notice that
889 slave_running briefly switches between 1/0/1.
890 */
891 init_abort_pos_wait= abort_pos_wait;
892 Gtid_set wait_gtid_set(global_sid_map);
893 global_sid_lock->rdlock();
894 if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
895 {
896 global_sid_lock->unlock();
897 goto err;
898 }
899 global_sid_lock->unlock();
900
901 /* The "compare and wait" main loop */
902 while (!thd->killed &&
903 init_abort_pos_wait == abort_pos_wait &&
904 slave_running)
905 {
906 DBUG_PRINT("info",
907 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
908 init_abort_pos_wait, abort_pos_wait));
909
910 //wait for master update, with optional timeout.
911
912 global_sid_lock->wrlock();
913 const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
914 const Owned_gtids* owned_gtids= gtid_state->get_owned_gtids();
915
916 DBUG_PRINT("info", ("Waiting for '%s'. is_subset: %d and "
917 "!is_intersection_nonempty: %d",
918 gtid->c_ptr_safe(), wait_gtid_set.is_subset(logged_gtids),
919 !owned_gtids->is_intersection_nonempty(&wait_gtid_set)));
920 logged_gtids->dbug_print("gtid_executed:");
921 owned_gtids->dbug_print("owned_gtids:");
922
923 /*
924 Since commit is performed after log to binary log, we must also
925 check if any GTID of wait_gtid_set is not yet committed.
926 */
927 if (wait_gtid_set.is_subset(logged_gtids) &&
928 !owned_gtids->is_intersection_nonempty(&wait_gtid_set))
929 {
930 global_sid_lock->unlock();
931 break;
932 }
933 global_sid_lock->unlock();
934
935 DBUG_PRINT("info",("Waiting for master update"));
936
937 /*
938 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
939 will wake us up.
940 */
941 thd_wait_begin(thd, THD_WAIT_BINLOG);
942 if (timeout > 0)
943 {
944 /*
945 Note that mysql_cond_timedwait checks for the timeout
946 before for the condition ; i.e. it returns ETIMEDOUT
947 if the system time equals or exceeds the time specified by abstime
948 before the condition variable is signaled or broadcast, _or_ if
949 the absolute time specified by abstime has already passed at the time
950 of the call.
951 For that reason, mysql_cond_timedwait will do the "timeoutting" job
952 even if its condition is always immediately signaled (case of a loaded
953 master).
954 */
955 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
956 }
957 else
958 mysql_cond_wait(&data_cond, &data_lock);
959 thd_wait_end(thd);
960 DBUG_PRINT("info",("Got signal of master update or timed out"));
961 if (error == ETIMEDOUT || error == ETIME)
962 {
963 #ifndef DBUG_OFF
964 /*
965 Doing this to generate a stack trace and make debugging
966 easier.
967 */
968 if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
969 DBUG_ASSERT(0);
970 #endif
971 error= -1;
972 break;
973 }
974 error=0;
975 event_count++;
976 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
977 }
978
979 err:
980 thd->EXIT_COND(&old_stage);
981 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
982 improper_arguments: %d timed_out: %d",
983 thd->killed_errno(),
984 (int) (init_abort_pos_wait != abort_pos_wait),
985 (int) slave_running,
986 (int) (error == -2),
987 (int) (error == -1)));
988 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
989 !slave_running)
990 {
991 error= -2;
992 }
993 DBUG_RETURN( error ? error : event_count );
994 }
995
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock)996 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
997 bool need_data_lock)
998 {
999 int error= 0;
1000 DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
1001
1002 if (need_data_lock)
1003 {
1004 const ulong timeout= info_thd->variables.lock_wait_timeout;
1005
1006 /*
1007 Acquire protection against global BINLOG lock before rli->data_lock is
1008 locked (otherwise we would also block SHOW SLAVE STATUS).
1009 */
1010 DBUG_ASSERT(!info_thd->backup_binlog_lock.is_acquired());
1011 DBUG_PRINT("debug", ("Acquiring binlog protection lock"));
1012 mysql_mutex_assert_not_owner(&data_lock);
1013 if (info_thd->backup_binlog_lock.acquire_protection(info_thd, MDL_EXPLICIT,
1014 timeout))
1015 DBUG_RETURN(1);
1016
1017 mysql_mutex_lock(&data_lock);
1018 }
1019 else
1020 {
1021 mysql_mutex_assert_owner(&data_lock);
1022 DBUG_ASSERT(info_thd->backup_binlog_lock.is_protection_acquired());
1023 }
1024
1025 inc_event_relay_log_pos();
1026 group_relay_log_pos= event_relay_log_pos;
1027 strmake(group_relay_log_name,event_relay_log_name,
1028 sizeof(group_relay_log_name)-1);
1029
1030 notify_group_relay_log_name_update();
1031
1032 /*
1033 In 4.x we used the event's len to compute the positions here. This is
1034 wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1035 then the event's len is not what is was in the master's binlog, so this
1036 will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1037 replication: Exec_master_log_pos is wrong). Only way to solve this is to
1038 have the original offset of the end of the event the relay log. This is
1039 what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1040 of log_pos in 4.0 was to compute the end_log_pos; so better to store
1041 end_log_pos instead of begin_log_pos.
1042 If we had not done this fix here, the problem would also have appeared
1043 when the slave and master are 5.0 but with different event length (for
1044 example the slave is more recent than the master and features the event
1045 UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1046 SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1047 value which would lead to badly broken replication.
1048 Even the relay_log_pos will be corrupted in this case, because the len is
1049 the relay log is not "val".
1050 With the end_log_pos solution, we avoid computations involving lengthes.
1051 */
1052 DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
1053 (long) log_pos, (long) get_group_master_log_pos()));
1054
1055 if (log_pos > 0) // 3.23 binlogs don't have log_posx
1056 set_group_master_log_pos(log_pos);
1057
1058 /*
1059 In MTS mode FD or Rotate event commit their solitary group to
1060 Coordinator's info table. Callers make sure that Workers have been
1061 executed all assignements.
1062 Broadcast to master_pos_wait() waiters should be done after
1063 the table is updated.
1064 */
1065 DBUG_ASSERT(!is_parallel_exec() ||
1066 mts_group_status != Relay_log_info::MTS_IN_GROUP);
1067 /*
1068 We do not force synchronization at this point, note the
1069 parameter false, because a non-transactional change is
1070 being committed.
1071
1072 For that reason, the synchronization here is subjected to
1073 the option sync_relay_log_info.
1074
1075 See sql/rpl_rli.h for further information on this behavior.
1076 */
1077 error= flush_info(FALSE);
1078
1079 mysql_cond_broadcast(&data_cond);
1080 if (need_data_lock)
1081 {
1082 mysql_mutex_unlock(&data_lock);
1083
1084 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1085 info_thd->backup_binlog_lock.release_protection(info_thd);
1086 }
1087
1088 DBUG_RETURN(error);
1089 }
1090
1091
close_temporary_tables()1092 void Relay_log_info::close_temporary_tables()
1093 {
1094 TABLE *table,*next;
1095 DBUG_ENTER("Relay_log_info::close_temporary_tables");
1096
1097 for (table=save_temporary_tables ; table ; table=next)
1098 {
1099 next=table->next;
1100 /*
1101 Don't ask for disk deletion. For now, anyway they will be deleted when
1102 slave restarts, but it is a better intention to not delete them.
1103 */
1104 DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1105 close_temporary(table, 1, 0);
1106 }
1107 save_temporary_tables= 0;
1108 slave_open_temp_tables= 0;
1109 DBUG_VOID_RETURN;
1110 }
1111
1112 /**
1113 Purges relay logs. It assumes to have a run lock on rli and that no
1114 slave thread are running.
1115
1116 @param[in] THD connection,
1117 @param[in] just_reset if false, it tells that logs should be purged
1118 and @c init_relay_log_pos() should be called,
1119 @errmsg[out] errmsg store pointer to an error message.
1120
1121 @retval 0 successfuly executed,
1122 @retval 1 otherwise error, where errmsg is set to point to the error message.
1123 */
1124
purge_relay_logs(THD * thd,bool just_reset,const char ** errmsg)1125 int Relay_log_info::purge_relay_logs(THD *thd, bool just_reset,
1126 const char** errmsg)
1127 {
1128 int error=0;
1129 const char *ln;
1130 char name_buf[FN_REFLEN];
1131 DBUG_ENTER("Relay_log_info::purge_relay_logs");
1132 bool binlog_prot_acquired= false;
1133
1134 /*
1135 Even if inited==0, we still try to empty master_log_* variables. Indeed,
1136 inited==0 does not imply that they already are empty.
1137
1138 It could be that slave's info initialization partly succeeded: for example
1139 if relay-log.info existed but *relay-bin*.* have been manually removed,
1140 init_info reads the old relay-log.info and fills rli->master_log_*, then
1141 init_info checks for the existence of the relay log, this fails and
1142 init_info leaves inited to 0.
1143 In that pathological case, master_log_pos* will be properly reinited at
1144 the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1145 purge_relay_logs, will delete bogus *.info files or replace them with
1146 correct files), however if the user does SHOW SLAVE STATUS before START
1147 SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1148 master_log_* for SHOW SLAVE STATUS to display fine in any case.
1149 */
1150
1151 if (!thd->backup_binlog_lock.is_acquired())
1152 {
1153 const ulong timeout= thd->variables.lock_wait_timeout;
1154
1155 DBUG_PRINT("debug", ("Acquiring binlog protection lock"));
1156 mysql_mutex_assert_not_owner(&data_lock);
1157 if (thd->backup_binlog_lock.acquire_protection(thd, MDL_EXPLICIT,
1158 timeout))
1159 DBUG_RETURN(1);
1160
1161 binlog_prot_acquired= true;
1162 }
1163
1164 set_group_master_log_name("");
1165 set_group_master_log_pos(0);
1166
1167 if (!inited)
1168 {
1169 DBUG_PRINT("info", ("inited == 0"));
1170 if (error_on_rli_init_info)
1171 {
1172 ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
1173 1, name_buf);
1174 if (relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1175 {
1176 sql_print_error("Unable to purge relay log files. Failed to open relay "
1177 "log index file:%s.", relay_log.get_index_fname());
1178
1179 if (binlog_prot_acquired)
1180 {
1181 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1182 thd->backup_binlog_lock.release_protection(thd);
1183 }
1184 DBUG_RETURN(1);
1185 }
1186 mysql_mutex_lock(&mi->data_lock);
1187 if (relay_log.open_binlog(ln, 0, SEQ_READ_APPEND,
1188 (max_relay_log_size ? max_relay_log_size :
1189 max_binlog_size), true,
1190 true/*need_lock_log=true*/,
1191 true/*need_lock_index=true*/,
1192 true/*need_sid_lock=true*/,
1193 mi->get_mi_description_event()))
1194 {
1195 sql_print_error("Unable to purge relay log files. Failed to open relay "
1196 "log file:%s.", relay_log.get_log_fname());
1197 mysql_mutex_unlock(&mi->data_lock);
1198 if (binlog_prot_acquired)
1199 {
1200 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1201 thd->backup_binlog_lock.release_protection(thd);
1202 }
1203 DBUG_RETURN(1);
1204 }
1205 mysql_mutex_unlock(&mi->data_lock);
1206 }
1207 else
1208 {
1209 if (binlog_prot_acquired)
1210 {
1211 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1212 thd->backup_binlog_lock.release_protection(thd);
1213 }
1214 DBUG_RETURN(0);
1215 }
1216 }
1217 else
1218 {
1219 DBUG_ASSERT(slave_running == 0);
1220 DBUG_ASSERT(mi->slave_running == 0);
1221 }
1222
1223 slave_skip_counter= 0;
1224 mysql_mutex_lock(&data_lock);
1225
1226 /*
1227 we close the relay log fd possibly left open by the slave SQL thread,
1228 to be able to delete it; the relay log fd possibly left open by the slave
1229 I/O thread will be closed naturally in reset_logs() by the
1230 close(LOG_CLOSE_TO_BE_OPENED) call
1231 */
1232 if (cur_log_fd >= 0)
1233 {
1234 end_io_cache(&cache_buf);
1235 my_close(cur_log_fd, MYF(MY_WME));
1236 cur_log_fd= -1;
1237 }
1238
1239 if (relay_log.reset_logs(thd))
1240 {
1241 *errmsg = "Failed during log reset";
1242 error=1;
1243 goto err;
1244 }
1245 /* Save name of used relay log file */
1246 strmake(group_relay_log_name, relay_log.get_log_fname(),
1247 sizeof(group_relay_log_name)-1);
1248 strmake(event_relay_log_name, relay_log.get_log_fname(),
1249 sizeof(event_relay_log_name)-1);
1250 group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1251 if (count_relay_log_space())
1252 {
1253 *errmsg= "Error counting relay log space";
1254 error= 1;
1255 goto err;
1256 }
1257 if (!just_reset)
1258 error= init_relay_log_pos(group_relay_log_name,
1259 group_relay_log_pos,
1260 false/*need_data_lock=false*/, errmsg, 0);
1261
1262 if (!inited && error_on_rli_init_info)
1263 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1264 true/*need_lock_log=true*/,
1265 true/*need_lock_index=true*/);
1266
1267 err:
1268 #ifndef DBUG_OFF
1269 char buf[22];
1270 #endif
1271 DBUG_PRINT("info",("log_space_total: %s",llstr(log_space_total,buf)));
1272 mysql_mutex_unlock(&data_lock);
1273
1274 if (binlog_prot_acquired)
1275 {
1276 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1277 thd->backup_binlog_lock.release_protection(thd);
1278 }
1279
1280 DBUG_RETURN(error);
1281 }
1282
1283
/**
  Checks if condition stated in UNTIL clause of START SLAVE is reached.

  For UNTIL_MASTER_POS and UNTIL_RELAY_POS this uses the cached result
  (until_log_names_cmp_result) of the last comparison of the current log
  file name with the target log file name. The cached value must
  therefore be invalidated whenever the current log file name changes (see
  the @c Relay_log_info::notify_... functions).

  This caching avoids expensive string comparisons and @c strtoul()
  conversions needed for log name comparison. They are needed not on every
  call but only when the current log name changes. With an
  @c UNTIL_MASTER_POS condition that happens only after
  @c Rotate_log_event::do_apply_event() (which is rare, so caching gives a
  real benefit). With an @c UNTIL_RELAY_POS condition the cached value is
  invalidated after @c inc_group_relay_log_pos(), which is called for each
  group of events, so there is some benefit for multi-event groups such as
  transactions or queries that use autoincrement.

  The GTID-based conditions (UNTIL_SQL_BEFORE_GTIDS, UNTIL_SQL_AFTER_GTIDS)
  are evaluated under global_sid_lock. UNTIL_SQL_AFTER_MTS_GAPS is met
  when mts_recovery_group_cnt is 0; until_condition is then set to
  UNTIL_DONE.

  Should be called ONLY if @c until_condition @c != @c UNTIL_NONE !

  @param thd  not referenced by this function.
  @param ev   event about to be executed, or NULL.
              - For UNTIL_MASTER_POS, events carrying this server's
                server_id are not checked (false is returned) unless
                replicate_same_server_id is set. When not inside a group
                and @c ev->log_pos is non-zero, the event's start position
                (@c ev->log_pos - @c ev->data_written) is compared rather
                than the group master log position.
              - For UNTIL_SQL_BEFORE_GTIDS, a Gtid_log_event is matched
                against until_sql_gtids.

  @retval true  condition met, or error (the current log file base name
                does not match the UNTIL log file base name),
  @retval false condition not met.
*/

bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
{
  char error_msg[]= "Slave SQL thread is stopped because UNTIL "
                    "condition is bad.";
  DBUG_ENTER("Relay_log_info::is_until_satisfied");

  switch (until_condition)
  {
  case UNTIL_MASTER_POS:
  case UNTIL_RELAY_POS:
  {
    const char *log_name= NULL;
    ulonglong log_pos= 0;

    if (until_condition == UNTIL_MASTER_POS)
    {
      /* Our own events are skipped unless replicate_same_server_id is set. */
      if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
        DBUG_RETURN(false);
      log_name= get_group_master_log_name();
      /*
        Use the group position inside a group or when no usable event
        position exists; otherwise use where this event starts in the
        master's binlog.
      */
      log_pos= (!ev || is_in_group() || !ev->log_pos) ?
        get_group_master_log_pos() : ev->log_pos - ev->data_written;
    }
    else
    { /* until_condition == UNTIL_RELAY_POS */
      log_name= group_relay_log_name;
      log_pos= group_relay_log_pos;
    }

#ifndef DBUG_OFF
    {
      char buf[32];
      DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
                          get_group_master_log_name(),
                          llstr(get_group_master_log_pos(), buf)));
      DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
                          group_relay_log_name, llstr(group_relay_log_pos, buf)));
      DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
                          until_condition == UNTIL_MASTER_POS ? "master" : "relay",
                          log_name, llstr(log_pos, buf)));
      DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
                          until_condition == UNTIL_MASTER_POS ? "master" : "relay",
                          until_log_name, llstr(until_log_pos, buf)));
    }
#endif

    if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
    {
      /*
        We have no cached comparison results so we should compare log names
        and cache result.
        If we are after RESET SLAVE, and the SQL slave thread has not processed
        any event yet, it could be that group_master_log_name is "". In that case,
        just wait for more events (as there is no sensible comparison to do).
      */

      if (*log_name)
      {
        const char *basename= log_name + dirname_length(log_name);

        /* q points just past the extension separator of basename. */
        const char *q= (const char*)(fn_ext(basename)+1);
        /* Compare the base names, including the '.' separator. */
        if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
        {
          /* Now compare extensions. */
          char *q_end;
          ulong log_name_extension= strtoul(q, &q_end, 10);
          if (log_name_extension < until_log_name_extension)
            until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
          else
            until_log_names_cmp_result=
              (log_name_extension > until_log_name_extension) ?
              UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
        }
        else
        {
          /* Base names do not match, so we abort */
          sql_print_error("%s", error_msg);
          DBUG_RETURN(true);
        }
      }
      else
        DBUG_RETURN(until_log_pos == 0);
    }

    /* Reached: same file at/after the target position, or a later file. */
    if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
          log_pos >= until_log_pos) ||
         until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
    {
      char buf[22];
      sql_print_information("Slave SQL thread stopped because it reached its"
                            " UNTIL position %s", llstr(until_pos(), buf));
      DBUG_RETURN(true);
    }
    DBUG_RETURN(false);
  }

  case UNTIL_SQL_BEFORE_GTIDS:
    // We only need to check once if logged_gtids set contains any of the until_sql_gtids.
    if (until_sql_gtids_first_event)
    {
      until_sql_gtids_first_event= false;
      global_sid_lock->wrlock();
      /* Check if until GTIDs were already applied. */
      const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
      if (until_sql_gtids.is_intersection_nonempty(logged_gtids))
      {
        char *buffer= until_sql_gtids.to_string();
        global_sid_lock->unlock();
        sql_print_information("Slave SQL thread stopped because "
                              "UNTIL SQL_BEFORE_GTIDS %s is already "
                              "applied", buffer);
        my_free(buffer);
        DBUG_RETURN(true);
      }
      global_sid_lock->unlock();
    }
    /* Stop before a transaction whose GTID belongs to the UNTIL set. */
    if (ev != NULL && ev->get_type_code() == GTID_LOG_EVENT)
    {
      Gtid_log_event *gev= (Gtid_log_event *)ev;
      global_sid_lock->rdlock();
      if (until_sql_gtids.contains_gtid(gev->get_sidno(false), gev->get_gno()))
      {
        char *buffer= until_sql_gtids.to_string();
        global_sid_lock->unlock();
        sql_print_information("Slave SQL thread stopped because it reached "
                              "UNTIL SQL_BEFORE_GTIDS %s", buffer);
        my_free(buffer);
        DBUG_RETURN(true);
      }
      global_sid_lock->unlock();
    }
    DBUG_RETURN(false);
    break;

  case UNTIL_SQL_AFTER_GTIDS:
  {
    /* Satisfied once every GTID of the UNTIL set has been logged. */
    global_sid_lock->wrlock();
    const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
    if (until_sql_gtids.is_subset(logged_gtids))
    {
      char *buffer= until_sql_gtids.to_string();
      global_sid_lock->unlock();
      sql_print_information("Slave SQL thread stopped because it reached "
                            "UNTIL SQL_AFTER_GTIDS %s", buffer);
      my_free(buffer);
      DBUG_RETURN(true);
    }
    global_sid_lock->unlock();
    DBUG_RETURN(false);
  }
  break;

  case UNTIL_SQL_AFTER_MTS_GAPS:
  case UNTIL_DONE:
    /*
      TODO: this condition is actually post-execution or post-scheduling
            so the proper place to check it before SQL thread goes
            into next_event() where it can wait while the condition
            has been satisfied already.
            It's deployed here temporarily to be fixed along the regular UNTIL
            support for MTS is provided.
    */
    if (mts_recovery_group_cnt == 0)
    {
      sql_print_information("Slave SQL thread stopped according to "
                            "UNTIL SQL_AFTER_MTS_GAPS as it has "
                            "processed all gap transactions left from "
                            "the previous slave session.");
      until_condition= UNTIL_DONE;
      DBUG_RETURN(true);
    }
    else
    {
      DBUG_RETURN(false);
    }
    break;

  case UNTIL_NONE:
    DBUG_ASSERT(0);
    break;
  }

  DBUG_ASSERT(0);
  DBUG_RETURN(false);
}
1498
cached_charset_invalidate()1499 void Relay_log_info::cached_charset_invalidate()
1500 {
1501 DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1502
1503 /* Full of zeroes means uninitialized. */
1504 memset(cached_charset, 0, sizeof(cached_charset));
1505 DBUG_VOID_RETURN;
1506 }
1507
1508
cached_charset_compare(char * charset) const1509 bool Relay_log_info::cached_charset_compare(char *charset) const
1510 {
1511 DBUG_ENTER("Relay_log_info::cached_charset_compare");
1512
1513 if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1514 {
1515 memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1516 DBUG_RETURN(1);
1517 }
1518 DBUG_RETURN(0);
1519 }
1520
1521
stmt_done(my_off_t event_master_log_pos)1522 int Relay_log_info::stmt_done(my_off_t event_master_log_pos)
1523 {
1524 int error= 0;
1525
1526 clear_flag(IN_STMT);
1527
1528 DBUG_ASSERT(!belongs_to_client());
1529 /* Worker does not execute binlog update position logics */
1530 DBUG_ASSERT(!is_mts_worker(info_thd));
1531
1532 /*
1533 Replication keeps event and group positions to specify the
1534 set of events that were executed.
1535 Event positions are incremented after processing each event
1536 whereas group positions are incremented when an event or a
1537 set of events is processed such as in a transaction and are
1538 committed or rolled back.
1539
1540 A transaction can be ended with a Query Event, i.e. either
1541 commit or rollback, or by a Xid Log Event. Query Event is
1542 used to terminate pseudo-transactions that are executed
1543 against non-transactional engines such as MyIsam. Xid Log
1544 Event denotes though that a set of changes executed
1545 against a transactional engine is about to commit.
1546
1547 Events' positions are incremented at stmt_done(). However,
1548 transactions that are ended with Xid Log Event have their
1549 group position incremented in the do_apply_event() and in
1550 the do_apply_event_work().
1551
1552 Notice that the type of the engine, i.e. where data and
1553 positions are stored, against what events are being applied
1554 are not considered in this logic.
1555
1556 Regarding the code that follows, notice that the executed
1557 group coordinates don't change if the current event is internal
1558 to the group. The same applies to MTS Coordinator when it
1559 handles a Format Descriptor event that appears in the middle
1560 of a group that is about to be assigned.
1561 */
1562 if ((!is_parallel_exec() && is_in_group()) ||
1563 mts_group_status != MTS_NOT_IN_GROUP)
1564 {
1565 inc_event_relay_log_pos();
1566 }
1567 else
1568 {
1569 if (is_parallel_exec())
1570 {
1571
1572 DBUG_ASSERT(!is_mts_worker(info_thd));
1573
1574 /*
1575 Format Description events only can drive MTS execution to this
1576 point. It is a special event group that is handled with
1577 synchronization. For that reason, the checkpoint routine is
1578 called here.
1579 */
1580 error= mts_checkpoint_routine(this, 0, false,
1581 true/*need_data_lock=true*/);
1582 }
1583 if (!error)
1584 error= inc_group_relay_log_pos(event_master_log_pos,
1585 true/*need_data_lock=true*/);
1586 }
1587
1588 return error;
1589 }
1590
1591 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
cleanup_context(THD * thd,bool error)1592 void Relay_log_info::cleanup_context(THD *thd, bool error)
1593 {
1594 DBUG_ENTER("Relay_log_info::cleanup_context");
1595
1596 DBUG_ASSERT(info_thd == thd);
1597 /*
1598 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1599 may have opened tables, which we cannot be sure have been closed (because
1600 maybe the Rows_log_event have not been found or will not be, because slave
1601 SQL thread is stopping, or relay log has a missing tail etc). So we close
1602 all thread's tables. And so the table mappings have to be cancelled.
1603 2) Rows_log_event::do_apply_event() may even have started statements or
1604 transactions on them, which we need to rollback in case of error.
1605 3) If finding a Format_description_log_event after a BEGIN, we also need
1606 to rollback before continuing with the next events.
1607 4) so we need this "context cleanup" function.
1608 */
1609 if (error)
1610 {
1611 trans_rollback_stmt(thd); // if a "statement transaction"
1612 trans_rollback(thd); // if a "real transaction"
1613 }
1614 if (rows_query_ev)
1615 {
1616 info_thd->reset_query_for_display();
1617 delete rows_query_ev;
1618 rows_query_ev= NULL;
1619 info_thd->set_query(NULL, 0);
1620 }
1621 m_table_map.clear_tables();
1622 slave_close_thread_tables(thd);
1623 if (error)
1624 thd->mdl_context.release_transactional_locks();
1625 clear_flag(IN_STMT);
1626 /*
1627 Cleanup for the flags that have been set at do_apply_event.
1628 */
1629 thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1630 thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1631
1632 /*
1633 Reset state related to long_find_row notes in the error log:
1634 - timestamp
1635 - flag that decides whether the slave prints or not
1636 */
1637 reset_row_stmt_start_timestamp();
1638 unset_long_find_row_note_printed();
1639
1640 DBUG_VOID_RETURN;
1641 }
1642
clear_tables_to_lock()1643 void Relay_log_info::clear_tables_to_lock()
1644 {
1645 DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
1646 #ifndef DBUG_OFF
1647 /**
1648 When replicating in RBR and MyISAM Merge tables are involved
1649 open_and_lock_tables (called in do_apply_event) appends the
1650 base tables to the list of tables_to_lock. Then these are
1651 removed from the list in close_thread_tables (which is called
1652 before we reach this point).
1653
1654 This assertion just confirms that we get no surprises at this
1655 point.
1656 */
1657 uint i=0;
1658 for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
1659 DBUG_ASSERT(i == tables_to_lock_count);
1660 #endif
1661
1662 while (tables_to_lock)
1663 {
1664 uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
1665 if (tables_to_lock->m_tabledef_valid)
1666 {
1667 tables_to_lock->m_tabledef.table_def::~table_def();
1668 tables_to_lock->m_tabledef_valid= FALSE;
1669 }
1670
1671 /*
1672 If blob fields were used during conversion of field values
1673 from the master table into the slave table, then we need to
1674 free the memory used temporarily to store their values before
1675 copying into the slave's table.
1676 */
1677 if (tables_to_lock->m_conv_table)
1678 free_blobs(tables_to_lock->m_conv_table);
1679
1680 tables_to_lock=
1681 static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1682 tables_to_lock_count--;
1683 my_free(to_free);
1684 }
1685 DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
1686 DBUG_VOID_RETURN;
1687 }
1688
slave_close_thread_tables(THD * thd)1689 void Relay_log_info::slave_close_thread_tables(THD *thd)
1690 {
1691 thd->get_stmt_da()->set_overwrite_status(true);
1692 DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
1693 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1694 thd->get_stmt_da()->set_overwrite_status(false);
1695
1696 close_thread_tables(thd);
1697 /*
1698 - If transaction rollback was requested due to deadlock
1699 perform it and release metadata locks.
1700 - If inside a multi-statement transaction,
1701 defer the release of metadata locks until the current
1702 transaction is either committed or rolled back. This prevents
1703 other statements from modifying the table for the entire
1704 duration of this transaction. This provides commit ordering
1705 and guarantees serializability across multiple transactions.
1706 - If in autocommit mode, or outside a transactional context,
1707 automatically release metadata locks of the current statement.
1708 */
1709 if (thd->transaction_rollback_request)
1710 {
1711 trans_rollback_implicit(thd);
1712 thd->mdl_context.release_transactional_locks();
1713 }
1714 else if (! thd->in_multi_stmt_transaction_mode())
1715 thd->mdl_context.release_transactional_locks();
1716 else
1717 thd->mdl_context.release_statement_locks();
1718
1719 clear_tables_to_lock();
1720 DBUG_VOID_RETURN;
1721 }
1722 /**
1723 Execute a SHOW RELAYLOG EVENTS statement.
1724
1725 @param thd Pointer to THD object for the client thread executing the
1726 statement.
1727
1728 @retval FALSE success
1729 @retval TRUE failure
1730 */
mysql_show_relaylog_events(THD * thd)1731 bool mysql_show_relaylog_events(THD* thd)
1732 {
1733 Protocol *protocol= thd->protocol;
1734 List<Item> field_list;
1735 DBUG_ENTER("mysql_show_relaylog_events");
1736
1737 DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1738
1739 Log_event::init_show_field_list(&field_list);
1740 if (protocol->send_result_set_metadata(&field_list,
1741 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1742 DBUG_RETURN(TRUE);
1743
1744 if (active_mi == NULL)
1745 {
1746 my_error(ER_SLAVE_CONFIGURATION, MYF(0));
1747 DBUG_RETURN(true);
1748 }
1749
1750 DBUG_RETURN(show_binlog_events(thd, &active_mi->rli->relay_log));
1751 }
1752
1753 #endif
1754
rli_init_info()1755 int Relay_log_info::rli_init_info()
1756 {
1757 int error= 0;
1758 enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
1759 const char *msg= NULL;
1760
1761 DBUG_ENTER("Relay_log_info::rli_init_info");
1762
1763 mysql_mutex_assert_owner(&data_lock);
1764
1765 /*
1766 If Relay_log_info is issued again after a failed init_info(), for
1767 instance because of missing relay log files, it will generate new
1768 files and ignore the previous failure, to avoid that we set
1769 error_on_rli_init_info as true.
1770 This a consequence of the behaviour change, in the past server was
1771 stopped when there were replication initialization errors, now it is
1772 not and so init_info() must be aware of previous failures.
1773 */
1774 if (error_on_rli_init_info)
1775 goto err;
1776
1777 if (inited)
1778 {
1779 /*
1780 We have to reset read position of relay-log-bin as we may have
1781 already been reading from 'hotlog' when the slave was stopped
1782 last time. If this case pos_in_file would be set and we would
1783 get a crash when trying to read the signature for the binary
1784 relay log.
1785
1786 We only rewind the read position if we are starting the SQL
1787 thread. The handle_slave_sql thread assumes that the read
1788 position is at the beginning of the file, and will read the
1789 "signature" and then fast-forward to the last position read.
1790 */
1791 bool hot_log= FALSE;
1792 /*
1793 my_b_seek does an implicit flush_io_cache, so we need to:
1794
1795 1. check if this log is active (hot)
1796 2. if it is we keep log_lock until the seek ends, otherwise
1797 release it right away.
1798
1799 If we did not take log_lock, SQL thread might race with IO
1800 thread for the IO_CACHE mutex.
1801
1802 */
1803 mysql_mutex_t *log_lock= relay_log.get_log_lock();
1804 mysql_mutex_lock(log_lock);
1805 hot_log= relay_log.is_active(linfo.log_file_name);
1806
1807 if (!hot_log)
1808 mysql_mutex_unlock(log_lock);
1809
1810 my_b_seek(cur_log, (my_off_t) 0);
1811
1812 if (hot_log)
1813 mysql_mutex_unlock(log_lock);
1814 DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(this) : 0);
1815 }
1816
1817 cur_log_fd = -1;
1818 slave_skip_counter= 0;
1819 abort_pos_wait= 0;
1820 log_space_limit= relay_log_space_limit;
1821 log_space_total= 0;
1822 tables_to_lock= 0;
1823 tables_to_lock_count= 0;
1824
1825 char pattern[FN_REFLEN];
1826 (void) my_realpath(pattern, slave_load_tmpdir, 0);
1827 if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
1828 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
1829 {
1830 sql_print_error("Unable to use slave's temporary directory '%s'.",
1831 slave_load_tmpdir);
1832 DBUG_RETURN(1);
1833 }
1834 unpack_filename(slave_patternload_file, pattern);
1835 slave_patternload_file_size= strlen(slave_patternload_file);
1836
1837 /*
1838 The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
1839 Note that the I/O thread flushes it to disk after writing every
1840 event, in flush_info within the master info.
1841 */
1842 /*
1843 For the maximum log size, we choose max_relay_log_size if it is
1844 non-zero, max_binlog_size otherwise. If later the user does SET
1845 GLOBAL on one of these variables, fix_max_binlog_size and
1846 fix_max_relay_log_size will reconsider the choice (for example
1847 if the user changes max_relay_log_size to zero, we have to
1848 switch to using max_binlog_size for the relay log) and update
1849 relay_log.max_size (and mysql_bin_log.max_size).
1850 */
1851 {
1852 /* Reports an error and returns, if the --relay-log's path
1853 is a directory.*/
1854 if (opt_relay_logname &&
1855 opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
1856 {
1857 sql_print_error("Path '%s' is a directory name, please specify \
1858 a file name for --relay-log option.", opt_relay_logname);
1859 DBUG_RETURN(1);
1860 }
1861
1862 /* Reports an error and returns, if the --relay-log-index's path
1863 is a directory.*/
1864 if (opt_relaylog_index_name &&
1865 opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
1866 == FN_LIBCHAR)
1867 {
1868 sql_print_error("Path '%s' is a directory name, please specify \
1869 a file name for --relay-log-index option.", opt_relaylog_index_name);
1870 DBUG_RETURN(1);
1871 }
1872
1873 char buf[FN_REFLEN];
1874 const char *ln;
1875 static bool name_warning_sent= 0;
1876 ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
1877 1, buf);
1878 /* We send the warning only at startup, not after every RESET SLAVE */
1879 if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
1880 {
1881 /*
1882 User didn't give us info to name the relay log index file.
1883 Picking `hostname`-relay-bin.index like we do, causes replication to
1884 fail if this slave's hostname is changed later. So, we would like to
1885 instead require a name. But as we don't want to break many existing
1886 setups, we only give warning, not error.
1887 */
1888 sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
1889 " so replication "
1890 "may break when this MySQL server acts as a "
1891 "slave and has his hostname changed!! Please "
1892 "use '--relay-log=%s' to avoid this problem.", ln);
1893 name_warning_sent= 1;
1894 }
1895
1896 relay_log.is_relay_log= TRUE;
1897
1898 if (relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1899 {
1900 sql_print_error("Failed in open_index_file() called from Relay_log_info::rli_init_info().");
1901 DBUG_RETURN(1);
1902 }
1903 #ifndef DBUG_OFF
1904 global_sid_lock->wrlock();
1905 gtid_set.dbug_print("set of GTIDs in relay log before initialization");
1906 global_sid_lock->unlock();
1907 #endif
1908 /*
1909 Below init_gtid_sets() function will parse the available relay logs and
1910 set I/O retrieved gtid event in gtid_state object. We dont need to find
1911 last_retrieved_gtid_event if relay_log_recovery=1 (retrieved set will
1912 be cleared off in that case).
1913 */
1914 if (!current_thd &&
1915 relay_log.init_gtid_sets(>id_set, NULL,
1916 is_relay_log_recovery ? NULL : get_last_retrieved_gtid(),
1917 opt_slave_sql_verify_checksum,
1918 true/*true=need lock*/))
1919 {
1920 sql_print_error("Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
1921 DBUG_RETURN(1);
1922 }
1923 #ifndef DBUG_OFF
1924 global_sid_lock->wrlock();
1925 gtid_set.dbug_print("set of GTIDs in relay log after initialization");
1926 global_sid_lock->unlock();
1927 #endif
1928 /*
1929 Configures what object is used by the current log to store processed
1930 gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
1931 corretly compute the set of previous gtids.
1932 */
1933 relay_log.set_previous_gtid_set(>id_set);
1934 /*
1935 note, that if open() fails, we'll still have index file open
1936 but a destructor will take care of that
1937 */
1938 if (relay_log.open_binlog(ln, 0, SEQ_READ_APPEND,
1939 (max_relay_log_size ? max_relay_log_size :
1940 max_binlog_size), true,
1941 true/*need_lock_log=true*/,
1942 true/*need_lock_index=true*/,
1943 true/*need_sid_lock=true*/,
1944 mi->get_mi_description_event()))
1945 {
1946 sql_print_error("Failed in open_log() called from Relay_log_info::rli_init_info().");
1947 DBUG_RETURN(1);
1948 }
1949 }
1950
1951 /*
1952 This checks if the repository was created before and thus there
1953 will be values to be read. Please, do not move this call after
1954 the handler->init_info().
1955 */
1956 if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
1957 {
1958 msg= "Error checking relay log repository";
1959 error= 1;
1960 goto err;
1961 }
1962
1963 if (handler->init_info())
1964 {
1965 msg= "Error reading relay log configuration";
1966 error= 1;
1967 goto err;
1968 }
1969
1970 if (check_return == REPOSITORY_DOES_NOT_EXIST)
1971 {
1972 /* Init relay log with first entry in the relay index file */
1973 if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
1974 false/*need_data_lock=false (lock should be held
1975 prior to invoking this function)*/,
1976 &msg, 0))
1977 {
1978 error= 1;
1979 goto err;
1980 }
1981 set_group_master_log_name("");
1982 set_group_master_log_pos(0);
1983 }
1984 else
1985 {
1986 if (read_info(handler))
1987 {
1988 msg= "Error reading relay log configuration";
1989 error= 1;
1990 goto err;
1991 }
1992
1993 if (is_relay_log_recovery && init_recovery(mi, &msg))
1994 {
1995 error= 1;
1996 goto err;
1997 }
1998
1999 if (init_relay_log_pos(group_relay_log_name,
2000 group_relay_log_pos,
2001 false/*need_data_lock=false (lock should be held
2002 prior to invoking this function)*/,
2003 &msg, 0))
2004 {
2005 char llbuf[22];
2006 sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s).",
2007 group_relay_log_name,
2008 llstr(group_relay_log_pos, llbuf));
2009 error= 1;
2010 goto err;
2011 }
2012
2013 #ifndef DBUG_OFF
2014 {
2015 char llbuf1[22], llbuf2[22];
2016 DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
2017 llstr(my_b_tell(cur_log),llbuf1),
2018 llstr(event_relay_log_pos,llbuf2)));
2019 DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2020 DBUG_ASSERT((my_b_tell(cur_log) == event_relay_log_pos));
2021 }
2022 #endif
2023 }
2024
2025 inited= 1;
2026 error_on_rli_init_info= false;
2027 if (flush_info(TRUE))
2028 {
2029 msg= "Error reading relay log configuration";
2030 error= 1;
2031 goto err;
2032 }
2033
2034 if (count_relay_log_space())
2035 {
2036 msg= "Error counting relay log space";
2037 error= 1;
2038 goto err;
2039 }
2040
2041 /*
2042 In case of MTS the recovery is deferred until the end of global_init_info.
2043 */
2044 if (!mi->rli->mts_recovery_group_cnt)
2045 is_relay_log_recovery= FALSE;
2046 DBUG_RETURN(error);
2047
2048 err:
2049 handler->end_info();
2050 inited= 0;
2051 error_on_rli_init_info= true;
2052 if (msg)
2053 sql_print_error("%s.", msg);
2054 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2055 true/*need_lock_log=true*/,
2056 true/*need_lock_index=true*/);
2057 DBUG_RETURN(error);
2058 }
2059
end_info()2060 void Relay_log_info::end_info()
2061 {
2062 DBUG_ENTER("Relay_log_info::end_info");
2063
2064 error_on_rli_init_info= false;
2065 if (!inited)
2066 DBUG_VOID_RETURN;
2067
2068 handler->end_info();
2069
2070 if (cur_log_fd >= 0)
2071 {
2072 end_io_cache(&cache_buf);
2073 (void)my_close(cur_log_fd, MYF(MY_WME));
2074 cur_log_fd= -1;
2075 }
2076 inited = 0;
2077 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2078 true/*need_lock_log=true*/,
2079 true/*need_lock_index=true*/);
2080 relay_log.harvest_bytes_written(this, true/*need_log_space_lock=true*/);
2081 /*
2082 Delete the slave's temporary tables from memory.
2083 In the future there will be other actions than this, to ensure persistance
2084 of slave's temp tables after shutdown.
2085 */
2086 close_temporary_tables();
2087
2088 DBUG_VOID_RETURN;
2089 }
2090
flush_current_log()2091 int Relay_log_info::flush_current_log()
2092 {
2093 DBUG_ENTER("Relay_log_info::flush_current_log");
2094 /*
2095 When we come to this place in code, relay log may or not be initialized;
2096 the caller is responsible for setting 'flush_relay_log_cache' accordingly.
2097 */
2098 IO_CACHE *log_file= relay_log.get_log_file();
2099 if (flush_io_cache(log_file))
2100 DBUG_RETURN(2);
2101
2102 DBUG_RETURN(0);
2103 }
2104
set_master_info(Master_info * info)2105 void Relay_log_info::set_master_info(Master_info* info)
2106 {
2107 mi= info;
2108 }
2109
2110 /**
2111 Stores the file and position where the execute-slave thread are in the
2112 relay log:
2113
2114 - As this is only called by the slave thread or on STOP SLAVE, with the
2115 log_lock grabbed and the slave thread stopped, we don't need to have
2116 a lock here.
2117 - If there is an active transaction, then we don't update the position
2118 in the relay log. This is to ensure that we re-execute statements
2119 if we die in the middle of an transaction that was rolled back.
2120 - As a transaction never spans binary logs, we don't have to handle the
2121 case where we do a relay-log-rotation in the middle of the transaction.
2122 If this would not be the case, we would have to ensure that we
2123 don't delete the relay log file where the transaction started when
2124 we switch to a new relay log file.
2125
2126 @retval 0 ok,
2127 @retval 1 write error, otherwise.
2128 */
2129
2130 /**
2131 Store the file and position where the slave's SQL thread are in the
2132 relay log.
2133
2134 Notes:
2135
2136 - This function should be called either from the slave SQL thread,
2137 or when the slave thread is not running. (It reads the
2138 group_{relay|master}_log_{pos|name} and delay fields in the rli
2139 object. These may only be modified by the slave SQL thread or by
2140 a client thread when the slave SQL thread is not running.)
2141
2142 - If there is an active transaction, then we do not update the
2143 position in the relay log. This is to ensure that we re-execute
2144 statements if we die in the middle of an transaction that was
2145 rolled back.
2146
2147 - As a transaction never spans binary logs, we don't have to handle
2148 the case where we do a relay-log-rotation in the middle of the
2149 transaction. If transactions could span several binlogs, we would
2150 have to ensure that we do not delete the relay log file where the
2151 transaction started before switching to a new relay log file.
2152
2153 - Error can happen if writing to file fails or if flushing the file
2154 fails.
2155
2156 @param rli The object representing the Relay_log_info.
2157
2158 @todo Change the log file information to a binary format to avoid
2159 calling longlong2str.
2160
2161 @return 0 on success, 1 on error.
2162 */
flush_info(const bool force)2163 int Relay_log_info::flush_info(const bool force)
2164 {
2165 DBUG_ENTER("Relay_log_info::flush_info");
2166
2167 if (!inited)
2168 DBUG_RETURN(0);
2169
2170 /*
2171 We update the sync_period at this point because only here we
2172 now that we are handling a relay log info. This needs to be
2173 update every time we call flush because the option maybe
2174 dinamically set.
2175 */
2176 handler->set_sync_period(sync_relayloginfo_period);
2177
2178 if (write_info(handler))
2179 goto err;
2180
2181 if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
2182 goto err;
2183
2184 force_flush_postponed_due_to_split_trans= false;
2185 DBUG_RETURN(0);
2186
2187 err:
2188 sql_print_error("Error writing relay log configuration.");
2189 DBUG_RETURN(1);
2190 }
2191
get_number_info_rli_fields()2192 size_t Relay_log_info::get_number_info_rli_fields()
2193 {
2194 return sizeof(info_rli_fields)/sizeof(info_rli_fields[0]);
2195 }
2196
read_info(Rpl_info_handler * from)2197 bool Relay_log_info::read_info(Rpl_info_handler *from)
2198 {
2199 int lines= 0;
2200 char *first_non_digit= NULL;
2201 ulong temp_group_relay_log_pos= 0;
2202 char temp_group_master_log_name[FN_REFLEN];
2203 ulong temp_group_master_log_pos= 0;
2204 int temp_sql_delay= 0;
2205 int temp_internal_id= internal_id;
2206
2207 DBUG_ENTER("Relay_log_info::read_info");
2208
2209 /*
2210 Should not read RLI from file in client threads. Client threads
2211 only use RLI to execute BINLOG statements.
2212
2213 @todo Uncomment the following assertion. Currently,
2214 Relay_log_info::init() is called from init_master_info() before
2215 the THD object Relay_log_info::sql_thd is created. That means we
2216 cannot call belongs_to_client() since belongs_to_client()
2217 dereferences Relay_log_info::sql_thd. So we need to refactor
2218 slightly: the THD object should be created by Relay_log_info
2219 constructor (or passed to it), so that we are guaranteed that it
2220 exists at this point. /Sven
2221 */
2222 //DBUG_ASSERT(!belongs_to_client());
2223
2224 /*
2225 Starting from 5.1.x, relay-log.info has a new format. Now, its
2226 first line contains the number of lines in the file. By reading
2227 this number we can determine which version our master.info comes
2228 from. We can't simply count the lines in the file, since
2229 versions before 5.1.x could generate files with more lines than
2230 needed. If first line doesn't contain a number, or if it
2231 contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2232 then the file is treated like a file from pre-5.1.x version.
2233 There is no ambiguity when reading an old master.info: before
2234 5.1.x, the first line contained the binlog's name, which is
2235 either empty or has an extension (contains a '.'), so can't be
2236 confused with an integer.
2237
2238 So we're just reading first line and trying to figure which
2239 version is this.
2240 */
2241
2242 /*
2243 The first row is temporarily stored in mi->master_log_name, if
2244 it is line count and not binlog name (new format) it will be
2245 overwritten by the second row later.
2246 */
2247 if (from->prepare_info_for_read() ||
2248 from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
2249 (char *) ""))
2250 DBUG_RETURN(TRUE);
2251
2252 lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2253
2254 if (group_relay_log_name[0]!='\0' &&
2255 *first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2256 {
2257 /* Seems to be new format => read group relay log name */
2258 if (from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
2259 (char *) ""))
2260 DBUG_RETURN(TRUE);
2261 }
2262 else
2263 DBUG_PRINT("info", ("relay_log_info file is in old format."));
2264
2265 if (from->get_info((ulong *) &temp_group_relay_log_pos,
2266 (ulong) BIN_LOG_HEADER_SIZE) ||
2267 from->get_info(temp_group_master_log_name,
2268 (size_t) sizeof(temp_group_master_log_name),
2269 (char *) "") ||
2270 from->get_info((ulong *) &temp_group_master_log_pos,
2271 (ulong) 0))
2272 DBUG_RETURN(TRUE);
2273
2274 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2275 {
2276 if (from->get_info((int *) &temp_sql_delay, (int) 0))
2277 DBUG_RETURN(TRUE);
2278 }
2279
2280 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2281 {
2282 if (from->get_info(&recovery_parallel_workers,(ulong) 0))
2283 DBUG_RETURN(TRUE);
2284 }
2285
2286 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2287 {
2288 if (from->get_info(&temp_internal_id, (int) 1))
2289 DBUG_RETURN(TRUE);
2290 }
2291
2292 group_relay_log_pos= temp_group_relay_log_pos;
2293 set_group_master_log_name(temp_group_master_log_name);
2294 set_group_master_log_pos(temp_group_master_log_pos);
2295 sql_delay= (int32) temp_sql_delay;
2296 internal_id= (uint) temp_internal_id;
2297
2298 DBUG_ASSERT(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2299 (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
2300 DBUG_RETURN(FALSE);
2301 }
2302
write_info(Rpl_info_handler * to)2303 bool Relay_log_info::write_info(Rpl_info_handler *to)
2304 {
2305 DBUG_ENTER("Relay_log_info::write_info");
2306
2307 /*
2308 @todo Uncomment the following assertion. See todo in
2309 Relay_log_info::read_info() for details. /Sven
2310 */
2311 //DBUG_ASSERT(!belongs_to_client());
2312
2313 if (to->prepare_info_for_write() ||
2314 to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2315 to->set_info(group_relay_log_name) ||
2316 to->set_info((ulong) group_relay_log_pos) ||
2317 to->set_info(get_group_master_log_name()) ||
2318 to->set_info((ulong) get_group_master_log_pos()) ||
2319 to->set_info((int) sql_delay) ||
2320 to->set_info(recovery_parallel_workers) ||
2321 to->set_info((int) internal_id))
2322 DBUG_RETURN(TRUE);
2323
2324 DBUG_RETURN(FALSE);
2325 }
2326
2327 /**
2328 Delete the existing event and set a new one. This class is
2329 responsible for freeing the event, the caller should not do that.
2330 When a new FD is from the master adaptation routine is invoked
2331 to align the slave applier execution context with the master version.
2332
2333 The method is run by SQL thread/MTS Coordinator.
2334 Although notice that MTS worker runs it, inefficiently (see assert),
2335 once at its destruction time.
2336 todo: fix Slave_worker and Relay_log_info inheritance relation.
2337
2338 @param a pointer to be installed into execution context
2339 FormatDescriptor event
2340 */
2341
set_rli_description_event(Format_description_log_event * fe)2342 void Relay_log_info::set_rli_description_event(Format_description_log_event *fe)
2343 {
2344 DBUG_ASSERT(!info_thd || !is_mts_worker(info_thd) || !fe);
2345
2346 if (fe)
2347 {
2348 adapt_to_master_version(fe);
2349 if (info_thd && is_parallel_exec())
2350 {
2351 for (uint i= 0; i < workers.elements; i++)
2352 {
2353 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
2354 mysql_mutex_lock(&w->jobs_lock);
2355 if (w->running_status == Slave_worker::RUNNING)
2356 w->set_rli_description_event(fe);
2357 mysql_mutex_unlock(&w->jobs_lock);
2358 }
2359 }
2360 }
2361 delete rli_description_event;
2362 rli_description_event= fe;
2363 }
2364
/*
  One replication feature whose applier behaviour depends on the master's
  server version. s_features[] is a table of these; adapt_to_master_version()
  runs the upgrade/downgrade actions of the entries whose version lies
  between the old and the new master version.
*/
struct st_feature_version
{
  /*
    The enum must be in the version non-descending top-down order,
    the last item formally corresponds to highest possible server
    version (never reached, thereby no adapting actions here);
    enumeration starts from zero. The values double as indexes into
    s_features[].
  */
  enum
  {
    WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
    _END_OF_LIST // always last
  } item;
  /*
    Version where the feature is introduced, split in three components
    (NOTE(review): presumably {major, minor, patch}, judging by the
    {5, 6, 6} entry -- confirm against version_product()).
  */
  uchar version_split[3];
  /*
    Action to perform when according to FormatDescriptor event Master
    is found to be feature-aware while previously it has *not* been.
    May be NULL (no action).
  */
  void (*upgrade) (THD*);
  /*
    Action to perform when according to FormatDescriptor event Master
    is found to be feature-*un*aware while previously it has been.
    May be NULL (no action).
  */
  void (*downgrade) (THD*);
};
2393
wl6292_upgrade_func(THD * thd)2394 void wl6292_upgrade_func(THD *thd)
2395 {
2396 thd->variables.explicit_defaults_for_timestamp= false;
2397 if (global_system_variables.explicit_defaults_for_timestamp)
2398 thd->variables.explicit_defaults_for_timestamp= true;
2399
2400 return;
2401 }
2402
wl6292_downgrade_func(THD * thd)2403 void wl6292_downgrade_func(THD *thd)
2404 {
2405 if (global_system_variables.explicit_defaults_for_timestamp)
2406 thd->variables.explicit_defaults_for_timestamp= false;
2407
2408 return;
2409 }
2410
2411 /**
2412 Sensitive to Master-vs-Slave version difference features
2413 should be listed in the version non-descending order.
2414 */
2415 static st_feature_version s_features[]=
2416 {
2417 // order is the same as in the enum
2418 { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
2419 {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
2420 { st_feature_version::_END_OF_LIST,
2421 {255, 255, 255}, NULL, NULL }
2422 };
2423
2424 /**
2425 The method lists rules of adaptation for the slave applier
2426 to specific master versions.
2427 It's executed right before a new master FD is set for
2428 slave appliers execution context.
2429 Comparison of the old and new version yields the adaptive
2430 actions direction.
2431 Current execution FD's version, V_0, is compared with the new being set up
2432 FD (the arg), let's call it V_1.
2433 In the case of downgrade features that are defined in [V_0, V_1-1] range
2434 (V_1 excluded) are "removed" by running the downgrade actions.
2435 In the upgrade case the featured defined in [V_0 + 1, V_1] range are
2436 "added" by running the upgrade actions.
2437
2438 Notice, that due to relay log may have two FD events, one the slave local
2439 and the other from the Master. That can lead to extra
2440 adapt_to_master_version() calls and in case Slave and Master are of different
2441 versions the extra two calls should compensate each other.
2442
2443 Also, at composing downgrade/upgrade actions keep in mind that
2444 at initialization Slave sets up FD of version 4.0 and then transits to
2445 the current server version. At transition all upgrading actions in
2446 the range of [4.0..current] are run.
2447
2448 @param fdle a pointer to new Format Description event that is being set
2449 up for execution context.
2450 */
adapt_to_master_version(Format_description_log_event * fdle)2451 void Relay_log_info::adapt_to_master_version(Format_description_log_event *fdle)
2452 {
2453 THD *thd=info_thd;
2454 ulong master_version, current_version;
2455 int changed= !fdle || ! rli_description_event ? 0 :
2456 (master_version= fdle->get_version_product()) -
2457 (current_version= rli_description_event->get_version_product());
2458
2459 /* When the last version is not changed nothing to adapt for */
2460 if (!changed)
2461 return;
2462
2463 /*
2464 find item starting from and ending at for which adaptive actions run
2465 for downgrade or upgrade branches.
2466 (todo: convert into bsearch when number of features will grow significantly)
2467 */
2468 bool downgrade= changed < 0;
2469 long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
2470
2471 for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
2472 {
2473 ulong ver_f= version_product(s_features[i].version_split);
2474
2475 if ((downgrade ? master_version : current_version) < ver_f &&
2476 i_first == st_feature_version::_END_OF_LIST)
2477 i_first= i;
2478 if ((downgrade ? current_version : master_version) < ver_f)
2479 {
2480 i_last= i;
2481 DBUG_ASSERT(i_last >= i_first);
2482 break;
2483 }
2484 }
2485
2486 /*
2487 actions, executed in version non-descending st_feature_version order
2488 */
2489 for (i= i_first; i < i_last; i++)
2490 {
2491 /* Run time check of the st_feature_version items ordering */
2492 DBUG_ASSERT(!i ||
2493 version_product(s_features[i - 1].version_split) <=
2494 version_product(s_features[i].version_split));
2495
2496 DBUG_ASSERT((downgrade ? master_version : current_version) <
2497 version_product(s_features[i].version_split) &&
2498 (downgrade ? current_version : master_version >=
2499 version_product(s_features[i].version_split)));
2500
2501 if (downgrade && s_features[i].downgrade)
2502 {
2503 s_features[i].downgrade(thd);
2504 }
2505 else if (s_features[i].upgrade)
2506 {
2507 s_features[i].upgrade(thd);
2508 }
2509 }
2510 }
2511
operator new(size_t request)2512 void* Relay_log_info::operator new(size_t request)
2513 {
2514 void* ptr;
2515 if (posix_memalign(&ptr, __alignof__(Relay_log_info), sizeof(Relay_log_info))) {
2516 throw std::bad_alloc();
2517 }
2518 return ptr;
2519 }
2520
operator delete(void * ptr)2521 void Relay_log_info::operator delete(void * ptr) {
2522 free(ptr);
2523 }
2524
2525