1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates. 2 3 This program is free software; you can redistribute it and/or modify 4 it under the terms of the GNU General Public License, version 2.0, 5 as published by the Free Software Foundation. 6 7 This program is also distributed with certain software (including 8 but not limited to OpenSSL) that is licensed under separate terms, 9 as designated in a particular file or component or in included license 10 documentation. The authors of MySQL hereby grant you an additional 11 permission to link the program and your derivative works with the 12 separately licensed software that they have included with MySQL. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License, version 2.0, for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; if not, write to the Free Software Foundation, 21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ 22 23 #ifndef RPL_HANDLER_H 24 #define RPL_HANDLER_H 25 26 #include "my_global.h" 27 #include "my_sys.h" // free_root 28 #include "mysql/psi/mysql_thread.h" // mysql_rwlock_t 29 #include "mysqld.h" // key_memory_delegate 30 #include "locks/shared_spin_lock.h" // Shared_spin_lock 31 #include "sql_list.h" // List 32 #include "sql_plugin.h" // my_plugin_(un)lock 33 #include "sql_plugin_ref.h" // plugin_ref 34 35 #include <list> 36 #include <map> 37 #include <iostream> 38 39 class Master_info; 40 class String; 41 struct Binlog_relay_IO_observer; 42 struct Binlog_relay_IO_param; 43 struct Binlog_storage_observer; 44 struct Binlog_transmit_observer; 45 struct Server_state_observer; 46 struct Trans_observer; 47 struct Trans_table_info; 48 49 /** 50 Variable to keep the value set for the 51 `replication_optimize_for_static_plugin_config` global. 52 53 When this global variable value is set to `1`, we prevent all plugins that 54 register replication observers to be unloaded until the variable is set to 55 `0`, again. While the value of the variable is `1`, we are also exchanging the 56 `Delegate` class read-write lock by an atomic-based shared spin-lock. 57 58 This behaviour is usefull for increasing the throughtput of the master when a 59 large number of slaves is connected, by preventing the acquisition of the 60 `LOCK_plugin` mutex and using a more read-friendly lock in the `Delegate` 61 class, when invoking the observer's hooks. 62 63 Note that a large number of slaves means a large number of dump threads, which 64 means a large number of threads calling the registered observers hooks. 65 66 If `UNINSTALL` is executed on a replication observer plugin while the variable 67 is set to `1`, the unload of the plugin will be deferred until the variable's 68 value is set to `0`. 69 */ 70 extern bool opt_replication_optimize_for_static_plugin_config; 71 /** 72 Variable to keep the value set for the 73 `replication_sender_observe_commit_only` global. 74 75 When this global variable is set to `1`, only the replication observer's 76 commit hook will be called, every other registered hook invocation is skipped. 77 */ 78 extern bool opt_replication_sender_observe_commit_only; 79 /** 80 Atomic access version of `opt_replication_sender_observe_commit_only` 81 */ 82 extern int32 opt_atomic_replication_sender_observe_commit_only; 83 84 class Observer_info { 85 public: 86 void *observer; 87 st_plugin_int *plugin_int; 88 plugin_ref plugin; 89 90 Observer_info(void *ob, st_plugin_int *p); 91 }; 92 93 /** 94 Base class for adding replication event observer infra-structure. It's 95 meant to be sub-classed by classes that will provide the support for the 96 specific event types. This class is meant just to provide basic support 97 for managing observers and managing resource access and lock acquisition. 98 */ 99 class Delegate { 100 public: 101 typedef List<Observer_info> Observer_info_list; 102 typedef List_iterator<Observer_info> Observer_info_iterator; 103 104 /** 105 Class constructor 106 107 @param key the PFS key for instrumenting the class lock 108 */ 109 Delegate( 110 #ifdef HAVE_PSI_INTERFACE 111 PSI_rwlock_key key 112 #endif 113 ); 114 /** 115 Class destructor 116 */ 117 virtual ~Delegate(); 118 /** 119 Adds an observer to the observer list. 120 121 @param observer The observer object to be added to the list 122 @param plugin The plugin the observer is being loaded from 123 124 @return 0 upon success, 1 otherwise 125 */ 126 int add_observer(void *observer, st_plugin_int *plugin); 127 /** 128 Removes an observer from the observer list. 129 130 @param observer The observer object to be added to the list 131 @param plugin The plugin the observer is being loaded from 132 133 @return 0 upon success, 1 otherwise 134 */ 135 int remove_observer(void *observer, st_plugin_int *plugin); 136 /** 137 Retrieves an iterator for the observer list. 138 139 @return the iterator for the observer list 140 */ 141 Observer_info_iterator observer_info_iter(); 142 /** 143 Returns whether or not there are registered observers. 144 145 @return whether or not there are registered observers 146 */ 147 bool is_empty(); 148 /** 149 Acquires this Delegate class instance lock in read/shared mode. 150 151 @return 0 upon success, 1 otherwise 152 */ 153 int read_lock(); 154 /** 155 Acquires this Delegate class instance lock in write/exclusive mode. 156 157 @return 0 upon success, 1 otherwise 158 */ 159 int write_lock(); 160 /** 161 Releases this Delegate class instance lock. 162 163 @return 0 upon success, 1 otherwise 164 */ 165 int unlock(); 166 /** 167 Returns whether or not this instance was initialized. 168 169 @return whether or not this instance was initialized 170 */ 171 bool is_inited(); 172 /** 173 Toggles the type of lock between a classical read-write lock and a 174 shared-exclusive spin-lock. 175 */ 176 void update_lock_type(); 177 /** 178 Increases the `info->plugin` usage reference counting if 179 `replication_optimize_for_static_plugin_config` is being enabled, in order 180 to prevent plugin removal. 181 182 Decreases the `info->plugin` usage reference counting if 183 `replication_optimize_for_static_plugin_config` is being disabled, in order 184 to allow plugin removal. 185 */ 186 void update_plugin_ref_count(); 187 /** 188 Returns whether or not to use the classic read-write lock. 189 190 The read-write lock should be used if that type of lock is already 191 acquired by some thread or if the server is not optimized for static 192 plugin configuration. 193 194 @returns true if one should use the classic read-write lock, false otherwise 195 */ 196 bool use_rw_lock_type(); 197 /** 198 Returns whether or not to use the shared spin-lock. 199 200 The shared spin-lock should be used if that type of lock is already 201 acquired by some thread or if the server is optimized for static plugin 202 configuration. 203 204 @returns true if one should use the shared spin-lock, false otherwise 205 */ 206 bool use_spin_lock_type(); 207 208 private: 209 /** List of registered observers */ 210 Observer_info_list observer_info_list; 211 /** 212 A read/write lock to be used when not optimizing for static plugin config 213 */ 214 mysql_rwlock_t lock; 215 /** 216 A shared-exclusive spin lock to be used when optimizing for static plugin 217 config. 218 */ 219 lock::Shared_spin_lock m_spin_lock; 220 /** Memory pool to be used to allocate the observers list */ 221 MEM_ROOT memroot; 222 /** Flag statign whether or not this instance was initialized */ 223 bool inited; 224 /** 225 The type of lock configured to be used, either a classic read-write (-1) 226 lock or a shared-exclusive spin lock (1). 227 */ 228 int32 m_configured_lock_type; 229 /** 230 The count of locks acquired: -1 will be added for each classic 231 read-write lock acquisitions; +1 will be added for each 232 shared-exclusive spin lock acquisition. 233 */ 234 int32 m_acquired_locks; 235 /** 236 List of acquired plugin references, to be held while 237 `replication_optimize_for_static_plugin_config` option is enabled. If the 238 option is disabled, the references in this list will be released. 239 */ 240 std::map<plugin_ref, size_t> m_acquired_references; 241 242 enum enum_delegate_lock_type 243 { 244 DELEGATE_OS_LOCK= -1, // Lock used by this class is an OS RW lock 245 DELEGATE_SPIN_LOCK= 1, // Lock used by this class is a spin lock 246 }; 247 248 enum enum_delegate_lock_mode 249 { 250 DELEGATE_LOCK_MODE_SHARED= 0, // Lock acquired in shared/read mode 251 DELEGATE_LOCK_MODE_EXCLUSIVE= 1, // Lock acquired in exclusive/write mode 252 }; 253 254 /** 255 Increases the `info->plugin` reference counting and stores that refernce 256 internally. 257 */ 258 void acquire_plugin_ref_count(Observer_info *info); 259 /** 260 Locks the active lock (OS read-write lock or shared spin-lock) 261 according to the mode passed on as a parameter. 262 263 @param mode The mode to lock in, either DELEGATE_LOCK_MODE_SHARED or 264 DELEGATE_LOCK_MODE_EXCLUSIVE. 265 */ 266 void lock_it(enum_delegate_lock_mode mode); 267 }; 268 269 #ifdef HAVE_PSI_INTERFACE 270 extern PSI_rwlock_key key_rwlock_Trans_delegate_lock; 271 #endif 272 273 class Trans_delegate 274 :public Delegate { 275 public: 276 Trans_delegate()277 Trans_delegate() 278 : Delegate( 279 #ifdef HAVE_PSI_INTERFACE 280 key_rwlock_Trans_delegate_lock 281 #endif 282 ) 283 {} 284 285 typedef Trans_observer Observer; 286 287 int before_dml(THD *thd, int& result); 288 int before_commit(THD *thd, bool all, 289 IO_CACHE *trx_cache_log, 290 IO_CACHE *stmt_cache_log, 291 ulonglong cache_log_max_size); 292 int before_rollback(THD *thd, bool all); 293 int after_commit(THD *thd, bool all); 294 int after_rollback(THD *thd, bool all); 295 private: 296 void prepare_table_info(THD* thd, 297 Trans_table_info*& table_info_list, 298 uint& number_of_tables); 299 }; 300 301 #ifdef HAVE_PSI_INTERFACE 302 extern PSI_rwlock_key key_rwlock_Server_state_delegate_lock; 303 #endif 304 305 class Server_state_delegate 306 :public Delegate { 307 public: 308 Server_state_delegate()309 Server_state_delegate() 310 : Delegate( 311 #ifdef HAVE_PSI_INTERFACE 312 key_rwlock_Server_state_delegate_lock 313 #endif 314 ) 315 {} 316 317 typedef Server_state_observer Observer; 318 int before_handle_connection(THD *thd); 319 int before_recovery(THD *thd); 320 int after_engine_recovery(THD *thd); 321 int after_recovery(THD *thd); 322 int before_server_shutdown(THD *thd); 323 int after_server_shutdown(THD *thd); 324 }; 325 326 #ifdef HAVE_PSI_INTERFACE 327 extern PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock; 328 #endif 329 330 class Binlog_storage_delegate 331 :public Delegate { 332 public: 333 Binlog_storage_delegate()334 Binlog_storage_delegate() 335 : Delegate( 336 #ifdef HAVE_PSI_INTERFACE 337 key_rwlock_Binlog_storage_delegate_lock 338 #endif 339 ) 340 {} 341 342 typedef Binlog_storage_observer Observer; 343 int after_flush(THD *thd, const char *log_file, 344 my_off_t log_pos); 345 int after_sync(THD *thd, const char *log_file, 346 my_off_t log_pos); 347 }; 348 349 #ifdef HAVE_REPLICATION 350 #ifdef HAVE_PSI_INTERFACE 351 extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock; 352 #endif 353 354 class Binlog_transmit_delegate 355 :public Delegate { 356 public: 357 Binlog_transmit_delegate()358 Binlog_transmit_delegate() 359 : Delegate( 360 #ifdef HAVE_PSI_INTERFACE 361 key_rwlock_Binlog_transmit_delegate_lock 362 #endif 363 ) 364 {} 365 366 typedef Binlog_transmit_observer Observer; 367 int transmit_start(THD *thd, ushort flags, 368 const char *log_file, my_off_t log_pos, 369 bool *observe_transmission); 370 int transmit_stop(THD *thd, ushort flags); 371 int reserve_header(THD *thd, ushort flags, String *packet); 372 int before_send_event(THD *thd, ushort flags, 373 String *packet, const 374 char *log_file, my_off_t log_pos ); 375 int after_send_event(THD *thd, ushort flags, 376 String *packet, const char *skipped_log_file, 377 my_off_t skipped_log_pos); 378 int after_reset_master(THD *thd, ushort flags); 379 }; 380 381 #ifdef HAVE_PSI_INTERFACE 382 extern PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock; 383 #endif 384 385 class Binlog_relay_IO_delegate 386 :public Delegate { 387 public: 388 Binlog_relay_IO_delegate()389 Binlog_relay_IO_delegate() 390 : Delegate( 391 #ifdef HAVE_PSI_INTERFACE 392 key_rwlock_Binlog_relay_IO_delegate_lock 393 #endif 394 ) 395 {} 396 397 typedef Binlog_relay_IO_observer Observer; 398 int thread_start(THD *thd, Master_info *mi); 399 int thread_stop(THD *thd, Master_info *mi); 400 int applier_start(THD *thd, Master_info *mi); 401 int applier_stop(THD *thd, Master_info *mi, bool aborted); 402 int before_request_transmit(THD *thd, Master_info *mi, ushort flags); 403 int after_read_event(THD *thd, Master_info *mi, 404 const char *packet, ulong len, 405 const char **event_buf, ulong *event_len); 406 int after_queue_event(THD *thd, Master_info *mi, 407 const char *event_buf, ulong event_len, 408 bool synced); 409 int after_reset_slave(THD *thd, Master_info *mi); 410 private: 411 void init_param(Binlog_relay_IO_param *param, Master_info *mi); 412 }; 413 #endif /* HAVE_REPLICATION */ 414 415 int delegates_init(); 416 /** 417 Verify that the replication plugins are ready and OK to be unloaded. 418 */ 419 void delegates_shutdown(); 420 void delegates_destroy(); 421 /** 422 Invokes `write_lock()` for all the observer delegate objects. 423 */ 424 void delegates_acquire_locks(); 425 /** 426 Releases locks for all the observer delegate objects. 427 */ 428 void delegates_release_locks(); 429 /** 430 Toggles the type of lock between a classical read-write lock and a 431 shared-exclusive spin-lock. 432 */ 433 void delegates_update_lock_type(); 434 435 extern Trans_delegate *transaction_delegate; 436 extern Binlog_storage_delegate *binlog_storage_delegate; 437 extern Server_state_delegate *server_state_delegate; 438 #ifdef HAVE_REPLICATION 439 extern Binlog_transmit_delegate *binlog_transmit_delegate; 440 extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; 441 #endif /* HAVE_REPLICATION */ 442 443 /* 444 if there is no observers in the delegate, we can return 0 445 immediately. 446 */ 447 #define RUN_HOOK(group, hook, args) \ 448 (group ##_delegate->is_empty() ? \ 449 0 : group ##_delegate->hook args) 450 451 #define NO_HOOK(group) (group ##_delegate->is_empty()) 452 453 #endif /* RPL_HANDLER_H */ 454