1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "member_info.h"
24 #include "plugin_psi.h"
25 
26 using std::string;
27 using std::vector;
28 using std::map;
29 
30 Group_member_info::
Group_member_info(char * hostname_arg,uint port_arg,char * uuid_arg,int write_set_extraction_algorithm_arg,const std::string & gcs_member_id_arg,Group_member_info::Group_member_status status_arg,Member_version & member_version_arg,ulonglong gtid_assignment_block_size_arg,Group_member_info::Group_member_role role_arg,bool in_single_primary_mode,bool has_enforces_update_everywhere_checks,uint member_weight_arg,uint lower_case_table_names_arg)31 Group_member_info(char* hostname_arg,
32                   uint port_arg,
33                   char* uuid_arg,
34                   int write_set_extraction_algorithm_arg,
35                   const std::string& gcs_member_id_arg,
36                   Group_member_info::Group_member_status status_arg,
37                   Member_version& member_version_arg,
38                   ulonglong gtid_assignment_block_size_arg,
39                   Group_member_info::Group_member_role role_arg,
40                   bool in_single_primary_mode,
41                   bool has_enforces_update_everywhere_checks,
42                   uint member_weight_arg,
43                   uint lower_case_table_names_arg)
44   : Plugin_gcs_message(CT_MEMBER_INFO_MESSAGE),
45     hostname(hostname_arg), port(port_arg), uuid(uuid_arg),
46     status(status_arg),
47     write_set_extraction_algorithm(write_set_extraction_algorithm_arg),
48     gtid_assignment_block_size(gtid_assignment_block_size_arg),
49     unreachable(false),
50     role(role_arg),
51     configuration_flags(0), conflict_detection_enable(!in_single_primary_mode),
52     member_weight(member_weight_arg),
53     lower_case_table_names(lower_case_table_names_arg)
54 {
55   gcs_member_id= new Gcs_member_identifier(gcs_member_id_arg);
56   member_version= new Member_version(member_version_arg.get_version());
57 
58   /* Handle single_primary_mode */
59   if (in_single_primary_mode)
60     configuration_flags |= CNF_SINGLE_PRIMARY_MODE_F;
61 
62   /* Handle enforce_update_everywhere_checks */
63   if (has_enforces_update_everywhere_checks)
64     configuration_flags |= CNF_ENFORCE_UPDATE_EVERYWHERE_CHECKS_F;
65 }
66 
Group_member_info(Group_member_info & other)67 Group_member_info::Group_member_info(Group_member_info& other)
68   : Plugin_gcs_message(CT_MEMBER_INFO_MESSAGE),
69     hostname(other.get_hostname()),
70     port(other.get_port()),
71     uuid(other.get_uuid()),
72     status(other.get_recovery_status()),
73     executed_gtid_set(other.get_gtid_executed()),
74     retrieved_gtid_set(other.get_gtid_retrieved()),
75     write_set_extraction_algorithm(other.get_write_set_extraction_algorithm()),
76     gtid_assignment_block_size(other.get_gtid_assignment_block_size()),
77     unreachable(other.is_unreachable()),
78     role(other.get_role()),
79     configuration_flags(other.get_configuration_flags()),
80     conflict_detection_enable(other.is_conflict_detection_enabled()),
81     member_weight(other.get_member_weight()),
82     lower_case_table_names(other.get_lower_case_table_names())
83 {
84   gcs_member_id= new Gcs_member_identifier(other.get_gcs_member_id()
85                                                .get_member_id());
86   member_version= new Member_version(other.get_member_version()
87                                          .get_version());
88 }
89 
Group_member_info(const uchar * data,uint64 len)90 Group_member_info::Group_member_info(const uchar* data, uint64 len)
91   : Plugin_gcs_message(CT_MEMBER_INFO_MESSAGE),
92     gcs_member_id(NULL), member_version(NULL),
93     unreachable(false),
94     lower_case_table_names(DEFAULT_NOT_RECEIVED_LOWER_CASE_TABLE_NAMES)
95 {
96   decode(data, len);
97 }
98 
~Group_member_info()99 Group_member_info::~Group_member_info()
100 {
101   delete gcs_member_id;
102   delete member_version;
103 }
104 
105 void
encode_payload(std::vector<unsigned char> * buffer) const106 Group_member_info::encode_payload(std::vector<unsigned char>* buffer) const
107 {
108   DBUG_ENTER("Group_member_info::encode_payload");
109 
110   encode_payload_item_string(buffer, PIT_HOSTNAME,
111                              hostname.c_str(),
112                              hostname.length());
113 
114   uint16 port_aux= (uint16)port;
115   encode_payload_item_int2(buffer, PIT_PORT,
116                            port_aux);
117 
118   encode_payload_item_string(buffer, PIT_UUID,
119                              uuid.c_str(),
120                              uuid.length());
121 
122   encode_payload_item_string(buffer, PIT_GCS_ID,
123                              gcs_member_id->get_member_id().c_str(),
124                              gcs_member_id->get_member_id().length());
125 
126   char status_aux= (uchar)status;
127   encode_payload_item_char(buffer, PIT_STATUS,
128                            status_aux);
129 
130   uint32 version_aux= (uint32)member_version->get_version();
131   encode_payload_item_int4(buffer, PIT_VERSION,
132                            version_aux);
133 
134   uint16 write_set_extraction_algorithm_aux=
135       (uint16)write_set_extraction_algorithm;
136   encode_payload_item_int2(buffer, PIT_WRITE_SET_EXTRACTION_ALGORITHM,
137                            write_set_extraction_algorithm_aux);
138 
139   encode_payload_item_string(buffer, PIT_EXECUTED_GTID,
140                              executed_gtid_set.c_str(),
141                              executed_gtid_set.length());
142 
143   encode_payload_item_string(buffer, PIT_RETRIEVED_GTID,
144                              retrieved_gtid_set.c_str(),
145                              retrieved_gtid_set.length());
146 
147   encode_payload_item_int8(buffer, PIT_GTID_ASSIGNMENT_BLOCK_SIZE,
148                            gtid_assignment_block_size);
149 
150   char role_aux= (uchar)role;
151   encode_payload_item_char(buffer, PIT_MEMBER_ROLE, role_aux);
152 
153   uint32 configuration_flags_aux= (uint32)configuration_flags;
154   encode_payload_item_int4(buffer, PIT_CONFIGURATION_FLAGS,
155                            configuration_flags_aux);
156 
157   /*
158     MySQL 5.7.18+ payloads
159   */
160   char conflict_detection_enable_aux= conflict_detection_enable ? '1' : '0';
161   encode_payload_item_char(buffer, PIT_CONFLICT_DETECTION_ENABLE,
162                            conflict_detection_enable_aux);
163 
164   uint16 member_weight_aux= (uint16)member_weight;
165   encode_payload_item_int2(buffer, PIT_MEMBER_WEIGHT,
166                            member_weight_aux);
167 
168   uint16 lower_case_table_names_aux= static_cast <uint16> (lower_case_table_names);
169 #ifndef NDEBUG
170   if (lower_case_table_names != SKIP_ENCODING_LOWER_CASE_TABLE_NAMES)
171 #endif
172   encode_payload_item_int2(buffer, PIT_LOWER_CASE_TABLE_NAME,
173                            lower_case_table_names_aux);
174 
175   DBUG_VOID_RETURN;
176 }
177 
178 void
decode_payload(const unsigned char * buffer,const unsigned char * end)179 Group_member_info::decode_payload(const unsigned char* buffer,
180                                   const unsigned char* end)
181 {
182   DBUG_ENTER("Group_member_info::decode_payload");
183   const unsigned char *slider= buffer;
184   uint16 payload_item_type= 0;
185   unsigned long long payload_item_length= 0;
186 
187   decode_payload_item_string(&slider,
188                              &payload_item_type,
189                              &hostname,
190                              &payload_item_length);
191 
192   uint16 port_aux= 0;
193   decode_payload_item_int2(&slider,
194                            &payload_item_type,
195                            &port_aux);
196   port= (uint)port_aux;
197 
198   decode_payload_item_string(&slider,
199                              &payload_item_type,
200                              &uuid,
201                              &payload_item_length);
202 
203   std::string gcs_member_id_aux;
204   decode_payload_item_string(&slider,
205                              &payload_item_type,
206                              &gcs_member_id_aux,
207                              &payload_item_length);
208   delete gcs_member_id;
209   gcs_member_id= new Gcs_member_identifier(gcs_member_id_aux);
210 
211   unsigned char status_aux= 0;
212   decode_payload_item_char(&slider,
213                            &payload_item_type,
214                            &status_aux);
215   status= (Group_member_status)status_aux;
216 
217   uint32 member_version_aux= 0;
218   decode_payload_item_int4(&slider,
219                            &payload_item_type,
220                            &member_version_aux);
221   delete member_version;
222   member_version= new Member_version(member_version_aux);
223 
224   uint16 write_set_extraction_algorithm_aux= 0;
225   decode_payload_item_int2(&slider,
226                            &payload_item_type,
227                            &write_set_extraction_algorithm_aux);
228   write_set_extraction_algorithm= (uint)write_set_extraction_algorithm_aux;
229 
230   decode_payload_item_string(&slider,
231                              &payload_item_type,
232                              &executed_gtid_set,
233                              &payload_item_length);
234 
235   decode_payload_item_string(&slider,
236                              &payload_item_type,
237                              &retrieved_gtid_set,
238                              &payload_item_length);
239 
240   decode_payload_item_int8(&slider,
241                            &payload_item_type,
242                            &gtid_assignment_block_size);
243 
244   unsigned char role_aux= 0;
245   decode_payload_item_char(&slider,
246                            &payload_item_type,
247                            &role_aux);
248   role= (Group_member_role)role_aux;
249 
250   uint32 configuration_flags_aux= 0;
251   decode_payload_item_int4(&slider,
252                            &payload_item_type,
253                            &configuration_flags_aux);
254   configuration_flags= configuration_flags_aux;
255 
256   /*
257     MySQL 5.7.18+ payloads
258     We need to check if there are more payload items to read, if the member
259     info message was send by a lower version member, there will not.
260   */
261   while (slider + Plugin_gcs_message::WIRE_PAYLOAD_ITEM_HEADER_SIZE <= end)
262   {
263     // Read payload item header to find payload item length.
264     decode_payload_item_type_and_length(&slider,
265                                         &payload_item_type,
266                                         &payload_item_length);
267 
268     switch (payload_item_type)
269     {
270       case PIT_CONFLICT_DETECTION_ENABLE:
271         if (slider + payload_item_length <= end)
272         {
273           unsigned char conflict_detection_enable_aux= *slider;
274           slider += payload_item_length;
275           conflict_detection_enable=
276               (conflict_detection_enable_aux == '1') ? true : false;
277         }
278         break;
279 
280       case PIT_MEMBER_WEIGHT:
281         if (slider + payload_item_length <= end)
282         {
283           uint16 member_weight_aux= uint2korr(slider);
284           slider += payload_item_length;
285           member_weight= (uint)member_weight_aux;
286         }
287         break;
288 
289       case PIT_LOWER_CASE_TABLE_NAME:
290         if (slider + payload_item_length <= end)
291         {
292           uint16 lower_case_table_names_aux= uint2korr(slider);
293           slider += payload_item_length;
294           lower_case_table_names= static_cast <uint>(lower_case_table_names_aux);
295         }
296         break;
297     }
298   }
299 
300   DBUG_VOID_RETURN;
301 }
302 
303 const string&
get_hostname()304 Group_member_info::get_hostname()
305 {
306   return hostname;
307 }
308 
309 uint
get_port()310 Group_member_info::get_port()
311 {
312   return port;
313 }
314 
315 const string&
get_uuid()316 Group_member_info::get_uuid()
317 {
318   return uuid;
319 }
320 
321 Group_member_info::Group_member_status
get_recovery_status()322 Group_member_info::get_recovery_status()
323 {
324   return status;
325 }
326 
327 Group_member_info::Group_member_role
get_role()328 Group_member_info::get_role()
329 {
330   return role;
331 }
332 
333 const Gcs_member_identifier&
get_gcs_member_id()334 Group_member_info::get_gcs_member_id()
335 {
336   return *gcs_member_id;
337 }
338 
339 void
update_recovery_status(Group_member_status new_status)340 Group_member_info::update_recovery_status(Group_member_status new_status)
341 {
342   status= new_status;
343 }
344 
345 void
update_gtid_sets(std::string & executed_gtids,std::string & retrieved_gtids)346 Group_member_info::update_gtid_sets(std::string& executed_gtids,
347                                     std::string& retrieved_gtids)
348 {
349   executed_gtid_set.assign(executed_gtids);
350   retrieved_gtid_set.assign(retrieved_gtids);
351 }
352 
353 void
set_role(Group_member_role new_role)354 Group_member_info::set_role(Group_member_role new_role)
355 {
356   role= new_role;
357 }
358 
359 const Member_version&
get_member_version()360 Group_member_info::get_member_version()
361 {
362   return *member_version;
363 }
364 
365 const std::string&
get_gtid_executed()366 Group_member_info::get_gtid_executed()
367 {
368   return executed_gtid_set;
369 }
370 
371 const std::string&
get_gtid_retrieved()372 Group_member_info::get_gtid_retrieved()
373 {
374   return retrieved_gtid_set;
375 }
376 
377 uint
get_write_set_extraction_algorithm()378 Group_member_info::get_write_set_extraction_algorithm()
379 {
380   return write_set_extraction_algorithm;
381 }
382 
383 ulonglong
get_gtid_assignment_block_size()384 Group_member_info::get_gtid_assignment_block_size()
385 {
386   return gtid_assignment_block_size;
387 }
388 
389 uint32
get_configuration_flags()390 Group_member_info::get_configuration_flags()
391 {
392   return configuration_flags;
393 }
394 
395 uint
get_lower_case_table_names() const396 Group_member_info::get_lower_case_table_names() const
397 {
398   return lower_case_table_names;
399 }
400 
in_primary_mode()401 bool Group_member_info::in_primary_mode()
402 {
403   return get_configuration_flags() & CNF_SINGLE_PRIMARY_MODE_F;
404 }
405 
has_enforces_update_everywhere_checks()406 bool Group_member_info::has_enforces_update_everywhere_checks()
407 {
408   return get_configuration_flags() & CNF_ENFORCE_UPDATE_EVERYWHERE_CHECKS_F;
409 }
410 
411 bool
is_unreachable()412 Group_member_info::is_unreachable()
413 {
414   return unreachable;
415 }
416 
417 void
set_unreachable()418 Group_member_info::set_unreachable()
419 {
420   unreachable= true;
421 }
422 
423 void
set_reachable()424 Group_member_info::set_reachable()
425 {
426   unreachable= false;
427 }
428 
enable_conflict_detection()429 void Group_member_info::enable_conflict_detection()
430 {
431   conflict_detection_enable= true;
432 }
433 
disable_conflict_detection()434 void Group_member_info::disable_conflict_detection()
435 {
436   conflict_detection_enable= false;
437 }
438 
is_conflict_detection_enabled()439 bool Group_member_info::is_conflict_detection_enabled()
440 {
441   return conflict_detection_enable;
442 }
443 
set_member_weight(uint new_member_weight)444 void Group_member_info::set_member_weight(uint new_member_weight)
445 {
446   member_weight= new_member_weight;
447 }
448 
get_member_weight()449 uint Group_member_info::get_member_weight()
450 {
451   return member_weight;
452 }
453 
454 bool
operator ==(Group_member_info & other)455 Group_member_info::operator ==(Group_member_info& other)
456 {
457   return this->get_uuid().compare(other.get_uuid()) == 0;
458 }
459 
460 const char*
get_member_status_string(Group_member_status status)461 Group_member_info::get_member_status_string(Group_member_status status)
462 {
463   switch(status)
464   {
465     case MEMBER_ONLINE:
466       return "ONLINE";
467     case MEMBER_OFFLINE:
468       return "OFFLINE";
469     case MEMBER_IN_RECOVERY:
470       return "RECOVERING";
471     case MEMBER_ERROR:
472       return "ERROR";
473     case MEMBER_UNREACHABLE:
474       return "UNREACHABLE";
475     default:
476       return "OFFLINE"; /* purecov: inspected */
477   }
478 }
479 
480 const char*
get_configuration_flag_string(const uint32 configuation_flag)481 Group_member_info::get_configuration_flag_string(const uint32 configuation_flag)
482 {
483   switch (configuation_flag)
484   {
485   case 0:
486     return "";
487   case CNF_ENFORCE_UPDATE_EVERYWHERE_CHECKS_F:
488     return "group_replication_enforce_update_everywhere_checks";
489   case CNF_SINGLE_PRIMARY_MODE_F:
490     return "group_replication_single_primary_mode";
491   default:
492     return "UNKNOWN"; /* purecov: inspected */
493   }
494 }
495 
496 std::string
get_configuration_flags_string(const uint32 configuation_flags)497 Group_member_info::get_configuration_flags_string(const uint32 configuation_flags)
498 {
499   std::string result;
500   uint32 configuration_flags_mask = 1;
501 
502   while (configuration_flags_mask > 0)
503   {
504     const uint32 current_flag = configuration_flags_mask & configuation_flags;
505     const char* current_flag_name = get_configuration_flag_string(current_flag);
506 
507     if (current_flag)
508     {
509       if (!result.empty())
510         result += ","; /* purecov: inspected */
511 
512       result += current_flag_name;
513     }
514 
515     configuration_flags_mask = configuration_flags_mask << 1;
516   }
517 
518   return result;
519 }
520 
521 bool
comparator_group_member_version(Group_member_info * m1,Group_member_info * m2)522 Group_member_info::comparator_group_member_version(Group_member_info *m1,
523                                                    Group_member_info *m2)
524 {
525   return m2->has_greater_version(m1);
526 }
527 
528 bool
comparator_group_member_uuid(Group_member_info * m1,Group_member_info * m2)529 Group_member_info::comparator_group_member_uuid(Group_member_info *m1,
530                                                 Group_member_info *m2)
531 {
532   return m1->has_lower_uuid(m2);
533 }
534 
535 bool
comparator_group_member_weight(Group_member_info * m1,Group_member_info * m2)536 Group_member_info::comparator_group_member_weight(Group_member_info *m1,
537                                                   Group_member_info *m2)
538 {
539   return m1->has_greater_weight(m2);
540 }
541 
542 bool
has_greater_version(Group_member_info * other)543 Group_member_info::has_greater_version(Group_member_info *other)
544 {
545   if (*member_version > *(other->member_version))
546     return true;
547 
548   return false;
549 }
550 
551 bool
has_lower_uuid(Group_member_info * other)552 Group_member_info::has_lower_uuid(Group_member_info *other)
553 {
554   return this->get_uuid().compare(other->get_uuid()) < 0;
555 }
556 
557 bool
has_greater_weight(Group_member_info * other)558 Group_member_info::has_greater_weight(Group_member_info *other)
559 {
560   if (this->get_member_weight() > other->get_member_weight())
561     return true;
562 
563   if (this->get_member_weight() == other->get_member_weight())
564     return has_lower_uuid(other);
565 
566   return false;
567 }
568 
569 Group_member_info_manager::
Group_member_info_manager(Group_member_info * local_member_info)570 Group_member_info_manager(Group_member_info* local_member_info)
571 {
572   members= new map<string, Group_member_info*>();
573   this->local_member_info= local_member_info;
574 
575   mysql_mutex_init(key_GR_LOCK_group_info_manager, &update_lock,
576                    MY_MUTEX_INIT_FAST);
577 
578   add(local_member_info);
579 }
580 
~Group_member_info_manager()581 Group_member_info_manager::~Group_member_info_manager()
582 {
583   clear_members();
584   delete members;
585 }
586 
587 size_t
get_number_of_members()588 Group_member_info_manager::get_number_of_members()
589 {
590   return members->size();
591 }
592 
593 Group_member_info*
get_group_member_info(const string & uuid)594 Group_member_info_manager::get_group_member_info(const string& uuid)
595 {
596   Group_member_info* member= NULL;
597   mysql_mutex_lock(&update_lock);
598 
599   map<string, Group_member_info*>::iterator it;
600 
601   it= members->find(uuid);
602 
603   if(it != members->end())
604   {
605     member= (*it).second;
606   }
607 
608   Group_member_info* member_copy= NULL;
609   if(member != NULL)
610   {
611     member_copy= new Group_member_info(*member);
612   }
613 
614   mysql_mutex_unlock(&update_lock);
615 
616   return member_copy;
617 }
618 
619 Group_member_info*
get_group_member_info_by_index(int idx)620 Group_member_info_manager::get_group_member_info_by_index(int idx)
621 {
622   Group_member_info* member= NULL;
623 
624   mysql_mutex_lock(&update_lock);
625 
626   map<string, Group_member_info*>::iterator it;
627   if(idx < (int)members->size())
628   {
629     int i= 0;
630     for(it= members->begin(); i <= idx; i++, it++)
631     {
632       member= (*it).second;
633     }
634   }
635 
636   Group_member_info* member_copy= NULL;
637   if(member != NULL)
638   {
639     member_copy= new Group_member_info(*member);
640   }
641   mysql_mutex_unlock(&update_lock);
642 
643   return member_copy;
644 }
645 
646 Group_member_info*
647 Group_member_info_manager::
get_group_member_info_by_member_id(Gcs_member_identifier idx)648 get_group_member_info_by_member_id(Gcs_member_identifier idx)
649 {
650   Group_member_info* member= NULL;
651 
652   mysql_mutex_lock(&update_lock);
653 
654   map<string, Group_member_info*>::iterator it;
655   for(it= members->begin(); it != members->end(); it++)
656   {
657     if((*it).second->get_gcs_member_id() == idx)
658     {
659       member= new Group_member_info(*(*it).second);
660       break;
661     }
662   }
663 
664   mysql_mutex_unlock(&update_lock);
665   return member;
666 }
667 
668 vector<Group_member_info*>*
get_all_members()669 Group_member_info_manager::get_all_members()
670 {
671   mysql_mutex_lock(&update_lock);
672 
673   vector<Group_member_info*>* all_members= new vector<Group_member_info*>();
674   map<string, Group_member_info*>::iterator it;
675   for(it= members->begin(); it != members->end(); it++)
676   {
677     Group_member_info* member_copy = new Group_member_info(*(*it).second);
678     all_members->push_back(member_copy);
679   }
680 
681   mysql_mutex_unlock(&update_lock);
682   return all_members;
683 }
684 
685 void
add(Group_member_info * new_member)686 Group_member_info_manager::add(Group_member_info* new_member)
687 {
688   mysql_mutex_lock(&update_lock);
689 
690   (*members)[new_member->get_uuid()]= new_member;
691 
692   mysql_mutex_unlock(&update_lock);
693 }
694 
695 void
update(vector<Group_member_info * > * new_members)696 Group_member_info_manager::update(vector<Group_member_info*>* new_members)
697 {
698   mysql_mutex_lock(&update_lock);
699 
700   this->clear_members();
701 
702   vector<Group_member_info*>::iterator new_members_it;
703   for(new_members_it= new_members->begin();
704       new_members_it != new_members->end();
705       new_members_it++)
706   {
707     //If this bears the local member to be updated
708     // It will add the current reference and update its status
709     if(*(*new_members_it) == *local_member_info)
710     {
711       local_member_info
712           ->update_recovery_status((*new_members_it)->get_recovery_status());
713 
714       delete (*new_members_it);
715 
716       continue;
717     }
718 
719     (*members)[(*new_members_it)->get_uuid()]= (*new_members_it);
720   }
721 
722   mysql_mutex_unlock(&update_lock);
723 }
724 
725 void
726 Group_member_info_manager::
update_member_status(const string & uuid,Group_member_info::Group_member_status new_status)727 update_member_status(const string& uuid,
728                      Group_member_info::Group_member_status new_status)
729 {
730   mysql_mutex_lock(&update_lock);
731 
732   map<string, Group_member_info*>::iterator it;
733 
734   it= members->find(uuid);
735 
736   if(it != members->end())
737   {
738     (*it).second->update_recovery_status(new_status);
739   }
740 
741   mysql_mutex_unlock(&update_lock);
742 }
743 
744 void
745 Group_member_info_manager::
set_member_unreachable(const std::string & uuid)746 set_member_unreachable(const std::string &uuid)
747 {
748   mysql_mutex_lock(&update_lock);
749 
750   map<string, Group_member_info*>::iterator it = members->find(uuid);
751   if (it != members->end()) {
752     (*it).second->set_unreachable();
753   }
754 
755   mysql_mutex_unlock(&update_lock);
756 }
757 
758 void
759 Group_member_info_manager::
set_member_reachable(const std::string & uuid)760 set_member_reachable(const std::string &uuid)
761 {
762   mysql_mutex_lock(&update_lock);
763 
764   map<string, Group_member_info*>::iterator it = members->find(uuid);
765   if (it != members->end()) {
766     (*it).second->set_reachable();
767   }
768 
769   mysql_mutex_unlock(&update_lock);
770 }
771 
772 void
773 Group_member_info_manager::
update_gtid_sets(const string & uuid,string & gtid_executed,string & gtid_retrieved)774 update_gtid_sets(const string& uuid,
775                  string& gtid_executed,
776                  string& gtid_retrieved)
777 {
778   mysql_mutex_lock(&update_lock);
779 
780   map<string, Group_member_info*>::iterator it;
781 
782   it= members->find(uuid);
783 
784   if(it != members->end())
785   {
786     (*it).second->update_gtid_sets(gtid_executed, gtid_retrieved);
787   }
788 
789   mysql_mutex_unlock(&update_lock);
790 }
791 
792 void
793 Group_member_info_manager::
update_member_role(const string & uuid,Group_member_info::Group_member_role new_role)794 update_member_role(const string& uuid,
795                    Group_member_info::Group_member_role new_role)
796 {
797   mysql_mutex_lock(&update_lock);
798 
799   map<string, Group_member_info*>::iterator it;
800 
801   it= members->find(uuid);
802 
803   if(it != members->end())
804   {
805     (*it).second->set_role(new_role);
806   }
807 
808   mysql_mutex_unlock(&update_lock);
809 }
810 
811 
812 void
clear_members()813 Group_member_info_manager::clear_members()
814 {
815   map<string, Group_member_info*>::iterator it= members->begin();
816   while (it != members->end()) {
817     if((*it).second == local_member_info)
818     {
819       ++it;
820       continue;
821     }
822 
823     delete (*it).second;
824     members->erase(it++);
825   }
826 }
827 
is_conflict_detection_enabled()828 bool Group_member_info_manager::is_conflict_detection_enabled()
829 {
830   bool conflict_detection= false;
831 
832   mysql_mutex_lock(&update_lock);
833   map<string, Group_member_info*>::iterator it= members->begin();
834   while (it != members->end())
835   {
836     if ((*it).second != local_member_info)
837     {
838       conflict_detection|= (*it).second->is_conflict_detection_enabled();
839     }
840     ++it;
841   }
842   mysql_mutex_unlock(&update_lock);
843 
844   return conflict_detection;
845 }
846 
847 
848 void
encode(vector<uchar> * to_encode)849 Group_member_info_manager::encode(vector<uchar>* to_encode)
850 {
851   Group_member_info_manager_message *group_info_message=
852     new Group_member_info_manager_message(*this);
853   group_info_message->encode(to_encode);
854   delete group_info_message;
855 }
856 
857 vector<Group_member_info*>*
decode(const uchar * to_decode,uint64 length)858 Group_member_info_manager::decode(const uchar* to_decode, uint64 length)
859 {
860   vector<Group_member_info*>* decoded_members= NULL;
861 
862   Group_member_info_manager_message *group_info_message=
863     new Group_member_info_manager_message();
864   group_info_message->decode(to_decode, length);
865   decoded_members= group_info_message->get_all_members();
866   delete group_info_message;
867 
868   return decoded_members;
869 }
870 
871 void
872 Group_member_info_manager::
get_primary_member_uuid(std::string & primary_member_uuid)873 get_primary_member_uuid(std::string &primary_member_uuid)
874 {
875   map<string, Group_member_info*>::iterator it= members->begin();
876 
877   for (it= members->begin(); it != members->end(); it++)
878   {
879     Group_member_info* info= (*it).second;
880     if (info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY)
881     {
882       assert(primary_member_uuid.empty());
883       primary_member_uuid =info->get_uuid();
884     }
885   }
886 
887   if (primary_member_uuid.empty() ||
888       Group_member_info::MEMBER_ERROR ==
889         local_member_info->get_recovery_status())
890     primary_member_uuid= "UNDEFINED";
891 }
892 
is_majority_unreachable()893 bool Group_member_info_manager::is_majority_unreachable()
894 {
895   bool ret= false;
896   int unreachables= 0;
897 
898   mysql_mutex_lock(&update_lock);
899   map<string, Group_member_info*>::iterator it= members->begin();
900 
901   for (it= members->begin(); it != members->end(); it++)
902   {
903     Group_member_info* info= (*it).second;
904     if (info->is_unreachable())
905       unreachables++;
906   }
907   ret= (members->size() - unreachables) <= (members->size() / 2);
908   mysql_mutex_unlock(&update_lock);
909 
910   return ret;
911 }
912 
913 std::string
get_string_current_view_active_hosts() const914 Group_member_info_manager::get_string_current_view_active_hosts() const
915 {
916   std::stringstream hosts_string;
917   map<string, Group_member_info*>::iterator all_members_it= members->begin();
918   bool first_entry = true;
919 
920   while (all_members_it != members->end())
921   {
922     Group_member_info* member_info= (*all_members_it).second;
923     if (!first_entry)
924       hosts_string << ", ";
925     else
926       first_entry = false;
927     hosts_string << member_info->get_hostname() << ":" << member_info->get_port();
928     all_members_it++;
929   }
930 
931   return hosts_string.str();
932 }
933 
Group_member_info_manager_message()934 Group_member_info_manager_message::Group_member_info_manager_message()
935   : Plugin_gcs_message(CT_MEMBER_INFO_MANAGER_MESSAGE)
936 {
937   DBUG_ENTER("Group_member_info_manager_message::Group_member_info_manager_message");
938   members= new vector<Group_member_info*>();
939   DBUG_VOID_RETURN;
940 }
941 
Group_member_info_manager_message(Group_member_info_manager & group_info)942 Group_member_info_manager_message::Group_member_info_manager_message(
943     Group_member_info_manager& group_info)
944   : Plugin_gcs_message(CT_MEMBER_INFO_MANAGER_MESSAGE),
945     members(group_info.get_all_members())
946 {
947   DBUG_ENTER("Group_member_info_manager_message::Group_member_info_manager_message");
948   DBUG_VOID_RETURN;
949 }
950 
Group_member_info_manager_message(Group_member_info * member_info)951 Group_member_info_manager_message::Group_member_info_manager_message(
952     Group_member_info* member_info)
953   : Plugin_gcs_message(CT_MEMBER_INFO_MANAGER_MESSAGE),
954     members(NULL)
955 {
956   DBUG_ENTER("Group_member_info_manager_message::Group_member_info_manager_message");
957   members= new vector<Group_member_info*>();
958   members->push_back(member_info);
959   DBUG_VOID_RETURN;
960 }
961 
~Group_member_info_manager_message()962 Group_member_info_manager_message::~Group_member_info_manager_message()
963 {
964   DBUG_ENTER("Group_member_info_manager_message::~Group_member_info_manager_message");
965   clear_members();
966   delete members;
967   DBUG_VOID_RETURN;
968 }
969 
clear_members()970 void Group_member_info_manager_message::clear_members()
971 {
972   DBUG_ENTER("Group_member_info_manager_message::clear_members");
973   std::vector<Group_member_info*>::iterator it;
974   for(it= members->begin(); it != members->end(); it++)
975   {
976     delete (*it);
977   }
978   members->clear();
979   DBUG_VOID_RETURN;
980 }
981 
982 std::vector<Group_member_info*>*
get_all_members()983 Group_member_info_manager_message::get_all_members()
984 {
985   DBUG_ENTER("Group_member_info_manager_message::get_all_members");
986   vector<Group_member_info*>* all_members= new vector<Group_member_info*>();
987 
988   std::vector<Group_member_info*>::iterator it;
989   for(it= members->begin(); it != members->end(); it++)
990   {
991     Group_member_info* member_copy = new Group_member_info(*(*it));
992     all_members->push_back(member_copy);
993   }
994 
995   DBUG_RETURN(all_members);
996 }
997 
998 void
encode_payload(std::vector<unsigned char> * buffer) const999 Group_member_info_manager_message::encode_payload(std::vector<unsigned char>* buffer) const
1000 {
1001   DBUG_ENTER("Group_member_info_manager_message::encode_payload");
1002 
1003   uint16 number_of_members= (uint16)members->size();
1004   encode_payload_item_int2(buffer, PIT_MEMBERS_NUMBER,
1005                            number_of_members);
1006 
1007   std::vector<Group_member_info*>::iterator it;
1008   for(it= members->begin(); it != members->end(); it++)
1009   {
1010     std::vector<uchar> encoded_member;
1011     (*it)->encode(&encoded_member);
1012 
1013     encode_payload_item_type_and_length(buffer, PIT_MEMBER_DATA,
1014                                         encoded_member.size());
1015     buffer->insert(buffer->end(), encoded_member.begin(), encoded_member.end());
1016   }
1017 
1018   DBUG_VOID_RETURN;
1019 }
1020 
1021 void
decode_payload(const unsigned char * buffer,const unsigned char * end)1022 Group_member_info_manager_message::decode_payload(const unsigned char* buffer,
1023                                                   const unsigned char* end)
1024 {
1025   DBUG_ENTER("Group_member_info_manager_message::decode_payload");
1026   const unsigned char *slider= buffer;
1027   uint16 payload_item_type= 0;
1028   unsigned long long payload_item_length= 0;
1029 
1030   uint16 number_of_members= 0;
1031   decode_payload_item_int2(&slider,
1032                            &payload_item_type,
1033                            &number_of_members);
1034 
1035   clear_members();
1036   for(uint16 i= 0; i < number_of_members; i++)
1037   {
1038     decode_payload_item_type_and_length(&slider,
1039                                         &payload_item_type,
1040                                         &payload_item_length);
1041     Group_member_info* member= new Group_member_info(slider,
1042                                                      payload_item_length);
1043     members->push_back(member);
1044     slider+= payload_item_length;
1045   }
1046 
1047   DBUG_VOID_RETURN;
1048 }
1049