1 /* Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql/rpl_handler.h"
24
25 #include <string.h>
26 #include <memory>
27 #include <new>
28 #include <unordered_map>
29 #include <utility>
30 #include <vector>
31
32 #include "lex_string.h"
33 #include "map_helpers.h"
34 #include "my_compiler.h"
35 #include "my_dbug.h"
36 #include "my_io.h"
37 #include "my_loglevel.h"
38 #include "mysql/components/services/log_builtins.h"
39 #include "mysql/components/services/log_shared.h"
40 #include "mysql/plugin.h"
41 #include "mysql/psi/mysql_mutex.h"
42 #include "mysql/psi/psi_base.h"
43 #include "mysql/service_mysql_alloc.h"
44 #include "mysqld_error.h"
45 #include "prealloced_array.h"
46 #include "sql/current_thd.h"
47 #include "sql/debug_sync.h" // DEBUG_SYNC
48 #include "sql/handler.h"
49 #include "sql/item_func.h" // user_var_entry
50 #include "sql/key.h"
51 #include "sql/log.h"
52 #include "sql/mysqld.h" // server_uuid
53 #include "sql/psi_memory_key.h"
54 #include "sql/replication.h" // Trans_param
55 #include "sql/rpl_gtid.h"
56 #include "sql/rpl_mi.h" // Master_info
57 #include "sql/set_var.h" // OPT_PERSIST
58 #include "sql/sql_class.h" // THD
59 #include "sql/sql_const.h"
60 #include "sql/sql_lex.h"
61 #include "sql/sql_parse.h" // sql_command_flags
62 #include "sql/sql_plugin.h" // plugin_int_to_ref
63 #include "sql/system_variables.h"
64 #include "sql/table.h"
65 #include "sql/thr_malloc.h"
66 #include "sql/transaction_info.h"
67 #include "sql_string.h"
68
69 Trans_delegate *transaction_delegate;
70 Binlog_storage_delegate *binlog_storage_delegate;
71 Server_state_delegate *server_state_delegate;
72
73 Binlog_transmit_delegate *binlog_transmit_delegate;
74 Binlog_relay_IO_delegate *binlog_relay_io_delegate;
75
Observer_info(void * ob,st_plugin_int * p)76 Observer_info::Observer_info(void *ob, st_plugin_int *p)
77 : observer(ob), plugin_int(p) {
78 plugin = plugin_int_to_ref(plugin_int);
79 }
80
Delegate(PSI_rwlock_key key)81 Delegate::Delegate(
82 #ifdef HAVE_PSI_RWLOCK_INTERFACE
83 PSI_rwlock_key key
84 #endif
85 ) {
86 inited = false;
87 #ifdef HAVE_PSI_RWLOCK_INTERFACE
88 if (mysql_rwlock_init(key, &lock)) return;
89 #else
90 if (mysql_rwlock_init(0, &lock)) return;
91 #endif
92 init_sql_alloc(key_memory_delegate, &memroot, 1024, 0);
93 inited = true;
94 }
95
96 /*
97 structure to save transaction log filename and position
98 */
99 typedef struct Trans_binlog_info {
100 my_off_t log_pos;
101 char log_file[FN_REFLEN];
102 } Trans_binlog_info;
103
get_user_var_int(const char * name,long long int * value,int * null_value)104 int get_user_var_int(const char *name, long long int *value, int *null_value) {
105 bool null_val;
106 THD *thd = current_thd;
107
108 /* Protects thd->user_vars. */
109 mysql_mutex_lock(&thd->LOCK_thd_data);
110
111 const auto it = thd->user_vars.find(name);
112 if (it == thd->user_vars.end()) {
113 mysql_mutex_unlock(&thd->LOCK_thd_data);
114 return 1;
115 }
116 *value = it->second->val_int(&null_val);
117 if (null_value) *null_value = null_val;
118 mysql_mutex_unlock(&thd->LOCK_thd_data);
119 return 0;
120 }
121
get_user_var_real(const char * name,double * value,int * null_value)122 int get_user_var_real(const char *name, double *value, int *null_value) {
123 bool null_val;
124 THD *thd = current_thd;
125
126 /* Protects thd->user_vars. */
127 mysql_mutex_lock(&thd->LOCK_thd_data);
128
129 const auto it = thd->user_vars.find(name);
130 if (it == thd->user_vars.end()) {
131 mysql_mutex_unlock(&thd->LOCK_thd_data);
132 return 1;
133 }
134 *value = it->second->val_real(&null_val);
135 if (null_value) *null_value = null_val;
136 mysql_mutex_unlock(&thd->LOCK_thd_data);
137 return 0;
138 }
139
get_user_var_str(const char * name,char * value,size_t len,unsigned int precision,int * null_value)140 int get_user_var_str(const char *name, char *value, size_t len,
141 unsigned int precision, int *null_value) {
142 String str;
143 bool null_val;
144 THD *thd = current_thd;
145
146 /* Protects thd->user_vars. */
147 mysql_mutex_lock(&thd->LOCK_thd_data);
148
149 const auto it = thd->user_vars.find(name);
150 if (it == thd->user_vars.end()) {
151 mysql_mutex_unlock(&thd->LOCK_thd_data);
152 return 1;
153 }
154 it->second->val_str(&null_val, &str, precision);
155 strncpy(value, str.c_ptr(), len);
156 if (null_value) *null_value = null_val;
157 mysql_mutex_unlock(&thd->LOCK_thd_data);
158 return 0;
159 }
160
delegates_init()161 int delegates_init() {
162 alignas(Trans_delegate) static char place_trans_mem[sizeof(Trans_delegate)];
163 alignas(Binlog_storage_delegate) static char
164 place_storage_mem[sizeof(Binlog_storage_delegate)];
165 alignas(Server_state_delegate) static char
166 place_state_mem[sizeof(Server_state_delegate)];
167 alignas(Binlog_transmit_delegate) static char
168 place_transmit_mem[sizeof(Binlog_transmit_delegate)];
169 alignas(Binlog_relay_IO_delegate) static char
170 place_relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
171
172 transaction_delegate = new (place_trans_mem) Trans_delegate;
173 if (!transaction_delegate->is_inited()) {
174 LogErr(ERROR_LEVEL, ER_RPL_TRX_DELEGATES_INIT_FAILED);
175 return 1;
176 }
177
178 binlog_storage_delegate = new (place_storage_mem) Binlog_storage_delegate;
179 if (!binlog_storage_delegate->is_inited()) {
180 LogErr(ERROR_LEVEL, ER_RPL_BINLOG_STORAGE_DELEGATES_INIT_FAILED);
181 return 1;
182 }
183
184 server_state_delegate = new (place_state_mem) Server_state_delegate;
185 binlog_transmit_delegate = new (place_transmit_mem) Binlog_transmit_delegate;
186 if (!binlog_transmit_delegate->is_inited()) {
187 LogErr(ERROR_LEVEL, ER_RPL_BINLOG_TRANSMIT_DELEGATES_INIT_FAILED);
188 return 1;
189 }
190
191 binlog_relay_io_delegate = new (place_relay_io_mem) Binlog_relay_IO_delegate;
192 if (!binlog_relay_io_delegate->is_inited()) {
193 LogErr(ERROR_LEVEL, ER_RPL_BINLOG_RELAY_DELEGATES_INIT_FAILED);
194 return 1;
195 }
196
197 return 0;
198 }
199
delegates_destroy()200 void delegates_destroy() {
201 if (transaction_delegate) transaction_delegate->~Trans_delegate();
202 if (binlog_storage_delegate)
203 binlog_storage_delegate->~Binlog_storage_delegate();
204 if (server_state_delegate) server_state_delegate->~Server_state_delegate();
205 if (binlog_transmit_delegate)
206 binlog_transmit_delegate->~Binlog_transmit_delegate();
207 if (binlog_relay_io_delegate)
208 binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
209 }
210
211 /*
212 This macro is used by almost all the Delegate methods to iterate
213 over all the observers running given callback function of the
214 delegate .
215
216 Add observer plugins to the thd->lex list, after each statement, all
217 plugins add to thd->lex will be automatically unlocked.
218 */
219 #define FOREACH_OBSERVER(r, f, args) \
220 Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED); \
221 read_lock(); \
222 Observer_info_iterator iter = observer_info_iter(); \
223 Observer_info *info = iter++; \
224 for (; info; info = iter++) { \
225 plugin_ref plugin = my_plugin_lock(0, &info->plugin); \
226 if (!plugin) { \
227 /* plugin is not intialized or deleted, this is not an error */ \
228 r = 0; \
229 break; \
230 } \
231 plugins.push_back(plugin); \
232 if (((Observer *)info->observer)->f && \
233 ((Observer *)info->observer)->f args) { \
234 r = 1; \
235 LogEvent() \
236 .prio(ERROR_LEVEL) \
237 .errcode(ER_RPL_PLUGIN_FUNCTION_FAILED) \
238 .subsys(LOG_SUBSYSTEM_TAG) \
239 .function(#f) \
240 .message("Run function '" #f "' in plugin '%s' failed", \
241 info->plugin_int->name.str); \
242 break; \
243 } \
244 } \
245 unlock(); \
246 /* \
247 Unlock plugins should be done after we released the Delegate lock \
248 to avoid possible deadlock when this is the last user of the \
249 plugin, and when we unlock the plugin, it will try to \
250 deinitialize the plugin, which will try to lock the Delegate in \
251 order to remove the observers. \
252 */ \
253 if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
254
255 #define FOREACH_OBSERVER_ERROR_OUT(r, f, args, out) \
256 Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED); \
257 read_lock(); \
258 Observer_info_iterator iter = observer_info_iter(); \
259 Observer_info *info = iter++; \
260 \
261 int error_out = 0; \
262 for (; info; info = iter++) { \
263 plugin_ref plugin = my_plugin_lock(0, &info->plugin); \
264 if (!plugin) { \
265 /* plugin is not intialized or deleted, this is not an error */ \
266 r = 0; \
267 break; \
268 } \
269 plugins.push_back(plugin); \
270 \
271 bool hook_error = false; \
272 hook_error = ((Observer *)info->observer)->f(args, error_out); \
273 \
274 out += error_out; \
275 if (hook_error) { \
276 r = 1; \
277 LogEvent() \
278 .prio(ERROR_LEVEL) \
279 .errcode(ER_RPL_PLUGIN_FUNCTION_FAILED) \
280 .subsys(LOG_SUBSYSTEM_TAG) \
281 .function(#f) \
282 .message("Run function '" #f "' in plugin '%s' failed", \
283 info->plugin_int->name.str); \
284 break; \
285 } \
286 } \
287 unlock(); \
288 /* \
289 Unlock plugins should be done after we released the Delegate lock \
290 to avoid possible deadlock when this is the last user of the \
291 plugin, and when we unlock the plugin, it will try to \
292 deinitialize the plugin, which will try to lock the Delegate in \
293 order to remove the observers. \
294 */ \
295 if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
296
se_before_commit(THD *,plugin_ref plugin,void * arg)297 static bool se_before_commit(THD *, plugin_ref plugin, void *arg) {
298 handlerton *hton = plugin_data<handlerton *>(plugin);
299 if (hton->se_before_commit) hton->se_before_commit(arg);
300 return false;
301 }
302
before_commit(THD * thd,bool all,Binlog_cache_storage * trx_cache_log,Binlog_cache_storage * stmt_cache_log,ulonglong cache_log_max_size,bool is_atomic_ddl_arg)303 int Trans_delegate::before_commit(THD *thd, bool all,
304 Binlog_cache_storage *trx_cache_log,
305 Binlog_cache_storage *stmt_cache_log,
306 ulonglong cache_log_max_size,
307 bool is_atomic_ddl_arg) {
308 DBUG_TRACE;
309 Trans_param param;
310 TRANS_PARAM_ZERO(param);
311 param.server_id = thd->server_id;
312 param.server_uuid = server_uuid;
313 param.thread_id = thd->thread_id();
314 param.gtid_info.type = thd->variables.gtid_next.type;
315 param.gtid_info.sidno = thd->variables.gtid_next.gtid.sidno;
316 param.gtid_info.gno = thd->variables.gtid_next.gtid.gno;
317 param.trx_cache_log = trx_cache_log;
318 param.stmt_cache_log = stmt_cache_log;
319 param.cache_log_max_size = cache_log_max_size;
320 param.original_commit_timestamp = &thd->variables.original_commit_timestamp;
321 param.is_atomic_ddl = is_atomic_ddl_arg;
322 param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
323 param.group_replication_consistency =
324 thd->variables.group_replication_consistency;
325 param.original_server_version = &(thd->variables.original_server_version);
326 param.immediate_server_version = &(thd->variables.immediate_server_version);
327 param.is_create_table_as_select =
328 (thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
329 thd->lex->select_lex->get_fields_list()->elements);
330
331 bool is_real_trans =
332 (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
333 if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS;
334
335 int ret = 0;
336 FOREACH_OBSERVER(ret, before_commit, (¶m));
337 plugin_foreach(thd, se_before_commit, MYSQL_STORAGE_ENGINE_PLUGIN, ¶m);
338 return ret;
339 }
340
341 /**
342 Helper method to check if the given table has 'CASCADE' foreign key or not.
343
344 @param[in] table Table object that needs to be verified.
345
346 @return bool true If the table has 'CASCADE' foreign key.
347 false If the table does not have 'CASCADE' foreign key.
348 */
has_cascade_foreign_key(TABLE * table)349 bool has_cascade_foreign_key(TABLE *table) {
350 DBUG_TRACE;
351
352 TABLE_SHARE_FOREIGN_KEY_INFO *fk = table->s->foreign_key;
353
354 for (uint i = 0; i < table->s->foreign_keys; i++) {
355 /*
356 The supported values of update/delete_rule are: CASCADE, SET NULL,
357 NO ACTION, RESTRICT and SET DEFAULT.
358 */
359 if (dd::Foreign_key::RULE_CASCADE == fk[i].update_rule ||
360 dd::Foreign_key::RULE_CASCADE == fk[i].delete_rule ||
361 dd::Foreign_key::RULE_SET_NULL == fk[i].update_rule ||
362 dd::Foreign_key::RULE_SET_NULL == fk[i].delete_rule ||
363 dd::Foreign_key::RULE_SET_DEFAULT == fk[i].update_rule ||
364 dd::Foreign_key::RULE_SET_DEFAULT == fk[i].delete_rule) {
365 return true;
366 }
367 }
368 return false;
369 }
370
371 /**
372 Helper method to create table information for the hook call
373 */
prepare_table_info(THD * thd,Trans_table_info * & table_info_list,uint & number_of_tables)374 void prepare_table_info(THD *thd, Trans_table_info *&table_info_list,
375 uint &number_of_tables) {
376 DBUG_TRACE;
377
378 TABLE *open_tables = thd->open_tables;
379
380 // Fail if tables are not open
381 if (open_tables == nullptr) {
382 return;
383 }
384
385 // Gather table information
386 std::vector<Trans_table_info> table_info_holder;
387 for (; open_tables != nullptr; open_tables = open_tables->next) {
388 Trans_table_info table_info = {nullptr, 0, 0, false};
389
390 if (open_tables->no_replicate) {
391 continue;
392 }
393
394 table_info.table_name = open_tables->s->table_name.str;
395
396 uint primary_keys = 0;
397 if (open_tables->key_info != nullptr &&
398 (open_tables->s->primary_key < MAX_KEY)) {
399 primary_keys = open_tables->s->primary_key;
400
401 // if primary keys is still 0, lets double check on another var
402 if (primary_keys == 0) {
403 primary_keys = open_tables->key_info->user_defined_key_parts;
404 }
405 }
406
407 table_info.number_of_primary_keys = primary_keys;
408
409 table_info.db_type = open_tables->s->db_type()->db_type;
410
411 /*
412 Find out if the table has foreign key with ON UPDATE/DELETE CASCADE
413 clause.
414 */
415 table_info.has_cascade_foreign_key = has_cascade_foreign_key(open_tables);
416
417 table_info_holder.push_back(table_info);
418 }
419
420 // Now that one has all the information, one should build the
421 // data that will be delivered to the plugin
422 if (table_info_holder.size() > 0) {
423 number_of_tables = table_info_holder.size();
424
425 table_info_list = (Trans_table_info *)my_malloc(
426 PSI_NOT_INSTRUMENTED, number_of_tables * sizeof(Trans_table_info),
427 MYF(0));
428
429 std::vector<Trans_table_info>::iterator table_info_holder_it =
430 table_info_holder.begin();
431 for (int table = 0; table_info_holder_it != table_info_holder.end();
432 table_info_holder_it++, table++) {
433 table_info_list[table].number_of_primary_keys =
434 (*table_info_holder_it).number_of_primary_keys;
435 table_info_list[table].table_name = (*table_info_holder_it).table_name;
436 table_info_list[table].db_type = (*table_info_holder_it).db_type;
437 table_info_list[table].has_cascade_foreign_key =
438 (*table_info_holder_it).has_cascade_foreign_key;
439 }
440 }
441 }
442
443 /**
444 Helper that gathers all table runtime information
445
446 @param[in] thd the current execution thread
447 @param[out] ctx_info Trans_context_info in which the result is stored.
448 */
prepare_transaction_context(THD * thd,Trans_context_info & ctx_info)449 static void prepare_transaction_context(THD *thd,
450 Trans_context_info &ctx_info) {
451 // Extracting the session value of SQL binlogging
452 ctx_info.binlog_enabled = thd->variables.sql_log_bin;
453
454 // Extracting the session value of binlog format
455 ctx_info.binlog_format = thd->variables.binlog_format;
456
457 // Extracting the global mutable value of binlog checksum
458 ctx_info.binlog_checksum_options = binlog_checksum_options;
459
460 // Extracting the session value of transaction_write_set_extraction
461 ctx_info.transaction_write_set_extraction =
462 thd->variables.transaction_write_set_extraction;
463
464 // Extracting transaction isolation level
465 ctx_info.tx_isolation = thd->tx_isolation;
466 }
467
before_dml(THD * thd,int & result)468 int Trans_delegate::before_dml(THD *thd, int &result) {
469 DBUG_TRACE;
470 Trans_param param;
471 TRANS_PARAM_ZERO(param);
472
473 param.server_id = thd->server_id;
474 param.server_uuid = server_uuid;
475 param.thread_id = thd->thread_id();
476
477 prepare_table_info(thd, param.tables_info, param.number_of_tables);
478 prepare_transaction_context(thd, param.trans_ctx_info);
479
480 int ret = 0;
481 FOREACH_OBSERVER_ERROR_OUT(ret, before_dml, ¶m, result);
482
483 my_free(param.tables_info);
484
485 return ret;
486 }
487
se_before_rollback(THD *,plugin_ref plugin,void * arg)488 static bool se_before_rollback(THD *, plugin_ref plugin, void *arg) {
489 handlerton *hton = plugin_data<handlerton *>(plugin);
490 if (hton->se_before_rollback) hton->se_before_rollback(arg);
491 return false;
492 }
493
before_rollback(THD * thd,bool all)494 int Trans_delegate::before_rollback(THD *thd, bool all) {
495 DBUG_TRACE;
496 Trans_param param;
497 TRANS_PARAM_ZERO(param);
498 param.server_id = thd->server_id;
499 param.server_uuid = server_uuid;
500 param.thread_id = thd->thread_id();
501 param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
502
503 bool is_real_trans =
504 (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
505 if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS;
506
507 int ret = 0;
508 FOREACH_OBSERVER(ret, before_rollback, (¶m));
509 plugin_foreach(thd, se_before_rollback, MYSQL_STORAGE_ENGINE_PLUGIN, ¶m);
510 return ret;
511 }
512
se_after_commit(THD *,plugin_ref plugin,void * arg)513 static bool se_after_commit(THD *, plugin_ref plugin, void *arg) {
514 handlerton *hton = plugin_data<handlerton *>(plugin);
515 if (hton->se_after_commit) hton->se_after_commit(arg);
516 return false;
517 }
518
after_commit(THD * thd,bool all)519 int Trans_delegate::after_commit(THD *thd, bool all) {
520 DBUG_TRACE;
521 Trans_param param;
522 TRANS_PARAM_ZERO(param);
523 param.server_uuid = server_uuid;
524 param.thread_id = thd->thread_id();
525
526 Gtid gtid;
527 thd->rpl_thd_ctx.last_used_gtid_tracker_ctx().get_last_used_gtid(gtid);
528 param.gtid_info.sidno = gtid.sidno;
529 param.gtid_info.gno = gtid.gno;
530
531 bool is_real_trans =
532 (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
533 if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS;
534
535 thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos);
536 param.server_id = thd->server_id;
537 param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
538
539 DBUG_PRINT("enter",
540 ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos));
541 DEBUG_SYNC(thd, "before_call_after_commit_observer");
542
543 int ret = 0;
544 FOREACH_OBSERVER(ret, after_commit, (¶m));
545 plugin_foreach(thd, se_after_commit, MYSQL_STORAGE_ENGINE_PLUGIN, ¶m);
546 return ret;
547 }
548
after_rollback(THD * thd,bool all)549 int Trans_delegate::after_rollback(THD *thd, bool all) {
550 DBUG_TRACE;
551 Trans_param param;
552 TRANS_PARAM_ZERO(param);
553 param.server_uuid = server_uuid;
554 param.thread_id = thd->thread_id();
555
556 bool is_real_trans =
557 (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
558 if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS;
559 thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos);
560 param.server_id = thd->server_id;
561 param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
562
563 int ret = 0;
564 FOREACH_OBSERVER(ret, after_rollback, (¶m));
565 return ret;
566 }
567
trans_begin(THD * thd,int & out)568 int Trans_delegate::trans_begin(THD *thd, int &out) {
569 DBUG_TRACE;
570 Trans_param param;
571 TRANS_PARAM_ZERO(param);
572 param.server_uuid = server_uuid;
573 param.thread_id = thd->thread_id();
574 param.group_replication_consistency =
575 thd->variables.group_replication_consistency;
576 param.hold_timeout = thd->variables.net_wait_timeout;
577 param.server_id = thd->server_id;
578 param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
579
580 int ret = 0;
581 FOREACH_OBSERVER_ERROR_OUT(ret, begin, ¶m, out);
582 return ret;
583 }
584
after_flush(THD * thd,const char * log_file,my_off_t log_pos)585 int Binlog_storage_delegate::after_flush(THD *thd, const char *log_file,
586 my_off_t log_pos) {
587 DBUG_TRACE;
588 DBUG_PRINT("enter",
589 ("log_file: %s, log_pos: %llu", log_file, (ulonglong)log_pos));
590 Binlog_storage_param param;
591 param.server_id = thd->server_id;
592
593 int ret = 0;
594 FOREACH_OBSERVER(ret, after_flush, (¶m, log_file, log_pos));
595 return ret;
596 }
597
598 /**
599 * This hook MUST be invoked after ALL recovery operations are performed
600 * and the server is ready to serve clients.
601 *
602 * @return 0 on success, >0 otherwise.
603 */
before_handle_connection(THD *)604 int Server_state_delegate::before_handle_connection(THD *) {
605 DBUG_TRACE;
606 Server_state_param param;
607
608 int ret = 0;
609 FOREACH_OBSERVER(ret, before_handle_connection, (¶m));
610 return ret;
611 }
612
613 /**
614 * This hook MUST be invoked before ANY recovery action is started.
615 *
616 * @return 0 on success, >0 otherwise.
617 */
before_recovery(THD *)618 int Server_state_delegate::before_recovery(THD *) {
619 DBUG_TRACE;
620 Server_state_param param;
621
622 int ret = 0;
623 FOREACH_OBSERVER(ret, before_recovery, (¶m));
624 return ret;
625 }
626
627 /**
628 * This hook MUST be invoked after the recovery from the engine
629 * is complete.
630 *
631 * @return 0 on success, >0 otherwise.
632 */
after_engine_recovery(THD *)633 int Server_state_delegate::after_engine_recovery(THD *) {
634 DBUG_TRACE;
635 Server_state_param param;
636
637 int ret = 0;
638 FOREACH_OBSERVER(ret, after_engine_recovery, (¶m));
639 return ret;
640 }
641
642 /**
643 * This hook MUST be invoked after the server has completed the
644 * local recovery. The server can proceed with the further operations
645 * like engaging in distributed recovery etc.
646 *
647 * @return 0 on success, >0 otherwise.
648 */
after_recovery(THD *)649 int Server_state_delegate::after_recovery(THD *) {
650 DBUG_TRACE;
651 Server_state_param param;
652
653 int ret = 0;
654 FOREACH_OBSERVER(ret, after_recovery, (¶m));
655 return ret;
656 }
657
658 /**
659 * This hook MUST be invoked before server shutdown action is
660 * initiated.
661 *
662 * @return 0 on success, >0 otherwise.
663 */
before_server_shutdown(THD *)664 int Server_state_delegate::before_server_shutdown(THD *) {
665 DBUG_TRACE;
666 Server_state_param param;
667
668 int ret = 0;
669 FOREACH_OBSERVER(ret, before_server_shutdown, (¶m));
670 return ret;
671 }
672
673 /**
674 * This hook MUST be invoked after server shutdown operation is
675 * complete.
676 *
677 * @return 0 on success, >0 otherwise.
678 */
after_server_shutdown(THD *)679 int Server_state_delegate::after_server_shutdown(THD *) {
680 DBUG_TRACE;
681 Server_state_param param;
682
683 int ret = 0;
684 FOREACH_OBSERVER(ret, after_server_shutdown, (¶m));
685 return ret;
686 }
687
688 /**
689 * This hook MUST be invoked after upgrade from .frm to data dictionary
690 *
691 * @return 0 on success, >0 otherwise.
692 */
after_dd_upgrade_from_57(THD *)693 int Server_state_delegate::after_dd_upgrade_from_57(THD *) {
694 DBUG_TRACE;
695 Server_state_param param;
696
697 int ret = 0;
698 FOREACH_OBSERVER(ret, after_dd_upgrade_from_57, (¶m));
699 return ret;
700 }
701
after_sync(THD * thd,const char * log_file,my_off_t log_pos)702 int Binlog_storage_delegate::after_sync(THD *thd, const char *log_file,
703 my_off_t log_pos) {
704 DBUG_TRACE;
705 DBUG_PRINT("enter",
706 ("log_file: %s, log_pos: %llu", log_file, (ulonglong)log_pos));
707 Binlog_storage_param param;
708 param.server_id = thd->server_id;
709
710 DBUG_ASSERT(log_pos != 0);
711 int ret = 0;
712 FOREACH_OBSERVER(ret, after_sync, (¶m, log_file, log_pos));
713
714 DEBUG_SYNC(thd, "after_call_after_sync_observer");
715 return ret;
716 }
717
transmit_start(THD * thd,ushort flags,const char * log_file,my_off_t log_pos,bool * observe_transmission)718 int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
719 const char *log_file,
720 my_off_t log_pos,
721 bool *observe_transmission) {
722 Binlog_transmit_param param;
723 param.flags = flags;
724 param.server_id = thd->server_id;
725
726 int ret = 0;
727 FOREACH_OBSERVER(ret, transmit_start, (¶m, log_file, log_pos));
728 *observe_transmission = param.should_observe();
729 return ret;
730 }
731
transmit_stop(THD * thd,ushort flags)732 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) {
733 Binlog_transmit_param param;
734 param.flags = flags;
735 param.server_id = thd->server_id;
736
737 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
738
739 int ret = 0;
740 FOREACH_OBSERVER(ret, transmit_stop, (¶m));
741 return ret;
742 }
743
reserve_header(THD * thd,ushort flags,String * packet)744 int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
745 String *packet) {
746 /* NOTE2ME: Maximum extra header size for each observer, I hope 32
747 bytes should be enough for each Observer to reserve their extra
748 header. If later found this is not enough, we can increase this
749 /HEZX
750 */
751 #define RESERVE_HEADER_SIZE 32
752 unsigned char header[RESERVE_HEADER_SIZE];
753 ulong hlen;
754 Binlog_transmit_param param;
755 param.flags = flags;
756 param.server_id = thd->server_id;
757
758 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
759
760 int ret = 0;
761 read_lock();
762 Observer_info_iterator iter = observer_info_iter();
763 Observer_info *info = iter++;
764 for (; info; info = iter++) {
765 plugin_ref plugin = my_plugin_lock(thd, &info->plugin);
766 if (!plugin) {
767 ret = 1;
768 break;
769 }
770 hlen = 0;
771 if (((Observer *)info->observer)->reserve_header &&
772 ((Observer *)info->observer)
773 ->reserve_header(¶m, header, RESERVE_HEADER_SIZE, &hlen)) {
774 ret = 1;
775 plugin_unlock(thd, plugin);
776 break;
777 }
778 plugin_unlock(thd, plugin);
779 if (hlen == 0) continue;
780 if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) {
781 ret = 1;
782 break;
783 }
784 }
785 unlock();
786 return ret;
787 }
788
before_send_event(THD * thd,ushort flags,String * packet,const char * log_file,my_off_t log_pos)789 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
790 String *packet,
791 const char *log_file,
792 my_off_t log_pos) {
793 Binlog_transmit_param param;
794 param.flags = flags;
795 param.server_id = thd->server_id;
796
797 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
798
799 int ret = 0;
800 FOREACH_OBSERVER(
801 ret, before_send_event,
802 (¶m, pointer_cast<uchar *>(packet->ptr()), packet->length(),
803 log_file + dirname_length(log_file), log_pos));
804 return ret;
805 }
806
after_send_event(THD * thd,ushort flags,String * packet,const char * skipped_log_file,my_off_t skipped_log_pos)807 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
808 String *packet,
809 const char *skipped_log_file,
810 my_off_t skipped_log_pos) {
811 Binlog_transmit_param param;
812 param.flags = flags;
813 param.server_id = thd->server_id;
814
815 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
816
817 int ret = 0;
818 FOREACH_OBSERVER(
819 ret, after_send_event,
820 (¶m, packet->ptr(), packet->length(),
821 skipped_log_file + dirname_length(skipped_log_file), skipped_log_pos));
822 return ret;
823 }
824
after_reset_master(THD * thd,ushort flags)825 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
826
827 {
828 Binlog_transmit_param param;
829 param.flags = flags;
830 param.server_id = thd->server_id;
831
832 int ret = 0;
833 FOREACH_OBSERVER(ret, after_reset_master, (¶m));
834 return ret;
835 }
836
init_param(Binlog_relay_IO_param * param,Master_info * mi)837 void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
838 Master_info *mi) {
839 param->mysql = mi->mysql;
840 param->channel_name = mi->get_channel();
841 param->user = const_cast<char *>(mi->get_user());
842 param->host = mi->host;
843 param->port = mi->port;
844 param->master_log_name = const_cast<char *>(mi->get_master_log_name());
845 param->master_log_pos = mi->get_master_log_pos();
846 }
847
thread_start(THD * thd,Master_info * mi)848 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) {
849 Binlog_relay_IO_param param;
850 init_param(¶m, mi);
851 param.server_id = thd->server_id;
852 param.thread_id = thd->thread_id();
853
854 int ret = 0;
855 FOREACH_OBSERVER(ret, thread_start, (¶m));
856 return ret;
857 }
858
thread_stop(THD * thd,Master_info * mi)859 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) {
860 Binlog_relay_IO_param param;
861 init_param(¶m, mi);
862 param.server_id = thd->server_id;
863 param.thread_id = thd->thread_id();
864
865 int ret = 0;
866 FOREACH_OBSERVER(ret, thread_stop, (¶m));
867 return ret;
868 }
869
applier_start(THD * thd,Master_info * mi)870 int Binlog_relay_IO_delegate::applier_start(THD *thd, Master_info *mi) {
871 Binlog_relay_IO_param param;
872 init_param(¶m, mi);
873 param.server_id = thd->server_id;
874 param.thread_id = thd->thread_id();
875
876 int ret = 0;
877 FOREACH_OBSERVER(ret, applier_start, (¶m));
878 return ret;
879 }
880
applier_stop(THD * thd,Master_info * mi,bool aborted)881 int Binlog_relay_IO_delegate::applier_stop(THD *thd, Master_info *mi,
882 bool aborted) {
883 Binlog_relay_IO_param param;
884 init_param(¶m, mi);
885 param.server_id = thd->server_id;
886 param.thread_id = thd->thread_id();
887
888 int ret = 0;
889 FOREACH_OBSERVER(ret, applier_stop, (¶m, aborted));
890 return ret;
891 }
892
before_request_transmit(THD * thd,Master_info * mi,ushort flags)893 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, Master_info *mi,
894 ushort flags) {
895 Binlog_relay_IO_param param;
896 init_param(¶m, mi);
897 param.server_id = thd->server_id;
898 param.thread_id = thd->thread_id();
899
900 int ret = 0;
901 FOREACH_OBSERVER(ret, before_request_transmit, (¶m, (uint32)flags));
902 return ret;
903 }
904
after_read_event(THD * thd,Master_info * mi,const char * packet,ulong len,const char ** event_buf,ulong * event_len)905 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
906 const char *packet, ulong len,
907 const char **event_buf,
908 ulong *event_len) {
909 Binlog_relay_IO_param param;
910 init_param(¶m, mi);
911 param.server_id = thd->server_id;
912 param.thread_id = thd->thread_id();
913
914 int ret = 0;
915 FOREACH_OBSERVER(ret, after_read_event,
916 (¶m, packet, len, event_buf, event_len));
917 return ret;
918 }
919
after_queue_event(THD * thd,Master_info * mi,const char * event_buf,ulong event_len,bool synced)920 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
921 const char *event_buf,
922 ulong event_len, bool synced) {
923 Binlog_relay_IO_param param;
924 init_param(¶m, mi);
925 param.server_id = thd->server_id;
926 param.thread_id = thd->thread_id();
927
928 uint32 flags = 0;
929 if (synced) flags |= BINLOG_STORAGE_IS_SYNCED;
930
931 int ret = 0;
932 FOREACH_OBSERVER(ret, after_queue_event,
933 (¶m, event_buf, event_len, flags));
934 return ret;
935 }
936
after_reset_slave(THD * thd,Master_info * mi)937 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
938
939 {
940 Binlog_relay_IO_param param;
941 init_param(¶m, mi);
942 param.server_id = thd->server_id;
943 param.thread_id = thd->thread_id();
944
945 int ret = 0;
946 FOREACH_OBSERVER(ret, after_reset_slave, (¶m));
947 return ret;
948 }
949
applier_log_event(THD * thd,int & out)950 int Binlog_relay_IO_delegate::applier_log_event(THD *thd, int &out) {
951 DBUG_TRACE;
952 Trans_param trans_param;
953 TRANS_PARAM_ZERO(trans_param);
954 Binlog_relay_IO_param param;
955
956 param.server_id = thd->server_id;
957 param.thread_id = thd->thread_id();
958
959 prepare_table_info(thd, trans_param.tables_info,
960 trans_param.number_of_tables);
961
962 int ret = 0;
963 FOREACH_OBSERVER(ret, applier_log_event, (¶m, &trans_param, out));
964
965 my_free(trans_param.tables_info);
966
967 return ret;
968 }
969
register_trans_observer(Trans_observer * observer,void * p)970 int register_trans_observer(Trans_observer *observer, void *p) {
971 return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
972 }
973
unregister_trans_observer(Trans_observer * observer,void *)974 int unregister_trans_observer(Trans_observer *observer, void *) {
975 return transaction_delegate->remove_observer(observer);
976 }
977
register_binlog_storage_observer(Binlog_storage_observer * observer,void * p)978 int register_binlog_storage_observer(Binlog_storage_observer *observer,
979 void *p) {
980 DBUG_TRACE;
981 int result =
982 binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
983 return result;
984 }
985
unregister_binlog_storage_observer(Binlog_storage_observer * observer,void *)986 int unregister_binlog_storage_observer(Binlog_storage_observer *observer,
987 void *) {
988 return binlog_storage_delegate->remove_observer(observer);
989 }
990
register_server_state_observer(Server_state_observer * observer,void * plugin_var)991 int register_server_state_observer(Server_state_observer *observer,
992 void *plugin_var) {
993 DBUG_TRACE;
994 int result = server_state_delegate->add_observer(observer,
995 (st_plugin_int *)plugin_var);
996 return result;
997 }
998
unregister_server_state_observer(Server_state_observer * observer,void *)999 int unregister_server_state_observer(Server_state_observer *observer, void *) {
1000 DBUG_TRACE;
1001 int result = server_state_delegate->remove_observer(observer);
1002 return result;
1003 }
1004
register_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)1005 int register_binlog_transmit_observer(Binlog_transmit_observer *observer,
1006 void *p) {
1007 return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
1008 }
1009
unregister_binlog_transmit_observer(Binlog_transmit_observer * observer,void *)1010 int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer,
1011 void *) {
1012 return binlog_transmit_delegate->remove_observer(observer);
1013 }
1014
register_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)1015 int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
1016 void *p) {
1017 return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
1018 }
1019
unregister_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void *)1020 int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
1021 void *) {
1022 return binlog_relay_io_delegate->remove_observer(observer);
1023 }
1024
launch_hook_trans_begin(THD * thd,TABLE_LIST * all_tables)1025 int launch_hook_trans_begin(THD *thd, TABLE_LIST *all_tables) {
1026 DBUG_TRACE;
1027 LEX *lex = thd->lex;
1028 enum_sql_command sql_command = lex->sql_command;
1029 // by default commands are put on hold
1030 bool hold_command = true;
1031 int ret = 0;
1032
1033 // if command belong to a transaction that already pass by hook, it can
1034 // continue
1035 if (thd->get_transaction()->was_trans_begin_hook_invoked()) {
1036 return 0;
1037 }
1038
1039 bool is_show = ((sql_command_flags[sql_command] & CF_STATUS_COMMAND) &&
1040 (sql_command != SQLCOM_BINLOG_BASE64_EVENT)) ||
1041 (sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1042 bool is_set = (sql_command == SQLCOM_SET_OPTION);
1043 bool is_select = (sql_command == SQLCOM_SELECT);
1044 bool is_do = (sql_command == SQLCOM_DO);
1045 bool is_empty = (sql_command == SQLCOM_EMPTY_QUERY);
1046 bool is_use = (sql_command == SQLCOM_CHANGE_DB);
1047 bool is_stop_gr = (sql_command == SQLCOM_STOP_GROUP_REPLICATION);
1048 bool is_shutdown = (sql_command == SQLCOM_SHUTDOWN);
1049 bool is_reset_persist =
1050 (sql_command == SQLCOM_RESET && lex->option_type == OPT_PERSIST);
1051
1052 if ((is_set || is_do || is_show || is_empty || is_use || is_stop_gr ||
1053 is_shutdown || is_reset_persist) &&
1054 !lex->uses_stored_routines()) {
1055 return 0;
1056 }
1057
1058 if (is_select) {
1059 bool is_udf = false;
1060
1061 // if select is an udf function
1062 SELECT_LEX *select_lex_elem = lex->unit->first_select();
1063 while (select_lex_elem != nullptr) {
1064 Item *item;
1065 List_iterator_fast<Item> it(select_lex_elem->fields_list);
1066 while ((item = it++)) {
1067 if (item->type() == Item::FUNC_ITEM) {
1068 Item_func *func_item = down_cast<Item_func *>(item);
1069 Item_func::Functype functype = func_item->functype();
1070 if (functype == Item_func::FUNC_SP || functype == Item_func::UDF_FUNC)
1071 is_udf = true;
1072 }
1073 }
1074 select_lex_elem = select_lex_elem->next_select();
1075 }
1076
1077 if (!is_udf && all_tables == nullptr) {
1078 // SELECT that don't use tables and isn't a UDF
1079 hold_command = false;
1080 }
1081
1082 if (hold_command && all_tables != nullptr) {
1083 // SELECT that use tables
1084 bool is_perf_schema_table = false;
1085 bool is_process_list = false;
1086 bool is_sys_db = false;
1087 bool stop_db_check = false;
1088
1089 for (TABLE_LIST *table = all_tables; table && !stop_db_check;
1090 table = table->next_global) {
1091 DBUG_ASSERT(table->db && table->table_name);
1092
1093 if (is_perfschema_db(table->db, table->db_length))
1094 is_perf_schema_table = true;
1095 else if (is_infoschema_db(table->db, table->db_length) &&
1096 !my_strcasecmp(system_charset_info, "PROCESSLIST",
1097 table->table_name)) {
1098 is_process_list = true;
1099 } else if (table->db_length == 3 &&
1100 !my_strcasecmp(system_charset_info, "sys", table->db)) {
1101 is_sys_db = true;
1102 } else {
1103 is_perf_schema_table = false;
1104 is_process_list = false;
1105 is_sys_db = false;
1106 stop_db_check = true;
1107 }
1108 }
1109
1110 if (is_process_list || is_perf_schema_table || is_sys_db) {
1111 hold_command = false;
1112 }
1113 }
1114 }
1115
1116 if (hold_command) {
1117 DBUG_EXECUTE_IF("launch_hook_trans_begin_assert_if_hold",
1118 { DBUG_ASSERT(0); };);
1119
1120 PSI_stage_info old_stage;
1121 thd->enter_stage(&stage_hook_begin_trans, &old_stage, __func__, __FILE__,
1122 __LINE__);
1123 RUN_HOOK(transaction, trans_begin, (thd, ret));
1124 THD_STAGE_INFO(thd, old_stage);
1125 if (!ret && (sql_command == SQLCOM_BEGIN ||
1126 thd->in_active_multi_stmt_transaction())) {
1127 thd->get_transaction()->set_trans_begin_hook_invoked();
1128 }
1129 }
1130
1131 return ret;
1132 }
1133