1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "rpl_handler.h"
24 
25 #include "debug_sync.h"        // DEBUG_SYNC
26 #include "log.h"               // sql_print_error
27 #include "replication.h"       // Trans_param
28 #include "rpl_mi.h"            // Master_info
29 #include "sql_class.h"         // THD
30 #include "sql_plugin.h"        // plugin_int_to_ref
31 
32 
33 #include <vector>
34 
35 Trans_delegate *transaction_delegate;
36 Binlog_storage_delegate *binlog_storage_delegate;
37 Server_state_delegate *server_state_delegate;
38 
39 #ifdef HAVE_REPLICATION
40 Binlog_transmit_delegate *binlog_transmit_delegate;
41 Binlog_relay_IO_delegate *binlog_relay_io_delegate;
42 #endif /* HAVE_REPLICATION */
43 
44 bool opt_replication_optimize_for_static_plugin_config= 0;
45 bool opt_replication_sender_observe_commit_only= 0;
46 int32 opt_atomic_replication_sender_observe_commit_only= 0;
47 
Observer_info(void * ob,st_plugin_int * p)48 Observer_info::Observer_info(void *ob, st_plugin_int *p)
49   : observer(ob), plugin_int(p)
50 {
51   plugin= plugin_int_to_ref(plugin_int);
52 }
53 
Delegate(PSI_rwlock_key key)54 Delegate::Delegate(
55 #ifdef HAVE_PSI_INTERFACE
56     PSI_rwlock_key key
57 #endif
58 )
59 {
60   inited= FALSE;
61   my_atomic_fas32(&m_configured_lock_type,
62                   opt_replication_optimize_for_static_plugin_config
63                       ? DELEGATE_SPIN_LOCK
64                       : DELEGATE_OS_LOCK);
65   my_atomic_store32(&m_acquired_locks, 0);
66 #ifdef HAVE_PSI_INTERFACE
67   if (mysql_rwlock_init(key, &lock)) return;
68 #else
69   if (mysql_rwlock_init(0, &lock)) return;
70 #endif
71   init_sql_alloc(key_memory_delegate, &memroot, 1024, 0);
72   inited= TRUE;
73 }
74 
~Delegate()75 Delegate::~Delegate()
76 {
77   inited= FALSE;
78   mysql_rwlock_destroy(&lock);
79   free_root(&memroot, MYF(0));
80 }
81 
add_observer(void * observer,st_plugin_int * plugin)82 int Delegate::add_observer(void *observer, st_plugin_int *plugin)
83 {
84   int ret= FALSE;
85   if (!inited) return TRUE;
86   write_lock();
87   Observer_info_iterator iter(observer_info_list);
88   Observer_info *info= iter++;
89   while (info && info->observer != observer) info= iter++;
90   if (!info)
91   {
92     info= new Observer_info(observer, plugin);
93     if (!info || observer_info_list.push_back(info, &memroot))
94       ret= TRUE;
95     else if (this->use_spin_lock_type())
96       acquire_plugin_ref_count(info);
97   }
98   else
99     ret= TRUE;
100   unlock();
101   return ret;
102 }
103 
remove_observer(void * observer,st_plugin_int * plugin)104 int Delegate::remove_observer(void *observer, st_plugin_int *plugin)
105 {
106   int ret= FALSE;
107   if (!inited) return TRUE;
108   write_lock();
109   Observer_info_iterator iter(observer_info_list);
110   Observer_info *info= iter++;
111   while (info && info->observer != observer) info= iter++;
112   if (info)
113   {
114     iter.remove();
115     delete info;
116   }
117   else
118     ret= TRUE;
119   unlock();
120   return ret;
121 }
122 
observer_info_iter()123 Delegate::Observer_info_iterator Delegate::observer_info_iter()
124 {
125   return Observer_info_iterator(observer_info_list);
126 }
127 
is_empty()128 bool Delegate::is_empty()
129 {
130   DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty()));
131   return observer_info_list.is_empty();
132 }
133 
read_lock()134 int Delegate::read_lock()
135 {
136   if (!inited) return 1;
137   this->lock_it(DELEGATE_LOCK_MODE_SHARED);
138   return 0;
139 }
140 
write_lock()141 int Delegate::write_lock()
142 {
143   if (!inited) return 1;
144   this->lock_it(DELEGATE_LOCK_MODE_EXCLUSIVE);
145   return 0;
146 }
147 
unlock()148 int Delegate::unlock()
149 {
150   if (!inited) return 1;
151 
152   int result= 0;
153 
154   if (my_atomic_load32(&m_acquired_locks) > 0)
155   {
156     my_atomic_add32(&m_acquired_locks, -DELEGATE_SPIN_LOCK);
157     if (m_spin_lock.is_exclusive_acquisition())
158       m_spin_lock.release_exclusive();
159     else
160     {
161       assert(m_spin_lock.is_shared_acquisition());
162       m_spin_lock.release_shared();
163     }
164   }
165   else
166   {
167     assert(my_atomic_load32(&m_acquired_locks) < 0);
168     my_atomic_add32(&m_acquired_locks, -DELEGATE_OS_LOCK);
169     result= mysql_rwlock_unlock(&lock);
170   }
171 
172   return result;
173 }
174 
is_inited()175 bool Delegate::is_inited() { return inited; }
176 
update_lock_type()177 void Delegate::update_lock_type()
178 {
179   if (!inited) return;
180 
181   int32 opt_value= opt_replication_optimize_for_static_plugin_config
182                        ? DELEGATE_SPIN_LOCK
183                        : DELEGATE_OS_LOCK;
184   my_atomic_fas32(&m_configured_lock_type, opt_value);
185 }
186 
update_plugin_ref_count()187 void Delegate::update_plugin_ref_count()
188 {
189   if (!inited) return;
190   int32 opt_value= opt_replication_optimize_for_static_plugin_config
191                        ? DELEGATE_SPIN_LOCK
192                        : DELEGATE_OS_LOCK;
193   int32 intern_value= my_atomic_load32(&m_configured_lock_type);
194 
195   if (intern_value == DELEGATE_SPIN_LOCK && opt_value == DELEGATE_OS_LOCK)
196   {
197     for (std::map<plugin_ref, size_t>::iterator ref=
198              m_acquired_references.begin();
199          ref != m_acquired_references.end(); ++ref)
200     {
201       for (size_t count= ref->second; count != 0; --count)
202         plugin_unlock(NULL, ref->first);
203     }
204     m_acquired_references.clear();
205   }
206   else if (intern_value == DELEGATE_OS_LOCK && opt_value == DELEGATE_SPIN_LOCK)
207   {
208     Observer_info_iterator iter= observer_info_iter();
209     for (Observer_info *info= iter++; info; info= iter++)
210     {
211       acquire_plugin_ref_count(info);
212     }
213   }
214 }
215 
use_rw_lock_type()216 bool Delegate::use_rw_lock_type()
217 {
218   return my_atomic_load32(&m_acquired_locks) <
219              0 ||  // If there are acquisitions using the read-write lock
220          (my_atomic_load32(&m_configured_lock_type) ==
221               DELEGATE_OS_LOCK &&  // or the lock type has been set to use the
222                                    // read-write lock
223           my_atomic_load32(&m_acquired_locks) ==
224               0);  // and there are no outstanding acquisitions using shared
225                    // spin-lock, use the read-write lock
226 }
227 
use_spin_lock_type()228 bool Delegate::use_spin_lock_type()
229 {
230   return my_atomic_load32(&m_acquired_locks) >
231              0 ||  // If there are acquisitions using the shared spin-lock
232          (my_atomic_load32(&m_configured_lock_type) ==
233               DELEGATE_SPIN_LOCK &&  // or the lock type has been set to use the
234                                      // shared spin-lock
235           my_atomic_load32(&m_acquired_locks) ==
236               0);  // and there are no outstanding acquisitions using read-write
237                    // lock, use the shared spin-lock
238 }
239 
acquire_plugin_ref_count(Observer_info * info)240 void Delegate::acquire_plugin_ref_count(Observer_info *info)
241 {
242   plugin_ref internal_ref= plugin_lock(NULL, &info->plugin);
243   ++(m_acquired_references[internal_ref]);
244 }
245 
lock_it(enum_delegate_lock_mode mode)246 void Delegate::lock_it(enum_delegate_lock_mode mode)
247 {
248   do
249   {
250     if (this->use_spin_lock_type())
251     {
252       if (mode == DELEGATE_LOCK_MODE_SHARED)
253         m_spin_lock.acquire_shared();
254       else
255         m_spin_lock.acquire_exclusive();
256 
257       if (my_atomic_load32(&m_configured_lock_type) !=
258           DELEGATE_SPIN_LOCK)  // Lock type changed in the meanwhile, lets
259                                // revert the acquisition and try again
260       {
261         if (mode == DELEGATE_LOCK_MODE_SHARED)
262           m_spin_lock.release_shared();
263         else
264           m_spin_lock.release_exclusive();
265       }
266       else
267       {
268         my_atomic_add32(&m_acquired_locks, DELEGATE_SPIN_LOCK);
269         break;
270       }
271     }
272     if (this->use_rw_lock_type())
273     {
274       if (mode == DELEGATE_LOCK_MODE_SHARED)
275         mysql_rwlock_rdlock(&lock);
276       else
277         mysql_rwlock_wrlock(&lock);
278 
279       if (my_atomic_load32(&m_configured_lock_type) !=
280           DELEGATE_OS_LOCK)  // Lock type changed in the meanwhile, lets revert
281                              // the acquisition and try again
282         mysql_rwlock_unlock(&lock);
283       else
284       {
285         my_atomic_add32(&m_acquired_locks, DELEGATE_OS_LOCK);
286         break;
287       }
288     }
289   } while (true);
290 }
291 
292 /*
293   structure to save transaction log filename and position
294 */
295 typedef struct Trans_binlog_info {
296   my_off_t log_pos;
297   char log_file[FN_REFLEN];
298 } Trans_binlog_info;
299 
get_user_var_int(const char * name,long long int * value,int * null_value)300 int get_user_var_int(const char *name,
301                      long long int *value, int *null_value)
302 {
303   my_bool null_val;
304   THD *thd= current_thd;
305 
306   /* Protects thd->user_vars. */
307   mysql_mutex_lock(&thd->LOCK_thd_data);
308 
309   user_var_entry *entry=
310     (user_var_entry*) my_hash_search(&thd->user_vars,
311                                   (uchar*) name, strlen(name));
312   if (!entry)
313   {
314     mysql_mutex_unlock(&thd->LOCK_thd_data);
315     return 1;
316   }
317   *value= entry->val_int(&null_val);
318   if (null_value)
319     *null_value= null_val;
320   mysql_mutex_unlock(&thd->LOCK_thd_data);
321   return 0;
322 }
323 
get_user_var_real(const char * name,double * value,int * null_value)324 int get_user_var_real(const char *name,
325                       double *value, int *null_value)
326 {
327   my_bool null_val;
328   THD *thd= current_thd;
329 
330   /* Protects thd->user_vars. */
331   mysql_mutex_lock(&thd->LOCK_thd_data);
332 
333   user_var_entry *entry=
334     (user_var_entry*) my_hash_search(&thd->user_vars,
335                                   (uchar*) name, strlen(name));
336   if (!entry)
337   {
338     mysql_mutex_unlock(&thd->LOCK_thd_data);
339     return 1;
340   }
341   *value= entry->val_real(&null_val);
342   if (null_value)
343     *null_value= null_val;
344   mysql_mutex_unlock(&thd->LOCK_thd_data);
345   return 0;
346 }
347 
get_user_var_str(const char * name,char * value,size_t len,unsigned int precision,int * null_value)348 int get_user_var_str(const char *name, char *value,
349                      size_t len, unsigned int precision, int *null_value)
350 {
351   String str;
352   my_bool null_val;
353   THD *thd= current_thd;
354 
355   /* Protects thd->user_vars. */
356   mysql_mutex_lock(&thd->LOCK_thd_data);
357 
358   user_var_entry *entry=
359     (user_var_entry*) my_hash_search(&thd->user_vars,
360                                   (uchar*) name, strlen(name));
361   if (!entry)
362   {
363     mysql_mutex_unlock(&thd->LOCK_thd_data);
364     return 1;
365   }
366   entry->val_str(&null_val, &str, precision);
367   strncpy(value, str.c_ptr(), len);
368   if (null_value)
369     *null_value= null_val;
370   mysql_mutex_unlock(&thd->LOCK_thd_data);
371   return 0;
372 }
373 
delegates_init()374 int delegates_init()
375 {
376   static my_aligned_storage<sizeof(Trans_delegate),
377                             MY_ALIGNOF(longlong)> trans_mem;
378   static my_aligned_storage<sizeof(Binlog_storage_delegate),
379                             MY_ALIGNOF(longlong)> storage_mem;
380   static my_aligned_storage<sizeof(Server_state_delegate),
381                             MY_ALIGNOF(longlong)> server_state_mem;
382 #ifdef HAVE_REPLICATION
383   static my_aligned_storage<sizeof(Binlog_transmit_delegate),
384                             MY_ALIGNOF(longlong)> transmit_mem;
385   static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
386                             MY_ALIGNOF(longlong)> relay_io_mem;
387 #endif
388 
389   void *place_trans_mem= trans_mem.data;
390   void *place_storage_mem= storage_mem.data;
391   void *place_state_mem= server_state_mem.data;
392 
393   transaction_delegate= new (place_trans_mem) Trans_delegate;
394 
395   if (!transaction_delegate->is_inited())
396   {
397     sql_print_error("Initialization of transaction delegates failed. "
398                     "Please report a bug.");
399     return 1;
400   }
401 
402   binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
403 
404   if (!binlog_storage_delegate->is_inited())
405   {
406     sql_print_error("Initialization binlog storage delegates failed. "
407                     "Please report a bug.");
408     return 1;
409   }
410 
411   server_state_delegate= new (place_state_mem) Server_state_delegate;
412 
413 #ifdef HAVE_REPLICATION
414   void *place_transmit_mem= transmit_mem.data;
415   void *place_relay_io_mem= relay_io_mem.data;
416 
417   binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
418 
419   if (!binlog_transmit_delegate->is_inited())
420   {
421     sql_print_error("Initialization of binlog transmit delegates failed. "
422                     "Please report a bug.");
423     return 1;
424   }
425 
426   binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
427 
428   if (!binlog_relay_io_delegate->is_inited())
429   {
430     sql_print_error("Initialization binlog relay IO delegates failed. "
431                     "Please report a bug.");
432     return 1;
433   }
434 #endif
435 
436   return 0;
437 }
438 
delegates_shutdown()439 void delegates_shutdown()
440 {
441   if (opt_replication_optimize_for_static_plugin_config)
442   {
443     opt_replication_optimize_for_static_plugin_config= false;
444     delegates_acquire_locks();
445     delegates_update_lock_type();
446     delegates_release_locks();
447   }
448 }
449 
delegates_destroy()450 void delegates_destroy()
451 {
452   if (transaction_delegate)
453     transaction_delegate->~Trans_delegate();
454   if (binlog_storage_delegate)
455     binlog_storage_delegate->~Binlog_storage_delegate();
456   if (server_state_delegate)
457     server_state_delegate->~Server_state_delegate();
458 #ifdef HAVE_REPLICATION
459   if (binlog_transmit_delegate)
460     binlog_transmit_delegate->~Binlog_transmit_delegate();
461   if (binlog_relay_io_delegate)
462     binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
463 #endif /* HAVE_REPLICATION */
464 }
465 
delegates_update_plugin_ref_count()466 static void delegates_update_plugin_ref_count()
467 {
468   if (transaction_delegate)
469     transaction_delegate->update_plugin_ref_count();
470   if (binlog_storage_delegate)
471     binlog_storage_delegate->update_plugin_ref_count();
472   if (server_state_delegate)
473     server_state_delegate->update_plugin_ref_count();
474 #ifdef HAVE_REPLICATION
475   if (binlog_transmit_delegate)
476     binlog_transmit_delegate->update_plugin_ref_count();
477   if (binlog_relay_io_delegate)
478     binlog_relay_io_delegate->update_plugin_ref_count();
479 #endif /* HAVE_REPLICATION */
480 }
481 
delegates_acquire_locks()482 void delegates_acquire_locks()
483 {
484   if (transaction_delegate)
485     transaction_delegate->write_lock();
486   if (binlog_storage_delegate)
487     binlog_storage_delegate->write_lock();
488   if (server_state_delegate)
489     server_state_delegate->write_lock();
490 #ifdef HAVE_REPLICATION
491   if (binlog_transmit_delegate)
492     binlog_transmit_delegate->write_lock();
493   if (binlog_relay_io_delegate)
494     binlog_relay_io_delegate->write_lock();
495 #endif /* HAVE_REPLICATION */
496 }
497 
delegates_release_locks()498 void delegates_release_locks()
499 {
500   if (transaction_delegate)
501     transaction_delegate->unlock();
502   if (binlog_storage_delegate)
503     binlog_storage_delegate->unlock();
504   if (server_state_delegate)
505     server_state_delegate->unlock();
506 #ifdef HAVE_REPLICATION
507   if (binlog_transmit_delegate)
508     binlog_transmit_delegate->unlock();
509   if (binlog_relay_io_delegate)
510     binlog_relay_io_delegate->unlock();
511 #endif /* HAVE_REPLICATION */
512 }
513 
delegates_update_lock_type()514 void delegates_update_lock_type()
515 {
516   delegates_update_plugin_ref_count();
517 
518   if (transaction_delegate)
519     transaction_delegate->update_lock_type();
520   if (binlog_storage_delegate)
521     binlog_storage_delegate->update_lock_type();
522   if (server_state_delegate)
523     server_state_delegate->update_lock_type();
524 #ifdef HAVE_REPLICATION
525   if (binlog_transmit_delegate)
526     binlog_transmit_delegate->update_lock_type();
527   if (binlog_relay_io_delegate)
528     binlog_relay_io_delegate->update_lock_type();
529 #endif /* HAVE_REPLICATION */
530 }
531 
/*
  This macro is used by almost all the Delegate methods to iterate
  over all the observers, running the given callback function of the
  delegate.

  It must be expanded inside a Delegate member function where the type
  name `Observer` is visible. It declares `plugins`, `iter` and `info` in
  the enclosing scope, and `args` is evaluated inside the loop body.

    r     variable receiving the result: set to 1 when a callback returns
          non-zero, set to 0 when an observer's plugin cannot be locked,
          left untouched otherwise. The iteration stops in both cases.
    f     name of the Observer function pointer member to call; observers
          where it is NULL are skipped.
    thd   not referenced by the expansion.
    args  parenthesized argument list for the callback.

  Unless the delegate is using the spin-lock type, each observer's plugin
  is locked with my_plugin_lock() before its callback runs. Those plugin
  references are collected in a local array and released with
  plugin_unlock_list() after the delegate lock has been released.
 */
#define FOREACH_OBSERVER(r, f, thd, args)                              \
  /*                                                                   \
     Plugin references locked during the iteration; they are released  \
     in one batch once the delegate lock has been dropped.             \
  */                                                                   \
  Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED);       \
  read_lock();                                                         \
  Observer_info_iterator iter= observer_info_iter();                   \
  Observer_info *info= iter++;                                         \
  for (; info; info= iter++)                                           \
  {                                                                    \
    bool replication_optimize_for_static_plugin_config=                \
        this->use_spin_lock_type();                                    \
    plugin_ref plugin= (replication_optimize_for_static_plugin_config  \
                            ? info->plugin                             \
                            : my_plugin_lock(0, &info->plugin));       \
    if (!plugin)                                                       \
    {                                                                  \
      /* plugin is not initialized or deleted, this is not an error */ \
      r= 0;                                                            \
      break;                                                           \
    }                                                                  \
    if (!replication_optimize_for_static_plugin_config)                \
      plugins.push_back(plugin);                                       \
    if (((Observer *)info->observer)->f &&                             \
        ((Observer *)info->observer)->f args)                          \
    {                                                                  \
      r= 1;                                                            \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",   \
                      info->plugin_int->name.str);                     \
      break;                                                           \
    }                                                                  \
  }                                                                    \
  unlock();                                                            \
  /*                                                                   \
     Unlocking the plugins must be done after the Delegate lock has    \
     been released, to avoid a possible deadlock: if this is the last  \
     user of the plugin, unlocking it will try to deinitialize the     \
     plugin, which will try to lock the Delegate in order to remove    \
     the observers.                                                    \
  */                                                                   \
  if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
582 
/*
  Variant of FOREACH_OBSERVER for callbacks that report an additional
  integer through an output argument.

  It must be expanded inside a Delegate member function where the type
  name `Observer` is visible. It declares `plugins`, `iter`, `info` and
  `error_out` in the enclosing scope, and `args` is evaluated inside the
  loop body.

    r     variable receiving the result: set to 1 when a callback returns
          true, set to 0 when an observer's plugin cannot be locked, left
          untouched otherwise. The iteration stops in both cases.
    f     name of the Observer function pointer member to call as
          f(args, error_out); observers where it is NULL are skipped.
    thd   not referenced by the expansion.
    args  leading argument(s) for the callback.
    out   variable to which the value reported by each executed callback
          is added.
 */
#define FOREACH_OBSERVER_ERROR_OUT(r, f, thd, args, out)               \
  /*                                                                   \
     Plugin references locked during the iteration; they are released  \
     in one batch once the delegate lock has been dropped.             \
  */                                                                   \
  Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED);       \
  read_lock();                                                         \
  Observer_info_iterator iter= observer_info_iter();                   \
  Observer_info *info= iter++;                                         \
                                                                       \
  int error_out= 0;                                                    \
  for (; info; info= iter++)                                           \
  {                                                                    \
    bool replication_optimize_for_static_plugin_config=                \
        this->use_spin_lock_type();                                    \
    plugin_ref plugin= (replication_optimize_for_static_plugin_config  \
                            ? info->plugin                             \
                            : my_plugin_lock(0, &info->plugin));       \
    if (!plugin)                                                       \
    {                                                                  \
      /* plugin is not initialized or deleted, this is not an error */ \
      r= 0;                                                            \
      break;                                                           \
    }                                                                  \
    if (!replication_optimize_for_static_plugin_config)                \
      plugins.push_back(plugin);                                       \
                                                                       \
    bool hook_error= false;                                            \
    /*                                                                 \
       Observers that do not implement this hook are skipped, as in    \
       FOREACH_OBSERVER; `out` only accumulates what a hook that was   \
       actually run has reported.                                      \
    */                                                                 \
    if (((Observer *)info->observer)->f)                               \
    {                                                                  \
      hook_error= ((Observer *)info->observer)->f(args, error_out);    \
      out+= error_out;                                                 \
    }                                                                  \
    if (hook_error)                                                    \
    {                                                                  \
      r= 1;                                                            \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",   \
                      info->plugin_int->name.str);                     \
      break;                                                           \
    }                                                                  \
  }                                                                    \
  unlock();                                                            \
  /*                                                                   \
     Unlocking the plugins must be done after the Delegate lock has    \
     been released, to avoid a possible deadlock: if this is the last  \
     user of the plugin, unlocking it will try to deinitialize the     \
     plugin, which will try to lock the Delegate in order to remove    \
     the observers.                                                    \
  */                                                                   \
  if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
631 
before_commit(THD * thd,bool all,IO_CACHE * trx_cache_log,IO_CACHE * stmt_cache_log,ulonglong cache_log_max_size)632 int Trans_delegate::before_commit(THD *thd, bool all,
633                                   IO_CACHE *trx_cache_log,
634                                   IO_CACHE *stmt_cache_log,
635                                   ulonglong cache_log_max_size)
636 {
637   DBUG_ENTER("Trans_delegate::before_commit");
638   Trans_param param;
639   TRANS_PARAM_ZERO(param);
640   param.server_id= thd->server_id;
641   param.server_uuid= server_uuid;
642   param.thread_id= thd->thread_id();
643   param.gtid_info.type= thd->variables.gtid_next.type;
644   param.gtid_info.sidno= thd->variables.gtid_next.gtid.sidno;
645   param.gtid_info.gno= thd->variables.gtid_next.gtid.gno;
646   param.trx_cache_log= trx_cache_log;
647   param.stmt_cache_log= stmt_cache_log;
648   param.cache_log_max_size= cache_log_max_size;
649   param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
650 
651   bool is_real_trans=
652     (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
653   if (is_real_trans)
654     param.flags|= TRANS_IS_REAL_TRANS;
655 
656   int ret= 0;
657   FOREACH_OBSERVER(ret, before_commit, thd, (&param));
658   DBUG_RETURN(ret);
659 }
660 
661 /**
662  Helper method to check if the given table has 'CASCADE' foreign key or not.
663 
664  @param[in]   TABLE     Table object that needs to be verified.
665  @param[in]   THD       Current execution thread.
666 
667  @return bool TRUE      If the table has 'CASCADE' foreign key.
668               FALSE     If the table does not have 'CASCADE' foreign key.
669 */
has_cascade_foreign_key(TABLE * table,THD * thd)670 bool has_cascade_foreign_key(TABLE *table, THD *thd)
671 {
672   DBUG_ENTER("has_cascade_foreign_key");
673   List<FOREIGN_KEY_INFO> f_key_list;
674   table->file->get_foreign_key_list(thd, &f_key_list);
675 
676   FOREIGN_KEY_INFO *f_key_info;
677   List_iterator_fast<FOREIGN_KEY_INFO> foreign_key_iterator(f_key_list);
678   while ((f_key_info=foreign_key_iterator++))
679   {
680     /*
681      The possible values for update_method are
682      {"CASCADE", "SET NULL", "NO ACTION", "RESTRICT"}.
683 
684      Hence we are avoiding the usage of strncmp
685      ("'update_method' value with 'CASCADE' or 'SET NULL'") and just comparing
686      the first character of the update_method value with 'C' or 'S'.
687     */
688     if (f_key_info->update_method->str[0] == 'C' ||
689         f_key_info->delete_method->str[0] == 'C' ||
690         f_key_info->update_method->str[0] == 'S' ||
691         f_key_info->delete_method->str[0] == 'S')
692     {
693       assert(!strncmp(f_key_info->update_method->str, "CASCADE", 7) ||
694              !strncmp(f_key_info->delete_method->str, "CASCADE", 7) ||
695              !strncmp(f_key_info->update_method->str, "SET NUL", 7) ||
696              !strncmp(f_key_info->delete_method->str, "SET NUL", 7));
697       DBUG_RETURN(TRUE);
698     }
699   }
700   DBUG_RETURN(FALSE);
701 }
702 
703 /**
704  Helper method to create table information for the hook call
705  */
706 void
prepare_table_info(THD * thd,Trans_table_info * & table_info_list,uint & number_of_tables)707 Trans_delegate::prepare_table_info(THD* thd,
708                                    Trans_table_info*& table_info_list,
709                                    uint& number_of_tables)
710 {
711   DBUG_ENTER("Trans_delegate::prepare_table_info");
712 
713   TABLE* open_tables= thd->open_tables;
714 
715   // Fail if tables are not open
716   if(open_tables == NULL)
717   {
718     DBUG_VOID_RETURN;
719   }
720 
721   //Gather table information
722   std::vector<Trans_table_info> table_info_holder;
723   for(; open_tables != NULL; open_tables= open_tables->next)
724   {
725     Trans_table_info table_info = {0,0,0,0};
726 
727     if (open_tables->no_replicate)
728     {
729       continue;
730     }
731 
732     table_info.table_name= open_tables->s->table_name.str;
733 
734     uint primary_keys= 0;
735     if(open_tables->key_info != NULL && (open_tables->s->primary_key < MAX_KEY))
736     {
737       primary_keys= open_tables->s->primary_key;
738 
739       //if primary keys is still 0, lets double check on another var
740       if(primary_keys == 0)
741       {
742         primary_keys= open_tables->key_info->user_defined_key_parts;
743       }
744     }
745 
746     table_info.number_of_primary_keys= primary_keys;
747 
748     table_info.db_type= open_tables->s->db_type()->db_type;
749 
750     /*
751       Find out if the table has foreign key with ON UPDATE/DELETE CASCADE
752       clause.
753     */
754     table_info.has_cascade_foreign_key= has_cascade_foreign_key(open_tables, thd);
755 
756     table_info_holder.push_back(table_info);
757   }
758 
759   //Now that one has all the information, one should build the
760   // data that will be delivered to the plugin
761   if(table_info_holder.size() > 0)
762   {
763     number_of_tables= table_info_holder.size();
764 
765     table_info_list= (Trans_table_info*)my_malloc(
766                                PSI_NOT_INSTRUMENTED,
767                                number_of_tables * sizeof(Trans_table_info),
768                                MYF(0));
769 
770     std::vector<Trans_table_info>::iterator table_info_holder_it
771                                                   = table_info_holder.begin();
772     for(int table= 0;
773         table_info_holder_it != table_info_holder.end();
774         table_info_holder_it++, table++)
775     {
776       table_info_list[table].number_of_primary_keys
777                                 = (*table_info_holder_it).number_of_primary_keys;
778       table_info_list[table].table_name
779                                 = (*table_info_holder_it).table_name;
780       table_info_list[table].db_type
781                                 = (*table_info_holder_it).db_type;
782       table_info_list[table].has_cascade_foreign_key
783                                 = (*table_info_holder_it).has_cascade_foreign_key;
784     }
785   }
786 
787   DBUG_VOID_RETURN;
788 }
789 
790 /**
791   Helper that gathers all table runtime information
792 
793   @param[in]   THD       the current execution thread
794   @param[out]  ctx_info  Trans_context_info in which the result is stored.
795  */
prepare_transaction_context(THD * thd,Trans_context_info & ctx_info)796 void prepare_transaction_context(THD* thd, Trans_context_info& ctx_info)
797 {
798   //Extracting the session value of SQL binlogging
799   ctx_info.binlog_enabled= thd->variables.sql_log_bin;
800 
801   //Extracting the session value of binlog format
802   ctx_info.binlog_format= thd->variables.binlog_format;
803 
804   //Extracting the global mutable value of binlog checksum
805   ctx_info.binlog_checksum_options= binlog_checksum_options;
806 
807   //Extracting the session value of transaction_write_set_extraction
808   ctx_info.transaction_write_set_extraction=
809     thd->variables.transaction_write_set_extraction;
810 
811   //Extracting transaction isolation level
812   ctx_info.tx_isolation= thd->tx_isolation;
813 }
814 
before_dml(THD * thd,int & result)815 int Trans_delegate::before_dml(THD* thd, int& result)
816 {
817   DBUG_ENTER("Trans_delegate::before_dml");
818   Trans_param param;
819   TRANS_PARAM_ZERO(param);
820 
821   param.server_id= thd->server_id;
822   param.server_uuid= server_uuid;
823   param.thread_id= thd->thread_id();
824 
825   prepare_table_info(thd, param.tables_info, param.number_of_tables);
826   prepare_transaction_context(thd, param.trans_ctx_info);
827 
828   int ret= 0;
829   FOREACH_OBSERVER_ERROR_OUT(ret, before_dml, thd, &param, result);
830 
831   my_free(param.tables_info);
832 
833   DBUG_RETURN(ret);
834 }
835 
before_rollback(THD * thd,bool all)836 int Trans_delegate::before_rollback(THD *thd, bool all)
837 {
838   DBUG_ENTER("Trans_delegate::before_rollback");
839   Trans_param param;
840   TRANS_PARAM_ZERO(param);
841   param.server_id= thd->server_id;
842   param.server_uuid= server_uuid;
843   param.thread_id= thd->thread_id();
844   param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
845 
846   bool is_real_trans=
847     (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
848   if (is_real_trans)
849     param.flags|= TRANS_IS_REAL_TRANS;
850 
851   int ret= 0;
852   FOREACH_OBSERVER(ret, before_rollback, thd, (&param));
853   DBUG_RETURN(ret);
854 }
855 
after_commit(THD * thd,bool all)856 int Trans_delegate::after_commit(THD *thd, bool all)
857 {
858   DBUG_ENTER("Trans_delegate::after_commit");
859   Trans_param param;
860   TRANS_PARAM_ZERO(param);
861   param.server_uuid= server_uuid;
862   param.thread_id= thd->thread_id();
863   param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
864 
865   bool is_real_trans=
866     (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
867   if (is_real_trans)
868     param.flags|= TRANS_IS_REAL_TRANS;
869 
870   thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
871   param.server_id= thd->server_id;
872 
873   DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos));
874   DEBUG_SYNC(thd, "before_call_after_commit_observer");
875 
876   int ret= 0;
877   FOREACH_OBSERVER(ret, after_commit, thd, (&param));
878   DBUG_RETURN(ret);
879 }
880 
after_rollback(THD * thd,bool all)881 int Trans_delegate::after_rollback(THD *thd, bool all)
882 {
883   DBUG_ENTER("Trans_delegate::after_rollback");
884   Trans_param param;
885   TRANS_PARAM_ZERO(param);
886   param.server_uuid= server_uuid;
887   param.thread_id= thd->thread_id();
888   param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type();
889 
890   bool is_real_trans=
891     (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION));
892   if (is_real_trans)
893     param.flags|= TRANS_IS_REAL_TRANS;
894   thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
895   param.server_id= thd->server_id;
896 
897   int ret= 0;
898   FOREACH_OBSERVER(ret, after_rollback, thd, (&param));
899   DBUG_RETURN(ret);
900 }
901 
after_flush(THD * thd,const char * log_file,my_off_t log_pos)902 int Binlog_storage_delegate::after_flush(THD *thd,
903                                          const char *log_file,
904                                          my_off_t log_pos)
905 {
906   DBUG_ENTER("Binlog_storage_delegate::after_flush");
907   DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
908                        log_file, (ulonglong) log_pos));
909   Binlog_storage_param param;
910   param.server_id= thd->server_id;
911 
912   int ret= 0;
913   FOREACH_OBSERVER(ret, after_flush, thd, (&param, log_file, log_pos));
914   DBUG_RETURN(ret);
915 }
916 
917 /**
918   * This hook MUST be invoked after ALL recovery operations are performed
919   * and the server is ready to serve clients.
920   *
921   * @param[in] thd The thread context.
922   * @return 0 on success, >0 otherwise.
923 */
before_handle_connection(THD * thd)924 int Server_state_delegate::before_handle_connection(THD *thd)
925 {
926   DBUG_ENTER("Server_state_delegate::before_client_connection");
927   Server_state_param param;
928 
929   int ret= 0;
930   FOREACH_OBSERVER(ret, before_handle_connection, thd, (&param));
931   DBUG_RETURN(ret);
932 
933 }
934 
935 /**
936   * This hook MUST be invoked before ANY recovery action is started.
937   *
938   * @param[in] thd The thread context.
939   * @return 0 on success, >0 otherwise.
940 */
before_recovery(THD * thd)941 int Server_state_delegate::before_recovery(THD *thd)
942 {
943   DBUG_ENTER("Server_state_delegate::before_recovery");
944   Server_state_param param;
945 
946   int ret= 0;
947   FOREACH_OBSERVER(ret, before_recovery, thd, (&param));
948   DBUG_RETURN(ret);
949 }
950 
951 /**
952   * This hook MUST be invoked after the recovery from the engine
953   * is complete.
954   *
955   * @param[in] thd The thread context.
956   * @return 0 on success, >0 otherwise.
957 */
after_engine_recovery(THD * thd)958 int Server_state_delegate::after_engine_recovery(THD *thd)
959 {
960   DBUG_ENTER("Server_state_delegate::after_engine_recovery");
961   Server_state_param param;
962 
963   int ret= 0;
964   FOREACH_OBSERVER(ret, after_engine_recovery, thd, (&param));
965   DBUG_RETURN(ret);
966 
967 }
968 
969 /**
970   * This hook MUST be invoked after the server has completed the
971   * local recovery. The server can proceed with the further operations
972   * like engaging in distributed recovery etc.
973   *
974   * @param[in] thd The thread context.
975   * @return 0 on success, >0 otherwise.
976 */
after_recovery(THD * thd)977 int Server_state_delegate::after_recovery(THD *thd)
978 {
979   DBUG_ENTER("Server_state_delegate::after_recovery");
980   Server_state_param param;
981 
982   int ret= 0;
983   FOREACH_OBSERVER(ret, after_recovery, thd, (&param));
984   DBUG_RETURN(ret);
985 }
986 
987 /**
988   * This hook MUST be invoked before server shutdown action is
989   * initiated.
990   *
991   * @param[in] thd The thread context.
992   * @return 0 on success, >0 otherwise.
993 */
before_server_shutdown(THD * thd)994 int Server_state_delegate::before_server_shutdown(THD *thd)
995 {
996   DBUG_ENTER("Server_state_delegate::before_server_shutdown");
997   Server_state_param param;
998 
999   int ret= 0;
1000   FOREACH_OBSERVER(ret, before_server_shutdown, thd, (&param));
1001   DBUG_RETURN(ret);
1002 }
1003 
1004 /**
1005   * This hook MUST be invoked after server shutdown operation is
1006   * complete.
1007   *
1008   * @param[in] thd The thread context.
1009   * @return 0 on success, >0 otherwise.
1010 */
after_server_shutdown(THD * thd)1011 int Server_state_delegate::after_server_shutdown(THD *thd)
1012 {
1013   DBUG_ENTER("Server_state_delegate::after_server_shutdown");
1014   Server_state_param param;
1015 
1016   int ret= 0;
1017   FOREACH_OBSERVER(ret, after_server_shutdown, thd, (&param));
1018   DBUG_RETURN(ret);
1019 }
1020 
after_sync(THD * thd,const char * log_file,my_off_t log_pos)1021 int Binlog_storage_delegate::after_sync(THD *thd,
1022                                         const char *log_file,
1023                                         my_off_t log_pos)
1024 {
1025   DBUG_ENTER("Binlog_storage_delegate::after_sync");
1026   DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
1027                        log_file, (ulonglong) log_pos));
1028   Binlog_storage_param param;
1029   param.server_id= thd->server_id;
1030 
1031   assert(log_pos != 0);
1032   int ret= 0;
1033   FOREACH_OBSERVER(ret, after_sync, thd, (&param, log_file, log_pos));
1034 
1035   DEBUG_SYNC(thd, "after_call_after_sync_observer");
1036   DBUG_RETURN(ret);
1037 }
1038 
1039 #ifdef HAVE_REPLICATION
transmit_start(THD * thd,ushort flags,const char * log_file,my_off_t log_pos,bool * observe_transmission)1040 int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
1041                                              const char *log_file,
1042                                              my_off_t log_pos,
1043                                              bool *observe_transmission)
1044 {
1045   Binlog_transmit_param param;
1046   param.flags= flags;
1047   param.server_id= thd->server_id;
1048 
1049   int ret= 0;
1050   FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
1051   *observe_transmission= param.should_observe();
1052   return ret;
1053 }
1054 
transmit_stop(THD * thd,ushort flags)1055 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
1056 {
1057   Binlog_transmit_param param;
1058   param.flags= flags;
1059   param.server_id= thd->server_id;
1060 
1061   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
1062 
1063   int ret= 0;
1064   FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
1065   return ret;
1066 }
1067 
reserve_header(THD * thd,ushort flags,String * packet)1068 int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
1069                                              String *packet)
1070 {
1071   /* NOTE2ME: Maximum extra header size for each observer, I hope 32
1072      bytes should be enough for each Observer to reserve their extra
1073      header. If later found this is not enough, we can increase this
1074      /HEZX
1075   */
1076 #define RESERVE_HEADER_SIZE 32
1077   unsigned char header[RESERVE_HEADER_SIZE];
1078   ulong hlen;
1079   Binlog_transmit_param param;
1080   param.flags= flags;
1081   param.server_id= thd->server_id;
1082 
1083   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
1084 
1085   int ret= 0;
1086   read_lock();
1087   Observer_info_iterator iter= observer_info_iter();
1088   Observer_info *info= iter++;
1089   for (; info; info= iter++)
1090   {
1091     bool replication_optimize_for_static_plugin_config=
1092         this->use_spin_lock_type();
1093     plugin_ref plugin= (replication_optimize_for_static_plugin_config
1094                             ? info->plugin
1095                             : my_plugin_lock(thd, &info->plugin));
1096     if (!plugin)
1097     {
1098       ret= 1;
1099       break;
1100     }
1101     hlen= 0;
1102     if (((Observer *)info->observer)->reserve_header
1103         && ((Observer *)info->observer)->reserve_header(&param,
1104                                                         header,
1105                                                         RESERVE_HEADER_SIZE,
1106                                                         &hlen))
1107     {
1108       ret= 1;
1109       if (!replication_optimize_for_static_plugin_config)
1110         plugin_unlock(thd, plugin);
1111       break;
1112     }
1113     if (!replication_optimize_for_static_plugin_config)
1114       plugin_unlock(thd, plugin);
1115     if (hlen == 0)
1116       continue;
1117     if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
1118     {
1119       ret= 1;
1120       break;
1121     }
1122   }
1123   unlock();
1124   return ret;
1125 }
1126 
before_send_event(THD * thd,ushort flags,String * packet,const char * log_file,my_off_t log_pos)1127 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
1128                                                 String *packet,
1129                                                 const char *log_file,
1130                                                 my_off_t log_pos)
1131 {
1132   Binlog_transmit_param param;
1133   param.flags= flags;
1134   param.server_id= thd->server_id;
1135 
1136   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
1137 
1138   int ret= 0;
1139   FOREACH_OBSERVER(ret, before_send_event, thd,
1140                    (&param, (uchar *)packet->ptr(),
1141                     packet->length(),
1142                     log_file+dirname_length(log_file), log_pos));
1143   return ret;
1144 }
1145 
after_send_event(THD * thd,ushort flags,String * packet,const char * skipped_log_file,my_off_t skipped_log_pos)1146 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
1147                                                String *packet,
1148                                                const char *skipped_log_file,
1149                                                my_off_t skipped_log_pos)
1150 {
1151   Binlog_transmit_param param;
1152   param.flags= flags;
1153   param.server_id= thd->server_id;
1154 
1155   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
1156 
1157   int ret= 0;
1158   FOREACH_OBSERVER(ret, after_send_event, thd,
1159                    (&param, packet->ptr(), packet->length(),
1160                    skipped_log_file+dirname_length(skipped_log_file),
1161                     skipped_log_pos));
1162   return ret;
1163 }
1164 
after_reset_master(THD * thd,ushort flags)1165 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
1166 
1167 {
1168   Binlog_transmit_param param;
1169   param.flags= flags;
1170   param.server_id= thd->server_id;
1171 
1172   int ret= 0;
1173   FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
1174   return ret;
1175 }
1176 
init_param(Binlog_relay_IO_param * param,Master_info * mi)1177 void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
1178                                           Master_info *mi)
1179 {
1180   param->mysql= mi->mysql;
1181   param->channel_name= mi->get_channel();
1182   param->user= const_cast<char *>(mi->get_user());
1183   param->host= mi->host;
1184   param->port= mi->port;
1185   param->master_log_name= const_cast<char *>(mi->get_master_log_name());
1186   param->master_log_pos= mi->get_master_log_pos();
1187 }
1188 
thread_start(THD * thd,Master_info * mi)1189 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
1190 {
1191   Binlog_relay_IO_param param;
1192   init_param(&param, mi);
1193   param.server_id= thd->server_id;
1194   param.thread_id= thd->thread_id();
1195 
1196   int ret= 0;
1197   FOREACH_OBSERVER(ret, thread_start, thd, (&param));
1198   return ret;
1199 }
1200 
1201 
thread_stop(THD * thd,Master_info * mi)1202 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
1203 {
1204 
1205   Binlog_relay_IO_param param;
1206   init_param(&param, mi);
1207   param.server_id= thd->server_id;
1208   param.thread_id= thd->thread_id();
1209 
1210   int ret= 0;
1211   FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
1212   return ret;
1213 }
1214 
applier_start(THD * thd,Master_info * mi)1215 int Binlog_relay_IO_delegate::applier_start(THD *thd, Master_info *mi)
1216 {
1217   Binlog_relay_IO_param param;
1218   init_param(&param, mi);
1219   param.server_id= thd->server_id;
1220   param.thread_id= thd->thread_id();
1221 
1222   int ret= 0;
1223   FOREACH_OBSERVER(ret, applier_start, thd, (&param));
1224   return ret;
1225 }
1226 
applier_stop(THD * thd,Master_info * mi,bool aborted)1227 int Binlog_relay_IO_delegate::applier_stop(THD *thd,
1228                                            Master_info *mi,
1229                                            bool aborted)
1230 {
1231   Binlog_relay_IO_param param;
1232   init_param(&param, mi);
1233   param.server_id= thd->server_id;
1234   param.thread_id= thd->thread_id();
1235 
1236   int ret= 0;
1237   FOREACH_OBSERVER(ret, applier_stop, thd, (&param, aborted));
1238   return ret;
1239 }
1240 
before_request_transmit(THD * thd,Master_info * mi,ushort flags)1241 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
1242                                                       Master_info *mi,
1243                                                       ushort flags)
1244 {
1245   Binlog_relay_IO_param param;
1246   init_param(&param, mi);
1247   param.server_id= thd->server_id;
1248   param.thread_id= thd->thread_id();
1249 
1250   int ret= 0;
1251   FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
1252   return ret;
1253 }
1254 
after_read_event(THD * thd,Master_info * mi,const char * packet,ulong len,const char ** event_buf,ulong * event_len)1255 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
1256                                                const char *packet, ulong len,
1257                                                const char **event_buf,
1258                                                ulong *event_len)
1259 {
1260   Binlog_relay_IO_param param;
1261   init_param(&param, mi);
1262   param.server_id= thd->server_id;
1263   param.thread_id= thd->thread_id();
1264 
1265   int ret= 0;
1266   FOREACH_OBSERVER(ret, after_read_event, thd,
1267                    (&param, packet, len, event_buf, event_len));
1268   return ret;
1269 }
1270 
after_queue_event(THD * thd,Master_info * mi,const char * event_buf,ulong event_len,bool synced)1271 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
1272                                                 const char *event_buf,
1273                                                 ulong event_len,
1274                                                 bool synced)
1275 {
1276   Binlog_relay_IO_param param;
1277   init_param(&param, mi);
1278   param.server_id= thd->server_id;
1279   param.thread_id= thd->thread_id();
1280 
1281   uint32 flags=0;
1282   if (synced)
1283     flags |= BINLOG_STORAGE_IS_SYNCED;
1284 
1285   int ret= 0;
1286   FOREACH_OBSERVER(ret, after_queue_event, thd,
1287                    (&param, event_buf, event_len, flags));
1288   return ret;
1289 }
1290 
after_reset_slave(THD * thd,Master_info * mi)1291 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
1292 
1293 {
1294   Binlog_relay_IO_param param;
1295   init_param(&param, mi);
1296   param.server_id= thd->server_id;
1297   param.thread_id= thd->thread_id();
1298 
1299   int ret= 0;
1300   FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
1301   return ret;
1302 }
1303 #endif /* HAVE_REPLICATION */
1304 
register_trans_observer(Trans_observer * observer,void * p)1305 int register_trans_observer(Trans_observer *observer, void *p)
1306 {
1307   return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
1308 }
1309 
unregister_trans_observer(Trans_observer * observer,void * p)1310 int unregister_trans_observer(Trans_observer *observer, void *p)
1311 {
1312   return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
1313 }
1314 
register_binlog_storage_observer(Binlog_storage_observer * observer,void * p)1315 int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
1316 {
1317   DBUG_ENTER("register_binlog_storage_observer");
1318   int result= binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
1319   DBUG_RETURN(result);
1320 }
1321 
unregister_binlog_storage_observer(Binlog_storage_observer * observer,void * p)1322 int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
1323 {
1324   return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
1325 }
1326 
register_server_state_observer(Server_state_observer * observer,void * plugin_var)1327 int register_server_state_observer(Server_state_observer *observer, void *plugin_var)
1328 {
1329   DBUG_ENTER("register_server_state_observer");
1330   int result= server_state_delegate->add_observer(observer, (st_plugin_int *)plugin_var);
1331   DBUG_RETURN(result);
1332 }
1333 
unregister_server_state_observer(Server_state_observer * observer,void * plugin_var)1334 int unregister_server_state_observer(Server_state_observer *observer, void *plugin_var)
1335 {
1336   DBUG_ENTER("unregister_server_state_observer");
1337   int result= server_state_delegate->remove_observer(observer, (st_plugin_int *)plugin_var);
1338   DBUG_RETURN(result);
1339 }
1340 
1341 #ifdef HAVE_REPLICATION
register_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)1342 int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
1343 {
1344   return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
1345 }
1346 
unregister_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)1347 int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
1348 {
1349   return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
1350 }
1351 
register_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)1352 int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
1353 {
1354   return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
1355 }
1356 
unregister_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)1357 int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
1358 {
1359   return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
1360 }
1361 #endif /* HAVE_REPLICATION */
1362