1 /* Copyright (c) 2018, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include "rpl_trx_tracking.h"
24
25 #include "mysqld.h"
26 #include "binlog.h"
27
28 #include "debug_sync.h"
29
Logical_clock()30 Logical_clock::Logical_clock()
31 : state(SEQ_UNINIT), offset(0)
32 {}
33
34 /**
35 Atomically fetch the current state.
36 @return not subtracted "absolute" value.
37 */
get_timestamp()38 inline int64 Logical_clock::get_timestamp()
39 {
40 int64 retval= 0;
41 DBUG_ENTER("Logical_clock::get_timestamp");
42 retval= my_atomic_load64(&state);
43 DBUG_RETURN(retval);
44 }
45
46 /**
47 Steps the absolute value of the clock (state) to return
48 an updated value.
49 The caller must be sure to call the method in no concurrent
50 execution context so either offset and state can't change.
51
52 @return incremented "absolute" value
53 */
step()54 inline int64 Logical_clock::step()
55 {
56 assert(SEQ_UNINIT == 0);
57 DBUG_EXECUTE_IF("logical_clock_step_2", ++state;);
58 return ++state;
59 }
60
61 /**
62 To try setting the clock *forward*.
63 The clock does not change when the new value is in the past
64 which is reflected by the new value and by offset.
65 In other words the function main effects is described as
66 state= max(state, new_value).
67 Offset that exceeds the new value indicates the binary log rotation
68 to render such new value useless.
69
70 @param new_val a new value (offset included)
71 @return a (new) value of state member regardless whether it's changed or not.
72 */
set_if_greater(int64 new_val)73 inline int64 Logical_clock::set_if_greater(int64 new_val)
74 {
75 longlong old_val= new_val - 1;
76 bool cas_rc;
77
78 DBUG_ENTER("Logical_clock::set_if_greater");
79
80 assert(new_val > 0);
81
82 if (new_val <= offset)
83 {
84 /*
85 This function's invocation can be separated from the
86 transaction's flushing by few rotations. A late to log
87 transaction does not change the clock, similarly to how
88 its timestamps are handled at flushing.
89 */
90 DBUG_RETURN(SEQ_UNINIT);
91 }
92
93 assert(new_val > 0);
94
95 while (!(cas_rc= my_atomic_cas64(&state, &old_val, new_val)) &&
96 old_val < new_val)
97 {}
98
99 assert(state >= new_val); // setting can't be done to past
100
101 assert(cas_rc || old_val >= new_val);
102
103 DBUG_RETURN(cas_rc ? new_val : old_val);
104 }
105
106
107 /**
108 Get the sequence_number for a transaction, and get the last_commit based
109 on parallel committing transactions.
110
111 @param[in] thd Current THD from which to extract trx context.
112 @param[in,out] sequence_number Sequence number of current transaction.
113 @param[in,out] commit_parent Commit_parent of current transaction,
114 pre-filled with the commit_parent calculated
115 by the logical clock logic.
116 */
117 void
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)118 Commit_order_trx_dependency_tracker::get_dependency(THD *thd,
119 int64 &sequence_number,
120 int64 &commit_parent)
121 {
122 Transaction_ctx *trn_ctx= thd->get_transaction();
123
124 assert(trn_ctx->sequence_number
125 > m_max_committed_transaction.get_offset());
126 /*
127 Prepare sequence_number and commit_parent relative to the current
128 binlog. This is done by subtracting the binlog's clock offset
129 from the values.
130
131 A transaction that commits after the binlog is rotated, can have a
132 commit parent in the previous binlog. In this case, subtracting
133 the offset from the sequence number results in a negative
134 number. The commit parent dependency gets lost in such
135 case. Therefore, we log the value SEQ_UNINIT in this case.
136 */
137 sequence_number=
138 trn_ctx->sequence_number - m_max_committed_transaction.get_offset();
139
140 commit_parent=
141 trn_ctx->last_committed <= m_max_committed_transaction.get_offset()
142 ? SEQ_UNINIT
143 : trn_ctx->last_committed - m_max_committed_transaction.get_offset();
144 }
145
146 int64
step()147 Commit_order_trx_dependency_tracker::step()
148 {
149 return m_transaction_counter.step();
150 }
151
152 void
rotate()153 Commit_order_trx_dependency_tracker::rotate()
154 {
155 m_max_committed_transaction.
156 update_offset(m_transaction_counter.get_timestamp());
157
158 m_transaction_counter.
159 update_offset(m_transaction_counter.get_timestamp());
160 }
161
162 void
update_max_committed(int64 sequence_number)163 Commit_order_trx_dependency_tracker::update_max_committed(int64 sequence_number)
164 {
165 mysql_mutex_assert_owner(&LOCK_slave_trans_dep_tracker);
166 m_max_committed_transaction.set_if_greater(sequence_number);
167 }
168
169 /**
170 Get the writeset dependencies of a transaction.
171 This takes the commit_parent that must be previously set using
172 Commit_order_trx_dependency_tracker and tries to make the commit_parent as
173 low as possible, using the writesets of each transaction.
174 The commit_parent returned depends on how many row hashes are stored in the
175 writeset_history, which is cleared once it reaches the user-defined maximum.
176
177 @param[in] thd Current THD from which to extract trx context.
178 @param[in,out] sequence_number Sequence number of current transaction.
179 @param[in,out] commit_parent Commit_parent of current transaction,
180 pre-filled with the commit_parent calculated by
181 Commit_order_trx_dependency_tracker to use when
182 the writeset commit_parent is not valid.
183 */
184 void
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)185 Writeset_trx_dependency_tracker::get_dependency(THD *thd,
186 int64 &sequence_number,
187 int64 &commit_parent)
188 {
189 Rpl_transaction_write_set_ctx *write_set_ctx=
190 thd->get_transaction()->get_transaction_write_set_ctx();
191 std::set<uint64> *writeset= write_set_ctx->get_write_set();
192
193 #ifndef NDEBUG
194 /* The writeset of an empty transaction must be empty. */
195 if (is_empty_transaction_in_binlog_cache(thd))
196 assert(writeset->size() == 0);
197 #endif
198
199 /*
200 Check if this transaction has a writeset, if the writeset will overflow the
201 history size, if the transaction_write_set_extraction is consistent
202 between session and global or if changes in the tables referenced in this
203 transaction cascade to other tables. If that happens revert to using the
204 COMMIT_ORDER and clear the history to keep data consistent.
205 */
206 bool can_use_writesets=
207 // empty writeset implies DDL or similar, except if there are missing keys
208 (writeset->size() != 0 || write_set_ctx->get_has_missing_keys() ||
209 /*
210 The empty transactions do not need to clear the writeset history, since
211 they can be executed in parallel.
212 */
213 is_empty_transaction_in_binlog_cache(thd)) &&
214 // hashing algorithm for the session must be the same as used by other rows in history
215 (global_system_variables.transaction_write_set_extraction ==
216 thd->variables.transaction_write_set_extraction) &&
217 // must not use foreign keys
218 !write_set_ctx->get_has_related_foreign_keys() &&
219 // it did not broke past the capacity already
220 !write_set_ctx->was_write_set_limit_reached();
221 bool exceeds_capacity= false;
222
223 mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
224 if (can_use_writesets)
225 {
226 /*
227 Check if adding this transaction exceeds the capacity of the writeset
228 history. If that happens, m_writeset_history will be cleared only after
229 using its information for current transaction.
230 */
231 exceeds_capacity=
232 m_writeset_history.size() + writeset->size() > get_opt_max_history_size();
233
234 /*
235 Compute the greatest sequence_number among all conflicts and add the
236 transaction's row hashes to the history.
237 */
238 DEBUG_SYNC(thd, "wait_in_get_dependency");
239 int64 last_parent= m_writeset_history_start;
240 for (std::set<uint64>::iterator it= writeset->begin();
241 it != writeset->end(); ++it)
242 {
243 Writeset_history::iterator hst= m_writeset_history.find(*it);
244 if (hst != m_writeset_history.end())
245 {
246 if (hst->second > last_parent && hst->second < sequence_number)
247 last_parent= hst->second;
248
249 hst->second= sequence_number;
250 }
251 else
252 {
253 if (!exceeds_capacity)
254 m_writeset_history.insert(std::pair<uint64, int64>(*it, sequence_number));
255 }
256 }
257
258 /*
259 If the transaction references tables with missing primary keys revert to
260 COMMIT_ORDER, update and not reset history, as it is unnecessary because
261 any transaction that refers this table will also revert to COMMIT_ORDER.
262 */
263 if (!write_set_ctx->get_has_missing_keys())
264 {
265 /*
266 The WRITESET commit_parent then becomes the minimum of largest parent
267 found using the hashes of the row touched by the transaction and the
268 commit parent calculated with COMMIT_ORDER.
269 */
270 commit_parent= std::min(last_parent, commit_parent);
271 }
272 }
273
274 if (exceeds_capacity || !can_use_writesets)
275 {
276 m_writeset_history_start= sequence_number;
277 m_writeset_history.clear();
278 }
279 mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
280 }
281
282 void
rotate(int64 start)283 Writeset_trx_dependency_tracker::rotate(int64 start)
284 {
285 m_writeset_history_start= start;
286 m_writeset_history.clear();
287 }
288
289 /**
290 Get the writeset commit parent of transactions using the session dependencies.
291
292 @param[in] thd Current THD from which to extract trx context.
293 @param[in,out] sequence_number Sequence number of current transaction.
294 @param[in,out] commit_parent Commit_parent of current transaction,
295 pre-filled with the commit_parent calculated
296 by the Write_set_trx_dependency_tracker as a
297 fall-back.
298 */
299 void
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)300 Writeset_session_trx_dependency_tracker::get_dependency(THD *thd,
301 int64 &sequence_number,
302 int64 &commit_parent)
303 {
304 int64 session_parent= thd->rpl_thd_ctx.dependency_tracker_ctx().
305 get_last_session_sequence_number();
306
307 if (session_parent != 0 && session_parent < sequence_number)
308 commit_parent= std::max(commit_parent, session_parent);
309
310 thd->rpl_thd_ctx.dependency_tracker_ctx().
311 set_last_session_sequence_number(sequence_number);
312 }
313
314 /**
315 Get the dependencies in a transaction, the main entry point for the
316 dependency tracking work.
317 */
318 void
get_dependency(THD * thd,int64 & sequence_number,int64 & commit_parent)319 Transaction_dependency_tracker::get_dependency(THD *thd,
320 int64 &sequence_number,
321 int64 &commit_parent)
322 {
323 sequence_number= commit_parent= 0;
324
325 switch(my_atomic_load64(&m_opt_tracking_mode))
326 {
327 case DEPENDENCY_TRACKING_COMMIT_ORDER:
328 m_commit_order.get_dependency(thd, sequence_number, commit_parent);
329 break;
330 case DEPENDENCY_TRACKING_WRITESET:
331 m_commit_order.get_dependency(thd, sequence_number, commit_parent);
332 m_writeset.get_dependency(thd, sequence_number, commit_parent);
333 break;
334 case DEPENDENCY_TRACKING_WRITESET_SESSION:
335 m_commit_order.get_dependency(thd, sequence_number, commit_parent);
336 m_writeset.get_dependency(thd, sequence_number, commit_parent);
337 m_writeset_session.get_dependency(thd, sequence_number, commit_parent);
338 break;
339 default:
340 assert(0); // blow up on debug
341 /*
342 Fallback to commit order on production builds.
343 */
344 m_commit_order.get_dependency(thd, sequence_number, commit_parent);
345 }
346 }
347
348 void
tracking_mode_changed()349 Transaction_dependency_tracker::tracking_mode_changed()
350 {
351 Logical_clock max_committed_transaction=
352 m_commit_order.get_max_committed_transaction();
353 int64 timestamp= max_committed_transaction.get_timestamp()
354 - max_committed_transaction.get_offset();
355
356 m_writeset.rotate(timestamp);
357 }
358
359 /**
360 The method is to be executed right before committing time.
361 It must be invoked even if the transaction does not commit
362 to engine being merely logged into the binary log.
363 max_committed_transaction is updated with a greater timestamp
364 value.
365 As a side effect, the transaction context's sequence_number
366 is reset.
367
368 @param thd a pointer to THD instance
369 */
370 void
update_max_committed(THD * thd)371 Transaction_dependency_tracker::update_max_committed(THD *thd)
372 {
373 Transaction_ctx *trn_ctx= thd->get_transaction();
374 m_commit_order.update_max_committed(trn_ctx->sequence_number);
375 /*
376 sequence_number timestamp isn't needed anymore, so it's cleared off.
377 */
378 trn_ctx->sequence_number= SEQ_UNINIT;
379
380 assert(trn_ctx->last_committed == SEQ_UNINIT ||
381 thd->commit_error == THD::CE_FLUSH_ERROR ||
382 thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR);
383 }
384
385 int64
step()386 Transaction_dependency_tracker::step()
387 {
388 return m_commit_order.step();
389 }
390
391 void
rotate()392 Transaction_dependency_tracker::rotate()
393 {
394 m_commit_order.rotate();
395 /*
396 To make slave appliers be able to execute transactions in parallel
397 after rotation, set the minimum commit_parent to 1 after rotation.
398 */
399 m_writeset.rotate(1);
400 if (current_thd)
401 current_thd->get_transaction()->sequence_number= 2;
402 }
403
get_max_committed_timestamp()404 int64 Transaction_dependency_tracker::get_max_committed_timestamp()
405 {
406 return m_commit_order.get_max_committed_transaction().get_timestamp();
407 }
408
409
410