1 /* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved. 2 3 This program is free software; you can redistribute it and/or modify 4 it under the terms of the GNU General Public License, version 2.0, 5 as published by the Free Software Foundation. 6 7 This program is also distributed with certain software (including 8 but not limited to OpenSSL) that is licensed under separate terms, 9 as designated in a particular file or component or in included license 10 documentation. The authors of MySQL hereby grant you an additional 11 permission to link the program and your derivative works with the 12 separately licensed software that they have included with MySQL. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License, version 2.0, for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; if not, write to the Free Software Foundation, 21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ 22 23 #ifndef RPL_HANDLER_H 24 #define RPL_HANDLER_H 25 26 #include "sql_priv.h" 27 #include "rpl_gtid.h" 28 #include "rpl_mi.h" 29 #include "rpl_rli.h" 30 #include "sql_plugin.h" 31 #include "replication.h" 32 33 class Observer_info { 34 public: 35 void *observer; 36 st_plugin_int *plugin_int; 37 plugin_ref plugin; 38 Observer_info(void * ob,st_plugin_int * p)39 Observer_info(void *ob, st_plugin_int *p) 40 :observer(ob), plugin_int(p) 41 { 42 plugin= plugin_int_to_ref(plugin_int); 43 } 44 }; 45 46 class Delegate { 47 public: 48 typedef List<Observer_info> Observer_info_list; 49 typedef List_iterator<Observer_info> Observer_info_iterator; 50 add_observer(void * observer,st_plugin_int * plugin)51 int add_observer(void *observer, st_plugin_int *plugin) 52 { 53 int ret= FALSE; 54 if (!inited) 55 return TRUE; 56 write_lock(); 57 Observer_info_iterator iter(observer_info_list); 58 Observer_info *info= iter++; 59 while (info && info->observer != observer) 60 info= iter++; 61 if (!info) 62 { 63 info= new Observer_info(observer, plugin); 64 if (!info || observer_info_list.push_back(info, &memroot)) 65 ret= TRUE; 66 } 67 else 68 ret= TRUE; 69 unlock(); 70 return ret; 71 } 72 remove_observer(void * observer,st_plugin_int * plugin)73 int remove_observer(void *observer, st_plugin_int *plugin) 74 { 75 int ret= FALSE; 76 if (!inited) 77 return TRUE; 78 write_lock(); 79 Observer_info_iterator iter(observer_info_list); 80 Observer_info *info= iter++; 81 while (info && info->observer != observer) 82 info= iter++; 83 if (info) 84 { 85 iter.remove(); 86 delete info; 87 } 88 else 89 ret= TRUE; 90 unlock(); 91 return ret; 92 } 93 observer_info_iter()94 inline Observer_info_iterator observer_info_iter() 95 { 96 return Observer_info_iterator(observer_info_list); 97 } 98 is_empty()99 inline bool is_empty() 100 { 101 DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty())); 102 return observer_info_list.is_empty(); 103 } 104 read_lock()105 inline int read_lock() 106 { 107 if (!inited) 108 return TRUE; 109 return mysql_rwlock_rdlock(&lock); 110 } 111 write_lock()112 inline int write_lock() 113 { 114 if (!inited) 115 return TRUE; 116 return mysql_rwlock_wrlock(&lock); 117 } 118 unlock()119 inline int unlock() 120 { 121 if (!inited) 122 return TRUE; 123 return mysql_rwlock_unlock(&lock); 124 } 125 is_inited()126 inline bool is_inited() 127 { 128 return inited; 129 } 130 Delegate(PSI_rwlock_key key)131 Delegate( 132 #ifdef HAVE_PSI_INTERFACE 133 PSI_rwlock_key key 134 #endif 135 ) 136 { 137 inited= FALSE; 138 #ifdef HAVE_PSI_INTERFACE 139 if (mysql_rwlock_init(key, &lock)) 140 return; 141 #else 142 if (mysql_rwlock_init(0, &lock)) 143 return; 144 #endif 145 init_sql_alloc(&memroot, 1024, 0); 146 inited= TRUE; 147 } ~Delegate()148 ~Delegate() 149 { 150 inited= FALSE; 151 mysql_rwlock_destroy(&lock); 152 free_root(&memroot, MYF(0)); 153 } 154 155 private: 156 Observer_info_list observer_info_list; 157 mysql_rwlock_t lock; 158 MEM_ROOT memroot; 159 bool inited; 160 }; 161 162 #ifdef HAVE_PSI_INTERFACE 163 extern PSI_rwlock_key key_rwlock_Trans_delegate_lock; 164 #endif 165 166 class Trans_delegate 167 :public Delegate { 168 public: 169 Trans_delegate()170 Trans_delegate() 171 : Delegate( 172 #ifdef HAVE_PSI_INTERFACE 173 key_rwlock_Trans_delegate_lock 174 #endif 175 ) 176 {} 177 178 typedef Trans_observer Observer; 179 int before_commit(THD *thd, bool all); 180 int before_rollback(THD *thd, bool all); 181 int after_commit(THD *thd, bool all); 182 int after_rollback(THD *thd, bool all); 183 }; 184 185 #ifdef HAVE_PSI_INTERFACE 186 extern PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock; 187 #endif 188 189 class Binlog_storage_delegate 190 :public Delegate { 191 public: 192 Binlog_storage_delegate()193 Binlog_storage_delegate() 194 : Delegate( 195 #ifdef HAVE_PSI_INTERFACE 196 key_rwlock_Binlog_storage_delegate_lock 197 #endif 198 ) 199 {} 200 201 typedef Binlog_storage_observer Observer; 202 int after_flush(THD *thd, const char *log_file, 203 my_off_t log_pos); 204 }; 205 206 #ifdef HAVE_REPLICATION 207 #ifdef HAVE_PSI_INTERFACE 208 extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock; 209 #endif 210 211 class Binlog_transmit_delegate 212 :public Delegate { 213 public: 214 Binlog_transmit_delegate()215 Binlog_transmit_delegate() 216 : Delegate( 217 #ifdef HAVE_PSI_INTERFACE 218 key_rwlock_Binlog_transmit_delegate_lock 219 #endif 220 ) 221 {} 222 223 typedef Binlog_transmit_observer Observer; 224 int transmit_start(THD *thd, ushort flags, 225 const char *log_file, my_off_t log_pos, 226 bool *observe_transmission); 227 int transmit_stop(THD *thd, ushort flags); 228 int reserve_header(THD *thd, ushort flags, String *packet); 229 int before_send_event(THD *thd, ushort flags, 230 String *packet, const 231 char *log_file, my_off_t log_pos ); 232 int after_send_event(THD *thd, ushort flags, 233 String *packet, const char *skipped_log_file, 234 my_off_t skipped_log_pos); 235 int after_reset_master(THD *thd, ushort flags); 236 }; 237 238 #ifdef HAVE_PSI_INTERFACE 239 extern PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock; 240 #endif 241 242 class Binlog_relay_IO_delegate 243 :public Delegate { 244 public: 245 Binlog_relay_IO_delegate()246 Binlog_relay_IO_delegate() 247 : Delegate( 248 #ifdef HAVE_PSI_INTERFACE 249 key_rwlock_Binlog_relay_IO_delegate_lock 250 #endif 251 ) 252 {} 253 254 typedef Binlog_relay_IO_observer Observer; 255 int thread_start(THD *thd, Master_info *mi); 256 int thread_stop(THD *thd, Master_info *mi); 257 int before_request_transmit(THD *thd, Master_info *mi, ushort flags); 258 int after_read_event(THD *thd, Master_info *mi, 259 const char *packet, ulong len, 260 const char **event_buf, ulong *event_len); 261 int after_queue_event(THD *thd, Master_info *mi, 262 const char *event_buf, ulong event_len, 263 bool synced); 264 int after_reset_slave(THD *thd, Master_info *mi); 265 private: 266 void init_param(Binlog_relay_IO_param *param, Master_info *mi); 267 }; 268 #endif /* HAVE_REPLICATION */ 269 270 int delegates_init(); 271 void delegates_destroy(); 272 273 extern Trans_delegate *transaction_delegate; 274 extern Binlog_storage_delegate *binlog_storage_delegate; 275 #ifdef HAVE_REPLICATION 276 extern Binlog_transmit_delegate *binlog_transmit_delegate; 277 extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; 278 #endif /* HAVE_REPLICATION */ 279 280 /* 281 if there is no observers in the delegate, we can return 0 282 immediately. 283 */ 284 #define RUN_HOOK(group, hook, args) \ 285 (group ##_delegate->is_empty() ? \ 286 0 : group ##_delegate->hook args) 287 288 #endif /* RPL_HANDLER_H */ 289