1 /* Copyright (C) 2007 Google Inc.
2    Copyright (c) 2008, 2013, Oracle and/or its affiliates.
3    Copyright (c) 2011, 2016, MariaDB
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
17 
18 
19 #include <my_global.h>
20 #include "semisync_master.h"
21 
/* Unit-conversion factors used for timespec / millisecond arithmetic. */
#define TIME_THOUSAND 1000
#define TIME_MILLION  1000000
#define TIME_BILLION  1000000000

/* This indicates whether semi-synchronous replication is enabled. */
my_bool rpl_semi_sync_master_enabled= 0;
unsigned long long rpl_semi_sync_master_request_ack = 0;
/* Number of reply packets accepted by report_reply_packet(). */
unsigned long long rpl_semi_sync_master_get_ack = 0;
/*
  When 0, semi-sync is switched off as soon as no semi-sync slave is
  connected (see init_object() and remove_slave()).
*/
my_bool rpl_semi_sync_master_wait_no_slave = 1;
my_bool rpl_semi_sync_master_status        = 0;
/* Configured wait point; copied into the object by init_object(). */
ulong rpl_semi_sync_master_wait_point       =
    SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT;
/* Wait timeout in milliseconds; copied into the object by init_object(). */
ulong rpl_semi_sync_master_timeout;
ulong rpl_semi_sync_master_trace_level;
/*
  Status counters. The ones below that are updated in this section of the
  file are: yes/no_transactions (commit_trx), timefunc_fails,
  wait_timeouts, wait_sessions, wait_pos_backtraverse, trx_wait_num and
  trx_wait_time (all in commit_trx), and clients (add_slave/remove_slave).
  The remaining ones are maintained elsewhere.
*/
ulong rpl_semi_sync_master_yes_transactions = 0;
ulong rpl_semi_sync_master_no_transactions  = 0;
ulong rpl_semi_sync_master_off_times        = 0;
ulong rpl_semi_sync_master_timefunc_fails   = 0;
ulong rpl_semi_sync_master_wait_timeouts     = 0;
ulong rpl_semi_sync_master_wait_sessions    = 0;
ulong rpl_semi_sync_master_wait_pos_backtraverse = 0;
ulong rpl_semi_sync_master_avg_trx_wait_time = 0;
ulonglong rpl_semi_sync_master_trx_wait_num = 0;
ulong rpl_semi_sync_master_avg_net_wait_time    = 0;
ulonglong rpl_semi_sync_master_net_wait_num = 0;
/* Number of connected semi-sync slaves; modified under LOCK_binlog. */
ulong rpl_semi_sync_master_clients          = 0;
ulonglong rpl_semi_sync_master_net_wait_time = 0;
ulonglong rpl_semi_sync_master_trx_wait_time = 0;

/* The process-wide semi-sync master state and its ACK receiver thread. */
Repl_semi_sync_master repl_semisync_master;
Ack_receiver ack_receiver;
53 
54 /*
55   structure to save transaction log filename and position
56 */
57 typedef struct Trans_binlog_info {
58   my_off_t log_pos;
59   char log_file[FN_REFLEN];
60 } Trans_binlog_info;
61 
62 static int get_wait_time(const struct timespec& start_ts);
63 
timespec_to_usec(const struct timespec * ts)64 static ulonglong timespec_to_usec(const struct timespec *ts)
65 {
66   return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
67 }
68 
69 /*******************************************************************************
70  *
71  * <Active_tranx> class : manage all active transaction nodes
72  *
73  ******************************************************************************/
74 
Active_tranx(mysql_mutex_t * lock,ulong trace_level)75 Active_tranx::Active_tranx(mysql_mutex_t *lock,
76                            ulong trace_level)
77   : Trace(trace_level), m_allocator(max_connections),
78     m_num_entries(max_connections << 1), /* Transaction hash table size
79                                          * is set to double the size
80                                          * of max_connections */
81     m_lock(lock)
82 {
83   /* No transactions are in the list initially. */
84   m_trx_front = NULL;
85   m_trx_rear  = NULL;
86 
87   /* Create the hash table to find a transaction's ending event. */
88   m_trx_htb = new Tranx_node *[m_num_entries];
89   for (int idx = 0; idx < m_num_entries; ++idx)
90     m_trx_htb[idx] = NULL;
91 
92   sql_print_information("Semi-sync replication initialized for transactions.");
93 }
94 
~Active_tranx()95 Active_tranx::~Active_tranx()
96 {
97   delete [] m_trx_htb;
98   m_trx_htb          = NULL;
99   m_num_entries      = 0;
100 }
101 
calc_hash(const unsigned char * key,size_t length)102 unsigned int Active_tranx::calc_hash(const unsigned char *key, size_t length)
103 {
104   unsigned int nr = 1, nr2 = 4;
105 
106   /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
107   while (length--)
108   {
109     nr  ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
110     nr2 += 3;
111   }
112   return((unsigned int) nr);
113 }
114 
get_hash_value(const char * log_file_name,my_off_t log_file_pos)115 unsigned int Active_tranx::get_hash_value(const char *log_file_name,
116                                           my_off_t    log_file_pos)
117 {
118   unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
119                                  strlen(log_file_name));
120   unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
121                                  sizeof(log_file_pos));
122 
123   return (hash1 + hash2) % m_num_entries;
124 }
125 
compare(const char * log_file_name1,my_off_t log_file_pos1,const char * log_file_name2,my_off_t log_file_pos2)126 int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
127                           const char *log_file_name2, my_off_t log_file_pos2)
128 {
129   int cmp = strcmp(log_file_name1, log_file_name2);
130 
131   if (cmp != 0)
132     return cmp;
133 
134   if (log_file_pos1 > log_file_pos2)
135     return 1;
136   else if (log_file_pos1 < log_file_pos2)
137     return -1;
138   return 0;
139 }
140 
insert_tranx_node(const char * log_file_name,my_off_t log_file_pos)141 int Active_tranx::insert_tranx_node(const char *log_file_name,
142                                     my_off_t log_file_pos)
143 {
144   Tranx_node  *ins_node;
145   int         result = 0;
146   unsigned int        hash_val;
147 
148   DBUG_ENTER("Active_tranx:insert_tranx_node");
149 
150   ins_node = m_allocator.allocate_node();
151   if (!ins_node)
152   {
153     sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
154                     "Active_tranx:insert_tranx_node",
155                     log_file_name, (ulong)log_file_pos);
156     result = -1;
157     goto l_end;
158   }
159 
160   /* insert the binlog position in the active transaction list. */
161   strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1);
162   ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
163   ins_node->log_pos = log_file_pos;
164 
165   if (!m_trx_front)
166   {
167     /* The list is empty. */
168     m_trx_front = m_trx_rear = ins_node;
169   }
170   else
171   {
172     int cmp = compare(ins_node, m_trx_rear);
173     if (cmp > 0)
174     {
175       /* Compare with the tail first.  If the transaction happens later in
176        * binlog, then make it the new tail.
177        */
178       m_trx_rear->next = ins_node;
179       m_trx_rear        = ins_node;
180     }
181     else
182     {
183       /* Otherwise, it is an error because the transaction should hold the
184        * mysql_bin_log.LOCK_log when appending events.
185        */
186       sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
187                       "new node (%s, %lu)", "Active_tranx:insert_tranx_node",
188                       m_trx_rear->log_name, (ulong)m_trx_rear->log_pos,
189                       ins_node->log_name, (ulong)ins_node->log_pos);
190       result = -1;
191       goto l_end;
192     }
193   }
194 
195   hash_val = get_hash_value(ins_node->log_name, ins_node->log_pos);
196   ins_node->hash_next = m_trx_htb[hash_val];
197   m_trx_htb[hash_val]   = ins_node;
198 
199   DBUG_PRINT("semisync", ("%s: insert (%s, %lu) in entry(%u)",
200                           "Active_tranx:insert_tranx_node",
201                           ins_node->log_name, (ulong)ins_node->log_pos,
202                           hash_val));
203  l_end:
204 
205   DBUG_RETURN(result);
206 }
207 
is_tranx_end_pos(const char * log_file_name,my_off_t log_file_pos)208 bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
209                                     my_off_t    log_file_pos)
210 {
211   DBUG_ENTER("Active_tranx::is_tranx_end_pos");
212 
213   unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
214   Tranx_node *entry = m_trx_htb[hash_val];
215 
216   while (entry != NULL)
217   {
218     if (compare(entry, log_file_name, log_file_pos) == 0)
219       break;
220 
221     entry = entry->hash_next;
222   }
223 
224   DBUG_PRINT("semisync", ("%s: probe (%s, %lu) in entry(%u)",
225                           "Active_tranx::is_tranx_end_pos",
226                           log_file_name, (ulong)log_file_pos, hash_val));
227 
228   DBUG_RETURN(entry != NULL);
229 }
230 
clear_active_tranx_nodes(const char * log_file_name,my_off_t log_file_pos)231 void Active_tranx::clear_active_tranx_nodes(const char *log_file_name,
232                                            my_off_t log_file_pos)
233 {
234   Tranx_node *new_front;
235 
236   DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes");
237 
238   if (log_file_name != NULL)
239   {
240     new_front = m_trx_front;
241 
242     while (new_front)
243     {
244       if (compare(new_front, log_file_name, log_file_pos) > 0)
245         break;
246       new_front = new_front->next;
247     }
248   }
249   else
250   {
251     /* If log_file_name is NULL, clear everything. */
252     new_front = NULL;
253   }
254 
255   if (new_front == NULL)
256   {
257     /* No active transaction nodes after the call. */
258 
259     /* Clear the hash table. */
260     memset(m_trx_htb, 0, m_num_entries * sizeof(Tranx_node *));
261     m_allocator.free_all_nodes();
262 
263     /* Clear the active transaction list. */
264     if (m_trx_front != NULL)
265     {
266       m_trx_front = NULL;
267       m_trx_rear  = NULL;
268     }
269 
270     DBUG_PRINT("semisync", ("%s: cleared all nodes",
271                             "Active_tranx::::clear_active_tranx_nodes"));
272   }
273   else if (new_front != m_trx_front)
274   {
275     Tranx_node *curr_node, *next_node;
276 
277     /* Delete all transaction nodes before the confirmation point. */
278     int n_frees = 0;
279     curr_node = m_trx_front;
280     while (curr_node != new_front)
281     {
282       next_node = curr_node->next;
283       n_frees++;
284 
285       /* Remove the node from the hash table. */
286       unsigned int hash_val = get_hash_value(curr_node->log_name, curr_node->log_pos);
287       Tranx_node **hash_ptr = &(m_trx_htb[hash_val]);
288       while ((*hash_ptr) != NULL)
289       {
290         if ((*hash_ptr) == curr_node)
291 	{
292           (*hash_ptr) = curr_node->hash_next;
293           break;
294         }
295         hash_ptr = &((*hash_ptr)->hash_next);
296       }
297 
298       curr_node = next_node;
299     }
300 
301     m_trx_front = new_front;
302     m_allocator.free_nodes_before(m_trx_front);
303 
304     DBUG_PRINT("semisync", ("%s: cleared %d nodes back until pos (%s, %lu)",
305                             "Active_tranx::::clear_active_tranx_nodes",
306                             n_frees,
307                             m_trx_front->log_name, (ulong)m_trx_front->log_pos));
308   }
309 
310   DBUG_VOID_RETURN;
311 }
312 
313 
314 /*******************************************************************************
315  *
 * <Repl_semi_sync_master> class: the basic code layer for the semi-sync master.
 * <Repl_semi_sync_slave>  class: the basic code layer for the semi-sync slave.
 *
 * The most important functions used during semi-sync replication are:
320  *
321  * Master:
322  *  . report_reply_binlog():  called by the binlog dump thread when it receives
323  *                          the slave's status information.
324  *  . update_sync_header():   based on transaction waiting information, decide
325  *                          whether to request the slave to reply.
326  *  . write_tranx_in_binlog(): called by the transaction thread when it finishes
327  *                          writing all transaction events in binlog.
328  *  . commit_trx():          transaction thread wait for the slave reply.
329  *
330  * Slave:
331  *  . slave_read_sync_header(): read the semi-sync header from the master, get
332  *                              the sync status and get the payload for events.
333  *  . slave_reply():          reply to the master about the replication progress.
334  *
335  ******************************************************************************/
336 
Repl_semi_sync_master()337 Repl_semi_sync_master::Repl_semi_sync_master()
338   : m_active_tranxs(NULL),
339     m_init_done(false),
340     m_reply_file_name_inited(false),
341     m_reply_file_pos(0L),
342     m_wait_file_name_inited(false),
343     m_wait_file_pos(0),
344     m_master_enabled(false),
345     m_wait_timeout(0L),
346     m_state(0),
347     m_wait_point(0)
348 {
349   strcpy(m_reply_file_name, "");
350   strcpy(m_wait_file_name, "");
351 }
352 
init_object()353 int Repl_semi_sync_master::init_object()
354 {
355   int result= 0;
356 
357   m_init_done = true;
358 
359   /* References to the parameter works after set_options(). */
360   set_wait_timeout(rpl_semi_sync_master_timeout);
361   set_trace_level(rpl_semi_sync_master_trace_level);
362   set_wait_point(rpl_semi_sync_master_wait_point);
363 
364   /* Mutex initialization can only be done after MY_INIT(). */
365   mysql_mutex_init(key_LOCK_rpl_semi_sync_master_enabled,
366                    &LOCK_rpl_semi_sync_master_enabled, MY_MUTEX_INIT_FAST);
367   mysql_mutex_init(key_LOCK_binlog,
368                    &LOCK_binlog, MY_MUTEX_INIT_FAST);
369   mysql_cond_init(key_COND_binlog_send,
370                   &COND_binlog_send, NULL);
371 
372   if (rpl_semi_sync_master_enabled)
373   {
374     result = enable_master();
375     if (!result)
376     {
377       result= ack_receiver.start(); /* Start the ACK thread. */
378       /*
379         If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily
380         switch off semisync to avoid hang if there's none active slave.
381       */
382       if (!rpl_semi_sync_master_wait_no_slave)
383         switch_off();
384     }
385   }
386   else
387   {
388     disable_master();
389   }
390 
391   return result;
392 }
393 
enable_master()394 int Repl_semi_sync_master::enable_master()
395 {
396   int result = 0;
397 
398   /* Must have the lock when we do enable of disable. */
399   lock();
400 
401   if (!get_master_enabled())
402   {
403     m_active_tranxs = new Active_tranx(&LOCK_binlog, m_trace_level);
404     if (m_active_tranxs != NULL)
405     {
406       m_commit_file_name_inited = false;
407       m_reply_file_name_inited  = false;
408       m_wait_file_name_inited   = false;
409 
410       set_master_enabled(true);
411       m_state = true;
412       sql_print_information("Semi-sync replication enabled on the master.");
413     }
414     else
415     {
416       sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
417       result = -1;
418     }
419   }
420 
421   unlock();
422 
423   return result;
424 }
425 
disable_master()426 void Repl_semi_sync_master::disable_master()
427 {
428   /* Must have the lock when we do enable of disable. */
429   lock();
430 
431   if (get_master_enabled())
432   {
433     /* Switch off the semi-sync first so that waiting transaction will be
434      * waken up.
435      */
436     switch_off();
437 
438     assert(m_active_tranxs != NULL);
439     delete m_active_tranxs;
440     m_active_tranxs = NULL;
441 
442     m_reply_file_name_inited = false;
443     m_wait_file_name_inited  = false;
444     m_commit_file_name_inited = false;
445 
446     set_master_enabled(false);
447     sql_print_information("Semi-sync replication disabled on the master.");
448   }
449 
450   unlock();
451 }
452 
cleanup()453 void Repl_semi_sync_master::cleanup()
454 {
455   if (m_init_done)
456   {
457     mysql_mutex_destroy(&LOCK_rpl_semi_sync_master_enabled);
458     mysql_mutex_destroy(&LOCK_binlog);
459     mysql_cond_destroy(&COND_binlog_send);
460     m_init_done= 0;
461   }
462 
463   delete m_active_tranxs;
464 }
465 
lock()466 void Repl_semi_sync_master::lock()
467 {
468   mysql_mutex_lock(&LOCK_binlog);
469 }
470 
unlock()471 void Repl_semi_sync_master::unlock()
472 {
473   mysql_mutex_unlock(&LOCK_binlog);
474 }
475 
cond_broadcast()476 void Repl_semi_sync_master::cond_broadcast()
477 {
478   mysql_cond_broadcast(&COND_binlog_send);
479 }
480 
cond_timewait(struct timespec * wait_time)481 int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time)
482 {
483   int wait_res;
484 
485   DBUG_ENTER("Repl_semi_sync_master::cond_timewait()");
486 
487   wait_res= mysql_cond_timedwait(&COND_binlog_send,
488                                  &LOCK_binlog, wait_time);
489 
490   DBUG_RETURN(wait_res);
491 }
492 
add_slave()493 void Repl_semi_sync_master::add_slave()
494 {
495   lock();
496   rpl_semi_sync_master_clients++;
497   unlock();
498 }
499 
remove_slave()500 void Repl_semi_sync_master::remove_slave()
501 {
502   lock();
503   rpl_semi_sync_master_clients--;
504 
505   /* Only switch off if semi-sync is enabled and is on */
506   if (get_master_enabled() && is_on())
507   {
508     /* If user has chosen not to wait if no semi-sync slave available
509        and the last semi-sync slave exits, turn off semi-sync on master
510        immediately.
511      */
512     if (!rpl_semi_sync_master_wait_no_slave &&
513         rpl_semi_sync_master_clients == 0)
514       switch_off();
515   }
516   unlock();
517 }
518 
report_reply_packet(uint32 server_id,const uchar * packet,ulong packet_len)519 int Repl_semi_sync_master::report_reply_packet(uint32 server_id,
520                                                const uchar *packet,
521                                                ulong packet_len)
522 {
523   int result= -1;
524   char log_file_name[FN_REFLEN+1];
525   my_off_t log_file_pos;
526   ulong log_file_len = 0;
527 
528   DBUG_ENTER("Repl_semi_sync_master::report_reply_packet");
529 
530   if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] !=
531                Repl_semi_sync_master::k_packet_magic_num))
532   {
533     sql_print_error("Read semi-sync reply magic number error");
534     goto l_end;
535   }
536 
537   if (unlikely(packet_len < REPLY_BINLOG_NAME_OFFSET))
538   {
539     sql_print_error("Read semi-sync reply length error: packet is too small");
540     goto l_end;
541   }
542 
543   log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
544   log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
545   if (unlikely(log_file_len >= FN_REFLEN))
546   {
547     sql_print_error("Read semi-sync reply binlog file length too large");
548     goto l_end;
549   }
550   strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
551   log_file_name[log_file_len] = 0;
552 
553   DBUG_ASSERT(dirname_length(log_file_name) == 0);
554 
555   DBUG_PRINT("semisync", ("%s: Got reply(%s, %lu) from server %u",
556                           "Repl_semi_sync_master::report_reply_packet",
557                           log_file_name, (ulong)log_file_pos, server_id));
558 
559   rpl_semi_sync_master_get_ack++;
560   report_reply_binlog(server_id, log_file_name, log_file_pos);
561 
562 l_end:
563 
564   DBUG_RETURN(result);
565 }
566 
report_reply_binlog(uint32 server_id,const char * log_file_name,my_off_t log_file_pos)567 int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
568                                                const char *log_file_name,
569                                                my_off_t log_file_pos)
570 {
571   int   cmp;
572   bool  can_release_threads = false;
573   bool  need_copy_send_pos = true;
574 
575   DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog");
576 
577   if (!(get_master_enabled()))
578     DBUG_RETURN(0);
579 
580   lock();
581 
582   /* This is the real check inside the mutex. */
583   if (!get_master_enabled())
584     goto l_end;
585 
586   if (!is_on())
587     /* We check to see whether we can switch semi-sync ON. */
588     try_switch_on(server_id, log_file_name, log_file_pos);
589 
590   /* The position should increase monotonically, if there is only one
591    * thread sending the binlog to the slave.
592    * In reality, to improve the transaction availability, we allow multiple
593    * sync replication slaves.  So, if any one of them get the transaction,
594    * the transaction session in the primary can move forward.
595    */
596   if (m_reply_file_name_inited)
597   {
598     cmp = Active_tranx::compare(log_file_name, log_file_pos,
599                                m_reply_file_name, m_reply_file_pos);
600 
601     /* If the requested position is behind the sending binlog position,
602      * would not adjust sending binlog position.
603      * We based on the assumption that there are multiple semi-sync slave,
604      * and at least one of them shou/ld be up to date.
605      * If all semi-sync slaves are behind, at least initially, the primary
606      * can find the situation after the waiting timeout.  After that, some
607      * slaves should catch up quickly.
608      */
609     if (cmp < 0)
610     {
611       /* If the position is behind, do not copy it. */
612       need_copy_send_pos = false;
613     }
614   }
615 
616   if (need_copy_send_pos)
617   {
618     strmake_buf(m_reply_file_name, log_file_name);
619     m_reply_file_pos = log_file_pos;
620     m_reply_file_name_inited = true;
621 
622     /* Remove all active transaction nodes before this point. */
623     assert(m_active_tranxs != NULL);
624     m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos);
625 
626     DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)",
627                             "Repl_semi_sync_master::report_reply_binlog",
628                             log_file_name, (ulong)log_file_pos));
629   }
630 
631   if (rpl_semi_sync_master_wait_sessions > 0)
632   {
633     /* Let us check if some of the waiting threads doing a trx
634      * commit can now proceed.
635      */
636     cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
637                                 m_wait_file_name, m_wait_file_pos);
638     if (cmp >= 0)
639     {
640       /* Yes, at least one waiting thread can now proceed:
641        * let us release all waiting threads with a broadcast
642        */
643       can_release_threads = true;
644       m_wait_file_name_inited = false;
645     }
646   }
647 
648  l_end:
649   unlock();
650 
651   if (can_release_threads)
652   {
653     DBUG_PRINT("semisync", ("%s: signal all waiting threads.",
654                             "Repl_semi_sync_master::report_reply_binlog"));
655 
656     cond_broadcast();
657   }
658 
659   DBUG_RETURN(0);
660 }
661 
wait_after_sync(const char * log_file,my_off_t log_pos)662 int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos)
663 {
664   if (!get_master_enabled())
665     return 0;
666 
667   int ret= 0;
668   if(log_pos &&
669      wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
670     ret= commit_trx(log_file + dirname_length(log_file), log_pos);
671 
672   return ret;
673 }
674 
wait_after_commit(THD * thd,bool all)675 int Repl_semi_sync_master::wait_after_commit(THD* thd, bool all)
676 {
677   if (!get_master_enabled())
678     return 0;
679 
680   int ret= 0;
681   const char *log_file;
682   my_off_t log_pos;
683 
684   bool is_real_trans=
685     (all || thd->transaction.all.ha_list == 0);
686   /*
687     The coordinates are propagated to this point having been computed
688     in report_binlog_update
689   */
690   Trans_binlog_info *log_info= thd->semisync_info;
691   log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
692   log_pos= log_info ? log_info->log_pos : 0;
693 
694   DBUG_ASSERT(!log_file || dirname_length(log_file) == 0);
695 
696   if (is_real_trans &&
697       log_pos &&
698       wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
699     ret= commit_trx(log_file, log_pos);
700 
701   if (is_real_trans && log_info)
702   {
703     log_info->log_file[0]= 0;
704     log_info->log_pos= 0;
705   }
706 
707   return ret;
708 }
709 
wait_after_rollback(THD * thd,bool all)710 int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all)
711 {
712   return wait_after_commit(thd, all);
713 }
714 
715 /**
716   The method runs after flush to binary log is done.
717 */
report_binlog_update(THD * thd,const char * log_file,my_off_t log_pos)718 int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file,
719                                                 my_off_t log_pos)
720 {
721   if (get_master_enabled())
722   {
723     Trans_binlog_info *log_info;
724 
725     if (!(log_info= thd->semisync_info))
726     {
727       if(!(log_info=
728            (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
729         return 1;
730       thd->semisync_info= log_info;
731     }
732     strcpy(log_info->log_file, log_file + dirname_length(log_file));
733     log_info->log_pos = log_pos;
734 
735     return write_tranx_in_binlog(log_info->log_file, log_pos);
736   }
737 
738   return 0;
739 }
740 
dump_start(THD * thd,const char * log_file,my_off_t log_pos)741 int Repl_semi_sync_master::dump_start(THD* thd,
742                                    const char *log_file,
743                                    my_off_t log_pos)
744 {
745   if (!thd->semi_sync_slave)
746     return 0;
747 
748   if (ack_receiver.add_slave(thd))
749   {
750     sql_print_error("Failed to register slave to semi-sync ACK receiver "
751                     "thread. Turning off semisync");
752     thd->semi_sync_slave= 0;
753     return 1;
754   }
755 
756   add_slave();
757   report_reply_binlog(thd->variables.server_id,
758                       log_file + dirname_length(log_file), log_pos);
759   sql_print_information("Start semi-sync binlog_dump to slave "
760                         "(server_id: %ld), pos(%s, %lu)",
761                         (long) thd->variables.server_id, log_file,
762                         (ulong) log_pos);
763 
764   return 0;
765 }
766 
dump_end(THD * thd)767 void Repl_semi_sync_master::dump_end(THD* thd)
768 {
769   if (!thd->semi_sync_slave)
770     return;
771 
772   sql_print_information("Stop semi-sync binlog_dump to slave (server_id: %ld)",
773                         (long) thd->variables.server_id);
774 
775   remove_slave();
776   ack_receiver.remove_slave(thd);
777 
778   return;
779 }
780 
/*
  Block the committing thread until a slave has acknowledged the binlog up
  to (trx_wait_binlog_name, trx_wait_binlog_pos). The wait also ends when
  semi-sync is switched off, when the thread is killed, or when
  m_wait_timeout milliseconds have passed since entry; a timeout switches
  semi-sync off.

  Updates the yes/no transaction counters, the wait statistics and the
  shared minimum wait position (m_wait_file_name/pos) that
  report_reply_binlog() compares replies against.

  @return always 0.
*/
int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
                                      my_off_t trx_wait_binlog_pos)
{
  DBUG_ENTER("Repl_semi_sync_master::commit_trx");

  if (get_master_enabled() && trx_wait_binlog_name)
  {
    struct timespec start_ts;
    struct timespec abstime;
    int wait_result;
    PSI_stage_info old_stage;
    THD *thd= current_thd;

    /* Start of the wait; the deadline below is computed from this. */
    set_timespec(start_ts, 0);

    DEBUG_SYNC(thd, "rpl_semisync_master_commit_trx_before_lock");
    /* Acquire the mutex. */
    lock();

    /* This must be called after the lock has been acquired. */
    THD_ENTER_COND(thd, &COND_binlog_send, &LOCK_binlog,
                   & stage_waiting_for_semi_sync_ack_from_slave,
                   & old_stage);

    /* This is the real check inside the mutex. */
    if (!get_master_enabled() || !is_on())
      goto l_end;

    DBUG_PRINT("semisync", ("%s: wait pos (%s, %lu), repl(%d)",
                            "Repl_semi_sync_master::commit_trx",
                            trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
                            (int)is_on()));

    /* Re-checked after every wakeup: semi-sync may have been switched off. */
    while (is_on() && !thd_killed(thd))
    {
      if (m_reply_file_name_inited)
      {
        int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
                                        trx_wait_binlog_name,
                                        trx_wait_binlog_pos);
        if (cmp >= 0)
        {
          /* We have already sent the relevant binlog to the slave: no need to
           * wait here.
           */
          DBUG_PRINT("semisync", ("%s: Binlog reply is ahead (%s, %lu),",
                                  "Repl_semi_sync_master::commit_trx",
                                  m_reply_file_name,
                                  (ulong)m_reply_file_pos));
          break;
        }
      }

      /* Let us update the info about the minimum binlog position of waiting
       * threads.
       */
      if (m_wait_file_name_inited)
      {
        int cmp = Active_tranx::compare(trx_wait_binlog_name,
                                        trx_wait_binlog_pos,
                                        m_wait_file_name, m_wait_file_pos);
        /* Note: an equal position also counts as a "backtraverse". */
        if (cmp <= 0)
	{
          /* This thd has a lower position, let's update the minimum info. */
          strmake_buf(m_wait_file_name, trx_wait_binlog_name);
          m_wait_file_pos = trx_wait_binlog_pos;

          rpl_semi_sync_master_wait_pos_backtraverse++;
          DBUG_PRINT("semisync", ("%s: move back wait position (%s, %lu),",
                                  "Repl_semi_sync_master::commit_trx",
                                  m_wait_file_name, (ulong)m_wait_file_pos));
        }
      }
      else
      {
        strmake_buf(m_wait_file_name, trx_wait_binlog_name);
        m_wait_file_pos = trx_wait_binlog_pos;
        m_wait_file_name_inited = true;

        DBUG_PRINT("semisync", ("%s: init wait position (%s, %lu),",
                                "Repl_semi_sync_master::commit_trx",
                                m_wait_file_name, (ulong)m_wait_file_pos));
      }

      /*
        Calculate the absolute deadline: start_ts + m_wait_timeout
        (milliseconds), with nanosecond overflow carried into tv_sec.
        Because it is anchored at start_ts, waking up and waiting again
        does not extend the total wait.
      */
      long diff_secs = (long) (m_wait_timeout / TIME_THOUSAND);
      long diff_nsecs = (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION);
      long nsecs = start_ts.tv_nsec + diff_nsecs;
      abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION;
      abstime.tv_nsec = nsecs % TIME_BILLION;

      /* In semi-synchronous replication, we wait until the binlog-dump
       * thread has received the reply on the relevant binlog segment from the
       * replication slave.
       *
       * Let us suspend this thread to wait on the condition;
       * when replication has progressed far enough, we will release
       * these waiting threads.
       */
      rpl_semi_sync_master_wait_sessions++;

      DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)",
                              "Repl_semi_sync_master::commit_trx",
                              m_wait_timeout,
                              m_wait_file_name, (ulong)m_wait_file_pos));

      wait_result = cond_timewait(&abstime);
      rpl_semi_sync_master_wait_sessions--;

      /* Any non-zero result from the timed wait is treated as a timeout. */
      if (wait_result != 0)
      {
        /* This is a real wait timeout. */
        sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
                          "semi-sync up to file %s, position %lu.",
                          trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
                          m_reply_file_name, (ulong)m_reply_file_pos);
        rpl_semi_sync_master_wait_timeouts++;

        /* switch semi-sync off */
        switch_off();
      }
      else
      {
        int wait_time;

        /*
          Elapsed time since start_ts, as returned by get_wait_time(). It is
          recorded after every successful wakeup, so a transaction that
          wakes up more than once contributes more than one sample.
        */
        wait_time = get_wait_time(start_ts);
        if (wait_time < 0)
        {
          DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at "
                                  "wait position (%s, %lu)",
                                  trx_wait_binlog_name,
                                  (ulong)trx_wait_binlog_pos));
          rpl_semi_sync_master_timefunc_fails++;
        }
        else
        {
          rpl_semi_sync_master_trx_wait_num++;
          rpl_semi_sync_master_trx_wait_time += wait_time;
        }
      }
    }

    /*
      At this point, the binlog file and position of this transaction
      must have been removed from Active_tranx.
      m_active_tranxs may be NULL if someone disabled semi sync during
      cond_timewait()
    */
    assert(thd_killed(thd) || !m_active_tranxs ||
           !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
                                             trx_wait_binlog_pos));

  l_end:
    /* Update the status counter. */
    if (is_on())
      rpl_semi_sync_master_yes_transactions++;
    else
      rpl_semi_sync_master_no_transactions++;

    /* The lock held will be released by thd_exit_cond, so no need to
       call unlock() here */
    THD_EXIT_COND(thd, &old_stage);
  }

  DBUG_RETURN(0);
}
947 
948 /* Indicate that semi-sync replication is OFF now.
949  *
950  * What should we do when it is disabled?  The problem is that we want
951  * the semi-sync replication enabled again when the slave catches up
952  * later.  But, it is not that easy to detect that the slave has caught
953  * up.  This is caused by the fact that MySQL's replication protocol is
954  * asynchronous, meaning that if the master does not use the semi-sync
955  * protocol, the slave would not send anything to the master.
956  * Still, if the master is sending (N+1)-th event, we assume that it is
957  * an indicator that the slave has received N-th event and earlier ones.
958  *
959  * If semi-sync is disabled, all transactions still update the wait
960  * position with the last position in binlog.  But no transactions will
961  * wait for confirmations and the active transaction list would not be
962  * maintained.  In binlog dump thread, update_sync_header() checks whether
963  * the current sending event catches up with last wait position.  If it
964  * does match, semi-sync will be switched on again.
965  */
switch_off()966 void Repl_semi_sync_master::switch_off()
967 {
968   DBUG_ENTER("Repl_semi_sync_master::switch_off");
969 
970   m_state = false;
971 
972   /* Clear the active transaction list. */
973   assert(m_active_tranxs != NULL);
974   m_active_tranxs->clear_active_tranx_nodes(NULL, 0);
975 
976   rpl_semi_sync_master_off_times++;
977   m_wait_file_name_inited   = false;
978   m_reply_file_name_inited  = false;
979   sql_print_information("Semi-sync replication switched OFF.");
980   cond_broadcast();                            /* wake up all waiting threads */
981 
982   DBUG_VOID_RETURN;
983 }
984 
try_switch_on(int server_id,const char * log_file_name,my_off_t log_file_pos)985 int Repl_semi_sync_master::try_switch_on(int server_id,
986                                          const char *log_file_name,
987                                          my_off_t log_file_pos)
988 {
989   bool semi_sync_on = false;
990 
991   DBUG_ENTER("Repl_semi_sync_master::try_switch_on");
992 
993   /* If the current sending event's position is larger than or equal to the
994    * 'largest' commit transaction binlog position, the slave is already
995    * catching up now and we can switch semi-sync on here.
996    * If m_commit_file_name_inited indicates there are no recent transactions,
997    * we can enable semi-sync immediately.
998    */
999   if (m_commit_file_name_inited)
1000   {
1001     int cmp = Active_tranx::compare(log_file_name, log_file_pos,
1002                                    m_commit_file_name, m_commit_file_pos);
1003     semi_sync_on = (cmp >= 0);
1004   }
1005   else
1006   {
1007     semi_sync_on = true;
1008   }
1009 
1010   if (semi_sync_on)
1011   {
1012     /* Switch semi-sync replication on. */
1013     m_state = true;
1014 
1015     sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
1016                           "at (%s, %lu)",
1017                           server_id, log_file_name,
1018                           (ulong)log_file_pos);
1019   }
1020 
1021   DBUG_RETURN(0);
1022 }
1023 
reserve_sync_header(String * packet)1024 int Repl_semi_sync_master::reserve_sync_header(String* packet)
1025 {
1026   DBUG_ENTER("Repl_semi_sync_master::reserve_sync_header");
1027 
1028   /* Set the magic number and the sync status.  By default, no sync
1029    * is required.
1030    */
1031   packet->append(reinterpret_cast<const char*>(k_sync_header),
1032                  sizeof(k_sync_header));
1033   DBUG_RETURN(0);
1034 }
1035 
update_sync_header(THD * thd,unsigned char * packet,const char * log_file_name,my_off_t log_file_pos,bool * need_sync)1036 int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
1037                                               const char *log_file_name,
1038                                               my_off_t log_file_pos,
1039                                               bool* need_sync)
1040 {
1041   int  cmp = 0;
1042   bool sync = false;
1043 
1044   DBUG_ENTER("Repl_semi_sync_master::update_sync_header");
1045 
1046   /* If the semi-sync master is not enabled, or the slave is not a semi-sync
1047    * target, do not request replies from the slave.
1048    */
1049   if (!get_master_enabled() || !thd->semi_sync_slave)
1050   {
1051     *need_sync = false;
1052     DBUG_RETURN(0);
1053   }
1054 
1055   lock();
1056 
1057   /* This is the real check inside the mutex. */
1058   if (!get_master_enabled())
1059   {
1060     assert(sync == false);
1061     goto l_end;
1062   }
1063 
1064   if (is_on())
1065   {
1066     /* semi-sync is ON */
1067     sync = false;     /* No sync unless a transaction is involved. */
1068 
1069     if (m_reply_file_name_inited)
1070     {
1071       cmp = Active_tranx::compare(log_file_name, log_file_pos,
1072                                  m_reply_file_name, m_reply_file_pos);
1073       if (cmp <= 0)
1074       {
1075         /* If we have already got the reply for the event, then we do
1076          * not need to sync the transaction again.
1077          */
1078         goto l_end;
1079       }
1080     }
1081 
1082     if (m_wait_file_name_inited)
1083     {
1084       cmp = Active_tranx::compare(log_file_name, log_file_pos,
1085                                  m_wait_file_name, m_wait_file_pos);
1086     }
1087     else
1088     {
1089       cmp = 1;
1090     }
1091 
1092     /* If we are already waiting for some transaction replies which
1093      * are later in binlog, do not wait for this one event.
1094      */
1095     if (cmp >= 0)
1096     {
1097       /*
1098        * We only wait if the event is a transaction's ending event.
1099        */
1100       assert(m_active_tranxs != NULL);
1101       sync = m_active_tranxs->is_tranx_end_pos(log_file_name,
1102                                                log_file_pos);
1103     }
1104   }
1105   else
1106   {
1107     if (m_commit_file_name_inited)
1108     {
1109       int cmp = Active_tranx::compare(log_file_name, log_file_pos,
1110                                      m_commit_file_name, m_commit_file_pos);
1111       sync = (cmp >= 0);
1112     }
1113     else
1114     {
1115       sync = true;
1116     }
1117   }
1118 
1119   DBUG_PRINT("semisync", ("%s: server(%lu), (%s, %lu) sync(%d), repl(%d)",
1120                           "Repl_semi_sync_master::update_sync_header",
1121                           thd->variables.server_id, log_file_name,
1122                           (ulong)log_file_pos, sync, (int)is_on()));
1123   *need_sync= sync;
1124 
1125  l_end:
1126   unlock();
1127 
1128   /* We do not need to clear sync flag because we set it to 0 when we
1129    * reserve the packet header.
1130    */
1131   if (sync)
1132   {
1133     (packet)[2] = k_packet_flag_sync;
1134   }
1135 
1136   DBUG_RETURN(0);
1137 }
1138 
write_tranx_in_binlog(const char * log_file_name,my_off_t log_file_pos)1139 int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
1140                                                  my_off_t log_file_pos)
1141 {
1142   int result = 0;
1143 
1144   DBUG_ENTER("Repl_semi_sync_master::write_tranx_in_binlog");
1145 
1146   lock();
1147 
1148   /* This is the real check inside the mutex. */
1149   if (!get_master_enabled())
1150     goto l_end;
1151 
1152   /* Update the 'largest' transaction commit position seen so far even
1153    * though semi-sync is switched off.
1154    * It is much better that we update m_commit_file* here, instead of
1155    * inside commit_trx().  This is mostly because update_sync_header()
1156    * will watch for m_commit_file* to decide whether to switch semi-sync
1157    * on. The detailed reason is explained in function update_sync_header().
1158    */
1159   if (m_commit_file_name_inited)
1160   {
1161     int cmp = Active_tranx::compare(log_file_name, log_file_pos,
1162                                     m_commit_file_name, m_commit_file_pos);
1163     if (cmp > 0)
1164     {
1165       /* This is a larger position, let's update the maximum info. */
1166       strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1);
1167       m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
1168       m_commit_file_pos = log_file_pos;
1169     }
1170   }
1171   else
1172   {
1173     strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1);
1174     m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
1175     m_commit_file_pos = log_file_pos;
1176     m_commit_file_name_inited = true;
1177   }
1178 
1179   if (is_on())
1180   {
1181     assert(m_active_tranxs != NULL);
1182     if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos))
1183     {
1184       /*
1185         if insert tranx_node failed, print a warning message
1186         and turn off semi-sync
1187       */
1188       sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
1189                         log_file_name, (ulong)log_file_pos);
1190       switch_off();
1191     }
1192     else
1193     {
1194       rpl_semi_sync_master_request_ack++;
1195     }
1196   }
1197 
1198  l_end:
1199   unlock();
1200 
1201   DBUG_RETURN(result);
1202 }
1203 
flush_net(THD * thd,const char * event_buf)1204 int Repl_semi_sync_master::flush_net(THD *thd,
1205                                      const char *event_buf)
1206 {
1207   int      result = -1;
1208   NET* net= &thd->net;
1209 
1210   DBUG_ENTER("Repl_semi_sync_master::flush_net");
1211 
1212   assert((unsigned char)event_buf[1] == k_packet_magic_num);
1213   if ((unsigned char)event_buf[2] != k_packet_flag_sync)
1214   {
1215     /* current event does not require reply */
1216     result = 0;
1217     goto l_end;
1218   }
1219 
1220   /* We flush to make sure that the current event is sent to the network,
1221    * instead of being buffered in the TCP/IP stack.
1222    */
1223   if (net_flush(net))
1224   {
1225     sql_print_error("Semi-sync master failed on net_flush() "
1226                     "before waiting for slave reply");
1227     goto l_end;
1228   }
1229 
1230   net_clear(net, 0);
1231   net->pkt_nr++;
1232   result = 0;
1233   rpl_semi_sync_master_net_wait_num++;
1234 
1235  l_end:
1236   thd->clear_error();
1237 
1238   DBUG_RETURN(result);
1239 }
1240 
after_reset_master()1241 int Repl_semi_sync_master::after_reset_master()
1242 {
1243   int result = 0;
1244 
1245   DBUG_ENTER("Repl_semi_sync_master::after_reset_master");
1246 
1247   if (rpl_semi_sync_master_enabled)
1248   {
1249     sql_print_information("Enable Semi-sync Master after reset master");
1250     enable_master();
1251   }
1252 
1253   lock();
1254 
1255   if (rpl_semi_sync_master_clients == 0 &&
1256       !rpl_semi_sync_master_wait_no_slave)
1257     m_state = 0;
1258   else
1259     m_state = get_master_enabled()? 1 : 0;
1260 
1261   m_wait_file_name_inited   = false;
1262   m_reply_file_name_inited  = false;
1263   m_commit_file_name_inited = false;
1264 
1265   rpl_semi_sync_master_yes_transactions = 0;
1266   rpl_semi_sync_master_no_transactions = 0;
1267   rpl_semi_sync_master_off_times = 0;
1268   rpl_semi_sync_master_timefunc_fails = 0;
1269   rpl_semi_sync_master_wait_sessions = 0;
1270   rpl_semi_sync_master_wait_pos_backtraverse = 0;
1271   rpl_semi_sync_master_trx_wait_num = 0;
1272   rpl_semi_sync_master_trx_wait_time = 0;
1273   rpl_semi_sync_master_net_wait_num = 0;
1274   rpl_semi_sync_master_net_wait_time = 0;
1275 
1276   unlock();
1277 
1278   DBUG_RETURN(result);
1279 }
1280 
before_reset_master()1281 int Repl_semi_sync_master::before_reset_master()
1282 {
1283   int result = 0;
1284 
1285   DBUG_ENTER("Repl_semi_sync_master::before_reset_master");
1286 
1287   if (rpl_semi_sync_master_enabled)
1288     disable_master();
1289 
1290   DBUG_RETURN(result);
1291 }
1292 
check_and_switch()1293 void Repl_semi_sync_master::check_and_switch()
1294 {
1295   lock();
1296   if (get_master_enabled() && is_on())
1297   {
1298     if (!rpl_semi_sync_master_wait_no_slave
1299          && rpl_semi_sync_master_clients == 0)
1300       switch_off();
1301   }
1302   unlock();
1303 }
1304 
set_export_stats()1305 void Repl_semi_sync_master::set_export_stats()
1306 {
1307   lock();
1308 
1309   rpl_semi_sync_master_status           = m_state;
1310   rpl_semi_sync_master_avg_trx_wait_time=
1311     ((rpl_semi_sync_master_trx_wait_num) ?
1312      (ulong)((double)rpl_semi_sync_master_trx_wait_time /
1313                      ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
1314   rpl_semi_sync_master_avg_net_wait_time=
1315     ((rpl_semi_sync_master_net_wait_num) ?
1316      (ulong)((double)rpl_semi_sync_master_net_wait_time /
1317                      ((double)rpl_semi_sync_master_net_wait_num)) : 0);
1318 
1319   unlock();
1320 }
1321 
1322 /* Get the waiting time given the wait's staring time.
1323  *
1324  * Return:
1325  *  >= 0: the waiting time in microsecons(us)
1326  *   < 0: error in get time or time back traverse
1327  */
get_wait_time(const struct timespec & start_ts)1328 static int get_wait_time(const struct timespec& start_ts)
1329 {
1330   ulonglong start_usecs, end_usecs;
1331   struct timespec end_ts;
1332 
1333   /* Starting time in microseconds(us). */
1334   start_usecs = timespec_to_usec(&start_ts);
1335 
1336   /* Get the wait time interval. */
1337   set_timespec(end_ts, 0);
1338 
1339   /* Ending time in microseconds(us). */
1340   end_usecs = timespec_to_usec(&end_ts);
1341 
1342   if (end_usecs < start_usecs)
1343     return -1;
1344 
1345   return (int)(end_usecs - start_usecs);
1346 }
1347 
semi_sync_master_deinit()1348 void semi_sync_master_deinit()
1349 {
1350   repl_semisync_master.cleanup();
1351   ack_receiver.cleanup();
1352 }
1353