1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "rpl_handler.h"
24
25 #include "debug_sync.h" // DEBUG_SYNC
26 #include "log.h" // sql_print_error
27 #include "replication.h" // Trans_param
28 #include "rpl_mi.h" // Master_info
29 #include "sql_class.h" // THD
30 #include "sql_plugin.h" // plugin_int_to_ref
31
32
33 #include <vector>
34
35 Trans_delegate *transaction_delegate;
36 Binlog_storage_delegate *binlog_storage_delegate;
37 Server_state_delegate *server_state_delegate;
38
39 #ifdef HAVE_REPLICATION
40 Binlog_transmit_delegate *binlog_transmit_delegate;
41 Binlog_relay_IO_delegate *binlog_relay_io_delegate;
42 #endif /* HAVE_REPLICATION */
43
44 bool opt_replication_optimize_for_static_plugin_config= 0;
45 bool opt_replication_sender_observe_commit_only= 0;
46 int32 opt_atomic_replication_sender_observe_commit_only= 0;
47
Observer_info(void * ob,st_plugin_int * p)48 Observer_info::Observer_info(void *ob, st_plugin_int *p)
49 : observer(ob), plugin_int(p)
50 {
51 plugin= plugin_int_to_ref(plugin_int);
52 }
53
Delegate(PSI_rwlock_key key)54 Delegate::Delegate(
55 #ifdef HAVE_PSI_INTERFACE
56 PSI_rwlock_key key
57 #endif
58 )
59 {
60 inited= FALSE;
61 my_atomic_fas32(&m_configured_lock_type,
62 opt_replication_optimize_for_static_plugin_config
63 ? DELEGATE_SPIN_LOCK
64 : DELEGATE_OS_LOCK);
65 my_atomic_store32(&m_acquired_locks, 0);
66 #ifdef HAVE_PSI_INTERFACE
67 if (mysql_rwlock_init(key, &lock)) return;
68 #else
69 if (mysql_rwlock_init(0, &lock)) return;
70 #endif
71 init_sql_alloc(key_memory_delegate, &memroot, 1024, 0);
72 inited= TRUE;
73 }
74
~Delegate()75 Delegate::~Delegate()
76 {
77 inited= FALSE;
78 mysql_rwlock_destroy(&lock);
79 free_root(&memroot, MYF(0));
80 }
81
add_observer(void * observer,st_plugin_int * plugin)82 int Delegate::add_observer(void *observer, st_plugin_int *plugin)
83 {
84 int ret= FALSE;
85 if (!inited) return TRUE;
86 write_lock();
87 Observer_info_iterator iter(observer_info_list);
88 Observer_info *info= iter++;
89 while (info && info->observer != observer) info= iter++;
90 if (!info)
91 {
92 info= new Observer_info(observer, plugin);
93 if (!info || observer_info_list.push_back(info, &memroot))
94 ret= TRUE;
95 else if (this->use_spin_lock_type())
96 acquire_plugin_ref_count(info);
97 }
98 else
99 ret= TRUE;
100 unlock();
101 return ret;
102 }
103
remove_observer(void * observer,st_plugin_int * plugin)104 int Delegate::remove_observer(void *observer, st_plugin_int *plugin)
105 {
106 int ret= FALSE;
107 if (!inited) return TRUE;
108 write_lock();
109 Observer_info_iterator iter(observer_info_list);
110 Observer_info *info= iter++;
111 while (info && info->observer != observer) info= iter++;
112 if (info)
113 {
114 iter.remove();
115 delete info;
116 }
117 else
118 ret= TRUE;
119 unlock();
120 return ret;
121 }
122
observer_info_iter()123 Delegate::Observer_info_iterator Delegate::observer_info_iter()
124 {
125 return Observer_info_iterator(observer_info_list);
126 }
127
is_empty()128 bool Delegate::is_empty()
129 {
130 DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty()));
131 return observer_info_list.is_empty();
132 }
133
read_lock()134 int Delegate::read_lock()
135 {
136 if (!inited) return 1;
137 this->lock_it(DELEGATE_LOCK_MODE_SHARED);
138 return 0;
139 }
140
write_lock()141 int Delegate::write_lock()
142 {
143 if (!inited) return 1;
144 this->lock_it(DELEGATE_LOCK_MODE_EXCLUSIVE);
145 return 0;
146 }
147
unlock()148 int Delegate::unlock()
149 {
150 if (!inited) return 1;
151
152 int result= 0;
153
154 if (my_atomic_load32(&m_acquired_locks) > 0)
155 {
156 my_atomic_add32(&m_acquired_locks, -DELEGATE_SPIN_LOCK);
157 if (m_spin_lock.is_exclusive_acquisition())
158 m_spin_lock.release_exclusive();
159 else
160 {
161 assert(m_spin_lock.is_shared_acquisition());
162 m_spin_lock.release_shared();
163 }
164 }
165 else
166 {
167 assert(my_atomic_load32(&m_acquired_locks) < 0);
168 my_atomic_add32(&m_acquired_locks, -DELEGATE_OS_LOCK);
169 result= mysql_rwlock_unlock(&lock);
170 }
171
172 return result;
173 }
174
is_inited()175 bool Delegate::is_inited() { return inited; }
176
update_lock_type()177 void Delegate::update_lock_type()
178 {
179 if (!inited) return;
180
181 int32 opt_value= opt_replication_optimize_for_static_plugin_config
182 ? DELEGATE_SPIN_LOCK
183 : DELEGATE_OS_LOCK;
184 my_atomic_fas32(&m_configured_lock_type, opt_value);
185 }
186
update_plugin_ref_count()187 void Delegate::update_plugin_ref_count()
188 {
189 if (!inited) return;
190 int32 opt_value= opt_replication_optimize_for_static_plugin_config
191 ? DELEGATE_SPIN_LOCK
192 : DELEGATE_OS_LOCK;
193 int32 intern_value= my_atomic_load32(&m_configured_lock_type);
194
195 if (intern_value == DELEGATE_SPIN_LOCK && opt_value == DELEGATE_OS_LOCK)
196 {
197 for (std::map<plugin_ref, size_t>::iterator ref=
198 m_acquired_references.begin();
199 ref != m_acquired_references.end(); ++ref)
200 {
201 for (size_t count= ref->second; count != 0; --count)
202 plugin_unlock(NULL, ref->first);
203 }
204 m_acquired_references.clear();
205 }
206 else if (intern_value == DELEGATE_OS_LOCK && opt_value == DELEGATE_SPIN_LOCK)
207 {
208 Observer_info_iterator iter= observer_info_iter();
209 for (Observer_info *info= iter++; info; info= iter++)
210 {
211 acquire_plugin_ref_count(info);
212 }
213 }
214 }
215
use_rw_lock_type()216 bool Delegate::use_rw_lock_type()
217 {
218 return my_atomic_load32(&m_acquired_locks) <
219 0 || // If there are acquisitions using the read-write lock
220 (my_atomic_load32(&m_configured_lock_type) ==
221 DELEGATE_OS_LOCK && // or the lock type has been set to use the
222 // read-write lock
223 my_atomic_load32(&m_acquired_locks) ==
224 0); // and there are no outstanding acquisitions using shared
225 // spin-lock, use the read-write lock
226 }
227
use_spin_lock_type()228 bool Delegate::use_spin_lock_type()
229 {
230 return my_atomic_load32(&m_acquired_locks) >
231 0 || // If there are acquisitions using the shared spin-lock
232 (my_atomic_load32(&m_configured_lock_type) ==
233 DELEGATE_SPIN_LOCK && // or the lock type has been set to use the
234 // shared spin-lock
235 my_atomic_load32(&m_acquired_locks) ==
236 0); // and there are no outstanding acquisitions using read-write
237 // lock, use the shared spin-lock
238 }
239
acquire_plugin_ref_count(Observer_info * info)240 void Delegate::acquire_plugin_ref_count(Observer_info *info)
241 {
242 plugin_ref internal_ref= plugin_lock(NULL, &info->plugin);
243 ++(m_acquired_references[internal_ref]);
244 }
245
lock_it(enum_delegate_lock_mode mode)246 void Delegate::lock_it(enum_delegate_lock_mode mode)
247 {
248 do
249 {
250 if (this->use_spin_lock_type())
251 {
252 if (mode == DELEGATE_LOCK_MODE_SHARED)
253 m_spin_lock.acquire_shared();
254 else
255 m_spin_lock.acquire_exclusive();
256
257 if (my_atomic_load32(&m_configured_lock_type) !=
258 DELEGATE_SPIN_LOCK) // Lock type changed in the meanwhile, lets
259 // revert the acquisition and try again
260 {
261 if (mode == DELEGATE_LOCK_MODE_SHARED)
262 m_spin_lock.release_shared();
263 else
264 m_spin_lock.release_exclusive();
265 }
266 else
267 {
268 my_atomic_add32(&m_acquired_locks, DELEGATE_SPIN_LOCK);
269 break;
270 }
271 }
272 if (this->use_rw_lock_type())
273 {
274 if (mode == DELEGATE_LOCK_MODE_SHARED)
275 mysql_rwlock_rdlock(&lock);
276 else
277 mysql_rwlock_wrlock(&lock);
278
279 if (my_atomic_load32(&m_configured_lock_type) !=
280 DELEGATE_OS_LOCK) // Lock type changed in the meanwhile, lets revert
281 // the acquisition and try again
282 mysql_rwlock_unlock(&lock);
283 else
284 {
285 my_atomic_add32(&m_acquired_locks, DELEGATE_OS_LOCK);
286 break;
287 }
288 }
289 } while (true);
290 }
291
292 /*
293 structure to save transaction log filename and position
294 */
295 typedef struct Trans_binlog_info {
296 my_off_t log_pos;
297 char log_file[FN_REFLEN];
298 } Trans_binlog_info;
299
get_user_var_int(const char * name,long long int * value,int * null_value)300 int get_user_var_int(const char *name,
301 long long int *value, int *null_value)
302 {
303 my_bool null_val;
304 THD *thd= current_thd;
305
306 /* Protects thd->user_vars. */
307 mysql_mutex_lock(&thd->LOCK_thd_data);
308
309 user_var_entry *entry=
310 (user_var_entry*) my_hash_search(&thd->user_vars,
311 (uchar*) name, strlen(name));
312 if (!entry)
313 {
314 mysql_mutex_unlock(&thd->LOCK_thd_data);
315 return 1;
316 }
317 *value= entry->val_int(&null_val);
318 if (null_value)
319 *null_value= null_val;
320 mysql_mutex_unlock(&thd->LOCK_thd_data);
321 return 0;
322 }
323
get_user_var_real(const char * name,double * value,int * null_value)324 int get_user_var_real(const char *name,
325 double *value, int *null_value)
326 {
327 my_bool null_val;
328 THD *thd= current_thd;
329
330 /* Protects thd->user_vars. */
331 mysql_mutex_lock(&thd->LOCK_thd_data);
332
333 user_var_entry *entry=
334 (user_var_entry*) my_hash_search(&thd->user_vars,
335 (uchar*) name, strlen(name));
336 if (!entry)
337 {
338 mysql_mutex_unlock(&thd->LOCK_thd_data);
339 return 1;
340 }
341 *value= entry->val_real(&null_val);
342 if (null_value)
343 *null_value= null_val;
344 mysql_mutex_unlock(&thd->LOCK_thd_data);
345 return 0;
346 }
347
get_user_var_str(const char * name,char * value,size_t len,unsigned int precision,int * null_value)348 int get_user_var_str(const char *name, char *value,
349 size_t len, unsigned int precision, int *null_value)
350 {
351 String str;
352 my_bool null_val;
353 THD *thd= current_thd;
354
355 /* Protects thd->user_vars. */
356 mysql_mutex_lock(&thd->LOCK_thd_data);
357
358 user_var_entry *entry=
359 (user_var_entry*) my_hash_search(&thd->user_vars,
360 (uchar*) name, strlen(name));
361 if (!entry)
362 {
363 mysql_mutex_unlock(&thd->LOCK_thd_data);
364 return 1;
365 }
366 entry->val_str(&null_val, &str, precision);
367 strncpy(value, str.c_ptr(), len);
368 if (null_value)
369 *null_value= null_val;
370 mysql_mutex_unlock(&thd->LOCK_thd_data);
371 return 0;
372 }
373
delegates_init()374 int delegates_init()
375 {
376 static my_aligned_storage<sizeof(Trans_delegate),
377 MY_ALIGNOF(longlong)> trans_mem;
378 static my_aligned_storage<sizeof(Binlog_storage_delegate),
379 MY_ALIGNOF(longlong)> storage_mem;
380 static my_aligned_storage<sizeof(Server_state_delegate),
381 MY_ALIGNOF(longlong)> server_state_mem;
382 #ifdef HAVE_REPLICATION
383 static my_aligned_storage<sizeof(Binlog_transmit_delegate),
384 MY_ALIGNOF(longlong)> transmit_mem;
385 static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
386 MY_ALIGNOF(longlong)> relay_io_mem;
387 #endif
388
389 void *place_trans_mem= trans_mem.data;
390 void *place_storage_mem= storage_mem.data;
391 void *place_state_mem= server_state_mem.data;
392
393 transaction_delegate= new (place_trans_mem) Trans_delegate;
394
395 if (!transaction_delegate->is_inited())
396 {
397 sql_print_error("Initialization of transaction delegates failed. "
398 "Please report a bug.");
399 return 1;
400 }
401
402 binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
403
404 if (!binlog_storage_delegate->is_inited())
405 {
406 sql_print_error("Initialization binlog storage delegates failed. "
407 "Please report a bug.");
408 return 1;
409 }
410
411 server_state_delegate= new (place_state_mem) Server_state_delegate;
412
413 #ifdef HAVE_REPLICATION
414 void *place_transmit_mem= transmit_mem.data;
415 void *place_relay_io_mem= relay_io_mem.data;
416
417 binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
418
419 if (!binlog_transmit_delegate->is_inited())
420 {
421 sql_print_error("Initialization of binlog transmit delegates failed. "
422 "Please report a bug.");
423 return 1;
424 }
425
426 binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
427
428 if (!binlog_relay_io_delegate->is_inited())
429 {
430 sql_print_error("Initialization binlog relay IO delegates failed. "
431 "Please report a bug.");
432 return 1;
433 }
434 #endif
435
436 return 0;
437 }
438
delegates_shutdown()439 void delegates_shutdown()
440 {
441 if (opt_replication_optimize_for_static_plugin_config)
442 {
443 opt_replication_optimize_for_static_plugin_config= false;
444 delegates_acquire_locks();
445 delegates_update_lock_type();
446 delegates_release_locks();
447 }
448 }
449
delegates_destroy()450 void delegates_destroy()
451 {
452 if (transaction_delegate)
453 transaction_delegate->~Trans_delegate();
454 if (binlog_storage_delegate)
455 binlog_storage_delegate->~Binlog_storage_delegate();
456 if (server_state_delegate)
457 server_state_delegate->~Server_state_delegate();
458 #ifdef HAVE_REPLICATION
459 if (binlog_transmit_delegate)
460 binlog_transmit_delegate->~Binlog_transmit_delegate();
461 if (binlog_relay_io_delegate)
462 binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
463 #endif /* HAVE_REPLICATION */
464 }
465
delegates_update_plugin_ref_count()466 static void delegates_update_plugin_ref_count()
467 {
468 if (transaction_delegate)
469 transaction_delegate->update_plugin_ref_count();
470 if (binlog_storage_delegate)
471 binlog_storage_delegate->update_plugin_ref_count();
472 if (server_state_delegate)
473 server_state_delegate->update_plugin_ref_count();
474 #ifdef HAVE_REPLICATION
475 if (binlog_transmit_delegate)
476 binlog_transmit_delegate->update_plugin_ref_count();
477 if (binlog_relay_io_delegate)
478 binlog_relay_io_delegate->update_plugin_ref_count();
479 #endif /* HAVE_REPLICATION */
480 }
481
delegates_acquire_locks()482 void delegates_acquire_locks()
483 {
484 if (transaction_delegate)
485 transaction_delegate->write_lock();
486 if (binlog_storage_delegate)
487 binlog_storage_delegate->write_lock();
488 if (server_state_delegate)
489 server_state_delegate->write_lock();
490 #ifdef HAVE_REPLICATION
491 if (binlog_transmit_delegate)
492 binlog_transmit_delegate->write_lock();
493 if (binlog_relay_io_delegate)
494 binlog_relay_io_delegate->write_lock();
495 #endif /* HAVE_REPLICATION */
496 }
497
delegates_release_locks()498 void delegates_release_locks()
499 {
500 if (transaction_delegate)
501 transaction_delegate->unlock();
502 if (binlog_storage_delegate)
503 binlog_storage_delegate->unlock();
504 if (server_state_delegate)
505 server_state_delegate->unlock();
506 #ifdef HAVE_REPLICATION
507 if (binlog_transmit_delegate)
508 binlog_transmit_delegate->unlock();
509 if (binlog_relay_io_delegate)
510 binlog_relay_io_delegate->unlock();
511 #endif /* HAVE_REPLICATION */
512 }
513
delegates_update_lock_type()514 void delegates_update_lock_type()
515 {
516 delegates_update_plugin_ref_count();
517
518 if (transaction_delegate)
519 transaction_delegate->update_lock_type();
520 if (binlog_storage_delegate)
521 binlog_storage_delegate->update_lock_type();
522 if (server_state_delegate)
523 server_state_delegate->update_lock_type();
524 #ifdef HAVE_REPLICATION
525 if (binlog_transmit_delegate)
526 binlog_transmit_delegate->update_lock_type();
527 if (binlog_relay_io_delegate)
528 binlog_relay_io_delegate->update_lock_type();
529 #endif /* HAVE_REPLICATION */
530 }
531
/*
  This macro is used by almost all the Delegate methods to iterate over
  all the registered observers, running the given callback function of
  the delegate on each of them.

    r     variable receiving the result: 1 if a callback failed, 0 if a
          plugin could not be locked; otherwise left unchanged
    f     name of the Observer callback member to run
    thd   not used by the macro
    args  parenthesised argument list for the callback

  When the spin-lock type is in use, the plugin reference stored with the
  observer is used as is. Otherwise each plugin is locked with
  my_plugin_lock() and collected, to be unlocked once the delegate lock
  has been released.
*/
#define FOREACH_OBSERVER(r, f, thd, args)                               \
  /* Plugin references locked during the iteration. */                  \
  Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED);        \
  read_lock();                                                          \
  Observer_info_iterator iter= observer_info_iter();                    \
  for (Observer_info *info= iter++; info; info= iter++)                 \
  {                                                                     \
    const bool plugin_ref_already_held= this->use_spin_lock_type();     \
    plugin_ref plugin= plugin_ref_already_held                          \
                         ? info->plugin                                 \
                         : my_plugin_lock(0, &info->plugin);            \
    if (!plugin)                                                        \
    {                                                                   \
      /* plugin is not initialized or deleted, this is not an error */  \
      r= 0;                                                             \
      break;                                                            \
    }                                                                   \
    if (!plugin_ref_already_held)                                       \
      plugins.push_back(plugin);                                        \
    /* Observers that do not implement this callback are skipped. */    \
    if (!((Observer *)info->observer)->f)                               \
      continue;                                                         \
    if (((Observer *)info->observer)->f args)                           \
    {                                                                   \
      r= 1;                                                             \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",    \
                      info->plugin_int->name.str);                      \
      break;                                                            \
    }                                                                   \
  }                                                                     \
  unlock();                                                             \
  /*                                                                    \
     Unlock plugins should be done after we released the Delegate lock  \
     to avoid possible deadlock when this is the last user of the       \
     plugin, and when we unlock the plugin, it will try to              \
     deinitialize the plugin, which will try to lock the Delegate in    \
     order to remove the observers.                                     \
  */                                                                    \
  if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
582
/*
  Variant of FOREACH_OBSERVER for callbacks that take one argument plus an
  integer output parameter.

    r     variable receiving the result: 1 if a callback failed, 0 if a
          plugin could not be locked; otherwise left unchanged
    f     name of the Observer callback member to run
    thd   not used by the macro
    args  the single argument passed to the callback
    out   variable to which the callback's output value is added after
          each call

  Observers that do not implement the callback (null pointer) are
  skipped, as FOREACH_OBSERVER does, instead of being called through a
  null function pointer.
*/
#define FOREACH_OBSERVER_ERROR_OUT(r, f, thd, args, out)                \
  /* Plugin references locked during the iteration. */                  \
  Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED);        \
  read_lock();                                                          \
  Observer_info_iterator iter= observer_info_iter();                    \
  Observer_info *info= iter++;                                          \
                                                                        \
  int error_out= 0;                                                     \
  for (; info; info= iter++)                                            \
  {                                                                     \
    bool replication_optimize_for_static_plugin_config=                 \
      this->use_spin_lock_type();                                       \
    plugin_ref plugin= (replication_optimize_for_static_plugin_config   \
                        ? info->plugin                                  \
                        : my_plugin_lock(0, &info->plugin));            \
    if (!plugin)                                                        \
    {                                                                   \
      /* plugin is not initialized or deleted, this is not an error */  \
      r= 0;                                                             \
      break;                                                            \
    }                                                                   \
    if (!replication_optimize_for_static_plugin_config)                 \
      plugins.push_back(plugin);                                        \
                                                                        \
    bool hook_error= false;                                             \
    if (((Observer *)info->observer)->f)                                \
    {                                                                   \
      hook_error= ((Observer *)info->observer)->f(args, error_out);     \
      out+= error_out;                                                  \
    }                                                                   \
                                                                        \
    if (hook_error)                                                     \
    {                                                                   \
      r= 1;                                                             \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",    \
                      info->plugin_int->name.str);                      \
      break;                                                            \
    }                                                                   \
  }                                                                     \
  unlock();                                                             \
  /*                                                                    \
     Unlock plugins should be done after we released the Delegate lock  \
     to avoid possible deadlock when this is the last user of the       \
     plugin, and when we unlock the plugin, it will try to              \
     deinitialize the plugin, which will try to lock the Delegate in    \
     order to remove the observers.                                     \
  */                                                                    \
  if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
631
before_commit(THD * thd,bool all,IO_CACHE * trx_cache_log,IO_CACHE * stmt_cache_log,ulonglong cache_log_max_size)632 int Trans_delegate::before_commit(THD *thd, bool all,
633 IO_CACHE *trx_cache_log,
634 IO_CACHE *stmt_cache_log,
635 ulonglong cache_log_max_size)
636 {
637 DBUG_ENTER("Trans_delegate::before_commit");
638 Trans_param param;
639 TRANS_PARAM_ZERO(param);
640 param.server_id= thd->server_id;
641 param.server_uuid= server_uuid;
642 param.thread_id= thd->thread_id();
643 param.gtid_info.type= thd->variables.gtid_next.type;
644 param.gtid_info.sidno= thd->variables.gtid_next.gtid.sidno;
645 param.gtid_info.gno= thd->variables.gtid_next.gtid.gno;
646 param.trx_cache_log= trx_cache_log;
647 param.stmt_cache_log= stmt_cache_log;
648 param.cache_log_max_size= cache_log_max_size;
649 param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
650
651 bool is_real_trans=
652 (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
653 if (is_real_trans)
654 param.flags|= TRANS_IS_REAL_TRANS;
655
656 int ret= 0;
657 FOREACH_OBSERVER(ret, before_commit, thd, (¶m));
658 DBUG_RETURN(ret);
659 }
660
661 /**
662 Helper method to check if the given table has 'CASCADE' foreign key or not.
663
664 @param[in] TABLE Table object that needs to be verified.
665 @param[in] THD Current execution thread.
666
667 @return bool TRUE If the table has 'CASCADE' foreign key.
668 FALSE If the table does not have 'CASCADE' foreign key.
669 */
has_cascade_foreign_key(TABLE * table,THD * thd)670 bool has_cascade_foreign_key(TABLE *table, THD *thd)
671 {
672 DBUG_ENTER("has_cascade_foreign_key");
673 List<FOREIGN_KEY_INFO> f_key_list;
674 table->file->get_foreign_key_list(thd, &f_key_list);
675
676 FOREIGN_KEY_INFO *f_key_info;
677 List_iterator_fast<FOREIGN_KEY_INFO> foreign_key_iterator(f_key_list);
678 while ((f_key_info=foreign_key_iterator++))
679 {
680 /*
681 The possible values for update_method are
682 {"CASCADE", "SET NULL", "NO ACTION", "RESTRICT"}.
683
684 Hence we are avoiding the usage of strncmp
685 ("'update_method' value with 'CASCADE' or 'SET NULL'") and just comparing
686 the first character of the update_method value with 'C' or 'S'.
687 */
688 if (f_key_info->update_method->str[0] == 'C' ||
689 f_key_info->delete_method->str[0] == 'C' ||
690 f_key_info->update_method->str[0] == 'S' ||
691 f_key_info->delete_method->str[0] == 'S')
692 {
693 assert(!strncmp(f_key_info->update_method->str, "CASCADE", 7) ||
694 !strncmp(f_key_info->delete_method->str, "CASCADE", 7) ||
695 !strncmp(f_key_info->update_method->str, "SET NUL", 7) ||
696 !strncmp(f_key_info->delete_method->str, "SET NUL", 7));
697 DBUG_RETURN(TRUE);
698 }
699 }
700 DBUG_RETURN(FALSE);
701 }
702
703 /**
704 Helper method to create table information for the hook call
705 */
706 void
prepare_table_info(THD * thd,Trans_table_info * & table_info_list,uint & number_of_tables)707 Trans_delegate::prepare_table_info(THD* thd,
708 Trans_table_info*& table_info_list,
709 uint& number_of_tables)
710 {
711 DBUG_ENTER("Trans_delegate::prepare_table_info");
712
713 TABLE* open_tables= thd->open_tables;
714
715 // Fail if tables are not open
716 if(open_tables == NULL)
717 {
718 DBUG_VOID_RETURN;
719 }
720
721 //Gather table information
722 std::vector<Trans_table_info> table_info_holder;
723 for(; open_tables != NULL; open_tables= open_tables->next)
724 {
725 Trans_table_info table_info = {0,0,0,0};
726
727 if (open_tables->no_replicate)
728 {
729 continue;
730 }
731
732 table_info.table_name= open_tables->s->table_name.str;
733
734 uint primary_keys= 0;
735 if(open_tables->key_info != NULL && (open_tables->s->primary_key < MAX_KEY))
736 {
737 primary_keys= open_tables->s->primary_key;
738
739 //if primary keys is still 0, lets double check on another var
740 if(primary_keys == 0)
741 {
742 primary_keys= open_tables->key_info->user_defined_key_parts;
743 }
744 }
745
746 table_info.number_of_primary_keys= primary_keys;
747
748 table_info.db_type= open_tables->s->db_type()->db_type;
749
750 /*
751 Find out if the table has foreign key with ON UPDATE/DELETE CASCADE
752 clause.
753 */
754 table_info.has_cascade_foreign_key= has_cascade_foreign_key(open_tables, thd);
755
756 table_info_holder.push_back(table_info);
757 }
758
759 //Now that one has all the information, one should build the
760 // data that will be delivered to the plugin
761 if(table_info_holder.size() > 0)
762 {
763 number_of_tables= table_info_holder.size();
764
765 table_info_list= (Trans_table_info*)my_malloc(
766 PSI_NOT_INSTRUMENTED,
767 number_of_tables * sizeof(Trans_table_info),
768 MYF(0));
769
770 std::vector<Trans_table_info>::iterator table_info_holder_it
771 = table_info_holder.begin();
772 for(int table= 0;
773 table_info_holder_it != table_info_holder.end();
774 table_info_holder_it++, table++)
775 {
776 table_info_list[table].number_of_primary_keys
777 = (*table_info_holder_it).number_of_primary_keys;
778 table_info_list[table].table_name
779 = (*table_info_holder_it).table_name;
780 table_info_list[table].db_type
781 = (*table_info_holder_it).db_type;
782 table_info_list[table].has_cascade_foreign_key
783 = (*table_info_holder_it).has_cascade_foreign_key;
784 }
785 }
786
787 DBUG_VOID_RETURN;
788 }
789
790 /**
791 Helper that gathers all table runtime information
792
793 @param[in] THD the current execution thread
794 @param[out] ctx_info Trans_context_info in which the result is stored.
795 */
prepare_transaction_context(THD * thd,Trans_context_info & ctx_info)796 void prepare_transaction_context(THD* thd, Trans_context_info& ctx_info)
797 {
798 //Extracting the session value of SQL binlogging
799 ctx_info.binlog_enabled= thd->variables.sql_log_bin;
800
801 //Extracting the session value of binlog format
802 ctx_info.binlog_format= thd->variables.binlog_format;
803
804 //Extracting the global mutable value of binlog checksum
805 ctx_info.binlog_checksum_options= binlog_checksum_options;
806
807 //Extracting the session value of transaction_write_set_extraction
808 ctx_info.transaction_write_set_extraction=
809 thd->variables.transaction_write_set_extraction;
810
811 //Extracting transaction isolation level
812 ctx_info.tx_isolation= thd->tx_isolation;
813 }
814
before_dml(THD * thd,int & result)815 int Trans_delegate::before_dml(THD* thd, int& result)
816 {
817 DBUG_ENTER("Trans_delegate::before_dml");
818 Trans_param param;
819 TRANS_PARAM_ZERO(param);
820
821 param.server_id= thd->server_id;
822 param.server_uuid= server_uuid;
823 param.thread_id= thd->thread_id();
824
825 prepare_table_info(thd, param.tables_info, param.number_of_tables);
826 prepare_transaction_context(thd, param.trans_ctx_info);
827
828 int ret= 0;
829 FOREACH_OBSERVER_ERROR_OUT(ret, before_dml, thd, ¶m, result);
830
831 my_free(param.tables_info);
832
833 DBUG_RETURN(ret);
834 }
835
before_rollback(THD * thd,bool all)836 int Trans_delegate::before_rollback(THD *thd, bool all)
837 {
838 DBUG_ENTER("Trans_delegate::before_rollback");
839 Trans_param param;
840 TRANS_PARAM_ZERO(param);
841 param.server_id= thd->server_id;
842 param.server_uuid= server_uuid;
843 param.thread_id= thd->thread_id();
844 param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
845
846 bool is_real_trans=
847 (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
848 if (is_real_trans)
849 param.flags|= TRANS_IS_REAL_TRANS;
850
851 int ret= 0;
852 FOREACH_OBSERVER(ret, before_rollback, thd, (¶m));
853 DBUG_RETURN(ret);
854 }
855
after_commit(THD * thd,bool all)856 int Trans_delegate::after_commit(THD *thd, bool all)
857 {
858 DBUG_ENTER("Trans_delegate::after_commit");
859 Trans_param param;
860 TRANS_PARAM_ZERO(param);
861 param.server_uuid= server_uuid;
862 param.thread_id= thd->thread_id();
863 param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
864
865 bool is_real_trans=
866 (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
867 if (is_real_trans)
868 param.flags|= TRANS_IS_REAL_TRANS;
869
870 thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos);
871 param.server_id= thd->server_id;
872
873 DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos));
874 DEBUG_SYNC(thd, "before_call_after_commit_observer");
875
876 int ret= 0;
877 FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
878 DBUG_RETURN(ret);
879 }
880
after_rollback(THD * thd,bool all)881 int Trans_delegate::after_rollback(THD *thd, bool all)
882 {
883 DBUG_ENTER("Trans_delegate::after_rollback");
884 Trans_param param;
885 TRANS_PARAM_ZERO(param);
886 param.server_uuid= server_uuid;
887 param.thread_id= thd->thread_id();
888 param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
889
890 bool is_real_trans=
891 (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
892 if (is_real_trans)
893 param.flags|= TRANS_IS_REAL_TRANS;
894 thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos);
895 param.server_id= thd->server_id;
896
897 int ret= 0;
898 FOREACH_OBSERVER(ret, after_rollback, thd, (¶m));
899 DBUG_RETURN(ret);
900 }
901
after_flush(THD * thd,const char * log_file,my_off_t log_pos)902 int Binlog_storage_delegate::after_flush(THD *thd,
903 const char *log_file,
904 my_off_t log_pos)
905 {
906 DBUG_ENTER("Binlog_storage_delegate::after_flush");
907 DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
908 log_file, (ulonglong) log_pos));
909 Binlog_storage_param param;
910 param.server_id= thd->server_id;
911
912 int ret= 0;
913 FOREACH_OBSERVER(ret, after_flush, thd, (¶m, log_file, log_pos));
914 DBUG_RETURN(ret);
915 }
916
917 /**
918 * This hook MUST be invoked after ALL recovery operations are performed
919 * and the server is ready to serve clients.
920 *
921 * @param[in] thd The thread context.
922 * @return 0 on success, >0 otherwise.
923 */
before_handle_connection(THD * thd)924 int Server_state_delegate::before_handle_connection(THD *thd)
925 {
926 DBUG_ENTER("Server_state_delegate::before_client_connection");
927 Server_state_param param;
928
929 int ret= 0;
930 FOREACH_OBSERVER(ret, before_handle_connection, thd, (¶m));
931 DBUG_RETURN(ret);
932
933 }
934
935 /**
936 * This hook MUST be invoked before ANY recovery action is started.
937 *
938 * @param[in] thd The thread context.
939 * @return 0 on success, >0 otherwise.
940 */
before_recovery(THD * thd)941 int Server_state_delegate::before_recovery(THD *thd)
942 {
943 DBUG_ENTER("Server_state_delegate::before_recovery");
944 Server_state_param param;
945
946 int ret= 0;
947 FOREACH_OBSERVER(ret, before_recovery, thd, (¶m));
948 DBUG_RETURN(ret);
949 }
950
951 /**
952 * This hook MUST be invoked after the recovery from the engine
953 * is complete.
954 *
955 * @param[in] thd The thread context.
956 * @return 0 on success, >0 otherwise.
957 */
after_engine_recovery(THD * thd)958 int Server_state_delegate::after_engine_recovery(THD *thd)
959 {
960 DBUG_ENTER("Server_state_delegate::after_engine_recovery");
961 Server_state_param param;
962
963 int ret= 0;
964 FOREACH_OBSERVER(ret, after_engine_recovery, thd, (¶m));
965 DBUG_RETURN(ret);
966
967 }
968
969 /**
970 * This hook MUST be invoked after the server has completed the
971 * local recovery. The server can proceed with the further operations
972 * like engaging in distributed recovery etc.
973 *
974 * @param[in] thd The thread context.
975 * @return 0 on success, >0 otherwise.
976 */
after_recovery(THD * thd)977 int Server_state_delegate::after_recovery(THD *thd)
978 {
979 DBUG_ENTER("Server_state_delegate::after_recovery");
980 Server_state_param param;
981
982 int ret= 0;
983 FOREACH_OBSERVER(ret, after_recovery, thd, (¶m));
984 DBUG_RETURN(ret);
985 }
986
987 /**
988 * This hook MUST be invoked before server shutdown action is
989 * initiated.
990 *
991 * @param[in] thd The thread context.
992 * @return 0 on success, >0 otherwise.
993 */
before_server_shutdown(THD * thd)994 int Server_state_delegate::before_server_shutdown(THD *thd)
995 {
996 DBUG_ENTER("Server_state_delegate::before_server_shutdown");
997 Server_state_param param;
998
999 int ret= 0;
1000 FOREACH_OBSERVER(ret, before_server_shutdown, thd, (¶m));
1001 DBUG_RETURN(ret);
1002 }
1003
1004 /**
1005 * This hook MUST be invoked after server shutdown operation is
1006 * complete.
1007 *
1008 * @param[in] thd The thread context.
1009 * @return 0 on success, >0 otherwise.
1010 */
after_server_shutdown(THD * thd)1011 int Server_state_delegate::after_server_shutdown(THD *thd)
1012 {
1013 DBUG_ENTER("Server_state_delegate::after_server_shutdown");
1014 Server_state_param param;
1015
1016 int ret= 0;
1017 FOREACH_OBSERVER(ret, after_server_shutdown, thd, (¶m));
1018 DBUG_RETURN(ret);
1019 }
1020
after_sync(THD * thd,const char * log_file,my_off_t log_pos)1021 int Binlog_storage_delegate::after_sync(THD *thd,
1022 const char *log_file,
1023 my_off_t log_pos)
1024 {
1025 DBUG_ENTER("Binlog_storage_delegate::after_sync");
1026 DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
1027 log_file, (ulonglong) log_pos));
1028 Binlog_storage_param param;
1029 param.server_id= thd->server_id;
1030
1031 assert(log_pos != 0);
1032 int ret= 0;
1033 FOREACH_OBSERVER(ret, after_sync, thd, (¶m, log_file, log_pos));
1034
1035 DEBUG_SYNC(thd, "after_call_after_sync_observer");
1036 DBUG_RETURN(ret);
1037 }
1038
1039 #ifdef HAVE_REPLICATION
transmit_start(THD * thd,ushort flags,const char * log_file,my_off_t log_pos,bool * observe_transmission)1040 int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
1041 const char *log_file,
1042 my_off_t log_pos,
1043 bool *observe_transmission)
1044 {
1045 Binlog_transmit_param param;
1046 param.flags= flags;
1047 param.server_id= thd->server_id;
1048
1049 int ret= 0;
1050 FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos));
1051 *observe_transmission= param.should_observe();
1052 return ret;
1053 }
1054
transmit_stop(THD * thd,ushort flags)1055 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
1056 {
1057 Binlog_transmit_param param;
1058 param.flags= flags;
1059 param.server_id= thd->server_id;
1060
1061 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
1062
1063 int ret= 0;
1064 FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m));
1065 return ret;
1066 }
1067
reserve_header(THD * thd,ushort flags,String * packet)1068 int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
1069 String *packet)
1070 {
1071 /* NOTE2ME: Maximum extra header size for each observer, I hope 32
1072 bytes should be enough for each Observer to reserve their extra
1073 header. If later found this is not enough, we can increase this
1074 /HEZX
1075 */
1076 #define RESERVE_HEADER_SIZE 32
1077 unsigned char header[RESERVE_HEADER_SIZE];
1078 ulong hlen;
1079 Binlog_transmit_param param;
1080 param.flags= flags;
1081 param.server_id= thd->server_id;
1082
1083 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
1084
1085 int ret= 0;
1086 read_lock();
1087 Observer_info_iterator iter= observer_info_iter();
1088 Observer_info *info= iter++;
1089 for (; info; info= iter++)
1090 {
1091 bool replication_optimize_for_static_plugin_config=
1092 this->use_spin_lock_type();
1093 plugin_ref plugin= (replication_optimize_for_static_plugin_config
1094 ? info->plugin
1095 : my_plugin_lock(thd, &info->plugin));
1096 if (!plugin)
1097 {
1098 ret= 1;
1099 break;
1100 }
1101 hlen= 0;
1102 if (((Observer *)info->observer)->reserve_header
1103 && ((Observer *)info->observer)->reserve_header(¶m,
1104 header,
1105 RESERVE_HEADER_SIZE,
1106 &hlen))
1107 {
1108 ret= 1;
1109 if (!replication_optimize_for_static_plugin_config)
1110 plugin_unlock(thd, plugin);
1111 break;
1112 }
1113 if (!replication_optimize_for_static_plugin_config)
1114 plugin_unlock(thd, plugin);
1115 if (hlen == 0)
1116 continue;
1117 if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
1118 {
1119 ret= 1;
1120 break;
1121 }
1122 }
1123 unlock();
1124 return ret;
1125 }
1126
before_send_event(THD * thd,ushort flags,String * packet,const char * log_file,my_off_t log_pos)1127 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
1128 String *packet,
1129 const char *log_file,
1130 my_off_t log_pos)
1131 {
1132 Binlog_transmit_param param;
1133 param.flags= flags;
1134 param.server_id= thd->server_id;
1135
1136 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
1137
1138 int ret= 0;
1139 FOREACH_OBSERVER(ret, before_send_event, thd,
1140 (¶m, (uchar *)packet->ptr(),
1141 packet->length(),
1142 log_file+dirname_length(log_file), log_pos));
1143 return ret;
1144 }
1145
after_send_event(THD * thd,ushort flags,String * packet,const char * skipped_log_file,my_off_t skipped_log_pos)1146 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
1147 String *packet,
1148 const char *skipped_log_file,
1149 my_off_t skipped_log_pos)
1150 {
1151 Binlog_transmit_param param;
1152 param.flags= flags;
1153 param.server_id= thd->server_id;
1154
1155 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
1156
1157 int ret= 0;
1158 FOREACH_OBSERVER(ret, after_send_event, thd,
1159 (¶m, packet->ptr(), packet->length(),
1160 skipped_log_file+dirname_length(skipped_log_file),
1161 skipped_log_pos));
1162 return ret;
1163 }
1164
after_reset_master(THD * thd,ushort flags)1165 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
1166
1167 {
1168 Binlog_transmit_param param;
1169 param.flags= flags;
1170 param.server_id= thd->server_id;
1171
1172 int ret= 0;
1173 FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m));
1174 return ret;
1175 }
1176
init_param(Binlog_relay_IO_param * param,Master_info * mi)1177 void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
1178 Master_info *mi)
1179 {
1180 param->mysql= mi->mysql;
1181 param->channel_name= mi->get_channel();
1182 param->user= const_cast<char *>(mi->get_user());
1183 param->host= mi->host;
1184 param->port= mi->port;
1185 param->master_log_name= const_cast<char *>(mi->get_master_log_name());
1186 param->master_log_pos= mi->get_master_log_pos();
1187 }
1188
thread_start(THD * thd,Master_info * mi)1189 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
1190 {
1191 Binlog_relay_IO_param param;
1192 init_param(¶m, mi);
1193 param.server_id= thd->server_id;
1194 param.thread_id= thd->thread_id();
1195
1196 int ret= 0;
1197 FOREACH_OBSERVER(ret, thread_start, thd, (¶m));
1198 return ret;
1199 }
1200
1201
thread_stop(THD * thd,Master_info * mi)1202 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
1203 {
1204
1205 Binlog_relay_IO_param param;
1206 init_param(¶m, mi);
1207 param.server_id= thd->server_id;
1208 param.thread_id= thd->thread_id();
1209
1210 int ret= 0;
1211 FOREACH_OBSERVER(ret, thread_stop, thd, (¶m));
1212 return ret;
1213 }
1214
applier_start(THD * thd,Master_info * mi)1215 int Binlog_relay_IO_delegate::applier_start(THD *thd, Master_info *mi)
1216 {
1217 Binlog_relay_IO_param param;
1218 init_param(¶m, mi);
1219 param.server_id= thd->server_id;
1220 param.thread_id= thd->thread_id();
1221
1222 int ret= 0;
1223 FOREACH_OBSERVER(ret, applier_start, thd, (¶m));
1224 return ret;
1225 }
1226
applier_stop(THD * thd,Master_info * mi,bool aborted)1227 int Binlog_relay_IO_delegate::applier_stop(THD *thd,
1228 Master_info *mi,
1229 bool aborted)
1230 {
1231 Binlog_relay_IO_param param;
1232 init_param(¶m, mi);
1233 param.server_id= thd->server_id;
1234 param.thread_id= thd->thread_id();
1235
1236 int ret= 0;
1237 FOREACH_OBSERVER(ret, applier_stop, thd, (¶m, aborted));
1238 return ret;
1239 }
1240
before_request_transmit(THD * thd,Master_info * mi,ushort flags)1241 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
1242 Master_info *mi,
1243 ushort flags)
1244 {
1245 Binlog_relay_IO_param param;
1246 init_param(¶m, mi);
1247 param.server_id= thd->server_id;
1248 param.thread_id= thd->thread_id();
1249
1250 int ret= 0;
1251 FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags));
1252 return ret;
1253 }
1254
after_read_event(THD * thd,Master_info * mi,const char * packet,ulong len,const char ** event_buf,ulong * event_len)1255 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
1256 const char *packet, ulong len,
1257 const char **event_buf,
1258 ulong *event_len)
1259 {
1260 Binlog_relay_IO_param param;
1261 init_param(¶m, mi);
1262 param.server_id= thd->server_id;
1263 param.thread_id= thd->thread_id();
1264
1265 int ret= 0;
1266 FOREACH_OBSERVER(ret, after_read_event, thd,
1267 (¶m, packet, len, event_buf, event_len));
1268 return ret;
1269 }
1270
after_queue_event(THD * thd,Master_info * mi,const char * event_buf,ulong event_len,bool synced)1271 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
1272 const char *event_buf,
1273 ulong event_len,
1274 bool synced)
1275 {
1276 Binlog_relay_IO_param param;
1277 init_param(¶m, mi);
1278 param.server_id= thd->server_id;
1279 param.thread_id= thd->thread_id();
1280
1281 uint32 flags=0;
1282 if (synced)
1283 flags |= BINLOG_STORAGE_IS_SYNCED;
1284
1285 int ret= 0;
1286 FOREACH_OBSERVER(ret, after_queue_event, thd,
1287 (¶m, event_buf, event_len, flags));
1288 return ret;
1289 }
1290
after_reset_slave(THD * thd,Master_info * mi)1291 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
1292
1293 {
1294 Binlog_relay_IO_param param;
1295 init_param(¶m, mi);
1296 param.server_id= thd->server_id;
1297 param.thread_id= thd->thread_id();
1298
1299 int ret= 0;
1300 FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m));
1301 return ret;
1302 }
1303 #endif /* HAVE_REPLICATION */
1304
register_trans_observer(Trans_observer * observer,void * p)1305 int register_trans_observer(Trans_observer *observer, void *p)
1306 {
1307 return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
1308 }
1309
unregister_trans_observer(Trans_observer * observer,void * p)1310 int unregister_trans_observer(Trans_observer *observer, void *p)
1311 {
1312 return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
1313 }
1314
register_binlog_storage_observer(Binlog_storage_observer * observer,void * p)1315 int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
1316 {
1317 DBUG_ENTER("register_binlog_storage_observer");
1318 int result= binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
1319 DBUG_RETURN(result);
1320 }
1321
unregister_binlog_storage_observer(Binlog_storage_observer * observer,void * p)1322 int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
1323 {
1324 return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
1325 }
1326
register_server_state_observer(Server_state_observer * observer,void * plugin_var)1327 int register_server_state_observer(Server_state_observer *observer, void *plugin_var)
1328 {
1329 DBUG_ENTER("register_server_state_observer");
1330 int result= server_state_delegate->add_observer(observer, (st_plugin_int *)plugin_var);
1331 DBUG_RETURN(result);
1332 }
1333
unregister_server_state_observer(Server_state_observer * observer,void * plugin_var)1334 int unregister_server_state_observer(Server_state_observer *observer, void *plugin_var)
1335 {
1336 DBUG_ENTER("unregister_server_state_observer");
1337 int result= server_state_delegate->remove_observer(observer, (st_plugin_int *)plugin_var);
1338 DBUG_RETURN(result);
1339 }
1340
1341 #ifdef HAVE_REPLICATION
register_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)1342 int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
1343 {
1344 return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
1345 }
1346
unregister_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)1347 int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
1348 {
1349 return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
1350 }
1351
register_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)1352 int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
1353 {
1354 return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
1355 }
1356
unregister_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)1357 int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
1358 {
1359 return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
1360 }
1361 #endif /* HAVE_REPLICATION */
1362