1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #ifndef RPL_HANDLER_H
24 #define RPL_HANDLER_H
25 
26 #include "my_global.h"
27 #include "my_sys.h"                        // free_root
28 #include "mysql/psi/mysql_thread.h"        // mysql_rwlock_t
29 #include "mysqld.h"                        // key_memory_delegate
30 #include "locks/shared_spin_lock.h"        // Shared_spin_lock
31 #include "sql_list.h"                      // List
32 #include "sql_plugin.h"                    // my_plugin_(un)lock
33 #include "sql_plugin_ref.h"                // plugin_ref
34 
35 #include <list>
36 #include <map>
37 #include <iostream>
38 
/*
  Forward declarations for types that this header only uses through pointers
  or references, or as typedef targets.
*/

/* Declared with the `class` key. */
class Master_info;
class String;

/* Declared with the `struct` key. */
struct Binlog_relay_IO_observer;
struct Binlog_relay_IO_param;
struct Binlog_storage_observer;
struct Binlog_transmit_observer;
struct Server_state_observer;
struct Trans_observer;
struct Trans_table_info;
48 
49 /**
50   Variable to keep the value set for the
51   `replication_optimize_for_static_plugin_config` global.
52 
53   When this global variable value is set to `1`, we prevent all plugins that
54   register replication observers to be unloaded until the variable is set to
55   `0`, again. While the value of the variable is `1`, we are also exchanging the
56   `Delegate` class read-write lock by an atomic-based shared spin-lock.
57 
58   This behaviour is usefull for increasing the throughtput of the master when a
59   large number of slaves is connected, by preventing the acquisition of the
60   `LOCK_plugin` mutex and using a more read-friendly lock in the `Delegate`
61   class, when invoking the observer's hooks.
62 
63   Note that a large number of slaves means a large number of dump threads, which
64   means a large number of threads calling the registered observers hooks.
65 
66   If `UNINSTALL` is executed on a replication observer plugin while the variable
67   is set to `1`, the unload of the plugin will be deferred until the variable's
68   value is set to `0`.
69  */
70 extern bool opt_replication_optimize_for_static_plugin_config;
71 /**
72   Variable to keep the value set for the
73   `replication_sender_observe_commit_only` global.
74 
75   When this global variable is set to `1`, only the replication observer's
76   commit hook will be called, every other registered hook invocation is skipped.
77  */
78 extern bool opt_replication_sender_observe_commit_only;
79 /**
80   Atomic access version of `opt_replication_sender_observe_commit_only`
81  */
82 extern int32 opt_atomic_replication_sender_observe_commit_only;
83 
84 class Observer_info {
85 public:
86   void *observer;
87   st_plugin_int *plugin_int;
88   plugin_ref plugin;
89 
90   Observer_info(void *ob, st_plugin_int *p);
91 };
92 
93 /**
94   Base class for adding replication event observer infra-structure. It's
95   meant to be sub-classed by classes that will provide the support for the
96   specific event types. This class is meant just to provide basic support
97   for managing observers and managing resource access and lock acquisition.
98  */
99 class Delegate {
100 public:
101   typedef List<Observer_info> Observer_info_list;
102   typedef List_iterator<Observer_info> Observer_info_iterator;
103 
104   /**
105     Class constructor
106 
107     @param key the PFS key for instrumenting the class lock
108    */
109   Delegate(
110 #ifdef HAVE_PSI_INTERFACE
111            PSI_rwlock_key key
112 #endif
113            );
114   /**
115     Class destructor
116    */
117   virtual ~Delegate();
118   /**
119     Adds an observer to the observer list.
120 
121     @param observer The observer object to be added to the list
122     @param plugin The plugin the observer is being loaded from
123 
124     @return 0 upon success, 1 otherwise
125    */
126   int add_observer(void *observer, st_plugin_int *plugin);
127   /**
128     Removes an observer from the observer list.
129 
130     @param observer The observer object to be added to the list
131     @param plugin The plugin the observer is being loaded from
132 
133     @return 0 upon success, 1 otherwise
134    */
135   int remove_observer(void *observer, st_plugin_int *plugin);
136   /**
137     Retrieves an iterator for the observer list.
138 
139     @return the iterator for the observer list
140    */
141   Observer_info_iterator observer_info_iter();
142   /**
143     Returns whether or not there are registered observers.
144 
145     @return whether or not there are registered observers
146    */
147   bool is_empty();
148   /**
149     Acquires this Delegate class instance lock in read/shared mode.
150 
151     @return 0 upon success, 1 otherwise
152    */
153   int read_lock();
154   /**
155     Acquires this Delegate class instance lock in write/exclusive mode.
156 
157     @return 0 upon success, 1 otherwise
158    */
159   int write_lock();
160   /**
161     Releases this Delegate class instance lock.
162 
163     @return 0 upon success, 1 otherwise
164    */
165   int unlock();
166   /**
167     Returns whether or not this instance was initialized.
168 
169     @return whether or not this instance was initialized
170    */
171   bool is_inited();
172   /**
173     Toggles the type of lock between a classical read-write lock and a
174     shared-exclusive spin-lock.
175    */
176   void update_lock_type();
177   /**
178     Increases the `info->plugin` usage reference counting if
179     `replication_optimize_for_static_plugin_config` is being enabled, in order
180     to prevent plugin removal.
181 
182     Decreases the `info->plugin` usage reference counting if
183     `replication_optimize_for_static_plugin_config` is being disabled, in order
184     to allow plugin removal.
185    */
186   void update_plugin_ref_count();
187   /**
188     Returns whether or not to use the classic read-write lock.
189 
190     The read-write lock should be used if that type of lock is already
191     acquired by some thread or if the server is not optimized for static
192     plugin configuration.
193 
194     @returns true if one should use the classic read-write lock, false otherwise
195    */
196   bool use_rw_lock_type();
197   /**
198     Returns whether or not to use the shared spin-lock.
199 
200     The shared spin-lock should be used if that type of lock is already
201     acquired by some thread or if the server is optimized for static plugin
202     configuration.
203 
204     @returns true if one should use the shared spin-lock, false otherwise
205    */
206   bool use_spin_lock_type();
207 
208 private:
209   /** List of registered observers */
210   Observer_info_list observer_info_list;
211   /**
212     A read/write lock to be used when not optimizing for static plugin config
213    */
214   mysql_rwlock_t lock;
215   /**
216     A shared-exclusive spin lock to be used when optimizing for static plugin
217     config.
218    */
219   lock::Shared_spin_lock m_spin_lock;
220   /** Memory pool to be used to allocate the observers list */
221   MEM_ROOT memroot;
222   /** Flag statign whether or not this instance was initialized */
223   bool inited;
224   /**
225     The type of lock configured to be used, either a classic read-write (-1)
226     lock or a shared-exclusive spin lock (1).
227    */
228   int32 m_configured_lock_type;
229   /**
230     The count of locks acquired: -1 will be added for each classic
231     read-write lock acquisitions; +1 will be added for each
232     shared-exclusive spin lock acquisition.
233    */
234   int32 m_acquired_locks;
235   /**
236     List of acquired plugin references, to be held while
237     `replication_optimize_for_static_plugin_config` option is enabled. If the
238     option is disabled, the references in this list will be released.
239    */
240   std::map<plugin_ref, size_t> m_acquired_references;
241 
242   enum enum_delegate_lock_type
243   {
244     DELEGATE_OS_LOCK= -1,   // Lock used by this class is an OS RW lock
245     DELEGATE_SPIN_LOCK= 1,  // Lock used by this class is a spin lock
246   };
247 
248   enum enum_delegate_lock_mode
249   {
250     DELEGATE_LOCK_MODE_SHARED= 0, // Lock acquired in shared/read mode
251     DELEGATE_LOCK_MODE_EXCLUSIVE= 1, // Lock acquired in exclusive/write mode
252   };
253 
254   /**
255     Increases the `info->plugin` reference counting and stores that refernce
256     internally.
257    */
258   void acquire_plugin_ref_count(Observer_info *info);
259   /**
260     Locks the active lock (OS read-write lock or shared spin-lock)
261     according to the mode passed on as a parameter.
262 
263     @param mode The mode to lock in, either DELEGATE_LOCK_MODE_SHARED or
264                 DELEGATE_LOCK_MODE_EXCLUSIVE.
265    */
266   void lock_it(enum_delegate_lock_mode mode);
267 };
268 
269 #ifdef HAVE_PSI_INTERFACE
270 extern PSI_rwlock_key key_rwlock_Trans_delegate_lock;
271 #endif
272 
273 class Trans_delegate
274   :public Delegate {
275 public:
276 
Trans_delegate()277   Trans_delegate()
278   : Delegate(
279 #ifdef HAVE_PSI_INTERFACE
280              key_rwlock_Trans_delegate_lock
281 #endif
282              )
283   {}
284 
285   typedef Trans_observer Observer;
286 
287   int before_dml(THD *thd, int& result);
288   int before_commit(THD *thd, bool all,
289                     IO_CACHE *trx_cache_log,
290                     IO_CACHE *stmt_cache_log,
291                     ulonglong cache_log_max_size);
292   int before_rollback(THD *thd, bool all);
293   int after_commit(THD *thd, bool all);
294   int after_rollback(THD *thd, bool all);
295 private:
296   void prepare_table_info(THD* thd,
297                           Trans_table_info*& table_info_list,
298                           uint& number_of_tables);
299 };
300 
301 #ifdef HAVE_PSI_INTERFACE
302 extern PSI_rwlock_key key_rwlock_Server_state_delegate_lock;
303 #endif
304 
305 class Server_state_delegate
306   :public Delegate {
307 public:
308 
Server_state_delegate()309   Server_state_delegate()
310   : Delegate(
311 #ifdef HAVE_PSI_INTERFACE
312              key_rwlock_Server_state_delegate_lock
313 #endif
314              )
315   {}
316 
317   typedef Server_state_observer Observer;
318   int before_handle_connection(THD *thd);
319   int before_recovery(THD *thd);
320   int after_engine_recovery(THD *thd);
321   int after_recovery(THD *thd);
322   int before_server_shutdown(THD *thd);
323   int after_server_shutdown(THD *thd);
324 };
325 
326 #ifdef HAVE_PSI_INTERFACE
327 extern PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock;
328 #endif
329 
330 class Binlog_storage_delegate
331   :public Delegate {
332 public:
333 
Binlog_storage_delegate()334   Binlog_storage_delegate()
335   : Delegate(
336 #ifdef HAVE_PSI_INTERFACE
337              key_rwlock_Binlog_storage_delegate_lock
338 #endif
339              )
340   {}
341 
342   typedef Binlog_storage_observer Observer;
343   int after_flush(THD *thd, const char *log_file,
344                   my_off_t log_pos);
345   int after_sync(THD *thd, const char *log_file,
346                  my_off_t log_pos);
347 };
348 
349 #ifdef HAVE_REPLICATION
350 #ifdef HAVE_PSI_INTERFACE
351 extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock;
352 #endif
353 
354 class Binlog_transmit_delegate
355   :public Delegate {
356 public:
357 
Binlog_transmit_delegate()358   Binlog_transmit_delegate()
359   : Delegate(
360 #ifdef HAVE_PSI_INTERFACE
361              key_rwlock_Binlog_transmit_delegate_lock
362 #endif
363              )
364   {}
365 
366   typedef Binlog_transmit_observer Observer;
367   int transmit_start(THD *thd, ushort flags,
368                      const char *log_file, my_off_t log_pos,
369                      bool *observe_transmission);
370   int transmit_stop(THD *thd, ushort flags);
371   int reserve_header(THD *thd, ushort flags, String *packet);
372   int before_send_event(THD *thd, ushort flags,
373                         String *packet, const
374                         char *log_file, my_off_t log_pos );
375   int after_send_event(THD *thd, ushort flags,
376                        String *packet, const char *skipped_log_file,
377                        my_off_t skipped_log_pos);
378   int after_reset_master(THD *thd, ushort flags);
379 };
380 
381 #ifdef HAVE_PSI_INTERFACE
382 extern PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock;
383 #endif
384 
385 class Binlog_relay_IO_delegate
386   :public Delegate {
387 public:
388 
Binlog_relay_IO_delegate()389   Binlog_relay_IO_delegate()
390   : Delegate(
391 #ifdef HAVE_PSI_INTERFACE
392              key_rwlock_Binlog_relay_IO_delegate_lock
393 #endif
394              )
395   {}
396 
397   typedef Binlog_relay_IO_observer Observer;
398   int thread_start(THD *thd, Master_info *mi);
399   int thread_stop(THD *thd, Master_info *mi);
400   int applier_start(THD *thd, Master_info *mi);
401   int applier_stop(THD *thd, Master_info *mi, bool aborted);
402   int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
403   int after_read_event(THD *thd, Master_info *mi,
404                        const char *packet, ulong len,
405                        const char **event_buf, ulong *event_len);
406   int after_queue_event(THD *thd, Master_info *mi,
407                         const char *event_buf, ulong event_len,
408                         bool synced);
409   int after_reset_slave(THD *thd, Master_info *mi);
410 private:
411   void init_param(Binlog_relay_IO_param *param, Master_info *mi);
412 };
413 #endif /* HAVE_REPLICATION */
414 
/**
  Initializes the observer delegate objects.

  NOTE(review): the meaning of the return value is not visible in this
  header; presumably 0 means success -- confirm in the implementation file.
 */
int delegates_init();

/**
  Verifies that the replication plugins are ready and OK to be unloaded.
 */
void delegates_shutdown();

/**
  Counterpart of `delegates_init()`.

  NOTE(review): presumably destroys the observer delegate objects -- confirm
  in the implementation file.
 */
void delegates_destroy();

/**
  Calls `write_lock()` on every observer delegate object.
 */
void delegates_acquire_locks();

/**
  Releases the locks of every observer delegate object.
 */
void delegates_release_locks();

/**
  Switches the type of lock in use between a classical read-write lock and a
  shared-exclusive spin-lock.
 */
void delegates_update_lock_type();
434 
435 extern Trans_delegate *transaction_delegate;
436 extern Binlog_storage_delegate *binlog_storage_delegate;
437 extern Server_state_delegate *server_state_delegate;
438 #ifdef HAVE_REPLICATION
439 extern Binlog_transmit_delegate *binlog_transmit_delegate;
440 extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
441 #endif /* HAVE_REPLICATION */
442 
/*
  Hook invocation helpers. `group` selects the delegate named
  `<group>_delegate`.

  NO_HOOK(group) evaluates to the result of `is_empty()` on that delegate.

  RUN_HOOK(group, hook, args): if there are no observers in the delegate,
  it evaluates to 0 immediately and the hook is not invoked; otherwise it
  evaluates to the result of `<group>_delegate->hook args`, where `args` is
  the parenthesized argument list.
*/
#define NO_HOOK(group) (group ##_delegate->is_empty())

#define RUN_HOOK(group, hook, args) \
  (group ##_delegate->is_empty() ? 0 : group ##_delegate->hook args)
452 
453 #endif /* RPL_HANDLER_H */
454