1 /* Copyright (C) 2007 Google Inc. 2 Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License, version 2.0, 6 as published by the Free Software Foundation. 7 8 This program is also distributed with certain software (including 9 but not limited to OpenSSL) that is licensed under separate terms, 10 as designated in a particular file or component or in included license 11 documentation. The authors of MySQL hereby grant you an additional 12 permission to link the program and your derivative works with the 13 separately licensed software that they have included with MySQL. 14 15 This program is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License, version 2.0, for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with this program; if not, write to the Free Software 22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ 23 24 #ifndef SEMISYNC_MASTER_H 25 #define SEMISYNC_MASTER_H 26 27 #include <sys/types.h> 28 29 #include "my_dbug.h" 30 #include "my_inttypes.h" 31 #include "my_io.h" 32 #include "my_psi_config.h" 33 #include "plugin/semisync/semisync.h" 34 35 extern PSI_memory_key key_ss_memory_TranxNodeAllocator_block; 36 37 #ifdef HAVE_PSI_INTERFACE 38 extern PSI_mutex_key key_ss_mutex_LOCK_binlog_; 39 extern PSI_cond_key key_ss_cond_COND_binlog_send_; 40 #endif 41 42 extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; 43 extern PSI_stage_info stage_waiting_for_semi_sync_slave; 44 extern PSI_stage_info stage_reading_semi_sync_ack; 45 46 extern unsigned int rpl_semi_sync_master_wait_for_slave_count; 47 48 struct TranxNode { 49 char log_name_[FN_REFLEN]; 50 my_off_t log_pos_; 51 mysql_cond_t cond; 52 int n_waiters; 53 struct TranxNode *next_; /* the next node in the sorted list */ 54 struct TranxNode *hash_next_; /* the next node during hash collision */ 55 }; 56 57 /** 58 @class TranxNodeAllocator 59 60 This class provides memory allocating and freeing methods for 61 TranxNode. The main target is performance. 62 63 @section ALLOCATE How to allocate a node 64 The pointer of the first node after 'last_node' in current_block is 65 returned. current_block will move to the next free Block when all nodes of 66 it are in use. A new Block is allocated and is put into the rear of the 67 Block link table if no Block is free. 68 69 The list starts up empty (ie, there is no allocated Block). 70 71 After some nodes are freed, there probably are some free nodes before 72 the sequence of the allocated nodes, but we do not reuse it. It is better 73 to keep the allocated nodes are in the sequence, for it is more efficient 74 for allocating and freeing TranxNode. 75 76 @section FREENODE How to free nodes 77 There are two methods for freeing nodes. They are free_all_nodes and 78 free_nodes_before. 79 80 'A Block is free' means all of its nodes are free. 81 @subsection free_nodes_before 82 As all allocated nodes are in the sequence, 'Before one node' means all 83 nodes before given node in the same Block and all Blocks before the Block 84 which containing the given node. As such, all Blocks before the given one 85 ('node') are free Block and moved into the rear of the Block link table. 86 The Block containing the given 'node', however, is not. For at least the 87 given 'node' is still in use. This will waste at most one Block, but it is 88 more efficient. 89 */ 90 #define BLOCK_TRANX_NODES 16 91 class TranxNodeAllocator { 92 public: 93 /** 94 @param reserved_nodes 95 The number of reserved TranxNodes. It is used to set 'reserved_blocks' 96 which can contain at least 'reserved_nodes' number of TranxNodes. When 97 freeing memory, we will reserve at least reserved_blocks of Blocks not 98 freed. 99 */ TranxNodeAllocator(uint reserved_nodes)100 TranxNodeAllocator(uint reserved_nodes) 101 : reserved_blocks(reserved_nodes / BLOCK_TRANX_NODES + 102 (reserved_nodes % BLOCK_TRANX_NODES > 1 ? 2 : 1)), 103 first_block(nullptr), 104 last_block(nullptr), 105 current_block(nullptr), 106 last_node(-1), 107 block_num(0) {} 108 ~TranxNodeAllocator()109 ~TranxNodeAllocator() { 110 Block *block = first_block; 111 while (block != nullptr) { 112 Block *next = block->next; 113 free_block(block); 114 block = next; 115 } 116 } 117 118 /** 119 The pointer of the first node after 'last_node' in current_block is 120 returned. current_block will move to the next free Block when all nodes of 121 it are in use. A new Block is allocated and is put into the rear of the 122 Block link table if no Block is free. 123 124 @return Return a TranxNode *, or NULL if an error occurred. 125 */ allocate_node()126 TranxNode *allocate_node() { 127 TranxNode *trx_node; 128 Block *block = current_block; 129 130 if (last_node == BLOCK_TRANX_NODES - 1) { 131 current_block = current_block->next; 132 last_node = -1; 133 } 134 135 if (current_block == nullptr && allocate_block()) { 136 current_block = block; 137 if (current_block) last_node = BLOCK_TRANX_NODES - 1; 138 return nullptr; 139 } 140 141 trx_node = &(current_block->nodes[++last_node]); 142 trx_node->log_name_[0] = '\0'; 143 trx_node->log_pos_ = 0; 144 trx_node->next_ = nullptr; 145 trx_node->hash_next_ = nullptr; 146 trx_node->n_waiters = 0; 147 return trx_node; 148 } 149 150 /** 151 All nodes are freed. 152 153 @return Return 0, or 1 if an error occurred. 154 */ free_all_nodes()155 int free_all_nodes() { 156 current_block = first_block; 157 last_node = -1; 158 free_blocks(); 159 return 0; 160 } 161 162 /** 163 All Blocks before the given 'node' are free Block and moved into the rear 164 of the Block link table. 165 166 @param node All nodes before 'node' will be freed 167 168 @return Return 0, or 1 if an error occurred. 169 */ free_nodes_before(TranxNode * node)170 int free_nodes_before(TranxNode *node) { 171 Block *block; 172 Block *prev_block = nullptr; 173 174 block = first_block; 175 while (block != current_block->next) { 176 /* Find the Block containing the given node */ 177 if (&(block->nodes[0]) <= node && 178 &(block->nodes[BLOCK_TRANX_NODES]) >= node) { 179 /* All Blocks before the given node are put into the rear */ 180 if (first_block != block) { 181 last_block->next = first_block; 182 first_block = block; 183 last_block = prev_block; 184 last_block->next = nullptr; 185 free_blocks(); 186 } 187 return 0; 188 } 189 prev_block = block; 190 block = block->next; 191 } 192 193 /* Node does not find should never happen */ 194 DBUG_ASSERT(0); 195 return 1; 196 } 197 198 private: 199 uint reserved_blocks; 200 201 /** 202 A sequence memory which contains BLOCK_TRANX_NODES TranxNodes. 203 204 BLOCK_TRANX_NODES The number of TranxNodes which are in a Block. 205 206 next Every Block has a 'next' pointer which points to the next Block. 207 These linking Blocks constitute a Block link table. 208 */ 209 struct Block { 210 Block *next; 211 TranxNode nodes[BLOCK_TRANX_NODES]; 212 }; 213 214 /** 215 The 'first_block' is the head of the Block link table; 216 */ 217 Block *first_block; 218 /** 219 The 'last_block' is the rear of the Block link table; 220 */ 221 Block *last_block; 222 223 /** 224 current_block always points the Block in the Block link table in 225 which the last allocated node is. The Blocks before it are all in use 226 and the Blocks after it are all free. 227 */ 228 Block *current_block; 229 230 /** 231 It always points to the last node which has been allocated in the 232 current_block. 233 */ 234 int last_node; 235 236 /** 237 How many Blocks are in the Block link table. 238 */ 239 uint block_num; 240 241 /** 242 Allocate a block and then assign it to current_block. 243 */ allocate_block()244 int allocate_block() { 245 Block *block = (Block *)my_malloc(key_ss_memory_TranxNodeAllocator_block, 246 sizeof(Block), MYF(0)); 247 if (block) { 248 block->next = nullptr; 249 250 if (first_block == nullptr) 251 first_block = block; 252 else 253 last_block->next = block; 254 255 /* New Block is always put into the rear */ 256 last_block = block; 257 /* New Block is always the current_block */ 258 current_block = block; 259 ++block_num; 260 261 for (int i = 0; i < BLOCK_TRANX_NODES; i++) 262 mysql_cond_init(key_ss_cond_COND_binlog_send_, 263 ¤t_block->nodes[i].cond); 264 265 return 0; 266 } 267 return 1; 268 } 269 270 /** 271 Free a given Block. 272 @param block The Block will be freed. 273 */ free_block(Block * block)274 void free_block(Block *block) { 275 for (int i = 0; i < BLOCK_TRANX_NODES; i++) 276 mysql_cond_destroy(&block->nodes[i].cond); 277 my_free(block); 278 --block_num; 279 } 280 281 /** 282 If there are some free Blocks and the total number of the Blocks in the 283 Block link table is larger than the 'reserved_blocks', Some free Blocks 284 will be freed until the total number of the Blocks is equal to the 285 'reserved_blocks' or there is only one free Block behind the 286 'current_block'. 287 */ free_blocks()288 void free_blocks() { 289 if (current_block == nullptr || current_block->next == nullptr) return; 290 291 /* One free Block is always kept behind the current block */ 292 Block *block = current_block->next->next; 293 while (block_num > reserved_blocks && block != nullptr) { 294 Block *next = block->next; 295 free_block(block); 296 block = next; 297 } 298 current_block->next->next = block; 299 if (block == nullptr) last_block = current_block->next; 300 } 301 }; 302 303 /** 304 This class manages memory for active transaction list. 305 306 We record each active transaction with a TranxNode, each session 307 can have only one open transaction. Because of EVENT, the total 308 active transaction nodes can exceed the maximum allowed 309 connections. 310 */ 311 class ActiveTranx : public Trace { 312 private: 313 TranxNodeAllocator allocator_; 314 /* These two record the active transaction list in sort order. */ 315 TranxNode *trx_front_, *trx_rear_; 316 317 TranxNode **trx_htb_; /* A hash table on active transactions. */ 318 319 int num_entries_; /* maximum hash table entries */ 320 mysql_mutex_t *lock_; /* mutex lock */ 321 322 inline void assert_lock_owner(); 323 324 inline unsigned int calc_hash(const unsigned char *key, unsigned int length); 325 unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos); 326 compare(const char * log_file_name1,my_off_t log_file_pos1,const TranxNode * node2)327 int compare(const char *log_file_name1, my_off_t log_file_pos1, 328 const TranxNode *node2) { 329 return compare(log_file_name1, log_file_pos1, node2->log_name_, 330 node2->log_pos_); 331 } compare(const TranxNode * node1,const char * log_file_name2,my_off_t log_file_pos2)332 int compare(const TranxNode *node1, const char *log_file_name2, 333 my_off_t log_file_pos2) { 334 return compare(node1->log_name_, node1->log_pos_, log_file_name2, 335 log_file_pos2); 336 } compare(const TranxNode * node1,const TranxNode * node2)337 int compare(const TranxNode *node1, const TranxNode *node2) { 338 return compare(node1->log_name_, node1->log_pos_, node2->log_name_, 339 node2->log_pos_); 340 } 341 342 public: 343 int signal_waiting_sessions_all(); 344 int signal_waiting_sessions_up_to(const char *log_file_name, 345 my_off_t log_file_pos); 346 TranxNode *find_active_tranx_node(const char *log_file_name, 347 my_off_t log_file_pos); 348 ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level); 349 ~ActiveTranx(); 350 351 /* Insert an active transaction node with the specified position. 352 * 353 * Return: 354 * 0: success; non-zero: error 355 */ 356 int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); 357 358 /* Clear the active transaction nodes until(inclusive) the specified 359 * position. 360 * If log_file_name is NULL, everything will be cleared: the sorted 361 * list and the hash table will be reset to empty. 362 * 363 * Return: 364 * 0: success; non-zero: error 365 */ 366 int clear_active_tranx_nodes(const char *log_file_name, 367 my_off_t log_file_pos); 368 369 /* Given a position, check to see whether the position is an active 370 * transaction's ending position by probing the hash table. 371 */ 372 bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos); 373 374 /* Given two binlog positions, compare which one is bigger based on 375 * (file_name, file_position). 376 */ 377 static int compare(const char *log_file_name1, my_off_t log_file_pos1, 378 const char *log_file_name2, my_off_t log_file_pos2); 379 380 /* Find out if active tranx node list is empty or not 381 * 382 * Return: 383 * True : If there are no nodes 384 * False: othewise 385 */ is_empty()386 bool is_empty() { return (trx_front_ == nullptr); } 387 }; 388 389 /** 390 AckInfo is a POD. It defines a structure includes information related to an 391 ack: server_id - which slave the ack comes from. binlog_name - the binlog 392 file name included in the ack. binlog_pos - the binlog file position 393 included in the ack. 394 */ 395 struct AckInfo { 396 int server_id; 397 char binlog_name[FN_REFLEN]; 398 unsigned long long binlog_pos = 0; 399 AckInfoAckInfo400 AckInfo() { clear(); } 401 clearAckInfo402 void clear() { binlog_name[0] = '\0'; } emptyAckInfo403 bool empty() const { return binlog_name[0] == '\0'; } is_serverAckInfo404 bool is_server(int server_id) const { return this->server_id == server_id; } 405 equal_toAckInfo406 bool equal_to(const char *log_file_name, my_off_t log_file_pos) const { 407 return (ActiveTranx::compare(binlog_name, binlog_pos, log_file_name, 408 log_file_pos) == 0); 409 } less_thanAckInfo410 bool less_than(const char *log_file_name, my_off_t log_file_pos) const { 411 return (ActiveTranx::compare(binlog_name, binlog_pos, log_file_name, 412 log_file_pos) < 0); 413 } 414 setAckInfo415 void set(int server_id, const char *log_file_name, my_off_t log_file_pos) { 416 this->server_id = server_id; 417 update(log_file_name, log_file_pos); 418 } updateAckInfo419 void update(const char *log_file_name, my_off_t log_file_pos) { 420 strcpy(binlog_name, log_file_name); 421 binlog_pos = log_file_pos; 422 } 423 }; 424 425 /** 426 AckContainer stores received acks internally and tell the caller the 427 ack's position when a transaction is fully acknowledged, so it can wake 428 up the waiting transactions. 429 */ 430 class AckContainer : public Trace { 431 public: AckContainer()432 AckContainer() : m_ack_array(nullptr), m_size(0), m_empty_slot(0) {} ~AckContainer()433 ~AckContainer() { 434 if (m_ack_array) my_free(m_ack_array); 435 } 436 437 /** Clear the content of the ack array */ clear()438 void clear() { 439 if (m_ack_array) { 440 for (unsigned i = 0; i < m_size; ++i) { 441 m_ack_array[i].clear(); 442 m_ack_array[i].server_id = 0; 443 m_ack_array[i].binlog_pos = 0; 444 } 445 m_empty_slot = m_size; 446 } 447 m_greatest_ack.clear(); 448 } 449 450 /** 451 Adjust capacity for the container and report the ack to semisync master, 452 if it is full. 453 454 @param[in] size size of the container. 455 @param ackinfo Acknowledgement information 456 457 @return 0 if succeeds, otherwise fails. 458 */ 459 int resize(unsigned int size, const AckInfo **ackinfo); 460 461 /** 462 Insert an ack's information into the container and report the minimum 463 ack to semisync master if it is full. 464 465 @param[in] server_id slave server_id of the ack 466 @param[in] log_file_name binlog file name of the ack 467 @param[in] log_file_pos binlog file position of the ack 468 469 @return Pointer of an ack if the ack should be reported to semisync master. 470 Otherwise, NULL is returned. 471 */ 472 const AckInfo *insert(int server_id, const char *log_file_name, 473 my_off_t log_file_pos); insert(const AckInfo & ackinfo)474 const AckInfo *insert(const AckInfo &ackinfo) { 475 return insert(ackinfo.server_id, ackinfo.binlog_name, ackinfo.binlog_pos); 476 } 477 478 private: 479 /* The greatest ack of the acks already reported to semisync master. */ 480 AckInfo m_greatest_ack; 481 482 AckInfo *m_ack_array; 483 /* size of the array */ 484 unsigned int m_size; 485 /* index of an empty slot, it helps improving insert speed. */ 486 unsigned int m_empty_slot; 487 488 /* Prohibit to copy AckContainer objects */ 489 AckContainer(AckContainer &container); 490 AckContainer &operator=(const AckContainer &container); 491 full()492 bool full() { return m_empty_slot == m_size; } size()493 unsigned int size() { return m_size; } 494 495 /** 496 Remove all acks which equal to the given position. 497 498 @param[in] log_file_name binlog name of the ack that should be removed 499 @param[in] log_file_pos binlog position of the ack that should removed 500 */ remove_all(const char * log_file_name,my_off_t log_file_pos)501 void remove_all(const char *log_file_name, my_off_t log_file_pos) { 502 unsigned int i = m_size; 503 for (i = 0; i < m_size; i++) { 504 if (m_ack_array[i].equal_to(log_file_name, log_file_pos)) { 505 m_ack_array[i].clear(); 506 m_empty_slot = i; 507 } 508 } 509 } 510 511 /** 512 Update a slave's ack into the container if another ack of the 513 slave is already in it. 514 515 @param[in] server_id server_id of the ack 516 @param[in] log_file_name binlog file name of the ack 517 @param[in] log_file_pos binlog file position of the ack 518 519 @return index of the slot that is updated. if it equals to 520 the size of container, then no slot is updated. 521 */ updateIfExist(int server_id,const char * log_file_name,my_off_t log_file_pos)522 unsigned int updateIfExist(int server_id, const char *log_file_name, 523 my_off_t log_file_pos) { 524 unsigned int i; 525 526 m_empty_slot = m_size; 527 for (i = 0; i < m_size; i++) { 528 if (m_ack_array[i].empty()) 529 m_empty_slot = i; 530 else if (m_ack_array[i].is_server(server_id)) { 531 m_ack_array[i].update(log_file_name, log_file_pos); 532 if (trace_level_ & kTraceDetail) 533 LogErr(INFORMATION_LEVEL, ER_SEMISYNC_UPDATE_EXISTING_SLAVE_ACK, i); 534 break; 535 } 536 } 537 return i; 538 } 539 540 /** 541 Find the minimum ack which is smaller than given position. When more than 542 one slots are minimum acks, it returns the one has smallest index. 543 544 @param[in] log_file_name binlog file name 545 @param[in] log_file_pos binlog file position 546 547 @return NULL if no ack is smaller than given position, otherwise 548 return its pointer. 549 */ minAck(const char * log_file_name,my_off_t log_file_pos)550 AckInfo *minAck(const char *log_file_name, my_off_t log_file_pos) { 551 unsigned int i; 552 AckInfo *ackinfo = nullptr; 553 554 for (i = 0; i < m_size; i++) { 555 if (m_ack_array[i].less_than(log_file_name, log_file_pos)) 556 ackinfo = m_ack_array + i; 557 } 558 559 return ackinfo; 560 } 561 }; 562 563 /** 564 The extension class for the master of semi-synchronous replication 565 */ 566 class ReplSemiSyncMaster : public ReplSemiSyncBase { 567 private: 568 ActiveTranx *active_tranxs_ = nullptr; 569 /* active transaction list: the list will 570 be cleared when semi-sync switches off. */ 571 572 /* True when initObject has been called */ 573 bool init_done_ = false; 574 575 /* Mutex that protects the following state variables and the active 576 * transaction list. 577 * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are 578 * already holding LOCK_binlog_ because it can cause deadlocks. 579 */ 580 mysql_mutex_t LOCK_binlog_; 581 582 /* This is set to true when reply_file_name_ contains meaningful data. */ 583 bool reply_file_name_inited_ = false; 584 585 /* The binlog name up to which we have received replies from any slaves. */ 586 char reply_file_name_[FN_REFLEN]; 587 588 /* The position in that file up to which we have the reply from any slaves. */ 589 my_off_t reply_file_pos_ = 0; 590 591 /* This is set to true when we know the 'smallest' wait position. */ 592 bool wait_file_name_inited_ = false; 593 594 /* NULL, or the 'smallest' filename that a transaction is waiting for 595 * slave replies. 596 */ 597 char wait_file_name_[FN_REFLEN]; 598 599 /* The smallest position in that file that a trx is waiting for: the trx 600 * can proceed and send an 'ok' to the client when the master has got the 601 * reply from the slave indicating that it already got the binlog events. 602 */ 603 my_off_t wait_file_pos_ = 0; 604 605 /* This is set to true when we know the 'largest' transaction commit 606 * position in the binlog file. 607 * We always maintain the position no matter whether semi-sync is switched 608 * on switched off. When a transaction wait timeout occurs, semi-sync will 609 * switch off. Binlog-dump thread can use the three fields to detect when 610 * slaves catch up on replication so that semi-sync can switch on again. 611 */ 612 bool commit_file_name_inited_ = false; 613 614 /* The 'largest' binlog filename that a commit transaction is seeing. */ 615 char commit_file_name_[FN_REFLEN]; 616 617 /* The 'largest' position in that file that a commit transaction is seeing. */ 618 my_off_t commit_file_pos_ = 0; 619 620 /* All global variables which can be set by parameters. */ 621 volatile bool master_enabled_ = 622 false; /* semi-sync is enabled on the master */ 623 unsigned long wait_timeout_ = 0; /* timeout period(ms) during tranx wait */ 624 625 bool state_ = false; /* whether semi-sync is switched */ 626 627 AckContainer ack_container_; 628 629 void lock(); 630 void unlock(); 631 632 /* Is semi-sync replication on? */ is_on()633 bool is_on() { return (state_); } 634 set_master_enabled(bool enabled)635 void set_master_enabled(bool enabled) { master_enabled_ = enabled; } 636 637 /* Switch semi-sync off because of timeout in transaction waiting. */ 638 int switch_off(); 639 640 void force_switch_on(); 641 642 /* Switch semi-sync on when slaves catch up. */ 643 int try_switch_on(const char *log_file_name, my_off_t log_file_pos); 644 645 public: 646 ReplSemiSyncMaster(); 647 ~ReplSemiSyncMaster(); 648 getMasterEnabled()649 bool getMasterEnabled() { return master_enabled_; } setTraceLevel(unsigned long trace_level)650 void setTraceLevel(unsigned long trace_level) { 651 trace_level_ = trace_level; 652 ack_container_.trace_level_ = trace_level; 653 if (active_tranxs_) active_tranxs_->trace_level_ = trace_level; 654 } 655 656 /* Set if the master has to wait for an ack from the salve or not. */ 657 void set_wait_no_slave(const void *val); 658 659 /* Set the transaction wait timeout period, in milliseconds. */ setWaitTimeout(unsigned long wait_timeout)660 void setWaitTimeout(unsigned long wait_timeout) { 661 wait_timeout_ = wait_timeout; 662 } 663 664 /* Initialize this class after MySQL parameters are initialized. this 665 * function should be called once at bootstrap time. 666 */ 667 int initObject(); 668 669 /* Enable the object to enable semi-sync replication inside the master. */ 670 int enableMaster(); 671 672 /* Enable the object to enable semi-sync replication inside the master. */ 673 int disableMaster(); 674 675 /* Add a semi-sync replication slave */ 676 void add_slave(); 677 678 /* Remove a semi-sync replication slave */ 679 void remove_slave(); 680 681 /* Is the slave servered by the thread requested semi-sync */ 682 bool is_semi_sync_slave(); 683 684 /* It parses a reply packet and call reportReplyBinlog to handle it. */ 685 int reportReplyPacket(uint32 server_id, const uchar *packet, 686 ulong packet_len); 687 688 /* In semi-sync replication, reports up to which binlog position we have 689 * received replies from the slave indicating that it already get the events 690 * or that was skipped in the master. 691 * 692 * Input: 693 * log_file_name - (IN) binlog file name 694 * end_offset - (IN) the offset in the binlog file up to which we have 695 * the replies from the slave or that was skipped 696 */ 697 void reportReplyBinlog(const char *log_file_name, my_off_t end_offset); 698 699 /* Commit a transaction in the final step. This function is called from 700 * InnoDB before returning from the low commit. If semi-sync is switch on, 701 * the function will wait to see whether binlog-dump thread get the reply for 702 * the events of the transaction. Remember that this is not a direct wait, 703 * instead, it waits to see whether the binlog-dump thread has reached the 704 * point. If the wait times out, semi-sync status will be switched off and 705 * all other transaction would not wait either. 706 * 707 * Input: (the transaction events' ending binlog position) 708 * trx_wait_binlog_name - (IN) ending position's file name 709 * trx_wait_binlog_pos - (IN) ending position's file offset 710 * 711 * Return: 712 * 0: success; non-zero: error 713 */ 714 int commitTrx(const char *trx_wait_binlog_name, my_off_t trx_wait_binlog_pos); 715 716 /* Reserve space in the replication event packet header: 717 * . slave semi-sync off: 1 byte - (0) 718 * . slave semi-sync on: 3 byte - (0, 0xef, 0/1} 719 * 720 * Input: 721 * header - (IN) the header buffer 722 * size - (IN) size of the header buffer 723 * 724 * Return: 725 * size of the bytes reserved for header 726 */ 727 int reserveSyncHeader(unsigned char *header, unsigned long size); 728 729 /* Update the sync bit in the packet header to indicate to the slave whether 730 * the master will wait for the reply of the event. If semi-sync is switched 731 * off and we detect that the slave is catching up, we switch semi-sync on. 732 * 733 * Input: 734 * packet - (IN) the packet containing the replication event 735 * log_file_name - (IN) the event ending position's file name 736 * log_file_pos - (IN) the event ending position's file offset 737 * server_id - (IN) master server id number 738 * 739 * Return: 740 * 0: success; non-zero: error 741 */ 742 int updateSyncHeader(unsigned char *packet, const char *log_file_name, 743 my_off_t log_file_pos, uint32 server_id); 744 745 /* Called when a transaction finished writing binlog events. 746 * . update the 'largest' transactions' binlog event position 747 * . insert the ending position in the active transaction list if 748 * semi-sync is on 749 * 750 * Input: (the transaction events' ending binlog position) 751 * log_file_name - (IN) transaction ending position's file name 752 * log_file_pos - (IN) transaction ending position's file offset 753 * 754 * Return: 755 * 0: success; non-zero: error 756 */ 757 int writeTranxInBinlog(const char *log_file_name, my_off_t log_file_pos); 758 759 /* Read the slave's reply so that we know how much progress the slave makes 760 * on receive replication events. 761 * 762 * Input: 763 * net - (IN) the connection to master 764 * event_buf - (IN) pointer to the event packet 765 * 766 * Return: 767 * 0: success; non-zero: error 768 */ 769 int readSlaveReply(NET *net, const char *event_buf); 770 771 /* In semi-sync replication, this method simulates the reception of 772 * an reply and executes reportReplyBinlog directly when a transaction 773 * is skipped in the master. 774 * 775 * Input: 776 * event_buf - (IN) pointer to the event packet 777 * server_id - (IN) master server id numbe 778 * log_file_name - (IN) the event ending position's file name 779 * log_file_pos - (IN) the event ending position's file offset 780 * 781 * Return: 782 * 0: success; non-zero: error 783 */ 784 int skipSlaveReply(const char *event_buf, uint32 server_id, 785 const char *log_file_name, my_off_t log_file_pos); 786 787 /* Export internal statistics for semi-sync replication. */ 788 void setExportStats(); 789 790 /* 'reset master' command is issued from the user and semi-sync need to 791 * go off for that. 792 */ 793 int resetMaster(); 794 795 /* 796 'SET rpl_semi_sync_master_wait_for_slave_count' command is issued from user 797 and semi-sync need to update rpl_semi_sync_master_wait_for_slave_count and 798 notify ack_container_ to resize itself. 799 800 @param[in] new_value The value users want to set to. 801 802 @return It returns 0 if succeeds, otherwise 1 is returned. 803 */ 804 int setWaitSlaveCount(unsigned int new_value); 805 806 /* 807 Update ack_array after receiving an ack from a dump connection. If any 808 binlog pos is already replied by rpl_semi_sync_master_wait_for_slave_count 809 slaves, it will call reportReplyBinlog to increase received binlog 810 position and wake up waiting transactions. It acquires LOCK_binlog_ 811 to protect the operation. 812 813 @param[in] server_id slave server_id of the ack 814 @param[in] log_file_name binlog file name of the ack 815 @param[in] log_file_pos binlog file position of the ack 816 */ handleAck(int server_id,const char * log_file_name,my_off_t log_file_pos)817 void handleAck(int server_id, const char *log_file_name, 818 my_off_t log_file_pos) { 819 lock(); 820 if (rpl_semi_sync_master_wait_for_slave_count == 1) 821 reportReplyBinlog(log_file_name, log_file_pos); 822 else { 823 const AckInfo *ackinfo = nullptr; 824 825 ackinfo = ack_container_.insert(server_id, log_file_name, log_file_pos); 826 if (ackinfo != nullptr) 827 reportReplyBinlog(ackinfo->binlog_name, ackinfo->binlog_pos); 828 } 829 unlock(); 830 } 831 }; 832 833 /* System and status variables for the master component */ 834 extern bool rpl_semi_sync_master_enabled; 835 extern char rpl_semi_sync_master_status; 836 extern unsigned long rpl_semi_sync_master_clients; 837 extern unsigned long rpl_semi_sync_master_timeout; 838 extern unsigned long rpl_semi_sync_master_trace_level; 839 extern unsigned long rpl_semi_sync_master_yes_transactions; 840 extern unsigned long rpl_semi_sync_master_no_transactions; 841 extern unsigned long rpl_semi_sync_master_off_times; 842 extern unsigned long rpl_semi_sync_master_wait_timeouts; 843 extern unsigned long rpl_semi_sync_master_timefunc_fails; 844 extern unsigned long rpl_semi_sync_master_num_timeouts; 845 extern unsigned long rpl_semi_sync_master_wait_sessions; 846 extern unsigned long rpl_semi_sync_master_wait_pos_backtraverse; 847 extern unsigned long rpl_semi_sync_master_avg_trx_wait_time; 848 extern unsigned long rpl_semi_sync_master_avg_net_wait_time; 849 extern unsigned long long rpl_semi_sync_master_net_wait_num; 850 extern unsigned long long rpl_semi_sync_master_trx_wait_num; 851 extern unsigned long long rpl_semi_sync_master_net_wait_time; 852 extern unsigned long long rpl_semi_sync_master_trx_wait_time; 853 854 /* 855 This indicates whether we should keep waiting if no semi-sync slave 856 is available. 857 0 : stop waiting if detected no avaialable semi-sync slave. 858 1 (default) : keep waiting until timeout even no available semi-sync slave. 859 */ 860 extern bool rpl_semi_sync_master_wait_no_slave; 861 #endif /* SEMISYNC_MASTER_H */ 862