1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include <signal.h>
24 #include <map>
25
26 #include "certifier.h"
27 #include "plugin.h"
28 #include "plugin_log.h"
29 #include "observer_trans.h"
30
31 #include "sql_service_command.h"
32
33 const std::string Certifier::GTID_EXTRACTED_NAME= "gtid_extracted";
34 const std::string Certifier::CERTIFICATION_INFO_ERROR_NAME =
35 "certification_info_error";
36
launch_broadcast_thread(void * arg)37 static void *launch_broadcast_thread(void* arg)
38 {
39 Certifier_broadcast_thread *handler= (Certifier_broadcast_thread*) arg;
40 handler->dispatcher();
41 return 0;
42 }
43
Certifier_broadcast_thread()44 Certifier_broadcast_thread::Certifier_broadcast_thread()
45 :aborted(false), broadcast_thd_running(false), broadcast_counter(0),
46 broadcast_gtid_executed_period(BROADCAST_GTID_EXECUTED_PERIOD)
47 {
48 DBUG_EXECUTE_IF("group_replication_certifier_broadcast_thread_big_period",
49 { broadcast_gtid_executed_period= 600; });
50
51 mysql_mutex_init(key_GR_LOCK_cert_broadcast_run,
52 &broadcast_run_lock,
53 MY_MUTEX_INIT_FAST);
54 mysql_cond_init(key_GR_COND_cert_broadcast_run,
55 &broadcast_run_cond);
56 mysql_mutex_init(key_GR_LOCK_cert_broadcast_dispatcher_run,
57 &broadcast_dispatcher_lock,
58 MY_MUTEX_INIT_FAST);
59 mysql_cond_init(key_GR_COND_cert_broadcast_dispatcher_run,
60 &broadcast_dispatcher_cond);
61 }
62
~Certifier_broadcast_thread()63 Certifier_broadcast_thread::~Certifier_broadcast_thread()
64 {
65 mysql_mutex_destroy(&broadcast_run_lock);
66 mysql_cond_destroy(&broadcast_run_cond);
67 mysql_mutex_destroy(&broadcast_dispatcher_lock);
68 mysql_cond_destroy(&broadcast_dispatcher_cond);
69 }
70
initialize()71 int Certifier_broadcast_thread::initialize()
72 {
73 DBUG_ENTER("Certifier_broadcast_thread::initialize");
74
75 mysql_mutex_lock(&broadcast_run_lock);
76 if (broadcast_thd_running)
77 {
78 mysql_mutex_unlock(&broadcast_run_lock); /* purecov: inspected */
79 DBUG_RETURN(0); /* purecov: inspected */
80 }
81
82 aborted= false;
83
84 if ((mysql_thread_create(key_GR_THD_cert_broadcast,
85 &broadcast_pthd,
86 get_connection_attrib(),
87 launch_broadcast_thread,
88 (void*)this)))
89 {
90 mysql_mutex_unlock(&broadcast_run_lock); /* purecov: inspected */
91 DBUG_RETURN(1); /* purecov: inspected */
92 }
93
94 while (!broadcast_thd_running)
95 {
96 DBUG_PRINT("sleep",("Waiting for certifier broadcast thread to start"));
97 mysql_cond_wait(&broadcast_run_cond, &broadcast_run_lock);
98 }
99 mysql_mutex_unlock(&broadcast_run_lock);
100
101 DBUG_RETURN(0);
102 }
103
104
terminate()105 int Certifier_broadcast_thread::terminate()
106 {
107 DBUG_ENTER("Certifier_broadcast_thread::terminate");
108
109 mysql_mutex_lock(&broadcast_run_lock);
110 if (!broadcast_thd_running)
111 {
112 mysql_mutex_unlock(&broadcast_run_lock);
113 DBUG_RETURN(0);
114 }
115
116 aborted= true;
117 while (broadcast_thd_running)
118 {
119 DBUG_PRINT("loop", ("killing certifier broadcast thread"));
120 mysql_mutex_lock(&broadcast_thd->LOCK_thd_data);
121
122 //awake the cycle
123 mysql_mutex_lock(&broadcast_dispatcher_lock);
124 mysql_cond_broadcast(&broadcast_dispatcher_cond);
125 mysql_mutex_unlock(&broadcast_dispatcher_lock);
126
127 broadcast_thd->awake(THD::NOT_KILLED);
128 mysql_mutex_unlock(&broadcast_thd->LOCK_thd_data);
129 mysql_cond_wait(&broadcast_run_cond, &broadcast_run_lock);
130 }
131 mysql_mutex_unlock(&broadcast_run_lock);
132
133 DBUG_RETURN(0);
134 }
135
136
dispatcher()137 void Certifier_broadcast_thread::dispatcher()
138 {
139 DBUG_ENTER("Certifier_broadcast_thread::dispatcher");
140
141 //Thread context operations
142 my_thread_init();
143 THD *thd= new THD;
144 thd->set_new_thread_id();
145 thd->thread_stack= (char*) &thd;
146 thd->store_globals();
147 global_thd_manager_add_thd(thd);
148 broadcast_thd= thd;
149
150 mysql_mutex_lock(&broadcast_run_lock);
151 broadcast_thd_running= true;
152 mysql_cond_broadcast(&broadcast_run_cond);
153 mysql_mutex_unlock(&broadcast_run_lock);
154
155 struct timespec abstime;
156 while (!aborted)
157 {
158 broadcast_counter++;
159
160 applier_module->get_pipeline_stats_member_collector()
161 ->send_stats_member_message();
162
163 applier_module->get_flow_control_module()->flow_control_step();
164
165 if (broadcast_counter % broadcast_gtid_executed_period == 0)
166 broadcast_gtid_executed();
167
168 mysql_mutex_lock(&broadcast_dispatcher_lock);
169 if (aborted)
170 {
171 mysql_mutex_unlock(&broadcast_dispatcher_lock); /* purecov: inspected */
172 break; /* purecov: inspected */
173 }
174 set_timespec(&abstime, 1);
175 mysql_cond_timedwait(&broadcast_dispatcher_cond,
176 &broadcast_dispatcher_lock, &abstime);
177 mysql_mutex_unlock(&broadcast_dispatcher_lock);
178
179 /*
180 Clear server sessions open caches on transactions observer.
181 TODO: move this to a global scheduler.
182 */
183 if (broadcast_counter % 300 == 0)
184 observer_trans_clear_io_cache_unused_list(); /* purecov: inspected */
185 }
186
187 Gcs_interface_factory::cleanup(Gcs_operations::get_gcs_engine());
188
189 thd->release_resources();
190 global_thd_manager_remove_thd(thd);
191 delete thd;
192
193 mysql_mutex_lock(&broadcast_run_lock);
194 broadcast_thd_running= false;
195 mysql_cond_broadcast(&broadcast_run_cond);
196 mysql_mutex_unlock(&broadcast_run_lock);
197
198 my_thread_end();
199
200 DBUG_VOID_RETURN;
201 }
202
broadcast_gtid_executed()203 int Certifier_broadcast_thread::broadcast_gtid_executed()
204 {
205 DBUG_ENTER("Certifier_broadcast_thread::broadcast_gtid_executed");
206
207 /*
208 Member may be still joining group so we need to check if:
209 1) communication interfaces are ready to be used;
210 2) member is ONLINE, that is, distributed recovery is complete.
211 */
212 if (local_member_info == NULL)
213 DBUG_RETURN(0); /* purecov: inspected */
214 Group_member_info::Group_member_status member_status=
215 local_member_info->get_recovery_status();
216 if (member_status != Group_member_info::MEMBER_ONLINE &&
217 member_status != Group_member_info::MEMBER_IN_RECOVERY)
218 DBUG_RETURN(0);
219
220 int error= 0;
221 uchar *encoded_gtid_executed= NULL;
222 size_t length;
223 get_server_encoded_gtid_executed(&encoded_gtid_executed, &length);
224
225 Gtid_Executed_Message gtid_executed_message;
226 std::vector<uchar> encoded_gtid_executed_message;
227 gtid_executed_message.append_gtid_executed(encoded_gtid_executed, length);
228
229 enum enum_gcs_error send_err=
230 gcs_module->send_message(gtid_executed_message, true);
231 if (send_err == GCS_MESSAGE_TOO_BIG)
232 {
233 log_message(MY_ERROR_LEVEL, "Broadcast of committed transactions message "
234 "failed. Message is too big."); /* purecov: inspected */
235 error= 1; /* purecov: inspected */
236 }
237 else if (send_err == GCS_NOK)
238 {
239 log_message(MY_INFORMATION_LEVEL,
240 "Broadcast of committed transactions message failed."); /* purecov: inspected */
241 error= 1; /* purecov: inspected */
242 }
243
244
245 #if !defined(NDEBUG)
246 char *encoded_gtid_executed_string=
247 encoded_gtid_set_to_string(encoded_gtid_executed, length);
248 DBUG_PRINT("info", ("Certifier broadcast executed_set: %s", encoded_gtid_executed_string));
249 my_free(encoded_gtid_executed_string);
250 #endif
251
252 my_free(encoded_gtid_executed);
253 DBUG_RETURN(error);
254 }
255
256
Certifier()257 Certifier::Certifier()
258 :initialized(false),
259 positive_cert(0), negative_cert(0),
260 parallel_applier_last_committed_global(1),
261 parallel_applier_sequence_number(2),
262 certifying_already_applied_transactions(false),
263 gtid_assignment_block_size(1),
264 gtids_assigned_in_blocks_counter(1),
265 conflict_detection_enable(!local_member_info->in_primary_mode())
266 {
267 last_conflict_free_transaction.clear();
268
269 #if !defined(NDEBUG)
270 certifier_garbage_collection_block= false;
271 /*
272 Debug flag to block the garbage collection and discard incoming stable
273 set messages while garbage collection is on going.
274 */
275 DBUG_EXECUTE_IF("certifier_garbage_collection_block",
276 certifier_garbage_collection_block= true;);
277
278 same_member_message_discarded= false;
279 /*
280 Debug flag to check for similar member sending multiple messages.
281 */
282 DBUG_EXECUTE_IF("certifier_inject_duplicate_certifier_data_message",
283 same_member_message_discarded= true;);
284 #endif
285
286 certification_info_sid_map= new Sid_map(NULL);
287 incoming= new Synchronized_queue<Data_packet*>();
288
289 stable_gtid_set_lock= new Checkable_rwlock(
290 #ifdef HAVE_PSI_INTERFACE
291 key_GR_RWLOCK_cert_stable_gtid_set
292 #endif
293 );
294 stable_sid_map= new Sid_map(stable_gtid_set_lock);
295 stable_gtid_set= new Gtid_set(stable_sid_map, stable_gtid_set_lock);
296 broadcast_thread= new Certifier_broadcast_thread();
297
298 group_gtid_sid_map= new Sid_map(NULL);
299 group_gtid_executed= new Gtid_set(group_gtid_sid_map, NULL);
300 group_gtid_extracted= new Gtid_set(group_gtid_sid_map, NULL);
301
302 last_local_gtid.clear();
303
304 mysql_mutex_init(key_GR_LOCK_certification_info, &LOCK_certification_info,
305 MY_MUTEX_INIT_FAST);
306 mysql_mutex_init(key_GR_LOCK_cert_members, &LOCK_members,
307 MY_MUTEX_INIT_FAST);
308 }
309
310
~Certifier()311 Certifier::~Certifier()
312 {
313 clear_certification_info();
314 delete certification_info_sid_map;
315
316 delete stable_gtid_set;
317 delete stable_sid_map;
318 delete stable_gtid_set_lock;
319 delete broadcast_thread;
320 delete group_gtid_executed;
321 delete group_gtid_extracted;
322 delete group_gtid_sid_map;
323
324 clear_incoming();
325 delete incoming;
326
327 clear_members();
328 mysql_mutex_destroy(&LOCK_certification_info);
329 mysql_mutex_destroy(&LOCK_members);
330 }
331
initialize_server_gtid_set(bool get_server_gtid_retrieved)332 int Certifier::initialize_server_gtid_set(bool get_server_gtid_retrieved)
333 {
334 DBUG_ENTER("initialize_server_gtid_set");
335 mysql_mutex_assert_owner(&LOCK_certification_info);
336 int error= 0;
337 Sql_service_command_interface *sql_command_interface= NULL;
338 std::string gtid_executed;
339 std::string applier_retrieved_gtids;
340
341 rpl_sid group_sid;
342 if (group_sid.parse(group_name_var) != RETURN_STATUS_OK)
343 {
344 log_message(MY_ERROR_LEVEL,
345 "Unable to parse the group name during"
346 " the Certification module initialization"); /* purecov: inspected */
347 error= 1; /* purecov: inspected */
348 goto end; /* purecov: inspected */
349 }
350
351 group_gtid_sid_map_group_sidno= group_gtid_sid_map->add_sid(group_sid);
352 if (group_gtid_sid_map_group_sidno < 0)
353 {
354 log_message(MY_ERROR_LEVEL,
355 "Unable to add the group_sid in the group_gtid_sid_map during"
356 " the Certification module initialization"); /* purecov: inspected */
357 error= 1; /* purecov: inspected */
358 goto end; /* purecov: inspected */
359 }
360
361 if (group_gtid_executed->ensure_sidno(group_gtid_sid_map_group_sidno) != RETURN_STATUS_OK)
362 {
363 log_message(MY_ERROR_LEVEL,
364 "Error updating group_gtid_executed GITD set during"
365 " the Certification module initialization"); /* purecov: inspected */
366 error= 1; /* purecov: inspected */
367 goto end; /* purecov: inspected */
368 }
369
370 if (group_gtid_extracted->ensure_sidno(group_gtid_sid_map_group_sidno) != RETURN_STATUS_OK)
371 {
372 log_message(MY_ERROR_LEVEL,
373 "Unable to handle the donor's transaction information"
374 " when initializing the conflict detection component."
375 " Possible out of memory error."); /* purecov: inspected */
376 error= 1; /* purecov: inspected */
377 goto end; /* purecov: inspected */
378 }
379
380 sql_command_interface= new Sql_service_command_interface();
381 if (sql_command_interface->establish_session_connection(PSESSION_USE_THREAD) ||
382 sql_command_interface->set_interface_user(GROUPREPL_USER))
383 {
384 log_message(MY_ERROR_LEVEL,
385 "Error when establishing a server connection during"
386 " the Certification module initialization"); /* purecov: inspected */
387 error= 1; /* purecov: inspected */
388 goto end; /* purecov: inspected */
389 }
390
391 error= sql_command_interface->get_server_gtid_executed(gtid_executed);
392 DBUG_EXECUTE_IF("gr_server_gtid_executed_extraction_error", error=1;);
393 if (error)
394 {
395 log_message(MY_WARNING_LEVEL,
396 "Error when extracting this member GTID executed set."
397 " Certification module can't be properly initialized");
398 goto end;
399 }
400
401 if (group_gtid_executed->add_gtid_text(gtid_executed.c_str()) != RETURN_STATUS_OK)
402 {
403 log_message(MY_ERROR_LEVEL,
404 "Error while adding the server GTID EXECUTED set to the"
405 " group_gtid_execute during the Certification module"
406 " initialization"); /* purecov: inspected */
407 error= 1; /* purecov: inspected */
408 goto end; /* purecov: inspected */
409 }
410
411 if (get_server_gtid_retrieved)
412 {
413 Replication_thread_api applier_channel("group_replication_applier");
414 if (applier_channel.get_retrieved_gtid_set(applier_retrieved_gtids))
415 {
416 log_message(MY_WARNING_LEVEL,
417 "Error when extracting this member retrieved set for its applier."
418 " Certification module can't be properly initialized"); /* purecov: inspected */
419 error= 1; /* purecov: inspected */
420 goto end; /* purecov: inspected */
421 }
422
423 if (group_gtid_executed->add_gtid_text(applier_retrieved_gtids.c_str()) != RETURN_STATUS_OK)
424 {
425 log_message(MY_ERROR_LEVEL,
426 "Error while adding the member retrieved set to the"
427 " group_gtid_executed during the Certification module"
428 " initialization"); /* purecov: inspected */
429 error= 1; /* purecov: inspected */
430 goto end; /* purecov: inspected */
431 }
432 }
433
434 compute_group_available_gtid_intervals();
435
436 end:
437 delete sql_command_interface;
438
439 DBUG_RETURN(error);
440 }
441
compute_group_available_gtid_intervals()442 void Certifier::compute_group_available_gtid_intervals()
443 {
444 DBUG_ENTER("Certifier::compute_group_available_gtid_intervals");
445 mysql_mutex_assert_owner(&LOCK_certification_info);
446
447 gtids_assigned_in_blocks_counter= 1;
448 member_gtids.clear();
449 group_available_gtid_intervals.clear();
450
451 /*
452 Compute the GTID intervals that are available by inverting the
453 group_gtid_executed or group_gtid_extracted intervals.
454 */
455 Gtid_set::Const_interval_iterator ivit(certifying_already_applied_transactions
456 ? group_gtid_extracted
457 : group_gtid_executed,
458 group_gtid_sid_map_group_sidno);
459 #ifndef NDEBUG
460 if (certifying_already_applied_transactions)
461 DBUG_PRINT("Certifier::compute_group_available_gtid_intervals()",
462 ("Generating group transaction intervals from group_gtid_extracted"));
463 #endif
464
465 const Gtid_set::Interval *iv= NULL, *iv_next= NULL;
466
467 // The fist interval: UUID:100 -> we have the interval 1-99
468 if ((iv= ivit.get()) != NULL)
469 {
470 if (iv->start > 1)
471 {
472 Gtid_set::Interval interval= {1, iv->start - 1, NULL};
473 group_available_gtid_intervals.push_back(interval);
474 }
475 }
476
477 // For each used interval find the upper bound and from there
478 // add the free GTIDs up to the next interval or GNO_END.
479 while ((iv= ivit.get()) != NULL)
480 {
481 ivit.next();
482 iv_next= ivit.get();
483
484 rpl_gno start= iv->end;
485 rpl_gno end= GNO_END;
486 if (iv_next != NULL)
487 end= iv_next->start - 1;
488
489 assert(start <= end);
490 Gtid_set::Interval interval= {start, end, NULL};
491 group_available_gtid_intervals.push_back(interval);
492 }
493
494 // No GTIDs used, so the available interval is the complete set.
495 if (group_available_gtid_intervals.size() == 0)
496 {
497 Gtid_set::Interval interval= {1, GNO_END, NULL};
498 group_available_gtid_intervals.push_back(interval);
499 }
500
501 DBUG_VOID_RETURN;
502 }
503
reserve_gtid_block(longlong block_size)504 Gtid_set::Interval Certifier::reserve_gtid_block(longlong block_size)
505 {
506 DBUG_ENTER("Certifier::reserve_gtid_block");
507 assert(block_size > 1);
508 mysql_mutex_assert_owner(&LOCK_certification_info);
509
510 Gtid_set::Interval result;
511
512 // We are out of intervals, we need to force intervals computation.
513 if (group_available_gtid_intervals.size() == 0)
514 compute_group_available_gtid_intervals();
515
516 std::list<Gtid_set::Interval>::iterator it= group_available_gtid_intervals.begin();
517 assert(it != group_available_gtid_intervals.end());
518
519 /*
520 We always have one or more intervals, the only thing to check
521 is if the first interval is exhausted, if so we need to purge
522 it to avoid future use.
523 */
524 if (block_size > it->end - it->start)
525 {
526 result= *it;
527 group_available_gtid_intervals.erase(it);
528 }
529 else
530 {
531 result.start= it->start;
532 result.end= it->start + block_size - 1;
533 it->start= result.end + 1;
534 assert(result.start <= result.end);
535 assert(result.start < it->start);
536 }
537
538 DBUG_RETURN(result);
539 }
540
add_to_group_gtid_executed_internal(rpl_sidno sidno,rpl_gno gno,bool local)541 void Certifier::add_to_group_gtid_executed_internal(rpl_sidno sidno,
542 rpl_gno gno,
543 bool local)
544 {
545 DBUG_ENTER("Certifier::add_to_group_gtid_executed_internal");
546 mysql_mutex_assert_owner(&LOCK_certification_info);
547 group_gtid_executed->_add_gtid(sidno, gno);
548 if (local)
549 {
550 assert(sidno >0 && gno >0);
551 last_local_gtid.set(sidno, gno);
552 }
553 /*
554 We only need to track certified transactions on
555 group_gtid_extracted while:
556 1) certifier is handling already applied transactions
557 on distributed recovery procedure;
558 2) the transaction does have a group GTID.
559 */
560 if (certifying_already_applied_transactions &&
561 sidno == group_gtid_sid_map_group_sidno)
562 group_gtid_extracted->_add_gtid(sidno, gno);
563
564 DBUG_VOID_RETURN;
565 }
566
clear_certification_info()567 void Certifier::clear_certification_info()
568 {
569 for (Certification_info::iterator it= certification_info.begin();
570 it != certification_info.end();
571 ++it)
572 {
573 // We can only delete the last reference.
574 if (it->second->unlink() == 0)
575 delete it->second;
576 }
577
578 certification_info.clear();
579 }
580
581
clear_incoming()582 void Certifier::clear_incoming()
583 {
584 DBUG_ENTER("Certifier::clear_incoming");
585 while (!this->incoming->empty())
586 {
587 Data_packet *packet= NULL;
588 this->incoming->pop(&packet);
589 delete packet;
590 }
591 DBUG_VOID_RETURN;
592 }
593
clear_members()594 void Certifier::clear_members()
595 {
596 DBUG_ENTER("Certifier::clear_members");
597 mysql_mutex_lock(&LOCK_members);
598 members.clear();
599 mysql_mutex_unlock(&LOCK_members);
600 DBUG_VOID_RETURN;
601 }
602
initialize(ulonglong gtid_assignment_block_size)603 int Certifier::initialize(ulonglong gtid_assignment_block_size)
604 {
605 DBUG_ENTER("Certifier::initialize");
606 int error= 0;
607 mysql_mutex_lock(&LOCK_certification_info);
608
609 if (is_initialized())
610 {
611 error= 1; /* purecov: inspected */
612 goto end; /* purecov: inspected */
613 }
614
615 assert(gtid_assignment_block_size >= 1);
616 this->gtid_assignment_block_size= gtid_assignment_block_size;
617
618 /*
619 We need to initialize group_gtid_executed from both GTID_EXECUTED
620 and applier retrieved GTID set to consider the already certified
621 but not yet applied GTIDs, that may exist on applier relay log when
622 this member is the one bootstrapping the group.
623 */
624 if (initialize_server_gtid_set(true))
625 {
626 log_message(MY_ERROR_LEVEL,
627 "Error during Certification module initialization.");
628 error= 1;
629 goto end;
630 }
631
632 error= broadcast_thread->initialize();
633 initialized= !error;
634
635 end:
636 mysql_mutex_unlock(&LOCK_certification_info);
637 DBUG_RETURN(error);
638 }
639
640
terminate()641 int Certifier::terminate()
642 {
643 DBUG_ENTER("Certifier::terminate");
644 int error= 0;
645
646 if (is_initialized())
647 error= broadcast_thread->terminate();
648
649 DBUG_RETURN(error);
650 }
651
652
increment_parallel_applier_sequence_number(bool update_parallel_applier_last_committed_global)653 void Certifier::increment_parallel_applier_sequence_number(
654 bool update_parallel_applier_last_committed_global)
655 {
656 DBUG_ENTER("Certifier::increment_parallel_applier_sequence_number");
657 mysql_mutex_assert_owner(&LOCK_certification_info);
658
659 assert(parallel_applier_last_committed_global <
660 parallel_applier_sequence_number);
661 if (update_parallel_applier_last_committed_global)
662 parallel_applier_last_committed_global= parallel_applier_sequence_number;
663
664 parallel_applier_sequence_number++;
665
666 DBUG_VOID_RETURN;
667 }
668
669
certify(Gtid_set * snapshot_version,std::list<const char * > * write_set,bool generate_group_id,const char * member_uuid,Gtid_log_event * gle,bool local_transaction)670 rpl_gno Certifier::certify(Gtid_set *snapshot_version,
671 std::list<const char*> *write_set,
672 bool generate_group_id,
673 const char *member_uuid,
674 Gtid_log_event *gle,
675 bool local_transaction)
676 {
677 DBUG_ENTER("Certifier::certify");
678 rpl_gno result= 0;
679 const bool has_write_set= !write_set->empty();
680
681 if (!is_initialized())
682 DBUG_RETURN(-1); /* purecov: inspected */
683
684 mysql_mutex_lock(&LOCK_certification_info);
685 int64 transaction_last_committed= parallel_applier_last_committed_global;
686
687 DBUG_EXECUTE_IF("certifier_force_1_negative_certification", {
688 DBUG_SET("-d,certifier_force_1_negative_certification");
689 goto end;});
690
691 if (conflict_detection_enable)
692 {
693 for (std::list<const char*>::iterator it= write_set->begin();
694 it != write_set->end();
695 ++it)
696 {
697 Gtid_set *certified_write_set_snapshot_version=
698 get_certified_write_set_snapshot_version(*it);
699
700 /*
701 If the previous certified transaction snapshot version is not
702 a subset of the incoming transaction snapshot version, the current
703 transaction was executed on top of outdated data, so it will be
704 negatively certified. Otherwise, this transaction is marked
705 certified and goes into applier.
706 */
707 if (certified_write_set_snapshot_version != NULL &&
708 !certified_write_set_snapshot_version->is_subset(snapshot_version))
709 goto end;
710 }
711 }
712
713 if (certifying_already_applied_transactions &&
714 !group_gtid_extracted->is_subset_not_equals(group_gtid_executed))
715 {
716 certifying_already_applied_transactions= false;
717
718 #ifndef NDEBUG
719 char *group_gtid_executed_string= NULL;
720 char *group_gtid_extracted_string= NULL;
721 group_gtid_executed->to_string(&group_gtid_executed_string, true);
722 group_gtid_extracted->to_string(&group_gtid_extracted_string, true);
723 DBUG_PRINT("Certifier::certify()",
724 ("Set certifying_already_applied_transactions to false. "
725 "group_gtid_executed: \"%s\"; group_gtid_extracted_string: \"%s\"",
726 group_gtid_executed_string, group_gtid_extracted_string));
727 my_free(group_gtid_executed_string);
728 my_free(group_gtid_extracted_string);
729 #endif
730 }
731
732 /*
733 If the current transaction doesn't have a specified GTID, one
734 for group UUID will be generated.
735 This situation happens when transactions are executed with
736 GTID_NEXT equal to AUTOMATIC_GROUP (the default case).
737 */
738 if (generate_group_id)
739 {
740 /*
741 We need to ensure that group sidno does exist on snapshot
742 version due to the following scenario:
743 1) Member joins the group.
744 2) Goes through recovery procedure, view change is queued to
745 apply, member is marked ONLINE. This requires
746 --group_replication_recovery_complete_at=TRANSACTIONS_CERTIFIED
747 to happen.
748 3) Despite the view change log event is still being applied,
749 since the member is already ONLINE it can execute
750 transactions. The first transaction from this member will
751 not include any group GTID, since no group transaction is
752 yet applied.
753 4) As a result of this sequence snapshot_version will not
754 contain any group GTID and the below instruction
755 snapshot_version->_add_gtid(group_sidno, result);
756 would fail because of that.
757 */
758 if (snapshot_version->ensure_sidno(group_sidno) != RETURN_STATUS_OK)
759 {
760 log_message(MY_ERROR_LEVEL,
761 "Error updating transaction snapshot version after"
762 " transaction being positively certified"); /* purecov: inspected */
763 goto end; /* purecov: inspected */
764 }
765
766 result= get_group_next_available_gtid(member_uuid);
767 if (result < 0)
768 goto end;
769
770 /*
771 Add generated transaction GTID to transaction snapshot version.
772 */
773 snapshot_version->_add_gtid(group_sidno, result);
774
775 /*
776 Store last conflict free transaction identification.
777 sidno must be relative to group_gtid_sid_map.
778 */
779 last_conflict_free_transaction.set(group_gtid_sid_map_group_sidno,
780 result);
781
782 DBUG_PRINT("info",
783 ("Group replication Certifier: generated transaction "
784 "identifier: %llu", result));
785 }
786 else
787 {
788 /*
789 Check if it is an already used GTID
790 */
791 rpl_sidno sidno_for_group_gtid_sid_map= gle->get_sidno(group_gtid_sid_map);
792 if (sidno_for_group_gtid_sid_map < 1)
793 {
794 log_message(MY_ERROR_LEVEL,
795 "Error fetching transaction sidno after transaction"
796 " being positively certified"); /* purecov: inspected */
797 goto end; /* purecov: inspected */
798 }
799 if (group_gtid_executed->contains_gtid(sidno_for_group_gtid_sid_map, gle->get_gno()))
800 {
801 char buf[rpl_sid::TEXT_LENGTH + 1];
802 gle->get_sid()->to_string(buf);
803
804 log_message(MY_ERROR_LEVEL,
805 "The requested GTID '%s:%lld' was already used, the transaction will rollback"
806 , buf, gle->get_gno());
807 goto end;
808 }
809 /*
810 Add received transaction GTID to transaction snapshot version.
811 */
812 rpl_sidno sidno= gle->get_sidno(snapshot_version->get_sid_map());
813 if (sidno < 1)
814 {
815 log_message(MY_ERROR_LEVEL,
816 "Error fetching transaction sidno after transaction"
817 " being positively certified"); /* purecov: inspected */
818 goto end; /* purecov: inspected */
819 }
820
821 if (snapshot_version->ensure_sidno(sidno) != RETURN_STATUS_OK)
822 {
823 log_message(MY_ERROR_LEVEL,
824 "Error updating transaction snapshot version after"
825 " transaction being positively certified"); /* purecov: inspected */
826 goto end; /* purecov: inspected */
827 }
828 snapshot_version->_add_gtid(sidno, gle->get_gno());
829
830 /*
831 Store last conflict free transaction identification.
832 sidno must be relative to group_gtid_sid_map.
833 */
834 rpl_sidno last_conflict_free_transaction_sidno= gle->get_sidno(group_gtid_sid_map);
835 if (last_conflict_free_transaction_sidno < 1)
836 {
837 log_message(MY_WARNING_LEVEL,
838 "Unable to update last conflict free transaction, "
839 "this transaction will not be tracked on "
840 "performance_schema.replication_group_member_stats.last_conflict_free_transaction"); /* purecov: inspected */
841 }
842 else
843 {
844 last_conflict_free_transaction.set(last_conflict_free_transaction_sidno,
845 gle->get_gno());
846 }
847
848 result= 1;
849 DBUG_PRINT("info",
850 ("Group replication Certifier: there was no transaction identifier "
851 "generated since transaction already had a GTID specified"));
852 }
853
854 /*
855 Add the transaction's write set to certification info.
856 */
857 if (has_write_set)
858 {
859 // Only consider remote transactions for parallel applier indexes.
860 int64 transaction_sequence_number=
861 local_transaction ? -1 : parallel_applier_sequence_number;
862 Gtid_set_ref *snapshot_version_value=
863 new Gtid_set_ref(certification_info_sid_map, transaction_sequence_number);
864 if (snapshot_version_value->add_gtid_set(snapshot_version) != RETURN_STATUS_OK)
865 {
866 result= 0; /* purecov: inspected */
867 delete snapshot_version_value; /* purecov: inspected */
868 log_message(MY_ERROR_LEVEL,
869 "Error updating transaction snapshot version reference "
870 "for internal storage"); /* purecov: inspected */
871 goto end; /* purecov: inspected */
872 }
873
874 for(std::list<const char*>::iterator it= write_set->begin();
875 it != write_set->end();
876 ++it)
877 {
878 int64 item_previous_sequence_number= -1;
879
880 add_item(*it, snapshot_version_value,
881 &item_previous_sequence_number);
882
883 /*
884 Exclude previous sequence number that are smaller than global
885 last committed and that are the current sequence number.
886 transaction_last_committed is initialized with
887 parallel_applier_last_committed_global on the beginning of
888 this method.
889 */
890 if (item_previous_sequence_number > transaction_last_committed &&
891 item_previous_sequence_number != parallel_applier_sequence_number)
892 transaction_last_committed= item_previous_sequence_number;
893 }
894 }
895
896 /*
897 Update parallel applier indexes.
898 */
899 if (!local_transaction)
900 {
901 if (!has_write_set)
902 {
903 /*
904 DDL does not have write-set, so we need to ensure that it
905 is applied without any other transaction in parallel.
906 */
907 transaction_last_committed= parallel_applier_sequence_number - 1;
908 }
909
910 gle->last_committed= transaction_last_committed;
911 gle->sequence_number= parallel_applier_sequence_number;
912 assert(gle->last_committed >= 0);
913 assert(gle->sequence_number > 0);
914 assert(gle->last_committed < gle->sequence_number);
915
916 increment_parallel_applier_sequence_number(!has_write_set);
917 }
918
919 end:
920 update_certified_transaction_count(result>0);
921
922 mysql_mutex_unlock(&LOCK_certification_info);
923 DBUG_PRINT("info", ("Group replication Certifier: certification result: %llu",
924 result));
925 DBUG_RETURN(result);
926 }
927
928
add_specified_gtid_to_group_gtid_executed(Gtid_log_event * gle,bool local)929 int Certifier::add_specified_gtid_to_group_gtid_executed(Gtid_log_event *gle,
930 bool local)
931 {
932 DBUG_ENTER("Certifier::add_specified_gtid_to_group_gtid_executed");
933
934 mysql_mutex_lock(&LOCK_certification_info);
935 rpl_sidno sidno= gle->get_sidno(group_gtid_sid_map);
936
937 if (sidno < 1)
938 {
939 log_message(MY_ERROR_LEVEL,
940 "Error fetching transaction sidno while adding to the "
941 "group_gtid_executed set."); /* purecov: inspected */
942 mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
943 DBUG_RETURN(1); /* purecov: inspected */
944 }
945
946 if (group_gtid_executed->ensure_sidno(sidno) != RETURN_STATUS_OK)
947 {
948 log_message(MY_ERROR_LEVEL,
949 "Error while ensuring the sidno be present in the "
950 "group_gtid_executed"); /* purecov: inspected */
951 mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
952 DBUG_RETURN(1); /* purecov: inspected */
953 }
954
955 add_to_group_gtid_executed_internal(sidno, gle->get_gno(), local);
956
957 mysql_mutex_unlock(&LOCK_certification_info);
958 DBUG_RETURN(0);
959 }
960
add_group_gtid_to_group_gtid_executed(rpl_gno gno,bool local)961 int Certifier::add_group_gtid_to_group_gtid_executed(rpl_gno gno, bool local)
962 {
963 DBUG_ENTER("Certifier::add_group_gtid_to_group_gtid_executed");
964 mysql_mutex_lock(&LOCK_certification_info);
965 add_to_group_gtid_executed_internal(group_gtid_sid_map_group_sidno, gno, local);
966 mysql_mutex_unlock(&LOCK_certification_info);
967 DBUG_RETURN(0);
968 }
969
/*
  This method returns the next GNO for the current transaction. It
  works in one of two ways:

  1) member_uuid == NULL || gtid_assignment_block_size <= 1
     View_change_log_event creation calls this method with
     member_uuid set to NULL to force the event to be created with the
     first available GNO of the group. This ensures that all
     members use the same GNO for it.
     After a View_change_log_event is created we recompute the available
     GNO intervals, so that all members have the same set of available
     GNOs.
     This branch is also used when gtid_assignment_block_size is
     set to 1, meaning that GNOs are assigned sequentially
     according to certification order.

  2) In the second branch we assign GNOs according to the intervals
     assigned to each member.
     To avoid permanent gaps when a member does not use all of its
     assigned GNOs, we periodically recompute the intervals; this
     makes the unused GNOs available to other members.
     The GNO is generated within the interval of available GNOs for
     the given member.
     When a member exhausts its assigned GNOs we reserve more for it
     from the set of available GNOs.
*/
get_group_next_available_gtid(const char * member_uuid)996 rpl_gno Certifier::get_group_next_available_gtid(const char *member_uuid)
997 {
998 DBUG_ENTER("Certifier::get_group_next_available_gtid");
999 mysql_mutex_assert_owner(&LOCK_certification_info);
1000 rpl_gno result= 0;
1001
1002 if (member_uuid == NULL || gtid_assignment_block_size <= 1)
1003 {
1004 result= get_group_next_available_gtid_candidate(1, GNO_END);
1005 if (result < 0)
1006 {
1007 assert(result == -1);
1008 DBUG_RETURN(result);
1009 }
1010
1011 /*
1012 If we did log a view change event we need to recompute
1013 intervals, so that all members start from the same
1014 intervals.
1015 */
1016 if (member_uuid == NULL && gtid_assignment_block_size > 1)
1017 compute_group_available_gtid_intervals();
1018 }
1019 else
1020 {
1021 /*
1022 After a number of rounds equal to block size the blocks are
1023 collected back so that the GTID holes can be filled up by
1024 following transactions from other members.
1025 */
1026 if (gtids_assigned_in_blocks_counter % (gtid_assignment_block_size + 1) == 0)
1027 compute_group_available_gtid_intervals();
1028
1029 /*
1030 GTID is assigned in blocks to each member and are consumed
1031 from that block unless a new block is needed.
1032 */
1033 std::string member(member_uuid);
1034 std::map<std::string, Gtid_set::Interval>::iterator it=
1035 member_gtids.find(member);
1036
1037 if (it == member_gtids.end())
1038 {
1039 // There is no block assigned to this member so get one.
1040 std::pair<std::map<std::string, Gtid_set::Interval>::iterator, bool> insert_ret;
1041 std::pair<std::string, Gtid_set::Interval> member_pair(member,
1042 reserve_gtid_block(gtid_assignment_block_size));
1043 insert_ret= member_gtids.insert(member_pair);
1044 assert(insert_ret.second == true);
1045 it= insert_ret.first;
1046 }
1047
1048 result= get_group_next_available_gtid_candidate(it->second.start,
1049 it->second.end);
1050 while (result == -2)
1051 {
1052 // Block has no available GTIDs, reserve more.
1053 it->second= reserve_gtid_block(gtid_assignment_block_size);
1054 result= get_group_next_available_gtid_candidate(it->second.start,
1055 it->second.end);
1056 }
1057 if (result < 0)
1058 DBUG_RETURN(result);
1059
1060 it->second.start= result;
1061 gtids_assigned_in_blocks_counter++;
1062 }
1063
1064 assert(result > 0);
1065 DBUG_RETURN(result);
1066 }
1067
1068 rpl_gno
get_group_next_available_gtid_candidate(rpl_gno start,rpl_gno end) const1069 Certifier::get_group_next_available_gtid_candidate(rpl_gno start,
1070 rpl_gno end) const
1071 {
1072 DBUG_ENTER("Certifier::get_group_next_available_gtid_candidate");
1073 assert(start > 0);
1074 assert(start <= end);
1075 mysql_mutex_assert_owner(&LOCK_certification_info);
1076
1077 rpl_gno candidate= start;
1078 Gtid_set::Const_interval_iterator ivit(certifying_already_applied_transactions
1079 ? group_gtid_extracted
1080 : group_gtid_executed,
1081 group_gtid_sid_map_group_sidno);
1082 #ifndef NDEBUG
1083 if (certifying_already_applied_transactions)
1084 DBUG_PRINT("Certifier::get_group_next_available_gtid_candidate()",
1085 ("Generating group transaction id from group_gtid_extracted"));
1086 #endif
1087
1088 /*
1089 Walk through available intervals until we find the correct one
1090 or return GNO exhausted error.
1091 */
1092 while (true)
1093 {
1094 assert(candidate >= start);
1095 const Gtid_set::Interval *iv= ivit.get();
1096 rpl_gno next_interval_start= iv != NULL ? iv->start : GNO_END;
1097
1098 // Correct interval.
1099 if (candidate < next_interval_start)
1100 {
1101 if (candidate <= end)
1102 DBUG_RETURN(candidate);
1103 else
1104 DBUG_RETURN(-2);
1105 }
1106
1107 if (iv == NULL)
1108 {
1109 log_message(MY_ERROR_LEVEL,
1110 "Impossible to generate Global Transaction Identifier: "
1111 "the integer component reached the maximal value. Restart "
1112 "the group with a new group_replication_group_name.");
1113 DBUG_RETURN(-1);
1114 }
1115
1116 candidate= std::max(candidate, iv->end);
1117 ivit.next();
1118 }
1119 }
1120
add_item(const char * item,Gtid_set_ref * snapshot_version,int64 * item_previous_sequence_number)1121 bool Certifier::add_item(const char* item, Gtid_set_ref *snapshot_version,
1122 int64 *item_previous_sequence_number)
1123 {
1124 DBUG_ENTER("Certifier::add_item");
1125 mysql_mutex_assert_owner(&LOCK_certification_info);
1126 bool error= true;
1127 std::string key(item);
1128 Certification_info::iterator it= certification_info.find(key);
1129 snapshot_version->link();
1130
1131 if (it == certification_info.end())
1132 {
1133 std::pair<Certification_info::iterator, bool> ret=
1134 certification_info.insert(std::pair<std::string, Gtid_set_ref*>
1135 (key, snapshot_version));
1136 error= !ret.second;
1137 }
1138 else
1139 {
1140 *item_previous_sequence_number=
1141 it->second->get_parallel_applier_sequence_number();
1142
1143 if (it->second->unlink() == 0)
1144 delete it->second;
1145
1146 it->second= snapshot_version;
1147 error= false;
1148 }
1149
1150 DBUG_RETURN(error);
1151 }
1152
1153
get_certified_write_set_snapshot_version(const char * item)1154 Gtid_set *Certifier::get_certified_write_set_snapshot_version(const char* item)
1155 {
1156 DBUG_ENTER("Certifier::get_certified_write_set_snapshot_version");
1157 mysql_mutex_assert_owner(&LOCK_certification_info);
1158
1159 if (!is_initialized())
1160 DBUG_RETURN(NULL); /* purecov: inspected */
1161
1162 Certification_info::iterator it;
1163 std::string item_str(item);
1164
1165 it= certification_info.find(item_str);
1166
1167 if (it == certification_info.end())
1168 DBUG_RETURN(NULL);
1169 else
1170 DBUG_RETURN(it->second);
1171 }
1172
1173
1174 int
get_group_stable_transactions_set_string(char ** buffer,size_t * length)1175 Certifier::get_group_stable_transactions_set_string(char **buffer,
1176 size_t *length)
1177 {
1178 DBUG_ENTER("Certifier::get_group_stable_transactions_set_string");
1179 int error= 1;
1180
1181 char *m_buffer= NULL;
1182 int m_length= stable_gtid_set->to_string(&m_buffer, true);
1183 if (m_length >= 0)
1184 {
1185 *buffer= m_buffer;
1186 *length= static_cast<size_t>(m_length);
1187 error= 0;
1188 }
1189 else
1190 my_free(m_buffer); /* purecov: inspected */
1191
1192 DBUG_RETURN(error);
1193 }
1194
1195
set_group_stable_transactions_set(Gtid_set * executed_gtid_set)1196 bool Certifier::set_group_stable_transactions_set(Gtid_set* executed_gtid_set)
1197 {
1198 DBUG_ENTER("Certifier::set_group_stable_transactions_set");
1199
1200 if (!is_initialized())
1201 DBUG_RETURN(true); /* purecov: inspected */
1202
1203 if (executed_gtid_set == NULL)
1204 {
1205 log_message(MY_ERROR_LEVEL, "Invalid stable transactions set"); /* purecov: inspected */
1206 DBUG_RETURN(true); /* purecov: inspected */
1207 }
1208
1209 stable_gtid_set_lock->wrlock();
1210 if (stable_gtid_set->add_gtid_set(executed_gtid_set) != RETURN_STATUS_OK)
1211 {
1212 stable_gtid_set_lock->unlock(); /* purecov: inspected */
1213 log_message(MY_ERROR_LEVEL, "Error updating stable transactions set"); /* purecov: inspected */
1214 DBUG_RETURN(true); /* purecov: inspected */
1215 }
1216 stable_gtid_set_lock->unlock();
1217
1218 garbage_collect();
1219
1220 DBUG_RETURN(false);
1221 }
1222
garbage_collect()1223 void Certifier::garbage_collect()
1224 {
1225 DBUG_ENTER("Certifier::garbage_collect");
1226 DBUG_EXECUTE_IF("group_replication_do_not_clear_certification_database",
1227 { DBUG_VOID_RETURN; };);
1228
1229 mysql_mutex_lock(&LOCK_certification_info);
1230
1231 /*
1232 When a transaction "t" is applied to all group members and for all
1233 ongoing, i.e., not yet committed or aborted transactions,
1234 "t" was already committed when they executed (thus "t"
1235 precedes them), then "t" is stable and can be removed from
1236 the certification info.
1237 */
1238 Certification_info::iterator it= certification_info.begin();
1239 stable_gtid_set_lock->wrlock();
1240 while (it != certification_info.end())
1241 {
1242 if (it->second->is_subset_not_equals(stable_gtid_set))
1243 {
1244 if (it->second->unlink() == 0)
1245 delete it->second;
1246 certification_info.erase(it++);
1247 }
1248 else
1249 ++it;
1250 }
1251 stable_gtid_set_lock->unlock();
1252
1253 /*
1254 We need to update parallel applier indexes since we do not know
1255 what write sets were purged, which may cause transactions
1256 last committed to be incorrectly computed.
1257 */
1258 increment_parallel_applier_sequence_number(true);
1259
1260 #if !defined(NDEBUG)
1261 /*
1262 This part blocks the garbage collection process for 300 sec in order to
1263 simulate the case that while garbage collection is going on, we should
1264 skip the stable set messages round in order to prevent simultaneous
1265 access to stable_gtid_set.
1266 */
1267 if (certifier_garbage_collection_block)
1268 {
1269 certifier_garbage_collection_block= false;
1270 // my_sleep expects a given number of microseconds.
1271 my_sleep(broadcast_thread->BROADCAST_GTID_EXECUTED_PERIOD * 1500000);
1272 }
1273 #endif
1274
1275 mysql_mutex_unlock(&LOCK_certification_info);
1276
1277 /*
1278 Applier channel received set does only contain the GTIDs of the
1279 remote (committed by other members) transactions. On the long
1280 term, the gaps may create performance issues on the received
1281 set update. To avoid that, periodically, we update the received
1282 set with the full set of transactions committed on the group,
1283 closing the gaps.
1284 */
1285 if (channel_add_executed_gtids_to_received_gtids(applier_module_channel_name))
1286 {
1287 log_message(MY_WARNING_LEVEL,
1288 "There was an error when filling the missing GTIDs on "
1289 "the applier channel received set. Despite not critical, "
1290 "on the long run this may cause performance issues"); /* purecov: inspected */
1291 }
1292
1293 DBUG_VOID_RETURN;
1294 }
1295
1296
handle_certifier_data(const uchar * data,ulong len,const Gcs_member_identifier & gcs_member_id)1297 int Certifier::handle_certifier_data(const uchar *data, ulong len,
1298 const Gcs_member_identifier& gcs_member_id)
1299 {
1300 DBUG_ENTER("Certifier::handle_certifier_data");
1301 bool member_message_received= false;
1302
1303 if (!is_initialized())
1304 DBUG_RETURN(1); /* purecov: inspected */
1305
1306 mysql_mutex_lock(&LOCK_members);
1307 std::string member_id= gcs_member_id.get_member_id();
1308 #if !defined(NDEBUG)
1309 if (same_member_message_discarded)
1310 {
1311 /*
1312 Injecting the member_id in the member's vector to simulate the case of
1313 same member sending multiple messages.
1314 */
1315 this->members.push_back(member_id);
1316 }
1317 #endif
1318
1319 if (this->get_members_size() != plugin_get_group_members_number())
1320 {
1321 /*
1322 We check for the member_id of the current message if it is present in
1323 the member vector or not. If it is present, we will need to discard the
1324 message. If not we will add the message in the incoming message
1325 synchronized queue for stable set handling.
1326 */
1327 std::vector<std::string>::iterator it;
1328 it= std::find(members.begin(), members.end(), member_id);
1329 if (it != members.end())
1330 member_message_received= true;
1331 else
1332 this->members.push_back(member_id);
1333
1334 /*
1335 Since member is not present we can queue this message.
1336 */
1337 if (!member_message_received)
1338 {
1339 this->incoming->push(new Data_packet(data, len));
1340 }
1341 else
1342 {
1343 /*
1344 As member is already received we can throw the necessary warning of the
1345 member message already received.
1346 */
1347 Group_member_info *member_info=
1348 group_member_mgr->get_group_member_info_by_member_id(gcs_member_id);
1349 if (member_info != NULL)
1350 {
1351 log_message(MY_WARNING_LEVEL, "The member with address %s:%u has "
1352 "already sent the stable set. Therefore discarding the second "
1353 "message.", member_info->get_hostname().c_str(),
1354 member_info->get_port());
1355 delete member_info;
1356 }
1357 }
1358
1359 mysql_mutex_unlock(&LOCK_members);
1360
1361 /*
1362 If the incoming message queue size is equal to the number of the
1363 members in the group, we are sure that each member has sent their
1364 gtid_executed. So we can go ahead with the stable set handling.
1365 */
1366 if (plugin_get_group_members_number() == this->incoming->size())
1367 {
1368 int error= stable_set_handle();
1369 /*
1370 Clearing the members to proceed with the next round of garbage
1371 collection.
1372 */
1373 clear_members();
1374 DBUG_RETURN(error);
1375 }
1376 }
1377 else
1378 {
1379 log_message(MY_WARNING_LEVEL, "Skipping this round of stable set "
1380 "computation as certification garbage collection process is "
1381 "still running."); /* purecov: inspected */
1382 mysql_mutex_unlock(&LOCK_members); /* purecov: inspected */
1383 }
1384
1385 #if !defined(NDEBUG)
1386 if (same_member_message_discarded)
1387 {
1388 /*
1389 Clearing the flag here as the members vector is not cleaned above.
1390 */
1391 same_member_message_discarded= false;
1392 clear_members();
1393 }
1394 #endif
1395
1396 DBUG_RETURN(0);
1397 }
1398
stable_set_handle()1399 int Certifier::stable_set_handle()
1400 {
1401 DBUG_ENTER("Certifier:stable_set_handle");
1402
1403 Data_packet *packet= NULL;
1404 int error= 0;
1405
1406 Sid_map sid_map(NULL);
1407 Gtid_set executed_set(&sid_map, NULL);
1408
1409 /*
1410 Compute intersection between all received sets.
1411 */
1412 while(!error && !this->incoming->empty())
1413 {
1414 this->incoming->pop(&packet);
1415
1416 if (packet == NULL)
1417 {
1418 log_message(MY_ERROR_LEVEL, "Null packet on certifier's queue"); /* purecov: inspected */
1419 error= 1; /* purecov: inspected */
1420 break; /* purecov: inspected */
1421 }
1422
1423 uchar* payload= packet->payload;
1424 Gtid_set member_set(&sid_map, NULL);
1425 Gtid_set intersection_result(&sid_map, NULL);
1426
1427 if (member_set.add_gtid_encoding(payload, packet->len) != RETURN_STATUS_OK)
1428 {
1429 log_message(MY_ERROR_LEVEL, "Error reading GTIDs from the message"); /* purecov: inspected */
1430 error= 1; /* purecov: inspected */
1431 }
1432 else
1433 {
1434 /*
1435 First member set? If so we only need to add it to executed set.
1436 */
1437 if (executed_set.is_empty())
1438 {
1439 if (executed_set.add_gtid_set(&member_set))
1440 {
1441 log_message(MY_ERROR_LEVEL, "Error processing stable transactions set"); /* purecov: inspected */
1442 error= 1; /* purecov: inspected */
1443 }
1444 }
1445 else
1446 {
1447 /*
1448 We have three sets:
1449 member_set: the one sent from a given member;
1450 executed_set: the one that contains the intersection of
1451 the computed sets until now;
1452 intersection_result: the intersection between set and
1453 intersection_result.
1454 So we compute the intersection between set and executed_set, and
1455 set that value to executed_set to be used on the next intersection.
1456 */
1457 if (member_set.intersection(&executed_set, &intersection_result) != RETURN_STATUS_OK)
1458 {
1459 log_message(MY_ERROR_LEVEL, "Error processing intersection of stable transactions set"); /* purecov: inspected */
1460 error= 1; /* purecov: inspected */
1461 }
1462 else
1463 {
1464 executed_set.clear();
1465 if (executed_set.add_gtid_set(&intersection_result) != RETURN_STATUS_OK)
1466 {
1467 log_message(MY_ERROR_LEVEL, "Error processing stable transactions set"); /* purecov: inspected */
1468 error= 1; /* purecov: inspected */
1469 }
1470 }
1471 }
1472 }
1473
1474 delete packet;
1475 }
1476
1477 if (!error && set_group_stable_transactions_set(&executed_set))
1478 {
1479 log_message(MY_ERROR_LEVEL, "Error setting stable transactions set"); /* purecov: inspected */
1480 error= 1; /* purecov: inspected */
1481 }
1482
1483 #if !defined(NDEBUG)
1484 char *executed_set_string;
1485 executed_set.to_string(&executed_set_string);
1486 DBUG_PRINT("info", ("Certifier stable_set_handle: executed_set: %s", executed_set_string));
1487 my_free(executed_set_string);
1488 #endif
1489
1490 DBUG_RETURN(error);
1491 }
1492
handle_view_change()1493 void Certifier::handle_view_change()
1494 {
1495 DBUG_ENTER("Certifier::handle_view_change");
1496 clear_incoming();
1497 clear_members();
1498 DBUG_VOID_RETURN;
1499 }
1500
1501
get_certification_info(std::map<std::string,std::string> * cert_info)1502 void Certifier::get_certification_info(std::map<std::string, std::string> *cert_info)
1503 {
1504 DBUG_ENTER("Certifier::get_certification_info");
1505 mysql_mutex_lock(&LOCK_certification_info);
1506
1507 for(Certification_info::iterator it = certification_info.begin();
1508 it != certification_info.end(); ++it)
1509 {
1510 std::string key= it->first;
1511 assert(key.compare(GTID_EXTRACTED_NAME) != 0);
1512
1513 size_t len= it->second->get_encoded_length();
1514 uchar* buf= (uchar *)my_malloc(
1515 PSI_NOT_INSTRUMENTED,
1516 len, MYF(0));
1517 it->second->encode(buf);
1518 std::string value(reinterpret_cast<const char*>(buf), len);
1519 my_free(buf);
1520
1521 (*cert_info).insert(std::pair<std::string, std::string>(key, value));
1522 }
1523
1524 // Add the group_gtid_executed to certification info sent to joiners.
1525 size_t len= group_gtid_executed->get_encoded_length();
1526 uchar* buf= (uchar*) my_malloc(PSI_NOT_INSTRUMENTED, len, MYF(0));
1527 group_gtid_executed->encode(buf);
1528 std::string value(reinterpret_cast<const char*>(buf), len);
1529 my_free(buf);
1530 (*cert_info).insert(std::pair<std::string, std::string>(
1531 GTID_EXTRACTED_NAME, value));
1532
1533 mysql_mutex_unlock(&LOCK_certification_info);
1534 DBUG_VOID_RETURN;
1535 }
1536
generate_view_change_group_gno()1537 rpl_gno Certifier::generate_view_change_group_gno()
1538 {
1539 DBUG_ENTER("Certifier::generate_view_change_group_gno");
1540
1541 mysql_mutex_lock(&LOCK_certification_info);
1542 rpl_gno result= get_group_next_available_gtid(NULL);
1543
1544 DBUG_EXECUTE_IF("certifier_assert_next_seqno_equal_5",
1545 assert(result == 5););
1546 DBUG_EXECUTE_IF("certifier_assert_next_seqno_equal_7",
1547 assert(result == 7););
1548
1549 if (result > 0)
1550 add_to_group_gtid_executed_internal(group_gtid_sid_map_group_sidno, result, false);
1551 mysql_mutex_unlock(&LOCK_certification_info);
1552
1553 DBUG_RETURN(result);
1554 }
1555
1556
set_certification_info(std::map<std::string,std::string> * cert_info)1557 int Certifier::set_certification_info(std::map<std::string, std::string> *cert_info)
1558 {
1559 DBUG_ENTER("Certifier::set_certification_info");
1560 assert(cert_info != NULL);
1561
1562 if (cert_info->size() == 1) {
1563 std::map<std::string, std::string>::iterator it =
1564 cert_info->find(CERTIFICATION_INFO_ERROR_NAME);
1565 if (it != cert_info->end()) {
1566 log_message(MY_ERROR_LEVEL,
1567 "The certification information could not be set in this server: '%s'",
1568 it->second.c_str());
1569 DBUG_RETURN(1);
1570 }
1571 }
1572
1573 mysql_mutex_lock(&LOCK_certification_info);
1574
1575 clear_certification_info();
1576 for(std::map<std::string, std::string>::iterator it = cert_info->begin();
1577 it != cert_info->end(); ++it)
1578 {
1579 std::string key= it->first;
1580
1581 /*
1582 Extract the donor group_gtid_executed so that it can be used to
1583 while member is applying transactions that were already applied
1584 by distrubuted recovery procedure.
1585 */
1586 if (it->first.compare(GTID_EXTRACTED_NAME) == 0)
1587 {
1588 if (group_gtid_extracted->add_gtid_encoding(
1589 reinterpret_cast<const uchar*>(it->second.c_str()), it->second.length())
1590 != RETURN_STATUS_OK)
1591 {
1592 log_message(MY_ERROR_LEVEL,
1593 "Error reading group_gtid_extracted from the View_change_log_event"); /* purecov: inspected */
1594 mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
1595 DBUG_RETURN(1); /* purecov: inspected */
1596 }
1597 continue;
1598 }
1599
1600 Gtid_set_ref *value = new Gtid_set_ref(certification_info_sid_map, -1);
1601 if (value->add_gtid_encoding(
1602 reinterpret_cast<const uchar*>(it->second.c_str()), it->second.length())
1603 != RETURN_STATUS_OK)
1604 {
1605 log_message(MY_ERROR_LEVEL,
1606 "Error reading the write set item '%s' from the View_change_log_event",
1607 key.c_str()); /* purecov: inspected */
1608 mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
1609 DBUG_RETURN(1); /* purecov: inspected */
1610 }
1611 value->link();
1612 certification_info.insert(std::pair<std::string, Gtid_set_ref*>(key, value));
1613 }
1614
1615 if (initialize_server_gtid_set())
1616 {
1617 log_message(MY_ERROR_LEVEL, "Error during certfication_info"
1618 " initialization."); /* purecov: inspected */
1619 mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
1620 DBUG_RETURN(1); /* purecov: inspected */
1621 }
1622
1623 if (group_gtid_extracted->is_subset_not_equals(group_gtid_executed))
1624 {
1625 certifying_already_applied_transactions= true;
1626 compute_group_available_gtid_intervals();
1627
1628 #ifndef NDEBUG
1629 char *group_gtid_executed_string= NULL;
1630 char *group_gtid_extracted_string= NULL;
1631 group_gtid_executed->to_string(&group_gtid_executed_string, true);
1632 group_gtid_extracted->to_string(&group_gtid_extracted_string, true);
1633 DBUG_PRINT("Certifier::set_certification_info()",
1634 ("Set certifying_already_applied_transactions to true. "
1635 "group_gtid_executed: \"%s\"; group_gtid_extracted_string: \"%s\"",
1636 group_gtid_executed_string, group_gtid_extracted_string));
1637 my_free(group_gtid_executed_string);
1638 my_free(group_gtid_extracted_string);
1639 #endif
1640 }
1641
1642 mysql_mutex_unlock(&LOCK_certification_info);
1643 DBUG_RETURN(0);
1644 }
1645
update_certified_transaction_count(bool result)1646 void Certifier::update_certified_transaction_count(bool result)
1647 {
1648 if(result)
1649 positive_cert++;
1650 else
1651 negative_cert++;
1652
1653 if (local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
1654 {
1655 applier_module->get_pipeline_stats_member_collector()
1656 ->increment_transactions_certified();
1657 }
1658 }
1659
get_positive_certified()1660 ulonglong Certifier::get_positive_certified()
1661 {
1662 return positive_cert;
1663 }
1664
get_negative_certified()1665 ulonglong Certifier::get_negative_certified()
1666 {
1667 return negative_cert;
1668 }
1669
get_certification_info_size()1670 ulonglong Certifier::get_certification_info_size()
1671 {
1672 return certification_info.size();
1673 }
1674
get_last_conflict_free_transaction(std::string * value)1675 void Certifier::get_last_conflict_free_transaction(std::string* value)
1676 {
1677 int length= 0;
1678 char buffer[Gtid::MAX_TEXT_LENGTH + 1];
1679
1680 mysql_mutex_lock(&LOCK_certification_info);
1681 if (last_conflict_free_transaction.is_empty())
1682 goto end;
1683
1684 length= last_conflict_free_transaction.to_string(group_gtid_sid_map, buffer);
1685 if (length > 0)
1686 value->assign(buffer);
1687
1688 end:
1689 mysql_mutex_unlock(&LOCK_certification_info);
1690 }
1691
get_members_size()1692 size_t Certifier::get_members_size()
1693 {
1694 return members.size();
1695 }
1696
get_local_certified_gtid(std::string & local_gtid_certified_string)1697 size_t Certifier::get_local_certified_gtid(std::string& local_gtid_certified_string)
1698 {
1699 if (last_local_gtid.is_empty())
1700 return 0;
1701
1702 char buf[Gtid::MAX_TEXT_LENGTH + 1];
1703 last_local_gtid.to_string(group_gtid_sid_map, buf);
1704 local_gtid_certified_string.assign(buf);
1705 return local_gtid_certified_string.size();
1706 }
1707
enable_conflict_detection()1708 void Certifier::enable_conflict_detection()
1709 {
1710 DBUG_ENTER("Certifier::enable_conflict_detection");
1711 assert(local_member_info->in_primary_mode());
1712
1713 mysql_mutex_lock(&LOCK_certification_info);
1714 conflict_detection_enable= true;
1715 local_member_info->enable_conflict_detection();
1716 mysql_mutex_unlock(&LOCK_certification_info);
1717 DBUG_VOID_RETURN;
1718 }
1719
disable_conflict_detection()1720 void Certifier::disable_conflict_detection()
1721 {
1722 DBUG_ENTER("Certifier::disable_conflict_detection");
1723 assert(local_member_info->in_primary_mode());
1724
1725 mysql_mutex_lock(&LOCK_certification_info);
1726 conflict_detection_enable= false;
1727 local_member_info->disable_conflict_detection();
1728 mysql_mutex_unlock(&LOCK_certification_info);
1729
1730 log_message(MY_INFORMATION_LEVEL,
1731 "Primary had applied all relay logs, disabled conflict "
1732 "detection");
1733
1734 DBUG_VOID_RETURN;
1735 }
1736
is_conflict_detection_enable()1737 bool Certifier::is_conflict_detection_enable()
1738 {
1739 DBUG_ENTER("Certifier::is_conflict_detection_enable");
1740
1741 mysql_mutex_lock(&LOCK_certification_info);
1742 bool result= conflict_detection_enable;
1743 mysql_mutex_unlock(&LOCK_certification_info);
1744
1745 DBUG_RETURN(result);
1746 }
1747
1748 /*
1749 Gtid_Executed_Message implementation
1750 */
1751
Gtid_Executed_Message()1752 Gtid_Executed_Message::Gtid_Executed_Message()
1753 :Plugin_gcs_message(CT_CERTIFICATION_MESSAGE)
1754 {
1755 }
1756
~Gtid_Executed_Message()1757 Gtid_Executed_Message::~Gtid_Executed_Message()
1758 {
1759 }
1760
1761 void
append_gtid_executed(uchar * gtid_data,size_t len)1762 Gtid_Executed_Message::append_gtid_executed(uchar* gtid_data, size_t len)
1763 {
1764 data.insert(data.end(), gtid_data, gtid_data+len);
1765 }
1766
1767 void
encode_payload(std::vector<unsigned char> * buffer) const1768 Gtid_Executed_Message::encode_payload(std::vector<unsigned char>* buffer) const
1769 {
1770 DBUG_ENTER("Gtid_Executed_Message::encode_payload");
1771
1772 encode_payload_item_type_and_length(buffer, PIT_GTID_EXECUTED, data.size());
1773 buffer->insert(buffer->end(), data.begin(), data.end());
1774
1775 DBUG_VOID_RETURN;
1776 }
1777
1778 void
decode_payload(const unsigned char * buffer,const unsigned char * length)1779 Gtid_Executed_Message::decode_payload(const unsigned char* buffer,
1780 const unsigned char* length)
1781 {
1782 DBUG_ENTER("Gtid_Executed_Message::decode_payload");
1783 const unsigned char *slider= buffer;
1784 uint16 payload_item_type= 0;
1785 unsigned long long payload_item_length= 0;
1786
1787 decode_payload_item_type_and_length(&slider,
1788 &payload_item_type,
1789 &payload_item_length);
1790 data.clear();
1791 data.insert(data.end(), slider, slider + payload_item_length);
1792
1793 DBUG_VOID_RETURN;
1794 }
1795