1 /* Copyright (C) 2007 Google Inc.
2    Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 #ifndef SEMISYNC_MASTER_H
25 #define SEMISYNC_MASTER_H
26 
27 #include <sys/types.h>
28 
29 #include "my_dbug.h"
30 #include "my_inttypes.h"
31 #include "my_io.h"
32 #include "my_psi_config.h"
33 #include "plugin/semisync/semisync.h"
34 
/* Memory instrumentation key passed to my_malloc() when
   TranxNodeAllocator::allocate_block() allocates a Block. */
extern PSI_memory_key key_ss_memory_TranxNodeAllocator_block;

#ifdef HAVE_PSI_INTERFACE
/* Instrumentation keys that are only declared when the PSI interface is
   compiled in. key_ss_cond_COND_binlog_send_ is the key used to initialise
   the condition variable of every TranxNode; key_ss_mutex_LOCK_binlog_ is
   presumably the key of ReplSemiSyncMaster::LOCK_binlog_ (confirm in the
   implementation file). */
extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
extern PSI_cond_key key_ss_cond_COND_binlog_send_;
#endif

/* Thread stage descriptors; they are defined outside this header. */
extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
extern PSI_stage_info stage_waiting_for_semi_sync_slave;
extern PSI_stage_info stage_reading_semi_sync_ack;

/* Number of slave acks that is needed per transaction. When it is 1,
   ReplSemiSyncMaster::handleAck() reports every ack directly instead of
   going through the AckContainer. */
extern unsigned int rpl_semi_sync_master_wait_for_slave_count;
47 
48 struct TranxNode {
49   char log_name_[FN_REFLEN];
50   my_off_t log_pos_;
51   mysql_cond_t cond;
52   int n_waiters;
53   struct TranxNode *next_;      /* the next node in the sorted list */
54   struct TranxNode *hash_next_; /* the next node during hash collision */
55 };
56 
57 /**
58   @class TranxNodeAllocator
59 
60   This class provides memory allocating and freeing methods for
61   TranxNode. The main target is performance.
62 
63   @section ALLOCATE How to allocate a node
64     The pointer of the first node after 'last_node' in current_block is
65     returned. current_block will move to the next free Block when all nodes of
66     it are in use. A new Block is allocated and is put into the rear of the
67     Block link table if no Block is free.
68 
69     The list starts up empty (ie, there is no allocated Block).
70 
71     After some nodes are freed, there probably are some free nodes before
72     the sequence of the allocated nodes, but we do not reuse it. It is better
73     to keep the allocated nodes are in the sequence, for it is more efficient
74     for allocating and freeing TranxNode.
75 
76   @section FREENODE How to free nodes
77     There are two methods for freeing nodes. They are free_all_nodes and
78     free_nodes_before.
79 
80     'A Block is free' means all of its nodes are free.
81     @subsection free_nodes_before
82     As all allocated nodes are in the sequence, 'Before one node' means all
83     nodes before given node in the same Block and all Blocks before the Block
84     which containing the given node. As such, all Blocks before the given one
85     ('node') are free Block and moved into the rear of the Block link table.
86     The Block containing the given 'node', however, is not. For at least the
87     given 'node' is still in use. This will waste at most one Block, but it is
88     more efficient.
89  */
90 #define BLOCK_TRANX_NODES 16
91 class TranxNodeAllocator {
92  public:
93   /**
94     @param reserved_nodes
95       The number of reserved TranxNodes. It is used to set 'reserved_blocks'
96       which can contain at least 'reserved_nodes' number of TranxNodes.  When
97       freeing memory, we will reserve at least reserved_blocks of Blocks not
98       freed.
99    */
TranxNodeAllocator(uint reserved_nodes)100   TranxNodeAllocator(uint reserved_nodes)
101       : reserved_blocks(reserved_nodes / BLOCK_TRANX_NODES +
102                         (reserved_nodes % BLOCK_TRANX_NODES > 1 ? 2 : 1)),
103         first_block(nullptr),
104         last_block(nullptr),
105         current_block(nullptr),
106         last_node(-1),
107         block_num(0) {}
108 
~TranxNodeAllocator()109   ~TranxNodeAllocator() {
110     Block *block = first_block;
111     while (block != nullptr) {
112       Block *next = block->next;
113       free_block(block);
114       block = next;
115     }
116   }
117 
118   /**
119     The pointer of the first node after 'last_node' in current_block is
120     returned. current_block will move to the next free Block when all nodes of
121     it are in use. A new Block is allocated and is put into the rear of the
122     Block link table if no Block is free.
123 
124     @return Return a TranxNode *, or NULL if an error occurred.
125    */
allocate_node()126   TranxNode *allocate_node() {
127     TranxNode *trx_node;
128     Block *block = current_block;
129 
130     if (last_node == BLOCK_TRANX_NODES - 1) {
131       current_block = current_block->next;
132       last_node = -1;
133     }
134 
135     if (current_block == nullptr && allocate_block()) {
136       current_block = block;
137       if (current_block) last_node = BLOCK_TRANX_NODES - 1;
138       return nullptr;
139     }
140 
141     trx_node = &(current_block->nodes[++last_node]);
142     trx_node->log_name_[0] = '\0';
143     trx_node->log_pos_ = 0;
144     trx_node->next_ = nullptr;
145     trx_node->hash_next_ = nullptr;
146     trx_node->n_waiters = 0;
147     return trx_node;
148   }
149 
150   /**
151     All nodes are freed.
152 
153     @return Return 0, or 1 if an error occurred.
154    */
free_all_nodes()155   int free_all_nodes() {
156     current_block = first_block;
157     last_node = -1;
158     free_blocks();
159     return 0;
160   }
161 
162   /**
163     All Blocks before the given 'node' are free Block and moved into the rear
164     of the Block link table.
165 
166     @param node All nodes before 'node' will be freed
167 
168     @return Return 0, or 1 if an error occurred.
169    */
free_nodes_before(TranxNode * node)170   int free_nodes_before(TranxNode *node) {
171     Block *block;
172     Block *prev_block = nullptr;
173 
174     block = first_block;
175     while (block != current_block->next) {
176       /* Find the Block containing the given node */
177       if (&(block->nodes[0]) <= node &&
178           &(block->nodes[BLOCK_TRANX_NODES]) >= node) {
179         /* All Blocks before the given node are put into the rear */
180         if (first_block != block) {
181           last_block->next = first_block;
182           first_block = block;
183           last_block = prev_block;
184           last_block->next = nullptr;
185           free_blocks();
186         }
187         return 0;
188       }
189       prev_block = block;
190       block = block->next;
191     }
192 
193     /* Node does not find should never happen */
194     DBUG_ASSERT(0);
195     return 1;
196   }
197 
198  private:
199   uint reserved_blocks;
200 
201   /**
202     A sequence memory which contains BLOCK_TRANX_NODES TranxNodes.
203 
204     BLOCK_TRANX_NODES The number of TranxNodes which are in a Block.
205 
206     next Every Block has a 'next' pointer which points to the next Block.
207          These linking Blocks constitute a Block link table.
208    */
209   struct Block {
210     Block *next;
211     TranxNode nodes[BLOCK_TRANX_NODES];
212   };
213 
214   /**
215     The 'first_block' is the head of the Block link table;
216    */
217   Block *first_block;
218   /**
219     The 'last_block' is the rear of the Block link table;
220    */
221   Block *last_block;
222 
223   /**
224     current_block always points the Block in the Block link table in
225     which the last allocated node is. The Blocks before it are all in use
226     and the Blocks after it are all free.
227    */
228   Block *current_block;
229 
230   /**
231     It always points to the last node which has been allocated in the
232     current_block.
233    */
234   int last_node;
235 
236   /**
237     How many Blocks are in the Block link table.
238    */
239   uint block_num;
240 
241   /**
242     Allocate a block and then assign it to current_block.
243   */
allocate_block()244   int allocate_block() {
245     Block *block = (Block *)my_malloc(key_ss_memory_TranxNodeAllocator_block,
246                                       sizeof(Block), MYF(0));
247     if (block) {
248       block->next = nullptr;
249 
250       if (first_block == nullptr)
251         first_block = block;
252       else
253         last_block->next = block;
254 
255       /* New Block is always put into the rear */
256       last_block = block;
257       /* New Block is always the current_block */
258       current_block = block;
259       ++block_num;
260 
261       for (int i = 0; i < BLOCK_TRANX_NODES; i++)
262         mysql_cond_init(key_ss_cond_COND_binlog_send_,
263                         &current_block->nodes[i].cond);
264 
265       return 0;
266     }
267     return 1;
268   }
269 
270   /**
271     Free a given Block.
272     @param block The Block will be freed.
273    */
free_block(Block * block)274   void free_block(Block *block) {
275     for (int i = 0; i < BLOCK_TRANX_NODES; i++)
276       mysql_cond_destroy(&block->nodes[i].cond);
277     my_free(block);
278     --block_num;
279   }
280 
281   /**
282     If there are some free Blocks and the total number of the Blocks in the
283     Block link table is larger than the 'reserved_blocks', Some free Blocks
284     will be freed until the total number of the Blocks is equal to the
285     'reserved_blocks' or there is only one free Block behind the
286     'current_block'.
287    */
free_blocks()288   void free_blocks() {
289     if (current_block == nullptr || current_block->next == nullptr) return;
290 
291     /* One free Block is always kept behind the current block */
292     Block *block = current_block->next->next;
293     while (block_num > reserved_blocks && block != nullptr) {
294       Block *next = block->next;
295       free_block(block);
296       block = next;
297     }
298     current_block->next->next = block;
299     if (block == nullptr) last_block = current_block->next;
300   }
301 };
302 
303 /**
304    This class manages memory for active transaction list.
305 
306    We record each active transaction with a TranxNode, each session
307    can have only one open transaction. Because of EVENT, the total
308    active transaction nodes can exceed the maximum allowed
309    connections.
310 */
311 class ActiveTranx : public Trace {
312  private:
313   TranxNodeAllocator allocator_;
314   /* These two record the active transaction list in sort order. */
315   TranxNode *trx_front_, *trx_rear_;
316 
317   TranxNode **trx_htb_; /* A hash table on active transactions. */
318 
319   int num_entries_;     /* maximum hash table entries */
320   mysql_mutex_t *lock_; /* mutex lock */
321 
322   inline void assert_lock_owner();
323 
324   inline unsigned int calc_hash(const unsigned char *key, unsigned int length);
325   unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
326 
compare(const char * log_file_name1,my_off_t log_file_pos1,const TranxNode * node2)327   int compare(const char *log_file_name1, my_off_t log_file_pos1,
328               const TranxNode *node2) {
329     return compare(log_file_name1, log_file_pos1, node2->log_name_,
330                    node2->log_pos_);
331   }
compare(const TranxNode * node1,const char * log_file_name2,my_off_t log_file_pos2)332   int compare(const TranxNode *node1, const char *log_file_name2,
333               my_off_t log_file_pos2) {
334     return compare(node1->log_name_, node1->log_pos_, log_file_name2,
335                    log_file_pos2);
336   }
compare(const TranxNode * node1,const TranxNode * node2)337   int compare(const TranxNode *node1, const TranxNode *node2) {
338     return compare(node1->log_name_, node1->log_pos_, node2->log_name_,
339                    node2->log_pos_);
340   }
341 
342  public:
343   int signal_waiting_sessions_all();
344   int signal_waiting_sessions_up_to(const char *log_file_name,
345                                     my_off_t log_file_pos);
346   TranxNode *find_active_tranx_node(const char *log_file_name,
347                                     my_off_t log_file_pos);
348   ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
349   ~ActiveTranx();
350 
351   /* Insert an active transaction node with the specified position.
352    *
353    * Return:
354    *  0: success;  non-zero: error
355    */
356   int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
357 
358   /* Clear the active transaction nodes until(inclusive) the specified
359    * position.
360    * If log_file_name is NULL, everything will be cleared: the sorted
361    * list and the hash table will be reset to empty.
362    *
363    * Return:
364    *  0: success;  non-zero: error
365    */
366   int clear_active_tranx_nodes(const char *log_file_name,
367                                my_off_t log_file_pos);
368 
369   /* Given a position, check to see whether the position is an active
370    * transaction's ending position by probing the hash table.
371    */
372   bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
373 
374   /* Given two binlog positions, compare which one is bigger based on
375    * (file_name, file_position).
376    */
377   static int compare(const char *log_file_name1, my_off_t log_file_pos1,
378                      const char *log_file_name2, my_off_t log_file_pos2);
379 
380   /* Find out if active tranx node list is empty or not
381    *
382    * Return:
383    *   True :  If there are no nodes
384    *   False:  othewise
385    */
is_empty()386   bool is_empty() { return (trx_front_ == nullptr); }
387 };
388 
389 /**
390    AckInfo is a POD. It defines a structure includes information related to an
391    ack: server_id   - which slave the ack comes from. binlog_name - the binlog
392    file name included in the ack. binlog_pos  - the binlog file position
393    included in the ack.
394 */
395 struct AckInfo {
396   int server_id;
397   char binlog_name[FN_REFLEN];
398   unsigned long long binlog_pos = 0;
399 
AckInfoAckInfo400   AckInfo() { clear(); }
401 
clearAckInfo402   void clear() { binlog_name[0] = '\0'; }
emptyAckInfo403   bool empty() const { return binlog_name[0] == '\0'; }
is_serverAckInfo404   bool is_server(int server_id) const { return this->server_id == server_id; }
405 
equal_toAckInfo406   bool equal_to(const char *log_file_name, my_off_t log_file_pos) const {
407     return (ActiveTranx::compare(binlog_name, binlog_pos, log_file_name,
408                                  log_file_pos) == 0);
409   }
less_thanAckInfo410   bool less_than(const char *log_file_name, my_off_t log_file_pos) const {
411     return (ActiveTranx::compare(binlog_name, binlog_pos, log_file_name,
412                                  log_file_pos) < 0);
413   }
414 
setAckInfo415   void set(int server_id, const char *log_file_name, my_off_t log_file_pos) {
416     this->server_id = server_id;
417     update(log_file_name, log_file_pos);
418   }
updateAckInfo419   void update(const char *log_file_name, my_off_t log_file_pos) {
420     strcpy(binlog_name, log_file_name);
421     binlog_pos = log_file_pos;
422   }
423 };
424 
425 /**
426    AckContainer stores received acks internally and tell the caller the
427    ack's position when a transaction is fully acknowledged, so it can wake
428    up the waiting transactions.
429  */
430 class AckContainer : public Trace {
431  public:
AckContainer()432   AckContainer() : m_ack_array(nullptr), m_size(0), m_empty_slot(0) {}
~AckContainer()433   ~AckContainer() {
434     if (m_ack_array) my_free(m_ack_array);
435   }
436 
437   /** Clear the content of the ack array */
clear()438   void clear() {
439     if (m_ack_array) {
440       for (unsigned i = 0; i < m_size; ++i) {
441         m_ack_array[i].clear();
442         m_ack_array[i].server_id = 0;
443         m_ack_array[i].binlog_pos = 0;
444       }
445       m_empty_slot = m_size;
446     }
447     m_greatest_ack.clear();
448   }
449 
450   /**
451      Adjust capacity for the container and report the ack to semisync master,
452      if it is full.
453 
454      @param[in] size size of the container.
455      @param ackinfo Acknowledgement information
456 
457      @return 0 if succeeds, otherwise fails.
458   */
459   int resize(unsigned int size, const AckInfo **ackinfo);
460 
461   /**
462      Insert an ack's information into the container and report the minimum
463      ack to semisync master if it is full.
464 
465      @param[in] server_id  slave server_id of the ack
466      @param[in] log_file_name  binlog file name of the ack
467      @param[in] log_file_pos   binlog file position of the ack
468 
469      @return Pointer of an ack if the ack should be reported to semisync master.
470              Otherwise, NULL is returned.
471   */
472   const AckInfo *insert(int server_id, const char *log_file_name,
473                         my_off_t log_file_pos);
insert(const AckInfo & ackinfo)474   const AckInfo *insert(const AckInfo &ackinfo) {
475     return insert(ackinfo.server_id, ackinfo.binlog_name, ackinfo.binlog_pos);
476   }
477 
478  private:
479   /* The greatest ack of the acks already reported to semisync master. */
480   AckInfo m_greatest_ack;
481 
482   AckInfo *m_ack_array;
483   /* size of the array */
484   unsigned int m_size;
485   /* index of an empty slot, it helps improving insert speed. */
486   unsigned int m_empty_slot;
487 
488   /* Prohibit to copy AckContainer objects */
489   AckContainer(AckContainer &container);
490   AckContainer &operator=(const AckContainer &container);
491 
full()492   bool full() { return m_empty_slot == m_size; }
size()493   unsigned int size() { return m_size; }
494 
495   /**
496      Remove all acks which equal to the given position.
497 
498      @param[in] log_file_name  binlog name of the ack that should be removed
499      @param[in] log_file_pos   binlog position of the ack that should removed
500   */
remove_all(const char * log_file_name,my_off_t log_file_pos)501   void remove_all(const char *log_file_name, my_off_t log_file_pos) {
502     unsigned int i = m_size;
503     for (i = 0; i < m_size; i++) {
504       if (m_ack_array[i].equal_to(log_file_name, log_file_pos)) {
505         m_ack_array[i].clear();
506         m_empty_slot = i;
507       }
508     }
509   }
510 
511   /**
512      Update a slave's ack into the container if another ack of the
513      slave is already in it.
514 
515      @param[in] server_id      server_id of the ack
516      @param[in] log_file_name  binlog file name of the ack
517      @param[in] log_file_pos   binlog file position of the ack
518 
519      @return index of the slot that is updated. if it equals to
520              the size of container, then no slot is updated.
521   */
updateIfExist(int server_id,const char * log_file_name,my_off_t log_file_pos)522   unsigned int updateIfExist(int server_id, const char *log_file_name,
523                              my_off_t log_file_pos) {
524     unsigned int i;
525 
526     m_empty_slot = m_size;
527     for (i = 0; i < m_size; i++) {
528       if (m_ack_array[i].empty())
529         m_empty_slot = i;
530       else if (m_ack_array[i].is_server(server_id)) {
531         m_ack_array[i].update(log_file_name, log_file_pos);
532         if (trace_level_ & kTraceDetail)
533           LogErr(INFORMATION_LEVEL, ER_SEMISYNC_UPDATE_EXISTING_SLAVE_ACK, i);
534         break;
535       }
536     }
537     return i;
538   }
539 
540   /**
541      Find the minimum ack which is smaller than given position. When more than
542      one slots are minimum acks, it returns the one has smallest index.
543 
544      @param[in] log_file_name  binlog file name
545      @param[in] log_file_pos   binlog file position
546 
547      @return NULL if no ack is smaller than given position, otherwise
548               return its pointer.
549   */
minAck(const char * log_file_name,my_off_t log_file_pos)550   AckInfo *minAck(const char *log_file_name, my_off_t log_file_pos) {
551     unsigned int i;
552     AckInfo *ackinfo = nullptr;
553 
554     for (i = 0; i < m_size; i++) {
555       if (m_ack_array[i].less_than(log_file_name, log_file_pos))
556         ackinfo = m_ack_array + i;
557     }
558 
559     return ackinfo;
560   }
561 };
562 
563 /**
564    The extension class for the master of semi-synchronous replication
565 */
566 class ReplSemiSyncMaster : public ReplSemiSyncBase {
567  private:
568   ActiveTranx *active_tranxs_ = nullptr;
569   /* active transaction list: the list will
570      be cleared when semi-sync switches off. */
571 
572   /* True when initObject has been called */
573   bool init_done_ = false;
574 
575   /* Mutex that protects the following state variables and the active
576    * transaction list.
577    * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are
578    * already holding LOCK_binlog_ because it can cause deadlocks.
579    */
580   mysql_mutex_t LOCK_binlog_;
581 
582   /* This is set to true when reply_file_name_ contains meaningful data. */
583   bool reply_file_name_inited_ = false;
584 
585   /* The binlog name up to which we have received replies from any slaves. */
586   char reply_file_name_[FN_REFLEN];
587 
588   /* The position in that file up to which we have the reply from any slaves. */
589   my_off_t reply_file_pos_ = 0;
590 
591   /* This is set to true when we know the 'smallest' wait position. */
592   bool wait_file_name_inited_ = false;
593 
594   /* NULL, or the 'smallest' filename that a transaction is waiting for
595    * slave replies.
596    */
597   char wait_file_name_[FN_REFLEN];
598 
599   /* The smallest position in that file that a trx is waiting for: the trx
600    * can proceed and send an 'ok' to the client when the master has got the
601    * reply from the slave indicating that it already got the binlog events.
602    */
603   my_off_t wait_file_pos_ = 0;
604 
605   /* This is set to true when we know the 'largest' transaction commit
606    * position in the binlog file.
607    * We always maintain the position no matter whether semi-sync is switched
608    * on switched off.  When a transaction wait timeout occurs, semi-sync will
609    * switch off.  Binlog-dump thread can use the three fields to detect when
610    * slaves catch up on replication so that semi-sync can switch on again.
611    */
612   bool commit_file_name_inited_ = false;
613 
614   /* The 'largest' binlog filename that a commit transaction is seeing.       */
615   char commit_file_name_[FN_REFLEN];
616 
617   /* The 'largest' position in that file that a commit transaction is seeing. */
618   my_off_t commit_file_pos_ = 0;
619 
620   /* All global variables which can be set by parameters. */
621   volatile bool master_enabled_ =
622       false;                       /* semi-sync is enabled on the master */
623   unsigned long wait_timeout_ = 0; /* timeout period(ms) during tranx wait */
624 
625   bool state_ = false; /* whether semi-sync is switched */
626 
627   AckContainer ack_container_;
628 
629   void lock();
630   void unlock();
631 
632   /* Is semi-sync replication on? */
is_on()633   bool is_on() { return (state_); }
634 
set_master_enabled(bool enabled)635   void set_master_enabled(bool enabled) { master_enabled_ = enabled; }
636 
637   /* Switch semi-sync off because of timeout in transaction waiting. */
638   int switch_off();
639 
640   void force_switch_on();
641 
642   /* Switch semi-sync on when slaves catch up. */
643   int try_switch_on(const char *log_file_name, my_off_t log_file_pos);
644 
645  public:
646   ReplSemiSyncMaster();
647   ~ReplSemiSyncMaster();
648 
getMasterEnabled()649   bool getMasterEnabled() { return master_enabled_; }
setTraceLevel(unsigned long trace_level)650   void setTraceLevel(unsigned long trace_level) {
651     trace_level_ = trace_level;
652     ack_container_.trace_level_ = trace_level;
653     if (active_tranxs_) active_tranxs_->trace_level_ = trace_level;
654   }
655 
656   /* Set if the master has to wait for an ack from the salve or not. */
657   void set_wait_no_slave(const void *val);
658 
659   /* Set the transaction wait timeout period, in milliseconds. */
setWaitTimeout(unsigned long wait_timeout)660   void setWaitTimeout(unsigned long wait_timeout) {
661     wait_timeout_ = wait_timeout;
662   }
663 
664   /* Initialize this class after MySQL parameters are initialized. this
665    * function should be called once at bootstrap time.
666    */
667   int initObject();
668 
669   /* Enable the object to enable semi-sync replication inside the master. */
670   int enableMaster();
671 
672   /* Enable the object to enable semi-sync replication inside the master. */
673   int disableMaster();
674 
675   /* Add a semi-sync replication slave */
676   void add_slave();
677 
678   /* Remove a semi-sync replication slave */
679   void remove_slave();
680 
681   /* Is the slave servered by the thread requested semi-sync */
682   bool is_semi_sync_slave();
683 
684   /* It parses a reply packet and call reportReplyBinlog to handle it. */
685   int reportReplyPacket(uint32 server_id, const uchar *packet,
686                         ulong packet_len);
687 
688   /* In semi-sync replication, reports up to which binlog position we have
689    * received replies from the slave indicating that it already get the events
690    * or that was skipped in the master.
691    *
692    * Input:
693    *  log_file_name - (IN)  binlog file name
694    *  end_offset    - (IN)  the offset in the binlog file up to which we have
695    *                        the replies from the slave or that was skipped
696    */
697   void reportReplyBinlog(const char *log_file_name, my_off_t end_offset);
698 
699   /* Commit a transaction in the final step.  This function is called from
700    * InnoDB before returning from the low commit.  If semi-sync is switch on,
701    * the function will wait to see whether binlog-dump thread get the reply for
702    * the events of the transaction.  Remember that this is not a direct wait,
703    * instead, it waits to see whether the binlog-dump thread has reached the
704    * point.  If the wait times out, semi-sync status will be switched off and
705    * all other transaction would not wait either.
706    *
707    * Input:  (the transaction events' ending binlog position)
708    *  trx_wait_binlog_name - (IN)  ending position's file name
709    *  trx_wait_binlog_pos  - (IN)  ending position's file offset
710    *
711    * Return:
712    *  0: success;  non-zero: error
713    */
714   int commitTrx(const char *trx_wait_binlog_name, my_off_t trx_wait_binlog_pos);
715 
716   /* Reserve space in the replication event packet header:
717    *  . slave semi-sync off: 1 byte - (0)
718    *  . slave semi-sync on:  3 byte - (0, 0xef, 0/1}
719    *
720    * Input:
721    *  header   - (IN)  the header buffer
722    *  size     - (IN)  size of the header buffer
723    *
724    * Return:
725    *  size of the bytes reserved for header
726    */
727   int reserveSyncHeader(unsigned char *header, unsigned long size);
728 
729   /* Update the sync bit in the packet header to indicate to the slave whether
730    * the master will wait for the reply of the event.  If semi-sync is switched
731    * off and we detect that the slave is catching up, we switch semi-sync on.
732    *
733    * Input:
734    *  packet        - (IN)  the packet containing the replication event
735    *  log_file_name - (IN)  the event ending position's file name
736    *  log_file_pos  - (IN)  the event ending position's file offset
737    *  server_id     - (IN)  master server id number
738    *
739    * Return:
740    *  0: success;  non-zero: error
741    */
742   int updateSyncHeader(unsigned char *packet, const char *log_file_name,
743                        my_off_t log_file_pos, uint32 server_id);
744 
745   /* Called when a transaction finished writing binlog events.
746    *  . update the 'largest' transactions' binlog event position
747    *  . insert the ending position in the active transaction list if
748    *    semi-sync is on
749    *
750    * Input:  (the transaction events' ending binlog position)
751    *  log_file_name - (IN)  transaction ending position's file name
752    *  log_file_pos  - (IN)  transaction ending position's file offset
753    *
754    * Return:
755    *  0: success;  non-zero: error
756    */
757   int writeTranxInBinlog(const char *log_file_name, my_off_t log_file_pos);
758 
759   /* Read the slave's reply so that we know how much progress the slave makes
760    * on receive replication events.
761    *
762    * Input:
763    *  net          - (IN)  the connection to master
764    *  event_buf    - (IN)  pointer to the event packet
765    *
766    * Return:
767    *  0: success;  non-zero: error
768    */
769   int readSlaveReply(NET *net, const char *event_buf);
770 
771   /* In semi-sync replication, this method simulates the reception of
772    * an reply and executes reportReplyBinlog directly when a transaction
773    * is skipped in the master.
774    *
775    * Input:
776    *  event_buf     - (IN)  pointer to the event packet
777    *  server_id     - (IN)  master server id numbe
778    *  log_file_name - (IN)  the event ending position's file name
779    *  log_file_pos  - (IN)  the event ending position's file offset
780    *
781    * Return:
782    *  0: success;  non-zero: error
783    */
784   int skipSlaveReply(const char *event_buf, uint32 server_id,
785                      const char *log_file_name, my_off_t log_file_pos);
786 
787   /* Export internal statistics for semi-sync replication. */
788   void setExportStats();
789 
790   /* 'reset master' command is issued from the user and semi-sync need to
791    * go off for that.
792    */
793   int resetMaster();
794 
795   /*
796     'SET rpl_semi_sync_master_wait_for_slave_count' command is issued from user
797     and semi-sync need to update rpl_semi_sync_master_wait_for_slave_count and
798     notify ack_container_ to resize itself.
799 
800     @param[in] new_value The value users want to set to.
801 
802     @return It returns 0 if succeeds, otherwise 1 is returned.
803    */
804   int setWaitSlaveCount(unsigned int new_value);
805 
806   /*
807     Update ack_array after receiving an ack from a dump connection. If any
808     binlog pos is already replied by rpl_semi_sync_master_wait_for_slave_count
809     slaves, it will call reportReplyBinlog to increase received binlog
810     position and wake up waiting transactions. It acquires LOCK_binlog_
811     to protect the operation.
812 
813     @param[in] server_id  slave server_id of the ack
814     @param[in] log_file_name  binlog file name of the ack
815     @param[in] log_file_pos   binlog file position of the ack
816   */
handleAck(int server_id,const char * log_file_name,my_off_t log_file_pos)817   void handleAck(int server_id, const char *log_file_name,
818                  my_off_t log_file_pos) {
819     lock();
820     if (rpl_semi_sync_master_wait_for_slave_count == 1)
821       reportReplyBinlog(log_file_name, log_file_pos);
822     else {
823       const AckInfo *ackinfo = nullptr;
824 
825       ackinfo = ack_container_.insert(server_id, log_file_name, log_file_pos);
826       if (ackinfo != nullptr)
827         reportReplyBinlog(ackinfo->binlog_name, ackinfo->binlog_pos);
828     }
829     unlock();
830   }
831 };
832 
/* System and status variables for the master component. They are only
   declared here; the definitions live outside this header. */
extern bool rpl_semi_sync_master_enabled;
extern char rpl_semi_sync_master_status;
extern unsigned long rpl_semi_sync_master_clients;
extern unsigned long rpl_semi_sync_master_timeout;
extern unsigned long rpl_semi_sync_master_trace_level;
extern unsigned long rpl_semi_sync_master_yes_transactions;
extern unsigned long rpl_semi_sync_master_no_transactions;
extern unsigned long rpl_semi_sync_master_off_times;
extern unsigned long rpl_semi_sync_master_wait_timeouts;
extern unsigned long rpl_semi_sync_master_timefunc_fails;
extern unsigned long rpl_semi_sync_master_num_timeouts;
extern unsigned long rpl_semi_sync_master_wait_sessions;
extern unsigned long rpl_semi_sync_master_wait_pos_backtraverse;
extern unsigned long rpl_semi_sync_master_avg_trx_wait_time;
extern unsigned long rpl_semi_sync_master_avg_net_wait_time;
extern unsigned long long rpl_semi_sync_master_net_wait_num;
extern unsigned long long rpl_semi_sync_master_trx_wait_num;
extern unsigned long long rpl_semi_sync_master_net_wait_time;
extern unsigned long long rpl_semi_sync_master_trx_wait_time;

/*
  This indicates whether we should keep waiting if no semi-sync slave
  is available.
     0           : stop waiting when no available semi-sync slave is detected.
     1 (default) : keep waiting until timeout even if no semi-sync slave is
                   available.
*/
extern bool rpl_semi_sync_master_wait_no_slave;
861 #endif /* SEMISYNC_MASTER_H */
862