1 /* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql/rpl_channel_service_interface.h"
24
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/types.h>
28 #include <atomic>
29 #include <map>
30 #include <sstream>
31 #include <utility>
32
33 #include "mutex_lock.h" // MUTEX_LOCK
34 #include "my_compiler.h"
35 #include "my_dbug.h"
36 #include "my_inttypes.h"
37 #include "my_loglevel.h"
38 #include "my_sys.h"
39 #include "my_thread.h"
40 #include "mysql/components/services/log_builtins.h"
41 #include "mysql/components/services/psi_stage_bits.h"
42 #include "mysql/psi/mysql_cond.h"
43 #include "mysql/psi/mysql_mutex.h"
44 #include "mysql/psi/psi_base.h"
45 #include "mysql/service_mysql_alloc.h"
46 #include "mysql_com.h"
47 #include "mysqld_error.h"
48 #include "sql/auth/sql_security_ctx.h"
49 #include "sql/binlog.h"
50 #include "sql/current_thd.h"
51 #include "sql/log.h"
52 #include "sql/log_event.h"
53 #include "sql/mysqld.h" // opt_mts_slave_parallel_workers
54 #include "sql/mysqld_thd_manager.h" // Global_THD_manager
55 #include "sql/protocol_classic.h"
56 #include "sql/rpl_channel_credentials.h"
57 #include "sql/rpl_gtid.h"
58 #include "sql/rpl_info_factory.h"
59 #include "sql/rpl_info_handler.h"
60 #include "sql/rpl_mi.h"
61 #include "sql/rpl_msr.h" /* Multisource replication */
62 #include "sql/rpl_mts_submode.h"
63 #include "sql/rpl_rli.h"
64 #include "sql/rpl_rli_pdb.h"
65 #include "sql/rpl_slave.h"
66 #include "sql/rpl_trx_boundary_parser.h"
67 #include "sql/sql_class.h"
68 #include "sql/sql_lex.h"
69
70 /**
71 Auxiliary function to stop all the running channel threads according to the
72 given mask.
73
74 @note: The caller shall possess channel_map lock before calling this function,
75 and unlock after returning from this function.
76
77 @param mi The pointer to Master_info instance
78 @param threads_to_stop The types of threads to be stopped
79 @param timeout The expected time in which the thread should stop
80
81 @return the operation status
82 @retval 0 OK
83 @retval !=0 Error
84 */
85 int channel_stop(Master_info *mi, int threads_to_stop, long timeout);
86
initialize_channel_service_interface()87 int initialize_channel_service_interface() {
88 DBUG_TRACE;
89
90 // master info and relay log repositories must be TABLE
91 if (opt_mi_repository_id != INFO_REPOSITORY_TABLE ||
92 opt_rli_repository_id != INFO_REPOSITORY_TABLE) {
93 LogErr(ERROR_LEVEL, ER_RPL_CHANNELS_REQUIRE_TABLES_AS_INFO_REPOSITORIES);
94 return 1;
95 }
96
97 // server id must be different from 0
98 if (server_id == 0) {
99 LogErr(ERROR_LEVEL, ER_RPL_CHANNELS_REQUIRE_NON_ZERO_SERVER_ID);
100 return 1;
101 }
102
103 return 0;
104 }
105
set_mi_settings(Master_info * mi,Channel_creation_info * channel_info)106 static void set_mi_settings(Master_info *mi,
107 Channel_creation_info *channel_info) {
108 mysql_mutex_lock(mi->rli->relay_log.get_log_lock());
109 mysql_mutex_lock(&mi->data_lock);
110
111 mi->rli->set_thd_tx_priority(channel_info->thd_tx_priority);
112
113 mi->rli->replicate_same_server_id =
114 (channel_info->replicate_same_server_id == RPL_SERVICE_SERVER_DEFAULT)
115 ? replicate_same_server_id
116 : channel_info->replicate_same_server_id;
117
118 mi->rli->opt_slave_parallel_workers =
119 (channel_info->channel_mts_parallel_workers == RPL_SERVICE_SERVER_DEFAULT)
120 ? opt_mts_slave_parallel_workers
121 : channel_info->channel_mts_parallel_workers;
122
123 if (channel_info->channel_mts_parallel_type == RPL_SERVICE_SERVER_DEFAULT) {
124 if (mts_parallel_option == MTS_PARALLEL_TYPE_DB_NAME)
125 mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_DB_NAME;
126 else
127 mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
128 } else {
129 if (channel_info->channel_mts_parallel_type ==
130 CHANNEL_MTS_PARALLEL_TYPE_DB_NAME)
131 mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_DB_NAME;
132 else
133 mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
134 }
135
136 mi->rli->checkpoint_group =
137 (channel_info->channel_mts_checkpoint_group == RPL_SERVICE_SERVER_DEFAULT)
138 ? opt_mts_checkpoint_group
139 : channel_info->channel_mts_checkpoint_group;
140
141 Format_description_log_event *fde = new Format_description_log_event();
142 /*
143 Group replication applier channel shall not use checksum on its relay log
144 files.
145 */
146 if (channel_map.is_group_replication_channel_name(mi->get_channel(), true)) {
147 fde->footer()->checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF;
148 /*
149 When the receiver thread connects to the master, it gets its current
150 binlog checksum algorithm, but as GR applier channel has no receiver
151 thread (and also does not connect to a master), we will set the variable
152 here to BINLOG_CHECKSUM_ALG_OFF as events queued after certification have
153 no checksum information.
154 */
155 mi->checksum_alg_before_fd = binary_log::BINLOG_CHECKSUM_ALG_OFF;
156 }
157 mi->set_mi_description_event(fde);
158
159 mysql_mutex_unlock(&mi->data_lock);
160 mysql_mutex_unlock(mi->rli->relay_log.get_log_lock());
161 }
162
init_thread_context()163 static bool init_thread_context() { return my_thread_init(); }
164
clean_thread_context()165 static void clean_thread_context() { my_thread_end(); }
166
create_surrogate_thread()167 static THD *create_surrogate_thread() {
168 THD *thd = nullptr;
169 thd = new THD;
170 thd->thread_stack = (char *)&thd;
171 thd->store_globals();
172 thd->security_context()->skip_grants();
173
174 return (thd);
175 }
176
delete_surrogate_thread(THD * thd)177 static void delete_surrogate_thread(THD *thd) {
178 thd->release_resources();
179 delete thd;
180 current_thd = nullptr;
181 }
182
initialize_channel_creation_info(Channel_creation_info * channel_info)183 void initialize_channel_creation_info(Channel_creation_info *channel_info) {
184 channel_info->type = SLAVE_REPLICATION_CHANNEL;
185 channel_info->hostname = nullptr;
186 channel_info->port = 0;
187 channel_info->user = nullptr;
188 channel_info->password = nullptr;
189 channel_info->ssl_info = nullptr;
190 channel_info->auto_position = RPL_SERVICE_SERVER_DEFAULT;
191 channel_info->channel_mts_parallel_type = RPL_SERVICE_SERVER_DEFAULT;
192 channel_info->channel_mts_parallel_workers = RPL_SERVICE_SERVER_DEFAULT;
193 channel_info->channel_mts_checkpoint_group = RPL_SERVICE_SERVER_DEFAULT;
194 channel_info->replicate_same_server_id = RPL_SERVICE_SERVER_DEFAULT;
195 channel_info->thd_tx_priority = 0;
196 channel_info->sql_delay = RPL_SERVICE_SERVER_DEFAULT;
197 channel_info->preserve_relay_logs = false;
198 channel_info->retry_count = 0;
199 channel_info->connect_retry = 0;
200 channel_info->public_key_path = nullptr;
201 channel_info->get_public_key = 0;
202 channel_info->compression_algorithm = nullptr;
203 channel_info->zstd_compression_level = 0;
204 }
205
initialize_channel_ssl_info(Channel_ssl_info * channel_ssl_info)206 void initialize_channel_ssl_info(Channel_ssl_info *channel_ssl_info) {
207 channel_ssl_info->use_ssl = 0;
208 channel_ssl_info->ssl_ca_file_name = nullptr;
209 channel_ssl_info->ssl_ca_directory = nullptr;
210 channel_ssl_info->ssl_cert_file_name = nullptr;
211 channel_ssl_info->ssl_crl_file_name = nullptr;
212 channel_ssl_info->ssl_crl_directory = nullptr;
213 channel_ssl_info->ssl_key = nullptr;
214 channel_ssl_info->ssl_cipher = nullptr;
215 channel_ssl_info->tls_version = nullptr;
216 channel_ssl_info->ssl_verify_server_cert = 0;
217 channel_ssl_info->tls_ciphersuites = nullptr;
218 }
219
initialize_channel_connection_info(Channel_connection_info * channel_info)220 void initialize_channel_connection_info(Channel_connection_info *channel_info) {
221 channel_info->until_condition = CHANNEL_NO_UNTIL_CONDITION;
222 channel_info->gtid = nullptr;
223 channel_info->view_id = nullptr;
224 }
225
set_mi_ssl_options(LEX_MASTER_INFO * lex_mi,Channel_ssl_info * channel_ssl_info)226 static void set_mi_ssl_options(LEX_MASTER_INFO *lex_mi,
227 Channel_ssl_info *channel_ssl_info) {
228 lex_mi->ssl = (channel_ssl_info->use_ssl) ? LEX_MASTER_INFO::LEX_MI_ENABLE
229 : LEX_MASTER_INFO::LEX_MI_DISABLE;
230
231 if (channel_ssl_info->ssl_ca_file_name != nullptr) {
232 lex_mi->ssl_ca = channel_ssl_info->ssl_ca_file_name;
233 }
234
235 if (channel_ssl_info->ssl_ca_directory != nullptr) {
236 lex_mi->ssl_capath = channel_ssl_info->ssl_ca_directory;
237 }
238
239 if (channel_ssl_info->tls_version != nullptr) {
240 lex_mi->tls_version = channel_ssl_info->tls_version;
241 }
242
243 if (channel_ssl_info->ssl_cert_file_name != nullptr) {
244 lex_mi->ssl_cert = channel_ssl_info->ssl_cert_file_name;
245 }
246
247 if (channel_ssl_info->ssl_crl_file_name != nullptr) {
248 lex_mi->ssl_crl = channel_ssl_info->ssl_crl_file_name;
249 }
250
251 if (channel_ssl_info->ssl_crl_directory != nullptr) {
252 lex_mi->ssl_crlpath = channel_ssl_info->ssl_crl_directory;
253 }
254
255 if (channel_ssl_info->ssl_key != nullptr) {
256 lex_mi->ssl_key = channel_ssl_info->ssl_key;
257 }
258
259 if (channel_ssl_info->ssl_cipher != nullptr) {
260 lex_mi->ssl_cipher = channel_ssl_info->ssl_cipher;
261 }
262
263 if (channel_ssl_info->tls_ciphersuites != nullptr) {
264 lex_mi->tls_ciphersuites = LEX_MASTER_INFO::SPECIFIED_STRING;
265 lex_mi->tls_ciphersuites_string = channel_ssl_info->tls_ciphersuites;
266 } else {
267 lex_mi->tls_ciphersuites = LEX_MASTER_INFO::SPECIFIED_NULL;
268 }
269
270 lex_mi->ssl_verify_server_cert = (channel_ssl_info->ssl_verify_server_cert)
271 ? LEX_MASTER_INFO::LEX_MI_ENABLE
272 : LEX_MASTER_INFO::LEX_MI_DISABLE;
273 }
274
channel_create(const char * channel,Channel_creation_info * channel_info)275 int channel_create(const char *channel, Channel_creation_info *channel_info) {
276 DBUG_TRACE;
277
278 Master_info *mi = nullptr;
279 int error = 0;
280 LEX_MASTER_INFO *lex_mi = nullptr;
281
282 bool thd_created = false;
283 THD *thd = current_thd;
284
285 // Don't create default channels
286 if (!strcmp(channel_map.get_default_channel(), channel))
287 return RPL_CHANNEL_SERVICE_DEFAULT_CHANNEL_CREATION_ERROR;
288
289 /* Service channels are not supposed to use sql_slave_skip_counter */
290 mysql_mutex_lock(&LOCK_sql_slave_skip_counter);
291 if (sql_slave_skip_counter > 0)
292 error = RPL_CHANNEL_SERVICE_SLAVE_SKIP_COUNTER_ACTIVE;
293 mysql_mutex_unlock(&LOCK_sql_slave_skip_counter);
294 if (error) return error;
295
296 channel_map.wrlock();
297
298 /* Get the Master_info of the channel */
299 mi = channel_map.get_mi(channel);
300
301 /* create a new channel if doesn't exist */
302 if (!mi) {
303 if ((error = add_new_channel(&mi, channel))) goto err;
304 }
305
306 lex_mi = new LEX_MASTER_INFO();
307 lex_mi->channel = channel;
308 lex_mi->host = channel_info->hostname;
309 /*
310 'group_replication_recovery' channel (*after recovery is done*)
311 or 'group_replication_applier' channel wants to set the port number
312 to '0' as there is no actual network usage on these channels.
313 */
314 lex_mi->port_opt = LEX_MASTER_INFO::LEX_MI_ENABLE;
315 lex_mi->port = channel_info->port;
316 lex_mi->user = channel_info->user;
317 lex_mi->password = channel_info->password;
318 lex_mi->sql_delay = channel_info->sql_delay;
319 lex_mi->connect_retry = channel_info->connect_retry;
320 if (channel_info->retry_count) {
321 lex_mi->retry_count_opt = LEX_MASTER_INFO::LEX_MI_ENABLE;
322 lex_mi->retry_count = channel_info->retry_count;
323 }
324
325 if (channel_info->auto_position) {
326 lex_mi->auto_position = LEX_MASTER_INFO::LEX_MI_ENABLE;
327 if ((mi && mi->is_auto_position()) ||
328 channel_info->auto_position == RPL_SERVICE_SERVER_DEFAULT) {
329 // So change master allows new configurations with a running SQL thread
330 lex_mi->auto_position = LEX_MASTER_INFO::LEX_MI_UNCHANGED;
331 }
332 }
333
334 if (channel_info->public_key_path) {
335 lex_mi->public_key_path = channel_info->public_key_path;
336 }
337
338 if (channel_info->get_public_key) {
339 lex_mi->get_public_key = LEX_MASTER_INFO::LEX_MI_ENABLE;
340 if (mi && mi->get_public_key) {
341 // So change master allows new configurations with a running SQL thread
342 lex_mi->get_public_key = LEX_MASTER_INFO::LEX_MI_UNCHANGED;
343 }
344 } else {
345 lex_mi->get_public_key = LEX_MASTER_INFO::LEX_MI_DISABLE;
346 if (mi && !mi->get_public_key) {
347 // So change master allows new configurations with a running SQL thread
348 lex_mi->get_public_key = LEX_MASTER_INFO::LEX_MI_UNCHANGED;
349 }
350 }
351
352 if (channel_info->compression_algorithm != nullptr) {
353 lex_mi->compression_algorithm = channel_info->compression_algorithm;
354 }
355 if (channel_info->zstd_compression_level) {
356 lex_mi->zstd_compression_level = channel_info->zstd_compression_level;
357 }
358
359 if (channel_info->ssl_info != nullptr) {
360 set_mi_ssl_options(lex_mi, channel_info->ssl_info);
361 }
362
363 if (mi) {
364 if (!thd) {
365 thd_created = true;
366 thd = create_surrogate_thread();
367 }
368
369 if ((error = change_master(thd, mi, lex_mi,
370 channel_info->preserve_relay_logs))) {
371 goto err;
372 }
373 }
374
375 set_mi_settings(mi, channel_info);
376
377 if (channel_map.is_group_replication_channel_name(mi->get_channel())) {
378 thd->variables.max_allowed_packet = slave_max_allowed_packet;
379 thd->get_protocol_classic()->set_max_packet_size(slave_max_allowed_packet +
380 MAX_LOG_EVENT_HEADER);
381 }
382
383 err:
384 channel_map.unlock();
385
386 if (thd_created) {
387 delete_surrogate_thread(thd);
388 }
389
390 delete lex_mi;
391
392 return error;
393 }
394
channel_start(const char * channel,Channel_connection_info * connection_info,int threads_to_start,int wait_for_connection)395 int channel_start(const char *channel, Channel_connection_info *connection_info,
396 int threads_to_start, int wait_for_connection) {
397 DBUG_TRACE;
398 int error = 0;
399 int thread_mask = 0;
400 LEX_MASTER_INFO lex_mi;
401 ulong thread_start_id = 0;
402 bool thd_created = false;
403 THD *thd = current_thd;
404 String_set user, pass, auth;
405
406 /* Service channels are not supposed to use sql_slave_skip_counter */
407 mysql_mutex_lock(&LOCK_sql_slave_skip_counter);
408 if (sql_slave_skip_counter > 0)
409 error = RPL_CHANNEL_SERVICE_SLAVE_SKIP_COUNTER_ACTIVE;
410 mysql_mutex_unlock(&LOCK_sql_slave_skip_counter);
411 if (error) return error;
412
413 channel_map.wrlock();
414
415 Master_info *mi = channel_map.get_mi(channel);
416
417 if (mi == nullptr) {
418 error = RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
419 goto err;
420 }
421
422 if (threads_to_start & CHANNEL_APPLIER_THREAD) {
423 thread_mask |= SLAVE_SQL;
424 }
425 if (threads_to_start & CHANNEL_RECEIVER_THREAD) {
426 thread_mask |= SLAVE_IO;
427 }
428
429 // Nothing to be done here
430 if (!thread_mask) goto err;
431
432 LEX_SLAVE_CONNECTION lex_connection;
433 lex_connection.reset();
434
435 if (!Rpl_channel_credentials::get_instance().get_credentials(channel, user,
436 pass, auth)) {
437 lex_connection.user =
438 (user.first ? const_cast<char *>(user.second.c_str()) : nullptr);
439 lex_connection.password =
440 (pass.first ? const_cast<char *>(pass.second.c_str()) : nullptr);
441 lex_connection.plugin_auth =
442 (auth.first ? const_cast<char *>(auth.second.c_str()) : nullptr);
443 }
444
445 if (connection_info->until_condition != CHANNEL_NO_UNTIL_CONDITION) {
446 switch (connection_info->until_condition) {
447 case CHANNEL_UNTIL_APPLIER_AFTER_GTIDS:
448 lex_mi.gtid_until_condition = LEX_MASTER_INFO::UNTIL_SQL_AFTER_GTIDS;
449 lex_mi.gtid = connection_info->gtid;
450 break;
451 case CHANNEL_UNTIL_APPLIER_BEFORE_GTIDS:
452 lex_mi.gtid_until_condition = LEX_MASTER_INFO::UNTIL_SQL_BEFORE_GTIDS;
453 lex_mi.gtid = connection_info->gtid;
454 break;
455 case CHANNEL_UNTIL_APPLIER_AFTER_GAPS:
456 lex_mi.until_after_gaps = true;
457 break;
458 case CHANNEL_UNTIL_VIEW_ID:
459 DBUG_ASSERT((thread_mask & SLAVE_SQL) && connection_info->view_id);
460 lex_mi.view_id = connection_info->view_id;
461 break;
462 default:
463 DBUG_ASSERT(0);
464 }
465 }
466
467 if (wait_for_connection && (thread_mask & SLAVE_IO))
468 thread_start_id = mi->slave_run_id;
469
470 if (!thd) {
471 thd_created = true;
472 thd = create_surrogate_thread();
473 }
474
475 error = start_slave(thd, &lex_connection, &lex_mi, thread_mask, mi, false);
476
477 if (wait_for_connection && (thread_mask & SLAVE_IO) && !error) {
478 mysql_mutex_lock(&mi->run_lock);
479 /*
480 If the ids are still equal this means the start thread method did not
481 wait for the thread to start
482 */
483 while (thread_start_id == mi->slave_run_id) {
484 mysql_cond_wait(&mi->start_cond, &mi->run_lock);
485 }
486 mysql_mutex_unlock(&mi->run_lock);
487
488 while (mi->slave_running != MYSQL_SLAVE_RUN_CONNECT) {
489 // If there is such a state change then there was an error on connection
490 if (mi->slave_running == MYSQL_SLAVE_NOT_RUN) {
491 error = RPL_CHANNEL_SERVICE_RECEIVER_CONNECTION_ERROR;
492 break;
493 }
494 my_sleep(100);
495 }
496 }
497
498 err:
499 channel_map.unlock();
500
501 if (thd_created) {
502 delete_surrogate_thread(thd);
503 }
504
505 return error;
506 }
507
channel_stop(Master_info * mi,int threads_to_stop,long timeout)508 int channel_stop(Master_info *mi, int threads_to_stop, long timeout) {
509 channel_map.assert_some_lock();
510
511 if (mi == nullptr) {
512 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
513 }
514
515 int thread_mask = 0;
516 int server_thd_mask = 0;
517 int error = 0;
518 bool thd_init = false;
519
520 mi->channel_wrlock();
521 lock_slave_threads(mi);
522
523 init_thread_mask(&server_thd_mask, mi, false /* not inverse*/);
524
525 if ((threads_to_stop & CHANNEL_APPLIER_THREAD) &&
526 (server_thd_mask & SLAVE_SQL)) {
527 thread_mask |= SLAVE_SQL;
528 }
529 if ((threads_to_stop & CHANNEL_RECEIVER_THREAD) &&
530 (server_thd_mask & SLAVE_IO)) {
531 thread_mask |= SLAVE_IO;
532 }
533
534 if (thread_mask == 0) {
535 goto end;
536 }
537
538 thd_init = init_thread_context();
539
540 if (current_thd) current_thd->set_skip_readonly_check();
541
542 error = terminate_slave_threads(mi, thread_mask, timeout, false);
543
544 if (current_thd) current_thd->reset_skip_readonly_check();
545
546 end:
547 unlock_slave_threads(mi);
548 mi->channel_unlock();
549
550 if (thd_init) {
551 clean_thread_context();
552 }
553
554 return error;
555 }
556
channel_stop(const char * channel,int threads_to_stop,long timeout)557 int channel_stop(const char *channel, int threads_to_stop, long timeout) {
558 DBUG_TRACE;
559
560 channel_map.rdlock();
561
562 Master_info *mi = channel_map.get_mi(channel);
563
564 int error = channel_stop(mi, threads_to_stop, timeout);
565
566 channel_map.unlock();
567
568 return error;
569 }
570
channel_stop_all(int threads_to_stop,long timeout,std::string * error_message)571 int channel_stop_all(int threads_to_stop, long timeout,
572 std::string *error_message) {
573 Master_info *mi = nullptr;
574
575 /* Error related varaiables */
576 int error = 0;
577 std::stringstream err_msg_ss;
578 err_msg_ss << "Error stopping channel(s): ";
579
580 channel_map.rdlock();
581
582 for (mi_map::iterator it = channel_map.begin(); it != channel_map.end();
583 it++) {
584 mi = it->second;
585
586 if (mi) {
587 int channel_error = channel_stop(mi, threads_to_stop, timeout);
588
589 DBUG_EXECUTE_IF("group_replication_stop_all_channels_failure",
590 { channel_error = 1; });
591
592 if (channel_error &&
593 channel_error != RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR) {
594 error = channel_error;
595
596 mi->report(ERROR_LEVEL, error,
597 "Error stopping channel: %s. Got error: %d",
598 mi->get_channel(), error);
599
600 err_msg_ss << " '" << mi->get_channel() << "' [error number: " << error
601 << "],";
602 }
603 }
604 }
605
606 if (error) {
607 *error_message = err_msg_ss.str();
608 (*error_message)[error_message->length() - 1] = '.';
609 error_message->append(
610 " Please check the error log for additional details.");
611 }
612
613 channel_map.unlock();
614 return error;
615 }
616
617 class Kill_binlog_dump : public Do_THD_Impl {
618 public:
Kill_binlog_dump()619 Kill_binlog_dump() {}
620
operator ()(THD * thd_to_kill)621 virtual void operator()(THD *thd_to_kill) {
622 if (thd_to_kill->get_command() == COM_BINLOG_DUMP ||
623 thd_to_kill->get_command() == COM_BINLOG_DUMP_GTID) {
624 DBUG_ASSERT(thd_to_kill != current_thd);
625 MUTEX_LOCK(thd_data_lock, &thd_to_kill->LOCK_thd_data);
626 thd_to_kill->duplicate_slave_id = true;
627 thd_to_kill->awake(THD::KILL_CONNECTION);
628 }
629 }
630 };
631
binlog_dump_thread_kill()632 int binlog_dump_thread_kill() {
633 DBUG_TRACE;
634 Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
635 Kill_binlog_dump kill_binlog_dump;
636 thd_manager->do_for_all_thd(&kill_binlog_dump);
637 return 0;
638 }
639
channel_purge_queue(const char * channel,bool reset_all)640 int channel_purge_queue(const char *channel, bool reset_all) {
641 DBUG_TRACE;
642
643 channel_map.wrlock();
644
645 Master_info *mi = channel_map.get_mi(channel);
646
647 if (mi == nullptr) {
648 channel_map.unlock();
649 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
650 }
651
652 bool thd_init = init_thread_context();
653
654 int error = reset_slave(current_thd, mi, reset_all);
655
656 channel_map.unlock();
657
658 if (thd_init) {
659 clean_thread_context();
660 }
661
662 return error;
663 }
664
channel_is_active(const char * channel,enum_channel_thread_types thd_type)665 bool channel_is_active(const char *channel,
666 enum_channel_thread_types thd_type) {
667 int thread_mask = 0;
668 DBUG_TRACE;
669
670 channel_map.rdlock();
671
672 Master_info *mi = channel_map.get_mi(channel);
673
674 if (mi == nullptr) {
675 channel_map.unlock();
676 return false;
677 }
678
679 init_thread_mask(&thread_mask, mi, false /* not inverse*/);
680
681 channel_map.unlock();
682
683 switch (thd_type) {
684 case CHANNEL_NO_THD:
685 return true; // return true as the channel exists
686 case CHANNEL_RECEIVER_THREAD:
687 return thread_mask & SLAVE_IO;
688 case CHANNEL_APPLIER_THREAD:
689 return thread_mask & SLAVE_SQL;
690 default:
691 DBUG_ASSERT(0);
692 }
693 return false;
694 }
695
channel_get_thread_id(const char * channel,enum_channel_thread_types thd_type,unsigned long ** thread_id)696 int channel_get_thread_id(const char *channel,
697 enum_channel_thread_types thd_type,
698 unsigned long **thread_id) {
699 DBUG_TRACE;
700
701 int number_threads = -1;
702
703 channel_map.rdlock();
704
705 Master_info *mi = channel_map.get_mi(channel);
706
707 if (mi == nullptr) {
708 channel_map.unlock();
709 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
710 }
711
712 switch (thd_type) {
713 case CHANNEL_RECEIVER_THREAD:
714 mysql_mutex_lock(&mi->info_thd_lock);
715 if (mi->info_thd != nullptr) {
716 *thread_id = (unsigned long *)my_malloc(
717 PSI_NOT_INSTRUMENTED, sizeof(unsigned long), MYF(MY_WME));
718 **thread_id = mi->info_thd->thread_id();
719 number_threads = 1;
720 }
721 mysql_mutex_unlock(&mi->info_thd_lock);
722 break;
723 case CHANNEL_APPLIER_THREAD:
724 if (mi->rli != nullptr) {
725 mysql_mutex_lock(&mi->rli->run_lock);
726
727 if (mi->rli->slave_parallel_workers > 0) {
728 // Parallel applier.
729 size_t num_workers = mi->rli->get_worker_count();
730 number_threads = 1 + num_workers;
731 *thread_id = (unsigned long *)my_malloc(
732 PSI_NOT_INSTRUMENTED, number_threads * sizeof(unsigned long),
733 MYF(MY_WME));
734 unsigned long *thread_id_pointer = *thread_id;
735
736 // Set default values on thread_id array.
737 for (int i = 0; i < number_threads; i++, thread_id_pointer++)
738 *thread_id_pointer = -1;
739 thread_id_pointer = *thread_id;
740
741 // Coordinator thread id.
742 if (mi->rli->info_thd != nullptr) {
743 mysql_mutex_lock(&mi->rli->info_thd_lock);
744 *thread_id_pointer = mi->rli->info_thd->thread_id();
745 mysql_mutex_unlock(&mi->rli->info_thd_lock);
746 thread_id_pointer++;
747 }
748
749 // Workers thread id.
750 if (mi->rli->workers_array_initialized) {
751 for (size_t i = 0; i < num_workers; i++, thread_id_pointer++) {
752 Slave_worker *worker = mi->rli->get_worker(i);
753 if (worker != nullptr) {
754 mysql_mutex_lock(&worker->jobs_lock);
755 if (worker->info_thd != nullptr &&
756 worker->running_status != Slave_worker::NOT_RUNNING) {
757 mysql_mutex_lock(&worker->info_thd_lock);
758 *thread_id_pointer = worker->info_thd->thread_id();
759 mysql_mutex_unlock(&worker->info_thd_lock);
760 }
761 mysql_mutex_unlock(&worker->jobs_lock);
762 }
763 }
764 }
765 } else {
766 // Sequential applier.
767 if (mi->rli->info_thd != nullptr) {
768 *thread_id = (unsigned long *)my_malloc(
769 PSI_NOT_INSTRUMENTED, sizeof(unsigned long), MYF(MY_WME));
770 mysql_mutex_lock(&mi->rli->info_thd_lock);
771 **thread_id = mi->rli->info_thd->thread_id();
772 mysql_mutex_unlock(&mi->rli->info_thd_lock);
773 number_threads = 1;
774 }
775 }
776 mysql_mutex_unlock(&mi->rli->run_lock);
777 }
778 break;
779 default:
780 break;
781 }
782
783 channel_map.unlock();
784
785 return number_threads;
786 }
787
channel_get_last_delivered_gno(const char * channel,int sidno)788 long long channel_get_last_delivered_gno(const char *channel, int sidno) {
789 DBUG_TRACE;
790
791 channel_map.rdlock();
792
793 Master_info *mi = channel_map.get_mi(channel);
794
795 if (mi == nullptr) {
796 channel_map.unlock();
797 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
798 }
799
800 rpl_gno last_gno = 0;
801
802 Checkable_rwlock *sid_lock = mi->rli->get_sid_lock();
803 sid_lock->rdlock();
804 last_gno = mi->rli->get_gtid_set()->get_last_gno(sidno);
805 sid_lock->unlock();
806
807 #if !defined(DBUG_OFF)
808 const Gtid_set *retrieved_gtid_set = mi->rli->get_gtid_set();
809 char *retrieved_gtid_set_string = nullptr;
810 sid_lock->wrlock();
811 retrieved_gtid_set->to_string(&retrieved_gtid_set_string);
812 sid_lock->unlock();
813 DBUG_PRINT("info", ("get_last_delivered_gno retrieved_set_string: %s",
814 retrieved_gtid_set_string));
815 my_free(retrieved_gtid_set_string);
816 #endif
817
818 channel_map.unlock();
819
820 return last_gno;
821 }
822
channel_add_executed_gtids_to_received_gtids(const char * channel)823 int channel_add_executed_gtids_to_received_gtids(const char *channel) {
824 DBUG_TRACE;
825
826 channel_map.rdlock();
827 Master_info *mi = channel_map.get_mi(channel);
828 if (mi == nullptr) {
829 channel_map.unlock();
830 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
831 }
832
833 global_sid_lock->wrlock();
834
835 enum_return_status return_status =
836 mi->rli->add_gtid_set(gtid_state->get_executed_gtids());
837
838 global_sid_lock->unlock();
839 channel_map.unlock();
840
841 return return_status != RETURN_STATUS_OK;
842 }
843
channel_queue_packet(const char * channel,const char * buf,unsigned long event_len)844 int channel_queue_packet(const char *channel, const char *buf,
845 unsigned long event_len) {
846 int result;
847 DBUG_TRACE;
848
849 channel_map.rdlock();
850
851 Master_info *mi = channel_map.get_mi(channel);
852
853 if (mi == nullptr) {
854 channel_map.unlock();
855 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
856 }
857 channel_map.unlock();
858
859 result = queue_event(mi, buf, event_len, false /*flush_master_info*/);
860
861 return result;
862 }
863
channel_wait_until_apply_queue_applied(const char * channel,double timeout)864 int channel_wait_until_apply_queue_applied(const char *channel,
865 double timeout) {
866 DBUG_TRACE;
867
868 channel_map.rdlock();
869
870 Master_info *mi = channel_map.get_mi(channel);
871
872 if (mi == nullptr) {
873 channel_map.unlock();
874 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
875 }
876
877 mi->inc_reference();
878 channel_map.unlock();
879
880 /*
881 The retrieved_gtid_set (rli->get_gtid_set) has its own sid_map/sid_lock
882 and do not use global_sid_map/global_sid_lock. Instead of blocking both
883 sid locks on each wait iteration at rli->wait_for_gtid_set(Gtid_set), it
884 would be better to use rli->wait_for_gtid_set(char *) that will create a
885 new Gtid_set based on global_sid_map.
886 */
887 char *retrieved_gtid_set_buf;
888 mi->rli->get_sid_lock()->wrlock();
889 mi->rli->get_gtid_set()->to_string(&retrieved_gtid_set_buf);
890 mi->rli->get_sid_lock()->unlock();
891
892 int error = mi->rli->wait_for_gtid_set(current_thd, retrieved_gtid_set_buf,
893 timeout, false);
894 my_free(retrieved_gtid_set_buf);
895 mi->dec_reference();
896
897 if (error == -1) return REPLICATION_THREAD_WAIT_TIMEOUT_ERROR;
898 if (error == -2) return REPLICATION_THREAD_WAIT_NO_INFO_ERROR;
899
900 return error;
901 }
902
channel_wait_until_transactions_applied(const char * channel,const char * gtid_set,double timeout,bool update_THD_status)903 int channel_wait_until_transactions_applied(const char *channel,
904 const char *gtid_set,
905 double timeout,
906 bool update_THD_status) {
907 DBUG_TRACE;
908
909 channel_map.rdlock();
910
911 Master_info *mi = channel_map.get_mi(channel);
912
913 if (mi == nullptr) {
914 channel_map.unlock(); /* purecov: inspected */
915 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR; /* purecov:
916 inspected */
917 }
918
919 mi->inc_reference();
920 channel_map.unlock();
921
922 int error = mi->rli->wait_for_gtid_set(current_thd, gtid_set, timeout,
923 update_THD_status);
924 mi->dec_reference();
925
926 if (error == -1) return REPLICATION_THREAD_WAIT_TIMEOUT_ERROR;
927 if (error == -2) return REPLICATION_THREAD_WAIT_NO_INFO_ERROR;
928
929 return error;
930 }
931
channel_is_applier_waiting(const char * channel)932 int channel_is_applier_waiting(const char *channel) {
933 DBUG_TRACE;
934 int result = RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
935
936 channel_map.rdlock();
937
938 Master_info *mi = channel_map.get_mi(channel);
939
940 if (mi == nullptr) {
941 channel_map.unlock();
942 return result;
943 }
944
945 unsigned long *thread_ids = nullptr;
946 int number_appliers =
947 channel_get_thread_id(channel, CHANNEL_APPLIER_THREAD, &thread_ids);
948
949 if (number_appliers <= 0) {
950 goto end;
951 }
952
953 if (number_appliers == 1) {
954 result = channel_is_applier_thread_waiting(*thread_ids);
955 } else if (number_appliers > 1) {
956 int waiting = 0;
957
958 // Check if coordinator is waiting.
959 waiting += channel_is_applier_thread_waiting(thread_ids[0]);
960
961 // Check if workers are waiting.
962 for (int i = 1; i < number_appliers; i++)
963 waiting += channel_is_applier_thread_waiting(thread_ids[i], true);
964
965 // Check if all are waiting.
966 if (waiting == number_appliers)
967 result = 1;
968 else
969 result = 0;
970 }
971
972 end:
973 channel_map.unlock();
974 my_free(thread_ids);
975
976 return result;
977 }
978
channel_is_applier_thread_waiting(unsigned long thread_id,bool worker)979 int channel_is_applier_thread_waiting(unsigned long thread_id, bool worker) {
980 DBUG_TRACE;
981 int result = -1;
982
983 Find_thd_with_id find_thd_with_id(thread_id);
984 THD *thd = Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
985 if (thd) {
986 result = 0;
987
988 const char *proc_info = thd->get_proc_info();
989 if (proc_info) {
990 const char *stage_name = stage_slave_has_read_all_relay_log.m_name;
991 if (worker)
992 stage_name = stage_slave_waiting_event_from_coordinator.m_name;
993
994 if (!strcmp(proc_info, stage_name)) result = 1;
995 }
996 mysql_mutex_unlock(&thd->LOCK_thd_data);
997 }
998
999 return result;
1000 }
1001
channel_flush(const char * channel)1002 int channel_flush(const char *channel) {
1003 DBUG_TRACE;
1004
1005 channel_map.rdlock();
1006
1007 Master_info *mi = channel_map.get_mi(channel);
1008
1009 if (mi == nullptr) {
1010 channel_map.unlock();
1011 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
1012 }
1013
1014 bool error = (flush_relay_logs(mi, mi->info_thd) == 1);
1015
1016 channel_map.unlock();
1017
1018 return error ? 1 : 0;
1019 }
1020
channel_get_retrieved_gtid_set(const char * channel,char ** retrieved_set)1021 int channel_get_retrieved_gtid_set(const char *channel, char **retrieved_set) {
1022 DBUG_TRACE;
1023
1024 channel_map.rdlock();
1025
1026 Master_info *mi = channel_map.get_mi(channel);
1027
1028 if (mi == nullptr) {
1029 channel_map.unlock();
1030 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
1031 }
1032
1033 mi->inc_reference();
1034 channel_map.unlock();
1035
1036 int error = 0;
1037 const Gtid_set *receiver_gtid_set = mi->rli->get_gtid_set();
1038 if (receiver_gtid_set->to_string(retrieved_set, true /*need_lock*/) == -1)
1039 error = ER_OUTOFMEMORY;
1040
1041 mi->dec_reference();
1042
1043 return error;
1044 }
1045
channel_get_credentials(const char * channel,std::string & username,std::string & password)1046 int channel_get_credentials(const char *channel, std::string &username,
1047 std::string &password) {
1048 DBUG_TRACE;
1049 String_set user_store, pass_store, auth_store;
1050
1051 if (!Rpl_channel_credentials::get_instance().get_credentials(
1052 channel, user_store, pass_store, auth_store)) {
1053 if (user_store.first) username = user_store.second;
1054 if (pass_store.first) password = pass_store.second;
1055 return 0;
1056 }
1057
1058 channel_map.rdlock();
1059 Master_info *mi = channel_map.get_mi(channel);
1060
1061 if (mi == nullptr) {
1062 channel_map.unlock();
1063 return RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
1064 }
1065
1066 mi->inc_reference();
1067 channel_map.unlock();
1068
1069 char pass[MAX_PASSWORD_LENGTH + 1];
1070 size_t pass_size;
1071 mi->get_password(pass, &pass_size);
1072 username.assign(mi->get_user());
1073 password.assign(pass, pass_size);
1074
1075 mi->dec_reference();
1076
1077 return 0;
1078 }
1079
channel_is_stopping(const char * channel,enum_channel_thread_types thd_type)1080 bool channel_is_stopping(const char *channel,
1081 enum_channel_thread_types thd_type) {
1082 bool is_stopping = false;
1083 DBUG_TRACE;
1084
1085 channel_map.rdlock();
1086 Master_info *mi = channel_map.get_mi(channel);
1087 if (mi == nullptr) {
1088 channel_map.unlock();
1089 return false;
1090 }
1091
1092 switch (thd_type) {
1093 case CHANNEL_NO_THD:
1094 break;
1095 case CHANNEL_RECEIVER_THREAD:
1096 is_stopping = likely(mi->atomic_is_stopping);
1097 break;
1098 case CHANNEL_APPLIER_THREAD:
1099 is_stopping = likely(mi->rli->atomic_is_stopping);
1100 break;
1101 default:
1102 DBUG_ASSERT(0);
1103 }
1104
1105 channel_map.unlock();
1106
1107 return is_stopping;
1108 }
1109
is_partial_transaction_on_channel_relay_log(const char * channel)1110 bool is_partial_transaction_on_channel_relay_log(const char *channel) {
1111 DBUG_TRACE;
1112 channel_map.rdlock();
1113 Master_info *mi = channel_map.get_mi(channel);
1114 if (mi == nullptr) {
1115 channel_map.unlock();
1116 return false;
1117 }
1118 bool ret = mi->transaction_parser.is_inside_transaction();
1119 channel_map.unlock();
1120 return ret;
1121 }
1122
is_any_slave_channel_running(int thread_mask)1123 bool is_any_slave_channel_running(int thread_mask) {
1124 DBUG_TRACE;
1125 Master_info *mi = nullptr;
1126 bool is_running;
1127
1128 channel_map.rdlock();
1129
1130 for (mi_map::iterator it = channel_map.begin(); it != channel_map.end();
1131 it++) {
1132 mi = it->second;
1133
1134 if (mi) {
1135 if ((thread_mask & SLAVE_IO) != 0) {
1136 mysql_mutex_lock(&mi->run_lock);
1137 is_running = mi->slave_running;
1138 mysql_mutex_unlock(&mi->run_lock);
1139 if (is_running) {
1140 channel_map.unlock();
1141 return true;
1142 }
1143 }
1144
1145 if ((thread_mask & SLAVE_SQL) != 0) {
1146 mysql_mutex_lock(&mi->rli->run_lock);
1147 is_running = mi->rli->slave_running;
1148 mysql_mutex_unlock(&mi->rli->run_lock);
1149 if (is_running) {
1150 channel_map.unlock();
1151 return true;
1152 }
1153 }
1154 }
1155 }
1156
1157 channel_map.unlock();
1158 return false;
1159 }
1160
1161 enum_slave_channel_status
has_any_slave_channel_open_temp_table_or_is_its_applier_running()1162 has_any_slave_channel_open_temp_table_or_is_its_applier_running() {
1163 DBUG_TRACE;
1164 Master_info *mi = nullptr;
1165 bool is_applier_running = false;
1166 bool has_open_temp_tables = false;
1167 mi_map::iterator it;
1168
1169 channel_map.rdlock();
1170
1171 mi_map::iterator it_end = channel_map.end();
1172 for (it = channel_map.begin(); it != channel_map.end(); ++it) {
1173 mi = it->second;
1174
1175 if (Master_info::is_configured(mi)) {
1176 mysql_mutex_lock(&mi->rli->run_lock);
1177 is_applier_running = mi->rli->slave_running;
1178 if (mi->rli->atomic_channel_open_temp_tables > 0)
1179 has_open_temp_tables = true;
1180 if (is_applier_running || has_open_temp_tables) {
1181 /*
1182 Stop acquiring more run_locks and start to release the held
1183 run_locks once finding that a slave channel applier thread
1184 is running or a slave channel has open temporary table(s),
1185 and record the stop position.
1186 */
1187 it_end = ++it;
1188 break;
1189 }
1190 }
1191 }
1192
1193 /*
1194 Release the held run_locks until the stop position recorded in above
1195 or the end of the channel_map.
1196 */
1197 for (it = channel_map.begin(); it != it_end; ++it) {
1198 mi = it->second;
1199 if (Master_info::is_configured(mi)) mysql_mutex_unlock(&mi->rli->run_lock);
1200 }
1201
1202 channel_map.unlock();
1203
1204 if (has_open_temp_tables)
1205 return SLAVE_CHANNEL_HAS_OPEN_TEMPORARY_TABLE;
1206 else if (is_applier_running)
1207 return SLAVE_CHANNEL_APPLIER_IS_RUNNING;
1208
1209 return SLAVE_CHANNEL_NO_APPLIER_RUNNING_AND_NO_OPEN_TEMPORARY_TABLE;
1210 }
1211
channel_delete_credentials(const char * channel_name)1212 int channel_delete_credentials(const char *channel_name) {
1213 DBUG_TRACE;
1214 return Rpl_channel_credentials::get_instance().delete_credentials(
1215 channel_name);
1216 }
1217