1 /* Copyright (c) 2006, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "rpl_rli.h"
24
25 #include "my_dir.h" // MY_STAT
26 #include "log.h" // sql_print_error
27 #include "log_event.h" // Log_event
28 #include "m_string.h"
29 #include "rpl_group_replication.h" // set_group_replication_retrieved_certifi...
30 #include "rpl_info_factory.h" // Rpl_info_factory
31 #include "rpl_mi.h" // Master_info
32 #include "rpl_msr.h" // channel_map
33 #include "rpl_rli_pdb.h" // Slave_worker
34 #include "sql_base.h" // close_thread_tables
35 #include "strfunc.h" // strconvert
36 #include "transaction.h" // trans_commit_stmt
37 #include "debug_sync.h"
38 #include "pfs_file_provider.h"
39 #include "mysql/psi/mysql_file.h"
40 #include "mutex_lock.h" // Mutex_lock
41
42 #include <algorithm>
43 using std::min;
44 using std::max;
45
/*
  Names of the fields that make up the relay log info.

  Whenever a new field is added to the relay log info, this list has to
  be extended as well. For now the array is only used to obtain the
  number of fields.
*/
const char *info_rli_fields[]=
{
  "number_of_lines",
  "group_relay_log_name", "group_relay_log_pos",
  "group_master_log_name", "group_master_log_pos",
  "sql_delay",
  "number_of_workers",
  "id",
  "channel_name"
};
63
Relay_log_info(bool is_slave_recovery,PSI_mutex_key * param_key_info_run_lock,PSI_mutex_key * param_key_info_data_lock,PSI_mutex_key * param_key_info_sleep_lock,PSI_mutex_key * param_key_info_thd_lock,PSI_mutex_key * param_key_info_data_cond,PSI_mutex_key * param_key_info_start_cond,PSI_mutex_key * param_key_info_stop_cond,PSI_mutex_key * param_key_info_sleep_cond,uint param_id,const char * param_channel,bool is_rli_fake)64 Relay_log_info::Relay_log_info(bool is_slave_recovery
65 #ifdef HAVE_PSI_INTERFACE
66 ,PSI_mutex_key *param_key_info_run_lock,
67 PSI_mutex_key *param_key_info_data_lock,
68 PSI_mutex_key *param_key_info_sleep_lock,
69 PSI_mutex_key *param_key_info_thd_lock,
70 PSI_mutex_key *param_key_info_data_cond,
71 PSI_mutex_key *param_key_info_start_cond,
72 PSI_mutex_key *param_key_info_stop_cond,
73 PSI_mutex_key *param_key_info_sleep_cond
74 #endif
75 , uint param_id, const char *param_channel,
76 bool is_rli_fake
77 )
78 :Rpl_info("SQL"
79 #ifdef HAVE_PSI_INTERFACE
80 ,param_key_info_run_lock, param_key_info_data_lock,
81 param_key_info_sleep_lock, param_key_info_thd_lock,
82 param_key_info_data_cond, param_key_info_start_cond,
83 param_key_info_stop_cond, param_key_info_sleep_cond
84 #endif
85 , param_id, param_channel
86 ),
87 replicate_same_server_id(::replicate_same_server_id),
88 cur_log_fd(-1), relay_log(&sync_relaylog_period, SEQ_READ_APPEND),
89 is_relay_log_recovery(is_slave_recovery),
90 save_temporary_tables(0),
91 cur_log_old_open_count(0), error_on_rli_init_info(false),
92 group_relay_log_pos(0), event_relay_log_number(0),
93 event_relay_log_pos(0), event_start_pos(0),
94 group_master_log_pos(0),
95 gtid_set(global_sid_map, global_sid_lock),
96 rli_fake(is_rli_fake),
97 gtid_retrieved_initialized(false),
98 is_group_master_log_pos_invalid(false),
99 log_space_total(0), ignore_log_space_limit(0),
100 sql_force_rotate_relay(false),
101 last_master_timestamp(0), slave_skip_counter(0),
102 abort_pos_wait(0), until_condition(UNTIL_NONE),
103 until_log_pos(0),
104 until_sql_gtids(global_sid_map),
105 until_sql_gtids_first_event(true),
106 trans_retries(0), retried_trans(0),
107 tables_to_lock(0), tables_to_lock_count(0),
108 rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
109 workers(PSI_NOT_INSTRUMENTED),
110 workers_array_initialized(false),
111 curr_group_assigned_parts(PSI_NOT_INSTRUMENTED),
112 curr_group_da(PSI_NOT_INSTRUMENTED),
113 slave_parallel_workers(0),
114 exit_counter(0),
115 max_updated_index(0),
116 recovery_parallel_workers(0), checkpoint_seqno(0),
117 checkpoint_group(opt_mts_checkpoint_group),
118 recovery_groups_inited(false), mts_recovery_group_cnt(0),
119 mts_recovery_index(0), mts_recovery_group_seen_begin(0),
120 mts_group_status(MTS_NOT_IN_GROUP),
121 stats_exec_time(0), stats_read_time(0),
122 least_occupied_workers(PSI_NOT_INSTRUMENTED),
123 current_mts_submode(0),
124 reported_unsafe_warning(false), rli_description_event(NULL),
125 commit_order_mngr(NULL),
126 sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
127 long_find_row_note_printed(false),
128 thd_tx_priority(0),
129 m_ignore_write_set_memory_limit(false),
130 m_allow_drop_write_set(false),
131 is_engine_ha_data_detached(false)
132 {
133 DBUG_ENTER("Relay_log_info::Relay_log_info");
134
135 #ifdef HAVE_PSI_INTERFACE
136 relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
137 key_RELAYLOG_LOCK_commit,
138 key_RELAYLOG_LOCK_commit_queue,
139 key_RELAYLOG_LOCK_done,
140 key_RELAYLOG_LOCK_flush_queue,
141 key_RELAYLOG_LOCK_log,
142 PSI_NOT_INSTRUMENTED, /* Relaylog doesn't support LOCK_binlog_end_pos */
143 key_RELAYLOG_LOCK_sync,
144 key_RELAYLOG_LOCK_sync_queue,
145 key_RELAYLOG_LOCK_xids,
146 key_RELAYLOG_COND_done,
147 key_RELAYLOG_update_cond,
148 key_RELAYLOG_prep_xids_cond,
149 key_file_relaylog,
150 key_file_relaylog_index,
151 key_file_relaylog_cache,
152 key_file_relaylog_index_cache);
153 #endif
154
155 group_relay_log_name[0]= event_relay_log_name[0]=
156 group_master_log_name[0]= 0;
157 until_log_name[0]= ign_master_log_name_end[0]= 0;
158 set_timespec_nsec(&last_clock, 0);
159 memset(&cache_buf, 0, sizeof(cache_buf));
160 cached_charset_invalidate();
161 inited_hash_workers= FALSE;
162 channel_open_temp_tables.atomic_set(0);
163 /*
164 For applier threads, currently_executing_gtid is set to automatic
165 when they are not executing any transaction.
166 */
167 currently_executing_gtid.set_automatic();
168
169 if (!rli_fake)
170 {
171 mysql_mutex_init(key_relay_log_info_log_space_lock,
172 &log_space_lock, MY_MUTEX_INIT_FAST);
173 mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond);
174 mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
175 MY_MUTEX_INIT_FAST);
176 mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond);
177 mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
178 MY_MUTEX_INIT_FAST);
179 mysql_mutex_init(key_mts_temp_table_LOCK, &mts_temp_table_LOCK,
180 MY_MUTEX_INIT_FAST);
181 mysql_mutex_init(key_mts_gaq_LOCK, &mts_gaq_LOCK,
182 MY_MUTEX_INIT_FAST);
183 mysql_cond_init(key_cond_mts_gaq, &logical_clock_cond);
184
185 relay_log.init_pthread_objects();
186 force_flush_postponed_due_to_split_trans= false;
187 }
188 do_server_version_split(::server_version, slave_version_split);
189
190 DBUG_VOID_RETURN;
191 }
192
193 /**
194 The method to invoke at slave threads start
195 */
init_workers(ulong n_workers)196 void Relay_log_info::init_workers(ulong n_workers)
197 {
198 /*
199 Parallel slave parameters initialization is done regardless
200 whether the feature is or going to be active or not.
201 */
202 mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
203 mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
204 mts_total_wait_overlap= 0;
205 mts_total_wait_worker_avail= 0;
206 mts_last_online_stat= 0;
207
208 workers.reserve(n_workers);
209 workers_array_initialized= true; //set after init
210 }
211
212 /**
213 The method to invoke at slave threads stop
214 */
deinit_workers()215 void Relay_log_info::deinit_workers()
216 {
217 workers.clear();
218 }
219
~Relay_log_info()220 Relay_log_info::~Relay_log_info()
221 {
222 DBUG_ENTER("Relay_log_info::~Relay_log_info");
223
224 if(!rli_fake)
225 {
226 if (recovery_groups_inited)
227 bitmap_free(&recovery_groups);
228 delete current_mts_submode;
229
230 if(workers_copy_pfs.size())
231 {
232 for (int i= static_cast<int>(workers_copy_pfs.size()) - 1; i >= 0; i--)
233 delete workers_copy_pfs[i];
234 workers_copy_pfs.clear();
235 }
236
237 mysql_mutex_destroy(&log_space_lock);
238 mysql_cond_destroy(&log_space_cond);
239 mysql_mutex_destroy(&pending_jobs_lock);
240 mysql_cond_destroy(&pending_jobs_cond);
241 mysql_mutex_destroy(&exit_count_lock);
242 mysql_mutex_destroy(&mts_temp_table_LOCK);
243 mysql_mutex_destroy(&mts_gaq_LOCK);
244 mysql_cond_destroy(&logical_clock_cond);
245 relay_log.cleanup();
246 }
247
248 set_rli_description_event(NULL);
249
250 DBUG_VOID_RETURN;
251 }
252
253 /**
254 Method is called when MTS coordinator senses the relay-log name
255 has been changed.
256 It marks each Worker member with this fact to make an action
257 at time it will distribute a terminal event of a group to the Worker.
258
259 Worker receives the new name at the group commiting phase
260 @c Slave_worker::slave_worker_ends_group().
261 */
reset_notified_relay_log_change()262 void Relay_log_info::reset_notified_relay_log_change()
263 {
264 if (!is_parallel_exec())
265 return;
266 for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
267 {
268 Slave_worker *w= *it;
269 w->relay_log_change_notified= FALSE;
270 }
271 }
272
273 /**
274 This method is called in mts_checkpoint_routine() to mark that each
275 worker is required to adapt to a new checkpoint data whose coordinates
276 are passed to it through GAQ index.
277
278 Worker notices the new checkpoint value at the group commit to reset
279 the current bitmap and starts using the clean bitmap indexed from zero
280 of being reset checkpoint_seqno.
281
282 New seconds_behind_master timestamp is installed.
283
284 @param shift number of bits to shift by Worker due to the
285 current checkpoint change.
286 @param new_ts new seconds_behind_master timestamp value
287 unless zero. Zero could be due to FD event
288 or fake rotate event.
289 @param need_data_lock False if caller has locked @c data_lock
290 @param update_timestamp if true, this function will update the
291 rli->last_master_timestamp.
292 */
reset_notified_checkpoint(ulong shift,time_t new_ts,bool need_data_lock,bool update_timestamp)293 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
294 bool need_data_lock,
295 bool update_timestamp)
296 {
297 /*
298 If this is not a parallel execution we return immediately.
299 */
300 if (!is_parallel_exec())
301 return;
302
303 for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
304 {
305 Slave_worker *w= *it;
306 /*
307 Reseting the notification information in order to force workers to
308 assign jobs with the new updated information.
309 Notice that the bitmap_shifted is accumulated to indicate how many
310 consecutive jobs were successfully processed.
311
312 The worker when assigning a new job will set the value back to
313 zero.
314 */
315 w->checkpoint_notified= FALSE;
316 w->bitmap_shifted= w->bitmap_shifted + shift;
317 /*
318 Zero shift indicates the caller rotates the master binlog.
319 The new name will be passed to W through the group descriptor
320 during the first post-rotation time scheduling.
321 */
322 if (shift == 0)
323 w->master_log_change_notified= false;
324
325 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
326 "worker->bitmap_shifted --> %lu, worker --> %u.",
327 shift, w->bitmap_shifted,
328 static_cast<unsigned>(it - workers.begin())));
329 }
330 /*
331 There should not be a call where (shift == 0 && checkpoint_seqno != 0).
332 Then the new checkpoint sequence is updated by subtracting the number
333 of consecutive jobs that were successfully processed.
334 */
335 assert(current_mts_submode->get_type() != MTS_PARALLEL_TYPE_DB_NAME ||
336 !(shift == 0 && checkpoint_seqno != 0));
337 checkpoint_seqno= checkpoint_seqno - shift;
338 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
339 "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
340
341 if (update_timestamp)
342 {
343 if (need_data_lock)
344 mysql_mutex_lock(&data_lock);
345 else
346 mysql_mutex_assert_owner(&data_lock);
347 last_master_timestamp= new_ts;
348 if (need_data_lock)
349 mysql_mutex_unlock(&data_lock);
350 }
351 }
352
353 /**
354 Reset recovery info from Worker info table and
355 mark MTS recovery is completed.
356
357 @return false on success true when @c reset_notified_checkpoint failed.
358 */
mts_finalize_recovery()359 bool Relay_log_info::mts_finalize_recovery()
360 {
361 bool ret= false;
362 uint i;
363 uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
364
365 DBUG_ENTER("Relay_log_info::mts_finalize_recovery");
366
367 for (Slave_worker **it= workers.begin(); !ret && it != workers.end(); ++it)
368 {
369 Slave_worker *w= *it;
370 ret= w->reset_recovery_info();
371 DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret= true;);
372 }
373 /*
374 The loop is traversed in the worker index descending order due
375 to specifics of the Worker table repository that does not like
376 even temporary holes. Therefore stale records are deleted
377 from the tail.
378 */
379 DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
380 {DBUG_SET("+d,mts_worker_thread_init_fails");});
381 for (i= recovery_parallel_workers; i > workers.size() && !ret; i--)
382 {
383 Slave_worker *w=
384 Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
385 /*
386 If an error occurs during the above create_worker call, the newly created
387 worker object gets deleted within the above function call itself and only
388 NULL is returned. Hence the following check has been added to verify
389 that a valid worker object exists.
390 */
391 if (w)
392 {
393 ret= w->remove_info();
394 delete w;
395 }
396 else
397 {
398 ret= true;
399 goto err;
400 }
401 }
402 recovery_parallel_workers= slave_parallel_workers;
403
404 err:
405 DBUG_RETURN(ret);
406 }
407
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)408 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
409 {
410 MY_STAT s;
411 DBUG_ENTER("add_relay_log");
412 mysql_mutex_assert_owner(&rli->log_space_lock);
413 if (!mysql_file_stat(key_file_relaylog,
414 linfo->log_file_name, &s, MYF(0)))
415 {
416 sql_print_error("log %s listed in the index, but failed to stat.",
417 linfo->log_file_name);
418 DBUG_RETURN(1);
419 }
420 rli->log_space_total += s.st_size;
421 #ifndef NDEBUG
422 char buf[22];
423 DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
424 #endif
425 DBUG_RETURN(0);
426 }
427
count_relay_log_space()428 int Relay_log_info::count_relay_log_space()
429 {
430 LOG_INFO flinfo;
431 DBUG_ENTER("Relay_log_info::count_relay_log_space");
432 Mutex_lock lock(&log_space_lock);
433 log_space_total= 0;
434 if (relay_log.find_log_pos(&flinfo, NullS, 1))
435 {
436 sql_print_error("Could not find first log while counting relay log space.");
437 DBUG_RETURN(1);
438 }
439 do
440 {
441 if (add_relay_log(this, &flinfo))
442 DBUG_RETURN(1);
443 } while (!relay_log.find_next_log(&flinfo, 1));
444 /*
445 As we have counted everything, including what may have written in a
446 preceding write, we must reset bytes_written, or we may count some space
447 twice.
448 */
449 relay_log.reset_bytes_written();
450 DBUG_RETURN(0);
451 }
452
453 /**
454 Resets UNTIL condition for Relay_log_info
455 */
456
clear_until_condition()457 void Relay_log_info::clear_until_condition()
458 {
459 DBUG_ENTER("clear_until_condition");
460
461 until_condition= Relay_log_info::UNTIL_NONE;
462 until_log_name[0]= 0;
463 until_log_pos= 0;
464 until_sql_gtids.clear();
465 until_sql_gtids_first_event= true;
466 DBUG_VOID_RETURN;
467 }
468
/**
  Opens and initializes the given relay log. Specifically, it does what
  follows:

  - Installs a default (binlog format 3) format description event.
  - Closes the old open relay log file, if any.
  - If the requested relay log is the one the relay log object reports
    as active, makes cur_log point to the same IO_CACHE entry.
  - If not, opens the 'log' binary file into cache_buf.
  - If pos is beyond the log header, optionally scans the leading events
    for Format_description events and then seeks to pos.

  On every exit path data_cond is broadcast, and log_space_limit is
  zeroed when relay_log_purge is off.

  @todo check proper initialization of
  group_master_log_name/group_master_log_pos. /alfranio

  @param log[in] Name of relay log file to read from. NULL = First log
  @param pos[in] Position in relay log file
  @param need_data_lock[in] If true, this function will acquire
  data_lock; otherwise the caller should already have acquired it.
  @param errmsg[out] On error, this function will store a pointer to
  an error message here
  @param keep_looking_for_fd[in] If true, this function will
  look for a Format_description_log_event. We only need this when the
  SQL thread starts and opens an existing relay log and has to execute
  it (possibly from an offset >4); then we need to read the first
  event of the relay log to be able to parse the events we have to
  execute.

  @retval 0 ok,
  @retval 1 error. In this case, *errmsg is set to point to the error
  message.
*/

int Relay_log_info::init_relay_log_pos(const char* log,
                                       ulonglong pos, bool need_data_lock,
                                       const char** errmsg,
                                       bool keep_looking_for_fd)
{
  DBUG_ENTER("Relay_log_info::init_relay_log_pos");
  DBUG_PRINT("info", ("pos: %lu", (ulong) pos));

  *errmsg=0;
  const char* errmsg_fmt= 0;
  /*
    NOTE(review): the buffer is function-static, so a formatted message
    handed out through *errmsg is shared by all callers of this
    function - confirm that callers consume it before another call can
    overwrite it.
  */
  static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
  mysql_mutex_t *log_lock= relay_log.get_log_lock();

  if (need_data_lock)
    mysql_mutex_lock(&data_lock);
  else
    mysql_mutex_assert_owner(&data_lock);

  /*
    By default the relay log is in binlog format 3 (4.0).
    Even if format is 4, this will work enough to read the first event
    (Format_desc) (remember that format 4 is just lengthened compared to
    format 3; format 3 is a prefix of format 4).
  */
  set_rli_description_event(new Format_description_log_event(3));

  /* From here on log_lock is held until the err label has been passed. */
  mysql_mutex_lock(log_lock);

  /* Close log file and free buffers if it's already open */
  if (cur_log_fd >= 0)
  {
    end_io_cache(&cache_buf);
    mysql_file_close(cur_log_fd, MYF(MY_WME));
    cur_log_fd = -1;
  }

  group_relay_log_pos= event_relay_log_pos= pos;

  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
  if (relay_log.find_log_pos(&linfo, NullS, 1))
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }

  /* When no log name is given, linfo keeps describing the first log. */
  if (log && relay_log.find_log_pos(&linfo, log, 1))
  {
    errmsg_fmt= "Could not find target log file mentioned in "
                "relay log info in the index file '%s' during "
                "relay log initialization";
    sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
    *errmsg= errmsg_buff;
    goto err;
  }

  set_group_relay_log_name(linfo.log_file_name);
  set_event_relay_log_name(linfo.log_file_name);

  if (relay_log.is_active(linfo.log_file_name))
  {
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
    my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
    if (check_binlog_magic(cur_log, errmsg))
      goto err;
    cur_log_old_open_count=relay_log.get_open_count();
  }
  else
  {
    /*
      Open the relay log and set cur_log to point at this one
    */
    if ((cur_log_fd=open_binlog_file(&cache_buf,
                                     linfo.log_file_name,errmsg)) < 0)
      goto err;
    cur_log = &cache_buf;
  }
  /*
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
    sure.
  */
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
  {
    Log_event* ev;
    while (keep_looking_for_fd)
    {
      /*
        Read the possible Format_description_log_event; if position
        was 4, no need, it will be read naturally.
      */
      DBUG_PRINT("info",("looking for a Format_description_log_event"));

      /* Never read at or beyond the requested position. */
      if (my_b_tell(cur_log) >= pos)
        break;

      /*
        Because we have data_lock and log_lock, we can safely read an
        event
      */
      if (!(ev= Log_event::read_log_event(cur_log, 0,
                                          rli_description_event,
                                          opt_slave_sql_verify_checksum)))
      {
        DBUG_PRINT("info",("could not read event, cur_log->error=%d",
                           cur_log->error));
        if (cur_log->error) /* not EOF */
        {
          *errmsg= "I/O error reading event at position 4";
          goto err;
        }
        break;
      }
      else if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT)
      {
        DBUG_PRINT("info",("found Format_description_log_event"));
        /* ev is handed over here, which is why it is not deleted below. */
        set_rli_description_event((Format_description_log_event *)ev);
        /*
          As ev was returned by read_log_event, it has passed is_valid(), so
          my_malloc() in ctor worked, no need to check again.
        */
        /*
          Ok, we found a Format_description event. But it is not sure that this
          describes the whole relay log; indeed, one can have this sequence
          (starting from position 4):
          Format_desc (of slave)
          Previous-GTIDs (of slave IO thread, if GTIDs are enabled)
          Rotate (of master)
          Format_desc (of master)
          So the Format_desc which really describes the rest of the relay log
          can be the 3rd or the 4th event (depending on GTIDs being enabled or
          not, it can't be further than that, because we rotate
          the relay log when we queue a Rotate event from the master).
          But what describes the Rotate is the first Format_desc.
          So what we do is:
          go on searching for Format_description events, until you exceed the
          position (argument 'pos') or until you find an event other than
          Previous-GTIDs, Rotate or Format_desc.
        */
      }
      else
      {
        DBUG_PRINT("info",("found event of another type=%d",
                           ev->get_type_code()));
        /* Only Rotate and Previous-GTIDs events keep the search going. */
        keep_looking_for_fd=
          (ev->get_type_code() == binary_log::ROTATE_EVENT ||
           ev->get_type_code() == binary_log::PREVIOUS_GTIDS_LOG_EVENT);
        delete ev;
      }
    }
    my_b_seek(cur_log,(off_t)pos);
#ifndef NDEBUG
    {
      char llbuf1[22], llbuf2[22];
      DBUG_PRINT("info", ("my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
                          llstr(my_b_tell(cur_log),llbuf1),
                          llstr(get_event_relay_log_pos(),llbuf2)));
    }
#endif

  }

err:
  /*
    Reached on success as well as on failure.

    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
  if (!relay_log_purge)
  {
    log_space_limit= 0; // todo: consider to throw a warning at least
  }
  mysql_cond_broadcast(&data_cond);

  mysql_mutex_unlock(log_lock);

  if (need_data_lock)
    mysql_mutex_unlock(&data_lock);
  /* An earlier, more specific error message is never overwritten. */
  if (!rli_description_event->is_valid() && !*errmsg)
    *errmsg= "Invalid Format_description log event; could be out of memory";

  DBUG_RETURN ((*errmsg) ? 1 : 0);
}
687
688 /**
689 Update the error number, message and timestamp fields. This function is
690 different from va_report() as va_report() also logs the error message in the
691 log apart from updating the error fields.
692
693 SYNOPSIS
694 @param[in] level specifies the level- error, warning or information,
695 @param[in] err_code error number,
696 @param[in] buff_coord error message to be used.
697
698 */
fill_coord_err_buf(loglevel level,int err_code,const char * buff_coord) const699 void Relay_log_info::fill_coord_err_buf(loglevel level, int err_code,
700 const char *buff_coord) const
701 {
702 mysql_mutex_lock(&err_lock);
703
704 if(level == ERROR_LEVEL)
705 {
706 m_last_error.number = err_code;
707 my_snprintf(m_last_error.message, sizeof(m_last_error.message), "%.*s",
708 MAX_SLAVE_ERRMSG - 1, buff_coord);
709 m_last_error.update_timestamp();
710 }
711
712 mysql_mutex_unlock(&err_lock);
713 }
714
715 /**
716 Waits until the SQL thread reaches (has executed up to) the
717 log/position or timed out.
718
719 SYNOPSIS
720 @param[in] thd client thread that sent @c SELECT @c MASTER_POS_WAIT,
721 @param[in] log_name log name to wait for,
722 @param[in] log_pos position to wait for,
723 @param[in] timeout @c timeout in seconds before giving up waiting.
724 @c timeout is double whereas it should be ulong; but this is
725 to catch if the user submitted a negative timeout.
726
727 @retval -2 improper arguments (log_pos<0)
728 or slave not running, or master info changed
729 during the function's execution,
730 or client thread killed. -2 is translated to NULL by caller,
731 @retval -1 timed out
732 @retval >=0 number of log events the function had to wait
733 before reaching the desired log/position
734 */
735
wait_for_pos(THD * thd,String * log_name,longlong log_pos,double timeout)736 int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
737 longlong log_pos,
738 double timeout)
739 {
740 int event_count = 0;
741 ulong init_abort_pos_wait;
742 int error=0;
743 struct timespec abstime; // for timeout checking
744 PSI_stage_info old_stage;
745 DBUG_ENTER("Relay_log_info::wait_for_pos");
746
747 if (!inited)
748 DBUG_RETURN(-2);
749
750 DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
751 log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));
752
753 DEBUG_SYNC(thd, "begin_master_pos_wait");
754
755 set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
756 mysql_mutex_lock(&data_lock);
757 thd->ENTER_COND(&data_cond, &data_lock,
758 &stage_waiting_for_the_slave_thread_to_advance_position,
759 &old_stage);
760 /*
761 This function will abort when it notices that some CHANGE MASTER or
762 RESET MASTER has changed the master info.
763 To catch this, these commands modify abort_pos_wait ; We just monitor
764 abort_pos_wait and see if it has changed.
765 Why do we have this mechanism instead of simply monitoring slave_running
766 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
767 the SQL thread be stopped?
768 This is becasue if someones does:
769 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
770 the change may happen very quickly and we may not notice that
771 slave_running briefly switches between 1/0/1.
772 */
773 init_abort_pos_wait= abort_pos_wait;
774
775 /*
776 We'll need to
777 handle all possible log names comparisons (e.g. 999 vs 1000).
778 We use ulong for string->number conversion ; this is no
779 stronger limitation than in find_uniq_filename in sql/log.cc
780 */
781 ulong log_name_extension;
782 char log_name_tmp[FN_REFLEN]; //make a char[] from String
783
784 strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));
785
786 char *p= fn_ext(log_name_tmp);
787 char *p_end;
788 if (!*p || log_pos<0)
789 {
790 error= -2; //means improper arguments
791 goto err;
792 }
793 // Convert 0-3 to 4
794 log_pos= max(log_pos, static_cast<longlong>(BIN_LOG_HEADER_SIZE));
795 /* p points to '.' */
796 log_name_extension= strtoul(++p, &p_end, 10);
797 /*
798 p_end points to the first invalid character.
799 If it equals to p, no digits were found, error.
800 If it contains '\0' it means conversion went ok.
801 */
802 if (p_end==p || *p_end)
803 {
804 error= -2;
805 goto err;
806 }
807
808 /* The "compare and wait" main loop */
809 while (!thd->killed &&
810 init_abort_pos_wait == abort_pos_wait &&
811 slave_running)
812 {
813 bool pos_reached;
814 int cmp_result= 0;
815
816 DBUG_PRINT("info",
817 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
818 init_abort_pos_wait, abort_pos_wait));
819 DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
820 group_master_log_name, (ulong) group_master_log_pos));
821
822 /*
823 group_master_log_name can be "", if we are just after a fresh
824 replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
825 (before we have executed one Rotate event from the master) or
826 (rare) if the user is doing a weird slave setup (see next
827 paragraph). If group_master_log_name is "", we assume we don't
828 have enough info to do the comparison yet, so we just wait until
829 more data. In this case master_log_pos is always 0 except if
830 somebody (wrongly) sets this slave to be a slave of itself
831 without using --replicate-same-server-id (an unsupported
832 configuration which does nothing), then group_master_log_pos
833 will grow and group_master_log_name will stay "".
834 Also in case the group master log position is invalid (e.g. after
835 CHANGE MASTER TO RELAY_LOG_POS ), we will wait till the first event
836 is read and the log position is valid again.
837 */
838 if (*group_master_log_name && !is_group_master_log_pos_invalid)
839 {
840 char *basename= (group_master_log_name +
841 dirname_length(group_master_log_name));
842 /*
843 First compare the parts before the extension.
844 Find the dot in the master's log basename,
845 and protect against user's input error :
846 if the names do not match up to '.' included, return error
847 */
848 char *q= (fn_ext(basename)+1);
849 if (strncmp(basename, log_name_tmp, (int)(q-basename)))
850 {
851 error= -2;
852 break;
853 }
854 // Now compare extensions.
855 char *q_end;
856 ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
857 if (group_master_log_name_extension < log_name_extension)
858 cmp_result= -1 ;
859 else
860 cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
861
862 pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
863 cmp_result > 0);
864 if (pos_reached || thd->killed)
865 break;
866 }
867
868 //wait for master update, with optional timeout.
869
870 DBUG_PRINT("info",("Waiting for master update"));
871 /*
872 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
873 will wake us up.
874 */
875 thd_wait_begin(thd, THD_WAIT_BINLOG);
876 if (timeout > 0)
877 {
878 /*
879 Note that mysql_cond_timedwait checks for the timeout
880 before for the condition ; i.e. it returns ETIMEDOUT
881 if the system time equals or exceeds the time specified by abstime
882 before the condition variable is signaled or broadcast, _or_ if
883 the absolute time specified by abstime has already passed at the time
884 of the call.
885 For that reason, mysql_cond_timedwait will do the "timeoutting" job
886 even if its condition is always immediately signaled (case of a loaded
887 master).
888 */
889 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
890 }
891 else
892 mysql_cond_wait(&data_cond, &data_lock);
893 thd_wait_end(thd);
894 DBUG_PRINT("info",("Got signal of master update or timed out"));
895 if (error == ETIMEDOUT || error == ETIME)
896 {
897 #ifndef NDEBUG
898 /*
899 Doing this to generate a stack trace and make debugging
900 easier.
901 */
902 if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
903 assert(0);
904 #endif
905 error= -1;
906 break;
907 }
908 error=0;
909 event_count++;
910 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
911 }
912
913 err:
914 mysql_mutex_unlock(&data_lock);
915 thd->EXIT_COND(&old_stage);
916 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
917 improper_arguments: %d timed_out: %d",
918 thd->killed_errno(),
919 (int) (init_abort_pos_wait != abort_pos_wait),
920 (int) slave_running,
921 (int) (error == -2),
922 (int) (error == -1)));
923 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
924 !slave_running)
925 {
926 error= -2;
927 }
928 DBUG_RETURN( error ? error : event_count );
929 }
930
wait_for_gtid_set(THD * thd,String * gtid,double timeout)931 int Relay_log_info::wait_for_gtid_set(THD* thd, String* gtid,
932 double timeout)
933 {
934 DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, String, timeout)");
935
936 DBUG_PRINT("info", ("Waiting for %s timeout %lf", gtid->c_ptr_safe(),
937 timeout));
938
939 Gtid_set wait_gtid_set(global_sid_map);
940 global_sid_lock->rdlock();
941
942 if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
943 {
944 global_sid_lock->unlock();
945 DBUG_PRINT("exit",("improper gtid argument"));
946 DBUG_RETURN(-2);
947
948 }
949 global_sid_lock->unlock();
950
951 DBUG_RETURN(wait_for_gtid_set(thd, &wait_gtid_set, timeout));
952 }
953
954 /*
955 TODO: This is a duplicated code that needs to be simplified.
956 This will be done while developing all possible sync options.
957 See WL#3584's specification.
958
959 /Alfranio
960 */
wait_for_gtid_set(THD * thd,const Gtid_set * wait_gtid_set,double timeout)961 int Relay_log_info::wait_for_gtid_set(THD* thd, const Gtid_set* wait_gtid_set,
962 double timeout)
963 {
964 int event_count = 0;
965 ulong init_abort_pos_wait;
966 int error=0;
967 struct timespec abstime; // for timeout checking
968 PSI_stage_info old_stage;
969 DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, gtid_set, timeout)");
970
971 if (!inited)
972 DBUG_RETURN(-2);
973
974 DEBUG_SYNC(thd, "begin_wait_for_gtid_set");
975
976 set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
977
978 mysql_mutex_lock(&data_lock);
979 thd->ENTER_COND(&data_cond, &data_lock,
980 &stage_waiting_for_the_slave_thread_to_advance_position,
981 &old_stage);
982 /*
983 This function will abort when it notices that some CHANGE MASTER or
984 RESET MASTER has changed the master info.
985 To catch this, these commands modify abort_pos_wait ; We just monitor
986 abort_pos_wait and see if it has changed.
987 Why do we have this mechanism instead of simply monitoring slave_running
988 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
989 the SQL thread be stopped?
990 This is becasue if someones does:
991 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
992 the change may happen very quickly and we may not notice that
993 slave_running briefly switches between 1/0/1.
994 */
995 init_abort_pos_wait= abort_pos_wait;
996
997 /* The "compare and wait" main loop */
998 while (!thd->killed &&
999 init_abort_pos_wait == abort_pos_wait &&
1000 slave_running)
1001 {
1002 DBUG_PRINT("info",
1003 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
1004 init_abort_pos_wait, abort_pos_wait));
1005
1006 //wait for master update, with optional timeout.
1007
1008 global_sid_lock->wrlock();
1009 const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1010 const Owned_gtids* owned_gtids= gtid_state->get_owned_gtids();
1011
1012 char *wait_gtid_set_buf;
1013 wait_gtid_set->to_string(&wait_gtid_set_buf);
1014 DBUG_PRINT("info", ("Waiting for '%s'. is_subset: %d and "
1015 "!is_intersection_nonempty: %d",
1016 wait_gtid_set_buf,
1017 wait_gtid_set->is_subset(executed_gtids),
1018 !owned_gtids->is_intersection_nonempty(wait_gtid_set)));
1019 my_free(wait_gtid_set_buf);
1020 executed_gtids->dbug_print("gtid_executed:");
1021 owned_gtids->dbug_print("owned_gtids:");
1022
1023 /*
1024 Since commit is performed after log to binary log, we must also
1025 check if any GTID of wait_gtid_set is not yet committed.
1026 */
1027 if (wait_gtid_set->is_subset(executed_gtids) &&
1028 !owned_gtids->is_intersection_nonempty(wait_gtid_set))
1029 {
1030 global_sid_lock->unlock();
1031 break;
1032 }
1033 global_sid_lock->unlock();
1034
1035 DBUG_PRINT("info",("Waiting for master update"));
1036
1037 /*
1038 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
1039 will wake us up.
1040 */
1041 thd_wait_begin(thd, THD_WAIT_BINLOG);
1042 if (timeout > 0)
1043 {
1044 /*
1045 Note that mysql_cond_timedwait checks for the timeout
1046 before for the condition ; i.e. it returns ETIMEDOUT
1047 if the system time equals or exceeds the time specified by abstime
1048 before the condition variable is signaled or broadcast, _or_ if
1049 the absolute time specified by abstime has already passed at the time
1050 of the call.
1051 For that reason, mysql_cond_timedwait will do the "timeoutting" job
1052 even if its condition is always immediately signaled (case of a loaded
1053 master).
1054 */
1055 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
1056 }
1057 else
1058 mysql_cond_wait(&data_cond, &data_lock);
1059 thd_wait_end(thd);
1060 DBUG_PRINT("info",("Got signal of master update or timed out"));
1061 if (error == ETIMEDOUT || error == ETIME)
1062 {
1063 #ifndef NDEBUG
1064 /*
1065 Doing this to generate a stack trace and make debugging
1066 easier.
1067 */
1068 if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
1069 assert(0);
1070 #endif
1071 error= -1;
1072 break;
1073 }
1074 error=0;
1075 event_count++;
1076 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
1077 }
1078
1079 mysql_mutex_unlock(&data_lock);
1080 thd->EXIT_COND(&old_stage);
1081 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
1082 improper_arguments: %d timed_out: %d",
1083 thd->killed_errno(),
1084 (int) (init_abort_pos_wait != abort_pos_wait),
1085 (int) slave_running,
1086 (int) (error == -2),
1087 (int) (error == -1)));
1088 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
1089 !slave_running)
1090 {
1091 error= -2;
1092 }
1093 DBUG_RETURN( error ? error : event_count );
1094 }
1095
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock)1096 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
1097 bool need_data_lock)
1098 {
1099 int error= 0;
1100 DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
1101
1102 if (need_data_lock)
1103 mysql_mutex_lock(&data_lock);
1104 else
1105 mysql_mutex_assert_owner(&data_lock);
1106
1107 inc_event_relay_log_pos();
1108 group_relay_log_pos= event_relay_log_pos;
1109 strmake(group_relay_log_name,event_relay_log_name,
1110 sizeof(group_relay_log_name)-1);
1111
1112 notify_group_relay_log_name_update();
1113
1114 /*
1115 In 4.x we used the event's len to compute the positions here. This is
1116 wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1117 then the event's len is not what is was in the master's binlog, so this
1118 will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1119 replication: Exec_master_log_pos is wrong). Only way to solve this is to
1120 have the original offset of the end of the event the relay log. This is
1121 what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1122 of log_pos in 4.0 was to compute the end_log_pos; so better to store
1123 end_log_pos instead of begin_log_pos.
1124 If we had not done this fix here, the problem would also have appeared
1125 when the slave and master are 5.0 but with different event length (for
1126 example the slave is more recent than the master and features the event
1127 UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1128 SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1129 value which would lead to badly broken replication.
1130 Even the relay_log_pos will be corrupted in this case, because the len is
1131 the relay log is not "val".
1132 With the end_log_pos solution, we avoid computations involving lengthes.
1133 */
1134 DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
1135 (long) log_pos, (long) group_master_log_pos));
1136
1137 if (log_pos > 0) // 3.23 binlogs don't have log_posx
1138 group_master_log_pos= log_pos;
1139 /*
1140 If the master log position was invalidiated by say, "CHANGE MASTER TO
1141 RELAY_LOG_POS=N", it is now valid,
1142 */
1143 if (is_group_master_log_pos_invalid)
1144 is_group_master_log_pos_invalid= false;
1145
1146 /*
1147 In MTS mode FD or Rotate event commit their solitary group to
1148 Coordinator's info table. Callers make sure that Workers have been
1149 executed all assignements.
1150 Broadcast to master_pos_wait() waiters should be done after
1151 the table is updated.
1152 */
1153 assert(!is_parallel_exec() ||
1154 mts_group_status != Relay_log_info::MTS_IN_GROUP);
1155 /*
1156 We do not force synchronization at this point, note the
1157 parameter false, because a non-transactional change is
1158 being committed.
1159
1160 For that reason, the synchronization here is subjected to
1161 the option sync_relay_log_info.
1162
1163 See sql/rpl_rli.h for further information on this behavior.
1164 */
1165 error= flush_info(FALSE);
1166
1167 mysql_cond_broadcast(&data_cond);
1168 if (need_data_lock)
1169 mysql_mutex_unlock(&data_lock);
1170 DBUG_RETURN(error);
1171 }
1172
1173
close_temporary_tables()1174 void Relay_log_info::close_temporary_tables()
1175 {
1176 TABLE *table,*next;
1177 int num_closed_temp_tables= 0;
1178 DBUG_ENTER("Relay_log_info::close_temporary_tables");
1179
1180 for (table=save_temporary_tables ; table ; table=next)
1181 {
1182 next=table->next;
1183 /*
1184 Don't ask for disk deletion. For now, anyway they will be deleted when
1185 slave restarts, but it is a better intention to not delete them.
1186 */
1187 DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1188 close_temporary(table, 1, 0);
1189 num_closed_temp_tables++;
1190 }
1191 save_temporary_tables= 0;
1192 slave_open_temp_tables.atomic_add(-num_closed_temp_tables);
1193 channel_open_temp_tables.atomic_add(-num_closed_temp_tables);
1194 DBUG_VOID_RETURN;
1195 }
1196
1197 /**
1198 Purges relay logs. It assumes to have a run lock on rli and that no
1199 slave thread are running.
1200
1201 @param[in] THD connection,
1202 @param[in] just_reset if false, it tells that logs should be purged
1203 and @c init_relay_log_pos() should be called,
1204 @errmsg[out] errmsg store pointer to an error message.
1205
1206 @retval 0 successfuly executed,
1207 @retval 1 otherwise error, where errmsg is set to point to the error message.
1208 */
1209
purge_relay_logs(THD * thd,bool just_reset,const char ** errmsg,bool delete_only)1210 int Relay_log_info::purge_relay_logs(THD *thd, bool just_reset,
1211 const char** errmsg, bool delete_only)
1212 {
1213 int error=0;
1214 const char *ln;
1215 /* name of the index file if opt_relaylog_index_name is set*/
1216 const char* log_index_name;
1217 /*
1218 Buffer to add channel name suffix when relay-log-index option is
1219 provided
1220 */
1221 char relay_bin_index_channel[FN_REFLEN];
1222
1223 const char *ln_without_channel_name;
1224 /*
1225 Buffer to add channel name suffix when relay-log option is provided.
1226 */
1227 char relay_bin_channel[FN_REFLEN];
1228
1229 char buffer[FN_REFLEN];
1230
1231 mysql_mutex_t *log_lock= relay_log.get_log_lock();
1232
1233 DBUG_ENTER("Relay_log_info::purge_relay_logs");
1234
1235 /*
1236 Even if inited==0, we still try to empty master_log_* variables. Indeed,
1237 inited==0 does not imply that they already are empty.
1238
1239 It could be that slave's info initialization partly succeeded: for example
1240 if relay-log.info existed but *relay-bin*.* have been manually removed,
1241 init_info reads the old relay-log.info and fills rli->master_log_*, then
1242 init_info checks for the existence of the relay log, this fails and
1243 init_info leaves inited to 0.
1244 In that pathological case, master_log_pos* will be properly reinited at
1245 the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1246 purge_relay_logs, will delete bogus *.info files or replace them with
1247 correct files), however if the user does SHOW SLAVE STATUS before START
1248 SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1249 master_log_* for SHOW SLAVE STATUS to display fine in any case.
1250 */
1251 group_master_log_name[0]= 0;
1252 group_master_log_pos= 0;
1253
1254 /*
1255 Following the the relay log purge, the master_log_pos will be in sync
1256 with relay_log_pos, so the flag should be cleared. Refer bug#11766010.
1257 */
1258
1259 is_group_master_log_pos_invalid= false;
1260
1261 if (!inited)
1262 {
1263 DBUG_PRINT("info", ("inited == 0"));
1264 if (error_on_rli_init_info ||
1265 /*
1266 mi->reset means that the channel was reset but still exists. Channel
1267 shall have the index and the first relay log file.
1268
1269 Those files shall be remove in a following RESET SLAVE ALL (even when
1270 channel was not inited again).
1271 */
1272 (mi->reset && delete_only))
1273 {
1274 ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
1275 "-relay-bin", buffer);
1276
1277 ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
1278 ln_without_channel_name);
1279 if (opt_relaylog_index_name)
1280 {
1281 char index_file_withoutext[FN_REFLEN];
1282 relay_log.generate_name(opt_relaylog_index_name,"",
1283 index_file_withoutext);
1284
1285 log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
1286 FN_REFLEN,
1287 index_file_withoutext);
1288 }
1289 else
1290 log_index_name= 0;
1291
1292 if (relay_log.open_index_file(log_index_name, ln, TRUE))
1293 {
1294 sql_print_error("Unable to purge relay log files. Failed to open relay "
1295 "log index file:%s.", relay_log.get_index_fname());
1296 DBUG_RETURN(1);
1297 }
1298 mysql_mutex_lock(&mi->data_lock);
1299 mysql_mutex_lock(log_lock);
1300 if (relay_log.open_binlog(ln, 0,
1301 (max_relay_log_size ? max_relay_log_size :
1302 max_binlog_size), true,
1303 true/*need_lock_index=true*/,
1304 true/*need_sid_lock=true*/,
1305 mi->get_mi_description_event()))
1306 {
1307 mysql_mutex_unlock(log_lock);
1308 mysql_mutex_unlock(&mi->data_lock);
1309 sql_print_error("Unable to purge relay log files. Failed to open relay "
1310 "log file:%s.", relay_log.get_log_fname());
1311 DBUG_RETURN(1);
1312 }
1313 mysql_mutex_unlock(log_lock);
1314 mysql_mutex_unlock(&mi->data_lock);
1315 }
1316 else
1317 DBUG_RETURN(0);
1318 }
1319 else
1320 {
1321 assert(slave_running == 0);
1322 assert(mi->slave_running == 0);
1323 }
1324 /* Reset the transaction boundary parser and clear the last GTID queued */
1325 mi->transaction_parser.reset();
1326 mi->clear_last_gtid_queued();
1327
1328 slave_skip_counter= 0;
1329 mysql_mutex_lock(&data_lock);
1330
1331 /*
1332 we close the relay log fd possibly left open by the slave SQL thread,
1333 to be able to delete it; the relay log fd possibly left open by the slave
1334 I/O thread will be closed naturally in reset_logs() by the
1335 close(LOG_CLOSE_TO_BE_OPENED) call
1336 */
1337 if (cur_log_fd >= 0)
1338 {
1339 end_io_cache(&cache_buf);
1340 my_close(cur_log_fd, MYF(MY_WME));
1341 cur_log_fd= -1;
1342 }
1343
1344 /**
1345 Clear the retrieved gtid set for this channel.
1346 global_sid_lock->wrlock() is needed.
1347 */
1348 global_sid_lock->wrlock();
1349 (const_cast<Gtid_set *>(get_gtid_set()))->clear();
1350 global_sid_lock->unlock();
1351
1352 if (relay_log.reset_logs(thd, delete_only))
1353 {
1354 *errmsg = "Failed during log reset";
1355 error=1;
1356 goto err;
1357 }
1358
1359 /* Save name of used relay log file */
1360 set_group_relay_log_name(relay_log.get_log_fname());
1361 set_event_relay_log_name(relay_log.get_log_fname());
1362 group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1363 if (!delete_only && count_relay_log_space())
1364 {
1365 *errmsg= "Error counting relay log space";
1366 error= 1;
1367 goto err;
1368 }
1369 if (!just_reset)
1370 error= init_relay_log_pos(group_relay_log_name,
1371 group_relay_log_pos,
1372 false/*need_data_lock=false*/, errmsg, 0);
1373 if (!inited && error_on_rli_init_info)
1374 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1375 true/*need_lock_log=true*/,
1376 true/*need_lock_index=true*/);
1377 err:
1378 #ifndef NDEBUG
1379 char buf[22];
1380 #endif
1381 DBUG_PRINT("info",("log_space_total: %s",llstr(log_space_total,buf)));
1382 mysql_mutex_unlock(&data_lock);
1383 DBUG_RETURN(error);
1384 }
1385
1386 /*
1387 When --relay-bin option is not provided, the names of the
1388 relay log files are host-relay-bin.0000x or
1389 host-relay-bin-CHANNEL.00000x in the case of MSR.
1390 However, if that option is provided, then the names of the
1391 relay log files are <relay-bin-option>.0000x or
1392 <relay-bin-option>-CHANNEL.00000x in the case of MSR.
1393
1394 The function adds a channel suffix (according to the channel to file name
1395 conventions and conversions) to the relay log file.
1396
1397 @todo: truncate the log file if length exceeds.
1398 */
1399
1400 const char*
add_channel_to_relay_log_name(char * buff,uint buff_size,const char * base_name)1401 Relay_log_info::add_channel_to_relay_log_name(char *buff, uint buff_size,
1402 const char *base_name)
1403 {
1404 char *ptr;
1405 char channel_to_file[FN_REFLEN];
1406 uint errors, length;
1407 uint base_name_len;
1408 uint suffix_buff_size;
1409
1410 assert(base_name !=NULL);
1411
1412 base_name_len= strlen(base_name);
1413 suffix_buff_size= buff_size - base_name_len;
1414
1415 ptr= strmake(buff, base_name, buff_size-1);
1416
1417 if (channel[0])
1418 {
1419
1420 /* adding a "-" */
1421 ptr= strmake(ptr, "-", suffix_buff_size-1);
1422
1423 /*
1424 Convert the channel name to the file names charset.
1425 Channel name is in system_charset which is UTF8_general_ci
1426 as it was defined as utf8 in the mysql.slaveinfo tables.
1427 */
1428 length= strconvert(system_charset_info, channel, &my_charset_filename,
1429 channel_to_file, NAME_LEN, &errors);
1430 ptr= strmake(ptr, channel_to_file, suffix_buff_size-length-1);
1431
1432 }
1433
1434 return (const char*)buff;
1435 }
1436
1437
1438 /**
1439 Checks if condition stated in UNTIL clause of START SLAVE is reached.
1440
1441 Specifically, it checks if UNTIL condition is reached. Uses caching result
1442 of last comparison of current log file name and target log file name. So
1443 cached value should be invalidated if current log file name changes (see
1444 @c Relay_log_info::notify_... functions).
1445
1446 This caching is needed to avoid of expensive string comparisons and
1447 @c strtol() conversions needed for log names comparison. We don't need to
1448 compare them each time this function is called, we only need to do this
1449 when current log name changes. If we have @c UNTIL_MASTER_POS condition we
1450 need to do this only after @c Rotate_log_event::do_apply_event() (which is
1451 rare, so caching gives real benifit), and if we have @c UNTIL_RELAY_POS
1452 condition then we should invalidate cached comarison value after
1453 @c inc_group_relay_log_pos() which called for each group of events (so we
1454 have some benefit if we have something like queries that use
1455 autoincrement or if we have transactions).
1456
1457 Should be called ONLY if @c until_condition @c != @c UNTIL_NONE !
1458
1459 @param master_beg_pos position of the beginning of to be executed event
1460 (not @c log_pos member of the event that points to
1461 the beginning of the following event)
1462
1463 @retval true condition met or error happened (condition seems to have
1464 bad log file name),
1465 @retval false condition not met.
1466 */
1467
is_until_satisfied(THD * thd,Log_event * ev)1468 bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
1469 {
1470 char error_msg[]= "Slave SQL thread is stopped because UNTIL "
1471 "condition is bad.";
1472 DBUG_ENTER("Relay_log_info::is_until_satisfied");
1473
1474 switch (until_condition)
1475 {
1476 case UNTIL_MASTER_POS:
1477 case UNTIL_RELAY_POS:
1478 {
1479 const char *log_name= NULL;
1480 ulonglong log_pos= 0;
1481
1482 if (until_condition == UNTIL_MASTER_POS)
1483 {
1484 if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
1485 DBUG_RETURN(false);
1486 /*
1487 Rotate events originating from the slave have server_id==0,
1488 and their log_pos is relative to the slave, so in case their
1489 log_pos is greater than the log_pos we are waiting for, they
1490 can cause the slave to stop prematurely. So we ignore such
1491 events.
1492 */
1493 if (ev && ev->server_id == 0)
1494 DBUG_RETURN(false);
1495 log_name= group_master_log_name;
1496 if (!ev || is_in_group() || !ev->common_header->log_pos)
1497 log_pos= group_master_log_pos;
1498 else
1499 log_pos= ev->common_header->log_pos - ev->common_header->data_written;
1500 }
1501 else
1502 { /* until_condition == UNTIL_RELAY_POS */
1503 log_name= group_relay_log_name;
1504 log_pos= group_relay_log_pos;
1505 }
1506
1507 #ifndef NDEBUG
1508 {
1509 char buf[32];
1510 DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1511 group_master_log_name, llstr(group_master_log_pos, buf)));
1512 DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1513 group_relay_log_name, llstr(group_relay_log_pos, buf)));
1514 DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1515 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1516 log_name, llstr(log_pos, buf)));
1517 DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1518 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1519 until_log_name, llstr(until_log_pos, buf)));
1520 }
1521 #endif
1522
1523 if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1524 {
1525 /*
1526 We have no cached comparison results so we should compare log names
1527 and cache result.
1528 If we are after RESET SLAVE, and the SQL slave thread has not processed
1529 any event yet, it could be that group_master_log_name is "". In that case,
1530 just wait for more events (as there is no sensible comparison to do).
1531 */
1532
1533 if (*log_name)
1534 {
1535 const char *basename= log_name + dirname_length(log_name);
1536
1537 const char *q= (const char*)(fn_ext(basename)+1);
1538 if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1539 {
1540 /* Now compare extensions. */
1541 char *q_end;
1542 ulong log_name_extension= strtoul(q, &q_end, 10);
1543 if (log_name_extension < until_log_name_extension)
1544 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1545 else
1546 until_log_names_cmp_result=
1547 (log_name_extension > until_log_name_extension) ?
1548 UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1549 }
1550 else
1551 {
1552 /* Base names do not match, so we abort */
1553 sql_print_error("%s", error_msg);
1554 DBUG_RETURN(true);
1555 }
1556 }
1557 else
1558 DBUG_RETURN(until_log_pos == 0);
1559 }
1560
1561 if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1562 log_pos >= until_log_pos) ||
1563 until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
1564 {
1565 char buf[22];
1566 sql_print_information("Slave SQL thread stopped because it reached its"
1567 " UNTIL position %s", llstr(until_pos(), buf));
1568 DBUG_RETURN(true);
1569 }
1570 DBUG_RETURN(false);
1571 }
1572
1573 case UNTIL_SQL_BEFORE_GTIDS:
1574 /*
1575 We only need to check once if executed_gtids set
1576 contains any of the until_sql_gtids.
1577 */
1578 if (until_sql_gtids_first_event)
1579 {
1580 until_sql_gtids_first_event= false;
1581 global_sid_lock->wrlock();
1582 /* Check if until GTIDs were already applied. */
1583 const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1584 if (until_sql_gtids.is_intersection_nonempty(executed_gtids))
1585 {
1586 char *buffer;
1587 until_sql_gtids.to_string(&buffer);
1588 global_sid_lock->unlock();
1589 sql_print_information("Slave SQL thread stopped because "
1590 "UNTIL SQL_BEFORE_GTIDS %s is already "
1591 "applied", buffer);
1592 my_free(buffer);
1593 DBUG_RETURN(true);
1594 }
1595 global_sid_lock->unlock();
1596 }
1597 if (ev != NULL && ev->get_type_code() == binary_log::GTID_LOG_EVENT)
1598 {
1599 Gtid_log_event *gev= (Gtid_log_event *)ev;
1600 global_sid_lock->rdlock();
1601 if (until_sql_gtids.contains_gtid(gev->get_sidno(false), gev->get_gno()))
1602 {
1603 char *buffer;
1604 until_sql_gtids.to_string(&buffer);
1605 global_sid_lock->unlock();
1606 sql_print_information("Slave SQL thread stopped because it reached "
1607 "UNTIL SQL_BEFORE_GTIDS %s", buffer);
1608 my_free(buffer);
1609 DBUG_RETURN(true);
1610 }
1611 global_sid_lock->unlock();
1612 }
1613 DBUG_RETURN(false);
1614 break;
1615
1616 case UNTIL_SQL_AFTER_GTIDS:
1617 {
1618 global_sid_lock->wrlock();
1619 const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1620 if (until_sql_gtids.is_subset(executed_gtids))
1621 {
1622 char *buffer;
1623 until_sql_gtids.to_string(&buffer);
1624 global_sid_lock->unlock();
1625 sql_print_information("Slave SQL thread stopped because it reached "
1626 "UNTIL SQL_AFTER_GTIDS %s", buffer);
1627 my_free(buffer);
1628 DBUG_RETURN(true);
1629 }
1630 global_sid_lock->unlock();
1631 DBUG_RETURN(false);
1632 }
1633 break;
1634
1635 case UNTIL_SQL_AFTER_MTS_GAPS:
1636 case UNTIL_DONE:
1637 /*
1638 TODO: this condition is actually post-execution or post-scheduling
1639 so the proper place to check it before SQL thread goes
1640 into next_event() where it can wait while the condition
1641 has been satisfied already.
1642 It's deployed here temporarily to be fixed along the regular UNTIL
1643 support for MTS is provided.
1644 */
1645 if (mts_recovery_group_cnt == 0)
1646 {
1647 sql_print_information("Slave SQL thread stopped according to "
1648 "UNTIL SQL_AFTER_MTS_GAPS as it has "
1649 "processed all gap transactions left from "
1650 "the previous slave session.");
1651 until_condition= UNTIL_DONE;
1652 DBUG_RETURN(true);
1653 }
1654 else
1655 {
1656 DBUG_RETURN(false);
1657 }
1658 break;
1659
1660 case UNTIL_SQL_VIEW_ID:
1661 if (ev != NULL && ev->get_type_code() == binary_log::VIEW_CHANGE_EVENT)
1662 {
1663 View_change_log_event *view_event= (View_change_log_event *)ev;
1664
1665 if (until_view_id.compare(view_event->get_view_id()) == 0)
1666 {
1667 set_group_replication_retrieved_certification_info(view_event);
1668 until_view_id_found= true;
1669 DBUG_RETURN(false);
1670 }
1671 }
1672
1673 if (until_view_id_found && ev != NULL && ev->ends_group())
1674 {
1675 until_view_id_commit_found= true;
1676 DBUG_RETURN(false);
1677 }
1678
1679 if (until_view_id_commit_found && ev == NULL)
1680 {
1681 DBUG_RETURN(true);
1682 }
1683
1684 DBUG_RETURN(false);
1685 break;
1686
1687 case UNTIL_NONE:
1688 assert(0);
1689 break;
1690 }
1691
1692 assert(0);
1693 DBUG_RETURN(false);
1694 }
1695
cached_charset_invalidate()1696 void Relay_log_info::cached_charset_invalidate()
1697 {
1698 DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1699
1700 /* Full of zeroes means uninitialized. */
1701 memset(cached_charset, 0, sizeof(cached_charset));
1702 DBUG_VOID_RETURN;
1703 }
1704
1705
cached_charset_compare(char * charset) const1706 bool Relay_log_info::cached_charset_compare(char *charset) const
1707 {
1708 DBUG_ENTER("Relay_log_info::cached_charset_compare");
1709
1710 if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1711 {
1712 memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1713 DBUG_RETURN(1);
1714 }
1715 DBUG_RETURN(0);
1716 }
1717
1718
stmt_done(my_off_t event_master_log_pos)1719 int Relay_log_info::stmt_done(my_off_t event_master_log_pos)
1720 {
1721 int error= 0;
1722
1723 clear_flag(IN_STMT);
1724
1725 assert(!belongs_to_client());
1726 /* Worker does not execute binlog update position logics */
1727 assert(!is_mts_worker(info_thd));
1728
1729 /*
1730 Replication keeps event and group positions to specify the
1731 set of events that were executed.
1732 Event positions are incremented after processing each event
1733 whereas group positions are incremented when an event or a
1734 set of events is processed such as in a transaction and are
1735 committed or rolled back.
1736
1737 A transaction can be ended with a Query Event, i.e. either
1738 commit or rollback, or by a Xid Log Event. Query Event is
1739 used to terminate pseudo-transactions that are executed
1740 against non-transactional engines such as MyIsam. Xid Log
1741 Event denotes though that a set of changes executed
1742 against a transactional engine is about to commit.
1743
1744 Events' positions are incremented at stmt_done(). However,
1745 transactions that are ended with Xid Log Event have their
1746 group position incremented in the do_apply_event() and in
1747 the do_apply_event_work().
1748
1749 Notice that the type of the engine, i.e. where data and
1750 positions are stored, against what events are being applied
1751 are not considered in this logic.
1752
1753 Regarding the code that follows, notice that the executed
1754 group coordinates don't change if the current event is internal
1755 to the group. The same applies to MTS Coordinator when it
1756 handles a Format Descriptor event that appears in the middle
1757 of a group that is about to be assigned.
1758 */
1759 if ((!is_parallel_exec() && is_in_group()) ||
1760 mts_group_status != MTS_NOT_IN_GROUP)
1761 {
1762 inc_event_relay_log_pos();
1763 }
1764 else
1765 {
1766 if (is_parallel_exec())
1767 {
1768
1769 assert(!is_mts_worker(info_thd));
1770
1771 /*
1772 Format Description events only can drive MTS execution to this
1773 point. It is a special event group that is handled with
1774 synchronization. For that reason, the checkpoint routine is
1775 called here.
1776 */
1777 error= mts_checkpoint_routine(this, 0, false,
1778 true/*need_data_lock=true*/);
1779 }
1780 if (!error)
1781 error= inc_group_relay_log_pos(event_master_log_pos,
1782 true/*need_data_lock=true*/);
1783 }
1784
1785 return error;
1786 }
1787
1788 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
cleanup_context(THD * thd,bool error)1789 void Relay_log_info::cleanup_context(THD *thd, bool error)
1790 {
1791 DBUG_ENTER("Relay_log_info::cleanup_context");
1792
1793 assert(info_thd == thd);
1794 /*
1795 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1796 may have opened tables, which we cannot be sure have been closed (because
1797 maybe the Rows_log_event have not been found or will not be, because slave
1798 SQL thread is stopping, or relay log has a missing tail etc). So we close
1799 all thread's tables. And so the table mappings have to be cancelled.
1800 2) Rows_log_event::do_apply_event() may even have started statements or
1801 transactions on them, which we need to rollback in case of error.
1802 3) If finding a Format_description_log_event after a BEGIN, we also need
1803 to rollback before continuing with the next events.
1804 4) so we need this "context cleanup" function.
1805 */
1806 if (error)
1807 {
1808 trans_rollback_stmt(thd); // if a "statement transaction"
1809 trans_rollback(thd); // if a "real transaction"
1810 }
1811 if (rows_query_ev)
1812 {
1813 /*
1814 In order to avoid invalid memory access, THD::reset_query() should be
1815 called before deleting the rows_query event.
1816 */
1817 info_thd->reset_query();
1818 info_thd->reset_query_for_display();
1819 delete rows_query_ev;
1820 rows_query_ev= NULL;
1821 DBUG_EXECUTE_IF("after_deleting_the_rows_query_ev",
1822 {
1823 const char action[]="now SIGNAL deleted_rows_query_ev WAIT_FOR go_ahead";
1824 assert(!debug_sync_set_action(info_thd,
1825 STRING_WITH_LEN(action)));
1826 };);
1827 }
1828 m_table_map.clear_tables();
1829 slave_close_thread_tables(thd);
1830 if (error)
1831 {
1832 /*
1833 trans_rollback above does not rollback XA transactions.
1834 It could be done only after necessarily closing tables which dictates
1835 the following placement.
1836 */
1837 XID_STATE *xid_state= thd->get_transaction()->xid_state();
1838 if (!xid_state->has_state(XID_STATE::XA_NOTR))
1839 {
1840 assert(DBUG_EVALUATE_IF("simulate_commit_failure",1,
1841 xid_state->has_state(XID_STATE::XA_ACTIVE) ||
1842 xid_state->has_state(XID_STATE::XA_IDLE)
1843 ));
1844
1845 xa_trans_force_rollback(thd);
1846 xid_state->reset();
1847 cleanup_trans_state(thd);
1848 thd->rpl_unflag_detached_engine_ha_data();
1849 }
1850 thd->mdl_context.release_transactional_locks();
1851 }
1852 clear_flag(IN_STMT);
1853 /*
1854 Cleanup for the flags that have been set at do_apply_event.
1855 */
1856 thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1857 thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1858
1859 /*
1860 Reset state related to long_find_row notes in the error log:
1861 - timestamp
1862 - flag that decides whether the slave prints or not
1863 */
1864 reset_row_stmt_start_timestamp();
1865 unset_long_find_row_note_printed();
1866
1867 /*
1868 If the slave applier changed the current transaction isolation level,
1869 it need to be restored to the session default value once having the
1870 current transaction cleared.
1871
1872 We should call "trans_reset_one_shot_chistics()" only if the "error"
1873 flag is "true", because "cleanup_context()" is called at the end of each
1874 set of Table_maps/Rows representing a statement (when the rows event
1875 is tagged with the STMT_END_F) with the "error" flag as "false".
1876
1877 So, without the "if (error)" below, the isolation level might be reset
1878 in the middle of a pure row based transaction.
1879 */
1880 if (error)
1881 trans_reset_one_shot_chistics(thd);
1882
1883 DBUG_VOID_RETURN;
1884 }
1885
clear_tables_to_lock()1886 void Relay_log_info::clear_tables_to_lock()
1887 {
1888 DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
1889 #ifndef NDEBUG
1890 /**
1891 When replicating in RBR and MyISAM Merge tables are involved
1892 open_and_lock_tables (called in do_apply_event) appends the
1893 base tables to the list of tables_to_lock. Then these are
1894 removed from the list in close_thread_tables (which is called
1895 before we reach this point).
1896
1897 This assertion just confirms that we get no surprises at this
1898 point.
1899 */
1900 uint i=0;
1901 for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
1902 assert(i == tables_to_lock_count);
1903 #endif
1904
1905 while (tables_to_lock)
1906 {
1907 uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
1908 if (tables_to_lock->m_tabledef_valid)
1909 {
1910 tables_to_lock->m_tabledef.table_def::~table_def();
1911 tables_to_lock->m_tabledef_valid= FALSE;
1912 }
1913
1914 /*
1915 If blob fields were used during conversion of field values
1916 from the master table into the slave table, then we need to
1917 free the memory used temporarily to store their values before
1918 copying into the slave's table.
1919 */
1920 if (tables_to_lock->m_conv_table)
1921 free_blobs(tables_to_lock->m_conv_table);
1922
1923 tables_to_lock=
1924 static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1925 tables_to_lock_count--;
1926 my_free(to_free);
1927 }
1928 assert(tables_to_lock == NULL && tables_to_lock_count == 0);
1929 DBUG_VOID_RETURN;
1930 }
1931
slave_close_thread_tables(THD * thd)1932 void Relay_log_info::slave_close_thread_tables(THD *thd)
1933 {
1934 thd->get_stmt_da()->set_overwrite_status(true);
1935 DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
1936 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1937 thd->get_stmt_da()->set_overwrite_status(false);
1938
1939 close_thread_tables(thd);
1940 /*
1941 - If transaction rollback was requested due to deadlock
1942 perform it and release metadata locks.
1943 - If inside a multi-statement transaction,
1944 defer the release of metadata locks until the current
1945 transaction is either committed or rolled back. This prevents
1946 other statements from modifying the table for the entire
1947 duration of this transaction. This provides commit ordering
1948 and guarantees serializability across multiple transactions.
1949 - If in autocommit mode, or outside a transactional context,
1950 automatically release metadata locks of the current statement.
1951 */
1952 if (thd->transaction_rollback_request)
1953 {
1954 trans_rollback_implicit(thd);
1955 thd->mdl_context.release_transactional_locks();
1956 }
1957 else if (! thd->in_multi_stmt_transaction_mode())
1958 thd->mdl_context.release_transactional_locks();
1959 else
1960 thd->mdl_context.release_statement_locks();
1961
1962 clear_tables_to_lock();
1963 DBUG_VOID_RETURN;
1964 }
1965
1966
1967 /**
1968 Execute a SHOW RELAYLOG EVENTS statement.
1969
1970 When multiple replication channels exist on this slave
1971 and no channel name is specified through FOR CHANNEL clause
1972 this function errors out and exits.
1973
1974 @param thd Pointer to THD object for the client thread executing the
1975 statement.
1976
1977 @retval FALSE success
1978 @retval TRUE failure
1979 */
mysql_show_relaylog_events(THD * thd)1980 bool mysql_show_relaylog_events(THD* thd)
1981 {
1982
1983 Master_info *mi =0;
1984 List<Item> field_list;
1985 bool res;
1986 DBUG_ENTER("mysql_show_relaylog_events");
1987
1988 assert(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1989
1990 channel_map.wrlock();
1991
1992 if (!thd->lex->mi.for_channel && channel_map.get_num_instances() > 1)
1993 {
1994 my_error(ER_SLAVE_MULTIPLE_CHANNELS_CMD, MYF(0));
1995 res= true;
1996 goto err;
1997 }
1998
1999 Log_event::init_show_field_list(&field_list);
2000 if (thd->send_result_metadata(&field_list,
2001 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2002 {
2003 res= true;
2004 goto err;
2005 }
2006
2007 mi= channel_map.get_mi(thd->lex->mi.channel);
2008
2009 if (!mi && strcmp(thd->lex->mi.channel, channel_map.get_default_channel()))
2010 {
2011 my_error(ER_SLAVE_CHANNEL_DOES_NOT_EXIST, MYF(0), thd->lex->mi.channel);
2012 res= true;
2013 goto err;
2014 }
2015
2016 if (mi == NULL)
2017 {
2018 my_error(ER_SLAVE_CONFIGURATION, MYF(0));
2019 res= true;
2020 goto err;
2021 }
2022
2023 res= show_binlog_events(thd, &mi->rli->relay_log);
2024
2025 err:
2026 channel_map.unlock();
2027
2028 DBUG_RETURN(res);
2029 }
2030 #endif
2031
2032
rli_init_info()2033 int Relay_log_info::rli_init_info()
2034 {
2035 int error= 0;
2036 enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
2037 const char *msg= NULL;
2038 /* Store the GTID of a transaction spanned in multiple relay log files */
2039 Gtid gtid_partial_trx= {0, 0};
2040
2041 DBUG_ENTER("Relay_log_info::rli_init_info");
2042
2043 mysql_mutex_assert_owner(&data_lock);
2044
2045 /*
2046 If Relay_log_info is issued again after a failed init_info(), for
2047 instance because of missing relay log files, it will generate new
2048 files and ignore the previous failure, to avoid that we set
2049 error_on_rli_init_info as true.
2050 This a consequence of the behaviour change, in the past server was
2051 stopped when there were replication initialization errors, now it is
2052 not and so init_info() must be aware of previous failures.
2053 */
2054 if (error_on_rli_init_info)
2055 goto err;
2056
2057 if (inited)
2058 {
2059 /*
2060 We have to reset read position of relay-log-bin as we may have
2061 already been reading from 'hotlog' when the slave was stopped
2062 last time. If this case pos_in_file would be set and we would
2063 get a crash when trying to read the signature for the binary
2064 relay log.
2065
2066 We only rewind the read position if we are starting the SQL
2067 thread. The handle_slave_sql thread assumes that the read
2068 position is at the beginning of the file, and will read the
2069 "signature" and then fast-forward to the last position read.
2070 */
2071 bool hot_log= FALSE;
2072 /*
2073 my_b_seek does an implicit flush_io_cache, so we need to:
2074
2075 1. check if this log is active (hot)
2076 2. if it is we keep log_lock until the seek ends, otherwise
2077 release it right away.
2078
2079 If we did not take log_lock, SQL thread might race with IO
2080 thread for the IO_CACHE mutex.
2081
2082 */
2083 mysql_mutex_t *log_lock= relay_log.get_log_lock();
2084 mysql_mutex_lock(log_lock);
2085 hot_log= relay_log.is_active(linfo.log_file_name);
2086
2087 if (!hot_log)
2088 mysql_mutex_unlock(log_lock);
2089
2090 my_b_seek(cur_log, (my_off_t) 0);
2091
2092 if (hot_log)
2093 mysql_mutex_unlock(log_lock);
2094 DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(this) : 0);
2095 }
2096
2097 cur_log_fd = -1;
2098 slave_skip_counter= 0;
2099 abort_pos_wait= 0;
2100 log_space_limit= relay_log_space_limit;
2101 log_space_total= 0;
2102 tables_to_lock= 0;
2103 tables_to_lock_count= 0;
2104
2105 char pattern[FN_REFLEN];
2106 (void) my_realpath(pattern, slave_load_tmpdir, 0);
2107 /*
2108 @TODO:
2109 In MSR, sometimes slave fail with the following error:
2110 Unable to use slave's temporary directory /tmp -
2111 Can't create/write to file '/tmp/SQL_LOAD-92d1eee0-9de4-11e3-8874-68730ad50fcb' (Errcode: 17 - File exists), Error_code: 1
2112
2113 */
2114 if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
2115 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
2116 {
2117 sql_print_error("Unable to use slave's temporary directory '%s'.",
2118 slave_load_tmpdir);
2119 DBUG_RETURN(1);
2120 }
2121 unpack_filename(slave_patternload_file, pattern);
2122 slave_patternload_file_size= strlen(slave_patternload_file);
2123
2124 /*
2125 The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
2126 Note that the I/O thread flushes it to disk after writing every
2127 event, in flush_info within the master info.
2128 */
2129 /*
2130 For the maximum log size, we choose max_relay_log_size if it is
2131 non-zero, max_binlog_size otherwise. If later the user does SET
2132 GLOBAL on one of these variables, fix_max_binlog_size and
2133 fix_max_relay_log_size will reconsider the choice (for example
2134 if the user changes max_relay_log_size to zero, we have to
2135 switch to using max_binlog_size for the relay log) and update
2136 relay_log.max_size (and mysql_bin_log.max_size).
2137 */
2138 {
2139 /* Reports an error and returns, if the --relay-log's path
2140 is a directory.*/
2141 if (opt_relay_logname &&
2142 opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
2143 {
2144 sql_print_error("Path '%s' is a directory name, please specify \
2145 a file name for --relay-log option.", opt_relay_logname);
2146 DBUG_RETURN(1);
2147 }
2148
2149 /* Reports an error and returns, if the --relay-log-index's path
2150 is a directory.*/
2151 if (opt_relaylog_index_name &&
2152 opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
2153 == FN_LIBCHAR)
2154 {
2155 sql_print_error("Path '%s' is a directory name, please specify \
2156 a file name for --relay-log-index option.", opt_relaylog_index_name);
2157 DBUG_RETURN(1);
2158 }
2159
2160 char buf[FN_REFLEN];
2161 /* The base name of the relay log file considering multisource rep */
2162 const char *ln;
2163 /*
2164 relay log name without channel prefix taking into account
2165 --relay-log option.
2166 */
2167 const char *ln_without_channel_name;
2168 static bool name_warning_sent= 0;
2169
2170 /*
2171 Buffer to add channel name suffix when relay-log option is provided.
2172 */
2173 char relay_bin_channel[FN_REFLEN];
2174 /*
2175 Buffer to add channel name suffix when relay-log-index option is provided
2176 */
2177 char relay_bin_index_channel[FN_REFLEN];
2178
2179 /* name of the index file if opt_relaylog_index_name is set*/
2180 const char* log_index_name;
2181
2182
2183 ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
2184 "-relay-bin", buf);
2185
2186 ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
2187 ln_without_channel_name);
2188
2189 /* We send the warning only at startup, not after every RESET SLAVE */
2190 if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
2191 {
2192 /*
2193 User didn't give us info to name the relay log index file.
2194 Picking `hostname`-relay-bin.index like we do, causes replication to
2195 fail if this slave's hostname is changed later. So, we would like to
2196 instead require a name. But as we don't want to break many existing
2197 setups, we only give warning, not error.
2198 */
2199 sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
2200 " so replication "
2201 "may break when this MySQL server acts as a "
2202 "slave and has his hostname changed!! Please "
2203 "use '--relay-log=%s' to avoid this problem.",
2204 ln_without_channel_name);
2205 name_warning_sent= 1;
2206 }
2207
2208 relay_log.is_relay_log= TRUE;
2209
2210 /*
2211 If relay log index option is set, convert into channel specific
2212 index file. If the opt_relaylog_index has an extension, we strip
2213 it too. This is inconsistent to relay log names.
2214 */
2215 if (opt_relaylog_index_name)
2216 {
2217 char index_file_withoutext[FN_REFLEN];
2218 relay_log.generate_name(opt_relaylog_index_name,"",
2219 index_file_withoutext);
2220
2221 log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
2222 FN_REFLEN,
2223 index_file_withoutext);
2224 }
2225 else
2226 log_index_name= 0;
2227
2228
2229
2230 if (relay_log.open_index_file(log_index_name, ln, TRUE))
2231 {
2232 sql_print_error("Failed in open_index_file() called from Relay_log_info::rli_init_info().");
2233 DBUG_RETURN(1);
2234 }
2235 #ifndef NDEBUG
2236 global_sid_lock->wrlock();
2237 gtid_set.dbug_print("set of GTIDs in relay log before initialization");
2238 global_sid_lock->unlock();
2239 #endif
2240 /*
2241 In the init_gtid_set below we pass the mi->transaction_parser.
2242 This will be useful to ensure that we only add a GTID to
2243 the Retrieved_Gtid_Set for fully retrieved transactions. Also, it will
2244 be useful to ensure the Retrieved_Gtid_Set behavior when auto
2245 positioning is disabled (we could have transactions spanning multiple
2246 relay log files in this case).
2247 We will skip this initialization if relay_log_recovery is set in order
2248 to save time, as neither the GTIDs nor the transaction_parser state
2249 would be useful when the relay log will be cleaned up later when calling
2250 init_recovery.
2251 */
2252 if (!is_relay_log_recovery &&
2253 !gtid_retrieved_initialized &&
2254 relay_log.init_gtid_sets(>id_set, NULL,
2255 opt_slave_sql_verify_checksum,
2256 true/*true=need lock*/,
2257 &mi->transaction_parser, >id_partial_trx))
2258 {
2259 sql_print_error("Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
2260 DBUG_RETURN(1);
2261 }
2262 gtid_retrieved_initialized= true;
2263 #ifndef NDEBUG
2264 global_sid_lock->wrlock();
2265 gtid_set.dbug_print("set of GTIDs in relay log after initialization");
2266 global_sid_lock->unlock();
2267 #endif
2268 if (!gtid_partial_trx.is_empty())
2269 {
2270 /*
2271 The init_gtid_set has found an incomplete transaction in the relay log.
2272 We add this transaction's GTID to the last_gtid_queued so the IO thread
2273 knows which GTID to add to the Retrieved_Gtid_Set when reaching the end
2274 of the incomplete transaction.
2275 */
2276 mi->set_last_gtid_queued(gtid_partial_trx);
2277 }
2278 else
2279 {
2280 mi->clear_last_gtid_queued();
2281 }
2282 /*
2283 Configures what object is used by the current log to store processed
2284 gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
2285 corretly compute the set of previous gtids.
2286 */
2287 relay_log.set_previous_gtid_set_relaylog(>id_set);
2288 /*
2289 note, that if open() fails, we'll still have index file open
2290 but a destructor will take care of that
2291 */
2292
2293 mysql_mutex_t *log_lock= relay_log.get_log_lock();
2294 mysql_mutex_lock(log_lock);
2295
2296 if (relay_log.open_binlog(ln, 0,
2297 (max_relay_log_size ? max_relay_log_size :
2298 max_binlog_size), true,
2299 true/*need_lock_index=true*/,
2300 true/*need_sid_lock=true*/,
2301 mi->get_mi_description_event()))
2302 {
2303 mysql_mutex_unlock(log_lock);
2304 sql_print_error("Failed in open_log() called from Relay_log_info::rli_init_info().");
2305 DBUG_RETURN(1);
2306 }
2307
2308 mysql_mutex_unlock(log_lock);
2309
2310 }
2311
2312 /*
2313 This checks if the repository was created before and thus there
2314 will be values to be read. Please, do not move this call after
2315 the handler->init_info().
2316 */
2317 if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
2318 {
2319 msg= "Error checking relay log repository";
2320 error= 1;
2321 goto err;
2322 }
2323
2324 if (handler->init_info())
2325 {
2326 msg= "Error reading relay log configuration";
2327 error= 1;
2328 goto err;
2329 }
2330
2331 if (check_return == REPOSITORY_DOES_NOT_EXIST)
2332 {
2333 /* Init relay log with first entry in the relay index file */
2334 if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
2335 false/*need_data_lock=false (lock should be held
2336 prior to invoking this function)*/,
2337 &msg, 0))
2338 {
2339 error= 1;
2340 goto err;
2341 }
2342 group_master_log_name[0]= 0;
2343 group_master_log_pos= 0;
2344 }
2345 else
2346 {
2347 if (read_info(handler))
2348 {
2349 msg= "Error reading relay log configuration";
2350 error= 1;
2351 goto err;
2352 }
2353
2354 if (is_relay_log_recovery && init_recovery(mi, &msg))
2355 {
2356 error= 1;
2357 goto err;
2358 }
2359
2360 if (init_relay_log_pos(group_relay_log_name,
2361 group_relay_log_pos,
2362 false/*need_data_lock=false (lock should be held
2363 prior to invoking this function)*/,
2364 &msg, 0))
2365 {
2366 char llbuf[22];
2367 sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s).",
2368 group_relay_log_name,
2369 llstr(group_relay_log_pos, llbuf));
2370 error= 1;
2371 goto err;
2372 }
2373
2374 #ifndef NDEBUG
2375 {
2376 char llbuf1[22], llbuf2[22];
2377 DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
2378 llstr(my_b_tell(cur_log),llbuf1),
2379 llstr(event_relay_log_pos,llbuf2)));
2380 assert(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2381 assert((my_b_tell(cur_log) == event_relay_log_pos));
2382 }
2383 #endif
2384 }
2385
2386 inited= 1;
2387 error_on_rli_init_info= false;
2388 if (flush_info(TRUE))
2389 {
2390 msg= "Error reading relay log configuration";
2391 error= 1;
2392 goto err;
2393 }
2394
2395 if (count_relay_log_space())
2396 {
2397 msg= "Error counting relay log space";
2398 error= 1;
2399 goto err;
2400 }
2401
2402 /*
2403 In case of MTS the recovery is deferred until the end of
2404 load_mi_and_rli_from_repositories.
2405 */
2406 if (!mi->rli->mts_recovery_group_cnt)
2407 is_relay_log_recovery= FALSE;
2408 DBUG_RETURN(error);
2409
2410 err:
2411 handler->end_info();
2412 inited= 0;
2413 error_on_rli_init_info= true;
2414 if (msg)
2415 sql_print_error("%s.", msg);
2416 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2417 true/*need_lock_log=true*/,
2418 true/*need_lock_index=true*/);
2419 DBUG_RETURN(error);
2420 }
2421
end_info()2422 void Relay_log_info::end_info()
2423 {
2424 DBUG_ENTER("Relay_log_info::end_info");
2425
2426 error_on_rli_init_info= false;
2427 if (!inited)
2428 DBUG_VOID_RETURN;
2429
2430 handler->end_info();
2431
2432 if (cur_log_fd >= 0)
2433 {
2434 end_io_cache(&cache_buf);
2435 (void)my_close(cur_log_fd, MYF(MY_WME));
2436 cur_log_fd= -1;
2437 }
2438 inited = 0;
2439 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2440 true/*need_lock_log=true*/,
2441 true/*need_lock_index=true*/);
2442 relay_log.harvest_bytes_written(this, true/*need_log_space_lock=true*/);
2443 /*
2444 Delete the slave's temporary tables from memory.
2445 In the future there will be other actions than this, to ensure persistance
2446 of slave's temp tables after shutdown.
2447 */
2448 close_temporary_tables();
2449
2450 DBUG_VOID_RETURN;
2451 }
2452
flush_current_log()2453 int Relay_log_info::flush_current_log()
2454 {
2455 DBUG_ENTER("Relay_log_info::flush_current_log");
2456 /*
2457 When we come to this place in code, relay log may or not be initialized;
2458 the caller is responsible for setting 'flush_relay_log_cache' accordingly.
2459 */
2460 IO_CACHE *log_file= relay_log.get_log_file();
2461 if (flush_io_cache(log_file))
2462 DBUG_RETURN(2);
2463
2464 DBUG_RETURN(0);
2465 }
2466
set_master_info(Master_info * info)2467 void Relay_log_info::set_master_info(Master_info* info)
2468 {
2469 mi= info;
2470 }
2471
/**
  Stores the file and position where the execute-slave thread is in the
  relay log:

  - As this is only called by the slave thread or on STOP SLAVE, with the
    log_lock grabbed and the slave thread stopped, we don't need to have
    a lock here.
  - If there is an active transaction, then we don't update the position
    in the relay log. This is to ensure that we re-execute statements
    if we die in the middle of a transaction that was rolled back.
  - As a transaction never spans binary logs, we don't have to handle the
    case where we do a relay-log-rotation in the middle of the transaction.
    If this were not the case, we would have to ensure that we
    don't delete the relay log file where the transaction started when
    we switch to a new relay log file.

  @retval 0 ok,
  @retval 1 write error, otherwise.
*/
2491
2492 /**
2493 Store the file and position where the slave's SQL thread are in the
2494 relay log.
2495
2496 Notes:
2497
2498 - This function should be called either from the slave SQL thread,
2499 or when the slave thread is not running. (It reads the
2500 group_{relay|master}_log_{pos|name} and delay fields in the rli
2501 object. These may only be modified by the slave SQL thread or by
2502 a client thread when the slave SQL thread is not running.)
2503
2504 - If there is an active transaction, then we do not update the
2505 position in the relay log. This is to ensure that we re-execute
2506 statements if we die in the middle of an transaction that was
2507 rolled back.
2508
2509 - As a transaction never spans binary logs, we don't have to handle
2510 the case where we do a relay-log-rotation in the middle of the
2511 transaction. If transactions could span several binlogs, we would
2512 have to ensure that we do not delete the relay log file where the
2513 transaction started before switching to a new relay log file.
2514
2515 - Error can happen if writing to file fails or if flushing the file
2516 fails.
2517
2518 @param rli The object representing the Relay_log_info.
2519
2520 @todo Change the log file information to a binary format to avoid
2521 calling longlong2str.
2522
2523 @return 0 on success, 1 on error.
2524 */
flush_info(const bool force)2525 int Relay_log_info::flush_info(const bool force)
2526 {
2527 DBUG_ENTER("Relay_log_info::flush_info");
2528
2529 if (!inited)
2530 DBUG_RETURN(0);
2531
2532 /*
2533 We update the sync_period at this point because only here we
2534 now that we are handling a relay log info. This needs to be
2535 update every time we call flush because the option maybe
2536 dinamically set.
2537 */
2538 mysql_mutex_lock(&mts_temp_table_LOCK);
2539 handler->set_sync_period(sync_relayloginfo_period);
2540
2541 if (write_info(handler))
2542 goto err;
2543
2544 if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
2545 goto err;
2546
2547 force_flush_postponed_due_to_split_trans= false;
2548 mysql_mutex_unlock(&mts_temp_table_LOCK);
2549 DBUG_RETURN(0);
2550
2551 err:
2552 sql_print_error("Error writing relay log configuration.");
2553 mysql_mutex_unlock(&mts_temp_table_LOCK);
2554 DBUG_RETURN(1);
2555 }
2556
get_number_info_rli_fields()2557 size_t Relay_log_info::get_number_info_rli_fields()
2558 {
2559 return sizeof(info_rli_fields)/sizeof(info_rli_fields[0]);
2560 }
2561
read_info(Rpl_info_handler * from)2562 bool Relay_log_info::read_info(Rpl_info_handler *from)
2563 {
2564 int lines= 0;
2565 char *first_non_digit= NULL;
2566 ulong temp_group_relay_log_pos= 0;
2567 ulong temp_group_master_log_pos= 0;
2568 int temp_sql_delay= 0;
2569 int temp_internal_id= internal_id;
2570
2571 DBUG_ENTER("Relay_log_info::read_info");
2572
2573 /*
2574 Should not read RLI from file in client threads. Client threads
2575 only use RLI to execute BINLOG statements.
2576
2577 @todo Uncomment the following assertion. Currently,
2578 Relay_log_info::init() is called from init_master_info() before
2579 the THD object Relay_log_info::sql_thd is created. That means we
2580 cannot call belongs_to_client() since belongs_to_client()
2581 dereferences Relay_log_info::sql_thd. So we need to refactor
2582 slightly: the THD object should be created by Relay_log_info
2583 constructor (or passed to it), so that we are guaranteed that it
2584 exists at this point. /Sven
2585 */
2586 //assert(!belongs_to_client());
2587
2588 /*
2589 Starting from 5.1.x, relay-log.info has a new format. Now, its
2590 first line contains the number of lines in the file. By reading
2591 this number we can determine which version our master.info comes
2592 from. We can't simply count the lines in the file, since
2593 versions before 5.1.x could generate files with more lines than
2594 needed. If first line doesn't contain a number, or if it
2595 contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2596 then the file is treated like a file from pre-5.1.x version.
2597 There is no ambiguity when reading an old master.info: before
2598 5.1.x, the first line contained the binlog's name, which is
2599 either empty or has an extension (contains a '.'), so can't be
2600 confused with an integer.
2601
2602 So we're just reading first line and trying to figure which
2603 version is this.
2604 */
2605
2606 /*
2607 The first row is temporarily stored in mi->master_log_name, if
2608 it is line count and not binlog name (new format) it will be
2609 overwritten by the second row later.
2610 */
2611 if (from->prepare_info_for_read() ||
2612 from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2613 (char *) ""))
2614 DBUG_RETURN(TRUE);
2615
2616 lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2617
2618 if (group_relay_log_name[0]!='\0' &&
2619 *first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2620 {
2621 /* Seems to be new format => read group relay log name */
2622 if (from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2623 (char *) ""))
2624 DBUG_RETURN(TRUE);
2625 }
2626 else
2627 DBUG_PRINT("info", ("relay_log_info file is in old format."));
2628
2629 if (from->get_info(&temp_group_relay_log_pos,
2630 (ulong) BIN_LOG_HEADER_SIZE) ||
2631 from->get_info(group_master_log_name,
2632 sizeof(group_relay_log_name),
2633 (char *) "") ||
2634 from->get_info(&temp_group_master_log_pos,
2635 0UL))
2636 DBUG_RETURN(TRUE);
2637
2638 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2639 {
2640 if (from->get_info(&temp_sql_delay, 0))
2641 DBUG_RETURN(TRUE);
2642 }
2643
2644 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2645 {
2646 if (from->get_info(&recovery_parallel_workers, 0UL))
2647 DBUG_RETURN(TRUE);
2648 }
2649
2650 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2651 {
2652 if (from->get_info(&temp_internal_id, 1))
2653 DBUG_RETURN(TRUE);
2654 }
2655
2656 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL)
2657 {
2658 /* the default value is empty string"" */
2659 if (from->get_info(channel, sizeof(channel), (char*)""))
2660 DBUG_RETURN(TRUE);
2661 }
2662
2663 group_relay_log_pos= temp_group_relay_log_pos;
2664 group_master_log_pos= temp_group_master_log_pos;
2665 sql_delay= (int32) temp_sql_delay;
2666 internal_id= (uint) temp_internal_id;
2667
2668 assert(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2669 (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
2670 DBUG_RETURN(FALSE);
2671 }
2672
set_info_search_keys(Rpl_info_handler * to)2673 bool Relay_log_info::set_info_search_keys(Rpl_info_handler *to)
2674 {
2675 DBUG_ENTER("Relay_log_info::set_info_search_keys");
2676
2677 if (to->set_info(LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL, channel))
2678 DBUG_RETURN(TRUE);
2679
2680 DBUG_RETURN(FALSE);
2681 }
2682
2683
write_info(Rpl_info_handler * to)2684 bool Relay_log_info::write_info(Rpl_info_handler *to)
2685 {
2686 DBUG_ENTER("Relay_log_info::write_info");
2687
2688 /*
2689 @todo Uncomment the following assertion. See todo in
2690 Relay_log_info::read_info() for details. /Sven
2691 */
2692 //assert(!belongs_to_client());
2693
2694 if (to->prepare_info_for_write() ||
2695 to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2696 to->set_info(group_relay_log_name) ||
2697 to->set_info((ulong) group_relay_log_pos) ||
2698 to->set_info(group_master_log_name) ||
2699 to->set_info((ulong) group_master_log_pos) ||
2700 to->set_info((int) sql_delay) ||
2701 to->set_info(recovery_parallel_workers) ||
2702 to->set_info((int) internal_id) ||
2703 to->set_info(channel))
2704 DBUG_RETURN(TRUE);
2705
2706 DBUG_RETURN(FALSE);
2707 }
2708
2709 /**
2710 The method is run by SQL thread/MTS Coordinator.
2711 It replaces the current FD event with a new one.
2712 A version adaptation routine is invoked for the new FD
2713 to align the slave applier execution context with the master version.
2714
2715 Since FD are shared by Coordinator and Workers in the MTS mode,
2716 deletion of the old FD is done through decrementing its usage counter.
2717 The destructor runs when the later drops to zero,
2718 also see @c Slave_worker::set_rli_description_event().
2719 The usage counter of the new FD is incremented.
2720
2721 Although notice that MTS worker runs it, inefficiently (see assert),
2722 once at its destruction time.
2723
2724 @param a pointer to be installed into execution context
2725 FormatDescriptor event
2726 */
2727
set_rli_description_event(Format_description_log_event * fe)2728 void Relay_log_info::set_rli_description_event(Format_description_log_event *fe)
2729 {
2730 DBUG_ENTER("Relay_log_info::set_rli_description_event");
2731 assert(!info_thd || !is_mts_worker(info_thd) || !fe);
2732
2733 if (fe)
2734 {
2735 ulong fe_version= adapt_to_master_version(fe);
2736
2737 if (info_thd)
2738 {
2739 // See rpl_rli_pdb.h:Slave_worker::set_rli_description_event.
2740 if (!is_in_group() &&
2741 (info_thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
2742 info_thd->variables.gtid_next.type == UNDEFINED_GROUP))
2743 {
2744 DBUG_PRINT("info", ("Setting gtid_next.type to NOT_YET_DETERMINED_GROUP"));
2745 info_thd->variables.gtid_next.set_not_yet_determined();
2746 }
2747
2748 if (is_parallel_exec() && fe_version > 0)
2749 {
2750 /*
2751 Prepare for workers' adaption to a new FD version. Workers
2752 will see notification through scheduling of a first event of
2753 a new post-new-FD.
2754 */
2755 for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
2756 (*it)->fd_change_notified= false;
2757 }
2758 }
2759 }
2760 if (rli_description_event &&
2761 rli_description_event->usage_counter.atomic_add(-1) == 1)
2762 delete rli_description_event;
2763 #ifndef NDEBUG
2764 else
2765 /* It must be MTS mode when the usage counter greater than 1. */
2766 assert(!rli_description_event || is_parallel_exec());
2767 #endif
2768 rli_description_event= fe;
2769 if (rli_description_event)
2770 rli_description_event->usage_counter.atomic_add(1);
2771
2772 DBUG_VOID_RETURN;
2773 }
2774
2775 struct st_feature_version
2776 {
2777 /*
2778 The enum must be in the version non-descending top-down order,
2779 the last item formally corresponds to highest possible server
2780 version (never reached, thereby no adapting actions here);
2781 enumeration starts from zero.
2782 */
2783 enum
2784 {
2785 WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
2786 _END_OF_LIST // always last
2787 } item;
2788 /*
2789 Version where the feature is introduced.
2790 */
2791 uchar version_split[3];
2792 /*
2793 Action to perform when according to FormatDescriptor event Master
2794 is found to be feature-aware while previously it has *not* been.
2795 */
2796 void (*upgrade) (THD*);
2797 /*
2798 Action to perform when according to FormatDescriptor event Master
2799 is found to be feature-*un*aware while previously it has been.
2800 */
2801 void (*downgrade) (THD*);
2802 };
2803
wl6292_upgrade_func(THD * thd)2804 void wl6292_upgrade_func(THD *thd)
2805 {
2806 thd->variables.explicit_defaults_for_timestamp= false;
2807 if (global_system_variables.explicit_defaults_for_timestamp)
2808 thd->variables.explicit_defaults_for_timestamp= true;
2809
2810 return;
2811 }
2812
wl6292_downgrade_func(THD * thd)2813 void wl6292_downgrade_func(THD *thd)
2814 {
2815 if (global_system_variables.explicit_defaults_for_timestamp)
2816 thd->variables.explicit_defaults_for_timestamp= false;
2817
2818 return;
2819 }
2820
2821 /**
2822 Sensitive to Master-vs-Slave version difference features
2823 should be listed in the version non-descending order.
2824 */
2825 static st_feature_version s_features[]=
2826 {
2827 // order is the same as in the enum
2828 { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
2829 {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
2830 { st_feature_version::_END_OF_LIST,
2831 {255, 255, 255}, NULL, NULL }
2832 };
2833
2834 /**
2835 The method computes the incoming "master"'s FD server version and that
2836 of the currently installed (if ever) rli_description_event, to
2837 invoke more specific method to compare the two and adapt slave applier execution
2838 context to the new incoming master's version.
2839
2840 This method is specifically for STS applier/MTS Coordinator as well as
2841 for a user thread applying binlog events.
2842
  @param fdle  a pointer to the new Format Description event that is about
               to be installed as the new execution context; when it is
               NULL the slave's own version is used instead.
2845 @return 0 when the versions are equal,
2846 master_version otherwise
2847 */
adapt_to_master_version(Format_description_log_event * fdle)2848 ulong Relay_log_info::adapt_to_master_version(Format_description_log_event *fdle)
2849 {
2850 ulong master_version, current_version, slave_version;
2851
2852 slave_version= version_product(slave_version_split);
2853 /* When rli_description_event is uninitialized yet take the slave's version */
2854 master_version= !fdle ? slave_version : fdle->get_product_version();
2855 current_version= !rli_description_event ? slave_version :
2856 rli_description_event->get_product_version();
2857
2858 return adapt_to_master_version_updown(master_version, current_version);
2859 }
2860
2861 /**
2862 The method compares two supplied versions and carries out down- or
2863 up- grade customization of execution context of the slave applier
2864 (thd).
2865
2866 The method is invoked in the STS case through
2867 Relay_log_info::adapt_to_master_version() right before a new master
2868 FD is installed into the applier execution context; in the MTS
2869 case it's done by the Worker when it's assigned with a first event
2870 after the latest new FD has been installed.
2871
2872 Comparison of the current (old, existing) and the master (new,
2873 incoming) versions yields adaptive actions.
2874 To explain that, let's denote V_0 as the current, and the master's
2875 one as V_1.
  In the downgrade case (V_1 < V_0) every server feature that is undefined
  in V_1 but is defined starting from some V_f of the [V_1 + 1, V_0] range
  (+1 meaning that V_1 is excluded) is invalidated ("removed" from the
  execution context) by running its so-called downgrade action.
2880 Conversely in the upgrade case a feature defined in [V_0 + 1, V_1] range
2881 is validated ("added" to execution context) by running its upgrade action.
2882 A typical use case showing how adaptive actions are necessary for the slave
2883 applier is when the master version is lesser than the slave's one.
2884 In such case events generated on the "older" master may need to be applied
2885 in their native server context. And such context can be provided by downgrade
2886 actions.
2887 Conversely, when the old master events are run out and a newer master's events
2888 show up for applying, the execution context will be upgraded through
2889 the namesake actions.
2890
2891 Notice that a relay log may have two FD events, one the slave local
2892 and the other from the Master. As there's no concern for the FD
2893 originator this leads to two adapt_to_master_version() calls.
2894 It's not harmful as can be seen from the following example.
2895 Say the currently installed FD's version is
2896 V_m, then at relay-log rotation the following transition takes
2897 place:
2898
2899 V_m -adapt-> V_s -adapt-> V_m.
2900
2901 here and further `m' subscript stands for the master, `s' for the slave.
2902 It's clear that in this case an ineffective V_m -> V_m transition occurs.
2903
2904 At composing downgrade/upgrade actions keep in mind that the slave applier
2905 version transition goes the following route:
2906 The initial version is that of the slave server (V_ss).
2907 It changes to a magic 4.0 at the slave relay log initialization.
2908 In the following course versions are extracted from each FD read out,
2909 regardless of what server generated it. Here is a typical version
2910 transition sequence underscored with annotation:
2911
2912 V_ss -> 4.0 -> V(FD_s^1) -> V(FD_m^2) ---> V(FD_s^3) -> V(FD_m^4) ...
2913
2914 ---------- ----------------- -------- ------------------ ---
2915 bootstrap 1st relay log rotation 2nd log etc
2916
  The upper (^) superscript enumerates Format Description events, V(FD^i)
  stands for a function extracting the version data from the i:th FD.
2919
2920 There won't be any action to execute when info_thd is undefined,
2921 e.g at bootstrap.
2922
2923 @param master_version an upcoming new version
2924 @param current_version the current version
2925 @return 0 when the new version is equal to the current one,
2926 master_version otherwise
2927 */
adapt_to_master_version_updown(ulong master_version,ulong current_version)2928 ulong Relay_log_info::adapt_to_master_version_updown(ulong master_version,
2929 ulong current_version)
2930 {
2931 THD *thd= info_thd;
2932 /*
2933 When the SQL thread or MTS Coordinator executes this method
2934 there's a constraint on current_version argument.
2935 */
2936 assert(!thd ||
2937 thd->rli_fake != NULL ||
2938 thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER ||
2939 (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
2940 (!rli_description_event ||
2941 current_version ==
2942 rli_description_event->get_product_version())));
2943
2944 if (master_version == current_version)
2945 return 0;
2946 else if (!thd)
2947 return master_version;
2948
2949 bool downgrade= master_version < current_version;
2950 /*
2951 find item starting from and ending at for which adaptive actions run
2952 for downgrade or upgrade branches.
2953 (todo: convert into bsearch when number of features will grow significantly)
2954 */
2955 long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
2956
2957 for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
2958 {
2959 ulong ver_f= version_product(s_features[i].version_split);
2960
2961 if ((downgrade ? master_version : current_version) < ver_f &&
2962 i_first == st_feature_version::_END_OF_LIST)
2963 i_first= i;
2964 if ((downgrade ? current_version : master_version) < ver_f)
2965 {
2966 i_last= i;
2967 assert(i_last >= i_first);
2968 break;
2969 }
2970 }
2971
2972 /*
2973 actions, executed in version non-descending st_feature_version order
2974 */
2975 for (i= i_first; i < i_last; i++)
2976 {
2977 /* Run time check of the st_feature_version items ordering */
2978 assert(!i ||
2979 version_product(s_features[i - 1].version_split) <=
2980 version_product(s_features[i].version_split));
2981
2982 assert((downgrade ? master_version : current_version) <
2983 version_product(s_features[i].version_split) &&
2984 (downgrade ? current_version : master_version >=
2985 version_product(s_features[i].version_split)));
2986
2987 if (downgrade && s_features[i].downgrade)
2988 {
2989 s_features[i].downgrade(thd);
2990 }
2991 else if (s_features[i].upgrade)
2992 {
2993 s_features[i].upgrade(thd);
2994 }
2995 }
2996
2997 return master_version;
2998 }
2999
relay_log_number_to_name(uint number,char name[FN_REFLEN+1])3000 void Relay_log_info::relay_log_number_to_name(uint number,
3001 char name[FN_REFLEN+1])
3002 {
3003 char *str= NULL;
3004 char relay_bin_channel[FN_REFLEN+1];
3005 const char *relay_log_basename_channel=
3006 add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN+1,
3007 relay_log_basename);
3008
3009 /* str points to closing null of relay log basename channel */
3010 str= strmake(name, relay_log_basename_channel, FN_REFLEN+1);
3011 *str++= '.';
3012 sprintf(str, "%06u", number);
3013 }
3014
relay_log_name_to_number(const char * name)3015 uint Relay_log_info::relay_log_name_to_number(const char *name)
3016 {
3017 return static_cast<uint>(atoi(fn_ext(name)+1));
3018 }
3019
is_mts_db_partitioned(Relay_log_info * rli)3020 bool is_mts_db_partitioned(Relay_log_info * rli)
3021 {
3022 return (rli->current_mts_submode->get_type() ==
3023 MTS_PARALLEL_TYPE_DB_NAME);
3024 }
3025
get_for_channel_str(bool upper_case) const3026 const char* Relay_log_info::get_for_channel_str(bool upper_case) const
3027 {
3028 if (rli_fake)
3029 return "";
3030 else
3031 return mi->get_for_channel_str(upper_case);
3032 }
3033
add_gtid_set(const Gtid_set * gtid_set)3034 enum_return_status Relay_log_info::add_gtid_set(const Gtid_set *gtid_set)
3035 {
3036 DBUG_ENTER("Relay_log_info::add_gtid_set(gtid_set)");
3037
3038 enum_return_status return_status= this->gtid_set.add_gtid_set(gtid_set);
3039
3040 DBUG_RETURN(return_status);
3041 }
3042
detach_engine_ha_data(THD * thd)3043 void Relay_log_info::detach_engine_ha_data(THD *thd)
3044 {
3045 is_engine_ha_data_detached= true;
3046 /*
3047 In case of slave thread applier or processing binlog by client,
3048 detach the engine ha_data ("native" engine transaction)
3049 in favor of dynamically created.
3050 */
3051 plugin_foreach(thd, detach_native_trx,
3052 MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3053 }
3054
reattach_engine_ha_data(THD * thd)3055 void Relay_log_info::reattach_engine_ha_data(THD *thd)
3056 {
3057 is_engine_ha_data_detached = false;
3058 /*
3059 In case of slave thread applier or processing binlog by client,
3060 reattach the engine ha_data ("native" engine transaction)
3061 in favor of dynamically created.
3062 */
3063 plugin_foreach(thd, reattach_native_trx, MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3064 }
3065