/*
 * Copyright (C) 2018 Codership Oy <info@codership.com>
 *
 * This file is part of wsrep-lib.
 *
 * Wsrep-lib is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 2 of the License, or
 * (at your option) any later version.
 *
 * Wsrep-lib is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
 */
19 
/** @file transaction.hpp */
21 #ifndef WSREP_TRANSACTION_HPP
22 #define WSREP_TRANSACTION_HPP
23 
24 #include "provider.hpp"
25 #include "server_state.hpp"
26 #include "transaction_id.hpp"
27 #include "streaming_context.hpp"
28 #include "lock.hpp"
29 #include "sr_key_set.hpp"
30 #include "buffer.hpp"
31 #include "client_service.hpp"
32 #include "xid.hpp"
33 
34 #include <cassert>
35 #include <vector>
36 
37 namespace wsrep
38 {
39     class client_service;
40     class client_state;
41     class key;
42     class const_buffer;
43 
44     class transaction
45     {
46     public:
47         enum state
48         {
49             s_executing,
50             s_preparing,
51             s_prepared,
52             s_certifying,
53             s_committing,
54             s_ordered_commit,
55             s_committed,
56             s_cert_failed,
57             s_must_abort,
58             s_aborting,
59             s_aborted,
60             s_must_replay,
61             s_replaying
62         };
63         static const int n_states = s_replaying + 1;
state() const64         enum state state() const
65         { return state_; }
66 
67         transaction(wsrep::client_state& client_state);
68         ~transaction();
69         // Accessors
id() const70         wsrep::transaction_id id() const
71         { return id_; }
72 
server_id() const73         const wsrep::id& server_id() const
74         { return server_id_; }
75 
active() const76         bool active() const
77         { return (id_ != wsrep::transaction_id::undefined()); }
78 
79 
80         void state(wsrep::unique_lock<wsrep::mutex>&, enum state);
81 
82         // Return true if the certification of the last
83         // fragment succeeded
certified() const84         bool certified() const { return certified_; }
85 
seqno() const86         wsrep::seqno seqno() const
87         {
88             return ws_meta_.seqno();
89         }
90         // Return true if the last fragment was ordered by the
91         // provider
ordered() const92         bool ordered() const
93         { return (ws_meta_.seqno().is_undefined() == false); }
94 
95         /**
96          * Return true if any fragments have been successfully certified
97          * for the transaction.
98          */
is_streaming() const99         bool is_streaming() const
100         {
101             return (streaming_context_.fragments_certified() > 0);
102         }
103 
104         /**
105          * Return number of fragments certified for current statement.
106          *
107          * This counts fragments which have been successfully certified
108          * since the construction of object or last after_statement()
109          * call.
110          *
111          * @return Number of fragments certified for current statement.
112          */
fragments_certified_for_statement() const113         size_t fragments_certified_for_statement() const
114         {
115             return fragments_certified_for_statement_;
116         }
117 
118         /**
119          * Return true if transaction has not generated any changes.
120          */
is_empty() const121         bool is_empty() const
122         {
123             return sr_keys_.empty();
124         }
125 
is_xa() const126         bool is_xa() const
127         {
128             return !xid_.is_null();
129         }
130 
131         void assign_xid(const wsrep::xid& xid);
132 
xid() const133         const wsrep::xid& xid() const
134         {
135             return xid_;
136         }
137 
138         int restore_to_prepared_state(const wsrep::xid& xid);
139 
140         int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit);
141 
142         void xa_detach();
143 
144         int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
145 
pa_unsafe() const146         bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); }
pa_unsafe(bool pa_unsafe)147         void pa_unsafe(bool pa_unsafe) {
148           if (pa_unsafe) {
149             flags(flags() | wsrep::provider::flag::pa_unsafe);
150           } else {
151             flags(flags() & ~wsrep::provider::flag::pa_unsafe);
152           }
153         }
implicit_deps() const154         bool implicit_deps() const { return implicit_deps_; }
implicit_deps(bool implicit)155         void implicit_deps(bool implicit) { implicit_deps_ = implicit; }
156 
157         int start_transaction(const wsrep::transaction_id& id);
158 
159         int start_transaction(const wsrep::ws_handle& ws_handle,
160                               const wsrep::ws_meta& ws_meta);
161 
162         int next_fragment(const wsrep::ws_meta& ws_meta);
163 
164         void adopt(const transaction& transaction);
165         void fragment_applied(wsrep::seqno seqno);
166 
167         int prepare_for_ordering(const wsrep::ws_handle& ws_handle,
168                                  const wsrep::ws_meta& ws_meta,
169                                  bool is_commit);
170 
171         int assign_read_view(const wsrep::gtid* gtid);
172 
173         int append_key(const wsrep::key&);
174 
175         int append_data(const wsrep::const_buffer&);
176 
177         int after_row();
178 
179         int before_prepare(wsrep::unique_lock<wsrep::mutex>&);
180 
181         int after_prepare(wsrep::unique_lock<wsrep::mutex>&);
182 
183         int before_commit();
184 
185         int ordered_commit();
186 
187         int after_commit();
188 
189         int before_rollback();
190 
191         int after_rollback();
192 
193         int before_statement();
194 
195         int after_statement();
196 
197         void after_command_must_abort(wsrep::unique_lock<wsrep::mutex>&);
198 
199         void after_applying();
200 
201         bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
202                       wsrep::seqno bf_seqno);
203         bool total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>&,
204                                   wsrep::seqno bf_seqno);
205 
206         void clone_for_replay(const wsrep::transaction& other);
207 
bf_aborted() const208         bool bf_aborted() const
209         {
210             return (bf_abort_client_state_ != 0);
211         }
212 
bf_aborted_in_total_order() const213         bool bf_aborted_in_total_order() const
214         {
215             return bf_aborted_in_total_order_;
216         }
217 
flags() const218         int flags() const
219         {
220             return flags_;
221         }
222 
223         // wsrep::mutex& mutex();
ws_handle()224         wsrep::ws_handle& ws_handle() { return ws_handle_; }
ws_handle() const225         const wsrep::ws_handle& ws_handle() const { return ws_handle_; }
ws_meta() const226         const wsrep::ws_meta& ws_meta() const { return ws_meta_; }
streaming_context() const227         const wsrep::streaming_context& streaming_context() const
228         { return streaming_context_; }
streaming_context()229         wsrep::streaming_context& streaming_context()
230         { return streaming_context_; }
adopt_apply_error(wsrep::mutable_buffer & buf)231         void adopt_apply_error(wsrep::mutable_buffer& buf)
232         {
233             apply_error_buf_ = std::move(buf);
234         }
apply_error() const235         const wsrep::mutable_buffer& apply_error() const
236         { return apply_error_buf_; }
237     private:
238         transaction(const transaction&);
239         transaction operator=(const transaction&);
240 
241         wsrep::provider& provider();
flags(int flags)242         void flags(int flags) { flags_ = flags; }
243         // Return true if the transaction must abort, is aborting,
244         // or has been aborted, or has been interrupted by DBMS
245         // as indicated by client_service::interrupted() call.
246         // The call will adjust transaction state and set client_state
247         // error status accordingly.
248         bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&);
249         int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false);
250         int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
251         int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
252         int append_sr_keys_for_commit();
253         int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
254         void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&);
255         int replay(wsrep::unique_lock<wsrep::mutex>&);
256         void xa_replay_common(wsrep::unique_lock<wsrep::mutex>&);
257         int xa_replay_commit(wsrep::unique_lock<wsrep::mutex>&);
258         void cleanup();
259         void debug_log_state(const char*) const;
260         void debug_log_key_append(const wsrep::key& key) const;
261 
262         wsrep::server_service& server_service_;
263         wsrep::client_service& client_service_;
264         wsrep::client_state& client_state_;
265         wsrep::id server_id_;
266         wsrep::transaction_id id_;
267         enum state state_;
268         std::vector<enum state> state_hist_;
269         enum state bf_abort_state_;
270         enum wsrep::provider::status bf_abort_provider_status_;
271         int bf_abort_client_state_;
272         bool bf_aborted_in_total_order_;
273         wsrep::ws_handle ws_handle_;
274         wsrep::ws_meta ws_meta_;
275         int flags_;
276         bool implicit_deps_;
277         bool certified_;
278         size_t fragments_certified_for_statement_;
279         wsrep::streaming_context streaming_context_;
280         wsrep::sr_key_set sr_keys_;
281         wsrep::mutable_buffer apply_error_buf_;
282         wsrep::xid xid_;
283         bool streaming_rollback_in_progress_;
284     };
285 
to_c_string(enum wsrep::transaction::state state)286     static inline const char* to_c_string(enum wsrep::transaction::state state)
287     {
288         switch (state)
289         {
290         case wsrep::transaction::s_executing: return "executing";
291         case wsrep::transaction::s_preparing: return "preparing";
292         case wsrep::transaction::s_prepared: return "prepared";
293         case wsrep::transaction::s_certifying: return "certifying";
294         case wsrep::transaction::s_committing: return "committing";
295         case wsrep::transaction::s_ordered_commit: return "ordered_commit";
296         case wsrep::transaction::s_committed: return "committed";
297         case wsrep::transaction::s_cert_failed: return "cert_failed";
298         case wsrep::transaction::s_must_abort: return "must_abort";
299         case wsrep::transaction::s_aborting: return "aborting";
300         case wsrep::transaction::s_aborted: return "aborted";
301         case wsrep::transaction::s_must_replay: return "must_replay";
302         case wsrep::transaction::s_replaying: return "replaying";
303         }
304         return "unknown";
305     }
to_string(enum wsrep::transaction::state state)306     static inline std::string to_string(enum wsrep::transaction::state state)
307     {
308         return to_c_string(state);
309     }
310 
311 }
312 
313 #endif // WSREP_TRANSACTION_HPP
314