1 /* Copyright (C) 2013-2021 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along
13 with this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
15
16 #ifndef WSREP_THD_H
17 #define WSREP_THD_H
18
19 #include <my_config.h>
20
21 #include "mysql/service_wsrep.h"
22 #include "wsrep/client_state.hpp"
23 #include "sql_class.h"
24 #include "wsrep_utils.h"
25 #include <deque>
26 class Wsrep_thd_queue
27 {
28 public:
Wsrep_thd_queue(THD * t)29 Wsrep_thd_queue(THD* t) : thd(t)
30 {
31 mysql_mutex_init(key_LOCK_wsrep_thd_queue,
32 &LOCK_wsrep_thd_queue,
33 MY_MUTEX_INIT_FAST);
34 mysql_cond_init(key_COND_wsrep_thd_queue, &COND_wsrep_thd_queue, NULL);
35 }
~Wsrep_thd_queue()36 ~Wsrep_thd_queue()
37 {
38 mysql_mutex_destroy(&LOCK_wsrep_thd_queue);
39 mysql_cond_destroy(&COND_wsrep_thd_queue);
40 }
push_back(THD * thd)41 bool push_back(THD* thd)
42 {
43 DBUG_ASSERT(thd);
44 wsp::auto_lock lock(&LOCK_wsrep_thd_queue);
45 std::deque<THD*>::iterator it = queue.begin();
46 while (it != queue.end())
47 {
48 if (*it == thd)
49 {
50 return true;
51 }
52 it++;
53 }
54 queue.push_back(thd);
55 mysql_cond_signal(&COND_wsrep_thd_queue);
56 return false;
57 }
pop_front()58 THD* pop_front()
59 {
60 wsp::auto_lock lock(&LOCK_wsrep_thd_queue);
61 while (queue.empty())
62 {
63 if (thd->killed != NOT_KILLED)
64 return NULL;
65
66 thd->mysys_var->current_mutex= &LOCK_wsrep_thd_queue;
67 thd->mysys_var->current_cond= &COND_wsrep_thd_queue;
68
69 mysql_cond_wait(&COND_wsrep_thd_queue, &LOCK_wsrep_thd_queue);
70
71 thd->mysys_var->current_mutex= 0;
72 thd->mysys_var->current_cond= 0;
73 }
74 THD* ret= queue.front();
75 queue.pop_front();
76 return ret;
77 }
78 private:
79 THD* thd;
80 std::deque<THD*> queue;
81 mysql_mutex_t LOCK_wsrep_thd_queue;
82 mysql_cond_t COND_wsrep_thd_queue;
83 };
84
85 int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
86 enum enum_var_type scope);
87 bool wsrep_create_appliers(long threads, bool mutex_protected=false);
88 void wsrep_create_rollbacker();
89
90 bool wsrep_bf_abort(THD* bf_thd, THD* victim_thd);
91 int wsrep_abort_thd(THD *bf_thd_ptr, THD *victim_thd_ptr, my_bool signal);
92
93 /*
94 Helper methods to deal with thread local storage.
95 The purpose of these methods is to hide the details of thread
96 local storage handling when operating with wsrep storage access
97 and streaming applier THDs
98
99 With one-thread-per-connection thread handling thread specific
100 variables are allocated when the thread is started and deallocated
101 before thread exits (my_thread_init(), my_thread_end()). However,
102 with pool-of-threads thread handling new thread specific variables
103 are allocated for each THD separately (see threadpool_add_connection()),
104 and the variables in thread local storage are assigned from
105 currently active thread (see thread_attach()). This must be taken into
106 account when storing/resetting thread local storage and when creating
107 streaming applier THDs.
108 */
109
110 /**
111 Create new variables for thread local storage. With
112 one-thread-per-connection thread handling this is a no op,
113 with pool-of-threads new variables are created via my_thread_init().
114 It is assumed that the caller has called wsrep_reset_threadvars() to clear
115 the thread local storage before this call.
116
117 @return Zero in case of success, non-zero otherwise.
118 */
119 int wsrep_create_threadvars();
120
121 /**
122 Delete variables which were created by wsrep_create_threadvars().
123 The caller must store variables into thread local storage before
124 this call via wsrep_store_threadvars().
125 */
126 void wsrep_delete_threadvars();
127
128 /**
129 Assign variables from current thread local storage into THD.
130 This should be called for THDs whose lifetime is limited to single
131 thread execution or which may share the operation context with some
132 parent THD (e.g. storage access) and thus don't require separately
133 allocated globals.
134
135 With one-thread-per-connection thread handling this is a no-op,
136 with pool-of-threads the variables which are currently stored into
137 thread local storage are assigned to THD.
138 */
139 void wsrep_assign_from_threadvars(THD *);
140
141 /**
142 Helper struct to save variables from thread local storage.
143 */
144 struct Wsrep_threadvars
145 {
146 THD* cur_thd;
147 st_my_thread_var* mysys_var;
148 };
149
150 /**
151 Save variables from thread local storage into Wsrep_threadvars struct.
152 */
153 Wsrep_threadvars wsrep_save_threadvars();
154
155 /**
156 Restore variables into thread local storage from Wsrep_threadvars struct.
157 */
158 void wsrep_restore_threadvars(const Wsrep_threadvars&);
159
160 /**
161 Store variables into thread local storage.
162 */
163 void wsrep_store_threadvars(THD *);
164
165 /**
166 Reset thread local storage.
167 */
168 void wsrep_reset_threadvars(THD *);
169
170 /**
171 Helper functions to override error status
172
173 In many contexts it is desirable to mask the original error status
174 set for THD or it is necessary to change OK status to error.
175 This function implements the common logic for the most
176 of the cases.
177
178 Rules:
179 * If the diagnostics are has OK or EOF status, override it unconditionally
180 * If the error is either ER_ERROR_DURING_COMMIT or ER_LOCK_DEADLOCK
181 it is usually the correct error status to be returned to client,
182 so don't override those by default
183 */
184
wsrep_override_error(THD * thd,uint error)185 static inline void wsrep_override_error(THD *thd, uint error)
186 {
187 DBUG_ASSERT(error != ER_ERROR_DURING_COMMIT);
188 Diagnostics_area *da= thd->get_stmt_da();
189 if (da->is_ok() ||
190 da->is_eof() ||
191 !da->is_set() ||
192 (da->is_error() &&
193 da->sql_errno() != error &&
194 da->sql_errno() != ER_ERROR_DURING_COMMIT &&
195 da->sql_errno() != ER_LOCK_DEADLOCK))
196 {
197 da->reset_diagnostics_area();
198 my_error(error, MYF(0));
199 }
200 }
201
202 /**
203 Override error with additional wsrep status.
204 */
wsrep_override_error(THD * thd,uint error,enum wsrep::provider::status status)205 static inline void wsrep_override_error(THD *thd, uint error,
206 enum wsrep::provider::status status)
207 {
208 Diagnostics_area *da= thd->get_stmt_da();
209 if (da->is_ok() ||
210 !da->is_set() ||
211 (da->is_error() &&
212 da->sql_errno() != error &&
213 da->sql_errno() != ER_ERROR_DURING_COMMIT &&
214 da->sql_errno() != ER_LOCK_DEADLOCK))
215 {
216 da->reset_diagnostics_area();
217 my_error(error, MYF(0), status);
218 }
219 }
220
wsrep_override_error(THD * thd,wsrep::client_error ce,enum wsrep::provider::status status)221 static inline void wsrep_override_error(THD* thd,
222 wsrep::client_error ce,
223 enum wsrep::provider::status status)
224 {
225 DBUG_ASSERT(ce != wsrep::e_success);
226 switch (ce)
227 {
228 case wsrep::e_error_during_commit:
229 wsrep_override_error(thd, ER_ERROR_DURING_COMMIT, status);
230 break;
231 case wsrep::e_deadlock_error:
232 wsrep_override_error(thd, ER_LOCK_DEADLOCK);
233 break;
234 case wsrep::e_interrupted_error:
235 wsrep_override_error(thd, ER_QUERY_INTERRUPTED);
236 break;
237 case wsrep::e_size_exceeded_error:
238 wsrep_override_error(thd, ER_ERROR_DURING_COMMIT, status);
239 break;
240 case wsrep::e_append_fragment_error:
241 /* TODO: Figure out better error number */
242 wsrep_override_error(thd, ER_ERROR_DURING_COMMIT, status);
243 break;
244 case wsrep::e_not_supported_error:
245 wsrep_override_error(thd, ER_NOT_SUPPORTED_YET);
246 break;
247 case wsrep::e_timeout_error:
248 wsrep_override_error(thd, ER_LOCK_WAIT_TIMEOUT);
249 break;
250 default:
251 wsrep_override_error(thd, ER_UNKNOWN_ERROR);
252 break;
253 }
254 }
255
256 /**
257 Helper function to log THD wsrep context.
258
259 @param thd Pointer to THD
260 @param message Optional message
261 @param function Function where the call was made from
262 */
wsrep_log_thd(const THD * thd,const char * message,const char * function)263 static inline void wsrep_log_thd(const THD *thd,
264 const char *message,
265 const char *function)
266 {
267 WSREP_DEBUG("%s %s\n"
268 " thd: %llu thd_ptr: %p client_mode: %s client_state: %s trx_state: %s\n"
269 " next_trx_id: %lld trx_id: %lld seqno: %lld\n"
270 " is_streaming: %d fragments: %zu\n"
271 " sql_errno: %u message: %s\n"
272 #define WSREP_THD_LOG_QUERIES
273 #ifdef WSREP_THD_LOG_QUERIES
274 " command: %d query: %.72s"
275 #endif /* WSREP_OBSERVER_LOG_QUERIES */
276 ,
277 function,
278 message ? message : "",
279 thd->thread_id,
280 thd,
281 wsrep_thd_client_mode_str(thd),
282 wsrep_thd_client_state_str(thd),
283 wsrep_thd_transaction_state_str(thd),
284 (long long)thd->wsrep_next_trx_id(),
285 (long long)thd->wsrep_trx_id(),
286 (long long)wsrep_thd_trx_seqno(thd),
287 thd->wsrep_trx().is_streaming(),
288 thd->wsrep_sr().fragments().size(),
289 (thd->get_stmt_da()->is_error() ? thd->get_stmt_da()->sql_errno() : 0),
290 (thd->get_stmt_da()->is_error() ? thd->get_stmt_da()->message() : "")
291 #ifdef WSREP_THD_LOG_QUERIES
292 , thd->lex->sql_command,
293 wsrep_thd_query(thd)
294 #endif /* WSREP_OBSERVER_LOG_QUERIES */
295 );
296 }
297
298 #define WSREP_LOG_THD(thd_, message_) wsrep_log_thd(thd_, message_, __FUNCTION__)
299
300 #endif /* WSREP_THD_H */
301