1 /* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/rpl_channel_service_interface.h"
24 
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/types.h>
28 #include <atomic>
29 #include <map>
30 #include <sstream>
31 #include <utility>
32 
33 #include "mutex_lock.h"  // MUTEX_LOCK
34 #include "my_compiler.h"
35 #include "my_dbug.h"
36 #include "my_inttypes.h"
37 #include "my_loglevel.h"
38 #include "my_sys.h"
39 #include "my_thread.h"
40 #include "mysql/components/services/log_builtins.h"
41 #include "mysql/components/services/psi_stage_bits.h"
42 #include "mysql/psi/mysql_cond.h"
43 #include "mysql/psi/mysql_mutex.h"
44 #include "mysql/psi/psi_base.h"
45 #include "mysql/service_mysql_alloc.h"
46 #include "mysql_com.h"
47 #include "mysqld_error.h"
48 #include "sql/auth/sql_security_ctx.h"
49 #include "sql/binlog.h"
50 #include "sql/current_thd.h"
51 #include "sql/log.h"
52 #include "sql/log_event.h"
53 #include "sql/mysqld.h"              // opt_mts_slave_parallel_workers
54 #include "sql/mysqld_thd_manager.h"  // Global_THD_manager
55 #include "sql/protocol_classic.h"
56 #include "sql/rpl_channel_credentials.h"
57 #include "sql/rpl_gtid.h"
58 #include "sql/rpl_info_factory.h"
59 #include "sql/rpl_info_handler.h"
60 #include "sql/rpl_mi.h"
61 #include "sql/rpl_msr.h" /* Multisource replication */
62 #include "sql/rpl_mts_submode.h"
63 #include "sql/rpl_rli.h"
64 #include "sql/rpl_rli_pdb.h"
65 #include "sql/rpl_slave.h"
66 #include "sql/rpl_trx_boundary_parser.h"
67 #include "sql/sql_class.h"
68 #include "sql/sql_lex.h"
69 
70 /**
71   Auxiliary function to stop all the running channel threads according to the
72   given mask.
73 
74   @note: The caller shall possess channel_map lock before calling this function,
75          and unlock after returning from this function.
76 
77   @param mi                   The pointer to Master_info instance
78   @param threads_to_stop      The types of threads to be stopped
79   @param timeout              The expected time in which the thread should stop
80 
81   @return the operation status
82     @retval 0      OK
83     @retval !=0    Error
84 */
85 int channel_stop(Master_info *mi, int threads_to_stop, long timeout);
86 
initialize_channel_service_interface()87 int initialize_channel_service_interface() {
88   DBUG_TRACE;
89 
90   // master info and relay log repositories must be TABLE
91   if (opt_mi_repository_id != INFO_REPOSITORY_TABLE ||
92       opt_rli_repository_id != INFO_REPOSITORY_TABLE) {
93     LogErr(ERROR_LEVEL, ER_RPL_CHANNELS_REQUIRE_TABLES_AS_INFO_REPOSITORIES);
94     return 1;
95   }
96 
97   // server id must be different from 0
98   if (server_id == 0) {
99     LogErr(ERROR_LEVEL, ER_RPL_CHANNELS_REQUIRE_NON_ZERO_SERVER_ID);
100     return 1;
101   }
102 
103   return 0;
104 }
105 
set_mi_settings(Master_info * mi,Channel_creation_info * channel_info)106 static void set_mi_settings(Master_info *mi,
107                             Channel_creation_info *channel_info) {
108   mysql_mutex_lock(mi->rli->relay_log.get_log_lock());
109   mysql_mutex_lock(&mi->data_lock);
110 
111   mi->rli->set_thd_tx_priority(channel_info->thd_tx_priority);
112 
113   mi->rli->replicate_same_server_id =
114       (channel_info->replicate_same_server_id == RPL_SERVICE_SERVER_DEFAULT)
115           ? replicate_same_server_id
116           : channel_info->replicate_same_server_id;
117 
118   mi->rli->opt_slave_parallel_workers =
119       (channel_info->channel_mts_parallel_workers == RPL_SERVICE_SERVER_DEFAULT)
120           ? opt_mts_slave_parallel_workers
121           : channel_info->channel_mts_parallel_workers;
122 
123   if (channel_info->channel_mts_parallel_type == RPL_SERVICE_SERVER_DEFAULT) {
124     if (mts_parallel_option == MTS_PARALLEL_TYPE_DB_NAME)
125       mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_DB_NAME;
126     else
127       mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
128   } else {
129     if (channel_info->channel_mts_parallel_type ==
130         CHANNEL_MTS_PARALLEL_TYPE_DB_NAME)
131       mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_DB_NAME;
132     else
133       mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
134   }
135 
136   mi->rli->checkpoint_group =
137       (channel_info->channel_mts_checkpoint_group == RPL_SERVICE_SERVER_DEFAULT)
138           ? opt_mts_checkpoint_group
139           : channel_info->channel_mts_checkpoint_group;
140 
141   Format_description_log_event *fde = new Format_description_log_event();
142   /*
143     Group replication applier channel shall not use checksum on its relay log
144     files.
145   */
146   if (channel_map.is_group_replication_channel_name(mi->get_channel(), true)) {
147     fde->footer()->checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF;
148     /*
149       When the receiver thread connects to the master, it gets its current
150       binlog checksum algorithm, but as GR applier channel has no receiver
151       thread (and also does not connect to a master), we will set the variable
152       here to BINLOG_CHECKSUM_ALG_OFF as events queued after certification have
153       no checksum information.
154     */
155     mi->checksum_alg_before_fd = binary_log::BINLOG_CHECKSUM_ALG_OFF;
156   }
157   mi->set_mi_description_event(fde);
158 
159   mysql_mutex_unlock(&mi->data_lock);
160   mysql_mutex_unlock(mi->rli->relay_log.get_log_lock());
161 }
162 
init_thread_context()163 static bool init_thread_context() { return my_thread_init(); }
164 
clean_thread_context()165 static void clean_thread_context() { my_thread_end(); }
166 
create_surrogate_thread()167 static THD *create_surrogate_thread() {
168   THD *thd = nullptr;
169   thd = new THD;
170   thd->thread_stack = (char *)&thd;
171   thd->store_globals();
172   thd->security_context()->skip_grants();
173 
174   return (thd);
175 }
176 
delete_surrogate_thread(THD * thd)177 static void delete_surrogate_thread(THD *thd) {
178   thd->release_resources();
179   delete thd;
180   current_thd = nullptr;
181 }
182 
initialize_channel_creation_info(Channel_creation_info * channel_info)183 void initialize_channel_creation_info(Channel_creation_info *channel_info) {
184   channel_info->type = SLAVE_REPLICATION_CHANNEL;
185   channel_info->hostname = nullptr;
186   channel_info->port = 0;
187   channel_info->user = nullptr;
188   channel_info->password = nullptr;
189   channel_info->ssl_info = nullptr;
190   channel_info->auto_position = RPL_SERVICE_SERVER_DEFAULT;
191   channel_info->channel_mts_parallel_type = RPL_SERVICE_SERVER_DEFAULT;
192   channel_info->channel_mts_parallel_workers = RPL_SERVICE_SERVER_DEFAULT;
193   channel_info->channel_mts_checkpoint_group = RPL_SERVICE_SERVER_DEFAULT;
194   channel_info->replicate_same_server_id = RPL_SERVICE_SERVER_DEFAULT;
195   channel_info->thd_tx_priority = 0;
196   channel_info->sql_delay = RPL_SERVICE_SERVER_DEFAULT;
197   channel_info->preserve_relay_logs = false;
198   channel_info->retry_count = 0;
199   channel_info->connect_retry = 0;
200   channel_info->public_key_path = nullptr;
201   channel_info->get_public_key = 0;
202   channel_info->compression_algorithm = nullptr;
203   channel_info->zstd_compression_level = 0;
204 }
205 
initialize_channel_ssl_info(Channel_ssl_info * channel_ssl_info)206 void initialize_channel_ssl_info(Channel_ssl_info *channel_ssl_info) {
207   channel_ssl_info->use_ssl = 0;
208   channel_ssl_info->ssl_ca_file_name = nullptr;
209   channel_ssl_info->ssl_ca_directory = nullptr;
210   channel_ssl_info->ssl_cert_file_name = nullptr;
211   channel_ssl_info->ssl_crl_file_name = nullptr;
212   channel_ssl_info->ssl_crl_directory = nullptr;
213   channel_ssl_info->ssl_key = nullptr;
214   channel_ssl_info->ssl_cipher = nullptr;
215   channel_ssl_info->tls_version = nullptr;
216   channel_ssl_info->ssl_verify_server_cert = 0;
217   channel_ssl_info->tls_ciphersuites = nullptr;
218 }
219 
initialize_channel_connection_info(Channel_connection_info * channel_info)220 void initialize_channel_connection_info(Channel_connection_info *channel_info) {
221   channel_info->until_condition = CHANNEL_NO_UNTIL_CONDITION;
222   channel_info->gtid = nullptr;
223   channel_info->view_id = nullptr;
224 }
225 
set_mi_ssl_options(LEX_MASTER_INFO * lex_mi,Channel_ssl_info * channel_ssl_info)226 static void set_mi_ssl_options(LEX_MASTER_INFO *lex_mi,
227                                Channel_ssl_info *channel_ssl_info) {
228   lex_mi->ssl = (channel_ssl_info->use_ssl) ? LEX_MASTER_INFO::LEX_MI_ENABLE
229                                             : LEX_MASTER_INFO::LEX_MI_DISABLE;
230 
231   if (channel_ssl_info->ssl_ca_file_name != nullptr) {
232     lex_mi->ssl_ca = channel_ssl_info->ssl_ca_file_name;
233   }
234 
235   if (channel_ssl_info->ssl_ca_directory != nullptr) {
236     lex_mi->ssl_capath = channel_ssl_info->ssl_ca_directory;
237   }
238 
239   if (channel_ssl_info->tls_version != nullptr) {
240     lex_mi->tls_version = channel_ssl_info->tls_version;
241   }
242 
243   if (channel_ssl_info->ssl_cert_file_name != nullptr) {
244     lex_mi->ssl_cert = channel_ssl_info->ssl_cert_file_name;
245   }
246 
247   if (channel_ssl_info->ssl_crl_file_name != nullptr) {
248     lex_mi->ssl_crl = channel_ssl_info->ssl_crl_file_name;
249   }
250 
251   if (channel_ssl_info->ssl_crl_directory != nullptr) {
252     lex_mi->ssl_crlpath = channel_ssl_info->ssl_crl_directory;
253   }
254 
255   if (channel_ssl_info->ssl_key != nullptr) {
256     lex_mi->ssl_key = channel_ssl_info->ssl_key;
257   }
258 
259   if (channel_ssl_info->ssl_cipher != nullptr) {
260     lex_mi->ssl_cipher = channel_ssl_info->ssl_cipher;
261   }
262 
263   if (channel_ssl_info->tls_ciphersuites != nullptr) {
264     lex_mi->tls_ciphersuites = LEX_MASTER_INFO::SPECIFIED_STRING;
265     lex_mi->tls_ciphersuites_string = channel_ssl_info->tls_ciphersuites;
266   } else {
267     lex_mi->tls_ciphersuites = LEX_MASTER_INFO::SPECIFIED_NULL;
268   }
269 
270   lex_mi->ssl_verify_server_cert = (channel_ssl_info->ssl_verify_server_cert)
271                                        ? LEX_MASTER_INFO::LEX_MI_ENABLE
272                                        : LEX_MASTER_INFO::LEX_MI_DISABLE;
273 }
274 
channel_create(const char * channel,Channel_creation_info * channel_info)275 int channel_create(const char *channel, Channel_creation_info *channel_info) {
276   DBUG_TRACE;
277 
278   Master_info *mi = nullptr;
279   int error = 0;
280   LEX_MASTER_INFO *lex_mi = nullptr;
281 
282   bool thd_created = false;
283   THD *thd = current_thd;
284 
285   // Don't create default channels
286   if (!strcmp(channel_map.get_default_channel(), channel))
287     return RPL_CHANNEL_SERVICE_DEFAULT_CHANNEL_CREATION_ERROR;
288 
289   /* Service channels are not supposed to use sql_slave_skip_counter */
290   mysql_mutex_lock(&LOCK_sql_slave_skip_counter);
291   if (sql_slave_skip_counter > 0)
292     error = RPL_CHANNEL_SERVICE_SLAVE_SKIP_COUNTER_ACTIVE;
293   mysql_mutex_unlock(&LOCK_sql_slave_skip_counter);
294   if (error) return error;
295 
296   channel_map.wrlock();
297 
298   /* Get the Master_info of the channel */
299   mi = channel_map.get_mi(channel);
300 
301   /* create a new channel if doesn't exist */
302   if (!mi) {
303     if ((error = add_new_channel(&mi, channel))) goto err;
304   }
305 
306   lex_mi = new LEX_MASTER_INFO();
307   lex_mi->channel = channel;
308   lex_mi->host = channel_info->hostname;
309   /*
310     'group_replication_recovery' channel (*after recovery is done*)
311     or 'group_replication_applier' channel wants to set the port number
312     to '0' as there is no actual network usage on these channels.
313   */
314   lex_mi->port_opt = LEX_MASTER_INFO::LEX_MI_ENABLE;
315   lex_mi->port = channel_info->port;
316   lex_mi->user = channel_info->user;
317   lex_mi->password = channel_info->password;
318   lex_mi->sql_delay = channel_info->sql_delay;
319   lex_mi->connect_retry = channel_info->connect_retry;
320   if (channel_info->retry_count) {
321     lex_mi->retry_count_opt = LEX_MASTER_INFO::LEX_MI_ENABLE;
322     lex_mi->retry_count = channel_info->retry_count;
323   }
324 
325   if (channel_info->auto_position) {
326     lex_mi->auto_position = LEX_MASTER_INFO::LEX_MI_ENABLE;
327     if ((mi && mi->is_auto_position()) ||
328         channel_info->auto_position == RPL_SERVICE_SERVER_DEFAULT) {
329       // So change master allows new configurations with a running SQL thread
330       lex_mi->auto_position = LEX_MASTER_INFO::LEX_MI_UNCHANGED;
331     }
332   }
333 
334   if (channel_info->public_key_path) {
335     lex_mi->public_key_path = channel_info->public_key_path;
336   }
337 
338   if (channel_info->get_public_key) {
339     lex_mi->get_public_key = LEX_MASTER_INFO::LEX_MI_ENABLE;
340     if (mi && mi->get_public_key) {
341       // So change master allows new configurations with a running SQL thread
342       lex_mi->get_public_key = LEX_MASTER_INFO::LEX_MI_UNCHANGED;
343     }
344   } else {
345     lex_mi->get_public_key = LEX_MASTER_INFO::LEX_MI_DISABLE;
346     if (mi && !mi->get_public_key) {
347       // So change master allows new configurations with a running SQL thread
348       lex_mi->get_public_key = LEX_MASTER_INFO::LEX_MI_UNCHANGED;
349     }
350   }
351 
352   if (channel_info->compression_algorithm != nullptr) {
353     lex_mi->compression_algorithm = channel_info->compression_algorithm;
354   }
355   if (channel_info->zstd_compression_level) {
356     lex_mi->zstd_compression_level = channel_info->zstd_compression_level;
357   }
358 
359   if (channel_info->ssl_info != nullptr) {
360     set_mi_ssl_options(lex_mi, channel_info->ssl_info);
361   }
362 
363   if (mi) {
364     if (!thd) {
365       thd_created = true;
366       thd = create_surrogate_thread();
367     }
368 
369     if ((error = change_master(thd, mi, lex_mi,
370                                channel_info->preserve_relay_logs))) {
371       goto err;
372     }
373   }
374 
375   set_mi_settings(mi, channel_info);
376 
377   if (channel_map.is_group_replication_channel_name(mi->get_channel())) {
378     thd->variables.max_allowed_packet = slave_max_allowed_packet;
379     thd->get_protocol_classic()->set_max_packet_size(slave_max_allowed_packet +
380                                                      MAX_LOG_EVENT_HEADER);
381   }
382 
383 err:
384   channel_map.unlock();
385 
386   if (thd_created) {
387     delete_surrogate_thread(thd);
388   }
389 
390   delete lex_mi;
391 
392   return error;
393 }
394 
channel_start(const char * channel,Channel_connection_info * connection_info,int threads_to_start,int wait_for_connection)395 int channel_start(const char *channel, Channel_connection_info *connection_info,
396                   int threads_to_start, int wait_for_connection) {
397   DBUG_TRACE;
398   int error = 0;
399   int thread_mask = 0;
400   LEX_MASTER_INFO lex_mi;
401   ulong thread_start_id = 0;
402   bool thd_created = false;
403   THD *thd = current_thd;
404   String_set user, pass, auth;
405 
406   /* Service channels are not supposed to use sql_slave_skip_counter */
407   mysql_mutex_lock(&LOCK_sql_slave_skip_counter);
408   if (sql_slave_skip_counter > 0)
409     error = RPL_CHANNEL_SERVICE_SLAVE_SKIP_COUNTER_ACTIVE;
410   mysql_mutex_unlock(&LOCK_sql_slave_skip_counter);
411   if (error) return error;
412 
413   channel_map.wrlock();
414 
415   Master_info *mi = channel_map.get_mi(channel);
416 
417   if (mi == nullptr) {
418     error = RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
419     goto err;
420   }
421 
422   if (threads_to_start & CHANNEL_APPLIER_THREAD) {
423     thread_mask |= SLAVE_SQL;
424   }
425   if (threads_to_start & CHANNEL_RECEIVER_THREAD) {
426     thread_mask |= SLAVE_IO;
427   }
428 
429   // Nothing to be done here
430   if (!thread_mask) goto err;
431 
432   LEX_SLAVE_CONNECTION lex_connection;
433   lex_connection.reset();
434 
435   if (!Rpl_channel_credentials::get_instance().get_credentials(channel, user,
436                                                                pass, auth)) {
437     lex_connection.user =
438         (user.first ? const_cast<char *>(user.second.c_str()) : nullptr);
439     lex_connection.password =
440         (pass.first ? const_cast<char *>(pass.second.c_str()) : nullptr);
441     lex_connection.plugin_auth =
442         (auth.first ? const_cast<char *>(auth.second.c_str()) : nullptr);
443   }
444 
445   if (connection_info->until_condition != CHANNEL_NO_UNTIL_CONDITION) {
446     switch (connection_info->until_condition) {
447       case CHANNEL_UNTIL_APPLIER_AFTER_GTIDS:
448         lex_mi.gtid_until_condition = LEX_MASTER_INFO::UNTIL_SQL_AFTER_GTIDS;
449         lex_mi.gtid = connection_info->gtid;
450         break;
451       case CHANNEL_UNTIL_APPLIER_BEFORE_GTIDS:
452         lex_mi.gtid_until_condition = LEX_MASTER_INFO::UNTIL_SQL_BEFORE_GTIDS;
453         lex_mi.gtid = connection_info->gtid;
454         break;
455       case CHANNEL_UNTIL_APPLIER_AFTER_GAPS:
456         lex_mi.until_after_gaps = true;
457         break;
458       case CHANNEL_UNTIL_VIEW_ID:
459         DBUG_ASSERT((thread_mask & SLAVE_SQL) && connection_info->view_id);
460         lex_mi.view_id = connection_info->view_id;
461         break;
462       default:
463         DBUG_ASSERT(0);
464     }
465   }
466 
467   if (wait_for_connection && (thread_mask & SLAVE_IO))
468     thread_start_id = mi->slave_run_id;
469 
470   if (!thd) {
471     thd_created = true;
472     thd = create_surrogate_thread();
473   }
474 
475   error = start_slave(thd, &lex_connection, &lex_mi, thread_mask, mi, false);
476 
477   if (wait_for_connection && (thread_mask & SLAVE_IO) && !error) {
478     mysql_mutex_lock(&mi->run_lock);
479     /*
480       If the ids are still equal this means the start thread method did not
481       wait for the thread to start
482     */
483     while (thread_start_id == mi->slave_run_id) {
484       mysql_cond_wait(&mi->start_cond, &mi->run_lock);
485     }
486     mysql_mutex_unlock(&mi->run_lock);
487 
488     while (mi->slave_running != MYSQL_SLAVE_RUN_CONNECT) {
489       // If there is such a state change then there was an error on connection
490       if (mi->slave_running == MYSQL_SLAVE_NOT_RUN) {
491         error = RPL_CHANNEL_SERVICE_RECEIVER_CONNECTION_ERROR;
492         break;
493       }
494       my_sleep(100);
495     }
496   }
497 
498 err:
499   channel_map.unlock();
500 
501   if (thd_created) {
502     delete_surrogate_thread(thd);
503   }
504 
505   return error;
506 }
507 
channel_stop(Master_info * mi,int threads_to_stop,long timeout)508 int channel_stop(Master_info *mi, int threads_to_stop, long timeout) {
509   channel_map.assert_some_lock();
510 
511   if (mi == nullptr) {
512     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
513   }
514 
515   int thread_mask = 0;
516   int server_thd_mask = 0;
517   int error = 0;
518   bool thd_init = false;
519 
520   mi->channel_wrlock();
521   lock_slave_threads(mi);
522 
523   init_thread_mask(&server_thd_mask, mi, false /* not inverse*/);
524 
525   if ((threads_to_stop & CHANNEL_APPLIER_THREAD) &&
526       (server_thd_mask & SLAVE_SQL)) {
527     thread_mask |= SLAVE_SQL;
528   }
529   if ((threads_to_stop & CHANNEL_RECEIVER_THREAD) &&
530       (server_thd_mask & SLAVE_IO)) {
531     thread_mask |= SLAVE_IO;
532   }
533 
534   if (thread_mask == 0) {
535     goto end;
536   }
537 
538   thd_init = init_thread_context();
539 
540   if (current_thd) current_thd->set_skip_readonly_check();
541 
542   error = terminate_slave_threads(mi, thread_mask, timeout, false);
543 
544   if (current_thd) current_thd->reset_skip_readonly_check();
545 
546 end:
547   unlock_slave_threads(mi);
548   mi->channel_unlock();
549 
550   if (thd_init) {
551     clean_thread_context();
552   }
553 
554   return error;
555 }
556 
channel_stop(const char * channel,int threads_to_stop,long timeout)557 int channel_stop(const char *channel, int threads_to_stop, long timeout) {
558   DBUG_TRACE;
559 
560   channel_map.rdlock();
561 
562   Master_info *mi = channel_map.get_mi(channel);
563 
564   int error = channel_stop(mi, threads_to_stop, timeout);
565 
566   channel_map.unlock();
567 
568   return error;
569 }
570 
channel_stop_all(int threads_to_stop,long timeout,std::string * error_message)571 int channel_stop_all(int threads_to_stop, long timeout,
572                      std::string *error_message) {
573   Master_info *mi = nullptr;
574 
575   /* Error related varaiables */
576   int error = 0;
577   std::stringstream err_msg_ss;
578   err_msg_ss << "Error stopping channel(s): ";
579 
580   channel_map.rdlock();
581 
582   for (mi_map::iterator it = channel_map.begin(); it != channel_map.end();
583        it++) {
584     mi = it->second;
585 
586     if (mi) {
587       int channel_error = channel_stop(mi, threads_to_stop, timeout);
588 
589       DBUG_EXECUTE_IF("group_replication_stop_all_channels_failure",
590                       { channel_error = 1; });
591 
592       if (channel_error &&
593           channel_error != RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR) {
594         error = channel_error;
595 
596         mi->report(ERROR_LEVEL, error,
597                    "Error stopping channel: %s. Got error: %d",
598                    mi->get_channel(), error);
599 
600         err_msg_ss << " '" << mi->get_channel() << "' [error number: " << error
601                    << "],";
602       }
603     }
604   }
605 
606   if (error) {
607     *error_message = err_msg_ss.str();
608     (*error_message)[error_message->length() - 1] = '.';
609     error_message->append(
610         " Please check the error log for additional details.");
611   }
612 
613   channel_map.unlock();
614   return error;
615 }
616 
617 class Kill_binlog_dump : public Do_THD_Impl {
618  public:
Kill_binlog_dump()619   Kill_binlog_dump() {}
620 
operator ()(THD * thd_to_kill)621   virtual void operator()(THD *thd_to_kill) {
622     if (thd_to_kill->get_command() == COM_BINLOG_DUMP ||
623         thd_to_kill->get_command() == COM_BINLOG_DUMP_GTID) {
624       DBUG_ASSERT(thd_to_kill != current_thd);
625       MUTEX_LOCK(thd_data_lock, &thd_to_kill->LOCK_thd_data);
626       thd_to_kill->duplicate_slave_id = true;
627       thd_to_kill->awake(THD::KILL_CONNECTION);
628     }
629   }
630 };
631 
binlog_dump_thread_kill()632 int binlog_dump_thread_kill() {
633   DBUG_TRACE;
634   Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
635   Kill_binlog_dump kill_binlog_dump;
636   thd_manager->do_for_all_thd(&kill_binlog_dump);
637   return 0;
638 }
639 
channel_purge_queue(const char * channel,bool reset_all)640 int channel_purge_queue(const char *channel, bool reset_all) {
641   DBUG_TRACE;
642 
643   channel_map.wrlock();
644 
645   Master_info *mi = channel_map.get_mi(channel);
646 
647   if (mi == nullptr) {
648     channel_map.unlock();
649     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
650   }
651 
652   bool thd_init = init_thread_context();
653 
654   int error = reset_slave(current_thd, mi, reset_all);
655 
656   channel_map.unlock();
657 
658   if (thd_init) {
659     clean_thread_context();
660   }
661 
662   return error;
663 }
664 
channel_is_active(const char * channel,enum_channel_thread_types thd_type)665 bool channel_is_active(const char *channel,
666                        enum_channel_thread_types thd_type) {
667   int thread_mask = 0;
668   DBUG_TRACE;
669 
670   channel_map.rdlock();
671 
672   Master_info *mi = channel_map.get_mi(channel);
673 
674   if (mi == nullptr) {
675     channel_map.unlock();
676     return false;
677   }
678 
679   init_thread_mask(&thread_mask, mi, false /* not inverse*/);
680 
681   channel_map.unlock();
682 
683   switch (thd_type) {
684     case CHANNEL_NO_THD:
685       return true;  // return true as the channel exists
686     case CHANNEL_RECEIVER_THREAD:
687       return thread_mask & SLAVE_IO;
688     case CHANNEL_APPLIER_THREAD:
689       return thread_mask & SLAVE_SQL;
690     default:
691       DBUG_ASSERT(0);
692   }
693   return false;
694 }
695 
channel_get_thread_id(const char * channel,enum_channel_thread_types thd_type,unsigned long ** thread_id)696 int channel_get_thread_id(const char *channel,
697                           enum_channel_thread_types thd_type,
698                           unsigned long **thread_id) {
699   DBUG_TRACE;
700 
701   int number_threads = -1;
702 
703   channel_map.rdlock();
704 
705   Master_info *mi = channel_map.get_mi(channel);
706 
707   if (mi == nullptr) {
708     channel_map.unlock();
709     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
710   }
711 
712   switch (thd_type) {
713     case CHANNEL_RECEIVER_THREAD:
714       mysql_mutex_lock(&mi->info_thd_lock);
715       if (mi->info_thd != nullptr) {
716         *thread_id = (unsigned long *)my_malloc(
717             PSI_NOT_INSTRUMENTED, sizeof(unsigned long), MYF(MY_WME));
718         **thread_id = mi->info_thd->thread_id();
719         number_threads = 1;
720       }
721       mysql_mutex_unlock(&mi->info_thd_lock);
722       break;
723     case CHANNEL_APPLIER_THREAD:
724       if (mi->rli != nullptr) {
725         mysql_mutex_lock(&mi->rli->run_lock);
726 
727         if (mi->rli->slave_parallel_workers > 0) {
728           // Parallel applier.
729           size_t num_workers = mi->rli->get_worker_count();
730           number_threads = 1 + num_workers;
731           *thread_id = (unsigned long *)my_malloc(
732               PSI_NOT_INSTRUMENTED, number_threads * sizeof(unsigned long),
733               MYF(MY_WME));
734           unsigned long *thread_id_pointer = *thread_id;
735 
736           // Set default values on thread_id array.
737           for (int i = 0; i < number_threads; i++, thread_id_pointer++)
738             *thread_id_pointer = -1;
739           thread_id_pointer = *thread_id;
740 
741           // Coordinator thread id.
742           if (mi->rli->info_thd != nullptr) {
743             mysql_mutex_lock(&mi->rli->info_thd_lock);
744             *thread_id_pointer = mi->rli->info_thd->thread_id();
745             mysql_mutex_unlock(&mi->rli->info_thd_lock);
746             thread_id_pointer++;
747           }
748 
749           // Workers thread id.
750           if (mi->rli->workers_array_initialized) {
751             for (size_t i = 0; i < num_workers; i++, thread_id_pointer++) {
752               Slave_worker *worker = mi->rli->get_worker(i);
753               if (worker != nullptr) {
754                 mysql_mutex_lock(&worker->jobs_lock);
755                 if (worker->info_thd != nullptr &&
756                     worker->running_status != Slave_worker::NOT_RUNNING) {
757                   mysql_mutex_lock(&worker->info_thd_lock);
758                   *thread_id_pointer = worker->info_thd->thread_id();
759                   mysql_mutex_unlock(&worker->info_thd_lock);
760                 }
761                 mysql_mutex_unlock(&worker->jobs_lock);
762               }
763             }
764           }
765         } else {
766           // Sequential applier.
767           if (mi->rli->info_thd != nullptr) {
768             *thread_id = (unsigned long *)my_malloc(
769                 PSI_NOT_INSTRUMENTED, sizeof(unsigned long), MYF(MY_WME));
770             mysql_mutex_lock(&mi->rli->info_thd_lock);
771             **thread_id = mi->rli->info_thd->thread_id();
772             mysql_mutex_unlock(&mi->rli->info_thd_lock);
773             number_threads = 1;
774           }
775         }
776         mysql_mutex_unlock(&mi->rli->run_lock);
777       }
778       break;
779     default:
780       break;
781   }
782 
783   channel_map.unlock();
784 
785   return number_threads;
786 }
787 
channel_get_last_delivered_gno(const char * channel,int sidno)788 long long channel_get_last_delivered_gno(const char *channel, int sidno) {
789   DBUG_TRACE;
790 
791   channel_map.rdlock();
792 
793   Master_info *mi = channel_map.get_mi(channel);
794 
795   if (mi == nullptr) {
796     channel_map.unlock();
797     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
798   }
799 
800   rpl_gno last_gno = 0;
801 
802   Checkable_rwlock *sid_lock = mi->rli->get_sid_lock();
803   sid_lock->rdlock();
804   last_gno = mi->rli->get_gtid_set()->get_last_gno(sidno);
805   sid_lock->unlock();
806 
807 #if !defined(DBUG_OFF)
808   const Gtid_set *retrieved_gtid_set = mi->rli->get_gtid_set();
809   char *retrieved_gtid_set_string = nullptr;
810   sid_lock->wrlock();
811   retrieved_gtid_set->to_string(&retrieved_gtid_set_string);
812   sid_lock->unlock();
813   DBUG_PRINT("info", ("get_last_delivered_gno retrieved_set_string: %s",
814                       retrieved_gtid_set_string));
815   my_free(retrieved_gtid_set_string);
816 #endif
817 
818   channel_map.unlock();
819 
820   return last_gno;
821 }
822 
channel_add_executed_gtids_to_received_gtids(const char * channel)823 int channel_add_executed_gtids_to_received_gtids(const char *channel) {
824   DBUG_TRACE;
825 
826   channel_map.rdlock();
827   Master_info *mi = channel_map.get_mi(channel);
828   if (mi == nullptr) {
829     channel_map.unlock();
830     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
831   }
832 
833   global_sid_lock->wrlock();
834 
835   enum_return_status return_status =
836       mi->rli->add_gtid_set(gtid_state->get_executed_gtids());
837 
838   global_sid_lock->unlock();
839   channel_map.unlock();
840 
841   return return_status != RETURN_STATUS_OK;
842 }
843 
channel_queue_packet(const char * channel,const char * buf,unsigned long event_len)844 int channel_queue_packet(const char *channel, const char *buf,
845                          unsigned long event_len) {
846   int result;
847   DBUG_TRACE;
848 
849   channel_map.rdlock();
850 
851   Master_info *mi = channel_map.get_mi(channel);
852 
853   if (mi == nullptr) {
854     channel_map.unlock();
855     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
856   }
857   channel_map.unlock();
858 
859   result = queue_event(mi, buf, event_len, false /*flush_master_info*/);
860 
861   return result;
862 }
863 
channel_wait_until_apply_queue_applied(const char * channel,double timeout)864 int channel_wait_until_apply_queue_applied(const char *channel,
865                                            double timeout) {
866   DBUG_TRACE;
867 
868   channel_map.rdlock();
869 
870   Master_info *mi = channel_map.get_mi(channel);
871 
872   if (mi == nullptr) {
873     channel_map.unlock();
874     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
875   }
876 
877   mi->inc_reference();
878   channel_map.unlock();
879 
880   /*
881     The retrieved_gtid_set (rli->get_gtid_set) has its own sid_map/sid_lock
882     and do not use global_sid_map/global_sid_lock. Instead of blocking both
883     sid locks on each wait iteration at rli->wait_for_gtid_set(Gtid_set), it
884     would be better to use rli->wait_for_gtid_set(char *) that will create a
885     new Gtid_set based on global_sid_map.
886   */
887   char *retrieved_gtid_set_buf;
888   mi->rli->get_sid_lock()->wrlock();
889   mi->rli->get_gtid_set()->to_string(&retrieved_gtid_set_buf);
890   mi->rli->get_sid_lock()->unlock();
891 
892   int error = mi->rli->wait_for_gtid_set(current_thd, retrieved_gtid_set_buf,
893                                          timeout, false);
894   my_free(retrieved_gtid_set_buf);
895   mi->dec_reference();
896 
897   if (error == -1) return REPLICATION_THREAD_WAIT_TIMEOUT_ERROR;
898   if (error == -2) return REPLICATION_THREAD_WAIT_NO_INFO_ERROR;
899 
900   return error;
901 }
902 
channel_wait_until_transactions_applied(const char * channel,const char * gtid_set,double timeout,bool update_THD_status)903 int channel_wait_until_transactions_applied(const char *channel,
904                                             const char *gtid_set,
905                                             double timeout,
906                                             bool update_THD_status) {
907   DBUG_TRACE;
908 
909   channel_map.rdlock();
910 
911   Master_info *mi = channel_map.get_mi(channel);
912 
913   if (mi == nullptr) {
914     channel_map.unlock(); /* purecov: inspected */
915     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR; /* purecov:
916                                                                   inspected */
917   }
918 
919   mi->inc_reference();
920   channel_map.unlock();
921 
922   int error = mi->rli->wait_for_gtid_set(current_thd, gtid_set, timeout,
923                                          update_THD_status);
924   mi->dec_reference();
925 
926   if (error == -1) return REPLICATION_THREAD_WAIT_TIMEOUT_ERROR;
927   if (error == -2) return REPLICATION_THREAD_WAIT_NO_INFO_ERROR;
928 
929   return error;
930 }
931 
channel_is_applier_waiting(const char * channel)932 int channel_is_applier_waiting(const char *channel) {
933   DBUG_TRACE;
934   int result = RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
935 
936   channel_map.rdlock();
937 
938   Master_info *mi = channel_map.get_mi(channel);
939 
940   if (mi == nullptr) {
941     channel_map.unlock();
942     return result;
943   }
944 
945   unsigned long *thread_ids = nullptr;
946   int number_appliers =
947       channel_get_thread_id(channel, CHANNEL_APPLIER_THREAD, &thread_ids);
948 
949   if (number_appliers <= 0) {
950     goto end;
951   }
952 
953   if (number_appliers == 1) {
954     result = channel_is_applier_thread_waiting(*thread_ids);
955   } else if (number_appliers > 1) {
956     int waiting = 0;
957 
958     // Check if coordinator is waiting.
959     waiting += channel_is_applier_thread_waiting(thread_ids[0]);
960 
961     // Check if workers are waiting.
962     for (int i = 1; i < number_appliers; i++)
963       waiting += channel_is_applier_thread_waiting(thread_ids[i], true);
964 
965     // Check if all are waiting.
966     if (waiting == number_appliers)
967       result = 1;
968     else
969       result = 0;
970   }
971 
972 end:
973   channel_map.unlock();
974   my_free(thread_ids);
975 
976   return result;
977 }
978 
channel_is_applier_thread_waiting(unsigned long thread_id,bool worker)979 int channel_is_applier_thread_waiting(unsigned long thread_id, bool worker) {
980   DBUG_TRACE;
981   int result = -1;
982 
983   Find_thd_with_id find_thd_with_id(thread_id);
984   THD *thd = Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
985   if (thd) {
986     result = 0;
987 
988     const char *proc_info = thd->get_proc_info();
989     if (proc_info) {
990       const char *stage_name = stage_slave_has_read_all_relay_log.m_name;
991       if (worker)
992         stage_name = stage_slave_waiting_event_from_coordinator.m_name;
993 
994       if (!strcmp(proc_info, stage_name)) result = 1;
995     }
996     mysql_mutex_unlock(&thd->LOCK_thd_data);
997   }
998 
999   return result;
1000 }
1001 
channel_flush(const char * channel)1002 int channel_flush(const char *channel) {
1003   DBUG_TRACE;
1004 
1005   channel_map.rdlock();
1006 
1007   Master_info *mi = channel_map.get_mi(channel);
1008 
1009   if (mi == nullptr) {
1010     channel_map.unlock();
1011     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
1012   }
1013 
1014   bool error = (flush_relay_logs(mi, mi->info_thd) == 1);
1015 
1016   channel_map.unlock();
1017 
1018   return error ? 1 : 0;
1019 }
1020 
channel_get_retrieved_gtid_set(const char * channel,char ** retrieved_set)1021 int channel_get_retrieved_gtid_set(const char *channel, char **retrieved_set) {
1022   DBUG_TRACE;
1023 
1024   channel_map.rdlock();
1025 
1026   Master_info *mi = channel_map.get_mi(channel);
1027 
1028   if (mi == nullptr) {
1029     channel_map.unlock();
1030     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
1031   }
1032 
1033   mi->inc_reference();
1034   channel_map.unlock();
1035 
1036   int error = 0;
1037   const Gtid_set *receiver_gtid_set = mi->rli->get_gtid_set();
1038   if (receiver_gtid_set->to_string(retrieved_set, true /*need_lock*/) == -1)
1039     error = ER_OUTOFMEMORY;
1040 
1041   mi->dec_reference();
1042 
1043   return error;
1044 }
1045 
channel_get_credentials(const char * channel,std::string & username,std::string & password)1046 int channel_get_credentials(const char *channel, std::string &username,
1047                             std::string &password) {
1048   DBUG_TRACE;
1049   String_set user_store, pass_store, auth_store;
1050 
1051   if (!Rpl_channel_credentials::get_instance().get_credentials(
1052           channel, user_store, pass_store, auth_store)) {
1053     if (user_store.first) username = user_store.second;
1054     if (pass_store.first) password = pass_store.second;
1055     return 0;
1056   }
1057 
1058   channel_map.rdlock();
1059   Master_info *mi = channel_map.get_mi(channel);
1060 
1061   if (mi == nullptr) {
1062     channel_map.unlock();
1063     return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
1064   }
1065 
1066   mi->inc_reference();
1067   channel_map.unlock();
1068 
1069   char pass[MAX_PASSWORD_LENGTH + 1];
1070   size_t pass_size;
1071   mi->get_password(pass, &pass_size);
1072   username.assign(mi->get_user());
1073   password.assign(pass, pass_size);
1074 
1075   mi->dec_reference();
1076 
1077   return 0;
1078 }
1079 
channel_is_stopping(const char * channel,enum_channel_thread_types thd_type)1080 bool channel_is_stopping(const char *channel,
1081                          enum_channel_thread_types thd_type) {
1082   bool is_stopping = false;
1083   DBUG_TRACE;
1084 
1085   channel_map.rdlock();
1086   Master_info *mi = channel_map.get_mi(channel);
1087   if (mi == nullptr) {
1088     channel_map.unlock();
1089     return false;
1090   }
1091 
1092   switch (thd_type) {
1093     case CHANNEL_NO_THD:
1094       break;
1095     case CHANNEL_RECEIVER_THREAD:
1096       is_stopping = likely(mi->atomic_is_stopping);
1097       break;
1098     case CHANNEL_APPLIER_THREAD:
1099       is_stopping = likely(mi->rli->atomic_is_stopping);
1100       break;
1101     default:
1102       DBUG_ASSERT(0);
1103   }
1104 
1105   channel_map.unlock();
1106 
1107   return is_stopping;
1108 }
1109 
is_partial_transaction_on_channel_relay_log(const char * channel)1110 bool is_partial_transaction_on_channel_relay_log(const char *channel) {
1111   DBUG_TRACE;
1112   channel_map.rdlock();
1113   Master_info *mi = channel_map.get_mi(channel);
1114   if (mi == nullptr) {
1115     channel_map.unlock();
1116     return false;
1117   }
1118   bool ret = mi->transaction_parser.is_inside_transaction();
1119   channel_map.unlock();
1120   return ret;
1121 }
1122 
is_any_slave_channel_running(int thread_mask)1123 bool is_any_slave_channel_running(int thread_mask) {
1124   DBUG_TRACE;
1125   Master_info *mi = nullptr;
1126   bool is_running;
1127 
1128   channel_map.rdlock();
1129 
1130   for (mi_map::iterator it = channel_map.begin(); it != channel_map.end();
1131        it++) {
1132     mi = it->second;
1133 
1134     if (mi) {
1135       if ((thread_mask & SLAVE_IO) != 0) {
1136         mysql_mutex_lock(&mi->run_lock);
1137         is_running = mi->slave_running;
1138         mysql_mutex_unlock(&mi->run_lock);
1139         if (is_running) {
1140           channel_map.unlock();
1141           return true;
1142         }
1143       }
1144 
1145       if ((thread_mask & SLAVE_SQL) != 0) {
1146         mysql_mutex_lock(&mi->rli->run_lock);
1147         is_running = mi->rli->slave_running;
1148         mysql_mutex_unlock(&mi->rli->run_lock);
1149         if (is_running) {
1150           channel_map.unlock();
1151           return true;
1152         }
1153       }
1154     }
1155   }
1156 
1157   channel_map.unlock();
1158   return false;
1159 }
1160 
1161 enum_slave_channel_status
has_any_slave_channel_open_temp_table_or_is_its_applier_running()1162 has_any_slave_channel_open_temp_table_or_is_its_applier_running() {
1163   DBUG_TRACE;
1164   Master_info *mi = nullptr;
1165   bool is_applier_running = false;
1166   bool has_open_temp_tables = false;
1167   mi_map::iterator it;
1168 
1169   channel_map.rdlock();
1170 
1171   mi_map::iterator it_end = channel_map.end();
1172   for (it = channel_map.begin(); it != channel_map.end(); ++it) {
1173     mi = it->second;
1174 
1175     if (Master_info::is_configured(mi)) {
1176       mysql_mutex_lock(&mi->rli->run_lock);
1177       is_applier_running = mi->rli->slave_running;
1178       if (mi->rli->atomic_channel_open_temp_tables > 0)
1179         has_open_temp_tables = true;
1180       if (is_applier_running || has_open_temp_tables) {
1181         /*
1182           Stop acquiring more run_locks and start to release the held
1183           run_locks once finding that a slave channel applier thread
1184           is running or a slave channel has open temporary table(s),
1185           and record the stop position.
1186         */
1187         it_end = ++it;
1188         break;
1189       }
1190     }
1191   }
1192 
1193   /*
1194     Release the held run_locks until the stop position recorded in above
1195     or the end of the channel_map.
1196   */
1197   for (it = channel_map.begin(); it != it_end; ++it) {
1198     mi = it->second;
1199     if (Master_info::is_configured(mi)) mysql_mutex_unlock(&mi->rli->run_lock);
1200   }
1201 
1202   channel_map.unlock();
1203 
1204   if (has_open_temp_tables)
1205     return SLAVE_CHANNEL_HAS_OPEN_TEMPORARY_TABLE;
1206   else if (is_applier_running)
1207     return SLAVE_CHANNEL_APPLIER_IS_RUNNING;
1208 
1209   return SLAVE_CHANNEL_NO_APPLIER_RUNNING_AND_NO_OPEN_TEMPORARY_TABLE;
1210 }
1211 
channel_delete_credentials(const char * channel_name)1212 int channel_delete_credentials(const char *channel_name) {
1213   DBUG_TRACE;
1214   return Rpl_channel_credentials::get_instance().delete_credentials(
1215       channel_name);
1216 }
1217