1 /* Copyright (C) 2007 Google Inc.
2 Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24
25 #include "semisync_master.h"
26 #if defined(ENABLED_DEBUG_SYNC)
27 #include "debug_sync.h"
28 #include "sql_class.h"
29 #endif
30
/* Decimal scale factors used by the time arithmetic in this file
 * (timespec_to_usec() and the timeout computation in commitTrx()).
 */
#define TIME_THOUSAND 1000
#define TIME_MILLION 1000000
#define TIME_BILLION 1000000000

/* This indicates whether semi-synchronous replication is enabled.
 * Read by initObject() to choose between enableMaster() and disableMaster().
 */
char rpl_semi_sync_master_enabled;
/* Passed to setWaitTimeout() by initObject(). */
unsigned long rpl_semi_sync_master_timeout;
/* Passed to setTraceLevel() by initObject(). */
unsigned long rpl_semi_sync_master_trace_level;
char rpl_semi_sync_master_status = 0;
/* Per-commit counters bumped at the end of commitTrx(): "yes" when semi-sync
 * is still on and the transaction was handled as semi-sync, "no" otherwise.
 */
unsigned long rpl_semi_sync_master_yes_transactions = 0;
unsigned long rpl_semi_sync_master_no_transactions = 0;
/* Incremented on every call to switch_off(). */
unsigned long rpl_semi_sync_master_off_times = 0;
/* Incremented in commitTrx() when getWaitTime() reports a negative value. */
unsigned long rpl_semi_sync_master_timefunc_fails = 0;
/* Incremented in commitTrx() when the timed wait returns non-zero. */
unsigned long rpl_semi_sync_master_wait_timeouts = 0;
/* Number of sessions currently blocked in commitTrx(); reportReplyBinlog()
 * only tries to release waiters while this is non-zero.
 */
unsigned long rpl_semi_sync_master_wait_sessions = 0;
/* Incremented in commitTrx() when a waiter moves the recorded minimum wait
 * position backwards (or re-records the same position).
 */
unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0;
unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
/* Count of successfully measured waits in commitTrx(). */
unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
unsigned long rpl_semi_sync_master_avg_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_net_wait_num = 0;
/* Maintained by add_slave() / remove_slave(). */
unsigned long rpl_semi_sync_master_clients = 0;
unsigned long long rpl_semi_sync_master_net_wait_time = 0;
/* Sum of the values returned by getWaitTime() for measured waits. */
unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
/* When zero, remove_slave() switches semi-sync off as soon as the last
 * slave disconnects.
 */
char rpl_semi_sync_master_wait_no_slave = 1;


/* Defined later in this file; commitTrx() treats a negative return value as
 * a failure to measure the wait time.
 */
static int getWaitTime(const struct timespec& start_ts);
58
timespec_to_usec(const struct timespec * ts)59 static unsigned long long timespec_to_usec(const struct timespec *ts)
60 {
61 #ifdef HAVE_STRUCT_TIMESPEC
62 return (unsigned long long) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
63 #else
64 return ts->tv.i64 / 10;
65 #endif /* __WIN__ */
66 }
67
68 /*******************************************************************************
69 *
70 * <ActiveTranx> class : manage all active transaction nodes
71 *
72 ******************************************************************************/
73
ActiveTranx(mysql_mutex_t * lock,unsigned long trace_level)74 ActiveTranx::ActiveTranx(mysql_mutex_t *lock,
75 unsigned long trace_level)
76 : Trace(trace_level), allocator_(max_connections),
77 num_entries_(max_connections << 1), /* Transaction hash table size
78 * is set to double the size
79 * of max_connections */
80 lock_(lock)
81 {
82 /* No transactions are in the list initially. */
83 trx_front_ = NULL;
84 trx_rear_ = NULL;
85
86 /* Create the hash table to find a transaction's ending event. */
87 trx_htb_ = new TranxNode *[num_entries_];
88 for (int idx = 0; idx < num_entries_; ++idx)
89 trx_htb_[idx] = NULL;
90
91 sql_print_information("Semi-sync replication initialized for transactions.");
92 }
93
~ActiveTranx()94 ActiveTranx::~ActiveTranx()
95 {
96 delete [] trx_htb_;
97 trx_htb_ = NULL;
98 num_entries_ = 0;
99 }
100
calc_hash(const unsigned char * key,unsigned int length)101 unsigned int ActiveTranx::calc_hash(const unsigned char *key,
102 unsigned int length)
103 {
104 unsigned int nr = 1, nr2 = 4;
105
106 /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
107 while (length--)
108 {
109 nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
110 nr2 += 3;
111 }
112 return((unsigned int) nr);
113 }
114
get_hash_value(const char * log_file_name,my_off_t log_file_pos)115 unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
116 my_off_t log_file_pos)
117 {
118 unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
119 strlen(log_file_name));
120 unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
121 sizeof(log_file_pos));
122
123 return (hash1 + hash2) % num_entries_;
124 }
125
compare(const char * log_file_name1,my_off_t log_file_pos1,const char * log_file_name2,my_off_t log_file_pos2)126 int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
127 const char *log_file_name2, my_off_t log_file_pos2)
128 {
129 int cmp = strcmp(log_file_name1, log_file_name2);
130
131 if (cmp != 0)
132 return cmp;
133
134 if (log_file_pos1 > log_file_pos2)
135 return 1;
136 else if (log_file_pos1 < log_file_pos2)
137 return -1;
138 return 0;
139 }
140
insert_tranx_node(const char * log_file_name,my_off_t log_file_pos)141 int ActiveTranx::insert_tranx_node(const char *log_file_name,
142 my_off_t log_file_pos)
143 {
144 const char *kWho = "ActiveTranx:insert_tranx_node";
145 TranxNode *ins_node;
146 int result = 0;
147 unsigned int hash_val;
148
149 function_enter(kWho);
150
151 ins_node = allocator_.allocate_node();
152 if (!ins_node)
153 {
154 sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
155 kWho, log_file_name, (unsigned long)log_file_pos);
156 result = -1;
157 goto l_end;
158 }
159
160 /* insert the binlog position in the active transaction list. */
161 strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
162 ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
163 ins_node->log_pos_ = log_file_pos;
164
165 if (!trx_front_)
166 {
167 /* The list is empty. */
168 trx_front_ = trx_rear_ = ins_node;
169 }
170 else
171 {
172 int cmp = compare(ins_node, trx_rear_);
173 if (cmp > 0)
174 {
175 /* Compare with the tail first. If the transaction happens later in
176 * binlog, then make it the new tail.
177 */
178 trx_rear_->next_ = ins_node;
179 trx_rear_ = ins_node;
180 }
181 else
182 {
183 /* Otherwise, it is an error because the transaction should hold the
184 * mysql_bin_log.LOCK_log when appending events.
185 */
186 sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
187 "new node (%s, %lu)", kWho,
188 trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_,
189 ins_node->log_name_, (unsigned long)ins_node->log_pos_);
190 result = -1;
191 goto l_end;
192 }
193 }
194
195 hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
196 ins_node->hash_next_ = trx_htb_[hash_val];
197 trx_htb_[hash_val] = ins_node;
198
199 if (trace_level_ & kTraceDetail)
200 sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
201 ins_node->log_name_, (unsigned long)ins_node->log_pos_,
202 hash_val);
203
204 l_end:
205 return function_exit(kWho, result);
206 }
207
is_tranx_end_pos(const char * log_file_name,my_off_t log_file_pos)208 bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
209 my_off_t log_file_pos)
210 {
211 const char *kWho = "ActiveTranx::is_tranx_end_pos";
212 function_enter(kWho);
213
214 unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
215 TranxNode *entry = trx_htb_[hash_val];
216
217 while (entry != NULL)
218 {
219 if (compare(entry, log_file_name, log_file_pos) == 0)
220 break;
221
222 entry = entry->hash_next_;
223 }
224
225 if (trace_level_ & kTraceDetail)
226 sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
227 log_file_name, (unsigned long)log_file_pos, hash_val);
228
229 function_exit(kWho, (entry != NULL));
230 return (entry != NULL);
231 }
232
signal_waiting_sessions_all()233 int ActiveTranx::signal_waiting_sessions_all()
234 {
235 const char *kWho = "ActiveTranx::signal_waiting_sessions_all";
236 function_enter(kWho);
237 for (TranxNode* entry= trx_front_; entry; entry=entry->next_)
238 mysql_cond_broadcast(&entry->cond);
239
240 return function_exit(kWho, 0);
241 }
242
signal_waiting_sessions_up_to(const char * log_file_name,my_off_t log_file_pos)243 int ActiveTranx::signal_waiting_sessions_up_to(const char *log_file_name,
244 my_off_t log_file_pos)
245 {
246 const char *kWho = "ActiveTranx::signal_waiting_sessions_up_to";
247 function_enter(kWho);
248
249 TranxNode* entry= trx_front_;
250 int cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
251 while (entry && cmp <= 0)
252 {
253 mysql_cond_broadcast(&entry->cond);
254 entry= entry->next_;
255 if (entry)
256 cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
257 }
258
259 return function_exit(kWho, (entry != NULL));
260 }
261
find_active_tranx_node(const char * log_file_name,my_off_t log_file_pos)262 TranxNode * ActiveTranx::find_active_tranx_node(const char *log_file_name,
263 my_off_t log_file_pos)
264 {
265 const char *kWho = "ActiveTranx::find_active_tranx_node";
266 function_enter(kWho);
267
268 TranxNode* entry= trx_front_;
269
270 while (entry)
271 {
272 if (ActiveTranx::compare(log_file_name, log_file_pos, entry->log_name_,
273 entry->log_pos_) <= 0)
274 break;
275 entry= entry->next_;
276 }
277 function_exit(kWho, 0);
278 return entry;
279 }
280
clear_active_tranx_nodes(const char * log_file_name,my_off_t log_file_pos)281 int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
282 my_off_t log_file_pos)
283 {
284 const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
285 TranxNode *new_front;
286
287 function_enter(kWho);
288
289 if (log_file_name != NULL)
290 {
291 new_front = trx_front_;
292
293 while (new_front)
294 {
295 if (compare(new_front, log_file_name, log_file_pos) > 0 ||
296 new_front->n_waiters > 0)
297 break;
298 new_front = new_front->next_;
299 }
300 }
301 else
302 {
303 /* If log_file_name is NULL, clear everything. */
304 new_front = NULL;
305 }
306
307 if (new_front == NULL)
308 {
309 /* No active transaction nodes after the call. */
310
311 /* Clear the hash table. */
312 memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
313 allocator_.free_all_nodes();
314
315 /* Clear the active transaction list. */
316 if (trx_front_ != NULL)
317 {
318 trx_front_ = NULL;
319 trx_rear_ = NULL;
320 }
321
322 if (trace_level_ & kTraceDetail)
323 sql_print_information("%s: cleared all nodes", kWho);
324 }
325 else if (new_front != trx_front_)
326 {
327 TranxNode *curr_node, *next_node;
328
329 /* Delete all transaction nodes before the confirmation point. */
330 int n_frees = 0;
331 curr_node = trx_front_;
332 while (curr_node != new_front)
333 {
334 next_node = curr_node->next_;
335 n_frees++;
336
337 /* Remove the node from the hash table. */
338 unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
339 TranxNode **hash_ptr = &(trx_htb_[hash_val]);
340 while ((*hash_ptr) != NULL)
341 {
342 if ((*hash_ptr) == curr_node)
343 {
344 (*hash_ptr) = curr_node->hash_next_;
345 break;
346 }
347 hash_ptr = &((*hash_ptr)->hash_next_);
348 }
349
350 curr_node = next_node;
351 }
352
353 trx_front_ = new_front;
354 allocator_.free_nodes_before(trx_front_);
355
356 if (trace_level_ & kTraceDetail)
357 sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
358 kWho, n_frees,
359 trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
360 }
361
362 return function_exit(kWho, 0);
363 }
364
365
366 /*******************************************************************************
367 *
368 * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master.
369 * <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave.
370 *
 * The most important functions used during semi-sync replication are listed below:
372 *
373 * Master:
374 * . reportReplyBinlog(): called by the binlog dump thread when it receives
375 * the slave's status information.
376 * . updateSyncHeader(): based on transaction waiting information, decide
377 * whether to request the slave to reply.
378 * . writeTranxInBinlog(): called by the transaction thread when it finishes
379 * writing all transaction events in binlog.
380 * . commitTrx(): transaction thread wait for the slave reply.
381 *
382 * Slave:
383 * . slaveReadSyncHeader(): read the semi-sync header from the master, get the
384 * sync status and get the payload for events.
385 * . slaveReply(): reply to the master about the replication progress.
386 *
387 ******************************************************************************/
388
ReplSemiSyncMaster()389 ReplSemiSyncMaster::ReplSemiSyncMaster()
390 : active_tranxs_(NULL),
391 init_done_(false),
392 reply_file_name_inited_(false),
393 reply_file_pos_(0L),
394 wait_file_name_inited_(false),
395 wait_file_pos_(0),
396 master_enabled_(false),
397 wait_timeout_(0L),
398 state_(0)
399 {
400 strcpy(reply_file_name_, "");
401 strcpy(wait_file_name_, "");
402 }
403
initObject()404 int ReplSemiSyncMaster::initObject()
405 {
406 int result;
407 const char *kWho = "ReplSemiSyncMaster::initObject";
408
409 if (init_done_)
410 {
411 fprintf(stderr, "%s called twice\n", kWho);
412 return 1;
413 }
414 init_done_ = true;
415
416 /* References to the parameter works after set_options(). */
417 setWaitTimeout(rpl_semi_sync_master_timeout);
418 setTraceLevel(rpl_semi_sync_master_trace_level);
419
420 /* Mutex initialization can only be done after MY_INIT(). */
421 mysql_mutex_init(key_ss_mutex_LOCK_binlog_,
422 &LOCK_binlog_, MY_MUTEX_INIT_FAST);
423
424 if (rpl_semi_sync_master_enabled)
425 result = enableMaster();
426 else
427 result = disableMaster();
428
429 return result;
430 }
431
enableMaster()432 int ReplSemiSyncMaster::enableMaster()
433 {
434 int result = 0;
435
436 /* Must have the lock when we do enable of disable. */
437 lock();
438
439 if (!getMasterEnabled())
440 {
441 if (active_tranxs_ == NULL)
442 active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
443
444 if (active_tranxs_ != NULL)
445 {
446 commit_file_name_inited_ = false;
447 reply_file_name_inited_ = false;
448 wait_file_name_inited_ = false;
449
450 set_master_enabled(true);
451 state_ = true;
452 sql_print_information("Semi-sync replication enabled on the master.");
453 }
454 else
455 {
456 sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
457 result = -1;
458 }
459 }
460
461 unlock();
462
463 return result;
464 }
465
disableMaster()466 int ReplSemiSyncMaster::disableMaster()
467 {
468 /* Must have the lock when we do enable of disable. */
469 lock();
470
471 if (getMasterEnabled())
472 {
473 /* Switch off the semi-sync first so that waiting transaction will be
474 * waken up.
475 */
476 switch_off();
477
478 if ( active_tranxs_ && active_tranxs_->is_empty())
479 {
480 delete active_tranxs_;
481 active_tranxs_ = NULL;
482 }
483
484 reply_file_name_inited_ = false;
485 wait_file_name_inited_ = false;
486 commit_file_name_inited_ = false;
487
488 set_master_enabled(false);
489 sql_print_information("Semi-sync replication disabled on the master.");
490 }
491
492 unlock();
493
494 return 0;
495 }
496
~ReplSemiSyncMaster()497 ReplSemiSyncMaster::~ReplSemiSyncMaster()
498 {
499 if (init_done_)
500 {
501 mysql_mutex_destroy(&LOCK_binlog_);
502 }
503
504 delete active_tranxs_;
505 }
506
lock()507 void ReplSemiSyncMaster::lock()
508 {
509 mysql_mutex_lock(&LOCK_binlog_);
510 }
511
unlock()512 void ReplSemiSyncMaster::unlock()
513 {
514 mysql_mutex_unlock(&LOCK_binlog_);
515 }
516
add_slave()517 void ReplSemiSyncMaster::add_slave()
518 {
519 lock();
520 rpl_semi_sync_master_clients++;
521 unlock();
522 }
523
remove_slave()524 void ReplSemiSyncMaster::remove_slave()
525 {
526 lock();
527 rpl_semi_sync_master_clients--;
528
529 /* Only switch off if semi-sync is enabled and is on */
530 if (getMasterEnabled() && is_on())
531 {
532 /* If user has chosen not to wait or if the server is shutting down if no
533 semi-sync slave available and the last semi-sync slave exits, turn off
534 semi-sync on master immediately.
535 */
536 if (rpl_semi_sync_master_clients == 0 &&
537 (!rpl_semi_sync_master_wait_no_slave || abort_loop))
538 {
539 if (abort_loop)
540 {
541 if (commit_file_name_inited_ && reply_file_name_inited_)
542 {
543 int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_ ,
544 commit_file_name_, commit_file_pos_);
545 if (cmp < 0)
546 sql_print_warning("SEMISYNC: Forced shutdown. Some updates might "
547 "not be replicated.");
548 }
549 }
550 switch_off();
551 }
552 }
553 unlock();
554 }
555
is_semi_sync_slave()556 bool ReplSemiSyncMaster::is_semi_sync_slave()
557 {
558 int null_value;
559 long long val= 0;
560 get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
561 return val;
562 }
563
reportReplyBinlog(uint32 server_id,const char * log_file_name,my_off_t log_file_pos,bool skipped_event)564 int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
565 const char *log_file_name,
566 my_off_t log_file_pos,
567 bool skipped_event)
568 {
569 const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
570 int cmp;
571 bool can_release_threads = false;
572 bool need_copy_send_pos = true;
573
574 if (!(getMasterEnabled()))
575 return 0;
576
577 function_enter(kWho);
578
579 lock();
580
581 /* This is the real check inside the mutex. */
582 if (!getMasterEnabled())
583 goto l_end;
584
585 if (!is_on())
586 /* We check to see whether we can switch semi-sync ON. */
587 try_switch_on(server_id, log_file_name, log_file_pos);
588
589 /* The position should increase monotonically, if there is only one
590 * thread sending the binlog to the slave.
591 * In reality, to improve the transaction availability, we allow multiple
592 * sync replication slaves. So, if any one of them get the transaction,
593 * the transaction session in the primary can move forward.
594 */
595 if (reply_file_name_inited_)
596 {
597 cmp = ActiveTranx::compare(log_file_name, log_file_pos,
598 reply_file_name_, reply_file_pos_);
599
600 /* If the requested position is behind the sending binlog position,
601 * would not adjust sending binlog position.
602 * We based on the assumption that there are multiple semi-sync slave,
603 * and at least one of them shou/ld be up to date.
604 * If all semi-sync slaves are behind, at least initially, the primary
605 * can find the situation after the waiting timeout. After that, some
606 * slaves should catch up quickly.
607 */
608 if (cmp < 0)
609 {
610 /* If the position is behind, do not copy it. */
611 need_copy_send_pos = false;
612 }
613 }
614
615 if (need_copy_send_pos)
616 {
617 strncpy(reply_file_name_, log_file_name, sizeof(reply_file_name_) - 1);
618 reply_file_name_[sizeof(reply_file_name_) - 1]= '\0';
619 reply_file_pos_ = log_file_pos;
620 reply_file_name_inited_ = true;
621
622 if (trace_level_ & kTraceDetail)
623 {
624 if(!skipped_event)
625 sql_print_information("%s: Got reply at (%s, %lu)", kWho,
626 log_file_name, (unsigned long)log_file_pos);
627 else
628 sql_print_information("%s: Transaction skipped at (%s, %lu)", kWho,
629 log_file_name, (unsigned long)log_file_pos);
630 }
631 }
632
633 if (rpl_semi_sync_master_wait_sessions > 0)
634 {
635 /* Let us check if some of the waiting threads doing a trx
636 * commit can now proceed.
637 */
638 cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
639 wait_file_name_, wait_file_pos_);
640 if (cmp >= 0)
641 {
642 /* Yes, at least one waiting thread can now proceed:
643 * let us release all waiting threads with a broadcast
644 */
645 can_release_threads = true;
646 wait_file_name_inited_ = false;
647 }
648 }
649
650 l_end:
651
652 if (can_release_threads)
653 {
654 if (trace_level_ & kTraceDetail)
655 sql_print_information("%s: signal all waiting threads.", kWho);
656 active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_);
657 }
658 unlock();
659 return function_exit(kWho, 0);
660 }
661
/**
  Make the committing session wait until the given binlog position has been
  acknowledged, the wait times out, or semi-sync is switched off.

  The whole function runs under LOCK_binlog_; the mutex is only given up
  while the session sleeps on the transaction node's condition variable.

  @param trx_wait_binlog_name  binlog file name of the position to wait for;
                               when NULL no waiting is done
  @param trx_wait_binlog_pos   offset within that file

  @return the value of function_exit(kWho, 0)
*/
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
                                  my_off_t trx_wait_binlog_pos)
{
  const char *kWho = "ReplSemiSyncMaster::commitTrx";

  function_enter(kWho);
  PSI_stage_info old_stage;

#if defined(ENABLED_DEBUG_SYNC)
  /* debug sync may not be initialized for a master */
  if (current_thd->debug_sync_control)
    DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
#endif
  /* Acquire the mutex. */
  lock();

  /* Look up the node this session will wait on; it stays NULL when there is
   * no container, no position, or no node at or after the position.
   */
  TranxNode* entry= NULL;
  mysql_cond_t* thd_cond= NULL;
  bool is_semi_sync_trans= true;
  if (active_tranxs_ != NULL && trx_wait_binlog_name)
  {
    entry=
      active_tranxs_->find_active_tranx_node(trx_wait_binlog_name,
                                             trx_wait_binlog_pos);
    if (entry)
      thd_cond= &entry->cond;
  }
  /* This must be called after the lock has been acquired. */
  THD_ENTER_COND(NULL, thd_cond, &LOCK_binlog_,
                 & stage_waiting_for_semi_sync_ack_from_slave,
                 & old_stage);

  if (getMasterEnabled() && trx_wait_binlog_name)
  {
    struct timespec start_ts;
    struct timespec abstime;
    int wait_result;

    /* Remember when the wait started; used for the deadline and, later,
     * for the wait-time statistics.
     */
    set_timespec(start_ts, 0);
    /* This is the real check inside the mutex. */
    if (!getMasterEnabled() || !is_on())
      goto l_end;

    if (trace_level_ & kTraceDetail)
    {
      sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
                            trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
                            (int)is_on());
    }

    /* Calculate the absolute deadline: start time plus wait_timeout_. */
#ifndef HAVE_STRUCT_TIMESPEC
    abstime.tv.i64 = start_ts.tv.i64 + (__int64)wait_timeout_ * TIME_THOUSAND * 10;
    abstime.max_timeout_msec= (long)wait_timeout_;
#else
    abstime.tv_sec = start_ts.tv_sec + wait_timeout_ / TIME_THOUSAND;
    abstime.tv_nsec = start_ts.tv_nsec +
                      (wait_timeout_ % TIME_THOUSAND) * TIME_MILLION;
    /* Carry a nanosecond overflow into the seconds field. */
    if (abstime.tv_nsec >= TIME_BILLION)
    {
      abstime.tv_sec++;
      abstime.tv_nsec -= TIME_BILLION;
    }
#endif /* HAVE_STRUCT_TIMESPEC */

    /* Keep waiting until the position is acknowledged or semi-sync goes
     * off (a timeout below switches it off, which ends the loop).
     */
    while (is_on())
    {
      if (reply_file_name_inited_)
      {
        int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
                                       trx_wait_binlog_name, trx_wait_binlog_pos);
        if (cmp >= 0)
        {
          /* We have already sent the relevant binlog to the slave: no need to
           * wait here.
           */
          if (trace_level_ & kTraceDetail)
            sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
                                  kWho, reply_file_name_, (unsigned long)reply_file_pos_);
          break;
        }
      }
      /*
        When code reaches here an Entry object may not be present in the
        following scenario.

        Semi sync was not enabled when transaction entered into ordered_commit
        process. During flush stage, semi sync was not enabled and there was no
        'Entry' object created for the transaction being committed and at a
        later stage it was enabled. In this case trx_wait_binlog_name and
        trx_wait_binlog_pos are set but the 'Entry' object is not present. Hence
        dump thread will not wait for reply from slave and it will not update
        reply_file_name. In such case the committing transaction should not wait
        for an ack from slave and it should be considered as an async
        transaction.
      */
      if (!entry)
      {
        is_semi_sync_trans= false;
        goto l_end;
      }

      /* Let us update the info about the minimum binlog position of waiting
       * threads.
       */
      if (wait_file_name_inited_)
      {
        int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
                                       wait_file_name_, wait_file_pos_);
        if (cmp <= 0)
        {
          /* This thd has a lower position, let's update the minimum info. */
          strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1);
          wait_file_name_[sizeof(wait_file_name_) - 1]= '\0';
          wait_file_pos_ = trx_wait_binlog_pos;

          rpl_semi_sync_master_wait_pos_backtraverse++;
          if (trace_level_ & kTraceDetail)
            sql_print_information("%s: move back wait position (%s, %lu),",
                                  kWho, wait_file_name_, (unsigned long)wait_file_pos_);
        }
      }
      else
      {
        /* No wait position recorded yet: this session's position is it. */
        strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1);
        wait_file_name_[sizeof(wait_file_name_) - 1]= '\0';
        wait_file_pos_ = trx_wait_binlog_pos;
        wait_file_name_inited_ = true;

        if (trace_level_ & kTraceDetail)
          sql_print_information("%s: init wait position (%s, %lu),",
                                kWho, wait_file_name_, (unsigned long)wait_file_pos_);
      }

      /* In semi-synchronous replication, we wait until the binlog-dump
       * thread has received the reply on the relevant binlog segment from the
       * replication slave.
       *
       * Let us suspend this thread to wait on the condition;
       * when replication has progressed far enough, we will release
       * these waiting threads.
       */
      /* Do not start waiting during shutdown when no slave is connected. */
      if (abort_loop && rpl_semi_sync_master_clients == 0 && is_on())
      {
        sql_print_warning("SEMISYNC: Forced shutdown. Some updates might "
                          "not be replicated.");
        switch_off();
        break;
      }
      rpl_semi_sync_master_wait_sessions++;

      if (trace_level_ & kTraceDetail)
        sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
                              kWho, wait_timeout_,
                              wait_file_name_, (unsigned long)wait_file_pos_);

      /* wait for the position to be ACK'ed back */
      assert(entry);
      /* n_waiters keeps clear_active_tranx_nodes() from removing the node
       * while this session sleeps on its condition variable.
       */
      entry->n_waiters++;
      wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
      entry->n_waiters--;
      /*
        After we release LOCK_binlog_ above while waiting for the condition,
        it can happen that some other parallel client session executed
        RESET MASTER. That can set rpl_semi_sync_master_wait_sessions to zero.
        Hence check the value before decrementing it and decrement it only if it is
        non-zero value.
      */
      if (rpl_semi_sync_master_wait_sessions > 0)
        rpl_semi_sync_master_wait_sessions--;

      if (wait_result != 0)
      {
        /* This is a real wait timeout. */
        sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
                          "semi-sync up to file %s, position %lu.",
                          trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
                          reply_file_name_, (unsigned long)reply_file_pos_);
        rpl_semi_sync_master_wait_timeouts++;

        /* switch semi-sync off */
        switch_off();
      }
      else
      {
        /* Woken up before the deadline: account for the time spent. */
        int wait_time;

        wait_time = getWaitTime(start_ts);
        if (wait_time < 0)
        {
          if (trace_level_ & kTraceGeneral)
          {
            sql_print_information("Assessment of waiting time for commitTrx "
                                  "failed at wait position (%s, %lu)",
                                  trx_wait_binlog_name,
                                  (unsigned long)trx_wait_binlog_pos);
          }
          rpl_semi_sync_master_timefunc_fails++;
        }
        else
        {
          rpl_semi_sync_master_trx_wait_num++;
          rpl_semi_sync_master_trx_wait_time += wait_time;
        }
      }
    }

l_end:
    /* Update the status counter. */
    if (is_on() && is_semi_sync_trans)
      rpl_semi_sync_master_yes_transactions++;
    else
      rpl_semi_sync_master_no_transactions++;

  }

  /* Last waiter removes the TranxNode */
  if (trx_wait_binlog_name && active_tranxs_
      && entry && entry->n_waiters == 0)
    active_tranxs_->clear_active_tranx_nodes(trx_wait_binlog_name,
                                             trx_wait_binlog_pos);

  /* The lock held will be released by thd_exit_cond, so no need to
     call unlock() here */
  THD_EXIT_COND(NULL, & old_stage);
  return function_exit(kWho, 0);
}
889
890 /* Indicate that semi-sync replication is OFF now.
891 *
892 * What should we do when it is disabled? The problem is that we want
893 * the semi-sync replication enabled again when the slave catches up
894 * later. But, it is not that easy to detect that the slave has caught
895 * up. This is caused by the fact that MySQL's replication protocol is
896 * asynchronous, meaning that if the master does not use the semi-sync
897 * protocol, the slave would not send anything to the master.
898 * Still, if the master is sending (N+1)-th event, we assume that it is
899 * an indicator that the slave has received N-th event and earlier ones.
900 *
901 * If semi-sync is disabled, all transactions still update the wait
902 * position with the last position in binlog. But no transactions will
903 * wait for confirmations maintained. In binlog dump thread,
904 * updateSyncHeader() checks whether the current sending event catches
905 * up with last wait position. If it does match, semi-sync will be
906 * switched on again.
907 */
switch_off()908 int ReplSemiSyncMaster::switch_off()
909 {
910 const char *kWho = "ReplSemiSyncMaster::switch_off";
911
912 function_enter(kWho);
913 state_ = false;
914
915 rpl_semi_sync_master_off_times++;
916 wait_file_name_inited_ = false;
917 reply_file_name_inited_ = false;
918 sql_print_information("Semi-sync replication switched OFF.");
919
920 /* signal waiting sessions */
921 active_tranxs_->signal_waiting_sessions_all();
922
923 return function_exit(kWho, 0);
924 }
925
try_switch_on(int server_id,const char * log_file_name,my_off_t log_file_pos)926 int ReplSemiSyncMaster::try_switch_on(int server_id,
927 const char *log_file_name,
928 my_off_t log_file_pos)
929 {
930 const char *kWho = "ReplSemiSyncMaster::try_switch_on";
931 bool semi_sync_on = false;
932
933 function_enter(kWho);
934
935 /* If the current sending event's position is larger than or equal to the
936 * 'largest' commit transaction binlog position, the slave is already
937 * catching up now and we can switch semi-sync on here.
938 * If commit_file_name_inited_ indicates there are no recent transactions,
939 * we can enable semi-sync immediately.
940 */
941 if (commit_file_name_inited_)
942 {
943 int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
944 commit_file_name_, commit_file_pos_);
945 semi_sync_on = (cmp >= 0);
946 }
947 else
948 {
949 semi_sync_on = true;
950 }
951
952 if (semi_sync_on)
953 {
954 /* Switch semi-sync replication on. */
955 state_ = true;
956
957 sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
958 "at (%s, %lu)",
959 server_id, log_file_name,
960 (unsigned long)log_file_pos);
961 }
962
963 return function_exit(kWho, 0);
964 }
965
reserveSyncHeader(unsigned char * header,unsigned long size)966 int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
967 unsigned long size)
968 {
969 const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
970 function_enter(kWho);
971
972 int hlen=0;
973 if (!is_semi_sync_slave())
974 {
975 hlen= 0;
976 }
977 else
978 {
979 /* No enough space for the extra header, disable semi-sync master */
980 if (sizeof(kSyncHeader) > size)
981 {
982 sql_print_warning("No enough space in the packet "
983 "for semi-sync extra header, "
984 "semi-sync replication disabled");
985 disableMaster();
986 return 0;
987 }
988
989 /* Set the magic number and the sync status. By default, no sync
990 * is required.
991 */
992 memcpy(header, kSyncHeader, sizeof(kSyncHeader));
993 hlen= sizeof(kSyncHeader);
994 }
995 return function_exit(kWho, hlen);
996 }
997
updateSyncHeader(unsigned char * packet,const char * log_file_name,my_off_t log_file_pos,uint32 server_id)998 int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
999 const char *log_file_name,
1000 my_off_t log_file_pos,
1001 uint32 server_id)
1002 {
1003 const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
1004 int cmp = 0;
1005 bool sync = false;
1006
1007 /* If the semi-sync master is not enabled, or the slave is not a semi-sync
1008 * target, do not request replies from the slave.
1009 */
1010 if (!getMasterEnabled() || !is_semi_sync_slave())
1011 return 0;
1012
1013 function_enter(kWho);
1014
1015 lock();
1016
1017 /* This is the real check inside the mutex. */
1018 if (!getMasterEnabled())
1019 goto l_end; // sync= false at this point in time
1020
1021 if (is_on())
1022 {
1023 /* semi-sync is ON */
1024 /* sync= false; No sync unless a transaction is involved. */
1025
1026 if (reply_file_name_inited_)
1027 {
1028 cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1029 reply_file_name_, reply_file_pos_);
1030 if (cmp <= 0)
1031 {
1032 /* If we have already got the reply for the event, then we do
1033 * not need to sync the transaction again.
1034 */
1035 goto l_end;
1036 }
1037 }
1038
1039 if (wait_file_name_inited_)
1040 {
1041 cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1042 wait_file_name_, wait_file_pos_);
1043 }
1044 else
1045 {
1046 cmp = 1;
1047 }
1048
1049 /* If we are already waiting for some transaction replies which
1050 * are later in binlog, do not wait for this one event.
1051 */
1052 if (cmp >= 0)
1053 {
1054 /*
1055 * We only wait if the event is a transaction's ending event.
1056 */
1057 assert(active_tranxs_ != NULL);
1058 sync = active_tranxs_->is_tranx_end_pos(log_file_name,
1059 log_file_pos);
1060 }
1061 }
1062 else
1063 {
1064 if (commit_file_name_inited_)
1065 {
1066 int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1067 commit_file_name_, commit_file_pos_);
1068 sync = (cmp >= 0);
1069 }
1070 else
1071 {
1072 sync = true;
1073 }
1074 }
1075
1076 if (trace_level_ & kTraceDetail)
1077 sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
1078 kWho, server_id, log_file_name,
1079 (unsigned long)log_file_pos, sync, (int)is_on());
1080
1081 l_end:
1082 unlock();
1083
1084 /* We do not need to clear sync flag because we set it to 0 when we
1085 * reserve the packet header.
1086 */
1087 if (sync)
1088 {
1089 (packet)[2] = kPacketFlagSync;
1090 }
1091
1092 return function_exit(kWho, 0);
1093 }
1094
writeTranxInBinlog(const char * log_file_name,my_off_t log_file_pos)1095 int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
1096 my_off_t log_file_pos)
1097 {
1098 const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog";
1099 int result = 0;
1100
1101 function_enter(kWho);
1102
1103 lock();
1104
1105 /* This is the real check inside the mutex. */
1106 if (!getMasterEnabled())
1107 goto l_end;
1108
1109 /* Update the 'largest' transaction commit position seen so far even
1110 * though semi-sync is switched off.
1111 * It is much better that we update commit_file_* here, instead of
1112 * inside commitTrx(). This is mostly because updateSyncHeader()
1113 * will watch for commit_file_* to decide whether to switch semi-sync
1114 * on. The detailed reason is explained in function updateSyncHeader().
1115 */
1116 if (commit_file_name_inited_)
1117 {
1118 int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1119 commit_file_name_, commit_file_pos_);
1120 if (cmp > 0)
1121 {
1122 /* This is a larger position, let's update the maximum info. */
1123 strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1124 commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
1125 commit_file_pos_ = log_file_pos;
1126 }
1127 }
1128 else
1129 {
1130 strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1131 commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
1132 commit_file_pos_ = log_file_pos;
1133 commit_file_name_inited_ = true;
1134 }
1135
1136 if (is_on())
1137 {
1138 assert(active_tranxs_ != NULL);
1139 if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
1140 {
1141 /*
1142 if insert tranx_node failed, print a warning message
1143 and turn off semi-sync
1144 */
1145 sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
1146 log_file_name, (ulong)log_file_pos);
1147 switch_off();
1148 }
1149 }
1150
1151 l_end:
1152 unlock();
1153
1154 return function_exit(kWho, result);
1155 }
1156
skipSlaveReply(const char * event_buf,uint32 server_id,const char * skipped_log_file,my_off_t skipped_log_pos)1157 int ReplSemiSyncMaster::skipSlaveReply(const char *event_buf,
1158 uint32 server_id,
1159 const char* skipped_log_file,
1160 my_off_t skipped_log_pos)
1161 {
1162 const char *kWho = "ReplSemiSyncMaster::skipSlaveReply";
1163
1164 function_enter(kWho);
1165
1166 assert((unsigned char)event_buf[1] == kPacketMagicNum);
1167 if ((unsigned char)event_buf[2] != kPacketFlagSync)
1168 {
1169 /* current event would not require a reply anyway */
1170 goto l_end;
1171 }
1172
1173 reportReplyBinlog(server_id, skipped_log_file,
1174 skipped_log_pos, true);
1175
1176 l_end:
1177 return function_exit(kWho, 0);
1178 }
1179
readSlaveReply(NET * net,uint32 server_id,const char * event_buf)1180 int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
1181 const char *event_buf)
1182 {
1183 const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
1184 const unsigned char *packet;
1185 char log_file_name[FN_REFLEN];
1186 my_off_t log_file_pos;
1187 ulong log_file_len = 0;
1188 ulong packet_len;
1189 int result = -1;
1190
1191 struct timespec start_ts= { 0, 0 };
1192 ulong trc_level = trace_level_;
1193
1194 function_enter(kWho);
1195
1196 assert((unsigned char)event_buf[1] == kPacketMagicNum);
1197 if ((unsigned char)event_buf[2] != kPacketFlagSync)
1198 {
1199 /* current event does not require reply */
1200 result = 0;
1201 goto l_end;
1202 }
1203
1204 if (trc_level & kTraceNetWait)
1205 set_timespec(start_ts, 0);
1206
1207 /* We flush to make sure that the current event is sent to the network,
1208 * instead of being buffered in the TCP/IP stack.
1209 */
1210 if (net_flush(net))
1211 {
1212 sql_print_error("Semi-sync master failed on net_flush() "
1213 "before waiting for slave reply");
1214 goto l_end;
1215 }
1216
1217 net_clear(net, 0);
1218 if (trc_level & kTraceDetail)
1219 sql_print_information("%s: Wait for replica's reply", kWho);
1220
1221 /* Wait for the network here. Though binlog dump thread can indefinitely wait
1222 * here, transactions would not wait indefintely.
1223 * Transactions wait on binlog replies detected by binlog dump threads. If
1224 * binlog dump threads wait too long, transactions will timeout and continue.
1225 */
1226 packet_len = my_net_read(net);
1227
1228 if (trc_level & kTraceNetWait)
1229 {
1230 int wait_time = getWaitTime(start_ts);
1231 if (wait_time < 0)
1232 {
1233 sql_print_information("Assessment of waiting time for "
1234 "readSlaveReply failed.");
1235 rpl_semi_sync_master_timefunc_fails++;
1236 }
1237 else
1238 {
1239 rpl_semi_sync_master_net_wait_num++;
1240 rpl_semi_sync_master_net_wait_time += wait_time;
1241 }
1242 }
1243
1244 if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
1245 {
1246 if (packet_len == packet_error)
1247 sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
1248 net->last_error, net->last_errno);
1249 else
1250 sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
1251 net->last_error, net->last_errno);
1252 goto l_end;
1253 }
1254
1255 packet = net->read_pos;
1256 if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
1257 {
1258 sql_print_error("Read semi-sync reply magic number error");
1259 goto l_end;
1260 }
1261
1262 log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
1263 log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
1264 if (log_file_len >= FN_REFLEN)
1265 {
1266 sql_print_error("Read semi-sync reply binlog file length too large");
1267 goto l_end;
1268 }
1269 strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
1270 log_file_name[log_file_len] = 0;
1271
1272 if (trc_level & kTraceDetail)
1273 sql_print_information("%s: Got reply (%s, %lu)",
1274 kWho, log_file_name, (ulong)log_file_pos);
1275
1276 result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
1277
1278 l_end:
1279 return function_exit(kWho, result);
1280 }
1281
1282
resetMaster()1283 int ReplSemiSyncMaster::resetMaster()
1284 {
1285 const char *kWho = "ReplSemiSyncMaster::resetMaster";
1286 int result = 0;
1287
1288 function_enter(kWho);
1289
1290
1291 lock();
1292
1293 state_ = getMasterEnabled()? 1 : 0;
1294
1295 wait_file_name_inited_ = false;
1296 reply_file_name_inited_ = false;
1297 commit_file_name_inited_ = false;
1298
1299 rpl_semi_sync_master_yes_transactions = 0;
1300 rpl_semi_sync_master_no_transactions = 0;
1301 rpl_semi_sync_master_off_times = 0;
1302 rpl_semi_sync_master_timefunc_fails = 0;
1303 rpl_semi_sync_master_wait_sessions = 0;
1304 rpl_semi_sync_master_wait_pos_backtraverse = 0;
1305 rpl_semi_sync_master_trx_wait_num = 0;
1306 rpl_semi_sync_master_trx_wait_time = 0;
1307 rpl_semi_sync_master_net_wait_num = 0;
1308 rpl_semi_sync_master_net_wait_time = 0;
1309
1310 unlock();
1311
1312 return function_exit(kWho, result);
1313 }
1314
setExportStats()1315 void ReplSemiSyncMaster::setExportStats()
1316 {
1317 lock();
1318
1319 rpl_semi_sync_master_status = state_;
1320 rpl_semi_sync_master_avg_trx_wait_time=
1321 ((rpl_semi_sync_master_trx_wait_num) ?
1322 (unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
1323 ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
1324 rpl_semi_sync_master_avg_net_wait_time=
1325 ((rpl_semi_sync_master_net_wait_num) ?
1326 (unsigned long)((double)rpl_semi_sync_master_net_wait_time /
1327 ((double)rpl_semi_sync_master_net_wait_num)) : 0);
1328
1329 unlock();
1330 }
1331
1332 /* Get the waiting time given the wait's staring time.
1333 *
1334 * Return:
1335 * >= 0: the waiting time in microsecons(us)
1336 * < 0: error in get time or time back traverse
1337 */
getWaitTime(const struct timespec & start_ts)1338 static int getWaitTime(const struct timespec& start_ts)
1339 {
1340 unsigned long long start_usecs, end_usecs;
1341 struct timespec end_ts;
1342
1343 /* Starting time in microseconds(us). */
1344 start_usecs = timespec_to_usec(&start_ts);
1345
1346 /* Get the wait time interval. */
1347 set_timespec(end_ts, 0);
1348
1349 /* Ending time in microseconds(us). */
1350 end_usecs = timespec_to_usec(&end_ts);
1351
1352 if (end_usecs < start_usecs)
1353 return -1;
1354
1355 return (int)(end_usecs - start_usecs);
1356 }
1357