/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
   Copyright (c) 2020, MariaDB Corporation.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */


/* Definitions for MariaDB global transaction ID (GTID). */

#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
#include "sql_base.h"
#include "sql_parse.h"
#include "key.h"
#include "rpl_gtid.h"
#include "rpl_rli.h"
#include "slave.h"
#include "log_event.h"

const LEX_CSTRING rpl_gtid_slave_state_table_name=
  { STRING_WITH_LEN("gtid_slave_pos") };


void
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
                                   rpl_group_info *rgi)
{
  int err;
  /*
    Add the gtid to the HASH in the replication slave state.

    We must do this only _after_ commit, so that for parallel replication,
    there will not be an attempt to delete the corresponding table row before
    it is even committed.
  */
  mysql_mutex_lock(&LOCK_slave_state);
  err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
  mysql_mutex_unlock(&LOCK_slave_state);
  if (err)
  {
    sql_print_warning("Slave: Out of memory during slave state maintenance. "
                      "Some no longer necessary rows in table "
                      "mysql.%s may be left undeleted.",
                      rpl_gtid_slave_state_table_name.str);
    /*
      Such a failure is not fatal. We will fail to delete the row for this
      GTID, but that does no harm; the row will be removed automatically on
      the next server restart.
    */
  }
}


int
rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
  DBUG_ENTER("rpl_slave_state::record_and_update_gtid");

  /*
    Update the GTID position, if we have it and did not already update
    it in a GTID transaction.
  */
  if (rgi->gtid_pending)
  {
    uint64 sub_id= rgi->gtid_sub_id;
    void *hton= NULL;

    rgi->gtid_pending= false;
    if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
    {
      if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false, &hton))
        DBUG_RETURN(1);
      update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
    }
    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  }
  DBUG_RETURN(0);
}


/*
  Check GTID event execution when --gtid-ignore-duplicates.

  The idea with --gtid-ignore-duplicates is that we allow multiple master
  connections (in multi-source replication) to all receive the same GTIDs and
  event groups. Only one instance of each is applied; we use the sequence
  number in the GTID to decide whether a GTID has already been applied.

  So if the seq_no of a GTID (or a higher sequence number) has already been
  applied, then the event should be skipped. If not then the event should be
  applied.

  To avoid two master connections trying to apply the same event
  simultaneously, only one is allowed to work in any given domain at any point
  in time. The associated Relay_log_info object is called the owner of the
  domain (and there can be multiple parallel worker threads working in that
  domain for that Relay_log_info). Any other Relay_log_info/master connection
  must wait for the domain to become free, or for their GTID to have been
  applied, before being allowed to proceed.

  Returns:
    0  This GTID is already applied, it should be skipped.
    1  The GTID is not yet applied; this rli is now the owner, and must apply
       the event and release the domain afterwards.
   -1  Error (out of memory to allocate a new element for the domain).
*/
int
rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
{
  uint32 domain_id= gtid->domain_id;
  uint64 seq_no= gtid->seq_no;
  rpl_slave_state::element *elem;
  int res;
  bool did_enter_cond= false;
  PSI_stage_info old_stage;
  THD *UNINIT_VAR(thd);
  Relay_log_info *rli= rgi->rli;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(elem= get_element(domain_id)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    res= -1;
    goto err;
  }
  /*
    Note that the elem pointer does not change once inserted in the hash. So
    we can re-use the pointer without looking it up again in the hash after
    each lock release and re-take.
  */

  for (;;)
  {
    if (elem->highest_seq_no >= seq_no)
    {
      /* This sequence number is already applied, ignore it. */
      res= 0;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
      break;
    }
    if (!elem->owner_rli)
    {
      /* The domain became free, grab it and apply the event. */
      elem->owner_rli= rli;
      elem->owner_count= 1;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    if (elem->owner_rli == rli)
    {
      /* Already own this domain, increment reference count and apply event. */
      ++elem->owner_count;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    thd= rgi->thd;
    if (unlikely(thd->check_killed()))
    {
      res= -1;
      break;
    }
    /*
      Someone else is currently processing this GTID (or an earlier one).
      Wait for them to complete (or fail), and then check again.
    */
    if (!did_enter_cond)
    {
      thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
                      &stage_gtid_wait_other_connection, &old_stage);
      did_enter_cond= true;
    }
    mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
                    &LOCK_slave_state);
  }

err:
  if (did_enter_cond)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&LOCK_slave_state);
  return res;
}
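
/*
  Illustration (not part of the server logic): a minimal sketch of how a
  caller might drive the ownership protocol above, following the return-value
  contract documented before check_duplicate_gtid(). Here `state` stands for
  the rpl_slave_state instance and apply_event_group() is a hypothetical
  apply step. On successful commit, ownership is released inside
  rpl_slave_state::update(); on failure, the owner calls
  release_domain_owner() (defined below):

    int res= state->check_duplicate_gtid(&gtid, rgi);
    if (res < 0)
      return 1;                      // Error, e.g. out of memory.
    if (res == 0)
      return 0;                      // Already applied elsewhere; skip it.
    // res == 1: we own the domain and must apply the event group.
    if (apply_event_group(rgi))      // Hypothetical apply step.
    {
      state->release_domain_owner(rgi);
      return 1;
    }
    return 0;
*/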


void
rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
{
  element *elem= NULL;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(elem= get_element(rgi->current_gtid.domain_id)))
  {
    /*
      We cannot really deal with error here, as we are already called in an
      error handling case (transaction failure and rollback).

      However, get_element() only fails if the element did not exist already
      and could not be allocated due to out-of-memory - and if it did not
      exist, then we would not get here in the first place.
    */
    mysql_mutex_unlock(&LOCK_slave_state);
    return;
  }

  if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
  {
    uint32 count= elem->owner_count;
    DBUG_ASSERT(count > 0);
    DBUG_ASSERT(elem->owner_rli == rgi->rli);
    --count;
    elem->owner_count= count;
    if (count == 0)
    {
      elem->owner_rli= NULL;
      mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
    }
  }
  rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  mysql_mutex_unlock(&LOCK_slave_state);
}


static void
rpl_slave_state_free_element(void *arg)
{
  struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
  mysql_cond_destroy(&elem->COND_wait_gtid);
  mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates);
  my_free(elem);
}


rpl_slave_state::rpl_slave_state()
  : last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
  mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
                   MY_MUTEX_INIT_SLOW);
  my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
               sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
  my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
}


rpl_slave_state::~rpl_slave_state()
{
  free_gtid_pos_tables((struct gtid_pos_table *)gtid_pos_tables);
  truncate_hash();
  my_hash_free(&hash);
  delete_dynamic(&gtid_sort_array);
  mysql_mutex_destroy(&LOCK_slave_state);
}


void
rpl_slave_state::truncate_hash()
{
  uint32 i;

  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    list_element *l= e->list;
    list_element *next;
    while (l)
    {
      next= l->next;
      my_free(l);
      l= next;
    }
    /* The element itself is freed by the hash element free function. */
  }
  my_hash_reset(&hash);
}


int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
                        uint64 seq_no, void *hton, rpl_group_info *rgi)
{
  element *elem= NULL;
  list_element *list_elem= NULL;

  DBUG_ASSERT(hton || !loaded);
  if (!(elem= get_element(domain_id)))
    return 1;

  if (seq_no > elem->highest_seq_no)
    elem->highest_seq_no= seq_no;
  if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
  {
    /*
      Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
      Signal (and remove) them. The waiter will handle all the processing
      of all pending MASTER_GTID_WAIT(), so we do not slow down the
      replication SQL thread.
    */
    mysql_mutex_assert_owner(&LOCK_slave_state);
    elem->gtid_waiter= NULL;
    mysql_cond_broadcast(&elem->COND_wait_gtid);
  }

  if (rgi)
  {
    if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
    {
#ifdef DBUG_ASSERT_EXISTS
      Relay_log_info *rli= rgi->rli;
#endif
      uint32 count= elem->owner_count;
      DBUG_ASSERT(count > 0);
      DBUG_ASSERT(elem->owner_rli == rli);
      --count;
      elem->owner_count= count;
      if (count == 0)
      {
        elem->owner_rli= NULL;
        mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
      }
    }
    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;

#ifdef HAVE_REPLICATION
    rgi->pending_gtid_deletes_clear();
#endif
  }

  if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
    return 1;
  list_elem->server_id= server_id;
  list_elem->sub_id= sub_id;
  list_elem->seq_no= seq_no;
  list_elem->hton= hton;

  elem->add(list_elem);
  if (last_sub_id < sub_id)
    last_sub_id= sub_id;

  return 0;
}


struct rpl_slave_state::element *
rpl_slave_state::get_element(uint32 domain_id)
{
  struct element *elem;

  elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
  if (elem)
    return elem;

  if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
    return NULL;
  elem->list= NULL;
  elem->domain_id= domain_id;
  elem->highest_seq_no= 0;
  elem->gtid_waiter= NULL;
  elem->owner_rli= NULL;
  elem->owner_count= 0;
  mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
  mysql_cond_init(key_COND_gtid_ignore_duplicates,
                  &elem->COND_gtid_ignore_duplicates, 0);
  if (my_hash_insert(&hash, (uchar *)elem))
  {
    my_free(elem);
    return NULL;
  }
  return elem;
}


int
rpl_slave_state::put_back_list(uint32 domain_id, list_element *list)
{
  element *e;
  int err= 0;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
  {
    err= 1;
    goto end;
  }
  while (list)
  {
    list_element *next= list->next;
    e->add(list);
    list= next;
  }

end:
  mysql_mutex_unlock(&LOCK_slave_state);
  return err;
}


int
rpl_slave_state::truncate_state_table(THD *thd)
{
  TABLE_LIST tlist;
  int err= 0;

  tmp_disable_binlog(thd);
  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name, NULL, TL_WRITE);
  if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
  {
    tdc_remove_table(thd, TDC_RT_REMOVE_UNUSED, "mysql",
                     rpl_gtid_slave_state_table_name.str, false);
    err= tlist.table->file->ha_truncate();

    if (err)
    {
      ha_rollback_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_rollback_trans(thd, TRUE);
    }
    else
    {
      ha_commit_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_commit_trans(thd, TRUE);
    }
    thd->release_transactional_locks();
  }

  reenable_binlog(thd);
  return err;
}


static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
  { { STRING_WITH_LEN("domain_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("sub_id") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("server_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("seq_no") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
};

static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};

static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= {
  array_elements(mysql_rpl_slave_state_coltypes),
  mysql_rpl_slave_state_coltypes,
  array_elements(mysql_rpl_slave_state_pk_parts),
  mysql_rpl_slave_state_pk_parts
};
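
/*
  For reference (a reconstruction from the column and key definitions above,
  not text taken from the server's table-creation script), the expected
  table definition corresponds roughly to:

    CREATE TABLE mysql.gtid_slave_pos (
      domain_id INT(10) UNSIGNED NOT NULL,
      sub_id    BIGINT(20) UNSIGNED NOT NULL,
      server_id INT(10) UNSIGNED NOT NULL,
      seq_no    BIGINT(20) UNSIGNED NOT NULL,
      PRIMARY KEY (domain_id, sub_id)
    );

  mysql_rpl_slave_state_pk_parts lists column indexes 0 and 1, i.e. the
  primary key is (domain_id, sub_id).
*/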

static Table_check_intact_log_error gtid_table_intact;

/*
  Check that the mysql.gtid_slave_pos table has the correct definition.
*/
int
gtid_check_rpl_slave_state_table(TABLE *table)
{
  int err;

  if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef)))
    my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
             rpl_gtid_slave_state_table_name.str);
  return err;
}


/*
  Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
  that is already in use by the current transaction, if any.
*/
void
rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
{
  struct gtid_pos_table *list, *table_entry, *default_entry;

  /*
    See comments on rpl_slave_state::gtid_pos_tables for rules around proper
    access to the list.
  */
  list= (struct gtid_pos_table *)
    my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE);

  Ha_trx_info *ha_info;
  uint count= 0;
  for (ha_info= thd->transaction.all.ha_list; ha_info; ha_info= ha_info->next())
  {
    void *trx_hton= ha_info->ht();
    table_entry= list;

    if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton)
      continue;
    while (table_entry)
    {
      if (table_entry->table_hton == trx_hton)
      {
        if (likely(table_entry->state == GTID_POS_AVAILABLE))
        {
          *out_tablename= table_entry->table_name;
          /*
            Check if this is a cross-engine transaction, so we can correctly
            maintain the rpl_transactions_multi_engine status variable.
          */
          if (count >= 1)
            statistic_increment(rpl_transactions_multi_engine, LOCK_status);
          else
          {
            for (;;)
            {
              ha_info= ha_info->next();
              if (!ha_info)
                break;
              if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
              {
                statistic_increment(rpl_transactions_multi_engine, LOCK_status);
                break;
              }
            }
          }
          return;
        }
        /*
          This engine is marked to automatically create the table.
          We cannot easily do this here (possibly in the middle of a
          transaction). But we can request the slave background thread
          to create it, and in a short while it should become available
          for following transactions.
        */
#ifdef HAVE_REPLICATION
        slave_background_gtid_pos_create_request(table_entry);
#endif
        break;
      }
      table_entry= table_entry->next;
    }
    ++count;
  }
  /*
    If we cannot find any table whose engine matches an engine that is
    already active in the transaction, or if the transaction has no usable
    engines at all, we return the default gtid_slave_pos table.
  */
  default_entry= (struct gtid_pos_table *)
    my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE);
  *out_tablename= default_entry->table_name;
  /* Record in status that we failed to find a suitable gtid_pos table. */
  if (count > 0)
  {
    statistic_increment(transactions_gtid_foreign_engine, LOCK_status);
    if (count > 1)
      statistic_increment(rpl_transactions_multi_engine, LOCK_status);
  }
}


/*
  Write a gtid to the replication slave state table.

    gtid    The global transaction id for this event group.
    sub_id  Locally allocated sub_id assigned when the event group was read
            (sub_id must be consistent with commit order in master binlog).
    rgi     rpl_group_info context, if we are recording the gtid transactionally
            as part of replicating a transactional event. NULL if called from
            outside of a replicated transaction.

  Note that the caller must later ensure that the new gtid and sub_id are
  inserted into the appropriate HASH element with rpl_slave_state.add(), so
  that they can be deleted later. But this must only be done after COMMIT if
  in transaction.
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                             rpl_group_info *rgi, bool in_statement,
                             void **out_hton)
{
  TABLE_LIST tlist;
  int err= 0, not_sql_thread;
  bool table_opened= false;
  TABLE *table;
  list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr;
  uint64 best_sub_id;
  element *elem;
  ulonglong thd_saved_option= thd->variables.option_bits;
  Query_tables_list lex_backup;
  wait_for_commit* suspended_wfc;
  void *hton= NULL;
  LEX_CSTRING gtid_pos_table_name;
  DBUG_ENTER("record_gtid");

  *out_hton= NULL;
  if (unlikely(!loaded))
  {
    /*
      Probably the mysql.gtid_slave_pos table is missing (eg. upgrade) or
      corrupt.

      We already complained loudly about this, but we can try to continue
      until the DBA fixes it.
    */
    DBUG_RETURN(0);
  }

  if (!in_statement)
    thd->reset_for_next_command();

  /*
    Only the SQL thread can call select_gtid_pos_table without a mutex.
    Other threads need to use a mutex and take into account that the
    result may change during execution, so we have to make a copy.
  */

  if ((not_sql_thread= (thd->system_thread != SYSTEM_THREAD_SLAVE_SQL)))
    mysql_mutex_lock(&LOCK_slave_state);
  select_gtid_pos_table(thd, &gtid_pos_table_name);
  if (not_sql_thread)
  {
    LEX_CSTRING *tmp= thd->make_clex_string(gtid_pos_table_name.str,
                                            gtid_pos_table_name.length);
    mysql_mutex_unlock(&LOCK_slave_state);
    if (!tmp)
      DBUG_RETURN(1);
    gtid_pos_table_name= *tmp;
  }

  DBUG_EXECUTE_IF("gtid_inject_record_gtid",
                  {
                    my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
                    DBUG_RETURN(1);
                  } );

  /*
    If we are applying a non-transactional event group, we will be committing
    here a transaction, but that does not imply that the event group has
    completed or has been binlogged. So we should not trigger
    wakeup_subsequent_commits() here.

    Note: An alternative here could be to put a call to mark_start_commit() in
    stmt_done() before the call to record_and_update_gtid(). This would
    prevent later calling mark_start_commit() after we have run
    wakeup_subsequent_commits() from committing the GTID update transaction
    (which must be avoided to avoid accessing freed group_commit_orderer
    object). It would also allow following event groups to start slightly
    earlier. And in the cases where record_gtid() is called without an active
    transaction, the current statement should have been binlogged already, so
    binlog order is preserved.

    But this is rather subtle, and potentially fragile. And it does not really
    seem worth it; non-transactional loads are unlikely to benefit much from
    parallel replication in any case. So for now, we go with the simple
    suspend/resume of wakeup_subsequent_commits() here in record_gtid().
  */
  suspended_wfc= thd->suspend_subsequent_commits();
  thd->lex->reset_n_backup_query_tables_list(&lex_backup);
  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &gtid_pos_table_name, NULL, TL_WRITE);
  if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
    goto end;
  table_opened= true;
  table= tlist.table;
  hton= table->s->db_type();

  if ((err= gtid_check_rpl_slave_state_table(table)))
    goto end;

#ifdef WITH_WSREP
  /*
    Updates in the slave state table should not be appended to the galera
    transaction writeset.
  */
  thd->wsrep_ignore_table= true;
#endif

  if (!rgi)
  {
    DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
    thd->variables.option_bits&=
      ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
                   OPTION_GTID_BEGIN);
  }
  else
    thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG;

  bitmap_set_all(table->write_set);
  table->rpl_write_set= table->write_set;

  table->field[0]->store((ulonglong)gtid->domain_id, true);
  table->field[1]->store(sub_id, true);
  table->field[2]->store((ulonglong)gtid->server_id, true);
  table->field[3]->store(gtid->seq_no, true);
  DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE(););
  if ((err= table->file->ha_write_row(table->record[0])))
  {
    table->file->print_error(err, MYF(0));
    goto end;
  }
  *out_hton= hton;

  if (opt_bin_log &&
      (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
                                                        gtid->seq_no)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    goto end;
  }

  mysql_mutex_lock(&LOCK_slave_state);
  if ((elem= get_element(gtid->domain_id)) == NULL)
  {
    mysql_mutex_unlock(&LOCK_slave_state);
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    err= 1;
    goto end;
  }

  /* Now pull out all GTIDs that were recorded in this engine. */
  delete_list= NULL;
  next_ptr_ptr= &elem->list;
  cur= elem->list;
  best_sub_id= 0;
  best_ptr_ptr= NULL;
  while (cur)
  {
    list_element *next= cur->next;
    if (cur->hton == hton)
    {
      /* Belongs to same engine, so move it to the delete list. */
      cur->next= delete_list;
      delete_list= cur;
      if (cur->sub_id > best_sub_id)
      {
        best_sub_id= cur->sub_id;
        best_ptr_ptr= &delete_list;
      }
      else if (best_ptr_ptr == &delete_list)
        best_ptr_ptr= &cur->next;
    }
    else
    {
      /* Another engine, leave it in the list. */
      if (cur->sub_id > best_sub_id)
      {
        best_sub_id= cur->sub_id;
        /* Current best is not on the delete list. */
        best_ptr_ptr= NULL;
      }
      *next_ptr_ptr= cur;
      next_ptr_ptr= &cur->next;
    }
    cur= next;
  }
  *next_ptr_ptr= NULL;
  /*
    If the highest sub_id element is on the delete list, put it back on the
    original list, to preserve the highest sub_id element in the table for
    GTID position recovery.
  */
  if (best_ptr_ptr)
  {
    cur= *best_ptr_ptr;
    *best_ptr_ptr= cur->next;
    cur->next= elem->list;
    elem->list= cur;
  }
  mysql_mutex_unlock(&LOCK_slave_state);

  if (!delete_list)
    goto end;

  /* Now delete any already committed GTIDs. */
  bitmap_set_bit(table->read_set, table->field[0]->field_index);
  bitmap_set_bit(table->read_set, table->field[1]->field_index);

  if ((err= table->file->ha_index_init(0, 0)))
  {
    table->file->print_error(err, MYF(0));
    goto end;
  }
  cur= delete_list;
  while (cur)
  {
    uchar key_buffer[4+8];

    DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
                    { err= ENOENT;
                      table->file->print_error(err, MYF(0));
                      /* `break' does not work inside DBUG_EXECUTE_IF */
                      goto dbug_break; });

    next= cur->next;

    table->field[1]->store(cur->sub_id, true);
    /* domain_id is already set in table->record[0] from write_row() above. */
    key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
    if (table->file->ha_index_read_map(table->record[1], key_buffer,
                                       HA_WHOLE_KEY, HA_READ_KEY_EXACT))
      /* We cannot find the row, assume it is already deleted. */
      ;
    else if ((err= table->file->ha_delete_row(table->record[1])))
      table->file->print_error(err, MYF(0));
    /*
      In case of error, we still discard the element from the list. We do
      not want to endlessly error on the same element in case of table
      corruption or such.
    */
    cur= next;
    if (err)
      break;
  }
IF_DBUG(dbug_break:, )
  table->file->ha_index_end();

end:

#ifdef WITH_WSREP
  thd->wsrep_ignore_table= false;
#endif

  if (table_opened)
  {
    if (err || (err= ha_commit_trans(thd, FALSE)))
    {
      /*
        If error, we need to put any remaining delete_list back into the HASH
        so we can do another delete attempt later.
      */
      if (delete_list)
      {
        put_back_list(gtid->domain_id, delete_list);
        delete_list= 0;
      }

      ha_rollback_trans(thd, FALSE);
    }
    close_thread_tables(thd);
    if (rgi)
    {
      thd->mdl_context.release_statement_locks();
      /*
        Save the list of old gtid entries we deleted. If this transaction
        fails later for some reason and is rolled back, the deletion of those
        entries will be rolled back as well, and we will need to put them back
        on the to-be-deleted list so we can re-do the deletion. Otherwise
        redundant rows in mysql.gtid_slave_pos may accumulate if transactions
        are rolled back and retried after record_gtid().
      */
#ifdef HAVE_REPLICATION
      rgi->pending_gtid_deletes_save(gtid->domain_id, delete_list);
#endif
    }
    else
    {
      thd->release_transactional_locks();
#ifdef HAVE_REPLICATION
      rpl_group_info::pending_gtid_deletes_free(delete_list);
#endif
    }
  }
  thd->lex->restore_backup_query_tables_list(&lex_backup);
  thd->variables.option_bits= thd_saved_option;
  thd->resume_subsequent_commits(suspended_wfc);
  DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
    {
      if (gtid->server_id == 100)
        my_sleep(500000);
    });
  DBUG_RETURN(err);
}
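
/*
  Worked example (illustration only) of the pruning done in record_gtid()
  above. Suppose domain 0 has in-memory entries with sub_id 1, 2 and 3, all
  from one engine, and we have just written the row for sub_id 4:

    - Entries 1, 2 and 3 all belong to the engine just used, so they are
      moved to delete_list.
    - Entry 3 has the highest sub_id in the hash, so it is moved back off
      the delete list: the row with the highest sub_id must survive in the
      table for GTID position recovery.
    - The rows for sub_id 1 and 2 are then deleted from mysql.gtid_slave_pos
      in the same transaction that inserted the sub_id 4 row.

  (The sub_id 4 entry itself is only added to the hash after commit, via
  update_state_hash().)
*/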


uint64
rpl_slave_state::next_sub_id(uint32 domain_id)
{
  uint64 sub_id= 0;

  mysql_mutex_lock(&LOCK_slave_state);
  sub_id= ++last_sub_id;
  mysql_mutex_unlock(&LOCK_slave_state);

  return sub_id;
}

/* A callback used in sorting of gtid list based on domain_id. */
static int rpl_gtid_cmp_cb(const void *id1, const void *id2)
{
  uint32 d1= ((rpl_gtid *)id1)->domain_id;
  uint32 d2= ((rpl_gtid *)id2)->domain_id;

  if (d1 < d2)
    return -1;
  else if (d1 > d2)
    return 1;
  return 0;
}

/* Format the specified gtid and store it in the given string buffer. */
bool
rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
{
  if (*first)
    *first= false;
  else
    if (dest->append(",",1))
      return true;
  return
    dest->append_ulonglong(gtid->domain_id) ||
    dest->append("-",1) ||
    dest->append_ulonglong(gtid->server_id) ||
    dest->append("-",1) ||
    dest->append_ulonglong(gtid->seq_no);
}
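
/*
  Example (illustration only): for a GTID with domain_id=0, server_id=1,
  seq_no=100 the helper above appends "0-1-100"; for several GTIDs the
  result is a comma-separated list such as "0-1-100,1-2-577".
*/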

/*
  Sort the given gtid list based on domain_id and store them in the specified
  string.
*/
static bool
rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr, String *str)
{
  bool first= true, res= true;

  sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);

  for (uint i= 0; i < gtid_dynarr->elements; i ++)
  {
    rpl_gtid *gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
    if (rpl_slave_state_tostring_helper(str, gtid, &first))
      goto err;
  }
  res= false;

err:
  return res;
}


/* Sort the given gtid list based on domain_id and call cb for each gtid. */
static bool
rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr,
                                int (*cb)(rpl_gtid *, void *),
                                void *data)
{
  rpl_gtid *gtid;
  bool res= true;

  sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);

  for (uint i= 0; i < gtid_dynarr->elements; i ++)
  {
    gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
    if ((*cb)(gtid, data))
      goto err;
  }
  res= false;

err:
  return res;
}

int
rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
                         rpl_gtid *extra_gtids, uint32 num_extra,
                         bool sort)
{
  uint32 i;
  HASH gtid_hash;
  uchar *rec;
  rpl_gtid *gtid;
  int res= 1;
  bool locked= false;

  my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
               sizeof(uint32), NULL, NULL, HASH_UNIQUE);
  for (i= 0; i < num_extra; ++i)
    if (extra_gtids[i].server_id == global_system_variables.server_id &&
        my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
      goto err;

  mysql_mutex_lock(&LOCK_slave_state);
  locked= true;
  reset_dynamic(&gtid_sort_array);

  for (i= 0; i < hash.records; ++i)
  {
    uint64 best_sub_id;
    rpl_gtid best_gtid;
    element *e= (element *)my_hash_element(&hash, i);
    list_element *l= e->list;

    if (!l)
      continue;                                 /* Nothing here */

    best_gtid.domain_id= e->domain_id;
    best_gtid.server_id= l->server_id;
    best_gtid.seq_no= l->seq_no;
    best_sub_id= l->sub_id;
    while ((l= l->next))
    {
      if (l->sub_id > best_sub_id)
      {
        best_sub_id= l->sub_id;
        best_gtid.server_id= l->server_id;
        best_gtid.seq_no= l->seq_no;
      }
    }

    /* Check if we have something newer in the extra list. */
    rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
    if (rec)
    {
      gtid= (rpl_gtid *)rec;
      if (gtid->seq_no > best_gtid.seq_no)
        memcpy(&best_gtid, gtid, sizeof(best_gtid));
      if (my_hash_delete(&gtid_hash, rec))
      {
        goto err;
      }
    }

    if ((res= sort ? insert_dynamic(&gtid_sort_array,
                                    (const void *) &best_gtid) :
         (*cb)(&best_gtid, data)))
    {
      goto err;
    }
  }

  /* Also add any remaining extra domain_ids. */
  for (i= 0; i < gtid_hash.records; ++i)
  {
    gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
    if ((res= sort ? insert_dynamic(&gtid_sort_array, (const void *) gtid) :
         (*cb)(gtid, data)))
    {
      goto err;
    }
  }

  if (sort && rpl_slave_state_tostring_helper(&gtid_sort_array, cb, data))
  {
    goto err;
  }

  res= 0;

err:
  if (locked) mysql_mutex_unlock(&LOCK_slave_state);
  my_hash_free(&gtid_hash);

  return res;
}


struct rpl_slave_state_tostring_data {
  String *dest;
  bool first;
};
static int
rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data)
{
  rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data;
  return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first);
}


/*
  Prepare the current slave state as a string, suitable for sending to the
  master to request to receive binlog events starting from that GTID state.

  The state consists of the most recently applied GTID for each domain_id,
  ie. the one with the highest sub_id within each domain_id.

  Optionally, extra_gtids is a list of GTIDs from the binlog. This is used
  when a server was previously a master and now needs to connect to a new
  master as a slave. For each domain_id, if the GTID in the binlog was logged
  with our own server_id _and_ has a higher seq_no than what is in the slave
  state, then this should be used as the position to start replicating at.
  This allows promoting a slave to a new master, and connecting the old
  master as a slave with MASTER_GTID_POS=AUTO.
*/
int
rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
{
  struct rpl_slave_state_tostring_data data;
  data.first= true;
  data.dest= dest;

  return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids,
                 num_extra, true);
}
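
/*
  Worked example (illustration only): suppose the slave state for domain 1
  holds 1-2-100 (highest sub_id), and the binlog contains 1-5-105 where 5 is
  our own server_id. With extra_gtids covering the binlog, tostring() emits
  1-5-105 for domain 1, so a previously-master server resumes from what it
  itself logged rather than from the older position it had applied as a
  slave.
*/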


/*
  Lookup a domain_id in the current replication slave state.

  Returns false if the domain_id has no entries in the slave state.
  Otherwise returns true, and fills in out_gtid with the corresponding
  GTID.
*/
bool
rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
{
  element *elem;
  list_element *list;
  uint64 best_sub_id;

  mysql_mutex_lock(&LOCK_slave_state);
  elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
  if (!elem || !(list= elem->list))
  {
    mysql_mutex_unlock(&LOCK_slave_state);
    return false;
  }

  out_gtid->domain_id= domain_id;
  out_gtid->server_id= list->server_id;
  out_gtid->seq_no= list->seq_no;
  best_sub_id= list->sub_id;

  while ((list= list->next))
  {
    if (best_sub_id > list->sub_id)
      continue;
    best_sub_id= list->sub_id;
    out_gtid->server_id= list->server_id;
    out_gtid->seq_no= list->seq_no;
  }

  mysql_mutex_unlock(&LOCK_slave_state);
  return true;
}


/*
  Parse a GTID at the start of a string, and update the pointer to point
  at the first character after the parsed GTID.

  Returns 0 on ok, non-zero on parse error.
*/
static int
gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
{
  char *q;
  const char *p= *ptr;
  uint64 v1, v2, v3;
  int err= 0;

  q= (char*) end;
  v1= (uint64)my_strtoll10(p, &q, &err);
  if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
    return 1;
  p= q+1;
  q= (char*) end;
  v2= (uint64)my_strtoll10(p, &q, &err);
  if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
    return 1;
  p= q+1;
  q= (char*) end;
  v3= (uint64)my_strtoll10(p, &q, &err);
  if (err != 0)
    return 1;

  out_gtid->domain_id= (uint32) v1;
  out_gtid->server_id= (uint32) v2;
  out_gtid->seq_no= v3;
  *ptr= q;
  return 0;
}
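
/*
  Example (illustration only): parsing "1-2-100" yields domain_id=1,
  server_id=2, seq_no=100, and *ptr is advanced past the final digit:

    const char *p= "1-2-100,2-3-50";
    rpl_gtid gtid;
    if (!gtid_parser_helper(&p, p + strlen(p), &gtid))
    {
      // gtid == {1, 2, 100}; *p is now ',' before the next GTID.
    }
*/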


rpl_gtid *
gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
{
  const char *p= str;
  const char *end= p + str_len;
  uint32 len= 0, alloc_len= 5;
  rpl_gtid *list= NULL;

  for (;;)
  {
    rpl_gtid gtid;

    if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
    {
      my_free(list);
      return NULL;
    }
    if ((!list || len >= alloc_len) &&
        !(list=
          (rpl_gtid *)my_realloc(list,
                                 (alloc_len= alloc_len*2) * sizeof(rpl_gtid),
                                 MYF(MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR))))
      return NULL;
    list[len++]= gtid;

    if (p == end)
      break;
    if (*p != ',')
    {
      my_free(list);
      return NULL;
    }
    ++p;
  }
  *out_len= len;
  return list;
}
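
/*
  Usage sketch (illustration only): the caller owns the returned array and
  must free it with my_free().

    uint32 n;
    rpl_gtid *list= gtid_parse_string_to_list("0-1-100,1-2-577", 15, &n);
    if (list)
    {
      // n == 2; list[0] == {0, 1, 100}, list[1] == {1, 2, 577}.
      my_free(list);
    }
*/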


/*
  Update the slave replication state with the GTID position obtained from
  master when connecting with old-style (filename,offset) position.

  If RESET is true then all existing entries are removed. Otherwise only
  domain_ids mentioned in the STATE_FROM_MASTER are changed.

  Returns 0 if ok, non-zero if error.
*/
int
rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
                      bool reset, bool in_statement)
{
  const char *end= state_from_master + len;

  if (reset)
  {
    if (truncate_state_table(thd))
      return 1;
    truncate_hash();
  }
  if (state_from_master == end)
    return 0;
  for (;;)
  {
    rpl_gtid gtid;
    uint64 sub_id;
    void *hton= NULL;

    if (gtid_parser_helper(&state_from_master, end, &gtid) ||
        !(sub_id= next_sub_id(gtid.domain_id)) ||
        record_gtid(thd, &gtid, sub_id, NULL, in_statement, &hton) ||
        update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
      return 1;
    if (state_from_master == end)
      break;
    if (*state_from_master != ',')
      return 1;
    ++state_from_master;
  }
  return 0;
}


bool
rpl_slave_state::is_empty()
{
  uint32 i;
  bool result= true;

  mysql_mutex_lock(&LOCK_slave_state);
  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (e->list)
    {
      result= false;
      break;
    }
  }
  mysql_mutex_unlock(&LOCK_slave_state);

  return result;
}


void
rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list)
{
  struct gtid_pos_table *cur, *next;

  cur= list;
  while (cur)
  {
    next= cur->next;
    my_free(cur);
    cur= next;
  }
}


/*
  Replace the list of available mysql.gtid_slave_posXXX tables with a new list.
  The caller must be holding LOCK_slave_state. Additionally, this function
  must only be called while all SQL threads are stopped.
*/
void
rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
                                          rpl_slave_state::gtid_pos_table *default_entry)
{
  gtid_pos_table *old_list;

  mysql_mutex_assert_owner(&LOCK_slave_state);
  old_list= (struct gtid_pos_table *)gtid_pos_tables;
  my_atomic_storeptr_explicit(&gtid_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE);
  my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry,
                              MY_MEMORY_ORDER_RELEASE);
  free_gtid_pos_tables(old_list);
}


void
rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
{
  mysql_mutex_assert_owner(&LOCK_slave_state);
  entry->next= (struct gtid_pos_table *)gtid_pos_tables;
  my_atomic_storeptr_explicit(&gtid_pos_tables, entry, MY_MEMORY_ORDER_RELEASE);
}


struct rpl_slave_state::gtid_pos_table *
rpl_slave_state::alloc_gtid_pos_table(LEX_CSTRING *table_name, void *hton,
                                      rpl_slave_state::gtid_pos_table_state state)
{
  struct gtid_pos_table *p;
  char *allocated_str;

  if (!my_multi_malloc(MYF(MY_WME),
                       &p, sizeof(*p),
                       &allocated_str, table_name->length+1,
                       NULL))
  {
    my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1));
    return NULL;
  }
  memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0'
  p->next= NULL;
  p->table_hton= hton;
  p->table_name.str= allocated_str;
  p->table_name.length= table_name->length;
  p->state= state;
  return p;
}


void rpl_binlog_state::init()
{
  my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
               sizeof(uint32), NULL, my_free, HASH_UNIQUE);
  my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
  mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
                   MY_MUTEX_INIT_SLOW);
  initialized= 1;
}

void
rpl_binlog_state::reset_nolock()
{
  uint32 i;

  for (i= 0; i < hash.records; ++i)
    my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
  my_hash_reset(&hash);
}


void
rpl_binlog_state::reset()
{
  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  mysql_mutex_unlock(&LOCK_binlog_state);
}


void rpl_binlog_state::free()
{
  if (initialized)
  {
    initialized= 0;
    reset_nolock();
    my_hash_free(&hash);
    delete_dynamic(&gtid_sort_array);
    mysql_mutex_destroy(&LOCK_binlog_state);
  }
}


bool
rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
{
  uint32 i;
  bool res= false;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  for (i= 0; i < count; ++i)
  {
    if (update_nolock(&(list[i]), false))
    {
      res= true;
      break;
    }
  }
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
{
  rpl_binlog_state *self= (rpl_binlog_state *)data;
  return self->update_nolock(gtid, false);
}


bool
rpl_binlog_state::load(rpl_slave_state *slave_pos)
{
  bool res= false;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0, false))
    res= true;
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


rpl_binlog_state::~rpl_binlog_state()
{
  free();
}


/*
  Update replication state with a new GTID.

  If the (domain_id, server_id) pair already exists, then the new GTID
  replaces the old one for that pair. Otherwise a new entry is inserted.

  Returns 0 for ok, 1 for error.
*/
int
rpl_binlog_state::update_nolock(const struct rpl_gtid *gtid, bool strict)
{
  element *elem;

  if ((elem= (element *)my_hash_search(&hash,
                                       (const uchar *)(&gtid->domain_id), 0)))
  {
    if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
    {
      my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
               gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
               elem->last_gtid->server_id, elem->last_gtid->seq_no);
      return 1;
    }
    if (elem->seq_no_counter < gtid->seq_no)
      elem->seq_no_counter= gtid->seq_no;
    if (!elem->update_element(gtid))
      return 0;
  }
  else if (!alloc_element_nolock(gtid))
    return 0;

  my_error(ER_OUT_OF_RESOURCES, MYF(0));
  return 1;
}


int
rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
{
  int res;
  mysql_mutex_lock(&LOCK_binlog_state);
  res= update_nolock(gtid, strict);
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


/*
  Fill in a new GTID, allocating next sequence number, and update state
  accordingly.
*/
int
rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
                                        rpl_gtid *gtid)
{
  element *elem;
  int res= 0;

  gtid->domain_id= domain_id;
  gtid->server_id= server_id;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
  {
    gtid->seq_no= ++elem->seq_no_counter;
    if (!elem->update_element(gtid))
      goto end;
  }
  else
  {
    gtid->seq_no= 1;
    if (!alloc_element_nolock(gtid))
      goto end;
  }

  my_error(ER_OUT_OF_RESOURCES, MYF(0));
  res= 1;
end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
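
/*
  Example (illustration only): if domain 0 currently has seq_no_counter 577,
  the call

    rpl_gtid gtid;
    binlog_state.update_with_next_gtid(0, 1, &gtid);

  fills in gtid as 0-1-578. For a domain not yet seen, the first allocated
  GTID gets seq_no 1. (binlog_state here stands for the server's
  rpl_binlog_state instance.)
*/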


/* Helper functions for update. */
int
rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
{
  rpl_gtid *lookup_gtid;

  /*
    By far the most common case is that successive events within the same
    replication domain have the same server id (it changes only when
    switching to a new master). So save a hash lookup in this case.
  */
  if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
  {
    last_gtid->seq_no= gtid->seq_no;
    return 0;
  }

  lookup_gtid= (rpl_gtid *)
    my_hash_search(&hash, (const uchar *)&gtid->server_id, 0);
  if (lookup_gtid)
  {
    lookup_gtid->seq_no= gtid->seq_no;
    last_gtid= lookup_gtid;
    return 0;
  }

  /* Allocate a new GTID and insert it. */
  lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
  if (!lookup_gtid)
    return 1;
  memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
  if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
  {
    my_free(lookup_gtid);
    return 1;
  }
  last_gtid= lookup_gtid;
  return 0;
}


int
rpl_binlog_state::alloc_element_nolock(const rpl_gtid *gtid)
{
  element *elem;
  rpl_gtid *lookup_gtid;

  /* First time we see this domain_id; allocate a new element. */
  elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
  lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
  if (elem && lookup_gtid)
  {
    elem->domain_id= gtid->domain_id;
    my_hash_init(&elem->hash, &my_charset_bin, 32,
                 offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
                 HASH_UNIQUE);
    elem->last_gtid= lookup_gtid;
    elem->seq_no_counter= gtid->seq_no;
    memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
    if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
    {
      lookup_gtid= NULL;                        /* Do not free. */
      if (0 == my_hash_insert(&hash, (const uchar *)elem))
        return 0;
    }
    my_hash_free(&elem->hash);
  }

  /* An error. */
  if (elem)
    my_free(elem);
  if (lookup_gtid)
    my_free(lookup_gtid);
  return 1;
}


/*
  Check that a new GTID can be logged without creating an out-of-order
  sequence number with existing GTIDs.
*/
bool
rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
                                        uint64 seq_no)
{
  element *elem;
  bool res= 0;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash,
                                       (const uchar *)(&domain_id), 0)) &&
      elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
  {
    my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
             elem->last_gtid->domain_id, elem->last_gtid->server_id,
             elem->last_gtid->seq_no);
    res= 1;
  }
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
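
/*
  Example (illustration only): if the last GTID logged in domain 0 is
  0-1-100, then under GTID strict mode an attempt to log 0-2-100 (or any
  seq_no <= 100) in that domain fails with ER_GTID_STRICT_OUT_OF_ORDER,
  while 0-2-101 is accepted.
*/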


/*
  When we see a new GTID that will not be binlogged (eg. slave thread
  with --log-slave-updates=0), then we need to remember to allocate any
  GTID seq_no of our own within that domain starting from there.

  Returns 0 if ok, non-zero if out-of-memory.
*/
int
rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
{
  element *elem;
  int res;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
  {
    if (elem->seq_no_counter < seq_no)
      elem->seq_no_counter= seq_no;
    res= 0;
    goto end;
  }

  /* We need to allocate a new, empty element to remember the next seq_no. */
  if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
  {
    res= 1;
    goto end;
  }

  elem->domain_id= domain_id;
  my_hash_init(&elem->hash, &my_charset_bin, 32,
               offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
               HASH_UNIQUE);
  elem->last_gtid= NULL;
  elem->seq_no_counter= seq_no;
  if (0 == my_hash_insert(&hash, (const uchar *)elem))
  {
    res= 0;
    goto end;
  }

  my_hash_free(&elem->hash);
  my_free(elem);
  res= 1;

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


/*
  Write binlog state to text file, so we can read it in again without having
  to scan the last binlog file (normal shutdown/startup, not crash recovery).

  The most recent GTID within each domain_id is written after any other GTID
  within this domain.
*/
int
rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{
  ulong i, j;
  char buf[21];
  int res= 0;

  mysql_mutex_lock(&LOCK_binlog_state);
  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
    {
      DBUG_ASSERT(e->hash.records == 0);
      continue;
    }
    for (j= 0; j <= e->hash.records; ++j)
    {
      const rpl_gtid *gtid;
      if (j < e->hash.records)
      {
        gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
        if (gtid == e->last_gtid)
          continue;
      }
      else
        gtid= e->last_gtid;

      longlong10_to_str(gtid->seq_no, buf, 10);
      if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id,
                      buf))
      {
        res= 1;
        goto end;
      }
    }
  }

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
1736 
1737 
1738 int
read_from_iocache(IO_CACHE * src)1739 rpl_binlog_state::read_from_iocache(IO_CACHE *src)
1740 {
1741   /* 10-digit - 10-digit - 20-digit \n \0 */
1742   char buf[10+1+10+1+20+1+1];
1743   const char *p, *end;
1744   rpl_gtid gtid;
1745   int res= 0;
1746 
1747   mysql_mutex_lock(&LOCK_binlog_state);
1748   reset_nolock();
1749   for (;;)
1750   {
1751     size_t len= my_b_gets(src, buf, sizeof(buf));
1752     if (!len)
1753       break;
1754     p= buf;
1755     end= buf + len;
1756     if (gtid_parser_helper(&p, end, &gtid) ||
1757         update_nolock(&gtid, false))
1758     {
1759       res= 1;
1760       break;
1761     }
1762   }
1763   mysql_mutex_unlock(&LOCK_binlog_state);
1764   return res;
1765 }
1766 
1767 
1768 rpl_gtid *
find_nolock(uint32 domain_id,uint32 server_id)1769 rpl_binlog_state::find_nolock(uint32 domain_id, uint32 server_id)
1770 {
1771   element *elem;
1772   if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
1773     return NULL;
1774   return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
1775 }
1776 
1777 rpl_gtid *
find(uint32 domain_id,uint32 server_id)1778 rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
1779 {
1780   rpl_gtid *p;
1781   mysql_mutex_lock(&LOCK_binlog_state);
1782   p= find_nolock(domain_id, server_id);
1783   mysql_mutex_unlock(&LOCK_binlog_state);
1784   return p;
1785 }
1786 
1787 rpl_gtid *
find_most_recent(uint32 domain_id)1788 rpl_binlog_state::find_most_recent(uint32 domain_id)
1789 {
1790   element *elem;
1791   rpl_gtid *gtid= NULL;
1792 
1793   mysql_mutex_lock(&LOCK_binlog_state);
1794   elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
1795   if (elem && elem->last_gtid)
1796     gtid= elem->last_gtid;
1797   mysql_mutex_unlock(&LOCK_binlog_state);
1798 
1799   return gtid;
1800 }
1801 
1802 
1803 uint32
count()1804 rpl_binlog_state::count()
1805 {
1806   uint32 c= 0;
1807   uint32 i;
1808 
1809   mysql_mutex_lock(&LOCK_binlog_state);
1810   for (i= 0; i < hash.records; ++i)
1811     c+= ((element *)my_hash_element(&hash, i))->hash.records;
1812   mysql_mutex_unlock(&LOCK_binlog_state);
1813 
1814   return c;
1815 }
1816 
1817 
1818 int
get_gtid_list(rpl_gtid * gtid_list,uint32 list_size)1819 rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
1820 {
1821   uint32 i, j, pos;
1822   int res= 0;
1823 
1824   mysql_mutex_lock(&LOCK_binlog_state);
1825   pos= 0;
1826   for (i= 0; i < hash.records; ++i)
1827   {
1828     element *e= (element *)my_hash_element(&hash, i);
1829     if (!e->last_gtid)
1830     {
1831       DBUG_ASSERT(e->hash.records==0);
1832       continue;
1833     }
1834     for (j= 0; j <= e->hash.records; ++j)
1835     {
1836       const rpl_gtid *gtid;
1837       if (j < e->hash.records)
1838       {
1839         gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
1840         if (gtid == e->last_gtid)
1841           continue;
1842       }
1843       else
1844         gtid= e->last_gtid;
1845 
1846       if (pos >= list_size)
1847       {
1848         res= 1;
1849         goto end;
1850       }
1851       memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
1852     }
1853   }
1854 
1855 end:
1856   mysql_mutex_unlock(&LOCK_binlog_state);
1857   return res;
1858 }
1859 
1860 
1861 /*
1862   Get a list of the most recently binlogged GTID, for each domain_id.
1863 
1864   This can be used when switching from being a master to being a slave,
1865   to know where to start replicating from the new master.
1866 
1867   The returned list must be de-allocated with my_free().
1868 
1869   Returns 0 for ok, non-zero for out-of-memory.
1870 */
1871 int
get_most_recent_gtid_list(rpl_gtid ** list,uint32 * size)1872 rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
1873 {
1874   uint32 i;
1875   uint32 alloc_size, out_size;
1876   int res= 0;
1877 
1878   out_size= 0;
1879   mysql_mutex_lock(&LOCK_binlog_state);
1880   alloc_size= hash.records;
1881   if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid),
1882                                      MYF(MY_WME))))
1883   {
1884     res= 1;
1885     goto end;
1886   }
1887   for (i= 0; i < alloc_size; ++i)
1888   {
1889     element *e= (element *)my_hash_element(&hash, i);
1890     if (!e->last_gtid)
1891       continue;
1892     memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
1893   }
1894 
1895 end:
1896   mysql_mutex_unlock(&LOCK_binlog_state);
1897   *size= out_size;
1898   return res;
1899 }
1900 
1901 bool
append_pos(String * str)1902 rpl_binlog_state::append_pos(String *str)
1903 {
1904   uint32 i;
1905 
1906   mysql_mutex_lock(&LOCK_binlog_state);
1907   reset_dynamic(&gtid_sort_array);
1908 
1909   for (i= 0; i < hash.records; ++i)
1910   {
1911     element *e= (element *)my_hash_element(&hash, i);
1912     if (e->last_gtid &&
1913         insert_dynamic(&gtid_sort_array, (const void *) e->last_gtid))
1914     {
1915       mysql_mutex_unlock(&LOCK_binlog_state);
1916       return true;
1917     }
1918   }
1919   rpl_slave_state_tostring_helper(&gtid_sort_array, str);
1920   mysql_mutex_unlock(&LOCK_binlog_state);
1921 
1922   return false;
1923 }
1924 
1925 
1926 bool
append_state(String * str)1927 rpl_binlog_state::append_state(String *str)
1928 {
1929   uint32 i, j;
1930   bool res= false;
1931 
1932   mysql_mutex_lock(&LOCK_binlog_state);
1933   reset_dynamic(&gtid_sort_array);
1934 
1935   for (i= 0; i < hash.records; ++i)
1936   {
1937     element *e= (element *)my_hash_element(&hash, i);
1938     if (!e->last_gtid)
1939     {
1940       DBUG_ASSERT(e->hash.records==0);
1941       continue;
1942     }
1943     for (j= 0; j <= e->hash.records; ++j)
1944     {
1945       const rpl_gtid *gtid;
1946       if (j < e->hash.records)
1947       {
1948         gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
1949         if (gtid == e->last_gtid)
1950           continue;
1951       }
1952       else
1953         gtid= e->last_gtid;
1954 
1955       if (insert_dynamic(&gtid_sort_array, (const void *) gtid))
1956       {
1957         res= true;
1958         goto end;
1959       }
1960     }
1961   }
1962 
1963   rpl_slave_state_tostring_helper(&gtid_sort_array, str);
1964 
1965 end:
1966   mysql_mutex_unlock(&LOCK_binlog_state);
1967   return res;
1968 }
1969 
1970 /**
1971   Remove domains supplied by the first argument from binlog state.
1972   Removal is done for any domain whose last gtids (from all its servers) match
1973   ones in Gtid list event of the 2nd argument.
1974 
1975   @param  ids               gtid domain id sequence, may contain dups
1976   @param  glev              pointer to Gtid list event describing
1977                             the match condition
1978   @param  errbuf [out]      pointer to possible error message array
1979 
1980   @retval NULL              as success when at least one domain is removed
1981   @retval ""                empty string to indicate ineffective call
1982                             when no domains removed
1983   @retval NOT EMPTY string  otherwise an error message
1984 */
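/*
  Added note: this function is the backend for the DELETE_DOMAIN_ID option
  of FLUSH BINARY LOGS. An illustrative invocation:

      FLUSH BINARY LOGS DELETE_DOMAIN_ID = (1, 2);

  The deletion succeeds only when the remaining binlog files can no longer
  contain events from domains 1 and 2, i.e. when the last GTIDs of those
  domains all appear in the Gtid list event passed as glev; otherwise the
  caller is told to purge the offending files first.
*/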
const char*
rpl_binlog_state::drop_domain(DYNAMIC_ARRAY *ids,
                              Gtid_list_log_event *glev,
                              char* errbuf)
{
  DYNAMIC_ARRAY domain_unique; // sequence (unsorted) of unique element*:s
  rpl_binlog_state::element* domain_unique_buffer[16];
  ulong k, l;
  const char* errmsg= NULL;

  DBUG_ENTER("rpl_binlog_state::drop_domain");

  my_init_dynamic_array2(&domain_unique,
                         sizeof(element*), domain_unique_buffer,
                         sizeof(domain_unique_buffer) / sizeof(element*), 4, 0);

  mysql_mutex_lock(&LOCK_binlog_state);

  /*
    The gtid list is supposed to come from a binlog's Gtid_list event and
    therefore should be a subset of the current binlog state. That is,
    for every domain in the list, the binlog state contains a gtid with
    a sequence number not less than that of the list.
    Exceptions to this inclusion rule are:
      A. the list may still refer to gtids from already deleted domains.
         Files containing them must have been purged whereas the file
         with the list is not yet.
      B. out-of-order groups were injected
      C. a manually built list of binlog files violating the inclusion
         constraint.
    While A is a normal case (not necessarily distinguishable from C though),
    B and C may require the user's attention, so any inconsistency
    (including suspected cases of A) is diagnosed and *warned* about.
  */
  for (l= 0, errbuf[0]= 0; l < glev->count; l++, errbuf[0]= 0)
  {
    rpl_gtid* rb_state_gtid= find_nolock(glev->list[l].domain_id,
                                         glev->list[l].server_id);
    if (!rb_state_gtid)
      sprintf(errbuf,
              "missing gtids from the '%u-%u' domain-server pair which is "
              "referred to in the gtid list describing an earlier state. Ignore "
              "if the domain ('%u') was already explicitly deleted",
              glev->list[l].domain_id, glev->list[l].server_id,
              glev->list[l].domain_id);
    else if (rb_state_gtid->seq_no < glev->list[l].seq_no)
      sprintf(errbuf,
              "having a gtid '%u-%u-%llu' which is less than "
              "the '%u-%u-%llu' of the gtid list describing an earlier state. "
              "The state may have been affected by manually injecting "
              "a lower sequence number gtid or via replication",
              rb_state_gtid->domain_id, rb_state_gtid->server_id,
              rb_state_gtid->seq_no, glev->list[l].domain_id,
              glev->list[l].server_id, glev->list[l].seq_no);
    if (strlen(errbuf)) // use strlen() as cheap flag
      push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
                          ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
                          "The current gtid binlog state is incompatible with "
                          "a former one %s.", errbuf);
  }

  /*
    For each domain_id from ids
      when there is no such domain in the binlog state
        warn && continue
      For each domain.server's last gtid
        when the last gtid cannot be located in glev.list
          error out; the binlog state can't change
        otherwise continue
  */
  for (ulong i= 0; i < ids->elements; i++)
  {
    rpl_binlog_state::element *elem= NULL;
    uint32 *ptr_domain_id;
    bool not_match;

    ptr_domain_id= (uint32*) dynamic_array_ptr(ids, i);
    elem= (rpl_binlog_state::element *)
      my_hash_search(&hash, (const uchar *) ptr_domain_id, 0);
    if (!elem)
    {
      push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
                          ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
                          "The gtid domain being deleted ('%lu') is not in "
                          "the current binlog state", (unsigned long) *ptr_domain_id);
      continue;
    }

    for (not_match= true, k= 0; k < elem->hash.records; k++)
    {
      rpl_gtid *d_gtid= (rpl_gtid *)my_hash_element(&elem->hash, k);
      for (ulong l= 0; l < glev->count && not_match; l++)
        not_match= !(*d_gtid == glev->list[l]);
    }

    if (not_match)
    {
      sprintf(errbuf, "binlog files may contain gtids from the domain ('%u') "
              "being deleted. Make sure to first purge those files",
              *ptr_domain_id);
      errmsg= errbuf;
      goto end;
    }
    // compose a sequence of unique pointers to domain objects
    for (k= 0; k < domain_unique.elements; k++)
    {
      if ((rpl_binlog_state::element*) dynamic_array_ptr(&domain_unique, k)
          == elem)
        break; // this domain_id's elem is already present
    }
    if (k == domain_unique.elements) // proven not to be a duplicate
      insert_dynamic(&domain_unique, (uchar*) &elem);
  }

  // Domain removal from binlog state
  for (k= 0; k < domain_unique.elements; k++)
  {
    rpl_binlog_state::element *elem= *(rpl_binlog_state::element**)
      dynamic_array_ptr(&domain_unique, k);
    my_hash_free(&elem->hash);
    my_hash_delete(&hash, (uchar*) elem);
  }

  DBUG_ASSERT(strlen(errbuf) == 0);

  if (domain_unique.elements == 0)
    errmsg= "";

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  delete_dynamic(&domain_unique);

  DBUG_RETURN(errmsg);
}

slave_connection_state::slave_connection_state()
{
  my_hash_init(&hash, &my_charset_bin, 32,
               offsetof(entry, gtid) + offsetof(rpl_gtid, domain_id),
               sizeof(uint32), NULL, my_free, HASH_UNIQUE);
  my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
}


slave_connection_state::~slave_connection_state()
{
  my_hash_free(&hash);
  delete_dynamic(&gtid_sort_array);
}


/*
  Create a hash from the slave GTID state that is sent to master when slave
  connects to start replication.

  The state is sent as <GTID>,<GTID>,...,<GTID>, for example:

     0-2-112,1-4-1022

  The state gives for each domain_id the GTID to start replication from for
  the corresponding replication stream. So domain_id must be unique.

  Returns 0 if ok, non-zero if error due to malformed input.

  Note that the input string is built by the slave server, so it will not be
  incorrect unless there is a bug, corruption, or a malicious server. So we
  just need a basic sanity check, not a fancy user-friendly error message.
*/

int
slave_connection_state::load(const char *slave_request, size_t len)
{
  const char *p, *end;
  uchar *rec;
  rpl_gtid *gtid;
  const entry *e;

  reset();
  p= slave_request;
  end= slave_request + len;
  if (p == end)
    return 0;
  for (;;)
  {
    if (!(rec= (uchar *)my_malloc(sizeof(entry), MYF(MY_WME))))
      return 1;
    gtid= &((entry *)rec)->gtid;
    if (gtid_parser_helper(&p, end, gtid))
    {
      my_free(rec);
      my_error(ER_INCORRECT_GTID_STATE, MYF(0));
      return 1;
    }
    if ((e= (const entry *)
         my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0)))
    {
      my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
               gtid->server_id, (ulonglong)gtid->seq_no, e->gtid.domain_id,
               e->gtid.server_id, (ulonglong)e->gtid.seq_no, gtid->domain_id);
      my_free(rec);
      return 1;
    }
    ((entry *)rec)->flags= 0;
    if (my_hash_insert(&hash, rec))
    {
      my_free(rec);
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      return 1;
    }
    if (p == end)
      break;                                         /* Finished. */
    if (*p != ',')
    {
      my_error(ER_INCORRECT_GTID_STATE, MYF(0));
      return 1;
    }
    ++p;
  }

  return 0;
}


int
slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
{
  uint32 i;

  reset();
  for (i= 0; i < count; ++i)
    if (update(&gtid_list[i]))
      return 1;
  return 0;
}


static int
slave_connection_state_load_cb(rpl_gtid *gtid, void *data)
{
  slave_connection_state *state= (slave_connection_state *)data;
  return state->update(gtid);
}


/*
  Same as rpl_slave_state::tostring(), but populates a slave_connection_state
  instead.
*/
int
slave_connection_state::load(rpl_slave_state *state,
                             rpl_gtid *extra_gtids, uint32 num_extra)
{
  reset();
  return state->iterate(slave_connection_state_load_cb, this,
                        extra_gtids, num_extra, false);
}


slave_connection_state::entry *
slave_connection_state::find_entry(uint32 domain_id)
{
  return (entry *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
}


rpl_gtid *
slave_connection_state::find(uint32 domain_id)
{
  entry *e= find_entry(domain_id);
  if (!e)
    return NULL;
  return &e->gtid;
}


int
slave_connection_state::update(const rpl_gtid *in_gtid)
{
  entry *e;
  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
  if (rec)
  {
    e= (entry *)rec;
    e->gtid= *in_gtid;
    return 0;
  }

  if (!(e= (entry *)my_malloc(sizeof(*e), MYF(MY_WME))))
    return 1;
  e->gtid= *in_gtid;
  e->flags= 0;
  if (my_hash_insert(&hash, (uchar *)e))
  {
    my_free(e);
    return 1;
  }

  return 0;
}


void
slave_connection_state::remove(const rpl_gtid *in_gtid)
{
  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
#ifdef DBUG_ASSERT_EXISTS
  bool err;
  rpl_gtid *slave_gtid= &((entry *)rec)->gtid;
  DBUG_ASSERT(rec /* We should never try to remove a domain_id that is not present. */);
  DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
  DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
  err=
#endif
    my_hash_delete(&hash, rec);
  DBUG_ASSERT(!err);
}


void
slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
{
  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
  if (rec)
    my_hash_delete(&hash, rec);
}


int
slave_connection_state::to_string(String *out_str)
{
  out_str->length(0);
  return append_to_string(out_str);
}


int
slave_connection_state::append_to_string(String *out_str)
{
  uint32 i;
  bool first;

  first= true;
  for (i= 0; i < hash.records; ++i)
  {
    const entry *e= (const entry *)my_hash_element(&hash, i);
    if (rpl_slave_state_tostring_helper(out_str, &e->gtid, &first))
      return 1;
  }
  return 0;
}


int
slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
{
  uint32 i, pos;

  pos= 0;
  for (i= 0; i < hash.records; ++i)
  {
    entry *e;
    if (pos >= list_size)
      return 1;
    e= (entry *)my_hash_element(&hash, i);
    memcpy(&gtid_list[pos++], &e->gtid, sizeof(e->gtid));
  }

  return 0;
}


/*
  Check if the GTID position has been reached, for mysql_binlog_send().

  The position has not been reached if we have anything in the state, unless
  it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not
  belong to this master at all), or the START_OWN_SLAVE_POS (which means that
  we start on an old position from when the server was a slave with
  --log-slave-updates=0).
*/
bool
slave_connection_state::is_pos_reached()
{
  uint32 i;

  for (i= 0; i < hash.records; ++i)
  {
    entry *e= (entry *)my_hash_element(&hash, i);
    if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN)))
      return false;
  }

  return true;
}


/*
  Execute a MASTER_GTID_WAIT().
  The position to wait for is in gtid_str in string form.
  The timeout in microseconds is in timeout_us; a negative value means no
  timeout.

  Returns:
   1 for error.
   0 for wait completed.
  -1 for wait timed out.
*/
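/*
  Illustrative SQL-level usage (added commentary): wait at most 0.5 seconds
  for this slave to reach GTID 0-1-100:

      SELECT MASTER_GTID_WAIT('0-1-100', 0.5);

  which returns 0 once the position is reached and -1 on timeout. The SQL
  function takes its timeout in (possibly fractional) seconds; it arrives
  here converted to microseconds in timeout_us.
*/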
int
gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
{
  int err;
  rpl_gtid *wait_pos;
  uint32 count, i;
  struct timespec wait_until, *wait_until_ptr;
  ulonglong before;

  /* Waiting for the empty position returns immediately. */
  if (gtid_str->length() == 0)
  {
    status_var_increment(thd->status_var.master_gtid_wait_count);
    return 0;
  }

  if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
                                            &count)))
  {
    my_error(ER_INCORRECT_GTID_STATE, MYF(0));
    return 1;
  }
  status_var_increment(thd->status_var.master_gtid_wait_count);
  before= microsecond_interval_timer();

  if (timeout_us >= 0)
  {
    set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
    wait_until_ptr= &wait_until;
  }
  else
    wait_until_ptr= NULL;
  err= 0;
  for (i= 0; i < count; ++i)
  {
    if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
      break;
  }
  switch (err)
  {
    case -1:
      status_var_increment(thd->status_var.master_gtid_wait_timeouts);
      /* fall through */
    case 0:
      status_var_add(thd->status_var.master_gtid_wait_time,
                     static_cast<ulong>
                     (microsecond_interval_timer() - before));
  }
  my_free(wait_pos);
  return err;
}


void
gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
{
  queue_element *qe;

  mysql_mutex_assert_owner(&LOCK_gtid_waiting);
  if (queue_empty(&he->queue))
    return;
  qe= (queue_element *)queue_top(&he->queue);
  qe->do_small_wait= true;
  mysql_cond_signal(&qe->thd->COND_wakeup_ready);
}

void
gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
                                gtid_waiting::hash_element *he)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  for (;;)
  {
    queue_element *qe;

    if (queue_empty(&he->queue))
      break;
    qe= (queue_element *)queue_top(&he->queue);
    if (qe->wait_seq_no > wakeup_seq_no)
      break;
    DBUG_ASSERT(!qe->done);
    queue_remove_top(&he->queue);
    qe->done= true;
    mysql_cond_signal(&qe->thd->COND_wakeup_ready);
  }
}


/*
  Execute a MASTER_GTID_WAIT() for one specific domain.

  The implementation is optimised primarily for (1) minimal performance
  impact on the slave replication threads, and secondarily for (2) quick
  performance of MASTER_GTID_WAIT() on a single GTID, which can be useful
  for consistent reads by clients in an async replication read-scaleout
  scenario.

  To achieve (1), we have a "small" wait and a "large" wait. The small wait
  contends with the replication threads for the lock on the gtid_slave_pos,
  so only minimal processing is done under that lock, and only a single
  waiter at a time does the small wait.

  If there is already a small waiter, a new thread will either replace the
  small waiter (if it needs to wait for an earlier sequence number), or
  instead do a "large" wait.

  Once awoken on the small wait, the waiting thread releases the lock shared
  with the SQL threads quickly, and then processes all waiters currently doing
  the large wait using a different lock that does not impact replication.

  This way, the SQL threads only need to do a single check + possibly a
  pthread_cond_signal() when updating the gtid_slave_state, and the time that
  non-SQL threads contend for the lock on gtid_slave_state is minimized.

  There is always at least one thread that has the responsibility to ensure
  that there is a small waiter; this thread has queue_element::do_small_wait
  set to true. This thread will do the small wait until it is done, at which
  point it will make sure to pass on the responsibility to another thread.
  Normally only one thread has do_small_wait==true, but it can occasionally
  happen that there is more than one, when threads race one another for the
  lock on the small wait (this results in slightly increased activity on the
  small lock but is otherwise harmless).

  Returns:
     0  Wait completed normally
    -1  Wait completed due to timeout
     1  An error (my_error() will have been called to set the error in the da)
*/
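/*
  A sketch of the interplay described above (added for illustration):
  suppose threads T1, T2 and T3 call MASTER_GTID_WAIT() in the same domain
  for seq_no 100, 200 and 50, in that order. T1 takes the small wait. T2
  waits for a larger seq_no than the current queue top, so it does the
  large wait. T3 waits for a smaller seq_no than the current small waiter,
  so it steals the small wait, and T1, once woken, switches to the large
  wait. When the slave state reaches seq_no >= 50, the SQL thread signals
  T3, which completes, wakes any satisfied large waiters via
  process_wait_hash(), and passes the small-wait responsibility back to the
  queue top (T1) via promote_new_waiter().
*/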
int
gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
                            struct timespec *wait_until)
{
  bool timed_out= false;
#ifdef HAVE_REPLICATION
  queue_element elem;
  uint32 domain_id= wait_gtid->domain_id;
  uint64 seq_no= wait_gtid->seq_no;
  hash_element *he;
  rpl_slave_state::element *slave_state_elem= NULL;
  PSI_stage_info old_stage;
  bool did_enter_cond= false;

  elem.wait_seq_no= seq_no;
  elem.thd= thd;
  elem.done= false;

  mysql_mutex_lock(&LOCK_gtid_waiting);
  if (!(he= get_entry(wait_gtid->domain_id)))
  {
    mysql_mutex_unlock(&LOCK_gtid_waiting);
    return 1;
  }
  /*
    If there is already another waiter with seq_no no larger than our own,
    we are sure that there is already a small waiter that will wake us up
    (or later pass the small wait responsibility to us). So in this case, we
    do not need to touch the small wait lock at all.
  */
  elem.do_small_wait=
    (queue_empty(&he->queue) ||
     ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);

  if (register_in_wait_queue(thd, wait_gtid, he, &elem))
  {
    mysql_mutex_unlock(&LOCK_gtid_waiting);
    return 1;
  }
  /*
    Loop, doing either the small or large wait as appropriate, until either
    the position waited for is reached, or we get a kill or timeout.
  */
  for (;;)
  {
    mysql_mutex_assert_owner(&LOCK_gtid_waiting);

    if (elem.do_small_wait)
    {
      uint64 wakeup_seq_no;
      queue_element *cur_waiter;

      mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      /*
        The elements in the gtid_slave_state_hash are never re-allocated once
        they enter the hash, so we do not need to re-do the lookup after
        releasing and re-acquiring the lock.
      */
      if (!slave_state_elem &&
          !(slave_state_elem= rpl_global_gtid_slave_state->get_element(domain_id)))
      {
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
        remove_from_wait_queue(he, &elem);
        promote_new_waiter(he);
        if (did_enter_cond)
          thd->EXIT_COND(&old_stage);
        else
          mysql_mutex_unlock(&LOCK_gtid_waiting);
        my_error(ER_OUT_OF_RESOURCES, MYF(0));
        return 1;
      }

      if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
      {
        /*
          We do not have to wait. (We will be removed from the wait queue when
          we call process_wait_hash() below.)
        */
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      }
      else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
               slave_state_elem->min_wait_seq_no <= seq_no)
      {
        /*
          There is already a suitable small waiter, go do the large wait.
          (Normally we would not have needed to check the small wait in this
          case, but it can happen if we race with another thread for the small
          lock).
        */
        elem.do_small_wait= false;
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      }
      else
      {
        /*
          We have to do the small wait ourselves (stealing it from any thread
          that might already be waiting for a later seq_no).
        */
        slave_state_elem->gtid_waiter= &elem;
        slave_state_elem->min_wait_seq_no= seq_no;
        if (cur_waiter)
        {
          /* We stole the wait, so wake up the old waiting thread. */
          mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
        }

        /* Release the large lock, and do the small wait. */
        if (did_enter_cond)
        {
          thd->EXIT_COND(&old_stage);
          did_enter_cond= false;
        }
        else
          mysql_mutex_unlock(&LOCK_gtid_waiting);
        thd->ENTER_COND(&slave_state_elem->COND_wait_gtid,
                        &rpl_global_gtid_slave_state->LOCK_slave_state,
                        &stage_master_gtid_wait_primary, &old_stage);
        do
        {
          if (unlikely(thd->check_killed(1)))
            break;
          else if (wait_until)
          {
            int err=
              mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
                                   &rpl_global_gtid_slave_state->LOCK_slave_state,
                                   wait_until);
            if (err == ETIMEDOUT || err == ETIME)
            {
              timed_out= true;
              break;
            }
          }
          else
            mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
                            &rpl_global_gtid_slave_state->LOCK_slave_state);
        } while (slave_state_elem->gtid_waiter == &elem);
        wakeup_seq_no= slave_state_elem->highest_seq_no;
        /*
          If we aborted due to timeout or kill, remove us as waiter.

          If we were replaced by another waiter with a smaller seq_no, then we
          no longer have responsibility for the small wait.
        */
        if ((cur_waiter= slave_state_elem->gtid_waiter))
        {
          if (cur_waiter == &elem)
            slave_state_elem->gtid_waiter= NULL;
          else if (slave_state_elem->min_wait_seq_no <= seq_no)
            elem.do_small_wait= false;
        }
        thd->EXIT_COND(&old_stage);

        mysql_mutex_lock(&LOCK_gtid_waiting);
      }

      /*
        Note that hash_entry pointers do not change once allocated, so we do
        not need to look up `he' again after re-acquiring LOCK_gtid_waiting.
      */
      process_wait_hash(wakeup_seq_no, he);
    }
    else
    {
      /* Do the large wait. */
      if (!did_enter_cond)
      {
        thd->ENTER_COND(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
                        &stage_master_gtid_wait, &old_stage);
        did_enter_cond= true;
      }
      while (!elem.done && likely(!thd->check_killed(1)))
      {
        thd_wait_begin(thd, THD_WAIT_BINLOG);
        if (wait_until)
        {
          int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
                                        &LOCK_gtid_waiting, wait_until);
          if (err == ETIMEDOUT || err == ETIME)
            timed_out= true;
        }
        else
          mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
        thd_wait_end(thd);
        if (elem.do_small_wait || timed_out)
          break;
      }
    }

    if ((thd->killed || timed_out) && !elem.done)
    {
      /* Aborted, so remove ourselves from the hash. */
      remove_from_wait_queue(he, &elem);
      elem.done= true;
    }
    if (elem.done)
    {
      /*
        If our wait is done, but we have (or were passed) responsibility for
        the small wait, then we need to pass on that task to someone else.
      */
      if (elem.do_small_wait)
        promote_new_waiter(he);
      break;
    }
  }

  if (did_enter_cond)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&LOCK_gtid_waiting);
  if (thd->killed)
    thd->send_kill_message();
#endif  /* HAVE_REPLICATION */
  return timed_out ? -1 : 0;
}


static void
free_hash_element(void *p)
{
  gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
  delete_queue(&e->queue);
  my_free(e);
}


void
gtid_waiting::init()
{
  my_hash_init(&hash, &my_charset_bin, 32,
               offsetof(hash_element, domain_id), sizeof(uint32), NULL,
               free_hash_element, HASH_UNIQUE);
  mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
}


void
gtid_waiting::destroy()
{
  mysql_mutex_destroy(&LOCK_gtid_waiting);
  my_hash_free(&hash);
}


static int
cmp_queue_elem(void *, uchar *a, uchar *b)
{
  uint64 seq_no_a= *(uint64 *)a;
  uint64 seq_no_b= *(uint64 *)b;
  if (seq_no_a < seq_no_b)
    return -1;
  else if (seq_no_a == seq_no_b)
    return 0;
  else
    return 1;
}


gtid_waiting::hash_element *
gtid_waiting::get_entry(uint32 domain_id)
{
  hash_element *e;

  if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
    return e;

  if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
    return NULL;

  if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
                 cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    my_free(e);
    return NULL;
  }
  e->domain_id= domain_id;
  if (my_hash_insert(&hash, (uchar *)e))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    delete_queue(&e->queue);
    my_free(e);
    return NULL;
  }
  return e;
}


int
gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
                                     gtid_waiting::hash_element *he,
                                     gtid_waiting::queue_element *elem)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  if (queue_insert_safe(&he->queue, (uchar *)elem))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    return 1;
  }

  return 0;
}


void
gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
                                     gtid_waiting::queue_element *elem)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  queue_remove(&he->queue, elem->queue_idx);
}