1 /* Copyright (c) 2014, 2019, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "plugin/group_replication/include/handlers/certification_handler.h"
24
25 #include <mysql/components/services/log_builtins.h>
26 #include "my_dbug.h"
27 #include "my_inttypes.h"
28 #include "plugin/group_replication/include/consistency_manager.h"
29 #include "plugin/group_replication/include/handlers/pipeline_handlers.h"
30 #include "plugin/group_replication/include/plugin.h"
31
using std::string;

/** Timeout, in seconds, given to the wait on the server executed GTID set. */
constexpr int GTID_WAIT_TIMEOUT = 10;
/** Error code used in this file to report that such a wait timed out. */
constexpr int LOCAL_WAIT_TIMEOUT_ERROR = -1;
35
Certification_handler()36 Certification_handler::Certification_handler()
37 : cert_module(nullptr),
38 applier_module_thd(nullptr),
39 group_sidno(0),
40 transaction_context_packet(nullptr),
41 transaction_context_pevent(nullptr),
42 m_view_change_event_on_wait(false) {}
43
~Certification_handler()44 Certification_handler::~Certification_handler() {
45 delete transaction_context_pevent;
46 delete transaction_context_packet;
47
48 for (std::list<View_change_stored_info *>::iterator stored_view_info_it =
49 pending_view_change_events.begin();
50 stored_view_info_it != pending_view_change_events.end();
51 ++stored_view_info_it) {
52 delete (*stored_view_info_it)->view_change_pevent;
53 delete *stored_view_info_it;
54 }
55 }
56
initialize()57 int Certification_handler::initialize() {
58 DBUG_TRACE;
59 DBUG_ASSERT(cert_module == nullptr);
60 cert_module = new Certifier();
61 return 0;
62 }
63
terminate()64 int Certification_handler::terminate() {
65 DBUG_TRACE;
66 int error = 0;
67
68 if (cert_module == nullptr) return error; /* purecov: inspected */
69
70 delete cert_module;
71 cert_module = nullptr;
72 return error;
73 }
74
handle_action(Pipeline_action * action)75 int Certification_handler::handle_action(Pipeline_action *action) {
76 DBUG_TRACE;
77
78 int error = 0;
79
80 Plugin_handler_action action_type =
81 (Plugin_handler_action)action->get_action_type();
82
83 if (action_type == HANDLER_CERT_CONF_ACTION) {
84 Handler_certifier_configuration_action *conf_action =
85 (Handler_certifier_configuration_action *)action;
86
87 error =
88 cert_module->initialize(conf_action->get_gtid_assignment_block_size());
89
90 group_sidno = conf_action->get_group_sidno();
91 } else if (action_type == HANDLER_CERT_INFO_ACTION) {
92 Handler_certifier_information_action *cert_inf_action =
93 (Handler_certifier_information_action *)action;
94
95 error = cert_module->set_certification_info(
96 cert_inf_action->get_certification_info());
97 } else if (action_type == HANDLER_VIEW_CHANGE_ACTION) {
98 View_change_pipeline_action *vc_action =
99 (View_change_pipeline_action *)action;
100
101 if (!vc_action->is_leaving()) {
102 cert_module->handle_view_change();
103 }
104 } else if (action_type == HANDLER_THD_ACTION) {
105 Handler_THD_setup_action *thd_conf_action =
106 (Handler_THD_setup_action *)action;
107 applier_module_thd = thd_conf_action->get_THD_object();
108 } else if (action_type == HANDLER_STOP_ACTION) {
109 error = cert_module->terminate();
110 }
111
112 if (error) return error;
113
114 return next(action);
115 }
116
handle_event(Pipeline_event * pevent,Continuation * cont)117 int Certification_handler::handle_event(Pipeline_event *pevent,
118 Continuation *cont) {
119 DBUG_TRACE;
120
121 Log_event_type ev_type = pevent->get_event_type();
122 switch (ev_type) {
123 case binary_log::TRANSACTION_CONTEXT_EVENT:
124 return handle_transaction_context(pevent, cont);
125 case binary_log::GTID_LOG_EVENT:
126 return handle_transaction_id(pevent, cont);
127 case binary_log::VIEW_CHANGE_EVENT:
128 return extract_certification_info(pevent, cont);
129 default:
130 next(pevent, cont);
131 return 0;
132 }
133 }
134
set_transaction_context(Pipeline_event * pevent)135 int Certification_handler::set_transaction_context(Pipeline_event *pevent) {
136 DBUG_TRACE;
137 int error = 0;
138
139 DBUG_ASSERT(transaction_context_packet == nullptr);
140 DBUG_ASSERT(transaction_context_pevent == nullptr);
141
142 Data_packet *packet = nullptr;
143 error = pevent->get_Packet(&packet);
144 if (error || (packet == nullptr)) {
145 /* purecov: begin inspected */
146 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_TRANS_CONTEXT_FAILED);
147 return 1;
148 /* purecov: end */
149 }
150 transaction_context_packet = new Data_packet(packet->payload, packet->len);
151
152 return error;
153 }
154
get_transaction_context(Pipeline_event * pevent,Transaction_context_log_event ** tcle)155 int Certification_handler::get_transaction_context(
156 Pipeline_event *pevent, Transaction_context_log_event **tcle) {
157 DBUG_TRACE;
158 int error = 0;
159
160 DBUG_ASSERT(transaction_context_packet != nullptr);
161 DBUG_ASSERT(transaction_context_pevent == nullptr);
162
163 Format_description_log_event *fdle = nullptr;
164 if (pevent->get_FormatDescription(&fdle) && (fdle == nullptr)) {
165 /* purecov: begin inspected */
166 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_FORMAT_DESC_LOG_EVENT_FAILED);
167 return 1;
168 /* purecov: end */
169 }
170
171 transaction_context_pevent =
172 new Pipeline_event(transaction_context_packet, fdle);
173 Log_event *transaction_context_event = nullptr;
174 error = transaction_context_pevent->get_LogEvent(&transaction_context_event);
175 transaction_context_packet = nullptr;
176 DBUG_EXECUTE_IF("certification_handler_force_error_on_pipeline", error = 1;);
177 if (error || (transaction_context_event == nullptr)) {
178 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_TRANS_CONTEXT_LOG_EVENT_FAILED);
179 return 1;
180 }
181
182 *tcle =
183 static_cast<Transaction_context_log_event *>(transaction_context_event);
184 if ((*tcle)->read_snapshot_version()) {
185 /* purecov: begin inspected */
186 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_SNAPSHOT_VERSION_FAILED);
187 return 1;
188 /* purecov: end */
189 }
190
191 return error;
192 }
193
reset_transaction_context()194 void Certification_handler::reset_transaction_context() {
195 DBUG_TRACE;
196
197 /*
198 Release memory allocated to transaction_context_packet,
199 since it is wrapped by transaction_context_pevent.
200 */
201 delete transaction_context_pevent;
202 transaction_context_pevent = nullptr;
203 }
204
handle_transaction_context(Pipeline_event * pevent,Continuation * cont)205 int Certification_handler::handle_transaction_context(Pipeline_event *pevent,
206 Continuation *cont) {
207 DBUG_TRACE;
208 int error = 0;
209
210 error = set_transaction_context(pevent);
211 if (error)
212 cont->signal(1, true); /* purecov: inspected */
213 else
214 next(pevent, cont);
215
216 return error;
217 }
218
handle_transaction_id(Pipeline_event * pevent,Continuation * cont)219 int Certification_handler::handle_transaction_id(Pipeline_event *pevent,
220 Continuation *cont) {
221 DBUG_TRACE;
222 int error = 0;
223 rpl_gno seq_number = 0;
224 bool local_transaction = true;
225 Transaction_context_log_event *tcle = nullptr;
226 Log_event *event = nullptr;
227 Gtid_log_event *gle = nullptr;
228 std::list<Gcs_member_identifier> *online_members =
229 pevent->get_online_members();
230
231 /*
232 Get transaction context.
233 */
234 error = get_transaction_context(pevent, &tcle);
235 if (error) {
236 cont->signal(1, true);
237 goto end;
238 }
239
240 /*
241 Get transaction global identifier event.
242 */
243 error = pevent->get_LogEvent(&event);
244 if (error || (event == nullptr)) {
245 /* purecov: begin inspected */
246 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_GTID_LOG_EVENT_FAILED);
247 cont->signal(1, true);
248 error = 1;
249 goto end;
250 /* purecov: end */
251 }
252 gle = static_cast<Gtid_log_event *>(event);
253
254 local_transaction =
255 !strncmp(tcle->get_server_uuid(), local_member_info->get_uuid().c_str(),
256 UUID_LENGTH);
257
258 /*
259 Group contains members that do not support transactions with group
260 coordination, thence the transaction must rollback.
261 */
262 DBUG_EXECUTE_IF(
263 "group_replication_force_lower_version_on_group_replication_consistency",
264 { online_members = nullptr; };);
265 if (pevent->get_consistency_level() >= GROUP_REPLICATION_CONSISTENCY_AFTER &&
266 nullptr == online_members) {
267 goto after_certify;
268 }
269
270 /*
271 Certify transaction.
272 */
273 seq_number =
274 cert_module->certify(tcle->get_snapshot_version(), tcle->get_write_set(),
275 !tcle->is_gtid_specified(), tcle->get_server_uuid(),
276 gle, local_transaction);
277
278 after_certify:
279 if (local_transaction) {
280 /*
281 Local transaction.
282 After certification we need to wake up the waiting thread on the
283 plugin to proceed with the transaction processing.
284 Sequence number <= 0 means abort, so we need to pass a negative
285 value to transaction context.
286 */
287 Transaction_termination_ctx transaction_termination_ctx;
288 memset(&transaction_termination_ctx, 0,
289 sizeof(transaction_termination_ctx));
290 transaction_termination_ctx.m_thread_id = tcle->get_thread_id();
291 if (seq_number > 0) {
292 transaction_termination_ctx.m_rollback_transaction = false;
293 if (tcle->is_gtid_specified()) {
294 transaction_termination_ctx.m_generated_gtid = false;
295 } else {
296 transaction_termination_ctx.m_generated_gtid = true;
297 transaction_termination_ctx.m_sidno = group_sidno;
298 transaction_termination_ctx.m_gno = seq_number;
299 }
300 } else {
301 transaction_termination_ctx.m_rollback_transaction = true;
302 transaction_termination_ctx.m_generated_gtid = false;
303 transaction_termination_ctx.m_sidno = -1;
304 transaction_termination_ctx.m_gno = -1;
305 }
306
307 if (set_transaction_ctx(transaction_termination_ctx)) {
308 /* purecov: begin inspected */
309 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_UPDATE_SERV_CERTIFICATE_FAILED,
310 tcle->get_thread_id());
311 cont->signal(1, true);
312 error = 1;
313 goto end;
314 /* purecov: end */
315 }
316
317 if (seq_number > 0) {
318 const rpl_sid *sid = nullptr;
319 rpl_sidno sidno = group_sidno;
320 rpl_gno gno = seq_number;
321
322 if (tcle->is_gtid_specified()) {
323 sid = gle->get_sid();
324 sidno = gle->get_sidno(true);
325 gno = gle->get_gno();
326 error =
327 cert_module->add_specified_gtid_to_group_gtid_executed(gle, true);
328 DBUG_EXECUTE_IF("unable_to_add_specified_gtid_for_local_transaction",
329 error = 1;);
330
331 if (error) {
332 LogPluginErr(ERROR_LEVEL,
333 ER_GRP_RPL_ADD_GTID_INFO_WITH_LOCAL_GTID_FAILED);
334 transactions_latch->releaseTicket(tcle->get_thread_id());
335 cont->signal(1, true);
336 goto end;
337 }
338 } else {
339 if (cert_module->add_group_gtid_to_group_gtid_executed(gno, true)) {
340 /* purecov: begin inspected */
341 LogPluginErr(ERROR_LEVEL,
342 ER_GRP_RPL_ADD_GTID_INFO_WITHOUT_LOCAL_GTID_FAILED);
343 transactions_latch->releaseTicket(tcle->get_thread_id());
344 cont->signal(1, true);
345 error = 1;
346 goto end;
347 /* purecov: end */
348 }
349 }
350
351 if (pevent->get_consistency_level() >=
352 GROUP_REPLICATION_CONSISTENCY_AFTER) {
353 Transaction_consistency_info *transaction_consistency_info =
354 new Transaction_consistency_info(
355 tcle->get_thread_id(), local_transaction, sid, sidno, gno,
356 pevent->get_consistency_level(), pevent->get_online_members());
357 pevent->release_online_members_memory_ownership();
358 if (transaction_consistency_manager->after_certification(
359 transaction_consistency_info)) {
360 /* purecov: begin inspected */
361 delete transaction_consistency_info;
362 cont->signal(1, true);
363 error = 1;
364 goto end;
365 /* purecov: end */
366 }
367 }
368 }
369
370 /*
371 We only release the local transaction here when its consistency
372 does not require group coordination.
373 */
374 if ((seq_number <= 0 || pevent->get_consistency_level() <
375 GROUP_REPLICATION_CONSISTENCY_AFTER) &&
376 transactions_latch->releaseTicket(tcle->get_thread_id())) {
377 /* purecov: begin inspected */
378 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_NOTIFY_CERTIFICATION_OUTCOME_FAILED);
379 cont->signal(1, true);
380 error = 1;
381 goto end;
382 /* purecov: end */
383 }
384
385 // The pipeline ended for this transaction
386 cont->signal(0, true);
387 } else {
388 /*
389 Remote transaction.
390 */
391 if (seq_number > 0) {
392 const rpl_sid *sid = nullptr;
393 rpl_sidno sidno = group_sidno;
394 rpl_gno gno = seq_number;
395
396 if (!tcle->is_gtid_specified()) {
397 // Create new GTID event.
398 Gtid gtid = {sidno, gno};
399 Gtid_specification gtid_specification = {ASSIGNED_GTID, gtid};
400 Gtid_log_event *gle_generated = new Gtid_log_event(
401 gle->server_id, gle->is_using_trans_cache(), gle->last_committed,
402 gle->sequence_number, gle->may_have_sbr_stmts,
403 gle->original_commit_timestamp, gle->immediate_commit_timestamp,
404 gtid_specification, gle->original_server_version,
405 gle->immediate_server_version);
406 // Copy the transaction length to the new event.
407 gle_generated->set_trx_length(gle->transaction_length);
408
409 pevent->reset_pipeline_event();
410 pevent->set_LogEvent(gle_generated);
411
412 // Add the gtid information in the executed gtid set for the remote
413 // transaction which have gtid specified.
414 if (cert_module->add_group_gtid_to_group_gtid_executed(gno, false)) {
415 /* purecov: begin inspected */
416 LogPluginErr(ERROR_LEVEL,
417 ER_GRP_RPL_ADD_GTID_INFO_WITHOUT_REMOTE_GTID_FAILED);
418 cont->signal(1, true);
419 error = 1;
420 goto end;
421 /* purecov: end */
422 }
423 }
424
425 else {
426 sid = gle->get_sid();
427 sidno = gle->get_sidno(true);
428 gno = gle->get_gno();
429 error =
430 cert_module->add_specified_gtid_to_group_gtid_executed(gle, false);
431 DBUG_EXECUTE_IF("unable_to_add_specified_gtid_for_remote_transaction",
432 error = 1;);
433
434 if (error) {
435 /* purecov: begin inspected */
436 LogPluginErr(ERROR_LEVEL,
437 ER_GRP_RPL_ADD_GTID_INFO_WITH_REMOTE_GTID_FAILED);
438 cont->signal(1, true);
439 goto end;
440 /* purecov: end */
441 }
442 }
443
444 if (pevent->get_consistency_level() >=
445 GROUP_REPLICATION_CONSISTENCY_AFTER) {
446 Transaction_consistency_info *transaction_consistency_info =
447 new Transaction_consistency_info(
448 tcle->get_thread_id(), local_transaction, sid, sidno, gno,
449 pevent->get_consistency_level(), pevent->get_online_members());
450 pevent->release_online_members_memory_ownership();
451 if (transaction_consistency_manager->after_certification(
452 transaction_consistency_info)) {
453 /* purecov: begin inspected */
454 delete transaction_consistency_info;
455 cont->signal(1, true);
456 error = 1;
457 goto end;
458 /* purecov: end */
459 }
460 }
461
462 // Pass transaction to next action.
463 next(pevent, cont);
464 } else if (seq_number < 0) {
465 error = 1;
466 cont->signal(1, true);
467 goto end;
468 } else {
469 // The transaction was negatively certified so discard it.
470 cont->signal(0, true);
471 }
472 }
473
474 end:
475 reset_transaction_context();
476 return error;
477 }
478
extract_certification_info(Pipeline_event * pevent,Continuation * cont)479 int Certification_handler::extract_certification_info(Pipeline_event *pevent,
480 Continuation *cont) {
481 DBUG_TRACE;
482 int error = 0;
483
484 if (pevent->get_event_context() != SINGLE_VIEW_EVENT) {
485 /*
486 If the current view event is embraced on a transaction:
487 GTID, BEGIN, VIEW, COMMIT; it means that we are handling
488 a view that was delivered by a asynchronous channel from
489 outside of the group.
490 On that case we just have to queue it on the group applier
491 channel, without any special handling.
492 */
493 next(pevent, cont);
494 return error;
495 }
496
497 /*
498 If the current view event is a standalone event (not inside a
499 transaction), it means that it was injected from GCS on a
500 membership change.
501 On that case we need to queue it on the group applier wrapped
502 on a transaction with a group generated GTID.
503 */
504
505 /*
506 If there are pending view changes to apply, apply them first.
507 If we can't apply the old VCLEs probably we can't apply the new one
508 */
509 if (unlikely(m_view_change_event_on_wait)) {
510 error = log_delayed_view_change_events(cont);
511 m_view_change_event_on_wait = !pending_view_change_events.empty();
512 }
513
514 std::string local_gtid_certified_string;
515 rpl_gno view_change_event_gno = -1;
516 if (!error) {
517 error = log_view_change_event_in_order(pevent, local_gtid_certified_string,
518 &view_change_event_gno, cont);
519 }
520
521 /*
522 If there are was a timeout applying this or an older view change,
523 just store the event for future application.
524 */
525 if (error) {
526 if (LOCAL_WAIT_TIMEOUT_ERROR == error) {
527 error = store_view_event_for_delayed_logging(
528 pevent, local_gtid_certified_string, view_change_event_gno, cont);
529 LogPluginErr(WARNING_LEVEL, ER_GRP_DELAYED_VCLE_LOGGING);
530 if (error)
531 cont->signal(1, false);
532 else
533 cont->signal(0, cont->is_transaction_discarded());
534 } else
535 cont->signal(1, false);
536 }
537
538 return error;
539 }
540
log_delayed_view_change_events(Continuation * cont)541 int Certification_handler::log_delayed_view_change_events(Continuation *cont) {
542 DBUG_TRACE;
543
544 int error = 0;
545
546 while (!pending_view_change_events.empty() && !error) {
547 View_change_stored_info *stored_view_info =
548 pending_view_change_events.front();
549 error = log_view_change_event_in_order(
550 stored_view_info->view_change_pevent,
551 stored_view_info->local_gtid_certified,
552 &(stored_view_info->view_change_event_gno), cont);
553 // if we timeout keep the event
554 if (LOCAL_WAIT_TIMEOUT_ERROR != error) {
555 delete stored_view_info->view_change_pevent;
556 delete stored_view_info;
557 pending_view_change_events.pop_front();
558 }
559 }
560 return error;
561 }
562
store_view_event_for_delayed_logging(Pipeline_event * pevent,std::string & local_gtid_certified_string,rpl_gno event_gno,Continuation * cont)563 int Certification_handler::store_view_event_for_delayed_logging(
564 Pipeline_event *pevent, std::string &local_gtid_certified_string,
565 rpl_gno event_gno, Continuation *cont) {
566 DBUG_TRACE;
567
568 int error = 0;
569
570 Log_event *event = nullptr;
571 error = pevent->get_LogEvent(&event);
572 if (error || (event == nullptr)) {
573 /* purecov: begin inspected */
574 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_VIEW_CHANGE_LOG_EVENT_FAILED);
575 return 1;
576 /* purecov: end */
577 }
578 View_change_log_event *vchange_event =
579 static_cast<View_change_log_event *>(event);
580 std::string view_change_event_id(vchange_event->get_view_id());
581
582 // -1 means there was a second timeout on a VCLE that we already delayed
583 if (view_change_event_id != "-1") {
584 m_view_change_event_on_wait = true;
585 View_change_stored_info *vcle_info = new View_change_stored_info(
586 pevent, local_gtid_certified_string, event_gno);
587 pending_view_change_events.push_back(vcle_info);
588 // Use the discard flag to let the applier know this was delayed
589 cont->set_transation_discarded(true);
590 }
591
592 // Add a packet back to the applier queue so it is processed in a later stage.
593 std::string delayed_view_id("-1");
594 View_change_packet *view_change_packet =
595 new View_change_packet(delayed_view_id);
596 applier_module->add_view_change_packet(view_change_packet);
597
598 return error;
599 }
600
wait_for_local_transaction_execution(std::string & local_gtid_certified_string)601 int Certification_handler::wait_for_local_transaction_execution(
602 std::string &local_gtid_certified_string) {
603 DBUG_TRACE;
604 int error = 0;
605
606 if (local_gtid_certified_string.empty()) {
607 if (!cert_module->get_local_certified_gtid(local_gtid_certified_string)) {
608 return 0; // set is empty, we don't need to wait
609 }
610 }
611
612 Sql_service_command_interface *sql_command_interface =
613 new Sql_service_command_interface();
614
615 if (sql_command_interface->establish_session_connection(PSESSION_USE_THREAD,
616 GROUPREPL_USER)) {
617 /* purecov: begin inspected */
618 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_CONTACT_WITH_SRV_FAILED);
619 delete sql_command_interface;
620 return 1;
621 /* purecov: end */
622 }
623
624 if ((error = sql_command_interface->wait_for_server_gtid_executed(
625 local_gtid_certified_string, GTID_WAIT_TIMEOUT))) {
626 /* purecov: begin inspected */
627 if (error == -1) // timeout
628 {
629 LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_SRV_WAIT_TIME_OUT);
630 error = LOCAL_WAIT_TIMEOUT_ERROR;
631 } else {
632 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_SRV_GTID_WAIT_ERROR);
633 }
634 /* purecov: end */
635 }
636 delete sql_command_interface;
637 return error;
638 }
639
inject_transactional_events(Pipeline_event * pevent,rpl_gno * event_gno,Continuation * cont)640 int Certification_handler::inject_transactional_events(Pipeline_event *pevent,
641 rpl_gno *event_gno,
642 Continuation *cont) {
643 DBUG_TRACE;
644 Log_event *event = nullptr;
645 Format_description_log_event *fd_event = nullptr;
646
647 if (pevent->get_LogEvent(&event) || (event == nullptr)) {
648 /* purecov: begin inspected */
649 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_LOG_EVENT_FAILED);
650 cont->signal(1, true);
651 return 1;
652 /* purecov: end */
653 }
654
655 if (pevent->get_FormatDescription(&fd_event) && (fd_event == nullptr)) {
656 /* purecov: begin inspected */
657 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_FORMAT_DESC_LOG_EVENT_FAILED);
658 cont->signal(1, true);
659 return 1;
660 /* purecov: end */
661 }
662
663 // GTID event
664
665 if (*event_gno == -1) {
666 *event_gno = cert_module->generate_view_change_group_gno();
667 }
668 Gtid gtid = {group_sidno, *event_gno};
669 if (gtid.gno <= 0) {
670 cont->signal(1, true);
671 return 1;
672 }
673 Gtid_specification gtid_specification = {ASSIGNED_GTID, gtid};
674 /**
675 The original_commit_timestamp of this Gtid_log_event will be zero
676 because the transaction corresponds to a View_change_event, which is
677 generated and committed locally by all members. Consequently, there is no
678 'original master'. So, instead of each member generating a GTID with
679 its own unique original_commit_timestamp (and violating the property that
680 the original_commit_timestamp is the same for a given GTID), this timestamp
681 will not be defined.
682 */
683 uint32_t server_version = do_server_version_int(::server_version);
684 Gtid_log_event *gtid_log_event =
685 new Gtid_log_event(event->server_id, true, 0, 0, true, 0, 0,
686 gtid_specification, server_version, server_version);
687
688 Pipeline_event *gtid_pipeline_event =
689 new Pipeline_event(gtid_log_event, fd_event);
690 next(gtid_pipeline_event, cont);
691
692 int error = cont->wait();
693 delete gtid_pipeline_event;
694 if (error) {
695 return 0; /* purecov: inspected */
696 }
697
698 // BEGIN event
699
700 Log_event *begin_log_event = new Query_log_event(
701 applier_module_thd, STRING_WITH_LEN("BEGIN"), true, false, true, 0, true);
702
703 Pipeline_event *begin_pipeline_event =
704 new Pipeline_event(begin_log_event, fd_event);
705 next(begin_pipeline_event, cont);
706
707 error = cont->wait();
708 delete begin_pipeline_event;
709 if (error) {
710 return 0; /* purecov: inspected */
711 }
712
713 /*
714 Queues the given event.
715 As we don't have asynchronous we can use the received Continuation.
716 If that is no longer true, another Continuation object must be created here.
717 */
718 next(pevent, cont);
719 error = cont->wait();
720 if (error) {
721 return 0; /* purecov: inspected */
722 }
723
724 // COMMIT event
725
726 Log_event *end_log_event =
727 new Query_log_event(applier_module_thd, STRING_WITH_LEN("COMMIT"), true,
728 false, true, 0, true);
729
730 Pipeline_event *end_pipeline_event =
731 new Pipeline_event(end_log_event, fd_event);
732 next(end_pipeline_event, cont);
733 delete end_pipeline_event;
734
735 return 0;
736 }
737
log_view_change_event_in_order(Pipeline_event * view_pevent,std::string & local_gtid_string,rpl_gno * event_gno,Continuation * cont)738 int Certification_handler::log_view_change_event_in_order(
739 Pipeline_event *view_pevent, std::string &local_gtid_string,
740 rpl_gno *event_gno, Continuation *cont) {
741 DBUG_TRACE;
742
743 int error = 0;
744 bool first_log_attempt = (*event_gno == -1);
745
746 Log_event *event = nullptr;
747 error = view_pevent->get_LogEvent(&event);
748 if (error || (event == nullptr)) {
749 /* purecov: begin inspected */
750 LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_FETCH_VIEW_CHANGE_LOG_EVENT_FAILED);
751 return 1;
752 /* purecov: end */
753 }
754 View_change_log_event *vchange_event =
755 static_cast<View_change_log_event *>(event);
756 std::string view_change_event_id(vchange_event->get_view_id());
757
758 // We are just logging old event(s), this packet was created to delay that
759 // process
760 if (unlikely(view_change_event_id == "-1")) return 0;
761
762 if (first_log_attempt) {
763 std::map<std::string, std::string> cert_info;
764 cert_module->get_certification_info(&cert_info);
765 size_t event_size = 0;
766 vchange_event->set_certification_info(&cert_info, &event_size);
767
768 /*
769 If certification information is too big this event can't be transmitted
770 as it would cause failures on all group members.
771 To avoid this, we now instead encode an error that will make the joiner
772 leave the group.
773 */
774 if (event_size > get_slave_max_allowed_packet()) {
775 cert_info.clear();
776 cert_info[Certifier::CERTIFICATION_INFO_ERROR_NAME] =
777 "Certification information is too large for transmission.";
778 vchange_event->set_certification_info(&cert_info, &event_size);
779 }
780 }
781
782 // Assure the last known local transaction was already executed
783 error = wait_for_local_transaction_execution(local_gtid_string);
784
785 if (!error) {
786 /**
787 Create a transactional block for the View change log event
788 GTID
789 BEGIN
790 VCLE
791 COMMIT
792 */
793 error = inject_transactional_events(view_pevent, event_gno, cont);
794 } else if (LOCAL_WAIT_TIMEOUT_ERROR == error && first_log_attempt) {
795 // Even if we can't log it, register the position
796 *event_gno = cert_module->generate_view_change_group_gno();
797 }
798
799 return error;
800 }
801
is_unique()802 bool Certification_handler::is_unique() { return true; }
803
get_role()804 int Certification_handler::get_role() { return CERTIFIER; }
805
get_certifier()806 Certifier_interface *Certification_handler::get_certifier() {
807 return cert_module;
808 }
809