/* Copyright (c) 2014, 2018, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software Foundation,
   51 Franklin Street, Suite 500, Boston, MA 02110-1335  USA */

#ifndef CERTIFIER_INCLUDE
#define CERTIFIER_INCLUDE

#include <map>
#include <string>
#include <list>
#include <vector>

#include "certifier_stats_interface.h"
#include "member_info.h"
#include "gcs_plugin_messages.h"
#include "plugin_utils.h"
#include "pipeline_interfaces.h"

#include <mysql/gcs/gcs_communication_interface.h>
#include <mysql/gcs/gcs_control_interface.h>
#include <mysql/group_replication_priv.h>


/**
  This class extends Gtid_set to include a reference counter.

  It is for Certifier only, so it is single-threaded and no locks
  are needed since Certifier already ensures sequential use.

  It is meant to be shared by multiple entries in the
  certification info and released once the last reference to it
  is dropped.
51 */ 52 class Gtid_set_ref: public Gtid_set 53 { 54 public: Gtid_set_ref(Sid_map * sid_map,int64 parallel_applier_sequence_number)55 Gtid_set_ref(Sid_map *sid_map, int64 parallel_applier_sequence_number) 56 :Gtid_set(sid_map), reference_counter(0), 57 parallel_applier_sequence_number(parallel_applier_sequence_number) 58 {} 59 ~Gtid_set_ref()60 virtual ~Gtid_set_ref() 61 {} 62 63 /** 64 Increment the number of references by one. 65 66 @return the number of references 67 */ link()68 size_t link() 69 { 70 return ++reference_counter; 71 } 72 73 /** 74 Decrement the number of references by one. 75 76 @return the number of references 77 */ unlink()78 size_t unlink() 79 { 80 DBUG_ASSERT(reference_counter > 0); 81 return --reference_counter; 82 } 83 get_parallel_applier_sequence_number()84 int64 get_parallel_applier_sequence_number() const 85 { 86 return parallel_applier_sequence_number; 87 } 88 89 private: 90 size_t reference_counter; 91 int64 parallel_applier_sequence_number; 92 }; 93 94 95 /** 96 This class is a core component of the database state machine 97 replication protocol. It implements conflict detection based 98 on a certification procedure. 99 100 Snapshot Isolation is based on assigning logical timestamp to optimistic 101 transactions, i.e. the ones which successfully meet certification and 102 are good to commit on all members in the group. This timestamp is a 103 monotonically increasing counter, and is same across all members in the group. 104 105 This timestamp, which in our algorithm is the snapshot version, is further 106 used to update the certification info. 107 The snapshot version maps the items in a transaction to the GTID_EXECUTED 108 that this transaction saw when it was executed, that is, on which version 109 the transaction was executed. 
110 111 If the incoming transaction snapshot version is a subset of a 112 previous certified transaction for the same write set, the current 113 transaction was executed on top of outdated data, so it will be 114 negatively certified. Otherwise, this transaction is marked 115 certified and goes into applier. 116 */ 117 typedef std::map<std::string, Gtid_set_ref*> Certification_info; 118 119 120 class Certifier_broadcast_thread 121 { 122 public: 123 /** 124 Certifier_broadcast_thread constructor 125 */ 126 Certifier_broadcast_thread(); 127 virtual ~Certifier_broadcast_thread(); 128 129 /** 130 Initialize broadcast thread. 131 132 @return the operation status 133 @retval 0 OK 134 @retval !=0 Error 135 */ 136 int initialize(); 137 138 /** 139 Terminate broadcast thread. 140 141 @return the operation status 142 @retval 0 OK 143 @retval !=0 Error 144 */ 145 int terminate(); 146 147 /** 148 Broadcast thread worker method. 149 */ 150 void dispatcher(); 151 152 /** 153 Period (in seconds) between stable transactions set 154 broadcast. 155 */ 156 static const int BROADCAST_GTID_EXECUTED_PERIOD= 60; // seconds 157 158 private: 159 /** 160 Thread control. 161 */ 162 bool aborted; 163 THD *broadcast_thd; 164 my_thread_handle broadcast_pthd; 165 mysql_mutex_t broadcast_run_lock; 166 mysql_cond_t broadcast_run_cond; 167 mysql_mutex_t broadcast_dispatcher_lock; 168 mysql_cond_t broadcast_dispatcher_cond; 169 bool broadcast_thd_running; 170 size_t broadcast_counter; 171 int broadcast_gtid_executed_period; 172 173 /** 174 Broadcast local GTID_EXECUTED to group. 
175 176 @return the operation status 177 @retval 0 OK 178 @retval !=0 Error 179 */ 180 int broadcast_gtid_executed(); 181 }; 182 183 184 class Certifier_interface : public Certifier_stats 185 { 186 public: ~Certifier_interface()187 virtual ~Certifier_interface() {} 188 virtual void handle_view_change()= 0; 189 virtual int handle_certifier_data(const uchar *data, ulong len, 190 const Gcs_member_identifier& gcs_member_id)= 0; 191 192 virtual void get_certification_info(std::map<std::string, std::string> *cert_info)= 0; 193 virtual int set_certification_info(std::map<std::string, std::string> *cert_info)= 0; 194 virtual bool set_group_stable_transactions_set(Gtid_set* executed_gtid_set)= 0; 195 virtual void enable_conflict_detection()= 0; 196 virtual void disable_conflict_detection()= 0; 197 virtual bool is_conflict_detection_enable()= 0; 198 }; 199 200 201 class Certifier: public Certifier_interface 202 { 203 public: 204 Certifier(); 205 virtual ~Certifier(); 206 207 /** 208 Key used to store errors in the certification info 209 on View_change_log_event. 210 */ 211 static const std::string CERTIFICATION_INFO_ERROR_NAME; 212 213 /** 214 Initialize certifier. 215 216 @param gtid_assignment_block_size the group gtid assignment block size 217 218 @return the operation status 219 @retval 0 OK 220 @retval !=0 Error 221 */ 222 int initialize(ulonglong gtid_assignment_block_size); 223 224 /** 225 Terminate certifier. 226 227 @return the operation status 228 @retval 0 OK 229 @retval !=0 Error 230 */ 231 int terminate(); 232 233 /** 234 Handle view changes on certifier. 235 */ 236 virtual void handle_view_change(); 237 238 /** 239 Queues the packet coming from the reader for future processing. 
240 241 @param[in] data the packet data 242 @param[in] len the packet length 243 @param[in] gcs_member_id the member_id which sent the message 244 245 @return the operation status 246 @retval 0 OK 247 @retval !=0 Error on queue 248 */ 249 virtual int handle_certifier_data(const uchar *data, ulong len, 250 const Gcs_member_identifier& gcs_member_id); 251 252 /** 253 This member function SHALL certify the set of items against transactions 254 that have already passed the certification test. 255 256 @param snapshot_version The incoming transaction snapshot version. 257 @param write_set The incoming transaction write set. 258 @param generate_group_id Flag that indicates if transaction group id 259 must be generated. 260 @param member_uuid The UUID of the member from which this 261 transaction originates. 262 @param gle The incoming transaction global identifier 263 event. 264 @param local_transaction True if this transaction did originate from 265 this member, false otherwise. 266 267 @retval >0 transaction identifier (positively certified). 268 If generate_group_id is false and certification 269 positive a 1 is returned; 270 @retval 0 negatively certified; 271 @retval -1 error. 272 */ 273 rpl_gno certify(Gtid_set *snapshot_version, 274 std::list<const char*> *write_set, 275 bool generate_group_id, 276 const char *member_uuid, 277 Gtid_log_event *gle, 278 bool local_transaction); 279 280 /** 281 Returns the transactions in stable set in text format, that is, the set of 282 transactions already applied on all group members. 283 284 @param[out] buffer Pointer to pointer to string. The method will set it to 285 point to the newly allocated buffer, or NULL on out of 286 memory. 287 Caller must free the allocated memory. 288 @param[out] length Length of the generated string. 
289 290 @return the operation status 291 @retval 0 OK 292 @retval !=0 Out of memory error 293 */ 294 int get_group_stable_transactions_set_string(char **buffer, size_t *length); 295 296 /** 297 Retrieves the current certification info. 298 299 @note if concurrent access is introduce to these variables, 300 locking is needed in this method 301 302 @param[out] cert_info a pointer to retrieve the certification info 303 */ 304 virtual void get_certification_info(std::map<std::string, std::string> *cert_info); 305 306 /** 307 Sets the certification info according to the given value. 308 309 @note if concurrent access is introduce to these variables, 310 locking is needed in this method 311 312 @param[in] cert_info certification info retrieved from recovery procedure 313 314 @retval > 0 Error during setting certification info. 315 @retval = 0 Everything went fine. 316 */ 317 virtual int set_certification_info(std::map<std::string, std::string> *cert_info); 318 319 /** 320 Get the number of postively certified transactions by the certifier 321 */ 322 ulonglong get_positive_certified(); 323 324 /** 325 Get method to retrieve the number of negatively certified transactions. 326 */ 327 ulonglong get_negative_certified(); 328 329 /** 330 Get method to retrieve the certification db size. 331 */ 332 ulonglong get_certification_info_size(); 333 334 /** 335 Get method to retrieve the last conflict free transaction. 336 337 @param[out] value The last conflict free transaction 338 */ 339 void get_last_conflict_free_transaction(std::string* value); 340 341 /** 342 Get method to retrieve the size of the members. 343 */ 344 size_t get_members_size(); 345 346 /** 347 Generate group GNO for a view change log event. 348 349 @retval >0 view change GNO 350 @retval otherwise Error on GNO generation 351 */ 352 rpl_gno generate_view_change_group_gno(); 353 354 /** 355 Public method to add the given gno value to the group_gtid_executed set 356 which is used to support skip gtid functionality. 
357 358 @param[in] gno The gno of the transaction which will be added to the 359 group_gtid executed GTID set. The sidno used for this transaction 360 will be the group_sidno. The gno here belongs specifically 361 to the group UUID. 362 @param[in] local If the gtid value is local or comes from a remote server 363 364 @retval 1 error during addition. 365 @retval 0 success. 366 */ 367 int add_group_gtid_to_group_gtid_executed(rpl_gno gno, bool local); 368 369 /** 370 Public method to add the given GTID value in the group_gtid_executed set 371 which is used to support skip gtid functionality. 372 373 @param[in] gle The gtid value that needs to the added in the 374 group_gtid_executed GTID set. 375 @param[in] local If the gtid value is local or comes from a remote server 376 377 @retval 1 error during addition. 378 @retval 0 success. 379 */ 380 int add_specified_gtid_to_group_gtid_executed(Gtid_log_event *gle, bool local); 381 382 /** 383 This member function shall add transactions to the stable set 384 385 @param executed_gtid_set The GTID set of the transactions to be added 386 to the stable set. 387 388 @note when set, the stable set will cause the garbage collection 389 process to be invoked 390 391 @returns 392 @retval False if adds successfully, 393 @retval True otherwise. 394 */ 395 bool set_group_stable_transactions_set(Gtid_set* executed_gtid_set); 396 397 /** 398 Method to get a string that represents the last local certified GTID 399 400 @param[out] local_gtid_certified_string The last local GTID transaction string 401 402 @retval 0 if there is no GTID / the string is empty 403 @retval !=0 the size of the string 404 */ 405 size_t get_local_certified_gtid(std::string& local_gtid_certified_string); 406 407 /** 408 Enables conflict detection. 409 */ 410 void enable_conflict_detection(); 411 412 /** 413 Disables conflict detection. 414 */ 415 void disable_conflict_detection(); 416 417 /** 418 Check if conflict detection is enable. 
419 420 @returns 421 @retval True conflict detection is enable 422 @retval False otherwise 423 */ 424 bool is_conflict_detection_enable(); 425 426 private: 427 /** 428 Key used to store group_gtid_executed on certification 429 info on View_change_log_event. 430 */ 431 static const std::string GTID_EXTRACTED_NAME; 432 433 /** 434 Is certifier initialized. 435 */ 436 bool initialized; 437 438 /** 439 Variable to store the sidno used for transactions which will be logged 440 with the group_uuid. 441 */ 442 rpl_sidno group_gtid_sid_map_group_sidno; 443 444 /** 445 Method to initialize the group_gtid_executed gtid set with the server gtid 446 executed set and applier retrieved gtid set values. 447 448 @param get_server_gtid_retrieved add applier retrieved gtid set to 449 group_gtid_executed gtid set 450 451 @retval 1 error during initialization 452 @retval 0 success 453 454 */ 455 int initialize_server_gtid_set(bool get_server_gtid_retrieved= false); 456 457 /** 458 This function computes the available GTID intervals from group 459 UUID and stores them on group_available_gtid_intervals. 460 */ 461 void compute_group_available_gtid_intervals(); 462 463 /** 464 This function reserves a block of GTIDs from the 465 group_available_gtid_intervals list. 466 467 @retval Gtid_set::Interval which is the interval os GTIDs attributed 468 */ 469 Gtid_set::Interval reserve_gtid_block(longlong block_size); 470 471 /** 472 This function updates parallel applier indexes. 473 It must be called for each remote transaction. 474 475 @param[in] update_parallel_applier_last_committed_global 476 If true parallel_applier_last_committed_global 477 is updated to the current sequence number 478 (before update sequence number). 
479 480 Note: parallel_applier_last_committed_global should be updated 481 on the following situations: 482 1) Transaction without write set is certified, since it 483 represents the lowest last_committed for all future 484 transactions; 485 2) After certification info garbage collection, since we 486 do not know what write sets were purged, which may cause 487 transactions last committed to be incorrectly computed. 488 */ 489 void increment_parallel_applier_sequence_number( 490 bool update_parallel_applier_last_committed_global); 491 492 /** 493 Internal method to add the given gtid gno in the group_gtid_executed set. 494 This will be used in the skip gtid implementation. 495 496 @note this will update the last know local transaction GTID. 497 498 @param[in] sidno rpl_sidno part of the executing gtid of the ongoing 499 transaction. 500 501 @param[in] gno rpl_gno part of the executing gtid of the ongoing 502 transaction. 503 @param[in] local_transaction if the GTID belongs to a local transaction 504 */ 505 void add_to_group_gtid_executed_internal(rpl_sidno sidno, rpl_gno gno, 506 bool local_transaction); 507 508 /** 509 This method is used to get the next valid GNO for the 510 transaction that is being executed. It checks the already used 511 up GNOs and based on that chooses the next possible value. 512 This method will consult group_available_gtid_intervals to 513 assign GTIDs in blocks according to gtid_assignment_block_size. 514 515 @param member_uuid The UUID of the member from which this 516 transaction originates. It will be NULL 517 on View_change_log_event. 518 519 @retval >0 The GNO to be used. 520 @retval -1 Error: GNOs exhausted for group UUID. 521 */ 522 rpl_gno get_group_next_available_gtid(const char *member_uuid); 523 524 /** 525 Generate the candidate GNO for the current transaction. 526 The candidate will be on the interval [start, end] or a error 527 be returned. 
528 This method will consult group_gtid_executed to avoid generate 529 the same value twice. 530 531 @param start The first possible value for the GNO 532 @param end The last possible value for the GNO 533 534 @retval >0 The GNO to be used. 535 @retval -1 Error: GNOs exhausted for group UUID. 536 @retval -2 Error: generated GNO is bigger than end. 537 */ 538 rpl_gno get_group_next_available_gtid_candidate(rpl_gno start, 539 rpl_gno end) const; 540 is_initialized()541 bool inline is_initialized() 542 { 543 return initialized; 544 } 545 546 void clear_certification_info(); 547 548 /** 549 Method to clear the members. 550 */ 551 void clear_members(); 552 553 /** 554 Last conflict free transaction identification. 555 */ 556 Gtid last_conflict_free_transaction; 557 558 /** 559 Certification database. 560 */ 561 Certification_info certification_info; 562 Sid_map *certification_info_sid_map; 563 564 ulonglong positive_cert; 565 ulonglong negative_cert; 566 int64 parallel_applier_last_committed_global; 567 int64 parallel_applier_sequence_number; 568 569 #if !defined(DBUG_OFF) 570 bool certifier_garbage_collection_block; 571 bool same_member_message_discarded; 572 #endif 573 574 mysql_mutex_t LOCK_certification_info; 575 576 /** 577 Stable set and garbage collector variables. 578 */ 579 Checkable_rwlock *stable_gtid_set_lock; 580 Sid_map *stable_sid_map; 581 Gtid_set *stable_gtid_set; 582 Synchronized_queue<Data_packet *> *incoming; 583 584 std::vector<std::string> members; 585 586 /* 587 Flag to indicate that certifier is handling already applied 588 transactions during distributed recovery procedure. 589 590 On donor we may have local transactions certified after 591 View_change_log_event (VCLE) logged into binary log before VCLE. 592 That is, these local transactions will be appear on recovery 593 and also on GCS messages. 
One can see on example scenario below: 594 595 GCS order | donor binary log order | joiner apply order 596 -----------+------------------------+-------------------- 597 T1 | T1 | T1 598 T2 | T2 | T2 599 V1 | T3 | T3 (recovery) 600 T3 | V1 | V1 601 | | T3 (GCS) 602 -----------+------------------------+-------------------- 603 604 T3 is delivered to donor by both recovery and GCS, so joiner needs 605 to ensure that T3 has the same global identifier on both cases, so 606 that it is correctly skipped on the second time it is applied. 607 608 We ensure that T3 (and other transactions on that situation) have 609 the same global identifiers on joiner by: 610 1) When the VCLE is applied, we set on joiner certification info 611 the same exact certification that was on donor, including the 612 set of certified transactions before the joiner joined: 613 group_gtid_extracted. 614 2) We compare group_gtid_extracted and group_gtid_executed: 615 If group_gtid_extracted is a non equal subset of 616 group_gtid_executed, it means that we are on the above 617 scenario, that is, when applying the last transaction from 618 the distributed recovery process we have more transactions 619 than the ones certified before the view on which joiner joined. 620 So until group_gtid_extracted is a non equal subset of 621 group_gtid_executed certifier will generate transactions ids 622 following group_gtid_extracted so that we have the same exact 623 ids that donor has. 624 3) When joiner group_gtid_extracted and group_gtid_executed are 625 equal, joiner switches to its regular ids generation mode, 626 generating ids from group_gtid_executed. 627 */ 628 bool certifying_already_applied_transactions; 629 630 /* 631 Sid map to store the GTIDs that are executed in the group. 632 */ 633 Sid_map *group_gtid_sid_map; 634 635 /* 636 A Gtid_set containing the already executed for the group. 637 This is used to support skip_gtid. 
638 */ 639 Gtid_set* group_gtid_executed; 640 641 /** 642 A Gtid_set which contains the gtid extracted from the certification info 643 map of the donor. It is the set of transactions that is executed at the 644 time of View_change_log_event at donor. 645 */ 646 Gtid_set *group_gtid_extracted; 647 648 /** 649 The group GTID assignment block size. 650 */ 651 ulonglong gtid_assignment_block_size; 652 653 /** 654 List of free GTID intervals in group 655 */ 656 std::list<Gtid_set::Interval> group_available_gtid_intervals; 657 658 /** 659 Extends the above to allow GTIDs to be assigned in blocks per member. 660 */ 661 std::map<std::string, Gtid_set::Interval> member_gtids; 662 ulonglong gtids_assigned_in_blocks_counter; 663 664 /** 665 Last local known GTID 666 */ 667 Gtid last_local_gtid; 668 669 /** 670 Conflict detection is performed when: 671 1) group is on multi-master mode; 672 2) group is on single-primary mode and primary is applying 673 relay logs with transactions from a previous primary. 674 */ 675 bool conflict_detection_enable; 676 677 mysql_mutex_t LOCK_members; 678 679 /** 680 Broadcast thread. 681 */ 682 Certifier_broadcast_thread *broadcast_thread; 683 684 /** 685 Adds an item from transaction writeset to the certification DB. 686 @param[in] item item in the writeset to be added to the 687 Certification DB. 688 @param[in] snapshot_version Snapshot version of the incoming transaction 689 which modified the above mentioned item. 690 @param[out] item_previous_sequence_number 691 The previous parallel applier sequence number 692 for this item. 693 694 @return 695 @retval False successfully added to the map. 696 True otherwise. 697 */ 698 bool add_item(const char* item, Gtid_set_ref *snapshot_version, 699 int64 *item_previous_sequence_number); 700 701 /** 702 Find the snapshot_version corresponding to an item. Return if 703 it exists, other wise return NULL; 704 705 @param[in] item item for the snapshot version. 
706 @retval Gtid_set pointer if exists in the map. 707 Otherwise 0; 708 */ 709 Gtid_set *get_certified_write_set_snapshot_version(const char* item); 710 711 /** 712 Computes intersection between all sets received, so that we 713 have the already applied transactions on all servers. 714 715 @return the operation status 716 @retval 0 OK 717 @retval !=0 Error 718 */ 719 int stable_set_handle(); 720 721 /** 722 Removes the intersection of the received transactions stable 723 sets from certification database. 724 */ 725 void garbage_collect(); 726 727 /** 728 Clear incoming queue. 729 */ 730 void clear_incoming(); 731 732 /* 733 Update method to store the count of the positively and negatively 734 certified transaction on a particular group member. 735 */ 736 void update_certified_transaction_count(bool result); 737 }; 738 739 /* 740 @class Gtid_Executed_Message 741 742 Class to convey the serialized contents of the previously executed GTIDs 743 */ 744 class Gtid_Executed_Message: public Plugin_gcs_message 745 { 746 public: 747 enum enum_payload_item_type 748 { 749 // This type should not be used anywhere. 750 PIT_UNKNOWN= 0, 751 752 // Length of the payload item: variable 753 PIT_GTID_EXECUTED= 1, 754 755 // No valid type codes can appear after this one. 756 PIT_MAX= 2 757 }; 758 759 /** 760 Gtid_Executed_Message constructor 761 */ 762 Gtid_Executed_Message(); 763 virtual ~Gtid_Executed_Message(); 764 765 /** 766 Appends Gtid executed information in a raw format 767 768 * @param[in] gtid_data encoded GTID data 769 * @param[in] len GTID data length 770 */ 771 void append_gtid_executed(uchar* gtid_data, size_t len); 772 773 protected: 774 /* 775 Implementation of the template methods of Gcs_plugin_message 776 */ 777 void encode_payload(std::vector<unsigned char>* buffer) const; 778 void decode_payload(const unsigned char* buffer, const unsigned char* end); 779 780 private: 781 std::vector<uchar> data; 782 }; 783 784 #endif /* CERTIFIER_INCLUDE */ 785