1 /* Copyright (C) 2007 Google Inc.
2    Copyright (c) 2008, 2015, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
23 
24 
25 #ifndef SEMISYNC_MASTER_H
26 #define SEMISYNC_MASTER_H
27 
28 #include "semisync.h"
29 
30 #ifdef HAVE_PSI_INTERFACE
31 extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
32 extern PSI_cond_key key_ss_cond_COND_binlog_send_;
33 #endif
34 
35 extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
36 
37 struct TranxNode {
38   char             log_name_[FN_REFLEN];
39   my_off_t         log_pos_;
40   mysql_cond_t     cond;
41   int              n_waiters;
42   struct TranxNode *next_;            /* the next node in the sorted list */
43   struct TranxNode *hash_next_;    /* the next node during hash collision */
44 };
45 
46 /**
47   @class TranxNodeAllocator
48 
49   This class provides memory allocating and freeing methods for
50   TranxNode. The main target is performance.
51 
52   @section ALLOCATE How to allocate a node
53     The pointer of the first node after 'last_node' in current_block is
54     returned. current_block will move to the next free Block when all nodes of
55     it are in use. A new Block is allocated and is put into the rear of the
56     Block link table if no Block is free.
57 
58     The list starts up empty (ie, there is no allocated Block).
59 
60     After some nodes are freed, there probably are some free nodes before
61     the sequence of the allocated nodes, but we do not reuse it. It is better
62     to keep the allocated nodes are in the sequence, for it is more efficient
63     for allocating and freeing TranxNode.
64 
65   @section FREENODE How to free nodes
66     There are two methods for freeing nodes. They are free_all_nodes and
67     free_nodes_before.
68 
69     'A Block is free' means all of its nodes are free.
70     @subsection free_nodes_before
71     As all allocated nodes are in the sequence, 'Before one node' means all
72     nodes before given node in the same Block and all Blocks before the Block
73     which containing the given node. As such, all Blocks before the given one
74     ('node') are free Block and moved into the rear of the Block link table.
75     The Block containing the given 'node', however, is not. For at least the
76     given 'node' is still in use. This will waste at most one Block, but it is
77     more efficient.
78  */
79 #define BLOCK_TRANX_NODES 16
80 class TranxNodeAllocator
81 {
82 public:
83   /**
84     @param reserved_nodes
85       The number of reserved TranxNodes. It is used to set 'reserved_blocks'
86       which can contain at least 'reserved_nodes' number of TranxNodes.  When
87       freeing memory, we will reserve at least reserved_blocks of Blocks not
88       freed.
89    */
TranxNodeAllocator(uint reserved_nodes)90   TranxNodeAllocator(uint reserved_nodes) :
91     reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES +
92                   (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)),
93     first_block(NULL), last_block(NULL),
94     current_block(NULL), last_node(-1), block_num(0) {}
95 
~TranxNodeAllocator()96   ~TranxNodeAllocator()
97   {
98     Block *block= first_block;
99     while (block != NULL)
100     {
101       Block *next= block->next;
102       free_block(block);
103       block= next;
104     }
105   }
106 
107   /**
108     The pointer of the first node after 'last_node' in current_block is
109     returned. current_block will move to the next free Block when all nodes of
110     it are in use. A new Block is allocated and is put into the rear of the
111     Block link table if no Block is free.
112 
113     @return Return a TranxNode *, or NULL if an error occured.
114    */
allocate_node()115   TranxNode *allocate_node()
116   {
117     TranxNode *trx_node;
118     Block *block= current_block;
119 
120     if (last_node == BLOCK_TRANX_NODES-1)
121     {
122       current_block= current_block->next;
123       last_node= -1;
124     }
125 
126     if (current_block == NULL && allocate_block())
127     {
128       current_block= block;
129       if (current_block)
130         last_node= BLOCK_TRANX_NODES-1;
131       return NULL;
132     }
133 
134     trx_node= &(current_block->nodes[++last_node]);
135     trx_node->log_name_[0] = '\0';
136     trx_node->log_pos_= 0;
137     trx_node->next_= 0;
138     trx_node->hash_next_= 0;
139     trx_node->n_waiters= 0;
140     return trx_node;
141   }
142 
143   /**
144     All nodes are freed.
145 
146     @return Return 0, or 1 if an error occured.
147    */
free_all_nodes()148   int free_all_nodes()
149   {
150     current_block= first_block;
151     last_node= -1;
152     free_blocks();
153     return 0;
154   }
155 
156   /**
157     All Blocks before the given 'node' are free Block and moved into the rear
158     of the Block link table.
159 
160     @param node All nodes before 'node' will be freed
161 
162     @return Return 0, or 1 if an error occured.
163    */
free_nodes_before(TranxNode * node)164   int free_nodes_before(TranxNode* node)
165   {
166     Block *block;
167     Block *prev_block= NULL;
168 
169     block= first_block;
170     while (block != current_block->next)
171     {
172       /* Find the Block containing the given node */
173       if (&(block->nodes[0]) <= node && &(block->nodes[BLOCK_TRANX_NODES]) >= node)
174       {
175         /* All Blocks before the given node are put into the rear */
176         if (first_block != block)
177         {
178           last_block->next= first_block;
179           first_block= block;
180           last_block= prev_block;
181           last_block->next= NULL;
182           free_blocks();
183         }
184         return 0;
185       }
186       prev_block= block;
187       block= block->next;
188     }
189 
190     /* Node does not find should never happen */
191     DBUG_ASSERT(0);
192     return 1;
193   }
194 
195 private:
196   uint reserved_blocks;
197 
198  /**
199    A sequence memory which contains BLOCK_TRANX_NODES TranxNodes.
200 
201    BLOCK_TRANX_NODES The number of TranxNodes which are in a Block.
202 
203    next Every Block has a 'next' pointer which points to the next Block.
204         These linking Blocks constitute a Block link table.
205   */
206   struct Block {
207     Block *next;
208     TranxNode nodes[BLOCK_TRANX_NODES];
209   };
210 
211   /**
212     The 'first_block' is the head of the Block link table;
213    */
214   Block *first_block;
215   /**
216     The 'last_block' is the rear of the Block link table;
217    */
218   Block *last_block;
219 
220   /**
221     current_block always points the Block in the Block link table in
222     which the last allocated node is. The Blocks before it are all in use
223     and the Blocks after it are all free.
224    */
225   Block *current_block;
226 
227   /**
228     It always points to the last node which has been allocated in the
229     current_block.
230    */
231   int last_node;
232 
233   /**
234     How many Blocks are in the Block link table.
235    */
236   uint block_num;
237 
238   /**
239     Allocate a block and then assign it to current_block.
240   */
allocate_block()241   int allocate_block()
242   {
243     Block *block= (Block *)my_malloc(sizeof(Block), MYF(0));
244     if (block)
245     {
246       block->next= NULL;
247 
248       if (first_block == NULL)
249         first_block= block;
250       else
251         last_block->next= block;
252 
253       /* New Block is always put into the rear */
254       last_block= block;
255       /* New Block is always the current_block */
256       current_block= block;
257       ++block_num;
258 
259       for (int i=0; i< BLOCK_TRANX_NODES; i++)
260         mysql_cond_init(key_ss_cond_COND_binlog_send_,
261                         &current_block->nodes[i].cond,
262                         NULL);
263 
264       return 0;
265     }
266     return 1;
267   }
268 
269   /**
270     Free a given Block.
271     @param block The Block will be freed.
272    */
free_block(Block * block)273   void free_block(Block *block)
274   {
275     for (int i=0; i< BLOCK_TRANX_NODES; i++)
276       mysql_cond_destroy(&block->nodes[i].cond);
277     my_free(block);
278     --block_num;
279   }
280 
281 
282   /**
283     If there are some free Blocks and the total number of the Blocks in the
284     Block link table is larger than the 'reserved_blocks', Some free Blocks
285     will be freed until the total number of the Blocks is equal to the
286     'reserved_blocks' or there is only one free Block behind the
287     'current_block'.
288    */
free_blocks()289   void free_blocks()
290   {
291     if (current_block == NULL || current_block->next == NULL)
292       return;
293 
294     /* One free Block is always kept behind the current block */
295     Block *block= current_block->next->next;
296     while (block_num > reserved_blocks && block != NULL)
297     {
298       Block *next= block->next;
299       free_block(block);
300       block= next;
301     }
302     current_block->next->next= block;
303     if (block == NULL)
304       last_block= current_block->next;
305   }
306 };
307 
308 /**
309    This class manages memory for active transaction list.
310 
311    We record each active transaction with a TranxNode, each session
312    can have only one open transaction. Because of EVENT, the total
313    active transaction nodes can exceed the maximum allowed
314    connections.
315 */
316 class ActiveTranx
317   :public Trace {
318 private:
319 
320   TranxNodeAllocator allocator_;
321   /* These two record the active transaction list in sort order. */
322   TranxNode       *trx_front_, *trx_rear_;
323 
324   TranxNode      **trx_htb_;        /* A hash table on active transactions. */
325 
326   int              num_entries_;              /* maximum hash table entries */
327   mysql_mutex_t *lock_;                                     /* mutex lock */
328 
329   inline void assert_lock_owner();
330 
331   inline unsigned int calc_hash(const unsigned char *key,unsigned int length);
332   unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
333 
compare(const char * log_file_name1,my_off_t log_file_pos1,const TranxNode * node2)334   int compare(const char *log_file_name1, my_off_t log_file_pos1,
335               const TranxNode *node2) {
336     return compare(log_file_name1, log_file_pos1,
337                    node2->log_name_, node2->log_pos_);
338   }
compare(const TranxNode * node1,const char * log_file_name2,my_off_t log_file_pos2)339   int compare(const TranxNode *node1,
340               const char *log_file_name2, my_off_t log_file_pos2) {
341     return compare(node1->log_name_, node1->log_pos_,
342                    log_file_name2, log_file_pos2);
343   }
compare(const TranxNode * node1,const TranxNode * node2)344   int compare(const TranxNode *node1, const TranxNode *node2) {
345     return compare(node1->log_name_, node1->log_pos_,
346                    node2->log_name_, node2->log_pos_);
347   }
348 
349 public:
350   int signal_waiting_sessions_all();
351   int signal_waiting_sessions_up_to(const char *log_file_name,
352                                     my_off_t log_file_pos);
353   TranxNode* find_active_tranx_node(const char *log_file_name,
354                                     my_off_t log_file_pos);
355   ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
356   ~ActiveTranx();
357 
358   /* Insert an active transaction node with the specified position.
359    *
360    * Return:
361    *  0: success;  non-zero: error
362    */
363   int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
364 
365   /* Clear the active transaction nodes until(inclusive) the specified
366    * position.
367    * If log_file_name is NULL, everything will be cleared: the sorted
368    * list and the hash table will be reset to empty.
369    *
370    * Return:
371    *  0: success;  non-zero: error
372    */
373   int clear_active_tranx_nodes(const char *log_file_name,
374                                my_off_t    log_file_pos);
375 
376   /* Given a position, check to see whether the position is an active
377    * transaction's ending position by probing the hash table.
378    */
379   bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
380 
381   /* Given two binlog positions, compare which one is bigger based on
382    * (file_name, file_position).
383    */
384   static int compare(const char *log_file_name1, my_off_t log_file_pos1,
385                      const char *log_file_name2, my_off_t log_file_pos2);
386 
387   /* Find out if active tranx node list is empty or not
388    *
389    * Return:
390    *   True :  If there are no nodes
391    *   False:  othewise
392   */
is_empty()393   bool is_empty()
394   {
395     return (trx_front_ == NULL);
396   }
397 
398 };
399 
400 /**
401    The extension class for the master of semi-synchronous replication
402 */
403 class ReplSemiSyncMaster
404   :public ReplSemiSyncBase {
405  private:
406   ActiveTranx    *active_tranxs_;  /* active transaction list: the list will
407                                       be cleared when semi-sync switches off. */
408 
409   /* True when initObject has been called */
410   bool init_done_;
411 
412   /* Mutex that protects the following state variables and the active
413    * transaction list.
414    * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are
415    * already holding LOCK_binlog_ because it can cause deadlocks.
416    */
417   mysql_mutex_t LOCK_binlog_;
418 
419   /* This is set to true when reply_file_name_ contains meaningful data. */
420   bool            reply_file_name_inited_;
421 
422   /* The binlog name up to which we have received replies from any slaves. */
423   char            reply_file_name_[FN_REFLEN];
424 
425   /* The position in that file up to which we have the reply from any slaves. */
426   my_off_t        reply_file_pos_;
427 
428   /* This is set to true when we know the 'smallest' wait position. */
429   bool            wait_file_name_inited_;
430 
431   /* NULL, or the 'smallest' filename that a transaction is waiting for
432    * slave replies.
433    */
434   char            wait_file_name_[FN_REFLEN];
435 
436   /* The smallest position in that file that a trx is waiting for: the trx
437    * can proceed and send an 'ok' to the client when the master has got the
438    * reply from the slave indicating that it already got the binlog events.
439    */
440   my_off_t        wait_file_pos_;
441 
442   /* This is set to true when we know the 'largest' transaction commit
443    * position in the binlog file.
444    * We always maintain the position no matter whether semi-sync is switched
445    * on switched off.  When a transaction wait timeout occurs, semi-sync will
446    * switch off.  Binlog-dump thread can use the three fields to detect when
447    * slaves catch up on replication so that semi-sync can switch on again.
448    */
449   bool            commit_file_name_inited_;
450 
451   /* The 'largest' binlog filename that a commit transaction is seeing.       */
452   char            commit_file_name_[FN_REFLEN];
453 
454   /* The 'largest' position in that file that a commit transaction is seeing. */
455   my_off_t        commit_file_pos_;
456 
457   /* All global variables which can be set by parameters. */
458   volatile bool            master_enabled_;      /* semi-sync is enabled on the master */
459   unsigned long           wait_timeout_;      /* timeout period(ms) during tranx wait */
460 
461   bool            state_;                    /* whether semi-sync is switched */
462 
463   void lock();
464   void unlock();
465 
466   /* Is semi-sync replication on? */
is_on()467   bool is_on() {
468     return (state_);
469   }
470 
set_master_enabled(bool enabled)471   void set_master_enabled(bool enabled) {
472     master_enabled_ = enabled;
473   }
474 
475   /* Switch semi-sync off because of timeout in transaction waiting. */
476   int switch_off();
477 
478   /* Switch semi-sync on when slaves catch up. */
479   int try_switch_on(int server_id,
480                     const char *log_file_name, my_off_t log_file_pos);
481 
482  public:
483   ReplSemiSyncMaster();
484   ~ReplSemiSyncMaster();
485 
getMasterEnabled()486   bool getMasterEnabled() {
487     return master_enabled_;
488   }
setTraceLevel(unsigned long trace_level)489   void setTraceLevel(unsigned long trace_level) {
490     trace_level_ = trace_level;
491     if (active_tranxs_)
492       active_tranxs_->trace_level_ = trace_level;
493   }
494 
495   /* Set the transaction wait timeout period, in milliseconds. */
setWaitTimeout(unsigned long wait_timeout)496   void setWaitTimeout(unsigned long wait_timeout) {
497     wait_timeout_ = wait_timeout;
498   }
499 
500   /* Initialize this class after MySQL parameters are initialized. this
501    * function should be called once at bootstrap time.
502    */
503   int initObject();
504 
505   /* Enable the object to enable semi-sync replication inside the master. */
506   int enableMaster();
507 
508   /* Enable the object to enable semi-sync replication inside the master. */
509   int disableMaster();
510 
511   /* Add a semi-sync replication slave */
512   void add_slave();
513 
514   /* Remove a semi-sync replication slave */
515   void remove_slave();
516 
517   /* Is the slave servered by the thread requested semi-sync */
518   bool is_semi_sync_slave();
519 
520   /* In semi-sync replication, reports up to which binlog position we have
521    * received replies from the slave indicating that it already get the events
522    * or that was skipped in the master.
523    *
524    * Input:
525    *  server_id     - (IN)  master server id number
526    *  log_file_name - (IN)  binlog file name
527    *  end_offset    - (IN)  the offset in the binlog file up to which we have
528    *                        the replies from the slave or that was skipped
529    *  skipped_event - (IN)  if the event was skipped
530    *
531    * Return:
532    *  0: success;  non-zero: error
533    */
534   int reportReplyBinlog(uint32 server_id,
535                         const char* log_file_name,
536                         my_off_t end_offset,
537                         bool skipped_event= false);
538 
539   /* Commit a transaction in the final step.  This function is called from
540    * InnoDB before returning from the low commit.  If semi-sync is switch on,
541    * the function will wait to see whether binlog-dump thread get the reply for
542    * the events of the transaction.  Remember that this is not a direct wait,
543    * instead, it waits to see whether the binlog-dump thread has reached the
544    * point.  If the wait times out, semi-sync status will be switched off and
545    * all other transaction would not wait either.
546    *
547    * Input:  (the transaction events' ending binlog position)
548    *  trx_wait_binlog_name - (IN)  ending position's file name
549    *  trx_wait_binlog_pos  - (IN)  ending position's file offset
550    *
551    * Return:
552    *  0: success;  non-zero: error
553    */
554   int commitTrx(const char* trx_wait_binlog_name,
555                 my_off_t trx_wait_binlog_pos);
556 
557   /* Reserve space in the replication event packet header:
558    *  . slave semi-sync off: 1 byte - (0)
559    *  . slave semi-sync on:  3 byte - (0, 0xef, 0/1}
560    *
561    * Input:
562    *  header   - (IN)  the header buffer
563    *  size     - (IN)  size of the header buffer
564    *
565    * Return:
566    *  size of the bytes reserved for header
567    */
568   int reserveSyncHeader(unsigned char *header, unsigned long size);
569 
570   /* Update the sync bit in the packet header to indicate to the slave whether
571    * the master will wait for the reply of the event.  If semi-sync is switched
572    * off and we detect that the slave is catching up, we switch semi-sync on.
573    *
574    * Input:
575    *  packet        - (IN)  the packet containing the replication event
576    *  log_file_name - (IN)  the event ending position's file name
577    *  log_file_pos  - (IN)  the event ending position's file offset
578    *  server_id     - (IN)  master server id number
579    *
580    * Return:
581    *  0: success;  non-zero: error
582    */
583   int updateSyncHeader(unsigned char *packet,
584                        const char *log_file_name,
585 		       my_off_t log_file_pos,
586 		       uint32 server_id);
587 
588   /* Called when a transaction finished writing binlog events.
589    *  . update the 'largest' transactions' binlog event position
590    *  . insert the ending position in the active transaction list if
591    *    semi-sync is on
592    *
593    * Input:  (the transaction events' ending binlog position)
594    *  log_file_name - (IN)  transaction ending position's file name
595    *  log_file_pos  - (IN)  transaction ending position's file offset
596    *
597    * Return:
598    *  0: success;  non-zero: error
599    */
600   int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
601 
602   /* Read the slave's reply so that we know how much progress the slave makes
603    * on receive replication events.
604    *
605    * Input:
606    *  net          - (IN)  the connection to master
607    *  server_id    - (IN)  master server id number
608    *  event_buf    - (IN)  pointer to the event packet
609    *
610    * Return:
611    *  0: success;  non-zero: error
612    */
613   int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
614 
615   /* In semi-sync replication, this method simulates the reception of
616    * an reply and executes reportReplyBinlog directly when a transaction
617    * is skipped in the master.
618    *
619    * Input:
620    *  event_buf     - (IN)  pointer to the event packet
621    *  server_id     - (IN)  master server id numbe
622    *  log_file_name - (IN)  the event ending position's file name
623    *  log_file_pos  - (IN)  the event ending position's file offset
624    *
625    * Return:
626    *  0: success;  non-zero: error
627    */
628   int skipSlaveReply(const char *event_buf, uint32 server_id,
629                      const char* log_file_name, my_off_t log_file_pos);
630 
631   /* Export internal statistics for semi-sync replication. */
632   void setExportStats();
633 
634   /* 'reset master' command is issued from the user and semi-sync need to
635    * go off for that.
636    */
637   int resetMaster();
638 };
639 
/*
  System and status variables for the master component.
  They are only declared here; the definitions live in another translation
  unit.
*/

/* Variables declared as char. */
extern char rpl_semi_sync_master_enabled;
extern char rpl_semi_sync_master_status;

/* Variables declared as unsigned long. */
extern unsigned long rpl_semi_sync_master_clients;
extern unsigned long rpl_semi_sync_master_timeout;
extern unsigned long rpl_semi_sync_master_trace_level;
extern unsigned long rpl_semi_sync_master_yes_transactions;
extern unsigned long rpl_semi_sync_master_no_transactions;
extern unsigned long rpl_semi_sync_master_off_times;
extern unsigned long rpl_semi_sync_master_wait_timeouts;
extern unsigned long rpl_semi_sync_master_timefunc_fails;
extern unsigned long rpl_semi_sync_master_num_timeouts;
extern unsigned long rpl_semi_sync_master_wait_sessions;
extern unsigned long rpl_semi_sync_master_wait_pos_backtraverse;
extern unsigned long rpl_semi_sync_master_avg_trx_wait_time;
extern unsigned long rpl_semi_sync_master_avg_net_wait_time;

/* Variables declared as unsigned long long. */
extern unsigned long long rpl_semi_sync_master_net_wait_num;
extern unsigned long long rpl_semi_sync_master_trx_wait_num;
extern unsigned long long rpl_semi_sync_master_net_wait_time;
extern unsigned long long rpl_semi_sync_master_trx_wait_time;

/*
  This indicates whether we should keep waiting if no semi-sync slave
  is available.
     0           : stop waiting once no available semi-sync slave is detected.
     1 (default) : keep waiting until timeout even if no semi-sync slave is
                   available.
*/
extern char rpl_semi_sync_master_wait_no_slave;
668 
669 #endif /* SEMISYNC_MASTER_H */
670