1 /* Copyright (c) 2011, 2019, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22 02110-1301 USA */
23
24 #include "rpl_gtid.h"
25
26 #include "rpl_gtid_persist.h" // gtid_table_persistor
27 #include "sql_class.h" // THD
28 #include "debug_sync.h" // DEBUG_SYNC
29 #include "binlog.h"
30
/*
  Memory instrumentation key. Judging by its name it is meant for the
  group commit sidno array of Gtid_state; it is not referenced elsewhere
  in this file -- TODO confirm where it is registered and used.
*/
31 PSI_memory_key key_memory_Gtid_state_group_commit_sidno;
32
clear(THD * thd)33 int Gtid_state::clear(THD *thd)
34 {
35 DBUG_ENTER("Gtid_state::clear()");
36 int ret= 0;
37 // the wrlock implies that no other thread can hold any of the mutexes
38 sid_lock->assert_some_wrlock();
39 lost_gtids.clear();
40 executed_gtids.clear();
41 gtids_only_in_table.clear();
42 previous_gtids_logged.clear();
43 /* Reset gtid_executed table. */
44 if ((ret= gtid_table_persistor->reset(thd)) == 1)
45 {
46 /*
47 Gtid table is not ready to be used, so failed to
48 open it. Ignore the error.
49 */
50 thd->clear_error();
51 ret= 0;
52 }
53 next_free_gno= 1;
54 DBUG_RETURN(ret);
55 }
56
57
acquire_ownership(THD * thd,const Gtid & gtid)58 enum_return_status Gtid_state::acquire_ownership(THD *thd, const Gtid >id)
59 {
60 DBUG_ENTER("Gtid_state::acquire_ownership");
61 // caller must take both global_sid_lock and lock on the SIDNO.
62 global_sid_lock->assert_some_lock();
63 gtid_state->assert_sidno_lock_owner(gtid.sidno);
64 DBUG_ASSERT(!executed_gtids.contains_gtid(gtid));
65 DBUG_PRINT("info", ("gtid=%d:%lld", gtid.sidno, gtid.gno));
66 DBUG_ASSERT(thd->owned_gtid.sidno == 0);
67 if (owned_gtids.add_gtid_owner(gtid, thd->thread_id()) != RETURN_STATUS_OK)
68 goto err;
69 if (thd->get_gtid_next_list() != NULL)
70 {
71 #ifdef HAVE_GTID_NEXT_LIST
72 thd->owned_gtid_set._add_gtid(gtid);
73 thd->owned_gtid.sidno= THD::OWNED_SIDNO_GTID_SET;
74 thd->owned_sid.clear();
75 #else
76 DBUG_ASSERT(0);
77 #endif
78 }
79 else
80 {
81 thd->owned_gtid= gtid;
82 thd->owned_gtid.dbug_print(NULL, "set owned_gtid in acquire_ownership");
83 thd->owned_sid= sid_map->sidno_to_sid(gtid.sidno);
84 }
85 RETURN_OK;
86 err:
87 if (thd->get_gtid_next_list() != NULL)
88 {
89 #ifdef HAVE_GTID_NEXT_LIST
90 Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
91 Gtid g= git.get();
92 while (g.sidno != 0)
93 {
94 owned_gtids.remove_gtid(g);
95 g= git.get();
96 }
97 #else
98 DBUG_ASSERT(0);
99 #endif
100 }
101 thd->clear_owned_gtids();
102 thd->owned_gtid.dbug_print(NULL,
103 "set owned_gtid (clear) in acquire_ownership");
104 RETURN_REPORTED_ERROR;
105 }
106
#ifdef HAVE_GTID_NEXT_LIST
/**
  Lock the sidno mutexes that correspond to what the session owns: all
  sidnos of thd->owned_gtid_set when it owns a GTID set, the single sidno
  when it owns one GTID, and nothing otherwise.

  @param thd Session whose owned sidnos are locked.
*/
void Gtid_state::lock_owned_sidnos(const THD *thd)
{
  const rpl_sidno owned_sidno= thd->owned_gtid.sidno;
  if (owned_sidno == THD::OWNED_SIDNO_GTID_SET)
  {
    lock_sidnos(&thd->owned_gtid_set);
    return;
  }
  if (owned_sidno > 0)
    lock_sidno(owned_sidno);
}
#endif
116
117
unlock_owned_sidnos(const THD * thd)118 void Gtid_state::unlock_owned_sidnos(const THD *thd)
119 {
120 if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
121 {
122 #ifdef HAVE_GTID_NEXT_LIST
123 unlock_sidnos(&thd->owned_gtid_set);
124 #else
125 DBUG_ASSERT(0);
126 #endif
127 }
128 else if (thd->owned_gtid.sidno > 0)
129 {
130 unlock_sidno(thd->owned_gtid.sidno);
131 }
132 }
133
134
broadcast_owned_sidnos(const THD * thd)135 void Gtid_state::broadcast_owned_sidnos(const THD *thd)
136 {
137 if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
138 {
139 #ifdef HAVE_GTID_NEXT_LIST
140 broadcast_sidnos(&thd->owned_gtid_set);
141 #else
142 DBUG_ASSERT(0);
143 #endif
144 }
145 else if (thd->owned_gtid.sidno > 0)
146 {
147 broadcast_sidno(thd->owned_gtid.sidno);
148 }
149 }
150
151
update_commit_group(THD * first_thd)152 void Gtid_state::update_commit_group(THD *first_thd)
153 {
154 DBUG_ENTER("Gtid_state::update_commit_group");
155
156 /*
157 We are going to loop in all sessions of the group commit in order to avoid
158 being taking and releasing the global_sid_lock and sidno_lock for each
159 session.
160 */
161 DEBUG_SYNC(first_thd, "update_gtid_state_before_global_sid_lock");
162 global_sid_lock->rdlock();
163 DEBUG_SYNC(first_thd, "update_gtid_state_after_global_sid_lock");
164
165 update_gtids_impl_lock_sidnos(first_thd);
166
167 for (THD *thd= first_thd; thd != NULL; thd= thd->next_to_commit)
168 {
169 bool is_commit= (thd->commit_error != THD::CE_COMMIT_ERROR);
170
171 if (update_gtids_impl_do_nothing(thd) ||
172 (!is_commit && update_gtids_impl_check_skip_gtid_rollback(thd)))
173 continue;
174
175 bool more_trx_with_same_gtid_next= update_gtids_impl_begin(thd);
176
177 if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
178 {
179 update_gtids_impl_own_gtid_set(thd, is_commit);
180 }
181 else if (thd->owned_gtid.sidno > 0)
182 {
183 update_gtids_impl_own_gtid(thd, is_commit);
184 }
185 else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS)
186 {
187 update_gtids_impl_own_anonymous(thd, &more_trx_with_same_gtid_next);
188 }
189 else
190 {
191 update_gtids_impl_own_nothing(thd);
192 }
193
194 update_gtids_impl_end(thd, more_trx_with_same_gtid_next);
195 }
196
197 update_gtids_impl_broadcast_and_unlock_sidnos();
198
199 global_sid_lock->unlock();
200
201 DBUG_VOID_RETURN;
202 }
203
update_on_commit(THD * thd)204 void Gtid_state::update_on_commit(THD *thd)
205 {
206 DBUG_ENTER("Gtid_state::update_on_commit");
207
208 update_gtids_impl(thd, true);
209 DEBUG_SYNC(thd, "end_of_gtid_state_update_on_commit");
210
211 DBUG_VOID_RETURN;
212 }
213
214
update_on_rollback(THD * thd)215 void Gtid_state::update_on_rollback(THD *thd)
216 {
217 DBUG_ENTER("Gtid_state::update_on_rollback");
218
219 if (!update_gtids_impl_check_skip_gtid_rollback(thd))
220 update_gtids_impl(thd, false);
221
222 DBUG_VOID_RETURN;
223 }
224
225
update_gtids_impl(THD * thd,bool is_commit)226 void Gtid_state::update_gtids_impl(THD *thd, bool is_commit)
227 {
228 DBUG_ENTER("Gtid_state::update_gtids_impl");
229
230 if (update_gtids_impl_do_nothing(thd))
231 DBUG_VOID_RETURN;
232
233 bool more_trx_with_same_gtid_next= update_gtids_impl_begin(thd);
234
235 DEBUG_SYNC(thd, "update_gtid_state_before_global_sid_lock");
236 global_sid_lock->rdlock();
237 DEBUG_SYNC(thd, "update_gtid_state_after_global_sid_lock");
238
239 if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
240 {
241 update_gtids_impl_own_gtid_set(thd, is_commit);
242 }
243 else if (thd->owned_gtid.sidno > 0)
244 {
245 rpl_sidno sidno= thd->owned_gtid.sidno;
246 update_gtids_impl_lock_sidno(sidno);
247 update_gtids_impl_own_gtid(thd, is_commit);
248 update_gtids_impl_broadcast_and_unlock_sidno(sidno);
249 }
250 else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS)
251 {
252 update_gtids_impl_own_anonymous(thd, &more_trx_with_same_gtid_next);
253 }
254 else
255 {
256 update_gtids_impl_own_nothing(thd);
257 }
258
259 global_sid_lock->unlock();
260
261 update_gtids_impl_end(thd, more_trx_with_same_gtid_next);
262
263 thd->owned_gtid.dbug_print(NULL,
264 "set owned_gtid (clear) in update_gtids_impl");
265
266 DBUG_VOID_RETURN;
267 }
268
269
end_gtid_violating_transaction(THD * thd)270 void Gtid_state::end_gtid_violating_transaction(THD *thd)
271 {
272 DBUG_ENTER("end_gtid_violating_transaction");
273 if (thd->has_gtid_consistency_violation)
274 {
275 if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
276 end_automatic_gtid_violating_transaction();
277 else
278 {
279 DBUG_ASSERT(thd->variables.gtid_next.type == ANONYMOUS_GROUP);
280 end_anonymous_gtid_violating_transaction();
281 }
282 thd->has_gtid_consistency_violation= false;
283 }
284 DBUG_VOID_RETURN;
285 }
286
287
wait_for_sidno(THD * thd,rpl_sidno sidno,struct timespec * abstime)288 bool Gtid_state::wait_for_sidno(THD *thd, rpl_sidno sidno,
289 struct timespec *abstime)
290 {
291 DBUG_ENTER("wait_for_sidno");
292 PSI_stage_info old_stage;
293 sid_lock->assert_some_lock();
294 sid_locks.assert_owner(sidno);
295 sid_locks.enter_cond(thd, sidno,
296 &stage_waiting_for_gtid_to_be_committed,
297 &old_stage);
298 bool ret= sid_locks.wait(thd, sidno, abstime);
299 // Can't call sid_locks.unlock() as that requires global_sid_lock.
300 mysql_mutex_unlock(thd->current_mutex);
301 thd->EXIT_COND(&old_stage);
302 DBUG_RETURN(ret);
303 }
304
305
wait_for_gtid(THD * thd,const Gtid & gtid,struct timespec * abstime)306 bool Gtid_state::wait_for_gtid(THD *thd, const Gtid >id,
307 struct timespec *abstime)
308 {
309 DBUG_ENTER("Gtid_state::wait_for_gtid");
310 DBUG_PRINT("info", ("SIDNO=%d GNO=%lld thread_id=%u",
311 gtid.sidno, gtid.gno,
312 thd->thread_id()));
313 DBUG_ASSERT(!owned_gtids.is_owned_by(gtid, thd->thread_id()));
314 DBUG_ASSERT(!owned_gtids.is_owned_by(gtid, 0));
315
316 bool ret= wait_for_sidno(thd, gtid.sidno, abstime);
317 DBUG_RETURN(ret);
318 }
319
320
wait_for_gtid_set(THD * thd,Gtid_set * wait_for,double timeout)321 bool Gtid_state::wait_for_gtid_set(THD *thd, Gtid_set* wait_for,
322 double timeout)
323 {
324 struct timespec abstime;
325 DBUG_ENTER("Gtid_state::wait_for_gtid_set");
326 DEBUG_SYNC(thd, "begin_wait_for_executed_gtid_set");
327 wait_for->dbug_print("Waiting for");
328 DBUG_PRINT("info", ("Timeout %f", timeout));
329
330 global_sid_lock->assert_some_rdlock();
331
332 DBUG_ASSERT(wait_for->get_sid_map() == global_sid_map);
333
334 if (timeout > 0) {
335 set_timespec_nsec(&abstime,
336 static_cast<ulonglong>(timeout * 1000000000ULL));
337 }
338
339 /*
340 Algorithm:
341
342 Let 'todo' contain the GTIDs to wait for. Iterate over SIDNOs in
343 'todo' (this is the 'for' loop below).
344
345 For each SIDNO in 'todo', remove gtid_executed for that SIDNO from
346 'todo'. If, after this removal, there is still some interval for
347 this SIDNO in 'todo', then wait for a signal on this SIDNO.
348 Repeat this step until 'todo' is empty for this SIDNO (this is the
349 innermost 'while' loop below).
350
351 Once the loop over SIDNOs has completed, 'todo' is guaranteed to
352 be empty. However, it may still be the case that not all GTIDs of
353 wait_for are included in gtid_executed, since RESET MASTER may
354 have been executed while we were waiting.
355
356 RESET MASTER requires global_sid_lock.wrlock. We hold
357 global_sid_lock.rdlock while removing GTIDs from 'todo', but the
358 wait operation releases global_sid_lock.rdlock. So if we
359 completed the 'for' loop without waiting, we know for sure that
360 global_sid_lock.rdlock was held while emptying 'todo', and thus
361 RESET MASTER cannot have executed in the meantime. But if we
362 waited at some point during the execution of the 'for' loop, RESET
363 MASTER may have been called. Thus, we repeatedly run 'for' loop
364 until it completes without waiting (this is the outermost 'while'
365 loop).
366 */
367
368 // Will be true once the entire 'for' loop completes without waiting.
369 bool verified= false;
370
371 // The set of GTIDs that we are still waiting for.
372 Gtid_set todo(global_sid_map, NULL);
373 // As an optimization, add 100 Intervals that do not need to be
374 // allocated. This avoids allocation of these intervals.
375 static const int preallocated_interval_count= 100;
376 Gtid_set::Interval ivs[preallocated_interval_count];
377 todo.add_interval_memory(preallocated_interval_count, ivs);
378
379 /*
380 Iterate until we have verified that all GTIDs in the set are
381 included in gtid_executed.
382 */
383 while (!verified)
384 {
385 todo.add_gtid_set(wait_for);
386
387 // Iterate over SIDNOs until all GTIDs have been removed from 'todo'.
388
389 // Set 'verified' to true; it will be set to 'false' if any wait
390 // is done.
391 verified= true;
392 for (int sidno= 1; sidno <= todo.get_max_sidno(); sidno++)
393 {
394 // Iterate until 'todo' is empty for this SIDNO.
395 while (todo.contains_sidno(sidno))
396 {
397 lock_sidno(sidno);
398 todo.remove_intervals_for_sidno(&executed_gtids, sidno);
399
400 if (todo.contains_sidno(sidno))
401 {
402 bool ret= wait_for_sidno(thd, sidno, timeout > 0 ? &abstime : NULL);
403
404 // wait_for_gtid will release both the global lock and the
405 // mutex. Acquire the global lock again.
406 global_sid_lock->rdlock();
407 verified= false;
408
409 if (thd->killed)
410 {
411 switch (thd->killed)
412 {
413 case ER_SERVER_SHUTDOWN:
414 case ER_QUERY_INTERRUPTED:
415 case ER_QUERY_TIMEOUT:
416 my_error(thd->killed, MYF(0));
417 break;
418 default:
419 my_error(ER_QUERY_INTERRUPTED, MYF(0));
420 break;
421 }
422 DBUG_RETURN(true);
423 }
424
425 if (ret)
426 DBUG_RETURN(true);
427 }
428 else
429 {
430 // Keep the global lock since it may be needed in a later
431 // iteration of the for loop.
432 unlock_sidno(sidno);
433 break;
434 }
435 }
436 }
437 }
438 DBUG_RETURN(false);
439 }
440
get_automatic_gno(rpl_sidno sidno) const441 rpl_gno Gtid_state::get_automatic_gno(rpl_sidno sidno) const
442 {
443 DBUG_ENTER("Gtid_state::get_automatic_gno");
444 Gtid_set::Const_interval_iterator ivit(&executed_gtids, sidno);
445 /*
446 When assigning new automatic GTIDs, we can optimize the assignment by start
447 searching an available GNO from the "supposed" next free one instead of
448 starting from 1.
449
450 This is useful mostly on systems having many transactions committing in
451 group asking for automatic GTIDs. When a GNO is assigned to be owned by a
452 transaction, it is not removed from the free intervals, but will be added
453 to the owned_gtids set. In this way, picking up the actual first free GNO
454 would often lead to getting a GNO already owned by other thread. This can
455 lead to many "tries" of getting a free and not owned yet GNO (a thread
456 would try N times, N being the sum of transactions in the FLUSH stage plus
457 the transactions in the COMMIT stage that didn't released their ownership
458 yet).
459
460 The optimization just set next_free_gno variable to the last assigned
461 GNO + 1, as this would be the common case without having transactions
462 rolling back. This is done at Gtid_state::generate_automatic_gtid.
463
464 In order to fill the gaps of GTID_EXECUTED when a transaction rolls back
465 releasing the ownership of a GTID, we check if the released GNO is smaller
466 than the next_free_gno at Gtid_state::update_gtids_impl_own_gtid function
467 to set next_free_gno with the "released" GNO in this case.
468 */
469 Gtid next_candidate= { sidno,
470 sidno == get_server_sidno() ? next_free_gno : 1 };
471 while (true)
472 {
473 const Gtid_set::Interval *iv= ivit.get();
474 rpl_gno next_interval_start= iv != NULL ? iv->start : MAX_GNO;
475 while (next_candidate.gno < next_interval_start &&
476 DBUG_EVALUATE_IF("simulate_gno_exhausted", false, true))
477 {
478 DBUG_PRINT("debug",("Checking availability of gno= %llu", next_candidate.gno));
479 if (owned_gtids.is_owned_by(next_candidate, 0))
480 DBUG_RETURN(next_candidate.gno);
481 next_candidate.gno++;
482 }
483 if (iv == NULL ||
484 DBUG_EVALUATE_IF("simulate_gno_exhausted", true, false))
485 {
486 my_error(ER_GNO_EXHAUSTED, MYF(0));
487 DBUG_RETURN(-1);
488 }
489 if (next_candidate.gno <= iv->end)
490 next_candidate.gno= iv->end;
491 ivit.next();
492 }
493 }
494
495
get_last_executed_gno(rpl_sidno sidno) const496 rpl_gno Gtid_state::get_last_executed_gno(rpl_sidno sidno) const
497 {
498 DBUG_ENTER("Gtid:state::get_last_executed_gno");
499 rpl_gno gno= 0;
500
501 gtid_state->lock_sidno(sidno);
502 gno= executed_gtids.get_last_gno(sidno);
503 gtid_state->unlock_sidno(sidno);
504
505 DBUG_RETURN(gno);
506 }
507
508
generate_automatic_gtid(THD * thd,rpl_sidno specified_sidno,rpl_gno specified_gno,rpl_sidno * locked_sidno)509 enum_return_status Gtid_state::generate_automatic_gtid(THD *thd,
510 rpl_sidno specified_sidno,
511 rpl_gno specified_gno,
512 rpl_sidno *locked_sidno)
513 {
514 DBUG_ENTER("Gtid_state::generate_automatic_gtid");
515 enum_return_status ret= RETURN_STATUS_OK;
516
517 DBUG_ASSERT(thd->variables.gtid_next.type == AUTOMATIC_GROUP);
518 DBUG_ASSERT(specified_sidno >= 0);
519 DBUG_ASSERT(specified_gno >= 0);
520 DBUG_ASSERT(thd->owned_gtid.is_empty());
521
522 bool locked_sidno_was_passed_null = (locked_sidno == NULL);
523
524 if (locked_sidno_was_passed_null)
525 sid_lock->rdlock();
526 else
527 /* The caller must lock the sid_lock when locked_sidno is passed */
528 sid_lock->assert_some_lock();
529
530 // If GTID_MODE = ON_PERMISSIVE or ON, generate a new GTID
531 if (get_gtid_mode(GTID_MODE_LOCK_SID) >= GTID_MODE_ON_PERMISSIVE)
532 {
533 Gtid automatic_gtid= { specified_sidno, specified_gno };
534
535 if (automatic_gtid.sidno == 0)
536 automatic_gtid.sidno= get_server_sidno();
537
538 /*
539 We need to lock the sidno if locked_sidno wasn't passed as paramenter
540 or the already locked sidno doesn't match the one to generate the new
541 automatic GTID.
542 */
543 bool need_to_lock_sidno= (locked_sidno_was_passed_null ||
544 *locked_sidno != automatic_gtid.sidno);
545 if (need_to_lock_sidno)
546 {
547 /*
548 When locked_sidno contains a value greater than zero we must release
549 the current locked sidno. This should not happen with current code, as
550 the server only generates automatic GTIDs with server's UUID as sid.
551 */
552 if (!locked_sidno_was_passed_null && *locked_sidno != 0)
553 unlock_sidno(*locked_sidno);
554 lock_sidno(automatic_gtid.sidno);
555 /* Update the locked_sidno, so the caller would know what to unlock */
556 if (!locked_sidno_was_passed_null)
557 *locked_sidno= automatic_gtid.sidno;
558 }
559
560 if (automatic_gtid.gno == 0)
561 {
562 automatic_gtid.gno= get_automatic_gno(automatic_gtid.sidno);
563 if (automatic_gtid.sidno == get_server_sidno() &&
564 automatic_gtid.gno != -1)
565 next_free_gno= automatic_gtid.gno + 1;
566 }
567
568 if (automatic_gtid.gno != -1)
569 acquire_ownership(thd, automatic_gtid);
570 else
571 ret= RETURN_STATUS_REPORTED_ERROR;
572
573 /* The caller will unlock the sidno_lock if locked_sidno was passed */
574 if (locked_sidno_was_passed_null)
575 unlock_sidno(automatic_gtid.sidno);
576
577 }
578 else
579 {
580 // If GTID_MODE = OFF or OFF_PERMISSIVE, just mark this thread as
581 // using an anonymous transaction.
582 thd->owned_gtid.sidno= THD::OWNED_SIDNO_ANONYMOUS;
583 thd->owned_gtid.gno= 0;
584 acquire_anonymous_ownership();
585 thd->owned_gtid.dbug_print(NULL,
586 "set owned_gtid (anonymous) in generate_automatic_gtid");
587 }
588
589 /* The caller will unlock the sid_lock if locked_sidno was passed */
590 if (locked_sidno_was_passed_null)
591 sid_lock->unlock();
592
593 gtid_set_performance_schema_values(thd);
594
595 DBUG_RETURN(ret);
596 }
597
598
lock_sidnos(const Gtid_set * gs)599 void Gtid_state::lock_sidnos(const Gtid_set *gs)
600 {
601 DBUG_ASSERT(gs);
602 rpl_sidno max_sidno= gs->get_max_sidno();
603 for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
604 if (gs->contains_sidno(sidno))
605 lock_sidno(sidno);
606 }
607
608
unlock_sidnos(const Gtid_set * gs)609 void Gtid_state::unlock_sidnos(const Gtid_set *gs)
610 {
611 DBUG_ASSERT(gs);
612 rpl_sidno max_sidno= gs->get_max_sidno();
613 for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
614 if (gs->contains_sidno(sidno))
615 unlock_sidno(sidno);
616 }
617
618
broadcast_sidnos(const Gtid_set * gs)619 void Gtid_state::broadcast_sidnos(const Gtid_set *gs)
620 {
621 DBUG_ASSERT(gs);
622 rpl_sidno max_sidno= gs->get_max_sidno();
623 for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
624 if (gs->contains_sidno(sidno))
625 broadcast_sidno(sidno);
626 }
627
628
ensure_sidno()629 enum_return_status Gtid_state::ensure_sidno()
630 {
631 DBUG_ENTER("Gtid_state::ensure_sidno");
632 sid_lock->assert_some_wrlock();
633 rpl_sidno sidno= sid_map->get_max_sidno();
634 if (sidno > 0)
635 {
636 // The lock may be temporarily released during one of the calls to
637 // ensure_sidno or ensure_index. Hence, we must re-check the
638 // condition after the calls.
639 PROPAGATE_REPORTED_ERROR(executed_gtids.ensure_sidno(sidno));
640 PROPAGATE_REPORTED_ERROR(gtids_only_in_table.ensure_sidno(sidno));
641 PROPAGATE_REPORTED_ERROR(previous_gtids_logged.ensure_sidno(sidno));
642 PROPAGATE_REPORTED_ERROR(lost_gtids.ensure_sidno(sidno));
643 PROPAGATE_REPORTED_ERROR(owned_gtids.ensure_sidno(sidno));
644 PROPAGATE_REPORTED_ERROR(sid_locks.ensure_index(sidno));
645 PROPAGATE_REPORTED_ERROR(ensure_commit_group_sidnos(sidno));
646 sidno= sid_map->get_max_sidno();
647 DBUG_ASSERT(executed_gtids.get_max_sidno() >= sidno);
648 DBUG_ASSERT(gtids_only_in_table.get_max_sidno() >= sidno);
649 DBUG_ASSERT(previous_gtids_logged.get_max_sidno() >= sidno);
650 DBUG_ASSERT(lost_gtids.get_max_sidno() >= sidno);
651 DBUG_ASSERT(owned_gtids.get_max_sidno() >= sidno);
652 DBUG_ASSERT(sid_locks.get_max_index() >= sidno);
653 DBUG_ASSERT(commit_group_sidnos.size() >= (unsigned int)sidno);
654 }
655 RETURN_OK;
656 }
657
658
add_lost_gtids(const Gtid_set * gtid_set)659 enum_return_status Gtid_state::add_lost_gtids(const Gtid_set *gtid_set)
660 {
661 DBUG_ENTER("Gtid_state::add_lost_gtids()");
662 sid_lock->assert_some_wrlock();
663
664 gtid_set->dbug_print("add_lost_gtids");
665
666 if (!executed_gtids.is_empty())
667 {
668 BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY)),
669 (ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY,
670 MYF(0)));
671 RETURN_REPORTED_ERROR;
672 }
673 if (!owned_gtids.is_empty())
674 {
675 BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY)),
676 (ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY,
677 MYF(0)));
678 RETURN_REPORTED_ERROR;
679 }
680 DBUG_ASSERT(lost_gtids.is_empty());
681
682 if (save(gtid_set))
683 RETURN_REPORTED_ERROR;
684 PROPAGATE_REPORTED_ERROR(gtids_only_in_table.add_gtid_set(gtid_set));
685 PROPAGATE_REPORTED_ERROR(lost_gtids.add_gtid_set(gtid_set));
686 PROPAGATE_REPORTED_ERROR(executed_gtids.add_gtid_set(gtid_set));
687 lock_sidnos(gtid_set);
688 broadcast_sidnos(gtid_set);
689 unlock_sidnos(gtid_set);
690
691 DBUG_RETURN(RETURN_STATUS_OK);
692 }
693
694
init()695 int Gtid_state::init()
696 {
697 DBUG_ENTER("Gtid_state::init()");
698
699 global_sid_lock->assert_some_wrlock();
700
701 rpl_sid server_sid;
702 if (server_sid.parse(server_uuid) != 0)
703 DBUG_RETURN(1);
704 rpl_sidno sidno= sid_map->add_sid(server_sid);
705 if (sidno <= 0)
706 DBUG_RETURN(1);
707 server_sidno= sidno;
708 next_free_gno= 1;
709 DBUG_RETURN(0);
710 }
711
712
save(THD * thd)713 int Gtid_state::save(THD *thd)
714 {
715 DBUG_ENTER("Gtid_state::save(THD *thd)");
716 DBUG_ASSERT(gtid_table_persistor != NULL);
717 DBUG_ASSERT(thd->owned_gtid.sidno > 0);
718 int error= 0;
719
720 int ret= gtid_table_persistor->save(thd, &thd->owned_gtid);
721 if (1 == ret)
722 {
723 /*
724 Gtid table is not ready to be used, so failed to
725 open it. Ignore the error.
726 */
727 thd->clear_error();
728 if (!thd->get_stmt_da()->is_set())
729 thd->get_stmt_da()->set_ok_status(0, 0, NULL);
730 }
731 else if (-1 == ret)
732 error= -1;
733
734 DBUG_RETURN(error);
735 }
736
737
save(const Gtid_set * gtid_set)738 int Gtid_state::save(const Gtid_set *gtid_set)
739 {
740 DBUG_ENTER("Gtid_state::save(Gtid_set *gtid_set)");
741 int ret= gtid_table_persistor->save(gtid_set);
742 DBUG_RETURN(ret);
743 }
744
745
save_gtids_of_last_binlog_into_table(bool on_rotation)746 int Gtid_state::save_gtids_of_last_binlog_into_table(bool on_rotation)
747 {
748 DBUG_ENTER("Gtid_state::save_gtids_of_last_binlog_into_table");
749 int ret= 0;
750
751 /*
752 Use local Sid_map, so that we don't need a lock while inserting
753 into the table.
754 */
755 Sid_map sid_map(NULL);
756 Gtid_set logged_gtids_last_binlog(&sid_map, NULL);
757 // Allocate some intervals on stack to reduce allocation.
758 static const int PREALLOCATED_INTERVAL_COUNT= 64;
759 Gtid_set::Interval iv[PREALLOCATED_INTERVAL_COUNT];
760 logged_gtids_last_binlog.add_interval_memory(PREALLOCATED_INTERVAL_COUNT, iv);
761 /*
762 logged_gtids_last_binlog= executed_gtids - previous_gtids_logged -
763 gtids_only_in_table
764 */
765 global_sid_lock->wrlock();
766 ret= (logged_gtids_last_binlog.add_gtid_set(&executed_gtids) !=
767 RETURN_STATUS_OK);
768 if (!ret)
769 {
770 logged_gtids_last_binlog.remove_gtid_set(&previous_gtids_logged);
771 logged_gtids_last_binlog.remove_gtid_set(>ids_only_in_table);
772 if (!logged_gtids_last_binlog.is_empty() ||
773 mysql_bin_log.is_rotating_caused_by_incident)
774 {
775 /* Prepare previous_gtids_logged for next binlog on binlog rotation */
776 if (on_rotation)
777 ret= previous_gtids_logged.add_gtid_set(&logged_gtids_last_binlog);
778 global_sid_lock->unlock();
779 /* Save set of GTIDs of the last binlog into gtid_executed table */
780 if (!ret)
781 ret= save(&logged_gtids_last_binlog);
782 }
783 else
784 global_sid_lock->unlock();
785 }
786 else
787 global_sid_lock->unlock();
788
789 DBUG_RETURN(ret);
790 }
791
792
read_gtid_executed_from_table()793 int Gtid_state::read_gtid_executed_from_table()
794 {
795 return gtid_table_persistor->fetch_gtids(&executed_gtids);
796 }
797
798
compress(THD * thd)799 int Gtid_state::compress(THD *thd)
800 {
801 return gtid_table_persistor->compress(thd);
802 }
803
804
#ifdef MYSQL_SERVER
/**
  Forward to the gtid table persistor the check for an explicit
  modification of the gtid table.

  @param thd   Session performing the statement.
  @param table Table being checked.

  @return The value returned by
          gtid_table_persistor->warn_or_err_on_explicit_modification().
*/
int Gtid_state::warn_or_err_on_modify_gtid_table(THD *thd, TABLE_LIST *table)
{
  DBUG_ENTER("Gtid_state::warn_or_err_on_modify_gtid_table");
  const int check_result=
    gtid_table_persistor->warn_or_err_on_explicit_modification(thd, table);
  DBUG_RETURN(check_result);
}
#endif
814
update_gtids_impl_check_skip_gtid_rollback(THD * thd)815 bool Gtid_state::update_gtids_impl_check_skip_gtid_rollback(THD *thd)
816 {
817 if (thd->skip_gtid_rollback)
818 {
819 DBUG_PRINT("info", ("skipping gtid rollback because "
820 "thd->skip_gtid_rollback is set"));
821 return true;
822 }
823 return false;
824 }
825
update_gtids_impl_do_nothing(THD * thd)826 bool Gtid_state::update_gtids_impl_do_nothing(THD *thd)
827 {
828 if (thd->owned_gtid.is_empty() && !thd->has_gtid_consistency_violation)
829 {
830 if (thd->variables.gtid_next.type == GTID_GROUP)
831 thd->variables.gtid_next.set_undefined();
832 DBUG_PRINT("info", ("skipping update_gtids_impl because "
833 "thread does not own anything and does not violate "
834 "gtid consistency"));
835
836 return true;
837 }
838 return false;
839 }
840
update_gtids_impl_begin(THD * thd)841 bool Gtid_state::update_gtids_impl_begin(THD *thd)
842 {
843 #ifndef DBUG_OFF
844 if (current_thd != thd)
845 mysql_mutex_lock(&thd->LOCK_thd_query);
846 DBUG_PRINT("info", ("query='%s' thd->is_commit_in_middle_of_statement=%d",
847 thd->query().str,
848 thd->is_commit_in_middle_of_statement));
849 if (current_thd != thd)
850 mysql_mutex_unlock(&thd->LOCK_thd_query);
851 #endif
852 return thd->is_commit_in_middle_of_statement;
853 }
854
update_gtids_impl_own_gtid_set(THD * thd,bool is_commit)855 void Gtid_state::update_gtids_impl_own_gtid_set(THD *thd, bool is_commit)
856 {
857 #ifdef HAVE_GTID_NEXT_LIST
858 rpl_sidno prev_sidno= 0;
859 Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
860 Gtid g= git.get();
861 while (g.sidno != 0)
862 {
863 if (g.sidno != prev_sidno)
864 sid_locks.lock(g.sidno);
865 owned_gtids.remove_gtid(g);
866 git.next();
867 g= git.get();
868 if (is_commit)
869 executed_gtids._add_gtid(g);
870 }
871
872 if (is_commit && !thd->owned_gtid_set.is_empty())
873 thd->rpl_thd_ctx.session_gtids_ctx().
874 notify_after_gtid_executed_update(thd);
875
876 thd->variables.gtid_next.set_undefined();
877 thd->owned_gtid.dbug_print(NULL,
878 "set owned_gtid (clear; old was gtid_set) "
879 "in update_gtids_impl");
880 thd->clear_owned_gtids();
881 #else
882 DBUG_ASSERT(0);
883 #endif
884 }
885
update_gtids_impl_lock_sidno(rpl_sidno sidno)886 void Gtid_state::update_gtids_impl_lock_sidno(rpl_sidno sidno)
887 {
888 DBUG_ASSERT(sidno > 0);
889 DBUG_PRINT("info",("Locking sidno %d", sidno));
890 lock_sidno(sidno);
891 }
892
update_gtids_impl_lock_sidnos(THD * first_thd)893 void Gtid_state::update_gtids_impl_lock_sidnos(THD *first_thd)
894 {
895 /* Define which sidnos should be locked to be updated */
896 for (THD *thd= first_thd; thd != NULL; thd= thd->next_to_commit)
897 {
898 if (thd->owned_gtid.sidno > 0)
899 {
900 DBUG_PRINT("info",("Setting sidno %d to be locked",
901 thd->owned_gtid.sidno));
902 commit_group_sidnos[thd->owned_gtid.sidno]= true;
903 }
904 else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
905 #ifdef HAVE_GTID_NEXT_LIST
906 for (rpl_sidno i= 1; i < thd->owned_gtid_set.max_sidno; i++)
907 if (owned_gtid_set.contains_sidno(i))
908 commit_group_sidnos[i]= true;
909 #else
910 DBUG_ASSERT(0);
911 #endif
912 }
913
914 /* Take the sidno_locks in order */
915 for (rpl_sidno i= 1; i < (rpl_sidno)commit_group_sidnos.size(); i++)
916 if (commit_group_sidnos[i])
917 update_gtids_impl_lock_sidno(i);
918 }
919
update_gtids_impl_own_gtid(THD * thd,bool is_commit)920 void Gtid_state::update_gtids_impl_own_gtid(THD *thd, bool is_commit)
921 {
922 assert_sidno_lock_owner(thd->owned_gtid.sidno);
923 /*
924 In Group Replication the GTID may additionally be owned by another
925 thread, and we won't remove that ownership (it will be rolled back later)
926 */
927 DBUG_ASSERT(owned_gtids.is_owned_by(thd->owned_gtid, thd->thread_id()));
928 owned_gtids.remove_gtid(thd->owned_gtid, thd->thread_id());
929
930 if (is_commit)
931 {
932 DBUG_ASSERT(!executed_gtids.contains_gtid(thd->owned_gtid));
933 DBUG_EXECUTE_IF(
934 "rpl_gtid_update_on_commit_simulate_out_of_memory",
935 DBUG_SET("+d,rpl_gtid_get_free_interval_simulate_out_of_memory"););
936 /*
937 Any session adds transaction owned GTID into global executed_gtids.
938
939 If binlog is disabled, we report @@GLOBAL.GTID_PURGED from
940 executed_gtids, since @@GLOBAL.GTID_PURGED and @@GLOBAL.GTID_EXECUTED
941 are always same, so we did not save gtid into lost_gtids for every
942 transaction for improving performance.
943
944 If binlog is enabled and log_slave_updates is disabled, slave
945 SQL thread or slave worker thread adds transaction owned GTID
946 into global executed_gtids, lost_gtids and gtids_only_in_table.
947 */
948 executed_gtids._add_gtid(thd->owned_gtid);
949 thd->rpl_thd_ctx.session_gtids_ctx().
950 notify_after_gtid_executed_update(thd);
951 if (thd->slave_thread && opt_bin_log && !opt_log_slave_updates)
952 {
953 lost_gtids._add_gtid(thd->owned_gtid);
954 gtids_only_in_table._add_gtid(thd->owned_gtid);
955 }
956 }
957 else
958 {
959 if (thd->owned_gtid.sidno == server_sidno &&
960 next_free_gno > thd->owned_gtid.gno)
961 next_free_gno= thd->owned_gtid.gno;
962 }
963
964 thd->clear_owned_gtids();
965 if (thd->variables.gtid_next.type == GTID_GROUP)
966 {
967 DBUG_ASSERT(!thd->is_commit_in_middle_of_statement);
968 thd->variables.gtid_next.set_undefined();
969 }
970 else
971 {
972 /*
973 Can be UNDEFINED for statements where
974 gtid_pre_statement_checks skips the test for undefined,
975 e.g. ROLLBACK.
976 */
977 DBUG_ASSERT(thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
978 thd->variables.gtid_next.type == UNDEFINED_GROUP);
979 }
980 }
981
update_gtids_impl_broadcast_and_unlock_sidno(rpl_sidno sidno)982 void Gtid_state::update_gtids_impl_broadcast_and_unlock_sidno(rpl_sidno sidno)
983 {
984 DBUG_PRINT("info",("Unlocking sidno %d", sidno));
985 broadcast_sidno(sidno);
986 unlock_sidno(sidno);
987 }
988
update_gtids_impl_broadcast_and_unlock_sidnos()989 void Gtid_state::update_gtids_impl_broadcast_and_unlock_sidnos()
990 {
991 for (rpl_sidno i= 1; i < (rpl_sidno)commit_group_sidnos.size(); i++)
992 if (commit_group_sidnos[i])
993 {
994 update_gtids_impl_broadcast_and_unlock_sidno(i);
995 commit_group_sidnos[i]= false;
996 }
997 }
998
update_gtids_impl_own_anonymous(THD * thd,bool * more_trx)999 void Gtid_state::update_gtids_impl_own_anonymous(THD* thd,
1000 bool *more_trx)
1001 {
1002 DBUG_ASSERT(thd->variables.gtid_next.type == ANONYMOUS_GROUP ||
1003 thd->variables.gtid_next.type == AUTOMATIC_GROUP);
1004 /*
1005 If there is more in the transaction cache, set more_trx to indicate this.
1006
1007 See comment for the update_gtids_impl_begin function.
1008 */
1009 if (opt_bin_log)
1010 {
1011 // Needed before is_binlog_cache_empty.
1012 thd->binlog_setup_trx_data();
1013 if (!thd->is_binlog_cache_empty(true))
1014 {
1015 *more_trx= true;
1016 DBUG_PRINT("info", ("Transaction cache is non-empty: setting "
1017 "more_transaction_with_same_gtid_next="
1018 "true."));
1019 }
1020 }
1021 if (!(*more_trx &&
1022 thd->variables.gtid_next.type == ANONYMOUS_GROUP))
1023 {
1024 release_anonymous_ownership();
1025 thd->clear_owned_gtids();
1026 }
1027 }
1028
update_gtids_impl_own_nothing(THD * thd)1029 void Gtid_state::update_gtids_impl_own_nothing(THD *thd)
1030 {
1031 DBUG_ASSERT(thd->commit_error != THD::CE_COMMIT_ERROR ||
1032 thd->has_gtid_consistency_violation);
1033 DBUG_ASSERT(thd->variables.gtid_next.type == AUTOMATIC_GROUP);
1034 }
1035
update_gtids_impl_end(THD * thd,bool more_trx)1036 void Gtid_state::update_gtids_impl_end(THD *thd, bool more_trx)
1037 {
1038 if (!more_trx)
1039 end_gtid_violating_transaction(thd);
1040 }
1041
ensure_commit_group_sidnos(rpl_sidno sidno)1042 enum_return_status Gtid_state::ensure_commit_group_sidnos(rpl_sidno sidno)
1043 {
1044 DBUG_ENTER("Gtid_state::ensure_commit_group_sidnos");
1045 sid_lock->assert_some_wrlock();
1046 /*
1047 As we use the sidno as index of commit_group_sidnos and there is no
1048 sidno=0, the array size must be at least sidno + 1.
1049 */
1050 while ((commit_group_sidnos.size()) < (size_t)sidno + 1)
1051 {
1052 if (commit_group_sidnos.push_back(false))
1053 goto error;
1054 }
1055 RETURN_OK;
1056 error:
1057 BINLOG_ERROR(("Out of memory."), (ER_OUT_OF_RESOURCES, MYF(0)));
1058 RETURN_REPORTED_ERROR;
1059
1060 }
1061