1 /* Copyright 2018 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
16 #include "my_global.h"
17 #include "wsrep_storage_service.h"
18 #include "wsrep_trans_observer.h" /* wsrep_open() */
19 #include "wsrep_schema.h"
20 #include "wsrep_binlog.h"
21
22 #include "sql_class.h"
23 #include "mysqld.h" /* next_query_id() */
24 #include "slave.h" /* opt_log_slave_updates() */
25 #include "transaction.h" /* trans_commit(), trans_rollback() */
26
27 /*
28 Temporarily enable wsrep on thd
29 */
30 class Wsrep_on
31 {
32 public:
Wsrep_on(THD * thd)33 Wsrep_on(THD* thd)
34 : m_thd(thd)
35 , m_wsrep_on(thd->variables.wsrep_on)
36 {
37 thd->variables.wsrep_on= TRUE;
38 }
~Wsrep_on()39 ~Wsrep_on()
40 {
41 m_thd->variables.wsrep_on= m_wsrep_on;
42 }
43 private:
44 THD* m_thd;
45 my_bool m_wsrep_on;
46 };
47
Wsrep_storage_service(THD * thd)48 Wsrep_storage_service::Wsrep_storage_service(THD* thd)
49 : wsrep::storage_service()
50 , wsrep::high_priority_context(thd->wsrep_cs())
51 , m_thd(thd)
52 {
53 thd->security_ctx->skip_grants();
54 thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
55
56 /* No binlogging */
57
58 /* No general log */
59 thd->variables.option_bits |= OPTION_LOG_OFF;
60
61 /* Read committed isolation to avoid gap locking */
62 thd->variables.tx_isolation = ISO_READ_COMMITTED;
63
64 /* Keep wsrep on to enter commit ordering hooks */
65 thd->variables.wsrep_on= 1;
66 thd->wsrep_skip_locking= true;
67
68 wsrep_open(thd);
69 wsrep_before_command(thd);
70 }
71
~Wsrep_storage_service()72 Wsrep_storage_service::~Wsrep_storage_service()
73 {
74 wsrep_after_command_ignore_result(m_thd);
75 wsrep_close(m_thd);
76 m_thd->wsrep_skip_locking= false;
77 }
78
start_transaction(const wsrep::ws_handle & ws_handle)79 int Wsrep_storage_service::start_transaction(const wsrep::ws_handle& ws_handle)
80 {
81 DBUG_ENTER("Wsrep_storage_service::start_transaction");
82 DBUG_ASSERT(m_thd == current_thd);
83 DBUG_PRINT("info", ("Wsrep_storage_service::start_transcation(%llu, %p)",
84 m_thd->thread_id, m_thd));
85 m_thd->set_wsrep_next_trx_id(ws_handle.transaction_id().get());
86 DBUG_RETURN(m_thd->wsrep_cs().start_transaction(
87 wsrep::transaction_id(m_thd->wsrep_next_trx_id())) ||
88 trans_begin(m_thd, MYSQL_START_TRANS_OPT_READ_WRITE));
89 }
90
adopt_transaction(const wsrep::transaction & transaction)91 void Wsrep_storage_service::adopt_transaction(const wsrep::transaction& transaction)
92 {
93 DBUG_ENTER("Wsrep_Storage_server::adopt_transaction");
94 DBUG_ASSERT(m_thd == current_thd);
95 m_thd->wsrep_cs().adopt_transaction(transaction);
96 trans_begin(m_thd, MYSQL_START_TRANS_OPT_READ_WRITE);
97 DBUG_VOID_RETURN;
98 }
99
append_fragment(const wsrep::id & server_id,wsrep::transaction_id transaction_id,int flags,const wsrep::const_buffer & data,const wsrep::xid & xid WSREP_UNUSED)100 int Wsrep_storage_service::append_fragment(const wsrep::id& server_id,
101 wsrep::transaction_id transaction_id,
102 int flags,
103 const wsrep::const_buffer& data,
104 const wsrep::xid& xid WSREP_UNUSED)
105 {
106 DBUG_ENTER("Wsrep_storage_service::append_fragment");
107 DBUG_ASSERT(m_thd == current_thd);
108 DBUG_PRINT("info", ("Wsrep_storage_service::append_fragment(%llu, %p)",
109 m_thd->thread_id, m_thd));
110 int ret= wsrep_schema->append_fragment(m_thd,
111 server_id,
112 transaction_id,
113 wsrep::seqno(-1),
114 flags,
115 data);
116 DBUG_RETURN(ret);
117 }
118
update_fragment_meta(const wsrep::ws_meta & ws_meta)119 int Wsrep_storage_service::update_fragment_meta(const wsrep::ws_meta& ws_meta)
120 {
121 DBUG_ENTER("Wsrep_storage_service::update_fragment_meta");
122 DBUG_ASSERT(m_thd == current_thd);
123 DBUG_PRINT("info", ("Wsrep_storage_service::update_fragment_meta(%llu, %p)",
124 m_thd->thread_id, m_thd));
125 int ret= wsrep_schema->update_fragment_meta(m_thd, ws_meta);
126 DBUG_RETURN(ret);
127 }
128
remove_fragments()129 int Wsrep_storage_service::remove_fragments()
130 {
131 DBUG_ENTER("Wsrep_storage_service::remove_fragments");
132 DBUG_ASSERT(m_thd == current_thd);
133
134 int ret= wsrep_schema->remove_fragments(m_thd,
135 m_thd->wsrep_trx().server_id(),
136 m_thd->wsrep_trx().id(),
137 m_thd->wsrep_sr().fragments());
138 DBUG_RETURN(ret);
139 }
140
commit(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)141 int Wsrep_storage_service::commit(const wsrep::ws_handle& ws_handle,
142 const wsrep::ws_meta& ws_meta)
143 {
144 DBUG_ENTER("Wsrep_storage_service::commit");
145 DBUG_ASSERT(m_thd == current_thd);
146 DBUG_PRINT("info", ("Wsrep_storage_service::commit(%llu, %p)",
147 m_thd->thread_id, m_thd));
148 WSREP_DEBUG("Storage service commit: %llu, %lld",
149 ws_meta.transaction_id().get(), ws_meta.seqno().get());
150 int ret= 0;
151 const bool is_ordered= !ws_meta.seqno().is_undefined();
152
153 if (is_ordered)
154 {
155 ret= m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true);
156 }
157
158 ret= ret || trans_commit(m_thd);
159
160 if (!is_ordered)
161 {
162 /* Wsrep commit was not ordered so it does not go through commit time
163 hooks and remains active. Roll it back to make cleanup happen
164 in after_applying() call. */
165 m_thd->wsrep_cs().before_rollback();
166 m_thd->wsrep_cs().after_rollback();
167 }
168 else if (ret)
169 {
170 /* Commit failed, this probably means that the parent SR transaction
171 was BF aborted. Roll back out of order, the parent
172 transaction will release commit order after it has rolled back. */
173 m_thd->wsrep_cs().prepare_for_ordering(wsrep::ws_handle(),
174 wsrep::ws_meta(),
175 false);
176 trans_rollback(m_thd);
177 }
178 m_thd->wsrep_cs().after_applying();
179 m_thd->release_transactional_locks();
180 DBUG_RETURN(ret);
181 }
182
rollback(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)183 int Wsrep_storage_service::rollback(const wsrep::ws_handle& ws_handle,
184 const wsrep::ws_meta& ws_meta)
185 {
186 DBUG_ENTER("Wsrep_storage_service::rollback");
187 DBUG_ASSERT(m_thd == current_thd);
188 DBUG_PRINT("info", ("Wsrep_storage_service::rollback(%llu, %p)",
189 m_thd->thread_id, m_thd));
190 int ret= (m_thd->wsrep_cs().prepare_for_ordering(
191 ws_handle, ws_meta, false) ||
192 trans_rollback(m_thd));
193 m_thd->wsrep_cs().after_applying();
194 m_thd->release_transactional_locks();
195 DBUG_RETURN(ret);
196 }
197
store_globals()198 void Wsrep_storage_service::store_globals()
199 {
200 wsrep_store_threadvars(m_thd);
201 }
202
reset_globals()203 void Wsrep_storage_service::reset_globals()
204 {
205 wsrep_reset_threadvars(m_thd);
206 }
207