1 /* Copyright (c) 2012, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include <stdio.h>
24 #include <string.h>
25 #include <sys/types.h>
26 #include <algorithm>
27 #include <atomic>
28
29 #include "libbinlogevents/include/control_events.h"
30 #include "m_string.h"
31 #include "my_dbug.h"
32 #include "my_inttypes.h"
33 #include "my_macros.h"
34 #include "my_thread.h"
35 #include "mysql/psi/mysql_mutex.h"
36 #include "sql/rpl_gtid.h"
37 #include "typelib.h"
38
39 struct mysql_mutex_t;
40
41 #ifdef MYSQL_SERVER
42 #include "mysql/thread_type.h"
43 #include "mysqld_error.h" // ER_*
44 #include "sql/binlog.h"
45 #include "sql/current_thd.h"
46 #include "sql/rpl_msr.h"
47 #include "sql/sql_class.h" // THD
48 #include "sql/sql_error.h"
49 #include "storage/perfschema/pfs_instr_class.h" // gtid_monitoring_getsystime
50 #endif // ifdef MYSQL_SERVER
51
52 #ifndef MYSQL_SERVER
53 #include "client/mysqlbinlog.h"
54 #endif
55
56 ulong _gtid_consistency_mode;
57 const char *gtid_consistency_mode_names[] = {"OFF", "ON", "WARN", NullS};
58 TYPELIB gtid_consistency_mode_typelib = {
59 array_elements(gtid_consistency_mode_names) - 1, "",
60 gtid_consistency_mode_names, nullptr};
61
62 #ifdef MYSQL_SERVER
get_gtid_consistency_mode()63 enum_gtid_consistency_mode get_gtid_consistency_mode() {
64 global_sid_lock->assert_some_lock();
65 return (enum_gtid_consistency_mode)_gtid_consistency_mode;
66 }
67 #endif
68
parse(Sid_map * sid_map,const char * text)69 enum_return_status Gtid::parse(Sid_map *sid_map, const char *text) {
70 DBUG_TRACE;
71 rpl_sid sid;
72 const char *s = text;
73
74 SKIP_WHITESPACE();
75
76 // parse sid
77 if (sid.parse(s, binary_log::Uuid::TEXT_LENGTH) == 0) {
78 rpl_sidno sidno_var = sid_map->add_sid(sid);
79 if (sidno_var <= 0) RETURN_REPORTED_ERROR;
80 s += binary_log::Uuid::TEXT_LENGTH;
81
82 SKIP_WHITESPACE();
83
84 // parse colon
85 if (*s == ':') {
86 s++;
87
88 SKIP_WHITESPACE();
89
90 // parse gno
91 rpl_gno gno_var = parse_gno(&s);
92 if (gno_var > 0) {
93 SKIP_WHITESPACE();
94 if (*s == '\0') {
95 sidno = sidno_var;
96 gno = gno_var;
97 RETURN_OK;
98 } else
99 DBUG_PRINT("info", ("expected end of string, found garbage '%.80s' "
100 "at char %d in '%s'",
101 s, (int)(s - text), text));
102 } else
103 DBUG_PRINT("info", ("GNO was zero or invalid (%lld) at char %d in '%s'",
104 gno_var, (int)(s - text), text));
105 } else
106 DBUG_PRINT("info",
107 ("missing colon at char %d in '%s'", (int)(s - text), text));
108 } else
109 DBUG_PRINT("info",
110 ("not a uuid at char %d in '%s'", (int)(s - text), text));
111 BINLOG_ERROR(("Malformed GTID specification: %.200s", text),
112 (ER_MALFORMED_GTID_SPECIFICATION, MYF(0), text));
113 RETURN_REPORTED_ERROR;
114 }
115
to_string(const rpl_sid & sid,char * buf) const116 int Gtid::to_string(const rpl_sid &sid, char *buf) const {
117 DBUG_TRACE;
118 char *s = buf + sid.to_string(buf);
119 *s = ':';
120 s++;
121 s += format_gno(s, gno);
122 return (int)(s - buf);
123 }
124
to_string(const Sid_map * sid_map,char * buf,bool need_lock) const125 int Gtid::to_string(const Sid_map *sid_map, char *buf, bool need_lock) const {
126 DBUG_TRACE;
127 int ret;
128 if (sid_map != nullptr) {
129 Checkable_rwlock *lock = sid_map->get_sid_lock();
130 if (lock) {
131 if (need_lock)
132 lock->rdlock();
133 else
134 lock->assert_some_lock();
135 }
136 const rpl_sid &sid = sid_map->sidno_to_sid(sidno);
137 if (lock && need_lock) lock->unlock();
138 ret = to_string(sid, buf);
139 } else {
140 #ifdef DBUG_OFF
141 /*
142 NULL is only allowed in debug mode, since the sidno does not
143 make sense for users but is useful to include in debug
144 printouts. Therefore, we want to ASSERT(0) in non-debug mode.
145 Since there is no ASSERT in non-debug mode, we use abort
146 instead.
147 */
148 abort();
149 #endif
150 ret = sprintf(buf, "%d:%lld", sidno, gno);
151 }
152 return ret;
153 }
154
is_valid(const char * text)155 bool Gtid::is_valid(const char *text) {
156 DBUG_TRACE;
157 const char *s = text;
158 SKIP_WHITESPACE();
159 if (!rpl_sid::is_valid(s, binary_log::Uuid::TEXT_LENGTH)) {
160 DBUG_PRINT("info",
161 ("not a uuid at char %d in '%s'", (int)(s - text), text));
162 return false;
163 }
164 s += binary_log::Uuid::TEXT_LENGTH;
165 SKIP_WHITESPACE();
166 if (*s != ':') {
167 DBUG_PRINT("info",
168 ("missing colon at char %d in '%s'", (int)(s - text), text));
169 return false;
170 }
171 s++;
172 SKIP_WHITESPACE();
173 if (parse_gno(&s) <= 0) {
174 DBUG_PRINT("info", ("GNO was zero or invalid at char %d in '%s'",
175 (int)(s - text), text));
176 return false;
177 }
178 SKIP_WHITESPACE();
179 if (*s != 0) {
180 DBUG_PRINT("info", ("expected end of string, found garbage '%.80s' "
181 "at char %d in '%s'",
182 s, (int)(s - text), text));
183 return false;
184 }
185 return true;
186 }
187
188 #ifndef DBUG_OFF
check_return_status(enum_return_status status,const char * action,const char * status_name,int allow_unreported)189 void check_return_status(enum_return_status status, const char *action,
190 const char *status_name, int allow_unreported) {
191 if (status != RETURN_STATUS_OK) {
192 DBUG_ASSERT(allow_unreported || status == RETURN_STATUS_REPORTED_ERROR);
193 if (status == RETURN_STATUS_REPORTED_ERROR) {
194 #if defined(MYSQL_SERVER) && !defined(DBUG_OFF)
195 THD *thd = current_thd;
196 /*
197 We create a new system THD with 'SYSTEM_THREAD_COMPRESS_GTID_TABLE'
198 when initializing gtid state by fetching gtids during server startup,
199 so we can check on it before diagnostic area is active and skip the
200 assert in this case. We assert that diagnostic area logged the error
201 outside server startup since the assert is realy useful.
202 */
203 DBUG_ASSERT(thd == nullptr ||
204 thd->get_stmt_da()->status() == Diagnostics_area::DA_ERROR ||
205 (thd->get_stmt_da()->status() == Diagnostics_area::DA_EMPTY &&
206 thd->system_thread == SYSTEM_THREAD_COMPRESS_GTID_TABLE));
207 #endif
208 }
209 DBUG_PRINT("info", ("%s error %d (%s)", action, status, status_name));
210 }
211 }
212 #endif // ! DBUG_OFF
213
214 #ifdef MYSQL_SERVER
get_sidno_from_global_sid_map(rpl_sid sid)215 rpl_sidno get_sidno_from_global_sid_map(rpl_sid sid) {
216 DBUG_TRACE;
217
218 global_sid_lock->rdlock();
219 rpl_sidno sidno = global_sid_map->add_sid(sid);
220 global_sid_lock->unlock();
221
222 return sidno;
223 }
224
get_last_executed_gno(rpl_sidno sidno)225 rpl_gno get_last_executed_gno(rpl_sidno sidno) {
226 DBUG_TRACE;
227
228 global_sid_lock->rdlock();
229 rpl_gno gno = gtid_state->get_last_executed_gno(sidno);
230 global_sid_lock->unlock();
231
232 return gno;
233 }
234
Trx_monitoring_info()235 Trx_monitoring_info::Trx_monitoring_info() { clear(); }
236
Trx_monitoring_info(const Trx_monitoring_info & info)237 Trx_monitoring_info::Trx_monitoring_info(const Trx_monitoring_info &info) {
238 if ((is_info_set = info.is_info_set)) {
239 gtid = info.gtid;
240 original_commit_timestamp = info.original_commit_timestamp;
241 immediate_commit_timestamp = info.immediate_commit_timestamp;
242 start_time = info.start_time;
243 end_time = info.end_time;
244 skipped = info.skipped;
245 last_transient_error_number = info.last_transient_error_number;
246 strcpy(last_transient_error_message, info.last_transient_error_message);
247 last_transient_error_timestamp = info.last_transient_error_timestamp;
248 transaction_retries = info.transaction_retries;
249 is_retrying = info.is_retrying;
250 compression_type = info.compression_type;
251 compressed_bytes = info.compressed_bytes;
252 uncompressed_bytes = info.uncompressed_bytes;
253 }
254 }
255
clear()256 void Trx_monitoring_info::clear() {
257 gtid = {0, 0};
258 original_commit_timestamp = 0;
259 immediate_commit_timestamp = 0;
260 start_time = 0;
261 end_time = 0;
262 skipped = false;
263 is_info_set = false;
264 last_transient_error_number = 0;
265 last_transient_error_message[0] = '\0';
266 last_transient_error_timestamp = 0;
267 transaction_retries = 0;
268 is_retrying = false;
269 compression_type = binary_log::transaction::compression::type::NONE;
270 compressed_bytes = 0;
271 uncompressed_bytes = 0;
272 }
273
copy_to_ps_table(Sid_map * sid_map,char * gtid_arg,uint * gtid_length_arg,ulonglong * original_commit_ts_arg,ulonglong * immediate_commit_ts_arg,ulonglong * start_time_arg)274 void Trx_monitoring_info::copy_to_ps_table(Sid_map *sid_map, char *gtid_arg,
275 uint *gtid_length_arg,
276 ulonglong *original_commit_ts_arg,
277 ulonglong *immediate_commit_ts_arg,
278 ulonglong *start_time_arg) {
279 DBUG_ASSERT(sid_map);
280 DBUG_ASSERT(gtid_arg);
281 DBUG_ASSERT(gtid_length_arg);
282 DBUG_ASSERT(original_commit_ts_arg);
283 DBUG_ASSERT(immediate_commit_ts_arg);
284 DBUG_ASSERT(start_time_arg);
285
286 if (is_info_set) {
287 // The trx_monitoring_info is populated
288 if (gtid.is_empty()) {
289 // The transaction is anonymous
290 memcpy(gtid_arg, "ANONYMOUS", 10);
291 *gtid_length_arg = 9;
292 } else {
293 // The GTID is set
294 Checkable_rwlock *sid_lock = sid_map->get_sid_lock();
295 sid_lock->rdlock();
296 *gtid_length_arg = gtid.to_string(sid_map, gtid_arg);
297 sid_lock->unlock();
298 }
299 *original_commit_ts_arg = original_commit_timestamp;
300 *immediate_commit_ts_arg = immediate_commit_timestamp;
301 *start_time_arg = start_time / 10;
302 } else {
303 // This monitoring info is not populated, so let's zero the input
304 memcpy(gtid_arg, "", 1);
305 *gtid_length_arg = 0;
306 *original_commit_ts_arg = 0;
307 *immediate_commit_ts_arg = 0;
308 *start_time_arg = 0;
309 }
310 }
311
copy_to_ps_table(Sid_map * sid_map,char * gtid_arg,uint * gtid_length_arg,ulonglong * original_commit_ts_arg,ulonglong * immediate_commit_ts_arg,ulonglong * start_time_arg,ulonglong * end_time_arg)312 void Trx_monitoring_info::copy_to_ps_table(Sid_map *sid_map, char *gtid_arg,
313 uint *gtid_length_arg,
314 ulonglong *original_commit_ts_arg,
315 ulonglong *immediate_commit_ts_arg,
316 ulonglong *start_time_arg,
317 ulonglong *end_time_arg) {
318 DBUG_ASSERT(end_time_arg);
319
320 *end_time_arg = is_info_set ? end_time / 10 : 0;
321 copy_to_ps_table(sid_map, gtid_arg, gtid_length_arg, original_commit_ts_arg,
322 immediate_commit_ts_arg, start_time_arg);
323 }
324
copy_to_ps_table(Sid_map * sid_map,char * gtid_arg,uint * gtid_length_arg,ulonglong * original_commit_ts_arg,ulonglong * immediate_commit_ts_arg,ulonglong * start_time_arg,uint * last_transient_errno_arg,char * last_transient_errmsg_arg,uint * last_transient_errmsg_length_arg,ulonglong * last_transient_timestamp_arg,ulong * retries_count_arg)325 void Trx_monitoring_info::copy_to_ps_table(
326 Sid_map *sid_map, char *gtid_arg, uint *gtid_length_arg,
327 ulonglong *original_commit_ts_arg, ulonglong *immediate_commit_ts_arg,
328 ulonglong *start_time_arg, uint *last_transient_errno_arg,
329 char *last_transient_errmsg_arg, uint *last_transient_errmsg_length_arg,
330 ulonglong *last_transient_timestamp_arg, ulong *retries_count_arg) {
331 DBUG_ASSERT(last_transient_errno_arg);
332 DBUG_ASSERT(last_transient_errmsg_arg);
333 DBUG_ASSERT(last_transient_errmsg_length_arg);
334 DBUG_ASSERT(last_transient_timestamp_arg);
335 DBUG_ASSERT(retries_count_arg);
336
337 if (is_info_set) {
338 *last_transient_errno_arg = last_transient_error_number;
339 strcpy(last_transient_errmsg_arg, last_transient_error_message);
340 *last_transient_errmsg_length_arg = strlen(last_transient_error_message);
341 *last_transient_timestamp_arg = last_transient_error_timestamp / 10;
342 *retries_count_arg = transaction_retries;
343 } else {
344 *last_transient_errno_arg = 0;
345 memcpy(last_transient_errmsg_arg, "", 1);
346 *last_transient_errmsg_length_arg = 0;
347 *last_transient_timestamp_arg = 0;
348 *retries_count_arg = 0;
349 }
350 copy_to_ps_table(sid_map, gtid_arg, gtid_length_arg, original_commit_ts_arg,
351 immediate_commit_ts_arg, start_time_arg);
352 }
353
copy_to_ps_table(Sid_map * sid_map,char * gtid_arg,uint * gtid_length_arg,ulonglong * original_commit_ts_arg,ulonglong * immediate_commit_ts_arg,ulonglong * start_time_arg,ulonglong * end_time_arg,uint * last_transient_errno_arg,char * last_transient_errmsg_arg,uint * last_transient_errmsg_length_arg,ulonglong * last_transient_timestamp_arg,ulong * retries_count_arg)354 void Trx_monitoring_info::copy_to_ps_table(
355 Sid_map *sid_map, char *gtid_arg, uint *gtid_length_arg,
356 ulonglong *original_commit_ts_arg, ulonglong *immediate_commit_ts_arg,
357 ulonglong *start_time_arg, ulonglong *end_time_arg,
358 uint *last_transient_errno_arg, char *last_transient_errmsg_arg,
359 uint *last_transient_errmsg_length_arg,
360 ulonglong *last_transient_timestamp_arg, ulong *retries_count_arg) {
361 DBUG_ASSERT(end_time_arg);
362
363 *end_time_arg = is_info_set ? end_time / 10 : 0;
364 copy_to_ps_table(sid_map, gtid_arg, gtid_length_arg, original_commit_ts_arg,
365 immediate_commit_ts_arg, start_time_arg,
366 last_transient_errno_arg, last_transient_errmsg_arg,
367 last_transient_errmsg_length_arg,
368 last_transient_timestamp_arg, retries_count_arg);
369 }
370
Gtid_monitoring_info(mysql_mutex_t * atomic_mutex_arg)371 Gtid_monitoring_info::Gtid_monitoring_info(mysql_mutex_t *atomic_mutex_arg)
372 : atomic_mutex(atomic_mutex_arg) {
373 processing_trx = new Trx_monitoring_info;
374 last_processed_trx = new Trx_monitoring_info;
375 }
376
~Gtid_monitoring_info()377 Gtid_monitoring_info::~Gtid_monitoring_info() {
378 delete last_processed_trx;
379 delete processing_trx;
380 }
381
atomic_lock()382 void Gtid_monitoring_info::atomic_lock() {
383 if (atomic_mutex == nullptr) {
384 bool expected = false;
385 while (!atomic_locked.compare_exchange_weak(expected, true)) {
386 /*
387 On exchange failures, the atomic_locked value (true) is set
388 to the expected variable. It needs to be reset again.
389 */
390 expected = false;
391 /*
392 All "atomic" operations on this object are based on copying
393 variable contents and setting values. They should not take long.
394 */
395 my_thread_yield();
396 }
397 #ifndef DBUG_OFF
398 DBUG_ASSERT(!is_locked);
399 is_locked = true;
400 #endif
401 } else {
402 // If this object is relying on a mutex, just ensure it was acquired.
403 mysql_mutex_assert_owner(atomic_mutex)
404 }
405 }
406
atomic_unlock()407 void Gtid_monitoring_info::atomic_unlock() {
408 if (atomic_mutex == nullptr) {
409 #ifndef DBUG_OFF
410 DBUG_ASSERT(is_locked);
411 is_locked = false;
412 #endif
413 atomic_locked = false;
414 } else
415 mysql_mutex_assert_owner(atomic_mutex)
416 }
417
clear()418 void Gtid_monitoring_info::clear() {
419 atomic_lock();
420 processing_trx->clear();
421 last_processed_trx->clear();
422 atomic_unlock();
423 }
424
clear_processing_trx()425 void Gtid_monitoring_info::clear_processing_trx() {
426 atomic_lock();
427 processing_trx->clear();
428 atomic_unlock();
429 }
430
clear_last_processed_trx()431 void Gtid_monitoring_info::clear_last_processed_trx() {
432 atomic_lock();
433 last_processed_trx->clear();
434 atomic_unlock();
435 }
436
update(binary_log::transaction::compression::type t,size_t payload_size,size_t uncompressed_size)437 void Gtid_monitoring_info::update(binary_log::transaction::compression::type t,
438 size_t payload_size,
439 size_t uncompressed_size) {
440 processing_trx->compression_type = t;
441 processing_trx->compressed_bytes = payload_size;
442 processing_trx->uncompressed_bytes = uncompressed_size;
443 }
444
start(Gtid gtid_arg,ulonglong original_ts_arg,ulonglong immediate_ts_arg,bool skipped_arg)445 void Gtid_monitoring_info::start(Gtid gtid_arg, ulonglong original_ts_arg,
446 ulonglong immediate_ts_arg, bool skipped_arg) {
447 /**
448 When a new transaction starts processing, we reset all the information from
449 the previous processing_trx and fetch the current timestamp as the new
450 start_time.
451 */
452 if (!processing_trx->gtid.equals(gtid_arg) || !processing_trx->is_retrying) {
453 /* Collect current timestamp before the atomic operation */
454 ulonglong start_time = gtid_monitoring_getsystime();
455
456 atomic_lock();
457 processing_trx->gtid = gtid_arg;
458 processing_trx->original_commit_timestamp = original_ts_arg;
459 processing_trx->immediate_commit_timestamp = immediate_ts_arg;
460 processing_trx->start_time = start_time;
461 processing_trx->end_time = 0;
462 processing_trx->skipped = skipped_arg;
463 processing_trx->is_info_set = true;
464 processing_trx->last_transient_error_number = 0;
465 processing_trx->last_transient_error_message[0] = '\0';
466 processing_trx->last_transient_error_timestamp = 0;
467 processing_trx->transaction_retries = 0;
468 processing_trx->compression_type =
469 binary_log::transaction::compression::type::NONE;
470 processing_trx->compressed_bytes = 0;
471 processing_trx->uncompressed_bytes = 0;
472 atomic_unlock();
473 } else {
474 /**
475 If the transaction is being retried, only update the skipped field
476 because it determines if the information will be kept after it finishes
477 executing.
478 */
479 atomic_lock();
480 processing_trx->skipped = skipped_arg;
481 atomic_unlock();
482 }
483 }
484
finish()485 void Gtid_monitoring_info::finish() {
486 /* Collect current timestamp before the atomic operation */
487 ulonglong end_time = gtid_monitoring_getsystime();
488
489 atomic_lock();
490 processing_trx->end_time = end_time;
491 /*
492 We only swap if the transaction was not skipped.
493
494 Notice that only applier thread set the skipped variable to true.
495 */
496 if (!processing_trx->skipped) std::swap(processing_trx, last_processed_trx);
497
498 processing_trx->clear();
499 atomic_unlock();
500 }
501
copy_info_to(Trx_monitoring_info * processing_dest,Trx_monitoring_info * last_processed_dest)502 void Gtid_monitoring_info::copy_info_to(
503 Trx_monitoring_info *processing_dest,
504 Trx_monitoring_info *last_processed_dest) {
505 atomic_lock();
506 *processing_dest = *processing_trx;
507 *last_processed_dest = *last_processed_trx;
508 atomic_unlock();
509 }
510
copy_info_to(Gtid_monitoring_info * dest)511 void Gtid_monitoring_info::copy_info_to(Gtid_monitoring_info *dest) {
512 copy_info_to(dest->processing_trx, dest->last_processed_trx);
513 }
514
is_processing_trx_set()515 bool Gtid_monitoring_info::is_processing_trx_set() {
516 /*
517 This function is only called by threads about to update the monitoring
518 information. It should be safe to collect this information without
519 acquiring locks.
520 */
521 return processing_trx->is_info_set;
522 }
523
get_processing_trx_gtid()524 const Gtid *Gtid_monitoring_info::get_processing_trx_gtid() {
525 /*
526 This function is only called by relay log recovery/queuing.
527 */
528 DBUG_ASSERT(atomic_mutex != nullptr);
529 mysql_mutex_assert_owner(atomic_mutex);
530 return &processing_trx->gtid;
531 }
532
store_transient_error(uint transient_errno_arg,const char * transient_err_message_arg,ulong trans_retries_arg)533 void Gtid_monitoring_info::store_transient_error(
534 uint transient_errno_arg, const char *transient_err_message_arg,
535 ulong trans_retries_arg) {
536 ulonglong retry_timestamp = gtid_monitoring_getsystime();
537 processing_trx->is_retrying = true;
538 atomic_lock();
539 processing_trx->transaction_retries = trans_retries_arg;
540 processing_trx->last_transient_error_number = transient_errno_arg;
541 snprintf(processing_trx->last_transient_error_message,
542 sizeof(processing_trx->last_transient_error_message), "%.*s",
543 MAX_SLAVE_ERRMSG - 1, transient_err_message_arg);
544 processing_trx->last_transient_error_timestamp = retry_timestamp;
545 atomic_unlock();
546 }
547 #endif // ifdef MYSQL_SERVER
548