1 /* 2 * Copyright (C) 2018 Codership Oy <info@codership.com> 3 * 4 * This file is part of wsrep-lib. 5 * 6 * Wsrep-lib is free software: you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation, either version 2 of the License, or 9 * (at your option) any later version. 10 * 11 * Wsrep-lib is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>. 18 */ 19 20 /** @file transaction.hpp */ 21 #ifndef WSREP_TRANSACTION_HPP 22 #define WSREP_TRANSACTION_HPP 23 24 #include "provider.hpp" 25 #include "server_state.hpp" 26 #include "transaction_id.hpp" 27 #include "streaming_context.hpp" 28 #include "lock.hpp" 29 #include "sr_key_set.hpp" 30 #include "buffer.hpp" 31 #include "client_service.hpp" 32 #include "xid.hpp" 33 34 #include <cassert> 35 #include <vector> 36 37 namespace wsrep 38 { 39 class client_service; 40 class client_state; 41 class key; 42 class const_buffer; 43 44 class transaction 45 { 46 public: 47 enum state 48 { 49 s_executing, 50 s_preparing, 51 s_prepared, 52 s_certifying, 53 s_committing, 54 s_ordered_commit, 55 s_committed, 56 s_cert_failed, 57 s_must_abort, 58 s_aborting, 59 s_aborted, 60 s_must_replay, 61 s_replaying 62 }; 63 static const int n_states = s_replaying + 1; state() const64 enum state state() const 65 { return state_; } 66 67 transaction(wsrep::client_state& client_state); 68 ~transaction(); 69 // Accessors id() const70 wsrep::transaction_id id() const 71 { return id_; } 72 server_id() const73 const wsrep::id& server_id() const 74 { return server_id_; } 75 active() const76 bool active() const 77 { return (id_ != wsrep::transaction_id::undefined()); } 78 79 80 void state(wsrep::unique_lock<wsrep::mutex>&, enum state); 81 82 // Return true if the certification of the last 83 // fragment succeeded certified() const84 bool certified() const { return certified_; } 85 seqno() const86 wsrep::seqno seqno() const 87 { 88 return ws_meta_.seqno(); 89 } 90 // Return true if the last fragment was ordered by the 91 // provider ordered() const92 bool ordered() const 93 { return (ws_meta_.seqno().is_undefined() == false); } 94 95 /** 96 * Return true if any fragments have been successfully certified 97 * for the transaction. 98 */ is_streaming() const99 bool is_streaming() const 100 { 101 return (streaming_context_.fragments_certified() > 0); 102 } 103 104 /** 105 * Return number of fragments certified for current statement. 106 * 107 * This counts fragments which have been successfully certified 108 * since the construction of object or last after_statement() 109 * call. 110 * 111 * @return Number of fragments certified for current statement. 112 */ fragments_certified_for_statement() const113 size_t fragments_certified_for_statement() const 114 { 115 return fragments_certified_for_statement_; 116 } 117 118 /** 119 * Return true if transaction has not generated any changes. 120 */ is_empty() const121 bool is_empty() const 122 { 123 return sr_keys_.empty(); 124 } 125 is_xa() const126 bool is_xa() const 127 { 128 return !xid_.is_null(); 129 } 130 131 void assign_xid(const wsrep::xid& xid); 132 xid() const133 const wsrep::xid& xid() const 134 { 135 return xid_; 136 } 137 138 int restore_to_prepared_state(const wsrep::xid& xid); 139 140 int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit); 141 142 void xa_detach(); 143 144 int xa_replay(wsrep::unique_lock<wsrep::mutex>&); 145 pa_unsafe() const146 bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); } pa_unsafe(bool pa_unsafe)147 void pa_unsafe(bool pa_unsafe) { 148 if (pa_unsafe) { 149 flags(flags() | wsrep::provider::flag::pa_unsafe); 150 } else { 151 flags(flags() & ~wsrep::provider::flag::pa_unsafe); 152 } 153 } implicit_deps() const154 bool implicit_deps() const { return implicit_deps_; } implicit_deps(bool implicit)155 void implicit_deps(bool implicit) { implicit_deps_ = implicit; } 156 157 int start_transaction(const wsrep::transaction_id& id); 158 159 int start_transaction(const wsrep::ws_handle& ws_handle, 160 const wsrep::ws_meta& ws_meta); 161 162 int next_fragment(const wsrep::ws_meta& ws_meta); 163 164 void adopt(const transaction& transaction); 165 void fragment_applied(wsrep::seqno seqno); 166 167 int prepare_for_ordering(const wsrep::ws_handle& ws_handle, 168 const wsrep::ws_meta& ws_meta, 169 bool is_commit); 170 171 int assign_read_view(const wsrep::gtid* gtid); 172 173 int append_key(const wsrep::key&); 174 175 int append_data(const wsrep::const_buffer&); 176 177 int after_row(); 178 179 int before_prepare(wsrep::unique_lock<wsrep::mutex>&); 180 181 int after_prepare(wsrep::unique_lock<wsrep::mutex>&); 182 183 int before_commit(); 184 185 int ordered_commit(); 186 187 int after_commit(); 188 189 int before_rollback(); 190 191 int after_rollback(); 192 193 int before_statement(); 194 195 int after_statement(); 196 197 void after_command_must_abort(wsrep::unique_lock<wsrep::mutex>&); 198 199 void after_applying(); 200 201 bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock, 202 wsrep::seqno bf_seqno); 203 bool total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>&, 204 wsrep::seqno bf_seqno); 205 206 void clone_for_replay(const wsrep::transaction& other); 207 bf_aborted() const208 bool bf_aborted() const 209 { 210 return (bf_abort_client_state_ != 0); 211 } 212 bf_aborted_in_total_order() const213 bool bf_aborted_in_total_order() const 214 { 215 return bf_aborted_in_total_order_; 216 } 217 flags() const218 int flags() const 219 { 220 return flags_; 221 } 222 223 // wsrep::mutex& mutex(); ws_handle()224 wsrep::ws_handle& ws_handle() { return ws_handle_; } ws_handle() const225 const wsrep::ws_handle& ws_handle() const { return ws_handle_; } ws_meta() const226 const wsrep::ws_meta& ws_meta() const { return ws_meta_; } streaming_context() const227 const wsrep::streaming_context& streaming_context() const 228 { return streaming_context_; } streaming_context()229 wsrep::streaming_context& streaming_context() 230 { return streaming_context_; } adopt_apply_error(wsrep::mutable_buffer & buf)231 void adopt_apply_error(wsrep::mutable_buffer& buf) 232 { 233 apply_error_buf_ = std::move(buf); 234 } apply_error() const235 const wsrep::mutable_buffer& apply_error() const 236 { return apply_error_buf_; } 237 private: 238 transaction(const transaction&); 239 transaction operator=(const transaction&); 240 241 wsrep::provider& provider(); flags(int flags)242 void flags(int flags) { flags_ = flags; } 243 // Return true if the transaction must abort, is aborting, 244 // or has been aborted, or has been interrupted by DBMS 245 // as indicated by client_service::interrupted() call. 246 // The call will adjust transaction state and set client_state 247 // error status accordingly. 248 bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&); 249 int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false); 250 int certify_fragment(wsrep::unique_lock<wsrep::mutex>&); 251 int certify_commit(wsrep::unique_lock<wsrep::mutex>&); 252 int append_sr_keys_for_commit(); 253 int release_commit_order(wsrep::unique_lock<wsrep::mutex>&); 254 void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&); 255 int replay(wsrep::unique_lock<wsrep::mutex>&); 256 void xa_replay_common(wsrep::unique_lock<wsrep::mutex>&); 257 int xa_replay_commit(wsrep::unique_lock<wsrep::mutex>&); 258 void cleanup(); 259 void debug_log_state(const char*) const; 260 void debug_log_key_append(const wsrep::key& key) const; 261 262 wsrep::server_service& server_service_; 263 wsrep::client_service& client_service_; 264 wsrep::client_state& client_state_; 265 wsrep::id server_id_; 266 wsrep::transaction_id id_; 267 enum state state_; 268 std::vector<enum state> state_hist_; 269 enum state bf_abort_state_; 270 enum wsrep::provider::status bf_abort_provider_status_; 271 int bf_abort_client_state_; 272 bool bf_aborted_in_total_order_; 273 wsrep::ws_handle ws_handle_; 274 wsrep::ws_meta ws_meta_; 275 int flags_; 276 bool implicit_deps_; 277 bool certified_; 278 size_t fragments_certified_for_statement_; 279 wsrep::streaming_context streaming_context_; 280 wsrep::sr_key_set sr_keys_; 281 wsrep::mutable_buffer apply_error_buf_; 282 wsrep::xid xid_; 283 bool streaming_rollback_in_progress_; 284 }; 285 to_c_string(enum wsrep::transaction::state state)286 static inline const char* to_c_string(enum wsrep::transaction::state state) 287 { 288 switch (state) 289 { 290 case wsrep::transaction::s_executing: return "executing"; 291 case wsrep::transaction::s_preparing: return "preparing"; 292 case wsrep::transaction::s_prepared: return "prepared"; 293 case wsrep::transaction::s_certifying: return "certifying"; 294 case wsrep::transaction::s_committing: return "committing"; 295 case wsrep::transaction::s_ordered_commit: return "ordered_commit"; 296 case wsrep::transaction::s_committed: return "committed"; 297 case wsrep::transaction::s_cert_failed: return "cert_failed"; 298 case wsrep::transaction::s_must_abort: return "must_abort"; 299 case wsrep::transaction::s_aborting: return "aborting"; 300 case wsrep::transaction::s_aborted: return "aborted"; 301 case wsrep::transaction::s_must_replay: return "must_replay"; 302 case wsrep::transaction::s_replaying: return "replaying"; 303 } 304 return "unknown"; 305 } to_string(enum wsrep::transaction::state state)306 static inline std::string to_string(enum wsrep::transaction::state state) 307 { 308 return to_c_string(state); 309 } 310 311 } 312 313 #endif // WSREP_TRANSACTION_HPP 314