1 /* Copyright (c) 2012, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include <stdio.h>
24 #include <string.h>
25 #include <sys/types.h>
26 #include <algorithm>
27 #include <atomic>
28 
29 #include "libbinlogevents/include/control_events.h"
30 #include "m_string.h"
31 #include "my_dbug.h"
32 #include "my_inttypes.h"
33 #include "my_macros.h"
34 #include "my_thread.h"
35 #include "mysql/psi/mysql_mutex.h"
36 #include "sql/rpl_gtid.h"
37 #include "typelib.h"
38 
39 struct mysql_mutex_t;
40 
41 #ifdef MYSQL_SERVER
42 #include "mysql/thread_type.h"
43 #include "mysqld_error.h"  // ER_*
44 #include "sql/binlog.h"
45 #include "sql/current_thd.h"
46 #include "sql/rpl_msr.h"
47 #include "sql/sql_class.h"  // THD
48 #include "sql/sql_error.h"
49 #include "storage/perfschema/pfs_instr_class.h"  // gtid_monitoring_getsystime
50 #endif                                           // ifdef MYSQL_SERVER
51 
52 #ifndef MYSQL_SERVER
53 #include "client/mysqlbinlog.h"
54 #endif
55 
56 ulong _gtid_consistency_mode;
57 const char *gtid_consistency_mode_names[] = {"OFF", "ON", "WARN", NullS};
58 TYPELIB gtid_consistency_mode_typelib = {
59     array_elements(gtid_consistency_mode_names) - 1, "",
60     gtid_consistency_mode_names, nullptr};
61 
62 #ifdef MYSQL_SERVER
get_gtid_consistency_mode()63 enum_gtid_consistency_mode get_gtid_consistency_mode() {
64   global_sid_lock->assert_some_lock();
65   return (enum_gtid_consistency_mode)_gtid_consistency_mode;
66 }
67 #endif
68 
parse(Sid_map * sid_map,const char * text)69 enum_return_status Gtid::parse(Sid_map *sid_map, const char *text) {
70   DBUG_TRACE;
71   rpl_sid sid;
72   const char *s = text;
73 
74   SKIP_WHITESPACE();
75 
76   // parse sid
77   if (sid.parse(s, binary_log::Uuid::TEXT_LENGTH) == 0) {
78     rpl_sidno sidno_var = sid_map->add_sid(sid);
79     if (sidno_var <= 0) RETURN_REPORTED_ERROR;
80     s += binary_log::Uuid::TEXT_LENGTH;
81 
82     SKIP_WHITESPACE();
83 
84     // parse colon
85     if (*s == ':') {
86       s++;
87 
88       SKIP_WHITESPACE();
89 
90       // parse gno
91       rpl_gno gno_var = parse_gno(&s);
92       if (gno_var > 0) {
93         SKIP_WHITESPACE();
94         if (*s == '\0') {
95           sidno = sidno_var;
96           gno = gno_var;
97           RETURN_OK;
98         } else
99           DBUG_PRINT("info", ("expected end of string, found garbage '%.80s' "
100                               "at char %d in '%s'",
101                               s, (int)(s - text), text));
102       } else
103         DBUG_PRINT("info", ("GNO was zero or invalid (%lld) at char %d in '%s'",
104                             gno_var, (int)(s - text), text));
105     } else
106       DBUG_PRINT("info",
107                  ("missing colon at char %d in '%s'", (int)(s - text), text));
108   } else
109     DBUG_PRINT("info",
110                ("not a uuid at char %d in '%s'", (int)(s - text), text));
111   BINLOG_ERROR(("Malformed GTID specification: %.200s", text),
112                (ER_MALFORMED_GTID_SPECIFICATION, MYF(0), text));
113   RETURN_REPORTED_ERROR;
114 }
115 
to_string(const rpl_sid & sid,char * buf) const116 int Gtid::to_string(const rpl_sid &sid, char *buf) const {
117   DBUG_TRACE;
118   char *s = buf + sid.to_string(buf);
119   *s = ':';
120   s++;
121   s += format_gno(s, gno);
122   return (int)(s - buf);
123 }
124 
to_string(const Sid_map * sid_map,char * buf,bool need_lock) const125 int Gtid::to_string(const Sid_map *sid_map, char *buf, bool need_lock) const {
126   DBUG_TRACE;
127   int ret;
128   if (sid_map != nullptr) {
129     Checkable_rwlock *lock = sid_map->get_sid_lock();
130     if (lock) {
131       if (need_lock)
132         lock->rdlock();
133       else
134         lock->assert_some_lock();
135     }
136     const rpl_sid &sid = sid_map->sidno_to_sid(sidno);
137     if (lock && need_lock) lock->unlock();
138     ret = to_string(sid, buf);
139   } else {
140 #ifdef DBUG_OFF
141     /*
142       NULL is only allowed in debug mode, since the sidno does not
143       make sense for users but is useful to include in debug
144       printouts.  Therefore, we want to ASSERT(0) in non-debug mode.
145       Since there is no ASSERT in non-debug mode, we use abort
146       instead.
147     */
148     abort();
149 #endif
150     ret = sprintf(buf, "%d:%lld", sidno, gno);
151   }
152   return ret;
153 }
154 
is_valid(const char * text)155 bool Gtid::is_valid(const char *text) {
156   DBUG_TRACE;
157   const char *s = text;
158   SKIP_WHITESPACE();
159   if (!rpl_sid::is_valid(s, binary_log::Uuid::TEXT_LENGTH)) {
160     DBUG_PRINT("info",
161                ("not a uuid at char %d in '%s'", (int)(s - text), text));
162     return false;
163   }
164   s += binary_log::Uuid::TEXT_LENGTH;
165   SKIP_WHITESPACE();
166   if (*s != ':') {
167     DBUG_PRINT("info",
168                ("missing colon at char %d in '%s'", (int)(s - text), text));
169     return false;
170   }
171   s++;
172   SKIP_WHITESPACE();
173   if (parse_gno(&s) <= 0) {
174     DBUG_PRINT("info", ("GNO was zero or invalid at char %d in '%s'",
175                         (int)(s - text), text));
176     return false;
177   }
178   SKIP_WHITESPACE();
179   if (*s != 0) {
180     DBUG_PRINT("info", ("expected end of string, found garbage '%.80s' "
181                         "at char %d in '%s'",
182                         s, (int)(s - text), text));
183     return false;
184   }
185   return true;
186 }
187 
188 #ifndef DBUG_OFF
check_return_status(enum_return_status status,const char * action,const char * status_name,int allow_unreported)189 void check_return_status(enum_return_status status, const char *action,
190                          const char *status_name, int allow_unreported) {
191   if (status != RETURN_STATUS_OK) {
192     DBUG_ASSERT(allow_unreported || status == RETURN_STATUS_REPORTED_ERROR);
193     if (status == RETURN_STATUS_REPORTED_ERROR) {
194 #if defined(MYSQL_SERVER) && !defined(DBUG_OFF)
195       THD *thd = current_thd;
196       /*
197         We create a new system THD with 'SYSTEM_THREAD_COMPRESS_GTID_TABLE'
198         when initializing gtid state by fetching gtids during server startup,
199         so we can check on it before diagnostic area is active and skip the
200         assert in this case. We assert that diagnostic area logged the error
201         outside server startup since the assert is realy useful.
202      */
203       DBUG_ASSERT(thd == nullptr ||
204                   thd->get_stmt_da()->status() == Diagnostics_area::DA_ERROR ||
205                   (thd->get_stmt_da()->status() == Diagnostics_area::DA_EMPTY &&
206                    thd->system_thread == SYSTEM_THREAD_COMPRESS_GTID_TABLE));
207 #endif
208     }
209     DBUG_PRINT("info", ("%s error %d (%s)", action, status, status_name));
210   }
211 }
212 #endif  // ! DBUG_OFF
213 
214 #ifdef MYSQL_SERVER
get_sidno_from_global_sid_map(rpl_sid sid)215 rpl_sidno get_sidno_from_global_sid_map(rpl_sid sid) {
216   DBUG_TRACE;
217 
218   global_sid_lock->rdlock();
219   rpl_sidno sidno = global_sid_map->add_sid(sid);
220   global_sid_lock->unlock();
221 
222   return sidno;
223 }
224 
get_last_executed_gno(rpl_sidno sidno)225 rpl_gno get_last_executed_gno(rpl_sidno sidno) {
226   DBUG_TRACE;
227 
228   global_sid_lock->rdlock();
229   rpl_gno gno = gtid_state->get_last_executed_gno(sidno);
230   global_sid_lock->unlock();
231 
232   return gno;
233 }
234 
Trx_monitoring_info()235 Trx_monitoring_info::Trx_monitoring_info() { clear(); }
236 
/**
  Copy constructor.

  When the source record is populated, all of its fields are copied.  When
  it is not, the new object is put into the same well-defined unset state
  that the default constructor produces, instead of leaving every field
  other than is_info_set uninitialized.

  @param info Record to copy from.
*/
Trx_monitoring_info::Trx_monitoring_info(const Trx_monitoring_info &info) {
  if (!info.is_info_set) {
    // Nothing to copy.  clear() assigns every field (and sets is_info_set
    // to false), so no member is left with an indeterminate value.
    clear();
    return;
  }

  is_info_set = true;
  gtid = info.gtid;
  original_commit_timestamp = info.original_commit_timestamp;
  immediate_commit_timestamp = info.immediate_commit_timestamp;
  start_time = info.start_time;
  end_time = info.end_time;
  skipped = info.skipped;
  last_transient_error_number = info.last_transient_error_number;
  // Source and destination are the same member of the same class, so the
  // destination is exactly as large as the source.
  strcpy(last_transient_error_message, info.last_transient_error_message);
  last_transient_error_timestamp = info.last_transient_error_timestamp;
  transaction_retries = info.transaction_retries;
  is_retrying = info.is_retrying;
  compression_type = info.compression_type;
  compressed_bytes = info.compressed_bytes;
  uncompressed_bytes = info.uncompressed_bytes;
}
255 
clear()256 void Trx_monitoring_info::clear() {
257   gtid = {0, 0};
258   original_commit_timestamp = 0;
259   immediate_commit_timestamp = 0;
260   start_time = 0;
261   end_time = 0;
262   skipped = false;
263   is_info_set = false;
264   last_transient_error_number = 0;
265   last_transient_error_message[0] = '\0';
266   last_transient_error_timestamp = 0;
267   transaction_retries = 0;
268   is_retrying = false;
269   compression_type = binary_log::transaction::compression::type::NONE;
270   compressed_bytes = 0;
271   uncompressed_bytes = 0;
272 }
273 
copy_to_ps_table(Sid_map * sid_map,char * gtid_arg,uint * gtid_length_arg,ulonglong * original_commit_ts_arg,ulonglong * immediate_commit_ts_arg,ulonglong * start_time_arg)274 void Trx_monitoring_info::copy_to_ps_table(Sid_map *sid_map, char *gtid_arg,
275                                            uint *gtid_length_arg,
276                                            ulonglong *original_commit_ts_arg,
277                                            ulonglong *immediate_commit_ts_arg,
278                                            ulonglong *start_time_arg) {
279   DBUG_ASSERT(sid_map);
280   DBUG_ASSERT(gtid_arg);
281   DBUG_ASSERT(gtid_length_arg);
282   DBUG_ASSERT(original_commit_ts_arg);
283   DBUG_ASSERT(immediate_commit_ts_arg);
284   DBUG_ASSERT(start_time_arg);
285 
286   if (is_info_set) {
287     // The trx_monitoring_info is populated
288     if (gtid.is_empty()) {
289       // The transaction is anonymous
290       memcpy(gtid_arg, "ANONYMOUS", 10);
291       *gtid_length_arg = 9;
292     } else {
293       // The GTID is set
294       Checkable_rwlock *sid_lock = sid_map->get_sid_lock();
295       sid_lock->rdlock();
296       *gtid_length_arg = gtid.to_string(sid_map, gtid_arg);
297       sid_lock->unlock();
298     }
299     *original_commit_ts_arg = original_commit_timestamp;
300     *immediate_commit_ts_arg = immediate_commit_timestamp;
301     *start_time_arg = start_time / 10;
302   } else {
303     // This monitoring info is not populated, so let's zero the input
304     memcpy(gtid_arg, "", 1);
305     *gtid_length_arg = 0;
306     *original_commit_ts_arg = 0;
307     *immediate_commit_ts_arg = 0;
308     *start_time_arg = 0;
309   }
310 }
311 
copy_to_ps_table(Sid_map * sid_map,char * gtid_arg,uint * gtid_length_arg,ulonglong * original_commit_ts_arg,ulonglong * immediate_commit_ts_arg,ulonglong * start_time_arg,ulonglong * end_time_arg)312 void Trx_monitoring_info::copy_to_ps_table(Sid_map *sid_map, char *gtid_arg,
313                                            uint *gtid_length_arg,
314                                            ulonglong *original_commit_ts_arg,
315                                            ulonglong *immediate_commit_ts_arg,
316                                            ulonglong *start_time_arg,
317                                            ulonglong *end_time_arg) {
318   DBUG_ASSERT(end_time_arg);
319 
320   *end_time_arg = is_info_set ? end_time / 10 : 0;
321   copy_to_ps_table(sid_map, gtid_arg, gtid_length_arg, original_commit_ts_arg,
322                    immediate_commit_ts_arg, start_time_arg);
323 }
324 
copy_to_ps_table(Sid_map * sid_map,char * gtid_arg,uint * gtid_length_arg,ulonglong * original_commit_ts_arg,ulonglong * immediate_commit_ts_arg,ulonglong * start_time_arg,uint * last_transient_errno_arg,char * last_transient_errmsg_arg,uint * last_transient_errmsg_length_arg,ulonglong * last_transient_timestamp_arg,ulong * retries_count_arg)325 void Trx_monitoring_info::copy_to_ps_table(
326     Sid_map *sid_map, char *gtid_arg, uint *gtid_length_arg,
327     ulonglong *original_commit_ts_arg, ulonglong *immediate_commit_ts_arg,
328     ulonglong *start_time_arg, uint *last_transient_errno_arg,
329     char *last_transient_errmsg_arg, uint *last_transient_errmsg_length_arg,
330     ulonglong *last_transient_timestamp_arg, ulong *retries_count_arg) {
331   DBUG_ASSERT(last_transient_errno_arg);
332   DBUG_ASSERT(last_transient_errmsg_arg);
333   DBUG_ASSERT(last_transient_errmsg_length_arg);
334   DBUG_ASSERT(last_transient_timestamp_arg);
335   DBUG_ASSERT(retries_count_arg);
336 
337   if (is_info_set) {
338     *last_transient_errno_arg = last_transient_error_number;
339     strcpy(last_transient_errmsg_arg, last_transient_error_message);
340     *last_transient_errmsg_length_arg = strlen(last_transient_error_message);
341     *last_transient_timestamp_arg = last_transient_error_timestamp / 10;
342     *retries_count_arg = transaction_retries;
343   } else {
344     *last_transient_errno_arg = 0;
345     memcpy(last_transient_errmsg_arg, "", 1);
346     *last_transient_errmsg_length_arg = 0;
347     *last_transient_timestamp_arg = 0;
348     *retries_count_arg = 0;
349   }
350   copy_to_ps_table(sid_map, gtid_arg, gtid_length_arg, original_commit_ts_arg,
351                    immediate_commit_ts_arg, start_time_arg);
352 }
353 
copy_to_ps_table(Sid_map * sid_map,char * gtid_arg,uint * gtid_length_arg,ulonglong * original_commit_ts_arg,ulonglong * immediate_commit_ts_arg,ulonglong * start_time_arg,ulonglong * end_time_arg,uint * last_transient_errno_arg,char * last_transient_errmsg_arg,uint * last_transient_errmsg_length_arg,ulonglong * last_transient_timestamp_arg,ulong * retries_count_arg)354 void Trx_monitoring_info::copy_to_ps_table(
355     Sid_map *sid_map, char *gtid_arg, uint *gtid_length_arg,
356     ulonglong *original_commit_ts_arg, ulonglong *immediate_commit_ts_arg,
357     ulonglong *start_time_arg, ulonglong *end_time_arg,
358     uint *last_transient_errno_arg, char *last_transient_errmsg_arg,
359     uint *last_transient_errmsg_length_arg,
360     ulonglong *last_transient_timestamp_arg, ulong *retries_count_arg) {
361   DBUG_ASSERT(end_time_arg);
362 
363   *end_time_arg = is_info_set ? end_time / 10 : 0;
364   copy_to_ps_table(sid_map, gtid_arg, gtid_length_arg, original_commit_ts_arg,
365                    immediate_commit_ts_arg, start_time_arg,
366                    last_transient_errno_arg, last_transient_errmsg_arg,
367                    last_transient_errmsg_length_arg,
368                    last_transient_timestamp_arg, retries_count_arg);
369 }
370 
Gtid_monitoring_info(mysql_mutex_t * atomic_mutex_arg)371 Gtid_monitoring_info::Gtid_monitoring_info(mysql_mutex_t *atomic_mutex_arg)
372     : atomic_mutex(atomic_mutex_arg) {
373   processing_trx = new Trx_monitoring_info;
374   last_processed_trx = new Trx_monitoring_info;
375 }
376 
~Gtid_monitoring_info()377 Gtid_monitoring_info::~Gtid_monitoring_info() {
378   delete last_processed_trx;
379   delete processing_trx;
380 }
381 
atomic_lock()382 void Gtid_monitoring_info::atomic_lock() {
383   if (atomic_mutex == nullptr) {
384     bool expected = false;
385     while (!atomic_locked.compare_exchange_weak(expected, true)) {
386       /*
387         On exchange failures, the atomic_locked value (true) is set
388         to the expected variable. It needs to be reset again.
389       */
390       expected = false;
391       /*
392         All "atomic" operations on this object are based on copying
393         variable contents and setting values. They should not take long.
394       */
395       my_thread_yield();
396     }
397 #ifndef DBUG_OFF
398     DBUG_ASSERT(!is_locked);
399     is_locked = true;
400 #endif
401   } else {
402     // If this object is relying on a mutex, just ensure it was acquired.
403     mysql_mutex_assert_owner(atomic_mutex)
404   }
405 }
406 
atomic_unlock()407 void Gtid_monitoring_info::atomic_unlock() {
408   if (atomic_mutex == nullptr) {
409 #ifndef DBUG_OFF
410     DBUG_ASSERT(is_locked);
411     is_locked = false;
412 #endif
413     atomic_locked = false;
414   } else
415     mysql_mutex_assert_owner(atomic_mutex)
416 }
417 
clear()418 void Gtid_monitoring_info::clear() {
419   atomic_lock();
420   processing_trx->clear();
421   last_processed_trx->clear();
422   atomic_unlock();
423 }
424 
clear_processing_trx()425 void Gtid_monitoring_info::clear_processing_trx() {
426   atomic_lock();
427   processing_trx->clear();
428   atomic_unlock();
429 }
430 
clear_last_processed_trx()431 void Gtid_monitoring_info::clear_last_processed_trx() {
432   atomic_lock();
433   last_processed_trx->clear();
434   atomic_unlock();
435 }
436 
update(binary_log::transaction::compression::type t,size_t payload_size,size_t uncompressed_size)437 void Gtid_monitoring_info::update(binary_log::transaction::compression::type t,
438                                   size_t payload_size,
439                                   size_t uncompressed_size) {
440   processing_trx->compression_type = t;
441   processing_trx->compressed_bytes = payload_size;
442   processing_trx->uncompressed_bytes = uncompressed_size;
443 }
444 
start(Gtid gtid_arg,ulonglong original_ts_arg,ulonglong immediate_ts_arg,bool skipped_arg)445 void Gtid_monitoring_info::start(Gtid gtid_arg, ulonglong original_ts_arg,
446                                  ulonglong immediate_ts_arg, bool skipped_arg) {
447   /**
448     When a new transaction starts processing, we reset all the information from
449     the previous processing_trx and fetch the current timestamp as the new
450     start_time.
451   */
452   if (!processing_trx->gtid.equals(gtid_arg) || !processing_trx->is_retrying) {
453     /* Collect current timestamp before the atomic operation */
454     ulonglong start_time = gtid_monitoring_getsystime();
455 
456     atomic_lock();
457     processing_trx->gtid = gtid_arg;
458     processing_trx->original_commit_timestamp = original_ts_arg;
459     processing_trx->immediate_commit_timestamp = immediate_ts_arg;
460     processing_trx->start_time = start_time;
461     processing_trx->end_time = 0;
462     processing_trx->skipped = skipped_arg;
463     processing_trx->is_info_set = true;
464     processing_trx->last_transient_error_number = 0;
465     processing_trx->last_transient_error_message[0] = '\0';
466     processing_trx->last_transient_error_timestamp = 0;
467     processing_trx->transaction_retries = 0;
468     processing_trx->compression_type =
469         binary_log::transaction::compression::type::NONE;
470     processing_trx->compressed_bytes = 0;
471     processing_trx->uncompressed_bytes = 0;
472     atomic_unlock();
473   } else {
474     /**
475       If the transaction is being retried, only update the skipped field
476       because it determines if the information will be kept after it finishes
477       executing.
478     */
479     atomic_lock();
480     processing_trx->skipped = skipped_arg;
481     atomic_unlock();
482   }
483 }
484 
finish()485 void Gtid_monitoring_info::finish() {
486   /* Collect current timestamp before the atomic operation */
487   ulonglong end_time = gtid_monitoring_getsystime();
488 
489   atomic_lock();
490   processing_trx->end_time = end_time;
491   /*
492     We only swap if the transaction was not skipped.
493 
494     Notice that only applier thread set the skipped variable to true.
495   */
496   if (!processing_trx->skipped) std::swap(processing_trx, last_processed_trx);
497 
498   processing_trx->clear();
499   atomic_unlock();
500 }
501 
copy_info_to(Trx_monitoring_info * processing_dest,Trx_monitoring_info * last_processed_dest)502 void Gtid_monitoring_info::copy_info_to(
503     Trx_monitoring_info *processing_dest,
504     Trx_monitoring_info *last_processed_dest) {
505   atomic_lock();
506   *processing_dest = *processing_trx;
507   *last_processed_dest = *last_processed_trx;
508   atomic_unlock();
509 }
510 
copy_info_to(Gtid_monitoring_info * dest)511 void Gtid_monitoring_info::copy_info_to(Gtid_monitoring_info *dest) {
512   copy_info_to(dest->processing_trx, dest->last_processed_trx);
513 }
514 
is_processing_trx_set()515 bool Gtid_monitoring_info::is_processing_trx_set() {
516   /*
517     This function is only called by threads about to update the monitoring
518     information. It should be safe to collect this information without
519     acquiring locks.
520   */
521   return processing_trx->is_info_set;
522 }
523 
get_processing_trx_gtid()524 const Gtid *Gtid_monitoring_info::get_processing_trx_gtid() {
525   /*
526     This function is only called by relay log recovery/queuing.
527   */
528   DBUG_ASSERT(atomic_mutex != nullptr);
529   mysql_mutex_assert_owner(atomic_mutex);
530   return &processing_trx->gtid;
531 }
532 
store_transient_error(uint transient_errno_arg,const char * transient_err_message_arg,ulong trans_retries_arg)533 void Gtid_monitoring_info::store_transient_error(
534     uint transient_errno_arg, const char *transient_err_message_arg,
535     ulong trans_retries_arg) {
536   ulonglong retry_timestamp = gtid_monitoring_getsystime();
537   processing_trx->is_retrying = true;
538   atomic_lock();
539   processing_trx->transaction_retries = trans_retries_arg;
540   processing_trx->last_transient_error_number = transient_errno_arg;
541   snprintf(processing_trx->last_transient_error_message,
542            sizeof(processing_trx->last_transient_error_message), "%.*s",
543            MAX_SLAVE_ERRMSG - 1, transient_err_message_arg);
544   processing_trx->last_transient_error_timestamp = retry_timestamp;
545   atomic_unlock();
546 }
547 #endif  // ifdef MYSQL_SERVER
548