1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22 02110-1301 USA */
23
24 #include "rpl_gtid_persist.h"
25
26 #include "debug_sync.h" // debug_sync_set_action
27 #include "log.h" // sql_print_error
28 #include "replication.h" // THD_ENTER_COND
29 #include "sql_base.h" // MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK
30 #include "sql_parse.h" // mysql_reset_thd_for_next_command
31
32 using std::list;
33 using std::string;
34
35
36 my_thread_handle compress_thread_id;
37 static bool terminate_compress_thread= false;
38 static bool should_compress= false;
39 const LEX_STRING Gtid_table_access_context::TABLE_NAME= {C_STRING_WITH_LEN("gtid_executed")};
40 const LEX_STRING Gtid_table_access_context::DB_NAME= {C_STRING_WITH_LEN("mysql")};
41
42 /**
43 A derived from THD::Attachable_trx class allows updates in
44 the attachable transaction. Callers of the class methods must
45 make sure the attachable_rw won't cause deadlock with the main transaction.
46 The destructor does not invoke ha_commit_{stmt,trans} nor ha_rollback_trans
47 on purpose.
48 Burden to terminate the read-write instance also lies on the caller!
49 In order to use this interface it *MUST* prove that no side effect to
50 the global transaction state can be inflicted by a chosen method.
51 */
52
53 class THD::Attachable_trx_rw : public THD::Attachable_trx
54 {
55 public:
is_read_only() const56 bool is_read_only() const { return false; }
Attachable_trx_rw(THD * thd)57 Attachable_trx_rw(THD *thd) : THD::Attachable_trx(thd)
58 {
59 m_thd->tx_read_only= false;
60 m_thd->lex->sql_command= SQLCOM_END;
61 m_xa_state_saved= m_thd->get_transaction()->xid_state()->get_state();
62 thd->get_transaction()->xid_state()->set_state(XID_STATE::XA_NOTR);
63 }
~Attachable_trx_rw()64 ~Attachable_trx_rw()
65 {
66 /* The attachable transaction has been already committed */
67 assert(!m_thd->get_transaction()->is_active(Transaction_ctx::STMT)
68 && !m_thd->get_transaction()->is_active(Transaction_ctx::SESSION));
69
70 m_thd->get_transaction()->xid_state()->set_state(m_xa_state_saved);
71 m_thd->tx_read_only= true;
72 }
73
74 private:
75 XID_STATE::xa_states m_xa_state_saved;
76 Attachable_trx_rw(const Attachable_trx_rw &);
77 Attachable_trx_rw &operator =(const Attachable_trx_rw &);
78 };
79
80
is_attachable_rw_transaction_active() const81 bool THD::is_attachable_rw_transaction_active() const
82 {
83 return m_attachable_trx != NULL && !m_attachable_trx->is_read_only();
84 }
85
86
begin_attachable_rw_transaction()87 void THD::begin_attachable_rw_transaction()
88 {
89 assert(!m_attachable_trx);
90
91 m_attachable_trx= new Attachable_trx_rw(this);
92 }
93
94
95 /**
96 Initialize a new THD.
97
98 @param p_thd Pointer to pointer to thread structure
99 */
init_thd(THD ** p_thd)100 static void init_thd(THD **p_thd)
101 {
102 DBUG_ENTER("init_thd");
103 THD *thd= *p_thd;
104 thd->thread_stack= reinterpret_cast<char *>(p_thd);
105 thd->set_command(COM_DAEMON);
106 thd->security_context()->skip_grants();
107 thd->system_thread= SYSTEM_THREAD_COMPRESS_GTID_TABLE;
108 thd->store_globals();
109 thd->set_time();
110 DBUG_VOID_RETURN;
111 }
112
113
114 /**
115 Release resourses for the thread and restores the
116 system_thread information.
117
118 @param thd Thread requesting to be destroyed
119 */
deinit_thd(THD * thd)120 static void deinit_thd(THD *thd)
121 {
122 DBUG_ENTER("deinit_thd");
123 thd->release_resources();
124 thd->restore_globals();
125 delete thd;
126 my_thread_set_THR_THD(NULL);
127 DBUG_VOID_RETURN;
128 }
129
130
create_thd()131 THD *Gtid_table_access_context::create_thd()
132 {
133 THD *thd= System_table_access::create_thd();
134 thd->system_thread= SYSTEM_THREAD_COMPRESS_GTID_TABLE;
135 /*
136 This is equivalent to a new "statement". For that reason, we call
137 both lex_start() and mysql_reset_thd_for_next_command.
138 */
139 lex_start(thd);
140 mysql_reset_thd_for_next_command(thd);
141 thd->set_skip_readonly_check();
142 return(thd);
143 }
144
drop_thd(THD * thd)145 void Gtid_table_access_context::drop_thd(THD *thd)
146 {
147 thd->reset_skip_readonly_check();
148 System_table_access::drop_thd(thd);
149 }
150
before_open(THD * thd)151 void Gtid_table_access_context::before_open(THD* thd)
152 {
153 DBUG_ENTER("Gtid_table_access_context::before_open");
154 /*
155 Allow to operate the gtid_executed table
156 while disconnecting the session.
157 */
158 m_flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
159 MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
160 MYSQL_OPEN_IGNORE_FLUSH |
161 MYSQL_LOCK_IGNORE_TIMEOUT |
162 MYSQL_OPEN_IGNORE_KILLED);
163 DBUG_VOID_RETURN;
164 }
165
166
init(THD ** thd,TABLE ** table,bool is_write)167 bool Gtid_table_access_context::init(THD **thd, TABLE **table, bool is_write)
168 {
169 DBUG_ENTER("Gtid_table_access_context::init");
170
171 if (!(*thd))
172 *thd= m_drop_thd_object= this->create_thd();
173 m_is_write= is_write;
174 if (m_is_write)
175 {
176 /* Disable binlog temporarily */
177 m_tmp_disable_binlog__save_options= (*thd)->variables.option_bits;
178 (*thd)->variables.option_bits&= ~OPTION_BIN_LOG;
179 }
180
181 if (!(*thd)->get_transaction()->xid_state()->has_state(XID_STATE::XA_NOTR))
182 {
183 /*
184 This type of caller of Attachable_trx_rw is deadlock-free with
185 the main transaction thanks to rejection to update
186 'mysql.gtid_executed' by XA main transaction.
187 */
188 assert((*thd)->get_transaction()->xid_state()->
189 has_state(XID_STATE::XA_IDLE) ||
190 (*thd)->get_transaction()->xid_state()->
191 has_state(XID_STATE::XA_PREPARED));
192
193 (*thd)->begin_attachable_rw_transaction();
194 }
195
196 (*thd)->is_operating_gtid_table_implicitly= true;
197 bool ret= this->open_table(*thd, DB_NAME, TABLE_NAME,
198 Gtid_table_persistor::number_fields,
199 m_is_write ? TL_WRITE : TL_READ,
200 table, &m_backup);
201
202 DBUG_RETURN(ret);
203 }
204
205
deinit(THD * thd,TABLE * table,bool error,bool need_commit)206 bool Gtid_table_access_context::deinit(THD *thd, TABLE *table,
207 bool error, bool need_commit)
208 {
209 DBUG_ENTER("Gtid_table_access_context::deinit");
210
211 bool err;
212 err= this->close_table(thd, table, &m_backup, 0 != error, need_commit);
213
214 /*
215 If err is true this means that there was some problem during
216 FLUSH LOGS commit phase.
217 */
218 if (err)
219 {
220 my_printf_error(ER_ERROR_DURING_FLUSH_LOGS, ER(ER_ERROR_DURING_FLUSH_LOGS),
221 MYF(ME_FATALERROR), err);
222 sql_print_error(ER(ER_ERROR_DURING_FLUSH_LOGS), err);
223 DBUG_RETURN(err);
224 }
225
226 /*
227 If Gtid is inserted through Attachable_trx_rw its has been done
228 in the above close_table() through ha_commit_trans().
229 It does not have any side effect to the global transaction state
230 as the only vulnerable part there relates to gtid (and is blocked
231 from recursive invocation).
232 */
233 if (thd->is_attachable_rw_transaction_active())
234 thd->end_attachable_transaction();
235
236 thd->is_operating_gtid_table_implicitly= false;
237 /* Reenable binlog */
238 if (m_is_write)
239 thd->variables.option_bits= m_tmp_disable_binlog__save_options;
240 if (m_drop_thd_object)
241 this->drop_thd(m_drop_thd_object);
242
243 DBUG_RETURN(err);
244 }
245
246
fill_fields(Field ** fields,const char * sid,rpl_gno gno_start,rpl_gno gno_end)247 int Gtid_table_persistor::fill_fields(Field **fields, const char *sid,
248 rpl_gno gno_start, rpl_gno gno_end)
249 {
250 DBUG_ENTER("Gtid_table_persistor::fill_field");
251
252 /* Store SID */
253 fields[0]->set_notnull();
254 if (fields[0]->store(sid, binary_log::Uuid::TEXT_LENGTH, &my_charset_bin))
255 {
256 my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[0]->field_name);
257 goto err;
258 }
259
260 /* Store gno_start */
261 fields[1]->set_notnull();
262 if (fields[1]->store(gno_start, true /* unsigned = true*/))
263 {
264 my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[1]->field_name);
265 goto err;
266 }
267
268 /* Store gno_end */
269 fields[2]->set_notnull();
270 if (fields[2]->store(gno_end, true /* unsigned = true*/))
271 {
272 my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[2]->field_name);
273 goto err;
274 }
275
276 DBUG_RETURN(0);
277 err:
278 DBUG_RETURN(-1);
279 }
280
281
write_row(TABLE * table,const char * sid,rpl_gno gno_start,rpl_gno gno_end)282 int Gtid_table_persistor::write_row(TABLE *table, const char *sid,
283 rpl_gno gno_start, rpl_gno gno_end)
284 {
285 DBUG_ENTER("Gtid_table_persistor::write_row");
286 int error= 0;
287 Field **fields= NULL;
288
289 fields= table->field;
290 empty_record(table);
291
292 if (fill_fields(fields, sid, gno_start, gno_end))
293 DBUG_RETURN(-1);
294
295 /* Inserts a new row into the gtid_executed table. */
296 error= table->file->ha_write_row(table->record[0]);
297 if (DBUG_EVALUATE_IF("simulate_err_on_write_gtid_into_table",
298 (error= -1), error))
299 {
300 if (error == HA_ERR_FOUND_DUPP_KEY)
301 {
302 /* Ignore the duplicate key error, log a warning for it. */
303 sql_print_warning("The transaction owned GTID is already in "
304 "the %s table, which is caused by an "
305 "explicit modifying from user client.",
306 Gtid_table_access_context::TABLE_NAME.str);
307 }
308 else
309 {
310 table->file->print_error(error, MYF(0));
311 /*
312 This makes sure that the error is -1 and not the status
313 returned by the handler.
314 */
315 DBUG_RETURN(-1);
316 }
317 }
318
319 DBUG_RETURN(0);
320 }
321
322
update_row(TABLE * table,const char * sid,rpl_gno gno_start,rpl_gno new_gno_end)323 int Gtid_table_persistor::update_row(TABLE *table, const char *sid,
324 rpl_gno gno_start, rpl_gno new_gno_end)
325 {
326 DBUG_ENTER("Gtid_table_persistor::update_row");
327 int error= 0;
328 Field **fields= NULL;
329 uchar user_key[MAX_KEY_LENGTH];
330
331 fields= table->field;
332 empty_record(table);
333
334 /* Store SID */
335 fields[0]->set_notnull();
336 if (fields[0]->store(sid, binary_log::Uuid::TEXT_LENGTH, &my_charset_bin))
337 {
338 my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[0]->field_name);
339 DBUG_RETURN(-1);
340 }
341
342 /* Store gno_start */
343 fields[1]->set_notnull();
344 if (fields[1]->store(gno_start, true /* unsigned = true*/))
345 {
346 my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[1]->field_name);
347 DBUG_RETURN(-1);
348 }
349
350 key_copy(user_key, table->record[0], table->key_info,
351 table->key_info->key_length);
352
353 if ((error= table->file->ha_index_init(0, 1)))
354 {
355 table->file->print_error(error, MYF(0));
356 DBUG_PRINT("info", ("ha_index_init error"));
357 goto end;
358 }
359
360 if ((error= table->file->ha_index_read_map(table->record[0], user_key,
361 HA_WHOLE_KEY,
362 HA_READ_KEY_EXACT)))
363 {
364 DBUG_PRINT ("info", ("Row not found"));
365 goto end;
366 }
367 else
368 {
369 DBUG_PRINT("info", ("Row found"));
370 store_record(table, record[1]);
371 }
372
373 /* Store new_gno_end */
374 fields[2]->set_notnull();
375 if ((error= fields[2]->store(new_gno_end, true /* unsigned = true*/)))
376 {
377 my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[2]->field_name);
378 goto end;
379 }
380
381 /* Update a row in the gtid_executed table. */
382 error= table->file->ha_update_row(table->record[1], table->record[0]);
383 if (DBUG_EVALUATE_IF("simulate_error_on_compress_gtid_table",
384 (error= -1), error))
385 {
386 table->file->print_error(error, MYF(0));
387 /*
388 This makes sure that the error is -1 and not the status returned
389 by the handler.
390 */
391 goto end;
392 }
393
394 end:
395 table->file->ha_index_end();
396 if (error)
397 DBUG_RETURN(-1);
398 else
399 DBUG_RETURN(0);
400 }
401
402
save(THD * thd,const Gtid * gtid)403 int Gtid_table_persistor::save(THD *thd, const Gtid *gtid)
404 {
405 DBUG_ENTER("Gtid_table_persistor::save(THD *thd, Gtid *gtid)");
406 int error= 0;
407 TABLE *table= NULL;
408 Gtid_table_access_context table_access_ctx;
409 char buf[binary_log::Uuid::TEXT_LENGTH + 1];
410
411 /* Get source id */
412 global_sid_lock->rdlock();
413 rpl_sid sid= global_sid_map->sidno_to_sid(gtid->sidno);
414 global_sid_lock->unlock();
415 sid.to_string(buf);
416
417 if (table_access_ctx.init(&thd, &table, true))
418 {
419 error= 1;
420 goto end;
421 }
422
423 /* Save the gtid info into table. */
424 error= write_row(table, buf, gtid->gno, gtid->gno);
425
426 end:
427 table_access_ctx.deinit(thd, table, 0 != error, false);
428
429 /* Do not protect m_count for improving transactions' concurrency */
430 if (error == 0 && gtid_executed_compression_period != 0)
431 {
432 uint32 count= (uint32)m_count.atomic_add(1);
433 if (count == gtid_executed_compression_period ||
434 DBUG_EVALUATE_IF("compress_gtid_table", 1, 0))
435 {
436 mysql_mutex_lock(&LOCK_compress_gtid_table);
437 should_compress= true;
438 mysql_cond_signal(&COND_compress_gtid_table);
439 mysql_mutex_unlock(&LOCK_compress_gtid_table);
440 }
441 }
442
443 DBUG_RETURN(error);
444 }
445
446
save(const Gtid_set * gtid_set)447 int Gtid_table_persistor::save(const Gtid_set *gtid_set)
448 {
449 DBUG_ENTER("Gtid_table_persistor::save(Gtid_set *gtid_set)");
450 int ret= 0;
451 int error= 0;
452 TABLE *table= NULL;
453 Gtid_table_access_context table_access_ctx;
454 THD *thd= current_thd;
455
456 if (table_access_ctx.init(&thd, &table, true))
457 {
458 error= 1;
459 /*
460 Gtid table is not ready to be used, so failed to
461 open it. Ignore the error.
462 */
463 thd->clear_error();
464 if (!thd->get_stmt_da()->is_set())
465 thd->get_stmt_da()->set_ok_status(0, 0, NULL);
466 goto end;
467 }
468
469 ret= error= save(table, gtid_set);
470
471 end:
472 const int deinit_ret= table_access_ctx.deinit(thd, table, 0 != error, true);
473
474 if (!ret && deinit_ret)
475 ret= -1;
476
477 /* Notify compression thread to compress gtid_executed table. */
478 if (error == 0 && DBUG_EVALUATE_IF("dont_compress_gtid_table", 0, 1))
479 {
480 mysql_mutex_lock(&LOCK_compress_gtid_table);
481 should_compress= true;
482 mysql_cond_signal(&COND_compress_gtid_table);
483 mysql_mutex_unlock(&LOCK_compress_gtid_table);
484 }
485
486 DBUG_RETURN(ret);
487 }
488
489
save(TABLE * table,const Gtid_set * gtid_set)490 int Gtid_table_persistor::save(TABLE *table, const Gtid_set *gtid_set)
491 {
492 DBUG_ENTER("Gtid_table_persistor::save(TABLE* table, "
493 "Gtid_set *gtid_set)");
494 int error= 0;
495 list<Gtid_interval> gtid_intervals;
496 list<Gtid_interval>::iterator iter;
497
498 /* Get GTID intervals from gtid_set. */
499 gtid_set->get_gtid_intervals(>id_intervals);
500 for (iter= gtid_intervals.begin(); iter != gtid_intervals.end(); iter++)
501 {
502 /* Get source id. */
503 char buf[binary_log::Uuid::TEXT_LENGTH + 1];
504 rpl_sid sid= gtid_set->get_sid_map()->sidno_to_sid(iter->sidno);
505 sid.to_string(buf);
506
507 /* Save the gtid interval into table. */
508 if ((error= write_row(table, buf, iter->gno_start, iter->gno_end)))
509 break;
510 }
511
512 gtid_intervals.clear();
513 DBUG_RETURN(error);
514 }
515
516
517 /**
518 Simulate error and crash in the middle of the transaction
519 of compressing gtid_executed table.
520
521 @param thd Thread requesting to compress the table
522
523 @return
524 @retval 0 OK.
525 @retval -1 Error.
526 */
527 #ifndef NDEBUG
dbug_test_on_compress(THD * thd)528 static int dbug_test_on_compress(THD *thd)
529 {
530 DBUG_ENTER("dbug_test_on_compress");
531 /*
532 Sleep a little, so that notified user thread executed the statement
533 completely.
534 */
535 DBUG_EXECUTE_IF("fetch_compression_thread_stage_info", sleep(5););
536 DBUG_EXECUTE_IF("fetch_compression_thread_stage_info",
537 {
538 const char act[]= "now signal fetch_thread_stage";
539 assert(opt_debug_sync_timeout > 0);
540 assert(!debug_sync_set_action(thd,
541 STRING_WITH_LEN(act)));
542 };);
543 /* Sleep a little, so that we can always fetch the correct stage info. */
544 DBUG_EXECUTE_IF("fetch_compression_thread_stage_info", sleep(1););
545
546 /*
547 Wait until notified user thread executed the statement completely,
548 then go to crash.
549 */
550 DBUG_EXECUTE_IF("simulate_crash_on_compress_gtid_table",
551 {
552 const char act[]= "now wait_for notified_thread_complete";
553 assert(opt_debug_sync_timeout > 0);
554 assert(!debug_sync_set_action(thd,
555 STRING_WITH_LEN(act)));
556 };);
557 DBUG_EXECUTE_IF("simulate_crash_on_compress_gtid_table", DBUG_SUICIDE(););
558
559 DBUG_RETURN(0);
560 }
561 #endif
562
563
compress(THD * thd)564 int Gtid_table_persistor::compress(THD *thd)
565 {
566 DBUG_ENTER("Gtid_table_persistor::compress");
567 int error= 0;
568 bool is_complete= false;
569
570 while (!is_complete && !error)
571 error= compress_in_single_transaction(thd, is_complete);
572
573 m_count.atomic_set(0);
574
575 DBUG_EXECUTE_IF("compress_gtid_table",
576 {
577 const char act[]= "now signal complete_compression";
578 assert(opt_debug_sync_timeout > 0);
579 assert(!debug_sync_set_action(thd,
580 STRING_WITH_LEN(act)));
581 };);
582
583 DBUG_RETURN(error);
584 }
585
586
compress_in_single_transaction(THD * thd,bool & is_complete)587 int Gtid_table_persistor::compress_in_single_transaction(THD *thd,
588 bool &is_complete)
589 {
590 DBUG_ENTER("Gtid_table_persistor::compress_in_single_transaction");
591 int error= 0;
592 TABLE *table= NULL;
593 Gtid_table_access_context table_access_ctx;
594
595 mysql_mutex_lock(&LOCK_reset_gtid_table);
596 if (table_access_ctx.init(&thd, &table, true))
597 {
598 error= 1;
599 goto end;
600 }
601
602 /*
603 Reset stage_compressing_gtid_table to overwrite
604 stage_system_lock set in open_table(...).
605 */
606 THD_STAGE_INFO(thd, stage_compressing_gtid_table);
607
608 if ((error= compress_first_consecutive_range(table, is_complete)))
609 goto end;
610
611 #ifndef NDEBUG
612 error= dbug_test_on_compress(thd);
613 #endif
614
615 end:
616 table_access_ctx.deinit(thd, table, 0 != error, true);
617 mysql_mutex_unlock(&LOCK_reset_gtid_table);
618
619 DBUG_RETURN(error);
620 }
621
622
compress_first_consecutive_range(TABLE * table,bool & is_complete)623 int Gtid_table_persistor::compress_first_consecutive_range(TABLE *table,
624 bool &is_complete)
625 {
626 DBUG_ENTER("Gtid_table_persistor::compress_first_consecutive_range");
627 int ret= 0;
628 int err= 0;
629 /* Record the source id of the first consecutive gtid. */
630 string sid;
631 /* Record the first GNO of the first consecutive gtid. */
632 rpl_gno gno_start= 0;
633 /* Record the last GNO of the last consecutive gtid. */
634 rpl_gno gno_end= 0;
635 /* Record the gtid interval of the current gtid. */
636 string cur_sid;
637 rpl_gno cur_gno_start= 0;
638 rpl_gno cur_gno_end= 0;
639 /*
640 Indicate if we have consecutive gtids in the table.
641 Set the flag to true if we find the first consecutive gtids.
642 The first consecutive range of gtids will be compressed if
643 the flag is true.
644 */
645 bool find_first_consecutive_gtids= false;
646
647 if ((err= table->file->ha_index_init(0, true)))
648 DBUG_RETURN(-1);
649
650 /* Read each row by the PK(sid, gno_start) in increasing order. */
651 err= table->file->ha_index_first(table->record[0]);
652 /* Compress the first consecutive range of gtids. */
653 while(!err)
654 {
655 get_gtid_interval(table, cur_sid, cur_gno_start, cur_gno_end);
656 /*
657 Check if gtid intervals of previous gtid and current gtid
658 are consecutive.
659 */
660 if (sid == cur_sid && gno_end + 1 == cur_gno_start)
661 {
662 find_first_consecutive_gtids= true;
663 gno_end= cur_gno_end;
664 /* Delete the consecutive gtid. We do not delete the first
665 consecutive gtid, so that we can update it later. */
666 if ((err= table->file->ha_delete_row(table->record[0])))
667 {
668 table->file->print_error(err, MYF(0));
669 break;
670 }
671 }
672 else
673 {
674 if (find_first_consecutive_gtids)
675 break;
676
677 /* Record the gtid interval of the first consecutive gtid. */
678 sid= cur_sid;
679 gno_start= cur_gno_start;
680 gno_end= cur_gno_end;
681 }
682 err= table->file->ha_index_next(table->record[0]);
683 }
684
685 table->file->ha_index_end();
686 /* Indicate if the gtid_executed table is compressd completely. */
687 is_complete= (err == HA_ERR_END_OF_FILE);
688
689 if (err != HA_ERR_END_OF_FILE && err != 0)
690 ret= -1;
691 else if (find_first_consecutive_gtids)
692 /*
693 Update the gno_end of the first consecutive gtid with the gno_end of
694 the last consecutive gtid for the first consecutive range of gtids.
695 */
696 ret= update_row(table, sid.c_str(), gno_start, gno_end);
697
698 DBUG_RETURN(ret);
699 }
700
701
reset(THD * thd)702 int Gtid_table_persistor::reset(THD *thd)
703 {
704 DBUG_ENTER("Gtid_table_persistor::reset");
705 int error= 0;
706 TABLE *table= NULL;
707 Gtid_table_access_context table_access_ctx;
708
709 mysql_mutex_lock(&LOCK_reset_gtid_table);
710 if (table_access_ctx.init(&thd, &table, true))
711 {
712 error= 1;
713 goto end;
714 }
715
716 error= delete_all(table);
717
718 end:
719 table_access_ctx.deinit(thd, table, 0 != error, true);
720 mysql_mutex_unlock(&LOCK_reset_gtid_table);
721
722 DBUG_RETURN(error);
723 }
724
725
encode_gtid_text(TABLE * table)726 string Gtid_table_persistor::encode_gtid_text(TABLE *table)
727 {
728 DBUG_ENTER("Gtid_table_persistor::encode_gtid_text");
729 char buff[MAX_FIELD_WIDTH];
730 String str(buff, sizeof(buff), &my_charset_bin);
731
732 /* Fetch gtid interval from the table */
733 table->field[0]->val_str(&str);
734 string gtid_text(str.c_ptr_safe());
735 gtid_text.append(Gtid_set::default_string_format.sid_gno_separator);
736 table->field[1]->val_str(&str);
737 gtid_text.append(str.c_ptr_safe());
738 gtid_text.append(Gtid_set::default_string_format.gno_start_end_separator);
739 table->field[2]->val_str(&str);
740 gtid_text.append(str.c_ptr_safe());
741
742 DBUG_RETURN(gtid_text);
743 }
744
745
get_gtid_interval(TABLE * table,string & sid,rpl_gno & gno_start,rpl_gno & gno_end)746 void Gtid_table_persistor::get_gtid_interval(TABLE *table, string &sid,
747 rpl_gno &gno_start,
748 rpl_gno &gno_end)
749 {
750 DBUG_ENTER("Gtid_table_persistor::get_gtid_interval");
751 char buff[MAX_FIELD_WIDTH];
752 String str(buff, sizeof(buff), &my_charset_bin);
753
754 /* Fetch gtid interval from the table */
755 table->field[0]->val_str(&str);
756 sid= string(str.c_ptr_safe());
757 gno_start= table->field[1]->val_int();
758 gno_end= table->field[2]->val_int();
759 DBUG_VOID_RETURN;
760 }
761
762
fetch_gtids(Gtid_set * gtid_set)763 int Gtid_table_persistor::fetch_gtids(Gtid_set *gtid_set)
764 {
765 DBUG_ENTER("Gtid_table_persistor::fetch_gtids");
766 int ret= 0;
767 int err= 0;
768 TABLE *table= NULL;
769 Gtid_table_access_context table_access_ctx;
770 THD *thd= current_thd;
771
772 if (table_access_ctx.init(&thd, &table, false))
773 {
774 ret= 1;
775 goto end;
776 }
777
778 if ((err= table->file->ha_rnd_init(true)))
779 {
780 ret= -1;
781 goto end;
782 }
783
784 while(!(err= table->file->ha_rnd_next(table->record[0])))
785 {
786 /* Store the gtid into the gtid_set */
787
788 /**
789 @todo:
790 - take only global_sid_lock->rdlock(), and take
791 gtid_state->sid_lock for each iteration.
792 - Add wrapper around Gtid_set::add_gno_interval and call that
793 instead.
794 */
795 global_sid_lock->wrlock();
796 if (gtid_set->add_gtid_text(encode_gtid_text(table).c_str()) !=
797 RETURN_STATUS_OK)
798 {
799 global_sid_lock->unlock();
800 break;
801 }
802 global_sid_lock->unlock();
803 }
804
805 table->file->ha_rnd_end();
806 if (err != HA_ERR_END_OF_FILE)
807 ret= -1;
808
809 end:
810 table_access_ctx.deinit(thd, table, 0 != ret, true);
811
812 DBUG_RETURN(ret);
813 }
814
815
delete_all(TABLE * table)816 int Gtid_table_persistor::delete_all(TABLE *table)
817 {
818 DBUG_ENTER("Gtid_table_persistor::delete_all");
819 int err= 0;
820
821 if ((err= table->file->ha_rnd_init(true)))
822 DBUG_RETURN(-1);
823
824 /*
825 Delete all rows in the gtid_executed table. We cannot use truncate(),
826 since it is a non-transactional DDL operation.
827 */
828 while(!(err= table->file->ha_rnd_next(table->record[0])))
829 {
830 /* Delete current row. */
831 err= table->file->ha_delete_row(table->record[0]);
832 if (DBUG_EVALUATE_IF("simulate_error_on_delete_gtid_from_table",
833 (err= -1), err))
834 {
835 table->file->print_error(err, MYF(0));
836 sql_print_error("Failed to delete the row: '%s' from the gtid_executed "
837 "table.", encode_gtid_text(table).c_str());
838 break;
839 }
840 }
841
842 table->file->ha_rnd_end();
843 if (err != HA_ERR_END_OF_FILE)
844 DBUG_RETURN(-1);
845
846 DBUG_RETURN(0);
847 }
848
849
850 /**
851 The main function of the compression thread.
852 - compress the gtid_executed table when get a compression signal.
853
854 @param p_thd Thread requesting to compress the table
855
856 @return
857 @retval 0 OK. always, the compression thread will swallow any error
858 for going to wait for next compression signal until
859 it is terminated.
860 */
compress_gtid_table(void * p_thd)861 extern "C" void *compress_gtid_table(void *p_thd)
862 {
863 THD *thd=(THD*) p_thd;
864 mysql_thread_set_psi_id(thd->thread_id());
865 my_thread_init();
866 DBUG_ENTER("compress_gtid_table");
867
868 init_thd(&thd);
869 /*
870 Gtid table compression thread should ignore 'read-only' and
871 'super_read_only' options so that it can update 'mysql.gtid_executed'
872 replication repository tables.
873 */
874 thd->set_skip_readonly_check();
875 for (;;)
876 {
877 mysql_mutex_lock(&LOCK_compress_gtid_table);
878 if (terminate_compress_thread)
879 break;
880 THD_ENTER_COND(thd, &COND_compress_gtid_table,
881 &LOCK_compress_gtid_table,
882 &stage_suspending, NULL);
883 /* Add the check to handle spurious wakeups from system. */
884 while(!(should_compress || terminate_compress_thread))
885 mysql_cond_wait(&COND_compress_gtid_table, &LOCK_compress_gtid_table);
886 should_compress= false;
887 if (terminate_compress_thread)
888 break;
889 mysql_mutex_unlock(&LOCK_compress_gtid_table);
890 THD_EXIT_COND(thd, NULL);
891
892 THD_STAGE_INFO(thd, stage_compressing_gtid_table);
893 /* Compressing the gtid_executed table. */
894 if (gtid_state->compress(thd))
895 {
896 sql_print_warning("Failed to compress the gtid_executed table.");
897 /* Clear the error for going to wait for next compression signal. */
898 thd->clear_error();
899 DBUG_EXECUTE_IF("simulate_error_on_compress_gtid_table",
900 {
901 const char act[]= "now signal compression_failed";
902 assert(opt_debug_sync_timeout > 0);
903 assert(!debug_sync_set_action(current_thd,
904 STRING_WITH_LEN(act)));
905 };);
906 }
907 }
908
909 mysql_mutex_unlock(&LOCK_compress_gtid_table);
910 thd->reset_skip_readonly_check();
911 deinit_thd(thd);
912 DBUG_LEAVE;
913 my_thread_end();
914 my_thread_exit(0);
915 return 0;
916 }
917
918
919 /**
920 Create the compression thread to compress gtid_executed table.
921 */
create_compress_gtid_table_thread()922 void create_compress_gtid_table_thread()
923 {
924 my_thread_attr_t attr;
925 int error;
926 THD *thd;
927 if (!(thd= new THD))
928 {
929 sql_print_error("Failed to compress the gtid_executed table, because "
930 "it is failed to allocate the THD.");
931 return;
932 }
933
934 thd->set_new_thread_id();
935
936 THD_CHECK_SENTRY(thd);
937
938 if (my_thread_attr_init(&attr))
939 {
940 sql_print_error("Failed to initialize thread attribute "
941 "when creating compression thread.");
942 delete thd;
943 return;
944 }
945
946 if (DBUG_EVALUATE_IF("simulate_create_compress_thread_failure",
947 error= 1, 0) ||
948 #ifndef _WIN32
949 (error= pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)) ||
950 #endif
951 (error= mysql_thread_create(key_thread_compress_gtid_table,
952 &compress_thread_id, &attr,
953 compress_gtid_table, (void*) thd)))
954 {
955 sql_print_error("Can not create thread to compress gtid_executed table "
956 "(errno= %d)", error);
957 /* Delete the created THD after failed to create a compression thread. */
958 delete thd;
959 }
960
961 (void) my_thread_attr_destroy(&attr);
962 }
963
964
965 /**
966 Terminate the compression thread.
967 */
terminate_compress_gtid_table_thread()968 void terminate_compress_gtid_table_thread()
969 {
970 DBUG_ENTER("terminate_compress_gtid_table_thread");
971 int error= 0;
972
973 /* Notify suspended compression thread. */
974 mysql_mutex_lock(&LOCK_compress_gtid_table);
975 terminate_compress_thread= true;
976 mysql_cond_signal(&COND_compress_gtid_table);
977 mysql_mutex_unlock(&LOCK_compress_gtid_table);
978
979 if (compress_thread_id.thread != 0)
980 {
981 error= my_thread_join(&compress_thread_id, NULL);
982 compress_thread_id.thread= 0;
983 }
984
985 if (error != 0)
986 sql_print_warning("Could not join gtid_executed table compression thread. "
987 "error:%d", error);
988
989 DBUG_VOID_RETURN;
990 }
991
992