1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "handlers/applier_handler.h"
24 #include "plugin_log.h"
25 #include "plugin.h"
26
Applier_handler()27 Applier_handler::Applier_handler() {}
28
initialize()29 int Applier_handler::initialize()
30 {
31 DBUG_ENTER("Applier_handler::initialize");
32 DBUG_RETURN(0);
33 }
34
terminate()35 int Applier_handler::terminate()
36 {
37 DBUG_ENTER("Applier_handler::terminate");
38 DBUG_RETURN(0);
39 }
40
41 int
initialize_repositories(bool reset_logs,ulong plugin_shutdown_timeout)42 Applier_handler::initialize_repositories(bool reset_logs,
43 ulong plugin_shutdown_timeout)
44 {
45 DBUG_ENTER("Applier_handler::initialize_repositories");
46
47 int error=0;
48
49 if (reset_logs)
50 {
51 log_message(MY_INFORMATION_LEVEL,
52 "Detected previous RESET MASTER invocation or an issue exists "
53 "in the group replication applier relay log. "
54 "Purging existing applier logs.");
55
56 if ((error = channel_interface.purge_logs(true)))
57 {
58 /* purecov: begin inspected */
59 log_message(MY_ERROR_LEVEL,
60 "Unknown error occurred while resetting applier's module logs");
61 DBUG_RETURN(error);
62 /* purecov: end */
63 }
64 }
65
66 channel_interface.set_stop_wait_timeout(plugin_shutdown_timeout);
67
68 error= channel_interface.initialize_channel(const_cast<char*>("<NULL>"),
69 0, NULL, NULL,
70 false,
71 NULL,
72 NULL,
73 NULL,
74 NULL,
75 NULL,
76 NULL,
77 NULL,
78 false,
79 GROUP_REPLICATION_APPLIER_THREAD_PRIORITY,
80 0, true, true, true);
81
82 if (error)
83 {
84 log_message(MY_ERROR_LEVEL,
85 "Failed to setup the group replication applier thread."); /* purecov: inspected */
86 }
87
88 DBUG_RETURN(error);
89 }
90
start_applier_thread()91 int Applier_handler::start_applier_thread()
92 {
93 DBUG_ENTER("Applier_handler::start_applier_thread");
94
95 int error= channel_interface.start_threads(false, true,
96 NULL, false);
97 if (error)
98 {
99 log_message(MY_ERROR_LEVEL,
100 "Error while starting the group replication applier thread");
101 }
102
103 DBUG_RETURN(error);
104 }
105
stop_applier_thread()106 int Applier_handler::stop_applier_thread()
107 {
108 DBUG_ENTER("Applier_handler::stop_applier_thread");
109
110 int error= 0;
111
112 if (!channel_interface.is_applier_thread_running())
113 DBUG_RETURN(0);
114
115 if ((error= channel_interface.stop_threads(false, true)))
116 {
117 log_message(MY_ERROR_LEVEL,
118 "Failed to stop the group replication applier thread."); /* purecov: inspected */
119 }
120
121 DBUG_RETURN(error);
122 }
123
handle_event(Pipeline_event * event,Continuation * cont)124 int Applier_handler::handle_event(Pipeline_event *event,Continuation *cont)
125 {
126 DBUG_ENTER("Applier_handler::handle_event");
127 int error= 0;
128
129 Data_packet* p= NULL;
130 error= event->get_Packet(&p);
131 DBUG_EXECUTE_IF("applier_handler_force_error_on_pipeline", error= 1;);
132 if (error || (p == NULL))
133 {
134 log_message(MY_ERROR_LEVEL,
135 "Failed to fetch transaction data containing required"
136 " transaction info for applier");
137 error= 1;
138 goto end;
139 }
140
141 /*
142 There is no need to queue Transaction_context_log_event to
143 server applier, this event is only need for certification,
144 performed on the previous handler.
145 */
146 if (event->get_event_type() != binary_log::TRANSACTION_CONTEXT_EVENT)
147 {
148 error= channel_interface.queue_packet((const char*)p->payload, p->len);
149
150 if (event->get_event_type() == binary_log::GTID_LOG_EVENT &&
151 local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
152 {
153 applier_module->get_pipeline_stats_member_collector()
154 ->increment_transactions_waiting_apply();
155 }
156 }
157
158 end:
159 if (error)
160 cont->signal(error);
161 else
162 next(event,cont);
163
164 DBUG_RETURN(error);
165 }
166
handle_action(Pipeline_action * action)167 int Applier_handler::handle_action(Pipeline_action *action)
168 {
169 DBUG_ENTER("Applier_handler::handle_action");
170 int error= 0;
171
172 Plugin_handler_action action_type=
173 (Plugin_handler_action)action->get_action_type();
174
175 switch(action_type)
176 {
177 case HANDLER_START_ACTION:
178 error= start_applier_thread();
179 break;
180 case HANDLER_STOP_ACTION:
181 error= stop_applier_thread();
182 break;
183 case HANDLER_APPLIER_CONF_ACTION:
184 {
185 Handler_applier_configuration_action* conf_action=
186 (Handler_applier_configuration_action*) action;
187
188 if (conf_action->is_initialization_conf())
189 {
190 channel_interface.set_channel_name(conf_action->get_applier_name());
191 error= initialize_repositories(conf_action->is_reset_logs_planned(),
192 conf_action->get_applier_shutdown_timeout());
193 }
194 else
195 {
196 ulong timeout= conf_action->get_applier_shutdown_timeout();
197 channel_interface.set_stop_wait_timeout(timeout);
198 }
199 break;
200 }
201 default:
202 break;
203 }
204
205 if (error)
206 DBUG_RETURN(error);
207
208 DBUG_RETURN(next(action));
209 }
210
is_unique()211 bool Applier_handler::is_unique(){
212 return true;
213 }
214
get_role()215 int Applier_handler::get_role()
216 {
217 return APPLIER;
218 }
219
is_applier_thread_waiting()220 bool Applier_handler::is_applier_thread_waiting()
221 {
222 DBUG_ENTER("Applier_handler::is_applier_thread_waiting");
223
224 bool result= channel_interface.is_applier_thread_waiting();
225
226 DBUG_RETURN(result);
227 }
228
wait_for_gtid_execution(double timeout)229 int Applier_handler::wait_for_gtid_execution(double timeout)
230 {
231 DBUG_ENTER("Applier_handler::wait_for_gtid_execution");
232
233 int error= channel_interface.wait_for_gtid_execution(timeout);
234
235 DBUG_RETURN(error);
236 }
237
is_partial_transaction_on_relay_log()238 int Applier_handler::is_partial_transaction_on_relay_log()
239 {
240 return channel_interface.is_partial_transaction_on_relay_log();
241 }
242