/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
   Copyright (c) 2020, MariaDB Corporation.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */


/* Definitions for MariaDB global transaction ID (GTID). */

#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
#include "sql_base.h"
#include "sql_parse.h"
#include "key.h"
#include "rpl_gtid.h"
#include "rpl_rli.h"
#include "slave.h"
#include "log_event.h"

const LEX_CSTRING rpl_gtid_slave_state_table_name=
  { STRING_WITH_LEN("gtid_slave_pos") };


void
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
                                   rpl_group_info *rgi)
{
  int err;
  /*
    Add the gtid to the HASH in the replication slave state.

    We must do this only _after_ commit, so that for parallel replication,
    there will not be an attempt to delete the corresponding table row before
    it is even committed.
  */
  mysql_mutex_lock(&LOCK_slave_state);
  err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
  mysql_mutex_unlock(&LOCK_slave_state);
  if (err)
  {
    sql_print_warning("Slave: Out of memory during slave state maintenance. "
                      "Some no longer necessary rows in table "
                      "mysql.%s may be left undeleted.",
                      rpl_gtid_slave_state_table_name.str);
    /*
      Such failure is not fatal. We will fail to delete the row for this
      GTID, but it will do no harm and will be removed automatically on next
      server restart.
    */
  }
}


int
rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
  DBUG_ENTER("rpl_slave_state::record_and_update_gtid");

  /*
    Update the GTID position, if we have it and did not already update
    it in a GTID transaction.
  */
  if (rgi->gtid_pending)
  {
    uint64 sub_id= rgi->gtid_sub_id;
    void *hton= NULL;

    rgi->gtid_pending= false;
    if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
    {
      if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false, &hton))
        DBUG_RETURN(1);
      update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
    }
    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  }
  DBUG_RETURN(0);
}


/*
  Check GTID event execution when --gtid-ignore-duplicates.

  The idea with --gtid-ignore-duplicates is that we allow multiple master
  connections (in multi-source replication) to all receive the same GTIDs and
  event groups. Only one instance of each is applied; we use the sequence
  number in the GTID to decide whether a GTID has already been applied.

  So if the seq_no of a GTID (or a higher sequence number) has already been
  applied, then the event should be skipped. If not then the event should be
  applied.

  To avoid two master connections trying to apply the same event
  simultaneously, only one is allowed to work in any given domain at any point
  in time. The associated Relay_log_info object is called the owner of the
  domain (and there can be multiple parallel worker threads working in that
  domain for that Relay_log_info). Any other Relay_log_info/master connection
  must wait for the domain to become free, or for their GTID to have been
  applied, before being allowed to proceed.

  Returns:
    0  This GTID is already applied, it should be skipped.
    1  The GTID is not yet applied; this rli is now the owner, and must apply
       the event and release the domain afterwards.
   -1  Error (out of memory to allocate a new element for the domain).
*/
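
/*
  Illustrative call pattern (a sketch only; the real call sites are in the
  replication code, and the names used here for the skip/apply steps are
  hypothetical):

    int res= rpl_global_gtid_slave_state->check_duplicate_gtid(&gtid, rgi);
    if (res < 0)
      handle_error();        // out of memory
    else if (res == 0)
      skip_event_group();    // already applied through another connection
    else
      apply_event_group();   // we now own the domain; ownership is released
                             // again in update() or release_domain_owner()
*/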
int
rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
{
  uint32 domain_id= gtid->domain_id;
  uint64 seq_no= gtid->seq_no;
  rpl_slave_state::element *elem;
  int res;
  bool did_enter_cond= false;
  PSI_stage_info old_stage;
  THD *UNINIT_VAR(thd);
  Relay_log_info *rli= rgi->rli;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(elem= get_element(domain_id)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    res= -1;
    goto err;
  }
  /*
    Note that the elem pointer does not change once inserted in the hash. So
    we can re-use the pointer without looking it up again in the hash after
    each lock release and re-take.
  */

  for (;;)
  {
    if (elem->highest_seq_no >= seq_no)
    {
      /* This sequence number is already applied, ignore it. */
      res= 0;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
      break;
    }
    if (!elem->owner_rli)
    {
      /* The domain became free, grab it and apply the event. */
      elem->owner_rli= rli;
      elem->owner_count= 1;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    if (elem->owner_rli == rli)
    {
      /* Already own this domain, increment reference count and apply event. */
      ++elem->owner_count;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    thd= rgi->thd;
    if (unlikely(thd->check_killed()))
    {
      res= -1;
      break;
    }
    /*
      Someone else is currently processing this GTID (or an earlier one).
      Wait for them to complete (or fail), and then check again.
    */
    if (!did_enter_cond)
    {
      thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
                      &stage_gtid_wait_other_connection, &old_stage);
      did_enter_cond= true;
    }
    mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
                    &LOCK_slave_state);
  }

err:
  if (did_enter_cond)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&LOCK_slave_state);
  return res;
}


void
rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
{
  element *elem= NULL;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(elem= get_element(rgi->current_gtid.domain_id)))
  {
    /*
      We cannot really deal with error here, as we are already called in an
      error handling case (transaction failure and rollback).

      However, get_element() only fails if the element did not exist already
      and could not be allocated due to out-of-memory - and if it did not
      exist, then we would not get here in the first place.
    */
    mysql_mutex_unlock(&LOCK_slave_state);
    return;
  }

  if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
  {
    uint32 count= elem->owner_count;
    DBUG_ASSERT(count > 0);
    DBUG_ASSERT(elem->owner_rli == rgi->rli);
    --count;
    elem->owner_count= count;
    if (count == 0)
    {
      elem->owner_rli= NULL;
      mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
    }
  }
  rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  mysql_mutex_unlock(&LOCK_slave_state);
}


static void
rpl_slave_state_free_element(void *arg)
{
  struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
  mysql_cond_destroy(&elem->COND_wait_gtid);
  mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates);
  my_free(elem);
}


rpl_slave_state::rpl_slave_state()
  : last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
  mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
                   MY_MUTEX_INIT_SLOW);
  my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
               sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
  my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
}


rpl_slave_state::~rpl_slave_state()
{
  free_gtid_pos_tables((struct gtid_pos_table *)gtid_pos_tables);
  truncate_hash();
  my_hash_free(&hash);
  delete_dynamic(&gtid_sort_array);
  mysql_mutex_destroy(&LOCK_slave_state);
}


void
rpl_slave_state::truncate_hash()
{
  uint32 i;

  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    list_element *l= e->list;
    list_element *next;
    while (l)
    {
      next= l->next;
      my_free(l);
      l= next;
    }
    /* The element itself is freed by the hash element free function. */
  }
  my_hash_reset(&hash);
}


int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
                        uint64 seq_no, void *hton, rpl_group_info *rgi)
{
  element *elem= NULL;
  list_element *list_elem= NULL;

  DBUG_ASSERT(hton || !loaded);
  if (!(elem= get_element(domain_id)))
    return 1;

  if (seq_no > elem->highest_seq_no)
    elem->highest_seq_no= seq_no;
  if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
  {
    /*
      Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
      Signal (and remove) them. The waiter will handle all the processing
      of all pending MASTER_GTID_WAIT(), so we do not slow down the
      replication SQL thread.
    */
    mysql_mutex_assert_owner(&LOCK_slave_state);
    elem->gtid_waiter= NULL;
    mysql_cond_broadcast(&elem->COND_wait_gtid);
  }

  if (rgi)
  {
    if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
    {
#ifdef DBUG_ASSERT_EXISTS
      Relay_log_info *rli= rgi->rli;
#endif
      uint32 count= elem->owner_count;
      DBUG_ASSERT(count > 0);
      DBUG_ASSERT(elem->owner_rli == rli);
      --count;
      elem->owner_count= count;
      if (count == 0)
      {
        elem->owner_rli= NULL;
        mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
      }
    }
    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;

#ifdef HAVE_REPLICATION
    rgi->pending_gtid_deletes_clear();
#endif
  }

  if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
    return 1;
  list_elem->server_id= server_id;
  list_elem->sub_id= sub_id;
  list_elem->seq_no= seq_no;
  list_elem->hton= hton;

  elem->add(list_elem);
  if (last_sub_id < sub_id)
    last_sub_id= sub_id;

  return 0;
}


struct rpl_slave_state::element *
rpl_slave_state::get_element(uint32 domain_id)
{
  struct element *elem;

  elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
  if (elem)
    return elem;

  if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
    return NULL;
  elem->list= NULL;
  elem->domain_id= domain_id;
  elem->highest_seq_no= 0;
  elem->gtid_waiter= NULL;
  elem->owner_rli= NULL;
  elem->owner_count= 0;
  mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
  mysql_cond_init(key_COND_gtid_ignore_duplicates,
                  &elem->COND_gtid_ignore_duplicates, 0);
  if (my_hash_insert(&hash, (uchar *)elem))
  {
    my_free(elem);
    return NULL;
  }
  return elem;
}


int
rpl_slave_state::put_back_list(uint32 domain_id, list_element *list)
{
  element *e;
  int err= 0;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
  {
    err= 1;
    goto end;
  }
  while (list)
  {
    list_element *next= list->next;
    e->add(list);
    list= next;
  }

end:
  mysql_mutex_unlock(&LOCK_slave_state);
  return err;
}


int
rpl_slave_state::truncate_state_table(THD *thd)
{
  TABLE_LIST tlist;
  int err= 0;

  tmp_disable_binlog(thd);
  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name,
                       NULL, TL_WRITE);
  if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
  {
    tdc_remove_table(thd, TDC_RT_REMOVE_UNUSED, "mysql",
                     rpl_gtid_slave_state_table_name.str, false);
    err= tlist.table->file->ha_truncate();

    if (err)
    {
      ha_rollback_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_rollback_trans(thd, TRUE);
    }
    else
    {
      ha_commit_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_commit_trans(thd, TRUE);
    }
    thd->release_transactional_locks();
  }

  reenable_binlog(thd);
  return err;
}


static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
  { { STRING_WITH_LEN("domain_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("sub_id") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("server_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("seq_no") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
};

static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};

static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= {
  array_elements(mysql_rpl_slave_state_coltypes),
  mysql_rpl_slave_state_coltypes,
  array_elements(mysql_rpl_slave_state_pk_parts),
  mysql_rpl_slave_state_pk_parts
};
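
/*
  For reference, the definition above corresponds roughly to this SQL (a
  sketch of the standard table; the real definition in the source tree also
  carries engine and comment attributes):

    CREATE TABLE mysql.gtid_slave_pos (
      domain_id INT UNSIGNED NOT NULL,
      sub_id    BIGINT UNSIGNED NOT NULL,
      server_id INT UNSIGNED NOT NULL,
      seq_no    BIGINT UNSIGNED NOT NULL,
      PRIMARY KEY (domain_id, sub_id)
    );
*/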

static Table_check_intact_log_error gtid_table_intact;

/*
  Check that the mysql.gtid_slave_pos table has the correct definition.
*/
int
gtid_check_rpl_slave_state_table(TABLE *table)
{
  int err;

  if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef)))
    my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
             rpl_gtid_slave_state_table_name.str);
  return err;
}


/*
  Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
  that is already in use by the current transaction, if any.
*/
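
/*
  For example, if the transaction has so far written only to InnoDB, and both
  the default mysql.gtid_slave_pos table and an InnoDB-backed
  mysql.gtid_slave_pos_innodb table exist (the per-engine naming here follows
  the --gtid-pos-auto-engines convention), the InnoDB-backed table is chosen,
  so that the GTID update commits inside the same engine transaction as the
  replicated changes.
*/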
void
rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
{
  struct gtid_pos_table *list, *table_entry, *default_entry;

  /*
    See comments on rpl_slave_state::gtid_pos_tables for rules around proper
    access to the list.
  */
  list= (struct gtid_pos_table *)
    my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE);

  Ha_trx_info *ha_info;
  uint count = 0;
  for (ha_info= thd->transaction.all.ha_list; ha_info; ha_info= ha_info->next())
  {
    void *trx_hton= ha_info->ht();
    table_entry= list;

    if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton)
      continue;
    while (table_entry)
    {
      if (table_entry->table_hton == trx_hton)
      {
        if (likely(table_entry->state == GTID_POS_AVAILABLE))
        {
          *out_tablename= table_entry->table_name;
          /*
            Check if this is a cross-engine transaction, so we can correctly
            maintain the rpl_transactions_multi_engine status variable.
          */
          if (count >= 1)
            statistic_increment(rpl_transactions_multi_engine, LOCK_status);
          else
          {
            for (;;)
            {
              ha_info= ha_info->next();
              if (!ha_info)
                break;
              if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
              {
                statistic_increment(rpl_transactions_multi_engine, LOCK_status);
                break;
              }
            }
          }
          return;
        }
        /*
          This engine is marked to automatically create the table.
          We cannot easily do this here (possibly in the middle of a
          transaction). But we can request the slave background thread
          to create it, and in a short while it should become available
          for following transactions.
        */
#ifdef HAVE_REPLICATION
        slave_background_gtid_pos_create_request(table_entry);
#endif
        break;
      }
      table_entry= table_entry->next;
    }
    ++count;
  }
  /*
    If we cannot find any table whose engine matches an engine that is
    already active in the transaction, or if no transactional engine is
    active at all, we return the default gtid_slave_pos table.
  */
  default_entry= (struct gtid_pos_table *)
    my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE);
  *out_tablename= default_entry->table_name;
  /* Record in status that we failed to find a suitable gtid_pos table. */
  if (count > 0)
  {
    statistic_increment(transactions_gtid_foreign_engine, LOCK_status);
    if (count > 1)
      statistic_increment(rpl_transactions_multi_engine, LOCK_status);
  }
}


/*
  Write a gtid to the replication slave state table.

    gtid    The global transaction id for this event group.
    sub_id  Value allocated for this event group when it was read (sub_id
            must be consistent with commit order in the master binlog).
    rgi     rpl_group_info context, if we are recording the gtid
            transactionally as part of replicating a transactional event.
            NULL if called from outside of a replicated transaction.

  Note that the caller must later ensure that the new gtid and sub_id are
  inserted into the appropriate HASH element with rpl_slave_state.add(), so
  that they can be deleted later. But this must only be done after COMMIT,
  if in a transaction.
*/
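
/*
  In outline (a summary of the code below, not additional behaviour): open
  the gtid_pos table chosen by select_gtid_pos_table(), insert the new
  (domain_id, sub_id, server_id, seq_no) row, and then opportunistically
  delete older rows recorded in the same engine, keeping the row with the
  highest sub_id so the GTID position can still be recovered after a crash.
*/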
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                             rpl_group_info *rgi, bool in_statement,
                             void **out_hton)
{
  TABLE_LIST tlist;
  int err= 0, not_sql_thread;
  bool table_opened= false;
  TABLE *table;
  list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr;
  uint64 best_sub_id;
  element *elem;
  ulonglong thd_saved_option= thd->variables.option_bits;
  Query_tables_list lex_backup;
  wait_for_commit* suspended_wfc;
  void *hton= NULL;
  LEX_CSTRING gtid_pos_table_name;
  DBUG_ENTER("record_gtid");

  *out_hton= NULL;
  if (unlikely(!loaded))
  {
    /*
      Probably the mysql.gtid_slave_pos table is missing (eg. upgrade) or
      corrupt.

      We already complained loudly about this, but we can try to continue
      until the DBA fixes it.
    */
    DBUG_RETURN(0);
  }

  if (!in_statement)
    thd->reset_for_next_command();

  /*
    Only the SQL thread can call select_gtid_pos_table() without a mutex.
    Other threads need to take the mutex, and since the result may change
    during execution, they also have to make a copy of the table name.
  */

  if ((not_sql_thread= (thd->system_thread != SYSTEM_THREAD_SLAVE_SQL)))
    mysql_mutex_lock(&LOCK_slave_state);
  select_gtid_pos_table(thd, &gtid_pos_table_name);
  if (not_sql_thread)
  {
    LEX_CSTRING *tmp= thd->make_clex_string(gtid_pos_table_name.str,
                                            gtid_pos_table_name.length);
    mysql_mutex_unlock(&LOCK_slave_state);
    if (!tmp)
      DBUG_RETURN(1);
    gtid_pos_table_name= *tmp;
  }

  DBUG_EXECUTE_IF("gtid_inject_record_gtid",
                  {
                    my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
                    DBUG_RETURN(1);
                  } );

  /*
    If we are applying a non-transactional event group, we will be committing
    here a transaction, but that does not imply that the event group has
    completed or has been binlogged. So we should not trigger
    wakeup_subsequent_commits() here.

    Note: An alternative here could be to put a call to mark_start_commit() in
    stmt_done() before the call to record_and_update_gtid(). This would
    prevent later calling mark_start_commit() after we have run
    wakeup_subsequent_commits() from committing the GTID update transaction
    (which must be avoided to avoid accessing freed group_commit_orderer
    object). It would also allow following event groups to start slightly
    earlier. And in the cases where record_gtid() is called without an active
    transaction, the current statement should have been binlogged already, so
    binlog order is preserved.

    But this is rather subtle, and potentially fragile. And it does not really
    seem worth it; non-transactional loads are unlikely to benefit much from
    parallel replication in any case. So for now, we go with the simple
    suspend/resume of wakeup_subsequent_commits() here in record_gtid().
  */
  suspended_wfc= thd->suspend_subsequent_commits();
  thd->lex->reset_n_backup_query_tables_list(&lex_backup);
  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &gtid_pos_table_name, NULL, TL_WRITE);
  if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
    goto end;
  table_opened= true;
  table= tlist.table;
  hton= table->s->db_type();

  if ((err= gtid_check_rpl_slave_state_table(table)))
    goto end;

#ifdef WITH_WSREP
  /*
    Updates in slave state table should not be appended to galera transaction
    writeset.
  */
  thd->wsrep_ignore_table= true;
#endif

  if (!rgi)
  {
    DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
    thd->variables.option_bits&=
      ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
                   OPTION_GTID_BEGIN);
  }
  else
    thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG;

  bitmap_set_all(table->write_set);
  table->rpl_write_set= table->write_set;

  table->field[0]->store((ulonglong)gtid->domain_id, true);
  table->field[1]->store(sub_id, true);
  table->field[2]->store((ulonglong)gtid->server_id, true);
  table->field[3]->store(gtid->seq_no, true);
  DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE(););
  if ((err= table->file->ha_write_row(table->record[0])))
  {
    table->file->print_error(err, MYF(0));
    goto end;
  }
  *out_hton= hton;

  if (opt_bin_log &&
      (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
                                                        gtid->seq_no)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    goto end;
  }

  mysql_mutex_lock(&LOCK_slave_state);
  if ((elem= get_element(gtid->domain_id)) == NULL)
  {
    mysql_mutex_unlock(&LOCK_slave_state);
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    err= 1;
    goto end;
  }

  /* Now pull out all GTIDs that were recorded in this engine. */
  delete_list = NULL;
  next_ptr_ptr= &elem->list;
  cur= elem->list;
  best_sub_id= 0;
  best_ptr_ptr= NULL;
  while (cur)
  {
    list_element *next= cur->next;
    if (cur->hton == hton)
    {
      /* Belongs to same engine, so move it to the delete list. */
      cur->next= delete_list;
      delete_list= cur;
      if (cur->sub_id > best_sub_id)
      {
        best_sub_id= cur->sub_id;
        best_ptr_ptr= &delete_list;
      }
      else if (best_ptr_ptr == &delete_list)
        best_ptr_ptr= &cur->next;
    }
    else
    {
      /* Another engine, leave it in the list. */
      if (cur->sub_id > best_sub_id)
      {
        best_sub_id= cur->sub_id;
        /* Current best is not on the delete list. */
        best_ptr_ptr= NULL;
      }
      *next_ptr_ptr= cur;
      next_ptr_ptr= &cur->next;
    }
    cur= next;
  }
  *next_ptr_ptr= NULL;
  /*
    If the highest sub_id element is on the delete list, put it back on the
    original list, to preserve the highest sub_id element in the table for
    GTID position recovery.
  */
  if (best_ptr_ptr)
  {
    cur= *best_ptr_ptr;
    *best_ptr_ptr= cur->next;
    cur->next= elem->list;
    elem->list= cur;
  }
  mysql_mutex_unlock(&LOCK_slave_state);

  if (!delete_list)
    goto end;

  /* Now delete any already committed GTIDs. */
  bitmap_set_bit(table->read_set, table->field[0]->field_index);
  bitmap_set_bit(table->read_set, table->field[1]->field_index);

  if ((err= table->file->ha_index_init(0, 0)))
  {
    table->file->print_error(err, MYF(0));
    goto end;
  }
  cur = delete_list;
  while (cur)
  {
    uchar key_buffer[4+8];

    DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
                    { err= ENOENT;
                      table->file->print_error(err, MYF(0));
                      /* `break' does not work inside DBUG_EXECUTE_IF */
                      goto dbug_break; });

    next= cur->next;

    table->field[1]->store(cur->sub_id, true);
    /* domain_id is already set in table->record[0] from write_row() above. */
    key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
    if (table->file->ha_index_read_map(table->record[1], key_buffer,
                                       HA_WHOLE_KEY, HA_READ_KEY_EXACT))
      /* We cannot find the row, assume it is already deleted. */
      ;
    else if ((err= table->file->ha_delete_row(table->record[1])))
      table->file->print_error(err, MYF(0));
    /*
      In case of error, we still discard the element from the list. We do
      not want to endlessly error on the same element in case of table
      corruption or such.
    */
    cur= next;
    if (err)
      break;
  }
  IF_DBUG(dbug_break:, )
  table->file->ha_index_end();

end:

#ifdef WITH_WSREP
  thd->wsrep_ignore_table= false;
#endif

  if (table_opened)
  {
    if (err || (err= ha_commit_trans(thd, FALSE)))
    {
      /*
        If error, we need to put any remaining delete_list back into the HASH
        so we can do another delete attempt later.
      */
      if (delete_list)
      {
        put_back_list(gtid->domain_id, delete_list);
        delete_list = 0;
      }

      ha_rollback_trans(thd, FALSE);
    }
    close_thread_tables(thd);
    if (rgi)
    {
      thd->mdl_context.release_statement_locks();
      /*
        Save the list of old gtid entries we deleted. If this transaction
        fails later for some reason and is rolled back, the deletion of those
        entries will be rolled back as well, and we will need to put them back
        on the to-be-deleted list so we can re-do the deletion. Otherwise
        redundant rows in mysql.gtid_slave_pos may accumulate if transactions
        are rolled back and retried after record_gtid().
      */
#ifdef HAVE_REPLICATION
      rgi->pending_gtid_deletes_save(gtid->domain_id, delete_list);
#endif
    }
    else
    {
      thd->release_transactional_locks();
#ifdef HAVE_REPLICATION
      rpl_group_info::pending_gtid_deletes_free(delete_list);
#endif
    }
  }
  thd->lex->restore_backup_query_tables_list(&lex_backup);
  thd->variables.option_bits= thd_saved_option;
  thd->resume_subsequent_commits(suspended_wfc);
  DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
                  {
                    if (gtid->server_id == 100)
                      my_sleep(500000);
                  });
  DBUG_RETURN(err);
}


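/*
  Allocate the next sub_id. Note (a summary from reading the code below):
  sub_ids come from a single global counter protected by LOCK_slave_state,
  so the domain_id argument does not influence the returned value.
*/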
uint64
rpl_slave_state::next_sub_id(uint32 domain_id)
{
  uint64 sub_id= 0;

  mysql_mutex_lock(&LOCK_slave_state);
  sub_id= ++last_sub_id;
  mysql_mutex_unlock(&LOCK_slave_state);

  return sub_id;
}

/* A callback used in sorting of gtid list based on domain_id. */
static int rpl_gtid_cmp_cb(const void *id1, const void *id2)
{
  uint32 d1= ((rpl_gtid *)id1)->domain_id;
  uint32 d2= ((rpl_gtid *)id2)->domain_id;

  if (d1 < d2)
    return -1;
  else if (d1 > d2)
    return 1;
  return 0;
}

/* Format the specified gtid and store it in the given string buffer. */
bool
rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
{
  if (*first)
    *first= false;
  else
    if (dest->append(",",1))
      return true;
  return
    dest->append_ulonglong(gtid->domain_id) ||
    dest->append("-",1) ||
    dest->append_ulonglong(gtid->server_id) ||
    dest->append("-",1) ||
    dest->append_ulonglong(gtid->seq_no);
}

/*
  Sort the given gtid list based on domain_id and store them in the specified
  string.
*/
static bool
rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr, String *str)
{
  bool first= true, res= true;

  sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);

  for (uint i= 0; i < gtid_dynarr->elements; i ++)
  {
    rpl_gtid *gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
    if (rpl_slave_state_tostring_helper(str, gtid, &first))
      goto err;
  }
  res= false;

err:
  return res;
}


/* Sort the given gtid list based on domain_id and call cb for each gtid. */
static bool
rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr,
                                int (*cb)(rpl_gtid *, void *),
                                void *data)
{
  rpl_gtid *gtid;
  bool res= true;

  sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);

  for (uint i= 0; i < gtid_dynarr->elements; i ++)
  {
    gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
    if ((*cb)(gtid, data))
      goto err;
  }
  res= false;

err:
  return res;
}

int
rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
                         rpl_gtid *extra_gtids, uint32 num_extra,
                         bool sort)
{
  uint32 i;
  HASH gtid_hash;
  uchar *rec;
  rpl_gtid *gtid;
  int res= 1;
  bool locked= false;

  my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
               sizeof(uint32), NULL, NULL, HASH_UNIQUE);
  for (i= 0; i < num_extra; ++i)
    if (extra_gtids[i].server_id == global_system_variables.server_id &&
        my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
      goto err;

  mysql_mutex_lock(&LOCK_slave_state);
  locked= true;
  reset_dynamic(&gtid_sort_array);

  for (i= 0; i < hash.records; ++i)
  {
    uint64 best_sub_id;
    rpl_gtid best_gtid;
    element *e= (element *)my_hash_element(&hash, i);
    list_element *l= e->list;

    if (!l)
      continue;                                 /* Nothing here */

    best_gtid.domain_id= e->domain_id;
    best_gtid.server_id= l->server_id;
    best_gtid.seq_no= l->seq_no;
    best_sub_id= l->sub_id;
    while ((l= l->next))
    {
      if (l->sub_id > best_sub_id)
      {
        best_sub_id= l->sub_id;
        best_gtid.server_id= l->server_id;
        best_gtid.seq_no= l->seq_no;
      }
    }

    /* Check if we have something newer in the extra list. */
    rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
    if (rec)
    {
      gtid= (rpl_gtid *)rec;
      if (gtid->seq_no > best_gtid.seq_no)
        memcpy(&best_gtid, gtid, sizeof(best_gtid));
      if (my_hash_delete(&gtid_hash, rec))
      {
        goto err;
      }
    }

    if ((res= sort ? insert_dynamic(&gtid_sort_array,
                                    (const void *) &best_gtid) :
         (*cb)(&best_gtid, data)))
    {
      goto err;
    }
  }

  /* Also add any remaining extra domain_ids. */
  for (i= 0; i < gtid_hash.records; ++i)
  {
    gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
    if ((res= sort ? insert_dynamic(&gtid_sort_array, (const void *) gtid) :
         (*cb)(gtid, data)))
    {
      goto err;
    }
  }

  if (sort && rpl_slave_state_tostring_helper(&gtid_sort_array, cb, data))
  {
    goto err;
  }

  res= 0;

err:
  if (locked)
    mysql_mutex_unlock(&LOCK_slave_state);
  my_hash_free(&gtid_hash);

  return res;
}


struct rpl_slave_state_tostring_data {
  String *dest;
  bool first;
};
static int
rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data)
{
  rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data;
  return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first);
}


/*
  Prepare the current slave state as a string, suitable for sending to the
  master to request to receive binlog events starting from that GTID state.

  The state consists of the most recently applied GTID for each domain_id,
  ie. the one with the highest sub_id within each domain_id.

  Optionally, extra_gtids is a list of GTIDs from the binlog. This is used
  when a server was previously a master and now needs to connect to a new
  master as a slave. For each domain_id, if the GTID in the binlog was logged
  with our own server_id _and_ has a higher seq_no than what is in the slave
  state, then this should be used as the position to start replicating at.
  This makes it possible to promote a slave to be the new master, and to
  connect the old master as a slave with MASTER_GTID_POS=AUTO.
*/
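
/*
  For example (illustrative values only), the resulting state string might be
  "0-1-100,1-2-577", meaning that in domain 0 the most recently applied GTID
  was 0-1-100, and in domain 1 it was 1-2-577.
*/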
int
rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
{
  struct rpl_slave_state_tostring_data data;
  data.first= true;
  data.dest= dest;

  return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids,
                 num_extra, true);
}


/*
  Lookup a domain_id in the current replication slave state.

  Returns false if the domain_id has no entries in the slave state.
  Otherwise returns true, and fills in out_gtid with the corresponding
  GTID.
*/
bool
rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
{
  element *elem;
  list_element *list;
  uint64 best_sub_id;

  mysql_mutex_lock(&LOCK_slave_state);
  elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
  if (!elem || !(list= elem->list))
  {
    mysql_mutex_unlock(&LOCK_slave_state);
    return false;
  }

  out_gtid->domain_id= domain_id;
  out_gtid->server_id= list->server_id;
  out_gtid->seq_no= list->seq_no;
  best_sub_id= list->sub_id;

  while ((list= list->next))
  {
    if (best_sub_id > list->sub_id)
      continue;
    best_sub_id= list->sub_id;
    out_gtid->server_id= list->server_id;
    out_gtid->seq_no= list->seq_no;
  }

  mysql_mutex_unlock(&LOCK_slave_state);
  return true;
}


/*
  Parse a GTID at the start of a string, and update the pointer to point
  at the first character after the parsed GTID.

  Returns 0 on ok, non-zero on parse error.
*/
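
/*
  For example, given the input "0-1-100,2-3-987", the first call parses the
  GTID 0-1-100 (domain_id=0, server_id=1, seq_no=100) and leaves *ptr
  pointing at the ',' separator.
*/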
static int
gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
{
  char *q;
  const char *p= *ptr;
  uint64 v1, v2, v3;
  int err= 0;

  q= (char*) end;
  v1= (uint64)my_strtoll10(p, &q, &err);
  if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
    return 1;
  p= q+1;
  q= (char*) end;
  v2= (uint64)my_strtoll10(p, &q, &err);
  if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
    return 1;
  p= q+1;
  q= (char*) end;
  v3= (uint64)my_strtoll10(p, &q, &err);
  if (err != 0)
    return 1;

  out_gtid->domain_id= (uint32) v1;
  out_gtid->server_id= (uint32) v2;
  out_gtid->seq_no= v3;
  *ptr= q;
  return 0;
}


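/*
  Parse a comma-separated list of GTIDs (eg. "0-1-100,1-1-200") into a
  my_malloc()'ed array of rpl_gtid (summarising the code below: the caller
  must free the returned array with my_free()). Returns NULL on parse error
  or out-of-memory; on success, *out_len is set to the number of elements.
*/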
rpl_gtid *
gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
{
  const char *p= const_cast<char *>(str);
  const char *end= p + str_len;
  uint32 len= 0, alloc_len= 5;
  rpl_gtid *list= NULL;

  for (;;)
  {
    rpl_gtid gtid;

    if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
    {
      my_free(list);
      return NULL;
    }
    if ((!list || len >= alloc_len) &&
        !(list=
          (rpl_gtid *)my_realloc(list,
                                 (alloc_len= alloc_len*2) * sizeof(rpl_gtid),
                                 MYF(MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR))))
      return NULL;
    list[len++]= gtid;

    if (p == end)
      break;
    if (*p != ',')
    {
      my_free(list);
      return NULL;
    }
    ++p;
  }
  *out_len= len;
  return list;
}


/*
  Update the slave replication state with the GTID position obtained from
  master when connecting with old-style (filename,offset) position.

  If RESET is true then all existing entries are removed. Otherwise only
  domain_ids mentioned in the STATE_FROM_MASTER are changed.

  Returns 0 if ok, non-zero if error.
*/
int
rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
                      bool reset, bool in_statement)
{
  const char *end= state_from_master + len;

  if (reset)
  {
    if (truncate_state_table(thd))
      return 1;
    truncate_hash();
  }
  if (state_from_master == end)
    return 0;
  for (;;)
  {
    rpl_gtid gtid;
    uint64 sub_id;
    void *hton= NULL;

    if (gtid_parser_helper(&state_from_master, end, &gtid) ||
        !(sub_id= next_sub_id(gtid.domain_id)) ||
        record_gtid(thd, &gtid, sub_id, NULL, in_statement, &hton) ||
        update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
      return 1;
    if (state_from_master == end)
      break;
    if (*state_from_master != ',')
      return 1;
    ++state_from_master;
  }
  return 0;
}


bool
rpl_slave_state::is_empty()
{
  uint32 i;
  bool result= true;

  mysql_mutex_lock(&LOCK_slave_state);
  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (e->list)
    {
      result= false;
      break;
    }
  }
  mysql_mutex_unlock(&LOCK_slave_state);

  return result;
}


void
rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list)
{
  struct gtid_pos_table *cur, *next;

  cur= list;
  while (cur)
  {
    next= cur->next;
    my_free(cur);
    cur= next;
  }
}


/*
  Replace the list of available mysql.gtid_slave_posXXX tables with a new list.
  The caller must be holding LOCK_slave_state. Additionally, this function
  must only be called while all SQL threads are stopped.
*/
void
rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
                                          rpl_slave_state::gtid_pos_table *default_entry)
{
  gtid_pos_table *old_list;

  mysql_mutex_assert_owner(&LOCK_slave_state);
  old_list= (struct gtid_pos_table *)gtid_pos_tables;
  my_atomic_storeptr_explicit(&gtid_pos_tables, new_list,
                              MY_MEMORY_ORDER_RELEASE);
  my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry,
                              MY_MEMORY_ORDER_RELEASE);
  free_gtid_pos_tables(old_list);
}


void
rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
{
  mysql_mutex_assert_owner(&LOCK_slave_state);
  entry->next= (struct gtid_pos_table *)gtid_pos_tables;
  my_atomic_storeptr_explicit(&gtid_pos_tables, entry, MY_MEMORY_ORDER_RELEASE);
}


struct rpl_slave_state::gtid_pos_table *
rpl_slave_state::alloc_gtid_pos_table(LEX_CSTRING *table_name, void *hton,
                                      rpl_slave_state::gtid_pos_table_state state)
{
  struct gtid_pos_table *p;
  char *allocated_str;

  if (!my_multi_malloc(MYF(MY_WME),
                       &p, sizeof(*p),
                       &allocated_str, table_name->length+1,
                       NULL))
  {
    my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1));
    return NULL;
  }
  memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0'
  p->next = NULL;
  p->table_hton= hton;
  p->table_name.str= allocated_str;
  p->table_name.length= table_name->length;
  p->state= state;
  return p;
}


void rpl_binlog_state::init()
{
  my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
               sizeof(uint32), NULL, my_free, HASH_UNIQUE);
  my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
  mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
                   MY_MUTEX_INIT_SLOW);
  initialized= 1;
}

void
rpl_binlog_state::reset_nolock()
{
  uint32 i;

  for (i= 0; i < hash.records; ++i)
    my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
  my_hash_reset(&hash);
}


void
rpl_binlog_state::reset()
{
  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  mysql_mutex_unlock(&LOCK_binlog_state);
}


void rpl_binlog_state::free()
{
  if (initialized)
  {
    initialized= 0;
    reset_nolock();
    my_hash_free(&hash);
    delete_dynamic(&gtid_sort_array);
    mysql_mutex_destroy(&LOCK_binlog_state);
  }
}


bool
rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
{
  uint32 i;
  bool res= false;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  for (i= 0; i < count; ++i)
  {
    if (update_nolock(&(list[i]), false))
    {
      res= true;
      break;
    }
  }
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
{
  rpl_binlog_state *self= (rpl_binlog_state *)data;
  return self->update_nolock(gtid, false);
}


bool
rpl_binlog_state::load(rpl_slave_state *slave_pos)
{
  bool res= false;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0, false))
    res= true;
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


rpl_binlog_state::~rpl_binlog_state()
{
  free();
}


/*
  Update replication state with a new GTID.

  If the (domain_id, server_id) pair already exists, then the new GTID replaces
  the old one for that domain id. Else a new entry is inserted.

  Returns 0 for ok, 1 for error.
*/
int
rpl_binlog_state::update_nolock(const struct rpl_gtid *gtid, bool strict)
{
  element *elem;

  if ((elem= (element *)my_hash_search(&hash,
                                       (const uchar *)(&gtid->domain_id), 0)))
  {
    if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
    {
      my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
               gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
               elem->last_gtid->server_id, elem->last_gtid->seq_no);
      return 1;
    }
    if (elem->seq_no_counter < gtid->seq_no)
      elem->seq_no_counter= gtid->seq_no;
    if (!elem->update_element(gtid))
      return 0;
  }
  else if (!alloc_element_nolock(gtid))
    return 0;

  my_error(ER_OUT_OF_RESOURCES, MYF(0));
  return 1;
}


int
rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
{
  int res;
  mysql_mutex_lock(&LOCK_binlog_state);
  res= update_nolock(gtid, strict);
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


/*
  Fill in a new GTID, allocating next sequence number, and update state
  accordingly.
*/
int
rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
                                        rpl_gtid *gtid)
{
  element *elem;
  int res= 0;

  gtid->domain_id= domain_id;
  gtid->server_id= server_id;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
  {
    gtid->seq_no= ++elem->seq_no_counter;
    if (!elem->update_element(gtid))
      goto end;
  }
  else
  {
    gtid->seq_no= 1;
    if (!alloc_element_nolock(gtid))
      goto end;
  }

  my_error(ER_OUT_OF_RESOURCES, MYF(0));
  res= 1;
end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


/* Helper functions for update. */
int
rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
{
  rpl_gtid *lookup_gtid;

  /*
    By far the most common case is that successive events within same
    replication domain have the same server id (it changes only when
    switching to a new master). So save a hash lookup in this case.
  */
  if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
  {
    last_gtid->seq_no= gtid->seq_no;
    return 0;
  }

  lookup_gtid= (rpl_gtid *)
    my_hash_search(&hash, (const uchar *)&gtid->server_id, 0);
  if (lookup_gtid)
  {
    lookup_gtid->seq_no= gtid->seq_no;
    last_gtid= lookup_gtid;
    return 0;
  }

  /* Allocate a new GTID and insert it. */
  lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
  if (!lookup_gtid)
    return 1;
  memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
  if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
  {
    my_free(lookup_gtid);
    return 1;
  }
  last_gtid= lookup_gtid;
  return 0;
}


int
rpl_binlog_state::alloc_element_nolock(const rpl_gtid *gtid)
{
  element *elem;
  rpl_gtid *lookup_gtid;

  /* First time we see this domain_id; allocate a new element. */
  elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
  lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
  if (elem && lookup_gtid)
  {
    elem->domain_id= gtid->domain_id;
    my_hash_init(&elem->hash, &my_charset_bin, 32,
                 offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
                 HASH_UNIQUE);
    elem->last_gtid= lookup_gtid;
    elem->seq_no_counter= gtid->seq_no;
    memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
    if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
    {
      lookup_gtid= NULL;                        /* Do not free. */
      if (0 == my_hash_insert(&hash, (const uchar *)elem))
        return 0;
    }
    my_hash_free(&elem->hash);
  }

  /* An error. */
  if (elem)
    my_free(elem);
  if (lookup_gtid)
    my_free(lookup_gtid);
  return 1;
}


/*
  Check that a new GTID can be logged without creating an out-of-order
  sequence number with existing GTIDs.
*/
bool
rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
                                        uint64 seq_no)
{
  element *elem;
  bool res= 0;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash,
                                       (const uchar *)(&domain_id), 0)) &&
      elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
  {
    my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
             elem->last_gtid->domain_id, elem->last_gtid->server_id,
             elem->last_gtid->seq_no);
    res= 1;
  }
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


/*
  When we see a new GTID that will not be binlogged (eg. slave thread
  with --log-slave-updates=0), then we need to remember to allocate any
  GTID seq_no of our own within that domain starting from there.

  Returns 0 if ok, non-zero if out-of-memory.
*/
int
rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
{
  element *elem;
  int res;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
  {
    if (elem->seq_no_counter < seq_no)
      elem->seq_no_counter= seq_no;
    res= 0;
    goto end;
  }

  /* We need to allocate a new, empty element to remember the next seq_no. */
  if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
  {
    res= 1;
    goto end;
  }

  elem->domain_id= domain_id;
  my_hash_init(&elem->hash, &my_charset_bin, 32,
               offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
               HASH_UNIQUE);
  elem->last_gtid= NULL;
  elem->seq_no_counter= seq_no;
  if (0 == my_hash_insert(&hash, (const uchar *)elem))
  {
    res= 0;
    goto end;
  }

  my_hash_free(&elem->hash);
  my_free(elem);
  res= 1;

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}


/*
  Write binlog state to text file, so we can read it in again without having
  to scan last binlog file (normal shutdown/startup, not crash recovery).

  The most recent GTID within each domain_id is written after any other GTID
  within this domain.
*/
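
/*
  Example file contents in this format (illustrative values; one GTID per
  line, with the most recent GTID of each domain written last within that
  domain):

    0-1-100
    0-2-130
    1-1-77
*/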
1694 int
write_to_iocache(IO_CACHE * dest)1695 rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
1696 {
1697 ulong i, j;
1698 char buf[21];
1699 int res= 0;
1700
1701 mysql_mutex_lock(&LOCK_binlog_state);
1702 for (i= 0; i < hash.records; ++i)
1703 {
1704 element *e= (element *)my_hash_element(&hash, i);
1705 if (!e->last_gtid)
1706 {
1707 DBUG_ASSERT(e->hash.records == 0);
1708 continue;
1709 }
1710 for (j= 0; j <= e->hash.records; ++j)
1711 {
1712 const rpl_gtid *gtid;
1713 if (j < e->hash.records)
1714 {
1715 gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
1716 if (gtid == e->last_gtid)
1717 continue;
1718 }
1719 else
1720 gtid= e->last_gtid;
1721
1722 longlong10_to_str(gtid->seq_no, buf, 10);
1723 if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id,
1724 buf))
1725 {
1726 res= 1;
1727 goto end;
1728 }
1729 }
1730 }
1731
1732 end:
1733 mysql_mutex_unlock(&LOCK_binlog_state);
1734 return res;
1735 }
1736
1737
1738 int
read_from_iocache(IO_CACHE * src)1739 rpl_binlog_state::read_from_iocache(IO_CACHE *src)
1740 {
1741 /* 10-digit - 10-digit - 20-digit \n \0 */
1742 char buf[10+1+10+1+20+1+1];
1743 const char *p, *end;
1744 rpl_gtid gtid;
1745 int res= 0;
1746
1747 mysql_mutex_lock(&LOCK_binlog_state);
1748 reset_nolock();
1749 for (;;)
1750 {
1751 size_t len= my_b_gets(src, buf, sizeof(buf));
1752 if (!len)
1753 break;
1754 p= buf;
1755 end= buf + len;
1756 if (gtid_parser_helper(&p, end, >id) ||
1757 update_nolock(>id, false))
1758 {
1759 res= 1;
1760 break;
1761 }
1762 }
1763 mysql_mutex_unlock(&LOCK_binlog_state);
1764 return res;
1765 }
1766
1767
1768 rpl_gtid *
find_nolock(uint32 domain_id,uint32 server_id)1769 rpl_binlog_state::find_nolock(uint32 domain_id, uint32 server_id)
1770 {
1771 element *elem;
1772 if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
1773 return NULL;
1774 return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
1775 }
1776
1777 rpl_gtid *
find(uint32 domain_id,uint32 server_id)1778 rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
1779 {
1780 rpl_gtid *p;
1781 mysql_mutex_lock(&LOCK_binlog_state);
1782 p= find_nolock(domain_id, server_id);
1783 mysql_mutex_unlock(&LOCK_binlog_state);
1784 return p;
1785 }
1786
1787 rpl_gtid *
find_most_recent(uint32 domain_id)1788 rpl_binlog_state::find_most_recent(uint32 domain_id)
1789 {
1790 element *elem;
1791 rpl_gtid *gtid= NULL;
1792
1793 mysql_mutex_lock(&LOCK_binlog_state);
1794 elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
1795 if (elem && elem->last_gtid)
1796 gtid= elem->last_gtid;
1797 mysql_mutex_unlock(&LOCK_binlog_state);
1798
1799 return gtid;
1800 }
1801
1802
1803 uint32
count()1804 rpl_binlog_state::count()
1805 {
1806 uint32 c= 0;
1807 uint32 i;
1808
1809 mysql_mutex_lock(&LOCK_binlog_state);
1810 for (i= 0; i < hash.records; ++i)
1811 c+= ((element *)my_hash_element(&hash, i))->hash.records;
1812 mysql_mutex_unlock(&LOCK_binlog_state);
1813
1814 return c;
1815 }
1816
1817
1818 int
get_gtid_list(rpl_gtid * gtid_list,uint32 list_size)1819 rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
1820 {
1821 uint32 i, j, pos;
1822 int res= 0;
1823
1824 mysql_mutex_lock(&LOCK_binlog_state);
1825 pos= 0;
1826 for (i= 0; i < hash.records; ++i)
1827 {
1828 element *e= (element *)my_hash_element(&hash, i);
1829 if (!e->last_gtid)
1830 {
1831 DBUG_ASSERT(e->hash.records==0);
1832 continue;
1833 }
1834 for (j= 0; j <= e->hash.records; ++j)
1835 {
1836 const rpl_gtid *gtid;
1837 if (j < e->hash.records)
1838 {
1839 gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
1840 if (gtid == e->last_gtid)
1841 continue;
1842 }
1843 else
1844 gtid= e->last_gtid;
1845
1846 if (pos >= list_size)
1847 {
1848 res= 1;
1849 goto end;
1850 }
1851 memcpy(>id_list[pos++], gtid, sizeof(*gtid));
1852 }
1853 }
1854
1855 end:
1856 mysql_mutex_unlock(&LOCK_binlog_state);
1857 return res;
1858 }
1859
1860
1861 /*
1862 Get a list of the most recently binlogged GTID, for each domain_id.
1863
1864 This can be used when switching from being a master to being a slave,
1865 to know where to start replicating from the new master.
1866
1867 The returned list must be de-allocated with my_free().
1868
1869 Returns 0 for ok, non-zero for out-of-memory.
1870 */
int
rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{
  uint32 i;
  uint32 alloc_size, out_size;
  int res= 0;

  out_size= 0;
  mysql_mutex_lock(&LOCK_binlog_state);
  alloc_size= hash.records;
  if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid),
                                     MYF(MY_WME))))
  {
    res= 1;
    goto end;
  }
  for (i= 0; i < alloc_size; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
      continue;
    memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
  }

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  *size= out_size;
  return res;
}

bool
rpl_binlog_state::append_pos(String *str)
{
  uint32 i;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_dynamic(&gtid_sort_array);

  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (e->last_gtid &&
        insert_dynamic(&gtid_sort_array, (const void *) e->last_gtid))
    {
      mysql_mutex_unlock(&LOCK_binlog_state);
      return true;
    }
  }
  rpl_slave_state_tostring_helper(&gtid_sort_array, str);
  mysql_mutex_unlock(&LOCK_binlog_state);

  return false;
}


bool
rpl_binlog_state::append_state(String *str)
{
  uint32 i, j;
  bool res= false;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_dynamic(&gtid_sort_array);

  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
    {
      DBUG_ASSERT(e->hash.records==0);
      continue;
    }
    for (j= 0; j <= e->hash.records; ++j)
    {
      const rpl_gtid *gtid;
      if (j < e->hash.records)
      {
        gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
        if (gtid == e->last_gtid)
          continue;
      }
      else
        gtid= e->last_gtid;

      if (insert_dynamic(&gtid_sort_array, (const void *) gtid))
      {
        res= true;
        goto end;
      }
    }
  }

  rpl_slave_state_tostring_helper(&gtid_sort_array, str);

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}

/**
  Remove domains supplied by the first argument from the binlog state.
  Removal is done for any domain whose last GTIDs (from all of its servers)
  match those in the Gtid list event of the 2nd argument.

  @param ids    gtid domain id sequence, may contain duplicates
  @param glev   pointer to the Gtid list event describing
                the match condition
  @param errbuf [out] pointer to a possible error message array

  @retval NULL      success; at least one domain was removed
  @retval ""        empty string indicating an ineffective call;
                    no domains were removed
  @retval NOT EMPTY otherwise an error message
*/
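/*
  A hedged usage sketch: this method is assumed to back the SQL-level
  domain deletion, e.g.

    FLUSH BINARY LOGS DELETE_DOMAIN_ID = (1, 2);

  which removes domain ids 1 and 2 from the binlog state, provided all
  binlog files containing their events have already been purged.
*/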
const char*
rpl_binlog_state::drop_domain(DYNAMIC_ARRAY *ids,
                              Gtid_list_log_event *glev,
                              char* errbuf)
{
  DYNAMIC_ARRAY domain_unique; // sequence (unsorted) of unique element*:s
  rpl_binlog_state::element* domain_unique_buffer[16];
  ulong k, l;
  const char* errmsg= NULL;

  DBUG_ENTER("rpl_binlog_state::drop_domain");

  my_init_dynamic_array2(&domain_unique,
                         sizeof(element*), domain_unique_buffer,
                         sizeof(domain_unique_buffer) / sizeof(element*), 4, 0);

  mysql_mutex_lock(&LOCK_binlog_state);

  /*
    The gtid list is supposed to come from a binlog's Gtid_list event and
    should therefore be a subset of the current binlog state. That is,
    for every domain in the list the binlog state contains a gtid with a
    sequence number not less than that of the list.
    Exceptions to this inclusion rule are:
      A. the list may still refer to gtids from already deleted domains.
         Files containing them must have been purged whereas the file
         with the list has not been purged yet.
      B. out-of-order groups were injected.
      C. a manually built list of binlog files violates the inclusion
         constraint.
    While A is a normal case (not necessarily distinguishable from C though),
    B and C may require the user's attention, so any inconsistency
    (including a suspected case of A) is diagnosed and *warned* about.
  */
  for (l= 0, errbuf[0]= 0; l < glev->count; l++, errbuf[0]= 0)
  {
    rpl_gtid* rb_state_gtid= find_nolock(glev->list[l].domain_id,
                                         glev->list[l].server_id);
    if (!rb_state_gtid)
      sprintf(errbuf,
              "missing gtids from the '%u-%u' domain-server pair which is "
              "referred to in the gtid list describing an earlier state. Ignore "
              "if the domain ('%u') was already explicitly deleted",
              glev->list[l].domain_id, glev->list[l].server_id,
              glev->list[l].domain_id);
    else if (rb_state_gtid->seq_no < glev->list[l].seq_no)
      sprintf(errbuf,
              "having a gtid '%u-%u-%llu' which is less than "
              "the '%u-%u-%llu' of the gtid list describing an earlier state. "
              "The state may have been affected by manually injecting "
              "a lower sequence number gtid or via replication",
              rb_state_gtid->domain_id, rb_state_gtid->server_id,
              rb_state_gtid->seq_no, glev->list[l].domain_id,
              glev->list[l].server_id, glev->list[l].seq_no);
    if (strlen(errbuf)) // use strlen() as a cheap flag
      push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
                          ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
                          "The current gtid binlog state is incompatible with "
                          "a former one %s.", errbuf);
  }

  /*
    For each domain_id from ids:
      when there is no such domain in the binlog state,
        warn && continue
      For each domain.server's last gtid:
        when the last gtid cannot be located in glev.list,
          error out; the binlog state can't change
        otherwise continue
  */
  for (ulong i= 0; i < ids->elements; i++)
  {
    rpl_binlog_state::element *elem= NULL;
    uint32 *ptr_domain_id;
    bool not_match;

    ptr_domain_id= (uint32*) dynamic_array_ptr(ids, i);
    elem= (rpl_binlog_state::element *)
      my_hash_search(&hash, (const uchar *) ptr_domain_id, 0);
    if (!elem)
    {
      push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
                          ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
                          "The gtid domain being deleted ('%lu') is not in "
                          "the current binlog state", (unsigned long) *ptr_domain_id);
      continue;
    }

    for (not_match= true, k= 0; k < elem->hash.records; k++)
    {
      rpl_gtid *d_gtid= (rpl_gtid *)my_hash_element(&elem->hash, k);
      for (ulong l= 0; l < glev->count && not_match; l++)
        not_match= !(*d_gtid == glev->list[l]);
    }

    if (not_match)
    {
      sprintf(errbuf, "binlog files may contain gtids from the domain ('%u') "
              "being deleted. Make sure to first purge those files",
              *ptr_domain_id);
      errmsg= errbuf;
      goto end;
    }
    // compose a sequence of unique pointers to domain objects
    for (k= 0; k < domain_unique.elements; k++)
    {
      if ((rpl_binlog_state::element*) dynamic_array_ptr(&domain_unique, k)
          == elem)
        break; // this domain_id's elem is already in
    }
    if (k == domain_unique.elements) // proven not to be a duplicate
      insert_dynamic(&domain_unique, (uchar*) &elem);
  }

  // Domain removal from the binlog state
  for (k= 0; k < domain_unique.elements; k++)
  {
    rpl_binlog_state::element *elem= *(rpl_binlog_state::element**)
      dynamic_array_ptr(&domain_unique, k);
    my_hash_free(&elem->hash);
    my_hash_delete(&hash, (uchar*) elem);
  }

  DBUG_ASSERT(strlen(errbuf) == 0);

  if (domain_unique.elements == 0)
    errmsg= "";

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  delete_dynamic(&domain_unique);

  DBUG_RETURN(errmsg);
}

slave_connection_state::slave_connection_state()
{
  my_hash_init(&hash, &my_charset_bin, 32,
               offsetof(entry, gtid) + offsetof(rpl_gtid, domain_id),
               sizeof(uint32), NULL, my_free, HASH_UNIQUE);
  my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
}


slave_connection_state::~slave_connection_state()
{
  my_hash_free(&hash);
  delete_dynamic(&gtid_sort_array);
}


/*
  Create a hash from the slave GTID state that is sent to the master when
  the slave connects to start replication.

  The state is sent as <GTID>,<GTID>,...,<GTID>, for example:

     0-2-112,1-4-1022

  The state gives for each domain_id the GTID to start replication from for
  the corresponding replication stream. So domain_id must be unique.

  Returns 0 if ok, non-zero on error due to malformed input.

  Note that the input string is built by the slave server, so it will not be
  incorrect unless there is a bug, corruption, or a malicious server. So we
  just need a basic sanity check, not a fancy user-friendly error message.
*/
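/*
  Illustration (assumed values): loading the string "0-2-112,1-4-1022"
  produces two hash entries keyed on domain_id:

    domain_id 0 -> GTID 0-2-112   (replication of domain 0 starts after
                                   seq_no 112)
    domain_id 1 -> GTID 1-4-1022

  A repeated domain_id, e.g. "0-2-112,0-3-113", fails with
  ER_DUPLICATE_GTID_DOMAIN.
*/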

int
slave_connection_state::load(const char *slave_request, size_t len)
{
  const char *p, *end;
  uchar *rec;
  rpl_gtid *gtid;
  const entry *e;

  reset();
  p= slave_request;
  end= slave_request + len;
  if (p == end)
    return 0;
  for (;;)
  {
    if (!(rec= (uchar *)my_malloc(sizeof(entry), MYF(MY_WME))))
      return 1;
    gtid= &((entry *)rec)->gtid;
    if (gtid_parser_helper(&p, end, gtid))
    {
      my_free(rec);
      my_error(ER_INCORRECT_GTID_STATE, MYF(0));
      return 1;
    }
    if ((e= (const entry *)
         my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0)))
    {
      my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
               gtid->server_id, (ulonglong)gtid->seq_no, e->gtid.domain_id,
               e->gtid.server_id, (ulonglong)e->gtid.seq_no, gtid->domain_id);
      my_free(rec);
      return 1;
    }
    ((entry *)rec)->flags= 0;
    if (my_hash_insert(&hash, rec))
    {
      my_free(rec);
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      return 1;
    }
    if (p == end)
      break;                                    /* Finished. */
    if (*p != ',')
    {
      my_error(ER_INCORRECT_GTID_STATE, MYF(0));
      return 1;
    }
    ++p;
  }

  return 0;
}


int
slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
{
  uint32 i;

  reset();
  for (i= 0; i < count; ++i)
    if (update(&gtid_list[i]))
      return 1;
  return 0;
}


static int
slave_connection_state_load_cb(rpl_gtid *gtid, void *data)
{
  slave_connection_state *state= (slave_connection_state *)data;
  return state->update(gtid);
}


/*
  Same as rpl_slave_state::tostring(), but populates a slave_connection_state
  instead.
*/
int
slave_connection_state::load(rpl_slave_state *state,
                             rpl_gtid *extra_gtids, uint32 num_extra)
{
  reset();
  return state->iterate(slave_connection_state_load_cb, this,
                        extra_gtids, num_extra, false);
}


slave_connection_state::entry *
slave_connection_state::find_entry(uint32 domain_id)
{
  return (entry *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
}


rpl_gtid *
slave_connection_state::find(uint32 domain_id)
{
  entry *e= find_entry(domain_id);
  if (!e)
    return NULL;
  return &e->gtid;
}


int
slave_connection_state::update(const rpl_gtid *in_gtid)
{
  entry *e;
  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
  if (rec)
  {
    e= (entry *)rec;
    e->gtid= *in_gtid;
    return 0;
  }

  if (!(e= (entry *)my_malloc(sizeof(*e), MYF(MY_WME))))
    return 1;
  e->gtid= *in_gtid;
  e->flags= 0;
  if (my_hash_insert(&hash, (uchar *)e))
  {
    my_free(e);
    return 1;
  }

  return 0;
}


void
slave_connection_state::remove(const rpl_gtid *in_gtid)
{
  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
#ifdef DBUG_ASSERT_EXISTS
  bool err;
  rpl_gtid *slave_gtid= &((entry *)rec)->gtid;
  DBUG_ASSERT(rec /* We should never try to remove a domain_id that is not present. */);
  DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
  DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
  err=
#endif
    my_hash_delete(&hash, rec);
  DBUG_ASSERT(!err);
}


void
slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
{
  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
  if (rec)
    my_hash_delete(&hash, rec);
}


int
slave_connection_state::to_string(String *out_str)
{
  out_str->length(0);
  return append_to_string(out_str);
}


int
slave_connection_state::append_to_string(String *out_str)
{
  uint32 i;
  bool first;

  first= true;
  for (i= 0; i < hash.records; ++i)
  {
    const entry *e= (const entry *)my_hash_element(&hash, i);
    if (rpl_slave_state_tostring_helper(out_str, &e->gtid, &first))
      return 1;
  }
  return 0;
}


int
slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
{
  uint32 i, pos;

  pos= 0;
  for (i= 0; i < hash.records; ++i)
  {
    entry *e;
    if (pos >= list_size)
      return 1;
    e= (entry *)my_hash_element(&hash, i);
    memcpy(&gtid_list[pos++], &e->gtid, sizeof(e->gtid));
  }

  return 0;
}


/*
  Check if the GTID position has been reached, for mysql_binlog_send().

  The position has not been reached if we have anything in the state, unless
  it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not
  belong to this master at all), or the START_OWN_SLAVE_POS flag (which means
  that we start on an old position from when the server was a slave with
  --log-slave-updates=0).
*/
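/*
  Illustration (hypothetical values): a state holding a single entry 0-1-100
  with no flags set means the position has not been reached yet; a state in
  which every entry carries START_ON_EMPTY_DOMAIN or START_OWN_SLAVE_POS
  (or an empty state) counts as reached.
*/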
bool
slave_connection_state::is_pos_reached()
{
  uint32 i;

  for (i= 0; i < hash.records; ++i)
  {
    entry *e= (entry *)my_hash_element(&hash, i);
    if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN)))
      return false;
  }

  return true;
}


/*
  Execute a MASTER_GTID_WAIT().
  The position to wait for is in gtid_str, in string form.
  The timeout in microseconds is in timeout_us; a negative value means no
  timeout.

  Returns:
     1 for error.
     0 for wait completed.
    -1 for wait timed out.
*/
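/*
  SQL-level usage sketch (the values are examples only):

    SELECT MASTER_GTID_WAIT('0-1-100', 1.5);

  waits until this server has applied GTID 0-1-100, for at most 1.5
  seconds; the SQL function takes a timeout in (fractional) seconds and
  converts it into the microsecond timeout_us used here. The result is 0
  when the position was reached and -1 on timeout.
*/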
int
gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
{
  int err;
  rpl_gtid *wait_pos;
  uint32 count, i;
  struct timespec wait_until, *wait_until_ptr;
  ulonglong before;

  /* A wait for the empty position returns immediately. */
  if (gtid_str->length() == 0)
  {
    status_var_increment(thd->status_var.master_gtid_wait_count);
    return 0;
  }

  if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
                                            &count)))
  {
    my_error(ER_INCORRECT_GTID_STATE, MYF(0));
    return 1;
  }
  status_var_increment(thd->status_var.master_gtid_wait_count);
  before= microsecond_interval_timer();

  if (timeout_us >= 0)
  {
    set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
    wait_until_ptr= &wait_until;
  }
  else
    wait_until_ptr= NULL;
  err= 0;
  for (i= 0; i < count; ++i)
  {
    if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
      break;
  }
  switch (err)
  {
  case -1:
    status_var_increment(thd->status_var.master_gtid_wait_timeouts);
    /* fall through */
  case 0:
    status_var_add(thd->status_var.master_gtid_wait_time,
                   static_cast<ulong>
                   (microsecond_interval_timer() - before));
  }
  my_free(wait_pos);
  return err;
}


void
gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
{
  queue_element *qe;

  mysql_mutex_assert_owner(&LOCK_gtid_waiting);
  if (queue_empty(&he->queue))
    return;
  qe= (queue_element *)queue_top(&he->queue);
  qe->do_small_wait= true;
  mysql_cond_signal(&qe->thd->COND_wakeup_ready);
}

void
gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
                                gtid_waiting::hash_element *he)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  for (;;)
  {
    queue_element *qe;

    if (queue_empty(&he->queue))
      break;
    qe= (queue_element *)queue_top(&he->queue);
    if (qe->wait_seq_no > wakeup_seq_no)
      break;
    DBUG_ASSERT(!qe->done);
    queue_remove_top(&he->queue);
    qe->done= true;
    mysql_cond_signal(&qe->thd->COND_wakeup_ready);
  }
}


/*
  Execute a MASTER_GTID_WAIT() for one specific domain.

  The implementation is optimised primarily for (1) minimal performance impact
  on the slave replication threads, and secondarily for (2) quick performance
  of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
  reads by clients in an async replication read-scaleout scenario.

  To achieve (1), we have a "small" wait and a "large" wait. The small wait
  contends with the replication threads on the lock on the gtid_slave_pos, so
  only minimal processing is done under that lock, and only a single waiter at
  a time does the small wait.

  If there is already a small waiter, a new thread will either replace the
  small waiter (if it needs to wait for an earlier sequence number), or
  instead do a "large" wait.

  Once awoken on the small wait, the waiting thread releases the lock shared
  with the SQL threads quickly, and then processes all waiters currently doing
  the large wait using a different lock that does not impact replication.

  This way, the SQL threads only need to do a single check + possibly a
  pthread_cond_signal() when updating the gtid_slave_state, and the time that
  non-SQL threads contend for the lock on gtid_slave_state is minimized.

  There is always at least one thread that has the responsibility to ensure
  that there is a small waiter; this thread has queue_element::do_small_wait
  set to true. This thread will do the small wait until it is done, at which
  point it will make sure to pass on the responsibility to another thread.
  Normally only one thread has do_small_wait==true, but it can occasionally
  happen that there is more than one, when threads race one another for the
  lock on the small wait (this results in slightly increased activity on the
  small lock but is otherwise harmless).

  Returns:
     0  Wait completed normally
    -1  Wait completed due to timeout
     1  An error (my_error() will have been called to set the error in the da)
*/
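/*
  A rough sketch of the control flow below (simplified; kill handling and
  error paths omitted):

    lock LOCK_gtid_waiting; register in the per-domain wait queue
    loop:
      if we own the small wait:
        under LOCK_slave_state, check highest_seq_no and, if not yet
        reached, wait on COND_wait_gtid (possibly stealing the waiter
        slot from a thread waiting for a later seq_no)
        re-take LOCK_gtid_waiting and wake all large waiters whose
        seq_no has been reached (process_wait_hash())
      else:
        wait on thd->COND_wakeup_ready under LOCK_gtid_waiting
      until elem.done or timeout
    if still responsible for the small wait, promote_new_waiter()
*/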
int
gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
                            struct timespec *wait_until)
{
  bool timed_out= false;
#ifdef HAVE_REPLICATION
  queue_element elem;
  uint32 domain_id= wait_gtid->domain_id;
  uint64 seq_no= wait_gtid->seq_no;
  hash_element *he;
  rpl_slave_state::element *slave_state_elem= NULL;
  PSI_stage_info old_stage;
  bool did_enter_cond= false;

  elem.wait_seq_no= seq_no;
  elem.thd= thd;
  elem.done= false;

  mysql_mutex_lock(&LOCK_gtid_waiting);
  if (!(he= get_entry(wait_gtid->domain_id)))
  {
    mysql_mutex_unlock(&LOCK_gtid_waiting);
    return 1;
  }
  /*
    If there is already another waiter with seq_no no larger than our own,
    we are sure that there is already a small waiter that will wake us up
    (or later pass the small wait responsibility to us). So in this case, we
    do not need to touch the small wait lock at all.
  */
  elem.do_small_wait=
    (queue_empty(&he->queue) ||
     ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);

  if (register_in_wait_queue(thd, wait_gtid, he, &elem))
  {
    mysql_mutex_unlock(&LOCK_gtid_waiting);
    return 1;
  }
  /*
    Loop, doing either the small or large wait as appropriate, until either
    the position waited for is reached, or we get a kill or timeout.
  */
  for (;;)
  {
    mysql_mutex_assert_owner(&LOCK_gtid_waiting);

    if (elem.do_small_wait)
    {
      uint64 wakeup_seq_no;
      queue_element *cur_waiter;

      mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      /*
        The elements in the gtid_slave_state_hash are never re-allocated once
        they enter the hash, so we do not need to re-do the lookup after
        releasing and re-acquiring the lock.
      */
      if (!slave_state_elem &&
          !(slave_state_elem= rpl_global_gtid_slave_state->get_element(domain_id)))
      {
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
        remove_from_wait_queue(he, &elem);
        promote_new_waiter(he);
        if (did_enter_cond)
          thd->EXIT_COND(&old_stage);
        else
          mysql_mutex_unlock(&LOCK_gtid_waiting);
        my_error(ER_OUT_OF_RESOURCES, MYF(0));
        return 1;
      }

      if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
      {
        /*
          We do not have to wait. (We will be removed from the wait queue
          when we call process_wait_hash() below.)
        */
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      }
      else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
               slave_state_elem->min_wait_seq_no <= seq_no)
      {
        /*
          There is already a suitable small waiter, go do the large wait.
          (Normally we would not have needed to check the small wait in this
          case, but it can happen if we race with another thread for the
          small lock.)
        */
        elem.do_small_wait= false;
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      }
      else
      {
        /*
          We have to do the small wait ourselves (stealing it from any thread
          that might already be waiting for a later seq_no).
        */
        slave_state_elem->gtid_waiter= &elem;
        slave_state_elem->min_wait_seq_no= seq_no;
        if (cur_waiter)
        {
          /* We stole the wait, so wake up the old waiting thread. */
          mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
        }

        /* Release the large lock, and do the small wait. */
        if (did_enter_cond)
        {
          thd->EXIT_COND(&old_stage);
          did_enter_cond= false;
        }
        else
          mysql_mutex_unlock(&LOCK_gtid_waiting);
        thd->ENTER_COND(&slave_state_elem->COND_wait_gtid,
                        &rpl_global_gtid_slave_state->LOCK_slave_state,
                        &stage_master_gtid_wait_primary, &old_stage);
        do
        {
          if (unlikely(thd->check_killed(1)))
            break;
          else if (wait_until)
          {
            int err=
              mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
                                   &rpl_global_gtid_slave_state->LOCK_slave_state,
                                   wait_until);
            if (err == ETIMEDOUT || err == ETIME)
            {
              timed_out= true;
              break;
            }
          }
          else
            mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
                            &rpl_global_gtid_slave_state->LOCK_slave_state);
        } while (slave_state_elem->gtid_waiter == &elem);
        wakeup_seq_no= slave_state_elem->highest_seq_no;
        /*
          If we aborted due to timeout or kill, remove us as waiter.

          If we were replaced by another waiter with a smaller seq_no, then
          we no longer have responsibility for the small wait.
        */
        if ((cur_waiter= slave_state_elem->gtid_waiter))
        {
          if (cur_waiter == &elem)
            slave_state_elem->gtid_waiter= NULL;
          else if (slave_state_elem->min_wait_seq_no <= seq_no)
            elem.do_small_wait= false;
        }
        thd->EXIT_COND(&old_stage);

        mysql_mutex_lock(&LOCK_gtid_waiting);
      }

      /*
        Note that hash_element pointers do not change once allocated, so we
        do not need to look up `he' again after re-acquiring
        LOCK_gtid_waiting.
      */
      process_wait_hash(wakeup_seq_no, he);
    }
    else
    {
      /* Do the large wait. */
      if (!did_enter_cond)
      {
        thd->ENTER_COND(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
                        &stage_master_gtid_wait, &old_stage);
        did_enter_cond= true;
      }
      while (!elem.done && likely(!thd->check_killed(1)))
      {
        thd_wait_begin(thd, THD_WAIT_BINLOG);
        if (wait_until)
        {
          int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
                                        &LOCK_gtid_waiting, wait_until);
          if (err == ETIMEDOUT || err == ETIME)
            timed_out= true;
        }
        else
          mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
        thd_wait_end(thd);
        if (elem.do_small_wait || timed_out)
          break;
      }
    }

    if ((thd->killed || timed_out) && !elem.done)
    {
      /* Aborted, so remove ourselves from the hash. */
      remove_from_wait_queue(he, &elem);
      elem.done= true;
    }
    if (elem.done)
    {
      /*
        If our wait is done, but we have (or were passed) responsibility for
        the small wait, then we need to pass on that task to someone else.
      */
      if (elem.do_small_wait)
        promote_new_waiter(he);
      break;
    }
  }

  if (did_enter_cond)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&LOCK_gtid_waiting);
  if (thd->killed)
    thd->send_kill_message();
#endif /* HAVE_REPLICATION */
  return timed_out ? -1 : 0;
}


static void
free_hash_element(void *p)
{
  gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
  delete_queue(&e->queue);
  my_free(e);
}


void
gtid_waiting::init()
{
  my_hash_init(&hash, &my_charset_bin, 32,
               offsetof(hash_element, domain_id), sizeof(uint32), NULL,
               free_hash_element, HASH_UNIQUE);
  mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
}


void
gtid_waiting::destroy()
{
  mysql_mutex_destroy(&LOCK_gtid_waiting);
  my_hash_free(&hash);
}


static int
cmp_queue_elem(void *, uchar *a, uchar *b)
{
  uint64 seq_no_a= *(uint64 *)a;
  uint64 seq_no_b= *(uint64 *)b;
  if (seq_no_a < seq_no_b)
    return -1;
  else if (seq_no_a == seq_no_b)
    return 0;
  else
    return 1;
}


gtid_waiting::hash_element *
gtid_waiting::get_entry(uint32 domain_id)
{
  hash_element *e;

  if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
    return e;

  if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
    return NULL;

  if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
                 cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    my_free(e);
    return NULL;
  }
  e->domain_id= domain_id;
  if (my_hash_insert(&hash, (uchar *)e))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    delete_queue(&e->queue);
    my_free(e);
    return NULL;
  }
  return e;
}


int
gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
                                     gtid_waiting::hash_element *he,
                                     gtid_waiting::queue_element *elem)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  if (queue_insert_safe(&he->queue, (uchar *)elem))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    return 1;
  }

  return 0;
}


void
gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
                                     gtid_waiting::queue_element *elem)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  queue_remove(&he->queue, elem->queue_idx);
}