1 /* Copyright 2018-2021 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
16 #include "wsrep_client_service.h"
17 #include "wsrep_high_priority_service.h"
18 #include "wsrep_applier.h" /* wsrep_apply_events() */
19 #include "wsrep_binlog.h" /* wsrep_dump_rbr_buf() */
20 #include "wsrep_schema.h" /* remove_fragments() */
21 #include "wsrep_thd.h"
22 #include "wsrep_xid.h"
23 #include "wsrep_trans_observer.h"
24
25 #include "sql_base.h" /* close_temporary_table() */
26 #include "sql_class.h" /* THD */
27 #include "sql_parse.h" /* stmt_causes_implicit_commit() */
28 #include "rpl_filter.h" /* binlog_filter */
29 #include "rpl_rli.h" /* Relay_log_info */
30 #include "slave.h" /* opt_log_slave_updates */
31 #include "transaction.h" /* trans_commit()... */
32 #include "log.h" /* stmt_has_updated_trans_table() */
33 #include "mysql/service_debug_sync.h"
34 #include "mysql/psi/mysql_thread.h" /* mysql_mutex_assert_owner() */
35
36 namespace
37 {
38
debug_sync_caller(THD * thd,const char * sync_point)39 void debug_sync_caller(THD* thd, const char* sync_point)
40 {
41 #ifdef ENABLED_DEBUG_SYNC_OUT
42 debug_sync_set_action(thd, sync_point, strlen(sync_point));
43 #endif
44 #ifdef ENABLED_DEBUG_SYNC
45 if (debug_sync_service) debug_sync_service(thd,sync_point,strlen(sync_point));
46 #endif
47
48 }
49 }
50
Wsrep_client_service(THD * thd,Wsrep_client_state & client_state)51 Wsrep_client_service::Wsrep_client_service(THD* thd,
52 Wsrep_client_state& client_state)
53 : wsrep::client_service()
54 , m_thd(thd)
55 , m_client_state(client_state)
56 { }
57
store_globals()58 void Wsrep_client_service::store_globals()
59 {
60 wsrep_store_threadvars(m_thd);
61 }
62
reset_globals()63 void Wsrep_client_service::reset_globals()
64 {
65 wsrep_reset_threadvars(m_thd);
66 }
67
interrupted(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED) const68 bool Wsrep_client_service::interrupted(
69 wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED) const
70 {
71 DBUG_ASSERT(m_thd == current_thd);
72 /* Underlying mutex in lock object points to THD::LOCK_thd_data, which
73 protects m_thd->wsrep_trx() and protects us from thd delete. */
74 mysql_mutex_assert_owner(static_cast<mysql_mutex_t*>(lock.mutex()->native()));
75 bool ret= (m_thd->killed != NOT_KILLED);
76 if (ret)
77 {
78 WSREP_DEBUG("wsrep state is interrupted, THD::killed %d trx state %d",
79 m_thd->killed, m_thd->wsrep_trx().state());
80 }
81 return ret;
82 }
83
prepare_data_for_replication()84 int Wsrep_client_service::prepare_data_for_replication()
85 {
86 DBUG_ASSERT(m_thd == current_thd);
87 DBUG_ENTER("Wsrep_client_service::prepare_data_for_replication");
88 size_t data_len= 0;
89 IO_CACHE* cache= wsrep_get_trans_cache(m_thd);
90
91 if (cache)
92 {
93 m_thd->binlog_flush_pending_rows_event(true);
94 if (wsrep_write_cache(m_thd, cache, &data_len))
95 {
96 WSREP_ERROR("rbr write fail, data_len: %zu",
97 data_len);
98 // wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT);
99 DBUG_RETURN(1);
100 }
101 }
102
103 if (data_len == 0)
104 {
105 if (m_thd->get_stmt_da()->is_ok() &&
106 m_thd->get_stmt_da()->affected_rows() > 0 &&
107 !binlog_filter->is_on() &&
108 !m_thd->wsrep_trx().is_streaming())
109 {
110 WSREP_DEBUG("empty rbr buffer, query: %s, "
111 "affected rows: %llu, "
112 "changed tables: %d, "
113 "sql_log_bin: %d",
114 wsrep_thd_query(m_thd),
115 m_thd->get_stmt_da()->affected_rows(),
116 stmt_has_updated_trans_table(m_thd),
117 m_thd->variables.sql_log_bin);
118 }
119 else
120 {
121 WSREP_DEBUG("empty rbr buffer, query: %s", wsrep_thd_query(m_thd));
122 }
123 }
124 DBUG_RETURN(0);
125 }
126
127
cleanup_transaction()128 void Wsrep_client_service::cleanup_transaction()
129 {
130 DBUG_ASSERT(m_thd == current_thd);
131 if (WSREP_EMULATE_BINLOG(m_thd)) wsrep_thd_binlog_trx_reset(m_thd);
132 m_thd->wsrep_affected_rows= 0;
133 }
134
135
prepare_fragment_for_replication(wsrep::mutable_buffer & buffer,size_t & log_position)136 int Wsrep_client_service::prepare_fragment_for_replication(
137 wsrep::mutable_buffer& buffer, size_t& log_position)
138 {
139 DBUG_ASSERT(m_thd == current_thd);
140 THD* thd= m_thd;
141 DBUG_ENTER("Wsrep_client_service::prepare_fragment_for_replication");
142 IO_CACHE* cache= wsrep_get_trans_cache(thd);
143 thd->binlog_flush_pending_rows_event(true);
144
145 if (!cache)
146 {
147 DBUG_RETURN(0);
148 }
149
150 const my_off_t saved_pos(my_b_tell(cache));
151 if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
152 {
153 DBUG_RETURN(1);
154 }
155
156 int ret= 0;
157 size_t total_length= 0;
158 size_t length= my_b_bytes_in_cache(cache);
159
160 if (!length)
161 {
162 length= my_b_fill(cache);
163 }
164
165 if (length > 0)
166 {
167 do
168 {
169 total_length+= length;
170 if (total_length > wsrep_max_ws_size)
171 {
172 WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
173 wsrep_max_ws_size, total_length);
174 ret= 1;
175 goto cleanup;
176 }
177
178 buffer.push_back(reinterpret_cast<const char*>(cache->read_pos),
179 reinterpret_cast<const char*>(cache->read_pos + length));
180 cache->read_pos= cache->read_end;
181 }
182 while (cache->file >= 0 && (length= my_b_fill(cache)));
183 }
184 DBUG_ASSERT(total_length == buffer.size());
185 log_position= saved_pos;
186 cleanup:
187 if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
188 {
189 WSREP_WARN("Failed to reinitialize IO cache");
190 ret= 1;
191 }
192 DBUG_RETURN(ret);
193 }
194
remove_fragments()195 int Wsrep_client_service::remove_fragments()
196 {
197 DBUG_ENTER("Wsrep_client_service::remove_fragments");
198 DEBUG_SYNC(m_thd, "wsrep_before_fragment_removal");
199 if (wsrep_schema->remove_fragments(m_thd,
200 Wsrep_server_state::instance().id(),
201 m_thd->wsrep_trx().id(),
202 m_thd->wsrep_sr().fragments()))
203 {
204 WSREP_DEBUG("Failed to remove fragments from SR storage for transaction "
205 "%llu, %llu",
206 m_thd->thread_id, m_thd->wsrep_trx().id().get());
207 DBUG_RETURN(1);
208 }
209 DBUG_RETURN(0);
210 }
211
statement_allowed_for_streaming() const212 bool Wsrep_client_service::statement_allowed_for_streaming() const
213 {
214 /*
215 Todo: Decide if implicit commit is allowed with streaming
216 replication.
217 !stmt_causes_implicit_commit(m_thd, CF_IMPLICIT_COMMIT_BEGIN);
218 */
219 return true;
220 }
221
bytes_generated() const222 size_t Wsrep_client_service::bytes_generated() const
223 {
224 IO_CACHE* cache= wsrep_get_trans_cache(m_thd);
225 if (cache)
226 {
227 size_t pending_rows_event_length= 0;
228 if (Rows_log_event* ev= m_thd->binlog_get_pending_rows_event(true))
229 {
230 pending_rows_event_length= ev->get_data_size();
231 }
232 return my_b_tell(cache) + pending_rows_event_length;
233 }
234 return 0;
235 }
236
will_replay()237 void Wsrep_client_service::will_replay()
238 {
239 DBUG_ASSERT(m_thd == current_thd);
240 mysql_mutex_lock(&LOCK_wsrep_replaying);
241 ++wsrep_replaying;
242 mysql_mutex_unlock(&LOCK_wsrep_replaying);
243 }
244
signal_replayed()245 void Wsrep_client_service::signal_replayed()
246 {
247 DBUG_ASSERT(m_thd == current_thd);
248 mysql_mutex_lock(&LOCK_wsrep_replaying);
249 --wsrep_replaying;
250 DBUG_ASSERT(wsrep_replaying >= 0);
251 mysql_cond_broadcast(&COND_wsrep_replaying);
252 mysql_mutex_unlock(&LOCK_wsrep_replaying);
253 }
254
replay()255 enum wsrep::provider::status Wsrep_client_service::replay()
256 {
257
258 DBUG_ASSERT(m_thd == current_thd);
259 DBUG_ENTER("Wsrep_client_service::replay");
260
261 /*
262 Allocate separate THD for replaying to avoid tampering
263 original THD state during replication event applying.
264 */
265 THD *replayer_thd= new THD(true, true);
266 replayer_thd->thread_stack= m_thd->thread_stack;
267 replayer_thd->real_id= pthread_self();
268 replayer_thd->prior_thr_create_utime=
269 replayer_thd->start_utime= microsecond_interval_timer();
270 replayer_thd->set_command(COM_SLEEP);
271 replayer_thd->reset_for_next_command(true);
272
273 enum wsrep::provider::status ret;
274 {
275 Wsrep_replayer_service replayer_service(replayer_thd, m_thd);
276 wsrep::provider& provider(replayer_thd->wsrep_cs().provider());
277 ret= provider.replay(replayer_thd->wsrep_trx().ws_handle(),
278 &replayer_service);
279 replayer_service.replay_status(ret);
280 }
281
282 delete replayer_thd;
283 DBUG_RETURN(ret);
284 }
285
replay_unordered()286 enum wsrep::provider::status Wsrep_client_service::replay_unordered()
287 {
288 DBUG_ASSERT(0);
289 return wsrep::provider::error_not_implemented;
290 }
291
wait_for_replayers(wsrep::unique_lock<wsrep::mutex> & lock)292 void Wsrep_client_service::wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
293 {
294 DBUG_ASSERT(m_thd == current_thd);
295 lock.unlock();
296 mysql_mutex_lock(&LOCK_wsrep_replaying);
297 /* We need to check if the THD is BF aborted during condition wait.
298 Because the aborter does not know which condition this thread is waiting,
299 use timed wait and check if the THD is BF aborted in the loop. */
300 while (wsrep_replaying > 0 && !wsrep_is_bf_aborted(m_thd))
301 {
302 struct timespec wait_time;
303 set_timespec_nsec(wait_time, 10000000L);
304 mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying,
305 &wait_time);
306 }
307 mysql_mutex_unlock(&LOCK_wsrep_replaying);
308 lock.lock();
309 }
310
commit_by_xid()311 enum wsrep::provider::status Wsrep_client_service::commit_by_xid()
312 {
313 DBUG_ASSERT(0);
314 return wsrep::provider::error_not_implemented;
315 }
316
debug_sync(const char * sync_point)317 void Wsrep_client_service::debug_sync(const char* sync_point)
318 {
319 DBUG_ASSERT(m_thd == current_thd);
320 debug_sync_caller(m_thd, sync_point);
321 }
322
debug_crash(const char * crash_point)323 void Wsrep_client_service::debug_crash(const char* crash_point)
324 {
325 // DBUG_ASSERT(m_thd == current_thd);
326 DBUG_EXECUTE_IF(crash_point, DBUG_SUICIDE(); );
327 }
328
bf_rollback()329 int Wsrep_client_service::bf_rollback()
330 {
331 DBUG_ASSERT(m_thd == current_thd);
332 DBUG_ENTER("Wsrep_client_service::rollback");
333
334 int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd));
335 if (m_thd->locked_tables_mode && m_thd->lock)
336 {
337 m_thd->locked_tables_list.unlock_locked_tables(m_thd);
338 m_thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
339 }
340 if (m_thd->global_read_lock.is_acquired())
341 {
342 m_thd->global_read_lock.unlock_global_read_lock(m_thd);
343 }
344 m_thd->release_transactional_locks();
345 m_thd->mdl_context.release_explicit_locks();
346
347 DBUG_RETURN(ret);
348 }
349