1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "handlers/applier_handler.h"
24 #include "plugin_log.h"
25 #include "plugin.h"
26 
Applier_handler()27 Applier_handler::Applier_handler() {}
28 
initialize()29 int Applier_handler::initialize()
30 {
31   DBUG_ENTER("Applier_handler::initialize");
32   DBUG_RETURN(0);
33 }
34 
terminate()35 int Applier_handler::terminate()
36 {
37   DBUG_ENTER("Applier_handler::terminate");
38   DBUG_RETURN(0);
39 }
40 
41 int
initialize_repositories(bool reset_logs,ulong plugin_shutdown_timeout)42 Applier_handler::initialize_repositories(bool reset_logs,
43                                          ulong plugin_shutdown_timeout)
44 {
45   DBUG_ENTER("Applier_handler::initialize_repositories");
46 
47   int error=0;
48 
49   if (reset_logs)
50   {
51     log_message(MY_INFORMATION_LEVEL,
52                 "Detected previous RESET MASTER invocation or an issue exists "
53                 "in the group replication applier relay log. "
54                 "Purging existing applier logs.");
55 
56     if ((error = channel_interface.purge_logs(true)))
57     {
58       /* purecov: begin inspected */
59       log_message(MY_ERROR_LEVEL,
60                  "Unknown error occurred while resetting applier's module logs");
61       DBUG_RETURN(error);
62       /* purecov: end */
63     }
64   }
65 
66   channel_interface.set_stop_wait_timeout(plugin_shutdown_timeout);
67 
68   error= channel_interface.initialize_channel(const_cast<char*>("<NULL>"),
69                                               0, NULL, NULL,
70                                               false,
71                                               NULL,
72                                               NULL,
73                                               NULL,
74                                               NULL,
75                                               NULL,
76                                               NULL,
77                                               NULL,
78                                               false,
79                                               GROUP_REPLICATION_APPLIER_THREAD_PRIORITY,
80                                               0, true, true, true);
81 
82   if (error)
83   {
84     log_message(MY_ERROR_LEVEL,
85                 "Failed to setup the group replication applier thread."); /* purecov: inspected */
86   }
87 
88   DBUG_RETURN(error);
89 }
90 
start_applier_thread()91 int Applier_handler::start_applier_thread()
92 {
93   DBUG_ENTER("Applier_handler::start_applier_thread");
94 
95   int error= channel_interface.start_threads(false, true,
96                                              NULL, false);
97   if (error)
98   {
99       log_message(MY_ERROR_LEVEL,
100                   "Error while starting the group replication applier thread");
101   }
102 
103   DBUG_RETURN(error);
104 }
105 
stop_applier_thread()106 int Applier_handler::stop_applier_thread()
107 {
108   DBUG_ENTER("Applier_handler::stop_applier_thread");
109 
110   int error= 0;
111 
112   if (!channel_interface.is_applier_thread_running())
113     DBUG_RETURN(0);
114 
115   if ((error= channel_interface.stop_threads(false, true)))
116   {
117       log_message(MY_ERROR_LEVEL,
118                   "Failed to stop the group replication applier thread."); /* purecov: inspected */
119   }
120 
121   DBUG_RETURN(error);
122 }
123 
handle_event(Pipeline_event * event,Continuation * cont)124 int Applier_handler::handle_event(Pipeline_event *event,Continuation *cont)
125 {
126   DBUG_ENTER("Applier_handler::handle_event");
127   int error= 0;
128 
129   Data_packet* p=  NULL;
130   error= event->get_Packet(&p);
131   DBUG_EXECUTE_IF("applier_handler_force_error_on_pipeline", error= 1;);
132   if (error || (p == NULL))
133   {
134     log_message(MY_ERROR_LEVEL,
135                 "Failed to fetch transaction data containing required"
136                 " transaction info for applier");
137     error= 1;
138     goto end;
139   }
140 
141   /*
142     There is no need to queue Transaction_context_log_event to
143     server applier, this event is only need for certification,
144     performed on the previous handler.
145   */
146   if (event->get_event_type() != binary_log::TRANSACTION_CONTEXT_EVENT)
147   {
148     error= channel_interface.queue_packet((const char*)p->payload, p->len);
149 
150     if (event->get_event_type() == binary_log::GTID_LOG_EVENT &&
151         local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
152     {
153       applier_module->get_pipeline_stats_member_collector()
154           ->increment_transactions_waiting_apply();
155     }
156   }
157 
158 end:
159   if (error)
160     cont->signal(error);
161   else
162     next(event,cont);
163 
164   DBUG_RETURN(error);
165 }
166 
handle_action(Pipeline_action * action)167 int Applier_handler::handle_action(Pipeline_action *action)
168 {
169   DBUG_ENTER("Applier_handler::handle_action");
170   int error= 0;
171 
172   Plugin_handler_action action_type=
173     (Plugin_handler_action)action->get_action_type();
174 
175   switch(action_type)
176   {
177     case HANDLER_START_ACTION:
178       error= start_applier_thread();
179       break;
180     case HANDLER_STOP_ACTION:
181       error= stop_applier_thread();
182       break;
183     case HANDLER_APPLIER_CONF_ACTION:
184     {
185       Handler_applier_configuration_action* conf_action=
186               (Handler_applier_configuration_action*) action;
187 
188       if (conf_action->is_initialization_conf())
189       {
190         channel_interface.set_channel_name(conf_action->get_applier_name());
191         error= initialize_repositories(conf_action->is_reset_logs_planned(),
192                                        conf_action->get_applier_shutdown_timeout());
193       }
194       else
195       {
196         ulong timeout= conf_action->get_applier_shutdown_timeout();
197         channel_interface.set_stop_wait_timeout(timeout);
198       }
199       break;
200     }
201     default:
202       break;
203   }
204 
205   if (error)
206     DBUG_RETURN(error);
207 
208   DBUG_RETURN(next(action));
209 }
210 
is_unique()211 bool Applier_handler::is_unique(){
212   return true;
213 }
214 
get_role()215 int Applier_handler::get_role()
216 {
217   return APPLIER;
218 }
219 
is_applier_thread_waiting()220 bool Applier_handler::is_applier_thread_waiting()
221 {
222   DBUG_ENTER("Applier_handler::is_applier_thread_waiting");
223 
224   bool result= channel_interface.is_applier_thread_waiting();
225 
226   DBUG_RETURN(result);
227 }
228 
wait_for_gtid_execution(double timeout)229 int Applier_handler::wait_for_gtid_execution(double timeout)
230 {
231   DBUG_ENTER("Applier_handler::wait_for_gtid_execution");
232 
233   int error= channel_interface.wait_for_gtid_execution(timeout);
234 
235   DBUG_RETURN(error);
236 }
237 
is_partial_transaction_on_relay_log()238 int Applier_handler::is_partial_transaction_on_relay_log()
239 {
240   return channel_interface.is_partial_transaction_on_relay_log();
241 }
242