1 /* Copyright 2016-2019 Codership Oy <http://www.codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15 
16 #ifndef WSREP_TRANS_OBSERVER_H
17 #define WSREP_TRANS_OBSERVER_H
18 
19 #include "my_global.h"
20 #include "mysql/service_wsrep.h"
21 #include "wsrep_applier.h" /* wsrep_apply_error */
22 #include "wsrep_xid.h"
23 #include "wsrep_thd.h"
24 #include "wsrep_binlog.h" /* register/deregister group commit */
25 #include "my_dbug.h"
26 
27 class THD;
28 
29 /*
30    Return true if THD has active wsrep transaction.
31  */
wsrep_is_active(THD * thd)32 static inline bool wsrep_is_active(THD* thd)
33 {
34   return (thd->wsrep_cs().state() != wsrep::client_state::s_none  &&
35           thd->wsrep_cs().transaction().active());
36 }
37 
38 /*
39   Return true if transaction is ordered.
40  */
wsrep_is_ordered(THD * thd)41 static inline bool wsrep_is_ordered(THD* thd)
42 {
43   return thd->wsrep_trx().ordered();
44 }
45 
46 /*
47   Return true if transaction has been BF aborted but has not been
48   rolled back yet.
49 
50   It is required that the caller holds thd->LOCK_thd_data.
51 */
wsrep_must_abort(THD * thd)52 static inline bool wsrep_must_abort(THD* thd)
53 {
54   mysql_mutex_assert_owner(&thd->LOCK_thd_data);
55   return (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort);
56 }
57 
58 /*
59   Return true if the transaction must be replayed.
60  */
wsrep_must_replay(THD * thd)61 static inline bool wsrep_must_replay(THD* thd)
62 {
63   return (thd->wsrep_trx().state() == wsrep::transaction::s_must_replay);
64 }
65 /*
66   Return true if transaction has not been committed.
67 
68   Note that we don't require thd->LOCK_thd_data here. Calling this method
69   makes sense only from codepaths which are past ordered_commit state
70   and the wsrep transaction is immune to BF aborts at that point.
71 */
wsrep_not_committed(THD * thd)72 static inline bool wsrep_not_committed(THD* thd)
73 {
74   return (thd->wsrep_trx().state() != wsrep::transaction::s_committed);
75 }
76 
77 /*
78   Return true if THD is either committing a transaction or statement
79   is autocommit.
80  */
wsrep_is_real(THD * thd,bool all)81 static inline bool wsrep_is_real(THD* thd, bool all)
82 {
83   return (all || thd->transaction.all.ha_list == 0);
84 }
85 
86 /*
87   Check if a transaction has generated changes.
88  */
wsrep_has_changes(THD * thd)89 static inline bool wsrep_has_changes(THD* thd)
90 {
91   return (thd->wsrep_trx().is_empty() == false);
92 }
93 
94 /*
95   Check if an active transaction has been BF aborted.
96  */
wsrep_is_bf_aborted(THD * thd)97 static inline bool wsrep_is_bf_aborted(THD* thd)
98 {
99   return (thd->wsrep_trx().active() && thd->wsrep_trx().bf_aborted());
100 }
101 
wsrep_check_pk(THD * thd)102 static inline int wsrep_check_pk(THD* thd)
103 {
104   if (!wsrep_certify_nonPK)
105   {
106     for (TABLE* table= thd->open_tables; table != NULL; table= table->next)
107     {
108       if (table->key_info == NULL || table->s->primary_key == MAX_KEY)
109       {
110         WSREP_DEBUG("No primary key found for table %s.%s",
111                     table->s->db.str, table->s->table_name.str);
112         wsrep_override_error(thd, ER_LOCK_DEADLOCK);
113         return 1;
114       }
115     }
116   }
117   return 0;
118 }
119 
wsrep_streaming_enabled(THD * thd)120 static inline bool wsrep_streaming_enabled(THD* thd)
121 {
122   return (thd->wsrep_sr().fragment_size() > 0);
123 }
124 
125 /*
126   Return number of fragments succesfully certified for the
127   current statement.
128  */
wsrep_fragments_certified_for_stmt(THD * thd)129 static inline size_t wsrep_fragments_certified_for_stmt(THD* thd)
130 {
131     return thd->wsrep_trx().fragments_certified_for_statement();
132 }
133 
wsrep_start_transaction(THD * thd,wsrep_trx_id_t trx_id)134 static inline int wsrep_start_transaction(THD* thd, wsrep_trx_id_t trx_id)
135 {
136   if (thd->wsrep_cs().state() != wsrep::client_state::s_none) {
137     if (wsrep_is_active(thd) == false)
138       return thd->wsrep_cs().start_transaction(wsrep::transaction_id(trx_id));
139   }
140   return 0;
141 }
142 
143 /**/
wsrep_start_trx_if_not_started(THD * thd)144 static inline int wsrep_start_trx_if_not_started(THD* thd)
145 {
146   int ret= 0;
147   DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID);
148   DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_local);
149   if (thd->wsrep_trx().active() == false)
150   {
151     ret= wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
152   }
153   return ret;
154 }
155 
156 /*
157   Called after each row operation.
158 
159   Return zero on succes, non-zero on failure.
160  */
wsrep_after_row_internal(THD * thd)161 static inline int wsrep_after_row_internal(THD* thd)
162 {
163   if (thd->wsrep_cs().state() != wsrep::client_state::s_none  &&
164       wsrep_thd_is_local(thd))
165   {
166     if (wsrep_check_pk(thd))
167     {
168       return 1;
169     }
170     else if (wsrep_streaming_enabled(thd))
171     {
172       return thd->wsrep_cs().after_row();
173     }
174   }
175   return 0;
176 }
177 
178 /*
179   Helper method to determine whether commit time hooks
180   should be run for the transaction.
181 
182   Commit hooks must be run in the following cases:
183   - The transaction is local and has generated write set and is committing.
184   - The transaction has been BF aborted
185   - Is running in high priority mode and is ordered. This can be replayer,
186     applier or storage access.
187  */
wsrep_run_commit_hook(THD * thd,bool all)188 static inline bool wsrep_run_commit_hook(THD* thd, bool all)
189 {
190   DBUG_ENTER("wsrep_run_commit_hook");
191   DBUG_PRINT("wsrep", ("Is_active: %d is_real %d has_changes %d is_applying %d "
192                        "is_ordered: %d",
193                        wsrep_is_active(thd), wsrep_is_real(thd, all),
194                        wsrep_has_changes(thd), wsrep_thd_is_applying(thd),
195                        wsrep_is_ordered(thd)));
196   /* Is MST commit or autocommit? */
197   bool ret= wsrep_is_active(thd) && wsrep_is_real(thd, all);
198   /* Do not commit if we are aborting */
199   ret= ret && (thd->wsrep_trx().state() != wsrep::transaction::s_aborting);
200   if (ret && !(wsrep_has_changes(thd) ||  /* Has generated write set */
201                /* Is high priority (replay, applier, storage) and the
202                   transaction is scheduled for commit ordering */
203                (wsrep_thd_is_applying(thd) && wsrep_is_ordered(thd))))
204   {
205     mysql_mutex_lock(&thd->LOCK_thd_data);
206     DBUG_PRINT("wsrep", ("state: %s",
207                          wsrep::to_c_string(thd->wsrep_trx().state())));
208     /* Transaction is local but has no changes, the commit hooks will
209        be skipped and the wsrep transaction is terminated in
210        wsrep_commit_empty() */
211     if (thd->wsrep_trx().state() == wsrep::transaction::s_executing)
212     {
213       ret= false;
214     }
215     mysql_mutex_unlock(&thd->LOCK_thd_data);
216   }
217   DBUG_PRINT("wsrep", ("return: %d", ret));
218   DBUG_RETURN(ret);
219 }
220 
221 /*
222   Called before the transaction is prepared.
223 
224   Return zero on succes, non-zero on failure.
225  */
wsrep_before_prepare(THD * thd,bool all)226 static inline int wsrep_before_prepare(THD* thd, bool all)
227 {
228   DBUG_ENTER("wsrep_before_prepare");
229   WSREP_DEBUG("wsrep_before_prepare: %d", wsrep_is_real(thd, all));
230   int ret= 0;
231   DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
232   if ((ret= thd->wsrep_cs().before_prepare()) == 0)
233   {
234     DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
235     wsrep_xid_init(&thd->wsrep_xid,
236                      thd->wsrep_trx().ws_meta().gtid());
237   }
238   DBUG_RETURN(ret);
239 }
240 
241 /*
242   Called after the transaction has been prepared.
243 
244   Return zero on succes, non-zero on failure.
245  */
wsrep_after_prepare(THD * thd,bool all)246 static inline int wsrep_after_prepare(THD* thd, bool all)
247 {
248   DBUG_ENTER("wsrep_after_prepare");
249   WSREP_DEBUG("wsrep_after_prepare: %d", wsrep_is_real(thd, all));
250   DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
251   int ret= thd->wsrep_cs().after_prepare();
252   DBUG_ASSERT(ret == 0 || thd->wsrep_cs().current_error() ||
253               thd->wsrep_cs().transaction().state() == wsrep::transaction::s_must_replay);
254   DBUG_RETURN(ret);
255 }
256 
257 /*
258   Called before the transaction is committed.
259 
260   This function must be called from both client and
261   applier contexts before commit.
262 
263   Return zero on succes, non-zero on failure.
264  */
wsrep_before_commit(THD * thd,bool all)265 static inline int wsrep_before_commit(THD* thd, bool all)
266 {
267   DBUG_ENTER("wsrep_before_commit");
268   WSREP_DEBUG("wsrep_before_commit: %d, %lld",
269               wsrep_is_real(thd, all),
270               (long long)wsrep_thd_trx_seqno(thd));
271   int ret= 0;
272   DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
273   if ((ret= thd->wsrep_cs().before_commit()) == 0)
274   {
275     DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
276     wsrep_xid_init(&thd->wsrep_xid,
277                    thd->wsrep_trx().ws_meta().gtid());
278     wsrep_register_for_group_commit(thd);
279   }
280   DBUG_RETURN(ret);
281 }
282 
283 /*
284   Called after the transaction has been ordered for commit.
285 
286   This function must be called from both client and
287   applier contexts after the commit has been ordered.
288 
289   @param thd Pointer to THD
290   @param all
291   @param err Error buffer in case of applying error
292 
293   Return zero on succes, non-zero on failure.
294  */
wsrep_ordered_commit(THD * thd,bool all,const wsrep_apply_error &)295 static inline int wsrep_ordered_commit(THD* thd,
296                                       bool all,
297                                       const wsrep_apply_error&)
298 {
299   DBUG_ENTER("wsrep_ordered_commit");
300   WSREP_DEBUG("wsrep_ordered_commit: %d", wsrep_is_real(thd, all));
301   DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
302   DBUG_RETURN(thd->wsrep_cs().ordered_commit());
303 }
304 
305 /*
306   Called after the transaction has been committed.
307 
308   Return zero on succes, non-zero on failure.
309  */
wsrep_after_commit(THD * thd,bool all)310 static inline int wsrep_after_commit(THD* thd, bool all)
311 {
312   DBUG_ENTER("wsrep_after_commit");
313   WSREP_DEBUG("wsrep_after_commit: %d, %d, %lld, %d",
314               wsrep_is_real(thd, all),
315               wsrep_is_active(thd),
316               (long long)wsrep_thd_trx_seqno(thd),
317               wsrep_has_changes(thd));
318   DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
319   int ret= 0;
320   if (thd->wsrep_trx().state() == wsrep::transaction::s_committing)
321   {
322     ret= thd->wsrep_cs().ordered_commit();
323   }
324   wsrep_unregister_from_group_commit(thd);
325   thd->wsrep_xid.null();
326   DBUG_RETURN(ret || thd->wsrep_cs().after_commit());
327 }
328 
329 /*
330   Called before the transaction is rolled back.
331 
332   Return zero on succes, non-zero on failure.
333  */
wsrep_before_rollback(THD * thd,bool all)334 static inline int wsrep_before_rollback(THD* thd, bool all)
335 {
336   DBUG_ENTER("wsrep_before_rollback");
337   int ret= 0;
338   if (wsrep_is_active(thd))
339   {
340     if (!all && thd->in_active_multi_stmt_transaction())
341     {
342       if (wsrep_emulate_bin_log)
343       {
344         wsrep_thd_binlog_stmt_rollback(thd);
345       }
346 
347       if (thd->wsrep_trx().is_streaming() &&
348           (wsrep_fragments_certified_for_stmt(thd) > 0))
349       {
350         /* Non-safe statement rollback during SR multi statement
351            transaction. A statement rollback is considered unsafe, if
352            the same statement has already replicated one or more fragments.
353            Self abort the transaction, the actual rollback and error
354            handling will be done in after statement phase. */
355         WSREP_DEBUG("statement rollback is not safe for streaming replication");
356         wsrep_thd_self_abort(thd);
357         ret= 0;
358       }
359     }
360     else if (wsrep_is_real(thd, all) &&
361              thd->wsrep_trx().state() != wsrep::transaction::s_aborted)
362     {
363       /* Real transaction rolling back and wsrep abort not completed
364          yet */
365       /* Reset XID so that it does not trigger writing serialization
366          history in InnoDB. This needs to be avoided because rollback
367          may happen out of order and replay may follow. */
368       thd->wsrep_xid.null();
369       ret= thd->wsrep_cs().before_rollback();
370     }
371   }
372   DBUG_RETURN(ret);
373 }
374 
375 /*
376   Called after the transaction has been rolled back.
377 
378   Return zero on succes, non-zero on failure.
379  */
wsrep_after_rollback(THD * thd,bool all)380 static inline int wsrep_after_rollback(THD* thd, bool all)
381 {
382   DBUG_ENTER("wsrep_after_rollback");
383   DBUG_RETURN((wsrep_is_real(thd, all) && wsrep_is_active(thd) &&
384                thd->wsrep_cs().transaction().state() !=
385                wsrep::transaction::s_aborted) ?
386               thd->wsrep_cs().after_rollback() : 0);
387 }
388 
wsrep_before_statement(THD * thd)389 static inline int wsrep_before_statement(THD* thd)
390 {
391   return (thd->wsrep_cs().state() != wsrep::client_state::s_none ?
392 	  thd->wsrep_cs().before_statement() : 0);
393 }
394 
395 static inline
wsrep_after_statement(THD * thd)396 int wsrep_after_statement(THD* thd)
397 {
398   DBUG_ENTER("wsrep_after_statement");
399   WSREP_DEBUG("wsrep_after_statement for %lu client_state %s "
400 	  " client_mode %s trans_state %s",
401 	  thd_get_thread_id(thd),
402 	  wsrep::to_c_string(thd->wsrep_cs().state()),
403 	  wsrep::to_c_string(thd->wsrep_cs().mode()),
404 	  wsrep::to_c_string(thd->wsrep_cs().transaction().state()));
405   DBUG_RETURN((thd->wsrep_cs().state() != wsrep::client_state::s_none &&
406 	       thd->wsrep_cs().mode() == Wsrep_client_state::m_local) ?
407               thd->wsrep_cs().after_statement() : 0);
408 }
409 
wsrep_after_apply(THD * thd)410 static inline void wsrep_after_apply(THD* thd)
411 {
412   DBUG_ASSERT(wsrep_thd_is_applying(thd));
413   WSREP_DEBUG("wsrep_after_apply %lld", thd->thread_id);
414   thd->wsrep_cs().after_applying();
415 }
416 
wsrep_open(THD * thd)417 static inline void wsrep_open(THD* thd)
418 {
419   DBUG_ENTER("wsrep_open");
420   if (WSREP_ON_)
421   {
422     /* WSREP_PROVIDER_EXISTS_ cannot be set if WSREP_ON_ is not set */
423     DBUG_ASSERT(WSREP_PROVIDER_EXISTS_);
424     thd->wsrep_cs().open(wsrep::client_id(thd->thread_id));
425     thd->wsrep_cs().debug_log_level(wsrep_debug);
426     if (!thd->wsrep_applier && thd->variables.wsrep_trx_fragment_size)
427     {
428       thd->wsrep_cs().enable_streaming(
429         wsrep_fragment_unit(thd->variables.wsrep_trx_fragment_unit),
430         size_t(thd->variables.wsrep_trx_fragment_size));
431     }
432   }
433   DBUG_VOID_RETURN;
434 }
435 
wsrep_close(THD * thd)436 static inline void wsrep_close(THD* thd)
437 {
438   DBUG_ENTER("wsrep_close");
439   if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
440   {
441     thd->wsrep_cs().close();
442   }
443   DBUG_VOID_RETURN;
444 }
445 
wsrep_cleanup(THD * thd)446 static inline void wsrep_cleanup(THD* thd)
447 {
448   DBUG_ENTER("wsrep_cleanup");
449   if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
450   {
451     thd->wsrep_cs().cleanup();
452   }
453   DBUG_VOID_RETURN;
454 }
455 
456 static inline void
wsrep_wait_rollback_complete_and_acquire_ownership(THD * thd)457 wsrep_wait_rollback_complete_and_acquire_ownership(THD *thd)
458 {
459   DBUG_ENTER("wsrep_wait_rollback_complete_and_acquire_ownership");
460   if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
461   {
462     thd->wsrep_cs().wait_rollback_complete_and_acquire_ownership();
463   }
464   DBUG_VOID_RETURN;
465 }
466 
wsrep_before_command(THD * thd,bool keep_command_error)467 static inline int wsrep_before_command(THD* thd, bool keep_command_error)
468 {
469   return (thd->wsrep_cs().state() != wsrep::client_state::s_none ?
470 	  thd->wsrep_cs().before_command(keep_command_error) : 0);
471 }
472 
wsrep_before_command(THD * thd)473 static inline int wsrep_before_command(THD* thd)
474 {
475   return wsrep_before_command(thd, false);
476 }
477 
478 /*
479   Called after each command.
480 
481   Return zero on success, non-zero on failure.
482 */
wsrep_after_command_before_result(THD * thd)483 static inline void wsrep_after_command_before_result(THD* thd)
484 {
485   if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
486   {
487     thd->wsrep_cs().after_command_before_result();
488   }
489 }
490 
wsrep_after_command_after_result(THD * thd)491 static inline void wsrep_after_command_after_result(THD* thd)
492 {
493   if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
494   {
495     thd->wsrep_cs().after_command_after_result();
496   }
497 }
498 
wsrep_after_command_ignore_result(THD * thd)499 static inline void wsrep_after_command_ignore_result(THD* thd)
500 {
501   wsrep_after_command_before_result(thd);
502   DBUG_ASSERT(!thd->wsrep_cs().current_error());
503   wsrep_after_command_after_result(thd);
504 }
505 
wsrep_current_error(THD * thd)506 static inline enum wsrep::client_error wsrep_current_error(THD* thd)
507 {
508   return thd->wsrep_cs().current_error();
509 }
510 
511 static inline enum wsrep::provider::status
wsrep_current_error_status(THD * thd)512 wsrep_current_error_status(THD* thd)
513 {
514   return thd->wsrep_cs().current_error_status();
515 }
516 
517 
518 /*
519   Commit an empty transaction.
520 
521   If the transaction is real and the wsrep transaction is still active,
522   the transaction did not generate any rows or keys and is committed
523   as empty. Here the wsrep transaction is rolled back and after statement
524   step is performed to leave the wsrep transaction in the state as it
525   never existed.
526 */
wsrep_commit_empty(THD * thd,bool all)527 static inline void wsrep_commit_empty(THD* thd, bool all)
528 {
529   DBUG_ENTER("wsrep_commit_empty");
530   WSREP_DEBUG("wsrep_commit_empty(%llu)", thd->thread_id);
531   if (wsrep_is_real(thd, all) &&
532       wsrep_thd_is_local(thd) &&
533       thd->wsrep_trx().active() &&
534       thd->wsrep_trx().state() != wsrep::transaction::s_committed)
535   {
536     /* @todo CTAS with STATEMENT binlog format and empty result set
537        seems to be committing empty. Figure out why and try to fix
538        elsewhere. */
539     DBUG_ASSERT(!wsrep_has_changes(thd) ||
540                 (thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
541                  !thd->is_current_stmt_binlog_format_row()));
542     bool have_error= wsrep_current_error(thd);
543     int ret= wsrep_before_rollback(thd, all) ||
544       wsrep_after_rollback(thd, all) ||
545       wsrep_after_statement(thd);
546     /* The committing transaction was empty but it held some locks and
547        got BF aborted. As there were no certified changes in the
548        data, we ignore the deadlock error and rely on error reporting
549        by storage engine/server. */
550     if (!ret && !have_error && wsrep_current_error(thd))
551     {
552       DBUG_ASSERT(wsrep_current_error(thd) == wsrep::e_deadlock_error);
553       thd->wsrep_cs().reset_error();
554     }
555     if (ret)
556     {
557       WSREP_DEBUG("wsrep_commit_empty failed: %d", wsrep_current_error(thd));
558     }
559   }
560   DBUG_VOID_RETURN;
561 }
562 
#endif /* WSREP_TRANS_OBSERVER_H */
564