1 /* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
2 Copyright (c) 2020, MariaDB Corporation.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
16
17
18 /* Definitions for MariaDB global transaction ID (GTID). */
19
20 #include "mariadb.h"
21 #include "sql_priv.h"
22 #include "unireg.h"
23 #include "mariadb.h"
24 #include "sql_base.h"
25 #include "sql_parse.h"
26 #include "key.h"
27 #include "rpl_gtid.h"
28 #include "rpl_rli.h"
29 #include "slave.h"
30 #include "log_event.h"
31
32 const LEX_CSTRING rpl_gtid_slave_state_table_name=
33 { STRING_WITH_LEN("gtid_slave_pos") };
34
35
36 void
update_state_hash(uint64 sub_id,rpl_gtid * gtid,void * hton,rpl_group_info * rgi)37 rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
38 rpl_group_info *rgi)
39 {
40 int err;
41 /*
42 Add the gtid to the HASH in the replication slave state.
43
44 We must do this only _after_ commit, so that for parallel replication,
45 there will not be an attempt to delete the corresponding table row before
46 it is even committed.
47 */
48 mysql_mutex_lock(&LOCK_slave_state);
49 err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
50 mysql_mutex_unlock(&LOCK_slave_state);
51 if (err)
52 {
53 sql_print_warning("Slave: Out of memory during slave state maintenance. "
54 "Some no longer necessary rows in table "
55 "mysql.%s may be left undeleted.",
56 rpl_gtid_slave_state_table_name.str);
57 /*
58 Such failure is not fatal. We will fail to delete the row for this
59 GTID, but it will do no harm and will be removed automatically on next
60 server restart.
61 */
62 }
63 }
64
65
66 int
record_and_update_gtid(THD * thd,rpl_group_info * rgi)67 rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
68 {
69 DBUG_ENTER("rpl_slave_state::record_and_update_gtid");
70
71 /*
72 Update the GTID position, if we have it and did not already update
73 it in a GTID transaction.
74 */
75 if (rgi->gtid_pending)
76 {
77 uint64 sub_id= rgi->gtid_sub_id;
78 void *hton= NULL;
79
80 rgi->gtid_pending= false;
81 if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
82 {
83 if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
84 DBUG_RETURN(1);
85 update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
86 }
87 rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
88 }
89 DBUG_RETURN(0);
90 }
91
92
93 /*
94 Check GTID event execution when --gtid-ignore-duplicates.
95
96 The idea with --gtid-ignore-duplicates is that we allow multiple master
97 connections (in multi-source replication) to all receive the same GTIDs and
98 event groups. Only one instance of each is applied; we use the sequence
99 number in the GTID to decide whether a GTID has already been applied.
100
101 So if the seq_no of a GTID (or a higher sequence number) has already been
102 applied, then the event should be skipped. If not then the event should be
103 applied.
104
  To avoid two master connections trying to apply the same event
106 simultaneously, only one is allowed to work in any given domain at any point
107 in time. The associated Relay_log_info object is called the owner of the
108 domain (and there can be multiple parallel worker threads working in that
109 domain for that Relay_log_info). Any other Relay_log_info/master connection
110 must wait for the domain to become free, or for their GTID to have been
111 applied, before being allowed to proceed.
112
113 Returns:
114 0 This GTID is already applied, it should be skipped.
115 1 The GTID is not yet applied; this rli is now the owner, and must apply
116 the event and release the domain afterwards.
117 -1 Error (out of memory to allocate a new element for the domain).
118 */
int
rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
{
  uint32 domain_id= gtid->domain_id;
  uint64 seq_no= gtid->seq_no;
  rpl_slave_state::element *elem;
  int res;
  bool did_enter_cond= false;
  PSI_stage_info old_stage;
  /* Only assigned (from rgi->thd) once we need to check kill status/wait. */
  THD *UNINIT_VAR(thd);
  Relay_log_info *rli= rgi->rli;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(elem= get_element(domain_id)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    res= -1;
    goto err;
  }
  /*
    Note that the elem pointer does not change once inserted in the hash. So
    we can re-use the pointer without looking it up again in the hash after
    each lock release and re-take.
  */

  for (;;)
  {
    if (elem->highest_seq_no >= seq_no)
    {
      /* This sequence number is already applied, ignore it. */
      res= 0;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
      break;
    }
    if (!elem->owner_rli)
    {
      /* The domain became free, grab it and apply the event. */
      elem->owner_rli= rli;
      elem->owner_count= 1;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    if (elem->owner_rli == rli)
    {
      /* Already own this domain, increment reference count and apply event. */
      ++elem->owner_count;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    thd= rgi->thd;
    if (unlikely(thd->check_killed()))
    {
      /* The waiting thread was killed; give up. */
      res= -1;
      break;
    }
    /*
      Someone else is currently processing this GTID (or an earlier one).
      Wait for them to complete (or fail), and then check again.
    */
    if (!did_enter_cond)
    {
      thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
                      &stage_gtid_wait_other_connection, &old_stage);
      did_enter_cond= true;
    }
    mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
                    &LOCK_slave_state);
  }

err:
  /* EXIT_COND() releases LOCK_slave_state and restores the previous stage. */
  if (did_enter_cond)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&LOCK_slave_state);
  return res;
}
197
198
/*
  Release ownership of a replication domain acquired in
  check_duplicate_gtid() (--gtid-ignore-duplicates mode). Decrements the
  owner reference count and, when it reaches zero, frees the domain and
  wakes up any other master connections waiting for it.
*/
void
rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
{
  element *elem= NULL;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(elem= get_element(rgi->current_gtid.domain_id)))
  {
    /*
      We cannot really deal with error here, as we are already called in an
      error handling case (transaction failure and rollback).

      However, get_element() only fails if the element did not exist already
      and could not be allocated due to out-of-memory - and if it did not
      exist, then we would not get here in the first place.
    */
    mysql_mutex_unlock(&LOCK_slave_state);
    return;
  }

  if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
  {
    uint32 count= elem->owner_count;
    DBUG_ASSERT(count > 0);
    DBUG_ASSERT(elem->owner_rli == rgi->rli);
    --count;
    elem->owner_count= count;
    if (count == 0)
    {
      /* Last reference released; free the domain and wake up waiters. */
      elem->owner_rli= NULL;
      mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
    }
  }
  rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  mysql_mutex_unlock(&LOCK_slave_state);
}
235
236
237 static void
rpl_slave_state_free_element(void * arg)238 rpl_slave_state_free_element(void *arg)
239 {
240 struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
241 mysql_cond_destroy(&elem->COND_wait_gtid);
242 mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates);
243 my_free(elem);
244 }
245
246
rpl_slave_state::rpl_slave_state()
  : pending_gtid_count(0), last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
  mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
                   MY_MUTEX_INIT_SLOW);
  /*
    Hash of struct element, keyed by domain_id; elements are released through
    rpl_slave_state_free_element().
  */
  my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, offsetof(element, domain_id),
               sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
  /* Scratch array used to sort GTID lists by domain_id. */
  my_init_dynamic_array(PSI_INSTRUMENT_ME, &gtid_sort_array, sizeof(rpl_gtid),
                        8, 8, MYF(0));
}
257
258
rpl_slave_state::~rpl_slave_state()
{
  /* Free the published list of gtid_slave_pos* tables. */
  free_gtid_pos_tables(gtid_pos_tables.load(std::memory_order_relaxed));
  /* Free per-domain GTID lists before releasing the hash elements. */
  truncate_hash();
  my_hash_free(&hash);
  delete_dynamic(&gtid_sort_array);
  mysql_mutex_destroy(&LOCK_slave_state);
}
267
268
269 void
truncate_hash()270 rpl_slave_state::truncate_hash()
271 {
272 uint32 i;
273
274 for (i= 0; i < hash.records; ++i)
275 {
276 element *e= (element *)my_hash_element(&hash, i);
277 list_element *l= e->list;
278 list_element *next;
279 while (l)
280 {
281 next= l->next;
282 my_free(l);
283 l= next;
284 }
285 /* The element itself is freed by the hash element free function. */
286 }
287 my_hash_reset(&hash);
288 }
289
290
/*
  Record a newly applied GTID in the in-memory slave state.

  Adds a list_element (domain_id, server_id, sub_id, seq_no, hton) to the
  per-domain element, maintains highest_seq_no, signals any MASTER_GTID_WAIT()
  waiter, and releases --gtid-ignore-duplicates domain ownership if held.

  Caller must hold LOCK_slave_state.

  Returns 0 on success, 1 on out-of-memory.
*/
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
                        uint64 seq_no, void *hton, rpl_group_info *rgi)
{
  element *elem= NULL;
  list_element *list_elem= NULL;

  DBUG_ASSERT(hton || !loaded);
  if (!(elem= get_element(domain_id)))
    return 1;

  if (seq_no > elem->highest_seq_no)
    elem->highest_seq_no= seq_no;
  if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
  {
    /*
      Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
      Signal (and remove) them. The waiter will handle all the processing
      of all pending MASTER_GTID_WAIT(), so we do not slow down the
      replication SQL thread.
    */
    mysql_mutex_assert_owner(&LOCK_slave_state);
    elem->gtid_waiter= NULL;
    mysql_cond_broadcast(&elem->COND_wait_gtid);
  }

  if (rgi)
  {
    if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
    {
#ifdef DBUG_ASSERT_EXISTS
      Relay_log_info *rli= rgi->rli;
#endif
      uint32 count= elem->owner_count;
      DBUG_ASSERT(count > 0);
      DBUG_ASSERT(elem->owner_rli == rli);
      --count;
      elem->owner_count= count;
      if (count == 0)
      {
        /* Last owner reference; free the domain and wake up waiters. */
        elem->owner_rli= NULL;
        mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
      }
    }
    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  }

  if (!(list_elem= (list_element *)my_malloc(PSI_INSTRUMENT_ME,
                                             sizeof(*list_elem), MYF(MY_WME))))
    return 1;
  list_elem->domain_id= domain_id;
  list_elem->server_id= server_id;
  list_elem->sub_id= sub_id;
  list_elem->seq_no= seq_no;
  list_elem->hton= hton;

  elem->add(list_elem);
  if (last_sub_id < sub_id)
    last_sub_id= sub_id;

#ifdef HAVE_REPLICATION
  /*
    Batch up deletions of old rows in mysql.gtid_slave_pos*; once enough are
    pending, ask the slave background thread to purge them.
  */
  ++pending_gtid_count;
  if (pending_gtid_count >= opt_gtid_cleanup_batch_size)
  {
    pending_gtid_count = 0;
    slave_background_gtid_pending_delete_request();
  }
#endif

  return 0;
}
362
363
364 struct rpl_slave_state::element *
get_element(uint32 domain_id)365 rpl_slave_state::get_element(uint32 domain_id)
366 {
367 struct element *elem;
368
369 elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
370 if (elem)
371 return elem;
372
373 if (!(elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem), MYF(MY_WME))))
374 return NULL;
375 elem->list= NULL;
376 elem->domain_id= domain_id;
377 elem->highest_seq_no= 0;
378 elem->gtid_waiter= NULL;
379 elem->owner_rli= NULL;
380 elem->owner_count= 0;
381 mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
382 mysql_cond_init(key_COND_gtid_ignore_duplicates,
383 &elem->COND_gtid_ignore_duplicates, 0);
384 if (my_hash_insert(&hash, (uchar *)elem))
385 {
386 my_free(elem);
387 return NULL;
388 }
389 return elem;
390 }
391
392
393 int
put_back_list(list_element * list)394 rpl_slave_state::put_back_list(list_element *list)
395 {
396 element *e= NULL;
397 int err= 0;
398
399 mysql_mutex_lock(&LOCK_slave_state);
400 while (list)
401 {
402 list_element *next= list->next;
403
404 if ((!e || e->domain_id != list->domain_id) &&
405 !(e= (element *)my_hash_search(&hash, (const uchar *)&list->domain_id, 0)))
406 {
407 err= 1;
408 goto end;
409 }
410 e->add(list);
411 list= next;
412 }
413
414 end:
415 mysql_mutex_unlock(&LOCK_slave_state);
416 return err;
417 }
418
419
/*
  TRUNCATE the mysql.gtid_slave_pos table.

  Opens the table with an exclusive MDL lock, flushes it from the table
  definition cache, truncates it and commits; on error both the statement
  and the enclosing transaction are rolled back.

  Returns 0 on success, non-zero handler/open error otherwise.
*/
int
rpl_slave_state::truncate_state_table(THD *thd)
{
  TABLE_LIST tlist;
  int err= 0;

  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name,
                       NULL, TL_WRITE);
  tlist.mdl_request.set_type(MDL_EXCLUSIVE);
  if (!(err= open_and_lock_tables(thd, &tlist, FALSE,
                                  MYSQL_OPEN_IGNORE_LOGGING_FORMAT)))
  {
    DBUG_ASSERT(!tlist.table->file->row_logging);
    /* Evict cached table instances before truncating. */
    tlist.table->s->tdc->flush(thd, true);
    err= tlist.table->file->ha_truncate();

    if (err)
    {
      /* Roll back statement first, then the overall transaction. */
      ha_rollback_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_rollback_trans(thd, TRUE);
    }
    else
    {
      ha_commit_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_commit_trans(thd, TRUE);
    }
    thd->release_transactional_locks();
  }
  return err;
}
452
453
/*
  Expected column definitions of mysql.gtid_slave_pos, used by
  gtid_check_rpl_slave_state_table() to verify the on-disk table.
*/
static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
  { { STRING_WITH_LEN("domain_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("sub_id") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("server_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("seq_no") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
};

/* Primary key is (domain_id, sub_id) — columns 0 and 1 above. */
static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};

static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= {
  array_elements(mysql_rpl_slave_state_coltypes),
  mysql_rpl_slave_state_coltypes,
  array_elements(mysql_rpl_slave_state_pk_parts),
  mysql_rpl_slave_state_pk_parts
};

/* Checker that logs table-definition mismatches to the error log. */
static Table_check_intact_log_error gtid_table_intact;
479
480 /*
481 Check that the mysql.gtid_slave_pos table has the correct definition.
482 */
483 int
gtid_check_rpl_slave_state_table(TABLE * table)484 gtid_check_rpl_slave_state_table(TABLE *table)
485 {
486 int err;
487
488 if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef)))
489 my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
490 rpl_gtid_slave_state_table_name.str);
491 return err;
492 }
493
494
495 /*
496 Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
497 that is already in use by the current transaction, if any.
498 */
void
rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
{
  /*
    See comments on rpl_slave_state::gtid_pos_tables for rules around proper
    access to the list.
  */
  auto list= gtid_pos_tables.load(std::memory_order_acquire);

  Ha_trx_info *ha_info;
  /* Number of read-write (non-binlog) engines seen so far. */
  uint count = 0;
  for (ha_info= thd->transaction->all.ha_list; ha_info; ha_info= ha_info->next())
  {
    void *trx_hton= ha_info->ht();
    auto table_entry= list;

    /* Skip read-only participants and the binlog pseudo-engine. */
    if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton)
      continue;
    while (table_entry)
    {
      if (table_entry->table_hton == trx_hton)
      {
        if (likely(table_entry->state == GTID_POS_AVAILABLE))
        {
          *out_tablename= table_entry->table_name;
          /*
            Check if this is a cross-engine transaction, so we can correctly
            maintain the rpl_transactions_multi_engine status variable.
          */
          if (count >= 1)
            statistic_increment(rpl_transactions_multi_engine, LOCK_status);
          else
          {
            /* Scan the remaining engines for another read-write one. */
            for (;;)
            {
              ha_info= ha_info->next();
              if (!ha_info)
                break;
              if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
              {
                statistic_increment(rpl_transactions_multi_engine, LOCK_status);
                break;
              }
            }
          }
          return;
        }
        /*
          This engine is marked to automatically create the table.
          We cannot easily do this here (possibly in the middle of a
          transaction). But we can request the slave background thread
          to create it, and in a short while it should become available
          for following transactions.
        */
#ifdef HAVE_REPLICATION
        slave_background_gtid_pos_create_request(table_entry);
#endif
        break;
      }
      table_entry= table_entry->next;
    }
    ++count;
  }
  /*
    If we cannot find any table whose engine matches an engine that is
    already active in the transaction, or if there is no current transaction
    engines available, we return the default gtid_slave_pos table.
  */
  *out_tablename=
    default_gtid_pos_table.load(std::memory_order_acquire)->table_name;
  /* Record in status that we failed to find a suitable gtid_pos table. */
  if (count > 0)
  {
    statistic_increment(transactions_gtid_foreign_engine, LOCK_status);
    if (count > 1)
      statistic_increment(rpl_transactions_multi_engine, LOCK_status);
  }
}
577
578
579 /*
580 Write a gtid to the replication slave state table.
581
582 Do it as part of the transaction, to get slave crash safety, or as a separate
583 transaction if !in_transaction (eg. MyISAM or DDL).
584
585 gtid The global transaction id for this event group.
586 sub_id Value allocated within the sub_id when the event group was
587 read (sub_id must be consistent with commit order in master binlog).
588
589 Note that caller must later ensure that the new gtid and sub_id is inserted
590 into the appropriate HASH element with rpl_slave_state.add(), so that it can
591 be deleted later. But this must only be done after COMMIT if in transaction.
592 */
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                             bool in_transaction, bool in_statement,
                             void **out_hton)
{
  TABLE_LIST tlist;
  int err= 0, not_sql_thread;
  bool table_opened= false;
  TABLE *table;
  ulonglong thd_saved_option= thd->variables.option_bits;
  Query_tables_list lex_backup;
  wait_for_commit* suspended_wfc;
  void *hton= NULL;
  LEX_CSTRING gtid_pos_table_name;
  DBUG_ENTER("record_gtid");

  *out_hton= NULL;
  if (unlikely(!loaded))
  {
    /*
      Probably the mysql.gtid_slave_pos table is missing (eg. upgrade) or
      corrupt.

      We already complained loudly about this, but we can try to continue
      until the DBA fixes it.
    */
    DBUG_RETURN(0);
  }

  if (!in_statement)
    thd->reset_for_next_command();

  /*
    Only the SQL thread can call select_gtid_pos_table without a mutex
    Other threads needs to use a mutex and take into account that the
    result may change during execution, so we have to make a copy.
  */

  if ((not_sql_thread= (thd->system_thread != SYSTEM_THREAD_SLAVE_SQL)))
    mysql_mutex_lock(&LOCK_slave_state);
  select_gtid_pos_table(thd, &gtid_pos_table_name);
  if (not_sql_thread)
  {
    /* Copy the name into THD-owned memory before releasing the lock. */
    LEX_CSTRING *tmp= thd->make_clex_string(gtid_pos_table_name.str,
                                            gtid_pos_table_name.length);
    mysql_mutex_unlock(&LOCK_slave_state);
    if (!tmp)
      DBUG_RETURN(1);
    gtid_pos_table_name= *tmp;
  }

  DBUG_EXECUTE_IF("gtid_inject_record_gtid",
                  {
                    my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
                    DBUG_RETURN(1);
                  } );

  /*
    If we are applying a non-transactional event group, we will be committing
    here a transaction, but that does not imply that the event group has
    completed or has been binlogged. So we should not trigger
    wakeup_subsequent_commits() here.

    Note: An alternative here could be to put a call to mark_start_commit() in
    stmt_done() before the call to record_and_update_gtid(). This would
    prevent later calling mark_start_commit() after we have run
    wakeup_subsequent_commits() from committing the GTID update transaction
    (which must be avoided to avoid accessing freed group_commit_orderer
    object). It would also allow following event groups to start slightly
    earlier. And in the cases where record_gtid() is called without an active
    transaction, the current statement should have been binlogged already, so
    binlog order is preserved.

    But this is rather subtle, and potentially fragile. And it does not really
    seem worth it; non-transactional loads are unlikely to benefit much from
    parallel replication in any case. So for now, we go with the simple
    suspend/resume of wakeup_subsequent_commits() here in record_gtid().
  */
  suspended_wfc= thd->suspend_subsequent_commits();
  thd->lex->reset_n_backup_query_tables_list(&lex_backup);
  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &gtid_pos_table_name, NULL, TL_WRITE);
  if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
    goto end;
  table_opened= true;
  table= tlist.table;
  hton= table->s->db_type();
  table->file->row_logging= 0;                   // No binary logging

  if ((err= gtid_check_rpl_slave_state_table(table)))
    goto end;

#ifdef WITH_WSREP
  /*
    Updates in slave state table should not be appended to galera transaction
    writeset.
  */
  thd->wsrep_ignore_table= true;
#endif

  if (!in_transaction)
  {
    DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
    thd->variables.option_bits&=
      ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
                   OPTION_GTID_BEGIN);
  }
  else
    thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG;

  bitmap_set_all(table->write_set);
  table->rpl_write_set= table->write_set;

  /* Row layout: (domain_id, sub_id, server_id, seq_no). */
  table->field[0]->store((ulonglong)gtid->domain_id, true);
  table->field[1]->store(sub_id, true);
  table->field[2]->store((ulonglong)gtid->server_id, true);
  table->field[3]->store(gtid->seq_no, true);
  DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE(););
  if ((err= table->file->ha_write_row(table->record[0])))
  {
    table->file->print_error(err, MYF(0));
    goto end;
  }
  *out_hton= hton;

  if(opt_bin_log &&
     (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
                                                       gtid->seq_no)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    goto end;
  }
end:

#ifdef WITH_WSREP
  thd->wsrep_ignore_table= false;
#endif

  if (table_opened)
  {
    if (err || (err= ha_commit_trans(thd, FALSE)))
      ha_rollback_trans(thd, FALSE);

    close_thread_tables(thd);
    if (in_transaction)
      thd->mdl_context.release_statement_locks();
    else
      thd->release_transactional_locks();
  }
  /* Restore everything we changed, in reverse order of setup. */
  thd->lex->restore_backup_query_tables_list(&lex_backup);
  thd->variables.option_bits= thd_saved_option;
  thd->resume_subsequent_commits(suspended_wfc);
  DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
                  {
                    if (gtid->server_id == 100)
                      my_sleep(500000);
                  });
  DBUG_RETURN(err);
}
751
752
753 /*
754 Return a list of all old GTIDs in any mysql.gtid_slave_pos* table that are
755 no longer needed and can be deleted from the table.
756
757 Within each domain, we need to keep around the latest GTID (the one with the
758 highest sub_id), but any others in that domain can be deleted.
759 */
rpl_slave_state::list_element *
rpl_slave_state::gtid_grab_pending_delete_list()
{
  uint32 i;
  list_element *full_list;

  mysql_mutex_lock(&LOCK_slave_state);
  full_list= NULL;
  for (i= 0; i < hash.records; ++i)
  {
    element *elem= (element *)my_hash_element(&hash, i);
    list_element *elist= elem->list;
    list_element *last_elem, **best_ptr_ptr, *cur, *next;
    uint64 best_sub_id;

    if (!elist)
      continue;                                 /* Nothing here */

    /* Delete any old stuff, but keep around the most recent one. */
    cur= elist;
    best_sub_id= cur->sub_id;
    best_ptr_ptr= &elist;
    last_elem= cur;
    /*
      Find the entry with the highest sub_id; best_ptr_ptr is the link slot
      pointing at it, so it can be unlinked below. last_elem tracks the list
      tail for appending to full_list.
    */
    while ((next= cur->next)) {
      last_elem= next;
      if (next->sub_id > best_sub_id)
      {
        best_sub_id= next->sub_id;
        best_ptr_ptr= &cur->next;
      }
      cur= next;
    }
    /*
      Append the new elements to the full list. Note the order is important;
      we do it here so that we do not break the list if best_sub_id is the
      last of the new elements.
    */
    last_elem->next= full_list;
    /*
      Delete the highest sub_id element from the old list, and put it back as
      the single-element new list.
    */
    cur= *best_ptr_ptr;
    *best_ptr_ptr= cur->next;
    cur->next= NULL;
    elem->list= cur;

    /*
      Collect the full list so far here. Note that elist may have moved if we
      deleted the first element, so order is again important.
    */
    full_list= elist;
  }
  mysql_mutex_unlock(&LOCK_slave_state);

  return full_list;
}
817
818
819 /* Find the mysql.gtid_slave_posXXX table associated with a given hton. */
820 LEX_CSTRING *
select_gtid_pos_table(void * hton)821 rpl_slave_state::select_gtid_pos_table(void *hton)
822 {
823 /*
824 See comments on rpl_slave_state::gtid_pos_tables for rules around proper
825 access to the list.
826 */
827 auto table_entry= gtid_pos_tables.load(std::memory_order_acquire);
828
829 while (table_entry)
830 {
831 if (table_entry->table_hton == hton)
832 {
833 if (likely(table_entry->state == GTID_POS_AVAILABLE))
834 return &table_entry->table_name;
835 }
836 table_entry= table_entry->next;
837 }
838
839 return &default_gtid_pos_table.load(std::memory_order_acquire)->table_name;
840 }
841
842
/*
  Delete old, no-longer-needed GTID rows from the mysql.gtid_slave_pos*
  tables, as collected by gtid_grab_pending_delete_list().

  The outer loop handles one storage engine per iteration: it opens the
  gtid_pos table matching the hton of the current list head, then deletes
  (and unlinks from *list_ptr) every list element with that hton. Elements
  that remain on *list_ptr after return (e.g. on error) can be retried later.
*/
void
rpl_slave_state::gtid_delete_pending(THD *thd,
                                     rpl_slave_state::list_element **list_ptr)
{
  int err= 0;
  ulonglong thd_saved_option;

  if (unlikely(!loaded))
    return;

#ifdef WITH_WSREP
  /*
    Updates in slave state table should not be appended to galera transaction
    writeset.
  */
  thd->wsrep_ignore_table= true;
#endif

  thd_saved_option= thd->variables.option_bits;
  thd->variables.option_bits&=
    ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
                 OPTION_GTID_BEGIN);

  while (*list_ptr)
  {
    LEX_CSTRING *gtid_pos_table_name, *tmp_table_name;
    Query_tables_list lex_backup;
    TABLE_LIST tlist;
    TABLE *table;
    handler::Table_flags direct_pos= 0;
    list_element *cur, **cur_ptr_ptr;
    bool table_opened= false;
    bool index_inited= false;
    void *hton= (*list_ptr)->hton;

    thd->reset_for_next_command();

    /*
      Only the SQL thread can call select_gtid_pos_table without a mutex
      Other threads needs to use a mutex and take into account that the
      result may change during execution, so we have to make a copy.
    */
    mysql_mutex_lock(&LOCK_slave_state);
    tmp_table_name= select_gtid_pos_table(hton);
    gtid_pos_table_name= thd->make_clex_string(tmp_table_name->str,
                                               tmp_table_name->length);
    mysql_mutex_unlock(&LOCK_slave_state);
    if (!gtid_pos_table_name)
    {
      /* Out of memory - we can try again later. */
      break;
    }

    thd->lex->reset_n_backup_query_tables_list(&lex_backup);
    tlist.init_one_table(&MYSQL_SCHEMA_NAME, gtid_pos_table_name, NULL, TL_WRITE);
    if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
      goto end;
    table_opened= true;
    table= tlist.table;

    if ((err= gtid_check_rpl_slave_state_table(table)))
      goto end;

    /* Engines with this flag can position on a row directly from its PK. */
    direct_pos= table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION;
    bitmap_set_all(table->write_set);
    table->rpl_write_set= table->write_set;

    /* Now delete any already committed GTIDs. */
    bitmap_set_bit(table->read_set, table->field[0]->field_index);
    bitmap_set_bit(table->read_set, table->field[1]->field_index);

    if (!direct_pos)
    {
      if ((err= table->file->ha_index_init(0, 0)))
      {
        table->file->print_error(err, MYF(0));
        goto end;
      }
      index_inited= true;
    }

    cur = *list_ptr;
    cur_ptr_ptr = list_ptr;
    do
    {
      /* Key buffer for (domain_id:4, sub_id:8) primary-key lookup. */
      uchar key_buffer[4+8];
      list_element *next= cur->next;

      if (cur->hton == hton)
      {
        int res;

        table->field[0]->store((ulonglong)cur->domain_id, true);
        table->field[1]->store(cur->sub_id, true);
        if (direct_pos)
        {
          res= table->file->ha_rnd_pos_by_record(table->record[0]);
        }
        else
        {
          key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
          res= table->file->ha_index_read_map(table->record[0], key_buffer,
                                              HA_WHOLE_KEY, HA_READ_KEY_EXACT);
        }
        DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
                        { res= 1;
                          err= ENOENT;
                          sql_print_error("<DEBUG> Error deleting old GTID row");
                        });
        if (res)
          /* We cannot find the row, assume it is already deleted. */
          ;
        else if ((err= table->file->ha_delete_row(table->record[0])))
        {
          sql_print_error("Error deleting old GTID row: %s",
                          thd->get_stmt_da()->message());
          /*
            In case of error, we still discard the element from the list. We do
            not want to endlessly error on the same element in case of table
            corruption or such.
          */
        }
        *cur_ptr_ptr= next;
        my_free(cur);
      }
      else
      {
        /* Leave this one in the list until we get to the table for its hton. */
        cur_ptr_ptr= &cur->next;
      }
      cur= next;
      if (err)
        break;
    } while (cur);
end:
    if (table_opened)
    {
      DBUG_ASSERT(direct_pos || index_inited || err);
      /*
        Index may not be initialized if there was a failure during
        'ha_index_init'. Hence check if index initialization is successful and
        then invoke ha_index_end(). Ending an index which is not initialized
        will lead to assert.
      */
      if (index_inited)
        table->file->ha_index_end();

      if (err || (err= ha_commit_trans(thd, FALSE)))
        ha_rollback_trans(thd, FALSE);
    }
    close_thread_tables(thd);
    thd->release_transactional_locks();
    thd->lex->restore_backup_query_tables_list(&lex_backup);

    if (err)
      break;
  }
  thd->variables.option_bits= thd_saved_option;

#ifdef WITH_WSREP
  thd->wsrep_ignore_table= false;
#endif
}
1006
1007
1008 uint64
next_sub_id(uint32 domain_id)1009 rpl_slave_state::next_sub_id(uint32 domain_id)
1010 {
1011 uint64 sub_id= 0;
1012
1013 mysql_mutex_lock(&LOCK_slave_state);
1014 sub_id= ++last_sub_id;
1015 mysql_mutex_unlock(&LOCK_slave_state);
1016
1017 return sub_id;
1018 }
1019
1020 /* A callback used in sorting of gtid list based on domain_id. */
rpl_gtid_cmp_cb(const void * id1,const void * id2)1021 static int rpl_gtid_cmp_cb(const void *id1, const void *id2)
1022 {
1023 uint32 d1= ((rpl_gtid *)id1)->domain_id;
1024 uint32 d2= ((rpl_gtid *)id2)->domain_id;
1025
1026 if (d1 < d2)
1027 return -1;
1028 else if (d1 > d2)
1029 return 1;
1030 return 0;
1031 }
1032
1033 /* Format the specified gtid and store it in the given string buffer. */
1034 bool
rpl_slave_state_tostring_helper(String * dest,const rpl_gtid * gtid,bool * first)1035 rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
1036 {
1037 if (*first)
1038 *first= false;
1039 else
1040 if (dest->append(",",1))
1041 return true;
1042 return
1043 dest->append_ulonglong(gtid->domain_id) ||
1044 dest->append("-",1) ||
1045 dest->append_ulonglong(gtid->server_id) ||
1046 dest->append("-",1) ||
1047 dest->append_ulonglong(gtid->seq_no);
1048 }
1049
1050 /*
1051 Sort the given gtid list based on domain_id and store them in the specified
1052 string.
1053 */
1054 static bool
rpl_slave_state_tostring_helper(DYNAMIC_ARRAY * gtid_dynarr,String * str)1055 rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr, String *str)
1056 {
1057 bool first= true, res= true;
1058
1059 sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);
1060
1061 for (uint i= 0; i < gtid_dynarr->elements; i ++)
1062 {
1063 rpl_gtid *gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
1064 if (rpl_slave_state_tostring_helper(str, gtid, &first))
1065 goto err;
1066 }
1067 res= false;
1068
1069 err:
1070 return res;
1071 }
1072
1073
1074 /* Sort the given gtid list based on domain_id and call cb for each gtid. */
1075 static bool
rpl_slave_state_tostring_helper(DYNAMIC_ARRAY * gtid_dynarr,int (* cb)(rpl_gtid *,void *),void * data)1076 rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr,
1077 int (*cb)(rpl_gtid *, void *),
1078 void *data)
1079 {
1080 rpl_gtid *gtid;
1081 bool res= true;
1082
1083 sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);
1084
1085 for (uint i= 0; i < gtid_dynarr->elements; i ++)
1086 {
1087 gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
1088 if ((*cb)(gtid, data))
1089 goto err;
1090 }
1091 res= false;
1092
1093 err:
1094 return res;
1095 }
1096
/*
  Iterate over the current slave state, calling CB once with the most recent
  GTID of each domain_id, optionally merged with EXTRA_GTIDS from the binlog
  and optionally sorted on domain_id (via gtid_sort_array).

  Returns 0 on success, non-zero on error (out-of-memory or a failing
  callback).
*/
int
rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
                         rpl_gtid *extra_gtids, uint32 num_extra,
                         bool sort)
{
  uint32 i;
  HASH gtid_hash;
  uchar *rec;
  rpl_gtid *gtid;
  int res= 1;
  bool locked= false;

  /*
    Build a temporary hash over the extra (binlog) GTIDs, keyed on domain_id.
    Only GTIDs carrying our own server_id are considered (see tostring() for
    the rationale).
  */
  my_hash_init(PSI_INSTRUMENT_ME, &gtid_hash, &my_charset_bin, 32,
               offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, NULL,
               HASH_UNIQUE);
  for (i= 0; i < num_extra; ++i)
    if (extra_gtids[i].server_id == global_system_variables.server_id &&
        my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
      goto err;

  mysql_mutex_lock(&LOCK_slave_state);
  locked= true;
  reset_dynamic(&gtid_sort_array);

  for (i= 0; i < hash.records; ++i)
  {
    uint64 best_sub_id;
    rpl_gtid best_gtid;
    element *e= (element *)my_hash_element(&hash, i);
    list_element *l= e->list;

    if (!l)
      continue;                                 /* Nothing here */

    /*
      Pick the most recently applied GTID in this domain, ie. the list
      entry with the highest sub_id.
    */
    best_gtid.domain_id= e->domain_id;
    best_gtid.server_id= l->server_id;
    best_gtid.seq_no= l->seq_no;
    best_sub_id= l->sub_id;
    while ((l= l->next))
    {
      if (l->sub_id > best_sub_id)
      {
        best_sub_id= l->sub_id;
        best_gtid.server_id= l->server_id;
        best_gtid.seq_no= l->seq_no;
      }
    }

    /* Check if we have something newer in the extra list. */
    rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
    if (rec)
    {
      gtid= (rpl_gtid *)rec;
      if (gtid->seq_no > best_gtid.seq_no)
        memcpy(&best_gtid, gtid, sizeof(best_gtid));
      /* Consumed; delete so this domain is not emitted again below. */
      if (my_hash_delete(&gtid_hash, rec))
      {
        goto err;
      }
    }

    /* Either buffer the result for later sorting, or emit it right away. */
    if ((res= sort ? insert_dynamic(&gtid_sort_array,
                                    (const void *) &best_gtid) :
         (*cb)(&best_gtid, data)))
    {
      goto err;
    }
  }

  /* Also add any remaining extra domain_ids. */
  for (i= 0; i < gtid_hash.records; ++i)
  {
    gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
    if ((res= sort ? insert_dynamic(&gtid_sort_array, (const void *) gtid) :
         (*cb)(gtid, data)))
    {
      goto err;
    }
  }

  /* If sorting was requested, emit everything now in domain_id order. */
  if (sort && rpl_slave_state_tostring_helper(&gtid_sort_array, cb, data))
  {
    goto err;
  }

  res= 0;

err:
  if (locked) mysql_mutex_unlock(&LOCK_slave_state);
  my_hash_free(&gtid_hash);

  return res;
}
1190
1191
/* Callback state for rpl_slave_state::tostring(). */
struct rpl_slave_state_tostring_data {
  String *dest;   /* Output buffer receiving the text form */
  bool first;     /* True until the first GTID has been appended */
};
static int
rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data)
{
  /* Append one GTID to the output; a non-zero return aborts iterate(). */
  rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data;
  return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first);
}
1202
1203
1204 /*
1205 Prepare the current slave state as a string, suitable for sending to the
1206 master to request to receive binlog events starting from that GTID state.
1207
1208 The state consists of the most recently applied GTID for each domain_id,
1209 ie. the one with the highest sub_id within each domain_id.
1210
1211 Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when
1212 a server was previously a master and now needs to connect to a new master as
1213 a slave. For each domain_id, if the GTID in the binlog was logged with our
1214 own server_id _and_ has a higher seq_no than what is in the slave state,
1215 then this should be used as the position to start replicating at. This
1216 allows to promote a slave as new master, and connect the old master as a
1217 slave with MASTER_GTID_POS=AUTO.
1218 */
1219 int
tostring(String * dest,rpl_gtid * extra_gtids,uint32 num_extra)1220 rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
1221 {
1222 struct rpl_slave_state_tostring_data data;
1223 data.first= true;
1224 data.dest= dest;
1225
1226 return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids,
1227 num_extra, true);
1228 }
1229
1230
1231 /*
1232 Lookup a domain_id in the current replication slave state.
1233
1234 Returns false if the domain_id has no entries in the slave state.
1235 Otherwise returns true, and fills in out_gtid with the corresponding
1236 GTID.
1237 */
1238 bool
domain_to_gtid(uint32 domain_id,rpl_gtid * out_gtid)1239 rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
1240 {
1241 element *elem;
1242 list_element *list;
1243 uint64 best_sub_id;
1244
1245 mysql_mutex_lock(&LOCK_slave_state);
1246 elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
1247 if (!elem || !(list= elem->list))
1248 {
1249 mysql_mutex_unlock(&LOCK_slave_state);
1250 return false;
1251 }
1252
1253 out_gtid->domain_id= domain_id;
1254 out_gtid->server_id= list->server_id;
1255 out_gtid->seq_no= list->seq_no;
1256 best_sub_id= list->sub_id;
1257
1258 while ((list= list->next))
1259 {
1260 if (best_sub_id > list->sub_id)
1261 continue;
1262 best_sub_id= list->sub_id;
1263 out_gtid->server_id= list->server_id;
1264 out_gtid->seq_no= list->seq_no;
1265 }
1266
1267 mysql_mutex_unlock(&LOCK_slave_state);
1268 return true;
1269 }
1270
1271
1272 /*
1273 Parse a GTID at the start of a string, and update the pointer to point
1274 at the first character after the parsed GTID.
1275
1276 Returns 0 on ok, non-zero on parse error.
1277 */
1278 static int
gtid_parser_helper(const char ** ptr,const char * end,rpl_gtid * out_gtid)1279 gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
1280 {
1281 char *q;
1282 const char *p= *ptr;
1283 uint64 v1, v2, v3;
1284 int err= 0;
1285
1286 q= (char*) end;
1287 v1= (uint64)my_strtoll10(p, &q, &err);
1288 if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
1289 return 1;
1290 p= q+1;
1291 q= (char*) end;
1292 v2= (uint64)my_strtoll10(p, &q, &err);
1293 if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
1294 return 1;
1295 p= q+1;
1296 q= (char*) end;
1297 v3= (uint64)my_strtoll10(p, &q, &err);
1298 if (err != 0)
1299 return 1;
1300
1301 out_gtid->domain_id= (uint32) v1;
1302 out_gtid->server_id= (uint32) v2;
1303 out_gtid->seq_no= v3;
1304 *ptr= q;
1305 return 0;
1306 }
1307
1308
1309 rpl_gtid *
gtid_parse_string_to_list(const char * str,size_t str_len,uint32 * out_len)1310 gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
1311 {
1312 const char *p= const_cast<char *>(str);
1313 const char *end= p + str_len;
1314 uint32 len= 0, alloc_len= 5;
1315 rpl_gtid *list= NULL;
1316
1317 for (;;)
1318 {
1319 rpl_gtid gtid;
1320
1321 if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, >id))
1322 {
1323 my_free(list);
1324 return NULL;
1325 }
1326 if ((!list || len >= alloc_len) &&
1327 !(list=
1328 (rpl_gtid *)my_realloc(PSI_INSTRUMENT_ME, list,
1329 (alloc_len= alloc_len*2) * sizeof(rpl_gtid),
1330 MYF(MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR))))
1331 return NULL;
1332 list[len++]= gtid;
1333
1334 if (p == end)
1335 break;
1336 if (*p != ',')
1337 {
1338 my_free(list);
1339 return NULL;
1340 }
1341 ++p;
1342 }
1343 *out_len= len;
1344 return list;
1345 }
1346
1347
/*
  Update the slave replication state with the GTID position obtained from
  master when connecting with old-style (filename,offset) position.

  If RESET is true then all existing entries are removed. Otherwise only
  domain_ids mentioned in the STATE_FROM_MASTER are changed.

  Returns 0 if ok, non-zero if error.
*/
int
rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
                      bool reset, bool in_statement)
{
  const char *end= state_from_master + len;

  if (reset)
  {
    /* Clear both the on-disk state table and the in-memory hash. */
    if (truncate_state_table(thd))
      return 1;
    truncate_hash();
  }
  if (state_from_master == end)
    return 0;                                   /* Empty list, nothing to do */
  for (;;)
  {
    rpl_gtid gtid;
    uint64 sub_id;
    void *hton= NULL;

    /*
      Parse the next GTID, allocate a fresh sub_id for it, persist it via
      record_gtid(), and finally update the in-memory state.
    */
    if (gtid_parser_helper(&state_from_master, end, &gtid) ||
        !(sub_id= next_sub_id(gtid.domain_id)) ||
        record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
        update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
      return 1;
    if (state_from_master == end)
      break;
    if (*state_from_master != ',')
      return 1;                                 /* Malformed list */
    ++state_from_master;
  }
  return 0;
}
1390
1391
1392 bool
is_empty()1393 rpl_slave_state::is_empty()
1394 {
1395 uint32 i;
1396 bool result= true;
1397
1398 mysql_mutex_lock(&LOCK_slave_state);
1399 for (i= 0; i < hash.records; ++i)
1400 {
1401 element *e= (element *)my_hash_element(&hash, i);
1402 if (e->list)
1403 {
1404 result= false;
1405 break;
1406 }
1407 }
1408 mysql_mutex_unlock(&LOCK_slave_state);
1409
1410 return result;
1411 }
1412
1413
1414 void
free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table * list)1415 rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list)
1416 {
1417 struct gtid_pos_table *cur, *next;
1418
1419 cur= list;
1420 while (cur)
1421 {
1422 next= cur->next;
1423 my_free(cur);
1424 cur= next;
1425 }
1426 }
1427
1428
/*
  Replace the list of available mysql.gtid_slave_posXXX tables with a new list.
  The caller must be holding LOCK_slave_state. Additionally, this function
  must only be called while all SQL threads are stopped.
*/
void
rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
                                          rpl_slave_state::gtid_pos_table *default_entry)
{
  mysql_mutex_assert_owner(&LOCK_slave_state);
  auto old_list= gtid_pos_tables.load(std::memory_order_relaxed);
  /*
    Release stores publish the fully initialised new entries before the
    pointers become visible to concurrent acquire-loads.
  */
  gtid_pos_tables.store(new_list, std::memory_order_release);
  default_gtid_pos_table.store(default_entry, std::memory_order_release);
  /* Safe to free now: SQL threads are stopped (see contract above). */
  free_gtid_pos_tables(old_list);
}
1444
1445
/*
  Prepend ENTRY to the list of known gtid_pos tables.
  The caller must be holding LOCK_slave_state.
*/
void
rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
{
  mysql_mutex_assert_owner(&LOCK_slave_state);
  entry->next= gtid_pos_tables.load(std::memory_order_relaxed);
  /* Release store publishes the fully initialised entry to readers. */
  gtid_pos_tables.store(entry, std::memory_order_release);
}
1453
1454
1455 struct rpl_slave_state::gtid_pos_table *
alloc_gtid_pos_table(LEX_CSTRING * table_name,void * hton,rpl_slave_state::gtid_pos_table_state state)1456 rpl_slave_state::alloc_gtid_pos_table(LEX_CSTRING *table_name, void *hton,
1457 rpl_slave_state::gtid_pos_table_state state)
1458 {
1459 struct gtid_pos_table *p;
1460 char *allocated_str;
1461
1462 if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME), &p, sizeof(*p),
1463 &allocated_str, table_name->length+1, NULL))
1464 {
1465 my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1));
1466 return NULL;
1467 }
1468 memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0'
1469 p->next = NULL;
1470 p->table_hton= hton;
1471 p->table_name.str= allocated_str;
1472 p->table_name.length= table_name->length;
1473 p->state= state;
1474 return p;
1475 }
1476
1477
/*
  Initialise the binlog state: the per-domain hash of elements, the scratch
  array used for sorted output, and the protecting mutex.
*/
void rpl_binlog_state::init()
{
  my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, offsetof(element, domain_id),
               sizeof(uint32), NULL, my_free, HASH_UNIQUE);
  my_init_dynamic_array(PSI_INSTRUMENT_ME, &gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
  mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
                   MY_MUTEX_INIT_SLOW);
  initialized= 1;
}
1487
1488 void
reset_nolock()1489 rpl_binlog_state::reset_nolock()
1490 {
1491 uint32 i;
1492
1493 for (i= 0; i < hash.records; ++i)
1494 my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
1495 my_hash_reset(&hash);
1496 }
1497
1498
/* Locked wrapper around reset_nolock(). */
void
rpl_binlog_state::reset()
{
  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  mysql_mutex_unlock(&LOCK_binlog_state);
}
1506
1507
/*
  Release all resources held by the binlog state. Safe to call more than
  once: only the first call after init() does any work.
*/
void rpl_binlog_state::free()
{
  if (initialized)
  {
    initialized= 0;
    reset_nolock();
    my_hash_free(&hash);
    delete_dynamic(&gtid_sort_array);
    mysql_mutex_destroy(&LOCK_binlog_state);
  }
}
1519
1520
1521 bool
load(struct rpl_gtid * list,uint32 count)1522 rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
1523 {
1524 uint32 i;
1525 bool res= false;
1526
1527 mysql_mutex_lock(&LOCK_binlog_state);
1528 reset_nolock();
1529 for (i= 0; i < count; ++i)
1530 {
1531 if (update_nolock(&(list[i]), false))
1532 {
1533 res= true;
1534 break;
1535 }
1536 }
1537 mysql_mutex_unlock(&LOCK_binlog_state);
1538 return res;
1539 }
1540
1541
rpl_binlog_state_load_cb(rpl_gtid * gtid,void * data)1542 static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
1543 {
1544 rpl_binlog_state *self= (rpl_binlog_state *)data;
1545 return self->update_nolock(gtid, false);
1546 }
1547
1548
1549 bool
load(rpl_slave_state * slave_pos)1550 rpl_binlog_state::load(rpl_slave_state *slave_pos)
1551 {
1552 bool res= false;
1553
1554 mysql_mutex_lock(&LOCK_binlog_state);
1555 reset_nolock();
1556 if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0, false))
1557 res= true;
1558 mysql_mutex_unlock(&LOCK_binlog_state);
1559 return res;
1560 }
1561
1562
/* Destructor. free() is guarded by `initialized`, so double-free is safe. */
rpl_binlog_state::~rpl_binlog_state()
{
  free();
}
1567
1568
/*
  Update replication state with a new GTID.

  If the (domain_id, server_id) pair already exists, then the new GTID replaces
  the old one for that domain id. Else a new entry is inserted.

  Returns 0 for ok, 1 for error.
*/
int
rpl_binlog_state::update_nolock(const struct rpl_gtid *gtid, bool strict)
{
  element *elem;

  if ((elem= (element *)my_hash_search(&hash,
                                       (const uchar *)(&gtid->domain_id), 0)))
  {
    /*
      In strict mode, refuse a seq_no that is not strictly larger than the
      last one recorded for this domain.
    */
    if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
    {
      my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
               gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
               elem->last_gtid->server_id, elem->last_gtid->seq_no);
      return 1;
    }
    if (elem->seq_no_counter < gtid->seq_no)
      elem->seq_no_counter= gtid->seq_no;
    if (!elem->update_element(gtid))
      return 0;
  }
  else if (!alloc_element_nolock(gtid))
    return 0;

  /* Falling through from either branch means an allocation failed. */
  my_error(ER_OUT_OF_RESOURCES, MYF(0));
  return 1;
}
1603
1604
1605 int
update(const struct rpl_gtid * gtid,bool strict)1606 rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
1607 {
1608 int res;
1609 mysql_mutex_lock(&LOCK_binlog_state);
1610 res= update_nolock(gtid, strict);
1611 mysql_mutex_unlock(&LOCK_binlog_state);
1612 return res;
1613 }
1614
1615
/*
  Fill in a new GTID, allocating next sequence number, and update state
  accordingly.
*/
int
rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
                                        rpl_gtid *gtid)
{
  element *elem;
  int res= 0;

  gtid->domain_id= domain_id;
  gtid->server_id= server_id;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
  {
    /* Existing domain: take the next free seq_no in it. */
    gtid->seq_no= ++elem->seq_no_counter;
    if (!elem->update_element(gtid))
      goto end;
  }
  else
  {
    /* First GTID ever in this domain starts at seq_no 1. */
    gtid->seq_no= 1;
    if (!alloc_element_nolock(gtid))
      goto end;
  }

  /* Reaching here means the update/allocation above failed. */
  my_error(ER_OUT_OF_RESOURCES, MYF(0));
  res= 1;
end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
1650
1651
/*
  Helper function for update: record GTID in this domain's element,
  replacing the entry for its server_id (or inserting a new one).
  Returns 0 on success, 1 on out-of-memory.
*/
int
rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
{
  rpl_gtid *lookup_gtid;

  /*
    By far the most common case is that successive events within same
    replication domain have the same server id (it changes only when
    switching to a new master). So save a hash lookup in this case.
  */
  if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
  {
    last_gtid->seq_no= gtid->seq_no;
    return 0;
  }

  /* Different server_id: look it up in this domain's per-server hash. */
  lookup_gtid= (rpl_gtid *)
    my_hash_search(&hash, (const uchar *)&gtid->server_id, 0);
  if (lookup_gtid)
  {
    lookup_gtid->seq_no= gtid->seq_no;
    last_gtid= lookup_gtid;
    return 0;
  }

  /* Allocate a new GTID and insert it. */
  lookup_gtid= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*lookup_gtid),
                                     MYF(MY_WME));
  if (!lookup_gtid)
    return 1;
  memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
  if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
  {
    my_free(lookup_gtid);
    return 1;
  }
  last_gtid= lookup_gtid;
  return 0;
}
1692
1693
/*
  Allocate and insert a new element for a domain_id not yet present in the
  hash, seeded with a copy of GTID as its single (and last) GTID.
  Returns 0 on success, 1 on out-of-memory.
*/
int
rpl_binlog_state::alloc_element_nolock(const rpl_gtid *gtid)
{
  element *elem;
  rpl_gtid *lookup_gtid;

  /* First time we see this domain_id; allocate a new element. */
  elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem), MYF(MY_WME));
  lookup_gtid= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*lookup_gtid),
                                     MYF(MY_WME));
  if (elem && lookup_gtid)
  {
    elem->domain_id= gtid->domain_id;
    my_hash_init(PSI_INSTRUMENT_ME, &elem->hash, &my_charset_bin, 32,
                 offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
                 HASH_UNIQUE);
    elem->last_gtid= lookup_gtid;
    elem->seq_no_counter= gtid->seq_no;
    memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
    if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
    {
      /* Ownership of lookup_gtid passed to elem->hash. */
      lookup_gtid= NULL; /* Do not free. */
      if (0 == my_hash_insert(&hash, (const uchar *)elem))
        return 0;
    }
    /* Insert failed; tear down the partially built element. */
    my_hash_free(&elem->hash);
  }

  /* An error. */
  if (elem)
    my_free(elem);
  if (lookup_gtid)
    my_free(lookup_gtid);
  return 1;
}
1729
1730
1731 /*
1732 Check that a new GTID can be logged without creating an out-of-order
1733 sequence number with existing GTIDs.
1734 */
1735 bool
check_strict_sequence(uint32 domain_id,uint32 server_id,uint64 seq_no)1736 rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
1737 uint64 seq_no)
1738 {
1739 element *elem;
1740 bool res= 0;
1741
1742 mysql_mutex_lock(&LOCK_binlog_state);
1743 if ((elem= (element *)my_hash_search(&hash,
1744 (const uchar *)(&domain_id), 0)) &&
1745 elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
1746 {
1747 my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
1748 elem->last_gtid->domain_id, elem->last_gtid->server_id,
1749 elem->last_gtid->seq_no);
1750 res= 1;
1751 }
1752 mysql_mutex_unlock(&LOCK_binlog_state);
1753 return res;
1754 }
1755
1756
/*
  When we see a new GTID that will not be binlogged (eg. slave thread
  with --log-slave-updates=0), then we need to remember to allocate any
  GTID seq_no of our own within that domain starting from there.

  Returns 0 if ok, non-zero if out-of-memory.
*/
int
rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
{
  element *elem;
  int res;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
  {
    /* Domain already known; just raise its counter if needed. */
    if (elem->seq_no_counter < seq_no)
      elem->seq_no_counter= seq_no;
    res= 0;
    goto end;
  }

  /* We need to allocate a new, empty element to remember the next seq_no. */
  if (!(elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem),
                                   MYF(MY_WME))))
  {
    res= 1;
    goto end;
  }

  elem->domain_id= domain_id;
  my_hash_init(PSI_INSTRUMENT_ME, &elem->hash, &my_charset_bin, 32,
               offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
               HASH_UNIQUE);
  /* No GTID binlogged yet in this domain; only the counter is tracked. */
  elem->last_gtid= NULL;
  elem->seq_no_counter= seq_no;
  if (0 == my_hash_insert(&hash, (const uchar *)elem))
  {
    res= 0;
    goto end;
  }

  /* Insert failed; clean up the freshly built element. */
  my_hash_free(&elem->hash);
  my_free(elem);
  res= 1;

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
1807
1808
/*
  Write binlog state to text file, so we can read it in again without having
  to scan last binlog file (normal shutdown/startup, not crash recovery).

  The most recent GTID within each domain_id is written after any other GTID
  within this domain.
*/
int
rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{
  ulong i, j;
  char buf[21];
  int res= 0;

  mysql_mutex_lock(&LOCK_binlog_state);
  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
    {
      /* Domain carries only a seq_no counter; nothing to persist. */
      DBUG_ASSERT(e->hash.records == 0);
      continue;
    }
    /*
      Iterate one extra step past the end of the hash: positions
      j < hash.records emit every GTID except last_gtid (which is skipped),
      and the final step j == hash.records emits last_gtid, so it always
      comes out last within its domain.
    */
    for (j= 0; j <= e->hash.records; ++j)
    {
      const rpl_gtid *gtid;
      if (j < e->hash.records)
      {
        gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
        if (gtid == e->last_gtid)
          continue;
      }
      else
        gtid= e->last_gtid;

      /* Format the 64-bit seq_no into a string before printing. */
      longlong10_to_str(gtid->seq_no, buf, 10);
      if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id,
                      buf))
      {
        res= 1;
        goto end;
      }
    }
  }

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
1858
1859
1860 int
read_from_iocache(IO_CACHE * src)1861 rpl_binlog_state::read_from_iocache(IO_CACHE *src)
1862 {
1863 /* 10-digit - 10-digit - 20-digit \n \0 */
1864 char buf[10+1+10+1+20+1+1];
1865 const char *p, *end;
1866 rpl_gtid gtid;
1867 int res= 0;
1868
1869 mysql_mutex_lock(&LOCK_binlog_state);
1870 reset_nolock();
1871 for (;;)
1872 {
1873 size_t len= my_b_gets(src, buf, sizeof(buf));
1874 if (!len)
1875 break;
1876 p= buf;
1877 end= buf + len;
1878 if (gtid_parser_helper(&p, end, >id) ||
1879 update_nolock(>id, false))
1880 {
1881 res= 1;
1882 break;
1883 }
1884 }
1885 mysql_mutex_unlock(&LOCK_binlog_state);
1886 return res;
1887 }
1888
1889
1890 rpl_gtid *
find_nolock(uint32 domain_id,uint32 server_id)1891 rpl_binlog_state::find_nolock(uint32 domain_id, uint32 server_id)
1892 {
1893 element *elem;
1894 if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
1895 return NULL;
1896 return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
1897 }
1898
1899 rpl_gtid *
find(uint32 domain_id,uint32 server_id)1900 rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
1901 {
1902 rpl_gtid *p;
1903 mysql_mutex_lock(&LOCK_binlog_state);
1904 p= find_nolock(domain_id, server_id);
1905 mysql_mutex_unlock(&LOCK_binlog_state);
1906 return p;
1907 }
1908
1909 rpl_gtid *
find_most_recent(uint32 domain_id)1910 rpl_binlog_state::find_most_recent(uint32 domain_id)
1911 {
1912 element *elem;
1913 rpl_gtid *gtid= NULL;
1914
1915 mysql_mutex_lock(&LOCK_binlog_state);
1916 elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
1917 if (elem && elem->last_gtid)
1918 gtid= elem->last_gtid;
1919 mysql_mutex_unlock(&LOCK_binlog_state);
1920
1921 return gtid;
1922 }
1923
1924
1925 uint32
count()1926 rpl_binlog_state::count()
1927 {
1928 uint32 c= 0;
1929 uint32 i;
1930
1931 mysql_mutex_lock(&LOCK_binlog_state);
1932 for (i= 0; i < hash.records; ++i)
1933 c+= ((element *)my_hash_element(&hash, i))->hash.records;
1934 mysql_mutex_unlock(&LOCK_binlog_state);
1935
1936 return c;
1937 }
1938
1939
/*
  Copy every GTID in the binlog state into GTID_LIST (capacity LIST_SIZE
  entries). Within each domain, last_gtid is placed after the others.
  Returns 0 on success, 1 when the supplied list is too small.
*/
int
rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
{
  uint32 i, j, pos;
  int res= 0;

  mysql_mutex_lock(&LOCK_binlog_state);
  pos= 0;
  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
    {
      /* Domain carries only a seq_no counter; nothing to copy. */
      DBUG_ASSERT(e->hash.records==0);
      continue;
    }
    /*
      One extra iteration past the hash end: the in-range steps skip
      last_gtid, the final step appends it, so it lands last in its domain.
    */
    for (j= 0; j <= e->hash.records; ++j)
    {
      const rpl_gtid *gtid;
      if (j < e->hash.records)
      {
        gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
        if (gtid == e->last_gtid)
          continue;
      }
      else
        gtid= e->last_gtid;

      if (pos >= list_size)
      {
        res= 1;                                 /* Output buffer too small */
        goto end;
      }
      memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
    }
  }

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
1981
1982
/*
  Get a list of the most recently binlogged GTID, for each domain_id.

  This can be used when switching from being a master to being a slave,
  to know where to start replicating from the new master.

  The returned list must be de-allocated with my_free().

  Returns 0 for ok, non-zero for out-of-memory.
*/
int
rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{
  uint32 i;
  uint32 alloc_size, out_size;
  int res= 0;

  out_size= 0;
  mysql_mutex_lock(&LOCK_binlog_state);
  alloc_size= hash.records;
  /* Allocate for the worst case: one GTID per domain. */
  if (!(*list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME,
                                     alloc_size * sizeof(rpl_gtid), MYF(MY_WME))))
  {
    res= 1;
    goto end;
  }
  for (i= 0; i < alloc_size; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
      continue;                 /* Domain has a counter but no GTID yet */
    memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
  }

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  *size= out_size;              /* Remains 0 on allocation failure */
  return res;
}
2022
2023 bool
append_pos(String * str)2024 rpl_binlog_state::append_pos(String *str)
2025 {
2026 uint32 i;
2027
2028 mysql_mutex_lock(&LOCK_binlog_state);
2029 reset_dynamic(>id_sort_array);
2030
2031 for (i= 0; i < hash.records; ++i)
2032 {
2033 element *e= (element *)my_hash_element(&hash, i);
2034 if (e->last_gtid &&
2035 insert_dynamic(>id_sort_array, (const void *) e->last_gtid))
2036 {
2037 mysql_mutex_unlock(&LOCK_binlog_state);
2038 return true;
2039 }
2040 }
2041 rpl_slave_state_tostring_helper(>id_sort_array, str);
2042 mysql_mutex_unlock(&LOCK_binlog_state);
2043
2044 return false;
2045 }
2046
2047
/*
  Append the complete binlog state (every GTID of every domain) to STR as a
  comma-separated list, sorted on domain_id.
  Returns false on success, true on out-of-memory.
*/
bool
rpl_binlog_state::append_state(String *str)
{
  uint32 i, j;
  bool res= false;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_dynamic(&gtid_sort_array);

  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
    {
      /* Domain carries only a seq_no counter; nothing to append. */
      DBUG_ASSERT(e->hash.records==0);
      continue;
    }
    /*
      One extra iteration past the hash end: the in-range steps skip
      last_gtid, the final step collects it, so it is gathered last for
      the domain.
    */
    for (j= 0; j <= e->hash.records; ++j)
    {
      const rpl_gtid *gtid;
      if (j < e->hash.records)
      {
        gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
        if (gtid == e->last_gtid)
          continue;
      }
      else
        gtid= e->last_gtid;

      if (insert_dynamic(&gtid_sort_array, (const void *) gtid))
      {
        res= true;
        goto end;
      }
    }
  }

  /* Sorts on domain_id and renders into STR. */
  rpl_slave_state_tostring_helper(&gtid_sort_array, str);

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
2091
2092 /**
2093 Remove domains supplied by the first argument from binlog state.
2094 Removal is done for any domain whose last gtids (from all its servers) match
2095 ones in Gtid list event of the 2nd argument.
2096
2097 @param ids gtid domain id sequence, may contain dups
2098 @param glev pointer to Gtid list event describing
2099 the match condition
2100 @param errbuf [out] pointer to possible error message array
2101
2102 @retval NULL as success when at least one domain is removed
2103 @retval "" empty string to indicate ineffective call
2104 when no domains removed
2105 @retval NOT EMPTY string otherwise an error message
2106 */
2107 const char*
drop_domain(DYNAMIC_ARRAY * ids,Gtid_list_log_event * glev,char * errbuf)2108 rpl_binlog_state::drop_domain(DYNAMIC_ARRAY *ids,
2109 Gtid_list_log_event *glev,
2110 char* errbuf)
2111 {
2112 DYNAMIC_ARRAY domain_unique; // sequece (unsorted) of unique element*:s
2113 rpl_binlog_state::element* domain_unique_buffer[16];
2114 ulong k, l;
2115 const char* errmsg= NULL;
2116
2117 DBUG_ENTER("rpl_binlog_state::drop_domain");
2118
2119 my_init_dynamic_array2(PSI_INSTRUMENT_ME, &domain_unique,
2120 sizeof(element*), domain_unique_buffer,
2121 sizeof(domain_unique_buffer) / sizeof(element*), 4, 0);
2122
2123 mysql_mutex_lock(&LOCK_binlog_state);
2124
2125 /*
2126 Gtid list is supposed to come from a binlog's Gtid_list event and
2127 therefore should be a subset of the current binlog state. That is
2128 for every domain in the list the binlog state contains a gtid with
2129 sequence number not less than that of the list.
2130 Exceptions of this inclusion rule are:
2131 A. the list may still refer to gtids from already deleted domains.
2132 Files containing them must have been purged whereas the file
2133 with the list is not yet.
2134 B. out of order groups were injected
2135 C. manually build list of binlog files violating the inclusion
2136 constraint.
2137 While A is a normal case (not necessarily distinguishable from C though),
2138 B and C may require the user's attention so any (incl the A's suspected)
2139 inconsistency is diagnosed and *warned*.
2140 */
2141 for (l= 0, errbuf[0]= 0; l < glev->count; l++, errbuf[0]= 0)
2142 {
2143 rpl_gtid* rb_state_gtid= find_nolock(glev->list[l].domain_id,
2144 glev->list[l].server_id);
2145 if (!rb_state_gtid)
2146 sprintf(errbuf,
2147 "missing gtids from the '%u-%u' domain-server pair which is "
2148 "referred to in the gtid list describing an earlier state. Ignore "
2149 "if the domain ('%u') was already explicitly deleted",
2150 glev->list[l].domain_id, glev->list[l].server_id,
2151 glev->list[l].domain_id);
2152 else if (rb_state_gtid->seq_no < glev->list[l].seq_no)
2153 sprintf(errbuf,
2154 "having a gtid '%u-%u-%llu' which is less than "
2155 "the '%u-%u-%llu' of the gtid list describing an earlier state. "
2156 "The state may have been affected by manually injecting "
2157 "a lower sequence number gtid or via replication",
2158 rb_state_gtid->domain_id, rb_state_gtid->server_id,
2159 rb_state_gtid->seq_no, glev->list[l].domain_id,
2160 glev->list[l].server_id, glev->list[l].seq_no);
2161 if (strlen(errbuf)) // use strlen() as cheap flag
2162 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
2163 ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
2164 "The current gtid binlog state is incompatible with "
2165 "a former one %s.", errbuf);
2166 }
2167
2168 /*
2169 For each domain_id from ids
2170 when no such domain in binlog state
2171 warn && continue
2172 For each domain.server's last gtid
2173 when not locate the last gtid in glev.list
2174 error out binlog state can't change
2175 otherwise continue
2176 */
2177 for (ulong i= 0; i < ids->elements; i++)
2178 {
2179 rpl_binlog_state::element *elem= NULL;
2180 uint32 *ptr_domain_id;
2181 bool not_match;
2182
2183 ptr_domain_id= (uint32*) dynamic_array_ptr(ids, i);
2184 elem= (rpl_binlog_state::element *)
2185 my_hash_search(&hash, (const uchar *) ptr_domain_id, 0);
2186 if (!elem)
2187 {
2188 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
2189 ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
2190 "The gtid domain being deleted ('%lu') is not in "
2191 "the current binlog state", (unsigned long) *ptr_domain_id);
2192 continue;
2193 }
2194
2195 for (not_match= true, k= 0; k < elem->hash.records; k++)
2196 {
2197 rpl_gtid *d_gtid= (rpl_gtid *)my_hash_element(&elem->hash, k);
2198 for (ulong l= 0; l < glev->count && not_match; l++)
2199 not_match= !(*d_gtid == glev->list[l]);
2200 }
2201
2202 if (not_match)
2203 {
2204 sprintf(errbuf, "binlog files may contain gtids from the domain ('%u') "
2205 "being deleted. Make sure to first purge those files",
2206 *ptr_domain_id);
2207 errmsg= errbuf;
2208 goto end;
2209 }
2210 // compose a sequence of unique pointers to domain object
2211 for (k= 0; k < domain_unique.elements; k++)
2212 {
2213 if ((rpl_binlog_state::element*) dynamic_array_ptr(&domain_unique, k)
2214 == elem)
2215 break; // domain_id's elem has been already in
2216 }
2217 if (k == domain_unique.elements) // proven not to have duplicates
2218 insert_dynamic(&domain_unique, (uchar*) &elem);
2219 }
2220
2221 // Domain removal from binlog state
2222 for (k= 0; k < domain_unique.elements; k++)
2223 {
2224 rpl_binlog_state::element *elem= *(rpl_binlog_state::element**)
2225 dynamic_array_ptr(&domain_unique, k);
2226 my_hash_free(&elem->hash);
2227 my_hash_delete(&hash, (uchar*) elem);
2228 }
2229
2230 DBUG_ASSERT(strlen(errbuf) == 0);
2231
2232 if (domain_unique.elements == 0)
2233 errmsg= "";
2234
2235 end:
2236 mysql_mutex_unlock(&LOCK_binlog_state);
2237 delete_dynamic(&domain_unique);
2238
2239 DBUG_RETURN(errmsg);
2240 }
2241
slave_connection_state()2242 slave_connection_state::slave_connection_state()
2243 {
2244 my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
2245 offsetof(entry, gtid) + offsetof(rpl_gtid, domain_id),
2246 sizeof(uint32), NULL, my_free, HASH_UNIQUE);
2247 my_init_dynamic_array(PSI_INSTRUMENT_ME, >id_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
2248 }
2249
2250
~slave_connection_state()2251 slave_connection_state::~slave_connection_state()
2252 {
2253 my_hash_free(&hash);
2254 delete_dynamic(>id_sort_array);
2255 }
2256
2257
2258 /*
2259 Create a hash from the slave GTID state that is sent to master when slave
2260 connects to start replication.
2261
2262 The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
2263
2264 0-2-112,1-4-1022
2265
2266 The state gives for each domain_id the GTID to start replication from for
2267 the corresponding replication stream. So domain_id must be unique.
2268
2269 Returns 0 if ok, non-zero if error due to malformed input.
2270
2271 Note that input string is built by slave server, so it will not be incorrect
2272 unless bug/corruption/malicious server. So we just need basic sanity check,
2273 not fancy user-friendly error message.
2274 */
2275
2276 int
load(const char * slave_request,size_t len)2277 slave_connection_state::load(const char *slave_request, size_t len)
2278 {
2279 const char *p, *end;
2280 uchar *rec;
2281 rpl_gtid *gtid;
2282 const entry *e;
2283
2284 reset();
2285 p= slave_request;
2286 end= slave_request + len;
2287 if (p == end)
2288 return 0;
2289 for (;;)
2290 {
2291 if (!(rec= (uchar *)my_malloc(PSI_INSTRUMENT_ME, sizeof(entry), MYF(MY_WME))))
2292 return 1;
2293 gtid= &((entry *)rec)->gtid;
2294 if (gtid_parser_helper(&p, end, gtid))
2295 {
2296 my_free(rec);
2297 my_error(ER_INCORRECT_GTID_STATE, MYF(0));
2298 return 1;
2299 }
2300 if ((e= (const entry *)
2301 my_hash_search(&hash, (const uchar *)(>id->domain_id), 0)))
2302 {
2303 my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
2304 gtid->server_id, (ulonglong)gtid->seq_no, e->gtid.domain_id,
2305 e->gtid.server_id, (ulonglong)e->gtid.seq_no, gtid->domain_id);
2306 my_free(rec);
2307 return 1;
2308 }
2309 ((entry *)rec)->flags= 0;
2310 if (my_hash_insert(&hash, rec))
2311 {
2312 my_free(rec);
2313 my_error(ER_OUT_OF_RESOURCES, MYF(0));
2314 return 1;
2315 }
2316 if (p == end)
2317 break; /* Finished. */
2318 if (*p != ',')
2319 {
2320 my_error(ER_INCORRECT_GTID_STATE, MYF(0));
2321 return 1;
2322 }
2323 ++p;
2324 }
2325
2326 return 0;
2327 }
2328
2329
2330 int
load(const rpl_gtid * gtid_list,uint32 count)2331 slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
2332 {
2333 uint32 i;
2334
2335 reset();
2336 for (i= 0; i < count; ++i)
2337 if (update(>id_list[i]))
2338 return 1;
2339 return 0;
2340 }
2341
2342
2343 static int
slave_connection_state_load_cb(rpl_gtid * gtid,void * data)2344 slave_connection_state_load_cb(rpl_gtid *gtid, void *data)
2345 {
2346 slave_connection_state *state= (slave_connection_state *)data;
2347 return state->update(gtid);
2348 }
2349
2350
2351 /*
2352 Same as rpl_slave_state::tostring(), but populates a slave_connection_state
2353 instead.
2354 */
2355 int
load(rpl_slave_state * state,rpl_gtid * extra_gtids,uint32 num_extra)2356 slave_connection_state::load(rpl_slave_state *state,
2357 rpl_gtid *extra_gtids, uint32 num_extra)
2358 {
2359 reset();
2360 return state->iterate(slave_connection_state_load_cb, this,
2361 extra_gtids, num_extra, false);
2362 }
2363
2364
2365 slave_connection_state::entry *
find_entry(uint32 domain_id)2366 slave_connection_state::find_entry(uint32 domain_id)
2367 {
2368 return (entry *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
2369 }
2370
2371
2372 rpl_gtid *
find(uint32 domain_id)2373 slave_connection_state::find(uint32 domain_id)
2374 {
2375 entry *e= find_entry(domain_id);
2376 if (!e)
2377 return NULL;
2378 return &e->gtid;
2379 }
2380
2381
2382 int
update(const rpl_gtid * in_gtid)2383 slave_connection_state::update(const rpl_gtid *in_gtid)
2384 {
2385 entry *e;
2386 uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
2387 if (rec)
2388 {
2389 e= (entry *)rec;
2390 e->gtid= *in_gtid;
2391 return 0;
2392 }
2393
2394 if (!(e= (entry *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*e), MYF(MY_WME))))
2395 return 1;
2396 e->gtid= *in_gtid;
2397 e->flags= 0;
2398 if (my_hash_insert(&hash, (uchar *)e))
2399 {
2400 my_free(e);
2401 return 1;
2402 }
2403
2404 return 0;
2405 }
2406
2407
2408 void
remove(const rpl_gtid * in_gtid)2409 slave_connection_state::remove(const rpl_gtid *in_gtid)
2410 {
2411 uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
2412 #ifdef DBUG_ASSERT_EXISTS
2413 bool err;
2414 rpl_gtid *slave_gtid= &((entry *)rec)->gtid;
2415 DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
2416 DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
2417 DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
2418 err=
2419 #endif
2420 my_hash_delete(&hash, rec);
2421 DBUG_ASSERT(!err);
2422 }
2423
2424
2425 void
remove_if_present(const rpl_gtid * in_gtid)2426 slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
2427 {
2428 uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
2429 if (rec)
2430 my_hash_delete(&hash, rec);
2431 }
2432
2433
2434 int
to_string(String * out_str)2435 slave_connection_state::to_string(String *out_str)
2436 {
2437 out_str->length(0);
2438 return append_to_string(out_str);
2439 }
2440
2441
2442 int
append_to_string(String * out_str)2443 slave_connection_state::append_to_string(String *out_str)
2444 {
2445 uint32 i;
2446 bool first;
2447
2448 first= true;
2449 for (i= 0; i < hash.records; ++i)
2450 {
2451 const entry *e= (const entry *)my_hash_element(&hash, i);
2452 if (rpl_slave_state_tostring_helper(out_str, &e->gtid, &first))
2453 return 1;
2454 }
2455 return 0;
2456 }
2457
2458
2459 int
get_gtid_list(rpl_gtid * gtid_list,uint32 list_size)2460 slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
2461 {
2462 uint32 i, pos;
2463
2464 pos= 0;
2465 for (i= 0; i < hash.records; ++i)
2466 {
2467 entry *e;
2468 if (pos >= list_size)
2469 return 1;
2470 e= (entry *)my_hash_element(&hash, i);
2471 memcpy(>id_list[pos++], &e->gtid, sizeof(e->gtid));
2472 }
2473
2474 return 0;
2475 }
2476
2477
2478 /*
2479 Check if the GTID position has been reached, for mysql_binlog_send().
2480
2481 The position has not been reached if we have anything in the state, unless
2482 it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not
2483 belong to this master at all), or the START_OWN_SLAVE_POS (which means that
2484 we start on an old position from when the server was a slave with
2485 --log-slave-updates=0).
2486 */
2487 bool
is_pos_reached()2488 slave_connection_state::is_pos_reached()
2489 {
2490 uint32 i;
2491
2492 for (i= 0; i < hash.records; ++i)
2493 {
2494 entry *e= (entry *)my_hash_element(&hash, i);
2495 if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN)))
2496 return false;
2497 }
2498
2499 return true;
2500 }
2501
2502
2503 /*
2504 Execute a MASTER_GTID_WAIT().
2505 The position to wait for is in gtid_str in string form.
  The timeout in microseconds is in timeout_us; a negative value means
  no timeout.
2507
2508 Returns:
2509 1 for error.
2510 0 for wait completed.
2511 -1 for wait timed out.
2512 */
2513 int
wait_for_pos(THD * thd,String * gtid_str,longlong timeout_us)2514 gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
2515 {
2516 int err;
2517 rpl_gtid *wait_pos;
2518 uint32 count, i;
2519 struct timespec wait_until, *wait_until_ptr;
2520 ulonglong before;
2521
2522 /* Wait for the empty position returns immediately. */
2523 if (gtid_str->length() == 0)
2524 {
2525 status_var_increment(thd->status_var.master_gtid_wait_count);
2526 return 0;
2527 }
2528
2529 if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
2530 &count)))
2531 {
2532 my_error(ER_INCORRECT_GTID_STATE, MYF(0));
2533 return 1;
2534 }
2535 status_var_increment(thd->status_var.master_gtid_wait_count);
2536 before= microsecond_interval_timer();
2537
2538 if (timeout_us >= 0)
2539 {
2540 set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
2541 wait_until_ptr= &wait_until;
2542 }
2543 else
2544 wait_until_ptr= NULL;
2545 err= 0;
2546 for (i= 0; i < count; ++i)
2547 {
2548 if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
2549 break;
2550 }
2551 switch (err)
2552 {
2553 case -1:
2554 status_var_increment(thd->status_var.master_gtid_wait_timeouts);
2555 /* fall through */
2556 case 0:
2557 status_var_add(thd->status_var.master_gtid_wait_time,
2558 static_cast<ulong>
2559 (microsecond_interval_timer() - before));
2560 }
2561 my_free(wait_pos);
2562 return err;
2563 }
2564
2565
2566 void
promote_new_waiter(gtid_waiting::hash_element * he)2567 gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
2568 {
2569 queue_element *qe;
2570
2571 mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2572 if (queue_empty(&he->queue))
2573 return;
2574 qe= (queue_element *)queue_top(&he->queue);
2575 qe->do_small_wait= true;
2576 mysql_cond_signal(&qe->thd->COND_wakeup_ready);
2577 }
2578
2579 void
process_wait_hash(uint64 wakeup_seq_no,gtid_waiting::hash_element * he)2580 gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
2581 gtid_waiting::hash_element *he)
2582 {
2583 mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2584
2585 for (;;)
2586 {
2587 queue_element *qe;
2588
2589 if (queue_empty(&he->queue))
2590 break;
2591 qe= (queue_element *)queue_top(&he->queue);
2592 if (qe->wait_seq_no > wakeup_seq_no)
2593 break;
2594 DBUG_ASSERT(!qe->done);
2595 queue_remove_top(&he->queue);
2596 qe->done= true;;
2597 mysql_cond_signal(&qe->thd->COND_wakeup_ready);
2598 }
2599 }
2600
2601
2602 /*
2603 Execute a MASTER_GTID_WAIT() for one specific domain.
2604
2605 The implementation is optimised primarily for (1) minimal performance impact
2606 on the slave replication threads, and secondarily for (2) quick performance
2607 of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
2608 read to clients in an async replication read-scaleout scenario.
2609
2610 To achieve (1), we have a "small" wait and a "large" wait. The small wait
2611 contends with the replication threads on the lock on the gtid_slave_pos, so
2612 only minimal processing is done under that lock, and only a single waiter at
2613 a time does the small wait.
2614
2615 If there is already a small waiter, a new thread will either replace the
2616 small waiter (if it needs to wait for an earlier sequence number), or
2617 instead do a "large" wait.
2618
2619 Once awoken on the small wait, the waiting thread releases the lock shared
2620 with the SQL threads quickly, and then processes all waiters currently doing
2621 the large wait using a different lock that does not impact replication.
2622
2623 This way, the SQL threads only need to do a single check + possibly a
2624 pthread_cond_signal() when updating the gtid_slave_state, and the time that
2625 non-SQL threads contend for the lock on gtid_slave_state is minimized.
2626
2627 There is always at least one thread that has the responsibility to ensure
2628 that there is a small waiter; this thread has queue_element::do_small_wait
2629 set to true. This thread will do the small wait until it is done, at which
2630 point it will make sure to pass on the responsibility to another thread.
2631 Normally only one thread has do_small_wait==true, but it can occasionally
2632 happen that there is more than one, when threads race one another for the
2633 lock on the small wait (this results in slightly increased activity on the
2634 small lock but is otherwise harmless).
2635
2636 Returns:
2637 0 Wait completed normally
2638 -1 Wait completed due to timeout
2639 1 An error (my_error() will have been called to set the error in the da)
2640 */
int
gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
                            struct timespec *wait_until)
{
  bool timed_out= false;
#ifdef HAVE_REPLICATION
  queue_element elem;
  uint32 domain_id= wait_gtid->domain_id;
  uint64 seq_no= wait_gtid->seq_no;
  hash_element *he;
  rpl_slave_state::element *slave_state_elem= NULL;
  PSI_stage_info old_stage;
  bool did_enter_cond= false;

  elem.wait_seq_no= seq_no;
  elem.thd= thd;
  elem.done= false;

  /* Register ourselves in the per-domain wait queue under the large lock. */
  mysql_mutex_lock(&LOCK_gtid_waiting);
  if (!(he= get_entry(wait_gtid->domain_id)))
  {
    mysql_mutex_unlock(&LOCK_gtid_waiting);
    return 1;
  }
  /*
    If there is already another waiter with seq_no no larger than our own,
    we are sure that there is already a small waiter that will wake us up
    (or later pass the small wait responsibility to us). So in this case, we
    do not need to touch the small wait lock at all.
  */
  elem.do_small_wait=
    (queue_empty(&he->queue) ||
     ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);

  if (register_in_wait_queue(thd, wait_gtid, he, &elem))
  {
    mysql_mutex_unlock(&LOCK_gtid_waiting);
    return 1;
  }
  /*
    Loop, doing either the small or large wait as appropriate, until either
    the position waited for is reached, or we get a kill or timeout.
  */
  for (;;)
  {
    mysql_mutex_assert_owner(&LOCK_gtid_waiting);

    if (elem.do_small_wait)
    {
      uint64 wakeup_seq_no;
      queue_element *cur_waiter;

      /* The small wait shares LOCK_slave_state with the SQL threads. */
      mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      /*
        The elements in the gtid_slave_state_hash are never re-allocated once
        they enter the hash, so we do not need to re-do the lookup after releasing
        and re-acquiring the lock.
      */
      if (!slave_state_elem &&
          !(slave_state_elem= rpl_global_gtid_slave_state->get_element(domain_id)))
      {
        /* OOM while creating the domain element: unregister and bail out. */
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
        remove_from_wait_queue(he, &elem);
        promote_new_waiter(he);
        if (did_enter_cond)
          thd->EXIT_COND(&old_stage);
        else
          mysql_mutex_unlock(&LOCK_gtid_waiting);
        my_error(ER_OUT_OF_RESOURCES, MYF(0));
        return 1;
      }

      if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
      {
        /*
          We do not have to wait. (We will be removed from the wait queue when
          we call process_wait_hash() below.)
        */
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      }
      else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
               slave_state_elem->min_wait_seq_no <= seq_no)
      {
        /*
          There is already a suitable small waiter, go do the large wait.
          (Normally we would not have needed to check the small wait in this
          case, but it can happen if we race with another thread for the small
          lock).
        */
        elem.do_small_wait= false;
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      }
      else
      {
        /*
          We have to do the small wait ourselves (stealing it from any thread
          that might already be waiting for a later seq_no).
        */
        slave_state_elem->gtid_waiter= &elem;
        slave_state_elem->min_wait_seq_no= seq_no;
        if (cur_waiter)
        {
          /* We stole the wait, so wake up the old waiting thread. */
          mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
        }

        /* Release the large lock, and do the small wait. */
        if (did_enter_cond)
        {
          thd->EXIT_COND(&old_stage);
          did_enter_cond= false;
        }
        else
          mysql_mutex_unlock(&LOCK_gtid_waiting);
        thd->ENTER_COND(&slave_state_elem->COND_wait_gtid,
                        &rpl_global_gtid_slave_state->LOCK_slave_state,
                        &stage_master_gtid_wait_primary, &old_stage);
        /* Sleep until woken, killed, timed out, or our wait is stolen. */
        do
        {
          if (unlikely(thd->check_killed(1)))
            break;
          else if (wait_until)
          {
            int err=
              mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
                                   &rpl_global_gtid_slave_state->LOCK_slave_state,
                                   wait_until);
            if (err == ETIMEDOUT || err == ETIME)
            {
              timed_out= true;
              break;
            }
          }
          else
            mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
                            &rpl_global_gtid_slave_state->LOCK_slave_state);
        } while (slave_state_elem->gtid_waiter == &elem);
        wakeup_seq_no= slave_state_elem->highest_seq_no;
        /*
          If we aborted due to timeout or kill, remove us as waiter.

          If we were replaced by another waiter with a smaller seq_no, then we
          no longer have responsibility for the small wait.
        */
        if ((cur_waiter= slave_state_elem->gtid_waiter))
        {
          if (cur_waiter == &elem)
            slave_state_elem->gtid_waiter= NULL;
          else if (slave_state_elem->min_wait_seq_no <= seq_no)
            elem.do_small_wait= false;
        }
        thd->EXIT_COND(&old_stage);

        mysql_mutex_lock(&LOCK_gtid_waiting);
      }

      /*
        Note that hash_entry pointers do not change once allocated, so we do
        not need to lookup `he' again after re-acquiring LOCK_gtid_waiting.
      */
      process_wait_hash(wakeup_seq_no, he);
    }
    else
    {
      /* Do the large wait. */
      if (!did_enter_cond)
      {
        thd->ENTER_COND(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
                        &stage_master_gtid_wait, &old_stage);
        did_enter_cond= true;
      }
      while (!elem.done && likely(!thd->check_killed(1)))
      {
        thd_wait_begin(thd, THD_WAIT_BINLOG);
        if (wait_until)
        {
          int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
                                        &LOCK_gtid_waiting, wait_until);
          if (err == ETIMEDOUT || err == ETIME)
            timed_out= true;
        }
        else
          mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
        thd_wait_end(thd);
        /* Exit also when the small-wait duty was passed to us. */
        if (elem.do_small_wait || timed_out)
          break;
      }
    }

    if ((thd->killed || timed_out) && !elem.done)
    {
      /* Aborted, so remove ourselves from the hash. */
      remove_from_wait_queue(he, &elem);
      elem.done= true;
    }
    if (elem.done)
    {
      /*
        If our wait is done, but we have (or were passed) responsibility for
        the small wait, then we need to pass on that task to someone else.
      */
      if (elem.do_small_wait)
        promote_new_waiter(he);
      break;
    }
  }

  if (did_enter_cond)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&LOCK_gtid_waiting);
  if (thd->killed)
    thd->send_kill_message();
#endif /* HAVE_REPLICATION */
  return timed_out ? -1 : 0;
}
2857
2858
2859 static void
free_hash_element(void * p)2860 free_hash_element(void *p)
2861 {
2862 gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
2863 delete_queue(&e->queue);
2864 my_free(e);
2865 }
2866
2867
2868 void
init()2869 gtid_waiting::init()
2870 {
2871 my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
2872 offsetof(hash_element, domain_id), sizeof(uint32), NULL,
2873 free_hash_element, HASH_UNIQUE);
2874 mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
2875 }
2876
2877
2878 void
destroy()2879 gtid_waiting::destroy()
2880 {
2881 mysql_mutex_destroy(&LOCK_gtid_waiting);
2882 my_hash_free(&hash);
2883 }
2884
2885
2886 static int
cmp_queue_elem(void *,uchar * a,uchar * b)2887 cmp_queue_elem(void *, uchar *a, uchar *b)
2888 {
2889 uint64 seq_no_a= *(uint64 *)a;
2890 uint64 seq_no_b= *(uint64 *)b;
2891 if (seq_no_a < seq_no_b)
2892 return -1;
2893 else if (seq_no_a == seq_no_b)
2894 return 0;
2895 else
2896 return 1;
2897 }
2898
2899
2900 gtid_waiting::hash_element *
get_entry(uint32 domain_id)2901 gtid_waiting::get_entry(uint32 domain_id)
2902 {
2903 hash_element *e;
2904
2905 if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
2906 return e;
2907
2908 if (!(e= (hash_element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*e), MYF(MY_WME))))
2909 return NULL;
2910
2911 if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
2912 cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
2913 {
2914 my_error(ER_OUT_OF_RESOURCES, MYF(0));
2915 my_free(e);
2916 return NULL;
2917 }
2918 e->domain_id= domain_id;
2919 if (my_hash_insert(&hash, (uchar *)e))
2920 {
2921 my_error(ER_OUT_OF_RESOURCES, MYF(0));
2922 delete_queue(&e->queue);
2923 my_free(e);
2924 return NULL;
2925 }
2926 return e;
2927 }
2928
2929
2930 int
register_in_wait_queue(THD * thd,rpl_gtid * wait_gtid,gtid_waiting::hash_element * he,gtid_waiting::queue_element * elem)2931 gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
2932 gtid_waiting::hash_element *he,
2933 gtid_waiting::queue_element *elem)
2934 {
2935 mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2936
2937 if (queue_insert_safe(&he->queue, (uchar *)elem))
2938 {
2939 my_error(ER_OUT_OF_RESOURCES, MYF(0));
2940 return 1;
2941 }
2942
2943 return 0;
2944 }
2945
2946
/*
  Remove elem from the domain's wait queue, using the queue position that
  the queue maintains in elem->queue_idx. Caller must hold
  LOCK_gtid_waiting.
*/
void
gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
                                     gtid_waiting::queue_element *elem)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  queue_remove(&he->queue, elem->queue_idx);
}
2955