1 /* Copyright 2018-2021 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
16 #include "wsrep_client_service.h"
17 #include "wsrep_high_priority_service.h"
18 #include "wsrep_binlog.h" /* wsrep_dump_rbr_buf() */
19 #include "wsrep_schema.h" /* remove_fragments() */
20 #include "wsrep_thd.h"
21 #include "wsrep_xid.h"
22 #include "wsrep_trans_observer.h"
23
24 #include "sql_base.h" /* close_temporary_table() */
25 #include "sql_class.h" /* THD */
26 #include "sql_parse.h" /* stmt_causes_implicit_commit() */
27 #include "rpl_filter.h" /* binlog_filter */
28 #include "rpl_rli.h" /* Relay_log_info */
29 #include "slave.h" /* opt_log_slave_updates */
30 #include "transaction.h" /* trans_commit()... */
31 #include "log.h" /* stmt_has_updated_trans_table() */
32 #include "mysql/service_debug_sync.h"
33 #include "mysql/psi/mysql_thread.h" /* mysql_mutex_assert_owner() */
34
35 namespace
36 {
37
debug_sync_caller(THD * thd,const char * sync_point)38 void debug_sync_caller(THD* thd, const char* sync_point)
39 {
40 #ifdef ENABLED_DEBUG_SYNC_OUT
41 debug_sync_set_action(thd, sync_point, strlen(sync_point));
42 #endif
43 #ifdef ENABLED_DEBUG_SYNC
44 if (debug_sync_service) debug_sync_service(thd,sync_point,strlen(sync_point));
45 #endif
46
47 }
48 }
49
Wsrep_client_service(THD * thd,Wsrep_client_state & client_state)50 Wsrep_client_service::Wsrep_client_service(THD* thd,
51 Wsrep_client_state& client_state)
52 : wsrep::client_service()
53 , m_thd(thd)
54 , m_client_state(client_state)
55 { }
56
store_globals()57 void Wsrep_client_service::store_globals()
58 {
59 wsrep_store_threadvars(m_thd);
60 }
61
reset_globals()62 void Wsrep_client_service::reset_globals()
63 {
64 wsrep_reset_threadvars(m_thd);
65 }
66
interrupted(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED) const67 bool Wsrep_client_service::interrupted(
68 wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED) const
69 {
70 DBUG_ASSERT(m_thd == current_thd);
71 /* Underlying mutex in lock object points to THD::LOCK_thd_data, which
72 protects m_thd->wsrep_trx() and protects us from thd delete. */
73 mysql_mutex_assert_owner(static_cast<mysql_mutex_t*>(lock.mutex()->native()));
74 bool ret= (m_thd->killed != NOT_KILLED);
75 if (ret)
76 {
77 WSREP_DEBUG("wsrep state is interrupted, THD::killed %d trx state %d",
78 m_thd->killed, m_thd->wsrep_trx().state());
79 }
80 return ret;
81 }
82
prepare_data_for_replication()83 int Wsrep_client_service::prepare_data_for_replication()
84 {
85 DBUG_ASSERT(m_thd == current_thd);
86 DBUG_ENTER("Wsrep_client_service::prepare_data_for_replication");
87 size_t data_len= 0;
88 IO_CACHE* cache= wsrep_get_trans_cache(m_thd);
89
90 if (cache)
91 {
92 m_thd->binlog_flush_pending_rows_event(true);
93 if (wsrep_write_cache(m_thd, cache, &data_len))
94 {
95 WSREP_ERROR("rbr write fail, data_len: %zu",
96 data_len);
97 // wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT);
98 DBUG_RETURN(1);
99 }
100 }
101
102 if (data_len == 0)
103 {
104 if (m_thd->get_stmt_da()->is_ok() &&
105 m_thd->get_stmt_da()->affected_rows() > 0 &&
106 !binlog_filter->is_on() &&
107 !m_thd->wsrep_trx().is_streaming())
108 {
109 WSREP_DEBUG("empty rbr buffer, query: %s, "
110 "affected rows: %llu, "
111 "changed tables: %d, "
112 "sql_log_bin: %d",
113 wsrep_thd_query(m_thd),
114 m_thd->get_stmt_da()->affected_rows(),
115 stmt_has_updated_trans_table(m_thd),
116 m_thd->variables.sql_log_bin);
117 }
118 else
119 {
120 WSREP_DEBUG("empty rbr buffer, query: %s", wsrep_thd_query(m_thd));
121 }
122 }
123 DBUG_RETURN(0);
124 }
125
126
cleanup_transaction()127 void Wsrep_client_service::cleanup_transaction()
128 {
129 DBUG_ASSERT(m_thd == current_thd);
130 if (WSREP_EMULATE_BINLOG(m_thd)) wsrep_thd_binlog_trx_reset(m_thd);
131 m_thd->wsrep_affected_rows= 0;
132 }
133
134
prepare_fragment_for_replication(wsrep::mutable_buffer & buffer,size_t & log_position)135 int Wsrep_client_service::prepare_fragment_for_replication(
136 wsrep::mutable_buffer& buffer, size_t& log_position)
137 {
138 DBUG_ASSERT(m_thd == current_thd);
139 THD* thd= m_thd;
140 DBUG_ENTER("Wsrep_client_service::prepare_fragment_for_replication");
141 IO_CACHE* cache= wsrep_get_trans_cache(thd);
142 thd->binlog_flush_pending_rows_event(true);
143
144 if (!cache)
145 {
146 DBUG_RETURN(0);
147 }
148
149 const my_off_t saved_pos(my_b_tell(cache));
150 if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
151 {
152 DBUG_RETURN(1);
153 }
154
155 int ret= 0;
156 size_t total_length= 0;
157 size_t length= my_b_bytes_in_cache(cache);
158
159 if (!length)
160 {
161 length= my_b_fill(cache);
162 }
163
164 if (length > 0)
165 {
166 do
167 {
168 total_length+= length;
169 if (total_length > wsrep_max_ws_size)
170 {
171 WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
172 wsrep_max_ws_size, total_length);
173 ret= 1;
174 goto cleanup;
175 }
176
177 buffer.push_back(reinterpret_cast<const char*>(cache->read_pos),
178 reinterpret_cast<const char*>(cache->read_pos + length));
179 cache->read_pos= cache->read_end;
180 }
181 while (cache->file >= 0 && (length= my_b_fill(cache)));
182 }
183 DBUG_ASSERT(total_length == buffer.size());
184 log_position= saved_pos;
185 cleanup:
186 if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
187 {
188 WSREP_WARN("Failed to reinitialize IO cache");
189 ret= 1;
190 }
191 DBUG_RETURN(ret);
192 }
193
remove_fragments()194 int Wsrep_client_service::remove_fragments()
195 {
196 DBUG_ENTER("Wsrep_client_service::remove_fragments");
197 DEBUG_SYNC(m_thd, "wsrep_before_fragment_removal");
198 if (wsrep_schema->remove_fragments(m_thd,
199 Wsrep_server_state::instance().id(),
200 m_thd->wsrep_trx().id(),
201 m_thd->wsrep_sr().fragments()))
202 {
203 WSREP_DEBUG("Failed to remove fragments from SR storage for transaction "
204 "%llu, %llu",
205 m_thd->thread_id, m_thd->wsrep_trx().id().get());
206 DBUG_RETURN(1);
207 }
208 DBUG_RETURN(0);
209 }
210
statement_allowed_for_streaming() const211 bool Wsrep_client_service::statement_allowed_for_streaming() const
212 {
213 /*
214 Todo: Decide if implicit commit is allowed with streaming
215 replication.
216 !stmt_causes_implicit_commit(m_thd, CF_IMPLICIT_COMMIT_BEGIN);
217 */
218 return true;
219 }
220
bytes_generated() const221 size_t Wsrep_client_service::bytes_generated() const
222 {
223 IO_CACHE* cache= wsrep_get_trans_cache(m_thd);
224 if (cache)
225 {
226 size_t pending_rows_event_length= 0;
227 if (Rows_log_event* ev= m_thd->binlog_get_pending_rows_event(true))
228 {
229 pending_rows_event_length= ev->get_data_size();
230 }
231 return my_b_tell(cache) + pending_rows_event_length;
232 }
233 return 0;
234 }
235
will_replay()236 void Wsrep_client_service::will_replay()
237 {
238 DBUG_ASSERT(m_thd == current_thd);
239 mysql_mutex_lock(&LOCK_wsrep_replaying);
240 ++wsrep_replaying;
241 mysql_mutex_unlock(&LOCK_wsrep_replaying);
242 }
243
signal_replayed()244 void Wsrep_client_service::signal_replayed()
245 {
246 DBUG_ASSERT(m_thd == current_thd);
247 mysql_mutex_lock(&LOCK_wsrep_replaying);
248 --wsrep_replaying;
249 DBUG_ASSERT(wsrep_replaying >= 0);
250 mysql_cond_broadcast(&COND_wsrep_replaying);
251 mysql_mutex_unlock(&LOCK_wsrep_replaying);
252 }
253
replay()254 enum wsrep::provider::status Wsrep_client_service::replay()
255 {
256
257 DBUG_ASSERT(m_thd == current_thd);
258 DBUG_ENTER("Wsrep_client_service::replay");
259
260 /*
261 Allocate separate THD for replaying to avoid tampering
262 original THD state during replication event applying.
263 */
264 THD *replayer_thd= new THD(true, true);
265 replayer_thd->thread_stack= m_thd->thread_stack;
266 replayer_thd->real_id= pthread_self();
267 replayer_thd->prior_thr_create_utime=
268 replayer_thd->start_utime= microsecond_interval_timer();
269 replayer_thd->set_command(COM_SLEEP);
270 replayer_thd->reset_for_next_command(true);
271
272 enum wsrep::provider::status ret;
273 {
274 Wsrep_replayer_service replayer_service(replayer_thd, m_thd);
275 wsrep::provider& provider(replayer_thd->wsrep_cs().provider());
276 ret= provider.replay(replayer_thd->wsrep_trx().ws_handle(),
277 &replayer_service);
278 replayer_service.replay_status(ret);
279 }
280
281 delete replayer_thd;
282 DBUG_RETURN(ret);
283 }
284
replay_unordered()285 enum wsrep::provider::status Wsrep_client_service::replay_unordered()
286 {
287 DBUG_ASSERT(0);
288 return wsrep::provider::error_not_implemented;
289 }
290
wait_for_replayers(wsrep::unique_lock<wsrep::mutex> & lock)291 void Wsrep_client_service::wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
292 {
293 DBUG_ASSERT(m_thd == current_thd);
294 lock.unlock();
295 mysql_mutex_lock(&LOCK_wsrep_replaying);
296 /* We need to check if the THD is BF aborted during condition wait.
297 Because the aborter does not know which condition this thread is waiting,
298 use timed wait and check if the THD is BF aborted in the loop. */
299 while (wsrep_replaying > 0 && !wsrep_is_bf_aborted(m_thd))
300 {
301 struct timespec wait_time;
302 set_timespec_nsec(wait_time, 10000000L);
303 mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying,
304 &wait_time);
305 }
306 mysql_mutex_unlock(&LOCK_wsrep_replaying);
307 lock.lock();
308 }
309
commit_by_xid()310 enum wsrep::provider::status Wsrep_client_service::commit_by_xid()
311 {
312 DBUG_ASSERT(0);
313 return wsrep::provider::error_not_implemented;
314 }
315
debug_sync(const char * sync_point)316 void Wsrep_client_service::debug_sync(const char* sync_point)
317 {
318 DBUG_ASSERT(m_thd == current_thd);
319 debug_sync_caller(m_thd, sync_point);
320 }
321
debug_crash(const char * crash_point)322 void Wsrep_client_service::debug_crash(const char* crash_point)
323 {
324 // DBUG_ASSERT(m_thd == current_thd);
325 DBUG_EXECUTE_IF(crash_point, DBUG_SUICIDE(); );
326 }
327
bf_rollback()328 int Wsrep_client_service::bf_rollback()
329 {
330 DBUG_ASSERT(m_thd == current_thd);
331 DBUG_ENTER("Wsrep_client_service::rollback");
332
333 int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd));
334 if (m_thd->locked_tables_mode && m_thd->lock)
335 {
336 if (m_thd->locked_tables_list.unlock_locked_tables(m_thd))
337 ret= 1;
338 m_thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
339 }
340 if (m_thd->global_read_lock.is_acquired())
341 {
342 m_thd->global_read_lock.unlock_global_read_lock(m_thd);
343 }
344 m_thd->release_transactional_locks();
345 m_thd->mdl_context.release_explicit_locks();
346
347 DBUG_RETURN(ret);
348 }
349