1 /* Copyright (c) 2018, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include "rpl_trx_tracking.h"
24 
25 #include "mysqld.h"
26 #include "binlog.h"
27 
28 #include "debug_sync.h"
29 
Logical_clock()30 Logical_clock::Logical_clock()
31   : state(SEQ_UNINIT), offset(0)
32 {}
33 
34 /**
35   Atomically fetch the current state.
36   @return  not subtracted "absolute" value.
37  */
get_timestamp()38 inline int64 Logical_clock::get_timestamp()
39 {
40   int64 retval= 0;
41   DBUG_ENTER("Logical_clock::get_timestamp");
42   retval= my_atomic_load64(&state);
43   DBUG_RETURN(retval);
44 }
45 
46 /**
47   Steps the absolute value of the clock (state) to return
48   an updated value.
49   The caller must be sure to call the method in no concurrent
50   execution context so either offset and state can't change.
51 
52   @return  incremented "absolute" value
53  */
step()54 inline int64 Logical_clock::step()
55 {
56   assert(SEQ_UNINIT == 0);
57   DBUG_EXECUTE_IF("logical_clock_step_2", ++state;);
58   return ++state;
59 }
60 
61 /**
62   To try setting the clock *forward*.
63   The clock does not change when the new value is in the past
64   which is reflected by the new value and by offset.
65   In other words the function main effects is described as
66     state= max(state, new_value).
67   Offset that exceeds the new value indicates the binary log rotation
68   to render such new value useless.
69 
70   @param  new_val  a new value (offset included)
71   @return a (new) value of state member regardless whether it's changed or not.
72  */
set_if_greater(int64 new_val)73 inline int64 Logical_clock::set_if_greater(int64 new_val)
74 {
75   longlong old_val= new_val - 1;
76   bool cas_rc;
77 
78   DBUG_ENTER("Logical_clock::set_if_greater");
79 
80   assert(new_val > 0);
81 
82   if (new_val <= offset)
83   {
84     /*
85       This function's invocation can be separated from the
86       transaction's flushing by few rotations. A late to log
87       transaction does not change the clock, similarly to how
88       its timestamps are handled at flushing.
89     */
90     DBUG_RETURN(SEQ_UNINIT);
91   }
92 
93   assert(new_val > 0);
94 
95   while (!(cas_rc= my_atomic_cas64(&state, &old_val, new_val)) &&
96          old_val < new_val)
97   {}
98 
99   assert(state >= new_val); // setting can't be done to past
100 
101   assert(cas_rc || old_val >= new_val);
102 
103   DBUG_RETURN(cas_rc ? new_val : old_val);
104 }
105 
106 
107 /**
108   Get the sequence_number for a transaction, and get the last_commit based
109   on parallel committing transactions.
110 
111   @param[in]     thd             Current THD from which to extract trx context.
112   @param[in,out] sequence_number Sequence number of current transaction.
113   @param[in,out] commit_parent   Commit_parent of current transaction,
114                                  pre-filled with the commit_parent calculated
115                                  by the logical clock logic.
116 */
117 void
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)118 Commit_order_trx_dependency_tracker::get_dependency(THD *thd,
119                                                     int64 &sequence_number,
120                                                     int64 &commit_parent)
121 {
122   Transaction_ctx *trn_ctx= thd->get_transaction();
123 
124   assert(trn_ctx->sequence_number
125          > m_max_committed_transaction.get_offset());
126   /*
127     Prepare sequence_number and commit_parent relative to the current
128     binlog.  This is done by subtracting the binlog's clock offset
129     from the values.
130 
131     A transaction that commits after the binlog is rotated, can have a
132     commit parent in the previous binlog. In this case, subtracting
133     the offset from the sequence number results in a negative
134     number. The commit parent dependency gets lost in such
135     case. Therefore, we log the value SEQ_UNINIT in this case.
136   */
137   sequence_number=
138     trn_ctx->sequence_number - m_max_committed_transaction.get_offset();
139 
140   commit_parent=
141     trn_ctx->last_committed <= m_max_committed_transaction.get_offset()
142            ? SEQ_UNINIT
143            : trn_ctx->last_committed - m_max_committed_transaction.get_offset();
144 }
145 
146 int64
step()147 Commit_order_trx_dependency_tracker::step()
148 {
149   return m_transaction_counter.step();
150 }
151 
152 void
rotate()153 Commit_order_trx_dependency_tracker::rotate()
154 {
155   m_max_committed_transaction.
156     update_offset(m_transaction_counter.get_timestamp());
157 
158   m_transaction_counter.
159     update_offset(m_transaction_counter.get_timestamp());
160 }
161 
162 void
update_max_committed(int64 sequence_number)163 Commit_order_trx_dependency_tracker::update_max_committed(int64 sequence_number)
164 {
165   mysql_mutex_assert_owner(&LOCK_slave_trans_dep_tracker);
166   m_max_committed_transaction.set_if_greater(sequence_number);
167 }
168 
169 /**
170   Get the writeset dependencies of a transaction.
171   This takes the commit_parent that must be previously set using
172   Commit_order_trx_dependency_tracker and tries to make the commit_parent as
173   low as possible, using the writesets of each transaction.
174   The commit_parent returned depends on how many row hashes are stored in the
175   writeset_history, which is cleared once it reaches the user-defined maximum.
176 
177   @param[in]     thd             Current THD from which to extract trx context.
178   @param[in,out] sequence_number Sequence number of current transaction.
179   @param[in,out] commit_parent   Commit_parent of current transaction,
180                                  pre-filled with the commit_parent calculated by
181                                  Commit_order_trx_dependency_tracker to use when
182                                  the writeset commit_parent is not valid.
183 */
184 void
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)185 Writeset_trx_dependency_tracker::get_dependency(THD *thd,
186                                                 int64 &sequence_number,
187                                                 int64 &commit_parent)
188 {
189   Rpl_transaction_write_set_ctx *write_set_ctx=
190     thd->get_transaction()->get_transaction_write_set_ctx();
191   std::set<uint64> *writeset= write_set_ctx->get_write_set();
192 
193 #ifndef NDEBUG
194   /* The writeset of an empty transaction must be empty. */
195   if (is_empty_transaction_in_binlog_cache(thd))
196     assert(writeset->size() == 0);
197 #endif
198 
199   /*
200     Check if this transaction has a writeset, if the writeset will overflow the
201     history size, if the transaction_write_set_extraction is consistent
202     between session and global or if changes in the tables referenced in this
203     transaction cascade to other tables. If that happens revert to using the
204     COMMIT_ORDER and clear the history to keep data consistent.
205   */
206   bool can_use_writesets=
207     // empty writeset implies DDL or similar, except if there are missing keys
208     (writeset->size() != 0 || write_set_ctx->get_has_missing_keys() ||
209      /*
210        The empty transactions do not need to clear the writeset history, since
211        they can be executed in parallel.
212      */
213      is_empty_transaction_in_binlog_cache(thd)) &&
214     // hashing algorithm for the session must be the same as used by other rows in history
215     (global_system_variables.transaction_write_set_extraction ==
216      thd->variables.transaction_write_set_extraction) &&
217     // must not use foreign keys
218     !write_set_ctx->get_has_related_foreign_keys() &&
219     // it did not broke past the capacity already
220     !write_set_ctx->was_write_set_limit_reached();
221   bool exceeds_capacity= false;
222 
223   mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
224   if (can_use_writesets)
225   {
226     /*
227      Check if adding this transaction exceeds the capacity of the writeset
228      history. If that happens, m_writeset_history will be cleared only after
229      using its information for current transaction.
230     */
231     exceeds_capacity=
232       m_writeset_history.size() + writeset->size() > get_opt_max_history_size();
233 
234     /*
235      Compute the greatest sequence_number among all conflicts and add the
236      transaction's row hashes to the history.
237     */
238     DEBUG_SYNC(thd, "wait_in_get_dependency");
239     int64 last_parent= m_writeset_history_start;
240     for (std::set<uint64>::iterator it= writeset->begin();
241          it != writeset->end(); ++it)
242     {
243       Writeset_history::iterator hst= m_writeset_history.find(*it);
244       if (hst != m_writeset_history.end())
245       {
246         if (hst->second > last_parent && hst->second < sequence_number)
247           last_parent= hst->second;
248 
249         hst->second= sequence_number;
250       }
251       else
252       {
253         if (!exceeds_capacity)
254           m_writeset_history.insert(std::pair<uint64, int64>(*it, sequence_number));
255       }
256     }
257 
258     /*
259       If the transaction references tables with missing primary keys revert to
260       COMMIT_ORDER, update and not reset history, as it is unnecessary because
261       any transaction that refers this table will also revert to COMMIT_ORDER.
262     */
263     if (!write_set_ctx->get_has_missing_keys())
264     {
265       /*
266        The WRITESET commit_parent then becomes the minimum of largest parent
267        found using the hashes of the row touched by the transaction and the
268        commit parent calculated with COMMIT_ORDER.
269       */
270       commit_parent= std::min(last_parent, commit_parent);
271     }
272   }
273 
274   if (exceeds_capacity || !can_use_writesets)
275   {
276     m_writeset_history_start= sequence_number;
277     m_writeset_history.clear();
278   }
279   mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
280 }
281 
282 void
rotate(int64 start)283 Writeset_trx_dependency_tracker::rotate(int64 start)
284 {
285   m_writeset_history_start= start;
286   m_writeset_history.clear();
287 }
288 
289 /**
290   Get the writeset commit parent of transactions using the session dependencies.
291 
292   @param[in]     thd             Current THD from which to extract trx context.
293   @param[in,out] sequence_number Sequence number of current transaction.
294   @param[in,out] commit_parent   Commit_parent of current transaction,
295                                  pre-filled with the commit_parent calculated
296                                  by the Write_set_trx_dependency_tracker as a
297                                  fall-back.
298 */
299 void
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)300 Writeset_session_trx_dependency_tracker::get_dependency(THD *thd,
301                                                         int64 &sequence_number,
302                                                         int64 &commit_parent)
303 {
304   int64 session_parent= thd->rpl_thd_ctx.dependency_tracker_ctx().
305                         get_last_session_sequence_number();
306 
307   if (session_parent != 0 && session_parent < sequence_number)
308     commit_parent= std::max(commit_parent, session_parent);
309 
310   thd->rpl_thd_ctx.dependency_tracker_ctx().
311     set_last_session_sequence_number(sequence_number);
312 }
313 
314 /**
315   Get the dependencies in a transaction, the main entry point for the
316   dependency tracking work.
317 */
318 void
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)319 Transaction_dependency_tracker::get_dependency(THD *thd,
320                                                int64 &sequence_number,
321                                                int64 &commit_parent)
322 {
323   sequence_number= commit_parent= 0;
324 
325   switch(my_atomic_load64(&m_opt_tracking_mode))
326   {
327     case DEPENDENCY_TRACKING_COMMIT_ORDER:
328       m_commit_order.get_dependency(thd, sequence_number, commit_parent);
329       break;
330     case DEPENDENCY_TRACKING_WRITESET:
331       m_commit_order.get_dependency(thd, sequence_number, commit_parent);
332       m_writeset.get_dependency(thd, sequence_number, commit_parent);
333       break;
334     case DEPENDENCY_TRACKING_WRITESET_SESSION:
335       m_commit_order.get_dependency(thd, sequence_number, commit_parent);
336       m_writeset.get_dependency(thd, sequence_number, commit_parent);
337       m_writeset_session.get_dependency(thd, sequence_number, commit_parent);
338       break;
339     default:
340       assert(0);  // blow up on debug
341       /*
342         Fallback to commit order on production builds.
343        */
344       m_commit_order.get_dependency(thd, sequence_number, commit_parent);
345   }
346 }
347 
348 void
tracking_mode_changed()349 Transaction_dependency_tracker::tracking_mode_changed()
350 {
351   Logical_clock max_committed_transaction=
352     m_commit_order.get_max_committed_transaction();
353   int64 timestamp= max_committed_transaction.get_timestamp()
354                    - max_committed_transaction.get_offset();
355 
356   m_writeset.rotate(timestamp);
357 }
358 
359 /**
360   The method is to be executed right before committing time.
361   It must be invoked even if the transaction does not commit
362   to engine being merely logged into the binary log.
363   max_committed_transaction is updated with a greater timestamp
364   value.
365   As a side effect, the transaction context's sequence_number
366   is reset.
367 
368   @param thd a pointer to THD instance
369 */
370 void
update_max_committed(THD * thd)371 Transaction_dependency_tracker::update_max_committed(THD *thd)
372 {
373   Transaction_ctx *trn_ctx= thd->get_transaction();
374   m_commit_order.update_max_committed(trn_ctx->sequence_number);
375   /*
376     sequence_number timestamp isn't needed anymore, so it's cleared off.
377   */
378   trn_ctx->sequence_number= SEQ_UNINIT;
379 
380   assert(trn_ctx->last_committed == SEQ_UNINIT ||
381          thd->commit_error == THD::CE_FLUSH_ERROR ||
382          thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR);
383 }
384 
385 int64
step()386 Transaction_dependency_tracker::step()
387 {
388   return m_commit_order.step();
389 }
390 
391 void
rotate()392 Transaction_dependency_tracker::rotate()
393 {
394   m_commit_order.rotate();
395   /*
396     To make slave appliers be able to execute transactions in parallel
397     after rotation, set the minimum commit_parent to 1 after rotation.
398   */
399   m_writeset.rotate(1);
400   if (current_thd)
401     current_thd->get_transaction()->sequence_number= 2;
402 }
403 
get_max_committed_timestamp()404 int64 Transaction_dependency_tracker::get_max_committed_timestamp()
405 {
406   return m_commit_order.get_max_committed_transaction().get_timestamp();
407 }
408 
409 
410