1 /* Copyright (c) 2006, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "rpl_rli.h"
24
25 #include "my_dir.h" // MY_STAT
26 #include "log.h" // sql_print_error
27 #include "log_event.h" // Log_event
28 #include "m_string.h"
29 #include "rpl_group_replication.h" // set_group_replication_retrieved_certifi...
30 #include "rpl_info_factory.h" // Rpl_info_factory
31 #include "rpl_mi.h" // Master_info
32 #include "rpl_msr.h" // channel_map
33 #include "rpl_rli_pdb.h" // Slave_worker
34 #include "sql_base.h" // close_thread_tables
35 #include "strfunc.h" // strconvert
36 #include "transaction.h" // trans_commit_stmt
37 #include "debug_sync.h"
38 #include "pfs_file_provider.h"
39 #include "mysql/psi/mysql_file.h"
40 #include "mutex_lock.h" // Mutex_lock
41
42 #include <algorithm>
43 using std::min;
44 using std::max;
45
/*
  Names of the fields that make up the relay log info.

  Keep this list in sync with the relay log info: whenever a new field is
  added there, add its name here too.  For now the array is only used to
  obtain the number of fields.
*/
const char *info_rli_fields[]=
{
  "number_of_lines",
  "group_relay_log_name", "group_relay_log_pos",
  "group_master_log_name", "group_master_log_pos",
  "sql_delay",
  "number_of_workers",
  "id",
  "channel_name"
};
63
Relay_log_info(bool is_slave_recovery,PSI_mutex_key * param_key_info_run_lock,PSI_mutex_key * param_key_info_data_lock,PSI_mutex_key * param_key_info_sleep_lock,PSI_mutex_key * param_key_info_thd_lock,PSI_mutex_key * param_key_info_data_cond,PSI_mutex_key * param_key_info_start_cond,PSI_mutex_key * param_key_info_stop_cond,PSI_mutex_key * param_key_info_sleep_cond,uint param_id,const char * param_channel,bool is_rli_fake)64 Relay_log_info::Relay_log_info(bool is_slave_recovery
65 #ifdef HAVE_PSI_INTERFACE
66 ,PSI_mutex_key *param_key_info_run_lock,
67 PSI_mutex_key *param_key_info_data_lock,
68 PSI_mutex_key *param_key_info_sleep_lock,
69 PSI_mutex_key *param_key_info_thd_lock,
70 PSI_mutex_key *param_key_info_data_cond,
71 PSI_mutex_key *param_key_info_start_cond,
72 PSI_mutex_key *param_key_info_stop_cond,
73 PSI_mutex_key *param_key_info_sleep_cond
74 #endif
75 , uint param_id, const char *param_channel,
76 bool is_rli_fake
77 )
78 :Rpl_info("SQL"
79 #ifdef HAVE_PSI_INTERFACE
80 ,param_key_info_run_lock, param_key_info_data_lock,
81 param_key_info_sleep_lock, param_key_info_thd_lock,
82 param_key_info_data_cond, param_key_info_start_cond,
83 param_key_info_stop_cond, param_key_info_sleep_cond
84 #endif
85 , param_id, param_channel
86 ),
87 replicate_same_server_id(::replicate_same_server_id),
88 cur_log_fd(-1), relay_log(&sync_relaylog_period, SEQ_READ_APPEND),
89 is_relay_log_recovery(is_slave_recovery),
90 save_temporary_tables(0),
91 cur_log_old_open_count(0), error_on_rli_init_info(false),
92 group_relay_log_pos(0), event_relay_log_number(0),
93 event_relay_log_pos(0), event_start_pos(0),
94 m_group_master_log_pos(0),
95 gtid_set(global_sid_map, global_sid_lock),
96 rli_fake(is_rli_fake),
97 gtid_retrieved_initialized(false),
98 is_group_master_log_pos_invalid(false),
99 log_space_total(0), ignore_log_space_limit(0),
100 sql_force_rotate_relay(false),
101 last_master_timestamp(0), slave_skip_counter(0),
102 abort_pos_wait(0), until_condition(UNTIL_NONE),
103 until_log_pos(0),
104 until_sql_gtids(global_sid_map),
105 until_sql_gtids_first_event(true),
106 trans_retries(0), retried_trans(0),
107 tables_to_lock(0), tables_to_lock_count(0),
108 rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
109 workers(PSI_NOT_INSTRUMENTED),
110 workers_array_initialized(false),
111 curr_group_assigned_parts(PSI_NOT_INSTRUMENTED),
112 curr_group_da(PSI_NOT_INSTRUMENTED),
113 slave_parallel_workers(0),
114 exit_counter(0),
115 max_updated_index(0),
116 recovery_parallel_workers(0), checkpoint_seqno(0),
117 checkpoint_group(opt_mts_checkpoint_group),
118 recovery_groups_inited(false), mts_recovery_group_cnt(0),
119 mts_recovery_index(0), mts_recovery_group_seen_begin(0),
120 mts_group_status(MTS_NOT_IN_GROUP),
121 stats_exec_time(0), stats_read_time(0),
122 least_occupied_workers(PSI_NOT_INSTRUMENTED),
123 current_mts_submode(0),
124 reported_unsafe_warning(false), rli_description_event(NULL),
125 commit_order_mngr(NULL),
126 sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
127 long_find_row_note_printed(false),
128 thd_tx_priority(0),
129 m_ignore_write_set_memory_limit(false),
130 m_allow_drop_write_set(false),
131 is_engine_ha_data_detached(false)
132 {
133 DBUG_ENTER("Relay_log_info::Relay_log_info");
134
135 #ifdef HAVE_PSI_INTERFACE
136 relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
137 key_RELAYLOG_LOCK_commit,
138 key_RELAYLOG_LOCK_commit_queue,
139 key_RELAYLOG_LOCK_done,
140 key_RELAYLOG_LOCK_flush_queue,
141 key_RELAYLOG_LOCK_log,
142 PSI_NOT_INSTRUMENTED, /* Relaylog doesn't support LOCK_binlog_end_pos */
143 key_RELAYLOG_LOCK_sync,
144 key_RELAYLOG_LOCK_sync_queue,
145 key_RELAYLOG_LOCK_xids,
146 key_RELAYLOG_COND_done,
147 key_RELAYLOG_update_cond,
148 key_RELAYLOG_prep_xids_cond,
149 key_file_relaylog,
150 key_file_relaylog_index,
151 key_file_relaylog_cache,
152 key_file_relaylog_index_cache);
153 #endif
154
155 group_relay_log_name[0]= event_relay_log_name[0]=
156 m_group_master_log_name[0]= 0;
157 until_log_name[0]= ign_master_log_name_end[0]= 0;
158 set_timespec_nsec(&last_clock, 0);
159 memset(&cache_buf, 0, sizeof(cache_buf));
160 cached_charset_invalidate();
161 inited_hash_workers= FALSE;
162 channel_open_temp_tables.atomic_set(0);
163 /*
164 For applier threads, currently_executing_gtid is set to automatic
165 when they are not executing any transaction.
166 */
167 currently_executing_gtid.set_automatic();
168
169 if (!rli_fake)
170 {
171 mysql_mutex_init(key_relay_log_info_log_space_lock,
172 &log_space_lock, MY_MUTEX_INIT_FAST);
173 mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond);
174 mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
175 MY_MUTEX_INIT_FAST);
176 mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond);
177 mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock,
178 MY_MUTEX_INIT_FAST);
179 mysql_mutex_init(key_mts_temp_table_LOCK, &mts_temp_table_LOCK,
180 MY_MUTEX_INIT_FAST);
181 mysql_mutex_init(key_mts_gaq_LOCK, &mts_gaq_LOCK,
182 MY_MUTEX_INIT_FAST);
183 mysql_cond_init(key_cond_mts_gaq, &logical_clock_cond);
184
185 relay_log.init_pthread_objects();
186 force_flush_postponed_due_to_split_trans= false;
187 }
188 do_server_version_split(::server_version, slave_version_split);
189
190 DBUG_VOID_RETURN;
191 }
192
193 /**
194 The method to invoke at slave threads start
195 */
init_workers(ulong n_workers)196 void Relay_log_info::init_workers(ulong n_workers)
197 {
198 /*
199 Parallel slave parameters initialization is done regardless
200 whether the feature is or going to be active or not.
201 */
202 mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
203 mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
204 mts_total_wait_overlap= 0;
205 mts_total_wait_worker_avail= 0;
206 mts_last_online_stat= 0;
207
208 workers.reserve(n_workers);
209 workers_array_initialized= true; //set after init
210 }
211
212 /**
213 The method to invoke at slave threads stop
214 */
deinit_workers()215 void Relay_log_info::deinit_workers()
216 {
217 workers.clear();
218 }
219
~Relay_log_info()220 Relay_log_info::~Relay_log_info()
221 {
222 DBUG_ENTER("Relay_log_info::~Relay_log_info");
223
224 if(!rli_fake)
225 {
226 if (recovery_groups_inited)
227 bitmap_free(&recovery_groups);
228 delete current_mts_submode;
229
230 if(workers_copy_pfs.size())
231 {
232 for (int i= static_cast<int>(workers_copy_pfs.size()) - 1; i >= 0; i--)
233 delete workers_copy_pfs[i];
234 workers_copy_pfs.clear();
235 }
236
237 mysql_mutex_destroy(&log_space_lock);
238 mysql_cond_destroy(&log_space_cond);
239 mysql_mutex_destroy(&pending_jobs_lock);
240 mysql_cond_destroy(&pending_jobs_cond);
241 mysql_mutex_destroy(&exit_count_lock);
242 mysql_mutex_destroy(&mts_temp_table_LOCK);
243 mysql_mutex_destroy(&mts_gaq_LOCK);
244 mysql_cond_destroy(&logical_clock_cond);
245 relay_log.cleanup();
246 }
247
248 set_rli_description_event(NULL);
249
250 DBUG_VOID_RETURN;
251 }
252
253 /**
254 Method is called when MTS coordinator senses the relay-log name
255 has been changed.
256 It marks each Worker member with this fact to make an action
257 at time it will distribute a terminal event of a group to the Worker.
258
259 Worker receives the new name at the group commiting phase
260 @c Slave_worker::slave_worker_ends_group().
261 */
reset_notified_relay_log_change()262 void Relay_log_info::reset_notified_relay_log_change()
263 {
264 if (!is_parallel_exec())
265 return;
266 for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
267 {
268 Slave_worker *w= *it;
269 w->relay_log_change_notified= FALSE;
270 }
271 }
272
273 /**
274 This method is called in mts_checkpoint_routine() to mark that each
275 worker is required to adapt to a new checkpoint data whose coordinates
276 are passed to it through GAQ index.
277
278 Worker notices the new checkpoint value at the group commit to reset
279 the current bitmap and starts using the clean bitmap indexed from zero
280 of being reset checkpoint_seqno.
281
282 New seconds_behind_master timestamp is installed.
283
284 @param shift number of bits to shift by Worker due to the
285 current checkpoint change.
286 @param new_ts new seconds_behind_master timestamp value
287 unless zero. Zero could be due to FD event
288 or fake rotate event.
289 @param need_data_lock False if caller has locked @c data_lock
290 @param update_timestamp if true, this function will update the
291 rli->last_master_timestamp.
292 */
reset_notified_checkpoint(ulong shift,time_t new_ts,bool need_data_lock,bool update_timestamp)293 void Relay_log_info::reset_notified_checkpoint(ulong shift, time_t new_ts,
294 bool need_data_lock,
295 bool update_timestamp)
296 {
297 /*
298 If this is not a parallel execution we return immediately.
299 */
300 if (!is_parallel_exec())
301 return;
302
303 for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
304 {
305 Slave_worker *w= *it;
306 /*
307 Reseting the notification information in order to force workers to
308 assign jobs with the new updated information.
309 Notice that the bitmap_shifted is accumulated to indicate how many
310 consecutive jobs were successfully processed.
311
312 The worker when assigning a new job will set the value back to
313 zero.
314 */
315 w->checkpoint_notified= FALSE;
316 w->bitmap_shifted= w->bitmap_shifted + shift;
317 /*
318 Zero shift indicates the caller rotates the master binlog.
319 The new name will be passed to W through the group descriptor
320 during the first post-rotation time scheduling.
321 */
322 if (shift == 0)
323 w->master_log_change_notified= false;
324
325 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
326 "worker->bitmap_shifted --> %lu, worker --> %u.",
327 shift, w->bitmap_shifted,
328 static_cast<unsigned>(it - workers.begin())));
329 }
330 /*
331 There should not be a call where (shift == 0 && checkpoint_seqno != 0).
332 Then the new checkpoint sequence is updated by subtracting the number
333 of consecutive jobs that were successfully processed.
334 */
335 assert(current_mts_submode->get_type() != MTS_PARALLEL_TYPE_DB_NAME ||
336 !(shift == 0 && checkpoint_seqno != 0));
337 checkpoint_seqno= checkpoint_seqno - shift;
338 DBUG_PRINT("mts", ("reset_notified_checkpoint shift --> %lu, "
339 "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
340
341 if (update_timestamp)
342 {
343 if (need_data_lock)
344 mysql_mutex_lock(&data_lock);
345 else
346 mysql_mutex_assert_owner(&data_lock);
347 last_master_timestamp= new_ts;
348 if (need_data_lock)
349 mysql_mutex_unlock(&data_lock);
350 }
351 }
352
353 /**
354 Reset recovery info from Worker info table and
355 mark MTS recovery is completed.
356
357 @return false on success true when @c reset_notified_checkpoint failed.
358 */
mts_finalize_recovery()359 bool Relay_log_info::mts_finalize_recovery()
360 {
361 bool ret= false;
362 uint i;
363 uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
364
365 DBUG_ENTER("Relay_log_info::mts_finalize_recovery");
366
367 for (Slave_worker **it= workers.begin(); !ret && it != workers.end(); ++it)
368 {
369 Slave_worker *w= *it;
370 ret= w->reset_recovery_info();
371 DBUG_EXECUTE_IF("mts_debug_recovery_reset_fails", ret= true;);
372 }
373 /*
374 The loop is traversed in the worker index descending order due
375 to specifics of the Worker table repository that does not like
376 even temporary holes. Therefore stale records are deleted
377 from the tail.
378 */
379 DBUG_EXECUTE_IF("enable_mts_wokrer_failure_in_recovery_finalize",
380 {DBUG_SET("+d,mts_worker_thread_init_fails");});
381 for (i= recovery_parallel_workers; i > workers.size() && !ret; i--)
382 {
383 Slave_worker *w=
384 Rpl_info_factory::create_worker(repo_type, i - 1, this, true);
385 /*
386 If an error occurs during the above create_worker call, the newly created
387 worker object gets deleted within the above function call itself and only
388 NULL is returned. Hence the following check has been added to verify
389 that a valid worker object exists.
390 */
391 if (w)
392 {
393 ret= w->remove_info();
394 delete w;
395 }
396 else
397 {
398 ret= true;
399 goto err;
400 }
401 }
402 recovery_parallel_workers= slave_parallel_workers;
403
404 err:
405 DBUG_RETURN(ret);
406 }
407
mts_workers_queue_empty() const408 bool Relay_log_info::mts_workers_queue_empty() const
409 {
410 ulong ret= 0;
411
412 for (Slave_worker * const *it= workers.begin(); ret == 0 && it != workers.end(); ++it)
413 {
414 Slave_worker *worker= *it;
415 mysql_mutex_lock(&worker->jobs_lock);
416 ret+= worker->curr_jobs;
417 mysql_mutex_unlock(&worker->jobs_lock);
418 }
419 return ret == 0;
420 }
421
422 /* Checks if all in-flight stmts/trx can be safely rolled back */
cannot_safely_rollback() const423 bool Relay_log_info::cannot_safely_rollback() const
424 {
425 if (!is_parallel_exec())
426 return info_thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::SESSION);
427
428 bool ret= false;
429
430 for (Slave_worker * const *it= workers.begin(); !ret && it != workers.end(); ++it)
431 {
432 Slave_worker *worker= *it;
433 mysql_mutex_lock(&worker->jobs_lock);
434 ret= worker->info_thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::SESSION);
435 mysql_mutex_unlock(&worker->jobs_lock);
436 }
437 return ret;
438 }
439
add_relay_log(Relay_log_info * rli,LOG_INFO * linfo)440 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
441 {
442 MY_STAT s;
443 DBUG_ENTER("add_relay_log");
444 mysql_mutex_assert_owner(&rli->log_space_lock);
445 if (!mysql_file_stat(key_file_relaylog,
446 linfo->log_file_name, &s, MYF(0)))
447 {
448 sql_print_error("log %s listed in the index, but failed to stat.",
449 linfo->log_file_name);
450 DBUG_RETURN(1);
451 }
452 rli->log_space_total += s.st_size;
453 #ifndef NDEBUG
454 char buf[22];
455 DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
456 #endif
457 DBUG_RETURN(0);
458 }
459
count_relay_log_space()460 int Relay_log_info::count_relay_log_space()
461 {
462 LOG_INFO flinfo;
463 DBUG_ENTER("Relay_log_info::count_relay_log_space");
464 Mutex_lock lock(&log_space_lock);
465 log_space_total= 0;
466 if (relay_log.find_log_pos(&flinfo, NullS, 1))
467 {
468 sql_print_error("Could not find first log while counting relay log space.");
469 DBUG_RETURN(1);
470 }
471 do
472 {
473 if (add_relay_log(this, &flinfo))
474 DBUG_RETURN(1);
475 } while (!relay_log.find_next_log(&flinfo, 1));
476 /*
477 As we have counted everything, including what may have written in a
478 preceding write, we must reset bytes_written, or we may count some space
479 twice.
480 */
481 relay_log.reset_bytes_written();
482 DBUG_RETURN(0);
483 }
484
485 /**
486 Resets UNTIL condition for Relay_log_info
487 */
488
clear_until_condition()489 void Relay_log_info::clear_until_condition()
490 {
491 DBUG_ENTER("clear_until_condition");
492
493 until_condition= Relay_log_info::UNTIL_NONE;
494 until_log_name[0]= 0;
495 until_log_pos= 0;
496 until_sql_gtids.clear();
497 until_sql_gtids_first_event= true;
498 DBUG_VOID_RETURN;
499 }
500
501 /**
502 Opens and intialize the given relay log. Specifically, it does what follows:
503
504 - Closes old open relay log files.
505 - If we are using the same relay log as the running IO-thread, then sets.
506 rli->cur_log to point to the same IO_CACHE entry.
507 - If not, opens the 'log' binary file.
508
509 @todo check proper initialization of
510 group_master_log_name/group_master_log_pos. /alfranio
511
512 @param rli[in] Relay information (will be initialized)
513 @param log[in] Name of relay log file to read from. NULL = First log
514 @param pos[in] Position in relay log file
515 @param need_data_lock[in] If true, this function will acquire the
516 relay_log.data_lock(); otherwise the caller should already have
517 acquired it.
518 @param errmsg[out] On error, this function will store a pointer to
519 an error message here
520 @param keep_looking_for_fd[in] If true, this function will
521 look for a Format_description_log_event. We only need this when the
522 SQL thread starts and opens an existing relay log and has to execute
523 it (possibly from an offset >4); then we need to read the first
524 event of the relay log to be able to parse the events we have to
525 execute.
526
527 @retval 0 ok,
528 @retval 1 error. In this case, *errmsg is set to point to the error
529 message.
530 */
531
init_relay_log_pos(const char * log,ulonglong pos,bool need_data_lock,const char ** errmsg,bool keep_looking_for_fd)532 int Relay_log_info::init_relay_log_pos(const char* log,
533 ulonglong pos, bool need_data_lock,
534 const char** errmsg,
535 bool keep_looking_for_fd)
536 {
537 DBUG_ENTER("Relay_log_info::init_relay_log_pos");
538 DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
539
540 *errmsg=0;
541 const char* errmsg_fmt= 0;
542 static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
543 mysql_mutex_t *log_lock= relay_log.get_log_lock();
544
545 if (need_data_lock)
546 mysql_mutex_lock(&data_lock);
547 else
548 mysql_mutex_assert_owner(&data_lock);
549
550 /*
551 By default the relay log is in binlog format 3 (4.0).
552 Even if format is 4, this will work enough to read the first event
553 (Format_desc) (remember that format 4 is just lenghtened compared to format
554 3; format 3 is a prefix of format 4).
555 */
556 set_rli_description_event(new Format_description_log_event(3));
557
558 mysql_mutex_lock(log_lock);
559
560 /* Close log file and free buffers if it's already open */
561 if (cur_log_fd >= 0)
562 {
563 end_io_cache(&cache_buf);
564 mysql_file_close(cur_log_fd, MYF(MY_WME));
565 cur_log_fd = -1;
566 }
567
568 group_relay_log_pos= event_relay_log_pos= pos;
569
570 /*
571 Test to see if the previous run was with the skip of purging
572 If yes, we do not purge when we restart
573 */
574 if (relay_log.find_log_pos(&linfo, NullS, 1))
575 {
576 *errmsg="Could not find first log during relay log initialization";
577 goto err;
578 }
579
580 if (log && relay_log.find_log_pos(&linfo, log, 1))
581 {
582 errmsg_fmt= "Could not find target log file mentioned in "
583 "relay log info in the index file '%s' during "
584 "relay log initialization";
585 sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
586 *errmsg= errmsg_buff;
587 goto err;
588 }
589
590 set_group_relay_log_name(linfo.log_file_name);
591 set_event_relay_log_name(linfo.log_file_name);
592
593 if (relay_log.is_active(linfo.log_file_name))
594 {
595 /*
596 The IO thread is using this log file.
597 In this case, we will use the same IO_CACHE pointer to
598 read data as the IO thread is using to write data.
599 */
600 my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
601 if (check_binlog_magic(cur_log, errmsg))
602 goto err;
603 cur_log_old_open_count=relay_log.get_open_count();
604 }
605 else
606 {
607 /*
608 Open the relay log and set cur_log to point at this one
609 */
610 if ((cur_log_fd=open_binlog_file(&cache_buf,
611 linfo.log_file_name,errmsg)) < 0)
612 goto err;
613 cur_log = &cache_buf;
614 }
615 /*
616 In all cases, check_binlog_magic() has been called so we're at offset 4 for
617 sure.
618 */
619 if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
620 {
621 Log_event* ev;
622 while (keep_looking_for_fd)
623 {
624 /*
625 Read the possible Format_description_log_event; if position
626 was 4, no need, it will be read naturally.
627 */
628 DBUG_PRINT("info",("looking for a Format_description_log_event"));
629
630 if (my_b_tell(cur_log) >= pos)
631 break;
632
633 /*
634 Because of we have data_lock and log_lock, we can safely read an
635 event
636 */
637 if (!(ev= Log_event::read_log_event(cur_log, 0,
638 rli_description_event,
639 opt_slave_sql_verify_checksum)))
640 {
641 DBUG_PRINT("info",("could not read event, cur_log->error=%d",
642 cur_log->error));
643 if (cur_log->error) /* not EOF */
644 {
645 *errmsg= "I/O error reading event at position 4";
646 goto err;
647 }
648 break;
649 }
650 else if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT)
651 {
652 Format_description_log_event *old= rli_description_event;
653 DBUG_PRINT("info",("found Format_description_log_event"));
654 Format_description_log_event *new_fdev=
655 static_cast<Format_description_log_event*>(ev);
656 new_fdev->copy_crypto_data(*old);
657 set_rli_description_event(new_fdev);
658 /*
659 As ev was returned by read_log_event, it has passed is_valid(), so
660 my_malloc() in ctor worked, no need to check again.
661 */
662 /*
663 Ok, we found a Format_description event. But it is not sure that this
664 describes the whole relay log; indeed, one can have this sequence
665 (starting from position 4):
666 Format_desc (of slave)
667 Previous-GTIDs (of slave IO thread, if GTIDs are enabled)
668 Rotate (of master)
669 Format_desc (of master)
670 So the Format_desc which really describes the rest of the relay log
671 can be the 3rd or the 4th event (depending on GTIDs being enabled or
672 not, it can't be further than that, because we rotate
673 the relay log when we queue a Rotate event from the master).
674 But what describes the Rotate is the first Format_desc.
675 So what we do is:
676 go on searching for Format_description events, until you exceed the
677 position (argument 'pos') or until you find an event other than
678 Previous-GTIDs, Rotate or Format_desc.
679 */
680 }
681 else if (ev->get_type_code() == binary_log::START_ENCRYPTION_EVENT)
682 {
683 if (rli_description_event->start_decryption(static_cast<Start_encryption_log_event*>(ev)))
684 {
685 *errmsg= "Unable to set up decryption of binlog.";
686 delete ev;
687 goto err;
688 }
689 delete ev;
690 }
691 else
692 {
693 DBUG_PRINT("info",("found event of another type=%d",
694 ev->get_type_code()));
695 keep_looking_for_fd=
696 (ev->get_type_code() == binary_log::ROTATE_EVENT ||
697 ev->get_type_code() == binary_log::PREVIOUS_GTIDS_LOG_EVENT);
698 delete ev;
699 }
700 }
701 my_b_seek(cur_log,(off_t)pos);
702 #ifndef NDEBUG
703 {
704 char llbuf1[22], llbuf2[22];
705 DBUG_PRINT("info", ("my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
706 llstr(my_b_tell(cur_log),llbuf1),
707 llstr(get_event_relay_log_pos(),llbuf2)));
708 }
709 #endif
710
711 }
712
713 err:
714 /*
715 If we don't purge, we can't honour relay_log_space_limit ;
716 silently discard it
717 */
718 if (!relay_log_purge)
719 {
720 log_space_limit= 0; // todo: consider to throw a warning at least
721 }
722 mysql_cond_broadcast(&data_cond);
723
724 mysql_mutex_unlock(log_lock);
725
726 if (need_data_lock)
727 mysql_mutex_unlock(&data_lock);
728 if (!rli_description_event->is_valid() && !*errmsg)
729 *errmsg= "Invalid Format_description log event; could be out of memory";
730
731 DBUG_RETURN ((*errmsg) ? 1 : 0);
732 }
733
734 /**
735 Update the error number, message and timestamp fields. This function is
736 different from va_report() as va_report() also logs the error message in the
737 log apart from updating the error fields.
738
739 SYNOPSIS
740 @param[in] level specifies the level- error, warning or information,
741 @param[in] err_code error number,
742 @param[in] buff_coord error message to be used.
743
744 */
fill_coord_err_buf(loglevel level,int err_code,const char * buff_coord) const745 void Relay_log_info::fill_coord_err_buf(loglevel level, int err_code,
746 const char *buff_coord) const
747 {
748 mysql_mutex_lock(&err_lock);
749
750 if(level == ERROR_LEVEL)
751 {
752 m_last_error.number = err_code;
753 my_snprintf(m_last_error.message, sizeof(m_last_error.message), "%.*s",
754 MAX_SLAVE_ERRMSG - 1, buff_coord);
755 m_last_error.update_timestamp();
756 }
757
758 mysql_mutex_unlock(&err_lock);
759 }
760
761 /**
762 Waits until the SQL thread reaches (has executed up to) the
763 log/position or timed out.
764
765 SYNOPSIS
766 @param[in] thd client thread that sent @c SELECT @c MASTER_POS_WAIT,
767 @param[in] log_name log name to wait for,
768 @param[in] log_pos position to wait for,
769 @param[in] timeout @c timeout in seconds before giving up waiting.
770 @c timeout is double whereas it should be ulong; but this is
771 to catch if the user submitted a negative timeout.
772
773 @retval -2 improper arguments (log_pos<0)
774 or slave not running, or master info changed
775 during the function's execution,
776 or client thread killed. -2 is translated to NULL by caller,
777 @retval -1 timed out
778 @retval >=0 number of log events the function had to wait
779 before reaching the desired log/position
780 */
781
wait_for_pos(THD * thd,String * log_name,longlong log_pos,double timeout)782 int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
783 longlong log_pos,
784 double timeout)
785 {
786 int event_count = 0;
787 ulong init_abort_pos_wait;
788 int error=0;
789 struct timespec abstime; // for timeout checking
790 PSI_stage_info old_stage;
791 DBUG_ENTER("Relay_log_info::wait_for_pos");
792
793 if (!inited)
794 DBUG_RETURN(-2);
795
796 DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
797 log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));
798
799 DEBUG_SYNC(thd, "begin_master_pos_wait");
800
801 set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
802 mysql_mutex_lock(&data_lock);
803 thd->ENTER_COND(&data_cond, &data_lock,
804 &stage_waiting_for_the_slave_thread_to_advance_position,
805 &old_stage);
806 /*
807 This function will abort when it notices that some CHANGE MASTER or
808 RESET MASTER has changed the master info.
809 To catch this, these commands modify abort_pos_wait ; We just monitor
810 abort_pos_wait and see if it has changed.
811 Why do we have this mechanism instead of simply monitoring slave_running
812 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
813 the SQL thread be stopped?
814 This is becasue if someones does:
815 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
816 the change may happen very quickly and we may not notice that
817 slave_running briefly switches between 1/0/1.
818 */
819 init_abort_pos_wait= abort_pos_wait;
820
821 /*
822 We'll need to
823 handle all possible log names comparisons (e.g. 999 vs 1000).
824 We use ulong for string->number conversion ; this is no
825 stronger limitation than in find_uniq_filename in sql/log.cc
826 */
827 ulong log_name_extension;
828 char log_name_tmp[FN_REFLEN]; //make a char[] from String
829
830 strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));
831
832 char *p= fn_ext(log_name_tmp);
833 char *p_end;
834 if (!*p || log_pos<0)
835 {
836 error= -2; //means improper arguments
837 goto err;
838 }
839 // Convert 0-3 to 4
840 log_pos= max(log_pos, static_cast<longlong>(BIN_LOG_HEADER_SIZE));
841 /* p points to '.' */
842 log_name_extension= strtoul(++p, &p_end, 10);
843 /*
844 p_end points to the first invalid character.
845 If it equals to p, no digits were found, error.
846 If it contains '\0' it means conversion went ok.
847 */
848 if (p_end==p || *p_end)
849 {
850 error= -2;
851 goto err;
852 }
853
854 /* The "compare and wait" main loop */
855 while (!thd->killed &&
856 init_abort_pos_wait == abort_pos_wait &&
857 slave_running)
858 {
859 bool pos_reached;
860 int cmp_result= 0;
861 const char * const group_master_log_name= get_group_master_log_name();
862 const ulonglong group_master_log_pos= get_group_master_log_pos();
863
864 DBUG_PRINT("info",
865 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
866 init_abort_pos_wait, abort_pos_wait));
867 DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
868 group_master_log_name, (ulong) group_master_log_pos));
869
870 /*
871 group_master_log_name can be "", if we are just after a fresh
872 replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
873 (before we have executed one Rotate event from the master) or
874 (rare) if the user is doing a weird slave setup (see next
875 paragraph). If group_master_log_name is "", we assume we don't
876 have enough info to do the comparison yet, so we just wait until
877 more data. In this case master_log_pos is always 0 except if
878 somebody (wrongly) sets this slave to be a slave of itself
879 without using --replicate-same-server-id (an unsupported
880 configuration which does nothing), then group_master_log_pos
881 will grow and group_master_log_name will stay "".
882 Also in case the group master log position is invalid (e.g. after
883 CHANGE MASTER TO RELAY_LOG_POS ), we will wait till the first event
884 is read and the log position is valid again.
885 */
886 if (*group_master_log_name && !is_group_master_log_pos_invalid)
887 {
888 const char * const basename= (group_master_log_name +
889 dirname_length(group_master_log_name));
890 /*
891 First compare the parts before the extension.
892 Find the dot in the master's log basename,
893 and protect against user's input error :
894 if the names do not match up to '.' included, return error
895 */
896 char *q= (fn_ext(basename)+1);
897 if (strncmp(basename, log_name_tmp, (int)(q-basename)))
898 {
899 error= -2;
900 break;
901 }
902 // Now compare extensions.
903 char *q_end;
904 ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
905 if (group_master_log_name_extension < log_name_extension)
906 cmp_result= -1 ;
907 else
908 cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
909
910 pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
911 cmp_result > 0);
912 if (pos_reached || thd->killed)
913 break;
914 }
915
916 //wait for master update, with optional timeout.
917
918 DBUG_PRINT("info",("Waiting for master update"));
919 /*
920 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
921 will wake us up.
922 */
923 thd_wait_begin(thd, THD_WAIT_BINLOG);
924 if (timeout > 0)
925 {
926 /*
927 Note that mysql_cond_timedwait checks for the timeout
928 before for the condition ; i.e. it returns ETIMEDOUT
929 if the system time equals or exceeds the time specified by abstime
930 before the condition variable is signaled or broadcast, _or_ if
931 the absolute time specified by abstime has already passed at the time
932 of the call.
933 For that reason, mysql_cond_timedwait will do the "timeoutting" job
934 even if its condition is always immediately signaled (case of a loaded
935 master).
936 */
937 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
938 }
939 else
940 mysql_cond_wait(&data_cond, &data_lock);
941 thd_wait_end(thd);
942 DBUG_PRINT("info",("Got signal of master update or timed out"));
943 if (error == ETIMEDOUT || error == ETIME)
944 {
945 #ifndef NDEBUG
946 /*
947 Doing this to generate a stack trace and make debugging
948 easier.
949 */
950 if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
951 assert(0);
952 #endif
953 error= -1;
954 break;
955 }
956 error=0;
957 event_count++;
958 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
959 }
960
961 err:
962 mysql_mutex_unlock(&data_lock);
963 thd->EXIT_COND(&old_stage);
964 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
965 improper_arguments: %d timed_out: %d",
966 thd->killed_errno(),
967 (int) (init_abort_pos_wait != abort_pos_wait),
968 (int) slave_running,
969 (int) (error == -2),
970 (int) (error == -1)));
971 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
972 !slave_running)
973 {
974 error= -2;
975 }
976 DBUG_RETURN( error ? error : event_count );
977 }
978
wait_for_gtid_set(THD * thd,String * gtid,double timeout)979 int Relay_log_info::wait_for_gtid_set(THD* thd, String* gtid,
980 double timeout)
981 {
982 DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, String, timeout)");
983
984 DBUG_PRINT("info", ("Waiting for %s timeout %lf", gtid->c_ptr_safe(),
985 timeout));
986
987 Gtid_set wait_gtid_set(global_sid_map);
988 global_sid_lock->rdlock();
989
990 if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
991 {
992 global_sid_lock->unlock();
993 DBUG_PRINT("exit",("improper gtid argument"));
994 DBUG_RETURN(-2);
995
996 }
997 global_sid_lock->unlock();
998
999 DBUG_RETURN(wait_for_gtid_set(thd, &wait_gtid_set, timeout));
1000 }
1001
1002 /*
1003 TODO: This is a duplicated code that needs to be simplified.
1004 This will be done while developing all possible sync options.
1005 See WL#3584's specification.
1006
1007 /Alfranio
1008 */
wait_for_gtid_set(THD * thd,const Gtid_set * wait_gtid_set,double timeout)1009 int Relay_log_info::wait_for_gtid_set(THD* thd, const Gtid_set* wait_gtid_set,
1010 double timeout)
1011 {
1012 int event_count = 0;
1013 ulong init_abort_pos_wait;
1014 int error=0;
1015 struct timespec abstime; // for timeout checking
1016 PSI_stage_info old_stage;
1017 DBUG_ENTER("Relay_log_info::wait_for_gtid_set(thd, gtid_set, timeout)");
1018
1019 if (!inited)
1020 DBUG_RETURN(-2);
1021
1022 DEBUG_SYNC(thd, "begin_wait_for_gtid_set");
1023
1024 set_timespec_nsec(&abstime, static_cast<ulonglong>(timeout * 1000000000ULL));
1025
1026 mysql_mutex_lock(&data_lock);
1027 thd->ENTER_COND(&data_cond, &data_lock,
1028 &stage_waiting_for_the_slave_thread_to_advance_position,
1029 &old_stage);
1030 /*
1031 This function will abort when it notices that some CHANGE MASTER or
1032 RESET MASTER has changed the master info.
1033 To catch this, these commands modify abort_pos_wait ; We just monitor
1034 abort_pos_wait and see if it has changed.
1035 Why do we have this mechanism instead of simply monitoring slave_running
1036 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
1037 the SQL thread be stopped?
1038 This is becasue if someones does:
1039 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
1040 the change may happen very quickly and we may not notice that
1041 slave_running briefly switches between 1/0/1.
1042 */
1043 init_abort_pos_wait= abort_pos_wait;
1044
1045 /* The "compare and wait" main loop */
1046 while (!thd->killed &&
1047 init_abort_pos_wait == abort_pos_wait &&
1048 slave_running)
1049 {
1050 DBUG_PRINT("info",
1051 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
1052 init_abort_pos_wait, abort_pos_wait));
1053
1054 //wait for master update, with optional timeout.
1055
1056 global_sid_lock->wrlock();
1057 const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1058 const Owned_gtids* owned_gtids= gtid_state->get_owned_gtids();
1059
1060 char *wait_gtid_set_buf;
1061 wait_gtid_set->to_string(&wait_gtid_set_buf);
1062 DBUG_PRINT("info", ("Waiting for '%s'. is_subset: %d and "
1063 "!is_intersection_nonempty: %d",
1064 wait_gtid_set_buf,
1065 wait_gtid_set->is_subset(executed_gtids),
1066 !owned_gtids->is_intersection_nonempty(wait_gtid_set)));
1067 my_free(wait_gtid_set_buf);
1068 executed_gtids->dbug_print("gtid_executed:");
1069 owned_gtids->dbug_print("owned_gtids:");
1070
1071 /*
1072 Since commit is performed after log to binary log, we must also
1073 check if any GTID of wait_gtid_set is not yet committed.
1074 */
1075 if (wait_gtid_set->is_subset(executed_gtids) &&
1076 !owned_gtids->is_intersection_nonempty(wait_gtid_set))
1077 {
1078 global_sid_lock->unlock();
1079 break;
1080 }
1081 global_sid_lock->unlock();
1082
1083 DBUG_PRINT("info",("Waiting for master update"));
1084
1085 /*
1086 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
1087 will wake us up.
1088 */
1089 thd_wait_begin(thd, THD_WAIT_BINLOG);
1090 if (timeout > 0)
1091 {
1092 /*
1093 Note that mysql_cond_timedwait checks for the timeout
1094 before for the condition ; i.e. it returns ETIMEDOUT
1095 if the system time equals or exceeds the time specified by abstime
1096 before the condition variable is signaled or broadcast, _or_ if
1097 the absolute time specified by abstime has already passed at the time
1098 of the call.
1099 For that reason, mysql_cond_timedwait will do the "timeoutting" job
1100 even if its condition is always immediately signaled (case of a loaded
1101 master).
1102 */
1103 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
1104 }
1105 else
1106 mysql_cond_wait(&data_cond, &data_lock);
1107 thd_wait_end(thd);
1108 DBUG_PRINT("info",("Got signal of master update or timed out"));
1109 if (error == ETIMEDOUT || error == ETIME)
1110 {
1111 #ifndef NDEBUG
1112 /*
1113 Doing this to generate a stack trace and make debugging
1114 easier.
1115 */
1116 if (DBUG_EVALUATE_IF("debug_crash_slave_time_out", 1, 0))
1117 assert(0);
1118 #endif
1119 error= -1;
1120 break;
1121 }
1122 error=0;
1123 event_count++;
1124 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
1125 }
1126
1127 mysql_mutex_unlock(&data_lock);
1128 thd->EXIT_COND(&old_stage);
1129 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
1130 improper_arguments: %d timed_out: %d",
1131 thd->killed_errno(),
1132 (int) (init_abort_pos_wait != abort_pos_wait),
1133 (int) slave_running,
1134 (int) (error == -2),
1135 (int) (error == -1)));
1136 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
1137 !slave_running)
1138 {
1139 error= -2;
1140 }
1141 DBUG_RETURN( error ? error : event_count );
1142 }
1143
inc_group_relay_log_pos(ulonglong log_pos,bool need_data_lock)1144 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
1145 bool need_data_lock)
1146 {
1147 int error= 0;
1148 DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
1149
1150 if (need_data_lock)
1151 {
1152 const ulong timeout= info_thd->variables.lock_wait_timeout;
1153
1154 /*
1155 Acquire protection against global BINLOG lock before rli->data_lock is
1156 locked (otherwise we would also block SHOW SLAVE STATUS).
1157 */
1158 assert(!info_thd->backup_binlog_lock.is_acquired());
1159 DBUG_PRINT("debug", ("Acquiring binlog protection lock"));
1160 mysql_mutex_assert_not_owner(&data_lock);
1161 if (info_thd->backup_binlog_lock.acquire_protection(info_thd, MDL_EXPLICIT,
1162 timeout))
1163 DBUG_RETURN(1);
1164
1165 mysql_mutex_lock(&data_lock);
1166 }
1167 else
1168 {
1169 mysql_mutex_assert_owner(&data_lock);
1170 assert(info_thd->backup_binlog_lock.is_protection_acquired());
1171 }
1172
1173 inc_event_relay_log_pos();
1174 group_relay_log_pos= event_relay_log_pos;
1175 strmake(group_relay_log_name,event_relay_log_name,
1176 sizeof(group_relay_log_name)-1);
1177
1178 notify_group_relay_log_name_update();
1179
1180 /*
1181 In 4.x we used the event's len to compute the positions here. This is
1182 wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1183 then the event's len is not what is was in the master's binlog, so this
1184 will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1185 replication: Exec_master_log_pos is wrong). Only way to solve this is to
1186 have the original offset of the end of the event the relay log. This is
1187 what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1188 of log_pos in 4.0 was to compute the end_log_pos; so better to store
1189 end_log_pos instead of begin_log_pos.
1190 If we had not done this fix here, the problem would also have appeared
1191 when the slave and master are 5.0 but with different event length (for
1192 example the slave is more recent than the master and features the event
1193 UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1194 SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1195 value which would lead to badly broken replication.
1196 Even the relay_log_pos will be corrupted in this case, because the len is
1197 the relay log is not "val".
1198 With the end_log_pos solution, we avoid computations involving lengthes.
1199 */
1200 DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
1201 (long) log_pos, (long) get_group_master_log_pos()));
1202
1203 if (log_pos > 0) // 3.23 binlogs don't have log_posx
1204 set_group_master_log_pos(log_pos);
1205 /*
1206 If the master log position was invalidiated by say, "CHANGE MASTER TO
1207 RELAY_LOG_POS=N", it is now valid,
1208 */
1209 if (is_group_master_log_pos_invalid)
1210 is_group_master_log_pos_invalid= false;
1211
1212 /*
1213 In MTS mode FD or Rotate event commit their solitary group to
1214 Coordinator's info table. Callers make sure that Workers have been
1215 executed all assignements.
1216 Broadcast to master_pos_wait() waiters should be done after
1217 the table is updated.
1218 */
1219 assert(!is_parallel_exec() ||
1220 mts_group_status != Relay_log_info::MTS_IN_GROUP);
1221 /*
1222 We do not force synchronization at this point, note the
1223 parameter false, because a non-transactional change is
1224 being committed.
1225
1226 For that reason, the synchronization here is subjected to
1227 the option sync_relay_log_info.
1228
1229 See sql/rpl_rli.h for further information on this behavior.
1230 */
1231 error= flush_info(FALSE);
1232
1233 mysql_cond_broadcast(&data_cond);
1234 if (need_data_lock)
1235 {
1236 mysql_mutex_unlock(&data_lock);
1237
1238 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1239 info_thd->backup_binlog_lock.release_protection(info_thd);
1240 }
1241
1242 DBUG_RETURN(error);
1243 }
1244
1245
close_temporary_tables()1246 void Relay_log_info::close_temporary_tables()
1247 {
1248 TABLE *table,*next;
1249 int num_closed_temp_tables= 0;
1250 DBUG_ENTER("Relay_log_info::close_temporary_tables");
1251
1252 for (table=save_temporary_tables ; table ; table=next)
1253 {
1254 next=table->next;
1255 /*
1256 Don't ask for disk deletion. For now, anyway they will be deleted when
1257 slave restarts, but it is a better intention to not delete them.
1258 */
1259 DBUG_PRINT("info", ("table: 0x%lx", (long) table));
1260 close_temporary(table, 1, 0);
1261 num_closed_temp_tables++;
1262 }
1263 save_temporary_tables= 0;
1264 slave_open_temp_tables.atomic_add(-num_closed_temp_tables);
1265 channel_open_temp_tables.atomic_add(-num_closed_temp_tables);
1266 DBUG_VOID_RETURN;
1267 }
1268
1269 /**
1270 Purges relay logs. It assumes to have a run lock on rli and that no
1271 slave thread are running.
1272
1273 @param[in] THD connection,
1274 @param[in] just_reset if false, it tells that logs should be purged
1275 and @c init_relay_log_pos() should be called,
1276 @errmsg[out] errmsg store pointer to an error message.
1277
1278 @retval 0 successfuly executed,
1279 @retval 1 otherwise error, where errmsg is set to point to the error message.
1280 */
1281
purge_relay_logs(THD * thd,bool just_reset,const char ** errmsg,bool delete_only)1282 int Relay_log_info::purge_relay_logs(THD *thd, bool just_reset,
1283 const char** errmsg, bool delete_only)
1284 {
1285 int error=0;
1286 const char *ln;
1287 /* name of the index file if opt_relaylog_index_name is set*/
1288 const char* log_index_name;
1289 /*
1290 Buffer to add channel name suffix when relay-log-index option is
1291 provided
1292 */
1293 char relay_bin_index_channel[FN_REFLEN];
1294
1295 const char *ln_without_channel_name;
1296 /*
1297 Buffer to add channel name suffix when relay-log option is provided.
1298 */
1299 char relay_bin_channel[FN_REFLEN];
1300
1301 char buffer[FN_REFLEN];
1302
1303 mysql_mutex_t *log_lock= relay_log.get_log_lock();
1304
1305 DBUG_ENTER("Relay_log_info::purge_relay_logs");
1306 bool binlog_prot_acquired= false;
1307
1308 /*
1309 Even if inited==0, we still try to empty master_log_* variables. Indeed,
1310 inited==0 does not imply that they already are empty.
1311
1312 It could be that slave's info initialization partly succeeded: for example
1313 if relay-log.info existed but *relay-bin*.* have been manually removed,
1314 init_info reads the old relay-log.info and fills rli->master_log_*, then
1315 init_info checks for the existence of the relay log, this fails and
1316 init_info leaves inited to 0.
1317 In that pathological case, master_log_pos* will be properly reinited at
1318 the next START SLAVE (as RESET SLAVE or CHANGE MASTER, the callers of
1319 purge_relay_logs, will delete bogus *.info files or replace them with
1320 correct files), however if the user does SHOW SLAVE STATUS before START
1321 SLAVE, he will see old, confusing master_log_*. In other words, we reinit
1322 master_log_* for SHOW SLAVE STATUS to display fine in any case.
1323 */
1324
1325 if (!thd->backup_binlog_lock.is_acquired())
1326 {
1327 const ulong timeout= thd->variables.lock_wait_timeout;
1328
1329 DBUG_PRINT("debug", ("Acquiring binlog protection lock"));
1330 mysql_mutex_assert_not_owner(&data_lock);
1331 if (thd->backup_binlog_lock.acquire_protection(thd, MDL_EXPLICIT,
1332 timeout))
1333 DBUG_RETURN(1);
1334
1335 binlog_prot_acquired= true;
1336 }
1337
1338 set_group_master_log_name("");
1339 set_group_master_log_pos(0);
1340
1341 /*
1342 Following the the relay log purge, the master_log_pos will be in sync
1343 with relay_log_pos, so the flag should be cleared. Refer bug#11766010.
1344 */
1345
1346 is_group_master_log_pos_invalid= false;
1347
1348 if (!inited)
1349 {
1350 DBUG_PRINT("info", ("inited == 0"));
1351 if (error_on_rli_init_info ||
1352 /*
1353 mi->reset means that the channel was reset but still exists. Channel
1354 shall have the index and the first relay log file.
1355
1356 Those files shall be remove in a following RESET SLAVE ALL (even when
1357 channel was not inited again).
1358 */
1359 (mi->reset && delete_only))
1360 {
1361 ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
1362 "-relay-bin", buffer);
1363
1364 ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
1365 ln_without_channel_name);
1366 if (opt_relaylog_index_name)
1367 {
1368 char index_file_withoutext[FN_REFLEN];
1369 relay_log.generate_name(opt_relaylog_index_name,"",
1370 index_file_withoutext);
1371
1372 log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
1373 FN_REFLEN,
1374 index_file_withoutext);
1375 }
1376 else
1377 log_index_name= 0;
1378
1379 if (relay_log.open_index_file(log_index_name, ln, TRUE))
1380 {
1381 sql_print_error("Unable to purge relay log files. Failed to open relay "
1382 "log index file:%s.", relay_log.get_index_fname());
1383 if (binlog_prot_acquired)
1384 {
1385 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1386 thd->backup_binlog_lock.release_protection(thd);
1387 }
1388 DBUG_RETURN(1);
1389 }
1390 mysql_mutex_lock(&mi->data_lock);
1391 mysql_mutex_lock(log_lock);
1392 if (relay_log.open_binlog(ln, 0,
1393 (max_relay_log_size ? max_relay_log_size :
1394 max_binlog_size), true,
1395 true/*need_lock_index=true*/,
1396 true/*need_sid_lock=true*/,
1397 mi->get_mi_description_event()))
1398 {
1399 mysql_mutex_unlock(log_lock);
1400 mysql_mutex_unlock(&mi->data_lock);
1401 sql_print_error("Unable to purge relay log files. Failed to open relay "
1402 "log file:%s.", relay_log.get_log_fname());
1403 if (binlog_prot_acquired)
1404 {
1405 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1406 thd->backup_binlog_lock.release_protection(thd);
1407 }
1408 DBUG_RETURN(1);
1409 }
1410 mysql_mutex_unlock(log_lock);
1411 mysql_mutex_unlock(&mi->data_lock);
1412 }
1413 else
1414 {
1415 if (binlog_prot_acquired)
1416 {
1417 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1418 thd->backup_binlog_lock.release_protection(thd);
1419 }
1420 DBUG_RETURN(0);
1421 }
1422 }
1423 else
1424 {
1425 assert(slave_running == 0);
1426 assert(mi->slave_running == 0);
1427 }
1428 /* Reset the transaction boundary parser and clear the last GTID queued */
1429 mi->transaction_parser.reset();
1430 mi->clear_last_gtid_queued();
1431
1432 slave_skip_counter= 0;
1433 mysql_mutex_lock(&data_lock);
1434
1435 /*
1436 we close the relay log fd possibly left open by the slave SQL thread,
1437 to be able to delete it; the relay log fd possibly left open by the slave
1438 I/O thread will be closed naturally in reset_logs() by the
1439 close(LOG_CLOSE_TO_BE_OPENED) call
1440 */
1441 if (cur_log_fd >= 0)
1442 {
1443 end_io_cache(&cache_buf);
1444 my_close(cur_log_fd, MYF(MY_WME));
1445 cur_log_fd= -1;
1446 }
1447
1448 /**
1449 Clear the retrieved gtid set for this channel.
1450 global_sid_lock->wrlock() is needed.
1451 */
1452 global_sid_lock->wrlock();
1453 (const_cast<Gtid_set *>(get_gtid_set()))->clear();
1454 global_sid_lock->unlock();
1455
1456 if (relay_log.reset_logs(thd, delete_only))
1457 {
1458 *errmsg = "Failed during log reset";
1459 error=1;
1460 goto err;
1461 }
1462
1463 /* Save name of used relay log file */
1464 set_group_relay_log_name(relay_log.get_log_fname());
1465 set_event_relay_log_name(relay_log.get_log_fname());
1466 group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1467 if (!delete_only && count_relay_log_space())
1468 {
1469 *errmsg= "Error counting relay log space";
1470 error= 1;
1471 goto err;
1472 }
1473 if (!just_reset)
1474 error= init_relay_log_pos(group_relay_log_name,
1475 group_relay_log_pos,
1476 false/*need_data_lock=false*/, errmsg, 0);
1477 if (!inited && error_on_rli_init_info)
1478 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
1479 true/*need_lock_log=true*/,
1480 true/*need_lock_index=true*/);
1481 err:
1482 #ifndef NDEBUG
1483 char buf[22];
1484 #endif
1485 DBUG_PRINT("info",("log_space_total: %s",llstr(log_space_total,buf)));
1486 mysql_mutex_unlock(&data_lock);
1487
1488 if (binlog_prot_acquired)
1489 {
1490 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
1491 thd->backup_binlog_lock.release_protection(thd);
1492 }
1493
1494 DBUG_RETURN(error);
1495 }
1496
1497 /*
1498 When --relay-bin option is not provided, the names of the
1499 relay log files are host-relay-bin.0000x or
1500 host-relay-bin-CHANNEL.00000x in the case of MSR.
1501 However, if that option is provided, then the names of the
1502 relay log files are <relay-bin-option>.0000x or
1503 <relay-bin-option>-CHANNEL.00000x in the case of MSR.
1504
1505 The function adds a channel suffix (according to the channel to file name
1506 conventions and conversions) to the relay log file.
1507
1508 @todo: truncate the log file if length exceeds.
1509 */
1510
1511 const char*
add_channel_to_relay_log_name(char * buff,uint buff_size,const char * base_name)1512 Relay_log_info::add_channel_to_relay_log_name(char *buff, uint buff_size,
1513 const char *base_name)
1514 {
1515 char *ptr;
1516 char channel_to_file[FN_REFLEN];
1517 uint errors, length;
1518 uint base_name_len;
1519 uint suffix_buff_size;
1520
1521 assert(base_name !=NULL);
1522
1523 base_name_len= strlen(base_name);
1524 suffix_buff_size= buff_size - base_name_len;
1525
1526 ptr= strmake(buff, base_name, buff_size-1);
1527
1528 if (channel[0])
1529 {
1530
1531 /* adding a "-" */
1532 ptr= strmake(ptr, "-", suffix_buff_size-1);
1533
1534 /*
1535 Convert the channel name to the file names charset.
1536 Channel name is in system_charset which is UTF8_general_ci
1537 as it was defined as utf8 in the mysql.slaveinfo tables.
1538 */
1539 length= strconvert(system_charset_info, channel, &my_charset_filename,
1540 channel_to_file, NAME_LEN, &errors);
1541 ptr= strmake(ptr, channel_to_file, suffix_buff_size-length-1);
1542
1543 }
1544
1545 return (const char*)buff;
1546 }
1547
1548
1549 /**
1550 Checks if condition stated in UNTIL clause of START SLAVE is reached.
1551
1552 Specifically, it checks if UNTIL condition is reached. Uses caching result
1553 of last comparison of current log file name and target log file name. So
1554 cached value should be invalidated if current log file name changes (see
1555 @c Relay_log_info::notify_... functions).
1556
1557 This caching is needed to avoid of expensive string comparisons and
1558 @c strtol() conversions needed for log names comparison. We don't need to
1559 compare them each time this function is called, we only need to do this
1560 when current log name changes. If we have @c UNTIL_MASTER_POS condition we
1561 need to do this only after @c Rotate_log_event::do_apply_event() (which is
1562 rare, so caching gives real benifit), and if we have @c UNTIL_RELAY_POS
1563 condition then we should invalidate cached comarison value after
1564 @c inc_group_relay_log_pos() which called for each group of events (so we
1565 have some benefit if we have something like queries that use
1566 autoincrement or if we have transactions).
1567
1568 Should be called ONLY if @c until_condition @c != @c UNTIL_NONE !
1569
1570 @param master_beg_pos position of the beginning of to be executed event
1571 (not @c log_pos member of the event that points to
1572 the beginning of the following event)
1573
1574 @retval true condition met or error happened (condition seems to have
1575 bad log file name),
1576 @retval false condition not met.
1577 */
1578
is_until_satisfied(THD * thd,Log_event * ev)1579 bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
1580 {
1581 char error_msg[]= "Slave SQL thread is stopped because UNTIL "
1582 "condition is bad.";
1583 DBUG_ENTER("Relay_log_info::is_until_satisfied");
1584
1585 switch (until_condition)
1586 {
1587 case UNTIL_MASTER_POS:
1588 case UNTIL_RELAY_POS:
1589 {
1590 const char *log_name= NULL;
1591 ulonglong log_pos= 0;
1592
1593 if (until_condition == UNTIL_MASTER_POS)
1594 {
1595 if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
1596 DBUG_RETURN(false);
1597 /*
1598 Rotate events originating from the slave have server_id==0,
1599 and their log_pos is relative to the slave, so in case their
1600 log_pos is greater than the log_pos we are waiting for, they
1601 can cause the slave to stop prematurely. So we ignore such
1602 events.
1603 */
1604 if (ev && ev->server_id == 0)
1605 DBUG_RETURN(false);
1606 log_name= get_group_master_log_name();
1607 if (!ev || is_in_group() || !ev->common_header->log_pos)
1608 log_pos= get_group_master_log_pos();
1609 else
1610 log_pos= ev->common_header->log_pos - ev->common_header->data_written;
1611 }
1612 else
1613 { /* until_condition == UNTIL_RELAY_POS */
1614 log_name= group_relay_log_name;
1615 log_pos= group_relay_log_pos;
1616 }
1617
1618 #ifndef NDEBUG
1619 {
1620 char buf[32];
1621 DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
1622 get_group_master_log_name(),
1623 llstr(get_group_master_log_pos(), buf)));
1624 DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
1625 group_relay_log_name, llstr(group_relay_log_pos, buf)));
1626 DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
1627 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1628 log_name, llstr(log_pos, buf)));
1629 DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
1630 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1631 until_log_name, llstr(until_log_pos, buf)));
1632 }
1633 #endif
1634
1635 if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1636 {
1637 /*
1638 We have no cached comparison results so we should compare log names
1639 and cache result.
1640 If we are after RESET SLAVE, and the SQL slave thread has not processed
1641 any event yet, it could be that group_master_log_name is "". In that case,
1642 just wait for more events (as there is no sensible comparison to do).
1643 */
1644
1645 if (*log_name)
1646 {
1647 const char *basename= log_name + dirname_length(log_name);
1648
1649 const char *q= (const char*)(fn_ext(basename)+1);
1650 if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1651 {
1652 /* Now compare extensions. */
1653 char *q_end;
1654 ulong log_name_extension= strtoul(q, &q_end, 10);
1655 if (log_name_extension < until_log_name_extension)
1656 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1657 else
1658 until_log_names_cmp_result=
1659 (log_name_extension > until_log_name_extension) ?
1660 UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1661 }
1662 else
1663 {
1664 /* Base names do not match, so we abort */
1665 sql_print_error("%s", error_msg);
1666 DBUG_RETURN(true);
1667 }
1668 }
1669 else
1670 DBUG_RETURN(until_log_pos == 0);
1671 }
1672
1673 if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1674 log_pos >= until_log_pos) ||
1675 until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
1676 {
1677 char buf[22];
1678 sql_print_information("Slave SQL thread stopped because it reached its"
1679 " UNTIL position %s", llstr(until_pos(), buf));
1680 DBUG_RETURN(true);
1681 }
1682 DBUG_RETURN(false);
1683 }
1684
1685 case UNTIL_SQL_BEFORE_GTIDS:
1686 /*
1687 We only need to check once if executed_gtids set
1688 contains any of the until_sql_gtids.
1689 */
1690 if (until_sql_gtids_first_event)
1691 {
1692 until_sql_gtids_first_event= false;
1693 global_sid_lock->wrlock();
1694 /* Check if until GTIDs were already applied. */
1695 const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1696 if (until_sql_gtids.is_intersection_nonempty(executed_gtids))
1697 {
1698 char *buffer;
1699 until_sql_gtids.to_string(&buffer);
1700 global_sid_lock->unlock();
1701 sql_print_information("Slave SQL thread stopped because "
1702 "UNTIL SQL_BEFORE_GTIDS %s is already "
1703 "applied", buffer);
1704 my_free(buffer);
1705 DBUG_RETURN(true);
1706 }
1707 global_sid_lock->unlock();
1708 }
1709 if (ev != NULL && ev->get_type_code() == binary_log::GTID_LOG_EVENT)
1710 {
1711 Gtid_log_event *gev= (Gtid_log_event *)ev;
1712 global_sid_lock->rdlock();
1713 if (until_sql_gtids.contains_gtid(gev->get_sidno(false), gev->get_gno()))
1714 {
1715 char *buffer;
1716 until_sql_gtids.to_string(&buffer);
1717 global_sid_lock->unlock();
1718 sql_print_information("Slave SQL thread stopped because it reached "
1719 "UNTIL SQL_BEFORE_GTIDS %s", buffer);
1720 my_free(buffer);
1721 DBUG_RETURN(true);
1722 }
1723 global_sid_lock->unlock();
1724 }
1725 DBUG_RETURN(false);
1726 break;
1727
1728 case UNTIL_SQL_AFTER_GTIDS:
1729 {
1730 global_sid_lock->wrlock();
1731 const Gtid_set* executed_gtids= gtid_state->get_executed_gtids();
1732 if (until_sql_gtids.is_subset(executed_gtids))
1733 {
1734 char *buffer;
1735 until_sql_gtids.to_string(&buffer);
1736 global_sid_lock->unlock();
1737 sql_print_information("Slave SQL thread stopped because it reached "
1738 "UNTIL SQL_AFTER_GTIDS %s", buffer);
1739 my_free(buffer);
1740 DBUG_RETURN(true);
1741 }
1742 global_sid_lock->unlock();
1743 DBUG_RETURN(false);
1744 }
1745 break;
1746
1747 case UNTIL_SQL_AFTER_MTS_GAPS:
1748 case UNTIL_DONE:
1749 /*
1750 TODO: this condition is actually post-execution or post-scheduling
1751 so the proper place to check it before SQL thread goes
1752 into next_event() where it can wait while the condition
1753 has been satisfied already.
1754 It's deployed here temporarily to be fixed along the regular UNTIL
1755 support for MTS is provided.
1756 */
1757 if (mts_recovery_group_cnt == 0)
1758 {
1759 sql_print_information("Slave SQL thread stopped according to "
1760 "UNTIL SQL_AFTER_MTS_GAPS as it has "
1761 "processed all gap transactions left from "
1762 "the previous slave session.");
1763 until_condition= UNTIL_DONE;
1764 DBUG_RETURN(true);
1765 }
1766 else
1767 {
1768 DBUG_RETURN(false);
1769 }
1770 break;
1771
1772 case UNTIL_SQL_VIEW_ID:
1773 if (ev != NULL && ev->get_type_code() == binary_log::VIEW_CHANGE_EVENT)
1774 {
1775 View_change_log_event *view_event= (View_change_log_event *)ev;
1776
1777 if (until_view_id.compare(view_event->get_view_id()) == 0)
1778 {
1779 set_group_replication_retrieved_certification_info(view_event);
1780 until_view_id_found= true;
1781 DBUG_RETURN(false);
1782 }
1783 }
1784
1785 if (until_view_id_found && ev != NULL && ev->ends_group())
1786 {
1787 until_view_id_commit_found= true;
1788 DBUG_RETURN(false);
1789 }
1790
1791 if (until_view_id_commit_found && ev == NULL)
1792 {
1793 DBUG_RETURN(true);
1794 }
1795
1796 DBUG_RETURN(false);
1797 break;
1798
1799 case UNTIL_NONE:
1800 assert(0);
1801 break;
1802 }
1803
1804 assert(0);
1805 DBUG_RETURN(false);
1806 }
1807
cached_charset_invalidate()1808 void Relay_log_info::cached_charset_invalidate()
1809 {
1810 DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
1811
1812 /* Full of zeroes means uninitialized. */
1813 memset(cached_charset, 0, sizeof(cached_charset));
1814 DBUG_VOID_RETURN;
1815 }
1816
1817
cached_charset_compare(char * charset) const1818 bool Relay_log_info::cached_charset_compare(char *charset) const
1819 {
1820 DBUG_ENTER("Relay_log_info::cached_charset_compare");
1821
1822 if (memcmp(cached_charset, charset, sizeof(cached_charset)))
1823 {
1824 memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
1825 DBUG_RETURN(1);
1826 }
1827 DBUG_RETURN(0);
1828 }
1829
1830
stmt_done(my_off_t event_master_log_pos)1831 int Relay_log_info::stmt_done(my_off_t event_master_log_pos)
1832 {
1833 int error= 0;
1834
1835 clear_flag(IN_STMT);
1836
1837 assert(!belongs_to_client());
1838 /* Worker does not execute binlog update position logics */
1839 assert(!is_mts_worker(info_thd));
1840
1841 /*
1842 Replication keeps event and group positions to specify the
1843 set of events that were executed.
1844 Event positions are incremented after processing each event
1845 whereas group positions are incremented when an event or a
1846 set of events is processed such as in a transaction and are
1847 committed or rolled back.
1848
1849 A transaction can be ended with a Query Event, i.e. either
1850 commit or rollback, or by a Xid Log Event. Query Event is
1851 used to terminate pseudo-transactions that are executed
1852 against non-transactional engines such as MyIsam. Xid Log
1853 Event denotes though that a set of changes executed
1854 against a transactional engine is about to commit.
1855
1856 Events' positions are incremented at stmt_done(). However,
1857 transactions that are ended with Xid Log Event have their
1858 group position incremented in the do_apply_event() and in
1859 the do_apply_event_work().
1860
1861 Notice that the type of the engine, i.e. where data and
1862 positions are stored, against what events are being applied
1863 are not considered in this logic.
1864
1865 Regarding the code that follows, notice that the executed
1866 group coordinates don't change if the current event is internal
1867 to the group. The same applies to MTS Coordinator when it
1868 handles a Format Descriptor event that appears in the middle
1869 of a group that is about to be assigned.
1870 */
1871 if ((!is_parallel_exec() && is_in_group()) ||
1872 mts_group_status != MTS_NOT_IN_GROUP)
1873 {
1874 inc_event_relay_log_pos();
1875 }
1876 else
1877 {
1878 if (is_parallel_exec())
1879 {
1880
1881 assert(!is_mts_worker(info_thd));
1882
1883 /*
1884 Format Description events only can drive MTS execution to this
1885 point. It is a special event group that is handled with
1886 synchronization. For that reason, the checkpoint routine is
1887 called here.
1888 */
1889 error= mts_checkpoint_routine(this, 0, false,
1890 true/*need_data_lock=true*/);
1891 }
1892 if (!error)
1893 error= inc_group_relay_log_pos(event_master_log_pos,
1894 true/*need_data_lock=true*/);
1895 }
1896
1897 return error;
1898 }
1899
1900 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
cleanup_context(THD * thd,bool error)1901 void Relay_log_info::cleanup_context(THD *thd, bool error)
1902 {
1903 DBUG_ENTER("Relay_log_info::cleanup_context");
1904
1905 assert(info_thd == thd);
1906 /*
1907 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1908 may have opened tables, which we cannot be sure have been closed (because
1909 maybe the Rows_log_event have not been found or will not be, because slave
1910 SQL thread is stopping, or relay log has a missing tail etc). So we close
1911 all thread's tables. And so the table mappings have to be cancelled.
1912 2) Rows_log_event::do_apply_event() may even have started statements or
1913 transactions on them, which we need to rollback in case of error.
1914 3) If finding a Format_description_log_event after a BEGIN, we also need
1915 to rollback before continuing with the next events.
1916 4) so we need this "context cleanup" function.
1917 */
1918 if (error)
1919 {
1920 trans_rollback_stmt(thd); // if a "statement transaction"
1921 trans_rollback(thd); // if a "real transaction"
1922 }
1923 if (rows_query_ev)
1924 {
1925 /*
1926 In order to avoid invalid memory access, THD::reset_query() should be
1927 called before deleting the rows_query event.
1928 */
1929 info_thd->reset_query();
1930 info_thd->reset_query_for_display();
1931 delete rows_query_ev;
1932 rows_query_ev= NULL;
1933 DBUG_EXECUTE_IF("after_deleting_the_rows_query_ev",
1934 {
1935 const char action[]="now SIGNAL deleted_rows_query_ev WAIT_FOR go_ahead";
1936 assert(!debug_sync_set_action(info_thd,
1937 STRING_WITH_LEN(action)));
1938 };);
1939 }
1940 m_table_map.clear_tables();
1941 slave_close_thread_tables(thd);
1942 if (error)
1943 {
1944 /*
1945 trans_rollback above does not rollback XA transactions.
1946 It could be done only after necessarily closing tables which dictates
1947 the following placement.
1948 */
1949 XID_STATE *xid_state= thd->get_transaction()->xid_state();
1950 if (!xid_state->has_state(XID_STATE::XA_NOTR))
1951 {
1952 assert(DBUG_EVALUATE_IF("simulate_commit_failure",1,
1953 xid_state->has_state(XID_STATE::XA_ACTIVE) ||
1954 xid_state->has_state(XID_STATE::XA_IDLE)
1955 ));
1956
1957 xa_trans_force_rollback(thd);
1958 xid_state->reset();
1959 cleanup_trans_state(thd);
1960 thd->rpl_unflag_detached_engine_ha_data();
1961 }
1962 thd->mdl_context.release_transactional_locks();
1963 }
1964 clear_flag(IN_STMT);
1965 /*
1966 Cleanup for the flags that have been set at do_apply_event.
1967 */
1968 thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1969 thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1970
1971 /*
1972 Reset state related to long_find_row notes in the error log:
1973 - timestamp
1974 - flag that decides whether the slave prints or not
1975 */
1976 reset_row_stmt_start_timestamp();
1977 unset_long_find_row_note_printed();
1978
1979 /*
1980 If the slave applier changed the current transaction isolation level,
1981 it need to be restored to the session default value once having the
1982 current transaction cleared.
1983
1984 We should call "trans_reset_one_shot_chistics()" only if the "error"
1985 flag is "true", because "cleanup_context()" is called at the end of each
1986 set of Table_maps/Rows representing a statement (when the rows event
1987 is tagged with the STMT_END_F) with the "error" flag as "false".
1988
1989 So, without the "if (error)" below, the isolation level might be reset
1990 in the middle of a pure row based transaction.
1991 */
1992 if (error)
1993 trans_reset_one_shot_chistics(thd);
1994
1995 DBUG_VOID_RETURN;
1996 }
1997
clear_tables_to_lock()1998 void Relay_log_info::clear_tables_to_lock()
1999 {
2000 DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
2001 #ifndef NDEBUG
2002 /**
2003 When replicating in RBR and MyISAM Merge tables are involved
2004 open_and_lock_tables (called in do_apply_event) appends the
2005 base tables to the list of tables_to_lock. Then these are
2006 removed from the list in close_thread_tables (which is called
2007 before we reach this point).
2008
2009 This assertion just confirms that we get no surprises at this
2010 point.
2011 */
2012 uint i=0;
2013 for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
2014 assert(i == tables_to_lock_count);
2015 #endif
2016
2017 while (tables_to_lock)
2018 {
2019 uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
2020 if (tables_to_lock->m_tabledef_valid)
2021 {
2022 tables_to_lock->m_tabledef.table_def::~table_def();
2023 tables_to_lock->m_tabledef_valid= FALSE;
2024 }
2025
2026 /*
2027 If blob fields were used during conversion of field values
2028 from the master table into the slave table, then we need to
2029 free the memory used temporarily to store their values before
2030 copying into the slave's table.
2031 */
2032 if (tables_to_lock->m_conv_table)
2033 free_blobs(tables_to_lock->m_conv_table);
2034
2035 tables_to_lock=
2036 static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
2037 tables_to_lock_count--;
2038 my_free(to_free);
2039 }
2040 assert(tables_to_lock == NULL && tables_to_lock_count == 0);
2041 DBUG_VOID_RETURN;
2042 }
2043
slave_close_thread_tables(THD * thd)2044 void Relay_log_info::slave_close_thread_tables(THD *thd)
2045 {
2046 thd->get_stmt_da()->set_overwrite_status(true);
2047 DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
2048 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
2049 thd->get_stmt_da()->set_overwrite_status(false);
2050
2051 close_thread_tables(thd);
2052 /*
2053 - If transaction rollback was requested due to deadlock
2054 perform it and release metadata locks.
2055 - If inside a multi-statement transaction,
2056 defer the release of metadata locks until the current
2057 transaction is either committed or rolled back. This prevents
2058 other statements from modifying the table for the entire
2059 duration of this transaction. This provides commit ordering
2060 and guarantees serializability across multiple transactions.
2061 - If in autocommit mode, or outside a transactional context,
2062 automatically release metadata locks of the current statement.
2063 */
2064 if (thd->transaction_rollback_request)
2065 {
2066 trans_rollback_implicit(thd);
2067 thd->mdl_context.release_transactional_locks();
2068 }
2069 else if (! thd->in_multi_stmt_transaction_mode())
2070 thd->mdl_context.release_transactional_locks();
2071 else
2072 thd->mdl_context.release_statement_locks();
2073
2074 clear_tables_to_lock();
2075 DBUG_VOID_RETURN;
2076 }
2077
2078
2079 /**
2080 Execute a SHOW RELAYLOG EVENTS statement.
2081
2082 When multiple replication channels exist on this slave
2083 and no channel name is specified through FOR CHANNEL clause
2084 this function errors out and exits.
2085
2086 @param thd Pointer to THD object for the client thread executing the
2087 statement.
2088
2089 @retval FALSE success
2090 @retval TRUE failure
2091 */
mysql_show_relaylog_events(THD * thd)2092 bool mysql_show_relaylog_events(THD* thd)
2093 {
2094
2095 Master_info *mi =0;
2096 List<Item> field_list;
2097 bool res;
2098 DBUG_ENTER("mysql_show_relaylog_events");
2099
2100 assert(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
2101
2102 channel_map.wrlock();
2103
2104 if (!thd->lex->mi.for_channel && channel_map.get_num_instances() > 1)
2105 {
2106 my_error(ER_SLAVE_MULTIPLE_CHANNELS_CMD, MYF(0));
2107 res= true;
2108 goto err;
2109 }
2110
2111 Log_event::init_show_field_list(&field_list);
2112 if (thd->send_result_metadata(&field_list,
2113 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2114 {
2115 res= true;
2116 goto err;
2117 }
2118
2119 mi= channel_map.get_mi(thd->lex->mi.channel);
2120
2121 if (!mi && strcmp(thd->lex->mi.channel, channel_map.get_default_channel()))
2122 {
2123 my_error(ER_SLAVE_CHANNEL_DOES_NOT_EXIST, MYF(0), thd->lex->mi.channel);
2124 res= true;
2125 goto err;
2126 }
2127
2128 if (mi == NULL)
2129 {
2130 my_error(ER_SLAVE_CONFIGURATION, MYF(0));
2131 res= true;
2132 goto err;
2133 }
2134
2135 res= show_binlog_events(thd, &mi->rli->relay_log);
2136
2137 err:
2138 channel_map.unlock();
2139
2140 DBUG_RETURN(res);
2141 }
2142 #endif
2143
2144
rli_init_info()2145 int Relay_log_info::rli_init_info()
2146 {
2147 int error= 0;
2148 enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
2149 const char *msg= NULL;
2150 /* Store the GTID of a transaction spanned in multiple relay log files */
2151 Gtid gtid_partial_trx= {0, 0};
2152
2153 DBUG_ENTER("Relay_log_info::rli_init_info");
2154
2155 mysql_mutex_assert_owner(&data_lock);
2156
2157 /*
2158 If Relay_log_info is issued again after a failed init_info(), for
2159 instance because of missing relay log files, it will generate new
2160 files and ignore the previous failure, to avoid that we set
2161 error_on_rli_init_info as true.
2162 This a consequence of the behaviour change, in the past server was
2163 stopped when there were replication initialization errors, now it is
2164 not and so init_info() must be aware of previous failures.
2165 */
2166 if (error_on_rli_init_info)
2167 goto err;
2168
2169 if (inited)
2170 {
2171 /*
2172 We have to reset read position of relay-log-bin as we may have
2173 already been reading from 'hotlog' when the slave was stopped
2174 last time. If this case pos_in_file would be set and we would
2175 get a crash when trying to read the signature for the binary
2176 relay log.
2177
2178 We only rewind the read position if we are starting the SQL
2179 thread. The handle_slave_sql thread assumes that the read
2180 position is at the beginning of the file, and will read the
2181 "signature" and then fast-forward to the last position read.
2182 */
2183 bool hot_log= FALSE;
2184 /*
2185 my_b_seek does an implicit flush_io_cache, so we need to:
2186
2187 1. check if this log is active (hot)
2188 2. if it is we keep log_lock until the seek ends, otherwise
2189 release it right away.
2190
2191 If we did not take log_lock, SQL thread might race with IO
2192 thread for the IO_CACHE mutex.
2193
2194 */
2195 mysql_mutex_t *log_lock= relay_log.get_log_lock();
2196 mysql_mutex_lock(log_lock);
2197 hot_log= relay_log.is_active(linfo.log_file_name);
2198
2199 if (!hot_log)
2200 mysql_mutex_unlock(log_lock);
2201
2202 my_b_seek(cur_log, (my_off_t) 0);
2203
2204 if (hot_log)
2205 mysql_mutex_unlock(log_lock);
2206 DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(this) : 0);
2207 }
2208
2209 cur_log_fd = -1;
2210 slave_skip_counter= 0;
2211 abort_pos_wait= 0;
2212 log_space_limit= relay_log_space_limit;
2213 log_space_total= 0;
2214 tables_to_lock= 0;
2215 tables_to_lock_count= 0;
2216
2217 char pattern[FN_REFLEN];
2218 (void) my_realpath(pattern, slave_load_tmpdir, 0);
2219 /*
2220 @TODO:
2221 In MSR, sometimes slave fail with the following error:
2222 Unable to use slave's temporary directory /tmp -
2223 Can't create/write to file '/tmp/SQL_LOAD-92d1eee0-9de4-11e3-8874-68730ad50fcb' (Errcode: 17 - File exists), Error_code: 1
2224
2225 */
2226 if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
2227 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
2228 {
2229 sql_print_error("Unable to use slave's temporary directory '%s'.",
2230 slave_load_tmpdir);
2231 DBUG_RETURN(1);
2232 }
2233 unpack_filename(slave_patternload_file, pattern);
2234 slave_patternload_file_size= strlen(slave_patternload_file);
2235
2236 /*
2237 The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
2238 Note that the I/O thread flushes it to disk after writing every
2239 event, in flush_info within the master info.
2240 */
2241 /*
2242 For the maximum log size, we choose max_relay_log_size if it is
2243 non-zero, max_binlog_size otherwise. If later the user does SET
2244 GLOBAL on one of these variables, fix_max_binlog_size and
2245 fix_max_relay_log_size will reconsider the choice (for example
2246 if the user changes max_relay_log_size to zero, we have to
2247 switch to using max_binlog_size for the relay log) and update
2248 relay_log.max_size (and mysql_bin_log.max_size).
2249 */
2250 {
2251 /* Reports an error and returns, if the --relay-log's path
2252 is a directory.*/
2253 if (opt_relay_logname &&
2254 opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
2255 {
2256 sql_print_error("Path '%s' is a directory name, please specify \
2257 a file name for --relay-log option.", opt_relay_logname);
2258 DBUG_RETURN(1);
2259 }
2260
2261 /* Reports an error and returns, if the --relay-log-index's path
2262 is a directory.*/
2263 if (opt_relaylog_index_name &&
2264 opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
2265 == FN_LIBCHAR)
2266 {
2267 sql_print_error("Path '%s' is a directory name, please specify \
2268 a file name for --relay-log-index option.", opt_relaylog_index_name);
2269 DBUG_RETURN(1);
2270 }
2271
2272 char buf[FN_REFLEN];
2273 /* The base name of the relay log file considering multisource rep */
2274 const char *ln;
2275 /*
2276 relay log name without channel prefix taking into account
2277 --relay-log option.
2278 */
2279 const char *ln_without_channel_name;
2280 static bool name_warning_sent= 0;
2281
2282 /*
2283 Buffer to add channel name suffix when relay-log option is provided.
2284 */
2285 char relay_bin_channel[FN_REFLEN];
2286 /*
2287 Buffer to add channel name suffix when relay-log-index option is provided
2288 */
2289 char relay_bin_index_channel[FN_REFLEN];
2290
2291 /* name of the index file if opt_relaylog_index_name is set*/
2292 const char* log_index_name;
2293
2294
2295 ln_without_channel_name= relay_log.generate_name(opt_relay_logname,
2296 "-relay-bin", buf);
2297
2298 ln= add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN,
2299 ln_without_channel_name);
2300
2301 /* We send the warning only at startup, not after every RESET SLAVE */
2302 if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
2303 {
2304 /*
2305 User didn't give us info to name the relay log index file.
2306 Picking `hostname`-relay-bin.index like we do, causes replication to
2307 fail if this slave's hostname is changed later. So, we would like to
2308 instead require a name. But as we don't want to break many existing
2309 setups, we only give warning, not error.
2310 */
2311 sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
2312 " so replication "
2313 "may break when this MySQL server acts as a "
2314 "slave and has his hostname changed!! Please "
2315 "use '--relay-log=%s' to avoid this problem.",
2316 ln_without_channel_name);
2317 name_warning_sent= 1;
2318 }
2319
2320 relay_log.is_relay_log= TRUE;
2321
2322 /*
2323 If relay log index option is set, convert into channel specific
2324 index file. If the opt_relaylog_index has an extension, we strip
2325 it too. This is inconsistent to relay log names.
2326 */
2327 if (opt_relaylog_index_name)
2328 {
2329 char index_file_withoutext[FN_REFLEN];
2330 relay_log.generate_name(opt_relaylog_index_name,"",
2331 index_file_withoutext);
2332
2333 log_index_name= add_channel_to_relay_log_name(relay_bin_index_channel,
2334 FN_REFLEN,
2335 index_file_withoutext);
2336 }
2337 else
2338 log_index_name= 0;
2339
2340
2341
2342 if (relay_log.open_index_file(log_index_name, ln, TRUE))
2343 {
2344 sql_print_error("Failed in open_index_file() called from Relay_log_info::rli_init_info().");
2345 DBUG_RETURN(1);
2346 }
2347 #ifndef NDEBUG
2348 global_sid_lock->wrlock();
2349 gtid_set.dbug_print("set of GTIDs in relay log before initialization");
2350 global_sid_lock->unlock();
2351 #endif
2352 /*
2353 In the init_gtid_set below we pass the mi->transaction_parser.
2354 This will be useful to ensure that we only add a GTID to
2355 the Retrieved_Gtid_Set for fully retrieved transactions. Also, it will
2356 be useful to ensure the Retrieved_Gtid_Set behavior when auto
2357 positioning is disabled (we could have transactions spanning multiple
2358 relay log files in this case).
2359 We will skip this initialization if relay_log_recovery is set in order
2360 to save time, as neither the GTIDs nor the transaction_parser state
2361 would be useful when the relay log will be cleaned up later when calling
2362 init_recovery.
2363 */
2364 if (!is_relay_log_recovery &&
2365 !gtid_retrieved_initialized &&
2366 relay_log.init_gtid_sets(>id_set, NULL,
2367 opt_slave_sql_verify_checksum,
2368 true/*true=need lock*/,
2369 &mi->transaction_parser, >id_partial_trx))
2370 {
2371 sql_print_error("Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
2372 DBUG_RETURN(1);
2373 }
2374 gtid_retrieved_initialized= true;
2375 #ifndef NDEBUG
2376 global_sid_lock->wrlock();
2377 gtid_set.dbug_print("set of GTIDs in relay log after initialization");
2378 global_sid_lock->unlock();
2379 #endif
2380 if (!gtid_partial_trx.is_empty())
2381 {
2382 /*
2383 The init_gtid_set has found an incomplete transaction in the relay log.
2384 We add this transaction's GTID to the last_gtid_queued so the IO thread
2385 knows which GTID to add to the Retrieved_Gtid_Set when reaching the end
2386 of the incomplete transaction.
2387 */
2388 mi->set_last_gtid_queued(gtid_partial_trx);
2389 }
2390 else
2391 {
2392 mi->clear_last_gtid_queued();
2393 }
2394 /*
2395 Configures what object is used by the current log to store processed
2396 gtid(s). This is necessary in the MYSQL_BIN_LOG::MYSQL_BIN_LOG to
2397 corretly compute the set of previous gtids.
2398 */
2399 relay_log.set_previous_gtid_set_relaylog(>id_set);
2400 /*
2401 note, that if open() fails, we'll still have index file open
2402 but a destructor will take care of that
2403 */
2404
2405 mysql_mutex_t *log_lock= relay_log.get_log_lock();
2406 mysql_mutex_lock(log_lock);
2407
2408 if (relay_log.open_binlog(ln, 0,
2409 (max_relay_log_size ? max_relay_log_size :
2410 max_binlog_size), true,
2411 true/*need_lock_index=true*/,
2412 true/*need_sid_lock=true*/,
2413 mi->get_mi_description_event()))
2414 {
2415 mysql_mutex_unlock(log_lock);
2416 sql_print_error("Failed in open_log() called from Relay_log_info::rli_init_info().");
2417 DBUG_RETURN(1);
2418 }
2419
2420 mysql_mutex_unlock(log_lock);
2421
2422 }
2423
2424 /*
2425 This checks if the repository was created before and thus there
2426 will be values to be read. Please, do not move this call after
2427 the handler->init_info().
2428 */
2429 if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
2430 {
2431 msg= "Error checking relay log repository";
2432 error= 1;
2433 goto err;
2434 }
2435
2436 if (handler->init_info())
2437 {
2438 msg= "Error reading relay log configuration";
2439 error= 1;
2440 goto err;
2441 }
2442
2443 if (check_return == REPOSITORY_DOES_NOT_EXIST)
2444 {
2445 /* Init relay log with first entry in the relay index file */
2446 if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
2447 false/*need_data_lock=false (lock should be held
2448 prior to invoking this function)*/,
2449 &msg, 0))
2450 {
2451 error= 1;
2452 goto err;
2453 }
2454 set_group_master_log_name("");
2455 set_group_master_log_pos(0);
2456 }
2457 else
2458 {
2459 if (read_info(handler))
2460 {
2461 msg= "Error reading relay log configuration";
2462 error= 1;
2463 goto err;
2464 }
2465
2466 if (is_relay_log_recovery && init_recovery(mi, &msg))
2467 {
2468 error= 1;
2469 goto err;
2470 }
2471
2472 if (init_relay_log_pos(group_relay_log_name,
2473 group_relay_log_pos,
2474 false/*need_data_lock=false (lock should be held
2475 prior to invoking this function)*/,
2476 &msg, 0))
2477 {
2478 char llbuf[22];
2479 sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s).",
2480 group_relay_log_name,
2481 llstr(group_relay_log_pos, llbuf));
2482 error= 1;
2483 goto err;
2484 }
2485
2486 #ifndef NDEBUG
2487 {
2488 char llbuf1[22], llbuf2[22];
2489 DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
2490 llstr(my_b_tell(cur_log),llbuf1),
2491 llstr(event_relay_log_pos,llbuf2)));
2492 assert(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2493 assert((my_b_tell(cur_log) == event_relay_log_pos));
2494 }
2495 #endif
2496 }
2497
2498 inited= 1;
2499 error_on_rli_init_info= false;
2500 if (flush_info(TRUE))
2501 {
2502 msg= "Error reading relay log configuration";
2503 error= 1;
2504 goto err;
2505 }
2506
2507 if (count_relay_log_space())
2508 {
2509 msg= "Error counting relay log space";
2510 error= 1;
2511 goto err;
2512 }
2513
2514 /*
2515 In case of MTS the recovery is deferred until the end of
2516 load_mi_and_rli_from_repositories.
2517 */
2518 if (!mi->rli->mts_recovery_group_cnt)
2519 is_relay_log_recovery= FALSE;
2520 DBUG_RETURN(error);
2521
2522 err:
2523 handler->end_info();
2524 inited= 0;
2525 error_on_rli_init_info= true;
2526 if (msg)
2527 sql_print_error("%s.", msg);
2528 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2529 true/*need_lock_log=true*/,
2530 true/*need_lock_index=true*/);
2531 DBUG_RETURN(error);
2532 }
2533
end_info()2534 void Relay_log_info::end_info()
2535 {
2536 DBUG_ENTER("Relay_log_info::end_info");
2537
2538 error_on_rli_init_info= false;
2539 if (!inited)
2540 DBUG_VOID_RETURN;
2541
2542 handler->end_info();
2543
2544 if (cur_log_fd >= 0)
2545 {
2546 end_io_cache(&cache_buf);
2547 (void)my_close(cur_log_fd, MYF(MY_WME));
2548 cur_log_fd= -1;
2549 }
2550 inited = 0;
2551 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
2552 true/*need_lock_log=true*/,
2553 true/*need_lock_index=true*/);
2554 relay_log.harvest_bytes_written(this, true/*need_log_space_lock=true*/);
2555 /*
2556 Delete the slave's temporary tables from memory.
2557 In the future there will be other actions than this, to ensure persistance
2558 of slave's temp tables after shutdown.
2559 */
2560 close_temporary_tables();
2561
2562 DBUG_VOID_RETURN;
2563 }
2564
flush_current_log()2565 int Relay_log_info::flush_current_log()
2566 {
2567 DBUG_ENTER("Relay_log_info::flush_current_log");
2568 /*
2569 When we come to this place in code, relay log may or not be initialized;
2570 the caller is responsible for setting 'flush_relay_log_cache' accordingly.
2571 */
2572 IO_CACHE *log_file= relay_log.get_log_file();
2573 if (flush_io_cache(log_file))
2574 DBUG_RETURN(2);
2575
2576 DBUG_RETURN(0);
2577 }
2578
set_master_info(Master_info * info)2579 void Relay_log_info::set_master_info(Master_info* info)
2580 {
2581 mi= info;
2582 }
2583
/**
  Stores the file and position where the execute-slave thread is in the
  relay log:

  - As this is only called by the slave thread or on STOP SLAVE, with the
  log_lock grabbed and the slave thread stopped, we don't need to have
  a lock here.
  - If there is an active transaction, then we don't update the position
  in the relay log. This is to ensure that we re-execute statements
  if we die in the middle of a transaction that was rolled back.
  - As a transaction never spans binary logs, we don't have to handle the
  case where we do a relay-log-rotation in the middle of the transaction.
  If this would not be the case, we would have to ensure that we
  don't delete the relay log file where the transaction started when
  we switch to a new relay log file.

  @retval 0 ok,
  @retval 1 write error, otherwise.
*/
2603
2604 /**
2605 Store the file and position where the slave's SQL thread are in the
2606 relay log.
2607
2608 Notes:
2609
2610 - This function should be called either from the slave SQL thread,
2611 or when the slave thread is not running. (It reads the
2612 group_{relay|master}_log_{pos|name} and delay fields in the rli
2613 object. These may only be modified by the slave SQL thread or by
2614 a client thread when the slave SQL thread is not running.)
2615
2616 - If there is an active transaction, then we do not update the
2617 position in the relay log. This is to ensure that we re-execute
2618 statements if we die in the middle of an transaction that was
2619 rolled back.
2620
2621 - As a transaction never spans binary logs, we don't have to handle
2622 the case where we do a relay-log-rotation in the middle of the
2623 transaction. If transactions could span several binlogs, we would
2624 have to ensure that we do not delete the relay log file where the
2625 transaction started before switching to a new relay log file.
2626
2627 - Error can happen if writing to file fails or if flushing the file
2628 fails.
2629
2630 @param rli The object representing the Relay_log_info.
2631
2632 @todo Change the log file information to a binary format to avoid
2633 calling longlong2str.
2634
2635 @return 0 on success, 1 on error.
2636 */
flush_info(const bool force)2637 int Relay_log_info::flush_info(const bool force)
2638 {
2639 DBUG_ENTER("Relay_log_info::flush_info");
2640
2641 if (!inited)
2642 DBUG_RETURN(0);
2643
2644 /*
2645 We update the sync_period at this point because only here we
2646 now that we are handling a relay log info. This needs to be
2647 update every time we call flush because the option maybe
2648 dinamically set.
2649 */
2650 mysql_mutex_lock(&mts_temp_table_LOCK);
2651 handler->set_sync_period(sync_relayloginfo_period);
2652
2653 if (write_info(handler))
2654 goto err;
2655
2656 if (handler->flush_info(force || force_flush_postponed_due_to_split_trans))
2657 goto err;
2658
2659 force_flush_postponed_due_to_split_trans= false;
2660 mysql_mutex_unlock(&mts_temp_table_LOCK);
2661 DBUG_RETURN(0);
2662
2663 err:
2664 sql_print_error("Error writing relay log configuration.");
2665 mysql_mutex_unlock(&mts_temp_table_LOCK);
2666 DBUG_RETURN(1);
2667 }
2668
get_number_info_rli_fields()2669 size_t Relay_log_info::get_number_info_rli_fields()
2670 {
2671 return sizeof(info_rli_fields)/sizeof(info_rli_fields[0]);
2672 }
2673
read_info(Rpl_info_handler * from)2674 bool Relay_log_info::read_info(Rpl_info_handler *from)
2675 {
2676 int lines= 0;
2677 char *first_non_digit= NULL;
2678 ulong temp_group_relay_log_pos= 0;
2679 char temp_group_master_log_name[FN_REFLEN];
2680 ulong temp_group_master_log_pos= 0;
2681 int temp_sql_delay= 0;
2682 int temp_internal_id= internal_id;
2683
2684 DBUG_ENTER("Relay_log_info::read_info");
2685
2686 /*
2687 Should not read RLI from file in client threads. Client threads
2688 only use RLI to execute BINLOG statements.
2689
2690 @todo Uncomment the following assertion. Currently,
2691 Relay_log_info::init() is called from init_master_info() before
2692 the THD object Relay_log_info::sql_thd is created. That means we
2693 cannot call belongs_to_client() since belongs_to_client()
2694 dereferences Relay_log_info::sql_thd. So we need to refactor
2695 slightly: the THD object should be created by Relay_log_info
2696 constructor (or passed to it), so that we are guaranteed that it
2697 exists at this point. /Sven
2698 */
2699 //assert(!belongs_to_client());
2700
2701 /*
2702 Starting from 5.1.x, relay-log.info has a new format. Now, its
2703 first line contains the number of lines in the file. By reading
2704 this number we can determine which version our master.info comes
2705 from. We can't simply count the lines in the file, since
2706 versions before 5.1.x could generate files with more lines than
2707 needed. If first line doesn't contain a number, or if it
2708 contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
2709 then the file is treated like a file from pre-5.1.x version.
2710 There is no ambiguity when reading an old master.info: before
2711 5.1.x, the first line contained the binlog's name, which is
2712 either empty or has an extension (contains a '.'), so can't be
2713 confused with an integer.
2714
2715 So we're just reading first line and trying to figure which
2716 version is this.
2717 */
2718
2719 /*
2720 The first row is temporarily stored in mi->master_log_name, if
2721 it is line count and not binlog name (new format) it will be
2722 overwritten by the second row later.
2723 */
2724 if (from->prepare_info_for_read() ||
2725 from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2726 (char *) ""))
2727 DBUG_RETURN(TRUE);
2728
2729 lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2730
2731 if (group_relay_log_name[0]!='\0' &&
2732 *first_non_digit=='\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2733 {
2734 /* Seems to be new format => read group relay log name */
2735 if (from->get_info(group_relay_log_name, sizeof(group_relay_log_name),
2736 (char *) ""))
2737 DBUG_RETURN(TRUE);
2738 }
2739 else
2740 DBUG_PRINT("info", ("relay_log_info file is in old format."));
2741
2742 if (from->get_info(&temp_group_relay_log_pos,
2743 (ulong) BIN_LOG_HEADER_SIZE) ||
2744 from->get_info(temp_group_master_log_name,
2745 sizeof(temp_group_master_log_name),
2746 (char *) "") ||
2747 from->get_info(&temp_group_master_log_pos,
2748 0UL))
2749 DBUG_RETURN(TRUE);
2750
2751 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2752 {
2753 if (from->get_info(&temp_sql_delay, 0))
2754 DBUG_RETURN(TRUE);
2755 }
2756
2757 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2758 {
2759 if (from->get_info(&recovery_parallel_workers, 0UL))
2760 DBUG_RETURN(TRUE);
2761 }
2762
2763 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2764 {
2765 if (from->get_info(&temp_internal_id, 1))
2766 DBUG_RETURN(TRUE);
2767 }
2768
2769 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL)
2770 {
2771 /* the default value is empty string"" */
2772 if (from->get_info(channel, sizeof(channel), (char*)""))
2773 DBUG_RETURN(TRUE);
2774 }
2775
2776 group_relay_log_pos= temp_group_relay_log_pos;
2777 set_group_master_log_name(temp_group_master_log_name);
2778 set_group_master_log_pos(temp_group_master_log_pos);
2779 sql_delay= (int32) temp_sql_delay;
2780 internal_id= (uint) temp_internal_id;
2781
2782 assert(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2783 (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
2784 DBUG_RETURN(FALSE);
2785 }
2786
set_info_search_keys(Rpl_info_handler * to)2787 bool Relay_log_info::set_info_search_keys(Rpl_info_handler *to)
2788 {
2789 DBUG_ENTER("Relay_log_info::set_info_search_keys");
2790
2791 if (to->set_info(LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL, channel))
2792 DBUG_RETURN(TRUE);
2793
2794 DBUG_RETURN(FALSE);
2795 }
2796
2797
write_info(Rpl_info_handler * to)2798 bool Relay_log_info::write_info(Rpl_info_handler *to)
2799 {
2800 DBUG_ENTER("Relay_log_info::write_info");
2801
2802 /*
2803 @todo Uncomment the following assertion. See todo in
2804 Relay_log_info::read_info() for details. /Sven
2805 */
2806 //assert(!belongs_to_client());
2807
2808 if (to->prepare_info_for_write() ||
2809 to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2810 to->set_info(group_relay_log_name) ||
2811 to->set_info((ulong) group_relay_log_pos) ||
2812 to->set_info(get_group_master_log_name()) ||
2813 to->set_info((ulong) get_group_master_log_pos()) ||
2814 to->set_info((int) sql_delay) ||
2815 to->set_info(recovery_parallel_workers) ||
2816 to->set_info((int) internal_id) ||
2817 to->set_info(channel))
2818 DBUG_RETURN(TRUE);
2819
2820 DBUG_RETURN(FALSE);
2821 }
2822
2823 /**
2824 The method is run by SQL thread/MTS Coordinator.
2825 It replaces the current FD event with a new one.
2826 A version adaptation routine is invoked for the new FD
2827 to align the slave applier execution context with the master version.
2828
2829 Since FD are shared by Coordinator and Workers in the MTS mode,
2830 deletion of the old FD is done through decrementing its usage counter.
2831 The destructor runs when the later drops to zero,
2832 also see @c Slave_worker::set_rli_description_event().
2833 The usage counter of the new FD is incremented.
2834
2835 Although notice that MTS worker runs it, inefficiently (see assert),
2836 once at its destruction time.
2837
2838 @param a pointer to be installed into execution context
2839 FormatDescriptor event
2840 */
2841
set_rli_description_event(Format_description_log_event * fe)2842 void Relay_log_info::set_rli_description_event(Format_description_log_event *fe)
2843 {
2844 DBUG_ENTER("Relay_log_info::set_rli_description_event");
2845 assert(!info_thd || !is_mts_worker(info_thd) || !fe);
2846
2847 if (fe)
2848 {
2849 ulong fe_version= adapt_to_master_version(fe);
2850
2851 if (info_thd)
2852 {
2853 // See rpl_rli_pdb.h:Slave_worker::set_rli_description_event.
2854 if (!is_in_group() &&
2855 (info_thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
2856 info_thd->variables.gtid_next.type == UNDEFINED_GROUP))
2857 {
2858 DBUG_PRINT("info", ("Setting gtid_next.type to NOT_YET_DETERMINED_GROUP"));
2859 info_thd->variables.gtid_next.set_not_yet_determined();
2860 }
2861
2862 if (is_parallel_exec() && fe_version > 0)
2863 {
2864 /*
2865 Prepare for workers' adaption to a new FD version. Workers
2866 will see notification through scheduling of a first event of
2867 a new post-new-FD.
2868 */
2869 for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
2870 (*it)->fd_change_notified= false;
2871 }
2872 }
2873 }
2874 if (rli_description_event &&
2875 rli_description_event->usage_counter.atomic_add(-1) == 1)
2876 delete rli_description_event;
2877 #ifndef NDEBUG
2878 else
2879 /* It must be MTS mode when the usage counter greater than 1. */
2880 assert(!rli_description_event || is_parallel_exec());
2881 #endif
2882 rli_description_event= fe;
2883 if (rli_description_event)
2884 rli_description_event->usage_counter.atomic_add(1);
2885
2886 DBUG_VOID_RETURN;
2887 }
2888
2889 struct st_feature_version
2890 {
2891 /*
2892 The enum must be in the version non-descending top-down order,
2893 the last item formally corresponds to highest possible server
2894 version (never reached, thereby no adapting actions here);
2895 enumeration starts from zero.
2896 */
2897 enum
2898 {
2899 WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
2900 _END_OF_LIST // always last
2901 } item;
2902 /*
2903 Version where the feature is introduced.
2904 */
2905 uchar version_split[3];
2906 /*
2907 Action to perform when according to FormatDescriptor event Master
2908 is found to be feature-aware while previously it has *not* been.
2909 */
2910 void (*upgrade) (THD*);
2911 /*
2912 Action to perform when according to FormatDescriptor event Master
2913 is found to be feature-*un*aware while previously it has been.
2914 */
2915 void (*downgrade) (THD*);
2916 };
2917
wl6292_upgrade_func(THD * thd)2918 void wl6292_upgrade_func(THD *thd)
2919 {
2920 thd->variables.explicit_defaults_for_timestamp= false;
2921 if (global_system_variables.explicit_defaults_for_timestamp)
2922 thd->variables.explicit_defaults_for_timestamp= true;
2923
2924 return;
2925 }
2926
wl6292_downgrade_func(THD * thd)2927 void wl6292_downgrade_func(THD *thd)
2928 {
2929 if (global_system_variables.explicit_defaults_for_timestamp)
2930 thd->variables.explicit_defaults_for_timestamp= false;
2931
2932 return;
2933 }
2934
2935 /**
2936 Sensitive to Master-vs-Slave version difference features
2937 should be listed in the version non-descending order.
2938 */
2939 static st_feature_version s_features[]=
2940 {
2941 // order is the same as in the enum
2942 { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
2943 {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
2944 { st_feature_version::_END_OF_LIST,
2945 {255, 255, 255}, NULL, NULL }
2946 };
2947
2948 /**
2949 The method computes the incoming "master"'s FD server version and that
2950 of the currently installed (if ever) rli_description_event, to
2951 invoke more specific method to compare the two and adapt slave applier execution
2952 context to the new incoming master's version.
2953
2954 This method is specifically for STS applier/MTS Coordinator as well as
2955 for a user thread applying binlog events.
2956
2957 @param fdle a pointer to new Format Description event that is being
2958 set up a new execution context.
2959 @return 0 when the versions are equal,
2960 master_version otherwise
2961 */
adapt_to_master_version(Format_description_log_event * fdle)2962 ulong Relay_log_info::adapt_to_master_version(Format_description_log_event *fdle)
2963 {
2964 ulong master_version, current_version, slave_version;
2965
2966 slave_version= version_product(slave_version_split);
2967 /* When rli_description_event is uninitialized yet take the slave's version */
2968 master_version= !fdle ? slave_version : fdle->get_product_version();
2969 current_version= !rli_description_event ? slave_version :
2970 rli_description_event->get_product_version();
2971
2972 return adapt_to_master_version_updown(master_version, current_version);
2973 }
2974
2975 /**
2976 The method compares two supplied versions and carries out down- or
2977 up- grade customization of execution context of the slave applier
2978 (thd).
2979
2980 The method is invoked in the STS case through
2981 Relay_log_info::adapt_to_master_version() right before a new master
2982 FD is installed into the applier execution context; in the MTS
2983 case it's done by the Worker when it's assigned with a first event
2984 after the latest new FD has been installed.
2985
2986 Comparison of the current (old, existing) and the master (new,
2987 incoming) versions yields adaptive actions.
2988 To explain that, let's denote V_0 as the current, and the master's
2989 one as V_1.
  In the downgrade case (V_1 < V_0) every server feature that is undefined
  in V_1 but is defined starting from some V_f of the [V_1 + 1, V_0] range
  (+1 to mean V_1 excluded) is invalidated ("removed" from execution context)
  by running its so-called downgrade action.
2994 Conversely in the upgrade case a feature defined in [V_0 + 1, V_1] range
2995 is validated ("added" to execution context) by running its upgrade action.
2996 A typical use case showing how adaptive actions are necessary for the slave
2997 applier is when the master version is lesser than the slave's one.
2998 In such case events generated on the "older" master may need to be applied
2999 in their native server context. And such context can be provided by downgrade
3000 actions.
3001 Conversely, when the old master events are run out and a newer master's events
3002 show up for applying, the execution context will be upgraded through
3003 the namesake actions.
3004
3005 Notice that a relay log may have two FD events, one the slave local
3006 and the other from the Master. As there's no concern for the FD
3007 originator this leads to two adapt_to_master_version() calls.
3008 It's not harmful as can be seen from the following example.
3009 Say the currently installed FD's version is
3010 V_m, then at relay-log rotation the following transition takes
3011 place:
3012
3013 V_m -adapt-> V_s -adapt-> V_m.
3014
3015 here and further `m' subscript stands for the master, `s' for the slave.
3016 It's clear that in this case an ineffective V_m -> V_m transition occurs.
3017
3018 At composing downgrade/upgrade actions keep in mind that the slave applier
3019 version transition goes the following route:
3020 The initial version is that of the slave server (V_ss).
3021 It changes to a magic 4.0 at the slave relay log initialization.
3022 In the following course versions are extracted from each FD read out,
3023 regardless of what server generated it. Here is a typical version
3024 transition sequence underscored with annotation:
3025
3026 V_ss -> 4.0 -> V(FD_s^1) -> V(FD_m^2) ---> V(FD_s^3) -> V(FD_m^4) ...
3027
3028 ---------- ----------------- -------- ------------------ ---
3029 bootstrap 1st relay log rotation 2nd log etc
3030
  The superscript (^) enumerates Format Description events, V(FD^i) stands
  for a function extracting the version data from the i:th FD.
3033
3034 There won't be any action to execute when info_thd is undefined,
3035 e.g at bootstrap.
3036
3037 @param master_version an upcoming new version
3038 @param current_version the current version
3039 @return 0 when the new version is equal to the current one,
3040 master_version otherwise
3041 */
adapt_to_master_version_updown(ulong master_version,ulong current_version)3042 ulong Relay_log_info::adapt_to_master_version_updown(ulong master_version,
3043 ulong current_version)
3044 {
3045 THD *thd= info_thd;
3046 /*
3047 When the SQL thread or MTS Coordinator executes this method
3048 there's a constraint on current_version argument.
3049 */
3050 assert(!thd ||
3051 thd->rli_fake != NULL ||
3052 thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER ||
3053 (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
3054 (!rli_description_event ||
3055 current_version ==
3056 rli_description_event->get_product_version())));
3057
3058 if (master_version == current_version)
3059 return 0;
3060 else if (!thd)
3061 return master_version;
3062
3063 bool downgrade= master_version < current_version;
3064 /*
3065 find item starting from and ending at for which adaptive actions run
3066 for downgrade or upgrade branches.
3067 (todo: convert into bsearch when number of features will grow significantly)
3068 */
3069 long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
3070
3071 for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
3072 {
3073 ulong ver_f= version_product(s_features[i].version_split);
3074
3075 if ((downgrade ? master_version : current_version) < ver_f &&
3076 i_first == st_feature_version::_END_OF_LIST)
3077 i_first= i;
3078 if ((downgrade ? current_version : master_version) < ver_f)
3079 {
3080 i_last= i;
3081 assert(i_last >= i_first);
3082 break;
3083 }
3084 }
3085
3086 /*
3087 actions, executed in version non-descending st_feature_version order
3088 */
3089 for (i= i_first; i < i_last; i++)
3090 {
3091 /* Run time check of the st_feature_version items ordering */
3092 assert(!i ||
3093 version_product(s_features[i - 1].version_split) <=
3094 version_product(s_features[i].version_split));
3095
3096 assert((downgrade ? master_version : current_version) <
3097 version_product(s_features[i].version_split) &&
3098 (downgrade ? current_version : master_version >=
3099 version_product(s_features[i].version_split)));
3100
3101 if (downgrade && s_features[i].downgrade)
3102 {
3103 s_features[i].downgrade(thd);
3104 }
3105 else if (s_features[i].upgrade)
3106 {
3107 s_features[i].upgrade(thd);
3108 }
3109 }
3110
3111 return master_version;
3112 }
3113
relay_log_number_to_name(uint number,char name[FN_REFLEN+1])3114 void Relay_log_info::relay_log_number_to_name(uint number,
3115 char name[FN_REFLEN+1])
3116 {
3117 char *str= NULL;
3118 char relay_bin_channel[FN_REFLEN+1];
3119 const char *relay_log_basename_channel=
3120 add_channel_to_relay_log_name(relay_bin_channel, FN_REFLEN+1,
3121 relay_log_basename);
3122
3123 /* str points to closing null of relay log basename channel */
3124 str= strmake(name, relay_log_basename_channel, FN_REFLEN+1);
3125 *str++= '.';
3126 sprintf(str, "%06u", number);
3127 }
3128
relay_log_name_to_number(const char * name)3129 uint Relay_log_info::relay_log_name_to_number(const char *name)
3130 {
3131 return static_cast<uint>(atoi(fn_ext(name)+1));
3132 }
3133
is_mts_db_partitioned(Relay_log_info * rli)3134 bool is_mts_db_partitioned(Relay_log_info * rli)
3135 {
3136 return (rli->current_mts_submode->get_type() ==
3137 MTS_PARALLEL_TYPE_DB_NAME);
3138 }
3139
get_for_channel_str(bool upper_case) const3140 const char* Relay_log_info::get_for_channel_str(bool upper_case) const
3141 {
3142 if (rli_fake)
3143 return "";
3144 else
3145 return mi->get_for_channel_str(upper_case);
3146 }
3147
add_gtid_set(const Gtid_set * gtid_set)3148 enum_return_status Relay_log_info::add_gtid_set(const Gtid_set *gtid_set)
3149 {
3150 DBUG_ENTER("Relay_log_info::add_gtid_set(gtid_set)");
3151
3152 enum_return_status return_status= this->gtid_set.add_gtid_set(gtid_set);
3153
3154 DBUG_RETURN(return_status);
3155 }
3156
detach_engine_ha_data(THD * thd)3157 void Relay_log_info::detach_engine_ha_data(THD *thd)
3158 {
3159 is_engine_ha_data_detached= true;
3160 /*
3161 In case of slave thread applier or processing binlog by client,
3162 detach the engine ha_data ("native" engine transaction)
3163 in favor of dynamically created.
3164 */
3165 plugin_foreach(thd, detach_native_trx,
3166 MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3167 }
3168
reattach_engine_ha_data(THD * thd)3169 void Relay_log_info::reattach_engine_ha_data(THD *thd)
3170 {
3171 is_engine_ha_data_detached = false;
3172 /*
3173 In case of slave thread applier or processing binlog by client,
3174 reattach the engine ha_data ("native" engine transaction)
3175 in favor of dynamically created.
3176 */
3177 plugin_foreach(thd, reattach_native_trx, MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
3178 }
3179
operator new(size_t request)3180 void* Relay_log_info::operator new(size_t request)
3181 {
3182 void* ptr;
3183 if (posix_memalign(&ptr, __alignof__(Relay_log_info), sizeof(Relay_log_info))) {
3184 throw std::bad_alloc();
3185 }
3186 return ptr;
3187 }
3188
operator delete(void * ptr)3189 void Relay_log_info::operator delete(void * ptr) {
3190 free(ptr);
3191 }
3192