1 /* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include <assert.h>
24 #include "gcs_xcom_notification.h"
25 #include "gcs_logging.h"
26 
Finalize_notification(Gcs_xcom_engine * gcs_engine,xcom_finalize_functor * functor)27 Finalize_notification::Finalize_notification(
28   Gcs_xcom_engine *gcs_engine, xcom_finalize_functor *functor)
29   : m_gcs_engine(gcs_engine), m_functor(functor)
30 {
31 }
32 
33 
~Finalize_notification()34 Finalize_notification::~Finalize_notification() { }
35 
36 
do_execute()37 void Finalize_notification::do_execute()
38 {
39   /*
40     It will stop queueing notifications because we want to
41     stop the engine and flush any possible notification in
42     the queue. Note that after executing the callback, it
43     may not be possible to do so because some objects may
44     be destroyed.
45   */
46   m_gcs_engine->cleanup();
47 
48   /*
49     For example, now it is safe to kill XCOM's thread if it
50     has not been stopped already.
51   */
52   if (m_functor)
53     (*m_functor)();
54 }
55 
56 
Initialize_notification(xcom_initialize_functor * functor)57 Initialize_notification::Initialize_notification(
58   xcom_initialize_functor *functor) : m_functor(functor)
59 {
60 }
61 
62 
~Initialize_notification()63 Initialize_notification::~Initialize_notification() { }
64 
65 
do_execute()66 void Initialize_notification::do_execute()
67 {
68   if (m_functor)
69     (*m_functor)();
70 }
71 
72 
Data_notification(xcom_receive_data_functor * functor,synode_no message_id,Gcs_xcom_nodes * xcom_nodes,u_int size,char * data)73 Data_notification::Data_notification(xcom_receive_data_functor *functor,
74                                      synode_no message_id,
75                                      Gcs_xcom_nodes *xcom_nodes,
76                                      u_int size, char *data)
77   : m_functor(functor), m_message_id(message_id), m_xcom_nodes(xcom_nodes),
78     m_size(size), m_data(data)
79 {
80 }
81 
82 
~Data_notification()83 Data_notification::~Data_notification() { }
84 
85 
do_execute()86 void Data_notification::do_execute()
87 {
88   (*m_functor)(m_message_id, m_xcom_nodes, m_size, m_data);
89 }
90 
91 
Status_notification(xcom_status_functor * functor,int status)92 Status_notification::Status_notification(xcom_status_functor *functor, int status)
93   : m_functor(functor), m_status(status) { }
94 
95 
~Status_notification()96 Status_notification::~Status_notification() { }
97 
98 
do_execute()99 void Status_notification::do_execute()
100 {
101   (*m_functor)(m_status);
102 }
103 
104 
Global_view_notification(xcom_global_view_functor * functor,synode_no config_id,synode_no message_id,Gcs_xcom_nodes * xcom_nodes)105 Global_view_notification::Global_view_notification(xcom_global_view_functor* functor,
106                                                    synode_no config_id,
107                                                    synode_no message_id,
108                                                    Gcs_xcom_nodes *xcom_nodes)
109 
110  : m_functor(functor), m_config_id(config_id), m_message_id(message_id),
111    m_xcom_nodes(xcom_nodes) { }
112 
113 
~Global_view_notification()114 Global_view_notification::~Global_view_notification() { }
115 
116 
do_execute()117 void Global_view_notification::do_execute()
118 {
119   (*m_functor)(m_config_id, m_message_id, m_xcom_nodes);
120 }
121 
122 
Local_view_notification(xcom_local_view_functor * functor,synode_no message_id,Gcs_xcom_nodes * xcom_nodes)123 Local_view_notification::Local_view_notification(xcom_local_view_functor* functor,
124                                                  synode_no message_id,
125                                                  Gcs_xcom_nodes *xcom_nodes)
126 
127  : m_functor(functor), m_message_id(message_id), m_xcom_nodes(xcom_nodes) { }
128 
129 
~Local_view_notification()130 Local_view_notification::~Local_view_notification() { }
131 
132 
do_execute()133 void Local_view_notification::do_execute()
134 {
135   (*m_functor)(m_message_id, m_xcom_nodes);
136 }
137 
138 
Expel_notification(xcom_expel_functor * functor)139 Expel_notification::Expel_notification(xcom_expel_functor *functor)
140     : m_functor(functor) {}
141 
142 
~Expel_notification()143 Expel_notification::~Expel_notification() {}
144 
145 
do_execute()146 void Expel_notification::do_execute() { (*m_functor)(); }
147 
148 
Control_notification(xcom_control_functor * functor,Gcs_control_interface * control_if)149 Control_notification::Control_notification(xcom_control_functor* functor,
150                                            Gcs_control_interface *control_if)
151  : m_functor(functor), m_control_if(control_if) { }
152 
153 
~Control_notification()154 Control_notification::~Control_notification() { }
155 
156 
do_execute()157 void Control_notification::do_execute()
158 {
159   static_cast<void>((*m_functor)(m_control_if));
160 }
161 
162 
process_notification_thread(void * ptr_object)163 void *process_notification_thread(void *ptr_object)
164 {
165   Gcs_xcom_engine *engine= static_cast<Gcs_xcom_engine* >(ptr_object);
166   engine->process();
167   return NULL;
168 }
169 
170 
Gcs_xcom_engine()171 Gcs_xcom_engine::Gcs_xcom_engine()
172   : m_wait_for_notification_cond(), m_wait_for_notification_mutex(),
173     m_notification_queue(), m_engine_thread(), m_schedule(true)
174 {
175   m_wait_for_notification_cond.init();
176   m_wait_for_notification_mutex.init(NULL);
177 }
178 
179 
~Gcs_xcom_engine()180 Gcs_xcom_engine::~Gcs_xcom_engine()
181 {
182   m_wait_for_notification_cond.destroy();
183   m_wait_for_notification_mutex.destroy();
184 }
185 
186 
initialize(xcom_initialize_functor * functor MY_ATTRIBUTE ((unused)))187 void Gcs_xcom_engine::initialize(
188   xcom_initialize_functor *functor MY_ATTRIBUTE((unused)))
189 {
190   assert(m_notification_queue.empty());
191   assert(m_schedule);
192   m_engine_thread.create(NULL, process_notification_thread, (void *) this);
193 }
194 
195 
finalize(xcom_finalize_functor * functor)196 void Gcs_xcom_engine::finalize(xcom_finalize_functor *functor)
197 {
198   push(new Finalize_notification(this, functor));
199   m_engine_thread.join(NULL);
200   assert(m_notification_queue.empty());
201   assert(!m_schedule);
202 }
203 
204 
process()205 void Gcs_xcom_engine::process()
206 {
207   Gcs_xcom_notification *notification= NULL;
208   bool stop= false;
209 
210   while (!stop)
211   {
212     m_wait_for_notification_mutex.lock();
213     while (m_notification_queue.empty())
214     {
215       m_wait_for_notification_cond.wait(
216         m_wait_for_notification_mutex.get_native_mutex()
217       );
218     }
219     notification= m_notification_queue.front();
220     m_notification_queue.pop();
221     m_wait_for_notification_mutex.unlock();
222 
223     MYSQL_GCS_LOG_TRACE(
224       "Started executing during regular phase: " << notification
225     )
226     stop= (*notification)();
227     MYSQL_GCS_LOG_TRACE(
228       "Finish executing during regular phase: " << notification
229     )
230 
231     delete notification;
232   }
233 }
234 
235 
cleanup()236 void Gcs_xcom_engine::cleanup()
237 {
238   Gcs_xcom_notification *notification= NULL;
239 
240   m_wait_for_notification_mutex.lock();
241   m_schedule= false;
242   m_wait_for_notification_mutex.unlock();
243 
244   while (!m_notification_queue.empty())
245   {
246     notification= m_notification_queue.front();
247     m_notification_queue.pop();
248 
249     MYSQL_GCS_LOG_TRACE(
250       "Started executing during clean up phase: " << notification
251     )
252     (*notification)();
253     MYSQL_GCS_LOG_TRACE(
254       "Finished executing during clean up phase: " << notification
255     )
256 
257     delete notification;
258   }
259 }
260 
261 
push(Gcs_xcom_notification * request)262 bool Gcs_xcom_engine::push(Gcs_xcom_notification *request)
263 {
264   bool scheduled= false;
265 
266   m_wait_for_notification_mutex.lock();
267   if (m_schedule)
268   {
269     m_notification_queue.push(request);
270     m_wait_for_notification_cond.broadcast();
271     scheduled= true;
272   }
273   m_wait_for_notification_mutex.unlock();
274 
275   return scheduled;
276 }
277