1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22    02110-1301 USA */
23 
24 #include "rpl_gtid_persist.h"
25 
26 #include "debug_sync.h"       // debug_sync_set_action
27 #include "log.h"              // sql_print_error
28 #include "replication.h"      // THD_ENTER_COND
29 #include "sql_base.h"         // MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK
30 #include "sql_parse.h"        // mysql_reset_thd_for_next_command
31 
32 using std::list;
33 using std::string;
34 
35 
36 my_thread_handle compress_thread_id;
37 static bool terminate_compress_thread= false;
38 static bool should_compress= false;
39 const LEX_STRING Gtid_table_access_context::TABLE_NAME= {C_STRING_WITH_LEN("gtid_executed")};
40 const LEX_STRING Gtid_table_access_context::DB_NAME= {C_STRING_WITH_LEN("mysql")};
41 
42 /**
43   A derived from THD::Attachable_trx class allows updates in
44   the attachable transaction. Callers of the class methods must
45   make sure the attachable_rw won't cause deadlock with the main transaction.
46   The destructor does not invoke ha_commit_{stmt,trans} nor ha_rollback_trans
47   on purpose.
48   Burden to terminate the read-write instance also lies on the caller!
49   In order to use this interface it *MUST* prove that no side effect to
50   the global transaction state can be inflicted by a chosen method.
51 */
52 
53 class THD::Attachable_trx_rw : public THD::Attachable_trx
54 {
55 public:
is_read_only() const56   bool is_read_only() const { return false; }
Attachable_trx_rw(THD * thd)57   Attachable_trx_rw(THD *thd) : THD::Attachable_trx(thd)
58   {
59     m_thd->tx_read_only= false;
60     m_thd->lex->sql_command= SQLCOM_END;
61     m_xa_state_saved= m_thd->get_transaction()->xid_state()->get_state();
62     thd->get_transaction()->xid_state()->set_state(XID_STATE::XA_NOTR);
63   }
~Attachable_trx_rw()64   ~Attachable_trx_rw()
65   {
66     /* The attachable transaction has been already committed */
67     assert(!m_thd->get_transaction()->is_active(Transaction_ctx::STMT)
68            && !m_thd->get_transaction()->is_active(Transaction_ctx::SESSION));
69 
70     m_thd->get_transaction()->xid_state()->set_state(m_xa_state_saved);
71     m_thd->tx_read_only= true;
72   }
73 
74 private:
75   XID_STATE::xa_states m_xa_state_saved;
76   Attachable_trx_rw(const Attachable_trx_rw &);
77   Attachable_trx_rw &operator =(const Attachable_trx_rw &);
78 };
79 
80 
is_attachable_rw_transaction_active() const81 bool THD::is_attachable_rw_transaction_active() const
82 {
83   return m_attachable_trx != NULL && !m_attachable_trx->is_read_only();
84 }
85 
86 
begin_attachable_rw_transaction()87 void THD::begin_attachable_rw_transaction()
88 {
89   assert(!m_attachable_trx);
90 
91   m_attachable_trx= new Attachable_trx_rw(this);
92 }
93 
94 
95 /**
96   Initialize a new THD.
97 
98   @param p_thd  Pointer to pointer to thread structure
99 */
init_thd(THD ** p_thd)100 static void init_thd(THD **p_thd)
101 {
102   DBUG_ENTER("init_thd");
103   THD *thd= *p_thd;
104   thd->thread_stack= reinterpret_cast<char *>(p_thd);
105   thd->set_command(COM_DAEMON);
106   thd->security_context()->skip_grants();
107   thd->system_thread= SYSTEM_THREAD_COMPRESS_GTID_TABLE;
108   thd->store_globals();
109   thd->set_time();
110 #ifdef WITH_WSREP
111   WSREP_DEBUG("SYSTEM_THREAD_COMPRESS_GTID_TABLE declared as non wsrep_on");
112   thd->variables.wsrep_on= 0;
113 #endif /* WITH_WSREP */
114   DBUG_VOID_RETURN;
115 }
116 
117 
118 /**
119   Release resourses for the thread and restores the
120   system_thread information.
121 
122   @param thd Thread requesting to be destroyed
123 */
deinit_thd(THD * thd)124 static void deinit_thd(THD *thd)
125 {
126   DBUG_ENTER("deinit_thd");
127   thd->release_resources();
128   thd->restore_globals();
129   delete thd;
130   my_thread_set_THR_THD(NULL);
131   DBUG_VOID_RETURN;
132 }
133 
134 
create_thd()135 THD *Gtid_table_access_context::create_thd()
136 {
137   THD *thd= System_table_access::create_thd();
138   thd->system_thread= SYSTEM_THREAD_COMPRESS_GTID_TABLE;
139 #ifdef WITH_WSREP
140   WSREP_DEBUG("SYSTEM_THREAD_COMPRESS_GTID_TABLE declared  as non wsrep_on");
141   thd->variables.wsrep_on= 0;
142 #endif /* WITH_WSREP */
143   /*
144     This is equivalent to a new "statement". For that reason, we call
145     both lex_start() and mysql_reset_thd_for_next_command.
146   */
147   lex_start(thd);
148   mysql_reset_thd_for_next_command(thd);
149   thd->set_skip_readonly_check();
150   return(thd);
151 }
152 
drop_thd(THD * thd)153 void Gtid_table_access_context::drop_thd(THD *thd)
154 {
155   thd->reset_skip_readonly_check();
156   System_table_access::drop_thd(thd);
157 }
158 
before_open(THD * thd)159 void Gtid_table_access_context::before_open(THD* thd)
160 {
161   DBUG_ENTER("Gtid_table_access_context::before_open");
162   /*
163     Allow to operate the gtid_executed table
164     while disconnecting the session.
165   */
166   m_flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
167             MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
168             MYSQL_OPEN_IGNORE_FLUSH |
169             MYSQL_LOCK_IGNORE_TIMEOUT |
170             MYSQL_OPEN_IGNORE_KILLED);
171   DBUG_VOID_RETURN;
172 }
173 
174 
init(THD ** thd,TABLE ** table,bool is_write)175 bool Gtid_table_access_context::init(THD **thd, TABLE **table, bool is_write)
176 {
177   DBUG_ENTER("Gtid_table_access_context::init");
178 
179   if (!(*thd))
180     *thd= m_drop_thd_object= this->create_thd();
181   m_is_write= is_write;
182   if (m_is_write)
183   {
184     /* Disable binlog temporarily */
185     m_tmp_disable_binlog__save_options= (*thd)->variables.option_bits;
186     (*thd)->variables.option_bits&= ~OPTION_BIN_LOG;
187   }
188 
189   if (!(*thd)->get_transaction()->xid_state()->has_state(XID_STATE::XA_NOTR))
190   {
191     /*
192       This type of caller of Attachable_trx_rw is deadlock-free with
193       the main transaction thanks to rejection to update
194       'mysql.gtid_executed' by XA main transaction.
195     */
196     assert((*thd)->get_transaction()->xid_state()->
197            has_state(XID_STATE::XA_IDLE) ||
198            (*thd)->get_transaction()->xid_state()->
199            has_state(XID_STATE::XA_PREPARED));
200 
201     (*thd)->begin_attachable_rw_transaction();
202   }
203 
204   (*thd)->is_operating_gtid_table_implicitly= true;
205   bool ret= this->open_table(*thd, DB_NAME, TABLE_NAME,
206                              Gtid_table_persistor::number_fields,
207                              m_is_write ? TL_WRITE : TL_READ,
208                              table, &m_backup);
209 
210   DBUG_RETURN(ret);
211 }
212 
213 
deinit(THD * thd,TABLE * table,bool error,bool need_commit)214 bool Gtid_table_access_context::deinit(THD *thd, TABLE *table,
215                                        bool error, bool need_commit)
216 {
217   DBUG_ENTER("Gtid_table_access_context::deinit");
218 
219   bool err;
220   err= this->close_table(thd, table, &m_backup, 0 != error, need_commit);
221 
222   /*
223     If err is true this means that there was some problem during
224     FLUSH LOGS commit phase.
225   */
226   if (err)
227   {
228     my_printf_error(ER_ERROR_DURING_FLUSH_LOGS, ER(ER_ERROR_DURING_FLUSH_LOGS),
229                     MYF(ME_FATALERROR), err);
230     sql_print_error(ER(ER_ERROR_DURING_FLUSH_LOGS), err);
231     DBUG_RETURN(err);
232   }
233 
234   /*
235     If Gtid is inserted through Attachable_trx_rw its has been done
236     in the above close_table() through ha_commit_trans().
237     It does not have any side effect to the global transaction state
238     as the only vulnerable part there relates to gtid (and is blocked
239     from recursive invocation).
240   */
241   if (thd->is_attachable_rw_transaction_active())
242     thd->end_attachable_transaction();
243 
244   thd->is_operating_gtid_table_implicitly= false;
245   /* Reenable binlog */
246   if (m_is_write)
247     thd->variables.option_bits= m_tmp_disable_binlog__save_options;
248   if (m_drop_thd_object)
249     this->drop_thd(m_drop_thd_object);
250 
251   DBUG_RETURN(err);
252 }
253 
254 
fill_fields(Field ** fields,const char * sid,rpl_gno gno_start,rpl_gno gno_end)255 int Gtid_table_persistor::fill_fields(Field **fields, const char *sid,
256                                       rpl_gno gno_start, rpl_gno gno_end)
257 {
258   DBUG_ENTER("Gtid_table_persistor::fill_field");
259 
260   /* Store SID */
261   fields[0]->set_notnull();
262   if (fields[0]->store(sid, binary_log::Uuid::TEXT_LENGTH, &my_charset_bin))
263   {
264     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[0]->field_name);
265     goto err;
266   }
267 
268   /* Store gno_start */
269   fields[1]->set_notnull();
270   if (fields[1]->store(gno_start, true /* unsigned = true*/))
271   {
272     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[1]->field_name);
273     goto err;
274   }
275 
276   /* Store gno_end */
277   fields[2]->set_notnull();
278   if (fields[2]->store(gno_end, true /* unsigned = true*/))
279   {
280     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[2]->field_name);
281     goto err;
282   }
283 
284   DBUG_RETURN(0);
285 err:
286   DBUG_RETURN(-1);
287 }
288 
289 
write_row(TABLE * table,const char * sid,rpl_gno gno_start,rpl_gno gno_end)290 int Gtid_table_persistor::write_row(TABLE *table, const char *sid,
291                                     rpl_gno gno_start, rpl_gno gno_end)
292 {
293   DBUG_ENTER("Gtid_table_persistor::write_row");
294   int error= 0;
295   Field **fields= NULL;
296 
297   fields= table->field;
298   empty_record(table);
299 
300   if (fill_fields(fields, sid, gno_start, gno_end))
301     DBUG_RETURN(-1);
302 
303   /* Inserts a new row into the gtid_executed table. */
304   error= table->file->ha_write_row(table->record[0]);
305   if (DBUG_EVALUATE_IF("simulate_err_on_write_gtid_into_table",
306                        (error= -1), error))
307   {
308     if (error == HA_ERR_FOUND_DUPP_KEY)
309     {
310       /* Ignore the duplicate key error, log a warning for it. */
311       sql_print_warning("The transaction owned GTID is already in "
312                         "the %s table, which is caused by an "
313                         "explicit modifying from user client.",
314                         Gtid_table_access_context::TABLE_NAME.str);
315     }
316     else
317     {
318       table->file->print_error(error, MYF(0));
319       /*
320         This makes sure that the error is -1 and not the status
321         returned by the handler.
322       */
323       DBUG_RETURN(-1);
324     }
325   }
326 
327   DBUG_RETURN(0);
328 }
329 
330 
update_row(TABLE * table,const char * sid,rpl_gno gno_start,rpl_gno new_gno_end)331 int Gtid_table_persistor::update_row(TABLE *table, const char *sid,
332                                      rpl_gno gno_start, rpl_gno new_gno_end)
333 {
334   DBUG_ENTER("Gtid_table_persistor::update_row");
335   int error= 0;
336   Field **fields= NULL;
337   uchar user_key[MAX_KEY_LENGTH];
338 
339   fields= table->field;
340   empty_record(table);
341 
342   /* Store SID */
343   fields[0]->set_notnull();
344   if (fields[0]->store(sid, binary_log::Uuid::TEXT_LENGTH, &my_charset_bin))
345   {
346     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[0]->field_name);
347     DBUG_RETURN(-1);
348   }
349 
350   /* Store gno_start */
351   fields[1]->set_notnull();
352   if (fields[1]->store(gno_start, true /* unsigned = true*/))
353   {
354     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[1]->field_name);
355     DBUG_RETURN(-1);
356   }
357 
358   key_copy(user_key, table->record[0], table->key_info,
359            table->key_info->key_length);
360 
361   if ((error= table->file->ha_index_init(0, 1)))
362   {
363     table->file->print_error(error, MYF(0));
364     DBUG_PRINT("info", ("ha_index_init error"));
365     goto end;
366   }
367 
368   if ((error= table->file->ha_index_read_map(table->record[0], user_key,
369                                              HA_WHOLE_KEY,
370                                              HA_READ_KEY_EXACT)))
371   {
372     DBUG_PRINT ("info", ("Row not found"));
373     goto end;
374   }
375   else
376   {
377     DBUG_PRINT("info", ("Row found"));
378     store_record(table, record[1]);
379   }
380 
381   /* Store new_gno_end */
382   fields[2]->set_notnull();
383   if ((error= fields[2]->store(new_gno_end, true /* unsigned = true*/)))
384   {
385     my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0), fields[2]->field_name);
386     goto end;
387   }
388 
389   /* Update a row in the gtid_executed table. */
390   error= table->file->ha_update_row(table->record[1], table->record[0]);
391   if (DBUG_EVALUATE_IF("simulate_error_on_compress_gtid_table",
392                        (error= -1), error))
393   {
394     table->file->print_error(error, MYF(0));
395     /*
396       This makes sure that the error is -1 and not the status returned
397       by the handler.
398     */
399     goto end;
400   }
401 
402 end:
403   table->file->ha_index_end();
404   if (error)
405     DBUG_RETURN(-1);
406   else
407     DBUG_RETURN(0);
408 }
409 
410 
save(THD * thd,const Gtid * gtid)411 int Gtid_table_persistor::save(THD *thd, const Gtid *gtid)
412 {
413   DBUG_ENTER("Gtid_table_persistor::save(THD *thd, Gtid *gtid)");
414   int error= 0;
415   TABLE *table= NULL;
416   Gtid_table_access_context table_access_ctx;
417   char buf[binary_log::Uuid::TEXT_LENGTH + 1];
418 
419   /* Get source id */
420   global_sid_lock->rdlock();
421   rpl_sid sid= global_sid_map->sidno_to_sid(gtid->sidno);
422   global_sid_lock->unlock();
423   sid.to_string(buf);
424 
425   if (table_access_ctx.init(&thd, &table, true))
426   {
427     error= 1;
428     goto end;
429   }
430 
431   /* Save the gtid info into table. */
432   error= write_row(table, buf, gtid->gno, gtid->gno);
433 
434 end:
435   table_access_ctx.deinit(thd, table, 0 != error, false);
436 
437   /* Do not protect m_count for improving transactions' concurrency */
438   if (error == 0 && gtid_executed_compression_period != 0)
439   {
440     uint32 count= (uint32)m_count.atomic_add(1);
441     if (count == gtid_executed_compression_period ||
442         DBUG_EVALUATE_IF("compress_gtid_table", 1, 0))
443     {
444       mysql_mutex_lock(&LOCK_compress_gtid_table);
445       should_compress= true;
446       mysql_cond_signal(&COND_compress_gtid_table);
447       mysql_mutex_unlock(&LOCK_compress_gtid_table);
448     }
449   }
450 
451   DBUG_RETURN(error);
452 }
453 
454 
save(const Gtid_set * gtid_set)455 int Gtid_table_persistor::save(const Gtid_set *gtid_set)
456 {
457   DBUG_ENTER("Gtid_table_persistor::save(Gtid_set *gtid_set)");
458   int ret= 0;
459   int error= 0;
460   TABLE *table= NULL;
461   Gtid_table_access_context table_access_ctx;
462   THD *thd= current_thd;
463 
464   if (table_access_ctx.init(&thd, &table, true))
465   {
466     error= 1;
467     /*
468       Gtid table is not ready to be used, so failed to
469       open it. Ignore the error.
470     */
471     thd->clear_error();
472     if (!thd->get_stmt_da()->is_set())
473       thd->get_stmt_da()->set_ok_status(0, 0, NULL);
474     goto end;
475   }
476 
477   ret= error= save(table, gtid_set);
478 
479 end:
480   const int deinit_ret= table_access_ctx.deinit(thd, table, 0 != error, true);
481 
482   if (!ret && deinit_ret)
483     ret= -1;
484 
485   /* Notify compression thread to compress gtid_executed table. */
486   if (error == 0 && DBUG_EVALUATE_IF("dont_compress_gtid_table", 0, 1))
487   {
488     mysql_mutex_lock(&LOCK_compress_gtid_table);
489     should_compress= true;
490     mysql_cond_signal(&COND_compress_gtid_table);
491     mysql_mutex_unlock(&LOCK_compress_gtid_table);
492   }
493 
494   DBUG_RETURN(ret);
495 }
496 
497 
save(TABLE * table,const Gtid_set * gtid_set)498 int Gtid_table_persistor::save(TABLE *table, const Gtid_set *gtid_set)
499 {
500   DBUG_ENTER("Gtid_table_persistor::save(TABLE* table, "
501              "Gtid_set *gtid_set)");
502   int error= 0;
503   list<Gtid_interval> gtid_intervals;
504   list<Gtid_interval>::iterator iter;
505 
506   /* Get GTID intervals from gtid_set. */
507   gtid_set->get_gtid_intervals(&gtid_intervals);
508   for (iter= gtid_intervals.begin(); iter != gtid_intervals.end(); iter++)
509   {
510     /* Get source id. */
511     char buf[binary_log::Uuid::TEXT_LENGTH + 1];
512     rpl_sid sid= gtid_set->get_sid_map()->sidno_to_sid(iter->sidno);
513     sid.to_string(buf);
514 
515     /* Save the gtid interval into table. */
516     if ((error= write_row(table, buf, iter->gno_start, iter->gno_end)))
517       break;
518   }
519 
520   gtid_intervals.clear();
521   DBUG_RETURN(error);
522 }
523 
524 
525 /**
526   Simulate error and crash in the middle of the transaction
527   of compressing gtid_executed table.
528 
529   @param  thd Thread requesting to compress the table
530 
531   @return
532     @retval 0    OK.
533     @retval -1   Error.
534 */
535 #ifndef NDEBUG
dbug_test_on_compress(THD * thd)536 static int dbug_test_on_compress(THD *thd)
537 {
538   DBUG_ENTER("dbug_test_on_compress");
539   /*
540     Sleep a little, so that notified user thread executed the statement
541     completely.
542   */
543   DBUG_EXECUTE_IF("fetch_compression_thread_stage_info", sleep(5););
544   DBUG_EXECUTE_IF("fetch_compression_thread_stage_info",
545                   {
546                     const char act[]= "now signal fetch_thread_stage";
547                     assert(opt_debug_sync_timeout > 0);
548                     assert(!debug_sync_set_action(thd,
549                                                   STRING_WITH_LEN(act)));
550                   };);
551   /* Sleep a little, so that we can always fetch the correct stage info. */
552   DBUG_EXECUTE_IF("fetch_compression_thread_stage_info", sleep(1););
553 
554   /*
555     Wait until notified user thread executed the statement completely,
556     then go to crash.
557   */
558   DBUG_EXECUTE_IF("simulate_crash_on_compress_gtid_table",
559                   {
560                     const char act[]= "now wait_for notified_thread_complete";
561                     assert(opt_debug_sync_timeout > 0);
562                     assert(!debug_sync_set_action(thd,
563                                                   STRING_WITH_LEN(act)));
564                   };);
565   DBUG_EXECUTE_IF("simulate_crash_on_compress_gtid_table", DBUG_SUICIDE(););
566 
567   DBUG_RETURN(0);
568 }
569 #endif
570 
571 
compress(THD * thd)572 int Gtid_table_persistor::compress(THD *thd)
573 {
574   DBUG_ENTER("Gtid_table_persistor::compress");
575   int error= 0;
576   bool is_complete= false;
577 
578   while (!is_complete && !error)
579     error= compress_in_single_transaction(thd, is_complete);
580 
581   m_count.atomic_set(0);
582 
583   DBUG_EXECUTE_IF("compress_gtid_table",
584                   {
585                     const char act[]= "now signal complete_compression";
586                     assert(opt_debug_sync_timeout > 0);
587                     assert(!debug_sync_set_action(thd,
588                                                   STRING_WITH_LEN(act)));
589                   };);
590 
591   DBUG_RETURN(error);
592 }
593 
594 
compress_in_single_transaction(THD * thd,bool & is_complete)595 int Gtid_table_persistor::compress_in_single_transaction(THD *thd,
596                                                          bool &is_complete)
597 {
598   DBUG_ENTER("Gtid_table_persistor::compress_in_single_transaction");
599   int error= 0;
600   TABLE *table= NULL;
601   Gtid_table_access_context table_access_ctx;
602 
603   mysql_mutex_lock(&LOCK_reset_gtid_table);
604   if (table_access_ctx.init(&thd, &table, true))
605   {
606     error= 1;
607     goto end;
608   }
609 
610   /*
611     Reset stage_compressing_gtid_table to overwrite
612     stage_system_lock set in open_table(...).
613   */
614   THD_STAGE_INFO(thd, stage_compressing_gtid_table);
615 
616   if ((error= compress_first_consecutive_range(table, is_complete)))
617     goto end;
618 
619 #ifndef NDEBUG
620   error= dbug_test_on_compress(thd);
621 #endif
622 
623 end:
624   table_access_ctx.deinit(thd, table, 0 != error, true);
625   mysql_mutex_unlock(&LOCK_reset_gtid_table);
626 
627   DBUG_RETURN(error);
628 }
629 
630 
compress_first_consecutive_range(TABLE * table,bool & is_complete)631 int Gtid_table_persistor::compress_first_consecutive_range(TABLE *table,
632                                                            bool &is_complete)
633 {
634   DBUG_ENTER("Gtid_table_persistor::compress_first_consecutive_range");
635   int ret= 0;
636   int err= 0;
637   /* Record the source id of the first consecutive gtid. */
638   string sid;
639   /* Record the first GNO of the first consecutive gtid. */
640   rpl_gno gno_start= 0;
641   /* Record the last GNO of the last consecutive gtid. */
642   rpl_gno gno_end= 0;
643   /* Record the gtid interval of the current gtid. */
644   string cur_sid;
645   rpl_gno cur_gno_start= 0;
646   rpl_gno cur_gno_end= 0;
647   /*
648     Indicate if we have consecutive gtids in the table.
649     Set the flag to true if we find the first consecutive gtids.
650     The first consecutive range of gtids will be compressed if
651     the flag is true.
652   */
653   bool find_first_consecutive_gtids= false;
654 
655   if ((err= table->file->ha_index_init(0, true)))
656     DBUG_RETURN(-1);
657 
658   /* Read each row by the PK(sid, gno_start) in increasing order. */
659   err= table->file->ha_index_first(table->record[0]);
660   /* Compress the first consecutive range of gtids. */
661   while(!err)
662   {
663     get_gtid_interval(table, cur_sid, cur_gno_start, cur_gno_end);
664     /*
665       Check if gtid intervals of previous gtid and current gtid
666       are consecutive.
667     */
668     if (sid == cur_sid && gno_end + 1 == cur_gno_start)
669     {
670       find_first_consecutive_gtids= true;
671       gno_end= cur_gno_end;
672       /* Delete the consecutive gtid. We do not delete the first
673          consecutive gtid, so that we can update it later. */
674       if ((err= table->file->ha_delete_row(table->record[0])))
675       {
676         table->file->print_error(err, MYF(0));
677         break;
678       }
679     }
680     else
681     {
682       if (find_first_consecutive_gtids)
683         break;
684 
685       /* Record the gtid interval of the first consecutive gtid. */
686       sid= cur_sid;
687       gno_start= cur_gno_start;
688       gno_end= cur_gno_end;
689     }
690     err= table->file->ha_index_next(table->record[0]);
691   }
692 
693   table->file->ha_index_end();
694   /* Indicate if the gtid_executed table is compressd completely. */
695   is_complete= (err == HA_ERR_END_OF_FILE);
696 
697   if (err != HA_ERR_END_OF_FILE && err != 0)
698     ret= -1;
699   else if (find_first_consecutive_gtids)
700     /*
701       Update the gno_end of the first consecutive gtid with the gno_end of
702       the last consecutive gtid for the first consecutive range of gtids.
703     */
704     ret= update_row(table, sid.c_str(), gno_start, gno_end);
705 
706   DBUG_RETURN(ret);
707 }
708 
709 
reset(THD * thd)710 int Gtid_table_persistor::reset(THD *thd)
711 {
712   DBUG_ENTER("Gtid_table_persistor::reset");
713   int error= 0;
714   TABLE *table= NULL;
715   Gtid_table_access_context table_access_ctx;
716 
717   mysql_mutex_lock(&LOCK_reset_gtid_table);
718   if (table_access_ctx.init(&thd, &table, true))
719   {
720     error= 1;
721     goto end;
722   }
723 
724   error= delete_all(table);
725 
726 end:
727   table_access_ctx.deinit(thd, table, 0 != error, true);
728   mysql_mutex_unlock(&LOCK_reset_gtid_table);
729 
730   DBUG_RETURN(error);
731 }
732 
733 
encode_gtid_text(TABLE * table)734 string Gtid_table_persistor::encode_gtid_text(TABLE *table)
735 {
736   DBUG_ENTER("Gtid_table_persistor::encode_gtid_text");
737   char buff[MAX_FIELD_WIDTH];
738   String str(buff, sizeof(buff), &my_charset_bin);
739 
740   /* Fetch gtid interval from the table */
741   table->field[0]->val_str(&str);
742   string gtid_text(str.c_ptr_safe());
743   gtid_text.append(Gtid_set::default_string_format.sid_gno_separator);
744   table->field[1]->val_str(&str);
745   gtid_text.append(str.c_ptr_safe());
746   gtid_text.append(Gtid_set::default_string_format.gno_start_end_separator);
747   table->field[2]->val_str(&str);
748   gtid_text.append(str.c_ptr_safe());
749 
750   DBUG_RETURN(gtid_text);
751 }
752 
753 
get_gtid_interval(TABLE * table,string & sid,rpl_gno & gno_start,rpl_gno & gno_end)754 void Gtid_table_persistor::get_gtid_interval(TABLE *table, string &sid,
755                                              rpl_gno &gno_start,
756                                              rpl_gno &gno_end)
757 {
758   DBUG_ENTER("Gtid_table_persistor::get_gtid_interval");
759   char buff[MAX_FIELD_WIDTH];
760   String str(buff, sizeof(buff), &my_charset_bin);
761 
762   /* Fetch gtid interval from the table */
763   table->field[0]->val_str(&str);
764   sid= string(str.c_ptr_safe());
765   gno_start= table->field[1]->val_int();
766   gno_end= table->field[2]->val_int();
767   DBUG_VOID_RETURN;
768 }
769 
770 
fetch_gtids(Gtid_set * gtid_set)771 int Gtid_table_persistor::fetch_gtids(Gtid_set *gtid_set)
772 {
773   DBUG_ENTER("Gtid_table_persistor::fetch_gtids");
774   int ret= 0;
775   int err= 0;
776   TABLE *table= NULL;
777   Gtid_table_access_context table_access_ctx;
778   THD *thd= current_thd;
779 
780   if (table_access_ctx.init(&thd, &table, false))
781   {
782     ret= 1;
783     goto end;
784   }
785 
786   if ((err= table->file->ha_rnd_init(true)))
787   {
788     ret= -1;
789     goto end;
790   }
791 
792   while(!(err= table->file->ha_rnd_next(table->record[0])))
793   {
794     /* Store the gtid into the gtid_set */
795 
796     /**
797       @todo:
798       - take only global_sid_lock->rdlock(), and take
799         gtid_state->sid_lock for each iteration.
800       - Add wrapper around Gtid_set::add_gno_interval and call that
801         instead.
802     */
803     global_sid_lock->wrlock();
804     if (gtid_set->add_gtid_text(encode_gtid_text(table).c_str()) !=
805         RETURN_STATUS_OK)
806     {
807       global_sid_lock->unlock();
808       break;
809     }
810     global_sid_lock->unlock();
811   }
812 
813   table->file->ha_rnd_end();
814   if (err != HA_ERR_END_OF_FILE)
815     ret= -1;
816 
817 end:
818   table_access_ctx.deinit(thd, table, 0 != ret, true);
819 
820   DBUG_RETURN(ret);
821 }
822 
823 
delete_all(TABLE * table)824 int Gtid_table_persistor::delete_all(TABLE *table)
825 {
826   DBUG_ENTER("Gtid_table_persistor::delete_all");
827   int err= 0;
828 
829   if ((err= table->file->ha_rnd_init(true)))
830     DBUG_RETURN(-1);
831 
832   /*
833     Delete all rows in the gtid_executed table. We cannot use truncate(),
834     since it is a non-transactional DDL operation.
835   */
836   while(!(err= table->file->ha_rnd_next(table->record[0])))
837   {
838     /* Delete current row. */
839     err= table->file->ha_delete_row(table->record[0]);
840     if (DBUG_EVALUATE_IF("simulate_error_on_delete_gtid_from_table",
841                          (err= -1), err))
842     {
843       table->file->print_error(err, MYF(0));
844       sql_print_error("Failed to delete the row: '%s' from the gtid_executed "
845                       "table.", encode_gtid_text(table).c_str());
846       break;
847     }
848   }
849 
850   table->file->ha_rnd_end();
851   if (err != HA_ERR_END_OF_FILE)
852     DBUG_RETURN(-1);
853 
854   DBUG_RETURN(0);
855 }
856 
857 
858 /**
859   The main function of the compression thread.
860   - compress the gtid_executed table when get a compression signal.
861 
862   @param  p_thd    Thread requesting to compress the table
863 
864   @return
865       @retval 0    OK. always, the compression thread will swallow any error
866                        for going to wait for next compression signal until
867                        it is terminated.
868 */
compress_gtid_table(void * p_thd)869 extern "C" void *compress_gtid_table(void *p_thd)
870 {
871   THD *thd=(THD*) p_thd;
872   mysql_thread_set_psi_id(thd->thread_id());
873   my_thread_init();
874   DBUG_ENTER("compress_gtid_table");
875 
876   init_thd(&thd);
877   /*
878     Gtid table compression thread should ignore 'read-only' and
879     'super_read_only' options so that it can update 'mysql.gtid_executed'
880     replication repository tables.
881   */
882   thd->set_skip_readonly_check();
883   for (;;)
884   {
885     mysql_mutex_lock(&LOCK_compress_gtid_table);
886     if (terminate_compress_thread)
887       break;
888     THD_ENTER_COND(thd, &COND_compress_gtid_table,
889                    &LOCK_compress_gtid_table,
890                    &stage_suspending, NULL);
891     /* Add the check to handle spurious wakeups from system. */
892     while(!(should_compress || terminate_compress_thread))
893       mysql_cond_wait(&COND_compress_gtid_table, &LOCK_compress_gtid_table);
894     should_compress= false;
895     if (terminate_compress_thread)
896       break;
897     mysql_mutex_unlock(&LOCK_compress_gtid_table);
898     THD_EXIT_COND(thd, NULL);
899 
900     THD_STAGE_INFO(thd, stage_compressing_gtid_table);
901     /* Compressing the gtid_executed table. */
902     if (gtid_state->compress(thd))
903     {
904       sql_print_warning("Failed to compress the gtid_executed table.");
905       /* Clear the error for going to wait for next compression signal. */
906       thd->clear_error();
907       DBUG_EXECUTE_IF("simulate_error_on_compress_gtid_table",
908                       {
909                         const char act[]= "now signal compression_failed";
910                         assert(opt_debug_sync_timeout > 0);
911                         assert(!debug_sync_set_action(current_thd,
912                                                       STRING_WITH_LEN(act)));
913                       };);
914     }
915   }
916 
917   mysql_mutex_unlock(&LOCK_compress_gtid_table);
918   thd->reset_skip_readonly_check();
919   deinit_thd(thd);
920   DBUG_LEAVE;
921   my_thread_end();
922   my_thread_exit(0);
923   return 0;
924 }
925 
926 
927 /**
928   Create the compression thread to compress gtid_executed table.
929 */
create_compress_gtid_table_thread()930 void create_compress_gtid_table_thread()
931 {
932   my_thread_attr_t attr;
933   int error;
934   THD *thd;
935   if (!(thd= new THD))
936   {
937     sql_print_error("Failed to compress the gtid_executed table, because "
938                     "it is failed to allocate the THD.");
939     return;
940   }
941 
942   thd->set_new_thread_id();
943 
944   THD_CHECK_SENTRY(thd);
945 
946   if (my_thread_attr_init(&attr))
947   {
948     sql_print_error("Failed to initialize thread attribute "
949                     "when creating compression thread.");
950     delete thd;
951     return;
952   }
953 
954   if (DBUG_EVALUATE_IF("simulate_create_compress_thread_failure",
955                        error= 1, 0) ||
956 #ifndef _WIN32
957       (error= pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)) ||
958 #endif
959       (error= mysql_thread_create(key_thread_compress_gtid_table,
960                                   &compress_thread_id, &attr,
961                                   compress_gtid_table, (void*) thd)))
962   {
963     sql_print_error("Can not create thread to compress gtid_executed table "
964                     "(errno= %d)", error);
965     /* Delete the created THD after failed to create a compression thread. */
966     delete thd;
967   }
968 
969   (void) my_thread_attr_destroy(&attr);
970 }
971 
972 
973 /**
974   Terminate the compression thread.
975 */
terminate_compress_gtid_table_thread()976 void terminate_compress_gtid_table_thread()
977 {
978   DBUG_ENTER("terminate_compress_gtid_table_thread");
979   int error= 0;
980 
981   /* Notify suspended compression thread. */
982   mysql_mutex_lock(&LOCK_compress_gtid_table);
983   terminate_compress_thread= true;
984   mysql_cond_signal(&COND_compress_gtid_table);
985   mysql_mutex_unlock(&LOCK_compress_gtid_table);
986 
987   if (compress_thread_id.thread != 0)
988   {
989     error= my_thread_join(&compress_thread_id, NULL);
990     compress_thread_id.thread= 0;
991   }
992 
993   if (error != 0)
994     sql_print_warning("Could not join gtid_executed table compression thread. "
995                       "error:%d", error);
996 
997   DBUG_VOID_RETURN;
998 }
999 
1000