1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "replication_threads_api.h"
24 
25 using std::string;
26 
27 int
initialize_channel(char * hostname,uint port,char * user,char * password,bool use_ssl,char * ssl_ca,char * ssl_capath,char * ssl_cert,char * ssl_cipher,char * ssl_key,char * ssl_crl,char * ssl_crlpath,bool ssl_verify_server_cert,int priority,int retry_count,bool preserve_logs,bool ignore_ws_mem_limit,bool allow_drop_write_set)28 Replication_thread_api::initialize_channel(char* hostname,
29                                            uint  port,
30                                            char* user,
31                                            char* password,
32                                            bool  use_ssl,
33                                            char* ssl_ca,
34                                            char* ssl_capath,
35                                            char* ssl_cert,
36                                            char* ssl_cipher,
37                                            char* ssl_key,
38                                            char* ssl_crl,
39                                            char* ssl_crlpath,
40                                            bool  ssl_verify_server_cert,
41                                            int   priority,
42                                            int   retry_count,
43                                            bool  preserve_logs,
44                                            bool ignore_ws_mem_limit,
45                                            bool allow_drop_write_set)
46 {
47   DBUG_ENTER("Replication_thread_api::initialize");
48   int error= 0;
49 
50   Channel_creation_info info;
51   initialize_channel_creation_info(&info);
52   Channel_ssl_info ssl_info;
53   initialize_channel_ssl_info(&ssl_info);
54 
55   info.user= user;
56   info.password= password;
57   info.hostname= hostname;
58   info.port= port;
59 
60   info.auto_position= true;
61   info.replicate_same_server_id= true;
62   if (priority == GROUP_REPLICATION_APPLIER_THREAD_PRIORITY)
63   {
64     info.thd_tx_priority= GROUP_REPLICATION_APPLIER_THREAD_PRIORITY;
65   }
66   info.type= GROUP_REPLICATION_CHANNEL;
67 
68   info.retry_count= retry_count;
69 
70   info.preserve_relay_logs= preserve_logs;
71 
72   info.m_ignore_write_set_memory_limit = ignore_ws_mem_limit;
73   info.m_allow_drop_write_set = allow_drop_write_set;
74 
75   if( use_ssl || ssl_ca != NULL || ssl_capath != NULL || ssl_cert != NULL ||
76       ssl_cipher!= NULL || ssl_key != NULL || ssl_crl != NULL ||
77       ssl_crlpath != NULL || ssl_verify_server_cert)
78   {
79     ssl_info.use_ssl= use_ssl;
80     ssl_info.ssl_ca_file_name= ssl_ca;
81     ssl_info.ssl_ca_directory= ssl_capath;
82     ssl_info.ssl_cert_file_name= ssl_cert;
83     ssl_info.ssl_cipher= ssl_cipher;
84     ssl_info.ssl_key= ssl_key;
85     ssl_info.ssl_crl_file_name= ssl_crl;
86     ssl_info.ssl_crl_directory= ssl_crlpath;
87     ssl_info.ssl_verify_server_cert= ssl_verify_server_cert;
88     info.ssl_info= &ssl_info;
89   }
90 
91   error= channel_create(interface_channel, &info);
92 
93   /*
94     Flush relay log to indicate a new start.
95   */
96   if (!error)
97     error= channel_flush(interface_channel);
98 
99   DBUG_RETURN(error);
100 
101 }
102 
103 int
start_threads(bool start_receiver,bool start_applier,string * view_id,bool wait_for_connection)104 Replication_thread_api::start_threads(bool start_receiver,
105                                       bool start_applier,
106                                       string* view_id,
107                                       bool wait_for_connection)
108 {
109   DBUG_ENTER("Replication_thread_api::start_threads");
110 
111   Channel_connection_info info;
112   initialize_channel_connection_info(&info);
113 
114   char* cview_id= NULL;
115 
116   if (view_id)
117   {
118     cview_id= new char[view_id->size() + 1];
119     memcpy(cview_id, view_id->c_str(), view_id->size() + 1);
120 
121     info.until_condition= CHANNEL_UNTIL_VIEW_ID;
122     info.view_id= cview_id;
123   }
124 
125   int thread_mask= 0;
126   if (start_applier)
127   {
128     thread_mask |= CHANNEL_APPLIER_THREAD;
129   }
130   if (start_receiver)
131   {
132     thread_mask |= CHANNEL_RECEIVER_THREAD;
133   }
134 
135   int error= channel_start(interface_channel,
136                            &info,
137                            thread_mask,
138                            wait_for_connection);
139 
140   if (view_id)
141   {
142     delete [] cview_id;
143   }
144 
145   DBUG_RETURN(error);
146 }
147 
purge_logs(bool reset_all)148 int Replication_thread_api::purge_logs(bool reset_all)
149 {
150   DBUG_ENTER("Replication_thread_api::purge_logs");
151 
152   //If there is no channel, no point in invoking the method
153   if (!channel_is_active(interface_channel, CHANNEL_NO_THD))
154       DBUG_RETURN(0);
155 
156   int error= channel_purge_queue(interface_channel, reset_all);
157 
158   DBUG_RETURN(error);
159 }
160 
stop_threads(bool stop_receiver,bool stop_applier)161 int Replication_thread_api::stop_threads(bool stop_receiver, bool stop_applier)
162 {
163   DBUG_ENTER("Replication_thread_api::stop_threads");
164 
165   stop_receiver= stop_receiver && is_receiver_thread_running();
166   stop_applier= stop_applier && is_applier_thread_running();
167 
168   //If there is nothing to do, return 0
169   if (!stop_applier && !stop_receiver)
170     DBUG_RETURN(0);
171 
172   int thread_mask= 0;
173   if (stop_applier)
174   {
175     thread_mask |= CHANNEL_APPLIER_THREAD;
176   }
177   if (stop_receiver)
178   {
179     thread_mask |= CHANNEL_RECEIVER_THREAD;
180   }
181 
182   int error= channel_stop(interface_channel,
183                           thread_mask,
184                           stop_wait_timeout);
185 
186   DBUG_RETURN(error);
187 }
188 
is_receiver_thread_running()189 bool Replication_thread_api::is_receiver_thread_running()
190 {
191   return(channel_is_active(interface_channel, CHANNEL_RECEIVER_THREAD));
192 }
193 
is_receiver_thread_stopping()194 bool Replication_thread_api::is_receiver_thread_stopping()
195 {
196   return(channel_is_stopping(interface_channel, CHANNEL_RECEIVER_THREAD));
197 }
198 
is_applier_thread_running()199 bool Replication_thread_api::is_applier_thread_running()
200 {
201   return(channel_is_active(interface_channel, CHANNEL_APPLIER_THREAD));
202 }
203 
is_applier_thread_stopping()204 bool Replication_thread_api::is_applier_thread_stopping()
205 {
206   return(channel_is_stopping(interface_channel, CHANNEL_APPLIER_THREAD));
207 }
208 
209 int
queue_packet(const char * buf,ulong event_len)210 Replication_thread_api::queue_packet(const char* buf, ulong event_len)
211 {
212   return channel_queue_packet(interface_channel, buf, event_len);
213 }
214 
is_applier_thread_waiting()215 bool Replication_thread_api::is_applier_thread_waiting()
216 {
217   return (channel_is_applier_waiting(interface_channel) == 1);
218 }
219 
220 int
wait_for_gtid_execution(double timeout)221 Replication_thread_api::wait_for_gtid_execution(double timeout)
222 {
223   DBUG_ENTER("Replication_thread_api::wait_for_gtid_execution");
224 
225   int error= channel_wait_until_apply_queue_applied(interface_channel, timeout);
226 
227   /*
228     Check that applier relay log is indeed consumed.
229     This is different from channel_wait_until_apply_queue_applied()
230     on the following case: if transactions on relay log are already
231     on GTID_EXECUTED, applier thread still needs to read the relay
232     log and update log positions. So despite transactions on relay
233     log are applied, applier thread is still updating log positions
234     on info tables.
235   */
236   if (!error)
237   {
238     if (channel_is_applier_waiting(interface_channel) != 1)
239       error= REPLICATION_THREAD_WAIT_TIMEOUT_ERROR;
240   }
241 
242   DBUG_RETURN(error);
243 }
244 
245 rpl_gno
get_last_delivered_gno(rpl_sidno sidno)246 Replication_thread_api::get_last_delivered_gno(rpl_sidno sidno)
247 {
248   DBUG_ENTER("Replication_thread_api::get_last_delivered_gno");
249   DBUG_RETURN(channel_get_last_delivered_gno(interface_channel, sidno));
250 }
251 
get_applier_thread_ids(unsigned long ** thread_ids)252 int Replication_thread_api::get_applier_thread_ids(unsigned long** thread_ids)
253 {
254   DBUG_ENTER("Replication_thread_api::get_applier_thread_ids");
255   DBUG_RETURN(channel_get_thread_id(interface_channel,
256                                     CHANNEL_APPLIER_THREAD,
257                                     thread_ids));
258 }
259 
is_own_event_applier(my_thread_id id,const char * channel_name)260 bool Replication_thread_api::is_own_event_applier(my_thread_id id,
261                                                   const char* channel_name)
262 {
263   DBUG_ENTER("Replication_thread_api::is_own_event_applier");
264 
265   bool result= false;
266   unsigned long* thread_ids= NULL;
267   const char* name= channel_name ? channel_name : interface_channel;
268 
269   //Fetch all applier thread ids for this channel.
270   int number_appliers= channel_get_thread_id(name,
271                                              CHANNEL_APPLIER_THREAD,
272                                              &thread_ids);
273 
274   //If none are found return false
275   if (number_appliers <= 0)
276   {
277     goto end;
278   }
279 
280   if (number_appliers == 1)  //One applier, check its id
281   {
282     result= (*thread_ids == id);
283   }
284   else //The channel has  more than one applier, check if the id is in the list
285   {
286     for (int i = 0; i < number_appliers; i++)
287     {
288       unsigned long thread_id= thread_ids[i];
289       if (thread_id == id)
290       {
291         result= true;
292         break;
293       }
294     }
295   }
296 
297 end:
298   my_free(thread_ids);
299 
300   //The given id is not an id of the channel applier threads, return false
301   DBUG_RETURN(result);
302 }
303 
is_own_event_receiver(my_thread_id id)304 bool Replication_thread_api::is_own_event_receiver(my_thread_id id)
305 {
306   DBUG_ENTER("Replication_thread_api::is_own_event_receiver");
307 
308   bool result= false;
309   unsigned long* thread_id= NULL;
310 
311   //Fetch the receiver thread id for this channel
312   int number_receivers= channel_get_thread_id(interface_channel,
313                                               CHANNEL_RECEIVER_THREAD,
314                                               &thread_id);
315 
316   //If one is found
317   if (number_receivers > 0)
318   {
319     result= (*thread_id == id);
320   }
321   my_free(thread_id);
322 
323   //The given id is not the id of the channel receiver thread, return false
324   DBUG_RETURN(result);
325 }
326 
get_retrieved_gtid_set(std::string & retrieved_set,const char * channel_name)327 bool Replication_thread_api::get_retrieved_gtid_set(std::string& retrieved_set,
328                                                     const char* channel_name)
329 {
330   DBUG_ENTER("Replication_thread_api::get_retrieved_gtid_set");
331 
332   const char* name= channel_name ? channel_name : interface_channel;
333   char *receiver_retrieved_gtid_set= NULL;
334   int error;
335 
336   error= channel_get_retrieved_gtid_set(name,
337                                         &receiver_retrieved_gtid_set);
338   if (!error)
339     retrieved_set.assign(receiver_retrieved_gtid_set);
340 
341   my_free(receiver_retrieved_gtid_set);
342 
343   DBUG_RETURN((error != 0));
344 }
345 
is_partial_transaction_on_relay_log()346 bool Replication_thread_api::is_partial_transaction_on_relay_log()
347 {
348   return is_partial_transaction_on_channel_relay_log(interface_channel);
349 }
350