1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "member_info.h"
24 #include "plugin_psi.h"
25
26 using std::string;
27 using std::vector;
28 using std::map;
29
30 Group_member_info::
Group_member_info(char * hostname_arg,uint port_arg,char * uuid_arg,int write_set_extraction_algorithm_arg,const std::string & gcs_member_id_arg,Group_member_info::Group_member_status status_arg,Member_version & member_version_arg,ulonglong gtid_assignment_block_size_arg,Group_member_info::Group_member_role role_arg,bool in_single_primary_mode,bool has_enforces_update_everywhere_checks,uint member_weight_arg,uint lower_case_table_names_arg)31 Group_member_info(char* hostname_arg,
32 uint port_arg,
33 char* uuid_arg,
34 int write_set_extraction_algorithm_arg,
35 const std::string& gcs_member_id_arg,
36 Group_member_info::Group_member_status status_arg,
37 Member_version& member_version_arg,
38 ulonglong gtid_assignment_block_size_arg,
39 Group_member_info::Group_member_role role_arg,
40 bool in_single_primary_mode,
41 bool has_enforces_update_everywhere_checks,
42 uint member_weight_arg,
43 uint lower_case_table_names_arg)
44 : Plugin_gcs_message(CT_MEMBER_INFO_MESSAGE),
45 hostname(hostname_arg), port(port_arg), uuid(uuid_arg),
46 status(status_arg),
47 write_set_extraction_algorithm(write_set_extraction_algorithm_arg),
48 gtid_assignment_block_size(gtid_assignment_block_size_arg),
49 unreachable(false),
50 role(role_arg),
51 configuration_flags(0), conflict_detection_enable(!in_single_primary_mode),
52 member_weight(member_weight_arg),
53 lower_case_table_names(lower_case_table_names_arg)
54 {
55 gcs_member_id= new Gcs_member_identifier(gcs_member_id_arg);
56 member_version= new Member_version(member_version_arg.get_version());
57
58 /* Handle single_primary_mode */
59 if (in_single_primary_mode)
60 configuration_flags |= CNF_SINGLE_PRIMARY_MODE_F;
61
62 /* Handle enforce_update_everywhere_checks */
63 if (has_enforces_update_everywhere_checks)
64 configuration_flags |= CNF_ENFORCE_UPDATE_EVERYWHERE_CHECKS_F;
65 }
66
Group_member_info(Group_member_info & other)67 Group_member_info::Group_member_info(Group_member_info& other)
68 : Plugin_gcs_message(CT_MEMBER_INFO_MESSAGE),
69 hostname(other.get_hostname()),
70 port(other.get_port()),
71 uuid(other.get_uuid()),
72 status(other.get_recovery_status()),
73 executed_gtid_set(other.get_gtid_executed()),
74 retrieved_gtid_set(other.get_gtid_retrieved()),
75 write_set_extraction_algorithm(other.get_write_set_extraction_algorithm()),
76 gtid_assignment_block_size(other.get_gtid_assignment_block_size()),
77 unreachable(other.is_unreachable()),
78 role(other.get_role()),
79 configuration_flags(other.get_configuration_flags()),
80 conflict_detection_enable(other.is_conflict_detection_enabled()),
81 member_weight(other.get_member_weight()),
82 lower_case_table_names(other.get_lower_case_table_names())
83 {
84 gcs_member_id= new Gcs_member_identifier(other.get_gcs_member_id()
85 .get_member_id());
86 member_version= new Member_version(other.get_member_version()
87 .get_version());
88 }
89
Group_member_info(const uchar * data,uint64 len)90 Group_member_info::Group_member_info(const uchar* data, uint64 len)
91 : Plugin_gcs_message(CT_MEMBER_INFO_MESSAGE),
92 gcs_member_id(NULL), member_version(NULL),
93 unreachable(false),
94 lower_case_table_names(DEFAULT_NOT_RECEIVED_LOWER_CASE_TABLE_NAMES)
95 {
96 decode(data, len);
97 }
98
~Group_member_info()99 Group_member_info::~Group_member_info()
100 {
101 delete gcs_member_id;
102 delete member_version;
103 }
104
105 void
encode_payload(std::vector<unsigned char> * buffer) const106 Group_member_info::encode_payload(std::vector<unsigned char>* buffer) const
107 {
108 DBUG_ENTER("Group_member_info::encode_payload");
109
110 encode_payload_item_string(buffer, PIT_HOSTNAME,
111 hostname.c_str(),
112 hostname.length());
113
114 uint16 port_aux= (uint16)port;
115 encode_payload_item_int2(buffer, PIT_PORT,
116 port_aux);
117
118 encode_payload_item_string(buffer, PIT_UUID,
119 uuid.c_str(),
120 uuid.length());
121
122 encode_payload_item_string(buffer, PIT_GCS_ID,
123 gcs_member_id->get_member_id().c_str(),
124 gcs_member_id->get_member_id().length());
125
126 char status_aux= (uchar)status;
127 encode_payload_item_char(buffer, PIT_STATUS,
128 status_aux);
129
130 uint32 version_aux= (uint32)member_version->get_version();
131 encode_payload_item_int4(buffer, PIT_VERSION,
132 version_aux);
133
134 uint16 write_set_extraction_algorithm_aux=
135 (uint16)write_set_extraction_algorithm;
136 encode_payload_item_int2(buffer, PIT_WRITE_SET_EXTRACTION_ALGORITHM,
137 write_set_extraction_algorithm_aux);
138
139 encode_payload_item_string(buffer, PIT_EXECUTED_GTID,
140 executed_gtid_set.c_str(),
141 executed_gtid_set.length());
142
143 encode_payload_item_string(buffer, PIT_RETRIEVED_GTID,
144 retrieved_gtid_set.c_str(),
145 retrieved_gtid_set.length());
146
147 encode_payload_item_int8(buffer, PIT_GTID_ASSIGNMENT_BLOCK_SIZE,
148 gtid_assignment_block_size);
149
150 char role_aux= (uchar)role;
151 encode_payload_item_char(buffer, PIT_MEMBER_ROLE, role_aux);
152
153 uint32 configuration_flags_aux= (uint32)configuration_flags;
154 encode_payload_item_int4(buffer, PIT_CONFIGURATION_FLAGS,
155 configuration_flags_aux);
156
157 /*
158 MySQL 5.7.18+ payloads
159 */
160 char conflict_detection_enable_aux= conflict_detection_enable ? '1' : '0';
161 encode_payload_item_char(buffer, PIT_CONFLICT_DETECTION_ENABLE,
162 conflict_detection_enable_aux);
163
164 uint16 member_weight_aux= (uint16)member_weight;
165 encode_payload_item_int2(buffer, PIT_MEMBER_WEIGHT,
166 member_weight_aux);
167
168 uint16 lower_case_table_names_aux= static_cast <uint16> (lower_case_table_names);
169 #ifndef NDEBUG
170 if (lower_case_table_names != SKIP_ENCODING_LOWER_CASE_TABLE_NAMES)
171 #endif
172 encode_payload_item_int2(buffer, PIT_LOWER_CASE_TABLE_NAME,
173 lower_case_table_names_aux);
174
175 DBUG_VOID_RETURN;
176 }
177
178 void
decode_payload(const unsigned char * buffer,const unsigned char * end)179 Group_member_info::decode_payload(const unsigned char* buffer,
180 const unsigned char* end)
181 {
182 DBUG_ENTER("Group_member_info::decode_payload");
183 const unsigned char *slider= buffer;
184 uint16 payload_item_type= 0;
185 unsigned long long payload_item_length= 0;
186
187 decode_payload_item_string(&slider,
188 &payload_item_type,
189 &hostname,
190 &payload_item_length);
191
192 uint16 port_aux= 0;
193 decode_payload_item_int2(&slider,
194 &payload_item_type,
195 &port_aux);
196 port= (uint)port_aux;
197
198 decode_payload_item_string(&slider,
199 &payload_item_type,
200 &uuid,
201 &payload_item_length);
202
203 std::string gcs_member_id_aux;
204 decode_payload_item_string(&slider,
205 &payload_item_type,
206 &gcs_member_id_aux,
207 &payload_item_length);
208 delete gcs_member_id;
209 gcs_member_id= new Gcs_member_identifier(gcs_member_id_aux);
210
211 unsigned char status_aux= 0;
212 decode_payload_item_char(&slider,
213 &payload_item_type,
214 &status_aux);
215 status= (Group_member_status)status_aux;
216
217 uint32 member_version_aux= 0;
218 decode_payload_item_int4(&slider,
219 &payload_item_type,
220 &member_version_aux);
221 delete member_version;
222 member_version= new Member_version(member_version_aux);
223
224 uint16 write_set_extraction_algorithm_aux= 0;
225 decode_payload_item_int2(&slider,
226 &payload_item_type,
227 &write_set_extraction_algorithm_aux);
228 write_set_extraction_algorithm= (uint)write_set_extraction_algorithm_aux;
229
230 decode_payload_item_string(&slider,
231 &payload_item_type,
232 &executed_gtid_set,
233 &payload_item_length);
234
235 decode_payload_item_string(&slider,
236 &payload_item_type,
237 &retrieved_gtid_set,
238 &payload_item_length);
239
240 decode_payload_item_int8(&slider,
241 &payload_item_type,
242 >id_assignment_block_size);
243
244 unsigned char role_aux= 0;
245 decode_payload_item_char(&slider,
246 &payload_item_type,
247 &role_aux);
248 role= (Group_member_role)role_aux;
249
250 uint32 configuration_flags_aux= 0;
251 decode_payload_item_int4(&slider,
252 &payload_item_type,
253 &configuration_flags_aux);
254 configuration_flags= configuration_flags_aux;
255
256 /*
257 MySQL 5.7.18+ payloads
258 We need to check if there are more payload items to read, if the member
259 info message was send by a lower version member, there will not.
260 */
261 while (slider + Plugin_gcs_message::WIRE_PAYLOAD_ITEM_HEADER_SIZE <= end)
262 {
263 // Read payload item header to find payload item length.
264 decode_payload_item_type_and_length(&slider,
265 &payload_item_type,
266 &payload_item_length);
267
268 switch (payload_item_type)
269 {
270 case PIT_CONFLICT_DETECTION_ENABLE:
271 if (slider + payload_item_length <= end)
272 {
273 unsigned char conflict_detection_enable_aux= *slider;
274 slider += payload_item_length;
275 conflict_detection_enable=
276 (conflict_detection_enable_aux == '1') ? true : false;
277 }
278 break;
279
280 case PIT_MEMBER_WEIGHT:
281 if (slider + payload_item_length <= end)
282 {
283 uint16 member_weight_aux= uint2korr(slider);
284 slider += payload_item_length;
285 member_weight= (uint)member_weight_aux;
286 }
287 break;
288
289 case PIT_LOWER_CASE_TABLE_NAME:
290 if (slider + payload_item_length <= end)
291 {
292 uint16 lower_case_table_names_aux= uint2korr(slider);
293 slider += payload_item_length;
294 lower_case_table_names= static_cast <uint>(lower_case_table_names_aux);
295 }
296 break;
297 }
298 }
299
300 DBUG_VOID_RETURN;
301 }
302
303 const string&
get_hostname()304 Group_member_info::get_hostname()
305 {
306 return hostname;
307 }
308
309 uint
get_port()310 Group_member_info::get_port()
311 {
312 return port;
313 }
314
315 const string&
get_uuid()316 Group_member_info::get_uuid()
317 {
318 return uuid;
319 }
320
321 Group_member_info::Group_member_status
get_recovery_status()322 Group_member_info::get_recovery_status()
323 {
324 return status;
325 }
326
327 Group_member_info::Group_member_role
get_role()328 Group_member_info::get_role()
329 {
330 return role;
331 }
332
333 const Gcs_member_identifier&
get_gcs_member_id()334 Group_member_info::get_gcs_member_id()
335 {
336 return *gcs_member_id;
337 }
338
339 void
update_recovery_status(Group_member_status new_status)340 Group_member_info::update_recovery_status(Group_member_status new_status)
341 {
342 status= new_status;
343 }
344
345 void
update_gtid_sets(std::string & executed_gtids,std::string & retrieved_gtids)346 Group_member_info::update_gtid_sets(std::string& executed_gtids,
347 std::string& retrieved_gtids)
348 {
349 executed_gtid_set.assign(executed_gtids);
350 retrieved_gtid_set.assign(retrieved_gtids);
351 }
352
353 void
set_role(Group_member_role new_role)354 Group_member_info::set_role(Group_member_role new_role)
355 {
356 role= new_role;
357 }
358
359 const Member_version&
get_member_version()360 Group_member_info::get_member_version()
361 {
362 return *member_version;
363 }
364
365 const std::string&
get_gtid_executed()366 Group_member_info::get_gtid_executed()
367 {
368 return executed_gtid_set;
369 }
370
371 const std::string&
get_gtid_retrieved()372 Group_member_info::get_gtid_retrieved()
373 {
374 return retrieved_gtid_set;
375 }
376
377 uint
get_write_set_extraction_algorithm()378 Group_member_info::get_write_set_extraction_algorithm()
379 {
380 return write_set_extraction_algorithm;
381 }
382
383 ulonglong
get_gtid_assignment_block_size()384 Group_member_info::get_gtid_assignment_block_size()
385 {
386 return gtid_assignment_block_size;
387 }
388
389 uint32
get_configuration_flags()390 Group_member_info::get_configuration_flags()
391 {
392 return configuration_flags;
393 }
394
395 uint
get_lower_case_table_names() const396 Group_member_info::get_lower_case_table_names() const
397 {
398 return lower_case_table_names;
399 }
400
in_primary_mode()401 bool Group_member_info::in_primary_mode()
402 {
403 return get_configuration_flags() & CNF_SINGLE_PRIMARY_MODE_F;
404 }
405
has_enforces_update_everywhere_checks()406 bool Group_member_info::has_enforces_update_everywhere_checks()
407 {
408 return get_configuration_flags() & CNF_ENFORCE_UPDATE_EVERYWHERE_CHECKS_F;
409 }
410
411 bool
is_unreachable()412 Group_member_info::is_unreachable()
413 {
414 return unreachable;
415 }
416
417 void
set_unreachable()418 Group_member_info::set_unreachable()
419 {
420 unreachable= true;
421 }
422
423 void
set_reachable()424 Group_member_info::set_reachable()
425 {
426 unreachable= false;
427 }
428
enable_conflict_detection()429 void Group_member_info::enable_conflict_detection()
430 {
431 conflict_detection_enable= true;
432 }
433
disable_conflict_detection()434 void Group_member_info::disable_conflict_detection()
435 {
436 conflict_detection_enable= false;
437 }
438
is_conflict_detection_enabled()439 bool Group_member_info::is_conflict_detection_enabled()
440 {
441 return conflict_detection_enable;
442 }
443
set_member_weight(uint new_member_weight)444 void Group_member_info::set_member_weight(uint new_member_weight)
445 {
446 member_weight= new_member_weight;
447 }
448
get_member_weight()449 uint Group_member_info::get_member_weight()
450 {
451 return member_weight;
452 }
453
454 bool
operator ==(Group_member_info & other)455 Group_member_info::operator ==(Group_member_info& other)
456 {
457 return this->get_uuid().compare(other.get_uuid()) == 0;
458 }
459
460 const char*
get_member_status_string(Group_member_status status)461 Group_member_info::get_member_status_string(Group_member_status status)
462 {
463 switch(status)
464 {
465 case MEMBER_ONLINE:
466 return "ONLINE";
467 case MEMBER_OFFLINE:
468 return "OFFLINE";
469 case MEMBER_IN_RECOVERY:
470 return "RECOVERING";
471 case MEMBER_ERROR:
472 return "ERROR";
473 case MEMBER_UNREACHABLE:
474 return "UNREACHABLE";
475 default:
476 return "OFFLINE"; /* purecov: inspected */
477 }
478 }
479
480 const char*
get_configuration_flag_string(const uint32 configuation_flag)481 Group_member_info::get_configuration_flag_string(const uint32 configuation_flag)
482 {
483 switch (configuation_flag)
484 {
485 case 0:
486 return "";
487 case CNF_ENFORCE_UPDATE_EVERYWHERE_CHECKS_F:
488 return "group_replication_enforce_update_everywhere_checks";
489 case CNF_SINGLE_PRIMARY_MODE_F:
490 return "group_replication_single_primary_mode";
491 default:
492 return "UNKNOWN"; /* purecov: inspected */
493 }
494 }
495
496 std::string
get_configuration_flags_string(const uint32 configuation_flags)497 Group_member_info::get_configuration_flags_string(const uint32 configuation_flags)
498 {
499 std::string result;
500 uint32 configuration_flags_mask = 1;
501
502 while (configuration_flags_mask > 0)
503 {
504 const uint32 current_flag = configuration_flags_mask & configuation_flags;
505 const char* current_flag_name = get_configuration_flag_string(current_flag);
506
507 if (current_flag)
508 {
509 if (!result.empty())
510 result += ","; /* purecov: inspected */
511
512 result += current_flag_name;
513 }
514
515 configuration_flags_mask = configuration_flags_mask << 1;
516 }
517
518 return result;
519 }
520
521 bool
comparator_group_member_version(Group_member_info * m1,Group_member_info * m2)522 Group_member_info::comparator_group_member_version(Group_member_info *m1,
523 Group_member_info *m2)
524 {
525 return m2->has_greater_version(m1);
526 }
527
528 bool
comparator_group_member_uuid(Group_member_info * m1,Group_member_info * m2)529 Group_member_info::comparator_group_member_uuid(Group_member_info *m1,
530 Group_member_info *m2)
531 {
532 return m1->has_lower_uuid(m2);
533 }
534
535 bool
comparator_group_member_weight(Group_member_info * m1,Group_member_info * m2)536 Group_member_info::comparator_group_member_weight(Group_member_info *m1,
537 Group_member_info *m2)
538 {
539 return m1->has_greater_weight(m2);
540 }
541
542 bool
has_greater_version(Group_member_info * other)543 Group_member_info::has_greater_version(Group_member_info *other)
544 {
545 if (*member_version > *(other->member_version))
546 return true;
547
548 return false;
549 }
550
551 bool
has_lower_uuid(Group_member_info * other)552 Group_member_info::has_lower_uuid(Group_member_info *other)
553 {
554 return this->get_uuid().compare(other->get_uuid()) < 0;
555 }
556
557 bool
has_greater_weight(Group_member_info * other)558 Group_member_info::has_greater_weight(Group_member_info *other)
559 {
560 if (this->get_member_weight() > other->get_member_weight())
561 return true;
562
563 if (this->get_member_weight() == other->get_member_weight())
564 return has_lower_uuid(other);
565
566 return false;
567 }
568
569 Group_member_info_manager::
Group_member_info_manager(Group_member_info * local_member_info)570 Group_member_info_manager(Group_member_info* local_member_info)
571 {
572 members= new map<string, Group_member_info*>();
573 this->local_member_info= local_member_info;
574
575 mysql_mutex_init(key_GR_LOCK_group_info_manager, &update_lock,
576 MY_MUTEX_INIT_FAST);
577
578 add(local_member_info);
579 }
580
~Group_member_info_manager()581 Group_member_info_manager::~Group_member_info_manager()
582 {
583 clear_members();
584 delete members;
585 }
586
587 size_t
get_number_of_members()588 Group_member_info_manager::get_number_of_members()
589 {
590 return members->size();
591 }
592
593 Group_member_info*
get_group_member_info(const string & uuid)594 Group_member_info_manager::get_group_member_info(const string& uuid)
595 {
596 Group_member_info* member= NULL;
597 mysql_mutex_lock(&update_lock);
598
599 map<string, Group_member_info*>::iterator it;
600
601 it= members->find(uuid);
602
603 if(it != members->end())
604 {
605 member= (*it).second;
606 }
607
608 Group_member_info* member_copy= NULL;
609 if(member != NULL)
610 {
611 member_copy= new Group_member_info(*member);
612 }
613
614 mysql_mutex_unlock(&update_lock);
615
616 return member_copy;
617 }
618
619 Group_member_info*
get_group_member_info_by_index(int idx)620 Group_member_info_manager::get_group_member_info_by_index(int idx)
621 {
622 Group_member_info* member= NULL;
623
624 mysql_mutex_lock(&update_lock);
625
626 map<string, Group_member_info*>::iterator it;
627 if(idx < (int)members->size())
628 {
629 int i= 0;
630 for(it= members->begin(); i <= idx; i++, it++)
631 {
632 member= (*it).second;
633 }
634 }
635
636 Group_member_info* member_copy= NULL;
637 if(member != NULL)
638 {
639 member_copy= new Group_member_info(*member);
640 }
641 mysql_mutex_unlock(&update_lock);
642
643 return member_copy;
644 }
645
646 Group_member_info*
647 Group_member_info_manager::
get_group_member_info_by_member_id(Gcs_member_identifier idx)648 get_group_member_info_by_member_id(Gcs_member_identifier idx)
649 {
650 Group_member_info* member= NULL;
651
652 mysql_mutex_lock(&update_lock);
653
654 map<string, Group_member_info*>::iterator it;
655 for(it= members->begin(); it != members->end(); it++)
656 {
657 if((*it).second->get_gcs_member_id() == idx)
658 {
659 member= new Group_member_info(*(*it).second);
660 break;
661 }
662 }
663
664 mysql_mutex_unlock(&update_lock);
665 return member;
666 }
667
668 vector<Group_member_info*>*
get_all_members()669 Group_member_info_manager::get_all_members()
670 {
671 mysql_mutex_lock(&update_lock);
672
673 vector<Group_member_info*>* all_members= new vector<Group_member_info*>();
674 map<string, Group_member_info*>::iterator it;
675 for(it= members->begin(); it != members->end(); it++)
676 {
677 Group_member_info* member_copy = new Group_member_info(*(*it).second);
678 all_members->push_back(member_copy);
679 }
680
681 mysql_mutex_unlock(&update_lock);
682 return all_members;
683 }
684
685 void
add(Group_member_info * new_member)686 Group_member_info_manager::add(Group_member_info* new_member)
687 {
688 mysql_mutex_lock(&update_lock);
689
690 (*members)[new_member->get_uuid()]= new_member;
691
692 mysql_mutex_unlock(&update_lock);
693 }
694
695 void
update(vector<Group_member_info * > * new_members)696 Group_member_info_manager::update(vector<Group_member_info*>* new_members)
697 {
698 mysql_mutex_lock(&update_lock);
699
700 this->clear_members();
701
702 vector<Group_member_info*>::iterator new_members_it;
703 for(new_members_it= new_members->begin();
704 new_members_it != new_members->end();
705 new_members_it++)
706 {
707 //If this bears the local member to be updated
708 // It will add the current reference and update its status
709 if(*(*new_members_it) == *local_member_info)
710 {
711 local_member_info
712 ->update_recovery_status((*new_members_it)->get_recovery_status());
713
714 delete (*new_members_it);
715
716 continue;
717 }
718
719 (*members)[(*new_members_it)->get_uuid()]= (*new_members_it);
720 }
721
722 mysql_mutex_unlock(&update_lock);
723 }
724
725 void
726 Group_member_info_manager::
update_member_status(const string & uuid,Group_member_info::Group_member_status new_status)727 update_member_status(const string& uuid,
728 Group_member_info::Group_member_status new_status)
729 {
730 mysql_mutex_lock(&update_lock);
731
732 map<string, Group_member_info*>::iterator it;
733
734 it= members->find(uuid);
735
736 if(it != members->end())
737 {
738 (*it).second->update_recovery_status(new_status);
739 }
740
741 mysql_mutex_unlock(&update_lock);
742 }
743
744 void
745 Group_member_info_manager::
set_member_unreachable(const std::string & uuid)746 set_member_unreachable(const std::string &uuid)
747 {
748 mysql_mutex_lock(&update_lock);
749
750 map<string, Group_member_info*>::iterator it = members->find(uuid);
751 if (it != members->end()) {
752 (*it).second->set_unreachable();
753 }
754
755 mysql_mutex_unlock(&update_lock);
756 }
757
758 void
759 Group_member_info_manager::
set_member_reachable(const std::string & uuid)760 set_member_reachable(const std::string &uuid)
761 {
762 mysql_mutex_lock(&update_lock);
763
764 map<string, Group_member_info*>::iterator it = members->find(uuid);
765 if (it != members->end()) {
766 (*it).second->set_reachable();
767 }
768
769 mysql_mutex_unlock(&update_lock);
770 }
771
772 void
773 Group_member_info_manager::
update_gtid_sets(const string & uuid,string & gtid_executed,string & gtid_retrieved)774 update_gtid_sets(const string& uuid,
775 string& gtid_executed,
776 string& gtid_retrieved)
777 {
778 mysql_mutex_lock(&update_lock);
779
780 map<string, Group_member_info*>::iterator it;
781
782 it= members->find(uuid);
783
784 if(it != members->end())
785 {
786 (*it).second->update_gtid_sets(gtid_executed, gtid_retrieved);
787 }
788
789 mysql_mutex_unlock(&update_lock);
790 }
791
792 void
793 Group_member_info_manager::
update_member_role(const string & uuid,Group_member_info::Group_member_role new_role)794 update_member_role(const string& uuid,
795 Group_member_info::Group_member_role new_role)
796 {
797 mysql_mutex_lock(&update_lock);
798
799 map<string, Group_member_info*>::iterator it;
800
801 it= members->find(uuid);
802
803 if(it != members->end())
804 {
805 (*it).second->set_role(new_role);
806 }
807
808 mysql_mutex_unlock(&update_lock);
809 }
810
811
812 void
clear_members()813 Group_member_info_manager::clear_members()
814 {
815 map<string, Group_member_info*>::iterator it= members->begin();
816 while (it != members->end()) {
817 if((*it).second == local_member_info)
818 {
819 ++it;
820 continue;
821 }
822
823 delete (*it).second;
824 members->erase(it++);
825 }
826 }
827
is_conflict_detection_enabled()828 bool Group_member_info_manager::is_conflict_detection_enabled()
829 {
830 bool conflict_detection= false;
831
832 mysql_mutex_lock(&update_lock);
833 map<string, Group_member_info*>::iterator it= members->begin();
834 while (it != members->end())
835 {
836 if ((*it).second != local_member_info)
837 {
838 conflict_detection|= (*it).second->is_conflict_detection_enabled();
839 }
840 ++it;
841 }
842 mysql_mutex_unlock(&update_lock);
843
844 return conflict_detection;
845 }
846
847
848 void
encode(vector<uchar> * to_encode)849 Group_member_info_manager::encode(vector<uchar>* to_encode)
850 {
851 Group_member_info_manager_message *group_info_message=
852 new Group_member_info_manager_message(*this);
853 group_info_message->encode(to_encode);
854 delete group_info_message;
855 }
856
857 vector<Group_member_info*>*
decode(const uchar * to_decode,uint64 length)858 Group_member_info_manager::decode(const uchar* to_decode, uint64 length)
859 {
860 vector<Group_member_info*>* decoded_members= NULL;
861
862 Group_member_info_manager_message *group_info_message=
863 new Group_member_info_manager_message();
864 group_info_message->decode(to_decode, length);
865 decoded_members= group_info_message->get_all_members();
866 delete group_info_message;
867
868 return decoded_members;
869 }
870
871 void
872 Group_member_info_manager::
get_primary_member_uuid(std::string & primary_member_uuid)873 get_primary_member_uuid(std::string &primary_member_uuid)
874 {
875 map<string, Group_member_info*>::iterator it= members->begin();
876
877 for (it= members->begin(); it != members->end(); it++)
878 {
879 Group_member_info* info= (*it).second;
880 if (info->get_role() == Group_member_info::MEMBER_ROLE_PRIMARY)
881 {
882 assert(primary_member_uuid.empty());
883 primary_member_uuid =info->get_uuid();
884 }
885 }
886
887 if (primary_member_uuid.empty() ||
888 Group_member_info::MEMBER_ERROR ==
889 local_member_info->get_recovery_status())
890 primary_member_uuid= "UNDEFINED";
891 }
892
is_majority_unreachable()893 bool Group_member_info_manager::is_majority_unreachable()
894 {
895 bool ret= false;
896 int unreachables= 0;
897
898 mysql_mutex_lock(&update_lock);
899 map<string, Group_member_info*>::iterator it= members->begin();
900
901 for (it= members->begin(); it != members->end(); it++)
902 {
903 Group_member_info* info= (*it).second;
904 if (info->is_unreachable())
905 unreachables++;
906 }
907 ret= (members->size() - unreachables) <= (members->size() / 2);
908 mysql_mutex_unlock(&update_lock);
909
910 return ret;
911 }
912
913 std::string
get_string_current_view_active_hosts() const914 Group_member_info_manager::get_string_current_view_active_hosts() const
915 {
916 std::stringstream hosts_string;
917 map<string, Group_member_info*>::iterator all_members_it= members->begin();
918 bool first_entry = true;
919
920 while (all_members_it != members->end())
921 {
922 Group_member_info* member_info= (*all_members_it).second;
923 if (!first_entry)
924 hosts_string << ", ";
925 else
926 first_entry = false;
927 hosts_string << member_info->get_hostname() << ":" << member_info->get_port();
928 all_members_it++;
929 }
930
931 return hosts_string.str();
932 }
933
Group_member_info_manager_message()934 Group_member_info_manager_message::Group_member_info_manager_message()
935 : Plugin_gcs_message(CT_MEMBER_INFO_MANAGER_MESSAGE)
936 {
937 DBUG_ENTER("Group_member_info_manager_message::Group_member_info_manager_message");
938 members= new vector<Group_member_info*>();
939 DBUG_VOID_RETURN;
940 }
941
Group_member_info_manager_message(Group_member_info_manager & group_info)942 Group_member_info_manager_message::Group_member_info_manager_message(
943 Group_member_info_manager& group_info)
944 : Plugin_gcs_message(CT_MEMBER_INFO_MANAGER_MESSAGE),
945 members(group_info.get_all_members())
946 {
947 DBUG_ENTER("Group_member_info_manager_message::Group_member_info_manager_message");
948 DBUG_VOID_RETURN;
949 }
950
Group_member_info_manager_message(Group_member_info * member_info)951 Group_member_info_manager_message::Group_member_info_manager_message(
952 Group_member_info* member_info)
953 : Plugin_gcs_message(CT_MEMBER_INFO_MANAGER_MESSAGE),
954 members(NULL)
955 {
956 DBUG_ENTER("Group_member_info_manager_message::Group_member_info_manager_message");
957 members= new vector<Group_member_info*>();
958 members->push_back(member_info);
959 DBUG_VOID_RETURN;
960 }
961
~Group_member_info_manager_message()962 Group_member_info_manager_message::~Group_member_info_manager_message()
963 {
964 DBUG_ENTER("Group_member_info_manager_message::~Group_member_info_manager_message");
965 clear_members();
966 delete members;
967 DBUG_VOID_RETURN;
968 }
969
clear_members()970 void Group_member_info_manager_message::clear_members()
971 {
972 DBUG_ENTER("Group_member_info_manager_message::clear_members");
973 std::vector<Group_member_info*>::iterator it;
974 for(it= members->begin(); it != members->end(); it++)
975 {
976 delete (*it);
977 }
978 members->clear();
979 DBUG_VOID_RETURN;
980 }
981
982 std::vector<Group_member_info*>*
get_all_members()983 Group_member_info_manager_message::get_all_members()
984 {
985 DBUG_ENTER("Group_member_info_manager_message::get_all_members");
986 vector<Group_member_info*>* all_members= new vector<Group_member_info*>();
987
988 std::vector<Group_member_info*>::iterator it;
989 for(it= members->begin(); it != members->end(); it++)
990 {
991 Group_member_info* member_copy = new Group_member_info(*(*it));
992 all_members->push_back(member_copy);
993 }
994
995 DBUG_RETURN(all_members);
996 }
997
998 void
encode_payload(std::vector<unsigned char> * buffer) const999 Group_member_info_manager_message::encode_payload(std::vector<unsigned char>* buffer) const
1000 {
1001 DBUG_ENTER("Group_member_info_manager_message::encode_payload");
1002
1003 uint16 number_of_members= (uint16)members->size();
1004 encode_payload_item_int2(buffer, PIT_MEMBERS_NUMBER,
1005 number_of_members);
1006
1007 std::vector<Group_member_info*>::iterator it;
1008 for(it= members->begin(); it != members->end(); it++)
1009 {
1010 std::vector<uchar> encoded_member;
1011 (*it)->encode(&encoded_member);
1012
1013 encode_payload_item_type_and_length(buffer, PIT_MEMBER_DATA,
1014 encoded_member.size());
1015 buffer->insert(buffer->end(), encoded_member.begin(), encoded_member.end());
1016 }
1017
1018 DBUG_VOID_RETURN;
1019 }
1020
1021 void
decode_payload(const unsigned char * buffer,const unsigned char * end)1022 Group_member_info_manager_message::decode_payload(const unsigned char* buffer,
1023 const unsigned char* end)
1024 {
1025 DBUG_ENTER("Group_member_info_manager_message::decode_payload");
1026 const unsigned char *slider= buffer;
1027 uint16 payload_item_type= 0;
1028 unsigned long long payload_item_length= 0;
1029
1030 uint16 number_of_members= 0;
1031 decode_payload_item_int2(&slider,
1032 &payload_item_type,
1033 &number_of_members);
1034
1035 clear_members();
1036 for(uint16 i= 0; i < number_of_members; i++)
1037 {
1038 decode_payload_item_type_and_length(&slider,
1039 &payload_item_type,
1040 &payload_item_length);
1041 Group_member_info* member= new Group_member_info(slider,
1042 payload_item_length);
1043 members->push_back(member);
1044 slider+= payload_item_length;
1045 }
1046
1047 DBUG_VOID_RETURN;
1048 }
1049