1 /* Copyright (c) 2014, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "plugin/group_replication/include/handlers/certification_handler.h"
24 
25 #include <mysql/components/services/log_builtins.h>
26 #include "my_dbug.h"
27 #include "my_inttypes.h"
28 #include "plugin/group_replication/include/consistency_manager.h"
29 #include "plugin/group_replication/include/handlers/pipeline_handlers.h"
30 #include "plugin/group_replication/include/plugin.h"
31 
32 using std::string;
33 const int GTID_WAIT_TIMEOUT = 10;  // 10 seconds
34 const int LOCAL_WAIT_TIMEOUT_ERROR = -1;
35 
Certification_handler()36 Certification_handler::Certification_handler()
37     : cert_module(nullptr),
38       applier_module_thd(nullptr),
39       group_sidno(0),
40       transaction_context_packet(nullptr),
41       transaction_context_pevent(nullptr),
42       m_view_change_event_on_wait(false) {}
43 
~Certification_handler()44 Certification_handler::~Certification_handler() {
45   delete transaction_context_pevent;
46   delete transaction_context_packet;
47 
48   for (std::list<View_change_stored_info *>::iterator stored_view_info_it =
49            pending_view_change_events.begin();
50        stored_view_info_it != pending_view_change_events.end();
51        ++stored_view_info_it) {
52     delete (*stored_view_info_it)->view_change_pevent;
53     delete *stored_view_info_it;
54   }
55 }
56 
initialize()57 int Certification_handler::initialize() {
58   DBUG_TRACE;
59   DBUG_ASSERT(cert_module == nullptr);
60   cert_module = new Certifier();
61   return 0;
62 }
63 
terminate()64 int Certification_handler::terminate() {
65   DBUG_TRACE;
66   int error = 0;
67 
68   if (cert_module == nullptr) return error; /* purecov: inspected */
69 
70   delete cert_module;
71   cert_module = nullptr;
72   return error;
73 }
74 
handle_action(Pipeline_action * action)75 int Certification_handler::handle_action(Pipeline_action *action) {
76   DBUG_TRACE;
77 
78   int error = 0;
79 
80   Plugin_handler_action action_type =
81       (Plugin_handler_action)action->get_action_type();
82 
83   if (action_type == HANDLER_CERT_CONF_ACTION) {
84     Handler_certifier_configuration_action *conf_action =
85         (Handler_certifier_configuration_action *)action;
86 
87     error =
88         cert_module->initialize(conf_action->get_gtid_assignment_block_size());
89 
90     group_sidno = conf_action->get_group_sidno();
91   } else if (action_type == HANDLER_CERT_INFO_ACTION) {
92     Handler_certifier_information_action *cert_inf_action =
93         (Handler_certifier_information_action *)action;
94 
95     error = cert_module->set_certification_info(
96         cert_inf_action->get_certification_info());
97   } else if (action_type == HANDLER_VIEW_CHANGE_ACTION) {
98     View_change_pipeline_action *vc_action =
99         (View_change_pipeline_action *)action;
100 
101     if (!vc_action->is_leaving()) {
102       cert_module->handle_view_change();
103     }
104   } else if (action_type == HANDLER_THD_ACTION) {
105     Handler_THD_setup_action *thd_conf_action =
106         (Handler_THD_setup_action *)action;
107     applier_module_thd = thd_conf_action->get_THD_object();
108   } else if (action_type == HANDLER_STOP_ACTION) {
109     error = cert_module->terminate();
110   }
111 
112   if (error) return error;
113 
114   return next(action);
115 }
116 
handle_event(Pipeline_event * pevent,Continuation * cont)117 int Certification_handler::handle_event(Pipeline_event *pevent,
118                                         Continuation *cont) {
119   DBUG_TRACE;
120 
121   Log_event_type ev_type = pevent->get_event_type();
122   switch (ev_type) {
123     case binary_log::TRANSACTION_CONTEXT_EVENT:
124       return handle_transaction_context(pevent, cont);
125     case binary_log::GTID_LOG_EVENT:
126       return handle_transaction_id(pevent, cont);
127     case binary_log::VIEW_CHANGE_EVENT:
128       return extract_certification_info(pevent, cont);
129     default:
130       next(pevent, cont);
131       return 0;
132   }
133 }
134 
set_transaction_context(Pipeline_event * pevent)135 int Certification_handler::set_transaction_context(Pipeline_event *pevent) {
136   DBUG_TRACE;
137   int error = 0;
138 
139   DBUG_ASSERT(transaction_context_packet == nullptr);
140   DBUG_ASSERT(transaction_context_pevent == nullptr);
141 
142   Data_packet *packet = nullptr;
143   error = pevent->get_Packet(&packet);
144   if (error || (packet == nullptr)) {
145     /* purecov: begin inspected */
146     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_TRANS_CONTEXT_FAILED);
147     return 1;
148     /* purecov: end */
149   }
150   transaction_context_packet = new Data_packet(packet->payload, packet->len);
151 
152   return error;
153 }
154 
get_transaction_context(Pipeline_event * pevent,Transaction_context_log_event ** tcle)155 int Certification_handler::get_transaction_context(
156     Pipeline_event *pevent, Transaction_context_log_event **tcle) {
157   DBUG_TRACE;
158   int error = 0;
159 
160   DBUG_ASSERT(transaction_context_packet != nullptr);
161   DBUG_ASSERT(transaction_context_pevent == nullptr);
162 
163   Format_description_log_event *fdle = nullptr;
164   if (pevent->get_FormatDescription(&fdle) && (fdle == nullptr)) {
165     /* purecov: begin inspected */
166     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_FORMAT_DESC_LOG_EVENT_FAILED);
167     return 1;
168     /* purecov: end */
169   }
170 
171   transaction_context_pevent =
172       new Pipeline_event(transaction_context_packet, fdle);
173   Log_event *transaction_context_event = nullptr;
174   error = transaction_context_pevent->get_LogEvent(&transaction_context_event);
175   transaction_context_packet = nullptr;
176   DBUG_EXECUTE_IF("certification_handler_force_error_on_pipeline", error = 1;);
177   if (error || (transaction_context_event == nullptr)) {
178     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_TRANS_CONTEXT_LOG_EVENT_FAILED);
179     return 1;
180   }
181 
182   *tcle =
183       static_cast<Transaction_context_log_event *>(transaction_context_event);
184   if ((*tcle)->read_snapshot_version()) {
185     /* purecov: begin inspected */
186     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_SNAPSHOT_VERSION_FAILED);
187     return 1;
188     /* purecov: end */
189   }
190 
191   return error;
192 }
193 
reset_transaction_context()194 void Certification_handler::reset_transaction_context() {
195   DBUG_TRACE;
196 
197   /*
198     Release memory allocated to transaction_context_packet,
199     since it is wrapped by transaction_context_pevent.
200   */
201   delete transaction_context_pevent;
202   transaction_context_pevent = nullptr;
203 }
204 
handle_transaction_context(Pipeline_event * pevent,Continuation * cont)205 int Certification_handler::handle_transaction_context(Pipeline_event *pevent,
206                                                       Continuation *cont) {
207   DBUG_TRACE;
208   int error = 0;
209 
210   error = set_transaction_context(pevent);
211   if (error)
212     cont->signal(1, true); /* purecov: inspected */
213   else
214     next(pevent, cont);
215 
216   return error;
217 }
218 
handle_transaction_id(Pipeline_event * pevent,Continuation * cont)219 int Certification_handler::handle_transaction_id(Pipeline_event *pevent,
220                                                  Continuation *cont) {
221   DBUG_TRACE;
222   int error = 0;
223   rpl_gno seq_number = 0;
224   bool local_transaction = true;
225   Transaction_context_log_event *tcle = nullptr;
226   Log_event *event = nullptr;
227   Gtid_log_event *gle = nullptr;
228   std::list<Gcs_member_identifier> *online_members =
229       pevent->get_online_members();
230 
231   /*
232     Get transaction context.
233   */
234   error = get_transaction_context(pevent, &tcle);
235   if (error) {
236     cont->signal(1, true);
237     goto end;
238   }
239 
240   /*
241     Get transaction global identifier event.
242   */
243   error = pevent->get_LogEvent(&event);
244   if (error || (event == nullptr)) {
245     /* purecov: begin inspected */
246     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_GTID_LOG_EVENT_FAILED);
247     cont->signal(1, true);
248     error = 1;
249     goto end;
250     /* purecov: end */
251   }
252   gle = static_cast<Gtid_log_event *>(event);
253 
254   local_transaction =
255       !strncmp(tcle->get_server_uuid(), local_member_info->get_uuid().c_str(),
256                UUID_LENGTH);
257 
258   /*
259     Group contains members that do not support transactions with group
260     coordination, thence the transaction must rollback.
261   */
262   DBUG_EXECUTE_IF(
263       "group_replication_force_lower_version_on_group_replication_consistency",
264       { online_members = nullptr; };);
265   if (pevent->get_consistency_level() >= GROUP_REPLICATION_CONSISTENCY_AFTER &&
266       nullptr == online_members) {
267     goto after_certify;
268   }
269 
270   /*
271     Certify transaction.
272   */
273   seq_number =
274       cert_module->certify(tcle->get_snapshot_version(), tcle->get_write_set(),
275                            !tcle->is_gtid_specified(), tcle->get_server_uuid(),
276                            gle, local_transaction);
277 
278 after_certify:
279   if (local_transaction) {
280     /*
281       Local transaction.
282       After certification we need to wake up the waiting thread on the
283       plugin to proceed with the transaction processing.
284       Sequence number <= 0 means abort, so we need to pass a negative
285       value to transaction context.
286     */
287     Transaction_termination_ctx transaction_termination_ctx;
288     memset(&transaction_termination_ctx, 0,
289            sizeof(transaction_termination_ctx));
290     transaction_termination_ctx.m_thread_id = tcle->get_thread_id();
291     if (seq_number > 0) {
292       transaction_termination_ctx.m_rollback_transaction = false;
293       if (tcle->is_gtid_specified()) {
294         transaction_termination_ctx.m_generated_gtid = false;
295       } else {
296         transaction_termination_ctx.m_generated_gtid = true;
297         transaction_termination_ctx.m_sidno = group_sidno;
298         transaction_termination_ctx.m_gno = seq_number;
299       }
300     } else {
301       transaction_termination_ctx.m_rollback_transaction = true;
302       transaction_termination_ctx.m_generated_gtid = false;
303       transaction_termination_ctx.m_sidno = -1;
304       transaction_termination_ctx.m_gno = -1;
305     }
306 
307     if (set_transaction_ctx(transaction_termination_ctx)) {
308       /* purecov: begin inspected */
309       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_UPDATE_SERV_CERTIFICATE_FAILED,
310                    tcle->get_thread_id());
311       cont->signal(1, true);
312       error = 1;
313       goto end;
314       /* purecov: end */
315     }
316 
317     if (seq_number > 0) {
318       const rpl_sid *sid = nullptr;
319       rpl_sidno sidno = group_sidno;
320       rpl_gno gno = seq_number;
321 
322       if (tcle->is_gtid_specified()) {
323         sid = gle->get_sid();
324         sidno = gle->get_sidno(true);
325         gno = gle->get_gno();
326         error =
327             cert_module->add_specified_gtid_to_group_gtid_executed(gle, true);
328         DBUG_EXECUTE_IF("unable_to_add_specified_gtid_for_local_transaction",
329                         error = 1;);
330 
331         if (error) {
332           LogPluginErr(ERROR_LEVEL,
333                        ER_GRP_RPL_ADD_GTID_INFO_WITH_LOCAL_GTID_FAILED);
334           transactions_latch->releaseTicket(tcle->get_thread_id());
335           cont->signal(1, true);
336           goto end;
337         }
338       } else {
339         if (cert_module->add_group_gtid_to_group_gtid_executed(gno, true)) {
340           /* purecov: begin inspected */
341           LogPluginErr(ERROR_LEVEL,
342                        ER_GRP_RPL_ADD_GTID_INFO_WITHOUT_LOCAL_GTID_FAILED);
343           transactions_latch->releaseTicket(tcle->get_thread_id());
344           cont->signal(1, true);
345           error = 1;
346           goto end;
347           /* purecov: end */
348         }
349       }
350 
351       if (pevent->get_consistency_level() >=
352           GROUP_REPLICATION_CONSISTENCY_AFTER) {
353         Transaction_consistency_info *transaction_consistency_info =
354             new Transaction_consistency_info(
355                 tcle->get_thread_id(), local_transaction, sid, sidno, gno,
356                 pevent->get_consistency_level(), pevent->get_online_members());
357         pevent->release_online_members_memory_ownership();
358         if (transaction_consistency_manager->after_certification(
359                 transaction_consistency_info)) {
360           /* purecov: begin inspected */
361           delete transaction_consistency_info;
362           cont->signal(1, true);
363           error = 1;
364           goto end;
365           /* purecov: end */
366         }
367       }
368     }
369 
370     /*
371       We only release the local transaction here when its consistency
372       does not require group coordination.
373     */
374     if ((seq_number <= 0 || pevent->get_consistency_level() <
375                                 GROUP_REPLICATION_CONSISTENCY_AFTER) &&
376         transactions_latch->releaseTicket(tcle->get_thread_id())) {
377       /* purecov: begin inspected */
378       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_NOTIFY_CERTIFICATION_OUTCOME_FAILED);
379       cont->signal(1, true);
380       error = 1;
381       goto end;
382       /* purecov: end */
383     }
384 
385     // The pipeline ended for this transaction
386     cont->signal(0, true);
387   } else {
388     /*
389       Remote transaction.
390     */
391     if (seq_number > 0) {
392       const rpl_sid *sid = nullptr;
393       rpl_sidno sidno = group_sidno;
394       rpl_gno gno = seq_number;
395 
396       if (!tcle->is_gtid_specified()) {
397         // Create new GTID event.
398         Gtid gtid = {sidno, gno};
399         Gtid_specification gtid_specification = {ASSIGNED_GTID, gtid};
400         Gtid_log_event *gle_generated = new Gtid_log_event(
401             gle->server_id, gle->is_using_trans_cache(), gle->last_committed,
402             gle->sequence_number, gle->may_have_sbr_stmts,
403             gle->original_commit_timestamp, gle->immediate_commit_timestamp,
404             gtid_specification, gle->original_server_version,
405             gle->immediate_server_version);
406         // Copy the transaction length to the new event.
407         gle_generated->set_trx_length(gle->transaction_length);
408 
409         pevent->reset_pipeline_event();
410         pevent->set_LogEvent(gle_generated);
411 
412         // Add the gtid information in the executed gtid set for the remote
413         // transaction which have gtid specified.
414         if (cert_module->add_group_gtid_to_group_gtid_executed(gno, false)) {
415           /* purecov: begin inspected */
416           LogPluginErr(ERROR_LEVEL,
417                        ER_GRP_RPL_ADD_GTID_INFO_WITHOUT_REMOTE_GTID_FAILED);
418           cont->signal(1, true);
419           error = 1;
420           goto end;
421           /* purecov: end */
422         }
423       }
424 
425       else {
426         sid = gle->get_sid();
427         sidno = gle->get_sidno(true);
428         gno = gle->get_gno();
429         error =
430             cert_module->add_specified_gtid_to_group_gtid_executed(gle, false);
431         DBUG_EXECUTE_IF("unable_to_add_specified_gtid_for_remote_transaction",
432                         error = 1;);
433 
434         if (error) {
435           /* purecov: begin inspected */
436           LogPluginErr(ERROR_LEVEL,
437                        ER_GRP_RPL_ADD_GTID_INFO_WITH_REMOTE_GTID_FAILED);
438           cont->signal(1, true);
439           goto end;
440           /* purecov: end */
441         }
442       }
443 
444       if (pevent->get_consistency_level() >=
445           GROUP_REPLICATION_CONSISTENCY_AFTER) {
446         Transaction_consistency_info *transaction_consistency_info =
447             new Transaction_consistency_info(
448                 tcle->get_thread_id(), local_transaction, sid, sidno, gno,
449                 pevent->get_consistency_level(), pevent->get_online_members());
450         pevent->release_online_members_memory_ownership();
451         if (transaction_consistency_manager->after_certification(
452                 transaction_consistency_info)) {
453           /* purecov: begin inspected */
454           delete transaction_consistency_info;
455           cont->signal(1, true);
456           error = 1;
457           goto end;
458           /* purecov: end */
459         }
460       }
461 
462       // Pass transaction to next action.
463       next(pevent, cont);
464     } else if (seq_number < 0) {
465       error = 1;
466       cont->signal(1, true);
467       goto end;
468     } else {
469       // The transaction was negatively certified so discard it.
470       cont->signal(0, true);
471     }
472   }
473 
474 end:
475   reset_transaction_context();
476   return error;
477 }
478 
extract_certification_info(Pipeline_event * pevent,Continuation * cont)479 int Certification_handler::extract_certification_info(Pipeline_event *pevent,
480                                                       Continuation *cont) {
481   DBUG_TRACE;
482   int error = 0;
483 
484   if (pevent->get_event_context() != SINGLE_VIEW_EVENT) {
485     /*
486       If the current view event is embraced on a transaction:
487       GTID, BEGIN, VIEW, COMMIT; it means that we are handling
488       a view that was delivered by a asynchronous channel from
489       outside of the group.
490       On that case we just have to queue it on the group applier
491       channel, without any special handling.
492     */
493     next(pevent, cont);
494     return error;
495   }
496 
497   /*
498     If the current view event is a standalone event (not inside a
499     transaction), it means that it was injected from GCS on a
500     membership change.
501     On that case we need to queue it on the group applier wrapped
502     on a transaction with a group generated GTID.
503   */
504 
505   /*
506     If there are pending view changes to apply, apply them first.
507     If we can't apply the old VCLEs probably we can't apply the new one
508   */
509   if (unlikely(m_view_change_event_on_wait)) {
510     error = log_delayed_view_change_events(cont);
511     m_view_change_event_on_wait = !pending_view_change_events.empty();
512   }
513 
514   std::string local_gtid_certified_string;
515   rpl_gno view_change_event_gno = -1;
516   if (!error) {
517     error = log_view_change_event_in_order(pevent, local_gtid_certified_string,
518                                            &view_change_event_gno, cont);
519   }
520 
521   /*
522     If there are was a timeout applying this or an older view change,
523     just store the event for future application.
524   */
525   if (error) {
526     if (LOCAL_WAIT_TIMEOUT_ERROR == error) {
527       error = store_view_event_for_delayed_logging(
528           pevent, local_gtid_certified_string, view_change_event_gno, cont);
529       LogPluginErr(WARNING_LEVEL, ER_GRP_DELAYED_VCLE_LOGGING);
530       if (error)
531         cont->signal(1, false);
532       else
533         cont->signal(0, cont->is_transaction_discarded());
534     } else
535       cont->signal(1, false);
536   }
537 
538   return error;
539 }
540 
log_delayed_view_change_events(Continuation * cont)541 int Certification_handler::log_delayed_view_change_events(Continuation *cont) {
542   DBUG_TRACE;
543 
544   int error = 0;
545 
546   while (!pending_view_change_events.empty() && !error) {
547     View_change_stored_info *stored_view_info =
548         pending_view_change_events.front();
549     error = log_view_change_event_in_order(
550         stored_view_info->view_change_pevent,
551         stored_view_info->local_gtid_certified,
552         &(stored_view_info->view_change_event_gno), cont);
553     // if we timeout keep the event
554     if (LOCAL_WAIT_TIMEOUT_ERROR != error) {
555       delete stored_view_info->view_change_pevent;
556       delete stored_view_info;
557       pending_view_change_events.pop_front();
558     }
559   }
560   return error;
561 }
562 
store_view_event_for_delayed_logging(Pipeline_event * pevent,std::string & local_gtid_certified_string,rpl_gno event_gno,Continuation * cont)563 int Certification_handler::store_view_event_for_delayed_logging(
564     Pipeline_event *pevent, std::string &local_gtid_certified_string,
565     rpl_gno event_gno, Continuation *cont) {
566   DBUG_TRACE;
567 
568   int error = 0;
569 
570   Log_event *event = nullptr;
571   error = pevent->get_LogEvent(&event);
572   if (error || (event == nullptr)) {
573     /* purecov: begin inspected */
574     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_VIEW_CHANGE_LOG_EVENT_FAILED);
575     return 1;
576     /* purecov: end */
577   }
578   View_change_log_event *vchange_event =
579       static_cast<View_change_log_event *>(event);
580   std::string view_change_event_id(vchange_event->get_view_id());
581 
582   // -1 means there was a second timeout on a VCLE that we already delayed
583   if (view_change_event_id != "-1") {
584     m_view_change_event_on_wait = true;
585     View_change_stored_info *vcle_info = new View_change_stored_info(
586         pevent, local_gtid_certified_string, event_gno);
587     pending_view_change_events.push_back(vcle_info);
588     // Use the discard flag to let the applier know this was delayed
589     cont->set_transation_discarded(true);
590   }
591 
592   // Add a packet back to the applier queue so it is processed in a later stage.
593   std::string delayed_view_id("-1");
594   View_change_packet *view_change_packet =
595       new View_change_packet(delayed_view_id);
596   applier_module->add_view_change_packet(view_change_packet);
597 
598   return error;
599 }
600 
wait_for_local_transaction_execution(std::string & local_gtid_certified_string)601 int Certification_handler::wait_for_local_transaction_execution(
602     std::string &local_gtid_certified_string) {
603   DBUG_TRACE;
604   int error = 0;
605 
606   if (local_gtid_certified_string.empty()) {
607     if (!cert_module->get_local_certified_gtid(local_gtid_certified_string)) {
608       return 0;  // set is empty, we don't need to wait
609     }
610   }
611 
612   Sql_service_command_interface *sql_command_interface =
613       new Sql_service_command_interface();
614 
615   if (sql_command_interface->establish_session_connection(PSESSION_USE_THREAD,
616                                                           GROUPREPL_USER)) {
617     /* purecov: begin inspected */
618     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_CONTACT_WITH_SRV_FAILED);
619     delete sql_command_interface;
620     return 1;
621     /* purecov: end */
622   }
623 
624   if ((error = sql_command_interface->wait_for_server_gtid_executed(
625            local_gtid_certified_string, GTID_WAIT_TIMEOUT))) {
626     /* purecov: begin inspected */
627     if (error == -1)  // timeout
628     {
629       LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_SRV_WAIT_TIME_OUT);
630       error = LOCAL_WAIT_TIMEOUT_ERROR;
631     } else {
632       LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_SRV_GTID_WAIT_ERROR);
633     }
634     /* purecov: end */
635   }
636   delete sql_command_interface;
637   return error;
638 }
639 
inject_transactional_events(Pipeline_event * pevent,rpl_gno * event_gno,Continuation * cont)640 int Certification_handler::inject_transactional_events(Pipeline_event *pevent,
641                                                        rpl_gno *event_gno,
642                                                        Continuation *cont) {
643   DBUG_TRACE;
644   Log_event *event = nullptr;
645   Format_description_log_event *fd_event = nullptr;
646 
647   if (pevent->get_LogEvent(&event) || (event == nullptr)) {
648     /* purecov: begin inspected */
649     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_LOG_EVENT_FAILED);
650     cont->signal(1, true);
651     return 1;
652     /* purecov: end */
653   }
654 
655   if (pevent->get_FormatDescription(&fd_event) && (fd_event == nullptr)) {
656     /* purecov: begin inspected */
657     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_FORMAT_DESC_LOG_EVENT_FAILED);
658     cont->signal(1, true);
659     return 1;
660     /* purecov: end */
661   }
662 
663   // GTID event
664 
665   if (*event_gno == -1) {
666     *event_gno = cert_module->generate_view_change_group_gno();
667   }
668   Gtid gtid = {group_sidno, *event_gno};
669   if (gtid.gno <= 0) {
670     cont->signal(1, true);
671     return 1;
672   }
673   Gtid_specification gtid_specification = {ASSIGNED_GTID, gtid};
674   /**
675    The original_commit_timestamp of this Gtid_log_event will be zero
676    because the transaction corresponds to a View_change_event, which is
677    generated and committed locally by all members. Consequently, there is no
678    'original master'. So, instead of each member generating a GTID with
679    its own unique original_commit_timestamp (and violating the property that
680    the original_commit_timestamp is the same for a given GTID), this timestamp
681    will not be defined.
682   */
683   uint32_t server_version = do_server_version_int(::server_version);
684   Gtid_log_event *gtid_log_event =
685       new Gtid_log_event(event->server_id, true, 0, 0, true, 0, 0,
686                          gtid_specification, server_version, server_version);
687 
688   Pipeline_event *gtid_pipeline_event =
689       new Pipeline_event(gtid_log_event, fd_event);
690   next(gtid_pipeline_event, cont);
691 
692   int error = cont->wait();
693   delete gtid_pipeline_event;
694   if (error) {
695     return 0; /* purecov: inspected */
696   }
697 
698   // BEGIN event
699 
700   Log_event *begin_log_event = new Query_log_event(
701       applier_module_thd, STRING_WITH_LEN("BEGIN"), true, false, true, 0, true);
702 
703   Pipeline_event *begin_pipeline_event =
704       new Pipeline_event(begin_log_event, fd_event);
705   next(begin_pipeline_event, cont);
706 
707   error = cont->wait();
708   delete begin_pipeline_event;
709   if (error) {
710     return 0; /* purecov: inspected */
711   }
712 
713   /*
714    Queues the given event.
715    As we don't have asynchronous we can use the received Continuation.
716    If that is no longer true, another Continuation object must be created here.
717   */
718   next(pevent, cont);
719   error = cont->wait();
720   if (error) {
721     return 0; /* purecov: inspected */
722   }
723 
724   // COMMIT event
725 
726   Log_event *end_log_event =
727       new Query_log_event(applier_module_thd, STRING_WITH_LEN("COMMIT"), true,
728                           false, true, 0, true);
729 
730   Pipeline_event *end_pipeline_event =
731       new Pipeline_event(end_log_event, fd_event);
732   next(end_pipeline_event, cont);
733   delete end_pipeline_event;
734 
735   return 0;
736 }
737 
log_view_change_event_in_order(Pipeline_event * view_pevent,std::string & local_gtid_string,rpl_gno * event_gno,Continuation * cont)738 int Certification_handler::log_view_change_event_in_order(
739     Pipeline_event *view_pevent, std::string &local_gtid_string,
740     rpl_gno *event_gno, Continuation *cont) {
741   DBUG_TRACE;
742 
743   int error = 0;
744   bool first_log_attempt = (*event_gno == -1);
745 
746   Log_event *event = nullptr;
747   error = view_pevent->get_LogEvent(&event);
748   if (error || (event == nullptr)) {
749     /* purecov: begin inspected */
750     LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_VIEW_CHANGE_LOG_EVENT_FAILED);
751     return 1;
752     /* purecov: end */
753   }
754   View_change_log_event *vchange_event =
755       static_cast<View_change_log_event *>(event);
756   std::string view_change_event_id(vchange_event->get_view_id());
757 
758   // We are just logging old event(s), this packet was created to delay that
759   // process
760   if (unlikely(view_change_event_id == "-1")) return 0;
761 
762   if (first_log_attempt) {
763     std::map<std::string, std::string> cert_info;
764     cert_module->get_certification_info(&cert_info);
765     size_t event_size = 0;
766     vchange_event->set_certification_info(&cert_info, &event_size);
767 
768     /*
769        If certification information is too big this event can't be transmitted
770        as it would cause failures on all group members.
771        To avoid this, we  now instead encode an error that will make the joiner
772        leave the group.
773     */
774     if (event_size > get_slave_max_allowed_packet()) {
775       cert_info.clear();
776       cert_info[Certifier::CERTIFICATION_INFO_ERROR_NAME] =
777           "Certification information is too large for transmission.";
778       vchange_event->set_certification_info(&cert_info, &event_size);
779     }
780   }
781 
782   // Assure the last known local transaction was already executed
783   error = wait_for_local_transaction_execution(local_gtid_string);
784 
785   if (!error) {
786     /**
787      Create a transactional block for the View change log event
788      GTID
789      BEGIN
790      VCLE
791      COMMIT
792     */
793     error = inject_transactional_events(view_pevent, event_gno, cont);
794   } else if (LOCAL_WAIT_TIMEOUT_ERROR == error && first_log_attempt) {
795     // Even if we can't log it, register the position
796     *event_gno = cert_module->generate_view_change_group_gno();
797   }
798 
799   return error;
800 }
801 
is_unique()802 bool Certification_handler::is_unique() { return true; }
803 
get_role()804 int Certification_handler::get_role() { return CERTIFIER; }
805 
get_certifier()806 Certifier_interface *Certification_handler::get_certifier() {
807   return cert_module;
808 }
809