1 /* Copyright (c) 2006, 2019, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "sql_priv.h"
24 #include "unireg.h" // HAVE_*
25 #include "rpl_mi.h"
26 #include "rpl_rli.h"
27 #include "sql_base.h" // close_thread_tables
28 #include <my_dir.h> // For MY_STAT
29 #include "log_event.h" // Format_description_log_event, Log_event,
30 // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
31 // PREFIX_SQL_LOAD
32 #include "rpl_slave.h"
33 #include "rpl_utility.h"
34 #include "transaction.h"
35 #include "sql_parse.h" // end_trans, ROLLBACK
36 #include "rpl_slave.h"
37 #include "rpl_rli_pdb.h"
38 #include "rpl_info_factory.h"
39 #include <mysql/plugin.h>
40 #include <mysql/service_thd_wait.h>
41
42 using std::min;
43 using std::max;
44
/*
  Names of the relay log info fields, in order.

  Whenever a field is added to the relay log info this list must be
  extended as well. For now the table is only used to obtain the
  number of fields.
*/
const char *info_rli_fields[] =
{
  "number_of_lines",
  "group_relay_log_name",
  "group_relay_log_pos",
  "group_master_log_name",
  "group_master_log_pos",
  "sql_delay",
  "number_of_workers",
  "id"
};
61
Relay_log_info(bool is_slave_recovery,PSI_mutex_key * param_key_info_run_lock,PSI_mutex_key * param_key_info_data_lock,PSI_mutex_key * param_key_info_sleep_lock,PSI_mutex_key * param_key_info_data_cond,PSI_mutex_key * param_key_info_start_cond,PSI_mutex_key * param_key_info_stop_cond,PSI_mutex_key * param_key_info_sleep_cond,uint param_id)62 Relay_log_info::Relay_log_info(bool is_slave_recovery
63 #ifdef HAVE_PSI_INTERFACE
64 ,PSI_mutex_key *param_key_info_run_lock,
65 PSI_mutex_key *param_key_info_data_lock,
66 PSI_mutex_key *param_key_info_sleep_lock,
67 PSI_mutex_key *param_key_info_data_cond,
68 PSI_mutex_key *param_key_info_start_cond,
69 PSI_mutex_key *param_key_info_stop_cond,
70 PSI_mutex_key *param_key_info_sleep_cond
71 #endif
72 , uint param_id
73 )
74 :Rpl_info("SQL"
75 #ifdef HAVE_PSI_INTERFACE
76 ,param_key_info_run_lock, param_key_info_data_lock,
77 param_key_info_sleep_lock,
78 param_key_info_data_cond, param_key_info_start_cond,
79 param_key_info_stop_cond, param_key_info_sleep_cond
80 #endif
81 , param_id
82 ),
83 replicate_same_server_id(::replicate_same_server_id),
84 cur_log_fd(-1), relay_log(&sync_relaylog_period),
85 is_relay_log_recovery(is_slave_recovery),
86 save_temporary_tables(0),
87 cur_log_old_open_count(0), error_on_rli_init_info(false),
88 group_relay_log_pos(0), event_relay_log_pos(0),
89 group_master_log_pos(0),
90 gtid_set(global_sid_map, global_sid_lock),
91 log_space_total(0), ignore_log_space_limit(0),
92 sql_force_rotate_relay(false),
93 last_master_timestamp(0), slave_skip_counter(0),
94 abort_pos_wait(0), until_condition(UNTIL_NONE),
95 until_log_pos(0),
96 until_sql_gtids(global_sid_map),
97 until_sql_gtids_first_event(true),
98 retried_trans(0),
99 tables_to_lock(0), tables_to_lock_count(0),
100 rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
101 slave_parallel_workers(0),
102 exit_counter(0),
103 max_updated_index(0),
104 recovery_parallel_workers(0), checkpoint_seqno(0),
105 checkpoint_group(opt_mts_checkpoint_group),
106 recovery_groups_inited(false), mts_recovery_group_cnt(0),
107 mts_recovery_index(0), mts_recovery_group_seen_begin(0),
108 mts_group_status(MTS_NOT_IN_GROUP), reported_unsafe_warning(false),
109 rli_description_event(NULL),
110 sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
111 long_find_row_note_printed(false)
112 {
113 DBUG_ENTER("Relay_log_info::Relay_log_info");
114
115 #ifdef HAVE_PSI_INTERFACE
116 relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
117 key_RELAYLOG_LOCK_commit,
118 key_RELAYLOG_LOCK_commit_queue,
119 key_RELAYLOG_LOCK_done,
120 key_RELAYLOG_LOCK_flush_queue,
121 key_RELAYLOG_LOCK_log,
122 key_RELAYLOG_LOCK_sync,
123 key_RELAYLOG_LOCK_sync_queue,
124 key_RELAYLOG_LOCK_xids,
125 key_RELAYLOG_COND_done,
126 key_RELAYLOG_update_cond,
127 key_RELAYLOG_prep_xids_cond,
128 key_file_relaylog,
129 key_file_relaylog_index);
130 #endif
131
132 group_relay_log_name[0]= event_relay_log_name[0]=
133 group_master_log_name[0]= 0;
134 until_log_name[0]= ign_master_log_name_end[0]= 0;
135 set_timespec_nsec(last_clock, 0);
136 memset(&cache_buf, 0, sizeof(cache_buf));
137 cached_charset_invalidate();
138
139 mysql_mutex_init(key_relay_log_info_log_space_lock,
140 &log_space_lock, MY_MUTEX_INIT_FAST);
141 mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
142 mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
143 MY_MUTEX_INIT_FAST);
144 mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
145 mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
146 MY_MUTEX_INIT_FAST);
147 my_atomic_rwlock_init(&slave_open_temp_tables_lock);
148
149 relay_log.init_pthread_objects();
150 do_server_version_split(::server_version, slave_version_split);
151 last_retrieved_gtid.clear();
152 force_flush_postponed_due_to_split_trans= false;
153 DBUG_VOID_RETURN;
154 }
155
156 /**
157 The method to invoke at slave threads start
158 */
init_workers(ulong n_workers)159 void Relay_log_info::init_workers(ulong n_workers)
160 {
161 /*
162 Parallel slave parameters initialization is done regardless
163 whether the feature is or going to be active or not.
164 */
165 mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
166 mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
167 mts_last_online_stat= 0;
168 my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
169 }
170
171 /**
172 The method to invoke at slave threads stop
173 */
deinit_workers()174 void Relay_log_info::deinit_workers()
175 {
176 delete_dynamic(&workers);
177 }
178
~Relay_log_info()179 Relay_log_info::~Relay_log_info()
180 {
181 DBUG_ENTER("Relay_log_info::~Relay_log_info");
182
183 if (recovery_groups_inited)
184 bitmap_free(&recovery_groups);
185 mysql_mutex_destroy(&log_space_lock);
186 mysql_cond_destroy(&log_space_cond);
187 mysql_mutex_destroy(&pending_jobs_lock);
188 mysql_cond_destroy(&pending_jobs_cond);
189 mysql_mutex_destroy(&exit_count_lock);
190 my_atomic_rwlock_destroy(&slave_open_temp_tables_lock);
191 relay_log.cleanup();
192 set_rli_description_event(NULL);
193 last_retrieved_gtid.clear();
194
195 DBUG_VOID_RETURN;
196 }
197
198 /**
199 Method is called when MTS coordinator senses the relay-log name
200 has been changed.
201 It marks each Worker member with this fact to make an action
202 at time it will distribute a terminal event of a group to the Worker.
203
204 Worker receives the new name at the group commiting phase
205 @c Slave_worker::slave_worker_ends_group().
206 */
reset_notified_relay_log_change()207 void Relay_log_info::reset_notified_relay_log_change()
208 {
209 if (!is_parallel_exec())
210 return;
211 for (uint i= 0; i < workers.elements; i++)
212 {
213 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
214 w->relay_log_change_notified= FALSE;
215 }
216 }
217
218 /**
219 This method is called in mts_checkpoint_routine() to mark that each
220 worker is required to adapt to a new checkpoint data whose coordinates
221 are passed to it through GAQ index.
222
223 Worker notices the new checkpoint value at the group commit to reset
224 the current bitmap and starts using the clean bitmap indexed from zero
225 of being reset checkpoint_seqno.
226
227 New seconds_behind_master timestamp is installed.
228
229 @param shift number of bits to shift by Worker due to the
230 current checkpoint change.
231 @param new_ts new seconds_behind_master timestamp value
232 unless zero. Zero could be due to FD event
233 or fake rotate event.
234 @param need_data_lock False if caller has locked @c data_lock
235 */
reset_notified_checkpoint(ulong shift,time_t new_ts,bool need_data_lock)236 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
237 bool need_data_lock)
238 {
239 /*
240 If this is not a parallel execution we return immediately.
241 */
242 if (!is_parallel_exec())
243 return;
244
245 for (uint i= 0; i < workers.elements; i++)
246 {
247 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
248 /*
249 Reseting the notification information in order to force workers to
250 assign jobs with the new updated information.
251 Notice that the bitmap_shifted is accumulated to indicate how many
252 consecutive jobs were successfully processed.
253
254 The worker when assigning a new job will set the value back to
255 zero.
256 */
257 w->checkpoint_notified= FALSE;
258 w->bitmap_shifted= w->bitmap_shifted + shift;
259 /*
260 Zero shift indicates the caller rotates the master binlog.
261 The new name will be passed to W through the group descriptor
262 during the first post-rotation time scheduling.
263 */
264 if (shift == 0)
265 w->master_log_change_notified= false;
266
267 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
268 "worker->bitmap_shifted --> %lu, worker --> %u.",
269 shift, w->bitmap_shifted, i));
270 }
271 /*
272 There should not be a call where (shift == 0 && checkpoint_seqno != 0).
273
274 Then the new checkpoint sequence is updated by subtracting the number
275 of consecutive jobs that were successfully processed.
276 */
277 DBUG_ASSERT(!(shift == 0 && checkpoint_seqno != 0));
278 checkpoint_seqno= checkpoint_seqno - shift;
279 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
280 "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
281
282 if (new_ts)
283 {
284 if (need_data_lock)
285 mysql_mutex_lock(&data_lock);
286 else
287 mysql_mutex_assert_owner(&data_lock);
288 last_master_timestamp= new_ts;
289 if (need_data_lock)
290 mysql_mutex_unlock(&data_lock);
291 }
292 }
293
294 /**
295 Reset recovery info from Worker info table and
296 mark MTS recovery is completed.
297
298 @return false on success true when @c reset_notified_checkpoint failed.
299 */
mts_finalize_recovery()300 bool Relay_log_info::mts_finalize_recovery()
301 {
302 bool ret= false;
303 uint i;
304 uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
305
306 DBUG_ENTER("Relay_log_info::mts_finalize_recovery");
307
308 for (i= 0; !ret && i < workers.elements; i++)
309 {
310 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
311 ret= w->reset_recovery_info();
312 DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret= true;);
313 }
314 /*
315 The loop is traversed in the worker index descending order due
316 to specifics of the Worker table repository that does not like
317 even temporary holes. Therefore stale records are deleted
318 from the tail.
319 */
320 DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
321 {DBUG_SET("+d,mts_worker_thread_init_fails");});
322
323 for (i= recovery_parallel_workers; i > workers.elements && !ret; i--)
324 {
325 Slave_worker *w=
326 Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
327 /*
328 If an error occurs during the above create_worker call, the newly created
329 worker object gets deleted within the above function call itself and only
330 NULL is returned. Hence the following check has been added to verify
331 that a valid worker object exists.
332 */
333 if (w)
334 {
335 ret= w->remove_info();
336 delete w;
337 }
338 else
339 {
340 ret= true;
341 goto err;
342 }
343 }
344 recovery_parallel_workers= slave_parallel_workers;
345
346 err:
347 DBUG_RETURN(ret);
348 }
349
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)350 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
351 {
352 MY_STAT s;
353 DBUG_ENTER("add_relay_log");
354 mysql_mutex_assert_owner(&rli->log_space_lock);
355 if (!mysql_file_stat(key_file_relaylog,
356 linfo->log_file_name, &s, MYF(0)))
357 {
358 sql_print_error("log %s listed in the index, but failed to stat.",
359 linfo->log_file_name);
360 DBUG_RETURN(1);
361 }
362 rli->log_space_total += s.st_size;
363 #ifndef DBUG_OFF
364 char buf[22];
365 DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
366 #endif
367 DBUG_RETURN(0);
368 }
369
count_relay_log_space()370 int Relay_log_info::count_relay_log_space()
371 {
372 LOG_INFO flinfo;
373 DBUG_ENTER("Relay_log_info::count_relay_log_space");
374 mysql_mutex_lock(&log_space_lock);
375 log_space_total= 0;
376 if (relay_log.find_log_pos(&flinfo, NullS, 1))
377 {
378 sql_print_error("Could not find first log while counting relay log space.");
379 mysql_mutex_unlock(&log_space_lock);
380 DBUG_RETURN(1);
381 }
382 do
383 {
384 if (add_relay_log(this, &flinfo))
385 {
386 mysql_mutex_unlock(&log_space_lock);
387 DBUG_RETURN(1);
388 }
389 } while (!relay_log.find_next_log(&flinfo, 1));
390 /*
391 As we have counted everything, including what may have written in a
392 preceding write, we must reset bytes_written, or we may count some space
393 twice.
394 */
395 relay_log.reset_bytes_written();
396 mysql_mutex_unlock(&log_space_lock);
397 DBUG_RETURN(0);
398 }
399
400 /**
401 Resets UNTIL condition for Relay_log_info
402 */
403
clear_until_condition()404 void Relay_log_info::clear_until_condition()
405 {
406 DBUG_ENTER("clear_until_condition");
407
408 until_condition= Relay_log_info::UNTIL_NONE;
409 until_log_name[0]= 0;
410 until_log_pos= 0;
411 until_sql_gtids.clear();
412 until_sql_gtids_first_event= true;
413 DBUG_VOID_RETURN;
414 }
415
416 /**
417 Opens and intialize the given relay log. Specifically, it does what follows:
418
419 - Closes old open relay log files.
420 - If we are using the same relay log as the running IO-thread, then sets.
421 rli->cur_log to point to the same IO_CACHE entry.
422 - If not, opens the 'log' binary file.
423
424 @todo check proper initialization of
425 group_master_log_name/group_master_log_pos. /alfranio
426
427 @param rli[in] Relay information (will be initialized)
428 @param log[in] Name of relay log file to read from. NULL = First log
429 @param pos[in] Position in relay log file
430 @param need_data_lock[in] If true, this function will acquire the
431 relay_log.data_lock(); otherwise the caller should already have
432 acquired it.
433 @param errmsg[out] On error, this function will store a pointer to
434 an error message here
435 @param keep_looking_for_fd[in] If true, this function will
436 look for a Format_description_log_event. We only need this when the
437 SQL thread starts and opens an existing relay log and has to execute
438 it (possibly from an offset >4); then we need to read the first
439 event of the relay log to be able to parse the events we have to
440 execute.
441
442 @retval 0 ok,
443 @retval 1 error. In this case, *errmsg is set to point to the error
444 message.
445 */
446
init_relay_log_pos(const char * log,ulonglong pos,bool need_data_lock,const char ** errmsg,bool keep_looking_for_fd)447 int Relay_log_info::init_relay_log_pos(const char* log,
448 ulonglong pos, bool need_data_lock,
449 const char** errmsg,
450 bool keep_looking_for_fd)
451 {
452 DBUG_ENTER("Relay_log_info::init_relay_log_pos");
453 DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
454
455 *errmsg=0;
456 const char* errmsg_fmt= 0;
457 static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
458 mysql_mutex_t *log_lock= relay_log.get_log_lock();
459
460 if (need_data_lock)
461 mysql_mutex_lock(&data_lock);
462 else
463 mysql_mutex_assert_owner(&data_lock);
464
465 /*
466 By default the relay log is in binlog format 3 (4.0).
467 Even if format is 4, this will work enough to read the first event
468 (Format_desc) (remember that format 4 is just lenghtened compared to format
469 3; format 3 is a prefix of format 4).
470 */
471 set_rli_description_event(new Format_description_log_event(3));
472
473 mysql_mutex_lock(log_lock);
474
475 /* Close log file and free buffers if it's already open */
476 if (cur_log_fd >= 0)
477 {
478 end_io_cache(&cache_buf);
479 mysql_file_close(cur_log_fd, MYF(MY_WME));
480 cur_log_fd = -1;
481 }
482
483 group_relay_log_pos= event_relay_log_pos= pos;
484
485 /*
486 Test to see if the previous run was with the skip of purging
487 If yes, we do not purge when we restart
488 */
489 if (relay_log.find_log_pos(&linfo, NullS, 1))
490 {
491 *errmsg="Could not find first log during relay log initialization";
492 goto err;
493 }
494
495 if (log && relay_log.find_log_pos(&linfo, log, 1))
496 {
497 errmsg_fmt= "Could not find target log file mentioned in "
498 "relay log info in the index file '%s' during "
499 "relay log initialization";
500 sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
501 *errmsg= errmsg_buff;
502 goto err;
503 }
504
505 strmake(group_relay_log_name, linfo.log_file_name,
506 sizeof(group_relay_log_name) - 1);
507 strmake(event_relay_log_name, linfo.log_file_name,
508 sizeof(event_relay_log_name) - 1);
509
510 if (relay_log.is_active(linfo.log_file_name))
511 {
512 /*
513 The IO thread is using this log file.
514 In this case, we will use the same IO_CACHE pointer to
515 read data as the IO thread is using to write data.
516 */
517 my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
518 if (check_binlog_magic(cur_log, errmsg))
519 goto err;
520 cur_log_old_open_count=relay_log.get_open_count();
521 }
522 else
523 {
524 /*
525 Open the relay log and set cur_log to point at this one
526 */
527 if ((cur_log_fd=open_binlog_file(&cache_buf,
528 linfo.log_file_name,errmsg)) < 0)
529 goto err;
530 cur_log = &cache_buf;
531 }
532 /*
533 In all cases, check_binlog_magic() has been called so we're at offset 4 for
534 sure.
535 */
536 if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
537 {
538 Log_event* ev;
539 while (keep_looking_for_fd)
540 {
541 /*
542 Read the possible Format_description_log_event; if position
543 was 4, no need, it will be read naturally.
544 */
545 DBUG_PRINT("info",("looking for a Format_description_log_event"));
546
547 if (my_b_tell(cur_log) >= pos)
548 break;
549
550 /*
551 Because of we have data_lock and log_lock, we can safely read an
552 event
553 */
554 if (!(ev= Log_event::read_log_event(cur_log, 0,
555 rli_description_event,
556 opt_slave_sql_verify_checksum)))
557 {
558 DBUG_PRINT("info",("could not read event, cur_log->error=%d",
559 cur_log->error));
560 if (cur_log->error) /* not EOF */
561 {
562 *errmsg= "I/O error reading event at position 4";
563 goto err;
564 }
565 break;
566 }
567 else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
568 {
569 DBUG_PRINT("info",("found Format_description_log_event"));
570 set_rli_description_event((Format_description_log_event *)ev);
571 /*
572 As ev was returned by read_log_event, it has passed is_valid(), so
573 my_malloc() in ctor worked, no need to check again.
574 */
575 /*
576 Ok, we found a Format_description event. But it is not sure that this
577 describes the whole relay log; indeed, one can have this sequence
578 (starting from position 4):
579 Format_desc (of slave)
580 Previous-GTIDs (of slave IO thread, if GTIDs are enabled)
581 Rotate (of master)
582 Format_desc (of master)
583 So the Format_desc which really describes the rest of the relay log
584 can be the 3rd or the 4th event (depending on GTIDs being enabled or
585 not, it can't be further than that, because we rotate
586 the relay log when we queue a Rotate event from the master).
587 But what describes the Rotate is the first Format_desc.
588 So what we do is:
589 go on searching for Format_description events, until you exceed the
590 position (argument 'pos') or until you find an event other than
591 Previous-GTIDs, Rotate or Format_desc.
592 */
593 }
594 else
595 {
596 DBUG_PRINT("info",("found event of another type=%d",
597 ev->get_type_code()));
598 keep_looking_for_fd=
599 (ev->get_type_code() == ROTATE_EVENT ||
600 ev->get_type_code() == PREVIOUS_GTIDS_LOG_EVENT);
601 delete ev;
602 }
603 }
604 my_b_seek(cur_log,(off_t)pos);
605 #ifndef DBUG_OFF
606 {
607 char llbuf1[22], llbuf2[22];
608 DBUG_PRINT("info", ("my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
609 llstr(my_b_tell(cur_log),llbuf1),
610 llstr(get_event_relay_log_pos(),llbuf2)));
611 }
612 #endif
613
614 }
615
616 err:
617 /*
618 If we don't purge, we can't honour relay_log_space_limit ;
619 silently discard it
620 */
621 if (!relay_log_purge)
622 {
623 log_space_limit= 0; // todo: consider to throw a warning at least
624 }
625 mysql_cond_broadcast(&data_cond);
626
627 mysql_mutex_unlock(log_lock);
628
629 if (need_data_lock)
630 mysql_mutex_unlock(&data_lock);
631 if (!rli_description_event->is_valid() && !*errmsg)
632 *errmsg= "Invalid Format_description log event; could be out of memory";
633
634 DBUG_RETURN ((*errmsg) ? 1 : 0);
635 }
636
637 /**
638 Waits until the SQL thread reaches (has executed up to) the
639 log/position or timed out.
640
641 SYNOPSIS
642 @param[in] thd client thread that sent @c SELECT @c MASTER_POS_WAIT,
643 @param[in] log_name log name to wait for,
644 @param[in] log_pos position to wait for,
645 @param[in] timeout @c timeout in seconds before giving up waiting.
646 @c timeout is double whereas it should be ulong; but this is
647 to catch if the user submitted a negative timeout.
648
649 @retval -2 improper arguments (log_pos<0)
650 or slave not running, or master info changed
651 during the function's execution,
652 or client thread killed. -2 is translated to NULL by caller,
653 @retval -1 timed out
654 @retval >=0 number of log events the function had to wait
655 before reaching the desired log/position
656 */
657
wait_for_pos(THD * thd,String * log_name,longlong log_pos,double timeout)658 int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
659 longlong log_pos,
660 double timeout)
661 {
662 int event_count = 0;
663 ulong init_abort_pos_wait;
664 int error=0;
665 struct timespec abstime; // for timeout checking
666 PSI_stage_info old_stage;
667 DBUG_ENTER("Relay_log_info::wait_for_pos");
668
669 if (!inited)
670 DBUG_RETURN(-2);
671
672 DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
673 log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));
674
675 set_timespec_nsec(abstime, (ulonglong) timeout * 1000000000ULL);
676 mysql_mutex_lock(&data_lock);
677 thd->ENTER_COND(&data_cond, &data_lock,
678 &stage_waiting_for_the_slave_thread_to_advance_position,
679 &old_stage);
680 /*
681 This function will abort when it notices that some CHANGE MASTER or
682 RESET MASTER has changed the master info.
683 To catch this, these commands modify abort_pos_wait ; We just monitor
684 abort_pos_wait and see if it has changed.
685 Why do we have this mechanism instead of simply monitoring slave_running
686 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
687 the SQL thread be stopped?
688 This is becasue if someones does:
689 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
690 the change may happen very quickly and we may not notice that
691 slave_running briefly switches between 1/0/1.
692 */
693 init_abort_pos_wait= abort_pos_wait;
694
695 /*
696 We'll need to
697 handle all possible log names comparisons (e.g. 999 vs 1000).
698 We use ulong for string->number conversion ; this is no
699 stronger limitation than in find_uniq_filename in sql/log.cc
700 */
701 ulong log_name_extension;
702 char log_name_tmp[FN_REFLEN]; //make a char[] from String
703
704 strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));
705
706 char *p= fn_ext(log_name_tmp);
707 char *p_end;
708 if (!*p || log_pos<0)
709 {
710 error= -2; //means improper arguments
711 goto err;
712 }
713 // Convert 0-3 to 4
714 log_pos= max<ulong>(log_pos, BIN_LOG_HEADER_SIZE);
715 /* p points to '.' */
716 log_name_extension= strtoul(++p, &p_end, 10);
717 /*
718 p_end points to the first invalid character.
719 If it equals to p, no digits were found, error.
720 If it contains '\0' it means conversion went ok.
721 */
722 if (p_end==p || *p_end)
723 {
724 error= -2;
725 goto err;
726 }
727
728 /* The "compare and wait" main loop */
729 while (!thd->killed &&
730 init_abort_pos_wait == abort_pos_wait &&
731 slave_running)
732 {
733 bool pos_reached;
734 int cmp_result= 0;
735
736 DBUG_PRINT("info",
737 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
738 init_abort_pos_wait, abort_pos_wait));
739 DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
740 group_master_log_name, (ulong) group_master_log_pos));
741
742 /*
743 group_master_log_name can be "", if we are just after a fresh
744 replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
745 (before we have executed one Rotate event from the master) or
746 (rare) if the user is doing a weird slave setup (see next
747 paragraph). If group_master_log_name is "", we assume we don't
748 have enough info to do the comparison yet, so we just wait until
749 more data. In this case master_log_pos is always 0 except if
750 somebody (wrongly) sets this slave to be a slave of itself
751 without using --replicate-same-server-id (an unsupported
752 configuration which does nothing), then group_master_log_pos
753 will grow and group_master_log_name will stay "".
754 */
755 if (*group_master_log_name)
756 {
757 char *basename= (group_master_log_name +
758 dirname_length(group_master_log_name));
759 /*
760 First compare the parts before the extension.
761 Find the dot in the master's log basename,
762 and protect against user's input error :
763 if the names do not match up to '.' included, return error
764 */
765 char *q= (char*)(fn_ext(basename)+1);
766 if (strncmp(basename, log_name_tmp, (int)(q-basename)))
767 {
768 error= -2;
769 break;
770 }
771 // Now compare extensions.
772 char *q_end;
773 ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
774 if (group_master_log_name_extension < log_name_extension)
775 cmp_result= -1 ;
776 else
777 cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
778
779 pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
780 cmp_result > 0);
781 if (pos_reached || thd->killed)
782 break;
783 }
784
785 //wait for master update, with optional timeout.
786
787 DBUG_PRINT("info",("Waiting for master update"));
788 /*
789 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
790 will wake us up.
791 */
792 thd_wait_begin(thd, THD_WAIT_BINLOG);
793 if (timeout > 0)
794 {
795 /*
796 Note that mysql_cond_timedwait checks for the timeout
797 before for the condition ; i.e. it returns ETIMEDOUT
798 if the system time equals or exceeds the time specified by abstime
799 before the condition variable is signaled or broadcast, _or_ if
800 the absolute time specified by abstime has already passed at the time
801 of the call.
802 For that reason, mysql_cond_timedwait will do the "timeoutting" job
803 even if its condition is always immediately signaled (case of a loaded
804 master).
805 */
806 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
807 }
808 else
809 mysql_cond_wait(&data_cond, &data_lock);
810 thd_wait_end(thd);
811 DBUG_PRINT("info",("Got signal of master update or timed out"));
812 if (error == ETIMEDOUT || error == ETIME)
813 {
814 #ifndef DBUG_OFF
815 /*
816 Doing this to generate a stack trace and make debugging
817 easier.
818 */
819 if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
820 DBUG_ASSERT(0);
821 #endif
822 error= -1;
823 break;
824 }
825 error=0;
826 event_count++;
827 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
828 }
829
830 err:
831 thd->EXIT_COND(&old_stage);
832 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
833 improper_arguments: %d timed_out: %d",
834 thd->killed_errno(),
835 (int) (init_abort_pos_wait != abort_pos_wait),
836 (int) slave_running,
837 (int) (error == -2),
838 (int) (error == -1)));
839 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
840 !slave_running)
841 {
842 error= -2;
843 }
844 DBUG_RETURN( error ? error : event_count );
845 }
846
847 /*
848 TODO: This is a duplicated code that needs to be simplified.
849 This will be done while developing all possible sync options.
850 See WL#3584's specification.
851
852 /Alfranio
853 */
wait_for_gtid_set(THD * thd,String * gtid,double timeout)854 int Relay_log_info::wait_for_gtid_set(THD* thd, String* gtid,
855 double timeout)
856 {
857 int event_count = 0;
858 ulong init_abort_pos_wait;
859 int error=0;
860 struct timespec abstime; // for timeout checking
861 PSI_stage_info old_stage;
862 DBUG_ENTER("Relay_log_info::wait_for_gtid_set");
863
864 if (!inited)
865 DBUG_RETURN(-2);
866
867 DBUG_PRINT("info", ("Waiting for %s timeout %f", gtid->c_ptr_safe(),
868 timeout));
869
870 set_timespec_nsec(abstime, (ulonglong) timeout * 1000000000ULL);
871 mysql_mutex_lock(&data_lock);
872 thd->ENTER_COND(&data_cond, &data_lock,
873 &stage_waiting_for_the_slave_thread_to_advance_position,
874 &old_stage);
875 /*
876 This function will abort when it notices that some CHANGE MASTER or
877 RESET MASTER has changed the master info.
878 To catch this, these commands modify abort_pos_wait ; We just monitor
879 abort_pos_wait and see if it has changed.
880 Why do we have this mechanism instead of simply monitoring slave_running
881 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
882 the SQL thread be stopped?
883 This is becasue if someones does:
884 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
885 the change may happen very quickly and we may not notice that
886 slave_running briefly switches between 1/0/1.
887 */
888 init_abort_pos_wait= abort_pos_wait;
889 Gtid_set wait_gtid_set(global_sid_map);
890 global_sid_lock->rdlock();
891 if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
892 {
893 global_sid_lock->unlock();
894 goto err;
895 }
896 global_sid_lock->unlock();
897
898 /* The "compare and wait" main loop */
899 while (!thd->killed &&
900 init_abort_pos_wait == abort_pos_wait &&
901 slave_running)
902 {
903 DBUG_PRINT("info",
904 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
905 init_abort_pos_wait, abort_pos_wait));
906
907 //wait for master update, with optional timeout.
908
909 global_sid_lock->wrlock();
910 const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
911 const Owned_gtids* owned_gtids= gtid_state->get_owned_gtids();
912
913 DBUG_PRINT("info", ("Waiting for '%s'. is_subset: %d and "
914 "!is_intersection_nonempty: %d",
915 gtid->c_ptr_safe(), wait_gtid_set.is_subset(logged_gtids),
916 !owned_gtids->is_intersection_nonempty(&wait_gtid_set)));
917 logged_gtids->dbug_print("gtid_executed:");
918 owned_gtids->dbug_print("owned_gtids:");
919
920 /*
921 Since commit is performed after log to binary log, we must also
922 check if any GTID of wait_gtid_set is not yet committed.
923 */
924 if (wait_gtid_set.is_subset(logged_gtids) &&
925 !owned_gtids->is_intersection_nonempty(&wait_gtid_set))
926 {
927 global_sid_lock->unlock();
928 break;
929 }
930 global_sid_lock->unlock();
931
932 DBUG_PRINT("info",("Waiting for master update"));
933
934 /*
935 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
936 will wake us up.
937 */
938 thd_wait_begin(thd, THD_WAIT_BINLOG);
939 if (timeout > 0)
940 {
941 /*
942 Note that mysql_cond_timedwait checks for the timeout
943 before for the condition ; i.e. it returns ETIMEDOUT
944 if the system time equals or exceeds the time specified by abstime
945 before the condition variable is signaled or broadcast, _or_ if
946 the absolute time specified by abstime has already passed at the time
947 of the call.
948 For that reason, mysql_cond_timedwait will do the "timeoutting" job
949 even if its condition is always immediately signaled (case of a loaded
950 master).
951 */
952 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
953 }
954 else
955 mysql_cond_wait(&data_cond, &data_lock);
956 thd_wait_end(thd);
957 DBUG_PRINT("info",("Got signal of master update or timed out"));
958 if (error == ETIMEDOUT || error == ETIME)
959 {
960 #ifndef DBUG_OFF
961 /*
962 Doing this to generate a stack trace and make debugging
963 easier.
964 */
965 if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
966 DBUG_ASSERT(0);
967 #endif
968 error= -1;
969 break;
970 }
971 error=0;
972 event_count++;
973 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
974 }
975
976 err:
977 thd->EXIT_COND(&old_stage);
978 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
979 improper_arguments: %d timed_out: %d",
980 thd->killed_errno(),
981 (int) (init_abort_pos_wait != abort_pos_wait),
982 (int) slave_running,
983 (int) (error == -2),
984 (int) (error == -1)));
985 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
986 !slave_running)
987 {
988 error= -2;
989 }
990 DBUG_RETURN( error ? error : event_count );
991 }
992
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock)993 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
994 bool need_data_lock)
995 {
996 int error= 0;
997 DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
998
999 if (need_data_lock)
1000 mysql_mutex_lock(&data_lock);
1001 else
1002 mysql_mutex_assert_owner(&data_lock);
1003
1004 inc_event_relay_log_pos();
1005 group_relay_log_pos= event_relay_log_pos;
1006 strmake(group_relay_log_name,event_relay_log_name,
1007 sizeof(group_relay_log_name)-1);
1008
1009 notify_group_relay_log_name_update();
1010
1011 /*
1012 In 4.x we used the event's len to compute the positions here. This is
1013 wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1014 then the event's len is not what is was in the master's binlog, so this
1015 will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1016 replication: Exec_master_log_pos is wrong). Only way to solve this is to
1017 have the original offset of the end of the event the relay log. This is
1018 what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1019 of log_pos in 4.0 was to compute the end_log_pos; so better to store
1020 end_log_pos instead of begin_log_pos.
1021 If we had not done this fix here, the problem would also have appeared
1022 when the slave and master are 5.0 but with different event length (for
1023 example the slave is more recent than the master and features the event
1024 UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1025 SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1026 value which would lead to badly broken replication.
1027 Even the relay_log_pos will be corrupted in this case, because the len is
1028 the relay log is not "val".
1029 With the end_log_pos solution, we avoid computations involving lengthes.
1030 */
1031 DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
1032 (long) log_pos, (long) group_master_log_pos));
1033
1034 if (log_pos > 0) // 3.23 binlogs don't have log_posx
1035 group_master_log_pos= log_pos;
1036
1037 /*
1038 In MTS mode FD or Rotate event commit their solitary group to
1039 Coordinator's info table. Callers make sure that Workers have been
1040 executed all assignements.
1041 Broadcast to master_pos_wait() waiters should be done after
1042 the table is updated.
1043 */
1044 DBUG_ASSERT(!is_parallel_exec() ||
1045 mts_group_status != Relay_log_info::MTS_IN_GROUP);
1046 /*
1047 We do not force synchronization at this point, note the
1048 parameter false, because a non-transactional change is
1049 being committed.
1050
1051 For that reason, the synchronization here is subjected to
1052 the option sync_relay_log_info.
1053
1054 See sql/rpl_rli.h for further information on this behavior.
1055 */
1056 error= flush_info(FALSE);
1057
1058 mysql_cond_broadcast(&data_cond);
1059 if (need_data_lock)
1060 mysql_mutex_unlock(&data_lock);
1061 DBUG_RETURN(error);
1062 }
1063
1064
close_temporary_tables()1065 void Relay_log_info::close_temporary_tables()
1066 {
1067 TABLE *table,*next;
1068 DBUG_ENTER("Relay_log_info::close_temporary_tables");
1069
1070 for (table=save_temporary_tables ; table ; table=next)
1071 {
1072 next=table->next;
1073 /*
1074 Don't ask for disk deletion. For now, anyway they will be deleted when
1075 slave restarts, but it is a better intention to not delete them.
1076 */
1077 DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1078 close_temporary(table, 1, 0);
1079 }
1080 save_temporary_tables= 0;
1081 slave_open_temp_tables= 0;
1082 DBUG_VOID_RETURN;
1083 }
1084
1085 /**
1086 Purges relay logs. It assumes to have a run lock on rli and that no
1087 slave thread are running.
1088
1089 @param[in] THD connection,
1090 @param[in] just_reset if false, it tells that logs should be purged
1091 and @c init_relay_log_pos() should be called,
1092 @errmsg[out] errmsg store pointer to an error message.
1093
1094 @retval 0 successfuly executed,
1095 @retval 1 otherwise error, where errmsg is set to point to the error message.
1096 */
1097
purge_relay_logs(THD * thd,bool just_reset,const char ** errmsg)1098 int Relay_log_info::purge_relay_logs(THD *thd, bool just_reset,
1099 const char** errmsg)
1100 {
1101 int error=0;
1102 const char *ln;
1103 char name_buf[FN_REFLEN];
1104 DBUG_ENTER("Relay_log_info::purge_relay_logs");
1105
1106 /*
1107 Even if inited==0, we still try to empty master_log_* variables. Indeed,
1108 inited==0 does not imply that they already are empty.
1109
1110 It could be that slave's info initialization partly succeeded: for example
1111 if relay-log.info existed but *relay-bin*.* have been manually removed,
1112 init_info reads the old relay-log.info and fills rli->master_log_*, then
1113 init_info checks for the existence of the relay log, this fails and
1114 init_info leaves inited to 0.
1115 In that pathological case, master_log_pos* will be properly reinited at
1116 the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1117 purge_relay_logs, will delete bogus *.info files or replace them with
1118 correct files), however if the user does SHOW SLAVE STATUS before START
1119 SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1120 master_log_* for SHOW SLAVE STATUS to display fine in any case.
1121 */
1122 group_master_log_name[0]= 0;
1123 group_master_log_pos= 0;
1124
1125 if (!inited)
1126 {
1127 DBUG_PRINT("info", ("inited == 0"));
1128 if (error_on_rli_init_info)
1129 {
1130 ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
1131 1, name_buf);
1132 if (relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1133 {
1134 sql_print_error("Unable to purge relay log files. Failed to open relay "
1135 "log index file:%s.", relay_log.get_index_fname());
1136
1137 DBUG_RETURN(1);
1138 }
1139 mysql_mutex_lock(&mi->data_lock);
1140 if (relay_log.open_binlog(ln, 0, SEQ_READ_APPEND,
1141 (max_relay_log_size ? max_relay_log_size :
1142 max_binlog_size), true,
1143 true/*need_lock_log=true*/,
1144 true/*need_lock_index=true*/,
1145 true/*need_sid_lock=true*/,
1146 mi->get_mi_description_event()))
1147 {
1148 sql_print_error("Unable to purge relay log files. Failed to open relay "
1149 "log file:%s.", relay_log.get_log_fname());
1150 mysql_mutex_unlock(&mi->data_lock);
1151 DBUG_RETURN(1);
1152 }
1153 mysql_mutex_unlock(&mi->data_lock);
1154 }
1155 else
1156 DBUG_RETURN(0);
1157 }
1158 else
1159 {
1160 DBUG_ASSERT(slave_running == 0);
1161 DBUG_ASSERT(mi->slave_running == 0);
1162 }
1163
1164 slave_skip_counter= 0;
1165 mysql_mutex_lock(&data_lock);
1166
1167 /*
1168 we close the relay log fd possibly left open by the slave SQL thread,
1169 to be able to delete it; the relay log fd possibly left open by the slave
1170 I/O thread will be closed naturally in reset_logs() by the
1171 close(LOG_CLOSE_TO_BE_OPENED) call
1172 */
1173 if (cur_log_fd >= 0)
1174 {
1175 end_io_cache(&cache_buf);
1176 my_close(cur_log_fd, MYF(MY_WME));
1177 cur_log_fd= -1;
1178 }
1179
1180 if (relay_log.reset_logs(thd))
1181 {
1182 *errmsg = "Failed during log reset";
1183 error=1;
1184 goto err;
1185 }
1186 /* Save name of used relay log file */
1187 strmake(group_relay_log_name, relay_log.get_log_fname(),
1188 sizeof(group_relay_log_name)-1);
1189 strmake(event_relay_log_name, relay_log.get_log_fname(),
1190 sizeof(event_relay_log_name)-1);
1191 group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1192 if (count_relay_log_space())
1193 {
1194 *errmsg= "Error counting relay log space";
1195 error= 1;
1196 goto err;
1197 }
1198 if (!just_reset)
1199 error= init_relay_log_pos(group_relay_log_name,
1200 group_relay_log_pos,
1201 false/*need_data_lock=false*/, errmsg, 0);
1202
1203 if (!inited && error_on_rli_init_info)
1204 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1205 true/*need_lock_log=true*/,
1206 true/*need_lock_index=true*/);
1207
1208 err:
1209 #ifndef DBUG_OFF
1210 char buf[22];
1211 #endif
1212 DBUG_PRINT("info",("log_space_total: %s",llstr(log_space_total,buf)));
1213 mysql_mutex_unlock(&data_lock);
1214 DBUG_RETURN(error);
1215 }
1216
1217
1218 /**
1219 Checks if condition stated in UNTIL clause of START SLAVE is reached.
1220
1221 Specifically, it checks if UNTIL condition is reached. Uses caching result
1222 of last comparison of current log file name and target log file name. So
1223 cached value should be invalidated if current log file name changes (see
1224 @c Relay_log_info::notify_... functions).
1225
1226 This caching is needed to avoid of expensive string comparisons and
1227 @c strtol() conversions needed for log names comparison. We don't need to
1228 compare them each time this function is called, we only need to do this
1229 when current log name changes. If we have @c UNTIL_MASTER_POS condition we
1230 need to do this only after @c Rotate_log_event::do_apply_event() (which is
1231 rare, so caching gives real benifit), and if we have @c UNTIL_RELAY_POS
1232 condition then we should invalidate cached comarison value after
1233 @c inc_group_relay_log_pos() which called for each group of events (so we
1234 have some benefit if we have something like queries that use
1235 autoincrement or if we have transactions).
1236
1237 Should be called ONLY if @c until_condition @c != @c UNTIL_NONE !
1238
1239 @param master_beg_pos position of the beginning of to be executed event
1240 (not @c log_pos member of the event that points to
1241 the beginning of the following event)
1242
1243 @retval true condition met or error happened (condition seems to have
1244 bad log file name),
1245 @retval false condition not met.
1246 */
1247
is_until_satisfied(THD * thd,Log_event * ev)1248 bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
1249 {
1250 char error_msg[]= "Slave SQL thread is stopped because UNTIL "
1251 "condition is bad.";
1252 DBUG_ENTER("Relay_log_info::is_until_satisfied");
1253
1254 switch (until_condition)
1255 {
1256 case UNTIL_MASTER_POS:
1257 case UNTIL_RELAY_POS:
1258 {
1259 const char *log_name= NULL;
1260 ulonglong log_pos= 0;
1261
1262 if (until_condition == UNTIL_MASTER_POS)
1263 {
1264 if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
1265 DBUG_RETURN(false);
1266 log_name= group_master_log_name;
1267 log_pos= (!ev || is_in_group() || !ev->log_pos) ?
1268 group_master_log_pos : ev->log_pos - ev->data_written;
1269 }
1270 else
1271 { /* until_condition == UNTIL_RELAY_POS */
1272 log_name= group_relay_log_name;
1273 log_pos= group_relay_log_pos;
1274 }
1275
1276 #ifndef DBUG_OFF
1277 {
1278 char buf[32];
1279 DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1280 group_master_log_name, llstr(group_master_log_pos, buf)));
1281 DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1282 group_relay_log_name, llstr(group_relay_log_pos, buf)));
1283 DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1284 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1285 log_name, llstr(log_pos, buf)));
1286 DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1287 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1288 until_log_name, llstr(until_log_pos, buf)));
1289 }
1290 #endif
1291
1292 if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1293 {
1294 /*
1295 We have no cached comparison results so we should compare log names
1296 and cache result.
1297 If we are after RESET SLAVE, and the SQL slave thread has not processed
1298 any event yet, it could be that group_master_log_name is "". In that case,
1299 just wait for more events (as there is no sensible comparison to do).
1300 */
1301
1302 if (*log_name)
1303 {
1304 const char *basename= log_name + dirname_length(log_name);
1305
1306 const char *q= (const char*)(fn_ext(basename)+1);
1307 if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1308 {
1309 /* Now compare extensions. */
1310 char *q_end;
1311 ulong log_name_extension= strtoul(q, &q_end, 10);
1312 if (log_name_extension < until_log_name_extension)
1313 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1314 else
1315 until_log_names_cmp_result=
1316 (log_name_extension > until_log_name_extension) ?
1317 UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1318 }
1319 else
1320 {
1321 /* Base names do not match, so we abort */
1322 sql_print_error("%s", error_msg);
1323 DBUG_RETURN(true);
1324 }
1325 }
1326 else
1327 DBUG_RETURN(until_log_pos == 0);
1328 }
1329
1330 if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1331 log_pos >= until_log_pos) ||
1332 until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
1333 {
1334 char buf[22];
1335 sql_print_information("Slave SQL thread stopped because it reached its"
1336 " UNTIL position %s", llstr(until_pos(), buf));
1337 DBUG_RETURN(true);
1338 }
1339 DBUG_RETURN(false);
1340 }
1341
1342 case UNTIL_SQL_BEFORE_GTIDS:
1343 // We only need to check once if logged_gtids set contains any of the until_sql_gtids.
1344 if (until_sql_gtids_first_event)
1345 {
1346 until_sql_gtids_first_event= false;
1347 global_sid_lock->wrlock();
1348 /* Check if until GTIDs were already applied. */
1349 const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
1350 if (until_sql_gtids.is_intersection_nonempty(logged_gtids))
1351 {
1352 char *buffer= until_sql_gtids.to_string();
1353 global_sid_lock->unlock();
1354 sql_print_information("Slave SQL thread stopped because "
1355 "UNTIL SQL_BEFORE_GTIDS %s is already "
1356 "applied", buffer);
1357 my_free(buffer);
1358 DBUG_RETURN(true);
1359 }
1360 global_sid_lock->unlock();
1361 }
1362 if (ev != NULL && ev->get_type_code() == GTID_LOG_EVENT)
1363 {
1364 Gtid_log_event *gev= (Gtid_log_event *)ev;
1365 global_sid_lock->rdlock();
1366 if (until_sql_gtids.contains_gtid(gev->get_sidno(false), gev->get_gno()))
1367 {
1368 char *buffer= until_sql_gtids.to_string();
1369 global_sid_lock->unlock();
1370 sql_print_information("Slave SQL thread stopped because it reached "
1371 "UNTIL SQL_BEFORE_GTIDS %s", buffer);
1372 my_free(buffer);
1373 DBUG_RETURN(true);
1374 }
1375 global_sid_lock->unlock();
1376 }
1377 DBUG_RETURN(false);
1378 break;
1379
1380 case UNTIL_SQL_AFTER_GTIDS:
1381 {
1382 global_sid_lock->wrlock();
1383 const Gtid_set* logged_gtids= gtid_state->get_logged_gtids();
1384 if (until_sql_gtids.is_subset(logged_gtids))
1385 {
1386 char *buffer= until_sql_gtids.to_string();
1387 global_sid_lock->unlock();
1388 sql_print_information("Slave SQL thread stopped because it reached "
1389 "UNTIL SQL_AFTER_GTIDS %s", buffer);
1390 my_free(buffer);
1391 DBUG_RETURN(true);
1392 }
1393 global_sid_lock->unlock();
1394 DBUG_RETURN(false);
1395 }
1396 break;
1397
1398 case UNTIL_SQL_AFTER_MTS_GAPS:
1399 case UNTIL_DONE:
1400 /*
1401 TODO: this condition is actually post-execution or post-scheduling
1402 so the proper place to check it before SQL thread goes
1403 into next_event() where it can wait while the condition
1404 has been satisfied already.
1405 It's deployed here temporarily to be fixed along the regular UNTIL
1406 support for MTS is provided.
1407 */
1408 if (mts_recovery_group_cnt == 0)
1409 {
1410 sql_print_information("Slave SQL thread stopped according to "
1411 "UNTIL SQL_AFTER_MTS_GAPS as it has "
1412 "processed all gap transactions left from "
1413 "the previous slave session.");
1414 until_condition= UNTIL_DONE;
1415 DBUG_RETURN(true);
1416 }
1417 else
1418 {
1419 DBUG_RETURN(false);
1420 }
1421 break;
1422
1423 case UNTIL_NONE:
1424 DBUG_ASSERT(0);
1425 break;
1426 }
1427
1428 DBUG_ASSERT(0);
1429 DBUG_RETURN(false);
1430 }
1431
cached_charset_invalidate()1432 void Relay_log_info::cached_charset_invalidate()
1433 {
1434 DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1435
1436 /* Full of zeroes means uninitialized. */
1437 memset(cached_charset, 0, sizeof(cached_charset));
1438 DBUG_VOID_RETURN;
1439 }
1440
1441
cached_charset_compare(char * charset) const1442 bool Relay_log_info::cached_charset_compare(char *charset) const
1443 {
1444 DBUG_ENTER("Relay_log_info::cached_charset_compare");
1445
1446 if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1447 {
1448 memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1449 DBUG_RETURN(1);
1450 }
1451 DBUG_RETURN(0);
1452 }
1453
1454
stmt_done(my_off_t event_master_log_pos)1455 int Relay_log_info::stmt_done(my_off_t event_master_log_pos)
1456 {
1457 int error= 0;
1458
1459 clear_flag(IN_STMT);
1460
1461 DBUG_ASSERT(!belongs_to_client());
1462 /* Worker does not execute binlog update position logics */
1463 DBUG_ASSERT(!is_mts_worker(info_thd));
1464
1465 /*
1466 Replication keeps event and group positions to specify the
1467 set of events that were executed.
1468 Event positions are incremented after processing each event
1469 whereas group positions are incremented when an event or a
1470 set of events is processed such as in a transaction and are
1471 committed or rolled back.
1472
1473 A transaction can be ended with a Query Event, i.e. either
1474 commit or rollback, or by a Xid Log Event. Query Event is
1475 used to terminate pseudo-transactions that are executed
1476 against non-transactional engines such as MyIsam. Xid Log
1477 Event denotes though that a set of changes executed
1478 against a transactional engine is about to commit.
1479
1480 Events' positions are incremented at stmt_done(). However,
1481 transactions that are ended with Xid Log Event have their
1482 group position incremented in the do_apply_event() and in
1483 the do_apply_event_work().
1484
1485 Notice that the type of the engine, i.e. where data and
1486 positions are stored, against what events are being applied
1487 are not considered in this logic.
1488
1489 Regarding the code that follows, notice that the executed
1490 group coordinates don't change if the current event is internal
1491 to the group. The same applies to MTS Coordinator when it
1492 handles a Format Descriptor event that appears in the middle
1493 of a group that is about to be assigned.
1494 */
1495 if ((!is_parallel_exec() && is_in_group()) ||
1496 mts_group_status != MTS_NOT_IN_GROUP)
1497 {
1498 inc_event_relay_log_pos();
1499 }
1500 else
1501 {
1502 if (is_parallel_exec())
1503 {
1504
1505 DBUG_ASSERT(!is_mts_worker(info_thd));
1506
1507 /*
1508 Format Description events only can drive MTS execution to this
1509 point. It is a special event group that is handled with
1510 synchronization. For that reason, the checkpoint routine is
1511 called here.
1512 */
1513 error= mts_checkpoint_routine(this, 0, false,
1514 true/*need_data_lock=true*/);
1515 }
1516 if (!error)
1517 error= inc_group_relay_log_pos(event_master_log_pos,
1518 true/*need_data_lock=true*/);
1519 }
1520
1521 return error;
1522 }
1523
1524 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
cleanup_context(THD * thd,bool error)1525 void Relay_log_info::cleanup_context(THD *thd, bool error)
1526 {
1527 DBUG_ENTER("Relay_log_info::cleanup_context");
1528
1529 DBUG_ASSERT(info_thd == thd);
1530 /*
1531 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1532 may have opened tables, which we cannot be sure have been closed (because
1533 maybe the Rows_log_event have not been found or will not be, because slave
1534 SQL thread is stopping, or relay log has a missing tail etc). So we close
1535 all thread's tables. And so the table mappings have to be cancelled.
1536 2) Rows_log_event::do_apply_event() may even have started statements or
1537 transactions on them, which we need to rollback in case of error.
1538 3) If finding a Format_description_log_event after a BEGIN, we also need
1539 to rollback before continuing with the next events.
1540 4) so we need this "context cleanup" function.
1541 */
1542 if (error)
1543 {
1544 trans_rollback_stmt(thd); // if a "statement transaction"
1545 trans_rollback(thd); // if a "real transaction"
1546 }
1547 if (rows_query_ev)
1548 {
1549 info_thd->reset_query_for_display();
1550 delete rows_query_ev;
1551 rows_query_ev= NULL;
1552 info_thd->set_query(NULL, 0);
1553 }
1554 m_table_map.clear_tables();
1555 slave_close_thread_tables(thd);
1556 if (error)
1557 thd->mdl_context.release_transactional_locks();
1558 clear_flag(IN_STMT);
1559 /*
1560 Cleanup for the flags that have been set at do_apply_event.
1561 */
1562 thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1563 thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1564
1565 /*
1566 Reset state related to long_find_row notes in the error log:
1567 - timestamp
1568 - flag that decides whether the slave prints or not
1569 */
1570 reset_row_stmt_start_timestamp();
1571 unset_long_find_row_note_printed();
1572
1573 DBUG_VOID_RETURN;
1574 }
1575
clear_tables_to_lock()1576 void Relay_log_info::clear_tables_to_lock()
1577 {
1578 DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
1579 #ifndef DBUG_OFF
1580 /**
1581 When replicating in RBR and MyISAM Merge tables are involved
1582 open_and_lock_tables (called in do_apply_event) appends the
1583 base tables to the list of tables_to_lock. Then these are
1584 removed from the list in close_thread_tables (which is called
1585 before we reach this point).
1586
1587 This assertion just confirms that we get no surprises at this
1588 point.
1589 */
1590 uint i=0;
1591 for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
1592 DBUG_ASSERT(i == tables_to_lock_count);
1593 #endif
1594
1595 while (tables_to_lock)
1596 {
1597 uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
1598 if (tables_to_lock->m_tabledef_valid)
1599 {
1600 tables_to_lock->m_tabledef.table_def::~table_def();
1601 tables_to_lock->m_tabledef_valid= FALSE;
1602 }
1603
1604 /*
1605 If blob fields were used during conversion of field values
1606 from the master table into the slave table, then we need to
1607 free the memory used temporarily to store their values before
1608 copying into the slave's table.
1609 */
1610 if (tables_to_lock->m_conv_table)
1611 free_blobs(tables_to_lock->m_conv_table);
1612
1613 tables_to_lock=
1614 static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1615 tables_to_lock_count--;
1616 my_free(to_free);
1617 }
1618 DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
1619 DBUG_VOID_RETURN;
1620 }
1621
slave_close_thread_tables(THD * thd)1622 void Relay_log_info::slave_close_thread_tables(THD *thd)
1623 {
1624 thd->get_stmt_da()->set_overwrite_status(true);
1625 DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
1626 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1627 thd->get_stmt_da()->set_overwrite_status(false);
1628
1629 close_thread_tables(thd);
1630 /*
1631 - If transaction rollback was requested due to deadlock
1632 perform it and release metadata locks.
1633 - If inside a multi-statement transaction,
1634 defer the release of metadata locks until the current
1635 transaction is either committed or rolled back. This prevents
1636 other statements from modifying the table for the entire
1637 duration of this transaction. This provides commit ordering
1638 and guarantees serializability across multiple transactions.
1639 - If in autocommit mode, or outside a transactional context,
1640 automatically release metadata locks of the current statement.
1641 */
1642 if (thd->transaction_rollback_request)
1643 {
1644 trans_rollback_implicit(thd);
1645 thd->mdl_context.release_transactional_locks();
1646 }
1647 else if (! thd->in_multi_stmt_transaction_mode())
1648 thd->mdl_context.release_transactional_locks();
1649 else
1650 thd->mdl_context.release_statement_locks();
1651
1652 clear_tables_to_lock();
1653 DBUG_VOID_RETURN;
1654 }
1655 /**
1656 Execute a SHOW RELAYLOG EVENTS statement.
1657
1658 @param thd Pointer to THD object for the client thread executing the
1659 statement.
1660
1661 @retval FALSE success
1662 @retval TRUE failure
1663 */
mysql_show_relaylog_events(THD * thd)1664 bool mysql_show_relaylog_events(THD* thd)
1665 {
1666 Protocol *protocol= thd->protocol;
1667 List<Item> field_list;
1668 DBUG_ENTER("mysql_show_relaylog_events");
1669
1670 DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1671
1672 Log_event::init_show_field_list(&field_list);
1673 if (protocol->send_result_set_metadata(&field_list,
1674 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1675 DBUG_RETURN(TRUE);
1676
1677 if (active_mi == NULL)
1678 {
1679 my_error(ER_SLAVE_CONFIGURATION, MYF(0));
1680 DBUG_RETURN(true);
1681 }
1682
1683 DBUG_RETURN(show_binlog_events(thd, &active_mi->rli->relay_log));
1684 }
1685
1686 #endif
1687
rli_init_info()1688 int Relay_log_info::rli_init_info()
1689 {
1690 int error= 0;
1691 enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
1692 const char *msg= NULL;
1693
1694 DBUG_ENTER("Relay_log_info::rli_init_info");
1695
1696 mysql_mutex_assert_owner(&data_lock);
1697
1698 /*
1699 If Relay_log_info is issued again after a failed init_info(), for
1700 instance because of missing relay log files, it will generate new
1701 files and ignore the previous failure, to avoid that we set
1702 error_on_rli_init_info as true.
1703 This a consequence of the behaviour change, in the past server was
1704 stopped when there were replication initialization errors, now it is
1705 not and so init_info() must be aware of previous failures.
1706 */
1707 if (error_on_rli_init_info)
1708 goto err;
1709
1710 if (inited)
1711 {
1712 /*
1713 We have to reset read position of relay-log-bin as we may have
1714 already been reading from 'hotlog' when the slave was stopped
1715 last time. If this case pos_in_file would be set and we would
1716 get a crash when trying to read the signature for the binary
1717 relay log.
1718
1719 We only rewind the read position if we are starting the SQL
1720 thread. The handle_slave_sql thread assumes that the read
1721 position is at the beginning of the file, and will read the
1722 "signature" and then fast-forward to the last position read.
1723 */
1724 bool hot_log= FALSE;
1725 /*
1726 my_b_seek does an implicit flush_io_cache, so we need to:
1727
1728 1. check if this log is active (hot)
1729 2. if it is we keep log_lock until the seek ends, otherwise
1730 release it right away.
1731
1732 If we did not take log_lock, SQL thread might race with IO
1733 thread for the IO_CACHE mutex.
1734
1735 */
1736 mysql_mutex_t *log_lock= relay_log.get_log_lock();
1737 mysql_mutex_lock(log_lock);
1738 hot_log= relay_log.is_active(linfo.log_file_name);
1739
1740 if (!hot_log)
1741 mysql_mutex_unlock(log_lock);
1742
1743 my_b_seek(cur_log, (my_off_t) 0);
1744
1745 if (hot_log)
1746 mysql_mutex_unlock(log_lock);
1747 DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(this) : 0);
1748 }
1749
1750 cur_log_fd = -1;
1751 slave_skip_counter= 0;
1752 abort_pos_wait= 0;
1753 log_space_limit= relay_log_space_limit;
1754 log_space_total= 0;
1755 tables_to_lock= 0;
1756 tables_to_lock_count= 0;
1757
1758 char pattern[FN_REFLEN];
1759 (void) my_realpath(pattern, slave_load_tmpdir, 0);
1760 if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
1761 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
1762 {
1763 sql_print_error("Unable to use slave's temporary directory '%s'.",
1764 slave_load_tmpdir);
1765 DBUG_RETURN(1);
1766 }
1767 unpack_filename(slave_patternload_file, pattern);
1768 slave_patternload_file_size= strlen(slave_patternload_file);
1769
1770 /*
1771 The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
1772 Note that the I/O thread flushes it to disk after writing every
1773 event, in flush_info within the master info.
1774 */
1775 /*
1776 For the maximum log size, we choose max_relay_log_size if it is
1777 non-zero, max_binlog_size otherwise. If later the user does SET
1778 GLOBAL on one of these variables, fix_max_binlog_size and
1779 fix_max_relay_log_size will reconsider the choice (for example
1780 if the user changes max_relay_log_size to zero, we have to
1781 switch to using max_binlog_size for the relay log) and update
1782 relay_log.max_size (and mysql_bin_log.max_size).
1783 */
1784 {
1785 /* Reports an error and returns, if the --relay-log's path
1786 is a directory.*/
1787 if (opt_relay_logname &&
1788 opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
1789 {
1790 sql_print_error("Path '%s' is a directory name, please specify \
1791 a file name for --relay-log option.", opt_relay_logname);
1792 DBUG_RETURN(1);
1793 }
1794
1795 /* Reports an error and returns, if the --relay-log-index's path
1796 is a directory.*/
1797 if (opt_relaylog_index_name &&
1798 opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
1799 == FN_LIBCHAR)
1800 {
1801 sql_print_error("Path '%s' is a directory name, please specify \
1802 a file name for --relay-log-index option.", opt_relaylog_index_name);
1803 DBUG_RETURN(1);
1804 }
1805
1806 char buf[FN_REFLEN];
1807 const char *ln;
1808 static bool name_warning_sent= 0;
1809 ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
1810 1, buf);
1811 /* We send the warning only at startup, not after every RESET SLAVE */
1812 if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
1813 {
1814 /*
1815 User didn't give us info to name the relay log index file.
1816 Picking `hostname`-relay-bin.index like we do, causes replication to
1817 fail if this slave's hostname is changed later. So, we would like to
1818 instead require a name. But as we don't want to break many existing
1819 setups, we only give warning, not error.
1820 */
1821 sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
1822 " so replication "
1823 "may break when this MySQL server acts as a "
1824 "slave and has his hostname changed!! Please "
1825 "use '--relay-log=%s' to avoid this problem.", ln);
1826 name_warning_sent= 1;
1827 }
1828
1829 relay_log.is_relay_log= TRUE;
1830
1831 if (relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1832 {
1833 sql_print_error("Failed in open_index_file() called from Relay_log_info::rli_init_info().");
1834 DBUG_RETURN(1);
1835 }
1836 #ifndef DBUG_OFF
1837 global_sid_lock->wrlock();
1838 gtid_set.dbug_print("set of GTIDs in relay log before initialization");
1839 global_sid_lock->unlock();
1840 #endif
1841 /*
1842 Below init_gtid_sets() function will parse the available relay logs and
1843 set I/O retrieved gtid event in gtid_state object. We dont need to find
1844 last_retrieved_gtid_event if relay_log_recovery=1 (retrieved set will
1845 be cleared off in that case).
1846 */
1847 if (!current_thd &&
1848 relay_log.init_gtid_sets(>id_set, NULL,
1849 is_relay_log_recovery ? NULL : get_last_retrieved_gtid(),
1850 opt_slave_sql_verify_checksum,
1851 true/*true=need lock*/))
1852 {
1853 sql_print_error("Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
1854 DBUG_RETURN(1);
1855 }
1856 #ifndef DBUG_OFF
1857 global_sid_lock->wrlock();
1858 gtid_set.dbug_print("set of GTIDs in relay log after initialization");
1859 global_sid_lock->unlock();
1860 #endif
1861 /*
1862 Configures what object is used by the current log to store processed
1863 gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
1864 corretly compute the set of previous gtids.
1865 */
1866 relay_log.set_previous_gtid_set(>id_set);
1867 /*
1868 note, that if open() fails, we'll still have index file open
1869 but a destructor will take care of that
1870 */
1871 if (relay_log.open_binlog(ln, 0, SEQ_READ_APPEND,
1872 (max_relay_log_size ? max_relay_log_size :
1873 max_binlog_size), true,
1874 true/*need_lock_log=true*/,
1875 true/*need_lock_index=true*/,
1876 true/*need_sid_lock=true*/,
1877 mi->get_mi_description_event()))
1878 {
1879 sql_print_error("Failed in open_log() called from Relay_log_info::rli_init_info().");
1880 DBUG_RETURN(1);
1881 }
1882 }
1883
1884 /*
1885 This checks if the repository was created before and thus there
1886 will be values to be read. Please, do not move this call after
1887 the handler->init_info().
1888 */
1889 if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
1890 {
1891 msg= "Error checking relay log repository";
1892 error= 1;
1893 goto err;
1894 }
1895
1896 if (handler->init_info())
1897 {
1898 msg= "Error reading relay log configuration";
1899 error= 1;
1900 goto err;
1901 }
1902
1903 if (check_return == REPOSITORY_DOES_NOT_EXIST)
1904 {
1905 /* Init relay log with first entry in the relay index file */
1906 if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
1907 false/*need_data_lock=false (lock should be held
1908 prior to invoking this function)*/,
1909 &msg, 0))
1910 {
1911 error= 1;
1912 goto err;
1913 }
1914 group_master_log_name[0]= 0;
1915 group_master_log_pos= 0;
1916 }
1917 else
1918 {
1919 if (read_info(handler))
1920 {
1921 msg= "Error reading relay log configuration";
1922 error= 1;
1923 goto err;
1924 }
1925
1926 if (is_relay_log_recovery && init_recovery(mi, &msg))
1927 {
1928 error= 1;
1929 goto err;
1930 }
1931
1932 if (init_relay_log_pos(group_relay_log_name,
1933 group_relay_log_pos,
1934 false/*need_data_lock=false (lock should be held
1935 prior to invoking this function)*/,
1936 &msg, 0))
1937 {
1938 char llbuf[22];
1939 sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s).",
1940 group_relay_log_name,
1941 llstr(group_relay_log_pos, llbuf));
1942 error= 1;
1943 goto err;
1944 }
1945
1946 #ifndef DBUG_OFF
1947 {
1948 char llbuf1[22], llbuf2[22];
1949 DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
1950 llstr(my_b_tell(cur_log),llbuf1),
1951 llstr(event_relay_log_pos,llbuf2)));
1952 DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
1953 DBUG_ASSERT((my_b_tell(cur_log) == event_relay_log_pos));
1954 }
1955 #endif
1956 }
1957
1958 inited= 1;
1959 error_on_rli_init_info= false;
1960 if (flush_info(TRUE))
1961 {
1962 msg= "Error reading relay log configuration";
1963 error= 1;
1964 goto err;
1965 }
1966
1967 if (count_relay_log_space())
1968 {
1969 msg= "Error counting relay log space";
1970 error= 1;
1971 goto err;
1972 }
1973
1974 /*
1975 In case of MTS the recovery is deferred until the end of global_init_info.
1976 */
1977 if (!mi->rli->mts_recovery_group_cnt)
1978 is_relay_log_recovery= FALSE;
1979 DBUG_RETURN(error);
1980
1981 err:
1982 handler->end_info();
1983 inited= 0;
1984 error_on_rli_init_info= true;
1985 if (msg)
1986 sql_print_error("%s.", msg);
1987 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1988 true/*need_lock_log=true*/,
1989 true/*need_lock_index=true*/);
1990 DBUG_RETURN(error);
1991 }
1992
end_info()1993 void Relay_log_info::end_info()
1994 {
1995 DBUG_ENTER("Relay_log_info::end_info");
1996
1997 error_on_rli_init_info= false;
1998 if (!inited)
1999 DBUG_VOID_RETURN;
2000
2001 handler->end_info();
2002
2003 if (cur_log_fd >= 0)
2004 {
2005 end_io_cache(&cache_buf);
2006 (void)my_close(cur_log_fd, MYF(MY_WME));
2007 cur_log_fd= -1;
2008 }
2009 inited = 0;
2010 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2011 true/*need_lock_log=true*/,
2012 true/*need_lock_index=true*/);
2013 relay_log.harvest_bytes_written(this, true/*need_log_space_lock=true*/);
2014 /*
2015 Delete the slave's temporary tables from memory.
2016 In the future there will be other actions than this, to ensure persistance
2017 of slave's temp tables after shutdown.
2018 */
2019 close_temporary_tables();
2020
2021 DBUG_VOID_RETURN;
2022 }
2023
flush_current_log()2024 int Relay_log_info::flush_current_log()
2025 {
2026 DBUG_ENTER("Relay_log_info::flush_current_log");
2027 /*
2028 When we come to this place in code, relay log may or not be initialized;
2029 the caller is responsible for setting 'flush_relay_log_cache' accordingly.
2030 */
2031 IO_CACHE *log_file= relay_log.get_log_file();
2032 if (flush_io_cache(log_file))
2033 DBUG_RETURN(2);
2034
2035 DBUG_RETURN(0);
2036 }
2037
set_master_info(Master_info * info)2038 void Relay_log_info::set_master_info(Master_info* info)
2039 {
2040 mi= info;
2041 }
2042
/**
  Stores the file and position where the execute-slave thread is in the
  relay log:

  - As this is only called by the slave thread or on STOP SLAVE, with the
    log_lock grabbed and the slave thread stopped, we don't need to have
    a lock here.
  - If there is an active transaction, then we don't update the position
    in the relay log.  This is to ensure that we re-execute statements
    if we die in the middle of a transaction that was rolled back.
  - As a transaction never spans binary logs, we don't have to handle the
    case where we do a relay-log-rotation in the middle of the transaction.
    If this were not the case, we would have to ensure that we
    don't delete the relay log file where the transaction started when
    we switch to a new relay log file.

  @retval  0   ok,
  @retval  1   write error, otherwise.
*/
2062
2063 /**
2064 Store the file and position where the slave's SQL thread are in the
2065 relay log.
2066
2067 Notes:
2068
2069 - This function should be called either from the slave SQL thread,
2070 or when the slave thread is not running. (It reads the
2071 group_{relay|master}_log_{pos|name} and delay fields in the rli
2072 object. These may only be modified by the slave SQL thread or by
2073 a client thread when the slave SQL thread is not running.)
2074
2075 - If there is an active transaction, then we do not update the
2076 position in the relay log. This is to ensure that we re-execute
2077 statements if we die in the middle of an transaction that was
2078 rolled back.
2079
2080 - As a transaction never spans binary logs, we don't have to handle
2081 the case where we do a relay-log-rotation in the middle of the
2082 transaction. If transactions could span several binlogs, we would
2083 have to ensure that we do not delete the relay log file where the
2084 transaction started before switching to a new relay log file.
2085
2086 - Error can happen if writing to file fails or if flushing the file
2087 fails.
2088
2089 @param rli The object representing the Relay_log_info.
2090
2091 @todo Change the log file information to a binary format to avoid
2092 calling longlong2str.
2093
2094 @return 0 on success, 1 on error.
2095 */
flush_info(const bool force)2096 int Relay_log_info::flush_info(const bool force)
2097 {
2098 DBUG_ENTER("Relay_log_info::flush_info");
2099
2100 if (!inited)
2101 DBUG_RETURN(0);
2102
2103 /*
2104 We update the sync_period at this point because only here we
2105 now that we are handling a relay log info. This needs to be
2106 update every time we call flush because the option maybe
2107 dinamically set.
2108 */
2109 handler->set_sync_period(sync_relayloginfo_period);
2110
2111 if (write_info(handler))
2112 goto err;
2113
2114 if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
2115 goto err;
2116
2117 force_flush_postponed_due_to_split_trans= false;
2118 DBUG_RETURN(0);
2119
2120 err:
2121 sql_print_error("Error writing relay log configuration.");
2122 DBUG_RETURN(1);
2123 }
2124
get_number_info_rli_fields()2125 size_t Relay_log_info::get_number_info_rli_fields()
2126 {
2127 return sizeof(info_rli_fields)/sizeof(info_rli_fields[0]);
2128 }
2129
read_info(Rpl_info_handler * from)2130 bool Relay_log_info::read_info(Rpl_info_handler *from)
2131 {
2132 int lines= 0;
2133 char *first_non_digit= NULL;
2134 ulong temp_group_relay_log_pos= 0;
2135 ulong temp_group_master_log_pos= 0;
2136 int temp_sql_delay= 0;
2137 int temp_internal_id= internal_id;
2138
2139 DBUG_ENTER("Relay_log_info::read_info");
2140
2141 /*
2142 Should not read RLI from file in client threads. Client threads
2143 only use RLI to execute BINLOG statements.
2144
2145 @todo Uncomment the following assertion. Currently,
2146 Relay_log_info::init() is called from init_master_info() before
2147 the THD object Relay_log_info::sql_thd is created. That means we
2148 cannot call belongs_to_client() since belongs_to_client()
2149 dereferences Relay_log_info::sql_thd. So we need to refactor
2150 slightly: the THD object should be created by Relay_log_info
2151 constructor (or passed to it), so that we are guaranteed that it
2152 exists at this point. /Sven
2153 */
2154 //DBUG_ASSERT(!belongs_to_client());
2155
2156 /*
2157 Starting from 5.1.x, relay-log.info has a new format. Now, its
2158 first line contains the number of lines in the file. By reading
2159 this number we can determine which version our master.info comes
2160 from. We can't simply count the lines in the file, since
2161 versions before 5.1.x could generate files with more lines than
2162 needed. If first line doesn't contain a number, or if it
2163 contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2164 then the file is treated like a file from pre-5.1.x version.
2165 There is no ambiguity when reading an old master.info: before
2166 5.1.x, the first line contained the binlog's name, which is
2167 either empty or has an extension (contains a '.'), so can't be
2168 confused with an integer.
2169
2170 So we're just reading first line and trying to figure which
2171 version is this.
2172 */
2173
2174 /*
2175 The first row is temporarily stored in mi->master_log_name, if
2176 it is line count and not binlog name (new format) it will be
2177 overwritten by the second row later.
2178 */
2179 if (from->prepare_info_for_read() ||
2180 from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
2181 (char *) ""))
2182 DBUG_RETURN(TRUE);
2183
2184 lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2185
2186 if (group_relay_log_name[0]!='\0' &&
2187 *first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2188 {
2189 /* Seems to be new format => read group relay log name */
2190 if (from->get_info(group_relay_log_name, (size_t) sizeof(group_relay_log_name),
2191 (char *) ""))
2192 DBUG_RETURN(TRUE);
2193 }
2194 else
2195 DBUG_PRINT("info", ("relay_log_info file is in old format."));
2196
2197 if (from->get_info((ulong *) &temp_group_relay_log_pos,
2198 (ulong) BIN_LOG_HEADER_SIZE) ||
2199 from->get_info(group_master_log_name,
2200 (size_t) sizeof(group_relay_log_name),
2201 (char *) "") ||
2202 from->get_info((ulong *) &temp_group_master_log_pos,
2203 (ulong) 0))
2204 DBUG_RETURN(TRUE);
2205
2206 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2207 {
2208 if (from->get_info((int *) &temp_sql_delay, (int) 0))
2209 DBUG_RETURN(TRUE);
2210 }
2211
2212 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2213 {
2214 if (from->get_info(&recovery_parallel_workers,(ulong) 0))
2215 DBUG_RETURN(TRUE);
2216 }
2217
2218 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2219 {
2220 if (from->get_info(&temp_internal_id, (int) 1))
2221 DBUG_RETURN(TRUE);
2222 }
2223
2224 group_relay_log_pos= temp_group_relay_log_pos;
2225 group_master_log_pos= temp_group_master_log_pos;
2226 sql_delay= (int32) temp_sql_delay;
2227 internal_id= (uint) temp_internal_id;
2228
2229 DBUG_ASSERT(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2230 (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
2231 DBUG_RETURN(FALSE);
2232 }
2233
write_info(Rpl_info_handler * to)2234 bool Relay_log_info::write_info(Rpl_info_handler *to)
2235 {
2236 DBUG_ENTER("Relay_log_info::write_info");
2237
2238 /*
2239 @todo Uncomment the following assertion. See todo in
2240 Relay_log_info::read_info() for details. /Sven
2241 */
2242 //DBUG_ASSERT(!belongs_to_client());
2243
2244 if (to->prepare_info_for_write() ||
2245 to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2246 to->set_info(group_relay_log_name) ||
2247 to->set_info((ulong) group_relay_log_pos) ||
2248 to->set_info(group_master_log_name) ||
2249 to->set_info((ulong) group_master_log_pos) ||
2250 to->set_info((int) sql_delay) ||
2251 to->set_info(recovery_parallel_workers) ||
2252 to->set_info((int) internal_id))
2253 DBUG_RETURN(TRUE);
2254
2255 DBUG_RETURN(FALSE);
2256 }
2257
2258 /**
2259 Delete the existing event and set a new one. This class is
2260 responsible for freeing the event, the caller should not do that.
2261 When a new FD is from the master adaptation routine is invoked
2262 to align the slave applier execution context with the master version.
2263
2264 The method is run by SQL thread/MTS Coordinator.
2265 Although notice that MTS worker runs it, inefficiently (see assert),
2266 once at its destruction time.
2267 todo: fix Slave_worker and Relay_log_info inheritance relation.
2268
2269 @param a pointer to be installed into execution context
2270 FormatDescriptor event
2271 */
2272
set_rli_description_event(Format_description_log_event * fe)2273 void Relay_log_info::set_rli_description_event(Format_description_log_event *fe)
2274 {
2275 DBUG_ASSERT(!info_thd || !is_mts_worker(info_thd) || !fe);
2276
2277 if (fe)
2278 {
2279 adapt_to_master_version(fe);
2280 if (info_thd && is_parallel_exec())
2281 {
2282 for (uint i= 0; i < workers.elements; i++)
2283 {
2284 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
2285 mysql_mutex_lock(&w->jobs_lock);
2286 if (w->running_status == Slave_worker::RUNNING)
2287 w->set_rli_description_event(fe);
2288 mysql_mutex_unlock(&w->jobs_lock);
2289 }
2290 }
2291 }
2292 delete rli_description_event;
2293 rli_description_event= fe;
2294 }
2295
2296 struct st_feature_version
2297 {
2298 /*
2299 The enum must be in the version non-descending top-down order,
2300 the last item formally corresponds to highest possible server
2301 version (never reached, thereby no adapting actions here);
2302 enumeration starts from zero.
2303 */
2304 enum
2305 {
2306 WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
2307 _END_OF_LIST // always last
2308 } item;
2309 /*
2310 Version where the feature is introduced.
2311 */
2312 uchar version_split[3];
2313 /*
2314 Action to perform when according to FormatDescriptor event Master
2315 is found to be feature-aware while previously it has *not* been.
2316 */
2317 void (*upgrade) (THD*);
2318 /*
2319 Action to perform when according to FormatDescriptor event Master
2320 is found to be feature-*un*aware while previously it has been.
2321 */
2322 void (*downgrade) (THD*);
2323 };
2324
wl6292_upgrade_func(THD * thd)2325 void wl6292_upgrade_func(THD *thd)
2326 {
2327 thd->variables.explicit_defaults_for_timestamp= false;
2328 if (global_system_variables.explicit_defaults_for_timestamp)
2329 thd->variables.explicit_defaults_for_timestamp= true;
2330
2331 return;
2332 }
2333
wl6292_downgrade_func(THD * thd)2334 void wl6292_downgrade_func(THD *thd)
2335 {
2336 if (global_system_variables.explicit_defaults_for_timestamp)
2337 thd->variables.explicit_defaults_for_timestamp= false;
2338
2339 return;
2340 }
2341
2342 /**
2343 Sensitive to Master-vs-Slave version difference features
2344 should be listed in the version non-descending order.
2345 */
2346 static st_feature_version s_features[]=
2347 {
2348 // order is the same as in the enum
2349 { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
2350 {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
2351 { st_feature_version::_END_OF_LIST,
2352 {255, 255, 255}, NULL, NULL }
2353 };
2354
2355 /**
2356 The method lists rules of adaptation for the slave applier
2357 to specific master versions.
2358 It's executed right before a new master FD is set for
2359 slave appliers execution context.
2360 Comparison of the old and new version yields the adaptive
2361 actions direction.
2362 Current execution FD's version, V_0, is compared with the new being set up
2363 FD (the arg), let's call it V_1.
2364 In the case of downgrade features that are defined in [V_0, V_1-1] range
2365 (V_1 excluded) are "removed" by running the downgrade actions.
2366 In the upgrade case the featured defined in [V_0 + 1, V_1] range are
2367 "added" by running the upgrade actions.
2368
2369 Notice, that due to relay log may have two FD events, one the slave local
2370 and the other from the Master. That can lead to extra
2371 adapt_to_master_version() calls and in case Slave and Master are of different
2372 versions the extra two calls should compensate each other.
2373
2374 Also, at composing downgrade/upgrade actions keep in mind that
2375 at initialization Slave sets up FD of version 4.0 and then transits to
2376 the current server version. At transition all upgrading actions in
2377 the range of [4.0..current] are run.
2378
2379 @param fdle a pointer to new Format Description event that is being set
2380 up for execution context.
2381 */
adapt_to_master_version(Format_description_log_event * fdle)2382 void Relay_log_info::adapt_to_master_version(Format_description_log_event *fdle)
2383 {
2384 THD *thd=info_thd;
2385 ulong master_version, current_version;
2386 int changed= !fdle || ! rli_description_event ? 0 :
2387 (master_version= fdle->get_version_product()) -
2388 (current_version= rli_description_event->get_version_product());
2389
2390 /* When the last version is not changed nothing to adapt for */
2391 if (!changed)
2392 return;
2393
2394 /*
2395 find item starting from and ending at for which adaptive actions run
2396 for downgrade or upgrade branches.
2397 (todo: convert into bsearch when number of features will grow significantly)
2398 */
2399 bool downgrade= changed < 0;
2400 long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
2401
2402 for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
2403 {
2404 ulong ver_f= version_product(s_features[i].version_split);
2405
2406 if ((downgrade ? master_version : current_version) < ver_f &&
2407 i_first == st_feature_version::_END_OF_LIST)
2408 i_first= i;
2409 if ((downgrade ? current_version : master_version) < ver_f)
2410 {
2411 i_last= i;
2412 DBUG_ASSERT(i_last >= i_first);
2413 break;
2414 }
2415 }
2416
2417 /*
2418 actions, executed in version non-descending st_feature_version order
2419 */
2420 for (i= i_first; i < i_last; i++)
2421 {
2422 /* Run time check of the st_feature_version items ordering */
2423 DBUG_ASSERT(!i ||
2424 version_product(s_features[i - 1].version_split) <=
2425 version_product(s_features[i].version_split));
2426
2427 DBUG_ASSERT((downgrade ? master_version : current_version) <
2428 version_product(s_features[i].version_split) &&
2429 (downgrade ? current_version : master_version >=
2430 version_product(s_features[i].version_split)));
2431
2432 if (downgrade && s_features[i].downgrade)
2433 {
2434 s_features[i].downgrade(thd);
2435 }
2436 else if (s_features[i].upgrade)
2437 {
2438 s_features[i].upgrade(thd);
2439 }
2440 }
2441 }
2442