1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 
24 #include "observer_server_channels.h"
25 #include "plugin.h"
26 
27 
group_replication_thread_start(Binlog_relay_IO_param * param)28 int group_replication_thread_start(Binlog_relay_IO_param *param)
29 {
30   int error= 0;
31   if (channel_observation_manager == NULL)
32   {
33     return error; /* purecov: inspected */
34   }
35 
36   channel_observation_manager->read_lock_channel_list();
37 
38   std::list<Channel_state_observer*>* channel_observers=
39       channel_observation_manager->get_channel_state_observers();
40 
41   std::list<Channel_state_observer*>::const_iterator obs_iterator;
42   for (obs_iterator = channel_observers->begin();
43        obs_iterator != channel_observers->end();
44        ++obs_iterator)
45   {
46     error+= (*obs_iterator)->thread_start(param);
47   }
48 
49   channel_observation_manager->unlock_channel_list();
50 
51   return error;
52 }
53 
group_replication_thread_stop(Binlog_relay_IO_param * param)54 int group_replication_thread_stop(Binlog_relay_IO_param *param)
55 {
56   int error= 0;
57   if (channel_observation_manager == NULL)
58   {
59     return error; /* purecov: inspected */
60   }
61 
62   channel_observation_manager->read_lock_channel_list();
63 
64   std::list<Channel_state_observer*>* channel_observers=
65       channel_observation_manager->get_channel_state_observers();
66 
67   std::list<Channel_state_observer*>::const_iterator obs_iterator;
68   for (obs_iterator = channel_observers->begin();
69        obs_iterator != channel_observers->end();
70        ++obs_iterator)
71   {
72     error+= (*obs_iterator)->thread_stop(param);
73   }
74 
75   channel_observation_manager->unlock_channel_list();
76 
77   return error;
78 }
79 
group_replication_applier_start(Binlog_relay_IO_param * param)80 int group_replication_applier_start(Binlog_relay_IO_param *param)
81 {
82   int error= 0;
83   if (channel_observation_manager == NULL)
84   {
85     return error; /* purecov: inspected */
86   }
87 
88   channel_observation_manager->read_lock_channel_list();
89 
90   std::list<Channel_state_observer*>* channel_observers=
91       channel_observation_manager->get_channel_state_observers();
92 
93   std::list<Channel_state_observer*>::const_iterator obs_iterator;
94   for (obs_iterator = channel_observers->begin();
95        obs_iterator != channel_observers->end();
96        ++obs_iterator)
97   {
98     error+= (*obs_iterator)->applier_start(param);
99   }
100 
101   channel_observation_manager->unlock_channel_list();
102 
103   return error;
104 }
105 
group_replication_applier_stop(Binlog_relay_IO_param * param,bool aborted)106 int group_replication_applier_stop(Binlog_relay_IO_param *param, bool aborted)
107 {
108   int error= 0;
109   if (channel_observation_manager == NULL)
110   {
111     return error; /* purecov: inspected */
112   }
113 
114   channel_observation_manager->read_lock_channel_list();
115 
116   std::list<Channel_state_observer*>* channel_observers=
117       channel_observation_manager->get_channel_state_observers();
118 
119   std::list<Channel_state_observer*>::const_iterator obs_iterator;
120   for (obs_iterator = channel_observers->begin();
121        obs_iterator != channel_observers->end();
122        ++obs_iterator)
123   {
124     error+= (*obs_iterator)->applier_stop(param, aborted);
125   }
126 
127   channel_observation_manager->unlock_channel_list();
128 
129   return error;
130 }
131 
group_replication_before_request_transmit(Binlog_relay_IO_param * param,uint32 flags)132 int group_replication_before_request_transmit(Binlog_relay_IO_param *param,
133                                               uint32 flags)
134 {
135   int error= 0;
136   if (channel_observation_manager == NULL)
137   {
138     return error; /* purecov: inspected */
139   }
140 
141   channel_observation_manager->read_lock_channel_list();
142 
143   std::list<Channel_state_observer*>* channel_observers=
144       channel_observation_manager->get_channel_state_observers();
145 
146   std::list<Channel_state_observer*>::const_iterator obs_iterator;
147   for (obs_iterator = channel_observers->begin();
148        obs_iterator != channel_observers->end();
149        ++obs_iterator)
150   {
151     error+= (*obs_iterator)->before_request_transmit(param, flags);
152   }
153 
154   channel_observation_manager->unlock_channel_list();
155 
156   return error;
157 }
158 
159 
group_replication_after_read_event(Binlog_relay_IO_param * param,const char * packet,unsigned long len,const char ** event_buf,unsigned long * event_len)160 int group_replication_after_read_event(Binlog_relay_IO_param *param,
161                                        const char *packet, unsigned long len,
162                                        const char **event_buf,
163                                        unsigned long *event_len)
164 {
165   int error= 0;
166   if (channel_observation_manager == NULL)
167   {
168     return error; /* purecov: inspected */
169   }
170 
171   channel_observation_manager->read_lock_channel_list();
172 
173   std::list<Channel_state_observer*>* channel_observers=
174       channel_observation_manager->get_channel_state_observers();
175 
176   std::list<Channel_state_observer*>::const_iterator obs_iterator;
177   for (obs_iterator = channel_observers->begin();
178        obs_iterator != channel_observers->end();
179        ++obs_iterator)
180   {
181     error+= (*obs_iterator)->after_read_event(param, packet, len,
182                                               event_buf, event_len);
183   }
184 
185   channel_observation_manager->unlock_channel_list();
186 
187   return error;
188 }
189 
190 
group_replication_after_queue_event(Binlog_relay_IO_param * param,const char * event_buf,unsigned long event_len,uint32 flags)191 int group_replication_after_queue_event(Binlog_relay_IO_param *param,
192                                         const char *event_buf,
193                                         unsigned long event_len,
194                                         uint32 flags)
195 {
196   int error= 0;
197   if (channel_observation_manager == NULL)
198   {
199     return error; /* purecov: inspected */
200   }
201 
202   channel_observation_manager->read_lock_channel_list();
203 
204   std::list<Channel_state_observer*>* channel_observers=
205       channel_observation_manager->get_channel_state_observers();
206 
207   std::list<Channel_state_observer*>::const_iterator obs_iterator;
208   for (obs_iterator = channel_observers->begin();
209        obs_iterator != channel_observers->end();
210        ++obs_iterator)
211   {
212     error+= (*obs_iterator)->after_queue_event(param, event_buf,
213                                                event_len, flags);
214   }
215 
216   channel_observation_manager->unlock_channel_list();
217 
218   return error;
219 }
220 
221 
group_replication_after_reset_slave(Binlog_relay_IO_param * param)222 int group_replication_after_reset_slave(Binlog_relay_IO_param *param)
223 {
224   int error= 0;
225   if (channel_observation_manager == NULL)
226   {
227     return error; /* purecov: inspected */
228   }
229 
230   channel_observation_manager->read_lock_channel_list();
231 
232   std::list<Channel_state_observer*>* channel_observers=
233       channel_observation_manager->get_channel_state_observers();
234 
235   std::list<Channel_state_observer*>::const_iterator obs_iterator;
236   for (obs_iterator = channel_observers->begin();
237        obs_iterator != channel_observers->end();
238        ++obs_iterator)
239   {
240     error+= (*obs_iterator)->after_reset_slave(param);
241   }
242 
243   channel_observation_manager->unlock_channel_list();
244 
245   return error;
246 }
247 
248 
249 Binlog_relay_IO_observer binlog_IO_observer= {
250     sizeof(Binlog_relay_IO_observer),
251 
252     group_replication_thread_start,
253     group_replication_thread_stop,
254     group_replication_applier_start,
255     group_replication_applier_stop,
256     group_replication_before_request_transmit,
257     group_replication_after_read_event,
258     group_replication_after_queue_event,
259     group_replication_after_reset_slave
260   };
261