1 /* Copyright (C) 2007 Google Inc.
2    Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc.
3    Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
17 
18 
19 #ifndef SEMISYNC_MASTER_H
20 #define SEMISYNC_MASTER_H
21 
22 #include "semisync.h"
23 
/*
  Performance-schema instrumentation keys, declared only when the server is
  built with HAVE_PSI_INTERFACE. They are defined outside this header.
  NOTE(review): judging by the names they presumably instrument
  ReplSemiSyncMaster::LOCK_binlog_ and ReplSemiSyncMaster::COND_binlog_send_;
  confirm against the place where the keys are registered.
*/
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
extern PSI_cond_key key_ss_cond_COND_binlog_send_;
#endif
28 
29 struct TranxNode {
30   char             log_name_[FN_REFLEN];
31   my_off_t          log_pos_;
32   struct TranxNode *next_;            /* the next node in the sorted list */
33   struct TranxNode *hash_next_;    /* the next node during hash collision */
34 };
35 
36 /**
37   @class TranxNodeAllocator
38 
39   This class provides memory allocating and freeing methods for
40   TranxNode. The main target is performance.
41 
42   @section ALLOCATE How to allocate a node
43     The pointer of the first node after 'last_node' in current_block is
44     returned. current_block will move to the next free Block when all nodes of
45     it are in use. A new Block is allocated and is put into the rear of the
46     Block link table if no Block is free.
47 
48     The list starts up empty (ie, there is no allocated Block).
49 
50     After some nodes are freed, there probably are some free nodes before
51     the sequence of the allocated nodes, but we do not reuse it. It is better
52     to keep the allocated nodes are in the sequence, for it is more efficient
53     for allocating and freeing TranxNode.
54 
55   @section FREENODE How to free nodes
56     There are two methods for freeing nodes. They are free_all_nodes and
57     free_nodes_before.
58 
59     'A Block is free' means all of its nodes are free.
60     @subsection free_nodes_before
61     As all allocated nodes are in the sequence, 'Before one node' means all
62     nodes before given node in the same Block and all Blocks before the Block
63     which containing the given node. As such, all Blocks before the given one
64     ('node') are free Block and moved into the rear of the Block link table.
65     The Block containing the given 'node', however, is not. For at least the
66     given 'node' is still in use. This will waste at most one Block, but it is
67     more efficient.
68  */
69 #define BLOCK_TRANX_NODES 16
70 class TranxNodeAllocator
71 {
72 public:
73   /**
74     @param reserved_nodes
75       The number of reserved TranxNodes. It is used to set 'reserved_blocks'
76       which can contain at least 'reserved_nodes' number of TranxNodes.  When
77       freeing memory, we will reserve at least reserved_blocks of Blocks not
78       freed.
79    */
TranxNodeAllocator(uint reserved_nodes)80   TranxNodeAllocator(uint reserved_nodes) :
81     reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES +
82                   (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)),
83     first_block(NULL), last_block(NULL),
84     current_block(NULL), last_node(-1), block_num(0) {}
85 
~TranxNodeAllocator()86   ~TranxNodeAllocator()
87   {
88     Block *block= first_block;
89     while (block != NULL)
90     {
91       Block *next= block->next;
92       free_block(block);
93       block= next;
94     }
95   }
96 
97   /**
98     The pointer of the first node after 'last_node' in current_block is
99     returned. current_block will move to the next free Block when all nodes of
100     it are in use. A new Block is allocated and is put into the rear of the
101     Block link table if no Block is free.
102 
103     @return Return a TranxNode *, or NULL if an error occured.
104    */
allocate_node()105   TranxNode *allocate_node()
106   {
107     TranxNode *trx_node;
108     Block *block= current_block;
109 
110     if (last_node == BLOCK_TRANX_NODES-1)
111     {
112       current_block= current_block->next;
113       last_node= -1;
114     }
115 
116     if (current_block == NULL && allocate_block())
117     {
118       current_block= block;
119       if (current_block)
120         last_node= BLOCK_TRANX_NODES-1;
121       return NULL;
122     }
123 
124     trx_node= &(current_block->nodes[++last_node]);
125     trx_node->log_name_[0] = '\0';
126     trx_node->log_pos_= 0;
127     trx_node->next_= 0;
128     trx_node->hash_next_= 0;
129     return trx_node;
130   }
131 
132   /**
133     All nodes are freed.
134 
135     @return Return 0, or 1 if an error occured.
136    */
free_all_nodes()137   int free_all_nodes()
138   {
139     current_block= first_block;
140     last_node= -1;
141     free_blocks();
142     return 0;
143   }
144 
145   /**
146     All Blocks before the given 'node' are free Block and moved into the rear
147     of the Block link table.
148 
149     @param node All nodes before 'node' will be freed
150 
151     @return Return 0, or 1 if an error occured.
152    */
free_nodes_before(TranxNode * node)153   int free_nodes_before(TranxNode* node)
154   {
155     Block *block;
156     Block *prev_block= NULL;
157 
158     block= first_block;
159     while (block != current_block->next)
160     {
161       /* Find the Block containing the given node */
162       if (&(block->nodes[0]) <= node && &(block->nodes[BLOCK_TRANX_NODES]) >= node)
163       {
164         /* All Blocks before the given node are put into the rear */
165         if (first_block != block)
166         {
167           last_block->next= first_block;
168           first_block= block;
169           last_block= prev_block;
170           last_block->next= NULL;
171           free_blocks();
172         }
173         return 0;
174       }
175       prev_block= block;
176       block= block->next;
177     }
178 
179     /* Node does not find should never happen */
180     DBUG_ASSERT(0);
181     return 1;
182   }
183 
184 private:
185   uint reserved_blocks;
186 
187  /**
188    A sequence memory which contains BLOCK_TRANX_NODES TranxNodes.
189 
190    BLOCK_TRANX_NODES The number of TranxNodes which are in a Block.
191 
192    next Every Block has a 'next' pointer which points to the next Block.
193         These linking Blocks constitute a Block link table.
194   */
195   struct Block {
196     Block *next;
197     TranxNode nodes[BLOCK_TRANX_NODES];
198   };
199 
200   /**
201     The 'first_block' is the head of the Block link table;
202    */
203   Block *first_block;
204   /**
205     The 'last_block' is the rear of the Block link table;
206    */
207   Block *last_block;
208 
209   /**
210     current_block always points the Block in the Block link table in
211     which the last allocated node is. The Blocks before it are all in use
212     and the Blocks after it are all free.
213    */
214   Block *current_block;
215 
216   /**
217     It always points to the last node which has been allocated in the
218     current_block.
219    */
220   int last_node;
221 
222   /**
223     How many Blocks are in the Block link table.
224    */
225   uint block_num;
226 
227   /**
228     Allocate a block and then assign it to current_block.
229   */
allocate_block()230   int allocate_block()
231   {
232     Block *block= (Block *)my_malloc(sizeof(Block), MYF(0));
233     if (block)
234     {
235       block->next= NULL;
236 
237       if (first_block == NULL)
238         first_block= block;
239       else
240         last_block->next= block;
241 
242       /* New Block is always put into the rear */
243       last_block= block;
244       /* New Block is always the current_block */
245       current_block= block;
246       ++block_num;
247       return 0;
248     }
249     return 1;
250   }
251 
252   /**
253     Free a given Block.
254     @param block The Block will be freed.
255    */
free_block(Block * block)256   void free_block(Block *block)
257   {
258     my_free(block);
259     --block_num;
260   }
261 
262 
263   /**
264     If there are some free Blocks and the total number of the Blocks in the
265     Block link table is larger than the 'reserved_blocks', Some free Blocks
266     will be freed until the total number of the Blocks is equal to the
267     'reserved_blocks' or there is only one free Block behind the
268     'current_block'.
269    */
free_blocks()270   void free_blocks()
271   {
272     if (current_block == NULL || current_block->next == NULL)
273       return;
274 
275     /* One free Block is always kept behind the current block */
276     Block *block= current_block->next->next;
277     while (block_num > reserved_blocks && block != NULL)
278     {
279       Block *next= block->next;
280       free_block(block);
281       block= next;
282     }
283     current_block->next->next= block;
284     if (block == NULL)
285       last_block= current_block->next;
286   }
287 };
288 
289 /**
290    This class manages memory for active transaction list.
291 
292    We record each active transaction with a TranxNode, each session
293    can have only one open transaction. Because of EVENT, the total
294    active transaction nodes can exceed the maximum allowed
295    connections.
296 */
297 class ActiveTranx
298   :public Trace {
299 private:
300 
301   TranxNodeAllocator allocator_;
302   /* These two record the active transaction list in sort order. */
303   TranxNode       *trx_front_, *trx_rear_;
304 
305   TranxNode      **trx_htb_;        /* A hash table on active transactions. */
306 
307   int              num_entries_;              /* maximum hash table entries */
308   mysql_mutex_t *lock_;                                     /* mutex lock */
309 
310   inline void assert_lock_owner();
311 
312   inline unsigned int calc_hash(const unsigned char *key,unsigned int length);
313   unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
314 
compare(const char * log_file_name1,my_off_t log_file_pos1,const TranxNode * node2)315   int compare(const char *log_file_name1, my_off_t log_file_pos1,
316               const TranxNode *node2) {
317     return compare(log_file_name1, log_file_pos1,
318                    node2->log_name_, node2->log_pos_);
319   }
compare(const TranxNode * node1,const char * log_file_name2,my_off_t log_file_pos2)320   int compare(const TranxNode *node1,
321               const char *log_file_name2, my_off_t log_file_pos2) {
322     return compare(node1->log_name_, node1->log_pos_,
323                    log_file_name2, log_file_pos2);
324   }
compare(const TranxNode * node1,const TranxNode * node2)325   int compare(const TranxNode *node1, const TranxNode *node2) {
326     return compare(node1->log_name_, node1->log_pos_,
327                    node2->log_name_, node2->log_pos_);
328   }
329 
330 public:
331   ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
332   ~ActiveTranx();
333 
334   /* Insert an active transaction node with the specified position.
335    *
336    * Return:
337    *  0: success;  non-zero: error
338    */
339   int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
340 
341   /* Clear the active transaction nodes until(inclusive) the specified
342    * position.
343    * If log_file_name is NULL, everything will be cleared: the sorted
344    * list and the hash table will be reset to empty.
345    *
346    * Return:
347    *  0: success;  non-zero: error
348    */
349   int clear_active_tranx_nodes(const char *log_file_name,
350                                my_off_t    log_file_pos);
351 
352   /* Given a position, check to see whether the position is an active
353    * transaction's ending position by probing the hash table.
354    */
355   bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
356 
357   /* Given two binlog positions, compare which one is bigger based on
358    * (file_name, file_position).
359    */
360   static int compare(const char *log_file_name1, my_off_t log_file_pos1,
361                      const char *log_file_name2, my_off_t log_file_pos2);
362 
363 };
364 
365 /**
366    The extension class for the master of semi-synchronous replication
367 */
368 class ReplSemiSyncMaster
369   :public ReplSemiSyncBase {
370  private:
371   ActiveTranx    *active_tranxs_;  /* active transaction list: the list will
372                                       be cleared when semi-sync switches off. */
373 
374   /* True when initObject has been called */
375   bool init_done_;
376 
377   /* This cond variable is signaled when enough binlog has been sent to slave,
378    * so that a waiting trx can return the 'ok' to the client for a commit.
379    */
380   mysql_cond_t  COND_binlog_send_;
381 
382   /* Mutex that protects the following state variables and the active
383    * transaction list.
384    * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are
385    * already holding LOCK_binlog_ because it can cause deadlocks.
386    */
387   mysql_mutex_t LOCK_binlog_;
388 
389   /* This is set to true when reply_file_name_ contains meaningful data. */
390   bool            reply_file_name_inited_;
391 
392   /* The binlog name up to which we have received replies from any slaves. */
393   char            reply_file_name_[FN_REFLEN];
394 
395   /* The position in that file up to which we have the reply from any slaves. */
396   my_off_t        reply_file_pos_;
397 
398   /* This is set to true when we know the 'smallest' wait position. */
399   bool            wait_file_name_inited_;
400 
401   /* NULL, or the 'smallest' filename that a transaction is waiting for
402    * slave replies.
403    */
404   char            wait_file_name_[FN_REFLEN];
405 
406   /* The smallest position in that file that a trx is waiting for: the trx
407    * can proceed and send an 'ok' to the client when the master has got the
408    * reply from the slave indicating that it already got the binlog events.
409    */
410   my_off_t        wait_file_pos_;
411 
412   /* This is set to true when we know the 'largest' transaction commit
413    * position in the binlog file.
414    * We always maintain the position no matter whether semi-sync is switched
415    * on switched off.  When a transaction wait timeout occurs, semi-sync will
416    * switch off.  Binlog-dump thread can use the three fields to detect when
417    * slaves catch up on replication so that semi-sync can switch on again.
418    */
419   bool            commit_file_name_inited_;
420 
421   /* The 'largest' binlog filename that a commit transaction is seeing.       */
422   char            commit_file_name_[FN_REFLEN];
423 
424   /* The 'largest' position in that file that a commit transaction is seeing. */
425   my_off_t        commit_file_pos_;
426 
427   /* All global variables which can be set by parameters. */
428   volatile bool            master_enabled_;      /* semi-sync is enabled on the master */
429   unsigned long           wait_timeout_;      /* timeout period(ms) during tranx wait */
430 
431   bool            state_;                    /* whether semi-sync is switched */
432 
433   void lock();
434   void unlock();
435   void cond_broadcast();
436   int  cond_timewait(struct timespec *wait_time);
437 
438   /* Is semi-sync replication on? */
is_on()439   bool is_on() {
440     return (state_);
441   }
442 
set_master_enabled(bool enabled)443   void set_master_enabled(bool enabled) {
444     master_enabled_ = enabled;
445   }
446 
447   /* Switch semi-sync off because of timeout in transaction waiting. */
448   int switch_off();
449 
450   /* Switch semi-sync on when slaves catch up. */
451   int try_switch_on(int server_id,
452                     const char *log_file_name, my_off_t log_file_pos);
453 
454  public:
455   ReplSemiSyncMaster();
456   ~ReplSemiSyncMaster();
457 
getMasterEnabled()458   bool getMasterEnabled() {
459     return master_enabled_;
460   }
setTraceLevel(unsigned long trace_level)461   void setTraceLevel(unsigned long trace_level) {
462     trace_level_ = trace_level;
463     if (active_tranxs_)
464       active_tranxs_->trace_level_ = trace_level;
465   }
466 
467   /* Set the transaction wait timeout period, in milliseconds. */
setWaitTimeout(unsigned long wait_timeout)468   void setWaitTimeout(unsigned long wait_timeout) {
469     wait_timeout_ = wait_timeout;
470   }
471 
472   /* Initialize this class after MySQL parameters are initialized. this
473    * function should be called once at bootstrap time.
474    */
475   int initObject();
476 
477   /* Enable the object to enable semi-sync replication inside the master. */
478   int enableMaster();
479 
480   /* Enable the object to enable semi-sync replication inside the master. */
481   int disableMaster();
482 
483   /* Add a semi-sync replication slave */
484   void add_slave();
485 
486   /* Remove a semi-sync replication slave */
487   void remove_slave();
488 
489   /* Is the slave servered by the thread requested semi-sync */
490   bool is_semi_sync_slave();
491 
492   /* In semi-sync replication, reports up to which binlog position we have
493    * received replies from the slave indicating that it already get the events.
494    *
495    * Input:
496    *  server_id     - (IN)  master server id number
497    *  log_file_name - (IN)  binlog file name
498    *  end_offset    - (IN)  the offset in the binlog file up to which we have
499    *                        the replies from the slave
500    *
501    * Return:
502    *  0: success;  non-zero: error
503    */
504   int reportReplyBinlog(uint32 server_id,
505                         const char* log_file_name,
506                         my_off_t end_offset);
507 
508   /* Commit a transaction in the final step.  This function is called from
509    * InnoDB before returning from the low commit.  If semi-sync is switch on,
510    * the function will wait to see whether binlog-dump thread get the reply for
511    * the events of the transaction.  Remember that this is not a direct wait,
512    * instead, it waits to see whether the binlog-dump thread has reached the
513    * point.  If the wait times out, semi-sync status will be switched off and
514    * all other transaction would not wait either.
515    *
516    * Input:  (the transaction events' ending binlog position)
517    *  trx_wait_binlog_name - (IN)  ending position's file name
518    *  trx_wait_binlog_pos  - (IN)  ending position's file offset
519    *
520    * Return:
521    *  0: success;  non-zero: error
522    */
523   int commitTrx(const char* trx_wait_binlog_name,
524                 my_off_t trx_wait_binlog_pos);
525 
526   /* Reserve space in the replication event packet header:
527    *  . slave semi-sync off: 1 byte - (0)
528    *  . slave semi-sync on:  3 byte - (0, 0xef, 0/1}
529    *
530    * Input:
531    *  header   - (IN)  the header buffer
532    *  size     - (IN)  size of the header buffer
533    *
534    * Return:
535    *  size of the bytes reserved for header
536    */
537   int reserveSyncHeader(unsigned char *header, unsigned long size);
538 
539   /* Update the sync bit in the packet header to indicate to the slave whether
540    * the master will wait for the reply of the event.  If semi-sync is switched
541    * off and we detect that the slave is catching up, we switch semi-sync on.
542    *
543    * Input:
544    *  packet        - (IN)  the packet containing the replication event
545    *  log_file_name - (IN)  the event ending position's file name
546    *  log_file_pos  - (IN)  the event ending position's file offset
547    *  server_id     - (IN)  master server id number
548    *
549    * Return:
550    *  0: success;  non-zero: error
551    */
552   int updateSyncHeader(unsigned char *packet,
553                        const char *log_file_name,
554 		       my_off_t log_file_pos,
555 		       uint32 server_id);
556 
557   /* Called when a transaction finished writing binlog events.
558    *  . update the 'largest' transactions' binlog event position
559    *  . insert the ending position in the active transaction list if
560    *    semi-sync is on
561    *
562    * Input:  (the transaction events' ending binlog position)
563    *  log_file_name - (IN)  transaction ending position's file name
564    *  log_file_pos  - (IN)  transaction ending position's file offset
565    *
566    * Return:
567    *  0: success;  non-zero: error
568    */
569   int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
570 
571   /* Read the slave's reply so that we know how much progress the slave makes
572    * on receive replication events.
573    *
574    * Input:
575    *  net          - (IN)  the connection to master
576    *  server_id    - (IN)  master server id number
577    *  event_buf    - (IN)  pointer to the event packet
578    *
579    * Return:
580    *  0: success;  non-zero: error
581    */
582   int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
583 
584   /* Export internal statistics for semi-sync replication. */
585   void setExportStats();
586 
587   /* 'reset master' command is issued from the user and semi-sync need to
588    * go off for that.
589    */
590   int resetMaster();
591 };
592 
/*
  System and status variables for the master component.
  They are only declared here; their definitions are not part of this header.
*/
extern char rpl_semi_sync_master_enabled;
extern char rpl_semi_sync_master_status;
extern unsigned long rpl_semi_sync_master_clients;
extern unsigned long rpl_semi_sync_master_timeout;
extern unsigned long rpl_semi_sync_master_trace_level;
extern unsigned long rpl_semi_sync_master_yes_transactions;
extern unsigned long rpl_semi_sync_master_no_transactions;
extern unsigned long rpl_semi_sync_master_off_times;
extern unsigned long rpl_semi_sync_master_wait_timeouts;
extern unsigned long rpl_semi_sync_master_timefunc_fails;
extern unsigned long rpl_semi_sync_master_num_timeouts;
extern unsigned long rpl_semi_sync_master_wait_sessions;
extern unsigned long rpl_semi_sync_master_wait_pos_backtraverse;
extern unsigned long rpl_semi_sync_master_avg_trx_wait_time;
extern unsigned long rpl_semi_sync_master_avg_net_wait_time;
extern unsigned long long rpl_semi_sync_master_net_wait_num;
extern unsigned long long rpl_semi_sync_master_trx_wait_num;
extern unsigned long long rpl_semi_sync_master_net_wait_time;
extern unsigned long long rpl_semi_sync_master_trx_wait_time;

/*
  This indicates whether we should keep waiting if no semi-sync slave
  is available.
     0           : stop waiting once no available semi-sync slave is detected.
     1 (default) : keep waiting until timeout even if no semi-sync slave is
                   available.
*/
extern char rpl_semi_sync_master_wait_no_slave;
621 
622 #endif /* SEMISYNC_MASTER_H */
623