/* Copyright (c) 2011, 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   02110-1301 USA */

#include "rpl_gtid.h"

#include "rpl_gtid_persist.h"      // gtid_table_persistor
#include "sql_class.h"             // THD
#include "debug_sync.h"            // DEBUG_SYNC
#include "binlog.h"

PSI_memory_key key_memory_Gtid_state_group_commit_sidno;

int Gtid_state::clear(THD *thd)
{
  DBUG_ENTER("Gtid_state::clear()");
  int ret= 0;
  // The wrlock implies that no other thread can hold any of the mutexes.
  sid_lock->assert_some_wrlock();
  lost_gtids.clear();
  executed_gtids.clear();
  gtids_only_in_table.clear();
  previous_gtids_logged.clear();
  /* Reset the gtid_executed table. */
  if ((ret= gtid_table_persistor->reset(thd)) == 1)
  {
    /*
      The GTID table is not ready to be used, so we failed to
      open it. Ignore the error.
    */
    thd->clear_error();
    ret= 0;
  }
  next_free_gno= 1;
  DBUG_RETURN(ret);
}


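/*
  Acquire ownership of the given GTID for the given session: register
  (gtid, thd->thread_id()) in owned_gtids and record the GTID in
  thd->owned_gtid / thd->owned_sid. The caller must hold global_sid_lock
  and the lock on the GTID's SIDNO.
*/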
enum_return_status Gtid_state::acquire_ownership(THD *thd, const Gtid &gtid)
{
  DBUG_ENTER("Gtid_state::acquire_ownership");
  // The caller must hold both global_sid_lock and the lock on the SIDNO.
  global_sid_lock->assert_some_lock();
  gtid_state->assert_sidno_lock_owner(gtid.sidno);
  DBUG_ASSERT(!executed_gtids.contains_gtid(gtid));
  DBUG_PRINT("info", ("gtid=%d:%lld", gtid.sidno, gtid.gno));
  DBUG_ASSERT(thd->owned_gtid.sidno == 0);
  if (owned_gtids.add_gtid_owner(gtid, thd->thread_id()) != RETURN_STATUS_OK)
    goto err;
  if (thd->get_gtid_next_list() != NULL)
  {
#ifdef HAVE_GTID_NEXT_LIST
    thd->owned_gtid_set._add_gtid(gtid);
    thd->owned_gtid.sidno= THD::OWNED_SIDNO_GTID_SET;
    thd->owned_sid.clear();
#else
    DBUG_ASSERT(0);
#endif
  }
  else
  {
    thd->owned_gtid= gtid;
    thd->owned_gtid.dbug_print(NULL, "set owned_gtid in acquire_ownership");
    thd->owned_sid= sid_map->sidno_to_sid(gtid.sidno);
  }
  RETURN_OK;
err:
  if (thd->get_gtid_next_list() != NULL)
  {
#ifdef HAVE_GTID_NEXT_LIST
    Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
    Gtid g= git.get();
    while (g.sidno != 0)
    {
      owned_gtids.remove_gtid(g);
      g= git.get();
    }
#else
    DBUG_ASSERT(0);
#endif
  }
  thd->clear_owned_gtids();
  thd->owned_gtid.dbug_print(NULL,
                             "set owned_gtid (clear) in acquire_ownership");
  RETURN_REPORTED_ERROR;
}

#ifdef HAVE_GTID_NEXT_LIST
void Gtid_state::lock_owned_sidnos(const THD *thd)
{
  if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
    lock_sidnos(&thd->owned_gtid_set);
  else if (thd->owned_gtid.sidno > 0)
    lock_sidno(thd->owned_gtid.sidno);
}
#endif


void Gtid_state::unlock_owned_sidnos(const THD *thd)
{
  if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
  {
#ifdef HAVE_GTID_NEXT_LIST
    unlock_sidnos(&thd->owned_gtid_set);
#else
    DBUG_ASSERT(0);
#endif
  }
  else if (thd->owned_gtid.sidno > 0)
  {
    unlock_sidno(thd->owned_gtid.sidno);
  }
}


void Gtid_state::broadcast_owned_sidnos(const THD *thd)
{
  if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
  {
#ifdef HAVE_GTID_NEXT_LIST
    broadcast_sidnos(&thd->owned_gtid_set);
#else
    DBUG_ASSERT(0);
#endif
  }
  else if (thd->owned_gtid.sidno > 0)
  {
    broadcast_sidno(thd->owned_gtid.sidno);
  }
}


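/*
  Update the GTID state for every session in a binlog group commit,
  walking the thread list linked through THD::next_to_commit. The
  global_sid_lock and the needed sidno locks are taken once for the
  whole group instead of once per session.
*/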
void Gtid_state::update_commit_group(THD *first_thd)
{
  DBUG_ENTER("Gtid_state::update_commit_group");

  /*
    We are going to loop over all sessions of the group commit in order to
    avoid taking and releasing the global_sid_lock and sidno_lock for each
    session.
  */
  DEBUG_SYNC(first_thd, "update_gtid_state_before_global_sid_lock");
  global_sid_lock->rdlock();
  DEBUG_SYNC(first_thd, "update_gtid_state_after_global_sid_lock");

  update_gtids_impl_lock_sidnos(first_thd);

  for (THD *thd= first_thd; thd != NULL; thd= thd->next_to_commit)
  {
    bool is_commit= (thd->commit_error != THD::CE_COMMIT_ERROR);

    if (update_gtids_impl_do_nothing(thd) ||
        (!is_commit && update_gtids_impl_check_skip_gtid_rollback(thd)))
      continue;

    bool more_trx_with_same_gtid_next= update_gtids_impl_begin(thd);

    if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
    {
      update_gtids_impl_own_gtid_set(thd, is_commit);
    }
    else if (thd->owned_gtid.sidno > 0)
    {
      update_gtids_impl_own_gtid(thd, is_commit);
    }
    else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS)
    {
      update_gtids_impl_own_anonymous(thd, &more_trx_with_same_gtid_next);
    }
    else
    {
      update_gtids_impl_own_nothing(thd);
    }

    update_gtids_impl_end(thd, more_trx_with_same_gtid_next);
  }

  update_gtids_impl_broadcast_and_unlock_sidnos();

  global_sid_lock->unlock();

  DBUG_VOID_RETURN;
}

void Gtid_state::update_on_commit(THD *thd)
{
  DBUG_ENTER("Gtid_state::update_on_commit");

  update_gtids_impl(thd, true);
  DEBUG_SYNC(thd, "end_of_gtid_state_update_on_commit");

  DBUG_VOID_RETURN;
}


void Gtid_state::update_on_rollback(THD *thd)
{
  DBUG_ENTER("Gtid_state::update_on_rollback");

  if (!update_gtids_impl_check_skip_gtid_rollback(thd))
    update_gtids_impl(thd, false);

  DBUG_VOID_RETURN;
}


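/*
  Per-session counterpart of update_commit_group: update the GTID state
  for one session after commit (is_commit == true) or rollback. The
  owned GTID, GTID set, or anonymous ownership is released and, on
  commit of a single GTID, the GTID is added to executed_gtids.
*/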
void Gtid_state::update_gtids_impl(THD *thd, bool is_commit)
{
  DBUG_ENTER("Gtid_state::update_gtids_impl");

  if (update_gtids_impl_do_nothing(thd))
    DBUG_VOID_RETURN;

  bool more_trx_with_same_gtid_next= update_gtids_impl_begin(thd);

  DEBUG_SYNC(thd, "update_gtid_state_before_global_sid_lock");
  global_sid_lock->rdlock();
  DEBUG_SYNC(thd, "update_gtid_state_after_global_sid_lock");

  if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
  {
    update_gtids_impl_own_gtid_set(thd, is_commit);
  }
  else if (thd->owned_gtid.sidno > 0)
  {
    rpl_sidno sidno= thd->owned_gtid.sidno;
    update_gtids_impl_lock_sidno(sidno);
    update_gtids_impl_own_gtid(thd, is_commit);
    update_gtids_impl_broadcast_and_unlock_sidno(sidno);
  }
  else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS)
  {
    update_gtids_impl_own_anonymous(thd, &more_trx_with_same_gtid_next);
  }
  else
  {
    update_gtids_impl_own_nothing(thd);
  }

  global_sid_lock->unlock();

  update_gtids_impl_end(thd, more_trx_with_same_gtid_next);

  thd->owned_gtid.dbug_print(NULL,
                             "set owned_gtid (clear) in update_gtids_impl");

  DBUG_VOID_RETURN;
}


void Gtid_state::end_gtid_violating_transaction(THD *thd)
{
  DBUG_ENTER("end_gtid_violating_transaction");
  if (thd->has_gtid_consistency_violation)
  {
    if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
      end_automatic_gtid_violating_transaction();
    else
    {
      DBUG_ASSERT(thd->variables.gtid_next.type == ANONYMOUS_GROUP);
      end_anonymous_gtid_violating_transaction();
    }
    thd->has_gtid_consistency_violation= false;
  }
  DBUG_VOID_RETURN;
}


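/*
  Block until the given SIDNO's condition variable is signalled, until
  abstime (if non-NULL), or until the client is killed. The caller must
  hold sid_lock and the SIDNO's mutex; both are released by the time
  this function returns, so the caller has to re-acquire them if needed.
*/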
bool Gtid_state::wait_for_sidno(THD *thd, rpl_sidno sidno,
                                struct timespec *abstime)
{
  DBUG_ENTER("wait_for_sidno");
  PSI_stage_info old_stage;
  sid_lock->assert_some_lock();
  sid_locks.assert_owner(sidno);
  sid_locks.enter_cond(thd, sidno,
                       &stage_waiting_for_gtid_to_be_committed,
                       &old_stage);
  bool ret= sid_locks.wait(thd, sidno, abstime);
  // Can't call sid_locks.unlock() as that requires global_sid_lock.
  mysql_mutex_unlock(thd->current_mutex);
  thd->EXIT_COND(&old_stage);
  DBUG_RETURN(ret);
}


bool Gtid_state::wait_for_gtid(THD *thd, const Gtid &gtid,
                               struct timespec *abstime)
{
  DBUG_ENTER("Gtid_state::wait_for_gtid");
  DBUG_PRINT("info", ("SIDNO=%d GNO=%lld thread_id=%u",
                      gtid.sidno, gtid.gno,
                      thd->thread_id()));
  DBUG_ASSERT(!owned_gtids.is_owned_by(gtid, thd->thread_id()));
  DBUG_ASSERT(!owned_gtids.is_owned_by(gtid, 0));

  bool ret= wait_for_sidno(thd, gtid.sidno, abstime);
  DBUG_RETURN(ret);
}


bool Gtid_state::wait_for_gtid_set(THD *thd, Gtid_set* wait_for,
                                   double timeout)
{
  struct timespec abstime;
  DBUG_ENTER("Gtid_state::wait_for_gtid_set");
  DEBUG_SYNC(thd, "begin_wait_for_executed_gtid_set");
  wait_for->dbug_print("Waiting for");
  DBUG_PRINT("info", ("Timeout %f", timeout));

  global_sid_lock->assert_some_rdlock();

  DBUG_ASSERT(wait_for->get_sid_map() == global_sid_map);

  if (timeout > 0) {
    set_timespec_nsec(&abstime,
                      static_cast<ulonglong>(timeout * 1000000000ULL));
  }

  /*
    Algorithm:

    Let 'todo' contain the GTIDs to wait for. Iterate over SIDNOs in
    'todo' (this is the 'for' loop below).

    For each SIDNO in 'todo', remove gtid_executed for that SIDNO from
    'todo'.  If, after this removal, there is still some interval for
    this SIDNO in 'todo', then wait for a signal on this SIDNO.
    Repeat this step until 'todo' is empty for this SIDNO (this is the
    innermost 'while' loop below).

    Once the loop over SIDNOs has completed, 'todo' is guaranteed to
    be empty.  However, it may still be the case that not all GTIDs of
    wait_for are included in gtid_executed, since RESET MASTER may
    have been executed while we were waiting.

    RESET MASTER requires global_sid_lock.wrlock.  We hold
    global_sid_lock.rdlock while removing GTIDs from 'todo', but the
    wait operation releases global_sid_lock.rdlock.  So if we
    completed the 'for' loop without waiting, we know for sure that
    global_sid_lock.rdlock was held while emptying 'todo', and thus
    RESET MASTER cannot have executed in the meantime.  But if we
    waited at some point during the execution of the 'for' loop, RESET
    MASTER may have been called.  Thus, we repeatedly run the 'for'
    loop until it completes without waiting (this is the outermost
    'while' loop).
  */

  // Will be true once the entire 'for' loop completes without waiting.
  bool verified= false;

  // The set of GTIDs that we are still waiting for.
  Gtid_set todo(global_sid_map, NULL);
  // As an optimization, provide 100 pre-allocated Intervals so that
  // small sets do not require heap allocation.
  static const int preallocated_interval_count= 100;
  Gtid_set::Interval ivs[preallocated_interval_count];
  todo.add_interval_memory(preallocated_interval_count, ivs);

  /*
    Iterate until we have verified that all GTIDs in the set are
    included in gtid_executed.
  */
  while (!verified)
  {
    todo.add_gtid_set(wait_for);

    // Iterate over SIDNOs until all GTIDs have been removed from 'todo'.

    // Set 'verified' to true; it will be set to 'false' if any wait
    // is done.
    verified= true;
    for (int sidno= 1; sidno <= todo.get_max_sidno(); sidno++)
    {
      // Iterate until 'todo' is empty for this SIDNO.
      while (todo.contains_sidno(sidno))
      {
        lock_sidno(sidno);
        todo.remove_intervals_for_sidno(&executed_gtids, sidno);

        if (todo.contains_sidno(sidno))
        {
          bool ret= wait_for_sidno(thd, sidno, timeout > 0 ? &abstime : NULL);

          // wait_for_sidno released both the global lock and the
          // mutex.  Acquire the global lock again.
          global_sid_lock->rdlock();
          verified= false;

          if (thd->killed)
          {
            switch (thd->killed)
            {
            case ER_SERVER_SHUTDOWN:
            case ER_QUERY_INTERRUPTED:
            case ER_QUERY_TIMEOUT:
              my_error(thd->killed, MYF(0));
              break;
            default:
              my_error(ER_QUERY_INTERRUPTED, MYF(0));
              break;
            }
            DBUG_RETURN(true);
          }

          if (ret)
            DBUG_RETURN(true);
        }
        else
        {
          // Keep the global lock since it may be needed in a later
          // iteration of the for loop.
          unlock_sidno(sidno);
          break;
        }
      }
    }
  }
  DBUG_RETURN(false);
}

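/*
  Return the first GNO for the given SIDNO that is neither in
  executed_gtids nor currently owned, or -1 (with ER_GNO_EXHAUSTED
  reported) if no GNO is available. For the server's own SIDNO the
  search starts at next_free_gno rather than at 1; see the comment
  below.
*/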
rpl_gno Gtid_state::get_automatic_gno(rpl_sidno sidno) const
{
  DBUG_ENTER("Gtid_state::get_automatic_gno");
  Gtid_set::Const_interval_iterator ivit(&executed_gtids, sidno);
  /*
    When assigning new automatic GTIDs, we can optimize the assignment by
    starting the search for an available GNO from the "supposed" next free
    one instead of starting from 1.

    This is useful mostly on systems with many transactions committing in
    groups and asking for automatic GTIDs. When a GNO is assigned to be owned
    by a transaction, it is not removed from the free intervals, but it is
    added to the owned_gtids set. Thus, picking the actual first free GNO
    would often yield a GNO that is already owned by another thread. This can
    lead to many "tries" to get a GNO that is both free and not yet owned (a
    thread would try N times, N being the sum of the transactions in the
    FLUSH stage plus the transactions in the COMMIT stage that have not yet
    released their ownership).

    The optimization simply sets the next_free_gno variable to the last
    assigned GNO + 1, as this is the common case when no transactions roll
    back. This is done at Gtid_state::generate_automatic_gtid.

    In order to fill the gaps in GTID_EXECUTED when a transaction rolls back
    and releases the ownership of a GTID, Gtid_state::update_gtids_impl_own_gtid
    checks whether the released GNO is smaller than next_free_gno, and if so
    sets next_free_gno to the released GNO.
  */
  Gtid next_candidate= { sidno,
                         sidno == get_server_sidno() ? next_free_gno : 1 };
  while (true)
  {
    const Gtid_set::Interval *iv= ivit.get();
    rpl_gno next_interval_start= iv != NULL ? iv->start : MAX_GNO;
    while (next_candidate.gno < next_interval_start &&
           DBUG_EVALUATE_IF("simulate_gno_exhausted", false, true))
    {
      DBUG_PRINT("debug",("Checking availability of gno= %llu", next_candidate.gno));
      if (owned_gtids.is_owned_by(next_candidate, 0))
        DBUG_RETURN(next_candidate.gno);
      next_candidate.gno++;
    }
    if (iv == NULL ||
        DBUG_EVALUATE_IF("simulate_gno_exhausted", true, false))
    {
      my_error(ER_GNO_EXHAUSTED, MYF(0));
      DBUG_RETURN(-1);
    }
    if (next_candidate.gno <= iv->end)
      next_candidate.gno= iv->end;
    ivit.next();
  }
}


rpl_gno Gtid_state::get_last_executed_gno(rpl_sidno sidno) const
{
  DBUG_ENTER("Gtid_state::get_last_executed_gno");
  rpl_gno gno= 0;

  gtid_state->lock_sidno(sidno);
  gno= executed_gtids.get_last_gno(sidno);
  gtid_state->unlock_sidno(sidno);

  DBUG_RETURN(gno);
}


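/*
  Generate a GTID for a transaction with GTID_NEXT=AUTOMATIC and acquire
  ownership of it, or acquire anonymous ownership when GTID_MODE is OFF
  or OFF_PERMISSIVE. If locked_sidno is non-NULL, the caller holds
  sid_lock and is responsible for unlocking the sidno stored there;
  otherwise this function takes and releases the needed locks itself.
*/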
enum_return_status Gtid_state::generate_automatic_gtid(THD *thd,
                                                       rpl_sidno specified_sidno,
                                                       rpl_gno specified_gno,
                                                       rpl_sidno *locked_sidno)
{
  DBUG_ENTER("Gtid_state::generate_automatic_gtid");
  enum_return_status ret= RETURN_STATUS_OK;

  DBUG_ASSERT(thd->variables.gtid_next.type == AUTOMATIC_GROUP);
  DBUG_ASSERT(specified_sidno >= 0);
  DBUG_ASSERT(specified_gno >= 0);
  DBUG_ASSERT(thd->owned_gtid.is_empty());

  bool locked_sidno_was_passed_null = (locked_sidno == NULL);

  if (locked_sidno_was_passed_null)
    sid_lock->rdlock();
  else
    /* The caller must hold the sid_lock when locked_sidno is passed */
    sid_lock->assert_some_lock();

  // If GTID_MODE = ON_PERMISSIVE or ON, generate a new GTID
  if (get_gtid_mode(GTID_MODE_LOCK_SID) >= GTID_MODE_ON_PERMISSIVE)
  {
    Gtid automatic_gtid= { specified_sidno, specified_gno };

    if (automatic_gtid.sidno == 0)
      automatic_gtid.sidno= get_server_sidno();

    /*
      We need to lock the sidno if locked_sidno wasn't passed as a parameter
      or if the already locked sidno doesn't match the one to generate the
      new automatic GTID.
    */
    bool need_to_lock_sidno= (locked_sidno_was_passed_null ||
                              *locked_sidno != automatic_gtid.sidno);
    if (need_to_lock_sidno)
    {
      /*
        When locked_sidno contains a value greater than zero we must release
        the currently locked sidno. This should not happen with the current
        code, as the server only generates automatic GTIDs with the server's
        UUID as sid.
      */
      if (!locked_sidno_was_passed_null && *locked_sidno != 0)
        unlock_sidno(*locked_sidno);
      lock_sidno(automatic_gtid.sidno);
      /* Update locked_sidno, so the caller knows what to unlock */
      if (!locked_sidno_was_passed_null)
        *locked_sidno= automatic_gtid.sidno;
    }

    if (automatic_gtid.gno == 0)
    {
      automatic_gtid.gno= get_automatic_gno(automatic_gtid.sidno);
      if (automatic_gtid.sidno == get_server_sidno() &&
          automatic_gtid.gno != -1)
        next_free_gno= automatic_gtid.gno + 1;
    }

    if (automatic_gtid.gno != -1)
      acquire_ownership(thd, automatic_gtid);
    else
      ret= RETURN_STATUS_REPORTED_ERROR;

    /* The caller will unlock the sidno_lock if locked_sidno was passed */
    if (locked_sidno_was_passed_null)
      unlock_sidno(automatic_gtid.sidno);

  }
  else
  {
    // If GTID_MODE = OFF or OFF_PERMISSIVE, just mark this thread as
    // using an anonymous transaction.
    thd->owned_gtid.sidno= THD::OWNED_SIDNO_ANONYMOUS;
    thd->owned_gtid.gno= 0;
    acquire_anonymous_ownership();
    thd->owned_gtid.dbug_print(NULL,
                               "set owned_gtid (anonymous) in generate_automatic_gtid");
  }

  /* The caller will unlock the sid_lock if locked_sidno was passed */
  if (locked_sidno_was_passed_null)
    sid_lock->unlock();

  gtid_set_performance_schema_values(thd);

  DBUG_RETURN(ret);
}


void Gtid_state::lock_sidnos(const Gtid_set *gs)
{
  DBUG_ASSERT(gs);
  rpl_sidno max_sidno= gs->get_max_sidno();
  for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
    if (gs->contains_sidno(sidno))
      lock_sidno(sidno);
}


void Gtid_state::unlock_sidnos(const Gtid_set *gs)
{
  DBUG_ASSERT(gs);
  rpl_sidno max_sidno= gs->get_max_sidno();
  for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
    if (gs->contains_sidno(sidno))
      unlock_sidno(sidno);
}


void Gtid_state::broadcast_sidnos(const Gtid_set *gs)
{
  DBUG_ASSERT(gs);
  rpl_sidno max_sidno= gs->get_max_sidno();
  for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
    if (gs->contains_sidno(sidno))
      broadcast_sidno(sidno);
}


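/*
  Grow executed_gtids, gtids_only_in_table, previous_gtids_logged,
  lost_gtids, owned_gtids, sid_locks and commit_group_sidnos so that
  they have room for every SIDNO currently known to sid_map. Requires
  sid_lock to be held for writing.
*/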
enum_return_status Gtid_state::ensure_sidno()
{
  DBUG_ENTER("Gtid_state::ensure_sidno");
  sid_lock->assert_some_wrlock();
  rpl_sidno sidno= sid_map->get_max_sidno();
  if (sidno > 0)
  {
    // The lock may be temporarily released during one of the calls to
    // ensure_sidno or ensure_index.  Hence, we must re-check the
    // condition after the calls.
    PROPAGATE_REPORTED_ERROR(executed_gtids.ensure_sidno(sidno));
    PROPAGATE_REPORTED_ERROR(gtids_only_in_table.ensure_sidno(sidno));
    PROPAGATE_REPORTED_ERROR(previous_gtids_logged.ensure_sidno(sidno));
    PROPAGATE_REPORTED_ERROR(lost_gtids.ensure_sidno(sidno));
    PROPAGATE_REPORTED_ERROR(owned_gtids.ensure_sidno(sidno));
    PROPAGATE_REPORTED_ERROR(sid_locks.ensure_index(sidno));
    PROPAGATE_REPORTED_ERROR(ensure_commit_group_sidnos(sidno));
    sidno= sid_map->get_max_sidno();
    DBUG_ASSERT(executed_gtids.get_max_sidno() >= sidno);
    DBUG_ASSERT(gtids_only_in_table.get_max_sidno() >= sidno);
    DBUG_ASSERT(previous_gtids_logged.get_max_sidno() >= sidno);
    DBUG_ASSERT(lost_gtids.get_max_sidno() >= sidno);
    DBUG_ASSERT(owned_gtids.get_max_sidno() >= sidno);
    DBUG_ASSERT(sid_locks.get_max_index() >= sidno);
    DBUG_ASSERT(commit_group_sidnos.size() >= (unsigned int)sidno);
  }
  RETURN_OK;
}


enum_return_status Gtid_state::add_lost_gtids(const Gtid_set *gtid_set)
{
  DBUG_ENTER("Gtid_state::add_lost_gtids()");
  sid_lock->assert_some_wrlock();

  gtid_set->dbug_print("add_lost_gtids");

  if (!executed_gtids.is_empty())
  {
    BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY)),
                 (ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY,
                 MYF(0)));
    RETURN_REPORTED_ERROR;
  }
  if (!owned_gtids.is_empty())
  {
    BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY)),
                 (ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY,
                 MYF(0)));
    RETURN_REPORTED_ERROR;
  }
  DBUG_ASSERT(lost_gtids.is_empty());

  if (save(gtid_set))
    RETURN_REPORTED_ERROR;
  PROPAGATE_REPORTED_ERROR(gtids_only_in_table.add_gtid_set(gtid_set));
  PROPAGATE_REPORTED_ERROR(lost_gtids.add_gtid_set(gtid_set));
  PROPAGATE_REPORTED_ERROR(executed_gtids.add_gtid_set(gtid_set));
  lock_sidnos(gtid_set);
  broadcast_sidnos(gtid_set);
  unlock_sidnos(gtid_set);

  DBUG_RETURN(RETURN_STATUS_OK);
}


int Gtid_state::init()
{
  DBUG_ENTER("Gtid_state::init()");

  global_sid_lock->assert_some_wrlock();

  rpl_sid server_sid;
  if (server_sid.parse(server_uuid) != 0)
    DBUG_RETURN(1);
  rpl_sidno sidno= sid_map->add_sid(server_sid);
  if (sidno <= 0)
    DBUG_RETURN(1);
  server_sidno= sidno;
  next_free_gno= 1;
  DBUG_RETURN(0);
}


int Gtid_state::save(THD *thd)
{
  DBUG_ENTER("Gtid_state::save(THD *thd)");
  DBUG_ASSERT(gtid_table_persistor != NULL);
  DBUG_ASSERT(thd->owned_gtid.sidno > 0);
  int error= 0;

  int ret= gtid_table_persistor->save(thd, &thd->owned_gtid);
  if (1 == ret)
  {
    /*
      The GTID table is not ready to be used, so we failed to
      open it. Ignore the error.
    */
    thd->clear_error();
    if (!thd->get_stmt_da()->is_set())
        thd->get_stmt_da()->set_ok_status(0, 0, NULL);
  }
  else if (-1 == ret)
    error= -1;

  DBUG_RETURN(error);
}


int Gtid_state::save(const Gtid_set *gtid_set)
{
  DBUG_ENTER("Gtid_state::save(Gtid_set *gtid_set)");
  int ret= gtid_table_persistor->save(gtid_set);
  DBUG_RETURN(ret);
}


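/*
  Compute the set of GTIDs written to the last binlog file
  (executed_gtids - previous_gtids_logged - gtids_only_in_table) and
  save it in the gtid_executed table. When on_rotation is true, the set
  is also merged into previous_gtids_logged to prepare for the next
  binlog file.
*/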
int Gtid_state::save_gtids_of_last_binlog_into_table(bool on_rotation)
{
  DBUG_ENTER("Gtid_state::save_gtids_of_last_binlog_into_table");
  int ret= 0;

  /*
    Use a local Sid_map, so that we don't need a lock while inserting
    into the table.
  */
  Sid_map sid_map(NULL);
  Gtid_set logged_gtids_last_binlog(&sid_map, NULL);
  // Allocate some intervals on the stack to reduce heap allocation.
  static const int PREALLOCATED_INTERVAL_COUNT= 64;
  Gtid_set::Interval iv[PREALLOCATED_INTERVAL_COUNT];
  logged_gtids_last_binlog.add_interval_memory(PREALLOCATED_INTERVAL_COUNT, iv);
  /*
    logged_gtids_last_binlog= executed_gtids - previous_gtids_logged -
                              gtids_only_in_table
  */
  global_sid_lock->wrlock();
  ret= (logged_gtids_last_binlog.add_gtid_set(&executed_gtids) !=
        RETURN_STATUS_OK);
  if (!ret)
  {
    logged_gtids_last_binlog.remove_gtid_set(&previous_gtids_logged);
    logged_gtids_last_binlog.remove_gtid_set(&gtids_only_in_table);
    if (!logged_gtids_last_binlog.is_empty() ||
        mysql_bin_log.is_rotating_caused_by_incident)
    {
      /* Prepare previous_gtids_logged for the next binlog on rotation */
      if (on_rotation)
        ret= previous_gtids_logged.add_gtid_set(&logged_gtids_last_binlog);
      global_sid_lock->unlock();
      /* Save the GTID set of the last binlog into the gtid_executed table */
      if (!ret)
        ret= save(&logged_gtids_last_binlog);
    }
    else
      global_sid_lock->unlock();
  }
  else
    global_sid_lock->unlock();

  DBUG_RETURN(ret);
}


int Gtid_state::read_gtid_executed_from_table()
{
  return gtid_table_persistor->fetch_gtids(&executed_gtids);
}


int Gtid_state::compress(THD *thd)
{
  return gtid_table_persistor->compress(thd);
}


#ifdef MYSQL_SERVER
int Gtid_state::warn_or_err_on_modify_gtid_table(THD *thd, TABLE_LIST *table)
{
  DBUG_ENTER("Gtid_state::warn_or_err_on_modify_gtid_table");
  int ret=
    gtid_table_persistor->warn_or_err_on_explicit_modification(thd, table);
  DBUG_RETURN(ret);
}
#endif

bool Gtid_state::update_gtids_impl_check_skip_gtid_rollback(THD *thd)
{
  if (thd->skip_gtid_rollback)
  {
    DBUG_PRINT("info", ("skipping gtid rollback because "
                        "thd->skip_gtid_rollback is set"));
    return true;
  }
  return false;
}

bool Gtid_state::update_gtids_impl_do_nothing(THD *thd)
{
  if (thd->owned_gtid.is_empty() && !thd->has_gtid_consistency_violation)
  {
    if (thd->variables.gtid_next.type == GTID_GROUP)
      thd->variables.gtid_next.set_undefined();
    DBUG_PRINT("info", ("skipping update_gtids_impl because "
                        "thread does not own anything and does not violate "
                        "gtid consistency"));

    return true;
  }
  return false;
}

bool Gtid_state::update_gtids_impl_begin(THD *thd)
{
#ifndef DBUG_OFF
  if (current_thd != thd)
    mysql_mutex_lock(&thd->LOCK_thd_query);
  DBUG_PRINT("info", ("query='%s' thd->is_commit_in_middle_of_statement=%d",
                      thd->query().str,
                      thd->is_commit_in_middle_of_statement));
  if (current_thd != thd)
    mysql_mutex_unlock(&thd->LOCK_thd_query);
#endif
  return thd->is_commit_in_middle_of_statement;
}

void Gtid_state::update_gtids_impl_own_gtid_set(THD *thd, bool is_commit)
{
#ifdef HAVE_GTID_NEXT_LIST
  rpl_sidno prev_sidno= 0;
  Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
  Gtid g= git.get();
  while (g.sidno != 0)
  {
    if (g.sidno != prev_sidno)
      sid_locks.lock(g.sidno);
    owned_gtids.remove_gtid(g);
    git.next();
    g= git.get();
    if (is_commit)
      executed_gtids._add_gtid(g);
  }

  if (is_commit && !thd->owned_gtid_set.is_empty())
    thd->rpl_thd_ctx.session_gtids_ctx().
      notify_after_gtid_executed_update(thd);

  thd->variables.gtid_next.set_undefined();
  thd->owned_gtid.dbug_print(NULL,
                             "set owned_gtid (clear; old was gtid_set) "
                             "in update_gtids_impl");
  thd->clear_owned_gtids();
#else
  DBUG_ASSERT(0);
#endif
}

void Gtid_state::update_gtids_impl_lock_sidno(rpl_sidno sidno)
{
  DBUG_ASSERT(sidno > 0);
  DBUG_PRINT("info",("Locking sidno %d", sidno));
  lock_sidno(sidno);
}

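/*
  Mark in commit_group_sidnos every SIDNO owned by some session in the
  commit group starting at first_thd, and then take the corresponding
  sidno locks in ascending order so that all threads lock them in the
  same order.
*/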
void Gtid_state::update_gtids_impl_lock_sidnos(THD *first_thd)
{
  /* Determine which sidnos must be locked for the update */
  for (THD *thd= first_thd; thd != NULL; thd= thd->next_to_commit)
  {
    if (thd->owned_gtid.sidno > 0)
    {
      DBUG_PRINT("info",("Setting sidno %d to be locked",
                         thd->owned_gtid.sidno));
      commit_group_sidnos[thd->owned_gtid.sidno]= true;
    }
    else if (thd->owned_gtid.sidno == THD::OWNED_SIDNO_GTID_SET)
#ifdef HAVE_GTID_NEXT_LIST
      for (rpl_sidno i= 1; i < thd->owned_gtid_set.max_sidno; i++)
        if (owned_gtid_set.contains_sidno(i))
          commit_group_sidnos[i]= true;
#else
      DBUG_ASSERT(0);
#endif
  }

  /* Take the sidno_locks in order */
  for (rpl_sidno i= 1; i < (rpl_sidno)commit_group_sidnos.size(); i++)
    if (commit_group_sidnos[i])
      update_gtids_impl_lock_sidno(i);
}

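/*
  Release ownership of the single GTID owned by thd. On commit, add it
  to executed_gtids (and, for a slave thread with the binlog enabled but
  log_slave_updates disabled, also to lost_gtids and
  gtids_only_in_table). On rollback of a GTID with the server's SIDNO,
  make the GNO available again for automatic assignment by lowering
  next_free_gno. The SIDNO lock must be held by the caller.
*/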
void Gtid_state::update_gtids_impl_own_gtid(THD *thd, bool is_commit)
{
  assert_sidno_lock_owner(thd->owned_gtid.sidno);
  /*
    In Group Replication the GTID may additionally be owned by another
    thread, and we won't remove that ownership (it will be rolled back later)
  */
  DBUG_ASSERT(owned_gtids.is_owned_by(thd->owned_gtid, thd->thread_id()));
  owned_gtids.remove_gtid(thd->owned_gtid, thd->thread_id());

  if (is_commit)
  {
    DBUG_ASSERT(!executed_gtids.contains_gtid(thd->owned_gtid));
    DBUG_EXECUTE_IF(
      "rpl_gtid_update_on_commit_simulate_out_of_memory",
      DBUG_SET("+d,rpl_gtid_get_free_interval_simulate_out_of_memory"););
    /*
      Any session adds its transaction's owned GTID to the global
      executed_gtids.

      If the binlog is disabled, we report @@GLOBAL.GTID_PURGED from
      executed_gtids, since @@GLOBAL.GTID_PURGED and @@GLOBAL.GTID_EXECUTED
      are always the same in that case, so we do not save the GTID into
      lost_gtids for every transaction, to improve performance.

      If the binlog is enabled and log_slave_updates is disabled, the slave
      SQL thread or a slave worker thread adds the transaction's owned GTID
      to the global executed_gtids, lost_gtids and gtids_only_in_table.
    */
    executed_gtids._add_gtid(thd->owned_gtid);
    thd->rpl_thd_ctx.session_gtids_ctx().
      notify_after_gtid_executed_update(thd);
    if (thd->slave_thread && opt_bin_log && !opt_log_slave_updates)
    {
      lost_gtids._add_gtid(thd->owned_gtid);
      gtids_only_in_table._add_gtid(thd->owned_gtid);
    }
  }
  else
  {
    if (thd->owned_gtid.sidno == server_sidno &&
        next_free_gno > thd->owned_gtid.gno)
      next_free_gno= thd->owned_gtid.gno;
  }

  thd->clear_owned_gtids();
  if (thd->variables.gtid_next.type == GTID_GROUP)
  {
    DBUG_ASSERT(!thd->is_commit_in_middle_of_statement);
    thd->variables.gtid_next.set_undefined();
  }
  else
  {
    /*
      Can be UNDEFINED for statements where
      gtid_pre_statement_checks skips the test for undefined,
      e.g. ROLLBACK.
    */
    DBUG_ASSERT(thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
                thd->variables.gtid_next.type == UNDEFINED_GROUP);
  }
}

void Gtid_state::update_gtids_impl_broadcast_and_unlock_sidno(rpl_sidno sidno)
{
  DBUG_PRINT("info",("Unlocking sidno %d", sidno));
  broadcast_sidno(sidno);
  unlock_sidno(sidno);
}

void Gtid_state::update_gtids_impl_broadcast_and_unlock_sidnos()
{
  for (rpl_sidno i= 1; i < (rpl_sidno)commit_group_sidnos.size(); i++)
    if (commit_group_sidnos[i])
    {
      update_gtids_impl_broadcast_and_unlock_sidno(i);
      commit_group_sidnos[i]= false;
    }
}

void Gtid_state::update_gtids_impl_own_anonymous(THD* thd,
                                                 bool *more_trx)
{
  DBUG_ASSERT(thd->variables.gtid_next.type == ANONYMOUS_GROUP ||
              thd->variables.gtid_next.type == AUTOMATIC_GROUP);
  /*
    If there is more in the transaction cache, set more_trx to indicate this.

    See the comment for the update_gtids_impl_begin function.
  */
  if (opt_bin_log)
  {
    // Needed before is_binlog_cache_empty.
    thd->binlog_setup_trx_data();
    if (!thd->is_binlog_cache_empty(true))
    {
      *more_trx= true;
      DBUG_PRINT("info", ("Transaction cache is non-empty: setting "
                          "more_transaction_with_same_gtid_next="
                          "true."));
    }
  }
  if (!(*more_trx &&
        thd->variables.gtid_next.type == ANONYMOUS_GROUP))
  {
    release_anonymous_ownership();
    thd->clear_owned_gtids();
  }
}

void Gtid_state::update_gtids_impl_own_nothing(THD *thd)
{
  DBUG_ASSERT(thd->commit_error != THD::CE_COMMIT_ERROR ||
              thd->has_gtid_consistency_violation);
  DBUG_ASSERT(thd->variables.gtid_next.type == AUTOMATIC_GROUP);
}

void Gtid_state::update_gtids_impl_end(THD *thd, bool more_trx)
{
  if (!more_trx)
    end_gtid_violating_transaction(thd);
}

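/*
  Grow the commit_group_sidnos vector so that it can be indexed by the
  given sidno; reports ER_OUT_OF_RESOURCES on allocation failure.
*/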
enum_return_status Gtid_state::ensure_commit_group_sidnos(rpl_sidno sidno)
{
  DBUG_ENTER("Gtid_state::ensure_commit_group_sidnos");
  sid_lock->assert_some_wrlock();
  /*
    As we use the sidno as an index into commit_group_sidnos and there is
    no sidno 0, the array size must be at least sidno + 1.
  */
  while ((commit_group_sidnos.size()) < (size_t)sidno + 1)
  {
    if (commit_group_sidnos.push_back(false))
      goto error;
  }
  RETURN_OK;
error:
  BINLOG_ERROR(("Out of memory."), (ER_OUT_OF_RESOURCES, MYF(0)));
  RETURN_REPORTED_ERROR;
}