/* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software Foundation,
   51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "sql_priv.h"
24 #include "unireg.h"
25
26 #include "rpl_mi.h"
27 #include "log_event.h"
28 #include "rpl_filter.h"
29 #include <my_dir.h>
30 #include "rpl_handler.h"
31
32 Trans_delegate *transaction_delegate;
33 Binlog_storage_delegate *binlog_storage_delegate;
34 #ifdef HAVE_REPLICATION
35 Binlog_transmit_delegate *binlog_transmit_delegate;
36 Binlog_relay_IO_delegate *binlog_relay_io_delegate;
37 #endif /* HAVE_REPLICATION */
38
39 /*
40 structure to save transaction log filename and position
41 */
42 typedef struct Trans_binlog_info {
43 my_off_t log_pos;
44 char log_file[FN_REFLEN];
45 } Trans_binlog_info;
46
get_user_var_int(const char * name,long long int * value,int * null_value)47 int get_user_var_int(const char *name,
48 long long int *value, int *null_value)
49 {
50 my_bool null_val;
51 user_var_entry *entry=
52 (user_var_entry*) my_hash_search(¤t_thd->user_vars,
53 (uchar*) name, strlen(name));
54 if (!entry)
55 return 1;
56 *value= entry->val_int(&null_val);
57 if (null_value)
58 *null_value= null_val;
59 return 0;
60 }
61
get_user_var_real(const char * name,double * value,int * null_value)62 int get_user_var_real(const char *name,
63 double *value, int *null_value)
64 {
65 my_bool null_val;
66 user_var_entry *entry=
67 (user_var_entry*) my_hash_search(¤t_thd->user_vars,
68 (uchar*) name, strlen(name));
69 if (!entry)
70 return 1;
71 *value= entry->val_real(&null_val);
72 if (null_value)
73 *null_value= null_val;
74 return 0;
75 }
76
get_user_var_str(const char * name,char * value,size_t len,unsigned int precision,int * null_value)77 int get_user_var_str(const char *name, char *value,
78 size_t len, unsigned int precision, int *null_value)
79 {
80 String str;
81 my_bool null_val;
82 user_var_entry *entry=
83 (user_var_entry*) my_hash_search(¤t_thd->user_vars,
84 (uchar*) name, strlen(name));
85 if (!entry)
86 return 1;
87 entry->val_str(&null_val, &str, precision);
88 strncpy(value, str.c_ptr(), len);
89 if (null_value)
90 *null_value= null_val;
91 return 0;
92 }
93
delegates_init()94 int delegates_init()
95 {
96 static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
97 static my_aligned_storage<sizeof(Binlog_storage_delegate),
98 MY_ALIGNOF(long)> storage_mem;
99 #ifdef HAVE_REPLICATION
100 static my_aligned_storage<sizeof(Binlog_transmit_delegate),
101 MY_ALIGNOF(long)> transmit_mem;
102 static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
103 MY_ALIGNOF(long)> relay_io_mem;
104 #endif
105
106 void *place_trans_mem= trans_mem.data;
107 void *place_storage_mem= storage_mem.data;
108
109 transaction_delegate= new (place_trans_mem) Trans_delegate;
110
111 if (!transaction_delegate->is_inited())
112 {
113 sql_print_error("Initialization of transaction delegates failed. "
114 "Please report a bug.");
115 return 1;
116 }
117
118 binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
119
120 if (!binlog_storage_delegate->is_inited())
121 {
122 sql_print_error("Initialization binlog storage delegates failed. "
123 "Please report a bug.");
124 return 1;
125 }
126
127 #ifdef HAVE_REPLICATION
128 void *place_transmit_mem= transmit_mem.data;
129 void *place_relay_io_mem= relay_io_mem.data;
130
131 binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
132
133 if (!binlog_transmit_delegate->is_inited())
134 {
135 sql_print_error("Initialization of binlog transmit delegates failed. "
136 "Please report a bug.");
137 return 1;
138 }
139
140 binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
141
142 if (!binlog_relay_io_delegate->is_inited())
143 {
144 sql_print_error("Initialization binlog relay IO delegates failed. "
145 "Please report a bug.");
146 return 1;
147 }
148 #endif
149
150 return 0;
151 }
152
delegates_destroy()153 void delegates_destroy()
154 {
155 if (transaction_delegate)
156 transaction_delegate->~Trans_delegate();
157 if (binlog_storage_delegate)
158 binlog_storage_delegate->~Binlog_storage_delegate();
159 #ifdef HAVE_REPLICATION
160 if (binlog_transmit_delegate)
161 binlog_transmit_delegate->~Binlog_transmit_delegate();
162 if (binlog_relay_io_delegate)
163 binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
164 #endif /* HAVE_REPLICATION */
165 }
166
/*
  This macro is used by almost all the Delegate methods to iterate
  over all the observers, running the given callback function of the
  delegate.

    r     int lvalue receiving the result: set to 1 when a callback
          returns non-zero, set to 0 when a plugin cannot be locked,
          left unchanged otherwise
    f     name of the Observer member (callback) to invoke
    thd   THD pointer; only thd->server_id is read
    args  parenthesized argument list passed to the callback

  Requirements on the expansion site: a local variable named `param`
  with a `server_id` member must exist, and the macro must be expanded
  inside a Delegate member function, because it calls read_lock(),
  unlock() and observer_info_iter() and uses the class's Observer type.
  The expansion is not wrapped in a block, so it declares `s`,
  `plugins`, `plugins_buffer`, `iter` and `info` in the caller's scope
  and can appear only once per scope.

  Iteration stops at the first observer whose callback fails or whose
  plugin cannot be locked.  Every plugin locked on the way is recorded
  in a local array (locks are taken with a null THD, not attached to
  thd->lex) and released with plugin_unlock_list() after the Delegate
  lock has been dropped.
*/
#define FOREACH_OBSERVER(r, f, thd, args)                               \
  param.server_id= thd->server_id;                                      \
  /*
    Use a struct to make sure that they are allocated adjacent, check
    delete_dynamic().
  */                                                                    \
  struct {                                                              \
    DYNAMIC_ARRAY plugins;                                              \
    /* preallocate 8 slots */                                           \
    plugin_ref plugins_buffer[8];                                       \
  } s;                                                                  \
  DYNAMIC_ARRAY *plugins= &s.plugins;                                   \
  plugin_ref *plugins_buffer= s.plugins_buffer;                         \
  my_init_dynamic_array2(plugins, sizeof(plugin_ref),                   \
                         plugins_buffer, 8, 8);                         \
  read_lock();                                                          \
  Observer_info_iterator iter= observer_info_iter();                    \
  Observer_info *info= iter++;                                          \
  for (; info; info= iter++)                                            \
  {                                                                     \
    plugin_ref plugin=                                                  \
      my_plugin_lock(0, &info->plugin);                                 \
    if (!plugin)                                                        \
    {                                                                   \
      /* plugin is not initialized or deleted, this is not an error */  \
      r= 0;                                                             \
      break;                                                            \
    }                                                                   \
    insert_dynamic(plugins, &plugin);                                   \
    if (((Observer *)info->observer)->f                                 \
        && ((Observer *)info->observer)->f args)                        \
    {                                                                   \
      r= 1;                                                             \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",    \
                      info->plugin_int->name.str);                      \
      break;                                                            \
    }                                                                   \
  }                                                                     \
  unlock();                                                             \
  /*
    Unlock plugins should be done after we released the Delegate lock
    to avoid possible deadlock when this is the last user of the
    plugin, and when we unlock the plugin, it will try to
    deinitialize the plugin, which will try to lock the Delegate in
    order to remove the observers.
  */                                                                    \
  plugin_unlock_list(0, (plugin_ref*)plugins->buffer,                   \
                     plugins->elements);                                \
  delete_dynamic(plugins)
224
225
after_commit(THD * thd,bool all)226 int Trans_delegate::after_commit(THD *thd, bool all)
227 {
228 DBUG_ENTER("Trans_delegate::after_commit");
229 Trans_param param = { 0, 0, 0, 0 };
230 bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
231
232 if (is_real_trans)
233 param.flags = true;
234
235 thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos);
236
237 DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos));
238
239 int ret= 0;
240 FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
241 DBUG_RETURN(ret);
242 }
243
after_rollback(THD * thd,bool all)244 int Trans_delegate::after_rollback(THD *thd, bool all)
245 {
246 Trans_param param = { 0, 0, 0, 0 };
247 bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
248
249 if (is_real_trans)
250 param.flags|= TRANS_IS_REAL_TRANS;
251 thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos);
252 int ret= 0;
253 FOREACH_OBSERVER(ret, after_rollback, thd, (¶m));
254 return ret;
255 }
256
after_flush(THD * thd,const char * log_file,my_off_t log_pos)257 int Binlog_storage_delegate::after_flush(THD *thd,
258 const char *log_file,
259 my_off_t log_pos)
260 {
261 DBUG_ENTER("Binlog_storage_delegate::after_flush");
262 DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
263 log_file, (ulonglong) log_pos));
264 Binlog_storage_param param;
265
266 int ret= 0;
267 FOREACH_OBSERVER(ret, after_flush, thd, (¶m, log_file, log_pos));
268 DBUG_RETURN(ret);
269 }
270
271 #ifdef HAVE_REPLICATION
transmit_start(THD * thd,ushort flags,const char * log_file,my_off_t log_pos,bool * observe_transmission)272 int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
273 const char *log_file,
274 my_off_t log_pos,
275 bool *observe_transmission)
276 {
277 Binlog_transmit_param param;
278 param.flags= flags;
279
280 int ret= 0;
281 FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos));
282 *observe_transmission= param.should_observe();
283 return ret;
284 }
285
transmit_stop(THD * thd,ushort flags)286 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
287 {
288 Binlog_transmit_param param;
289 param.flags= flags;
290
291 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
292
293 int ret= 0;
294 FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m));
295 return ret;
296 }
297
reserve_header(THD * thd,ushort flags,String * packet)298 int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
299 String *packet)
300 {
301 /* NOTE2ME: Maximum extra header size for each observer, I hope 32
302 bytes should be enough for each Observer to reserve their extra
303 header. If later found this is not enough, we can increase this
304 /HEZX
305 */
306 #define RESERVE_HEADER_SIZE 32
307 unsigned char header[RESERVE_HEADER_SIZE];
308 ulong hlen;
309 Binlog_transmit_param param;
310 param.flags= flags;
311 param.server_id= thd->server_id;
312
313 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
314
315 int ret= 0;
316 read_lock();
317 Observer_info_iterator iter= observer_info_iter();
318 Observer_info *info= iter++;
319 for (; info; info= iter++)
320 {
321 plugin_ref plugin=
322 my_plugin_lock(thd, &info->plugin);
323 if (!plugin)
324 {
325 ret= 1;
326 break;
327 }
328 hlen= 0;
329 if (((Observer *)info->observer)->reserve_header
330 && ((Observer *)info->observer)->reserve_header(¶m,
331 header,
332 RESERVE_HEADER_SIZE,
333 &hlen))
334 {
335 ret= 1;
336 plugin_unlock(thd, plugin);
337 break;
338 }
339 plugin_unlock(thd, plugin);
340 if (hlen == 0)
341 continue;
342 if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
343 {
344 ret= 1;
345 break;
346 }
347 }
348 unlock();
349 return ret;
350 }
351
before_send_event(THD * thd,ushort flags,String * packet,const char * log_file,my_off_t log_pos)352 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
353 String *packet,
354 const char *log_file,
355 my_off_t log_pos)
356 {
357 Binlog_transmit_param param;
358 param.flags= flags;
359
360 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
361
362 int ret= 0;
363 FOREACH_OBSERVER(ret, before_send_event, thd,
364 (¶m, (uchar *)packet->c_ptr(),
365 packet->length(),
366 log_file+dirname_length(log_file), log_pos));
367 return ret;
368 }
369
after_send_event(THD * thd,ushort flags,String * packet,const char * skipped_log_file,my_off_t skipped_log_pos)370 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
371 String *packet,
372 const char *skipped_log_file,
373 my_off_t skipped_log_pos)
374 {
375 Binlog_transmit_param param;
376 param.flags= flags;
377
378 DBUG_EXECUTE_IF("crash_binlog_transmit_hook", DBUG_SUICIDE(););
379
380 int ret= 0;
381 FOREACH_OBSERVER(ret, after_send_event, thd,
382 (¶m, packet->c_ptr(), packet->length(),
383 skipped_log_file+dirname_length(skipped_log_file),
384 skipped_log_pos));
385 return ret;
386 }
387
after_reset_master(THD * thd,ushort flags)388 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
389
390 {
391 Binlog_transmit_param param;
392 param.flags= flags;
393
394 int ret= 0;
395 FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m));
396 return ret;
397 }
398
init_param(Binlog_relay_IO_param * param,Master_info * mi)399 void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
400 Master_info *mi)
401 {
402 param->mysql= mi->mysql;
403 param->user= const_cast<char *>(mi->get_user());
404 param->host= mi->host;
405 param->port= mi->port;
406 param->master_log_name= const_cast<char *>(mi->get_master_log_name());
407 param->master_log_pos= mi->get_master_log_pos();
408 }
409
thread_start(THD * thd,Master_info * mi)410 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
411 {
412 Binlog_relay_IO_param param;
413 init_param(¶m, mi);
414
415 int ret= 0;
416 FOREACH_OBSERVER(ret, thread_start, thd, (¶m));
417 return ret;
418 }
419
420
thread_stop(THD * thd,Master_info * mi)421 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
422 {
423
424 Binlog_relay_IO_param param;
425 init_param(¶m, mi);
426
427 int ret= 0;
428 FOREACH_OBSERVER(ret, thread_stop, thd, (¶m));
429 return ret;
430 }
431
before_request_transmit(THD * thd,Master_info * mi,ushort flags)432 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
433 Master_info *mi,
434 ushort flags)
435 {
436 Binlog_relay_IO_param param;
437 init_param(¶m, mi);
438
439 int ret= 0;
440 FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags));
441 return ret;
442 }
443
after_read_event(THD * thd,Master_info * mi,const char * packet,ulong len,const char ** event_buf,ulong * event_len)444 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
445 const char *packet, ulong len,
446 const char **event_buf,
447 ulong *event_len)
448 {
449 Binlog_relay_IO_param param;
450 init_param(¶m, mi);
451
452 int ret= 0;
453 FOREACH_OBSERVER(ret, after_read_event, thd,
454 (¶m, packet, len, event_buf, event_len));
455 return ret;
456 }
457
after_queue_event(THD * thd,Master_info * mi,const char * event_buf,ulong event_len,bool synced)458 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
459 const char *event_buf,
460 ulong event_len,
461 bool synced)
462 {
463 Binlog_relay_IO_param param;
464 init_param(¶m, mi);
465
466 uint32 flags=0;
467 if (synced)
468 flags |= BINLOG_STORAGE_IS_SYNCED;
469
470 int ret= 0;
471 FOREACH_OBSERVER(ret, after_queue_event, thd,
472 (¶m, event_buf, event_len, flags));
473 return ret;
474 }
475
after_reset_slave(THD * thd,Master_info * mi)476 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
477
478 {
479 Binlog_relay_IO_param param;
480 init_param(¶m, mi);
481
482 int ret= 0;
483 FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m));
484 return ret;
485 }
486 #endif /* HAVE_REPLICATION */
487
register_trans_observer(Trans_observer * observer,void * p)488 int register_trans_observer(Trans_observer *observer, void *p)
489 {
490 return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
491 }
492
unregister_trans_observer(Trans_observer * observer,void * p)493 int unregister_trans_observer(Trans_observer *observer, void *p)
494 {
495 return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
496 }
497
register_binlog_storage_observer(Binlog_storage_observer * observer,void * p)498 int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
499 {
500 DBUG_ENTER("register_binlog_storage_observer");
501 int result= binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
502 DBUG_RETURN(result);
503 }
504
unregister_binlog_storage_observer(Binlog_storage_observer * observer,void * p)505 int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
506 {
507 return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
508 }
509
510 #ifdef HAVE_REPLICATION
register_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)511 int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
512 {
513 return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
514 }
515
unregister_binlog_transmit_observer(Binlog_transmit_observer * observer,void * p)516 int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
517 {
518 return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
519 }
520
register_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)521 int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
522 {
523 return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
524 }
525
unregister_binlog_relay_io_observer(Binlog_relay_IO_observer * observer,void * p)526 int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
527 {
528 return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
529 }
530 #endif /* HAVE_REPLICATION */
531