1 /* Copyright (c) 2018, 2020, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/rpl_trx_tracking.h"
24 
25 #include <algorithm>
26 #include <utility>
27 #include <vector>
28 
29 #include "debug_sync.h"
30 #include "libbinlogevents/include/binlog_event.h"
31 #include "my_inttypes.h"
32 #include "my_sqlcommand.h"
33 #include "sql/binlog.h"
34 #include "sql/current_thd.h"
35 #include "sql/mysqld.h"
36 #include "sql/rpl_context.h"
37 #include "sql/rpl_transaction_write_set_ctx.h"
38 #include "sql/sql_alter.h"
39 #include "sql/sql_class.h"
40 #include "sql/sql_lex.h"
41 #include "sql/system_variables.h"
42 #include "sql/transaction_info.h"
43 
Logical_clock()44 Logical_clock::Logical_clock() : state(SEQ_UNINIT), offset(0) {}
45 
46 /**
47   Atomically fetch the current state.
48   @return  not subtracted "absolute" value.
49  */
get_timestamp()50 inline int64 Logical_clock::get_timestamp() {
51   int64 retval = 0;
52   DBUG_TRACE;
53   retval = state.load();
54   return retval;
55 }
56 
57 /**
58   Steps the absolute value of the clock (state) to return
59   an updated value.
60   The caller must be sure to call the method in no concurrent
61   execution context so either offset and state can't change.
62 
63   @return  incremented "absolute" value
64  */
step()65 inline int64 Logical_clock::step() {
66   static_assert(SEQ_UNINIT == 0, "");
67   DBUG_EXECUTE_IF("logical_clock_step_2", ++state;);
68   return ++state;
69 }
70 
71 /**
72   To try setting the clock *forward*.
73   The clock does not change when the new value is in the past
74   which is reflected by the new value and by offset.
75   In other words the function main effects is described as
76     state= max(state, new_value).
77   Offset that exceeds the new value indicates the binary log rotation
78   to render such new value useless.
79 
80   @param  new_val  a new value (offset included)
81   @return a (new) value of state member regardless whether it's changed or not.
82  */
set_if_greater(int64 new_val)83 inline int64 Logical_clock::set_if_greater(int64 new_val) {
84   int64 old_val = new_val - 1;
85   bool cas_rc;
86 
87   DBUG_TRACE;
88 
89   DBUG_ASSERT(new_val > 0);
90 
91   if (new_val <= offset) {
92     /*
93       This function's invocation can be separated from the
94       transaction's flushing by few rotations. A late to log
95       transaction does not change the clock, similarly to how
96       its timestamps are handled at flushing.
97     */
98     return SEQ_UNINIT;
99   }
100 
101   DBUG_ASSERT(new_val > 0);
102 
103   while (
104       !(cas_rc = atomic_compare_exchange_strong(&state, &old_val, new_val)) &&
105       old_val < new_val) {
106   }
107 
108   DBUG_ASSERT(state >= new_val);  // setting can't be done to past
109 
110   DBUG_ASSERT(cas_rc || old_val >= new_val);
111 
112   return cas_rc ? new_val : old_val;
113 }
114 
115 /*
116   Admin statements release metadata lock too earlier. It breaks the rule of lock
117   based logical clock. This function recognizes the statements.
118  */
is_trx_unsafe_for_parallel_slave(const THD * thd)119 static bool is_trx_unsafe_for_parallel_slave(const THD *thd) {
120   switch (thd->lex->sql_command) {
121     case SQLCOM_ANALYZE:
122     case SQLCOM_REPAIR:
123     case SQLCOM_OPTIMIZE:
124     case SQLCOM_CREATE_DB:
125     case SQLCOM_ALTER_DB:
126     case SQLCOM_DROP_DB:
127       return true;
128     case SQLCOM_ALTER_TABLE:
129       return thd->lex->alter_info->flags & Alter_info::ALTER_ADMIN_PARTITION;
130     default:
131       return false;
132   }
133   return false;
134 }
135 
136 /**
137   Get the sequence_number for a transaction, and get the last_commit based
138   on parallel committing transactions.
139 
140   @param[in]     thd             Current THD from which to extract trx context.
141   @param[in,out] sequence_number Sequence number of current transaction.
142   @param[in,out] commit_parent   Commit_parent of current transaction,
143                                  pre-filled with the commit_parent calculated
144                                  by the logical clock logic.
145 */
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)146 void Commit_order_trx_dependency_tracker::get_dependency(THD *thd,
147                                                          int64 &sequence_number,
148                                                          int64 &commit_parent) {
149   Transaction_ctx *trn_ctx = thd->get_transaction();
150 
151   DBUG_ASSERT(trn_ctx->sequence_number >
152               m_max_committed_transaction.get_offset());
153   /*
154     Prepare sequence_number and commit_parent relative to the current
155     binlog.  This is done by subtracting the binlog's clock offset
156     from the values.
157 
158     A transaction that commits after the binlog is rotated, can have a
159     commit parent in the previous binlog. In this case, subtracting
160     the offset from the sequence number results in a negative
161     number. The commit parent dependency gets lost in such
162     case. Therefore, we log the value SEQ_UNINIT in this case.
163   */
164   sequence_number =
165       trn_ctx->sequence_number - m_max_committed_transaction.get_offset();
166 
167   if (trn_ctx->last_committed <= m_max_committed_transaction.get_offset())
168     commit_parent = SEQ_UNINIT;
169   else
170     commit_parent =
171         std::max(trn_ctx->last_committed, m_last_blocking_transaction) -
172         m_max_committed_transaction.get_offset();
173 
174   if (is_trx_unsafe_for_parallel_slave(thd))
175     m_last_blocking_transaction = trn_ctx->sequence_number;
176 }
177 
step()178 int64 Commit_order_trx_dependency_tracker::step() {
179   return m_transaction_counter.step();
180 }
181 
rotate()182 void Commit_order_trx_dependency_tracker::rotate() {
183   m_max_committed_transaction.update_offset(
184       m_transaction_counter.get_timestamp());
185 
186   m_transaction_counter.update_offset(m_transaction_counter.get_timestamp());
187 }
188 
update_max_committed(int64 sequence_number)189 void Commit_order_trx_dependency_tracker::update_max_committed(
190     int64 sequence_number) {
191   mysql_mutex_assert_owner(&LOCK_slave_trans_dep_tracker);
192   m_max_committed_transaction.set_if_greater(sequence_number);
193 }
194 
195 /**
196   Get the writeset dependencies of a transaction.
197   This takes the commit_parent that must be previously set using
198   Commit_order_trx_dependency_tracker and tries to make the commit_parent as
199   low as possible, using the writesets of each transaction.
200   The commit_parent returned depends on how many row hashes are stored in the
201   writeset_history, which is cleared once it reaches the user-defined maximum.
202 
203   @param[in]     thd             Current THD from which to extract trx context.
204   @param[in,out] sequence_number Sequence number of current transaction.
205   @param[in,out] commit_parent   Commit_parent of current transaction,
206                                  pre-filled with the commit_parent calculated by
207                                  Commit_order_trx_dependency_tracker to use when
208                                  the writeset commit_parent is not valid.
209 */
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)210 void Writeset_trx_dependency_tracker::get_dependency(THD *thd,
211                                                      int64 &sequence_number,
212                                                      int64 &commit_parent) {
213   Rpl_transaction_write_set_ctx *write_set_ctx =
214       thd->get_transaction()->get_transaction_write_set_ctx();
215   std::vector<uint64> *writeset = write_set_ctx->get_write_set();
216 
217 #ifndef DBUG_OFF
218   /* The writeset of an empty transaction must be empty. */
219   if (is_empty_transaction_in_binlog_cache(thd))
220     DBUG_ASSERT(writeset->size() == 0);
221 #endif
222 
223   /*
224     Check if this transaction has a writeset, if the writeset will overflow the
225     history size, if the transaction_write_set_extraction is consistent
226     between session and global or if changes in the tables referenced in this
227     transaction cascade to other tables. If that happens revert to using the
228     COMMIT_ORDER and clear the history to keep data consistent.
229   */
230   bool can_use_writesets =
231       // empty writeset implies DDL or similar, except if there are missing keys
232       (writeset->size() != 0 || write_set_ctx->get_has_missing_keys() ||
233        /*
234          The empty transactions do not need to clear the writeset history, since
235          they can be executed in parallel.
236        */
237        is_empty_transaction_in_binlog_cache(thd)) &&
238       // hashing algorithm for the session must be the same as used by other
239       // rows in history
240       (global_system_variables.transaction_write_set_extraction ==
241        thd->variables.transaction_write_set_extraction) &&
242       // must not use foreign keys
243       !write_set_ctx->get_has_related_foreign_keys();
244   bool exceeds_capacity = false;
245 
246   mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
247   if (can_use_writesets) {
248     /*
249      Check if adding this transaction exceeds the capacity of the writeset
250      history. If that happens, m_writeset_history will be cleared only after
251      using its information for current transaction.
252     */
253     exceeds_capacity =
254         m_writeset_history.size() + writeset->size() > m_opt_max_history_size;
255 
256     /*
257      Compute the greatest sequence_number among all conflicts and add the
258      transaction's row hashes to the history.
259     */
260     DEBUG_SYNC(thd, "wait_in_get_dependency");
261     int64 last_parent = m_writeset_history_start;
262     for (std::vector<uint64>::iterator it = writeset->begin();
263          it != writeset->end(); ++it) {
264       Writeset_history::iterator hst = m_writeset_history.find(*it);
265       if (hst != m_writeset_history.end()) {
266         if (hst->second > last_parent && hst->second < sequence_number)
267           last_parent = hst->second;
268 
269         hst->second = sequence_number;
270       } else {
271         if (!exceeds_capacity)
272           m_writeset_history.insert(
273               std::pair<uint64, int64>(*it, sequence_number));
274       }
275     }
276 
277     /*
278       If the transaction references tables with missing primary keys revert to
279       COMMIT_ORDER, update and not reset history, as it is unnecessary because
280       any transaction that refers this table will also revert to COMMIT_ORDER.
281     */
282     if (!write_set_ctx->get_has_missing_keys()) {
283       /*
284        The WRITESET commit_parent then becomes the minimum of largest parent
285        found using the hashes of the row touched by the transaction and the
286        commit parent calculated with COMMIT_ORDER.
287       */
288       commit_parent = std::min(last_parent, commit_parent);
289     }
290   }
291 
292   if (exceeds_capacity || !can_use_writesets) {
293     m_writeset_history_start = sequence_number;
294     m_writeset_history.clear();
295   }
296   mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
297 }
298 
rotate(int64 start)299 void Writeset_trx_dependency_tracker::rotate(int64 start) {
300   m_writeset_history_start = start;
301   m_writeset_history.clear();
302 }
303 
304 /**
305   Get the writeset commit parent of transactions using the session dependencies.
306 
307   @param[in]     thd             Current THD from which to extract trx context.
308   @param[in,out] sequence_number Sequence number of current transaction.
309   @param[in,out] commit_parent   Commit_parent of current transaction,
310                                  pre-filled with the commit_parent calculated
311                                  by the Write_set_trx_dependency_tracker as a
312                                  fall-back.
313 */
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)314 void Writeset_session_trx_dependency_tracker::get_dependency(
315     THD *thd, int64 &sequence_number, int64 &commit_parent) {
316   int64 session_parent = thd->rpl_thd_ctx.dependency_tracker_ctx()
317                              .get_last_session_sequence_number();
318 
319   if (session_parent != 0 && session_parent < sequence_number)
320     commit_parent = std::max(commit_parent, session_parent);
321 
322   thd->rpl_thd_ctx.dependency_tracker_ctx().set_last_session_sequence_number(
323       sequence_number);
324 }
325 
326 /**
327   Get the dependencies in a transaction, the main entry point for the
328   dependency tracking work.
329 */
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)330 void Transaction_dependency_tracker::get_dependency(THD *thd,
331                                                     int64 &sequence_number,
332                                                     int64 &commit_parent) {
333   sequence_number = commit_parent = 0;
334 
335   switch (m_opt_tracking_mode.load(std::memory_order_relaxed)) {
336     case DEPENDENCY_TRACKING_COMMIT_ORDER:
337       m_commit_order.get_dependency(thd, sequence_number, commit_parent);
338       break;
339     case DEPENDENCY_TRACKING_WRITESET:
340       m_commit_order.get_dependency(thd, sequence_number, commit_parent);
341       m_writeset.get_dependency(thd, sequence_number, commit_parent);
342       break;
343     case DEPENDENCY_TRACKING_WRITESET_SESSION:
344       m_commit_order.get_dependency(thd, sequence_number, commit_parent);
345       m_writeset.get_dependency(thd, sequence_number, commit_parent);
346       m_writeset_session.get_dependency(thd, sequence_number, commit_parent);
347       break;
348     default:
349       DBUG_ASSERT(0);  // blow up on debug
350       /*
351         Fallback to commit order on production builds.
352        */
353       m_commit_order.get_dependency(thd, sequence_number, commit_parent);
354   }
355 }
356 
tracking_mode_changed()357 void Transaction_dependency_tracker::tracking_mode_changed() {
358   Logical_clock max_committed_transaction =
359       m_commit_order.get_max_committed_transaction();
360   int64 timestamp = max_committed_transaction.get_timestamp() -
361                     max_committed_transaction.get_offset();
362 
363   m_writeset.rotate(timestamp);
364 }
365 
366 /**
367   The method is to be executed right before committing time.
368   It must be invoked even if the transaction does not commit
369   to engine being merely logged into the binary log.
370   max_committed_transaction is updated with a greater timestamp
371   value.
372   As a side effect, the transaction context's sequence_number
373   is reset.
374 
375   @param thd a pointer to THD instance
376 */
update_max_committed(THD * thd)377 void Transaction_dependency_tracker::update_max_committed(THD *thd) {
378   Transaction_ctx *trn_ctx = thd->get_transaction();
379   m_commit_order.update_max_committed(trn_ctx->sequence_number);
380   /*
381     sequence_number timestamp isn't needed anymore, so it's cleared off.
382   */
383   trn_ctx->sequence_number = SEQ_UNINIT;
384 
385   DBUG_ASSERT(trn_ctx->last_committed == SEQ_UNINIT ||
386               thd->commit_error == THD::CE_FLUSH_ERROR);
387 }
388 
step()389 int64 Transaction_dependency_tracker::step() { return m_commit_order.step(); }
390 
rotate()391 void Transaction_dependency_tracker::rotate() {
392   m_commit_order.rotate();
393   /*
394     To make slave appliers be able to execute transactions in parallel
395     after rotation, set the minimum commit_parent to 1 after rotation.
396   */
397   m_writeset.rotate(1);
398   if (current_thd) current_thd->get_transaction()->sequence_number = 2;
399 }
400 
get_max_committed_timestamp()401 int64 Transaction_dependency_tracker::get_max_committed_timestamp() {
402   return m_commit_order.get_max_committed_transaction().get_timestamp();
403 }
404