1 /* Copyright (c) 2005, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #ifndef RPL_RLI_H
24 #define RPL_RLI_H
25
26 #if defined(__SUNPRO_CC)
27 /*
28 Solaris Studio 12.5 has a bug where, if you use dynamic_cast
29 and then later #include this file (which Boost does), you will
30 get a compile error. Work around it by just including it right now.
31 */
32 #include <cxxabi.h>
33 #endif
34
35 #include <sys/types.h>
36 #include <time.h>
37 #include <atomic>
38 #include <memory>
39 #include <string>
40 #include <vector>
41
42 #include "lex_string.h"
43 #include "libbinlogevents/include/binlog_event.h"
44 #include "m_string.h"
45 #include "map_helpers.h"
46 #include "my_bitmap.h"
47 #include "my_dbug.h"
48 #include "my_inttypes.h"
49 #include "my_io.h"
50 #include "my_loglevel.h"
51 #include "my_psi_config.h"
52 #include "my_sys.h"
53 #include "mysql/components/services/mysql_cond_bits.h"
54 #include "mysql/components/services/mysql_mutex_bits.h"
55 #include "mysql/components/services/psi_mutex_bits.h"
56 #include "mysql/psi/mysql_mutex.h"
57 #include "mysql/thread_type.h"
58 #include "prealloced_array.h" // Prealloced_array
59 #include "sql/binlog.h" // MYSQL_BIN_LOG
60 #include "sql/log_event.h" //Gtid_log_event
61 #include "sql/psi_memory_key.h"
62 #include "sql/query_options.h"
63 #include "sql/rpl_gtid.h" // Gtid_set
64 #include "sql/rpl_info.h" // Rpl_info
65 #include "sql/rpl_mts_submode.h" // enum_mts_parallel_type
66 #include "sql/rpl_slave_until_options.h"
67 #include "sql/rpl_tblmap.h" // table_mapping
68 #include "sql/rpl_trx_boundary_parser.h"
69 #include "sql/rpl_utility.h" // Deferred_log_events
70 #include "sql/sql_class.h" // THD
71 #include "sql/system_variables.h"
72 #include "sql/table.h"
73
74 class Commit_order_manager;
75 class Master_info;
76 class Rpl_filter;
77 class Rpl_info_handler;
78 class Slave_committed_queue;
79 class Slave_worker;
80 class String;
81 struct LEX_MASTER_INFO;
82 struct db_worker_hash_entry;
83
84 extern uint sql_slave_skip_counter;
85
86 typedef Prealloced_array<Slave_worker *, 4> Slave_worker_array;
87
88 typedef struct slave_job_item {
89 Log_event *data;
90 uint relay_number;
91 my_off_t relay_pos;
92 } Slave_job_item;
93
94 /*******************************************************************************
95 Replication SQL Thread
96
97 Relay_log_info contains:
98 - the current relay log
99 - the current relay log offset
100 - master log name
101 - master log sequence corresponding to the last update
102 - misc information specific to the SQL thread
103
Relay_log_info is initialized from a repository, i.e. table or file, if there is
one. Otherwise, data members are initialized with defaults by calling
init_relay_log_info().
107
108 The relay.info table/file shall be updated whenever: (i) the relay log file
109 is rotated, (ii) SQL Thread is stopped, (iii) while processing a Xid_log_event,
110 (iv) after a Query_log_event (i.e. commit or rollback) and (v) after processing
111 any statement written to the binary log without a transaction context.
112
113 The Xid_log_event is a commit for transactional engines and must be handled
114 differently to provide reliability/data integrity. In this case, positions
115 are updated within the context of the current transaction. So
116
117 . If the relay.info is stored in a transactional repository and the server
118 crashes before successfully committing the transaction the changes to the
119 position table will be rolled back along with the data.
120
. If the relay.info is stored in a non-transactional repository, for instance,
a file or a system table created using MyISAM, and the server crashes before
successfully committing the transaction, the changes to the position table
will not be rolled back but the data will.
125
In particular, when there are mixed transactions, i.e. a transaction that updates
both transactional and non-transactional engines, the Xid_log_event is still used
but reliability/data integrity cannot be achieved, as we shall explain in what
follows.
130
Changes to non-transactional engines, such as MyISAM, cannot be rolled back if a
failure happens. For that reason, there is no point in updating the positions
within the boundaries of any on-going transaction. This is true for both commit
and rollback. If a failure happens after processing the pseudo-transaction but
before updating the positions, the transaction will be re-executed when the
slave is up, most likely causing an error that needs to be manually circumvented.
This is a well-known issue when non-transactional statements are executed.
138
Specifically, if rolling back any transaction, positions are updated outside the
transaction boundaries. However, there may be a problem in this scenario even
when only transactional engines are updated. This happens because, if a
transaction is rolled back and is nevertheless written to the binary log, then a
non-transactional engine was updated or a temporary table was created or dropped
within its boundaries.
145
146 In particular, in both STATEMENT and MIXED logging formats, this happens because
147 any temporary table is automatically dropped after a shutdown/startup.
148 See BUG#26945 for further details.
149
Statements written to the binary log outside the boundaries of a transaction are
DDLs or maintenance commands, which are not transactional. This means that they
cannot be rolled back if a failure happens. In such cases, the positions are
updated after processing the events. If a failure happens after processing the
statement but before updating the positions, the statement will be
re-executed when the slave is up, most likely causing an error that needs to be
manually circumvented. This is a well-known issue when non-transactional
statements are executed.
158
The --sync-relay-log-info option has no effect when a system table, either
transactional or non-transactional, is used.
161
To recover correctly from failures, one should combine transactional system
tables with --relay-log-recovery.
164 *******************************************************************************/
165 class Relay_log_info : public Rpl_info {
166 friend class Rpl_info_factory;
167
168 public:
/**
  Status codes returned by the member functions that manage the
  `PRIVILEGE_CHECKS_USER` setting.
*/
enum class enum_priv_checks_status : int {
  /** The operation completed successfully. */
  SUCCESS = 0,
  /** The user is anonymous (''@'...'). */
  USER_ANONYMOUS = 1,
  /** The username part of the user is longer than 32 characters. */
  USERNAME_TOO_LONG = 2,
  /** The hostname part of the user is longer than 255 characters. */
  HOSTNAME_TOO_LONG = 3,
  /** The hostname part of the user contains illegal characters. */
  HOSTNAME_SYNTAX_ERROR = 4,
  /** The username part is NULL while the hostname part is not NULL. */
  USERNAME_NULL_HOSTNAME_NOT_NULL = 5,
  /** The provided user does not exist. */
  USER_DOES_NOT_EXIST = 6,
  /**
    The provided user does not have the privileges necessary to execute the
    required operations.
  */
  USER_DOES_NOT_HAVE_PRIVILEGES = 7,
  /** The values stored in the internal variables are corrupted. */
  USER_DATA_CORRUPTED = 8,
  /**
    The provided user does not have the `FILE` privilege while a
    `LOAD DATA` event is being executed.
  */
  LOAD_DATA_EVENT_NOT_ALLOWED = 9
};
206
/** Status codes for checks related to the require-row-format setting. */
enum class enum_require_row_status : int {
  /** The check passed. */
  SUCCESS = 0,
  /** `privilege_checks_user` has a value (it is not empty). */
  PRIV_CHECKS_USER_NOT_NULL = 1
};
213
/*
  The per-channel replication filter associated with this Relay_log_info.
  Held as a raw pointer; ownership is not established in this header
  (NOTE(review): confirm who frees it).
*/
Rpl_filter *rpl_filter;
/**
  Flags describing the state of replication. STATE_FLAGS_COUNT holds the
  number of flags, so it must remain the last enumerator.
*/
enum enum_state_flag {
  IN_STMT = 0,      ///< The replication thread is inside a statement.
  STATE_FLAGS_COUNT ///< Number of flags; always keep this enumerator last.
};
228
/**
  Slave policy regarding primary keys on replicated tables.
*/
enum enum_require_table_primary_key {
  /** No policy; used by performance_schema. */
  PK_CHECK_NONE = 0,
  /** Follow the sql_require_primary_key value replicated from the source. */
  PK_CHECK_STREAM = 1,
  /** Require tables of this channel to have primary keys. */
  PK_CHECK_ON = 2,
  /** Do not enforce any primary key policy. */
  PK_CHECK_OFF = 3
};
245
246 /*
247 The SQL thread owns one Relay_log_info, and each client that has
248 executed a BINLOG statement owns one Relay_log_info. This function
249 returns zero for the Relay_log_info object that belongs to the SQL
250 thread and nonzero for Relay_log_info objects that belong to
251 clients.
252 */
belongs_to_client()253 inline bool belongs_to_client() {
254 DBUG_ASSERT(info_thd);
255 return !info_thd->slave_thread;
256 }
/* Performance schema instrumentation key for mts_temp_table_LOCK. */
#ifdef HAVE_PSI_INTERFACE
PSI_mutex_key m_key_mts_temp_table_LOCK;
#endif
/*
  Protects against races while temporary tables are transferred from a
  worker thread to the coordinator thread and back.
*/
mysql_mutex_t mts_temp_table_LOCK;
/*
  Acquired by methods that concurrently update the low-water mark (lwm) of
  committed transactions and the minimum waited timestamp and its index.
*/
mysql_mutex_t mts_gaq_LOCK;
/*
  Condition variable for the logical clock scheduler; presumably used
  together with mts_gaq_LOCK (NOTE(review): confirm).
*/
mysql_cond_t logical_clock_cond;
/*
  If true, events carrying this server's own server id are replicated as
  well. It is set when the relay log info structure is created, by copying
  ::replicate_same_server_id, and can be overridden when necessary. For an
  example, see sql_binlog.cc, where the BINLOG statement is used to execute
  "raw" events.
*/
bool replicate_same_server_id;
280
/*
  The relay log. Protected by its internal locks; data_lock must be held
  when resetting the logs.
*/
MYSQL_BIN_LOG relay_log;

/*
  True while the relay log recovery process is running.
  See sql/rpl_slave.h:init_recovery for further details.
*/
bool is_relay_log_recovery;

/* The following variables are safe to read at any time. */

/*
  Temporary tables created before the slave thread was restarted, kept so
  that they remain accessible afterwards. Modified only on init/end and by
  the SQL thread; read only by the SQL thread.
*/
TABLE *save_temporary_tables;

/* Parent Master_info structure. */
Master_info *mi;

/* Number of temporary tables currently open in this channel. */
std::atomic<int32> atomic_channel_open_temp_tables{0};
307
/** Whether the relay log is known to carry commit timestamps. */
enum {
  /*
    No GTID log event has been processed yet, so it is still unknown
    whether this log has commit timestamps.
  */
  COMMIT_TS_UNKNOWN = 0,
  /* The immediate master does not support commit timestamps. */
  COMMIT_TS_NOT_FOUND = 1,
  /* The immediate master supports commit timestamps. */
  COMMIT_TS_FOUND = 2
} commit_timestamps_status;
320
321 /**
322 @return the pointer to the Gtid_monitoring_info.
323 */
get_gtid_monitoring_info()324 Gtid_monitoring_info *get_gtid_monitoring_info() {
325 return gtid_monitoring_info;
326 }
327
328 /**
329 Stores the details of the transaction which has just started processing.
330
331 This function is called by the STS applier or MTS worker when applying a
332 Gtid.
333
334 @param gtid_arg the gtid of the trx
335 @param original_ts_arg the original commit timestamp of the transaction
336 @param immediate_ts_arg the immediate commit timestamp of the transaction
337 @param skipped true if the transaction was gtid skipped
338 */
339 void started_processing(Gtid gtid_arg, ulonglong original_ts_arg,
340 ulonglong immediate_ts_arg, bool skipped = false) {
341 gtid_monitoring_info->start(gtid_arg, original_ts_arg, immediate_ts_arg,
342 skipped);
343 }
344
345 /**
346 Stores the details of the transaction which has just started processing.
347
348 This function is called by the MTS coordinator when queuing a Gtid to
349 a worker.
350
351 @param gtid_log_ev_arg the gtid log event of the trx
352 */
started_processing(Gtid_log_event * gtid_log_ev_arg)353 void started_processing(Gtid_log_event *gtid_log_ev_arg) {
354 Gtid gtid = {0, 0};
355 if (gtid_log_ev_arg->get_type() == ASSIGNED_GTID) {
356 gtid = {gtid_log_ev_arg->get_sidno(true), gtid_log_ev_arg->get_gno()};
357 }
358 started_processing(gtid, gtid_log_ev_arg->original_commit_timestamp,
359 gtid_log_ev_arg->immediate_commit_timestamp);
360 }
361
362 /**
363 When the processing of a transaction is completed, that timestamp is
364 recorded, the information is copied to last_processed_trx and the
365 information in processing_trx is cleared.
366
367 If the transaction was "applied" but GTID-skipped, the copy will not
368 happen and the last_processed_trx will keep its current value.
369 */
finished_processing()370 void finished_processing() { gtid_monitoring_info->finish(); }
371
372 /**
373 @return True if there is a transaction being currently processed
374 */
is_processing_trx()375 bool is_processing_trx() {
376 return gtid_monitoring_info->is_processing_trx_set();
377 }
378
379 /**
380 Clears the processing_trx structure fields. Normally called when there is an
381 error while processing the transaction.
382 */
clear_processing_trx()383 void clear_processing_trx() { gtid_monitoring_info->clear_processing_trx(); }
384
385 /**
386 Clears the Gtid_monitoring_info fields.
387 */
clear_gtid_monitoring_info()388 void clear_gtid_monitoring_info() { gtid_monitoring_info->clear(); }
389
390 /**
391 When a transaction is retried, the error number and message, and total number
392 of retries are stored. The timestamp for this error is also set here.
393
394 @param transient_errno_arg Transient error number.
395 @param transient_err_message_arg Transient error message.
396 @param trans_retries_arg Number of times this transaction has been
397 retried so far.
398 */
retried_processing(uint transient_errno_arg,const char * transient_err_message_arg,ulong trans_retries_arg)399 void retried_processing(uint transient_errno_arg,
400 const char *transient_err_message_arg,
401 ulong trans_retries_arg) {
402 gtid_monitoring_info->store_transient_error(
403 transient_errno_arg, transient_err_message_arg, trans_retries_arg);
404 }
405
/*
  True when the previous call to init_info() terminated with an error. If it
  is still set when init_info() is called again, RESET SLAVE must be executed
  and the problem fixed manually.
*/
bool error_on_rli_init_info;

/**
  Set to true while original_commit_timestamp > immediate_commit_timestamp,
  so that the corresponding warning is logged only once.
*/
bool gtid_timestamps_warning_logged;
419
/**
  Retrieves the username part of the `PRIVILEGE_CHECKS_USER` option of the
  `CHANGE MASTER TO` statement.

  @return the username part of the user, or an empty string.
*/
std::string get_privilege_checks_username() const;

/**
  Retrieves the host part of the `PRIVILEGE_CHECKS_USER` option of the
  `CHANGE MASTER TO` statement.

  @return the host part of the user, or an empty string.
*/
std::string get_privilege_checks_hostname() const;

/**
  Tells whether no user is configured for `PRIVILEGE_CHECKS_USER`.

  @return true if no user is configured for `PRIVILEGE_CHECKS_USER`,
          false otherwise.
*/
bool is_privilege_checks_user_null() const;

/**
  Tells whether the internal data regarding `PRIVILEGE_CHECKS_USER` is
  corrupted. This may happen, for instance, if the user edits the
  Relay_log_info repository manually, or after a server crash.

  @return true if the data is corrupted, false otherwise.
*/
bool is_privilege_checks_user_corrupted() const;

/**
  Clears the data that was initialized from `PRIVILEGE_CHECKS_USER`.
*/
void clear_privilege_checks_user();
459
/**
  Sets the flag telling whether the data regarding `PRIVILEGE_CHECKS_USER`
  is corrupted.

  @param is_corrupted the new flag value.
*/
void set_privilege_checks_user_corrupted(bool is_corrupted);

/**
  Initializes the data related to `PRIVILEGE_CHECKS_USER`, namely the user
  name and the user hostname.

  @param param_privilege_checks_username the username part of the user.
  @param param_privilege_checks_hostname the hostname part of the user.

  @return a status code describing the outcome of the initialization.
*/
enum_priv_checks_status set_privilege_checks_user(
    char const *param_privilege_checks_username,
    char const *param_privilege_checks_hostname);

/**
  Checks the validity and integrity of the `PRIVILEGE_CHECKS_USER` data
  (user name and hostname) stored in the internal member variables, and
  checks that the user exists.

  @return a status code describing the outcome of the check.
*/
enum_priv_checks_status check_privilege_checks_user();

/**
  Checks the validity and integrity of the given `PRIVILEGE_CHECKS_USER`
  user name and hostname, and checks that the user exists.

  @param param_privilege_checks_username the username part of the user.
  @param param_privilege_checks_hostname the hostname part of the user.

  @return a status code describing the outcome of the check.
*/
enum_priv_checks_status check_privilege_checks_user(
    char const *param_privilege_checks_username,
    char const *param_privilege_checks_hostname);
/**
  Checks that the user given through the `PRIVILEGE_CHECKS_USER` option
  exists.

  @param param_privilege_checks_username the username part of the user.
  @param param_privilege_checks_hostname the host part of the user.

  @return a status code describing the outcome of the check.
*/
enum_priv_checks_status check_applier_acl_user(
    char const *param_privilege_checks_username,
    char const *param_privilege_checks_hostname);
518
/**
  Returns a printable representation of the username and hostname currently
  used in the applier security context, or empty strings otherwise.

  @return an `std::pair` holding the printable username and hostname.
*/
std::pair<const char *, const char *>
print_applier_security_context_user_host() const;

/**
  Outputs the error message associated with the applier thread user
  privilege-check status `status_code`.

  The destination is chosen by `to_client`: if `true`, the message is sent
  to the client session; if `false`, it is written to the server log.

  @param level        the message urgency level, e.g. `ERROR_LEVEL`,
                      `WARNING_LEVEL`, etc.
  @param status_code  the status code whose associated error message is
                      output.
  @param to_client    whether the message goes to the client session (true)
                      or to the server log (false).
  @param channel_name name of the channel for which the error is reported.
  @param user_name    username for which the error is reported.
  @param host_name    hostname for which the error is reported.
*/
void report_privilege_check_error(enum loglevel level,
                                  enum_priv_checks_status status_code,
                                  bool to_client,
                                  char const *channel_name = nullptr,
                                  char const *user_name = nullptr,
                                  char const *host_name = nullptr) const;
554
/**
  Initializes the security context of the `PRIVILEGE_CHECKS_USER` user for
  use by the given THD object.

  @param thd the session whose security context is initialized.

  @return a status code describing the outcome of the initialization.
*/
enum_priv_checks_status initialize_security_context(THD *thd);
/**
  Initializes the security context of the `PRIVILEGE_CHECKS_USER` user for
  use by the applier thread.

  @return a status code describing the outcome of the initialization.
*/
enum_priv_checks_status initialize_applier_security_context();
569
/**
  Tells whether the slave accepts only row-based events.

  @return true if the row-format requirement is active, false otherwise.
*/
bool is_row_format_required() const;

/**
  Sets the flag telling whether the slave accepts only row-based events.

  @param require_row the new flag value.
*/
void set_require_row_format(bool require_row);

/**
  Returns the slave policy regarding primary keys on replicated tables.

  @return PK_CHECK_STREAM if it replicates the source value, PK_CHECK_ON if
          it requires primary keys, PK_CHECK_OFF if it does not enforce any
          restriction.
*/
enum_require_table_primary_key get_require_table_primary_key_check() const;

/**
  Sets the slave policy regarding primary keys on replicated tables.

  @param require_pk the new policy value.
*/
void set_require_table_primary_key_check(
    enum_require_table_primary_key require_pk);
602
/*
  Verifies the transaction boundaries of the events being applied.

  Its output is used to detect events that were not logged using row-based
  logging.
*/
Replication_transaction_boundary_parser transaction_parser;
610
/*
  Let's call a group (of events):
    - a transaction, or
    - an autocommitting query plus its associated events (INSERT_ID,
      TIMESTAMP...).
  We need these rli coordinates:
    - the relay log name and position of the beginning of the group that is
      currently being executed. They tell where to restart when replication
      stops in the middle of a group (which has been rolled back by the
      slave).
    - the relay log name and position just after the event that was just
      executed. This event is part of the current group.
  Formerly only the latter coordinates existed, plus a 'pending' variable,
  which handled incorrectly a transaction that starts in one relay log and
  finishes (commits) in another. That can happen, for example, when the relay
  log is rotated because of max_binlog_size.
*/
protected:
/**
  An event group is the set of events of one transaction.
  group_relay_log_name and group_relay_log_pos mark the point before which
  all event groups have been applied; when the slave starts, it resumes
  applying events from group_relay_log_pos. For a new slave (including after
  RESET SLAVE) they are initialized to the beginning of the first relay log
  file. In a single-threaded slave, group_relay_log_pos is advanced after
  each successfully applied transaction; with MTS it is updated by the MTS
  checkpoint mechanism. Both values are periodically stored in the
  relay_log_info file/table and loaded from it when the server starts.
*/
char group_relay_log_name[FN_REFLEN];
ulonglong group_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
/* The suffix number of the relay log name. */
uint event_relay_log_number;
ulonglong event_relay_log_pos;
ulonglong future_event_relay_log_pos;

/* Start position of the current event in the relay log. */
my_off_t event_start_pos;
/*
  Name and position, in the master's binlog, of the group currently being
  executed (whose relay log coordinates are group_relay_log_name/pos). These
  refer to the *group* because, in the master's binlog, the log_pos carried
  by each event is the position of the beginning of the group.

  Note: group_master_log_name and group_master_log_pos must only be written
  by the thread owning the Relay_log_info (the SQL thread if
  !belongs_to_client(); the client thread executing a BINLOG statement if
  belongs_to_client()).

  NOTE(review): `volatile` does not make concurrent access thread-safe;
  readers in other threads presumably rely on a lock such as data_lock —
  confirm.
*/
char group_master_log_name[FN_REFLEN];
volatile my_off_t group_master_log_pos;
667
private:
/*
  The received GTID set of this channel; add_logged_gtid() and
  add_gtid_set() add to it.
*/
Gtid_set *gtid_set;
/*
  Identifies whether this object belongs to the SQL thread, as opposed to
  having been created for a client thread or another purpose (including
  Slave_worker instance initialization). It serves the same purpose as
  belongs_to_client(), but its value is set earlier, in the constructor.
  NOTE(review): the name suggests the flag is true for *fake* instances,
  which reads as the opposite of this description — confirm against the
  constructor.
*/
bool rli_fake;
/* Ensures that the retrieved GTID set is initialized only once. */
bool gtid_retrieved_initialized;

/**
  Information on the last processed transaction or on the transaction
  currently being processed.

  STS:
    - timestamps of the currently applying/last applied transaction

  MTS:
    - coordinator thread: timestamps of the transaction currently being
      scheduled/last scheduled in a worker's queue
    - worker thread: timestamps of the currently applying/last applied
      transaction
*/
Gtid_monitoring_info *gtid_monitoring_info;

/**
  Set to true when the receiver has truncated the relay log for some reason.
  The applier may already have read the truncated data, so it needs to check
  this flag each time binlog_end_pos is updated.
*/
bool m_relay_log_truncated = false;

/**
  The user name part of the user given to `PRIVILEGE_CHECKS_USER`.
*/
std::string m_privilege_checks_username;

/**
  The host name part of the user given to `PRIVILEGE_CHECKS_USER`.
*/
std::string m_privilege_checks_hostname;

/**
  Whether the internal data regarding `PRIVILEGE_CHECKS_USER` is corrupted.
  This may happen if the user edits the Relay_log_info repository by hand.
*/
bool m_privilege_checks_user_corrupted;

/**
  Whether the slave accepts only events logged with row-based logging.
  It also blocks:
    - operations that create/drop temporary tables,
    - LOAD DATA operations,
    - the events INTVAR_EVENT, RAND_EVENT and USER_VAR_EVENT.
*/
bool m_require_row_format;

/**
  Slave policy regarding primary keys on tables.
  STREAM: replicates the value of sql_require_primary_key.
  ON:     fails when the source replicates a table creation or alteration
          that leaves the table without a primary key.
  OFF:    enforces no primary key policy on the channel.
*/
enum_require_table_primary_key m_require_table_primary_key_check;
738
739 public:
is_relay_log_truncated()740 bool is_relay_log_truncated() { return m_relay_log_truncated; }
741
get_sid_map()742 Sid_map *get_sid_map() { return gtid_set->get_sid_map(); }
743
get_sid_lock()744 Checkable_rwlock *get_sid_lock() { return get_sid_map()->get_sid_lock(); }
745
add_logged_gtid(rpl_sidno sidno,rpl_gno gno)746 void add_logged_gtid(rpl_sidno sidno, rpl_gno gno) {
747 get_sid_lock()->assert_some_lock();
748 DBUG_ASSERT(sidno <= get_sid_map()->get_max_sidno());
749 gtid_set->ensure_sidno(sidno);
750 gtid_set->_add_gtid(sidno, gno);
751 }
752
/**
  Adds a GTID set to the received GTID set.

  @param gtid_set the GTID set to add (note: this parameter name shadows the
                  gtid_set member).

  @return RETURN_STATUS_OK or RETURN_STATUS_REPORTED_ERROR.
*/
enum_return_status add_gtid_set(const Gtid_set *gtid_set);
761
get_gtid_set()762 const Gtid_set *get_gtid_set() const { return gtid_set; }
763
/*
  Re-initializes the SQL thread's IO cache on relay log `log`.
  NOTE(review): need_data_lock presumably tells whether the function must
  acquire data_lock itself; the return value is presumably true on error —
  confirm against the definition.
*/
bool reinit_sql_thread_io_cache(const char *log, bool need_data_lock);

/**
  Checks whether group_relay_log_name is in the index file.

  @param [out] errmsg Set to an error message if an error happens.

  @retval false It is valid.
  @retval true  It is invalid. In this case, *errmsg points to the error
                message.
*/
bool is_group_relay_log_name_invalid(const char **errmsg);
/**
  Resets group_relay_log_name and group_relay_log_pos to the start of the
  first relay log file. The caller must hold data_lock.

  @param[out] errmsg Set to an error message if an error happens.

  @retval false Success
  @retval true  Error
*/
bool reset_group_relay_log_pos(const char **errmsg);
/*
  Updates the error number, message and timestamp fields. Unlike
  va_report(), which also writes the error message to the log, this function
  only updates the error fields.
*/
void fill_coord_err_buf(loglevel level, int err_code,
                        const char *buff_coord) const;
793
/*
  Marks group_master_log_pos as invalid. This may happen, for example, after
  CHANGE MASTER TO RELAY_LOG_POS. It is unset once the first event has been
  executed and group_master_log_pos is valid again.
*/
bool is_group_master_log_pos_invalid;

/*
  Handling of the optional relay_log_space_limit constraint.
  ignore_log_space_limit resolves a deadlock between the I/O and SQL
  threads: the SQL thread sets it to unblock the I/O thread and make it
  temporarily ignore the constraint.
*/
ulonglong log_space_limit, log_space_total;
std::atomic<bool> ignore_log_space_limit;

/*
  Set by the SQL thread to instruct the I/O thread to rotate the logs when
  the SQL thread needs to purge logs to release disk space.
*/
std::atomic<bool> sql_force_rotate_relay;

/* Timestamp of the last master event; semantics not shown here (TODO confirm). */
time_t last_master_timestamp;
819
820 /**
821 Reset the delay.
822 This is used by RESET SLAVE to clear the delay.
823 */
clear_sql_delay()824 void clear_sql_delay() { sql_delay = 0; }
825
/*
  Used when the slave is stopped and is restarted so that it skips one or
  more events in the master log that caused errors and have already been
  applied manually by the DBA.
  NOTE(review): `volatile` provides no inter-thread synchronization; confirm
  that accesses are protected by a lock.
*/
volatile uint32 slave_skip_counter;
volatile ulong abort_pos_wait; /* Incremented on change master */
mysql_mutex_t log_space_lock;
mysql_cond_t log_space_cond;
835
/*
  Kind of condition given in the START SLAVE UNTIL clause (UNTIL_NONE when
  there is none).

  The condition is evaluated while events are executed
  (exec_relay_log_event()). Because log names change rarely, the result of
  comparing log names is cached. Code that changes log names must invalidate
  that cache while the SQL thread may be running, e.g. through
  notify_relay_log_change(). The condition-specific state is held in
  until_option.
*/
enum {
  UNTIL_NONE = 0,
  UNTIL_MASTER_POS = 1,
  UNTIL_RELAY_POS = 2,
  UNTIL_SQL_BEFORE_GTIDS = 3,
  UNTIL_SQL_AFTER_GTIDS = 4,
  UNTIL_SQL_AFTER_MTS_GAPS = 5,
  UNTIL_SQL_VIEW_ID = 6,
  UNTIL_DONE = 7
} until_condition;
856
857 char cached_charset[6];
858
859 /*
860 trans_retries varies between 0 to slave_transaction_retries and counts how
861 many times the slave has retried the present transaction; gets reset to 0
862 when the transaction finally succeeds. retried_trans is a cumulative
863 counter: how many times the slave has retried a transaction (any) since
864 slave started.
865 */
866 ulong trans_retries, retried_trans;
867
868 /*
869 If the end of the hot relay log is made of master's events ignored by the
870 slave I/O thread, these two keep track of the coords (in the master's
871 binlog) of the last of these events seen by the slave I/O thread. If not,
872 ign_master_log_name_end[0] == 0.
873 As they are like a Rotate event read/written from/to the relay log, they
874 are both protected by rli->relay_log.LOCK_binlog_end_pos.
875 */
876 char ign_master_log_name_end[FN_REFLEN];
877 ulonglong ign_master_log_pos_end;
878
879 /*
    Identifies where the SQL Thread should create temporary files for the
881 LOAD DATA INFILE. This is used for security reasons.
882 */
883 char slave_patternload_file[FN_REFLEN];
884 size_t slave_patternload_file_size;
885
886 /**
887 Identifies the last time a checkpoint routine has been executed.
888 */
889 struct timespec last_clock;
890
891 /**
892 Invalidates cached until_log_name and event_relay_log_name comparison
    result. Should be called after switching to the next relay log if there
    is a chance that the SQL thread is running.
895 */
notify_relay_log_change()896 inline void notify_relay_log_change() {
897 if (until_condition == UNTIL_RELAY_POS)
898 dynamic_cast<Until_position *>(until_option)->notify_log_name_change();
899 }
900
901 /**
902 Receiver thread notifies that it truncated some data from relay log.
903 data_lock will be acquired, so the caller should not hold data_lock.
904 */
905 void notify_relay_log_truncated();
906 /**
907 Applier clears the flag after it handled the situation. The caller must
908 hold data_lock.
909 */
910 void clear_relay_log_truncated();
911
912 /**
    The same as @c notify_relay_log_change but for
    @c group_master_log_name (applies to UNTIL MASTER_POS).
915 */
notify_group_master_log_name_update()916 inline void notify_group_master_log_name_update() {
917 if (until_condition == UNTIL_MASTER_POS)
918 dynamic_cast<Until_position *>(until_option)->notify_log_name_change();
919 }
920
inc_event_relay_log_pos()921 inline void inc_event_relay_log_pos() {
922 event_relay_log_pos = future_event_relay_log_pos;
923 }
924
925 /**
926 Last executed event group coordinates are updated and optionally
927 forcibly flushed to a repository.
928 @param log_pos a value of the executed position to update to
929 @param need_data_lock whether data_lock should be acquired
930 @param force the value is passed to eventual flush_info()
931 */
932 int inc_group_relay_log_pos(ulonglong log_pos, bool need_data_lock,
933 bool force = false);
934
935 int wait_for_pos(THD *thd, String *log_name, longlong log_pos,
936 double timeout);
937 /**
938 Wait for a GTID set to be executed.
939
940 @param thd The thread for status changes and kill status
941 @param gtid A char array with a GTID set
942 @param timeout Number of seconds to wait before timing out
943 @param update_THD_status Shall the method update the THD stage
944
945 @retval 0 The set is already executed
946 @retval -1 There was a timeout waiting for the set
947 @retval -2 There was an issue while waiting.
948 */
949 int wait_for_gtid_set(THD *thd, const char *gtid, double timeout,
950 bool update_THD_status = true);
951 /**
952 Wait for a GTID set to be executed.
953
954 @param thd The thread for status changes and kill status
955 @param gtid A String with a GTID set
956 @param timeout Number of seconds to wait before timing out
957 @param update_THD_status Shall the method update the THD stage
958
959 @retval 0 The set is already executed
960 @retval -1 There was a timeout waiting for the set
961 @retval -2 There was an issue while waiting.
962 */
963 int wait_for_gtid_set(THD *thd, String *gtid, double timeout,
964 bool update_THD_status = true);
965 /**
966 Wait for a GTID set to be executed.
967
968 @param thd The thread for status changes and kill status
969 @param wait_gtid_set A GTID_set object
970 @param timeout Number of seconds to wait before timing out
971 @param update_THD_status Shall the method update the THD stage
972
973 @retval 0 The set is already executed
974 @retval -1 There was a timeout waiting for the set
975 @retval -2 There was an issue while waiting.
976 */
977 int wait_for_gtid_set(THD *thd, const Gtid_set *wait_gtid_set, double timeout,
978 bool update_THD_status = true);
979
980 void close_temporary_tables();
981
982 RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
983 uint tables_to_lock_count; /* RBR: Count of tables to lock */
984 table_mapping m_table_map; /* RBR: Mapping table-id to table */
985 /* RBR: Record Rows_query log event */
986 Rows_query_log_event *rows_query_ev;
987
get_table_data(TABLE * table_arg,table_def ** tabledef_var,TABLE ** conv_table_var)988 bool get_table_data(TABLE *table_arg, table_def **tabledef_var,
989 TABLE **conv_table_var) const {
990 DBUG_ASSERT(tabledef_var && conv_table_var);
991 for (TABLE_LIST *ptr = tables_to_lock; ptr != nullptr;
992 ptr = ptr->next_global)
993 if (ptr->table == table_arg) {
994 *tabledef_var = &static_cast<RPL_TABLE_LIST *>(ptr)->m_tabledef;
995 *conv_table_var = static_cast<RPL_TABLE_LIST *>(ptr)->m_conv_table;
996 DBUG_PRINT("debug", ("Fetching table data for table %s.%s:"
997 " tabledef: %p, conv_table: %p",
998 table_arg->s->db.str, table_arg->s->table_name.str,
999 *tabledef_var, *conv_table_var));
1000 return true;
1001 }
1002 return false;
1003 }
1004
1005 /**
1006 Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
1007 the thread save 3 @c get_charset() per @c Query_log_event if the charset is
    not changing from event to event (common situation). When all 6 bytes are
    0, the cache is considered invalidated.
1010 */
1011 void cached_charset_invalidate();
1012 bool cached_charset_compare(char *charset) const;
1013
1014 void cleanup_context(THD *, bool);
1015 void slave_close_thread_tables(THD *);
1016 void clear_tables_to_lock();
1017 int purge_relay_logs(THD *thd, const char **errmsg, bool delete_only = false);
1018
1019 /*
1020 Used to defer stopping the SQL thread to give it a chance
1021 to finish up the current group of events.
1022 The timestamp is set and reset in @c sql_slave_killed().
1023 */
1024 time_t last_event_start_time;
1025
1026 /* The original master commit timestamp in microseconds since epoch */
1027 uint64 original_commit_timestamp;
1028
1029 /*
1030 A container to hold on Intvar-, Rand-, Uservar- log-events in case
1031 the slave is configured with table filtering rules.
    The withheld events are executed when their parent Query is
    determined to be executed as well.
1034 */
1035 Deferred_log_events *deferred_events;
1036
1037 /*
1038 State of the container: true stands for IRU events gathering,
1039 false does for execution, either deferred or direct.
1040 */
1041 bool deferred_events_collecting;
1042
1043 /*****************************************************************************
1044 WL#5569 MTS
1045
1046 legends:
1047 C - Coordinator;
1048 W - Worker;
1049 WQ - Worker Queue containing event assignments
1050 */
  // the number of workers is determined by global slave_parallel_workers
1052 Slave_worker_array workers;
1053
1054 // To map a database to a worker
1055 malloc_unordered_map<std::string,
1056 unique_ptr_with_deleter<db_worker_hash_entry>>
1057 mapping_db_to_worker{key_memory_db_worker_hash_entry};
1058 bool inited_hash_workers; // flag to check if mapping_db_to_worker is inited
1059
1060 mysql_mutex_t slave_worker_hash_lock; // for mapping_db_to_worker
1061 mysql_cond_t slave_worker_hash_cond; // for mapping_db_to_worker
1062
1063 /*
1064 For the purpose of reporting the worker status in performance schema table,
1065 we need to preserve the workers array after worker thread was killed. So, we
1066 copy this array into the below vector which is used for reporting
1067 until next init_workers(). Note that we only copy those attributes that
1068 would be useful in reporting worker status. We only use a few attributes in
1069 this object as of now but still save the whole object. The idea is
1070 to be future proof. We will extend performance schema tables in future
1071 and then we would use a good number of attributes from this object.
1072 */
1073
1074 std::vector<Slave_worker *> workers_copy_pfs;
1075
1076 /*
1077 This flag is turned ON when the workers array is initialized.
1078 Before destroying the workers array we check this flag to make sure
    we are not destroying an uninitialized array. For the purpose of reporting the
1080 worker status in performance schema table, we need to preserve the workers
1081 array after worker thread was killed. So, we copy this array into
1082 workers_copy_pfs array which is used for reporting until next
1083 init_workers().
1084 */
1085 bool workers_array_initialized;
1086
1087 volatile ulong pending_jobs;
1088 mysql_mutex_t pending_jobs_lock;
1089 mysql_cond_t pending_jobs_cond;
1090 mysql_mutex_t exit_count_lock; // mutex of worker exit count
1091 ulong mts_slave_worker_queue_len_max;
1092 ulonglong mts_pending_jobs_size; // actual mem usage by WQ:s
1093 ulonglong mts_pending_jobs_size_max; // max of WQ:s size forcing C to wait
1094 bool mts_wq_oversize; // C raises flag to wait some memory's released
1095 Slave_worker
1096 *last_assigned_worker; // is set to a Worker at assigning a group
1097 /*
1098 master-binlog ordered queue of Slave_job_group descriptors of groups
1099 that are under processing. The queue size is @c checkpoint_group.
1100 */
1101 Slave_committed_queue *gaq;
1102 /*
1103 Container for references of involved partitions for the current event group
1104 */
1105 // CGAP dynarray holds id:s of partitions of the Current being executed Group
1106 Prealloced_array<db_worker_hash_entry *, 4> curr_group_assigned_parts;
1107 // deferred array to hold partition-info-free events
1108 Prealloced_array<Slave_job_item, 8> curr_group_da;
1109
1110 bool curr_group_seen_gtid; // current group started with Gtid-event or not
1111 bool curr_group_seen_begin; // current group started with B-event or not
1112 bool curr_group_isolated; // current group requires execution in isolation
1113 bool mts_end_group_sets_max_dbs; // flag indicates if partitioning info is
1114 // discovered
1115 volatile ulong
1116 mts_wq_underrun_w_id; // Id of a Worker whose queue is getting empty
1117 /*
1118 Ongoing excessive overrun counter to correspond to number of events that
1119 are being scheduled while a WQ is close to be filled up.
1120 `Close' is defined as (100 - mts_worker_underrun_level) %.
1121 The counter is incremented each time a WQ get filled over that level
1122 and decremented when the level drops below.
1123 The counter therefore describes level of saturation that Workers
1124 are experiencing and is used as a parameter to compute a nap time for
1125 Coordinator in order to avoid reaching WQ limits.
1126 */
1127 volatile long mts_wq_excess_cnt;
1128 long mts_worker_underrun_level; // % of WQ size at which W is considered
1129 // hungry
1130 ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
1131 ulong opt_slave_parallel_workers; // cache for ::opt_slave_parallel_workers
1132 ulong slave_parallel_workers; // the one slave session time number of workers
1133 ulong
1134 exit_counter; // Number of workers contributed to max updated group index
1135 ulonglong max_updated_index;
1136 ulong recovery_parallel_workers; // number of workers while recovering
1137 uint rli_checkpoint_seqno; // counter of groups executed after the most
1138 // recent CP
1139 uint checkpoint_group; // cache for ::opt_mts_checkpoint_group
1140 MY_BITMAP recovery_groups; // bitmap used during recovery
1141 bool recovery_groups_inited;
1142 ulong mts_recovery_group_cnt; // number of groups to execute at recovery
1143 ulong mts_recovery_index; // running index of recoverable groups
1144 bool mts_recovery_group_seen_begin;
1145
1146 /*
    While distributing events based on their properties, the MTS
    Coordinator changes its MTS group status.
    Transitions normally follow the `=>' arrows on the diagram:
1150
1151 +----------------------------+
1152 V |
1153 MTS_NOT_IN_GROUP => |
1154 {MTS_IN_GROUP => MTS_END_GROUP --+} while (!killed) => MTS_KILLED_GROUP
1155
1156 MTS_END_GROUP has `->' loop breaking link to MTS_NOT_IN_GROUP when
1157 Coordinator synchronizes with Workers by demanding them to
1158 complete their assignments.
1159 */
1160 enum {
1161 /*
1162 no new events were scheduled after last synchronization,
1163 includes Single-Threaded-Slave case.
1164 */
1165 MTS_NOT_IN_GROUP,
1166
1167 MTS_IN_GROUP, /* at least one not-terminal event scheduled to a Worker */
1168 MTS_END_GROUP, /* the last scheduled event is a terminal event */
1169 MTS_KILLED_GROUP /* Coordinator gave up to reach MTS_END_GROUP */
1170 } mts_group_status;
1171
1172 /*
1173 MTS statistics:
1174 */
1175 ulonglong mts_events_assigned; // number of events (statements) scheduled
1176 ulonglong mts_groups_assigned; // number of groups (transactions) scheduled
1177 volatile ulong
1178 mts_wq_overrun_cnt; // counter of all mts_wq_excess_cnt increments
1179 ulong wq_size_waits_cnt; // number of times C slept due to WQ:s oversize
1180 /*
1181 Counter of how many times Coordinator saw Workers are filled up
    "enough" with assignments. The enough definition depends on
1183 the scheduler type.
1184 */
1185 ulong mts_wq_no_underrun_cnt;
1186 std::atomic<longlong>
1187 mts_total_wait_overlap; // Waiting time corresponding to above
1188 /*
1189 Stats to compute Coordinator waiting time for any Worker available,
1190 applies solely to the Commit-clock scheduler.
1191 */
1192 ulonglong mts_total_wait_worker_avail;
1193 ulong mts_wq_overfill_cnt; // counter of C waited due to a WQ queue was full
1194 /*
    Statistics (todo: replace with WL5631) apply to both Coordinator and
1196 Worker. The exec time in the Coordinator case means scheduling. The read
1197 time in the Worker case means getting an event out of Worker queue
1198 */
1199 ulonglong stats_exec_time;
1200 ulonglong stats_read_time;
1201 struct timespec ts_exec[2]; // per event pre- and post- exec timestamp
1202 struct timespec stats_begin; // applier's bootstrap time
1203
1204 /*
    A sorted array of the Workers' current assignment numbers to provide
1206 approximate view on Workers loading.
1207 The first row of the least occupied Worker is queried at assigning
1208 a new partition. Is updated at checkpoint commit to the main RLI.
1209 */
1210 Prealloced_array<ulong, 16> least_occupied_workers;
1211 time_t mts_last_online_stat;
1212 /* end of MTS statistics */
1213
1214 /**
1215 Storage for holding newly computed values for the last executed
1216 event group coordinates while the current group of events is
1217 being committed, see @c pre_commit, post_commit.
1218 */
1219 char new_group_master_log_name[FN_REFLEN];
1220 my_off_t new_group_master_log_pos;
1221 char new_group_relay_log_name[FN_REFLEN];
1222 my_off_t new_group_relay_log_pos;
1223
1224 /* Returns the number of elements in workers array/vector. */
get_worker_count()1225 inline size_t get_worker_count() {
1226 if (workers_array_initialized)
1227 return workers.size();
1228 else
1229 return workers_copy_pfs.size();
1230 }
1231
1232 /*
1233 Returns a pointer to the worker instance at index n in workers
1234 array/vector.
1235 */
get_worker(size_t n)1236 Slave_worker *get_worker(size_t n) {
1237 if (workers_array_initialized) {
1238 if (n >= workers.size()) return nullptr;
1239
1240 return workers[n];
1241 } else if (workers_copy_pfs.size()) {
1242 if (n >= workers_copy_pfs.size()) return nullptr;
1243
1244 return workers_copy_pfs[n];
1245 } else
1246 return nullptr;
1247 }
1248
1249 /**
1250 The method implements updating a slave info table. It's
1251 specialized differently for STS and MTS.
1252 */
1253 virtual bool commit_positions();
1254
1255 /*Channel defined mts submode*/
1256 enum_mts_parallel_type channel_mts_submode;
1257 /* MTS submode */
1258 Mts_submode *current_mts_submode;
1259
1260 /* most of allocation in the coordinator rli is there */
1261 void init_workers(ulong);
1262
1263 /* counterpart of the init */
1264 void deinit_workers();
1265
1266 /**
1267 returns true if there is any gap-group of events to execute
1268 at slave starting phase.
1269 */
is_mts_recovery()1270 inline bool is_mts_recovery() const { return mts_recovery_group_cnt != 0; }
1271
clear_mts_recovery_groups()1272 inline void clear_mts_recovery_groups() {
1273 if (recovery_groups_inited) {
1274 bitmap_free(&recovery_groups);
1275 mts_recovery_group_cnt = 0;
1276 recovery_groups_inited = false;
1277 }
1278 }
1279
1280 /**
1281 returns true if events are to be executed in parallel
1282 */
is_parallel_exec()1283 inline bool is_parallel_exec() const {
1284 bool ret = (slave_parallel_workers > 0) && !is_mts_recovery();
1285
1286 DBUG_ASSERT(!ret || !workers.empty());
1287
1288 return ret;
1289 }
1290
1291 /**
1292 returns true if Coordinator is scheduling events belonging to
1293 the same group and has not reached yet its terminal event.
1294 */
is_mts_in_group()1295 inline bool is_mts_in_group() {
1296 return is_parallel_exec() && mts_group_status == MTS_IN_GROUP;
1297 }
1298
1299 /**
1300 Check if it is time to compute MTS checkpoint.
1301
1302 @retval true It is time to compute MTS checkpoint.
1303 @retval false It is not MTS or it is not time for computing checkpoint.
1304 */
1305 bool is_time_for_mts_checkpoint();
1306 /**
1307 While a group is executed by a Worker the relay log can change.
1308 Coordinator notifies Workers about this event. Worker is supposed
1309 to commit to the recovery table with the new info.
1310 */
1311 void reset_notified_relay_log_change();
1312
1313 /**
1314 While a group is executed by a Worker the relay log can change.
1315 Coordinator notifies Workers about this event. Coordinator and Workers
1316 maintain a bitmap of executed group that is reset with a new checkpoint.
1317 */
1318 void reset_notified_checkpoint(ulong count, time_t new_ts,
1319 bool update_timestamp = false);
1320
1321 /**
1322 Called when gaps execution is ended so it is crash-safe
1323 to reset the last session Workers info.
1324 */
1325 bool mts_finalize_recovery();
1326 /*
1327 * End of MTS section ******************************************************/
1328
1329 /* The general cleanup that slave applier may need at the end of query. */
cleanup_after_query()1330 inline void cleanup_after_query() {
1331 if (deferred_events) deferred_events->rewind();
1332 }
1333 /* The general cleanup that slave applier may need at the end of session. */
cleanup_after_session()1334 void cleanup_after_session() {
1335 if (deferred_events) delete deferred_events;
1336 }
1337
1338 /**
1339 Helper function to do after statement completion.
1340
1341 This function is called from an event to complete the group by
1342 either stepping the group position, if the "statement" is not
1343 inside a transaction; or increase the event position, if the
1344 "statement" is inside a transaction.
1345
1346 @param event_log_pos
1347 Master log position of the event. The position is recorded in the
1348 relay log info and used to produce information for <code>SHOW
1349 SLAVE STATUS</code>.
1350 */
1351 int stmt_done(my_off_t event_log_pos);
1352
1353 /**
1354 Set the value of a replication state flag.
1355
1356 @param flag Flag to set
1357 */
set_flag(enum_state_flag flag)1358 void set_flag(enum_state_flag flag) { m_flags |= (1UL << flag); }
1359
1360 /**
1361 Get the value of a replication state flag.
1362
1363 @param flag Flag to get value of
1364
1365 @return @c true if the flag was set, @c false otherwise.
1366 */
get_flag(enum_state_flag flag)1367 bool get_flag(enum_state_flag flag) { return m_flags & (1UL << flag); }
1368
1369 /**
1370 Clear the value of a replication state flag.
1371
1372 @param flag Flag to clear
1373 */
clear_flag(enum_state_flag flag)1374 void clear_flag(enum_state_flag flag) { m_flags &= ~(1UL << flag); }
1375
1376 private:
1377 /**
1378 Auxiliary function used by is_in_group.
1379
1380 The execute thread is in the middle of a statement in the
1381 following cases:
1382 - User_var/Intvar/Rand events have been processed, but the
1383 corresponding Query_log_event has not been processed.
1384 - Table_map or Row events have been processed, and the last Row
1385 event did not have the STMT_END_F set.
1386
1387 @retval true Replication thread is inside a statement.
1388 @retval false Replication thread is not inside a statement.
1389 */
is_in_stmt()1390 bool is_in_stmt() const {
1391 bool ret = (m_flags & (1UL << IN_STMT));
1392 DBUG_PRINT("info", ("is_in_stmt()=%d", ret));
1393 return ret;
1394 }
1395 /**
1396 Auxiliary function used by is_in_group.
1397
1398 @retval true The execute thread is inside a statement or a
1399 transaction, i.e., either a BEGIN has been executed or we are in
1400 the middle of a statement.
    @retval false The execute thread is not inside a statement
1402 or a transaction.
1403 */
is_in_trx_or_stmt()1404 bool is_in_trx_or_stmt() const {
1405 bool ret = is_in_stmt() || (info_thd->variables.option_bits & OPTION_BEGIN);
1406 DBUG_PRINT("info", ("is_in_trx_or_stmt()=%d", ret));
1407 return ret;
1408 }
1409
1410 public:
1411 /**
1412 A group is defined as the entire range of events that constitute
1413 a transaction or auto-committed statement. It has one of the
1414 following forms:
1415
1416 (Gtid)? Query(BEGIN) ... (Query(COMMIT) | Query(ROLLBACK) | Xid)
1417 (Gtid)? (Rand | User_var | Int_var)* Query(DDL)
1418
1419 Thus, to check if the execute thread is in a group, there are
1420 two cases:
1421
1422 - If the master generates Gtid events (5.7.5 or later, or 5.6 or
1423 later with GTID_MODE=ON), then is_in_group is the same as
1424 info_thd->owned_gtid.sidno != 0, since owned_gtid.sidno is set
1425 to non-zero by the Gtid_log_event and cleared to zero at commit
1426 or rollback.
1427
1428 - If the master does not generate Gtid events (i.e., master is
1429 pre-5.6, or pre-5.7.5 with GTID_MODE=OFF), then is_in_group is
1430 the same as is_in_trx_or_stmt().
1431
1432 @retval true Replication thread is inside a group.
1433 @retval false Replication thread is not inside a group.
1434 */
is_in_group()1435 bool is_in_group() const {
1436 bool ret = is_in_trx_or_stmt() || info_thd->owned_gtid.sidno != 0;
1437 DBUG_PRINT("info", ("is_in_group()=%d", ret));
1438 return ret;
1439 }
1440
1441 int count_relay_log_space();
1442
1443 /**
1444 Initialize the relay log info. This function does a set of operations
1445 on the rli object like initializing variables, loading information from
1446 repository, setting up name for relay log files and index, MTS recovery
1447 (if necessary), calculating the received GTID set for the channel and
1448 storing the updated rli object configuration into the repository.
1449
1450 When this function is called in a change master process and the change
1451 master procedure will purge all the relay log files later, there is no
1452 reason to try to calculate the received GTID set of the channel based on
1453 existing relay log files (they will be purged). Allowing reads to existing
1454 relay log files at this point may lead to put the server in a state where
1455 it will be no possible to configure it if it was reset when encryption of
1456 replication log files was ON and the keyring plugin is not available
1457 anymore.
1458
1459 @param skip_received_gtid_set_recovery When true, skips the received GTID
1460 set recovery.
1461
1462 @retval 0 Success.
1463 @retval 1 Error.
1464 */
1465 int rli_init_info(bool skip_received_gtid_set_recovery = false);
1466 void end_info();
1467 int flush_info(bool force = false);
1468 /**
1469 Clears from `this` Relay_log_info object all attribute values that are
1470 not to be kept.
1471
    @returns true if there was a problem with clearing the data and false
1473 otherwise.
1474 */
1475 bool clear_info();
1476 /**
1477 Checks if the underlying `Rpl_info` handler holds information for the fields
1478 to be kept between slave resets, while the other fields were cleared.
1479
    @param previous_result the result returned from invoking the `check_info`
1481 method on `this` object.
1482
1483 @returns function success state represented by the `enum_return_check`
1484 enumeration.
1485 */
1486 enum_return_check check_if_info_was_cleared(
1487 const enum_return_check &previous_result) const;
1488 int flush_current_log();
1489 void set_master_info(Master_info *info);
1490
get_future_event_relay_log_pos()1491 inline ulonglong get_future_event_relay_log_pos() {
1492 return future_event_relay_log_pos;
1493 }
set_future_event_relay_log_pos(ulonglong log_pos)1494 inline void set_future_event_relay_log_pos(ulonglong log_pos) {
1495 future_event_relay_log_pos = log_pos;
1496 }
1497
get_group_master_log_name()1498 inline const char *get_group_master_log_name() {
1499 return group_master_log_name;
1500 }
get_group_master_log_pos()1501 inline ulonglong get_group_master_log_pos() { return group_master_log_pos; }
set_group_master_log_name(const char * log_file_name)1502 inline void set_group_master_log_name(const char *log_file_name) {
1503 strmake(group_master_log_name, log_file_name,
1504 sizeof(group_master_log_name) - 1);
1505 }
set_group_master_log_pos(ulonglong log_pos)1506 inline void set_group_master_log_pos(ulonglong log_pos) {
1507 group_master_log_pos = log_pos;
1508 }
1509
get_group_relay_log_name()1510 inline const char *get_group_relay_log_name() { return group_relay_log_name; }
get_group_relay_log_pos()1511 inline ulonglong get_group_relay_log_pos() { return group_relay_log_pos; }
set_group_relay_log_name(const char * log_file_name)1512 inline void set_group_relay_log_name(const char *log_file_name) {
1513 strmake(group_relay_log_name, log_file_name,
1514 sizeof(group_relay_log_name) - 1);
1515 }
set_group_relay_log_name(const char * log_file_name,size_t len)1516 inline void set_group_relay_log_name(const char *log_file_name, size_t len) {
1517 strmake(group_relay_log_name, log_file_name, len);
1518 }
set_group_relay_log_pos(ulonglong log_pos)1519 inline void set_group_relay_log_pos(ulonglong log_pos) {
1520 group_relay_log_pos = log_pos;
1521 }
1522
get_event_relay_log_name()1523 inline const char *get_event_relay_log_name() { return event_relay_log_name; }
get_event_relay_log_pos()1524 inline ulonglong get_event_relay_log_pos() { return event_relay_log_pos; }
set_event_relay_log_name(const char * log_file_name)1525 inline void set_event_relay_log_name(const char *log_file_name) {
1526 strmake(event_relay_log_name, log_file_name,
1527 sizeof(event_relay_log_name) - 1);
1528 set_event_relay_log_number(relay_log_name_to_number(log_file_name));
1529 notify_relay_log_change();
1530 }
1531
get_event_relay_log_number()1532 uint get_event_relay_log_number() { return event_relay_log_number; }
set_event_relay_log_number(uint number)1533 void set_event_relay_log_number(uint number) {
1534 event_relay_log_number = number;
1535 }
1536
1537 /**
1538 Given the extension number of the relay log, gets the full
1539 relay log path. Currently used in Slave_worker::retry_transaction()
1540
1541 @param [in] number extension number of relay log
1542 @param[in, out] name The full path of the relay log (per-channel)
1543 to be read by the slave worker.
1544 */
1545 void relay_log_number_to_name(uint number, char name[FN_REFLEN + 1]);
1546 uint relay_log_name_to_number(const char *name);
1547
set_event_start_pos(my_off_t pos)1548 void set_event_start_pos(my_off_t pos) { event_start_pos = pos; }
get_event_start_pos()1549 my_off_t get_event_start_pos() { return event_start_pos; }
1550
set_event_relay_log_pos(ulonglong log_pos)1551 inline void set_event_relay_log_pos(ulonglong log_pos) {
1552 event_relay_log_pos = log_pos;
1553 }
get_rpl_log_name()1554 inline const char *get_rpl_log_name() {
1555 return (group_master_log_name[0] ? group_master_log_name : "FIRST");
1556 }
1557
1558 static size_t get_number_info_rli_fields();
1559
1560 /**
1561 Sets bits for columns that are allowed to be `NULL`.
1562
1563 @param nullable_fields the bitmap to hold the nullable fields.
1564 */
1565 static void set_nullable_fields(MY_BITMAP *nullable_fields);
1566
1567 /**
1568 Indicate that a delay starts.
1569
1570 This does not actually sleep; it only sets the state of this
1571 Relay_log_info object to delaying so that the correct state can be
1572 reported by SHOW SLAVE STATUS and SHOW PROCESSLIST.
1573
1574 Requires rli->data_lock.
1575
1576 @param delay_end The time when the delay shall end.
1577 */
1578 void start_sql_delay(time_t delay_end);
1579
1580 /* Note that this is cast to uint32 in show_slave_status(). */
get_sql_delay()1581 time_t get_sql_delay() { return sql_delay; }
set_sql_delay(time_t _sql_delay)1582 void set_sql_delay(time_t _sql_delay) { sql_delay = _sql_delay; }
get_sql_delay_end()1583 time_t get_sql_delay_end() { return sql_delay_end; }
1584
1585 Relay_log_info(bool is_slave_recovery,
1586 #ifdef HAVE_PSI_INTERFACE
1587 PSI_mutex_key *param_key_info_run_lock,
1588 PSI_mutex_key *param_key_info_data_lock,
1589 PSI_mutex_key *param_key_info_sleep_lock,
1590 PSI_mutex_key *param_key_info_thd_lock,
1591 PSI_mutex_key *param_key_info_data_cond,
1592 PSI_mutex_key *param_key_info_start_cond,
1593 PSI_mutex_key *param_key_info_stop_cond,
1594 PSI_mutex_key *param_key_info_sleep_cond,
1595 #endif
1596 uint param_id, const char *param_channel, bool is_rli_fake);
1597 virtual ~Relay_log_info();
1598
1599 /*
1600 Determines if a warning message on unsafe execution was
    already printed out to avoid cluttering the error log
1602 with several warning messages.
1603 */
1604 bool reported_unsafe_warning;
1605
1606 /*
    sql_thread_kill_accepted is set to true when the killed status is recognized.
1608 */
1609 bool sql_thread_kill_accepted;
1610
get_row_stmt_start_timestamp()1611 time_t get_row_stmt_start_timestamp() { return row_stmt_start_timestamp; }
1612
set_row_stmt_start_timestamp()1613 time_t set_row_stmt_start_timestamp() {
1614 if (row_stmt_start_timestamp == 0) row_stmt_start_timestamp = my_time(0);
1615
1616 return row_stmt_start_timestamp;
1617 }
1618
reset_row_stmt_start_timestamp()1619 void reset_row_stmt_start_timestamp() { row_stmt_start_timestamp = 0; }
1620
set_long_find_row_note_printed()1621 void set_long_find_row_note_printed() { long_find_row_note_printed = true; }
1622
unset_long_find_row_note_printed()1623 void unset_long_find_row_note_printed() {
1624 long_find_row_note_printed = false;
1625 }
1626
is_long_find_row_note_printed()1627 bool is_long_find_row_note_printed() { return long_find_row_note_printed; }
1628
1629 public:
1630 /**
1631 Delete the existing event and set a new one. This class is
1632 responsible for freeing the event, the caller should not do that.
1633
1634 @return 1 if an error was encountered, 0 otherwise.
1635 */
1636 virtual int set_rli_description_event(Format_description_log_event *fdle);
1637
1638 /**
1639 Return the current Format_description_log_event.
1640 */
get_rli_description_event()1641 Format_description_log_event *get_rli_description_event() const {
1642 return rli_description_event;
1643 }
1644
/**
  Adaptation of the slave applier to specific master versions.
*/
ulong adapt_to_master_version(Format_description_log_event *fdle);
ulong adapt_to_master_version_updown(ulong master_version,
                                     ulong current_version);
uchar slave_version_split[3];  // bytes of the slave server version
/*
  The relay log info repository should be updated on relay log
  rotation. But when a transaction is split across two relay logs,
  updating the repository would cause unexpected results, so the update
  is postponed until the 'commit' of the transaction is executed.

  This flag is set to 'true' when such a 'forced flush' (at the time of
  relay log rotation) is postponed because a transaction is split
  across the relay logs.
*/
bool force_flush_postponed_due_to_split_trans;
1663
get_commit_order_manager()1664 Commit_order_manager *get_commit_order_manager() { return commit_order_mngr; }
1665
set_commit_order_manager(Commit_order_manager * mngr)1666 void set_commit_order_manager(Commit_order_manager *mngr) {
1667 commit_order_mngr = mngr;
1668 }
1669
1670 /*
1671 Following set function is required to initialize the 'until_option' during
1672 MTS relay log recovery process.
1673
1674 Ideally initialization of 'until_option' is done through
1675 rli::init_until_option. This init_until_option requires the main server
1676 thread object and it makes use of the thd->lex->mi object to initialize the
1677 'until_option'.
1678
1679 But MTS relay log recovery process happens before the main server comes
1680 up at this time the THD object will not be available. Hence the following
1681 set function does the initialization of 'until_option'.
1682 */
set_until_option(Until_option * option)1683 void set_until_option(Until_option *option) {
1684 mysql_mutex_lock(&data_lock);
1685 until_option = option;
1686 mysql_mutex_unlock(&data_lock);
1687 }
1688
clear_until_option()1689 void clear_until_option() {
1690 mysql_mutex_lock(&data_lock);
1691 if (until_option) {
1692 delete until_option;
1693 until_option = nullptr;
1694 }
1695 mysql_mutex_unlock(&data_lock);
1696 }
1697
1698 bool set_info_search_keys(Rpl_info_handler *to);
1699
1700 /**
1701 Get coordinator's RLI. Especially used get the rli from
1702 a slave thread, like this: thd->rli_slave->get_c_rli();
1703 thd could be a SQL thread or a worker thread
1704 */
get_c_rli()1705 virtual Relay_log_info *get_c_rli() { return this; }
1706
1707 virtual const char *get_for_channel_str(bool upper_case = false) const;
1708
1709 /**
1710 Set replication filter for the channel.
1711 */
set_filter(Rpl_filter * channel_filter)1712 inline void set_filter(Rpl_filter *channel_filter) {
1713 rpl_filter = channel_filter;
1714 }
1715
protected:
/** The event handed out by get_rli_description_event(). */
Format_description_log_event *rli_description_event;

private:
/*
  Commit order manager to order commits made by its workers. In the context
  of Multi Source Replication each worker will be ordered by the
  corresponding coordinator's order manager.
*/
Commit_order_manager *commit_order_mngr;
1726
/**
  Delay the slave SQL thread by this number of seconds.
  The delay is applied per transaction and based on the immediate master's
  commit time. Exceptionally, if a server in the replication chain does not
  support the commit timestamps in Gtid_log_event, the delay is applied per
  event and is based on the event timestamp.
  This is set with CHANGE MASTER TO MASTER_DELAY=X.

  Guarded by data_lock. Initialized by the client thread executing
  START SLAVE. Written by client threads executing CHANGE MASTER TO
  MASTER_DELAY=X. Read by the SQL thread and by client threads
  executing SHOW SLAVE STATUS. Note: must not be written while the
  slave SQL thread is running, since the SQL thread reads it without
  a lock when executing flush_info().
*/
time_t sql_delay;

/**
  During a delay, specifies the point in time when the delay ends.

  This is used for the SQL_Remaining_Delay column in SHOW SLAVE STATUS.

  Guarded by data_lock. Written by the SQL thread. Read by client
  threads executing SHOW SLAVE STATUS.
*/
time_t sql_delay_end;

/*
  NOTE(review): flag word of this object; the meaning of the individual
  bits is defined outside this view -- confirm before relying on them.
*/
uint32 m_flags;
1755
/*
  Before the MASTER_DELAY parameter was added (WL#344), relay_log.info
  had 4 lines. Now it has 5 lines.
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_DELAY = 5;

/*
  Before WL#5599, relay_log.info had 5 lines. Now it has 6 lines.
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_WORKERS = 6;

/*
  Before the Id was added (BUG#2334346), relay_log.info
  had 6 lines. Now it has 7 lines.
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_ID = 7;

/*
  Number of lines in relay_log.info once the channel name was added to
  the slave relay log info.
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL = 8;

/*
  Represents the line number in relay_log.info used to save
  PRIVILEGE_CHECKS_USERNAME.
  It is the username part of the PRIVILEGE_CHECKS_USER column in
  performance_schema.replication_applier_configuration.
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_PRIV_CHECKS_USERNAME = 9;

/*
  Maximum length of PRIVILEGE_CHECKS_USERNAME.
*/
static const int PRIV_CHECKS_USERNAME_LENGTH = 32;

/*
  Represents the line number in relay_log.info used to save
  PRIVILEGE_CHECKS_HOSTNAME.
  It is the hostname part of the PRIVILEGE_CHECKS_USER column in
  performance_schema.replication_applier_configuration.
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_PRIV_CHECKS_HOSTNAME = 10;

/*
  Maximum length of PRIVILEGE_CHECKS_HOSTNAME.
*/
static const int PRIV_CHECKS_HOSTNAME_LENGTH = 255;

/*
  Represents the line number in relay_log.info used to save
  REQUIRE_ROW_FORMAT.
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_ROW_FORMAT = 11;

/*
  Represents the line number in relay_log.info used to save
  REQUIRE_TABLE_PRIMARY_KEY_CHECK.
*/
static const int
    LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_TABLE_PRIMARY_KEY_CHECK = 12;

/*
  Total lines in relay_log.info.
  This has to be updated every time a member is added or removed.
*/
static const int MAXIMUM_LINES_IN_RELAY_LOG_INFO_FILE =
    LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_TABLE_PRIMARY_KEY_CHECK;
1820
bool read_info(Rpl_info_handler *from);
bool write_info(Rpl_info_handler *to);

/*
  Copy operations are declared private, which prevents copying from
  outside the class. NOTE(review): presumably left undefined on purpose
  (pre-C++11 non-copyable idiom) -- confirm before converting to '= delete'.
*/
Relay_log_info(const Relay_log_info &info);
Relay_log_info &operator=(const Relay_log_info &info);

/*
  Runtime state for printing a note when the slave is taking
  too long while processing a row event.
*/
time_t row_stmt_start_timestamp;
bool long_find_row_note_printed;
1833
/**
  Sets the suffix required for relay log names in multisource
  replication. When the --relay-log option is not provided, the
  names of the relay log files are relaylog.0000x or
  relaylog-CHANNEL.00000x in the case of MSR. However, if
  that option is provided, then the names of the relay log
  files are <relay-log-option>.0000x or
  <relay-log-option>-CHANNEL.00000x in the case of MSR.

  The function adds a channel suffix (according to the channel to
  file name conventions and conversions) to the relay log file.

  @todo: truncate the log file if length exceeds.

  @param[in, out] buff       buffer to store the complete relay log file name
  @param[in]      buff_size  size of buffer buff
  @param[in]      base_name  the base name of the relay log file

  NOTE(review): the returned pointer is not documented here (presumably
  @p buff) -- confirm against the definition.
*/
const char *add_channel_to_relay_log_name(char *buff, uint buff_size,
                                          const char *base_name);

/*
  Applier thread InnoDB priority.
  When two transactions conflict inside InnoDB, the one with
  greater priority wins.
  Priority must be set before the applier thread starts so that all
  executed transactions have the same priority.
*/
int thd_tx_priority;

/* The object that stores and handles the START SLAVE UNTIL option */
Until_option *until_option;
1866
public:
/*
  The boolean is set to true when the binlog (rli_fake) or slave
  (rli_slave) applier thread detaches any engine ha_data
  it has dealt with at the time of XA START processing.
  The boolean is reset to false at the end of XA PREPARE or
  XA COMMIT ONE PHASE for the binlog applier, and
  at internal rollback of the slave applier, at the same time as
  the engine ha_data re-attachment.
*/
bool is_engine_ha_data_detached;
/**
  Reference to the event being applied. The member is set at event reading
  and gets reset at the end of the event lifetime.
  See more in @c RLI_current_event_raii, which provides the main
  interface to the member.
*/
Log_event *current_event;

/**
  Set when the slave applies and writes to its binary log a statement
  which is not an atomic DDL and has no XID assigned. Checked at commit
  time to decide whether it is safe to update the slave info table
  within the same transaction as the write to the binary log, or whether
  the update should be deferred. The deferred scenario applies to events
  without an XID, in which case such an update might be lost on recovery.
*/
bool ddl_not_atomic;
1895
set_thd_tx_priority(int priority)1896 void set_thd_tx_priority(int priority) { thd_tx_priority = priority; }
1897
get_thd_tx_priority()1898 int get_thd_tx_priority() { return thd_tx_priority; }
1899
1900 const char *get_until_log_name();
1901 my_off_t get_until_log_pos();
is_until_satisfied_at_start_slave()1902 bool is_until_satisfied_at_start_slave() {
1903 return until_option != nullptr &&
1904 until_option->is_satisfied_at_start_slave();
1905 }
is_until_satisfied_before_dispatching_event(const Log_event * ev)1906 bool is_until_satisfied_before_dispatching_event(const Log_event *ev) {
1907 return until_option != nullptr &&
1908 until_option->is_satisfied_before_dispatching_event(ev);
1909 }
is_until_satisfied_after_dispatching_event()1910 bool is_until_satisfied_after_dispatching_event() {
1911 return until_option != nullptr &&
1912 until_option->is_satisfied_after_dispatching_event();
1913 }
/**
  Initialize the until option object when starting the slave.

  @param[in] thd          The thread object of the current session.
  @param[in] master_param the parameters of START SLAVE.

  @return int
    @retval 0    Succeeds to initialize the until option object.
    @retval <> 0 A defined error number is returned if any error happens.
*/
int init_until_option(THD *thd, const LEX_MASTER_INFO *master_param);
1925
/**
  Detaches the engine ha_data from THD. The fact
  is memorized in the @c is_engine_ha_data_detached flag.

  @param thd a reference to THD
*/
void detach_engine_ha_data(THD *thd);

/**
  Reattaches the engine ha_data to THD. The fact
  is memorized in the @c is_engine_ha_data_detached flag.

  @param thd a reference to THD
*/
void reattach_engine_ha_data(THD *thd);
1943 /**
1944 Drops the engine ha_data flag when it is up.
1945 The method is run at execution points of the engine ha_data
1946 re-attachment.
1947
1948 @return true when THD has detached the engine ha_data,
1949 false otherwise
1950 */
1951
unflag_detached_engine_ha_data()1952 bool unflag_detached_engine_ha_data() {
1953 bool rc = false;
1954
1955 if (is_engine_ha_data_detached)
1956 rc = !(is_engine_ha_data_detached = false); // return the old value
1957
1958 return rc;
1959 }
1960
1961 /**
1962 Execute actions at replicated atomic DLL post rollback time.
1963 This include marking the current atomic DDL query-log-event
1964 as having processed.
1965 This measure is necessary to avoid slave info table update execution
1966 when @c pre_commit() hook is called as part of DDL's eventual
1967 implicit commit.
1968 */
post_rollback()1969 void post_rollback() {
1970 static_cast<Query_log_event *>(current_event)->has_ddl_committed = true;
1971 }
1972
1973 /**
1974 The method implements a pre-commit hook to add up a new statement
1975 typically to a DDL transaction to update the slave info table.
1976 Note, in the non-transactional repository case the slave info
1977 is updated after successful commit of the main transaction.
1978
1979 @return false as success, otherwise true
1980 */
pre_commit()1981 bool pre_commit() {
1982 bool rc = false;
1983
1984 if (is_transactional()) {
1985 static_cast<Query_log_event *>(current_event)->has_ddl_committed = true;
1986 rc = commit_positions();
1987 }
1988 return rc;
1989 }
/**
  Cleanup of any side effect that pre_commit() inflicts, including
  restoring the last executed group coordinates in case the current group
  has been destined to roll back, and signaling to possible waiters
  in the positive case.

  @param on_rollback when true the method carries out the rollback action
*/
virtual void post_commit(bool on_rollback);
1999 };
2000
/**
  Negation operator for `enum_priv_checks_status`, to facilitate validation
  against `SUCCESS`. To test for error status, use the `!!` idiom.

  @param status the status code to check against `SUCCESS`

  @return true if the status is `SUCCESS` and false otherwise.
*/
bool operator!(Relay_log_info::enum_priv_checks_status status);

/**
  Negation operator for `enum_require_row_status`, to facilitate validation
  against `SUCCESS`. To test for error status, use the `!!` idiom.

  @param status the status code to check against `SUCCESS`

  @return true if the status is `SUCCESS` and false otherwise.
*/
bool operator!(Relay_log_info::enum_require_row_status status);

/*
  NOTE(review): presumably implements SHOW RELAYLOG EVENTS for @p thd; the
  meaning of the return value is not visible here -- confirm.
*/
bool mysql_show_relaylog_events(THD *thd);
2022
2023 /**
2024 @param thd a reference to THD
2025 @return true if thd belongs to a Worker thread and false otherwise.
2026 */
is_mts_worker(const THD * thd)2027 inline bool is_mts_worker(const THD *thd) {
2028 return thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER;
2029 }
2030
/**
  Auxiliary function to check if we have a db-partitioned MTS.

  @param rli the relay log info to inspect
  @return NOTE(review): presumably true when MTS uses database
          partitioning -- confirm against the definition.
*/
bool is_mts_db_partitioned(Relay_log_info *rli);
2035
2036 /**
2037 Checks whether the supplied event encodes a (2pc-aware) DDL
2038 that has been already committed.
2039
2040 @param ev A reference to Query-log-event
2041 @return true when the event is already committed transactional DDL
2042 */
is_committed_ddl(Log_event * ev)2043 inline bool is_committed_ddl(Log_event *ev) {
2044 return ev->get_type_code() == binary_log::QUERY_EVENT &&
2045 /* has been already committed */
2046 static_cast<Query_log_event *>(ev)->has_ddl_committed;
2047 }
2048
2049 /**
2050 Checks whether the transaction identified by the argument
2051 is executed by a slave applier thread is an atomic DDL
2052 not yet committed (see @c Query_log_event::has_ddl_committed).
2053 THD::is_operating_substatement_implicitly filters out intermediate
2054 commits done by non-atomic DDLs.
2055 The error-tagged atomic statements are regarded as non-atomic
2056 therefore this predicate returns negative in such case.
2057
2058 Note that call to is_atomic_ddl() returns "approximate" outcome in
2059 this case as it misses information about type of tables used by the DDL.
2060
2061 This can be a problem for binlogging slave, as updates to slave info
2062 which happen in the same transaction as write of binary log event
2063 without XID might be lost on recovery. To avoid this problem
2064 RLI::ddl_not_atomic flag is employed which is set to true when
2065 non-atomic DDL without XID is written to the binary log.
2066
2067 "Approximate" outcome is always fine for non-binlogging slave as in
2068 this case commit happens using one-phase routine for which recovery
2069 is always correct.
2070
2071 @param thd a pointer to THD describing the transaction context
2072 @return true when a slave applier thread is set to commmit being processed
2073 DDL query-log-event, otherwise returns false.
2074 */
is_atomic_ddl_commit_on_slave(THD * thd)2075 inline bool is_atomic_ddl_commit_on_slave(THD *thd) {
2076 DBUG_ASSERT(thd);
2077
2078 Relay_log_info *rli = thd->rli_slave;
2079
2080 /* Early return is about an error in the SQL thread initialization */
2081 if (!rli) return false;
2082
2083 return ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
2084 thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) &&
2085 rli->current_event)
2086 ? (rli->is_transactional() &&
2087 /* has not yet committed */
2088 (rli->current_event->get_type_code() ==
2089 binary_log::QUERY_EVENT &&
2090 !static_cast<Query_log_event *>(rli->current_event)
2091 ->has_ddl_committed) &&
2092 /* unless slave binlogger identified non-atomic */
2093 !rli->ddl_not_atomic &&
2094 /* slave info is not updated when a part of multi-DROP-TABLE
2095 commits */
2096 !thd->is_commit_in_middle_of_statement &&
2097 (is_atomic_ddl(thd, true) &&
2098 !thd->is_operating_substatement_implicitly) &&
2099 /* error-tagged atomic DDL do not update yet slave info */
2100 static_cast<Query_log_event *>(rli->current_event)
2101 ->error_code == 0)
2102 : false;
2103 }
2104
2105 /**
2106 RAII class to control the slave applier execution context binding
2107 with a being handled event. The main object of control is Query-log-event
2108 containing DDL statement.
2109 The member RLI::current_event is set to refer to an event once it is
2110 read, e.g by next_event() and is reset to NULL at exiting a
2111 read-exec loop. Once the event is destroyed RLI::current_event must be reset
2112 or guaranteed not be accessed anymore.
2113 In the MTS execution the worker is reliably associated with an event
2114 only with the latter is not deferred. This includes Query-log-event.
2115 */
2116 class RLI_current_event_raii {
2117 Relay_log_info *m_rli;
2118
2119 public:
RLI_current_event_raii(Relay_log_info * rli_arg,Log_event * ev)2120 RLI_current_event_raii(Relay_log_info *rli_arg, Log_event *ev)
2121 : m_rli(rli_arg) {
2122 m_rli->current_event = ev;
2123 }
set_current_event(Log_event * ev)2124 void set_current_event(Log_event *ev) { m_rli->current_event = ev; }
~RLI_current_event_raii()2125 ~RLI_current_event_raii() { m_rli->current_event = nullptr; }
2126 };
2127
/**
  @class MDL_lock_guard

  Utility class to allow the RAII pattern with the `MDL_request` and
  `MDL_context` classes.

  NOTE(review): copy operations are not deleted, so copying a guard would
  duplicate `m_request` (and the MDL ticket it holds). Confirm this cannot
  lead to a double release before relying on copies.
*/
class MDL_lock_guard {
 public:
  /**
    Constructor that initializes the object and the target `THD` object but
    doesn't try to acquire any lock.

    @param target THD object, source for the `MDL_context` to use.
  */
  MDL_lock_guard(THD *target);
  /**
    Constructor that initializes the object and the target `THD` object and
    tries to acquire the lock identified by `namespace_arg` with the MDL type
    identified by `mdl_type_arg`.

    If the `blocking` parameter is true, it will instantly try to acquire the
    lock and block. If the `blocking` parameter is false, it will first test
    if the lock is already acquired and only try to lock if no conflicting
    lock is already acquired.

    @param target THD object, source for the `MDL_context` to use.
    @param namespace_arg MDL key namespace to acquire the lock from.
    @param mdl_type_arg MDL acquisition type
    @param blocking whether or not the execution should block if the lock is
                    already acquired.
  */
  MDL_lock_guard(THD *target, MDL_key::enum_mdl_namespace namespace_arg,
                 enum_mdl_type mdl_type_arg, bool blocking = false);
  /**
    Destructor that unlocks all acquired locks.
  */
  virtual ~MDL_lock_guard();

  /**
    Uses the target `THD` object's MDL context to acquire the lock identified
    by `namespace_arg` with the MDL type identified by `mdl_type_arg`.

    If the `blocking` parameter is true, it will instantly try to acquire the
    lock and block. If the `blocking` parameter is false, it will first test
    if the lock is already acquired and only try to lock if no conflicting
    lock is already acquired.

    The lock is determined to have been acquired if the `THD` object's MDL
    context doesn't already hold the lock and the lock is acquired. In other
    words, if the MDL context has already acquired the lock, the method will
    return failure.

    @param namespace_arg MDL key namespace to acquire the lock from.
    @param mdl_type_arg MDL acquisition type
    @param blocking whether or not the execution should block if the lock is
                    already acquired.

    @return false if the lock has been acquired by this method invocation and
            true if not.
  */
  bool lock(MDL_key::enum_mdl_namespace namespace_arg,
            enum_mdl_type mdl_type_arg, bool blocking = false);
  /**
    Returns whether or not the lock has been acquired within this object's
    life-cycle.

    @return true if the lock has been acquired within this object's
            life-cycle.
  */
  bool is_locked();

 private:
  /** The `THD` object holding the MDL context used for acquiring/releasing. */
  THD *m_target;
  /** The MDL request holding the MDL ticket issued upon acquisition */
  MDL_request m_request;
};
2203
/**
  @class Applier_security_context_guard

  Utility class to allow the RAII pattern with the `Security_context` class.

  At initialization, if the `THD` main security context isn't already the
  appropriate one, it copies the `Relay_log_info::info_thd::security_context`
  and replaces it with the one initialized with the `PRIVILEGE_CHECKS_USER`
  user. At deinitialization, it restores the backed-up security context.

  It also deals with the case where no privilege checks are required, meaning
  `PRIVILEGE_CHECKS_USER` is `NULL`.

  Usage examples:

  (1)
  @code
  Applier_security_context_guard security_context{rli, thd};
  if (!security_context.has_access({SUPER_ACL})) {
    return ER_NO_ACCESS;
  }
  @endcode

  (2)
  The table-based overload takes a non-const reference to a vector of
  (privilege, TABLE*, Rows_log_event*) tuples, so the list must be a named
  variable:
  @code
  Applier_security_context_guard security_context{rli, thd};
  std::vector<std::tuple<ulong, TABLE const *, Rows_log_event *>> privileges{
      {CREATE_ACL | INSERT_ACL | UPDATE_ACL, table, event},
      {SELECT_ACL, table, event}};
  if (!security_context.has_access(privileges)) {
    return ER_NO_ACCESS;
  }
  @endcode
*/
class Applier_security_context_guard {
 public:
  /**
    If needed, backs up the current `thd` security context and replaces it
    with a security context for the `PRIVILEGE_CHECKS_USER` user.

    @param rli the `Relay_log_info` object that holds the
               `PRIVILEGE_CHECKS_USER` info.
    @param thd the `THD` for which to initialize the security context.
  */
  Applier_security_context_guard(Relay_log_info const *rli, THD const *thd);
  /**
    Destructor that restores the backed-up security context, if needed.
  */
  virtual ~Applier_security_context_guard();

  // --> Deleted constructors and methods to remove default move/copy semantics
  Applier_security_context_guard(const Applier_security_context_guard &) =
      delete;
  Applier_security_context_guard(Applier_security_context_guard &&) = delete;
  Applier_security_context_guard &operator=(
      const Applier_security_context_guard &) = delete;
  Applier_security_context_guard &operator=(Applier_security_context_guard &&) =
      delete;
  // <--

  /**
    Returns whether or not privilege checks may be skipped within the current
    context.

    @return true if privilege checks may be skipped and false otherwise.
  */
  bool skip_priv_checks() const;
  /**
    Checks if the `PRIVILEGE_CHECKS_USER` user has access to the privileges
    passed on by the `extra_privileges` parameter as well as to the
    privileges passed on at initialization time.

    This particular method checks those privileges against a given table and
    against that table's columns - the ones that are used or changed in the
    event.

    @param extra_privileges set of privileges to check, additionally to those
                            passed on at initialization. It's a list of
                            (privilege, TABLE*, Rows_log_event*) tuples.

    @return true if the privileges are included in the security context and
            false otherwise.
  */
  bool has_access(
      std::vector<std::tuple<ulong, TABLE const *, Rows_log_event *>>
          &extra_privileges) const;
  /**
    Checks if the `PRIVILEGE_CHECKS_USER` user has access to the privileges
    passed on by the `extra_privileges` parameter as well as to the
    privileges passed on at initialization time.

    @param extra_privileges set of privileges to check, additionally to those
                            passed on at initialization. It's a list of
                            privileges to be checked against any database.

    @return true if the privileges are included in the security context and
            false otherwise.
  */
  bool has_access(std::initializer_list<std::string> extra_privileges) const;

  /**
    Checks if the `PRIVILEGE_CHECKS_USER` user has access to the privileges
    passed on by the `extra_privileges` parameter as well as to the
    privileges passed on at initialization time.

    @param extra_privileges set of privileges to check, additionally to those
                            passed on at initialization. It's a list of
                            privileges to be checked against any database.

    @return true if the privileges are included in the security context and
            false otherwise.
  */
  bool has_access(std::initializer_list<ulong> extra_privileges) const;

  /**
    Returns the username of the user for which the security context was
    initialized.

    If `PRIVILEGE_CHECKS_USER` was configured for the target `Relay_log_info`
    object, that one is returned.

    Otherwise, the username associated with the `Security_context` initialized
    for `Relay_log_info::info_thd` will be returned.

    @return an `std::string` holding the username for the active security
            context.
  */
  std::string get_username() const;
  /**
    Returns the hostname of the user for which the security context was
    initialized.

    If `PRIVILEGE_CHECKS_USER` was configured for the target `Relay_log_info`
    object, that one is returned.

    Otherwise, the hostname associated with the `Security_context` initialized
    for `Relay_log_info::info_thd` will be returned.

    @return an `std::string` holding the hostname for the active security
            context.
  */
  std::string get_hostname() const;

 private:
  /**
    The `Relay_log_info` object holding the info required to initialize the
    context.
  */
  Relay_log_info const *m_target;
  /**
    The `THD` object for which the security context will be initialized.
  */
  THD const *m_thd;
  /** Applier security context based on the `PRIVILEGE_CHECKS_USER` user */
  Security_context m_applier_security_ctx;
  /** Currently in use security context */
  Security_context *m_current;
  /** Backed up security context */
  Security_context *m_previous;
  /** Flag that states if privilege checks should be skipped */
  bool m_privilege_checks_none;
  /** Flag that states if there is a logged user */
  bool m_logged_in_acl_user;

  void extract_columns_to_check(TABLE const *table, Rows_log_event *event,
                                std::vector<std::string> &columns) const;
};
2371
2372 #endif /* RPL_RLI_H */
2373