1 /* Copyright 2018-2021 Codership Oy <info@codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
15 
16 #include "wsrep_client_service.h"
17 #include "wsrep_high_priority_service.h"
18 #include "wsrep_binlog.h"  /* wsrep_dump_rbr_buf() */
19 #include "wsrep_schema.h"  /* remove_fragments() */
20 #include "wsrep_thd.h"
21 #include "wsrep_xid.h"
22 #include "wsrep_trans_observer.h"
23 
24 #include "sql_base.h"    /* close_temporary_table() */
25 #include "sql_class.h"   /* THD */
26 #include "sql_parse.h"   /* stmt_causes_implicit_commit() */
27 #include "rpl_filter.h"  /* binlog_filter */
28 #include "rpl_rli.h"     /* Relay_log_info */
29 #include "slave.h"   /* opt_log_slave_updates */
30 #include "transaction.h" /* trans_commit()... */
31 #include "log.h"      /* stmt_has_updated_trans_table() */
32 #include "mysql/service_debug_sync.h"
33 #include "mysql/psi/mysql_thread.h" /* mysql_mutex_assert_owner() */
34 
35 namespace
36 {
37 
debug_sync_caller(THD * thd,const char * sync_point)38 void debug_sync_caller(THD* thd, const char* sync_point)
39 {
40 #ifdef ENABLED_DEBUG_SYNC_OUT
41   debug_sync_set_action(thd, sync_point, strlen(sync_point));
42 #endif
43 #ifdef ENABLED_DEBUG_SYNC
44   if (debug_sync_service) debug_sync_service(thd,sync_point,strlen(sync_point));
45 #endif
46 
47 }
48 }
49 
Wsrep_client_service(THD * thd,Wsrep_client_state & client_state)50 Wsrep_client_service::Wsrep_client_service(THD* thd,
51                                            Wsrep_client_state& client_state)
52   : wsrep::client_service()
53   , m_thd(thd)
54   , m_client_state(client_state)
55 { }
56 
store_globals()57 void Wsrep_client_service::store_globals()
58 {
59   wsrep_store_threadvars(m_thd);
60 }
61 
reset_globals()62 void Wsrep_client_service::reset_globals()
63 {
64   wsrep_reset_threadvars(m_thd);
65 }
66 
interrupted(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED) const67 bool Wsrep_client_service::interrupted(
68   wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED) const
69 {
70   DBUG_ASSERT(m_thd == current_thd);
71   /* Underlying mutex in lock object points to THD::LOCK_thd_data, which
72   protects m_thd->wsrep_trx() and protects us from thd delete. */
73   mysql_mutex_assert_owner(static_cast<mysql_mutex_t*>(lock.mutex()->native()));
74   bool ret= (m_thd->killed != NOT_KILLED);
75   if (ret)
76   {
77     WSREP_DEBUG("wsrep state is interrupted, THD::killed %d trx state %d",
78                 m_thd->killed,  m_thd->wsrep_trx().state());
79   }
80   return ret;
81 }
82 
prepare_data_for_replication()83 int Wsrep_client_service::prepare_data_for_replication()
84 {
85   DBUG_ASSERT(m_thd == current_thd);
86   DBUG_ENTER("Wsrep_client_service::prepare_data_for_replication");
87   size_t data_len= 0;
88   IO_CACHE* cache= wsrep_get_trans_cache(m_thd);
89 
90   if (cache)
91   {
92     m_thd->binlog_flush_pending_rows_event(true);
93     if (wsrep_write_cache(m_thd, cache, &data_len))
94     {
95       WSREP_ERROR("rbr write fail, data_len: %zu",
96                   data_len);
97       // wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT);
98       DBUG_RETURN(1);
99     }
100   }
101 
102   if (data_len == 0)
103   {
104     if (m_thd->get_stmt_da()->is_ok()              &&
105         m_thd->get_stmt_da()->affected_rows() > 0  &&
106         !binlog_filter->is_on() &&
107         !m_thd->wsrep_trx().is_streaming())
108     {
109       WSREP_DEBUG("empty rbr buffer, query: %s, "
110                   "affected rows: %llu, "
111                   "changed tables: %d, "
112                   "sql_log_bin: %d",
113                   wsrep_thd_query(m_thd),
114                   m_thd->get_stmt_da()->affected_rows(),
115                   stmt_has_updated_trans_table(m_thd),
116                   m_thd->variables.sql_log_bin);
117     }
118     else
119     {
120       WSREP_DEBUG("empty rbr buffer, query: %s", wsrep_thd_query(m_thd));
121     }
122   }
123   DBUG_RETURN(0);
124 }
125 
126 
cleanup_transaction()127 void Wsrep_client_service::cleanup_transaction()
128 {
129   DBUG_ASSERT(m_thd == current_thd);
130   if (WSREP_EMULATE_BINLOG(m_thd)) wsrep_thd_binlog_trx_reset(m_thd);
131   m_thd->wsrep_affected_rows= 0;
132 }
133 
134 
prepare_fragment_for_replication(wsrep::mutable_buffer & buffer,size_t & log_position)135 int Wsrep_client_service::prepare_fragment_for_replication(
136   wsrep::mutable_buffer& buffer, size_t& log_position)
137 {
138   DBUG_ASSERT(m_thd == current_thd);
139   THD* thd= m_thd;
140   DBUG_ENTER("Wsrep_client_service::prepare_fragment_for_replication");
141   IO_CACHE* cache= wsrep_get_trans_cache(thd);
142   thd->binlog_flush_pending_rows_event(true);
143 
144   if (!cache)
145   {
146     DBUG_RETURN(0);
147   }
148 
149   const my_off_t saved_pos(my_b_tell(cache));
150   if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
151   {
152     DBUG_RETURN(1);
153   }
154 
155   int ret= 0;
156   size_t total_length= 0;
157   size_t length= my_b_bytes_in_cache(cache);
158 
159   if (!length)
160   {
161     length= my_b_fill(cache);
162   }
163 
164   if (length > 0)
165   {
166     do
167     {
168       total_length+= length;
169       if (total_length > wsrep_max_ws_size)
170       {
171         WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
172                    wsrep_max_ws_size, total_length);
173         ret= 1;
174         goto cleanup;
175       }
176 
177       buffer.push_back(reinterpret_cast<const char*>(cache->read_pos),
178                        reinterpret_cast<const char*>(cache->read_pos + length));
179       cache->read_pos= cache->read_end;
180     }
181     while (cache->file >= 0 && (length= my_b_fill(cache)));
182   }
183   DBUG_ASSERT(total_length == buffer.size());
184   log_position= saved_pos;
185 cleanup:
186   if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
187   {
188     WSREP_WARN("Failed to reinitialize IO cache");
189     ret= 1;
190   }
191   DBUG_RETURN(ret);
192 }
193 
remove_fragments()194 int Wsrep_client_service::remove_fragments()
195 {
196   DBUG_ENTER("Wsrep_client_service::remove_fragments");
197   DEBUG_SYNC(m_thd, "wsrep_before_fragment_removal");
198   if (wsrep_schema->remove_fragments(m_thd,
199                                      Wsrep_server_state::instance().id(),
200                                      m_thd->wsrep_trx().id(),
201                                      m_thd->wsrep_sr().fragments()))
202   {
203     WSREP_DEBUG("Failed to remove fragments from SR storage for transaction "
204                 "%llu, %llu",
205                 m_thd->thread_id, m_thd->wsrep_trx().id().get());
206     DBUG_RETURN(1);
207   }
208   DBUG_RETURN(0);
209 }
210 
statement_allowed_for_streaming() const211 bool Wsrep_client_service::statement_allowed_for_streaming() const
212 {
213   /*
214     Todo: Decide if implicit commit is allowed with streaming
215     replication.
216     !stmt_causes_implicit_commit(m_thd, CF_IMPLICIT_COMMIT_BEGIN);
217   */
218   return true;
219 }
220 
bytes_generated() const221 size_t Wsrep_client_service::bytes_generated() const
222 {
223   IO_CACHE* cache= wsrep_get_trans_cache(m_thd);
224   if (cache)
225   {
226     size_t pending_rows_event_length= 0;
227     if (Rows_log_event* ev= m_thd->binlog_get_pending_rows_event(true))
228     {
229       pending_rows_event_length= ev->get_data_size();
230     }
231     return my_b_tell(cache) + pending_rows_event_length;
232   }
233   return 0;
234 }
235 
will_replay()236 void Wsrep_client_service::will_replay()
237 {
238   DBUG_ASSERT(m_thd == current_thd);
239   mysql_mutex_lock(&LOCK_wsrep_replaying);
240   ++wsrep_replaying;
241   mysql_mutex_unlock(&LOCK_wsrep_replaying);
242 }
243 
signal_replayed()244 void Wsrep_client_service::signal_replayed()
245 {
246   DBUG_ASSERT(m_thd == current_thd);
247   mysql_mutex_lock(&LOCK_wsrep_replaying);
248   --wsrep_replaying;
249   DBUG_ASSERT(wsrep_replaying >= 0);
250   mysql_cond_broadcast(&COND_wsrep_replaying);
251   mysql_mutex_unlock(&LOCK_wsrep_replaying);
252 }
253 
replay()254 enum wsrep::provider::status Wsrep_client_service::replay()
255 {
256 
257   DBUG_ASSERT(m_thd == current_thd);
258   DBUG_ENTER("Wsrep_client_service::replay");
259 
260   /*
261     Allocate separate THD for replaying to avoid tampering
262     original THD state during replication event applying.
263    */
264   THD *replayer_thd= new THD(true, true);
265   replayer_thd->thread_stack= m_thd->thread_stack;
266   replayer_thd->real_id= pthread_self();
267   replayer_thd->prior_thr_create_utime=
268       replayer_thd->start_utime= microsecond_interval_timer();
269   replayer_thd->set_command(COM_SLEEP);
270   replayer_thd->reset_for_next_command(true);
271 
272   enum wsrep::provider::status ret;
273   {
274     Wsrep_replayer_service replayer_service(replayer_thd, m_thd);
275     wsrep::provider& provider(replayer_thd->wsrep_cs().provider());
276     ret= provider.replay(replayer_thd->wsrep_trx().ws_handle(),
277                          &replayer_service);
278     replayer_service.replay_status(ret);
279   }
280 
281   delete replayer_thd;
282   DBUG_RETURN(ret);
283 }
284 
replay_unordered()285 enum wsrep::provider::status Wsrep_client_service::replay_unordered()
286 {
287   DBUG_ASSERT(0);
288   return wsrep::provider::error_not_implemented;
289 }
290 
wait_for_replayers(wsrep::unique_lock<wsrep::mutex> & lock)291 void Wsrep_client_service::wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
292 {
293   DBUG_ASSERT(m_thd == current_thd);
294   lock.unlock();
295   mysql_mutex_lock(&LOCK_wsrep_replaying);
296   /* We need to check if the THD is BF aborted during condition wait.
297      Because the aborter does not know which condition this thread is waiting,
298      use timed wait and check if the THD is BF aborted in the loop. */
299   while (wsrep_replaying > 0 && !wsrep_is_bf_aborted(m_thd))
300   {
301     struct timespec wait_time;
302     set_timespec_nsec(wait_time, 10000000L);
303     mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying,
304                          &wait_time);
305   }
306   mysql_mutex_unlock(&LOCK_wsrep_replaying);
307   lock.lock();
308 }
309 
commit_by_xid()310 enum wsrep::provider::status Wsrep_client_service::commit_by_xid()
311 {
312   DBUG_ASSERT(0);
313   return wsrep::provider::error_not_implemented;
314 }
315 
debug_sync(const char * sync_point)316 void Wsrep_client_service::debug_sync(const char* sync_point)
317 {
318   DBUG_ASSERT(m_thd == current_thd);
319   debug_sync_caller(m_thd, sync_point);
320 }
321 
debug_crash(const char * crash_point)322 void Wsrep_client_service::debug_crash(const char* crash_point)
323 {
324   // DBUG_ASSERT(m_thd == current_thd);
325   DBUG_EXECUTE_IF(crash_point, DBUG_SUICIDE(); );
326 }
327 
bf_rollback()328 int Wsrep_client_service::bf_rollback()
329 {
330   DBUG_ASSERT(m_thd == current_thd);
331   DBUG_ENTER("Wsrep_client_service::rollback");
332 
333   int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd));
334   if (m_thd->locked_tables_mode && m_thd->lock)
335   {
336     if (m_thd->locked_tables_list.unlock_locked_tables(m_thd))
337       ret= 1;
338     m_thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
339   }
340   if (m_thd->global_read_lock.is_acquired())
341   {
342     m_thd->global_read_lock.unlock_global_read_lock(m_thd);
343   }
344   m_thd->release_transactional_locks();
345   m_thd->mdl_context.release_explicit_locks();
346 
347   DBUG_RETURN(ret);
348 }
349