1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23
24 #include "observer_server_channels.h"
25 #include "plugin.h"
26
27
group_replication_thread_start(Binlog_relay_IO_param * param)28 int group_replication_thread_start(Binlog_relay_IO_param *param)
29 {
30 int error= 0;
31 if (channel_observation_manager == NULL)
32 {
33 return error; /* purecov: inspected */
34 }
35
36 channel_observation_manager->read_lock_channel_list();
37
38 std::list<Channel_state_observer*>* channel_observers=
39 channel_observation_manager->get_channel_state_observers();
40
41 std::list<Channel_state_observer*>::const_iterator obs_iterator;
42 for (obs_iterator = channel_observers->begin();
43 obs_iterator != channel_observers->end();
44 ++obs_iterator)
45 {
46 error+= (*obs_iterator)->thread_start(param);
47 }
48
49 channel_observation_manager->unlock_channel_list();
50
51 return error;
52 }
53
group_replication_thread_stop(Binlog_relay_IO_param * param)54 int group_replication_thread_stop(Binlog_relay_IO_param *param)
55 {
56 int error= 0;
57 if (channel_observation_manager == NULL)
58 {
59 return error; /* purecov: inspected */
60 }
61
62 channel_observation_manager->read_lock_channel_list();
63
64 std::list<Channel_state_observer*>* channel_observers=
65 channel_observation_manager->get_channel_state_observers();
66
67 std::list<Channel_state_observer*>::const_iterator obs_iterator;
68 for (obs_iterator = channel_observers->begin();
69 obs_iterator != channel_observers->end();
70 ++obs_iterator)
71 {
72 error+= (*obs_iterator)->thread_stop(param);
73 }
74
75 channel_observation_manager->unlock_channel_list();
76
77 return error;
78 }
79
group_replication_applier_start(Binlog_relay_IO_param * param)80 int group_replication_applier_start(Binlog_relay_IO_param *param)
81 {
82 int error= 0;
83 if (channel_observation_manager == NULL)
84 {
85 return error; /* purecov: inspected */
86 }
87
88 channel_observation_manager->read_lock_channel_list();
89
90 std::list<Channel_state_observer*>* channel_observers=
91 channel_observation_manager->get_channel_state_observers();
92
93 std::list<Channel_state_observer*>::const_iterator obs_iterator;
94 for (obs_iterator = channel_observers->begin();
95 obs_iterator != channel_observers->end();
96 ++obs_iterator)
97 {
98 error+= (*obs_iterator)->applier_start(param);
99 }
100
101 channel_observation_manager->unlock_channel_list();
102
103 return error;
104 }
105
group_replication_applier_stop(Binlog_relay_IO_param * param,bool aborted)106 int group_replication_applier_stop(Binlog_relay_IO_param *param, bool aborted)
107 {
108 int error= 0;
109 if (channel_observation_manager == NULL)
110 {
111 return error; /* purecov: inspected */
112 }
113
114 channel_observation_manager->read_lock_channel_list();
115
116 std::list<Channel_state_observer*>* channel_observers=
117 channel_observation_manager->get_channel_state_observers();
118
119 std::list<Channel_state_observer*>::const_iterator obs_iterator;
120 for (obs_iterator = channel_observers->begin();
121 obs_iterator != channel_observers->end();
122 ++obs_iterator)
123 {
124 error+= (*obs_iterator)->applier_stop(param, aborted);
125 }
126
127 channel_observation_manager->unlock_channel_list();
128
129 return error;
130 }
131
group_replication_before_request_transmit(Binlog_relay_IO_param * param,uint32 flags)132 int group_replication_before_request_transmit(Binlog_relay_IO_param *param,
133 uint32 flags)
134 {
135 int error= 0;
136 if (channel_observation_manager == NULL)
137 {
138 return error; /* purecov: inspected */
139 }
140
141 channel_observation_manager->read_lock_channel_list();
142
143 std::list<Channel_state_observer*>* channel_observers=
144 channel_observation_manager->get_channel_state_observers();
145
146 std::list<Channel_state_observer*>::const_iterator obs_iterator;
147 for (obs_iterator = channel_observers->begin();
148 obs_iterator != channel_observers->end();
149 ++obs_iterator)
150 {
151 error+= (*obs_iterator)->before_request_transmit(param, flags);
152 }
153
154 channel_observation_manager->unlock_channel_list();
155
156 return error;
157 }
158
159
group_replication_after_read_event(Binlog_relay_IO_param * param,const char * packet,unsigned long len,const char ** event_buf,unsigned long * event_len)160 int group_replication_after_read_event(Binlog_relay_IO_param *param,
161 const char *packet, unsigned long len,
162 const char **event_buf,
163 unsigned long *event_len)
164 {
165 int error= 0;
166 if (channel_observation_manager == NULL)
167 {
168 return error; /* purecov: inspected */
169 }
170
171 channel_observation_manager->read_lock_channel_list();
172
173 std::list<Channel_state_observer*>* channel_observers=
174 channel_observation_manager->get_channel_state_observers();
175
176 std::list<Channel_state_observer*>::const_iterator obs_iterator;
177 for (obs_iterator = channel_observers->begin();
178 obs_iterator != channel_observers->end();
179 ++obs_iterator)
180 {
181 error+= (*obs_iterator)->after_read_event(param, packet, len,
182 event_buf, event_len);
183 }
184
185 channel_observation_manager->unlock_channel_list();
186
187 return error;
188 }
189
190
group_replication_after_queue_event(Binlog_relay_IO_param * param,const char * event_buf,unsigned long event_len,uint32 flags)191 int group_replication_after_queue_event(Binlog_relay_IO_param *param,
192 const char *event_buf,
193 unsigned long event_len,
194 uint32 flags)
195 {
196 int error= 0;
197 if (channel_observation_manager == NULL)
198 {
199 return error; /* purecov: inspected */
200 }
201
202 channel_observation_manager->read_lock_channel_list();
203
204 std::list<Channel_state_observer*>* channel_observers=
205 channel_observation_manager->get_channel_state_observers();
206
207 std::list<Channel_state_observer*>::const_iterator obs_iterator;
208 for (obs_iterator = channel_observers->begin();
209 obs_iterator != channel_observers->end();
210 ++obs_iterator)
211 {
212 error+= (*obs_iterator)->after_queue_event(param, event_buf,
213 event_len, flags);
214 }
215
216 channel_observation_manager->unlock_channel_list();
217
218 return error;
219 }
220
221
group_replication_after_reset_slave(Binlog_relay_IO_param * param)222 int group_replication_after_reset_slave(Binlog_relay_IO_param *param)
223 {
224 int error= 0;
225 if (channel_observation_manager == NULL)
226 {
227 return error; /* purecov: inspected */
228 }
229
230 channel_observation_manager->read_lock_channel_list();
231
232 std::list<Channel_state_observer*>* channel_observers=
233 channel_observation_manager->get_channel_state_observers();
234
235 std::list<Channel_state_observer*>::const_iterator obs_iterator;
236 for (obs_iterator = channel_observers->begin();
237 obs_iterator != channel_observers->end();
238 ++obs_iterator)
239 {
240 error+= (*obs_iterator)->after_reset_slave(param);
241 }
242
243 channel_observation_manager->unlock_channel_list();
244
245 return error;
246 }
247
248
249 Binlog_relay_IO_observer binlog_IO_observer= {
250 sizeof(Binlog_relay_IO_observer),
251
252 group_replication_thread_start,
253 group_replication_thread_stop,
254 group_replication_applier_start,
255 group_replication_applier_stop,
256 group_replication_before_request_transmit,
257 group_replication_after_read_event,
258 group_replication_after_queue_event,
259 group_replication_after_reset_slave
260 };
261