1 /* Copyright (C) 2007 Google Inc.
2    Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
23 
24 
25 #include "semisync_master.h"
26 #if defined(ENABLED_DEBUG_SYNC)
27 #include "debug_sync.h"
28 #include "sql_class.h"
29 #endif
30 
/* Unit-conversion factors used in the timespec and timeout arithmetic
   (timespec_to_usec(), commitTrx()). */
#define TIME_THOUSAND 1000
#define TIME_MILLION  1000000
#define TIME_BILLION  1000000000

/* This indicates whether semi-synchronous replication is enabled. */
char rpl_semi_sync_master_enabled;
/* Configured wait timeout; initObject() hands it to setWaitTimeout().
   commitTrx() treats the resulting wait_timeout_ as milliseconds. */
unsigned long rpl_semi_sync_master_timeout;
/* Configured trace level; initObject() hands it to setTraceLevel(). */
unsigned long rpl_semi_sync_master_trace_level;
/* NOTE(review): not written by the code visible in this chunk; presumably
   reports whether semi-sync is currently switched on - confirm. */
char rpl_semi_sync_master_status                    = 0;
/* Commits that commitTrx() counted as semi-sync / not semi-sync. */
unsigned long rpl_semi_sync_master_yes_transactions = 0;
unsigned long rpl_semi_sync_master_no_transactions  = 0;
/* Number of times switch_off() has run. */
unsigned long rpl_semi_sync_master_off_times        = 0;
/* Waits in commitTrx() whose duration getWaitTime() failed to measure. */
unsigned long rpl_semi_sync_master_timefunc_fails   = 0;
/* Waits in commitTrx() that ended without an acknowledgement (timeout). */
unsigned long rpl_semi_sync_master_wait_timeouts     = 0;
/* Sessions currently blocked in commitTrx() waiting for a slave reply. */
unsigned long rpl_semi_sync_master_wait_sessions    = 0;
/* Times commitTrx() moved the minimum wait position backwards. */
unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0;
/* Wait statistics.  commitTrx() updates _trx_wait_num and _trx_wait_time;
   the averages and the network counters are not updated by the code visible
   here (presumably maintained elsewhere - confirm). */
unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
unsigned long rpl_semi_sync_master_avg_net_wait_time    = 0;
unsigned long long rpl_semi_sync_master_net_wait_num = 0;
/* Connected semi-sync slaves, maintained by add_slave()/remove_slave(). */
unsigned long rpl_semi_sync_master_clients          = 0;
unsigned long long rpl_semi_sync_master_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
/* When 0, remove_slave() switches semi-sync off as soon as the last
   semi-sync slave disconnects instead of continuing to wait. */
char rpl_semi_sync_master_wait_no_slave = 1;


/* Time waited since start_ts (presumably microseconds - confirm against the
   definition).  commitTrx() treats a negative result as a failed measurement. */
static int getWaitTime(const struct timespec& start_ts);
58 
timespec_to_usec(const struct timespec * ts)59 static unsigned long long timespec_to_usec(const struct timespec *ts)
60 {
61 #ifdef HAVE_STRUCT_TIMESPEC
62   return (unsigned long long) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
63 #else
64   return ts->tv.i64 / 10;
65 #endif /* __WIN__ */
66 }
67 
68 /*******************************************************************************
69  *
70  * <ActiveTranx> class : manage all active transaction nodes
71  *
72  ******************************************************************************/
73 
ActiveTranx(mysql_mutex_t * lock,unsigned long trace_level)74 ActiveTranx::ActiveTranx(mysql_mutex_t *lock,
75 			 unsigned long trace_level)
76   : Trace(trace_level), allocator_(max_connections),
77     num_entries_(max_connections << 1), /* Transaction hash table size
78                                          * is set to double the size
79                                          * of max_connections */
80     lock_(lock)
81 {
82   /* No transactions are in the list initially. */
83   trx_front_ = NULL;
84   trx_rear_  = NULL;
85 
86   /* Create the hash table to find a transaction's ending event. */
87   trx_htb_ = new TranxNode *[num_entries_];
88   for (int idx = 0; idx < num_entries_; ++idx)
89     trx_htb_[idx] = NULL;
90 
91   sql_print_information("Semi-sync replication initialized for transactions.");
92 }
93 
~ActiveTranx()94 ActiveTranx::~ActiveTranx()
95 {
96   delete [] trx_htb_;
97   trx_htb_          = NULL;
98   num_entries_      = 0;
99 }
100 
calc_hash(const unsigned char * key,unsigned int length)101 unsigned int ActiveTranx::calc_hash(const unsigned char *key,
102                                     unsigned int length)
103 {
104   unsigned int nr = 1, nr2 = 4;
105 
106   /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
107   while (length--)
108   {
109     nr  ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
110     nr2 += 3;
111   }
112   return((unsigned int) nr);
113 }
114 
get_hash_value(const char * log_file_name,my_off_t log_file_pos)115 unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
116 				 my_off_t    log_file_pos)
117 {
118   unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
119                                  strlen(log_file_name));
120   unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
121                                  sizeof(log_file_pos));
122 
123   return (hash1 + hash2) % num_entries_;
124 }
125 
compare(const char * log_file_name1,my_off_t log_file_pos1,const char * log_file_name2,my_off_t log_file_pos2)126 int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
127 			 const char *log_file_name2, my_off_t log_file_pos2)
128 {
129   int cmp = strcmp(log_file_name1, log_file_name2);
130 
131   if (cmp != 0)
132     return cmp;
133 
134   if (log_file_pos1 > log_file_pos2)
135     return 1;
136   else if (log_file_pos1 < log_file_pos2)
137     return -1;
138   return 0;
139 }
140 
insert_tranx_node(const char * log_file_name,my_off_t log_file_pos)141 int ActiveTranx::insert_tranx_node(const char *log_file_name,
142 				   my_off_t log_file_pos)
143 {
144   const char *kWho = "ActiveTranx:insert_tranx_node";
145   TranxNode  *ins_node;
146   int         result = 0;
147   unsigned int        hash_val;
148 
149   function_enter(kWho);
150 
151   ins_node = allocator_.allocate_node();
152   if (!ins_node)
153   {
154     sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
155                     kWho, log_file_name, (unsigned long)log_file_pos);
156     result = -1;
157     goto l_end;
158   }
159 
160   /* insert the binlog position in the active transaction list. */
161   strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
162   ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
163   ins_node->log_pos_ = log_file_pos;
164 
165   if (!trx_front_)
166   {
167     /* The list is empty. */
168     trx_front_ = trx_rear_ = ins_node;
169   }
170   else
171   {
172     int cmp = compare(ins_node, trx_rear_);
173     if (cmp > 0)
174     {
175       /* Compare with the tail first.  If the transaction happens later in
176        * binlog, then make it the new tail.
177        */
178       trx_rear_->next_ = ins_node;
179       trx_rear_        = ins_node;
180     }
181     else
182     {
183       /* Otherwise, it is an error because the transaction should hold the
184        * mysql_bin_log.LOCK_log when appending events.
185        */
186       sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
187                       "new node (%s, %lu)", kWho,
188                       trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_,
189                       ins_node->log_name_, (unsigned long)ins_node->log_pos_);
190       result = -1;
191       goto l_end;
192     }
193   }
194 
195   hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
196   ins_node->hash_next_ = trx_htb_[hash_val];
197   trx_htb_[hash_val]   = ins_node;
198 
199   if (trace_level_ & kTraceDetail)
200     sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
201                           ins_node->log_name_, (unsigned long)ins_node->log_pos_,
202                           hash_val);
203 
204  l_end:
205   return function_exit(kWho, result);
206 }
207 
is_tranx_end_pos(const char * log_file_name,my_off_t log_file_pos)208 bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
209 				   my_off_t    log_file_pos)
210 {
211   const char *kWho = "ActiveTranx::is_tranx_end_pos";
212   function_enter(kWho);
213 
214   unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
215   TranxNode *entry = trx_htb_[hash_val];
216 
217   while (entry != NULL)
218   {
219     if (compare(entry, log_file_name, log_file_pos) == 0)
220       break;
221 
222     entry = entry->hash_next_;
223   }
224 
225   if (trace_level_ & kTraceDetail)
226     sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
227                           log_file_name, (unsigned long)log_file_pos, hash_val);
228 
229   function_exit(kWho, (entry != NULL));
230   return (entry != NULL);
231 }
232 
signal_waiting_sessions_all()233 int ActiveTranx::signal_waiting_sessions_all()
234 {
235   const char *kWho = "ActiveTranx::signal_waiting_sessions_all";
236   function_enter(kWho);
237   for (TranxNode* entry= trx_front_; entry; entry=entry->next_)
238     mysql_cond_broadcast(&entry->cond);
239 
240   return function_exit(kWho, 0);
241 }
242 
signal_waiting_sessions_up_to(const char * log_file_name,my_off_t log_file_pos)243 int ActiveTranx::signal_waiting_sessions_up_to(const char *log_file_name,
244                                                my_off_t log_file_pos)
245 {
246   const char *kWho = "ActiveTranx::signal_waiting_sessions_up_to";
247   function_enter(kWho);
248 
249   TranxNode* entry= trx_front_;
250   int cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
251   while (entry && cmp <= 0)
252   {
253     mysql_cond_broadcast(&entry->cond);
254     entry= entry->next_;
255     if (entry)
256       cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
257   }
258 
259   return function_exit(kWho, (entry != NULL));
260 }
261 
find_active_tranx_node(const char * log_file_name,my_off_t log_file_pos)262 TranxNode * ActiveTranx::find_active_tranx_node(const char *log_file_name,
263                                                 my_off_t log_file_pos)
264 {
265   const char *kWho = "ActiveTranx::find_active_tranx_node";
266   function_enter(kWho);
267 
268   TranxNode* entry= trx_front_;
269 
270   while (entry)
271   {
272     if (ActiveTranx::compare(log_file_name, log_file_pos, entry->log_name_,
273                              entry->log_pos_) <= 0)
274       break;
275     entry= entry->next_;
276   }
277   function_exit(kWho, 0);
278   return entry;
279 }
280 
clear_active_tranx_nodes(const char * log_file_name,my_off_t log_file_pos)281 int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
282 					  my_off_t log_file_pos)
283 {
284   const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
285   TranxNode *new_front;
286 
287   function_enter(kWho);
288 
289   if (log_file_name != NULL)
290   {
291     new_front = trx_front_;
292 
293     while (new_front)
294     {
295       if (compare(new_front, log_file_name, log_file_pos) > 0 ||
296           new_front->n_waiters > 0)
297         break;
298       new_front = new_front->next_;
299     }
300   }
301   else
302   {
303     /* If log_file_name is NULL, clear everything. */
304     new_front = NULL;
305   }
306 
307   if (new_front == NULL)
308   {
309     /* No active transaction nodes after the call. */
310 
311     /* Clear the hash table. */
312     memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
313     allocator_.free_all_nodes();
314 
315     /* Clear the active transaction list. */
316     if (trx_front_ != NULL)
317     {
318       trx_front_ = NULL;
319       trx_rear_  = NULL;
320     }
321 
322     if (trace_level_ & kTraceDetail)
323       sql_print_information("%s: cleared all nodes", kWho);
324   }
325   else if (new_front != trx_front_)
326   {
327     TranxNode *curr_node, *next_node;
328 
329     /* Delete all transaction nodes before the confirmation point. */
330     int n_frees = 0;
331     curr_node = trx_front_;
332     while (curr_node != new_front)
333     {
334       next_node = curr_node->next_;
335       n_frees++;
336 
337       /* Remove the node from the hash table. */
338       unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
339       TranxNode **hash_ptr = &(trx_htb_[hash_val]);
340       while ((*hash_ptr) != NULL)
341       {
342         if ((*hash_ptr) == curr_node)
343 	{
344           (*hash_ptr) = curr_node->hash_next_;
345           break;
346         }
347         hash_ptr = &((*hash_ptr)->hash_next_);
348       }
349 
350       curr_node = next_node;
351     }
352 
353     trx_front_ = new_front;
354     allocator_.free_nodes_before(trx_front_);
355 
356     if (trace_level_ & kTraceDetail)
357       sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
358                             kWho, n_frees,
359                             trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
360   }
361 
362   return function_exit(kWho, 0);
363 }
364 
365 
366 /*******************************************************************************
367  *
368  * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master.
369  * <ReplSemiSyncSlave>  class: the basic code layer for sync-replication slave.
370  *
 * The most important functions during semi-sync replication are listed below:
372  *
373  * Master:
374  *  . reportReplyBinlog():  called by the binlog dump thread when it receives
375  *                          the slave's status information.
376  *  . updateSyncHeader():   based on transaction waiting information, decide
377  *                          whether to request the slave to reply.
378  *  . writeTranxInBinlog(): called by the transaction thread when it finishes
379  *                          writing all transaction events in binlog.
 *  . commitTrx():          the transaction thread waits for the slave reply.
381  *
382  * Slave:
383  *  . slaveReadSyncHeader(): read the semi-sync header from the master, get the
384  *                           sync status and get the payload for events.
385  *  . slaveReply():          reply to the master about the replication progress.
386  *
387  ******************************************************************************/
388 
ReplSemiSyncMaster()389 ReplSemiSyncMaster::ReplSemiSyncMaster()
390   : active_tranxs_(NULL),
391     init_done_(false),
392     reply_file_name_inited_(false),
393     reply_file_pos_(0L),
394     wait_file_name_inited_(false),
395     wait_file_pos_(0),
396     master_enabled_(false),
397     wait_timeout_(0L),
398     state_(0)
399 {
400   strcpy(reply_file_name_, "");
401   strcpy(wait_file_name_, "");
402 }
403 
initObject()404 int ReplSemiSyncMaster::initObject()
405 {
406   int result;
407   const char *kWho = "ReplSemiSyncMaster::initObject";
408 
409   if (init_done_)
410   {
411     fprintf(stderr, "%s called twice\n", kWho);
412     return 1;
413   }
414   init_done_ = true;
415 
416   /* References to the parameter works after set_options(). */
417   setWaitTimeout(rpl_semi_sync_master_timeout);
418   setTraceLevel(rpl_semi_sync_master_trace_level);
419 
420   /* Mutex initialization can only be done after MY_INIT(). */
421   mysql_mutex_init(key_ss_mutex_LOCK_binlog_,
422                    &LOCK_binlog_, MY_MUTEX_INIT_FAST);
423 
424   if (rpl_semi_sync_master_enabled)
425     result = enableMaster();
426   else
427     result = disableMaster();
428 
429   return result;
430 }
431 
enableMaster()432 int ReplSemiSyncMaster::enableMaster()
433 {
434   int result = 0;
435 
436   /* Must have the lock when we do enable of disable. */
437   lock();
438 
439   if (!getMasterEnabled())
440   {
441     if (active_tranxs_ == NULL)
442       active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
443 
444     if (active_tranxs_ != NULL)
445     {
446       commit_file_name_inited_ = false;
447       reply_file_name_inited_  = false;
448       wait_file_name_inited_   = false;
449 
450       set_master_enabled(true);
451       state_ = true;
452       sql_print_information("Semi-sync replication enabled on the master.");
453     }
454     else
455     {
456       sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
457       result = -1;
458     }
459   }
460 
461   unlock();
462 
463   return result;
464 }
465 
disableMaster()466 int ReplSemiSyncMaster::disableMaster()
467 {
468   /* Must have the lock when we do enable of disable. */
469   lock();
470 
471   if (getMasterEnabled())
472   {
473     /* Switch off the semi-sync first so that waiting transaction will be
474      * waken up.
475      */
476     switch_off();
477 
478     if ( active_tranxs_ && active_tranxs_->is_empty())
479     {
480       delete active_tranxs_;
481       active_tranxs_ = NULL;
482     }
483 
484     reply_file_name_inited_ = false;
485     wait_file_name_inited_  = false;
486     commit_file_name_inited_ = false;
487 
488     set_master_enabled(false);
489     sql_print_information("Semi-sync replication disabled on the master.");
490   }
491 
492   unlock();
493 
494   return 0;
495 }
496 
~ReplSemiSyncMaster()497 ReplSemiSyncMaster::~ReplSemiSyncMaster()
498 {
499   if (init_done_)
500   {
501     mysql_mutex_destroy(&LOCK_binlog_);
502   }
503 
504   delete active_tranxs_;
505 }
506 
lock()507 void ReplSemiSyncMaster::lock()
508 {
509   mysql_mutex_lock(&LOCK_binlog_);
510 }
511 
unlock()512 void ReplSemiSyncMaster::unlock()
513 {
514   mysql_mutex_unlock(&LOCK_binlog_);
515 }
516 
add_slave()517 void ReplSemiSyncMaster::add_slave()
518 {
519   lock();
520   rpl_semi_sync_master_clients++;
521   unlock();
522 }
523 
remove_slave()524 void ReplSemiSyncMaster::remove_slave()
525 {
526   lock();
527   rpl_semi_sync_master_clients--;
528 
529   /* Only switch off if semi-sync is enabled and is on */
530   if (getMasterEnabled() && is_on())
531   {
532     /* If user has chosen not to wait or if the server is shutting down if no
533        semi-sync slave available and the last semi-sync slave exits, turn off
534        semi-sync on master immediately.
535      */
536     if (rpl_semi_sync_master_clients == 0 &&
537         (!rpl_semi_sync_master_wait_no_slave || abort_loop))
538     {
539       if (abort_loop)
540       {
541         if (commit_file_name_inited_ && reply_file_name_inited_)
542         {
543           int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_ ,
544                                          commit_file_name_, commit_file_pos_);
545           if (cmp < 0)
546             sql_print_warning("SEMISYNC: Forced shutdown. Some updates might "
547                               "not be replicated.");
548         }
549       }
550       switch_off();
551     }
552   }
553   unlock();
554 }
555 
is_semi_sync_slave()556 bool ReplSemiSyncMaster::is_semi_sync_slave()
557 {
558   int null_value;
559   long long val= 0;
560   get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
561   return val;
562 }
563 
reportReplyBinlog(uint32 server_id,const char * log_file_name,my_off_t log_file_pos,bool skipped_event)564 int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
565                                           const char *log_file_name,
566                                           my_off_t log_file_pos,
567                                           bool skipped_event)
568 {
569   const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
570   int   cmp;
571   bool  can_release_threads = false;
572   bool  need_copy_send_pos = true;
573 
574   if (!(getMasterEnabled()))
575     return 0;
576 
577   function_enter(kWho);
578 
579   lock();
580 
581   /* This is the real check inside the mutex. */
582   if (!getMasterEnabled())
583     goto l_end;
584 
585   if (!is_on())
586     /* We check to see whether we can switch semi-sync ON. */
587     try_switch_on(server_id, log_file_name, log_file_pos);
588 
589   /* The position should increase monotonically, if there is only one
590    * thread sending the binlog to the slave.
591    * In reality, to improve the transaction availability, we allow multiple
592    * sync replication slaves.  So, if any one of them get the transaction,
593    * the transaction session in the primary can move forward.
594    */
595   if (reply_file_name_inited_)
596   {
597     cmp = ActiveTranx::compare(log_file_name, log_file_pos,
598                                reply_file_name_, reply_file_pos_);
599 
600     /* If the requested position is behind the sending binlog position,
601      * would not adjust sending binlog position.
602      * We based on the assumption that there are multiple semi-sync slave,
603      * and at least one of them shou/ld be up to date.
604      * If all semi-sync slaves are behind, at least initially, the primary
605      * can find the situation after the waiting timeout.  After that, some
606      * slaves should catch up quickly.
607      */
608     if (cmp < 0)
609     {
610       /* If the position is behind, do not copy it. */
611       need_copy_send_pos = false;
612     }
613   }
614 
615   if (need_copy_send_pos)
616   {
617     strncpy(reply_file_name_, log_file_name, sizeof(reply_file_name_) - 1);
618     reply_file_name_[sizeof(reply_file_name_) - 1]= '\0';
619     reply_file_pos_ = log_file_pos;
620     reply_file_name_inited_ = true;
621 
622     if (trace_level_ & kTraceDetail)
623     {
624       if(!skipped_event)
625         sql_print_information("%s: Got reply at (%s, %lu)", kWho,
626                             log_file_name, (unsigned long)log_file_pos);
627       else
628         sql_print_information("%s: Transaction skipped at (%s, %lu)", kWho,
629                             log_file_name, (unsigned long)log_file_pos);
630     }
631   }
632 
633   if (rpl_semi_sync_master_wait_sessions > 0)
634   {
635     /* Let us check if some of the waiting threads doing a trx
636      * commit can now proceed.
637      */
638     cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
639                                wait_file_name_, wait_file_pos_);
640     if (cmp >= 0)
641     {
642       /* Yes, at least one waiting thread can now proceed:
643        * let us release all waiting threads with a broadcast
644        */
645       can_release_threads = true;
646       wait_file_name_inited_ = false;
647     }
648   }
649 
650  l_end:
651 
652   if (can_release_threads)
653   {
654     if (trace_level_ & kTraceDetail)
655       sql_print_information("%s: signal all waiting threads.", kWho);
656     active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_);
657   }
658   unlock();
659   return function_exit(kWho, 0);
660 }
661 
commitTrx(const char * trx_wait_binlog_name,my_off_t trx_wait_binlog_pos)662 int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
663 				  my_off_t trx_wait_binlog_pos)
664 {
665   const char *kWho = "ReplSemiSyncMaster::commitTrx";
666 
667   function_enter(kWho);
668   PSI_stage_info old_stage;
669 
670 #if defined(ENABLED_DEBUG_SYNC)
671   /* debug sync may not be initialized for a master */
672   if (current_thd->debug_sync_control)
673     DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
674 #endif
675   /* Acquire the mutex. */
676   lock();
677 
678   TranxNode* entry= NULL;
679   mysql_cond_t* thd_cond= NULL;
680   bool is_semi_sync_trans= true;
681   if (active_tranxs_ != NULL && trx_wait_binlog_name)
682   {
683     entry=
684       active_tranxs_->find_active_tranx_node(trx_wait_binlog_name,
685                                              trx_wait_binlog_pos);
686     if (entry)
687       thd_cond= &entry->cond;
688   }
689   /* This must be called after acquired the lock */
690   THD_ENTER_COND(NULL, thd_cond, &LOCK_binlog_,
691                  & stage_waiting_for_semi_sync_ack_from_slave,
692                  & old_stage);
693 
694   if (getMasterEnabled() && trx_wait_binlog_name)
695   {
696     struct timespec start_ts;
697     struct timespec abstime;
698     int wait_result;
699 
700     set_timespec(start_ts, 0);
701     /* This is the real check inside the mutex. */
702     if (!getMasterEnabled() || !is_on())
703       goto l_end;
704 
705     if (trace_level_ & kTraceDetail)
706     {
707       sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
708                             trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
709                             (int)is_on());
710     }
711 
712     /* Calcuate the waiting period. */
713 #ifndef HAVE_STRUCT_TIMESPEC
714       abstime.tv.i64 = start_ts.tv.i64 + (__int64)wait_timeout_ * TIME_THOUSAND * 10;
715       abstime.max_timeout_msec= (long)wait_timeout_;
716 #else
717       abstime.tv_sec = start_ts.tv_sec + wait_timeout_ / TIME_THOUSAND;
718       abstime.tv_nsec = start_ts.tv_nsec +
719         (wait_timeout_ % TIME_THOUSAND) * TIME_MILLION;
720       if (abstime.tv_nsec >= TIME_BILLION)
721       {
722         abstime.tv_sec++;
723         abstime.tv_nsec -= TIME_BILLION;
724       }
725 #endif /* __WIN__ */
726 
727     while (is_on())
728     {
729       if (reply_file_name_inited_)
730       {
731         int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
732                                        trx_wait_binlog_name, trx_wait_binlog_pos);
733         if (cmp >= 0)
734         {
735           /* We have already sent the relevant binlog to the slave: no need to
736            * wait here.
737            */
738           if (trace_level_ & kTraceDetail)
739             sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
740                                   kWho, reply_file_name_, (unsigned long)reply_file_pos_);
741           break;
742         }
743       }
744       /*
745         When code reaches here an Entry object may not be present in the
746         following scenario.
747 
748         Semi sync was not enabled when transaction entered into ordered_commit
749         process. During flush stage, semi sync was not enabled and there was no
750         'Entry' object created for the transaction being committed and at a
751         later stage it was enabled. In this case trx_wait_binlog_name and
752         trx_wait_binlog_pos are set but the 'Entry' object is not present. Hence
753         dump thread will not wait for reply from slave and it will not update
754         reply_file_name. In such case the committing transaction should not wait
755         for an ack from slave and it should be considered as an async
756         transaction.
757       */
758       if (!entry)
759       {
760         is_semi_sync_trans= false;
761         goto l_end;
762       }
763 
764       /* Let us update the info about the minimum binlog position of waiting
765        * threads.
766        */
767       if (wait_file_name_inited_)
768       {
769         int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
770                                        wait_file_name_, wait_file_pos_);
771         if (cmp <= 0)
772 	{
773           /* This thd has a lower position, let's update the minimum info. */
774           strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1);
775           wait_file_name_[sizeof(wait_file_name_) - 1]= '\0';
776           wait_file_pos_ = trx_wait_binlog_pos;
777 
778           rpl_semi_sync_master_wait_pos_backtraverse++;
779           if (trace_level_ & kTraceDetail)
780             sql_print_information("%s: move back wait position (%s, %lu),",
781                                   kWho, wait_file_name_, (unsigned long)wait_file_pos_);
782         }
783       }
784       else
785       {
786         strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1);
787         wait_file_name_[sizeof(wait_file_name_) - 1]= '\0';
788         wait_file_pos_ = trx_wait_binlog_pos;
789         wait_file_name_inited_ = true;
790 
791         if (trace_level_ & kTraceDetail)
792           sql_print_information("%s: init wait position (%s, %lu),",
793                                 kWho, wait_file_name_, (unsigned long)wait_file_pos_);
794       }
795 
796       /* In semi-synchronous replication, we wait until the binlog-dump
797        * thread has received the reply on the relevant binlog segment from the
798        * replication slave.
799        *
800        * Let us suspend this thread to wait on the condition;
801        * when replication has progressed far enough, we will release
802        * these waiting threads.
803        */
804       if (abort_loop && rpl_semi_sync_master_clients == 0 && is_on())
805       {
806         sql_print_warning("SEMISYNC: Forced shutdown. Some updates might "
807                           "not be replicated.");
808         switch_off();
809         break;
810       }
811       rpl_semi_sync_master_wait_sessions++;
812 
813       if (trace_level_ & kTraceDetail)
814         sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
815                               kWho, wait_timeout_,
816                               wait_file_name_, (unsigned long)wait_file_pos_);
817 
818       /* wait for the position to be ACK'ed back */
819       assert(entry);
820       entry->n_waiters++;
821       wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
822       entry->n_waiters--;
823       /*
824         After we release LOCK_binlog_ above while waiting for the condition,
825         it can happen that some other parallel client session executed
826         RESET MASTER. That can set rpl_semi_sync_master_wait_sessions to zero.
827         Hence check the value before decrementing it and decrement it only if it is
828         non-zero value.
829       */
830       if (rpl_semi_sync_master_wait_sessions > 0)
831         rpl_semi_sync_master_wait_sessions--;
832 
833       if (wait_result != 0)
834       {
835         /* This is a real wait timeout. */
836         sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
837                           "semi-sync up to file %s, position %lu.",
838                           trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
839                           reply_file_name_, (unsigned long)reply_file_pos_);
840         rpl_semi_sync_master_wait_timeouts++;
841 
842         /* switch semi-sync off */
843         switch_off();
844       }
845       else
846       {
847         int wait_time;
848 
849         wait_time = getWaitTime(start_ts);
850         if (wait_time < 0)
851         {
852           if (trace_level_ & kTraceGeneral)
853           {
854             sql_print_information("Assessment of waiting time for commitTrx "
855                                   "failed at wait position (%s, %lu)",
856                                   trx_wait_binlog_name,
857                                   (unsigned long)trx_wait_binlog_pos);
858           }
859           rpl_semi_sync_master_timefunc_fails++;
860         }
861         else
862         {
863           rpl_semi_sync_master_trx_wait_num++;
864           rpl_semi_sync_master_trx_wait_time += wait_time;
865         }
866       }
867     }
868 
869 l_end:
870     /* Update the status counter. */
871     if (is_on() && is_semi_sync_trans)
872       rpl_semi_sync_master_yes_transactions++;
873     else
874       rpl_semi_sync_master_no_transactions++;
875 
876   }
877 
878   /* Last waiter removes the TranxNode */
879   if (trx_wait_binlog_name && active_tranxs_
880       && entry && entry->n_waiters == 0)
881     active_tranxs_->clear_active_tranx_nodes(trx_wait_binlog_name,
882                                              trx_wait_binlog_pos);
883 
884   /* The lock held will be released by thd_exit_cond, so no need to
885     call unlock() here */
886   THD_EXIT_COND(NULL, & old_stage);
887   return function_exit(kWho, 0);
888 }
889 
/* Indicate that semi-sync replication is OFF now.
 *
 * What should we do when it is disabled?  The problem is that we want
 * the semi-sync replication enabled again when the slave catches up
 * later.  But, it is not that easy to detect that the slave has caught
 * up.  This is caused by the fact that MySQL's replication protocol is
 * asynchronous, meaning that if the master does not use the semi-sync
 * protocol, the slave would not send anything to the master.
 * Still, if the master is sending (N+1)-th event, we assume that it is
 * an indicator that the slave has received N-th event and earlier ones.
 *
 * While semi-sync is switched off, every transaction still updates the
 * largest committed binlog position (commit_file_name_/commit_file_pos_,
 * see writeTranxInBinlog()), but no transaction waits for a slave
 * acknowledgement.  In the binlog dump thread, updateSyncHeader() requests
 * a reply once the event being sent is at or past that position, and
 * try_switch_on() switches semi-sync back on once a given binlog position
 * has caught up with it.
 */
switch_off()908 int ReplSemiSyncMaster::switch_off()
909 {
910   const char *kWho = "ReplSemiSyncMaster::switch_off";
911 
912   function_enter(kWho);
913   state_ = false;
914 
915   rpl_semi_sync_master_off_times++;
916   wait_file_name_inited_   = false;
917   reply_file_name_inited_  = false;
918   sql_print_information("Semi-sync replication switched OFF.");
919 
920   /* signal waiting sessions */
921   active_tranxs_->signal_waiting_sessions_all();
922 
923   return function_exit(kWho, 0);
924 }
925 
try_switch_on(int server_id,const char * log_file_name,my_off_t log_file_pos)926 int ReplSemiSyncMaster::try_switch_on(int server_id,
927 				      const char *log_file_name,
928 				      my_off_t log_file_pos)
929 {
930   const char *kWho = "ReplSemiSyncMaster::try_switch_on";
931   bool semi_sync_on = false;
932 
933   function_enter(kWho);
934 
935   /* If the current sending event's position is larger than or equal to the
936    * 'largest' commit transaction binlog position, the slave is already
937    * catching up now and we can switch semi-sync on here.
938    * If commit_file_name_inited_ indicates there are no recent transactions,
939    * we can enable semi-sync immediately.
940    */
941   if (commit_file_name_inited_)
942   {
943     int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
944                                    commit_file_name_, commit_file_pos_);
945     semi_sync_on = (cmp >= 0);
946   }
947   else
948   {
949     semi_sync_on = true;
950   }
951 
952   if (semi_sync_on)
953   {
954     /* Switch semi-sync replication on. */
955     state_ = true;
956 
957     sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
958                           "at (%s, %lu)",
959                           server_id, log_file_name,
960                           (unsigned long)log_file_pos);
961   }
962 
963   return function_exit(kWho, 0);
964 }
965 
reserveSyncHeader(unsigned char * header,unsigned long size)966 int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
967 					  unsigned long size)
968 {
969   const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
970   function_enter(kWho);
971 
972   int hlen=0;
973   if (!is_semi_sync_slave())
974   {
975     hlen= 0;
976   }
977   else
978   {
979     /* No enough space for the extra header, disable semi-sync master */
980     if (sizeof(kSyncHeader) > size)
981     {
982       sql_print_warning("No enough space in the packet "
983                         "for semi-sync extra header, "
984                         "semi-sync replication disabled");
985       disableMaster();
986       return 0;
987     }
988 
989     /* Set the magic number and the sync status.  By default, no sync
990      * is required.
991      */
992     memcpy(header, kSyncHeader, sizeof(kSyncHeader));
993     hlen= sizeof(kSyncHeader);
994   }
995   return function_exit(kWho, hlen);
996 }
997 
updateSyncHeader(unsigned char * packet,const char * log_file_name,my_off_t log_file_pos,uint32 server_id)998 int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
999 					 const char *log_file_name,
1000 					 my_off_t log_file_pos,
1001 					 uint32 server_id)
1002 {
1003   const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
1004   int  cmp = 0;
1005   bool sync = false;
1006 
1007   /* If the semi-sync master is not enabled, or the slave is not a semi-sync
1008    * target, do not request replies from the slave.
1009    */
1010   if (!getMasterEnabled() || !is_semi_sync_slave())
1011     return 0;
1012 
1013   function_enter(kWho);
1014 
1015   lock();
1016 
1017   /* This is the real check inside the mutex. */
1018   if (!getMasterEnabled())
1019     goto l_end; // sync= false at this point in time
1020 
1021   if (is_on())
1022   {
1023     /* semi-sync is ON */
1024     /* sync= false; No sync unless a transaction is involved. */
1025 
1026     if (reply_file_name_inited_)
1027     {
1028       cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1029                                  reply_file_name_, reply_file_pos_);
1030       if (cmp <= 0)
1031       {
1032         /* If we have already got the reply for the event, then we do
1033          * not need to sync the transaction again.
1034          */
1035         goto l_end;
1036       }
1037     }
1038 
1039     if (wait_file_name_inited_)
1040     {
1041       cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1042                                  wait_file_name_, wait_file_pos_);
1043     }
1044     else
1045     {
1046       cmp = 1;
1047     }
1048 
1049     /* If we are already waiting for some transaction replies which
1050      * are later in binlog, do not wait for this one event.
1051      */
1052     if (cmp >= 0)
1053     {
1054       /*
1055        * We only wait if the event is a transaction's ending event.
1056        */
1057       assert(active_tranxs_ != NULL);
1058       sync = active_tranxs_->is_tranx_end_pos(log_file_name,
1059                                                log_file_pos);
1060     }
1061   }
1062   else
1063   {
1064     if (commit_file_name_inited_)
1065     {
1066       int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1067                                      commit_file_name_, commit_file_pos_);
1068       sync = (cmp >= 0);
1069     }
1070     else
1071     {
1072       sync = true;
1073     }
1074   }
1075 
1076   if (trace_level_ & kTraceDetail)
1077     sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
1078                           kWho, server_id, log_file_name,
1079                           (unsigned long)log_file_pos, sync, (int)is_on());
1080 
1081  l_end:
1082   unlock();
1083 
1084   /* We do not need to clear sync flag because we set it to 0 when we
1085    * reserve the packet header.
1086    */
1087   if (sync)
1088   {
1089     (packet)[2] = kPacketFlagSync;
1090   }
1091 
1092   return function_exit(kWho, 0);
1093 }
1094 
writeTranxInBinlog(const char * log_file_name,my_off_t log_file_pos)1095 int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
1096 					   my_off_t log_file_pos)
1097 {
1098   const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog";
1099   int result = 0;
1100 
1101   function_enter(kWho);
1102 
1103   lock();
1104 
1105   /* This is the real check inside the mutex. */
1106   if (!getMasterEnabled())
1107     goto l_end;
1108 
1109   /* Update the 'largest' transaction commit position seen so far even
1110    * though semi-sync is switched off.
1111    * It is much better that we update commit_file_* here, instead of
1112    * inside commitTrx().  This is mostly because updateSyncHeader()
1113    * will watch for commit_file_* to decide whether to switch semi-sync
1114    * on. The detailed reason is explained in function updateSyncHeader().
1115    */
1116   if (commit_file_name_inited_)
1117   {
1118     int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1119                                    commit_file_name_, commit_file_pos_);
1120     if (cmp > 0)
1121     {
1122       /* This is a larger position, let's update the maximum info. */
1123       strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1124       commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
1125       commit_file_pos_ = log_file_pos;
1126     }
1127   }
1128   else
1129   {
1130     strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1131     commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
1132     commit_file_pos_ = log_file_pos;
1133     commit_file_name_inited_ = true;
1134   }
1135 
1136   if (is_on())
1137   {
1138     assert(active_tranxs_ != NULL);
1139     if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
1140     {
1141       /*
1142         if insert tranx_node failed, print a warning message
1143         and turn off semi-sync
1144       */
1145       sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
1146                         log_file_name, (ulong)log_file_pos);
1147       switch_off();
1148     }
1149   }
1150 
1151  l_end:
1152   unlock();
1153 
1154   return function_exit(kWho, result);
1155 }
1156 
skipSlaveReply(const char * event_buf,uint32 server_id,const char * skipped_log_file,my_off_t skipped_log_pos)1157 int ReplSemiSyncMaster::skipSlaveReply(const char *event_buf,
1158                                        uint32 server_id,
1159                                        const char* skipped_log_file,
1160                                        my_off_t skipped_log_pos)
1161 {
1162   const char *kWho = "ReplSemiSyncMaster::skipSlaveReply";
1163 
1164   function_enter(kWho);
1165 
1166   assert((unsigned char)event_buf[1] == kPacketMagicNum);
1167   if ((unsigned char)event_buf[2] != kPacketFlagSync)
1168   {
1169     /* current event would not require a reply anyway */
1170     goto l_end;
1171   }
1172 
1173   reportReplyBinlog(server_id, skipped_log_file,
1174                     skipped_log_pos, true);
1175 
1176  l_end:
1177   return function_exit(kWho, 0);
1178 }
1179 
readSlaveReply(NET * net,uint32 server_id,const char * event_buf)1180 int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
1181                                        const char *event_buf)
1182 {
1183   const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
1184   const unsigned char *packet;
1185   char     log_file_name[FN_REFLEN];
1186   my_off_t log_file_pos;
1187   ulong    log_file_len = 0;
1188   ulong    packet_len;
1189   int      result = -1;
1190 
1191   struct timespec start_ts= { 0, 0 };
1192   ulong trc_level = trace_level_;
1193 
1194   function_enter(kWho);
1195 
1196   assert((unsigned char)event_buf[1] == kPacketMagicNum);
1197   if ((unsigned char)event_buf[2] != kPacketFlagSync)
1198   {
1199     /* current event does not require reply */
1200     result = 0;
1201     goto l_end;
1202   }
1203 
1204   if (trc_level & kTraceNetWait)
1205     set_timespec(start_ts, 0);
1206 
1207   /* We flush to make sure that the current event is sent to the network,
1208    * instead of being buffered in the TCP/IP stack.
1209    */
1210   if (net_flush(net))
1211   {
1212     sql_print_error("Semi-sync master failed on net_flush() "
1213                     "before waiting for slave reply");
1214     goto l_end;
1215   }
1216 
1217   net_clear(net, 0);
1218   if (trc_level & kTraceDetail)
1219     sql_print_information("%s: Wait for replica's reply", kWho);
1220 
1221   /* Wait for the network here.  Though binlog dump thread can indefinitely wait
1222    * here, transactions would not wait indefintely.
1223    * Transactions wait on binlog replies detected by binlog dump threads.  If
1224    * binlog dump threads wait too long, transactions will timeout and continue.
1225    */
1226   packet_len = my_net_read(net);
1227 
1228   if (trc_level & kTraceNetWait)
1229   {
1230     int wait_time = getWaitTime(start_ts);
1231     if (wait_time < 0)
1232     {
1233       sql_print_information("Assessment of waiting time for "
1234                             "readSlaveReply failed.");
1235       rpl_semi_sync_master_timefunc_fails++;
1236     }
1237     else
1238     {
1239       rpl_semi_sync_master_net_wait_num++;
1240       rpl_semi_sync_master_net_wait_time += wait_time;
1241     }
1242   }
1243 
1244   if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
1245   {
1246     if (packet_len == packet_error)
1247       sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
1248                       net->last_error, net->last_errno);
1249     else
1250       sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
1251                       net->last_error, net->last_errno);
1252     goto l_end;
1253   }
1254 
1255   packet = net->read_pos;
1256   if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
1257   {
1258     sql_print_error("Read semi-sync reply magic number error");
1259     goto l_end;
1260   }
1261 
1262   log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
1263   log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
1264   if (log_file_len >= FN_REFLEN)
1265   {
1266     sql_print_error("Read semi-sync reply binlog file length too large");
1267     goto l_end;
1268   }
1269   strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
1270   log_file_name[log_file_len] = 0;
1271 
1272   if (trc_level & kTraceDetail)
1273     sql_print_information("%s: Got reply (%s, %lu)",
1274                           kWho, log_file_name, (ulong)log_file_pos);
1275 
1276   result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
1277 
1278  l_end:
1279   return function_exit(kWho, result);
1280 }
1281 
1282 
resetMaster()1283 int ReplSemiSyncMaster::resetMaster()
1284 {
1285   const char *kWho = "ReplSemiSyncMaster::resetMaster";
1286   int result = 0;
1287 
1288   function_enter(kWho);
1289 
1290 
1291   lock();
1292 
1293   state_ = getMasterEnabled()? 1 : 0;
1294 
1295   wait_file_name_inited_   = false;
1296   reply_file_name_inited_  = false;
1297   commit_file_name_inited_ = false;
1298 
1299   rpl_semi_sync_master_yes_transactions = 0;
1300   rpl_semi_sync_master_no_transactions = 0;
1301   rpl_semi_sync_master_off_times = 0;
1302   rpl_semi_sync_master_timefunc_fails = 0;
1303   rpl_semi_sync_master_wait_sessions = 0;
1304   rpl_semi_sync_master_wait_pos_backtraverse = 0;
1305   rpl_semi_sync_master_trx_wait_num = 0;
1306   rpl_semi_sync_master_trx_wait_time = 0;
1307   rpl_semi_sync_master_net_wait_num = 0;
1308   rpl_semi_sync_master_net_wait_time = 0;
1309 
1310   unlock();
1311 
1312   return function_exit(kWho, result);
1313 }
1314 
setExportStats()1315 void ReplSemiSyncMaster::setExportStats()
1316 {
1317   lock();
1318 
1319   rpl_semi_sync_master_status           = state_;
1320   rpl_semi_sync_master_avg_trx_wait_time=
1321     ((rpl_semi_sync_master_trx_wait_num) ?
1322      (unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
1323                      ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
1324   rpl_semi_sync_master_avg_net_wait_time=
1325     ((rpl_semi_sync_master_net_wait_num) ?
1326      (unsigned long)((double)rpl_semi_sync_master_net_wait_time /
1327                      ((double)rpl_semi_sync_master_net_wait_num)) : 0);
1328 
1329   unlock();
1330 }
1331 
1332 /* Get the waiting time given the wait's staring time.
1333  *
1334  * Return:
1335  *  >= 0: the waiting time in microsecons(us)
1336  *   < 0: error in get time or time back traverse
1337  */
getWaitTime(const struct timespec & start_ts)1338 static int getWaitTime(const struct timespec& start_ts)
1339 {
1340   unsigned long long start_usecs, end_usecs;
1341   struct timespec end_ts;
1342 
1343   /* Starting time in microseconds(us). */
1344   start_usecs = timespec_to_usec(&start_ts);
1345 
1346   /* Get the wait time interval. */
1347   set_timespec(end_ts, 0);
1348 
1349   /* Ending time in microseconds(us). */
1350   end_usecs = timespec_to_usec(&end_ts);
1351 
1352   if (end_usecs < start_usecs)
1353     return -1;
1354 
1355   return (int)(end_usecs - start_usecs);
1356 }
1357