1 /* Copyright (c) 2011, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22    02110-1301 USA */
23 
24 #include "rpl_gtid.h"
25 
26 #include "rpl_gtid_persist.h"      // gtid_table_persistor
27 #include "sql_class.h"             // THD
28 #include "debug_sync.h"            // DEBUG_SYNC
29 #include "binlog.h"
30 
31 #include <vector>
32 
33 PSI_memory_key key_memory_Gtid_state_group_commit_sidno;
34 
clear(THD * thd)35 int Gtid_state::clear(THD *thd)
36 {
37   DBUG_ENTER("Gtid_state::clear()");
38   int ret= 0;
39   // the wrlock implies that no other thread can hold any of the mutexes
40   sid_lock->assert_some_wrlock();
41   lost_gtids.clear();
42   executed_gtids.clear();
43   gtids_only_in_table.clear();
44   previous_gtids_logged.clear();
45   /* Reset gtid_executed table. */
46   if ((ret= gtid_table_persistor->reset(thd)) == 1)
47   {
48     /*
49       Gtid table is not ready to be used, so failed to
50       open it. Ignore the error.
51     */
52     thd->clear_error();
53     ret= 0;
54   }
55   next_free_gno= 1;
56   DBUG_RETURN(ret);
57 }
58 
59 
acquire_ownership(THD * thd,const Gtid & gtid)60 enum_return_status Gtid_state::acquire_ownership(THD *thd, const Gtid &gtid)
61 {
62   DBUG_ENTER("Gtid_state::acquire_ownership");
63   // caller must take both global_sid_lock and lock on the SIDNO.
64   global_sid_lock->assert_some_lock();
65   gtid_state->assert_sidno_lock_owner(gtid.sidno);
66   assert(!executed_gtids.contains_gtid(gtid));
67   DBUG_PRINT("info", ("gtid=%d:%lld", gtid.sidno, gtid.gno));
68   assert(thd->owned_gtid.sidno == 0);
69   if (owned_gtids.add_gtid_owner(gtid, thd->thread_id()) != RETURN_STATUS_OK)
70     goto err;
71   if (thd->get_gtid_next_list() != NULL)
72   {
73 #ifdef HAVE_GTID_NEXT_LIST
74     thd->owned_gtid_set._add_gtid(gtid);
75     thd->owned_gtid.sidno= THD::OWNED_SIDNO_GTID_SET;
76     thd->owned_sid.clear();
77 #else
78     assert(0);
79 #endif
80   }
81   else
82   {
83     thd->owned_gtid= gtid;
84     thd->owned_gtid.dbug_print(NULL, "set owned_gtid in acquire_ownership");
85     thd->owned_sid= sid_map->sidno_to_sid(gtid.sidno);
86   }
87   RETURN_OK;
88 err:
89   if (thd->get_gtid_next_list() != NULL)
90   {
91 #ifdef HAVE_GTID_NEXT_LIST
92     Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
93     Gtid g= git.get();
94     while (g.sidno != 0)
95     {
96       owned_gtids.remove_gtid(g);
97       g= git.get();
98     }
99 #else
100     assert(0);
101 #endif
102   }
103   thd->clear_owned_gtids();
104   thd->owned_gtid.dbug_print(NULL,
105                              "set owned_gtid (clear) in acquire_ownership");
106   RETURN_REPORTED_ERROR;
107 }
108 
109 #ifdef HAVE_GTID_NEXT_LIST
lock_owned_sidnos(const THD * thd)110 void Gtid_state::lock_owned_sidnos(const THD *thd)
111 {
112   if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
113     lock_sidnos(&thd->owned_gtid_set);
114   else if (thd->owned_gtid.sidno > 0)
115     lock_sidno(thd->owned_gtid.sidno);
116 }
117 #endif
118 
119 
unlock_owned_sidnos(const THD * thd)120 void Gtid_state::unlock_owned_sidnos(const THD *thd)
121 {
122   if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
123   {
124 #ifdef HAVE_GTID_NEXT_LIST
125     unlock_sidnos(&thd->owned_gtid_set);
126 #else
127     assert(0);
128 #endif
129   }
130   else if (thd->owned_gtid.sidno > 0)
131   {
132     unlock_sidno(thd->owned_gtid.sidno);
133   }
134 }
135 
136 
broadcast_owned_sidnos(const THD * thd)137 void Gtid_state::broadcast_owned_sidnos(const THD *thd)
138 {
139   if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
140   {
141 #ifdef HAVE_GTID_NEXT_LIST
142     broadcast_sidnos(&thd->owned_gtid_set);
143 #else
144     assert(0);
145 #endif
146   }
147   else if (thd->owned_gtid.sidno > 0)
148   {
149     broadcast_sidno(thd->owned_gtid.sidno);
150   }
151 }
152 
get_snapshot_gtid_executed(std::string & snapshot_gtid_executed)153 void Gtid_state::get_snapshot_gtid_executed(
154     std::string &snapshot_gtid_executed)
155 {
156   global_sid_lock->wrlock();
157   size_t size= executed_gtids.get_string_length() + 1;
158   std::vector<char> buf(size);
159   executed_gtids.to_string(buf.data());
160   snapshot_gtid_executed= buf.data();
161   global_sid_lock->unlock();
162 }
163 
update_commit_group(THD * first_thd)164 void Gtid_state::update_commit_group(THD *first_thd)
165 {
166   DBUG_ENTER("Gtid_state::update_commit_group");
167 
168   /*
169     We are going to loop in all sessions of the group commit in order to avoid
170     being taking and releasing the global_sid_lock and sidno_lock for each
171     session.
172   */
173   DEBUG_SYNC(first_thd, "update_gtid_state_before_global_sid_lock");
174   global_sid_lock->rdlock();
175   DEBUG_SYNC(first_thd, "update_gtid_state_after_global_sid_lock");
176 
177   update_gtids_impl_lock_sidnos(first_thd);
178 
179   for (THD *thd= first_thd; thd != NULL; thd= thd->next_to_commit)
180   {
181     bool is_commit= (thd->commit_error != THD::CE_COMMIT_ERROR);
182 
183     if (update_gtids_impl_do_nothing(thd) ||
184         (!is_commit && update_gtids_impl_check_skip_gtid_rollback(thd)))
185       continue;
186 
187     bool more_trx_with_same_gtid_next= update_gtids_impl_begin(thd);
188 
189     if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
190     {
191       update_gtids_impl_own_gtid_set(thd, is_commit);
192     }
193     else if (thd->owned_gtid.sidno > 0)
194     {
195       update_gtids_impl_own_gtid(thd, is_commit);
196     }
197     else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS)
198     {
199       update_gtids_impl_own_anonymous(thd, &more_trx_with_same_gtid_next);
200     }
201     else
202     {
203       update_gtids_impl_own_nothing(thd);
204     }
205 
206     update_gtids_impl_end(thd, more_trx_with_same_gtid_next);
207   }
208 
209   update_gtids_impl_broadcast_and_unlock_sidnos();
210 
211   global_sid_lock->unlock();
212 
213   DBUG_VOID_RETURN;
214 }
215 
update_on_commit(THD * thd)216 void Gtid_state::update_on_commit(THD *thd)
217 {
218   DBUG_ENTER("Gtid_state::update_on_commit");
219 
220   update_gtids_impl(thd, true);
221   DEBUG_SYNC(thd, "end_of_gtid_state_update_on_commit");
222 
223   DBUG_VOID_RETURN;
224 }
225 
226 
update_on_rollback(THD * thd)227 void Gtid_state::update_on_rollback(THD *thd)
228 {
229   DBUG_ENTER("Gtid_state::update_on_rollback");
230 
231   if (!update_gtids_impl_check_skip_gtid_rollback(thd))
232     update_gtids_impl(thd, false);
233 
234   DBUG_VOID_RETURN;
235 }
236 
237 
update_gtids_impl(THD * thd,bool is_commit)238 void Gtid_state::update_gtids_impl(THD *thd, bool is_commit)
239 {
240   DBUG_ENTER("Gtid_state::update_gtids_impl");
241 
242   if (update_gtids_impl_do_nothing(thd))
243     DBUG_VOID_RETURN;
244 
245   bool more_trx_with_same_gtid_next= update_gtids_impl_begin(thd);
246 
247   DEBUG_SYNC(thd, "update_gtid_state_before_global_sid_lock");
248   global_sid_lock->rdlock();
249   DEBUG_SYNC(thd, "update_gtid_state_after_global_sid_lock");
250 
251   if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
252   {
253     update_gtids_impl_own_gtid_set(thd, is_commit);
254   }
255   else if (thd->owned_gtid.sidno > 0)
256   {
257     rpl_sidno sidno= thd->owned_gtid.sidno;
258     update_gtids_impl_lock_sidno(sidno);
259     update_gtids_impl_own_gtid(thd, is_commit);
260     update_gtids_impl_broadcast_and_unlock_sidno(sidno);
261   }
262   else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS)
263   {
264     update_gtids_impl_own_anonymous(thd, &more_trx_with_same_gtid_next);
265   }
266   else
267   {
268     update_gtids_impl_own_nothing(thd);
269   }
270 
271   global_sid_lock->unlock();
272 
273   update_gtids_impl_end(thd, more_trx_with_same_gtid_next);
274 
275   thd->owned_gtid.dbug_print(NULL,
276                              "set owned_gtid (clear) in update_gtids_impl");
277 
278   DBUG_VOID_RETURN;
279 }
280 
281 
end_gtid_violating_transaction(THD * thd)282 void Gtid_state::end_gtid_violating_transaction(THD *thd)
283 {
284   DBUG_ENTER("end_gtid_violating_transaction");
285   if (thd->has_gtid_consistency_violation)
286   {
287     if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
288       end_automatic_gtid_violating_transaction();
289     else
290     {
291       assert(thd->variables.gtid_next.type == ANONYMOUS_GROUP);
292       end_anonymous_gtid_violating_transaction();
293     }
294     thd->has_gtid_consistency_violation= false;
295   }
296   DBUG_VOID_RETURN;
297 }
298 
299 
wait_for_sidno(THD * thd,rpl_sidno sidno,struct timespec * abstime)300 bool Gtid_state::wait_for_sidno(THD *thd, rpl_sidno sidno,
301                                 struct timespec *abstime)
302 {
303   DBUG_ENTER("wait_for_sidno");
304   PSI_stage_info old_stage;
305   sid_lock->assert_some_lock();
306   sid_locks.assert_owner(sidno);
307   sid_locks.enter_cond(thd, sidno,
308                        &stage_waiting_for_gtid_to_be_committed,
309                        &old_stage);
310   bool ret= sid_locks.wait(thd, sidno, abstime);
311   // Can't call sid_locks.unlock() as that requires global_sid_lock.
312   mysql_mutex_unlock(thd->current_mutex);
313   thd->EXIT_COND(&old_stage);
314   DBUG_RETURN(ret);
315 }
316 
317 
wait_for_gtid(THD * thd,const Gtid & gtid,struct timespec * abstime)318 bool Gtid_state::wait_for_gtid(THD *thd, const Gtid &gtid,
319                                struct timespec *abstime)
320 {
321   DBUG_ENTER("Gtid_state::wait_for_gtid");
322   DBUG_PRINT("info", ("SIDNO=%d GNO=%lld thread_id=%u",
323                       gtid.sidno, gtid.gno,
324                       thd->thread_id()));
325   assert(!owned_gtids.is_owned_by(gtid, thd->thread_id()));
326   assert(!owned_gtids.is_owned_by(gtid, 0));
327 
328   bool ret= wait_for_sidno(thd, gtid.sidno, abstime);
329   DBUG_RETURN(ret);
330 }
331 
332 
wait_for_gtid_set(THD * thd,Gtid_set * wait_for,double timeout)333 bool Gtid_state::wait_for_gtid_set(THD *thd, Gtid_set* wait_for,
334                                    double timeout)
335 {
336   struct timespec abstime;
337   DBUG_ENTER("Gtid_state::wait_for_gtid_set");
338   DEBUG_SYNC(thd, "begin_wait_for_executed_gtid_set");
339   wait_for->dbug_print("Waiting for");
340   DBUG_PRINT("info", ("Timeout %f", timeout));
341 
342   global_sid_lock->assert_some_rdlock();
343 
344   assert(wait_for->get_sid_map() == global_sid_map);
345 
346   if (timeout > 0) {
347     set_timespec_nsec(&abstime,
348                       static_cast<ulonglong>(timeout * 1000000000ULL));
349   }
350 
351   /*
352     Algorithm:
353 
354     Let 'todo' contain the GTIDs to wait for. Iterate over SIDNOs in
355     'todo' (this is the 'for' loop below).
356 
357     For each SIDNO in 'todo', remove gtid_executed for that SIDNO from
358     'todo'.  If, after this removal, there is still some interval for
359     this SIDNO in 'todo', then wait for a signal on this SIDNO.
360     Repeat this step until 'todo' is empty for this SIDNO (this is the
361     innermost 'while' loop below).
362 
363     Once the loop over SIDNOs has completed, 'todo' is guaranteed to
364     be empty.  However, it may still be the case that not all GTIDs of
365     wait_for are included in gtid_executed, since RESET MASTER may
366     have been executed while we were waiting.
367 
368     RESET MASTER requires global_sid_lock.wrlock.  We hold
369     global_sid_lock.rdlock while removing GTIDs from 'todo', but the
370     wait operation releases global_sid_lock.rdlock.  So if we
371     completed the 'for' loop without waiting, we know for sure that
372     global_sid_lock.rdlock was held while emptying 'todo', and thus
373     RESET MASTER cannot have executed in the meantime.  But if we
374     waited at some point during the execution of the 'for' loop, RESET
375     MASTER may have been called.  Thus, we repeatedly run 'for' loop
376     until it completes without waiting (this is the outermost 'while'
377     loop).
378   */
379 
380   // Will be true once the entire 'for' loop completes without waiting.
381   bool verified= false;
382 
383   // The set of GTIDs that we are still waiting for.
384   Gtid_set todo(global_sid_map, NULL);
385   // As an optimization, add 100 Intervals that do not need to be
386   // allocated. This avoids allocation of these intervals.
387   static const int preallocated_interval_count= 100;
388   Gtid_set::Interval ivs[preallocated_interval_count];
389   todo.add_interval_memory(preallocated_interval_count, ivs);
390 
391   /*
392     Iterate until we have verified that all GTIDs in the set are
393     included in gtid_executed.
394   */
395   while (!verified)
396   {
397     todo.add_gtid_set(wait_for);
398 
399     // Iterate over SIDNOs until all GTIDs have been removed from 'todo'.
400 
401     // Set 'verified' to true; it will be set to 'false' if any wait
402     // is done.
403     verified= true;
404     for (int sidno= 1; sidno <= todo.get_max_sidno(); sidno++)
405     {
406       // Iterate until 'todo' is empty for this SIDNO.
407       while (todo.contains_sidno(sidno))
408       {
409         lock_sidno(sidno);
410         todo.remove_intervals_for_sidno(&executed_gtids, sidno);
411 
412         if (todo.contains_sidno(sidno))
413         {
414           bool ret= wait_for_sidno(thd, sidno, timeout > 0 ? &abstime : NULL);
415 
416           // wait_for_gtid will release both the global lock and the
417           // mutex.  Acquire the global lock again.
418           global_sid_lock->rdlock();
419           verified= false;
420 
421           if (thd->killed)
422           {
423             switch (thd->killed)
424             {
425             case ER_SERVER_SHUTDOWN:
426             case ER_QUERY_INTERRUPTED:
427             case ER_QUERY_TIMEOUT:
428               my_error(thd->killed, MYF(0));
429               break;
430             default:
431               my_error(ER_QUERY_INTERRUPTED, MYF(0));
432               break;
433             }
434             DBUG_RETURN(true);
435           }
436 
437           if (ret)
438             DBUG_RETURN(true);
439         }
440         else
441         {
442           // Keep the global lock since it may be needed in a later
443           // iteration of the for loop.
444           unlock_sidno(sidno);
445           break;
446         }
447       }
448     }
449   }
450   DBUG_RETURN(false);
451 }
452 
get_automatic_gno(rpl_sidno sidno) const453 rpl_gno Gtid_state::get_automatic_gno(rpl_sidno sidno) const
454 {
455   DBUG_ENTER("Gtid_state::get_automatic_gno");
456   Gtid_set::Const_interval_iterator ivit(&executed_gtids, sidno);
457   /*
458     When assigning new automatic GTIDs, we can optimize the assignment by start
459     searching an available GNO from the "supposed" next free one instead of
460     starting from 1.
461 
462     This is useful mostly on systems having many transactions committing in
463     group asking for automatic GTIDs. When a GNO is assigned to be owned by a
464     transaction, it is not removed from the free intervals, but will be added
465     to the owned_gtids set. In this way, picking up the actual first free GNO
466     would often lead to getting a GNO already owned by other thread. This can
467     lead to many "tries" of getting a free and not owned yet GNO (a thread
468     would try N times, N being the sum of transactions in the FLUSH stage plus
469     the transactions in the COMMIT stage that didn't released their ownership
470     yet).
471 
472     The optimization just set next_free_gno variable to the last assigned
473     GNO + 1, as this would be the common case without having transactions
474     rolling back. This is done at Gtid_state::generate_automatic_gtid.
475 
476     In order to fill the gaps of GTID_EXECUTED when a transaction rolls back
477     releasing the ownership of a GTID, we check if the released GNO is smaller
478     than the next_free_gno at Gtid_state::update_gtids_impl_own_gtid function
479     to set next_free_gno with the "released" GNO in this case.
480   */
481   Gtid next_candidate= { sidno,
482                          sidno == get_server_sidno() ? next_free_gno : 1 };
483   while (true)
484   {
485     const Gtid_set::Interval *iv= ivit.get();
486     rpl_gno next_interval_start= iv != NULL ? iv->start : GNO_END;
487     while (next_candidate.gno < next_interval_start &&
488            DBUG_EVALUATE_IF("simulate_gno_exhausted", false, true))
489     {
490       DBUG_PRINT("debug",("Checking availability of gno= %llu", next_candidate.gno));
491       if (owned_gtids.is_owned_by(next_candidate, 0))
492         DBUG_RETURN(next_candidate.gno);
493       next_candidate.gno++;
494     }
495     if (iv == NULL ||
496         DBUG_EVALUATE_IF("simulate_gno_exhausted", true, false))
497     {
498       my_error(ER_GNO_EXHAUSTED, MYF(0));
499       DBUG_RETURN(-1);
500     }
501     if (next_candidate.gno < iv->end)
502       next_candidate.gno= iv->end;
503     ivit.next();
504   }
505 }
506 
507 
get_last_executed_gno(rpl_sidno sidno) const508 rpl_gno Gtid_state::get_last_executed_gno(rpl_sidno sidno) const
509 {
510   DBUG_ENTER("Gtid:state::get_last_executed_gno");
511   rpl_gno gno= 0;
512 
513   gtid_state->lock_sidno(sidno);
514   gno= executed_gtids.get_last_gno(sidno);
515   gtid_state->unlock_sidno(sidno);
516 
517   DBUG_RETURN(gno);
518 }
519 
520 
generate_automatic_gtid(THD * thd,rpl_sidno specified_sidno,rpl_gno specified_gno,rpl_sidno * locked_sidno)521 enum_return_status Gtid_state::generate_automatic_gtid(THD *thd,
522                                                        rpl_sidno specified_sidno,
523                                                        rpl_gno specified_gno,
524                                                        rpl_sidno *locked_sidno)
525 {
526   DBUG_ENTER("Gtid_state::generate_automatic_gtid");
527   enum_return_status ret= RETURN_STATUS_OK;
528 
529   assert(thd->variables.gtid_next.type == AUTOMATIC_GROUP);
530   assert(specified_sidno >= 0);
531   assert(specified_gno >= 0);
532   assert(thd->owned_gtid.is_empty());
533 
534   bool locked_sidno_was_passed_null = (locked_sidno == NULL);
535 
536   if (locked_sidno_was_passed_null)
537     sid_lock->rdlock();
538   else
539     /* The caller must lock the sid_lock when locked_sidno is passed */
540     sid_lock->assert_some_lock();
541 
542   // If GTID_MODE = ON_PERMISSIVE or ON, generate a new GTID
543   if (get_gtid_mode(GTID_MODE_LOCK_SID) >= GTID_MODE_ON_PERMISSIVE)
544   {
545     Gtid automatic_gtid= { specified_sidno, specified_gno };
546 
547     if (automatic_gtid.sidno == 0)
548       automatic_gtid.sidno= get_server_sidno();
549 
550     /*
551       We need to lock the sidno if locked_sidno wasn't passed as paramenter
552       or the already locked sidno doesn't match the one to generate the new
553       automatic GTID.
554     */
555     bool need_to_lock_sidno= (locked_sidno_was_passed_null ||
556                               *locked_sidno != automatic_gtid.sidno);
557     if (need_to_lock_sidno)
558     {
559       /*
560         When locked_sidno contains a value greater than zero we must release
561         the current locked sidno. This should not happen with current code, as
562         the server only generates automatic GTIDs with server's UUID as sid.
563       */
564       if (!locked_sidno_was_passed_null && *locked_sidno != 0)
565         unlock_sidno(*locked_sidno);
566       lock_sidno(automatic_gtid.sidno);
567       /* Update the locked_sidno, so the caller would know what to unlock */
568       if (!locked_sidno_was_passed_null)
569         *locked_sidno= automatic_gtid.sidno;
570     }
571 
572     if (automatic_gtid.gno == 0)
573     {
574       automatic_gtid.gno= get_automatic_gno(automatic_gtid.sidno);
575       if (automatic_gtid.sidno == get_server_sidno() &&
576           automatic_gtid.gno != -1)
577         next_free_gno= automatic_gtid.gno + 1;
578     }
579 
580     if (automatic_gtid.gno != -1)
581       acquire_ownership(thd, automatic_gtid);
582     else
583       ret= RETURN_STATUS_REPORTED_ERROR;
584 
585     /* The caller will unlock the sidno_lock if locked_sidno was passed */
586     if (locked_sidno_was_passed_null)
587       unlock_sidno(automatic_gtid.sidno);
588 
589   }
590   else
591   {
592     // If GTID_MODE = OFF or OFF_PERMISSIVE, just mark this thread as
593     // using an anonymous transaction.
594     thd->owned_gtid.sidno= THD::OWNED_SIDNO_ANONYMOUS;
595     thd->owned_gtid.gno= 0;
596     acquire_anonymous_ownership();
597     thd->owned_gtid.dbug_print(NULL,
598                                "set owned_gtid (anonymous) in generate_automatic_gtid");
599   }
600 
601   /* The caller will unlock the sid_lock if locked_sidno was passed */
602   if (locked_sidno_was_passed_null)
603     sid_lock->unlock();
604 
605   gtid_set_performance_schema_values(thd);
606 
607   DBUG_RETURN(ret);
608 }
609 
610 
lock_sidnos(const Gtid_set * gs)611 void Gtid_state::lock_sidnos(const Gtid_set *gs)
612 {
613   assert(gs);
614   rpl_sidno max_sidno= gs->get_max_sidno();
615   for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
616     if (gs->contains_sidno(sidno))
617       lock_sidno(sidno);
618 }
619 
620 
unlock_sidnos(const Gtid_set * gs)621 void Gtid_state::unlock_sidnos(const Gtid_set *gs)
622 {
623   assert(gs);
624   rpl_sidno max_sidno= gs->get_max_sidno();
625   for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
626     if (gs->contains_sidno(sidno))
627       unlock_sidno(sidno);
628 }
629 
630 
broadcast_sidnos(const Gtid_set * gs)631 void Gtid_state::broadcast_sidnos(const Gtid_set *gs)
632 {
633   assert(gs);
634   rpl_sidno max_sidno= gs->get_max_sidno();
635   for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
636     if (gs->contains_sidno(sidno))
637       broadcast_sidno(sidno);
638 }
639 
640 
ensure_sidno()641 enum_return_status Gtid_state::ensure_sidno()
642 {
643   DBUG_ENTER("Gtid_state::ensure_sidno");
644   sid_lock->assert_some_wrlock();
645   rpl_sidno sidno= sid_map->get_max_sidno();
646   if (sidno > 0)
647   {
648     // The lock may be temporarily released during one of the calls to
649     // ensure_sidno or ensure_index.  Hence, we must re-check the
650     // condition after the calls.
651     PROPAGATE_REPORTED_ERROR(executed_gtids.ensure_sidno(sidno));
652     PROPAGATE_REPORTED_ERROR(gtids_only_in_table.ensure_sidno(sidno));
653     PROPAGATE_REPORTED_ERROR(previous_gtids_logged.ensure_sidno(sidno));
654     PROPAGATE_REPORTED_ERROR(lost_gtids.ensure_sidno(sidno));
655     PROPAGATE_REPORTED_ERROR(owned_gtids.ensure_sidno(sidno));
656     PROPAGATE_REPORTED_ERROR(sid_locks.ensure_index(sidno));
657     PROPAGATE_REPORTED_ERROR(ensure_commit_group_sidnos(sidno));
658     sidno= sid_map->get_max_sidno();
659     assert(executed_gtids.get_max_sidno() >= sidno);
660     assert(gtids_only_in_table.get_max_sidno() >= sidno);
661     assert(previous_gtids_logged.get_max_sidno() >= sidno);
662     assert(lost_gtids.get_max_sidno() >= sidno);
663     assert(owned_gtids.get_max_sidno() >= sidno);
664     assert(sid_locks.get_max_index() >= sidno);
665     assert(commit_group_sidnos.size() >= (unsigned int)sidno);
666   }
667   RETURN_OK;
668 }
669 
670 
add_lost_gtids(const Gtid_set * gtid_set)671 enum_return_status Gtid_state::add_lost_gtids(const Gtid_set *gtid_set)
672 {
673   DBUG_ENTER("Gtid_state::add_lost_gtids()");
674   sid_lock->assert_some_wrlock();
675 
676   gtid_set->dbug_print("add_lost_gtids");
677 
678   if (!executed_gtids.is_empty())
679   {
680     BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY)),
681                  (ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY,
682                  MYF(0)));
683     RETURN_REPORTED_ERROR;
684   }
685   if (!owned_gtids.is_empty())
686   {
687     BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY)),
688                  (ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY,
689                  MYF(0)));
690     RETURN_REPORTED_ERROR;
691   }
692   assert(lost_gtids.is_empty());
693 
694   if (save(gtid_set))
695     RETURN_REPORTED_ERROR;
696   PROPAGATE_REPORTED_ERROR(gtids_only_in_table.add_gtid_set(gtid_set));
697   PROPAGATE_REPORTED_ERROR(lost_gtids.add_gtid_set(gtid_set));
698   PROPAGATE_REPORTED_ERROR(executed_gtids.add_gtid_set(gtid_set));
699   lock_sidnos(gtid_set);
700   broadcast_sidnos(gtid_set);
701   unlock_sidnos(gtid_set);
702 
703   DBUG_RETURN(RETURN_STATUS_OK);
704 }
705 
706 
init()707 int Gtid_state::init()
708 {
709   DBUG_ENTER("Gtid_state::init()");
710 
711   global_sid_lock->assert_some_wrlock();
712 
713   rpl_sid server_sid;
714   if (server_sid.parse(server_uuid) != 0)
715     DBUG_RETURN(1);
716   rpl_sidno sidno= sid_map->add_sid(server_sid);
717   if (sidno <= 0)
718     DBUG_RETURN(1);
719   server_sidno= sidno;
720   next_free_gno= 1;
721   DBUG_RETURN(0);
722 }
723 
724 
save(THD * thd)725 int Gtid_state::save(THD *thd)
726 {
727   DBUG_ENTER("Gtid_state::save(THD *thd)");
728   assert(gtid_table_persistor != NULL);
729   assert(thd->owned_gtid.sidno > 0);
730   int error= 0;
731 
732   int ret= gtid_table_persistor->save(thd, &thd->owned_gtid);
733   if (1 == ret)
734   {
735     /*
736       Gtid table is not ready to be used, so failed to
737       open it. Ignore the error.
738     */
739     thd->clear_error();
740     if (!thd->get_stmt_da()->is_set())
741         thd->get_stmt_da()->set_ok_status(0, 0, NULL);
742   }
743   else if (-1 == ret)
744     error= -1;
745 
746   DBUG_RETURN(error);
747 }
748 
749 
save(const Gtid_set * gtid_set)750 int Gtid_state::save(const Gtid_set *gtid_set)
751 {
752   DBUG_ENTER("Gtid_state::save(Gtid_set *gtid_set)");
753   int ret= gtid_table_persistor->save(gtid_set);
754   DBUG_RETURN(ret);
755 }
756 
757 
save_gtids_of_last_binlog_into_table(bool on_rotation)758 int Gtid_state::save_gtids_of_last_binlog_into_table(bool on_rotation)
759 {
760   DBUG_ENTER("Gtid_state::save_gtids_of_last_binlog_into_table");
761   int ret= 0;
762 
763   /*
764     Use local Sid_map, so that we don't need a lock while inserting
765     into the table.
766   */
767   Sid_map sid_map(NULL);
768   Gtid_set logged_gtids_last_binlog(&sid_map, NULL);
769   // Allocate some intervals on stack to reduce allocation.
770   static const int PREALLOCATED_INTERVAL_COUNT= 64;
771   Gtid_set::Interval iv[PREALLOCATED_INTERVAL_COUNT];
772   logged_gtids_last_binlog.add_interval_memory(PREALLOCATED_INTERVAL_COUNT, iv);
773   /*
774     logged_gtids_last_binlog= executed_gtids - previous_gtids_logged -
775                               gtids_only_in_table
776   */
777   global_sid_lock->wrlock();
778   ret= (logged_gtids_last_binlog.add_gtid_set(&executed_gtids) !=
779         RETURN_STATUS_OK);
780   if (!ret)
781   {
782     logged_gtids_last_binlog.remove_gtid_set(&previous_gtids_logged);
783     logged_gtids_last_binlog.remove_gtid_set(&gtids_only_in_table);
784     if (!logged_gtids_last_binlog.is_empty() ||
785         mysql_bin_log.is_rotating_caused_by_incident)
786     {
787       /* Prepare previous_gtids_logged for next binlog on binlog rotation */
788       if (on_rotation)
789         ret= previous_gtids_logged.add_gtid_set(&logged_gtids_last_binlog);
790       global_sid_lock->unlock();
791       /* Save set of GTIDs of the last binlog into gtid_executed table */
792       if (!ret)
793         ret= save(&logged_gtids_last_binlog);
794     }
795     else
796       global_sid_lock->unlock();
797   }
798   else
799     global_sid_lock->unlock();
800 
801   DBUG_RETURN(ret);
802 }
803 
804 
read_gtid_executed_from_table()805 int Gtid_state::read_gtid_executed_from_table()
806 {
807   return gtid_table_persistor->fetch_gtids(&executed_gtids);
808 }
809 
810 
compress(THD * thd)811 int Gtid_state::compress(THD *thd)
812 {
813   return gtid_table_persistor->compress(thd);
814 }
815 
816 
817 #ifdef MYSQL_SERVER
warn_or_err_on_modify_gtid_table(THD * thd,TABLE_LIST * table)818 int Gtid_state::warn_or_err_on_modify_gtid_table(THD *thd, TABLE_LIST *table)
819 {
820   DBUG_ENTER("Gtid_state::warn_or_err_on_modify_gtid_table");
821   int ret=
822     gtid_table_persistor->warn_or_err_on_explicit_modification(thd, table);
823   DBUG_RETURN(ret);
824 }
825 #endif
826 
update_gtids_impl_check_skip_gtid_rollback(THD * thd)827 bool Gtid_state::update_gtids_impl_check_skip_gtid_rollback(THD *thd)
828 {
829   if (thd->skip_gtid_rollback)
830   {
831     DBUG_PRINT("info", ("skipping gtid rollback because "
832                         "thd->skip_gtid_rollback is set"));
833     return true;
834   }
835   return false;
836 }
837 
update_gtids_impl_do_nothing(THD * thd)838 bool Gtid_state::update_gtids_impl_do_nothing(THD *thd)
839 {
840   if (thd->owned_gtid.is_empty() && !thd->has_gtid_consistency_violation)
841   {
842     if (thd->variables.gtid_next.type == GTID_GROUP)
843       thd->variables.gtid_next.set_undefined();
844     DBUG_PRINT("info", ("skipping update_gtids_impl because "
845                         "thread does not own anything and does not violate "
846                         "gtid consistency"));
847 
848     return true;
849   }
850   return false;
851 }
852 
update_gtids_impl_begin(THD * thd)853 bool Gtid_state::update_gtids_impl_begin(THD *thd)
854 {
855 #ifndef NDEBUG
856   if (current_thd != thd)
857     mysql_mutex_lock(&thd->LOCK_thd_query);
858   DBUG_PRINT("info", ("query='%s' thd->is_commit_in_middle_of_statement=%d",
859                       thd->query().str,
860                       thd->is_commit_in_middle_of_statement));
861   if (current_thd != thd)
862     mysql_mutex_unlock(&thd->LOCK_thd_query);
863 #endif
864   return thd->is_commit_in_middle_of_statement;
865 }
866 
update_gtids_impl_own_gtid_set(THD * thd,bool is_commit)867 void Gtid_state::update_gtids_impl_own_gtid_set(THD *thd, bool is_commit)
868 {
869 #ifdef HAVE_GTID_NEXT_LIST
870   rpl_sidno prev_sidno= 0;
871   Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
872   Gtid g= git.get();
873   while (g.sidno != 0)
874   {
875     if (g.sidno != prev_sidno)
876       sid_locks.lock(g.sidno);
877     owned_gtids.remove_gtid(g);
878     git.next();
879     g= git.get();
880     if (is_commit)
881       executed_gtids._add_gtid(g);
882   }
883 
884   if (is_commit && !thd->owned_gtid_set.is_empty())
885     thd->rpl_thd_ctx.session_gtids_ctx().
886       notify_after_gtid_executed_update(thd);
887 
888   thd->variables.gtid_next.set_undefined();
889   thd->owned_gtid.dbug_print(NULL,
890                              "set owned_gtid (clear; old was gtid_set) "
891                              "in update_gtids_impl");
892   thd->clear_owned_gtids();
893 #else
894   assert(0);
895 #endif
896 }
897 
update_gtids_impl_lock_sidno(rpl_sidno sidno)898 void Gtid_state::update_gtids_impl_lock_sidno(rpl_sidno sidno)
899 {
900   assert(sidno > 0);
901   DBUG_PRINT("info",("Locking sidno %d", sidno));
902   lock_sidno(sidno);
903 }
904 
update_gtids_impl_lock_sidnos(THD * first_thd)905 void Gtid_state::update_gtids_impl_lock_sidnos(THD *first_thd)
906 {
907   /* Define which sidnos should be locked to be updated */
908   for (THD *thd= first_thd; thd != NULL; thd= thd->next_to_commit)
909   {
910     if (thd->owned_gtid.sidno > 0)
911     {
912       DBUG_PRINT("info",("Setting sidno %d to be locked",
913                          thd->owned_gtid.sidno));
914       commit_group_sidnos[thd->owned_gtid.sidno]= true;
915     }
916     else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
917 #ifdef HAVE_GTID_NEXT_LIST
918       for (rpl_sidno i= 1; i < thd->owned_gtid_set.max_sidno; i++)
919         if (owned_gtid_set.contains_sidno(i))
920           commit_group_sidnos[i]= true;
921 #else
922     assert(0);
923 #endif
924   }
925 
926   /* Take the sidno_locks in order */
927   for (rpl_sidno i= 1; i < (rpl_sidno)commit_group_sidnos.size(); i++)
928     if (commit_group_sidnos[i])
929       update_gtids_impl_lock_sidno(i);
930 }
931 
update_gtids_impl_own_gtid(THD * thd,bool is_commit)932 void Gtid_state::update_gtids_impl_own_gtid(THD *thd, bool is_commit)
933 {
934   assert_sidno_lock_owner(thd->owned_gtid.sidno);
935   /*
936     In Group Replication the GTID may additionally be owned by another
937     thread, and we won't remove that ownership (it will be rolled back later)
938   */
939   assert(owned_gtids.is_owned_by(thd->owned_gtid, thd->thread_id()));
940   owned_gtids.remove_gtid(thd->owned_gtid, thd->thread_id());
941 
942   if (is_commit)
943   {
944     assert(!executed_gtids.contains_gtid(thd->owned_gtid));
945     DBUG_EXECUTE_IF(
946       "rpl_gtid_update_on_commit_simulate_out_of_memory",
947       DBUG_SET("+d,rpl_gtid_get_free_interval_simulate_out_of_memory"););
948     /*
949       Any session adds transaction owned GTID into global executed_gtids.
950 
951       If binlog is disabled, we report @@GLOBAL.GTID_PURGED from
952       executed_gtids, since @@GLOBAL.GTID_PURGED and @@GLOBAL.GTID_EXECUTED
953       are always same, so we did not save gtid into lost_gtids for every
954       transaction for improving performance.
955 
956       If binlog is enabled and log_slave_updates is disabled, slave
957       SQL thread or slave worker thread adds transaction owned GTID
958       into global executed_gtids, lost_gtids and gtids_only_in_table.
959     */
960     executed_gtids._add_gtid(thd->owned_gtid);
961     thd->rpl_thd_ctx.session_gtids_ctx().
962       notify_after_gtid_executed_update(thd);
963     if (thd->slave_thread && opt_bin_log && !opt_log_slave_updates)
964     {
965       lost_gtids._add_gtid(thd->owned_gtid);
966       gtids_only_in_table._add_gtid(thd->owned_gtid);
967     }
968   }
969   else
970   {
971     if (thd->owned_gtid.sidno == server_sidno &&
972         next_free_gno > thd->owned_gtid.gno)
973       next_free_gno= thd->owned_gtid.gno;
974   }
975 
976   thd->clear_owned_gtids();
977   if (thd->variables.gtid_next.type == GTID_GROUP)
978   {
979     assert(!thd->is_commit_in_middle_of_statement);
980     thd->variables.gtid_next.set_undefined();
981   }
982   else
983   {
984     /*
985       Can be UNDEFINED for statements where
986       gtid_pre_statement_checks skips the test for undefined,
987       e.g. ROLLBACK.
988     */
989     assert(thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
990            thd->variables.gtid_next.type == UNDEFINED_GROUP);
991   }
992 }
993 
update_gtids_impl_broadcast_and_unlock_sidno(rpl_sidno sidno)994 void Gtid_state::update_gtids_impl_broadcast_and_unlock_sidno(rpl_sidno sidno)
995 {
996   DBUG_PRINT("info",("Unlocking sidno %d", sidno));
997   broadcast_sidno(sidno);
998   unlock_sidno(sidno);
999 }
1000 
update_gtids_impl_broadcast_and_unlock_sidnos()1001 void Gtid_state::update_gtids_impl_broadcast_and_unlock_sidnos()
1002 {
1003   for (rpl_sidno i= 1; i < (rpl_sidno)commit_group_sidnos.size(); i++)
1004     if (commit_group_sidnos[i])
1005     {
1006       update_gtids_impl_broadcast_and_unlock_sidno(i);
1007       commit_group_sidnos[i]= false;
1008     }
1009 }
1010 
update_gtids_impl_own_anonymous(THD * thd,bool * more_trx)1011 void Gtid_state::update_gtids_impl_own_anonymous(THD* thd,
1012                                                  bool *more_trx)
1013 {
1014   assert(thd->variables.gtid_next.type == ANONYMOUS_GROUP ||
1015          thd->variables.gtid_next.type == AUTOMATIC_GROUP);
1016   /*
1017     If there is more in the transaction cache, set more_trx to indicate this.
1018 
1019     See comment for the update_gtids_impl_begin function.
1020   */
1021   if (opt_bin_log)
1022   {
1023     // Needed before is_binlog_cache_empty.
1024     thd->binlog_setup_trx_data();
1025     if (!thd->is_binlog_cache_empty(true))
1026     {
1027       *more_trx= true;
1028       DBUG_PRINT("info", ("Transaction cache is non-empty: setting "
1029                           "more_transaction_with_same_gtid_next="
1030                           "true."));
1031     }
1032   }
1033   if (!(*more_trx &&
1034         thd->variables.gtid_next.type == ANONYMOUS_GROUP))
1035   {
1036     release_anonymous_ownership();
1037     thd->clear_owned_gtids();
1038   }
1039 }
1040 
update_gtids_impl_own_nothing(THD * thd)1041 void Gtid_state::update_gtids_impl_own_nothing(THD *thd)
1042 {
1043   assert(thd->commit_error != THD::CE_COMMIT_ERROR ||
1044          thd->has_gtid_consistency_violation);
1045   assert(thd->variables.gtid_next.type == AUTOMATIC_GROUP);
1046 }
1047 
update_gtids_impl_end(THD * thd,bool more_trx)1048 void Gtid_state::update_gtids_impl_end(THD *thd, bool more_trx)
1049 {
1050   if (!more_trx)
1051     end_gtid_violating_transaction(thd);
1052 }
1053 
ensure_commit_group_sidnos(rpl_sidno sidno)1054 enum_return_status Gtid_state::ensure_commit_group_sidnos(rpl_sidno sidno)
1055 {
1056   DBUG_ENTER("Gtid_state::ensure_commit_group_sidnos");
1057   sid_lock->assert_some_wrlock();
1058   /*
1059     As we use the sidno as index of commit_group_sidnos and there is no
1060     sidno=0, the array size must be at least sidno + 1.
1061   */
1062   while ((commit_group_sidnos.size()) < (size_t)sidno + 1)
1063   {
1064     if (commit_group_sidnos.push_back(false))
1065       goto error;
1066   }
1067   RETURN_OK;
1068 error:
1069   BINLOG_ERROR(("Out of memory."), (ER_OUT_OF_RESOURCES, MYF(0)));
1070   RETURN_REPORTED_ERROR;
1071 
1072 }
1073