1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22    02110-1301 USA */
23 
24 #include "rpl_gtid_persist.h"
25 
26 #include "debug_sync.h"       // debug_sync_set_action
27 #include "log.h"              // sql_print_error
28 #include "replication.h"      // THD_ENTER_COND
29 #include "sql_base.h"         // MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK
30 #include "sql_parse.h"        // mysql_reset_thd_for_next_command
31 
32 using std::list;
33 using std::string;
34 
35 
36 my_thread_handle compress_thread_id;
37 static bool terminate_compress_thread= false;
38 static bool should_compress= false;
39 const LEX_STRING Gtid_table_access_context::TABLE_NAME= {C_STRING_WITH_LEN("gtid_executed")};
40 const LEX_STRING Gtid_table_access_context::DB_NAME= {C_STRING_WITH_LEN("mysql")};
41 
42 /**
43   A derived from THD::Attachable_trx class allows updates in
44   the attachable transaction. Callers of the class methods must
45   make sure the attachable_rw won't cause deadlock with the main transaction.
46   The destructor does not invoke ha_commit_{stmt,trans} nor ha_rollback_trans
47   on purpose.
48   Burden to terminate the read-write instance also lies on the caller!
49   In order to use this interface it *MUST* prove that no side effect to
50   the global transaction state can be inflicted by a chosen method.
51 */
52 
53 class THD::Attachable_trx_rw : public THD::Attachable_trx
54 {
55 public:
is_read_only() const56   bool is_read_only() const { return false; }
Attachable_trx_rw(THD * thd)57   Attachable_trx_rw(THD *thd) : THD::Attachable_trx(thd)
58   {
59     m_thd->tx_read_only= false;
60     m_thd->lex->sql_command= SQLCOM_END;
61     m_xa_state_saved= m_thd->get_transaction()->xid_state()->get_state();
62     thd->get_transaction()->xid_state()->set_state(XID_STATE::XA_NOTR);
63   }
~Attachable_trx_rw()64   ~Attachable_trx_rw()
65   {
66     /* The attachable transaction has been already committed */
67     assert(!m_thd->get_transaction()->is_active(Transaction_ctx::STMT)
68            && !m_thd->get_transaction()->is_active(Transaction_ctx::SESSION));
69 
70     m_thd->get_transaction()->xid_state()->set_state(m_xa_state_saved);
71     m_thd->tx_read_only= true;
72   }
73 
74 private:
75   XID_STATE::xa_states m_xa_state_saved;
76   Attachable_trx_rw(const Attachable_trx_rw &);
77   Attachable_trx_rw &operator =(const Attachable_trx_rw &);
78 };
79 
80 
is_attachable_rw_transaction_active() const81 bool THD::is_attachable_rw_transaction_active() const
82 {
83   return m_attachable_trx != NULL && !m_attachable_trx->is_read_only();
84 }
85 
86 
begin_attachable_rw_transaction()87 void THD::begin_attachable_rw_transaction()
88 {
89   assert(!m_attachable_trx);
90 
91   m_attachable_trx= new Attachable_trx_rw(this);
92 }
93 
94 
95 /**
96   Initialize a new THD.
97 
98   @param p_thd  Pointer to pointer to thread structure
99 */
init_thd(THD ** p_thd)100 static void init_thd(THD **p_thd)
101 {
102   DBUG_ENTER("init_thd");
103   THD *thd= *p_thd;
104   thd->thread_stack= reinterpret_cast<char *>(p_thd);
105   thd->set_command(COM_DAEMON);
106   thd->security_context()->skip_grants();
107   thd->system_thread= SYSTEM_THREAD_COMPRESS_GTID_TABLE;
108   thd->store_globals();
109   thd->set_time();
110   DBUG_VOID_RETURN;
111 }
112 
113 
114 /**
115   Release resourses for the thread and restores the
116   system_thread information.
117 
118   @param thd Thread requesting to be destroyed
119 */
deinit_thd(THD * thd)120 static void deinit_thd(THD *thd)
121 {
122   DBUG_ENTER("deinit_thd");
123   thd->release_resources();
124   thd->restore_globals();
125   delete thd;
126   my_thread_set_THR_THD(NULL);
127   DBUG_VOID_RETURN;
128 }
129 
130 
create_thd()131 THD *Gtid_table_access_context::create_thd()
132 {
133   THD *thd= System_table_access::create_thd();
134   thd->system_thread= SYSTEM_THREAD_COMPRESS_GTID_TABLE;
135   /*
136     This is equivalent to a new "statement". For that reason, we call
137     both lex_start() and mysql_reset_thd_for_next_command.
138   */
139   lex_start(thd);
140   mysql_reset_thd_for_next_command(thd);
141   thd->set_skip_readonly_check();
142   return(thd);
143 }
144 
drop_thd(THD * thd)145 void Gtid_table_access_context::drop_thd(THD *thd)
146 {
147   thd->reset_skip_readonly_check();
148   System_table_access::drop_thd(thd);
149 }
150 
before_open(THD * thd)151 void Gtid_table_access_context::before_open(THD* thd)
152 {
153   DBUG_ENTER("Gtid_table_access_context::before_open");
154   /*
155     Allow to operate the gtid_executed table
156     while disconnecting the session.
157   */
158   m_flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
159             MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
160             MYSQL_OPEN_IGNORE_FLUSH |
161             MYSQL_LOCK_IGNORE_TIMEOUT |
162             MYSQL_OPEN_IGNORE_KILLED);
163   DBUG_VOID_RETURN;
164 }
165 
166 
init(THD ** thd,TABLE ** table,bool is_write)167 bool Gtid_table_access_context::init(THD **thd, TABLE **table, bool is_write)
168 {
169   DBUG_ENTER("Gtid_table_access_context::init");
170 
171   if (!(*thd))
172     *thd= m_drop_thd_object= this->create_thd();
173   m_is_write= is_write;
174   if (m_is_write)
175   {
176     /* Disable binlog temporarily */
177     m_tmp_disable_binlog__save_options= (*thd)->variables.option_bits;
178     (*thd)->variables.option_bits&= ~OPTION_BIN_LOG;
179   }
180 
181   if (!(*thd)->get_transaction()->xid_state()->has_state(XID_STATE::XA_NOTR))
182   {
183     /*
184       This type of caller of Attachable_trx_rw is deadlock-free with
185       the main transaction thanks to rejection to update
186       'mysql.gtid_executed' by XA main transaction.
187     */
188     assert((*thd)->get_transaction()->xid_state()->
189            has_state(XID_STATE::XA_IDLE) ||
190            (*thd)->get_transaction()->xid_state()->
191            has_state(XID_STATE::XA_PREPARED));
192 
193     (*thd)->begin_attachable_rw_transaction();
194   }
195 
196   (*thd)->is_operating_gtid_table_implicitly= true;
197   bool ret= this->open_table(*thd, DB_NAME, TABLE_NAME,
198                              Gtid_table_persistor::number_fields,
199                              m_is_write ? TL_WRITE : TL_READ,
200                              table, &m_backup);
201 
202   DBUG_RETURN(ret);
203 }
204 
205 
deinit(THD * thd,TABLE * table,bool error,bool need_commit)206 bool Gtid_table_access_context::deinit(THD *thd, TABLE *table,
207                                        bool error, bool need_commit)
208 {
209   DBUG_ENTER("Gtid_table_access_context::deinit");
210 
211   bool err;
212   err= this->close_table(thd, table, &m_backup, 0 != error, need_commit);
213 
214   /*
215     If err is true this means that there was some problem during
216     FLUSH LOGS commit phase.
217   */
218   if (err)
219   {
220     my_printf_error(ER_ERROR_DURING_FLUSH_LOGS, ER(ER_ERROR_DURING_FLUSH_LOGS),
221                     MYF(ME_FATALERROR), err);
222     sql_print_error(ER(ER_ERROR_DURING_FLUSH_LOGS), err);
223     DBUG_RETURN(err);
224   }
225 
226   /*
227     If Gtid is inserted through Attachable_trx_rw its has been done
228     in the above close_table() through ha_commit_trans().
229     It does not have any side effect to the global transaction state
230     as the only vulnerable part there relates to gtid (and is blocked
231     from recursive invocation).
232   */
233   if (thd->is_attachable_rw_transaction_active())
234     thd->end_attachable_transaction();
235 
236   thd->is_operating_gtid_table_implicitly= false;
237   /* Reenable binlog */
238   if (m_is_write)
239     thd->variables.option_bits= m_tmp_disable_binlog__save_options;
240   if (m_drop_thd_object)
241     this->drop_thd(m_drop_thd_object);
242 
243   DBUG_RETURN(err);
244 }
245 
246 
fill_fields(Field ** fields,const char * sid,rpl_gno gno_start,rpl_gno gno_end)247 int Gtid_table_persistor::fill_fields(Field **fields, const char *sid,
248                                       rpl_gno gno_start, rpl_gno gno_end)
249 {
250   DBUG_ENTER("Gtid_table_persistor::fill_field");
251 
252   /* Store SID */
253   fields[0]->set_notnull();
254   if (fields[0]->store(sid, binary_log::Uuid::TEXT_LENGTH, &my_charset_bin))
255   {
256     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[0]->field_name);
257     goto err;
258   }
259 
260   /* Store gno_start */
261   fields[1]->set_notnull();
262   if (fields[1]->store(gno_start, true /* unsigned = true*/))
263   {
264     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[1]->field_name);
265     goto err;
266   }
267 
268   /* Store gno_end */
269   fields[2]->set_notnull();
270   if (fields[2]->store(gno_end, true /* unsigned = true*/))
271   {
272     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[2]->field_name);
273     goto err;
274   }
275 
276   DBUG_RETURN(0);
277 err:
278   DBUG_RETURN(-1);
279 }
280 
281 
write_row(TABLE * table,const char * sid,rpl_gno gno_start,rpl_gno gno_end)282 int Gtid_table_persistor::write_row(TABLE *table, const char *sid,
283                                     rpl_gno gno_start, rpl_gno gno_end)
284 {
285   DBUG_ENTER("Gtid_table_persistor::write_row");
286   int error= 0;
287   Field **fields= NULL;
288 
289   fields= table->field;
290   empty_record(table);
291 
292   if (fill_fields(fields, sid, gno_start, gno_end))
293     DBUG_RETURN(-1);
294 
295   /* Inserts a new row into the gtid_executed table. */
296   error= table->file->ha_write_row(table->record[0]);
297   if (DBUG_EVALUATE_IF("simulate_err_on_write_gtid_into_table",
298                        (error= -1), error))
299   {
300     if (error == HA_ERR_FOUND_DUPP_KEY)
301     {
302       /* Ignore the duplicate key error, log a warning for it. */
303       sql_print_warning("The transaction owned GTID is already in "
304                         "the %s table, which is caused by an "
305                         "explicit modifying from user client.",
306                         Gtid_table_access_context::TABLE_NAME.str);
307     }
308     else
309     {
310       table->file->print_error(error, MYF(0));
311       /*
312         This makes sure that the error is -1 and not the status
313         returned by the handler.
314       */
315       DBUG_RETURN(-1);
316     }
317   }
318 
319   DBUG_RETURN(0);
320 }
321 
322 
update_row(TABLE * table,const char * sid,rpl_gno gno_start,rpl_gno new_gno_end)323 int Gtid_table_persistor::update_row(TABLE *table, const char *sid,
324                                      rpl_gno gno_start, rpl_gno new_gno_end)
325 {
326   DBUG_ENTER("Gtid_table_persistor::update_row");
327   int error= 0;
328   Field **fields= NULL;
329   uchar user_key[MAX_KEY_LENGTH];
330 
331   fields= table->field;
332   empty_record(table);
333 
334   /* Store SID */
335   fields[0]->set_notnull();
336   if (fields[0]->store(sid, binary_log::Uuid::TEXT_LENGTH, &my_charset_bin))
337   {
338     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[0]->field_name);
339     DBUG_RETURN(-1);
340   }
341 
342   /* Store gno_start */
343   fields[1]->set_notnull();
344   if (fields[1]->store(gno_start, true /* unsigned = true*/))
345   {
346     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[1]->field_name);
347     DBUG_RETURN(-1);
348   }
349 
350   key_copy(user_key, table->record[0], table->key_info,
351            table->key_info->key_length);
352 
353   if ((error= table->file->ha_index_init(0, 1)))
354   {
355     table->file->print_error(error, MYF(0));
356     DBUG_PRINT("info", ("ha_index_init error"));
357     goto end;
358   }
359 
360   if ((error= table->file->ha_index_read_map(table->record[0], user_key,
361                                              HA_WHOLE_KEY,
362                                              HA_READ_KEY_EXACT)))
363   {
364     DBUG_PRINT ("info", ("Row not found"));
365     goto end;
366   }
367   else
368   {
369     DBUG_PRINT("info", ("Row found"));
370     store_record(table, record[1]);
371   }
372 
373   /* Store new_gno_end */
374   fields[2]->set_notnull();
375   if ((error= fields[2]->store(new_gno_end, true /* unsigned = true*/)))
376   {
377     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[2]->field_name);
378     goto end;
379   }
380 
381   /* Update a row in the gtid_executed table. */
382   error= table->file->ha_update_row(table->record[1], table->record[0]);
383   if (DBUG_EVALUATE_IF("simulate_error_on_compress_gtid_table",
384                        (error= -1), error))
385   {
386     table->file->print_error(error, MYF(0));
387     /*
388       This makes sure that the error is -1 and not the status returned
389       by the handler.
390     */
391     goto end;
392   }
393 
394 end:
395   table->file->ha_index_end();
396   if (error)
397     DBUG_RETURN(-1);
398   else
399     DBUG_RETURN(0);
400 }
401 
402 
save(THD * thd,const Gtid * gtid)403 int Gtid_table_persistor::save(THD *thd, const Gtid *gtid)
404 {
405   DBUG_ENTER("Gtid_table_persistor::save(THD *thd, Gtid *gtid)");
406   int error= 0;
407   TABLE *table= NULL;
408   Gtid_table_access_context table_access_ctx;
409   char buf[binary_log::Uuid::TEXT_LENGTH + 1];
410 
411   /* Get source id */
412   global_sid_lock->rdlock();
413   rpl_sid sid= global_sid_map->sidno_to_sid(gtid->sidno);
414   global_sid_lock->unlock();
415   sid.to_string(buf);
416 
417   if (table_access_ctx.init(&thd, &table, true))
418   {
419     error= 1;
420     goto end;
421   }
422 
423   /* Save the gtid info into table. */
424   error= write_row(table, buf, gtid->gno, gtid->gno);
425 
426 end:
427   table_access_ctx.deinit(thd, table, 0 != error, false);
428 
429   /* Do not protect m_count for improving transactions' concurrency */
430   if (error == 0 && gtid_executed_compression_period != 0)
431   {
432     uint32 count= (uint32)m_count.atomic_add(1);
433     if (count == gtid_executed_compression_period ||
434         DBUG_EVALUATE_IF("compress_gtid_table", 1, 0))
435     {
436       mysql_mutex_lock(&LOCK_compress_gtid_table);
437       should_compress= true;
438       mysql_cond_signal(&COND_compress_gtid_table);
439       mysql_mutex_unlock(&LOCK_compress_gtid_table);
440     }
441   }
442 
443   DBUG_RETURN(error);
444 }
445 
446 
save(const Gtid_set * gtid_set)447 int Gtid_table_persistor::save(const Gtid_set *gtid_set)
448 {
449   DBUG_ENTER("Gtid_table_persistor::save(Gtid_set *gtid_set)");
450   int ret= 0;
451   int error= 0;
452   TABLE *table= NULL;
453   Gtid_table_access_context table_access_ctx;
454   THD *thd= current_thd;
455 
456   if (table_access_ctx.init(&thd, &table, true))
457   {
458     error= 1;
459     /*
460       Gtid table is not ready to be used, so failed to
461       open it. Ignore the error.
462     */
463     thd->clear_error();
464     if (!thd->get_stmt_da()->is_set())
465       thd->get_stmt_da()->set_ok_status(0, 0, NULL);
466     goto end;
467   }
468 
469   ret= error= save(table, gtid_set);
470 
471 end:
472   const int deinit_ret= table_access_ctx.deinit(thd, table, 0 != error, true);
473 
474   if (!ret && deinit_ret)
475     ret= -1;
476 
477   /* Notify compression thread to compress gtid_executed table. */
478   if (error == 0 && DBUG_EVALUATE_IF("dont_compress_gtid_table", 0, 1))
479   {
480     mysql_mutex_lock(&LOCK_compress_gtid_table);
481     should_compress= true;
482     mysql_cond_signal(&COND_compress_gtid_table);
483     mysql_mutex_unlock(&LOCK_compress_gtid_table);
484   }
485 
486   DBUG_RETURN(ret);
487 }
488 
489 
save(TABLE * table,const Gtid_set * gtid_set)490 int Gtid_table_persistor::save(TABLE *table, const Gtid_set *gtid_set)
491 {
492   DBUG_ENTER("Gtid_table_persistor::save(TABLE* table, "
493              "Gtid_set *gtid_set)");
494   int error= 0;
495   list<Gtid_interval> gtid_intervals;
496   list<Gtid_interval>::iterator iter;
497 
498   /* Get GTID intervals from gtid_set. */
499   gtid_set->get_gtid_intervals(&gtid_intervals);
500   for (iter= gtid_intervals.begin(); iter != gtid_intervals.end(); iter++)
501   {
502     /* Get source id. */
503     char buf[binary_log::Uuid::TEXT_LENGTH + 1];
504     rpl_sid sid= gtid_set->get_sid_map()->sidno_to_sid(iter->sidno);
505     sid.to_string(buf);
506 
507     /* Save the gtid interval into table. */
508     if ((error= write_row(table, buf, iter->gno_start, iter->gno_end)))
509       break;
510   }
511 
512   gtid_intervals.clear();
513   DBUG_RETURN(error);
514 }
515 
516 
517 /**
518   Simulate error and crash in the middle of the transaction
519   of compressing gtid_executed table.
520 
521   @param  thd Thread requesting to compress the table
522 
523   @return
524     @retval 0    OK.
525     @retval -1   Error.
526 */
527 #ifndef NDEBUG
dbug_test_on_compress(THD * thd)528 static int dbug_test_on_compress(THD *thd)
529 {
530   DBUG_ENTER("dbug_test_on_compress");
531   /*
532     Sleep a little, so that notified user thread executed the statement
533     completely.
534   */
535   DBUG_EXECUTE_IF("fetch_compression_thread_stage_info", sleep(5););
536   DBUG_EXECUTE_IF("fetch_compression_thread_stage_info",
537                   {
538                     const char act[]= "now signal fetch_thread_stage";
539                     assert(opt_debug_sync_timeout > 0);
540                     assert(!debug_sync_set_action(thd,
541                                                   STRING_WITH_LEN(act)));
542                   };);
543   /* Sleep a little, so that we can always fetch the correct stage info. */
544   DBUG_EXECUTE_IF("fetch_compression_thread_stage_info", sleep(1););
545 
546   /*
547     Wait until notified user thread executed the statement completely,
548     then go to crash.
549   */
550   DBUG_EXECUTE_IF("simulate_crash_on_compress_gtid_table",
551                   {
552                     const char act[]= "now wait_for notified_thread_complete";
553                     assert(opt_debug_sync_timeout > 0);
554                     assert(!debug_sync_set_action(thd,
555                                                   STRING_WITH_LEN(act)));
556                   };);
557   DBUG_EXECUTE_IF("simulate_crash_on_compress_gtid_table", DBUG_SUICIDE(););
558 
559   DBUG_RETURN(0);
560 }
561 #endif
562 
563 
compress(THD * thd)564 int Gtid_table_persistor::compress(THD *thd)
565 {
566   DBUG_ENTER("Gtid_table_persistor::compress");
567   int error= 0;
568   bool is_complete= false;
569 
570   while (!is_complete && !error)
571     error= compress_in_single_transaction(thd, is_complete);
572 
573   m_count.atomic_set(0);
574 
575   DBUG_EXECUTE_IF("compress_gtid_table",
576                   {
577                     const char act[]= "now signal complete_compression";
578                     assert(opt_debug_sync_timeout > 0);
579                     assert(!debug_sync_set_action(thd,
580                                                   STRING_WITH_LEN(act)));
581                   };);
582 
583   DBUG_RETURN(error);
584 }
585 
586 
compress_in_single_transaction(THD * thd,bool & is_complete)587 int Gtid_table_persistor::compress_in_single_transaction(THD *thd,
588                                                          bool &is_complete)
589 {
590   DBUG_ENTER("Gtid_table_persistor::compress_in_single_transaction");
591   int error= 0;
592   TABLE *table= NULL;
593   Gtid_table_access_context table_access_ctx;
594 
595   mysql_mutex_lock(&LOCK_reset_gtid_table);
596   if (table_access_ctx.init(&thd, &table, true))
597   {
598     error= 1;
599     goto end;
600   }
601 
602   /*
603     Reset stage_compressing_gtid_table to overwrite
604     stage_system_lock set in open_table(...).
605   */
606   THD_STAGE_INFO(thd, stage_compressing_gtid_table);
607 
608   if ((error= compress_first_consecutive_range(table, is_complete)))
609     goto end;
610 
611 #ifndef NDEBUG
612   error= dbug_test_on_compress(thd);
613 #endif
614 
615 end:
616   table_access_ctx.deinit(thd, table, 0 != error, true);
617   mysql_mutex_unlock(&LOCK_reset_gtid_table);
618 
619   DBUG_RETURN(error);
620 }
621 
622 
compress_first_consecutive_range(TABLE * table,bool & is_complete)623 int Gtid_table_persistor::compress_first_consecutive_range(TABLE *table,
624                                                            bool &is_complete)
625 {
626   DBUG_ENTER("Gtid_table_persistor::compress_first_consecutive_range");
627   int ret= 0;
628   int err= 0;
629   /* Record the source id of the first consecutive gtid. */
630   string sid;
631   /* Record the first GNO of the first consecutive gtid. */
632   rpl_gno gno_start= 0;
633   /* Record the last GNO of the last consecutive gtid. */
634   rpl_gno gno_end= 0;
635   /* Record the gtid interval of the current gtid. */
636   string cur_sid;
637   rpl_gno cur_gno_start= 0;
638   rpl_gno cur_gno_end= 0;
639   /*
640     Indicate if we have consecutive gtids in the table.
641     Set the flag to true if we find the first consecutive gtids.
642     The first consecutive range of gtids will be compressed if
643     the flag is true.
644   */
645   bool find_first_consecutive_gtids= false;
646 
647   if ((err= table->file->ha_index_init(0, true)))
648     DBUG_RETURN(-1);
649 
650   /* Read each row by the PK(sid, gno_start) in increasing order. */
651   err= table->file->ha_index_first(table->record[0]);
652   /* Compress the first consecutive range of gtids. */
653   while(!err)
654   {
655     get_gtid_interval(table, cur_sid, cur_gno_start, cur_gno_end);
656     /*
657       Check if gtid intervals of previous gtid and current gtid
658       are consecutive.
659     */
660     if (sid == cur_sid && gno_end + 1 == cur_gno_start)
661     {
662       find_first_consecutive_gtids= true;
663       gno_end= cur_gno_end;
664       /* Delete the consecutive gtid. We do not delete the first
665          consecutive gtid, so that we can update it later. */
666       if ((err= table->file->ha_delete_row(table->record[0])))
667       {
668         table->file->print_error(err, MYF(0));
669         break;
670       }
671     }
672     else
673     {
674       if (find_first_consecutive_gtids)
675         break;
676 
677       /* Record the gtid interval of the first consecutive gtid. */
678       sid= cur_sid;
679       gno_start= cur_gno_start;
680       gno_end= cur_gno_end;
681     }
682     err= table->file->ha_index_next(table->record[0]);
683   }
684 
685   table->file->ha_index_end();
686   /* Indicate if the gtid_executed table is compressd completely. */
687   is_complete= (err == HA_ERR_END_OF_FILE);
688 
689   if (err != HA_ERR_END_OF_FILE && err != 0)
690     ret= -1;
691   else if (find_first_consecutive_gtids)
692     /*
693       Update the gno_end of the first consecutive gtid with the gno_end of
694       the last consecutive gtid for the first consecutive range of gtids.
695     */
696     ret= update_row(table, sid.c_str(), gno_start, gno_end);
697 
698   DBUG_RETURN(ret);
699 }
700 
701 
reset(THD * thd)702 int Gtid_table_persistor::reset(THD *thd)
703 {
704   DBUG_ENTER("Gtid_table_persistor::reset");
705   int error= 0;
706   TABLE *table= NULL;
707   Gtid_table_access_context table_access_ctx;
708 
709   mysql_mutex_lock(&LOCK_reset_gtid_table);
710   if (table_access_ctx.init(&thd, &table, true))
711   {
712     error= 1;
713     goto end;
714   }
715 
716   error= delete_all(table);
717 
718 end:
719   table_access_ctx.deinit(thd, table, 0 != error, true);
720   mysql_mutex_unlock(&LOCK_reset_gtid_table);
721 
722   DBUG_RETURN(error);
723 }
724 
725 
encode_gtid_text(TABLE * table)726 string Gtid_table_persistor::encode_gtid_text(TABLE *table)
727 {
728   DBUG_ENTER("Gtid_table_persistor::encode_gtid_text");
729   char buff[MAX_FIELD_WIDTH];
730   String str(buff, sizeof(buff), &my_charset_bin);
731 
732   /* Fetch gtid interval from the table */
733   table->field[0]->val_str(&str);
734   string gtid_text(str.c_ptr_safe());
735   gtid_text.append(Gtid_set::default_string_format.sid_gno_separator);
736   table->field[1]->val_str(&str);
737   gtid_text.append(str.c_ptr_safe());
738   gtid_text.append(Gtid_set::default_string_format.gno_start_end_separator);
739   table->field[2]->val_str(&str);
740   gtid_text.append(str.c_ptr_safe());
741 
742   DBUG_RETURN(gtid_text);
743 }
744 
745 
get_gtid_interval(TABLE * table,string & sid,rpl_gno & gno_start,rpl_gno & gno_end)746 void Gtid_table_persistor::get_gtid_interval(TABLE *table, string &sid,
747                                              rpl_gno &gno_start,
748                                              rpl_gno &gno_end)
749 {
750   DBUG_ENTER("Gtid_table_persistor::get_gtid_interval");
751   char buff[MAX_FIELD_WIDTH];
752   String str(buff, sizeof(buff), &my_charset_bin);
753 
754   /* Fetch gtid interval from the table */
755   table->field[0]->val_str(&str);
756   sid= string(str.c_ptr_safe());
757   gno_start= table->field[1]->val_int();
758   gno_end= table->field[2]->val_int();
759   DBUG_VOID_RETURN;
760 }
761 
762 
fetch_gtids(Gtid_set * gtid_set)763 int Gtid_table_persistor::fetch_gtids(Gtid_set *gtid_set)
764 {
765   DBUG_ENTER("Gtid_table_persistor::fetch_gtids");
766   int ret= 0;
767   int err= 0;
768   TABLE *table= NULL;
769   Gtid_table_access_context table_access_ctx;
770   THD *thd= current_thd;
771 
772   if (table_access_ctx.init(&thd, &table, false))
773   {
774     ret= 1;
775     goto end;
776   }
777 
778   if ((err= table->file->ha_rnd_init(true)))
779   {
780     ret= -1;
781     goto end;
782   }
783 
784   while(!(err= table->file->ha_rnd_next(table->record[0])))
785   {
786     /* Store the gtid into the gtid_set */
787 
788     /**
789       @todo:
790       - take only global_sid_lock->rdlock(), and take
791         gtid_state->sid_lock for each iteration.
792       - Add wrapper around Gtid_set::add_gno_interval and call that
793         instead.
794     */
795     global_sid_lock->wrlock();
796     if (gtid_set->add_gtid_text(encode_gtid_text(table).c_str()) !=
797         RETURN_STATUS_OK)
798     {
799       global_sid_lock->unlock();
800       break;
801     }
802     global_sid_lock->unlock();
803   }
804 
805   table->file->ha_rnd_end();
806   if (err != HA_ERR_END_OF_FILE)
807     ret= -1;
808 
809 end:
810   table_access_ctx.deinit(thd, table, 0 != ret, true);
811 
812   DBUG_RETURN(ret);
813 }
814 
815 
delete_all(TABLE * table)816 int Gtid_table_persistor::delete_all(TABLE *table)
817 {
818   DBUG_ENTER("Gtid_table_persistor::delete_all");
819   int err= 0;
820 
821   if ((err= table->file->ha_rnd_init(true)))
822     DBUG_RETURN(-1);
823 
824   /*
825     Delete all rows in the gtid_executed table. We cannot use truncate(),
826     since it is a non-transactional DDL operation.
827   */
828   while(!(err= table->file->ha_rnd_next(table->record[0])))
829   {
830     /* Delete current row. */
831     err= table->file->ha_delete_row(table->record[0]);
832     if (DBUG_EVALUATE_IF("simulate_error_on_delete_gtid_from_table",
833                          (err= -1), err))
834     {
835       table->file->print_error(err, MYF(0));
836       sql_print_error("Failed to delete the row: '%s' from the gtid_executed "
837                       "table.", encode_gtid_text(table).c_str());
838       break;
839     }
840   }
841 
842   table->file->ha_rnd_end();
843   if (err != HA_ERR_END_OF_FILE)
844     DBUG_RETURN(-1);
845 
846   DBUG_RETURN(0);
847 }
848 
849 
850 /**
851   The main function of the compression thread.
852   - compress the gtid_executed table when get a compression signal.
853 
854   @param  p_thd    Thread requesting to compress the table
855 
856   @return
857       @retval 0    OK. always, the compression thread will swallow any error
858                        for going to wait for next compression signal until
859                        it is terminated.
860 */
compress_gtid_table(void * p_thd)861 extern "C" void *compress_gtid_table(void *p_thd)
862 {
863   THD *thd=(THD*) p_thd;
864   mysql_thread_set_psi_id(thd->thread_id());
865   my_thread_init();
866   DBUG_ENTER("compress_gtid_table");
867 
868   init_thd(&thd);
869   /*
870     Gtid table compression thread should ignore 'read-only' and
871     'super_read_only' options so that it can update 'mysql.gtid_executed'
872     replication repository tables.
873   */
874   thd->set_skip_readonly_check();
875   for (;;)
876   {
877     mysql_mutex_lock(&LOCK_compress_gtid_table);
878     if (terminate_compress_thread)
879       break;
880     THD_ENTER_COND(thd, &COND_compress_gtid_table,
881                    &LOCK_compress_gtid_table,
882                    &stage_suspending, NULL);
883     /* Add the check to handle spurious wakeups from system. */
884     while(!(should_compress || terminate_compress_thread))
885       mysql_cond_wait(&COND_compress_gtid_table, &LOCK_compress_gtid_table);
886     should_compress= false;
887     if (terminate_compress_thread)
888       break;
889     mysql_mutex_unlock(&LOCK_compress_gtid_table);
890     THD_EXIT_COND(thd, NULL);
891 
892     THD_STAGE_INFO(thd, stage_compressing_gtid_table);
893     /* Compressing the gtid_executed table. */
894     if (gtid_state->compress(thd))
895     {
896       sql_print_warning("Failed to compress the gtid_executed table.");
897       /* Clear the error for going to wait for next compression signal. */
898       thd->clear_error();
899       DBUG_EXECUTE_IF("simulate_error_on_compress_gtid_table",
900                       {
901                         const char act[]= "now signal compression_failed";
902                         assert(opt_debug_sync_timeout > 0);
903                         assert(!debug_sync_set_action(current_thd,
904                                                       STRING_WITH_LEN(act)));
905                       };);
906     }
907   }
908 
909   mysql_mutex_unlock(&LOCK_compress_gtid_table);
910   thd->reset_skip_readonly_check();
911   deinit_thd(thd);
912   DBUG_LEAVE;
913   my_thread_end();
914   my_thread_exit(0);
915   return 0;
916 }
917 
918 
919 /**
920   Create the compression thread to compress gtid_executed table.
921 */
create_compress_gtid_table_thread()922 void create_compress_gtid_table_thread()
923 {
924   my_thread_attr_t attr;
925   int error;
926   THD *thd;
927   if (!(thd= new THD))
928   {
929     sql_print_error("Failed to compress the gtid_executed table, because "
930                     "it is failed to allocate the THD.");
931     return;
932   }
933 
934   thd->set_new_thread_id();
935 
936   THD_CHECK_SENTRY(thd);
937 
938   if (my_thread_attr_init(&attr))
939   {
940     sql_print_error("Failed to initialize thread attribute "
941                     "when creating compression thread.");
942     delete thd;
943     return;
944   }
945 
946   if (DBUG_EVALUATE_IF("simulate_create_compress_thread_failure",
947                        error= 1, 0) ||
948 #ifndef _WIN32
949       (error= pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)) ||
950 #endif
951       (error= mysql_thread_create(key_thread_compress_gtid_table,
952                                   &compress_thread_id, &attr,
953                                   compress_gtid_table, (void*) thd)))
954   {
955     sql_print_error("Can not create thread to compress gtid_executed table "
956                     "(errno= %d)", error);
957     /* Delete the created THD after failed to create a compression thread. */
958     delete thd;
959   }
960 
961   (void) my_thread_attr_destroy(&attr);
962 }
963 
964 
965 /**
966   Terminate the compression thread.
967 */
terminate_compress_gtid_table_thread()968 void terminate_compress_gtid_table_thread()
969 {
970   DBUG_ENTER("terminate_compress_gtid_table_thread");
971   int error= 0;
972 
973   /* Notify suspended compression thread. */
974   mysql_mutex_lock(&LOCK_compress_gtid_table);
975   terminate_compress_thread= true;
976   mysql_cond_signal(&COND_compress_gtid_table);
977   mysql_mutex_unlock(&LOCK_compress_gtid_table);
978 
979   if (compress_thread_id.thread != 0)
980   {
981     error= my_thread_join(&compress_thread_id, NULL);
982     compress_thread_id.thread= 0;
983   }
984 
985   if (error != 0)
986     sql_print_warning("Could not join gtid_executed table compression thread. "
987                       "error:%d", error);
988 
989   DBUG_VOID_RETURN;
990 }
991 
992