1 /* Copyright 2016-2019 Codership Oy <http://www.codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
15
16 #ifndef WSREP_TRANS_OBSERVER_H
17 #define WSREP_TRANS_OBSERVER_H
18
19 #include "my_global.h"
20 #include "mysql/service_wsrep.h"
21 #include "wsrep_applier.h" /* wsrep_apply_error */
22 #include "wsrep_xid.h"
23 #include "wsrep_thd.h"
24 #include "wsrep_binlog.h" /* register/deregister group commit */
25 #include "my_dbug.h"
26
27 class THD;
28
29 /*
30 Return true if THD has active wsrep transaction.
31 */
wsrep_is_active(THD * thd)32 static inline bool wsrep_is_active(THD* thd)
33 {
34 return (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
35 thd->wsrep_cs().transaction().active());
36 }
37
38 /*
39 Return true if transaction is ordered.
40 */
wsrep_is_ordered(THD * thd)41 static inline bool wsrep_is_ordered(THD* thd)
42 {
43 return thd->wsrep_trx().ordered();
44 }
45
46 /*
47 Return true if transaction has been BF aborted but has not been
48 rolled back yet.
49
50 It is required that the caller holds thd->LOCK_thd_data.
51 */
wsrep_must_abort(THD * thd)52 static inline bool wsrep_must_abort(THD* thd)
53 {
54 mysql_mutex_assert_owner(&thd->LOCK_thd_data);
55 return (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort);
56 }
57
58 /*
59 Return true if the transaction must be replayed.
60 */
wsrep_must_replay(THD * thd)61 static inline bool wsrep_must_replay(THD* thd)
62 {
63 return (thd->wsrep_trx().state() == wsrep::transaction::s_must_replay);
64 }
65 /*
66 Return true if transaction has not been committed.
67
68 Note that we don't require thd->LOCK_thd_data here. Calling this method
69 makes sense only from codepaths which are past ordered_commit state
70 and the wsrep transaction is immune to BF aborts at that point.
71 */
wsrep_not_committed(THD * thd)72 static inline bool wsrep_not_committed(THD* thd)
73 {
74 return (thd->wsrep_trx().state() != wsrep::transaction::s_committed);
75 }
76
77 /*
78 Return true if THD is either committing a transaction or statement
79 is autocommit.
80 */
wsrep_is_real(THD * thd,bool all)81 static inline bool wsrep_is_real(THD* thd, bool all)
82 {
83 return (all || thd->transaction.all.ha_list == 0);
84 }
85
86 /*
87 Check if a transaction has generated changes.
88 */
wsrep_has_changes(THD * thd)89 static inline bool wsrep_has_changes(THD* thd)
90 {
91 return (thd->wsrep_trx().is_empty() == false);
92 }
93
94 /*
95 Check if an active transaction has been BF aborted.
96 */
wsrep_is_bf_aborted(THD * thd)97 static inline bool wsrep_is_bf_aborted(THD* thd)
98 {
99 return (thd->wsrep_trx().active() && thd->wsrep_trx().bf_aborted());
100 }
101
wsrep_check_pk(THD * thd)102 static inline int wsrep_check_pk(THD* thd)
103 {
104 if (!wsrep_certify_nonPK)
105 {
106 for (TABLE* table= thd->open_tables; table != NULL; table= table->next)
107 {
108 if (table->key_info == NULL || table->s->primary_key == MAX_KEY)
109 {
110 WSREP_DEBUG("No primary key found for table %s.%s",
111 table->s->db.str, table->s->table_name.str);
112 wsrep_override_error(thd, ER_LOCK_DEADLOCK);
113 return 1;
114 }
115 }
116 }
117 return 0;
118 }
119
wsrep_streaming_enabled(THD * thd)120 static inline bool wsrep_streaming_enabled(THD* thd)
121 {
122 return (thd->wsrep_sr().fragment_size() > 0);
123 }
124
125 /*
126 Return number of fragments succesfully certified for the
127 current statement.
128 */
wsrep_fragments_certified_for_stmt(THD * thd)129 static inline size_t wsrep_fragments_certified_for_stmt(THD* thd)
130 {
131 return thd->wsrep_trx().fragments_certified_for_statement();
132 }
133
wsrep_start_transaction(THD * thd,wsrep_trx_id_t trx_id)134 static inline int wsrep_start_transaction(THD* thd, wsrep_trx_id_t trx_id)
135 {
136 if (thd->wsrep_cs().state() != wsrep::client_state::s_none) {
137 if (wsrep_is_active(thd) == false)
138 return thd->wsrep_cs().start_transaction(wsrep::transaction_id(trx_id));
139 }
140 return 0;
141 }
142
143 /**/
wsrep_start_trx_if_not_started(THD * thd)144 static inline int wsrep_start_trx_if_not_started(THD* thd)
145 {
146 int ret= 0;
147 DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID);
148 DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_local);
149 if (thd->wsrep_trx().active() == false)
150 {
151 ret= wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
152 }
153 return ret;
154 }
155
156 /*
157 Called after each row operation.
158
159 Return zero on succes, non-zero on failure.
160 */
wsrep_after_row_internal(THD * thd)161 static inline int wsrep_after_row_internal(THD* thd)
162 {
163 if (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
164 wsrep_thd_is_local(thd))
165 {
166 if (wsrep_check_pk(thd))
167 {
168 return 1;
169 }
170 else if (wsrep_streaming_enabled(thd))
171 {
172 return thd->wsrep_cs().after_row();
173 }
174 }
175 return 0;
176 }
177
178 /*
179 Helper method to determine whether commit time hooks
180 should be run for the transaction.
181
182 Commit hooks must be run in the following cases:
183 - The transaction is local and has generated write set and is committing.
184 - The transaction has been BF aborted
185 - Is running in high priority mode and is ordered. This can be replayer,
186 applier or storage access.
187 */
wsrep_run_commit_hook(THD * thd,bool all)188 static inline bool wsrep_run_commit_hook(THD* thd, bool all)
189 {
190 DBUG_ENTER("wsrep_run_commit_hook");
191 DBUG_PRINT("wsrep", ("Is_active: %d is_real %d has_changes %d is_applying %d "
192 "is_ordered: %d",
193 wsrep_is_active(thd), wsrep_is_real(thd, all),
194 wsrep_has_changes(thd), wsrep_thd_is_applying(thd),
195 wsrep_is_ordered(thd)));
196 /* Is MST commit or autocommit? */
197 bool ret= wsrep_is_active(thd) && wsrep_is_real(thd, all);
198 /* Do not commit if we are aborting */
199 ret= ret && (thd->wsrep_trx().state() != wsrep::transaction::s_aborting);
200 if (ret && !(wsrep_has_changes(thd) || /* Has generated write set */
201 /* Is high priority (replay, applier, storage) and the
202 transaction is scheduled for commit ordering */
203 (wsrep_thd_is_applying(thd) && wsrep_is_ordered(thd))))
204 {
205 mysql_mutex_lock(&thd->LOCK_thd_data);
206 DBUG_PRINT("wsrep", ("state: %s",
207 wsrep::to_c_string(thd->wsrep_trx().state())));
208 /* Transaction is local but has no changes, the commit hooks will
209 be skipped and the wsrep transaction is terminated in
210 wsrep_commit_empty() */
211 if (thd->wsrep_trx().state() == wsrep::transaction::s_executing)
212 {
213 ret= false;
214 }
215 mysql_mutex_unlock(&thd->LOCK_thd_data);
216 }
217 DBUG_PRINT("wsrep", ("return: %d", ret));
218 DBUG_RETURN(ret);
219 }
220
221 /*
222 Called before the transaction is prepared.
223
224 Return zero on succes, non-zero on failure.
225 */
wsrep_before_prepare(THD * thd,bool all)226 static inline int wsrep_before_prepare(THD* thd, bool all)
227 {
228 DBUG_ENTER("wsrep_before_prepare");
229 WSREP_DEBUG("wsrep_before_prepare: %d", wsrep_is_real(thd, all));
230 int ret= 0;
231 DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
232 if ((ret= thd->wsrep_cs().before_prepare()) == 0)
233 {
234 DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
235 wsrep_xid_init(&thd->wsrep_xid,
236 thd->wsrep_trx().ws_meta().gtid());
237 }
238 DBUG_RETURN(ret);
239 }
240
241 /*
242 Called after the transaction has been prepared.
243
244 Return zero on succes, non-zero on failure.
245 */
wsrep_after_prepare(THD * thd,bool all)246 static inline int wsrep_after_prepare(THD* thd, bool all)
247 {
248 DBUG_ENTER("wsrep_after_prepare");
249 WSREP_DEBUG("wsrep_after_prepare: %d", wsrep_is_real(thd, all));
250 DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
251 int ret= thd->wsrep_cs().after_prepare();
252 DBUG_ASSERT(ret == 0 || thd->wsrep_cs().current_error() ||
253 thd->wsrep_cs().transaction().state() == wsrep::transaction::s_must_replay);
254 DBUG_RETURN(ret);
255 }
256
257 /*
258 Called before the transaction is committed.
259
260 This function must be called from both client and
261 applier contexts before commit.
262
263 Return zero on succes, non-zero on failure.
264 */
wsrep_before_commit(THD * thd,bool all)265 static inline int wsrep_before_commit(THD* thd, bool all)
266 {
267 DBUG_ENTER("wsrep_before_commit");
268 WSREP_DEBUG("wsrep_before_commit: %d, %lld",
269 wsrep_is_real(thd, all),
270 (long long)wsrep_thd_trx_seqno(thd));
271 int ret= 0;
272 DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
273 if ((ret= thd->wsrep_cs().before_commit()) == 0)
274 {
275 DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
276 wsrep_xid_init(&thd->wsrep_xid,
277 thd->wsrep_trx().ws_meta().gtid());
278 wsrep_register_for_group_commit(thd);
279 }
280 DBUG_RETURN(ret);
281 }
282
283 /*
284 Called after the transaction has been ordered for commit.
285
286 This function must be called from both client and
287 applier contexts after the commit has been ordered.
288
289 @param thd Pointer to THD
290 @param all
291 @param err Error buffer in case of applying error
292
293 Return zero on succes, non-zero on failure.
294 */
wsrep_ordered_commit(THD * thd,bool all,const wsrep_apply_error &)295 static inline int wsrep_ordered_commit(THD* thd,
296 bool all,
297 const wsrep_apply_error&)
298 {
299 DBUG_ENTER("wsrep_ordered_commit");
300 WSREP_DEBUG("wsrep_ordered_commit: %d", wsrep_is_real(thd, all));
301 DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
302 DBUG_RETURN(thd->wsrep_cs().ordered_commit());
303 }
304
305 /*
306 Called after the transaction has been committed.
307
308 Return zero on succes, non-zero on failure.
309 */
wsrep_after_commit(THD * thd,bool all)310 static inline int wsrep_after_commit(THD* thd, bool all)
311 {
312 DBUG_ENTER("wsrep_after_commit");
313 WSREP_DEBUG("wsrep_after_commit: %d, %d, %lld, %d",
314 wsrep_is_real(thd, all),
315 wsrep_is_active(thd),
316 (long long)wsrep_thd_trx_seqno(thd),
317 wsrep_has_changes(thd));
318 DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
319 int ret= 0;
320 if (thd->wsrep_trx().state() == wsrep::transaction::s_committing)
321 {
322 ret= thd->wsrep_cs().ordered_commit();
323 }
324 wsrep_unregister_from_group_commit(thd);
325 thd->wsrep_xid.null();
326 DBUG_RETURN(ret || thd->wsrep_cs().after_commit());
327 }
328
329 /*
330 Called before the transaction is rolled back.
331
332 Return zero on succes, non-zero on failure.
333 */
wsrep_before_rollback(THD * thd,bool all)334 static inline int wsrep_before_rollback(THD* thd, bool all)
335 {
336 DBUG_ENTER("wsrep_before_rollback");
337 int ret= 0;
338 if (wsrep_is_active(thd))
339 {
340 if (!all && thd->in_active_multi_stmt_transaction())
341 {
342 if (wsrep_emulate_bin_log)
343 {
344 wsrep_thd_binlog_stmt_rollback(thd);
345 }
346
347 if (thd->wsrep_trx().is_streaming() &&
348 (wsrep_fragments_certified_for_stmt(thd) > 0))
349 {
350 /* Non-safe statement rollback during SR multi statement
351 transaction. A statement rollback is considered unsafe, if
352 the same statement has already replicated one or more fragments.
353 Self abort the transaction, the actual rollback and error
354 handling will be done in after statement phase. */
355 WSREP_DEBUG("statement rollback is not safe for streaming replication");
356 wsrep_thd_self_abort(thd);
357 ret= 0;
358 }
359 }
360 else if (wsrep_is_real(thd, all) &&
361 thd->wsrep_trx().state() != wsrep::transaction::s_aborted)
362 {
363 /* Real transaction rolling back and wsrep abort not completed
364 yet */
365 /* Reset XID so that it does not trigger writing serialization
366 history in InnoDB. This needs to be avoided because rollback
367 may happen out of order and replay may follow. */
368 thd->wsrep_xid.null();
369 ret= thd->wsrep_cs().before_rollback();
370 }
371 }
372 DBUG_RETURN(ret);
373 }
374
375 /*
376 Called after the transaction has been rolled back.
377
378 Return zero on succes, non-zero on failure.
379 */
wsrep_after_rollback(THD * thd,bool all)380 static inline int wsrep_after_rollback(THD* thd, bool all)
381 {
382 DBUG_ENTER("wsrep_after_rollback");
383 DBUG_RETURN((wsrep_is_real(thd, all) && wsrep_is_active(thd) &&
384 thd->wsrep_cs().transaction().state() !=
385 wsrep::transaction::s_aborted) ?
386 thd->wsrep_cs().after_rollback() : 0);
387 }
388
wsrep_before_statement(THD * thd)389 static inline int wsrep_before_statement(THD* thd)
390 {
391 return (thd->wsrep_cs().state() != wsrep::client_state::s_none ?
392 thd->wsrep_cs().before_statement() : 0);
393 }
394
395 static inline
wsrep_after_statement(THD * thd)396 int wsrep_after_statement(THD* thd)
397 {
398 DBUG_ENTER("wsrep_after_statement");
399 WSREP_DEBUG("wsrep_after_statement for %lu client_state %s "
400 " client_mode %s trans_state %s",
401 thd_get_thread_id(thd),
402 wsrep::to_c_string(thd->wsrep_cs().state()),
403 wsrep::to_c_string(thd->wsrep_cs().mode()),
404 wsrep::to_c_string(thd->wsrep_cs().transaction().state()));
405 DBUG_RETURN((thd->wsrep_cs().state() != wsrep::client_state::s_none &&
406 thd->wsrep_cs().mode() == Wsrep_client_state::m_local) ?
407 thd->wsrep_cs().after_statement() : 0);
408 }
409
wsrep_after_apply(THD * thd)410 static inline void wsrep_after_apply(THD* thd)
411 {
412 DBUG_ASSERT(wsrep_thd_is_applying(thd));
413 WSREP_DEBUG("wsrep_after_apply %lld", thd->thread_id);
414 thd->wsrep_cs().after_applying();
415 }
416
wsrep_open(THD * thd)417 static inline void wsrep_open(THD* thd)
418 {
419 DBUG_ENTER("wsrep_open");
420 if (WSREP_ON_)
421 {
422 /* WSREP_PROVIDER_EXISTS_ cannot be set if WSREP_ON_ is not set */
423 DBUG_ASSERT(WSREP_PROVIDER_EXISTS_);
424 thd->wsrep_cs().open(wsrep::client_id(thd->thread_id));
425 thd->wsrep_cs().debug_log_level(wsrep_debug);
426 if (!thd->wsrep_applier && thd->variables.wsrep_trx_fragment_size)
427 {
428 thd->wsrep_cs().enable_streaming(
429 wsrep_fragment_unit(thd->variables.wsrep_trx_fragment_unit),
430 size_t(thd->variables.wsrep_trx_fragment_size));
431 }
432 }
433 DBUG_VOID_RETURN;
434 }
435
wsrep_close(THD * thd)436 static inline void wsrep_close(THD* thd)
437 {
438 DBUG_ENTER("wsrep_close");
439 if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
440 {
441 thd->wsrep_cs().close();
442 }
443 DBUG_VOID_RETURN;
444 }
445
wsrep_cleanup(THD * thd)446 static inline void wsrep_cleanup(THD* thd)
447 {
448 DBUG_ENTER("wsrep_cleanup");
449 if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
450 {
451 thd->wsrep_cs().cleanup();
452 }
453 DBUG_VOID_RETURN;
454 }
455
456 static inline void
wsrep_wait_rollback_complete_and_acquire_ownership(THD * thd)457 wsrep_wait_rollback_complete_and_acquire_ownership(THD *thd)
458 {
459 DBUG_ENTER("wsrep_wait_rollback_complete_and_acquire_ownership");
460 if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
461 {
462 thd->wsrep_cs().wait_rollback_complete_and_acquire_ownership();
463 }
464 DBUG_VOID_RETURN;
465 }
466
wsrep_before_command(THD * thd,bool keep_command_error)467 static inline int wsrep_before_command(THD* thd, bool keep_command_error)
468 {
469 return (thd->wsrep_cs().state() != wsrep::client_state::s_none ?
470 thd->wsrep_cs().before_command(keep_command_error) : 0);
471 }
472
wsrep_before_command(THD * thd)473 static inline int wsrep_before_command(THD* thd)
474 {
475 return wsrep_before_command(thd, false);
476 }
477
478 /*
479 Called after each command.
480
481 Return zero on success, non-zero on failure.
482 */
wsrep_after_command_before_result(THD * thd)483 static inline void wsrep_after_command_before_result(THD* thd)
484 {
485 if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
486 {
487 thd->wsrep_cs().after_command_before_result();
488 }
489 }
490
wsrep_after_command_after_result(THD * thd)491 static inline void wsrep_after_command_after_result(THD* thd)
492 {
493 if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
494 {
495 thd->wsrep_cs().after_command_after_result();
496 }
497 }
498
/*
  Run the before-result and the after-result steps of after command
  processing back to back.

  Debug builds assert that the client state carries no error once the
  before-result step is done.
*/
static inline void wsrep_after_command_ignore_result(THD* thd)
{
  wsrep_after_command_before_result(thd);
  DBUG_ASSERT(!thd->wsrep_cs().current_error());
  wsrep_after_command_after_result(thd);
}
505
wsrep_current_error(THD * thd)506 static inline enum wsrep::client_error wsrep_current_error(THD* thd)
507 {
508 return thd->wsrep_cs().current_error();
509 }
510
511 static inline enum wsrep::provider::status
wsrep_current_error_status(THD * thd)512 wsrep_current_error_status(THD* thd)
513 {
514 return thd->wsrep_cs().current_error_status();
515 }
516
517
518 /*
519 Commit an empty transaction.
520
521 If the transaction is real and the wsrep transaction is still active,
522 the transaction did not generate any rows or keys and is committed
523 as empty. Here the wsrep transaction is rolled back and after statement
524 step is performed to leave the wsrep transaction in the state as it
525 never existed.
526 */
wsrep_commit_empty(THD * thd,bool all)527 static inline void wsrep_commit_empty(THD* thd, bool all)
528 {
529 DBUG_ENTER("wsrep_commit_empty");
530 WSREP_DEBUG("wsrep_commit_empty(%llu)", thd->thread_id);
531 if (wsrep_is_real(thd, all) &&
532 wsrep_thd_is_local(thd) &&
533 thd->wsrep_trx().active() &&
534 thd->wsrep_trx().state() != wsrep::transaction::s_committed)
535 {
536 /* @todo CTAS with STATEMENT binlog format and empty result set
537 seems to be committing empty. Figure out why and try to fix
538 elsewhere. */
539 DBUG_ASSERT(!wsrep_has_changes(thd) ||
540 (thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
541 !thd->is_current_stmt_binlog_format_row()));
542 bool have_error= wsrep_current_error(thd);
543 int ret= wsrep_before_rollback(thd, all) ||
544 wsrep_after_rollback(thd, all) ||
545 wsrep_after_statement(thd);
546 /* The committing transaction was empty but it held some locks and
547 got BF aborted. As there were no certified changes in the
548 data, we ignore the deadlock error and rely on error reporting
549 by storage engine/server. */
550 if (!ret && !have_error && wsrep_current_error(thd))
551 {
552 DBUG_ASSERT(wsrep_current_error(thd) == wsrep::e_deadlock_error);
553 thd->wsrep_cs().reset_error();
554 }
555 if (ret)
556 {
557 WSREP_DEBUG("wsrep_commit_empty failed: %d", wsrep_current_error(thd));
558 }
559 }
560 DBUG_VOID_RETURN;
561 }
562
#endif /* WSREP_TRANS_OBSERVER_H */
564