1 /* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
2    Copyright (c) 2020, MariaDB Corporation.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */
16 
17 
18 /* Definitions for MariaDB global transaction ID (GTID). */
19 
20 #include "mariadb.h"
21 #include "sql_priv.h"
22 #include "unireg.h"
23 #include "mariadb.h"
24 #include "sql_base.h"
25 #include "sql_parse.h"
26 #include "key.h"
27 #include "rpl_gtid.h"
28 #include "rpl_rli.h"
29 #include "slave.h"
30 #include "log_event.h"
31 
32 const LEX_CSTRING rpl_gtid_slave_state_table_name=
33   { STRING_WITH_LEN("gtid_slave_pos") };
34 
35 
36 void
37 rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
38                                    rpl_group_info *rgi)
39 {
40   int err;
41   /*
42     Add the gtid to the HASH in the replication slave state.
43 
44     We must do this only _after_ commit, so that for parallel replication,
45     there will not be an attempt to delete the corresponding table row before
46     it is even committed.
47   */
48   mysql_mutex_lock(&LOCK_slave_state);
49   err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
50   mysql_mutex_unlock(&LOCK_slave_state);
51   if (err)
52   {
53     sql_print_warning("Slave: Out of memory during slave state maintenance. "
54                       "Some no longer necessary rows in table "
55                       "mysql.%s may be left undeleted.",
56                       rpl_gtid_slave_state_table_name.str);
57     /*
58       Such failure is not fatal. We will fail to delete the row for this
59       GTID, but it will do no harm and will be removed automatically on next
60       server restart.
61     */
62   }
63 }
64 
65 
66 int
67 rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
68 {
69   DBUG_ENTER("rpl_slave_state::record_and_update_gtid");
70 
71   /*
72     Update the GTID position, if we have it and did not already update
73     it in a GTID transaction.
74   */
75   if (rgi->gtid_pending)
76   {
77     uint64 sub_id= rgi->gtid_sub_id;
78     void *hton= NULL;
79 
80     rgi->gtid_pending= false;
81     if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
82     {
83       if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
84         DBUG_RETURN(1);
85       update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
86     }
87     rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
88   }
89   DBUG_RETURN(0);
90 }
91 
92 
93 /*
94   Check GTID event execution when --gtid-ignore-duplicates.
95 
96   The idea with --gtid-ignore-duplicates is that we allow multiple master
97   connections (in multi-source replication) to all receive the same GTIDs and
98   event groups. Only one instance of each is applied; we use the sequence
99   number in the GTID to decide whether a GTID has already been applied.
100 
101   So if the seq_no of a GTID (or a higher sequence number) has already been
102   applied, then the event should be skipped. If not, then the event should be
103   applied.
104 
105   To avoid two master connections trying to apply the same event
106   simultaneously, only one is allowed to work in any given domain at any point
107   in time. The associated Relay_log_info object is called the owner of the
108   domain (and there can be multiple parallel worker threads working in that
109   domain for that Relay_log_info). Any other Relay_log_info/master connection
110   must wait for the domain to become free, or for their GTID to have been
111   applied, before being allowed to proceed.
112 
113   Returns:
114     0  This GTID is already applied, it should be skipped.
115     1  The GTID is not yet applied; this rli is now the owner, and must apply
116        the event and release the domain afterwards.
117    -1  Error (out of memory to allocate a new element for the domain).
118 */
119 int
120 rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
121 {
122   uint32 domain_id= gtid->domain_id;
123   uint64 seq_no= gtid->seq_no;
124   rpl_slave_state::element *elem;
125   int res;
126   bool did_enter_cond= false;
127   PSI_stage_info old_stage;
128   THD *UNINIT_VAR(thd);
129   Relay_log_info *rli= rgi->rli;
130 
131   mysql_mutex_lock(&LOCK_slave_state);
132   if (!(elem= get_element(domain_id)))
133   {
134     my_error(ER_OUT_OF_RESOURCES, MYF(0));
135     res= -1;
136     goto err;
137   }
138   /*
139     Note that the elem pointer does not change once inserted in the hash. So
140     we can re-use the pointer without looking it up again in the hash after
141     each lock release and re-take.
142   */
143 
144   for (;;)
145   {
146     if (elem->highest_seq_no >= seq_no)
147     {
148       /* This sequence number is already applied, ignore it. */
149       res= 0;
150       rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
151       break;
152     }
153     if (!elem->owner_rli)
154     {
155       /* The domain became free, grab it and apply the event. */
156       elem->owner_rli= rli;
157       elem->owner_count= 1;
158       rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
159       res= 1;
160       break;
161     }
162     if (elem->owner_rli == rli)
163     {
164       /* Already own this domain, increment reference count and apply event. */
165       ++elem->owner_count;
166       rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
167       res= 1;
168       break;
169     }
170     thd= rgi->thd;
171     if (unlikely(thd->check_killed()))
172     {
173       res= -1;
174       break;
175     }
176     /*
177       Someone else is currently processing this GTID (or an earlier one).
178       Wait for them to complete (or fail), and then check again.
179     */
180     if (!did_enter_cond)
181     {
182       thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
183                       &stage_gtid_wait_other_connection, &old_stage);
184       did_enter_cond= true;
185     }
186     mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
187                     &LOCK_slave_state);
188   }
189 
190 err:
191   if (did_enter_cond)
192     thd->EXIT_COND(&old_stage);
193   else
194     mysql_mutex_unlock(&LOCK_slave_state);
195   return res;
196 }
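
/*
  Illustrative sketch (not actual caller code) of how the return value of
  check_duplicate_gtid() is meant to be used, following the contract described
  above; apply_event_group() is a hypothetical helper and the caller normally
  lives in the slave/parallel-replication code:

    int res= rpl_global_gtid_slave_state->check_duplicate_gtid(&gtid, rgi);
    if (res < 0)
      return 1;                       // error, e.g. out of memory
    if (res == 0)
      return 0;                       // already applied, skip this event group
    // res == 1: we now own the domain; apply the event group, after which the
    // ownership is released again (via update() on success, or
    // release_domain_owner() on failure/rollback).
    return apply_event_group(rgi);
*/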
197 
198 
199 void
200 rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
201 {
202   element *elem= NULL;
203 
204   mysql_mutex_lock(&LOCK_slave_state);
205   if (!(elem= get_element(rgi->current_gtid.domain_id)))
206   {
207     /*
208       We cannot really deal with error here, as we are already called in an
209       error handling case (transaction failure and rollback).
210 
211       However, get_element() only fails if the element did not exist already
212       and could not be allocated due to out-of-memory - and if it did not
213       exist, then we would not get here in the first place.
214     */
215     mysql_mutex_unlock(&LOCK_slave_state);
216     return;
217   }
218 
219   if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
220   {
221     uint32 count= elem->owner_count;
222     DBUG_ASSERT(count > 0);
223     DBUG_ASSERT(elem->owner_rli == rgi->rli);
224     --count;
225     elem->owner_count= count;
226     if (count == 0)
227     {
228       elem->owner_rli= NULL;
229       mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
230     }
231   }
232   rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
233   mysql_mutex_unlock(&LOCK_slave_state);
234 }
235 
236 
237 static void
238 rpl_slave_state_free_element(void *arg)
239 {
240   struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
241   mysql_cond_destroy(&elem->COND_wait_gtid);
242   mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates);
243   my_free(elem);
244 }
245 
246 
247 rpl_slave_state::rpl_slave_state()
248   : pending_gtid_count(0), last_sub_id(0), gtid_pos_tables(0), loaded(false)
249 {
250   mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
251                    MY_MUTEX_INIT_SLOW);
252   my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, offsetof(element, domain_id),
253                sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
254   my_init_dynamic_array(PSI_INSTRUMENT_ME, &gtid_sort_array, sizeof(rpl_gtid),
255                         8, 8, MYF(0));
256 }
257 
258 
259 rpl_slave_state::~rpl_slave_state()
260 {
261   free_gtid_pos_tables(gtid_pos_tables.load(std::memory_order_relaxed));
262   truncate_hash();
263   my_hash_free(&hash);
264   delete_dynamic(&gtid_sort_array);
265   mysql_mutex_destroy(&LOCK_slave_state);
266 }
267 
268 
269 void
270 rpl_slave_state::truncate_hash()
271 {
272   uint32 i;
273 
274   for (i= 0; i < hash.records; ++i)
275   {
276     element *e= (element *)my_hash_element(&hash, i);
277     list_element *l= e->list;
278     list_element *next;
279     while (l)
280     {
281       next= l->next;
282       my_free(l);
283       l= next;
284     }
285     /* The element itself is freed by the hash element free function. */
286   }
287   my_hash_reset(&hash);
288 }
289 
290 
291 int
292 rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
293                         uint64 seq_no, void *hton, rpl_group_info *rgi)
294 {
295   element *elem= NULL;
296   list_element *list_elem= NULL;
297 
298   DBUG_ASSERT(hton || !loaded);
299   if (!(elem= get_element(domain_id)))
300     return 1;
301 
302   if (seq_no > elem->highest_seq_no)
303     elem->highest_seq_no= seq_no;
304   if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
305   {
306     /*
307       Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
308       Signal (and remove) them. The waiter will handle all the processing
309       of all pending MASTER_GTID_WAIT(), so we do not slow down the
310       replication SQL thread.
311     */
312     mysql_mutex_assert_owner(&LOCK_slave_state);
313     elem->gtid_waiter= NULL;
314     mysql_cond_broadcast(&elem->COND_wait_gtid);
315   }
316 
317   if (rgi)
318   {
319     if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
320     {
321 #ifdef DBUG_ASSERT_EXISTS
322       Relay_log_info *rli= rgi->rli;
323 #endif
324       uint32 count= elem->owner_count;
325       DBUG_ASSERT(count > 0);
326       DBUG_ASSERT(elem->owner_rli == rli);
327       --count;
328       elem->owner_count= count;
329       if (count == 0)
330       {
331         elem->owner_rli= NULL;
332         mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
333       }
334     }
335     rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
336   }
337 
338   if (!(list_elem= (list_element *)my_malloc(PSI_INSTRUMENT_ME,
339                                              sizeof(*list_elem), MYF(MY_WME))))
340     return 1;
341   list_elem->domain_id= domain_id;
342   list_elem->server_id= server_id;
343   list_elem->sub_id= sub_id;
344   list_elem->seq_no= seq_no;
345   list_elem->hton= hton;
346 
347   elem->add(list_elem);
348   if (last_sub_id < sub_id)
349     last_sub_id= sub_id;
350 
351 #ifdef HAVE_REPLICATION
352   ++pending_gtid_count;
353   if (pending_gtid_count >= opt_gtid_cleanup_batch_size)
354   {
355     pending_gtid_count = 0;
356     slave_background_gtid_pending_delete_request();
357   }
358 #endif
359 
360   return 0;
361 }
362 
363 
364 struct rpl_slave_state::element *
365 rpl_slave_state::get_element(uint32 domain_id)
366 {
367   struct element *elem;
368 
369   elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
370   if (elem)
371     return elem;
372 
373   if (!(elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem), MYF(MY_WME))))
374     return NULL;
375   elem->list= NULL;
376   elem->domain_id= domain_id;
377   elem->highest_seq_no= 0;
378   elem->gtid_waiter= NULL;
379   elem->owner_rli= NULL;
380   elem->owner_count= 0;
381   mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
382   mysql_cond_init(key_COND_gtid_ignore_duplicates,
383                   &elem->COND_gtid_ignore_duplicates, 0);
384   if (my_hash_insert(&hash, (uchar *)elem))
385   {
386     my_free(elem);
387     return NULL;
388   }
389   return elem;
390 }
391 
392 
393 int
394 rpl_slave_state::put_back_list(list_element *list)
395 {
396   element *e= NULL;
397   int err= 0;
398 
399   mysql_mutex_lock(&LOCK_slave_state);
400   while (list)
401   {
402     list_element *next= list->next;
403 
404     if ((!e || e->domain_id != list->domain_id) &&
405         !(e= (element *)my_hash_search(&hash, (const uchar *)&list->domain_id, 0)))
406     {
407       err= 1;
408       goto end;
409     }
410     e->add(list);
411     list= next;
412   }
413 
414 end:
415   mysql_mutex_unlock(&LOCK_slave_state);
416   return err;
417 }
418 
419 
420 int
421 rpl_slave_state::truncate_state_table(THD *thd)
422 {
423   TABLE_LIST tlist;
424   int err= 0;
425 
426   tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name,
427                        NULL, TL_WRITE);
428   tlist.mdl_request.set_type(MDL_EXCLUSIVE);
429   if (!(err= open_and_lock_tables(thd, &tlist, FALSE,
430                                   MYSQL_OPEN_IGNORE_LOGGING_FORMAT)))
431   {
432     DBUG_ASSERT(!tlist.table->file->row_logging);
433     tlist.table->s->tdc->flush(thd, true);
434     err= tlist.table->file->ha_truncate();
435 
436     if (err)
437     {
438       ha_rollback_trans(thd, FALSE);
439       close_thread_tables(thd);
440       ha_rollback_trans(thd, TRUE);
441     }
442     else
443     {
444       ha_commit_trans(thd, FALSE);
445       close_thread_tables(thd);
446       ha_commit_trans(thd, TRUE);
447     }
448     thd->release_transactional_locks();
449   }
450   return err;
451 }
452 
453 
454 static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
455   { { STRING_WITH_LEN("domain_id") },
456     { STRING_WITH_LEN("int(10) unsigned") },
457     {NULL, 0} },
458   { { STRING_WITH_LEN("sub_id") },
459     { STRING_WITH_LEN("bigint(20) unsigned") },
460     {NULL, 0} },
461   { { STRING_WITH_LEN("server_id") },
462     { STRING_WITH_LEN("int(10) unsigned") },
463     {NULL, 0} },
464   { { STRING_WITH_LEN("seq_no") },
465     { STRING_WITH_LEN("bigint(20) unsigned") },
466     {NULL, 0} },
467 };
468 
469 static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};
470 
471 static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= {
472   array_elements(mysql_rpl_slave_state_coltypes),
473   mysql_rpl_slave_state_coltypes,
474   array_elements(mysql_rpl_slave_state_pk_parts),
475   mysql_rpl_slave_state_pk_parts
476 };
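
/*
  For reference, the column and primary-key definitions above correspond to a
  mysql.gtid_slave_pos table of roughly this shape (the real table is created
  by the server installation scripts; this is only an illustration):

    CREATE TABLE mysql.gtid_slave_pos (
      domain_id INT(10) UNSIGNED NOT NULL,
      sub_id    BIGINT(20) UNSIGNED NOT NULL,
      server_id INT(10) UNSIGNED NOT NULL,
      seq_no    BIGINT(20) UNSIGNED NOT NULL,
      PRIMARY KEY (domain_id, sub_id)
    );
*/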
477 
478 static Table_check_intact_log_error gtid_table_intact;
479 
480 /*
481   Check that the mysql.gtid_slave_pos table has the correct definition.
482 */
483 int
484 gtid_check_rpl_slave_state_table(TABLE *table)
485 {
486   int err;
487 
488   if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef)))
489     my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
490              rpl_gtid_slave_state_table_name.str);
491   return err;
492 }
493 
494 
495 /*
496   Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
497   that is already in use by the current transaction, if any.
498 */
499 void
500 rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
501 {
502   /*
503     See comments on rpl_slave_state::gtid_pos_tables for rules around proper
504     access to the list.
505   */
506   auto list= gtid_pos_tables.load(std::memory_order_acquire);
507 
508   Ha_trx_info *ha_info;
509   uint count = 0;
510   for (ha_info= thd->transaction->all.ha_list; ha_info; ha_info= ha_info->next())
511   {
512     void *trx_hton= ha_info->ht();
513     auto table_entry= list;
514 
515     if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton)
516       continue;
517     while (table_entry)
518     {
519       if (table_entry->table_hton == trx_hton)
520       {
521         if (likely(table_entry->state == GTID_POS_AVAILABLE))
522         {
523           *out_tablename= table_entry->table_name;
524           /*
525             Check if this is a cross-engine transaction, so we can correctly
526             maintain the rpl_transactions_multi_engine status variable.
527           */
528           if (count >= 1)
529             statistic_increment(rpl_transactions_multi_engine, LOCK_status);
530           else
531           {
532             for (;;)
533             {
534               ha_info= ha_info->next();
535               if (!ha_info)
536                 break;
537               if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
538               {
539                 statistic_increment(rpl_transactions_multi_engine, LOCK_status);
540                 break;
541               }
542             }
543           }
544           return;
545         }
546         /*
547           This engine is marked to automatically create the table.
548           We cannot easily do this here (possibly in the middle of a
549           transaction). But we can request the slave background thread
550           to create it, and in a short while it should become available
551           for following transactions.
552         */
553 #ifdef HAVE_REPLICATION
554         slave_background_gtid_pos_create_request(table_entry);
555 #endif
556         break;
557       }
558       table_entry= table_entry->next;
559     }
560     ++count;
561   }
562   /*
563     If we cannot find any table whose engine matches an engine that is
564     already active in the transaction, or if no engine is active in the
565     current transaction at all, we return the default gtid_slave_pos table.
566   */
567   *out_tablename=
568     default_gtid_pos_table.load(std::memory_order_acquire)->table_name;
569   /* Record in status that we failed to find a suitable gtid_pos table. */
570   if (count > 0)
571   {
572     statistic_increment(transactions_gtid_foreign_engine, LOCK_status);
573     if (count > 1)
574       statistic_increment(rpl_transactions_multi_engine, LOCK_status);
575   }
576 }
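
/*
  Example of the intended behaviour (illustrative): a transaction that has
  updated an InnoDB table will have its GTID recorded in a gtid_slave_pos
  table whose engine matches, e.g. mysql.gtid_slave_pos_innodb if such a
  table exists and is available, so that the GTID update commits atomically
  with the transaction itself. If no matching and available table is found,
  the default mysql.gtid_slave_pos table is used instead and the
  transactions_gtid_foreign_engine status counter is incremented; transactions
  touching more than one engine additionally bump rpl_transactions_multi_engine.
*/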
577 
578 
579 /*
580   Write a gtid to the replication slave state table.
581 
582   Do it as part of the transaction, to get slave crash safety, or as a separate
583   transaction if !in_transaction (eg. MyISAM or DDL).
584 
585     gtid    The global transaction id for this event group.
586     sub_id  Value allocated for this event group when it was read (sub_id must
587             be consistent with commit order in the master binlog).
588 
589   Note that the caller must later ensure that the new gtid and sub_id are inserted
590   into the appropriate HASH element with rpl_slave_state.add(), so that it can
591   be deleted later. But this must only be done after COMMIT if in transaction.
592 */
593 int
594 rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
595                              bool in_transaction, bool in_statement,
596                              void **out_hton)
597 {
598   TABLE_LIST tlist;
599   int err= 0, not_sql_thread;
600   bool table_opened= false;
601   TABLE *table;
602   ulonglong thd_saved_option= thd->variables.option_bits;
603   Query_tables_list lex_backup;
604   wait_for_commit* suspended_wfc;
605   void *hton= NULL;
606   LEX_CSTRING gtid_pos_table_name;
607   DBUG_ENTER("record_gtid");
608 
609   *out_hton= NULL;
610   if (unlikely(!loaded))
611   {
612     /*
613       Probably the mysql.gtid_slave_pos table is missing (eg. upgrade) or
614       corrupt.
615 
616       We already complained loudly about this, but we can try to continue
617       until the DBA fixes it.
618     */
619     DBUG_RETURN(0);
620   }
621 
622   if (!in_statement)
623     thd->reset_for_next_command();
624 
625   /*
626     Only the SQL thread can call select_gtid_pos_table without a mutex.
627     Other threads need to use a mutex and take into account that the
628     result may change during execution, so we have to make a copy.
629   */
630 
631   if ((not_sql_thread= (thd->system_thread != SYSTEM_THREAD_SLAVE_SQL)))
632     mysql_mutex_lock(&LOCK_slave_state);
633   select_gtid_pos_table(thd, &gtid_pos_table_name);
634   if (not_sql_thread)
635   {
636     LEX_CSTRING *tmp= thd->make_clex_string(gtid_pos_table_name.str,
637                                             gtid_pos_table_name.length);
638     mysql_mutex_unlock(&LOCK_slave_state);
639     if (!tmp)
640       DBUG_RETURN(1);
641     gtid_pos_table_name= *tmp;
642   }
643 
644   DBUG_EXECUTE_IF("gtid_inject_record_gtid",
645                   {
646                     my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
647                     DBUG_RETURN(1);
648                   } );
649 
650   /*
651     If we are applying a non-transactional event group, we will be committing
652     here a transaction, but that does not imply that the event group has
653     completed or has been binlogged. So we should not trigger
654     wakeup_subsequent_commits() here.
655 
656     Note: An alternative here could be to put a call to mark_start_commit() in
657     stmt_done() before the call to record_and_update_gtid(). This would
658     prevent later calling mark_start_commit() after we have run
659     wakeup_subsequent_commits() from committing the GTID update transaction
660     (which must be avoided to avoid accessing freed group_commit_orderer
661     object). It would also allow following event groups to start slightly
662     earlier. And in the cases where record_gtid() is called without an active
663     transaction, the current statement should have been binlogged already, so
664     binlog order is preserved.
665 
666     But this is rather subtle, and potentially fragile. And it does not really
667     seem worth it; non-transactional loads are unlikely to benefit much from
668     parallel replication in any case. So for now, we go with the simple
669     suspend/resume of wakeup_subsequent_commits() here in record_gtid().
670   */
671   suspended_wfc= thd->suspend_subsequent_commits();
672   thd->lex->reset_n_backup_query_tables_list(&lex_backup);
673   tlist.init_one_table(&MYSQL_SCHEMA_NAME, &gtid_pos_table_name, NULL, TL_WRITE);
674   if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
675     goto end;
676   table_opened= true;
677   table= tlist.table;
678   hton= table->s->db_type();
679   table->file->row_logging= 0;                  // No binary logging
680 
681   if ((err= gtid_check_rpl_slave_state_table(table)))
682     goto end;
683 
684 #ifdef WITH_WSREP
685   /*
686     Updates in slave state table should not be appended to galera transaction
687     writeset.
688   */
689   thd->wsrep_ignore_table= true;
690 #endif
691 
692   if (!in_transaction)
693   {
694     DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
695     thd->variables.option_bits&=
696       ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
697                    OPTION_GTID_BEGIN);
698   }
699   else
700     thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG;
701 
702   bitmap_set_all(table->write_set);
703   table->rpl_write_set= table->write_set;
704 
705   table->field[0]->store((ulonglong)gtid->domain_id, true);
706   table->field[1]->store(sub_id, true);
707   table->field[2]->store((ulonglong)gtid->server_id, true);
708   table->field[3]->store(gtid->seq_no, true);
709   DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE(););
710   if ((err= table->file->ha_write_row(table->record[0])))
711   {
712     table->file->print_error(err, MYF(0));
713     goto end;
714   }
715   *out_hton= hton;
716 
717   if(opt_bin_log &&
718      (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
719                                                        gtid->seq_no)))
720   {
721     my_error(ER_OUT_OF_RESOURCES, MYF(0));
722     goto end;
723   }
724 end:
725 
726 #ifdef WITH_WSREP
727   thd->wsrep_ignore_table= false;
728 #endif
729 
730   if (table_opened)
731   {
732     if (err || (err= ha_commit_trans(thd, FALSE)))
733       ha_rollback_trans(thd, FALSE);
734 
735     close_thread_tables(thd);
736     if (in_transaction)
737       thd->mdl_context.release_statement_locks();
738     else
739       thd->release_transactional_locks();
740   }
741   thd->lex->restore_backup_query_tables_list(&lex_backup);
742   thd->variables.option_bits= thd_saved_option;
743   thd->resume_subsequent_commits(suspended_wfc);
744   DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
745     {
746       if (gtid->server_id == 100)
747         my_sleep(500000);
748     });
749   DBUG_RETURN(err);
750 }
751 
752 
753 /*
754   Return a list of all old GTIDs in any mysql.gtid_slave_pos* table that are
755   no longer needed and can be deleted from the table.
756 
757   Within each domain, we need to keep around the latest GTID (the one with the
758   highest sub_id), but any others in that domain can be deleted.
759 */
760 rpl_slave_state::list_element *
761 rpl_slave_state::gtid_grab_pending_delete_list()
762 {
763   uint32 i;
764   list_element *full_list;
765 
766   mysql_mutex_lock(&LOCK_slave_state);
767   full_list= NULL;
768   for (i= 0; i < hash.records; ++i)
769   {
770     element *elem= (element *)my_hash_element(&hash, i);
771     list_element *elist= elem->list;
772     list_element *last_elem, **best_ptr_ptr, *cur, *next;
773     uint64 best_sub_id;
774 
775     if (!elist)
776       continue;                                 /* Nothing here */
777 
778     /* Delete any old stuff, but keep around the most recent one. */
779     cur= elist;
780     best_sub_id= cur->sub_id;
781     best_ptr_ptr= &elist;
782     last_elem= cur;
783     while ((next= cur->next)) {
784       last_elem= next;
785       if (next->sub_id > best_sub_id)
786       {
787         best_sub_id= next->sub_id;
788         best_ptr_ptr= &cur->next;
789       }
790       cur= next;
791     }
792     /*
793       Append the new elements to the full list. Note the order is important;
794       we do it here so that we do not break the list if best_sub_id is the
795       last of the new elements.
796     */
797     last_elem->next= full_list;
798     /*
799       Delete the highest sub_id element from the old list, and put it back as
800       the single-element new list.
801     */
802     cur= *best_ptr_ptr;
803     *best_ptr_ptr= cur->next;
804     cur->next= NULL;
805     elem->list= cur;
806 
807     /*
808       Collect the full list so far here. Note that elist may have moved if we
809       deleted the first element, so order is again important.
810     */
811     full_list= elist;
812   }
813   mysql_mutex_unlock(&LOCK_slave_state);
814 
815   return full_list;
816 }
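
/*
  Worked example of the pruning logic above (illustrative only): if domain 0
  currently has list elements with sub_id 1, 5 and 3, then the element with
  sub_id 5 is kept as the single remaining entry for that domain, while the
  elements with sub_id 1 and 3 are moved onto the returned delete list.
*/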
817 
818 
819 /* Find the mysql.gtid_slave_posXXX table associated with a given hton. */
820 LEX_CSTRING *
821 rpl_slave_state::select_gtid_pos_table(void *hton)
822 {
823   /*
824     See comments on rpl_slave_state::gtid_pos_tables for rules around proper
825     access to the list.
826   */
827   auto table_entry= gtid_pos_tables.load(std::memory_order_acquire);
828 
829   while (table_entry)
830   {
831     if (table_entry->table_hton == hton)
832     {
833       if (likely(table_entry->state == GTID_POS_AVAILABLE))
834         return &table_entry->table_name;
835     }
836     table_entry= table_entry->next;
837   }
838 
839   return &default_gtid_pos_table.load(std::memory_order_acquire)->table_name;
840 }
841 
842 
843 void
844 rpl_slave_state::gtid_delete_pending(THD *thd,
845                                      rpl_slave_state::list_element **list_ptr)
846 {
847   int err= 0;
848   ulonglong thd_saved_option;
849 
850   if (unlikely(!loaded))
851     return;
852 
853 #ifdef WITH_WSREP
854   /*
855     Updates in slave state table should not be appended to galera transaction
856     writeset.
857   */
858   thd->wsrep_ignore_table= true;
859 #endif
860 
861   thd_saved_option= thd->variables.option_bits;
862   thd->variables.option_bits&=
863     ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
864                  OPTION_GTID_BEGIN);
865 
866   while (*list_ptr)
867   {
868     LEX_CSTRING *gtid_pos_table_name, *tmp_table_name;
869     Query_tables_list lex_backup;
870     TABLE_LIST tlist;
871     TABLE *table;
872     handler::Table_flags direct_pos= 0;
873     list_element *cur, **cur_ptr_ptr;
874     bool table_opened= false;
875     bool index_inited= false;
876     void *hton= (*list_ptr)->hton;
877 
878     thd->reset_for_next_command();
879 
880     /*
881       Only the SQL thread can call select_gtid_pos_table without a mutex.
882       Other threads need to use a mutex and take into account that the
883       result may change during execution, so we have to make a copy.
884     */
885     mysql_mutex_lock(&LOCK_slave_state);
886     tmp_table_name= select_gtid_pos_table(hton);
887     gtid_pos_table_name= thd->make_clex_string(tmp_table_name->str,
888                                                tmp_table_name->length);
889     mysql_mutex_unlock(&LOCK_slave_state);
890     if (!gtid_pos_table_name)
891     {
892       /* Out of memory - we can try again later. */
893       break;
894     }
895 
896     thd->lex->reset_n_backup_query_tables_list(&lex_backup);
897     tlist.init_one_table(&MYSQL_SCHEMA_NAME, gtid_pos_table_name, NULL, TL_WRITE);
898     if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
899       goto end;
900     table_opened= true;
901     table= tlist.table;
902 
903     if ((err= gtid_check_rpl_slave_state_table(table)))
904       goto end;
905 
906     direct_pos= table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION;
907     bitmap_set_all(table->write_set);
908     table->rpl_write_set= table->write_set;
909 
910     /* Now delete any already committed GTIDs. */
911     bitmap_set_bit(table->read_set, table->field[0]->field_index);
912     bitmap_set_bit(table->read_set, table->field[1]->field_index);
913 
914     if (!direct_pos)
915     {
916       if ((err= table->file->ha_index_init(0, 0)))
917       {
918         table->file->print_error(err, MYF(0));
919         goto end;
920       }
921       index_inited= true;
922     }
923 
924     cur = *list_ptr;
925     cur_ptr_ptr = list_ptr;
926     do
927     {
928       uchar key_buffer[4+8];
929       list_element *next= cur->next;
930 
931       if (cur->hton == hton)
932       {
933         int res;
934 
935         table->field[0]->store((ulonglong)cur->domain_id, true);
936         table->field[1]->store(cur->sub_id, true);
937         if (direct_pos)
938         {
939           res= table->file->ha_rnd_pos_by_record(table->record[0]);
940         }
941         else
942         {
943           key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
944           res= table->file->ha_index_read_map(table->record[0], key_buffer,
945                                               HA_WHOLE_KEY, HA_READ_KEY_EXACT);
946         }
947         DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
948               { res= 1;
949                 err= ENOENT;
950                 sql_print_error("<DEBUG> Error deleting old GTID row");
951               });
952         if (res)
953           /* We cannot find the row, assume it is already deleted. */
954           ;
955         else if ((err= table->file->ha_delete_row(table->record[0])))
956         {
957           sql_print_error("Error deleting old GTID row: %s",
958                           thd->get_stmt_da()->message());
959           /*
960             In case of error, we still discard the element from the list. We do
961             not want to endlessly error on the same element in case of table
962             corruption or such.
963           */
964         }
965         *cur_ptr_ptr= next;
966         my_free(cur);
967       }
968       else
969       {
970         /* Leave this one in the list until we get to the table for its hton. */
971         cur_ptr_ptr= &cur->next;
972       }
973       cur= next;
974       if (err)
975         break;
976     } while (cur);
977 end:
978     if (table_opened)
979     {
980       DBUG_ASSERT(direct_pos || index_inited || err);
981       /*
982         Index may not be initialized if there was a failure during
983         'ha_index_init'. Hence check whether index initialization succeeded, and
984         only then invoke ha_index_end(). Ending an index that was not initialized
985         will lead to an assertion failure.
986       */
987       if (index_inited)
988         table->file->ha_index_end();
989 
990       if (err || (err= ha_commit_trans(thd, FALSE)))
991         ha_rollback_trans(thd, FALSE);
992     }
993     close_thread_tables(thd);
994     thd->release_transactional_locks();
995     thd->lex->restore_backup_query_tables_list(&lex_backup);
996 
997     if (err)
998       break;
999   }
1000   thd->variables.option_bits= thd_saved_option;
1001 
1002 #ifdef WITH_WSREP
1003   thd->wsrep_ignore_table= false;
1004 #endif
1005 }
1006 
1007 
1008 uint64
1009 rpl_slave_state::next_sub_id(uint32 domain_id)
1010 {
1011   uint64 sub_id= 0;
1012 
1013   mysql_mutex_lock(&LOCK_slave_state);
1014   sub_id= ++last_sub_id;
1015   mysql_mutex_unlock(&LOCK_slave_state);
1016 
1017   return sub_id;
1018 }
1019 
1020 /* A callback used in sorting of gtid list based on domain_id. */
1021 static int rpl_gtid_cmp_cb(const void *id1, const void *id2)
1022 {
1023   uint32 d1= ((rpl_gtid *)id1)->domain_id;
1024   uint32 d2= ((rpl_gtid *)id2)->domain_id;
1025 
1026   if (d1 < d2)
1027     return -1;
1028   else if (d1 > d2)
1029     return 1;
1030   return 0;
1031 }
1032 
1033 /* Format the specified gtid and store it in the given string buffer. */
1034 bool
1035 rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
1036 {
1037   if (*first)
1038     *first= false;
1039   else
1040     if (dest->append(",",1))
1041       return true;
1042   return
1043     dest->append_ulonglong(gtid->domain_id) ||
1044     dest->append("-",1) ||
1045     dest->append_ulonglong(gtid->server_id) ||
1046     dest->append("-",1) ||
1047     dest->append_ulonglong(gtid->seq_no);
1048 }
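
/*
  Example of the text format produced above: a GTID with domain_id=0,
  server_id=1 and seq_no=100 is rendered as "0-1-100"; several GTIDs are
  separated by commas, e.g. "0-1-100,1-2-577".
*/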
1049 
1050 /*
1051   Sort the given gtid list based on domain_id and store them in the specified
1052   string.
1053 */
1054 static bool
1055 rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr, String *str)
1056 {
1057   bool first= true, res= true;
1058 
1059   sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);
1060 
1061   for (uint i= 0; i < gtid_dynarr->elements; i ++)
1062   {
1063     rpl_gtid *gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
1064     if (rpl_slave_state_tostring_helper(str, gtid, &first))
1065       goto err;
1066   }
1067   res= false;
1068 
1069 err:
1070   return res;
1071 }
1072 
1073 
1074 /* Sort the given gtid list based on domain_id and call cb for each gtid. */
1075 static bool
1076 rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr,
1077                                 int (*cb)(rpl_gtid *, void *),
1078                                 void *data)
1079 {
1080   rpl_gtid *gtid;
1081   bool res= true;
1082 
1083   sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);
1084 
1085   for (uint i= 0; i < gtid_dynarr->elements; i ++)
1086   {
1087     gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
1088     if ((*cb)(gtid, data))
1089       goto err;
1090   }
1091   res= false;
1092 
1093 err:
1094   return res;
1095 }
1096 
1097 int
1098 rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
1099                          rpl_gtid *extra_gtids, uint32 num_extra,
1100                          bool sort)
1101 {
1102   uint32 i;
1103   HASH gtid_hash;
1104   uchar *rec;
1105   rpl_gtid *gtid;
1106   int res= 1;
1107   bool locked= false;
1108 
1109   my_hash_init(PSI_INSTRUMENT_ME, &gtid_hash, &my_charset_bin, 32,
1110                offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, NULL,
1111                HASH_UNIQUE);
1112   for (i= 0; i < num_extra; ++i)
1113     if (extra_gtids[i].server_id == global_system_variables.server_id &&
1114         my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
1115       goto err;
1116 
1117   mysql_mutex_lock(&LOCK_slave_state);
1118   locked= true;
1119   reset_dynamic(&gtid_sort_array);
1120 
1121   for (i= 0; i < hash.records; ++i)
1122   {
1123     uint64 best_sub_id;
1124     rpl_gtid best_gtid;
1125     element *e= (element *)my_hash_element(&hash, i);
1126     list_element *l= e->list;
1127 
1128     if (!l)
1129       continue;                                 /* Nothing here */
1130 
1131     best_gtid.domain_id= e->domain_id;
1132     best_gtid.server_id= l->server_id;
1133     best_gtid.seq_no= l->seq_no;
1134     best_sub_id= l->sub_id;
1135     while ((l= l->next))
1136     {
1137       if (l->sub_id > best_sub_id)
1138       {
1139         best_sub_id= l->sub_id;
1140         best_gtid.server_id= l->server_id;
1141         best_gtid.seq_no= l->seq_no;
1142       }
1143     }
1144 
1145     /* Check if we have something newer in the extra list. */
1146     rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
1147     if (rec)
1148     {
1149       gtid= (rpl_gtid *)rec;
1150       if (gtid->seq_no > best_gtid.seq_no)
1151         memcpy(&best_gtid, gtid, sizeof(best_gtid));
1152       if (my_hash_delete(&gtid_hash, rec))
1153       {
1154         goto err;
1155       }
1156     }
1157 
1158     if ((res= sort ? insert_dynamic(&gtid_sort_array,
1159                                     (const void *) &best_gtid) :
1160          (*cb)(&best_gtid, data)))
1161     {
1162       goto err;
1163     }
1164   }
1165 
1166   /* Also add any remaining extra domain_ids. */
1167   for (i= 0; i < gtid_hash.records; ++i)
1168   {
1169     gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
1170     if ((res= sort ? insert_dynamic(&gtid_sort_array, (const void *) gtid) :
1171          (*cb)(gtid, data)))
1172     {
1173       goto err;
1174     }
1175   }
1176 
1177   if (sort && rpl_slave_state_tostring_helper(&gtid_sort_array, cb, data))
1178   {
1179     goto err;
1180   }
1181 
1182   res= 0;
1183 
1184 err:
1185   if (locked) mysql_mutex_unlock(&LOCK_slave_state);
1186   my_hash_free(&gtid_hash);
1187 
1188   return res;
1189 }
1190 
1191 
1192 struct rpl_slave_state_tostring_data {
1193   String *dest;
1194   bool first;
1195 };
1196 static int
1197 rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data)
1198 {
1199   rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data;
1200   return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first);
1201 }
1202 
1203 
1204 /*
1205   Prepare the current slave state as a string, suitable for sending to the
1206   master to request binlog events starting from that GTID state.
1207 
1208   The state consists of the most recently applied GTID for each domain_id,
1209   ie. the one with the highest sub_id within each domain_id.
1210 
1211   Optionally, extra_gtids is a list of GTIDs from the binlog. This is used when
1212   a server was previously a master and now needs to connect to a new master as
1213   a slave. For each domain_id, if the GTID in the binlog was logged with our
1214   own server_id _and_ has a higher seq_no than what is in the slave state,
1215   then this should be used as the position to start replicating at. This
1216   makes it possible to promote a slave to be the new master, and to connect the
1217   old master as a slave with MASTER_GTID_POS=AUTO.
1218 */
1219 int
1220 rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
1221 {
1222   struct rpl_slave_state_tostring_data data;
1223   data.first= true;
1224   data.dest= dest;
1225 
1226   return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids,
1227                  num_extra, true);
1228 }
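
/*
  Illustrative example (values made up): if the slave state contains 0-1-100
  and 1-1-50, and extra_gtids from our own binlog contains 1-2-60 where 2 is
  our own server_id, then the resulting start position is "0-1-100,1-2-60";
  the binlog GTID wins in domain 1 because it was logged with our server_id
  and has a higher seq_no than the one in the slave state.
*/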
1229 
1230 
1231 /*
1232   Lookup a domain_id in the current replication slave state.
1233 
1234   Returns false if the domain_id has no entries in the slave state.
1235   Otherwise returns true, and fills in out_gtid with the corresponding
1236   GTID.
1237 */
1238 bool
1239 rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
1240 {
1241   element *elem;
1242   list_element *list;
1243   uint64 best_sub_id;
1244 
1245   mysql_mutex_lock(&LOCK_slave_state);
1246   elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
1247   if (!elem || !(list= elem->list))
1248   {
1249     mysql_mutex_unlock(&LOCK_slave_state);
1250     return false;
1251   }
1252 
1253   out_gtid->domain_id= domain_id;
1254   out_gtid->server_id= list->server_id;
1255   out_gtid->seq_no= list->seq_no;
1256   best_sub_id= list->sub_id;
1257 
1258   while ((list= list->next))
1259   {
1260     if (best_sub_id > list->sub_id)
1261       continue;
1262     best_sub_id= list->sub_id;
1263     out_gtid->server_id= list->server_id;
1264     out_gtid->seq_no= list->seq_no;
1265   }
1266 
1267   mysql_mutex_unlock(&LOCK_slave_state);
1268   return true;
1269 }
1270 
1271 
1272 /*
1273   Parse a GTID at the start of a string, and update the pointer to point
1274   at the first character after the parsed GTID.
1275 
1276   Returns 0 on ok, non-zero on parse error.
1277 */
1278 static int
1279 gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
1280 {
1281   char *q;
1282   const char *p= *ptr;
1283   uint64 v1, v2, v3;
1284   int err= 0;
1285 
1286   q= (char*) end;
1287   v1= (uint64)my_strtoll10(p, &q, &err);
1288   if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
1289     return 1;
1290   p= q+1;
1291   q= (char*) end;
1292   v2= (uint64)my_strtoll10(p, &q, &err);
1293   if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
1294     return 1;
1295   p= q+1;
1296   q= (char*) end;
1297   v3= (uint64)my_strtoll10(p, &q, &err);
1298   if (err != 0)
1299     return 1;
1300 
1301   out_gtid->domain_id= (uint32) v1;
1302   out_gtid->server_id= (uint32) v2;
1303   out_gtid->seq_no= v3;
1304   *ptr= q;
1305   return 0;
1306 }
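
/*
  Example (illustrative): given the input "1-2-1000,3-4-500", a first call to
  gtid_parser_helper() fills out_gtid with domain_id=1, server_id=2 and
  seq_no=1000, and leaves *ptr pointing at the ',' so the caller can consume
  the separator and parse the next GTID.
*/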
1307 
1308 
1309 rpl_gtid *
1310 gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
1311 {
1312   const char *p= const_cast<char *>(str);
1313   const char *end= p + str_len;
1314   uint32 len= 0, alloc_len= 5;
1315   rpl_gtid *list= NULL;
1316 
1317   for (;;)
1318   {
1319     rpl_gtid gtid;
1320 
1321     if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
1322     {
1323       my_free(list);
1324       return NULL;
1325     }
1326     if ((!list || len >= alloc_len) &&
1327         !(list=
1328           (rpl_gtid *)my_realloc(PSI_INSTRUMENT_ME, list,
1329                                  (alloc_len= alloc_len*2) * sizeof(rpl_gtid),
1330                                  MYF(MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR))))
1331       return NULL;
1332     list[len++]= gtid;
1333 
1334     if (p == end)
1335       break;
1336     if (*p != ',')
1337     {
1338       my_free(list);
1339       return NULL;
1340     }
1341     ++p;
1342   }
1343   *out_len= len;
1344   return list;
1345 }
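
/*
  Usage sketch (hypothetical caller) for parsing a comma-separated GTID
  position string into an array of rpl_gtid:

    uint32 count;
    rpl_gtid *list= gtid_parse_string_to_list("0-1-100,1-2-577",
                                              strlen("0-1-100,1-2-577"),
                                              &count);
    // On success, list has count==2 entries and must be freed with my_free();
    // on parse error or out-of-memory, NULL is returned.
*/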
1346 
1347 
1348 /*
1349   Update the slave replication state with the GTID position obtained from
1350   master when connecting with old-style (filename,offset) position.
1351 
1352   If RESET is true then all existing entries are removed. Otherwise only
1353   domain_ids mentioned in the STATE_FROM_MASTER are changed.
1354 
1355   Returns 0 if ok, non-zero if error.
1356 */
1357 int
1358 rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
1359                       bool reset, bool in_statement)
1360 {
1361   const char *end= state_from_master + len;
1362 
1363   if (reset)
1364   {
1365     if (truncate_state_table(thd))
1366       return 1;
1367     truncate_hash();
1368   }
1369   if (state_from_master == end)
1370     return 0;
1371   for (;;)
1372   {
1373     rpl_gtid gtid;
1374     uint64 sub_id;
1375     void *hton= NULL;
1376 
1377     if (gtid_parser_helper(&state_from_master, end, &gtid) ||
1378         !(sub_id= next_sub_id(gtid.domain_id)) ||
1379         record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
1380         update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
1381       return 1;
1382     if (state_from_master == end)
1383       break;
1384     if (*state_from_master != ',')
1385       return 1;
1386     ++state_from_master;
1387   }
1388   return 0;
1389 }
1390 
1391 
1392 bool
1393 rpl_slave_state::is_empty()
1394 {
1395   uint32 i;
1396   bool result= true;
1397 
1398   mysql_mutex_lock(&LOCK_slave_state);
1399   for (i= 0; i < hash.records; ++i)
1400   {
1401     element *e= (element *)my_hash_element(&hash, i);
1402     if (e->list)
1403     {
1404       result= false;
1405       break;
1406     }
1407   }
1408   mysql_mutex_unlock(&LOCK_slave_state);
1409 
1410   return result;
1411 }
1412 
1413 
1414 void
1415 rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list)
1416 {
1417   struct gtid_pos_table *cur, *next;
1418 
1419   cur= list;
1420   while (cur)
1421   {
1422     next= cur->next;
1423     my_free(cur);
1424     cur= next;
1425   }
1426 }
1427 
1428 
1429 /*
1430   Replace the list of available mysql.gtid_slave_posXXX tables with a new list.
1431   The caller must be holding LOCK_slave_state. Additionally, this function
1432   must only be called while all SQL threads are stopped.
1433 */
1434 void
1435 rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
1436                                           rpl_slave_state::gtid_pos_table *default_entry)
1437 {
1438   mysql_mutex_assert_owner(&LOCK_slave_state);
1439   auto old_list= gtid_pos_tables.load(std::memory_order_relaxed);
1440   gtid_pos_tables.store(new_list, std::memory_order_release);
1441   default_gtid_pos_table.store(default_entry, std::memory_order_release);
1442   free_gtid_pos_tables(old_list);
1443 }
1444 
1445 
1446 void
1447 rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
1448 {
1449   mysql_mutex_assert_owner(&LOCK_slave_state);
1450   entry->next= gtid_pos_tables.load(std::memory_order_relaxed);
1451   gtid_pos_tables.store(entry, std::memory_order_release);
1452 }
1453 
1454 
1455 struct rpl_slave_state::gtid_pos_table *
1456 rpl_slave_state::alloc_gtid_pos_table(LEX_CSTRING *table_name, void *hton,
1457                                       rpl_slave_state::gtid_pos_table_state state)
1458 {
1459   struct gtid_pos_table *p;
1460   char *allocated_str;
1461 
1462   if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME), &p, sizeof(*p),
1463                        &allocated_str, table_name->length+1, NULL))
1464   {
1465     my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1));
1466     return NULL;
1467   }
1468   memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0'
1469   p->next = NULL;
1470   p->table_hton= hton;
1471   p->table_name.str= allocated_str;
1472   p->table_name.length= table_name->length;
1473   p->state= state;
1474   return p;
1475 }
1476 
1477 
1478 void rpl_binlog_state::init()
1479 {
1480   my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, offsetof(element, domain_id),
1481                sizeof(uint32), NULL, my_free, HASH_UNIQUE);
1482   my_init_dynamic_array(PSI_INSTRUMENT_ME, &gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
1483   mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
1484                    MY_MUTEX_INIT_SLOW);
1485   initialized= 1;
1486 }
1487 
1488 void
1489 rpl_binlog_state::reset_nolock()
1490 {
1491   uint32 i;
1492 
1493   for (i= 0; i < hash.records; ++i)
1494     my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
1495   my_hash_reset(&hash);
1496 }
1497 
1498 
1499 void
1500 rpl_binlog_state::reset()
1501 {
1502   mysql_mutex_lock(&LOCK_binlog_state);
1503   reset_nolock();
1504   mysql_mutex_unlock(&LOCK_binlog_state);
1505 }
1506 
1507 
1508 void rpl_binlog_state::free()
1509 {
1510   if (initialized)
1511   {
1512     initialized= 0;
1513     reset_nolock();
1514     my_hash_free(&hash);
1515     delete_dynamic(&gtid_sort_array);
1516     mysql_mutex_destroy(&LOCK_binlog_state);
1517   }
1518 }
1519 
1520 
1521 bool
1522 rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
1523 {
1524   uint32 i;
1525   bool res= false;
1526 
1527   mysql_mutex_lock(&LOCK_binlog_state);
1528   reset_nolock();
1529   for (i= 0; i < count; ++i)
1530   {
1531     if (update_nolock(&(list[i]), false))
1532     {
1533       res= true;
1534       break;
1535     }
1536   }
1537   mysql_mutex_unlock(&LOCK_binlog_state);
1538   return res;
1539 }
1540 
1541 
1542 static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
1543 {
1544   rpl_binlog_state *self= (rpl_binlog_state *)data;
1545   return self->update_nolock(gtid, false);
1546 }
1547 
1548 
1549 bool
1550 rpl_binlog_state::load(rpl_slave_state *slave_pos)
1551 {
1552   bool res= false;
1553 
1554   mysql_mutex_lock(&LOCK_binlog_state);
1555   reset_nolock();
1556   if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0, false))
1557     res= true;
1558   mysql_mutex_unlock(&LOCK_binlog_state);
1559   return res;
1560 }
1561 
1562 
1563 rpl_binlog_state::~rpl_binlog_state()
1564 {
1565   free();
1566 }
1567 
1568 
1569 /*
1570   Update replication state with a new GTID.
1571 
1572   If the (domain_id, server_id) pair already exists, then the new GTID replaces
1573   the old one for that domain id. Else a new entry is inserted.
1574 
1575   Returns 0 for ok, 1 for error.
1576 */
1577 int
1578 rpl_binlog_state::update_nolock(const struct rpl_gtid *gtid, bool strict)
1579 {
1580   element *elem;
1581 
1582   if ((elem= (element *)my_hash_search(&hash,
1583                                        (const uchar *)(&gtid->domain_id), 0)))
1584   {
1585     if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
1586     {
1587       my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
1588                gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
1589                elem->last_gtid->server_id, elem->last_gtid->seq_no);
1590       return 1;
1591     }
1592     if (elem->seq_no_counter < gtid->seq_no)
1593       elem->seq_no_counter= gtid->seq_no;
1594     if (!elem->update_element(gtid))
1595       return 0;
1596   }
1597   else if (!alloc_element_nolock(gtid))
1598     return 0;
1599 
1600   my_error(ER_OUT_OF_RESOURCES, MYF(0));
1601   return 1;
1602 }
1603 
1604 
1605 int
1606 rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
1607 {
1608   int res;
1609   mysql_mutex_lock(&LOCK_binlog_state);
1610   res= update_nolock(gtid, strict);
1611   mysql_mutex_unlock(&LOCK_binlog_state);
1612   return res;
1613 }
1614 
1615 
1616 /*
1617   Fill in a new GTID, allocating next sequence number, and update state
1618   accordingly.
1619 */
1620 int
1621 rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
1622                                         rpl_gtid *gtid)
1623 {
1624   element *elem;
1625   int res= 0;
1626 
1627   gtid->domain_id= domain_id;
1628   gtid->server_id= server_id;
1629 
1630   mysql_mutex_lock(&LOCK_binlog_state);
1631   if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
1632   {
1633     gtid->seq_no= ++elem->seq_no_counter;
1634     if (!elem->update_element(gtid))
1635       goto end;
1636   }
1637   else
1638   {
1639     gtid->seq_no= 1;
1640     if (!alloc_element_nolock(gtid))
1641       goto end;
1642   }
1643 
1644   my_error(ER_OUT_OF_RESOURCES, MYF(0));
1645   res= 1;
1646 end:
1647   mysql_mutex_unlock(&LOCK_binlog_state);
1648   return res;
1649 }
1650 
1651 
1652 /* Helper functions for update. */
1653 int
1654 rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
1655 {
1656   rpl_gtid *lookup_gtid;
1657 
1658   /*
1659     By far the most common case is that successive events within the same
1660     replication domain have the same server id (it changes only when
1661     switching to a new master). So save a hash lookup in this case.
1662   */
1663   if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
1664   {
1665     last_gtid->seq_no= gtid->seq_no;
1666     return 0;
1667   }
1668 
1669   lookup_gtid= (rpl_gtid *)
1670     my_hash_search(&hash, (const uchar *)&gtid->server_id, 0);
1671   if (lookup_gtid)
1672   {
1673     lookup_gtid->seq_no= gtid->seq_no;
1674     last_gtid= lookup_gtid;
1675     return 0;
1676   }
1677 
1678   /* Allocate a new GTID and insert it. */
1679   lookup_gtid= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*lookup_gtid),
1680                                      MYF(MY_WME));
1681   if (!lookup_gtid)
1682     return 1;
1683   memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
1684   if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
1685   {
1686     my_free(lookup_gtid);
1687     return 1;
1688   }
1689   last_gtid= lookup_gtid;
1690   return 0;
1691 }
1692 
1693 
1694 int
1695 rpl_binlog_state::alloc_element_nolock(const rpl_gtid *gtid)
1696 {
1697   element *elem;
1698   rpl_gtid *lookup_gtid;
1699 
1700   /* First time we see this domain_id; allocate a new element. */
1701   elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem), MYF(MY_WME));
1702   lookup_gtid= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*lookup_gtid),
1703                                      MYF(MY_WME));
1704   if (elem && lookup_gtid)
1705   {
1706     elem->domain_id= gtid->domain_id;
1707     my_hash_init(PSI_INSTRUMENT_ME, &elem->hash, &my_charset_bin, 32,
1708                  offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
1709                  HASH_UNIQUE);
1710     elem->last_gtid= lookup_gtid;
1711     elem->seq_no_counter= gtid->seq_no;
1712     memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
1713     if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
1714     {
1715       lookup_gtid= NULL;                        /* Do not free. */
1716       if (0 == my_hash_insert(&hash, (const uchar *)elem))
1717         return 0;
1718     }
1719     my_hash_free(&elem->hash);
1720   }
1721 
1722   /* An error. */
1723   if (elem)
1724     my_free(elem);
1725   if (lookup_gtid)
1726     my_free(lookup_gtid);
1727   return 1;
1728 }
1729 
1730 
1731 /*
1732   Check that a new GTID can be logged without creating an out-of-order
1733   sequence number with existing GTIDs.
1734 */
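/*
  For example (values are illustrative only): if the last GTID binlogged in
  domain 0 is 0-1-100, then check_strict_sequence(0, 2, 100) or any lower
  seq_no fails with ER_GTID_STRICT_OUT_OF_ORDER, while seq_no 101 or higher
  is accepted.
*/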
1735 bool
1736 rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
1737                                         uint64 seq_no)
1738 {
1739   element *elem;
1740   bool res= 0;
1741 
1742   mysql_mutex_lock(&LOCK_binlog_state);
1743   if ((elem= (element *)my_hash_search(&hash,
1744                                        (const uchar *)(&domain_id), 0)) &&
1745       elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
1746   {
1747     my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
1748              elem->last_gtid->domain_id, elem->last_gtid->server_id,
1749              elem->last_gtid->seq_no);
1750     res= 1;
1751   }
1752   mysql_mutex_unlock(&LOCK_binlog_state);
1753   return res;
1754 }
1755 
1756 
1757 /*
1758   When we see a new GTID that will not be binlogged (e.g. a slave thread
1759   with --log-slave-updates=0), we need to remember it, so that any GTID
1760   seq_no of our own within that domain is allocated starting from there.
1761 
1762   Returns 0 if ok, non-zero if out-of-memory.
1763 */
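/*
  For example (illustrative values): after a slave with --log-slave-updates=0
  applies 3-10-500, bump_seq_no_if_needed(3, 500) records seq_no_counter=500
  for domain 3, so a later update_with_next_gtid() in that domain allocates
  seq_no 501 rather than restarting from 1.
*/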
1764 int
1765 rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
1766 {
1767   element *elem;
1768   int res;
1769 
1770   mysql_mutex_lock(&LOCK_binlog_state);
1771   if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
1772   {
1773     if (elem->seq_no_counter < seq_no)
1774       elem->seq_no_counter= seq_no;
1775     res= 0;
1776     goto end;
1777   }
1778 
1779   /* We need to allocate a new, empty element to remember the next seq_no. */
1780   if (!(elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem),
1781                                    MYF(MY_WME))))
1782   {
1783     res= 1;
1784     goto end;
1785   }
1786 
1787   elem->domain_id= domain_id;
1788   my_hash_init(PSI_INSTRUMENT_ME, &elem->hash, &my_charset_bin, 32,
1789                offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
1790                HASH_UNIQUE);
1791   elem->last_gtid= NULL;
1792   elem->seq_no_counter= seq_no;
1793   if (0 == my_hash_insert(&hash, (const uchar *)elem))
1794   {
1795     res= 0;
1796     goto end;
1797   }
1798 
1799   my_hash_free(&elem->hash);
1800   my_free(elem);
1801   res= 1;
1802 
1803 end:
1804   mysql_mutex_unlock(&LOCK_binlog_state);
1805   return res;
1806 }
1807 
1808 
1809 /*
1810   Write binlog state to text file, so we can read it in again without having
1811   to scan last binlog file (normal shutdown/startup, not crash recovery).
1812 
1813   The most recent GTID within each domain_id is written after any other GTID
1814   within this domain.
1815 */
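/*
  The resulting file is a plain list of one GTID per line, for example
  (illustrative content):

      1-2-1024
      1-3-1030
      0-1-100

  where 1-3-1030, being the most recent GTID of domain 1, is written after the
  other GTID of that domain, as described above.
*/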
1816 int
1817 rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
1818 {
1819   ulong i, j;
1820   char buf[21];
1821   int res= 0;
1822 
1823   mysql_mutex_lock(&LOCK_binlog_state);
1824   for (i= 0; i < hash.records; ++i)
1825   {
1826     element *e= (element *)my_hash_element(&hash, i);
1827     if (!e->last_gtid)
1828     {
1829       DBUG_ASSERT(e->hash.records == 0);
1830       continue;
1831     }
1832     for (j= 0; j <= e->hash.records; ++j)
1833     {
1834       const rpl_gtid *gtid;
1835       if (j < e->hash.records)
1836       {
1837         gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
1838         if (gtid == e->last_gtid)
1839           continue;
1840       }
1841       else
1842         gtid= e->last_gtid;
1843 
1844       longlong10_to_str(gtid->seq_no, buf, 10);
1845       if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id,
1846                       buf))
1847       {
1848         res= 1;
1849         goto end;
1850       }
1851     }
1852   }
1853 
1854 end:
1855   mysql_mutex_unlock(&LOCK_binlog_state);
1856   return res;
1857 }
1858 
1859 
1860 int
1861 rpl_binlog_state::read_from_iocache(IO_CACHE *src)
1862 {
1863   /* 10-digit - 10-digit - 20-digit \n \0 */
1864   char buf[10+1+10+1+20+1+1];
1865   const char *p, *end;
1866   rpl_gtid gtid;
1867   int res= 0;
1868 
1869   mysql_mutex_lock(&LOCK_binlog_state);
1870   reset_nolock();
1871   for (;;)
1872   {
1873     size_t len= my_b_gets(src, buf, sizeof(buf));
1874     if (!len)
1875       break;
1876     p= buf;
1877     end= buf + len;
1878     if (gtid_parser_helper(&p, end, &gtid) ||
1879         update_nolock(&gtid, false))
1880     {
1881       res= 1;
1882       break;
1883     }
1884   }
1885   mysql_mutex_unlock(&LOCK_binlog_state);
1886   return res;
1887 }
1888 
1889 
1890 rpl_gtid *
1891 rpl_binlog_state::find_nolock(uint32 domain_id, uint32 server_id)
1892 {
1893   element *elem;
1894   if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
1895     return NULL;
1896   return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
1897 }
1898 
1899 rpl_gtid *
1900 rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
1901 {
1902   rpl_gtid *p;
1903   mysql_mutex_lock(&LOCK_binlog_state);
1904   p= find_nolock(domain_id, server_id);
1905   mysql_mutex_unlock(&LOCK_binlog_state);
1906   return p;
1907 }
1908 
1909 rpl_gtid *
1910 rpl_binlog_state::find_most_recent(uint32 domain_id)
1911 {
1912   element *elem;
1913   rpl_gtid *gtid= NULL;
1914 
1915   mysql_mutex_lock(&LOCK_binlog_state);
1916   elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
1917   if (elem && elem->last_gtid)
1918     gtid= elem->last_gtid;
1919   mysql_mutex_unlock(&LOCK_binlog_state);
1920 
1921   return gtid;
1922 }
1923 
1924 
1925 uint32
1926 rpl_binlog_state::count()
1927 {
1928   uint32 c= 0;
1929   uint32 i;
1930 
1931   mysql_mutex_lock(&LOCK_binlog_state);
1932   for (i= 0; i < hash.records; ++i)
1933     c+= ((element *)my_hash_element(&hash, i))->hash.records;
1934   mysql_mutex_unlock(&LOCK_binlog_state);
1935 
1936   return c;
1937 }
1938 
1939 
1940 int
1941 rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
1942 {
1943   uint32 i, j, pos;
1944   int res= 0;
1945 
1946   mysql_mutex_lock(&LOCK_binlog_state);
1947   pos= 0;
1948   for (i= 0; i < hash.records; ++i)
1949   {
1950     element *e= (element *)my_hash_element(&hash, i);
1951     if (!e->last_gtid)
1952     {
1953       DBUG_ASSERT(e->hash.records==0);
1954       continue;
1955     }
1956     for (j= 0; j <= e->hash.records; ++j)
1957     {
1958       const rpl_gtid *gtid;
1959       if (j < e->hash.records)
1960       {
1961         gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
1962         if (gtid == e->last_gtid)
1963           continue;
1964       }
1965       else
1966         gtid= e->last_gtid;
1967 
1968       if (pos >= list_size)
1969       {
1970         res= 1;
1971         goto end;
1972       }
1973       memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
1974     }
1975   }
1976 
1977 end:
1978   mysql_mutex_unlock(&LOCK_binlog_state);
1979   return res;
1980 }
1981 
1982 
1983 /*
1984   Get a list of the most recently binlogged GTID, for each domain_id.
1985 
1986   This can be used when switching from being a master to being a slave,
1987   to know where to start replicating from the new master.
1988 
1989   The returned list must be de-allocated with my_free().
1990 
1991   Returns 0 for ok, non-zero for out-of-memory.
1992 */
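/*
  A minimal caller-side sketch (an rpl_binlog_state instance named
  binlog_state and the helper handle_gtid() are purely illustrative):

    rpl_gtid *list;
    uint32 size;
    if (binlog_state.get_most_recent_gtid_list(&list, &size))
      return 1;                       // out of memory
    for (uint32 k= 0; k < size; ++k)
      handle_gtid(&list[k]);          // one most-recent GTID per domain
    my_free(list);                    // the caller owns and frees the list
*/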
1993 int
1994 rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
1995 {
1996   uint32 i;
1997   uint32 alloc_size, out_size;
1998   int res= 0;
1999 
2000   out_size= 0;
2001   mysql_mutex_lock(&LOCK_binlog_state);
2002   alloc_size= hash.records;
2003   if (!(*list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME,
2004                                      alloc_size * sizeof(rpl_gtid), MYF(MY_WME))))
2005   {
2006     res= 1;
2007     goto end;
2008   }
2009   for (i= 0; i < alloc_size; ++i)
2010   {
2011     element *e= (element *)my_hash_element(&hash, i);
2012     if (!e->last_gtid)
2013       continue;
2014     memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
2015   }
2016 
2017 end:
2018   mysql_mutex_unlock(&LOCK_binlog_state);
2019   *size= out_size;
2020   return res;
2021 }
2022 
2023 bool
2024 rpl_binlog_state::append_pos(String *str)
2025 {
2026   uint32 i;
2027 
2028   mysql_mutex_lock(&LOCK_binlog_state);
2029   reset_dynamic(&gtid_sort_array);
2030 
2031   for (i= 0; i < hash.records; ++i)
2032   {
2033     element *e= (element *)my_hash_element(&hash, i);
2034     if (e->last_gtid &&
2035         insert_dynamic(&gtid_sort_array, (const void *) e->last_gtid))
2036     {
2037       mysql_mutex_unlock(&LOCK_binlog_state);
2038       return true;
2039     }
2040   }
2041   rpl_slave_state_tostring_helper(&gtid_sort_array, str);
2042   mysql_mutex_unlock(&LOCK_binlog_state);
2043 
2044   return false;
2045 }
2046 
2047 
2048 bool
2049 rpl_binlog_state::append_state(String *str)
2050 {
2051   uint32 i, j;
2052   bool res= false;
2053 
2054   mysql_mutex_lock(&LOCK_binlog_state);
2055   reset_dynamic(&gtid_sort_array);
2056 
2057   for (i= 0; i < hash.records; ++i)
2058   {
2059     element *e= (element *)my_hash_element(&hash, i);
2060     if (!e->last_gtid)
2061     {
2062       DBUG_ASSERT(e->hash.records==0);
2063       continue;
2064     }
2065     for (j= 0; j <= e->hash.records; ++j)
2066     {
2067       const rpl_gtid *gtid;
2068       if (j < e->hash.records)
2069       {
2070         gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
2071         if (gtid == e->last_gtid)
2072           continue;
2073       }
2074       else
2075         gtid= e->last_gtid;
2076 
2077       if (insert_dynamic(&gtid_sort_array, (const void *) gtid))
2078       {
2079         res= true;
2080         goto end;
2081       }
2082     }
2083   }
2084 
2085   rpl_slave_state_tostring_helper(&gtid_sort_array, str);
2086 
2087 end:
2088   mysql_mutex_unlock(&LOCK_binlog_state);
2089   return res;
2090 }
2091 
2092 /**
2093   Remove domains supplied by the first argument from binlog state.
2094   Removal is done for any domain whose last gtids (from all its servers) match
2095   the ones in the Gtid list event of the 2nd argument.
2096 
2097   @param  ids               gtid domain id sequence, may contain dups
2098   @param  glev              pointer to Gtid list event describing
2099                             the match condition
2100   @param  errbuf [out]      pointer to possible error message array
2101 
2102   @retval NULL              as success when at least one domain is removed
2103   @retval ""                empty string to indicate ineffective call
2104                             when no domains removed
2105   @retval NOT EMPTY string  otherwise an error message
2106 */
2107 const char*
2108 rpl_binlog_state::drop_domain(DYNAMIC_ARRAY *ids,
2109                               Gtid_list_log_event *glev,
2110                               char* errbuf)
2111 {
2112   DYNAMIC_ARRAY domain_unique; // sequence (unsorted) of unique element* pointers
2113   rpl_binlog_state::element* domain_unique_buffer[16];
2114   ulong k, l;
2115   const char* errmsg= NULL;
2116 
2117   DBUG_ENTER("rpl_binlog_state::drop_domain");
2118 
2119   my_init_dynamic_array2(PSI_INSTRUMENT_ME, &domain_unique,
2120                          sizeof(element*), domain_unique_buffer,
2121                          sizeof(domain_unique_buffer) / sizeof(element*), 4, 0);
2122 
2123   mysql_mutex_lock(&LOCK_binlog_state);
2124 
2125   /*
2126     Gtid list is supposed to come from a binlog's Gtid_list event and
2127     therefore should be a subset of the current binlog state. That is,
2128     for every domain in the list the binlog state contains a gtid with
2129     a sequence number not less than that of the list.
2130     Exceptions to this inclusion rule are:
2131       A. the list may still refer to gtids from already deleted domains.
2132          Files containing them must have been purged whereas the file
2133          with the list has not been purged yet.
2134       B. out-of-order groups were injected
2135       C. a manually built list of binlog files that violates the inclusion
2136          constraint.
2137     While A is a normal case (though not necessarily distinguishable from C),
2138     B and C may require the user's attention, so any inconsistency (including
2139     one suspected to be case A) is diagnosed and *warned* about.
2140   */
2141   for (l= 0, errbuf[0]= 0; l < glev->count; l++, errbuf[0]= 0)
2142   {
2143     rpl_gtid* rb_state_gtid= find_nolock(glev->list[l].domain_id,
2144                                          glev->list[l].server_id);
2145     if (!rb_state_gtid)
2146       sprintf(errbuf,
2147               "missing gtids from the '%u-%u' domain-server pair which is "
2148               "referred to in the gtid list describing an earlier state. Ignore "
2149               "if the domain ('%u') was already explicitly deleted",
2150               glev->list[l].domain_id, glev->list[l].server_id,
2151               glev->list[l].domain_id);
2152     else if (rb_state_gtid->seq_no < glev->list[l].seq_no)
2153       sprintf(errbuf,
2154               "having a gtid '%u-%u-%llu' which is less than "
2155               "the '%u-%u-%llu' of the gtid list describing an earlier state. "
2156               "The state may have been affected by manually injecting "
2157               "a lower sequence number gtid or via replication",
2158               rb_state_gtid->domain_id, rb_state_gtid->server_id,
2159               rb_state_gtid->seq_no, glev->list[l].domain_id,
2160               glev->list[l].server_id, glev->list[l].seq_no);
2161     if (strlen(errbuf)) // use strlen() as cheap flag
2162       push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
2163                           ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
2164                           "The current gtid binlog state is incompatible with "
2165                           "a former one %s.", errbuf);
2166   }
2167 
2168   /*
2169     For each domain_id from ids
2170       when there is no such domain in the binlog state
2171         warn && continue
2172       For each of the domain's per-server last gtids
2173         when the last gtid cannot be located in glev.list
2174           error out: the binlog state can't change
2175         otherwise continue
2176   */
2177   for (ulong i= 0; i < ids->elements; i++)
2178   {
2179     rpl_binlog_state::element *elem= NULL;
2180     uint32 *ptr_domain_id;
2181     bool not_match;
2182 
2183     ptr_domain_id= (uint32*) dynamic_array_ptr(ids, i);
2184     elem= (rpl_binlog_state::element *)
2185       my_hash_search(&hash, (const uchar *) ptr_domain_id, 0);
2186     if (!elem)
2187     {
2188       push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
2189                           ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
2190                           "The gtid domain being deleted ('%lu') is not in "
2191                           "the current binlog state", (unsigned long) *ptr_domain_id);
2192       continue;
2193     }
2194 
2195     for (not_match= true, k= 0; k < elem->hash.records; k++)
2196     {
2197       rpl_gtid *d_gtid= (rpl_gtid *)my_hash_element(&elem->hash, k);
2198       for (ulong l= 0; l < glev->count && not_match; l++)
2199         not_match= !(*d_gtid == glev->list[l]);
2200     }
2201 
2202     if (not_match)
2203     {
2204       sprintf(errbuf, "binlog files may contain gtids from the domain ('%u') "
2205               "being deleted. Make sure to first purge those files",
2206               *ptr_domain_id);
2207       errmsg= errbuf;
2208       goto end;
2209     }
2210     // compose a sequence of unique pointers to domain objects
2211     for (k= 0; k < domain_unique.elements; k++)
2212     {
2213       if ((rpl_binlog_state::element*) dynamic_array_ptr(&domain_unique, k)
2214           == elem)
2215         break; // domain_id's elem is already in the sequence
2216     }
2217     if (k == domain_unique.elements) // proven not to have duplicates
2218       insert_dynamic(&domain_unique, (uchar*) &elem);
2219   }
2220 
2221   // Domain removal from binlog state
2222   for (k= 0; k < domain_unique.elements; k++)
2223   {
2224     rpl_binlog_state::element *elem= *(rpl_binlog_state::element**)
2225       dynamic_array_ptr(&domain_unique, k);
2226     my_hash_free(&elem->hash);
2227     my_hash_delete(&hash, (uchar*) elem);
2228   }
2229 
2230   DBUG_ASSERT(strlen(errbuf) == 0);
2231 
2232   if (domain_unique.elements == 0)
2233     errmsg= "";
2234 
2235 end:
2236   mysql_mutex_unlock(&LOCK_binlog_state);
2237   delete_dynamic(&domain_unique);
2238 
2239   DBUG_RETURN(errmsg);
2240 }
2241 
2242 slave_connection_state::slave_connection_state()
2243 {
2244   my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
2245                offsetof(entry, gtid) + offsetof(rpl_gtid, domain_id),
2246                sizeof(uint32), NULL, my_free, HASH_UNIQUE);
2247   my_init_dynamic_array(PSI_INSTRUMENT_ME, &gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
2248 }
2249 
2250 
2251 slave_connection_state::~slave_connection_state()
2252 {
2253   my_hash_free(&hash);
2254   delete_dynamic(&gtid_sort_array);
2255 }
2256 
2257 
2258 /*
2259   Create a hash from the slave GTID state that is sent to the master when the
2260   slave connects to start replication.
2261 
2262   The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
2263 
2264      0-2-112,1-4-1022
2265 
2266   The state gives for each domain_id the GTID to start replication from for
2267   the corresponding replication stream. So domain_id must be unique.
2268 
2269   Returns 0 if ok, non-zero if error due to malformed input.
2270 
2271   Note that the input string is built by the slave server, so it will not be
2272   incorrect unless there is a bug, corruption, or a malicious server. So we just
2273   need a basic sanity check, not a fancy user-friendly error message.
2274 */
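/*
  For example, the input "0-2-112,1-4-1022" produces two hash entries keyed on
  domain_id: {0 -> 0-2-112, 1 -> 1-4-1022}. A repeated domain such as
  "0-2-112,0-3-113" is rejected with ER_DUPLICATE_GTID_DOMAIN, and otherwise
  malformed input fails with ER_INCORRECT_GTID_STATE.
*/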
2275 
2276 int
2277 slave_connection_state::load(const char *slave_request, size_t len)
2278 {
2279   const char *p, *end;
2280   uchar *rec;
2281   rpl_gtid *gtid;
2282   const entry *e;
2283 
2284   reset();
2285   p= slave_request;
2286   end= slave_request + len;
2287   if (p == end)
2288     return 0;
2289   for (;;)
2290   {
2291     if (!(rec= (uchar *)my_malloc(PSI_INSTRUMENT_ME, sizeof(entry), MYF(MY_WME))))
2292       return 1;
2293     gtid= &((entry *)rec)->gtid;
2294     if (gtid_parser_helper(&p, end, gtid))
2295     {
2296       my_free(rec);
2297       my_error(ER_INCORRECT_GTID_STATE, MYF(0));
2298       return 1;
2299     }
2300     if ((e= (const entry *)
2301          my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0)))
2302     {
2303       my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
2304                gtid->server_id, (ulonglong)gtid->seq_no, e->gtid.domain_id,
2305                e->gtid.server_id, (ulonglong)e->gtid.seq_no, gtid->domain_id);
2306       my_free(rec);
2307       return 1;
2308     }
2309     ((entry *)rec)->flags= 0;
2310     if (my_hash_insert(&hash, rec))
2311     {
2312       my_free(rec);
2313       my_error(ER_OUT_OF_RESOURCES, MYF(0));
2314       return 1;
2315     }
2316     if (p == end)
2317       break;                                         /* Finished. */
2318     if (*p != ',')
2319     {
2320       my_error(ER_INCORRECT_GTID_STATE, MYF(0));
2321       return 1;
2322     }
2323     ++p;
2324   }
2325 
2326   return 0;
2327 }
2328 
2329 
2330 int
2331 slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
2332 {
2333   uint32 i;
2334 
2335   reset();
2336   for (i= 0; i < count; ++i)
2337     if (update(&gtid_list[i]))
2338       return 1;
2339   return 0;
2340 }
2341 
2342 
2343 static int
2344 slave_connection_state_load_cb(rpl_gtid *gtid, void *data)
2345 {
2346   slave_connection_state *state= (slave_connection_state *)data;
2347   return state->update(gtid);
2348 }
2349 
2350 
2351 /*
2352   Same as rpl_slave_state::tostring(), but populates a slave_connection_state
2353   instead.
2354 */
2355 int
2356 slave_connection_state::load(rpl_slave_state *state,
2357                              rpl_gtid *extra_gtids, uint32 num_extra)
2358 {
2359   reset();
2360   return state->iterate(slave_connection_state_load_cb, this,
2361                         extra_gtids, num_extra, false);
2362 }
2363 
2364 
2365 slave_connection_state::entry *
2366 slave_connection_state::find_entry(uint32 domain_id)
2367 {
2368   return (entry *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
2369 }
2370 
2371 
2372 rpl_gtid *
2373 slave_connection_state::find(uint32 domain_id)
2374 {
2375   entry *e= find_entry(domain_id);
2376   if (!e)
2377     return NULL;
2378   return &e->gtid;
2379 }
2380 
2381 
2382 int
2383 slave_connection_state::update(const rpl_gtid *in_gtid)
2384 {
2385   entry *e;
2386   uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
2387   if (rec)
2388   {
2389     e= (entry *)rec;
2390     e->gtid= *in_gtid;
2391     return 0;
2392   }
2393 
2394   if (!(e= (entry *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*e), MYF(MY_WME))))
2395     return 1;
2396   e->gtid= *in_gtid;
2397   e->flags= 0;
2398   if (my_hash_insert(&hash, (uchar *)e))
2399   {
2400     my_free(e);
2401     return 1;
2402   }
2403 
2404   return 0;
2405 }
2406 
2407 
2408 void
2409 slave_connection_state::remove(const rpl_gtid *in_gtid)
2410 {
2411   uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
2412 #ifdef DBUG_ASSERT_EXISTS
2413   bool err;
2414   rpl_gtid *slave_gtid= &((entry *)rec)->gtid;
2415   DBUG_ASSERT(rec /* We should never try to remove a domain_id that is not present. */);
2416   DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
2417   DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
2418   err=
2419 #endif
2420     my_hash_delete(&hash, rec);
2421   DBUG_ASSERT(!err);
2422 }
2423 
2424 
2425 void
2426 slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
2427 {
2428   uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
2429   if (rec)
2430     my_hash_delete(&hash, rec);
2431 }
2432 
2433 
2434 int
2435 slave_connection_state::to_string(String *out_str)
2436 {
2437   out_str->length(0);
2438   return append_to_string(out_str);
2439 }
2440 
2441 
2442 int
2443 slave_connection_state::append_to_string(String *out_str)
2444 {
2445   uint32 i;
2446   bool first;
2447 
2448   first= true;
2449   for (i= 0; i < hash.records; ++i)
2450   {
2451     const entry *e= (const entry *)my_hash_element(&hash, i);
2452     if (rpl_slave_state_tostring_helper(out_str, &e->gtid, &first))
2453       return 1;
2454   }
2455   return 0;
2456 }
2457 
2458 
2459 int
2460 slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
2461 {
2462   uint32 i, pos;
2463 
2464   pos= 0;
2465   for (i= 0; i < hash.records; ++i)
2466   {
2467     entry *e;
2468     if (pos >= list_size)
2469       return 1;
2470     e= (entry *)my_hash_element(&hash, i);
2471     memcpy(&gtid_list[pos++], &e->gtid, sizeof(e->gtid));
2472   }
2473 
2474   return 0;
2475 }
2476 
2477 
2478 /*
2479   Check if the GTID position has been reached, for mysql_binlog_send().
2480 
2481   The position has not been reached if we have anything in the state, unless
2482   it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not
2483   belong to this master at all), or the START_OWN_SLAVE_POS (which means that
2484   we start on an old position from when the server was a slave with
2485   --log-slave-updates=0).
2486 */
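/*
  For example (illustrative state): with entries {0-1-100 (no flags),
  1-2-50 (START_ON_EMPTY_DOMAIN)}, the position is not yet reached because the
  domain 0 entry has neither flag set; once that entry is removed (or flagged),
  is_pos_reached() returns true.
*/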
2487 bool
2488 slave_connection_state::is_pos_reached()
2489 {
2490   uint32 i;
2491 
2492   for (i= 0; i < hash.records; ++i)
2493   {
2494     entry *e= (entry *)my_hash_element(&hash, i);
2495     if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN)))
2496       return false;
2497   }
2498 
2499   return true;
2500 }
2501 
2502 
2503 /*
2504   Execute a MASTER_GTID_WAIT().
2505   The position to wait for is in gtid_str in string form.
2506   The timeout in microseconds is in timeout_us; a negative value means no timeout.
2507 
2508   Returns:
2509    1 for error.
2510    0 for wait completed.
2511   -1 for wait timed out.
2512 */
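/*
  This is the backend of the SQL-level MASTER_GTID_WAIT() function. As a rough
  sketch of the mapping (the example values are illustrative, and the exact
  conversion is done by the caller), a query such as

      SELECT MASTER_GTID_WAIT('0-1-100,1-2-200', 1.5);

  arrives here with gtid_str "0-1-100,1-2-200" and, assuming the caller
  converts seconds to microseconds, timeout_us around 1500000; each listed
  domain is then waited for in turn via wait_for_gtid() below.
*/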
2513 int
2514 gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
2515 {
2516   int err;
2517   rpl_gtid *wait_pos;
2518   uint32 count, i;
2519   struct timespec wait_until, *wait_until_ptr;
2520   ulonglong before;
2521 
2522   /* A wait for the empty position returns immediately. */
2523   if (gtid_str->length() == 0)
2524   {
2525     status_var_increment(thd->status_var.master_gtid_wait_count);
2526     return 0;
2527   }
2528 
2529   if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
2530                                             &count)))
2531   {
2532     my_error(ER_INCORRECT_GTID_STATE, MYF(0));
2533     return 1;
2534   }
2535   status_var_increment(thd->status_var.master_gtid_wait_count);
2536   before= microsecond_interval_timer();
2537 
2538   if (timeout_us >= 0)
2539   {
2540     set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
2541     wait_until_ptr= &wait_until;
2542   }
2543   else
2544     wait_until_ptr= NULL;
2545   err= 0;
2546   for (i= 0; i < count; ++i)
2547   {
2548     if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
2549       break;
2550   }
2551   switch (err)
2552   {
2553     case -1:
2554       status_var_increment(thd->status_var.master_gtid_wait_timeouts);
2555       /* fall through */
2556     case 0:
2557       status_var_add(thd->status_var.master_gtid_wait_time,
2558                      static_cast<ulong>
2559                      (microsecond_interval_timer() - before));
2560   }
2561   my_free(wait_pos);
2562   return err;
2563 }
2564 
2565 
2566 void
2567 gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
2568 {
2569   queue_element *qe;
2570 
2571   mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2572   if (queue_empty(&he->queue))
2573     return;
2574   qe= (queue_element *)queue_top(&he->queue);
2575   qe->do_small_wait= true;
2576   mysql_cond_signal(&qe->thd->COND_wakeup_ready);
2577 }
2578 
2579 void
2580 gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
2581                                 gtid_waiting::hash_element *he)
2582 {
2583   mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2584 
2585   for (;;)
2586   {
2587     queue_element *qe;
2588 
2589     if (queue_empty(&he->queue))
2590       break;
2591     qe= (queue_element *)queue_top(&he->queue);
2592     if (qe->wait_seq_no > wakeup_seq_no)
2593       break;
2594     DBUG_ASSERT(!qe->done);
2595     queue_remove_top(&he->queue);
2596     qe->done= true;
2597     mysql_cond_signal(&qe->thd->COND_wakeup_ready);
2598   }
2599 }
2600 
2601 
2602 /*
2603   Execute a MASTER_GTID_WAIT() for one specific domain.
2604 
2605   The implementation is optimised primarily for (1) minimal performance impact
2606   on the slave replication threads, and secondarily for (2) quick performance
2607   of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
2608   reads for clients in an async replication read-scaleout scenario.
2609 
2610   To achieve (1), we have a "small" wait and a "large" wait. The small wait
2611   contends with the replication threads on the lock on the gtid_slave_pos, so
2612   only minimal processing is done under that lock, and only a single waiter at
2613   a time does the small wait.
2614 
2615   If there is already a small waiter, a new thread will either replace the
2616   small waiter (if it needs to wait for an earlier sequence number), or
2617   instead do a "large" wait.
2618 
2619   Once awoken on the small wait, the waiting thread releases the lock shared
2620   with the SQL threads quickly, and then processes all waiters currently doing
2621   the large wait using a different lock that does not impact replication.
2622 
2623   This way, the SQL threads only need to do a single check + possibly a
2624   pthread_cond_signal() when updating the gtid_slave_state, and the time that
2625   non-SQL threads contend for the lock on gtid_slave_state is minimized.
2626 
2627   There is always at least one thread that has the responsibility to ensure
2628   that there is a small waiter; this thread has queue_element::do_small_wait
2629   set to true. This thread will do the small wait until it is done, at which
2630   point it will make sure to pass on the responsibility to another thread.
2631   Normally only one thread has do_small_wait==true, but it can occasionally
2632   happen that there is more than one, when threads race one another for the
2633   lock on the small wait (this results in slightly increased activity on the
2634   small lock but is otherwise harmless).
2635 
2636   Returns:
2637      0  Wait completed normally
2638     -1  Wait completed due to timeout
2639      1  An error (my_error() will have been called to set the error in the da)
2640 */
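/*
  An illustrative sequence of events (threads and numbers are hypothetical):
  client A calls MASTER_GTID_WAIT('0-1-100') and, finding no small waiter,
  takes the small wait on LOCK_slave_state. Client B then waits for 0-1-200;
  seeing a small waiter with a lower seq_no, B queues itself for the large
  wait on LOCK_gtid_waiting. When the SQL thread records seq_no >= 100, it
  signals A, which quickly drops LOCK_slave_state, wakes any large waiters
  whose seq_no has been reached, and passes the small-wait responsibility on
  (here to B, which still waits for 200).
*/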
2641 int
2642 gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
2643                             struct timespec *wait_until)
2644 {
2645   bool timed_out= false;
2646 #ifdef HAVE_REPLICATION
2647   queue_element elem;
2648   uint32 domain_id= wait_gtid->domain_id;
2649   uint64 seq_no= wait_gtid->seq_no;
2650   hash_element *he;
2651   rpl_slave_state::element *slave_state_elem= NULL;
2652   PSI_stage_info old_stage;
2653   bool did_enter_cond= false;
2654 
2655   elem.wait_seq_no= seq_no;
2656   elem.thd= thd;
2657   elem.done= false;
2658 
2659   mysql_mutex_lock(&LOCK_gtid_waiting);
2660   if (!(he= get_entry(wait_gtid->domain_id)))
2661   {
2662     mysql_mutex_unlock(&LOCK_gtid_waiting);
2663     return 1;
2664   }
2665   /*
2666     If there is already another waiter with seq_no no larger than our own,
2667     we are sure that there is already a small waiter that will wake us up
2668     (or later pass the small wait responsibility to us). So in this case, we
2669     do not need to touch the small wait lock at all.
2670   */
2671   elem.do_small_wait=
2672     (queue_empty(&he->queue) ||
2673      ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);
2674 
2675   if (register_in_wait_queue(thd, wait_gtid, he, &elem))
2676   {
2677     mysql_mutex_unlock(&LOCK_gtid_waiting);
2678     return 1;
2679   }
2680   /*
2681     Loop, doing either the small or large wait as appropriate, until either
2682     the position waited for is reached, or we get a kill or timeout.
2683   */
2684   for (;;)
2685   {
2686     mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2687 
2688     if (elem.do_small_wait)
2689     {
2690       uint64 wakeup_seq_no;
2691       queue_element *cur_waiter;
2692 
2693       mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2694       /*
2695         The elements in the gtid_slave_state_hash are never re-allocated once
2696         they enter the hash, so we do not need to re-do the lookup after releasing
2697         and re-acquiring the lock.
2698       */
2699       if (!slave_state_elem &&
2700           !(slave_state_elem= rpl_global_gtid_slave_state->get_element(domain_id)))
2701       {
2702         mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2703         remove_from_wait_queue(he, &elem);
2704         promote_new_waiter(he);
2705         if (did_enter_cond)
2706           thd->EXIT_COND(&old_stage);
2707         else
2708           mysql_mutex_unlock(&LOCK_gtid_waiting);
2709         my_error(ER_OUT_OF_RESOURCES, MYF(0));
2710         return 1;
2711       }
2712 
2713       if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
2714       {
2715         /*
2716           We do not have to wait. (We will be removed from the wait queue when
2717           we call process_wait_hash() below.
2718         */
2719         mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2720       }
2721       else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
2722                slave_state_elem->min_wait_seq_no <= seq_no)
2723       {
2724         /*
2725           There is already a suitable small waiter, go do the large wait.
2726           (Normally we would not have needed to check the small wait in this
2727           case, but it can happen if we race with another thread for the small
2728           lock).
2729         */
2730         elem.do_small_wait= false;
2731         mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2732       }
2733       else
2734       {
2735         /*
2736           We have to do the small wait ourselves (stealing it from any thread
2737           that might already be waiting for a later seq_no).
2738         */
2739         slave_state_elem->gtid_waiter= &elem;
2740         slave_state_elem->min_wait_seq_no= seq_no;
2741         if (cur_waiter)
2742         {
2743           /* We stole the wait, so wake up the old waiting thread. */
2744           mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
2745         }
2746 
2747         /* Release the large lock, and do the small wait. */
2748         if (did_enter_cond)
2749         {
2750           thd->EXIT_COND(&old_stage);
2751           did_enter_cond= false;
2752         }
2753         else
2754           mysql_mutex_unlock(&LOCK_gtid_waiting);
2755         thd->ENTER_COND(&slave_state_elem->COND_wait_gtid,
2756                         &rpl_global_gtid_slave_state->LOCK_slave_state,
2757                         &stage_master_gtid_wait_primary, &old_stage);
2758         do
2759         {
2760           if (unlikely(thd->check_killed(1)))
2761             break;
2762           else if (wait_until)
2763           {
2764             int err=
2765               mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
2766                                    &rpl_global_gtid_slave_state->LOCK_slave_state,
2767                                    wait_until);
2768             if (err == ETIMEDOUT || err == ETIME)
2769             {
2770               timed_out= true;
2771               break;
2772             }
2773           }
2774           else
2775             mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
2776                             &rpl_global_gtid_slave_state->LOCK_slave_state);
2777         } while (slave_state_elem->gtid_waiter == &elem);
2778         wakeup_seq_no= slave_state_elem->highest_seq_no;
2779         /*
2780           If we aborted due to timeout or kill, remove us as waiter.
2781 
2782           If we were replaced by another waiter with a smaller seq_no, then we
2783           no longer have responsibility for the small wait.
2784         */
2785         if ((cur_waiter= slave_state_elem->gtid_waiter))
2786         {
2787           if (cur_waiter == &elem)
2788             slave_state_elem->gtid_waiter= NULL;
2789           else if (slave_state_elem->min_wait_seq_no <= seq_no)
2790             elem.do_small_wait= false;
2791         }
2792         thd->EXIT_COND(&old_stage);
2793 
2794         mysql_mutex_lock(&LOCK_gtid_waiting);
2795       }
2796 
2797       /*
2798         Note that hash_entry pointers do not change once allocated, so we do
2799         not need to look up `he' again after re-acquiring LOCK_gtid_waiting.
2800       */
2801       process_wait_hash(wakeup_seq_no, he);
2802     }
2803     else
2804     {
2805       /* Do the large wait. */
2806       if (!did_enter_cond)
2807       {
2808         thd->ENTER_COND(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
2809                         &stage_master_gtid_wait, &old_stage);
2810         did_enter_cond= true;
2811       }
2812       while (!elem.done && likely(!thd->check_killed(1)))
2813       {
2814         thd_wait_begin(thd, THD_WAIT_BINLOG);
2815         if (wait_until)
2816         {
2817           int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
2818                                         &LOCK_gtid_waiting, wait_until);
2819           if (err == ETIMEDOUT || err == ETIME)
2820             timed_out= true;
2821         }
2822         else
2823           mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
2824         thd_wait_end(thd);
2825         if (elem.do_small_wait || timed_out)
2826           break;
2827       }
2828     }
2829 
2830     if ((thd->killed || timed_out) && !elem.done)
2831     {
2832       /* Aborted, so remove ourselves from the hash. */
2833       remove_from_wait_queue(he, &elem);
2834       elem.done= true;
2835     }
2836     if (elem.done)
2837     {
2838       /*
2839         If our wait is done, but we have (or were passed) responsibility for
2840         the small wait, then we need to pass on that task to someone else.
2841       */
2842       if (elem.do_small_wait)
2843         promote_new_waiter(he);
2844       break;
2845     }
2846   }
2847 
2848   if (did_enter_cond)
2849     thd->EXIT_COND(&old_stage);
2850   else
2851     mysql_mutex_unlock(&LOCK_gtid_waiting);
2852   if (thd->killed)
2853     thd->send_kill_message();
2854 #endif  /* HAVE_REPLICATION */
2855   return timed_out ? -1 : 0;
2856 }
2857 
2858 
2859 static void
2860 free_hash_element(void *p)
2861 {
2862   gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
2863   delete_queue(&e->queue);
2864   my_free(e);
2865 }
2866 
2867 
2868 void
2869 gtid_waiting::init()
2870 {
2871   my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
2872                offsetof(hash_element, domain_id), sizeof(uint32), NULL,
2873                free_hash_element, HASH_UNIQUE);
2874   mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
2875 }
2876 
2877 
2878 void
2879 gtid_waiting::destroy()
2880 {
2881   mysql_mutex_destroy(&LOCK_gtid_waiting);
2882   my_hash_free(&hash);
2883 }
2884 
2885 
2886 static int
2887 cmp_queue_elem(void *, uchar *a, uchar *b)
2888 {
2889   uint64 seq_no_a= *(uint64 *)a;
2890   uint64 seq_no_b= *(uint64 *)b;
2891   if (seq_no_a < seq_no_b)
2892     return -1;
2893   else if (seq_no_a == seq_no_b)
2894     return 0;
2895   else
2896     return 1;
2897 }
2898 
2899 
2900 gtid_waiting::hash_element *
2901 gtid_waiting::get_entry(uint32 domain_id)
2902 {
2903   hash_element *e;
2904 
2905   if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
2906     return e;
2907 
2908   if (!(e= (hash_element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*e), MYF(MY_WME))))
2909     return NULL;
2910 
2911   if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
2912                  cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
2913   {
2914     my_error(ER_OUT_OF_RESOURCES, MYF(0));
2915     my_free(e);
2916     return NULL;
2917   }
2918   e->domain_id= domain_id;
2919   if (my_hash_insert(&hash, (uchar *)e))
2920   {
2921     my_error(ER_OUT_OF_RESOURCES, MYF(0));
2922     delete_queue(&e->queue);
2923     my_free(e);
2924     return NULL;
2925   }
2926   return e;
2927 }
2928 
2929 
2930 int
2931 gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
2932                                      gtid_waiting::hash_element *he,
2933                                      gtid_waiting::queue_element *elem)
2934 {
2935   mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2936 
2937   if (queue_insert_safe(&he->queue, (uchar *)elem))
2938   {
2939     my_error(ER_OUT_OF_RESOURCES, MYF(0));
2940     return 1;
2941   }
2942 
2943   return 0;
2944 }
2945 
2946 
2947 void
2948 gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
2949                                      gtid_waiting::queue_element *elem)
2950 {
2951   mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2952 
2953   queue_remove(&he->queue, elem->queue_idx);
2954 }
2955