1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include <signal.h>
24 #include <map>
25 
26 #include "certifier.h"
27 #include "plugin.h"
28 #include "plugin_log.h"
29 #include "observer_trans.h"
30 
31 #include "sql_service_command.h"
32 
33 const std::string Certifier::GTID_EXTRACTED_NAME= "gtid_extracted";
34 const std::string Certifier::CERTIFICATION_INFO_ERROR_NAME =
35     "certification_info_error";
36 
launch_broadcast_thread(void * arg)37 static void *launch_broadcast_thread(void* arg)
38 {
39   Certifier_broadcast_thread *handler= (Certifier_broadcast_thread*) arg;
40   handler->dispatcher();
41   return 0;
42 }
43 
Certifier_broadcast_thread()44 Certifier_broadcast_thread::Certifier_broadcast_thread()
45   :aborted(false), broadcast_thd_running(false), broadcast_counter(0),
46    broadcast_gtid_executed_period(BROADCAST_GTID_EXECUTED_PERIOD)
47 {
48   DBUG_EXECUTE_IF("group_replication_certifier_broadcast_thread_big_period",
49                   { broadcast_gtid_executed_period= 600; });
50 
51   mysql_mutex_init(key_GR_LOCK_cert_broadcast_run,
52                    &broadcast_run_lock,
53                    MY_MUTEX_INIT_FAST);
54   mysql_cond_init(key_GR_COND_cert_broadcast_run,
55                   &broadcast_run_cond);
56   mysql_mutex_init(key_GR_LOCK_cert_broadcast_dispatcher_run,
57                    &broadcast_dispatcher_lock,
58                    MY_MUTEX_INIT_FAST);
59   mysql_cond_init(key_GR_COND_cert_broadcast_dispatcher_run,
60                   &broadcast_dispatcher_cond);
61  }
62 
~Certifier_broadcast_thread()63 Certifier_broadcast_thread::~Certifier_broadcast_thread()
64 {
65   mysql_mutex_destroy(&broadcast_run_lock);
66   mysql_cond_destroy(&broadcast_run_cond);
67   mysql_mutex_destroy(&broadcast_dispatcher_lock);
68   mysql_cond_destroy(&broadcast_dispatcher_cond);
69 }
70 
initialize()71 int Certifier_broadcast_thread::initialize()
72 {
73   DBUG_ENTER("Certifier_broadcast_thread::initialize");
74 
75   mysql_mutex_lock(&broadcast_run_lock);
76   if (broadcast_thd_running)
77   {
78     mysql_mutex_unlock(&broadcast_run_lock); /* purecov: inspected */
79     DBUG_RETURN(0); /* purecov: inspected */
80   }
81 
82   aborted= false;
83 
84   if ((mysql_thread_create(key_GR_THD_cert_broadcast,
85                            &broadcast_pthd,
86                            get_connection_attrib(),
87                            launch_broadcast_thread,
88                            (void*)this)))
89   {
90     mysql_mutex_unlock(&broadcast_run_lock); /* purecov: inspected */
91     DBUG_RETURN(1); /* purecov: inspected */
92   }
93 
94   while (!broadcast_thd_running)
95   {
96     DBUG_PRINT("sleep",("Waiting for certifier broadcast thread to start"));
97     mysql_cond_wait(&broadcast_run_cond, &broadcast_run_lock);
98   }
99   mysql_mutex_unlock(&broadcast_run_lock);
100 
101   DBUG_RETURN(0);
102 }
103 
104 
terminate()105 int Certifier_broadcast_thread::terminate()
106 {
107   DBUG_ENTER("Certifier_broadcast_thread::terminate");
108 
109   mysql_mutex_lock(&broadcast_run_lock);
110   if (!broadcast_thd_running)
111   {
112     mysql_mutex_unlock(&broadcast_run_lock);
113     DBUG_RETURN(0);
114   }
115 
116   aborted= true;
117   while (broadcast_thd_running)
118   {
119     DBUG_PRINT("loop", ("killing certifier broadcast thread"));
120     mysql_mutex_lock(&broadcast_thd->LOCK_thd_data);
121 
122     //awake the cycle
123     mysql_mutex_lock(&broadcast_dispatcher_lock);
124     mysql_cond_broadcast(&broadcast_dispatcher_cond);
125     mysql_mutex_unlock(&broadcast_dispatcher_lock);
126 
127     broadcast_thd->awake(THD::NOT_KILLED);
128     mysql_mutex_unlock(&broadcast_thd->LOCK_thd_data);
129     mysql_cond_wait(&broadcast_run_cond, &broadcast_run_lock);
130   }
131   mysql_mutex_unlock(&broadcast_run_lock);
132 
133   DBUG_RETURN(0);
134 }
135 
136 
dispatcher()137 void Certifier_broadcast_thread::dispatcher()
138 {
139   DBUG_ENTER("Certifier_broadcast_thread::dispatcher");
140 
141   //Thread context operations
142   my_thread_init();
143   THD *thd= new THD;
144   thd->set_new_thread_id();
145   thd->thread_stack= (char*) &thd;
146   thd->store_globals();
147   global_thd_manager_add_thd(thd);
148   broadcast_thd= thd;
149 
150   mysql_mutex_lock(&broadcast_run_lock);
151   broadcast_thd_running= true;
152   mysql_cond_broadcast(&broadcast_run_cond);
153   mysql_mutex_unlock(&broadcast_run_lock);
154 
155   struct timespec abstime;
156   while (!aborted)
157   {
158     broadcast_counter++;
159 
160     applier_module->get_pipeline_stats_member_collector()
161         ->send_stats_member_message();
162 
163     applier_module->get_flow_control_module()->flow_control_step();
164 
165     if (broadcast_counter % broadcast_gtid_executed_period == 0)
166       broadcast_gtid_executed();
167 
168     mysql_mutex_lock(&broadcast_dispatcher_lock);
169     if (aborted)
170     {
171       mysql_mutex_unlock(&broadcast_dispatcher_lock); /* purecov: inspected */
172       break; /* purecov: inspected */
173     }
174     set_timespec(&abstime, 1);
175     mysql_cond_timedwait(&broadcast_dispatcher_cond,
176                          &broadcast_dispatcher_lock, &abstime);
177     mysql_mutex_unlock(&broadcast_dispatcher_lock);
178 
179     /*
180       Clear server sessions open caches on transactions observer.
181       TODO: move this to a global scheduler.
182     */
183     if (broadcast_counter % 300 == 0)
184       observer_trans_clear_io_cache_unused_list(); /* purecov: inspected */
185   }
186 
187   Gcs_interface_factory::cleanup(Gcs_operations::get_gcs_engine());
188 
189   thd->release_resources();
190   global_thd_manager_remove_thd(thd);
191   delete thd;
192 
193   mysql_mutex_lock(&broadcast_run_lock);
194   broadcast_thd_running= false;
195   mysql_cond_broadcast(&broadcast_run_cond);
196   mysql_mutex_unlock(&broadcast_run_lock);
197 
198   my_thread_end();
199 
200   DBUG_VOID_RETURN;
201 }
202 
broadcast_gtid_executed()203 int Certifier_broadcast_thread::broadcast_gtid_executed()
204 {
205   DBUG_ENTER("Certifier_broadcast_thread::broadcast_gtid_executed");
206 
207   /*
208     Member may be still joining group so we need to check if:
209       1) communication interfaces are ready to be used;
210       2) member is ONLINE, that is, distributed recovery is complete.
211   */
212   if (local_member_info == NULL)
213     DBUG_RETURN(0); /* purecov: inspected */
214   Group_member_info::Group_member_status member_status=
215       local_member_info->get_recovery_status();
216   if (member_status != Group_member_info::MEMBER_ONLINE &&
217       member_status != Group_member_info::MEMBER_IN_RECOVERY)
218     DBUG_RETURN(0);
219 
220   int error= 0;
221   uchar *encoded_gtid_executed= NULL;
222   size_t length;
223   get_server_encoded_gtid_executed(&encoded_gtid_executed, &length);
224 
225   Gtid_Executed_Message gtid_executed_message;
226   std::vector<uchar> encoded_gtid_executed_message;
227   gtid_executed_message.append_gtid_executed(encoded_gtid_executed, length);
228 
229   enum enum_gcs_error send_err=
230       gcs_module->send_message(gtid_executed_message, true);
231   if (send_err == GCS_MESSAGE_TOO_BIG)
232   {
233     log_message(MY_ERROR_LEVEL, "Broadcast of committed transactions message "
234                                 "failed. Message is too big."); /* purecov: inspected */
235     error= 1; /* purecov: inspected */
236   }
237   else if (send_err == GCS_NOK)
238   {
239     log_message(MY_INFORMATION_LEVEL,
240                 "Broadcast of committed transactions message failed."); /* purecov: inspected */
241     error= 1; /* purecov: inspected */
242   }
243 
244 
245 #if !defined(NDEBUG)
246   char *encoded_gtid_executed_string=
247       encoded_gtid_set_to_string(encoded_gtid_executed, length);
248   DBUG_PRINT("info", ("Certifier broadcast executed_set: %s", encoded_gtid_executed_string));
249   my_free(encoded_gtid_executed_string);
250 #endif
251 
252   my_free(encoded_gtid_executed);
253   DBUG_RETURN(error);
254 }
255 
256 
Certifier()257 Certifier::Certifier()
258   :initialized(false),
259    positive_cert(0), negative_cert(0),
260    parallel_applier_last_committed_global(1),
261    parallel_applier_sequence_number(2),
262    certifying_already_applied_transactions(false),
263    gtid_assignment_block_size(1),
264    gtids_assigned_in_blocks_counter(1),
265    conflict_detection_enable(!local_member_info->in_primary_mode())
266 {
267    last_conflict_free_transaction.clear();
268 
269 #if !defined(NDEBUG)
270   certifier_garbage_collection_block= false;
271   /*
272     Debug flag to block the garbage collection and discard incoming stable
273     set messages while garbage collection is on going.
274   */
275   DBUG_EXECUTE_IF("certifier_garbage_collection_block",
276                   certifier_garbage_collection_block= true;);
277 
278   same_member_message_discarded= false;
279   /*
280     Debug flag to check for similar member sending multiple messages.
281   */
282   DBUG_EXECUTE_IF("certifier_inject_duplicate_certifier_data_message",
283                   same_member_message_discarded= true;);
284 #endif
285 
286   certification_info_sid_map= new Sid_map(NULL);
287   incoming= new Synchronized_queue<Data_packet*>();
288 
289   stable_gtid_set_lock= new Checkable_rwlock(
290 #ifdef HAVE_PSI_INTERFACE
291                                              key_GR_RWLOCK_cert_stable_gtid_set
292 #endif
293                                             );
294   stable_sid_map= new Sid_map(stable_gtid_set_lock);
295   stable_gtid_set= new Gtid_set(stable_sid_map, stable_gtid_set_lock);
296   broadcast_thread= new Certifier_broadcast_thread();
297 
298   group_gtid_sid_map= new Sid_map(NULL);
299   group_gtid_executed= new Gtid_set(group_gtid_sid_map, NULL);
300   group_gtid_extracted= new Gtid_set(group_gtid_sid_map, NULL);
301 
302   last_local_gtid.clear();
303 
304   mysql_mutex_init(key_GR_LOCK_certification_info, &LOCK_certification_info,
305                    MY_MUTEX_INIT_FAST);
306   mysql_mutex_init(key_GR_LOCK_cert_members, &LOCK_members,
307                    MY_MUTEX_INIT_FAST);
308 }
309 
310 
~Certifier()311 Certifier::~Certifier()
312 {
313   clear_certification_info();
314   delete certification_info_sid_map;
315 
316   delete stable_gtid_set;
317   delete stable_sid_map;
318   delete stable_gtid_set_lock;
319   delete broadcast_thread;
320   delete group_gtid_executed;
321   delete group_gtid_extracted;
322   delete group_gtid_sid_map;
323 
324   clear_incoming();
325   delete incoming;
326 
327   clear_members();
328   mysql_mutex_destroy(&LOCK_certification_info);
329   mysql_mutex_destroy(&LOCK_members);
330 }
331 
initialize_server_gtid_set(bool get_server_gtid_retrieved)332 int Certifier::initialize_server_gtid_set(bool get_server_gtid_retrieved)
333 {
334   DBUG_ENTER("initialize_server_gtid_set");
335   mysql_mutex_assert_owner(&LOCK_certification_info);
336   int error= 0;
337   Sql_service_command_interface *sql_command_interface= NULL;
338   std::string gtid_executed;
339   std::string applier_retrieved_gtids;
340 
341   rpl_sid group_sid;
342   if (group_sid.parse(group_name_var) != RETURN_STATUS_OK)
343   {
344     log_message(MY_ERROR_LEVEL,
345                 "Unable to parse the group name during"
346                 " the Certification module initialization"); /* purecov: inspected */
347     error= 1; /* purecov: inspected */
348     goto end; /* purecov: inspected */
349   }
350 
351   group_gtid_sid_map_group_sidno= group_gtid_sid_map->add_sid(group_sid);
352   if (group_gtid_sid_map_group_sidno < 0)
353   {
354     log_message(MY_ERROR_LEVEL,
355                 "Unable to add the group_sid in the group_gtid_sid_map during"
356                 " the Certification module initialization"); /* purecov: inspected */
357     error= 1; /* purecov: inspected */
358     goto end; /* purecov: inspected */
359   }
360 
361   if (group_gtid_executed->ensure_sidno(group_gtid_sid_map_group_sidno) != RETURN_STATUS_OK)
362   {
363     log_message(MY_ERROR_LEVEL,
364                 "Error updating group_gtid_executed GITD set during"
365                 " the Certification module initialization"); /* purecov: inspected */
366     error= 1; /* purecov: inspected */
367     goto end; /* purecov: inspected */
368   }
369 
370   if (group_gtid_extracted->ensure_sidno(group_gtid_sid_map_group_sidno) != RETURN_STATUS_OK)
371   {
372     log_message(MY_ERROR_LEVEL,
373                 "Unable to handle the donor's transaction information"
374                 " when initializing the conflict detection component."
375                 " Possible out of memory error."); /* purecov: inspected */
376     error= 1; /* purecov: inspected */
377     goto end; /* purecov: inspected */
378   }
379 
380   sql_command_interface= new Sql_service_command_interface();
381   if (sql_command_interface->establish_session_connection(PSESSION_USE_THREAD) ||
382       sql_command_interface->set_interface_user(GROUPREPL_USER))
383   {
384     log_message(MY_ERROR_LEVEL,
385                 "Error when establishing a server connection during"
386                 " the Certification module initialization"); /* purecov: inspected */
387     error= 1; /* purecov: inspected */
388     goto end; /* purecov: inspected */
389   }
390 
391   error= sql_command_interface->get_server_gtid_executed(gtid_executed);
392   DBUG_EXECUTE_IF("gr_server_gtid_executed_extraction_error", error=1;);
393   if (error)
394   {
395     log_message(MY_WARNING_LEVEL,
396                 "Error when extracting this member GTID executed set."
397                 " Certification module can't be properly initialized");
398     goto end;
399   }
400 
401   if (group_gtid_executed->add_gtid_text(gtid_executed.c_str()) != RETURN_STATUS_OK)
402   {
403     log_message(MY_ERROR_LEVEL,
404                 "Error while adding the server GTID EXECUTED set to the"
405                 " group_gtid_execute during the Certification module"
406                 " initialization"); /* purecov: inspected */
407     error= 1; /* purecov: inspected */
408     goto end; /* purecov: inspected */
409   }
410 
411   if (get_server_gtid_retrieved)
412   {
413     Replication_thread_api applier_channel("group_replication_applier");
414     if (applier_channel.get_retrieved_gtid_set(applier_retrieved_gtids))
415     {
416       log_message(MY_WARNING_LEVEL,
417                   "Error when extracting this member retrieved set for its applier."
418                   " Certification module can't be properly initialized"); /* purecov: inspected */
419       error= 1; /* purecov: inspected */
420       goto end; /* purecov: inspected */
421     }
422 
423     if (group_gtid_executed->add_gtid_text(applier_retrieved_gtids.c_str()) != RETURN_STATUS_OK)
424     {
425       log_message(MY_ERROR_LEVEL,
426                   "Error while adding the member retrieved set to the"
427                   " group_gtid_executed during the Certification module"
428                   " initialization"); /* purecov: inspected */
429       error= 1; /* purecov: inspected */
430       goto end; /* purecov: inspected */
431     }
432   }
433 
434   compute_group_available_gtid_intervals();
435 
436 end:
437   delete sql_command_interface;
438 
439   DBUG_RETURN(error);
440 }
441 
compute_group_available_gtid_intervals()442 void Certifier::compute_group_available_gtid_intervals()
443 {
444   DBUG_ENTER("Certifier::compute_group_available_gtid_intervals");
445   mysql_mutex_assert_owner(&LOCK_certification_info);
446 
447   gtids_assigned_in_blocks_counter= 1;
448   member_gtids.clear();
449   group_available_gtid_intervals.clear();
450 
451   /*
452     Compute the GTID intervals that are available by inverting the
453     group_gtid_executed or group_gtid_extracted intervals.
454   */
455   Gtid_set::Const_interval_iterator ivit(certifying_already_applied_transactions
456                                              ? group_gtid_extracted
457                                              : group_gtid_executed,
458                                          group_gtid_sid_map_group_sidno);
459 #ifndef NDEBUG
460   if (certifying_already_applied_transactions)
461     DBUG_PRINT("Certifier::compute_group_available_gtid_intervals()",
462                ("Generating group transaction intervals from group_gtid_extracted"));
463 #endif
464 
465   const Gtid_set::Interval *iv= NULL, *iv_next= NULL;
466 
467   // The fist interval: UUID:100 -> we have the interval 1-99
468   if ((iv= ivit.get()) != NULL)
469   {
470     if (iv->start > 1)
471     {
472       Gtid_set::Interval interval= {1, iv->start - 1, NULL};
473       group_available_gtid_intervals.push_back(interval);
474     }
475   }
476 
477   // For each used interval find the upper bound and from there
478   // add the free GTIDs up to the next interval or GNO_END.
479   while ((iv= ivit.get()) != NULL)
480   {
481     ivit.next();
482     iv_next= ivit.get();
483 
484     rpl_gno start= iv->end;
485     rpl_gno end= GNO_END;
486     if (iv_next != NULL)
487       end= iv_next->start - 1;
488 
489     assert(start <= end);
490     Gtid_set::Interval interval= {start, end, NULL};
491     group_available_gtid_intervals.push_back(interval);
492   }
493 
494   // No GTIDs used, so the available interval is the complete set.
495   if (group_available_gtid_intervals.size() == 0)
496   {
497     Gtid_set::Interval interval= {1, GNO_END, NULL};
498     group_available_gtid_intervals.push_back(interval);
499   }
500 
501   DBUG_VOID_RETURN;
502 }
503 
reserve_gtid_block(longlong block_size)504 Gtid_set::Interval Certifier::reserve_gtid_block(longlong block_size)
505 {
506   DBUG_ENTER("Certifier::reserve_gtid_block");
507   assert(block_size > 1);
508   mysql_mutex_assert_owner(&LOCK_certification_info);
509 
510   Gtid_set::Interval result;
511 
512   // We are out of intervals, we need to force intervals computation.
513   if (group_available_gtid_intervals.size() == 0)
514     compute_group_available_gtid_intervals();
515 
516   std::list<Gtid_set::Interval>::iterator it= group_available_gtid_intervals.begin();
517   assert(it != group_available_gtid_intervals.end());
518 
519   /*
520     We always have one or more intervals, the only thing to check
521     is if the first interval is exhausted, if so we need to purge
522     it to avoid future use.
523   */
524   if (block_size > it->end - it->start)
525   {
526     result= *it;
527     group_available_gtid_intervals.erase(it);
528   }
529   else
530   {
531     result.start= it->start;
532     result.end= it->start + block_size - 1;
533     it->start= result.end + 1;
534     assert(result.start <= result.end);
535     assert(result.start < it->start);
536   }
537 
538   DBUG_RETURN(result);
539 }
540 
add_to_group_gtid_executed_internal(rpl_sidno sidno,rpl_gno gno,bool local)541 void Certifier::add_to_group_gtid_executed_internal(rpl_sidno sidno,
542                                                     rpl_gno gno,
543                                                     bool local)
544 {
545   DBUG_ENTER("Certifier::add_to_group_gtid_executed_internal");
546   mysql_mutex_assert_owner(&LOCK_certification_info);
547   group_gtid_executed->_add_gtid(sidno, gno);
548   if (local)
549   {
550     assert(sidno >0 && gno >0);
551     last_local_gtid.set(sidno, gno);
552   }
553   /*
554     We only need to track certified transactions on
555     group_gtid_extracted while:
556      1) certifier is handling already applied transactions
557         on distributed recovery procedure;
558      2) the transaction does have a group GTID.
559   */
560   if (certifying_already_applied_transactions &&
561       sidno == group_gtid_sid_map_group_sidno)
562     group_gtid_extracted->_add_gtid(sidno, gno);
563 
564   DBUG_VOID_RETURN;
565 }
566 
clear_certification_info()567 void Certifier::clear_certification_info()
568 {
569   for (Certification_info::iterator it= certification_info.begin();
570        it != certification_info.end();
571        ++it)
572   {
573     // We can only delete the last reference.
574     if (it->second->unlink() == 0)
575       delete it->second;
576   }
577 
578   certification_info.clear();
579 }
580 
581 
clear_incoming()582 void Certifier::clear_incoming()
583 {
584   DBUG_ENTER("Certifier::clear_incoming");
585   while (!this->incoming->empty())
586   {
587     Data_packet *packet= NULL;
588     this->incoming->pop(&packet);
589     delete packet;
590   }
591   DBUG_VOID_RETURN;
592 }
593 
clear_members()594 void Certifier::clear_members()
595 {
596   DBUG_ENTER("Certifier::clear_members");
597   mysql_mutex_lock(&LOCK_members);
598   members.clear();
599   mysql_mutex_unlock(&LOCK_members);
600   DBUG_VOID_RETURN;
601 }
602 
initialize(ulonglong gtid_assignment_block_size)603 int Certifier::initialize(ulonglong gtid_assignment_block_size)
604 {
605   DBUG_ENTER("Certifier::initialize");
606   int error= 0;
607   mysql_mutex_lock(&LOCK_certification_info);
608 
609   if (is_initialized())
610   {
611     error= 1; /* purecov: inspected */
612     goto end; /* purecov: inspected */
613   }
614 
615   assert(gtid_assignment_block_size >= 1);
616   this->gtid_assignment_block_size= gtid_assignment_block_size;
617 
618   /*
619     We need to initialize group_gtid_executed from both GTID_EXECUTED
620     and applier retrieved GTID set to consider the already certified
621     but not yet applied GTIDs, that may exist on applier relay log when
622     this member is the one bootstrapping the group.
623   */
624   if (initialize_server_gtid_set(true))
625   {
626     log_message(MY_ERROR_LEVEL,
627                 "Error during Certification module initialization.");
628     error= 1;
629     goto end;
630   }
631 
632   error= broadcast_thread->initialize();
633   initialized= !error;
634 
635 end:
636   mysql_mutex_unlock(&LOCK_certification_info);
637   DBUG_RETURN(error);
638 }
639 
640 
terminate()641 int Certifier::terminate()
642 {
643   DBUG_ENTER("Certifier::terminate");
644   int error= 0;
645 
646   if (is_initialized())
647     error= broadcast_thread->terminate();
648 
649   DBUG_RETURN(error);
650 }
651 
652 
increment_parallel_applier_sequence_number(bool update_parallel_applier_last_committed_global)653 void Certifier::increment_parallel_applier_sequence_number(
654     bool update_parallel_applier_last_committed_global)
655 {
656   DBUG_ENTER("Certifier::increment_parallel_applier_sequence_number");
657   mysql_mutex_assert_owner(&LOCK_certification_info);
658 
659   assert(parallel_applier_last_committed_global <
660          parallel_applier_sequence_number);
661   if (update_parallel_applier_last_committed_global)
662     parallel_applier_last_committed_global= parallel_applier_sequence_number;
663 
664   parallel_applier_sequence_number++;
665 
666   DBUG_VOID_RETURN;
667 }
668 
669 
certify(Gtid_set * snapshot_version,std::list<const char * > * write_set,bool generate_group_id,const char * member_uuid,Gtid_log_event * gle,bool local_transaction)670 rpl_gno Certifier::certify(Gtid_set *snapshot_version,
671                            std::list<const char*> *write_set,
672                            bool generate_group_id,
673                            const char *member_uuid,
674                            Gtid_log_event *gle,
675                            bool local_transaction)
676 {
677   DBUG_ENTER("Certifier::certify");
678   rpl_gno result= 0;
679   const bool has_write_set= !write_set->empty();
680 
681   if (!is_initialized())
682     DBUG_RETURN(-1); /* purecov: inspected */
683 
684   mysql_mutex_lock(&LOCK_certification_info);
685   int64 transaction_last_committed= parallel_applier_last_committed_global;
686 
687   DBUG_EXECUTE_IF("certifier_force_1_negative_certification", {
688                   DBUG_SET("-d,certifier_force_1_negative_certification");
689                   goto end;});
690 
691   if (conflict_detection_enable)
692   {
693     for (std::list<const char*>::iterator it= write_set->begin();
694          it != write_set->end();
695          ++it)
696     {
697       Gtid_set *certified_write_set_snapshot_version=
698           get_certified_write_set_snapshot_version(*it);
699 
700       /*
701         If the previous certified transaction snapshot version is not
702         a subset of the incoming transaction snapshot version, the current
703         transaction was executed on top of outdated data, so it will be
704         negatively certified. Otherwise, this transaction is marked
705         certified and goes into applier.
706       */
707       if (certified_write_set_snapshot_version != NULL &&
708           !certified_write_set_snapshot_version->is_subset(snapshot_version))
709         goto end;
710     }
711   }
712 
713   if (certifying_already_applied_transactions &&
714       !group_gtid_extracted->is_subset_not_equals(group_gtid_executed))
715   {
716     certifying_already_applied_transactions= false;
717 
718 #ifndef NDEBUG
719     char *group_gtid_executed_string= NULL;
720     char *group_gtid_extracted_string= NULL;
721     group_gtid_executed->to_string(&group_gtid_executed_string, true);
722     group_gtid_extracted->to_string(&group_gtid_extracted_string, true);
723     DBUG_PRINT("Certifier::certify()",
724                ("Set certifying_already_applied_transactions to false. "
725                 "group_gtid_executed: \"%s\"; group_gtid_extracted_string: \"%s\"",
726                 group_gtid_executed_string, group_gtid_extracted_string));
727     my_free(group_gtid_executed_string);
728     my_free(group_gtid_extracted_string);
729 #endif
730   }
731 
732   /*
733     If the current transaction doesn't have a specified GTID, one
734     for group UUID will be generated.
735     This situation happens when transactions are executed with
736     GTID_NEXT equal to AUTOMATIC_GROUP (the default case).
737   */
738   if (generate_group_id)
739   {
740     /*
741       We need to ensure that group sidno does exist on snapshot
742       version due to the following scenario:
743         1) Member joins the group.
744         2) Goes through recovery procedure, view change is queued to
745            apply, member is marked ONLINE. This requires
746              --group_replication_recovery_complete_at=TRANSACTIONS_CERTIFIED
747            to happen.
748         3) Despite the view change log event is still being applied,
749            since the member is already ONLINE it can execute
750            transactions. The first transaction from this member will
751            not include any group GTID, since no group transaction is
752            yet applied.
753         4) As a result of this sequence snapshot_version will not
754            contain any group GTID and the below instruction
755              snapshot_version->_add_gtid(group_sidno, result);
756            would fail because of that.
757     */
758     if (snapshot_version->ensure_sidno(group_sidno) != RETURN_STATUS_OK)
759     {
760       log_message(MY_ERROR_LEVEL,
761                   "Error updating transaction snapshot version after"
762                   " transaction being positively certified"); /* purecov: inspected */
763       goto end; /* purecov: inspected */
764     }
765 
766     result= get_group_next_available_gtid(member_uuid);
767     if (result < 0)
768       goto end;
769 
770     /*
771       Add generated transaction GTID to transaction snapshot version.
772     */
773     snapshot_version->_add_gtid(group_sidno, result);
774 
775     /*
776       Store last conflict free transaction identification.
777       sidno must be relative to group_gtid_sid_map.
778     */
779     last_conflict_free_transaction.set(group_gtid_sid_map_group_sidno,
780                                        result);
781 
782     DBUG_PRINT("info",
783                ("Group replication Certifier: generated transaction "
784                 "identifier: %llu", result));
785   }
786   else
787   {
788     /*
789       Check if it is an already used GTID
790     */
791     rpl_sidno sidno_for_group_gtid_sid_map= gle->get_sidno(group_gtid_sid_map);
792     if (sidno_for_group_gtid_sid_map < 1)
793     {
794       log_message(MY_ERROR_LEVEL,
795                   "Error fetching transaction sidno after transaction"
796                   " being positively certified"); /* purecov: inspected */
797       goto end; /* purecov: inspected */
798     }
799     if (group_gtid_executed->contains_gtid(sidno_for_group_gtid_sid_map, gle->get_gno()))
800     {
801       char buf[rpl_sid::TEXT_LENGTH + 1];
802       gle->get_sid()->to_string(buf);
803 
804       log_message(MY_ERROR_LEVEL,
805                   "The requested GTID '%s:%lld' was already used, the transaction will rollback"
806                   , buf, gle->get_gno());
807       goto end;
808     }
809     /*
810       Add received transaction GTID to transaction snapshot version.
811     */
812     rpl_sidno sidno= gle->get_sidno(snapshot_version->get_sid_map());
813     if (sidno < 1)
814     {
815       log_message(MY_ERROR_LEVEL,
816                   "Error fetching transaction sidno after transaction"
817                   " being positively certified"); /* purecov: inspected */
818       goto end; /* purecov: inspected */
819     }
820 
821     if (snapshot_version->ensure_sidno(sidno) != RETURN_STATUS_OK)
822     {
823       log_message(MY_ERROR_LEVEL,
824                   "Error updating transaction snapshot version after"
825                   " transaction being positively certified"); /* purecov: inspected */
826       goto end; /* purecov: inspected */
827     }
828     snapshot_version->_add_gtid(sidno, gle->get_gno());
829 
830     /*
831       Store last conflict free transaction identification.
832       sidno must be relative to group_gtid_sid_map.
833     */
834     rpl_sidno last_conflict_free_transaction_sidno= gle->get_sidno(group_gtid_sid_map);
835     if (last_conflict_free_transaction_sidno < 1)
836     {
837       log_message(MY_WARNING_LEVEL,
838                   "Unable to update last conflict free transaction, "
839                   "this transaction will not be tracked on "
840                   "performance_schema.replication_group_member_stats.last_conflict_free_transaction"); /* purecov: inspected */
841     }
842     else
843     {
844       last_conflict_free_transaction.set(last_conflict_free_transaction_sidno,
845                                          gle->get_gno());
846     }
847 
848     result= 1;
849     DBUG_PRINT("info",
850                ("Group replication Certifier: there was no transaction identifier "
851                 "generated since transaction already had a GTID specified"));
852   }
853 
854   /*
855     Add the transaction's write set to certification info.
856   */
857   if (has_write_set)
858   {
859     // Only consider remote transactions for parallel applier indexes.
860     int64 transaction_sequence_number=
861         local_transaction ? -1 : parallel_applier_sequence_number;
862     Gtid_set_ref *snapshot_version_value=
863         new Gtid_set_ref(certification_info_sid_map, transaction_sequence_number);
864     if (snapshot_version_value->add_gtid_set(snapshot_version) != RETURN_STATUS_OK)
865     {
866       result= 0; /* purecov: inspected */
867       delete snapshot_version_value; /* purecov: inspected */
868       log_message(MY_ERROR_LEVEL,
869                   "Error updating transaction snapshot version reference "
870                   "for internal storage"); /* purecov: inspected */
871       goto end; /* purecov: inspected */
872     }
873 
874     for(std::list<const char*>::iterator it= write_set->begin();
875         it != write_set->end();
876         ++it)
877     {
878       int64 item_previous_sequence_number= -1;
879 
880       add_item(*it, snapshot_version_value,
881                &item_previous_sequence_number);
882 
883       /*
884         Exclude previous sequence number that are smaller than global
885         last committed and that are the current sequence number.
886         transaction_last_committed is initialized with
887         parallel_applier_last_committed_global on the beginning of
888         this method.
889       */
890       if (item_previous_sequence_number > transaction_last_committed &&
891           item_previous_sequence_number != parallel_applier_sequence_number)
892         transaction_last_committed= item_previous_sequence_number;
893     }
894   }
895 
896   /*
897     Update parallel applier indexes.
898   */
899   if (!local_transaction)
900   {
901     if (!has_write_set)
902     {
903       /*
904         DDL does not have write-set, so we need to ensure that it
905         is applied without any other transaction in parallel.
906       */
907       transaction_last_committed= parallel_applier_sequence_number - 1;
908     }
909 
910     gle->last_committed= transaction_last_committed;
911     gle->sequence_number= parallel_applier_sequence_number;
912     assert(gle->last_committed >= 0);
913     assert(gle->sequence_number > 0);
914     assert(gle->last_committed < gle->sequence_number);
915 
916     increment_parallel_applier_sequence_number(!has_write_set);
917   }
918 
919 end:
920   update_certified_transaction_count(result>0);
921 
922   mysql_mutex_unlock(&LOCK_certification_info);
923   DBUG_PRINT("info", ("Group replication Certifier: certification result: %llu",
924                       result));
925   DBUG_RETURN(result);
926 }
927 
928 
add_specified_gtid_to_group_gtid_executed(Gtid_log_event * gle,bool local)929 int Certifier::add_specified_gtid_to_group_gtid_executed(Gtid_log_event *gle,
930                                                          bool local)
931 {
932   DBUG_ENTER("Certifier::add_specified_gtid_to_group_gtid_executed");
933 
934   mysql_mutex_lock(&LOCK_certification_info);
935   rpl_sidno sidno= gle->get_sidno(group_gtid_sid_map);
936 
937   if (sidno < 1)
938   {
939     log_message(MY_ERROR_LEVEL,
940                 "Error fetching transaction sidno while adding to the "
941                 "group_gtid_executed set."); /* purecov: inspected */
942     mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
943     DBUG_RETURN(1); /* purecov: inspected */
944   }
945 
946   if (group_gtid_executed->ensure_sidno(sidno) != RETURN_STATUS_OK)
947   {
948     log_message(MY_ERROR_LEVEL,
949                 "Error while ensuring the sidno be present in the "
950                 "group_gtid_executed"); /* purecov: inspected */
951     mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
952     DBUG_RETURN(1); /* purecov: inspected */
953   }
954 
955   add_to_group_gtid_executed_internal(sidno, gle->get_gno(), local);
956 
957   mysql_mutex_unlock(&LOCK_certification_info);
958   DBUG_RETURN(0);
959 }
960 
add_group_gtid_to_group_gtid_executed(rpl_gno gno,bool local)961 int Certifier::add_group_gtid_to_group_gtid_executed(rpl_gno gno, bool local)
962 {
963   DBUG_ENTER("Certifier::add_group_gtid_to_group_gtid_executed");
964   mysql_mutex_lock(&LOCK_certification_info);
965   add_to_group_gtid_executed_internal(group_gtid_sid_map_group_sidno, gno, local);
966   mysql_mutex_unlock(&LOCK_certification_info);
967   DBUG_RETURN(0);
968 }
969 
970 /*
971   This method will return the next GNO for the current transaction, it
972   will work with two behaviours:
973 
974   1) member_uuid == NULL || gtid_assignment_block_size <= 1
975      View_change_log_events creation does call this method with
976      member_uuid set to NULL to force it to be created with the
977      first available GNO of the group. This will ensure that all
978      members do use the same GNO for it.
979      After a View_change_log_event is created we recompute available
980      GNOs to ensure that all members do have the same available GNOs
981      set.
982      This branch is also used when gtid_assignment_block_size is
983      set to 1, meaning that GNO will be assigned sequentially
984      according with certification order.
985 
986   2) On the second branch we assign GNOs according to intervals
987      assigned to each member.
988      To avoid having eternal gaps when a member do use all of its
989      assigned GNOs, periodically we recompute the intervals, this
990      will make that GNOs available to other members.
991      The GNO is generated within the interval of available GNOs for
992      a given member.
993      When a member exhaust its assigned GNOs we reserve more for it
994      from the available GNOs set.
995 */
get_group_next_available_gtid(const char * member_uuid)996 rpl_gno Certifier::get_group_next_available_gtid(const char *member_uuid)
997 {
998   DBUG_ENTER("Certifier::get_group_next_available_gtid");
999   mysql_mutex_assert_owner(&LOCK_certification_info);
1000   rpl_gno result= 0;
1001 
1002   if (member_uuid == NULL || gtid_assignment_block_size <= 1)
1003   {
1004     result= get_group_next_available_gtid_candidate(1, GNO_END);
1005     if (result < 0)
1006     {
1007       assert(result == -1);
1008       DBUG_RETURN(result);
1009     }
1010 
1011     /*
1012       If we did log a view change event we need to recompute
1013       intervals, so that all members start from the same
1014       intervals.
1015     */
1016     if (member_uuid == NULL && gtid_assignment_block_size > 1)
1017       compute_group_available_gtid_intervals();
1018   }
1019   else
1020   {
1021     /*
1022       After a number of rounds equal to block size the blocks are
1023       collected back so that the GTID holes can be filled up by
1024       following transactions from other members.
1025     */
1026     if (gtids_assigned_in_blocks_counter % (gtid_assignment_block_size + 1) == 0)
1027       compute_group_available_gtid_intervals();
1028 
1029     /*
1030       GTID is assigned in blocks to each member and are consumed
1031       from that block unless a new block is needed.
1032     */
1033     std::string member(member_uuid);
1034     std::map<std::string, Gtid_set::Interval>::iterator it=
1035         member_gtids.find(member);
1036 
1037     if (it == member_gtids.end())
1038     {
1039       // There is no block assigned to this member so get one.
1040       std::pair<std::map<std::string, Gtid_set::Interval>::iterator, bool> insert_ret;
1041       std::pair<std::string, Gtid_set::Interval> member_pair(member,
1042           reserve_gtid_block(gtid_assignment_block_size));
1043       insert_ret= member_gtids.insert(member_pair);
1044       assert(insert_ret.second == true);
1045       it= insert_ret.first;
1046     }
1047 
1048     result= get_group_next_available_gtid_candidate(it->second.start,
1049                                                     it->second.end);
1050     while (result == -2)
1051     {
1052       // Block has no available GTIDs, reserve more.
1053       it->second= reserve_gtid_block(gtid_assignment_block_size);
1054       result= get_group_next_available_gtid_candidate(it->second.start,
1055                                                       it->second.end);
1056     }
1057     if (result < 0)
1058       DBUG_RETURN(result);
1059 
1060     it->second.start= result;
1061     gtids_assigned_in_blocks_counter++;
1062   }
1063 
1064   assert(result > 0);
1065   DBUG_RETURN(result);
1066 }
1067 
1068 rpl_gno
get_group_next_available_gtid_candidate(rpl_gno start,rpl_gno end) const1069 Certifier::get_group_next_available_gtid_candidate(rpl_gno start,
1070                                                    rpl_gno end) const
1071 {
1072   DBUG_ENTER("Certifier::get_group_next_available_gtid_candidate");
1073   assert(start > 0);
1074   assert(start <= end);
1075   mysql_mutex_assert_owner(&LOCK_certification_info);
1076 
1077   rpl_gno candidate= start;
1078   Gtid_set::Const_interval_iterator ivit(certifying_already_applied_transactions
1079                                              ? group_gtid_extracted
1080                                              : group_gtid_executed,
1081                                          group_gtid_sid_map_group_sidno);
1082 #ifndef NDEBUG
1083   if (certifying_already_applied_transactions)
1084     DBUG_PRINT("Certifier::get_group_next_available_gtid_candidate()",
1085                ("Generating group transaction id from group_gtid_extracted"));
1086 #endif
1087 
1088   /*
1089     Walk through available intervals until we find the correct one
1090     or return GNO exhausted error.
1091   */
1092   while (true)
1093   {
1094     assert(candidate >= start);
1095     const Gtid_set::Interval *iv= ivit.get();
1096     rpl_gno next_interval_start= iv != NULL ? iv->start : GNO_END;
1097 
1098     // Correct interval.
1099     if (candidate < next_interval_start)
1100     {
1101       if (candidate <= end)
1102         DBUG_RETURN(candidate);
1103       else
1104         DBUG_RETURN(-2);
1105     }
1106 
1107     if (iv == NULL)
1108     {
1109       log_message(MY_ERROR_LEVEL,
1110                   "Impossible to generate Global Transaction Identifier: "
1111                   "the integer component reached the maximal value. Restart "
1112                   "the group with a new group_replication_group_name.");
1113       DBUG_RETURN(-1);
1114     }
1115 
1116     candidate= std::max(candidate, iv->end);
1117     ivit.next();
1118   }
1119 }
1120 
add_item(const char * item,Gtid_set_ref * snapshot_version,int64 * item_previous_sequence_number)1121 bool Certifier::add_item(const char* item, Gtid_set_ref *snapshot_version,
1122                          int64 *item_previous_sequence_number)
1123 {
1124   DBUG_ENTER("Certifier::add_item");
1125   mysql_mutex_assert_owner(&LOCK_certification_info);
1126   bool error= true;
1127   std::string key(item);
1128   Certification_info::iterator it= certification_info.find(key);
1129   snapshot_version->link();
1130 
1131   if (it == certification_info.end())
1132   {
1133     std::pair<Certification_info::iterator, bool> ret=
1134         certification_info.insert(std::pair<std::string, Gtid_set_ref*>
1135                                   (key, snapshot_version));
1136     error= !ret.second;
1137   }
1138   else
1139   {
1140     *item_previous_sequence_number=
1141         it->second->get_parallel_applier_sequence_number();
1142 
1143     if (it->second->unlink() == 0)
1144       delete it->second;
1145 
1146     it->second= snapshot_version;
1147     error= false;
1148   }
1149 
1150   DBUG_RETURN(error);
1151 }
1152 
1153 
get_certified_write_set_snapshot_version(const char * item)1154 Gtid_set *Certifier::get_certified_write_set_snapshot_version(const char* item)
1155 {
1156   DBUG_ENTER("Certifier::get_certified_write_set_snapshot_version");
1157   mysql_mutex_assert_owner(&LOCK_certification_info);
1158 
1159   if (!is_initialized())
1160     DBUG_RETURN(NULL); /* purecov: inspected */
1161 
1162   Certification_info::iterator it;
1163   std::string item_str(item);
1164 
1165   it= certification_info.find(item_str);
1166 
1167   if (it == certification_info.end())
1168     DBUG_RETURN(NULL);
1169   else
1170     DBUG_RETURN(it->second);
1171 }
1172 
1173 
1174 int
get_group_stable_transactions_set_string(char ** buffer,size_t * length)1175 Certifier::get_group_stable_transactions_set_string(char **buffer,
1176                                                     size_t *length)
1177 {
1178   DBUG_ENTER("Certifier::get_group_stable_transactions_set_string");
1179   int error= 1;
1180 
1181   char *m_buffer= NULL;
1182   int m_length= stable_gtid_set->to_string(&m_buffer, true);
1183   if (m_length >= 0)
1184   {
1185     *buffer= m_buffer;
1186     *length= static_cast<size_t>(m_length);
1187     error= 0;
1188   }
1189   else
1190     my_free(m_buffer); /* purecov: inspected */
1191 
1192   DBUG_RETURN(error);
1193 }
1194 
1195 
set_group_stable_transactions_set(Gtid_set * executed_gtid_set)1196 bool Certifier::set_group_stable_transactions_set(Gtid_set* executed_gtid_set)
1197 {
1198   DBUG_ENTER("Certifier::set_group_stable_transactions_set");
1199 
1200   if (!is_initialized())
1201     DBUG_RETURN(true); /* purecov: inspected */
1202 
1203   if (executed_gtid_set == NULL)
1204   {
1205     log_message(MY_ERROR_LEVEL, "Invalid stable transactions set"); /* purecov: inspected */
1206     DBUG_RETURN(true); /* purecov: inspected */
1207   }
1208 
1209   stable_gtid_set_lock->wrlock();
1210   if (stable_gtid_set->add_gtid_set(executed_gtid_set) != RETURN_STATUS_OK)
1211   {
1212     stable_gtid_set_lock->unlock(); /* purecov: inspected */
1213     log_message(MY_ERROR_LEVEL, "Error updating stable transactions set"); /* purecov: inspected */
1214     DBUG_RETURN(true); /* purecov: inspected */
1215   }
1216   stable_gtid_set_lock->unlock();
1217 
1218   garbage_collect();
1219 
1220   DBUG_RETURN(false);
1221 }
1222 
garbage_collect()1223 void Certifier::garbage_collect()
1224 {
1225   DBUG_ENTER("Certifier::garbage_collect");
1226   DBUG_EXECUTE_IF("group_replication_do_not_clear_certification_database",
1227                     { DBUG_VOID_RETURN; };);
1228 
1229   mysql_mutex_lock(&LOCK_certification_info);
1230 
1231   /*
1232     When a transaction "t" is applied to all group members and for all
1233     ongoing, i.e., not yet committed or aborted transactions,
1234     "t" was already committed when they executed (thus "t"
1235     precedes them), then "t" is stable and can be removed from
1236     the certification info.
1237   */
1238   Certification_info::iterator it= certification_info.begin();
1239   stable_gtid_set_lock->wrlock();
1240   while (it != certification_info.end())
1241   {
1242     if (it->second->is_subset_not_equals(stable_gtid_set))
1243     {
1244       if (it->second->unlink() == 0)
1245         delete it->second;
1246       certification_info.erase(it++);
1247     }
1248     else
1249       ++it;
1250   }
1251   stable_gtid_set_lock->unlock();
1252 
1253   /*
1254     We need to update parallel applier indexes since we do not know
1255     what write sets were purged, which may cause transactions
1256     last committed to be incorrectly computed.
1257   */
1258   increment_parallel_applier_sequence_number(true);
1259 
1260 #if !defined(NDEBUG)
1261   /*
1262     This part blocks the garbage collection process for 300 sec in order to
1263     simulate the case that while garbage collection is going on, we should
1264     skip the stable set messages round in order to prevent simultaneous
1265     access to stable_gtid_set.
1266   */
1267   if (certifier_garbage_collection_block)
1268   {
1269     certifier_garbage_collection_block= false;
1270     // my_sleep expects a given number of microseconds.
1271     my_sleep(broadcast_thread->BROADCAST_GTID_EXECUTED_PERIOD * 1500000);
1272   }
1273 #endif
1274 
1275   mysql_mutex_unlock(&LOCK_certification_info);
1276 
1277   /*
1278     Applier channel received set does only contain the GTIDs of the
1279     remote (committed by other members) transactions. On the long
1280     term, the gaps may create performance issues on the received
1281     set update. To avoid that, periodically, we update the received
1282     set with the full set of transactions committed on the group,
1283     closing the gaps.
1284   */
1285   if (channel_add_executed_gtids_to_received_gtids(applier_module_channel_name))
1286   {
1287     log_message(MY_WARNING_LEVEL,
1288                 "There was an error when filling the missing GTIDs on "
1289                 "the applier channel received set. Despite not critical, "
1290                 "on the long run this may cause performance issues"); /* purecov: inspected */
1291   }
1292 
1293   DBUG_VOID_RETURN;
1294 }
1295 
1296 
handle_certifier_data(const uchar * data,ulong len,const Gcs_member_identifier & gcs_member_id)1297 int Certifier::handle_certifier_data(const uchar *data, ulong len,
1298                                      const Gcs_member_identifier& gcs_member_id)
1299 {
1300   DBUG_ENTER("Certifier::handle_certifier_data");
1301   bool member_message_received= false;
1302 
1303   if (!is_initialized())
1304     DBUG_RETURN(1); /* purecov: inspected */
1305 
1306   mysql_mutex_lock(&LOCK_members);
1307   std::string member_id= gcs_member_id.get_member_id();
1308 #if !defined(NDEBUG)
1309   if (same_member_message_discarded)
1310   {
1311     /*
1312       Injecting the member_id in the member's vector to simulate the case of
1313       same member sending multiple messages.
1314     */
1315     this->members.push_back(member_id);
1316   }
1317 #endif
1318 
1319   if (this->get_members_size() != plugin_get_group_members_number())
1320   {
1321     /*
1322       We check for the member_id of the current message if it is present in
1323       the member vector or not. If it is present, we will need to discard the
1324       message. If not we will add the message in the incoming message
1325       synchronized queue for stable set handling.
1326     */
1327     std::vector<std::string>::iterator it;
1328     it= std::find(members.begin(), members.end(), member_id);
1329     if (it != members.end())
1330       member_message_received= true;
1331     else
1332       this->members.push_back(member_id);
1333 
1334     /*
1335       Since member is not present we can queue this message.
1336     */
1337     if (!member_message_received)
1338     {
1339       this->incoming->push(new Data_packet(data, len));
1340     }
1341     else
1342     {
1343       /*
1344         As member is already received we can throw the necessary warning of the
1345         member message already received.
1346       */
1347       Group_member_info *member_info=
1348           group_member_mgr->get_group_member_info_by_member_id(gcs_member_id);
1349       if (member_info != NULL)
1350       {
1351         log_message(MY_WARNING_LEVEL, "The member with address %s:%u has "
1352                     "already sent the stable set. Therefore discarding the second "
1353                     "message.", member_info->get_hostname().c_str(),
1354                     member_info->get_port());
1355         delete member_info;
1356       }
1357     }
1358 
1359     mysql_mutex_unlock(&LOCK_members);
1360 
1361     /*
1362       If the incoming message queue size is equal to the number of the
1363       members in the group, we are sure that each member has sent their
1364       gtid_executed. So we can go ahead with the stable set handling.
1365     */
1366     if (plugin_get_group_members_number() == this->incoming->size())
1367     {
1368       int error= stable_set_handle();
1369       /*
1370         Clearing the members to proceed with the next round of garbage
1371         collection.
1372       */
1373       clear_members();
1374       DBUG_RETURN(error);
1375     }
1376   }
1377   else
1378   {
1379     log_message(MY_WARNING_LEVEL, "Skipping this round of stable set "
1380                 "computation as certification garbage collection process is "
1381                 "still running."); /* purecov: inspected */
1382     mysql_mutex_unlock(&LOCK_members); /* purecov: inspected */
1383   }
1384 
1385 #if !defined(NDEBUG)
1386   if (same_member_message_discarded)
1387   {
1388     /*
1389       Clearing the flag here as the members vector is not cleaned above.
1390     */
1391     same_member_message_discarded= false;
1392     clear_members();
1393   }
1394 #endif
1395 
1396   DBUG_RETURN(0);
1397 }
1398 
stable_set_handle()1399 int Certifier::stable_set_handle()
1400 {
1401   DBUG_ENTER("Certifier:stable_set_handle");
1402 
1403   Data_packet *packet= NULL;
1404   int error= 0;
1405 
1406   Sid_map sid_map(NULL);
1407   Gtid_set executed_set(&sid_map, NULL);
1408 
1409   /*
1410     Compute intersection between all received sets.
1411   */
1412   while(!error && !this->incoming->empty())
1413   {
1414     this->incoming->pop(&packet);
1415 
1416     if (packet == NULL)
1417     {
1418       log_message(MY_ERROR_LEVEL, "Null packet on certifier's queue"); /* purecov: inspected */
1419       error= 1; /* purecov: inspected */
1420       break;    /* purecov: inspected */
1421     }
1422 
1423     uchar* payload= packet->payload;
1424     Gtid_set member_set(&sid_map, NULL);
1425     Gtid_set intersection_result(&sid_map, NULL);
1426 
1427     if (member_set.add_gtid_encoding(payload, packet->len) != RETURN_STATUS_OK)
1428     {
1429       log_message(MY_ERROR_LEVEL, "Error reading GTIDs from the message"); /* purecov: inspected */
1430       error= 1; /* purecov: inspected */
1431     }
1432     else
1433     {
1434       /*
1435         First member set? If so we only need to add it to executed set.
1436       */
1437       if (executed_set.is_empty())
1438       {
1439         if (executed_set.add_gtid_set(&member_set))
1440         {
1441           log_message(MY_ERROR_LEVEL, "Error processing stable transactions set"); /* purecov: inspected */
1442           error= 1; /* purecov: inspected */
1443         }
1444       }
1445       else
1446       {
1447         /*
1448           We have three sets:
1449             member_set:          the one sent from a given member;
1450             executed_set:        the one that contains the intersection of
1451                                  the computed sets until now;
1452             intersection_result: the intersection between set and
1453                                  intersection_result.
1454           So we compute the intersection between set and executed_set, and
1455           set that value to executed_set to be used on the next intersection.
1456         */
1457         if (member_set.intersection(&executed_set, &intersection_result) != RETURN_STATUS_OK)
1458         {
1459           log_message(MY_ERROR_LEVEL, "Error processing intersection of stable transactions set"); /* purecov: inspected */
1460           error= 1; /* purecov: inspected */
1461         }
1462         else
1463         {
1464           executed_set.clear();
1465           if (executed_set.add_gtid_set(&intersection_result) != RETURN_STATUS_OK)
1466           {
1467             log_message(MY_ERROR_LEVEL, "Error processing stable transactions set"); /* purecov: inspected */
1468             error= 1; /* purecov: inspected */
1469           }
1470         }
1471       }
1472     }
1473 
1474     delete packet;
1475   }
1476 
1477   if (!error && set_group_stable_transactions_set(&executed_set))
1478   {
1479     log_message(MY_ERROR_LEVEL, "Error setting stable transactions set"); /* purecov: inspected */
1480     error= 1; /* purecov: inspected */
1481   }
1482 
1483 #if !defined(NDEBUG)
1484   char *executed_set_string;
1485   executed_set.to_string(&executed_set_string);
1486   DBUG_PRINT("info", ("Certifier stable_set_handle: executed_set: %s", executed_set_string));
1487   my_free(executed_set_string);
1488 #endif
1489 
1490   DBUG_RETURN(error);
1491 }
1492 
handle_view_change()1493 void Certifier::handle_view_change()
1494 {
1495   DBUG_ENTER("Certifier::handle_view_change");
1496   clear_incoming();
1497   clear_members();
1498   DBUG_VOID_RETURN;
1499 }
1500 
1501 
get_certification_info(std::map<std::string,std::string> * cert_info)1502 void Certifier::get_certification_info(std::map<std::string, std::string> *cert_info)
1503 {
1504   DBUG_ENTER("Certifier::get_certification_info");
1505   mysql_mutex_lock(&LOCK_certification_info);
1506 
1507   for(Certification_info::iterator it = certification_info.begin();
1508       it != certification_info.end(); ++it)
1509   {
1510     std::string key= it->first;
1511     assert(key.compare(GTID_EXTRACTED_NAME) != 0);
1512 
1513     size_t len= it->second->get_encoded_length();
1514     uchar* buf= (uchar *)my_malloc(
1515                                    PSI_NOT_INSTRUMENTED,
1516                                    len, MYF(0));
1517     it->second->encode(buf);
1518     std::string value(reinterpret_cast<const char*>(buf), len);
1519     my_free(buf);
1520 
1521     (*cert_info).insert(std::pair<std::string, std::string>(key, value));
1522   }
1523 
1524   // Add the group_gtid_executed to certification info sent to joiners.
1525   size_t len= group_gtid_executed->get_encoded_length();
1526   uchar* buf= (uchar*) my_malloc(PSI_NOT_INSTRUMENTED, len, MYF(0));
1527   group_gtid_executed->encode(buf);
1528   std::string value(reinterpret_cast<const char*>(buf), len);
1529   my_free(buf);
1530   (*cert_info).insert(std::pair<std::string, std::string>(
1531       GTID_EXTRACTED_NAME, value));
1532 
1533   mysql_mutex_unlock(&LOCK_certification_info);
1534   DBUG_VOID_RETURN;
1535 }
1536 
generate_view_change_group_gno()1537 rpl_gno Certifier::generate_view_change_group_gno()
1538 {
1539   DBUG_ENTER("Certifier::generate_view_change_group_gno");
1540 
1541   mysql_mutex_lock(&LOCK_certification_info);
1542   rpl_gno result= get_group_next_available_gtid(NULL);
1543 
1544   DBUG_EXECUTE_IF("certifier_assert_next_seqno_equal_5",
1545                   assert(result == 5););
1546   DBUG_EXECUTE_IF("certifier_assert_next_seqno_equal_7",
1547                   assert(result == 7););
1548 
1549   if (result > 0)
1550     add_to_group_gtid_executed_internal(group_gtid_sid_map_group_sidno, result, false);
1551   mysql_mutex_unlock(&LOCK_certification_info);
1552 
1553   DBUG_RETURN(result);
1554 }
1555 
1556 
set_certification_info(std::map<std::string,std::string> * cert_info)1557 int Certifier::set_certification_info(std::map<std::string, std::string> *cert_info)
1558 {
1559   DBUG_ENTER("Certifier::set_certification_info");
1560   assert(cert_info != NULL);
1561 
1562   if (cert_info->size() == 1) {
1563     std::map<std::string, std::string>::iterator it =
1564         cert_info->find(CERTIFICATION_INFO_ERROR_NAME);
1565     if (it != cert_info->end()) {
1566       log_message(MY_ERROR_LEVEL,
1567                   "The certification information could not be set in this server: '%s'",
1568                   it->second.c_str());
1569       DBUG_RETURN(1);
1570     }
1571   }
1572 
1573   mysql_mutex_lock(&LOCK_certification_info);
1574 
1575   clear_certification_info();
1576   for(std::map<std::string, std::string>::iterator it = cert_info->begin();
1577       it != cert_info->end(); ++it)
1578   {
1579     std::string key= it->first;
1580 
1581     /*
1582       Extract the donor group_gtid_executed so that it can be used to
1583       while member is applying transactions that were already applied
1584       by distrubuted recovery procedure.
1585     */
1586     if (it->first.compare(GTID_EXTRACTED_NAME) == 0)
1587     {
1588       if (group_gtid_extracted->add_gtid_encoding(
1589               reinterpret_cast<const uchar*>(it->second.c_str()), it->second.length())
1590             != RETURN_STATUS_OK)
1591       {
1592         log_message(MY_ERROR_LEVEL,
1593                     "Error reading group_gtid_extracted from the View_change_log_event"); /* purecov: inspected */
1594         mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
1595         DBUG_RETURN(1); /* purecov: inspected */
1596       }
1597       continue;
1598     }
1599 
1600     Gtid_set_ref *value = new Gtid_set_ref(certification_info_sid_map, -1);
1601     if (value->add_gtid_encoding(
1602             reinterpret_cast<const uchar*>(it->second.c_str()), it->second.length())
1603           != RETURN_STATUS_OK)
1604     {
1605       log_message(MY_ERROR_LEVEL,
1606                   "Error reading the write set item '%s' from the View_change_log_event",
1607                   key.c_str()); /* purecov: inspected */
1608       mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
1609       DBUG_RETURN(1); /* purecov: inspected */
1610     }
1611     value->link();
1612     certification_info.insert(std::pair<std::string, Gtid_set_ref*>(key, value));
1613   }
1614 
1615   if (initialize_server_gtid_set())
1616   {
1617     log_message(MY_ERROR_LEVEL, "Error during certfication_info"
1618                 " initialization."); /* purecov: inspected */
1619     mysql_mutex_unlock(&LOCK_certification_info); /* purecov: inspected */
1620     DBUG_RETURN(1); /* purecov: inspected */
1621   }
1622 
1623   if (group_gtid_extracted->is_subset_not_equals(group_gtid_executed))
1624   {
1625     certifying_already_applied_transactions= true;
1626     compute_group_available_gtid_intervals();
1627 
1628 #ifndef NDEBUG
1629     char *group_gtid_executed_string= NULL;
1630     char *group_gtid_extracted_string= NULL;
1631     group_gtid_executed->to_string(&group_gtid_executed_string, true);
1632     group_gtid_extracted->to_string(&group_gtid_extracted_string, true);
1633     DBUG_PRINT("Certifier::set_certification_info()",
1634                ("Set certifying_already_applied_transactions to true. "
1635                 "group_gtid_executed: \"%s\"; group_gtid_extracted_string: \"%s\"",
1636                 group_gtid_executed_string, group_gtid_extracted_string));
1637     my_free(group_gtid_executed_string);
1638     my_free(group_gtid_extracted_string);
1639 #endif
1640   }
1641 
1642   mysql_mutex_unlock(&LOCK_certification_info);
1643   DBUG_RETURN(0);
1644 }
1645 
update_certified_transaction_count(bool result)1646 void Certifier::update_certified_transaction_count(bool result)
1647 {
1648   if(result)
1649     positive_cert++;
1650   else
1651     negative_cert++;
1652 
1653   if (local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
1654   {
1655     applier_module->get_pipeline_stats_member_collector()
1656         ->increment_transactions_certified();
1657   }
1658 }
1659 
get_positive_certified()1660 ulonglong Certifier::get_positive_certified()
1661 {
1662   return positive_cert;
1663 }
1664 
get_negative_certified()1665 ulonglong Certifier::get_negative_certified()
1666 {
1667   return negative_cert;
1668 }
1669 
get_certification_info_size()1670 ulonglong Certifier::get_certification_info_size()
1671 {
1672   return certification_info.size();
1673 }
1674 
get_last_conflict_free_transaction(std::string * value)1675 void Certifier::get_last_conflict_free_transaction(std::string* value)
1676 {
1677   int length= 0;
1678   char buffer[Gtid::MAX_TEXT_LENGTH + 1];
1679 
1680   mysql_mutex_lock(&LOCK_certification_info);
1681   if (last_conflict_free_transaction.is_empty())
1682     goto end;
1683 
1684   length= last_conflict_free_transaction.to_string(group_gtid_sid_map, buffer);
1685   if (length > 0)
1686     value->assign(buffer);
1687 
1688 end:
1689   mysql_mutex_unlock(&LOCK_certification_info);
1690 }
1691 
get_members_size()1692 size_t Certifier::get_members_size()
1693 {
1694   return members.size();
1695 }
1696 
get_local_certified_gtid(std::string & local_gtid_certified_string)1697 size_t Certifier::get_local_certified_gtid(std::string& local_gtid_certified_string)
1698 {
1699   if (last_local_gtid.is_empty())
1700       return 0;
1701 
1702   char buf[Gtid::MAX_TEXT_LENGTH + 1];
1703   last_local_gtid.to_string(group_gtid_sid_map, buf);
1704   local_gtid_certified_string.assign(buf);
1705   return local_gtid_certified_string.size();
1706 }
1707 
enable_conflict_detection()1708 void Certifier::enable_conflict_detection()
1709 {
1710   DBUG_ENTER("Certifier::enable_conflict_detection");
1711   assert(local_member_info->in_primary_mode());
1712 
1713   mysql_mutex_lock(&LOCK_certification_info);
1714   conflict_detection_enable= true;
1715   local_member_info->enable_conflict_detection();
1716   mysql_mutex_unlock(&LOCK_certification_info);
1717   DBUG_VOID_RETURN;
1718 }
1719 
disable_conflict_detection()1720 void Certifier::disable_conflict_detection()
1721 {
1722   DBUG_ENTER("Certifier::disable_conflict_detection");
1723   assert(local_member_info->in_primary_mode());
1724 
1725   mysql_mutex_lock(&LOCK_certification_info);
1726   conflict_detection_enable= false;
1727   local_member_info->disable_conflict_detection();
1728   mysql_mutex_unlock(&LOCK_certification_info);
1729 
1730   log_message(MY_INFORMATION_LEVEL,
1731               "Primary had applied all relay logs, disabled conflict "
1732               "detection");
1733 
1734   DBUG_VOID_RETURN;
1735 }
1736 
is_conflict_detection_enable()1737 bool Certifier::is_conflict_detection_enable()
1738 {
1739   DBUG_ENTER("Certifier::is_conflict_detection_enable");
1740 
1741   mysql_mutex_lock(&LOCK_certification_info);
1742   bool result= conflict_detection_enable;
1743   mysql_mutex_unlock(&LOCK_certification_info);
1744 
1745   DBUG_RETURN(result);
1746 }
1747 
1748 /*
1749   Gtid_Executed_Message implementation
1750  */
1751 
Gtid_Executed_Message()1752 Gtid_Executed_Message::Gtid_Executed_Message()
1753   :Plugin_gcs_message(CT_CERTIFICATION_MESSAGE)
1754 {
1755 }
1756 
~Gtid_Executed_Message()1757 Gtid_Executed_Message::~Gtid_Executed_Message()
1758 {
1759 }
1760 
1761 void
append_gtid_executed(uchar * gtid_data,size_t len)1762 Gtid_Executed_Message::append_gtid_executed(uchar* gtid_data, size_t len)
1763 {
1764   data.insert(data.end(), gtid_data, gtid_data+len);
1765 }
1766 
1767 void
encode_payload(std::vector<unsigned char> * buffer) const1768 Gtid_Executed_Message::encode_payload(std::vector<unsigned char>* buffer) const
1769 {
1770   DBUG_ENTER("Gtid_Executed_Message::encode_payload");
1771 
1772   encode_payload_item_type_and_length(buffer, PIT_GTID_EXECUTED, data.size());
1773   buffer->insert(buffer->end(), data.begin(), data.end());
1774 
1775   DBUG_VOID_RETURN;
1776 }
1777 
1778 void
decode_payload(const unsigned char * buffer,const unsigned char * length)1779 Gtid_Executed_Message::decode_payload(const unsigned char* buffer,
1780                                       const unsigned char* length)
1781 {
1782   DBUG_ENTER("Gtid_Executed_Message::decode_payload");
1783   const unsigned char *slider= buffer;
1784   uint16 payload_item_type= 0;
1785   unsigned long long payload_item_length= 0;
1786 
1787   decode_payload_item_type_and_length(&slider,
1788                                       &payload_item_type,
1789                                       &payload_item_length);
1790   data.clear();
1791   data.insert(data.end(), slider, slider + payload_item_length);
1792 
1793   DBUG_VOID_RETURN;
1794 }
1795