1 /* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #ifndef RPL_HANDLER_H
24 #define RPL_HANDLER_H
25 
26 #include "sql_priv.h"
27 #include "rpl_gtid.h"
28 #include "rpl_mi.h"
29 #include "rpl_rli.h"
30 #include "sql_plugin.h"
31 #include "replication.h"
32 
33 class Observer_info {
34 public:
35   void *observer;
36   st_plugin_int *plugin_int;
37   plugin_ref plugin;
38 
Observer_info(void * ob,st_plugin_int * p)39   Observer_info(void *ob, st_plugin_int *p)
40     :observer(ob), plugin_int(p)
41   {
42     plugin= plugin_int_to_ref(plugin_int);
43   }
44 };
45 
46 class Delegate {
47 public:
48   typedef List<Observer_info> Observer_info_list;
49   typedef List_iterator<Observer_info> Observer_info_iterator;
50 
add_observer(void * observer,st_plugin_int * plugin)51   int add_observer(void *observer, st_plugin_int *plugin)
52   {
53     int ret= FALSE;
54     if (!inited)
55       return TRUE;
56     write_lock();
57     Observer_info_iterator iter(observer_info_list);
58     Observer_info *info= iter++;
59     while (info && info->observer != observer)
60       info= iter++;
61     if (!info)
62     {
63       info= new Observer_info(observer, plugin);
64       if (!info || observer_info_list.push_back(info, &memroot))
65         ret= TRUE;
66     }
67     else
68       ret= TRUE;
69     unlock();
70     return ret;
71   }
72 
remove_observer(void * observer,st_plugin_int * plugin)73   int remove_observer(void *observer, st_plugin_int *plugin)
74   {
75     int ret= FALSE;
76     if (!inited)
77       return TRUE;
78     write_lock();
79     Observer_info_iterator iter(observer_info_list);
80     Observer_info *info= iter++;
81     while (info && info->observer != observer)
82       info= iter++;
83     if (info)
84     {
85       iter.remove();
86       delete info;
87     }
88     else
89       ret= TRUE;
90     unlock();
91     return ret;
92   }
93 
observer_info_iter()94   inline Observer_info_iterator observer_info_iter()
95   {
96     return Observer_info_iterator(observer_info_list);
97   }
98 
is_empty()99   inline bool is_empty()
100   {
101     DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty()));
102     return observer_info_list.is_empty();
103   }
104 
read_lock()105   inline int read_lock()
106   {
107     if (!inited)
108       return TRUE;
109     return mysql_rwlock_rdlock(&lock);
110   }
111 
write_lock()112   inline int write_lock()
113   {
114     if (!inited)
115       return TRUE;
116     return mysql_rwlock_wrlock(&lock);
117   }
118 
unlock()119   inline int unlock()
120   {
121     if (!inited)
122       return TRUE;
123     return mysql_rwlock_unlock(&lock);
124   }
125 
is_inited()126   inline bool is_inited()
127   {
128     return inited;
129   }
130 
Delegate(PSI_rwlock_key key)131   Delegate(
132 #ifdef HAVE_PSI_INTERFACE
133            PSI_rwlock_key key
134 #endif
135            )
136   {
137     inited= FALSE;
138 #ifdef HAVE_PSI_INTERFACE
139     if (mysql_rwlock_init(key, &lock))
140       return;
141 #else
142     if (mysql_rwlock_init(0, &lock))
143       return;
144 #endif
145     init_sql_alloc(&memroot, 1024, 0);
146     inited= TRUE;
147   }
~Delegate()148   ~Delegate()
149   {
150     inited= FALSE;
151     mysql_rwlock_destroy(&lock);
152     free_root(&memroot, MYF(0));
153   }
154 
155 private:
156   Observer_info_list observer_info_list;
157   mysql_rwlock_t lock;
158   MEM_ROOT memroot;
159   bool inited;
160 };
161 
162 #ifdef HAVE_PSI_INTERFACE
163 extern PSI_rwlock_key key_rwlock_Trans_delegate_lock;
164 #endif
165 
166 class Trans_delegate
167   :public Delegate {
168 public:
169 
Trans_delegate()170   Trans_delegate()
171   : Delegate(
172 #ifdef HAVE_PSI_INTERFACE
173              key_rwlock_Trans_delegate_lock
174 #endif
175              )
176   {}
177 
178   typedef Trans_observer Observer;
179   int before_commit(THD *thd, bool all);
180   int before_rollback(THD *thd, bool all);
181   int after_commit(THD *thd, bool all);
182   int after_rollback(THD *thd, bool all);
183 };
184 
185 #ifdef HAVE_PSI_INTERFACE
186 extern PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock;
187 #endif
188 
189 class Binlog_storage_delegate
190   :public Delegate {
191 public:
192 
Binlog_storage_delegate()193   Binlog_storage_delegate()
194   : Delegate(
195 #ifdef HAVE_PSI_INTERFACE
196              key_rwlock_Binlog_storage_delegate_lock
197 #endif
198              )
199   {}
200 
201   typedef Binlog_storage_observer Observer;
202   int after_flush(THD *thd, const char *log_file,
203                   my_off_t log_pos);
204 };
205 
206 #ifdef HAVE_REPLICATION
207 #ifdef HAVE_PSI_INTERFACE
208 extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock;
209 #endif
210 
211 class Binlog_transmit_delegate
212   :public Delegate {
213 public:
214 
Binlog_transmit_delegate()215   Binlog_transmit_delegate()
216   : Delegate(
217 #ifdef HAVE_PSI_INTERFACE
218              key_rwlock_Binlog_transmit_delegate_lock
219 #endif
220              )
221   {}
222 
223   typedef Binlog_transmit_observer Observer;
224   int transmit_start(THD *thd, ushort flags,
225                      const char *log_file, my_off_t log_pos,
226                      bool *observe_transmission);
227   int transmit_stop(THD *thd, ushort flags);
228   int reserve_header(THD *thd, ushort flags, String *packet);
229   int before_send_event(THD *thd, ushort flags,
230                         String *packet, const
231                         char *log_file, my_off_t log_pos );
232   int after_send_event(THD *thd, ushort flags,
233                        String *packet, const char *skipped_log_file,
234                        my_off_t skipped_log_pos);
235   int after_reset_master(THD *thd, ushort flags);
236 };
237 
238 #ifdef HAVE_PSI_INTERFACE
239 extern PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock;
240 #endif
241 
242 class Binlog_relay_IO_delegate
243   :public Delegate {
244 public:
245 
Binlog_relay_IO_delegate()246   Binlog_relay_IO_delegate()
247   : Delegate(
248 #ifdef HAVE_PSI_INTERFACE
249              key_rwlock_Binlog_relay_IO_delegate_lock
250 #endif
251              )
252   {}
253 
254   typedef Binlog_relay_IO_observer Observer;
255   int thread_start(THD *thd, Master_info *mi);
256   int thread_stop(THD *thd, Master_info *mi);
257   int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
258   int after_read_event(THD *thd, Master_info *mi,
259                        const char *packet, ulong len,
260                        const char **event_buf, ulong *event_len);
261   int after_queue_event(THD *thd, Master_info *mi,
262                         const char *event_buf, ulong event_len,
263                         bool synced);
264   int after_reset_slave(THD *thd, Master_info *mi);
265 private:
266   void init_param(Binlog_relay_IO_param *param, Master_info *mi);
267 };
268 #endif /* HAVE_REPLICATION */
269 
270 int delegates_init();
271 void delegates_destroy();
272 
273 extern Trans_delegate *transaction_delegate;
274 extern Binlog_storage_delegate *binlog_storage_delegate;
275 #ifdef HAVE_REPLICATION
276 extern Binlog_transmit_delegate *binlog_transmit_delegate;
277 extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
278 #endif /* HAVE_REPLICATION */
279 
/*
  Invoke a hook on a delegate.

  group  prefix of a '<group>_delegate' pointer that is in scope
  hook   name of the method to call on that delegate
  args   parenthesized argument list for the method

  If there are no observers in the delegate the hook is not called and
  the expression evaluates to 0 immediately.
*/
#define RUN_HOOK(group, hook, args)                                     \
  (group##_delegate->is_empty() ? 0 : group##_delegate->hook args)
287 
288 #endif /* RPL_HANDLER_H */
289