1 /* Copyright 2018-2021 Codership Oy <info@codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
15 
16 #include "wsrep_client_service.h"
17 #include "wsrep_high_priority_service.h"
18 #include "wsrep_applier.h" /* wsrep_apply_events() */
19 #include "wsrep_binlog.h"  /* wsrep_dump_rbr_buf() */
20 #include "wsrep_schema.h"  /* remove_fragments() */
21 #include "wsrep_thd.h"
22 #include "wsrep_xid.h"
23 #include "wsrep_trans_observer.h"
24 
25 #include "sql_base.h"    /* close_temporary_table() */
26 #include "sql_class.h"   /* THD */
27 #include "sql_parse.h"   /* stmt_causes_implicit_commit() */
28 #include "rpl_filter.h"  /* binlog_filter */
29 #include "rpl_rli.h"     /* Relay_log_info */
30 #include "slave.h"   /* opt_log_slave_updates */
31 #include "transaction.h" /* trans_commit()... */
32 #include "log.h"      /* stmt_has_updated_trans_table() */
33 #include "mysql/service_debug_sync.h"
34 #include "mysql/psi/mysql_thread.h" /* mysql_mutex_assert_owner() */
35 
36 namespace
37 {
38 
debug_sync_caller(THD * thd,const char * sync_point)39 void debug_sync_caller(THD* thd, const char* sync_point)
40 {
41 #ifdef ENABLED_DEBUG_SYNC_OUT
42   debug_sync_set_action(thd, sync_point, strlen(sync_point));
43 #endif
44 #ifdef ENABLED_DEBUG_SYNC
45   if (debug_sync_service) debug_sync_service(thd,sync_point,strlen(sync_point));
46 #endif
47 
48 }
49 }
50 
Wsrep_client_service(THD * thd,Wsrep_client_state & client_state)51 Wsrep_client_service::Wsrep_client_service(THD* thd,
52                                            Wsrep_client_state& client_state)
53   : wsrep::client_service()
54   , m_thd(thd)
55   , m_client_state(client_state)
56 { }
57 
store_globals()58 void Wsrep_client_service::store_globals()
59 {
60   wsrep_store_threadvars(m_thd);
61 }
62 
reset_globals()63 void Wsrep_client_service::reset_globals()
64 {
65   wsrep_reset_threadvars(m_thd);
66 }
67 
interrupted(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED) const68 bool Wsrep_client_service::interrupted(
69   wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED) const
70 {
71   DBUG_ASSERT(m_thd == current_thd);
72   /* Underlying mutex in lock object points to THD::LOCK_thd_data, which
73   protects m_thd->wsrep_trx() and protects us from thd delete. */
74   mysql_mutex_assert_owner(static_cast<mysql_mutex_t*>(lock.mutex()->native()));
75   bool ret= (m_thd->killed != NOT_KILLED);
76   if (ret)
77   {
78     WSREP_DEBUG("wsrep state is interrupted, THD::killed %d trx state %d",
79                 m_thd->killed,  m_thd->wsrep_trx().state());
80   }
81   return ret;
82 }
83 
prepare_data_for_replication()84 int Wsrep_client_service::prepare_data_for_replication()
85 {
86   DBUG_ASSERT(m_thd == current_thd);
87   DBUG_ENTER("Wsrep_client_service::prepare_data_for_replication");
88   size_t data_len= 0;
89   IO_CACHE* cache= wsrep_get_trans_cache(m_thd);
90 
91   if (cache)
92   {
93     m_thd->binlog_flush_pending_rows_event(true);
94     if (wsrep_write_cache(m_thd, cache, &data_len))
95     {
96       WSREP_ERROR("rbr write fail, data_len: %zu",
97                   data_len);
98       // wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT);
99       DBUG_RETURN(1);
100     }
101   }
102 
103   if (data_len == 0)
104   {
105     if (m_thd->get_stmt_da()->is_ok()              &&
106         m_thd->get_stmt_da()->affected_rows() > 0  &&
107         !binlog_filter->is_on() &&
108         !m_thd->wsrep_trx().is_streaming())
109     {
110       WSREP_DEBUG("empty rbr buffer, query: %s, "
111                   "affected rows: %llu, "
112                   "changed tables: %d, "
113                   "sql_log_bin: %d",
114                   wsrep_thd_query(m_thd),
115                   m_thd->get_stmt_da()->affected_rows(),
116                   stmt_has_updated_trans_table(m_thd),
117                   m_thd->variables.sql_log_bin);
118     }
119     else
120     {
121       WSREP_DEBUG("empty rbr buffer, query: %s", wsrep_thd_query(m_thd));
122     }
123   }
124   DBUG_RETURN(0);
125 }
126 
127 
cleanup_transaction()128 void Wsrep_client_service::cleanup_transaction()
129 {
130   DBUG_ASSERT(m_thd == current_thd);
131   if (WSREP_EMULATE_BINLOG(m_thd)) wsrep_thd_binlog_trx_reset(m_thd);
132   m_thd->wsrep_affected_rows= 0;
133 }
134 
135 
prepare_fragment_for_replication(wsrep::mutable_buffer & buffer,size_t & log_position)136 int Wsrep_client_service::prepare_fragment_for_replication(
137   wsrep::mutable_buffer& buffer, size_t& log_position)
138 {
139   DBUG_ASSERT(m_thd == current_thd);
140   THD* thd= m_thd;
141   DBUG_ENTER("Wsrep_client_service::prepare_fragment_for_replication");
142   IO_CACHE* cache= wsrep_get_trans_cache(thd);
143   thd->binlog_flush_pending_rows_event(true);
144 
145   if (!cache)
146   {
147     DBUG_RETURN(0);
148   }
149 
150   const my_off_t saved_pos(my_b_tell(cache));
151   if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
152   {
153     DBUG_RETURN(1);
154   }
155 
156   int ret= 0;
157   size_t total_length= 0;
158   size_t length= my_b_bytes_in_cache(cache);
159 
160   if (!length)
161   {
162     length= my_b_fill(cache);
163   }
164 
165   if (length > 0)
166   {
167     do
168     {
169       total_length+= length;
170       if (total_length > wsrep_max_ws_size)
171       {
172         WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
173                    wsrep_max_ws_size, total_length);
174         ret= 1;
175         goto cleanup;
176       }
177 
178       buffer.push_back(reinterpret_cast<const char*>(cache->read_pos),
179                        reinterpret_cast<const char*>(cache->read_pos + length));
180       cache->read_pos= cache->read_end;
181     }
182     while (cache->file >= 0 && (length= my_b_fill(cache)));
183   }
184   DBUG_ASSERT(total_length == buffer.size());
185   log_position= saved_pos;
186 cleanup:
187   if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
188   {
189     WSREP_WARN("Failed to reinitialize IO cache");
190     ret= 1;
191   }
192   DBUG_RETURN(ret);
193 }
194 
remove_fragments()195 int Wsrep_client_service::remove_fragments()
196 {
197   DBUG_ENTER("Wsrep_client_service::remove_fragments");
198   DEBUG_SYNC(m_thd, "wsrep_before_fragment_removal");
199   if (wsrep_schema->remove_fragments(m_thd,
200                                      Wsrep_server_state::instance().id(),
201                                      m_thd->wsrep_trx().id(),
202                                      m_thd->wsrep_sr().fragments()))
203   {
204     WSREP_DEBUG("Failed to remove fragments from SR storage for transaction "
205                 "%llu, %llu",
206                 m_thd->thread_id, m_thd->wsrep_trx().id().get());
207     DBUG_RETURN(1);
208   }
209   DBUG_RETURN(0);
210 }
211 
statement_allowed_for_streaming() const212 bool Wsrep_client_service::statement_allowed_for_streaming() const
213 {
214   /*
215     Todo: Decide if implicit commit is allowed with streaming
216     replication.
217     !stmt_causes_implicit_commit(m_thd, CF_IMPLICIT_COMMIT_BEGIN);
218   */
219   return true;
220 }
221 
bytes_generated() const222 size_t Wsrep_client_service::bytes_generated() const
223 {
224   IO_CACHE* cache= wsrep_get_trans_cache(m_thd);
225   if (cache)
226   {
227     size_t pending_rows_event_length= 0;
228     if (Rows_log_event* ev= m_thd->binlog_get_pending_rows_event(true))
229     {
230       pending_rows_event_length= ev->get_data_size();
231     }
232     return my_b_tell(cache) + pending_rows_event_length;
233   }
234   return 0;
235 }
236 
will_replay()237 void Wsrep_client_service::will_replay()
238 {
239   DBUG_ASSERT(m_thd == current_thd);
240   mysql_mutex_lock(&LOCK_wsrep_replaying);
241   ++wsrep_replaying;
242   mysql_mutex_unlock(&LOCK_wsrep_replaying);
243 }
244 
signal_replayed()245 void Wsrep_client_service::signal_replayed()
246 {
247   DBUG_ASSERT(m_thd == current_thd);
248   mysql_mutex_lock(&LOCK_wsrep_replaying);
249   --wsrep_replaying;
250   DBUG_ASSERT(wsrep_replaying >= 0);
251   mysql_cond_broadcast(&COND_wsrep_replaying);
252   mysql_mutex_unlock(&LOCK_wsrep_replaying);
253 }
254 
replay()255 enum wsrep::provider::status Wsrep_client_service::replay()
256 {
257 
258   DBUG_ASSERT(m_thd == current_thd);
259   DBUG_ENTER("Wsrep_client_service::replay");
260 
261   /*
262     Allocate separate THD for replaying to avoid tampering
263     original THD state during replication event applying.
264    */
265   THD *replayer_thd= new THD(true, true);
266   replayer_thd->thread_stack= m_thd->thread_stack;
267   replayer_thd->real_id= pthread_self();
268   replayer_thd->prior_thr_create_utime=
269       replayer_thd->start_utime= microsecond_interval_timer();
270   replayer_thd->set_command(COM_SLEEP);
271   replayer_thd->reset_for_next_command(true);
272 
273   enum wsrep::provider::status ret;
274   {
275     Wsrep_replayer_service replayer_service(replayer_thd, m_thd);
276     wsrep::provider& provider(replayer_thd->wsrep_cs().provider());
277     ret= provider.replay(replayer_thd->wsrep_trx().ws_handle(),
278                          &replayer_service);
279     replayer_service.replay_status(ret);
280   }
281 
282   delete replayer_thd;
283   DBUG_RETURN(ret);
284 }
285 
replay_unordered()286 enum wsrep::provider::status Wsrep_client_service::replay_unordered()
287 {
288   DBUG_ASSERT(0);
289   return wsrep::provider::error_not_implemented;
290 }
291 
wait_for_replayers(wsrep::unique_lock<wsrep::mutex> & lock)292 void Wsrep_client_service::wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
293 {
294   DBUG_ASSERT(m_thd == current_thd);
295   lock.unlock();
296   mysql_mutex_lock(&LOCK_wsrep_replaying);
297   /* We need to check if the THD is BF aborted during condition wait.
298      Because the aborter does not know which condition this thread is waiting,
299      use timed wait and check if the THD is BF aborted in the loop. */
300   while (wsrep_replaying > 0 && !wsrep_is_bf_aborted(m_thd))
301   {
302     struct timespec wait_time;
303     set_timespec_nsec(wait_time, 10000000L);
304     mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying,
305                          &wait_time);
306   }
307   mysql_mutex_unlock(&LOCK_wsrep_replaying);
308   lock.lock();
309 }
310 
commit_by_xid()311 enum wsrep::provider::status Wsrep_client_service::commit_by_xid()
312 {
313   DBUG_ASSERT(0);
314   return wsrep::provider::error_not_implemented;
315 }
316 
debug_sync(const char * sync_point)317 void Wsrep_client_service::debug_sync(const char* sync_point)
318 {
319   DBUG_ASSERT(m_thd == current_thd);
320   debug_sync_caller(m_thd, sync_point);
321 }
322 
debug_crash(const char * crash_point)323 void Wsrep_client_service::debug_crash(const char* crash_point)
324 {
325   // DBUG_ASSERT(m_thd == current_thd);
326   DBUG_EXECUTE_IF(crash_point, DBUG_SUICIDE(); );
327 }
328 
bf_rollback()329 int Wsrep_client_service::bf_rollback()
330 {
331   DBUG_ASSERT(m_thd == current_thd);
332   DBUG_ENTER("Wsrep_client_service::rollback");
333 
334   int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd));
335   if (m_thd->locked_tables_mode && m_thd->lock)
336   {
337     m_thd->locked_tables_list.unlock_locked_tables(m_thd);
338     m_thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
339   }
340   if (m_thd->global_read_lock.is_acquired())
341   {
342     m_thd->global_read_lock.unlock_global_read_lock(m_thd);
343   }
344   m_thd->release_transactional_locks();
345   m_thd->mdl_context.release_explicit_locks();
346 
347   DBUG_RETURN(ret);
348 }
349