1 /* Copyright (c) 2005, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #ifndef RPL_RLI_H
24 #define RPL_RLI_H
25 
26 #if defined(__SUNPRO_CC)
27 /*
28   Solaris Studio 12.5 has a bug where, if you use dynamic_cast
29   and then later #include this file (which Boost does), you will
30   get a compile error. Work around it by just including it right now.
31 */
32 #include <cxxabi.h>
33 #endif
34 
35 #include <sys/types.h>
36 #include <time.h>
37 #include <atomic>
38 #include <memory>
39 #include <string>
40 #include <vector>
41 
42 #include "lex_string.h"
43 #include "libbinlogevents/include/binlog_event.h"
44 #include "m_string.h"
45 #include "map_helpers.h"
46 #include "my_bitmap.h"
47 #include "my_dbug.h"
48 #include "my_inttypes.h"
49 #include "my_io.h"
50 #include "my_loglevel.h"
51 #include "my_psi_config.h"
52 #include "my_sys.h"
53 #include "mysql/components/services/mysql_cond_bits.h"
54 #include "mysql/components/services/mysql_mutex_bits.h"
55 #include "mysql/components/services/psi_mutex_bits.h"
56 #include "mysql/psi/mysql_mutex.h"
57 #include "mysql/thread_type.h"
58 #include "prealloced_array.h"  // Prealloced_array
59 #include "sql/binlog.h"        // MYSQL_BIN_LOG
60 #include "sql/log_event.h"     //Gtid_log_event
61 #include "sql/psi_memory_key.h"
62 #include "sql/query_options.h"
63 #include "sql/rpl_gtid.h"         // Gtid_set
64 #include "sql/rpl_info.h"         // Rpl_info
65 #include "sql/rpl_mts_submode.h"  // enum_mts_parallel_type
66 #include "sql/rpl_slave_until_options.h"
67 #include "sql/rpl_tblmap.h"  // table_mapping
68 #include "sql/rpl_trx_boundary_parser.h"
69 #include "sql/rpl_utility.h"  // Deferred_log_events
70 #include "sql/sql_class.h"    // THD
71 #include "sql/system_variables.h"
72 #include "sql/table.h"
73 
74 class Commit_order_manager;
75 class Master_info;
76 class Rpl_filter;
77 class Rpl_info_handler;
78 class Slave_committed_queue;
79 class Slave_worker;
80 class String;
81 struct LEX_MASTER_INFO;
82 struct db_worker_hash_entry;
83 
84 extern uint sql_slave_skip_counter;
85 
86 typedef Prealloced_array<Slave_worker *, 4> Slave_worker_array;
87 
88 typedef struct slave_job_item {
89   Log_event *data;
90   uint relay_number;
91   my_off_t relay_pos;
92 } Slave_job_item;
93 
94 /*******************************************************************************
95 Replication SQL Thread
96 
97 Relay_log_info contains:
98   - the current relay log
99   - the current relay log offset
100   - master log name
101   - master log sequence corresponding to the last update
102   - misc information specific to the SQL thread
103 
104 Relay_log_info is initialized from a repository, i.e. table or file, if there is
one. Otherwise, data members are initialized with defaults by calling
106 init_relay_log_info().
107 
108 The relay.info table/file shall be updated whenever: (i) the relay log file
109 is rotated, (ii) SQL Thread is stopped, (iii) while processing a Xid_log_event,
110 (iv) after a Query_log_event (i.e. commit or rollback) and (v) after processing
111 any statement written to the binary log without a transaction context.
112 
113 The Xid_log_event is a commit for transactional engines and must be handled
114 differently to provide reliability/data integrity. In this case, positions
115 are updated within the context of the current transaction. So
116 
117   . If the relay.info is stored in a transactional repository and the server
118   crashes before successfully committing the transaction the changes to the
119   position table will be rolled back along with the data.
120 
121   . If the relay.info is stored in a non-transactional repository, for instance,
  a file or a system table created using MyISAM, and the server crashes before
123   successfully committing the transaction the changes to the position table
124   will not be rolled back but data will.
125 
In particular, when there are mixed transactions, i.e. a transaction that updates
both transactional and non-transactional engines, the Xid_log_event is still used
128 but reliability/data integrity cannot be achieved as we shall explain in what
129 follows.
130 
Changes to non-transactional engines, such as MyISAM, cannot be rolled back if a
132 failure happens. For that reason, there is no point in updating the positions
133 within the boundaries of any on-going transaction. This is true for both commit
134 and rollback. If a failure happens after processing the pseudo-transaction but
135 before updating the positions, the transaction will be re-executed when the
136 slave is up most likely causing an error that needs to be manually circumvented.
137 This is a well-known issue when non-transactional statements are executed.
138 
139 Specifically, if rolling back any transaction, positions are updated outside the
140 transaction boundaries. However, there may be a problem in this scenario even
when only transactional engines are updated. This happens because a rolled-back
transaction is only written to the binary log if a non-transactional engine was
updated or a temporary table was created or dropped within its boundaries.
145 
146 In particular, in both STATEMENT and MIXED logging formats, this happens because
147 any temporary table is automatically dropped after a shutdown/startup.
148 See BUG#26945 for further details.
149 
150 Statements written to the binary log outside the boundaries of a transaction are
DDLs or maintenance commands which are not transactional. This means that they
152 cannot be rolled back if a failure happens. In such cases, the positions are
153 updated after processing the events. If a failure happens after processing the
154 statement but before updating the positions, the statement will be
155 re-executed when the slave is up most likely causing an error that needs to be
156 manually circumvented. This is a well-known issue when non-transactional
157 statements are executed.
158 
The --sync-relay-log-info option has no effect when a system table, either
transactional or non-transactional, is used.
161 
To correctly recover from failures, one should combine transactional system
tables with the --relay-log-recovery option.
164 *******************************************************************************/
165 class Relay_log_info : public Rpl_info {
166   friend class Rpl_info_factory;
167 
168  public:
169   /**
170     Set of possible return values for the member methods related to
171     `PRIVILEGE_CHECKS_USER` management.
172    */
  enum class enum_priv_checks_status : int {
    /** Function ended successfully */
    SUCCESS = 0,
    /** Value for user is anonymous (''@'...') */
    USER_ANONYMOUS,
    /** Value for the username part of the user is larger than 32 characters */
    USERNAME_TOO_LONG,
    /** Value for the hostname part of the user is larger than 255 characters */
    HOSTNAME_TOO_LONG,
    /** Value for the hostname part of the user has illegal characters */
    HOSTNAME_SYNTAX_ERROR,
    /**
      Value for the username part of the user is NULL but the value for the
      hostname is not NULL.
     */
    USERNAME_NULL_HOSTNAME_NOT_NULL,
    /**
      Provided user does not exist.
     */
    USER_DOES_NOT_EXIST,
    /**
      Provided user doesn't have the necessary privileges to execute the needed
      operations.
     */
    USER_DOES_NOT_HAVE_PRIVILEGES,
    /** Values provided for the internal variables are corrupted. */
    USER_DATA_CORRUPTED,
    /**
      Provided user doesn't have `FILE` privileges during the execution of a
      `LOAD DATA` event.
     */
    LOAD_DATA_EVENT_NOT_ALLOWED
  };
206 
  /**
    Set of possible return values for checks related to requiring row-based
    events only (cf. is_row_format_required() / set_require_row_format()).
    NOTE(review): presumably returned by the `REQUIRE_ROW_FORMAT` option
    validation — confirm against the implementation.
   */
  enum class enum_require_row_status : int {
    /** Function ended successfully */
    SUCCESS = 0,
    /** Value for `privilege_checks_user` is not empty */
    PRIV_CHECKS_USER_NOT_NULL
  };
213 
214   /*
215     The per-channel filter associated with this RLI
216   */
217   Rpl_filter *rpl_filter;
218   /**
219      Flags for the state of the replication.
220    */
  enum enum_state_flag {
    /** The replication thread is inside a statement */
    IN_STMT,

    /**
      Flag counter: the number of flags declared above. Should always be last
      so that its value equals the number of flags.
    */
    STATE_FLAGS_COUNT
  };
228 
229   /**
230     Identifies what is the slave policy on primary keys in tables.
231   */
  enum enum_require_table_primary_key {
    /** No policy; used by the Performance Schema (PFS) */
    PK_CHECK_NONE = 0,
    /**
      The slave sets the value of sql_require_primary_key according to
      the source replicated value.
    */
    PK_CHECK_STREAM = 1,
    /** The slave requires tables to have primary keys for this channel */
    PK_CHECK_ON = 2,
    /** The slave does not enforce any policy around primary keys */
    PK_CHECK_OFF = 3
  };
245 
246   /*
247     The SQL thread owns one Relay_log_info, and each client that has
248     executed a BINLOG statement owns one Relay_log_info. This function
249     returns zero for the Relay_log_info object that belongs to the SQL
250     thread and nonzero for Relay_log_info objects that belong to
251     clients.
252   */
belongs_to_client()253   inline bool belongs_to_client() {
254     DBUG_ASSERT(info_thd);
255     return !info_thd->slave_thread;
256   }
257 /* Instrumentation key for performance schema for mts_temp_table_LOCK */
258 #ifdef HAVE_PSI_INTERFACE
259   PSI_mutex_key m_key_mts_temp_table_LOCK;
260 #endif
261   /*
262      Lock to protect race condition while transferring temporary table from
263      worker thread to coordinator thread and vice-versa
264    */
265   mysql_mutex_t mts_temp_table_LOCK;
266   /*
267      Lock to acquire by methods that concurrently update lwm of committed
268      transactions and the min waited timestamp and its index.
269   */
270   mysql_mutex_t mts_gaq_LOCK;
271   mysql_cond_t logical_clock_cond;
272   /*
273     If true, events with the same server id should be replicated. This
274     field is set on creation of a relay log info structure by copying
275     the value of ::replicate_same_server_id and can be overridden if
    necessary. For an example of when this is done, check sql_binlog.cc,
277     where the BINLOG statement can be used to execute "raw" events.
278    */
279   bool replicate_same_server_id;
280 
281   /*
282     Protected with internal locks.
283     Must get data_lock when resetting the logs.
284   */
285   MYSQL_BIN_LOG relay_log;
286 
287   /*
288     Identifies when the recovery process is going on.
289     See sql/rpl_slave.h:init_recovery for further details.
290   */
291   bool is_relay_log_recovery;
292 
293   /* The following variables are safe to read any time */
294 
295   /*
296     When we restart slave thread we need to have access to the previously
297     created temporary tables. Modified only on init/end and by the SQL
298     thread, read only by SQL thread.
299   */
300   TABLE *save_temporary_tables;
301 
302   /* parent Master_info structure */
303   Master_info *mi;
304 
305   /* number of temporary tables open in this channel */
306   std::atomic<int32> atomic_channel_open_temp_tables{0};
307 
308   /** the status of the commit timestamps for the relay log */
309   enum {
310     /*
311       no GTID log event has been processed, so it is not known if this log
312       has commit timestamps
313     */
314     COMMIT_TS_UNKNOWN,
315     // the immediate master does not support commit timestamps
316     COMMIT_TS_NOT_FOUND,
317     // the immediate master supports commit timestamps
318     COMMIT_TS_FOUND
319   } commit_timestamps_status;
320 
321   /**
322     @return the pointer to the Gtid_monitoring_info.
323   */
get_gtid_monitoring_info()324   Gtid_monitoring_info *get_gtid_monitoring_info() {
325     return gtid_monitoring_info;
326   }
327 
328   /**
329     Stores the details of the transaction which has just started processing.
330 
331     This function is called by the STS applier or MTS worker when applying a
332     Gtid.
333 
334     @param  gtid_arg         the gtid of the trx
335     @param  original_ts_arg  the original commit timestamp of the transaction
336     @param  immediate_ts_arg the immediate commit timestamp of the transaction
337     @param  skipped          true if the transaction was gtid skipped
338   */
339   void started_processing(Gtid gtid_arg, ulonglong original_ts_arg,
340                           ulonglong immediate_ts_arg, bool skipped = false) {
341     gtid_monitoring_info->start(gtid_arg, original_ts_arg, immediate_ts_arg,
342                                 skipped);
343   }
344 
345   /**
346     Stores the details of the transaction which has just started processing.
347 
348     This function is called by the MTS coordinator when queuing a Gtid to
349     a worker.
350 
351     @param  gtid_log_ev_arg the gtid log event of the trx
352   */
started_processing(Gtid_log_event * gtid_log_ev_arg)353   void started_processing(Gtid_log_event *gtid_log_ev_arg) {
354     Gtid gtid = {0, 0};
355     if (gtid_log_ev_arg->get_type() == ASSIGNED_GTID) {
356       gtid = {gtid_log_ev_arg->get_sidno(true), gtid_log_ev_arg->get_gno()};
357     }
358     started_processing(gtid, gtid_log_ev_arg->original_commit_timestamp,
359                        gtid_log_ev_arg->immediate_commit_timestamp);
360   }
361 
362   /**
363     When the processing of a transaction is completed, that timestamp is
364     recorded, the information is copied to last_processed_trx and the
365     information in processing_trx is cleared.
366 
367     If the transaction was "applied" but GTID-skipped, the copy will not
368     happen and the last_processed_trx will keep its current value.
369   */
finished_processing()370   void finished_processing() { gtid_monitoring_info->finish(); }
371 
372   /**
373     @return True if there is a transaction being currently processed
374   */
is_processing_trx()375   bool is_processing_trx() {
376     return gtid_monitoring_info->is_processing_trx_set();
377   }
378 
379   /**
380    Clears the processing_trx structure fields. Normally called when there is an
381    error while processing the transaction.
382   */
clear_processing_trx()383   void clear_processing_trx() { gtid_monitoring_info->clear_processing_trx(); }
384 
385   /**
386     Clears the Gtid_monitoring_info fields.
387   */
clear_gtid_monitoring_info()388   void clear_gtid_monitoring_info() { gtid_monitoring_info->clear(); }
389 
390   /**
391    When a transaction is retried, the error number and message, and total number
392    of retries are stored. The timestamp for this error is also set here.
393 
394    @param transient_errno_arg        Transient error number.
395    @param transient_err_message_arg  Transient error message.
396    @param trans_retries_arg          Number of times this transaction has been
397                                      retried so far.
398   */
retried_processing(uint transient_errno_arg,const char * transient_err_message_arg,ulong trans_retries_arg)399   void retried_processing(uint transient_errno_arg,
400                           const char *transient_err_message_arg,
401                           ulong trans_retries_arg) {
402     gtid_monitoring_info->store_transient_error(
403         transient_errno_arg, transient_err_message_arg, trans_retries_arg);
404   }
405 
406   /*
407     If on init_info() call error_on_rli_init_info is true that means
408     that previous call to init_info() terminated with an error, RESET
409     SLAVE must be executed and the problem fixed manually.
410    */
411   bool error_on_rli_init_info;
412 
413   /**
414     Variable is set to true as long as
415     original_commit_timestamp > immediate_commit_timestamp so that the
416     corresponding warning is only logged once.
417   */
418   bool gtid_timestamps_warning_logged;
419 
420   /**
421     Retrieves the username part of the `PRIVILEGE_CHECKS_USER` option of `CHANGE
422     MASTER TO` statement.
423 
424     @return a string holding the username part of the user or an empty string.
425    */
426   std::string get_privilege_checks_username() const;
427 
428   /**
429     Retrieves the host part of the `PRIVILEGE_CHECKS_USER` option of `CHANGE
430     MASTER TO` statement.
431 
432     @return a string holding the host part of the user or an empty string.
433    */
434   std::string get_privilege_checks_hostname() const;
435 
436   /**
437     Returns whether or not there is no user configured for
438     `PRIVILEGE_CHECKS_USER`.
439 
440     @return true if there is no user configured for `PRIVILEGE_CHECKS_USER` and
441             false otherwise.
442    */
443   bool is_privilege_checks_user_null() const;
444 
445   /**
446     Returns whether or not the internal data regarding `PRIVILEGE_CHECKS_USER`
447     is corrupted. This may happen, for instance, if the user tries to change the
448     Relay_log_info repository manually or after a server crash.
449 
450     @return true if the data is corrupted, false otherwise.
451    */
452   bool is_privilege_checks_user_corrupted() const;
453 
454   /**
455     Clears the info related to the data initialized from
456     `PRIVILEGE_CHECKS_USER`.
457    */
458   void clear_privilege_checks_user();
459 
460   /**
461     Sets the flag that tells whether or not the data regarding the
462     `PRIVILEGE_CHECKS_USER` is corrupted.
463 
464     @param is_corrupted the flag value.
465    */
466   void set_privilege_checks_user_corrupted(bool is_corrupted);
467 
468   /**
469     Initializes data related to `PRIVILEGE_CHECKS_USER`, specifically the user
470     name and the user hostname.
471 
472     @param param_privilege_checks_username the username part of the user.
473     @param param_privilege_checks_hostname the hostname part of the user.
474 
475     @return a status code describing the state of the data initialization.
476    */
477   enum_priv_checks_status set_privilege_checks_user(
478       char const *param_privilege_checks_username,
479       char const *param_privilege_checks_hostname);
480 
481   /**
482     Checks the validity and integrity of the data related to
483     `PRIVILEGE_CHECKS_USER`, specifically the user name and the user
484     hostname. Also checks if the user exists.
485 
486     This method takes no parameters as it checks the values stored in the
487     internal member variables.
488 
489     @return a status code describing the state of the data initialization.
490    */
491   enum_priv_checks_status check_privilege_checks_user();
492 
493   /**
494     Checks the validity and integrity of the data related to
495     `PRIVILEGE_CHECKS_USER`, specifically the user name and the user
496     hostname. Also checks if the user exists.
497 
498     @param param_privilege_checks_username the username part of the user.
499     @param param_privilege_checks_hostname the hostname part of the user.
500 
501     @return a status code describing the state of the data initialization.
502    */
503   enum_priv_checks_status check_privilege_checks_user(
504       char const *param_privilege_checks_username,
505       char const *param_privilege_checks_hostname);
506   /**
507     Checks the existence of user provided as part of the `PRIVILEGE_CHECKS_USER`
508     option.
509 
510     @param param_privilege_checks_username the username part of the user.
511     @param param_privilege_checks_hostname the host part of the user.
512 
513     @return a status code describing the state of the data initialization.
514    */
515   enum_priv_checks_status check_applier_acl_user(
516       char const *param_privilege_checks_username,
517       char const *param_privilege_checks_hostname);
518 
519   /**
520     Returns a printable representation of the username and hostname currently
    being used in the applier security context, or empty strings otherwise.
522 
523     @return an `std::pair` containing the username and the hostname printable
524             representations.
525    */
526   std::pair<const char *, const char *>
527   print_applier_security_context_user_host() const;
528 
529   /**
530     Outputs the error message associated with applier thread user privilege
531     checks error `error_code`.
532 
    The destination is decided by `to_client`: if set to `true`, the message is
    sent to the client session; if `false`, it is written to the server log.
536 
537     @param level the message urgency level, e.g., `ERROR_LEVEL`,
538                  `WARNING_LEVEL`, etc.
539     @param status_code the status code to output the associated error message
540                        for.
541     @param to_client a flag indicating if the message should be sent to the
542                      client session or to the server log.
543     @param channel_name name of the channel for which the error is being
544                         reported.
545     @param user_name username for which the error is being reported.
546     @param host_name hostname for which the error is being reported.
547    */
548   void report_privilege_check_error(enum loglevel level,
549                                     enum_priv_checks_status status_code,
550                                     bool to_client,
551                                     char const *channel_name = nullptr,
552                                     char const *user_name = nullptr,
553                                     char const *host_name = nullptr) const;
554 
555   /**
556     Initializes the security context associated with the `PRIVILEGE_CHECKS_USER`
557     user that is to be used by the provided THD object.
558 
559     @return a status code describing the state of the data initialization.
560    */
561   enum_priv_checks_status initialize_security_context(THD *thd);
562   /**
563     Initializes the security context associated with the `PRIVILEGE_CHECKS_USER`
564     user that is to be used by the applier thread.
565 
566     @return a status code describing the state of the data initialization.
567    */
568   enum_priv_checks_status initialize_applier_security_context();
569 
570   /**
571     Returns whether the slave is running in row mode only.
572 
573     @return true if row_format_required is active, false otherwise.
574    */
575   bool is_row_format_required() const;
576 
577   /**
578     Sets the flag that tells whether or not the slave is running in row mode
579     only.
580 
581     @param require_row the flag value.
582    */
583   void set_require_row_format(bool require_row);
584 
585   /**
586      Returns what is the slave policy concerning primary keys on
587      replicated tables.
588 
589      @return STREAM if it replicates the source values, ON if it enforces the
             need for primary keys, OFF if it does not enforce any restrictions.
591    */
592   enum_require_table_primary_key get_require_table_primary_key_check() const;
593 
594   /**
595     Sets the field that tells what is the slave policy concerning primary keys
596     on replicated tables.
597 
598     @param require_pk the policy value.
599   */
600   void set_require_table_primary_key_check(
601       enum_require_table_primary_key require_pk);
602 
603   /*
    This will be used to verify transaction boundaries of events being applied.
605 
606     Its output is used to detect when events were not logged using row based
607     logging.
608   */
609   Replication_transaction_boundary_parser transaction_parser;
610 
611   /*
612     Let's call a group (of events) :
613       - a transaction
614       or
      - an autocommitting query + its associated events (INSERT_ID,
616     TIMESTAMP...)
617     We need these rli coordinates :
618     - relay log name and position of the beginning of the group we currently are
619     executing. Needed to know where we have to restart when replication has
620     stopped in the middle of a group (which has been rolled back by the slave).
621     - relay log name and position just after the event we have just
622     executed. This event is part of the current group.
    Formerly we only had the immediately above coordinates, plus a 'pending'
    variable, but this did not correctly handle a transaction starting on one
    relay log and finishing (committing) on another relay log, which can
    happen when, for example, the relay log gets rotated because of
    max_binlog_size.
628   */
629  protected:
630   /**
     Event group means a group of events of a transaction. group_relay_log_name
     and group_relay_log_pos record the position before which all event groups
     have been applied. When the slave starts, it resumes applying events from
     group_relay_log_pos. They are initialized to the beginning of the first
     relay log file if it is a new slave (including after RESET SLAVE). Then,
     group_relay_log_pos is advanced after each transaction is applied
     successfully by a single-threaded slave. For MTS, group_relay_log_pos
     is updated by the MTS checkpoint mechanism. group_relay_log_pos and
     group_relay_log_name are stored in the relay_log_info file/table
     periodically. At server startup, they are loaded from the relay log info
     file/table.
642    */
643   char group_relay_log_name[FN_REFLEN];
644   ulonglong group_relay_log_pos;
645   char event_relay_log_name[FN_REFLEN];
646   /* The suffix number of relay log name */
647   uint event_relay_log_number;
648   ulonglong event_relay_log_pos;
649   ulonglong future_event_relay_log_pos;
650 
651   /* current event's start position in relay log */
652   my_off_t event_start_pos;
653   /*
654      Original log name and position of the group we're currently executing
655      (whose coordinates are group_relay_log_name/pos in the relay log)
656      in the master's binlog. These concern the *group*, because in the master's
657      binlog the log_pos that comes with each event is the position of the
658      beginning of the group.
659 
660     Note: group_master_log_name, group_master_log_pos must only be
661     written from the thread owning the Relay_log_info (SQL thread if
662     !belongs_to_client(); client thread executing BINLOG statement if
663     belongs_to_client()).
664   */
665   char group_master_log_name[FN_REFLEN];
666   volatile my_off_t group_master_log_pos;
667 
668  private:
669   Gtid_set *gtid_set;
670   /*
671     Identifies when this object belongs to the SQL thread and was not
672     created for a client thread or some other purpose including
673     Slave_worker instance initializations. Ends up serving the same
674     purpose as the belongs_to_client method, but its value is set
675     earlier on in the class constructor.
676   */
677   bool rli_fake;
678   /* Flag that ensures the retrieved GTID set is initialized only once. */
679   bool gtid_retrieved_initialized;
680 
681   /**
682     Stores information on the last processed transaction or the transaction
683     that is currently being processed.
684 
685     STS:
686     - timestamps of the currently applying/last applied transaction
687 
688     MTS:
689     - coordinator thread: timestamps of the currently scheduling/last scheduled
690       transaction in a worker's queue
691     - worker thread: timestamps of the currently applying/last applied
692       transaction
693   */
694   Gtid_monitoring_info *gtid_monitoring_info;
695 
696   /**
697      It will be set to true when receiver truncated relay log for some reason.
     The truncated data may already have been read by the applier, so the
     applier needs to check it each time the binlog_end_pos is updated.
700    */
701   bool m_relay_log_truncated = false;
702 
703   /**
704     The user name part of the user passed on to `PRIVILEGE_CHECKS_USER`.
705    */
706   std::string m_privilege_checks_username;
707 
708   /**
709     The host name part of the user passed on to `PRIVILEGE_CHECKS_USER`.
710    */
711   std::string m_privilege_checks_hostname;
712 
713   /**
714     Tells whether or not the internal data regarding `PRIVILEGE_CHECKS_USER` is
715     corrupted. This may happen if the user tries to change the Relay_log_info
716     repository by hand.
717    */
718   bool m_privilege_checks_user_corrupted;
719 
720   /**
721    Tells if the slave is only accepting events logged with row based logging.
722    It also blocks
723      Operations with temporary table creation/deletion
724      Operations with LOAD DATA
725      Events: INTVAR_EVENT, RAND_EVENT, USER_VAR_EVENT
726   */
727   bool m_require_row_format;
728 
729   /**
730     Identifies what is the slave policy on primary keys in tables.
731     If set to STREAM it just replicates the value of sql_require_primary_key.
732     If set to ON it fails when the source tries to replicate a table creation
733     or alter operation that does not have a primary key.
734     If set to OFF it does not enforce any policies on the channel for primary
735     keys.
736   */
737   enum_require_table_primary_key m_require_table_primary_key_check;
738 
739  public:
is_relay_log_truncated()740   bool is_relay_log_truncated() { return m_relay_log_truncated; }
741 
get_sid_map()742   Sid_map *get_sid_map() { return gtid_set->get_sid_map(); }
743 
get_sid_lock()744   Checkable_rwlock *get_sid_lock() { return get_sid_map()->get_sid_lock(); }
745 
add_logged_gtid(rpl_sidno sidno,rpl_gno gno)746   void add_logged_gtid(rpl_sidno sidno, rpl_gno gno) {
747     get_sid_lock()->assert_some_lock();
748     DBUG_ASSERT(sidno <= get_sid_map()->get_max_sidno());
749     gtid_set->ensure_sidno(sidno);
750     gtid_set->_add_gtid(sidno, gno);
751   }
752 
753   /**
754     Adds a GTID set to received GTID set.
755 
756     @param gtid_set the gtid_set to add
757 
758     @return RETURN_STATUS_OK or RETURN_STATUS_REPORTED_ERROR.
759   */
760   enum_return_status add_gtid_set(const Gtid_set *gtid_set);
761 
get_gtid_set()762   const Gtid_set *get_gtid_set() const { return gtid_set; }
763 
764   bool reinit_sql_thread_io_cache(const char *log, bool need_data_lock);
765 
766   /**
767      Check if group_relay_log_name is in index file.
768 
769      @param [out] errmsg An error message is returned if error happens.
770 
771      @retval    false    It is valid.
772      @retval    true     It is invalid. In this case, *errmsg is set to point to
773                          the error message.
774 */
775   bool is_group_relay_log_name_invalid(const char **errmsg);
776   /**
777      Reset group_relay_log_name and group_relay_log_pos to the start of the
778      first relay log file. The caller must hold data_lock.
779 
780      @param[out]     errmsg    An error message is set into it if error happens.
781 
782      @retval    false    Success
783      @retval    true     Error
784  */
785   bool reset_group_relay_log_pos(const char **errmsg);
786   /*
787     Update the error number, message and timestamp fields. This function is
788     different from va_report() as va_report() also logs the error message in the
789     log apart from updating the error fields.
790   */
791   void fill_coord_err_buf(loglevel level, int err_code,
792                           const char *buff_coord) const;
793 
794   /*
795     Flag that the group_master_log_pos is invalid. This may occur
796     (for example) after CHANGE MASTER TO RELAY_LOG_POS.  This will
797     be unset after the first event has been executed and the
798     group_master_log_pos is valid again.
799    */
800   bool is_group_master_log_pos_invalid;
801 
802   /*
803     Handling of the relay_log_space_limit optional constraint.
804     ignore_log_space_limit is used to resolve a deadlock between I/O and SQL
805     threads, the SQL thread sets it to unblock the I/O thread and make it
806     temporarily forget about the constraint.
807   */
808   ulonglong log_space_limit, log_space_total;
809   std::atomic<bool> ignore_log_space_limit;
810 
811   /*
812     Used by the SQL thread to instructs the IO thread to rotate
813     the logs when the SQL thread needs to purge to release some
814     disk space.
815    */
816   std::atomic<bool> sql_force_rotate_relay;
817 
818   time_t last_master_timestamp;
819 
820   /**
821     Reset the delay.
822     This is used by RESET SLAVE to clear the delay.
823   */
clear_sql_delay()824   void clear_sql_delay() { sql_delay = 0; }
825 
826   /*
827     Needed for problems when slave stops and we want to restart it
828     skipping one or more events in the master log that have caused
829     errors, and have been manually applied by DBA already.
830   */
831   volatile uint32 slave_skip_counter;
832   volatile ulong abort_pos_wait; /* Incremented on change master */
833   mysql_mutex_t log_space_lock;
834   mysql_cond_t log_space_cond;
835 
836   /*
837      Condition and its parameters from START SLAVE UNTIL clause.
838 
839      UNTIL condition is tested with is_until_satisfied() method that is
840      called by exec_relay_log_event(). is_until_satisfied() caches the result
841      of the comparison of log names because log names don't change very often;
842      this cache is invalidated by parts of code which change log names with
843      notify_*_log_name_updated() methods. (They need to be called only if SQL
844      thread is running).
845    */
846   enum {
847     UNTIL_NONE = 0,
848     UNTIL_MASTER_POS,
849     UNTIL_RELAY_POS,
850     UNTIL_SQL_BEFORE_GTIDS,
851     UNTIL_SQL_AFTER_GTIDS,
852     UNTIL_SQL_AFTER_MTS_GAPS,
853     UNTIL_SQL_VIEW_ID,
854     UNTIL_DONE
855   } until_condition;
856 
857   char cached_charset[6];
858 
859   /*
860     trans_retries varies between 0 to slave_transaction_retries and counts how
861     many times the slave has retried the present transaction; gets reset to 0
862     when the transaction finally succeeds. retried_trans is a cumulative
863     counter: how many times the slave has retried a transaction (any) since
864     slave started.
865   */
866   ulong trans_retries, retried_trans;
867 
868   /*
869     If the end of the hot relay log is made of master's events ignored by the
870     slave I/O thread, these two keep track of the coords (in the master's
871     binlog) of the last of these events seen by the slave I/O thread. If not,
872     ign_master_log_name_end[0] == 0.
873     As they are like a Rotate event read/written from/to the relay log, they
874     are both protected by rli->relay_log.LOCK_binlog_end_pos.
875   */
876   char ign_master_log_name_end[FN_REFLEN];
877   ulonglong ign_master_log_pos_end;
878 
879   /*
880     Indentifies where the SQL Thread should create temporary files for the
881     LOAD DATA INFILE. This is used for security reasons.
882    */
883   char slave_patternload_file[FN_REFLEN];
884   size_t slave_patternload_file_size;
885 
886   /**
887     Identifies the last time a checkpoint routine has been executed.
888   */
889   struct timespec last_clock;
890 
891   /**
892     Invalidates cached until_log_name and event_relay_log_name comparison
893     result. Should be called after switch to next relay log if
894     there chances that sql_thread is running.
895   */
notify_relay_log_change()896   inline void notify_relay_log_change() {
897     if (until_condition == UNTIL_RELAY_POS)
898       dynamic_cast<Until_position *>(until_option)->notify_log_name_change();
899   }
900 
901   /**
902      Receiver thread notifies that it truncated some data from relay log.
903      data_lock will be acquired, so the caller should not hold data_lock.
904   */
905   void notify_relay_log_truncated();
906   /**
907      Applier clears the flag after it handled the situation. The caller must
908      hold data_lock.
909   */
910   void clear_relay_log_truncated();
911 
912   /**
913     The same as @c notify_group_relay_log_name_update but for
914     @c group_master_log_name.
915   */
notify_group_master_log_name_update()916   inline void notify_group_master_log_name_update() {
917     if (until_condition == UNTIL_MASTER_POS)
918       dynamic_cast<Until_position *>(until_option)->notify_log_name_change();
919   }
920 
inc_event_relay_log_pos()921   inline void inc_event_relay_log_pos() {
922     event_relay_log_pos = future_event_relay_log_pos;
923   }
924 
925   /**
926     Last executed event group coordinates are updated and optionally
927     forcibly flushed to a repository.
928     @param log_pos         a value of the executed position to update to
929     @param need_data_lock  whether data_lock should be acquired
930     @param force           the value is passed to eventual flush_info()
931   */
932   int inc_group_relay_log_pos(ulonglong log_pos, bool need_data_lock,
933                               bool force = false);
934 
935   int wait_for_pos(THD *thd, String *log_name, longlong log_pos,
936                    double timeout);
937   /**
938     Wait for a GTID set to be executed.
939 
940     @param thd                 The thread for status changes and kill status
941     @param gtid                A char array with a GTID set
942     @param timeout             Number of seconds to wait before timing out
943     @param update_THD_status  Shall the method update the THD stage
944 
945     @retval 0  The set is already executed
946     @retval -1 There was a timeout waiting for the set
947     @retval -2 There was an issue while waiting.
948    */
949   int wait_for_gtid_set(THD *thd, const char *gtid, double timeout,
950                         bool update_THD_status = true);
951   /**
952     Wait for a GTID set to be executed.
953 
954     @param thd                 The thread for status changes and kill status
955     @param gtid                A String with a GTID set
956     @param timeout             Number of seconds to wait before timing out
957     @param update_THD_status  Shall the method update the THD stage
958 
959     @retval 0  The set is already executed
960     @retval -1 There was a timeout waiting for the set
961     @retval -2 There was an issue while waiting.
962   */
963   int wait_for_gtid_set(THD *thd, String *gtid, double timeout,
964                         bool update_THD_status = true);
965   /**
966     Wait for a GTID set to be executed.
967 
968     @param thd                 The thread for status changes and kill status
969     @param wait_gtid_set       A GTID_set object
970     @param timeout             Number of seconds to wait before timing out
971     @param update_THD_status   Shall the method update the THD stage
972 
973     @retval 0  The set is already executed
974     @retval -1 There was a timeout waiting for the set
975     @retval -2 There was an issue while waiting.
976   */
977   int wait_for_gtid_set(THD *thd, const Gtid_set *wait_gtid_set, double timeout,
978                         bool update_THD_status = true);
979 
980   void close_temporary_tables();
981 
982   RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock  */
983   uint tables_to_lock_count;      /* RBR: Count of tables to lock */
984   table_mapping m_table_map;      /* RBR: Mapping table-id to table */
985   /* RBR: Record Rows_query log event */
986   Rows_query_log_event *rows_query_ev;
987 
get_table_data(TABLE * table_arg,table_def ** tabledef_var,TABLE ** conv_table_var)988   bool get_table_data(TABLE *table_arg, table_def **tabledef_var,
989                       TABLE **conv_table_var) const {
990     DBUG_ASSERT(tabledef_var && conv_table_var);
991     for (TABLE_LIST *ptr = tables_to_lock; ptr != nullptr;
992          ptr = ptr->next_global)
993       if (ptr->table == table_arg) {
994         *tabledef_var = &static_cast<RPL_TABLE_LIST *>(ptr)->m_tabledef;
995         *conv_table_var = static_cast<RPL_TABLE_LIST *>(ptr)->m_conv_table;
996         DBUG_PRINT("debug", ("Fetching table data for table %s.%s:"
997                              " tabledef: %p, conv_table: %p",
998                              table_arg->s->db.str, table_arg->s->table_name.str,
999                              *tabledef_var, *conv_table_var));
1000         return true;
1001       }
1002     return false;
1003   }
1004 
1005   /**
1006     Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
1007     the thread save 3 @c get_charset() per @c Query_log_event if the charset is
1008     not changing from event to event (common situation). When the 6 bytes are
1009     equal to 0 is used to mean "cache is invalidated".
1010   */
1011   void cached_charset_invalidate();
1012   bool cached_charset_compare(char *charset) const;
1013 
1014   void cleanup_context(THD *, bool);
1015   void slave_close_thread_tables(THD *);
1016   void clear_tables_to_lock();
1017   int purge_relay_logs(THD *thd, const char **errmsg, bool delete_only = false);
1018 
1019   /*
1020     Used to defer stopping the SQL thread to give it a chance
1021     to finish up the current group of events.
1022     The timestamp is set and reset in @c sql_slave_killed().
1023   */
1024   time_t last_event_start_time;
1025 
1026   /* The original master commit timestamp in microseconds since epoch */
1027   uint64 original_commit_timestamp;
1028 
1029   /*
1030     A container to hold on Intvar-, Rand-, Uservar- log-events in case
1031     the slave is configured with table filtering rules.
1032     The withhold events are executed when their parent Query destiny is
1033     determined for execution as well.
1034   */
1035   Deferred_log_events *deferred_events;
1036 
1037   /*
1038     State of the container: true stands for IRU events gathering,
1039     false does for execution, either deferred or direct.
1040   */
1041   bool deferred_events_collecting;
1042 
1043   /*****************************************************************************
1044     WL#5569 MTS
1045 
1046     legends:
1047     C  - Coordinator;
1048     W  - Worker;
1049     WQ - Worker Queue containing event assignments
1050   */
  // The number of workers is determined by global slave_parallel_workers
1052   Slave_worker_array workers;
1053 
1054   // To map a database to a worker
1055   malloc_unordered_map<std::string,
1056                        unique_ptr_with_deleter<db_worker_hash_entry>>
1057       mapping_db_to_worker{key_memory_db_worker_hash_entry};
1058   bool inited_hash_workers;  //  flag to check if mapping_db_to_worker is inited
1059 
1060   mysql_mutex_t slave_worker_hash_lock;  // for mapping_db_to_worker
1061   mysql_cond_t slave_worker_hash_cond;   // for mapping_db_to_worker
1062 
1063   /*
1064     For the purpose of reporting the worker status in performance schema table,
1065     we need to preserve the workers array after worker thread was killed. So, we
1066     copy this array into the below vector which is used for reporting
1067     until next init_workers(). Note that we only copy those attributes that
1068     would be useful in reporting worker status. We only use a few attributes in
1069     this object as of now but still save the whole object. The idea is
1070     to be future proof. We will extend performance schema tables in future
1071     and then we would use a good number of attributes from this object.
1072   */
1073 
1074   std::vector<Slave_worker *> workers_copy_pfs;
1075 
1076   /*
1077     This flag is turned ON when the workers array is initialized.
1078     Before destroying the workers array we check this flag to make sure
1079     we are not destroying an unitilized array. For the purpose of reporting the
1080     worker status in performance schema table, we need to preserve the workers
1081     array after worker thread was killed. So, we copy this array into
1082     workers_copy_pfs array which is used for reporting until next
1083     init_workers().
1084   */
1085   bool workers_array_initialized;
1086 
1087   volatile ulong pending_jobs;
1088   mysql_mutex_t pending_jobs_lock;
1089   mysql_cond_t pending_jobs_cond;
1090   mysql_mutex_t exit_count_lock;  // mutex of worker exit count
1091   ulong mts_slave_worker_queue_len_max;
1092   ulonglong mts_pending_jobs_size;      // actual mem usage by WQ:s
1093   ulonglong mts_pending_jobs_size_max;  // max of WQ:s size forcing C to wait
1094   bool mts_wq_oversize;  // C raises flag to wait some memory's released
1095   Slave_worker
1096       *last_assigned_worker;  // is set to a Worker at assigning a group
1097   /*
1098     master-binlog ordered queue of Slave_job_group descriptors of groups
1099     that are under processing. The queue size is @c checkpoint_group.
1100   */
1101   Slave_committed_queue *gaq;
1102   /*
1103     Container for references of involved partitions for the current event group
1104   */
1105   // CGAP dynarray holds id:s of partitions of the Current being executed Group
1106   Prealloced_array<db_worker_hash_entry *, 4> curr_group_assigned_parts;
1107   // deferred array to hold partition-info-free events
1108   Prealloced_array<Slave_job_item, 8> curr_group_da;
1109 
1110   bool curr_group_seen_gtid;   // current group started with Gtid-event or not
1111   bool curr_group_seen_begin;  // current group started with B-event or not
1112   bool curr_group_isolated;    // current group requires execution in isolation
1113   bool mts_end_group_sets_max_dbs;  // flag indicates if partitioning info is
1114                                     // discovered
1115   volatile ulong
1116       mts_wq_underrun_w_id;  // Id of a Worker whose queue is getting empty
1117   /*
     Ongoing excessive overrun counter to correspond to number of events that
     are being scheduled while a WQ is close to being filled up.
     `Close' is defined as (100 - mts_worker_underrun_level) %.
     The counter is incremented each time a WQ gets filled over that level
     and decremented when the level drops below.
1123      The counter therefore describes level of saturation that Workers
1124      are experiencing and is used as a parameter to compute a nap time for
1125      Coordinator in order to avoid reaching WQ limits.
1126   */
1127   volatile long mts_wq_excess_cnt;
1128   long mts_worker_underrun_level;    // % of WQ size at which W is considered
1129                                      // hungry
1130   ulong mts_coordinator_basic_nap;   // C sleeps to avoid WQs overrun
1131   ulong opt_slave_parallel_workers;  // cache for ::opt_slave_parallel_workers
1132   ulong slave_parallel_workers;  // the one slave session time number of workers
1133   ulong
1134       exit_counter;  // Number of workers contributed to max updated group index
1135   ulonglong max_updated_index;
1136   ulong recovery_parallel_workers;  // number of workers while recovering
1137   uint rli_checkpoint_seqno;        // counter of groups executed after the most
1138                                     // recent CP
1139   uint checkpoint_group;            // cache for ::opt_mts_checkpoint_group
1140   MY_BITMAP recovery_groups;        // bitmap used during recovery
1141   bool recovery_groups_inited;
1142   ulong mts_recovery_group_cnt;  // number of groups to execute at recovery
1143   ulong mts_recovery_index;      // running index of recoverable groups
1144   bool mts_recovery_group_seen_begin;
1145 
1146   /*
    While distributing events based on their properties, the MTS
    Coordinator changes its mts group status.
    Transition normally flows to follow `=>' arrows on the diagram:
1150 
1151             +----------------------------+
1152             V                            |
1153     MTS_NOT_IN_GROUP =>                  |
1154         {MTS_IN_GROUP => MTS_END_GROUP --+} while (!killed) => MTS_KILLED_GROUP
1155 
1156     MTS_END_GROUP has `->' loop breaking link to MTS_NOT_IN_GROUP when
1157     Coordinator synchronizes with Workers by demanding them to
1158     complete their assignments.
1159   */
1160   enum {
1161     /*
1162        no new events were scheduled after last synchronization,
1163        includes Single-Threaded-Slave case.
1164     */
1165     MTS_NOT_IN_GROUP,
1166 
1167     MTS_IN_GROUP,    /* at least one not-terminal event scheduled to a Worker */
1168     MTS_END_GROUP,   /* the last scheduled event is a terminal event */
1169     MTS_KILLED_GROUP /* Coordinator gave up to reach MTS_END_GROUP */
1170   } mts_group_status;
1171 
1172   /*
1173     MTS statistics:
1174   */
1175   ulonglong mts_events_assigned;  // number of events (statements) scheduled
1176   ulonglong mts_groups_assigned;  // number of groups (transactions) scheduled
1177   volatile ulong
1178       mts_wq_overrun_cnt;   // counter of all mts_wq_excess_cnt increments
1179   ulong wq_size_waits_cnt;  // number of times C slept due to WQ:s oversize
1180   /*
    Counter of how many times Coordinator saw Workers are filled up
    "enough" with assignments. The enough definition depends on
    the scheduler type.
1184   */
1185   ulong mts_wq_no_underrun_cnt;
1186   std::atomic<longlong>
1187       mts_total_wait_overlap;  // Waiting time corresponding to above
1188   /*
1189     Stats to compute Coordinator waiting time for any Worker available,
1190     applies solely to the Commit-clock scheduler.
1191   */
1192   ulonglong mts_total_wait_worker_avail;
1193   ulong mts_wq_overfill_cnt;  // counter of C waited due to a WQ queue was full
1194   /*
1195     Statistics (todo: replace with WL5631) applies to either Coordinator and
1196     Worker. The exec time in the Coordinator case means scheduling. The read
1197     time in the Worker case means getting an event out of Worker queue
1198   */
1199   ulonglong stats_exec_time;
1200   ulonglong stats_read_time;
1201   struct timespec ts_exec[2];   // per event pre- and post- exec timestamp
1202   struct timespec stats_begin;  // applier's bootstrap time
1203 
1204   /*
     A sorted array of the Workers' current assignment numbers to provide
1206      approximate view on Workers loading.
1207      The first row of the least occupied Worker is queried at assigning
1208      a new partition. Is updated at checkpoint commit to the main RLI.
1209   */
1210   Prealloced_array<ulong, 16> least_occupied_workers;
1211   time_t mts_last_online_stat;
1212   /* end of MTS statistics */
1213 
1214   /**
1215     Storage for holding newly computed values for the last executed
1216     event group coordinates while the current group of events is
1217     being committed, see @c pre_commit, post_commit.
1218   */
1219   char new_group_master_log_name[FN_REFLEN];
1220   my_off_t new_group_master_log_pos;
1221   char new_group_relay_log_name[FN_REFLEN];
1222   my_off_t new_group_relay_log_pos;
1223 
1224   /* Returns the number of elements in workers array/vector. */
get_worker_count()1225   inline size_t get_worker_count() {
1226     if (workers_array_initialized)
1227       return workers.size();
1228     else
1229       return workers_copy_pfs.size();
1230   }
1231 
1232   /*
1233     Returns a pointer to the worker instance at index n in workers
1234     array/vector.
1235   */
get_worker(size_t n)1236   Slave_worker *get_worker(size_t n) {
1237     if (workers_array_initialized) {
1238       if (n >= workers.size()) return nullptr;
1239 
1240       return workers[n];
1241     } else if (workers_copy_pfs.size()) {
1242       if (n >= workers_copy_pfs.size()) return nullptr;
1243 
1244       return workers_copy_pfs[n];
1245     } else
1246       return nullptr;
1247   }
1248 
1249   /**
1250     The method implements updating a slave info table. It's
1251     specialized differently for STS and MTS.
1252   */
1253   virtual bool commit_positions();
1254 
1255   /*Channel defined mts submode*/
1256   enum_mts_parallel_type channel_mts_submode;
1257   /* MTS submode  */
1258   Mts_submode *current_mts_submode;
1259 
1260   /* most of allocation in the coordinator rli is there */
1261   void init_workers(ulong);
1262 
1263   /* counterpart of the init */
1264   void deinit_workers();
1265 
1266   /**
1267      returns true if there is any gap-group of events to execute
1268                   at slave starting phase.
1269   */
is_mts_recovery()1270   inline bool is_mts_recovery() const { return mts_recovery_group_cnt != 0; }
1271 
clear_mts_recovery_groups()1272   inline void clear_mts_recovery_groups() {
1273     if (recovery_groups_inited) {
1274       bitmap_free(&recovery_groups);
1275       mts_recovery_group_cnt = 0;
1276       recovery_groups_inited = false;
1277     }
1278   }
1279 
1280   /**
1281      returns true if events are to be executed in parallel
1282   */
is_parallel_exec()1283   inline bool is_parallel_exec() const {
1284     bool ret = (slave_parallel_workers > 0) && !is_mts_recovery();
1285 
1286     DBUG_ASSERT(!ret || !workers.empty());
1287 
1288     return ret;
1289   }
1290 
1291   /**
1292      returns true if Coordinator is scheduling events belonging to
1293      the same group and has not reached yet its terminal event.
1294   */
is_mts_in_group()1295   inline bool is_mts_in_group() {
1296     return is_parallel_exec() && mts_group_status == MTS_IN_GROUP;
1297   }
1298 
1299   /**
1300      Check if it is time to compute MTS checkpoint.
1301 
1302      @retval true   It is time to compute MTS checkpoint.
1303      @retval false  It is not MTS or it is not time for computing checkpoint.
1304   */
1305   bool is_time_for_mts_checkpoint();
1306   /**
1307      While a group is executed by a Worker the relay log can change.
1308      Coordinator notifies Workers about this event. Worker is supposed
1309      to commit to the recovery table with the new info.
1310   */
1311   void reset_notified_relay_log_change();
1312 
1313   /**
1314      While a group is executed by a Worker the relay log can change.
1315      Coordinator notifies Workers about this event. Coordinator and Workers
1316      maintain a bitmap of executed group that is reset with a new checkpoint.
1317   */
1318   void reset_notified_checkpoint(ulong count, time_t new_ts,
1319                                  bool update_timestamp = false);
1320 
1321   /**
1322      Called when gaps execution is ended so it is crash-safe
1323      to reset the last session Workers info.
1324   */
1325   bool mts_finalize_recovery();
1326   /*
1327    * End of MTS section ******************************************************/
1328 
1329   /* The general cleanup that slave applier may need at the end of query. */
cleanup_after_query()1330   inline void cleanup_after_query() {
1331     if (deferred_events) deferred_events->rewind();
1332   }
1333   /* The general cleanup that slave applier may need at the end of session. */
cleanup_after_session()1334   void cleanup_after_session() {
1335     if (deferred_events) delete deferred_events;
1336   }
1337 
1338   /**
1339     Helper function to do after statement completion.
1340 
1341     This function is called from an event to complete the group by
1342     either stepping the group position, if the "statement" is not
1343     inside a transaction; or increase the event position, if the
1344     "statement" is inside a transaction.
1345 
1346     @param event_log_pos
1347     Master log position of the event. The position is recorded in the
1348     relay log info and used to produce information for <code>SHOW
1349     SLAVE STATUS</code>.
1350   */
1351   int stmt_done(my_off_t event_log_pos);
1352 
1353   /**
1354      Set the value of a replication state flag.
1355 
1356      @param flag Flag to set
1357    */
set_flag(enum_state_flag flag)1358   void set_flag(enum_state_flag flag) { m_flags |= (1UL << flag); }
1359 
1360   /**
1361      Get the value of a replication state flag.
1362 
1363      @param flag Flag to get value of
1364 
1365      @return @c true if the flag was set, @c false otherwise.
1366    */
get_flag(enum_state_flag flag)1367   bool get_flag(enum_state_flag flag) { return m_flags & (1UL << flag); }
1368 
1369   /**
1370      Clear the value of a replication state flag.
1371 
1372      @param flag Flag to clear
1373    */
clear_flag(enum_state_flag flag)1374   void clear_flag(enum_state_flag flag) { m_flags &= ~(1UL << flag); }
1375 
1376  private:
1377   /**
1378     Auxiliary function used by is_in_group.
1379 
1380     The execute thread is in the middle of a statement in the
1381     following cases:
1382     - User_var/Intvar/Rand events have been processed, but the
1383       corresponding Query_log_event has not been processed.
1384     - Table_map or Row events have been processed, and the last Row
1385       event did not have the STMT_END_F set.
1386 
1387     @retval true Replication thread is inside a statement.
1388     @retval false Replication thread is not inside a statement.
1389    */
is_in_stmt()1390   bool is_in_stmt() const {
1391     bool ret = (m_flags & (1UL << IN_STMT));
1392     DBUG_PRINT("info", ("is_in_stmt()=%d", ret));
1393     return ret;
1394   }
1395   /**
1396     Auxiliary function used by is_in_group.
1397 
1398     @retval true The execute thread is inside a statement or a
1399     transaction, i.e., either a BEGIN has been executed or we are in
1400     the middle of a statement.
    @retval false The execute thread is not inside a statement
    or a transaction.
1403   */
is_in_trx_or_stmt()1404   bool is_in_trx_or_stmt() const {
1405     bool ret = is_in_stmt() || (info_thd->variables.option_bits & OPTION_BEGIN);
1406     DBUG_PRINT("info", ("is_in_trx_or_stmt()=%d", ret));
1407     return ret;
1408   }
1409 
1410  public:
1411   /**
1412     A group is defined as the entire range of events that constitute
1413     a transaction or auto-committed statement. It has one of the
1414     following forms:
1415 
1416     (Gtid)? Query(BEGIN) ... (Query(COMMIT) | Query(ROLLBACK) | Xid)
1417     (Gtid)? (Rand | User_var | Int_var)* Query(DDL)
1418 
1419     Thus, to check if the execute thread is in a group, there are
1420     two cases:
1421 
1422     - If the master generates Gtid events (5.7.5 or later, or 5.6 or
1423       later with GTID_MODE=ON), then is_in_group is the same as
1424       info_thd->owned_gtid.sidno != 0, since owned_gtid.sidno is set
1425       to non-zero by the Gtid_log_event and cleared to zero at commit
1426       or rollback.
1427 
1428     - If the master does not generate Gtid events (i.e., master is
1429       pre-5.6, or pre-5.7.5 with GTID_MODE=OFF), then is_in_group is
1430       the same as is_in_trx_or_stmt().
1431 
1432     @retval true Replication thread is inside a group.
1433     @retval false Replication thread is not inside a group.
1434   */
is_in_group()1435   bool is_in_group() const {
1436     bool ret = is_in_trx_or_stmt() || info_thd->owned_gtid.sidno != 0;
1437     DBUG_PRINT("info", ("is_in_group()=%d", ret));
1438     return ret;
1439   }
1440 
1441   int count_relay_log_space();
1442 
1443   /**
1444     Initialize the relay log info. This function does a set of operations
1445     on the rli object like initializing variables, loading information from
1446     repository, setting up name for relay log files and index, MTS recovery
1447     (if necessary), calculating the received GTID set for the channel and
1448     storing the updated rli object configuration into the repository.
1449 
1450     When this function is called in a change master process and the change
1451     master procedure will purge all the relay log files later, there is no
1452     reason to try to calculate the received GTID set of the channel based on
1453     existing relay log files (they will be purged). Allowing reads to existing
1454     relay log files at this point may lead to put the server in a state where
1455     it will be no possible to configure it if it was reset when encryption of
1456     replication log files was ON and the keyring plugin is not available
1457     anymore.
1458 
1459     @param skip_received_gtid_set_recovery When true, skips the received GTID
1460                                            set recovery.
1461 
1462     @retval 0 Success.
1463     @retval 1 Error.
1464   */
1465   int rli_init_info(bool skip_received_gtid_set_recovery = false);
1466   void end_info();
1467   int flush_info(bool force = false);
1468   /**
1469    Clears from `this` Relay_log_info object all attribute values that are
1470    not to be kept.
1471 
   @returns true if there was a problem with clearing the data and false
            otherwise.
1474    */
1475   bool clear_info();
1476   /**
1477    Checks if the underlying `Rpl_info` handler holds information for the fields
1478    to be kept between slave resets, while the other fields were cleared.
1479 
   @param previous_result the result returned from invoking the `check_info`
                          method on `this` object.
1482 
1483    @returns function success state represented by the `enum_return_check`
1484             enumeration.
1485    */
1486   enum_return_check check_if_info_was_cleared(
1487       const enum_return_check &previous_result) const;
1488   int flush_current_log();
1489   void set_master_info(Master_info *info);
1490 
get_future_event_relay_log_pos()1491   inline ulonglong get_future_event_relay_log_pos() {
1492     return future_event_relay_log_pos;
1493   }
set_future_event_relay_log_pos(ulonglong log_pos)1494   inline void set_future_event_relay_log_pos(ulonglong log_pos) {
1495     future_event_relay_log_pos = log_pos;
1496   }
1497 
get_group_master_log_name()1498   inline const char *get_group_master_log_name() {
1499     return group_master_log_name;
1500   }
get_group_master_log_pos()1501   inline ulonglong get_group_master_log_pos() { return group_master_log_pos; }
set_group_master_log_name(const char * log_file_name)1502   inline void set_group_master_log_name(const char *log_file_name) {
1503     strmake(group_master_log_name, log_file_name,
1504             sizeof(group_master_log_name) - 1);
1505   }
set_group_master_log_pos(ulonglong log_pos)1506   inline void set_group_master_log_pos(ulonglong log_pos) {
1507     group_master_log_pos = log_pos;
1508   }
1509 
get_group_relay_log_name()1510   inline const char *get_group_relay_log_name() { return group_relay_log_name; }
get_group_relay_log_pos()1511   inline ulonglong get_group_relay_log_pos() { return group_relay_log_pos; }
set_group_relay_log_name(const char * log_file_name)1512   inline void set_group_relay_log_name(const char *log_file_name) {
1513     strmake(group_relay_log_name, log_file_name,
1514             sizeof(group_relay_log_name) - 1);
1515   }
set_group_relay_log_name(const char * log_file_name,size_t len)1516   inline void set_group_relay_log_name(const char *log_file_name, size_t len) {
1517     strmake(group_relay_log_name, log_file_name, len);
1518   }
set_group_relay_log_pos(ulonglong log_pos)1519   inline void set_group_relay_log_pos(ulonglong log_pos) {
1520     group_relay_log_pos = log_pos;
1521   }
1522 
get_event_relay_log_name()1523   inline const char *get_event_relay_log_name() { return event_relay_log_name; }
get_event_relay_log_pos()1524   inline ulonglong get_event_relay_log_pos() { return event_relay_log_pos; }
set_event_relay_log_name(const char * log_file_name)1525   inline void set_event_relay_log_name(const char *log_file_name) {
1526     strmake(event_relay_log_name, log_file_name,
1527             sizeof(event_relay_log_name) - 1);
1528     set_event_relay_log_number(relay_log_name_to_number(log_file_name));
1529     notify_relay_log_change();
1530   }
1531 
get_event_relay_log_number()1532   uint get_event_relay_log_number() { return event_relay_log_number; }
set_event_relay_log_number(uint number)1533   void set_event_relay_log_number(uint number) {
1534     event_relay_log_number = number;
1535   }
1536 
1537   /**
1538     Given the extension number of the relay log, gets the full
1539     relay log path. Currently used in Slave_worker::retry_transaction()
1540 
1541     @param [in]   number      extension number of relay log
1542     @param[in, out] name      The full path of the relay log (per-channel)
1543                               to be read by the slave worker.
1544   */
1545   void relay_log_number_to_name(uint number, char name[FN_REFLEN + 1]);
1546   uint relay_log_name_to_number(const char *name);
1547 
set_event_start_pos(my_off_t pos)1548   void set_event_start_pos(my_off_t pos) { event_start_pos = pos; }
get_event_start_pos()1549   my_off_t get_event_start_pos() { return event_start_pos; }
1550 
set_event_relay_log_pos(ulonglong log_pos)1551   inline void set_event_relay_log_pos(ulonglong log_pos) {
1552     event_relay_log_pos = log_pos;
1553   }
get_rpl_log_name()1554   inline const char *get_rpl_log_name() {
1555     return (group_master_log_name[0] ? group_master_log_name : "FIRST");
1556   }
1557 
1558   static size_t get_number_info_rli_fields();
1559 
1560   /**
1561      Sets bits for columns that are allowed to be `NULL`.
1562 
1563      @param nullable_fields the bitmap to hold the nullable fields.
1564   */
1565   static void set_nullable_fields(MY_BITMAP *nullable_fields);
1566 
1567   /**
1568     Indicate that a delay starts.
1569 
1570     This does not actually sleep; it only sets the state of this
1571     Relay_log_info object to delaying so that the correct state can be
1572     reported by SHOW SLAVE STATUS and SHOW PROCESSLIST.
1573 
1574     Requires rli->data_lock.
1575 
1576     @param delay_end The time when the delay shall end.
1577   */
1578   void start_sql_delay(time_t delay_end);
1579 
1580   /* Note that this is cast to uint32 in show_slave_status(). */
get_sql_delay()1581   time_t get_sql_delay() { return sql_delay; }
set_sql_delay(time_t _sql_delay)1582   void set_sql_delay(time_t _sql_delay) { sql_delay = _sql_delay; }
get_sql_delay_end()1583   time_t get_sql_delay_end() { return sql_delay_end; }
1584 
1585   Relay_log_info(bool is_slave_recovery,
1586 #ifdef HAVE_PSI_INTERFACE
1587                  PSI_mutex_key *param_key_info_run_lock,
1588                  PSI_mutex_key *param_key_info_data_lock,
1589                  PSI_mutex_key *param_key_info_sleep_lock,
1590                  PSI_mutex_key *param_key_info_thd_lock,
1591                  PSI_mutex_key *param_key_info_data_cond,
1592                  PSI_mutex_key *param_key_info_start_cond,
1593                  PSI_mutex_key *param_key_info_stop_cond,
1594                  PSI_mutex_key *param_key_info_sleep_cond,
1595 #endif
1596                  uint param_id, const char *param_channel, bool is_rli_fake);
1597   virtual ~Relay_log_info();
1598 
1599   /*
    Records whether a warning message about unsafe execution was
    already printed out, to avoid cluttering the error log
    with several warning messages.
1603   */
1604   bool reported_unsafe_warning;
1605 
1606   /*
    sql_thread_kill_accepted is set to true when the killed status is recognized.
1608   */
1609   bool sql_thread_kill_accepted;
1610 
get_row_stmt_start_timestamp()1611   time_t get_row_stmt_start_timestamp() { return row_stmt_start_timestamp; }
1612 
set_row_stmt_start_timestamp()1613   time_t set_row_stmt_start_timestamp() {
1614     if (row_stmt_start_timestamp == 0) row_stmt_start_timestamp = my_time(0);
1615 
1616     return row_stmt_start_timestamp;
1617   }
1618 
reset_row_stmt_start_timestamp()1619   void reset_row_stmt_start_timestamp() { row_stmt_start_timestamp = 0; }
1620 
set_long_find_row_note_printed()1621   void set_long_find_row_note_printed() { long_find_row_note_printed = true; }
1622 
unset_long_find_row_note_printed()1623   void unset_long_find_row_note_printed() {
1624     long_find_row_note_printed = false;
1625   }
1626 
is_long_find_row_note_printed()1627   bool is_long_find_row_note_printed() { return long_find_row_note_printed; }
1628 
1629  public:
1630   /**
1631     Delete the existing event and set a new one.  This class is
1632     responsible for freeing the event, the caller should not do that.
1633 
1634     @return 1 if an error was encountered, 0 otherwise.
1635   */
1636   virtual int set_rli_description_event(Format_description_log_event *fdle);
1637 
1638   /**
1639     Return the current Format_description_log_event.
1640   */
get_rli_description_event()1641   Format_description_log_event *get_rli_description_event() const {
1642     return rli_description_event;
1643   }
1644 
1645   /**
1646     adaptation for the slave applier to specific master versions.
1647   */
1648   ulong adapt_to_master_version(Format_description_log_event *fdle);
1649   ulong adapt_to_master_version_updown(ulong master_version,
1650                                        ulong current_version);
1651   uchar slave_version_split[3];  // bytes of the slave server version
1652   /*
    The relay log info repository should be updated on relay log
    rotation. But when the transaction is split across two relay logs,
    updating the repository would cause unexpected results, so the update
    should be postponed until the 'commit' of the transaction is executed.

    This flag is set to 'true' when this type of 'forced flush' (at the
    time of relay log rotation) is postponed due to a transaction split
    across the relay logs.
1661   */
1662   bool force_flush_postponed_due_to_split_trans;
1663 
get_commit_order_manager()1664   Commit_order_manager *get_commit_order_manager() { return commit_order_mngr; }
1665 
set_commit_order_manager(Commit_order_manager * mngr)1666   void set_commit_order_manager(Commit_order_manager *mngr) {
1667     commit_order_mngr = mngr;
1668   }
1669 
1670   /*
1671     Following set function is required to initialize the 'until_option' during
1672     MTS relay log recovery process.
1673 
1674     Ideally initialization of 'until_option' is done through
1675     rli::init_until_option. This init_until_option requires the main server
1676     thread object and it makes use of the thd->lex->mi object to initialize the
1677     'until_option'.
1678 
1679     But MTS relay log recovery process happens before the main server comes
1680     up at this time the THD object will not be available. Hence the following
1681     set function does the initialization of 'until_option'.
1682   */
set_until_option(Until_option * option)1683   void set_until_option(Until_option *option) {
1684     mysql_mutex_lock(&data_lock);
1685     until_option = option;
1686     mysql_mutex_unlock(&data_lock);
1687   }
1688 
clear_until_option()1689   void clear_until_option() {
1690     mysql_mutex_lock(&data_lock);
1691     if (until_option) {
1692       delete until_option;
1693       until_option = nullptr;
1694     }
1695     mysql_mutex_unlock(&data_lock);
1696   }
1697 
1698   bool set_info_search_keys(Rpl_info_handler *to);
1699 
1700   /**
1701     Get coordinator's RLI. Especially used get the rli from
1702     a slave thread, like this: thd->rli_slave->get_c_rli();
1703     thd could be a SQL thread or a worker thread
1704   */
get_c_rli()1705   virtual Relay_log_info *get_c_rli() { return this; }
1706 
1707   virtual const char *get_for_channel_str(bool upper_case = false) const;
1708 
1709   /**
1710     Set replication filter for the channel.
1711   */
set_filter(Rpl_filter * channel_filter)1712   inline void set_filter(Rpl_filter *channel_filter) {
1713     rpl_filter = channel_filter;
1714   }
1715 
1716  protected:
1717   Format_description_log_event *rli_description_event;
1718 
1719  private:
1720   /*
    Commit order manager to order commits made by its workers. In the context
    of Multi Source Replication each worker will be ordered by the
    corresponding coordinator's order manager.
1724    */
1725   Commit_order_manager *commit_order_mngr;
1726 
1727   /**
1728     Delay slave SQL thread by this amount of seconds.
1729     The delay is applied per transaction and based on the immediate master's
1730     commit time. Exceptionally, if a server in the replication chain does not
1731     support the commit timestamps in Gtid_log_event, the delay is applied per
1732     event and is based on the event timestamp.
1733     This is set with CHANGE MASTER TO MASTER_DELAY=X.
1734 
1735     Guarded by data_lock.  Initialized by the client thread executing
1736     START SLAVE.  Written by client threads executing CHANGE MASTER TO
1737     MASTER_DELAY=X.  Read by SQL thread and by client threads
1738     executing SHOW SLAVE STATUS.  Note: must not be written while the
1739     slave SQL thread is running, since the SQL thread reads it without
1740     a lock when executing flush_info().
1741   */
1742   time_t sql_delay;
1743 
1744   /**
1745     During a delay, specifies the point in time when the delay ends.
1746 
1747     This is used for the SQL_Remaining_Delay column in SHOW SLAVE STATUS.
1748 
1749     Guarded by data_lock. Written by the sql thread.  Read by client
1750     threads executing SHOW SLAVE STATUS.
1751   */
1752   time_t sql_delay_end;
1753 
1754   uint32 m_flags;
1755 
1756   /*
1757     Before the MASTER_DELAY parameter was added (WL#344), relay_log.info
1758     had 4 lines. Now it has 5 lines.
1759   */
1760   static const int LINES_IN_RELAY_LOG_INFO_WITH_DELAY = 5;
1761 
1762   /*
1763     Before the WL#5599, relay_log.info had 5 lines. Now it has 6 lines.
1764   */
1765   static const int LINES_IN_RELAY_LOG_INFO_WITH_WORKERS = 6;
1766 
1767   /*
1768     Before the Id was added (BUG#2334346), relay_log.info
1769     had 6 lines. Now it has 7 lines.
1770   */
1771   static const int LINES_IN_RELAY_LOG_INFO_WITH_ID = 7;
1772 
1773   /*
1774     Add a channel in the slave relay log info
1775   */
1776   static const int LINES_IN_RELAY_LOG_INFO_WITH_CHANNEL = 8;
1777 
1778   /*
1779     Represents line number in relay_log.info to save PRIVILEGE_CHECKS_USERNAME.
1780     It is username part of PRIVILEGES_CHECKS_USER column in
1781     performance_schema.replication_applier_configuration.
1782   */
1783   static const int LINES_IN_RELAY_LOG_INFO_WITH_PRIV_CHECKS_USERNAME = 9;
1784 
1785   /*
1786     Maximum length of PRIVILEGE_CHECKS_USERNAME.
1787   */
1788   static const int PRIV_CHECKS_USERNAME_LENGTH = 32;
1789 
1790   /*
1791     Represents line number in relay_log.info to save PRIVILEGE_CHECKS_HOSTNAME.
1792     It is hostname part of PRIVILEGES_CHECKS_USER column in
1793     performance_schema.replication_applier_configuration.
1794   */
1795   static const int LINES_IN_RELAY_LOG_INFO_WITH_PRIV_CHECKS_HOSTNAME = 10;
1796 
1797   /*
    Maximum length of PRIVILEGE_CHECKS_HOSTNAME.
1799   */
1800   static const int PRIV_CHECKS_HOSTNAME_LENGTH = 255;
1801 
1802   /*
1803     Represents line number in relay_log.info to save REQUIRE_ROW_FORMAT
1804   */
1805   static const int LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_ROW_FORMAT = 11;
1806 
1807   /*
1808     Represents line number in relay_log.info to save
1809     REQUIRE_TABLE_PRIMARY_KEY_CHECK
1810   */
1811   static const int
1812       LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_TABLE_PRIMARY_KEY_CHECK = 12;
1813 
1814   /*
1815     Total lines in relay_log.info.
1816     This has to be updated every time a member is added or removed.
1817   */
1818   static const int MAXIMUM_LINES_IN_RELAY_LOG_INFO_FILE =
1819       LINES_IN_RELAY_LOG_INFO_WITH_REQUIRE_TABLE_PRIMARY_KEY_CHECK;
1820 
1821   bool read_info(Rpl_info_handler *from);
1822   bool write_info(Rpl_info_handler *to);
1823 
1824   Relay_log_info(const Relay_log_info &info);
1825   Relay_log_info &operator=(const Relay_log_info &info);
1826 
1827   /*
1828     Runtime state for printing a note when slave is taking
1829     too long while processing a row event.
1830    */
1831   time_t row_stmt_start_timestamp;
1832   bool long_find_row_note_printed;
1833 
1834   /**
1835     sets the suffix required for relay log names in multisource
1836     replication. When --relay-log option is not provided, the
1837     names of the relay log files are relaylog.0000x or
1838     relaylog-CHANNEL.00000x in the case of MSR. However, if
1839     that option is provided, then the names of the relay log
1840     files are <relay-log-option>.0000x or
1841     <relay-log-option>-CHANNEL.00000x in the case of MSR.
1842 
1843     The function adds a channel suffix (according to the channel to
1844     file name conventions and conversions) to the relay log file.
1845 
1846     @todo: truncate the log file if length exceeds.
1847 
1848     @param[in, out]  buff       buffer to store the complete relay log file name
1849     @param[in]       buff_size  size of buffer buff
1850     @param[in]       base_name  the base name of the relay log file
1851   */
1852   const char *add_channel_to_relay_log_name(char *buff, uint buff_size,
1853                                             const char *base_name);
1854 
1855   /*
1856     Applier thread InnoDB priority.
1857     When two transactions conflict inside InnoDB, the one with
1858     greater priority wins.
1859     Priority must be set before applier thread start so that all
1860     executed transactions have the same priority.
1861   */
1862   int thd_tx_priority;
1863 
1864   /* The object stores and handles START SLAVE UNTIL option */
1865   Until_option *until_option;
1866 
1867  public:
1868   /*
1869     The boolean is set to true when the binlog (rli_fake) or slave
1870     (rli_slave) applier thread detaches any engine ha_data
1871     it has dealt with at time of XA START processing.
1872     The boolean is reset to false at the end of XA PREPARE,
1873     XA COMMIT ONE PHASE for the binlog applier, and
1874     at internal rollback of the slave applier at the same time with
1875     the engine ha_data re-attachment.
1876   */
1877   bool is_engine_ha_data_detached;
1878   /**
    Reference to the event being applied. The member is set at event reading
1880     and gets reset at the end of the event lifetime.
1881     See more in @c RLI_current_event_raii that provides the main
1882     interface to the member.
1883   */
1884   Log_event *current_event;
1885 
1886   /**
    Raised when the slave applies, and writes to its binary log, a statement
    which is not an atomic DDL and has no XID assigned. Checked at commit
    time to decide whether it is safe to update the slave info table
    within the same transaction as the write to the binary log, or whether
    this should be deferred. The deferred scenario applies to events without
    an XID, in which case such an update might be lost on recovery.
1893   */
1894   bool ddl_not_atomic;
1895 
set_thd_tx_priority(int priority)1896   void set_thd_tx_priority(int priority) { thd_tx_priority = priority; }
1897 
get_thd_tx_priority()1898   int get_thd_tx_priority() { return thd_tx_priority; }
1899 
1900   const char *get_until_log_name();
1901   my_off_t get_until_log_pos();
is_until_satisfied_at_start_slave()1902   bool is_until_satisfied_at_start_slave() {
1903     return until_option != nullptr &&
1904            until_option->is_satisfied_at_start_slave();
1905   }
is_until_satisfied_before_dispatching_event(const Log_event * ev)1906   bool is_until_satisfied_before_dispatching_event(const Log_event *ev) {
1907     return until_option != nullptr &&
1908            until_option->is_satisfied_before_dispatching_event(ev);
1909   }
is_until_satisfied_after_dispatching_event()1910   bool is_until_satisfied_after_dispatching_event() {
1911     return until_option != nullptr &&
1912            until_option->is_satisfied_after_dispatching_event();
1913   }
1914   /**
   Initialize the until option object when starting the slave.
1916 
1917    @param[in] thd The thread object of current session.
1918    @param[in] master_param the parameters of START SLAVE.
1919 
1920    @return int
1921      @retval 0      Succeeds to initialize until option object.
1922      @retval <> 0   A defined error number is return if any error happens.
1923  */
1924   int init_until_option(THD *thd, const LEX_MASTER_INFO *master_param);
1925 
1926   /**
1927     Detaches the engine ha_data from THD. The fact
    is memorized in the @c is_engine_ha_data_detached flag.
1929 
1930     @param  thd a reference to THD
1931   */
1932 
1933   void detach_engine_ha_data(THD *thd);
1934 
1935   /**
1936     Reattaches the engine ha_data to THD. The fact
    is memorized in the @c is_engine_ha_data_detached flag.
1938 
1939     @param  thd a reference to THD
1940   */
1941 
1942   void reattach_engine_ha_data(THD *thd);
1943   /**
1944     Drops the engine ha_data flag when it is up.
1945     The method is run at execution points of the engine ha_data
1946     re-attachment.
1947 
1948     @return true   when THD has detached the engine ha_data,
1949             false  otherwise
1950   */
1951 
unflag_detached_engine_ha_data()1952   bool unflag_detached_engine_ha_data() {
1953     bool rc = false;
1954 
1955     if (is_engine_ha_data_detached)
1956       rc = !(is_engine_ha_data_detached = false);  // return the old value
1957 
1958     return rc;
1959   }
1960 
1961   /**
1962     Execute actions at replicated atomic DLL post rollback time.
1963     This include marking the current atomic DDL query-log-event
1964     as having processed.
1965     This measure is necessary to avoid slave info table update execution
1966     when @c pre_commit() hook is called as part of DDL's eventual
1967     implicit commit.
1968   */
post_rollback()1969   void post_rollback() {
1970     static_cast<Query_log_event *>(current_event)->has_ddl_committed = true;
1971   }
1972 
1973   /**
1974     The method implements a pre-commit hook to add up a new statement
1975     typically to a DDL transaction to update the slave info table.
1976     Note, in the non-transactional repository case the slave info
1977     is updated after successful commit of the main transaction.
1978 
1979     @return false as success, otherwise true
1980   */
pre_commit()1981   bool pre_commit() {
1982     bool rc = false;
1983 
1984     if (is_transactional()) {
1985       static_cast<Query_log_event *>(current_event)->has_ddl_committed = true;
1986       rc = commit_positions();
1987     }
1988     return rc;
1989   }
1990   /**
1991     Cleanup of any side effect that pre_commit() inflicts, including
1992     restore of the last executed group coordinates in case the current group
1993     has been destined to rollback, and signaling to possible waiters
1994     in the positive case.
1995 
1996     @param on_rollback  when true the method carries out rollback action
1997   */
1998   virtual void post_commit(bool on_rollback);
1999 };
2000 
2001 /**
2002   Negation operator for `enum_priv_checks_status`, to facilitate validation
2003   against `SUCCESS`. To test for error status, use the `!!` idiom.
2004 
2005   @param status the status code to check against `SUCCESS`
2006 
2007   @return true if the status is `SUCCESS` and false otherwise.
2008  */
2009 bool operator!(Relay_log_info::enum_priv_checks_status status);
2010 
2011 /**
2012   Negation operator for `enum_require_row_status`, to facilitate validation
2013   against `SUCCESS`. To test for error status, use the `!!` idiom.
2014 
2015   @param status the status code to check against `SUCCESS`
2016 
2017   @return true if the status is `SUCCESS` and false otherwise.
2018  */
2019 bool operator!(Relay_log_info::enum_require_row_status status);
2020 
2021 bool mysql_show_relaylog_events(THD *thd);
2022 
2023 /**
2024    @param  thd a reference to THD
2025    @return true if thd belongs to a Worker thread and false otherwise.
2026 */
is_mts_worker(const THD * thd)2027 inline bool is_mts_worker(const THD *thd) {
2028   return thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER;
2029 }
2030 
2031 /**
2032  Auxiliary function to check if we have a db partitioned MTS
2033  */
2034 bool is_mts_db_partitioned(Relay_log_info *rli);
2035 
2036 /**
2037   Checks whether the supplied event encodes a (2pc-aware) DDL
2038   that has been already committed.
2039 
2040   @param  ev    A reference to Query-log-event
2041   @return true  when the event is already committed transactional DDL
2042 */
is_committed_ddl(Log_event * ev)2043 inline bool is_committed_ddl(Log_event *ev) {
2044   return ev->get_type_code() == binary_log::QUERY_EVENT &&
2045          /* has been already committed */
2046          static_cast<Query_log_event *>(ev)->has_ddl_committed;
2047 }
2048 
2049 /**
2050   Checks whether the transaction identified by the argument
2051   is executed by a slave applier thread is an atomic DDL
2052   not yet committed (see @c Query_log_event::has_ddl_committed).
2053   THD::is_operating_substatement_implicitly filters out intermediate
2054   commits done by non-atomic DDLs.
2055   The error-tagged atomic statements are regarded as non-atomic
2056   therefore this predicate returns negative in such case.
2057 
2058   Note that call to is_atomic_ddl() returns "approximate" outcome in
2059   this case as it misses information about type of tables used by the DDL.
2060 
2061   This can be a problem for binlogging slave, as updates to slave info
2062   which happen in the same transaction as write of binary log event
2063   without XID might be lost on recovery. To avoid this problem
2064   RLI::ddl_not_atomic flag is employed which is set to true when
2065   non-atomic DDL without XID is written to the binary log.
2066 
2067   "Approximate" outcome is always fine for non-binlogging slave as in
2068   this case commit happens using one-phase routine for which recovery
2069   is always correct.
2070 
2071   @param  thd   a pointer to THD describing the transaction context
2072   @return true  when a slave applier thread is set to commmit being processed
2073                 DDL query-log-event, otherwise returns false.
2074 */
is_atomic_ddl_commit_on_slave(THD * thd)2075 inline bool is_atomic_ddl_commit_on_slave(THD *thd) {
2076   DBUG_ASSERT(thd);
2077 
2078   Relay_log_info *rli = thd->rli_slave;
2079 
2080   /* Early return is about an error in the SQL thread initialization */
2081   if (!rli) return false;
2082 
2083   return ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
2084            thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) &&
2085           rli->current_event)
2086              ? (rli->is_transactional() &&
2087                 /* has not yet committed */
2088                 (rli->current_event->get_type_code() ==
2089                      binary_log::QUERY_EVENT &&
2090                  !static_cast<Query_log_event *>(rli->current_event)
2091                       ->has_ddl_committed) &&
2092                 /* unless slave binlogger identified non-atomic */
2093                 !rli->ddl_not_atomic &&
2094                 /* slave info is not updated when a part of multi-DROP-TABLE
2095                    commits */
2096                 !thd->is_commit_in_middle_of_statement &&
2097                 (is_atomic_ddl(thd, true) &&
2098                  !thd->is_operating_substatement_implicitly) &&
2099                 /* error-tagged atomic DDL do not update yet slave info */
2100                 static_cast<Query_log_event *>(rli->current_event)
2101                         ->error_code == 0)
2102              : false;
2103 }
2104 
2105 /**
2106   RAII class to control the slave applier execution context binding
2107   with a being handled event. The main object of control is Query-log-event
2108   containing DDL statement.
2109   The member RLI::current_event is set to refer to an event once it is
2110   read, e.g by next_event() and is reset to NULL at exiting a
2111   read-exec loop. Once the event is destroyed RLI::current_event must be reset
2112   or guaranteed not be accessed anymore.
2113   In the MTS execution the worker is reliably associated with an event
2114   only with the latter is not deferred. This includes Query-log-event.
2115 */
2116 class RLI_current_event_raii {
2117   Relay_log_info *m_rli;
2118 
2119  public:
RLI_current_event_raii(Relay_log_info * rli_arg,Log_event * ev)2120   RLI_current_event_raii(Relay_log_info *rli_arg, Log_event *ev)
2121       : m_rli(rli_arg) {
2122     m_rli->current_event = ev;
2123   }
set_current_event(Log_event * ev)2124   void set_current_event(Log_event *ev) { m_rli->current_event = ev; }
~RLI_current_event_raii()2125   ~RLI_current_event_raii() { m_rli->current_event = nullptr; }
2126 };
2127 
2128 /**
2129  @class MDL_lock_guard
2130 
2131  Utility class to allow RAII pattern with `MDL_request` and `MDL_context`
2132  classes.
2133  */
2134 class MDL_lock_guard {
2135  public:
2136   /**
2137    Constructor that initializes the object and the target `THD` object but
2138    doesn't try to acquire any lock.
2139 
2140    @param target THD object, source for the `MDL_context` to use.
2141    */
2142   MDL_lock_guard(THD *target);
2143   /**
2144    Constructor that initializes the object and the target `THD` object and tries
2145    to acquire the lock identified by `namespace_arg` with MDL type identified by
2146    `mdl_type_arg`.
2147 
2148    If the `blocking` parameter is true, it will instantly try to acquire the
2149    lock and block. If the `blocking` parameter is false, it will first test if
2150    the lock is already acquired and only try to lock if no conflicting lock is
2151    already acquired.
2152 
2153    @param target THD object, source for the `MDL_context` to use.
2154    @param namespace_arg MDL key namespace to acquire the lock from.
2155    @param mdl_type_arg MDL acquisition type
2156    @param blocking whether or not the execution should block if the lock is
2157                    already acquired.
2158    */
2159   MDL_lock_guard(THD *target, MDL_key::enum_mdl_namespace namespace_arg,
2160                  enum_mdl_type mdl_type_arg, bool blocking = false);
2161   /**
2162    Destructor that unlocks all acquired locks.
2163    */
2164   virtual ~MDL_lock_guard();
2165 
2166   /**
2167    Uses the target `THD` object MDL context to acquire the lock identified by
2168    `namespace_arg` with MDL type identified by `mdl_type_arg`.
2169 
2170    If the `blocking` parameter is true, it will instantly try to acquire the
2171    lock and block. If the `blocking` parameter is false, it will first test if
2172    the lock is already acquired and only try to lock if no conflicting lock is
2173    already acquired.
2174 
2175    The lock is determined to have been acquired if the `THD` object MDL context
2176    hasn't already a lock and the lock is acquired. In other words, if the MDL
2177    context already has acquired the lock, the method will return failure.
2178 
2179    @param namespace_arg MDL key namespace to acquire the lock from.
2180    @param mdl_type_arg MDL acquisition type
2181    @param blocking whether or not the execution should block if the lock is
2182                    already acquired.
2183 
2184    @return false if the lock has been acquired by this method invocation and
2185            true if not.
2186    */
2187   bool lock(MDL_key::enum_mdl_namespace namespace_arg,
2188             enum_mdl_type mdl_type_arg, bool blocking = false);
2189   /**
2190    Returns whether or not the lock as been acquired within this object
2191    life-cycle.
2192 
2193    @return true if the lock has been acquired within this object life-cycle.
2194    */
2195   bool is_locked();
2196 
2197  private:
2198   /** The `THD` object holding the MDL context used for acquiring/releasing. */
2199   THD *m_target;
2200   /** The MDL request holding the MDL ticket issued upon acquisition */
2201   MDL_request m_request;
2202 };
2203 
2204 /**
2205   @class Applier_security_context_guard
2206 
2207   Utility class to allow RAII pattern with `Security_context` class.
2208 
  At initialization, if the `THD` main security context isn't already the
2210   appropriate one, it copies the `Relay_log_info::info_thd::security_context`
2211   and replaces it with the one initialized with the `PRIVILEGE_CHECK_USER` user.
2212   At deinitialization, it copies the backed up security context.
2213 
2214   It also deals with the case where no privilege checks are required, meaning,
2215   `PRIVILEGE_CHECKS_USER` is `NULL`.
2216 
2217   Usage examples:
2218 
2219   (1)
2220   @code
2221       Applier_security_context_guard security_context{rli, thd};
2222       if (!security_context.has_access({SUPER_ACL})) {
2223         return ER_NO_ACCESS;
2224       }
2225   @endcode
2226 
2227   (4)
2228   @code
2229       Applier_security_context_guard security_context{rli, thd};
2230       if (!security_context.has_access(
2231               {{CREATE_ACL | INSERT_ACL | UPDATE_ACL, table},
2232                {SELECT_ACL, table}})) {
2233         return ER_NO_ACCESS;
2234       }
2235   @endcode
2236  */
2237 
2238 class Applier_security_context_guard {
2239  public:
2240   /**
2241     If needed, backs up the current `thd` security context and replaces it with
2242     a security context for `PRIVILEGE_CHECKS_USER` user.
2243 
2244     @param rli the `Relay_log_info` object that holds the
2245                `PRIVILEGE_CHECKS_USER` info.
2246     @param thd the `THD` for which initialize the security context.
2247    */
2248   Applier_security_context_guard(Relay_log_info const *rli, THD const *thd);
2249   /**
2250     Destructor that restores the backed up security context, if needed.
2251    */
2252   virtual ~Applier_security_context_guard();
2253 
2254   // --> Deleted constructors and methods to remove default move/copy semantics
2255   Applier_security_context_guard(const Applier_security_context_guard &) =
2256       delete;
2257   Applier_security_context_guard(Applier_security_context_guard &&) = delete;
2258   Applier_security_context_guard &operator=(
2259       const Applier_security_context_guard &) = delete;
2260   Applier_security_context_guard &operator=(Applier_security_context_guard &&) =
2261       delete;
2262   // <--
2263 
2264   /**
2265     Returns whether or not privilege checks may be skipped within the current
2266     context.
2267 
2268     @return true if privilege checks may be skipped and false otherwise.
2269    */
2270   bool skip_priv_checks() const;
2271   /**
2272     Checks if the `PRIVILEGE_CHECKS_USER` user has access to the privilieges
2273     passed on by `extra_privileges` parameter as well as to the privileges
2274     passed on at initilization time.
2275 
2276     This particular method checks those privileges agains a given table and
2277     against that table's columns - the ones that are used or changed in the
2278     event.
2279 
2280     @param extra_privileges set of privileges to check, additionally to those
2281                             passed on at initialization. It's a list of
2282                             (privilege, TABLE*, Rows_log_event*) tuples.
2283 
2284     @return true if the privileges are included in the security context and
2285             false, otherwise.
2286    */
2287   bool has_access(
2288       std::vector<std::tuple<ulong, TABLE const *, Rows_log_event *>>
2289           &extra_privileges) const;
2290   /**
2291     Checks if the `PRIVILEGE_CHECKS_USER` user has access to the privilieges
2292     passed on by `extra_privileges` parameter as well as to the privileges
2293     passed on at initilization time.
2294 
2295     @param extra_privileges set of privileges to check, additionally to those
2296                             passed on at initialization. It's a list of
2297                             privileges to be checked against any database.
2298 
2299     @return true if the privileges are included in the security context and
2300             false, otherwise.
2301    */
2302   bool has_access(std::initializer_list<std::string> extra_privileges) const;
2303 
2304   /**
2305     Checks if the `PRIVILEGE_CHECKS_USER` user has access to the privilieges
2306     passed on by `extra_privileges` parameter as well as to the privileges
2307     passed on at initilization time.
2308 
2309     @param extra_privileges set of privileges to check, additionally to those
2310                             passed on at initialization. It's a list of
2311                             privileges to be checked against any database.
2312 
2313     @return true if the privileges are included in the security context and
2314             false, otherwise.
2315    */
2316   bool has_access(std::initializer_list<ulong> extra_privileges) const;
2317 
2318   /**
2319     Returns the username for the user for which the security context was
2320     initialized.
2321 
2322     If `PRIVILEGE_CHECKS_USER` was configured for the target `Relay_log_info`
2323     object, that one is returned.
2324 
2325     Otherwise, the username associated with the `Security_context` initialized
2326     for `Relay_log_info::info_thd` will be returned.
2327 
2328     @return an `std::string` holding the username for the active security
2329             context.
2330    */
2331   std::string get_username() const;
2332   /**
2333     Returns the hostname for the user for which the security context was
2334     initialized.
2335 
2336     If `PRIVILEGE_CHECKS_USER` was configured for the target `Relay_log_info`
2337     object, that one is returned.
2338 
2339     Otherwise, the hostname associated with the `Security_context` initialized
2340     for `Relay_log_info::info_thd` will be returned.
2341 
2342     @return an `std::string` holding the hostname for the active security
2343             context.
2344    */
2345   std::string get_hostname() const;
2346 
 private:
  /**
    The `Relay_log_info` object holding the info required to initialize the
    context (non-owning pointer).
   */
  Relay_log_info const *m_target;
  /**
    The `THD` object for which the security context will be initialized
    (non-owning pointer).
   */
  THD const *m_thd;
  /** Applier security context based on the `PRIVILEGE_CHECKS_USER` user */
  Security_context m_applier_security_ctx;
  /** Security context currently in use */
  Security_context *m_current;
  /** Backed-up security context, to be restored by the destructor */
  Security_context *m_previous;
  /** Flag that states whether privilege checks should be skipped */
  bool m_privilege_checks_none;
  /** Flag that states whether there is a logged-in ACL user */
  bool m_logged_in_acl_user;
2367 
  /**
    Helper that fills `columns` with column names of `table` relevant to
    `event`.

    NOTE(review): presumably collects the columns that `event` uses or
    changes, for the per-table `has_access` overload; also unclear from
    here whether `columns` is cleared first or only appended to -- confirm
    against the implementation.

    @param table   the table whose columns are inspected.
    @param event   the rows event being applied.
    @param columns output list receiving the column names.
   */
  void extract_columns_to_check(TABLE const *table, Rows_log_event *event,
                                std::vector<std::string> &columns) const;
2370 };
2371 
2372 #endif /* RPL_RLI_H */
2373