1 /* Copyright (c) 2018, 2020, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql/rpl_trx_tracking.h"
24
25 #include <algorithm>
26 #include <utility>
27 #include <vector>
28
29 #include "debug_sync.h"
30 #include "libbinlogevents/include/binlog_event.h"
31 #include "my_inttypes.h"
32 #include "my_sqlcommand.h"
33 #include "sql/binlog.h"
34 #include "sql/current_thd.h"
35 #include "sql/mysqld.h"
36 #include "sql/rpl_context.h"
37 #include "sql/rpl_transaction_write_set_ctx.h"
38 #include "sql/sql_alter.h"
39 #include "sql/sql_class.h"
40 #include "sql/sql_lex.h"
41 #include "sql/system_variables.h"
42 #include "sql/transaction_info.h"
43
Logical_clock()44 Logical_clock::Logical_clock() : state(SEQ_UNINIT), offset(0) {}
45
46 /**
47 Atomically fetch the current state.
48 @return not subtracted "absolute" value.
49 */
get_timestamp()50 inline int64 Logical_clock::get_timestamp() {
51 int64 retval = 0;
52 DBUG_TRACE;
53 retval = state.load();
54 return retval;
55 }
56
57 /**
58 Steps the absolute value of the clock (state) to return
59 an updated value.
60 The caller must be sure to call the method in no concurrent
61 execution context so either offset and state can't change.
62
63 @return incremented "absolute" value
64 */
step()65 inline int64 Logical_clock::step() {
66 static_assert(SEQ_UNINIT == 0, "");
67 DBUG_EXECUTE_IF("logical_clock_step_2", ++state;);
68 return ++state;
69 }
70
71 /**
72 To try setting the clock *forward*.
73 The clock does not change when the new value is in the past
74 which is reflected by the new value and by offset.
75 In other words the function main effects is described as
76 state= max(state, new_value).
77 Offset that exceeds the new value indicates the binary log rotation
78 to render such new value useless.
79
80 @param new_val a new value (offset included)
81 @return a (new) value of state member regardless whether it's changed or not.
82 */
set_if_greater(int64 new_val)83 inline int64 Logical_clock::set_if_greater(int64 new_val) {
84 int64 old_val = new_val - 1;
85 bool cas_rc;
86
87 DBUG_TRACE;
88
89 DBUG_ASSERT(new_val > 0);
90
91 if (new_val <= offset) {
92 /*
93 This function's invocation can be separated from the
94 transaction's flushing by few rotations. A late to log
95 transaction does not change the clock, similarly to how
96 its timestamps are handled at flushing.
97 */
98 return SEQ_UNINIT;
99 }
100
101 DBUG_ASSERT(new_val > 0);
102
103 while (
104 !(cas_rc = atomic_compare_exchange_strong(&state, &old_val, new_val)) &&
105 old_val < new_val) {
106 }
107
108 DBUG_ASSERT(state >= new_val); // setting can't be done to past
109
110 DBUG_ASSERT(cas_rc || old_val >= new_val);
111
112 return cas_rc ? new_val : old_val;
113 }
114
115 /*
116 Admin statements release metadata lock too earlier. It breaks the rule of lock
117 based logical clock. This function recognizes the statements.
118 */
is_trx_unsafe_for_parallel_slave(const THD * thd)119 static bool is_trx_unsafe_for_parallel_slave(const THD *thd) {
120 switch (thd->lex->sql_command) {
121 case SQLCOM_ANALYZE:
122 case SQLCOM_REPAIR:
123 case SQLCOM_OPTIMIZE:
124 case SQLCOM_CREATE_DB:
125 case SQLCOM_ALTER_DB:
126 case SQLCOM_DROP_DB:
127 return true;
128 case SQLCOM_ALTER_TABLE:
129 return thd->lex->alter_info->flags & Alter_info::ALTER_ADMIN_PARTITION;
130 default:
131 return false;
132 }
133 return false;
134 }
135
136 /**
137 Get the sequence_number for a transaction, and get the last_commit based
138 on parallel committing transactions.
139
140 @param[in] thd Current THD from which to extract trx context.
141 @param[in,out] sequence_number Sequence number of current transaction.
142 @param[in,out] commit_parent Commit_parent of current transaction,
143 pre-filled with the commit_parent calculated
144 by the logical clock logic.
145 */
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)146 void Commit_order_trx_dependency_tracker::get_dependency(THD *thd,
147 int64 &sequence_number,
148 int64 &commit_parent) {
149 Transaction_ctx *trn_ctx = thd->get_transaction();
150
151 DBUG_ASSERT(trn_ctx->sequence_number >
152 m_max_committed_transaction.get_offset());
153 /*
154 Prepare sequence_number and commit_parent relative to the current
155 binlog. This is done by subtracting the binlog's clock offset
156 from the values.
157
158 A transaction that commits after the binlog is rotated, can have a
159 commit parent in the previous binlog. In this case, subtracting
160 the offset from the sequence number results in a negative
161 number. The commit parent dependency gets lost in such
162 case. Therefore, we log the value SEQ_UNINIT in this case.
163 */
164 sequence_number =
165 trn_ctx->sequence_number - m_max_committed_transaction.get_offset();
166
167 if (trn_ctx->last_committed <= m_max_committed_transaction.get_offset())
168 commit_parent = SEQ_UNINIT;
169 else
170 commit_parent =
171 std::max(trn_ctx->last_committed, m_last_blocking_transaction) -
172 m_max_committed_transaction.get_offset();
173
174 if (is_trx_unsafe_for_parallel_slave(thd))
175 m_last_blocking_transaction = trn_ctx->sequence_number;
176 }
177
step()178 int64 Commit_order_trx_dependency_tracker::step() {
179 return m_transaction_counter.step();
180 }
181
rotate()182 void Commit_order_trx_dependency_tracker::rotate() {
183 m_max_committed_transaction.update_offset(
184 m_transaction_counter.get_timestamp());
185
186 m_transaction_counter.update_offset(m_transaction_counter.get_timestamp());
187 }
188
update_max_committed(int64 sequence_number)189 void Commit_order_trx_dependency_tracker::update_max_committed(
190 int64 sequence_number) {
191 mysql_mutex_assert_owner(&LOCK_slave_trans_dep_tracker);
192 m_max_committed_transaction.set_if_greater(sequence_number);
193 }
194
195 /**
196 Get the writeset dependencies of a transaction.
197 This takes the commit_parent that must be previously set using
198 Commit_order_trx_dependency_tracker and tries to make the commit_parent as
199 low as possible, using the writesets of each transaction.
200 The commit_parent returned depends on how many row hashes are stored in the
201 writeset_history, which is cleared once it reaches the user-defined maximum.
202
203 @param[in] thd Current THD from which to extract trx context.
204 @param[in,out] sequence_number Sequence number of current transaction.
205 @param[in,out] commit_parent Commit_parent of current transaction,
206 pre-filled with the commit_parent calculated by
207 Commit_order_trx_dependency_tracker to use when
208 the writeset commit_parent is not valid.
209 */
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)210 void Writeset_trx_dependency_tracker::get_dependency(THD *thd,
211 int64 &sequence_number,
212 int64 &commit_parent) {
213 Rpl_transaction_write_set_ctx *write_set_ctx =
214 thd->get_transaction()->get_transaction_write_set_ctx();
215 std::vector<uint64> *writeset = write_set_ctx->get_write_set();
216
217 #ifndef DBUG_OFF
218 /* The writeset of an empty transaction must be empty. */
219 if (is_empty_transaction_in_binlog_cache(thd))
220 DBUG_ASSERT(writeset->size() == 0);
221 #endif
222
223 /*
224 Check if this transaction has a writeset, if the writeset will overflow the
225 history size, if the transaction_write_set_extraction is consistent
226 between session and global or if changes in the tables referenced in this
227 transaction cascade to other tables. If that happens revert to using the
228 COMMIT_ORDER and clear the history to keep data consistent.
229 */
230 bool can_use_writesets =
231 // empty writeset implies DDL or similar, except if there are missing keys
232 (writeset->size() != 0 || write_set_ctx->get_has_missing_keys() ||
233 /*
234 The empty transactions do not need to clear the writeset history, since
235 they can be executed in parallel.
236 */
237 is_empty_transaction_in_binlog_cache(thd)) &&
238 // hashing algorithm for the session must be the same as used by other
239 // rows in history
240 (global_system_variables.transaction_write_set_extraction ==
241 thd->variables.transaction_write_set_extraction) &&
242 // must not use foreign keys
243 !write_set_ctx->get_has_related_foreign_keys();
244 bool exceeds_capacity = false;
245
246 mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
247 if (can_use_writesets) {
248 /*
249 Check if adding this transaction exceeds the capacity of the writeset
250 history. If that happens, m_writeset_history will be cleared only after
251 using its information for current transaction.
252 */
253 exceeds_capacity =
254 m_writeset_history.size() + writeset->size() > m_opt_max_history_size;
255
256 /*
257 Compute the greatest sequence_number among all conflicts and add the
258 transaction's row hashes to the history.
259 */
260 DEBUG_SYNC(thd, "wait_in_get_dependency");
261 int64 last_parent = m_writeset_history_start;
262 for (std::vector<uint64>::iterator it = writeset->begin();
263 it != writeset->end(); ++it) {
264 Writeset_history::iterator hst = m_writeset_history.find(*it);
265 if (hst != m_writeset_history.end()) {
266 if (hst->second > last_parent && hst->second < sequence_number)
267 last_parent = hst->second;
268
269 hst->second = sequence_number;
270 } else {
271 if (!exceeds_capacity)
272 m_writeset_history.insert(
273 std::pair<uint64, int64>(*it, sequence_number));
274 }
275 }
276
277 /*
278 If the transaction references tables with missing primary keys revert to
279 COMMIT_ORDER, update and not reset history, as it is unnecessary because
280 any transaction that refers this table will also revert to COMMIT_ORDER.
281 */
282 if (!write_set_ctx->get_has_missing_keys()) {
283 /*
284 The WRITESET commit_parent then becomes the minimum of largest parent
285 found using the hashes of the row touched by the transaction and the
286 commit parent calculated with COMMIT_ORDER.
287 */
288 commit_parent = std::min(last_parent, commit_parent);
289 }
290 }
291
292 if (exceeds_capacity || !can_use_writesets) {
293 m_writeset_history_start = sequence_number;
294 m_writeset_history.clear();
295 }
296 mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
297 }
298
rotate(int64 start)299 void Writeset_trx_dependency_tracker::rotate(int64 start) {
300 m_writeset_history_start = start;
301 m_writeset_history.clear();
302 }
303
304 /**
305 Get the writeset commit parent of transactions using the session dependencies.
306
307 @param[in] thd Current THD from which to extract trx context.
308 @param[in,out] sequence_number Sequence number of current transaction.
309 @param[in,out] commit_parent Commit_parent of current transaction,
310 pre-filled with the commit_parent calculated
311 by the Write_set_trx_dependency_tracker as a
312 fall-back.
313 */
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)314 void Writeset_session_trx_dependency_tracker::get_dependency(
315 THD *thd, int64 &sequence_number, int64 &commit_parent) {
316 int64 session_parent = thd->rpl_thd_ctx.dependency_tracker_ctx()
317 .get_last_session_sequence_number();
318
319 if (session_parent != 0 && session_parent < sequence_number)
320 commit_parent = std::max(commit_parent, session_parent);
321
322 thd->rpl_thd_ctx.dependency_tracker_ctx().set_last_session_sequence_number(
323 sequence_number);
324 }
325
326 /**
327 Get the dependencies in a transaction, the main entry point for the
328 dependency tracking work.
329 */
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)330 void Transaction_dependency_tracker::get_dependency(THD *thd,
331 int64 &sequence_number,
332 int64 &commit_parent) {
333 sequence_number = commit_parent = 0;
334
335 switch (m_opt_tracking_mode.load(std::memory_order_relaxed)) {
336 case DEPENDENCY_TRACKING_COMMIT_ORDER:
337 m_commit_order.get_dependency(thd, sequence_number, commit_parent);
338 break;
339 case DEPENDENCY_TRACKING_WRITESET:
340 m_commit_order.get_dependency(thd, sequence_number, commit_parent);
341 m_writeset.get_dependency(thd, sequence_number, commit_parent);
342 break;
343 case DEPENDENCY_TRACKING_WRITESET_SESSION:
344 m_commit_order.get_dependency(thd, sequence_number, commit_parent);
345 m_writeset.get_dependency(thd, sequence_number, commit_parent);
346 m_writeset_session.get_dependency(thd, sequence_number, commit_parent);
347 break;
348 default:
349 DBUG_ASSERT(0); // blow up on debug
350 /*
351 Fallback to commit order on production builds.
352 */
353 m_commit_order.get_dependency(thd, sequence_number, commit_parent);
354 }
355 }
356
tracking_mode_changed()357 void Transaction_dependency_tracker::tracking_mode_changed() {
358 Logical_clock max_committed_transaction =
359 m_commit_order.get_max_committed_transaction();
360 int64 timestamp = max_committed_transaction.get_timestamp() -
361 max_committed_transaction.get_offset();
362
363 m_writeset.rotate(timestamp);
364 }
365
366 /**
367 The method is to be executed right before committing time.
368 It must be invoked even if the transaction does not commit
369 to engine being merely logged into the binary log.
370 max_committed_transaction is updated with a greater timestamp
371 value.
372 As a side effect, the transaction context's sequence_number
373 is reset.
374
375 @param thd a pointer to THD instance
376 */
update_max_committed(THD * thd)377 void Transaction_dependency_tracker::update_max_committed(THD *thd) {
378 Transaction_ctx *trn_ctx = thd->get_transaction();
379 m_commit_order.update_max_committed(trn_ctx->sequence_number);
380 /*
381 sequence_number timestamp isn't needed anymore, so it's cleared off.
382 */
383 trn_ctx->sequence_number = SEQ_UNINIT;
384
385 DBUG_ASSERT(trn_ctx->last_committed == SEQ_UNINIT ||
386 thd->commit_error == THD::CE_FLUSH_ERROR);
387 }
388
step()389 int64 Transaction_dependency_tracker::step() { return m_commit_order.step(); }
390
rotate()391 void Transaction_dependency_tracker::rotate() {
392 m_commit_order.rotate();
393 /*
394 To make slave appliers be able to execute transactions in parallel
395 after rotation, set the minimum commit_parent to 1 after rotation.
396 */
397 m_writeset.rotate(1);
398 if (current_thd) current_thd->get_transaction()->sequence_number = 2;
399 }
400
get_max_committed_timestamp()401 int64 Transaction_dependency_tracker::get_max_committed_timestamp() {
402 return m_commit_order.get_max_committed_transaction().get_timestamp();
403 }
404