1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "replication_threads_api.h"
24
25 using std::string;
26
27 int
initialize_channel(char * hostname,uint port,char * user,char * password,bool use_ssl,char * ssl_ca,char * ssl_capath,char * ssl_cert,char * ssl_cipher,char * ssl_key,char * ssl_crl,char * ssl_crlpath,bool ssl_verify_server_cert,int priority,int retry_count,bool preserve_logs,bool ignore_ws_mem_limit,bool allow_drop_write_set)28 Replication_thread_api::initialize_channel(char* hostname,
29 uint port,
30 char* user,
31 char* password,
32 bool use_ssl,
33 char* ssl_ca,
34 char* ssl_capath,
35 char* ssl_cert,
36 char* ssl_cipher,
37 char* ssl_key,
38 char* ssl_crl,
39 char* ssl_crlpath,
40 bool ssl_verify_server_cert,
41 int priority,
42 int retry_count,
43 bool preserve_logs,
44 bool ignore_ws_mem_limit,
45 bool allow_drop_write_set)
46 {
47 DBUG_ENTER("Replication_thread_api::initialize");
48 int error= 0;
49
50 Channel_creation_info info;
51 initialize_channel_creation_info(&info);
52 Channel_ssl_info ssl_info;
53 initialize_channel_ssl_info(&ssl_info);
54
55 info.user= user;
56 info.password= password;
57 info.hostname= hostname;
58 info.port= port;
59
60 info.auto_position= true;
61 info.replicate_same_server_id= true;
62 if (priority == GROUP_REPLICATION_APPLIER_THREAD_PRIORITY)
63 {
64 info.thd_tx_priority= GROUP_REPLICATION_APPLIER_THREAD_PRIORITY;
65 }
66 info.type= GROUP_REPLICATION_CHANNEL;
67
68 info.retry_count= retry_count;
69
70 info.preserve_relay_logs= preserve_logs;
71
72 info.m_ignore_write_set_memory_limit = ignore_ws_mem_limit;
73 info.m_allow_drop_write_set = allow_drop_write_set;
74
75 if( use_ssl || ssl_ca != NULL || ssl_capath != NULL || ssl_cert != NULL ||
76 ssl_cipher!= NULL || ssl_key != NULL || ssl_crl != NULL ||
77 ssl_crlpath != NULL || ssl_verify_server_cert)
78 {
79 ssl_info.use_ssl= use_ssl;
80 ssl_info.ssl_ca_file_name= ssl_ca;
81 ssl_info.ssl_ca_directory= ssl_capath;
82 ssl_info.ssl_cert_file_name= ssl_cert;
83 ssl_info.ssl_cipher= ssl_cipher;
84 ssl_info.ssl_key= ssl_key;
85 ssl_info.ssl_crl_file_name= ssl_crl;
86 ssl_info.ssl_crl_directory= ssl_crlpath;
87 ssl_info.ssl_verify_server_cert= ssl_verify_server_cert;
88 info.ssl_info= &ssl_info;
89 }
90
91 error= channel_create(interface_channel, &info);
92
93 /*
94 Flush relay log to indicate a new start.
95 */
96 if (!error)
97 error= channel_flush(interface_channel);
98
99 DBUG_RETURN(error);
100
101 }
102
103 int
start_threads(bool start_receiver,bool start_applier,string * view_id,bool wait_for_connection)104 Replication_thread_api::start_threads(bool start_receiver,
105 bool start_applier,
106 string* view_id,
107 bool wait_for_connection)
108 {
109 DBUG_ENTER("Replication_thread_api::start_threads");
110
111 Channel_connection_info info;
112 initialize_channel_connection_info(&info);
113
114 char* cview_id= NULL;
115
116 if (view_id)
117 {
118 cview_id= new char[view_id->size() + 1];
119 memcpy(cview_id, view_id->c_str(), view_id->size() + 1);
120
121 info.until_condition= CHANNEL_UNTIL_VIEW_ID;
122 info.view_id= cview_id;
123 }
124
125 int thread_mask= 0;
126 if (start_applier)
127 {
128 thread_mask |= CHANNEL_APPLIER_THREAD;
129 }
130 if (start_receiver)
131 {
132 thread_mask |= CHANNEL_RECEIVER_THREAD;
133 }
134
135 int error= channel_start(interface_channel,
136 &info,
137 thread_mask,
138 wait_for_connection);
139
140 if (view_id)
141 {
142 delete [] cview_id;
143 }
144
145 DBUG_RETURN(error);
146 }
147
purge_logs(bool reset_all)148 int Replication_thread_api::purge_logs(bool reset_all)
149 {
150 DBUG_ENTER("Replication_thread_api::purge_logs");
151
152 //If there is no channel, no point in invoking the method
153 if (!channel_is_active(interface_channel, CHANNEL_NO_THD))
154 DBUG_RETURN(0);
155
156 int error= channel_purge_queue(interface_channel, reset_all);
157
158 DBUG_RETURN(error);
159 }
160
stop_threads(bool stop_receiver,bool stop_applier)161 int Replication_thread_api::stop_threads(bool stop_receiver, bool stop_applier)
162 {
163 DBUG_ENTER("Replication_thread_api::stop_threads");
164
165 stop_receiver= stop_receiver && is_receiver_thread_running();
166 stop_applier= stop_applier && is_applier_thread_running();
167
168 //If there is nothing to do, return 0
169 if (!stop_applier && !stop_receiver)
170 DBUG_RETURN(0);
171
172 int thread_mask= 0;
173 if (stop_applier)
174 {
175 thread_mask |= CHANNEL_APPLIER_THREAD;
176 }
177 if (stop_receiver)
178 {
179 thread_mask |= CHANNEL_RECEIVER_THREAD;
180 }
181
182 int error= channel_stop(interface_channel,
183 thread_mask,
184 stop_wait_timeout);
185
186 DBUG_RETURN(error);
187 }
188
is_receiver_thread_running()189 bool Replication_thread_api::is_receiver_thread_running()
190 {
191 return(channel_is_active(interface_channel, CHANNEL_RECEIVER_THREAD));
192 }
193
is_receiver_thread_stopping()194 bool Replication_thread_api::is_receiver_thread_stopping()
195 {
196 return(channel_is_stopping(interface_channel, CHANNEL_RECEIVER_THREAD));
197 }
198
is_applier_thread_running()199 bool Replication_thread_api::is_applier_thread_running()
200 {
201 return(channel_is_active(interface_channel, CHANNEL_APPLIER_THREAD));
202 }
203
is_applier_thread_stopping()204 bool Replication_thread_api::is_applier_thread_stopping()
205 {
206 return(channel_is_stopping(interface_channel, CHANNEL_APPLIER_THREAD));
207 }
208
209 int
queue_packet(const char * buf,ulong event_len)210 Replication_thread_api::queue_packet(const char* buf, ulong event_len)
211 {
212 return channel_queue_packet(interface_channel, buf, event_len);
213 }
214
is_applier_thread_waiting()215 bool Replication_thread_api::is_applier_thread_waiting()
216 {
217 return (channel_is_applier_waiting(interface_channel) == 1);
218 }
219
220 int
wait_for_gtid_execution(double timeout)221 Replication_thread_api::wait_for_gtid_execution(double timeout)
222 {
223 DBUG_ENTER("Replication_thread_api::wait_for_gtid_execution");
224
225 int error= channel_wait_until_apply_queue_applied(interface_channel, timeout);
226
227 /*
228 Check that applier relay log is indeed consumed.
229 This is different from channel_wait_until_apply_queue_applied()
230 on the following case: if transactions on relay log are already
231 on GTID_EXECUTED, applier thread still needs to read the relay
232 log and update log positions. So despite transactions on relay
233 log are applied, applier thread is still updating log positions
234 on info tables.
235 */
236 if (!error)
237 {
238 if (channel_is_applier_waiting(interface_channel) != 1)
239 error= REPLICATION_THREAD_WAIT_TIMEOUT_ERROR;
240 }
241
242 DBUG_RETURN(error);
243 }
244
245 rpl_gno
get_last_delivered_gno(rpl_sidno sidno)246 Replication_thread_api::get_last_delivered_gno(rpl_sidno sidno)
247 {
248 DBUG_ENTER("Replication_thread_api::get_last_delivered_gno");
249 DBUG_RETURN(channel_get_last_delivered_gno(interface_channel, sidno));
250 }
251
get_applier_thread_ids(unsigned long ** thread_ids)252 int Replication_thread_api::get_applier_thread_ids(unsigned long** thread_ids)
253 {
254 DBUG_ENTER("Replication_thread_api::get_applier_thread_ids");
255 DBUG_RETURN(channel_get_thread_id(interface_channel,
256 CHANNEL_APPLIER_THREAD,
257 thread_ids));
258 }
259
is_own_event_applier(my_thread_id id,const char * channel_name)260 bool Replication_thread_api::is_own_event_applier(my_thread_id id,
261 const char* channel_name)
262 {
263 DBUG_ENTER("Replication_thread_api::is_own_event_applier");
264
265 bool result= false;
266 unsigned long* thread_ids= NULL;
267 const char* name= channel_name ? channel_name : interface_channel;
268
269 //Fetch all applier thread ids for this channel.
270 int number_appliers= channel_get_thread_id(name,
271 CHANNEL_APPLIER_THREAD,
272 &thread_ids);
273
274 //If none are found return false
275 if (number_appliers <= 0)
276 {
277 goto end;
278 }
279
280 if (number_appliers == 1) //One applier, check its id
281 {
282 result= (*thread_ids == id);
283 }
284 else //The channel has more than one applier, check if the id is in the list
285 {
286 for (int i = 0; i < number_appliers; i++)
287 {
288 unsigned long thread_id= thread_ids[i];
289 if (thread_id == id)
290 {
291 result= true;
292 break;
293 }
294 }
295 }
296
297 end:
298 my_free(thread_ids);
299
300 //The given id is not an id of the channel applier threads, return false
301 DBUG_RETURN(result);
302 }
303
is_own_event_receiver(my_thread_id id)304 bool Replication_thread_api::is_own_event_receiver(my_thread_id id)
305 {
306 DBUG_ENTER("Replication_thread_api::is_own_event_receiver");
307
308 bool result= false;
309 unsigned long* thread_id= NULL;
310
311 //Fetch the receiver thread id for this channel
312 int number_receivers= channel_get_thread_id(interface_channel,
313 CHANNEL_RECEIVER_THREAD,
314 &thread_id);
315
316 //If one is found
317 if (number_receivers > 0)
318 {
319 result= (*thread_id == id);
320 }
321 my_free(thread_id);
322
323 //The given id is not the id of the channel receiver thread, return false
324 DBUG_RETURN(result);
325 }
326
get_retrieved_gtid_set(std::string & retrieved_set,const char * channel_name)327 bool Replication_thread_api::get_retrieved_gtid_set(std::string& retrieved_set,
328 const char* channel_name)
329 {
330 DBUG_ENTER("Replication_thread_api::get_retrieved_gtid_set");
331
332 const char* name= channel_name ? channel_name : interface_channel;
333 char *receiver_retrieved_gtid_set= NULL;
334 int error;
335
336 error= channel_get_retrieved_gtid_set(name,
337 &receiver_retrieved_gtid_set);
338 if (!error)
339 retrieved_set.assign(receiver_retrieved_gtid_set);
340
341 my_free(receiver_retrieved_gtid_set);
342
343 DBUG_RETURN((error != 0));
344 }
345
is_partial_transaction_on_relay_log()346 bool Replication_thread_api::is_partial_transaction_on_relay_log()
347 {
348 return is_partial_transaction_on_channel_relay_log(interface_channel);
349 }
350