1 /* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include <assert.h>
24 #include "gcs_xcom_notification.h"
25 #include "gcs_logging.h"
26
Finalize_notification(Gcs_xcom_engine * gcs_engine,xcom_finalize_functor * functor)27 Finalize_notification::Finalize_notification(
28 Gcs_xcom_engine *gcs_engine, xcom_finalize_functor *functor)
29 : m_gcs_engine(gcs_engine), m_functor(functor)
30 {
31 }
32
33
~Finalize_notification()34 Finalize_notification::~Finalize_notification() { }
35
36
do_execute()37 void Finalize_notification::do_execute()
38 {
39 /*
40 It will stop queueing notifications because we want to
41 stop the engine and flush any possible notification in
42 the queue. Note that after executing the callback, it
43 may not be possible to do so because some objects may
44 be destroyed.
45 */
46 m_gcs_engine->cleanup();
47
48 /*
49 For example, now it is safe to kill XCOM's thread if it
50 has not been stopped already.
51 */
52 if (m_functor)
53 (*m_functor)();
54 }
55
56
Initialize_notification(xcom_initialize_functor * functor)57 Initialize_notification::Initialize_notification(
58 xcom_initialize_functor *functor) : m_functor(functor)
59 {
60 }
61
62
~Initialize_notification()63 Initialize_notification::~Initialize_notification() { }
64
65
do_execute()66 void Initialize_notification::do_execute()
67 {
68 if (m_functor)
69 (*m_functor)();
70 }
71
72
Data_notification(xcom_receive_data_functor * functor,synode_no message_id,Gcs_xcom_nodes * xcom_nodes,u_int size,char * data)73 Data_notification::Data_notification(xcom_receive_data_functor *functor,
74 synode_no message_id,
75 Gcs_xcom_nodes *xcom_nodes,
76 u_int size, char *data)
77 : m_functor(functor), m_message_id(message_id), m_xcom_nodes(xcom_nodes),
78 m_size(size), m_data(data)
79 {
80 }
81
82
~Data_notification()83 Data_notification::~Data_notification() { }
84
85
do_execute()86 void Data_notification::do_execute()
87 {
88 (*m_functor)(m_message_id, m_xcom_nodes, m_size, m_data);
89 }
90
91
Status_notification(xcom_status_functor * functor,int status)92 Status_notification::Status_notification(xcom_status_functor *functor, int status)
93 : m_functor(functor), m_status(status) { }
94
95
~Status_notification()96 Status_notification::~Status_notification() { }
97
98
do_execute()99 void Status_notification::do_execute()
100 {
101 (*m_functor)(m_status);
102 }
103
104
Global_view_notification(xcom_global_view_functor * functor,synode_no config_id,synode_no message_id,Gcs_xcom_nodes * xcom_nodes)105 Global_view_notification::Global_view_notification(xcom_global_view_functor* functor,
106 synode_no config_id,
107 synode_no message_id,
108 Gcs_xcom_nodes *xcom_nodes)
109
110 : m_functor(functor), m_config_id(config_id), m_message_id(message_id),
111 m_xcom_nodes(xcom_nodes) { }
112
113
~Global_view_notification()114 Global_view_notification::~Global_view_notification() { }
115
116
do_execute()117 void Global_view_notification::do_execute()
118 {
119 (*m_functor)(m_config_id, m_message_id, m_xcom_nodes);
120 }
121
122
Local_view_notification(xcom_local_view_functor * functor,synode_no message_id,Gcs_xcom_nodes * xcom_nodes)123 Local_view_notification::Local_view_notification(xcom_local_view_functor* functor,
124 synode_no message_id,
125 Gcs_xcom_nodes *xcom_nodes)
126
127 : m_functor(functor), m_message_id(message_id), m_xcom_nodes(xcom_nodes) { }
128
129
~Local_view_notification()130 Local_view_notification::~Local_view_notification() { }
131
132
do_execute()133 void Local_view_notification::do_execute()
134 {
135 (*m_functor)(m_message_id, m_xcom_nodes);
136 }
137
138
Expel_notification(xcom_expel_functor * functor)139 Expel_notification::Expel_notification(xcom_expel_functor *functor)
140 : m_functor(functor) {}
141
142
~Expel_notification()143 Expel_notification::~Expel_notification() {}
144
145
do_execute()146 void Expel_notification::do_execute() { (*m_functor)(); }
147
148
Control_notification(xcom_control_functor * functor,Gcs_control_interface * control_if)149 Control_notification::Control_notification(xcom_control_functor* functor,
150 Gcs_control_interface *control_if)
151 : m_functor(functor), m_control_if(control_if) { }
152
153
~Control_notification()154 Control_notification::~Control_notification() { }
155
156
do_execute()157 void Control_notification::do_execute()
158 {
159 static_cast<void>((*m_functor)(m_control_if));
160 }
161
162
process_notification_thread(void * ptr_object)163 void *process_notification_thread(void *ptr_object)
164 {
165 Gcs_xcom_engine *engine= static_cast<Gcs_xcom_engine* >(ptr_object);
166 engine->process();
167 return NULL;
168 }
169
170
Gcs_xcom_engine()171 Gcs_xcom_engine::Gcs_xcom_engine()
172 : m_wait_for_notification_cond(), m_wait_for_notification_mutex(),
173 m_notification_queue(), m_engine_thread(), m_schedule(true)
174 {
175 m_wait_for_notification_cond.init();
176 m_wait_for_notification_mutex.init(NULL);
177 }
178
179
~Gcs_xcom_engine()180 Gcs_xcom_engine::~Gcs_xcom_engine()
181 {
182 m_wait_for_notification_cond.destroy();
183 m_wait_for_notification_mutex.destroy();
184 }
185
186
initialize(xcom_initialize_functor * functor MY_ATTRIBUTE ((unused)))187 void Gcs_xcom_engine::initialize(
188 xcom_initialize_functor *functor MY_ATTRIBUTE((unused)))
189 {
190 assert(m_notification_queue.empty());
191 assert(m_schedule);
192 m_engine_thread.create(NULL, process_notification_thread, (void *) this);
193 }
194
195
finalize(xcom_finalize_functor * functor)196 void Gcs_xcom_engine::finalize(xcom_finalize_functor *functor)
197 {
198 push(new Finalize_notification(this, functor));
199 m_engine_thread.join(NULL);
200 assert(m_notification_queue.empty());
201 assert(!m_schedule);
202 }
203
204
process()205 void Gcs_xcom_engine::process()
206 {
207 Gcs_xcom_notification *notification= NULL;
208 bool stop= false;
209
210 while (!stop)
211 {
212 m_wait_for_notification_mutex.lock();
213 while (m_notification_queue.empty())
214 {
215 m_wait_for_notification_cond.wait(
216 m_wait_for_notification_mutex.get_native_mutex()
217 );
218 }
219 notification= m_notification_queue.front();
220 m_notification_queue.pop();
221 m_wait_for_notification_mutex.unlock();
222
223 MYSQL_GCS_LOG_TRACE(
224 "Started executing during regular phase: " << notification
225 )
226 stop= (*notification)();
227 MYSQL_GCS_LOG_TRACE(
228 "Finish executing during regular phase: " << notification
229 )
230
231 delete notification;
232 }
233 }
234
235
cleanup()236 void Gcs_xcom_engine::cleanup()
237 {
238 Gcs_xcom_notification *notification= NULL;
239
240 m_wait_for_notification_mutex.lock();
241 m_schedule= false;
242 m_wait_for_notification_mutex.unlock();
243
244 while (!m_notification_queue.empty())
245 {
246 notification= m_notification_queue.front();
247 m_notification_queue.pop();
248
249 MYSQL_GCS_LOG_TRACE(
250 "Started executing during clean up phase: " << notification
251 )
252 (*notification)();
253 MYSQL_GCS_LOG_TRACE(
254 "Finished executing during clean up phase: " << notification
255 )
256
257 delete notification;
258 }
259 }
260
261
push(Gcs_xcom_notification * request)262 bool Gcs_xcom_engine::push(Gcs_xcom_notification *request)
263 {
264 bool scheduled= false;
265
266 m_wait_for_notification_mutex.lock();
267 if (m_schedule)
268 {
269 m_notification_queue.push(request);
270 m_wait_for_notification_cond.broadcast();
271 scheduled= true;
272 }
273 m_wait_for_notification_mutex.unlock();
274
275 return scheduled;
276 }
277