1 /* Copyright (C) 2007 Google Inc.
2 Copyright (c) 2008, 2013, Oracle and/or its affiliates.
3 Copyright (c) 2011, 2016, MariaDB
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17
18
#include <my_global.h>
#include "semisync_master.h"
#include <new>                  /* std::nothrow */
21
/* Conversion factors used for the s/ms/us/ns arithmetic in this file. */
#define TIME_THOUSAND 1000
#define TIME_MILLION 1000000
#define TIME_BILLION 1000000000

/* This indicates whether semi-synchronous replication is enabled. */
my_bool rpl_semi_sync_master_enabled= 0;
unsigned long long rpl_semi_sync_master_request_ack = 0;
/* Incremented for every reply packet accepted by report_reply_packet(). */
unsigned long long rpl_semi_sync_master_get_ack = 0;
/*
  When 0, semi-sync is switched off once no semi-sync slave is connected
  (see init_object() and remove_slave()).
*/
my_bool rpl_semi_sync_master_wait_no_slave = 1;
my_bool rpl_semi_sync_master_status = 0;
/*
  Selects where transactions wait for the ack: after binlog sync
  (wait_after_sync()) or after storage commit (wait_after_commit()).
  Copied into the master object by init_object() via set_wait_point().
*/
ulong rpl_semi_sync_master_wait_point =
  SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT;
/* Wait timeout (ms) and trace level; copied by init_object(). */
ulong rpl_semi_sync_master_timeout;
ulong rpl_semi_sync_master_trace_level;
/*
  Counters maintained by the master code below (presumably exported as
  status variables): transactions finished with semi-sync on / off, number
  of switch-offs, failed wait-time measurements, wait timeouts, sessions
  currently waiting for an ack, and how often the wait position moved back.
*/
ulong rpl_semi_sync_master_yes_transactions = 0;
ulong rpl_semi_sync_master_no_transactions = 0;
ulong rpl_semi_sync_master_off_times = 0;
ulong rpl_semi_sync_master_timefunc_fails = 0;
ulong rpl_semi_sync_master_wait_timeouts = 0;
ulong rpl_semi_sync_master_wait_sessions = 0;
ulong rpl_semi_sync_master_wait_pos_backtraverse = 0;
/* Wait-time statistics; the trx_* totals are accumulated in commit_trx(). */
ulong rpl_semi_sync_master_avg_trx_wait_time = 0;
ulonglong rpl_semi_sync_master_trx_wait_num = 0;
ulong rpl_semi_sync_master_avg_net_wait_time = 0;
ulonglong rpl_semi_sync_master_net_wait_num = 0;
/* Number of connected semi-sync slaves (see add_slave()/remove_slave()). */
ulong rpl_semi_sync_master_clients = 0;
ulonglong rpl_semi_sync_master_net_wait_time = 0;
ulonglong rpl_semi_sync_master_trx_wait_time = 0;

/* The semi-sync master instance and the object driving its ACK thread. */
Repl_semi_sync_master repl_semisync_master;
Ack_receiver ack_receiver;

/*
  Per-session binlog coordinates of the last transaction written to the
  binlog. Filled in by report_binlog_update() and consumed by
  wait_after_commit() through thd->semisync_info.
*/
typedef struct Trans_binlog_info {
  my_off_t log_pos;
  char log_file[FN_REFLEN];   /* base name only, no directory part */
} Trans_binlog_info;

/* Elapsed wait time since start_ts; a negative result means failure. */
static int get_wait_time(const struct timespec& start_ts);
63
timespec_to_usec(const struct timespec * ts)64 static ulonglong timespec_to_usec(const struct timespec *ts)
65 {
66 return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
67 }
68
69 /*******************************************************************************
70 *
71 * <Active_tranx> class : manage all active transaction nodes
72 *
73 ******************************************************************************/
74
Active_tranx(mysql_mutex_t * lock,ulong trace_level)75 Active_tranx::Active_tranx(mysql_mutex_t *lock,
76 ulong trace_level)
77 : Trace(trace_level), m_allocator(max_connections),
78 m_num_entries(max_connections << 1), /* Transaction hash table size
79 * is set to double the size
80 * of max_connections */
81 m_lock(lock)
82 {
83 /* No transactions are in the list initially. */
84 m_trx_front = NULL;
85 m_trx_rear = NULL;
86
87 /* Create the hash table to find a transaction's ending event. */
88 m_trx_htb = new Tranx_node *[m_num_entries];
89 for (int idx = 0; idx < m_num_entries; ++idx)
90 m_trx_htb[idx] = NULL;
91
92 sql_print_information("Semi-sync replication initialized for transactions.");
93 }
94
~Active_tranx()95 Active_tranx::~Active_tranx()
96 {
97 delete [] m_trx_htb;
98 m_trx_htb = NULL;
99 m_num_entries = 0;
100 }
101
calc_hash(const unsigned char * key,size_t length)102 unsigned int Active_tranx::calc_hash(const unsigned char *key, size_t length)
103 {
104 unsigned int nr = 1, nr2 = 4;
105
106 /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
107 while (length--)
108 {
109 nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
110 nr2 += 3;
111 }
112 return((unsigned int) nr);
113 }
114
get_hash_value(const char * log_file_name,my_off_t log_file_pos)115 unsigned int Active_tranx::get_hash_value(const char *log_file_name,
116 my_off_t log_file_pos)
117 {
118 unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
119 strlen(log_file_name));
120 unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
121 sizeof(log_file_pos));
122
123 return (hash1 + hash2) % m_num_entries;
124 }
125
compare(const char * log_file_name1,my_off_t log_file_pos1,const char * log_file_name2,my_off_t log_file_pos2)126 int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
127 const char *log_file_name2, my_off_t log_file_pos2)
128 {
129 int cmp = strcmp(log_file_name1, log_file_name2);
130
131 if (cmp != 0)
132 return cmp;
133
134 if (log_file_pos1 > log_file_pos2)
135 return 1;
136 else if (log_file_pos1 < log_file_pos2)
137 return -1;
138 return 0;
139 }
140
insert_tranx_node(const char * log_file_name,my_off_t log_file_pos)141 int Active_tranx::insert_tranx_node(const char *log_file_name,
142 my_off_t log_file_pos)
143 {
144 Tranx_node *ins_node;
145 int result = 0;
146 unsigned int hash_val;
147
148 DBUG_ENTER("Active_tranx:insert_tranx_node");
149
150 ins_node = m_allocator.allocate_node();
151 if (!ins_node)
152 {
153 sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
154 "Active_tranx:insert_tranx_node",
155 log_file_name, (ulong)log_file_pos);
156 result = -1;
157 goto l_end;
158 }
159
160 /* insert the binlog position in the active transaction list. */
161 strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1);
162 ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
163 ins_node->log_pos = log_file_pos;
164
165 if (!m_trx_front)
166 {
167 /* The list is empty. */
168 m_trx_front = m_trx_rear = ins_node;
169 }
170 else
171 {
172 int cmp = compare(ins_node, m_trx_rear);
173 if (cmp > 0)
174 {
175 /* Compare with the tail first. If the transaction happens later in
176 * binlog, then make it the new tail.
177 */
178 m_trx_rear->next = ins_node;
179 m_trx_rear = ins_node;
180 }
181 else
182 {
183 /* Otherwise, it is an error because the transaction should hold the
184 * mysql_bin_log.LOCK_log when appending events.
185 */
186 sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
187 "new node (%s, %lu)", "Active_tranx:insert_tranx_node",
188 m_trx_rear->log_name, (ulong)m_trx_rear->log_pos,
189 ins_node->log_name, (ulong)ins_node->log_pos);
190 result = -1;
191 goto l_end;
192 }
193 }
194
195 hash_val = get_hash_value(ins_node->log_name, ins_node->log_pos);
196 ins_node->hash_next = m_trx_htb[hash_val];
197 m_trx_htb[hash_val] = ins_node;
198
199 DBUG_PRINT("semisync", ("%s: insert (%s, %lu) in entry(%u)",
200 "Active_tranx:insert_tranx_node",
201 ins_node->log_name, (ulong)ins_node->log_pos,
202 hash_val));
203 l_end:
204
205 DBUG_RETURN(result);
206 }
207
is_tranx_end_pos(const char * log_file_name,my_off_t log_file_pos)208 bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
209 my_off_t log_file_pos)
210 {
211 DBUG_ENTER("Active_tranx::is_tranx_end_pos");
212
213 unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
214 Tranx_node *entry = m_trx_htb[hash_val];
215
216 while (entry != NULL)
217 {
218 if (compare(entry, log_file_name, log_file_pos) == 0)
219 break;
220
221 entry = entry->hash_next;
222 }
223
224 DBUG_PRINT("semisync", ("%s: probe (%s, %lu) in entry(%u)",
225 "Active_tranx::is_tranx_end_pos",
226 log_file_name, (ulong)log_file_pos, hash_val));
227
228 DBUG_RETURN(entry != NULL);
229 }
230
clear_active_tranx_nodes(const char * log_file_name,my_off_t log_file_pos)231 void Active_tranx::clear_active_tranx_nodes(const char *log_file_name,
232 my_off_t log_file_pos)
233 {
234 Tranx_node *new_front;
235
236 DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes");
237
238 if (log_file_name != NULL)
239 {
240 new_front = m_trx_front;
241
242 while (new_front)
243 {
244 if (compare(new_front, log_file_name, log_file_pos) > 0)
245 break;
246 new_front = new_front->next;
247 }
248 }
249 else
250 {
251 /* If log_file_name is NULL, clear everything. */
252 new_front = NULL;
253 }
254
255 if (new_front == NULL)
256 {
257 /* No active transaction nodes after the call. */
258
259 /* Clear the hash table. */
260 memset(m_trx_htb, 0, m_num_entries * sizeof(Tranx_node *));
261 m_allocator.free_all_nodes();
262
263 /* Clear the active transaction list. */
264 if (m_trx_front != NULL)
265 {
266 m_trx_front = NULL;
267 m_trx_rear = NULL;
268 }
269
270 DBUG_PRINT("semisync", ("%s: cleared all nodes",
271 "Active_tranx::::clear_active_tranx_nodes"));
272 }
273 else if (new_front != m_trx_front)
274 {
275 Tranx_node *curr_node, *next_node;
276
277 /* Delete all transaction nodes before the confirmation point. */
278 int n_frees = 0;
279 curr_node = m_trx_front;
280 while (curr_node != new_front)
281 {
282 next_node = curr_node->next;
283 n_frees++;
284
285 /* Remove the node from the hash table. */
286 unsigned int hash_val = get_hash_value(curr_node->log_name, curr_node->log_pos);
287 Tranx_node **hash_ptr = &(m_trx_htb[hash_val]);
288 while ((*hash_ptr) != NULL)
289 {
290 if ((*hash_ptr) == curr_node)
291 {
292 (*hash_ptr) = curr_node->hash_next;
293 break;
294 }
295 hash_ptr = &((*hash_ptr)->hash_next);
296 }
297
298 curr_node = next_node;
299 }
300
301 m_trx_front = new_front;
302 m_allocator.free_nodes_before(m_trx_front);
303
304 DBUG_PRINT("semisync", ("%s: cleared %d nodes back until pos (%s, %lu)",
305 "Active_tranx::::clear_active_tranx_nodes",
306 n_frees,
307 m_trx_front->log_name, (ulong)m_trx_front->log_pos));
308 }
309
310 DBUG_VOID_RETURN;
311 }
312
313
/*******************************************************************************
 *
 * <Repl_semi_sync_master> class: the basic code layer for the semi-sync master.
 * <Repl_semi_sync_slave> class: the basic code layer for the semi-sync slave.
 *
 * The most important functions during semi-sync replication are:
 *
 * Master:
 *  . report_reply_binlog(): called with the binlog position a slave has
 *    acknowledged (e.g. from report_reply_packet() and dump_start()).
 *  . update_sync_header(): based on transaction waiting information, decides
 *    whether to request the slave to reply.
 *  . write_tranx_in_binlog(): called by the transaction thread when it has
 *    finished writing all transaction events to the binlog.
 *  . commit_trx(): the transaction thread waits for the slave reply.
 *
 * Slave:
 *  . slave_read_sync_header(): reads the semi-sync header from the master,
 *    gets the sync status and gets the payload for events.
 *  . slave_reply(): replies to the master about the replication progress.
 *
 ******************************************************************************/
336
Repl_semi_sync_master()337 Repl_semi_sync_master::Repl_semi_sync_master()
338 : m_active_tranxs(NULL),
339 m_init_done(false),
340 m_reply_file_name_inited(false),
341 m_reply_file_pos(0L),
342 m_wait_file_name_inited(false),
343 m_wait_file_pos(0),
344 m_master_enabled(false),
345 m_wait_timeout(0L),
346 m_state(0),
347 m_wait_point(0)
348 {
349 strcpy(m_reply_file_name, "");
350 strcpy(m_wait_file_name, "");
351 }
352
init_object()353 int Repl_semi_sync_master::init_object()
354 {
355 int result= 0;
356
357 m_init_done = true;
358
359 /* References to the parameter works after set_options(). */
360 set_wait_timeout(rpl_semi_sync_master_timeout);
361 set_trace_level(rpl_semi_sync_master_trace_level);
362 set_wait_point(rpl_semi_sync_master_wait_point);
363
364 /* Mutex initialization can only be done after MY_INIT(). */
365 mysql_mutex_init(key_LOCK_rpl_semi_sync_master_enabled,
366 &LOCK_rpl_semi_sync_master_enabled, MY_MUTEX_INIT_FAST);
367 mysql_mutex_init(key_LOCK_binlog,
368 &LOCK_binlog, MY_MUTEX_INIT_FAST);
369 mysql_cond_init(key_COND_binlog_send,
370 &COND_binlog_send, NULL);
371
372 if (rpl_semi_sync_master_enabled)
373 {
374 result = enable_master();
375 if (!result)
376 {
377 result= ack_receiver.start(); /* Start the ACK thread. */
378 /*
379 If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily
380 switch off semisync to avoid hang if there's none active slave.
381 */
382 if (!rpl_semi_sync_master_wait_no_slave)
383 switch_off();
384 }
385 }
386 else
387 {
388 disable_master();
389 }
390
391 return result;
392 }
393
enable_master()394 int Repl_semi_sync_master::enable_master()
395 {
396 int result = 0;
397
398 /* Must have the lock when we do enable of disable. */
399 lock();
400
401 if (!get_master_enabled())
402 {
403 m_active_tranxs = new Active_tranx(&LOCK_binlog, m_trace_level);
404 if (m_active_tranxs != NULL)
405 {
406 m_commit_file_name_inited = false;
407 m_reply_file_name_inited = false;
408 m_wait_file_name_inited = false;
409
410 set_master_enabled(true);
411 m_state = true;
412 sql_print_information("Semi-sync replication enabled on the master.");
413 }
414 else
415 {
416 sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
417 result = -1;
418 }
419 }
420
421 unlock();
422
423 return result;
424 }
425
disable_master()426 void Repl_semi_sync_master::disable_master()
427 {
428 /* Must have the lock when we do enable of disable. */
429 lock();
430
431 if (get_master_enabled())
432 {
433 /* Switch off the semi-sync first so that waiting transaction will be
434 * waken up.
435 */
436 switch_off();
437
438 assert(m_active_tranxs != NULL);
439 delete m_active_tranxs;
440 m_active_tranxs = NULL;
441
442 m_reply_file_name_inited = false;
443 m_wait_file_name_inited = false;
444 m_commit_file_name_inited = false;
445
446 set_master_enabled(false);
447 sql_print_information("Semi-sync replication disabled on the master.");
448 }
449
450 unlock();
451 }
452
cleanup()453 void Repl_semi_sync_master::cleanup()
454 {
455 if (m_init_done)
456 {
457 mysql_mutex_destroy(&LOCK_rpl_semi_sync_master_enabled);
458 mysql_mutex_destroy(&LOCK_binlog);
459 mysql_cond_destroy(&COND_binlog_send);
460 m_init_done= 0;
461 }
462
463 delete m_active_tranxs;
464 }
465
lock()466 void Repl_semi_sync_master::lock()
467 {
468 mysql_mutex_lock(&LOCK_binlog);
469 }
470
unlock()471 void Repl_semi_sync_master::unlock()
472 {
473 mysql_mutex_unlock(&LOCK_binlog);
474 }
475
cond_broadcast()476 void Repl_semi_sync_master::cond_broadcast()
477 {
478 mysql_cond_broadcast(&COND_binlog_send);
479 }
480
cond_timewait(struct timespec * wait_time)481 int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time)
482 {
483 int wait_res;
484
485 DBUG_ENTER("Repl_semi_sync_master::cond_timewait()");
486
487 wait_res= mysql_cond_timedwait(&COND_binlog_send,
488 &LOCK_binlog, wait_time);
489
490 DBUG_RETURN(wait_res);
491 }
492
add_slave()493 void Repl_semi_sync_master::add_slave()
494 {
495 lock();
496 rpl_semi_sync_master_clients++;
497 unlock();
498 }
499
remove_slave()500 void Repl_semi_sync_master::remove_slave()
501 {
502 lock();
503 rpl_semi_sync_master_clients--;
504
505 /* Only switch off if semi-sync is enabled and is on */
506 if (get_master_enabled() && is_on())
507 {
508 /* If user has chosen not to wait if no semi-sync slave available
509 and the last semi-sync slave exits, turn off semi-sync on master
510 immediately.
511 */
512 if (!rpl_semi_sync_master_wait_no_slave &&
513 rpl_semi_sync_master_clients == 0)
514 switch_off();
515 }
516 unlock();
517 }
518
report_reply_packet(uint32 server_id,const uchar * packet,ulong packet_len)519 int Repl_semi_sync_master::report_reply_packet(uint32 server_id,
520 const uchar *packet,
521 ulong packet_len)
522 {
523 int result= -1;
524 char log_file_name[FN_REFLEN+1];
525 my_off_t log_file_pos;
526 ulong log_file_len = 0;
527
528 DBUG_ENTER("Repl_semi_sync_master::report_reply_packet");
529
530 if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] !=
531 Repl_semi_sync_master::k_packet_magic_num))
532 {
533 sql_print_error("Read semi-sync reply magic number error");
534 goto l_end;
535 }
536
537 if (unlikely(packet_len < REPLY_BINLOG_NAME_OFFSET))
538 {
539 sql_print_error("Read semi-sync reply length error: packet is too small");
540 goto l_end;
541 }
542
543 log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
544 log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
545 if (unlikely(log_file_len >= FN_REFLEN))
546 {
547 sql_print_error("Read semi-sync reply binlog file length too large");
548 goto l_end;
549 }
550 strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
551 log_file_name[log_file_len] = 0;
552
553 DBUG_ASSERT(dirname_length(log_file_name) == 0);
554
555 DBUG_PRINT("semisync", ("%s: Got reply(%s, %lu) from server %u",
556 "Repl_semi_sync_master::report_reply_packet",
557 log_file_name, (ulong)log_file_pos, server_id));
558
559 rpl_semi_sync_master_get_ack++;
560 report_reply_binlog(server_id, log_file_name, log_file_pos);
561
562 l_end:
563
564 DBUG_RETURN(result);
565 }
566
report_reply_binlog(uint32 server_id,const char * log_file_name,my_off_t log_file_pos)567 int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
568 const char *log_file_name,
569 my_off_t log_file_pos)
570 {
571 int cmp;
572 bool can_release_threads = false;
573 bool need_copy_send_pos = true;
574
575 DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog");
576
577 if (!(get_master_enabled()))
578 DBUG_RETURN(0);
579
580 lock();
581
582 /* This is the real check inside the mutex. */
583 if (!get_master_enabled())
584 goto l_end;
585
586 if (!is_on())
587 /* We check to see whether we can switch semi-sync ON. */
588 try_switch_on(server_id, log_file_name, log_file_pos);
589
590 /* The position should increase monotonically, if there is only one
591 * thread sending the binlog to the slave.
592 * In reality, to improve the transaction availability, we allow multiple
593 * sync replication slaves. So, if any one of them get the transaction,
594 * the transaction session in the primary can move forward.
595 */
596 if (m_reply_file_name_inited)
597 {
598 cmp = Active_tranx::compare(log_file_name, log_file_pos,
599 m_reply_file_name, m_reply_file_pos);
600
601 /* If the requested position is behind the sending binlog position,
602 * would not adjust sending binlog position.
603 * We based on the assumption that there are multiple semi-sync slave,
604 * and at least one of them shou/ld be up to date.
605 * If all semi-sync slaves are behind, at least initially, the primary
606 * can find the situation after the waiting timeout. After that, some
607 * slaves should catch up quickly.
608 */
609 if (cmp < 0)
610 {
611 /* If the position is behind, do not copy it. */
612 need_copy_send_pos = false;
613 }
614 }
615
616 if (need_copy_send_pos)
617 {
618 strmake_buf(m_reply_file_name, log_file_name);
619 m_reply_file_pos = log_file_pos;
620 m_reply_file_name_inited = true;
621
622 /* Remove all active transaction nodes before this point. */
623 assert(m_active_tranxs != NULL);
624 m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos);
625
626 DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)",
627 "Repl_semi_sync_master::report_reply_binlog",
628 log_file_name, (ulong)log_file_pos));
629 }
630
631 if (rpl_semi_sync_master_wait_sessions > 0)
632 {
633 /* Let us check if some of the waiting threads doing a trx
634 * commit can now proceed.
635 */
636 cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
637 m_wait_file_name, m_wait_file_pos);
638 if (cmp >= 0)
639 {
640 /* Yes, at least one waiting thread can now proceed:
641 * let us release all waiting threads with a broadcast
642 */
643 can_release_threads = true;
644 m_wait_file_name_inited = false;
645 }
646 }
647
648 l_end:
649 unlock();
650
651 if (can_release_threads)
652 {
653 DBUG_PRINT("semisync", ("%s: signal all waiting threads.",
654 "Repl_semi_sync_master::report_reply_binlog"));
655
656 cond_broadcast();
657 }
658
659 DBUG_RETURN(0);
660 }
661
wait_after_sync(const char * log_file,my_off_t log_pos)662 int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos)
663 {
664 if (!get_master_enabled())
665 return 0;
666
667 int ret= 0;
668 if(log_pos &&
669 wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
670 ret= commit_trx(log_file + dirname_length(log_file), log_pos);
671
672 return ret;
673 }
674
wait_after_commit(THD * thd,bool all)675 int Repl_semi_sync_master::wait_after_commit(THD* thd, bool all)
676 {
677 if (!get_master_enabled())
678 return 0;
679
680 int ret= 0;
681 const char *log_file;
682 my_off_t log_pos;
683
684 bool is_real_trans=
685 (all || thd->transaction.all.ha_list == 0);
686 /*
687 The coordinates are propagated to this point having been computed
688 in report_binlog_update
689 */
690 Trans_binlog_info *log_info= thd->semisync_info;
691 log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
692 log_pos= log_info ? log_info->log_pos : 0;
693
694 DBUG_ASSERT(!log_file || dirname_length(log_file) == 0);
695
696 if (is_real_trans &&
697 log_pos &&
698 wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
699 ret= commit_trx(log_file, log_pos);
700
701 if (is_real_trans && log_info)
702 {
703 log_info->log_file[0]= 0;
704 log_info->log_pos= 0;
705 }
706
707 return ret;
708 }
709
wait_after_rollback(THD * thd,bool all)710 int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all)
711 {
712 return wait_after_commit(thd, all);
713 }
714
715 /**
716 The method runs after flush to binary log is done.
717 */
report_binlog_update(THD * thd,const char * log_file,my_off_t log_pos)718 int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file,
719 my_off_t log_pos)
720 {
721 if (get_master_enabled())
722 {
723 Trans_binlog_info *log_info;
724
725 if (!(log_info= thd->semisync_info))
726 {
727 if(!(log_info=
728 (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
729 return 1;
730 thd->semisync_info= log_info;
731 }
732 strcpy(log_info->log_file, log_file + dirname_length(log_file));
733 log_info->log_pos = log_pos;
734
735 return write_tranx_in_binlog(log_info->log_file, log_pos);
736 }
737
738 return 0;
739 }
740
dump_start(THD * thd,const char * log_file,my_off_t log_pos)741 int Repl_semi_sync_master::dump_start(THD* thd,
742 const char *log_file,
743 my_off_t log_pos)
744 {
745 if (!thd->semi_sync_slave)
746 return 0;
747
748 if (ack_receiver.add_slave(thd))
749 {
750 sql_print_error("Failed to register slave to semi-sync ACK receiver "
751 "thread. Turning off semisync");
752 thd->semi_sync_slave= 0;
753 return 1;
754 }
755
756 add_slave();
757 report_reply_binlog(thd->variables.server_id,
758 log_file + dirname_length(log_file), log_pos);
759 sql_print_information("Start semi-sync binlog_dump to slave "
760 "(server_id: %ld), pos(%s, %lu)",
761 (long) thd->variables.server_id, log_file,
762 (ulong) log_pos);
763
764 return 0;
765 }
766
dump_end(THD * thd)767 void Repl_semi_sync_master::dump_end(THD* thd)
768 {
769 if (!thd->semi_sync_slave)
770 return;
771
772 sql_print_information("Stop semi-sync binlog_dump to slave (server_id: %ld)",
773 (long) thd->variables.server_id);
774
775 remove_slave();
776 ack_receiver.remove_slave(thd);
777
778 return;
779 }
780
/*
  Make the calling transaction wait until a semi-sync slave acknowledges the
  binlog position (trx_wait_binlog_name, trx_wait_binlog_pos).

  The wait ends in any of these cases:
   - the recorded reply position reaches the transaction's position;
   - semi-sync is switched off;
   - the session is killed;
   - m_wait_timeout milliseconds pass since the call started, in which case
     semi-sync is switched off.

  @param trx_wait_binlog_name  binlog file name to wait for (callers in this
                               file pass the base name); NULL means no wait
  @param trx_wait_binlog_pos   position within that file

  @return always 0; the outcome is reflected in the status counters instead.
*/
int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
                                      my_off_t trx_wait_binlog_pos)
{
  DBUG_ENTER("Repl_semi_sync_master::commit_trx");

  if (get_master_enabled() && trx_wait_binlog_name)
  {
    struct timespec start_ts;
    struct timespec abstime;
    int wait_result;
    PSI_stage_info old_stage;
    THD *thd= current_thd;

    /* Start of the wait (presumably "now"); the deadline is based on it. */
    set_timespec(start_ts, 0);

    DEBUG_SYNC(thd, "rpl_semisync_master_commit_trx_before_lock");
    /* Acquire the mutex. */
    lock();

    /* This must be called after acquired the lock */
    THD_ENTER_COND(thd, &COND_binlog_send, &LOCK_binlog,
                   & stage_waiting_for_semi_sync_ack_from_slave,
                   & old_stage);

    /* This is the real check inside the mutex. */
    if (!get_master_enabled() || !is_on())
      goto l_end;

    DBUG_PRINT("semisync", ("%s: wait pos (%s, %lu), repl(%d)",
                            "Repl_semi_sync_master::commit_trx",
                            trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
                            (int)is_on()));

    /* Re-checked after every wake-up, including spurious ones. */
    while (is_on() && !thd_killed(thd))
    {
      if (m_reply_file_name_inited)
      {
        int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
                                        trx_wait_binlog_name,
                                        trx_wait_binlog_pos);
        if (cmp >= 0)
        {
          /* We have already sent the relevant binlog to the slave: no need to
           * wait here.
           */
          DBUG_PRINT("semisync", ("%s: Binlog reply is ahead (%s, %lu),",
                                  "Repl_semi_sync_master::commit_trx",
                                  m_reply_file_name,
                                  (ulong)m_reply_file_pos));
          break;
        }
      }

      /* Let us update the info about the minimum binlog position of waiting
       * threads. report_reply_binlog() compares against this minimum to
       * decide whether to wake the waiters.
       */
      if (m_wait_file_name_inited)
      {
        int cmp = Active_tranx::compare(trx_wait_binlog_name,
                                        trx_wait_binlog_pos,
                                        m_wait_file_name, m_wait_file_pos);
        if (cmp <= 0)
        {
          /* This thd has a lower position, let's update the minimum info. */
          strmake_buf(m_wait_file_name, trx_wait_binlog_name);
          m_wait_file_pos = trx_wait_binlog_pos;

          rpl_semi_sync_master_wait_pos_backtraverse++;
          DBUG_PRINT("semisync", ("%s: move back wait position (%s, %lu),",
                                  "Repl_semi_sync_master::commit_trx",
                                  m_wait_file_name, (ulong)m_wait_file_pos));
        }
      }
      else
      {
        strmake_buf(m_wait_file_name, trx_wait_binlog_name);
        m_wait_file_pos = trx_wait_binlog_pos;
        m_wait_file_name_inited = true;

        DBUG_PRINT("semisync", ("%s: init wait position (%s, %lu),",
                                "Repl_semi_sync_master::commit_trx",
                                m_wait_file_name, (ulong)m_wait_file_pos));
      }

      /* Calculate the waiting period: deadline = start_ts + m_wait_timeout
       * milliseconds. It is always measured from start_ts, so repeated
       * wake-ups do not extend the total wait.
       */
      long diff_secs = (long) (m_wait_timeout / TIME_THOUSAND);
      long diff_nsecs = (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION);
      long nsecs = start_ts.tv_nsec + diff_nsecs;
      abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION;
      abstime.tv_nsec = nsecs % TIME_BILLION;

      /* In semi-synchronous replication, we wait until the binlog-dump
       * thread has received the reply on the relevant binlog segment from the
       * replication slave.
       *
       * Let us suspend this thread to wait on the condition;
       * when replication has progressed far enough, we will release
       * these waiting threads.
       */
      rpl_semi_sync_master_wait_sessions++;

      DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)",
                              "Repl_semi_sync_master::commit_trx",
                              m_wait_timeout,
                              m_wait_file_name, (ulong)m_wait_file_pos));

      wait_result = cond_timewait(&abstime);
      rpl_semi_sync_master_wait_sessions--;

      if (wait_result != 0)
      {
        /* This is a real wait timeout. */
        sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
                          "semi-sync up to file %s, position %lu.",
                          trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
                          m_reply_file_name, (ulong)m_reply_file_pos);
        rpl_semi_sync_master_wait_timeouts++;

        /* switch semi-sync off; the loop then exits because is_on() fails */
        switch_off();
      }
      else
      {
        int wait_time;

        /* Signalled: account the elapsed time in the wait statistics. */
        wait_time = get_wait_time(start_ts);
        if (wait_time < 0)
        {
          DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at "
                                  "wait position (%s, %lu)",
                                  trx_wait_binlog_name,
                                  (ulong)trx_wait_binlog_pos));
          rpl_semi_sync_master_timefunc_fails++;
        }
        else
        {
          rpl_semi_sync_master_trx_wait_num++;
          rpl_semi_sync_master_trx_wait_time += wait_time;
        }
      }
    }

    /*
      At this point, the binlog file and position of this transaction
      must have been removed from Active_tranx.
      m_active_tranxs may be NULL if someone disabled semi sync during
      cond_timewait()
    */
    assert(thd_killed(thd) || !m_active_tranxs ||
           !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
                                              trx_wait_binlog_pos));

  l_end:
    /* Update the status counter. */
    if (is_on())
      rpl_semi_sync_master_yes_transactions++;
    else
      rpl_semi_sync_master_no_transactions++;

    /* The lock held will be released by thd_exit_cond, so no need to
      call unlock() here */
    THD_EXIT_COND(thd, &old_stage);
  }

  DBUG_RETURN(0);
}
947
/* Indicate that semi-sync replication is OFF now.
 *
 * What should we do when it is switched off? We want semi-sync replication
 * to be switched on again when the slave catches up later. But it is not
 * easy to detect that the slave has caught up, because MySQL's replication
 * protocol is asynchronous: if the master does not use the semi-sync
 * protocol, the slave does not send anything back to the master.
 * Still, if the master is sending the (N+1)-th event, we assume that this
 * indicates that the slave has received the N-th event and earlier ones.
 *
 * While semi-sync is off, transactions still update the commit position
 * (m_commit_file_name, m_commit_file_pos) with the last position in the
 * binlog. However, no transaction waits for confirmation and the active
 * transaction list is not maintained. In the binlog dump thread,
 * update_sync_header() requests a reply from the slave once the event being
 * sent has reached that commit position. When the reply arrives,
 * report_reply_binlog() calls try_switch_on(), which switches semi-sync on
 * again.
 */
switch_off()966 void Repl_semi_sync_master::switch_off()
967 {
968 DBUG_ENTER("Repl_semi_sync_master::switch_off");
969
970 m_state = false;
971
972 /* Clear the active transaction list. */
973 assert(m_active_tranxs != NULL);
974 m_active_tranxs->clear_active_tranx_nodes(NULL, 0);
975
976 rpl_semi_sync_master_off_times++;
977 m_wait_file_name_inited = false;
978 m_reply_file_name_inited = false;
979 sql_print_information("Semi-sync replication switched OFF.");
980 cond_broadcast(); /* wake up all waiting threads */
981
982 DBUG_VOID_RETURN;
983 }
984
try_switch_on(int server_id,const char * log_file_name,my_off_t log_file_pos)985 int Repl_semi_sync_master::try_switch_on(int server_id,
986 const char *log_file_name,
987 my_off_t log_file_pos)
988 {
989 bool semi_sync_on = false;
990
991 DBUG_ENTER("Repl_semi_sync_master::try_switch_on");
992
993 /* If the current sending event's position is larger than or equal to the
994 * 'largest' commit transaction binlog position, the slave is already
995 * catching up now and we can switch semi-sync on here.
996 * If m_commit_file_name_inited indicates there are no recent transactions,
997 * we can enable semi-sync immediately.
998 */
999 if (m_commit_file_name_inited)
1000 {
1001 int cmp = Active_tranx::compare(log_file_name, log_file_pos,
1002 m_commit_file_name, m_commit_file_pos);
1003 semi_sync_on = (cmp >= 0);
1004 }
1005 else
1006 {
1007 semi_sync_on = true;
1008 }
1009
1010 if (semi_sync_on)
1011 {
1012 /* Switch semi-sync replication on. */
1013 m_state = true;
1014
1015 sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
1016 "at (%s, %lu)",
1017 server_id, log_file_name,
1018 (ulong)log_file_pos);
1019 }
1020
1021 DBUG_RETURN(0);
1022 }
1023
reserve_sync_header(String * packet)1024 int Repl_semi_sync_master::reserve_sync_header(String* packet)
1025 {
1026 DBUG_ENTER("Repl_semi_sync_master::reserve_sync_header");
1027
1028 /* Set the magic number and the sync status. By default, no sync
1029 * is required.
1030 */
1031 packet->append(reinterpret_cast<const char*>(k_sync_header),
1032 sizeof(k_sync_header));
1033 DBUG_RETURN(0);
1034 }
1035
update_sync_header(THD * thd,unsigned char * packet,const char * log_file_name,my_off_t log_file_pos,bool * need_sync)1036 int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
1037 const char *log_file_name,
1038 my_off_t log_file_pos,
1039 bool* need_sync)
1040 {
1041 int cmp = 0;
1042 bool sync = false;
1043
1044 DBUG_ENTER("Repl_semi_sync_master::update_sync_header");
1045
1046 /* If the semi-sync master is not enabled, or the slave is not a semi-sync
1047 * target, do not request replies from the slave.
1048 */
1049 if (!get_master_enabled() || !thd->semi_sync_slave)
1050 {
1051 *need_sync = false;
1052 DBUG_RETURN(0);
1053 }
1054
1055 lock();
1056
1057 /* This is the real check inside the mutex. */
1058 if (!get_master_enabled())
1059 {
1060 assert(sync == false);
1061 goto l_end;
1062 }
1063
1064 if (is_on())
1065 {
1066 /* semi-sync is ON */
1067 sync = false; /* No sync unless a transaction is involved. */
1068
1069 if (m_reply_file_name_inited)
1070 {
1071 cmp = Active_tranx::compare(log_file_name, log_file_pos,
1072 m_reply_file_name, m_reply_file_pos);
1073 if (cmp <= 0)
1074 {
1075 /* If we have already got the reply for the event, then we do
1076 * not need to sync the transaction again.
1077 */
1078 goto l_end;
1079 }
1080 }
1081
1082 if (m_wait_file_name_inited)
1083 {
1084 cmp = Active_tranx::compare(log_file_name, log_file_pos,
1085 m_wait_file_name, m_wait_file_pos);
1086 }
1087 else
1088 {
1089 cmp = 1;
1090 }
1091
1092 /* If we are already waiting for some transaction replies which
1093 * are later in binlog, do not wait for this one event.
1094 */
1095 if (cmp >= 0)
1096 {
1097 /*
1098 * We only wait if the event is a transaction's ending event.
1099 */
1100 assert(m_active_tranxs != NULL);
1101 sync = m_active_tranxs->is_tranx_end_pos(log_file_name,
1102 log_file_pos);
1103 }
1104 }
1105 else
1106 {
1107 if (m_commit_file_name_inited)
1108 {
1109 int cmp = Active_tranx::compare(log_file_name, log_file_pos,
1110 m_commit_file_name, m_commit_file_pos);
1111 sync = (cmp >= 0);
1112 }
1113 else
1114 {
1115 sync = true;
1116 }
1117 }
1118
1119 DBUG_PRINT("semisync", ("%s: server(%lu), (%s, %lu) sync(%d), repl(%d)",
1120 "Repl_semi_sync_master::update_sync_header",
1121 thd->variables.server_id, log_file_name,
1122 (ulong)log_file_pos, sync, (int)is_on()));
1123 *need_sync= sync;
1124
1125 l_end:
1126 unlock();
1127
1128 /* We do not need to clear sync flag because we set it to 0 when we
1129 * reserve the packet header.
1130 */
1131 if (sync)
1132 {
1133 (packet)[2] = k_packet_flag_sync;
1134 }
1135
1136 DBUG_RETURN(0);
1137 }
1138
write_tranx_in_binlog(const char * log_file_name,my_off_t log_file_pos)1139 int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
1140 my_off_t log_file_pos)
1141 {
1142 int result = 0;
1143
1144 DBUG_ENTER("Repl_semi_sync_master::write_tranx_in_binlog");
1145
1146 lock();
1147
1148 /* This is the real check inside the mutex. */
1149 if (!get_master_enabled())
1150 goto l_end;
1151
1152 /* Update the 'largest' transaction commit position seen so far even
1153 * though semi-sync is switched off.
1154 * It is much better that we update m_commit_file* here, instead of
1155 * inside commit_trx(). This is mostly because update_sync_header()
1156 * will watch for m_commit_file* to decide whether to switch semi-sync
1157 * on. The detailed reason is explained in function update_sync_header().
1158 */
1159 if (m_commit_file_name_inited)
1160 {
1161 int cmp = Active_tranx::compare(log_file_name, log_file_pos,
1162 m_commit_file_name, m_commit_file_pos);
1163 if (cmp > 0)
1164 {
1165 /* This is a larger position, let's update the maximum info. */
1166 strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1);
1167 m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
1168 m_commit_file_pos = log_file_pos;
1169 }
1170 }
1171 else
1172 {
1173 strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1);
1174 m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
1175 m_commit_file_pos = log_file_pos;
1176 m_commit_file_name_inited = true;
1177 }
1178
1179 if (is_on())
1180 {
1181 assert(m_active_tranxs != NULL);
1182 if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos))
1183 {
1184 /*
1185 if insert tranx_node failed, print a warning message
1186 and turn off semi-sync
1187 */
1188 sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
1189 log_file_name, (ulong)log_file_pos);
1190 switch_off();
1191 }
1192 else
1193 {
1194 rpl_semi_sync_master_request_ack++;
1195 }
1196 }
1197
1198 l_end:
1199 unlock();
1200
1201 DBUG_RETURN(result);
1202 }
1203
flush_net(THD * thd,const char * event_buf)1204 int Repl_semi_sync_master::flush_net(THD *thd,
1205 const char *event_buf)
1206 {
1207 int result = -1;
1208 NET* net= &thd->net;
1209
1210 DBUG_ENTER("Repl_semi_sync_master::flush_net");
1211
1212 assert((unsigned char)event_buf[1] == k_packet_magic_num);
1213 if ((unsigned char)event_buf[2] != k_packet_flag_sync)
1214 {
1215 /* current event does not require reply */
1216 result = 0;
1217 goto l_end;
1218 }
1219
1220 /* We flush to make sure that the current event is sent to the network,
1221 * instead of being buffered in the TCP/IP stack.
1222 */
1223 if (net_flush(net))
1224 {
1225 sql_print_error("Semi-sync master failed on net_flush() "
1226 "before waiting for slave reply");
1227 goto l_end;
1228 }
1229
1230 net_clear(net, 0);
1231 net->pkt_nr++;
1232 result = 0;
1233 rpl_semi_sync_master_net_wait_num++;
1234
1235 l_end:
1236 thd->clear_error();
1237
1238 DBUG_RETURN(result);
1239 }
1240
after_reset_master()1241 int Repl_semi_sync_master::after_reset_master()
1242 {
1243 int result = 0;
1244
1245 DBUG_ENTER("Repl_semi_sync_master::after_reset_master");
1246
1247 if (rpl_semi_sync_master_enabled)
1248 {
1249 sql_print_information("Enable Semi-sync Master after reset master");
1250 enable_master();
1251 }
1252
1253 lock();
1254
1255 if (rpl_semi_sync_master_clients == 0 &&
1256 !rpl_semi_sync_master_wait_no_slave)
1257 m_state = 0;
1258 else
1259 m_state = get_master_enabled()? 1 : 0;
1260
1261 m_wait_file_name_inited = false;
1262 m_reply_file_name_inited = false;
1263 m_commit_file_name_inited = false;
1264
1265 rpl_semi_sync_master_yes_transactions = 0;
1266 rpl_semi_sync_master_no_transactions = 0;
1267 rpl_semi_sync_master_off_times = 0;
1268 rpl_semi_sync_master_timefunc_fails = 0;
1269 rpl_semi_sync_master_wait_sessions = 0;
1270 rpl_semi_sync_master_wait_pos_backtraverse = 0;
1271 rpl_semi_sync_master_trx_wait_num = 0;
1272 rpl_semi_sync_master_trx_wait_time = 0;
1273 rpl_semi_sync_master_net_wait_num = 0;
1274 rpl_semi_sync_master_net_wait_time = 0;
1275
1276 unlock();
1277
1278 DBUG_RETURN(result);
1279 }
1280
before_reset_master()1281 int Repl_semi_sync_master::before_reset_master()
1282 {
1283 int result = 0;
1284
1285 DBUG_ENTER("Repl_semi_sync_master::before_reset_master");
1286
1287 if (rpl_semi_sync_master_enabled)
1288 disable_master();
1289
1290 DBUG_RETURN(result);
1291 }
1292
check_and_switch()1293 void Repl_semi_sync_master::check_and_switch()
1294 {
1295 lock();
1296 if (get_master_enabled() && is_on())
1297 {
1298 if (!rpl_semi_sync_master_wait_no_slave
1299 && rpl_semi_sync_master_clients == 0)
1300 switch_off();
1301 }
1302 unlock();
1303 }
1304
set_export_stats()1305 void Repl_semi_sync_master::set_export_stats()
1306 {
1307 lock();
1308
1309 rpl_semi_sync_master_status = m_state;
1310 rpl_semi_sync_master_avg_trx_wait_time=
1311 ((rpl_semi_sync_master_trx_wait_num) ?
1312 (ulong)((double)rpl_semi_sync_master_trx_wait_time /
1313 ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
1314 rpl_semi_sync_master_avg_net_wait_time=
1315 ((rpl_semi_sync_master_net_wait_num) ?
1316 (ulong)((double)rpl_semi_sync_master_net_wait_time /
1317 ((double)rpl_semi_sync_master_net_wait_num)) : 0);
1318
1319 unlock();
1320 }
1321
1322 /* Get the waiting time given the wait's staring time.
1323 *
1324 * Return:
1325 * >= 0: the waiting time in microsecons(us)
1326 * < 0: error in get time or time back traverse
1327 */
get_wait_time(const struct timespec & start_ts)1328 static int get_wait_time(const struct timespec& start_ts)
1329 {
1330 ulonglong start_usecs, end_usecs;
1331 struct timespec end_ts;
1332
1333 /* Starting time in microseconds(us). */
1334 start_usecs = timespec_to_usec(&start_ts);
1335
1336 /* Get the wait time interval. */
1337 set_timespec(end_ts, 0);
1338
1339 /* Ending time in microseconds(us). */
1340 end_usecs = timespec_to_usec(&end_ts);
1341
1342 if (end_usecs < start_usecs)
1343 return -1;
1344
1345 return (int)(end_usecs - start_usecs);
1346 }
1347
semi_sync_master_deinit()1348 void semi_sync_master_deinit()
1349 {
1350 repl_semisync_master.cleanup();
1351 ack_receiver.cleanup();
1352 }
1353