1 /* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "sql_priv.h"
24 #include "unireg.h"
25 
26 #include "rpl_mi.h"
27 #include "log_event.h"
28 #include "rpl_filter.h"
29 #include <my_dir.h>
30 #include "rpl_handler.h"
31 
/*
  Global delegate instances, one per observer family. They are
  assigned by delegates_init() and destructed by delegates_destroy().
  The transmit and relay IO delegates only exist when the server is
  built with HAVE_REPLICATION.
*/
Trans_delegate *transaction_delegate;
Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
Binlog_transmit_delegate *binlog_transmit_delegate;
Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */
38 
/*
  Structure used to save the log file name and the position within
  that file for a transaction.
*/
typedef struct Trans_binlog_info {
  my_off_t log_pos;             /* position within the log file */
  char log_file[FN_REFLEN];     /* log file name, at most FN_REFLEN bytes */
} Trans_binlog_info;
46 
get_user_var_int(const char * name,long long int * value,int * null_value)47 int get_user_var_int(const char *name,
48                      long long int *value, int *null_value)
49 {
50   my_bool null_val;
51   user_var_entry *entry=
52     (user_var_entry*) my_hash_search(&current_thd->user_vars,
53                                   (uchar*) name, strlen(name));
54   if (!entry)
55     return 1;
56   *value= entry->val_int(&null_val);
57   if (null_value)
58     *null_value= null_val;
59   return 0;
60 }
61 
get_user_var_real(const char * name,double * value,int * null_value)62 int get_user_var_real(const char *name,
63                       double *value, int *null_value)
64 {
65   my_bool null_val;
66   user_var_entry *entry=
67     (user_var_entry*) my_hash_search(&current_thd->user_vars,
68                                   (uchar*) name, strlen(name));
69   if (!entry)
70     return 1;
71   *value= entry->val_real(&null_val);
72   if (null_value)
73     *null_value= null_val;
74   return 0;
75 }
76 
get_user_var_str(const char * name,char * value,size_t len,unsigned int precision,int * null_value)77 int get_user_var_str(const char *name, char *value,
78                      size_t len, unsigned int precision, int *null_value)
79 {
80   String str;
81   my_bool null_val;
82   user_var_entry *entry=
83     (user_var_entry*) my_hash_search(&current_thd->user_vars,
84                                   (uchar*) name, strlen(name));
85   if (!entry)
86     return 1;
87   entry->val_str(&null_val, &str, precision);
88   strncpy(value, str.c_ptr(), len);
89   if (null_value)
90     *null_value= null_val;
91   return 0;
92 }
93 
delegates_init()94 int delegates_init()
95 {
96   static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
97   static my_aligned_storage<sizeof(Binlog_storage_delegate),
98                             MY_ALIGNOF(long)> storage_mem;
99 #ifdef HAVE_REPLICATION
100   static my_aligned_storage<sizeof(Binlog_transmit_delegate),
101                             MY_ALIGNOF(long)> transmit_mem;
102   static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
103                             MY_ALIGNOF(long)> relay_io_mem;
104 #endif
105 
106   void *place_trans_mem= trans_mem.data;
107   void *place_storage_mem= storage_mem.data;
108 
109   transaction_delegate= new (place_trans_mem) Trans_delegate;
110 
111   if (!transaction_delegate->is_inited())
112   {
113     sql_print_error("Initialization of transaction delegates failed. "
114                     "Please report a bug.");
115     return 1;
116   }
117 
118   binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
119 
120   if (!binlog_storage_delegate->is_inited())
121   {
122     sql_print_error("Initialization binlog storage delegates failed. "
123                     "Please report a bug.");
124     return 1;
125   }
126 
127 #ifdef HAVE_REPLICATION
128   void *place_transmit_mem= transmit_mem.data;
129   void *place_relay_io_mem= relay_io_mem.data;
130 
131   binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
132 
133   if (!binlog_transmit_delegate->is_inited())
134   {
135     sql_print_error("Initialization of binlog transmit delegates failed. "
136                     "Please report a bug.");
137     return 1;
138   }
139 
140   binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
141 
142   if (!binlog_relay_io_delegate->is_inited())
143   {
144     sql_print_error("Initialization binlog relay IO delegates failed. "
145                     "Please report a bug.");
146     return 1;
147   }
148 #endif
149 
150   return 0;
151 }
152 
delegates_destroy()153 void delegates_destroy()
154 {
155   if (transaction_delegate)
156     transaction_delegate->~Trans_delegate();
157   if (binlog_storage_delegate)
158     binlog_storage_delegate->~Binlog_storage_delegate();
159 #ifdef HAVE_REPLICATION
160   if (binlog_transmit_delegate)
161     binlog_transmit_delegate->~Binlog_transmit_delegate();
162   if (binlog_relay_io_delegate)
163     binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
164 #endif /* HAVE_REPLICATION */
165 }
166 
/*
  This macro is used by almost all the Delegate methods to iterate
  over all the observers, running the given callback function of the
  delegate on each of them.

  Arguments:
    r     int lvalue receiving the result: set to 1 when a callback
          returns non-zero, set to 0 when a plugin cannot be locked,
          left untouched otherwise
    f     name of the Observer member (callback pointer) to invoke;
          observers whose pointer is null are skipped
    thd   only thd->server_id is read; it is stored in param.server_id
    args  parenthesized argument list handed to the callback

  The expansion requires a variable named 'param' in the enclosing
  scope and declares the locals s, plugins, plugins_buffer, iter and
  info in that scope.

  Each observer's plugin is locked and recorded in a local dynamic
  array (8 preallocated slots) while the delegate read lock is held.
  Iteration stops at the first plugin that cannot be locked or at the
  first callback that fails. The recorded plugins are unlocked only
  after the delegate lock has been released.

  NOTE(review): this comment used to state that the plugins are added
  to the thd->lex list and unlocked automatically after each
  statement; the code below passes 0 instead of thd to
  my_plugin_lock() and plugin_unlock_list(), so that description
  looks stale - confirm.
 */
#define FOREACH_OBSERVER(r, f, thd, args)                               \
  param.server_id= thd->server_id;                                      \
  /*
     Use a struct to make sure that the array and its preallocated
     buffer are allocated adjacent, check delete_dynamic().
  */                                                                    \
  struct {                                                              \
    DYNAMIC_ARRAY plugins;                                              \
    /* preallocate 8 slots */                                           \
    plugin_ref plugins_buffer[8];                                       \
  } s;                                                                  \
  DYNAMIC_ARRAY *plugins= &s.plugins;                                   \
  plugin_ref *plugins_buffer= s.plugins_buffer;                         \
  my_init_dynamic_array2(plugins, sizeof(plugin_ref),                   \
                         plugins_buffer, 8, 8);                         \
  read_lock();                                                          \
  Observer_info_iterator iter= observer_info_iter();                    \
  Observer_info *info= iter++;                                          \
  for (; info; info= iter++)                                            \
  {                                                                     \
    plugin_ref plugin=                                                  \
      my_plugin_lock(0, &info->plugin);                                 \
    if (!plugin)                                                        \
    {                                                                   \
      /* plugin is not initialized or deleted, this is not an error */  \
      r= 0;                                                             \
      break;                                                            \
    }                                                                   \
    insert_dynamic(plugins, &plugin);                                   \
    if (((Observer *)info->observer)->f                                 \
        && ((Observer *)info->observer)->f args)                        \
    {                                                                   \
      r= 1;                                                             \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",    \
                      info->plugin_int->name.str);                      \
      break;                                                            \
    }                                                                   \
  }                                                                     \
  unlock();                                                             \
  /*
     Unlocking the plugins should be done after we released the
     Delegate lock to avoid a possible deadlock when this is the last
     user of the plugin: when we unlock the plugin, it will try to
     deinitialize the plugin, which will try to lock the Delegate in
     order to remove the observers.
  */                                                                    \
  plugin_unlock_list(0, (plugin_ref*)plugins->buffer,                   \
                     plugins->elements);                                \
  delete_dynamic(plugins)
224 
225 
after_commit(THD * thd,bool all)226 int Trans_delegate::after_commit(THD *thd, bool all)
227 {
228   DBUG_ENTER("Trans_delegate::after_commit");
229   Trans_param param = { 0, 0, 0, 0 };
230   bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
231 
232   if (is_real_trans)
233     param.flags = true;
234 
235   thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
236 
237   DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos));
238 
239   int ret= 0;
240   FOREACH_OBSERVER(ret, after_commit, thd, (&param));
241   DBUG_RETURN(ret);
242 }
243 
after_rollback(THD * thd,bool all)244 int Trans_delegate::after_rollback(THD *thd, bool all)
245 {
246   Trans_param param = { 0, 0, 0, 0 };
247   bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
248 
249   if (is_real_trans)
250     param.flags|= TRANS_IS_REAL_TRANS;
251   thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
252   int ret= 0;
253   FOREACH_OBSERVER(ret, after_rollback, thd, (&param));
254   return ret;
255 }
256 
after_flush(THD * thd,const char * log_file,my_off_t log_pos)257 int Binlog_storage_delegate::after_flush(THD *thd,
258                                          const char *log_file,
259                                          my_off_t log_pos)
260 {
261   DBUG_ENTER("Binlog_storage_delegate::after_flush");
262   DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
263                        log_file, (ulonglong) log_pos));
264   Binlog_storage_param param;
265 
266   int ret= 0;
267   FOREACH_OBSERVER(ret, after_flush, thd, (&param, log_file, log_pos));
268   DBUG_RETURN(ret);
269 }
270 
271 #ifdef HAVE_REPLICATION
transmit_start(THD * thd,ushort flags,const char * log_file,my_off_t log_pos,bool * observe_transmission)272 int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
273                                              const char *log_file,
274                                              my_off_t log_pos,
275                                              bool *observe_transmission)
276 {
277   Binlog_transmit_param param;
278   param.flags= flags;
279 
280   int ret= 0;
281   FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
282   *observe_transmission= param.should_observe();
283   return ret;
284 }
285 
transmit_stop(THD * thd,ushort flags)286 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
287 {
288   Binlog_transmit_param param;
289   param.flags= flags;
290 
291   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
292 
293   int ret= 0;
294   FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
295   return ret;
296 }
297 
reserve_header(THD * thd,ushort flags,String * packet)298 int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
299                                              String *packet)
300 {
301   /* NOTE2ME: Maximum extra header size for each observer, I hope 32
302      bytes should be enough for each Observer to reserve their extra
303      header. If later found this is not enough, we can increase this
304      /HEZX
305   */
306 #define RESERVE_HEADER_SIZE 32
307   unsigned char header[RESERVE_HEADER_SIZE];
308   ulong hlen;
309   Binlog_transmit_param param;
310   param.flags= flags;
311   param.server_id= thd->server_id;
312 
313   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
314 
315   int ret= 0;
316   read_lock();
317   Observer_info_iterator iter= observer_info_iter();
318   Observer_info *info= iter++;
319   for (; info; info= iter++)
320   {
321     plugin_ref plugin=
322       my_plugin_lock(thd, &info->plugin);
323     if (!plugin)
324     {
325       ret= 1;
326       break;
327     }
328     hlen= 0;
329     if (((Observer *)info->observer)->reserve_header
330         && ((Observer *)info->observer)->reserve_header(&param,
331                                                         header,
332                                                         RESERVE_HEADER_SIZE,
333                                                         &hlen))
334     {
335       ret= 1;
336       plugin_unlock(thd, plugin);
337       break;
338     }
339     plugin_unlock(thd, plugin);
340     if (hlen == 0)
341       continue;
342     if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
343     {
344       ret= 1;
345       break;
346     }
347   }
348   unlock();
349   return ret;
350 }
351 
before_send_event(THD * thd,ushort flags,String * packet,const char * log_file,my_off_t log_pos)352 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
353                                                 String *packet,
354                                                 const char *log_file,
355                                                 my_off_t log_pos)
356 {
357   Binlog_transmit_param param;
358   param.flags= flags;
359 
360   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
361 
362   int ret= 0;
363   FOREACH_OBSERVER(ret, before_send_event, thd,
364                    (&param, (uchar *)packet->c_ptr(),
365                     packet->length(),
366                     log_file+dirname_length(log_file), log_pos));
367   return ret;
368 }
369 
after_send_event(THD * thd,ushort flags,String * packet,const char * skipped_log_file,my_off_t skipped_log_pos)370 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
371                                                String *packet,
372                                                const char *skipped_log_file,
373                                                my_off_t skipped_log_pos)
374 {
375   Binlog_transmit_param param;
376   param.flags= flags;
377 
378   DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
379 
380   int ret= 0;
381   FOREACH_OBSERVER(ret, after_send_event, thd,
382                    (&param, packet->c_ptr(), packet->length(),
383                    skipped_log_file+dirname_length(skipped_log_file),
384                     skipped_log_pos));
385   return ret;
386 }
387 
after_reset_master(THD * thd,ushort flags)388 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
389 
390 {
391   Binlog_transmit_param param;
392   param.flags= flags;
393 
394   int ret= 0;
395   FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
396   return ret;
397 }
398 
init_param(Binlog_relay_IO_param * param,Master_info * mi)399 void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
400                                           Master_info *mi)
401 {
402   param->mysql= mi->mysql;
403   param->user= const_cast<char *>(mi->get_user());
404   param->host= mi->host;
405   param->port= mi->port;
406   param->master_log_name= const_cast<char *>(mi->get_master_log_name());
407   param->master_log_pos= mi->get_master_log_pos();
408 }
409 
thread_start(THD * thd,Master_info * mi)410 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
411 {
412   Binlog_relay_IO_param param;
413   init_param(&param, mi);
414 
415   int ret= 0;
416   FOREACH_OBSERVER(ret, thread_start, thd, (&param));
417   return ret;
418 }
419 
420 
thread_stop(THD * thd,Master_info * mi)421 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
422 {
423 
424   Binlog_relay_IO_param param;
425   init_param(&param, mi);
426 
427   int ret= 0;
428   FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
429   return ret;
430 }
431 
before_request_transmit(THD * thd,Master_info * mi,ushort flags)432 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
433                                                       Master_info *mi,
434                                                       ushort flags)
435 {
436   Binlog_relay_IO_param param;
437   init_param(&param, mi);
438 
439   int ret= 0;
440   FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
441   return ret;
442 }
443 
after_read_event(THD * thd,Master_info * mi,const char * packet,ulong len,const char ** event_buf,ulong * event_len)444 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
445                                                const char *packet, ulong len,
446                                                const char **event_buf,
447                                                ulong *event_len)
448 {
449   Binlog_relay_IO_param param;
450   init_param(&param, mi);
451 
452   int ret= 0;
453   FOREACH_OBSERVER(ret, after_read_event, thd,
454                    (&param, packet, len, event_buf, event_len));
455   return ret;
456 }
457 
after_queue_event(THD * thd,Master_info * mi,const char * event_buf,ulong event_len,bool synced)458 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
459                                                 const char *event_buf,
460                                                 ulong event_len,
461                                                 bool synced)
462 {
463   Binlog_relay_IO_param param;
464   init_param(&param, mi);
465 
466   uint32 flags=0;
467   if (synced)
468     flags |= BINLOG_STORAGE_IS_SYNCED;
469 
470   int ret= 0;
471   FOREACH_OBSERVER(ret, after_queue_event, thd,
472                    (&param, event_buf, event_len, flags));
473   return ret;
474 }
475 
after_reset_slave(THD * thd,Master_info * mi)476 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
477 
478 {
479   Binlog_relay_IO_param param;
480   init_param(&param, mi);
481 
482   int ret= 0;
483   FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
484   return ret;
485 }
486 #endif /* HAVE_REPLICATION */
487 
register_trans_observer(Trans_observer * observer,void * p)488 int register_trans_observer(Trans_observer *observer, void *p)
489 {
490   return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
491 }
492 
unregister_trans_observer(Trans_observer * observer,void * p)493 int unregister_trans_observer(Trans_observer *observer, void *p)
494 {
495   return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
496 }
497 
register_binlog_storage_observer(Binlog_storage_observer * observer,void * p)498 int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
499 {
500   DBUG_ENTER("register_binlog_storage_observer");
501   int result= binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
502   DBUG_RETURN(result);
503 }
504 
unregister_binlog_storage_observer(Binlog_storage_observer * observer,void * p)505 int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
506 {
507   return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
508 }
509 
510 #ifdef HAVE_REPLICATION
register_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)511 int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
512 {
513   return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
514 }
515 
unregister_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)516 int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
517 {
518   return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
519 }
520 
register_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)521 int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
522 {
523   return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
524 }
525 
unregister_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)526 int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
527 {
528   return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
529 }
530 #endif /* HAVE_REPLICATION */
531