1 /* Copyright (c) 2011, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22 02110-1301 USA */
23
24 #include "rpl_gtid.h"
25
26 #include "rpl_gtid_persist.h" // gtid_table_persistor
27 #include "sql_class.h" // THD
28 #include "debug_sync.h" // DEBUG_SYNC
29 #include "binlog.h"
30
31 #include <vector>
32
// Performance-schema memory instrumentation key defined by this file.
// NOTE(review): judging by the name it tags the memory of Gtid_state's
// commit_group_sidnos array -- confirm where the key is registered/used.
PSI_memory_key key_memory_Gtid_state_group_commit_sidno;
34
clear(THD * thd)35 int Gtid_state::clear(THD *thd)
36 {
37 DBUG_ENTER("Gtid_state::clear()");
38 int ret= 0;
39 // the wrlock implies that no other thread can hold any of the mutexes
40 sid_lock->assert_some_wrlock();
41 lost_gtids.clear();
42 executed_gtids.clear();
43 gtids_only_in_table.clear();
44 previous_gtids_logged.clear();
45 /* Reset gtid_executed table. */
46 if ((ret= gtid_table_persistor->reset(thd)) == 1)
47 {
48 /*
49 Gtid table is not ready to be used, so failed to
50 open it. Ignore the error.
51 */
52 thd->clear_error();
53 ret= 0;
54 }
55 next_free_gno= 1;
56 DBUG_RETURN(ret);
57 }
58
59
acquire_ownership(THD * thd,const Gtid & gtid)60 enum_return_status Gtid_state::acquire_ownership(THD *thd, const Gtid >id)
61 {
62 DBUG_ENTER("Gtid_state::acquire_ownership");
63 // caller must take both global_sid_lock and lock on the SIDNO.
64 global_sid_lock->assert_some_lock();
65 gtid_state->assert_sidno_lock_owner(gtid.sidno);
66 assert(!executed_gtids.contains_gtid(gtid));
67 DBUG_PRINT("info", ("gtid=%d:%lld", gtid.sidno, gtid.gno));
68 assert(thd->owned_gtid.sidno == 0);
69 if (owned_gtids.add_gtid_owner(gtid, thd->thread_id()) != RETURN_STATUS_OK)
70 goto err;
71 if (thd->get_gtid_next_list() != NULL)
72 {
73 #ifdef HAVE_GTID_NEXT_LIST
74 thd->owned_gtid_set._add_gtid(gtid);
75 thd->owned_gtid.sidno= THD::OWNED_SIDNO_GTID_SET;
76 thd->owned_sid.clear();
77 #else
78 assert(0);
79 #endif
80 }
81 else
82 {
83 thd->owned_gtid= gtid;
84 thd->owned_gtid.dbug_print(NULL, "set owned_gtid in acquire_ownership");
85 thd->owned_sid= sid_map->sidno_to_sid(gtid.sidno);
86 }
87 RETURN_OK;
88 err:
89 if (thd->get_gtid_next_list() != NULL)
90 {
91 #ifdef HAVE_GTID_NEXT_LIST
92 Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
93 Gtid g= git.get();
94 while (g.sidno != 0)
95 {
96 owned_gtids.remove_gtid(g);
97 g= git.get();
98 }
99 #else
100 assert(0);
101 #endif
102 }
103 thd->clear_owned_gtids();
104 thd->owned_gtid.dbug_print(NULL,
105 "set owned_gtid (clear) in acquire_ownership");
106 RETURN_REPORTED_ERROR;
107 }
108
#ifdef HAVE_GTID_NEXT_LIST
/**
  Lock the sidno mutex(es) matching what the session owns: every sidno of
  its owned GTID set, or the sidno of its single owned GTID. Does nothing
  for any other ownership state.

  @param thd Session whose owned sidnos are locked.
*/
void Gtid_state::lock_owned_sidnos(const THD *thd)
{
  const rpl_sidno owned_sidno= thd->owned_gtid.sidno;
  if (owned_sidno == THD::OWNED_SIDNO_GTID_SET)
  {
    lock_sidnos(&thd->owned_gtid_set);
    return;
  }
  if (owned_sidno > 0)
    lock_sidno(owned_sidno);
}
#endif
118
119
unlock_owned_sidnos(const THD * thd)120 void Gtid_state::unlock_owned_sidnos(const THD *thd)
121 {
122 if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
123 {
124 #ifdef HAVE_GTID_NEXT_LIST
125 unlock_sidnos(&thd->owned_gtid_set);
126 #else
127 assert(0);
128 #endif
129 }
130 else if (thd->owned_gtid.sidno > 0)
131 {
132 unlock_sidno(thd->owned_gtid.sidno);
133 }
134 }
135
136
broadcast_owned_sidnos(const THD * thd)137 void Gtid_state::broadcast_owned_sidnos(const THD *thd)
138 {
139 if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
140 {
141 #ifdef HAVE_GTID_NEXT_LIST
142 broadcast_sidnos(&thd->owned_gtid_set);
143 #else
144 assert(0);
145 #endif
146 }
147 else if (thd->owned_gtid.sidno > 0)
148 {
149 broadcast_sidno(thd->owned_gtid.sidno);
150 }
151 }
152
get_snapshot_gtid_executed(std::string & snapshot_gtid_executed)153 void Gtid_state::get_snapshot_gtid_executed(
154 std::string &snapshot_gtid_executed)
155 {
156 global_sid_lock->wrlock();
157 size_t size= executed_gtids.get_string_length() + 1;
158 std::vector<char> buf(size);
159 executed_gtids.to_string(buf.data());
160 snapshot_gtid_executed= buf.data();
161 global_sid_lock->unlock();
162 }
163
update_commit_group(THD * first_thd)164 void Gtid_state::update_commit_group(THD *first_thd)
165 {
166 DBUG_ENTER("Gtid_state::update_commit_group");
167
168 /*
169 We are going to loop in all sessions of the group commit in order to avoid
170 being taking and releasing the global_sid_lock and sidno_lock for each
171 session.
172 */
173 DEBUG_SYNC(first_thd, "update_gtid_state_before_global_sid_lock");
174 global_sid_lock->rdlock();
175 DEBUG_SYNC(first_thd, "update_gtid_state_after_global_sid_lock");
176
177 update_gtids_impl_lock_sidnos(first_thd);
178
179 for (THD *thd= first_thd; thd != NULL; thd= thd->next_to_commit)
180 {
181 bool is_commit= (thd->commit_error != THD::CE_COMMIT_ERROR);
182
183 if (update_gtids_impl_do_nothing(thd) ||
184 (!is_commit && update_gtids_impl_check_skip_gtid_rollback(thd)))
185 continue;
186
187 bool more_trx_with_same_gtid_next= update_gtids_impl_begin(thd);
188
189 if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
190 {
191 update_gtids_impl_own_gtid_set(thd, is_commit);
192 }
193 else if (thd->owned_gtid.sidno > 0)
194 {
195 update_gtids_impl_own_gtid(thd, is_commit);
196 }
197 else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS)
198 {
199 update_gtids_impl_own_anonymous(thd, &more_trx_with_same_gtid_next);
200 }
201 else
202 {
203 update_gtids_impl_own_nothing(thd);
204 }
205
206 update_gtids_impl_end(thd, more_trx_with_same_gtid_next);
207 }
208
209 update_gtids_impl_broadcast_and_unlock_sidnos();
210
211 global_sid_lock->unlock();
212
213 DBUG_VOID_RETURN;
214 }
215
update_on_commit(THD * thd)216 void Gtid_state::update_on_commit(THD *thd)
217 {
218 DBUG_ENTER("Gtid_state::update_on_commit");
219
220 update_gtids_impl(thd, true);
221 DEBUG_SYNC(thd, "end_of_gtid_state_update_on_commit");
222
223 DBUG_VOID_RETURN;
224 }
225
226
update_on_rollback(THD * thd)227 void Gtid_state::update_on_rollback(THD *thd)
228 {
229 DBUG_ENTER("Gtid_state::update_on_rollback");
230
231 if (!update_gtids_impl_check_skip_gtid_rollback(thd))
232 update_gtids_impl(thd, false);
233
234 DBUG_VOID_RETURN;
235 }
236
237
update_gtids_impl(THD * thd,bool is_commit)238 void Gtid_state::update_gtids_impl(THD *thd, bool is_commit)
239 {
240 DBUG_ENTER("Gtid_state::update_gtids_impl");
241
242 if (update_gtids_impl_do_nothing(thd))
243 DBUG_VOID_RETURN;
244
245 bool more_trx_with_same_gtid_next= update_gtids_impl_begin(thd);
246
247 DEBUG_SYNC(thd, "update_gtid_state_before_global_sid_lock");
248 global_sid_lock->rdlock();
249 DEBUG_SYNC(thd, "update_gtid_state_after_global_sid_lock");
250
251 if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
252 {
253 update_gtids_impl_own_gtid_set(thd, is_commit);
254 }
255 else if (thd->owned_gtid.sidno > 0)
256 {
257 rpl_sidno sidno= thd->owned_gtid.sidno;
258 update_gtids_impl_lock_sidno(sidno);
259 update_gtids_impl_own_gtid(thd, is_commit);
260 update_gtids_impl_broadcast_and_unlock_sidno(sidno);
261 }
262 else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS)
263 {
264 update_gtids_impl_own_anonymous(thd, &more_trx_with_same_gtid_next);
265 }
266 else
267 {
268 update_gtids_impl_own_nothing(thd);
269 }
270
271 global_sid_lock->unlock();
272
273 update_gtids_impl_end(thd, more_trx_with_same_gtid_next);
274
275 thd->owned_gtid.dbug_print(NULL,
276 "set owned_gtid (clear) in update_gtids_impl");
277
278 DBUG_VOID_RETURN;
279 }
280
281
end_gtid_violating_transaction(THD * thd)282 void Gtid_state::end_gtid_violating_transaction(THD *thd)
283 {
284 DBUG_ENTER("end_gtid_violating_transaction");
285 if (thd->has_gtid_consistency_violation)
286 {
287 if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
288 end_automatic_gtid_violating_transaction();
289 else
290 {
291 assert(thd->variables.gtid_next.type == ANONYMOUS_GROUP);
292 end_anonymous_gtid_violating_transaction();
293 }
294 thd->has_gtid_consistency_violation= false;
295 }
296 DBUG_VOID_RETURN;
297 }
298
299
wait_for_sidno(THD * thd,rpl_sidno sidno,struct timespec * abstime)300 bool Gtid_state::wait_for_sidno(THD *thd, rpl_sidno sidno,
301 struct timespec *abstime)
302 {
303 DBUG_ENTER("wait_for_sidno");
304 PSI_stage_info old_stage;
305 sid_lock->assert_some_lock();
306 sid_locks.assert_owner(sidno);
307 sid_locks.enter_cond(thd, sidno,
308 &stage_waiting_for_gtid_to_be_committed,
309 &old_stage);
310 bool ret= sid_locks.wait(thd, sidno, abstime);
311 // Can't call sid_locks.unlock() as that requires global_sid_lock.
312 mysql_mutex_unlock(thd->current_mutex);
313 thd->EXIT_COND(&old_stage);
314 DBUG_RETURN(ret);
315 }
316
317
wait_for_gtid(THD * thd,const Gtid & gtid,struct timespec * abstime)318 bool Gtid_state::wait_for_gtid(THD *thd, const Gtid >id,
319 struct timespec *abstime)
320 {
321 DBUG_ENTER("Gtid_state::wait_for_gtid");
322 DBUG_PRINT("info", ("SIDNO=%d GNO=%lld thread_id=%u",
323 gtid.sidno, gtid.gno,
324 thd->thread_id()));
325 assert(!owned_gtids.is_owned_by(gtid, thd->thread_id()));
326 assert(!owned_gtids.is_owned_by(gtid, 0));
327
328 bool ret= wait_for_sidno(thd, gtid.sidno, abstime);
329 DBUG_RETURN(ret);
330 }
331
332
/**
  Wait until every GTID of the given set is included in executed_gtids.

  The caller must hold the read lock on global_sid_lock, and the set must
  use global_sid_map. The lock is held again on every return path, but it
  is released and re-acquired around each wait.

  @param thd      Session that waits.
  @param wait_for Set of GTIDs to wait for.
  @param timeout  Timeout in seconds; values <= 0 mean no timeout.

  @retval false All GTIDs of wait_for are in executed_gtids.
  @retval true  The session was killed (an error has been raised), or
                wait_for_sidno() returned true (presumably a timeout --
                confirm against Mutex_cond_array::wait).
*/
bool Gtid_state::wait_for_gtid_set(THD *thd, Gtid_set* wait_for,
                                   double timeout)
{
  struct timespec abstime;
  DBUG_ENTER("Gtid_state::wait_for_gtid_set");
  DEBUG_SYNC(thd, "begin_wait_for_executed_gtid_set");
  wait_for->dbug_print("Waiting for");
  DBUG_PRINT("info", ("Timeout %f", timeout));

  global_sid_lock->assert_some_rdlock();

  assert(wait_for->get_sid_map() == global_sid_map);

  // abstime is only initialized, and only used, when a timeout was given.
  if (timeout > 0) {
    set_timespec_nsec(&abstime,
                      static_cast<ulonglong>(timeout * 1000000000ULL));
  }

  /*
    Algorithm:

    Let 'todo' contain the GTIDs to wait for. Iterate over SIDNOs in
    'todo' (this is the 'for' loop below).

    For each SIDNO in 'todo', remove gtid_executed for that SIDNO from
    'todo'.  If, after this removal, there is still some interval for
    this SIDNO in 'todo', then wait for a signal on this SIDNO.
    Repeat this step until 'todo' is empty for this SIDNO (this is the
    innermost 'while' loop below).

    Once the loop over SIDNOs has completed, 'todo' is guaranteed to
    be empty.  However, it may still be the case that not all GTIDs of
    wait_for are included in gtid_executed, since RESET MASTER may
    have been executed while we were waiting.

    RESET MASTER requires global_sid_lock.wrlock.  We hold
    global_sid_lock.rdlock while removing GTIDs from 'todo', but the
    wait operation releases global_sid_lock.rdlock.  So if we
    completed the 'for' loop without waiting, we know for sure that
    global_sid_lock.rdlock was held while emptying 'todo', and thus
    RESET MASTER cannot have executed in the meantime.  But if we
    waited at some point during the execution of the 'for' loop, RESET
    MASTER may have been called.  Thus, we repeatedly run 'for' loop
    until it completes without waiting (this is the outermost 'while'
    loop).
  */

  // Will be true once the entire 'for' loop completes without waiting.
  bool verified= false;

  // The set of GTIDs that we are still waiting for.
  Gtid_set todo(global_sid_map, NULL);
  // As an optimization, hand 'todo' 100 stack-allocated Intervals so that
  // small sets do not need to allocate any.
  static const int preallocated_interval_count= 100;
  Gtid_set::Interval ivs[preallocated_interval_count];
  todo.add_interval_memory(preallocated_interval_count, ivs);

  /*
    Iterate until we have verified that all GTIDs in the set are
    included in gtid_executed.
  */
  while (!verified)
  {
    // NOTE(review): the status returned by add_gtid_set() is not checked.
    todo.add_gtid_set(wait_for);

    // Iterate over SIDNOs until all GTIDs have been removed from 'todo'.

    // Set 'verified' to true; it will be set to 'false' if any wait
    // is done.
    verified= true;
    for (int sidno= 1; sidno <= todo.get_max_sidno(); sidno++)
    {
      // Iterate until 'todo' is empty for this SIDNO.
      while (todo.contains_sidno(sidno))
      {
        lock_sidno(sidno);
        todo.remove_intervals_for_sidno(&executed_gtids, sidno);

        if (todo.contains_sidno(sidno))
        {
          bool ret= wait_for_sidno(thd, sidno, timeout > 0 ? &abstime : NULL);

          // wait_for_sidno will release both the global lock and the
          // mutex.  Acquire the global lock again.
          global_sid_lock->rdlock();
          verified= false;

          if (thd->killed)
          {
            // Report the kill reason itself when it is one of the listed
            // error codes; anything else is reported as a query interrupt.
            switch (thd->killed)
            {
            case ER_SERVER_SHUTDOWN:
            case ER_QUERY_INTERRUPTED:
            case ER_QUERY_TIMEOUT:
              my_error(thd->killed, MYF(0));
              break;
            default:
              my_error(ER_QUERY_INTERRUPTED, MYF(0));
              break;
            }
            DBUG_RETURN(true);
          }

          if (ret)
            DBUG_RETURN(true);
        }
        else
        {
          // Keep the global lock since it may be needed in a later
          // iteration of the for loop.
          unlock_sidno(sidno);
          break;
        }
      }
    }
  }
  DBUG_RETURN(false);
}
452
get_automatic_gno(rpl_sidno sidno) const453 rpl_gno Gtid_state::get_automatic_gno(rpl_sidno sidno) const
454 {
455 DBUG_ENTER("Gtid_state::get_automatic_gno");
456 Gtid_set::Const_interval_iterator ivit(&executed_gtids, sidno);
457 /*
458 When assigning new automatic GTIDs, we can optimize the assignment by start
459 searching an available GNO from the "supposed" next free one instead of
460 starting from 1.
461
462 This is useful mostly on systems having many transactions committing in
463 group asking for automatic GTIDs. When a GNO is assigned to be owned by a
464 transaction, it is not removed from the free intervals, but will be added
465 to the owned_gtids set. In this way, picking up the actual first free GNO
466 would often lead to getting a GNO already owned by other thread. This can
467 lead to many "tries" of getting a free and not owned yet GNO (a thread
468 would try N times, N being the sum of transactions in the FLUSH stage plus
469 the transactions in the COMMIT stage that didn't released their ownership
470 yet).
471
472 The optimization just set next_free_gno variable to the last assigned
473 GNO + 1, as this would be the common case without having transactions
474 rolling back. This is done at Gtid_state::generate_automatic_gtid.
475
476 In order to fill the gaps of GTID_EXECUTED when a transaction rolls back
477 releasing the ownership of a GTID, we check if the released GNO is smaller
478 than the next_free_gno at Gtid_state::update_gtids_impl_own_gtid function
479 to set next_free_gno with the "released" GNO in this case.
480 */
481 Gtid next_candidate= { sidno,
482 sidno == get_server_sidno() ? next_free_gno : 1 };
483 while (true)
484 {
485 const Gtid_set::Interval *iv= ivit.get();
486 rpl_gno next_interval_start= iv != NULL ? iv->start : GNO_END;
487 while (next_candidate.gno < next_interval_start &&
488 DBUG_EVALUATE_IF("simulate_gno_exhausted", false, true))
489 {
490 DBUG_PRINT("debug",("Checking availability of gno= %llu", next_candidate.gno));
491 if (owned_gtids.is_owned_by(next_candidate, 0))
492 DBUG_RETURN(next_candidate.gno);
493 next_candidate.gno++;
494 }
495 if (iv == NULL ||
496 DBUG_EVALUATE_IF("simulate_gno_exhausted", true, false))
497 {
498 my_error(ER_GNO_EXHAUSTED, MYF(0));
499 DBUG_RETURN(-1);
500 }
501 if (next_candidate.gno < iv->end)
502 next_candidate.gno= iv->end;
503 ivit.next();
504 }
505 }
506
507
get_last_executed_gno(rpl_sidno sidno) const508 rpl_gno Gtid_state::get_last_executed_gno(rpl_sidno sidno) const
509 {
510 DBUG_ENTER("Gtid:state::get_last_executed_gno");
511 rpl_gno gno= 0;
512
513 gtid_state->lock_sidno(sidno);
514 gno= executed_gtids.get_last_gno(sidno);
515 gtid_state->unlock_sidno(sidno);
516
517 DBUG_RETURN(gno);
518 }
519
520
generate_automatic_gtid(THD * thd,rpl_sidno specified_sidno,rpl_gno specified_gno,rpl_sidno * locked_sidno)521 enum_return_status Gtid_state::generate_automatic_gtid(THD *thd,
522 rpl_sidno specified_sidno,
523 rpl_gno specified_gno,
524 rpl_sidno *locked_sidno)
525 {
526 DBUG_ENTER("Gtid_state::generate_automatic_gtid");
527 enum_return_status ret= RETURN_STATUS_OK;
528
529 assert(thd->variables.gtid_next.type == AUTOMATIC_GROUP);
530 assert(specified_sidno >= 0);
531 assert(specified_gno >= 0);
532 assert(thd->owned_gtid.is_empty());
533
534 bool locked_sidno_was_passed_null = (locked_sidno == NULL);
535
536 if (locked_sidno_was_passed_null)
537 sid_lock->rdlock();
538 else
539 /* The caller must lock the sid_lock when locked_sidno is passed */
540 sid_lock->assert_some_lock();
541
542 // If GTID_MODE = ON_PERMISSIVE or ON, generate a new GTID
543 if (get_gtid_mode(GTID_MODE_LOCK_SID) >= GTID_MODE_ON_PERMISSIVE)
544 {
545 Gtid automatic_gtid= { specified_sidno, specified_gno };
546
547 if (automatic_gtid.sidno == 0)
548 automatic_gtid.sidno= get_server_sidno();
549
550 /*
551 We need to lock the sidno if locked_sidno wasn't passed as paramenter
552 or the already locked sidno doesn't match the one to generate the new
553 automatic GTID.
554 */
555 bool need_to_lock_sidno= (locked_sidno_was_passed_null ||
556 *locked_sidno != automatic_gtid.sidno);
557 if (need_to_lock_sidno)
558 {
559 /*
560 When locked_sidno contains a value greater than zero we must release
561 the current locked sidno. This should not happen with current code, as
562 the server only generates automatic GTIDs with server's UUID as sid.
563 */
564 if (!locked_sidno_was_passed_null && *locked_sidno != 0)
565 unlock_sidno(*locked_sidno);
566 lock_sidno(automatic_gtid.sidno);
567 /* Update the locked_sidno, so the caller would know what to unlock */
568 if (!locked_sidno_was_passed_null)
569 *locked_sidno= automatic_gtid.sidno;
570 }
571
572 if (automatic_gtid.gno == 0)
573 {
574 automatic_gtid.gno= get_automatic_gno(automatic_gtid.sidno);
575 if (automatic_gtid.sidno == get_server_sidno() &&
576 automatic_gtid.gno != -1)
577 next_free_gno= automatic_gtid.gno + 1;
578 }
579
580 if (automatic_gtid.gno != -1)
581 acquire_ownership(thd, automatic_gtid);
582 else
583 ret= RETURN_STATUS_REPORTED_ERROR;
584
585 /* The caller will unlock the sidno_lock if locked_sidno was passed */
586 if (locked_sidno_was_passed_null)
587 unlock_sidno(automatic_gtid.sidno);
588
589 }
590 else
591 {
592 // If GTID_MODE = OFF or OFF_PERMISSIVE, just mark this thread as
593 // using an anonymous transaction.
594 thd->owned_gtid.sidno= THD::OWNED_SIDNO_ANONYMOUS;
595 thd->owned_gtid.gno= 0;
596 acquire_anonymous_ownership();
597 thd->owned_gtid.dbug_print(NULL,
598 "set owned_gtid (anonymous) in generate_automatic_gtid");
599 }
600
601 /* The caller will unlock the sid_lock if locked_sidno was passed */
602 if (locked_sidno_was_passed_null)
603 sid_lock->unlock();
604
605 gtid_set_performance_schema_values(thd);
606
607 DBUG_RETURN(ret);
608 }
609
610
lock_sidnos(const Gtid_set * gs)611 void Gtid_state::lock_sidnos(const Gtid_set *gs)
612 {
613 assert(gs);
614 rpl_sidno max_sidno= gs->get_max_sidno();
615 for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
616 if (gs->contains_sidno(sidno))
617 lock_sidno(sidno);
618 }
619
620
unlock_sidnos(const Gtid_set * gs)621 void Gtid_state::unlock_sidnos(const Gtid_set *gs)
622 {
623 assert(gs);
624 rpl_sidno max_sidno= gs->get_max_sidno();
625 for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
626 if (gs->contains_sidno(sidno))
627 unlock_sidno(sidno);
628 }
629
630
broadcast_sidnos(const Gtid_set * gs)631 void Gtid_state::broadcast_sidnos(const Gtid_set *gs)
632 {
633 assert(gs);
634 rpl_sidno max_sidno= gs->get_max_sidno();
635 for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
636 if (gs->contains_sidno(sidno))
637 broadcast_sidno(sidno);
638 }
639
640
ensure_sidno()641 enum_return_status Gtid_state::ensure_sidno()
642 {
643 DBUG_ENTER("Gtid_state::ensure_sidno");
644 sid_lock->assert_some_wrlock();
645 rpl_sidno sidno= sid_map->get_max_sidno();
646 if (sidno > 0)
647 {
648 // The lock may be temporarily released during one of the calls to
649 // ensure_sidno or ensure_index. Hence, we must re-check the
650 // condition after the calls.
651 PROPAGATE_REPORTED_ERROR(executed_gtids.ensure_sidno(sidno));
652 PROPAGATE_REPORTED_ERROR(gtids_only_in_table.ensure_sidno(sidno));
653 PROPAGATE_REPORTED_ERROR(previous_gtids_logged.ensure_sidno(sidno));
654 PROPAGATE_REPORTED_ERROR(lost_gtids.ensure_sidno(sidno));
655 PROPAGATE_REPORTED_ERROR(owned_gtids.ensure_sidno(sidno));
656 PROPAGATE_REPORTED_ERROR(sid_locks.ensure_index(sidno));
657 PROPAGATE_REPORTED_ERROR(ensure_commit_group_sidnos(sidno));
658 sidno= sid_map->get_max_sidno();
659 assert(executed_gtids.get_max_sidno() >= sidno);
660 assert(gtids_only_in_table.get_max_sidno() >= sidno);
661 assert(previous_gtids_logged.get_max_sidno() >= sidno);
662 assert(lost_gtids.get_max_sidno() >= sidno);
663 assert(owned_gtids.get_max_sidno() >= sidno);
664 assert(sid_locks.get_max_index() >= sidno);
665 assert(commit_group_sidnos.size() >= (unsigned int)sidno);
666 }
667 RETURN_OK;
668 }
669
670
add_lost_gtids(const Gtid_set * gtid_set)671 enum_return_status Gtid_state::add_lost_gtids(const Gtid_set *gtid_set)
672 {
673 DBUG_ENTER("Gtid_state::add_lost_gtids()");
674 sid_lock->assert_some_wrlock();
675
676 gtid_set->dbug_print("add_lost_gtids");
677
678 if (!executed_gtids.is_empty())
679 {
680 BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY)),
681 (ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY,
682 MYF(0)));
683 RETURN_REPORTED_ERROR;
684 }
685 if (!owned_gtids.is_empty())
686 {
687 BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY)),
688 (ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY,
689 MYF(0)));
690 RETURN_REPORTED_ERROR;
691 }
692 assert(lost_gtids.is_empty());
693
694 if (save(gtid_set))
695 RETURN_REPORTED_ERROR;
696 PROPAGATE_REPORTED_ERROR(gtids_only_in_table.add_gtid_set(gtid_set));
697 PROPAGATE_REPORTED_ERROR(lost_gtids.add_gtid_set(gtid_set));
698 PROPAGATE_REPORTED_ERROR(executed_gtids.add_gtid_set(gtid_set));
699 lock_sidnos(gtid_set);
700 broadcast_sidnos(gtid_set);
701 unlock_sidnos(gtid_set);
702
703 DBUG_RETURN(RETURN_STATUS_OK);
704 }
705
706
init()707 int Gtid_state::init()
708 {
709 DBUG_ENTER("Gtid_state::init()");
710
711 global_sid_lock->assert_some_wrlock();
712
713 rpl_sid server_sid;
714 if (server_sid.parse(server_uuid) != 0)
715 DBUG_RETURN(1);
716 rpl_sidno sidno= sid_map->add_sid(server_sid);
717 if (sidno <= 0)
718 DBUG_RETURN(1);
719 server_sidno= sidno;
720 next_free_gno= 1;
721 DBUG_RETURN(0);
722 }
723
724
save(THD * thd)725 int Gtid_state::save(THD *thd)
726 {
727 DBUG_ENTER("Gtid_state::save(THD *thd)");
728 assert(gtid_table_persistor != NULL);
729 assert(thd->owned_gtid.sidno > 0);
730 int error= 0;
731
732 int ret= gtid_table_persistor->save(thd, &thd->owned_gtid);
733 if (1 == ret)
734 {
735 /*
736 Gtid table is not ready to be used, so failed to
737 open it. Ignore the error.
738 */
739 thd->clear_error();
740 if (!thd->get_stmt_da()->is_set())
741 thd->get_stmt_da()->set_ok_status(0, 0, NULL);
742 }
743 else if (-1 == ret)
744 error= -1;
745
746 DBUG_RETURN(error);
747 }
748
749
save(const Gtid_set * gtid_set)750 int Gtid_state::save(const Gtid_set *gtid_set)
751 {
752 DBUG_ENTER("Gtid_state::save(Gtid_set *gtid_set)");
753 int ret= gtid_table_persistor->save(gtid_set);
754 DBUG_RETURN(ret);
755 }
756
757
save_gtids_of_last_binlog_into_table(bool on_rotation)758 int Gtid_state::save_gtids_of_last_binlog_into_table(bool on_rotation)
759 {
760 DBUG_ENTER("Gtid_state::save_gtids_of_last_binlog_into_table");
761 int ret= 0;
762
763 /*
764 Use local Sid_map, so that we don't need a lock while inserting
765 into the table.
766 */
767 Sid_map sid_map(NULL);
768 Gtid_set logged_gtids_last_binlog(&sid_map, NULL);
769 // Allocate some intervals on stack to reduce allocation.
770 static const int PREALLOCATED_INTERVAL_COUNT= 64;
771 Gtid_set::Interval iv[PREALLOCATED_INTERVAL_COUNT];
772 logged_gtids_last_binlog.add_interval_memory(PREALLOCATED_INTERVAL_COUNT, iv);
773 /*
774 logged_gtids_last_binlog= executed_gtids - previous_gtids_logged -
775 gtids_only_in_table
776 */
777 global_sid_lock->wrlock();
778 ret= (logged_gtids_last_binlog.add_gtid_set(&executed_gtids) !=
779 RETURN_STATUS_OK);
780 if (!ret)
781 {
782 logged_gtids_last_binlog.remove_gtid_set(&previous_gtids_logged);
783 logged_gtids_last_binlog.remove_gtid_set(>ids_only_in_table);
784 if (!logged_gtids_last_binlog.is_empty() ||
785 mysql_bin_log.is_rotating_caused_by_incident)
786 {
787 /* Prepare previous_gtids_logged for next binlog on binlog rotation */
788 if (on_rotation)
789 ret= previous_gtids_logged.add_gtid_set(&logged_gtids_last_binlog);
790 global_sid_lock->unlock();
791 /* Save set of GTIDs of the last binlog into gtid_executed table */
792 if (!ret)
793 ret= save(&logged_gtids_last_binlog);
794 }
795 else
796 global_sid_lock->unlock();
797 }
798 else
799 global_sid_lock->unlock();
800
801 DBUG_RETURN(ret);
802 }
803
804
read_gtid_executed_from_table()805 int Gtid_state::read_gtid_executed_from_table()
806 {
807 return gtid_table_persistor->fetch_gtids(&executed_gtids);
808 }
809
810
compress(THD * thd)811 int Gtid_state::compress(THD *thd)
812 {
813 return gtid_table_persistor->compress(thd);
814 }
815
816
#ifdef MYSQL_SERVER
/**
  Forward the check for an explicit modification of the gtid table to the
  table persistor.

  @param thd   Session performing the statement.
  @param table Table being modified.

  @return The value returned by the persistor's
          warn_or_err_on_explicit_modification().
*/
int Gtid_state::warn_or_err_on_modify_gtid_table(THD *thd, TABLE_LIST *table)
{
  DBUG_ENTER("Gtid_state::warn_or_err_on_modify_gtid_table");
  const int check_status=
    gtid_table_persistor->warn_or_err_on_explicit_modification(thd, table);
  DBUG_RETURN(check_status);
}
#endif
826
update_gtids_impl_check_skip_gtid_rollback(THD * thd)827 bool Gtid_state::update_gtids_impl_check_skip_gtid_rollback(THD *thd)
828 {
829 if (thd->skip_gtid_rollback)
830 {
831 DBUG_PRINT("info", ("skipping gtid rollback because "
832 "thd->skip_gtid_rollback is set"));
833 return true;
834 }
835 return false;
836 }
837
update_gtids_impl_do_nothing(THD * thd)838 bool Gtid_state::update_gtids_impl_do_nothing(THD *thd)
839 {
840 if (thd->owned_gtid.is_empty() && !thd->has_gtid_consistency_violation)
841 {
842 if (thd->variables.gtid_next.type == GTID_GROUP)
843 thd->variables.gtid_next.set_undefined();
844 DBUG_PRINT("info", ("skipping update_gtids_impl because "
845 "thread does not own anything and does not violate "
846 "gtid consistency"));
847
848 return true;
849 }
850 return false;
851 }
852
update_gtids_impl_begin(THD * thd)853 bool Gtid_state::update_gtids_impl_begin(THD *thd)
854 {
855 #ifndef NDEBUG
856 if (current_thd != thd)
857 mysql_mutex_lock(&thd->LOCK_thd_query);
858 DBUG_PRINT("info", ("query='%s' thd->is_commit_in_middle_of_statement=%d",
859 thd->query().str,
860 thd->is_commit_in_middle_of_statement));
861 if (current_thd != thd)
862 mysql_mutex_unlock(&thd->LOCK_thd_query);
863 #endif
864 return thd->is_commit_in_middle_of_statement;
865 }
866
update_gtids_impl_own_gtid_set(THD * thd,bool is_commit)867 void Gtid_state::update_gtids_impl_own_gtid_set(THD *thd, bool is_commit)
868 {
869 #ifdef HAVE_GTID_NEXT_LIST
870 rpl_sidno prev_sidno= 0;
871 Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
872 Gtid g= git.get();
873 while (g.sidno != 0)
874 {
875 if (g.sidno != prev_sidno)
876 sid_locks.lock(g.sidno);
877 owned_gtids.remove_gtid(g);
878 git.next();
879 g= git.get();
880 if (is_commit)
881 executed_gtids._add_gtid(g);
882 }
883
884 if (is_commit && !thd->owned_gtid_set.is_empty())
885 thd->rpl_thd_ctx.session_gtids_ctx().
886 notify_after_gtid_executed_update(thd);
887
888 thd->variables.gtid_next.set_undefined();
889 thd->owned_gtid.dbug_print(NULL,
890 "set owned_gtid (clear; old was gtid_set) "
891 "in update_gtids_impl");
892 thd->clear_owned_gtids();
893 #else
894 assert(0);
895 #endif
896 }
897
update_gtids_impl_lock_sidno(rpl_sidno sidno)898 void Gtid_state::update_gtids_impl_lock_sidno(rpl_sidno sidno)
899 {
900 assert(sidno > 0);
901 DBUG_PRINT("info",("Locking sidno %d", sidno));
902 lock_sidno(sidno);
903 }
904
update_gtids_impl_lock_sidnos(THD * first_thd)905 void Gtid_state::update_gtids_impl_lock_sidnos(THD *first_thd)
906 {
907 /* Define which sidnos should be locked to be updated */
908 for (THD *thd= first_thd; thd != NULL; thd= thd->next_to_commit)
909 {
910 if (thd->owned_gtid.sidno > 0)
911 {
912 DBUG_PRINT("info",("Setting sidno %d to be locked",
913 thd->owned_gtid.sidno));
914 commit_group_sidnos[thd->owned_gtid.sidno]= true;
915 }
916 else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
917 #ifdef HAVE_GTID_NEXT_LIST
918 for (rpl_sidno i= 1; i < thd->owned_gtid_set.max_sidno; i++)
919 if (owned_gtid_set.contains_sidno(i))
920 commit_group_sidnos[i]= true;
921 #else
922 assert(0);
923 #endif
924 }
925
926 /* Take the sidno_locks in order */
927 for (rpl_sidno i= 1; i < (rpl_sidno)commit_group_sidnos.size(); i++)
928 if (commit_group_sidnos[i])
929 update_gtids_impl_lock_sidno(i);
930 }
931
update_gtids_impl_own_gtid(THD * thd,bool is_commit)932 void Gtid_state::update_gtids_impl_own_gtid(THD *thd, bool is_commit)
933 {
934 assert_sidno_lock_owner(thd->owned_gtid.sidno);
935 /*
936 In Group Replication the GTID may additionally be owned by another
937 thread, and we won't remove that ownership (it will be rolled back later)
938 */
939 assert(owned_gtids.is_owned_by(thd->owned_gtid, thd->thread_id()));
940 owned_gtids.remove_gtid(thd->owned_gtid, thd->thread_id());
941
942 if (is_commit)
943 {
944 assert(!executed_gtids.contains_gtid(thd->owned_gtid));
945 DBUG_EXECUTE_IF(
946 "rpl_gtid_update_on_commit_simulate_out_of_memory",
947 DBUG_SET("+d,rpl_gtid_get_free_interval_simulate_out_of_memory"););
948 /*
949 Any session adds transaction owned GTID into global executed_gtids.
950
951 If binlog is disabled, we report @@GLOBAL.GTID_PURGED from
952 executed_gtids, since @@GLOBAL.GTID_PURGED and @@GLOBAL.GTID_EXECUTED
953 are always same, so we did not save gtid into lost_gtids for every
954 transaction for improving performance.
955
956 If binlog is enabled and log_slave_updates is disabled, slave
957 SQL thread or slave worker thread adds transaction owned GTID
958 into global executed_gtids, lost_gtids and gtids_only_in_table.
959 */
960 executed_gtids._add_gtid(thd->owned_gtid);
961 thd->rpl_thd_ctx.session_gtids_ctx().
962 notify_after_gtid_executed_update(thd);
963 if (thd->slave_thread && opt_bin_log && !opt_log_slave_updates)
964 {
965 lost_gtids._add_gtid(thd->owned_gtid);
966 gtids_only_in_table._add_gtid(thd->owned_gtid);
967 }
968 }
969 else
970 {
971 if (thd->owned_gtid.sidno == server_sidno &&
972 next_free_gno > thd->owned_gtid.gno)
973 next_free_gno= thd->owned_gtid.gno;
974 }
975
976 thd->clear_owned_gtids();
977 if (thd->variables.gtid_next.type == GTID_GROUP)
978 {
979 assert(!thd->is_commit_in_middle_of_statement);
980 thd->variables.gtid_next.set_undefined();
981 }
982 else
983 {
984 /*
985 Can be UNDEFINED for statements where
986 gtid_pre_statement_checks skips the test for undefined,
987 e.g. ROLLBACK.
988 */
989 assert(thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
990 thd->variables.gtid_next.type == UNDEFINED_GROUP);
991 }
992 }
993
update_gtids_impl_broadcast_and_unlock_sidno(rpl_sidno sidno)994 void Gtid_state::update_gtids_impl_broadcast_and_unlock_sidno(rpl_sidno sidno)
995 {
996 DBUG_PRINT("info",("Unlocking sidno %d", sidno));
997 broadcast_sidno(sidno);
998 unlock_sidno(sidno);
999 }
1000
update_gtids_impl_broadcast_and_unlock_sidnos()1001 void Gtid_state::update_gtids_impl_broadcast_and_unlock_sidnos()
1002 {
1003 for (rpl_sidno i= 1; i < (rpl_sidno)commit_group_sidnos.size(); i++)
1004 if (commit_group_sidnos[i])
1005 {
1006 update_gtids_impl_broadcast_and_unlock_sidno(i);
1007 commit_group_sidnos[i]= false;
1008 }
1009 }
1010
update_gtids_impl_own_anonymous(THD * thd,bool * more_trx)1011 void Gtid_state::update_gtids_impl_own_anonymous(THD* thd,
1012 bool *more_trx)
1013 {
1014 assert(thd->variables.gtid_next.type == ANONYMOUS_GROUP ||
1015 thd->variables.gtid_next.type == AUTOMATIC_GROUP);
1016 /*
1017 If there is more in the transaction cache, set more_trx to indicate this.
1018
1019 See comment for the update_gtids_impl_begin function.
1020 */
1021 if (opt_bin_log)
1022 {
1023 // Needed before is_binlog_cache_empty.
1024 thd->binlog_setup_trx_data();
1025 if (!thd->is_binlog_cache_empty(true))
1026 {
1027 *more_trx= true;
1028 DBUG_PRINT("info", ("Transaction cache is non-empty: setting "
1029 "more_transaction_with_same_gtid_next="
1030 "true."));
1031 }
1032 }
1033 if (!(*more_trx &&
1034 thd->variables.gtid_next.type == ANONYMOUS_GROUP))
1035 {
1036 release_anonymous_ownership();
1037 thd->clear_owned_gtids();
1038 }
1039 }
1040
/**
  Handle a session that owns no GTID at all.

  Nothing is updated; the function only asserts (in debug builds) what is
  expected in this state: unless the commit failed with CE_COMMIT_ERROR the
  session must have a GTID consistency violation, and gtid_next must be
  AUTOMATIC.

  @param thd Session being processed.
*/
void Gtid_state::update_gtids_impl_own_nothing(THD *thd)
{
  assert(thd->commit_error != THD::CE_COMMIT_ERROR ||
         thd->has_gtid_consistency_violation);
  assert(thd->variables.gtid_next.type == AUTOMATIC_GROUP);
}
1047
update_gtids_impl_end(THD * thd,bool more_trx)1048 void Gtid_state::update_gtids_impl_end(THD *thd, bool more_trx)
1049 {
1050 if (!more_trx)
1051 end_gtid_violating_transaction(thd);
1052 }
1053
ensure_commit_group_sidnos(rpl_sidno sidno)1054 enum_return_status Gtid_state::ensure_commit_group_sidnos(rpl_sidno sidno)
1055 {
1056 DBUG_ENTER("Gtid_state::ensure_commit_group_sidnos");
1057 sid_lock->assert_some_wrlock();
1058 /*
1059 As we use the sidno as index of commit_group_sidnos and there is no
1060 sidno=0, the array size must be at least sidno + 1.
1061 */
1062 while ((commit_group_sidnos.size()) < (size_t)sidno + 1)
1063 {
1064 if (commit_group_sidnos.push_back(false))
1065 goto error;
1066 }
1067 RETURN_OK;
1068 error:
1069 BINLOG_ERROR(("Out of memory."), (ER_OUT_OF_RESOURCES, MYF(0)));
1070 RETURN_REPORTED_ERROR;
1071
1072 }
1073