1 /* Copyright (C) 2013-2021 Codership Oy <info@codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License along
13    with this program; if not, write to the Free Software Foundation, Inc.,
14    51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
15 
16 #ifndef WSREP_THD_H
17 #define WSREP_THD_H
18 
19 #include <my_config.h>
20 
21 #include "mysql/service_wsrep.h"
22 #include "wsrep/client_state.hpp"
23 #include "sql_class.h"
24 #include "wsrep_utils.h"
25 #include <deque>
26 class Wsrep_thd_queue
27 {
28 public:
Wsrep_thd_queue(THD * t)29   Wsrep_thd_queue(THD* t) : thd(t)
30   {
31     mysql_mutex_init(key_LOCK_wsrep_thd_queue,
32                      &LOCK_wsrep_thd_queue,
33                      MY_MUTEX_INIT_FAST);
34     mysql_cond_init(key_COND_wsrep_thd_queue, &COND_wsrep_thd_queue, NULL);
35   }
~Wsrep_thd_queue()36   ~Wsrep_thd_queue()
37   {
38     mysql_mutex_destroy(&LOCK_wsrep_thd_queue);
39     mysql_cond_destroy(&COND_wsrep_thd_queue);
40   }
push_back(THD * thd)41   bool push_back(THD* thd)
42   {
43     DBUG_ASSERT(thd);
44     wsp::auto_lock lock(&LOCK_wsrep_thd_queue);
45     std::deque<THD*>::iterator it = queue.begin();
46     while (it != queue.end())
47     {
48       if (*it == thd)
49       {
50         return true;
51       }
52       it++;
53     }
54     queue.push_back(thd);
55     mysql_cond_signal(&COND_wsrep_thd_queue);
56     return false;
57   }
pop_front()58   THD* pop_front()
59   {
60     wsp::auto_lock lock(&LOCK_wsrep_thd_queue);
61     while (queue.empty())
62     {
63       if (thd->killed != NOT_KILLED)
64         return NULL;
65 
66       thd->mysys_var->current_mutex= &LOCK_wsrep_thd_queue;
67       thd->mysys_var->current_cond=  &COND_wsrep_thd_queue;
68 
69       mysql_cond_wait(&COND_wsrep_thd_queue, &LOCK_wsrep_thd_queue);
70 
71       thd->mysys_var->current_mutex= 0;
72       thd->mysys_var->current_cond=  0;
73     }
74     THD* ret= queue.front();
75     queue.pop_front();
76     return ret;
77   }
78 private:
79   THD*             thd;
80   std::deque<THD*> queue;
81   mysql_mutex_t    LOCK_wsrep_thd_queue;
82   mysql_cond_t     COND_wsrep_thd_queue;
83 };
84 
/*
  Entry points defined outside this header. The notes below are inferred
  from names and signatures only -- confirm against the definitions.
*/

/* Status variable callback (takes a SHOW_VAR); presumably reports the
   number of brute force (BF) aborts -- TODO confirm. */
int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
                          enum enum_var_type scope);
/* Presumably starts `threads` applier threads; the meaning of
   mutex_protected (default false) is not visible here -- TODO confirm. */
bool wsrep_create_appliers(long threads, bool mutex_protected=false);
/* Presumably starts the rollbacker thread -- TODO confirm. */
void wsrep_create_rollbacker();

/* Presumably lets bf_thd abort victim_thd; the meaning of the boolean
   result is not visible here -- TODO confirm. */
bool wsrep_bf_abort(THD* bf_thd, THD* victim_thd);
/* Presumably aborts victim_thd_ptr on behalf of bf_thd_ptr; the meaning of
   `signal` and of the return value is not visible here -- TODO confirm. */
int  wsrep_abort_thd(THD *bf_thd_ptr, THD *victim_thd_ptr, my_bool signal);
92 
93 /*
94   Helper methods to deal with thread local storage.
95   The purpose of these methods is to hide the details of thread
96   local storage handling when operating with wsrep storage access
97   and streaming applier THDs
98 
99   With one-thread-per-connection thread handling thread specific
100   variables are allocated when the thread is started and deallocated
101   before thread exits (my_thread_init(), my_thread_end()). However,
102   with pool-of-threads thread handling new thread specific variables
103   are allocated for each THD separately (see threadpool_add_connection()),
104   and the variables in thread local storage are assigned from
105   currently active thread (see thread_attach()). This must be taken into
106   account when storing/resetting thread local storage and when creating
107   streaming applier THDs.
108 */
109 
110 /**
111    Create new variables for thread local storage. With
   one-thread-per-connection thread handling this is a no-op,
113    with pool-of-threads new variables are created via my_thread_init().
114    It is assumed that the caller has called wsrep_reset_threadvars() to clear
115    the thread local storage before this call.
116 
117    @return Zero in case of success, non-zero otherwise.
118 */
119 int wsrep_create_threadvars();
120 
121 /**
122    Delete variables which were created by wsrep_create_threadvars().
123    The caller must store variables into thread local storage before
124    this call via wsrep_store_threadvars().
125 */
126 void wsrep_delete_threadvars();
127 
128 /**
129    Assign variables from current thread local storage into THD.
130    This should be called for THDs whose lifetime is limited to single
131    thread execution or which may share the operation context with some
132    parent THD (e.g. storage access) and thus don't require separately
133    allocated globals.
134 
135    With one-thread-per-connection thread handling this is a no-op,
136    with pool-of-threads the variables which are currently stored into
137    thread local storage are assigned to THD.
138 */
139 void wsrep_assign_from_threadvars(THD *);
140 
/**
   Helper struct to save variables from thread local storage.

   Returned by wsrep_save_threadvars() and taken by
   wsrep_restore_threadvars().
 */
struct Wsrep_threadvars
{
  /* Saved THD pointer -- presumably the thread's current THD at the time
     of saving; confirm in wsrep_save_threadvars(). */
  THD* cur_thd;
  /* Saved st_my_thread_var pointer -- presumably the thread's mysys
     variables at the time of saving; confirm in wsrep_save_threadvars(). */
  st_my_thread_var* mysys_var;
};
149 
150 /**
151    Save variables from thread local storage into Wsrep_threadvars struct.
152  */
153 Wsrep_threadvars wsrep_save_threadvars();
154 
155 /**
156    Restore variables into thread local storage from Wsrep_threadvars struct.
157 */
158 void wsrep_restore_threadvars(const Wsrep_threadvars&);
159 
160 /**
161    Store variables into thread local storage.
162 */
163 void wsrep_store_threadvars(THD *);
164 
165 /**
166    Reset thread local storage.
167 */
168 void wsrep_reset_threadvars(THD *);
169 
170 /**
171    Helper functions to override error status
172 
173    In many contexts it is desirable to mask the original error status
174    set for THD or it is necessary to change OK status to error.
   These functions implement the common logic for most
   of the cases.
177 
178    Rules:
   * If the diagnostics area has OK or EOF status, override it unconditionally
180    * If the error is either ER_ERROR_DURING_COMMIT or ER_LOCK_DEADLOCK
181      it is usually the correct error status to be returned to client,
182      so don't override those by default
183  */
184 
wsrep_override_error(THD * thd,uint error)185 static inline void wsrep_override_error(THD *thd, uint error)
186 {
187   DBUG_ASSERT(error != ER_ERROR_DURING_COMMIT);
188   Diagnostics_area *da= thd->get_stmt_da();
189   if (da->is_ok() ||
190       da->is_eof() ||
191       !da->is_set() ||
192       (da->is_error() &&
193        da->sql_errno() != error &&
194        da->sql_errno() != ER_ERROR_DURING_COMMIT &&
195        da->sql_errno() != ER_LOCK_DEADLOCK))
196   {
197     da->reset_diagnostics_area();
198     my_error(error, MYF(0));
199   }
200 }
201 
202 /**
203    Override error with additional wsrep status.
204  */
wsrep_override_error(THD * thd,uint error,enum wsrep::provider::status status)205 static inline void wsrep_override_error(THD *thd, uint error,
206                                         enum wsrep::provider::status status)
207 {
208   Diagnostics_area *da= thd->get_stmt_da();
209   if (da->is_ok() ||
210       !da->is_set() ||
211       (da->is_error() &&
212        da->sql_errno() != error &&
213        da->sql_errno() != ER_ERROR_DURING_COMMIT &&
214        da->sql_errno() != ER_LOCK_DEADLOCK))
215   {
216     da->reset_diagnostics_area();
217     my_error(error, MYF(0), status);
218   }
219 }
220 
wsrep_override_error(THD * thd,wsrep::client_error ce,enum wsrep::provider::status status)221 static inline void wsrep_override_error(THD* thd,
222                                         wsrep::client_error ce,
223                                         enum wsrep::provider::status status)
224 {
225     DBUG_ASSERT(ce != wsrep::e_success);
226     switch (ce)
227     {
228     case wsrep::e_error_during_commit:
229       wsrep_override_error(thd, ER_ERROR_DURING_COMMIT, status);
230       break;
231     case wsrep::e_deadlock_error:
232       wsrep_override_error(thd, ER_LOCK_DEADLOCK);
233       break;
234     case wsrep::e_interrupted_error:
235       wsrep_override_error(thd, ER_QUERY_INTERRUPTED);
236       break;
237     case wsrep::e_size_exceeded_error:
238       wsrep_override_error(thd, ER_ERROR_DURING_COMMIT, status);
239       break;
240     case wsrep::e_append_fragment_error:
241       /* TODO: Figure out better error number */
242       wsrep_override_error(thd, ER_ERROR_DURING_COMMIT, status);
243       break;
244     case wsrep::e_not_supported_error:
245       wsrep_override_error(thd, ER_NOT_SUPPORTED_YET);
246       break;
247     case wsrep::e_timeout_error:
248       wsrep_override_error(thd, ER_LOCK_WAIT_TIMEOUT);
249       break;
250     default:
251       wsrep_override_error(thd, ER_UNKNOWN_ERROR);
252       break;
253     }
254 }
255 
256 /**
257    Helper function to log THD wsrep context.
258 
259    @param thd Pointer to THD
260    @param message Optional message
261    @param function Function where the call was made from
262  */
wsrep_log_thd(const THD * thd,const char * message,const char * function)263 static inline void wsrep_log_thd(const THD *thd,
264                                  const char *message,
265                                  const char *function)
266 {
267   WSREP_DEBUG("%s %s\n"
268               "    thd: %llu thd_ptr: %p client_mode: %s client_state: %s trx_state: %s\n"
269               "    next_trx_id: %lld trx_id: %lld seqno: %lld\n"
270               "    is_streaming: %d fragments: %zu\n"
271               "    sql_errno: %u message: %s\n"
272 #define WSREP_THD_LOG_QUERIES
273 #ifdef WSREP_THD_LOG_QUERIES
274               "    command: %d query: %.72s"
275 #endif /* WSREP_OBSERVER_LOG_QUERIES */
276               ,
277               function,
278               message ? message : "",
279               thd->thread_id,
280               thd,
281               wsrep_thd_client_mode_str(thd),
282               wsrep_thd_client_state_str(thd),
283               wsrep_thd_transaction_state_str(thd),
284               (long long)thd->wsrep_next_trx_id(),
285               (long long)thd->wsrep_trx_id(),
286               (long long)wsrep_thd_trx_seqno(thd),
287               thd->wsrep_trx().is_streaming(),
288               thd->wsrep_sr().fragments().size(),
289               (thd->get_stmt_da()->is_error() ? thd->get_stmt_da()->sql_errno() : 0),
290               (thd->get_stmt_da()->is_error() ? thd->get_stmt_da()->message() : "")
291 #ifdef WSREP_THD_LOG_QUERIES
292               , thd->lex->sql_command,
293               wsrep_thd_query(thd)
294 #endif /* WSREP_OBSERVER_LOG_QUERIES */
295               );
296 }
297 
298 #define WSREP_LOG_THD(thd_, message_) wsrep_log_thd(thd_, message_, __FUNCTION__)
299 
300 #endif /* WSREP_THD_H */
301