1 /* Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/rpl_handler.h"
24 
25 #include <string.h>
26 #include <memory>
27 #include <new>
28 #include <unordered_map>
29 #include <utility>
30 #include <vector>
31 
32 #include "lex_string.h"
33 #include "map_helpers.h"
34 #include "my_compiler.h"
35 #include "my_dbug.h"
36 #include "my_io.h"
37 #include "my_loglevel.h"
38 #include "mysql/components/services/log_builtins.h"
39 #include "mysql/components/services/log_shared.h"
40 #include "mysql/plugin.h"
41 #include "mysql/psi/mysql_mutex.h"
42 #include "mysql/psi/psi_base.h"
43 #include "mysql/service_mysql_alloc.h"
44 #include "mysqld_error.h"
45 #include "prealloced_array.h"
46 #include "sql/current_thd.h"
47 #include "sql/debug_sync.h"  // DEBUG_SYNC
48 #include "sql/handler.h"
49 #include "sql/item_func.h"  // user_var_entry
50 #include "sql/key.h"
51 #include "sql/log.h"
52 #include "sql/mysqld.h"  // server_uuid
53 #include "sql/psi_memory_key.h"
54 #include "sql/replication.h"  // Trans_param
55 #include "sql/rpl_gtid.h"
56 #include "sql/rpl_mi.h"     // Master_info
57 #include "sql/set_var.h"    // OPT_PERSIST
58 #include "sql/sql_class.h"  // THD
59 #include "sql/sql_const.h"
60 #include "sql/sql_lex.h"
61 #include "sql/sql_parse.h"   // sql_command_flags
62 #include "sql/sql_plugin.h"  // plugin_int_to_ref
63 #include "sql/system_variables.h"
64 #include "sql/table.h"
65 #include "sql/thr_malloc.h"
66 #include "sql/transaction_info.h"
67 #include "sql_string.h"
68 
/*
  Global pointers to the delegate objects. They are assigned in
  delegates_init(), which constructs each delegate with placement new into
  static storage, and their destructors are run by delegates_destroy().
*/
Trans_delegate *transaction_delegate;
Binlog_storage_delegate *binlog_storage_delegate;
Server_state_delegate *server_state_delegate;

Binlog_transmit_delegate *binlog_transmit_delegate;
Binlog_relay_IO_delegate *binlog_relay_io_delegate;
75 
/**
  Records one observer together with the plugin it is associated with.

  @param ob  opaque pointer to the observer structure; stored unchanged.
  @param p   plugin descriptor associated with the observer; stored unchanged.
*/
Observer_info::Observer_info(void *ob, st_plugin_int *p)
    : observer(ob), plugin_int(p) {
  // NOTE(review): the plugin reference is derived from the member
  // plugin_int, not from the parameter p. Keep it that way unless
  // plugin_int_to_ref() is confirmed never to take the address of its
  // argument - TODO confirm against sql_plugin.h.
  plugin = plugin_int_to_ref(plugin_int);
}
80 
Delegate(PSI_rwlock_key key)81 Delegate::Delegate(
82 #ifdef HAVE_PSI_RWLOCK_INTERFACE
83     PSI_rwlock_key key
84 #endif
85 ) {
86   inited = false;
87 #ifdef HAVE_PSI_RWLOCK_INTERFACE
88   if (mysql_rwlock_init(key, &lock)) return;
89 #else
90   if (mysql_rwlock_init(0, &lock)) return;
91 #endif
92   init_sql_alloc(key_memory_delegate, &memroot, 1024, 0);
93   inited = true;
94 }
95 
96 /*
97   structure to save transaction log filename and position
98 */
99 typedef struct Trans_binlog_info {
100   my_off_t log_pos;
101   char log_file[FN_REFLEN];
102 } Trans_binlog_info;
103 
get_user_var_int(const char * name,long long int * value,int * null_value)104 int get_user_var_int(const char *name, long long int *value, int *null_value) {
105   bool null_val;
106   THD *thd = current_thd;
107 
108   /* Protects thd->user_vars. */
109   mysql_mutex_lock(&thd->LOCK_thd_data);
110 
111   const auto it = thd->user_vars.find(name);
112   if (it == thd->user_vars.end()) {
113     mysql_mutex_unlock(&thd->LOCK_thd_data);
114     return 1;
115   }
116   *value = it->second->val_int(&null_val);
117   if (null_value) *null_value = null_val;
118   mysql_mutex_unlock(&thd->LOCK_thd_data);
119   return 0;
120 }
121 
get_user_var_real(const char * name,double * value,int * null_value)122 int get_user_var_real(const char *name, double *value, int *null_value) {
123   bool null_val;
124   THD *thd = current_thd;
125 
126   /* Protects thd->user_vars. */
127   mysql_mutex_lock(&thd->LOCK_thd_data);
128 
129   const auto it = thd->user_vars.find(name);
130   if (it == thd->user_vars.end()) {
131     mysql_mutex_unlock(&thd->LOCK_thd_data);
132     return 1;
133   }
134   *value = it->second->val_real(&null_val);
135   if (null_value) *null_value = null_val;
136   mysql_mutex_unlock(&thd->LOCK_thd_data);
137   return 0;
138 }
139 
get_user_var_str(const char * name,char * value,size_t len,unsigned int precision,int * null_value)140 int get_user_var_str(const char *name, char *value, size_t len,
141                      unsigned int precision, int *null_value) {
142   String str;
143   bool null_val;
144   THD *thd = current_thd;
145 
146   /* Protects thd->user_vars. */
147   mysql_mutex_lock(&thd->LOCK_thd_data);
148 
149   const auto it = thd->user_vars.find(name);
150   if (it == thd->user_vars.end()) {
151     mysql_mutex_unlock(&thd->LOCK_thd_data);
152     return 1;
153   }
154   it->second->val_str(&null_val, &str, precision);
155   strncpy(value, str.c_ptr(), len);
156   if (null_value) *null_value = null_val;
157   mysql_mutex_unlock(&thd->LOCK_thd_data);
158   return 0;
159 }
160 
/**
  Constructs the global delegate objects.

  Each delegate is built with placement new inside a function-local static
  buffer that is sized and aligned for its type, so no heap allocation is
  done for the delegate objects themselves. Construction stops at the first
  delegate that reports !is_inited(); delegates after that point are not
  constructed and their global pointers are not assigned by this call.

  @retval 0  all delegates were constructed
  @retval 1  a delegate failed to initialize; the failure has been logged
*/
int delegates_init() {
  alignas(Trans_delegate) static char place_trans_mem[sizeof(Trans_delegate)];
  alignas(Binlog_storage_delegate) static char
      place_storage_mem[sizeof(Binlog_storage_delegate)];
  alignas(Server_state_delegate) static char
      place_state_mem[sizeof(Server_state_delegate)];
  alignas(Binlog_transmit_delegate) static char
      place_transmit_mem[sizeof(Binlog_transmit_delegate)];
  alignas(Binlog_relay_IO_delegate) static char
      place_relay_io_mem[sizeof(Binlog_relay_IO_delegate)];

  transaction_delegate = new (place_trans_mem) Trans_delegate;
  if (!transaction_delegate->is_inited()) {
    LogErr(ERROR_LEVEL, ER_RPL_TRX_DELEGATES_INIT_FAILED);
    return 1;
  }

  binlog_storage_delegate = new (place_storage_mem) Binlog_storage_delegate;
  if (!binlog_storage_delegate->is_inited()) {
    LogErr(ERROR_LEVEL, ER_RPL_BINLOG_STORAGE_DELEGATES_INIT_FAILED);
    return 1;
  }

  // NOTE(review): unlike the other delegates, server_state_delegate is not
  // checked with is_inited() after construction - confirm whether that is
  // intentional.
  server_state_delegate = new (place_state_mem) Server_state_delegate;
  binlog_transmit_delegate = new (place_transmit_mem) Binlog_transmit_delegate;
  if (!binlog_transmit_delegate->is_inited()) {
    LogErr(ERROR_LEVEL, ER_RPL_BINLOG_TRANSMIT_DELEGATES_INIT_FAILED);
    return 1;
  }

  binlog_relay_io_delegate = new (place_relay_io_mem) Binlog_relay_IO_delegate;
  if (!binlog_relay_io_delegate->is_inited()) {
    LogErr(ERROR_LEVEL, ER_RPL_BINLOG_RELAY_DELEGATES_INIT_FAILED);
    return 1;
  }

  return 0;
}
199 
delegates_destroy()200 void delegates_destroy() {
201   if (transaction_delegate) transaction_delegate->~Trans_delegate();
202   if (binlog_storage_delegate)
203     binlog_storage_delegate->~Binlog_storage_delegate();
204   if (server_state_delegate) server_state_delegate->~Server_state_delegate();
205   if (binlog_transmit_delegate)
206     binlog_transmit_delegate->~Binlog_transmit_delegate();
207   if (binlog_relay_io_delegate)
208     binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
209 }
210 
/*
  This macro is used by almost all the Delegate methods to iterate
  over all the observers, running the given callback function of the
  delegate.

  What the expansion does, in order:
   - takes the delegate read lock and walks the observer list;
   - locks the plugin of each observer with my_plugin_lock(0, ...) and
     remembers the reference in a local array; if a plugin cannot be
     locked, r is set to 0 and the walk stops at that observer;
   - calls callback f with args when the observer provides it; a non-zero
     return sets r to 1, logs ER_RPL_PLUGIN_FUNCTION_FAILED and stops the
     walk;
   - releases the delegate lock and only then unlocks the remembered
     plugins.

  r is assigned only in the two early-exit cases, so the caller must
  initialize it before the expansion. The expansion declares the locals
  'plugins', 'iter' and 'info' in the enclosing scope, so it can be used
  at most once per scope and the caller must not use these names.

  NOTE(review): this comment used to say that observer plugins are added
  to the thd->lex list and automatically unlocked after each statement.
  The code below passes 0 instead of a THD and unlocks the plugins
  explicitly, so that description looks stale - confirm.
 */
#define FOREACH_OBSERVER(r, f, args)                                   \
  Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED);       \
  read_lock();                                                         \
  Observer_info_iterator iter = observer_info_iter();                  \
  Observer_info *info = iter++;                                        \
  for (; info; info = iter++) {                                        \
    plugin_ref plugin = my_plugin_lock(0, &info->plugin);              \
    if (!plugin) {                                                     \
      /* plugin is not initialized or deleted, this is not an error */ \
      r = 0;                                                           \
      break;                                                           \
    }                                                                  \
    plugins.push_back(plugin);                                         \
    if (((Observer *)info->observer)->f &&                             \
        ((Observer *)info->observer)->f args) {                        \
      r = 1;                                                           \
      LogEvent()                                                       \
          .prio(ERROR_LEVEL)                                           \
          .errcode(ER_RPL_PLUGIN_FUNCTION_FAILED)                      \
          .subsys(LOG_SUBSYSTEM_TAG)                                   \
          .function(#f)                                                \
          .message("Run function '" #f "' in plugin '%s' failed",      \
                   info->plugin_int->name.str);                        \
      break;                                                           \
    }                                                                  \
  }                                                                    \
  unlock();                                                            \
  /*                                                                   \
     Unlock plugins should be done after we released the Delegate lock \
     to avoid possible deadlock when this is the last user of the      \
     plugin, and when we unlock the plugin, it will try to             \
     deinitialize the plugin, which will try to lock the Delegate in   \
     order to remove the observers.                                    \
  */                                                                   \
  if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
254 
/*
  Variant of FOREACH_OBSERVER for callbacks that take an extra integer
  output argument: f is called as f(args, error_out) and the value each
  observer leaves in error_out is added to 'out'.

  As in FOREACH_OBSERVER, r is assigned only on early exit (0 when a plugin
  cannot be locked, 1 when a callback reports failure), so the caller must
  initialize it. An observer that does not provide callback f is skipped
  instead of being called through a null pointer; nothing is added to 'out'
  for it.

  The expansion declares the locals 'plugins', 'iter', 'info' and
  'error_out' in the enclosing scope, so it can be used at most once per
  scope and the caller must not use these names.
 */
#define FOREACH_OBSERVER_ERROR_OUT(r, f, args, out)                    \
  Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED);       \
  read_lock();                                                         \
  Observer_info_iterator iter = observer_info_iter();                  \
  Observer_info *info = iter++;                                        \
                                                                       \
  int error_out = 0;                                                   \
  for (; info; info = iter++) {                                        \
    plugin_ref plugin = my_plugin_lock(0, &info->plugin);              \
    if (!plugin) {                                                     \
      /* plugin is not initialized or deleted, this is not an error */ \
      r = 0;                                                           \
      break;                                                           \
    }                                                                  \
    plugins.push_back(plugin);                                         \
                                                                       \
    /* A missing callback is skipped, as in FOREACH_OBSERVER. */       \
    if (!((Observer *)info->observer)->f) continue;                    \
                                                                       \
    bool hook_error = false;                                           \
    hook_error = ((Observer *)info->observer)->f(args, error_out);     \
                                                                       \
    out += error_out;                                                  \
    if (hook_error) {                                                  \
      r = 1;                                                           \
      LogEvent()                                                       \
          .prio(ERROR_LEVEL)                                           \
          .errcode(ER_RPL_PLUGIN_FUNCTION_FAILED)                      \
          .subsys(LOG_SUBSYSTEM_TAG)                                   \
          .function(#f)                                                \
          .message("Run function '" #f "' in plugin '%s' failed",      \
                   info->plugin_int->name.str);                        \
      break;                                                           \
    }                                                                  \
  }                                                                    \
  unlock();                                                            \
  /*                                                                   \
     Unlock plugins should be done after we released the Delegate lock \
     to avoid possible deadlock when this is the last user of the      \
     plugin, and when we unlock the plugin, it will try to             \
     deinitialize the plugin, which will try to lock the Delegate in   \
     order to remove the observers.                                    \
  */                                                                   \
  if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
296 
se_before_commit(THD *,plugin_ref plugin,void * arg)297 static bool se_before_commit(THD *, plugin_ref plugin, void *arg) {
298   handlerton *hton = plugin_data<handlerton *>(plugin);
299   if (hton->se_before_commit) hton->se_before_commit(arg);
300   return false;
301 }
302 
before_commit(THD * thd,bool all,Binlog_cache_storage * trx_cache_log,Binlog_cache_storage * stmt_cache_log,ulonglong cache_log_max_size,bool is_atomic_ddl_arg)303 int Trans_delegate::before_commit(THD *thd, bool all,
304                                   Binlog_cache_storage *trx_cache_log,
305                                   Binlog_cache_storage *stmt_cache_log,
306                                   ulonglong cache_log_max_size,
307                                   bool is_atomic_ddl_arg) {
308   DBUG_TRACE;
309   Trans_param param;
310   TRANS_PARAM_ZERO(param);
311   param.server_id = thd->server_id;
312   param.server_uuid = server_uuid;
313   param.thread_id = thd->thread_id();
314   param.gtid_info.type = thd->variables.gtid_next.type;
315   param.gtid_info.sidno = thd->variables.gtid_next.gtid.sidno;
316   param.gtid_info.gno = thd->variables.gtid_next.gtid.gno;
317   param.trx_cache_log = trx_cache_log;
318   param.stmt_cache_log = stmt_cache_log;
319   param.cache_log_max_size = cache_log_max_size;
320   param.original_commit_timestamp = &thd->variables.original_commit_timestamp;
321   param.is_atomic_ddl = is_atomic_ddl_arg;
322   param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
323   param.group_replication_consistency =
324       thd->variables.group_replication_consistency;
325   param.original_server_version = &(thd->variables.original_server_version);
326   param.immediate_server_version = &(thd->variables.immediate_server_version);
327   param.is_create_table_as_select =
328       (thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
329        thd->lex->select_lex->get_fields_list()->elements);
330 
331   bool is_real_trans =
332       (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
333   if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS;
334 
335   int ret = 0;
336   FOREACH_OBSERVER(ret, before_commit, (&param));
337   plugin_foreach(thd, se_before_commit, MYSQL_STORAGE_ENGINE_PLUGIN, &param);
338   return ret;
339 }
340 
341 /**
342  Helper method to check if the given table has 'CASCADE' foreign key or not.
343 
344  @param[in]   table     Table object that needs to be verified.
345 
346  @return bool true      If the table has 'CASCADE' foreign key.
347               false     If the table does not have 'CASCADE' foreign key.
348 */
has_cascade_foreign_key(TABLE * table)349 bool has_cascade_foreign_key(TABLE *table) {
350   DBUG_TRACE;
351 
352   TABLE_SHARE_FOREIGN_KEY_INFO *fk = table->s->foreign_key;
353 
354   for (uint i = 0; i < table->s->foreign_keys; i++) {
355     /*
356       The supported values of update/delete_rule are: CASCADE, SET NULL,
357       NO ACTION, RESTRICT and SET DEFAULT.
358     */
359     if (dd::Foreign_key::RULE_CASCADE == fk[i].update_rule ||
360         dd::Foreign_key::RULE_CASCADE == fk[i].delete_rule ||
361         dd::Foreign_key::RULE_SET_NULL == fk[i].update_rule ||
362         dd::Foreign_key::RULE_SET_NULL == fk[i].delete_rule ||
363         dd::Foreign_key::RULE_SET_DEFAULT == fk[i].update_rule ||
364         dd::Foreign_key::RULE_SET_DEFAULT == fk[i].delete_rule) {
365       return true;
366     }
367   }
368   return false;
369 }
370 
371 /**
372  Helper method to create table information for the hook call
373  */
prepare_table_info(THD * thd,Trans_table_info * & table_info_list,uint & number_of_tables)374 void prepare_table_info(THD *thd, Trans_table_info *&table_info_list,
375                         uint &number_of_tables) {
376   DBUG_TRACE;
377 
378   TABLE *open_tables = thd->open_tables;
379 
380   // Fail if tables are not open
381   if (open_tables == nullptr) {
382     return;
383   }
384 
385   // Gather table information
386   std::vector<Trans_table_info> table_info_holder;
387   for (; open_tables != nullptr; open_tables = open_tables->next) {
388     Trans_table_info table_info = {nullptr, 0, 0, false};
389 
390     if (open_tables->no_replicate) {
391       continue;
392     }
393 
394     table_info.table_name = open_tables->s->table_name.str;
395 
396     uint primary_keys = 0;
397     if (open_tables->key_info != nullptr &&
398         (open_tables->s->primary_key < MAX_KEY)) {
399       primary_keys = open_tables->s->primary_key;
400 
401       // if primary keys is still 0, lets double check on another var
402       if (primary_keys == 0) {
403         primary_keys = open_tables->key_info->user_defined_key_parts;
404       }
405     }
406 
407     table_info.number_of_primary_keys = primary_keys;
408 
409     table_info.db_type = open_tables->s->db_type()->db_type;
410 
411     /*
412       Find out if the table has foreign key with ON UPDATE/DELETE CASCADE
413       clause.
414     */
415     table_info.has_cascade_foreign_key = has_cascade_foreign_key(open_tables);
416 
417     table_info_holder.push_back(table_info);
418   }
419 
420   // Now that one has all the information, one should build the
421   // data that will be delivered to the plugin
422   if (table_info_holder.size() > 0) {
423     number_of_tables = table_info_holder.size();
424 
425     table_info_list = (Trans_table_info *)my_malloc(
426         PSI_NOT_INSTRUMENTED, number_of_tables * sizeof(Trans_table_info),
427         MYF(0));
428 
429     std::vector<Trans_table_info>::iterator table_info_holder_it =
430         table_info_holder.begin();
431     for (int table = 0; table_info_holder_it != table_info_holder.end();
432          table_info_holder_it++, table++) {
433       table_info_list[table].number_of_primary_keys =
434           (*table_info_holder_it).number_of_primary_keys;
435       table_info_list[table].table_name = (*table_info_holder_it).table_name;
436       table_info_list[table].db_type = (*table_info_holder_it).db_type;
437       table_info_list[table].has_cascade_foreign_key =
438           (*table_info_holder_it).has_cascade_foreign_key;
439     }
440   }
441 }
442 
443 /**
444   Helper that gathers all table runtime information
445 
446   @param[in]   thd       the current execution thread
447   @param[out]  ctx_info  Trans_context_info in which the result is stored.
448  */
prepare_transaction_context(THD * thd,Trans_context_info & ctx_info)449 static void prepare_transaction_context(THD *thd,
450                                         Trans_context_info &ctx_info) {
451   // Extracting the session value of SQL binlogging
452   ctx_info.binlog_enabled = thd->variables.sql_log_bin;
453 
454   // Extracting the session value of binlog format
455   ctx_info.binlog_format = thd->variables.binlog_format;
456 
457   // Extracting the global mutable value of binlog checksum
458   ctx_info.binlog_checksum_options = binlog_checksum_options;
459 
460   // Extracting the session value of transaction_write_set_extraction
461   ctx_info.transaction_write_set_extraction =
462       thd->variables.transaction_write_set_extraction;
463 
464   // Extracting transaction isolation level
465   ctx_info.tx_isolation = thd->tx_isolation;
466 }
467 
before_dml(THD * thd,int & result)468 int Trans_delegate::before_dml(THD *thd, int &result) {
469   DBUG_TRACE;
470   Trans_param param;
471   TRANS_PARAM_ZERO(param);
472 
473   param.server_id = thd->server_id;
474   param.server_uuid = server_uuid;
475   param.thread_id = thd->thread_id();
476 
477   prepare_table_info(thd, param.tables_info, param.number_of_tables);
478   prepare_transaction_context(thd, param.trans_ctx_info);
479 
480   int ret = 0;
481   FOREACH_OBSERVER_ERROR_OUT(ret, before_dml, &param, result);
482 
483   my_free(param.tables_info);
484 
485   return ret;
486 }
487 
se_before_rollback(THD *,plugin_ref plugin,void * arg)488 static bool se_before_rollback(THD *, plugin_ref plugin, void *arg) {
489   handlerton *hton = plugin_data<handlerton *>(plugin);
490   if (hton->se_before_rollback) hton->se_before_rollback(arg);
491   return false;
492 }
493 
before_rollback(THD * thd,bool all)494 int Trans_delegate::before_rollback(THD *thd, bool all) {
495   DBUG_TRACE;
496   Trans_param param;
497   TRANS_PARAM_ZERO(param);
498   param.server_id = thd->server_id;
499   param.server_uuid = server_uuid;
500   param.thread_id = thd->thread_id();
501   param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
502 
503   bool is_real_trans =
504       (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
505   if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS;
506 
507   int ret = 0;
508   FOREACH_OBSERVER(ret, before_rollback, (&param));
509   plugin_foreach(thd, se_before_rollback, MYSQL_STORAGE_ENGINE_PLUGIN, &param);
510   return ret;
511 }
512 
se_after_commit(THD *,plugin_ref plugin,void * arg)513 static bool se_after_commit(THD *, plugin_ref plugin, void *arg) {
514   handlerton *hton = plugin_data<handlerton *>(plugin);
515   if (hton->se_after_commit) hton->se_after_commit(arg);
516   return false;
517 }
518 
after_commit(THD * thd,bool all)519 int Trans_delegate::after_commit(THD *thd, bool all) {
520   DBUG_TRACE;
521   Trans_param param;
522   TRANS_PARAM_ZERO(param);
523   param.server_uuid = server_uuid;
524   param.thread_id = thd->thread_id();
525 
526   Gtid gtid;
527   thd->rpl_thd_ctx.last_used_gtid_tracker_ctx().get_last_used_gtid(gtid);
528   param.gtid_info.sidno = gtid.sidno;
529   param.gtid_info.gno = gtid.gno;
530 
531   bool is_real_trans =
532       (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
533   if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS;
534 
535   thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
536   param.server_id = thd->server_id;
537   param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
538 
539   DBUG_PRINT("enter",
540              ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos));
541   DEBUG_SYNC(thd, "before_call_after_commit_observer");
542 
543   int ret = 0;
544   FOREACH_OBSERVER(ret, after_commit, (&param));
545   plugin_foreach(thd, se_after_commit, MYSQL_STORAGE_ENGINE_PLUGIN, &param);
546   return ret;
547 }
548 
after_rollback(THD * thd,bool all)549 int Trans_delegate::after_rollback(THD *thd, bool all) {
550   DBUG_TRACE;
551   Trans_param param;
552   TRANS_PARAM_ZERO(param);
553   param.server_uuid = server_uuid;
554   param.thread_id = thd->thread_id();
555 
556   bool is_real_trans =
557       (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
558   if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS;
559   thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
560   param.server_id = thd->server_id;
561   param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
562 
563   int ret = 0;
564   FOREACH_OBSERVER(ret, after_rollback, (&param));
565   return ret;
566 }
567 
trans_begin(THD * thd,int & out)568 int Trans_delegate::trans_begin(THD *thd, int &out) {
569   DBUG_TRACE;
570   Trans_param param;
571   TRANS_PARAM_ZERO(param);
572   param.server_uuid = server_uuid;
573   param.thread_id = thd->thread_id();
574   param.group_replication_consistency =
575       thd->variables.group_replication_consistency;
576   param.hold_timeout = thd->variables.net_wait_timeout;
577   param.server_id = thd->server_id;
578   param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
579 
580   int ret = 0;
581   FOREACH_OBSERVER_ERROR_OUT(ret, begin, &param, out);
582   return ret;
583 }
584 
after_flush(THD * thd,const char * log_file,my_off_t log_pos)585 int Binlog_storage_delegate::after_flush(THD *thd, const char *log_file,
586                                          my_off_t log_pos) {
587   DBUG_TRACE;
588   DBUG_PRINT("enter",
589              ("log_file: %s, log_pos: %llu", log_file, (ulonglong)log_pos));
590   Binlog_storage_param param;
591   param.server_id = thd->server_id;
592 
593   int ret = 0;
594   FOREACH_OBSERVER(ret, after_flush, (&param, log_file, log_pos));
595   return ret;
596 }
597 
598 /**
599  * This hook MUST be invoked after ALL recovery operations are performed
600  * and the server is ready to serve clients.
601  *
602  * @return 0 on success, >0 otherwise.
603  */
before_handle_connection(THD *)604 int Server_state_delegate::before_handle_connection(THD *) {
605   DBUG_TRACE;
606   Server_state_param param;
607 
608   int ret = 0;
609   FOREACH_OBSERVER(ret, before_handle_connection, (&param));
610   return ret;
611 }
612 
613 /**
614  * This hook MUST be invoked before ANY recovery action is started.
615  *
616  * @return 0 on success, >0 otherwise.
617  */
before_recovery(THD *)618 int Server_state_delegate::before_recovery(THD *) {
619   DBUG_TRACE;
620   Server_state_param param;
621 
622   int ret = 0;
623   FOREACH_OBSERVER(ret, before_recovery, (&param));
624   return ret;
625 }
626 
627 /**
628  * This hook MUST be invoked after the recovery from the engine
629  * is complete.
630  *
631  * @return 0 on success, >0 otherwise.
632  */
after_engine_recovery(THD *)633 int Server_state_delegate::after_engine_recovery(THD *) {
634   DBUG_TRACE;
635   Server_state_param param;
636 
637   int ret = 0;
638   FOREACH_OBSERVER(ret, after_engine_recovery, (&param));
639   return ret;
640 }
641 
642 /**
643  * This hook MUST be invoked after the server has completed the
644  * local recovery. The server can proceed with the further operations
645  * like engaging in distributed recovery etc.
646  *
647  * @return 0 on success, >0 otherwise.
648  */
after_recovery(THD *)649 int Server_state_delegate::after_recovery(THD *) {
650   DBUG_TRACE;
651   Server_state_param param;
652 
653   int ret = 0;
654   FOREACH_OBSERVER(ret, after_recovery, (&param));
655   return ret;
656 }
657 
658 /**
659  * This hook MUST be invoked before server shutdown action is
660  * initiated.
661  *
662  * @return 0 on success, >0 otherwise.
663  */
before_server_shutdown(THD *)664 int Server_state_delegate::before_server_shutdown(THD *) {
665   DBUG_TRACE;
666   Server_state_param param;
667 
668   int ret = 0;
669   FOREACH_OBSERVER(ret, before_server_shutdown, (&param));
670   return ret;
671 }
672 
673 /**
674  * This hook MUST be invoked after server shutdown operation is
675  * complete.
676  *
677  * @return 0 on success, >0 otherwise.
678  */
after_server_shutdown(THD *)679 int Server_state_delegate::after_server_shutdown(THD *) {
680   DBUG_TRACE;
681   Server_state_param param;
682 
683   int ret = 0;
684   FOREACH_OBSERVER(ret, after_server_shutdown, (&param));
685   return ret;
686 }
687 
688 /**
689  * This hook MUST be invoked after upgrade from .frm to data dictionary
690  *
691  * @return 0 on success, >0 otherwise.
692  */
after_dd_upgrade_from_57(THD *)693 int Server_state_delegate::after_dd_upgrade_from_57(THD *) {
694   DBUG_TRACE;
695   Server_state_param param;
696 
697   int ret = 0;
698   FOREACH_OBSERVER(ret, after_dd_upgrade_from_57, (&param));
699   return ret;
700 }
701 
after_sync(THD * thd,const char * log_file,my_off_t log_pos)702 int Binlog_storage_delegate::after_sync(THD *thd, const char *log_file,
703                                         my_off_t log_pos) {
704   DBUG_TRACE;
705   DBUG_PRINT("enter",
706              ("log_file: %s, log_pos: %llu", log_file, (ulonglong)log_pos));
707   Binlog_storage_param param;
708   param.server_id = thd->server_id;
709 
710   DBUG_ASSERT(log_pos != 0);
711   int ret = 0;
712   FOREACH_OBSERVER(ret, after_sync, (&param, log_file, log_pos));
713 
714   DEBUG_SYNC(thd, "after_call_after_sync_observer");
715   return ret;
716 }
717 
transmit_start(THD * thd,ushort flags,const char * log_file,my_off_t log_pos,bool * observe_transmission)718 int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
719                                              const char *log_file,
720                                              my_off_t log_pos,
721                                              bool *observe_transmission) {
722   Binlog_transmit_param param;
723   param.flags = flags;
724   param.server_id = thd->server_id;
725 
726   int ret = 0;
727   FOREACH_OBSERVER(ret, transmit_start, (&param, log_file, log_pos));
728   *observe_transmission = param.should_observe();
729   return ret;
730 }
731 
transmit_stop(THD * thd,ushort flags)732 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) {
733   Binlog_transmit_param param;
734   param.flags = flags;
735   param.server_id = thd->server_id;
736 
737   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
738 
739   int ret = 0;
740   FOREACH_OBSERVER(ret, transmit_stop, (&param));
741   return ret;
742 }
743 
/**
  Lets every binlog transmit observer contribute extra header bytes and
  appends them to the packet.

  Each observer's reserve_header callback is handed the same scratch buffer
  of RESERVE_HEADER_SIZE bytes and reports through hlen how many bytes it
  produced. Observers that report 0 bytes, or provide no callback, add
  nothing to the packet.

  Unlike FOREACH_OBSERVER, this loop locks the plugin with the THD, treats a
  plugin that cannot be locked as an error, and unlocks each plugin right
  after its callback while the delegate read lock is still held.

  @param thd     session doing the transmission; supplies server_id
  @param flags   flags copied into the hook parameter
  @param packet  packet to which the reserved header bytes are appended

  @retval 0  all observers were processed
  @retval 1  a plugin could not be locked, a callback returned non-zero, an
             observer reported more than RESERVE_HEADER_SIZE bytes, or
             appending to the packet failed
*/
int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
                                             String *packet) {
/* NOTE2ME: Maximum extra header size for each observer, I hope 32
   bytes should be enough for each Observer to reserve their extra
   header. If later found this is not enough, we can increase this
   header size.
*/
#define RESERVE_HEADER_SIZE 32
  unsigned char header[RESERVE_HEADER_SIZE];
  ulong hlen;
  Binlog_transmit_param param;
  param.flags = flags;
  param.server_id = thd->server_id;

  DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););

  int ret = 0;
  read_lock();
  Observer_info_iterator iter = observer_info_iter();
  Observer_info *info = iter++;
  for (; info; info = iter++) {
    plugin_ref plugin = my_plugin_lock(thd, &info->plugin);
    if (!plugin) {
      ret = 1;
      break;
    }
    // Reset for every observer so that one without a callback contributes
    // zero bytes.
    hlen = 0;
    if (((Observer *)info->observer)->reserve_header &&
        ((Observer *)info->observer)
            ->reserve_header(&param, header, RESERVE_HEADER_SIZE, &hlen)) {
      ret = 1;
      plugin_unlock(thd, plugin);
      break;
    }
    plugin_unlock(thd, plugin);
    if (hlen == 0) continue;
    // NOTE(review): the size check runs after the callback has already
    // written into header, so it can reject an oversized report but cannot
    // prevent the write itself - confirm callbacks honour the size they are
    // given.
    if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) {
      ret = 1;
      break;
    }
  }
  unlock();
  return ret;
}
788 
before_send_event(THD * thd,ushort flags,String * packet,const char * log_file,my_off_t log_pos)789 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
790                                                 String *packet,
791                                                 const char *log_file,
792                                                 my_off_t log_pos) {
793   Binlog_transmit_param param;
794   param.flags = flags;
795   param.server_id = thd->server_id;
796 
797   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
798 
799   int ret = 0;
800   FOREACH_OBSERVER(
801       ret, before_send_event,
802       (&param, pointer_cast<uchar *>(packet->ptr()), packet->length(),
803        log_file + dirname_length(log_file), log_pos));
804   return ret;
805 }
806 
after_send_event(THD * thd,ushort flags,String * packet,const char * skipped_log_file,my_off_t skipped_log_pos)807 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
808                                                String *packet,
809                                                const char *skipped_log_file,
810                                                my_off_t skipped_log_pos) {
811   Binlog_transmit_param param;
812   param.flags = flags;
813   param.server_id = thd->server_id;
814 
815   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
816 
817   int ret = 0;
818   FOREACH_OBSERVER(
819       ret, after_send_event,
820       (&param, packet->ptr(), packet->length(),
821        skipped_log_file + dirname_length(skipped_log_file), skipped_log_pos));
822   return ret;
823 }
824 
after_reset_master(THD * thd,ushort flags)825 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
826 
827 {
828   Binlog_transmit_param param;
829   param.flags = flags;
830   param.server_id = thd->server_id;
831 
832   int ret = 0;
833   FOREACH_OBSERVER(ret, after_reset_master, (&param));
834   return ret;
835 }
836 
init_param(Binlog_relay_IO_param * param,Master_info * mi)837 void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
838                                           Master_info *mi) {
839   param->mysql = mi->mysql;
840   param->channel_name = mi->get_channel();
841   param->user = const_cast<char *>(mi->get_user());
842   param->host = mi->host;
843   param->port = mi->port;
844   param->master_log_name = const_cast<char *>(mi->get_master_log_name());
845   param->master_log_pos = mi->get_master_log_pos();
846 }
847 
thread_start(THD * thd,Master_info * mi)848 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) {
849   Binlog_relay_IO_param param;
850   init_param(&param, mi);
851   param.server_id = thd->server_id;
852   param.thread_id = thd->thread_id();
853 
854   int ret = 0;
855   FOREACH_OBSERVER(ret, thread_start, (&param));
856   return ret;
857 }
858 
thread_stop(THD * thd,Master_info * mi)859 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) {
860   Binlog_relay_IO_param param;
861   init_param(&param, mi);
862   param.server_id = thd->server_id;
863   param.thread_id = thd->thread_id();
864 
865   int ret = 0;
866   FOREACH_OBSERVER(ret, thread_stop, (&param));
867   return ret;
868 }
869 
applier_start(THD * thd,Master_info * mi)870 int Binlog_relay_IO_delegate::applier_start(THD *thd, Master_info *mi) {
871   Binlog_relay_IO_param param;
872   init_param(&param, mi);
873   param.server_id = thd->server_id;
874   param.thread_id = thd->thread_id();
875 
876   int ret = 0;
877   FOREACH_OBSERVER(ret, applier_start, (&param));
878   return ret;
879 }
880 
applier_stop(THD * thd,Master_info * mi,bool aborted)881 int Binlog_relay_IO_delegate::applier_stop(THD *thd, Master_info *mi,
882                                            bool aborted) {
883   Binlog_relay_IO_param param;
884   init_param(&param, mi);
885   param.server_id = thd->server_id;
886   param.thread_id = thd->thread_id();
887 
888   int ret = 0;
889   FOREACH_OBSERVER(ret, applier_stop, (&param, aborted));
890   return ret;
891 }
892 
before_request_transmit(THD * thd,Master_info * mi,ushort flags)893 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, Master_info *mi,
894                                                       ushort flags) {
895   Binlog_relay_IO_param param;
896   init_param(&param, mi);
897   param.server_id = thd->server_id;
898   param.thread_id = thd->thread_id();
899 
900   int ret = 0;
901   FOREACH_OBSERVER(ret, before_request_transmit, (&param, (uint32)flags));
902   return ret;
903 }
904 
after_read_event(THD * thd,Master_info * mi,const char * packet,ulong len,const char ** event_buf,ulong * event_len)905 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
906                                                const char *packet, ulong len,
907                                                const char **event_buf,
908                                                ulong *event_len) {
909   Binlog_relay_IO_param param;
910   init_param(&param, mi);
911   param.server_id = thd->server_id;
912   param.thread_id = thd->thread_id();
913 
914   int ret = 0;
915   FOREACH_OBSERVER(ret, after_read_event,
916                    (&param, packet, len, event_buf, event_len));
917   return ret;
918 }
919 
after_queue_event(THD * thd,Master_info * mi,const char * event_buf,ulong event_len,bool synced)920 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
921                                                 const char *event_buf,
922                                                 ulong event_len, bool synced) {
923   Binlog_relay_IO_param param;
924   init_param(&param, mi);
925   param.server_id = thd->server_id;
926   param.thread_id = thd->thread_id();
927 
928   uint32 flags = 0;
929   if (synced) flags |= BINLOG_STORAGE_IS_SYNCED;
930 
931   int ret = 0;
932   FOREACH_OBSERVER(ret, after_queue_event,
933                    (&param, event_buf, event_len, flags));
934   return ret;
935 }
936 
after_reset_slave(THD * thd,Master_info * mi)937 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
938 
939 {
940   Binlog_relay_IO_param param;
941   init_param(&param, mi);
942   param.server_id = thd->server_id;
943   param.thread_id = thd->thread_id();
944 
945   int ret = 0;
946   FOREACH_OBSERVER(ret, after_reset_slave, (&param));
947   return ret;
948 }
949 
applier_log_event(THD * thd,int & out)950 int Binlog_relay_IO_delegate::applier_log_event(THD *thd, int &out) {
951   DBUG_TRACE;
952   Trans_param trans_param;
953   TRANS_PARAM_ZERO(trans_param);
954   Binlog_relay_IO_param param;
955 
956   param.server_id = thd->server_id;
957   param.thread_id = thd->thread_id();
958 
959   prepare_table_info(thd, trans_param.tables_info,
960                      trans_param.number_of_tables);
961 
962   int ret = 0;
963   FOREACH_OBSERVER(ret, applier_log_event, (&param, &trans_param, out));
964 
965   my_free(trans_param.tables_info);
966 
967   return ret;
968 }
969 
register_trans_observer(Trans_observer * observer,void * p)970 int register_trans_observer(Trans_observer *observer, void *p) {
971   return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
972 }
973 
unregister_trans_observer(Trans_observer * observer,void *)974 int unregister_trans_observer(Trans_observer *observer, void *) {
975   return transaction_delegate->remove_observer(observer);
976 }
977 
register_binlog_storage_observer(Binlog_storage_observer * observer,void * p)978 int register_binlog_storage_observer(Binlog_storage_observer *observer,
979                                      void *p) {
980   DBUG_TRACE;
981   int result =
982       binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
983   return result;
984 }
985 
unregister_binlog_storage_observer(Binlog_storage_observer * observer,void *)986 int unregister_binlog_storage_observer(Binlog_storage_observer *observer,
987                                        void *) {
988   return binlog_storage_delegate->remove_observer(observer);
989 }
990 
register_server_state_observer(Server_state_observer * observer,void * plugin_var)991 int register_server_state_observer(Server_state_observer *observer,
992                                    void *plugin_var) {
993   DBUG_TRACE;
994   int result = server_state_delegate->add_observer(observer,
995                                                    (st_plugin_int *)plugin_var);
996   return result;
997 }
998 
unregister_server_state_observer(Server_state_observer * observer,void *)999 int unregister_server_state_observer(Server_state_observer *observer, void *) {
1000   DBUG_TRACE;
1001   int result = server_state_delegate->remove_observer(observer);
1002   return result;
1003 }
1004 
register_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)1005 int register_binlog_transmit_observer(Binlog_transmit_observer *observer,
1006                                       void *p) {
1007   return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
1008 }
1009 
unregister_binlog_transmit_observer(Binlog_transmit_observer * observer,void *)1010 int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer,
1011                                         void *) {
1012   return binlog_transmit_delegate->remove_observer(observer);
1013 }
1014 
register_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)1015 int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
1016                                       void *p) {
1017   return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
1018 }
1019 
unregister_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void *)1020 int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
1021                                         void *) {
1022   return binlog_relay_io_delegate->remove_observer(observer);
1023 }
1024 
launch_hook_trans_begin(THD * thd,TABLE_LIST * all_tables)1025 int launch_hook_trans_begin(THD *thd, TABLE_LIST *all_tables) {
1026   DBUG_TRACE;
1027   LEX *lex = thd->lex;
1028   enum_sql_command sql_command = lex->sql_command;
1029   // by default commands are put on hold
1030   bool hold_command = true;
1031   int ret = 0;
1032 
1033   // if command belong to a transaction that already pass by hook, it can
1034   // continue
1035   if (thd->get_transaction()->was_trans_begin_hook_invoked()) {
1036     return 0;
1037   }
1038 
1039   bool is_show = ((sql_command_flags[sql_command] & CF_STATUS_COMMAND) &&
1040                   (sql_command != SQLCOM_BINLOG_BASE64_EVENT)) ||
1041                  (sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1042   bool is_set = (sql_command == SQLCOM_SET_OPTION);
1043   bool is_select = (sql_command == SQLCOM_SELECT);
1044   bool is_do = (sql_command == SQLCOM_DO);
1045   bool is_empty = (sql_command == SQLCOM_EMPTY_QUERY);
1046   bool is_use = (sql_command == SQLCOM_CHANGE_DB);
1047   bool is_stop_gr = (sql_command == SQLCOM_STOP_GROUP_REPLICATION);
1048   bool is_shutdown = (sql_command == SQLCOM_SHUTDOWN);
1049   bool is_reset_persist =
1050       (sql_command == SQLCOM_RESET && lex->option_type == OPT_PERSIST);
1051 
1052   if ((is_set || is_do || is_show || is_empty || is_use || is_stop_gr ||
1053        is_shutdown || is_reset_persist) &&
1054       !lex->uses_stored_routines()) {
1055     return 0;
1056   }
1057 
1058   if (is_select) {
1059     bool is_udf = false;
1060 
1061     // if select is an udf function
1062     SELECT_LEX *select_lex_elem = lex->unit->first_select();
1063     while (select_lex_elem != nullptr) {
1064       Item *item;
1065       List_iterator_fast<Item> it(select_lex_elem->fields_list);
1066       while ((item = it++)) {
1067         if (item->type() == Item::FUNC_ITEM) {
1068           Item_func *func_item = down_cast<Item_func *>(item);
1069           Item_func::Functype functype = func_item->functype();
1070           if (functype == Item_func::FUNC_SP || functype == Item_func::UDF_FUNC)
1071             is_udf = true;
1072         }
1073       }
1074       select_lex_elem = select_lex_elem->next_select();
1075     }
1076 
1077     if (!is_udf && all_tables == nullptr) {
1078       // SELECT that don't use tables and isn't a UDF
1079       hold_command = false;
1080     }
1081 
1082     if (hold_command && all_tables != nullptr) {
1083       // SELECT that use tables
1084       bool is_perf_schema_table = false;
1085       bool is_process_list = false;
1086       bool is_sys_db = false;
1087       bool stop_db_check = false;
1088 
1089       for (TABLE_LIST *table = all_tables; table && !stop_db_check;
1090            table = table->next_global) {
1091         DBUG_ASSERT(table->db && table->table_name);
1092 
1093         if (is_perfschema_db(table->db, table->db_length))
1094           is_perf_schema_table = true;
1095         else if (is_infoschema_db(table->db, table->db_length) &&
1096                  !my_strcasecmp(system_charset_info, "PROCESSLIST",
1097                                 table->table_name)) {
1098           is_process_list = true;
1099         } else if (table->db_length == 3 &&
1100                    !my_strcasecmp(system_charset_info, "sys", table->db)) {
1101           is_sys_db = true;
1102         } else {
1103           is_perf_schema_table = false;
1104           is_process_list = false;
1105           is_sys_db = false;
1106           stop_db_check = true;
1107         }
1108       }
1109 
1110       if (is_process_list || is_perf_schema_table || is_sys_db) {
1111         hold_command = false;
1112       }
1113     }
1114   }
1115 
1116   if (hold_command) {
1117     DBUG_EXECUTE_IF("launch_hook_trans_begin_assert_if_hold",
1118                     { DBUG_ASSERT(0); };);
1119 
1120     PSI_stage_info old_stage;
1121     thd->enter_stage(&stage_hook_begin_trans, &old_stage, __func__, __FILE__,
1122                      __LINE__);
1123     RUN_HOOK(transaction, trans_begin, (thd, ret));
1124     THD_STAGE_INFO(thd, old_stage);
1125     if (!ret && (sql_command == SQLCOM_BEGIN ||
1126                  thd->in_active_multi_stmt_transaction())) {
1127       thd->get_transaction()->set_trans_begin_hook_invoked();
1128     }
1129   }
1130 
1131   return ret;
1132 }
1133